From 427cfadc11b2d828ed644cc72f79f2b52f519afa Mon Sep 17 00:00:00 2001 From: Nilstrieb <48135649+Nilstrieb@users.noreply.github.com> Date: Sun, 13 Feb 2022 14:53:12 +0100 Subject: [PATCH] table ser --- amqp_codegen/src/main.rs | 7 +- amqp_transport/src/classes/generated.rs | 2411 ++++++++++---------- amqp_transport/src/classes/mod.rs | 8 +- amqp_transport/src/classes/write_helper.rs | 172 +- 4 files changed, 1347 insertions(+), 1251 deletions(-) diff --git a/amqp_codegen/src/main.rs b/amqp_codegen/src/main.rs index 87c9201..6a77e8d 100644 --- a/amqp_codegen/src/main.rs +++ b/amqp_codegen/src/main.rs @@ -2,10 +2,10 @@ mod parser; mod write; use crate::parser::codegen_parser; +use crate::write::codegen_write; use heck::ToUpperCamelCase; use std::fs; use strong_xml::XmlRead; -use crate::write::codegen_write; #[derive(Debug, XmlRead)] #[xml(tag = "amqp")] @@ -151,8 +151,9 @@ fn amqp_type_to_rust_type(amqp_type: &str) -> &'static str { "short" => "u16", "long" => "u32", "longlong" => "u64", - "bit" => "u8", - "shortstr" | "longstr" => "String", + "bit" => "bool", + "shortstr" => "String", + "longstr" => "Vec", "timestamp" => "u64", "table" => "super::Table", _ => unreachable!("invalid type {}", amqp_type), diff --git a/amqp_transport/src/classes/generated.rs b/amqp_transport/src/classes/generated.rs index f072ab5..9979bdd 100644 --- a/amqp_transport/src/classes/generated.rs +++ b/amqp_transport/src/classes/generated.rs @@ -11,11 +11,11 @@ pub type ExchangeName = String; pub type MethodId = u16; -pub type NoAck = u8; +pub type NoAck = bool; -pub type NoLocal = u8; +pub type NoLocal = bool; -pub type NoWait = u8; +pub type NoWait = bool; /// must not be null, must be shorter than 127 pub type Path = String; @@ -25,7 +25,7 @@ pub type PeerProperties = super::Table; /// must be shorter than 127, must match `^[a-zA-Z0-9-_.:]*$` pub type QueueName = String; -pub type Redelivered = u8; +pub type Redelivered = bool; pub type MessageCount = u32; @@ -35,7 +35,7 @@ pub type ReplyCode = u16; /// must not be null pub type ReplyText = String; -pub type Bit = u8; +pub type Bit = bool; pub type Octet = u8; @@ -47,7 +47,7 @@ pub type Longlong = u64; pub type Shortstr = String; -pub type Longstr = String; +pub type Longstr = Vec; pub type Timestamp = u64; @@ -87,7 +87,9 @@ pub enum Connection { locale: Shortstr, }, /// Index 20 - Secure { challenge: Longstr }, + Secure { + challenge: Longstr, + }, /// Index 21 SecureOk { /// must not be null @@ -113,7 +115,9 @@ pub enum Connection { reserved_2: Bit, }, /// Index 41 - OpenOk { reserved_1: Shortstr }, + OpenOk { + reserved_1: Shortstr, + }, /// Index 50 Close { reply_code: ReplyCode, @@ -124,7 +128,9 @@ pub enum Connection { /// Index 51 CloseOk, /// Index 60 - Blocked { reason: Shortstr }, + Blocked { + reason: Shortstr, + }, /// Index 61 Unblocked, } @@ -132,13 +138,21 @@ pub enum Connection { #[derive(Debug, Clone, PartialEq)] pub enum Channel { /// Index 10 - Open { reserved_1: Shortstr }, + Open { + reserved_1: Shortstr, + }, /// Index 11 - OpenOk { reserved_1: Longstr }, + OpenOk { + reserved_1: Longstr, + }, /// Index 20 - Flow { active: Bit }, + Flow { + active: Bit, + }, /// Index 21 - FlowOk { active: Bit }, + FlowOk { + active: Bit, + }, /// Index 40 Close { reply_code: ReplyCode, @@ -227,7 +241,9 @@ pub enum Queue { no_wait: NoWait, }, /// Index 31 - PurgeOk { message_count: MessageCount }, + PurgeOk { + message_count: MessageCount, + }, /// Index 40 Delete { reserved_1: Short, @@ -237,7 +253,9 @@ pub enum Queue { no_wait: NoWait, }, /// Index 41 - DeleteOk { message_count: MessageCount }, + DeleteOk { + message_count: MessageCount, + }, } /// Index 60, handler = channel #[derive(Debug, Clone, PartialEq)] @@ -262,14 +280,18 @@ pub enum Basic { arguments: Table, }, /// Index 21 - ConsumeOk { consumer_tag: ConsumerTag }, + ConsumeOk { + consumer_tag: ConsumerTag, + }, /// Index 30 Cancel { consumer_tag: ConsumerTag, no_wait: NoWait, }, /// Index 31 - CancelOk { consumer_tag: ConsumerTag }, + CancelOk { + consumer_tag: ConsumerTag, + }, /// Index 40 Publish { reserved_1: Short, @@ -308,7 +330,9 @@ pub enum Basic { message_count: MessageCount, }, /// Index 72 - GetEmpty { reserved_1: Shortstr }, + GetEmpty { + reserved_1: Shortstr, + }, /// Index 80 Ack { delivery_tag: DeliveryTag, @@ -320,9 +344,13 @@ pub enum Basic { requeue: Bit, }, /// Index 100 - RecoverAsync { requeue: Bit }, + RecoverAsync { + requeue: Bit, + }, /// Index 110 - Recover { requeue: Bit }, + Recover { + requeue: Bit, + }, /// Index 111 RecoverOk, } @@ -343,1223 +371,1142 @@ pub enum Tx { RollbackOk, } pub mod parse { - use super::*; - use crate::classes::parse_helper::*; - use crate::error::TransError; - use nom::{branch::alt, bytes::complete::tag}; - use once_cell::sync::Lazy; - use regex::Regex; +use super::*; +use crate::classes::parse_helper::*; +use crate::error::TransError; +use nom::{branch::alt, bytes::complete::tag}; +use regex::Regex; +use once_cell::sync::Lazy; - pub type IResult<'a, T> = nom::IResult<&'a [u8], T, TransError>; +pub type IResult<'a, T> = nom::IResult<&'a [u8], T, TransError>; + +pub fn parse_method(input: &[u8]) -> Result<(&[u8], Class), nom::Err> { + alt((connection, channel, exchange, queue, basic, tx))(input) +} +fn domain_class_id(input: &[u8]) -> IResult { + short(input) +} +fn domain_consumer_tag(input: &[u8]) -> IResult { + shortstr(input) +} +fn domain_delivery_tag(input: &[u8]) -> IResult { + longlong(input) +} +fn domain_exchange_name(input: &[u8]) -> IResult { + let (input, result) = shortstr(input)?; + if result.len() > 127 { fail!() } + static REGEX: Lazy = Lazy::new(|| Regex::new(r"^[a-zA-Z0-9-_.:]*$").unwrap()); + if !REGEX.is_match(&result) { fail!() } + Ok((input, result)) +} +fn domain_method_id(input: &[u8]) -> IResult { + short(input) +} +fn domain_path(input: &[u8]) -> IResult { + let (input, result) = shortstr(input)?; + if result.is_empty() { fail!() } + if result.len() > 127 { fail!() } + Ok((input, result)) +} +fn domain_peer_properties(input: &[u8]) -> IResult { + table(input) +} +fn domain_queue_name(input: &[u8]) -> IResult { + let (input, result) = shortstr(input)?; + if result.len() > 127 { fail!() } + static REGEX: Lazy = Lazy::new(|| Regex::new(r"^[a-zA-Z0-9-_.:]*$").unwrap()); + if !REGEX.is_match(&result) { fail!() } + Ok((input, result)) +} +fn domain_message_count(input: &[u8]) -> IResult { + long(input) +} +fn domain_reply_code(input: &[u8]) -> IResult { + let (input, result) = short(input)?; + if result == 0 { fail!() } + Ok((input, result)) +} +fn domain_reply_text(input: &[u8]) -> IResult { + let (input, result) = shortstr(input)?; + if result.is_empty() { fail!() } + Ok((input, result)) +} +fn domain_octet(input: &[u8]) -> IResult { + octet(input) +} +fn domain_short(input: &[u8]) -> IResult { + short(input) +} +fn domain_long(input: &[u8]) -> IResult { + long(input) +} +fn domain_longlong(input: &[u8]) -> IResult { + longlong(input) +} +fn domain_shortstr(input: &[u8]) -> IResult { + shortstr(input) +} +fn domain_longstr(input: &[u8]) -> IResult { + longstr(input) +} +fn domain_timestamp(input: &[u8]) -> IResult { + timestamp(input) +} +fn domain_table(input: &[u8]) -> IResult { + table(input) +} +fn connection(input: &[u8]) -> IResult { + let (input, _) = tag([10])(input)?; + alt((connection_start, connection_start_ok, connection_secure, connection_secure_ok, connection_tune, connection_tune_ok, connection_open, connection_open_ok, connection_close, connection_close_ok, connection_blocked, connection_unblocked))(input) +} +fn connection_start(input: &[u8]) -> IResult { + let (input, _) = tag([10])(input)?; + let (input, version_major) = domain_octet(input)?; + let (input, version_minor) = domain_octet(input)?; + let (input, server_properties) = domain_peer_properties(input)?; + let (input, mechanisms) = domain_longstr(input)?; + if mechanisms.is_empty() { fail!() } + let (input, locales) = domain_longstr(input)?; + if locales.is_empty() { fail!() } + Ok((input, Class::Connection(Connection::Start { + version_major, + version_minor, + server_properties, + mechanisms, + locales, + }))) +} +fn connection_start_ok(input: &[u8]) -> IResult { + let (input, _) = tag([11])(input)?; + let (input, client_properties) = domain_peer_properties(input)?; + let (input, mechanism) = domain_shortstr(input)?; + if mechanism.is_empty() { fail!() } + let (input, response) = domain_longstr(input)?; + if response.is_empty() { fail!() } + let (input, locale) = domain_shortstr(input)?; + if locale.is_empty() { fail!() } + Ok((input, Class::Connection(Connection::StartOk { + client_properties, + mechanism, + response, + locale, + }))) +} +fn connection_secure(input: &[u8]) -> IResult { + let (input, _) = tag([20])(input)?; + let (input, challenge) = domain_longstr(input)?; + Ok((input, Class::Connection(Connection::Secure { + challenge, + }))) +} +fn connection_secure_ok(input: &[u8]) -> IResult { + let (input, _) = tag([21])(input)?; + let (input, response) = domain_longstr(input)?; + if response.is_empty() { fail!() } + Ok((input, Class::Connection(Connection::SecureOk { + response, + }))) +} +fn connection_tune(input: &[u8]) -> IResult { + let (input, _) = tag([30])(input)?; + let (input, channel_max) = domain_short(input)?; + let (input, frame_max) = domain_long(input)?; + let (input, heartbeat) = domain_short(input)?; + Ok((input, Class::Connection(Connection::Tune { + channel_max, + frame_max, + heartbeat, + }))) +} +fn connection_tune_ok(input: &[u8]) -> IResult { + let (input, _) = tag([31])(input)?; + let (input, channel_max) = domain_short(input)?; + if channel_max == 0 { fail!() } + let (input, frame_max) = domain_long(input)?; + let (input, heartbeat) = domain_short(input)?; + Ok((input, Class::Connection(Connection::TuneOk { + channel_max, + frame_max, + heartbeat, + }))) +} +fn connection_open(input: &[u8]) -> IResult { + let (input, _) = tag([40])(input)?; + let (input, virtual_host) = domain_path(input)?; + let (input, reserved_1) = domain_shortstr(input)?; + let (input, bits) = bit(input, 1)?; + let reserved_2 = bits[0]; + Ok((input, Class::Connection(Connection::Open { + virtual_host, + reserved_1, + reserved_2, + }))) +} +fn connection_open_ok(input: &[u8]) -> IResult { + let (input, _) = tag([41])(input)?; + let (input, reserved_1) = domain_shortstr(input)?; + Ok((input, Class::Connection(Connection::OpenOk { + reserved_1, + }))) +} +fn connection_close(input: &[u8]) -> IResult { + let (input, _) = tag([50])(input)?; + let (input, reply_code) = domain_reply_code(input)?; + let (input, reply_text) = domain_reply_text(input)?; + let (input, class_id) = domain_class_id(input)?; + let (input, method_id) = domain_method_id(input)?; + Ok((input, Class::Connection(Connection::Close { + reply_code, + reply_text, + class_id, + method_id, + }))) +} +fn connection_close_ok(input: &[u8]) -> IResult { + let (input, _) = tag([51])(input)?; + Ok((input, Class::Connection(Connection::CloseOk { + }))) +} +fn connection_blocked(input: &[u8]) -> IResult { + let (input, _) = tag([60])(input)?; + let (input, reason) = domain_shortstr(input)?; + Ok((input, Class::Connection(Connection::Blocked { + reason, + }))) +} +fn connection_unblocked(input: &[u8]) -> IResult { + let (input, _) = tag([61])(input)?; + Ok((input, Class::Connection(Connection::Unblocked { + }))) +} +fn channel(input: &[u8]) -> IResult { + let (input, _) = tag([20])(input)?; + alt((channel_open, channel_open_ok, channel_flow, channel_flow_ok, channel_close, channel_close_ok))(input) +} +fn channel_open(input: &[u8]) -> IResult { + let (input, _) = tag([10])(input)?; + let (input, reserved_1) = domain_shortstr(input)?; + Ok((input, Class::Channel(Channel::Open { + reserved_1, + }))) +} +fn channel_open_ok(input: &[u8]) -> IResult { + let (input, _) = tag([11])(input)?; + let (input, reserved_1) = domain_longstr(input)?; + Ok((input, Class::Channel(Channel::OpenOk { + reserved_1, + }))) +} +fn channel_flow(input: &[u8]) -> IResult { + let (input, _) = tag([20])(input)?; + let (input, bits) = bit(input, 1)?; + let active = bits[0]; + Ok((input, Class::Channel(Channel::Flow { + active, + }))) +} +fn channel_flow_ok(input: &[u8]) -> IResult { + let (input, _) = tag([21])(input)?; + let (input, bits) = bit(input, 1)?; + let active = bits[0]; + Ok((input, Class::Channel(Channel::FlowOk { + active, + }))) +} +fn channel_close(input: &[u8]) -> IResult { + let (input, _) = tag([40])(input)?; + let (input, reply_code) = domain_reply_code(input)?; + let (input, reply_text) = domain_reply_text(input)?; + let (input, class_id) = domain_class_id(input)?; + let (input, method_id) = domain_method_id(input)?; + Ok((input, Class::Channel(Channel::Close { + reply_code, + reply_text, + class_id, + method_id, + }))) +} +fn channel_close_ok(input: &[u8]) -> IResult { + let (input, _) = tag([41])(input)?; + Ok((input, Class::Channel(Channel::CloseOk { + }))) +} +fn exchange(input: &[u8]) -> IResult { + let (input, _) = tag([40])(input)?; + alt((exchange_declare, exchange_declare_ok, exchange_delete, exchange_delete_ok))(input) +} +fn exchange_declare(input: &[u8]) -> IResult { + let (input, _) = tag([10])(input)?; + let (input, reserved_1) = domain_short(input)?; + let (input, exchange) = domain_exchange_name(input)?; + if exchange.is_empty() { fail!() } + let (input, r#type) = domain_shortstr(input)?; + let (input, bits) = bit(input, 5)?; + let passive = bits[0]; + let durable = bits[1]; + let reserved_2 = bits[2]; + let reserved_3 = bits[3]; + let no_wait = bits[4]; + let (input, arguments) = domain_table(input)?; + Ok((input, Class::Exchange(Exchange::Declare { + reserved_1, + exchange, + r#type, + passive, + durable, + reserved_2, + reserved_3, + no_wait, + arguments, + }))) +} +fn exchange_declare_ok(input: &[u8]) -> IResult { + let (input, _) = tag([11])(input)?; + Ok((input, Class::Exchange(Exchange::DeclareOk { + }))) +} +fn exchange_delete(input: &[u8]) -> IResult { + let (input, _) = tag([20])(input)?; + let (input, reserved_1) = domain_short(input)?; + let (input, exchange) = domain_exchange_name(input)?; + if exchange.is_empty() { fail!() } + let (input, bits) = bit(input, 2)?; + let if_unused = bits[0]; + let no_wait = bits[1]; + Ok((input, Class::Exchange(Exchange::Delete { + reserved_1, + exchange, + if_unused, + no_wait, + }))) +} +fn exchange_delete_ok(input: &[u8]) -> IResult { + let (input, _) = tag([21])(input)?; + Ok((input, Class::Exchange(Exchange::DeleteOk { + }))) +} +fn queue(input: &[u8]) -> IResult { + let (input, _) = tag([50])(input)?; + alt((queue_declare, queue_declare_ok, queue_bind, queue_bind_ok, queue_unbind, queue_unbind_ok, queue_purge, queue_purge_ok, queue_delete, queue_delete_ok))(input) +} +fn queue_declare(input: &[u8]) -> IResult { + let (input, _) = tag([10])(input)?; + let (input, reserved_1) = domain_short(input)?; + let (input, queue) = domain_queue_name(input)?; + let (input, bits) = bit(input, 5)?; + let passive = bits[0]; + let durable = bits[1]; + let exclusive = bits[2]; + let auto_delete = bits[3]; + let no_wait = bits[4]; + let (input, arguments) = domain_table(input)?; + Ok((input, Class::Queue(Queue::Declare { + reserved_1, + queue, + passive, + durable, + exclusive, + auto_delete, + no_wait, + arguments, + }))) +} +fn queue_declare_ok(input: &[u8]) -> IResult { + let (input, _) = tag([11])(input)?; + let (input, queue) = domain_queue_name(input)?; + if queue.is_empty() { fail!() } + let (input, message_count) = domain_message_count(input)?; + let (input, consumer_count) = domain_long(input)?; + Ok((input, Class::Queue(Queue::DeclareOk { + queue, + message_count, + consumer_count, + }))) +} +fn queue_bind(input: &[u8]) -> IResult { + let (input, _) = tag([20])(input)?; + let (input, reserved_1) = domain_short(input)?; + let (input, queue) = domain_queue_name(input)?; + let (input, exchange) = domain_exchange_name(input)?; + let (input, routing_key) = domain_shortstr(input)?; + let (input, bits) = bit(input, 1)?; + let no_wait = bits[0]; + let (input, arguments) = domain_table(input)?; + Ok((input, Class::Queue(Queue::Bind { + reserved_1, + queue, + exchange, + routing_key, + no_wait, + arguments, + }))) +} +fn queue_bind_ok(input: &[u8]) -> IResult { + let (input, _) = tag([21])(input)?; + Ok((input, Class::Queue(Queue::BindOk { + }))) +} +fn queue_unbind(input: &[u8]) -> IResult { + let (input, _) = tag([50])(input)?; + let (input, reserved_1) = domain_short(input)?; + let (input, queue) = domain_queue_name(input)?; + let (input, exchange) = domain_exchange_name(input)?; + let (input, routing_key) = domain_shortstr(input)?; + let (input, arguments) = domain_table(input)?; + Ok((input, Class::Queue(Queue::Unbind { + reserved_1, + queue, + exchange, + routing_key, + arguments, + }))) +} +fn queue_unbind_ok(input: &[u8]) -> IResult { + let (input, _) = tag([51])(input)?; + Ok((input, Class::Queue(Queue::UnbindOk { + }))) +} +fn queue_purge(input: &[u8]) -> IResult { + let (input, _) = tag([30])(input)?; + let (input, reserved_1) = domain_short(input)?; + let (input, queue) = domain_queue_name(input)?; + let (input, bits) = bit(input, 1)?; + let no_wait = bits[0]; + Ok((input, Class::Queue(Queue::Purge { + reserved_1, + queue, + no_wait, + }))) +} +fn queue_purge_ok(input: &[u8]) -> IResult { + let (input, _) = tag([31])(input)?; + let (input, message_count) = domain_message_count(input)?; + Ok((input, Class::Queue(Queue::PurgeOk { + message_count, + }))) +} +fn queue_delete(input: &[u8]) -> IResult { + let (input, _) = tag([40])(input)?; + let (input, reserved_1) = domain_short(input)?; + let (input, queue) = domain_queue_name(input)?; + let (input, bits) = bit(input, 3)?; + let if_unused = bits[0]; + let if_empty = bits[1]; + let no_wait = bits[2]; + Ok((input, Class::Queue(Queue::Delete { + reserved_1, + queue, + if_unused, + if_empty, + no_wait, + }))) +} +fn queue_delete_ok(input: &[u8]) -> IResult { + let (input, _) = tag([41])(input)?; + let (input, message_count) = domain_message_count(input)?; + Ok((input, Class::Queue(Queue::DeleteOk { + message_count, + }))) +} +fn basic(input: &[u8]) -> IResult { + let (input, _) = tag([60])(input)?; + alt((basic_qos, basic_qos_ok, basic_consume, basic_consume_ok, basic_cancel, basic_cancel_ok, basic_publish, basic_return, basic_deliver, basic_get, basic_get_ok, basic_get_empty, basic_ack, basic_reject, basic_recover_async, basic_recover, basic_recover_ok))(input) +} +fn basic_qos(input: &[u8]) -> IResult { + let (input, _) = tag([10])(input)?; + let (input, prefetch_size) = domain_long(input)?; + let (input, prefetch_count) = domain_short(input)?; + let (input, bits) = bit(input, 1)?; + let global = bits[0]; + Ok((input, Class::Basic(Basic::Qos { + prefetch_size, + prefetch_count, + global, + }))) +} +fn basic_qos_ok(input: &[u8]) -> IResult { + let (input, _) = tag([11])(input)?; + Ok((input, Class::Basic(Basic::QosOk { + }))) +} +fn basic_consume(input: &[u8]) -> IResult { + let (input, _) = tag([20])(input)?; + let (input, reserved_1) = domain_short(input)?; + let (input, queue) = domain_queue_name(input)?; + let (input, consumer_tag) = domain_consumer_tag(input)?; + let (input, bits) = bit(input, 4)?; + let no_local = bits[0]; + let no_ack = bits[1]; + let exclusive = bits[2]; + let no_wait = bits[3]; + let (input, arguments) = domain_table(input)?; + Ok((input, Class::Basic(Basic::Consume { + reserved_1, + queue, + consumer_tag, + no_local, + no_ack, + exclusive, + no_wait, + arguments, + }))) +} +fn basic_consume_ok(input: &[u8]) -> IResult { + let (input, _) = tag([21])(input)?; + let (input, consumer_tag) = domain_consumer_tag(input)?; + Ok((input, Class::Basic(Basic::ConsumeOk { + consumer_tag, + }))) +} +fn basic_cancel(input: &[u8]) -> IResult { + let (input, _) = tag([30])(input)?; + let (input, consumer_tag) = domain_consumer_tag(input)?; + let (input, bits) = bit(input, 1)?; + let no_wait = bits[0]; + Ok((input, Class::Basic(Basic::Cancel { + consumer_tag, + no_wait, + }))) +} +fn basic_cancel_ok(input: &[u8]) -> IResult { + let (input, _) = tag([31])(input)?; + let (input, consumer_tag) = domain_consumer_tag(input)?; + Ok((input, Class::Basic(Basic::CancelOk { + consumer_tag, + }))) +} +fn basic_publish(input: &[u8]) -> IResult { + let (input, _) = tag([40])(input)?; + let (input, reserved_1) = domain_short(input)?; + let (input, exchange) = domain_exchange_name(input)?; + let (input, routing_key) = domain_shortstr(input)?; + let (input, bits) = bit(input, 2)?; + let mandatory = bits[0]; + let immediate = bits[1]; + Ok((input, Class::Basic(Basic::Publish { + reserved_1, + exchange, + routing_key, + mandatory, + immediate, + }))) +} +fn basic_return(input: &[u8]) -> IResult { + let (input, _) = tag([50])(input)?; + let (input, reply_code) = domain_reply_code(input)?; + let (input, reply_text) = domain_reply_text(input)?; + let (input, exchange) = domain_exchange_name(input)?; + let (input, routing_key) = domain_shortstr(input)?; + Ok((input, Class::Basic(Basic::Return { + reply_code, + reply_text, + exchange, + routing_key, + }))) +} +fn basic_deliver(input: &[u8]) -> IResult { + let (input, _) = tag([60])(input)?; + let (input, consumer_tag) = domain_consumer_tag(input)?; + let (input, delivery_tag) = domain_delivery_tag(input)?; + let (input, bits) = bit(input, 1)?; + let redelivered = bits[0]; + let (input, exchange) = domain_exchange_name(input)?; + let (input, routing_key) = domain_shortstr(input)?; + Ok((input, Class::Basic(Basic::Deliver { + consumer_tag, + delivery_tag, + redelivered, + exchange, + routing_key, + }))) +} +fn basic_get(input: &[u8]) -> IResult { + let (input, _) = tag([70])(input)?; + let (input, reserved_1) = domain_short(input)?; + let (input, queue) = domain_queue_name(input)?; + let (input, bits) = bit(input, 1)?; + let no_ack = bits[0]; + Ok((input, Class::Basic(Basic::Get { + reserved_1, + queue, + no_ack, + }))) +} +fn basic_get_ok(input: &[u8]) -> IResult { + let (input, _) = tag([71])(input)?; + let (input, delivery_tag) = domain_delivery_tag(input)?; + let (input, bits) = bit(input, 1)?; + let redelivered = bits[0]; + let (input, exchange) = domain_exchange_name(input)?; + let (input, routing_key) = domain_shortstr(input)?; + let (input, message_count) = domain_message_count(input)?; + Ok((input, Class::Basic(Basic::GetOk { + delivery_tag, + redelivered, + exchange, + routing_key, + message_count, + }))) +} +fn basic_get_empty(input: &[u8]) -> IResult { + let (input, _) = tag([72])(input)?; + let (input, reserved_1) = domain_shortstr(input)?; + Ok((input, Class::Basic(Basic::GetEmpty { + reserved_1, + }))) +} +fn basic_ack(input: &[u8]) -> IResult { + let (input, _) = tag([80])(input)?; + let (input, delivery_tag) = domain_delivery_tag(input)?; + let (input, bits) = bit(input, 1)?; + let multiple = bits[0]; + Ok((input, Class::Basic(Basic::Ack { + delivery_tag, + multiple, + }))) +} +fn basic_reject(input: &[u8]) -> IResult { + let (input, _) = tag([90])(input)?; + let (input, delivery_tag) = domain_delivery_tag(input)?; + let (input, bits) = bit(input, 1)?; + let requeue = bits[0]; + Ok((input, Class::Basic(Basic::Reject { + delivery_tag, + requeue, + }))) +} +fn basic_recover_async(input: &[u8]) -> IResult { + let (input, _) = tag([100])(input)?; + let (input, bits) = bit(input, 1)?; + let requeue = bits[0]; + Ok((input, Class::Basic(Basic::RecoverAsync { + requeue, + }))) +} +fn basic_recover(input: &[u8]) -> IResult { + let (input, _) = tag([110])(input)?; + let (input, bits) = bit(input, 1)?; + let requeue = bits[0]; + Ok((input, Class::Basic(Basic::Recover { + requeue, + }))) +} +fn basic_recover_ok(input: &[u8]) -> IResult { + let (input, _) = tag([111])(input)?; + Ok((input, Class::Basic(Basic::RecoverOk { + }))) +} +fn tx(input: &[u8]) -> IResult { + let (input, _) = tag([90])(input)?; + alt((tx_select, tx_select_ok, tx_commit, tx_commit_ok, tx_rollback, tx_rollback_ok))(input) +} +fn tx_select(input: &[u8]) -> IResult { + let (input, _) = tag([10])(input)?; + Ok((input, Class::Tx(Tx::Select { + }))) +} +fn tx_select_ok(input: &[u8]) -> IResult { + let (input, _) = tag([11])(input)?; + Ok((input, Class::Tx(Tx::SelectOk { + }))) +} +fn tx_commit(input: &[u8]) -> IResult { + let (input, _) = tag([20])(input)?; + Ok((input, Class::Tx(Tx::Commit { + }))) +} +fn tx_commit_ok(input: &[u8]) -> IResult { + let (input, _) = tag([21])(input)?; + Ok((input, Class::Tx(Tx::CommitOk { + }))) +} +fn tx_rollback(input: &[u8]) -> IResult { + let (input, _) = tag([30])(input)?; + Ok((input, Class::Tx(Tx::Rollback { + }))) +} +fn tx_rollback_ok(input: &[u8]) -> IResult { + let (input, _) = tag([31])(input)?; + Ok((input, Class::Tx(Tx::RollbackOk { + }))) +} - pub fn parse_method(input: &[u8]) -> Result<(&[u8], Class), nom::Err> { - alt((connection, channel, exchange, queue, basic, tx))(input) - } - fn domain_class_id(input: &[u8]) -> IResult { - short(input) - } - fn domain_consumer_tag(input: &[u8]) -> IResult { - shortstr(input) - } - fn domain_delivery_tag(input: &[u8]) -> IResult { - longlong(input) - } - fn domain_exchange_name(input: &[u8]) -> IResult { - let (input, result) = shortstr(input)?; - if result.len() > 127 { - fail!() - } - static REGEX: Lazy = Lazy::new(|| Regex::new(r"^[a-zA-Z0-9-_.:]*$").unwrap()); - if !REGEX.is_match(&result) { - fail!() - } - Ok((input, result)) - } - fn domain_method_id(input: &[u8]) -> IResult { - short(input) - } - fn domain_path(input: &[u8]) -> IResult { - let (input, result) = shortstr(input)?; - if result.is_empty() { - fail!() - } - if result.len() > 127 { - fail!() - } - Ok((input, result)) - } - fn domain_peer_properties(input: &[u8]) -> IResult { - table(input) - } - fn domain_queue_name(input: &[u8]) -> IResult { - let (input, result) = shortstr(input)?; - if result.len() > 127 { - fail!() - } - static REGEX: Lazy = Lazy::new(|| Regex::new(r"^[a-zA-Z0-9-_.:]*$").unwrap()); - if !REGEX.is_match(&result) { - fail!() - } - Ok((input, result)) - } - fn domain_message_count(input: &[u8]) -> IResult { - long(input) - } - fn domain_reply_code(input: &[u8]) -> IResult { - let (input, result) = short(input)?; - if result == 0 { - fail!() - } - Ok((input, result)) - } - fn domain_reply_text(input: &[u8]) -> IResult { - let (input, result) = shortstr(input)?; - if result.is_empty() { - fail!() - } - Ok((input, result)) - } - fn domain_octet(input: &[u8]) -> IResult { - octet(input) - } - fn domain_short(input: &[u8]) -> IResult { - short(input) - } - fn domain_long(input: &[u8]) -> IResult { - long(input) - } - fn domain_longlong(input: &[u8]) -> IResult { - longlong(input) - } - fn domain_shortstr(input: &[u8]) -> IResult { - shortstr(input) - } - fn domain_longstr(input: &[u8]) -> IResult { - longstr(input) - } - fn domain_timestamp(input: &[u8]) -> IResult { - timestamp(input) - } - fn domain_table(input: &[u8]) -> IResult
{ - table(input) - } - fn connection(input: &[u8]) -> IResult { - let (input, _) = tag([10])(input)?; - alt(( - connection_start, - connection_start_ok, - connection_secure, - connection_secure_ok, - connection_tune, - connection_tune_ok, - connection_open, - connection_open_ok, - connection_close, - connection_close_ok, - connection_blocked, - connection_unblocked, - ))(input) - } - fn connection_start(input: &[u8]) -> IResult { - let (input, _) = tag([10])(input)?; - let (input, version_major) = domain_octet(input)?; - let (input, version_minor) = domain_octet(input)?; - let (input, server_properties) = domain_peer_properties(input)?; - let (input, mechanisms) = domain_longstr(input)?; - if mechanisms.is_empty() { - fail!() - } - let (input, locales) = domain_longstr(input)?; - if locales.is_empty() { - fail!() - } - Ok(( - input, - Class::Connection(Connection::Start { - version_major, - version_minor, - server_properties, - mechanisms, - locales, - }), - )) - } - fn connection_start_ok(input: &[u8]) -> IResult { - let (input, _) = tag([11])(input)?; - let (input, client_properties) = domain_peer_properties(input)?; - let (input, mechanism) = domain_shortstr(input)?; - if mechanism.is_empty() { - fail!() - } - let (input, response) = domain_longstr(input)?; - if response.is_empty() { - fail!() - } - let (input, locale) = domain_shortstr(input)?; - if locale.is_empty() { - fail!() - } - Ok(( - input, - Class::Connection(Connection::StartOk { - client_properties, - mechanism, - response, - locale, - }), - )) - } - fn connection_secure(input: &[u8]) -> IResult { - let (input, _) = tag([20])(input)?; - let (input, challenge) = domain_longstr(input)?; - Ok((input, Class::Connection(Connection::Secure { challenge }))) - } - fn connection_secure_ok(input: &[u8]) -> IResult { - let (input, _) = tag([21])(input)?; - let (input, response) = domain_longstr(input)?; - if response.is_empty() { - fail!() - } - Ok((input, Class::Connection(Connection::SecureOk { response }))) - } - fn connection_tune(input: &[u8]) -> IResult { - let (input, _) = tag([30])(input)?; - let (input, channel_max) = domain_short(input)?; - let (input, frame_max) = domain_long(input)?; - let (input, heartbeat) = domain_short(input)?; - Ok(( - input, - Class::Connection(Connection::Tune { - channel_max, - frame_max, - heartbeat, - }), - )) - } - fn connection_tune_ok(input: &[u8]) -> IResult { - let (input, _) = tag([31])(input)?; - let (input, channel_max) = domain_short(input)?; - if channel_max == 0 { - fail!() - } - let (input, frame_max) = domain_long(input)?; - let (input, heartbeat) = domain_short(input)?; - Ok(( - input, - Class::Connection(Connection::TuneOk { - channel_max, - frame_max, - heartbeat, - }), - )) - } - fn connection_open(input: &[u8]) -> IResult { - let (input, _) = tag([40])(input)?; - let (input, virtual_host) = domain_path(input)?; - let (input, reserved_1) = domain_shortstr(input)?; - let (input, bits) = bit(input, 1)?; - let reserved_2 = bits[0]; - Ok(( - input, - Class::Connection(Connection::Open { - virtual_host, - reserved_1, - reserved_2, - }), - )) - } - fn connection_open_ok(input: &[u8]) -> IResult { - let (input, _) = tag([41])(input)?; - let (input, reserved_1) = domain_shortstr(input)?; - Ok((input, Class::Connection(Connection::OpenOk { reserved_1 }))) - } - fn connection_close(input: &[u8]) -> IResult { - let (input, _) = tag([50])(input)?; - let (input, reply_code) = domain_reply_code(input)?; - let (input, reply_text) = domain_reply_text(input)?; - let (input, class_id) = domain_class_id(input)?; - let (input, method_id) = domain_method_id(input)?; - Ok(( - input, - Class::Connection(Connection::Close { - reply_code, - reply_text, - class_id, - method_id, - }), - )) - } - fn connection_close_ok(input: &[u8]) -> IResult { - let (input, _) = tag([51])(input)?; - Ok((input, Class::Connection(Connection::CloseOk {}))) - } - fn connection_blocked(input: &[u8]) -> IResult { - let (input, _) = tag([60])(input)?; - let (input, reason) = domain_shortstr(input)?; - Ok((input, Class::Connection(Connection::Blocked { reason }))) - } - fn connection_unblocked(input: &[u8]) -> IResult { - let (input, _) = tag([61])(input)?; - Ok((input, Class::Connection(Connection::Unblocked {}))) - } - fn channel(input: &[u8]) -> IResult { - let (input, _) = tag([20])(input)?; - alt(( - channel_open, - channel_open_ok, - channel_flow, - channel_flow_ok, - channel_close, - channel_close_ok, - ))(input) - } - fn channel_open(input: &[u8]) -> IResult { - let (input, _) = tag([10])(input)?; - let (input, reserved_1) = domain_shortstr(input)?; - Ok((input, Class::Channel(Channel::Open { reserved_1 }))) - } - fn channel_open_ok(input: &[u8]) -> IResult { - let (input, _) = tag([11])(input)?; - let (input, reserved_1) = domain_longstr(input)?; - Ok((input, Class::Channel(Channel::OpenOk { reserved_1 }))) - } - fn channel_flow(input: &[u8]) -> IResult { - let (input, _) = tag([20])(input)?; - let (input, bits) = bit(input, 1)?; - let active = bits[0]; - Ok((input, Class::Channel(Channel::Flow { active }))) - } - fn channel_flow_ok(input: &[u8]) -> IResult { - let (input, _) = tag([21])(input)?; - let (input, bits) = bit(input, 1)?; - let active = bits[0]; - Ok((input, Class::Channel(Channel::FlowOk { active }))) - } - fn channel_close(input: &[u8]) -> IResult { - let (input, _) = tag([40])(input)?; - let (input, reply_code) = domain_reply_code(input)?; - let (input, reply_text) = domain_reply_text(input)?; - let (input, class_id) = domain_class_id(input)?; - let (input, method_id) = domain_method_id(input)?; - Ok(( - input, - Class::Channel(Channel::Close { - reply_code, - reply_text, - class_id, - method_id, - }), - )) - } - fn channel_close_ok(input: &[u8]) -> IResult { - let (input, _) = tag([41])(input)?; - Ok((input, Class::Channel(Channel::CloseOk {}))) - } - fn exchange(input: &[u8]) -> IResult { - let (input, _) = tag([40])(input)?; - alt(( - exchange_declare, - exchange_declare_ok, - exchange_delete, - exchange_delete_ok, - ))(input) - } - fn exchange_declare(input: &[u8]) -> IResult { - let (input, _) = tag([10])(input)?; - let (input, reserved_1) = domain_short(input)?; - let (input, exchange) = domain_exchange_name(input)?; - if exchange.is_empty() { - fail!() - } - let (input, r#type) = domain_shortstr(input)?; - let (input, bits) = bit(input, 5)?; - let passive = bits[0]; - let durable = bits[1]; - let reserved_2 = bits[2]; - let reserved_3 = bits[3]; - let no_wait = bits[4]; - let (input, arguments) = domain_table(input)?; - Ok(( - input, - Class::Exchange(Exchange::Declare { - reserved_1, - exchange, - r#type, - passive, - durable, - reserved_2, - reserved_3, - no_wait, - arguments, - }), - )) - } - fn exchange_declare_ok(input: &[u8]) -> IResult { - let (input, _) = tag([11])(input)?; - Ok((input, Class::Exchange(Exchange::DeclareOk {}))) - } - fn exchange_delete(input: &[u8]) -> IResult { - let (input, _) = tag([20])(input)?; - let (input, reserved_1) = domain_short(input)?; - let (input, exchange) = domain_exchange_name(input)?; - if exchange.is_empty() { - fail!() - } - let (input, bits) = bit(input, 2)?; - let if_unused = bits[0]; - let no_wait = bits[1]; - Ok(( - input, - Class::Exchange(Exchange::Delete { - reserved_1, - exchange, - if_unused, - no_wait, - }), - )) - } - fn exchange_delete_ok(input: &[u8]) -> IResult { - let (input, _) = tag([21])(input)?; - Ok((input, Class::Exchange(Exchange::DeleteOk {}))) - } - fn queue(input: &[u8]) -> IResult { - let (input, _) = tag([50])(input)?; - alt(( - queue_declare, - queue_declare_ok, - queue_bind, - queue_bind_ok, - queue_unbind, - queue_unbind_ok, - queue_purge, - queue_purge_ok, - queue_delete, - queue_delete_ok, - ))(input) - } - fn queue_declare(input: &[u8]) -> IResult { - let (input, _) = tag([10])(input)?; - let (input, reserved_1) = domain_short(input)?; - let (input, queue) = domain_queue_name(input)?; - let (input, bits) = bit(input, 5)?; - let passive = bits[0]; - let durable = bits[1]; - let exclusive = bits[2]; - let auto_delete = bits[3]; - let no_wait = bits[4]; - let (input, arguments) = domain_table(input)?; - Ok(( - input, - Class::Queue(Queue::Declare { - reserved_1, - queue, - passive, - durable, - exclusive, - auto_delete, - no_wait, - arguments, - }), - )) - } - fn queue_declare_ok(input: &[u8]) -> IResult { - let (input, _) = tag([11])(input)?; - let (input, queue) = domain_queue_name(input)?; - if queue.is_empty() { - fail!() - } - let (input, message_count) = domain_message_count(input)?; - let (input, consumer_count) = domain_long(input)?; - Ok(( - input, - Class::Queue(Queue::DeclareOk { - queue, - message_count, - consumer_count, - }), - )) - } - fn queue_bind(input: &[u8]) -> IResult { - let (input, _) = tag([20])(input)?; - let (input, reserved_1) = domain_short(input)?; - let (input, queue) = domain_queue_name(input)?; - let (input, exchange) = domain_exchange_name(input)?; - let (input, routing_key) = domain_shortstr(input)?; - let (input, bits) = bit(input, 1)?; - let no_wait = bits[0]; - let (input, arguments) = domain_table(input)?; - Ok(( - input, - Class::Queue(Queue::Bind { - reserved_1, - queue, - exchange, - routing_key, - no_wait, - arguments, - }), - )) - } - fn queue_bind_ok(input: &[u8]) -> IResult { - let (input, _) = tag([21])(input)?; - Ok((input, Class::Queue(Queue::BindOk {}))) - } - fn queue_unbind(input: &[u8]) -> IResult { - let (input, _) = tag([50])(input)?; - let (input, reserved_1) = domain_short(input)?; - let (input, queue) = domain_queue_name(input)?; - let (input, exchange) = domain_exchange_name(input)?; - let (input, routing_key) = domain_shortstr(input)?; - let (input, arguments) = domain_table(input)?; - Ok(( - input, - Class::Queue(Queue::Unbind { - reserved_1, - queue, - exchange, - routing_key, - arguments, - }), - )) - } - fn queue_unbind_ok(input: &[u8]) -> IResult { - let (input, _) = tag([51])(input)?; - Ok((input, Class::Queue(Queue::UnbindOk {}))) - } - fn queue_purge(input: &[u8]) -> IResult { - let (input, _) = tag([30])(input)?; - let (input, reserved_1) = domain_short(input)?; - let (input, queue) = domain_queue_name(input)?; - let (input, bits) = bit(input, 1)?; - let no_wait = bits[0]; - Ok(( - input, - Class::Queue(Queue::Purge { - reserved_1, - queue, - no_wait, - }), - )) - } - fn queue_purge_ok(input: &[u8]) -> IResult { - let (input, _) = tag([31])(input)?; - let (input, message_count) = domain_message_count(input)?; - Ok((input, Class::Queue(Queue::PurgeOk { message_count }))) - } - fn queue_delete(input: &[u8]) -> IResult { - let (input, _) = tag([40])(input)?; - let (input, reserved_1) = domain_short(input)?; - let (input, queue) = domain_queue_name(input)?; - let (input, bits) = bit(input, 3)?; - let if_unused = bits[0]; - let if_empty = bits[1]; - let no_wait = bits[2]; - Ok(( - input, - Class::Queue(Queue::Delete { - reserved_1, - queue, - if_unused, - if_empty, - no_wait, - }), - )) - } - fn queue_delete_ok(input: &[u8]) -> IResult { - let (input, _) = tag([41])(input)?; - let (input, message_count) = domain_message_count(input)?; - Ok((input, Class::Queue(Queue::DeleteOk { message_count }))) - } - fn basic(input: &[u8]) -> IResult { - let (input, _) = tag([60])(input)?; - alt(( - basic_qos, - basic_qos_ok, - basic_consume, - basic_consume_ok, - basic_cancel, - basic_cancel_ok, - basic_publish, - basic_return, - basic_deliver, - basic_get, - basic_get_ok, - basic_get_empty, - basic_ack, - basic_reject, - basic_recover_async, - basic_recover, - basic_recover_ok, - ))(input) - } - fn basic_qos(input: &[u8]) -> IResult { - let (input, _) = tag([10])(input)?; - let (input, prefetch_size) = domain_long(input)?; - let (input, prefetch_count) = domain_short(input)?; - let (input, bits) = bit(input, 1)?; - let global = bits[0]; - Ok(( - input, - Class::Basic(Basic::Qos { - prefetch_size, - prefetch_count, - global, - }), - )) - } - fn basic_qos_ok(input: &[u8]) -> IResult { - let (input, _) = tag([11])(input)?; - Ok((input, Class::Basic(Basic::QosOk {}))) - } - fn basic_consume(input: &[u8]) -> IResult { - let (input, _) = tag([20])(input)?; - let (input, reserved_1) = domain_short(input)?; - let (input, queue) = domain_queue_name(input)?; - let (input, consumer_tag) = domain_consumer_tag(input)?; - let (input, bits) = bit(input, 4)?; - let no_local = bits[0]; - let no_ack = bits[1]; - let exclusive = bits[2]; - let no_wait = bits[3]; - let (input, arguments) = domain_table(input)?; - Ok(( - input, - Class::Basic(Basic::Consume { - reserved_1, - queue, - consumer_tag, - no_local, - no_ack, - exclusive, - no_wait, - arguments, - }), - )) - } - fn basic_consume_ok(input: &[u8]) -> IResult { - let (input, _) = tag([21])(input)?; - let (input, consumer_tag) = domain_consumer_tag(input)?; - Ok((input, Class::Basic(Basic::ConsumeOk { consumer_tag }))) - } - fn basic_cancel(input: &[u8]) -> IResult { - let (input, _) = tag([30])(input)?; - let (input, consumer_tag) = domain_consumer_tag(input)?; - let (input, bits) = bit(input, 1)?; - let no_wait = bits[0]; - Ok(( - input, - Class::Basic(Basic::Cancel { - consumer_tag, - no_wait, - }), - )) - } - fn basic_cancel_ok(input: &[u8]) -> IResult { - let (input, _) = tag([31])(input)?; - let (input, consumer_tag) = domain_consumer_tag(input)?; - Ok((input, Class::Basic(Basic::CancelOk { consumer_tag }))) - } - fn basic_publish(input: &[u8]) -> IResult { - let (input, _) = tag([40])(input)?; - let (input, reserved_1) = domain_short(input)?; - let (input, exchange) = domain_exchange_name(input)?; - let (input, routing_key) = domain_shortstr(input)?; - let (input, bits) = bit(input, 2)?; - let mandatory = bits[0]; - let immediate = bits[1]; - Ok(( - input, - Class::Basic(Basic::Publish { - reserved_1, - exchange, - routing_key, - mandatory, - immediate, - }), - )) - } - fn basic_return(input: &[u8]) -> IResult { - let (input, _) = tag([50])(input)?; - let (input, reply_code) = domain_reply_code(input)?; - let (input, reply_text) = domain_reply_text(input)?; - let (input, exchange) = domain_exchange_name(input)?; - let (input, routing_key) = domain_shortstr(input)?; - Ok(( - input, - Class::Basic(Basic::Return { - reply_code, - reply_text, - exchange, - routing_key, - }), - )) - } - fn basic_deliver(input: &[u8]) -> IResult { - let (input, _) = tag([60])(input)?; - let (input, consumer_tag) = domain_consumer_tag(input)?; - let (input, delivery_tag) = domain_delivery_tag(input)?; - let (input, bits) = bit(input, 1)?; - let redelivered = bits[0]; - let (input, exchange) = domain_exchange_name(input)?; - let (input, routing_key) = domain_shortstr(input)?; - Ok(( - input, - Class::Basic(Basic::Deliver { - consumer_tag, - delivery_tag, - redelivered, - exchange, - routing_key, - }), - )) - } - fn basic_get(input: &[u8]) -> IResult { - let (input, _) = tag([70])(input)?; - let (input, reserved_1) = domain_short(input)?; - let (input, queue) = domain_queue_name(input)?; - let (input, bits) = bit(input, 1)?; - let no_ack = bits[0]; - Ok(( - input, - Class::Basic(Basic::Get { - reserved_1, - queue, - no_ack, - }), - )) - } - fn basic_get_ok(input: &[u8]) -> IResult { - let (input, _) = tag([71])(input)?; - let (input, delivery_tag) = domain_delivery_tag(input)?; - let (input, bits) = bit(input, 1)?; - let redelivered = bits[0]; - let (input, exchange) = domain_exchange_name(input)?; - let (input, routing_key) = domain_shortstr(input)?; - let (input, message_count) = domain_message_count(input)?; - Ok(( - input, - Class::Basic(Basic::GetOk { - delivery_tag, - redelivered, - exchange, - routing_key, - message_count, - }), - )) - } - fn basic_get_empty(input: &[u8]) -> IResult { - let (input, _) = tag([72])(input)?; - let (input, reserved_1) = domain_shortstr(input)?; - Ok((input, Class::Basic(Basic::GetEmpty { reserved_1 }))) - } - fn basic_ack(input: &[u8]) -> IResult { - let (input, _) = tag([80])(input)?; - let (input, delivery_tag) = domain_delivery_tag(input)?; - let (input, bits) = bit(input, 1)?; - let multiple = bits[0]; - Ok(( - input, - Class::Basic(Basic::Ack { - delivery_tag, - multiple, - }), - )) - } - fn basic_reject(input: &[u8]) -> IResult { - let (input, _) = tag([90])(input)?; - let (input, delivery_tag) = domain_delivery_tag(input)?; - let (input, bits) = bit(input, 1)?; - let requeue = bits[0]; - Ok(( - input, - Class::Basic(Basic::Reject { - delivery_tag, - requeue, - }), - )) - } - fn basic_recover_async(input: &[u8]) -> IResult { - let (input, _) = tag([100])(input)?; - let (input, bits) = bit(input, 1)?; - let requeue = bits[0]; - Ok((input, Class::Basic(Basic::RecoverAsync { requeue }))) - } - fn basic_recover(input: &[u8]) -> IResult { - let (input, _) = tag([110])(input)?; - let (input, bits) = bit(input, 1)?; - let requeue = bits[0]; - Ok((input, Class::Basic(Basic::Recover { requeue }))) - } - fn basic_recover_ok(input: &[u8]) -> IResult { - let (input, _) = tag([111])(input)?; - Ok((input, Class::Basic(Basic::RecoverOk {}))) - } - fn tx(input: &[u8]) -> IResult { - let (input, _) = tag([90])(input)?; - alt(( - tx_select, - tx_select_ok, - tx_commit, - tx_commit_ok, - tx_rollback, - tx_rollback_ok, - ))(input) - } - fn tx_select(input: &[u8]) -> IResult { - let (input, _) = tag([10])(input)?; - Ok((input, Class::Tx(Tx::Select {}))) - } - fn tx_select_ok(input: &[u8]) -> IResult { - let (input, _) = tag([11])(input)?; - Ok((input, Class::Tx(Tx::SelectOk {}))) - } - fn tx_commit(input: &[u8]) -> IResult { - let (input, _) = tag([20])(input)?; - Ok((input, Class::Tx(Tx::Commit {}))) - } - fn tx_commit_ok(input: &[u8]) -> IResult { - let (input, _) = tag([21])(input)?; - Ok((input, Class::Tx(Tx::CommitOk {}))) - } - fn tx_rollback(input: &[u8]) -> IResult { - let (input, _) = tag([30])(input)?; - Ok((input, Class::Tx(Tx::Rollback {}))) - } - fn tx_rollback_ok(input: &[u8]) -> IResult { - let (input, _) = tag([31])(input)?; - Ok((input, Class::Tx(Tx::RollbackOk {}))) - } } mod write { - use super::*; - use crate::classes::write_helper::*; - use crate::error::TransError; - use std::io::Write; +use super::*; +use crate::classes::write_helper::*; +use crate::error::TransError; +use std::io::Write; - pub fn write_method(class: Class, mut writer: W) -> Result<(), TransError> { - match class { - Class::Connection(Connection::Start { - version_major, - version_minor, - server_properties, - mechanisms, - locales, - }) => { - writer.write_all(&[10, 10])?; - octet(version_major, &mut writer)?; - octet(version_minor, &mut writer)?; - table(server_properties, &mut writer)?; - longstr(mechanisms, &mut writer)?; - longstr(locales, &mut writer)?; - } - Class::Connection(Connection::StartOk { - client_properties, - mechanism, - response, - locale, - }) => { - writer.write_all(&[10, 11])?; - table(client_properties, &mut writer)?; - shortstr(mechanism, &mut writer)?; - longstr(response, &mut writer)?; - shortstr(locale, &mut writer)?; - } - Class::Connection(Connection::Secure { challenge }) => { - writer.write_all(&[10, 20])?; - longstr(challenge, &mut writer)?; - } - Class::Connection(Connection::SecureOk { response }) => { - writer.write_all(&[10, 21])?; - longstr(response, &mut writer)?; - } - Class::Connection(Connection::Tune { - channel_max, - frame_max, - heartbeat, - }) => { - writer.write_all(&[10, 30])?; - short(channel_max, &mut writer)?; - long(frame_max, &mut writer)?; - short(heartbeat, &mut writer)?; - } - Class::Connection(Connection::TuneOk { - channel_max, - frame_max, - heartbeat, - }) => { - writer.write_all(&[10, 31])?; - short(channel_max, &mut writer)?; - long(frame_max, &mut writer)?; - short(heartbeat, &mut writer)?; - } - Class::Connection(Connection::Open { - virtual_host, - reserved_1, - reserved_2, - }) => { - writer.write_all(&[10, 40])?; - shortstr(virtual_host, &mut writer)?; - shortstr(reserved_1, &mut writer)?; - todo!(); - } - Class::Connection(Connection::OpenOk { reserved_1 }) => { - writer.write_all(&[10, 41])?; - shortstr(reserved_1, &mut writer)?; - } - Class::Connection(Connection::Close { - reply_code, - reply_text, - class_id, - method_id, - }) => { - writer.write_all(&[10, 50])?; - short(reply_code, &mut writer)?; - shortstr(reply_text, &mut writer)?; - short(class_id, &mut writer)?; - short(method_id, &mut writer)?; - } - Class::Connection(Connection::CloseOk {}) => { - writer.write_all(&[10, 51])?; - } - Class::Connection(Connection::Blocked { reason }) => { - writer.write_all(&[10, 60])?; - shortstr(reason, &mut writer)?; - } - Class::Connection(Connection::Unblocked {}) => { - writer.write_all(&[10, 61])?; - } - Class::Channel(Channel::Open { reserved_1 }) => { - writer.write_all(&[20, 10])?; - shortstr(reserved_1, &mut writer)?; - } - Class::Channel(Channel::OpenOk { reserved_1 }) => { - writer.write_all(&[20, 11])?; - longstr(reserved_1, &mut writer)?; - } - Class::Channel(Channel::Flow { active }) => { - writer.write_all(&[20, 20])?; - todo!(); - } - Class::Channel(Channel::FlowOk { active }) => { - writer.write_all(&[20, 21])?; - todo!(); - } - Class::Channel(Channel::Close { - reply_code, - reply_text, - class_id, - method_id, - }) => { - writer.write_all(&[20, 40])?; - short(reply_code, &mut writer)?; - shortstr(reply_text, &mut writer)?; - short(class_id, &mut writer)?; - short(method_id, &mut writer)?; - } - Class::Channel(Channel::CloseOk {}) => { - writer.write_all(&[20, 41])?; - } - Class::Exchange(Exchange::Declare { - reserved_1, - exchange, - r#type, - passive, - durable, - reserved_2, - reserved_3, - no_wait, - arguments, - }) => { - writer.write_all(&[40, 10])?; - short(reserved_1, &mut writer)?; - shortstr(exchange, &mut writer)?; - shortstr(r#type, &mut writer)?; - todo!(); - todo!(); - todo!(); - todo!(); - todo!(); - table(arguments, &mut writer)?; - } - Class::Exchange(Exchange::DeclareOk {}) => { - writer.write_all(&[40, 11])?; - } - Class::Exchange(Exchange::Delete { - reserved_1, - exchange, - if_unused, - no_wait, - }) => { - writer.write_all(&[40, 20])?; - short(reserved_1, &mut writer)?; - shortstr(exchange, &mut writer)?; - todo!(); - todo!(); - } - Class::Exchange(Exchange::DeleteOk {}) => { - writer.write_all(&[40, 21])?; - } - Class::Queue(Queue::Declare { - reserved_1, - queue, - passive, - durable, - exclusive, - auto_delete, - no_wait, - arguments, - }) => { - writer.write_all(&[50, 10])?; - short(reserved_1, &mut writer)?; - shortstr(queue, &mut writer)?; - todo!(); - todo!(); - todo!(); - todo!(); - todo!(); - table(arguments, &mut writer)?; - } - Class::Queue(Queue::DeclareOk { - queue, - message_count, - consumer_count, - }) => { - writer.write_all(&[50, 11])?; - shortstr(queue, &mut writer)?; - long(message_count, &mut writer)?; - long(consumer_count, &mut writer)?; - } - Class::Queue(Queue::Bind { - reserved_1, - queue, - exchange, - routing_key, - no_wait, - arguments, - }) => { - writer.write_all(&[50, 20])?; - short(reserved_1, &mut writer)?; - shortstr(queue, &mut writer)?; - shortstr(exchange, &mut writer)?; - shortstr(routing_key, &mut writer)?; - todo!(); - table(arguments, &mut writer)?; - } - Class::Queue(Queue::BindOk {}) => { - writer.write_all(&[50, 21])?; - } - Class::Queue(Queue::Unbind { - reserved_1, - queue, - exchange, - routing_key, - arguments, - }) => { - writer.write_all(&[50, 50])?; - short(reserved_1, &mut writer)?; - shortstr(queue, &mut writer)?; - shortstr(exchange, &mut writer)?; - shortstr(routing_key, &mut writer)?; - table(arguments, &mut writer)?; - } - Class::Queue(Queue::UnbindOk {}) => { - writer.write_all(&[50, 51])?; - } - Class::Queue(Queue::Purge { - reserved_1, - queue, - no_wait, - }) => { - writer.write_all(&[50, 30])?; - short(reserved_1, &mut writer)?; - shortstr(queue, &mut writer)?; - todo!(); - } - Class::Queue(Queue::PurgeOk { message_count }) => { - writer.write_all(&[50, 31])?; - long(message_count, &mut writer)?; - } - Class::Queue(Queue::Delete { - reserved_1, - queue, - if_unused, - if_empty, - no_wait, - }) => { - writer.write_all(&[50, 40])?; - short(reserved_1, &mut writer)?; - shortstr(queue, &mut writer)?; - todo!(); - todo!(); - todo!(); - } - Class::Queue(Queue::DeleteOk { message_count }) => { - writer.write_all(&[50, 41])?; - long(message_count, &mut writer)?; - } - Class::Basic(Basic::Qos { - prefetch_size, - prefetch_count, - global, - }) => { - writer.write_all(&[60, 10])?; - long(prefetch_size, &mut writer)?; - short(prefetch_count, &mut writer)?; - todo!(); - } - Class::Basic(Basic::QosOk {}) => { - writer.write_all(&[60, 11])?; - } - Class::Basic(Basic::Consume { - reserved_1, - queue, - consumer_tag, - no_local, - no_ack, - exclusive, - no_wait, - arguments, - }) => { - writer.write_all(&[60, 20])?; - short(reserved_1, &mut writer)?; - shortstr(queue, &mut writer)?; - shortstr(consumer_tag, &mut writer)?; - todo!(); - todo!(); - todo!(); - todo!(); - table(arguments, &mut writer)?; - } - Class::Basic(Basic::ConsumeOk { consumer_tag }) => { - writer.write_all(&[60, 21])?; - shortstr(consumer_tag, &mut writer)?; - } - Class::Basic(Basic::Cancel { - consumer_tag, - no_wait, - }) => { - writer.write_all(&[60, 30])?; - shortstr(consumer_tag, &mut writer)?; - todo!(); - } - Class::Basic(Basic::CancelOk { consumer_tag }) => { - writer.write_all(&[60, 31])?; - shortstr(consumer_tag, &mut writer)?; - } - Class::Basic(Basic::Publish { - reserved_1, - exchange, - routing_key, - mandatory, - immediate, - }) => { - writer.write_all(&[60, 40])?; - short(reserved_1, &mut writer)?; - shortstr(exchange, &mut writer)?; - shortstr(routing_key, &mut writer)?; - todo!(); - todo!(); - } - Class::Basic(Basic::Return { - reply_code, - reply_text, - exchange, - routing_key, - }) => { - writer.write_all(&[60, 50])?; - short(reply_code, &mut writer)?; - shortstr(reply_text, &mut writer)?; - shortstr(exchange, &mut writer)?; - shortstr(routing_key, &mut writer)?; - } - Class::Basic(Basic::Deliver { - consumer_tag, - delivery_tag, - redelivered, - exchange, - routing_key, - }) => { - writer.write_all(&[60, 60])?; - shortstr(consumer_tag, &mut writer)?; - longlong(delivery_tag, &mut writer)?; - todo!(); - shortstr(exchange, &mut writer)?; - shortstr(routing_key, &mut writer)?; - } - Class::Basic(Basic::Get { - reserved_1, - queue, - no_ack, - }) => { - writer.write_all(&[60, 70])?; - short(reserved_1, &mut writer)?; - shortstr(queue, &mut writer)?; - todo!(); - } - Class::Basic(Basic::GetOk { - delivery_tag, - redelivered, - exchange, - routing_key, - message_count, - }) => { - writer.write_all(&[60, 71])?; - longlong(delivery_tag, &mut writer)?; - todo!(); - shortstr(exchange, &mut writer)?; - shortstr(routing_key, &mut writer)?; - long(message_count, &mut writer)?; - } - Class::Basic(Basic::GetEmpty { reserved_1 }) => { - writer.write_all(&[60, 72])?; - shortstr(reserved_1, &mut writer)?; - } - Class::Basic(Basic::Ack { - delivery_tag, - multiple, - }) => { - writer.write_all(&[60, 80])?; - longlong(delivery_tag, &mut writer)?; - todo!(); - } - Class::Basic(Basic::Reject { - delivery_tag, - requeue, - }) => { - writer.write_all(&[60, 90])?; - longlong(delivery_tag, &mut writer)?; - todo!(); - } - Class::Basic(Basic::RecoverAsync { requeue }) => { - writer.write_all(&[60, 100])?; - todo!(); - } - Class::Basic(Basic::Recover { requeue }) => { - writer.write_all(&[60, 110])?; - todo!(); - } - Class::Basic(Basic::RecoverOk {}) => { - writer.write_all(&[60, 111])?; - } - Class::Tx(Tx::Select {}) => { - writer.write_all(&[90, 10])?; - } - Class::Tx(Tx::SelectOk {}) => { - writer.write_all(&[90, 11])?; - } - Class::Tx(Tx::Commit {}) => { - writer.write_all(&[90, 20])?; - } - Class::Tx(Tx::CommitOk {}) => { - writer.write_all(&[90, 21])?; - } - Class::Tx(Tx::Rollback {}) => { - writer.write_all(&[90, 30])?; - } - Class::Tx(Tx::RollbackOk {}) => { - writer.write_all(&[90, 31])?; - } +pub fn write_method(class: Class, mut writer: W) -> Result<(), TransError> { + match class { + Class::Connection(Connection::Start { + version_major, + version_minor, + server_properties, + mechanisms, + locales, + }) => { + writer.write_all(&[10, 10])?; + octet(version_major, &mut writer)?; + octet(version_minor, &mut writer)?; + table(server_properties, &mut writer)?; + longstr(mechanisms, &mut writer)?; + longstr(locales, &mut writer)?; + } + Class::Connection(Connection::StartOk { + client_properties, + mechanism, + response, + locale, + }) => { + writer.write_all(&[10, 11])?; + table(client_properties, &mut writer)?; + shortstr(mechanism, &mut writer)?; + longstr(response, &mut writer)?; + shortstr(locale, &mut writer)?; + } + Class::Connection(Connection::Secure { + challenge, + }) => { + writer.write_all(&[10, 20])?; + longstr(challenge, &mut writer)?; + } + Class::Connection(Connection::SecureOk { + response, + }) => { + writer.write_all(&[10, 21])?; + longstr(response, &mut writer)?; + } + Class::Connection(Connection::Tune { + channel_max, + frame_max, + heartbeat, + }) => { + writer.write_all(&[10, 30])?; + short(channel_max, &mut writer)?; + long(frame_max, &mut writer)?; + short(heartbeat, &mut writer)?; + } + Class::Connection(Connection::TuneOk { + channel_max, + frame_max, + heartbeat, + }) => { + writer.write_all(&[10, 31])?; + short(channel_max, &mut writer)?; + long(frame_max, &mut writer)?; + short(heartbeat, &mut writer)?; + } + Class::Connection(Connection::Open { + virtual_host, + reserved_1, + reserved_2, + }) => { + writer.write_all(&[10, 40])?; + shortstr(virtual_host, &mut writer)?; + shortstr(reserved_1, &mut writer)?; + todo!(); + } + Class::Connection(Connection::OpenOk { + reserved_1, + }) => { + writer.write_all(&[10, 41])?; + shortstr(reserved_1, &mut writer)?; + } + Class::Connection(Connection::Close { + reply_code, + reply_text, + class_id, + method_id, + }) => { + writer.write_all(&[10, 50])?; + short(reply_code, &mut writer)?; + shortstr(reply_text, &mut writer)?; + short(class_id, &mut writer)?; + short(method_id, &mut writer)?; + } + Class::Connection(Connection::CloseOk { + }) => { + writer.write_all(&[10, 51])?; + } + Class::Connection(Connection::Blocked { + reason, + }) => { + writer.write_all(&[10, 60])?; + shortstr(reason, &mut writer)?; + } + Class::Connection(Connection::Unblocked { + }) => { + writer.write_all(&[10, 61])?; + } + Class::Channel(Channel::Open { + reserved_1, + }) => { + writer.write_all(&[20, 10])?; + shortstr(reserved_1, &mut writer)?; + } + Class::Channel(Channel::OpenOk { + reserved_1, + }) => { + writer.write_all(&[20, 11])?; + longstr(reserved_1, &mut writer)?; + } + Class::Channel(Channel::Flow { + active, + }) => { + writer.write_all(&[20, 20])?; + todo!(); + } + Class::Channel(Channel::FlowOk { + active, + }) => { + writer.write_all(&[20, 21])?; + todo!(); + } + Class::Channel(Channel::Close { + reply_code, + reply_text, + class_id, + method_id, + }) => { + writer.write_all(&[20, 40])?; + short(reply_code, &mut writer)?; + shortstr(reply_text, &mut writer)?; + short(class_id, &mut writer)?; + short(method_id, &mut writer)?; + } + Class::Channel(Channel::CloseOk { + }) => { + writer.write_all(&[20, 41])?; + } + Class::Exchange(Exchange::Declare { + reserved_1, + exchange, + r#type, + passive, + durable, + reserved_2, + reserved_3, + no_wait, + arguments, + }) => { + writer.write_all(&[40, 10])?; + short(reserved_1, &mut writer)?; + shortstr(exchange, &mut writer)?; + shortstr(r#type, &mut writer)?; + todo!(); + todo!(); + todo!(); + todo!(); + todo!(); + table(arguments, &mut writer)?; + } + Class::Exchange(Exchange::DeclareOk { + }) => { + writer.write_all(&[40, 11])?; + } + Class::Exchange(Exchange::Delete { + reserved_1, + exchange, + if_unused, + no_wait, + }) => { + writer.write_all(&[40, 20])?; + short(reserved_1, &mut writer)?; + shortstr(exchange, &mut writer)?; + todo!(); + todo!(); + } + Class::Exchange(Exchange::DeleteOk { + }) => { + writer.write_all(&[40, 21])?; + } + Class::Queue(Queue::Declare { + reserved_1, + queue, + passive, + durable, + exclusive, + auto_delete, + no_wait, + arguments, + }) => { + writer.write_all(&[50, 10])?; + short(reserved_1, &mut writer)?; + shortstr(queue, &mut writer)?; + todo!(); + todo!(); + todo!(); + todo!(); + todo!(); + table(arguments, &mut writer)?; + } + Class::Queue(Queue::DeclareOk { + queue, + message_count, + consumer_count, + }) => { + writer.write_all(&[50, 11])?; + shortstr(queue, &mut writer)?; + long(message_count, &mut writer)?; + long(consumer_count, &mut writer)?; + } + Class::Queue(Queue::Bind { + reserved_1, + queue, + exchange, + routing_key, + no_wait, + arguments, + }) => { + writer.write_all(&[50, 20])?; + short(reserved_1, &mut writer)?; + shortstr(queue, &mut writer)?; + shortstr(exchange, &mut writer)?; + shortstr(routing_key, &mut writer)?; + todo!(); + table(arguments, &mut writer)?; + } + Class::Queue(Queue::BindOk { + }) => { + writer.write_all(&[50, 21])?; + } + Class::Queue(Queue::Unbind { + reserved_1, + queue, + exchange, + routing_key, + arguments, + }) => { + writer.write_all(&[50, 50])?; + short(reserved_1, &mut writer)?; + shortstr(queue, &mut writer)?; + shortstr(exchange, &mut writer)?; + shortstr(routing_key, &mut writer)?; + table(arguments, &mut writer)?; + } + Class::Queue(Queue::UnbindOk { + }) => { + writer.write_all(&[50, 51])?; + } + Class::Queue(Queue::Purge { + reserved_1, + queue, + no_wait, + }) => { + writer.write_all(&[50, 30])?; + short(reserved_1, &mut writer)?; + shortstr(queue, &mut writer)?; + todo!(); + } + Class::Queue(Queue::PurgeOk { + message_count, + }) => { + writer.write_all(&[50, 31])?; + long(message_count, &mut writer)?; + } + Class::Queue(Queue::Delete { + reserved_1, + queue, + if_unused, + if_empty, + no_wait, + }) => { + writer.write_all(&[50, 40])?; + short(reserved_1, &mut writer)?; + shortstr(queue, &mut writer)?; + todo!(); + todo!(); + todo!(); + } + Class::Queue(Queue::DeleteOk { + message_count, + }) => { + writer.write_all(&[50, 41])?; + long(message_count, &mut writer)?; + } + Class::Basic(Basic::Qos { + prefetch_size, + prefetch_count, + global, + }) => { + writer.write_all(&[60, 10])?; + long(prefetch_size, &mut writer)?; + short(prefetch_count, &mut writer)?; + todo!(); + } + Class::Basic(Basic::QosOk { + }) => { + writer.write_all(&[60, 11])?; + } + Class::Basic(Basic::Consume { + reserved_1, + queue, + consumer_tag, + no_local, + no_ack, + exclusive, + no_wait, + arguments, + }) => { + writer.write_all(&[60, 20])?; + short(reserved_1, &mut writer)?; + shortstr(queue, &mut writer)?; + shortstr(consumer_tag, &mut writer)?; + todo!(); + todo!(); + todo!(); + todo!(); + table(arguments, &mut writer)?; + } + Class::Basic(Basic::ConsumeOk { + consumer_tag, + }) => { + writer.write_all(&[60, 21])?; + shortstr(consumer_tag, &mut writer)?; + } + Class::Basic(Basic::Cancel { + consumer_tag, + no_wait, + }) => { + writer.write_all(&[60, 30])?; + shortstr(consumer_tag, &mut writer)?; + todo!(); + } + Class::Basic(Basic::CancelOk { + consumer_tag, + }) => { + writer.write_all(&[60, 31])?; + shortstr(consumer_tag, &mut writer)?; + } + Class::Basic(Basic::Publish { + reserved_1, + exchange, + routing_key, + mandatory, + immediate, + }) => { + writer.write_all(&[60, 40])?; + short(reserved_1, &mut writer)?; + shortstr(exchange, &mut writer)?; + shortstr(routing_key, &mut writer)?; + todo!(); + todo!(); + } + Class::Basic(Basic::Return { + reply_code, + reply_text, + exchange, + routing_key, + }) => { + writer.write_all(&[60, 50])?; + short(reply_code, &mut writer)?; + shortstr(reply_text, &mut writer)?; + shortstr(exchange, &mut writer)?; + shortstr(routing_key, &mut writer)?; + } + Class::Basic(Basic::Deliver { + consumer_tag, + delivery_tag, + redelivered, + exchange, + routing_key, + }) => { + writer.write_all(&[60, 60])?; + shortstr(consumer_tag, &mut writer)?; + longlong(delivery_tag, &mut writer)?; + todo!(); + shortstr(exchange, &mut writer)?; + shortstr(routing_key, &mut writer)?; + } + Class::Basic(Basic::Get { + reserved_1, + queue, + no_ack, + }) => { + writer.write_all(&[60, 70])?; + short(reserved_1, &mut writer)?; + shortstr(queue, &mut writer)?; + todo!(); + } + Class::Basic(Basic::GetOk { + delivery_tag, + redelivered, + exchange, + routing_key, + message_count, + }) => { + writer.write_all(&[60, 71])?; + longlong(delivery_tag, &mut writer)?; + todo!(); + shortstr(exchange, &mut writer)?; + shortstr(routing_key, &mut writer)?; + long(message_count, &mut writer)?; + } + Class::Basic(Basic::GetEmpty { + reserved_1, + }) => { + writer.write_all(&[60, 72])?; + shortstr(reserved_1, &mut writer)?; + } + Class::Basic(Basic::Ack { + delivery_tag, + multiple, + }) => { + writer.write_all(&[60, 80])?; + longlong(delivery_tag, &mut writer)?; + todo!(); + } + Class::Basic(Basic::Reject { + delivery_tag, + requeue, + }) => { + writer.write_all(&[60, 90])?; + longlong(delivery_tag, &mut writer)?; + todo!(); + } + Class::Basic(Basic::RecoverAsync { + requeue, + }) => { + writer.write_all(&[60, 100])?; + todo!(); + } + Class::Basic(Basic::Recover { + requeue, + }) => { + writer.write_all(&[60, 110])?; + todo!(); + } + Class::Basic(Basic::RecoverOk { + }) => { + writer.write_all(&[60, 111])?; + } + Class::Tx(Tx::Select { + }) => { + writer.write_all(&[90, 10])?; + } + Class::Tx(Tx::SelectOk { + }) => { + writer.write_all(&[90, 11])?; + } + Class::Tx(Tx::Commit { + }) => { + writer.write_all(&[90, 20])?; + } + Class::Tx(Tx::CommitOk { + }) => { + writer.write_all(&[90, 21])?; + } + Class::Tx(Tx::Rollback { + }) => { + writer.write_all(&[90, 30])?; + } + Class::Tx(Tx::RollbackOk { + }) => { + writer.write_all(&[90, 31])?; } - Ok(()) } + Ok(()) +} } diff --git a/amqp_transport/src/classes/mod.rs b/amqp_transport/src/classes/mod.rs index 35cf5f2..f132d0e 100644 --- a/amqp_transport/src/classes/mod.rs +++ b/amqp_transport/src/classes/mod.rs @@ -6,7 +6,9 @@ mod generated; mod parse_helper; mod write_helper; -pub type Table = HashMap; +pub type TableFieldName = String; + +pub type Table = HashMap; #[derive(Debug, Clone, PartialEq)] pub enum FieldValue { @@ -22,8 +24,8 @@ pub enum FieldValue { Float(f32), Double(f64), DecimalValue(u8, u32), - ShortString(String), - LongString(String), + ShortString(Shortstr), + LongString(Longstr), FieldArray(Vec), Timestamp(u64), FieldTable(Table), diff --git a/amqp_transport/src/classes/write_helper.rs b/amqp_transport/src/classes/write_helper.rs index 19d6eb5..8190ebd 100644 --- a/amqp_transport/src/classes/write_helper.rs +++ b/amqp_transport/src/classes/write_helper.rs @@ -1,47 +1,193 @@ use crate::classes::generated::{ Bit, Long, Longlong, Longstr, Octet, Short, Shortstr, Table, Timestamp, }; +use crate::classes::{FieldValue, TableFieldName}; use crate::error::TransError; +use anyhow::Context; use std::io; use std::io::Write; -fn error(e: io::Error) -> TransError { - TransError::Other(e.into()) -} - pub fn octet(value: Octet, writer: &mut W) -> Result<(), TransError> { writer.write_all(&[value])?; Ok(()) } pub fn short(value: Short, writer: &mut W) -> Result<(), TransError> { - todo!() + writer.write_all(&value.to_be_bytes())?; + Ok(()) } pub fn long(value: Long, writer: &mut W) -> Result<(), TransError> { - todo!() + writer.write_all(&value.to_be_bytes())?; + Ok(()) } pub fn longlong(value: Longlong, writer: &mut W) -> Result<(), TransError> { - todo!() + writer.write_all(&value.to_be_bytes())?; + Ok(()) } pub fn bit(value: Vec, writer: &mut W) -> Result<(), TransError> { - todo!() + // accumulate bits into bytes, starting from the least significant bit in each byte + + // how many bits have already been packed into `current_buf` + let mut already_filled = 0; + let mut current_buf = 0u8; + + for bit in value { + if already_filled >= 8 { + writer.write_all(&[current_buf])?; + current_buf = 0; + already_filled = 0; + } + + let new_bit = (u8::from(bit)) << already_filled; + current_buf |= new_bit; + already_filled += 1; + } + + if already_filled > 0 { + writer.write_all(&[current_buf])?; + } + + Ok(()) } pub fn shortstr(value: Shortstr, writer: &mut W) -> Result<(), TransError> { - todo!() + let len = u8::try_from(value.len()).context("shortstr too long")?; + writer.write_all(&[len])?; + writer.write_all(value.as_bytes())?; + + Ok(()) } pub fn longstr(value: Longstr, writer: &mut W) -> Result<(), TransError> { - todo!() + let len = u32::try_from(value.len()).context("longstr too long")?; + writer.write_all(&len.to_be_bytes())?; + writer.write_all(value.as_slice())?; + + Ok(()) } pub fn timestamp(value: Timestamp, writer: &mut W) -> Result<(), TransError> { - todo!() + writer.write_all(&value.to_be_bytes())?; + Ok(()) } -pub fn table(value: Table, writer: &mut W) -> Result<(), TransError> { - todo!() +pub fn table(table: Table, writer: &mut W) -> Result<(), TransError> { + let len = u32::try_from(table.len()).context("table too big")?; + writer.write_all(&len.to_be_bytes())?; + + for (field_name, value) in table { + shortstr(field_name, writer)?; + field_value(value, writer)?; + } + + Ok(()) +} + +fn field_value(value: FieldValue, writer: &mut W) -> Result<(), TransError> { + match value { + FieldValue::Boolean(bool) => { + writer.write_all(&[b't', u8::from(bool)])?; + } + FieldValue::ShortShortInt(int) => { + writer.write_all(b"b")?; + writer.write_all(&int.to_be_bytes())?; + } + FieldValue::ShortShortUInt(int) => { + writer.write_all(&[b'B', int])?; + } + FieldValue::ShortInt(int) => { + writer.write_all(b"U")?; + writer.write_all(&int.to_be_bytes())?; + } + FieldValue::ShortUInt(int) => { + writer.write_all(b"u")?; + writer.write_all(&int.to_be_bytes())?; + } + FieldValue::LongInt(int) => { + writer.write_all(b"I")?; + writer.write_all(&int.to_be_bytes())?; + } + FieldValue::LongUInt(int) => { + writer.write_all(b"i")?; + writer.write_all(&int.to_be_bytes())?; + } + FieldValue::LongLongInt(int) => { + writer.write_all(b"L")?; + writer.write_all(&int.to_be_bytes())?; + } + FieldValue::LongLongUInt(int) => { + writer.write_all(b"l")?; + writer.write_all(&int.to_be_bytes())?; + } + FieldValue::Float(float) => { + writer.write_all(b"f")?; + writer.write_all(&float.to_be_bytes())?; + } + FieldValue::Double(float) => { + writer.write_all(b"d")?; + writer.write_all(&float.to_be_bytes())?; + } + FieldValue::DecimalValue(scale, long) => { + writer.write_all(&[b'D', scale])?; + writer.write_all(&long.to_be_bytes())?; + } + FieldValue::ShortString(str) => { + writer.write_all(b"s")?; + shortstr(str, writer)?; + } + FieldValue::LongString(str) => { + writer.write_all(b"S")?; + longstr(str, writer)?; + } + FieldValue::FieldArray(array) => { + writer.write_all(b"A")?; + let len = u32::try_from(array.len()).context("array too long")?; + writer.write_all(&len.to_be_bytes())?; + + for element in array { + field_value(element, writer)?; + } + } + FieldValue::Timestamp(time) => { + writer.write_all(b"T")?; + writer.write_all(&time.to_be_bytes())?; + } + FieldValue::FieldTable(_) => { + writer.write_all(b"F")?; + } + FieldValue::Void => { + writer.write_all(b"V")?; + } + } + Ok(()) +} + +#[cfg(test)] +mod tests { + #[test] + fn pack_few_bits() { + let bits = vec![true, false, true]; + + let mut buffer = [0u8; 1]; + super::bit(bits, &mut buffer.as_mut_slice()).unwrap(); + + assert_eq!(buffer, [0b00000101]) + } + + #[test] + fn pack_many_bits() { + let bits = vec![ + /* first 8 */ + true, true, true, true, false, false, false, false, /* second 4 */ + true, false, true, true, + ]; + + let mut buffer = [0u8; 2]; + super::bit(bits, &mut buffer.as_mut_slice()).unwrap(); + + assert_eq!(buffer, [0b00001111, 0b00001101]); + } }