From e5fa49a05a82e7df7ca8530868cc353ef315d5cf Mon Sep 17 00:00:00 2001 From: Nilstrieb <48135649+Nilstrieb@users.noreply.github.com> Date: Wed, 9 Feb 2022 15:19:18 +0100 Subject: [PATCH] parse the frame type --- amqp_transport/src/connection.rs | 4 ++-- amqp_transport/src/frame.rs | 35 +++++++++++++++++++++++++++----- 2 files changed, 32 insertions(+), 7 deletions(-) diff --git a/amqp_transport/src/connection.rs b/amqp_transport/src/connection.rs index 5309382..37bdc08 100644 --- a/amqp_transport/src/connection.rs +++ b/amqp_transport/src/connection.rs @@ -1,6 +1,6 @@ use crate::error::{ProtocolError, TransError}; use crate::frame; -use anyhow::{ensure, Context}; +use anyhow::Context; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpStream; use tracing::{debug, error}; @@ -58,7 +58,7 @@ impl Connection { Ok(()) } else { debug!(?version, expected_version = ?PROTOCOL_VERSION, "Version negotiation failed, unsupported version"); - return Err(ProtocolError::OtherCloseConnection.into()); + Err(ProtocolError::OtherCloseConnection.into()) } } } diff --git a/amqp_transport/src/frame.rs b/amqp_transport/src/frame.rs index 9cd12ea..e35188e 100644 --- a/amqp_transport/src/frame.rs +++ b/amqp_transport/src/frame.rs @@ -13,9 +13,10 @@ mod frame_type { #[derive(Debug, Clone, PartialEq, Eq)] pub struct Frame { - /// type + /// The type of the frame including its parsed metadata. kind: FrameType, channel: u16, + /// Includes the whole payload, also including the metadata from each type. payload: Vec, } @@ -26,8 +27,6 @@ pub enum FrameType { /// 2 Header { class_id: u16, - /// Unused, must always be 0 - weight: u16, body_size: u64, /// Ordered from high to low property_flags: u16, @@ -59,7 +58,7 @@ where return Err(ProtocolError::ConException(ConException::FrameError).into()); } - let kind = parse_frame_type(kind, &payload)?; + let kind = parse_frame_type(kind, &payload, channel)?; Ok(Frame { kind, @@ -70,7 +69,33 @@ where fn parse_frame_type(kind: u8, payload: &[u8], channel: u16) -> Result { match kind { - frame_type::METHOD => todo!(), + frame_type::METHOD => { + let class_id = u16::from_be_bytes(payload[0..2].try_into().unwrap()); + let method_id = u16::from_be_bytes(payload[2..4].try_into().unwrap()); + + Ok(FrameType::Method { + class_id, + method_id, + }) + } + frame_type::HEADER => { + let class_id = u16::from_be_bytes(payload[0..2].try_into().unwrap()); + let weight = u16::from_be_bytes(payload[2..4].try_into().unwrap()); + // weight is unused and must always be 0 + if weight != 0 { + return Err(ProtocolError::ConException(ConException::FrameError).into()); + } + + let body_size = u64::from_be_bytes(payload[4..12].try_into().unwrap()); + let property_flags = u16::from_be_bytes(payload[12..14].try_into().unwrap()); + + Ok(FrameType::Header { + class_id, + body_size, + property_flags, + }) + } + frame_type::BODY => Ok(FrameType::Body), frame_type::HEARTBEAT => { if channel != 0 { Err(ProtocolError::ConException(ConException::FrameError).into())