From 9b48dec5334b33b14bf8a2ac090bd159dae96123 Mon Sep 17 00:00:00 2001 From: Nilstrieb <48135649+Nilstrieb@users.noreply.github.com> Date: Sun, 20 Feb 2022 14:59:54 +0100 Subject: [PATCH] restructuring --- amqp_dashboard/assets/script.js | 2 +- amqp_transport/src/classes/generated.rs | 3396 +++++++++++--------- amqp_transport/src/classes/mod.rs | 2 +- amqp_transport/src/classes/parse_helper.rs | 6 +- amqp_transport/src/classes/tests.rs | 4 +- amqp_transport/src/connection.rs | 73 +- amqp_transport/src/tests.rs | 10 +- src/main.rs | 1 + xtask/src/codegen/mod.rs | 18 +- xtask/src/codegen/parser.rs | 21 +- xtask/src/codegen/random.rs | 35 +- xtask/src/codegen/write.rs | 6 +- 12 files changed, 1988 insertions(+), 1586 deletions(-) diff --git a/amqp_dashboard/assets/script.js b/amqp_dashboard/assets/script.js index 2a7f630..40ce29b 100644 --- a/amqp_dashboard/assets/script.js +++ b/amqp_dashboard/assets/script.js @@ -40,7 +40,7 @@ const renderConnections = (connections) => { }; const refresh = async () => { - const fetched = await fetch('http://localhost:3000/api/data'); + const fetched = await fetch('api/data'); const data = await fetched.json(); renderConnections(data.connections); }; diff --git a/amqp_transport/src/classes/generated.rs b/amqp_transport/src/classes/generated.rs index 8a48ef4..0d3c341 100644 --- a/amqp_transport/src/classes/generated.rs +++ b/amqp_transport/src/classes/generated.rs @@ -1,5 +1,5 @@ #![allow(dead_code)] -// This file has been generated by `amqp_codegen`. Do not edit it manually. +// This file has been generated by `xtask/src/codegen`. Do not edit it manually. pub type ClassId = u16; @@ -125,23 +125,13 @@ pub type Timestamp = u64; pub type Table = super::Table; #[derive(Debug, Clone, PartialEq)] -pub enum Class { - Connection(Connection), - Channel(Channel), - Exchange(Exchange), - Queue(Queue), - Basic(Basic), - Tx(Tx), -} - -/// The connection class provides methods for a client to establish a network connection to -/// a server, and for both peers to operate the connection thereafter. -#[derive(Debug, Clone, PartialEq)] -pub enum Connection { +pub enum Method { + /// The connection class provides methods for a client to establish a network connection to + /// a server, and for both peers to operate the connection thereafter. /// This method starts the connection negotiation process by telling the client the /// protocol version that the server proposes, along with a list of security mechanisms /// which the client can use for authentication. - Start { + ConnectionStart { /// The major version number can take any value from 0 to 99 as defined in the /// AMQP specification. version_major: Octet, @@ -159,8 +149,10 @@ pub enum Connection { /// locale defines the language in which the server will send reply texts. locales: Longstr, }, + /// The connection class provides methods for a client to establish a network connection to + /// a server, and for both peers to operate the connection thereafter. /// This method selects a SASL security mechanism. - StartOk { + ConnectionStartOk { client_properties: PeerProperties, /// must not be null /// @@ -178,26 +170,32 @@ pub enum Connection { /// specified by the server. locale: Shortstr, }, + /// The connection class provides methods for a client to establish a network connection to + /// a server, and for both peers to operate the connection thereafter. /// The SASL protocol works by exchanging challenges and responses until both peers have /// received sufficient information to authenticate each other. This method challenges /// the client to provide more information. - Secure { + ConnectionSecure { /// Challenge information, a block of opaque binary data passed to the security /// mechanism. challenge: Longstr, }, + /// The connection class provides methods for a client to establish a network connection to + /// a server, and for both peers to operate the connection thereafter. /// This method attempts to authenticate, passing a block of SASL data for the security /// mechanism at the server side. - SecureOk { + ConnectionSecureOk { /// must not be null /// /// A block of opaque data passed to the security mechanism. The contents of this /// data are defined by the SASL security mechanism. response: Longstr, }, + /// The connection class provides methods for a client to establish a network connection to + /// a server, and for both peers to operate the connection thereafter. /// This method proposes a set of connection configuration values to the client. The /// client can accept and/or adjust these. - Tune { + ConnectionTune { /// Specifies highest channel number that the server permits. Usable channel numbers /// are in the range 1..channel-max. Zero indicates no specified limit. channel_max: Short, @@ -210,9 +208,11 @@ pub enum Connection { /// Zero means the server does not want a heartbeat. heartbeat: Short, }, + /// The connection class provides methods for a client to establish a network connection to + /// a server, and for both peers to operate the connection thereafter. /// This method sends the client's connection tuning parameters to the server. /// Certain fields are negotiated, others provide capability information. - TuneOk { + ConnectionTuneOk { /// must not be null, must be less than the tune field of the method channel-max /// /// The maximum total number of channels that the client will use per connection. @@ -227,25 +227,29 @@ pub enum Connection { /// means the client does not want a heartbeat. heartbeat: Short, }, + /// The connection class provides methods for a client to establish a network connection to + /// a server, and for both peers to operate the connection thereafter. /// This method opens a connection to a virtual host, which is a collection of /// resources, and acts to separate multiple application domains within a server. /// The server may apply arbitrary limits per virtual host, such as the number /// of each type of entity that may be used, per connection and/or in total. - Open { + ConnectionOpen { /// The name of the virtual host to work with. virtual_host: Path, reserved_1: Shortstr, reserved_2: Bit, }, + /// The connection class provides methods for a client to establish a network connection to + /// a server, and for both peers to operate the connection thereafter. /// This method signals to the client that the connection is ready for use. - OpenOk { - reserved_1: Shortstr, - }, + ConnectionOpenOk { reserved_1: Shortstr }, + /// The connection class provides methods for a client to establish a network connection to + /// a server, and for both peers to operate the connection thereafter. /// This method indicates that the sender wants to close the connection. This may be /// due to internal conditions (e.g. a forced shut-down) or due to an error handling /// a specific method, i.e. an exception. When a close is due to an exception, the /// sender provides the class and method id of the method which caused the exception. - Close { + ConnectionClose { reply_code: ReplyCode, reply_text: ReplyText, /// When the close is provoked by a method exception, this is the class of the @@ -254,43 +258,46 @@ pub enum Connection { /// When the close is provoked by a method exception, this is the ID of the method. method_id: MethodId, }, + /// The connection class provides methods for a client to establish a network connection to + /// a server, and for both peers to operate the connection thereafter. /// This method confirms a Connection.Close method and tells the recipient that it is /// safe to release resources for the connection and close the socket. - CloseOk, -} -/// The channel class provides methods for a client to establish a channel to a -/// server and for both peers to operate the channel thereafter. -#[derive(Debug, Clone, PartialEq)] -pub enum Channel { + ConnectionCloseOk, + /// The channel class provides methods for a client to establish a channel to a + /// server and for both peers to operate the channel thereafter. /// This method opens a channel to the server. - Open { - reserved_1: Shortstr, - }, + ChannelOpen { reserved_1: Shortstr }, + /// The channel class provides methods for a client to establish a channel to a + /// server and for both peers to operate the channel thereafter. /// This method signals to the client that the channel is ready for use. - OpenOk { - reserved_1: Longstr, - }, + ChannelOpenOk { reserved_1: Longstr }, + /// The channel class provides methods for a client to establish a channel to a + /// server and for both peers to operate the channel thereafter. /// This method asks the peer to pause or restart the flow of content data sent by /// a consumer. This is a simple flow-control mechanism that a peer can use to avoid /// overflowing its queues or otherwise finding itself receiving more messages than /// it can process. Note that this method is not intended for window control. It does /// not affect contents returned by Basic.Get-Ok methods. - Flow { + ChannelFlow { /// If 1, the peer starts sending content frames. If 0, the peer stops sending /// content frames. active: Bit, }, + /// The channel class provides methods for a client to establish a channel to a + /// server and for both peers to operate the channel thereafter. /// Confirms to the peer that a flow command was received and processed. - FlowOk { + ChannelFlowOk { /// Confirms the setting of the processed flow method: 1 means the peer will start /// sending or continue to send content frames; 0 means it will not. active: Bit, }, + /// The channel class provides methods for a client to establish a channel to a + /// server and for both peers to operate the channel thereafter. /// This method indicates that the sender wants to close the channel. This may be due to /// internal conditions (e.g. a forced shut-down) or due to an error handling a specific /// method, i.e. an exception. When a close is due to an exception, the sender provides /// the class and method id of the method which caused the exception. - Close { + ChannelClose { reply_code: ReplyCode, reply_text: ReplyText, /// When the close is provoked by a method exception, this is the class of the @@ -299,17 +306,16 @@ pub enum Channel { /// When the close is provoked by a method exception, this is the ID of the method. method_id: MethodId, }, + /// The channel class provides methods for a client to establish a channel to a + /// server and for both peers to operate the channel thereafter. /// This method confirms a Channel.Close method and tells the recipient that it is safe /// to release resources for the channel. - CloseOk, -} -/// Exchanges match and distribute messages across queues. Exchanges can be configured in -/// the server or declared at runtime. -#[derive(Debug, Clone, PartialEq)] -pub enum Exchange { + ChannelCloseOk, + /// Exchanges match and distribute messages across queues. Exchanges can be configured in + /// the server or declared at runtime. /// This method creates an exchange if it does not already exist, and if the exchange /// exists, verifies that it is of the correct and expected class. - Declare { + ExchangeDeclare { reserved_1: Short, /// must not be null exchange: ExchangeName, @@ -336,12 +342,16 @@ pub enum Exchange { /// arguments depends on the server implementation. arguments: Table, }, + /// Exchanges match and distribute messages across queues. Exchanges can be configured in + /// the server or declared at runtime. /// This method confirms a Declare method and confirms the name of the exchange, /// essential for automatically-named exchanges. - DeclareOk, + ExchangeDeclareOk, + /// Exchanges match and distribute messages across queues. Exchanges can be configured in + /// the server or declared at runtime. /// This method deletes an exchange. When an exchange is deleted all queue bindings on /// the exchange are cancelled. - Delete { + ExchangeDelete { reserved_1: Short, /// must not be null exchange: ExchangeName, @@ -351,18 +361,17 @@ pub enum Exchange { if_unused: Bit, no_wait: NoWait, }, + /// Exchanges match and distribute messages across queues. Exchanges can be configured in + /// the server or declared at runtime. /// This method confirms the deletion of an exchange. - DeleteOk, -} -/// Queues store and forward messages. Queues can be configured in the server or created at -/// runtime. Queues must be attached to at least one exchange in order to receive messages -/// from publishers. -#[derive(Debug, Clone, PartialEq)] -pub enum Queue { + ExchangeDeleteOk, + /// Queues store and forward messages. Queues can be configured in the server or created at + /// runtime. Queues must be attached to at least one exchange in order to receive messages + /// from publishers. /// This method creates or checks a queue. When creating a new queue the client can /// specify various properties that control the durability of the queue and its /// contents, and the level of sharing for the queue. - Declare { + QueueDeclare { reserved_1: Short, queue: QueueName, /// If set, the server will reply with Declare-Ok if the queue already @@ -392,9 +401,12 @@ pub enum Queue { /// arguments depends on the server implementation. arguments: Table, }, + /// Queues store and forward messages. Queues can be configured in the server or created at + /// runtime. Queues must be attached to at least one exchange in order to receive messages + /// from publishers. /// This method confirms a Declare method and confirms the name of the queue, essential /// for automatically-named queues. - DeclareOk { + QueueDeclareOk { /// must not be null /// /// Reports the name of the queue. If the server generated a queue name, this field @@ -405,11 +417,14 @@ pub enum Queue { /// suspend activity (Channel.Flow) in which case they do not appear in this count. consumer_count: Long, }, + /// Queues store and forward messages. Queues can be configured in the server or created at + /// runtime. Queues must be attached to at least one exchange in order to receive messages + /// from publishers. /// This method binds a queue to an exchange. Until a queue is bound it will not /// receive any messages. In a classic messaging model, store-and-forward queues /// are bound to a direct exchange and subscription queues are bound to a topic /// exchange. - Bind { + QueueBind { reserved_1: Short, /// Specifies the name of the queue to bind. queue: QueueName, @@ -428,10 +443,16 @@ pub enum Queue { /// depends on the exchange class. arguments: Table, }, + /// Queues store and forward messages. Queues can be configured in the server or created at + /// runtime. Queues must be attached to at least one exchange in order to receive messages + /// from publishers. /// This method confirms that the bind was successful. - BindOk, + QueueBindOk, + /// Queues store and forward messages. Queues can be configured in the server or created at + /// runtime. Queues must be attached to at least one exchange in order to receive messages + /// from publishers. /// This method unbinds a queue from an exchange. - Unbind { + QueueUnbind { reserved_1: Short, /// Specifies the name of the queue to unbind. queue: QueueName, @@ -442,25 +463,37 @@ pub enum Queue { /// Specifies the arguments of the binding to unbind. arguments: Table, }, + /// Queues store and forward messages. Queues can be configured in the server or created at + /// runtime. Queues must be attached to at least one exchange in order to receive messages + /// from publishers. /// This method confirms that the unbind was successful. - UnbindOk, + QueueUnbindOk, + /// Queues store and forward messages. Queues can be configured in the server or created at + /// runtime. Queues must be attached to at least one exchange in order to receive messages + /// from publishers. /// This method removes all messages from a queue which are not awaiting /// acknowledgment. - Purge { + QueuePurge { reserved_1: Short, /// Specifies the name of the queue to purge. queue: QueueName, no_wait: NoWait, }, + /// Queues store and forward messages. Queues can be configured in the server or created at + /// runtime. Queues must be attached to at least one exchange in order to receive messages + /// from publishers. /// This method confirms the purge of a queue. - PurgeOk { + QueuePurgeOk { /// Reports the number of messages purged. message_count: MessageCount, }, + /// Queues store and forward messages. Queues can be configured in the server or created at + /// runtime. Queues must be attached to at least one exchange in order to receive messages + /// from publishers. /// This method deletes a queue. When a queue is deleted any pending messages are sent /// to a dead-letter queue if this is defined in the server configuration, and all /// consumers on the queue are cancelled. - Delete { + QueueDelete { reserved_1: Short, /// Specifies the name of the queue to delete. queue: QueueName, @@ -472,21 +505,21 @@ pub enum Queue { if_empty: Bit, no_wait: NoWait, }, + /// Queues store and forward messages. Queues can be configured in the server or created at + /// runtime. Queues must be attached to at least one exchange in order to receive messages + /// from publishers. /// This method confirms the deletion of a queue. - DeleteOk { + QueueDeleteOk { /// Reports the number of messages deleted. message_count: MessageCount, }, -} -/// The Basic class provides methods that support an industry-standard messaging model. -#[derive(Debug, Clone, PartialEq)] -pub enum Basic { + /// The Basic class provides methods that support an industry-standard messaging model. /// This method requests a specific quality of service. The QoS can be specified for the /// current channel or for all channels on the connection. The particular properties and /// semantics of a qos method always depend on the content class semantics. Though the /// qos method could in principle apply to both peers, it is currently meaningful only /// for the server. - Qos { + BasicQos { /// The client can request that messages be sent in advance so that when the client /// finishes processing a message, the following message is already held locally, /// rather than needing to be sent down the channel. Prefetching gives a performance @@ -505,14 +538,16 @@ pub enum Basic { /// set, they are applied to the entire connection. global: Bit, }, + /// The Basic class provides methods that support an industry-standard messaging model. /// This method tells the client that the requested QoS levels could be handled by the /// server. The requested QoS applies to all active consumers until a new QoS is /// defined. - QosOk, + BasicQosOk, + /// The Basic class provides methods that support an industry-standard messaging model. /// This method asks the server to start a "consumer", which is a transient request for /// messages from a specific queue. Consumers last as long as the channel they were /// declared on, or until the client cancels them. - Consume { + BasicConsume { reserved_1: Short, /// Specifies the name of the queue to consume from. queue: QueueName, @@ -530,28 +565,30 @@ pub enum Basic { /// arguments depends on the server implementation. arguments: Table, }, + /// The Basic class provides methods that support an industry-standard messaging model. /// The server provides the client with a consumer tag, which is used by the client /// for methods called on the consumer at a later stage. - ConsumeOk { + BasicConsumeOk { /// Holds the consumer tag specified by the client or provided by the server. consumer_tag: ConsumerTag, }, + /// The Basic class provides methods that support an industry-standard messaging model. /// This method cancels a consumer. This does not affect already delivered /// messages, but it does mean the server will not send any more messages for /// that consumer. The client may receive an arbitrary number of messages in /// between sending the cancel method and receiving the cancel-ok reply. - Cancel { + BasicCancel { consumer_tag: ConsumerTag, no_wait: NoWait, }, + /// The Basic class provides methods that support an industry-standard messaging model. /// This method confirms that the cancellation was completed. - CancelOk { - consumer_tag: ConsumerTag, - }, + BasicCancelOk { consumer_tag: ConsumerTag }, + /// The Basic class provides methods that support an industry-standard messaging model. /// This method publishes a message to a specific exchange. The message will be routed /// to queues as defined by the exchange configuration and distributed to any active /// consumers when the transaction, if any, is committed. - Publish { + BasicPublish { reserved_1: Short, /// Specifies the name of the exchange to publish to. The exchange name can be /// empty, meaning the default exchange. If the exchange name is specified, and that @@ -570,11 +607,12 @@ pub enum Basic { /// will queue the message, but with no guarantee that it will ever be consumed. immediate: Bit, }, + /// The Basic class provides methods that support an industry-standard messaging model. /// This method returns an undeliverable message that was published with the "immediate" /// flag set, or an unroutable message published with the "mandatory" flag set. The /// reply code and text provide information about the reason that the message was /// undeliverable. - Return { + BasicReturn { reply_code: ReplyCode, reply_text: ReplyText, /// Specifies the name of the exchange that the message was originally published @@ -583,11 +621,12 @@ pub enum Basic { /// Specifies the routing key name specified when the message was published. routing_key: Shortstr, }, + /// The Basic class provides methods that support an industry-standard messaging model. /// This method delivers a message to the client, via a consumer. In the asynchronous /// message delivery model, the client starts a consumer using the Consume method, then /// the server responds with Deliver methods as and when messages arrive for that /// consumer. - Deliver { + BasicDeliver { consumer_tag: ConsumerTag, delivery_tag: DeliveryTag, redelivered: Redelivered, @@ -597,19 +636,21 @@ pub enum Basic { /// Specifies the routing key name specified when the message was published. routing_key: Shortstr, }, + /// The Basic class provides methods that support an industry-standard messaging model. /// This method provides a direct access to the messages in a queue using a synchronous /// dialogue that is designed for specific types of application where synchronous /// functionality is more important than performance. - Get { + BasicGet { reserved_1: Short, /// Specifies the name of the queue to get a message from. queue: QueueName, no_ack: NoAck, }, + /// The Basic class provides methods that support an industry-standard messaging model. /// This method delivers a message to the client following a get method. A message /// delivered by 'get-ok' must be acknowledged unless the no-ack option was set in the /// get method. - GetOk { + BasicGetOk { delivery_tag: DeliveryTag, redelivered: Redelivered, /// Specifies the name of the exchange that the message was originally published to. @@ -619,15 +660,15 @@ pub enum Basic { routing_key: Shortstr, message_count: MessageCount, }, + /// The Basic class provides methods that support an industry-standard messaging model. /// This method tells the client that the queue has no messages available for the /// client. - GetEmpty { - reserved_1: Shortstr, - }, + BasicGetEmpty { reserved_1: Shortstr }, + /// The Basic class provides methods that support an industry-standard messaging model. /// This method acknowledges one or more messages delivered via the Deliver or Get-Ok /// methods. The client can ask to confirm a single message or a set of messages up to /// and including a specific message. - Ack { + BasicAck { delivery_tag: DeliveryTag, /// If set to 1, the delivery tag is treated as "up to and including", so that the /// client can acknowledge multiple messages with a single method. If set to zero, @@ -635,1465 +676,1832 @@ pub enum Basic { /// delivery tag is zero, tells the server to acknowledge all outstanding messages. multiple: Bit, }, + /// The Basic class provides methods that support an industry-standard messaging model. /// This method allows a client to reject a message. It can be used to interrupt and /// cancel large incoming messages, or return untreatable messages to their original /// queue. - Reject { + BasicReject { delivery_tag: DeliveryTag, /// If requeue is true, the server will attempt to requeue the message. If requeue /// is false or the requeue attempt fails the messages are discarded or dead-lettered. requeue: Bit, }, + /// The Basic class provides methods that support an industry-standard messaging model. /// This method asks the server to redeliver all unacknowledged messages on a /// specified channel. Zero or more messages may be redelivered. This method /// is deprecated in favour of the synchronous Recover/Recover-Ok. - RecoverAsync { + BasicRecoverAsync { /// If this field is zero, the message will be redelivered to the original /// recipient. If this bit is 1, the server will attempt to requeue the message, /// potentially then delivering it to an alternative subscriber. requeue: Bit, }, + /// The Basic class provides methods that support an industry-standard messaging model. /// This method asks the server to redeliver all unacknowledged messages on a /// specified channel. Zero or more messages may be redelivered. This method /// replaces the asynchronous Recover. - Recover { + BasicRecover { /// If this field is zero, the message will be redelivered to the original /// recipient. If this bit is 1, the server will attempt to requeue the message, /// potentially then delivering it to an alternative subscriber. requeue: Bit, }, + /// The Basic class provides methods that support an industry-standard messaging model. /// This method acknowledges a Basic.Recover method. - RecoverOk, -} -/// The Tx class allows publish and ack operations to be batched into atomic -/// units of work. The intention is that all publish and ack requests issued -/// within a transaction will complete successfully or none of them will. -/// Servers SHOULD implement atomic transactions at least where all publish -/// or ack requests affect a single queue. Transactions that cover multiple -/// queues may be non-atomic, given that queues can be created and destroyed -/// asynchronously, and such events do not form part of any transaction. -/// Further, the behaviour of transactions with respect to the immediate and -/// mandatory flags on Basic.Publish methods is not defined. -#[derive(Debug, Clone, PartialEq)] -pub enum Tx { + BasicRecoverOk, + /// The Tx class allows publish and ack operations to be batched into atomic + /// units of work. The intention is that all publish and ack requests issued + /// within a transaction will complete successfully or none of them will. + /// Servers SHOULD implement atomic transactions at least where all publish + /// or ack requests affect a single queue. Transactions that cover multiple + /// queues may be non-atomic, given that queues can be created and destroyed + /// asynchronously, and such events do not form part of any transaction. + /// Further, the behaviour of transactions with respect to the immediate and + /// mandatory flags on Basic.Publish methods is not defined. /// This method sets the channel to use standard transactions. The client must use this /// method at least once on a channel before using the Commit or Rollback methods. - Select, + TxSelect, + /// The Tx class allows publish and ack operations to be batched into atomic + /// units of work. The intention is that all publish and ack requests issued + /// within a transaction will complete successfully or none of them will. + /// Servers SHOULD implement atomic transactions at least where all publish + /// or ack requests affect a single queue. Transactions that cover multiple + /// queues may be non-atomic, given that queues can be created and destroyed + /// asynchronously, and such events do not form part of any transaction. + /// Further, the behaviour of transactions with respect to the immediate and + /// mandatory flags on Basic.Publish methods is not defined. /// This method confirms to the client that the channel was successfully set to use /// standard transactions. - SelectOk, + TxSelectOk, + /// The Tx class allows publish and ack operations to be batched into atomic + /// units of work. The intention is that all publish and ack requests issued + /// within a transaction will complete successfully or none of them will. + /// Servers SHOULD implement atomic transactions at least where all publish + /// or ack requests affect a single queue. Transactions that cover multiple + /// queues may be non-atomic, given that queues can be created and destroyed + /// asynchronously, and such events do not form part of any transaction. + /// Further, the behaviour of transactions with respect to the immediate and + /// mandatory flags on Basic.Publish methods is not defined. /// This method commits all message publications and acknowledgments performed in /// the current transaction. A new transaction starts immediately after a commit. - Commit, + TxCommit, + /// The Tx class allows publish and ack operations to be batched into atomic + /// units of work. The intention is that all publish and ack requests issued + /// within a transaction will complete successfully or none of them will. + /// Servers SHOULD implement atomic transactions at least where all publish + /// or ack requests affect a single queue. Transactions that cover multiple + /// queues may be non-atomic, given that queues can be created and destroyed + /// asynchronously, and such events do not form part of any transaction. + /// Further, the behaviour of transactions with respect to the immediate and + /// mandatory flags on Basic.Publish methods is not defined. /// This method confirms to the client that the commit succeeded. Note that if a commit /// fails, the server raises a channel exception. - CommitOk, + TxCommitOk, + /// The Tx class allows publish and ack operations to be batched into atomic + /// units of work. The intention is that all publish and ack requests issued + /// within a transaction will complete successfully or none of them will. + /// Servers SHOULD implement atomic transactions at least where all publish + /// or ack requests affect a single queue. Transactions that cover multiple + /// queues may be non-atomic, given that queues can be created and destroyed + /// asynchronously, and such events do not form part of any transaction. + /// Further, the behaviour of transactions with respect to the immediate and + /// mandatory flags on Basic.Publish methods is not defined. /// This method abandons all message publications and acknowledgments performed in /// the current transaction. A new transaction starts immediately after a rollback. /// Note that unacked messages will not be automatically redelivered by rollback; /// if that is required an explicit recover call should be issued. - Rollback, + TxRollback, + /// The Tx class allows publish and ack operations to be batched into atomic + /// units of work. The intention is that all publish and ack requests issued + /// within a transaction will complete successfully or none of them will. + /// Servers SHOULD implement atomic transactions at least where all publish + /// or ack requests affect a single queue. Transactions that cover multiple + /// queues may be non-atomic, given that queues can be created and destroyed + /// asynchronously, and such events do not form part of any transaction. + /// Further, the behaviour of transactions with respect to the immediate and + /// mandatory flags on Basic.Publish methods is not defined. /// This method confirms to the client that the rollback succeeded. Note that if an /// rollback fails, the server raises a channel exception. - RollbackOk, + TxRollbackOk, } + pub mod parse { -use super::*; -use crate::classes::parse_helper::*; -use crate::error::TransError; -use nom::{branch::alt, bytes::complete::tag}; -use regex::Regex; -use once_cell::sync::Lazy; + use super::*; + use crate::classes::parse_helper::*; + use crate::error::TransError; + use nom::{branch::alt, bytes::complete::tag}; + use once_cell::sync::Lazy; + use regex::Regex; -pub type IResult<'a, T> = nom::IResult<&'a [u8], T, TransError>; - -pub fn parse_method(input: &[u8]) -> Result<(&[u8], Class), nom::Err> { - alt((connection, channel, exchange, queue, basic, tx))(input) -} -fn domain_class_id(input: &[u8]) -> IResult<'_, ClassId> { - short(input) -} -fn domain_consumer_tag(input: &[u8]) -> IResult<'_, ConsumerTag> { - shortstr(input) -} -fn domain_delivery_tag(input: &[u8]) -> IResult<'_, DeliveryTag> { - longlong(input) -} -fn domain_exchange_name(input: &[u8]) -> IResult<'_, ExchangeName> { - let (input, result) = shortstr(input)?; - if result.len() > 127 { fail!("value is shorter than 127 for field result") } - static REGEX: Lazy = Lazy::new(|| Regex::new(r"^[a-zA-Z0-9-_.:]*$").unwrap()); - if !REGEX.is_match(&result) { fail!(r"regex `^[a-zA-Z0-9-_.:]*$` did not match value for field result") } - Ok((input, result)) -} -fn domain_method_id(input: &[u8]) -> IResult<'_, MethodId> { - short(input) -} -fn domain_path(input: &[u8]) -> IResult<'_, Path> { - let (input, result) = shortstr(input)?; - if result.is_empty() { fail!("string was null for field result") } - if result.len() > 127 { fail!("value is shorter than 127 for field result") } - Ok((input, result)) -} -fn domain_peer_properties(input: &[u8]) -> IResult<'_, PeerProperties> { - table(input) -} -fn domain_queue_name(input: &[u8]) -> IResult<'_, QueueName> { - let (input, result) = shortstr(input)?; - if result.len() > 127 { fail!("value is shorter than 127 for field result") } - static REGEX: Lazy = Lazy::new(|| Regex::new(r"^[a-zA-Z0-9-_.:]*$").unwrap()); - if !REGEX.is_match(&result) { fail!(r"regex `^[a-zA-Z0-9-_.:]*$` did not match value for field result") } - Ok((input, result)) -} -fn domain_message_count(input: &[u8]) -> IResult<'_, MessageCount> { - long(input) -} -fn domain_reply_code(input: &[u8]) -> IResult<'_, ReplyCode> { - let (input, result) = short(input)?; - if result == 0 { fail!("number was 0 for field result") } - Ok((input, result)) -} -fn domain_reply_text(input: &[u8]) -> IResult<'_, ReplyText> { - let (input, result) = shortstr(input)?; - if result.is_empty() { fail!("string was null for field result") } - Ok((input, result)) -} -fn domain_octet(input: &[u8]) -> IResult<'_, Octet> { - octet(input) -} -fn domain_short(input: &[u8]) -> IResult<'_, Short> { - short(input) -} -fn domain_long(input: &[u8]) -> IResult<'_, Long> { - long(input) -} -fn domain_longlong(input: &[u8]) -> IResult<'_, Longlong> { - longlong(input) -} -fn domain_shortstr(input: &[u8]) -> IResult<'_, Shortstr> { - shortstr(input) -} -fn domain_longstr(input: &[u8]) -> IResult<'_, Longstr> { - longstr(input) -} -fn domain_timestamp(input: &[u8]) -> IResult<'_, Timestamp> { - timestamp(input) -} -fn domain_table(input: &[u8]) -> IResult<'_, Table> { - table(input) -} -fn connection(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(10_u16.to_be_bytes())(input).map_err(err("invalid tag for class connection"))?; - alt((connection_start, connection_start_ok, connection_secure, connection_secure_ok, connection_tune, connection_tune_ok, connection_open, connection_open_ok, connection_close, connection_close_ok))(input).map_err(err("class connection")).map_err(failure) -} -fn connection_start(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(10_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, version_major) = domain_octet(input).map_err(err("field version-major in method start")).map_err(failure)?; - let (input, version_minor) = domain_octet(input).map_err(err("field version-minor in method start")).map_err(failure)?; - let (input, server_properties) = domain_peer_properties(input).map_err(err("field server-properties in method start")).map_err(failure)?; - let (input, mechanisms) = domain_longstr(input).map_err(err("field mechanisms in method start")).map_err(failure)?; - if mechanisms.is_empty() { fail!("string was null for field mechanisms") } - let (input, locales) = domain_longstr(input).map_err(err("field locales in method start")).map_err(failure)?; - if locales.is_empty() { fail!("string was null for field locales") } - Ok((input, Class::Connection(Connection::Start { - version_major, - version_minor, - server_properties, - mechanisms, - locales, - }))) -} -fn connection_start_ok(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(11_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, client_properties) = domain_peer_properties(input).map_err(err("field client-properties in method start-ok")).map_err(failure)?; - let (input, mechanism) = domain_shortstr(input).map_err(err("field mechanism in method start-ok")).map_err(failure)?; - if mechanism.is_empty() { fail!("string was null for field mechanism") } - let (input, response) = domain_longstr(input).map_err(err("field response in method start-ok")).map_err(failure)?; - if response.is_empty() { fail!("string was null for field response") } - let (input, locale) = domain_shortstr(input).map_err(err("field locale in method start-ok")).map_err(failure)?; - if locale.is_empty() { fail!("string was null for field locale") } - Ok((input, Class::Connection(Connection::StartOk { - client_properties, - mechanism, - response, - locale, - }))) -} -fn connection_secure(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(20_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, challenge) = domain_longstr(input).map_err(err("field challenge in method secure")).map_err(failure)?; - Ok((input, Class::Connection(Connection::Secure { - challenge, - }))) -} -fn connection_secure_ok(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(21_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, response) = domain_longstr(input).map_err(err("field response in method secure-ok")).map_err(failure)?; - if response.is_empty() { fail!("string was null for field response") } - Ok((input, Class::Connection(Connection::SecureOk { - response, - }))) -} -fn connection_tune(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(30_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, channel_max) = domain_short(input).map_err(err("field channel-max in method tune")).map_err(failure)?; - let (input, frame_max) = domain_long(input).map_err(err("field frame-max in method tune")).map_err(failure)?; - let (input, heartbeat) = domain_short(input).map_err(err("field heartbeat in method tune")).map_err(failure)?; - Ok((input, Class::Connection(Connection::Tune { - channel_max, - frame_max, - heartbeat, - }))) -} -fn connection_tune_ok(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(31_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, channel_max) = domain_short(input).map_err(err("field channel-max in method tune-ok")).map_err(failure)?; - if channel_max == 0 { fail!("number was 0 for field channel_max") } - let (input, frame_max) = domain_long(input).map_err(err("field frame-max in method tune-ok")).map_err(failure)?; - let (input, heartbeat) = domain_short(input).map_err(err("field heartbeat in method tune-ok")).map_err(failure)?; - Ok((input, Class::Connection(Connection::TuneOk { - channel_max, - frame_max, - heartbeat, - }))) -} -fn connection_open(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(40_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, virtual_host) = domain_path(input).map_err(err("field virtual-host in method open")).map_err(failure)?; - let (input, reserved_1) = domain_shortstr(input).map_err(err("field reserved-1 in method open")).map_err(failure)?; - let (input, bits) = bit(input, 1).map_err(err("field reserved-2 in method open")).map_err(failure)?; - let reserved_2 = bits[0]; - Ok((input, Class::Connection(Connection::Open { - virtual_host, - reserved_1, - reserved_2, - }))) -} -fn connection_open_ok(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(41_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, reserved_1) = domain_shortstr(input).map_err(err("field reserved-1 in method open-ok")).map_err(failure)?; - Ok((input, Class::Connection(Connection::OpenOk { - reserved_1, - }))) -} -fn connection_close(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(50_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, reply_code) = domain_reply_code(input).map_err(err("field reply-code in method close")).map_err(failure)?; - let (input, reply_text) = domain_reply_text(input).map_err(err("field reply-text in method close")).map_err(failure)?; - let (input, class_id) = domain_class_id(input).map_err(err("field class-id in method close")).map_err(failure)?; - let (input, method_id) = domain_method_id(input).map_err(err("field method-id in method close")).map_err(failure)?; - Ok((input, Class::Connection(Connection::Close { - reply_code, - reply_text, - class_id, - method_id, - }))) -} -fn connection_close_ok(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(51_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - Ok((input, Class::Connection(Connection::CloseOk { - }))) -} -fn channel(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(20_u16.to_be_bytes())(input).map_err(err("invalid tag for class channel"))?; - alt((channel_open, channel_open_ok, channel_flow, channel_flow_ok, channel_close, channel_close_ok))(input).map_err(err("class channel")).map_err(failure) -} -fn channel_open(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(10_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, reserved_1) = domain_shortstr(input).map_err(err("field reserved-1 in method open")).map_err(failure)?; - Ok((input, Class::Channel(Channel::Open { - reserved_1, - }))) -} -fn channel_open_ok(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(11_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, reserved_1) = domain_longstr(input).map_err(err("field reserved-1 in method open-ok")).map_err(failure)?; - Ok((input, Class::Channel(Channel::OpenOk { - reserved_1, - }))) -} -fn channel_flow(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(20_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, bits) = bit(input, 1).map_err(err("field active in method flow")).map_err(failure)?; - let active = bits[0]; - Ok((input, Class::Channel(Channel::Flow { - active, - }))) -} -fn channel_flow_ok(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(21_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, bits) = bit(input, 1).map_err(err("field active in method flow-ok")).map_err(failure)?; - let active = bits[0]; - Ok((input, Class::Channel(Channel::FlowOk { - active, - }))) -} -fn channel_close(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(40_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, reply_code) = domain_reply_code(input).map_err(err("field reply-code in method close")).map_err(failure)?; - let (input, reply_text) = domain_reply_text(input).map_err(err("field reply-text in method close")).map_err(failure)?; - let (input, class_id) = domain_class_id(input).map_err(err("field class-id in method close")).map_err(failure)?; - let (input, method_id) = domain_method_id(input).map_err(err("field method-id in method close")).map_err(failure)?; - Ok((input, Class::Channel(Channel::Close { - reply_code, - reply_text, - class_id, - method_id, - }))) -} -fn channel_close_ok(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(41_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - Ok((input, Class::Channel(Channel::CloseOk { - }))) -} -fn exchange(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(40_u16.to_be_bytes())(input).map_err(err("invalid tag for class exchange"))?; - alt((exchange_declare, exchange_declare_ok, exchange_delete, exchange_delete_ok))(input).map_err(err("class exchange")).map_err(failure) -} -fn exchange_declare(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(10_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, reserved_1) = domain_short(input).map_err(err("field reserved-1 in method declare")).map_err(failure)?; - let (input, exchange) = domain_exchange_name(input).map_err(err("field exchange in method declare")).map_err(failure)?; - if exchange.is_empty() { fail!("string was null for field exchange") } - let (input, r#type) = domain_shortstr(input).map_err(err("field type in method declare")).map_err(failure)?; - let (input, bits) = bit(input, 5).map_err(err("field passive in method declare")).map_err(failure)?; - let passive = bits[0]; - let durable = bits[1]; - let reserved_2 = bits[2]; - let reserved_3 = bits[3]; - let no_wait = bits[4]; - let (input, arguments) = domain_table(input).map_err(err("field arguments in method declare")).map_err(failure)?; - Ok((input, Class::Exchange(Exchange::Declare { - reserved_1, - exchange, - r#type, - passive, - durable, - reserved_2, - reserved_3, - no_wait, - arguments, - }))) -} -fn exchange_declare_ok(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(11_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - Ok((input, Class::Exchange(Exchange::DeclareOk { - }))) -} -fn exchange_delete(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(20_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, reserved_1) = domain_short(input).map_err(err("field reserved-1 in method delete")).map_err(failure)?; - let (input, exchange) = domain_exchange_name(input).map_err(err("field exchange in method delete")).map_err(failure)?; - if exchange.is_empty() { fail!("string was null for field exchange") } - let (input, bits) = bit(input, 2).map_err(err("field if-unused in method delete")).map_err(failure)?; - let if_unused = bits[0]; - let no_wait = bits[1]; - Ok((input, Class::Exchange(Exchange::Delete { - reserved_1, - exchange, - if_unused, - no_wait, - }))) -} -fn exchange_delete_ok(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(21_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - Ok((input, Class::Exchange(Exchange::DeleteOk { - }))) -} -fn queue(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(50_u16.to_be_bytes())(input).map_err(err("invalid tag for class queue"))?; - alt((queue_declare, queue_declare_ok, queue_bind, queue_bind_ok, queue_unbind, queue_unbind_ok, queue_purge, queue_purge_ok, queue_delete, queue_delete_ok))(input).map_err(err("class queue")).map_err(failure) -} -fn queue_declare(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(10_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, reserved_1) = domain_short(input).map_err(err("field reserved-1 in method declare")).map_err(failure)?; - let (input, queue) = domain_queue_name(input).map_err(err("field queue in method declare")).map_err(failure)?; - let (input, bits) = bit(input, 5).map_err(err("field passive in method declare")).map_err(failure)?; - let passive = bits[0]; - let durable = bits[1]; - let exclusive = bits[2]; - let auto_delete = bits[3]; - let no_wait = bits[4]; - let (input, arguments) = domain_table(input).map_err(err("field arguments in method declare")).map_err(failure)?; - Ok((input, Class::Queue(Queue::Declare { - reserved_1, - queue, - passive, - durable, - exclusive, - auto_delete, - no_wait, - arguments, - }))) -} -fn queue_declare_ok(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(11_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, queue) = domain_queue_name(input).map_err(err("field queue in method declare-ok")).map_err(failure)?; - if queue.is_empty() { fail!("string was null for field queue") } - let (input, message_count) = domain_message_count(input).map_err(err("field message-count in method declare-ok")).map_err(failure)?; - let (input, consumer_count) = domain_long(input).map_err(err("field consumer-count in method declare-ok")).map_err(failure)?; - Ok((input, Class::Queue(Queue::DeclareOk { - queue, - message_count, - consumer_count, - }))) -} -fn queue_bind(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(20_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, reserved_1) = domain_short(input).map_err(err("field reserved-1 in method bind")).map_err(failure)?; - let (input, queue) = domain_queue_name(input).map_err(err("field queue in method bind")).map_err(failure)?; - let (input, exchange) = domain_exchange_name(input).map_err(err("field exchange in method bind")).map_err(failure)?; - let (input, routing_key) = domain_shortstr(input).map_err(err("field routing-key in method bind")).map_err(failure)?; - let (input, bits) = bit(input, 1).map_err(err("field no-wait in method bind")).map_err(failure)?; - let no_wait = bits[0]; - let (input, arguments) = domain_table(input).map_err(err("field arguments in method bind")).map_err(failure)?; - Ok((input, Class::Queue(Queue::Bind { - reserved_1, - queue, - exchange, - routing_key, - no_wait, - arguments, - }))) -} -fn queue_bind_ok(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(21_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - Ok((input, Class::Queue(Queue::BindOk { - }))) -} -fn queue_unbind(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(50_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, reserved_1) = domain_short(input).map_err(err("field reserved-1 in method unbind")).map_err(failure)?; - let (input, queue) = domain_queue_name(input).map_err(err("field queue in method unbind")).map_err(failure)?; - let (input, exchange) = domain_exchange_name(input).map_err(err("field exchange in method unbind")).map_err(failure)?; - let (input, routing_key) = domain_shortstr(input).map_err(err("field routing-key in method unbind")).map_err(failure)?; - let (input, arguments) = domain_table(input).map_err(err("field arguments in method unbind")).map_err(failure)?; - Ok((input, Class::Queue(Queue::Unbind { - reserved_1, - queue, - exchange, - routing_key, - arguments, - }))) -} -fn queue_unbind_ok(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(51_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - Ok((input, Class::Queue(Queue::UnbindOk { - }))) -} -fn queue_purge(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(30_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, reserved_1) = domain_short(input).map_err(err("field reserved-1 in method purge")).map_err(failure)?; - let (input, queue) = domain_queue_name(input).map_err(err("field queue in method purge")).map_err(failure)?; - let (input, bits) = bit(input, 1).map_err(err("field no-wait in method purge")).map_err(failure)?; - let no_wait = bits[0]; - Ok((input, Class::Queue(Queue::Purge { - reserved_1, - queue, - no_wait, - }))) -} -fn queue_purge_ok(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(31_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, message_count) = domain_message_count(input).map_err(err("field message-count in method purge-ok")).map_err(failure)?; - Ok((input, Class::Queue(Queue::PurgeOk { - message_count, - }))) -} -fn queue_delete(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(40_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, reserved_1) = domain_short(input).map_err(err("field reserved-1 in method delete")).map_err(failure)?; - let (input, queue) = domain_queue_name(input).map_err(err("field queue in method delete")).map_err(failure)?; - let (input, bits) = bit(input, 3).map_err(err("field if-unused in method delete")).map_err(failure)?; - let if_unused = bits[0]; - let if_empty = bits[1]; - let no_wait = bits[2]; - Ok((input, Class::Queue(Queue::Delete { - reserved_1, - queue, - if_unused, - if_empty, - no_wait, - }))) -} -fn queue_delete_ok(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(41_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, message_count) = domain_message_count(input).map_err(err("field message-count in method delete-ok")).map_err(failure)?; - Ok((input, Class::Queue(Queue::DeleteOk { - message_count, - }))) -} -fn basic(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(60_u16.to_be_bytes())(input).map_err(err("invalid tag for class basic"))?; - alt((basic_qos, basic_qos_ok, basic_consume, basic_consume_ok, basic_cancel, basic_cancel_ok, basic_publish, basic_return, basic_deliver, basic_get, basic_get_ok, basic_get_empty, basic_ack, basic_reject, basic_recover_async, basic_recover, basic_recover_ok))(input).map_err(err("class basic")).map_err(failure) -} -fn basic_qos(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(10_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, prefetch_size) = domain_long(input).map_err(err("field prefetch-size in method qos")).map_err(failure)?; - let (input, prefetch_count) = domain_short(input).map_err(err("field prefetch-count in method qos")).map_err(failure)?; - let (input, bits) = bit(input, 1).map_err(err("field global in method qos")).map_err(failure)?; - let global = bits[0]; - Ok((input, Class::Basic(Basic::Qos { - prefetch_size, - prefetch_count, - global, - }))) -} -fn basic_qos_ok(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(11_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - Ok((input, Class::Basic(Basic::QosOk { - }))) -} -fn basic_consume(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(20_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, reserved_1) = domain_short(input).map_err(err("field reserved-1 in method consume")).map_err(failure)?; - let (input, queue) = domain_queue_name(input).map_err(err("field queue in method consume")).map_err(failure)?; - let (input, consumer_tag) = domain_consumer_tag(input).map_err(err("field consumer-tag in method consume")).map_err(failure)?; - let (input, bits) = bit(input, 4).map_err(err("field no-local in method consume")).map_err(failure)?; - let no_local = bits[0]; - let no_ack = bits[1]; - let exclusive = bits[2]; - let no_wait = bits[3]; - let (input, arguments) = domain_table(input).map_err(err("field arguments in method consume")).map_err(failure)?; - Ok((input, Class::Basic(Basic::Consume { - reserved_1, - queue, - consumer_tag, - no_local, - no_ack, - exclusive, - no_wait, - arguments, - }))) -} -fn basic_consume_ok(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(21_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, consumer_tag) = domain_consumer_tag(input).map_err(err("field consumer-tag in method consume-ok")).map_err(failure)?; - Ok((input, Class::Basic(Basic::ConsumeOk { - consumer_tag, - }))) -} -fn basic_cancel(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(30_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, consumer_tag) = domain_consumer_tag(input).map_err(err("field consumer-tag in method cancel")).map_err(failure)?; - let (input, bits) = bit(input, 1).map_err(err("field no-wait in method cancel")).map_err(failure)?; - let no_wait = bits[0]; - Ok((input, Class::Basic(Basic::Cancel { - consumer_tag, - no_wait, - }))) -} -fn basic_cancel_ok(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(31_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, consumer_tag) = domain_consumer_tag(input).map_err(err("field consumer-tag in method cancel-ok")).map_err(failure)?; - Ok((input, Class::Basic(Basic::CancelOk { - consumer_tag, - }))) -} -fn basic_publish(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(40_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, reserved_1) = domain_short(input).map_err(err("field reserved-1 in method publish")).map_err(failure)?; - let (input, exchange) = domain_exchange_name(input).map_err(err("field exchange in method publish")).map_err(failure)?; - let (input, routing_key) = domain_shortstr(input).map_err(err("field routing-key in method publish")).map_err(failure)?; - let (input, bits) = bit(input, 2).map_err(err("field mandatory in method publish")).map_err(failure)?; - let mandatory = bits[0]; - let immediate = bits[1]; - Ok((input, Class::Basic(Basic::Publish { - reserved_1, - exchange, - routing_key, - mandatory, - immediate, - }))) -} -fn basic_return(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(50_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, reply_code) = domain_reply_code(input).map_err(err("field reply-code in method return")).map_err(failure)?; - let (input, reply_text) = domain_reply_text(input).map_err(err("field reply-text in method return")).map_err(failure)?; - let (input, exchange) = domain_exchange_name(input).map_err(err("field exchange in method return")).map_err(failure)?; - let (input, routing_key) = domain_shortstr(input).map_err(err("field routing-key in method return")).map_err(failure)?; - Ok((input, Class::Basic(Basic::Return { - reply_code, - reply_text, - exchange, - routing_key, - }))) -} -fn basic_deliver(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(60_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, consumer_tag) = domain_consumer_tag(input).map_err(err("field consumer-tag in method deliver")).map_err(failure)?; - let (input, delivery_tag) = domain_delivery_tag(input).map_err(err("field delivery-tag in method deliver")).map_err(failure)?; - let (input, bits) = bit(input, 1).map_err(err("field redelivered in method deliver")).map_err(failure)?; - let redelivered = bits[0]; - let (input, exchange) = domain_exchange_name(input).map_err(err("field exchange in method deliver")).map_err(failure)?; - let (input, routing_key) = domain_shortstr(input).map_err(err("field routing-key in method deliver")).map_err(failure)?; - Ok((input, Class::Basic(Basic::Deliver { - consumer_tag, - delivery_tag, - redelivered, - exchange, - routing_key, - }))) -} -fn basic_get(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(70_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, reserved_1) = domain_short(input).map_err(err("field reserved-1 in method get")).map_err(failure)?; - let (input, queue) = domain_queue_name(input).map_err(err("field queue in method get")).map_err(failure)?; - let (input, bits) = bit(input, 1).map_err(err("field no-ack in method get")).map_err(failure)?; - let no_ack = bits[0]; - Ok((input, Class::Basic(Basic::Get { - reserved_1, - queue, - no_ack, - }))) -} -fn basic_get_ok(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(71_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, delivery_tag) = domain_delivery_tag(input).map_err(err("field delivery-tag in method get-ok")).map_err(failure)?; - let (input, bits) = bit(input, 1).map_err(err("field redelivered in method get-ok")).map_err(failure)?; - let redelivered = bits[0]; - let (input, exchange) = domain_exchange_name(input).map_err(err("field exchange in method get-ok")).map_err(failure)?; - let (input, routing_key) = domain_shortstr(input).map_err(err("field routing-key in method get-ok")).map_err(failure)?; - let (input, message_count) = domain_message_count(input).map_err(err("field message-count in method get-ok")).map_err(failure)?; - Ok((input, Class::Basic(Basic::GetOk { - delivery_tag, - redelivered, - exchange, - routing_key, - message_count, - }))) -} -fn basic_get_empty(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(72_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, reserved_1) = domain_shortstr(input).map_err(err("field reserved-1 in method get-empty")).map_err(failure)?; - Ok((input, Class::Basic(Basic::GetEmpty { - reserved_1, - }))) -} -fn basic_ack(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(80_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, delivery_tag) = domain_delivery_tag(input).map_err(err("field delivery-tag in method ack")).map_err(failure)?; - let (input, bits) = bit(input, 1).map_err(err("field multiple in method ack")).map_err(failure)?; - let multiple = bits[0]; - Ok((input, Class::Basic(Basic::Ack { - delivery_tag, - multiple, - }))) -} -fn basic_reject(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(90_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, delivery_tag) = domain_delivery_tag(input).map_err(err("field delivery-tag in method reject")).map_err(failure)?; - let (input, bits) = bit(input, 1).map_err(err("field requeue in method reject")).map_err(failure)?; - let requeue = bits[0]; - Ok((input, Class::Basic(Basic::Reject { - delivery_tag, - requeue, - }))) -} -fn basic_recover_async(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(100_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, bits) = bit(input, 1).map_err(err("field requeue in method recover-async")).map_err(failure)?; - let requeue = bits[0]; - Ok((input, Class::Basic(Basic::RecoverAsync { - requeue, - }))) -} -fn basic_recover(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(110_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - let (input, bits) = bit(input, 1).map_err(err("field requeue in method recover")).map_err(failure)?; - let requeue = bits[0]; - Ok((input, Class::Basic(Basic::Recover { - requeue, - }))) -} -fn basic_recover_ok(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(111_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - Ok((input, Class::Basic(Basic::RecoverOk { - }))) -} -fn tx(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(90_u16.to_be_bytes())(input).map_err(err("invalid tag for class tx"))?; - alt((tx_select, tx_select_ok, tx_commit, tx_commit_ok, tx_rollback, tx_rollback_ok))(input).map_err(err("class tx")).map_err(failure) -} -fn tx_select(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(10_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - Ok((input, Class::Tx(Tx::Select { - }))) -} -fn tx_select_ok(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(11_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - Ok((input, Class::Tx(Tx::SelectOk { - }))) -} -fn tx_commit(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(20_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - Ok((input, Class::Tx(Tx::Commit { - }))) -} -fn tx_commit_ok(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(21_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - Ok((input, Class::Tx(Tx::CommitOk { - }))) -} -fn tx_rollback(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(30_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - Ok((input, Class::Tx(Tx::Rollback { - }))) -} -fn tx_rollback_ok(input: &[u8]) -> IResult<'_, Class> { - let (input, _) = tag(31_u16.to_be_bytes())(input).map_err(err("parsing method index"))?; - Ok((input, Class::Tx(Tx::RollbackOk { - }))) -} + pub type IResult<'a, T> = nom::IResult<&'a [u8], T, TransError>; + pub fn parse_method(input: &[u8]) -> Result<(&[u8], Method), nom::Err> { + alt((connection, channel, exchange, queue, basic, tx))(input) + } + fn domain_class_id(input: &[u8]) -> IResult<'_, ClassId> { + short(input) + } + fn domain_consumer_tag(input: &[u8]) -> IResult<'_, ConsumerTag> { + shortstr(input) + } + fn domain_delivery_tag(input: &[u8]) -> IResult<'_, DeliveryTag> { + longlong(input) + } + fn domain_exchange_name(input: &[u8]) -> IResult<'_, ExchangeName> { + let (input, result) = shortstr(input)?; + if result.len() > 127 { + fail!("value is shorter than 127 for field result") + } + static REGEX: Lazy = Lazy::new(|| Regex::new(r"^[a-zA-Z0-9-_.:]*$").unwrap()); + if !REGEX.is_match(&result) { + fail!(r"regex `^[a-zA-Z0-9-_.:]*$` did not match value for field result") + } + Ok((input, result)) + } + fn domain_method_id(input: &[u8]) -> IResult<'_, MethodId> { + short(input) + } + fn domain_path(input: &[u8]) -> IResult<'_, Path> { + let (input, result) = shortstr(input)?; + if result.is_empty() { + fail!("string was null for field result") + } + if result.len() > 127 { + fail!("value is shorter than 127 for field result") + } + Ok((input, result)) + } + fn domain_peer_properties(input: &[u8]) -> IResult<'_, PeerProperties> { + table(input) + } + fn domain_queue_name(input: &[u8]) -> IResult<'_, QueueName> { + let (input, result) = shortstr(input)?; + if result.len() > 127 { + fail!("value is shorter than 127 for field result") + } + static REGEX: Lazy = Lazy::new(|| Regex::new(r"^[a-zA-Z0-9-_.:]*$").unwrap()); + if !REGEX.is_match(&result) { + fail!(r"regex `^[a-zA-Z0-9-_.:]*$` did not match value for field result") + } + Ok((input, result)) + } + fn domain_message_count(input: &[u8]) -> IResult<'_, MessageCount> { + long(input) + } + fn domain_reply_code(input: &[u8]) -> IResult<'_, ReplyCode> { + let (input, result) = short(input)?; + if result == 0 { + fail!("number was 0 for field result") + } + Ok((input, result)) + } + fn domain_reply_text(input: &[u8]) -> IResult<'_, ReplyText> { + let (input, result) = shortstr(input)?; + if result.is_empty() { + fail!("string was null for field result") + } + Ok((input, result)) + } + fn domain_octet(input: &[u8]) -> IResult<'_, Octet> { + octet(input) + } + fn domain_short(input: &[u8]) -> IResult<'_, Short> { + short(input) + } + fn domain_long(input: &[u8]) -> IResult<'_, Long> { + long(input) + } + fn domain_longlong(input: &[u8]) -> IResult<'_, Longlong> { + longlong(input) + } + fn domain_shortstr(input: &[u8]) -> IResult<'_, Shortstr> { + shortstr(input) + } + fn domain_longstr(input: &[u8]) -> IResult<'_, Longstr> { + longstr(input) + } + fn domain_timestamp(input: &[u8]) -> IResult<'_, Timestamp> { + timestamp(input) + } + fn domain_table(input: &[u8]) -> IResult<'_, Table> { + table(input) + } + fn connection(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = tag(10_u16.to_be_bytes())(input) + .map_err(fail_err("invalid tag for class connection"))?; + alt(( + connection_start, + connection_start_ok, + connection_secure, + connection_secure_ok, + connection_tune, + connection_tune_ok, + connection_open, + connection_open_ok, + connection_close, + connection_close_ok, + ))(input) + .map_err(fail_err("class connection")) + } + fn connection_start(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(10_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + let (input, version_major) = domain_octet(input) + .map_err(fail_err("field version-major in method start")) + .map_err(failure)?; + let (input, version_minor) = domain_octet(input) + .map_err(fail_err("field version-minor in method start")) + .map_err(failure)?; + let (input, server_properties) = domain_peer_properties(input) + .map_err(fail_err("field server-properties in method start")) + .map_err(failure)?; + let (input, mechanisms) = domain_longstr(input) + .map_err(fail_err("field mechanisms in method start")) + .map_err(failure)?; + if mechanisms.is_empty() { + fail!("string was null for field mechanisms") + } + let (input, locales) = domain_longstr(input) + .map_err(fail_err("field locales in method start")) + .map_err(failure)?; + if locales.is_empty() { + fail!("string was null for field locales") + } + Ok(( + input, + Method::ConnectionStart { + version_major, + version_minor, + server_properties, + mechanisms, + locales, + }, + )) + } + fn connection_start_ok(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(11_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + let (input, client_properties) = domain_peer_properties(input) + .map_err(fail_err("field client-properties in method start-ok")) + .map_err(failure)?; + let (input, mechanism) = domain_shortstr(input) + .map_err(fail_err("field mechanism in method start-ok")) + .map_err(failure)?; + if mechanism.is_empty() { + fail!("string was null for field mechanism") + } + let (input, response) = domain_longstr(input) + .map_err(fail_err("field response in method start-ok")) + .map_err(failure)?; + if response.is_empty() { + fail!("string was null for field response") + } + let (input, locale) = domain_shortstr(input) + .map_err(fail_err("field locale in method start-ok")) + .map_err(failure)?; + if locale.is_empty() { + fail!("string was null for field locale") + } + Ok(( + input, + Method::ConnectionStartOk { + client_properties, + mechanism, + response, + locale, + }, + )) + } + fn connection_secure(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(20_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + let (input, challenge) = domain_longstr(input) + .map_err(fail_err("field challenge in method secure")) + .map_err(failure)?; + Ok((input, Method::ConnectionSecure { challenge })) + } + fn connection_secure_ok(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(21_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + let (input, response) = domain_longstr(input) + .map_err(fail_err("field response in method secure-ok")) + .map_err(failure)?; + if response.is_empty() { + fail!("string was null for field response") + } + Ok((input, Method::ConnectionSecureOk { response })) + } + fn connection_tune(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(30_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + let (input, channel_max) = domain_short(input) + .map_err(fail_err("field channel-max in method tune")) + .map_err(failure)?; + let (input, frame_max) = domain_long(input) + .map_err(fail_err("field frame-max in method tune")) + .map_err(failure)?; + let (input, heartbeat) = domain_short(input) + .map_err(fail_err("field heartbeat in method tune")) + .map_err(failure)?; + Ok(( + input, + Method::ConnectionTune { + channel_max, + frame_max, + heartbeat, + }, + )) + } + fn connection_tune_ok(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(31_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + let (input, channel_max) = domain_short(input) + .map_err(fail_err("field channel-max in method tune-ok")) + .map_err(failure)?; + if channel_max == 0 { + fail!("number was 0 for field channel_max") + } + let (input, frame_max) = domain_long(input) + .map_err(fail_err("field frame-max in method tune-ok")) + .map_err(failure)?; + let (input, heartbeat) = domain_short(input) + .map_err(fail_err("field heartbeat in method tune-ok")) + .map_err(failure)?; + Ok(( + input, + Method::ConnectionTuneOk { + channel_max, + frame_max, + heartbeat, + }, + )) + } + fn connection_open(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(40_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + let (input, virtual_host) = domain_path(input) + .map_err(fail_err("field virtual-host in method open")) + .map_err(failure)?; + let (input, reserved_1) = domain_shortstr(input) + .map_err(fail_err("field reserved-1 in method open")) + .map_err(failure)?; + let (input, bits) = bit(input, 1) + .map_err(fail_err("field reserved-2 in method open")) + .map_err(failure)?; + let reserved_2 = bits[0]; + Ok(( + input, + Method::ConnectionOpen { + virtual_host, + reserved_1, + reserved_2, + }, + )) + } + fn connection_open_ok(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(41_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + let (input, reserved_1) = domain_shortstr(input) + .map_err(fail_err("field reserved-1 in method open-ok")) + .map_err(failure)?; + Ok((input, Method::ConnectionOpenOk { reserved_1 })) + } + fn connection_close(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(50_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + let (input, reply_code) = domain_reply_code(input) + .map_err(fail_err("field reply-code in method close")) + .map_err(failure)?; + let (input, reply_text) = domain_reply_text(input) + .map_err(fail_err("field reply-text in method close")) + .map_err(failure)?; + let (input, class_id) = domain_class_id(input) + .map_err(fail_err("field class-id in method close")) + .map_err(failure)?; + let (input, method_id) = domain_method_id(input) + .map_err(fail_err("field method-id in method close")) + .map_err(failure)?; + Ok(( + input, + Method::ConnectionClose { + reply_code, + reply_text, + class_id, + method_id, + }, + )) + } + fn connection_close_ok(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(51_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + Ok((input, Method::ConnectionCloseOk {})) + } + fn channel(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(20_u16.to_be_bytes())(input).map_err(fail_err("invalid tag for class channel"))?; + alt(( + channel_open, + channel_open_ok, + channel_flow, + channel_flow_ok, + channel_close, + channel_close_ok, + ))(input) + .map_err(fail_err("class channel")) + } + fn channel_open(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(10_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + let (input, reserved_1) = domain_shortstr(input) + .map_err(fail_err("field reserved-1 in method open")) + .map_err(failure)?; + Ok((input, Method::ChannelOpen { reserved_1 })) + } + fn channel_open_ok(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(11_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + let (input, reserved_1) = domain_longstr(input) + .map_err(fail_err("field reserved-1 in method open-ok")) + .map_err(failure)?; + Ok((input, Method::ChannelOpenOk { reserved_1 })) + } + fn channel_flow(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(20_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + let (input, bits) = bit(input, 1) + .map_err(fail_err("field active in method flow")) + .map_err(failure)?; + let active = bits[0]; + Ok((input, Method::ChannelFlow { active })) + } + fn channel_flow_ok(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(21_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + let (input, bits) = bit(input, 1) + .map_err(fail_err("field active in method flow-ok")) + .map_err(failure)?; + let active = bits[0]; + Ok((input, Method::ChannelFlowOk { active })) + } + fn channel_close(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(40_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + let (input, reply_code) = domain_reply_code(input) + .map_err(fail_err("field reply-code in method close")) + .map_err(failure)?; + let (input, reply_text) = domain_reply_text(input) + .map_err(fail_err("field reply-text in method close")) + .map_err(failure)?; + let (input, class_id) = domain_class_id(input) + .map_err(fail_err("field class-id in method close")) + .map_err(failure)?; + let (input, method_id) = domain_method_id(input) + .map_err(fail_err("field method-id in method close")) + .map_err(failure)?; + Ok(( + input, + Method::ChannelClose { + reply_code, + reply_text, + class_id, + method_id, + }, + )) + } + fn channel_close_ok(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(41_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + Ok((input, Method::ChannelCloseOk {})) + } + fn exchange(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(40_u16.to_be_bytes())(input).map_err(fail_err("invalid tag for class exchange"))?; + alt(( + exchange_declare, + exchange_declare_ok, + exchange_delete, + exchange_delete_ok, + ))(input) + .map_err(fail_err("class exchange")) + } + fn exchange_declare(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(10_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + let (input, reserved_1) = domain_short(input) + .map_err(fail_err("field reserved-1 in method declare")) + .map_err(failure)?; + let (input, exchange) = domain_exchange_name(input) + .map_err(fail_err("field exchange in method declare")) + .map_err(failure)?; + if exchange.is_empty() { + fail!("string was null for field exchange") + } + let (input, r#type) = domain_shortstr(input) + .map_err(fail_err("field type in method declare")) + .map_err(failure)?; + let (input, bits) = bit(input, 5) + .map_err(fail_err("field passive in method declare")) + .map_err(failure)?; + let passive = bits[0]; + let durable = bits[1]; + let reserved_2 = bits[2]; + let reserved_3 = bits[3]; + let no_wait = bits[4]; + let (input, arguments) = domain_table(input) + .map_err(fail_err("field arguments in method declare")) + .map_err(failure)?; + Ok(( + input, + Method::ExchangeDeclare { + reserved_1, + exchange, + r#type, + passive, + durable, + reserved_2, + reserved_3, + no_wait, + arguments, + }, + )) + } + fn exchange_declare_ok(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(11_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + Ok((input, Method::ExchangeDeclareOk {})) + } + fn exchange_delete(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(20_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + let (input, reserved_1) = domain_short(input) + .map_err(fail_err("field reserved-1 in method delete")) + .map_err(failure)?; + let (input, exchange) = domain_exchange_name(input) + .map_err(fail_err("field exchange in method delete")) + .map_err(failure)?; + if exchange.is_empty() { + fail!("string was null for field exchange") + } + let (input, bits) = bit(input, 2) + .map_err(fail_err("field if-unused in method delete")) + .map_err(failure)?; + let if_unused = bits[0]; + let no_wait = bits[1]; + Ok(( + input, + Method::ExchangeDelete { + reserved_1, + exchange, + if_unused, + no_wait, + }, + )) + } + fn exchange_delete_ok(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(21_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + Ok((input, Method::ExchangeDeleteOk {})) + } + fn queue(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(50_u16.to_be_bytes())(input).map_err(fail_err("invalid tag for class queue"))?; + alt(( + queue_declare, + queue_declare_ok, + queue_bind, + queue_bind_ok, + queue_unbind, + queue_unbind_ok, + queue_purge, + queue_purge_ok, + queue_delete, + queue_delete_ok, + ))(input) + .map_err(fail_err("class queue")) + } + fn queue_declare(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(10_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + let (input, reserved_1) = domain_short(input) + .map_err(fail_err("field reserved-1 in method declare")) + .map_err(failure)?; + let (input, queue) = domain_queue_name(input) + .map_err(fail_err("field queue in method declare")) + .map_err(failure)?; + let (input, bits) = bit(input, 5) + .map_err(fail_err("field passive in method declare")) + .map_err(failure)?; + let passive = bits[0]; + let durable = bits[1]; + let exclusive = bits[2]; + let auto_delete = bits[3]; + let no_wait = bits[4]; + let (input, arguments) = domain_table(input) + .map_err(fail_err("field arguments in method declare")) + .map_err(failure)?; + Ok(( + input, + Method::QueueDeclare { + reserved_1, + queue, + passive, + durable, + exclusive, + auto_delete, + no_wait, + arguments, + }, + )) + } + fn queue_declare_ok(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(11_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + let (input, queue) = domain_queue_name(input) + .map_err(fail_err("field queue in method declare-ok")) + .map_err(failure)?; + if queue.is_empty() { + fail!("string was null for field queue") + } + let (input, message_count) = domain_message_count(input) + .map_err(fail_err("field message-count in method declare-ok")) + .map_err(failure)?; + let (input, consumer_count) = domain_long(input) + .map_err(fail_err("field consumer-count in method declare-ok")) + .map_err(failure)?; + Ok(( + input, + Method::QueueDeclareOk { + queue, + message_count, + consumer_count, + }, + )) + } + fn queue_bind(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(20_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + let (input, reserved_1) = domain_short(input) + .map_err(fail_err("field reserved-1 in method bind")) + .map_err(failure)?; + let (input, queue) = domain_queue_name(input) + .map_err(fail_err("field queue in method bind")) + .map_err(failure)?; + let (input, exchange) = domain_exchange_name(input) + .map_err(fail_err("field exchange in method bind")) + .map_err(failure)?; + let (input, routing_key) = domain_shortstr(input) + .map_err(fail_err("field routing-key in method bind")) + .map_err(failure)?; + let (input, bits) = bit(input, 1) + .map_err(fail_err("field no-wait in method bind")) + .map_err(failure)?; + let no_wait = bits[0]; + let (input, arguments) = domain_table(input) + .map_err(fail_err("field arguments in method bind")) + .map_err(failure)?; + Ok(( + input, + Method::QueueBind { + reserved_1, + queue, + exchange, + routing_key, + no_wait, + arguments, + }, + )) + } + fn queue_bind_ok(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(21_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + Ok((input, Method::QueueBindOk {})) + } + fn queue_unbind(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(50_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + let (input, reserved_1) = domain_short(input) + .map_err(fail_err("field reserved-1 in method unbind")) + .map_err(failure)?; + let (input, queue) = domain_queue_name(input) + .map_err(fail_err("field queue in method unbind")) + .map_err(failure)?; + let (input, exchange) = domain_exchange_name(input) + .map_err(fail_err("field exchange in method unbind")) + .map_err(failure)?; + let (input, routing_key) = domain_shortstr(input) + .map_err(fail_err("field routing-key in method unbind")) + .map_err(failure)?; + let (input, arguments) = domain_table(input) + .map_err(fail_err("field arguments in method unbind")) + .map_err(failure)?; + Ok(( + input, + Method::QueueUnbind { + reserved_1, + queue, + exchange, + routing_key, + arguments, + }, + )) + } + fn queue_unbind_ok(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(51_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + Ok((input, Method::QueueUnbindOk {})) + } + fn queue_purge(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(30_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + let (input, reserved_1) = domain_short(input) + .map_err(fail_err("field reserved-1 in method purge")) + .map_err(failure)?; + let (input, queue) = domain_queue_name(input) + .map_err(fail_err("field queue in method purge")) + .map_err(failure)?; + let (input, bits) = bit(input, 1) + .map_err(fail_err("field no-wait in method purge")) + .map_err(failure)?; + let no_wait = bits[0]; + Ok(( + input, + Method::QueuePurge { + reserved_1, + queue, + no_wait, + }, + )) + } + fn queue_purge_ok(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(31_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + let (input, message_count) = domain_message_count(input) + .map_err(fail_err("field message-count in method purge-ok")) + .map_err(failure)?; + Ok((input, Method::QueuePurgeOk { message_count })) + } + fn queue_delete(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(40_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + let (input, reserved_1) = domain_short(input) + .map_err(fail_err("field reserved-1 in method delete")) + .map_err(failure)?; + let (input, queue) = domain_queue_name(input) + .map_err(fail_err("field queue in method delete")) + .map_err(failure)?; + let (input, bits) = bit(input, 3) + .map_err(fail_err("field if-unused in method delete")) + .map_err(failure)?; + let if_unused = bits[0]; + let if_empty = bits[1]; + let no_wait = bits[2]; + Ok(( + input, + Method::QueueDelete { + reserved_1, + queue, + if_unused, + if_empty, + no_wait, + }, + )) + } + fn queue_delete_ok(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(41_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + let (input, message_count) = domain_message_count(input) + .map_err(fail_err("field message-count in method delete-ok")) + .map_err(failure)?; + Ok((input, Method::QueueDeleteOk { message_count })) + } + fn basic(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(60_u16.to_be_bytes())(input).map_err(fail_err("invalid tag for class basic"))?; + alt(( + basic_qos, + basic_qos_ok, + basic_consume, + basic_consume_ok, + basic_cancel, + basic_cancel_ok, + basic_publish, + basic_return, + basic_deliver, + basic_get, + basic_get_ok, + basic_get_empty, + basic_ack, + basic_reject, + basic_recover_async, + basic_recover, + basic_recover_ok, + ))(input) + .map_err(fail_err("class basic")) + } + fn basic_qos(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(10_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + let (input, prefetch_size) = domain_long(input) + .map_err(fail_err("field prefetch-size in method qos")) + .map_err(failure)?; + let (input, prefetch_count) = domain_short(input) + .map_err(fail_err("field prefetch-count in method qos")) + .map_err(failure)?; + let (input, bits) = bit(input, 1) + .map_err(fail_err("field global in method qos")) + .map_err(failure)?; + let global = bits[0]; + Ok(( + input, + Method::BasicQos { + prefetch_size, + prefetch_count, + global, + }, + )) + } + fn basic_qos_ok(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(11_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + Ok((input, Method::BasicQosOk {})) + } + fn basic_consume(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(20_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + let (input, reserved_1) = domain_short(input) + .map_err(fail_err("field reserved-1 in method consume")) + .map_err(failure)?; + let (input, queue) = domain_queue_name(input) + .map_err(fail_err("field queue in method consume")) + .map_err(failure)?; + let (input, consumer_tag) = domain_consumer_tag(input) + .map_err(fail_err("field consumer-tag in method consume")) + .map_err(failure)?; + let (input, bits) = bit(input, 4) + .map_err(fail_err("field no-local in method consume")) + .map_err(failure)?; + let no_local = bits[0]; + let no_ack = bits[1]; + let exclusive = bits[2]; + let no_wait = bits[3]; + let (input, arguments) = domain_table(input) + .map_err(fail_err("field arguments in method consume")) + .map_err(failure)?; + Ok(( + input, + Method::BasicConsume { + reserved_1, + queue, + consumer_tag, + no_local, + no_ack, + exclusive, + no_wait, + arguments, + }, + )) + } + fn basic_consume_ok(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(21_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + let (input, consumer_tag) = domain_consumer_tag(input) + .map_err(fail_err("field consumer-tag in method consume-ok")) + .map_err(failure)?; + Ok((input, Method::BasicConsumeOk { consumer_tag })) + } + fn basic_cancel(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(30_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + let (input, consumer_tag) = domain_consumer_tag(input) + .map_err(fail_err("field consumer-tag in method cancel")) + .map_err(failure)?; + let (input, bits) = bit(input, 1) + .map_err(fail_err("field no-wait in method cancel")) + .map_err(failure)?; + let no_wait = bits[0]; + Ok(( + input, + Method::BasicCancel { + consumer_tag, + no_wait, + }, + )) + } + fn basic_cancel_ok(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(31_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + let (input, consumer_tag) = domain_consumer_tag(input) + .map_err(fail_err("field consumer-tag in method cancel-ok")) + .map_err(failure)?; + Ok((input, Method::BasicCancelOk { consumer_tag })) + } + fn basic_publish(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(40_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + let (input, reserved_1) = domain_short(input) + .map_err(fail_err("field reserved-1 in method publish")) + .map_err(failure)?; + let (input, exchange) = domain_exchange_name(input) + .map_err(fail_err("field exchange in method publish")) + .map_err(failure)?; + let (input, routing_key) = domain_shortstr(input) + .map_err(fail_err("field routing-key in method publish")) + .map_err(failure)?; + let (input, bits) = bit(input, 2) + .map_err(fail_err("field mandatory in method publish")) + .map_err(failure)?; + let mandatory = bits[0]; + let immediate = bits[1]; + Ok(( + input, + Method::BasicPublish { + reserved_1, + exchange, + routing_key, + mandatory, + immediate, + }, + )) + } + fn basic_return(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(50_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + let (input, reply_code) = domain_reply_code(input) + .map_err(fail_err("field reply-code in method return")) + .map_err(failure)?; + let (input, reply_text) = domain_reply_text(input) + .map_err(fail_err("field reply-text in method return")) + .map_err(failure)?; + let (input, exchange) = domain_exchange_name(input) + .map_err(fail_err("field exchange in method return")) + .map_err(failure)?; + let (input, routing_key) = domain_shortstr(input) + .map_err(fail_err("field routing-key in method return")) + .map_err(failure)?; + Ok(( + input, + Method::BasicReturn { + reply_code, + reply_text, + exchange, + routing_key, + }, + )) + } + fn basic_deliver(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(60_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + let (input, consumer_tag) = domain_consumer_tag(input) + .map_err(fail_err("field consumer-tag in method deliver")) + .map_err(failure)?; + let (input, delivery_tag) = domain_delivery_tag(input) + .map_err(fail_err("field delivery-tag in method deliver")) + .map_err(failure)?; + let (input, bits) = bit(input, 1) + .map_err(fail_err("field redelivered in method deliver")) + .map_err(failure)?; + let redelivered = bits[0]; + let (input, exchange) = domain_exchange_name(input) + .map_err(fail_err("field exchange in method deliver")) + .map_err(failure)?; + let (input, routing_key) = domain_shortstr(input) + .map_err(fail_err("field routing-key in method deliver")) + .map_err(failure)?; + Ok(( + input, + Method::BasicDeliver { + consumer_tag, + delivery_tag, + redelivered, + exchange, + routing_key, + }, + )) + } + fn basic_get(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(70_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + let (input, reserved_1) = domain_short(input) + .map_err(fail_err("field reserved-1 in method get")) + .map_err(failure)?; + let (input, queue) = domain_queue_name(input) + .map_err(fail_err("field queue in method get")) + .map_err(failure)?; + let (input, bits) = bit(input, 1) + .map_err(fail_err("field no-ack in method get")) + .map_err(failure)?; + let no_ack = bits[0]; + Ok(( + input, + Method::BasicGet { + reserved_1, + queue, + no_ack, + }, + )) + } + fn basic_get_ok(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(71_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + let (input, delivery_tag) = domain_delivery_tag(input) + .map_err(fail_err("field delivery-tag in method get-ok")) + .map_err(failure)?; + let (input, bits) = bit(input, 1) + .map_err(fail_err("field redelivered in method get-ok")) + .map_err(failure)?; + let redelivered = bits[0]; + let (input, exchange) = domain_exchange_name(input) + .map_err(fail_err("field exchange in method get-ok")) + .map_err(failure)?; + let (input, routing_key) = domain_shortstr(input) + .map_err(fail_err("field routing-key in method get-ok")) + .map_err(failure)?; + let (input, message_count) = domain_message_count(input) + .map_err(fail_err("field message-count in method get-ok")) + .map_err(failure)?; + Ok(( + input, + Method::BasicGetOk { + delivery_tag, + redelivered, + exchange, + routing_key, + message_count, + }, + )) + } + fn basic_get_empty(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(72_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + let (input, reserved_1) = domain_shortstr(input) + .map_err(fail_err("field reserved-1 in method get-empty")) + .map_err(failure)?; + Ok((input, Method::BasicGetEmpty { reserved_1 })) + } + fn basic_ack(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(80_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + let (input, delivery_tag) = domain_delivery_tag(input) + .map_err(fail_err("field delivery-tag in method ack")) + .map_err(failure)?; + let (input, bits) = bit(input, 1) + .map_err(fail_err("field multiple in method ack")) + .map_err(failure)?; + let multiple = bits[0]; + Ok(( + input, + Method::BasicAck { + delivery_tag, + multiple, + }, + )) + } + fn basic_reject(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(90_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + let (input, delivery_tag) = domain_delivery_tag(input) + .map_err(fail_err("field delivery-tag in method reject")) + .map_err(failure)?; + let (input, bits) = bit(input, 1) + .map_err(fail_err("field requeue in method reject")) + .map_err(failure)?; + let requeue = bits[0]; + Ok(( + input, + Method::BasicReject { + delivery_tag, + requeue, + }, + )) + } + fn basic_recover_async(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(100_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + let (input, bits) = bit(input, 1) + .map_err(fail_err("field requeue in method recover-async")) + .map_err(failure)?; + let requeue = bits[0]; + Ok((input, Method::BasicRecoverAsync { requeue })) + } + fn basic_recover(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(110_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + let (input, bits) = bit(input, 1) + .map_err(fail_err("field requeue in method recover")) + .map_err(failure)?; + let requeue = bits[0]; + Ok((input, Method::BasicRecover { requeue })) + } + fn basic_recover_ok(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(111_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + Ok((input, Method::BasicRecoverOk {})) + } + fn tx(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(90_u16.to_be_bytes())(input).map_err(fail_err("invalid tag for class tx"))?; + alt(( + tx_select, + tx_select_ok, + tx_commit, + tx_commit_ok, + tx_rollback, + tx_rollback_ok, + ))(input) + .map_err(fail_err("class tx")) + } + fn tx_select(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(10_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + Ok((input, Method::TxSelect {})) + } + fn tx_select_ok(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(11_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + Ok((input, Method::TxSelectOk {})) + } + fn tx_commit(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(20_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + Ok((input, Method::TxCommit {})) + } + fn tx_commit_ok(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(21_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + Ok((input, Method::TxCommitOk {})) + } + fn tx_rollback(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(30_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + Ok((input, Method::TxRollback {})) + } + fn tx_rollback_ok(input: &[u8]) -> IResult<'_, Method> { + let (input, _) = + tag(31_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?; + Ok((input, Method::TxRollbackOk {})) + } } pub mod write { -use super::*; -use crate::classes::write_helper::*; -use crate::error::TransError; -use std::io::Write; + use super::*; + use crate::classes::write_helper::*; + use crate::error::TransError; + use std::io::Write; -pub fn write_method(class: Class, mut writer: W) -> Result<(), TransError> { - match class { - Class::Connection(Connection::Start { - version_major, - version_minor, - server_properties, - mechanisms, - locales, - }) => { - writer.write_all(&[0, 10, 0, 10])?; - octet(version_major, &mut writer)?; - octet(version_minor, &mut writer)?; - table(server_properties, &mut writer)?; - longstr(mechanisms, &mut writer)?; - longstr(locales, &mut writer)?; - } - Class::Connection(Connection::StartOk { - client_properties, - mechanism, - response, - locale, - }) => { - writer.write_all(&[0, 10, 0, 11])?; - table(client_properties, &mut writer)?; - shortstr(mechanism, &mut writer)?; - longstr(response, &mut writer)?; - shortstr(locale, &mut writer)?; - } - Class::Connection(Connection::Secure { - challenge, - }) => { - writer.write_all(&[0, 10, 0, 20])?; - longstr(challenge, &mut writer)?; - } - Class::Connection(Connection::SecureOk { - response, - }) => { - writer.write_all(&[0, 10, 0, 21])?; - longstr(response, &mut writer)?; - } - Class::Connection(Connection::Tune { - channel_max, - frame_max, - heartbeat, - }) => { - writer.write_all(&[0, 10, 0, 30])?; - short(channel_max, &mut writer)?; - long(frame_max, &mut writer)?; - short(heartbeat, &mut writer)?; - } - Class::Connection(Connection::TuneOk { - channel_max, - frame_max, - heartbeat, - }) => { - writer.write_all(&[0, 10, 0, 31])?; - short(channel_max, &mut writer)?; - long(frame_max, &mut writer)?; - short(heartbeat, &mut writer)?; - } - Class::Connection(Connection::Open { - virtual_host, - reserved_1, - reserved_2, - }) => { - writer.write_all(&[0, 10, 0, 40])?; - shortstr(virtual_host, &mut writer)?; - shortstr(reserved_1, &mut writer)?; - bit(&[reserved_2, ], &mut writer)?; - } - Class::Connection(Connection::OpenOk { - reserved_1, - }) => { - writer.write_all(&[0, 10, 0, 41])?; - shortstr(reserved_1, &mut writer)?; - } - Class::Connection(Connection::Close { - reply_code, - reply_text, - class_id, - method_id, - }) => { - writer.write_all(&[0, 10, 0, 50])?; - short(reply_code, &mut writer)?; - shortstr(reply_text, &mut writer)?; - short(class_id, &mut writer)?; - short(method_id, &mut writer)?; - } - Class::Connection(Connection::CloseOk { - }) => { - writer.write_all(&[0, 10, 0, 51])?; - } - Class::Channel(Channel::Open { - reserved_1, - }) => { - writer.write_all(&[0, 20, 0, 10])?; - shortstr(reserved_1, &mut writer)?; - } - Class::Channel(Channel::OpenOk { - reserved_1, - }) => { - writer.write_all(&[0, 20, 0, 11])?; - longstr(reserved_1, &mut writer)?; - } - Class::Channel(Channel::Flow { - active, - }) => { - writer.write_all(&[0, 20, 0, 20])?; - bit(&[active, ], &mut writer)?; - } - Class::Channel(Channel::FlowOk { - active, - }) => { - writer.write_all(&[0, 20, 0, 21])?; - bit(&[active, ], &mut writer)?; - } - Class::Channel(Channel::Close { - reply_code, - reply_text, - class_id, - method_id, - }) => { - writer.write_all(&[0, 20, 0, 40])?; - short(reply_code, &mut writer)?; - shortstr(reply_text, &mut writer)?; - short(class_id, &mut writer)?; - short(method_id, &mut writer)?; - } - Class::Channel(Channel::CloseOk { - }) => { - writer.write_all(&[0, 20, 0, 41])?; - } - Class::Exchange(Exchange::Declare { - reserved_1, - exchange, - r#type, - passive, - durable, - reserved_2, - reserved_3, - no_wait, - arguments, - }) => { - writer.write_all(&[0, 40, 0, 10])?; - short(reserved_1, &mut writer)?; - shortstr(exchange, &mut writer)?; - shortstr(r#type, &mut writer)?; - bit(&[passive, durable, reserved_2, reserved_3, no_wait, ], &mut writer)?; - table(arguments, &mut writer)?; - } - Class::Exchange(Exchange::DeclareOk { - }) => { - writer.write_all(&[0, 40, 0, 11])?; - } - Class::Exchange(Exchange::Delete { - reserved_1, - exchange, - if_unused, - no_wait, - }) => { - writer.write_all(&[0, 40, 0, 20])?; - short(reserved_1, &mut writer)?; - shortstr(exchange, &mut writer)?; - bit(&[if_unused, no_wait, ], &mut writer)?; - } - Class::Exchange(Exchange::DeleteOk { - }) => { - writer.write_all(&[0, 40, 0, 21])?; - } - Class::Queue(Queue::Declare { - reserved_1, - queue, - passive, - durable, - exclusive, - auto_delete, - no_wait, - arguments, - }) => { - writer.write_all(&[0, 50, 0, 10])?; - short(reserved_1, &mut writer)?; - shortstr(queue, &mut writer)?; - bit(&[passive, durable, exclusive, auto_delete, no_wait, ], &mut writer)?; - table(arguments, &mut writer)?; - } - Class::Queue(Queue::DeclareOk { - queue, - message_count, - consumer_count, - }) => { - writer.write_all(&[0, 50, 0, 11])?; - shortstr(queue, &mut writer)?; - long(message_count, &mut writer)?; - long(consumer_count, &mut writer)?; - } - Class::Queue(Queue::Bind { - reserved_1, - queue, - exchange, - routing_key, - no_wait, - arguments, - }) => { - writer.write_all(&[0, 50, 0, 20])?; - short(reserved_1, &mut writer)?; - shortstr(queue, &mut writer)?; - shortstr(exchange, &mut writer)?; - shortstr(routing_key, &mut writer)?; - bit(&[no_wait, ], &mut writer)?; - table(arguments, &mut writer)?; - } - Class::Queue(Queue::BindOk { - }) => { - writer.write_all(&[0, 50, 0, 21])?; - } - Class::Queue(Queue::Unbind { - reserved_1, - queue, - exchange, - routing_key, - arguments, - }) => { - writer.write_all(&[0, 50, 0, 50])?; - short(reserved_1, &mut writer)?; - shortstr(queue, &mut writer)?; - shortstr(exchange, &mut writer)?; - shortstr(routing_key, &mut writer)?; - table(arguments, &mut writer)?; - } - Class::Queue(Queue::UnbindOk { - }) => { - writer.write_all(&[0, 50, 0, 51])?; - } - Class::Queue(Queue::Purge { - reserved_1, - queue, - no_wait, - }) => { - writer.write_all(&[0, 50, 0, 30])?; - short(reserved_1, &mut writer)?; - shortstr(queue, &mut writer)?; - bit(&[no_wait, ], &mut writer)?; - } - Class::Queue(Queue::PurgeOk { - message_count, - }) => { - writer.write_all(&[0, 50, 0, 31])?; - long(message_count, &mut writer)?; - } - Class::Queue(Queue::Delete { - reserved_1, - queue, - if_unused, - if_empty, - no_wait, - }) => { - writer.write_all(&[0, 50, 0, 40])?; - short(reserved_1, &mut writer)?; - shortstr(queue, &mut writer)?; - bit(&[if_unused, if_empty, no_wait, ], &mut writer)?; - } - Class::Queue(Queue::DeleteOk { - message_count, - }) => { - writer.write_all(&[0, 50, 0, 41])?; - long(message_count, &mut writer)?; - } - Class::Basic(Basic::Qos { - prefetch_size, - prefetch_count, - global, - }) => { - writer.write_all(&[0, 60, 0, 10])?; - long(prefetch_size, &mut writer)?; - short(prefetch_count, &mut writer)?; - bit(&[global, ], &mut writer)?; - } - Class::Basic(Basic::QosOk { - }) => { - writer.write_all(&[0, 60, 0, 11])?; - } - Class::Basic(Basic::Consume { - reserved_1, - queue, - consumer_tag, - no_local, - no_ack, - exclusive, - no_wait, - arguments, - }) => { - writer.write_all(&[0, 60, 0, 20])?; - short(reserved_1, &mut writer)?; - shortstr(queue, &mut writer)?; - shortstr(consumer_tag, &mut writer)?; - bit(&[no_local, no_ack, exclusive, no_wait, ], &mut writer)?; - table(arguments, &mut writer)?; - } - Class::Basic(Basic::ConsumeOk { - consumer_tag, - }) => { - writer.write_all(&[0, 60, 0, 21])?; - shortstr(consumer_tag, &mut writer)?; - } - Class::Basic(Basic::Cancel { - consumer_tag, - no_wait, - }) => { - writer.write_all(&[0, 60, 0, 30])?; - shortstr(consumer_tag, &mut writer)?; - bit(&[no_wait, ], &mut writer)?; - } - Class::Basic(Basic::CancelOk { - consumer_tag, - }) => { - writer.write_all(&[0, 60, 0, 31])?; - shortstr(consumer_tag, &mut writer)?; - } - Class::Basic(Basic::Publish { - reserved_1, - exchange, - routing_key, - mandatory, - immediate, - }) => { - writer.write_all(&[0, 60, 0, 40])?; - short(reserved_1, &mut writer)?; - shortstr(exchange, &mut writer)?; - shortstr(routing_key, &mut writer)?; - bit(&[mandatory, immediate, ], &mut writer)?; - } - Class::Basic(Basic::Return { - reply_code, - reply_text, - exchange, - routing_key, - }) => { - writer.write_all(&[0, 60, 0, 50])?; - short(reply_code, &mut writer)?; - shortstr(reply_text, &mut writer)?; - shortstr(exchange, &mut writer)?; - shortstr(routing_key, &mut writer)?; - } - Class::Basic(Basic::Deliver { - consumer_tag, - delivery_tag, - redelivered, - exchange, - routing_key, - }) => { - writer.write_all(&[0, 60, 0, 60])?; - shortstr(consumer_tag, &mut writer)?; - longlong(delivery_tag, &mut writer)?; - bit(&[redelivered, ], &mut writer)?; - shortstr(exchange, &mut writer)?; - shortstr(routing_key, &mut writer)?; - } - Class::Basic(Basic::Get { - reserved_1, - queue, - no_ack, - }) => { - writer.write_all(&[0, 60, 0, 70])?; - short(reserved_1, &mut writer)?; - shortstr(queue, &mut writer)?; - bit(&[no_ack, ], &mut writer)?; - } - Class::Basic(Basic::GetOk { - delivery_tag, - redelivered, - exchange, - routing_key, - message_count, - }) => { - writer.write_all(&[0, 60, 0, 71])?; - longlong(delivery_tag, &mut writer)?; - bit(&[redelivered, ], &mut writer)?; - shortstr(exchange, &mut writer)?; - shortstr(routing_key, &mut writer)?; - long(message_count, &mut writer)?; - } - Class::Basic(Basic::GetEmpty { - reserved_1, - }) => { - writer.write_all(&[0, 60, 0, 72])?; - shortstr(reserved_1, &mut writer)?; - } - Class::Basic(Basic::Ack { - delivery_tag, - multiple, - }) => { - writer.write_all(&[0, 60, 0, 80])?; - longlong(delivery_tag, &mut writer)?; - bit(&[multiple, ], &mut writer)?; - } - Class::Basic(Basic::Reject { - delivery_tag, - requeue, - }) => { - writer.write_all(&[0, 60, 0, 90])?; - longlong(delivery_tag, &mut writer)?; - bit(&[requeue, ], &mut writer)?; - } - Class::Basic(Basic::RecoverAsync { - requeue, - }) => { - writer.write_all(&[0, 60, 0, 100])?; - bit(&[requeue, ], &mut writer)?; - } - Class::Basic(Basic::Recover { - requeue, - }) => { - writer.write_all(&[0, 60, 0, 110])?; - bit(&[requeue, ], &mut writer)?; - } - Class::Basic(Basic::RecoverOk { - }) => { - writer.write_all(&[0, 60, 0, 111])?; - } - Class::Tx(Tx::Select { - }) => { - writer.write_all(&[0, 90, 0, 10])?; - } - Class::Tx(Tx::SelectOk { - }) => { - writer.write_all(&[0, 90, 0, 11])?; - } - Class::Tx(Tx::Commit { - }) => { - writer.write_all(&[0, 90, 0, 20])?; - } - Class::Tx(Tx::CommitOk { - }) => { - writer.write_all(&[0, 90, 0, 21])?; - } - Class::Tx(Tx::Rollback { - }) => { - writer.write_all(&[0, 90, 0, 30])?; - } - Class::Tx(Tx::RollbackOk { - }) => { - writer.write_all(&[0, 90, 0, 31])?; + pub fn write_method(class: Method, mut writer: W) -> Result<(), TransError> { + match class { + Method::ConnectionStart { + version_major, + version_minor, + server_properties, + mechanisms, + locales, + } => { + writer.write_all(&[0, 10, 0, 10])?; + octet(version_major, &mut writer)?; + octet(version_minor, &mut writer)?; + table(server_properties, &mut writer)?; + longstr(mechanisms, &mut writer)?; + longstr(locales, &mut writer)?; + } + Method::ConnectionStartOk { + client_properties, + mechanism, + response, + locale, + } => { + writer.write_all(&[0, 10, 0, 11])?; + table(client_properties, &mut writer)?; + shortstr(mechanism, &mut writer)?; + longstr(response, &mut writer)?; + shortstr(locale, &mut writer)?; + } + Method::ConnectionSecure { challenge } => { + writer.write_all(&[0, 10, 0, 20])?; + longstr(challenge, &mut writer)?; + } + Method::ConnectionSecureOk { response } => { + writer.write_all(&[0, 10, 0, 21])?; + longstr(response, &mut writer)?; + } + Method::ConnectionTune { + channel_max, + frame_max, + heartbeat, + } => { + writer.write_all(&[0, 10, 0, 30])?; + short(channel_max, &mut writer)?; + long(frame_max, &mut writer)?; + short(heartbeat, &mut writer)?; + } + Method::ConnectionTuneOk { + channel_max, + frame_max, + heartbeat, + } => { + writer.write_all(&[0, 10, 0, 31])?; + short(channel_max, &mut writer)?; + long(frame_max, &mut writer)?; + short(heartbeat, &mut writer)?; + } + Method::ConnectionOpen { + virtual_host, + reserved_1, + reserved_2, + } => { + writer.write_all(&[0, 10, 0, 40])?; + shortstr(virtual_host, &mut writer)?; + shortstr(reserved_1, &mut writer)?; + bit(&[reserved_2], &mut writer)?; + } + Method::ConnectionOpenOk { reserved_1 } => { + writer.write_all(&[0, 10, 0, 41])?; + shortstr(reserved_1, &mut writer)?; + } + Method::ConnectionClose { + reply_code, + reply_text, + class_id, + method_id, + } => { + writer.write_all(&[0, 10, 0, 50])?; + short(reply_code, &mut writer)?; + shortstr(reply_text, &mut writer)?; + short(class_id, &mut writer)?; + short(method_id, &mut writer)?; + } + Method::ConnectionCloseOk {} => { + writer.write_all(&[0, 10, 0, 51])?; + } + Method::ChannelOpen { reserved_1 } => { + writer.write_all(&[0, 20, 0, 10])?; + shortstr(reserved_1, &mut writer)?; + } + Method::ChannelOpenOk { reserved_1 } => { + writer.write_all(&[0, 20, 0, 11])?; + longstr(reserved_1, &mut writer)?; + } + Method::ChannelFlow { active } => { + writer.write_all(&[0, 20, 0, 20])?; + bit(&[active], &mut writer)?; + } + Method::ChannelFlowOk { active } => { + writer.write_all(&[0, 20, 0, 21])?; + bit(&[active], &mut writer)?; + } + Method::ChannelClose { + reply_code, + reply_text, + class_id, + method_id, + } => { + writer.write_all(&[0, 20, 0, 40])?; + short(reply_code, &mut writer)?; + shortstr(reply_text, &mut writer)?; + short(class_id, &mut writer)?; + short(method_id, &mut writer)?; + } + Method::ChannelCloseOk {} => { + writer.write_all(&[0, 20, 0, 41])?; + } + Method::ExchangeDeclare { + reserved_1, + exchange, + r#type, + passive, + durable, + reserved_2, + reserved_3, + no_wait, + arguments, + } => { + writer.write_all(&[0, 40, 0, 10])?; + short(reserved_1, &mut writer)?; + shortstr(exchange, &mut writer)?; + shortstr(r#type, &mut writer)?; + bit( + &[passive, durable, reserved_2, reserved_3, no_wait], + &mut writer, + )?; + table(arguments, &mut writer)?; + } + Method::ExchangeDeclareOk {} => { + writer.write_all(&[0, 40, 0, 11])?; + } + Method::ExchangeDelete { + reserved_1, + exchange, + if_unused, + no_wait, + } => { + writer.write_all(&[0, 40, 0, 20])?; + short(reserved_1, &mut writer)?; + shortstr(exchange, &mut writer)?; + bit(&[if_unused, no_wait], &mut writer)?; + } + Method::ExchangeDeleteOk {} => { + writer.write_all(&[0, 40, 0, 21])?; + } + Method::QueueDeclare { + reserved_1, + queue, + passive, + durable, + exclusive, + auto_delete, + no_wait, + arguments, + } => { + writer.write_all(&[0, 50, 0, 10])?; + short(reserved_1, &mut writer)?; + shortstr(queue, &mut writer)?; + bit( + &[passive, durable, exclusive, auto_delete, no_wait], + &mut writer, + )?; + table(arguments, &mut writer)?; + } + Method::QueueDeclareOk { + queue, + message_count, + consumer_count, + } => { + writer.write_all(&[0, 50, 0, 11])?; + shortstr(queue, &mut writer)?; + long(message_count, &mut writer)?; + long(consumer_count, &mut writer)?; + } + Method::QueueBind { + reserved_1, + queue, + exchange, + routing_key, + no_wait, + arguments, + } => { + writer.write_all(&[0, 50, 0, 20])?; + short(reserved_1, &mut writer)?; + shortstr(queue, &mut writer)?; + shortstr(exchange, &mut writer)?; + shortstr(routing_key, &mut writer)?; + bit(&[no_wait], &mut writer)?; + table(arguments, &mut writer)?; + } + Method::QueueBindOk {} => { + writer.write_all(&[0, 50, 0, 21])?; + } + Method::QueueUnbind { + reserved_1, + queue, + exchange, + routing_key, + arguments, + } => { + writer.write_all(&[0, 50, 0, 50])?; + short(reserved_1, &mut writer)?; + shortstr(queue, &mut writer)?; + shortstr(exchange, &mut writer)?; + shortstr(routing_key, &mut writer)?; + table(arguments, &mut writer)?; + } + Method::QueueUnbindOk {} => { + writer.write_all(&[0, 50, 0, 51])?; + } + Method::QueuePurge { + reserved_1, + queue, + no_wait, + } => { + writer.write_all(&[0, 50, 0, 30])?; + short(reserved_1, &mut writer)?; + shortstr(queue, &mut writer)?; + bit(&[no_wait], &mut writer)?; + } + Method::QueuePurgeOk { message_count } => { + writer.write_all(&[0, 50, 0, 31])?; + long(message_count, &mut writer)?; + } + Method::QueueDelete { + reserved_1, + queue, + if_unused, + if_empty, + no_wait, + } => { + writer.write_all(&[0, 50, 0, 40])?; + short(reserved_1, &mut writer)?; + shortstr(queue, &mut writer)?; + bit(&[if_unused, if_empty, no_wait], &mut writer)?; + } + Method::QueueDeleteOk { message_count } => { + writer.write_all(&[0, 50, 0, 41])?; + long(message_count, &mut writer)?; + } + Method::BasicQos { + prefetch_size, + prefetch_count, + global, + } => { + writer.write_all(&[0, 60, 0, 10])?; + long(prefetch_size, &mut writer)?; + short(prefetch_count, &mut writer)?; + bit(&[global], &mut writer)?; + } + Method::BasicQosOk {} => { + writer.write_all(&[0, 60, 0, 11])?; + } + Method::BasicConsume { + reserved_1, + queue, + consumer_tag, + no_local, + no_ack, + exclusive, + no_wait, + arguments, + } => { + writer.write_all(&[0, 60, 0, 20])?; + short(reserved_1, &mut writer)?; + shortstr(queue, &mut writer)?; + shortstr(consumer_tag, &mut writer)?; + bit(&[no_local, no_ack, exclusive, no_wait], &mut writer)?; + table(arguments, &mut writer)?; + } + Method::BasicConsumeOk { consumer_tag } => { + writer.write_all(&[0, 60, 0, 21])?; + shortstr(consumer_tag, &mut writer)?; + } + Method::BasicCancel { + consumer_tag, + no_wait, + } => { + writer.write_all(&[0, 60, 0, 30])?; + shortstr(consumer_tag, &mut writer)?; + bit(&[no_wait], &mut writer)?; + } + Method::BasicCancelOk { consumer_tag } => { + writer.write_all(&[0, 60, 0, 31])?; + shortstr(consumer_tag, &mut writer)?; + } + Method::BasicPublish { + reserved_1, + exchange, + routing_key, + mandatory, + immediate, + } => { + writer.write_all(&[0, 60, 0, 40])?; + short(reserved_1, &mut writer)?; + shortstr(exchange, &mut writer)?; + shortstr(routing_key, &mut writer)?; + bit(&[mandatory, immediate], &mut writer)?; + } + Method::BasicReturn { + reply_code, + reply_text, + exchange, + routing_key, + } => { + writer.write_all(&[0, 60, 0, 50])?; + short(reply_code, &mut writer)?; + shortstr(reply_text, &mut writer)?; + shortstr(exchange, &mut writer)?; + shortstr(routing_key, &mut writer)?; + } + Method::BasicDeliver { + consumer_tag, + delivery_tag, + redelivered, + exchange, + routing_key, + } => { + writer.write_all(&[0, 60, 0, 60])?; + shortstr(consumer_tag, &mut writer)?; + longlong(delivery_tag, &mut writer)?; + bit(&[redelivered], &mut writer)?; + shortstr(exchange, &mut writer)?; + shortstr(routing_key, &mut writer)?; + } + Method::BasicGet { + reserved_1, + queue, + no_ack, + } => { + writer.write_all(&[0, 60, 0, 70])?; + short(reserved_1, &mut writer)?; + shortstr(queue, &mut writer)?; + bit(&[no_ack], &mut writer)?; + } + Method::BasicGetOk { + delivery_tag, + redelivered, + exchange, + routing_key, + message_count, + } => { + writer.write_all(&[0, 60, 0, 71])?; + longlong(delivery_tag, &mut writer)?; + bit(&[redelivered], &mut writer)?; + shortstr(exchange, &mut writer)?; + shortstr(routing_key, &mut writer)?; + long(message_count, &mut writer)?; + } + Method::BasicGetEmpty { reserved_1 } => { + writer.write_all(&[0, 60, 0, 72])?; + shortstr(reserved_1, &mut writer)?; + } + Method::BasicAck { + delivery_tag, + multiple, + } => { + writer.write_all(&[0, 60, 0, 80])?; + longlong(delivery_tag, &mut writer)?; + bit(&[multiple], &mut writer)?; + } + Method::BasicReject { + delivery_tag, + requeue, + } => { + writer.write_all(&[0, 60, 0, 90])?; + longlong(delivery_tag, &mut writer)?; + bit(&[requeue], &mut writer)?; + } + Method::BasicRecoverAsync { requeue } => { + writer.write_all(&[0, 60, 0, 100])?; + bit(&[requeue], &mut writer)?; + } + Method::BasicRecover { requeue } => { + writer.write_all(&[0, 60, 0, 110])?; + bit(&[requeue], &mut writer)?; + } + Method::BasicRecoverOk {} => { + writer.write_all(&[0, 60, 0, 111])?; + } + Method::TxSelect {} => { + writer.write_all(&[0, 90, 0, 10])?; + } + Method::TxSelectOk {} => { + writer.write_all(&[0, 90, 0, 11])?; + } + Method::TxCommit {} => { + writer.write_all(&[0, 90, 0, 20])?; + } + Method::TxCommitOk {} => { + writer.write_all(&[0, 90, 0, 21])?; + } + Method::TxRollback {} => { + writer.write_all(&[0, 90, 0, 30])?; + } + Method::TxRollbackOk {} => { + writer.write_all(&[0, 90, 0, 31])?; + } } + Ok(()) } - Ok(()) -} } #[cfg(test)] mod random { -use rand::Rng; -use crate::classes::tests::RandomMethod; -use super::*; + use super::*; + use crate::classes::tests::RandomMethod; + use rand::Rng; -impl RandomMethod for Class { - #[allow(unused_variables)] - fn random(rng: &mut R) -> Self { - match rng.gen_range(0u32..6) { - 0 => Class::Connection(Connection::random(rng)), - 1 => Class::Channel(Channel::random(rng)), - 2 => Class::Exchange(Exchange::random(rng)), - 3 => Class::Queue(Queue::random(rng)), - 4 => Class::Basic(Basic::random(rng)), - 5 => Class::Tx(Tx::random(rng)), - _ => unreachable!(), + impl RandomMethod for Method { + #[allow(unused_variables)] + fn random(rng: &mut R) -> Self { + match rng.gen_range(0u32..6) { + 0 => match rng.gen_range(0u32..10) { + 0 => Method::ConnectionStart { + version_major: RandomMethod::random(rng), + version_minor: RandomMethod::random(rng), + server_properties: RandomMethod::random(rng), + mechanisms: RandomMethod::random(rng), + locales: RandomMethod::random(rng), + }, + 1 => Method::ConnectionStartOk { + client_properties: RandomMethod::random(rng), + mechanism: RandomMethod::random(rng), + response: RandomMethod::random(rng), + locale: RandomMethod::random(rng), + }, + 2 => Method::ConnectionSecure { + challenge: RandomMethod::random(rng), + }, + 3 => Method::ConnectionSecureOk { + response: RandomMethod::random(rng), + }, + 4 => Method::ConnectionTune { + channel_max: RandomMethod::random(rng), + frame_max: RandomMethod::random(rng), + heartbeat: RandomMethod::random(rng), + }, + 5 => Method::ConnectionTuneOk { + channel_max: RandomMethod::random(rng), + frame_max: RandomMethod::random(rng), + heartbeat: RandomMethod::random(rng), + }, + 6 => Method::ConnectionOpen { + virtual_host: RandomMethod::random(rng), + reserved_1: RandomMethod::random(rng), + reserved_2: RandomMethod::random(rng), + }, + 7 => Method::ConnectionOpenOk { + reserved_1: RandomMethod::random(rng), + }, + 8 => Method::ConnectionClose { + reply_code: RandomMethod::random(rng), + reply_text: RandomMethod::random(rng), + class_id: RandomMethod::random(rng), + method_id: RandomMethod::random(rng), + }, + 9 => Method::ConnectionCloseOk {}, + _ => unreachable!(), + }, + 1 => match rng.gen_range(0u32..6) { + 0 => Method::ChannelOpen { + reserved_1: RandomMethod::random(rng), + }, + 1 => Method::ChannelOpenOk { + reserved_1: RandomMethod::random(rng), + }, + 2 => Method::ChannelFlow { + active: RandomMethod::random(rng), + }, + 3 => Method::ChannelFlowOk { + active: RandomMethod::random(rng), + }, + 4 => Method::ChannelClose { + reply_code: RandomMethod::random(rng), + reply_text: RandomMethod::random(rng), + class_id: RandomMethod::random(rng), + method_id: RandomMethod::random(rng), + }, + 5 => Method::ChannelCloseOk {}, + _ => unreachable!(), + }, + 2 => match rng.gen_range(0u32..4) { + 0 => Method::ExchangeDeclare { + reserved_1: RandomMethod::random(rng), + exchange: RandomMethod::random(rng), + r#type: RandomMethod::random(rng), + passive: RandomMethod::random(rng), + durable: RandomMethod::random(rng), + reserved_2: RandomMethod::random(rng), + reserved_3: RandomMethod::random(rng), + no_wait: RandomMethod::random(rng), + arguments: RandomMethod::random(rng), + }, + 1 => Method::ExchangeDeclareOk {}, + 2 => Method::ExchangeDelete { + reserved_1: RandomMethod::random(rng), + exchange: RandomMethod::random(rng), + if_unused: RandomMethod::random(rng), + no_wait: RandomMethod::random(rng), + }, + 3 => Method::ExchangeDeleteOk {}, + _ => unreachable!(), + }, + 3 => match rng.gen_range(0u32..10) { + 0 => Method::QueueDeclare { + reserved_1: RandomMethod::random(rng), + queue: RandomMethod::random(rng), + passive: RandomMethod::random(rng), + durable: RandomMethod::random(rng), + exclusive: RandomMethod::random(rng), + auto_delete: RandomMethod::random(rng), + no_wait: RandomMethod::random(rng), + arguments: RandomMethod::random(rng), + }, + 1 => Method::QueueDeclareOk { + queue: RandomMethod::random(rng), + message_count: RandomMethod::random(rng), + consumer_count: RandomMethod::random(rng), + }, + 2 => Method::QueueBind { + reserved_1: RandomMethod::random(rng), + queue: RandomMethod::random(rng), + exchange: RandomMethod::random(rng), + routing_key: RandomMethod::random(rng), + no_wait: RandomMethod::random(rng), + arguments: RandomMethod::random(rng), + }, + 3 => Method::QueueBindOk {}, + 4 => Method::QueueUnbind { + reserved_1: RandomMethod::random(rng), + queue: RandomMethod::random(rng), + exchange: RandomMethod::random(rng), + routing_key: RandomMethod::random(rng), + arguments: RandomMethod::random(rng), + }, + 5 => Method::QueueUnbindOk {}, + 6 => Method::QueuePurge { + reserved_1: RandomMethod::random(rng), + queue: RandomMethod::random(rng), + no_wait: RandomMethod::random(rng), + }, + 7 => Method::QueuePurgeOk { + message_count: RandomMethod::random(rng), + }, + 8 => Method::QueueDelete { + reserved_1: RandomMethod::random(rng), + queue: RandomMethod::random(rng), + if_unused: RandomMethod::random(rng), + if_empty: RandomMethod::random(rng), + no_wait: RandomMethod::random(rng), + }, + 9 => Method::QueueDeleteOk { + message_count: RandomMethod::random(rng), + }, + _ => unreachable!(), + }, + 4 => match rng.gen_range(0u32..17) { + 0 => Method::BasicQos { + prefetch_size: RandomMethod::random(rng), + prefetch_count: RandomMethod::random(rng), + global: RandomMethod::random(rng), + }, + 1 => Method::BasicQosOk {}, + 2 => Method::BasicConsume { + reserved_1: RandomMethod::random(rng), + queue: RandomMethod::random(rng), + consumer_tag: RandomMethod::random(rng), + no_local: RandomMethod::random(rng), + no_ack: RandomMethod::random(rng), + exclusive: RandomMethod::random(rng), + no_wait: RandomMethod::random(rng), + arguments: RandomMethod::random(rng), + }, + 3 => Method::BasicConsumeOk { + consumer_tag: RandomMethod::random(rng), + }, + 4 => Method::BasicCancel { + consumer_tag: RandomMethod::random(rng), + no_wait: RandomMethod::random(rng), + }, + 5 => Method::BasicCancelOk { + consumer_tag: RandomMethod::random(rng), + }, + 6 => Method::BasicPublish { + reserved_1: RandomMethod::random(rng), + exchange: RandomMethod::random(rng), + routing_key: RandomMethod::random(rng), + mandatory: RandomMethod::random(rng), + immediate: RandomMethod::random(rng), + }, + 7 => Method::BasicReturn { + reply_code: RandomMethod::random(rng), + reply_text: RandomMethod::random(rng), + exchange: RandomMethod::random(rng), + routing_key: RandomMethod::random(rng), + }, + 8 => Method::BasicDeliver { + consumer_tag: RandomMethod::random(rng), + delivery_tag: RandomMethod::random(rng), + redelivered: RandomMethod::random(rng), + exchange: RandomMethod::random(rng), + routing_key: RandomMethod::random(rng), + }, + 9 => Method::BasicGet { + reserved_1: RandomMethod::random(rng), + queue: RandomMethod::random(rng), + no_ack: RandomMethod::random(rng), + }, + 10 => Method::BasicGetOk { + delivery_tag: RandomMethod::random(rng), + redelivered: RandomMethod::random(rng), + exchange: RandomMethod::random(rng), + routing_key: RandomMethod::random(rng), + message_count: RandomMethod::random(rng), + }, + 11 => Method::BasicGetEmpty { + reserved_1: RandomMethod::random(rng), + }, + 12 => Method::BasicAck { + delivery_tag: RandomMethod::random(rng), + multiple: RandomMethod::random(rng), + }, + 13 => Method::BasicReject { + delivery_tag: RandomMethod::random(rng), + requeue: RandomMethod::random(rng), + }, + 14 => Method::BasicRecoverAsync { + requeue: RandomMethod::random(rng), + }, + 15 => Method::BasicRecover { + requeue: RandomMethod::random(rng), + }, + 16 => Method::BasicRecoverOk {}, + _ => unreachable!(), + }, + 5 => match rng.gen_range(0u32..6) { + 0 => Method::TxSelect {}, + 1 => Method::TxSelectOk {}, + 2 => Method::TxCommit {}, + 3 => Method::TxCommitOk {}, + 4 => Method::TxRollback {}, + 5 => Method::TxRollbackOk {}, + _ => unreachable!(), + }, + _ => unreachable!(), } + } } } -impl RandomMethod for Connection { - #[allow(unused_variables)] - fn random(rng: &mut R) -> Self { - match rng.gen_range(0u32..10) { - 0 => Connection::Start { - version_major: RandomMethod::random(rng), - version_minor: RandomMethod::random(rng), - server_properties: RandomMethod::random(rng), - mechanisms: RandomMethod::random(rng), - locales: RandomMethod::random(rng), - }, - 1 => Connection::StartOk { - client_properties: RandomMethod::random(rng), - mechanism: RandomMethod::random(rng), - response: RandomMethod::random(rng), - locale: RandomMethod::random(rng), - }, - 2 => Connection::Secure { - challenge: RandomMethod::random(rng), - }, - 3 => Connection::SecureOk { - response: RandomMethod::random(rng), - }, - 4 => Connection::Tune { - channel_max: RandomMethod::random(rng), - frame_max: RandomMethod::random(rng), - heartbeat: RandomMethod::random(rng), - }, - 5 => Connection::TuneOk { - channel_max: RandomMethod::random(rng), - frame_max: RandomMethod::random(rng), - heartbeat: RandomMethod::random(rng), - }, - 6 => Connection::Open { - virtual_host: RandomMethod::random(rng), - reserved_1: RandomMethod::random(rng), - reserved_2: RandomMethod::random(rng), - }, - 7 => Connection::OpenOk { - reserved_1: RandomMethod::random(rng), - }, - 8 => Connection::Close { - reply_code: RandomMethod::random(rng), - reply_text: RandomMethod::random(rng), - class_id: RandomMethod::random(rng), - method_id: RandomMethod::random(rng), - }, - 9 => Connection::CloseOk { - }, - _ => unreachable!(), - } - } -} -impl RandomMethod for Channel { - #[allow(unused_variables)] - fn random(rng: &mut R) -> Self { - match rng.gen_range(0u32..6) { - 0 => Channel::Open { - reserved_1: RandomMethod::random(rng), - }, - 1 => Channel::OpenOk { - reserved_1: RandomMethod::random(rng), - }, - 2 => Channel::Flow { - active: RandomMethod::random(rng), - }, - 3 => Channel::FlowOk { - active: RandomMethod::random(rng), - }, - 4 => Channel::Close { - reply_code: RandomMethod::random(rng), - reply_text: RandomMethod::random(rng), - class_id: RandomMethod::random(rng), - method_id: RandomMethod::random(rng), - }, - 5 => Channel::CloseOk { - }, - _ => unreachable!(), - } - } -} -impl RandomMethod for Exchange { - #[allow(unused_variables)] - fn random(rng: &mut R) -> Self { - match rng.gen_range(0u32..4) { - 0 => Exchange::Declare { - reserved_1: RandomMethod::random(rng), - exchange: RandomMethod::random(rng), - r#type: RandomMethod::random(rng), - passive: RandomMethod::random(rng), - durable: RandomMethod::random(rng), - reserved_2: RandomMethod::random(rng), - reserved_3: RandomMethod::random(rng), - no_wait: RandomMethod::random(rng), - arguments: RandomMethod::random(rng), - }, - 1 => Exchange::DeclareOk { - }, - 2 => Exchange::Delete { - reserved_1: RandomMethod::random(rng), - exchange: RandomMethod::random(rng), - if_unused: RandomMethod::random(rng), - no_wait: RandomMethod::random(rng), - }, - 3 => Exchange::DeleteOk { - }, - _ => unreachable!(), - } - } -} -impl RandomMethod for Queue { - #[allow(unused_variables)] - fn random(rng: &mut R) -> Self { - match rng.gen_range(0u32..10) { - 0 => Queue::Declare { - reserved_1: RandomMethod::random(rng), - queue: RandomMethod::random(rng), - passive: RandomMethod::random(rng), - durable: RandomMethod::random(rng), - exclusive: RandomMethod::random(rng), - auto_delete: RandomMethod::random(rng), - no_wait: RandomMethod::random(rng), - arguments: RandomMethod::random(rng), - }, - 1 => Queue::DeclareOk { - queue: RandomMethod::random(rng), - message_count: RandomMethod::random(rng), - consumer_count: RandomMethod::random(rng), - }, - 2 => Queue::Bind { - reserved_1: RandomMethod::random(rng), - queue: RandomMethod::random(rng), - exchange: RandomMethod::random(rng), - routing_key: RandomMethod::random(rng), - no_wait: RandomMethod::random(rng), - arguments: RandomMethod::random(rng), - }, - 3 => Queue::BindOk { - }, - 4 => Queue::Unbind { - reserved_1: RandomMethod::random(rng), - queue: RandomMethod::random(rng), - exchange: RandomMethod::random(rng), - routing_key: RandomMethod::random(rng), - arguments: RandomMethod::random(rng), - }, - 5 => Queue::UnbindOk { - }, - 6 => Queue::Purge { - reserved_1: RandomMethod::random(rng), - queue: RandomMethod::random(rng), - no_wait: RandomMethod::random(rng), - }, - 7 => Queue::PurgeOk { - message_count: RandomMethod::random(rng), - }, - 8 => Queue::Delete { - reserved_1: RandomMethod::random(rng), - queue: RandomMethod::random(rng), - if_unused: RandomMethod::random(rng), - if_empty: RandomMethod::random(rng), - no_wait: RandomMethod::random(rng), - }, - 9 => Queue::DeleteOk { - message_count: RandomMethod::random(rng), - }, - _ => unreachable!(), - } - } -} -impl RandomMethod for Basic { - #[allow(unused_variables)] - fn random(rng: &mut R) -> Self { - match rng.gen_range(0u32..17) { - 0 => Basic::Qos { - prefetch_size: RandomMethod::random(rng), - prefetch_count: RandomMethod::random(rng), - global: RandomMethod::random(rng), - }, - 1 => Basic::QosOk { - }, - 2 => Basic::Consume { - reserved_1: RandomMethod::random(rng), - queue: RandomMethod::random(rng), - consumer_tag: RandomMethod::random(rng), - no_local: RandomMethod::random(rng), - no_ack: RandomMethod::random(rng), - exclusive: RandomMethod::random(rng), - no_wait: RandomMethod::random(rng), - arguments: RandomMethod::random(rng), - }, - 3 => Basic::ConsumeOk { - consumer_tag: RandomMethod::random(rng), - }, - 4 => Basic::Cancel { - consumer_tag: RandomMethod::random(rng), - no_wait: RandomMethod::random(rng), - }, - 5 => Basic::CancelOk { - consumer_tag: RandomMethod::random(rng), - }, - 6 => Basic::Publish { - reserved_1: RandomMethod::random(rng), - exchange: RandomMethod::random(rng), - routing_key: RandomMethod::random(rng), - mandatory: RandomMethod::random(rng), - immediate: RandomMethod::random(rng), - }, - 7 => Basic::Return { - reply_code: RandomMethod::random(rng), - reply_text: RandomMethod::random(rng), - exchange: RandomMethod::random(rng), - routing_key: RandomMethod::random(rng), - }, - 8 => Basic::Deliver { - consumer_tag: RandomMethod::random(rng), - delivery_tag: RandomMethod::random(rng), - redelivered: RandomMethod::random(rng), - exchange: RandomMethod::random(rng), - routing_key: RandomMethod::random(rng), - }, - 9 => Basic::Get { - reserved_1: RandomMethod::random(rng), - queue: RandomMethod::random(rng), - no_ack: RandomMethod::random(rng), - }, - 10 => Basic::GetOk { - delivery_tag: RandomMethod::random(rng), - redelivered: RandomMethod::random(rng), - exchange: RandomMethod::random(rng), - routing_key: RandomMethod::random(rng), - message_count: RandomMethod::random(rng), - }, - 11 => Basic::GetEmpty { - reserved_1: RandomMethod::random(rng), - }, - 12 => Basic::Ack { - delivery_tag: RandomMethod::random(rng), - multiple: RandomMethod::random(rng), - }, - 13 => Basic::Reject { - delivery_tag: RandomMethod::random(rng), - requeue: RandomMethod::random(rng), - }, - 14 => Basic::RecoverAsync { - requeue: RandomMethod::random(rng), - }, - 15 => Basic::Recover { - requeue: RandomMethod::random(rng), - }, - 16 => Basic::RecoverOk { - }, - _ => unreachable!(), - } - } -} -impl RandomMethod for Tx { - #[allow(unused_variables)] - fn random(rng: &mut R) -> Self { - match rng.gen_range(0u32..6) { - 0 => Tx::Select { - }, - 1 => Tx::SelectOk { - }, - 2 => Tx::Commit { - }, - 3 => Tx::CommitOk { - }, - 4 => Tx::Rollback { - }, - 5 => Tx::RollbackOk { - }, - _ => unreachable!(), - } - } -} -} diff --git a/amqp_transport/src/classes/mod.rs b/amqp_transport/src/classes/mod.rs index 72bfd3f..00c04d4 100644 --- a/amqp_transport/src/classes/mod.rs +++ b/amqp_transport/src/classes/mod.rs @@ -36,7 +36,7 @@ pub enum FieldValue { pub use generated::*; /// Parses the payload of a method frame into the class/method -pub fn parse_method(payload: &[u8]) -> Result { +pub fn parse_method(payload: &[u8]) -> Result { let nom_result = generated::parse::parse_method(payload); match nom_result { diff --git a/amqp_transport/src/classes/parse_helper.rs b/amqp_transport/src/classes/parse_helper.rs index 05e2320..d47c982 100644 --- a/amqp_transport/src/classes/parse_helper.rs +++ b/amqp_transport/src/classes/parse_helper.rs @@ -25,8 +25,7 @@ impl nom::error::ParseError for TransError { } } -// todo: make this into fail_err to avoid useless allocations -pub fn err>(msg: S) -> impl FnOnce(Err) -> Err { +pub fn fail_err>(msg: S) -> impl FnOnce(Err) -> Err { move |err| { let error_level = if matches!(err, nom::Err::Failure(_)) { Err::Failure @@ -156,7 +155,8 @@ pub fn table(input: &[u8]) -> IResult<'_, Table> { fn table_value_pair(input: &[u8]) -> IResult<'_, (TableFieldName, FieldValue)> { let (input, field_name) = shortstr(input)?; - let (input, field_value) = field_value(input).map_err(err(format!("field {field_name}")))?; + let (input, field_value) = + field_value(input).map_err(fail_err(format!("field {field_name}")))?; Ok((input, (field_name, field_value))) } diff --git a/amqp_transport/src/classes/tests.rs b/amqp_transport/src/classes/tests.rs index 55781f9..5c86302 100644 --- a/amqp_transport/src/classes/tests.rs +++ b/amqp_transport/src/classes/tests.rs @@ -1,7 +1,7 @@ // create random methods to test the ser/de code together. if they diverge, we have a bug // this is not perfect, if they both have the same bug it won't be found, but tha's an ok tradeoff -use crate::classes::{Class, FieldValue}; +use crate::classes::{FieldValue, Method}; use rand::{Rng, SeedableRng}; use std::collections::HashMap; @@ -103,7 +103,7 @@ fn random_ser_de() { let mut rng = rand::rngs::StdRng::from_seed([0; 32]); for _ in 0..ITERATIONS { - let class = Class::random(&mut rng); + let class = Method::random(&mut rng); let mut bytes = Vec::new(); if let Err(err) = super::write::write_method(class.clone(), &mut bytes) { diff --git a/amqp_transport/src/connection.rs b/amqp_transport/src/connection.rs index b27f015..471d844 100644 --- a/amqp_transport/src/connection.rs +++ b/amqp_transport/src/connection.rs @@ -1,4 +1,4 @@ -use crate::classes::Class; +use crate::classes::Method; use crate::error::{ConException, ProtocolError, Result}; use crate::frame::{Frame, FrameType}; use crate::{classes, frame, sasl}; @@ -88,7 +88,7 @@ impl Connection { self.main_loop().await } - async fn send_method(&mut self, channel: u16, method: classes::Class) -> Result<()> { + async fn send_method(&mut self, channel: u16, method: Method) -> Result<()> { let mut payload = Vec::with_capacity(64); classes::write::write_method(method, &mut payload)?; frame::write_frame( @@ -102,7 +102,7 @@ impl Connection { .await } - async fn recv_method(&mut self) -> Result { + async fn recv_method(&mut self) -> Result { let start_ok_frame = frame::read_frame(&mut self.stream, self.max_frame_size).await?; ensure_conn(start_ok_frame.kind == FrameType::Method)?; @@ -112,7 +112,7 @@ impl Connection { } async fn start(&mut self) -> Result<()> { - let start_method = classes::Class::Connection(classes::Connection::Start { + let start_method = Method::ConnectionStart { version_major: 0, version_minor: 9, server_properties: server_properties( @@ -122,7 +122,7 @@ impl Connection { ), mechanisms: "PLAIN".into(), locales: "en_US".into(), - }); + }; debug!(?start_method, "Sending Start method"); self.send_method(0, start_method).await?; @@ -130,12 +130,12 @@ impl Connection { let start_ok = self.recv_method().await?; debug!(?start_ok, "Received Start-Ok"); - if let classes::Class::Connection(classes::Connection::StartOk { + if let Method::ConnectionStartOk { mechanism, locale, response, .. - }) = start_ok + } = start_ok { ensure_conn(mechanism == "PLAIN")?; ensure_conn(locale == "en_US")?; @@ -149,11 +149,11 @@ impl Connection { } async fn tune(&mut self) -> Result<()> { - let tune_method = classes::Class::Connection(classes::Connection::Tune { + let tune_method = Method::ConnectionTune { channel_max: CHANNEL_MAX, frame_max: FRAME_SIZE_MAX, heartbeat: HEARTBEAT_DELAY, - }); + }; debug!("Sending Tune method"); self.send_method(0, tune_method).await?; @@ -161,11 +161,11 @@ impl Connection { let tune_ok = self.recv_method().await?; debug!(?tune_ok, "Received Tune-Ok method"); - if let classes::Class::Connection(classes::Connection::TuneOk { + if let Method::ConnectionTuneOk { channel_max, frame_max, heartbeat, - }) = tune_ok + } = tune_ok { self.channel_max = channel_max; self.max_frame_size = usize::try_from(frame_max).unwrap(); @@ -180,15 +180,15 @@ impl Connection { let open = self.recv_method().await?; debug!(?open, "Received Open method"); - if let classes::Class::Connection(classes::Connection::Open { virtual_host, .. }) = open { + if let Method::ConnectionOpen { virtual_host, .. } = open { ensure_conn(virtual_host == "/")?; } self.send_method( 0, - classes::Class::Connection(classes::Connection::OpenOk { + Method::ConnectionOpenOk { reserved_1: "".to_string(), - }), + }, ) .await?; @@ -197,23 +197,14 @@ impl Connection { async fn main_loop(&mut self) -> Result<()> { loop { - tokio::select! { - frame = frame::read_frame(&mut self.stream, self.max_frame_size) => { - debug!(?frame); - let frame = frame?; - self.reset_timeout(); + let frame = frame::read_frame(&mut self.stream, self.max_frame_size).await?; + debug!(?frame); + self.reset_timeout(); - match frame.kind { - FrameType::Method => self.dispatch_method(frame).await?, - FrameType::Heartbeat => {} - _ => warn!(frame_type = ?frame.kind, "TODO"), - } - } - _ = &mut self.next_timeout => { - if self.heartbeat_delay != 0 { - return Err(ProtocolError::CloseNow.into()); - } - } + match frame.kind { + FrameType::Method => self.dispatch_method(frame).await?, + FrameType::Heartbeat => {} + _ => warn!(frame_type = ?frame.kind, "TODO"), } } } @@ -223,12 +214,10 @@ impl Connection { debug!(?method, "Received method"); match method { - classes::Class::Connection(classes::Connection::Close { .. }) => { + Method::ConnectionClose { .. } => { // todo: handle closing } - classes::Class::Channel(classes::Channel::Open { .. }) => { - self.channel_open(frame.channel).await? - } + Method::ChannelOpen { .. } => self.channel_open(frame.channel).await?, _ => { // we don't handle this here, forward it to *somewhere* @@ -274,9 +263,9 @@ impl Connection { self.send_method( num, - Class::Channel(classes::Channel::OpenOk { + Method::ChannelOpenOk { reserved_1: Vec::new(), - }), + }, ) .await?; @@ -325,6 +314,18 @@ impl Connection { } } +impl Drop for Connection { + fn drop(&mut self) { + self.connection_handle.lock().close(); + } +} + +impl Drop for Channel { + fn drop(&mut self) { + self.channel_handle.lock().close(); + } +} + fn server_properties(host: SocketAddr) -> classes::Table { fn ls(str: &str) -> classes::FieldValue { classes::FieldValue::LongString(str.into()) diff --git a/amqp_transport/src/tests.rs b/amqp_transport/src/tests.rs index a0a929a..d7298c2 100644 --- a/amqp_transport/src/tests.rs +++ b/amqp_transport/src/tests.rs @@ -1,4 +1,4 @@ -use crate::classes::{Class, Connection, FieldValue}; +use crate::classes::{FieldValue, Method}; use crate::frame::FrameType; use crate::{classes, frame}; use std::collections::HashMap; @@ -6,7 +6,7 @@ use std::collections::HashMap; #[tokio::test] async fn write_start_ok_frame() { let mut payload = Vec::new(); - let method = classes::Class::Connection(classes::Connection::Start { + let method = Method::ConnectionStart { version_major: 0, version_minor: 9, server_properties: HashMap::from([( @@ -15,7 +15,7 @@ async fn write_start_ok_frame() { )]), mechanisms: "PLAIN".into(), locales: "en_US".into(), - }); + }; classes::write::write_method(method, &mut payload).unwrap(); @@ -140,7 +140,7 @@ fn read_start_ok_payload() { assert_eq!( method, - Class::Connection(Connection::StartOk { + Method::ConnectionStartOk { client_properties: HashMap::from([ ( "product".to_string(), @@ -178,6 +178,6 @@ fn read_start_ok_payload() { mechanism: "PLAIN".to_string(), response: "\x00admin\x00".into(), locale: "en_US".to_string() - }) + } ); } diff --git a/src/main.rs b/src/main.rs index 10a137c..31638d4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,6 +12,7 @@ async fn main() -> Result<()> { for arg in env::args().skip(1) { match arg.as_str() { + "--debug" => level = Level::DEBUG, "--trace" => level = Level::TRACE, "--dashboard" => dashboard = true, "ignore-this-clippy" => eprintln!("yes please"), diff --git a/xtask/src/codegen/mod.rs b/xtask/src/codegen/mod.rs index 63f44a5..4fde8bd 100644 --- a/xtask/src/codegen/mod.rs +++ b/xtask/src/codegen/mod.rs @@ -123,7 +123,7 @@ pub fn main() { fn codegen(amqp: &Amqp) { println!("#![allow(dead_code)]"); - println!("// This file has been generated by `amqp_codegen`. Do not edit it manually.\n"); + println!("// This file has been generated by `xtask/src/codegen`. Do not edit it manually.\n"); codegen_domain_defs(amqp); codegen_class_defs(amqp); codegen_parser(amqp); @@ -159,22 +159,15 @@ fn codegen_domain_defs(amqp: &Amqp) { fn codegen_class_defs(amqp: &Amqp) { println!("#[derive(Debug, Clone, PartialEq)]"); - println!("pub enum Class {{"); - for class in &amqp.classes { - let class_name = class.name.to_upper_camel_case(); - println!(" {class_name}({class_name}),"); - } - println!("}}\n"); + println!("pub enum Method {{"); for class in &amqp.classes { let enum_name = class.name.to_upper_camel_case(); - doc_comment(&class.doc, 0); - println!("#[derive(Debug, Clone, PartialEq)]"); - println!("pub enum {enum_name} {{"); for method in &class.methods { let method_name = method.name.to_upper_camel_case(); + doc_comment(&class.doc, 4); doc_comment(&method.doc, 4); - print!(" {method_name}"); + print!(" {enum_name}{method_name}"); if !method.fields.is_empty() { println!(" {{"); for field in &method.fields { @@ -197,8 +190,9 @@ fn codegen_class_defs(amqp: &Amqp) { println!(","); } } - println!("}}"); } + + println!("}}\n"); } fn amqp_type_to_rust_type(amqp_type: &str) -> &'static str { diff --git a/xtask/src/codegen/parser.rs b/xtask/src/codegen/parser.rs index 6fc0072..d3e420f 100644 --- a/xtask/src/codegen/parser.rs +++ b/xtask/src/codegen/parser.rs @@ -31,7 +31,7 @@ pub type IResult<'a, T> = nom::IResult<&'a [u8], T, TransError>; " ); println!( - "pub fn parse_method(input: &[u8]) -> Result<(&[u8], Class), nom::Err> {{ + "pub fn parse_method(input: &[u8]) -> Result<(&[u8], Method), nom::Err> {{ alt(({}))(input) }}", amqp.classes @@ -47,7 +47,7 @@ pub type IResult<'a, T> = nom::IResult<&'a [u8], T, TransError>; for class in &amqp.classes { let class_name = class.name.to_snake_case(); - function(&class_name, "Class", || { + function(&class_name, "Method", || { let class_index = class.index; let all_methods = class .methods @@ -56,8 +56,8 @@ pub type IResult<'a, T> = nom::IResult<&'a [u8], T, TransError>; .join(", "); let class_name_raw = &class.name; println!( - r#" let (input, _) = tag({class_index}_u16.to_be_bytes())(input).map_err(err("invalid tag for class {class_name_raw}"))?; - alt(({all_methods}))(input).map_err(err("class {class_name_raw}")).map_err(failure)"# + r#" let (input, _) = tag({class_index}_u16.to_be_bytes())(input).map_err(fail_err("invalid tag for class {class_name_raw}"))?; + alt(({all_methods}))(input).map_err(fail_err("class {class_name_raw}"))"# ); }); @@ -94,10 +94,10 @@ fn method_parser(amqp: &Amqp, class: &Class, method: &Method) { let method_name_raw = &method.name; let function_name = method_function_name(&class_name)(method); - function(&function_name, "Class", || { + function(&function_name, "Method", || { let method_index = method.index; println!( - r#" let (input, _) = tag({method_index}_u16.to_be_bytes())(input).map_err(err("parsing method index"))?;"# + r#" let (input, _) = tag({method_index}_u16.to_be_bytes())(input).map_err(fail_err("parsing method index"))?;"# ); let mut iter = method.fields.iter().peekable(); while let Some(field) = iter.next() { @@ -108,8 +108,9 @@ fn method_parser(amqp: &Amqp, class: &Class, method: &Method) { let fields_with_bit = subsequent_bit_fields(amqp, field, &mut iter); let amount = fields_with_bit.len(); + // todo: remove those map_err(failure) println!( - r#" let (input, bits) = bit(input, {amount}).map_err(err("field {field_name_raw} in method {method_name_raw}")).map_err(failure)?;"# + r#" let (input, bits) = bit(input, {amount}).map_err(fail_err("field {field_name_raw} in method {method_name_raw}")).map_err(failure)?;"# ); for (i, field) in fields_with_bit.iter().enumerate() { @@ -120,7 +121,7 @@ fn method_parser(amqp: &Amqp, class: &Class, method: &Method) { let fn_name = domain_function_name(field_type(field)); let field_name = snake_case(&field.name); println!( - r#" let (input, {field_name}) = {fn_name}(input).map_err(err("field {field_name_raw} in method {method_name_raw}")).map_err(failure)?;"# + r#" let (input, {field_name}) = {fn_name}(input).map_err(fail_err("field {field_name_raw} in method {method_name_raw}")).map_err(failure)?;"# ); for assert in &field.asserts { @@ -130,12 +131,12 @@ fn method_parser(amqp: &Amqp, class: &Class, method: &Method) { } let class_name = class_name.to_upper_camel_case(); let method_name = method.name.to_upper_camel_case(); - println!(" Ok((input, Class::{class_name}({class_name}::{method_name} {{"); + println!(" Ok((input, Method::{class_name}{method_name} {{"); for field in &method.fields { let field_name = snake_case(&field.name); println!(" {field_name},"); } - println!(" }})))"); + println!(" }}))"); }); } diff --git a/xtask/src/codegen/random.rs b/xtask/src/codegen/random.rs index bad07e7..a98d4e5 100644 --- a/xtask/src/codegen/random.rs +++ b/xtask/src/codegen/random.rs @@ -11,40 +11,37 @@ use super::*; " ); - impl_random("Class", || { + impl_random("Method", || { let class_lens = amqp.classes.len(); println!(" match rng.gen_range(0u32..{class_lens}) {{"); for (i, class) in amqp.classes.iter().enumerate() { let class_name = class.name.to_upper_camel_case(); - println!(" {i} => Class::{class_name}({class_name}::random(rng)),"); - } - println!( - " _ => unreachable!(), - }}" - ); - }); + println!(" {i} => {{"); - for class in &amqp.classes { - let class_name = class.name.to_upper_camel_case(); - impl_random(&class_name, || { let method_len = class.methods.len(); - println!(" match rng.gen_range(0u32..{method_len}) {{"); + println!(" match rng.gen_range(0u32..{method_len}) {{"); for (i, method) in class.methods.iter().enumerate() { let method_name = method.name.to_upper_camel_case(); - println!(" {i} => {class_name}::{method_name} {{"); + println!(" {i} => Method::{class_name}{method_name} {{"); for field in &method.fields { let field_name = snake_case(&field.name); - println!(" {field_name}: RandomMethod::random(rng),"); + println!(" {field_name}: RandomMethod::random(rng),"); } - println!(" }},"); + println!(" }},"); } println!( - " _ => unreachable!(), - }}" + " _ => unreachable!(), + }}" ); - }); - } + + println!(" }}"); + } + println!( + " _ => unreachable!(), + }}" + ); + }); println!("}}"); } diff --git a/xtask/src/codegen/write.rs b/xtask/src/codegen/write.rs index 27c37bd..4e22d37 100644 --- a/xtask/src/codegen/write.rs +++ b/xtask/src/codegen/write.rs @@ -9,7 +9,7 @@ use crate::classes::write_helper::*; use crate::error::TransError; use std::io::Write; -pub fn write_method(class: Class, mut writer: W) -> Result<(), TransError> {{ +pub fn write_method(class: Method, mut writer: W) -> Result<(), TransError> {{ match class {{" ); @@ -19,12 +19,12 @@ pub fn write_method(class: Class, mut writer: W) -> Result<(), TransEr for method in &class.methods { let method_name = method.name.to_upper_camel_case(); let method_index = method.index; - println!(" Class::{class_name}({class_name}::{method_name} {{"); + println!(" Method::{class_name}{method_name} {{"); for field in &method.fields { let field_name = snake_case(&field.name); println!(" {field_name},"); } - println!(" }}) => {{"); + println!(" }} => {{"); let [ci0, ci1] = class_index.to_be_bytes(); let [mi0, mi1] = method_index.to_be_bytes(); println!(" writer.write_all(&[{ci0}, {ci1}, {mi0}, {mi1}])?;");