diff --git a/amqp_transport/src/frame.rs b/amqp_transport/src/frame.rs new file mode 100644 index 0000000..7691b58 --- /dev/null +++ b/amqp_transport/src/frame.rs @@ -0,0 +1,58 @@ +use anyhow::Result; +use tokio::io::AsyncReadExt; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Frame { + r#type: u8, + channel: u16, + size: u32, + payload: Vec, + frame_end: u8, +} + +pub async fn read_frame(r: &mut R) -> Result +where + R: AsyncReadExt + Unpin, +{ + let r#type = r.read_u8().await?; + let channel = r.read_u16().await?; + let size = r.read_u32().await?; + + let mut payload = vec![0; size.try_into().unwrap()]; + r.read_exact(&mut payload).await?; + + let frame_end = r.read_u8().await?; + + Ok(Frame { + r#type, + channel, + size, + payload, + frame_end, + }) +} + +#[cfg(test)] +mod tests { + use crate::frame::Frame; + + #[tokio::test] + async fn read_small_body() { + let mut bytes: &[u8] = &[ + /*type*/ 1, /*channel*/ 0, 0, /*size*/ 0, 0, 0, 3, /*payload*/ 1, + 2, 3, /*frame-end*/ 0, + ]; + + let frame = super::read_frame(&mut bytes).await.unwrap(); + assert_eq!( + frame, + Frame { + r#type: 1, + channel: 0, + size: 3, + payload: vec![1, 2, 3], + frame_end: 0 + } + ); + } +} diff --git a/amqp_transport/src/lib.rs b/amqp_transport/src/lib.rs index acf635d..2ca54cb 100644 --- a/amqp_transport/src/lib.rs +++ b/amqp_transport/src/lib.rs @@ -1,4 +1,5 @@ mod connection; +mod frame; use crate::connection::Connection; use anyhow::Result;