From d90d61504cc80b1c09a1310948d4150e21563d21 Mon Sep 17 00:00:00 2001 From: Nilstrieb <48135649+Nilstrieb@users.noreply.github.com> Date: Sun, 27 Feb 2022 18:41:14 +0100 Subject: [PATCH] even more queuing around --- amqp_dashboard/assets/index.html | 2 ++ amqp_dashboard/assets/script.js | 13 +++++++++++++ amqp_messaging/src/methods/consume.rs | 4 +++- amqp_messaging/src/methods/queue.rs | 10 +++++----- test-js/src/declare-queue.js | 2 +- 5 files changed, 24 insertions(+), 7 deletions(-) diff --git a/amqp_dashboard/assets/index.html b/amqp_dashboard/assets/index.html index 1b3a744..891ea73 100644 --- a/amqp_dashboard/assets/index.html +++ b/amqp_dashboard/assets/index.html @@ -14,6 +14,8 @@

AMQP Data

Connections

+

Queues

+
diff --git a/amqp_dashboard/assets/script.js b/amqp_dashboard/assets/script.js index 40ce29b..7b96d32 100644 --- a/amqp_dashboard/assets/script.js +++ b/amqp_dashboard/assets/script.js @@ -39,10 +39,23 @@ const renderConnections = (connections) => { wrapper.replaceChildren(table); }; +const renderQueues = (queues) => { + const wrapper = document.getElementById('queue-wrapper'); + + const table = renderTable( + ['Queue ID', 'Name', 'Durable'], + queues.map((queue) => { + return [queue.id, queue.name, queue.durable ? 'Yes' : 'No']; + }) + ); + wrapper.replaceChildren(table); +}; + const refresh = async () => { const fetched = await fetch('api/data'); const data = await fetched.json(); renderConnections(data.connections); + renderQueues(data.queues); }; setInterval(refresh, 1000); diff --git a/amqp_messaging/src/methods/consume.rs b/amqp_messaging/src/methods/consume.rs index b7fb1bc..79b4eaf 100644 --- a/amqp_messaging/src/methods/consume.rs +++ b/amqp_messaging/src/methods/consume.rs @@ -4,8 +4,10 @@ use amqp_core::error::ProtocolError; use amqp_core::methods::{BasicConsume, Method}; pub async fn consume( - _channel_handle: ChannelHandle, + channel_handle: ChannelHandle, _basic_consume: BasicConsume, ) -> Result { + let _channel = channel_handle.lock(); + amqp_todo!() } diff --git a/amqp_messaging/src/methods/queue.rs b/amqp_messaging/src/methods/queue.rs index 903a330..9b75a36 100644 --- a/amqp_messaging/src/methods/queue.rs +++ b/amqp_messaging/src/methods/queue.rs @@ -25,17 +25,17 @@ pub async fn declare( let queue_name = QueueName::new(queue_name.into()); if !arguments.is_empty() { - return Err(ConException::Todo.into()); + amqp_todo!(); + } + + if passive || no_wait || durable { + amqp_todo!(); } let global_data = { let channel = channel_handle.lock(); let global_data = channel.global_data.clone(); - if passive || no_wait { - amqp_todo!(); - } - let id = QueueId::random(); let queue = Arc::new(RawQueue { id, diff --git a/test-js/src/declare-queue.js b/test-js/src/declare-queue.js index 9a5f979..b81d670 100644 --- a/test-js/src/declare-queue.js +++ b/test-js/src/declare-queue.js @@ -6,7 +6,7 @@ const connection = await connectAmqp(); const channel = await connection.createChannel(); -const reply = await channel.assertQueue(queueName, { durable: true }); +const reply = await channel.assertQueue(queueName, { durable: false }); assert(reply.messageCount === 0, 'Message found in queue'); assert(reply.consumerCount === 0, 'Consumer listening on queue');