From ccee6c36f61f8684ed49f83287c1fbe6b92f7f64 Mon Sep 17 00:00:00 2001 From: Nilstrieb <48135649+Nilstrieb@users.noreply.github.com> Date: Wed, 9 Feb 2022 14:51:44 +0100 Subject: [PATCH] parse frame payload --- amqp_transport/src/connection.rs | 6 +-- amqp_transport/src/error.rs | 6 ++- amqp_transport/src/frame.rs | 76 ++++++++++++++++++++------------ 3 files changed, 55 insertions(+), 33 deletions(-) diff --git a/amqp_transport/src/connection.rs b/amqp_transport/src/connection.rs index 4021a38..5309382 100644 --- a/amqp_transport/src/connection.rs +++ b/amqp_transport/src/connection.rs @@ -1,4 +1,4 @@ -use crate::error::{ConError, ProtocolError}; +use crate::error::{ProtocolError, TransError}; use crate::frame; use anyhow::{ensure, Context}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; @@ -21,7 +21,7 @@ impl Connection { } } - pub async fn run(&mut self) -> Result<(), ConError> { + pub async fn run(&mut self) -> Result<(), TransError> { self.negotiate_version().await?; loop { @@ -30,7 +30,7 @@ impl Connection { } } - async fn negotiate_version(&mut self) -> Result<(), ConError> { + async fn negotiate_version(&mut self) -> Result<(), TransError> { const HEADER_SIZE: usize = 8; const PROTOCOL_VERSION: &[u8] = &[0, 9, 1]; const PROTOCOL_HEADER: &[u8] = b"AMQP\0\0\x09\x01"; diff --git a/amqp_transport/src/error.rs b/amqp_transport/src/error.rs index db36cd2..0346767 100644 --- a/amqp_transport/src/error.rs +++ b/amqp_transport/src/error.rs @@ -1,5 +1,5 @@ #[derive(Debug, thiserror::Error)] -pub enum ConError { +pub enum TransError { #[error("{0}")] Invalid(#[from] ProtocolError), #[error("connection error: `{0}`")] @@ -24,6 +24,10 @@ pub enum ConException { FrameError, #[error("503 Command invalid")] CommandInvalid, + #[error("503 Syntax error")] + SyntaxError, + #[error("504 Channel error")] + ChannelError, } #[derive(Debug, thiserror::Error)] diff --git a/amqp_transport/src/frame.rs b/amqp_transport/src/frame.rs index 16dae6d..9cd12ea 100644 --- a/amqp_transport/src/frame.rs +++ b/amqp_transport/src/frame.rs @@ -1,45 +1,48 @@ -use crate::error::{ConError, ConException, ProtocolError}; +use crate::error::{ConException, ProtocolError, TransError}; use anyhow::Context; use tokio::io::AsyncReadExt; const REQUIRED_FRAME_END: u8 = 0xCE; -#[derive(Debug, Clone, PartialEq, Eq)] -#[repr(u8)] -pub enum FrameType { - Method = 1, - Header = 2, - Body = 3, - Heartbeat = 4, -} - -impl TryFrom for FrameType { - type Error = ConError; - - fn try_from(value: u8) -> Result { - Ok(match value { - 1 => Self::Method, - 2 => Self::Header, - 3 => Self::Body, - 4 => Self::Heartbeat, - _ => return Err(ProtocolError::Fatal.into()), - }) - } +mod frame_type { + pub const METHOD: u8 = 1; + pub const HEADER: u8 = 2; + pub const BODY: u8 = 3; + pub const HEARTBEAT: u8 = 4; } #[derive(Debug, Clone, PartialEq, Eq)] pub struct Frame { - r#type: FrameType, + /// type + kind: FrameType, channel: u16, - size: u32, payload: Vec, } -pub async fn read_frame(r: &mut R, max_frame_size: usize) -> Result +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum FrameType { + /// 1 + Method { class_id: u16, method_id: u16 }, + /// 2 + Header { + class_id: u16, + /// Unused, must always be 0 + weight: u16, + body_size: u64, + /// Ordered from high to low + property_flags: u16, + }, + /// 3 + Body, + /// 4 + Heartbeat, +} + +pub async fn read_frame(r: &mut R, max_frame_size: usize) -> Result where R: AsyncReadExt + Unpin, { - let r#type = r.read_u8().await.context("read type")?; + let kind = r.read_u8().await.context("read type")?; let channel = r.read_u16().await.context("read channel")?; let size = r.read_u32().await.context("read size")?; @@ -56,14 +59,29 @@ where return Err(ProtocolError::ConException(ConException::FrameError).into()); } + let kind = parse_frame_type(kind, &payload)?; + Ok(Frame { - r#type: r#type.try_into()?, + kind, channel, - size, payload, }) } +fn parse_frame_type(kind: u8, payload: &[u8], channel: u16) -> Result { + match kind { + frame_type::METHOD => todo!(), + frame_type::HEARTBEAT => { + if channel != 0 { + Err(ProtocolError::ConException(ConException::FrameError).into()) + } else { + Ok(FrameType::Heartbeat) + } + } + _ => todo!(), + } +} + #[cfg(test)] mod tests { use crate::frame::{Frame, FrameType}; @@ -88,7 +106,7 @@ mod tests { assert_eq!( frame, Frame { - r#type: FrameType::Method, + kind: FrameType::Method, channel: 0, size: 3, payload: vec![1, 2, 3],