diff --git a/Cargo.lock b/Cargo.lock index cc665b2..b5689d3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -427,6 +427,7 @@ dependencies = [ "clap 3.1.6", "haesli_core", "haesli_dashboard", + "haesli_messaging", "haesli_transport", "tokio", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 99b5f86..0ebb9a7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ edition = "2021" anyhow = "1.0.53" haesli_core = { path = "./haesli_core" } haesli_dashboard = { path = "./haesli_dashboard" } +haesli_messaging = { path = "./haesli_messaging" } haesli_transport = { path = "./haesli_transport" } clap = { version = "3.1.5", features = ["derive"] } tokio = { version = "1.16.1", features = ["full"] } diff --git a/haesli_core/src/exchange.rs b/haesli_core/src/exchange.rs index 7e2f361..45df71d 100644 --- a/haesli_core/src/exchange.rs +++ b/haesli_core/src/exchange.rs @@ -7,7 +7,7 @@ pub enum TopicSegment { Word(String), SingleWildcard, MultiWildcard, -} +} #[derive(Debug)] pub enum ExchangeType { @@ -16,7 +16,9 @@ pub enum ExchangeType { /// Always routes the message to a queue Fanout { bindings: Vec }, /// Routes a message to a queue if the routing key matches the pattern - Topic { bindings: Vec<(Vec, Queue)> }, + Topic { + bindings: Vec<(Vec, Queue)>, + }, /// Is bound with a table of headers and values, and matches if the message headers /// match up with the binding headers /// diff --git a/haesli_messaging/src/methods/mod.rs b/haesli_messaging/src/methods/mod.rs index 91dc18f..6abca3f 100644 --- a/haesli_messaging/src/methods/mod.rs +++ b/haesli_messaging/src/methods/mod.rs @@ -10,7 +10,7 @@ use crate::Result; /// This is the entrypoint of methods not handled by the connection itself. /// Note that Basic.Publish is *not* sent here, but to [`handle_basic_publish`](crate::handle_basic_publish) -pub async fn handle_method(channel_handle: Channel, method: Method) -> Result { +pub fn handle_method(channel_handle: Channel, method: Method) -> Result { info!(?method, "Handling method"); let response = match method { @@ -20,7 +20,7 @@ pub async fn handle_method(channel_handle: Channel, method: Method) -> Result amqp_todo!(), Method::QueueDeclare(queue_declare) => queue::declare(channel_handle, queue_declare)?, Method::QueueDeclareOk { .. } => amqp_todo!(), - Method::QueueBind(queue_bind) => queue::bind(channel_handle, queue_bind).await?, + Method::QueueBind(queue_bind) => queue::bind(channel_handle, queue_bind)?, Method::QueueBindOk(_) => amqp_todo!(), Method::QueueUnbind { .. } => amqp_todo!(), Method::QueueUnbindOk(_) => amqp_todo!(), diff --git a/haesli_messaging/src/methods/queue.rs b/haesli_messaging/src/methods/queue.rs index bfdbcb5..6f10a3e 100644 --- a/haesli_messaging/src/methods/queue.rs +++ b/haesli_messaging/src/methods/queue.rs @@ -88,7 +88,7 @@ pub fn declare(channel: Channel, queue_declare: QueueDeclare) -> Result })) } -pub async fn bind(_channel_handle: Channel, _queue_bind: QueueBind) -> Result { +pub fn bind(_channel_handle: Channel, _queue_bind: QueueBind) -> Result { amqp_todo!(); } diff --git a/haesli_transport/src/connection.rs b/haesli_transport/src/connection.rs index ac5d20a..084183c 100644 --- a/haesli_transport/src/connection.rs +++ b/haesli_transport/src/connection.rs @@ -27,7 +27,7 @@ use tracing::{debug, error, info, trace, warn}; use crate::{ error::{ConException, ProtocolError, Result, TransError}, frame::{self, parse_content_header, Frame, FrameType, MaxFrameSize}, - methods, sasl, + methods, sasl, Handlers, }; fn ensure_conn(condition: bool) -> Result<()> { @@ -67,6 +67,8 @@ pub struct TransportConnection { event_sender: ConEventSender, /// To receive events from other futures event_receiver: ConEventReceiver, + + handlers: Handlers, } const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); @@ -91,6 +93,7 @@ impl TransportConnection { global_data: GlobalData, method_queue_send: ConEventSender, method_queue_recv: ConEventReceiver, + handlers: Handlers, ) -> Self { Self { id, @@ -104,6 +107,7 @@ impl TransportConnection { global_data, event_sender: method_queue_send, event_receiver: method_queue_recv, + handlers, } } @@ -416,8 +420,10 @@ impl TransportConnection { // call into haesli_messaging to handle the method // it returns the response method that we are supposed to send // maybe this might become an `Option` in the future - let return_method = - haesli_messaging::methods::handle_method(channel_handle, method).await?; + let return_method = (self.handlers.handle_method)(channel_handle, method)?; + + //let return_method = + // haesli_messaging::methods::handle_method(channel_handle, method).await?; self.send_method(frame.channel, &return_method).await?; } } @@ -513,7 +519,8 @@ impl TransportConnection { let channel = self.channels.get(&channel).ok_or(ConException::Todo)?; - haesli_messaging::methods::publish(channel.global_chan.clone(), message)?; + (self.handlers.handle_basic_publish)(channel.global_chan.clone(), message)?; + //haesli_messaging::methods::publish(channel.global_chan.clone(), message)?; Ok(()) } else { Err(ConException::Todo.into()) diff --git a/haesli_transport/src/lib.rs b/haesli_transport/src/lib.rs index 2dae1ec..830b82b 100644 --- a/haesli_transport/src/lib.rs +++ b/haesli_transport/src/lib.rs @@ -13,18 +13,32 @@ mod tests; use std::{future::Future, net::SocketAddr}; use anyhow::Context; -use haesli_core::{connection::ConnectionEvent, queue::QueueEvent, GlobalData}; +use haesli_core::{ + connection::{Channel, ConnectionEvent}, + error::ProtocolError, + message::Message, + methods::Method, + queue::QueueEvent, + GlobalData, +}; use tokio::{net, net::TcpStream, select}; use tracing::{info, info_span, Instrument}; use crate::connection::TransportConnection; -pub async fn do_thing_i_guess( +#[derive(Clone, Copy)] +pub struct Handlers { + pub handle_method: fn(Channel, Method) -> Result, + pub handle_basic_publish: fn(Channel, Message) -> Result<(), ProtocolError>, +} + +pub async fn connection_loop( global_data: GlobalData, terminate: impl Future + Send, + handlers: Handlers, ) -> anyhow::Result<()> { select! { - res = accept_cons(global_data.clone()) => { + res = accept_cons(global_data.clone(), handlers) => { res } _ = terminate => { @@ -33,18 +47,18 @@ pub async fn do_thing_i_guess( } } -async fn accept_cons(global_data: GlobalData) -> anyhow::Result<()> { +async fn accept_cons(global_data: GlobalData, handlers: Handlers) -> anyhow::Result<()> { info!("Binding TCP listener..."); let listener = net::TcpListener::bind(("127.0.0.1", 5672)).await?; info!(addr = ?listener.local_addr()?, "Successfully bound TCP listener"); loop { let connection = listener.accept().await?; - handle_con(global_data.clone(), connection); + handle_con(global_data.clone(), connection, handlers); } } -fn handle_con(global_data: GlobalData, connection: (TcpStream, SocketAddr)) { +fn handle_con(global_data: GlobalData, connection: (TcpStream, SocketAddr), handlers: Handlers) { let (stream, peer_addr) = connection; let id = rand::random(); @@ -72,6 +86,7 @@ fn handle_con(global_data: GlobalData, connection: (TcpStream, SocketAddr)) { global_data.clone(), method_send, method_recv, + handlers, ); tokio::spawn(connection.start_connection_processing().instrument(span)); diff --git a/src/main.rs b/src/main.rs index 3da9bbc..f903c85 100644 --- a/src/main.rs +++ b/src/main.rs @@ -32,7 +32,12 @@ async fn main() -> Result<()> { tokio::spawn(async move { haesli_dashboard::start_dashboard(global_data).await }); } - let res = haesli_transport::do_thing_i_guess(global_data, terminate()).await; + let handlers = haesli_transport::Handlers { + handle_method: haesli_messaging::methods::handle_method, + handle_basic_publish: haesli_messaging::methods::publish, + }; + + let res = haesli_transport::connection_loop(global_data, terminate(), handlers).await; info!("Bye!");