route topics

This commit is contained in:
nora 2022-03-20 18:31:41 +01:00
parent 58de7f1e2d
commit 43d0ce05dc
10 changed files with 98 additions and 49 deletions

18
Cargo.lock generated
View file

@ -442,8 +442,8 @@ dependencies = [
"haesli_datastructure", "haesli_datastructure",
"parking_lot", "parking_lot",
"rand", "rand",
"smallvec",
"thiserror", "thiserror",
"tinyvec",
"tokio", "tokio",
"uuid", "uuid",
] ]
@ -494,7 +494,6 @@ dependencies = [
"rand", "rand",
"regex", "regex",
"thiserror", "thiserror",
"tinyvec",
"tokio", "tokio",
"tracing", "tracing",
] ]
@ -1291,21 +1290,6 @@ dependencies = [
"serde_json", "serde_json",
] ]
[[package]]
name = "tinyvec"
version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c1c1d5a42b6245520c249549ec267180beaffcc0615401ac8e31853d4b6d8d2"
dependencies = [
"tinyvec_macros",
]
[[package]]
name = "tinyvec_macros"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]] [[package]]
name = "tokio" name = "tokio"
version = "1.17.0" version = "1.17.0"

View file

@ -10,7 +10,7 @@ haesli_datastructure = { path = "../haesli_datastructure" }
bytes = "1.1.0" bytes = "1.1.0"
parking_lot = "0.12.0" parking_lot = "0.12.0"
rand = "0.8.5" rand = "0.8.5"
tinyvec = { version = "1.5.1", features = ["alloc", "rustc_1_55"] } smallvec = { version = "1.8.0", features = ["union"] }
thiserror = "1.0.30" thiserror = "1.0.30"
tokio = { version = "1.17.0", features = ["sync"] } tokio = { version = "1.17.0", features = ["sync"] }
uuid = "0.8.2" uuid = "0.8.2"

View file

@ -7,13 +7,12 @@ use std::{
use bytes::Bytes; use bytes::Bytes;
use parking_lot::Mutex; use parking_lot::Mutex;
use tinyvec::TinyVec;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use crate::{ use crate::{
consumer::Consumer, consumer::Consumer,
methods::{self, Method}, methods::{self, Method},
newtype_id, GlobalData, Queue, newtype_id, GlobalData, Queue, SingleVec,
}; };
newtype_id!(pub ConnectionId); newtype_id!(pub ConnectionId);
@ -67,7 +66,7 @@ pub struct ConnectionInner {
pub enum ConnectionEvent { pub enum ConnectionEvent {
Shutdown, Shutdown,
Method(ChannelNum, Box<Method>), Method(ChannelNum, Box<Method>),
MethodContent(ChannelNum, Box<Method>, ContentHeader, TinyVec<[Bytes; 1]>), MethodContent(ChannelNum, Box<Method>, ContentHeader, SingleVec<Bytes>),
} }
pub type ConEventSender = mpsc::Sender<ConnectionEvent>; pub type ConEventSender = mpsc::Sender<ConnectionEvent>;

View file

@ -2,6 +2,13 @@ use std::{borrow::Borrow, collections::HashMap, sync::Arc};
use crate::{newtype, Queue}; use crate::{newtype, Queue};
#[derive(Debug)]
pub enum TopicSegment {
Word(String),
SingleWildcard,
MultiWildcard,
}
#[derive(Debug)] #[derive(Debug)]
pub enum ExchangeType { pub enum ExchangeType {
/// Routes a message to a queue if the routing-keys are equal /// Routes a message to a queue if the routing-keys are equal
@ -9,7 +16,7 @@ pub enum ExchangeType {
/// Always routes the message to a queue /// Always routes the message to a queue
Fanout { bindings: Vec<Queue> }, Fanout { bindings: Vec<Queue> },
/// Routes a message to a queue if the routing key matches the pattern /// Routes a message to a queue if the routing key matches the pattern
Topic { bindings: Vec<(String, Queue)> }, Topic { bindings: Vec<(Vec<TopicSegment>, Queue)> },
/// Is bound with a table of headers and values, and matches if the message headers /// Is bound with a table of headers and values, and matches if the message headers
/// match up with the binding headers /// match up with the binding headers
/// ///

View file

@ -25,6 +25,8 @@ use crate::{
queue::{Queue, QueueName}, queue::{Queue, QueueName},
}; };
pub type SingleVec<T> = smallvec::SmallVec<[T; 1]>;
#[derive(Clone)] #[derive(Clone)]
// todo: what if this was downstream? // todo: what if this was downstream?
pub struct GlobalData { pub struct GlobalData {

View file

@ -1,9 +1,8 @@
use std::sync::Arc; use std::sync::Arc;
use bytes::Bytes; use bytes::Bytes;
use tinyvec::TinyVec;
use crate::{connection::ContentHeader, newtype_id}; use crate::{connection::ContentHeader, newtype_id, SingleVec};
pub type Message = Arc<MessageInner>; pub type Message = Arc<MessageInner>;
@ -14,7 +13,7 @@ pub struct MessageInner {
pub id: MessageId, pub id: MessageId,
pub header: ContentHeader, pub header: ContentHeader,
pub routing: RoutingInformation, pub routing: RoutingInformation,
pub content: TinyVec<[Bytes; 1]>, pub content: SingleVec<Bytes>,
} }
#[derive(Debug)] #[derive(Debug)]

View file

@ -1,3 +1,5 @@
use std::sync::Arc;
use haesli_core::{ use haesli_core::{
amqp_todo, amqp_todo,
connection::Channel, connection::Channel,
@ -29,16 +31,17 @@ pub fn publish(channel_handle: Channel, message: Message) -> Result<()> {
.get(exchange.as_str()) .get(exchange.as_str())
.ok_or(ChannelException::NotFound)?; .ok_or(ChannelException::NotFound)?;
let queue = routing::route_message(exchange, &message.routing.routing_key) let queues = routing::route_message(exchange, &message.routing.routing_key)
.ok_or(ChannelException::NotFound)?; .ok_or(ChannelException::NotFound)?; // todo this isn't really correct but the tests pass ✔️
queue
.event_send
.try_send(QueueEvent::PublishMessage(message))
.map_err(|err| {
error!(?err, "Failed to send message to queue event queue");
ConException::InternalError
})?;
for queue in queues {
queue
.event_send
.try_send(QueueEvent::PublishMessage(Arc::clone(&message)))
.map_err(|err| {
error!(?err, "Failed to send message to queue event queue");
ConException::InternalError
})?;
}
Ok(()) Ok(())
} }

View file

@ -1,30 +1,87 @@
use std::sync::Arc;
use haesli_core::{ use haesli_core::{
exchange::{Exchange, ExchangeType}, exchange::{Exchange, ExchangeType, TopicSegment},
queue::Queue, queue::Queue,
}; };
fn parse_topic(topic: &str) -> Vec<TopicSegment> {
topic
.split(".")
.map(|segment| match segment {
"*" => TopicSegment::SingleWildcard,
"#" => TopicSegment::MultiWildcard,
word => TopicSegment::Word(word.to_owned()),
})
.collect()
}
pub fn bind(exchange: &mut Exchange, routing_key: String, queue: Queue) { pub fn bind(exchange: &mut Exchange, routing_key: String, queue: Queue) {
match &mut exchange.kind { match &mut exchange.kind {
ExchangeType::Direct { bindings } => { ExchangeType::Direct { bindings } => {
bindings.insert(routing_key, queue); bindings.insert(routing_key, queue);
} }
ExchangeType::Fanout { bindings } => bindings.push(queue), ExchangeType::Fanout { bindings } => bindings.push(queue),
ExchangeType::Topic { bindings } => bindings.push((routing_key, queue)), ExchangeType::Topic { bindings } => bindings.push((parse_topic(&routing_key), queue)),
ExchangeType::Headers => {} // unsupported ExchangeType::Headers => {} // unsupported
ExchangeType::System => {} // unsupported ExchangeType::System => {} // unsupported
} }
} }
/// Route a message to a queue. Returns the queue to send it to, or `None` if it can't be matched /// Route a message to a queue. Returns the queue to send it to, or `None` if it can't be matched
pub fn route_message(exchange: &Exchange, routing_key: &str) -> Option<Queue> { pub fn route_message(exchange: &Exchange, routing_key: &str) -> Option<Vec<Queue>> {
match &exchange.kind { match &exchange.kind {
ExchangeType::Direct { bindings } => { ExchangeType::Direct { bindings } => {
// 3.1.3.1 - routing-key = routing-key // 3.1.3.1 - routing-key = routing-key
bindings.get(routing_key).cloned() bindings.get(routing_key).cloned().map(|q| vec![q])
}
ExchangeType::Fanout { bindings } => {
// 3.1.3.2 - unconditionally
Some(bindings.clone()) // see, this is actually Not That Bad I Hope
}
ExchangeType::Topic { bindings } => {
let topic = parse_topic(routing_key);
// todo: optimizing this is a fun problem
Some(match_topic(bindings, topic))
} }
ExchangeType::Fanout { .. } => None,
ExchangeType::Topic { .. } => None,
ExchangeType::Headers => None, // unsupported ExchangeType::Headers => None, // unsupported
ExchangeType::System => None, // unsupported ExchangeType::System => None, // unsupported
} }
} }
fn match_topic<Q: Clone>(
patterns: &[(Vec<TopicSegment>, Q)],
routing_key: Vec<TopicSegment>,
) -> Vec<Q> {
patterns
.iter()
.filter_map(|(pattern, value)| {
let mut queue_segments = routing_key.iter();
for segment in pattern {}
Some(value.clone())
})
.collect()
}
#[cfg(test)]
mod tests {
use crate::routing::{match_topic, parse_topic};
macro_rules! match_topics {
(patterns: $($pattern:expr),*) => {};
}
#[test]
#[ignore]
fn match_empty_topic() {
let patterns = [(parse_topic(""), 1), (parse_topic("BAD"), 2)];
let routing_key = parse_topic("");
let matched = match_topic(&patterns, routing_key);
assert_eq!(matched, vec![1])
}
}

View file

@ -14,7 +14,6 @@ nom = "7.1.0"
once_cell = "1.9.0" once_cell = "1.9.0"
rand = "0.8.4" rand = "0.8.4"
regex = "1.5.4" regex = "1.5.4"
tinyvec = { version = "1.5.1", features = ["alloc", "rustc_1_55"] }
thiserror = "1.0.30" thiserror = "1.0.30"
tokio = { version = "1.16.1", features = ["full"] } tokio = { version = "1.16.1", features = ["full"] }
tracing = "0.1.30" tracing = "0.1.30"

View file

@ -15,9 +15,8 @@ use haesli_core::{
ConnectionCloseOk, ConnectionOpen, ConnectionOpenOk, ConnectionStart, ConnectionStartOk, ConnectionCloseOk, ConnectionOpen, ConnectionOpenOk, ConnectionStart, ConnectionStartOk,
ConnectionTune, ConnectionTuneOk, FieldValue, Longstr, Method, ReplyCode, ReplyText, Table, ConnectionTune, ConnectionTuneOk, FieldValue, Longstr, Method, ReplyCode, ReplyText, Table,
}, },
GlobalData, GlobalData, SingleVec,
}; };
use tinyvec::TinyVec;
use tokio::{ use tokio::{
io::{AsyncReadExt, AsyncWriteExt}, io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream, net::TcpStream,
@ -75,7 +74,7 @@ const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
enum ChannelStatus { enum ChannelStatus {
Default, Default,
NeedHeader(u16, Box<Method>), NeedHeader(u16, Box<Method>),
NeedsBody(Box<Method>, ContentHeader, TinyVec<[Bytes; 1]>), NeedsBody(Box<Method>, ContentHeader, SingleVec<Bytes>),
} }
impl ChannelStatus { impl ChannelStatus {
@ -166,7 +165,7 @@ impl TransportConnection {
channel: ChannelNum, channel: ChannelNum,
method: &Method, method: &Method,
header: ContentHeader, header: ContentHeader,
body: &TinyVec<[Bytes; 1]>, body: &SingleVec<Bytes>,
) -> Result<()> { ) -> Result<()> {
self.send_method(channel, method).await?; self.send_method(channel, method).await?;
@ -177,7 +176,7 @@ impl TransportConnection {
self.send_bodies(channel, body).await self.send_bodies(channel, body).await
} }
async fn send_bodies(&mut self, channel: ChannelNum, body: &TinyVec<[Bytes; 1]>) -> Result<()> { async fn send_bodies(&mut self, channel: ChannelNum, body: &SingleVec<Bytes>) -> Result<()> {
// this is inefficient if it's a huge message sent by a client with big frames to one with // this is inefficient if it's a huge message sent by a client with big frames to one with
// small frames // small frames
// we assume that this won't happen that that the first branch will be taken in most cases, // we assume that this won't happen that that the first branch will be taken in most cases,
@ -438,7 +437,7 @@ impl TransportConnection {
let header = parse_content_header(&frame.payload)?; let header = parse_content_header(&frame.payload)?;
ensure_conn(header.class_id == class_id)?; ensure_conn(header.class_id == class_id)?;
channel.status = ChannelStatus::NeedsBody(method, header, TinyVec::new()); channel.status = ChannelStatus::NeedsBody(method, header, SingleVec::new());
Ok(()) Ok(())
} }
ChannelStatus::NeedsBody(_, _, _) => { ChannelStatus::NeedsBody(_, _, _) => {
@ -485,7 +484,7 @@ impl TransportConnection {
&mut self, &mut self,
method: Method, method: Method,
header: ContentHeader, header: ContentHeader,
payloads: TinyVec<[Bytes; 1]>, payloads: SingleVec<Bytes>,
channel: ChannelNum, channel: ChannelNum,
) -> Result<()> { ) -> Result<()> {
// The only method with content that is sent to the server is Basic.Publish. // The only method with content that is sent to the server is Basic.Publish.