queue worker task

This commit is contained in:
nora 2022-03-05 18:10:04 +01:00
parent f860714b2b
commit 800ccae604
8 changed files with 111 additions and 36 deletions

View file

@ -14,14 +14,24 @@ use crate::{
};
use connection::{ChannelId, ConnectionId};
use parking_lot::Mutex;
use std::{collections::HashMap, sync::Arc};
use std::{
collections::HashMap,
fmt::{Debug, Formatter},
sync::Arc,
};
use uuid::Uuid;
#[derive(Debug, Clone)]
#[derive(Clone)]
pub struct GlobalData {
inner: Arc<Mutex<GlobalDataInner>>,
}
impl Debug for GlobalData {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str("[global data]")
}
}
impl Default for GlobalData {
fn default() -> Self {
Self {

View file

@ -14,7 +14,7 @@ macro_rules! newtype_id {
impl ::std::fmt::Display for $name {
fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
self.0.fmt(f)
::std::fmt::Display::fmt(&self.0, f)
}
}

View file

@ -7,10 +7,18 @@ use parking_lot::Mutex;
use std::{
borrow::Borrow,
collections::HashMap,
fmt::{Debug, Display, Formatter},
sync::{atomic::AtomicUsize, Arc},
};
use tokio::sync::mpsc;
pub type Queue = Arc<RawQueue>;
pub type Queue = Arc<QueueInner>;
#[derive(Debug)]
pub enum QueueEvent {}
pub type QueueEventSender = mpsc::Sender<QueueEvent>;
pub type QueueEventReceiver = mpsc::Receiver<QueueEvent>;
newtype_id!(pub QueueId);
@ -26,8 +34,14 @@ impl Borrow<str> for QueueName {
}
}
impl Display for QueueName {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
Display::fmt(&self.0, f)
}
}
#[derive(Debug)]
pub struct RawQueue {
pub struct QueueInner {
pub id: QueueId,
pub name: QueueName,
pub messages: Mutex<Vec<Message>>, // use a concurrent linked list???
@ -38,6 +52,7 @@ pub struct RawQueue {
/// If auto-delete is enabled, it keeps track of the consumer count.
pub deletion: QueueDeletion,
pub consumers: Mutex<HashMap<ConsumerId, Consumer>>,
pub event_send: QueueEventSender,
}
#[derive(Debug)]

View file

@ -3,5 +3,6 @@
use amqp_core::error::ProtocolError;
pub mod methods;
mod queue;
type Result<T> = std::result::Result<T, ProtocolError>;

View file

@ -2,7 +2,7 @@ use crate::Result;
use amqp_core::{
amqp_todo,
connection::{Channel, ConnectionEvent},
error::{ChannelException, ConException, ProtocolError},
error::{ChannelException, ConException},
message::Message,
methods::{BasicDeliver, Method},
};

View file

@ -1,13 +1,15 @@
use crate::Result;
use crate::{queue::QueueTask, Result};
use amqp_core::{
amqp_todo,
connection::Channel,
methods::{Method, QueueBind, QueueDeclare, QueueDeclareOk},
queue::{QueueDeletion, QueueId, QueueName, RawQueue},
queue::{QueueDeletion, QueueId, QueueInner, QueueName},
GlobalData,
};
use parking_lot::Mutex;
use std::sync::{atomic::AtomicUsize, Arc};
use tokio::sync::mpsc;
use tracing::{info_span, Instrument};
pub fn declare(channel: Channel, queue_declare: QueueDeclare) -> Result<Method> {
let QueueDeclare {
@ -33,37 +35,41 @@ pub fn declare(channel: Channel, queue_declare: QueueDeclare) -> Result<Method>
amqp_todo!();
}
let global_data = {
let global_data = channel.global_data.clone();
let global_data = channel.global_data.clone();
let id = QueueId::random();
let queue = Arc::new(RawQueue {
id,
name: queue_name.clone(),
messages: Mutex::default(),
durable,
exclusive: exclusive.then(|| channel.id),
deletion: if auto_delete {
QueueDeletion::Auto(AtomicUsize::default())
} else {
QueueDeletion::Manual
},
consumers: Mutex::default(),
});
let (event_send, event_recv) = mpsc::channel(10);
{
let mut global_data_lock = global_data.lock();
let id = QueueId::random();
let queue = Arc::new(QueueInner {
id,
name: queue_name.clone(),
messages: Mutex::default(),
durable,
exclusive: exclusive.then(|| channel.id),
deletion: if auto_delete {
QueueDeletion::Auto(AtomicUsize::default())
} else {
QueueDeletion::Manual
},
consumers: Mutex::default(),
event_send,
});
global_data_lock
.queues
.entry(queue_name.clone())
.or_insert(queue);
}
{
let mut global_data_lock = global_data.lock();
global_data
};
global_data_lock
.queues
.entry(queue_name.clone())
.or_insert_with(|| queue.clone());
}
bind_queue(global_data, (), queue_name.clone().into_inner())?;
bind_queue(global_data.clone(), (), queue_name.clone().into_inner())?;
let queue_task = QueueTask::new(global_data, event_recv, queue);
let queue_worker_span = info_span!(parent: None, "queue-worker", %queue_name);
tokio::spawn(queue_task.start().instrument(queue_worker_span));
Ok(Method::QueueDeclareOk(QueueDeclareOk {
queue: queue_name.to_string(),

View file

@ -0,0 +1,43 @@
use amqp_core::{
queue::{Queue, QueueEventReceiver},
GlobalData,
};
use tracing::{debug, info};
#[derive(Debug)]
#[allow(dead_code)]
pub struct QueueTask {
global_data: GlobalData,
event_recv: QueueEventReceiver,
queue: Queue,
}
impl QueueTask {
pub fn new(global_data: GlobalData, event_recv: QueueEventReceiver, queue: Queue) -> Self {
Self {
global_data,
event_recv,
queue,
}
}
pub async fn start(mut self) {
info!("Started queue worker task");
loop {
let next_event = self.event_recv.recv().await;
match next_event {
Some(event) => debug!(?event, "Received event"),
None => {
self.cleanup().await;
return;
}
}
}
}
async fn cleanup(&mut self) {
// do stuff or something like that id whatever
}
}

View file

@ -13,7 +13,7 @@ mod tests;
use crate::connection::TransportConnection;
use amqp_core::GlobalData;
use tokio::net;
use tracing::{info, info_span, Instrument};
use tracing::{info, trace_span, Instrument};
pub async fn do_thing_i_guess(global_data: GlobalData) -> anyhow::Result<()> {
info!("Binding TCP listener...");
@ -26,7 +26,7 @@ pub async fn do_thing_i_guess(global_data: GlobalData) -> anyhow::Result<()> {
let id = rand::random();
info!(local_addr = ?stream.local_addr(), %id, "Accepted new connection");
let span = info_span!("client-connection", %id);
let span = trace_span!("client-connection", %id);
let (method_send, method_recv) = tokio::sync::mpsc::channel(10);