From 3bcce768851197c7e097de2f3202f03ad5815d9d Mon Sep 17 00:00:00 2001 From: Nilstrieb <48135649+Nilstrieb@users.noreply.github.com> Date: Fri, 4 Mar 2022 23:27:34 +0100 Subject: [PATCH] write header --- amqp_transport/src/connection.rs | 15 +++++- amqp_transport/src/frame.rs | 88 ++++++++++++++++++++++++++++++- amqp_transport/src/methods/mod.rs | 2 +- 3 files changed, 102 insertions(+), 3 deletions(-) diff --git a/amqp_transport/src/connection.rs b/amqp_transport/src/connection.rs index 0b40d94..ee10969 100644 --- a/amqp_transport/src/connection.rs +++ b/amqp_transport/src/connection.rs @@ -148,10 +148,23 @@ impl TransportConnection { &mut self, channel: ChannelNum, method: &Method, - _header: ContentHeader, + header: ContentHeader, _body: SmallVec<[Bytes; 1]>, ) -> Result<()> { self.send_method(channel, method).await?; + + let mut header_buf = Vec::new(); + frame::write_content_header(&mut header_buf, header)?; + frame::write_frame( + &Frame { + kind: FrameType::Method, + channel, + payload: header_buf.into(), + }, + &mut self.stream, + ) + .await?; + amqp_todo!() } diff --git a/amqp_transport/src/frame.rs b/amqp_transport/src/frame.rs index 55812e4..d2ff318 100644 --- a/amqp_transport/src/frame.rs +++ b/amqp_transport/src/frame.rs @@ -1,5 +1,8 @@ use crate::error::{ConException, ProtocolError, Result}; -use amqp_core::connection::{ChannelNum, ContentHeader}; +use amqp_core::{ + amqp_todo, + connection::{ChannelNum, ContentHeader}, +}; use anyhow::Context; use bytes::Bytes; use tokio::io::{AsyncReadExt, AsyncWriteExt}; @@ -128,6 +131,89 @@ pub fn parse_content_header(input: &[u8]) -> Result { } } +mod content_header_write { + use crate::{ + methods::write_helper::{octet, shortstr, table, timestamp}, + Result, + }; + use amqp_core::{ + connection::ContentHeader, + methods::FieldValue::{FieldTable, ShortShortUInt, ShortString, Timestamp}, + }; + + pub fn write_content_header(buf: &mut Vec, header: ContentHeader) -> Result<()> { + let mut flags = 0_u16; + buf.extend_from_slice(&flags.to_be_bytes()); // placeholder + + if let Some(ShortString(value)) = header.property_fields.get("content-type") { + flags |= 1 << 15; + shortstr(value, buf)?; + } + if let Some(ShortString(value)) = header.property_fields.get("content-encoding") { + flags |= 1 << 14; + shortstr(value, buf)?; + } + if let Some(FieldTable(value)) = header.property_fields.get("headers") { + flags |= 1 << 13; + table(value, buf)?; + } + if let Some(ShortShortUInt(value)) = header.property_fields.get("delivery-mode") { + flags |= 1 << 12; + octet(value, buf)?; + } + if let Some(ShortShortUInt(value)) = header.property_fields.get("priority") { + flags |= 1 << 11; + octet(value, buf)?; + } + if let Some(ShortString(value)) = header.property_fields.get("correlation-id") { + flags |= 1 << 10; + shortstr(value, buf)?; + } + if let Some(ShortString(value)) = header.property_fields.get("reply-to") { + flags |= 1 << 9; + shortstr(value, buf)?; + } + if let Some(ShortString(value)) = header.property_fields.get("expiration") { + flags |= 1 << 8; + shortstr(value, buf)?; + } + if let Some(ShortString(value)) = header.property_fields.get("message-id") { + flags |= 1 << 7; + shortstr(value, buf)?; + } + if let Some(Timestamp(value)) = header.property_fields.get("timestamp") { + flags |= 1 << 6; + timestamp(value, buf)?; + } + if let Some(ShortString(value)) = header.property_fields.get("type") { + flags |= 1 << 5; + shortstr(value, buf)?; + } + if let Some(ShortString(value)) = header.property_fields.get("user-id") { + flags |= 1 << 4; + shortstr(value, buf)?; + } + if let Some(ShortString(value)) = header.property_fields.get("app-id") { + flags |= 1 << 3; + shortstr(value, buf)?; + } + if let Some(ShortString(value)) = header.property_fields.get("reserved") { + flags |= 1 << 2; + shortstr(value, buf)?; + } + + let [a, b] = flags.to_be_bytes(); + buf[0] = a; + buf[1] = b; + + Ok(()) + } +} + +pub fn write_content_header(buf: &mut Vec, content_header: ContentHeader) -> Result<()> { + write_content_header(buf, content_header) +} + pub async fn write_frame(frame: &Frame, mut w: W) -> Result<()> where W: AsyncWriteExt + Unpin + Send, diff --git a/amqp_transport/src/methods/mod.rs b/amqp_transport/src/methods/mod.rs index a317451..c219b44 100644 --- a/amqp_transport/src/methods/mod.rs +++ b/amqp_transport/src/methods/mod.rs @@ -9,7 +9,7 @@ mod generated; pub mod parse_helper; #[cfg(test)] mod tests; -mod write_helper; +pub mod write_helper; pub use generated::*;