write header

This commit is contained in:
nora 2022-03-04 23:27:34 +01:00
parent b6355f5e35
commit 3bcce76885
3 changed files with 102 additions and 3 deletions

View file

@ -148,10 +148,23 @@ impl TransportConnection {
&mut self,
channel: ChannelNum,
method: &Method,
_header: ContentHeader,
header: ContentHeader,
_body: SmallVec<[Bytes; 1]>,
) -> Result<()> {
self.send_method(channel, method).await?;
let mut header_buf = Vec::new();
frame::write_content_header(&mut header_buf, header)?;
frame::write_frame(
&Frame {
kind: FrameType::Method,
channel,
payload: header_buf.into(),
},
&mut self.stream,
)
.await?;
amqp_todo!()
}

View file

@ -1,5 +1,8 @@
use crate::error::{ConException, ProtocolError, Result};
use amqp_core::connection::{ChannelNum, ContentHeader};
use amqp_core::{
amqp_todo,
connection::{ChannelNum, ContentHeader},
};
use anyhow::Context;
use bytes::Bytes;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
@ -128,6 +131,89 @@ pub fn parse_content_header(input: &[u8]) -> Result<ContentHeader> {
}
}
mod content_header_write {
use crate::{
methods::write_helper::{octet, shortstr, table, timestamp},
Result,
};
use amqp_core::{
connection::ContentHeader,
methods::FieldValue::{FieldTable, ShortShortUInt, ShortString, Timestamp},
};
pub fn write_content_header(buf: &mut Vec<u8>, header: ContentHeader) -> Result<()> {
let mut flags = 0_u16;
buf.extend_from_slice(&flags.to_be_bytes()); // placeholder
if let Some(ShortString(value)) = header.property_fields.get("content-type") {
flags |= 1 << 15;
shortstr(value, buf)?;
}
if let Some(ShortString(value)) = header.property_fields.get("content-encoding") {
flags |= 1 << 14;
shortstr(value, buf)?;
}
if let Some(FieldTable(value)) = header.property_fields.get("headers") {
flags |= 1 << 13;
table(value, buf)?;
}
if let Some(ShortShortUInt(value)) = header.property_fields.get("delivery-mode") {
flags |= 1 << 12;
octet(value, buf)?;
}
if let Some(ShortShortUInt(value)) = header.property_fields.get("priority") {
flags |= 1 << 11;
octet(value, buf)?;
}
if let Some(ShortString(value)) = header.property_fields.get("correlation-id") {
flags |= 1 << 10;
shortstr(value, buf)?;
}
if let Some(ShortString(value)) = header.property_fields.get("reply-to") {
flags |= 1 << 9;
shortstr(value, buf)?;
}
if let Some(ShortString(value)) = header.property_fields.get("expiration") {
flags |= 1 << 8;
shortstr(value, buf)?;
}
if let Some(ShortString(value)) = header.property_fields.get("message-id") {
flags |= 1 << 7;
shortstr(value, buf)?;
}
if let Some(Timestamp(value)) = header.property_fields.get("timestamp") {
flags |= 1 << 6;
timestamp(value, buf)?;
}
if let Some(ShortString(value)) = header.property_fields.get("type") {
flags |= 1 << 5;
shortstr(value, buf)?;
}
if let Some(ShortString(value)) = header.property_fields.get("user-id") {
flags |= 1 << 4;
shortstr(value, buf)?;
}
if let Some(ShortString(value)) = header.property_fields.get("app-id") {
flags |= 1 << 3;
shortstr(value, buf)?;
}
if let Some(ShortString(value)) = header.property_fields.get("reserved") {
flags |= 1 << 2;
shortstr(value, buf)?;
}
let [a, b] = flags.to_be_bytes();
buf[0] = a;
buf[1] = b;
Ok(())
}
}
pub fn write_content_header(buf: &mut Vec<u8>, content_header: ContentHeader) -> Result<()> {
write_content_header(buf, content_header)
}
pub async fn write_frame<W>(frame: &Frame, mut w: W) -> Result<()>
where
W: AsyncWriteExt + Unpin + Send,

View file

@ -9,7 +9,7 @@ mod generated;
pub mod parse_helper;
#[cfg(test)]
mod tests;
mod write_helper;
pub mod write_helper;
pub use generated::*;