From fcf531df43cb4a7782098e977b10f0d44d46914e Mon Sep 17 00:00:00 2001 From: Nilstrieb <48135649+Nilstrieb@users.noreply.github.com> Date: Sun, 13 Feb 2022 16:16:10 +0100 Subject: [PATCH] big future --- amqp_codegen/src/write.rs | 12 +- amqp_transport/src/classes/generated.rs | 2386 ++++++++++---------- amqp_transport/src/classes/mod.rs | 3 +- amqp_transport/src/classes/write_helper.rs | 230 +- amqp_transport/src/connection.rs | 43 +- amqp_transport/src/error.rs | 2 + amqp_transport/src/frame.rs | 24 +- amqp_transport/src/lib.rs | 2 +- 8 files changed, 1405 insertions(+), 1297 deletions(-) diff --git a/amqp_codegen/src/write.rs b/amqp_codegen/src/write.rs index 289493f..f949649 100644 --- a/amqp_codegen/src/write.rs +++ b/amqp_codegen/src/write.rs @@ -3,13 +3,13 @@ use heck::ToUpperCamelCase; pub(crate) fn codegen_write(amqp: &Amqp) { println!( - "mod write {{ + "pub mod write {{ use super::*; use crate::classes::write_helper::*; use crate::error::TransError; -use std::io::Write; +use tokio::io::AsyncWriteExt; -pub fn write_method(class: Class, mut writer: W) -> Result<(), TransError> {{ +pub async fn write_method(class: Class, mut writer: W) -> Result<(), TransError> {{ match class {{" ); @@ -25,7 +25,7 @@ pub fn write_method(class: Class, mut writer: W) -> Result<(), TransEr println!(" {field_name},"); } println!(" }}) => {{"); - println!(" writer.write_all(&[{class_index}, {method_index}])?;"); + println!(" writer.write_all(&[{class_index}, {method_index}]).await?;"); let mut iter = method.fields.iter().peekable(); while let Some(field) = iter.next() { @@ -38,9 +38,9 @@ pub fn write_method(class: Class, mut writer: W) -> Result<(), TransEr let field_name = snake_case(&field.name); print!("{field_name}, "); } - println!("], &mut writer)?;"); + println!("], &mut writer).await?;"); } else { - println!(" {type_name}({field_name}, &mut writer)?;"); + println!(" {type_name}({field_name}, &mut writer).await?;"); } } println!(" }}"); diff --git a/amqp_transport/src/classes/generated.rs b/amqp_transport/src/classes/generated.rs index 45d879d..bc8257a 100644 --- a/amqp_transport/src/classes/generated.rs +++ b/amqp_transport/src/classes/generated.rs @@ -87,9 +87,7 @@ pub enum Connection { locale: Shortstr, }, /// Index 20 - Secure { - challenge: Longstr, - }, + Secure { challenge: Longstr }, /// Index 21 SecureOk { /// must not be null @@ -115,9 +113,7 @@ pub enum Connection { reserved_2: Bit, }, /// Index 41 - OpenOk { - reserved_1: Shortstr, - }, + OpenOk { reserved_1: Shortstr }, /// Index 50 Close { reply_code: ReplyCode, @@ -128,9 +124,7 @@ pub enum Connection { /// Index 51 CloseOk, /// Index 60 - Blocked { - reason: Shortstr, - }, + Blocked { reason: Shortstr }, /// Index 61 Unblocked, } @@ -138,21 +132,13 @@ pub enum Connection { #[derive(Debug, Clone, PartialEq)] pub enum Channel { /// Index 10 - Open { - reserved_1: Shortstr, - }, + Open { reserved_1: Shortstr }, /// Index 11 - OpenOk { - reserved_1: Longstr, - }, + OpenOk { reserved_1: Longstr }, /// Index 20 - Flow { - active: Bit, - }, + Flow { active: Bit }, /// Index 21 - FlowOk { - active: Bit, - }, + FlowOk { active: Bit }, /// Index 40 Close { reply_code: ReplyCode, @@ -241,9 +227,7 @@ pub enum Queue { no_wait: NoWait, }, /// Index 31 - PurgeOk { - message_count: MessageCount, - }, + PurgeOk { message_count: MessageCount }, /// Index 40 Delete { reserved_1: Short, @@ -253,9 +237,7 @@ pub enum Queue { no_wait: NoWait, }, /// Index 41 - DeleteOk { - message_count: MessageCount, - }, + DeleteOk { message_count: MessageCount }, } /// Index 60, handler = channel #[derive(Debug, Clone, PartialEq)] @@ -280,18 +262,14 @@ pub enum Basic { arguments: Table, }, /// Index 21 - ConsumeOk { - consumer_tag: ConsumerTag, - }, + ConsumeOk { consumer_tag: ConsumerTag }, /// Index 30 Cancel { consumer_tag: ConsumerTag, no_wait: NoWait, }, /// Index 31 - CancelOk { - consumer_tag: ConsumerTag, - }, + CancelOk { consumer_tag: ConsumerTag }, /// Index 40 Publish { reserved_1: Short, @@ -330,9 +308,7 @@ pub enum Basic { message_count: MessageCount, }, /// Index 72 - GetEmpty { - reserved_1: Shortstr, - }, + GetEmpty { reserved_1: Shortstr }, /// Index 80 Ack { delivery_tag: DeliveryTag, @@ -344,13 +320,9 @@ pub enum Basic { requeue: Bit, }, /// Index 100 - RecoverAsync { - requeue: Bit, - }, + RecoverAsync { requeue: Bit }, /// Index 110 - Recover { - requeue: Bit, - }, + Recover { requeue: Bit }, /// Index 111 RecoverOk, } @@ -371,1127 +343,1219 @@ pub enum Tx { RollbackOk, } pub mod parse { -use super::*; -use crate::classes::parse_helper::*; -use crate::error::TransError; -use nom::{branch::alt, bytes::complete::tag}; -use regex::Regex; -use once_cell::sync::Lazy; + use super::*; + use crate::classes::parse_helper::*; + use crate::error::TransError; + use nom::{branch::alt, bytes::complete::tag}; + use once_cell::sync::Lazy; + use regex::Regex; -pub type IResult<'a, T> = nom::IResult<&'a [u8], T, TransError>; + pub type IResult<'a, T> = nom::IResult<&'a [u8], T, TransError>; -pub fn parse_method(input: &[u8]) -> Result<(&[u8], Class), nom::Err> { - alt((connection, channel, exchange, queue, basic, tx))(input) -} -fn domain_class_id(input: &[u8]) -> IResult { - short(input) -} -fn domain_consumer_tag(input: &[u8]) -> IResult { - shortstr(input) -} -fn domain_delivery_tag(input: &[u8]) -> IResult { - longlong(input) -} -fn domain_exchange_name(input: &[u8]) -> IResult { - let (input, result) = shortstr(input)?; - if result.len() > 127 { fail!() } - static REGEX: Lazy = Lazy::new(|| Regex::new(r"^[a-zA-Z0-9-_.:]*$").unwrap()); - if !REGEX.is_match(&result) { fail!() } - Ok((input, result)) -} -fn domain_method_id(input: &[u8]) -> IResult { - short(input) -} -fn domain_path(input: &[u8]) -> IResult { - let (input, result) = shortstr(input)?; - if result.is_empty() { fail!() } - if result.len() > 127 { fail!() } - Ok((input, result)) -} -fn domain_peer_properties(input: &[u8]) -> IResult { - table(input) -} -fn domain_queue_name(input: &[u8]) -> IResult { - let (input, result) = shortstr(input)?; - if result.len() > 127 { fail!() } - static REGEX: Lazy = Lazy::new(|| Regex::new(r"^[a-zA-Z0-9-_.:]*$").unwrap()); - if !REGEX.is_match(&result) { fail!() } - Ok((input, result)) -} -fn domain_message_count(input: &[u8]) -> IResult { - long(input) -} -fn domain_reply_code(input: &[u8]) -> IResult { - let (input, result) = short(input)?; - if result == 0 { fail!() } - Ok((input, result)) -} -fn domain_reply_text(input: &[u8]) -> IResult { - let (input, result) = shortstr(input)?; - if result.is_empty() { fail!() } - Ok((input, result)) -} -fn domain_octet(input: &[u8]) -> IResult { - octet(input) -} -fn domain_short(input: &[u8]) -> IResult { - short(input) -} -fn domain_long(input: &[u8]) -> IResult { - long(input) -} -fn domain_longlong(input: &[u8]) -> IResult { - longlong(input) -} -fn domain_shortstr(input: &[u8]) -> IResult { - shortstr(input) -} -fn domain_longstr(input: &[u8]) -> IResult { - longstr(input) -} -fn domain_timestamp(input: &[u8]) -> IResult { - timestamp(input) -} -fn domain_table(input: &[u8]) -> IResult { - table(input) -} -fn connection(input: &[u8]) -> IResult { - let (input, _) = tag([10])(input)?; - alt((connection_start, connection_start_ok, connection_secure, connection_secure_ok, connection_tune, connection_tune_ok, connection_open, connection_open_ok, connection_close, connection_close_ok, connection_blocked, connection_unblocked))(input) -} -fn connection_start(input: &[u8]) -> IResult { - let (input, _) = tag([10])(input)?; - let (input, version_major) = domain_octet(input)?; - let (input, version_minor) = domain_octet(input)?; - let (input, server_properties) = domain_peer_properties(input)?; - let (input, mechanisms) = domain_longstr(input)?; - if mechanisms.is_empty() { fail!() } - let (input, locales) = domain_longstr(input)?; - if locales.is_empty() { fail!() } - Ok((input, Class::Connection(Connection::Start { - version_major, - version_minor, - server_properties, - mechanisms, - locales, - }))) -} -fn connection_start_ok(input: &[u8]) -> IResult { - let (input, _) = tag([11])(input)?; - let (input, client_properties) = domain_peer_properties(input)?; - let (input, mechanism) = domain_shortstr(input)?; - if mechanism.is_empty() { fail!() } - let (input, response) = domain_longstr(input)?; - if response.is_empty() { fail!() } - let (input, locale) = domain_shortstr(input)?; - if locale.is_empty() { fail!() } - Ok((input, Class::Connection(Connection::StartOk { - client_properties, - mechanism, - response, - locale, - }))) -} -fn connection_secure(input: &[u8]) -> IResult { - let (input, _) = tag([20])(input)?; - let (input, challenge) = domain_longstr(input)?; - Ok((input, Class::Connection(Connection::Secure { - challenge, - }))) -} -fn connection_secure_ok(input: &[u8]) -> IResult { - let (input, _) = tag([21])(input)?; - let (input, response) = domain_longstr(input)?; - if response.is_empty() { fail!() } - Ok((input, Class::Connection(Connection::SecureOk { - response, - }))) -} -fn connection_tune(input: &[u8]) -> IResult { - let (input, _) = tag([30])(input)?; - let (input, channel_max) = domain_short(input)?; - let (input, frame_max) = domain_long(input)?; - let (input, heartbeat) = domain_short(input)?; - Ok((input, Class::Connection(Connection::Tune { - channel_max, - frame_max, - heartbeat, - }))) -} -fn connection_tune_ok(input: &[u8]) -> IResult { - let (input, _) = tag([31])(input)?; - let (input, channel_max) = domain_short(input)?; - if channel_max == 0 { fail!() } - let (input, frame_max) = domain_long(input)?; - let (input, heartbeat) = domain_short(input)?; - Ok((input, Class::Connection(Connection::TuneOk { - channel_max, - frame_max, - heartbeat, - }))) -} -fn connection_open(input: &[u8]) -> IResult { - let (input, _) = tag([40])(input)?; - let (input, virtual_host) = domain_path(input)?; - let (input, reserved_1) = domain_shortstr(input)?; - let (input, bits) = bit(input, 1)?; - let reserved_2 = bits[0]; - Ok((input, Class::Connection(Connection::Open { - virtual_host, - reserved_1, - reserved_2, - }))) -} -fn connection_open_ok(input: &[u8]) -> IResult { - let (input, _) = tag([41])(input)?; - let (input, reserved_1) = domain_shortstr(input)?; - Ok((input, Class::Connection(Connection::OpenOk { - reserved_1, - }))) -} -fn connection_close(input: &[u8]) -> IResult { - let (input, _) = tag([50])(input)?; - let (input, reply_code) = domain_reply_code(input)?; - let (input, reply_text) = domain_reply_text(input)?; - let (input, class_id) = domain_class_id(input)?; - let (input, method_id) = domain_method_id(input)?; - Ok((input, Class::Connection(Connection::Close { - reply_code, - reply_text, - class_id, - method_id, - }))) -} -fn connection_close_ok(input: &[u8]) -> IResult { - let (input, _) = tag([51])(input)?; - Ok((input, Class::Connection(Connection::CloseOk { - }))) -} -fn connection_blocked(input: &[u8]) -> IResult { - let (input, _) = tag([60])(input)?; - let (input, reason) = domain_shortstr(input)?; - Ok((input, Class::Connection(Connection::Blocked { - reason, - }))) -} -fn connection_unblocked(input: &[u8]) -> IResult { - let (input, _) = tag([61])(input)?; - Ok((input, Class::Connection(Connection::Unblocked { - }))) -} -fn channel(input: &[u8]) -> IResult { - let (input, _) = tag([20])(input)?; - alt((channel_open, channel_open_ok, channel_flow, channel_flow_ok, channel_close, channel_close_ok))(input) -} -fn channel_open(input: &[u8]) -> IResult { - let (input, _) = tag([10])(input)?; - let (input, reserved_1) = domain_shortstr(input)?; - Ok((input, Class::Channel(Channel::Open { - reserved_1, - }))) -} -fn channel_open_ok(input: &[u8]) -> IResult { - let (input, _) = tag([11])(input)?; - let (input, reserved_1) = domain_longstr(input)?; - Ok((input, Class::Channel(Channel::OpenOk { - reserved_1, - }))) -} -fn channel_flow(input: &[u8]) -> IResult { - let (input, _) = tag([20])(input)?; - let (input, bits) = bit(input, 1)?; - let active = bits[0]; - Ok((input, Class::Channel(Channel::Flow { - active, - }))) -} -fn channel_flow_ok(input: &[u8]) -> IResult { - let (input, _) = tag([21])(input)?; - let (input, bits) = bit(input, 1)?; - let active = bits[0]; - Ok((input, Class::Channel(Channel::FlowOk { - active, - }))) -} -fn channel_close(input: &[u8]) -> IResult { - let (input, _) = tag([40])(input)?; - let (input, reply_code) = domain_reply_code(input)?; - let (input, reply_text) = domain_reply_text(input)?; - let (input, class_id) = domain_class_id(input)?; - let (input, method_id) = domain_method_id(input)?; - Ok((input, Class::Channel(Channel::Close { - reply_code, - reply_text, - class_id, - method_id, - }))) -} -fn channel_close_ok(input: &[u8]) -> IResult { - let (input, _) = tag([41])(input)?; - Ok((input, Class::Channel(Channel::CloseOk { - }))) -} -fn exchange(input: &[u8]) -> IResult { - let (input, _) = tag([40])(input)?; - alt((exchange_declare, exchange_declare_ok, exchange_delete, exchange_delete_ok))(input) -} -fn exchange_declare(input: &[u8]) -> IResult { - let (input, _) = tag([10])(input)?; - let (input, reserved_1) = domain_short(input)?; - let (input, exchange) = domain_exchange_name(input)?; - if exchange.is_empty() { fail!() } - let (input, r#type) = domain_shortstr(input)?; - let (input, bits) = bit(input, 5)?; - let passive = bits[0]; - let durable = bits[1]; - let reserved_2 = bits[2]; - let reserved_3 = bits[3]; - let no_wait = bits[4]; - let (input, arguments) = domain_table(input)?; - Ok((input, Class::Exchange(Exchange::Declare { - reserved_1, - exchange, - r#type, - passive, - durable, - reserved_2, - reserved_3, - no_wait, - arguments, - }))) -} -fn exchange_declare_ok(input: &[u8]) -> IResult { - let (input, _) = tag([11])(input)?; - Ok((input, Class::Exchange(Exchange::DeclareOk { - }))) -} -fn exchange_delete(input: &[u8]) -> IResult { - let (input, _) = tag([20])(input)?; - let (input, reserved_1) = domain_short(input)?; - let (input, exchange) = domain_exchange_name(input)?; - if exchange.is_empty() { fail!() } - let (input, bits) = bit(input, 2)?; - let if_unused = bits[0]; - let no_wait = bits[1]; - Ok((input, Class::Exchange(Exchange::Delete { - reserved_1, - exchange, - if_unused, - no_wait, - }))) -} -fn exchange_delete_ok(input: &[u8]) -> IResult { - let (input, _) = tag([21])(input)?; - Ok((input, Class::Exchange(Exchange::DeleteOk { - }))) -} -fn queue(input: &[u8]) -> IResult { - let (input, _) = tag([50])(input)?; - alt((queue_declare, queue_declare_ok, queue_bind, queue_bind_ok, queue_unbind, queue_unbind_ok, queue_purge, queue_purge_ok, queue_delete, queue_delete_ok))(input) -} -fn queue_declare(input: &[u8]) -> IResult { - let (input, _) = tag([10])(input)?; - let (input, reserved_1) = domain_short(input)?; - let (input, queue) = domain_queue_name(input)?; - let (input, bits) = bit(input, 5)?; - let passive = bits[0]; - let durable = bits[1]; - let exclusive = bits[2]; - let auto_delete = bits[3]; - let no_wait = bits[4]; - let (input, arguments) = domain_table(input)?; - Ok((input, Class::Queue(Queue::Declare { - reserved_1, - queue, - passive, - durable, - exclusive, - auto_delete, - no_wait, - arguments, - }))) -} -fn queue_declare_ok(input: &[u8]) -> IResult { - let (input, _) = tag([11])(input)?; - let (input, queue) = domain_queue_name(input)?; - if queue.is_empty() { fail!() } - let (input, message_count) = domain_message_count(input)?; - let (input, consumer_count) = domain_long(input)?; - Ok((input, Class::Queue(Queue::DeclareOk { - queue, - message_count, - consumer_count, - }))) -} -fn queue_bind(input: &[u8]) -> IResult { - let (input, _) = tag([20])(input)?; - let (input, reserved_1) = domain_short(input)?; - let (input, queue) = domain_queue_name(input)?; - let (input, exchange) = domain_exchange_name(input)?; - let (input, routing_key) = domain_shortstr(input)?; - let (input, bits) = bit(input, 1)?; - let no_wait = bits[0]; - let (input, arguments) = domain_table(input)?; - Ok((input, Class::Queue(Queue::Bind { - reserved_1, - queue, - exchange, - routing_key, - no_wait, - arguments, - }))) -} -fn queue_bind_ok(input: &[u8]) -> IResult { - let (input, _) = tag([21])(input)?; - Ok((input, Class::Queue(Queue::BindOk { - }))) -} -fn queue_unbind(input: &[u8]) -> IResult { - let (input, _) = tag([50])(input)?; - let (input, reserved_1) = domain_short(input)?; - let (input, queue) = domain_queue_name(input)?; - let (input, exchange) = domain_exchange_name(input)?; - let (input, routing_key) = domain_shortstr(input)?; - let (input, arguments) = domain_table(input)?; - Ok((input, Class::Queue(Queue::Unbind { - reserved_1, - queue, - exchange, - routing_key, - arguments, - }))) -} -fn queue_unbind_ok(input: &[u8]) -> IResult { - let (input, _) = tag([51])(input)?; - Ok((input, Class::Queue(Queue::UnbindOk { - }))) -} -fn queue_purge(input: &[u8]) -> IResult { - let (input, _) = tag([30])(input)?; - let (input, reserved_1) = domain_short(input)?; - let (input, queue) = domain_queue_name(input)?; - let (input, bits) = bit(input, 1)?; - let no_wait = bits[0]; - Ok((input, Class::Queue(Queue::Purge { - reserved_1, - queue, - no_wait, - }))) -} -fn queue_purge_ok(input: &[u8]) -> IResult { - let (input, _) = tag([31])(input)?; - let (input, message_count) = domain_message_count(input)?; - Ok((input, Class::Queue(Queue::PurgeOk { - message_count, - }))) -} -fn queue_delete(input: &[u8]) -> IResult { - let (input, _) = tag([40])(input)?; - let (input, reserved_1) = domain_short(input)?; - let (input, queue) = domain_queue_name(input)?; - let (input, bits) = bit(input, 3)?; - let if_unused = bits[0]; - let if_empty = bits[1]; - let no_wait = bits[2]; - Ok((input, Class::Queue(Queue::Delete { - reserved_1, - queue, - if_unused, - if_empty, - no_wait, - }))) -} -fn queue_delete_ok(input: &[u8]) -> IResult { - let (input, _) = tag([41])(input)?; - let (input, message_count) = domain_message_count(input)?; - Ok((input, Class::Queue(Queue::DeleteOk { - message_count, - }))) -} -fn basic(input: &[u8]) -> IResult { - let (input, _) = tag([60])(input)?; - alt((basic_qos, basic_qos_ok, basic_consume, basic_consume_ok, basic_cancel, basic_cancel_ok, basic_publish, basic_return, basic_deliver, basic_get, basic_get_ok, basic_get_empty, basic_ack, basic_reject, basic_recover_async, basic_recover, basic_recover_ok))(input) -} -fn basic_qos(input: &[u8]) -> IResult { - let (input, _) = tag([10])(input)?; - let (input, prefetch_size) = domain_long(input)?; - let (input, prefetch_count) = domain_short(input)?; - let (input, bits) = bit(input, 1)?; - let global = bits[0]; - Ok((input, Class::Basic(Basic::Qos { - prefetch_size, - prefetch_count, - global, - }))) -} -fn basic_qos_ok(input: &[u8]) -> IResult { - let (input, _) = tag([11])(input)?; - Ok((input, Class::Basic(Basic::QosOk { - }))) -} -fn basic_consume(input: &[u8]) -> IResult { - let (input, _) = tag([20])(input)?; - let (input, reserved_1) = domain_short(input)?; - let (input, queue) = domain_queue_name(input)?; - let (input, consumer_tag) = domain_consumer_tag(input)?; - let (input, bits) = bit(input, 4)?; - let no_local = bits[0]; - let no_ack = bits[1]; - let exclusive = bits[2]; - let no_wait = bits[3]; - let (input, arguments) = domain_table(input)?; - Ok((input, Class::Basic(Basic::Consume { - reserved_1, - queue, - consumer_tag, - no_local, - no_ack, - exclusive, - no_wait, - arguments, - }))) -} -fn basic_consume_ok(input: &[u8]) -> IResult { - let (input, _) = tag([21])(input)?; - let (input, consumer_tag) = domain_consumer_tag(input)?; - Ok((input, Class::Basic(Basic::ConsumeOk { - consumer_tag, - }))) -} -fn basic_cancel(input: &[u8]) -> IResult { - let (input, _) = tag([30])(input)?; - let (input, consumer_tag) = domain_consumer_tag(input)?; - let (input, bits) = bit(input, 1)?; - let no_wait = bits[0]; - Ok((input, Class::Basic(Basic::Cancel { - consumer_tag, - no_wait, - }))) -} -fn basic_cancel_ok(input: &[u8]) -> IResult { - let (input, _) = tag([31])(input)?; - let (input, consumer_tag) = domain_consumer_tag(input)?; - Ok((input, Class::Basic(Basic::CancelOk { - consumer_tag, - }))) -} -fn basic_publish(input: &[u8]) -> IResult { - let (input, _) = tag([40])(input)?; - let (input, reserved_1) = domain_short(input)?; - let (input, exchange) = domain_exchange_name(input)?; - let (input, routing_key) = domain_shortstr(input)?; - let (input, bits) = bit(input, 2)?; - let mandatory = bits[0]; - let immediate = bits[1]; - Ok((input, Class::Basic(Basic::Publish { - reserved_1, - exchange, - routing_key, - mandatory, - immediate, - }))) -} -fn basic_return(input: &[u8]) -> IResult { - let (input, _) = tag([50])(input)?; - let (input, reply_code) = domain_reply_code(input)?; - let (input, reply_text) = domain_reply_text(input)?; - let (input, exchange) = domain_exchange_name(input)?; - let (input, routing_key) = domain_shortstr(input)?; - Ok((input, Class::Basic(Basic::Return { - reply_code, - reply_text, - exchange, - routing_key, - }))) -} -fn basic_deliver(input: &[u8]) -> IResult { - let (input, _) = tag([60])(input)?; - let (input, consumer_tag) = domain_consumer_tag(input)?; - let (input, delivery_tag) = domain_delivery_tag(input)?; - let (input, bits) = bit(input, 1)?; - let redelivered = bits[0]; - let (input, exchange) = domain_exchange_name(input)?; - let (input, routing_key) = domain_shortstr(input)?; - Ok((input, Class::Basic(Basic::Deliver { - consumer_tag, - delivery_tag, - redelivered, - exchange, - routing_key, - }))) -} -fn basic_get(input: &[u8]) -> IResult { - let (input, _) = tag([70])(input)?; - let (input, reserved_1) = domain_short(input)?; - let (input, queue) = domain_queue_name(input)?; - let (input, bits) = bit(input, 1)?; - let no_ack = bits[0]; - Ok((input, Class::Basic(Basic::Get { - reserved_1, - queue, - no_ack, - }))) -} -fn basic_get_ok(input: &[u8]) -> IResult { - let (input, _) = tag([71])(input)?; - let (input, delivery_tag) = domain_delivery_tag(input)?; - let (input, bits) = bit(input, 1)?; - let redelivered = bits[0]; - let (input, exchange) = domain_exchange_name(input)?; - let (input, routing_key) = domain_shortstr(input)?; - let (input, message_count) = domain_message_count(input)?; - Ok((input, Class::Basic(Basic::GetOk { - delivery_tag, - redelivered, - exchange, - routing_key, - message_count, - }))) -} -fn basic_get_empty(input: &[u8]) -> IResult { - let (input, _) = tag([72])(input)?; - let (input, reserved_1) = domain_shortstr(input)?; - Ok((input, Class::Basic(Basic::GetEmpty { - reserved_1, - }))) -} -fn basic_ack(input: &[u8]) -> IResult { - let (input, _) = tag([80])(input)?; - let (input, delivery_tag) = domain_delivery_tag(input)?; - let (input, bits) = bit(input, 1)?; - let multiple = bits[0]; - Ok((input, Class::Basic(Basic::Ack { - delivery_tag, - multiple, - }))) -} -fn basic_reject(input: &[u8]) -> IResult { - let (input, _) = tag([90])(input)?; - let (input, delivery_tag) = domain_delivery_tag(input)?; - let (input, bits) = bit(input, 1)?; - let requeue = bits[0]; - Ok((input, Class::Basic(Basic::Reject { - delivery_tag, - requeue, - }))) -} -fn basic_recover_async(input: &[u8]) -> IResult { - let (input, _) = tag([100])(input)?; - let (input, bits) = bit(input, 1)?; - let requeue = bits[0]; - Ok((input, Class::Basic(Basic::RecoverAsync { - requeue, - }))) -} -fn basic_recover(input: &[u8]) -> IResult { - let (input, _) = tag([110])(input)?; - let (input, bits) = bit(input, 1)?; - let requeue = bits[0]; - Ok((input, Class::Basic(Basic::Recover { - requeue, - }))) -} -fn basic_recover_ok(input: &[u8]) -> IResult { - let (input, _) = tag([111])(input)?; - Ok((input, Class::Basic(Basic::RecoverOk { - }))) -} -fn tx(input: &[u8]) -> IResult { - let (input, _) = tag([90])(input)?; - alt((tx_select, tx_select_ok, tx_commit, tx_commit_ok, tx_rollback, tx_rollback_ok))(input) -} -fn tx_select(input: &[u8]) -> IResult { - let (input, _) = tag([10])(input)?; - Ok((input, Class::Tx(Tx::Select { - }))) -} -fn tx_select_ok(input: &[u8]) -> IResult { - let (input, _) = tag([11])(input)?; - Ok((input, Class::Tx(Tx::SelectOk { - }))) -} -fn tx_commit(input: &[u8]) -> IResult { - let (input, _) = tag([20])(input)?; - Ok((input, Class::Tx(Tx::Commit { - }))) -} -fn tx_commit_ok(input: &[u8]) -> IResult { - let (input, _) = tag([21])(input)?; - Ok((input, Class::Tx(Tx::CommitOk { - }))) -} -fn tx_rollback(input: &[u8]) -> IResult { - let (input, _) = tag([30])(input)?; - Ok((input, Class::Tx(Tx::Rollback { - }))) -} -fn tx_rollback_ok(input: &[u8]) -> IResult { - let (input, _) = tag([31])(input)?; - Ok((input, Class::Tx(Tx::RollbackOk { - }))) -} - -} -mod write { -use super::*; -use crate::classes::write_helper::*; -use crate::error::TransError; -use std::io::Write; - -pub fn write_method(class: Class, mut writer: W) -> Result<(), TransError> { - match class { - Class::Connection(Connection::Start { - version_major, - version_minor, - server_properties, - mechanisms, - locales, - }) => { - writer.write_all(&[10, 10])?; - octet(version_major, &mut writer)?; - octet(version_minor, &mut writer)?; - table(server_properties, &mut writer)?; - longstr(mechanisms, &mut writer)?; - longstr(locales, &mut writer)?; - } - Class::Connection(Connection::StartOk { - client_properties, - mechanism, - response, - locale, - }) => { - writer.write_all(&[10, 11])?; - table(client_properties, &mut writer)?; - shortstr(mechanism, &mut writer)?; - longstr(response, &mut writer)?; - shortstr(locale, &mut writer)?; - } - Class::Connection(Connection::Secure { - challenge, - }) => { - writer.write_all(&[10, 20])?; - longstr(challenge, &mut writer)?; - } - Class::Connection(Connection::SecureOk { - response, - }) => { - writer.write_all(&[10, 21])?; - longstr(response, &mut writer)?; - } - Class::Connection(Connection::Tune { - channel_max, - frame_max, - heartbeat, - }) => { - writer.write_all(&[10, 30])?; - short(channel_max, &mut writer)?; - long(frame_max, &mut writer)?; - short(heartbeat, &mut writer)?; - } - Class::Connection(Connection::TuneOk { - channel_max, - frame_max, - heartbeat, - }) => { - writer.write_all(&[10, 31])?; - short(channel_max, &mut writer)?; - long(frame_max, &mut writer)?; - short(heartbeat, &mut writer)?; - } - Class::Connection(Connection::Open { - virtual_host, - reserved_1, - reserved_2, - }) => { - writer.write_all(&[10, 40])?; - shortstr(virtual_host, &mut writer)?; - shortstr(reserved_1, &mut writer)?; - bit(&[reserved_2, ], &mut writer)?; - } - Class::Connection(Connection::OpenOk { - reserved_1, - }) => { - writer.write_all(&[10, 41])?; - shortstr(reserved_1, &mut writer)?; - } - Class::Connection(Connection::Close { - reply_code, - reply_text, - class_id, - method_id, - }) => { - writer.write_all(&[10, 50])?; - short(reply_code, &mut writer)?; - shortstr(reply_text, &mut writer)?; - short(class_id, &mut writer)?; - short(method_id, &mut writer)?; - } - Class::Connection(Connection::CloseOk { - }) => { - writer.write_all(&[10, 51])?; - } - Class::Connection(Connection::Blocked { - reason, - }) => { - writer.write_all(&[10, 60])?; - shortstr(reason, &mut writer)?; - } - Class::Connection(Connection::Unblocked { - }) => { - writer.write_all(&[10, 61])?; - } - Class::Channel(Channel::Open { - reserved_1, - }) => { - writer.write_all(&[20, 10])?; - shortstr(reserved_1, &mut writer)?; - } - Class::Channel(Channel::OpenOk { - reserved_1, - }) => { - writer.write_all(&[20, 11])?; - longstr(reserved_1, &mut writer)?; - } - Class::Channel(Channel::Flow { - active, - }) => { - writer.write_all(&[20, 20])?; - bit(&[active, ], &mut writer)?; - } - Class::Channel(Channel::FlowOk { - active, - }) => { - writer.write_all(&[20, 21])?; - bit(&[active, ], &mut writer)?; - } - Class::Channel(Channel::Close { - reply_code, - reply_text, - class_id, - method_id, - }) => { - writer.write_all(&[20, 40])?; - short(reply_code, &mut writer)?; - shortstr(reply_text, &mut writer)?; - short(class_id, &mut writer)?; - short(method_id, &mut writer)?; - } - Class::Channel(Channel::CloseOk { - }) => { - writer.write_all(&[20, 41])?; - } - Class::Exchange(Exchange::Declare { - reserved_1, - exchange, - r#type, - passive, - durable, - reserved_2, - reserved_3, - no_wait, - arguments, - }) => { - writer.write_all(&[40, 10])?; - short(reserved_1, &mut writer)?; - shortstr(exchange, &mut writer)?; - shortstr(r#type, &mut writer)?; - bit(&[passive, durable, reserved_2, reserved_3, no_wait, ], &mut writer)?; - table(arguments, &mut writer)?; - } - Class::Exchange(Exchange::DeclareOk { - }) => { - writer.write_all(&[40, 11])?; - } - Class::Exchange(Exchange::Delete { - reserved_1, - exchange, - if_unused, - no_wait, - }) => { - writer.write_all(&[40, 20])?; - short(reserved_1, &mut writer)?; - shortstr(exchange, &mut writer)?; - bit(&[if_unused, no_wait, ], &mut writer)?; - } - Class::Exchange(Exchange::DeleteOk { - }) => { - writer.write_all(&[40, 21])?; - } - Class::Queue(Queue::Declare { - reserved_1, - queue, - passive, - durable, - exclusive, - auto_delete, - no_wait, - arguments, - }) => { - writer.write_all(&[50, 10])?; - short(reserved_1, &mut writer)?; - shortstr(queue, &mut writer)?; - bit(&[passive, durable, exclusive, auto_delete, no_wait, ], &mut writer)?; - table(arguments, &mut writer)?; - } - Class::Queue(Queue::DeclareOk { - queue, - message_count, - consumer_count, - }) => { - writer.write_all(&[50, 11])?; - shortstr(queue, &mut writer)?; - long(message_count, &mut writer)?; - long(consumer_count, &mut writer)?; - } - Class::Queue(Queue::Bind { - reserved_1, - queue, - exchange, - routing_key, - no_wait, - arguments, - }) => { - writer.write_all(&[50, 20])?; - short(reserved_1, &mut writer)?; - shortstr(queue, &mut writer)?; - shortstr(exchange, &mut writer)?; - shortstr(routing_key, &mut writer)?; - bit(&[no_wait, ], &mut writer)?; - table(arguments, &mut writer)?; - } - Class::Queue(Queue::BindOk { - }) => { - writer.write_all(&[50, 21])?; - } - Class::Queue(Queue::Unbind { - reserved_1, - queue, - exchange, - routing_key, - arguments, - }) => { - writer.write_all(&[50, 50])?; - short(reserved_1, &mut writer)?; - shortstr(queue, &mut writer)?; - shortstr(exchange, &mut writer)?; - shortstr(routing_key, &mut writer)?; - table(arguments, &mut writer)?; - } - Class::Queue(Queue::UnbindOk { - }) => { - writer.write_all(&[50, 51])?; - } - Class::Queue(Queue::Purge { - reserved_1, - queue, - no_wait, - }) => { - writer.write_all(&[50, 30])?; - short(reserved_1, &mut writer)?; - shortstr(queue, &mut writer)?; - bit(&[no_wait, ], &mut writer)?; - } - Class::Queue(Queue::PurgeOk { - message_count, - }) => { - writer.write_all(&[50, 31])?; - long(message_count, &mut writer)?; - } - Class::Queue(Queue::Delete { - reserved_1, - queue, - if_unused, - if_empty, - no_wait, - }) => { - writer.write_all(&[50, 40])?; - short(reserved_1, &mut writer)?; - shortstr(queue, &mut writer)?; - bit(&[if_unused, if_empty, no_wait, ], &mut writer)?; - } - Class::Queue(Queue::DeleteOk { - message_count, - }) => { - writer.write_all(&[50, 41])?; - long(message_count, &mut writer)?; - } - Class::Basic(Basic::Qos { - prefetch_size, - prefetch_count, - global, - }) => { - writer.write_all(&[60, 10])?; - long(prefetch_size, &mut writer)?; - short(prefetch_count, &mut writer)?; - bit(&[global, ], &mut writer)?; - } - Class::Basic(Basic::QosOk { - }) => { - writer.write_all(&[60, 11])?; - } - Class::Basic(Basic::Consume { - reserved_1, - queue, - consumer_tag, - no_local, - no_ack, - exclusive, - no_wait, - arguments, - }) => { - writer.write_all(&[60, 20])?; - short(reserved_1, &mut writer)?; - shortstr(queue, &mut writer)?; - shortstr(consumer_tag, &mut writer)?; - bit(&[no_local, no_ack, exclusive, no_wait, ], &mut writer)?; - table(arguments, &mut writer)?; - } - Class::Basic(Basic::ConsumeOk { - consumer_tag, - }) => { - writer.write_all(&[60, 21])?; - shortstr(consumer_tag, &mut writer)?; - } - Class::Basic(Basic::Cancel { - consumer_tag, - no_wait, - }) => { - writer.write_all(&[60, 30])?; - shortstr(consumer_tag, &mut writer)?; - bit(&[no_wait, ], &mut writer)?; - } - Class::Basic(Basic::CancelOk { - consumer_tag, - }) => { - writer.write_all(&[60, 31])?; - shortstr(consumer_tag, &mut writer)?; - } - Class::Basic(Basic::Publish { - reserved_1, - exchange, - routing_key, - mandatory, - immediate, - }) => { - writer.write_all(&[60, 40])?; - short(reserved_1, &mut writer)?; - shortstr(exchange, &mut writer)?; - shortstr(routing_key, &mut writer)?; - bit(&[mandatory, immediate, ], &mut writer)?; - } - Class::Basic(Basic::Return { - reply_code, - reply_text, - exchange, - routing_key, - }) => { - writer.write_all(&[60, 50])?; - short(reply_code, &mut writer)?; - shortstr(reply_text, &mut writer)?; - shortstr(exchange, &mut writer)?; - shortstr(routing_key, &mut writer)?; - } - Class::Basic(Basic::Deliver { - consumer_tag, - delivery_tag, - redelivered, - exchange, - routing_key, - }) => { - writer.write_all(&[60, 60])?; - shortstr(consumer_tag, &mut writer)?; - longlong(delivery_tag, &mut writer)?; - bit(&[redelivered, ], &mut writer)?; - shortstr(exchange, &mut writer)?; - shortstr(routing_key, &mut writer)?; - } - Class::Basic(Basic::Get { - reserved_1, - queue, - no_ack, - }) => { - writer.write_all(&[60, 70])?; - short(reserved_1, &mut writer)?; - shortstr(queue, &mut writer)?; - bit(&[no_ack, ], &mut writer)?; - } - Class::Basic(Basic::GetOk { - delivery_tag, - redelivered, - exchange, - routing_key, - message_count, - }) => { - writer.write_all(&[60, 71])?; - longlong(delivery_tag, &mut writer)?; - bit(&[redelivered, ], &mut writer)?; - shortstr(exchange, &mut writer)?; - shortstr(routing_key, &mut writer)?; - long(message_count, &mut writer)?; - } - Class::Basic(Basic::GetEmpty { - reserved_1, - }) => { - writer.write_all(&[60, 72])?; - shortstr(reserved_1, &mut writer)?; - } - Class::Basic(Basic::Ack { - delivery_tag, - multiple, - }) => { - writer.write_all(&[60, 80])?; - longlong(delivery_tag, &mut writer)?; - bit(&[multiple, ], &mut writer)?; - } - Class::Basic(Basic::Reject { - delivery_tag, - requeue, - }) => { - writer.write_all(&[60, 90])?; - longlong(delivery_tag, &mut writer)?; - bit(&[requeue, ], &mut writer)?; - } - Class::Basic(Basic::RecoverAsync { - requeue, - }) => { - writer.write_all(&[60, 100])?; - bit(&[requeue, ], &mut writer)?; - } - Class::Basic(Basic::Recover { - requeue, - }) => { - writer.write_all(&[60, 110])?; - bit(&[requeue, ], &mut writer)?; - } - Class::Basic(Basic::RecoverOk { - }) => { - writer.write_all(&[60, 111])?; - } - Class::Tx(Tx::Select { - }) => { - writer.write_all(&[90, 10])?; - } - Class::Tx(Tx::SelectOk { - }) => { - writer.write_all(&[90, 11])?; - } - Class::Tx(Tx::Commit { - }) => { - writer.write_all(&[90, 20])?; - } - Class::Tx(Tx::CommitOk { - }) => { - writer.write_all(&[90, 21])?; - } - Class::Tx(Tx::Rollback { - }) => { - writer.write_all(&[90, 30])?; - } - Class::Tx(Tx::RollbackOk { - }) => { - writer.write_all(&[90, 31])?; - } + pub fn parse_method(input: &[u8]) -> Result<(&[u8], Class), nom::Err> { + alt((connection, channel, exchange, queue, basic, tx))(input) + } + fn domain_class_id(input: &[u8]) -> IResult { + short(input) + } + fn domain_consumer_tag(input: &[u8]) -> IResult { + shortstr(input) + } + fn domain_delivery_tag(input: &[u8]) -> IResult { + longlong(input) + } + fn domain_exchange_name(input: &[u8]) -> IResult { + let (input, result) = shortstr(input)?; + if result.len() > 127 { + fail!() + } + static REGEX: Lazy = Lazy::new(|| Regex::new(r"^[a-zA-Z0-9-_.:]*$").unwrap()); + if !REGEX.is_match(&result) { + fail!() + } + Ok((input, result)) + } + fn domain_method_id(input: &[u8]) -> IResult { + short(input) + } + fn domain_path(input: &[u8]) -> IResult { + let (input, result) = shortstr(input)?; + if result.is_empty() { + fail!() + } + if result.len() > 127 { + fail!() + } + Ok((input, result)) + } + fn domain_peer_properties(input: &[u8]) -> IResult { + table(input) + } + fn domain_queue_name(input: &[u8]) -> IResult { + let (input, result) = shortstr(input)?; + if result.len() > 127 { + fail!() + } + static REGEX: Lazy = Lazy::new(|| Regex::new(r"^[a-zA-Z0-9-_.:]*$").unwrap()); + if !REGEX.is_match(&result) { + fail!() + } + Ok((input, result)) + } + fn domain_message_count(input: &[u8]) -> IResult { + long(input) + } + fn domain_reply_code(input: &[u8]) -> IResult { + let (input, result) = short(input)?; + if result == 0 { + fail!() + } + Ok((input, result)) + } + fn domain_reply_text(input: &[u8]) -> IResult { + let (input, result) = shortstr(input)?; + if result.is_empty() { + fail!() + } + Ok((input, result)) + } + fn domain_octet(input: &[u8]) -> IResult { + octet(input) + } + fn domain_short(input: &[u8]) -> IResult { + short(input) + } + fn domain_long(input: &[u8]) -> IResult { + long(input) + } + fn domain_longlong(input: &[u8]) -> IResult { + longlong(input) + } + fn domain_shortstr(input: &[u8]) -> IResult { + shortstr(input) + } + fn domain_longstr(input: &[u8]) -> IResult { + longstr(input) + } + fn domain_timestamp(input: &[u8]) -> IResult { + timestamp(input) + } + fn domain_table(input: &[u8]) -> IResult
{ + table(input) + } + fn connection(input: &[u8]) -> IResult { + let (input, _) = tag([10])(input)?; + alt(( + connection_start, + connection_start_ok, + connection_secure, + connection_secure_ok, + connection_tune, + connection_tune_ok, + connection_open, + connection_open_ok, + connection_close, + connection_close_ok, + connection_blocked, + connection_unblocked, + ))(input) + } + fn connection_start(input: &[u8]) -> IResult { + let (input, _) = tag([10])(input)?; + let (input, version_major) = domain_octet(input)?; + let (input, version_minor) = domain_octet(input)?; + let (input, server_properties) = domain_peer_properties(input)?; + let (input, mechanisms) = domain_longstr(input)?; + if mechanisms.is_empty() { + fail!() + } + let (input, locales) = domain_longstr(input)?; + if locales.is_empty() { + fail!() + } + Ok(( + input, + Class::Connection(Connection::Start { + version_major, + version_minor, + server_properties, + mechanisms, + locales, + }), + )) + } + fn connection_start_ok(input: &[u8]) -> IResult { + let (input, _) = tag([11])(input)?; + let (input, client_properties) = domain_peer_properties(input)?; + let (input, mechanism) = domain_shortstr(input)?; + if mechanism.is_empty() { + fail!() + } + let (input, response) = domain_longstr(input)?; + if response.is_empty() { + fail!() + } + let (input, locale) = domain_shortstr(input)?; + if locale.is_empty() { + fail!() + } + Ok(( + input, + Class::Connection(Connection::StartOk { + client_properties, + mechanism, + response, + locale, + }), + )) + } + fn connection_secure(input: &[u8]) -> IResult { + let (input, _) = tag([20])(input)?; + let (input, challenge) = domain_longstr(input)?; + Ok((input, Class::Connection(Connection::Secure { challenge }))) + } + fn connection_secure_ok(input: &[u8]) -> IResult { + let (input, _) = tag([21])(input)?; + let (input, response) = domain_longstr(input)?; + if response.is_empty() { + fail!() + } + Ok((input, Class::Connection(Connection::SecureOk { response }))) + } + fn connection_tune(input: &[u8]) -> IResult { + let (input, _) = tag([30])(input)?; + let (input, channel_max) = domain_short(input)?; + let (input, frame_max) = domain_long(input)?; + let (input, heartbeat) = domain_short(input)?; + Ok(( + input, + Class::Connection(Connection::Tune { + channel_max, + frame_max, + heartbeat, + }), + )) + } + fn connection_tune_ok(input: &[u8]) -> IResult { + let (input, _) = tag([31])(input)?; + let (input, channel_max) = domain_short(input)?; + if channel_max == 0 { + fail!() + } + let (input, frame_max) = domain_long(input)?; + let (input, heartbeat) = domain_short(input)?; + Ok(( + input, + Class::Connection(Connection::TuneOk { + channel_max, + frame_max, + heartbeat, + }), + )) + } + fn connection_open(input: &[u8]) -> IResult { + let (input, _) = tag([40])(input)?; + let (input, virtual_host) = domain_path(input)?; + let (input, reserved_1) = domain_shortstr(input)?; + let (input, bits) = bit(input, 1)?; + let reserved_2 = bits[0]; + Ok(( + input, + Class::Connection(Connection::Open { + virtual_host, + reserved_1, + reserved_2, + }), + )) + } + fn connection_open_ok(input: &[u8]) -> IResult { + let (input, _) = tag([41])(input)?; + let (input, reserved_1) = domain_shortstr(input)?; + Ok((input, Class::Connection(Connection::OpenOk { reserved_1 }))) + } + fn connection_close(input: &[u8]) -> IResult { + let (input, _) = tag([50])(input)?; + let (input, reply_code) = domain_reply_code(input)?; + let (input, reply_text) = domain_reply_text(input)?; + let (input, class_id) = domain_class_id(input)?; + let (input, method_id) = domain_method_id(input)?; + Ok(( + input, + Class::Connection(Connection::Close { + reply_code, + reply_text, + class_id, + method_id, + }), + )) + } + fn connection_close_ok(input: &[u8]) -> IResult { + let (input, _) = tag([51])(input)?; + Ok((input, Class::Connection(Connection::CloseOk {}))) + } + fn connection_blocked(input: &[u8]) -> IResult { + let (input, _) = tag([60])(input)?; + let (input, reason) = domain_shortstr(input)?; + Ok((input, Class::Connection(Connection::Blocked { reason }))) + } + fn connection_unblocked(input: &[u8]) -> IResult { + let (input, _) = tag([61])(input)?; + Ok((input, Class::Connection(Connection::Unblocked {}))) + } + fn channel(input: &[u8]) -> IResult { + let (input, _) = tag([20])(input)?; + alt(( + channel_open, + channel_open_ok, + channel_flow, + channel_flow_ok, + channel_close, + channel_close_ok, + ))(input) + } + fn channel_open(input: &[u8]) -> IResult { + let (input, _) = tag([10])(input)?; + let (input, reserved_1) = domain_shortstr(input)?; + Ok((input, Class::Channel(Channel::Open { reserved_1 }))) + } + fn channel_open_ok(input: &[u8]) -> IResult { + let (input, _) = tag([11])(input)?; + let (input, reserved_1) = domain_longstr(input)?; + Ok((input, Class::Channel(Channel::OpenOk { reserved_1 }))) + } + fn channel_flow(input: &[u8]) -> IResult { + let (input, _) = tag([20])(input)?; + let (input, bits) = bit(input, 1)?; + let active = bits[0]; + Ok((input, Class::Channel(Channel::Flow { active }))) + } + fn channel_flow_ok(input: &[u8]) -> IResult { + let (input, _) = tag([21])(input)?; + let (input, bits) = bit(input, 1)?; + let active = bits[0]; + Ok((input, Class::Channel(Channel::FlowOk { active }))) + } + fn channel_close(input: &[u8]) -> IResult { + let (input, _) = tag([40])(input)?; + let (input, reply_code) = domain_reply_code(input)?; + let (input, reply_text) = domain_reply_text(input)?; + let (input, class_id) = domain_class_id(input)?; + let (input, method_id) = domain_method_id(input)?; + Ok(( + input, + Class::Channel(Channel::Close { + reply_code, + reply_text, + class_id, + method_id, + }), + )) + } + fn channel_close_ok(input: &[u8]) -> IResult { + let (input, _) = tag([41])(input)?; + Ok((input, Class::Channel(Channel::CloseOk {}))) + } + fn exchange(input: &[u8]) -> IResult { + let (input, _) = tag([40])(input)?; + alt(( + exchange_declare, + exchange_declare_ok, + exchange_delete, + exchange_delete_ok, + ))(input) + } + fn exchange_declare(input: &[u8]) -> IResult { + let (input, _) = tag([10])(input)?; + let (input, reserved_1) = domain_short(input)?; + let (input, exchange) = domain_exchange_name(input)?; + if exchange.is_empty() { + fail!() + } + let (input, r#type) = domain_shortstr(input)?; + let (input, bits) = bit(input, 5)?; + let passive = bits[0]; + let durable = bits[1]; + let reserved_2 = bits[2]; + let reserved_3 = bits[3]; + let no_wait = bits[4]; + let (input, arguments) = domain_table(input)?; + Ok(( + input, + Class::Exchange(Exchange::Declare { + reserved_1, + exchange, + r#type, + passive, + durable, + reserved_2, + reserved_3, + no_wait, + arguments, + }), + )) + } + fn exchange_declare_ok(input: &[u8]) -> IResult { + let (input, _) = tag([11])(input)?; + Ok((input, Class::Exchange(Exchange::DeclareOk {}))) + } + fn exchange_delete(input: &[u8]) -> IResult { + let (input, _) = tag([20])(input)?; + let (input, reserved_1) = domain_short(input)?; + let (input, exchange) = domain_exchange_name(input)?; + if exchange.is_empty() { + fail!() + } + let (input, bits) = bit(input, 2)?; + let if_unused = bits[0]; + let no_wait = bits[1]; + Ok(( + input, + Class::Exchange(Exchange::Delete { + reserved_1, + exchange, + if_unused, + no_wait, + }), + )) + } + fn exchange_delete_ok(input: &[u8]) -> IResult { + let (input, _) = tag([21])(input)?; + Ok((input, Class::Exchange(Exchange::DeleteOk {}))) + } + fn queue(input: &[u8]) -> IResult { + let (input, _) = tag([50])(input)?; + alt(( + queue_declare, + queue_declare_ok, + queue_bind, + queue_bind_ok, + queue_unbind, + queue_unbind_ok, + queue_purge, + queue_purge_ok, + queue_delete, + queue_delete_ok, + ))(input) + } + fn queue_declare(input: &[u8]) -> IResult { + let (input, _) = tag([10])(input)?; + let (input, reserved_1) = domain_short(input)?; + let (input, queue) = domain_queue_name(input)?; + let (input, bits) = bit(input, 5)?; + let passive = bits[0]; + let durable = bits[1]; + let exclusive = bits[2]; + let auto_delete = bits[3]; + let no_wait = bits[4]; + let (input, arguments) = domain_table(input)?; + Ok(( + input, + Class::Queue(Queue::Declare { + reserved_1, + queue, + passive, + durable, + exclusive, + auto_delete, + no_wait, + arguments, + }), + )) + } + fn queue_declare_ok(input: &[u8]) -> IResult { + let (input, _) = tag([11])(input)?; + let (input, queue) = domain_queue_name(input)?; + if queue.is_empty() { + fail!() + } + let (input, message_count) = domain_message_count(input)?; + let (input, consumer_count) = domain_long(input)?; + Ok(( + input, + Class::Queue(Queue::DeclareOk { + queue, + message_count, + consumer_count, + }), + )) + } + fn queue_bind(input: &[u8]) -> IResult { + let (input, _) = tag([20])(input)?; + let (input, reserved_1) = domain_short(input)?; + let (input, queue) = domain_queue_name(input)?; + let (input, exchange) = domain_exchange_name(input)?; + let (input, routing_key) = domain_shortstr(input)?; + let (input, bits) = bit(input, 1)?; + let no_wait = bits[0]; + let (input, arguments) = domain_table(input)?; + Ok(( + input, + Class::Queue(Queue::Bind { + reserved_1, + queue, + exchange, + routing_key, + no_wait, + arguments, + }), + )) + } + fn queue_bind_ok(input: &[u8]) -> IResult { + let (input, _) = tag([21])(input)?; + Ok((input, Class::Queue(Queue::BindOk {}))) + } + fn queue_unbind(input: &[u8]) -> IResult { + let (input, _) = tag([50])(input)?; + let (input, reserved_1) = domain_short(input)?; + let (input, queue) = domain_queue_name(input)?; + let (input, exchange) = domain_exchange_name(input)?; + let (input, routing_key) = domain_shortstr(input)?; + let (input, arguments) = domain_table(input)?; + Ok(( + input, + Class::Queue(Queue::Unbind { + reserved_1, + queue, + exchange, + routing_key, + arguments, + }), + )) + } + fn queue_unbind_ok(input: &[u8]) -> IResult { + let (input, _) = tag([51])(input)?; + Ok((input, Class::Queue(Queue::UnbindOk {}))) + } + fn queue_purge(input: &[u8]) -> IResult { + let (input, _) = tag([30])(input)?; + let (input, reserved_1) = domain_short(input)?; + let (input, queue) = domain_queue_name(input)?; + let (input, bits) = bit(input, 1)?; + let no_wait = bits[0]; + Ok(( + input, + Class::Queue(Queue::Purge { + reserved_1, + queue, + no_wait, + }), + )) + } + fn queue_purge_ok(input: &[u8]) -> IResult { + let (input, _) = tag([31])(input)?; + let (input, message_count) = domain_message_count(input)?; + Ok((input, Class::Queue(Queue::PurgeOk { message_count }))) + } + fn queue_delete(input: &[u8]) -> IResult { + let (input, _) = tag([40])(input)?; + let (input, reserved_1) = domain_short(input)?; + let (input, queue) = domain_queue_name(input)?; + let (input, bits) = bit(input, 3)?; + let if_unused = bits[0]; + let if_empty = bits[1]; + let no_wait = bits[2]; + Ok(( + input, + Class::Queue(Queue::Delete { + reserved_1, + queue, + if_unused, + if_empty, + no_wait, + }), + )) + } + fn queue_delete_ok(input: &[u8]) -> IResult { + let (input, _) = tag([41])(input)?; + let (input, message_count) = domain_message_count(input)?; + Ok((input, Class::Queue(Queue::DeleteOk { message_count }))) + } + fn basic(input: &[u8]) -> IResult { + let (input, _) = tag([60])(input)?; + alt(( + basic_qos, + basic_qos_ok, + basic_consume, + basic_consume_ok, + basic_cancel, + basic_cancel_ok, + basic_publish, + basic_return, + basic_deliver, + basic_get, + basic_get_ok, + basic_get_empty, + basic_ack, + basic_reject, + basic_recover_async, + basic_recover, + basic_recover_ok, + ))(input) + } + fn basic_qos(input: &[u8]) -> IResult { + let (input, _) = tag([10])(input)?; + let (input, prefetch_size) = domain_long(input)?; + let (input, prefetch_count) = domain_short(input)?; + let (input, bits) = bit(input, 1)?; + let global = bits[0]; + Ok(( + input, + Class::Basic(Basic::Qos { + prefetch_size, + prefetch_count, + global, + }), + )) + } + fn basic_qos_ok(input: &[u8]) -> IResult { + let (input, _) = tag([11])(input)?; + Ok((input, Class::Basic(Basic::QosOk {}))) + } + fn basic_consume(input: &[u8]) -> IResult { + let (input, _) = tag([20])(input)?; + let (input, reserved_1) = domain_short(input)?; + let (input, queue) = domain_queue_name(input)?; + let (input, consumer_tag) = domain_consumer_tag(input)?; + let (input, bits) = bit(input, 4)?; + let no_local = bits[0]; + let no_ack = bits[1]; + let exclusive = bits[2]; + let no_wait = bits[3]; + let (input, arguments) = domain_table(input)?; + Ok(( + input, + Class::Basic(Basic::Consume { + reserved_1, + queue, + consumer_tag, + no_local, + no_ack, + exclusive, + no_wait, + arguments, + }), + )) + } + fn basic_consume_ok(input: &[u8]) -> IResult { + let (input, _) = tag([21])(input)?; + let (input, consumer_tag) = domain_consumer_tag(input)?; + Ok((input, Class::Basic(Basic::ConsumeOk { consumer_tag }))) + } + fn basic_cancel(input: &[u8]) -> IResult { + let (input, _) = tag([30])(input)?; + let (input, consumer_tag) = domain_consumer_tag(input)?; + let (input, bits) = bit(input, 1)?; + let no_wait = bits[0]; + Ok(( + input, + Class::Basic(Basic::Cancel { + consumer_tag, + no_wait, + }), + )) + } + fn basic_cancel_ok(input: &[u8]) -> IResult { + let (input, _) = tag([31])(input)?; + let (input, consumer_tag) = domain_consumer_tag(input)?; + Ok((input, Class::Basic(Basic::CancelOk { consumer_tag }))) + } + fn basic_publish(input: &[u8]) -> IResult { + let (input, _) = tag([40])(input)?; + let (input, reserved_1) = domain_short(input)?; + let (input, exchange) = domain_exchange_name(input)?; + let (input, routing_key) = domain_shortstr(input)?; + let (input, bits) = bit(input, 2)?; + let mandatory = bits[0]; + let immediate = bits[1]; + Ok(( + input, + Class::Basic(Basic::Publish { + reserved_1, + exchange, + routing_key, + mandatory, + immediate, + }), + )) + } + fn basic_return(input: &[u8]) -> IResult { + let (input, _) = tag([50])(input)?; + let (input, reply_code) = domain_reply_code(input)?; + let (input, reply_text) = domain_reply_text(input)?; + let (input, exchange) = domain_exchange_name(input)?; + let (input, routing_key) = domain_shortstr(input)?; + Ok(( + input, + Class::Basic(Basic::Return { + reply_code, + reply_text, + exchange, + routing_key, + }), + )) + } + fn basic_deliver(input: &[u8]) -> IResult { + let (input, _) = tag([60])(input)?; + let (input, consumer_tag) = domain_consumer_tag(input)?; + let (input, delivery_tag) = domain_delivery_tag(input)?; + let (input, bits) = bit(input, 1)?; + let redelivered = bits[0]; + let (input, exchange) = domain_exchange_name(input)?; + let (input, routing_key) = domain_shortstr(input)?; + Ok(( + input, + Class::Basic(Basic::Deliver { + consumer_tag, + delivery_tag, + redelivered, + exchange, + routing_key, + }), + )) + } + fn basic_get(input: &[u8]) -> IResult { + let (input, _) = tag([70])(input)?; + let (input, reserved_1) = domain_short(input)?; + let (input, queue) = domain_queue_name(input)?; + let (input, bits) = bit(input, 1)?; + let no_ack = bits[0]; + Ok(( + input, + Class::Basic(Basic::Get { + reserved_1, + queue, + no_ack, + }), + )) + } + fn basic_get_ok(input: &[u8]) -> IResult { + let (input, _) = tag([71])(input)?; + let (input, delivery_tag) = domain_delivery_tag(input)?; + let (input, bits) = bit(input, 1)?; + let redelivered = bits[0]; + let (input, exchange) = domain_exchange_name(input)?; + let (input, routing_key) = domain_shortstr(input)?; + let (input, message_count) = domain_message_count(input)?; + Ok(( + input, + Class::Basic(Basic::GetOk { + delivery_tag, + redelivered, + exchange, + routing_key, + message_count, + }), + )) + } + fn basic_get_empty(input: &[u8]) -> IResult { + let (input, _) = tag([72])(input)?; + let (input, reserved_1) = domain_shortstr(input)?; + Ok((input, Class::Basic(Basic::GetEmpty { reserved_1 }))) + } + fn basic_ack(input: &[u8]) -> IResult { + let (input, _) = tag([80])(input)?; + let (input, delivery_tag) = domain_delivery_tag(input)?; + let (input, bits) = bit(input, 1)?; + let multiple = bits[0]; + Ok(( + input, + Class::Basic(Basic::Ack { + delivery_tag, + multiple, + }), + )) + } + fn basic_reject(input: &[u8]) -> IResult { + let (input, _) = tag([90])(input)?; + let (input, delivery_tag) = domain_delivery_tag(input)?; + let (input, bits) = bit(input, 1)?; + let requeue = bits[0]; + Ok(( + input, + Class::Basic(Basic::Reject { + delivery_tag, + requeue, + }), + )) + } + fn basic_recover_async(input: &[u8]) -> IResult { + let (input, _) = tag([100])(input)?; + let (input, bits) = bit(input, 1)?; + let requeue = bits[0]; + Ok((input, Class::Basic(Basic::RecoverAsync { requeue }))) + } + fn basic_recover(input: &[u8]) -> IResult { + let (input, _) = tag([110])(input)?; + let (input, bits) = bit(input, 1)?; + let requeue = bits[0]; + Ok((input, Class::Basic(Basic::Recover { requeue }))) + } + fn basic_recover_ok(input: &[u8]) -> IResult { + let (input, _) = tag([111])(input)?; + Ok((input, Class::Basic(Basic::RecoverOk {}))) + } + fn tx(input: &[u8]) -> IResult { + let (input, _) = tag([90])(input)?; + alt(( + tx_select, + tx_select_ok, + tx_commit, + tx_commit_ok, + tx_rollback, + tx_rollback_ok, + ))(input) + } + fn tx_select(input: &[u8]) -> IResult { + let (input, _) = tag([10])(input)?; + Ok((input, Class::Tx(Tx::Select {}))) + } + fn tx_select_ok(input: &[u8]) -> IResult { + let (input, _) = tag([11])(input)?; + Ok((input, Class::Tx(Tx::SelectOk {}))) + } + fn tx_commit(input: &[u8]) -> IResult { + let (input, _) = tag([20])(input)?; + Ok((input, Class::Tx(Tx::Commit {}))) + } + fn tx_commit_ok(input: &[u8]) -> IResult { + let (input, _) = tag([21])(input)?; + Ok((input, Class::Tx(Tx::CommitOk {}))) + } + fn tx_rollback(input: &[u8]) -> IResult { + let (input, _) = tag([30])(input)?; + Ok((input, Class::Tx(Tx::Rollback {}))) + } + fn tx_rollback_ok(input: &[u8]) -> IResult { + let (input, _) = tag([31])(input)?; + Ok((input, Class::Tx(Tx::RollbackOk {}))) } - Ok(()) } +pub mod write { + use super::*; + use crate::classes::write_helper::*; + use crate::error::TransError; + use tokio::io::AsyncWriteExt; + + pub async fn write_method( + class: Class, + mut writer: W, + ) -> Result<(), TransError> { + match class { + Class::Connection(Connection::Start { + version_major, + version_minor, + server_properties, + mechanisms, + locales, + }) => { + writer.write_all(&[10, 10]).await?; + octet(version_major, &mut writer).await?; + octet(version_minor, &mut writer).await?; + table(server_properties, &mut writer).await?; + longstr(mechanisms, &mut writer).await?; + longstr(locales, &mut writer).await?; + } + Class::Connection(Connection::StartOk { + client_properties, + mechanism, + response, + locale, + }) => { + writer.write_all(&[10, 11]).await?; + table(client_properties, &mut writer).await?; + shortstr(mechanism, &mut writer).await?; + longstr(response, &mut writer).await?; + shortstr(locale, &mut writer).await?; + } + Class::Connection(Connection::Secure { challenge }) => { + writer.write_all(&[10, 20]).await?; + longstr(challenge, &mut writer).await?; + } + Class::Connection(Connection::SecureOk { response }) => { + writer.write_all(&[10, 21]).await?; + longstr(response, &mut writer).await?; + } + Class::Connection(Connection::Tune { + channel_max, + frame_max, + heartbeat, + }) => { + writer.write_all(&[10, 30]).await?; + short(channel_max, &mut writer).await?; + long(frame_max, &mut writer).await?; + short(heartbeat, &mut writer).await?; + } + Class::Connection(Connection::TuneOk { + channel_max, + frame_max, + heartbeat, + }) => { + writer.write_all(&[10, 31]).await?; + short(channel_max, &mut writer).await?; + long(frame_max, &mut writer).await?; + short(heartbeat, &mut writer).await?; + } + Class::Connection(Connection::Open { + virtual_host, + reserved_1, + reserved_2, + }) => { + writer.write_all(&[10, 40]).await?; + shortstr(virtual_host, &mut writer).await?; + shortstr(reserved_1, &mut writer).await?; + bit(&[reserved_2], &mut writer).await?; + } + Class::Connection(Connection::OpenOk { reserved_1 }) => { + writer.write_all(&[10, 41]).await?; + shortstr(reserved_1, &mut writer).await?; + } + Class::Connection(Connection::Close { + reply_code, + reply_text, + class_id, + method_id, + }) => { + writer.write_all(&[10, 50]).await?; + short(reply_code, &mut writer).await?; + shortstr(reply_text, &mut writer).await?; + short(class_id, &mut writer).await?; + short(method_id, &mut writer).await?; + } + Class::Connection(Connection::CloseOk {}) => { + writer.write_all(&[10, 51]).await?; + } + Class::Connection(Connection::Blocked { reason }) => { + writer.write_all(&[10, 60]).await?; + shortstr(reason, &mut writer).await?; + } + Class::Connection(Connection::Unblocked {}) => { + writer.write_all(&[10, 61]).await?; + } + Class::Channel(Channel::Open { reserved_1 }) => { + writer.write_all(&[20, 10]).await?; + shortstr(reserved_1, &mut writer).await?; + } + Class::Channel(Channel::OpenOk { reserved_1 }) => { + writer.write_all(&[20, 11]).await?; + longstr(reserved_1, &mut writer).await?; + } + Class::Channel(Channel::Flow { active }) => { + writer.write_all(&[20, 20]).await?; + bit(&[active], &mut writer).await?; + } + Class::Channel(Channel::FlowOk { active }) => { + writer.write_all(&[20, 21]).await?; + bit(&[active], &mut writer).await?; + } + Class::Channel(Channel::Close { + reply_code, + reply_text, + class_id, + method_id, + }) => { + writer.write_all(&[20, 40]).await?; + short(reply_code, &mut writer).await?; + shortstr(reply_text, &mut writer).await?; + short(class_id, &mut writer).await?; + short(method_id, &mut writer).await?; + } + Class::Channel(Channel::CloseOk {}) => { + writer.write_all(&[20, 41]).await?; + } + Class::Exchange(Exchange::Declare { + reserved_1, + exchange, + r#type, + passive, + durable, + reserved_2, + reserved_3, + no_wait, + arguments, + }) => { + writer.write_all(&[40, 10]).await?; + short(reserved_1, &mut writer).await?; + shortstr(exchange, &mut writer).await?; + shortstr(r#type, &mut writer).await?; + bit( + &[passive, durable, reserved_2, reserved_3, no_wait], + &mut writer, + ) + .await?; + table(arguments, &mut writer).await?; + } + Class::Exchange(Exchange::DeclareOk {}) => { + writer.write_all(&[40, 11]).await?; + } + Class::Exchange(Exchange::Delete { + reserved_1, + exchange, + if_unused, + no_wait, + }) => { + writer.write_all(&[40, 20]).await?; + short(reserved_1, &mut writer).await?; + shortstr(exchange, &mut writer).await?; + bit(&[if_unused, no_wait], &mut writer).await?; + } + Class::Exchange(Exchange::DeleteOk {}) => { + writer.write_all(&[40, 21]).await?; + } + Class::Queue(Queue::Declare { + reserved_1, + queue, + passive, + durable, + exclusive, + auto_delete, + no_wait, + arguments, + }) => { + writer.write_all(&[50, 10]).await?; + short(reserved_1, &mut writer).await?; + shortstr(queue, &mut writer).await?; + bit( + &[passive, durable, exclusive, auto_delete, no_wait], + &mut writer, + ) + .await?; + table(arguments, &mut writer).await?; + } + Class::Queue(Queue::DeclareOk { + queue, + message_count, + consumer_count, + }) => { + writer.write_all(&[50, 11]).await?; + shortstr(queue, &mut writer).await?; + long(message_count, &mut writer).await?; + long(consumer_count, &mut writer).await?; + } + Class::Queue(Queue::Bind { + reserved_1, + queue, + exchange, + routing_key, + no_wait, + arguments, + }) => { + writer.write_all(&[50, 20]).await?; + short(reserved_1, &mut writer).await?; + shortstr(queue, &mut writer).await?; + shortstr(exchange, &mut writer).await?; + shortstr(routing_key, &mut writer).await?; + bit(&[no_wait], &mut writer).await?; + table(arguments, &mut writer).await?; + } + Class::Queue(Queue::BindOk {}) => { + writer.write_all(&[50, 21]).await?; + } + Class::Queue(Queue::Unbind { + reserved_1, + queue, + exchange, + routing_key, + arguments, + }) => { + writer.write_all(&[50, 50]).await?; + short(reserved_1, &mut writer).await?; + shortstr(queue, &mut writer).await?; + shortstr(exchange, &mut writer).await?; + shortstr(routing_key, &mut writer).await?; + table(arguments, &mut writer).await?; + } + Class::Queue(Queue::UnbindOk {}) => { + writer.write_all(&[50, 51]).await?; + } + Class::Queue(Queue::Purge { + reserved_1, + queue, + no_wait, + }) => { + writer.write_all(&[50, 30]).await?; + short(reserved_1, &mut writer).await?; + shortstr(queue, &mut writer).await?; + bit(&[no_wait], &mut writer).await?; + } + Class::Queue(Queue::PurgeOk { message_count }) => { + writer.write_all(&[50, 31]).await?; + long(message_count, &mut writer).await?; + } + Class::Queue(Queue::Delete { + reserved_1, + queue, + if_unused, + if_empty, + no_wait, + }) => { + writer.write_all(&[50, 40]).await?; + short(reserved_1, &mut writer).await?; + shortstr(queue, &mut writer).await?; + bit(&[if_unused, if_empty, no_wait], &mut writer).await?; + } + Class::Queue(Queue::DeleteOk { message_count }) => { + writer.write_all(&[50, 41]).await?; + long(message_count, &mut writer).await?; + } + Class::Basic(Basic::Qos { + prefetch_size, + prefetch_count, + global, + }) => { + writer.write_all(&[60, 10]).await?; + long(prefetch_size, &mut writer).await?; + short(prefetch_count, &mut writer).await?; + bit(&[global], &mut writer).await?; + } + Class::Basic(Basic::QosOk {}) => { + writer.write_all(&[60, 11]).await?; + } + Class::Basic(Basic::Consume { + reserved_1, + queue, + consumer_tag, + no_local, + no_ack, + exclusive, + no_wait, + arguments, + }) => { + writer.write_all(&[60, 20]).await?; + short(reserved_1, &mut writer).await?; + shortstr(queue, &mut writer).await?; + shortstr(consumer_tag, &mut writer).await?; + bit(&[no_local, no_ack, exclusive, no_wait], &mut writer).await?; + table(arguments, &mut writer).await?; + } + Class::Basic(Basic::ConsumeOk { consumer_tag }) => { + writer.write_all(&[60, 21]).await?; + shortstr(consumer_tag, &mut writer).await?; + } + Class::Basic(Basic::Cancel { + consumer_tag, + no_wait, + }) => { + writer.write_all(&[60, 30]).await?; + shortstr(consumer_tag, &mut writer).await?; + bit(&[no_wait], &mut writer).await?; + } + Class::Basic(Basic::CancelOk { consumer_tag }) => { + writer.write_all(&[60, 31]).await?; + shortstr(consumer_tag, &mut writer).await?; + } + Class::Basic(Basic::Publish { + reserved_1, + exchange, + routing_key, + mandatory, + immediate, + }) => { + writer.write_all(&[60, 40]).await?; + short(reserved_1, &mut writer).await?; + shortstr(exchange, &mut writer).await?; + shortstr(routing_key, &mut writer).await?; + bit(&[mandatory, immediate], &mut writer).await?; + } + Class::Basic(Basic::Return { + reply_code, + reply_text, + exchange, + routing_key, + }) => { + writer.write_all(&[60, 50]).await?; + short(reply_code, &mut writer).await?; + shortstr(reply_text, &mut writer).await?; + shortstr(exchange, &mut writer).await?; + shortstr(routing_key, &mut writer).await?; + } + Class::Basic(Basic::Deliver { + consumer_tag, + delivery_tag, + redelivered, + exchange, + routing_key, + }) => { + writer.write_all(&[60, 60]).await?; + shortstr(consumer_tag, &mut writer).await?; + longlong(delivery_tag, &mut writer).await?; + bit(&[redelivered], &mut writer).await?; + shortstr(exchange, &mut writer).await?; + shortstr(routing_key, &mut writer).await?; + } + Class::Basic(Basic::Get { + reserved_1, + queue, + no_ack, + }) => { + writer.write_all(&[60, 70]).await?; + short(reserved_1, &mut writer).await?; + shortstr(queue, &mut writer).await?; + bit(&[no_ack], &mut writer).await?; + } + Class::Basic(Basic::GetOk { + delivery_tag, + redelivered, + exchange, + routing_key, + message_count, + }) => { + writer.write_all(&[60, 71]).await?; + longlong(delivery_tag, &mut writer).await?; + bit(&[redelivered], &mut writer).await?; + shortstr(exchange, &mut writer).await?; + shortstr(routing_key, &mut writer).await?; + long(message_count, &mut writer).await?; + } + Class::Basic(Basic::GetEmpty { reserved_1 }) => { + writer.write_all(&[60, 72]).await?; + shortstr(reserved_1, &mut writer).await?; + } + Class::Basic(Basic::Ack { + delivery_tag, + multiple, + }) => { + writer.write_all(&[60, 80]).await?; + longlong(delivery_tag, &mut writer).await?; + bit(&[multiple], &mut writer).await?; + } + Class::Basic(Basic::Reject { + delivery_tag, + requeue, + }) => { + writer.write_all(&[60, 90]).await?; + longlong(delivery_tag, &mut writer).await?; + bit(&[requeue], &mut writer).await?; + } + Class::Basic(Basic::RecoverAsync { requeue }) => { + writer.write_all(&[60, 100]).await?; + bit(&[requeue], &mut writer).await?; + } + Class::Basic(Basic::Recover { requeue }) => { + writer.write_all(&[60, 110]).await?; + bit(&[requeue], &mut writer).await?; + } + Class::Basic(Basic::RecoverOk {}) => { + writer.write_all(&[60, 111]).await?; + } + Class::Tx(Tx::Select {}) => { + writer.write_all(&[90, 10]).await?; + } + Class::Tx(Tx::SelectOk {}) => { + writer.write_all(&[90, 11]).await?; + } + Class::Tx(Tx::Commit {}) => { + writer.write_all(&[90, 20]).await?; + } + Class::Tx(Tx::CommitOk {}) => { + writer.write_all(&[90, 21]).await?; + } + Class::Tx(Tx::Rollback {}) => { + writer.write_all(&[90, 30]).await?; + } + Class::Tx(Tx::RollbackOk {}) => { + writer.write_all(&[90, 31]).await?; + } + } + Ok(()) + } } diff --git a/amqp_transport/src/classes/mod.rs b/amqp_transport/src/classes/mod.rs index f132d0e..6a9751c 100644 --- a/amqp_transport/src/classes/mod.rs +++ b/amqp_transport/src/classes/mod.rs @@ -1,4 +1,3 @@ -use crate::classes::generated::Class; use crate::error::{ConException, ProtocolError, TransError}; use std::collections::HashMap; @@ -35,7 +34,7 @@ pub enum FieldValue { pub use generated::*; /// Parses the payload of a method frame into the class/method -pub fn parse_method(payload: &[u8]) -> Result { +pub fn parse_method(payload: &[u8]) -> Result { let nom_result = generated::parse::parse_method(payload); match nom_result { diff --git a/amqp_transport/src/classes/write_helper.rs b/amqp_transport/src/classes/write_helper.rs index a2c8031..d83cf86 100644 --- a/amqp_transport/src/classes/write_helper.rs +++ b/amqp_transport/src/classes/write_helper.rs @@ -1,33 +1,32 @@ -use crate::classes::generated::{ - Bit, Long, Longlong, Longstr, Octet, Short, Shortstr, Table, Timestamp, -}; -use crate::classes::{FieldValue, TableFieldName}; -use crate::error::TransError; +use crate::classes::FieldValue; +use crate::classes::{Bit, Long, Longlong, Longstr, Octet, Short, Shortstr, Table, Timestamp}; +use crate::error::Result; use anyhow::Context; -use std::io; -use std::io::Write; +use std::future::Future; +use std::pin::Pin; +use tokio::io::AsyncWriteExt; -pub fn octet(value: Octet, writer: &mut W) -> Result<(), TransError> { - writer.write_all(&[value])?; +pub async fn octet(value: Octet, writer: &mut W) -> Result<()> { + writer.write_u8(value).await?; Ok(()) } -pub fn short(value: Short, writer: &mut W) -> Result<(), TransError> { - writer.write_all(&value.to_be_bytes())?; +pub async fn short(value: Short, writer: &mut W) -> Result<()> { + writer.write_u16(value).await?; Ok(()) } -pub fn long(value: Long, writer: &mut W) -> Result<(), TransError> { - writer.write_all(&value.to_be_bytes())?; +pub async fn long(value: Long, writer: &mut W) -> Result<()> { + writer.write_u32(value).await?; Ok(()) } -pub fn longlong(value: Longlong, writer: &mut W) -> Result<(), TransError> { - writer.write_all(&value.to_be_bytes())?; +pub async fn longlong(value: Longlong, writer: &mut W) -> Result<()> { + writer.write_u64(value).await?; Ok(()) } -pub fn bit(value: &[Bit], writer: &mut W) -> Result<(), TransError> { +pub async fn bit(value: &[Bit], writer: &mut W) -> Result<()> { // accumulate bits into bytes, starting from the least significant bit in each byte // how many bits have already been packed into `current_buf` @@ -36,7 +35,7 @@ pub fn bit(value: &[Bit], writer: &mut W) -> Result<(), TransError> { for &bit in value { if already_filled >= 8 { - writer.write_all(&[current_buf])?; + writer.write_u8(current_buf).await?; current_buf = 0; already_filled = 0; } @@ -47,146 +46,151 @@ pub fn bit(value: &[Bit], writer: &mut W) -> Result<(), TransError> { } if already_filled > 0 { - writer.write_all(&[current_buf])?; + writer.write_u8(current_buf).await?; } Ok(()) } -pub fn shortstr(value: Shortstr, writer: &mut W) -> Result<(), TransError> { +pub async fn shortstr(value: Shortstr, writer: &mut W) -> Result<()> { let len = u8::try_from(value.len()).context("shortstr too long")?; - writer.write_all(&[len])?; - writer.write_all(value.as_bytes())?; + writer.write_u8(len).await?; + writer.write_all(value.as_bytes()).await?; Ok(()) } -pub fn longstr(value: Longstr, writer: &mut W) -> Result<(), TransError> { +pub async fn longstr(value: Longstr, writer: &mut W) -> Result<()> { let len = u32::try_from(value.len()).context("longstr too long")?; - writer.write_all(&len.to_be_bytes())?; - writer.write_all(value.as_slice())?; + writer.write_u32(len).await?; + writer.write_all(value.as_slice()).await?; Ok(()) } -pub fn timestamp(value: Timestamp, writer: &mut W) -> Result<(), TransError> { - writer.write_all(&value.to_be_bytes())?; +pub async fn timestamp(value: Timestamp, writer: &mut W) -> Result<()> { + writer.write_u64(value).await?; Ok(()) } -pub fn table(table: Table, writer: &mut W) -> Result<(), TransError> { +pub async fn table(table: Table, writer: &mut W) -> Result<()> { let len = u32::try_from(table.len()).context("table too big")?; - writer.write_all(&len.to_be_bytes())?; + writer.write_u32(len).await?; for (field_name, value) in table { - shortstr(field_name, writer)?; - field_value(value, writer)?; + shortstr(field_name, writer).await?; + field_value(value, writer).await?; } Ok(()) } -fn field_value(value: FieldValue, writer: &mut W) -> Result<(), TransError> { - match value { - FieldValue::Boolean(bool) => { - writer.write_all(&[b't', u8::from(bool)])?; - } - FieldValue::ShortShortInt(int) => { - writer.write_all(b"b")?; - writer.write_all(&int.to_be_bytes())?; - } - FieldValue::ShortShortUInt(int) => { - writer.write_all(&[b'B', int])?; - } - FieldValue::ShortInt(int) => { - writer.write_all(b"U")?; - writer.write_all(&int.to_be_bytes())?; - } - FieldValue::ShortUInt(int) => { - writer.write_all(b"u")?; - writer.write_all(&int.to_be_bytes())?; - } - FieldValue::LongInt(int) => { - writer.write_all(b"I")?; - writer.write_all(&int.to_be_bytes())?; - } - FieldValue::LongUInt(int) => { - writer.write_all(b"i")?; - writer.write_all(&int.to_be_bytes())?; - } - FieldValue::LongLongInt(int) => { - writer.write_all(b"L")?; - writer.write_all(&int.to_be_bytes())?; - } - FieldValue::LongLongUInt(int) => { - writer.write_all(b"l")?; - writer.write_all(&int.to_be_bytes())?; - } - FieldValue::Float(float) => { - writer.write_all(b"f")?; - writer.write_all(&float.to_be_bytes())?; - } - FieldValue::Double(float) => { - writer.write_all(b"d")?; - writer.write_all(&float.to_be_bytes())?; - } - FieldValue::DecimalValue(scale, long) => { - writer.write_all(&[b'D', scale])?; - writer.write_all(&long.to_be_bytes())?; - } - FieldValue::ShortString(str) => { - writer.write_all(b"s")?; - shortstr(str, writer)?; - } - FieldValue::LongString(str) => { - writer.write_all(b"S")?; - longstr(str, writer)?; - } - FieldValue::FieldArray(array) => { - writer.write_all(b"A")?; - let len = u32::try_from(array.len()).context("array too long")?; - writer.write_all(&len.to_be_bytes())?; +fn field_value( + value: FieldValue, + writer: &mut W, +) -> Pin> + '_>> { + Box::pin(async { + match value { + FieldValue::Boolean(bool) => { + writer.write_all(&[b't', u8::from(bool)]).await?; + } + FieldValue::ShortShortInt(int) => { + writer.write_all(b"b").await?; + writer.write_all(&int.to_be_bytes()).await?; + } + FieldValue::ShortShortUInt(int) => { + writer.write_all(&[b'B', int]).await?; + } + FieldValue::ShortInt(int) => { + writer.write_all(b"U").await?; + writer.write_all(&int.to_be_bytes()).await?; + } + FieldValue::ShortUInt(int) => { + writer.write_all(b"u").await?; + writer.write_all(&int.to_be_bytes()).await?; + } + FieldValue::LongInt(int) => { + writer.write_all(b"I").await?; + writer.write_all(&int.to_be_bytes()).await?; + } + FieldValue::LongUInt(int) => { + writer.write_all(b"i").await?; + writer.write_all(&int.to_be_bytes()).await?; + } + FieldValue::LongLongInt(int) => { + writer.write_all(b"L").await?; + writer.write_all(&int.to_be_bytes()).await?; + } + FieldValue::LongLongUInt(int) => { + writer.write_all(b"l").await?; + writer.write_all(&int.to_be_bytes()).await?; + } + FieldValue::Float(float) => { + writer.write_all(b"f").await?; + writer.write_all(&float.to_be_bytes()).await?; + } + FieldValue::Double(float) => { + writer.write_all(b"d").await?; + writer.write_all(&float.to_be_bytes()).await?; + } + FieldValue::DecimalValue(scale, long) => { + writer.write_all(&[b'D', scale]).await?; + writer.write_all(&long.to_be_bytes()).await?; + } + FieldValue::ShortString(str) => { + writer.write_all(b"s").await?; + shortstr(str, writer).await?; + } + FieldValue::LongString(str) => { + writer.write_all(b"S").await?; + longstr(str, writer).await?; + } + FieldValue::FieldArray(array) => { + writer.write_all(b"A").await?; + let len = u32::try_from(array.len()).context("array too long")?; + writer.write_all(&len.to_be_bytes()).await?; - for element in array { - field_value(element, writer)?; + for element in array { + field_value(element, writer).await?; + } + } + FieldValue::Timestamp(time) => { + writer.write_all(b"T").await?; + writer.write_all(&time.to_be_bytes()).await?; + } + FieldValue::FieldTable(_) => { + writer.write_all(b"F").await?; + } + FieldValue::Void => { + writer.write_all(b"V").await?; } } - FieldValue::Timestamp(time) => { - writer.write_all(b"T")?; - writer.write_all(&time.to_be_bytes())?; - } - FieldValue::FieldTable(_) => { - writer.write_all(b"F")?; - } - FieldValue::Void => { - writer.write_all(b"V")?; - } - } - Ok(()) + Ok(()) + }) } #[cfg(test)] mod tests { - #[test] - fn pack_few_bits() { + #[tokio::test] + async fn pack_few_bits() { let bits = vec![true, false, true]; - let mut buffer = [0u8; 1]; - super::bit(bits, &mut buffer.as_mut_slice()).unwrap(); + let mut buffer = Vec::new(); + super::bit(&bits, &mut buffer).await.unwrap(); - assert_eq!(buffer, [0b00000101]) + assert_eq!(buffer.as_slice(), &[0b00000101]) } - #[test] - fn pack_many_bits() { + #[tokio::test] + async fn pack_many_bits() { let bits = vec![ /* first 8 */ true, true, true, true, false, false, false, false, /* second 4 */ true, false, true, true, ]; - let mut buffer = [0u8; 2]; - super::bit(bits, &mut buffer.as_mut_slice()).unwrap(); + let mut buffer = Vec::new(); + super::bit(&bits, &mut buffer).await.unwrap(); assert_eq!(buffer, [0b00001111, 0b00001101]); } diff --git a/amqp_transport/src/connection.rs b/amqp_transport/src/connection.rs index 8c57d4c..b467b14 100644 --- a/amqp_transport/src/connection.rs +++ b/amqp_transport/src/connection.rs @@ -1,32 +1,39 @@ -use crate::error::{ProtocolError, TransError}; -use crate::frame; -use crate::frame::FrameType; +use crate::error::{ProtocolError, Result}; +use crate::frame::{Frame, FrameType}; +use crate::{classes, frame}; use anyhow::Context; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpStream; -use tracing::{debug, error}; +use tracing::{debug, error, warn}; + +const MIN_MAX_FRAME_SIZE: usize = 4096; pub struct Connection { stream: TcpStream, + max_frame_size: usize, } impl Connection { pub fn new(stream: TcpStream) -> Self { - Self { stream } + Self { + stream, + max_frame_size: MIN_MAX_FRAME_SIZE, + } } - pub async fn start(mut self) { + pub async fn open_connection(mut self) { match self.run().await { Ok(()) => {} Err(err) => error!(%err, "Error during processing of connection"), } } - pub async fn run(&mut self) -> Result<(), TransError> { + pub async fn run(&mut self) -> Result<()> { self.negotiate_version().await?; + self.start().await?; loop { - let frame = frame::read_frame(&mut self.stream, 10000).await?; + let frame = frame::read_frame(&mut self.stream, self.max_frame_size).await?; debug!(?frame, "received frame"); if frame.kind == FrameType::Method { let class = super::classes::parse_method(&frame.payload)?; @@ -35,7 +42,25 @@ impl Connection { } } - async fn negotiate_version(&mut self) -> Result<(), TransError> { + async fn start(&mut self) -> Result<()> { + let start_method = classes::Class::Connection(classes::Connection::Start { + version_major: 0, + version_minor: 9, + server_properties: Default::default(), + mechanisms: vec![], + locales: vec![], + }); + + let fut = classes::write::write_method(start_method, &mut self.stream); + warn!(size = %std::mem::size_of_val(&fut), "that future is big"); + // todo fix out_buffer buffer things :spiral_eyes: + // maybe have a `size` method on `Class` and use `AsyncWrite`? oh god no that's horrible + // frame::write_frame(&mut self.stream, Frame {})?; + + Ok(()) + } + + async fn negotiate_version(&mut self) -> Result<()> { const HEADER_SIZE: usize = 8; const SUPPORTED_PROTOCOL_VERSION: &[u8] = &[0, 9, 1]; const OWN_PROTOCOL_HEADER: &[u8] = b"AMQP\0\0\x09\x01"; diff --git a/amqp_transport/src/error.rs b/amqp_transport/src/error.rs index 8520a9f..7b7618f 100644 --- a/amqp_transport/src/error.rs +++ b/amqp_transport/src/error.rs @@ -1,5 +1,7 @@ use std::io::Error; +pub type Result = std::result::Result; + #[derive(Debug, thiserror::Error)] pub enum TransError { #[error("{0}")] diff --git a/amqp_transport/src/frame.rs b/amqp_transport/src/frame.rs index bcc2eaf..5571649 100644 --- a/amqp_transport/src/frame.rs +++ b/amqp_transport/src/frame.rs @@ -1,6 +1,6 @@ -use crate::error::{ConException, ProtocolError, TransError}; +use crate::error::{ConException, ProtocolError, Result, TransError}; use anyhow::Context; -use tokio::io::AsyncReadExt; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; const REQUIRED_FRAME_END: u8 = 0xCE; @@ -20,7 +20,7 @@ pub struct Frame { pub payload: Vec, } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] #[repr(u8)] pub enum FrameType { Method = 1, @@ -29,7 +29,21 @@ pub enum FrameType { Heartbeat = 8, } -pub async fn read_frame(r: &mut R, max_frame_size: usize) -> Result +pub async fn write_frame(mut w: W, frame: &Frame) -> Result<()> +where + W: AsyncWriteExt + Unpin, +{ + w.write_u8(frame.kind as u8).await?; + w.write_u16(frame.channel).await?; + w.write_u32(u32::try_from(frame.payload.len()).context("frame size too big")?) + .await?; + w.write_all(&frame.payload).await?; + w.write_u8(REQUIRED_FRAME_END).await?; + + Ok(()) +} + +pub async fn read_frame(r: &mut R, max_frame_size: usize) -> Result where R: AsyncReadExt + Unpin, { @@ -59,7 +73,7 @@ where }) } -fn parse_frame_type(kind: u8, channel: u16) -> Result { +fn parse_frame_type(kind: u8, channel: u16) -> Result { match kind { frame_type::METHOD => Ok(FrameType::Method), frame_type::HEADER => Ok(FrameType::Header), diff --git a/amqp_transport/src/lib.rs b/amqp_transport/src/lib.rs index bae2e80..f9685c0 100644 --- a/amqp_transport/src/lib.rs +++ b/amqp_transport/src/lib.rs @@ -22,6 +22,6 @@ pub async fn do_thing_i_guess() -> Result<()> { let connection = Connection::new(stream); - tokio::spawn(connection.start()); + tokio::spawn(connection.open_connection()); } }