From 08fa9163b838323d9dcb9f5793fec488a4c271fa Mon Sep 17 00:00:00 2001 From: Nilstrieb <48135649+Nilstrieb@users.noreply.github.com> Date: Mon, 7 Mar 2022 15:15:47 +0100 Subject: [PATCH] improve tracing --- Cargo.lock | 14 ++++ Cargo.toml | 3 + amqp_core/src/message.rs | 4 +- amqp_core/src/queue.rs | 5 +- amqp_messaging/src/lib.rs | 3 +- amqp_messaging/src/methods/mod.rs | 4 +- amqp_messaging/src/methods/publish.rs | 44 +++------- amqp_messaging/src/methods/queue.rs | 6 +- amqp_messaging/src/queue.rs | 43 ---------- amqp_messaging/src/queue_worker.rs | 102 ++++++++++++++++++++++ amqp_transport/src/connection.rs | 17 ++-- amqp_transport/src/frame.rs | 3 +- amqp_transport/src/lib.rs | 116 ++++++++++++++++++-------- src/main.rs | 59 +++++++++---- 14 files changed, 273 insertions(+), 150 deletions(-) delete mode 100644 amqp_messaging/src/queue.rs create mode 100644 amqp_messaging/src/queue_worker.rs diff --git a/Cargo.lock b/Cargo.lock index 595de17..b027b17 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -29,6 +29,7 @@ dependencies = [ "tokio", "tracing", "tracing-subscriber", + "tracing-tree", ] [[package]] @@ -1439,6 +1440,19 @@ dependencies = [ "tracing-log", ] +[[package]] +name = "tracing-tree" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ce989c9962c7f61fe084dd4a230eec784649dfc2392467c790007c3a6e134e7" +dependencies = [ + "ansi_term", + "atty", + "tracing-core", + "tracing-log", + "tracing-subscriber", +] + [[package]] name = "try-lock" version = "0.2.3" diff --git a/Cargo.toml b/Cargo.toml index a7ed71d..020fe92 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,3 +17,6 @@ clap = { version = "3.1.5", features = ["derive"] } tokio = { version = "1.16.1", features = ["full"] } tracing = "0.1.30" tracing-subscriber = { version = "0.3.8", features = ["env-filter"] } +tracing-tree = "0.2.0" + +[features] diff --git a/amqp_core/src/message.rs b/amqp_core/src/message.rs index 675eb74..4bb831e 100644 --- a/amqp_core/src/message.rs +++ b/amqp_core/src/message.rs @@ -3,12 +3,12 @@ use bytes::Bytes; use smallvec::SmallVec; use std::sync::Arc; -pub type Message = Arc; +pub type Message = Arc; newtype_id!(pub MessageId); #[derive(Debug)] -pub struct RawMessage { +pub struct MessageInner { pub id: MessageId, pub header: ContentHeader, pub routing: RoutingInformation, diff --git a/amqp_core/src/queue.rs b/amqp_core/src/queue.rs index a3a6202..24ecb52 100644 --- a/amqp_core/src/queue.rs +++ b/amqp_core/src/queue.rs @@ -15,7 +15,10 @@ use tokio::sync::mpsc; pub type Queue = Arc; #[derive(Debug)] -pub enum QueueEvent {} +pub enum QueueEvent { + PublishMessage(Message), + Shutdown, +} pub type QueueEventSender = mpsc::Sender; pub type QueueEventReceiver = mpsc::Receiver; diff --git a/amqp_messaging/src/lib.rs b/amqp_messaging/src/lib.rs index 426495f..23b24ab 100644 --- a/amqp_messaging/src/lib.rs +++ b/amqp_messaging/src/lib.rs @@ -1,8 +1,9 @@ #![warn(rust_2018_idioms)] +#![deny(clippy::future_not_send)] use amqp_core::error::ProtocolError; pub mod methods; -mod queue; +mod queue_worker; type Result = std::result::Result; diff --git a/amqp_messaging/src/methods/mod.rs b/amqp_messaging/src/methods/mod.rs index 0b8a2a2..008c04f 100644 --- a/amqp_messaging/src/methods/mod.rs +++ b/amqp_messaging/src/methods/mod.rs @@ -6,8 +6,8 @@ use crate::Result; use amqp_core::{amqp_todo, connection::Channel, message::Message, methods::Method}; use tracing::{error, info}; -pub async fn handle_basic_publish(channel_handle: Channel, message: Message) { - match publish::publish(channel_handle, message).await { +pub fn handle_basic_publish(channel_handle: Channel, message: Message) { + match publish::publish(channel_handle, message) { Ok(()) => {} Err(err) => error!(%err, "publish error occurred"), } diff --git a/amqp_messaging/src/methods/publish.rs b/amqp_messaging/src/methods/publish.rs index 0efeb11..a029021 100644 --- a/amqp_messaging/src/methods/publish.rs +++ b/amqp_messaging/src/methods/publish.rs @@ -1,14 +1,14 @@ use crate::Result; use amqp_core::{ amqp_todo, - connection::{Channel, ConnectionEvent}, + connection::Channel, error::{ChannelException, ConException}, message::Message, - methods::{BasicDeliver, Method}, + queue::QueueEvent, }; -use tracing::debug; +use tracing::{debug, error}; -pub async fn publish(channel_handle: Channel, message: Message) -> Result<()> { +pub fn publish(channel_handle: Channel, message: Message) -> Result<()> { debug!(?message, "Publishing message"); let global_data = channel_handle.global_data.clone(); @@ -19,38 +19,20 @@ pub async fn publish(channel_handle: Channel, message: Message) -> Result<()> { amqp_todo!(); } - let mut global_data = global_data.lock(); + let global_data = global_data.lock(); let queue = global_data .queues - .get_mut(routing.routing_key.as_str()) + .get(routing.routing_key.as_str()) .ok_or(ChannelException::NotFound)?; - { - // todo: we just send it to the consumer directly and ignore it if the consumer doesn't exist - // consuming is hard, but this should work *for now* - let consumers = queue.consumers.lock(); - if let Some(consumer) = consumers.values().next() { - let method = Box::new(Method::BasicDeliver(BasicDeliver { - consumer_tag: consumer.tag.clone(), - delivery_tag: 0, - redelivered: false, - exchange: routing.exchange.clone(), - routing_key: routing.routing_key.clone(), - })); - - consumer - .channel - .event_sender - .try_send(ConnectionEvent::MethodContent( - consumer.channel.num, - method, - message.header.clone(), - message.content.clone(), - )) - .map_err(|_| ConException::InternalError)?; - } - } + queue + .event_send + .try_send(QueueEvent::PublishMessage(message)) + .map_err(|err| { + error!(?err, "Failed to send message to queue event queue"); + ConException::InternalError + })?; Ok(()) } diff --git a/amqp_messaging/src/methods/queue.rs b/amqp_messaging/src/methods/queue.rs index 8cc9e4b..5744bdd 100644 --- a/amqp_messaging/src/methods/queue.rs +++ b/amqp_messaging/src/methods/queue.rs @@ -1,4 +1,4 @@ -use crate::{queue::QueueTask, Result}; +use crate::{queue_worker::QueueTask, Result}; use amqp_core::{ amqp_todo, connection::Channel, @@ -9,7 +9,6 @@ use amqp_core::{ use parking_lot::Mutex; use std::sync::{atomic::AtomicUsize, Arc}; use tokio::sync::mpsc; -use tracing::{info_span, Instrument}; pub fn declare(channel: Channel, queue_declare: QueueDeclare) -> Result { let QueueDeclare { @@ -68,8 +67,7 @@ pub fn declare(channel: Channel, queue_declare: QueueDeclare) -> Result let queue_task = QueueTask::new(global_data, event_recv, queue); - let queue_worker_span = info_span!(parent: None, "queue-worker", %queue_name); - tokio::spawn(queue_task.start().instrument(queue_worker_span)); + tokio::spawn(async move { queue_task.start().await }); Ok(Method::QueueDeclareOk(QueueDeclareOk { queue: queue_name.to_string(), diff --git a/amqp_messaging/src/queue.rs b/amqp_messaging/src/queue.rs deleted file mode 100644 index 4740d2a..0000000 --- a/amqp_messaging/src/queue.rs +++ /dev/null @@ -1,43 +0,0 @@ -use amqp_core::{ - queue::{Queue, QueueEventReceiver}, - GlobalData, -}; -use tracing::{debug, info}; - -#[derive(Debug)] -#[allow(dead_code)] -pub struct QueueTask { - global_data: GlobalData, - event_recv: QueueEventReceiver, - queue: Queue, -} - -impl QueueTask { - pub fn new(global_data: GlobalData, event_recv: QueueEventReceiver, queue: Queue) -> Self { - Self { - global_data, - event_recv, - queue, - } - } - - pub async fn start(mut self) { - info!("Started queue worker task"); - - loop { - let next_event = self.event_recv.recv().await; - - match next_event { - Some(event) => debug!(?event, "Received event"), - None => { - self.cleanup().await; - return; - } - } - } - } - - async fn cleanup(&mut self) { - // do stuff or something like that id whatever - } -} diff --git a/amqp_messaging/src/queue_worker.rs b/amqp_messaging/src/queue_worker.rs new file mode 100644 index 0000000..7a919c0 --- /dev/null +++ b/amqp_messaging/src/queue_worker.rs @@ -0,0 +1,102 @@ +use amqp_core::{ + connection::ConnectionEvent, + consumer::Consumer, + message::Message, + methods::{BasicDeliver, Method}, + queue::{Queue, QueueEvent, QueueEventReceiver}, + GlobalData, +}; +use std::borrow::Borrow; +use tracing::info; + +#[derive(Debug)] +#[allow(dead_code)] +pub struct QueueTask { + global_data: GlobalData, + event_recv: QueueEventReceiver, + queue: Queue, +} + +impl QueueTask { + fn show_name(&self) -> &str { + self.queue.name.borrow() + } + + pub fn new(global_data: GlobalData, event_recv: QueueEventReceiver, queue: Queue) -> Self { + Self { + global_data, + event_recv, + queue, + } + } + + #[tracing::instrument(skip(self), fields(name = self.show_name()))] + pub async fn start(mut self) { + info!("Started queue worker task"); + + loop { + let next_event = self.event_recv.recv().await; + + match next_event { + Some(QueueEvent::PublishMessage(message)) => { + self.handle_publish_message(message).await + } + Some(QueueEvent::Shutdown) | None => { + self.cleanup().await; + return; + } + } + } + } + + #[tracing::instrument(skip(self), fields(name = self.show_name()), level = "debug")] + async fn handle_publish_message(&mut self, message: Message) { + // todo: we just send it to the consumer directly and ignore it if the consumer doesn't exist + // consuming is hard, but this should work *for now* + + let could_deliver = { + let consumers = self.queue.consumers.lock(); + if let Some(consumer) = consumers.values().next() { + Self::try_deliver(&message, consumer) + } else { + Err(()) + } + }; + + if let Err(()) = could_deliver { + self.queue_message(message).await; + } + } + + #[tracing::instrument(skip(consumer), level = "trace")] + fn try_deliver(message: &Message, consumer: &Consumer) -> Result<(), ()> { + let routing = &message.routing; + + let method = Box::new(Method::BasicDeliver(BasicDeliver { + consumer_tag: consumer.tag.clone(), + delivery_tag: 0, + redelivered: false, + exchange: routing.exchange.clone(), + routing_key: routing.routing_key.clone(), + })); + + let result = consumer + .channel + .event_sender + .try_send(ConnectionEvent::MethodContent( + consumer.channel.num, + method, + message.header.clone(), + message.content.clone(), + )); + + result.map_err(drop) + } + + #[tracing::instrument(skip(self), fields(name = self.show_name()), level = "trace")] + async fn queue_message(&mut self, _message: Message) {} + + async fn cleanup(&mut self) { + // do stuff or something like that id whatever + } +} diff --git a/amqp_transport/src/connection.rs b/amqp_transport/src/connection.rs index 1a511a9..72a5621 100644 --- a/amqp_transport/src/connection.rs +++ b/amqp_transport/src/connection.rs @@ -9,7 +9,7 @@ use amqp_core::{ Channel, ChannelInner, ChannelNum, ConEventReceiver, ConEventSender, Connection, ConnectionEvent, ConnectionId, ContentHeader, }, - message::{MessageId, RawMessage, RoutingInformation}, + message::{MessageId, MessageInner, RoutingInformation}, methods::{ BasicPublish, ChannelClose, ChannelCloseOk, ChannelOpenOk, ConnectionClose, ConnectionCloseOk, ConnectionOpen, ConnectionOpenOk, ConnectionStart, ConnectionStartOk, @@ -198,9 +198,8 @@ impl TransportConnection { Ok(()) } + #[tracing::instrument(skip(self), level = "trace")] async fn send_method(&mut self, channel: ChannelNum, method: &Method) -> Result<()> { - trace!(%channel, ?method, "Sending method"); - let mut payload = Vec::with_capacity(64); methods::write::write_method(method, &mut payload)?; frame::write_frame(&mut self.stream, FrameType::Method, channel, &payload).await @@ -324,6 +323,7 @@ impl TransportConnection { } } + #[tracing::instrument(skip(self), level = "debug")] async fn handle_frame(&mut self, frame: Frame) -> Result<()> { let channel = frame.channel; self.reset_timeout(); @@ -358,9 +358,9 @@ impl TransportConnection { } } + #[tracing::instrument(skip(self, frame), level = "trace")] async fn dispatch_method(&mut self, frame: Frame) -> Result<()> { let method = methods::parse_method(&frame.payload)?; - debug!(?method, "Received method"); // Sending a method implicitly cancels the content frames that might be ongoing self.channels @@ -483,7 +483,7 @@ impl TransportConnection { .. }) = method { - let message = RawMessage { + let message = MessageInner { id: MessageId::random(), header, routing: RoutingInformation { @@ -498,12 +498,7 @@ impl TransportConnection { let channel = self.channels.get(&channel).ok_or(ConException::Todo)?; - // Spawn the handler for the publish. The connection task goes back to handling - // just the connection. - tokio::spawn(amqp_messaging::methods::handle_basic_publish( - channel.global_chan.clone(), - message, - )); + amqp_messaging::methods::handle_basic_publish(channel.global_chan.clone(), message); Ok(()) } else { Err(ConException::Todo.into()) diff --git a/amqp_transport/src/frame.rs b/amqp_transport/src/frame.rs index f6dd441..347afe3 100644 --- a/amqp_transport/src/frame.rs +++ b/amqp_transport/src/frame.rs @@ -252,6 +252,7 @@ impl Debug for MaxFrameSize { } } +#[tracing::instrument(skip(w), level = "trace")] pub async fn write_frame( mut w: W, kind: FrameType, @@ -261,8 +262,6 @@ pub async fn write_frame( where W: AsyncWriteExt + Unpin + Send, { - trace!(?kind, ?channel, ?payload, "Sending frame"); - w.write_u8(kind as u8).await?; w.write_u16(channel.num()).await?; w.write_u32(u32::try_from(payload.len()).context("frame size too big")?) diff --git a/amqp_transport/src/lib.rs b/amqp_transport/src/lib.rs index 3533bb9..04e10b3 100644 --- a/amqp_transport/src/lib.rs +++ b/amqp_transport/src/lib.rs @@ -11,46 +11,92 @@ mod tests; // TODO: handle big types use crate::connection::TransportConnection; -use amqp_core::GlobalData; -use tokio::net; -use tracing::{info, trace_span, Instrument}; +use amqp_core::{connection::ConnectionEvent, queue::QueueEvent, GlobalData}; +use anyhow::Context; +use std::{future::Future, net::SocketAddr}; +use tokio::{net, net::TcpStream, select}; +use tracing::{info, info_span, Instrument}; -pub async fn do_thing_i_guess(global_data: GlobalData) -> anyhow::Result<()> { +pub async fn do_thing_i_guess( + global_data: GlobalData, + terminate: impl Future + Send, +) -> anyhow::Result<()> { + select! { + res = accept_cons(global_data.clone()) => { + res + } + _ = terminate => { + handle_shutdown(global_data).await + } + } +} + +async fn accept_cons(global_data: GlobalData) -> anyhow::Result<()> { info!("Binding TCP listener..."); let listener = net::TcpListener::bind(("127.0.0.1", 5672)).await?; info!(addr = ?listener.local_addr()?, "Successfully bound TCP listener"); loop { - let (stream, peer_addr) = listener.accept().await?; - - let id = rand::random(); - - info!(local_addr = ?stream.local_addr(), %id, "Accepted new connection"); - let span = trace_span!("client-connection", %id); - - let (method_send, method_recv) = tokio::sync::mpsc::channel(10); - - let connection_handle = amqp_core::connection::ConnectionInner::new( - id, - peer_addr, - global_data.clone(), - method_send.clone(), - ); - - let mut global_data_guard = global_data.lock(); - global_data_guard - .connections - .insert(id, connection_handle.clone()); - - let connection = TransportConnection::new( - id, - stream, - connection_handle, - global_data.clone(), - method_send, - method_recv, - ); - - tokio::spawn(connection.start_connection_processing().instrument(span)); + let connection = listener.accept().await?; + handle_con(global_data.clone(), connection); } } + +fn handle_con(global_data: GlobalData, connection: (TcpStream, SocketAddr)) { + let (stream, peer_addr) = connection; + let id = rand::random(); + + info!(local_addr = ?stream.local_addr(), %id, "Accepted new connection"); + let span = info_span!("client-connection", %id); + + let (method_send, method_recv) = tokio::sync::mpsc::channel(10); + + let connection_handle = amqp_core::connection::ConnectionInner::new( + id, + peer_addr, + global_data.clone(), + method_send.clone(), + ); + + let mut global_data_guard = global_data.lock(); + global_data_guard + .connections + .insert(id, connection_handle.clone()); + + let connection = TransportConnection::new( + id, + stream, + connection_handle, + global_data.clone(), + method_send, + method_recv, + ); + + tokio::spawn(connection.start_connection_processing().instrument(span)); +} + +async fn handle_shutdown(global_data: GlobalData) -> anyhow::Result<()> { + info!("Shutting down..."); + + let lock = global_data.lock(); + + for con in lock.connections.values() { + con.event_sender + .try_send(ConnectionEvent::Shutdown) + .context("failed to stop connection")?; + } + + for queue in lock.queues.values() { + queue + .event_send + .try_send(QueueEvent::Shutdown) + .context("failed to stop queue worker")?; + } + + // todo: here we should wait for everything to close + // https://github.com/tokio-rs/mini-redis/blob/4b4ecf0310e6bca43d336dde90a06d9dcad00d6c/src/server.rs#L51 + + info!("Finished shutdown"); + + Ok(()) +} diff --git a/src/main.rs b/src/main.rs index 144ad3b..290323b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,7 +2,9 @@ use anyhow::Result; use clap::Parser; +use std::str::FromStr; use tracing::{info, info_span, Instrument}; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Registry}; /// An AMQP 0-9-1 broker implementation. #[derive(Parser)] @@ -10,9 +12,10 @@ struct Args { /// Whether to serve the dashboard on localhost. Port defaults to 3000. #[clap(short, long)] dashboard: bool, - /// The log level of the application. Overwrites the `RUST_LOG` env var. + + /// Displays logs in a flat structure, otherwise as a tree #[clap(long)] - log_level: Option, + flat_log: bool, } #[tokio::main] @@ -24,29 +27,49 @@ async fn main() -> Result<()> { let global_data = amqp_core::GlobalData::default(); if args.dashboard { - let dashboard_span = info_span!("dashboard"); - tokio::spawn(amqp_dashboard::dashboard(global_data.clone()).instrument(dashboard_span)); + let global_data = global_data.clone(); + tokio::spawn(async move { + amqp_dashboard::dashboard(global_data) + .instrument(info_span!("dashboard")) + .await + }); } - amqp_transport::do_thing_i_guess(global_data).await + let res = amqp_transport::do_thing_i_guess(global_data, terminate()).await; + + info!("Bye!"); + + res } fn setup_tracing(args: &Args) { - const DEFAULT_LOG: &str = "hyper=info,debug"; + const DEFAULT_LOG: &str = "hyper=info,debug"; // set hyper to info because I really don't care about hyper - let log_filter = args - .log_level - .clone() - .or_else(|| std::env::var("RUST_LOG").ok()) - .unwrap_or_else(|| DEFAULT_LOG.to_owned()); + let log_filter = std::env::var("RUST_LOG").unwrap_or_else(|_| DEFAULT_LOG.to_owned()); - tracing_subscriber::fmt() - .with_level(true) - .with_timer(tracing_subscriber::fmt::time::time()) - .with_ansi(true) - .with_thread_names(true) - .with_env_filter(&log_filter) - .init(); + let registry = Registry::default().with(EnvFilter::from_str(&log_filter).unwrap()); + + if args.flat_log { + let fmt_layer = tracing_subscriber::fmt::layer() + .with_level(true) + .with_timer(tracing_subscriber::fmt::time::time()) + .with_ansi(true) + .with_thread_names(true); + + registry.with(fmt_layer).init(); + } else { + let tree_layer = tracing_tree::HierarchicalLayer::new(2) + .with_targets(true) + .with_bracketed_fields(true); + + registry.with(tree_layer).init(); + }; info!(%log_filter, "Using log filter level"); } + +async fn terminate() { + tokio::signal::ctrl_c() + .await + .expect("failed to install ctrl-c signal handler"); +}