From 2455e95d45ef965812bd3c90c688304c01b65e43 Mon Sep 17 00:00:00 2001 From: Nilstrieb <48135649+Nilstrieb@users.noreply.github.com> Date: Sun, 13 Feb 2022 20:51:24 +0100 Subject: [PATCH] parsing --- Cargo.lock | 76 + amqp_transport/Cargo.toml | 2 +- amqp_transport/src/classes/generated.rs | 2375 ++++++++++---------- amqp_transport/src/classes/mod.rs | 28 + amqp_transport/src/classes/parse_helper.rs | 168 +- amqp_transport/src/connection.rs | 17 +- amqp_transport/src/error.rs | 2 + 7 files changed, 1493 insertions(+), 1175 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6efc24c..cac33ce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,21 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "addr2line" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9ecd88a8c8378ca913a680cd98f0f13ac67383d35993f86c90a70e3f137816b" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + [[package]] name = "aho-corasick" version = "0.7.18" @@ -61,6 +76,30 @@ name = "anyhow" version = "1.0.53" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94a45b455c14666b85fc40a019e8ab9eb75e3a124e05494f5397122bc9eb06e0" +dependencies = [ + "backtrace", +] + +[[package]] +name = "autocfg" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" + +[[package]] +name = "backtrace" +version = "0.3.64" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e121dee8023ce33ab248d9ce1493df03c3b38a659b240096fcbd7048ff9c31f" +dependencies = [ + "addr2line", + "cc", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", +] [[package]] name = "bitflags" @@ -74,6 +113,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" +[[package]] +name = "cc" +version = "1.0.72" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22a9137b95ea06864e018375b72adfb7db6e6f68cfc8df5a04d00288050485ee" + [[package]] name = "cfg-if" version = "1.0.0" @@ -97,6 +142,12 @@ dependencies = [ "wasi", ] +[[package]] +name = "gimli" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78cc372d058dcf6d5ecd98510e7fbc9e5aec4d21de70f65fea8fecebcd881bd4" + [[package]] name = "heck" version = "0.4.0" @@ -178,6 +229,16 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" +[[package]] +name = "miniz_oxide" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a92518e98c078586bc6c934028adcca4c92a53d6a958196de835170a01d84e4b" +dependencies = [ + "adler", + "autocfg", +] + [[package]] name = "mio" version = "0.7.14" @@ -230,6 +291,15 @@ dependencies = [ "libc", ] +[[package]] +name = "object" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67ac1d3f9a1d3616fd9a60c8d74296f22406a238b6a72f5cc1e6f314df4ffbf9" +dependencies = [ + "memchr", +] + [[package]] name = "once_cell" version = "1.9.0" @@ -357,6 +427,12 @@ version = "0.6.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b" +[[package]] +name = "rustc-demangle" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342" + [[package]] name = "scopeguard" version = "1.1.0" diff --git a/amqp_transport/Cargo.toml b/amqp_transport/Cargo.toml index c9fd802..7129bf7 100644 --- a/amqp_transport/Cargo.toml +++ b/amqp_transport/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -anyhow = "1.0.53" +anyhow = { version = "1.0.53", features = ["backtrace"] } nom = "7.1.0" once_cell = "1.9.0" rand = "0.8.4" diff --git a/amqp_transport/src/classes/generated.rs b/amqp_transport/src/classes/generated.rs index 6743a57..534b8f7 100644 --- a/amqp_transport/src/classes/generated.rs +++ b/amqp_transport/src/classes/generated.rs @@ -87,9 +87,7 @@ pub enum Connection { locale: Shortstr, }, /// Index 20 - Secure { - challenge: Longstr, - }, + Secure { challenge: Longstr }, /// Index 21 SecureOk { /// must not be null @@ -115,9 +113,7 @@ pub enum Connection { reserved_2: Bit, }, /// Index 41 - OpenOk { - reserved_1: Shortstr, - }, + OpenOk { reserved_1: Shortstr }, /// Index 50 Close { reply_code: ReplyCode, @@ -128,9 +124,7 @@ pub enum Connection { /// Index 51 CloseOk, /// Index 60 - Blocked { - reason: Shortstr, - }, + Blocked { reason: Shortstr }, /// Index 61 Unblocked, } @@ -138,21 +132,13 @@ pub enum Connection { #[derive(Debug, Clone, PartialEq)] pub enum Channel { /// Index 10 - Open { - reserved_1: Shortstr, - }, + Open { reserved_1: Shortstr }, /// Index 11 - OpenOk { - reserved_1: Longstr, - }, + OpenOk { reserved_1: Longstr }, /// Index 20 - Flow { - active: Bit, - }, + Flow { active: Bit }, /// Index 21 - FlowOk { - active: Bit, - }, + FlowOk { active: Bit }, /// Index 40 Close { reply_code: ReplyCode, @@ -241,9 +227,7 @@ pub enum Queue { no_wait: NoWait, }, /// Index 31 - PurgeOk { - message_count: MessageCount, - }, + PurgeOk { message_count: MessageCount }, /// Index 40 Delete { reserved_1: Short, @@ -253,9 +237,7 @@ pub enum Queue { no_wait: NoWait, }, /// Index 41 - DeleteOk { - message_count: MessageCount, - }, + DeleteOk { message_count: MessageCount }, } /// Index 60, handler = channel #[derive(Debug, Clone, PartialEq)] @@ -280,18 +262,14 @@ pub enum Basic { arguments: Table, }, /// Index 21 - ConsumeOk { - consumer_tag: ConsumerTag, - }, + ConsumeOk { consumer_tag: ConsumerTag }, /// Index 30 Cancel { consumer_tag: ConsumerTag, no_wait: NoWait, }, /// Index 31 - CancelOk { - consumer_tag: ConsumerTag, - }, + CancelOk { consumer_tag: ConsumerTag }, /// Index 40 Publish { reserved_1: Short, @@ -330,9 +308,7 @@ pub enum Basic { message_count: MessageCount, }, /// Index 72 - GetEmpty { - reserved_1: Shortstr, - }, + GetEmpty { reserved_1: Shortstr }, /// Index 80 Ack { delivery_tag: DeliveryTag, @@ -344,13 +320,9 @@ pub enum Basic { requeue: Bit, }, /// Index 100 - RecoverAsync { - requeue: Bit, - }, + RecoverAsync { requeue: Bit }, /// Index 110 - Recover { - requeue: Bit, - }, + Recover { requeue: Bit }, /// Index 111 RecoverOk, } @@ -371,1127 +343,1214 @@ pub enum Tx { RollbackOk, } pub mod parse { -use super::*; -use crate::classes::parse_helper::*; -use crate::error::TransError; -use nom::{branch::alt, bytes::complete::tag}; -use regex::Regex; -use once_cell::sync::Lazy; + use super::*; + use crate::classes::parse_helper::*; + use crate::error::TransError; + use nom::{branch::alt, bytes::complete::tag}; + use once_cell::sync::Lazy; + use regex::Regex; -pub type IResult<'a, T> = nom::IResult<&'a [u8], T, TransError>; - -pub fn parse_method(input: &[u8]) -> Result<(&[u8], Class), nom::Err> { - alt((connection, channel, exchange, queue, basic, tx))(input) -} -fn domain_class_id(input: &[u8]) -> IResult { - short(input) -} -fn domain_consumer_tag(input: &[u8]) -> IResult { - shortstr(input) -} -fn domain_delivery_tag(input: &[u8]) -> IResult { - longlong(input) -} -fn domain_exchange_name(input: &[u8]) -> IResult { - let (input, result) = shortstr(input)?; - if result.len() > 127 { fail!() } - static REGEX: Lazy = Lazy::new(|| Regex::new(r"^[a-zA-Z0-9-_.:]*$").unwrap()); - if !REGEX.is_match(&result) { fail!() } - Ok((input, result)) -} -fn domain_method_id(input: &[u8]) -> IResult { - short(input) -} -fn domain_path(input: &[u8]) -> IResult { - let (input, result) = shortstr(input)?; - if result.is_empty() { fail!() } - if result.len() > 127 { fail!() } - Ok((input, result)) -} -fn domain_peer_properties(input: &[u8]) -> IResult { - table(input) -} -fn domain_queue_name(input: &[u8]) -> IResult { - let (input, result) = shortstr(input)?; - if result.len() > 127 { fail!() } - static REGEX: Lazy = Lazy::new(|| Regex::new(r"^[a-zA-Z0-9-_.:]*$").unwrap()); - if !REGEX.is_match(&result) { fail!() } - Ok((input, result)) -} -fn domain_message_count(input: &[u8]) -> IResult { - long(input) -} -fn domain_reply_code(input: &[u8]) -> IResult { - let (input, result) = short(input)?; - if result == 0 { fail!() } - Ok((input, result)) -} -fn domain_reply_text(input: &[u8]) -> IResult { - let (input, result) = shortstr(input)?; - if result.is_empty() { fail!() } - Ok((input, result)) -} -fn domain_octet(input: &[u8]) -> IResult { - octet(input) -} -fn domain_short(input: &[u8]) -> IResult { - short(input) -} -fn domain_long(input: &[u8]) -> IResult { - long(input) -} -fn domain_longlong(input: &[u8]) -> IResult { - longlong(input) -} -fn domain_shortstr(input: &[u8]) -> IResult { - shortstr(input) -} -fn domain_longstr(input: &[u8]) -> IResult { - longstr(input) -} -fn domain_timestamp(input: &[u8]) -> IResult { - timestamp(input) -} -fn domain_table(input: &[u8]) -> IResult { - table(input) -} -fn connection(input: &[u8]) -> IResult { - let (input, _) = tag([10])(input)?; - alt((connection_start, connection_start_ok, connection_secure, connection_secure_ok, connection_tune, connection_tune_ok, connection_open, connection_open_ok, connection_close, connection_close_ok, connection_blocked, connection_unblocked))(input) -} -fn connection_start(input: &[u8]) -> IResult { - let (input, _) = tag([10])(input)?; - let (input, version_major) = domain_octet(input)?; - let (input, version_minor) = domain_octet(input)?; - let (input, server_properties) = domain_peer_properties(input)?; - let (input, mechanisms) = domain_longstr(input)?; - if mechanisms.is_empty() { fail!() } - let (input, locales) = domain_longstr(input)?; - if locales.is_empty() { fail!() } - Ok((input, Class::Connection(Connection::Start { - version_major, - version_minor, - server_properties, - mechanisms, - locales, - }))) -} -fn connection_start_ok(input: &[u8]) -> IResult { - let (input, _) = tag([11])(input)?; - let (input, client_properties) = domain_peer_properties(input)?; - let (input, mechanism) = domain_shortstr(input)?; - if mechanism.is_empty() { fail!() } - let (input, response) = domain_longstr(input)?; - if response.is_empty() { fail!() } - let (input, locale) = domain_shortstr(input)?; - if locale.is_empty() { fail!() } - Ok((input, Class::Connection(Connection::StartOk { - client_properties, - mechanism, - response, - locale, - }))) -} -fn connection_secure(input: &[u8]) -> IResult { - let (input, _) = tag([20])(input)?; - let (input, challenge) = domain_longstr(input)?; - Ok((input, Class::Connection(Connection::Secure { - challenge, - }))) -} -fn connection_secure_ok(input: &[u8]) -> IResult { - let (input, _) = tag([21])(input)?; - let (input, response) = domain_longstr(input)?; - if response.is_empty() { fail!() } - Ok((input, Class::Connection(Connection::SecureOk { - response, - }))) -} -fn connection_tune(input: &[u8]) -> IResult { - let (input, _) = tag([30])(input)?; - let (input, channel_max) = domain_short(input)?; - let (input, frame_max) = domain_long(input)?; - let (input, heartbeat) = domain_short(input)?; - Ok((input, Class::Connection(Connection::Tune { - channel_max, - frame_max, - heartbeat, - }))) -} -fn connection_tune_ok(input: &[u8]) -> IResult { - let (input, _) = tag([31])(input)?; - let (input, channel_max) = domain_short(input)?; - if channel_max == 0 { fail!() } - let (input, frame_max) = domain_long(input)?; - let (input, heartbeat) = domain_short(input)?; - Ok((input, Class::Connection(Connection::TuneOk { - channel_max, - frame_max, - heartbeat, - }))) -} -fn connection_open(input: &[u8]) -> IResult { - let (input, _) = tag([40])(input)?; - let (input, virtual_host) = domain_path(input)?; - let (input, reserved_1) = domain_shortstr(input)?; - let (input, bits) = bit(input, 1)?; - let reserved_2 = bits[0]; - Ok((input, Class::Connection(Connection::Open { - virtual_host, - reserved_1, - reserved_2, - }))) -} -fn connection_open_ok(input: &[u8]) -> IResult { - let (input, _) = tag([41])(input)?; - let (input, reserved_1) = domain_shortstr(input)?; - Ok((input, Class::Connection(Connection::OpenOk { - reserved_1, - }))) -} -fn connection_close(input: &[u8]) -> IResult { - let (input, _) = tag([50])(input)?; - let (input, reply_code) = domain_reply_code(input)?; - let (input, reply_text) = domain_reply_text(input)?; - let (input, class_id) = domain_class_id(input)?; - let (input, method_id) = domain_method_id(input)?; - Ok((input, Class::Connection(Connection::Close { - reply_code, - reply_text, - class_id, - method_id, - }))) -} -fn connection_close_ok(input: &[u8]) -> IResult { - let (input, _) = tag([51])(input)?; - Ok((input, Class::Connection(Connection::CloseOk { - }))) -} -fn connection_blocked(input: &[u8]) -> IResult { - let (input, _) = tag([60])(input)?; - let (input, reason) = domain_shortstr(input)?; - Ok((input, Class::Connection(Connection::Blocked { - reason, - }))) -} -fn connection_unblocked(input: &[u8]) -> IResult { - let (input, _) = tag([61])(input)?; - Ok((input, Class::Connection(Connection::Unblocked { - }))) -} -fn channel(input: &[u8]) -> IResult { - let (input, _) = tag([20])(input)?; - alt((channel_open, channel_open_ok, channel_flow, channel_flow_ok, channel_close, channel_close_ok))(input) -} -fn channel_open(input: &[u8]) -> IResult { - let (input, _) = tag([10])(input)?; - let (input, reserved_1) = domain_shortstr(input)?; - Ok((input, Class::Channel(Channel::Open { - reserved_1, - }))) -} -fn channel_open_ok(input: &[u8]) -> IResult { - let (input, _) = tag([11])(input)?; - let (input, reserved_1) = domain_longstr(input)?; - Ok((input, Class::Channel(Channel::OpenOk { - reserved_1, - }))) -} -fn channel_flow(input: &[u8]) -> IResult { - let (input, _) = tag([20])(input)?; - let (input, bits) = bit(input, 1)?; - let active = bits[0]; - Ok((input, Class::Channel(Channel::Flow { - active, - }))) -} -fn channel_flow_ok(input: &[u8]) -> IResult { - let (input, _) = tag([21])(input)?; - let (input, bits) = bit(input, 1)?; - let active = bits[0]; - Ok((input, Class::Channel(Channel::FlowOk { - active, - }))) -} -fn channel_close(input: &[u8]) -> IResult { - let (input, _) = tag([40])(input)?; - let (input, reply_code) = domain_reply_code(input)?; - let (input, reply_text) = domain_reply_text(input)?; - let (input, class_id) = domain_class_id(input)?; - let (input, method_id) = domain_method_id(input)?; - Ok((input, Class::Channel(Channel::Close { - reply_code, - reply_text, - class_id, - method_id, - }))) -} -fn channel_close_ok(input: &[u8]) -> IResult { - let (input, _) = tag([41])(input)?; - Ok((input, Class::Channel(Channel::CloseOk { - }))) -} -fn exchange(input: &[u8]) -> IResult { - let (input, _) = tag([40])(input)?; - alt((exchange_declare, exchange_declare_ok, exchange_delete, exchange_delete_ok))(input) -} -fn exchange_declare(input: &[u8]) -> IResult { - let (input, _) = tag([10])(input)?; - let (input, reserved_1) = domain_short(input)?; - let (input, exchange) = domain_exchange_name(input)?; - if exchange.is_empty() { fail!() } - let (input, r#type) = domain_shortstr(input)?; - let (input, bits) = bit(input, 5)?; - let passive = bits[0]; - let durable = bits[1]; - let reserved_2 = bits[2]; - let reserved_3 = bits[3]; - let no_wait = bits[4]; - let (input, arguments) = domain_table(input)?; - Ok((input, Class::Exchange(Exchange::Declare { - reserved_1, - exchange, - r#type, - passive, - durable, - reserved_2, - reserved_3, - no_wait, - arguments, - }))) -} -fn exchange_declare_ok(input: &[u8]) -> IResult { - let (input, _) = tag([11])(input)?; - Ok((input, Class::Exchange(Exchange::DeclareOk { - }))) -} -fn exchange_delete(input: &[u8]) -> IResult { - let (input, _) = tag([20])(input)?; - let (input, reserved_1) = domain_short(input)?; - let (input, exchange) = domain_exchange_name(input)?; - if exchange.is_empty() { fail!() } - let (input, bits) = bit(input, 2)?; - let if_unused = bits[0]; - let no_wait = bits[1]; - Ok((input, Class::Exchange(Exchange::Delete { - reserved_1, - exchange, - if_unused, - no_wait, - }))) -} -fn exchange_delete_ok(input: &[u8]) -> IResult { - let (input, _) = tag([21])(input)?; - Ok((input, Class::Exchange(Exchange::DeleteOk { - }))) -} -fn queue(input: &[u8]) -> IResult { - let (input, _) = tag([50])(input)?; - alt((queue_declare, queue_declare_ok, queue_bind, queue_bind_ok, queue_unbind, queue_unbind_ok, queue_purge, queue_purge_ok, queue_delete, queue_delete_ok))(input) -} -fn queue_declare(input: &[u8]) -> IResult { - let (input, _) = tag([10])(input)?; - let (input, reserved_1) = domain_short(input)?; - let (input, queue) = domain_queue_name(input)?; - let (input, bits) = bit(input, 5)?; - let passive = bits[0]; - let durable = bits[1]; - let exclusive = bits[2]; - let auto_delete = bits[3]; - let no_wait = bits[4]; - let (input, arguments) = domain_table(input)?; - Ok((input, Class::Queue(Queue::Declare { - reserved_1, - queue, - passive, - durable, - exclusive, - auto_delete, - no_wait, - arguments, - }))) -} -fn queue_declare_ok(input: &[u8]) -> IResult { - let (input, _) = tag([11])(input)?; - let (input, queue) = domain_queue_name(input)?; - if queue.is_empty() { fail!() } - let (input, message_count) = domain_message_count(input)?; - let (input, consumer_count) = domain_long(input)?; - Ok((input, Class::Queue(Queue::DeclareOk { - queue, - message_count, - consumer_count, - }))) -} -fn queue_bind(input: &[u8]) -> IResult { - let (input, _) = tag([20])(input)?; - let (input, reserved_1) = domain_short(input)?; - let (input, queue) = domain_queue_name(input)?; - let (input, exchange) = domain_exchange_name(input)?; - let (input, routing_key) = domain_shortstr(input)?; - let (input, bits) = bit(input, 1)?; - let no_wait = bits[0]; - let (input, arguments) = domain_table(input)?; - Ok((input, Class::Queue(Queue::Bind { - reserved_1, - queue, - exchange, - routing_key, - no_wait, - arguments, - }))) -} -fn queue_bind_ok(input: &[u8]) -> IResult { - let (input, _) = tag([21])(input)?; - Ok((input, Class::Queue(Queue::BindOk { - }))) -} -fn queue_unbind(input: &[u8]) -> IResult { - let (input, _) = tag([50])(input)?; - let (input, reserved_1) = domain_short(input)?; - let (input, queue) = domain_queue_name(input)?; - let (input, exchange) = domain_exchange_name(input)?; - let (input, routing_key) = domain_shortstr(input)?; - let (input, arguments) = domain_table(input)?; - Ok((input, Class::Queue(Queue::Unbind { - reserved_1, - queue, - exchange, - routing_key, - arguments, - }))) -} -fn queue_unbind_ok(input: &[u8]) -> IResult { - let (input, _) = tag([51])(input)?; - Ok((input, Class::Queue(Queue::UnbindOk { - }))) -} -fn queue_purge(input: &[u8]) -> IResult { - let (input, _) = tag([30])(input)?; - let (input, reserved_1) = domain_short(input)?; - let (input, queue) = domain_queue_name(input)?; - let (input, bits) = bit(input, 1)?; - let no_wait = bits[0]; - Ok((input, Class::Queue(Queue::Purge { - reserved_1, - queue, - no_wait, - }))) -} -fn queue_purge_ok(input: &[u8]) -> IResult { - let (input, _) = tag([31])(input)?; - let (input, message_count) = domain_message_count(input)?; - Ok((input, Class::Queue(Queue::PurgeOk { - message_count, - }))) -} -fn queue_delete(input: &[u8]) -> IResult { - let (input, _) = tag([40])(input)?; - let (input, reserved_1) = domain_short(input)?; - let (input, queue) = domain_queue_name(input)?; - let (input, bits) = bit(input, 3)?; - let if_unused = bits[0]; - let if_empty = bits[1]; - let no_wait = bits[2]; - Ok((input, Class::Queue(Queue::Delete { - reserved_1, - queue, - if_unused, - if_empty, - no_wait, - }))) -} -fn queue_delete_ok(input: &[u8]) -> IResult { - let (input, _) = tag([41])(input)?; - let (input, message_count) = domain_message_count(input)?; - Ok((input, Class::Queue(Queue::DeleteOk { - message_count, - }))) -} -fn basic(input: &[u8]) -> IResult { - let (input, _) = tag([60])(input)?; - alt((basic_qos, basic_qos_ok, basic_consume, basic_consume_ok, basic_cancel, basic_cancel_ok, basic_publish, basic_return, basic_deliver, basic_get, basic_get_ok, basic_get_empty, basic_ack, basic_reject, basic_recover_async, basic_recover, basic_recover_ok))(input) -} -fn basic_qos(input: &[u8]) -> IResult { - let (input, _) = tag([10])(input)?; - let (input, prefetch_size) = domain_long(input)?; - let (input, prefetch_count) = domain_short(input)?; - let (input, bits) = bit(input, 1)?; - let global = bits[0]; - Ok((input, Class::Basic(Basic::Qos { - prefetch_size, - prefetch_count, - global, - }))) -} -fn basic_qos_ok(input: &[u8]) -> IResult { - let (input, _) = tag([11])(input)?; - Ok((input, Class::Basic(Basic::QosOk { - }))) -} -fn basic_consume(input: &[u8]) -> IResult { - let (input, _) = tag([20])(input)?; - let (input, reserved_1) = domain_short(input)?; - let (input, queue) = domain_queue_name(input)?; - let (input, consumer_tag) = domain_consumer_tag(input)?; - let (input, bits) = bit(input, 4)?; - let no_local = bits[0]; - let no_ack = bits[1]; - let exclusive = bits[2]; - let no_wait = bits[3]; - let (input, arguments) = domain_table(input)?; - Ok((input, Class::Basic(Basic::Consume { - reserved_1, - queue, - consumer_tag, - no_local, - no_ack, - exclusive, - no_wait, - arguments, - }))) -} -fn basic_consume_ok(input: &[u8]) -> IResult { - let (input, _) = tag([21])(input)?; - let (input, consumer_tag) = domain_consumer_tag(input)?; - Ok((input, Class::Basic(Basic::ConsumeOk { - consumer_tag, - }))) -} -fn basic_cancel(input: &[u8]) -> IResult { - let (input, _) = tag([30])(input)?; - let (input, consumer_tag) = domain_consumer_tag(input)?; - let (input, bits) = bit(input, 1)?; - let no_wait = bits[0]; - Ok((input, Class::Basic(Basic::Cancel { - consumer_tag, - no_wait, - }))) -} -fn basic_cancel_ok(input: &[u8]) -> IResult { - let (input, _) = tag([31])(input)?; - let (input, consumer_tag) = domain_consumer_tag(input)?; - Ok((input, Class::Basic(Basic::CancelOk { - consumer_tag, - }))) -} -fn basic_publish(input: &[u8]) -> IResult { - let (input, _) = tag([40])(input)?; - let (input, reserved_1) = domain_short(input)?; - let (input, exchange) = domain_exchange_name(input)?; - let (input, routing_key) = domain_shortstr(input)?; - let (input, bits) = bit(input, 2)?; - let mandatory = bits[0]; - let immediate = bits[1]; - Ok((input, Class::Basic(Basic::Publish { - reserved_1, - exchange, - routing_key, - mandatory, - immediate, - }))) -} -fn basic_return(input: &[u8]) -> IResult { - let (input, _) = tag([50])(input)?; - let (input, reply_code) = domain_reply_code(input)?; - let (input, reply_text) = domain_reply_text(input)?; - let (input, exchange) = domain_exchange_name(input)?; - let (input, routing_key) = domain_shortstr(input)?; - Ok((input, Class::Basic(Basic::Return { - reply_code, - reply_text, - exchange, - routing_key, - }))) -} -fn basic_deliver(input: &[u8]) -> IResult { - let (input, _) = tag([60])(input)?; - let (input, consumer_tag) = domain_consumer_tag(input)?; - let (input, delivery_tag) = domain_delivery_tag(input)?; - let (input, bits) = bit(input, 1)?; - let redelivered = bits[0]; - let (input, exchange) = domain_exchange_name(input)?; - let (input, routing_key) = domain_shortstr(input)?; - Ok((input, Class::Basic(Basic::Deliver { - consumer_tag, - delivery_tag, - redelivered, - exchange, - routing_key, - }))) -} -fn basic_get(input: &[u8]) -> IResult { - let (input, _) = tag([70])(input)?; - let (input, reserved_1) = domain_short(input)?; - let (input, queue) = domain_queue_name(input)?; - let (input, bits) = bit(input, 1)?; - let no_ack = bits[0]; - Ok((input, Class::Basic(Basic::Get { - reserved_1, - queue, - no_ack, - }))) -} -fn basic_get_ok(input: &[u8]) -> IResult { - let (input, _) = tag([71])(input)?; - let (input, delivery_tag) = domain_delivery_tag(input)?; - let (input, bits) = bit(input, 1)?; - let redelivered = bits[0]; - let (input, exchange) = domain_exchange_name(input)?; - let (input, routing_key) = domain_shortstr(input)?; - let (input, message_count) = domain_message_count(input)?; - Ok((input, Class::Basic(Basic::GetOk { - delivery_tag, - redelivered, - exchange, - routing_key, - message_count, - }))) -} -fn basic_get_empty(input: &[u8]) -> IResult { - let (input, _) = tag([72])(input)?; - let (input, reserved_1) = domain_shortstr(input)?; - Ok((input, Class::Basic(Basic::GetEmpty { - reserved_1, - }))) -} -fn basic_ack(input: &[u8]) -> IResult { - let (input, _) = tag([80])(input)?; - let (input, delivery_tag) = domain_delivery_tag(input)?; - let (input, bits) = bit(input, 1)?; - let multiple = bits[0]; - Ok((input, Class::Basic(Basic::Ack { - delivery_tag, - multiple, - }))) -} -fn basic_reject(input: &[u8]) -> IResult { - let (input, _) = tag([90])(input)?; - let (input, delivery_tag) = domain_delivery_tag(input)?; - let (input, bits) = bit(input, 1)?; - let requeue = bits[0]; - Ok((input, Class::Basic(Basic::Reject { - delivery_tag, - requeue, - }))) -} -fn basic_recover_async(input: &[u8]) -> IResult { - let (input, _) = tag([100])(input)?; - let (input, bits) = bit(input, 1)?; - let requeue = bits[0]; - Ok((input, Class::Basic(Basic::RecoverAsync { - requeue, - }))) -} -fn basic_recover(input: &[u8]) -> IResult { - let (input, _) = tag([110])(input)?; - let (input, bits) = bit(input, 1)?; - let requeue = bits[0]; - Ok((input, Class::Basic(Basic::Recover { - requeue, - }))) -} -fn basic_recover_ok(input: &[u8]) -> IResult { - let (input, _) = tag([111])(input)?; - Ok((input, Class::Basic(Basic::RecoverOk { - }))) -} -fn tx(input: &[u8]) -> IResult { - let (input, _) = tag([90])(input)?; - alt((tx_select, tx_select_ok, tx_commit, tx_commit_ok, tx_rollback, tx_rollback_ok))(input) -} -fn tx_select(input: &[u8]) -> IResult { - let (input, _) = tag([10])(input)?; - Ok((input, Class::Tx(Tx::Select { - }))) -} -fn tx_select_ok(input: &[u8]) -> IResult { - let (input, _) = tag([11])(input)?; - Ok((input, Class::Tx(Tx::SelectOk { - }))) -} -fn tx_commit(input: &[u8]) -> IResult { - let (input, _) = tag([20])(input)?; - Ok((input, Class::Tx(Tx::Commit { - }))) -} -fn tx_commit_ok(input: &[u8]) -> IResult { - let (input, _) = tag([21])(input)?; - Ok((input, Class::Tx(Tx::CommitOk { - }))) -} -fn tx_rollback(input: &[u8]) -> IResult { - let (input, _) = tag([30])(input)?; - Ok((input, Class::Tx(Tx::Rollback { - }))) -} -fn tx_rollback_ok(input: &[u8]) -> IResult { - let (input, _) = tag([31])(input)?; - Ok((input, Class::Tx(Tx::RollbackOk { - }))) -} + pub type IResult<'a, T> = nom::IResult<&'a [u8], T, TransError>; + pub fn parse_method(input: &[u8]) -> Result<(&[u8], Class), nom::Err> { + alt((connection, channel, exchange, queue, basic, tx))(input) + } + fn domain_class_id(input: &[u8]) -> IResult { + short(input) + } + fn domain_consumer_tag(input: &[u8]) -> IResult { + shortstr(input) + } + fn domain_delivery_tag(input: &[u8]) -> IResult { + longlong(input) + } + fn domain_exchange_name(input: &[u8]) -> IResult { + let (input, result) = shortstr(input)?; + if result.len() > 127 { + fail!() + } + static REGEX: Lazy = Lazy::new(|| Regex::new(r"^[a-zA-Z0-9-_.:]*$").unwrap()); + if !REGEX.is_match(&result) { + fail!() + } + Ok((input, result)) + } + fn domain_method_id(input: &[u8]) -> IResult { + short(input) + } + fn domain_path(input: &[u8]) -> IResult { + let (input, result) = shortstr(input)?; + if result.is_empty() { + fail!() + } + if result.len() > 127 { + fail!() + } + Ok((input, result)) + } + fn domain_peer_properties(input: &[u8]) -> IResult { + table(input) + } + fn domain_queue_name(input: &[u8]) -> IResult { + let (input, result) = shortstr(input)?; + if result.len() > 127 { + fail!() + } + static REGEX: Lazy = Lazy::new(|| Regex::new(r"^[a-zA-Z0-9-_.:]*$").unwrap()); + if !REGEX.is_match(&result) { + fail!() + } + Ok((input, result)) + } + fn domain_message_count(input: &[u8]) -> IResult { + long(input) + } + fn domain_reply_code(input: &[u8]) -> IResult { + let (input, result) = short(input)?; + if result == 0 { + fail!() + } + Ok((input, result)) + } + fn domain_reply_text(input: &[u8]) -> IResult { + let (input, result) = shortstr(input)?; + if result.is_empty() { + fail!() + } + Ok((input, result)) + } + fn domain_octet(input: &[u8]) -> IResult { + octet(input) + } + fn domain_short(input: &[u8]) -> IResult { + short(input) + } + fn domain_long(input: &[u8]) -> IResult { + long(input) + } + fn domain_longlong(input: &[u8]) -> IResult { + longlong(input) + } + fn domain_shortstr(input: &[u8]) -> IResult { + shortstr(input) + } + fn domain_longstr(input: &[u8]) -> IResult { + longstr(input) + } + fn domain_timestamp(input: &[u8]) -> IResult { + timestamp(input) + } + fn domain_table(input: &[u8]) -> IResult
{ + table(input) + } + fn connection(input: &[u8]) -> IResult { + let (input, _) = tag([10])(input)?; + alt(( + connection_start, + connection_start_ok, + connection_secure, + connection_secure_ok, + connection_tune, + connection_tune_ok, + connection_open, + connection_open_ok, + connection_close, + connection_close_ok, + connection_blocked, + connection_unblocked, + ))(input) + } + fn connection_start(input: &[u8]) -> IResult { + let (input, _) = tag([10])(input)?; + let (input, version_major) = domain_octet(input)?; + let (input, version_minor) = domain_octet(input)?; + let (input, server_properties) = domain_peer_properties(input)?; + let (input, mechanisms) = domain_longstr(input)?; + if mechanisms.is_empty() { + fail!() + } + let (input, locales) = domain_longstr(input)?; + if locales.is_empty() { + fail!() + } + Ok(( + input, + Class::Connection(Connection::Start { + version_major, + version_minor, + server_properties, + mechanisms, + locales, + }), + )) + } + fn connection_start_ok(input: &[u8]) -> IResult { + let (input, _) = tag([11])(input)?; + let (input, client_properties) = domain_peer_properties(input)?; + let (input, mechanism) = domain_shortstr(input)?; + if mechanism.is_empty() { + fail!() + } + let (input, response) = domain_longstr(input)?; + if response.is_empty() { + fail!() + } + let (input, locale) = domain_shortstr(input)?; + if locale.is_empty() { + fail!() + } + Ok(( + input, + Class::Connection(Connection::StartOk { + client_properties, + mechanism, + response, + locale, + }), + )) + } + fn connection_secure(input: &[u8]) -> IResult { + let (input, _) = tag([20])(input)?; + let (input, challenge) = domain_longstr(input)?; + Ok((input, Class::Connection(Connection::Secure { challenge }))) + } + fn connection_secure_ok(input: &[u8]) -> IResult { + let (input, _) = tag([21])(input)?; + let (input, response) = domain_longstr(input)?; + if response.is_empty() { + fail!() + } + Ok((input, Class::Connection(Connection::SecureOk { response }))) + } + fn connection_tune(input: &[u8]) -> IResult { + let (input, _) = tag([30])(input)?; + let (input, channel_max) = domain_short(input)?; + let (input, frame_max) = domain_long(input)?; + let (input, heartbeat) = domain_short(input)?; + Ok(( + input, + Class::Connection(Connection::Tune { + channel_max, + frame_max, + heartbeat, + }), + )) + } + fn connection_tune_ok(input: &[u8]) -> IResult { + let (input, _) = tag([31])(input)?; + let (input, channel_max) = domain_short(input)?; + if channel_max == 0 { + fail!() + } + let (input, frame_max) = domain_long(input)?; + let (input, heartbeat) = domain_short(input)?; + Ok(( + input, + Class::Connection(Connection::TuneOk { + channel_max, + frame_max, + heartbeat, + }), + )) + } + fn connection_open(input: &[u8]) -> IResult { + let (input, _) = tag([40])(input)?; + let (input, virtual_host) = domain_path(input)?; + let (input, reserved_1) = domain_shortstr(input)?; + let (input, bits) = bit(input, 1)?; + let reserved_2 = bits[0]; + Ok(( + input, + Class::Connection(Connection::Open { + virtual_host, + reserved_1, + reserved_2, + }), + )) + } + fn connection_open_ok(input: &[u8]) -> IResult { + let (input, _) = tag([41])(input)?; + let (input, reserved_1) = domain_shortstr(input)?; + Ok((input, Class::Connection(Connection::OpenOk { reserved_1 }))) + } + fn connection_close(input: &[u8]) -> IResult { + let (input, _) = tag([50])(input)?; + let (input, reply_code) = domain_reply_code(input)?; + let (input, reply_text) = domain_reply_text(input)?; + let (input, class_id) = domain_class_id(input)?; + let (input, method_id) = domain_method_id(input)?; + Ok(( + input, + Class::Connection(Connection::Close { + reply_code, + reply_text, + class_id, + method_id, + }), + )) + } + fn connection_close_ok(input: &[u8]) -> IResult { + let (input, _) = tag([51])(input)?; + Ok((input, Class::Connection(Connection::CloseOk {}))) + } + fn connection_blocked(input: &[u8]) -> IResult { + let (input, _) = tag([60])(input)?; + let (input, reason) = domain_shortstr(input)?; + Ok((input, Class::Connection(Connection::Blocked { reason }))) + } + fn connection_unblocked(input: &[u8]) -> IResult { + let (input, _) = tag([61])(input)?; + Ok((input, Class::Connection(Connection::Unblocked {}))) + } + fn channel(input: &[u8]) -> IResult { + let (input, _) = tag([20])(input)?; + alt(( + channel_open, + channel_open_ok, + channel_flow, + channel_flow_ok, + channel_close, + channel_close_ok, + ))(input) + } + fn channel_open(input: &[u8]) -> IResult { + let (input, _) = tag([10])(input)?; + let (input, reserved_1) = domain_shortstr(input)?; + Ok((input, Class::Channel(Channel::Open { reserved_1 }))) + } + fn channel_open_ok(input: &[u8]) -> IResult { + let (input, _) = tag([11])(input)?; + let (input, reserved_1) = domain_longstr(input)?; + Ok((input, Class::Channel(Channel::OpenOk { reserved_1 }))) + } + fn channel_flow(input: &[u8]) -> IResult { + let (input, _) = tag([20])(input)?; + let (input, bits) = bit(input, 1)?; + let active = bits[0]; + Ok((input, Class::Channel(Channel::Flow { active }))) + } + fn channel_flow_ok(input: &[u8]) -> IResult { + let (input, _) = tag([21])(input)?; + let (input, bits) = bit(input, 1)?; + let active = bits[0]; + Ok((input, Class::Channel(Channel::FlowOk { active }))) + } + fn channel_close(input: &[u8]) -> IResult { + let (input, _) = tag([40])(input)?; + let (input, reply_code) = domain_reply_code(input)?; + let (input, reply_text) = domain_reply_text(input)?; + let (input, class_id) = domain_class_id(input)?; + let (input, method_id) = domain_method_id(input)?; + Ok(( + input, + Class::Channel(Channel::Close { + reply_code, + reply_text, + class_id, + method_id, + }), + )) + } + fn channel_close_ok(input: &[u8]) -> IResult { + let (input, _) = tag([41])(input)?; + Ok((input, Class::Channel(Channel::CloseOk {}))) + } + fn exchange(input: &[u8]) -> IResult { + let (input, _) = tag([40])(input)?; + alt(( + exchange_declare, + exchange_declare_ok, + exchange_delete, + exchange_delete_ok, + ))(input) + } + fn exchange_declare(input: &[u8]) -> IResult { + let (input, _) = tag([10])(input)?; + let (input, reserved_1) = domain_short(input)?; + let (input, exchange) = domain_exchange_name(input)?; + if exchange.is_empty() { + fail!() + } + let (input, r#type) = domain_shortstr(input)?; + let (input, bits) = bit(input, 5)?; + let passive = bits[0]; + let durable = bits[1]; + let reserved_2 = bits[2]; + let reserved_3 = bits[3]; + let no_wait = bits[4]; + let (input, arguments) = domain_table(input)?; + Ok(( + input, + Class::Exchange(Exchange::Declare { + reserved_1, + exchange, + r#type, + passive, + durable, + reserved_2, + reserved_3, + no_wait, + arguments, + }), + )) + } + fn exchange_declare_ok(input: &[u8]) -> IResult { + let (input, _) = tag([11])(input)?; + Ok((input, Class::Exchange(Exchange::DeclareOk {}))) + } + fn exchange_delete(input: &[u8]) -> IResult { + let (input, _) = tag([20])(input)?; + let (input, reserved_1) = domain_short(input)?; + let (input, exchange) = domain_exchange_name(input)?; + if exchange.is_empty() { + fail!() + } + let (input, bits) = bit(input, 2)?; + let if_unused = bits[0]; + let no_wait = bits[1]; + Ok(( + input, + Class::Exchange(Exchange::Delete { + reserved_1, + exchange, + if_unused, + no_wait, + }), + )) + } + fn exchange_delete_ok(input: &[u8]) -> IResult { + let (input, _) = tag([21])(input)?; + Ok((input, Class::Exchange(Exchange::DeleteOk {}))) + } + fn queue(input: &[u8]) -> IResult { + let (input, _) = tag([50])(input)?; + alt(( + queue_declare, + queue_declare_ok, + queue_bind, + queue_bind_ok, + queue_unbind, + queue_unbind_ok, + queue_purge, + queue_purge_ok, + queue_delete, + queue_delete_ok, + ))(input) + } + fn queue_declare(input: &[u8]) -> IResult { + let (input, _) = tag([10])(input)?; + let (input, reserved_1) = domain_short(input)?; + let (input, queue) = domain_queue_name(input)?; + let (input, bits) = bit(input, 5)?; + let passive = bits[0]; + let durable = bits[1]; + let exclusive = bits[2]; + let auto_delete = bits[3]; + let no_wait = bits[4]; + let (input, arguments) = domain_table(input)?; + Ok(( + input, + Class::Queue(Queue::Declare { + reserved_1, + queue, + passive, + durable, + exclusive, + auto_delete, + no_wait, + arguments, + }), + )) + } + fn queue_declare_ok(input: &[u8]) -> IResult { + let (input, _) = tag([11])(input)?; + let (input, queue) = domain_queue_name(input)?; + if queue.is_empty() { + fail!() + } + let (input, message_count) = domain_message_count(input)?; + let (input, consumer_count) = domain_long(input)?; + Ok(( + input, + Class::Queue(Queue::DeclareOk { + queue, + message_count, + consumer_count, + }), + )) + } + fn queue_bind(input: &[u8]) -> IResult { + let (input, _) = tag([20])(input)?; + let (input, reserved_1) = domain_short(input)?; + let (input, queue) = domain_queue_name(input)?; + let (input, exchange) = domain_exchange_name(input)?; + let (input, routing_key) = domain_shortstr(input)?; + let (input, bits) = bit(input, 1)?; + let no_wait = bits[0]; + let (input, arguments) = domain_table(input)?; + Ok(( + input, + Class::Queue(Queue::Bind { + reserved_1, + queue, + exchange, + routing_key, + no_wait, + arguments, + }), + )) + } + fn queue_bind_ok(input: &[u8]) -> IResult { + let (input, _) = tag([21])(input)?; + Ok((input, Class::Queue(Queue::BindOk {}))) + } + fn queue_unbind(input: &[u8]) -> IResult { + let (input, _) = tag([50])(input)?; + let (input, reserved_1) = domain_short(input)?; + let (input, queue) = domain_queue_name(input)?; + let (input, exchange) = domain_exchange_name(input)?; + let (input, routing_key) = domain_shortstr(input)?; + let (input, arguments) = domain_table(input)?; + Ok(( + input, + Class::Queue(Queue::Unbind { + reserved_1, + queue, + exchange, + routing_key, + arguments, + }), + )) + } + fn queue_unbind_ok(input: &[u8]) -> IResult { + let (input, _) = tag([51])(input)?; + Ok((input, Class::Queue(Queue::UnbindOk {}))) + } + fn queue_purge(input: &[u8]) -> IResult { + let (input, _) = tag([30])(input)?; + let (input, reserved_1) = domain_short(input)?; + let (input, queue) = domain_queue_name(input)?; + let (input, bits) = bit(input, 1)?; + let no_wait = bits[0]; + Ok(( + input, + Class::Queue(Queue::Purge { + reserved_1, + queue, + no_wait, + }), + )) + } + fn queue_purge_ok(input: &[u8]) -> IResult { + let (input, _) = tag([31])(input)?; + let (input, message_count) = domain_message_count(input)?; + Ok((input, Class::Queue(Queue::PurgeOk { message_count }))) + } + fn queue_delete(input: &[u8]) -> IResult { + let (input, _) = tag([40])(input)?; + let (input, reserved_1) = domain_short(input)?; + let (input, queue) = domain_queue_name(input)?; + let (input, bits) = bit(input, 3)?; + let if_unused = bits[0]; + let if_empty = bits[1]; + let no_wait = bits[2]; + Ok(( + input, + Class::Queue(Queue::Delete { + reserved_1, + queue, + if_unused, + if_empty, + no_wait, + }), + )) + } + fn queue_delete_ok(input: &[u8]) -> IResult { + let (input, _) = tag([41])(input)?; + let (input, message_count) = domain_message_count(input)?; + Ok((input, Class::Queue(Queue::DeleteOk { message_count }))) + } + fn basic(input: &[u8]) -> IResult { + let (input, _) = tag([60])(input)?; + alt(( + basic_qos, + basic_qos_ok, + basic_consume, + basic_consume_ok, + basic_cancel, + basic_cancel_ok, + basic_publish, + basic_return, + basic_deliver, + basic_get, + basic_get_ok, + basic_get_empty, + basic_ack, + basic_reject, + basic_recover_async, + basic_recover, + basic_recover_ok, + ))(input) + } + fn basic_qos(input: &[u8]) -> IResult { + let (input, _) = tag([10])(input)?; + let (input, prefetch_size) = domain_long(input)?; + let (input, prefetch_count) = domain_short(input)?; + let (input, bits) = bit(input, 1)?; + let global = bits[0]; + Ok(( + input, + Class::Basic(Basic::Qos { + prefetch_size, + prefetch_count, + global, + }), + )) + } + fn basic_qos_ok(input: &[u8]) -> IResult { + let (input, _) = tag([11])(input)?; + Ok((input, Class::Basic(Basic::QosOk {}))) + } + fn basic_consume(input: &[u8]) -> IResult { + let (input, _) = tag([20])(input)?; + let (input, reserved_1) = domain_short(input)?; + let (input, queue) = domain_queue_name(input)?; + let (input, consumer_tag) = domain_consumer_tag(input)?; + let (input, bits) = bit(input, 4)?; + let no_local = bits[0]; + let no_ack = bits[1]; + let exclusive = bits[2]; + let no_wait = bits[3]; + let (input, arguments) = domain_table(input)?; + Ok(( + input, + Class::Basic(Basic::Consume { + reserved_1, + queue, + consumer_tag, + no_local, + no_ack, + exclusive, + no_wait, + arguments, + }), + )) + } + fn basic_consume_ok(input: &[u8]) -> IResult { + let (input, _) = tag([21])(input)?; + let (input, consumer_tag) = domain_consumer_tag(input)?; + Ok((input, Class::Basic(Basic::ConsumeOk { consumer_tag }))) + } + fn basic_cancel(input: &[u8]) -> IResult { + let (input, _) = tag([30])(input)?; + let (input, consumer_tag) = domain_consumer_tag(input)?; + let (input, bits) = bit(input, 1)?; + let no_wait = bits[0]; + Ok(( + input, + Class::Basic(Basic::Cancel { + consumer_tag, + no_wait, + }), + )) + } + fn basic_cancel_ok(input: &[u8]) -> IResult { + let (input, _) = tag([31])(input)?; + let (input, consumer_tag) = domain_consumer_tag(input)?; + Ok((input, Class::Basic(Basic::CancelOk { consumer_tag }))) + } + fn basic_publish(input: &[u8]) -> IResult { + let (input, _) = tag([40])(input)?; + let (input, reserved_1) = domain_short(input)?; + let (input, exchange) = domain_exchange_name(input)?; + let (input, routing_key) = domain_shortstr(input)?; + let (input, bits) = bit(input, 2)?; + let mandatory = bits[0]; + let immediate = bits[1]; + Ok(( + input, + Class::Basic(Basic::Publish { + reserved_1, + exchange, + routing_key, + mandatory, + immediate, + }), + )) + } + fn basic_return(input: &[u8]) -> IResult { + let (input, _) = tag([50])(input)?; + let (input, reply_code) = domain_reply_code(input)?; + let (input, reply_text) = domain_reply_text(input)?; + let (input, exchange) = domain_exchange_name(input)?; + let (input, routing_key) = domain_shortstr(input)?; + Ok(( + input, + Class::Basic(Basic::Return { + reply_code, + reply_text, + exchange, + routing_key, + }), + )) + } + fn basic_deliver(input: &[u8]) -> IResult { + let (input, _) = tag([60])(input)?; + let (input, consumer_tag) = domain_consumer_tag(input)?; + let (input, delivery_tag) = domain_delivery_tag(input)?; + let (input, bits) = bit(input, 1)?; + let redelivered = bits[0]; + let (input, exchange) = domain_exchange_name(input)?; + let (input, routing_key) = domain_shortstr(input)?; + Ok(( + input, + Class::Basic(Basic::Deliver { + consumer_tag, + delivery_tag, + redelivered, + exchange, + routing_key, + }), + )) + } + fn basic_get(input: &[u8]) -> IResult { + let (input, _) = tag([70])(input)?; + let (input, reserved_1) = domain_short(input)?; + let (input, queue) = domain_queue_name(input)?; + let (input, bits) = bit(input, 1)?; + let no_ack = bits[0]; + Ok(( + input, + Class::Basic(Basic::Get { + reserved_1, + queue, + no_ack, + }), + )) + } + fn basic_get_ok(input: &[u8]) -> IResult { + let (input, _) = tag([71])(input)?; + let (input, delivery_tag) = domain_delivery_tag(input)?; + let (input, bits) = bit(input, 1)?; + let redelivered = bits[0]; + let (input, exchange) = domain_exchange_name(input)?; + let (input, routing_key) = domain_shortstr(input)?; + let (input, message_count) = domain_message_count(input)?; + Ok(( + input, + Class::Basic(Basic::GetOk { + delivery_tag, + redelivered, + exchange, + routing_key, + message_count, + }), + )) + } + fn basic_get_empty(input: &[u8]) -> IResult { + let (input, _) = tag([72])(input)?; + let (input, reserved_1) = domain_shortstr(input)?; + Ok((input, Class::Basic(Basic::GetEmpty { reserved_1 }))) + } + fn basic_ack(input: &[u8]) -> IResult { + let (input, _) = tag([80])(input)?; + let (input, delivery_tag) = domain_delivery_tag(input)?; + let (input, bits) = bit(input, 1)?; + let multiple = bits[0]; + Ok(( + input, + Class::Basic(Basic::Ack { + delivery_tag, + multiple, + }), + )) + } + fn basic_reject(input: &[u8]) -> IResult { + let (input, _) = tag([90])(input)?; + let (input, delivery_tag) = domain_delivery_tag(input)?; + let (input, bits) = bit(input, 1)?; + let requeue = bits[0]; + Ok(( + input, + Class::Basic(Basic::Reject { + delivery_tag, + requeue, + }), + )) + } + fn basic_recover_async(input: &[u8]) -> IResult { + let (input, _) = tag([100])(input)?; + let (input, bits) = bit(input, 1)?; + let requeue = bits[0]; + Ok((input, Class::Basic(Basic::RecoverAsync { requeue }))) + } + fn basic_recover(input: &[u8]) -> IResult { + let (input, _) = tag([110])(input)?; + let (input, bits) = bit(input, 1)?; + let requeue = bits[0]; + Ok((input, Class::Basic(Basic::Recover { requeue }))) + } + fn basic_recover_ok(input: &[u8]) -> IResult { + let (input, _) = tag([111])(input)?; + Ok((input, Class::Basic(Basic::RecoverOk {}))) + } + fn tx(input: &[u8]) -> IResult { + let (input, _) = tag([90])(input)?; + alt(( + tx_select, + tx_select_ok, + tx_commit, + tx_commit_ok, + tx_rollback, + tx_rollback_ok, + ))(input) + } + fn tx_select(input: &[u8]) -> IResult { + let (input, _) = tag([10])(input)?; + Ok((input, Class::Tx(Tx::Select {}))) + } + fn tx_select_ok(input: &[u8]) -> IResult { + let (input, _) = tag([11])(input)?; + Ok((input, Class::Tx(Tx::SelectOk {}))) + } + fn tx_commit(input: &[u8]) -> IResult { + let (input, _) = tag([20])(input)?; + Ok((input, Class::Tx(Tx::Commit {}))) + } + fn tx_commit_ok(input: &[u8]) -> IResult { + let (input, _) = tag([21])(input)?; + Ok((input, Class::Tx(Tx::CommitOk {}))) + } + fn tx_rollback(input: &[u8]) -> IResult { + let (input, _) = tag([30])(input)?; + Ok((input, Class::Tx(Tx::Rollback {}))) + } + fn tx_rollback_ok(input: &[u8]) -> IResult { + let (input, _) = tag([31])(input)?; + Ok((input, Class::Tx(Tx::RollbackOk {}))) + } } pub mod write { -use super::*; -use crate::classes::write_helper::*; -use crate::error::TransError; -use std::io::Write; + use super::*; + use crate::classes::write_helper::*; + use crate::error::TransError; + use std::io::Write; -pub fn write_method(class: Class, mut writer: W) -> Result<(), TransError> { - match class { - Class::Connection(Connection::Start { - version_major, - version_minor, - server_properties, - mechanisms, - locales, - }) => { - writer.write_all(&[0, 10, 0, 10])?; - octet(version_major, &mut writer)?; - octet(version_minor, &mut writer)?; - table(server_properties, &mut writer)?; - longstr(mechanisms, &mut writer)?; - longstr(locales, &mut writer)?; - } - Class::Connection(Connection::StartOk { - client_properties, - mechanism, - response, - locale, - }) => { - writer.write_all(&[0, 10, 0, 11])?; - table(client_properties, &mut writer)?; - shortstr(mechanism, &mut writer)?; - longstr(response, &mut writer)?; - shortstr(locale, &mut writer)?; - } - Class::Connection(Connection::Secure { - challenge, - }) => { - writer.write_all(&[0, 10, 0, 20])?; - longstr(challenge, &mut writer)?; - } - Class::Connection(Connection::SecureOk { - response, - }) => { - writer.write_all(&[0, 10, 0, 21])?; - longstr(response, &mut writer)?; - } - Class::Connection(Connection::Tune { - channel_max, - frame_max, - heartbeat, - }) => { - writer.write_all(&[0, 10, 0, 30])?; - short(channel_max, &mut writer)?; - long(frame_max, &mut writer)?; - short(heartbeat, &mut writer)?; - } - Class::Connection(Connection::TuneOk { - channel_max, - frame_max, - heartbeat, - }) => { - writer.write_all(&[0, 10, 0, 31])?; - short(channel_max, &mut writer)?; - long(frame_max, &mut writer)?; - short(heartbeat, &mut writer)?; - } - Class::Connection(Connection::Open { - virtual_host, - reserved_1, - reserved_2, - }) => { - writer.write_all(&[0, 10, 0, 40])?; - shortstr(virtual_host, &mut writer)?; - shortstr(reserved_1, &mut writer)?; - bit(&[reserved_2, ], &mut writer)?; - } - Class::Connection(Connection::OpenOk { - reserved_1, - }) => { - writer.write_all(&[0, 10, 0, 41])?; - shortstr(reserved_1, &mut writer)?; - } - Class::Connection(Connection::Close { - reply_code, - reply_text, - class_id, - method_id, - }) => { - writer.write_all(&[0, 10, 0, 50])?; - short(reply_code, &mut writer)?; - shortstr(reply_text, &mut writer)?; - short(class_id, &mut writer)?; - short(method_id, &mut writer)?; - } - Class::Connection(Connection::CloseOk { - }) => { - writer.write_all(&[0, 10, 0, 51])?; - } - Class::Connection(Connection::Blocked { - reason, - }) => { - writer.write_all(&[0, 10, 0, 60])?; - shortstr(reason, &mut writer)?; - } - Class::Connection(Connection::Unblocked { - }) => { - writer.write_all(&[0, 10, 0, 61])?; - } - Class::Channel(Channel::Open { - reserved_1, - }) => { - writer.write_all(&[0, 20, 0, 10])?; - shortstr(reserved_1, &mut writer)?; - } - Class::Channel(Channel::OpenOk { - reserved_1, - }) => { - writer.write_all(&[0, 20, 0, 11])?; - longstr(reserved_1, &mut writer)?; - } - Class::Channel(Channel::Flow { - active, - }) => { - writer.write_all(&[0, 20, 0, 20])?; - bit(&[active, ], &mut writer)?; - } - Class::Channel(Channel::FlowOk { - active, - }) => { - writer.write_all(&[0, 20, 0, 21])?; - bit(&[active, ], &mut writer)?; - } - Class::Channel(Channel::Close { - reply_code, - reply_text, - class_id, - method_id, - }) => { - writer.write_all(&[0, 20, 0, 40])?; - short(reply_code, &mut writer)?; - shortstr(reply_text, &mut writer)?; - short(class_id, &mut writer)?; - short(method_id, &mut writer)?; - } - Class::Channel(Channel::CloseOk { - }) => { - writer.write_all(&[0, 20, 0, 41])?; - } - Class::Exchange(Exchange::Declare { - reserved_1, - exchange, - r#type, - passive, - durable, - reserved_2, - reserved_3, - no_wait, - arguments, - }) => { - writer.write_all(&[0, 40, 0, 10])?; - short(reserved_1, &mut writer)?; - shortstr(exchange, &mut writer)?; - shortstr(r#type, &mut writer)?; - bit(&[passive, durable, reserved_2, reserved_3, no_wait, ], &mut writer)?; - table(arguments, &mut writer)?; - } - Class::Exchange(Exchange::DeclareOk { - }) => { - writer.write_all(&[0, 40, 0, 11])?; - } - Class::Exchange(Exchange::Delete { - reserved_1, - exchange, - if_unused, - no_wait, - }) => { - writer.write_all(&[0, 40, 0, 20])?; - short(reserved_1, &mut writer)?; - shortstr(exchange, &mut writer)?; - bit(&[if_unused, no_wait, ], &mut writer)?; - } - Class::Exchange(Exchange::DeleteOk { - }) => { - writer.write_all(&[0, 40, 0, 21])?; - } - Class::Queue(Queue::Declare { - reserved_1, - queue, - passive, - durable, - exclusive, - auto_delete, - no_wait, - arguments, - }) => { - writer.write_all(&[0, 50, 0, 10])?; - short(reserved_1, &mut writer)?; - shortstr(queue, &mut writer)?; - bit(&[passive, durable, exclusive, auto_delete, no_wait, ], &mut writer)?; - table(arguments, &mut writer)?; - } - Class::Queue(Queue::DeclareOk { - queue, - message_count, - consumer_count, - }) => { - writer.write_all(&[0, 50, 0, 11])?; - shortstr(queue, &mut writer)?; - long(message_count, &mut writer)?; - long(consumer_count, &mut writer)?; - } - Class::Queue(Queue::Bind { - reserved_1, - queue, - exchange, - routing_key, - no_wait, - arguments, - }) => { - writer.write_all(&[0, 50, 0, 20])?; - short(reserved_1, &mut writer)?; - shortstr(queue, &mut writer)?; - shortstr(exchange, &mut writer)?; - shortstr(routing_key, &mut writer)?; - bit(&[no_wait, ], &mut writer)?; - table(arguments, &mut writer)?; - } - Class::Queue(Queue::BindOk { - }) => { - writer.write_all(&[0, 50, 0, 21])?; - } - Class::Queue(Queue::Unbind { - reserved_1, - queue, - exchange, - routing_key, - arguments, - }) => { - writer.write_all(&[0, 50, 0, 50])?; - short(reserved_1, &mut writer)?; - shortstr(queue, &mut writer)?; - shortstr(exchange, &mut writer)?; - shortstr(routing_key, &mut writer)?; - table(arguments, &mut writer)?; - } - Class::Queue(Queue::UnbindOk { - }) => { - writer.write_all(&[0, 50, 0, 51])?; - } - Class::Queue(Queue::Purge { - reserved_1, - queue, - no_wait, - }) => { - writer.write_all(&[0, 50, 0, 30])?; - short(reserved_1, &mut writer)?; - shortstr(queue, &mut writer)?; - bit(&[no_wait, ], &mut writer)?; - } - Class::Queue(Queue::PurgeOk { - message_count, - }) => { - writer.write_all(&[0, 50, 0, 31])?; - long(message_count, &mut writer)?; - } - Class::Queue(Queue::Delete { - reserved_1, - queue, - if_unused, - if_empty, - no_wait, - }) => { - writer.write_all(&[0, 50, 0, 40])?; - short(reserved_1, &mut writer)?; - shortstr(queue, &mut writer)?; - bit(&[if_unused, if_empty, no_wait, ], &mut writer)?; - } - Class::Queue(Queue::DeleteOk { - message_count, - }) => { - writer.write_all(&[0, 50, 0, 41])?; - long(message_count, &mut writer)?; - } - Class::Basic(Basic::Qos { - prefetch_size, - prefetch_count, - global, - }) => { - writer.write_all(&[0, 60, 0, 10])?; - long(prefetch_size, &mut writer)?; - short(prefetch_count, &mut writer)?; - bit(&[global, ], &mut writer)?; - } - Class::Basic(Basic::QosOk { - }) => { - writer.write_all(&[0, 60, 0, 11])?; - } - Class::Basic(Basic::Consume { - reserved_1, - queue, - consumer_tag, - no_local, - no_ack, - exclusive, - no_wait, - arguments, - }) => { - writer.write_all(&[0, 60, 0, 20])?; - short(reserved_1, &mut writer)?; - shortstr(queue, &mut writer)?; - shortstr(consumer_tag, &mut writer)?; - bit(&[no_local, no_ack, exclusive, no_wait, ], &mut writer)?; - table(arguments, &mut writer)?; - } - Class::Basic(Basic::ConsumeOk { - consumer_tag, - }) => { - writer.write_all(&[0, 60, 0, 21])?; - shortstr(consumer_tag, &mut writer)?; - } - Class::Basic(Basic::Cancel { - consumer_tag, - no_wait, - }) => { - writer.write_all(&[0, 60, 0, 30])?; - shortstr(consumer_tag, &mut writer)?; - bit(&[no_wait, ], &mut writer)?; - } - Class::Basic(Basic::CancelOk { - consumer_tag, - }) => { - writer.write_all(&[0, 60, 0, 31])?; - shortstr(consumer_tag, &mut writer)?; - } - Class::Basic(Basic::Publish { - reserved_1, - exchange, - routing_key, - mandatory, - immediate, - }) => { - writer.write_all(&[0, 60, 0, 40])?; - short(reserved_1, &mut writer)?; - shortstr(exchange, &mut writer)?; - shortstr(routing_key, &mut writer)?; - bit(&[mandatory, immediate, ], &mut writer)?; - } - Class::Basic(Basic::Return { - reply_code, - reply_text, - exchange, - routing_key, - }) => { - writer.write_all(&[0, 60, 0, 50])?; - short(reply_code, &mut writer)?; - shortstr(reply_text, &mut writer)?; - shortstr(exchange, &mut writer)?; - shortstr(routing_key, &mut writer)?; - } - Class::Basic(Basic::Deliver { - consumer_tag, - delivery_tag, - redelivered, - exchange, - routing_key, - }) => { - writer.write_all(&[0, 60, 0, 60])?; - shortstr(consumer_tag, &mut writer)?; - longlong(delivery_tag, &mut writer)?; - bit(&[redelivered, ], &mut writer)?; - shortstr(exchange, &mut writer)?; - shortstr(routing_key, &mut writer)?; - } - Class::Basic(Basic::Get { - reserved_1, - queue, - no_ack, - }) => { - writer.write_all(&[0, 60, 0, 70])?; - short(reserved_1, &mut writer)?; - shortstr(queue, &mut writer)?; - bit(&[no_ack, ], &mut writer)?; - } - Class::Basic(Basic::GetOk { - delivery_tag, - redelivered, - exchange, - routing_key, - message_count, - }) => { - writer.write_all(&[0, 60, 0, 71])?; - longlong(delivery_tag, &mut writer)?; - bit(&[redelivered, ], &mut writer)?; - shortstr(exchange, &mut writer)?; - shortstr(routing_key, &mut writer)?; - long(message_count, &mut writer)?; - } - Class::Basic(Basic::GetEmpty { - reserved_1, - }) => { - writer.write_all(&[0, 60, 0, 72])?; - shortstr(reserved_1, &mut writer)?; - } - Class::Basic(Basic::Ack { - delivery_tag, - multiple, - }) => { - writer.write_all(&[0, 60, 0, 80])?; - longlong(delivery_tag, &mut writer)?; - bit(&[multiple, ], &mut writer)?; - } - Class::Basic(Basic::Reject { - delivery_tag, - requeue, - }) => { - writer.write_all(&[0, 60, 0, 90])?; - longlong(delivery_tag, &mut writer)?; - bit(&[requeue, ], &mut writer)?; - } - Class::Basic(Basic::RecoverAsync { - requeue, - }) => { - writer.write_all(&[0, 60, 0, 100])?; - bit(&[requeue, ], &mut writer)?; - } - Class::Basic(Basic::Recover { - requeue, - }) => { - writer.write_all(&[0, 60, 0, 110])?; - bit(&[requeue, ], &mut writer)?; - } - Class::Basic(Basic::RecoverOk { - }) => { - writer.write_all(&[0, 60, 0, 111])?; - } - Class::Tx(Tx::Select { - }) => { - writer.write_all(&[0, 90, 0, 10])?; - } - Class::Tx(Tx::SelectOk { - }) => { - writer.write_all(&[0, 90, 0, 11])?; - } - Class::Tx(Tx::Commit { - }) => { - writer.write_all(&[0, 90, 0, 20])?; - } - Class::Tx(Tx::CommitOk { - }) => { - writer.write_all(&[0, 90, 0, 21])?; - } - Class::Tx(Tx::Rollback { - }) => { - writer.write_all(&[0, 90, 0, 30])?; - } - Class::Tx(Tx::RollbackOk { - }) => { - writer.write_all(&[0, 90, 0, 31])?; + pub fn write_method(class: Class, mut writer: W) -> Result<(), TransError> { + match class { + Class::Connection(Connection::Start { + version_major, + version_minor, + server_properties, + mechanisms, + locales, + }) => { + writer.write_all(&[0, 10, 0, 10])?; + octet(version_major, &mut writer)?; + octet(version_minor, &mut writer)?; + table(server_properties, &mut writer)?; + longstr(mechanisms, &mut writer)?; + longstr(locales, &mut writer)?; + } + Class::Connection(Connection::StartOk { + client_properties, + mechanism, + response, + locale, + }) => { + writer.write_all(&[0, 10, 0, 11])?; + table(client_properties, &mut writer)?; + shortstr(mechanism, &mut writer)?; + longstr(response, &mut writer)?; + shortstr(locale, &mut writer)?; + } + Class::Connection(Connection::Secure { challenge }) => { + writer.write_all(&[0, 10, 0, 20])?; + longstr(challenge, &mut writer)?; + } + Class::Connection(Connection::SecureOk { response }) => { + writer.write_all(&[0, 10, 0, 21])?; + longstr(response, &mut writer)?; + } + Class::Connection(Connection::Tune { + channel_max, + frame_max, + heartbeat, + }) => { + writer.write_all(&[0, 10, 0, 30])?; + short(channel_max, &mut writer)?; + long(frame_max, &mut writer)?; + short(heartbeat, &mut writer)?; + } + Class::Connection(Connection::TuneOk { + channel_max, + frame_max, + heartbeat, + }) => { + writer.write_all(&[0, 10, 0, 31])?; + short(channel_max, &mut writer)?; + long(frame_max, &mut writer)?; + short(heartbeat, &mut writer)?; + } + Class::Connection(Connection::Open { + virtual_host, + reserved_1, + reserved_2, + }) => { + writer.write_all(&[0, 10, 0, 40])?; + shortstr(virtual_host, &mut writer)?; + shortstr(reserved_1, &mut writer)?; + bit(&[reserved_2], &mut writer)?; + } + Class::Connection(Connection::OpenOk { reserved_1 }) => { + writer.write_all(&[0, 10, 0, 41])?; + shortstr(reserved_1, &mut writer)?; + } + Class::Connection(Connection::Close { + reply_code, + reply_text, + class_id, + method_id, + }) => { + writer.write_all(&[0, 10, 0, 50])?; + short(reply_code, &mut writer)?; + shortstr(reply_text, &mut writer)?; + short(class_id, &mut writer)?; + short(method_id, &mut writer)?; + } + Class::Connection(Connection::CloseOk {}) => { + writer.write_all(&[0, 10, 0, 51])?; + } + Class::Connection(Connection::Blocked { reason }) => { + writer.write_all(&[0, 10, 0, 60])?; + shortstr(reason, &mut writer)?; + } + Class::Connection(Connection::Unblocked {}) => { + writer.write_all(&[0, 10, 0, 61])?; + } + Class::Channel(Channel::Open { reserved_1 }) => { + writer.write_all(&[0, 20, 0, 10])?; + shortstr(reserved_1, &mut writer)?; + } + Class::Channel(Channel::OpenOk { reserved_1 }) => { + writer.write_all(&[0, 20, 0, 11])?; + longstr(reserved_1, &mut writer)?; + } + Class::Channel(Channel::Flow { active }) => { + writer.write_all(&[0, 20, 0, 20])?; + bit(&[active], &mut writer)?; + } + Class::Channel(Channel::FlowOk { active }) => { + writer.write_all(&[0, 20, 0, 21])?; + bit(&[active], &mut writer)?; + } + Class::Channel(Channel::Close { + reply_code, + reply_text, + class_id, + method_id, + }) => { + writer.write_all(&[0, 20, 0, 40])?; + short(reply_code, &mut writer)?; + shortstr(reply_text, &mut writer)?; + short(class_id, &mut writer)?; + short(method_id, &mut writer)?; + } + Class::Channel(Channel::CloseOk {}) => { + writer.write_all(&[0, 20, 0, 41])?; + } + Class::Exchange(Exchange::Declare { + reserved_1, + exchange, + r#type, + passive, + durable, + reserved_2, + reserved_3, + no_wait, + arguments, + }) => { + writer.write_all(&[0, 40, 0, 10])?; + short(reserved_1, &mut writer)?; + shortstr(exchange, &mut writer)?; + shortstr(r#type, &mut writer)?; + bit( + &[passive, durable, reserved_2, reserved_3, no_wait], + &mut writer, + )?; + table(arguments, &mut writer)?; + } + Class::Exchange(Exchange::DeclareOk {}) => { + writer.write_all(&[0, 40, 0, 11])?; + } + Class::Exchange(Exchange::Delete { + reserved_1, + exchange, + if_unused, + no_wait, + }) => { + writer.write_all(&[0, 40, 0, 20])?; + short(reserved_1, &mut writer)?; + shortstr(exchange, &mut writer)?; + bit(&[if_unused, no_wait], &mut writer)?; + } + Class::Exchange(Exchange::DeleteOk {}) => { + writer.write_all(&[0, 40, 0, 21])?; + } + Class::Queue(Queue::Declare { + reserved_1, + queue, + passive, + durable, + exclusive, + auto_delete, + no_wait, + arguments, + }) => { + writer.write_all(&[0, 50, 0, 10])?; + short(reserved_1, &mut writer)?; + shortstr(queue, &mut writer)?; + bit( + &[passive, durable, exclusive, auto_delete, no_wait], + &mut writer, + )?; + table(arguments, &mut writer)?; + } + Class::Queue(Queue::DeclareOk { + queue, + message_count, + consumer_count, + }) => { + writer.write_all(&[0, 50, 0, 11])?; + shortstr(queue, &mut writer)?; + long(message_count, &mut writer)?; + long(consumer_count, &mut writer)?; + } + Class::Queue(Queue::Bind { + reserved_1, + queue, + exchange, + routing_key, + no_wait, + arguments, + }) => { + writer.write_all(&[0, 50, 0, 20])?; + short(reserved_1, &mut writer)?; + shortstr(queue, &mut writer)?; + shortstr(exchange, &mut writer)?; + shortstr(routing_key, &mut writer)?; + bit(&[no_wait], &mut writer)?; + table(arguments, &mut writer)?; + } + Class::Queue(Queue::BindOk {}) => { + writer.write_all(&[0, 50, 0, 21])?; + } + Class::Queue(Queue::Unbind { + reserved_1, + queue, + exchange, + routing_key, + arguments, + }) => { + writer.write_all(&[0, 50, 0, 50])?; + short(reserved_1, &mut writer)?; + shortstr(queue, &mut writer)?; + shortstr(exchange, &mut writer)?; + shortstr(routing_key, &mut writer)?; + table(arguments, &mut writer)?; + } + Class::Queue(Queue::UnbindOk {}) => { + writer.write_all(&[0, 50, 0, 51])?; + } + Class::Queue(Queue::Purge { + reserved_1, + queue, + no_wait, + }) => { + writer.write_all(&[0, 50, 0, 30])?; + short(reserved_1, &mut writer)?; + shortstr(queue, &mut writer)?; + bit(&[no_wait], &mut writer)?; + } + Class::Queue(Queue::PurgeOk { message_count }) => { + writer.write_all(&[0, 50, 0, 31])?; + long(message_count, &mut writer)?; + } + Class::Queue(Queue::Delete { + reserved_1, + queue, + if_unused, + if_empty, + no_wait, + }) => { + writer.write_all(&[0, 50, 0, 40])?; + short(reserved_1, &mut writer)?; + shortstr(queue, &mut writer)?; + bit(&[if_unused, if_empty, no_wait], &mut writer)?; + } + Class::Queue(Queue::DeleteOk { message_count }) => { + writer.write_all(&[0, 50, 0, 41])?; + long(message_count, &mut writer)?; + } + Class::Basic(Basic::Qos { + prefetch_size, + prefetch_count, + global, + }) => { + writer.write_all(&[0, 60, 0, 10])?; + long(prefetch_size, &mut writer)?; + short(prefetch_count, &mut writer)?; + bit(&[global], &mut writer)?; + } + Class::Basic(Basic::QosOk {}) => { + writer.write_all(&[0, 60, 0, 11])?; + } + Class::Basic(Basic::Consume { + reserved_1, + queue, + consumer_tag, + no_local, + no_ack, + exclusive, + no_wait, + arguments, + }) => { + writer.write_all(&[0, 60, 0, 20])?; + short(reserved_1, &mut writer)?; + shortstr(queue, &mut writer)?; + shortstr(consumer_tag, &mut writer)?; + bit(&[no_local, no_ack, exclusive, no_wait], &mut writer)?; + table(arguments, &mut writer)?; + } + Class::Basic(Basic::ConsumeOk { consumer_tag }) => { + writer.write_all(&[0, 60, 0, 21])?; + shortstr(consumer_tag, &mut writer)?; + } + Class::Basic(Basic::Cancel { + consumer_tag, + no_wait, + }) => { + writer.write_all(&[0, 60, 0, 30])?; + shortstr(consumer_tag, &mut writer)?; + bit(&[no_wait], &mut writer)?; + } + Class::Basic(Basic::CancelOk { consumer_tag }) => { + writer.write_all(&[0, 60, 0, 31])?; + shortstr(consumer_tag, &mut writer)?; + } + Class::Basic(Basic::Publish { + reserved_1, + exchange, + routing_key, + mandatory, + immediate, + }) => { + writer.write_all(&[0, 60, 0, 40])?; + short(reserved_1, &mut writer)?; + shortstr(exchange, &mut writer)?; + shortstr(routing_key, &mut writer)?; + bit(&[mandatory, immediate], &mut writer)?; + } + Class::Basic(Basic::Return { + reply_code, + reply_text, + exchange, + routing_key, + }) => { + writer.write_all(&[0, 60, 0, 50])?; + short(reply_code, &mut writer)?; + shortstr(reply_text, &mut writer)?; + shortstr(exchange, &mut writer)?; + shortstr(routing_key, &mut writer)?; + } + Class::Basic(Basic::Deliver { + consumer_tag, + delivery_tag, + redelivered, + exchange, + routing_key, + }) => { + writer.write_all(&[0, 60, 0, 60])?; + shortstr(consumer_tag, &mut writer)?; + longlong(delivery_tag, &mut writer)?; + bit(&[redelivered], &mut writer)?; + shortstr(exchange, &mut writer)?; + shortstr(routing_key, &mut writer)?; + } + Class::Basic(Basic::Get { + reserved_1, + queue, + no_ack, + }) => { + writer.write_all(&[0, 60, 0, 70])?; + short(reserved_1, &mut writer)?; + shortstr(queue, &mut writer)?; + bit(&[no_ack], &mut writer)?; + } + Class::Basic(Basic::GetOk { + delivery_tag, + redelivered, + exchange, + routing_key, + message_count, + }) => { + writer.write_all(&[0, 60, 0, 71])?; + longlong(delivery_tag, &mut writer)?; + bit(&[redelivered], &mut writer)?; + shortstr(exchange, &mut writer)?; + shortstr(routing_key, &mut writer)?; + long(message_count, &mut writer)?; + } + Class::Basic(Basic::GetEmpty { reserved_1 }) => { + writer.write_all(&[0, 60, 0, 72])?; + shortstr(reserved_1, &mut writer)?; + } + Class::Basic(Basic::Ack { + delivery_tag, + multiple, + }) => { + writer.write_all(&[0, 60, 0, 80])?; + longlong(delivery_tag, &mut writer)?; + bit(&[multiple], &mut writer)?; + } + Class::Basic(Basic::Reject { + delivery_tag, + requeue, + }) => { + writer.write_all(&[0, 60, 0, 90])?; + longlong(delivery_tag, &mut writer)?; + bit(&[requeue], &mut writer)?; + } + Class::Basic(Basic::RecoverAsync { requeue }) => { + writer.write_all(&[0, 60, 0, 100])?; + bit(&[requeue], &mut writer)?; + } + Class::Basic(Basic::Recover { requeue }) => { + writer.write_all(&[0, 60, 0, 110])?; + bit(&[requeue], &mut writer)?; + } + Class::Basic(Basic::RecoverOk {}) => { + writer.write_all(&[0, 60, 0, 111])?; + } + Class::Tx(Tx::Select {}) => { + writer.write_all(&[0, 90, 0, 10])?; + } + Class::Tx(Tx::SelectOk {}) => { + writer.write_all(&[0, 90, 0, 11])?; + } + Class::Tx(Tx::Commit {}) => { + writer.write_all(&[0, 90, 0, 20])?; + } + Class::Tx(Tx::CommitOk {}) => { + writer.write_all(&[0, 90, 0, 21])?; + } + Class::Tx(Tx::Rollback {}) => { + writer.write_all(&[0, 90, 0, 30])?; + } + Class::Tx(Tx::RollbackOk {}) => { + writer.write_all(&[0, 90, 0, 31])?; + } } + Ok(()) } - Ok(()) -} } diff --git a/amqp_transport/src/classes/mod.rs b/amqp_transport/src/classes/mod.rs index 6a9751c..2eb051d 100644 --- a/amqp_transport/src/classes/mod.rs +++ b/amqp_transport/src/classes/mod.rs @@ -46,3 +46,31 @@ pub fn parse_method(payload: &[u8]) -> Result { Err(nom::Err::Failure(err) | nom::Err::Error(err)) => Err(err), } } + +#[cfg(test)] +mod tests { + #[test] + fn pack_few_bits() { + let bits = [true, false, true]; + + let mut buffer = [0u8; 2]; + super::write_helper::bit(&bits, &mut buffer.as_mut_slice()).unwrap(); + + let (_, parsed_bits) = super::parse_helper::bit(&buffer, 3).unwrap(); + assert_eq!(bits.as_slice(), parsed_bits.as_slice()); + } + + #[test] + fn pack_many_bits() { + let bits = [ + /* first 8 */ + true, true, true, true, false, false, false, false, /* second 4 */ + true, false, true, true, + ]; + let mut buffer = [0u8; 2]; + super::write_helper::bit(&bits, &mut buffer.as_mut_slice()).unwrap(); + + let (_, parsed_bits) = super::parse_helper::bit(&buffer, 12).unwrap(); + assert_eq!(bits.as_slice(), parsed_bits.as_slice()); + } +} diff --git a/amqp_transport/src/classes/parse_helper.rs b/amqp_transport/src/classes/parse_helper.rs index 07bcd0e..8234b0c 100644 --- a/amqp_transport/src/classes/parse_helper.rs +++ b/amqp_transport/src/classes/parse_helper.rs @@ -3,14 +3,20 @@ use crate::classes::generated::{ Bit, Long, Longlong, Longstr, Octet, Short, Shortstr, Table, Timestamp, }; use crate::error::{ConException, ProtocolError, TransError}; +use nom::branch::alt; +use nom::bytes::complete::{tag, take}; use nom::error::ErrorKind; +use nom::multi::count; +use nom::number::complete::{f32, f64, i16, i32, i64, i8, u16, u32, u64, u8}; +use nom::number::Endianness::Big; +use std::collections::HashMap; -impl nom::error::ParseError<&[u8]> for TransError { - fn from_error_kind(_input: &[u8], _kind: ErrorKind) -> Self { +impl nom::error::ParseError for TransError { + fn from_error_kind(_input: T, _kind: ErrorKind) -> Self { ProtocolError::ConException(ConException::SyntaxError).into() } - fn append(_input: &[u8], _kind: ErrorKind, other: Self) -> Self { + fn append(_input: T, _kind: ErrorKind, other: Self) -> Self { other } } @@ -25,33 +31,167 @@ macro_rules! fail { }; } +use crate::classes::{FieldValue, TableFieldName}; pub use fail; pub fn octet(input: &[u8]) -> IResult { - todo!() + u8(input) } + pub fn short(input: &[u8]) -> IResult { - todo!() + u16(Big)(input) } + pub fn long(input: &[u8]) -> IResult { - todo!() + u32(Big)(input) } + pub fn longlong(input: &[u8]) -> IResult { - todo!() + u64(Big)(input) } -// todo: doing this using a vec is a bit wasteful, consider not doing that -pub fn bit(input: &[u8], amount: u8) -> IResult> { - todo!() + +pub fn bit(input: &[u8], amount: usize) -> IResult> { + let octets = (amount + 7) / 8; + let (input, bytes) = take(octets)(input)?; + + let mut vec = Vec::new(); + let mut byte_index = 0; + let mut total_index = 0; + + for &byte in bytes { + while byte_index < 8 && total_index < amount { + let next_bit = 1 & (byte >> byte_index); + let bit_bool = match next_bit { + 0 => false, + 1 => true, + _ => unreachable!(), + }; + vec.push(bit_bool); + byte_index += 1; + total_index += 1; + } + byte_index = 0; + } + + Ok((input, vec)) } + pub fn shortstr(input: &[u8]) -> IResult { - todo!() + let (input, len) = u8(input)?; + let (input, str_data) = take(usize::from(len))(input)?; + let data = String::from_utf8(str_data.into()).map_err(|_| { + nom::Err::Failure(ProtocolError::ConException(ConException::SyntaxError).into()) + })?; + Ok((input, data)) } + pub fn longstr(input: &[u8]) -> IResult { - todo!() + let (input, len) = u32(Big)(input)?; + let (input, str_data) = take(usize::try_from(len).unwrap())(input)?; + let data = str_data.into(); + Ok((input, data)) } + pub fn timestamp(input: &[u8]) -> IResult { - todo!() + u64(Big)(input) } + pub fn table(input: &[u8]) -> IResult
{ - todo!() + let (input, len) = u32(Big)(input)?; + + let (input, values) = count(table_value_pair, usize::try_from(len).unwrap())(input)?; + let table = HashMap::from_iter(values.into_iter()); + Ok((input, table)) +} + +fn table_value_pair(input: &[u8]) -> IResult<(TableFieldName, FieldValue)> { + let (input, field_name) = shortstr(input)?; + let (input, field_value) = field_value(input)?; + Ok((input, (field_name, field_value))) +} + +fn field_value(input: &[u8]) -> IResult { + type R<'a> = IResult<'a, FieldValue>; + + fn boolean(input: &[u8]) -> R { + let (input, _) = tag(b"t")(input)?; + let (input, bool_byte) = u8(input)?; + match bool_byte { + 0 => Ok((input, FieldValue::Boolean(false))), + 1 => Ok((input, FieldValue::Boolean(true))), + _ => fail!(), + } + } + + macro_rules! number { + ($tag:literal, $name:ident, $comb:expr, $value:ident, $r:path) => { + fn $name(input: &[u8]) -> $r { + let (input, _) = tag($tag)(input)?; + $comb(input).map(|(input, int)| (input, FieldValue::$value(int))) + } + }; + } + + number!(b"b", short_short_int, i8, ShortShortInt, R); + number!(b"B", short_short_uint, u8, ShortShortUInt, R); + number!(b"U", short_int, i16(Big), ShortInt, R); + number!(b"u", short_uint, u16(Big), ShortUInt, R); + number!(b"I", long_int, i32(Big), LongInt, R); + number!(b"i", long_uint, u32(Big), LongUInt, R); + number!(b"L", long_long_int, i64(Big), LongLongInt, R); + number!(b"l", long_long_uint, u64(Big), LongLongUInt, R); + number!(b"f", float, f32(Big), Float, R); + number!(b"d", double, f64(Big), Double, R); + + fn decimal(input: &[u8]) -> R { + let (input, _) = tag("D")(input)?; + let (input, scale) = u8(input)?; + let (input, value) = u32(Big)(input)?; + Ok((input, FieldValue::DecimalValue(scale, value))) + } + + fn short_str(input: &[u8]) -> R { + let (input, _) = tag("s")(input)?; + let (input, str) = shortstr(input)?; + Ok((input, FieldValue::ShortString(str))) + } + + fn long_str(input: &[u8]) -> R { + let (input, _) = tag("S")(input)?; + let (input, str) = longstr(input)?; + Ok((input, FieldValue::LongString(str))) + } + + fn field_array(input: &[u8]) -> R { + let (input, _) = tag("A")(input)?; + // todo is it i32? + let (input, len) = u32(Big)(input)?; + count(field_value, usize::try_from(len).unwrap())(input) + .map(|(input, value)| (input, FieldValue::FieldArray(value))) + } + + number!(b"T", timestamp, u64(Big), Timestamp, R); + + fn field_table(input: &[u8]) -> R { + table(input).map(|(input, value)| (input, FieldValue::FieldTable(value))) + } + + alt(( + boolean, + short_short_int, + short_short_uint, + short_int, + short_uint, + long_int, + long_uint, + long_long_int, + long_long_uint, + float, + double, + decimal, + short_str, + long_str, + field_array, + timestamp, + ))(input) } diff --git a/amqp_transport/src/connection.rs b/amqp_transport/src/connection.rs index 036b8dc..71810a7 100644 --- a/amqp_transport/src/connection.rs +++ b/amqp_transport/src/connection.rs @@ -1,5 +1,5 @@ use crate::classes::FieldValue; -use crate::error::{ProtocolError, Result}; +use crate::error::{ConException, ProtocolError, Result}; use crate::frame::{Frame, FrameType}; use crate::{classes, frame}; use anyhow::Context; @@ -54,7 +54,7 @@ impl Connection { .local_addr() .context("failed to get local_addr")?, ), - mechanisms: "none".to_string().into(), + mechanisms: "PLAIN".to_string().into(), locales: "en_US".to_string().into(), }); @@ -72,6 +72,17 @@ impl Connection { ) .await?; + let start_ok_frame = frame::read_frame(&mut self.stream, self.max_frame_size).await?; + debug!(?start_ok_frame, "Received Start-Ok frame"); + + if start_ok_frame.kind != FrameType::Method { + return Err(ProtocolError::ConException(ConException::Todo).into()); + } + + let class = classes::parse_method(&start_ok_frame.payload)?; + + debug!(?class, "extracted method"); + Ok(()) } @@ -120,6 +131,8 @@ fn server_properties(host: SocketAddr) -> classes::Table { FieldValue::LongString(host_str.into()) }; + // todo: fix + //HashMap::from([ // ("host".to_string(), host_value), // ("product".to_string(), ss("no name yet")), diff --git a/amqp_transport/src/error.rs b/amqp_transport/src/error.rs index 7b7618f..10085a7 100644 --- a/amqp_transport/src/error.rs +++ b/amqp_transport/src/error.rs @@ -38,6 +38,8 @@ pub enum ConException { SyntaxError, #[error("504 Channel error")] ChannelError, + #[error("xxx Not decided yet")] + Todo, } #[derive(Debug, thiserror::Error)]