mirror of
https://github.com/Noratrieb/haesli.git
synced 2026-01-14 11:45:02 +01:00
inject dependency on haesli_messaging into haesli_transport to simplify dependency graph
This commit is contained in:
parent
92e3ac486b
commit
b4ecb6b5d4
8 changed files with 47 additions and 16 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
|
@ -427,6 +427,7 @@ dependencies = [
|
||||||
"clap 3.1.6",
|
"clap 3.1.6",
|
||||||
"haesli_core",
|
"haesli_core",
|
||||||
"haesli_dashboard",
|
"haesli_dashboard",
|
||||||
|
"haesli_messaging",
|
||||||
"haesli_transport",
|
"haesli_transport",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tracing",
|
"tracing",
|
||||||
|
|
|
||||||
|
|
@ -20,6 +20,7 @@ edition = "2021"
|
||||||
anyhow = "1.0.53"
|
anyhow = "1.0.53"
|
||||||
haesli_core = { path = "./haesli_core" }
|
haesli_core = { path = "./haesli_core" }
|
||||||
haesli_dashboard = { path = "./haesli_dashboard" }
|
haesli_dashboard = { path = "./haesli_dashboard" }
|
||||||
|
haesli_messaging = { path = "./haesli_messaging" }
|
||||||
haesli_transport = { path = "./haesli_transport" }
|
haesli_transport = { path = "./haesli_transport" }
|
||||||
clap = { version = "3.1.5", features = ["derive"] }
|
clap = { version = "3.1.5", features = ["derive"] }
|
||||||
tokio = { version = "1.16.1", features = ["full"] }
|
tokio = { version = "1.16.1", features = ["full"] }
|
||||||
|
|
|
||||||
|
|
@ -16,7 +16,9 @@ pub enum ExchangeType {
|
||||||
/// Always routes the message to a queue
|
/// Always routes the message to a queue
|
||||||
Fanout { bindings: Vec<Queue> },
|
Fanout { bindings: Vec<Queue> },
|
||||||
/// Routes a message to a queue if the routing key matches the pattern
|
/// Routes a message to a queue if the routing key matches the pattern
|
||||||
Topic { bindings: Vec<(Vec<TopicSegment>, Queue)> },
|
Topic {
|
||||||
|
bindings: Vec<(Vec<TopicSegment>, Queue)>,
|
||||||
|
},
|
||||||
/// Is bound with a table of headers and values, and matches if the message headers
|
/// Is bound with a table of headers and values, and matches if the message headers
|
||||||
/// match up with the binding headers
|
/// match up with the binding headers
|
||||||
///
|
///
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,7 @@ use crate::Result;
|
||||||
|
|
||||||
/// This is the entrypoint of methods not handled by the connection itself.
|
/// This is the entrypoint of methods not handled by the connection itself.
|
||||||
/// Note that Basic.Publish is *not* sent here, but to [`handle_basic_publish`](crate::handle_basic_publish)
|
/// Note that Basic.Publish is *not* sent here, but to [`handle_basic_publish`](crate::handle_basic_publish)
|
||||||
pub async fn handle_method(channel_handle: Channel, method: Method) -> Result<Method> {
|
pub fn handle_method(channel_handle: Channel, method: Method) -> Result<Method> {
|
||||||
info!(?method, "Handling method");
|
info!(?method, "Handling method");
|
||||||
|
|
||||||
let response = match method {
|
let response = match method {
|
||||||
|
|
@ -20,7 +20,7 @@ pub async fn handle_method(channel_handle: Channel, method: Method) -> Result<Me
|
||||||
Method::ExchangeDeleteOk(_) => amqp_todo!(),
|
Method::ExchangeDeleteOk(_) => amqp_todo!(),
|
||||||
Method::QueueDeclare(queue_declare) => queue::declare(channel_handle, queue_declare)?,
|
Method::QueueDeclare(queue_declare) => queue::declare(channel_handle, queue_declare)?,
|
||||||
Method::QueueDeclareOk { .. } => amqp_todo!(),
|
Method::QueueDeclareOk { .. } => amqp_todo!(),
|
||||||
Method::QueueBind(queue_bind) => queue::bind(channel_handle, queue_bind).await?,
|
Method::QueueBind(queue_bind) => queue::bind(channel_handle, queue_bind)?,
|
||||||
Method::QueueBindOk(_) => amqp_todo!(),
|
Method::QueueBindOk(_) => amqp_todo!(),
|
||||||
Method::QueueUnbind { .. } => amqp_todo!(),
|
Method::QueueUnbind { .. } => amqp_todo!(),
|
||||||
Method::QueueUnbindOk(_) => amqp_todo!(),
|
Method::QueueUnbindOk(_) => amqp_todo!(),
|
||||||
|
|
|
||||||
|
|
@ -88,7 +88,7 @@ pub fn declare(channel: Channel, queue_declare: QueueDeclare) -> Result<Method>
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn bind(_channel_handle: Channel, _queue_bind: QueueBind) -> Result<Method> {
|
pub fn bind(_channel_handle: Channel, _queue_bind: QueueBind) -> Result<Method> {
|
||||||
amqp_todo!();
|
amqp_todo!();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -27,7 +27,7 @@ use tracing::{debug, error, info, trace, warn};
|
||||||
use crate::{
|
use crate::{
|
||||||
error::{ConException, ProtocolError, Result, TransError},
|
error::{ConException, ProtocolError, Result, TransError},
|
||||||
frame::{self, parse_content_header, Frame, FrameType, MaxFrameSize},
|
frame::{self, parse_content_header, Frame, FrameType, MaxFrameSize},
|
||||||
methods, sasl,
|
methods, sasl, Handlers,
|
||||||
};
|
};
|
||||||
|
|
||||||
fn ensure_conn(condition: bool) -> Result<()> {
|
fn ensure_conn(condition: bool) -> Result<()> {
|
||||||
|
|
@ -67,6 +67,8 @@ pub struct TransportConnection {
|
||||||
event_sender: ConEventSender,
|
event_sender: ConEventSender,
|
||||||
/// To receive events from other futures
|
/// To receive events from other futures
|
||||||
event_receiver: ConEventReceiver,
|
event_receiver: ConEventReceiver,
|
||||||
|
|
||||||
|
handlers: Handlers,
|
||||||
}
|
}
|
||||||
|
|
||||||
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
|
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
|
||||||
|
|
@ -91,6 +93,7 @@ impl TransportConnection {
|
||||||
global_data: GlobalData,
|
global_data: GlobalData,
|
||||||
method_queue_send: ConEventSender,
|
method_queue_send: ConEventSender,
|
||||||
method_queue_recv: ConEventReceiver,
|
method_queue_recv: ConEventReceiver,
|
||||||
|
handlers: Handlers,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
id,
|
id,
|
||||||
|
|
@ -104,6 +107,7 @@ impl TransportConnection {
|
||||||
global_data,
|
global_data,
|
||||||
event_sender: method_queue_send,
|
event_sender: method_queue_send,
|
||||||
event_receiver: method_queue_recv,
|
event_receiver: method_queue_recv,
|
||||||
|
handlers,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -416,8 +420,10 @@ impl TransportConnection {
|
||||||
// call into haesli_messaging to handle the method
|
// call into haesli_messaging to handle the method
|
||||||
// it returns the response method that we are supposed to send
|
// it returns the response method that we are supposed to send
|
||||||
// maybe this might become an `Option` in the future
|
// maybe this might become an `Option` in the future
|
||||||
let return_method =
|
let return_method = (self.handlers.handle_method)(channel_handle, method)?;
|
||||||
haesli_messaging::methods::handle_method(channel_handle, method).await?;
|
|
||||||
|
//let return_method =
|
||||||
|
// haesli_messaging::methods::handle_method(channel_handle, method).await?;
|
||||||
self.send_method(frame.channel, &return_method).await?;
|
self.send_method(frame.channel, &return_method).await?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -513,7 +519,8 @@ impl TransportConnection {
|
||||||
|
|
||||||
let channel = self.channels.get(&channel).ok_or(ConException::Todo)?;
|
let channel = self.channels.get(&channel).ok_or(ConException::Todo)?;
|
||||||
|
|
||||||
haesli_messaging::methods::publish(channel.global_chan.clone(), message)?;
|
(self.handlers.handle_basic_publish)(channel.global_chan.clone(), message)?;
|
||||||
|
//haesli_messaging::methods::publish(channel.global_chan.clone(), message)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
} else {
|
} else {
|
||||||
Err(ConException::Todo.into())
|
Err(ConException::Todo.into())
|
||||||
|
|
|
||||||
|
|
@ -13,18 +13,32 @@ mod tests;
|
||||||
use std::{future::Future, net::SocketAddr};
|
use std::{future::Future, net::SocketAddr};
|
||||||
|
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use haesli_core::{connection::ConnectionEvent, queue::QueueEvent, GlobalData};
|
use haesli_core::{
|
||||||
|
connection::{Channel, ConnectionEvent},
|
||||||
|
error::ProtocolError,
|
||||||
|
message::Message,
|
||||||
|
methods::Method,
|
||||||
|
queue::QueueEvent,
|
||||||
|
GlobalData,
|
||||||
|
};
|
||||||
use tokio::{net, net::TcpStream, select};
|
use tokio::{net, net::TcpStream, select};
|
||||||
use tracing::{info, info_span, Instrument};
|
use tracing::{info, info_span, Instrument};
|
||||||
|
|
||||||
use crate::connection::TransportConnection;
|
use crate::connection::TransportConnection;
|
||||||
|
|
||||||
pub async fn do_thing_i_guess(
|
#[derive(Clone, Copy)]
|
||||||
|
pub struct Handlers {
|
||||||
|
pub handle_method: fn(Channel, Method) -> Result<Method, ProtocolError>,
|
||||||
|
pub handle_basic_publish: fn(Channel, Message) -> Result<(), ProtocolError>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn connection_loop(
|
||||||
global_data: GlobalData,
|
global_data: GlobalData,
|
||||||
terminate: impl Future + Send,
|
terminate: impl Future + Send,
|
||||||
|
handlers: Handlers,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
select! {
|
select! {
|
||||||
res = accept_cons(global_data.clone()) => {
|
res = accept_cons(global_data.clone(), handlers) => {
|
||||||
res
|
res
|
||||||
}
|
}
|
||||||
_ = terminate => {
|
_ = terminate => {
|
||||||
|
|
@ -33,18 +47,18 @@ pub async fn do_thing_i_guess(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn accept_cons(global_data: GlobalData) -> anyhow::Result<()> {
|
async fn accept_cons(global_data: GlobalData, handlers: Handlers) -> anyhow::Result<()> {
|
||||||
info!("Binding TCP listener...");
|
info!("Binding TCP listener...");
|
||||||
let listener = net::TcpListener::bind(("127.0.0.1", 5672)).await?;
|
let listener = net::TcpListener::bind(("127.0.0.1", 5672)).await?;
|
||||||
info!(addr = ?listener.local_addr()?, "Successfully bound TCP listener");
|
info!(addr = ?listener.local_addr()?, "Successfully bound TCP listener");
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let connection = listener.accept().await?;
|
let connection = listener.accept().await?;
|
||||||
handle_con(global_data.clone(), connection);
|
handle_con(global_data.clone(), connection, handlers);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_con(global_data: GlobalData, connection: (TcpStream, SocketAddr)) {
|
fn handle_con(global_data: GlobalData, connection: (TcpStream, SocketAddr), handlers: Handlers) {
|
||||||
let (stream, peer_addr) = connection;
|
let (stream, peer_addr) = connection;
|
||||||
let id = rand::random();
|
let id = rand::random();
|
||||||
|
|
||||||
|
|
@ -72,6 +86,7 @@ fn handle_con(global_data: GlobalData, connection: (TcpStream, SocketAddr)) {
|
||||||
global_data.clone(),
|
global_data.clone(),
|
||||||
method_send,
|
method_send,
|
||||||
method_recv,
|
method_recv,
|
||||||
|
handlers,
|
||||||
);
|
);
|
||||||
|
|
||||||
tokio::spawn(connection.start_connection_processing().instrument(span));
|
tokio::spawn(connection.start_connection_processing().instrument(span));
|
||||||
|
|
|
||||||
|
|
@ -32,7 +32,12 @@ async fn main() -> Result<()> {
|
||||||
tokio::spawn(async move { haesli_dashboard::start_dashboard(global_data).await });
|
tokio::spawn(async move { haesli_dashboard::start_dashboard(global_data).await });
|
||||||
}
|
}
|
||||||
|
|
||||||
let res = haesli_transport::do_thing_i_guess(global_data, terminate()).await;
|
let handlers = haesli_transport::Handlers {
|
||||||
|
handle_method: haesli_messaging::methods::handle_method,
|
||||||
|
handle_basic_publish: haesli_messaging::methods::publish,
|
||||||
|
};
|
||||||
|
|
||||||
|
let res = haesli_transport::connection_loop(global_data, terminate(), handlers).await;
|
||||||
|
|
||||||
info!("Bye!");
|
info!("Bye!");
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue