implement binding

This commit is contained in:
nora 2022-03-26 13:02:39 +01:00
parent 6f6c0848ac
commit a1fa68396f
9 changed files with 181 additions and 77 deletions

View file

@ -46,11 +46,9 @@ impl Borrow<str> for ExchangeName {
pub struct Exchange { pub struct Exchange {
pub name: ExchangeName, pub name: ExchangeName,
pub kind: ExchangeType, pub kind: ExchangeType,
pub durable: bool,
} }
#[derive(Debug)]
pub struct Binding {}
pub fn default_exchanges() -> HashMap<ExchangeName, Exchange> { pub fn default_exchanges() -> HashMap<ExchangeName, Exchange> {
// 3.1.3 - The spec requires a few default exchanges to exist // 3.1.3 - The spec requires a few default exchanges to exist
@ -60,6 +58,7 @@ pub fn default_exchanges() -> HashMap<ExchangeName, Exchange> {
kind: ExchangeType::Direct { kind: ExchangeType::Direct {
bindings: HashMap::new(), bindings: HashMap::new(),
}, },
durable: true,
}; };
let direct_name = ExchangeName::new("amqp.direct".to_owned().into()); let direct_name = ExchangeName::new("amqp.direct".to_owned().into());
@ -68,6 +67,7 @@ pub fn default_exchanges() -> HashMap<ExchangeName, Exchange> {
kind: ExchangeType::Direct { kind: ExchangeType::Direct {
bindings: HashMap::new(), bindings: HashMap::new(),
}, },
durable: true,
}; };
let fanout_name = ExchangeName::new("amqp.fanout".to_owned().into()); let fanout_name = ExchangeName::new("amqp.fanout".to_owned().into());
@ -76,6 +76,7 @@ pub fn default_exchanges() -> HashMap<ExchangeName, Exchange> {
kind: ExchangeType::Fanout { kind: ExchangeType::Fanout {
bindings: Vec::new(), bindings: Vec::new(),
}, },
durable: true,
}; };
let topic_name = ExchangeName::new("amqp.topic".to_owned().into()); let topic_name = ExchangeName::new("amqp.topic".to_owned().into());
@ -84,6 +85,7 @@ pub fn default_exchanges() -> HashMap<ExchangeName, Exchange> {
kind: ExchangeType::Topic { kind: ExchangeType::Topic {
bindings: Vec::new(), bindings: Vec::new(),
}, },
durable: true,
}; };
// we don't implement headers (yet), so don't provide the default exchange for it // we don't implement headers (yet), so don't provide the default exchange for it

View file

@ -1,4 +1,4 @@
use std::sync::Arc; use std::{ops::Not, sync::Arc};
use haesli_core::{ use haesli_core::{
amqp_todo, amqp_todo,
@ -9,9 +9,9 @@ use haesli_core::{
}; };
use tracing::info; use tracing::info;
use crate::Result; use crate::methods::MethodResponse;
pub fn consume(channel: Channel, basic_consume: BasicConsume) -> Result<Method> { pub fn consume(channel: Channel, basic_consume: BasicConsume) -> MethodResponse {
let BasicConsume { let BasicConsume {
queue: queue_name, queue: queue_name,
consumer_tag, consumer_tag,
@ -22,7 +22,7 @@ pub fn consume(channel: Channel, basic_consume: BasicConsume) -> Result<Method>
.. ..
} = basic_consume; } = basic_consume;
if no_wait || no_local || exclusive || no_ack { if no_local || exclusive || no_ack {
amqp_todo!(); amqp_todo!();
} }
@ -54,7 +54,7 @@ pub fn consume(channel: Channel, basic_consume: BasicConsume) -> Result<Method>
info!(%queue_name, %consumer_tag, "Consumer started consuming"); info!(%queue_name, %consumer_tag, "Consumer started consuming");
let method = Method::BasicConsumeOk(BasicConsumeOk { consumer_tag }); Ok(no_wait
.not()
Ok(method) .then(|| Method::BasicConsumeOk(BasicConsumeOk { consumer_tag })))
} }

View file

@ -1,11 +1,70 @@
use std::{collections::HashMap, ops::Not};
use haesli_core::{ use haesli_core::{
amqp_todo, amqp_todo,
connection::Channel, connection::Channel,
methods::{ExchangeDeclare, Method}, error::ConException,
exchange::{Exchange, ExchangeName, ExchangeType},
methods::{ExchangeDeclare, ExchangeDeclareOk, Method},
}; };
use tracing::info;
use crate::Result; use crate::methods::MethodResponse;
pub fn declare(_channel: Channel, _exchange_declare: ExchangeDeclare) -> Result<Method> { fn parse_exchange_type(str: &str) -> Option<ExchangeType> {
amqp_todo!() match str {
"direct" => Some(ExchangeType::Direct {
bindings: HashMap::new(),
}),
"fanout" => Some(ExchangeType::Fanout {
bindings: Vec::new(),
}),
"topic" => Some(ExchangeType::Topic {
bindings: Vec::new(),
}),
_ => None,
}
}
pub fn declare(channel: Channel, exchange_declare: ExchangeDeclare) -> MethodResponse {
let ExchangeDeclare {
exchange: name,
r#type: kind,
passive,
durable,
no_wait,
arguments,
..
} = exchange_declare;
if !arguments.is_empty() {
amqp_todo!();
}
// todo: implement durable
if passive {
amqp_todo!();
}
let name = ExchangeName::new(name.into());
let kind = parse_exchange_type(&kind).ok_or(ConException::CommandInvalid)?;
info!(%name, "Creating exchange");
let exchange = Exchange {
name: name.clone(),
durable,
kind,
};
{
let mut global_data = channel.global_data.lock();
global_data.exchanges.entry(name).or_insert(exchange);
}
Ok(no_wait
.not()
.then(|| Method::ExchangeDeclareOk(ExchangeDeclareOk)))
} }

View file

@ -9,9 +9,11 @@ use tracing::{info, warn};
use crate::Result; use crate::Result;
type MethodResponse = Result<Option<Method>>;
/// This is the entrypoint of methods not handled by the connection itself. /// This is the entrypoint of methods not handled by the connection itself.
/// Note that Basic.Publish is *not* sent here, but to [`handle_basic_publish`](crate::handle_basic_publish) /// Note that Basic.Publish is *not* sent here, but to [`handle_basic_publish`](crate::handle_basic_publish)
pub fn handle_method(channel: Channel, method: Method) -> Result<Method> { pub fn handle_method(channel: Channel, method: Method) -> Result<Option<Method>> {
use Method::*; use Method::*;
info!(?method, "Handling method"); info!(?method, "Handling method");

View file

@ -1,7 +1,6 @@
use std::sync::Arc; use std::sync::Arc;
use haesli_core::{ use haesli_core::{
amqp_todo,
connection::Channel, connection::Channel,
error::{ChannelException, ConException}, error::{ChannelException, ConException},
message::Message, message::Message,
@ -18,21 +17,17 @@ pub fn publish(channel_handle: Channel, message: Message) -> Result<()> {
let routing = &message.routing; let routing = &message.routing;
if !routing.exchange.is_empty() {
amqp_todo!();
}
let global_data = global_data.lock(); let global_data = global_data.lock();
let exchange = &message.routing.exchange; let exchange = &routing.exchange;
let exchange = global_data let exchange = global_data
.exchanges .exchanges
.get(exchange.as_str()) .get(exchange.as_str())
.ok_or(ChannelException::NotFound)?; .ok_or(ChannelException::NotFound)?;
let queues = routing::route_message(exchange, &message.routing.routing_key) let queues =
.ok_or(ChannelException::NotFound)?; // todo this isn't really correct but the tests pass ✔️ routing::route_message(exchange, &routing.routing_key).ok_or(ChannelException::NotFound)?; // todo this isn't really correct but the tests pass ✔️
for queue in queues { for queue in queues {
queue queue

View file

@ -1,20 +1,23 @@
use std::sync::{atomic::AtomicUsize, Arc}; use std::{
ops::Not,
sync::{atomic::AtomicUsize, Arc},
};
use haesli_core::{ use haesli_core::{
amqp_todo, amqp_todo,
connection::Channel, connection::Channel,
error::ChannelException, error::ChannelException,
methods::{Method, QueueBind, QueueDeclare, QueueDeclareOk}, methods::{Method, QueueBind, QueueBindOk, QueueDeclare, QueueDeclareOk},
queue::{QueueDeletion, QueueId, QueueInner, QueueName}, queue::{Queue, QueueDeletion, QueueId, QueueInner, QueueName},
GlobalData, GlobalData,
}; };
use parking_lot::Mutex; use parking_lot::Mutex;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tracing::debug; use tracing::{debug, info};
use crate::{queue_worker::QueueTask, routing, Result}; use crate::{methods::MethodResponse, queue_worker::QueueTask, routing, Result};
pub fn declare(channel: Channel, queue_declare: QueueDeclare) -> Result<Method> { pub fn declare(channel: Channel, queue_declare: QueueDeclare) -> MethodResponse {
let QueueDeclare { let QueueDeclare {
queue: queue_name, queue: queue_name,
passive, passive,
@ -41,12 +44,23 @@ pub fn declare(channel: Channel, queue_declare: QueueDeclare) -> Result<Method>
// todo: implement durable, not checked here because it's the amqplib default // todo: implement durable, not checked here because it's the amqplib default
if passive || no_wait { if passive {
amqp_todo!(); amqp_todo!();
} }
let global_data = channel.global_data.clone(); let global_data = channel.global_data.clone();
let queue = {
let global_data_lock = global_data.lock();
global_data_lock.queues.get(&queue_name).cloned()
};
let queue = if let Some(queue) = queue {
debug!(%queue_name, "Declaring queue that already exists");
queue
} else {
info!(%queue_name, "Creating queue");
let (event_send, event_recv) = mpsc::channel(10); let (event_send, event_recv) = mpsc::channel(10);
let id = QueueId::random(); let id = QueueId::random();
@ -65,44 +79,77 @@ pub fn declare(channel: Channel, queue_declare: QueueDeclare) -> Result<Method>
event_send, event_send,
}); });
debug!(%queue_name, "Creating queue"); bind_queue(
global_data.clone(),
queue.clone(),
"",
queue_name.to_string(),
)?;
{ {
let mut global_data_lock = global_data.lock(); let mut global_data_lock = global_data.lock();
global_data_lock global_data_lock
.queues .queues
.entry(queue_name.clone()) .insert(queue_name.clone(), queue.clone());
.or_insert_with(|| queue.clone());
} }
bind_queue(global_data.clone(), "", queue_name.to_string())?; let queue_task = QueueTask::new(global_data, event_recv, queue.clone());
let queue_task = QueueTask::new(global_data, event_recv, queue);
tokio::spawn(async move { queue_task.start().await }); tokio::spawn(async move { queue_task.start().await });
Ok(Method::QueueDeclareOk(QueueDeclareOk { queue
};
Ok(no_wait.not().then(|| {
Method::QueueDeclareOk(QueueDeclareOk {
queue: queue_name.to_string(), queue: queue_name.to_string(),
message_count: 0, message_count: u32::try_from(queue.messages.len()).unwrap(),
consumer_count: 0, consumer_count: u32::try_from(queue.consumers.lock().len()).unwrap(),
})
})) }))
} }
pub fn bind(_channel_handle: Channel, _queue_bind: QueueBind) -> Result<Method> { pub fn bind(channel_handle: Channel, queue_bind: QueueBind) -> MethodResponse {
let QueueBind {
queue,
exchange,
routing_key,
no_wait,
arguments,
..
} = queue_bind;
if !arguments.is_empty() {
amqp_todo!(); amqp_todo!();
}
let queue = {
let global_data = channel_handle.global_data.lock();
global_data
.queues
.get(queue.as_str())
.ok_or(ChannelException::NotFound)?
.clone()
};
bind_queue(
channel_handle.global_data.clone(),
queue,
&exchange,
routing_key,
)?;
Ok(no_wait.not().then(|| Method::QueueBindOk(QueueBindOk)))
} }
fn bind_queue(global_data: GlobalData, exchange: &str, routing_key: String) -> Result<()> { fn bind_queue(
global_data: GlobalData,
queue: Queue,
exchange: &str,
routing_key: String,
) -> Result<()> {
let mut global_data = global_data.lock(); let mut global_data = global_data.lock();
// todo: don't
let queue = global_data
.queues
.get(&QueueName::new(routing_key.clone().into()))
.unwrap()
.clone();
let exchange = global_data let exchange = global_data
.exchanges .exchanges
.get_mut(exchange) .get_mut(exchange)

View file

@ -26,7 +26,7 @@ pub fn bind(exchange: &mut Exchange, routing_key: String, queue: Queue) {
} }
} }
/// Route a message to a queue. Returns the queue to send it to, or `None` if it can't be matched /// Route a message to a queue. Returns the queues to send it to, or `None` if it can't be matched
pub fn route_message(exchange: &Exchange, routing_key: &str) -> Option<Vec<Queue>> { pub fn route_message(exchange: &Exchange, routing_key: &str) -> Option<Vec<Queue>> {
match &exchange.kind { match &exchange.kind {
ExchangeType::Direct { bindings } => { ExchangeType::Direct { bindings } => {

View file

@ -419,12 +419,11 @@ impl TransportConnection {
// call into haesli_messaging to handle the method // call into haesli_messaging to handle the method
// it returns the response method that we are supposed to send // it returns the response method that we are supposed to send
// maybe this might become an `Option` in the future
let return_method = (self.handlers.handle_method)(channel_handle, method)?; let return_method = (self.handlers.handle_method)(channel_handle, method)?;
//let return_method = if let Some(method) = return_method {
// haesli_messaging::methods::handle_method(channel_handle, method).await?; self.send_method(frame.channel, &method).await?;
self.send_method(frame.channel, &return_method).await?; }
} }
} }
Ok(()) Ok(())

View file

@ -28,7 +28,7 @@ use crate::connection::TransportConnection;
#[derive(Clone, Copy)] #[derive(Clone, Copy)]
pub struct Handlers { pub struct Handlers {
pub handle_method: fn(Channel, Method) -> Result<Method, ProtocolError>, pub handle_method: fn(Channel, Method) -> Result<Option<Method>, ProtocolError>,
pub handle_basic_publish: fn(Channel, Message) -> Result<(), ProtocolError>, pub handle_basic_publish: fn(Channel, Message) -> Result<(), ProtocolError>,
} }