diff --git a/amqp_core/src/lib.rs b/amqp_core/src/lib.rs index 0a5f487..42ecdd0 100644 --- a/amqp_core/src/lib.rs +++ b/amqp_core/src/lib.rs @@ -4,6 +4,8 @@ use std::net::SocketAddr; use std::sync::Arc; use uuid::Uuid; +type Handle = Arc>; + #[derive(Debug, Clone)] pub struct GlobalData { inner: Arc>, @@ -14,6 +16,7 @@ impl Default for GlobalData { Self { inner: Arc::new(Mutex::new(GlobalDataInner { connections: HashMap::new(), + channels: HashMap::new(), })), } } @@ -28,15 +31,17 @@ impl GlobalData { #[derive(Debug)] pub struct GlobalDataInner { pub connections: HashMap, + pub channels: HashMap, } -pub type ConnectionHandle = Arc>; +pub type ConnectionHandle = Handle; #[derive(Debug)] pub struct Connection { pub id: Uuid, pub peer_addr: SocketAddr, pub global_data: GlobalData, + pub channels: HashMap, } impl Connection { @@ -49,6 +54,7 @@ impl Connection { id, peer_addr, global_data, + channels: HashMap::new(), })) } @@ -57,3 +63,34 @@ impl Connection { global_data.connections.remove(&self.id); } } + +pub type ChannelHandle = Handle; + +#[derive(Debug)] +pub struct Channel { + pub id: Uuid, + pub num: u16, + pub connection: ConnectionHandle, + pub global_data: GlobalData, +} + +impl Channel { + pub fn new_handle( + id: Uuid, + num: u16, + connection: ConnectionHandle, + global_data: GlobalData, + ) -> ChannelHandle { + Arc::new(Mutex::new(Self { + id, + num, + connection, + global_data, + })) + } + + pub fn close(&self) { + let mut global_data = self.global_data.lock(); + global_data.channels.remove(&self.id); + } +} diff --git a/amqp_dashboard/assets/.prettierrc.json b/amqp_dashboard/assets/.prettierrc.json new file mode 100644 index 0000000..544138b --- /dev/null +++ b/amqp_dashboard/assets/.prettierrc.json @@ -0,0 +1,3 @@ +{ + "singleQuote": true +} diff --git a/amqp_dashboard/assets/index.html b/amqp_dashboard/assets/index.html index 17f3185..1b3a744 100644 --- a/amqp_dashboard/assets/index.html +++ b/amqp_dashboard/assets/index.html @@ -1,19 +1,20 @@ - + - - - - + + + + AMQP Data - - - + + + +

AMQP Data

+

Connections

+
-

AMQP Data

-

Connections

-
-
- - - - \ No newline at end of file + + + diff --git a/amqp_dashboard/assets/script.js b/amqp_dashboard/assets/script.js index 7485f2c..2a7f630 100644 --- a/amqp_dashboard/assets/script.js +++ b/amqp_dashboard/assets/script.js @@ -1,41 +1,48 @@ const renderTable = (colNames, rows) => { - const table = document.createElement("table"); + const table = document.createElement('table'); - const headerRow = document.createElement("tr"); + const headerRow = document.createElement('tr'); - colNames.forEach((name) => { - const th = document.createElement("th"); - th.innerText = name; + colNames.forEach((name) => { + const th = document.createElement('th'); + th.innerText = name; - headerRow.append(th); + headerRow.append(th); + }); + table.append(headerRow); + + rows.forEach((row) => { + const contentRow = document.createElement('tr'); + row.forEach((cell) => { + const td = document.createElement('td'); + td.innerText = cell; + contentRow.append(td); }); - table.append(headerRow); + table.append(contentRow); + }); - rows.forEach((row) => { - const contentRow = document.createElement("tr"); - row.forEach((cell) => { - const td = document.createElement("td"); - td.innerText = cell; - contentRow.append(td); - }); - table.append(contentRow); - }) - - return table; -} + return table; +}; const renderConnections = (connections) => { - const wrapper = document.getElementById("connection-wrapper"); + const wrapper = document.getElementById('connection-wrapper'); - const table = renderTable(['Connection ID', 'Client Address'], connections.map((conn) => - [conn.id, conn.peer_addr])); - wrapper.replaceChildren(table) -} + const table = renderTable( + ['Connection ID', 'Client Address', 'Channels'], + connections.map((conn) => { + const channels = conn.channels + .map((chan) => `${chan.number} - ${chan.id}`) + .join('\n'); + return [conn.id, conn.peer_addr, channels]; + }) + ); + wrapper.replaceChildren(table); +}; const refresh = async () => { - const fetched = await fetch('http://localhost:3000/api/data'); - const data = await fetched.json(); - renderConnections(data.connections); -} + const fetched = await fetch('http://localhost:3000/api/data'); + const data = await fetched.json(); + renderConnections(data.connections); +}; setInterval(refresh, 1000); diff --git a/amqp_dashboard/assets/style.css b/amqp_dashboard/assets/style.css index 53fa93b..ba9150d 100644 --- a/amqp_dashboard/assets/style.css +++ b/amqp_dashboard/assets/style.css @@ -1,10 +1,12 @@ html { - font-family: arial, sans-serif; - margin: 10px; + font-family: arial, sans-serif; + margin: 10px; } -table, th, td { - border: 1px solid black; - border-collapse: collapse; - padding: 10px; -} \ No newline at end of file +table, +th, +td { + border: 1px solid black; + border-collapse: collapse; + padding: 10px; +} diff --git a/amqp_dashboard/src/lib.rs b/amqp_dashboard/src/lib.rs index 6ef3867..9e3282d 100644 --- a/amqp_dashboard/src/lib.rs +++ b/amqp_dashboard/src/lib.rs @@ -55,6 +55,13 @@ struct Data { struct Connection { id: String, peer_addr: String, + channels: Vec, +} + +#[derive(Serialize)] +struct Channel { + id: String, + number: u16, } async fn get_data(global_data: GlobalData) -> impl IntoResponse { @@ -68,6 +75,17 @@ async fn get_data(global_data: GlobalData) -> impl IntoResponse { Connection { id: conn.id.to_string(), peer_addr: conn.peer_addr.to_string(), + channels: conn + .channels + .values() + .map(|chan| { + let chan = chan.lock(); + Channel { + id: chan.id.to_string(), + number: chan.num, + } + }) + .collect(), } }) .collect(); diff --git a/amqp_transport/src/classes/generated.rs b/amqp_transport/src/classes/generated.rs index 0a0daae..93b6ee3 100644 --- a/amqp_transport/src/classes/generated.rs +++ b/amqp_transport/src/classes/generated.rs @@ -238,9 +238,7 @@ pub enum Connection { reserved_2: Bit, }, /// This method signals to the client that the connection is ready for use. - OpenOk { - reserved_1: Shortstr, - }, + OpenOk { reserved_1: Shortstr }, /// This method indicates that the sender wants to close the connection. This may be /// due to internal conditions (e.g. a forced shut-down) or due to an error handling /// a specific method, i.e. an exception. When a close is due to an exception, the @@ -263,13 +261,9 @@ pub enum Connection { #[derive(Debug, Clone, PartialEq)] pub enum Channel { /// This method opens a channel to the server. - Open { - reserved_1: Shortstr, - }, + Open { reserved_1: Shortstr }, /// This method signals to the client that the channel is ready for use. - OpenOk { - reserved_1: Longstr, - }, + OpenOk { reserved_1: Longstr }, /// This method asks the peer to pause or restart the flow of content data sent by /// a consumer. This is a simple flow-control mechanism that a peer can use to avoid /// overflowing its queues or otherwise finding itself receiving more messages than @@ -545,9 +539,7 @@ pub enum Basic { no_wait: NoWait, }, /// This method confirms that the cancellation was completed. - CancelOk { - consumer_tag: ConsumerTag, - }, + CancelOk { consumer_tag: ConsumerTag }, /// This method publishes a message to a specific exchange. The message will be routed /// to queues as defined by the exchange configuration and distributed to any active /// consumers when the transaction, if any, is committed. @@ -621,9 +613,7 @@ pub enum Basic { }, /// This method tells the client that the queue has no messages available for the /// client. - GetEmpty { - reserved_1: Shortstr, - }, + GetEmpty { reserved_1: Shortstr }, /// This method acknowledges one or more messages delivered via the Deliver or Get-Ok /// methods. The client can ask to confirm a single message or a set of messages up to /// and including a specific message. @@ -698,1402 +688,1711 @@ pub enum Tx { RollbackOk, } pub mod parse { -use super::*; -use crate::classes::parse_helper::*; -use crate::error::TransError; -use nom::{branch::alt, bytes::complete::tag}; -use regex::Regex; -use once_cell::sync::Lazy; + use super::*; + use crate::classes::parse_helper::*; + use crate::error::TransError; + use nom::{branch::alt, bytes::complete::tag}; + use once_cell::sync::Lazy; + use regex::Regex; -pub type IResult<'a, T> = nom::IResult<&'a [u8], T, TransError>; - -pub fn parse_method(input: &[u8]) -> Result<(&[u8], Class), nom::Err> { - alt((connection, channel, exchange, queue, basic, tx))(input) -} -fn domain_class_id(input: &[u8]) -> IResult { - short(input) -} -fn domain_consumer_tag(input: &[u8]) -> IResult { - shortstr(input) -} -fn domain_delivery_tag(input: &[u8]) -> IResult { - longlong(input) -} -fn domain_exchange_name(input: &[u8]) -> IResult { - let (input, result) = shortstr(input)?; - if result.len() > 127 { fail!("value is shorter than 127 for field result") } - static REGEX: Lazy = Lazy::new(|| Regex::new(r"^[a-zA-Z0-9-_.:]*$").unwrap()); - if !REGEX.is_match(&result) { fail!(r"regex `^[a-zA-Z0-9-_.:]*$` did not match value for field result") } - Ok((input, result)) -} -fn domain_method_id(input: &[u8]) -> IResult { - short(input) -} -fn domain_path(input: &[u8]) -> IResult { - let (input, result) = shortstr(input)?; - if result.is_empty() { fail!("string was null for field result") } - if result.len() > 127 { fail!("value is shorter than 127 for field result") } - Ok((input, result)) -} -fn domain_peer_properties(input: &[u8]) -> IResult { - table(input) -} -fn domain_queue_name(input: &[u8]) -> IResult { - let (input, result) = shortstr(input)?; - if result.len() > 127 { fail!("value is shorter than 127 for field result") } - static REGEX: Lazy = Lazy::new(|| Regex::new(r"^[a-zA-Z0-9-_.:]*$").unwrap()); - if !REGEX.is_match(&result) { fail!(r"regex `^[a-zA-Z0-9-_.:]*$` did not match value for field result") } - Ok((input, result)) -} -fn domain_message_count(input: &[u8]) -> IResult { - long(input) -} -fn domain_reply_code(input: &[u8]) -> IResult { - let (input, result) = short(input)?; - if result == 0 { fail!("number was 0 for field result") } - Ok((input, result)) -} -fn domain_reply_text(input: &[u8]) -> IResult { - let (input, result) = shortstr(input)?; - if result.is_empty() { fail!("string was null for field result") } - Ok((input, result)) -} -fn domain_octet(input: &[u8]) -> IResult { - octet(input) -} -fn domain_short(input: &[u8]) -> IResult { - short(input) -} -fn domain_long(input: &[u8]) -> IResult { - long(input) -} -fn domain_longlong(input: &[u8]) -> IResult { - longlong(input) -} -fn domain_shortstr(input: &[u8]) -> IResult { - shortstr(input) -} -fn domain_longstr(input: &[u8]) -> IResult { - longstr(input) -} -fn domain_timestamp(input: &[u8]) -> IResult { - timestamp(input) -} -fn domain_table(input: &[u8]) -> IResult { - table(input) -} -fn connection(input: &[u8]) -> IResult { - let (input, _) = tag(10_u16.to_be_bytes())(input).map_err(err("invalid tag for class connection"))?; - alt((connection_start, connection_start_ok, connection_secure, connection_secure_ok, connection_tune, connection_tune_ok, connection_open, connection_open_ok, connection_close, connection_close_ok))(input).map_err(err("class connection")).map_err(failure) -} -fn connection_start(input: &[u8]) -> IResult { - let (input, _) = tag(10_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, version_major) = domain_octet(input).map_err(err("field version-major in method start")).map_err(failure)?; - let (input, version_minor) = domain_octet(input).map_err(err("field version-minor in method start")).map_err(failure)?; - let (input, server_properties) = domain_peer_properties(input).map_err(err("field server-properties in method start")).map_err(failure)?; - let (input, mechanisms) = domain_longstr(input).map_err(err("field mechanisms in method start")).map_err(failure)?; - if mechanisms.is_empty() { fail!("string was null for field mechanisms") } - let (input, locales) = domain_longstr(input).map_err(err("field locales in method start")).map_err(failure)?; - if locales.is_empty() { fail!("string was null for field locales") } - Ok((input, Class::Connection(Connection::Start { - version_major, - version_minor, - server_properties, - mechanisms, - locales, - }))) -} -fn connection_start_ok(input: &[u8]) -> IResult { - let (input, _) = tag(11_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, client_properties) = domain_peer_properties(input).map_err(err("field client-properties in method start-ok")).map_err(failure)?; - let (input, mechanism) = domain_shortstr(input).map_err(err("field mechanism in method start-ok")).map_err(failure)?; - if mechanism.is_empty() { fail!("string was null for field mechanism") } - let (input, response) = domain_longstr(input).map_err(err("field response in method start-ok")).map_err(failure)?; - if response.is_empty() { fail!("string was null for field response") } - let (input, locale) = domain_shortstr(input).map_err(err("field locale in method start-ok")).map_err(failure)?; - if locale.is_empty() { fail!("string was null for field locale") } - Ok((input, Class::Connection(Connection::StartOk { - client_properties, - mechanism, - response, - locale, - }))) -} -fn connection_secure(input: &[u8]) -> IResult { - let (input, _) = tag(20_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, challenge) = domain_longstr(input).map_err(err("field challenge in method secure")).map_err(failure)?; - Ok((input, Class::Connection(Connection::Secure { - challenge, - }))) -} -fn connection_secure_ok(input: &[u8]) -> IResult { - let (input, _) = tag(21_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, response) = domain_longstr(input).map_err(err("field response in method secure-ok")).map_err(failure)?; - if response.is_empty() { fail!("string was null for field response") } - Ok((input, Class::Connection(Connection::SecureOk { - response, - }))) -} -fn connection_tune(input: &[u8]) -> IResult { - let (input, _) = tag(30_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, channel_max) = domain_short(input).map_err(err("field channel-max in method tune")).map_err(failure)?; - let (input, frame_max) = domain_long(input).map_err(err("field frame-max in method tune")).map_err(failure)?; - let (input, heartbeat) = domain_short(input).map_err(err("field heartbeat in method tune")).map_err(failure)?; - Ok((input, Class::Connection(Connection::Tune { - channel_max, - frame_max, - heartbeat, - }))) -} -fn connection_tune_ok(input: &[u8]) -> IResult { - let (input, _) = tag(31_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, channel_max) = domain_short(input).map_err(err("field channel-max in method tune-ok")).map_err(failure)?; - if channel_max == 0 { fail!("number was 0 for field channel_max") } - let (input, frame_max) = domain_long(input).map_err(err("field frame-max in method tune-ok")).map_err(failure)?; - let (input, heartbeat) = domain_short(input).map_err(err("field heartbeat in method tune-ok")).map_err(failure)?; - Ok((input, Class::Connection(Connection::TuneOk { - channel_max, - frame_max, - heartbeat, - }))) -} -fn connection_open(input: &[u8]) -> IResult { - let (input, _) = tag(40_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, virtual_host) = domain_path(input).map_err(err("field virtual-host in method open")).map_err(failure)?; - let (input, reserved_1) = domain_shortstr(input).map_err(err("field reserved-1 in method open")).map_err(failure)?; - let (input, bits) = bit(input, 1).map_err(err("field reserved-2 in method open")).map_err(failure)?; - let reserved_2 = bits[0]; - Ok((input, Class::Connection(Connection::Open { - virtual_host, - reserved_1, - reserved_2, - }))) -} -fn connection_open_ok(input: &[u8]) -> IResult { - let (input, _) = tag(41_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, reserved_1) = domain_shortstr(input).map_err(err("field reserved-1 in method open-ok")).map_err(failure)?; - Ok((input, Class::Connection(Connection::OpenOk { - reserved_1, - }))) -} -fn connection_close(input: &[u8]) -> IResult { - let (input, _) = tag(50_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, reply_code) = domain_reply_code(input).map_err(err("field reply-code in method close")).map_err(failure)?; - let (input, reply_text) = domain_reply_text(input).map_err(err("field reply-text in method close")).map_err(failure)?; - let (input, class_id) = domain_class_id(input).map_err(err("field class-id in method close")).map_err(failure)?; - let (input, method_id) = domain_method_id(input).map_err(err("field method-id in method close")).map_err(failure)?; - Ok((input, Class::Connection(Connection::Close { - reply_code, - reply_text, - class_id, - method_id, - }))) -} -fn connection_close_ok(input: &[u8]) -> IResult { - let (input, _) = tag(51_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - Ok((input, Class::Connection(Connection::CloseOk { - }))) -} -fn channel(input: &[u8]) -> IResult { - let (input, _) = tag(20_u16.to_be_bytes())(input).map_err(err("invalid tag for class channel"))?; - alt((channel_open, channel_open_ok, channel_flow, channel_flow_ok, channel_close, channel_close_ok))(input).map_err(err("class channel")).map_err(failure) -} -fn channel_open(input: &[u8]) -> IResult { - let (input, _) = tag(10_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, reserved_1) = domain_shortstr(input).map_err(err("field reserved-1 in method open")).map_err(failure)?; - Ok((input, Class::Channel(Channel::Open { - reserved_1, - }))) -} -fn channel_open_ok(input: &[u8]) -> IResult { - let (input, _) = tag(11_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, reserved_1) = domain_longstr(input).map_err(err("field reserved-1 in method open-ok")).map_err(failure)?; - Ok((input, Class::Channel(Channel::OpenOk { - reserved_1, - }))) -} -fn channel_flow(input: &[u8]) -> IResult { - let (input, _) = tag(20_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, bits) = bit(input, 1).map_err(err("field active in method flow")).map_err(failure)?; - let active = bits[0]; - Ok((input, Class::Channel(Channel::Flow { - active, - }))) -} -fn channel_flow_ok(input: &[u8]) -> IResult { - let (input, _) = tag(21_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, bits) = bit(input, 1).map_err(err("field active in method flow-ok")).map_err(failure)?; - let active = bits[0]; - Ok((input, Class::Channel(Channel::FlowOk { - active, - }))) -} -fn channel_close(input: &[u8]) -> IResult { - let (input, _) = tag(40_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, reply_code) = domain_reply_code(input).map_err(err("field reply-code in method close")).map_err(failure)?; - let (input, reply_text) = domain_reply_text(input).map_err(err("field reply-text in method close")).map_err(failure)?; - let (input, class_id) = domain_class_id(input).map_err(err("field class-id in method close")).map_err(failure)?; - let (input, method_id) = domain_method_id(input).map_err(err("field method-id in method close")).map_err(failure)?; - Ok((input, Class::Channel(Channel::Close { - reply_code, - reply_text, - class_id, - method_id, - }))) -} -fn channel_close_ok(input: &[u8]) -> IResult { - let (input, _) = tag(41_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - Ok((input, Class::Channel(Channel::CloseOk { - }))) -} -fn exchange(input: &[u8]) -> IResult { - let (input, _) = tag(40_u16.to_be_bytes())(input).map_err(err("invalid tag for class exchange"))?; - alt((exchange_declare, exchange_declare_ok, exchange_delete, exchange_delete_ok))(input).map_err(err("class exchange")).map_err(failure) -} -fn exchange_declare(input: &[u8]) -> IResult { - let (input, _) = tag(10_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, reserved_1) = domain_short(input).map_err(err("field reserved-1 in method declare")).map_err(failure)?; - let (input, exchange) = domain_exchange_name(input).map_err(err("field exchange in method declare")).map_err(failure)?; - if exchange.is_empty() { fail!("string was null for field exchange") } - let (input, r#type) = domain_shortstr(input).map_err(err("field type in method declare")).map_err(failure)?; - let (input, bits) = bit(input, 5).map_err(err("field passive in method declare")).map_err(failure)?; - let passive = bits[0]; - let durable = bits[1]; - let reserved_2 = bits[2]; - let reserved_3 = bits[3]; - let no_wait = bits[4]; - let (input, arguments) = domain_table(input).map_err(err("field arguments in method declare")).map_err(failure)?; - Ok((input, Class::Exchange(Exchange::Declare { - reserved_1, - exchange, - r#type, - passive, - durable, - reserved_2, - reserved_3, - no_wait, - arguments, - }))) -} -fn exchange_declare_ok(input: &[u8]) -> IResult { - let (input, _) = tag(11_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - Ok((input, Class::Exchange(Exchange::DeclareOk { - }))) -} -fn exchange_delete(input: &[u8]) -> IResult { - let (input, _) = tag(20_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, reserved_1) = domain_short(input).map_err(err("field reserved-1 in method delete")).map_err(failure)?; - let (input, exchange) = domain_exchange_name(input).map_err(err("field exchange in method delete")).map_err(failure)?; - if exchange.is_empty() { fail!("string was null for field exchange") } - let (input, bits) = bit(input, 2).map_err(err("field if-unused in method delete")).map_err(failure)?; - let if_unused = bits[0]; - let no_wait = bits[1]; - Ok((input, Class::Exchange(Exchange::Delete { - reserved_1, - exchange, - if_unused, - no_wait, - }))) -} -fn exchange_delete_ok(input: &[u8]) -> IResult { - let (input, _) = tag(21_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - Ok((input, Class::Exchange(Exchange::DeleteOk { - }))) -} -fn queue(input: &[u8]) -> IResult { - let (input, _) = tag(50_u16.to_be_bytes())(input).map_err(err("invalid tag for class queue"))?; - alt((queue_declare, queue_declare_ok, queue_bind, queue_bind_ok, queue_unbind, queue_unbind_ok, queue_purge, queue_purge_ok, queue_delete, queue_delete_ok))(input).map_err(err("class queue")).map_err(failure) -} -fn queue_declare(input: &[u8]) -> IResult { - let (input, _) = tag(10_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, reserved_1) = domain_short(input).map_err(err("field reserved-1 in method declare")).map_err(failure)?; - let (input, queue) = domain_queue_name(input).map_err(err("field queue in method declare")).map_err(failure)?; - let (input, bits) = bit(input, 5).map_err(err("field passive in method declare")).map_err(failure)?; - let passive = bits[0]; - let durable = bits[1]; - let exclusive = bits[2]; - let auto_delete = bits[3]; - let no_wait = bits[4]; - let (input, arguments) = domain_table(input).map_err(err("field arguments in method declare")).map_err(failure)?; - Ok((input, Class::Queue(Queue::Declare { - reserved_1, - queue, - passive, - durable, - exclusive, - auto_delete, - no_wait, - arguments, - }))) -} -fn queue_declare_ok(input: &[u8]) -> IResult { - let (input, _) = tag(11_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, queue) = domain_queue_name(input).map_err(err("field queue in method declare-ok")).map_err(failure)?; - if queue.is_empty() { fail!("string was null for field queue") } - let (input, message_count) = domain_message_count(input).map_err(err("field message-count in method declare-ok")).map_err(failure)?; - let (input, consumer_count) = domain_long(input).map_err(err("field consumer-count in method declare-ok")).map_err(failure)?; - Ok((input, Class::Queue(Queue::DeclareOk { - queue, - message_count, - consumer_count, - }))) -} -fn queue_bind(input: &[u8]) -> IResult { - let (input, _) = tag(20_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, reserved_1) = domain_short(input).map_err(err("field reserved-1 in method bind")).map_err(failure)?; - let (input, queue) = domain_queue_name(input).map_err(err("field queue in method bind")).map_err(failure)?; - let (input, exchange) = domain_exchange_name(input).map_err(err("field exchange in method bind")).map_err(failure)?; - let (input, routing_key) = domain_shortstr(input).map_err(err("field routing-key in method bind")).map_err(failure)?; - let (input, bits) = bit(input, 1).map_err(err("field no-wait in method bind")).map_err(failure)?; - let no_wait = bits[0]; - let (input, arguments) = domain_table(input).map_err(err("field arguments in method bind")).map_err(failure)?; - Ok((input, Class::Queue(Queue::Bind { - reserved_1, - queue, - exchange, - routing_key, - no_wait, - arguments, - }))) -} -fn queue_bind_ok(input: &[u8]) -> IResult { - let (input, _) = tag(21_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - Ok((input, Class::Queue(Queue::BindOk { - }))) -} -fn queue_unbind(input: &[u8]) -> IResult { - let (input, _) = tag(50_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, reserved_1) = domain_short(input).map_err(err("field reserved-1 in method unbind")).map_err(failure)?; - let (input, queue) = domain_queue_name(input).map_err(err("field queue in method unbind")).map_err(failure)?; - let (input, exchange) = domain_exchange_name(input).map_err(err("field exchange in method unbind")).map_err(failure)?; - let (input, routing_key) = domain_shortstr(input).map_err(err("field routing-key in method unbind")).map_err(failure)?; - let (input, arguments) = domain_table(input).map_err(err("field arguments in method unbind")).map_err(failure)?; - Ok((input, Class::Queue(Queue::Unbind { - reserved_1, - queue, - exchange, - routing_key, - arguments, - }))) -} -fn queue_unbind_ok(input: &[u8]) -> IResult { - let (input, _) = tag(51_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - Ok((input, Class::Queue(Queue::UnbindOk { - }))) -} -fn queue_purge(input: &[u8]) -> IResult { - let (input, _) = tag(30_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, reserved_1) = domain_short(input).map_err(err("field reserved-1 in method purge")).map_err(failure)?; - let (input, queue) = domain_queue_name(input).map_err(err("field queue in method purge")).map_err(failure)?; - let (input, bits) = bit(input, 1).map_err(err("field no-wait in method purge")).map_err(failure)?; - let no_wait = bits[0]; - Ok((input, Class::Queue(Queue::Purge { - reserved_1, - queue, - no_wait, - }))) -} -fn queue_purge_ok(input: &[u8]) -> IResult { - let (input, _) = tag(31_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, message_count) = domain_message_count(input).map_err(err("field message-count in method purge-ok")).map_err(failure)?; - Ok((input, Class::Queue(Queue::PurgeOk { - message_count, - }))) -} -fn queue_delete(input: &[u8]) -> IResult { - let (input, _) = tag(40_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, reserved_1) = domain_short(input).map_err(err("field reserved-1 in method delete")).map_err(failure)?; - let (input, queue) = domain_queue_name(input).map_err(err("field queue in method delete")).map_err(failure)?; - let (input, bits) = bit(input, 3).map_err(err("field if-unused in method delete")).map_err(failure)?; - let if_unused = bits[0]; - let if_empty = bits[1]; - let no_wait = bits[2]; - Ok((input, Class::Queue(Queue::Delete { - reserved_1, - queue, - if_unused, - if_empty, - no_wait, - }))) -} -fn queue_delete_ok(input: &[u8]) -> IResult { - let (input, _) = tag(41_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, message_count) = domain_message_count(input).map_err(err("field message-count in method delete-ok")).map_err(failure)?; - Ok((input, Class::Queue(Queue::DeleteOk { - message_count, - }))) -} -fn basic(input: &[u8]) -> IResult { - let (input, _) = tag(60_u16.to_be_bytes())(input).map_err(err("invalid tag for class basic"))?; - alt((basic_qos, basic_qos_ok, basic_consume, basic_consume_ok, basic_cancel, basic_cancel_ok, basic_publish, basic_return, basic_deliver, basic_get, basic_get_ok, basic_get_empty, basic_ack, basic_reject, basic_recover_async, basic_recover, basic_recover_ok))(input).map_err(err("class basic")).map_err(failure) -} -fn basic_qos(input: &[u8]) -> IResult { - let (input, _) = tag(10_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, prefetch_size) = domain_long(input).map_err(err("field prefetch-size in method qos")).map_err(failure)?; - let (input, prefetch_count) = domain_short(input).map_err(err("field prefetch-count in method qos")).map_err(failure)?; - let (input, bits) = bit(input, 1).map_err(err("field global in method qos")).map_err(failure)?; - let global = bits[0]; - Ok((input, Class::Basic(Basic::Qos { - prefetch_size, - prefetch_count, - global, - }))) -} -fn basic_qos_ok(input: &[u8]) -> IResult { - let (input, _) = tag(11_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - Ok((input, Class::Basic(Basic::QosOk { - }))) -} -fn basic_consume(input: &[u8]) -> IResult { - let (input, _) = tag(20_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, reserved_1) = domain_short(input).map_err(err("field reserved-1 in method consume")).map_err(failure)?; - let (input, queue) = domain_queue_name(input).map_err(err("field queue in method consume")).map_err(failure)?; - let (input, consumer_tag) = domain_consumer_tag(input).map_err(err("field consumer-tag in method consume")).map_err(failure)?; - let (input, bits) = bit(input, 4).map_err(err("field no-local in method consume")).map_err(failure)?; - let no_local = bits[0]; - let no_ack = bits[1]; - let exclusive = bits[2]; - let no_wait = bits[3]; - let (input, arguments) = domain_table(input).map_err(err("field arguments in method consume")).map_err(failure)?; - Ok((input, Class::Basic(Basic::Consume { - reserved_1, - queue, - consumer_tag, - no_local, - no_ack, - exclusive, - no_wait, - arguments, - }))) -} -fn basic_consume_ok(input: &[u8]) -> IResult { - let (input, _) = tag(21_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, consumer_tag) = domain_consumer_tag(input).map_err(err("field consumer-tag in method consume-ok")).map_err(failure)?; - Ok((input, Class::Basic(Basic::ConsumeOk { - consumer_tag, - }))) -} -fn basic_cancel(input: &[u8]) -> IResult { - let (input, _) = tag(30_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, consumer_tag) = domain_consumer_tag(input).map_err(err("field consumer-tag in method cancel")).map_err(failure)?; - let (input, bits) = bit(input, 1).map_err(err("field no-wait in method cancel")).map_err(failure)?; - let no_wait = bits[0]; - Ok((input, Class::Basic(Basic::Cancel { - consumer_tag, - no_wait, - }))) -} -fn basic_cancel_ok(input: &[u8]) -> IResult { - let (input, _) = tag(31_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, consumer_tag) = domain_consumer_tag(input).map_err(err("field consumer-tag in method cancel-ok")).map_err(failure)?; - Ok((input, Class::Basic(Basic::CancelOk { - consumer_tag, - }))) -} -fn basic_publish(input: &[u8]) -> IResult { - let (input, _) = tag(40_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, reserved_1) = domain_short(input).map_err(err("field reserved-1 in method publish")).map_err(failure)?; - let (input, exchange) = domain_exchange_name(input).map_err(err("field exchange in method publish")).map_err(failure)?; - let (input, routing_key) = domain_shortstr(input).map_err(err("field routing-key in method publish")).map_err(failure)?; - let (input, bits) = bit(input, 2).map_err(err("field mandatory in method publish")).map_err(failure)?; - let mandatory = bits[0]; - let immediate = bits[1]; - Ok((input, Class::Basic(Basic::Publish { - reserved_1, - exchange, - routing_key, - mandatory, - immediate, - }))) -} -fn basic_return(input: &[u8]) -> IResult { - let (input, _) = tag(50_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, reply_code) = domain_reply_code(input).map_err(err("field reply-code in method return")).map_err(failure)?; - let (input, reply_text) = domain_reply_text(input).map_err(err("field reply-text in method return")).map_err(failure)?; - let (input, exchange) = domain_exchange_name(input).map_err(err("field exchange in method return")).map_err(failure)?; - let (input, routing_key) = domain_shortstr(input).map_err(err("field routing-key in method return")).map_err(failure)?; - Ok((input, Class::Basic(Basic::Return { - reply_code, - reply_text, - exchange, - routing_key, - }))) -} -fn basic_deliver(input: &[u8]) -> IResult { - let (input, _) = tag(60_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, consumer_tag) = domain_consumer_tag(input).map_err(err("field consumer-tag in method deliver")).map_err(failure)?; - let (input, delivery_tag) = domain_delivery_tag(input).map_err(err("field delivery-tag in method deliver")).map_err(failure)?; - let (input, bits) = bit(input, 1).map_err(err("field redelivered in method deliver")).map_err(failure)?; - let redelivered = bits[0]; - let (input, exchange) = domain_exchange_name(input).map_err(err("field exchange in method deliver")).map_err(failure)?; - let (input, routing_key) = domain_shortstr(input).map_err(err("field routing-key in method deliver")).map_err(failure)?; - Ok((input, Class::Basic(Basic::Deliver { - consumer_tag, - delivery_tag, - redelivered, - exchange, - routing_key, - }))) -} -fn basic_get(input: &[u8]) -> IResult { - let (input, _) = tag(70_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, reserved_1) = domain_short(input).map_err(err("field reserved-1 in method get")).map_err(failure)?; - let (input, queue) = domain_queue_name(input).map_err(err("field queue in method get")).map_err(failure)?; - let (input, bits) = bit(input, 1).map_err(err("field no-ack in method get")).map_err(failure)?; - let no_ack = bits[0]; - Ok((input, Class::Basic(Basic::Get { - reserved_1, - queue, - no_ack, - }))) -} -fn basic_get_ok(input: &[u8]) -> IResult { - let (input, _) = tag(71_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, delivery_tag) = domain_delivery_tag(input).map_err(err("field delivery-tag in method get-ok")).map_err(failure)?; - let (input, bits) = bit(input, 1).map_err(err("field redelivered in method get-ok")).map_err(failure)?; - let redelivered = bits[0]; - let (input, exchange) = domain_exchange_name(input).map_err(err("field exchange in method get-ok")).map_err(failure)?; - let (input, routing_key) = domain_shortstr(input).map_err(err("field routing-key in method get-ok")).map_err(failure)?; - let (input, message_count) = domain_message_count(input).map_err(err("field message-count in method get-ok")).map_err(failure)?; - Ok((input, Class::Basic(Basic::GetOk { - delivery_tag, - redelivered, - exchange, - routing_key, - message_count, - }))) -} -fn basic_get_empty(input: &[u8]) -> IResult { - let (input, _) = tag(72_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, reserved_1) = domain_shortstr(input).map_err(err("field reserved-1 in method get-empty")).map_err(failure)?; - Ok((input, Class::Basic(Basic::GetEmpty { - reserved_1, - }))) -} -fn basic_ack(input: &[u8]) -> IResult { - let (input, _) = tag(80_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, delivery_tag) = domain_delivery_tag(input).map_err(err("field delivery-tag in method ack")).map_err(failure)?; - let (input, bits) = bit(input, 1).map_err(err("field multiple in method ack")).map_err(failure)?; - let multiple = bits[0]; - Ok((input, Class::Basic(Basic::Ack { - delivery_tag, - multiple, - }))) -} -fn basic_reject(input: &[u8]) -> IResult { - let (input, _) = tag(90_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, delivery_tag) = domain_delivery_tag(input).map_err(err("field delivery-tag in method reject")).map_err(failure)?; - let (input, bits) = bit(input, 1).map_err(err("field requeue in method reject")).map_err(failure)?; - let requeue = bits[0]; - Ok((input, Class::Basic(Basic::Reject { - delivery_tag, - requeue, - }))) -} -fn basic_recover_async(input: &[u8]) -> IResult { - let (input, _) = tag(100_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, bits) = bit(input, 1).map_err(err("field requeue in method recover-async")).map_err(failure)?; - let requeue = bits[0]; - Ok((input, Class::Basic(Basic::RecoverAsync { - requeue, - }))) -} -fn basic_recover(input: &[u8]) -> IResult { - let (input, _) = tag(110_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, bits) = bit(input, 1).map_err(err("field requeue in method recover")).map_err(failure)?; - let requeue = bits[0]; - Ok((input, Class::Basic(Basic::Recover { - requeue, - }))) -} -fn basic_recover_ok(input: &[u8]) -> IResult { - let (input, _) = tag(111_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - Ok((input, Class::Basic(Basic::RecoverOk { - }))) -} -fn tx(input: &[u8]) -> IResult { - let (input, _) = tag(90_u16.to_be_bytes())(input).map_err(err("invalid tag for class tx"))?; - alt((tx_select, tx_select_ok, tx_commit, tx_commit_ok, tx_rollback, tx_rollback_ok))(input).map_err(err("class tx")).map_err(failure) -} -fn tx_select(input: &[u8]) -> IResult { - let (input, _) = tag(10_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - Ok((input, Class::Tx(Tx::Select { - }))) -} -fn tx_select_ok(input: &[u8]) -> IResult { - let (input, _) = tag(11_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - Ok((input, Class::Tx(Tx::SelectOk { - }))) -} -fn tx_commit(input: &[u8]) -> IResult { - let (input, _) = tag(20_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - Ok((input, Class::Tx(Tx::Commit { - }))) -} -fn tx_commit_ok(input: &[u8]) -> IResult { - let (input, _) = tag(21_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - Ok((input, Class::Tx(Tx::CommitOk { - }))) -} -fn tx_rollback(input: &[u8]) -> IResult { - let (input, _) = tag(30_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - Ok((input, Class::Tx(Tx::Rollback { - }))) -} -fn tx_rollback_ok(input: &[u8]) -> IResult { - let (input, _) = tag(31_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - Ok((input, Class::Tx(Tx::RollbackOk { - }))) -} + pub type IResult<'a, T> = nom::IResult<&'a [u8], T, TransError>; + pub fn parse_method(input: &[u8]) -> Result<(&[u8], Class), nom::Err> { + alt((connection, channel, exchange, queue, basic, tx))(input) + } + fn domain_class_id(input: &[u8]) -> IResult { + short(input) + } + fn domain_consumer_tag(input: &[u8]) -> IResult { + shortstr(input) + } + fn domain_delivery_tag(input: &[u8]) -> IResult { + longlong(input) + } + fn domain_exchange_name(input: &[u8]) -> IResult { + let (input, result) = shortstr(input)?; + if result.len() > 127 { + fail!("value is shorter than 127 for field result") + } + static REGEX: Lazy = Lazy::new(|| Regex::new(r"^[a-zA-Z0-9-_.:]*$").unwrap()); + if !REGEX.is_match(&result) { + fail!(r"regex `^[a-zA-Z0-9-_.:]*$` did not match value for field result") + } + Ok((input, result)) + } + fn domain_method_id(input: &[u8]) -> IResult { + short(input) + } + fn domain_path(input: &[u8]) -> IResult { + let (input, result) = shortstr(input)?; + if result.is_empty() { + fail!("string was null for field result") + } + if result.len() > 127 { + fail!("value is shorter than 127 for field result") + } + Ok((input, result)) + } + fn domain_peer_properties(input: &[u8]) -> IResult { + table(input) + } + fn domain_queue_name(input: &[u8]) -> IResult { + let (input, result) = shortstr(input)?; + if result.len() > 127 { + fail!("value is shorter than 127 for field result") + } + static REGEX: Lazy = Lazy::new(|| Regex::new(r"^[a-zA-Z0-9-_.:]*$").unwrap()); + if !REGEX.is_match(&result) { + fail!(r"regex `^[a-zA-Z0-9-_.:]*$` did not match value for field result") + } + Ok((input, result)) + } + fn domain_message_count(input: &[u8]) -> IResult { + long(input) + } + fn domain_reply_code(input: &[u8]) -> IResult { + let (input, result) = short(input)?; + if result == 0 { + fail!("number was 0 for field result") + } + Ok((input, result)) + } + fn domain_reply_text(input: &[u8]) -> IResult { + let (input, result) = shortstr(input)?; + if result.is_empty() { + fail!("string was null for field result") + } + Ok((input, result)) + } + fn domain_octet(input: &[u8]) -> IResult { + octet(input) + } + fn domain_short(input: &[u8]) -> IResult { + short(input) + } + fn domain_long(input: &[u8]) -> IResult { + long(input) + } + fn domain_longlong(input: &[u8]) -> IResult { + longlong(input) + } + fn domain_shortstr(input: &[u8]) -> IResult { + shortstr(input) + } + fn domain_longstr(input: &[u8]) -> IResult { + longstr(input) + } + fn domain_timestamp(input: &[u8]) -> IResult { + timestamp(input) + } + fn domain_table(input: &[u8]) -> IResult
{ + table(input) + } + fn connection(input: &[u8]) -> IResult { + let (input, _) = + tag(10_u16.to_be_bytes())(input).map_err(err("invalid tag for class connection"))?; + alt(( + connection_start, + connection_start_ok, + connection_secure, + connection_secure_ok, + connection_tune, + connection_tune_ok, + connection_open, + connection_open_ok, + connection_close, + connection_close_ok, + ))(input) + .map_err(err("class connection")) + .map_err(failure) + } + fn connection_start(input: &[u8]) -> IResult { + let (input, _) = tag(10_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + let (input, version_major) = domain_octet(input) + .map_err(err("field version-major in method start")) + .map_err(failure)?; + let (input, version_minor) = domain_octet(input) + .map_err(err("field version-minor in method start")) + .map_err(failure)?; + let (input, server_properties) = domain_peer_properties(input) + .map_err(err("field server-properties in method start")) + .map_err(failure)?; + let (input, mechanisms) = domain_longstr(input) + .map_err(err("field mechanisms in method start")) + .map_err(failure)?; + if mechanisms.is_empty() { + fail!("string was null for field mechanisms") + } + let (input, locales) = domain_longstr(input) + .map_err(err("field locales in method start")) + .map_err(failure)?; + if locales.is_empty() { + fail!("string was null for field locales") + } + Ok(( + input, + Class::Connection(Connection::Start { + version_major, + version_minor, + server_properties, + mechanisms, + locales, + }), + )) + } + fn connection_start_ok(input: &[u8]) -> IResult { + let (input, _) = tag(11_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + let (input, client_properties) = domain_peer_properties(input) + .map_err(err("field client-properties in method start-ok")) + .map_err(failure)?; + let (input, mechanism) = domain_shortstr(input) + .map_err(err("field mechanism in method start-ok")) + .map_err(failure)?; + if mechanism.is_empty() { + fail!("string was null for field mechanism") + } + let (input, response) = domain_longstr(input) + .map_err(err("field response in method start-ok")) + .map_err(failure)?; + if response.is_empty() { + fail!("string was null for field response") + } + let (input, locale) = domain_shortstr(input) + .map_err(err("field locale in method start-ok")) + .map_err(failure)?; + if locale.is_empty() { + fail!("string was null for field locale") + } + Ok(( + input, + Class::Connection(Connection::StartOk { + client_properties, + mechanism, + response, + locale, + }), + )) + } + fn connection_secure(input: &[u8]) -> IResult { + let (input, _) = tag(20_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + let (input, challenge) = domain_longstr(input) + .map_err(err("field challenge in method secure")) + .map_err(failure)?; + Ok((input, Class::Connection(Connection::Secure { challenge }))) + } + fn connection_secure_ok(input: &[u8]) -> IResult { + let (input, _) = tag(21_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + let (input, response) = domain_longstr(input) + .map_err(err("field response in method secure-ok")) + .map_err(failure)?; + if response.is_empty() { + fail!("string was null for field response") + } + Ok((input, Class::Connection(Connection::SecureOk { response }))) + } + fn connection_tune(input: &[u8]) -> IResult { + let (input, _) = tag(30_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + let (input, channel_max) = domain_short(input) + .map_err(err("field channel-max in method tune")) + .map_err(failure)?; + let (input, frame_max) = domain_long(input) + .map_err(err("field frame-max in method tune")) + .map_err(failure)?; + let (input, heartbeat) = domain_short(input) + .map_err(err("field heartbeat in method tune")) + .map_err(failure)?; + Ok(( + input, + Class::Connection(Connection::Tune { + channel_max, + frame_max, + heartbeat, + }), + )) + } + fn connection_tune_ok(input: &[u8]) -> IResult { + let (input, _) = tag(31_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + let (input, channel_max) = domain_short(input) + .map_err(err("field channel-max in method tune-ok")) + .map_err(failure)?; + if channel_max == 0 { + fail!("number was 0 for field channel_max") + } + let (input, frame_max) = domain_long(input) + .map_err(err("field frame-max in method tune-ok")) + .map_err(failure)?; + let (input, heartbeat) = domain_short(input) + .map_err(err("field heartbeat in method tune-ok")) + .map_err(failure)?; + Ok(( + input, + Class::Connection(Connection::TuneOk { + channel_max, + frame_max, + heartbeat, + }), + )) + } + fn connection_open(input: &[u8]) -> IResult { + let (input, _) = tag(40_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + let (input, virtual_host) = domain_path(input) + .map_err(err("field virtual-host in method open")) + .map_err(failure)?; + let (input, reserved_1) = domain_shortstr(input) + .map_err(err("field reserved-1 in method open")) + .map_err(failure)?; + let (input, bits) = bit(input, 1) + .map_err(err("field reserved-2 in method open")) + .map_err(failure)?; + let reserved_2 = bits[0]; + Ok(( + input, + Class::Connection(Connection::Open { + virtual_host, + reserved_1, + reserved_2, + }), + )) + } + fn connection_open_ok(input: &[u8]) -> IResult { + let (input, _) = tag(41_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + let (input, reserved_1) = domain_shortstr(input) + .map_err(err("field reserved-1 in method open-ok")) + .map_err(failure)?; + Ok((input, Class::Connection(Connection::OpenOk { reserved_1 }))) + } + fn connection_close(input: &[u8]) -> IResult { + let (input, _) = tag(50_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + let (input, reply_code) = domain_reply_code(input) + .map_err(err("field reply-code in method close")) + .map_err(failure)?; + let (input, reply_text) = domain_reply_text(input) + .map_err(err("field reply-text in method close")) + .map_err(failure)?; + let (input, class_id) = domain_class_id(input) + .map_err(err("field class-id in method close")) + .map_err(failure)?; + let (input, method_id) = domain_method_id(input) + .map_err(err("field method-id in method close")) + .map_err(failure)?; + Ok(( + input, + Class::Connection(Connection::Close { + reply_code, + reply_text, + class_id, + method_id, + }), + )) + } + fn connection_close_ok(input: &[u8]) -> IResult { + let (input, _) = tag(51_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + Ok((input, Class::Connection(Connection::CloseOk {}))) + } + fn channel(input: &[u8]) -> IResult { + let (input, _) = + tag(20_u16.to_be_bytes())(input).map_err(err("invalid tag for class channel"))?; + alt(( + channel_open, + channel_open_ok, + channel_flow, + channel_flow_ok, + channel_close, + channel_close_ok, + ))(input) + .map_err(err("class channel")) + .map_err(failure) + } + fn channel_open(input: &[u8]) -> IResult { + let (input, _) = tag(10_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + let (input, reserved_1) = domain_shortstr(input) + .map_err(err("field reserved-1 in method open")) + .map_err(failure)?; + Ok((input, Class::Channel(Channel::Open { reserved_1 }))) + } + fn channel_open_ok(input: &[u8]) -> IResult { + let (input, _) = tag(11_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + let (input, reserved_1) = domain_longstr(input) + .map_err(err("field reserved-1 in method open-ok")) + .map_err(failure)?; + Ok((input, Class::Channel(Channel::OpenOk { reserved_1 }))) + } + fn channel_flow(input: &[u8]) -> IResult { + let (input, _) = tag(20_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + let (input, bits) = bit(input, 1) + .map_err(err("field active in method flow")) + .map_err(failure)?; + let active = bits[0]; + Ok((input, Class::Channel(Channel::Flow { active }))) + } + fn channel_flow_ok(input: &[u8]) -> IResult { + let (input, _) = tag(21_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + let (input, bits) = bit(input, 1) + .map_err(err("field active in method flow-ok")) + .map_err(failure)?; + let active = bits[0]; + Ok((input, Class::Channel(Channel::FlowOk { active }))) + } + fn channel_close(input: &[u8]) -> IResult { + let (input, _) = tag(40_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + let (input, reply_code) = domain_reply_code(input) + .map_err(err("field reply-code in method close")) + .map_err(failure)?; + let (input, reply_text) = domain_reply_text(input) + .map_err(err("field reply-text in method close")) + .map_err(failure)?; + let (input, class_id) = domain_class_id(input) + .map_err(err("field class-id in method close")) + .map_err(failure)?; + let (input, method_id) = domain_method_id(input) + .map_err(err("field method-id in method close")) + .map_err(failure)?; + Ok(( + input, + Class::Channel(Channel::Close { + reply_code, + reply_text, + class_id, + method_id, + }), + )) + } + fn channel_close_ok(input: &[u8]) -> IResult { + let (input, _) = tag(41_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + Ok((input, Class::Channel(Channel::CloseOk {}))) + } + fn exchange(input: &[u8]) -> IResult { + let (input, _) = + tag(40_u16.to_be_bytes())(input).map_err(err("invalid tag for class exchange"))?; + alt(( + exchange_declare, + exchange_declare_ok, + exchange_delete, + exchange_delete_ok, + ))(input) + .map_err(err("class exchange")) + .map_err(failure) + } + fn exchange_declare(input: &[u8]) -> IResult { + let (input, _) = tag(10_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + let (input, reserved_1) = domain_short(input) + .map_err(err("field reserved-1 in method declare")) + .map_err(failure)?; + let (input, exchange) = domain_exchange_name(input) + .map_err(err("field exchange in method declare")) + .map_err(failure)?; + if exchange.is_empty() { + fail!("string was null for field exchange") + } + let (input, r#type) = domain_shortstr(input) + .map_err(err("field type in method declare")) + .map_err(failure)?; + let (input, bits) = bit(input, 5) + .map_err(err("field passive in method declare")) + .map_err(failure)?; + let passive = bits[0]; + let durable = bits[1]; + let reserved_2 = bits[2]; + let reserved_3 = bits[3]; + let no_wait = bits[4]; + let (input, arguments) = domain_table(input) + .map_err(err("field arguments in method declare")) + .map_err(failure)?; + Ok(( + input, + Class::Exchange(Exchange::Declare { + reserved_1, + exchange, + r#type, + passive, + durable, + reserved_2, + reserved_3, + no_wait, + arguments, + }), + )) + } + fn exchange_declare_ok(input: &[u8]) -> IResult { + let (input, _) = tag(11_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + Ok((input, Class::Exchange(Exchange::DeclareOk {}))) + } + fn exchange_delete(input: &[u8]) -> IResult { + let (input, _) = tag(20_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + let (input, reserved_1) = domain_short(input) + .map_err(err("field reserved-1 in method delete")) + .map_err(failure)?; + let (input, exchange) = domain_exchange_name(input) + .map_err(err("field exchange in method delete")) + .map_err(failure)?; + if exchange.is_empty() { + fail!("string was null for field exchange") + } + let (input, bits) = bit(input, 2) + .map_err(err("field if-unused in method delete")) + .map_err(failure)?; + let if_unused = bits[0]; + let no_wait = bits[1]; + Ok(( + input, + Class::Exchange(Exchange::Delete { + reserved_1, + exchange, + if_unused, + no_wait, + }), + )) + } + fn exchange_delete_ok(input: &[u8]) -> IResult { + let (input, _) = tag(21_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + Ok((input, Class::Exchange(Exchange::DeleteOk {}))) + } + fn queue(input: &[u8]) -> IResult { + let (input, _) = + tag(50_u16.to_be_bytes())(input).map_err(err("invalid tag for class queue"))?; + alt(( + queue_declare, + queue_declare_ok, + queue_bind, + queue_bind_ok, + queue_unbind, + queue_unbind_ok, + queue_purge, + queue_purge_ok, + queue_delete, + queue_delete_ok, + ))(input) + .map_err(err("class queue")) + .map_err(failure) + } + fn queue_declare(input: &[u8]) -> IResult { + let (input, _) = tag(10_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + let (input, reserved_1) = domain_short(input) + .map_err(err("field reserved-1 in method declare")) + .map_err(failure)?; + let (input, queue) = domain_queue_name(input) + .map_err(err("field queue in method declare")) + .map_err(failure)?; + let (input, bits) = bit(input, 5) + .map_err(err("field passive in method declare")) + .map_err(failure)?; + let passive = bits[0]; + let durable = bits[1]; + let exclusive = bits[2]; + let auto_delete = bits[3]; + let no_wait = bits[4]; + let (input, arguments) = domain_table(input) + .map_err(err("field arguments in method declare")) + .map_err(failure)?; + Ok(( + input, + Class::Queue(Queue::Declare { + reserved_1, + queue, + passive, + durable, + exclusive, + auto_delete, + no_wait, + arguments, + }), + )) + } + fn queue_declare_ok(input: &[u8]) -> IResult { + let (input, _) = tag(11_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + let (input, queue) = domain_queue_name(input) + .map_err(err("field queue in method declare-ok")) + .map_err(failure)?; + if queue.is_empty() { + fail!("string was null for field queue") + } + let (input, message_count) = domain_message_count(input) + .map_err(err("field message-count in method declare-ok")) + .map_err(failure)?; + let (input, consumer_count) = domain_long(input) + .map_err(err("field consumer-count in method declare-ok")) + .map_err(failure)?; + Ok(( + input, + Class::Queue(Queue::DeclareOk { + queue, + message_count, + consumer_count, + }), + )) + } + fn queue_bind(input: &[u8]) -> IResult { + let (input, _) = tag(20_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + let (input, reserved_1) = domain_short(input) + .map_err(err("field reserved-1 in method bind")) + .map_err(failure)?; + let (input, queue) = domain_queue_name(input) + .map_err(err("field queue in method bind")) + .map_err(failure)?; + let (input, exchange) = domain_exchange_name(input) + .map_err(err("field exchange in method bind")) + .map_err(failure)?; + let (input, routing_key) = domain_shortstr(input) + .map_err(err("field routing-key in method bind")) + .map_err(failure)?; + let (input, bits) = bit(input, 1) + .map_err(err("field no-wait in method bind")) + .map_err(failure)?; + let no_wait = bits[0]; + let (input, arguments) = domain_table(input) + .map_err(err("field arguments in method bind")) + .map_err(failure)?; + Ok(( + input, + Class::Queue(Queue::Bind { + reserved_1, + queue, + exchange, + routing_key, + no_wait, + arguments, + }), + )) + } + fn queue_bind_ok(input: &[u8]) -> IResult { + let (input, _) = tag(21_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + Ok((input, Class::Queue(Queue::BindOk {}))) + } + fn queue_unbind(input: &[u8]) -> IResult { + let (input, _) = tag(50_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + let (input, reserved_1) = domain_short(input) + .map_err(err("field reserved-1 in method unbind")) + .map_err(failure)?; + let (input, queue) = domain_queue_name(input) + .map_err(err("field queue in method unbind")) + .map_err(failure)?; + let (input, exchange) = domain_exchange_name(input) + .map_err(err("field exchange in method unbind")) + .map_err(failure)?; + let (input, routing_key) = domain_shortstr(input) + .map_err(err("field routing-key in method unbind")) + .map_err(failure)?; + let (input, arguments) = domain_table(input) + .map_err(err("field arguments in method unbind")) + .map_err(failure)?; + Ok(( + input, + Class::Queue(Queue::Unbind { + reserved_1, + queue, + exchange, + routing_key, + arguments, + }), + )) + } + fn queue_unbind_ok(input: &[u8]) -> IResult { + let (input, _) = tag(51_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + Ok((input, Class::Queue(Queue::UnbindOk {}))) + } + fn queue_purge(input: &[u8]) -> IResult { + let (input, _) = tag(30_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + let (input, reserved_1) = domain_short(input) + .map_err(err("field reserved-1 in method purge")) + .map_err(failure)?; + let (input, queue) = domain_queue_name(input) + .map_err(err("field queue in method purge")) + .map_err(failure)?; + let (input, bits) = bit(input, 1) + .map_err(err("field no-wait in method purge")) + .map_err(failure)?; + let no_wait = bits[0]; + Ok(( + input, + Class::Queue(Queue::Purge { + reserved_1, + queue, + no_wait, + }), + )) + } + fn queue_purge_ok(input: &[u8]) -> IResult { + let (input, _) = tag(31_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + let (input, message_count) = domain_message_count(input) + .map_err(err("field message-count in method purge-ok")) + .map_err(failure)?; + Ok((input, Class::Queue(Queue::PurgeOk { message_count }))) + } + fn queue_delete(input: &[u8]) -> IResult { + let (input, _) = tag(40_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + let (input, reserved_1) = domain_short(input) + .map_err(err("field reserved-1 in method delete")) + .map_err(failure)?; + let (input, queue) = domain_queue_name(input) + .map_err(err("field queue in method delete")) + .map_err(failure)?; + let (input, bits) = bit(input, 3) + .map_err(err("field if-unused in method delete")) + .map_err(failure)?; + let if_unused = bits[0]; + let if_empty = bits[1]; + let no_wait = bits[2]; + Ok(( + input, + Class::Queue(Queue::Delete { + reserved_1, + queue, + if_unused, + if_empty, + no_wait, + }), + )) + } + fn queue_delete_ok(input: &[u8]) -> IResult { + let (input, _) = tag(41_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + let (input, message_count) = domain_message_count(input) + .map_err(err("field message-count in method delete-ok")) + .map_err(failure)?; + Ok((input, Class::Queue(Queue::DeleteOk { message_count }))) + } + fn basic(input: &[u8]) -> IResult { + let (input, _) = + tag(60_u16.to_be_bytes())(input).map_err(err("invalid tag for class basic"))?; + alt(( + basic_qos, + basic_qos_ok, + basic_consume, + basic_consume_ok, + basic_cancel, + basic_cancel_ok, + basic_publish, + basic_return, + basic_deliver, + basic_get, + basic_get_ok, + basic_get_empty, + basic_ack, + basic_reject, + basic_recover_async, + basic_recover, + basic_recover_ok, + ))(input) + .map_err(err("class basic")) + .map_err(failure) + } + fn basic_qos(input: &[u8]) -> IResult { + let (input, _) = tag(10_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + let (input, prefetch_size) = domain_long(input) + .map_err(err("field prefetch-size in method qos")) + .map_err(failure)?; + let (input, prefetch_count) = domain_short(input) + .map_err(err("field prefetch-count in method qos")) + .map_err(failure)?; + let (input, bits) = bit(input, 1) + .map_err(err("field global in method qos")) + .map_err(failure)?; + let global = bits[0]; + Ok(( + input, + Class::Basic(Basic::Qos { + prefetch_size, + prefetch_count, + global, + }), + )) + } + fn basic_qos_ok(input: &[u8]) -> IResult { + let (input, _) = tag(11_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + Ok((input, Class::Basic(Basic::QosOk {}))) + } + fn basic_consume(input: &[u8]) -> IResult { + let (input, _) = tag(20_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + let (input, reserved_1) = domain_short(input) + .map_err(err("field reserved-1 in method consume")) + .map_err(failure)?; + let (input, queue) = domain_queue_name(input) + .map_err(err("field queue in method consume")) + .map_err(failure)?; + let (input, consumer_tag) = domain_consumer_tag(input) + .map_err(err("field consumer-tag in method consume")) + .map_err(failure)?; + let (input, bits) = bit(input, 4) + .map_err(err("field no-local in method consume")) + .map_err(failure)?; + let no_local = bits[0]; + let no_ack = bits[1]; + let exclusive = bits[2]; + let no_wait = bits[3]; + let (input, arguments) = domain_table(input) + .map_err(err("field arguments in method consume")) + .map_err(failure)?; + Ok(( + input, + Class::Basic(Basic::Consume { + reserved_1, + queue, + consumer_tag, + no_local, + no_ack, + exclusive, + no_wait, + arguments, + }), + )) + } + fn basic_consume_ok(input: &[u8]) -> IResult { + let (input, _) = tag(21_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + let (input, consumer_tag) = domain_consumer_tag(input) + .map_err(err("field consumer-tag in method consume-ok")) + .map_err(failure)?; + Ok((input, Class::Basic(Basic::ConsumeOk { consumer_tag }))) + } + fn basic_cancel(input: &[u8]) -> IResult { + let (input, _) = tag(30_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + let (input, consumer_tag) = domain_consumer_tag(input) + .map_err(err("field consumer-tag in method cancel")) + .map_err(failure)?; + let (input, bits) = bit(input, 1) + .map_err(err("field no-wait in method cancel")) + .map_err(failure)?; + let no_wait = bits[0]; + Ok(( + input, + Class::Basic(Basic::Cancel { + consumer_tag, + no_wait, + }), + )) + } + fn basic_cancel_ok(input: &[u8]) -> IResult { + let (input, _) = tag(31_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + let (input, consumer_tag) = domain_consumer_tag(input) + .map_err(err("field consumer-tag in method cancel-ok")) + .map_err(failure)?; + Ok((input, Class::Basic(Basic::CancelOk { consumer_tag }))) + } + fn basic_publish(input: &[u8]) -> IResult { + let (input, _) = tag(40_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + let (input, reserved_1) = domain_short(input) + .map_err(err("field reserved-1 in method publish")) + .map_err(failure)?; + let (input, exchange) = domain_exchange_name(input) + .map_err(err("field exchange in method publish")) + .map_err(failure)?; + let (input, routing_key) = domain_shortstr(input) + .map_err(err("field routing-key in method publish")) + .map_err(failure)?; + let (input, bits) = bit(input, 2) + .map_err(err("field mandatory in method publish")) + .map_err(failure)?; + let mandatory = bits[0]; + let immediate = bits[1]; + Ok(( + input, + Class::Basic(Basic::Publish { + reserved_1, + exchange, + routing_key, + mandatory, + immediate, + }), + )) + } + fn basic_return(input: &[u8]) -> IResult { + let (input, _) = tag(50_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + let (input, reply_code) = domain_reply_code(input) + .map_err(err("field reply-code in method return")) + .map_err(failure)?; + let (input, reply_text) = domain_reply_text(input) + .map_err(err("field reply-text in method return")) + .map_err(failure)?; + let (input, exchange) = domain_exchange_name(input) + .map_err(err("field exchange in method return")) + .map_err(failure)?; + let (input, routing_key) = domain_shortstr(input) + .map_err(err("field routing-key in method return")) + .map_err(failure)?; + Ok(( + input, + Class::Basic(Basic::Return { + reply_code, + reply_text, + exchange, + routing_key, + }), + )) + } + fn basic_deliver(input: &[u8]) -> IResult { + let (input, _) = tag(60_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + let (input, consumer_tag) = domain_consumer_tag(input) + .map_err(err("field consumer-tag in method deliver")) + .map_err(failure)?; + let (input, delivery_tag) = domain_delivery_tag(input) + .map_err(err("field delivery-tag in method deliver")) + .map_err(failure)?; + let (input, bits) = bit(input, 1) + .map_err(err("field redelivered in method deliver")) + .map_err(failure)?; + let redelivered = bits[0]; + let (input, exchange) = domain_exchange_name(input) + .map_err(err("field exchange in method deliver")) + .map_err(failure)?; + let (input, routing_key) = domain_shortstr(input) + .map_err(err("field routing-key in method deliver")) + .map_err(failure)?; + Ok(( + input, + Class::Basic(Basic::Deliver { + consumer_tag, + delivery_tag, + redelivered, + exchange, + routing_key, + }), + )) + } + fn basic_get(input: &[u8]) -> IResult { + let (input, _) = tag(70_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + let (input, reserved_1) = domain_short(input) + .map_err(err("field reserved-1 in method get")) + .map_err(failure)?; + let (input, queue) = domain_queue_name(input) + .map_err(err("field queue in method get")) + .map_err(failure)?; + let (input, bits) = bit(input, 1) + .map_err(err("field no-ack in method get")) + .map_err(failure)?; + let no_ack = bits[0]; + Ok(( + input, + Class::Basic(Basic::Get { + reserved_1, + queue, + no_ack, + }), + )) + } + fn basic_get_ok(input: &[u8]) -> IResult { + let (input, _) = tag(71_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + let (input, delivery_tag) = domain_delivery_tag(input) + .map_err(err("field delivery-tag in method get-ok")) + .map_err(failure)?; + let (input, bits) = bit(input, 1) + .map_err(err("field redelivered in method get-ok")) + .map_err(failure)?; + let redelivered = bits[0]; + let (input, exchange) = domain_exchange_name(input) + .map_err(err("field exchange in method get-ok")) + .map_err(failure)?; + let (input, routing_key) = domain_shortstr(input) + .map_err(err("field routing-key in method get-ok")) + .map_err(failure)?; + let (input, message_count) = domain_message_count(input) + .map_err(err("field message-count in method get-ok")) + .map_err(failure)?; + Ok(( + input, + Class::Basic(Basic::GetOk { + delivery_tag, + redelivered, + exchange, + routing_key, + message_count, + }), + )) + } + fn basic_get_empty(input: &[u8]) -> IResult { + let (input, _) = tag(72_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + let (input, reserved_1) = domain_shortstr(input) + .map_err(err("field reserved-1 in method get-empty")) + .map_err(failure)?; + Ok((input, Class::Basic(Basic::GetEmpty { reserved_1 }))) + } + fn basic_ack(input: &[u8]) -> IResult { + let (input, _) = tag(80_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + let (input, delivery_tag) = domain_delivery_tag(input) + .map_err(err("field delivery-tag in method ack")) + .map_err(failure)?; + let (input, bits) = bit(input, 1) + .map_err(err("field multiple in method ack")) + .map_err(failure)?; + let multiple = bits[0]; + Ok(( + input, + Class::Basic(Basic::Ack { + delivery_tag, + multiple, + }), + )) + } + fn basic_reject(input: &[u8]) -> IResult { + let (input, _) = tag(90_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + let (input, delivery_tag) = domain_delivery_tag(input) + .map_err(err("field delivery-tag in method reject")) + .map_err(failure)?; + let (input, bits) = bit(input, 1) + .map_err(err("field requeue in method reject")) + .map_err(failure)?; + let requeue = bits[0]; + Ok(( + input, + Class::Basic(Basic::Reject { + delivery_tag, + requeue, + }), + )) + } + fn basic_recover_async(input: &[u8]) -> IResult { + let (input, _) = tag(100_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + let (input, bits) = bit(input, 1) + .map_err(err("field requeue in method recover-async")) + .map_err(failure)?; + let requeue = bits[0]; + Ok((input, Class::Basic(Basic::RecoverAsync { requeue }))) + } + fn basic_recover(input: &[u8]) -> IResult { + let (input, _) = tag(110_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + let (input, bits) = bit(input, 1) + .map_err(err("field requeue in method recover")) + .map_err(failure)?; + let requeue = bits[0]; + Ok((input, Class::Basic(Basic::Recover { requeue }))) + } + fn basic_recover_ok(input: &[u8]) -> IResult { + let (input, _) = tag(111_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + Ok((input, Class::Basic(Basic::RecoverOk {}))) + } + fn tx(input: &[u8]) -> IResult { + let (input, _) = + tag(90_u16.to_be_bytes())(input).map_err(err("invalid tag for class tx"))?; + alt(( + tx_select, + tx_select_ok, + tx_commit, + tx_commit_ok, + tx_rollback, + tx_rollback_ok, + ))(input) + .map_err(err("class tx")) + .map_err(failure) + } + fn tx_select(input: &[u8]) -> IResult { + let (input, _) = tag(10_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + Ok((input, Class::Tx(Tx::Select {}))) + } + fn tx_select_ok(input: &[u8]) -> IResult { + let (input, _) = tag(11_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + Ok((input, Class::Tx(Tx::SelectOk {}))) + } + fn tx_commit(input: &[u8]) -> IResult { + let (input, _) = tag(20_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + Ok((input, Class::Tx(Tx::Commit {}))) + } + fn tx_commit_ok(input: &[u8]) -> IResult { + let (input, _) = tag(21_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + Ok((input, Class::Tx(Tx::CommitOk {}))) + } + fn tx_rollback(input: &[u8]) -> IResult { + let (input, _) = tag(30_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + Ok((input, Class::Tx(Tx::Rollback {}))) + } + fn tx_rollback_ok(input: &[u8]) -> IResult { + let (input, _) = tag(31_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; + Ok((input, Class::Tx(Tx::RollbackOk {}))) + } } pub mod write { -use super::*; -use crate::classes::write_helper::*; -use crate::error::TransError; -use std::io::Write; + use super::*; + use crate::classes::write_helper::*; + use crate::error::TransError; + use std::io::Write; -pub fn write_method(class: Class, mut writer: W) -> Result<(), TransError> { - match class { - Class::Connection(Connection::Start { - version_major, - version_minor, - server_properties, - mechanisms, - locales, - }) => { - writer.write_all(&[0, 10, 0, 10])?; - octet(version_major, &mut writer)?; - octet(version_minor, &mut writer)?; - table(server_properties, &mut writer)?; - longstr(mechanisms, &mut writer)?; - longstr(locales, &mut writer)?; - } - Class::Connection(Connection::StartOk { - client_properties, - mechanism, - response, - locale, - }) => { - writer.write_all(&[0, 10, 0, 11])?; - table(client_properties, &mut writer)?; - shortstr(mechanism, &mut writer)?; - longstr(response, &mut writer)?; - shortstr(locale, &mut writer)?; - } - Class::Connection(Connection::Secure { - challenge, - }) => { - writer.write_all(&[0, 10, 0, 20])?; - longstr(challenge, &mut writer)?; - } - Class::Connection(Connection::SecureOk { - response, - }) => { - writer.write_all(&[0, 10, 0, 21])?; - longstr(response, &mut writer)?; - } - Class::Connection(Connection::Tune { - channel_max, - frame_max, - heartbeat, - }) => { - writer.write_all(&[0, 10, 0, 30])?; - short(channel_max, &mut writer)?; - long(frame_max, &mut writer)?; - short(heartbeat, &mut writer)?; - } - Class::Connection(Connection::TuneOk { - channel_max, - frame_max, - heartbeat, - }) => { - writer.write_all(&[0, 10, 0, 31])?; - short(channel_max, &mut writer)?; - long(frame_max, &mut writer)?; - short(heartbeat, &mut writer)?; - } - Class::Connection(Connection::Open { - virtual_host, - reserved_1, - reserved_2, - }) => { - writer.write_all(&[0, 10, 0, 40])?; - shortstr(virtual_host, &mut writer)?; - shortstr(reserved_1, &mut writer)?; - bit(&[reserved_2, ], &mut writer)?; - } - Class::Connection(Connection::OpenOk { - reserved_1, - }) => { - writer.write_all(&[0, 10, 0, 41])?; - shortstr(reserved_1, &mut writer)?; - } - Class::Connection(Connection::Close { - reply_code, - reply_text, - class_id, - method_id, - }) => { - writer.write_all(&[0, 10, 0, 50])?; - short(reply_code, &mut writer)?; - shortstr(reply_text, &mut writer)?; - short(class_id, &mut writer)?; - short(method_id, &mut writer)?; - } - Class::Connection(Connection::CloseOk { - }) => { - writer.write_all(&[0, 10, 0, 51])?; - } - Class::Channel(Channel::Open { - reserved_1, - }) => { - writer.write_all(&[0, 20, 0, 10])?; - shortstr(reserved_1, &mut writer)?; - } - Class::Channel(Channel::OpenOk { - reserved_1, - }) => { - writer.write_all(&[0, 20, 0, 11])?; - longstr(reserved_1, &mut writer)?; - } - Class::Channel(Channel::Flow { - active, - }) => { - writer.write_all(&[0, 20, 0, 20])?; - bit(&[active, ], &mut writer)?; - } - Class::Channel(Channel::FlowOk { - active, - }) => { - writer.write_all(&[0, 20, 0, 21])?; - bit(&[active, ], &mut writer)?; - } - Class::Channel(Channel::Close { - reply_code, - reply_text, - class_id, - method_id, - }) => { - writer.write_all(&[0, 20, 0, 40])?; - short(reply_code, &mut writer)?; - shortstr(reply_text, &mut writer)?; - short(class_id, &mut writer)?; - short(method_id, &mut writer)?; - } - Class::Channel(Channel::CloseOk { - }) => { - writer.write_all(&[0, 20, 0, 41])?; - } - Class::Exchange(Exchange::Declare { - reserved_1, - exchange, - r#type, - passive, - durable, - reserved_2, - reserved_3, - no_wait, - arguments, - }) => { - writer.write_all(&[0, 40, 0, 10])?; - short(reserved_1, &mut writer)?; - shortstr(exchange, &mut writer)?; - shortstr(r#type, &mut writer)?; - bit(&[passive, durable, reserved_2, reserved_3, no_wait, ], &mut writer)?; - table(arguments, &mut writer)?; - } - Class::Exchange(Exchange::DeclareOk { - }) => { - writer.write_all(&[0, 40, 0, 11])?; - } - Class::Exchange(Exchange::Delete { - reserved_1, - exchange, - if_unused, - no_wait, - }) => { - writer.write_all(&[0, 40, 0, 20])?; - short(reserved_1, &mut writer)?; - shortstr(exchange, &mut writer)?; - bit(&[if_unused, no_wait, ], &mut writer)?; - } - Class::Exchange(Exchange::DeleteOk { - }) => { - writer.write_all(&[0, 40, 0, 21])?; - } - Class::Queue(Queue::Declare { - reserved_1, - queue, - passive, - durable, - exclusive, - auto_delete, - no_wait, - arguments, - }) => { - writer.write_all(&[0, 50, 0, 10])?; - short(reserved_1, &mut writer)?; - shortstr(queue, &mut writer)?; - bit(&[passive, durable, exclusive, auto_delete, no_wait, ], &mut writer)?; - table(arguments, &mut writer)?; - } - Class::Queue(Queue::DeclareOk { - queue, - message_count, - consumer_count, - }) => { - writer.write_all(&[0, 50, 0, 11])?; - shortstr(queue, &mut writer)?; - long(message_count, &mut writer)?; - long(consumer_count, &mut writer)?; - } - Class::Queue(Queue::Bind { - reserved_1, - queue, - exchange, - routing_key, - no_wait, - arguments, - }) => { - writer.write_all(&[0, 50, 0, 20])?; - short(reserved_1, &mut writer)?; - shortstr(queue, &mut writer)?; - shortstr(exchange, &mut writer)?; - shortstr(routing_key, &mut writer)?; - bit(&[no_wait, ], &mut writer)?; - table(arguments, &mut writer)?; - } - Class::Queue(Queue::BindOk { - }) => { - writer.write_all(&[0, 50, 0, 21])?; - } - Class::Queue(Queue::Unbind { - reserved_1, - queue, - exchange, - routing_key, - arguments, - }) => { - writer.write_all(&[0, 50, 0, 50])?; - short(reserved_1, &mut writer)?; - shortstr(queue, &mut writer)?; - shortstr(exchange, &mut writer)?; - shortstr(routing_key, &mut writer)?; - table(arguments, &mut writer)?; - } - Class::Queue(Queue::UnbindOk { - }) => { - writer.write_all(&[0, 50, 0, 51])?; - } - Class::Queue(Queue::Purge { - reserved_1, - queue, - no_wait, - }) => { - writer.write_all(&[0, 50, 0, 30])?; - short(reserved_1, &mut writer)?; - shortstr(queue, &mut writer)?; - bit(&[no_wait, ], &mut writer)?; - } - Class::Queue(Queue::PurgeOk { - message_count, - }) => { - writer.write_all(&[0, 50, 0, 31])?; - long(message_count, &mut writer)?; - } - Class::Queue(Queue::Delete { - reserved_1, - queue, - if_unused, - if_empty, - no_wait, - }) => { - writer.write_all(&[0, 50, 0, 40])?; - short(reserved_1, &mut writer)?; - shortstr(queue, &mut writer)?; - bit(&[if_unused, if_empty, no_wait, ], &mut writer)?; - } - Class::Queue(Queue::DeleteOk { - message_count, - }) => { - writer.write_all(&[0, 50, 0, 41])?; - long(message_count, &mut writer)?; - } - Class::Basic(Basic::Qos { - prefetch_size, - prefetch_count, - global, - }) => { - writer.write_all(&[0, 60, 0, 10])?; - long(prefetch_size, &mut writer)?; - short(prefetch_count, &mut writer)?; - bit(&[global, ], &mut writer)?; - } - Class::Basic(Basic::QosOk { - }) => { - writer.write_all(&[0, 60, 0, 11])?; - } - Class::Basic(Basic::Consume { - reserved_1, - queue, - consumer_tag, - no_local, - no_ack, - exclusive, - no_wait, - arguments, - }) => { - writer.write_all(&[0, 60, 0, 20])?; - short(reserved_1, &mut writer)?; - shortstr(queue, &mut writer)?; - shortstr(consumer_tag, &mut writer)?; - bit(&[no_local, no_ack, exclusive, no_wait, ], &mut writer)?; - table(arguments, &mut writer)?; - } - Class::Basic(Basic::ConsumeOk { - consumer_tag, - }) => { - writer.write_all(&[0, 60, 0, 21])?; - shortstr(consumer_tag, &mut writer)?; - } - Class::Basic(Basic::Cancel { - consumer_tag, - no_wait, - }) => { - writer.write_all(&[0, 60, 0, 30])?; - shortstr(consumer_tag, &mut writer)?; - bit(&[no_wait, ], &mut writer)?; - } - Class::Basic(Basic::CancelOk { - consumer_tag, - }) => { - writer.write_all(&[0, 60, 0, 31])?; - shortstr(consumer_tag, &mut writer)?; - } - Class::Basic(Basic::Publish { - reserved_1, - exchange, - routing_key, - mandatory, - immediate, - }) => { - writer.write_all(&[0, 60, 0, 40])?; - short(reserved_1, &mut writer)?; - shortstr(exchange, &mut writer)?; - shortstr(routing_key, &mut writer)?; - bit(&[mandatory, immediate, ], &mut writer)?; - } - Class::Basic(Basic::Return { - reply_code, - reply_text, - exchange, - routing_key, - }) => { - writer.write_all(&[0, 60, 0, 50])?; - short(reply_code, &mut writer)?; - shortstr(reply_text, &mut writer)?; - shortstr(exchange, &mut writer)?; - shortstr(routing_key, &mut writer)?; - } - Class::Basic(Basic::Deliver { - consumer_tag, - delivery_tag, - redelivered, - exchange, - routing_key, - }) => { - writer.write_all(&[0, 60, 0, 60])?; - shortstr(consumer_tag, &mut writer)?; - longlong(delivery_tag, &mut writer)?; - bit(&[redelivered, ], &mut writer)?; - shortstr(exchange, &mut writer)?; - shortstr(routing_key, &mut writer)?; - } - Class::Basic(Basic::Get { - reserved_1, - queue, - no_ack, - }) => { - writer.write_all(&[0, 60, 0, 70])?; - short(reserved_1, &mut writer)?; - shortstr(queue, &mut writer)?; - bit(&[no_ack, ], &mut writer)?; - } - Class::Basic(Basic::GetOk { - delivery_tag, - redelivered, - exchange, - routing_key, - message_count, - }) => { - writer.write_all(&[0, 60, 0, 71])?; - longlong(delivery_tag, &mut writer)?; - bit(&[redelivered, ], &mut writer)?; - shortstr(exchange, &mut writer)?; - shortstr(routing_key, &mut writer)?; - long(message_count, &mut writer)?; - } - Class::Basic(Basic::GetEmpty { - reserved_1, - }) => { - writer.write_all(&[0, 60, 0, 72])?; - shortstr(reserved_1, &mut writer)?; - } - Class::Basic(Basic::Ack { - delivery_tag, - multiple, - }) => { - writer.write_all(&[0, 60, 0, 80])?; - longlong(delivery_tag, &mut writer)?; - bit(&[multiple, ], &mut writer)?; - } - Class::Basic(Basic::Reject { - delivery_tag, - requeue, - }) => { - writer.write_all(&[0, 60, 0, 90])?; - longlong(delivery_tag, &mut writer)?; - bit(&[requeue, ], &mut writer)?; - } - Class::Basic(Basic::RecoverAsync { - requeue, - }) => { - writer.write_all(&[0, 60, 0, 100])?; - bit(&[requeue, ], &mut writer)?; - } - Class::Basic(Basic::Recover { - requeue, - }) => { - writer.write_all(&[0, 60, 0, 110])?; - bit(&[requeue, ], &mut writer)?; - } - Class::Basic(Basic::RecoverOk { - }) => { - writer.write_all(&[0, 60, 0, 111])?; - } - Class::Tx(Tx::Select { - }) => { - writer.write_all(&[0, 90, 0, 10])?; - } - Class::Tx(Tx::SelectOk { - }) => { - writer.write_all(&[0, 90, 0, 11])?; - } - Class::Tx(Tx::Commit { - }) => { - writer.write_all(&[0, 90, 0, 20])?; - } - Class::Tx(Tx::CommitOk { - }) => { - writer.write_all(&[0, 90, 0, 21])?; - } - Class::Tx(Tx::Rollback { - }) => { - writer.write_all(&[0, 90, 0, 30])?; - } - Class::Tx(Tx::RollbackOk { - }) => { - writer.write_all(&[0, 90, 0, 31])?; + pub fn write_method(class: Class, mut writer: W) -> Result<(), TransError> { + match class { + Class::Connection(Connection::Start { + version_major, + version_minor, + server_properties, + mechanisms, + locales, + }) => { + writer.write_all(&[0, 10, 0, 10])?; + octet(version_major, &mut writer)?; + octet(version_minor, &mut writer)?; + table(server_properties, &mut writer)?; + longstr(mechanisms, &mut writer)?; + longstr(locales, &mut writer)?; + } + Class::Connection(Connection::StartOk { + client_properties, + mechanism, + response, + locale, + }) => { + writer.write_all(&[0, 10, 0, 11])?; + table(client_properties, &mut writer)?; + shortstr(mechanism, &mut writer)?; + longstr(response, &mut writer)?; + shortstr(locale, &mut writer)?; + } + Class::Connection(Connection::Secure { challenge }) => { + writer.write_all(&[0, 10, 0, 20])?; + longstr(challenge, &mut writer)?; + } + Class::Connection(Connection::SecureOk { response }) => { + writer.write_all(&[0, 10, 0, 21])?; + longstr(response, &mut writer)?; + } + Class::Connection(Connection::Tune { + channel_max, + frame_max, + heartbeat, + }) => { + writer.write_all(&[0, 10, 0, 30])?; + short(channel_max, &mut writer)?; + long(frame_max, &mut writer)?; + short(heartbeat, &mut writer)?; + } + Class::Connection(Connection::TuneOk { + channel_max, + frame_max, + heartbeat, + }) => { + writer.write_all(&[0, 10, 0, 31])?; + short(channel_max, &mut writer)?; + long(frame_max, &mut writer)?; + short(heartbeat, &mut writer)?; + } + Class::Connection(Connection::Open { + virtual_host, + reserved_1, + reserved_2, + }) => { + writer.write_all(&[0, 10, 0, 40])?; + shortstr(virtual_host, &mut writer)?; + shortstr(reserved_1, &mut writer)?; + bit(&[reserved_2], &mut writer)?; + } + Class::Connection(Connection::OpenOk { reserved_1 }) => { + writer.write_all(&[0, 10, 0, 41])?; + shortstr(reserved_1, &mut writer)?; + } + Class::Connection(Connection::Close { + reply_code, + reply_text, + class_id, + method_id, + }) => { + writer.write_all(&[0, 10, 0, 50])?; + short(reply_code, &mut writer)?; + shortstr(reply_text, &mut writer)?; + short(class_id, &mut writer)?; + short(method_id, &mut writer)?; + } + Class::Connection(Connection::CloseOk {}) => { + writer.write_all(&[0, 10, 0, 51])?; + } + Class::Channel(Channel::Open { reserved_1 }) => { + writer.write_all(&[0, 20, 0, 10])?; + shortstr(reserved_1, &mut writer)?; + } + Class::Channel(Channel::OpenOk { reserved_1 }) => { + writer.write_all(&[0, 20, 0, 11])?; + longstr(reserved_1, &mut writer)?; + } + Class::Channel(Channel::Flow { active }) => { + writer.write_all(&[0, 20, 0, 20])?; + bit(&[active], &mut writer)?; + } + Class::Channel(Channel::FlowOk { active }) => { + writer.write_all(&[0, 20, 0, 21])?; + bit(&[active], &mut writer)?; + } + Class::Channel(Channel::Close { + reply_code, + reply_text, + class_id, + method_id, + }) => { + writer.write_all(&[0, 20, 0, 40])?; + short(reply_code, &mut writer)?; + shortstr(reply_text, &mut writer)?; + short(class_id, &mut writer)?; + short(method_id, &mut writer)?; + } + Class::Channel(Channel::CloseOk {}) => { + writer.write_all(&[0, 20, 0, 41])?; + } + Class::Exchange(Exchange::Declare { + reserved_1, + exchange, + r#type, + passive, + durable, + reserved_2, + reserved_3, + no_wait, + arguments, + }) => { + writer.write_all(&[0, 40, 0, 10])?; + short(reserved_1, &mut writer)?; + shortstr(exchange, &mut writer)?; + shortstr(r#type, &mut writer)?; + bit( + &[passive, durable, reserved_2, reserved_3, no_wait], + &mut writer, + )?; + table(arguments, &mut writer)?; + } + Class::Exchange(Exchange::DeclareOk {}) => { + writer.write_all(&[0, 40, 0, 11])?; + } + Class::Exchange(Exchange::Delete { + reserved_1, + exchange, + if_unused, + no_wait, + }) => { + writer.write_all(&[0, 40, 0, 20])?; + short(reserved_1, &mut writer)?; + shortstr(exchange, &mut writer)?; + bit(&[if_unused, no_wait], &mut writer)?; + } + Class::Exchange(Exchange::DeleteOk {}) => { + writer.write_all(&[0, 40, 0, 21])?; + } + Class::Queue(Queue::Declare { + reserved_1, + queue, + passive, + durable, + exclusive, + auto_delete, + no_wait, + arguments, + }) => { + writer.write_all(&[0, 50, 0, 10])?; + short(reserved_1, &mut writer)?; + shortstr(queue, &mut writer)?; + bit( + &[passive, durable, exclusive, auto_delete, no_wait], + &mut writer, + )?; + table(arguments, &mut writer)?; + } + Class::Queue(Queue::DeclareOk { + queue, + message_count, + consumer_count, + }) => { + writer.write_all(&[0, 50, 0, 11])?; + shortstr(queue, &mut writer)?; + long(message_count, &mut writer)?; + long(consumer_count, &mut writer)?; + } + Class::Queue(Queue::Bind { + reserved_1, + queue, + exchange, + routing_key, + no_wait, + arguments, + }) => { + writer.write_all(&[0, 50, 0, 20])?; + short(reserved_1, &mut writer)?; + shortstr(queue, &mut writer)?; + shortstr(exchange, &mut writer)?; + shortstr(routing_key, &mut writer)?; + bit(&[no_wait], &mut writer)?; + table(arguments, &mut writer)?; + } + Class::Queue(Queue::BindOk {}) => { + writer.write_all(&[0, 50, 0, 21])?; + } + Class::Queue(Queue::Unbind { + reserved_1, + queue, + exchange, + routing_key, + arguments, + }) => { + writer.write_all(&[0, 50, 0, 50])?; + short(reserved_1, &mut writer)?; + shortstr(queue, &mut writer)?; + shortstr(exchange, &mut writer)?; + shortstr(routing_key, &mut writer)?; + table(arguments, &mut writer)?; + } + Class::Queue(Queue::UnbindOk {}) => { + writer.write_all(&[0, 50, 0, 51])?; + } + Class::Queue(Queue::Purge { + reserved_1, + queue, + no_wait, + }) => { + writer.write_all(&[0, 50, 0, 30])?; + short(reserved_1, &mut writer)?; + shortstr(queue, &mut writer)?; + bit(&[no_wait], &mut writer)?; + } + Class::Queue(Queue::PurgeOk { message_count }) => { + writer.write_all(&[0, 50, 0, 31])?; + long(message_count, &mut writer)?; + } + Class::Queue(Queue::Delete { + reserved_1, + queue, + if_unused, + if_empty, + no_wait, + }) => { + writer.write_all(&[0, 50, 0, 40])?; + short(reserved_1, &mut writer)?; + shortstr(queue, &mut writer)?; + bit(&[if_unused, if_empty, no_wait], &mut writer)?; + } + Class::Queue(Queue::DeleteOk { message_count }) => { + writer.write_all(&[0, 50, 0, 41])?; + long(message_count, &mut writer)?; + } + Class::Basic(Basic::Qos { + prefetch_size, + prefetch_count, + global, + }) => { + writer.write_all(&[0, 60, 0, 10])?; + long(prefetch_size, &mut writer)?; + short(prefetch_count, &mut writer)?; + bit(&[global], &mut writer)?; + } + Class::Basic(Basic::QosOk {}) => { + writer.write_all(&[0, 60, 0, 11])?; + } + Class::Basic(Basic::Consume { + reserved_1, + queue, + consumer_tag, + no_local, + no_ack, + exclusive, + no_wait, + arguments, + }) => { + writer.write_all(&[0, 60, 0, 20])?; + short(reserved_1, &mut writer)?; + shortstr(queue, &mut writer)?; + shortstr(consumer_tag, &mut writer)?; + bit(&[no_local, no_ack, exclusive, no_wait], &mut writer)?; + table(arguments, &mut writer)?; + } + Class::Basic(Basic::ConsumeOk { consumer_tag }) => { + writer.write_all(&[0, 60, 0, 21])?; + shortstr(consumer_tag, &mut writer)?; + } + Class::Basic(Basic::Cancel { + consumer_tag, + no_wait, + }) => { + writer.write_all(&[0, 60, 0, 30])?; + shortstr(consumer_tag, &mut writer)?; + bit(&[no_wait], &mut writer)?; + } + Class::Basic(Basic::CancelOk { consumer_tag }) => { + writer.write_all(&[0, 60, 0, 31])?; + shortstr(consumer_tag, &mut writer)?; + } + Class::Basic(Basic::Publish { + reserved_1, + exchange, + routing_key, + mandatory, + immediate, + }) => { + writer.write_all(&[0, 60, 0, 40])?; + short(reserved_1, &mut writer)?; + shortstr(exchange, &mut writer)?; + shortstr(routing_key, &mut writer)?; + bit(&[mandatory, immediate], &mut writer)?; + } + Class::Basic(Basic::Return { + reply_code, + reply_text, + exchange, + routing_key, + }) => { + writer.write_all(&[0, 60, 0, 50])?; + short(reply_code, &mut writer)?; + shortstr(reply_text, &mut writer)?; + shortstr(exchange, &mut writer)?; + shortstr(routing_key, &mut writer)?; + } + Class::Basic(Basic::Deliver { + consumer_tag, + delivery_tag, + redelivered, + exchange, + routing_key, + }) => { + writer.write_all(&[0, 60, 0, 60])?; + shortstr(consumer_tag, &mut writer)?; + longlong(delivery_tag, &mut writer)?; + bit(&[redelivered], &mut writer)?; + shortstr(exchange, &mut writer)?; + shortstr(routing_key, &mut writer)?; + } + Class::Basic(Basic::Get { + reserved_1, + queue, + no_ack, + }) => { + writer.write_all(&[0, 60, 0, 70])?; + short(reserved_1, &mut writer)?; + shortstr(queue, &mut writer)?; + bit(&[no_ack], &mut writer)?; + } + Class::Basic(Basic::GetOk { + delivery_tag, + redelivered, + exchange, + routing_key, + message_count, + }) => { + writer.write_all(&[0, 60, 0, 71])?; + longlong(delivery_tag, &mut writer)?; + bit(&[redelivered], &mut writer)?; + shortstr(exchange, &mut writer)?; + shortstr(routing_key, &mut writer)?; + long(message_count, &mut writer)?; + } + Class::Basic(Basic::GetEmpty { reserved_1 }) => { + writer.write_all(&[0, 60, 0, 72])?; + shortstr(reserved_1, &mut writer)?; + } + Class::Basic(Basic::Ack { + delivery_tag, + multiple, + }) => { + writer.write_all(&[0, 60, 0, 80])?; + longlong(delivery_tag, &mut writer)?; + bit(&[multiple], &mut writer)?; + } + Class::Basic(Basic::Reject { + delivery_tag, + requeue, + }) => { + writer.write_all(&[0, 60, 0, 90])?; + longlong(delivery_tag, &mut writer)?; + bit(&[requeue], &mut writer)?; + } + Class::Basic(Basic::RecoverAsync { requeue }) => { + writer.write_all(&[0, 60, 0, 100])?; + bit(&[requeue], &mut writer)?; + } + Class::Basic(Basic::Recover { requeue }) => { + writer.write_all(&[0, 60, 0, 110])?; + bit(&[requeue], &mut writer)?; + } + Class::Basic(Basic::RecoverOk {}) => { + writer.write_all(&[0, 60, 0, 111])?; + } + Class::Tx(Tx::Select {}) => { + writer.write_all(&[0, 90, 0, 10])?; + } + Class::Tx(Tx::SelectOk {}) => { + writer.write_all(&[0, 90, 0, 11])?; + } + Class::Tx(Tx::Commit {}) => { + writer.write_all(&[0, 90, 0, 20])?; + } + Class::Tx(Tx::CommitOk {}) => { + writer.write_all(&[0, 90, 0, 21])?; + } + Class::Tx(Tx::Rollback {}) => { + writer.write_all(&[0, 90, 0, 30])?; + } + Class::Tx(Tx::RollbackOk {}) => { + writer.write_all(&[0, 90, 0, 31])?; + } } + Ok(()) } - Ok(()) -} } #[cfg(test)] mod random { -use rand::Rng; -use crate::classes::tests::RandomMethod; -use super::*; + use super::*; + use crate::classes::tests::RandomMethod; + use rand::Rng; -impl RandomMethod for Class { - #[allow(unused_variables)] - fn random(rng: &mut R) -> Self { - match rng.gen_range(0u32..6) { - 0 => Class::Connection(Connection::random(rng)), - 1 => Class::Channel(Channel::random(rng)), - 2 => Class::Exchange(Exchange::random(rng)), - 3 => Class::Queue(Queue::random(rng)), - 4 => Class::Basic(Basic::random(rng)), - 5 => Class::Tx(Tx::random(rng)), - _ => unreachable!(), + impl RandomMethod for Class { + #[allow(unused_variables)] + fn random(rng: &mut R) -> Self { + match rng.gen_range(0u32..6) { + 0 => Class::Connection(Connection::random(rng)), + 1 => Class::Channel(Channel::random(rng)), + 2 => Class::Exchange(Exchange::random(rng)), + 3 => Class::Queue(Queue::random(rng)), + 4 => Class::Basic(Basic::random(rng)), + 5 => Class::Tx(Tx::random(rng)), + _ => unreachable!(), } + } + } + impl RandomMethod for Connection { + #[allow(unused_variables)] + fn random(rng: &mut R) -> Self { + match rng.gen_range(0u32..10) { + 0 => Connection::Start { + version_major: RandomMethod::random(rng), + version_minor: RandomMethod::random(rng), + server_properties: RandomMethod::random(rng), + mechanisms: RandomMethod::random(rng), + locales: RandomMethod::random(rng), + }, + 1 => Connection::StartOk { + client_properties: RandomMethod::random(rng), + mechanism: RandomMethod::random(rng), + response: RandomMethod::random(rng), + locale: RandomMethod::random(rng), + }, + 2 => Connection::Secure { + challenge: RandomMethod::random(rng), + }, + 3 => Connection::SecureOk { + response: RandomMethod::random(rng), + }, + 4 => Connection::Tune { + channel_max: RandomMethod::random(rng), + frame_max: RandomMethod::random(rng), + heartbeat: RandomMethod::random(rng), + }, + 5 => Connection::TuneOk { + channel_max: RandomMethod::random(rng), + frame_max: RandomMethod::random(rng), + heartbeat: RandomMethod::random(rng), + }, + 6 => Connection::Open { + virtual_host: RandomMethod::random(rng), + reserved_1: RandomMethod::random(rng), + reserved_2: RandomMethod::random(rng), + }, + 7 => Connection::OpenOk { + reserved_1: RandomMethod::random(rng), + }, + 8 => Connection::Close { + reply_code: RandomMethod::random(rng), + reply_text: RandomMethod::random(rng), + class_id: RandomMethod::random(rng), + method_id: RandomMethod::random(rng), + }, + 9 => Connection::CloseOk {}, + _ => unreachable!(), + } + } + } + impl RandomMethod for Channel { + #[allow(unused_variables)] + fn random(rng: &mut R) -> Self { + match rng.gen_range(0u32..6) { + 0 => Channel::Open { + reserved_1: RandomMethod::random(rng), + }, + 1 => Channel::OpenOk { + reserved_1: RandomMethod::random(rng), + }, + 2 => Channel::Flow { + active: RandomMethod::random(rng), + }, + 3 => Channel::FlowOk { + active: RandomMethod::random(rng), + }, + 4 => Channel::Close { + reply_code: RandomMethod::random(rng), + reply_text: RandomMethod::random(rng), + class_id: RandomMethod::random(rng), + method_id: RandomMethod::random(rng), + }, + 5 => Channel::CloseOk {}, + _ => unreachable!(), + } + } + } + impl RandomMethod for Exchange { + #[allow(unused_variables)] + fn random(rng: &mut R) -> Self { + match rng.gen_range(0u32..4) { + 0 => Exchange::Declare { + reserved_1: RandomMethod::random(rng), + exchange: RandomMethod::random(rng), + r#type: RandomMethod::random(rng), + passive: RandomMethod::random(rng), + durable: RandomMethod::random(rng), + reserved_2: RandomMethod::random(rng), + reserved_3: RandomMethod::random(rng), + no_wait: RandomMethod::random(rng), + arguments: RandomMethod::random(rng), + }, + 1 => Exchange::DeclareOk {}, + 2 => Exchange::Delete { + reserved_1: RandomMethod::random(rng), + exchange: RandomMethod::random(rng), + if_unused: RandomMethod::random(rng), + no_wait: RandomMethod::random(rng), + }, + 3 => Exchange::DeleteOk {}, + _ => unreachable!(), + } + } + } + impl RandomMethod for Queue { + #[allow(unused_variables)] + fn random(rng: &mut R) -> Self { + match rng.gen_range(0u32..10) { + 0 => Queue::Declare { + reserved_1: RandomMethod::random(rng), + queue: RandomMethod::random(rng), + passive: RandomMethod::random(rng), + durable: RandomMethod::random(rng), + exclusive: RandomMethod::random(rng), + auto_delete: RandomMethod::random(rng), + no_wait: RandomMethod::random(rng), + arguments: RandomMethod::random(rng), + }, + 1 => Queue::DeclareOk { + queue: RandomMethod::random(rng), + message_count: RandomMethod::random(rng), + consumer_count: RandomMethod::random(rng), + }, + 2 => Queue::Bind { + reserved_1: RandomMethod::random(rng), + queue: RandomMethod::random(rng), + exchange: RandomMethod::random(rng), + routing_key: RandomMethod::random(rng), + no_wait: RandomMethod::random(rng), + arguments: RandomMethod::random(rng), + }, + 3 => Queue::BindOk {}, + 4 => Queue::Unbind { + reserved_1: RandomMethod::random(rng), + queue: RandomMethod::random(rng), + exchange: RandomMethod::random(rng), + routing_key: RandomMethod::random(rng), + arguments: RandomMethod::random(rng), + }, + 5 => Queue::UnbindOk {}, + 6 => Queue::Purge { + reserved_1: RandomMethod::random(rng), + queue: RandomMethod::random(rng), + no_wait: RandomMethod::random(rng), + }, + 7 => Queue::PurgeOk { + message_count: RandomMethod::random(rng), + }, + 8 => Queue::Delete { + reserved_1: RandomMethod::random(rng), + queue: RandomMethod::random(rng), + if_unused: RandomMethod::random(rng), + if_empty: RandomMethod::random(rng), + no_wait: RandomMethod::random(rng), + }, + 9 => Queue::DeleteOk { + message_count: RandomMethod::random(rng), + }, + _ => unreachable!(), + } + } + } + impl RandomMethod for Basic { + #[allow(unused_variables)] + fn random(rng: &mut R) -> Self { + match rng.gen_range(0u32..17) { + 0 => Basic::Qos { + prefetch_size: RandomMethod::random(rng), + prefetch_count: RandomMethod::random(rng), + global: RandomMethod::random(rng), + }, + 1 => Basic::QosOk {}, + 2 => Basic::Consume { + reserved_1: RandomMethod::random(rng), + queue: RandomMethod::random(rng), + consumer_tag: RandomMethod::random(rng), + no_local: RandomMethod::random(rng), + no_ack: RandomMethod::random(rng), + exclusive: RandomMethod::random(rng), + no_wait: RandomMethod::random(rng), + arguments: RandomMethod::random(rng), + }, + 3 => Basic::ConsumeOk { + consumer_tag: RandomMethod::random(rng), + }, + 4 => Basic::Cancel { + consumer_tag: RandomMethod::random(rng), + no_wait: RandomMethod::random(rng), + }, + 5 => Basic::CancelOk { + consumer_tag: RandomMethod::random(rng), + }, + 6 => Basic::Publish { + reserved_1: RandomMethod::random(rng), + exchange: RandomMethod::random(rng), + routing_key: RandomMethod::random(rng), + mandatory: RandomMethod::random(rng), + immediate: RandomMethod::random(rng), + }, + 7 => Basic::Return { + reply_code: RandomMethod::random(rng), + reply_text: RandomMethod::random(rng), + exchange: RandomMethod::random(rng), + routing_key: RandomMethod::random(rng), + }, + 8 => Basic::Deliver { + consumer_tag: RandomMethod::random(rng), + delivery_tag: RandomMethod::random(rng), + redelivered: RandomMethod::random(rng), + exchange: RandomMethod::random(rng), + routing_key: RandomMethod::random(rng), + }, + 9 => Basic::Get { + reserved_1: RandomMethod::random(rng), + queue: RandomMethod::random(rng), + no_ack: RandomMethod::random(rng), + }, + 10 => Basic::GetOk { + delivery_tag: RandomMethod::random(rng), + redelivered: RandomMethod::random(rng), + exchange: RandomMethod::random(rng), + routing_key: RandomMethod::random(rng), + message_count: RandomMethod::random(rng), + }, + 11 => Basic::GetEmpty { + reserved_1: RandomMethod::random(rng), + }, + 12 => Basic::Ack { + delivery_tag: RandomMethod::random(rng), + multiple: RandomMethod::random(rng), + }, + 13 => Basic::Reject { + delivery_tag: RandomMethod::random(rng), + requeue: RandomMethod::random(rng), + }, + 14 => Basic::RecoverAsync { + requeue: RandomMethod::random(rng), + }, + 15 => Basic::Recover { + requeue: RandomMethod::random(rng), + }, + 16 => Basic::RecoverOk {}, + _ => unreachable!(), + } + } + } + impl RandomMethod for Tx { + #[allow(unused_variables)] + fn random(rng: &mut R) -> Self { + match rng.gen_range(0u32..6) { + 0 => Tx::Select {}, + 1 => Tx::SelectOk {}, + 2 => Tx::Commit {}, + 3 => Tx::CommitOk {}, + 4 => Tx::Rollback {}, + 5 => Tx::RollbackOk {}, + _ => unreachable!(), + } + } } } -impl RandomMethod for Connection { - #[allow(unused_variables)] - fn random(rng: &mut R) -> Self { - match rng.gen_range(0u32..10) { - 0 => Connection::Start { - version_major: RandomMethod::random(rng), - version_minor: RandomMethod::random(rng), - server_properties: RandomMethod::random(rng), - mechanisms: RandomMethod::random(rng), - locales: RandomMethod::random(rng), - }, - 1 => Connection::StartOk { - client_properties: RandomMethod::random(rng), - mechanism: RandomMethod::random(rng), - response: RandomMethod::random(rng), - locale: RandomMethod::random(rng), - }, - 2 => Connection::Secure { - challenge: RandomMethod::random(rng), - }, - 3 => Connection::SecureOk { - response: RandomMethod::random(rng), - }, - 4 => Connection::Tune { - channel_max: RandomMethod::random(rng), - frame_max: RandomMethod::random(rng), - heartbeat: RandomMethod::random(rng), - }, - 5 => Connection::TuneOk { - channel_max: RandomMethod::random(rng), - frame_max: RandomMethod::random(rng), - heartbeat: RandomMethod::random(rng), - }, - 6 => Connection::Open { - virtual_host: RandomMethod::random(rng), - reserved_1: RandomMethod::random(rng), - reserved_2: RandomMethod::random(rng), - }, - 7 => Connection::OpenOk { - reserved_1: RandomMethod::random(rng), - }, - 8 => Connection::Close { - reply_code: RandomMethod::random(rng), - reply_text: RandomMethod::random(rng), - class_id: RandomMethod::random(rng), - method_id: RandomMethod::random(rng), - }, - 9 => Connection::CloseOk { - }, - _ => unreachable!(), - } - } -} -impl RandomMethod for Channel { - #[allow(unused_variables)] - fn random(rng: &mut R) -> Self { - match rng.gen_range(0u32..6) { - 0 => Channel::Open { - reserved_1: RandomMethod::random(rng), - }, - 1 => Channel::OpenOk { - reserved_1: RandomMethod::random(rng), - }, - 2 => Channel::Flow { - active: RandomMethod::random(rng), - }, - 3 => Channel::FlowOk { - active: RandomMethod::random(rng), - }, - 4 => Channel::Close { - reply_code: RandomMethod::random(rng), - reply_text: RandomMethod::random(rng), - class_id: RandomMethod::random(rng), - method_id: RandomMethod::random(rng), - }, - 5 => Channel::CloseOk { - }, - _ => unreachable!(), - } - } -} -impl RandomMethod for Exchange { - #[allow(unused_variables)] - fn random(rng: &mut R) -> Self { - match rng.gen_range(0u32..4) { - 0 => Exchange::Declare { - reserved_1: RandomMethod::random(rng), - exchange: RandomMethod::random(rng), - r#type: RandomMethod::random(rng), - passive: RandomMethod::random(rng), - durable: RandomMethod::random(rng), - reserved_2: RandomMethod::random(rng), - reserved_3: RandomMethod::random(rng), - no_wait: RandomMethod::random(rng), - arguments: RandomMethod::random(rng), - }, - 1 => Exchange::DeclareOk { - }, - 2 => Exchange::Delete { - reserved_1: RandomMethod::random(rng), - exchange: RandomMethod::random(rng), - if_unused: RandomMethod::random(rng), - no_wait: RandomMethod::random(rng), - }, - 3 => Exchange::DeleteOk { - }, - _ => unreachable!(), - } - } -} -impl RandomMethod for Queue { - #[allow(unused_variables)] - fn random(rng: &mut R) -> Self { - match rng.gen_range(0u32..10) { - 0 => Queue::Declare { - reserved_1: RandomMethod::random(rng), - queue: RandomMethod::random(rng), - passive: RandomMethod::random(rng), - durable: RandomMethod::random(rng), - exclusive: RandomMethod::random(rng), - auto_delete: RandomMethod::random(rng), - no_wait: RandomMethod::random(rng), - arguments: RandomMethod::random(rng), - }, - 1 => Queue::DeclareOk { - queue: RandomMethod::random(rng), - message_count: RandomMethod::random(rng), - consumer_count: RandomMethod::random(rng), - }, - 2 => Queue::Bind { - reserved_1: RandomMethod::random(rng), - queue: RandomMethod::random(rng), - exchange: RandomMethod::random(rng), - routing_key: RandomMethod::random(rng), - no_wait: RandomMethod::random(rng), - arguments: RandomMethod::random(rng), - }, - 3 => Queue::BindOk { - }, - 4 => Queue::Unbind { - reserved_1: RandomMethod::random(rng), - queue: RandomMethod::random(rng), - exchange: RandomMethod::random(rng), - routing_key: RandomMethod::random(rng), - arguments: RandomMethod::random(rng), - }, - 5 => Queue::UnbindOk { - }, - 6 => Queue::Purge { - reserved_1: RandomMethod::random(rng), - queue: RandomMethod::random(rng), - no_wait: RandomMethod::random(rng), - }, - 7 => Queue::PurgeOk { - message_count: RandomMethod::random(rng), - }, - 8 => Queue::Delete { - reserved_1: RandomMethod::random(rng), - queue: RandomMethod::random(rng), - if_unused: RandomMethod::random(rng), - if_empty: RandomMethod::random(rng), - no_wait: RandomMethod::random(rng), - }, - 9 => Queue::DeleteOk { - message_count: RandomMethod::random(rng), - }, - _ => unreachable!(), - } - } -} -impl RandomMethod for Basic { - #[allow(unused_variables)] - fn random(rng: &mut R) -> Self { - match rng.gen_range(0u32..17) { - 0 => Basic::Qos { - prefetch_size: RandomMethod::random(rng), - prefetch_count: RandomMethod::random(rng), - global: RandomMethod::random(rng), - }, - 1 => Basic::QosOk { - }, - 2 => Basic::Consume { - reserved_1: RandomMethod::random(rng), - queue: RandomMethod::random(rng), - consumer_tag: RandomMethod::random(rng), - no_local: RandomMethod::random(rng), - no_ack: RandomMethod::random(rng), - exclusive: RandomMethod::random(rng), - no_wait: RandomMethod::random(rng), - arguments: RandomMethod::random(rng), - }, - 3 => Basic::ConsumeOk { - consumer_tag: RandomMethod::random(rng), - }, - 4 => Basic::Cancel { - consumer_tag: RandomMethod::random(rng), - no_wait: RandomMethod::random(rng), - }, - 5 => Basic::CancelOk { - consumer_tag: RandomMethod::random(rng), - }, - 6 => Basic::Publish { - reserved_1: RandomMethod::random(rng), - exchange: RandomMethod::random(rng), - routing_key: RandomMethod::random(rng), - mandatory: RandomMethod::random(rng), - immediate: RandomMethod::random(rng), - }, - 7 => Basic::Return { - reply_code: RandomMethod::random(rng), - reply_text: RandomMethod::random(rng), - exchange: RandomMethod::random(rng), - routing_key: RandomMethod::random(rng), - }, - 8 => Basic::Deliver { - consumer_tag: RandomMethod::random(rng), - delivery_tag: RandomMethod::random(rng), - redelivered: RandomMethod::random(rng), - exchange: RandomMethod::random(rng), - routing_key: RandomMethod::random(rng), - }, - 9 => Basic::Get { - reserved_1: RandomMethod::random(rng), - queue: RandomMethod::random(rng), - no_ack: RandomMethod::random(rng), - }, - 10 => Basic::GetOk { - delivery_tag: RandomMethod::random(rng), - redelivered: RandomMethod::random(rng), - exchange: RandomMethod::random(rng), - routing_key: RandomMethod::random(rng), - message_count: RandomMethod::random(rng), - }, - 11 => Basic::GetEmpty { - reserved_1: RandomMethod::random(rng), - }, - 12 => Basic::Ack { - delivery_tag: RandomMethod::random(rng), - multiple: RandomMethod::random(rng), - }, - 13 => Basic::Reject { - delivery_tag: RandomMethod::random(rng), - requeue: RandomMethod::random(rng), - }, - 14 => Basic::RecoverAsync { - requeue: RandomMethod::random(rng), - }, - 15 => Basic::Recover { - requeue: RandomMethod::random(rng), - }, - 16 => Basic::RecoverOk { - }, - _ => unreachable!(), - } - } -} -impl RandomMethod for Tx { - #[allow(unused_variables)] - fn random(rng: &mut R) -> Self { - match rng.gen_range(0u32..6) { - 0 => Tx::Select { - }, - 1 => Tx::SelectOk { - }, - 2 => Tx::Commit { - }, - 3 => Tx::CommitOk { - }, - 4 => Tx::Rollback { - }, - 5 => Tx::RollbackOk { - }, - _ => unreachable!(), - } - } -} -} diff --git a/amqp_transport/src/connection.rs b/amqp_transport/src/connection.rs index 11829ea..192df99 100644 --- a/amqp_transport/src/connection.rs +++ b/amqp_transport/src/connection.rs @@ -1,12 +1,18 @@ +use crate::classes::Class; use crate::error::{ConException, ProtocolError, Result}; use crate::frame::{Frame, FrameType}; use crate::{classes, frame, sasl}; +use amqp_core::GlobalData; use anyhow::Context; use std::collections::HashMap; use std::net::SocketAddr; +use std::pin::Pin; +use std::time::Duration; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpStream; -use tracing::{debug, error, info}; +use tokio::time; +use tracing::{debug, error, info, warn}; +use uuid::Uuid; fn ensure_conn(condition: bool) -> Result<()> { if condition { @@ -21,22 +27,42 @@ const CHANNEL_MAX: u16 = 0; const FRAME_SIZE_MAX: u32 = 0; const HEARTBEAT_DELAY: u16 = 0; +pub struct Channel { + num: u16, + channel_handle: amqp_core::ChannelHandle, +} + pub struct Connection { + id: Uuid, stream: TcpStream, max_frame_size: usize, heartbeat_delay: u16, channel_max: u16, + next_timeout: Pin>, + channels: HashMap, connection_handle: amqp_core::ConnectionHandle, + global_data: GlobalData, } +const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); + impl Connection { - pub fn new(stream: TcpStream, connection_handle: amqp_core::ConnectionHandle) -> Self { + pub fn new( + id: Uuid, + stream: TcpStream, + connection_handle: amqp_core::ConnectionHandle, + global_data: GlobalData, + ) -> Self { Self { + id, stream, max_frame_size: FRAME_SIZE_MIN_MAX, heartbeat_delay: HEARTBEAT_DELAY, channel_max: CHANNEL_MAX, + next_timeout: Box::pin(time::sleep(DEFAULT_TIMEOUT)), connection_handle, + channels: HashMap::new(), + global_data, } } @@ -58,10 +84,7 @@ impl Connection { info!("Connection is ready for usage!"); - loop { - let method = self.recv_method().await?; - debug!(?method, "Received method"); - } + self.main_loop().await } async fn send_method(&mut self, channel: u16, method: classes::Class) -> Result<()> { @@ -146,6 +169,7 @@ impl Connection { self.channel_max = channel_max; self.max_frame_size = usize::try_from(frame_max).unwrap(); self.heartbeat_delay = heartbeat; + self.reset_timeout(); } Ok(()) @@ -170,6 +194,103 @@ impl Connection { Ok(()) } + async fn main_loop(&mut self) -> Result<()> { + loop { + tokio::select! { + frame = frame::read_frame(&mut self.stream, self.max_frame_size) => { + debug!(?frame); + let frame = frame?; + self.reset_timeout(); + + match frame.kind { + FrameType::Method => self.dispatch_method(frame).await?, + FrameType::Heartbeat => {} + _ => warn!(frame_type = ?frame.kind, "TODO"), + } + } + _ = &mut self.next_timeout => { + if self.heartbeat_delay != 0 { + return Err(ProtocolError::CloseNow.into()); + } + } + } + } + } + + async fn dispatch_method(&mut self, frame: Frame) -> Result<()> { + let method = classes::parse_method(&frame.payload)?; + debug!(?method, "Received method"); + + match method { + classes::Class::Connection(classes::Connection::Close { .. }) => { + // todo: handle closing + } + classes::Class::Channel(classes::Channel::Open { .. }) => { + self.channel_open(frame.channel).await? + } + + _ => { + // we don't handle this here, forward it to *somewhere* + } + } + + Ok(()) + } + + async fn channel_open(&mut self, num: u16) -> Result<()> { + let id = Uuid::from_bytes(rand::random()); + let channel_handle = amqp_core::Channel::new_handle( + id, + num, + self.connection_handle.clone(), + self.global_data.clone(), + ); + + let channel = Channel { + num, + channel_handle: channel_handle.clone(), + }; + + let prev = self.channels.insert(num, channel); + if let Some(prev) = prev { + self.channels.insert(num, prev); // restore previous state + return Err(ConException::ChannelError.into_trans()); + } + + { + let mut global_data = self.global_data.lock(); + global_data.channels.insert(id, channel_handle.clone()); + global_data + .connections + .get_mut(&self.id) + .unwrap() + .lock() + .channels + .insert(num, channel_handle); + } + + info!(%num, "Opened new channel"); + + self.send_method( + num, + Class::Channel(classes::Channel::OpenOk { + reserved_1: Vec::new(), + }), + ) + .await?; + + time::sleep(Duration::from_secs(1000)).await; // for debugging the dashboard + + Ok(()) + } + + fn reset_timeout(&mut self) { + if self.heartbeat_delay != 0 { + let next = Duration::from_secs(u64::from(self.heartbeat_delay)); + self.next_timeout = Box::pin(time::sleep(next)); + } + } + async fn negotiate_version(&mut self) -> Result<()> { const HEADER_SIZE: usize = 8; const SUPPORTED_PROTOCOL_VERSION: &[u8] = &[0, 9, 1]; diff --git a/amqp_transport/src/lib.rs b/amqp_transport/src/lib.rs index 2c07ab6..a1a4c57 100644 --- a/amqp_transport/src/lib.rs +++ b/amqp_transport/src/lib.rs @@ -31,12 +31,12 @@ pub async fn do_thing_i_guess(global_data: GlobalData) -> Result<()> { let connection_handle = amqp_core::Connection::new_handle(id, peer_addr, global_data.clone()); - let mut global_data = global_data.lock(); - global_data + let mut global_data_guard = global_data.lock(); + global_data_guard .connections .insert(id, connection_handle.clone()); - let connection = Connection::new(stream, connection_handle); + let connection = Connection::new(id, stream, connection_handle, global_data.clone()); tokio::spawn(connection.start_connection_processing().instrument(span)); } diff --git a/amqp_transport/src/sasl.rs b/amqp_transport/src/sasl.rs index caefd12..986dcb0 100644 --- a/amqp_transport/src/sasl.rs +++ b/amqp_transport/src/sasl.rs @@ -1,4 +1,4 @@ -//! Partial implementation of the SASL Authentication (see [RFC 4422](https://datatracker.ietf.org/doc/html/rfc4422)) +//! (Very) partial implementation of SASL Authentication (see [RFC 4422](https://datatracker.ietf.org/doc/html/rfc4422)) //! //! Currently only supports PLAN (see [RFC 4616](https://datatracker.ietf.org/doc/html/rfc4616))