From 5d127eceee530017af3ce1c0b8002fdf8da55540 Mon Sep 17 00:00:00 2001 From: Nilstrieb <48135649+Nilstrieb@users.noreply.github.com> Date: Fri, 4 Mar 2022 22:35:55 +0100 Subject: [PATCH] fix --- amqp_core/src/connection.rs | 21 +++++++++++-------- amqp_dashboard/src/lib.rs | 29 +++++++++++---------------- amqp_messaging/src/methods/consume.rs | 9 +++------ amqp_messaging/src/methods/mod.rs | 5 ++--- amqp_messaging/src/methods/publish.rs | 17 +++++++--------- amqp_messaging/src/methods/queue.rs | 3 +-- amqp_transport/src/connection.rs | 27 ++++++++++++------------- amqp_transport/src/lib.rs | 4 ++-- 8 files changed, 53 insertions(+), 62 deletions(-) diff --git a/amqp_core/src/connection.rs b/amqp_core/src/connection.rs index d111e84..685542c 100644 --- a/amqp_core/src/connection.rs +++ b/amqp_core/src/connection.rs @@ -1,5 +1,6 @@ use crate::{methods, methods::Method, newtype_id, GlobalData, Queue}; use bytes::Bytes; +use parking_lot::Mutex; use smallvec::SmallVec; use std::{ collections::HashMap, @@ -43,12 +44,14 @@ impl Display for ChannelNum { } } +pub type Connection = Arc; + #[derive(Debug)] -pub struct Connection { +pub struct ConnectionInner { pub id: ConnectionId, pub peer_addr: SocketAddr, pub global_data: GlobalData, - pub channels: HashMap, + pub channels: Mutex>, pub exclusive_queues: Vec, _events: ConEventSender, } @@ -62,19 +65,19 @@ pub enum QueuedMethod { pub type ConEventSender = mpsc::Sender<(ChannelNum, QueuedMethod)>; pub type ConEventReceiver = mpsc::Receiver<(ChannelNum, QueuedMethod)>; -impl Connection { +impl ConnectionInner { #[must_use] pub fn new( id: ConnectionId, peer_addr: SocketAddr, global_data: GlobalData, method_queue: ConEventSender, - ) -> Arc { + ) -> Connection { Arc::new(Self { id, peer_addr, global_data, - channels: HashMap::new(), + channels: Mutex::new(HashMap::new()), exclusive_queues: vec![], _events: method_queue, }) @@ -86,8 +89,10 @@ impl Connection { } } +pub type Channel = Arc; + #[derive(Debug)] -pub struct Channel { +pub struct ChannelInner { pub id: ChannelId, pub num: ChannelNum, pub connection: Connection, @@ -95,7 +100,7 @@ pub struct Channel { method_queue: ConEventSender, } -impl Channel { +impl ChannelInner { #[must_use] pub fn new( id: ChannelId, @@ -103,7 +108,7 @@ impl Channel { connection: Connection, global_data: GlobalData, method_queue: ConEventSender, - ) -> Arc { + ) -> Channel { Arc::new(Self { id, num, diff --git a/amqp_dashboard/src/lib.rs b/amqp_dashboard/src/lib.rs index da5dc3a..fbc3785 100644 --- a/amqp_dashboard/src/lib.rs +++ b/amqp_dashboard/src/lib.rs @@ -82,23 +82,18 @@ async fn get_data(global_data: GlobalData) -> impl IntoResponse { let connections = global_data .connections .values() - .map(|conn| { - let conn = conn.lock(); - Connection { - id: conn.id.to_string(), - peer_addr: conn.peer_addr.to_string(), - channels: conn - .channels - .values() - .map(|chan| { - let chan = chan.lock(); - Channel { - id: chan.id.to_string(), - number: chan.num.num(), - } - }) - .collect(), - } + .map(|conn| Connection { + id: conn.id.to_string(), + peer_addr: conn.peer_addr.to_string(), + channels: conn + .channels + .lock() + .values() + .map(|chan| Channel { + id: chan.id.to_string(), + number: chan.num.num(), + }) + .collect(), }) .collect(); diff --git a/amqp_messaging/src/methods/consume.rs b/amqp_messaging/src/methods/consume.rs index ed8bf1a..7dc0764 100644 --- a/amqp_messaging/src/methods/consume.rs +++ b/amqp_messaging/src/methods/consume.rs @@ -9,7 +9,7 @@ use amqp_core::{ use std::sync::Arc; use tracing::info; -pub fn consume(channel_handle: Channel, basic_consume: BasicConsume) -> Result { +pub fn consume(channel: Channel, basic_consume: BasicConsume) -> Result { let BasicConsume { queue: queue_name, consumer_tag, @@ -24,10 +24,7 @@ pub fn consume(channel_handle: Channel, basic_consume: BasicConsume) -> Result Result, message: Message) { +pub async fn handle_basic_publish(channel_handle: Channel, message: Message) { match publish::publish(channel_handle, message).await { Ok(()) => {} Err(err) => error!(%err, "publish error occurred"), } } -pub async fn handle_method(channel_handle: Arc, method: Method) -> Result { +pub async fn handle_method(channel_handle: Channel, method: Method) -> Result { info!(?method, "Handling method"); let response = match method { diff --git a/amqp_messaging/src/methods/publish.rs b/amqp_messaging/src/methods/publish.rs index 49169bc..301add2 100644 --- a/amqp_messaging/src/methods/publish.rs +++ b/amqp_messaging/src/methods/publish.rs @@ -8,10 +8,10 @@ use amqp_core::{ }; use tracing::info; -pub async fn publish(channel_handle: Arc, message: Message) -> Result<()> { +pub async fn publish(channel_handle: Channel, message: Message) -> Result<()> { info!(?message, "Publishing message"); - let global_data = channel_handle.lock().global_data.clone(); + let global_data = channel_handle.global_data.clone(); let routing = &message.routing; @@ -39,14 +39,11 @@ pub async fn publish(channel_handle: Arc, message: Message) -> Result<( immediate: false, }); - consumer - .channel - .lock() - .queue_method(QueuedMethod::WithContent( - method, - message.header.clone(), - message.content.clone(), - )); + consumer.channel.queue_method(QueuedMethod::WithContent( + method, + message.header.clone(), + message.content.clone(), + )); } } diff --git a/amqp_messaging/src/methods/queue.rs b/amqp_messaging/src/methods/queue.rs index ebe2222..075101e 100644 --- a/amqp_messaging/src/methods/queue.rs +++ b/amqp_messaging/src/methods/queue.rs @@ -9,7 +9,7 @@ use amqp_core::{ use parking_lot::Mutex; use std::sync::{atomic::AtomicUsize, Arc}; -pub fn declare(channel_handle: Channel, queue_declare: QueueDeclare) -> Result { +pub fn declare(channel: Channel, queue_declare: QueueDeclare) -> Result { let QueueDeclare { queue: queue_name, passive, @@ -34,7 +34,6 @@ pub fn declare(channel_handle: Channel, queue_declare: QueueDeclare) -> Result, + global_chan: Channel, /// The current status of the channel, whether it has sent a method that expects a body status: ChannelStatus, } @@ -62,7 +62,7 @@ pub struct TransportConnection { /// When the next heartbeat expires next_timeout: Pin>, channels: HashMap, - global_con: Arc, + global_con: Connection, global_data: GlobalData, method_queue_send: ConEventSender, @@ -73,7 +73,7 @@ const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); enum ChannelStatus { Default, - NeedHeader(ChannelNum, Box), + NeedHeader(u16, Box), NeedsBody(Box, ContentHeader, SmallVec<[Bytes; 1]>), } @@ -87,7 +87,7 @@ impl TransportConnection { pub fn new( id: ConnectionId, stream: TcpStream, - connection_handle: Arc, + global_con: Connection, global_data: GlobalData, method_queue_send: ConEventSender, method_queue_recv: ConEventReceiver, @@ -99,7 +99,7 @@ impl TransportConnection { heartbeat_delay: HEARTBEAT_DELAY, channel_max: CHANNEL_MAX, next_timeout: Box::pin(time::sleep(DEFAULT_TIMEOUT)), - global_con: connection_handle, + global_con, channels: HashMap::with_capacity(4), global_data, method_queue_send, @@ -144,8 +144,7 @@ impl TransportConnection { Err(err) => error!(%err, "Error during processing of connection"), } - let connection_handle = self.global_con.lock(); - connection_handle.close(); + // global connection is closed on drop } pub async fn process_connection(&mut self) -> Result<()> { @@ -485,7 +484,7 @@ impl TransportConnection { async fn channel_open(&mut self, channel_num: ChannelNum) -> Result<()> { let id = rand::random(); - let channel_handle = amqp_core::connection::c::new_handle( + let channel_handle = ChannelInner::new( id, channel_num, self.global_con.clone(), @@ -511,8 +510,8 @@ impl TransportConnection { .connections .get_mut(&self.id) .unwrap() - .lock() .channels + .lock() .insert(channel_num, channel_handle); } @@ -603,13 +602,13 @@ impl TransportConnection { impl Drop for TransportConnection { fn drop(&mut self) { - self.global_con.lock().close(); + self.global_con.close(); } } impl Drop for TransportChannel { fn drop(&mut self) { - self.global_chan.lock().close(); + self.global_chan.close(); } } diff --git a/amqp_transport/src/lib.rs b/amqp_transport/src/lib.rs index f41abd2..cdeefd6 100644 --- a/amqp_transport/src/lib.rs +++ b/amqp_transport/src/lib.rs @@ -3,7 +3,7 @@ mod connection; mod error; mod frame; -pub mod methods; +mod methods; mod sasl; #[cfg(test)] mod tests; @@ -31,7 +31,7 @@ pub async fn do_thing_i_guess(global_data: GlobalData) -> Result<()> { let (method_send, method_recv) = tokio::sync::mpsc::channel(10); - let connection_handle = amqp_core::connection::ConnectionInner::new_handle( + let connection_handle = amqp_core::connection::ConnectionInner::new( id, peer_addr, global_data.clone(),