This commit is contained in:
nora 2022-03-05 17:12:27 +01:00
parent 08ba799d23
commit f860714b2b
9 changed files with 64 additions and 24 deletions

View file

@ -1,4 +1,4 @@
use crate::{methods, methods::Method, newtype_id, GlobalData, Queue}; use crate::{consumer::Consumer, methods, methods::Method, newtype_id, GlobalData, Queue};
use bytes::Bytes; use bytes::Bytes;
use parking_lot::Mutex; use parking_lot::Mutex;
use smallvec::SmallVec; use smallvec::SmallVec;
@ -54,6 +54,7 @@ pub struct ConnectionInner {
pub channels: Mutex<HashMap<ChannelNum, Channel>>, pub channels: Mutex<HashMap<ChannelNum, Channel>>,
pub exclusive_queues: Vec<Queue>, pub exclusive_queues: Vec<Queue>,
pub event_sender: ConEventSender, pub event_sender: ConEventSender,
pub consuming: Mutex<Vec<Consumer>>,
} }
#[derive(Debug)] #[derive(Debug)]
@ -78,15 +79,22 @@ impl ConnectionInner {
id, id,
peer_addr, peer_addr,
global_data, global_data,
channels: Mutex::new(HashMap::new()), channels: Mutex::default(),
exclusive_queues: vec![], exclusive_queues: vec![],
event_sender, event_sender,
consuming: Mutex::default(),
}) })
} }
pub fn close(&self) { pub fn close(&self) {
// todo: make a better system that prevents all leaks
let mut global_data = self.global_data.lock(); let mut global_data = self.global_data.lock();
global_data.connections.remove(&self.id); global_data.connections.remove(&self.id);
self.consuming
.lock()
.iter()
.for_each(|consumer| drop(consumer.queue.consumers.lock().remove(&consumer.id)));
} }
} }

View file

@ -1,12 +1,13 @@
use crate::{newtype_id, Channel}; use crate::{newtype_id, Channel, Queue};
newtype_id!( newtype_id!(
pub ConsumerId pub ConsumerId
); );
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct Consumer { pub struct Consumer {
pub id: ConsumerId, pub id: ConsumerId,
pub tag: String, pub tag: String,
pub channel: Channel, pub channel: Channel,
pub queue: Queue,
} }

View file

@ -1,7 +1,12 @@
use crate::{consumer::Consumer, message::Message, newtype, newtype_id, ChannelId}; use crate::{
consumer::{Consumer, ConsumerId},
message::Message,
newtype, newtype_id, ChannelId,
};
use parking_lot::Mutex; use parking_lot::Mutex;
use std::{ use std::{
borrow::Borrow, borrow::Borrow,
collections::HashMap,
sync::{atomic::AtomicUsize, Arc}, sync::{atomic::AtomicUsize, Arc},
}; };
@ -32,7 +37,7 @@ pub struct RawQueue {
/// The queue can always be manually deleted. /// The queue can always be manually deleted.
/// If auto-delete is enabled, it keeps track of the consumer count. /// If auto-delete is enabled, it keeps track of the consumer count.
pub deletion: QueueDeletion, pub deletion: QueueDeletion,
pub consumers: Mutex<Vec<Consumer>>, pub consumers: Mutex<HashMap<ConsumerId, Consumer>>,
} }
#[derive(Debug)] #[derive(Debug)]

View file

@ -34,18 +34,21 @@ pub fn consume(channel: Channel, basic_consume: BasicConsume) -> Result<Method>
let mut global_data = global_data.lock(); let mut global_data = global_data.lock();
let consumer = Consumer {
id: ConsumerId::random(),
tag: consumer_tag.clone(),
channel: Arc::clone(&channel),
};
let queue = global_data let queue = global_data
.queues .queues
.get_mut(queue_name.as_str()) .get_mut(queue_name.as_str())
.ok_or(ChannelException::NotFound)?; .ok_or(ChannelException::NotFound)?;
queue.consumers.lock().push(consumer); let consumer = Consumer {
id: ConsumerId::random(),
tag: consumer_tag.clone(),
channel: Arc::clone(&channel),
queue: Arc::clone(queue),
};
queue.consumers.lock().insert(consumer.id, consumer.clone());
channel.connection.consuming.lock().push(consumer);
info!(%queue_name, %consumer_tag, "Consumer started consuming"); info!(%queue_name, %consumer_tag, "Consumer started consuming");

View file

@ -2,7 +2,7 @@ use crate::Result;
use amqp_core::{ use amqp_core::{
amqp_todo, amqp_todo,
connection::{Channel, ConnectionEvent}, connection::{Channel, ConnectionEvent},
error::ChannelException, error::{ChannelException, ConException, ProtocolError},
message::Message, message::Message,
methods::{BasicDeliver, Method}, methods::{BasicDeliver, Method},
}; };
@ -30,7 +30,7 @@ pub async fn publish(channel_handle: Channel, message: Message) -> Result<()> {
// todo: we just send it to the consumer directly and ignore it if the consumer doesn't exist // todo: we just send it to the consumer directly and ignore it if the consumer doesn't exist
// consuming is hard, but this should work *for now* // consuming is hard, but this should work *for now*
let consumers = queue.consumers.lock(); let consumers = queue.consumers.lock();
if let Some(consumer) = consumers.first() { if let Some(consumer) = consumers.values().next() {
let method = Box::new(Method::BasicDeliver(BasicDeliver { let method = Box::new(Method::BasicDeliver(BasicDeliver {
consumer_tag: consumer.tag.clone(), consumer_tag: consumer.tag.clone(),
delivery_tag: 0, delivery_tag: 0,
@ -48,7 +48,7 @@ pub async fn publish(channel_handle: Channel, message: Message) -> Result<()> {
message.header.clone(), message.header.clone(),
message.content.clone(), message.content.clone(),
)) ))
.unwrap(); .map_err(|_| ConException::InternalError)?;
} }
} }

View file

@ -53,7 +53,11 @@ pub fn declare(channel: Channel, queue_declare: QueueDeclare) -> Result<Method>
{ {
let mut global_data_lock = global_data.lock(); let mut global_data_lock = global_data.lock();
global_data_lock.queues.insert(queue_name.clone(), queue);
global_data_lock
.queues
.entry(queue_name.clone())
.or_insert(queue);
} }
global_data global_data

View file

@ -144,8 +144,9 @@ mod content_header_write {
Table, Table,
}, },
}; };
use std::io::Write;
pub fn write_content_header(buf: &mut Vec<u8>, header: &ContentHeader) -> Result<()> { pub fn write_content_header<W: Write>(buf: &mut W, header: &ContentHeader) -> Result<()> {
short(&header.class_id, buf)?; short(&header.class_id, buf)?;
short(&header.weight, buf)?; short(&header.weight, buf)?;
longlong(&header.body_size, buf)?; longlong(&header.body_size, buf)?;
@ -153,8 +154,12 @@ mod content_header_write {
write_content_header_props(buf, &header.property_fields) write_content_header_props(buf, &header.property_fields)
} }
pub fn write_content_header_props(buf: &mut Vec<u8>, header: &Table) -> Result<()> { pub fn write_content_header_props<W: Write>(writer: &mut W, header: &Table) -> Result<()> {
let mut flags = 0_u16; let mut flags = 0_u16;
// todo: don't allocate for no reason here
let mut temp_buf = Vec::new();
let buf = &mut temp_buf;
buf.extend_from_slice(&flags.to_be_bytes()); // placeholder buf.extend_from_slice(&flags.to_be_bytes()); // placeholder
if let Some(ShortString(value)) = header.get("content-type") { if let Some(ShortString(value)) = header.get("content-type") {
@ -218,6 +223,8 @@ mod content_header_write {
buf[0] = a; buf[0] = a;
buf[1] = b; buf[1] = b;
writer.write_all(&temp_buf)?;
Ok(()) Ok(())
} }
} }

View file

@ -21,3 +21,8 @@ await channel.sendToQueue('consume-queue-1415', Buffer.from('STOP'));
console.log('Sent STOP message to queue'); console.log('Sent STOP message to queue');
await consumePromise; await consumePromise;
console.log('Received STOP!');
await channel.close();
await connection.close();

View file

@ -26,12 +26,19 @@ function maybeDone() {
for (const success of successes) { for (const success of successes) {
console.log(`✔️ Test ${success} successful`); console.log(`✔️ Test ${success} successful`);
} }
for (const { name, stderr } of failures) { for (const { name, stdout, stderr } of failures) {
console.log( console.log(
`------------------- stderr test ${name} -------------------` `------------------- start stdout test ${name} -------------------`
);
console.log(stdout);
console.log(
`-------------------- end stdout test ${name} --------------------`
);
console.log(
`------------------- start stderr test ${name} -------------------`
); );
console.log(stderr); console.log(stderr);
console.log(`------------------- stderr test ${name} ------------------- console.log(`-------------------- end stderr test ${name} --------------------
Test ${name} failed`); Test ${name} failed`);
} }
@ -42,11 +49,11 @@ function maybeDone() {
} }
function runTest(path, name) { function runTest(path, name) {
childProcess.exec(`node ${path}`, {}, (error, _, stderr) => { childProcess.exec(`node ${path}`, {}, (error, stdout, stderr) => {
if (!error) { if (!error) {
successes.push(name); successes.push(name);
} else { } else {
failures.push({ name, stderr }); failures.push({ name, stdout, stderr });
} }
done += 1; done += 1;
maybeDone(); maybeDone();