improve everything

This commit is contained in:
nora 2022-03-19 19:01:31 +01:00
parent 543e39f129
commit dbc577abbc
10 changed files with 117 additions and 71 deletions

View file

@ -0,0 +1,13 @@
pub enum ExchangeType {
/// Routes a message to a queue if the routing-keys are equal
Direct,
/// Always routes the message to a queue
Fanout,
/// Routes a message to a queue if the routing key matches the pattern
Topic,
/// Is bound with a table of headers and values, and matches if the message headers
/// match up with the binding headers
Headers,
/// The message is sent to the server system service with the name of the routing-key
System,
}

View file

@ -3,6 +3,7 @@
pub mod connection;
pub mod consumer;
pub mod error;
pub mod exchange;
mod macros;
pub mod message;
pub mod methods;

View file

@ -3,24 +3,24 @@ macro_rules! newtype_id {
($(#[$meta:meta])* $vis:vis $name:ident) => {
$(#[$meta])*
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
$vis struct $name(::uuid::Uuid);
$vis struct $name(uuid::Uuid);
impl $name {
#[must_use]
pub fn random() -> Self {
::rand::random()
rand::random()
}
}
impl ::std::fmt::Display for $name {
fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
::std::fmt::Display::fmt(&self.0, f)
impl std::fmt::Display for $name {
fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> std::fmt::Result {
std::fmt::Display::fmt(&self.0, f)
}
}
impl ::rand::prelude::Distribution<$name> for ::rand::distributions::Standard {
fn sample<R: ::rand::Rng + ?Sized>(&self, rng: &mut R) -> $name {
$name(::uuid::Uuid::from_bytes(rng.gen()))
impl rand::prelude::Distribution<$name> for rand::distributions::Standard {
fn sample<R: rand::Rng + ?Sized>(&self, rng: &mut R) -> $name {
$name(uuid::Uuid::from_bytes(rng.gen()))
}
}
};
@ -58,11 +58,20 @@ macro_rules! newtype {
Self(other.into())
}
}
impl std::fmt::Display for $name
where
$ty: std::fmt::Display,
{
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
std::fmt::Display::fmt(&self.0, f)
}
}
};
}
#[macro_export]
macro_rules! haesli_todo {
macro_rules! amqp_todo {
() => {
return Err(::haesli_core::error::ConException::NotImplemented(concat!(
file!(),

View file

@ -1,7 +1,7 @@
use std::{
borrow::Borrow,
collections::HashMap,
fmt::{Debug, Display, Formatter},
fmt::{Debug, Formatter},
sync::{atomic::AtomicUsize, Arc},
};
@ -35,22 +35,21 @@ newtype!(
impl Borrow<str> for QueueName {
fn borrow(&self) -> &str {
std::borrow::Borrow::borrow(&self.0)
}
}
impl Display for QueueName {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
Display::fmt(&self.0, f)
Borrow::borrow(&self.0)
}
}
#[derive(Debug)]
pub struct QueueInner {
/// Internal ID, might actually be unused
pub id: QueueId,
/// The visible name of the queue
pub name: QueueName,
pub messages: haesli_datastructure::MessageQueue<Message>,
/// Whether the queue should be kept when the server restarts
pub durable: bool,
/// To which connection the queue belongs to it will be deleted when the connection closes
// todo: connection or channel?
pub exclusive: Option<ChannelId>,
/// Whether the queue will automatically be deleted when no consumers uses it anymore.
/// The queue can always be manually deleted.

View file

@ -1,10 +1,10 @@
use std::sync::Arc;
use haesli_core::{
amqp_todo,
connection::Channel,
consumer::{Consumer, ConsumerId},
error::ChannelException,
haesli_todo,
methods::{BasicConsume, BasicConsumeOk, Method},
};
use tracing::info;
@ -23,7 +23,7 @@ pub fn consume(channel: Channel, basic_consume: BasicConsume) -> Result<Method>
} = basic_consume;
if no_wait || no_local || exclusive || no_ack {
haesli_todo!();
amqp_todo!();
}
let global_data = channel.global_data.clone();

View file

@ -2,7 +2,7 @@ mod consume;
mod publish;
mod queue;
use haesli_core::{connection::Channel, haesli_todo, message::Message, methods::Method};
use haesli_core::{amqp_todo, connection::Channel, message::Message, methods::Method};
use tracing::info;
use crate::Result;
@ -15,42 +15,42 @@ pub async fn handle_method(channel_handle: Channel, method: Method) -> Result<Me
info!(?method, "Handling method");
let response = match method {
Method::ExchangeDeclare(_) => haesli_todo!(),
Method::ExchangeDeclareOk(_) => haesli_todo!(),
Method::ExchangeDelete(_) => haesli_todo!(),
Method::ExchangeDeleteOk(_) => haesli_todo!(),
Method::ExchangeDeclare(_) => amqp_todo!(),
Method::ExchangeDeclareOk(_) => amqp_todo!(),
Method::ExchangeDelete(_) => amqp_todo!(),
Method::ExchangeDeleteOk(_) => amqp_todo!(),
Method::QueueDeclare(queue_declare) => queue::declare(channel_handle, queue_declare)?,
Method::QueueDeclareOk { .. } => haesli_todo!(),
Method::QueueDeclareOk { .. } => amqp_todo!(),
Method::QueueBind(queue_bind) => queue::bind(channel_handle, queue_bind).await?,
Method::QueueBindOk(_) => haesli_todo!(),
Method::QueueUnbind { .. } => haesli_todo!(),
Method::QueueUnbindOk(_) => haesli_todo!(),
Method::QueuePurge { .. } => haesli_todo!(),
Method::QueuePurgeOk { .. } => haesli_todo!(),
Method::QueueDelete { .. } => haesli_todo!(),
Method::QueueDeleteOk { .. } => haesli_todo!(),
Method::BasicQos { .. } => haesli_todo!(),
Method::BasicQosOk(_) => haesli_todo!(),
Method::QueueBindOk(_) => amqp_todo!(),
Method::QueueUnbind { .. } => amqp_todo!(),
Method::QueueUnbindOk(_) => amqp_todo!(),
Method::QueuePurge { .. } => amqp_todo!(),
Method::QueuePurgeOk { .. } => amqp_todo!(),
Method::QueueDelete { .. } => amqp_todo!(),
Method::QueueDeleteOk { .. } => amqp_todo!(),
Method::BasicQos { .. } => amqp_todo!(),
Method::BasicQosOk(_) => amqp_todo!(),
Method::BasicConsume(consume) => consume::consume(channel_handle, consume)?,
Method::BasicConsumeOk { .. } => haesli_todo!(),
Method::BasicCancel { .. } => haesli_todo!(),
Method::BasicCancelOk { .. } => haesli_todo!(),
Method::BasicReturn { .. } => haesli_todo!(),
Method::BasicDeliver { .. } => haesli_todo!(),
Method::BasicGet { .. } => haesli_todo!(),
Method::BasicGetOk { .. } => haesli_todo!(),
Method::BasicGetEmpty { .. } => haesli_todo!(),
Method::BasicAck { .. } => haesli_todo!(),
Method::BasicReject { .. } => haesli_todo!(),
Method::BasicRecoverAsync { .. } => haesli_todo!(),
Method::BasicRecover { .. } => haesli_todo!(),
Method::BasicRecoverOk(_) => haesli_todo!(),
Method::BasicConsumeOk { .. } => amqp_todo!(),
Method::BasicCancel { .. } => amqp_todo!(),
Method::BasicCancelOk { .. } => amqp_todo!(),
Method::BasicReturn { .. } => amqp_todo!(),
Method::BasicDeliver { .. } => amqp_todo!(),
Method::BasicGet { .. } => amqp_todo!(),
Method::BasicGetOk { .. } => amqp_todo!(),
Method::BasicGetEmpty { .. } => amqp_todo!(),
Method::BasicAck { .. } => amqp_todo!(),
Method::BasicReject { .. } => amqp_todo!(),
Method::BasicRecoverAsync { .. } => amqp_todo!(),
Method::BasicRecover { .. } => amqp_todo!(),
Method::BasicRecoverOk(_) => amqp_todo!(),
Method::TxSelect(_)
| Method::TxSelectOk(_)
| Method::TxCommit(_)
| Method::TxCommitOk(_)
| Method::TxRollback(_)
| Method::TxRollbackOk(_) => haesli_todo!(),
| Method::TxRollbackOk(_) => amqp_todo!(),
Method::BasicPublish { .. } => {
unreachable!("Basic.Publish is handled somewhere else because it has a body")
}

View file

@ -1,7 +1,7 @@
use haesli_core::{
amqp_todo,
connection::Channel,
error::{ChannelException, ConException},
haesli_todo,
message::Message,
queue::QueueEvent,
};
@ -17,7 +17,7 @@ pub fn publish(channel_handle: Channel, message: Message) -> Result<()> {
let routing = &message.routing;
if !routing.exchange.is_empty() {
haesli_todo!();
amqp_todo!();
}
let global_data = global_data.lock();

View file

@ -1,8 +1,8 @@
use std::sync::{atomic::AtomicUsize, Arc};
use haesli_core::{
amqp_todo,
connection::Channel,
haesli_todo,
methods::{Method, QueueBind, QueueDeclare, QueueDeclareOk},
queue::{QueueDeletion, QueueId, QueueInner, QueueName},
GlobalData,
@ -24,16 +24,23 @@ pub fn declare(channel: Channel, queue_declare: QueueDeclare) -> Result<Method>
..
} = queue_declare;
// 2.1.4.1 - If no queue name is given, chose a name
let queue_name = if queue_name.is_empty() {
queue_name
} else {
format!("q_{}", haesli_core::random_uuid())
};
let queue_name = QueueName::new(queue_name.into());
if !arguments.is_empty() {
haesli_todo!();
amqp_todo!();
}
// todo: durable is technically spec-compliant, the spec doesn't really require it, but it's a todo
// not checked here because it's the default for amqplib which is annoying
// todo: implement durable, not checked here because it's the amqplib default
if passive || no_wait {
haesli_todo!();
amqp_todo!();
}
let global_data = channel.global_data.clone();
@ -79,7 +86,7 @@ pub fn declare(channel: Channel, queue_declare: QueueDeclare) -> Result<Method>
}
pub async fn bind(_channel_handle: Channel, _queue_bind: QueueBind) -> Result<Method> {
haesli_todo!();
amqp_todo!();
}
fn bind_queue(global_data: GlobalData, _exchange: (), routing_key: Arc<str>) -> Result<()> {

View file

@ -109,12 +109,30 @@ impl TransportConnection {
}
pub async fn start_connection_processing(mut self) {
let process_result = self.process_connection().await;
self.process_connection().await;
if let Err(err) = self.stream.shutdown().await {
error!(%err, "Failed to shut down TCP stream");
}
// global connection is closed on drop
}
async fn process_connection(&mut self) {
let initialize_result = self.initialize_connection().await;
if let Err(err) = initialize_result {
// 2.2.4 - prior to sending or receiving Open or Open-Ok,
// a peer that detects an error MUST close the socket without sending any further data.
warn!(%err, "An error occurred during connection initialization");
return;
}
let process_result = self.main_loop().await;
match process_result {
Ok(()) => {}
Err(TransError::Protocol(ProtocolError::GracefullyClosed)) => {
/* do nothing, remove below */
Err(TransError::Protocol(ProtocolError::GracefullyClosed | ProtocolError::Fatal)) => {
/* do nothing, connection is closed on drop */
}
Err(TransError::Protocol(ProtocolError::ConException(ex))) => {
warn!(%ex, "Connection exception occurred. This indicates a faulty client.");
@ -129,19 +147,18 @@ impl TransportConnection {
}
Err(err) => error!(%err, "Error during processing of connection"),
}
// global connection is closed on drop
}
pub async fn process_connection(&mut self) -> Result<()> {
async fn initialize_connection(&mut self) -> Result<()> {
// 2.2.4 - Initialize connection
self.negotiate_version().await?;
self.start().await?;
// todo should `secure` happen here?
self.tune().await?;
self.open().await?;
info!("Connection is ready for usage!");
self.main_loop().await
Ok(())
}
async fn send_method_content(
@ -332,7 +349,7 @@ impl TransportConnection {
let result = match frame.kind {
FrameType::Method => self.dispatch_method(frame).await,
FrameType::Heartbeat => {
Ok(()) /* Nothing here, just the `reset_timeout` above */
Ok(()) /* Nothing here, just the `reset_timeout` above */
}
FrameType::Header => self.dispatch_header(frame),
FrameType::Body => self.dispatch_body(frame),
@ -596,7 +613,7 @@ impl TransportConnection {
.await
.context("read protocol header")?;
debug!(received_header = ?read_header_buf,"Received protocol header");
trace!(received_header = ?read_header_buf, "Received protocol header");
let protocol = &read_header_buf[0..4];
let version = &read_header_buf[5..8];
@ -606,19 +623,19 @@ impl TransportConnection {
.write_all(OWN_PROTOCOL_HEADER)
.await
.context("write protocol header")?;
debug!(?protocol, "Version negotiation failed");
trace!(?protocol, "Version negotiation failed");
return Err(ProtocolError::ProtocolNegotiationFailed.into());
}
if &read_header_buf[0..5] == b"AMQP\0" && version == SUPPORTED_PROTOCOL_VERSION {
debug!(?version, "Version negotiation successful");
trace!(?version, "Version negotiation successful");
Ok(())
} else {
self.stream
.write_all(OWN_PROTOCOL_HEADER)
.await
.context("write protocol header")?;
debug!(?version, expected_version = ?SUPPORTED_PROTOCOL_VERSION, "Version negotiation failed");
trace!(?version, expected_version = ?SUPPORTED_PROTOCOL_VERSION, "Version negotiation failed");
Err(ProtocolError::ProtocolNegotiationFailed.into())
}
}
@ -671,7 +688,7 @@ fn server_properties(host: SocketAddr) -> Table {
let host_str = host.ip().to_string();
HashMap::from([
("host".to_owned(), ls(host_str)),
("product".to_owned(), ls("no name yet")),
("product".to_owned(), ls("haesli")),
("version".to_owned(), ls("0.1.0")),
("platform".to_owned(), ls("microsoft linux")),
("copyright".to_owned(), ls("MIT")),

View file

@ -2,7 +2,7 @@ use std::io::Error;
pub use haesli_core::error::{ConException, ProtocolError};
type StdResult<T, E> = std::result::Result<T, E>;
pub type StdResult<T, E> = std::result::Result<T, E>;
pub type Result<T> = StdResult<T, TransError>;