From dbc577abbcd8c6e449779e433c02e7edc02bb5ad Mon Sep 17 00:00:00 2001 From: Nilstrieb <48135649+Nilstrieb@users.noreply.github.com> Date: Sat, 19 Mar 2022 19:01:31 +0100 Subject: [PATCH] improve everything --- haesli_core/src/exchange.rs | 13 ++++++ haesli_core/src/lib.rs | 1 + haesli_core/src/macros.rs | 27 ++++++++---- haesli_core/src/queue.rs | 15 +++---- haesli_messaging/src/methods/consume.rs | 4 +- haesli_messaging/src/methods/mod.rs | 58 ++++++++++++------------- haesli_messaging/src/methods/publish.rs | 4 +- haesli_messaging/src/methods/queue.rs | 19 +++++--- haesli_transport/src/connection.rs | 45 +++++++++++++------ haesli_transport/src/error.rs | 2 +- 10 files changed, 117 insertions(+), 71 deletions(-) create mode 100644 haesli_core/src/exchange.rs diff --git a/haesli_core/src/exchange.rs b/haesli_core/src/exchange.rs new file mode 100644 index 0000000..1be6801 --- /dev/null +++ b/haesli_core/src/exchange.rs @@ -0,0 +1,13 @@ +pub enum ExchangeType { + /// Routes a message to a queue if the routing-keys are equal + Direct, + /// Always routes the message to a queue + Fanout, + /// Routes a message to a queue if the routing key matches the pattern + Topic, + /// Is bound with a table of headers and values, and matches if the message headers + /// match up with the binding headers + Headers, + /// The message is sent to the server system service with the name of the routing-key + System, +} diff --git a/haesli_core/src/lib.rs b/haesli_core/src/lib.rs index 701a4a1..44892fe 100644 --- a/haesli_core/src/lib.rs +++ b/haesli_core/src/lib.rs @@ -3,6 +3,7 @@ pub mod connection; pub mod consumer; pub mod error; +pub mod exchange; mod macros; pub mod message; pub mod methods; diff --git a/haesli_core/src/macros.rs b/haesli_core/src/macros.rs index 9b74e7d..95939e2 100644 --- a/haesli_core/src/macros.rs +++ b/haesli_core/src/macros.rs @@ -3,24 +3,24 @@ macro_rules! newtype_id { ($(#[$meta:meta])* $vis:vis $name:ident) => { $(#[$meta])* #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] - $vis struct $name(::uuid::Uuid); + $vis struct $name(uuid::Uuid); impl $name { #[must_use] pub fn random() -> Self { - ::rand::random() + rand::random() } } - impl ::std::fmt::Display for $name { - fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { - ::std::fmt::Display::fmt(&self.0, f) + impl std::fmt::Display for $name { + fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> std::fmt::Result { + std::fmt::Display::fmt(&self.0, f) } } - impl ::rand::prelude::Distribution<$name> for ::rand::distributions::Standard { - fn sample(&self, rng: &mut R) -> $name { - $name(::uuid::Uuid::from_bytes(rng.gen())) + impl rand::prelude::Distribution<$name> for rand::distributions::Standard { + fn sample(&self, rng: &mut R) -> $name { + $name(uuid::Uuid::from_bytes(rng.gen())) } } }; @@ -58,11 +58,20 @@ macro_rules! newtype { Self(other.into()) } } + + impl std::fmt::Display for $name + where + $ty: std::fmt::Display, + { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + std::fmt::Display::fmt(&self.0, f) + } + } }; } #[macro_export] -macro_rules! haesli_todo { +macro_rules! amqp_todo { () => { return Err(::haesli_core::error::ConException::NotImplemented(concat!( file!(), diff --git a/haesli_core/src/queue.rs b/haesli_core/src/queue.rs index 5c3546f..39f5627 100644 --- a/haesli_core/src/queue.rs +++ b/haesli_core/src/queue.rs @@ -1,7 +1,7 @@ use std::{ borrow::Borrow, collections::HashMap, - fmt::{Debug, Display, Formatter}, + fmt::{Debug, Formatter}, sync::{atomic::AtomicUsize, Arc}, }; @@ -35,22 +35,21 @@ newtype!( impl Borrow for QueueName { fn borrow(&self) -> &str { - std::borrow::Borrow::borrow(&self.0) - } -} - -impl Display for QueueName { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - Display::fmt(&self.0, f) + Borrow::borrow(&self.0) } } #[derive(Debug)] pub struct QueueInner { + /// Internal ID, might actually be unused pub id: QueueId, + /// The visible name of the queue pub name: QueueName, pub messages: haesli_datastructure::MessageQueue, + /// Whether the queue should be kept when the server restarts pub durable: bool, + /// To which connection the queue belongs to it will be deleted when the connection closes + // todo: connection or channel? pub exclusive: Option, /// Whether the queue will automatically be deleted when no consumers uses it anymore. /// The queue can always be manually deleted. diff --git a/haesli_messaging/src/methods/consume.rs b/haesli_messaging/src/methods/consume.rs index 1cf2260..ef962b4 100644 --- a/haesli_messaging/src/methods/consume.rs +++ b/haesli_messaging/src/methods/consume.rs @@ -1,10 +1,10 @@ use std::sync::Arc; use haesli_core::{ + amqp_todo, connection::Channel, consumer::{Consumer, ConsumerId}, error::ChannelException, - haesli_todo, methods::{BasicConsume, BasicConsumeOk, Method}, }; use tracing::info; @@ -23,7 +23,7 @@ pub fn consume(channel: Channel, basic_consume: BasicConsume) -> Result } = basic_consume; if no_wait || no_local || exclusive || no_ack { - haesli_todo!(); + amqp_todo!(); } let global_data = channel.global_data.clone(); diff --git a/haesli_messaging/src/methods/mod.rs b/haesli_messaging/src/methods/mod.rs index 1f08d2e..fd08462 100644 --- a/haesli_messaging/src/methods/mod.rs +++ b/haesli_messaging/src/methods/mod.rs @@ -2,7 +2,7 @@ mod consume; mod publish; mod queue; -use haesli_core::{connection::Channel, haesli_todo, message::Message, methods::Method}; +use haesli_core::{amqp_todo, connection::Channel, message::Message, methods::Method}; use tracing::info; use crate::Result; @@ -15,42 +15,42 @@ pub async fn handle_method(channel_handle: Channel, method: Method) -> Result haesli_todo!(), - Method::ExchangeDeclareOk(_) => haesli_todo!(), - Method::ExchangeDelete(_) => haesli_todo!(), - Method::ExchangeDeleteOk(_) => haesli_todo!(), + Method::ExchangeDeclare(_) => amqp_todo!(), + Method::ExchangeDeclareOk(_) => amqp_todo!(), + Method::ExchangeDelete(_) => amqp_todo!(), + Method::ExchangeDeleteOk(_) => amqp_todo!(), Method::QueueDeclare(queue_declare) => queue::declare(channel_handle, queue_declare)?, - Method::QueueDeclareOk { .. } => haesli_todo!(), + Method::QueueDeclareOk { .. } => amqp_todo!(), Method::QueueBind(queue_bind) => queue::bind(channel_handle, queue_bind).await?, - Method::QueueBindOk(_) => haesli_todo!(), - Method::QueueUnbind { .. } => haesli_todo!(), - Method::QueueUnbindOk(_) => haesli_todo!(), - Method::QueuePurge { .. } => haesli_todo!(), - Method::QueuePurgeOk { .. } => haesli_todo!(), - Method::QueueDelete { .. } => haesli_todo!(), - Method::QueueDeleteOk { .. } => haesli_todo!(), - Method::BasicQos { .. } => haesli_todo!(), - Method::BasicQosOk(_) => haesli_todo!(), + Method::QueueBindOk(_) => amqp_todo!(), + Method::QueueUnbind { .. } => amqp_todo!(), + Method::QueueUnbindOk(_) => amqp_todo!(), + Method::QueuePurge { .. } => amqp_todo!(), + Method::QueuePurgeOk { .. } => amqp_todo!(), + Method::QueueDelete { .. } => amqp_todo!(), + Method::QueueDeleteOk { .. } => amqp_todo!(), + Method::BasicQos { .. } => amqp_todo!(), + Method::BasicQosOk(_) => amqp_todo!(), Method::BasicConsume(consume) => consume::consume(channel_handle, consume)?, - Method::BasicConsumeOk { .. } => haesli_todo!(), - Method::BasicCancel { .. } => haesli_todo!(), - Method::BasicCancelOk { .. } => haesli_todo!(), - Method::BasicReturn { .. } => haesli_todo!(), - Method::BasicDeliver { .. } => haesli_todo!(), - Method::BasicGet { .. } => haesli_todo!(), - Method::BasicGetOk { .. } => haesli_todo!(), - Method::BasicGetEmpty { .. } => haesli_todo!(), - Method::BasicAck { .. } => haesli_todo!(), - Method::BasicReject { .. } => haesli_todo!(), - Method::BasicRecoverAsync { .. } => haesli_todo!(), - Method::BasicRecover { .. } => haesli_todo!(), - Method::BasicRecoverOk(_) => haesli_todo!(), + Method::BasicConsumeOk { .. } => amqp_todo!(), + Method::BasicCancel { .. } => amqp_todo!(), + Method::BasicCancelOk { .. } => amqp_todo!(), + Method::BasicReturn { .. } => amqp_todo!(), + Method::BasicDeliver { .. } => amqp_todo!(), + Method::BasicGet { .. } => amqp_todo!(), + Method::BasicGetOk { .. } => amqp_todo!(), + Method::BasicGetEmpty { .. } => amqp_todo!(), + Method::BasicAck { .. } => amqp_todo!(), + Method::BasicReject { .. } => amqp_todo!(), + Method::BasicRecoverAsync { .. } => amqp_todo!(), + Method::BasicRecover { .. } => amqp_todo!(), + Method::BasicRecoverOk(_) => amqp_todo!(), Method::TxSelect(_) | Method::TxSelectOk(_) | Method::TxCommit(_) | Method::TxCommitOk(_) | Method::TxRollback(_) - | Method::TxRollbackOk(_) => haesli_todo!(), + | Method::TxRollbackOk(_) => amqp_todo!(), Method::BasicPublish { .. } => { unreachable!("Basic.Publish is handled somewhere else because it has a body") } diff --git a/haesli_messaging/src/methods/publish.rs b/haesli_messaging/src/methods/publish.rs index 3bb71f5..6843466 100644 --- a/haesli_messaging/src/methods/publish.rs +++ b/haesli_messaging/src/methods/publish.rs @@ -1,7 +1,7 @@ use haesli_core::{ + amqp_todo, connection::Channel, error::{ChannelException, ConException}, - haesli_todo, message::Message, queue::QueueEvent, }; @@ -17,7 +17,7 @@ pub fn publish(channel_handle: Channel, message: Message) -> Result<()> { let routing = &message.routing; if !routing.exchange.is_empty() { - haesli_todo!(); + amqp_todo!(); } let global_data = global_data.lock(); diff --git a/haesli_messaging/src/methods/queue.rs b/haesli_messaging/src/methods/queue.rs index 7986d1f..75409e0 100644 --- a/haesli_messaging/src/methods/queue.rs +++ b/haesli_messaging/src/methods/queue.rs @@ -1,8 +1,8 @@ use std::sync::{atomic::AtomicUsize, Arc}; use haesli_core::{ + amqp_todo, connection::Channel, - haesli_todo, methods::{Method, QueueBind, QueueDeclare, QueueDeclareOk}, queue::{QueueDeletion, QueueId, QueueInner, QueueName}, GlobalData, @@ -24,16 +24,23 @@ pub fn declare(channel: Channel, queue_declare: QueueDeclare) -> Result .. } = queue_declare; + // 2.1.4.1 - If no queue name is given, chose a name + let queue_name = if queue_name.is_empty() { + queue_name + } else { + format!("q_{}", haesli_core::random_uuid()) + }; + let queue_name = QueueName::new(queue_name.into()); if !arguments.is_empty() { - haesli_todo!(); + amqp_todo!(); } - // todo: durable is technically spec-compliant, the spec doesn't really require it, but it's a todo - // not checked here because it's the default for amqplib which is annoying + // todo: implement durable, not checked here because it's the amqplib default + if passive || no_wait { - haesli_todo!(); + amqp_todo!(); } let global_data = channel.global_data.clone(); @@ -79,7 +86,7 @@ pub fn declare(channel: Channel, queue_declare: QueueDeclare) -> Result } pub async fn bind(_channel_handle: Channel, _queue_bind: QueueBind) -> Result { - haesli_todo!(); + amqp_todo!(); } fn bind_queue(global_data: GlobalData, _exchange: (), routing_key: Arc) -> Result<()> { diff --git a/haesli_transport/src/connection.rs b/haesli_transport/src/connection.rs index 9cb92ef..a2de542 100644 --- a/haesli_transport/src/connection.rs +++ b/haesli_transport/src/connection.rs @@ -109,12 +109,30 @@ impl TransportConnection { } pub async fn start_connection_processing(mut self) { - let process_result = self.process_connection().await; + self.process_connection().await; + if let Err(err) = self.stream.shutdown().await { + error!(%err, "Failed to shut down TCP stream"); + } + + // global connection is closed on drop + } + + async fn process_connection(&mut self) { + let initialize_result = self.initialize_connection().await; + + if let Err(err) = initialize_result { + // 2.2.4 - prior to sending or receiving Open or Open-Ok, + // a peer that detects an error MUST close the socket without sending any further data. + warn!(%err, "An error occurred during connection initialization"); + return; + } + + let process_result = self.main_loop().await; match process_result { Ok(()) => {} - Err(TransError::Protocol(ProtocolError::GracefullyClosed)) => { - /* do nothing, remove below */ + Err(TransError::Protocol(ProtocolError::GracefullyClosed | ProtocolError::Fatal)) => { + /* do nothing, connection is closed on drop */ } Err(TransError::Protocol(ProtocolError::ConException(ex))) => { warn!(%ex, "Connection exception occurred. This indicates a faulty client."); @@ -129,19 +147,18 @@ impl TransportConnection { } Err(err) => error!(%err, "Error during processing of connection"), } - - // global connection is closed on drop } - pub async fn process_connection(&mut self) -> Result<()> { + async fn initialize_connection(&mut self) -> Result<()> { + // 2.2.4 - Initialize connection self.negotiate_version().await?; self.start().await?; + // todo should `secure` happen here? self.tune().await?; self.open().await?; info!("Connection is ready for usage!"); - - self.main_loop().await + Ok(()) } async fn send_method_content( @@ -332,7 +349,7 @@ impl TransportConnection { let result = match frame.kind { FrameType::Method => self.dispatch_method(frame).await, FrameType::Heartbeat => { - Ok(()) /* Nothing here, just the `reset_timeout` above */ + Ok(()) /* Nothing here, just the `reset_timeout` above */ } FrameType::Header => self.dispatch_header(frame), FrameType::Body => self.dispatch_body(frame), @@ -596,7 +613,7 @@ impl TransportConnection { .await .context("read protocol header")?; - debug!(received_header = ?read_header_buf,"Received protocol header"); + trace!(received_header = ?read_header_buf, "Received protocol header"); let protocol = &read_header_buf[0..4]; let version = &read_header_buf[5..8]; @@ -606,19 +623,19 @@ impl TransportConnection { .write_all(OWN_PROTOCOL_HEADER) .await .context("write protocol header")?; - debug!(?protocol, "Version negotiation failed"); + trace!(?protocol, "Version negotiation failed"); return Err(ProtocolError::ProtocolNegotiationFailed.into()); } if &read_header_buf[0..5] == b"AMQP\0" && version == SUPPORTED_PROTOCOL_VERSION { - debug!(?version, "Version negotiation successful"); + trace!(?version, "Version negotiation successful"); Ok(()) } else { self.stream .write_all(OWN_PROTOCOL_HEADER) .await .context("write protocol header")?; - debug!(?version, expected_version = ?SUPPORTED_PROTOCOL_VERSION, "Version negotiation failed"); + trace!(?version, expected_version = ?SUPPORTED_PROTOCOL_VERSION, "Version negotiation failed"); Err(ProtocolError::ProtocolNegotiationFailed.into()) } } @@ -671,7 +688,7 @@ fn server_properties(host: SocketAddr) -> Table { let host_str = host.ip().to_string(); HashMap::from([ ("host".to_owned(), ls(host_str)), - ("product".to_owned(), ls("no name yet")), + ("product".to_owned(), ls("haesli")), ("version".to_owned(), ls("0.1.0")), ("platform".to_owned(), ls("microsoft linux")), ("copyright".to_owned(), ls("MIT")), diff --git a/haesli_transport/src/error.rs b/haesli_transport/src/error.rs index cf1158c..63d4568 100644 --- a/haesli_transport/src/error.rs +++ b/haesli_transport/src/error.rs @@ -2,7 +2,7 @@ use std::io::Error; pub use haesli_core::error::{ConException, ProtocolError}; -type StdResult = std::result::Result; +pub type StdResult = std::result::Result; pub type Result = StdResult;