split generated code so that methods are now in core

This commit is contained in:
nora 2022-02-20 21:22:30 +01:00
parent 3b656b911a
commit c333f20531
20 changed files with 1337 additions and 1206 deletions

5
Cargo.lock generated
View file

@ -80,9 +80,9 @@ dependencies = [
[[package]]
name = "anyhow"
version = "1.0.53"
version = "1.0.54"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94a45b455c14666b85fc40a019e8ab9eb75e3a124e05494f5397122bc9eb06e0"
checksum = "7a99269dff3bc004caa411f38845c20303f1e393ca2bd6581576fa3a7f59577d"
[[package]]
name = "async-trait"
@ -1463,6 +1463,7 @@ checksum = "114ba2b24d2167ef6d67d7d04c8cc86522b87f490025f39f0303b7db5bf5e3d8"
name = "xtask"
version = "0.1.0"
dependencies = [
"anyhow",
"heck",
"itertools",
"strong-xml",

View file

@ -1,5 +1,7 @@
#![warn(rust_2018_idioms)]
pub mod methods;
use parking_lot::Mutex;
use std::collections::HashMap;
use std::net::SocketAddr;

View file

@ -0,0 +1,786 @@
#![allow(dead_code)]
// This file has been generated by `xtask/src/codegen`. Do not edit it manually.
pub type ClassId = u16;
/// consumer tag
///
/// Identifier for the consumer, valid within the current channel.
pub type ConsumerTag = String;
/// server-assigned delivery tag
///
/// The server-assigned and channel-specific delivery tag
pub type DeliveryTag = u64;
/// exchange name
///
/// must be shorter than 127, must match `^[a-zA-Z0-9-_.:]*$`
///
/// The exchange name is a client-selected string that identifies the exchange for
/// publish methods.
pub type ExchangeName = String;
pub type MethodId = u16;
/// no acknowledgement needed
///
/// If this field is set the server does not expect acknowledgements for
/// messages. That is, when a message is delivered to the client the server
/// assumes the delivery will succeed and immediately dequeues it. This
/// functionality may increase performance but at the cost of reliability.
/// Messages can get lost if a client dies before they are delivered to the
/// application.
pub type NoAck = bool;
/// do not deliver own messages
///
/// If the no-local field is set the server will not send messages to the connection that
/// published them.
pub type NoLocal = bool;
/// do not send reply method
///
/// If set, the server will not respond to the method. The client should not wait
/// for a reply method. If the server could not complete the method it will raise a
/// channel or connection exception.
pub type NoWait = bool;
/// must not be null, must be shorter than 127
///
/// Unconstrained.
pub type Path = String;
///
/// This table provides a set of peer properties, used for identification, debugging,
/// and general information.
pub type PeerProperties = super::Table;
/// queue name
///
/// must be shorter than 127, must match `^[a-zA-Z0-9-_.:]*$`
///
/// The queue name identifies the queue within the vhost. In methods where the queue
/// name may be blank, and that has no specific significance, this refers to the
/// 'current' queue for the channel, meaning the last queue that the client declared
/// on the channel. If the client did not declare a queue, and the method needs a
/// queue name, this will result in a 502 (syntax error) channel exception.
pub type QueueName = String;
/// message is being redelivered
///
/// This indicates that the message has been previously delivered to this or
/// another client.
pub type Redelivered = bool;
/// number of messages in queue
///
/// The number of messages in the queue, which will be zero for newly-declared
/// queues. This is the number of messages present in the queue, and committed
/// if the channel on which they were published is transacted, that are not
/// waiting acknowledgement.
pub type MessageCount = u32;
/// reply code from server
///
/// must not be null
///
/// The reply code. The AMQ reply codes are defined as constants at the start
/// of this formal specification.
pub type ReplyCode = u16;
/// localised reply text
///
/// must not be null
///
/// The localised reply text. This text can be logged as an aid to resolving
/// issues.
pub type ReplyText = String;
/// single bit
pub type Bit = bool;
/// single octet
pub type Octet = u8;
/// 16-bit integer
pub type Short = u16;
/// 32-bit integer
pub type Long = u32;
/// 64-bit integer
pub type Longlong = u64;
/// short string (max. 256 characters)
pub type Shortstr = String;
/// long string
pub type Longstr = Vec<u8>;
/// 64-bit timestamp
pub type Timestamp = u64;
/// field table
pub type Table = super::Table;
#[derive(Debug, Clone, PartialEq)]
pub enum Method {
/// The connection class provides methods for a client to establish a network connection to
/// a server, and for both peers to operate the connection thereafter.
/// This method starts the connection negotiation process by telling the client the
/// protocol version that the server proposes, along with a list of security mechanisms
/// which the client can use for authentication.
ConnectionStart {
/// The major version number can take any value from 0 to 99 as defined in the
/// AMQP specification.
version_major: Octet,
/// The minor version number can take any value from 0 to 99 as defined in the
/// AMQP specification.
version_minor: Octet,
server_properties: PeerProperties,
/// must not be null
///
/// A list of the security mechanisms that the server supports, delimited by spaces.
mechanisms: Longstr,
/// must not be null
///
/// A list of the message locales that the server supports, delimited by spaces. The
/// locale defines the language in which the server will send reply texts.
locales: Longstr,
},
/// The connection class provides methods for a client to establish a network connection to
/// a server, and for both peers to operate the connection thereafter.
/// This method selects a SASL security mechanism.
ConnectionStartOk {
client_properties: PeerProperties,
/// must not be null
///
/// A single security mechanisms selected by the client, which must be one of those
/// specified by the server.
mechanism: Shortstr,
/// must not be null
///
/// A block of opaque data passed to the security mechanism. The contents of this
/// data are defined by the SASL security mechanism.
response: Longstr,
/// must not be null
///
/// A single message locale selected by the client, which must be one of those
/// specified by the server.
locale: Shortstr,
},
/// The connection class provides methods for a client to establish a network connection to
/// a server, and for both peers to operate the connection thereafter.
/// The SASL protocol works by exchanging challenges and responses until both peers have
/// received sufficient information to authenticate each other. This method challenges
/// the client to provide more information.
ConnectionSecure {
/// Challenge information, a block of opaque binary data passed to the security
/// mechanism.
challenge: Longstr,
},
/// The connection class provides methods for a client to establish a network connection to
/// a server, and for both peers to operate the connection thereafter.
/// This method attempts to authenticate, passing a block of SASL data for the security
/// mechanism at the server side.
ConnectionSecureOk {
/// must not be null
///
/// A block of opaque data passed to the security mechanism. The contents of this
/// data are defined by the SASL security mechanism.
response: Longstr,
},
/// The connection class provides methods for a client to establish a network connection to
/// a server, and for both peers to operate the connection thereafter.
/// This method proposes a set of connection configuration values to the client. The
/// client can accept and/or adjust these.
ConnectionTune {
/// Specifies highest channel number that the server permits. Usable channel numbers
/// are in the range 1..channel-max. Zero indicates no specified limit.
channel_max: Short,
/// The largest frame size that the server proposes for the connection, including
/// frame header and end-byte. The client can negotiate a lower value. Zero means
/// that the server does not impose any specific limit but may reject very large
/// frames if it cannot allocate resources for them.
frame_max: Long,
/// The delay, in seconds, of the connection heartbeat that the server wants.
/// Zero means the server does not want a heartbeat.
heartbeat: Short,
},
/// The connection class provides methods for a client to establish a network connection to
/// a server, and for both peers to operate the connection thereafter.
/// This method sends the client's connection tuning parameters to the server.
/// Certain fields are negotiated, others provide capability information.
ConnectionTuneOk {
/// must not be null, must be less than the tune field of the method channel-max
///
/// The maximum total number of channels that the client will use per connection.
channel_max: Short,
/// The largest frame size that the client and server will use for the connection.
/// Zero means that the client does not impose any specific limit but may reject
/// very large frames if it cannot allocate resources for them. Note that the
/// frame-max limit applies principally to content frames, where large contents can
/// be broken into frames of arbitrary size.
frame_max: Long,
/// The delay, in seconds, of the connection heartbeat that the client wants. Zero
/// means the client does not want a heartbeat.
heartbeat: Short,
},
/// The connection class provides methods for a client to establish a network connection to
/// a server, and for both peers to operate the connection thereafter.
/// This method opens a connection to a virtual host, which is a collection of
/// resources, and acts to separate multiple application domains within a server.
/// The server may apply arbitrary limits per virtual host, such as the number
/// of each type of entity that may be used, per connection and/or in total.
ConnectionOpen {
/// The name of the virtual host to work with.
virtual_host: Path,
reserved_1: Shortstr,
reserved_2: Bit,
},
/// The connection class provides methods for a client to establish a network connection to
/// a server, and for both peers to operate the connection thereafter.
/// This method signals to the client that the connection is ready for use.
ConnectionOpenOk { reserved_1: Shortstr },
/// The connection class provides methods for a client to establish a network connection to
/// a server, and for both peers to operate the connection thereafter.
/// This method indicates that the sender wants to close the connection. This may be
/// due to internal conditions (e.g. a forced shut-down) or due to an error handling
/// a specific method, i.e. an exception. When a close is due to an exception, the
/// sender provides the class and method id of the method which caused the exception.
ConnectionClose {
reply_code: ReplyCode,
reply_text: ReplyText,
/// When the close is provoked by a method exception, this is the class of the
/// method.
class_id: ClassId,
/// When the close is provoked by a method exception, this is the ID of the method.
method_id: MethodId,
},
/// The connection class provides methods for a client to establish a network connection to
/// a server, and for both peers to operate the connection thereafter.
/// This method confirms a Connection.Close method and tells the recipient that it is
/// safe to release resources for the connection and close the socket.
ConnectionCloseOk,
/// The channel class provides methods for a client to establish a channel to a
/// server and for both peers to operate the channel thereafter.
/// This method opens a channel to the server.
ChannelOpen { reserved_1: Shortstr },
/// The channel class provides methods for a client to establish a channel to a
/// server and for both peers to operate the channel thereafter.
/// This method signals to the client that the channel is ready for use.
ChannelOpenOk { reserved_1: Longstr },
/// The channel class provides methods for a client to establish a channel to a
/// server and for both peers to operate the channel thereafter.
/// This method asks the peer to pause or restart the flow of content data sent by
/// a consumer. This is a simple flow-control mechanism that a peer can use to avoid
/// overflowing its queues or otherwise finding itself receiving more messages than
/// it can process. Note that this method is not intended for window control. It does
/// not affect contents returned by Basic.Get-Ok methods.
ChannelFlow {
/// If 1, the peer starts sending content frames. If 0, the peer stops sending
/// content frames.
active: Bit,
},
/// The channel class provides methods for a client to establish a channel to a
/// server and for both peers to operate the channel thereafter.
/// Confirms to the peer that a flow command was received and processed.
ChannelFlowOk {
/// Confirms the setting of the processed flow method: 1 means the peer will start
/// sending or continue to send content frames; 0 means it will not.
active: Bit,
},
/// The channel class provides methods for a client to establish a channel to a
/// server and for both peers to operate the channel thereafter.
/// This method indicates that the sender wants to close the channel. This may be due to
/// internal conditions (e.g. a forced shut-down) or due to an error handling a specific
/// method, i.e. an exception. When a close is due to an exception, the sender provides
/// the class and method id of the method which caused the exception.
ChannelClose {
reply_code: ReplyCode,
reply_text: ReplyText,
/// When the close is provoked by a method exception, this is the class of the
/// method.
class_id: ClassId,
/// When the close is provoked by a method exception, this is the ID of the method.
method_id: MethodId,
},
/// The channel class provides methods for a client to establish a channel to a
/// server and for both peers to operate the channel thereafter.
/// This method confirms a Channel.Close method and tells the recipient that it is safe
/// to release resources for the channel.
ChannelCloseOk,
/// Exchanges match and distribute messages across queues. Exchanges can be configured in
/// the server or declared at runtime.
/// This method creates an exchange if it does not already exist, and if the exchange
/// exists, verifies that it is of the correct and expected class.
ExchangeDeclare {
reserved_1: Short,
/// must not be null
exchange: ExchangeName,
/// Each exchange belongs to one of a set of exchange types implemented by the
/// server. The exchange types define the functionality of the exchange - i.e. how
/// messages are routed through it. It is not valid or meaningful to attempt to
/// change the type of an existing exchange.
r#type: Shortstr,
/// If set, the server will reply with Declare-Ok if the exchange already
/// exists with the same name, and raise an error if not. The client can
/// use this to check whether an exchange exists without modifying the
/// server state. When set, all other method fields except name and no-wait
/// are ignored. A declare with both passive and no-wait has no effect.
/// Arguments are compared for semantic equivalence.
passive: Bit,
/// If set when creating a new exchange, the exchange will be marked as durable.
/// Durable exchanges remain active when a server restarts. Non-durable exchanges
/// (transient exchanges) are purged if/when a server restarts.
durable: Bit,
reserved_2: Bit,
reserved_3: Bit,
no_wait: NoWait,
/// A set of arguments for the declaration. The syntax and semantics of these
/// arguments depends on the server implementation.
arguments: Table,
},
/// Exchanges match and distribute messages across queues. Exchanges can be configured in
/// the server or declared at runtime.
/// This method confirms a Declare method and confirms the name of the exchange,
/// essential for automatically-named exchanges.
ExchangeDeclareOk,
/// Exchanges match and distribute messages across queues. Exchanges can be configured in
/// the server or declared at runtime.
/// This method deletes an exchange. When an exchange is deleted all queue bindings on
/// the exchange are cancelled.
ExchangeDelete {
reserved_1: Short,
/// must not be null
exchange: ExchangeName,
/// If set, the server will only delete the exchange if it has no queue bindings. If
/// the exchange has queue bindings the server does not delete it but raises a
/// channel exception instead.
if_unused: Bit,
no_wait: NoWait,
},
/// Exchanges match and distribute messages across queues. Exchanges can be configured in
/// the server or declared at runtime.
/// This method confirms the deletion of an exchange.
ExchangeDeleteOk,
/// Queues store and forward messages. Queues can be configured in the server or created at
/// runtime. Queues must be attached to at least one exchange in order to receive messages
/// from publishers.
/// This method creates or checks a queue. When creating a new queue the client can
/// specify various properties that control the durability of the queue and its
/// contents, and the level of sharing for the queue.
QueueDeclare {
reserved_1: Short,
queue: QueueName,
/// If set, the server will reply with Declare-Ok if the queue already
/// exists with the same name, and raise an error if not. The client can
/// use this to check whether a queue exists without modifying the
/// server state. When set, all other method fields except name and no-wait
/// are ignored. A declare with both passive and no-wait has no effect.
/// Arguments are compared for semantic equivalence.
passive: Bit,
/// If set when creating a new queue, the queue will be marked as durable. Durable
/// queues remain active when a server restarts. Non-durable queues (transient
/// queues) are purged if/when a server restarts. Note that durable queues do not
/// necessarily hold persistent messages, although it does not make sense to send
/// persistent messages to a transient queue.
durable: Bit,
/// Exclusive queues may only be accessed by the current connection, and are
/// deleted when that connection closes. Passive declaration of an exclusive
/// queue by other connections are not allowed.
exclusive: Bit,
/// If set, the queue is deleted when all consumers have finished using it. The last
/// consumer can be cancelled either explicitly or because its channel is closed. If
/// there was no consumer ever on the queue, it won't be deleted. Applications can
/// explicitly delete auto-delete queues using the Delete method as normal.
auto_delete: Bit,
no_wait: NoWait,
/// A set of arguments for the declaration. The syntax and semantics of these
/// arguments depends on the server implementation.
arguments: Table,
},
/// Queues store and forward messages. Queues can be configured in the server or created at
/// runtime. Queues must be attached to at least one exchange in order to receive messages
/// from publishers.
/// This method confirms a Declare method and confirms the name of the queue, essential
/// for automatically-named queues.
QueueDeclareOk {
/// must not be null
///
/// Reports the name of the queue. If the server generated a queue name, this field
/// contains that name.
queue: QueueName,
message_count: MessageCount,
/// Reports the number of active consumers for the queue. Note that consumers can
/// suspend activity (Channel.Flow) in which case they do not appear in this count.
consumer_count: Long,
},
/// Queues store and forward messages. Queues can be configured in the server or created at
/// runtime. Queues must be attached to at least one exchange in order to receive messages
/// from publishers.
/// This method binds a queue to an exchange. Until a queue is bound it will not
/// receive any messages. In a classic messaging model, store-and-forward queues
/// are bound to a direct exchange and subscription queues are bound to a topic
/// exchange.
QueueBind {
reserved_1: Short,
/// Specifies the name of the queue to bind.
queue: QueueName,
exchange: ExchangeName,
/// Specifies the routing key for the binding. The routing key is used for routing
/// messages depending on the exchange configuration. Not all exchanges use a
/// routing key - refer to the specific exchange documentation. If the queue name
/// is empty, the server uses the last queue declared on the channel. If the
/// routing key is also empty, the server uses this queue name for the routing
/// key as well. If the queue name is provided but the routing key is empty, the
/// server does the binding with that empty routing key. The meaning of empty
/// routing keys depends on the exchange implementation.
routing_key: Shortstr,
no_wait: NoWait,
/// A set of arguments for the binding. The syntax and semantics of these arguments
/// depends on the exchange class.
arguments: Table,
},
/// Queues store and forward messages. Queues can be configured in the server or created at
/// runtime. Queues must be attached to at least one exchange in order to receive messages
/// from publishers.
/// This method confirms that the bind was successful.
QueueBindOk,
/// Queues store and forward messages. Queues can be configured in the server or created at
/// runtime. Queues must be attached to at least one exchange in order to receive messages
/// from publishers.
/// This method unbinds a queue from an exchange.
QueueUnbind {
reserved_1: Short,
/// Specifies the name of the queue to unbind.
queue: QueueName,
/// The name of the exchange to unbind from.
exchange: ExchangeName,
/// Specifies the routing key of the binding to unbind.
routing_key: Shortstr,
/// Specifies the arguments of the binding to unbind.
arguments: Table,
},
/// Queues store and forward messages. Queues can be configured in the server or created at
/// runtime. Queues must be attached to at least one exchange in order to receive messages
/// from publishers.
/// This method confirms that the unbind was successful.
QueueUnbindOk,
/// Queues store and forward messages. Queues can be configured in the server or created at
/// runtime. Queues must be attached to at least one exchange in order to receive messages
/// from publishers.
/// This method removes all messages from a queue which are not awaiting
/// acknowledgment.
QueuePurge {
reserved_1: Short,
/// Specifies the name of the queue to purge.
queue: QueueName,
no_wait: NoWait,
},
/// Queues store and forward messages. Queues can be configured in the server or created at
/// runtime. Queues must be attached to at least one exchange in order to receive messages
/// from publishers.
/// This method confirms the purge of a queue.
QueuePurgeOk {
/// Reports the number of messages purged.
message_count: MessageCount,
},
/// Queues store and forward messages. Queues can be configured in the server or created at
/// runtime. Queues must be attached to at least one exchange in order to receive messages
/// from publishers.
/// This method deletes a queue. When a queue is deleted any pending messages are sent
/// to a dead-letter queue if this is defined in the server configuration, and all
/// consumers on the queue are cancelled.
QueueDelete {
reserved_1: Short,
/// Specifies the name of the queue to delete.
queue: QueueName,
/// If set, the server will only delete the queue if it has no consumers. If the
/// queue has consumers the server does does not delete it but raises a channel
/// exception instead.
if_unused: Bit,
/// If set, the server will only delete the queue if it has no messages.
if_empty: Bit,
no_wait: NoWait,
},
/// Queues store and forward messages. Queues can be configured in the server or created at
/// runtime. Queues must be attached to at least one exchange in order to receive messages
/// from publishers.
/// This method confirms the deletion of a queue.
QueueDeleteOk {
/// Reports the number of messages deleted.
message_count: MessageCount,
},
/// The Basic class provides methods that support an industry-standard messaging model.
/// This method requests a specific quality of service. The QoS can be specified for the
/// current channel or for all channels on the connection. The particular properties and
/// semantics of a qos method always depend on the content class semantics. Though the
/// qos method could in principle apply to both peers, it is currently meaningful only
/// for the server.
BasicQos {
/// The client can request that messages be sent in advance so that when the client
/// finishes processing a message, the following message is already held locally,
/// rather than needing to be sent down the channel. Prefetching gives a performance
/// improvement. This field specifies the prefetch window size in octets. The server
/// will send a message in advance if it is equal to or smaller in size than the
/// available prefetch size (and also falls into other prefetch limits). May be set
/// to zero, meaning "no specific limit", although other prefetch limits may still
/// apply. The prefetch-size is ignored if the no-ack option is set.
prefetch_size: Long,
/// Specifies a prefetch window in terms of whole messages. This field may be used
/// in combination with the prefetch-size field; a message will only be sent in
/// advance if both prefetch windows (and those at the channel and connection level)
/// allow it. The prefetch-count is ignored if the no-ack option is set.
prefetch_count: Short,
/// By default the QoS settings apply to the current channel only. If this field is
/// set, they are applied to the entire connection.
global: Bit,
},
/// The Basic class provides methods that support an industry-standard messaging model.
/// This method tells the client that the requested QoS levels could be handled by the
/// server. The requested QoS applies to all active consumers until a new QoS is
/// defined.
BasicQosOk,
/// The Basic class provides methods that support an industry-standard messaging model.
/// This method asks the server to start a "consumer", which is a transient request for
/// messages from a specific queue. Consumers last as long as the channel they were
/// declared on, or until the client cancels them.
BasicConsume {
reserved_1: Short,
/// Specifies the name of the queue to consume from.
queue: QueueName,
/// Specifies the identifier for the consumer. The consumer tag is local to a
/// channel, so two clients can use the same consumer tags. If this field is
/// empty the server will generate a unique tag.
consumer_tag: ConsumerTag,
no_local: NoLocal,
no_ack: NoAck,
/// Request exclusive consumer access, meaning only this consumer can access the
/// queue.
exclusive: Bit,
no_wait: NoWait,
/// A set of arguments for the consume. The syntax and semantics of these
/// arguments depends on the server implementation.
arguments: Table,
},
/// The Basic class provides methods that support an industry-standard messaging model.
/// The server provides the client with a consumer tag, which is used by the client
/// for methods called on the consumer at a later stage.
BasicConsumeOk {
/// Holds the consumer tag specified by the client or provided by the server.
consumer_tag: ConsumerTag,
},
/// The Basic class provides methods that support an industry-standard messaging model.
/// This method cancels a consumer. This does not affect already delivered
/// messages, but it does mean the server will not send any more messages for
/// that consumer. The client may receive an arbitrary number of messages in
/// between sending the cancel method and receiving the cancel-ok reply.
BasicCancel {
consumer_tag: ConsumerTag,
no_wait: NoWait,
},
/// The Basic class provides methods that support an industry-standard messaging model.
/// This method confirms that the cancellation was completed.
BasicCancelOk { consumer_tag: ConsumerTag },
/// The Basic class provides methods that support an industry-standard messaging model.
/// This method publishes a message to a specific exchange. The message will be routed
/// to queues as defined by the exchange configuration and distributed to any active
/// consumers when the transaction, if any, is committed.
BasicPublish {
reserved_1: Short,
/// Specifies the name of the exchange to publish to. The exchange name can be
/// empty, meaning the default exchange. If the exchange name is specified, and that
/// exchange does not exist, the server will raise a channel exception.
exchange: ExchangeName,
/// Specifies the routing key for the message. The routing key is used for routing
/// messages depending on the exchange configuration.
routing_key: Shortstr,
/// This flag tells the server how to react if the message cannot be routed to a
/// queue. If this flag is set, the server will return an unroutable message with a
/// Return method. If this flag is zero, the server silently drops the message.
mandatory: Bit,
/// This flag tells the server how to react if the message cannot be routed to a
/// queue consumer immediately. If this flag is set, the server will return an
/// undeliverable message with a Return method. If this flag is zero, the server
/// will queue the message, but with no guarantee that it will ever be consumed.
immediate: Bit,
},
/// The Basic class provides methods that support an industry-standard messaging model.
/// This method returns an undeliverable message that was published with the "immediate"
/// flag set, or an unroutable message published with the "mandatory" flag set. The
/// reply code and text provide information about the reason that the message was
/// undeliverable.
BasicReturn {
reply_code: ReplyCode,
reply_text: ReplyText,
/// Specifies the name of the exchange that the message was originally published
/// to. May be empty, meaning the default exchange.
exchange: ExchangeName,
/// Specifies the routing key name specified when the message was published.
routing_key: Shortstr,
},
/// The Basic class provides methods that support an industry-standard messaging model.
/// This method delivers a message to the client, via a consumer. In the asynchronous
/// message delivery model, the client starts a consumer using the Consume method, then
/// the server responds with Deliver methods as and when messages arrive for that
/// consumer.
BasicDeliver {
consumer_tag: ConsumerTag,
delivery_tag: DeliveryTag,
redelivered: Redelivered,
/// Specifies the name of the exchange that the message was originally published to.
/// May be empty, indicating the default exchange.
exchange: ExchangeName,
/// Specifies the routing key name specified when the message was published.
routing_key: Shortstr,
},
/// The Basic class provides methods that support an industry-standard messaging model.
/// This method provides a direct access to the messages in a queue using a synchronous
/// dialogue that is designed for specific types of application where synchronous
/// functionality is more important than performance.
BasicGet {
reserved_1: Short,
/// Specifies the name of the queue to get a message from.
queue: QueueName,
no_ack: NoAck,
},
/// The Basic class provides methods that support an industry-standard messaging model.
/// This method delivers a message to the client following a get method. A message
/// delivered by 'get-ok' must be acknowledged unless the no-ack option was set in the
/// get method.
BasicGetOk {
delivery_tag: DeliveryTag,
redelivered: Redelivered,
/// Specifies the name of the exchange that the message was originally published to.
/// If empty, the message was published to the default exchange.
exchange: ExchangeName,
/// Specifies the routing key name specified when the message was published.
routing_key: Shortstr,
message_count: MessageCount,
},
/// The Basic class provides methods that support an industry-standard messaging model.
/// This method tells the client that the queue has no messages available for the
/// client.
BasicGetEmpty { reserved_1: Shortstr },
/// The Basic class provides methods that support an industry-standard messaging model.
/// This method acknowledges one or more messages delivered via the Deliver or Get-Ok
/// methods. The client can ask to confirm a single message or a set of messages up to
/// and including a specific message.
BasicAck {
delivery_tag: DeliveryTag,
/// If set to 1, the delivery tag is treated as "up to and including", so that the
/// client can acknowledge multiple messages with a single method. If set to zero,
/// the delivery tag refers to a single message. If the multiple field is 1, and the
/// delivery tag is zero, tells the server to acknowledge all outstanding messages.
multiple: Bit,
},
/// The Basic class provides methods that support an industry-standard messaging model.
/// This method allows a client to reject a message. It can be used to interrupt and
/// cancel large incoming messages, or return untreatable messages to their original
/// queue.
BasicReject {
delivery_tag: DeliveryTag,
/// If requeue is true, the server will attempt to requeue the message. If requeue
/// is false or the requeue attempt fails the messages are discarded or dead-lettered.
requeue: Bit,
},
/// The Basic class provides methods that support an industry-standard messaging model.
/// This method asks the server to redeliver all unacknowledged messages on a
/// specified channel. Zero or more messages may be redelivered. This method
/// is deprecated in favour of the synchronous Recover/Recover-Ok.
BasicRecoverAsync {
/// If this field is zero, the message will be redelivered to the original
/// recipient. If this bit is 1, the server will attempt to requeue the message,
/// potentially then delivering it to an alternative subscriber.
requeue: Bit,
},
/// The Basic class provides methods that support an industry-standard messaging model.
/// This method asks the server to redeliver all unacknowledged messages on a
/// specified channel. Zero or more messages may be redelivered. This method
/// replaces the asynchronous Recover.
BasicRecover {
/// If this field is zero, the message will be redelivered to the original
/// recipient. If this bit is 1, the server will attempt to requeue the message,
/// potentially then delivering it to an alternative subscriber.
requeue: Bit,
},
/// The Basic class provides methods that support an industry-standard messaging model.
/// This method acknowledges a Basic.Recover method.
BasicRecoverOk,
/// The Tx class allows publish and ack operations to be batched into atomic
/// units of work. The intention is that all publish and ack requests issued
/// within a transaction will complete successfully or none of them will.
/// Servers SHOULD implement atomic transactions at least where all publish
/// or ack requests affect a single queue. Transactions that cover multiple
/// queues may be non-atomic, given that queues can be created and destroyed
/// asynchronously, and such events do not form part of any transaction.
/// Further, the behaviour of transactions with respect to the immediate and
/// mandatory flags on Basic.Publish methods is not defined.
/// This method sets the channel to use standard transactions. The client must use this
/// method at least once on a channel before using the Commit or Rollback methods.
TxSelect,
/// The Tx class allows publish and ack operations to be batched into atomic
/// units of work. The intention is that all publish and ack requests issued
/// within a transaction will complete successfully or none of them will.
/// Servers SHOULD implement atomic transactions at least where all publish
/// or ack requests affect a single queue. Transactions that cover multiple
/// queues may be non-atomic, given that queues can be created and destroyed
/// asynchronously, and such events do not form part of any transaction.
/// Further, the behaviour of transactions with respect to the immediate and
/// mandatory flags on Basic.Publish methods is not defined.
/// This method confirms to the client that the channel was successfully set to use
/// standard transactions.
TxSelectOk,
/// The Tx class allows publish and ack operations to be batched into atomic
/// units of work. The intention is that all publish and ack requests issued
/// within a transaction will complete successfully or none of them will.
/// Servers SHOULD implement atomic transactions at least where all publish
/// or ack requests affect a single queue. Transactions that cover multiple
/// queues may be non-atomic, given that queues can be created and destroyed
/// asynchronously, and such events do not form part of any transaction.
/// Further, the behaviour of transactions with respect to the immediate and
/// mandatory flags on Basic.Publish methods is not defined.
/// This method commits all message publications and acknowledgments performed in
/// the current transaction. A new transaction starts immediately after a commit.
TxCommit,
/// The Tx class allows publish and ack operations to be batched into atomic
/// units of work. The intention is that all publish and ack requests issued
/// within a transaction will complete successfully or none of them will.
/// Servers SHOULD implement atomic transactions at least where all publish
/// or ack requests affect a single queue. Transactions that cover multiple
/// queues may be non-atomic, given that queues can be created and destroyed
/// asynchronously, and such events do not form part of any transaction.
/// Further, the behaviour of transactions with respect to the immediate and
/// mandatory flags on Basic.Publish methods is not defined.
/// This method confirms to the client that the commit succeeded. Note that if a commit
/// fails, the server raises a channel exception.
TxCommitOk,
/// The Tx class allows publish and ack operations to be batched into atomic
/// units of work. The intention is that all publish and ack requests issued
/// within a transaction will complete successfully or none of them will.
/// Servers SHOULD implement atomic transactions at least where all publish
/// or ack requests affect a single queue. Transactions that cover multiple
/// queues may be non-atomic, given that queues can be created and destroyed
/// asynchronously, and such events do not form part of any transaction.
/// Further, the behaviour of transactions with respect to the immediate and
/// mandatory flags on Basic.Publish methods is not defined.
/// This method abandons all message publications and acknowledgments performed in
/// the current transaction. A new transaction starts immediately after a rollback.
/// Note that unacked messages will not be automatically redelivered by rollback;
/// if that is required an explicit recover call should be issued.
TxRollback,
/// The Tx class allows publish and ack operations to be batched into atomic
/// units of work. The intention is that all publish and ack requests issued
/// within a transaction will complete successfully or none of them will.
/// Servers SHOULD implement atomic transactions at least where all publish
/// or ack requests affect a single queue. Transactions that cover multiple
/// queues may be non-atomic, given that queues can be created and destroyed
/// asynchronously, and such events do not form part of any transaction.
/// Further, the behaviour of transactions with respect to the immediate and
/// mandatory flags on Basic.Publish methods is not defined.
/// This method confirms to the client that the rollback succeeded. Note that if an
/// rollback fails, the server raises a channel exception.
TxRollbackOk,
}

View file

@ -0,0 +1,31 @@
mod generated;
use std::collections::HashMap;
pub use generated::*;
pub type TableFieldName = String;
pub type Table = HashMap<TableFieldName, FieldValue>;
#[derive(Debug, Clone, PartialEq)]
pub enum FieldValue {
Boolean(bool),
ShortShortInt(i8),
ShortShortUInt(u8),
ShortInt(i16),
ShortUInt(u16),
LongInt(i32),
LongUInt(u32),
LongLongInt(i64),
LongLongUInt(u64),
Float(f32),
Double(f64),
DecimalValue(u8, u32),
ShortString(Shortstr),
LongString(Longstr),
FieldArray(Vec<FieldValue>),
Timestamp(u64),
FieldTable(Table),
Void,
}

View file

@ -1,3 +1,3 @@
#![warn(rust_2018_idioms)]
mod method;
pub mod methods;

View file

@ -1,3 +0,0 @@
use amqp_core::ChannelHandle;
pub async fn handle_method(channel_handle: ChannelHandle) {}

View file

@ -0,0 +1,4 @@
use amqp_core::methods::Method;
use amqp_core::ChannelHandle;
pub async fn handle_method(_channel_handle: ChannelHandle, _method: Method) {}

View file

@ -1,7 +1,7 @@
use crate::error::{ConException, ProtocolError, Result};
use crate::frame::{Frame, FrameType};
use crate::methods::Method;
use crate::{frame, methods, sasl};
use amqp_core::methods::{FieldValue, Method, Table};
use amqp_core::GlobalData;
use anyhow::Context;
use std::collections::HashMap;
@ -220,7 +220,17 @@ impl Connection {
}
Method::ChannelOpen { .. } => self.channel_open(frame.channel).await?,
_ => {
tokio::spawn(amqp_core::method::handle_method())
let channel_handle = self
.channels
.get(&frame.channel)
.ok_or_else(|| ConException::Todo.into_trans())?
.channel_handle
.clone();
tokio::spawn(amqp_messaging::methods::handle_method(
channel_handle,
method,
));
// we don't handle this here, forward it to *somewhere*
}
}
@ -325,9 +335,9 @@ impl Drop for Channel {
}
}
fn server_properties(host: SocketAddr) -> methods::Table {
fn ls(str: &str) -> methods::FieldValue {
methods::FieldValue::LongString(str.into())
fn server_properties(host: SocketAddr) -> Table {
fn ls(str: &str) -> FieldValue {
FieldValue::LongString(str.into())
}
let host_str = host.ip().to_string();

View file

@ -1,794 +1,10 @@
#![allow(dead_code)]
// This file has been generated by `xtask/src/codegen`. Do not edit it manually.
pub type ClassId = u16;
/// consumer tag
///
/// Identifier for the consumer, valid within the current channel.
pub type ConsumerTag = String;
/// server-assigned delivery tag
///
/// The server-assigned and channel-specific delivery tag
pub type DeliveryTag = u64;
/// exchange name
///
/// must be shorter than 127, must match `^[a-zA-Z0-9-_.:]*$`
///
/// The exchange name is a client-selected string that identifies the exchange for
/// publish methods.
pub type ExchangeName = String;
pub type MethodId = u16;
/// no acknowledgement needed
///
/// If this field is set the server does not expect acknowledgements for
/// messages. That is, when a message is delivered to the client the server
/// assumes the delivery will succeed and immediately dequeues it. This
/// functionality may increase performance but at the cost of reliability.
/// Messages can get lost if a client dies before they are delivered to the
/// application.
pub type NoAck = bool;
/// do not deliver own messages
///
/// If the no-local field is set the server will not send messages to the connection that
/// published them.
pub type NoLocal = bool;
/// do not send reply method
///
/// If set, the server will not respond to the method. The client should not wait
/// for a reply method. If the server could not complete the method it will raise a
/// channel or connection exception.
pub type NoWait = bool;
/// must not be null, must be shorter than 127
///
/// Unconstrained.
pub type Path = String;
///
/// This table provides a set of peer properties, used for identification, debugging,
/// and general information.
pub type PeerProperties = super::Table;
/// queue name
///
/// must be shorter than 127, must match `^[a-zA-Z0-9-_.:]*$`
///
/// The queue name identifies the queue within the vhost. In methods where the queue
/// name may be blank, and that has no specific significance, this refers to the
/// 'current' queue for the channel, meaning the last queue that the client declared
/// on the channel. If the client did not declare a queue, and the method needs a
/// queue name, this will result in a 502 (syntax error) channel exception.
pub type QueueName = String;
/// message is being redelivered
///
/// This indicates that the message has been previously delivered to this or
/// another client.
pub type Redelivered = bool;
/// number of messages in queue
///
/// The number of messages in the queue, which will be zero for newly-declared
/// queues. This is the number of messages present in the queue, and committed
/// if the channel on which they were published is transacted, that are not
/// waiting acknowledgement.
pub type MessageCount = u32;
/// reply code from server
///
/// must not be null
///
/// The reply code. The AMQ reply codes are defined as constants at the start
/// of this formal specification.
pub type ReplyCode = u16;
/// localised reply text
///
/// must not be null
///
/// The localised reply text. This text can be logged as an aid to resolving
/// issues.
pub type ReplyText = String;
/// single bit
pub type Bit = bool;
/// single octet
pub type Octet = u8;
/// 16-bit integer
pub type Short = u16;
/// 32-bit integer
pub type Long = u32;
/// 64-bit integer
pub type Longlong = u64;
/// short string (max. 256 characters)
pub type Shortstr = String;
/// long string
pub type Longstr = Vec<u8>;
/// 64-bit timestamp
pub type Timestamp = u64;
/// field table
pub type Table = super::Table;
#[derive(Debug, Clone, PartialEq)]
pub enum Method {
/// The connection class provides methods for a client to establish a network connection to
/// a server, and for both peers to operate the connection thereafter.
/// This method starts the connection negotiation process by telling the client the
/// protocol version that the server proposes, along with a list of security mechanisms
/// which the client can use for authentication.
ConnectionStart {
/// The major version number can take any value from 0 to 99 as defined in the
/// AMQP specification.
version_major: Octet,
/// The minor version number can take any value from 0 to 99 as defined in the
/// AMQP specification.
version_minor: Octet,
server_properties: PeerProperties,
/// must not be null
///
/// A list of the security mechanisms that the server supports, delimited by spaces.
mechanisms: Longstr,
/// must not be null
///
/// A list of the message locales that the server supports, delimited by spaces. The
/// locale defines the language in which the server will send reply texts.
locales: Longstr,
},
/// The connection class provides methods for a client to establish a network connection to
/// a server, and for both peers to operate the connection thereafter.
/// This method selects a SASL security mechanism.
ConnectionStartOk {
client_properties: PeerProperties,
/// must not be null
///
/// A single security mechanisms selected by the client, which must be one of those
/// specified by the server.
mechanism: Shortstr,
/// must not be null
///
/// A block of opaque data passed to the security mechanism. The contents of this
/// data are defined by the SASL security mechanism.
response: Longstr,
/// must not be null
///
/// A single message locale selected by the client, which must be one of those
/// specified by the server.
locale: Shortstr,
},
/// The connection class provides methods for a client to establish a network connection to
/// a server, and for both peers to operate the connection thereafter.
/// The SASL protocol works by exchanging challenges and responses until both peers have
/// received sufficient information to authenticate each other. This method challenges
/// the client to provide more information.
ConnectionSecure {
/// Challenge information, a block of opaque binary data passed to the security
/// mechanism.
challenge: Longstr,
},
/// The connection class provides methods for a client to establish a network connection to
/// a server, and for both peers to operate the connection thereafter.
/// This method attempts to authenticate, passing a block of SASL data for the security
/// mechanism at the server side.
ConnectionSecureOk {
/// must not be null
///
/// A block of opaque data passed to the security mechanism. The contents of this
/// data are defined by the SASL security mechanism.
response: Longstr,
},
/// The connection class provides methods for a client to establish a network connection to
/// a server, and for both peers to operate the connection thereafter.
/// This method proposes a set of connection configuration values to the client. The
/// client can accept and/or adjust these.
ConnectionTune {
/// Specifies highest channel number that the server permits. Usable channel numbers
/// are in the range 1..channel-max. Zero indicates no specified limit.
channel_max: Short,
/// The largest frame size that the server proposes for the connection, including
/// frame header and end-byte. The client can negotiate a lower value. Zero means
/// that the server does not impose any specific limit but may reject very large
/// frames if it cannot allocate resources for them.
frame_max: Long,
/// The delay, in seconds, of the connection heartbeat that the server wants.
/// Zero means the server does not want a heartbeat.
heartbeat: Short,
},
/// The connection class provides methods for a client to establish a network connection to
/// a server, and for both peers to operate the connection thereafter.
/// This method sends the client's connection tuning parameters to the server.
/// Certain fields are negotiated, others provide capability information.
ConnectionTuneOk {
/// must not be null, must be less than the tune field of the method channel-max
///
/// The maximum total number of channels that the client will use per connection.
channel_max: Short,
/// The largest frame size that the client and server will use for the connection.
/// Zero means that the client does not impose any specific limit but may reject
/// very large frames if it cannot allocate resources for them. Note that the
/// frame-max limit applies principally to content frames, where large contents can
/// be broken into frames of arbitrary size.
frame_max: Long,
/// The delay, in seconds, of the connection heartbeat that the client wants. Zero
/// means the client does not want a heartbeat.
heartbeat: Short,
},
/// The connection class provides methods for a client to establish a network connection to
/// a server, and for both peers to operate the connection thereafter.
/// This method opens a connection to a virtual host, which is a collection of
/// resources, and acts to separate multiple application domains within a server.
/// The server may apply arbitrary limits per virtual host, such as the number
/// of each type of entity that may be used, per connection and/or in total.
ConnectionOpen {
/// The name of the virtual host to work with.
virtual_host: Path,
reserved_1: Shortstr,
reserved_2: Bit,
},
/// The connection class provides methods for a client to establish a network connection to
/// a server, and for both peers to operate the connection thereafter.
/// This method signals to the client that the connection is ready for use.
ConnectionOpenOk { reserved_1: Shortstr },
/// The connection class provides methods for a client to establish a network connection to
/// a server, and for both peers to operate the connection thereafter.
/// This method indicates that the sender wants to close the connection. This may be
/// due to internal conditions (e.g. a forced shut-down) or due to an error handling
/// a specific method, i.e. an exception. When a close is due to an exception, the
/// sender provides the class and method id of the method which caused the exception.
ConnectionClose {
reply_code: ReplyCode,
reply_text: ReplyText,
/// When the close is provoked by a method exception, this is the class of the
/// method.
class_id: ClassId,
/// When the close is provoked by a method exception, this is the ID of the method.
method_id: MethodId,
},
/// The connection class provides methods for a client to establish a network connection to
/// a server, and for both peers to operate the connection thereafter.
/// This method confirms a Connection.Close method and tells the recipient that it is
/// safe to release resources for the connection and close the socket.
ConnectionCloseOk,
/// The channel class provides methods for a client to establish a channel to a
/// server and for both peers to operate the channel thereafter.
/// This method opens a channel to the server.
ChannelOpen { reserved_1: Shortstr },
/// The channel class provides methods for a client to establish a channel to a
/// server and for both peers to operate the channel thereafter.
/// This method signals to the client that the channel is ready for use.
ChannelOpenOk { reserved_1: Longstr },
/// The channel class provides methods for a client to establish a channel to a
/// server and for both peers to operate the channel thereafter.
/// This method asks the peer to pause or restart the flow of content data sent by
/// a consumer. This is a simple flow-control mechanism that a peer can use to avoid
/// overflowing its queues or otherwise finding itself receiving more messages than
/// it can process. Note that this method is not intended for window control. It does
/// not affect contents returned by Basic.Get-Ok methods.
ChannelFlow {
/// If 1, the peer starts sending content frames. If 0, the peer stops sending
/// content frames.
active: Bit,
},
/// The channel class provides methods for a client to establish a channel to a
/// server and for both peers to operate the channel thereafter.
/// Confirms to the peer that a flow command was received and processed.
ChannelFlowOk {
/// Confirms the setting of the processed flow method: 1 means the peer will start
/// sending or continue to send content frames; 0 means it will not.
active: Bit,
},
/// The channel class provides methods for a client to establish a channel to a
/// server and for both peers to operate the channel thereafter.
/// This method indicates that the sender wants to close the channel. This may be due to
/// internal conditions (e.g. a forced shut-down) or due to an error handling a specific
/// method, i.e. an exception. When a close is due to an exception, the sender provides
/// the class and method id of the method which caused the exception.
ChannelClose {
reply_code: ReplyCode,
reply_text: ReplyText,
/// When the close is provoked by a method exception, this is the class of the
/// method.
class_id: ClassId,
/// When the close is provoked by a method exception, this is the ID of the method.
method_id: MethodId,
},
/// The channel class provides methods for a client to establish a channel to a
/// server and for both peers to operate the channel thereafter.
/// This method confirms a Channel.Close method and tells the recipient that it is safe
/// to release resources for the channel.
ChannelCloseOk,
/// Exchanges match and distribute messages across queues. Exchanges can be configured in
/// the server or declared at runtime.
/// This method creates an exchange if it does not already exist, and if the exchange
/// exists, verifies that it is of the correct and expected class.
ExchangeDeclare {
reserved_1: Short,
/// must not be null
exchange: ExchangeName,
/// Each exchange belongs to one of a set of exchange types implemented by the
/// server. The exchange types define the functionality of the exchange - i.e. how
/// messages are routed through it. It is not valid or meaningful to attempt to
/// change the type of an existing exchange.
r#type: Shortstr,
/// If set, the server will reply with Declare-Ok if the exchange already
/// exists with the same name, and raise an error if not. The client can
/// use this to check whether an exchange exists without modifying the
/// server state. When set, all other method fields except name and no-wait
/// are ignored. A declare with both passive and no-wait has no effect.
/// Arguments are compared for semantic equivalence.
passive: Bit,
/// If set when creating a new exchange, the exchange will be marked as durable.
/// Durable exchanges remain active when a server restarts. Non-durable exchanges
/// (transient exchanges) are purged if/when a server restarts.
durable: Bit,
reserved_2: Bit,
reserved_3: Bit,
no_wait: NoWait,
/// A set of arguments for the declaration. The syntax and semantics of these
/// arguments depends on the server implementation.
arguments: Table,
},
/// Exchanges match and distribute messages across queues. Exchanges can be configured in
/// the server or declared at runtime.
/// This method confirms a Declare method and confirms the name of the exchange,
/// essential for automatically-named exchanges.
ExchangeDeclareOk,
/// Exchanges match and distribute messages across queues. Exchanges can be configured in
/// the server or declared at runtime.
/// This method deletes an exchange. When an exchange is deleted all queue bindings on
/// the exchange are cancelled.
ExchangeDelete {
reserved_1: Short,
/// must not be null
exchange: ExchangeName,
/// If set, the server will only delete the exchange if it has no queue bindings. If
/// the exchange has queue bindings the server does not delete it but raises a
/// channel exception instead.
if_unused: Bit,
no_wait: NoWait,
},
/// Exchanges match and distribute messages across queues. Exchanges can be configured in
/// the server or declared at runtime.
/// This method confirms the deletion of an exchange.
ExchangeDeleteOk,
/// Queues store and forward messages. Queues can be configured in the server or created at
/// runtime. Queues must be attached to at least one exchange in order to receive messages
/// from publishers.
/// This method creates or checks a queue. When creating a new queue the client can
/// specify various properties that control the durability of the queue and its
/// contents, and the level of sharing for the queue.
QueueDeclare {
reserved_1: Short,
queue: QueueName,
/// If set, the server will reply with Declare-Ok if the queue already
/// exists with the same name, and raise an error if not. The client can
/// use this to check whether a queue exists without modifying the
/// server state. When set, all other method fields except name and no-wait
/// are ignored. A declare with both passive and no-wait has no effect.
/// Arguments are compared for semantic equivalence.
passive: Bit,
/// If set when creating a new queue, the queue will be marked as durable. Durable
/// queues remain active when a server restarts. Non-durable queues (transient
/// queues) are purged if/when a server restarts. Note that durable queues do not
/// necessarily hold persistent messages, although it does not make sense to send
/// persistent messages to a transient queue.
durable: Bit,
/// Exclusive queues may only be accessed by the current connection, and are
/// deleted when that connection closes. Passive declaration of an exclusive
/// queue by other connections are not allowed.
exclusive: Bit,
/// If set, the queue is deleted when all consumers have finished using it. The last
/// consumer can be cancelled either explicitly or because its channel is closed. If
/// there was no consumer ever on the queue, it won't be deleted. Applications can
/// explicitly delete auto-delete queues using the Delete method as normal.
auto_delete: Bit,
no_wait: NoWait,
/// A set of arguments for the declaration. The syntax and semantics of these
/// arguments depends on the server implementation.
arguments: Table,
},
/// Queues store and forward messages. Queues can be configured in the server or created at
/// runtime. Queues must be attached to at least one exchange in order to receive messages
/// from publishers.
/// This method confirms a Declare method and confirms the name of the queue, essential
/// for automatically-named queues.
QueueDeclareOk {
/// must not be null
///
/// Reports the name of the queue. If the server generated a queue name, this field
/// contains that name.
queue: QueueName,
message_count: MessageCount,
/// Reports the number of active consumers for the queue. Note that consumers can
/// suspend activity (Channel.Flow) in which case they do not appear in this count.
consumer_count: Long,
},
/// Queues store and forward messages. Queues can be configured in the server or created at
/// runtime. Queues must be attached to at least one exchange in order to receive messages
/// from publishers.
/// This method binds a queue to an exchange. Until a queue is bound it will not
/// receive any messages. In a classic messaging model, store-and-forward queues
/// are bound to a direct exchange and subscription queues are bound to a topic
/// exchange.
QueueBind {
reserved_1: Short,
/// Specifies the name of the queue to bind.
queue: QueueName,
exchange: ExchangeName,
/// Specifies the routing key for the binding. The routing key is used for routing
/// messages depending on the exchange configuration. Not all exchanges use a
/// routing key - refer to the specific exchange documentation. If the queue name
/// is empty, the server uses the last queue declared on the channel. If the
/// routing key is also empty, the server uses this queue name for the routing
/// key as well. If the queue name is provided but the routing key is empty, the
/// server does the binding with that empty routing key. The meaning of empty
/// routing keys depends on the exchange implementation.
routing_key: Shortstr,
no_wait: NoWait,
/// A set of arguments for the binding. The syntax and semantics of these arguments
/// depends on the exchange class.
arguments: Table,
},
/// Queues store and forward messages. Queues can be configured in the server or created at
/// runtime. Queues must be attached to at least one exchange in order to receive messages
/// from publishers.
/// This method confirms that the bind was successful.
QueueBindOk,
/// Queues store and forward messages. Queues can be configured in the server or created at
/// runtime. Queues must be attached to at least one exchange in order to receive messages
/// from publishers.
/// This method unbinds a queue from an exchange.
QueueUnbind {
reserved_1: Short,
/// Specifies the name of the queue to unbind.
queue: QueueName,
/// The name of the exchange to unbind from.
exchange: ExchangeName,
/// Specifies the routing key of the binding to unbind.
routing_key: Shortstr,
/// Specifies the arguments of the binding to unbind.
arguments: Table,
},
/// Queues store and forward messages. Queues can be configured in the server or created at
/// runtime. Queues must be attached to at least one exchange in order to receive messages
/// from publishers.
/// This method confirms that the unbind was successful.
QueueUnbindOk,
/// Queues store and forward messages. Queues can be configured in the server or created at
/// runtime. Queues must be attached to at least one exchange in order to receive messages
/// from publishers.
/// This method removes all messages from a queue which are not awaiting
/// acknowledgment.
QueuePurge {
reserved_1: Short,
/// Specifies the name of the queue to purge.
queue: QueueName,
no_wait: NoWait,
},
/// Queues store and forward messages. Queues can be configured in the server or created at
/// runtime. Queues must be attached to at least one exchange in order to receive messages
/// from publishers.
/// This method confirms the purge of a queue.
QueuePurgeOk {
/// Reports the number of messages purged.
message_count: MessageCount,
},
/// Queues store and forward messages. Queues can be configured in the server or created at
/// runtime. Queues must be attached to at least one exchange in order to receive messages
/// from publishers.
/// This method deletes a queue. When a queue is deleted any pending messages are sent
/// to a dead-letter queue if this is defined in the server configuration, and all
/// consumers on the queue are cancelled.
QueueDelete {
reserved_1: Short,
/// Specifies the name of the queue to delete.
queue: QueueName,
/// If set, the server will only delete the queue if it has no consumers. If the
/// queue has consumers the server does does not delete it but raises a channel
/// exception instead.
if_unused: Bit,
/// If set, the server will only delete the queue if it has no messages.
if_empty: Bit,
no_wait: NoWait,
},
/// Queues store and forward messages. Queues can be configured in the server or created at
/// runtime. Queues must be attached to at least one exchange in order to receive messages
/// from publishers.
/// This method confirms the deletion of a queue.
QueueDeleteOk {
/// Reports the number of messages deleted.
message_count: MessageCount,
},
/// The Basic class provides methods that support an industry-standard messaging model.
/// This method requests a specific quality of service. The QoS can be specified for the
/// current channel or for all channels on the connection. The particular properties and
/// semantics of a qos method always depend on the content class semantics. Though the
/// qos method could in principle apply to both peers, it is currently meaningful only
/// for the server.
BasicQos {
/// The client can request that messages be sent in advance so that when the client
/// finishes processing a message, the following message is already held locally,
/// rather than needing to be sent down the channel. Prefetching gives a performance
/// improvement. This field specifies the prefetch window size in octets. The server
/// will send a message in advance if it is equal to or smaller in size than the
/// available prefetch size (and also falls into other prefetch limits). May be set
/// to zero, meaning "no specific limit", although other prefetch limits may still
/// apply. The prefetch-size is ignored if the no-ack option is set.
prefetch_size: Long,
/// Specifies a prefetch window in terms of whole messages. This field may be used
/// in combination with the prefetch-size field; a message will only be sent in
/// advance if both prefetch windows (and those at the channel and connection level)
/// allow it. The prefetch-count is ignored if the no-ack option is set.
prefetch_count: Short,
/// By default the QoS settings apply to the current channel only. If this field is
/// set, they are applied to the entire connection.
global: Bit,
},
/// The Basic class provides methods that support an industry-standard messaging model.
/// This method tells the client that the requested QoS levels could be handled by the
/// server. The requested QoS applies to all active consumers until a new QoS is
/// defined.
BasicQosOk,
/// The Basic class provides methods that support an industry-standard messaging model.
/// This method asks the server to start a "consumer", which is a transient request for
/// messages from a specific queue. Consumers last as long as the channel they were
/// declared on, or until the client cancels them.
BasicConsume {
reserved_1: Short,
/// Specifies the name of the queue to consume from.
queue: QueueName,
/// Specifies the identifier for the consumer. The consumer tag is local to a
/// channel, so two clients can use the same consumer tags. If this field is
/// empty the server will generate a unique tag.
consumer_tag: ConsumerTag,
no_local: NoLocal,
no_ack: NoAck,
/// Request exclusive consumer access, meaning only this consumer can access the
/// queue.
exclusive: Bit,
no_wait: NoWait,
/// A set of arguments for the consume. The syntax and semantics of these
/// arguments depends on the server implementation.
arguments: Table,
},
/// The Basic class provides methods that support an industry-standard messaging model.
/// The server provides the client with a consumer tag, which is used by the client
/// for methods called on the consumer at a later stage.
BasicConsumeOk {
/// Holds the consumer tag specified by the client or provided by the server.
consumer_tag: ConsumerTag,
},
/// The Basic class provides methods that support an industry-standard messaging model.
/// This method cancels a consumer. This does not affect already delivered
/// messages, but it does mean the server will not send any more messages for
/// that consumer. The client may receive an arbitrary number of messages in
/// between sending the cancel method and receiving the cancel-ok reply.
BasicCancel {
consumer_tag: ConsumerTag,
no_wait: NoWait,
},
/// The Basic class provides methods that support an industry-standard messaging model.
/// This method confirms that the cancellation was completed.
BasicCancelOk { consumer_tag: ConsumerTag },
/// The Basic class provides methods that support an industry-standard messaging model.
/// This method publishes a message to a specific exchange. The message will be routed
/// to queues as defined by the exchange configuration and distributed to any active
/// consumers when the transaction, if any, is committed.
BasicPublish {
reserved_1: Short,
/// Specifies the name of the exchange to publish to. The exchange name can be
/// empty, meaning the default exchange. If the exchange name is specified, and that
/// exchange does not exist, the server will raise a channel exception.
exchange: ExchangeName,
/// Specifies the routing key for the message. The routing key is used for routing
/// messages depending on the exchange configuration.
routing_key: Shortstr,
/// This flag tells the server how to react if the message cannot be routed to a
/// queue. If this flag is set, the server will return an unroutable message with a
/// Return method. If this flag is zero, the server silently drops the message.
mandatory: Bit,
/// This flag tells the server how to react if the message cannot be routed to a
/// queue consumer immediately. If this flag is set, the server will return an
/// undeliverable message with a Return method. If this flag is zero, the server
/// will queue the message, but with no guarantee that it will ever be consumed.
immediate: Bit,
},
/// The Basic class provides methods that support an industry-standard messaging model.
/// This method returns an undeliverable message that was published with the "immediate"
/// flag set, or an unroutable message published with the "mandatory" flag set. The
/// reply code and text provide information about the reason that the message was
/// undeliverable.
BasicReturn {
reply_code: ReplyCode,
reply_text: ReplyText,
/// Specifies the name of the exchange that the message was originally published
/// to. May be empty, meaning the default exchange.
exchange: ExchangeName,
/// Specifies the routing key name specified when the message was published.
routing_key: Shortstr,
},
/// The Basic class provides methods that support an industry-standard messaging model.
/// This method delivers a message to the client, via a consumer. In the asynchronous
/// message delivery model, the client starts a consumer using the Consume method, then
/// the server responds with Deliver methods as and when messages arrive for that
/// consumer.
BasicDeliver {
consumer_tag: ConsumerTag,
delivery_tag: DeliveryTag,
redelivered: Redelivered,
/// Specifies the name of the exchange that the message was originally published to.
/// May be empty, indicating the default exchange.
exchange: ExchangeName,
/// Specifies the routing key name specified when the message was published.
routing_key: Shortstr,
},
/// The Basic class provides methods that support an industry-standard messaging model.
/// This method provides a direct access to the messages in a queue using a synchronous
/// dialogue that is designed for specific types of application where synchronous
/// functionality is more important than performance.
BasicGet {
reserved_1: Short,
/// Specifies the name of the queue to get a message from.
queue: QueueName,
no_ack: NoAck,
},
/// The Basic class provides methods that support an industry-standard messaging model.
/// This method delivers a message to the client following a get method. A message
/// delivered by 'get-ok' must be acknowledged unless the no-ack option was set in the
/// get method.
BasicGetOk {
delivery_tag: DeliveryTag,
redelivered: Redelivered,
/// Specifies the name of the exchange that the message was originally published to.
/// If empty, the message was published to the default exchange.
exchange: ExchangeName,
/// Specifies the routing key name specified when the message was published.
routing_key: Shortstr,
message_count: MessageCount,
},
/// The Basic class provides methods that support an industry-standard messaging model.
/// This method tells the client that the queue has no messages available for the
/// client.
BasicGetEmpty { reserved_1: Shortstr },
/// The Basic class provides methods that support an industry-standard messaging model.
/// This method acknowledges one or more messages delivered via the Deliver or Get-Ok
/// methods. The client can ask to confirm a single message or a set of messages up to
/// and including a specific message.
BasicAck {
delivery_tag: DeliveryTag,
/// If set to 1, the delivery tag is treated as "up to and including", so that the
/// client can acknowledge multiple messages with a single method. If set to zero,
/// the delivery tag refers to a single message. If the multiple field is 1, and the
/// delivery tag is zero, tells the server to acknowledge all outstanding messages.
multiple: Bit,
},
/// The Basic class provides methods that support an industry-standard messaging model.
/// This method allows a client to reject a message. It can be used to interrupt and
/// cancel large incoming messages, or return untreatable messages to their original
/// queue.
BasicReject {
delivery_tag: DeliveryTag,
/// If requeue is true, the server will attempt to requeue the message. If requeue
/// is false or the requeue attempt fails the messages are discarded or dead-lettered.
requeue: Bit,
},
/// The Basic class provides methods that support an industry-standard messaging model.
/// This method asks the server to redeliver all unacknowledged messages on a
/// specified channel. Zero or more messages may be redelivered. This method
/// is deprecated in favour of the synchronous Recover/Recover-Ok.
BasicRecoverAsync {
/// If this field is zero, the message will be redelivered to the original
/// recipient. If this bit is 1, the server will attempt to requeue the message,
/// potentially then delivering it to an alternative subscriber.
requeue: Bit,
},
/// The Basic class provides methods that support an industry-standard messaging model.
/// This method asks the server to redeliver all unacknowledged messages on a
/// specified channel. Zero or more messages may be redelivered. This method
/// replaces the asynchronous Recover.
BasicRecover {
/// If this field is zero, the message will be redelivered to the original
/// recipient. If this bit is 1, the server will attempt to requeue the message,
/// potentially then delivering it to an alternative subscriber.
requeue: Bit,
},
/// The Basic class provides methods that support an industry-standard messaging model.
/// This method acknowledges a Basic.Recover method.
BasicRecoverOk,
/// The Tx class allows publish and ack operations to be batched into atomic
/// units of work. The intention is that all publish and ack requests issued
/// within a transaction will complete successfully or none of them will.
/// Servers SHOULD implement atomic transactions at least where all publish
/// or ack requests affect a single queue. Transactions that cover multiple
/// queues may be non-atomic, given that queues can be created and destroyed
/// asynchronously, and such events do not form part of any transaction.
/// Further, the behaviour of transactions with respect to the immediate and
/// mandatory flags on Basic.Publish methods is not defined.
/// This method sets the channel to use standard transactions. The client must use this
/// method at least once on a channel before using the Commit or Rollback methods.
TxSelect,
/// The Tx class allows publish and ack operations to be batched into atomic
/// units of work. The intention is that all publish and ack requests issued
/// within a transaction will complete successfully or none of them will.
/// Servers SHOULD implement atomic transactions at least where all publish
/// or ack requests affect a single queue. Transactions that cover multiple
/// queues may be non-atomic, given that queues can be created and destroyed
/// asynchronously, and such events do not form part of any transaction.
/// Further, the behaviour of transactions with respect to the immediate and
/// mandatory flags on Basic.Publish methods is not defined.
/// This method confirms to the client that the channel was successfully set to use
/// standard transactions.
TxSelectOk,
/// The Tx class allows publish and ack operations to be batched into atomic
/// units of work. The intention is that all publish and ack requests issued
/// within a transaction will complete successfully or none of them will.
/// Servers SHOULD implement atomic transactions at least where all publish
/// or ack requests affect a single queue. Transactions that cover multiple
/// queues may be non-atomic, given that queues can be created and destroyed
/// asynchronously, and such events do not form part of any transaction.
/// Further, the behaviour of transactions with respect to the immediate and
/// mandatory flags on Basic.Publish methods is not defined.
/// This method commits all message publications and acknowledgments performed in
/// the current transaction. A new transaction starts immediately after a commit.
TxCommit,
/// The Tx class allows publish and ack operations to be batched into atomic
/// units of work. The intention is that all publish and ack requests issued
/// within a transaction will complete successfully or none of them will.
/// Servers SHOULD implement atomic transactions at least where all publish
/// or ack requests affect a single queue. Transactions that cover multiple
/// queues may be non-atomic, given that queues can be created and destroyed
/// asynchronously, and such events do not form part of any transaction.
/// Further, the behaviour of transactions with respect to the immediate and
/// mandatory flags on Basic.Publish methods is not defined.
/// This method confirms to the client that the commit succeeded. Note that if a commit
/// fails, the server raises a channel exception.
TxCommitOk,
/// The Tx class allows publish and ack operations to be batched into atomic
/// units of work. The intention is that all publish and ack requests issued
/// within a transaction will complete successfully or none of them will.
/// Servers SHOULD implement atomic transactions at least where all publish
/// or ack requests affect a single queue. Transactions that cover multiple
/// queues may be non-atomic, given that queues can be created and destroyed
/// asynchronously, and such events do not form part of any transaction.
/// Further, the behaviour of transactions with respect to the immediate and
/// mandatory flags on Basic.Publish methods is not defined.
/// This method abandons all message publications and acknowledgments performed in
/// the current transaction. A new transaction starts immediately after a rollback.
/// Note that unacked messages will not be automatically redelivered by rollback;
/// if that is required an explicit recover call should be issued.
TxRollback,
/// The Tx class allows publish and ack operations to be batched into atomic
/// units of work. The intention is that all publish and ack requests issued
/// within a transaction will complete successfully or none of them will.
/// Servers SHOULD implement atomic transactions at least where all publish
/// or ack requests affect a single queue. Transactions that cover multiple
/// queues may be non-atomic, given that queues can be created and destroyed
/// asynchronously, and such events do not form part of any transaction.
/// Further, the behaviour of transactions with respect to the immediate and
/// mandatory flags on Basic.Publish methods is not defined.
/// This method confirms to the client that the rollback succeeded. Note that if an
/// rollback fails, the server raises a channel exception.
TxRollbackOk,
}
pub mod parse {
use super::*;
use crate::error::TransError;
use crate::methods::parse_helper::*;
use amqp_core::methods::*;
use nom::{branch::alt, bytes::complete::tag};
use once_cell::sync::Lazy;
use regex::Regex;
@ -1659,9 +875,9 @@ pub mod parse {
}
}
pub mod write {
use super::*;
use crate::error::TransError;
use crate::methods::write_helper::*;
use amqp_core::methods::*;
use std::io::Write;
pub fn write_method<W: Write>(method: Method, mut writer: W) -> Result<(), TransError> {
@ -2077,8 +1293,8 @@ pub mod write {
}
mod random {
use super::*;
use crate::methods::RandomMethod;
use amqp_core::methods::*;
use rand::Rng;
impl<R: Rng> RandomMethod<R> for Method {

View file

@ -1,4 +1,5 @@
use crate::error::{ConException, TransError};
use amqp_core::methods::{FieldValue, Method, Table};
use rand::Rng;
use std::collections::HashMap;
@ -8,36 +9,10 @@ mod parse_helper;
mod tests;
mod write_helper;
pub type TableFieldName = String;
pub type Table = HashMap<TableFieldName, FieldValue>;
#[derive(Debug, Clone, PartialEq)]
pub enum FieldValue {
Boolean(bool),
ShortShortInt(i8),
ShortShortUInt(u8),
ShortInt(i16),
ShortUInt(u16),
LongInt(i32),
LongUInt(u32),
LongLongInt(i64),
LongLongUInt(u64),
Float(f32),
Double(f64),
DecimalValue(u8, u32),
ShortString(Shortstr),
LongString(Longstr),
FieldArray(Vec<FieldValue>),
Timestamp(u64),
FieldTable(Table),
Void,
}
pub use generated::*;
/// Parses the payload of a method frame into the method
pub fn parse_method(payload: &[u8]) -> Result<generated::Method, TransError> {
pub fn parse_method(payload: &[u8]) -> Result<Method, TransError> {
let nom_result = generated::parse::parse_method(payload);
match nom_result {
@ -92,7 +67,7 @@ macro_rules! rand_random_method {
rand_random_method!(bool, u8, i8, u16, i16, u32, i32, u64, i64, f32, f64);
impl<R: Rng> RandomMethod<R> for HashMap<String, FieldValue> {
impl<R: Rng> RandomMethod<R> for Table {
fn random(rng: &mut R) -> Self {
let len = rng.gen_range(0..3);
HashMap::from_iter((0..len).map(|_| (String::random(rng), FieldValue::random(rng))))
@ -103,23 +78,23 @@ impl<R: Rng> RandomMethod<R> for FieldValue {
fn random(rng: &mut R) -> Self {
let index = rng.gen_range(0_u32..17);
match index {
0 => FieldValue::Boolean(RandomMethod::random(rng)),
1 => FieldValue::ShortShortInt(RandomMethod::random(rng)),
2 => FieldValue::ShortShortUInt(RandomMethod::random(rng)),
3 => FieldValue::ShortInt(RandomMethod::random(rng)),
4 => FieldValue::ShortUInt(RandomMethod::random(rng)),
5 => FieldValue::LongInt(RandomMethod::random(rng)),
6 => FieldValue::LongUInt(RandomMethod::random(rng)),
7 => FieldValue::LongLongInt(RandomMethod::random(rng)),
8 => FieldValue::LongLongUInt(RandomMethod::random(rng)),
9 => FieldValue::Float(RandomMethod::random(rng)),
10 => FieldValue::Double(RandomMethod::random(rng)),
11 => FieldValue::ShortString(RandomMethod::random(rng)),
12 => FieldValue::LongString(RandomMethod::random(rng)),
13 => FieldValue::FieldArray(RandomMethod::random(rng)),
14 => FieldValue::Timestamp(RandomMethod::random(rng)),
15 => FieldValue::FieldTable(RandomMethod::random(rng)),
16 => FieldValue::Void,
0 => Self::Boolean(RandomMethod::random(rng)),
1 => Self::ShortShortInt(RandomMethod::random(rng)),
2 => Self::ShortShortUInt(RandomMethod::random(rng)),
3 => Self::ShortInt(RandomMethod::random(rng)),
4 => Self::ShortUInt(RandomMethod::random(rng)),
5 => Self::LongInt(RandomMethod::random(rng)),
6 => Self::LongUInt(RandomMethod::random(rng)),
7 => Self::LongLongInt(RandomMethod::random(rng)),
8 => Self::LongLongUInt(RandomMethod::random(rng)),
9 => Self::Float(RandomMethod::random(rng)),
10 => Self::Double(RandomMethod::random(rng)),
11 => Self::ShortString(RandomMethod::random(rng)),
12 => Self::LongString(RandomMethod::random(rng)),
13 => Self::FieldArray(RandomMethod::random(rng)),
14 => Self::Timestamp(RandomMethod::random(rng)),
15 => Self::FieldTable(RandomMethod::random(rng)),
16 => Self::Void,
_ => unreachable!(),
}
}

View file

@ -1,7 +1,8 @@
use crate::error::{ConException, ProtocolError, TransError};
use crate::methods::generated::parse::IResult;
use crate::methods::generated::{
Bit, Long, Longlong, Longstr, Octet, Short, Shortstr, Table, Timestamp,
use amqp_core::methods::{
Bit, FieldValue, Long, Longlong, Longstr, Octet, Short, Shortstr, Table, TableFieldName,
Timestamp,
};
use nom::branch::alt;
use nom::bytes::complete::{tag, take};
@ -56,7 +57,6 @@ macro_rules! fail {
};
}
use crate::methods::{FieldValue, TableFieldName};
pub use fail;
pub fn octet(input: &[u8]) -> IResult<'_, Octet> {

View file

@ -1,8 +1,6 @@
use crate::error::TransError;
use crate::methods::generated::{
Bit, Long, Longlong, Longstr, Octet, Short, Shortstr, Table, Timestamp,
};
use crate::methods::FieldValue;
use amqp_core::methods::{Bit, Long, Longlong, Longstr, Octet, Short, Shortstr, Table, Timestamp};
use anyhow::Context;
use std::io::Write;

View file

@ -1,6 +1,6 @@
use crate::frame::FrameType;
use crate::methods::{FieldValue, Method};
use crate::{frame, methods};
use amqp_core::methods::{FieldValue, Method};
use std::collections::HashMap;
#[tokio::test]

View file

@ -6,6 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0.54"
heck = "0.4.0"
itertools = "0.10.3"
strong-xml = "0.6.3"

View file

@ -4,15 +4,15 @@ mod parser;
mod random;
mod write;
use anyhow::Context;
use heck::ToUpperCamelCase;
use parser::codegen_parser;
use random::codegen_random;
use std::fs;
use std::fs::File;
use std::io::Write;
use std::iter::Peekable;
use std::path::PathBuf;
use std::str::FromStr;
use strong_xml::XmlRead;
use write::codegen_write;
#[derive(Debug, XmlRead)]
#[xml(tag = "amqp")]
@ -101,101 +101,138 @@ struct Doc {
kind: Option<String>,
}
pub fn main() {
let this_file = PathBuf::from_str(file!()).unwrap();
let expected_location = this_file
struct Codegen {
output: Box<dyn Write>,
}
pub fn main() -> anyhow::Result<()> {
let this_file = PathBuf::from_str(file!()).context("own file path")?;
let xtask_root = this_file
.parent()
.unwrap()
.context("codegen directory path")?
.parent()
.unwrap()
.join("amqp0-9-1.xml");
let content = fs::read_to_string(expected_location).unwrap();
.context("src directory path")?
.parent()
.context("xtask root path")?;
let amqp_spec = xtask_root.join("amqp0-9-1.xml");
let project_root = xtask_root.parent().context("get project root parent")?;
let amqp = match Amqp::from_str(&content) {
Ok(amqp) => amqp,
Err(err) => {
eprintln!("{err}");
std::process::exit(1);
let transport_generated_path = project_root.join("amqp_transport/src/methods/generated.rs");
let core_generated_path = project_root.join("amqp_core/src/methods/generated.rs");
let content = fs::read_to_string(amqp_spec).context("read amqp spec file")?;
let amqp = Amqp::from_str(&content).context("parse amqp spec file")?;
let transport_output =
File::create(transport_generated_path).context("transport output file create")?;
let core_output = File::create(core_generated_path).context("core output file create")?;
Codegen {
output: Box::new(transport_output),
}
};
codegen(&amqp);
.transport_codegen(&amqp);
Codegen {
output: Box::new(core_output),
}
.core_codegen(&amqp);
Ok(())
}
impl Codegen {
fn transport_codegen(&mut self, amqp: &Amqp) {
writeln!(self.output, "#![allow(dead_code)]").ok();
writeln!(
self.output,
"// This file has been generated by `xtask/src/codegen`. Do not edit it manually.\n"
)
.ok();
self.codegen_parser(amqp);
self.codegen_write(amqp);
self.codegen_random(amqp);
}
fn codegen(amqp: &Amqp) {
println!("#![allow(dead_code)]");
println!("// This file has been generated by `xtask/src/codegen`. Do not edit it manually.\n");
codegen_domain_defs(amqp);
codegen_class_defs(amqp);
codegen_parser(amqp);
codegen_write(amqp);
codegen_random(amqp);
fn core_codegen(&mut self, amqp: &Amqp) {
writeln!(self.output, "#![allow(dead_code)]").ok();
writeln!(
self.output,
"// This file has been generated by `xtask/src/codegen`. Do not edit it manually.\n"
)
.ok();
self.codegen_domain_defs(amqp);
self.codegen_class_defs(amqp);
}
fn codegen_domain_defs(amqp: &Amqp) {
fn codegen_domain_defs(&mut self, amqp: &Amqp) {
for domain in &amqp.domains {
let invariants = invariants(domain.asserts.iter());
let invariants = self.invariants(domain.asserts.iter());
if let Some(label) = &domain.label {
println!("/// {label}");
writeln!(self.output, "/// {label}").ok();
}
if !invariants.is_empty() {
if domain.label.is_some() {
println!("///");
writeln!(self.output, "///").ok();
}
println!("/// {invariants}");
writeln!(self.output, "/// {invariants}").ok();
}
if !domain.doc.is_empty() {
println!("///");
doc_comment(&domain.doc, 0);
writeln!(self.output, "///").ok();
self.doc_comment(&domain.doc, 0);
}
println!(
writeln!(
self.output,
"pub type {} = {};\n",
domain.name.to_upper_camel_case(),
amqp_type_to_rust_type(&domain.kind),
);
self.amqp_type_to_rust_type(&domain.kind),
)
.ok();
}
}
fn codegen_class_defs(amqp: &Amqp) {
println!("#[derive(Debug, Clone, PartialEq)]");
println!("pub enum Method {{");
fn codegen_class_defs(&mut self, amqp: &Amqp) {
writeln!(self.output, "#[derive(Debug, Clone, PartialEq)]").ok();
writeln!(self.output, "pub enum Method {{").ok();
for class in &amqp.classes {
let enum_name = class.name.to_upper_camel_case();
for method in &class.methods {
let method_name = method.name.to_upper_camel_case();
doc_comment(&class.doc, 4);
doc_comment(&method.doc, 4);
print!(" {enum_name}{method_name}");
self.doc_comment(&class.doc, 4);
self.doc_comment(&method.doc, 4);
write!(self.output, " {enum_name}{method_name}").ok();
if !method.fields.is_empty() {
println!(" {{");
writeln!(self.output, " {{").ok();
for field in &method.fields {
let field_name = snake_case(&field.name);
let (field_type, field_docs) =
get_invariants_with_type(field_type(field), field.asserts.as_ref());
let field_name = self.snake_case(&field.name);
let (field_type, field_docs) = self.get_invariants_with_type(
self.field_type(field),
field.asserts.as_ref(),
);
if !field_docs.is_empty() {
println!(" /// {field_docs}");
writeln!(self.output, " /// {field_docs}").ok();
if !field.doc.is_empty() {
println!(" ///");
doc_comment(&field.doc, 8);
writeln!(self.output, " ///").ok();
self.doc_comment(&field.doc, 8);
}
} else {
doc_comment(&field.doc, 8);
self.doc_comment(&field.doc, 8);
}
println!(" {field_name}: {field_type},");
writeln!(self.output, " {field_name}: {field_type},").ok();
}
println!(" }},");
writeln!(self.output, " }},").ok();
} else {
println!(",");
writeln!(self.output, ",").ok();
}
}
}
println!("}}\n");
writeln!(self.output, "}}\n").ok();
}
fn amqp_type_to_rust_type(amqp_type: &str) -> &'static str {
fn amqp_type_to_rust_type(&self, amqp_type: &str) -> &'static str {
match amqp_type {
"octet" => "u8",
"short" => "u16",
@ -210,11 +247,11 @@ fn amqp_type_to_rust_type(amqp_type: &str) -> &'static str {
}
}
fn field_type(field: &Field) -> &String {
fn field_type<'a>(&self, field: &'a Field) -> &'a String {
field.domain.as_ref().or(field.kind.as_ref()).unwrap()
}
fn resolve_type_from_domain(amqp: &Amqp, domain: &str) -> String {
fn resolve_type_from_domain(&self, amqp: &Amqp, domain: &str) -> String {
amqp.domains
.iter()
.find(|d| d.name == domain)
@ -223,15 +260,15 @@ fn resolve_type_from_domain(amqp: &Amqp, domain: &str) -> String {
}
/// returns (type name, invariant docs)
fn get_invariants_with_type(domain: &str, asserts: &[Assert]) -> (String, String) {
let additional_docs = invariants(asserts.iter());
fn get_invariants_with_type(&self, domain: &str, asserts: &[Assert]) -> (String, String) {
let additional_docs = self.invariants(asserts.iter());
let type_name = domain.to_upper_camel_case();
(type_name, additional_docs)
}
fn snake_case(ident: &str) -> String {
fn snake_case(&self, ident: &str) -> String {
use heck::ToSnakeCase;
if ident == "type" {
@ -242,16 +279,17 @@ fn snake_case(ident: &str) -> String {
}
fn subsequent_bit_fields<'a>(
amqp: &Amqp,
&self,
bit_field: &'a Field,
iter: &mut Peekable<impl Iterator<Item = &'a Field>>,
amqp: &Amqp,
) -> Vec<&'a Field> {
let mut fields_with_bit = vec![bit_field];
loop {
if iter
.peek()
.map(|f| resolve_type_from_domain(amqp, field_type(f)) == "bit")
.map(|f| self.resolve_type_from_domain(amqp, self.field_type(f)) == "bit")
.unwrap_or(false)
{
fields_with_bit.push(iter.next().unwrap());
@ -262,7 +300,7 @@ fn subsequent_bit_fields<'a>(
fields_with_bit
}
fn invariants<'a>(asserts: impl Iterator<Item = &'a Assert>) -> String {
fn invariants<'a>(&self, asserts: impl Iterator<Item = &'a Assert>) -> String {
asserts
.map(|assert| match &*assert.check {
"notnull" => "must not be null".to_string(),
@ -281,7 +319,7 @@ fn invariants<'a>(asserts: impl Iterator<Item = &'a Assert>) -> String {
.join(", ")
}
fn doc_comment(docs: &[Doc], indent: usize) {
fn doc_comment(&mut self, docs: &[Doc], indent: usize) {
for doc in docs {
if doc.kind == Some("grammar".to_string()) {
continue;
@ -290,7 +328,8 @@ fn doc_comment(docs: &[Doc], indent: usize) {
let line = line.trim();
if !line.is_empty() {
let indent = " ".repeat(indent);
println!("{indent}/// {line}");
writeln!(self.output, "{indent}/// {line}").ok();
}
}
}
}

View file

@ -1,7 +1,5 @@
use super::{
field_type, resolve_type_from_domain, snake_case, subsequent_bit_fields, Amqp, Assert, Class,
Domain, Method,
};
use super::{Amqp, Assert, Class, Domain, Method};
use crate::codegen::Codegen;
use heck::{ToSnakeCase, ToUpperCamelCase};
use itertools::Itertools;
@ -17,10 +15,12 @@ fn domain_function_name(domain_name: &str) -> String {
format!("domain_{domain_name}")
}
pub(super) fn codegen_parser(amqp: &Amqp) {
println!(
impl Codegen {
pub(super) fn codegen_parser(&mut self, amqp: &Amqp) {
writeln!(
self.output,
"pub mod parse {{
use super::*;
use amqp_core::methods::*;
use crate::methods::parse_helper::*;
use crate::error::TransError;
use nom::{{branch::alt, bytes::complete::tag}};
@ -29,8 +29,10 @@ use once_cell::sync::Lazy;
pub type IResult<'a, T> = nom::IResult<&'a [u8], T, TransError>;
"
);
println!(
)
.ok();
writeln!(
self.output,
"pub fn parse_method(input: &[u8]) -> Result<(&[u8], Method), nom::Err<TransError>> {{
alt(({}))(input)
}}",
@ -38,16 +40,18 @@ pub type IResult<'a, T> = nom::IResult<&'a [u8], T, TransError>;
.iter()
.map(|class| class.name.to_snake_case())
.join(", ")
);
)
.ok();
for domain in &amqp.domains {
domain_parser(domain);
self.domain_parser(domain);
}
for class in &amqp.classes {
let class_name = class.name.to_snake_case();
function(&class_name, "Method", || {
self.function(&class_name, "Method");
let class_index = class.index;
let all_methods = class
.methods
@ -55,126 +59,158 @@ pub type IResult<'a, T> = nom::IResult<&'a [u8], T, TransError>;
.map(method_function_name(&class_name))
.join(", ");
let class_name_raw = &class.name;
println!(
writeln!(
self.output,
r#" let (input, _) = tag({class_index}_u16.to_be_bytes())(input)?;
alt(({all_methods}))(input).map_err(fail_err("class {class_name_raw}"))"#
);
});
)
.ok();
writeln!(self.output, "}}").ok();
for method in &class.methods {
method_parser(amqp, class, method);
self.method_parser(class, method, amqp);
}
}
println!("\n}}");
writeln!(self.output, "\n}}").ok();
}
fn domain_parser(domain: &Domain) {
fn domain_parser(&mut self, domain: &Domain) {
let fn_name = domain_function_name(&domain.name);
let type_name = domain.kind.to_snake_case();
// don't even bother with bit domains, do them manually at call site
if type_name != "bit" {
function(&fn_name, &domain.name.to_upper_camel_case(), || {
self.function(&fn_name, &domain.name.to_upper_camel_case());
if domain.asserts.is_empty() {
println!(" {type_name}(input)");
writeln!(self.output, " {type_name}(input)").ok();
} else {
println!(" let (input, result) = {type_name}(input)?;");
writeln!(
self.output,
" let (input, result) = {type_name}(input)?;"
)
.ok();
for assert in &domain.asserts {
assert_check(assert, &type_name, "result");
self.assert_check(assert, &type_name, "result");
}
println!(" Ok((input, result))");
writeln!(self.output, " Ok((input, result))").ok();
}
});
writeln!(self.output, "}}").ok();
}
}
fn method_parser(amqp: &Amqp, class: &Class, method: &Method) {
fn method_parser(&mut self, class: &Class, method: &Method, amqp: &Amqp) {
let class_name = class.name.to_snake_case();
let method_name_raw = &method.name;
let function_name = method_function_name(&class_name)(method);
function(&function_name, "Method", || {
self.function(&function_name, "Method");
let method_index = method.index;
println!(r#" let (input, _) = tag({method_index}_u16.to_be_bytes())(input)?;"#);
writeln!(
self.output,
r#" let (input, _) = tag({method_index}_u16.to_be_bytes())(input)?;"#
)
.ok();
let mut iter = method.fields.iter().peekable();
while let Some(field) = iter.next() {
let field_name_raw = &field.name;
let type_name = resolve_type_from_domain(amqp, field_type(field));
let type_name = self.resolve_type_from_domain(amqp, self.field_type(field));
if type_name == "bit" {
let fields_with_bit = subsequent_bit_fields(amqp, field, &mut iter);
let fields_with_bit = self.subsequent_bit_fields(field, &mut iter, amqp);
let amount = fields_with_bit.len();
println!(
writeln!(
self.output,
r#" let (input, bits) = bit(input, {amount}).map_err(fail_err("field {field_name_raw} in method {method_name_raw}"))?;"#
);
).ok();
for (i, field) in fields_with_bit.iter().enumerate() {
let field_name = snake_case(&field.name);
println!(" let {field_name} = bits[{i}];");
let field_name = self.snake_case(&field.name);
writeln!(self.output, " let {field_name} = bits[{i}];").ok();
}
} else {
let fn_name = domain_function_name(field_type(field));
let field_name = snake_case(&field.name);
println!(
let fn_name = domain_function_name(self.field_type(field));
let field_name = self.snake_case(&field.name);
writeln!(
self.output,
r#" let (input, {field_name}) = {fn_name}(input).map_err(fail_err("field {field_name_raw} in method {method_name_raw}"))?;"#
);
).ok();
for assert in &field.asserts {
assert_check(assert, &type_name, &field_name);
self.assert_check(assert, &type_name, &field_name);
}
}
}
let class_name = class_name.to_upper_camel_case();
let method_name = method.name.to_upper_camel_case();
println!(" Ok((input, Method::{class_name}{method_name} {{");
writeln!(
self.output,
" Ok((input, Method::{class_name}{method_name} {{"
)
.ok();
for field in &method.fields {
let field_name = snake_case(&field.name);
println!(" {field_name},");
let field_name = self.snake_case(&field.name);
writeln!(self.output, " {field_name},").ok();
}
println!(" }}))");
});
writeln!(self.output, " }}))").ok();
writeln!(self.output, "}}").ok();
}
fn assert_check(assert: &Assert, type_name: &str, var_name: &str) {
fn assert_check(&mut self, assert: &Assert, type_name: &str, var_name: &str) {
match &*assert.check {
"notnull" => match type_name {
"shortstr" | "longstr" => {
println!(
writeln!(
self.output,
r#" if {var_name}.is_empty() {{ fail!("string was null for field {var_name}") }}"#
);
).ok();
}
"short" => {
println!(
writeln!(
self.output,
r#" if {var_name} == 0 {{ fail!("number was 0 for field {var_name}") }}"#
);
)
.ok();
}
_ => unimplemented!(),
},
"regexp" => {
let value = assert.value.as_ref().unwrap();
println!(
writeln!(
self.output,
r#" static REGEX: Lazy<Regex> = Lazy::new(|| Regex::new(r"{value}").unwrap());"#
);
).ok();
let cause = format!("regex `{value}` did not match value for field {var_name}");
println!(r#" if !REGEX.is_match(&{var_name}) {{ fail!(r"{cause}") }}"#);
writeln!(
self.output,
r#" if !REGEX.is_match(&{var_name}) {{ fail!(r"{cause}") }}"#
)
.ok();
}
"le" => {} // can't validate this here
"length" => {
let length = assert.value.as_ref().unwrap();
let cause = format!("value is shorter than {length} for field {var_name}");
println!(r#" if {var_name}.len() > {length} {{ fail!("{cause}") }}"#);
writeln!(
self.output,
r#" if {var_name}.len() > {length} {{ fail!("{cause}") }}"#
)
.ok();
}
_ => unimplemented!(),
}
}
fn function<F>(name: &str, ret_ty: &str, body: F)
where
F: FnOnce(),
{
println!("fn {name}(input: &[u8]) -> IResult<'_, {ret_ty}> {{");
body();
println!("}}");
fn function(&mut self, name: &str, ret_ty: &str) {
writeln!(
self.output,
"fn {name}(input: &[u8]) -> IResult<'_, {ret_ty}> {{"
)
.ok();
}
}

View file

@ -1,59 +1,78 @@
use super::{snake_case, Amqp};
use crate::codegen::{Amqp, Codegen};
use heck::ToUpperCamelCase;
pub(super) fn codegen_random(amqp: &Amqp) {
println!(
impl Codegen {
pub fn codegen_random(&mut self, amqp: &Amqp) {
writeln!(
self.output,
"
mod random {{
use rand::Rng;
use amqp_core::methods::*;
use crate::methods::RandomMethod;
use super::*;
"
);
)
.ok();
writeln!(
self.output,
"impl<R: Rng> RandomMethod<R> for Method {{
#[allow(unused_variables)]
fn random(rng: &mut R) -> Self {{"
)
.ok();
impl_random("Method", || {
let class_lens = amqp.classes.len();
println!(" match rng.gen_range(0u32..{class_lens}) {{");
writeln!(
self.output,
" match rng.gen_range(0u32..{class_lens}) {{"
)
.ok();
for (i, class) in amqp.classes.iter().enumerate() {
let class_name = class.name.to_upper_camel_case();
println!(" {i} => {{");
writeln!(self.output, " {i} => {{").ok();
let method_len = class.methods.len();
println!(" match rng.gen_range(0u32..{method_len}) {{");
writeln!(
self.output,
" match rng.gen_range(0u32..{method_len}) {{"
)
.ok();
for (i, method) in class.methods.iter().enumerate() {
let method_name = method.name.to_upper_camel_case();
println!(" {i} => Method::{class_name}{method_name} {{");
writeln!(
self.output,
" {i} => Method::{class_name}{method_name} {{"
)
.ok();
for field in &method.fields {
let field_name = snake_case(&field.name);
println!(" {field_name}: RandomMethod::random(rng),");
let field_name = self.snake_case(&field.name);
writeln!(
self.output,
" {field_name}: RandomMethod::random(rng),"
)
.ok();
}
println!(" }},");
writeln!(self.output, " }},").ok();
}
println!(
writeln!(
self.output,
" _ => unreachable!(),
}}"
);
)
.ok();
println!(" }}");
writeln!(self.output, " }}").ok();
}
println!(
writeln!(
self.output,
" _ => unreachable!(),
}}"
);
});
)
.ok();
writeln!(self.output, " }}\n}}").ok();
println!("}}");
writeln!(self.output, "}}").ok();
}
fn impl_random(name: &str, body: impl FnOnce()) {
println!(
"impl<R: Rng> RandomMethod<R> for {name} {{
#[allow(unused_variables)]
fn random(rng: &mut R) -> Self {{"
);
body();
println!(" }}\n}}");
}

View file

@ -1,17 +1,20 @@
use super::{field_type, resolve_type_from_domain, snake_case, subsequent_bit_fields, Amqp};
use crate::codegen::{Amqp, Codegen};
use heck::ToUpperCamelCase;
pub(super) fn codegen_write(amqp: &Amqp) {
println!(
impl Codegen {
pub fn codegen_write(&mut self, amqp: &Amqp) {
writeln!(
self.output,
"pub mod write {{
use super::*;
use amqp_core::methods::*;
use crate::methods::write_helper::*;
use crate::error::TransError;
use std::io::Write;
pub fn write_method<W: Write>(method: Method, mut writer: W) -> Result<(), TransError> {{
match method {{"
);
)
.ok();
for class in &amqp.classes {
let class_name = class.name.to_upper_camel_case();
@ -19,40 +22,51 @@ pub fn write_method<W: Write>(method: Method, mut writer: W) -> Result<(), Trans
for method in &class.methods {
let method_name = method.name.to_upper_camel_case();
let method_index = method.index;
println!(" Method::{class_name}{method_name} {{");
writeln!(self.output, " Method::{class_name}{method_name} {{").ok();
for field in &method.fields {
let field_name = snake_case(&field.name);
println!(" {field_name},");
let field_name = self.snake_case(&field.name);
writeln!(self.output, " {field_name},").ok();
}
println!(" }} => {{");
writeln!(self.output, " }} => {{").ok();
let [ci0, ci1] = class_index.to_be_bytes();
let [mi0, mi1] = method_index.to_be_bytes();
println!(" writer.write_all(&[{ci0}, {ci1}, {mi0}, {mi1}])?;");
writeln!(
self.output,
" writer.write_all(&[{ci0}, {ci1}, {mi0}, {mi1}])?;"
)
.ok();
let mut iter = method.fields.iter().peekable();
while let Some(field) = iter.next() {
let field_name = snake_case(&field.name);
let type_name = resolve_type_from_domain(amqp, field_type(field));
let field_name = self.snake_case(&field.name);
let type_name = self.resolve_type_from_domain(amqp, self.field_type(field));
if type_name == "bit" {
let fields_with_bit = subsequent_bit_fields(amqp, field, &mut iter);
print!(" bit(&[");
let fields_with_bit = self.subsequent_bit_fields(field, &mut iter, amqp);
write!(self.output, " bit(&[").ok();
for field in fields_with_bit {
let field_name = snake_case(&field.name);
print!("{field_name}, ");
let field_name = self.snake_case(&field.name);
write!(self.output, "{field_name}, ").ok();
}
println!("], &mut writer)?;");
writeln!(self.output, "], &mut writer)?;").ok();
} else {
println!(" {type_name}({field_name}, &mut writer)?;");
writeln!(
self.output,
" {type_name}({field_name}, &mut writer)?;"
)
.ok();
}
}
println!(" }}");
writeln!(self.output, " }}").ok();
}
}
println!(
writeln!(
self.output,
" }}
Ok(())
}}
}}"
);
)
.ok();
}
}

View file

@ -1,22 +1,24 @@
mod codegen;
fn main() {
fn main() -> anyhow::Result<()> {
let command = std::env::args().nth(1).unwrap_or_else(|| {
eprintln!("No task provided");
eprintln!("Error: No task provided");
help();
std::process::exit(1);
});
match command.as_str() {
"generate" | "gen" => codegen::main(),
_ => eprintln!("Unknown command {command}."),
_ => {
eprintln!("Unknown command {command}.");
Ok(())
}
}
}
fn help() {
println!(
"Available tasks:
generate - Generate amqp method code in `amqp_transport/src/methods/generated.rs.
Dumps code to stdout and should be redirected manually."
generate, gen - Generate amqp method code in `amqp_transport/src/methods/generated.rs and amqp_core/src/methods/generated.rs"
);
}