diff --git a/amqp_codegen/src/write.rs b/amqp_codegen/src/write.rs index f949649..a6113a3 100644 --- a/amqp_codegen/src/write.rs +++ b/amqp_codegen/src/write.rs @@ -7,9 +7,9 @@ pub(crate) fn codegen_write(amqp: &Amqp) { use super::*; use crate::classes::write_helper::*; use crate::error::TransError; -use tokio::io::AsyncWriteExt; +use std::io::Write; -pub async fn write_method(class: Class, mut writer: W) -> Result<(), TransError> {{ +pub fn write_method(class: Class, mut writer: W) -> Result<(), TransError> {{ match class {{" ); @@ -25,7 +25,7 @@ pub async fn write_method(class: Class, mut writer: W) println!(" {field_name},"); } println!(" }}) => {{"); - println!(" writer.write_all(&[{class_index}, {method_index}]).await?;"); + println!(" writer.write_all(&[{class_index}, {method_index}])?;"); let mut iter = method.fields.iter().peekable(); while let Some(field) = iter.next() { @@ -38,9 +38,9 @@ pub async fn write_method(class: Class, mut writer: W) let field_name = snake_case(&field.name); print!("{field_name}, "); } - println!("], &mut writer).await?;"); + println!("], &mut writer)?;"); } else { - println!(" {type_name}({field_name}, &mut writer).await?;"); + println!(" {type_name}({field_name}, &mut writer)?;"); } } println!(" }}"); diff --git a/amqp_transport/src/classes/generated.rs b/amqp_transport/src/classes/generated.rs index bc8257a..eca094d 100644 --- a/amqp_transport/src/classes/generated.rs +++ b/amqp_transport/src/classes/generated.rs @@ -1134,12 +1134,9 @@ pub mod write { use super::*; use crate::classes::write_helper::*; use crate::error::TransError; - use tokio::io::AsyncWriteExt; + use std::io::Write; - pub async fn write_method( - class: Class, - mut writer: W, - ) -> Result<(), TransError> { + pub fn write_method(class: Class, mut writer: W) -> Result<(), TransError> { match class { Class::Connection(Connection::Start { version_major, @@ -1148,12 +1145,12 @@ pub mod write { mechanisms, locales, }) => { - writer.write_all(&[10, 10]).await?; - octet(version_major, &mut writer).await?; - octet(version_minor, &mut writer).await?; - table(server_properties, &mut writer).await?; - longstr(mechanisms, &mut writer).await?; - longstr(locales, &mut writer).await?; + writer.write_all(&[10, 10])?; + octet(version_major, &mut writer)?; + octet(version_minor, &mut writer)?; + table(server_properties, &mut writer)?; + longstr(mechanisms, &mut writer)?; + longstr(locales, &mut writer)?; } Class::Connection(Connection::StartOk { client_properties, @@ -1161,53 +1158,53 @@ pub mod write { response, locale, }) => { - writer.write_all(&[10, 11]).await?; - table(client_properties, &mut writer).await?; - shortstr(mechanism, &mut writer).await?; - longstr(response, &mut writer).await?; - shortstr(locale, &mut writer).await?; + writer.write_all(&[10, 11])?; + table(client_properties, &mut writer)?; + shortstr(mechanism, &mut writer)?; + longstr(response, &mut writer)?; + shortstr(locale, &mut writer)?; } Class::Connection(Connection::Secure { challenge }) => { - writer.write_all(&[10, 20]).await?; - longstr(challenge, &mut writer).await?; + writer.write_all(&[10, 20])?; + longstr(challenge, &mut writer)?; } Class::Connection(Connection::SecureOk { response }) => { - writer.write_all(&[10, 21]).await?; - longstr(response, &mut writer).await?; + writer.write_all(&[10, 21])?; + longstr(response, &mut writer)?; } Class::Connection(Connection::Tune { channel_max, frame_max, heartbeat, }) => { - writer.write_all(&[10, 30]).await?; - short(channel_max, &mut writer).await?; - long(frame_max, &mut writer).await?; - short(heartbeat, &mut writer).await?; + writer.write_all(&[10, 30])?; + short(channel_max, &mut writer)?; + long(frame_max, &mut writer)?; + short(heartbeat, &mut writer)?; } Class::Connection(Connection::TuneOk { channel_max, frame_max, heartbeat, }) => { - writer.write_all(&[10, 31]).await?; - short(channel_max, &mut writer).await?; - long(frame_max, &mut writer).await?; - short(heartbeat, &mut writer).await?; + writer.write_all(&[10, 31])?; + short(channel_max, &mut writer)?; + long(frame_max, &mut writer)?; + short(heartbeat, &mut writer)?; } Class::Connection(Connection::Open { virtual_host, reserved_1, reserved_2, }) => { - writer.write_all(&[10, 40]).await?; - shortstr(virtual_host, &mut writer).await?; - shortstr(reserved_1, &mut writer).await?; - bit(&[reserved_2], &mut writer).await?; + writer.write_all(&[10, 40])?; + shortstr(virtual_host, &mut writer)?; + shortstr(reserved_1, &mut writer)?; + bit(&[reserved_2], &mut writer)?; } Class::Connection(Connection::OpenOk { reserved_1 }) => { - writer.write_all(&[10, 41]).await?; - shortstr(reserved_1, &mut writer).await?; + writer.write_all(&[10, 41])?; + shortstr(reserved_1, &mut writer)?; } Class::Connection(Connection::Close { reply_code, @@ -1215,37 +1212,37 @@ pub mod write { class_id, method_id, }) => { - writer.write_all(&[10, 50]).await?; - short(reply_code, &mut writer).await?; - shortstr(reply_text, &mut writer).await?; - short(class_id, &mut writer).await?; - short(method_id, &mut writer).await?; + writer.write_all(&[10, 50])?; + short(reply_code, &mut writer)?; + shortstr(reply_text, &mut writer)?; + short(class_id, &mut writer)?; + short(method_id, &mut writer)?; } Class::Connection(Connection::CloseOk {}) => { - writer.write_all(&[10, 51]).await?; + writer.write_all(&[10, 51])?; } Class::Connection(Connection::Blocked { reason }) => { - writer.write_all(&[10, 60]).await?; - shortstr(reason, &mut writer).await?; + writer.write_all(&[10, 60])?; + shortstr(reason, &mut writer)?; } Class::Connection(Connection::Unblocked {}) => { - writer.write_all(&[10, 61]).await?; + writer.write_all(&[10, 61])?; } Class::Channel(Channel::Open { reserved_1 }) => { - writer.write_all(&[20, 10]).await?; - shortstr(reserved_1, &mut writer).await?; + writer.write_all(&[20, 10])?; + shortstr(reserved_1, &mut writer)?; } Class::Channel(Channel::OpenOk { reserved_1 }) => { - writer.write_all(&[20, 11]).await?; - longstr(reserved_1, &mut writer).await?; + writer.write_all(&[20, 11])?; + longstr(reserved_1, &mut writer)?; } Class::Channel(Channel::Flow { active }) => { - writer.write_all(&[20, 20]).await?; - bit(&[active], &mut writer).await?; + writer.write_all(&[20, 20])?; + bit(&[active], &mut writer)?; } Class::Channel(Channel::FlowOk { active }) => { - writer.write_all(&[20, 21]).await?; - bit(&[active], &mut writer).await?; + writer.write_all(&[20, 21])?; + bit(&[active], &mut writer)?; } Class::Channel(Channel::Close { reply_code, @@ -1253,14 +1250,14 @@ pub mod write { class_id, method_id, }) => { - writer.write_all(&[20, 40]).await?; - short(reply_code, &mut writer).await?; - shortstr(reply_text, &mut writer).await?; - short(class_id, &mut writer).await?; - short(method_id, &mut writer).await?; + writer.write_all(&[20, 40])?; + short(reply_code, &mut writer)?; + shortstr(reply_text, &mut writer)?; + short(class_id, &mut writer)?; + short(method_id, &mut writer)?; } Class::Channel(Channel::CloseOk {}) => { - writer.write_all(&[20, 41]).await?; + writer.write_all(&[20, 41])?; } Class::Exchange(Exchange::Declare { reserved_1, @@ -1273,19 +1270,18 @@ pub mod write { no_wait, arguments, }) => { - writer.write_all(&[40, 10]).await?; - short(reserved_1, &mut writer).await?; - shortstr(exchange, &mut writer).await?; - shortstr(r#type, &mut writer).await?; + writer.write_all(&[40, 10])?; + short(reserved_1, &mut writer)?; + shortstr(exchange, &mut writer)?; + shortstr(r#type, &mut writer)?; bit( &[passive, durable, reserved_2, reserved_3, no_wait], &mut writer, - ) - .await?; - table(arguments, &mut writer).await?; + )?; + table(arguments, &mut writer)?; } Class::Exchange(Exchange::DeclareOk {}) => { - writer.write_all(&[40, 11]).await?; + writer.write_all(&[40, 11])?; } Class::Exchange(Exchange::Delete { reserved_1, @@ -1293,13 +1289,13 @@ pub mod write { if_unused, no_wait, }) => { - writer.write_all(&[40, 20]).await?; - short(reserved_1, &mut writer).await?; - shortstr(exchange, &mut writer).await?; - bit(&[if_unused, no_wait], &mut writer).await?; + writer.write_all(&[40, 20])?; + short(reserved_1, &mut writer)?; + shortstr(exchange, &mut writer)?; + bit(&[if_unused, no_wait], &mut writer)?; } Class::Exchange(Exchange::DeleteOk {}) => { - writer.write_all(&[40, 21]).await?; + writer.write_all(&[40, 21])?; } Class::Queue(Queue::Declare { reserved_1, @@ -1311,25 +1307,24 @@ pub mod write { no_wait, arguments, }) => { - writer.write_all(&[50, 10]).await?; - short(reserved_1, &mut writer).await?; - shortstr(queue, &mut writer).await?; + writer.write_all(&[50, 10])?; + short(reserved_1, &mut writer)?; + shortstr(queue, &mut writer)?; bit( &[passive, durable, exclusive, auto_delete, no_wait], &mut writer, - ) - .await?; - table(arguments, &mut writer).await?; + )?; + table(arguments, &mut writer)?; } Class::Queue(Queue::DeclareOk { queue, message_count, consumer_count, }) => { - writer.write_all(&[50, 11]).await?; - shortstr(queue, &mut writer).await?; - long(message_count, &mut writer).await?; - long(consumer_count, &mut writer).await?; + writer.write_all(&[50, 11])?; + shortstr(queue, &mut writer)?; + long(message_count, &mut writer)?; + long(consumer_count, &mut writer)?; } Class::Queue(Queue::Bind { reserved_1, @@ -1339,16 +1334,16 @@ pub mod write { no_wait, arguments, }) => { - writer.write_all(&[50, 20]).await?; - short(reserved_1, &mut writer).await?; - shortstr(queue, &mut writer).await?; - shortstr(exchange, &mut writer).await?; - shortstr(routing_key, &mut writer).await?; - bit(&[no_wait], &mut writer).await?; - table(arguments, &mut writer).await?; + writer.write_all(&[50, 20])?; + short(reserved_1, &mut writer)?; + shortstr(queue, &mut writer)?; + shortstr(exchange, &mut writer)?; + shortstr(routing_key, &mut writer)?; + bit(&[no_wait], &mut writer)?; + table(arguments, &mut writer)?; } Class::Queue(Queue::BindOk {}) => { - writer.write_all(&[50, 21]).await?; + writer.write_all(&[50, 21])?; } Class::Queue(Queue::Unbind { reserved_1, @@ -1357,29 +1352,29 @@ pub mod write { routing_key, arguments, }) => { - writer.write_all(&[50, 50]).await?; - short(reserved_1, &mut writer).await?; - shortstr(queue, &mut writer).await?; - shortstr(exchange, &mut writer).await?; - shortstr(routing_key, &mut writer).await?; - table(arguments, &mut writer).await?; + writer.write_all(&[50, 50])?; + short(reserved_1, &mut writer)?; + shortstr(queue, &mut writer)?; + shortstr(exchange, &mut writer)?; + shortstr(routing_key, &mut writer)?; + table(arguments, &mut writer)?; } Class::Queue(Queue::UnbindOk {}) => { - writer.write_all(&[50, 51]).await?; + writer.write_all(&[50, 51])?; } Class::Queue(Queue::Purge { reserved_1, queue, no_wait, }) => { - writer.write_all(&[50, 30]).await?; - short(reserved_1, &mut writer).await?; - shortstr(queue, &mut writer).await?; - bit(&[no_wait], &mut writer).await?; + writer.write_all(&[50, 30])?; + short(reserved_1, &mut writer)?; + shortstr(queue, &mut writer)?; + bit(&[no_wait], &mut writer)?; } Class::Queue(Queue::PurgeOk { message_count }) => { - writer.write_all(&[50, 31]).await?; - long(message_count, &mut writer).await?; + writer.write_all(&[50, 31])?; + long(message_count, &mut writer)?; } Class::Queue(Queue::Delete { reserved_1, @@ -1388,27 +1383,27 @@ pub mod write { if_empty, no_wait, }) => { - writer.write_all(&[50, 40]).await?; - short(reserved_1, &mut writer).await?; - shortstr(queue, &mut writer).await?; - bit(&[if_unused, if_empty, no_wait], &mut writer).await?; + writer.write_all(&[50, 40])?; + short(reserved_1, &mut writer)?; + shortstr(queue, &mut writer)?; + bit(&[if_unused, if_empty, no_wait], &mut writer)?; } Class::Queue(Queue::DeleteOk { message_count }) => { - writer.write_all(&[50, 41]).await?; - long(message_count, &mut writer).await?; + writer.write_all(&[50, 41])?; + long(message_count, &mut writer)?; } Class::Basic(Basic::Qos { prefetch_size, prefetch_count, global, }) => { - writer.write_all(&[60, 10]).await?; - long(prefetch_size, &mut writer).await?; - short(prefetch_count, &mut writer).await?; - bit(&[global], &mut writer).await?; + writer.write_all(&[60, 10])?; + long(prefetch_size, &mut writer)?; + short(prefetch_count, &mut writer)?; + bit(&[global], &mut writer)?; } Class::Basic(Basic::QosOk {}) => { - writer.write_all(&[60, 11]).await?; + writer.write_all(&[60, 11])?; } Class::Basic(Basic::Consume { reserved_1, @@ -1420,28 +1415,28 @@ pub mod write { no_wait, arguments, }) => { - writer.write_all(&[60, 20]).await?; - short(reserved_1, &mut writer).await?; - shortstr(queue, &mut writer).await?; - shortstr(consumer_tag, &mut writer).await?; - bit(&[no_local, no_ack, exclusive, no_wait], &mut writer).await?; - table(arguments, &mut writer).await?; + writer.write_all(&[60, 20])?; + short(reserved_1, &mut writer)?; + shortstr(queue, &mut writer)?; + shortstr(consumer_tag, &mut writer)?; + bit(&[no_local, no_ack, exclusive, no_wait], &mut writer)?; + table(arguments, &mut writer)?; } Class::Basic(Basic::ConsumeOk { consumer_tag }) => { - writer.write_all(&[60, 21]).await?; - shortstr(consumer_tag, &mut writer).await?; + writer.write_all(&[60, 21])?; + shortstr(consumer_tag, &mut writer)?; } Class::Basic(Basic::Cancel { consumer_tag, no_wait, }) => { - writer.write_all(&[60, 30]).await?; - shortstr(consumer_tag, &mut writer).await?; - bit(&[no_wait], &mut writer).await?; + writer.write_all(&[60, 30])?; + shortstr(consumer_tag, &mut writer)?; + bit(&[no_wait], &mut writer)?; } Class::Basic(Basic::CancelOk { consumer_tag }) => { - writer.write_all(&[60, 31]).await?; - shortstr(consumer_tag, &mut writer).await?; + writer.write_all(&[60, 31])?; + shortstr(consumer_tag, &mut writer)?; } Class::Basic(Basic::Publish { reserved_1, @@ -1450,11 +1445,11 @@ pub mod write { mandatory, immediate, }) => { - writer.write_all(&[60, 40]).await?; - short(reserved_1, &mut writer).await?; - shortstr(exchange, &mut writer).await?; - shortstr(routing_key, &mut writer).await?; - bit(&[mandatory, immediate], &mut writer).await?; + writer.write_all(&[60, 40])?; + short(reserved_1, &mut writer)?; + shortstr(exchange, &mut writer)?; + shortstr(routing_key, &mut writer)?; + bit(&[mandatory, immediate], &mut writer)?; } Class::Basic(Basic::Return { reply_code, @@ -1462,11 +1457,11 @@ pub mod write { exchange, routing_key, }) => { - writer.write_all(&[60, 50]).await?; - short(reply_code, &mut writer).await?; - shortstr(reply_text, &mut writer).await?; - shortstr(exchange, &mut writer).await?; - shortstr(routing_key, &mut writer).await?; + writer.write_all(&[60, 50])?; + short(reply_code, &mut writer)?; + shortstr(reply_text, &mut writer)?; + shortstr(exchange, &mut writer)?; + shortstr(routing_key, &mut writer)?; } Class::Basic(Basic::Deliver { consumer_tag, @@ -1475,22 +1470,22 @@ pub mod write { exchange, routing_key, }) => { - writer.write_all(&[60, 60]).await?; - shortstr(consumer_tag, &mut writer).await?; - longlong(delivery_tag, &mut writer).await?; - bit(&[redelivered], &mut writer).await?; - shortstr(exchange, &mut writer).await?; - shortstr(routing_key, &mut writer).await?; + writer.write_all(&[60, 60])?; + shortstr(consumer_tag, &mut writer)?; + longlong(delivery_tag, &mut writer)?; + bit(&[redelivered], &mut writer)?; + shortstr(exchange, &mut writer)?; + shortstr(routing_key, &mut writer)?; } Class::Basic(Basic::Get { reserved_1, queue, no_ack, }) => { - writer.write_all(&[60, 70]).await?; - short(reserved_1, &mut writer).await?; - shortstr(queue, &mut writer).await?; - bit(&[no_ack], &mut writer).await?; + writer.write_all(&[60, 70])?; + short(reserved_1, &mut writer)?; + shortstr(queue, &mut writer)?; + bit(&[no_ack], &mut writer)?; } Class::Basic(Basic::GetOk { delivery_tag, @@ -1499,61 +1494,61 @@ pub mod write { routing_key, message_count, }) => { - writer.write_all(&[60, 71]).await?; - longlong(delivery_tag, &mut writer).await?; - bit(&[redelivered], &mut writer).await?; - shortstr(exchange, &mut writer).await?; - shortstr(routing_key, &mut writer).await?; - long(message_count, &mut writer).await?; + writer.write_all(&[60, 71])?; + longlong(delivery_tag, &mut writer)?; + bit(&[redelivered], &mut writer)?; + shortstr(exchange, &mut writer)?; + shortstr(routing_key, &mut writer)?; + long(message_count, &mut writer)?; } Class::Basic(Basic::GetEmpty { reserved_1 }) => { - writer.write_all(&[60, 72]).await?; - shortstr(reserved_1, &mut writer).await?; + writer.write_all(&[60, 72])?; + shortstr(reserved_1, &mut writer)?; } Class::Basic(Basic::Ack { delivery_tag, multiple, }) => { - writer.write_all(&[60, 80]).await?; - longlong(delivery_tag, &mut writer).await?; - bit(&[multiple], &mut writer).await?; + writer.write_all(&[60, 80])?; + longlong(delivery_tag, &mut writer)?; + bit(&[multiple], &mut writer)?; } Class::Basic(Basic::Reject { delivery_tag, requeue, }) => { - writer.write_all(&[60, 90]).await?; - longlong(delivery_tag, &mut writer).await?; - bit(&[requeue], &mut writer).await?; + writer.write_all(&[60, 90])?; + longlong(delivery_tag, &mut writer)?; + bit(&[requeue], &mut writer)?; } Class::Basic(Basic::RecoverAsync { requeue }) => { - writer.write_all(&[60, 100]).await?; - bit(&[requeue], &mut writer).await?; + writer.write_all(&[60, 100])?; + bit(&[requeue], &mut writer)?; } Class::Basic(Basic::Recover { requeue }) => { - writer.write_all(&[60, 110]).await?; - bit(&[requeue], &mut writer).await?; + writer.write_all(&[60, 110])?; + bit(&[requeue], &mut writer)?; } Class::Basic(Basic::RecoverOk {}) => { - writer.write_all(&[60, 111]).await?; + writer.write_all(&[60, 111])?; } Class::Tx(Tx::Select {}) => { - writer.write_all(&[90, 10]).await?; + writer.write_all(&[90, 10])?; } Class::Tx(Tx::SelectOk {}) => { - writer.write_all(&[90, 11]).await?; + writer.write_all(&[90, 11])?; } Class::Tx(Tx::Commit {}) => { - writer.write_all(&[90, 20]).await?; + writer.write_all(&[90, 20])?; } Class::Tx(Tx::CommitOk {}) => { - writer.write_all(&[90, 21]).await?; + writer.write_all(&[90, 21])?; } Class::Tx(Tx::Rollback {}) => { - writer.write_all(&[90, 30]).await?; + writer.write_all(&[90, 30])?; } Class::Tx(Tx::RollbackOk {}) => { - writer.write_all(&[90, 31]).await?; + writer.write_all(&[90, 31])?; } } Ok(()) diff --git a/amqp_transport/src/classes/write_helper.rs b/amqp_transport/src/classes/write_helper.rs index d83cf86..7df0222 100644 --- a/amqp_transport/src/classes/write_helper.rs +++ b/amqp_transport/src/classes/write_helper.rs @@ -1,32 +1,33 @@ -use crate::classes::FieldValue; -use crate::classes::{Bit, Long, Longlong, Longstr, Octet, Short, Shortstr, Table, Timestamp}; -use crate::error::Result; +use crate::classes::generated::{ + Bit, Long, Longlong, Longstr, Octet, Short, Shortstr, Table, Timestamp, +}; +use crate::classes::{FieldValue, TableFieldName}; +use crate::error::TransError; use anyhow::Context; -use std::future::Future; -use std::pin::Pin; -use tokio::io::AsyncWriteExt; +use std::io; +use std::io::Write; -pub async fn octet(value: Octet, writer: &mut W) -> Result<()> { - writer.write_u8(value).await?; +pub fn octet(value: Octet, writer: &mut W) -> Result<(), TransError> { + writer.write_all(&[value])?; Ok(()) } -pub async fn short(value: Short, writer: &mut W) -> Result<()> { - writer.write_u16(value).await?; +pub fn short(value: Short, writer: &mut W) -> Result<(), TransError> { + writer.write_all(&value.to_be_bytes())?; Ok(()) } -pub async fn long(value: Long, writer: &mut W) -> Result<()> { - writer.write_u32(value).await?; +pub fn long(value: Long, writer: &mut W) -> Result<(), TransError> { + writer.write_all(&value.to_be_bytes())?; Ok(()) } -pub async fn longlong(value: Longlong, writer: &mut W) -> Result<()> { - writer.write_u64(value).await?; +pub fn longlong(value: Longlong, writer: &mut W) -> Result<(), TransError> { + writer.write_all(&value.to_be_bytes())?; Ok(()) } -pub async fn bit(value: &[Bit], writer: &mut W) -> Result<()> { +pub fn bit(value: &[Bit], writer: &mut W) -> Result<(), TransError> { // accumulate bits into bytes, starting from the least significant bit in each byte // how many bits have already been packed into `current_buf` @@ -35,7 +36,7 @@ pub async fn bit(value: &[Bit], writer: &mut W) -> Res for &bit in value { if already_filled >= 8 { - writer.write_u8(current_buf).await?; + writer.write_all(&[current_buf])?; current_buf = 0; already_filled = 0; } @@ -46,151 +47,146 @@ pub async fn bit(value: &[Bit], writer: &mut W) -> Res } if already_filled > 0 { - writer.write_u8(current_buf).await?; + writer.write_all(&[current_buf])?; } Ok(()) } -pub async fn shortstr(value: Shortstr, writer: &mut W) -> Result<()> { +pub fn shortstr(value: Shortstr, writer: &mut W) -> Result<(), TransError> { let len = u8::try_from(value.len()).context("shortstr too long")?; - writer.write_u8(len).await?; - writer.write_all(value.as_bytes()).await?; + writer.write_all(&[len])?; + writer.write_all(value.as_bytes())?; Ok(()) } -pub async fn longstr(value: Longstr, writer: &mut W) -> Result<()> { +pub fn longstr(value: Longstr, writer: &mut W) -> Result<(), TransError> { let len = u32::try_from(value.len()).context("longstr too long")?; - writer.write_u32(len).await?; - writer.write_all(value.as_slice()).await?; + writer.write_all(&len.to_be_bytes())?; + writer.write_all(value.as_slice())?; Ok(()) } -pub async fn timestamp(value: Timestamp, writer: &mut W) -> Result<()> { - writer.write_u64(value).await?; +pub fn timestamp(value: Timestamp, writer: &mut W) -> Result<(), TransError> { + writer.write_all(&value.to_be_bytes())?; Ok(()) } -pub async fn table(table: Table, writer: &mut W) -> Result<()> { +pub fn table(table: Table, writer: &mut W) -> Result<(), TransError> { let len = u32::try_from(table.len()).context("table too big")?; - writer.write_u32(len).await?; + writer.write_all(&len.to_be_bytes())?; for (field_name, value) in table { - shortstr(field_name, writer).await?; - field_value(value, writer).await?; + shortstr(field_name, writer)?; + field_value(value, writer)?; } Ok(()) } -fn field_value( - value: FieldValue, - writer: &mut W, -) -> Pin> + '_>> { - Box::pin(async { - match value { - FieldValue::Boolean(bool) => { - writer.write_all(&[b't', u8::from(bool)]).await?; - } - FieldValue::ShortShortInt(int) => { - writer.write_all(b"b").await?; - writer.write_all(&int.to_be_bytes()).await?; - } - FieldValue::ShortShortUInt(int) => { - writer.write_all(&[b'B', int]).await?; - } - FieldValue::ShortInt(int) => { - writer.write_all(b"U").await?; - writer.write_all(&int.to_be_bytes()).await?; - } - FieldValue::ShortUInt(int) => { - writer.write_all(b"u").await?; - writer.write_all(&int.to_be_bytes()).await?; - } - FieldValue::LongInt(int) => { - writer.write_all(b"I").await?; - writer.write_all(&int.to_be_bytes()).await?; - } - FieldValue::LongUInt(int) => { - writer.write_all(b"i").await?; - writer.write_all(&int.to_be_bytes()).await?; - } - FieldValue::LongLongInt(int) => { - writer.write_all(b"L").await?; - writer.write_all(&int.to_be_bytes()).await?; - } - FieldValue::LongLongUInt(int) => { - writer.write_all(b"l").await?; - writer.write_all(&int.to_be_bytes()).await?; - } - FieldValue::Float(float) => { - writer.write_all(b"f").await?; - writer.write_all(&float.to_be_bytes()).await?; - } - FieldValue::Double(float) => { - writer.write_all(b"d").await?; - writer.write_all(&float.to_be_bytes()).await?; - } - FieldValue::DecimalValue(scale, long) => { - writer.write_all(&[b'D', scale]).await?; - writer.write_all(&long.to_be_bytes()).await?; - } - FieldValue::ShortString(str) => { - writer.write_all(b"s").await?; - shortstr(str, writer).await?; - } - FieldValue::LongString(str) => { - writer.write_all(b"S").await?; - longstr(str, writer).await?; - } - FieldValue::FieldArray(array) => { - writer.write_all(b"A").await?; - let len = u32::try_from(array.len()).context("array too long")?; - writer.write_all(&len.to_be_bytes()).await?; +fn field_value(value: FieldValue, writer: &mut W) -> Result<(), TransError> { + match value { + FieldValue::Boolean(bool) => { + writer.write_all(&[b't', u8::from(bool)])?; + } + FieldValue::ShortShortInt(int) => { + writer.write_all(b"b")?; + writer.write_all(&int.to_be_bytes())?; + } + FieldValue::ShortShortUInt(int) => { + writer.write_all(&[b'B', int])?; + } + FieldValue::ShortInt(int) => { + writer.write_all(b"U")?; + writer.write_all(&int.to_be_bytes())?; + } + FieldValue::ShortUInt(int) => { + writer.write_all(b"u")?; + writer.write_all(&int.to_be_bytes())?; + } + FieldValue::LongInt(int) => { + writer.write_all(b"I")?; + writer.write_all(&int.to_be_bytes())?; + } + FieldValue::LongUInt(int) => { + writer.write_all(b"i")?; + writer.write_all(&int.to_be_bytes())?; + } + FieldValue::LongLongInt(int) => { + writer.write_all(b"L")?; + writer.write_all(&int.to_be_bytes())?; + } + FieldValue::LongLongUInt(int) => { + writer.write_all(b"l")?; + writer.write_all(&int.to_be_bytes())?; + } + FieldValue::Float(float) => { + writer.write_all(b"f")?; + writer.write_all(&float.to_be_bytes())?; + } + FieldValue::Double(float) => { + writer.write_all(b"d")?; + writer.write_all(&float.to_be_bytes())?; + } + FieldValue::DecimalValue(scale, long) => { + writer.write_all(&[b'D', scale])?; + writer.write_all(&long.to_be_bytes())?; + } + FieldValue::ShortString(str) => { + writer.write_all(b"s")?; + shortstr(str, writer)?; + } + FieldValue::LongString(str) => { + writer.write_all(b"S")?; + longstr(str, writer)?; + } + FieldValue::FieldArray(array) => { + writer.write_all(b"A")?; + let len = u32::try_from(array.len()).context("array too long")?; + writer.write_all(&len.to_be_bytes())?; - for element in array { - field_value(element, writer).await?; - } - } - FieldValue::Timestamp(time) => { - writer.write_all(b"T").await?; - writer.write_all(&time.to_be_bytes()).await?; - } - FieldValue::FieldTable(_) => { - writer.write_all(b"F").await?; - } - FieldValue::Void => { - writer.write_all(b"V").await?; + for element in array { + field_value(element, writer)?; } } - Ok(()) - }) + FieldValue::Timestamp(time) => { + writer.write_all(b"T")?; + writer.write_all(&time.to_be_bytes())?; + } + FieldValue::FieldTable(_) => { + writer.write_all(b"F")?; + } + FieldValue::Void => { + writer.write_all(b"V")?; + } + } + Ok(()) } #[cfg(test)] mod tests { - #[tokio::test] - async fn pack_few_bits() { - let bits = vec![true, false, true]; + #[test] + fn pack_few_bits() { + let bits = [true, false, true]; - let mut buffer = Vec::new(); - super::bit(&bits, &mut buffer).await.unwrap(); + let mut buffer = [0u8; 1]; + super::bit(&bits, &mut buffer.as_mut_slice()).unwrap(); - assert_eq!(buffer.as_slice(), &[0b00000101]) + assert_eq!(buffer, [0b00000101]) } - #[tokio::test] - async fn pack_many_bits() { - let bits = vec![ + #[test] + fn pack_many_bits() { + let bits = [ /* first 8 */ true, true, true, true, false, false, false, false, /* second 4 */ true, false, true, true, ]; - let mut buffer = Vec::new(); - super::bit(&bits, &mut buffer).await.unwrap(); + let mut buffer = [0u8; 2]; + super::bit(&bits, &mut buffer.as_mut_slice()).unwrap(); assert_eq!(buffer, [0b00001111, 0b00001101]); } diff --git a/amqp_transport/src/connection.rs b/amqp_transport/src/connection.rs index b467b14..f85229f 100644 --- a/amqp_transport/src/connection.rs +++ b/amqp_transport/src/connection.rs @@ -51,11 +51,17 @@ impl Connection { locales: vec![], }); - let fut = classes::write::write_method(start_method, &mut self.stream); - warn!(size = %std::mem::size_of_val(&fut), "that future is big"); - // todo fix out_buffer buffer things :spiral_eyes: - // maybe have a `size` method on `Class` and use `AsyncWrite`? oh god no that's horrible - // frame::write_frame(&mut self.stream, Frame {})?; + let mut payload = Vec::with_capacity(64); + classes::write::write_method(start_method, &mut payload)?; + frame::write_frame( + &mut self.stream, + &Frame { + kind: FrameType::Method, + channel: 0, + payload, + }, + ) + .await?; Ok(()) }