From 770762b920bf2ca782bd747ef3f2e617bf4cbd18 Mon Sep 17 00:00:00 2001 From: Nilstrieb <48135649+Nilstrieb@users.noreply.github.com> Date: Mon, 7 Mar 2022 16:33:52 +0100 Subject: [PATCH] queuing things --- Cargo.lock | 6 ++ Cargo.toml | 10 +++- amqp_core/Cargo.toml | 1 + amqp_core/src/queue.rs | 2 +- .../frontend/src/components/data-page.tsx | 3 +- amqp_dashboard/frontend/src/types.ts | 1 + amqp_dashboard/src/lib.rs | 2 + amqp_datastructure/Cargo.toml | 8 +++ amqp_datastructure/src/lib.rs | 3 + amqp_datastructure/src/message_queue.rs | 57 +++++++++++++++++++ amqp_messaging/Cargo.toml | 1 + amqp_messaging/src/methods/mod.rs | 9 +-- amqp_messaging/src/methods/queue.rs | 2 +- amqp_messaging/src/queue_worker.rs | 4 +- amqp_transport/src/connection.rs | 2 +- test-js/src/send-single-message.js | 4 +- 16 files changed, 102 insertions(+), 13 deletions(-) create mode 100644 amqp_datastructure/Cargo.toml create mode 100644 amqp_datastructure/src/lib.rs create mode 100644 amqp_datastructure/src/message_queue.rs diff --git a/Cargo.lock b/Cargo.lock index b027b17..b996d31 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -36,6 +36,7 @@ dependencies = [ name = "amqp_core" version = "0.1.0" dependencies = [ + "amqp_datastructure", "bytes", "parking_lot", "rand", @@ -62,11 +63,16 @@ dependencies = [ "zip", ] +[[package]] +name = "amqp_datastructure" +version = "0.1.0" + [[package]] name = "amqp_messaging" version = "0.1.0" dependencies = [ "amqp_core", + "amqp_datastructure", "parking_lot", "tokio", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 020fe92..a98d56b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,13 @@ [workspace] -members = [".", "amqp_core", "amqp_dashboard", "amqp_messaging", "amqp_transport","xtask"] +members = [ + ".", + "amqp_core", + "amqp_dashboard", + "amqp_datastructure", + "amqp_messaging", + "amqp_transport", + "xtask", +] [package] name = "amqp" diff --git a/amqp_core/Cargo.toml b/amqp_core/Cargo.toml index caff284..772bd12 100644 --- a/amqp_core/Cargo.toml +++ b/amqp_core/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +amqp_datastructure = { path = "../amqp_datastructure" } bytes = "1.1.0" parking_lot = "0.12.0" rand = "0.8.5" diff --git a/amqp_core/src/queue.rs b/amqp_core/src/queue.rs index 24ecb52..62a7eac 100644 --- a/amqp_core/src/queue.rs +++ b/amqp_core/src/queue.rs @@ -47,7 +47,7 @@ impl Display for QueueName { pub struct QueueInner { pub id: QueueId, pub name: QueueName, - pub messages: Mutex>, // use a concurrent linked list??? + pub messages: amqp_datastructure::MessageQueue, pub durable: bool, pub exclusive: Option, /// Whether the queue will automatically be deleted when no consumers uses it anymore. diff --git a/amqp_dashboard/frontend/src/components/data-page.tsx b/amqp_dashboard/frontend/src/components/data-page.tsx index 6d830da..e98fb2e 100644 --- a/amqp_dashboard/frontend/src/components/data-page.tsx +++ b/amqp_dashboard/frontend/src/components/data-page.tsx @@ -46,11 +46,12 @@ const DataPage: FC = ({ prefix }) => {

Queues

{data ? ( [ queue.id, queue.name, queue.durable ? 'Yes' : 'No', + queue.messages, ])} /> ) : ( diff --git a/amqp_dashboard/frontend/src/types.ts b/amqp_dashboard/frontend/src/types.ts index b052d40..7c59fbc 100644 --- a/amqp_dashboard/frontend/src/types.ts +++ b/amqp_dashboard/frontend/src/types.ts @@ -13,6 +13,7 @@ export type Queue = { id: string; name: string; durable: boolean; + messages: number; }; export type Data = { diff --git a/amqp_dashboard/src/lib.rs b/amqp_dashboard/src/lib.rs index 600a8c9..f71cc6c 100644 --- a/amqp_dashboard/src/lib.rs +++ b/amqp_dashboard/src/lib.rs @@ -76,6 +76,7 @@ struct Queue { id: String, name: String, durable: bool, + messages: usize, } async fn get_data(global_data: GlobalData) -> impl IntoResponse { @@ -106,6 +107,7 @@ async fn get_data(global_data: GlobalData) -> impl IntoResponse { id: queue.id.to_string(), name: queue.name.to_string(), durable: queue.durable, + messages: queue.messages.len(), }) .collect(); diff --git a/amqp_datastructure/Cargo.toml b/amqp_datastructure/Cargo.toml new file mode 100644 index 0000000..0cd3b52 --- /dev/null +++ b/amqp_datastructure/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "amqp_datastructure" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] diff --git a/amqp_datastructure/src/lib.rs b/amqp_datastructure/src/lib.rs new file mode 100644 index 0000000..f6171dd --- /dev/null +++ b/amqp_datastructure/src/lib.rs @@ -0,0 +1,3 @@ +mod message_queue; + +pub use message_queue::MessageQueue; diff --git a/amqp_datastructure/src/message_queue.rs b/amqp_datastructure/src/message_queue.rs new file mode 100644 index 0000000..a052dfe --- /dev/null +++ b/amqp_datastructure/src/message_queue.rs @@ -0,0 +1,57 @@ +// using std::sync::Mutex because it's only temporary anyways +use std::{ + collections::VecDeque, + fmt::{Debug, Formatter}, + sync::Mutex, +}; + +/// The data structure behind the message queue. +/// +/// Needs to support: +/// * concurrent access +/// * priority +/// +/// Currently supports +/// * mutex lol +// todo: see above +pub struct MessageQueue { + deque: Mutex>, +} + +impl MessageQueue { + pub fn new() -> Self { + Self { + deque: Mutex::default(), + } + } + + pub fn append(&self, message: T) { + let mut lock = self.deque.lock().unwrap(); + lock.push_back(message); + } + + pub fn try_get(&self) -> Option { + let mut lock = self.deque.lock().unwrap(); + lock.pop_front() + } + + pub fn len(&self) -> usize { + self.deque.lock().unwrap().len() + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } +} + +impl Default for MessageQueue { + fn default() -> Self { + Self::new() + } +} + +impl Debug for MessageQueue { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MessageQueue").finish_non_exhaustive() + } +} diff --git a/amqp_messaging/Cargo.toml b/amqp_messaging/Cargo.toml index d0f8199..ee871e3 100644 --- a/amqp_messaging/Cargo.toml +++ b/amqp_messaging/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" [dependencies] amqp_core = { path = "../amqp_core" } +amqp_datastructure = { path = "../amqp_datastructure" } parking_lot = "0.12.0" tracing = "0.1.31" tokio = { version = "1.17.0", features = ["full"] } diff --git a/amqp_messaging/src/methods/mod.rs b/amqp_messaging/src/methods/mod.rs index 008c04f..158ee5a 100644 --- a/amqp_messaging/src/methods/mod.rs +++ b/amqp_messaging/src/methods/mod.rs @@ -4,13 +4,10 @@ mod queue; use crate::Result; use amqp_core::{amqp_todo, connection::Channel, message::Message, methods::Method}; -use tracing::{error, info}; +use tracing::info; -pub fn handle_basic_publish(channel_handle: Channel, message: Message) { - match publish::publish(channel_handle, message) { - Ok(()) => {} - Err(err) => error!(%err, "publish error occurred"), - } +pub fn handle_basic_publish(channel_handle: Channel, message: Message) -> Result<()> { + publish::publish(channel_handle, message) } pub async fn handle_method(channel_handle: Channel, method: Method) -> Result { diff --git a/amqp_messaging/src/methods/queue.rs b/amqp_messaging/src/methods/queue.rs index 5744bdd..6247725 100644 --- a/amqp_messaging/src/methods/queue.rs +++ b/amqp_messaging/src/methods/queue.rs @@ -42,7 +42,7 @@ pub fn declare(channel: Channel, queue_declare: QueueDeclare) -> Result let queue = Arc::new(QueueInner { id, name: queue_name.clone(), - messages: Mutex::default(), + messages: amqp_datastructure::MessageQueue::new(), durable, exclusive: exclusive.then(|| channel.id), deletion: if auto_delete { diff --git a/amqp_messaging/src/queue_worker.rs b/amqp_messaging/src/queue_worker.rs index 7a919c0..d76f463 100644 --- a/amqp_messaging/src/queue_worker.rs +++ b/amqp_messaging/src/queue_worker.rs @@ -94,7 +94,9 @@ impl QueueTask { } #[tracing::instrument(skip(self), fields(name = self.show_name()), level = "trace")] - async fn queue_message(&mut self, _message: Message) {} + async fn queue_message(&mut self, message: Message) { + self.queue.messages.append(message); + } async fn cleanup(&mut self) { // do stuff or something like that id whatever diff --git a/amqp_transport/src/connection.rs b/amqp_transport/src/connection.rs index 72a5621..f35fc63 100644 --- a/amqp_transport/src/connection.rs +++ b/amqp_transport/src/connection.rs @@ -498,7 +498,7 @@ impl TransportConnection { let channel = self.channels.get(&channel).ok_or(ConException::Todo)?; - amqp_messaging::methods::handle_basic_publish(channel.global_chan.clone(), message); + amqp_messaging::methods::handle_basic_publish(channel.global_chan.clone(), message)?; Ok(()) } else { Err(ConException::Todo.into()) diff --git a/test-js/src/send-single-message.js b/test-js/src/send-single-message.js index fb6f2cb..3799e2e 100644 --- a/test-js/src/send-single-message.js +++ b/test-js/src/send-single-message.js @@ -3,7 +3,9 @@ import { connectAmqp } from './utils/utils.js'; const connection = await connectAmqp(); const channel = await connection.createChannel(); -channel.publish('exchange-1', 'queue-1', Buffer.from('hello')); +await channel.assertQueue('send-queue-352'); + +channel.publish('', 'send-queue-352', Buffer.from('hello')); console.log('Published message');