diff --git a/Cargo.lock b/Cargo.lock index 8b3cc6b..04a1a6e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -80,9 +80,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.53" +version = "1.0.54" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94a45b455c14666b85fc40a019e8ab9eb75e3a124e05494f5397122bc9eb06e0" +checksum = "7a99269dff3bc004caa411f38845c20303f1e393ca2bd6581576fa3a7f59577d" [[package]] name = "async-trait" @@ -1463,6 +1463,7 @@ checksum = "114ba2b24d2167ef6d67d7d04c8cc86522b87f490025f39f0303b7db5bf5e3d8" name = "xtask" version = "0.1.0" dependencies = [ + "anyhow", "heck", "itertools", "strong-xml", diff --git a/amqp_core/src/lib.rs b/amqp_core/src/lib.rs index e9f7db2..8dfb2f7 100644 --- a/amqp_core/src/lib.rs +++ b/amqp_core/src/lib.rs @@ -1,5 +1,7 @@ #![warn(rust_2018_idioms)] +pub mod methods; + use parking_lot::Mutex; use std::collections::HashMap; use std::net::SocketAddr; diff --git a/amqp_core/src/methods/generated.rs b/amqp_core/src/methods/generated.rs new file mode 100644 index 0000000..bc4f27f --- /dev/null +++ b/amqp_core/src/methods/generated.rs @@ -0,0 +1,786 @@ +#![allow(dead_code)] +// This file has been generated by `xtask/src/codegen`. Do not edit it manually. + +pub type ClassId = u16; + +/// consumer tag +/// +/// Identifier for the consumer, valid within the current channel. +pub type ConsumerTag = String; + +/// server-assigned delivery tag +/// +/// The server-assigned and channel-specific delivery tag +pub type DeliveryTag = u64; + +/// exchange name +/// +/// must be shorter than 127, must match `^[a-zA-Z0-9-_.:]*$` +/// +/// The exchange name is a client-selected string that identifies the exchange for +/// publish methods. +pub type ExchangeName = String; + +pub type MethodId = u16; + +/// no acknowledgement needed +/// +/// If this field is set the server does not expect acknowledgements for +/// messages. That is, when a message is delivered to the client the server +/// assumes the delivery will succeed and immediately dequeues it. This +/// functionality may increase performance but at the cost of reliability. +/// Messages can get lost if a client dies before they are delivered to the +/// application. +pub type NoAck = bool; + +/// do not deliver own messages +/// +/// If the no-local field is set the server will not send messages to the connection that +/// published them. +pub type NoLocal = bool; + +/// do not send reply method +/// +/// If set, the server will not respond to the method. The client should not wait +/// for a reply method. If the server could not complete the method it will raise a +/// channel or connection exception. +pub type NoWait = bool; + +/// must not be null, must be shorter than 127 +/// +/// Unconstrained. +pub type Path = String; + +/// +/// This table provides a set of peer properties, used for identification, debugging, +/// and general information. +pub type PeerProperties = super::Table; + +/// queue name +/// +/// must be shorter than 127, must match `^[a-zA-Z0-9-_.:]*$` +/// +/// The queue name identifies the queue within the vhost. In methods where the queue +/// name may be blank, and that has no specific significance, this refers to the +/// 'current' queue for the channel, meaning the last queue that the client declared +/// on the channel. If the client did not declare a queue, and the method needs a +/// queue name, this will result in a 502 (syntax error) channel exception. +pub type QueueName = String; + +/// message is being redelivered +/// +/// This indicates that the message has been previously delivered to this or +/// another client. +pub type Redelivered = bool; + +/// number of messages in queue +/// +/// The number of messages in the queue, which will be zero for newly-declared +/// queues. This is the number of messages present in the queue, and committed +/// if the channel on which they were published is transacted, that are not +/// waiting acknowledgement. +pub type MessageCount = u32; + +/// reply code from server +/// +/// must not be null +/// +/// The reply code. The AMQ reply codes are defined as constants at the start +/// of this formal specification. +pub type ReplyCode = u16; + +/// localised reply text +/// +/// must not be null +/// +/// The localised reply text. This text can be logged as an aid to resolving +/// issues. +pub type ReplyText = String; + +/// single bit +pub type Bit = bool; + +/// single octet +pub type Octet = u8; + +/// 16-bit integer +pub type Short = u16; + +/// 32-bit integer +pub type Long = u32; + +/// 64-bit integer +pub type Longlong = u64; + +/// short string (max. 256 characters) +pub type Shortstr = String; + +/// long string +pub type Longstr = Vec; + +/// 64-bit timestamp +pub type Timestamp = u64; + +/// field table +pub type Table = super::Table; + +#[derive(Debug, Clone, PartialEq)] +pub enum Method { + /// The connection class provides methods for a client to establish a network connection to + /// a server, and for both peers to operate the connection thereafter. + /// This method starts the connection negotiation process by telling the client the + /// protocol version that the server proposes, along with a list of security mechanisms + /// which the client can use for authentication. + ConnectionStart { + /// The major version number can take any value from 0 to 99 as defined in the + /// AMQP specification. + version_major: Octet, + /// The minor version number can take any value from 0 to 99 as defined in the + /// AMQP specification. + version_minor: Octet, + server_properties: PeerProperties, + /// must not be null + /// + /// A list of the security mechanisms that the server supports, delimited by spaces. + mechanisms: Longstr, + /// must not be null + /// + /// A list of the message locales that the server supports, delimited by spaces. The + /// locale defines the language in which the server will send reply texts. + locales: Longstr, + }, + /// The connection class provides methods for a client to establish a network connection to + /// a server, and for both peers to operate the connection thereafter. + /// This method selects a SASL security mechanism. + ConnectionStartOk { + client_properties: PeerProperties, + /// must not be null + /// + /// A single security mechanisms selected by the client, which must be one of those + /// specified by the server. + mechanism: Shortstr, + /// must not be null + /// + /// A block of opaque data passed to the security mechanism. The contents of this + /// data are defined by the SASL security mechanism. + response: Longstr, + /// must not be null + /// + /// A single message locale selected by the client, which must be one of those + /// specified by the server. + locale: Shortstr, + }, + /// The connection class provides methods for a client to establish a network connection to + /// a server, and for both peers to operate the connection thereafter. + /// The SASL protocol works by exchanging challenges and responses until both peers have + /// received sufficient information to authenticate each other. This method challenges + /// the client to provide more information. + ConnectionSecure { + /// Challenge information, a block of opaque binary data passed to the security + /// mechanism. + challenge: Longstr, + }, + /// The connection class provides methods for a client to establish a network connection to + /// a server, and for both peers to operate the connection thereafter. + /// This method attempts to authenticate, passing a block of SASL data for the security + /// mechanism at the server side. + ConnectionSecureOk { + /// must not be null + /// + /// A block of opaque data passed to the security mechanism. The contents of this + /// data are defined by the SASL security mechanism. + response: Longstr, + }, + /// The connection class provides methods for a client to establish a network connection to + /// a server, and for both peers to operate the connection thereafter. + /// This method proposes a set of connection configuration values to the client. The + /// client can accept and/or adjust these. + ConnectionTune { + /// Specifies highest channel number that the server permits. Usable channel numbers + /// are in the range 1..channel-max. Zero indicates no specified limit. + channel_max: Short, + /// The largest frame size that the server proposes for the connection, including + /// frame header and end-byte. The client can negotiate a lower value. Zero means + /// that the server does not impose any specific limit but may reject very large + /// frames if it cannot allocate resources for them. + frame_max: Long, + /// The delay, in seconds, of the connection heartbeat that the server wants. + /// Zero means the server does not want a heartbeat. + heartbeat: Short, + }, + /// The connection class provides methods for a client to establish a network connection to + /// a server, and for both peers to operate the connection thereafter. + /// This method sends the client's connection tuning parameters to the server. + /// Certain fields are negotiated, others provide capability information. + ConnectionTuneOk { + /// must not be null, must be less than the tune field of the method channel-max + /// + /// The maximum total number of channels that the client will use per connection. + channel_max: Short, + /// The largest frame size that the client and server will use for the connection. + /// Zero means that the client does not impose any specific limit but may reject + /// very large frames if it cannot allocate resources for them. Note that the + /// frame-max limit applies principally to content frames, where large contents can + /// be broken into frames of arbitrary size. + frame_max: Long, + /// The delay, in seconds, of the connection heartbeat that the client wants. Zero + /// means the client does not want a heartbeat. + heartbeat: Short, + }, + /// The connection class provides methods for a client to establish a network connection to + /// a server, and for both peers to operate the connection thereafter. + /// This method opens a connection to a virtual host, which is a collection of + /// resources, and acts to separate multiple application domains within a server. + /// The server may apply arbitrary limits per virtual host, such as the number + /// of each type of entity that may be used, per connection and/or in total. + ConnectionOpen { + /// The name of the virtual host to work with. + virtual_host: Path, + reserved_1: Shortstr, + reserved_2: Bit, + }, + /// The connection class provides methods for a client to establish a network connection to + /// a server, and for both peers to operate the connection thereafter. + /// This method signals to the client that the connection is ready for use. + ConnectionOpenOk { reserved_1: Shortstr }, + /// The connection class provides methods for a client to establish a network connection to + /// a server, and for both peers to operate the connection thereafter. + /// This method indicates that the sender wants to close the connection. This may be + /// due to internal conditions (e.g. a forced shut-down) or due to an error handling + /// a specific method, i.e. an exception. When a close is due to an exception, the + /// sender provides the class and method id of the method which caused the exception. + ConnectionClose { + reply_code: ReplyCode, + reply_text: ReplyText, + /// When the close is provoked by a method exception, this is the class of the + /// method. + class_id: ClassId, + /// When the close is provoked by a method exception, this is the ID of the method. + method_id: MethodId, + }, + /// The connection class provides methods for a client to establish a network connection to + /// a server, and for both peers to operate the connection thereafter. + /// This method confirms a Connection.Close method and tells the recipient that it is + /// safe to release resources for the connection and close the socket. + ConnectionCloseOk, + /// The channel class provides methods for a client to establish a channel to a + /// server and for both peers to operate the channel thereafter. + /// This method opens a channel to the server. + ChannelOpen { reserved_1: Shortstr }, + /// The channel class provides methods for a client to establish a channel to a + /// server and for both peers to operate the channel thereafter. + /// This method signals to the client that the channel is ready for use. + ChannelOpenOk { reserved_1: Longstr }, + /// The channel class provides methods for a client to establish a channel to a + /// server and for both peers to operate the channel thereafter. + /// This method asks the peer to pause or restart the flow of content data sent by + /// a consumer. This is a simple flow-control mechanism that a peer can use to avoid + /// overflowing its queues or otherwise finding itself receiving more messages than + /// it can process. Note that this method is not intended for window control. It does + /// not affect contents returned by Basic.Get-Ok methods. + ChannelFlow { + /// If 1, the peer starts sending content frames. If 0, the peer stops sending + /// content frames. + active: Bit, + }, + /// The channel class provides methods for a client to establish a channel to a + /// server and for both peers to operate the channel thereafter. + /// Confirms to the peer that a flow command was received and processed. + ChannelFlowOk { + /// Confirms the setting of the processed flow method: 1 means the peer will start + /// sending or continue to send content frames; 0 means it will not. + active: Bit, + }, + /// The channel class provides methods for a client to establish a channel to a + /// server and for both peers to operate the channel thereafter. + /// This method indicates that the sender wants to close the channel. This may be due to + /// internal conditions (e.g. a forced shut-down) or due to an error handling a specific + /// method, i.e. an exception. When a close is due to an exception, the sender provides + /// the class and method id of the method which caused the exception. + ChannelClose { + reply_code: ReplyCode, + reply_text: ReplyText, + /// When the close is provoked by a method exception, this is the class of the + /// method. + class_id: ClassId, + /// When the close is provoked by a method exception, this is the ID of the method. + method_id: MethodId, + }, + /// The channel class provides methods for a client to establish a channel to a + /// server and for both peers to operate the channel thereafter. + /// This method confirms a Channel.Close method and tells the recipient that it is safe + /// to release resources for the channel. + ChannelCloseOk, + /// Exchanges match and distribute messages across queues. Exchanges can be configured in + /// the server or declared at runtime. + /// This method creates an exchange if it does not already exist, and if the exchange + /// exists, verifies that it is of the correct and expected class. + ExchangeDeclare { + reserved_1: Short, + /// must not be null + exchange: ExchangeName, + /// Each exchange belongs to one of a set of exchange types implemented by the + /// server. The exchange types define the functionality of the exchange - i.e. how + /// messages are routed through it. It is not valid or meaningful to attempt to + /// change the type of an existing exchange. + r#type: Shortstr, + /// If set, the server will reply with Declare-Ok if the exchange already + /// exists with the same name, and raise an error if not. The client can + /// use this to check whether an exchange exists without modifying the + /// server state. When set, all other method fields except name and no-wait + /// are ignored. A declare with both passive and no-wait has no effect. + /// Arguments are compared for semantic equivalence. + passive: Bit, + /// If set when creating a new exchange, the exchange will be marked as durable. + /// Durable exchanges remain active when a server restarts. Non-durable exchanges + /// (transient exchanges) are purged if/when a server restarts. + durable: Bit, + reserved_2: Bit, + reserved_3: Bit, + no_wait: NoWait, + /// A set of arguments for the declaration. The syntax and semantics of these + /// arguments depends on the server implementation. + arguments: Table, + }, + /// Exchanges match and distribute messages across queues. Exchanges can be configured in + /// the server or declared at runtime. + /// This method confirms a Declare method and confirms the name of the exchange, + /// essential for automatically-named exchanges. + ExchangeDeclareOk, + /// Exchanges match and distribute messages across queues. Exchanges can be configured in + /// the server or declared at runtime. + /// This method deletes an exchange. When an exchange is deleted all queue bindings on + /// the exchange are cancelled. + ExchangeDelete { + reserved_1: Short, + /// must not be null + exchange: ExchangeName, + /// If set, the server will only delete the exchange if it has no queue bindings. If + /// the exchange has queue bindings the server does not delete it but raises a + /// channel exception instead. + if_unused: Bit, + no_wait: NoWait, + }, + /// Exchanges match and distribute messages across queues. Exchanges can be configured in + /// the server or declared at runtime. + /// This method confirms the deletion of an exchange. + ExchangeDeleteOk, + /// Queues store and forward messages. Queues can be configured in the server or created at + /// runtime. Queues must be attached to at least one exchange in order to receive messages + /// from publishers. + /// This method creates or checks a queue. When creating a new queue the client can + /// specify various properties that control the durability of the queue and its + /// contents, and the level of sharing for the queue. + QueueDeclare { + reserved_1: Short, + queue: QueueName, + /// If set, the server will reply with Declare-Ok if the queue already + /// exists with the same name, and raise an error if not. The client can + /// use this to check whether a queue exists without modifying the + /// server state. When set, all other method fields except name and no-wait + /// are ignored. A declare with both passive and no-wait has no effect. + /// Arguments are compared for semantic equivalence. + passive: Bit, + /// If set when creating a new queue, the queue will be marked as durable. Durable + /// queues remain active when a server restarts. Non-durable queues (transient + /// queues) are purged if/when a server restarts. Note that durable queues do not + /// necessarily hold persistent messages, although it does not make sense to send + /// persistent messages to a transient queue. + durable: Bit, + /// Exclusive queues may only be accessed by the current connection, and are + /// deleted when that connection closes. Passive declaration of an exclusive + /// queue by other connections are not allowed. + exclusive: Bit, + /// If set, the queue is deleted when all consumers have finished using it. The last + /// consumer can be cancelled either explicitly or because its channel is closed. If + /// there was no consumer ever on the queue, it won't be deleted. Applications can + /// explicitly delete auto-delete queues using the Delete method as normal. + auto_delete: Bit, + no_wait: NoWait, + /// A set of arguments for the declaration. The syntax and semantics of these + /// arguments depends on the server implementation. + arguments: Table, + }, + /// Queues store and forward messages. Queues can be configured in the server or created at + /// runtime. Queues must be attached to at least one exchange in order to receive messages + /// from publishers. + /// This method confirms a Declare method and confirms the name of the queue, essential + /// for automatically-named queues. + QueueDeclareOk { + /// must not be null + /// + /// Reports the name of the queue. If the server generated a queue name, this field + /// contains that name. + queue: QueueName, + message_count: MessageCount, + /// Reports the number of active consumers for the queue. Note that consumers can + /// suspend activity (Channel.Flow) in which case they do not appear in this count. + consumer_count: Long, + }, + /// Queues store and forward messages. Queues can be configured in the server or created at + /// runtime. Queues must be attached to at least one exchange in order to receive messages + /// from publishers. + /// This method binds a queue to an exchange. Until a queue is bound it will not + /// receive any messages. In a classic messaging model, store-and-forward queues + /// are bound to a direct exchange and subscription queues are bound to a topic + /// exchange. + QueueBind { + reserved_1: Short, + /// Specifies the name of the queue to bind. + queue: QueueName, + exchange: ExchangeName, + /// Specifies the routing key for the binding. The routing key is used for routing + /// messages depending on the exchange configuration. Not all exchanges use a + /// routing key - refer to the specific exchange documentation. If the queue name + /// is empty, the server uses the last queue declared on the channel. If the + /// routing key is also empty, the server uses this queue name for the routing + /// key as well. If the queue name is provided but the routing key is empty, the + /// server does the binding with that empty routing key. The meaning of empty + /// routing keys depends on the exchange implementation. + routing_key: Shortstr, + no_wait: NoWait, + /// A set of arguments for the binding. The syntax and semantics of these arguments + /// depends on the exchange class. + arguments: Table, + }, + /// Queues store and forward messages. Queues can be configured in the server or created at + /// runtime. Queues must be attached to at least one exchange in order to receive messages + /// from publishers. + /// This method confirms that the bind was successful. + QueueBindOk, + /// Queues store and forward messages. Queues can be configured in the server or created at + /// runtime. Queues must be attached to at least one exchange in order to receive messages + /// from publishers. + /// This method unbinds a queue from an exchange. + QueueUnbind { + reserved_1: Short, + /// Specifies the name of the queue to unbind. + queue: QueueName, + /// The name of the exchange to unbind from. + exchange: ExchangeName, + /// Specifies the routing key of the binding to unbind. + routing_key: Shortstr, + /// Specifies the arguments of the binding to unbind. + arguments: Table, + }, + /// Queues store and forward messages. Queues can be configured in the server or created at + /// runtime. Queues must be attached to at least one exchange in order to receive messages + /// from publishers. + /// This method confirms that the unbind was successful. + QueueUnbindOk, + /// Queues store and forward messages. Queues can be configured in the server or created at + /// runtime. Queues must be attached to at least one exchange in order to receive messages + /// from publishers. + /// This method removes all messages from a queue which are not awaiting + /// acknowledgment. + QueuePurge { + reserved_1: Short, + /// Specifies the name of the queue to purge. + queue: QueueName, + no_wait: NoWait, + }, + /// Queues store and forward messages. Queues can be configured in the server or created at + /// runtime. Queues must be attached to at least one exchange in order to receive messages + /// from publishers. + /// This method confirms the purge of a queue. + QueuePurgeOk { + /// Reports the number of messages purged. + message_count: MessageCount, + }, + /// Queues store and forward messages. Queues can be configured in the server or created at + /// runtime. Queues must be attached to at least one exchange in order to receive messages + /// from publishers. + /// This method deletes a queue. When a queue is deleted any pending messages are sent + /// to a dead-letter queue if this is defined in the server configuration, and all + /// consumers on the queue are cancelled. + QueueDelete { + reserved_1: Short, + /// Specifies the name of the queue to delete. + queue: QueueName, + /// If set, the server will only delete the queue if it has no consumers. If the + /// queue has consumers the server does does not delete it but raises a channel + /// exception instead. + if_unused: Bit, + /// If set, the server will only delete the queue if it has no messages. + if_empty: Bit, + no_wait: NoWait, + }, + /// Queues store and forward messages. Queues can be configured in the server or created at + /// runtime. Queues must be attached to at least one exchange in order to receive messages + /// from publishers. + /// This method confirms the deletion of a queue. + QueueDeleteOk { + /// Reports the number of messages deleted. + message_count: MessageCount, + }, + /// The Basic class provides methods that support an industry-standard messaging model. + /// This method requests a specific quality of service. The QoS can be specified for the + /// current channel or for all channels on the connection. The particular properties and + /// semantics of a qos method always depend on the content class semantics. Though the + /// qos method could in principle apply to both peers, it is currently meaningful only + /// for the server. + BasicQos { + /// The client can request that messages be sent in advance so that when the client + /// finishes processing a message, the following message is already held locally, + /// rather than needing to be sent down the channel. Prefetching gives a performance + /// improvement. This field specifies the prefetch window size in octets. The server + /// will send a message in advance if it is equal to or smaller in size than the + /// available prefetch size (and also falls into other prefetch limits). May be set + /// to zero, meaning "no specific limit", although other prefetch limits may still + /// apply. The prefetch-size is ignored if the no-ack option is set. + prefetch_size: Long, + /// Specifies a prefetch window in terms of whole messages. This field may be used + /// in combination with the prefetch-size field; a message will only be sent in + /// advance if both prefetch windows (and those at the channel and connection level) + /// allow it. The prefetch-count is ignored if the no-ack option is set. + prefetch_count: Short, + /// By default the QoS settings apply to the current channel only. If this field is + /// set, they are applied to the entire connection. + global: Bit, + }, + /// The Basic class provides methods that support an industry-standard messaging model. + /// This method tells the client that the requested QoS levels could be handled by the + /// server. The requested QoS applies to all active consumers until a new QoS is + /// defined. + BasicQosOk, + /// The Basic class provides methods that support an industry-standard messaging model. + /// This method asks the server to start a "consumer", which is a transient request for + /// messages from a specific queue. Consumers last as long as the channel they were + /// declared on, or until the client cancels them. + BasicConsume { + reserved_1: Short, + /// Specifies the name of the queue to consume from. + queue: QueueName, + /// Specifies the identifier for the consumer. The consumer tag is local to a + /// channel, so two clients can use the same consumer tags. If this field is + /// empty the server will generate a unique tag. + consumer_tag: ConsumerTag, + no_local: NoLocal, + no_ack: NoAck, + /// Request exclusive consumer access, meaning only this consumer can access the + /// queue. + exclusive: Bit, + no_wait: NoWait, + /// A set of arguments for the consume. The syntax and semantics of these + /// arguments depends on the server implementation. + arguments: Table, + }, + /// The Basic class provides methods that support an industry-standard messaging model. + /// The server provides the client with a consumer tag, which is used by the client + /// for methods called on the consumer at a later stage. + BasicConsumeOk { + /// Holds the consumer tag specified by the client or provided by the server. + consumer_tag: ConsumerTag, + }, + /// The Basic class provides methods that support an industry-standard messaging model. + /// This method cancels a consumer. This does not affect already delivered + /// messages, but it does mean the server will not send any more messages for + /// that consumer. The client may receive an arbitrary number of messages in + /// between sending the cancel method and receiving the cancel-ok reply. + BasicCancel { + consumer_tag: ConsumerTag, + no_wait: NoWait, + }, + /// The Basic class provides methods that support an industry-standard messaging model. + /// This method confirms that the cancellation was completed. + BasicCancelOk { consumer_tag: ConsumerTag }, + /// The Basic class provides methods that support an industry-standard messaging model. + /// This method publishes a message to a specific exchange. The message will be routed + /// to queues as defined by the exchange configuration and distributed to any active + /// consumers when the transaction, if any, is committed. + BasicPublish { + reserved_1: Short, + /// Specifies the name of the exchange to publish to. The exchange name can be + /// empty, meaning the default exchange. If the exchange name is specified, and that + /// exchange does not exist, the server will raise a channel exception. + exchange: ExchangeName, + /// Specifies the routing key for the message. The routing key is used for routing + /// messages depending on the exchange configuration. + routing_key: Shortstr, + /// This flag tells the server how to react if the message cannot be routed to a + /// queue. If this flag is set, the server will return an unroutable message with a + /// Return method. If this flag is zero, the server silently drops the message. + mandatory: Bit, + /// This flag tells the server how to react if the message cannot be routed to a + /// queue consumer immediately. If this flag is set, the server will return an + /// undeliverable message with a Return method. If this flag is zero, the server + /// will queue the message, but with no guarantee that it will ever be consumed. + immediate: Bit, + }, + /// The Basic class provides methods that support an industry-standard messaging model. + /// This method returns an undeliverable message that was published with the "immediate" + /// flag set, or an unroutable message published with the "mandatory" flag set. The + /// reply code and text provide information about the reason that the message was + /// undeliverable. + BasicReturn { + reply_code: ReplyCode, + reply_text: ReplyText, + /// Specifies the name of the exchange that the message was originally published + /// to. May be empty, meaning the default exchange. + exchange: ExchangeName, + /// Specifies the routing key name specified when the message was published. + routing_key: Shortstr, + }, + /// The Basic class provides methods that support an industry-standard messaging model. + /// This method delivers a message to the client, via a consumer. In the asynchronous + /// message delivery model, the client starts a consumer using the Consume method, then + /// the server responds with Deliver methods as and when messages arrive for that + /// consumer. + BasicDeliver { + consumer_tag: ConsumerTag, + delivery_tag: DeliveryTag, + redelivered: Redelivered, + /// Specifies the name of the exchange that the message was originally published to. + /// May be empty, indicating the default exchange. + exchange: ExchangeName, + /// Specifies the routing key name specified when the message was published. + routing_key: Shortstr, + }, + /// The Basic class provides methods that support an industry-standard messaging model. + /// This method provides a direct access to the messages in a queue using a synchronous + /// dialogue that is designed for specific types of application where synchronous + /// functionality is more important than performance. + BasicGet { + reserved_1: Short, + /// Specifies the name of the queue to get a message from. + queue: QueueName, + no_ack: NoAck, + }, + /// The Basic class provides methods that support an industry-standard messaging model. + /// This method delivers a message to the client following a get method. A message + /// delivered by 'get-ok' must be acknowledged unless the no-ack option was set in the + /// get method. + BasicGetOk { + delivery_tag: DeliveryTag, + redelivered: Redelivered, + /// Specifies the name of the exchange that the message was originally published to. + /// If empty, the message was published to the default exchange. + exchange: ExchangeName, + /// Specifies the routing key name specified when the message was published. + routing_key: Shortstr, + message_count: MessageCount, + }, + /// The Basic class provides methods that support an industry-standard messaging model. + /// This method tells the client that the queue has no messages available for the + /// client. + BasicGetEmpty { reserved_1: Shortstr }, + /// The Basic class provides methods that support an industry-standard messaging model. + /// This method acknowledges one or more messages delivered via the Deliver or Get-Ok + /// methods. The client can ask to confirm a single message or a set of messages up to + /// and including a specific message. + BasicAck { + delivery_tag: DeliveryTag, + /// If set to 1, the delivery tag is treated as "up to and including", so that the + /// client can acknowledge multiple messages with a single method. If set to zero, + /// the delivery tag refers to a single message. If the multiple field is 1, and the + /// delivery tag is zero, tells the server to acknowledge all outstanding messages. + multiple: Bit, + }, + /// The Basic class provides methods that support an industry-standard messaging model. + /// This method allows a client to reject a message. It can be used to interrupt and + /// cancel large incoming messages, or return untreatable messages to their original + /// queue. + BasicReject { + delivery_tag: DeliveryTag, + /// If requeue is true, the server will attempt to requeue the message. If requeue + /// is false or the requeue attempt fails the messages are discarded or dead-lettered. + requeue: Bit, + }, + /// The Basic class provides methods that support an industry-standard messaging model. + /// This method asks the server to redeliver all unacknowledged messages on a + /// specified channel. Zero or more messages may be redelivered. This method + /// is deprecated in favour of the synchronous Recover/Recover-Ok. + BasicRecoverAsync { + /// If this field is zero, the message will be redelivered to the original + /// recipient. If this bit is 1, the server will attempt to requeue the message, + /// potentially then delivering it to an alternative subscriber. + requeue: Bit, + }, + /// The Basic class provides methods that support an industry-standard messaging model. + /// This method asks the server to redeliver all unacknowledged messages on a + /// specified channel. Zero or more messages may be redelivered. This method + /// replaces the asynchronous Recover. + BasicRecover { + /// If this field is zero, the message will be redelivered to the original + /// recipient. If this bit is 1, the server will attempt to requeue the message, + /// potentially then delivering it to an alternative subscriber. + requeue: Bit, + }, + /// The Basic class provides methods that support an industry-standard messaging model. + /// This method acknowledges a Basic.Recover method. + BasicRecoverOk, + /// The Tx class allows publish and ack operations to be batched into atomic + /// units of work. The intention is that all publish and ack requests issued + /// within a transaction will complete successfully or none of them will. + /// Servers SHOULD implement atomic transactions at least where all publish + /// or ack requests affect a single queue. Transactions that cover multiple + /// queues may be non-atomic, given that queues can be created and destroyed + /// asynchronously, and such events do not form part of any transaction. + /// Further, the behaviour of transactions with respect to the immediate and + /// mandatory flags on Basic.Publish methods is not defined. + /// This method sets the channel to use standard transactions. The client must use this + /// method at least once on a channel before using the Commit or Rollback methods. + TxSelect, + /// The Tx class allows publish and ack operations to be batched into atomic + /// units of work. The intention is that all publish and ack requests issued + /// within a transaction will complete successfully or none of them will. + /// Servers SHOULD implement atomic transactions at least where all publish + /// or ack requests affect a single queue. Transactions that cover multiple + /// queues may be non-atomic, given that queues can be created and destroyed + /// asynchronously, and such events do not form part of any transaction. + /// Further, the behaviour of transactions with respect to the immediate and + /// mandatory flags on Basic.Publish methods is not defined. + /// This method confirms to the client that the channel was successfully set to use + /// standard transactions. + TxSelectOk, + /// The Tx class allows publish and ack operations to be batched into atomic + /// units of work. The intention is that all publish and ack requests issued + /// within a transaction will complete successfully or none of them will. + /// Servers SHOULD implement atomic transactions at least where all publish + /// or ack requests affect a single queue. Transactions that cover multiple + /// queues may be non-atomic, given that queues can be created and destroyed + /// asynchronously, and such events do not form part of any transaction. + /// Further, the behaviour of transactions with respect to the immediate and + /// mandatory flags on Basic.Publish methods is not defined. + /// This method commits all message publications and acknowledgments performed in + /// the current transaction. A new transaction starts immediately after a commit. + TxCommit, + /// The Tx class allows publish and ack operations to be batched into atomic + /// units of work. The intention is that all publish and ack requests issued + /// within a transaction will complete successfully or none of them will. + /// Servers SHOULD implement atomic transactions at least where all publish + /// or ack requests affect a single queue. Transactions that cover multiple + /// queues may be non-atomic, given that queues can be created and destroyed + /// asynchronously, and such events do not form part of any transaction. + /// Further, the behaviour of transactions with respect to the immediate and + /// mandatory flags on Basic.Publish methods is not defined. + /// This method confirms to the client that the commit succeeded. Note that if a commit + /// fails, the server raises a channel exception. + TxCommitOk, + /// The Tx class allows publish and ack operations to be batched into atomic + /// units of work. The intention is that all publish and ack requests issued + /// within a transaction will complete successfully or none of them will. + /// Servers SHOULD implement atomic transactions at least where all publish + /// or ack requests affect a single queue. Transactions that cover multiple + /// queues may be non-atomic, given that queues can be created and destroyed + /// asynchronously, and such events do not form part of any transaction. + /// Further, the behaviour of transactions with respect to the immediate and + /// mandatory flags on Basic.Publish methods is not defined. + /// This method abandons all message publications and acknowledgments performed in + /// the current transaction. A new transaction starts immediately after a rollback. + /// Note that unacked messages will not be automatically redelivered by rollback; + /// if that is required an explicit recover call should be issued. + TxRollback, + /// The Tx class allows publish and ack operations to be batched into atomic + /// units of work. The intention is that all publish and ack requests issued + /// within a transaction will complete successfully or none of them will. + /// Servers SHOULD implement atomic transactions at least where all publish + /// or ack requests affect a single queue. Transactions that cover multiple + /// queues may be non-atomic, given that queues can be created and destroyed + /// asynchronously, and such events do not form part of any transaction. + /// Further, the behaviour of transactions with respect to the immediate and + /// mandatory flags on Basic.Publish methods is not defined. + /// This method confirms to the client that the rollback succeeded. Note that if an + /// rollback fails, the server raises a channel exception. + TxRollbackOk, +} diff --git a/amqp_core/src/methods/mod.rs b/amqp_core/src/methods/mod.rs new file mode 100644 index 0000000..971e5f4 --- /dev/null +++ b/amqp_core/src/methods/mod.rs @@ -0,0 +1,31 @@ +mod generated; + +use std::collections::HashMap; + +pub use generated::*; + +pub type TableFieldName = String; + +pub type Table = HashMap; + +#[derive(Debug, Clone, PartialEq)] +pub enum FieldValue { + Boolean(bool), + ShortShortInt(i8), + ShortShortUInt(u8), + ShortInt(i16), + ShortUInt(u16), + LongInt(i32), + LongUInt(u32), + LongLongInt(i64), + LongLongUInt(u64), + Float(f32), + Double(f64), + DecimalValue(u8, u32), + ShortString(Shortstr), + LongString(Longstr), + FieldArray(Vec), + Timestamp(u64), + FieldTable(Table), + Void, +} diff --git a/amqp_messaging/src/lib.rs b/amqp_messaging/src/lib.rs index 020bd3b..99badb9 100644 --- a/amqp_messaging/src/lib.rs +++ b/amqp_messaging/src/lib.rs @@ -1,3 +1,3 @@ #![warn(rust_2018_idioms)] -mod method; +pub mod methods; diff --git a/amqp_messaging/src/method.rs b/amqp_messaging/src/method.rs deleted file mode 100644 index b49b6ab..0000000 --- a/amqp_messaging/src/method.rs +++ /dev/null @@ -1,3 +0,0 @@ -use amqp_core::ChannelHandle; - -pub async fn handle_method(channel_handle: ChannelHandle) {} diff --git a/amqp_messaging/src/methods.rs b/amqp_messaging/src/methods.rs new file mode 100644 index 0000000..28be325 --- /dev/null +++ b/amqp_messaging/src/methods.rs @@ -0,0 +1,4 @@ +use amqp_core::methods::Method; +use amqp_core::ChannelHandle; + +pub async fn handle_method(_channel_handle: ChannelHandle, _method: Method) {} diff --git a/amqp_transport/src/connection.rs b/amqp_transport/src/connection.rs index 7ba509c..2628fca 100644 --- a/amqp_transport/src/connection.rs +++ b/amqp_transport/src/connection.rs @@ -1,7 +1,7 @@ use crate::error::{ConException, ProtocolError, Result}; use crate::frame::{Frame, FrameType}; -use crate::methods::Method; use crate::{frame, methods, sasl}; +use amqp_core::methods::{FieldValue, Method, Table}; use amqp_core::GlobalData; use anyhow::Context; use std::collections::HashMap; @@ -220,7 +220,17 @@ impl Connection { } Method::ChannelOpen { .. } => self.channel_open(frame.channel).await?, _ => { - tokio::spawn(amqp_core::method::handle_method()) + let channel_handle = self + .channels + .get(&frame.channel) + .ok_or_else(|| ConException::Todo.into_trans())? + .channel_handle + .clone(); + + tokio::spawn(amqp_messaging::methods::handle_method( + channel_handle, + method, + )); // we don't handle this here, forward it to *somewhere* } } @@ -325,9 +335,9 @@ impl Drop for Channel { } } -fn server_properties(host: SocketAddr) -> methods::Table { - fn ls(str: &str) -> methods::FieldValue { - methods::FieldValue::LongString(str.into()) +fn server_properties(host: SocketAddr) -> Table { + fn ls(str: &str) -> FieldValue { + FieldValue::LongString(str.into()) } let host_str = host.ip().to_string(); diff --git a/amqp_transport/src/methods/generated.rs b/amqp_transport/src/methods/generated.rs index 547792d..9ab93ca 100644 --- a/amqp_transport/src/methods/generated.rs +++ b/amqp_transport/src/methods/generated.rs @@ -1,794 +1,10 @@ #![allow(dead_code)] // This file has been generated by `xtask/src/codegen`. Do not edit it manually. -pub type ClassId = u16; - -/// consumer tag -/// -/// Identifier for the consumer, valid within the current channel. -pub type ConsumerTag = String; - -/// server-assigned delivery tag -/// -/// The server-assigned and channel-specific delivery tag -pub type DeliveryTag = u64; - -/// exchange name -/// -/// must be shorter than 127, must match `^[a-zA-Z0-9-_.:]*$` -/// -/// The exchange name is a client-selected string that identifies the exchange for -/// publish methods. -pub type ExchangeName = String; - -pub type MethodId = u16; - -/// no acknowledgement needed -/// -/// If this field is set the server does not expect acknowledgements for -/// messages. That is, when a message is delivered to the client the server -/// assumes the delivery will succeed and immediately dequeues it. This -/// functionality may increase performance but at the cost of reliability. -/// Messages can get lost if a client dies before they are delivered to the -/// application. -pub type NoAck = bool; - -/// do not deliver own messages -/// -/// If the no-local field is set the server will not send messages to the connection that -/// published them. -pub type NoLocal = bool; - -/// do not send reply method -/// -/// If set, the server will not respond to the method. The client should not wait -/// for a reply method. If the server could not complete the method it will raise a -/// channel or connection exception. -pub type NoWait = bool; - -/// must not be null, must be shorter than 127 -/// -/// Unconstrained. -pub type Path = String; - -/// -/// This table provides a set of peer properties, used for identification, debugging, -/// and general information. -pub type PeerProperties = super::Table; - -/// queue name -/// -/// must be shorter than 127, must match `^[a-zA-Z0-9-_.:]*$` -/// -/// The queue name identifies the queue within the vhost. In methods where the queue -/// name may be blank, and that has no specific significance, this refers to the -/// 'current' queue for the channel, meaning the last queue that the client declared -/// on the channel. If the client did not declare a queue, and the method needs a -/// queue name, this will result in a 502 (syntax error) channel exception. -pub type QueueName = String; - -/// message is being redelivered -/// -/// This indicates that the message has been previously delivered to this or -/// another client. -pub type Redelivered = bool; - -/// number of messages in queue -/// -/// The number of messages in the queue, which will be zero for newly-declared -/// queues. This is the number of messages present in the queue, and committed -/// if the channel on which they were published is transacted, that are not -/// waiting acknowledgement. -pub type MessageCount = u32; - -/// reply code from server -/// -/// must not be null -/// -/// The reply code. The AMQ reply codes are defined as constants at the start -/// of this formal specification. -pub type ReplyCode = u16; - -/// localised reply text -/// -/// must not be null -/// -/// The localised reply text. This text can be logged as an aid to resolving -/// issues. -pub type ReplyText = String; - -/// single bit -pub type Bit = bool; - -/// single octet -pub type Octet = u8; - -/// 16-bit integer -pub type Short = u16; - -/// 32-bit integer -pub type Long = u32; - -/// 64-bit integer -pub type Longlong = u64; - -/// short string (max. 256 characters) -pub type Shortstr = String; - -/// long string -pub type Longstr = Vec; - -/// 64-bit timestamp -pub type Timestamp = u64; - -/// field table -pub type Table = super::Table; - -#[derive(Debug, Clone, PartialEq)] -pub enum Method { - /// The connection class provides methods for a client to establish a network connection to - /// a server, and for both peers to operate the connection thereafter. - /// This method starts the connection negotiation process by telling the client the - /// protocol version that the server proposes, along with a list of security mechanisms - /// which the client can use for authentication. - ConnectionStart { - /// The major version number can take any value from 0 to 99 as defined in the - /// AMQP specification. - version_major: Octet, - /// The minor version number can take any value from 0 to 99 as defined in the - /// AMQP specification. - version_minor: Octet, - server_properties: PeerProperties, - /// must not be null - /// - /// A list of the security mechanisms that the server supports, delimited by spaces. - mechanisms: Longstr, - /// must not be null - /// - /// A list of the message locales that the server supports, delimited by spaces. The - /// locale defines the language in which the server will send reply texts. - locales: Longstr, - }, - /// The connection class provides methods for a client to establish a network connection to - /// a server, and for both peers to operate the connection thereafter. - /// This method selects a SASL security mechanism. - ConnectionStartOk { - client_properties: PeerProperties, - /// must not be null - /// - /// A single security mechanisms selected by the client, which must be one of those - /// specified by the server. - mechanism: Shortstr, - /// must not be null - /// - /// A block of opaque data passed to the security mechanism. The contents of this - /// data are defined by the SASL security mechanism. - response: Longstr, - /// must not be null - /// - /// A single message locale selected by the client, which must be one of those - /// specified by the server. - locale: Shortstr, - }, - /// The connection class provides methods for a client to establish a network connection to - /// a server, and for both peers to operate the connection thereafter. - /// The SASL protocol works by exchanging challenges and responses until both peers have - /// received sufficient information to authenticate each other. This method challenges - /// the client to provide more information. - ConnectionSecure { - /// Challenge information, a block of opaque binary data passed to the security - /// mechanism. - challenge: Longstr, - }, - /// The connection class provides methods for a client to establish a network connection to - /// a server, and for both peers to operate the connection thereafter. - /// This method attempts to authenticate, passing a block of SASL data for the security - /// mechanism at the server side. - ConnectionSecureOk { - /// must not be null - /// - /// A block of opaque data passed to the security mechanism. The contents of this - /// data are defined by the SASL security mechanism. - response: Longstr, - }, - /// The connection class provides methods for a client to establish a network connection to - /// a server, and for both peers to operate the connection thereafter. - /// This method proposes a set of connection configuration values to the client. The - /// client can accept and/or adjust these. - ConnectionTune { - /// Specifies highest channel number that the server permits. Usable channel numbers - /// are in the range 1..channel-max. Zero indicates no specified limit. - channel_max: Short, - /// The largest frame size that the server proposes for the connection, including - /// frame header and end-byte. The client can negotiate a lower value. Zero means - /// that the server does not impose any specific limit but may reject very large - /// frames if it cannot allocate resources for them. - frame_max: Long, - /// The delay, in seconds, of the connection heartbeat that the server wants. - /// Zero means the server does not want a heartbeat. - heartbeat: Short, - }, - /// The connection class provides methods for a client to establish a network connection to - /// a server, and for both peers to operate the connection thereafter. - /// This method sends the client's connection tuning parameters to the server. - /// Certain fields are negotiated, others provide capability information. - ConnectionTuneOk { - /// must not be null, must be less than the tune field of the method channel-max - /// - /// The maximum total number of channels that the client will use per connection. - channel_max: Short, - /// The largest frame size that the client and server will use for the connection. - /// Zero means that the client does not impose any specific limit but may reject - /// very large frames if it cannot allocate resources for them. Note that the - /// frame-max limit applies principally to content frames, where large contents can - /// be broken into frames of arbitrary size. - frame_max: Long, - /// The delay, in seconds, of the connection heartbeat that the client wants. Zero - /// means the client does not want a heartbeat. - heartbeat: Short, - }, - /// The connection class provides methods for a client to establish a network connection to - /// a server, and for both peers to operate the connection thereafter. - /// This method opens a connection to a virtual host, which is a collection of - /// resources, and acts to separate multiple application domains within a server. - /// The server may apply arbitrary limits per virtual host, such as the number - /// of each type of entity that may be used, per connection and/or in total. - ConnectionOpen { - /// The name of the virtual host to work with. - virtual_host: Path, - reserved_1: Shortstr, - reserved_2: Bit, - }, - /// The connection class provides methods for a client to establish a network connection to - /// a server, and for both peers to operate the connection thereafter. - /// This method signals to the client that the connection is ready for use. - ConnectionOpenOk { reserved_1: Shortstr }, - /// The connection class provides methods for a client to establish a network connection to - /// a server, and for both peers to operate the connection thereafter. - /// This method indicates that the sender wants to close the connection. This may be - /// due to internal conditions (e.g. a forced shut-down) or due to an error handling - /// a specific method, i.e. an exception. When a close is due to an exception, the - /// sender provides the class and method id of the method which caused the exception. - ConnectionClose { - reply_code: ReplyCode, - reply_text: ReplyText, - /// When the close is provoked by a method exception, this is the class of the - /// method. - class_id: ClassId, - /// When the close is provoked by a method exception, this is the ID of the method. - method_id: MethodId, - }, - /// The connection class provides methods for a client to establish a network connection to - /// a server, and for both peers to operate the connection thereafter. - /// This method confirms a Connection.Close method and tells the recipient that it is - /// safe to release resources for the connection and close the socket. - ConnectionCloseOk, - /// The channel class provides methods for a client to establish a channel to a - /// server and for both peers to operate the channel thereafter. - /// This method opens a channel to the server. - ChannelOpen { reserved_1: Shortstr }, - /// The channel class provides methods for a client to establish a channel to a - /// server and for both peers to operate the channel thereafter. - /// This method signals to the client that the channel is ready for use. - ChannelOpenOk { reserved_1: Longstr }, - /// The channel class provides methods for a client to establish a channel to a - /// server and for both peers to operate the channel thereafter. - /// This method asks the peer to pause or restart the flow of content data sent by - /// a consumer. This is a simple flow-control mechanism that a peer can use to avoid - /// overflowing its queues or otherwise finding itself receiving more messages than - /// it can process. Note that this method is not intended for window control. It does - /// not affect contents returned by Basic.Get-Ok methods. - ChannelFlow { - /// If 1, the peer starts sending content frames. If 0, the peer stops sending - /// content frames. - active: Bit, - }, - /// The channel class provides methods for a client to establish a channel to a - /// server and for both peers to operate the channel thereafter. - /// Confirms to the peer that a flow command was received and processed. - ChannelFlowOk { - /// Confirms the setting of the processed flow method: 1 means the peer will start - /// sending or continue to send content frames; 0 means it will not. - active: Bit, - }, - /// The channel class provides methods for a client to establish a channel to a - /// server and for both peers to operate the channel thereafter. - /// This method indicates that the sender wants to close the channel. This may be due to - /// internal conditions (e.g. a forced shut-down) or due to an error handling a specific - /// method, i.e. an exception. When a close is due to an exception, the sender provides - /// the class and method id of the method which caused the exception. - ChannelClose { - reply_code: ReplyCode, - reply_text: ReplyText, - /// When the close is provoked by a method exception, this is the class of the - /// method. - class_id: ClassId, - /// When the close is provoked by a method exception, this is the ID of the method. - method_id: MethodId, - }, - /// The channel class provides methods for a client to establish a channel to a - /// server and for both peers to operate the channel thereafter. - /// This method confirms a Channel.Close method and tells the recipient that it is safe - /// to release resources for the channel. - ChannelCloseOk, - /// Exchanges match and distribute messages across queues. Exchanges can be configured in - /// the server or declared at runtime. - /// This method creates an exchange if it does not already exist, and if the exchange - /// exists, verifies that it is of the correct and expected class. - ExchangeDeclare { - reserved_1: Short, - /// must not be null - exchange: ExchangeName, - /// Each exchange belongs to one of a set of exchange types implemented by the - /// server. The exchange types define the functionality of the exchange - i.e. how - /// messages are routed through it. It is not valid or meaningful to attempt to - /// change the type of an existing exchange. - r#type: Shortstr, - /// If set, the server will reply with Declare-Ok if the exchange already - /// exists with the same name, and raise an error if not. The client can - /// use this to check whether an exchange exists without modifying the - /// server state. When set, all other method fields except name and no-wait - /// are ignored. A declare with both passive and no-wait has no effect. - /// Arguments are compared for semantic equivalence. - passive: Bit, - /// If set when creating a new exchange, the exchange will be marked as durable. - /// Durable exchanges remain active when a server restarts. Non-durable exchanges - /// (transient exchanges) are purged if/when a server restarts. - durable: Bit, - reserved_2: Bit, - reserved_3: Bit, - no_wait: NoWait, - /// A set of arguments for the declaration. The syntax and semantics of these - /// arguments depends on the server implementation. - arguments: Table, - }, - /// Exchanges match and distribute messages across queues. Exchanges can be configured in - /// the server or declared at runtime. - /// This method confirms a Declare method and confirms the name of the exchange, - /// essential for automatically-named exchanges. - ExchangeDeclareOk, - /// Exchanges match and distribute messages across queues. Exchanges can be configured in - /// the server or declared at runtime. - /// This method deletes an exchange. When an exchange is deleted all queue bindings on - /// the exchange are cancelled. - ExchangeDelete { - reserved_1: Short, - /// must not be null - exchange: ExchangeName, - /// If set, the server will only delete the exchange if it has no queue bindings. If - /// the exchange has queue bindings the server does not delete it but raises a - /// channel exception instead. - if_unused: Bit, - no_wait: NoWait, - }, - /// Exchanges match and distribute messages across queues. Exchanges can be configured in - /// the server or declared at runtime. - /// This method confirms the deletion of an exchange. - ExchangeDeleteOk, - /// Queues store and forward messages. Queues can be configured in the server or created at - /// runtime. Queues must be attached to at least one exchange in order to receive messages - /// from publishers. - /// This method creates or checks a queue. When creating a new queue the client can - /// specify various properties that control the durability of the queue and its - /// contents, and the level of sharing for the queue. - QueueDeclare { - reserved_1: Short, - queue: QueueName, - /// If set, the server will reply with Declare-Ok if the queue already - /// exists with the same name, and raise an error if not. The client can - /// use this to check whether a queue exists without modifying the - /// server state. When set, all other method fields except name and no-wait - /// are ignored. A declare with both passive and no-wait has no effect. - /// Arguments are compared for semantic equivalence. - passive: Bit, - /// If set when creating a new queue, the queue will be marked as durable. Durable - /// queues remain active when a server restarts. Non-durable queues (transient - /// queues) are purged if/when a server restarts. Note that durable queues do not - /// necessarily hold persistent messages, although it does not make sense to send - /// persistent messages to a transient queue. - durable: Bit, - /// Exclusive queues may only be accessed by the current connection, and are - /// deleted when that connection closes. Passive declaration of an exclusive - /// queue by other connections are not allowed. - exclusive: Bit, - /// If set, the queue is deleted when all consumers have finished using it. The last - /// consumer can be cancelled either explicitly or because its channel is closed. If - /// there was no consumer ever on the queue, it won't be deleted. Applications can - /// explicitly delete auto-delete queues using the Delete method as normal. - auto_delete: Bit, - no_wait: NoWait, - /// A set of arguments for the declaration. The syntax and semantics of these - /// arguments depends on the server implementation. - arguments: Table, - }, - /// Queues store and forward messages. Queues can be configured in the server or created at - /// runtime. Queues must be attached to at least one exchange in order to receive messages - /// from publishers. - /// This method confirms a Declare method and confirms the name of the queue, essential - /// for automatically-named queues. - QueueDeclareOk { - /// must not be null - /// - /// Reports the name of the queue. If the server generated a queue name, this field - /// contains that name. - queue: QueueName, - message_count: MessageCount, - /// Reports the number of active consumers for the queue. Note that consumers can - /// suspend activity (Channel.Flow) in which case they do not appear in this count. - consumer_count: Long, - }, - /// Queues store and forward messages. Queues can be configured in the server or created at - /// runtime. Queues must be attached to at least one exchange in order to receive messages - /// from publishers. - /// This method binds a queue to an exchange. Until a queue is bound it will not - /// receive any messages. In a classic messaging model, store-and-forward queues - /// are bound to a direct exchange and subscription queues are bound to a topic - /// exchange. - QueueBind { - reserved_1: Short, - /// Specifies the name of the queue to bind. - queue: QueueName, - exchange: ExchangeName, - /// Specifies the routing key for the binding. The routing key is used for routing - /// messages depending on the exchange configuration. Not all exchanges use a - /// routing key - refer to the specific exchange documentation. If the queue name - /// is empty, the server uses the last queue declared on the channel. If the - /// routing key is also empty, the server uses this queue name for the routing - /// key as well. If the queue name is provided but the routing key is empty, the - /// server does the binding with that empty routing key. The meaning of empty - /// routing keys depends on the exchange implementation. - routing_key: Shortstr, - no_wait: NoWait, - /// A set of arguments for the binding. The syntax and semantics of these arguments - /// depends on the exchange class. - arguments: Table, - }, - /// Queues store and forward messages. Queues can be configured in the server or created at - /// runtime. Queues must be attached to at least one exchange in order to receive messages - /// from publishers. - /// This method confirms that the bind was successful. - QueueBindOk, - /// Queues store and forward messages. Queues can be configured in the server or created at - /// runtime. Queues must be attached to at least one exchange in order to receive messages - /// from publishers. - /// This method unbinds a queue from an exchange. - QueueUnbind { - reserved_1: Short, - /// Specifies the name of the queue to unbind. - queue: QueueName, - /// The name of the exchange to unbind from. - exchange: ExchangeName, - /// Specifies the routing key of the binding to unbind. - routing_key: Shortstr, - /// Specifies the arguments of the binding to unbind. - arguments: Table, - }, - /// Queues store and forward messages. Queues can be configured in the server or created at - /// runtime. Queues must be attached to at least one exchange in order to receive messages - /// from publishers. - /// This method confirms that the unbind was successful. - QueueUnbindOk, - /// Queues store and forward messages. Queues can be configured in the server or created at - /// runtime. Queues must be attached to at least one exchange in order to receive messages - /// from publishers. - /// This method removes all messages from a queue which are not awaiting - /// acknowledgment. - QueuePurge { - reserved_1: Short, - /// Specifies the name of the queue to purge. - queue: QueueName, - no_wait: NoWait, - }, - /// Queues store and forward messages. Queues can be configured in the server or created at - /// runtime. Queues must be attached to at least one exchange in order to receive messages - /// from publishers. - /// This method confirms the purge of a queue. - QueuePurgeOk { - /// Reports the number of messages purged. - message_count: MessageCount, - }, - /// Queues store and forward messages. Queues can be configured in the server or created at - /// runtime. Queues must be attached to at least one exchange in order to receive messages - /// from publishers. - /// This method deletes a queue. When a queue is deleted any pending messages are sent - /// to a dead-letter queue if this is defined in the server configuration, and all - /// consumers on the queue are cancelled. - QueueDelete { - reserved_1: Short, - /// Specifies the name of the queue to delete. - queue: QueueName, - /// If set, the server will only delete the queue if it has no consumers. If the - /// queue has consumers the server does does not delete it but raises a channel - /// exception instead. - if_unused: Bit, - /// If set, the server will only delete the queue if it has no messages. - if_empty: Bit, - no_wait: NoWait, - }, - /// Queues store and forward messages. Queues can be configured in the server or created at - /// runtime. Queues must be attached to at least one exchange in order to receive messages - /// from publishers. - /// This method confirms the deletion of a queue. - QueueDeleteOk { - /// Reports the number of messages deleted. - message_count: MessageCount, - }, - /// The Basic class provides methods that support an industry-standard messaging model. - /// This method requests a specific quality of service. The QoS can be specified for the - /// current channel or for all channels on the connection. The particular properties and - /// semantics of a qos method always depend on the content class semantics. Though the - /// qos method could in principle apply to both peers, it is currently meaningful only - /// for the server. - BasicQos { - /// The client can request that messages be sent in advance so that when the client - /// finishes processing a message, the following message is already held locally, - /// rather than needing to be sent down the channel. Prefetching gives a performance - /// improvement. This field specifies the prefetch window size in octets. The server - /// will send a message in advance if it is equal to or smaller in size than the - /// available prefetch size (and also falls into other prefetch limits). May be set - /// to zero, meaning "no specific limit", although other prefetch limits may still - /// apply. The prefetch-size is ignored if the no-ack option is set. - prefetch_size: Long, - /// Specifies a prefetch window in terms of whole messages. This field may be used - /// in combination with the prefetch-size field; a message will only be sent in - /// advance if both prefetch windows (and those at the channel and connection level) - /// allow it. The prefetch-count is ignored if the no-ack option is set. - prefetch_count: Short, - /// By default the QoS settings apply to the current channel only. If this field is - /// set, they are applied to the entire connection. - global: Bit, - }, - /// The Basic class provides methods that support an industry-standard messaging model. - /// This method tells the client that the requested QoS levels could be handled by the - /// server. The requested QoS applies to all active consumers until a new QoS is - /// defined. - BasicQosOk, - /// The Basic class provides methods that support an industry-standard messaging model. - /// This method asks the server to start a "consumer", which is a transient request for - /// messages from a specific queue. Consumers last as long as the channel they were - /// declared on, or until the client cancels them. - BasicConsume { - reserved_1: Short, - /// Specifies the name of the queue to consume from. - queue: QueueName, - /// Specifies the identifier for the consumer. The consumer tag is local to a - /// channel, so two clients can use the same consumer tags. If this field is - /// empty the server will generate a unique tag. - consumer_tag: ConsumerTag, - no_local: NoLocal, - no_ack: NoAck, - /// Request exclusive consumer access, meaning only this consumer can access the - /// queue. - exclusive: Bit, - no_wait: NoWait, - /// A set of arguments for the consume. The syntax and semantics of these - /// arguments depends on the server implementation. - arguments: Table, - }, - /// The Basic class provides methods that support an industry-standard messaging model. - /// The server provides the client with a consumer tag, which is used by the client - /// for methods called on the consumer at a later stage. - BasicConsumeOk { - /// Holds the consumer tag specified by the client or provided by the server. - consumer_tag: ConsumerTag, - }, - /// The Basic class provides methods that support an industry-standard messaging model. - /// This method cancels a consumer. This does not affect already delivered - /// messages, but it does mean the server will not send any more messages for - /// that consumer. The client may receive an arbitrary number of messages in - /// between sending the cancel method and receiving the cancel-ok reply. - BasicCancel { - consumer_tag: ConsumerTag, - no_wait: NoWait, - }, - /// The Basic class provides methods that support an industry-standard messaging model. - /// This method confirms that the cancellation was completed. - BasicCancelOk { consumer_tag: ConsumerTag }, - /// The Basic class provides methods that support an industry-standard messaging model. - /// This method publishes a message to a specific exchange. The message will be routed - /// to queues as defined by the exchange configuration and distributed to any active - /// consumers when the transaction, if any, is committed. - BasicPublish { - reserved_1: Short, - /// Specifies the name of the exchange to publish to. The exchange name can be - /// empty, meaning the default exchange. If the exchange name is specified, and that - /// exchange does not exist, the server will raise a channel exception. - exchange: ExchangeName, - /// Specifies the routing key for the message. The routing key is used for routing - /// messages depending on the exchange configuration. - routing_key: Shortstr, - /// This flag tells the server how to react if the message cannot be routed to a - /// queue. If this flag is set, the server will return an unroutable message with a - /// Return method. If this flag is zero, the server silently drops the message. - mandatory: Bit, - /// This flag tells the server how to react if the message cannot be routed to a - /// queue consumer immediately. If this flag is set, the server will return an - /// undeliverable message with a Return method. If this flag is zero, the server - /// will queue the message, but with no guarantee that it will ever be consumed. - immediate: Bit, - }, - /// The Basic class provides methods that support an industry-standard messaging model. - /// This method returns an undeliverable message that was published with the "immediate" - /// flag set, or an unroutable message published with the "mandatory" flag set. The - /// reply code and text provide information about the reason that the message was - /// undeliverable. - BasicReturn { - reply_code: ReplyCode, - reply_text: ReplyText, - /// Specifies the name of the exchange that the message was originally published - /// to. May be empty, meaning the default exchange. - exchange: ExchangeName, - /// Specifies the routing key name specified when the message was published. - routing_key: Shortstr, - }, - /// The Basic class provides methods that support an industry-standard messaging model. - /// This method delivers a message to the client, via a consumer. In the asynchronous - /// message delivery model, the client starts a consumer using the Consume method, then - /// the server responds with Deliver methods as and when messages arrive for that - /// consumer. - BasicDeliver { - consumer_tag: ConsumerTag, - delivery_tag: DeliveryTag, - redelivered: Redelivered, - /// Specifies the name of the exchange that the message was originally published to. - /// May be empty, indicating the default exchange. - exchange: ExchangeName, - /// Specifies the routing key name specified when the message was published. - routing_key: Shortstr, - }, - /// The Basic class provides methods that support an industry-standard messaging model. - /// This method provides a direct access to the messages in a queue using a synchronous - /// dialogue that is designed for specific types of application where synchronous - /// functionality is more important than performance. - BasicGet { - reserved_1: Short, - /// Specifies the name of the queue to get a message from. - queue: QueueName, - no_ack: NoAck, - }, - /// The Basic class provides methods that support an industry-standard messaging model. - /// This method delivers a message to the client following a get method. A message - /// delivered by 'get-ok' must be acknowledged unless the no-ack option was set in the - /// get method. - BasicGetOk { - delivery_tag: DeliveryTag, - redelivered: Redelivered, - /// Specifies the name of the exchange that the message was originally published to. - /// If empty, the message was published to the default exchange. - exchange: ExchangeName, - /// Specifies the routing key name specified when the message was published. - routing_key: Shortstr, - message_count: MessageCount, - }, - /// The Basic class provides methods that support an industry-standard messaging model. - /// This method tells the client that the queue has no messages available for the - /// client. - BasicGetEmpty { reserved_1: Shortstr }, - /// The Basic class provides methods that support an industry-standard messaging model. - /// This method acknowledges one or more messages delivered via the Deliver or Get-Ok - /// methods. The client can ask to confirm a single message or a set of messages up to - /// and including a specific message. - BasicAck { - delivery_tag: DeliveryTag, - /// If set to 1, the delivery tag is treated as "up to and including", so that the - /// client can acknowledge multiple messages with a single method. If set to zero, - /// the delivery tag refers to a single message. If the multiple field is 1, and the - /// delivery tag is zero, tells the server to acknowledge all outstanding messages. - multiple: Bit, - }, - /// The Basic class provides methods that support an industry-standard messaging model. - /// This method allows a client to reject a message. It can be used to interrupt and - /// cancel large incoming messages, or return untreatable messages to their original - /// queue. - BasicReject { - delivery_tag: DeliveryTag, - /// If requeue is true, the server will attempt to requeue the message. If requeue - /// is false or the requeue attempt fails the messages are discarded or dead-lettered. - requeue: Bit, - }, - /// The Basic class provides methods that support an industry-standard messaging model. - /// This method asks the server to redeliver all unacknowledged messages on a - /// specified channel. Zero or more messages may be redelivered. This method - /// is deprecated in favour of the synchronous Recover/Recover-Ok. - BasicRecoverAsync { - /// If this field is zero, the message will be redelivered to the original - /// recipient. If this bit is 1, the server will attempt to requeue the message, - /// potentially then delivering it to an alternative subscriber. - requeue: Bit, - }, - /// The Basic class provides methods that support an industry-standard messaging model. - /// This method asks the server to redeliver all unacknowledged messages on a - /// specified channel. Zero or more messages may be redelivered. This method - /// replaces the asynchronous Recover. - BasicRecover { - /// If this field is zero, the message will be redelivered to the original - /// recipient. If this bit is 1, the server will attempt to requeue the message, - /// potentially then delivering it to an alternative subscriber. - requeue: Bit, - }, - /// The Basic class provides methods that support an industry-standard messaging model. - /// This method acknowledges a Basic.Recover method. - BasicRecoverOk, - /// The Tx class allows publish and ack operations to be batched into atomic - /// units of work. The intention is that all publish and ack requests issued - /// within a transaction will complete successfully or none of them will. - /// Servers SHOULD implement atomic transactions at least where all publish - /// or ack requests affect a single queue. Transactions that cover multiple - /// queues may be non-atomic, given that queues can be created and destroyed - /// asynchronously, and such events do not form part of any transaction. - /// Further, the behaviour of transactions with respect to the immediate and - /// mandatory flags on Basic.Publish methods is not defined. - /// This method sets the channel to use standard transactions. The client must use this - /// method at least once on a channel before using the Commit or Rollback methods. - TxSelect, - /// The Tx class allows publish and ack operations to be batched into atomic - /// units of work. The intention is that all publish and ack requests issued - /// within a transaction will complete successfully or none of them will. - /// Servers SHOULD implement atomic transactions at least where all publish - /// or ack requests affect a single queue. Transactions that cover multiple - /// queues may be non-atomic, given that queues can be created and destroyed - /// asynchronously, and such events do not form part of any transaction. - /// Further, the behaviour of transactions with respect to the immediate and - /// mandatory flags on Basic.Publish methods is not defined. - /// This method confirms to the client that the channel was successfully set to use - /// standard transactions. - TxSelectOk, - /// The Tx class allows publish and ack operations to be batched into atomic - /// units of work. The intention is that all publish and ack requests issued - /// within a transaction will complete successfully or none of them will. - /// Servers SHOULD implement atomic transactions at least where all publish - /// or ack requests affect a single queue. Transactions that cover multiple - /// queues may be non-atomic, given that queues can be created and destroyed - /// asynchronously, and such events do not form part of any transaction. - /// Further, the behaviour of transactions with respect to the immediate and - /// mandatory flags on Basic.Publish methods is not defined. - /// This method commits all message publications and acknowledgments performed in - /// the current transaction. A new transaction starts immediately after a commit. - TxCommit, - /// The Tx class allows publish and ack operations to be batched into atomic - /// units of work. The intention is that all publish and ack requests issued - /// within a transaction will complete successfully or none of them will. - /// Servers SHOULD implement atomic transactions at least where all publish - /// or ack requests affect a single queue. Transactions that cover multiple - /// queues may be non-atomic, given that queues can be created and destroyed - /// asynchronously, and such events do not form part of any transaction. - /// Further, the behaviour of transactions with respect to the immediate and - /// mandatory flags on Basic.Publish methods is not defined. - /// This method confirms to the client that the commit succeeded. Note that if a commit - /// fails, the server raises a channel exception. - TxCommitOk, - /// The Tx class allows publish and ack operations to be batched into atomic - /// units of work. The intention is that all publish and ack requests issued - /// within a transaction will complete successfully or none of them will. - /// Servers SHOULD implement atomic transactions at least where all publish - /// or ack requests affect a single queue. Transactions that cover multiple - /// queues may be non-atomic, given that queues can be created and destroyed - /// asynchronously, and such events do not form part of any transaction. - /// Further, the behaviour of transactions with respect to the immediate and - /// mandatory flags on Basic.Publish methods is not defined. - /// This method abandons all message publications and acknowledgments performed in - /// the current transaction. A new transaction starts immediately after a rollback. - /// Note that unacked messages will not be automatically redelivered by rollback; - /// if that is required an explicit recover call should be issued. - TxRollback, - /// The Tx class allows publish and ack operations to be batched into atomic - /// units of work. The intention is that all publish and ack requests issued - /// within a transaction will complete successfully or none of them will. - /// Servers SHOULD implement atomic transactions at least where all publish - /// or ack requests affect a single queue. Transactions that cover multiple - /// queues may be non-atomic, given that queues can be created and destroyed - /// asynchronously, and such events do not form part of any transaction. - /// Further, the behaviour of transactions with respect to the immediate and - /// mandatory flags on Basic.Publish methods is not defined. - /// This method confirms to the client that the rollback succeeded. Note that if an - /// rollback fails, the server raises a channel exception. - TxRollbackOk, -} - pub mod parse { - use super::*; use crate::error::TransError; use crate::methods::parse_helper::*; + use amqp_core::methods::*; use nom::{branch::alt, bytes::complete::tag}; use once_cell::sync::Lazy; use regex::Regex; @@ -1659,9 +875,9 @@ pub mod parse { } } pub mod write { - use super::*; use crate::error::TransError; use crate::methods::write_helper::*; + use amqp_core::methods::*; use std::io::Write; pub fn write_method(method: Method, mut writer: W) -> Result<(), TransError> { @@ -2077,8 +1293,8 @@ pub mod write { } mod random { - use super::*; use crate::methods::RandomMethod; + use amqp_core::methods::*; use rand::Rng; impl RandomMethod for Method { diff --git a/amqp_transport/src/methods/mod.rs b/amqp_transport/src/methods/mod.rs index 1296ffc..0118c7c 100644 --- a/amqp_transport/src/methods/mod.rs +++ b/amqp_transport/src/methods/mod.rs @@ -1,4 +1,5 @@ use crate::error::{ConException, TransError}; +use amqp_core::methods::{FieldValue, Method, Table}; use rand::Rng; use std::collections::HashMap; @@ -8,36 +9,10 @@ mod parse_helper; mod tests; mod write_helper; -pub type TableFieldName = String; - -pub type Table = HashMap; - -#[derive(Debug, Clone, PartialEq)] -pub enum FieldValue { - Boolean(bool), - ShortShortInt(i8), - ShortShortUInt(u8), - ShortInt(i16), - ShortUInt(u16), - LongInt(i32), - LongUInt(u32), - LongLongInt(i64), - LongLongUInt(u64), - Float(f32), - Double(f64), - DecimalValue(u8, u32), - ShortString(Shortstr), - LongString(Longstr), - FieldArray(Vec), - Timestamp(u64), - FieldTable(Table), - Void, -} - pub use generated::*; /// Parses the payload of a method frame into the method -pub fn parse_method(payload: &[u8]) -> Result { +pub fn parse_method(payload: &[u8]) -> Result { let nom_result = generated::parse::parse_method(payload); match nom_result { @@ -92,7 +67,7 @@ macro_rules! rand_random_method { rand_random_method!(bool, u8, i8, u16, i16, u32, i32, u64, i64, f32, f64); -impl RandomMethod for HashMap { +impl RandomMethod for Table { fn random(rng: &mut R) -> Self { let len = rng.gen_range(0..3); HashMap::from_iter((0..len).map(|_| (String::random(rng), FieldValue::random(rng)))) @@ -103,23 +78,23 @@ impl RandomMethod for FieldValue { fn random(rng: &mut R) -> Self { let index = rng.gen_range(0_u32..17); match index { - 0 => FieldValue::Boolean(RandomMethod::random(rng)), - 1 => FieldValue::ShortShortInt(RandomMethod::random(rng)), - 2 => FieldValue::ShortShortUInt(RandomMethod::random(rng)), - 3 => FieldValue::ShortInt(RandomMethod::random(rng)), - 4 => FieldValue::ShortUInt(RandomMethod::random(rng)), - 5 => FieldValue::LongInt(RandomMethod::random(rng)), - 6 => FieldValue::LongUInt(RandomMethod::random(rng)), - 7 => FieldValue::LongLongInt(RandomMethod::random(rng)), - 8 => FieldValue::LongLongUInt(RandomMethod::random(rng)), - 9 => FieldValue::Float(RandomMethod::random(rng)), - 10 => FieldValue::Double(RandomMethod::random(rng)), - 11 => FieldValue::ShortString(RandomMethod::random(rng)), - 12 => FieldValue::LongString(RandomMethod::random(rng)), - 13 => FieldValue::FieldArray(RandomMethod::random(rng)), - 14 => FieldValue::Timestamp(RandomMethod::random(rng)), - 15 => FieldValue::FieldTable(RandomMethod::random(rng)), - 16 => FieldValue::Void, + 0 => Self::Boolean(RandomMethod::random(rng)), + 1 => Self::ShortShortInt(RandomMethod::random(rng)), + 2 => Self::ShortShortUInt(RandomMethod::random(rng)), + 3 => Self::ShortInt(RandomMethod::random(rng)), + 4 => Self::ShortUInt(RandomMethod::random(rng)), + 5 => Self::LongInt(RandomMethod::random(rng)), + 6 => Self::LongUInt(RandomMethod::random(rng)), + 7 => Self::LongLongInt(RandomMethod::random(rng)), + 8 => Self::LongLongUInt(RandomMethod::random(rng)), + 9 => Self::Float(RandomMethod::random(rng)), + 10 => Self::Double(RandomMethod::random(rng)), + 11 => Self::ShortString(RandomMethod::random(rng)), + 12 => Self::LongString(RandomMethod::random(rng)), + 13 => Self::FieldArray(RandomMethod::random(rng)), + 14 => Self::Timestamp(RandomMethod::random(rng)), + 15 => Self::FieldTable(RandomMethod::random(rng)), + 16 => Self::Void, _ => unreachable!(), } } diff --git a/amqp_transport/src/methods/parse_helper.rs b/amqp_transport/src/methods/parse_helper.rs index 7cbe7f6..f59d694 100644 --- a/amqp_transport/src/methods/parse_helper.rs +++ b/amqp_transport/src/methods/parse_helper.rs @@ -1,7 +1,8 @@ use crate::error::{ConException, ProtocolError, TransError}; use crate::methods::generated::parse::IResult; -use crate::methods::generated::{ - Bit, Long, Longlong, Longstr, Octet, Short, Shortstr, Table, Timestamp, +use amqp_core::methods::{ + Bit, FieldValue, Long, Longlong, Longstr, Octet, Short, Shortstr, Table, TableFieldName, + Timestamp, }; use nom::branch::alt; use nom::bytes::complete::{tag, take}; @@ -56,7 +57,6 @@ macro_rules! fail { }; } -use crate::methods::{FieldValue, TableFieldName}; pub use fail; pub fn octet(input: &[u8]) -> IResult<'_, Octet> { diff --git a/amqp_transport/src/methods/write_helper.rs b/amqp_transport/src/methods/write_helper.rs index a7b6dba..d141120 100644 --- a/amqp_transport/src/methods/write_helper.rs +++ b/amqp_transport/src/methods/write_helper.rs @@ -1,8 +1,6 @@ use crate::error::TransError; -use crate::methods::generated::{ - Bit, Long, Longlong, Longstr, Octet, Short, Shortstr, Table, Timestamp, -}; use crate::methods::FieldValue; +use amqp_core::methods::{Bit, Long, Longlong, Longstr, Octet, Short, Shortstr, Table, Timestamp}; use anyhow::Context; use std::io::Write; diff --git a/amqp_transport/src/tests.rs b/amqp_transport/src/tests.rs index 3a88ae2..1f6dc71 100644 --- a/amqp_transport/src/tests.rs +++ b/amqp_transport/src/tests.rs @@ -1,6 +1,6 @@ use crate::frame::FrameType; -use crate::methods::{FieldValue, Method}; use crate::{frame, methods}; +use amqp_core::methods::{FieldValue, Method}; use std::collections::HashMap; #[tokio::test] diff --git a/xtask/Cargo.toml b/xtask/Cargo.toml index 1982620..be38f42 100644 --- a/xtask/Cargo.toml +++ b/xtask/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +anyhow = "1.0.54" heck = "0.4.0" itertools = "0.10.3" strong-xml = "0.6.3" \ No newline at end of file diff --git a/xtask/src/amqp0-9-1.xml b/xtask/amqp0-9-1.xml similarity index 100% rename from xtask/src/amqp0-9-1.xml rename to xtask/amqp0-9-1.xml diff --git a/xtask/src/codegen/mod.rs b/xtask/src/codegen/mod.rs index 4fde8bd..f7a0d95 100644 --- a/xtask/src/codegen/mod.rs +++ b/xtask/src/codegen/mod.rs @@ -4,15 +4,15 @@ mod parser; mod random; mod write; +use anyhow::Context; use heck::ToUpperCamelCase; -use parser::codegen_parser; -use random::codegen_random; use std::fs; +use std::fs::File; +use std::io::Write; use std::iter::Peekable; use std::path::PathBuf; use std::str::FromStr; use strong_xml::XmlRead; -use write::codegen_write; #[derive(Debug, XmlRead)] #[xml(tag = "amqp")] @@ -101,196 +101,235 @@ struct Doc { kind: Option, } -pub fn main() { - let this_file = PathBuf::from_str(file!()).unwrap(); - let expected_location = this_file - .parent() - .unwrap() - .parent() - .unwrap() - .join("amqp0-9-1.xml"); - let content = fs::read_to_string(expected_location).unwrap(); - - let amqp = match Amqp::from_str(&content) { - Ok(amqp) => amqp, - Err(err) => { - eprintln!("{err}"); - std::process::exit(1); - } - }; - codegen(&amqp); +struct Codegen { + output: Box, } -fn codegen(amqp: &Amqp) { - println!("#![allow(dead_code)]"); - println!("// This file has been generated by `xtask/src/codegen`. Do not edit it manually.\n"); - codegen_domain_defs(amqp); - codegen_class_defs(amqp); - codegen_parser(amqp); - codegen_write(amqp); - codegen_random(amqp); -} +pub fn main() -> anyhow::Result<()> { + let this_file = PathBuf::from_str(file!()).context("own file path")?; + let xtask_root = this_file + .parent() + .context("codegen directory path")? + .parent() + .context("src directory path")? + .parent() + .context("xtask root path")?; + let amqp_spec = xtask_root.join("amqp0-9-1.xml"); + let project_root = xtask_root.parent().context("get project root parent")?; -fn codegen_domain_defs(amqp: &Amqp) { - for domain in &amqp.domains { - let invariants = invariants(domain.asserts.iter()); + let transport_generated_path = project_root.join("amqp_transport/src/methods/generated.rs"); + let core_generated_path = project_root.join("amqp_core/src/methods/generated.rs"); - if let Some(label) = &domain.label { - println!("/// {label}"); - } + let content = fs::read_to_string(amqp_spec).context("read amqp spec file")?; - if !invariants.is_empty() { - if domain.label.is_some() { - println!("///"); - } - println!("/// {invariants}"); - } - if !domain.doc.is_empty() { - println!("///"); - doc_comment(&domain.doc, 0); - } - println!( - "pub type {} = {};\n", - domain.name.to_upper_camel_case(), - amqp_type_to_rust_type(&domain.kind), - ); + let amqp = Amqp::from_str(&content).context("parse amqp spec file")?; + + let transport_output = + File::create(transport_generated_path).context("transport output file create")?; + let core_output = File::create(core_generated_path).context("core output file create")?; + + Codegen { + output: Box::new(transport_output), } + .transport_codegen(&amqp); + + Codegen { + output: Box::new(core_output), + } + .core_codegen(&amqp); + + Ok(()) } +impl Codegen { + fn transport_codegen(&mut self, amqp: &Amqp) { + writeln!(self.output, "#![allow(dead_code)]").ok(); + writeln!( + self.output, + "// This file has been generated by `xtask/src/codegen`. Do not edit it manually.\n" + ) + .ok(); + self.codegen_parser(amqp); + self.codegen_write(amqp); + self.codegen_random(amqp); + } -fn codegen_class_defs(amqp: &Amqp) { - println!("#[derive(Debug, Clone, PartialEq)]"); - println!("pub enum Method {{"); + fn core_codegen(&mut self, amqp: &Amqp) { + writeln!(self.output, "#![allow(dead_code)]").ok(); + writeln!( + self.output, + "// This file has been generated by `xtask/src/codegen`. Do not edit it manually.\n" + ) + .ok(); + self.codegen_domain_defs(amqp); + self.codegen_class_defs(amqp); + } - for class in &amqp.classes { - let enum_name = class.name.to_upper_camel_case(); - for method in &class.methods { - let method_name = method.name.to_upper_camel_case(); - doc_comment(&class.doc, 4); - doc_comment(&method.doc, 4); - print!(" {enum_name}{method_name}"); - if !method.fields.is_empty() { - println!(" {{"); - for field in &method.fields { - let field_name = snake_case(&field.name); - let (field_type, field_docs) = - get_invariants_with_type(field_type(field), field.asserts.as_ref()); - if !field_docs.is_empty() { - println!(" /// {field_docs}"); - if !field.doc.is_empty() { - println!(" ///"); - doc_comment(&field.doc, 8); - } - } else { - doc_comment(&field.doc, 8); - } - println!(" {field_name}: {field_type},"); + fn codegen_domain_defs(&mut self, amqp: &Amqp) { + for domain in &amqp.domains { + let invariants = self.invariants(domain.asserts.iter()); + + if let Some(label) = &domain.label { + writeln!(self.output, "/// {label}").ok(); + } + + if !invariants.is_empty() { + if domain.label.is_some() { + writeln!(self.output, "///").ok(); } - println!(" }},"); - } else { - println!(","); + writeln!(self.output, "/// {invariants}").ok(); } + if !domain.doc.is_empty() { + writeln!(self.output, "///").ok(); + self.doc_comment(&domain.doc, 0); + } + writeln!( + self.output, + "pub type {} = {};\n", + domain.name.to_upper_camel_case(), + self.amqp_type_to_rust_type(&domain.kind), + ) + .ok(); } } - println!("}}\n"); -} + fn codegen_class_defs(&mut self, amqp: &Amqp) { + writeln!(self.output, "#[derive(Debug, Clone, PartialEq)]").ok(); + writeln!(self.output, "pub enum Method {{").ok(); -fn amqp_type_to_rust_type(amqp_type: &str) -> &'static str { - match amqp_type { - "octet" => "u8", - "short" => "u16", - "long" => "u32", - "longlong" => "u64", - "bit" => "bool", - "shortstr" => "String", - "longstr" => "Vec", - "timestamp" => "u64", - "table" => "super::Table", - _ => unreachable!("invalid type {}", amqp_type), + for class in &amqp.classes { + let enum_name = class.name.to_upper_camel_case(); + for method in &class.methods { + let method_name = method.name.to_upper_camel_case(); + self.doc_comment(&class.doc, 4); + self.doc_comment(&method.doc, 4); + write!(self.output, " {enum_name}{method_name}").ok(); + if !method.fields.is_empty() { + writeln!(self.output, " {{").ok(); + for field in &method.fields { + let field_name = self.snake_case(&field.name); + let (field_type, field_docs) = self.get_invariants_with_type( + self.field_type(field), + field.asserts.as_ref(), + ); + if !field_docs.is_empty() { + writeln!(self.output, " /// {field_docs}").ok(); + if !field.doc.is_empty() { + writeln!(self.output, " ///").ok(); + self.doc_comment(&field.doc, 8); + } + } else { + self.doc_comment(&field.doc, 8); + } + writeln!(self.output, " {field_name}: {field_type},").ok(); + } + writeln!(self.output, " }},").ok(); + } else { + writeln!(self.output, ",").ok(); + } + } + } + + writeln!(self.output, "}}\n").ok(); } -} -fn field_type(field: &Field) -> &String { - field.domain.as_ref().or(field.kind.as_ref()).unwrap() -} - -fn resolve_type_from_domain(amqp: &Amqp, domain: &str) -> String { - amqp.domains - .iter() - .find(|d| d.name == domain) - .map(|d| d.kind.clone()) - .unwrap() -} - -/// returns (type name, invariant docs) -fn get_invariants_with_type(domain: &str, asserts: &[Assert]) -> (String, String) { - let additional_docs = invariants(asserts.iter()); - - let type_name = domain.to_upper_camel_case(); - - (type_name, additional_docs) -} - -fn snake_case(ident: &str) -> String { - use heck::ToSnakeCase; - - if ident == "type" { - "r#type".to_string() - } else { - ident.to_snake_case() + fn amqp_type_to_rust_type(&self, amqp_type: &str) -> &'static str { + match amqp_type { + "octet" => "u8", + "short" => "u16", + "long" => "u32", + "longlong" => "u64", + "bit" => "bool", + "shortstr" => "String", + "longstr" => "Vec", + "timestamp" => "u64", + "table" => "super::Table", + _ => unreachable!("invalid type {}", amqp_type), + } } -} -fn subsequent_bit_fields<'a>( - amqp: &Amqp, - bit_field: &'a Field, - iter: &mut Peekable>, -) -> Vec<&'a Field> { - let mut fields_with_bit = vec![bit_field]; + fn field_type<'a>(&self, field: &'a Field) -> &'a String { + field.domain.as_ref().or(field.kind.as_ref()).unwrap() + } - loop { - if iter - .peek() - .map(|f| resolve_type_from_domain(amqp, field_type(f)) == "bit") - .unwrap_or(false) - { - fields_with_bit.push(iter.next().unwrap()); + fn resolve_type_from_domain(&self, amqp: &Amqp, domain: &str) -> String { + amqp.domains + .iter() + .find(|d| d.name == domain) + .map(|d| d.kind.clone()) + .unwrap() + } + + /// returns (type name, invariant docs) + fn get_invariants_with_type(&self, domain: &str, asserts: &[Assert]) -> (String, String) { + let additional_docs = self.invariants(asserts.iter()); + + let type_name = domain.to_upper_camel_case(); + + (type_name, additional_docs) + } + + fn snake_case(&self, ident: &str) -> String { + use heck::ToSnakeCase; + + if ident == "type" { + "r#type".to_string() } else { - break; + ident.to_snake_case() } } - fields_with_bit -} -fn invariants<'a>(asserts: impl Iterator) -> String { - asserts - .map(|assert| match &*assert.check { - "notnull" => "must not be null".to_string(), - "length" => format!("must be shorter than {}", assert.value.as_ref().unwrap()), - "regexp" => format!("must match `{}`", assert.value.as_ref().unwrap()), - "le" => { - format!( - "must be less than the {} field of the method {}", - assert.method.as_ref().unwrap(), - assert.field.as_ref().unwrap() - ) + fn subsequent_bit_fields<'a>( + &self, + bit_field: &'a Field, + iter: &mut Peekable>, + amqp: &Amqp, + ) -> Vec<&'a Field> { + let mut fields_with_bit = vec![bit_field]; + + loop { + if iter + .peek() + .map(|f| self.resolve_type_from_domain(amqp, self.field_type(f)) == "bit") + .unwrap_or(false) + { + fields_with_bit.push(iter.next().unwrap()); + } else { + break; } - _ => unimplemented!(), - }) - .collect::>() - .join(", ") -} - -fn doc_comment(docs: &[Doc], indent: usize) { - for doc in docs { - if doc.kind == Some("grammar".to_string()) { - continue; } - for line in doc.text.lines() { - let line = line.trim(); - if !line.is_empty() { - let indent = " ".repeat(indent); - println!("{indent}/// {line}"); + fields_with_bit + } + + fn invariants<'a>(&self, asserts: impl Iterator) -> String { + asserts + .map(|assert| match &*assert.check { + "notnull" => "must not be null".to_string(), + "length" => format!("must be shorter than {}", assert.value.as_ref().unwrap()), + "regexp" => format!("must match `{}`", assert.value.as_ref().unwrap()), + "le" => { + format!( + "must be less than the {} field of the method {}", + assert.method.as_ref().unwrap(), + assert.field.as_ref().unwrap() + ) + } + _ => unimplemented!(), + }) + .collect::>() + .join(", ") + } + + fn doc_comment(&mut self, docs: &[Doc], indent: usize) { + for doc in docs { + if doc.kind == Some("grammar".to_string()) { + continue; + } + for line in doc.text.lines() { + let line = line.trim(); + if !line.is_empty() { + let indent = " ".repeat(indent); + writeln!(self.output, "{indent}/// {line}").ok(); + } } } } diff --git a/xtask/src/codegen/parser.rs b/xtask/src/codegen/parser.rs index 5ced7fd..58ed2b2 100644 --- a/xtask/src/codegen/parser.rs +++ b/xtask/src/codegen/parser.rs @@ -1,7 +1,5 @@ -use super::{ - field_type, resolve_type_from_domain, snake_case, subsequent_bit_fields, Amqp, Assert, Class, - Domain, Method, -}; +use super::{Amqp, Assert, Class, Domain, Method}; +use crate::codegen::Codegen; use heck::{ToSnakeCase, ToUpperCamelCase}; use itertools::Itertools; @@ -17,10 +15,12 @@ fn domain_function_name(domain_name: &str) -> String { format!("domain_{domain_name}") } -pub(super) fn codegen_parser(amqp: &Amqp) { - println!( - "pub mod parse {{ -use super::*; +impl Codegen { + pub(super) fn codegen_parser(&mut self, amqp: &Amqp) { + writeln!( + self.output, + "pub mod parse {{ +use amqp_core::methods::*; use crate::methods::parse_helper::*; use crate::error::TransError; use nom::{{branch::alt, bytes::complete::tag}}; @@ -29,25 +29,29 @@ use once_cell::sync::Lazy; pub type IResult<'a, T> = nom::IResult<&'a [u8], T, TransError>; " - ); - println!( - "pub fn parse_method(input: &[u8]) -> Result<(&[u8], Method), nom::Err> {{ + ) + .ok(); + writeln!( + self.output, + "pub fn parse_method(input: &[u8]) -> Result<(&[u8], Method), nom::Err> {{ alt(({}))(input) }}", - amqp.classes - .iter() - .map(|class| class.name.to_snake_case()) - .join(", ") - ); + amqp.classes + .iter() + .map(|class| class.name.to_snake_case()) + .join(", ") + ) + .ok(); - for domain in &amqp.domains { - domain_parser(domain); - } + for domain in &amqp.domains { + self.domain_parser(domain); + } - for class in &amqp.classes { - let class_name = class.name.to_snake_case(); + for class in &amqp.classes { + let class_name = class.name.to_snake_case(); + + self.function(&class_name, "Method"); - function(&class_name, "Method", || { let class_index = class.index; let all_methods = class .methods @@ -55,126 +59,158 @@ pub type IResult<'a, T> = nom::IResult<&'a [u8], T, TransError>; .map(method_function_name(&class_name)) .join(", "); let class_name_raw = &class.name; - println!( + writeln!( + self.output, r#" let (input, _) = tag({class_index}_u16.to_be_bytes())(input)?; alt(({all_methods}))(input).map_err(fail_err("class {class_name_raw}"))"# - ); - }); + ) + .ok(); - for method in &class.methods { - method_parser(amqp, class, method); + writeln!(self.output, "}}").ok(); + + for method in &class.methods { + self.method_parser(class, method, amqp); + } + } + + writeln!(self.output, "\n}}").ok(); + } + + fn domain_parser(&mut self, domain: &Domain) { + let fn_name = domain_function_name(&domain.name); + let type_name = domain.kind.to_snake_case(); + // don't even bother with bit domains, do them manually at call site + if type_name != "bit" { + self.function(&fn_name, &domain.name.to_upper_camel_case()); + + if domain.asserts.is_empty() { + writeln!(self.output, " {type_name}(input)").ok(); + } else { + writeln!( + self.output, + " let (input, result) = {type_name}(input)?;" + ) + .ok(); + + for assert in &domain.asserts { + self.assert_check(assert, &type_name, "result"); + } + writeln!(self.output, " Ok((input, result))").ok(); + } + + writeln!(self.output, "}}").ok(); } } - println!("\n}}"); -} + fn method_parser(&mut self, class: &Class, method: &Method, amqp: &Amqp) { + let class_name = class.name.to_snake_case(); + let method_name_raw = &method.name; -fn domain_parser(domain: &Domain) { - let fn_name = domain_function_name(&domain.name); - let type_name = domain.kind.to_snake_case(); - // don't even bother with bit domains, do them manually at call site - if type_name != "bit" { - function(&fn_name, &domain.name.to_upper_camel_case(), || { - if domain.asserts.is_empty() { - println!(" {type_name}(input)"); - } else { - println!(" let (input, result) = {type_name}(input)?;"); - - for assert in &domain.asserts { - assert_check(assert, &type_name, "result"); - } - println!(" Ok((input, result))"); - } - }); - } -} - -fn method_parser(amqp: &Amqp, class: &Class, method: &Method) { - let class_name = class.name.to_snake_case(); - let method_name_raw = &method.name; - - let function_name = method_function_name(&class_name)(method); - function(&function_name, "Method", || { + let function_name = method_function_name(&class_name)(method); + self.function(&function_name, "Method"); let method_index = method.index; - println!(r#" let (input, _) = tag({method_index}_u16.to_be_bytes())(input)?;"#); + writeln!( + self.output, + r#" let (input, _) = tag({method_index}_u16.to_be_bytes())(input)?;"# + ) + .ok(); let mut iter = method.fields.iter().peekable(); while let Some(field) = iter.next() { let field_name_raw = &field.name; - let type_name = resolve_type_from_domain(amqp, field_type(field)); + let type_name = self.resolve_type_from_domain(amqp, self.field_type(field)); if type_name == "bit" { - let fields_with_bit = subsequent_bit_fields(amqp, field, &mut iter); + let fields_with_bit = self.subsequent_bit_fields(field, &mut iter, amqp); let amount = fields_with_bit.len(); - println!( + writeln!( + self.output, r#" let (input, bits) = bit(input, {amount}).map_err(fail_err("field {field_name_raw} in method {method_name_raw}"))?;"# - ); + ).ok(); for (i, field) in fields_with_bit.iter().enumerate() { - let field_name = snake_case(&field.name); - println!(" let {field_name} = bits[{i}];"); + let field_name = self.snake_case(&field.name); + writeln!(self.output, " let {field_name} = bits[{i}];").ok(); } } else { - let fn_name = domain_function_name(field_type(field)); - let field_name = snake_case(&field.name); - println!( + let fn_name = domain_function_name(self.field_type(field)); + let field_name = self.snake_case(&field.name); + writeln!( + self.output, r#" let (input, {field_name}) = {fn_name}(input).map_err(fail_err("field {field_name_raw} in method {method_name_raw}"))?;"# - ); + ).ok(); for assert in &field.asserts { - assert_check(assert, &type_name, &field_name); + self.assert_check(assert, &type_name, &field_name); } } } let class_name = class_name.to_upper_camel_case(); let method_name = method.name.to_upper_camel_case(); - println!(" Ok((input, Method::{class_name}{method_name} {{"); + writeln!( + self.output, + " Ok((input, Method::{class_name}{method_name} {{" + ) + .ok(); for field in &method.fields { - let field_name = snake_case(&field.name); - println!(" {field_name},"); + let field_name = self.snake_case(&field.name); + writeln!(self.output, " {field_name},").ok(); } - println!(" }}))"); - }); -} + writeln!(self.output, " }}))").ok(); -fn assert_check(assert: &Assert, type_name: &str, var_name: &str) { - match &*assert.check { - "notnull" => match type_name { - "shortstr" | "longstr" => { - println!( - r#" if {var_name}.is_empty() {{ fail!("string was null for field {var_name}") }}"# - ); + writeln!(self.output, "}}").ok(); + } + + fn assert_check(&mut self, assert: &Assert, type_name: &str, var_name: &str) { + match &*assert.check { + "notnull" => match type_name { + "shortstr" | "longstr" => { + writeln!( + self.output, + r#" if {var_name}.is_empty() {{ fail!("string was null for field {var_name}") }}"# + ).ok(); + } + "short" => { + writeln!( + self.output, + r#" if {var_name} == 0 {{ fail!("number was 0 for field {var_name}") }}"# + ) + .ok(); + } + _ => unimplemented!(), + }, + "regexp" => { + let value = assert.value.as_ref().unwrap(); + writeln!( + self.output, + r#" static REGEX: Lazy = Lazy::new(|| Regex::new(r"{value}").unwrap());"# + ).ok(); + let cause = format!("regex `{value}` did not match value for field {var_name}"); + writeln!( + self.output, + r#" if !REGEX.is_match(&{var_name}) {{ fail!(r"{cause}") }}"# + ) + .ok(); } - "short" => { - println!( - r#" if {var_name} == 0 {{ fail!("number was 0 for field {var_name}") }}"# - ); + "le" => {} // can't validate this here + "length" => { + let length = assert.value.as_ref().unwrap(); + let cause = format!("value is shorter than {length} for field {var_name}"); + writeln!( + self.output, + r#" if {var_name}.len() > {length} {{ fail!("{cause}") }}"# + ) + .ok(); } _ => unimplemented!(), - }, - "regexp" => { - let value = assert.value.as_ref().unwrap(); - println!( - r#" static REGEX: Lazy = Lazy::new(|| Regex::new(r"{value}").unwrap());"# - ); - let cause = format!("regex `{value}` did not match value for field {var_name}"); - println!(r#" if !REGEX.is_match(&{var_name}) {{ fail!(r"{cause}") }}"#); } - "le" => {} // can't validate this here - "length" => { - let length = assert.value.as_ref().unwrap(); - let cause = format!("value is shorter than {length} for field {var_name}"); - println!(r#" if {var_name}.len() > {length} {{ fail!("{cause}") }}"#); - } - _ => unimplemented!(), + } + + fn function(&mut self, name: &str, ret_ty: &str) { + writeln!( + self.output, + "fn {name}(input: &[u8]) -> IResult<'_, {ret_ty}> {{" + ) + .ok(); } } - -fn function(name: &str, ret_ty: &str, body: F) -where - F: FnOnce(), -{ - println!("fn {name}(input: &[u8]) -> IResult<'_, {ret_ty}> {{"); - body(); - println!("}}"); -} diff --git a/xtask/src/codegen/random.rs b/xtask/src/codegen/random.rs index 9db17b6..6dbb287 100644 --- a/xtask/src/codegen/random.rs +++ b/xtask/src/codegen/random.rs @@ -1,59 +1,78 @@ -use super::{snake_case, Amqp}; +use crate::codegen::{Amqp, Codegen}; use heck::ToUpperCamelCase; -pub(super) fn codegen_random(amqp: &Amqp) { - println!( - " +impl Codegen { + pub fn codegen_random(&mut self, amqp: &Amqp) { + writeln!( + self.output, + " mod random {{ use rand::Rng; +use amqp_core::methods::*; use crate::methods::RandomMethod; -use super::*; " - ); + ) + .ok(); + + writeln!( + self.output, + "impl RandomMethod for Method {{ + #[allow(unused_variables)] + fn random(rng: &mut R) -> Self {{" + ) + .ok(); - impl_random("Method", || { let class_lens = amqp.classes.len(); - println!(" match rng.gen_range(0u32..{class_lens}) {{"); + writeln!( + self.output, + " match rng.gen_range(0u32..{class_lens}) {{" + ) + .ok(); for (i, class) in amqp.classes.iter().enumerate() { let class_name = class.name.to_upper_camel_case(); - println!(" {i} => {{"); + writeln!(self.output, " {i} => {{").ok(); let method_len = class.methods.len(); - println!(" match rng.gen_range(0u32..{method_len}) {{"); + writeln!( + self.output, + " match rng.gen_range(0u32..{method_len}) {{" + ) + .ok(); for (i, method) in class.methods.iter().enumerate() { let method_name = method.name.to_upper_camel_case(); - println!(" {i} => Method::{class_name}{method_name} {{"); + writeln!( + self.output, + " {i} => Method::{class_name}{method_name} {{" + ) + .ok(); for field in &method.fields { - let field_name = snake_case(&field.name); - println!(" {field_name}: RandomMethod::random(rng),"); + let field_name = self.snake_case(&field.name); + writeln!( + self.output, + " {field_name}: RandomMethod::random(rng)," + ) + .ok(); } - println!(" }},"); + writeln!(self.output, " }},").ok(); } - println!( + writeln!( + self.output, " _ => unreachable!(), }}" - ); + ) + .ok(); - println!(" }}"); + writeln!(self.output, " }}").ok(); } - println!( + writeln!( + self.output, " _ => unreachable!(), }}" - ); - }); + ) + .ok(); + writeln!(self.output, " }}\n}}").ok(); - println!("}}"); -} - -fn impl_random(name: &str, body: impl FnOnce()) { - println!( - "impl RandomMethod for {name} {{ - #[allow(unused_variables)] - fn random(rng: &mut R) -> Self {{" - ); - - body(); - - println!(" }}\n}}"); + writeln!(self.output, "}}").ok(); + } } diff --git a/xtask/src/codegen/write.rs b/xtask/src/codegen/write.rs index 6ba6fbc..104e5df 100644 --- a/xtask/src/codegen/write.rs +++ b/xtask/src/codegen/write.rs @@ -1,58 +1,72 @@ -use super::{field_type, resolve_type_from_domain, snake_case, subsequent_bit_fields, Amqp}; +use crate::codegen::{Amqp, Codegen}; use heck::ToUpperCamelCase; -pub(super) fn codegen_write(amqp: &Amqp) { - println!( - "pub mod write {{ -use super::*; +impl Codegen { + pub fn codegen_write(&mut self, amqp: &Amqp) { + writeln!( + self.output, + "pub mod write {{ +use amqp_core::methods::*; use crate::methods::write_helper::*; use crate::error::TransError; use std::io::Write; pub fn write_method(method: Method, mut writer: W) -> Result<(), TransError> {{ match method {{" - ); + ) + .ok(); - for class in &amqp.classes { - let class_name = class.name.to_upper_camel_case(); - let class_index = class.index; - for method in &class.methods { - let method_name = method.name.to_upper_camel_case(); - let method_index = method.index; - println!(" Method::{class_name}{method_name} {{"); - for field in &method.fields { - let field_name = snake_case(&field.name); - println!(" {field_name},"); - } - println!(" }} => {{"); - let [ci0, ci1] = class_index.to_be_bytes(); - let [mi0, mi1] = method_index.to_be_bytes(); - println!(" writer.write_all(&[{ci0}, {ci1}, {mi0}, {mi1}])?;"); - let mut iter = method.fields.iter().peekable(); - - while let Some(field) = iter.next() { - let field_name = snake_case(&field.name); - let type_name = resolve_type_from_domain(amqp, field_type(field)); - if type_name == "bit" { - let fields_with_bit = subsequent_bit_fields(amqp, field, &mut iter); - print!(" bit(&["); - for field in fields_with_bit { - let field_name = snake_case(&field.name); - print!("{field_name}, "); - } - println!("], &mut writer)?;"); - } else { - println!(" {type_name}({field_name}, &mut writer)?;"); + for class in &amqp.classes { + let class_name = class.name.to_upper_camel_case(); + let class_index = class.index; + for method in &class.methods { + let method_name = method.name.to_upper_camel_case(); + let method_index = method.index; + writeln!(self.output, " Method::{class_name}{method_name} {{").ok(); + for field in &method.fields { + let field_name = self.snake_case(&field.name); + writeln!(self.output, " {field_name},").ok(); } - } - println!(" }}"); - } - } + writeln!(self.output, " }} => {{").ok(); + let [ci0, ci1] = class_index.to_be_bytes(); + let [mi0, mi1] = method_index.to_be_bytes(); + writeln!( + self.output, + " writer.write_all(&[{ci0}, {ci1}, {mi0}, {mi1}])?;" + ) + .ok(); + let mut iter = method.fields.iter().peekable(); - println!( - " }} + while let Some(field) = iter.next() { + let field_name = self.snake_case(&field.name); + let type_name = self.resolve_type_from_domain(amqp, self.field_type(field)); + if type_name == "bit" { + let fields_with_bit = self.subsequent_bit_fields(field, &mut iter, amqp); + write!(self.output, " bit(&[").ok(); + for field in fields_with_bit { + let field_name = self.snake_case(&field.name); + write!(self.output, "{field_name}, ").ok(); + } + writeln!(self.output, "], &mut writer)?;").ok(); + } else { + writeln!( + self.output, + " {type_name}({field_name}, &mut writer)?;" + ) + .ok(); + } + } + writeln!(self.output, " }}").ok(); + } + } + + writeln!( + self.output, + " }} Ok(()) }} }}" - ); + ) + .ok(); + } } diff --git a/xtask/src/main.rs b/xtask/src/main.rs index 264c4d6..dfdacf5 100644 --- a/xtask/src/main.rs +++ b/xtask/src/main.rs @@ -1,22 +1,24 @@ mod codegen; -fn main() { +fn main() -> anyhow::Result<()> { let command = std::env::args().nth(1).unwrap_or_else(|| { - eprintln!("No task provided"); + eprintln!("Error: No task provided"); help(); std::process::exit(1); }); match command.as_str() { "generate" | "gen" => codegen::main(), - _ => eprintln!("Unknown command {command}."), + _ => { + eprintln!("Unknown command {command}."); + Ok(()) + } } } fn help() { println!( "Available tasks: -generate - Generate amqp method code in `amqp_transport/src/methods/generated.rs. - Dumps code to stdout and should be redirected manually." + generate, gen - Generate amqp method code in `amqp_transport/src/methods/generated.rs and amqp_core/src/methods/generated.rs" ); }