add exchanges to internal model

This commit is contained in:
nora 2022-03-20 17:52:41 +01:00
parent 504757b324
commit 58de7f1e2d
15 changed files with 169 additions and 40 deletions

View file

@ -2,15 +2,14 @@ mod consume;
mod publish;
mod queue;
use haesli_core::{amqp_todo, connection::Channel, message::Message, methods::Method};
use haesli_core::{amqp_todo, connection::Channel, methods::Method};
pub use publish::publish;
use tracing::info;
use crate::Result;
pub fn handle_basic_publish(channel_handle: Channel, message: Message) -> Result<()> {
publish::publish(channel_handle, message)
}
/// This is the entrypoint of methods not handled by the connection itself.
/// Note that Basic.Publish is *not* sent here, but to [`handle_basic_publish`](crate::handle_basic_publish)
pub async fn handle_method(channel_handle: Channel, method: Method) -> Result<Method> {
info!(?method, "Handling method");

View file

@ -7,7 +7,7 @@ use haesli_core::{
};
use tracing::{debug, error};
use crate::Result;
use crate::{routing, Result};
pub fn publish(channel_handle: Channel, message: Message) -> Result<()> {
debug!(?message, "Publishing message");
@ -22,9 +22,14 @@ pub fn publish(channel_handle: Channel, message: Message) -> Result<()> {
let global_data = global_data.lock();
let queue = global_data
.queues
.get(routing.routing_key.as_str())
let exchange = &message.routing.exchange;
let exchange = global_data
.exchanges
.get(exchange.as_str())
.ok_or(ChannelException::NotFound)?;
let queue = routing::route_message(exchange, &message.routing.routing_key)
.ok_or(ChannelException::NotFound)?;
queue

View file

@ -11,7 +11,7 @@ use parking_lot::Mutex;
use tokio::sync::mpsc;
use tracing::debug;
use crate::{queue_worker::QueueTask, Result};
use crate::{queue_worker::QueueTask, routing, Result};
pub fn declare(channel: Channel, queue_declare: QueueDeclare) -> Result<Method> {
let QueueDeclare {
@ -75,7 +75,7 @@ pub fn declare(channel: Channel, queue_declare: QueueDeclare) -> Result<Method>
.or_insert_with(|| queue.clone());
}
bind_queue(global_data.clone(), (), queue_name.clone().into_inner())?;
bind_queue(global_data.clone(), (), queue_name.to_string())?;
let queue_task = QueueTask::new(global_data, event_recv, queue);
@ -92,18 +92,22 @@ pub async fn bind(_channel_handle: Channel, _queue_bind: QueueBind) -> Result<Me
amqp_todo!();
}
fn bind_queue(global_data: GlobalData, _exchange: (), routing_key: Arc<str>) -> Result<()> {
fn bind_queue(global_data: GlobalData, _exchange: (), routing_key: String) -> Result<()> {
let mut global_data = global_data.lock();
// todo: don't
let queue = global_data
.queues
.get(&QueueName::new(routing_key.clone()))
.get(&QueueName::new(routing_key.clone().into()))
.unwrap()
.clone();
global_data
.default_exchange
.insert(routing_key.to_string(), queue);
let exchange = global_data
.exchanges
.get_mut("")
.expect("default empty exchange");
routing::bind(exchange, routing_key, queue);
Ok(())
}