diff --git a/Cargo.lock b/Cargo.lock index b43bcf9..cc665b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -442,8 +442,8 @@ dependencies = [ "haesli_datastructure", "parking_lot", "rand", + "smallvec", "thiserror", - "tinyvec", "tokio", "uuid", ] @@ -494,7 +494,6 @@ dependencies = [ "rand", "regex", "thiserror", - "tinyvec", "tokio", "tracing", ] @@ -1291,21 +1290,6 @@ dependencies = [ "serde_json", ] -[[package]] -name = "tinyvec" -version = "1.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c1c1d5a42b6245520c249549ec267180beaffcc0615401ac8e31853d4b6d8d2" -dependencies = [ - "tinyvec_macros", -] - -[[package]] -name = "tinyvec_macros" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" - [[package]] name = "tokio" version = "1.17.0" diff --git a/haesli_core/Cargo.toml b/haesli_core/Cargo.toml index 431be64..ecc7b90 100644 --- a/haesli_core/Cargo.toml +++ b/haesli_core/Cargo.toml @@ -10,7 +10,7 @@ haesli_datastructure = { path = "../haesli_datastructure" } bytes = "1.1.0" parking_lot = "0.12.0" rand = "0.8.5" -tinyvec = { version = "1.5.1", features = ["alloc", "rustc_1_55"] } +smallvec = { version = "1.8.0", features = ["union"] } thiserror = "1.0.30" tokio = { version = "1.17.0", features = ["sync"] } uuid = "0.8.2" diff --git a/haesli_core/src/connection.rs b/haesli_core/src/connection.rs index da9511d..fc53e07 100644 --- a/haesli_core/src/connection.rs +++ b/haesli_core/src/connection.rs @@ -7,13 +7,12 @@ use std::{ use bytes::Bytes; use parking_lot::Mutex; -use tinyvec::TinyVec; use tokio::sync::mpsc; use crate::{ consumer::Consumer, methods::{self, Method}, - newtype_id, GlobalData, Queue, + newtype_id, GlobalData, Queue, SingleVec, }; newtype_id!(pub ConnectionId); @@ -67,7 +66,7 @@ pub struct ConnectionInner { pub enum ConnectionEvent { Shutdown, Method(ChannelNum, Box), - MethodContent(ChannelNum, Box, ContentHeader, TinyVec<[Bytes; 1]>), + MethodContent(ChannelNum, Box, ContentHeader, SingleVec), } pub type ConEventSender = mpsc::Sender; diff --git a/haesli_core/src/exchange.rs b/haesli_core/src/exchange.rs index beeaedf..7e2f361 100644 --- a/haesli_core/src/exchange.rs +++ b/haesli_core/src/exchange.rs @@ -2,6 +2,13 @@ use std::{borrow::Borrow, collections::HashMap, sync::Arc}; use crate::{newtype, Queue}; +#[derive(Debug)] +pub enum TopicSegment { + Word(String), + SingleWildcard, + MultiWildcard, +} + #[derive(Debug)] pub enum ExchangeType { /// Routes a message to a queue if the routing-keys are equal @@ -9,7 +16,7 @@ pub enum ExchangeType { /// Always routes the message to a queue Fanout { bindings: Vec }, /// Routes a message to a queue if the routing key matches the pattern - Topic { bindings: Vec<(String, Queue)> }, + Topic { bindings: Vec<(Vec, Queue)> }, /// Is bound with a table of headers and values, and matches if the message headers /// match up with the binding headers /// diff --git a/haesli_core/src/lib.rs b/haesli_core/src/lib.rs index 1552fa4..8478414 100644 --- a/haesli_core/src/lib.rs +++ b/haesli_core/src/lib.rs @@ -25,6 +25,8 @@ use crate::{ queue::{Queue, QueueName}, }; +pub type SingleVec = smallvec::SmallVec<[T; 1]>; + #[derive(Clone)] // todo: what if this was downstream? pub struct GlobalData { diff --git a/haesli_core/src/message.rs b/haesli_core/src/message.rs index eba4319..c8eed80 100644 --- a/haesli_core/src/message.rs +++ b/haesli_core/src/message.rs @@ -1,9 +1,8 @@ use std::sync::Arc; use bytes::Bytes; -use tinyvec::TinyVec; -use crate::{connection::ContentHeader, newtype_id}; +use crate::{connection::ContentHeader, newtype_id, SingleVec}; pub type Message = Arc; @@ -14,7 +13,7 @@ pub struct MessageInner { pub id: MessageId, pub header: ContentHeader, pub routing: RoutingInformation, - pub content: TinyVec<[Bytes; 1]>, + pub content: SingleVec, } #[derive(Debug)] diff --git a/haesli_messaging/src/methods/publish.rs b/haesli_messaging/src/methods/publish.rs index 43fa4c0..b20ebf3 100644 --- a/haesli_messaging/src/methods/publish.rs +++ b/haesli_messaging/src/methods/publish.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use haesli_core::{ amqp_todo, connection::Channel, @@ -29,16 +31,17 @@ pub fn publish(channel_handle: Channel, message: Message) -> Result<()> { .get(exchange.as_str()) .ok_or(ChannelException::NotFound)?; - let queue = routing::route_message(exchange, &message.routing.routing_key) - .ok_or(ChannelException::NotFound)?; - - queue - .event_send - .try_send(QueueEvent::PublishMessage(message)) - .map_err(|err| { - error!(?err, "Failed to send message to queue event queue"); - ConException::InternalError - })?; + let queues = routing::route_message(exchange, &message.routing.routing_key) + .ok_or(ChannelException::NotFound)?; // todo this isn't really correct but the tests pass ✔️ + for queue in queues { + queue + .event_send + .try_send(QueueEvent::PublishMessage(Arc::clone(&message))) + .map_err(|err| { + error!(?err, "Failed to send message to queue event queue"); + ConException::InternalError + })?; + } Ok(()) } diff --git a/haesli_messaging/src/routing.rs b/haesli_messaging/src/routing.rs index 0d98a64..d1cf35b 100644 --- a/haesli_messaging/src/routing.rs +++ b/haesli_messaging/src/routing.rs @@ -1,30 +1,87 @@ +use std::sync::Arc; + use haesli_core::{ - exchange::{Exchange, ExchangeType}, + exchange::{Exchange, ExchangeType, TopicSegment}, queue::Queue, }; +fn parse_topic(topic: &str) -> Vec { + topic + .split(".") + .map(|segment| match segment { + "*" => TopicSegment::SingleWildcard, + "#" => TopicSegment::MultiWildcard, + word => TopicSegment::Word(word.to_owned()), + }) + .collect() +} + pub fn bind(exchange: &mut Exchange, routing_key: String, queue: Queue) { match &mut exchange.kind { ExchangeType::Direct { bindings } => { bindings.insert(routing_key, queue); } ExchangeType::Fanout { bindings } => bindings.push(queue), - ExchangeType::Topic { bindings } => bindings.push((routing_key, queue)), + ExchangeType::Topic { bindings } => bindings.push((parse_topic(&routing_key), queue)), ExchangeType::Headers => {} // unsupported ExchangeType::System => {} // unsupported } } /// Route a message to a queue. Returns the queue to send it to, or `None` if it can't be matched -pub fn route_message(exchange: &Exchange, routing_key: &str) -> Option { +pub fn route_message(exchange: &Exchange, routing_key: &str) -> Option> { match &exchange.kind { ExchangeType::Direct { bindings } => { // 3.1.3.1 - routing-key = routing-key - bindings.get(routing_key).cloned() + bindings.get(routing_key).cloned().map(|q| vec![q]) + } + ExchangeType::Fanout { bindings } => { + // 3.1.3.2 - unconditionally + Some(bindings.clone()) // see, this is actually Not That Bad I Hope + } + ExchangeType::Topic { bindings } => { + let topic = parse_topic(routing_key); + // todo: optimizing this is a fun problem + + Some(match_topic(bindings, topic)) } - ExchangeType::Fanout { .. } => None, - ExchangeType::Topic { .. } => None, ExchangeType::Headers => None, // unsupported ExchangeType::System => None, // unsupported } } + +fn match_topic( + patterns: &[(Vec, Q)], + routing_key: Vec, +) -> Vec { + patterns + .iter() + .filter_map(|(pattern, value)| { + let mut queue_segments = routing_key.iter(); + + for segment in pattern {} + + Some(value.clone()) + }) + .collect() +} + +#[cfg(test)] +mod tests { + use crate::routing::{match_topic, parse_topic}; + + macro_rules! match_topics { + (patterns: $($pattern:expr),*) => {}; + } + + #[test] + #[ignore] + fn match_empty_topic() { + let patterns = [(parse_topic(""), 1), (parse_topic("BAD"), 2)]; + let routing_key = parse_topic(""); + + let matched = match_topic(&patterns, routing_key); + + assert_eq!(matched, vec![1]) + } +} diff --git a/haesli_transport/Cargo.toml b/haesli_transport/Cargo.toml index 85976a5..90789fd 100644 --- a/haesli_transport/Cargo.toml +++ b/haesli_transport/Cargo.toml @@ -14,7 +14,6 @@ nom = "7.1.0" once_cell = "1.9.0" rand = "0.8.4" regex = "1.5.4" -tinyvec = { version = "1.5.1", features = ["alloc", "rustc_1_55"] } thiserror = "1.0.30" tokio = { version = "1.16.1", features = ["full"] } tracing = "0.1.30" diff --git a/haesli_transport/src/connection.rs b/haesli_transport/src/connection.rs index dbca7fc..ac5d20a 100644 --- a/haesli_transport/src/connection.rs +++ b/haesli_transport/src/connection.rs @@ -15,9 +15,8 @@ use haesli_core::{ ConnectionCloseOk, ConnectionOpen, ConnectionOpenOk, ConnectionStart, ConnectionStartOk, ConnectionTune, ConnectionTuneOk, FieldValue, Longstr, Method, ReplyCode, ReplyText, Table, }, - GlobalData, + GlobalData, SingleVec, }; -use tinyvec::TinyVec; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, net::TcpStream, @@ -75,7 +74,7 @@ const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); enum ChannelStatus { Default, NeedHeader(u16, Box), - NeedsBody(Box, ContentHeader, TinyVec<[Bytes; 1]>), + NeedsBody(Box, ContentHeader, SingleVec), } impl ChannelStatus { @@ -166,7 +165,7 @@ impl TransportConnection { channel: ChannelNum, method: &Method, header: ContentHeader, - body: &TinyVec<[Bytes; 1]>, + body: &SingleVec, ) -> Result<()> { self.send_method(channel, method).await?; @@ -177,7 +176,7 @@ impl TransportConnection { self.send_bodies(channel, body).await } - async fn send_bodies(&mut self, channel: ChannelNum, body: &TinyVec<[Bytes; 1]>) -> Result<()> { + async fn send_bodies(&mut self, channel: ChannelNum, body: &SingleVec) -> Result<()> { // this is inefficient if it's a huge message sent by a client with big frames to one with // small frames // we assume that this won't happen that that the first branch will be taken in most cases, @@ -438,7 +437,7 @@ impl TransportConnection { let header = parse_content_header(&frame.payload)?; ensure_conn(header.class_id == class_id)?; - channel.status = ChannelStatus::NeedsBody(method, header, TinyVec::new()); + channel.status = ChannelStatus::NeedsBody(method, header, SingleVec::new()); Ok(()) } ChannelStatus::NeedsBody(_, _, _) => { @@ -485,7 +484,7 @@ impl TransportConnection { &mut self, method: Method, header: ContentHeader, - payloads: TinyVec<[Bytes; 1]>, + payloads: SingleVec, channel: ChannelNum, ) -> Result<()> { // The only method with content that is sent to the server is Basic.Publish.