diff --git a/Cargo.lock b/Cargo.lock index e1a825e..82808ea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -28,7 +28,9 @@ dependencies = [ name = "amqp_core" version = "0.1.0" dependencies = [ + "bytes", "parking_lot", + "smallvec", "uuid", ] @@ -64,6 +66,7 @@ dependencies = [ "once_cell", "rand", "regex", + "smallvec", "thiserror", "tokio", "tracing", diff --git a/amqp_core/Cargo.toml b/amqp_core/Cargo.toml index cec72f4..4a10592 100644 --- a/amqp_core/Cargo.toml +++ b/amqp_core/Cargo.toml @@ -6,5 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +bytes = "1.1.0" parking_lot = "0.12.0" +smallvec = { version = "1.8.0", features = ["union"] } uuid = "0.8.2" diff --git a/amqp_core/src/lib.rs b/amqp_core/src/lib.rs index 8dfb2f7..f92416d 100644 --- a/amqp_core/src/lib.rs +++ b/amqp_core/src/lib.rs @@ -1,5 +1,6 @@ #![warn(rust_2018_idioms)] +mod message; pub mod methods; use parking_lot::Mutex; diff --git a/amqp_core/src/message.rs b/amqp_core/src/message.rs new file mode 100644 index 0000000..eba4c8d --- /dev/null +++ b/amqp_core/src/message.rs @@ -0,0 +1,23 @@ +#![allow(dead_code)] + +use crate::methods; +use bytes::Bytes; +use smallvec::SmallVec; +use std::sync::Arc; +use uuid::Uuid; + +pub type Message = Arc; + +pub struct RawMessage { + id: Uuid, + properties: methods::Table, + routing: RoutingInformation, + content: SmallVec<[Bytes; 1]>, +} + +pub struct RoutingInformation { + pub exchange: String, + pub routing_key: String, + pub mandatory: bool, + pub immediate: bool, +} diff --git a/amqp_transport/Cargo.toml b/amqp_transport/Cargo.toml index 664ebd8..9fcae77 100644 --- a/amqp_transport/Cargo.toml +++ b/amqp_transport/Cargo.toml @@ -14,6 +14,7 @@ nom = "7.1.0" once_cell = "1.9.0" rand = "0.8.4" regex = "1.5.4" +smallvec = { version = "1.8.0", features = ["union"] } thiserror = "1.0.30" tokio = { version = "1.16.1", features = ["full"] } tracing = "0.1.30" diff --git a/amqp_transport/src/connection.rs b/amqp_transport/src/connection.rs index 55a20e9..1a88f0b 100644 --- a/amqp_transport/src/connection.rs +++ b/amqp_transport/src/connection.rs @@ -1,19 +1,25 @@ -use crate::error::{ConException, ProtocolError, Result}; -use crate::frame::{Frame, FrameType}; -use crate::{frame, methods, sasl}; -use amqp_core::methods::{FieldValue, Method, Table}; -use amqp_core::GlobalData; -use anyhow::Context; +use std::cmp::Ordering; use std::collections::HashMap; use std::net::SocketAddr; use std::pin::Pin; use std::time::Duration; + +use anyhow::Context; +use bytes::Bytes; +use smallvec::SmallVec; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpStream; use tokio::time; use tracing::{debug, error, info, warn}; use uuid::Uuid; +use amqp_core::methods::{FieldValue, Method, Table}; +use amqp_core::GlobalData; + +use crate::error::{ConException, ProtocolError, Result}; +use crate::frame::{ContentHeader, Frame, FrameType}; +use crate::{frame, methods, sasl}; + fn ensure_conn(condition: bool) -> Result<()> { if condition { Ok(()) @@ -47,6 +53,12 @@ pub struct Connection { const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); +enum WaitForBodyStatus { + Method(Method), + Header(Method, ContentHeader, SmallVec<[Bytes; 1]>), + None, +} + impl Connection { pub fn new( id: Uuid, @@ -196,6 +208,9 @@ impl Connection { } async fn main_loop(&mut self) -> Result<()> { + // todo: find out how header/body frames can interleave between channels + let mut wait_for_body = WaitForBodyStatus::None; + loop { debug!("Waiting for next frame"); let frame = frame::read_frame(&mut self.stream, self.max_frame_size).await?; @@ -203,14 +218,42 @@ impl Connection { self.reset_timeout(); match frame.kind { - FrameType::Method => self.dispatch_method(frame).await?, + FrameType::Method => wait_for_body = self.dispatch_method(frame).await?, FrameType::Heartbeat => {} - _ => warn!(frame_type = ?frame.kind, "TODO"), + FrameType::Header => match wait_for_body { + WaitForBodyStatus::None => warn!(channel = %frame.channel, "unexpected header"), + WaitForBodyStatus::Method(method) => { + wait_for_body = + WaitForBodyStatus::Header(method, ContentHeader::new(), SmallVec::new()) + } + WaitForBodyStatus::Header(_, _, _) => { + warn!(channel = %frame.channel, "already got header") + } + }, + FrameType::Body => match &mut wait_for_body { + WaitForBodyStatus::None => warn!(channel = %frame.channel, "unexpected body"), + WaitForBodyStatus::Method(_) => { + warn!(channel = %frame.channel, "unexpected body") + } + WaitForBodyStatus::Header(_, header, vec) => { + vec.push(frame.payload); + match vec + .iter() + .map(Bytes::len) + .sum::() + .cmp(&usize::try_from(header.body_size).unwrap()) + { + Ordering::Equal => todo!("process body"), + Ordering::Greater => todo!("too much data!"), + Ordering::Less => {} // wait for next body + } + } + }, } } } - async fn dispatch_method(&mut self, frame: Frame) -> Result<()> { + async fn dispatch_method(&mut self, frame: Frame) -> Result { let method = methods::parse_method(&frame.payload)?; debug!(?method, "Received method"); @@ -219,6 +262,7 @@ impl Connection { // todo: handle closing } Method::ChannelOpen { .. } => self.channel_open(frame.channel).await?, + Method::BasicPublish { .. } => return Ok(WaitForBodyStatus::Method(method)), _ => { let channel_handle = self .channels @@ -235,7 +279,7 @@ impl Connection { } } - Ok(()) + Ok(WaitForBodyStatus::None) } async fn channel_open(&mut self, num: u16) -> Result<()> { diff --git a/amqp_transport/src/frame.rs b/amqp_transport/src/frame.rs index c548233..cd9c53c 100644 --- a/amqp_transport/src/frame.rs +++ b/amqp_transport/src/frame.rs @@ -1,6 +1,8 @@ use crate::error::{ConException, ProtocolError, Result}; +use amqp_core::methods::FieldValue; use anyhow::Context; use bytes::Bytes; +use smallvec::SmallVec; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tracing::trace; @@ -31,6 +33,21 @@ pub enum FrameType { Heartbeat = 8, } +#[derive(Debug, Clone, PartialEq)] +pub struct ContentHeader { + pub class_id: u16, + pub weight: u16, + pub body_size: u64, + pub property_flags: SmallVec<[u16; 1]>, + pub property_fields: Vec, +} + +impl ContentHeader { + pub fn new() -> Self { + todo!() + } +} + pub async fn write_frame(frame: &Frame, mut w: W) -> Result<()> where W: AsyncWriteExt + Unpin,