From f860714b2ba6de4adab10f4ae2664983d155f104 Mon Sep 17 00:00:00 2001 From: Nilstrieb <48135649+Nilstrieb@users.noreply.github.com> Date: Sat, 5 Mar 2022 17:12:27 +0100 Subject: [PATCH] more --- amqp_core/src/connection.rs | 12 ++++++++++-- amqp_core/src/consumer.rs | 5 +++-- amqp_core/src/queue.rs | 9 +++++++-- amqp_messaging/src/methods/consume.rs | 17 ++++++++++------- amqp_messaging/src/methods/publish.rs | 6 +++--- amqp_messaging/src/methods/queue.rs | 6 +++++- amqp_transport/src/frame.rs | 11 +++++++++-- test-js/src/consume-message.js | 5 +++++ test-js/test-all.js | 17 ++++++++++++----- 9 files changed, 64 insertions(+), 24 deletions(-) diff --git a/amqp_core/src/connection.rs b/amqp_core/src/connection.rs index 4055724..e0116bd 100644 --- a/amqp_core/src/connection.rs +++ b/amqp_core/src/connection.rs @@ -1,4 +1,4 @@ -use crate::{methods, methods::Method, newtype_id, GlobalData, Queue}; +use crate::{consumer::Consumer, methods, methods::Method, newtype_id, GlobalData, Queue}; use bytes::Bytes; use parking_lot::Mutex; use smallvec::SmallVec; @@ -54,6 +54,7 @@ pub struct ConnectionInner { pub channels: Mutex>, pub exclusive_queues: Vec, pub event_sender: ConEventSender, + pub consuming: Mutex>, } #[derive(Debug)] @@ -78,15 +79,22 @@ impl ConnectionInner { id, peer_addr, global_data, - channels: Mutex::new(HashMap::new()), + channels: Mutex::default(), exclusive_queues: vec![], event_sender, + consuming: Mutex::default(), }) } pub fn close(&self) { + // todo: make a better system that prevents all leaks + let mut global_data = self.global_data.lock(); global_data.connections.remove(&self.id); + self.consuming + .lock() + .iter() + .for_each(|consumer| drop(consumer.queue.consumers.lock().remove(&consumer.id))); } } diff --git a/amqp_core/src/consumer.rs b/amqp_core/src/consumer.rs index f729b20..1e645ca 100644 --- a/amqp_core/src/consumer.rs +++ b/amqp_core/src/consumer.rs @@ -1,12 +1,13 @@ -use crate::{newtype_id, Channel}; +use crate::{newtype_id, Channel, Queue}; newtype_id!( pub ConsumerId ); -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Consumer { pub id: ConsumerId, pub tag: String, pub channel: Channel, + pub queue: Queue, } diff --git a/amqp_core/src/queue.rs b/amqp_core/src/queue.rs index 31d0b1c..77f507f 100644 --- a/amqp_core/src/queue.rs +++ b/amqp_core/src/queue.rs @@ -1,7 +1,12 @@ -use crate::{consumer::Consumer, message::Message, newtype, newtype_id, ChannelId}; +use crate::{ + consumer::{Consumer, ConsumerId}, + message::Message, + newtype, newtype_id, ChannelId, +}; use parking_lot::Mutex; use std::{ borrow::Borrow, + collections::HashMap, sync::{atomic::AtomicUsize, Arc}, }; @@ -32,7 +37,7 @@ pub struct RawQueue { /// The queue can always be manually deleted. /// If auto-delete is enabled, it keeps track of the consumer count. pub deletion: QueueDeletion, - pub consumers: Mutex>, + pub consumers: Mutex>, } #[derive(Debug)] diff --git a/amqp_messaging/src/methods/consume.rs b/amqp_messaging/src/methods/consume.rs index 7dc0764..0753b49 100644 --- a/amqp_messaging/src/methods/consume.rs +++ b/amqp_messaging/src/methods/consume.rs @@ -34,18 +34,21 @@ pub fn consume(channel: Channel, basic_consume: BasicConsume) -> Result let mut global_data = global_data.lock(); - let consumer = Consumer { - id: ConsumerId::random(), - tag: consumer_tag.clone(), - channel: Arc::clone(&channel), - }; - let queue = global_data .queues .get_mut(queue_name.as_str()) .ok_or(ChannelException::NotFound)?; - queue.consumers.lock().push(consumer); + let consumer = Consumer { + id: ConsumerId::random(), + tag: consumer_tag.clone(), + channel: Arc::clone(&channel), + queue: Arc::clone(queue), + }; + + queue.consumers.lock().insert(consumer.id, consumer.clone()); + + channel.connection.consuming.lock().push(consumer); info!(%queue_name, %consumer_tag, "Consumer started consuming"); diff --git a/amqp_messaging/src/methods/publish.rs b/amqp_messaging/src/methods/publish.rs index a0ca73f..79ec6a4 100644 --- a/amqp_messaging/src/methods/publish.rs +++ b/amqp_messaging/src/methods/publish.rs @@ -2,7 +2,7 @@ use crate::Result; use amqp_core::{ amqp_todo, connection::{Channel, ConnectionEvent}, - error::ChannelException, + error::{ChannelException, ConException, ProtocolError}, message::Message, methods::{BasicDeliver, Method}, }; @@ -30,7 +30,7 @@ pub async fn publish(channel_handle: Channel, message: Message) -> Result<()> { // todo: we just send it to the consumer directly and ignore it if the consumer doesn't exist // consuming is hard, but this should work *for now* let consumers = queue.consumers.lock(); - if let Some(consumer) = consumers.first() { + if let Some(consumer) = consumers.values().next() { let method = Box::new(Method::BasicDeliver(BasicDeliver { consumer_tag: consumer.tag.clone(), delivery_tag: 0, @@ -48,7 +48,7 @@ pub async fn publish(channel_handle: Channel, message: Message) -> Result<()> { message.header.clone(), message.content.clone(), )) - .unwrap(); + .map_err(|_| ConException::InternalError)?; } } diff --git a/amqp_messaging/src/methods/queue.rs b/amqp_messaging/src/methods/queue.rs index 075101e..e0c03da 100644 --- a/amqp_messaging/src/methods/queue.rs +++ b/amqp_messaging/src/methods/queue.rs @@ -53,7 +53,11 @@ pub fn declare(channel: Channel, queue_declare: QueueDeclare) -> Result { let mut global_data_lock = global_data.lock(); - global_data_lock.queues.insert(queue_name.clone(), queue); + + global_data_lock + .queues + .entry(queue_name.clone()) + .or_insert(queue); } global_data diff --git a/amqp_transport/src/frame.rs b/amqp_transport/src/frame.rs index 4df5c29..f6dd441 100644 --- a/amqp_transport/src/frame.rs +++ b/amqp_transport/src/frame.rs @@ -144,8 +144,9 @@ mod content_header_write { Table, }, }; + use std::io::Write; - pub fn write_content_header(buf: &mut Vec, header: &ContentHeader) -> Result<()> { + pub fn write_content_header(buf: &mut W, header: &ContentHeader) -> Result<()> { short(&header.class_id, buf)?; short(&header.weight, buf)?; longlong(&header.body_size, buf)?; @@ -153,8 +154,12 @@ mod content_header_write { write_content_header_props(buf, &header.property_fields) } - pub fn write_content_header_props(buf: &mut Vec, header: &Table) -> Result<()> { + pub fn write_content_header_props(writer: &mut W, header: &Table) -> Result<()> { let mut flags = 0_u16; + // todo: don't allocate for no reason here + let mut temp_buf = Vec::new(); + let buf = &mut temp_buf; + buf.extend_from_slice(&flags.to_be_bytes()); // placeholder if let Some(ShortString(value)) = header.get("content-type") { @@ -218,6 +223,8 @@ mod content_header_write { buf[0] = a; buf[1] = b; + writer.write_all(&temp_buf)?; + Ok(()) } } diff --git a/test-js/src/consume-message.js b/test-js/src/consume-message.js index 03bf749..d53ebb1 100644 --- a/test-js/src/consume-message.js +++ b/test-js/src/consume-message.js @@ -21,3 +21,8 @@ await channel.sendToQueue('consume-queue-1415', Buffer.from('STOP')); console.log('Sent STOP message to queue'); await consumePromise; + +console.log('Received STOP!'); + +await channel.close(); +await connection.close(); diff --git a/test-js/test-all.js b/test-js/test-all.js index 1e7343e..5ab593a 100644 --- a/test-js/test-all.js +++ b/test-js/test-all.js @@ -26,12 +26,19 @@ function maybeDone() { for (const success of successes) { console.log(`✔️ Test ${success} successful`); } - for (const { name, stderr } of failures) { + for (const { name, stdout, stderr } of failures) { console.log( - `------------------- stderr test ${name} -------------------` + `------------------- start stdout test ${name} -------------------` + ); + console.log(stdout); + console.log( + `-------------------- end stdout test ${name} --------------------` + ); + console.log( + `------------------- start stderr test ${name} -------------------` ); console.log(stderr); - console.log(`------------------- stderr test ${name} ------------------- + console.log(`-------------------- end stderr test ${name} -------------------- ❌ Test ${name} failed`); } @@ -42,11 +49,11 @@ function maybeDone() { } function runTest(path, name) { - childProcess.exec(`node ${path}`, {}, (error, _, stderr) => { + childProcess.exec(`node ${path}`, {}, (error, stdout, stderr) => { if (!error) { successes.push(name); } else { - failures.push({ name, stderr }); + failures.push({ name, stdout, stderr }); } done += 1; maybeDone();