diff --git a/Cargo.lock b/Cargo.lock index d24442f..4c40ca7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -30,6 +30,7 @@ version = "0.1.0" dependencies = [ "bytes", "parking_lot", + "rand", "smallvec", "thiserror", "uuid", @@ -51,8 +52,10 @@ name = "amqp_messaging" version = "0.1.0" dependencies = [ "amqp_core", + "parking_lot", "tokio", "tracing", + "uuid", ] [[package]] diff --git a/amqp_core/Cargo.toml b/amqp_core/Cargo.toml index ce92335..3adb383 100644 --- a/amqp_core/Cargo.toml +++ b/amqp_core/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" [dependencies] bytes = "1.1.0" parking_lot = "0.12.0" +rand = "0.8.5" smallvec = { version = "1.8.0", features = ["union"] } thiserror = "1.0.30" uuid = "0.8.2" diff --git a/amqp_core/src/lib.rs b/amqp_core/src/lib.rs index fc875f1..1901e88 100644 --- a/amqp_core/src/lib.rs +++ b/amqp_core/src/lib.rs @@ -25,6 +25,8 @@ impl Default for GlobalData { inner: Arc::new(Mutex::new(GlobalDataInner { connections: HashMap::new(), channels: HashMap::new(), + queues: HashMap::new(), + default_exchange: HashMap::new(), })), } } @@ -40,6 +42,9 @@ impl GlobalData { pub struct GlobalDataInner { pub connections: HashMap, pub channels: HashMap, + pub queues: HashMap, + /// Todo: This is just for testing and will be removed later! + pub default_exchange: HashMap, } pub type ConnectionHandle = Handle; @@ -104,3 +109,14 @@ impl Channel { global_data.channels.remove(&self.id); } } + +pub fn gen_uuid() -> Uuid { + Uuid::from_bytes(rand::random()) +} + +#[macro_export] +macro_rules! amqp_todo { + () => { + return Err(::amqp_core::error::ConException::NotImplemented.into()) + }; +} diff --git a/amqp_core/src/queue.rs b/amqp_core/src/queue.rs index f8074a8..ecfa9c9 100644 --- a/amqp_core/src/queue.rs +++ b/amqp_core/src/queue.rs @@ -12,6 +12,7 @@ pub struct RawQueue { pub name: String, pub messages: Mutex>, // use a concurrent linked list??? pub durable: bool, + pub exclusive: Option, /// Whether the queue will automatically be deleted when no consumers uses it anymore. /// The queue can always be manually deleted. /// If auto-delete is enabled, it keeps track of the consumer count. diff --git a/amqp_messaging/Cargo.toml b/amqp_messaging/Cargo.toml index 4e17b35..b5a5eba 100644 --- a/amqp_messaging/Cargo.toml +++ b/amqp_messaging/Cargo.toml @@ -7,5 +7,7 @@ edition = "2021" [dependencies] amqp_core = { path = "../amqp_core" } +parking_lot = "0.12.0" tracing = "0.1.31" -tokio = { version = "1.17.0", features = ["full"] } \ No newline at end of file +tokio = { version = "1.17.0", features = ["full"] } +uuid = "0.8.2" \ No newline at end of file diff --git a/amqp_messaging/src/methods.rs b/amqp_messaging/src/methods.rs deleted file mode 100644 index cbea527..0000000 --- a/amqp_messaging/src/methods.rs +++ /dev/null @@ -1,60 +0,0 @@ -use amqp_core::error::{ConException, ProtocolError}; -use amqp_core::message::Message; -use amqp_core::methods::Method; -use amqp_core::ChannelHandle; -use tracing::{debug, info}; - -pub async fn handle_basic_publish(_channel_handle: ChannelHandle, message: Message) { - info!( - ?message, - "Someone has summoned the almighty Basic.Publish handler" - ); -} - -pub async fn handle_method( - _channel_handle: ChannelHandle, - method: Method, -) -> Result<(), ProtocolError> { - match method { - Method::ExchangeDeclare { .. } => Err(ConException::NotImplemented.into()), - Method::ExchangeDeclareOk => Err(ConException::NotImplemented.into()), - Method::ExchangeDelete { .. } => Err(ConException::NotImplemented.into()), - Method::ExchangeDeleteOk => Err(ConException::NotImplemented.into()), - Method::QueueDeclare { .. } => Err(ConException::NotImplemented.into()), - Method::QueueDeclareOk { .. } => Err(ConException::NotImplemented.into()), - Method::QueueBind { .. } => Err(ConException::NotImplemented.into()), - Method::QueueBindOk => Err(ConException::NotImplemented.into()), - Method::QueueUnbind { .. } => Err(ConException::NotImplemented.into()), - Method::QueueUnbindOk => Err(ConException::NotImplemented.into()), - Method::QueuePurge { .. } => Err(ConException::NotImplemented.into()), - Method::QueuePurgeOk { .. } => Err(ConException::NotImplemented.into()), - Method::QueueDelete { .. } => Err(ConException::NotImplemented.into()), - Method::QueueDeleteOk { .. } => Err(ConException::NotImplemented.into()), - Method::BasicQos { .. } => Err(ConException::NotImplemented.into()), - Method::BasicQosOk => Err(ConException::NotImplemented.into()), - Method::BasicConsume { .. } => Err(ConException::NotImplemented.into()), - Method::BasicConsumeOk { .. } => Err(ConException::NotImplemented.into()), - Method::BasicCancel { .. } => Err(ConException::NotImplemented.into()), - Method::BasicCancelOk { .. } => Err(ConException::NotImplemented.into()), - Method::BasicReturn { .. } => Err(ConException::NotImplemented.into()), - Method::BasicDeliver { .. } => Err(ConException::NotImplemented.into()), - Method::BasicGet { .. } => Err(ConException::NotImplemented.into()), - Method::BasicGetOk { .. } => Err(ConException::NotImplemented.into()), - Method::BasicGetEmpty { .. } => Err(ConException::NotImplemented.into()), - Method::BasicAck { .. } => Err(ConException::NotImplemented.into()), - Method::BasicReject { .. } => Err(ConException::NotImplemented.into()), - Method::BasicRecoverAsync { .. } => Err(ConException::NotImplemented.into()), - Method::BasicRecover { .. } => Err(ConException::NotImplemented.into()), - Method::BasicRecoverOk => Err(ConException::NotImplemented.into()), - Method::TxSelect - | Method::TxSelectOk - | Method::TxCommit - | Method::TxCommitOk - | Method::TxRollback - | Method::TxRollbackOk => Err(ConException::NotImplemented.into()), - Method::BasicPublish { .. } => { - unreachable!("Basic.Publish is handled somewhere else because it has a body") - } - _ => unreachable!("Method handled by transport layer"), - } -} diff --git a/amqp_messaging/src/methods/consume.rs b/amqp_messaging/src/methods/consume.rs new file mode 100644 index 0000000..7148705 --- /dev/null +++ b/amqp_messaging/src/methods/consume.rs @@ -0,0 +1,17 @@ +use amqp_core::error::ProtocolError; +use amqp_core::methods::{Bit, ConsumerTag, NoAck, NoLocal, NoWait, QueueName, Table}; +use amqp_core::ChannelHandle; + +#[allow(clippy::too_many_arguments)] +pub async fn consume( + _channel_handle: ChannelHandle, + _queue: QueueName, + _consumer_tag: ConsumerTag, + _no_local: NoLocal, + _no_ack: NoAck, + _exclusive: Bit, + _no_wait: NoWait, + _arguments: Table, +) -> Result<(), ProtocolError> { + Ok(()) +} diff --git a/amqp_messaging/src/methods/mod.rs b/amqp_messaging/src/methods/mod.rs new file mode 100644 index 0000000..9a4c57d --- /dev/null +++ b/amqp_messaging/src/methods/mod.rs @@ -0,0 +1,125 @@ +mod consume; +mod queue; + +use amqp_core::amqp_todo; +use amqp_core::error::ProtocolError; +use amqp_core::message::Message; +use amqp_core::methods::Method; +use amqp_core::ChannelHandle; +use tracing::info; + +pub async fn handle_basic_publish(_channel_handle: ChannelHandle, message: Message) { + info!( + ?message, + "Someone has summoned the almighty Basic.Publish handler" + ); +} + +pub async fn handle_method( + channel_handle: ChannelHandle, + method: Method, +) -> Result<(), ProtocolError> { + info!(?method, "Handling method"); + + match method { + Method::ExchangeDeclare { .. } => amqp_todo!(), + Method::ExchangeDeclareOk => amqp_todo!(), + Method::ExchangeDelete { .. } => amqp_todo!(), + Method::ExchangeDeleteOk => amqp_todo!(), + Method::QueueDeclare { + queue, + passive, + durable, + exclusive, + auto_delete, + no_wait, + arguments, + .. + } => { + queue::declare( + channel_handle, + queue, + passive, + durable, + exclusive, + auto_delete, + no_wait, + arguments, + ) + .await + } + Method::QueueDeclareOk { .. } => amqp_todo!(), + Method::QueueBind { + queue, + exchange, + routing_key, + no_wait, + arguments, + .. + } => { + queue::bind( + channel_handle, + queue, + exchange, + routing_key, + no_wait, + arguments, + ) + .await + } + Method::QueueBindOk => amqp_todo!(), + Method::QueueUnbind { .. } => amqp_todo!(), + Method::QueueUnbindOk => amqp_todo!(), + Method::QueuePurge { .. } => amqp_todo!(), + Method::QueuePurgeOk { .. } => amqp_todo!(), + Method::QueueDelete { .. } => amqp_todo!(), + Method::QueueDeleteOk { .. } => amqp_todo!(), + Method::BasicQos { .. } => amqp_todo!(), + Method::BasicQosOk => amqp_todo!(), + Method::BasicConsume { + queue, + consumer_tag, + no_local, + no_ack, + exclusive, + no_wait, + arguments, + .. + } => { + consume::consume( + channel_handle, + queue, + consumer_tag, + no_local, + no_ack, + exclusive, + no_wait, + arguments, + ) + .await + } + Method::BasicConsumeOk { .. } => amqp_todo!(), + Method::BasicCancel { .. } => amqp_todo!(), + Method::BasicCancelOk { .. } => amqp_todo!(), + Method::BasicReturn { .. } => amqp_todo!(), + Method::BasicDeliver { .. } => amqp_todo!(), + Method::BasicGet { .. } => amqp_todo!(), + Method::BasicGetOk { .. } => amqp_todo!(), + Method::BasicGetEmpty { .. } => amqp_todo!(), + Method::BasicAck { .. } => amqp_todo!(), + Method::BasicReject { .. } => amqp_todo!(), + Method::BasicRecoverAsync { .. } => amqp_todo!(), + Method::BasicRecover { .. } => amqp_todo!(), + Method::BasicRecoverOk => amqp_todo!(), + Method::TxSelect + | Method::TxSelectOk + | Method::TxCommit + | Method::TxCommitOk + | Method::TxRollback + | Method::TxRollbackOk => amqp_todo!(), + Method::BasicPublish { .. } => { + unreachable!("Basic.Publish is handled somewhere else because it has a body") + } + _ => unreachable!("Method handled by transport layer"), + } +} diff --git a/amqp_messaging/src/methods/queue.rs b/amqp_messaging/src/methods/queue.rs new file mode 100644 index 0000000..e3ff92a --- /dev/null +++ b/amqp_messaging/src/methods/queue.rs @@ -0,0 +1,80 @@ +#![deny(clippy::future_not_send)] + +use amqp_core::error::{ConException, ProtocolError}; +use amqp_core::methods::{Bit, ExchangeName, NoWait, QueueName, Shortstr, Table}; +use amqp_core::queue::{QueueDeletion, RawQueue}; +use amqp_core::ChannelHandle; +use amqp_core::{amqp_todo, GlobalData}; +use parking_lot::Mutex; +use std::sync::atomic::AtomicUsize; +use std::sync::Arc; +use uuid::Uuid; + +#[allow(clippy::too_many_arguments)] +pub async fn declare( + channel_handle: ChannelHandle, + queue_name: QueueName, + passive: Bit, + durable: Bit, + exclusive: Bit, + auto_delete: Bit, + no_wait: NoWait, + arguments: Table, +) -> Result<(), ProtocolError> { + if !arguments.is_empty() { + return Err(ConException::Todo.into()); + } + + let (global_data, id) = { + let channel = channel_handle.lock(); + + if passive || no_wait { + amqp_todo!(); + } + + let id = amqp_core::gen_uuid(); + let queue = Arc::new(RawQueue { + id, + name: queue_name.clone(), + messages: Mutex::default(), + durable, + exclusive: exclusive.then(|| channel.id), + deletion: if auto_delete { + QueueDeletion::Auto(AtomicUsize::default()) + } else { + QueueDeletion::Manual + }, + }); + + let global_data = channel.global_data.clone(); + + { + let mut global_data_lock = global_data.lock(); + global_data_lock.queues.insert(id, queue); + } + + (global_data, id) + }; + + bind_queue(global_data, id, (), queue_name).await +} + +pub async fn bind( + _channel_handle: ChannelHandle, + _queue: QueueName, + _exchange: ExchangeName, + _routing_key: Shortstr, + _no_wait: NoWait, + _arguments: Table, +) -> Result<(), ProtocolError> { + amqp_todo!(); +} + +async fn bind_queue( + _global_data: GlobalData, + _queue: Uuid, + _exchange: (), + _routing_key: String, +) -> Result<(), ProtocolError> { + amqp_todo!(); +} diff --git a/amqp_transport/src/connection.rs b/amqp_transport/src/connection.rs index 1b5aac4..fd20366 100644 --- a/amqp_transport/src/connection.rs +++ b/amqp_transport/src/connection.rs @@ -353,7 +353,7 @@ impl Connection { } = method { let message = RawMessage { - id: Uuid::from_bytes(rand::random()), + id: amqp_core::gen_uuid(), properties: header.property_fields, routing: RoutingInformation { exchange, @@ -380,7 +380,7 @@ impl Connection { } async fn channel_open(&mut self, channel_id: ChannelId) -> Result<()> { - let id = Uuid::from_bytes(rand::random()); + let id = amqp_core::gen_uuid(); let channel_handle = amqp_core::Channel::new_handle( id, channel_id.num(), diff --git a/amqp_transport/src/lib.rs b/amqp_transport/src/lib.rs index f28c3fd..9705bc6 100644 --- a/amqp_transport/src/lib.rs +++ b/amqp_transport/src/lib.rs @@ -15,7 +15,6 @@ use amqp_core::GlobalData; use anyhow::Result; use tokio::net; use tracing::{info, info_span, Instrument}; -use uuid::Uuid; pub async fn do_thing_i_guess(global_data: GlobalData) -> Result<()> { info!("Binding TCP listener..."); @@ -25,7 +24,7 @@ pub async fn do_thing_i_guess(global_data: GlobalData) -> Result<()> { loop { let (stream, peer_addr) = listener.accept().await?; - let id = Uuid::from_bytes(rand::random()); + let id = amqp_core::gen_uuid(); info!(local_addr = ?stream.local_addr(), %id, "Accepted new connection"); let span = info_span!("client-connection", %id); diff --git a/amqp_transport/src/methods/parse_helper.rs b/amqp_transport/src/methods/parse_helper.rs index 0638d43..dfdc3d0 100644 --- a/amqp_transport/src/methods/parse_helper.rs +++ b/amqp_transport/src/methods/parse_helper.rs @@ -42,8 +42,8 @@ pub fn fail_err>(msg: S) -> impl FnOnce(Err) -> Err< Err::Failure(ConException::SyntaxError(stack).into()) } } -pub fn err_other>(msg: S) -> impl FnOnce(E) -> Err { - move |_| Err::Error(ConException::SyntaxError(vec![msg.into()]).into()) +pub fn other_fail>(msg: S) -> impl FnOnce(E) -> Err { + move |_| Err::Failure(ConException::SyntaxError(vec![msg.into()]).into()) } #[macro_export] @@ -105,7 +105,7 @@ pub fn bit(input: &[u8], amount: usize) -> IResult<'_, Vec> { pub fn shortstr(input: &[u8]) -> IResult<'_, Shortstr> { let (input, len) = u8(input)?; let (input, str_data) = take(usize::from(len))(input)?; - let data = String::from_utf8(str_data.into()).map_err(err_other("shortstr"))?; + let data = String::from_utf8(str_data.into()).map_err(other_fail("shortstr"))?; Ok((input, data)) } diff --git a/amqp_transport/src/sasl.rs b/amqp_transport/src/sasl.rs index 18d6992..494c433 100644 --- a/amqp_transport/src/sasl.rs +++ b/amqp_transport/src/sasl.rs @@ -16,9 +16,9 @@ pub fn parse_sasl_plain_response(response: &[u8]) -> Result { .split(|&n| n == 0) .map(|bytes| String::from_utf8(bytes.into()).map_err(|_| ConException::Todo)); - let authorization_identity = parts.next().ok_or_else(|| ConException::Todo)??; - let authentication_identity = parts.next().ok_or_else(|| ConException::Todo)??; - let password = parts.next().ok_or_else(|| ConException::Todo)??; + let authorization_identity = parts.next().ok_or(ConException::Todo)??; + let authentication_identity = parts.next().ok_or(ConException::Todo)??; + let password = parts.next().ok_or(ConException::Todo)??; Ok(PlainUser { authorization_identity,