Merge remote-tracking branch 'origin/main'

# Conflicts:
#	Cargo.lock
#	amqp_transport/src/connection.rs
This commit is contained in:
nora 2022-02-21 20:41:19 +01:00
commit b67c722c19
24 changed files with 504 additions and 433 deletions

View file

@ -1,19 +1,25 @@
use crate::error::{ConException, ProtocolError, Result};
use crate::frame::{Frame, FrameType};
use crate::{frame, methods, sasl};
use amqp_core::methods::{FieldValue, Method, Table};
use amqp_core::GlobalData;
use anyhow::Context;
use std::cmp::Ordering;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::pin::Pin;
use std::time::Duration;
use anyhow::Context;
use bytes::Bytes;
use smallvec::SmallVec;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::time;
use tracing::{debug, error, info, warn};
use uuid::Uuid;
use amqp_core::methods::{FieldValue, Method, Table};
use amqp_core::GlobalData;
use crate::error::{ConException, ProtocolError, Result};
use crate::frame::{ContentHeader, Frame, FrameType};
use crate::{frame, methods, sasl};
fn ensure_conn(condition: bool) -> Result<()> {
if condition {
Ok(())
@ -47,6 +53,12 @@ pub struct Connection {
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
enum WaitForBodyStatus {
Method(Method),
Header(Method, ContentHeader, SmallVec<[Bytes; 1]>),
None,
}
impl Connection {
pub fn new(
id: Uuid,
@ -95,7 +107,7 @@ impl Connection {
&Frame {
kind: FrameType::Method,
channel,
payload,
payload: payload.into(),
},
&mut self.stream,
)
@ -196,15 +208,46 @@ impl Connection {
}
async fn main_loop(&mut self) -> Result<()> {
// todo: find out how header/body frames can interleave between channels
let mut wait_for_body = WaitForBodyStatus::None;
loop {
debug!("Waiting for next frame");
let frame = frame::read_frame(&mut self.stream, self.max_frame_size).await?;
self.reset_timeout();
match frame.kind {
FrameType::Method => self.dispatch_method(frame).await?,
FrameType::Method => wait_for_body = self.dispatch_method(frame).await?,
FrameType::Heartbeat => {}
_ => warn!(frame_type = ?frame.kind, "TODO"),
FrameType::Header => match wait_for_body {
WaitForBodyStatus::None => warn!(channel = %frame.channel, "unexpected header"),
WaitForBodyStatus::Method(method) => {
wait_for_body =
WaitForBodyStatus::Header(method, ContentHeader::new(), SmallVec::new())
}
WaitForBodyStatus::Header(_, _, _) => {
warn!(channel = %frame.channel, "already got header")
}
},
FrameType::Body => match &mut wait_for_body {
WaitForBodyStatus::None => warn!(channel = %frame.channel, "unexpected body"),
WaitForBodyStatus::Method(_) => {
warn!(channel = %frame.channel, "unexpected body")
}
WaitForBodyStatus::Header(_, header, vec) => {
vec.push(frame.payload);
match vec
.iter()
.map(Bytes::len)
.sum::<usize>()
.cmp(&usize::try_from(header.body_size).unwrap())
{
Ordering::Equal => todo!("process body"),
Ordering::Greater => todo!("too much data!"),
Ordering::Less => {} // wait for next body
}
}
},
}
}
}
@ -219,6 +262,7 @@ impl Connection {
}
Method::ChannelOpen { .. } => self.channel_open(frame.channel).await?,
Method::ChannelClose { .. } => self.channel_close(frame.channel, method).await?,
Method::BasicPublish { .. } => return Ok(WaitForBodyStatus::Method(method)),
_ => {
let channel_handle = self
.channels
@ -235,7 +279,7 @@ impl Connection {
}
}
Ok(())
Ok(WaitForBodyStatus::None)
}
async fn channel_open(&mut self, num: u16) -> Result<()> {