diff --git a/amqp_core/src/lib.rs b/amqp_core/src/lib.rs index da99e03..67cf362 100644 --- a/amqp_core/src/lib.rs +++ b/amqp_core/src/lib.rs @@ -14,14 +14,24 @@ use crate::{ }; use connection::{ChannelId, ConnectionId}; use parking_lot::Mutex; -use std::{collections::HashMap, sync::Arc}; +use std::{ + collections::HashMap, + fmt::{Debug, Formatter}, + sync::Arc, +}; use uuid::Uuid; -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct GlobalData { inner: Arc>, } +impl Debug for GlobalData { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_str("[global data]") + } +} + impl Default for GlobalData { fn default() -> Self { Self { diff --git a/amqp_core/src/macros.rs b/amqp_core/src/macros.rs index 62a0356..2500ef9 100644 --- a/amqp_core/src/macros.rs +++ b/amqp_core/src/macros.rs @@ -14,7 +14,7 @@ macro_rules! newtype_id { impl ::std::fmt::Display for $name { fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { - self.0.fmt(f) + ::std::fmt::Display::fmt(&self.0, f) } } diff --git a/amqp_core/src/queue.rs b/amqp_core/src/queue.rs index 77f507f..a3a6202 100644 --- a/amqp_core/src/queue.rs +++ b/amqp_core/src/queue.rs @@ -7,10 +7,18 @@ use parking_lot::Mutex; use std::{ borrow::Borrow, collections::HashMap, + fmt::{Debug, Display, Formatter}, sync::{atomic::AtomicUsize, Arc}, }; +use tokio::sync::mpsc; -pub type Queue = Arc; +pub type Queue = Arc; + +#[derive(Debug)] +pub enum QueueEvent {} + +pub type QueueEventSender = mpsc::Sender; +pub type QueueEventReceiver = mpsc::Receiver; newtype_id!(pub QueueId); @@ -26,8 +34,14 @@ impl Borrow for QueueName { } } +impl Display for QueueName { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + Display::fmt(&self.0, f) + } +} + #[derive(Debug)] -pub struct RawQueue { +pub struct QueueInner { pub id: QueueId, pub name: QueueName, pub messages: Mutex>, // use a concurrent linked list??? @@ -38,6 +52,7 @@ pub struct RawQueue { /// If auto-delete is enabled, it keeps track of the consumer count. pub deletion: QueueDeletion, pub consumers: Mutex>, + pub event_send: QueueEventSender, } #[derive(Debug)] diff --git a/amqp_messaging/src/lib.rs b/amqp_messaging/src/lib.rs index f39dd79..426495f 100644 --- a/amqp_messaging/src/lib.rs +++ b/amqp_messaging/src/lib.rs @@ -3,5 +3,6 @@ use amqp_core::error::ProtocolError; pub mod methods; +mod queue; type Result = std::result::Result; diff --git a/amqp_messaging/src/methods/publish.rs b/amqp_messaging/src/methods/publish.rs index 79ec6a4..0efeb11 100644 --- a/amqp_messaging/src/methods/publish.rs +++ b/amqp_messaging/src/methods/publish.rs @@ -2,7 +2,7 @@ use crate::Result; use amqp_core::{ amqp_todo, connection::{Channel, ConnectionEvent}, - error::{ChannelException, ConException, ProtocolError}, + error::{ChannelException, ConException}, message::Message, methods::{BasicDeliver, Method}, }; diff --git a/amqp_messaging/src/methods/queue.rs b/amqp_messaging/src/methods/queue.rs index e0c03da..8cc9e4b 100644 --- a/amqp_messaging/src/methods/queue.rs +++ b/amqp_messaging/src/methods/queue.rs @@ -1,13 +1,15 @@ -use crate::Result; +use crate::{queue::QueueTask, Result}; use amqp_core::{ amqp_todo, connection::Channel, methods::{Method, QueueBind, QueueDeclare, QueueDeclareOk}, - queue::{QueueDeletion, QueueId, QueueName, RawQueue}, + queue::{QueueDeletion, QueueId, QueueInner, QueueName}, GlobalData, }; use parking_lot::Mutex; use std::sync::{atomic::AtomicUsize, Arc}; +use tokio::sync::mpsc; +use tracing::{info_span, Instrument}; pub fn declare(channel: Channel, queue_declare: QueueDeclare) -> Result { let QueueDeclare { @@ -33,37 +35,41 @@ pub fn declare(channel: Channel, queue_declare: QueueDeclare) -> Result amqp_todo!(); } - let global_data = { - let global_data = channel.global_data.clone(); + let global_data = channel.global_data.clone(); - let id = QueueId::random(); - let queue = Arc::new(RawQueue { - id, - name: queue_name.clone(), - messages: Mutex::default(), - durable, - exclusive: exclusive.then(|| channel.id), - deletion: if auto_delete { - QueueDeletion::Auto(AtomicUsize::default()) - } else { - QueueDeletion::Manual - }, - consumers: Mutex::default(), - }); + let (event_send, event_recv) = mpsc::channel(10); - { - let mut global_data_lock = global_data.lock(); + let id = QueueId::random(); + let queue = Arc::new(QueueInner { + id, + name: queue_name.clone(), + messages: Mutex::default(), + durable, + exclusive: exclusive.then(|| channel.id), + deletion: if auto_delete { + QueueDeletion::Auto(AtomicUsize::default()) + } else { + QueueDeletion::Manual + }, + consumers: Mutex::default(), + event_send, + }); - global_data_lock - .queues - .entry(queue_name.clone()) - .or_insert(queue); - } + { + let mut global_data_lock = global_data.lock(); - global_data - }; + global_data_lock + .queues + .entry(queue_name.clone()) + .or_insert_with(|| queue.clone()); + } - bind_queue(global_data, (), queue_name.clone().into_inner())?; + bind_queue(global_data.clone(), (), queue_name.clone().into_inner())?; + + let queue_task = QueueTask::new(global_data, event_recv, queue); + + let queue_worker_span = info_span!(parent: None, "queue-worker", %queue_name); + tokio::spawn(queue_task.start().instrument(queue_worker_span)); Ok(Method::QueueDeclareOk(QueueDeclareOk { queue: queue_name.to_string(), diff --git a/amqp_messaging/src/queue.rs b/amqp_messaging/src/queue.rs new file mode 100644 index 0000000..4740d2a --- /dev/null +++ b/amqp_messaging/src/queue.rs @@ -0,0 +1,43 @@ +use amqp_core::{ + queue::{Queue, QueueEventReceiver}, + GlobalData, +}; +use tracing::{debug, info}; + +#[derive(Debug)] +#[allow(dead_code)] +pub struct QueueTask { + global_data: GlobalData, + event_recv: QueueEventReceiver, + queue: Queue, +} + +impl QueueTask { + pub fn new(global_data: GlobalData, event_recv: QueueEventReceiver, queue: Queue) -> Self { + Self { + global_data, + event_recv, + queue, + } + } + + pub async fn start(mut self) { + info!("Started queue worker task"); + + loop { + let next_event = self.event_recv.recv().await; + + match next_event { + Some(event) => debug!(?event, "Received event"), + None => { + self.cleanup().await; + return; + } + } + } + } + + async fn cleanup(&mut self) { + // do stuff or something like that id whatever + } +} diff --git a/amqp_transport/src/lib.rs b/amqp_transport/src/lib.rs index 16c2a15..3533bb9 100644 --- a/amqp_transport/src/lib.rs +++ b/amqp_transport/src/lib.rs @@ -13,7 +13,7 @@ mod tests; use crate::connection::TransportConnection; use amqp_core::GlobalData; use tokio::net; -use tracing::{info, info_span, Instrument}; +use tracing::{info, trace_span, Instrument}; pub async fn do_thing_i_guess(global_data: GlobalData) -> anyhow::Result<()> { info!("Binding TCP listener..."); @@ -26,7 +26,7 @@ pub async fn do_thing_i_guess(global_data: GlobalData) -> anyhow::Result<()> { let id = rand::random(); info!(local_addr = ?stream.local_addr(), %id, "Accepted new connection"); - let span = info_span!("client-connection", %id); + let span = trace_span!("client-connection", %id); let (method_send, method_recv) = tokio::sync::mpsc::channel(10);