From 606438f301422ec0f5f0561110aea6a77de874dc Mon Sep 17 00:00:00 2001 From: Nilstrieb <48135649+Nilstrieb@users.noreply.github.com> Date: Sat, 26 Feb 2022 13:06:34 +0100 Subject: [PATCH] things --- Cargo.lock | 1 + amqp_core/Cargo.toml | 1 + amqp_core/src/error.rs | 35 ++++++++++++ amqp_core/src/lib.rs | 5 ++ amqp_core/src/queue.rs | 25 +++++++++ amqp_messaging/src/methods.rs | 52 ++++++++++++++++-- amqp_transport/src/connection.rs | 63 ++++++++++------------ amqp_transport/src/error.rs | 44 +++------------ amqp_transport/src/frame.rs | 11 ++-- amqp_transport/src/methods/mod.rs | 13 ++--- amqp_transport/src/methods/parse_helper.rs | 17 +++--- amqp_transport/src/sasl.rs | 17 +++--- test-js/src/open-channel.js | 2 +- xtask/src/test_js.rs | 1 + 14 files changed, 173 insertions(+), 114 deletions(-) create mode 100644 amqp_core/src/error.rs create mode 100644 amqp_core/src/queue.rs diff --git a/Cargo.lock b/Cargo.lock index b34948d..d24442f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -31,6 +31,7 @@ dependencies = [ "bytes", "parking_lot", "smallvec", + "thiserror", "uuid", ] diff --git a/amqp_core/Cargo.toml b/amqp_core/Cargo.toml index 4a10592..ce92335 100644 --- a/amqp_core/Cargo.toml +++ b/amqp_core/Cargo.toml @@ -9,4 +9,5 @@ edition = "2021" bytes = "1.1.0" parking_lot = "0.12.0" smallvec = { version = "1.8.0", features = ["union"] } +thiserror = "1.0.30" uuid = "0.8.2" diff --git a/amqp_core/src/error.rs b/amqp_core/src/error.rs new file mode 100644 index 0000000..ab42013 --- /dev/null +++ b/amqp_core/src/error.rs @@ -0,0 +1,35 @@ +#[derive(Debug, thiserror::Error)] +pub enum ProtocolError { + #[error("fatal error")] + Fatal, + #[error("{0}")] + ConException(#[from] ConException), + #[error("{0}")] + ChannelException(#[from] ChannelException), + #[error("Connection must be closed")] + CloseNow, + #[error("Graceful connection closing requested")] + GracefulClose, +} + +#[derive(Debug, thiserror::Error)] +pub enum ConException { + #[error("501 Frame error")] + FrameError, + #[error("503 Command invalid")] + CommandInvalid, + #[error("503 Syntax error | {0:?}")] + /// A method was received but there was a syntax error. The string stores where it occurred. + SyntaxError(Vec), + #[error("504 Channel error")] + ChannelError, + #[error("505 Unexpected Frame")] + UnexpectedFrame, + #[error("540 Not implemented")] + NotImplemented, + #[error("xxx Not decided yet")] + Todo, +} + +#[derive(Debug, thiserror::Error)] +pub enum ChannelException {} diff --git a/amqp_core/src/lib.rs b/amqp_core/src/lib.rs index dd3ab6c..fc875f1 100644 --- a/amqp_core/src/lib.rs +++ b/amqp_core/src/lib.rs @@ -1,8 +1,11 @@ #![warn(rust_2018_idioms)] +pub mod error; pub mod message; pub mod methods; +pub mod queue; +use crate::queue::Queue; use parking_lot::Mutex; use std::collections::HashMap; use std::net::SocketAddr; @@ -47,6 +50,7 @@ pub struct Connection { pub peer_addr: SocketAddr, pub global_data: GlobalData, pub channels: HashMap, + pub exclusive_queues: Vec, } impl Connection { @@ -60,6 +64,7 @@ impl Connection { peer_addr, global_data, channels: HashMap::new(), + exclusive_queues: vec![], })) } diff --git a/amqp_core/src/queue.rs b/amqp_core/src/queue.rs new file mode 100644 index 0000000..f8074a8 --- /dev/null +++ b/amqp_core/src/queue.rs @@ -0,0 +1,25 @@ +use crate::message::Message; +use parking_lot::Mutex; +use std::sync::atomic::AtomicUsize; +use std::sync::Arc; +use uuid::Uuid; + +pub type Queue = Arc; + +#[derive(Debug)] +pub struct RawQueue { + pub id: Uuid, + pub name: String, + pub messages: Mutex>, // use a concurrent linked list??? + pub durable: bool, + /// Whether the queue will automatically be deleted when no consumers uses it anymore. + /// The queue can always be manually deleted. + /// If auto-delete is enabled, it keeps track of the consumer count. + pub deletion: QueueDeletion, +} + +#[derive(Debug)] +pub enum QueueDeletion { + Auto(AtomicUsize), + Manual, +} diff --git a/amqp_messaging/src/methods.rs b/amqp_messaging/src/methods.rs index 8d1248f..cbea527 100644 --- a/amqp_messaging/src/methods.rs +++ b/amqp_messaging/src/methods.rs @@ -1,8 +1,7 @@ +use amqp_core::error::{ConException, ProtocolError}; use amqp_core::message::Message; use amqp_core::methods::Method; use amqp_core::ChannelHandle; -use std::time::Duration; -use tokio::time; use tracing::{debug, info}; pub async fn handle_basic_publish(_channel_handle: ChannelHandle, message: Message) { @@ -12,7 +11,50 @@ pub async fn handle_basic_publish(_channel_handle: ChannelHandle, message: Messa ); } -pub async fn handle_method(_channel_handle: ChannelHandle, _method: Method) { - debug!("handling method or something in that cool new future"); - time::sleep(Duration::from_secs(10)).await; +pub async fn handle_method( + _channel_handle: ChannelHandle, + method: Method, +) -> Result<(), ProtocolError> { + match method { + Method::ExchangeDeclare { .. } => Err(ConException::NotImplemented.into()), + Method::ExchangeDeclareOk => Err(ConException::NotImplemented.into()), + Method::ExchangeDelete { .. } => Err(ConException::NotImplemented.into()), + Method::ExchangeDeleteOk => Err(ConException::NotImplemented.into()), + Method::QueueDeclare { .. } => Err(ConException::NotImplemented.into()), + Method::QueueDeclareOk { .. } => Err(ConException::NotImplemented.into()), + Method::QueueBind { .. } => Err(ConException::NotImplemented.into()), + Method::QueueBindOk => Err(ConException::NotImplemented.into()), + Method::QueueUnbind { .. } => Err(ConException::NotImplemented.into()), + Method::QueueUnbindOk => Err(ConException::NotImplemented.into()), + Method::QueuePurge { .. } => Err(ConException::NotImplemented.into()), + Method::QueuePurgeOk { .. } => Err(ConException::NotImplemented.into()), + Method::QueueDelete { .. } => Err(ConException::NotImplemented.into()), + Method::QueueDeleteOk { .. } => Err(ConException::NotImplemented.into()), + Method::BasicQos { .. } => Err(ConException::NotImplemented.into()), + Method::BasicQosOk => Err(ConException::NotImplemented.into()), + Method::BasicConsume { .. } => Err(ConException::NotImplemented.into()), + Method::BasicConsumeOk { .. } => Err(ConException::NotImplemented.into()), + Method::BasicCancel { .. } => Err(ConException::NotImplemented.into()), + Method::BasicCancelOk { .. } => Err(ConException::NotImplemented.into()), + Method::BasicReturn { .. } => Err(ConException::NotImplemented.into()), + Method::BasicDeliver { .. } => Err(ConException::NotImplemented.into()), + Method::BasicGet { .. } => Err(ConException::NotImplemented.into()), + Method::BasicGetOk { .. } => Err(ConException::NotImplemented.into()), + Method::BasicGetEmpty { .. } => Err(ConException::NotImplemented.into()), + Method::BasicAck { .. } => Err(ConException::NotImplemented.into()), + Method::BasicReject { .. } => Err(ConException::NotImplemented.into()), + Method::BasicRecoverAsync { .. } => Err(ConException::NotImplemented.into()), + Method::BasicRecover { .. } => Err(ConException::NotImplemented.into()), + Method::BasicRecoverOk => Err(ConException::NotImplemented.into()), + Method::TxSelect + | Method::TxSelectOk + | Method::TxCommit + | Method::TxCommitOk + | Method::TxRollback + | Method::TxRollbackOk => Err(ConException::NotImplemented.into()), + Method::BasicPublish { .. } => { + unreachable!("Basic.Publish is handled somewhere else because it has a body") + } + _ => unreachable!("Method handled by transport layer"), + } } diff --git a/amqp_transport/src/connection.rs b/amqp_transport/src/connection.rs index 3678eb3..1b5aac4 100644 --- a/amqp_transport/src/connection.rs +++ b/amqp_transport/src/connection.rs @@ -1,32 +1,29 @@ +use crate::error::{ConException, ProtocolError, Result}; +use crate::frame::{ChannelId, ContentHeader, Frame, FrameType}; +use crate::{frame, methods, sasl}; +use amqp_core::message::{RawMessage, RoutingInformation}; +use amqp_core::methods::{FieldValue, Method, Table}; +use amqp_core::GlobalData; +use anyhow::Context; +use bytes::Bytes; +use smallvec::SmallVec; use std::cmp::Ordering; use std::collections::HashMap; use std::net::SocketAddr; use std::pin::Pin; use std::sync::Arc; use std::time::Duration; - -use anyhow::Context; -use bytes::Bytes; -use smallvec::SmallVec; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpStream; use tokio::time; use tracing::{debug, error, info, warn}; use uuid::Uuid; -use amqp_core::message::{RawMessage, RoutingInformation}; -use amqp_core::methods::{FieldValue, Method, Table}; -use amqp_core::GlobalData; - -use crate::error::{ConException, ProtocolError, Result}; -use crate::frame::{ChannelId, ContentHeader, Frame, FrameType}; -use crate::{frame, methods, sasl}; - fn ensure_conn(condition: bool) -> Result<()> { if condition { Ok(()) } else { - Err(ConException::Todo.into_trans()) + Err(ConException::Todo.into()) } } @@ -167,7 +164,7 @@ impl Connection { let plain_user = sasl::parse_sasl_plain_response(&response)?; info!(username = %plain_user.authentication_identity, "SASL Authentication successful") } else { - return Err(ConException::Todo.into_trans()); + return Err(ConException::Todo.into()); } Ok(()) @@ -262,21 +259,20 @@ impl Connection { Some(channel) => { channel.status = ChannelStatus::NeedHeader(BASIC_CLASS_ID, Box::new(method)) } - None => return Err(ConException::Todo.into_trans()), + None => return Err(ConException::Todo.into()), }, _ => { let channel_handle = self .channels .get(&frame.channel) - .ok_or_else(|| ConException::Todo.into_trans())? + .ok_or(ConException::Todo)? .handle .clone(); - tokio::spawn(amqp_messaging::methods::handle_method( - channel_handle, - method, - )); - // we don't handle this here, forward it to *somewhere* + // call into amqp_messaging to handle the method + // amqp_messaging then branches and spawns a new task for longer running things, + // so the connection task will only be "blocked" for a short amount of time + amqp_messaging::methods::handle_method(channel_handle, method).await?; } } Ok(()) @@ -285,11 +281,11 @@ impl Connection { fn dispatch_header(&mut self, frame: Frame) -> Result<()> { self.channels .get_mut(&frame.channel) - .ok_or_else(|| ConException::Todo.into_trans()) + .ok_or_else(|| ConException::Todo.into()) .and_then(|channel| match channel.status.take() { ChannelStatus::Default => { warn!(channel = %frame.channel, "unexpected header"); - Err(ConException::UnexpectedFrame.into_trans()) + Err(ConException::UnexpectedFrame.into()) } ChannelStatus::NeedHeader(class_id, method) => { let header = ContentHeader::parse(&frame.payload)?; @@ -300,7 +296,7 @@ impl Connection { } ChannelStatus::NeedsBody(_, _, _) => { warn!(channel = %frame.channel, "already got header"); - Err(ConException::UnexpectedFrame.into_trans()) + Err(ConException::UnexpectedFrame.into()) } }) } @@ -309,16 +305,16 @@ impl Connection { let channel = self .channels .get_mut(&frame.channel) - .ok_or_else(|| ConException::Todo.into_trans())?; + .ok_or(ConException::Todo)?; match channel.status.take() { ChannelStatus::Default => { warn!(channel = %frame.channel, "unexpected body"); - Err(ConException::UnexpectedFrame.into_trans()) + Err(ConException::UnexpectedFrame.into()) } ChannelStatus::NeedHeader(_, _) => { warn!(channel = %frame.channel, "unexpected body"); - Err(ConException::UnexpectedFrame.into_trans()) + Err(ConException::UnexpectedFrame.into()) } ChannelStatus::NeedsBody(method, header, mut vec) => { vec.push(frame.payload); @@ -331,7 +327,7 @@ impl Connection { Ordering::Equal => { self.process_method_with_body(*method, *header, vec, frame.channel) } - Ordering::Greater => Err(ConException::Todo.into_trans()), + Ordering::Greater => Err(ConException::Todo.into()), Ordering::Less => Ok(()), // wait for next body } } @@ -369,10 +365,7 @@ impl Connection { }; let message = Arc::new(message); - let channel = self - .channels - .get(&channel) - .ok_or_else(|| ConException::Todo.into_trans())?; + let channel = self.channels.get(&channel).ok_or(ConException::Todo)?; // Spawn the handler for the publish. The connection task goes back to handling // just the connection. @@ -382,7 +375,7 @@ impl Connection { )); Ok(()) } else { - Err(ConException::Todo.into_trans()) + Err(ConException::Todo.into()) } } @@ -403,7 +396,7 @@ impl Connection { let prev = self.channels.insert(channel_id, channel); if let Some(prev) = prev { self.channels.insert(channel_id, prev); // restore previous state - return Err(ConException::ChannelError.into_trans()); + return Err(ConException::ChannelError.into()); } { @@ -444,7 +437,7 @@ impl Connection { drop(channel); self.send_method(channel_id, Method::ChannelCloseOk).await?; } else { - return Err(ConException::Todo.into_trans()); + return Err(ConException::Todo.into()); } } else { unreachable!() diff --git a/amqp_transport/src/error.rs b/amqp_transport/src/error.rs index 188db0a..419ebf0 100644 --- a/amqp_transport/src/error.rs +++ b/amqp_transport/src/error.rs @@ -2,6 +2,8 @@ use std::io::Error; +pub use amqp_core::error::{ConException, ProtocolError}; + pub type StdResult = std::result::Result; pub type Result = StdResult; @@ -9,7 +11,7 @@ pub type Result = StdResult; #[derive(Debug, thiserror::Error)] pub enum TransError { #[error("{0}")] - Invalid(#[from] ProtocolError), + Protocol(#[from] ProtocolError), #[error("connection error: `{0}`")] Other(#[from] anyhow::Error), } @@ -20,42 +22,8 @@ impl From for TransError { } } -#[derive(Debug, thiserror::Error)] -pub enum ProtocolError { - #[error("fatal error")] - Fatal, - #[error("{0}")] - ConException(#[from] ConException), - #[error("{0}")] - ChannelException(#[from] ChannelException), - #[error("Connection must be closed")] - CloseNow, - #[error("Graceful connection closing requested")] - GracefulClose, -} - -#[derive(Debug, thiserror::Error)] -pub enum ConException { - #[error("501 Frame error")] - FrameError, - #[error("503 Command invalid")] - CommandInvalid, - #[error("503 Syntax error | {0:?}")] - /// A method was received but there was a syntax error. The string stores where it occurred. - SyntaxError(Vec), - #[error("504 Channel error")] - ChannelError, - #[error("505 Unexpected Frame")] - UnexpectedFrame, - #[error("xxx Not decided yet")] - Todo, -} - -impl ConException { - pub fn into_trans(self) -> TransError { - TransError::Invalid(ProtocolError::ConException(self)) +impl From for TransError { + fn from(err: ConException) -> Self { + Self::Protocol(ProtocolError::ConException(err)) } } - -#[derive(Debug, thiserror::Error)] -pub enum ChannelException {} diff --git a/amqp_transport/src/frame.rs b/amqp_transport/src/frame.rs index df81f8b..b4f71ff 100644 --- a/amqp_transport/src/frame.rs +++ b/amqp_transport/src/frame.rs @@ -148,14 +148,11 @@ impl ContentHeader { Ok((_, _)) => { Err( ConException::SyntaxError(vec!["could not consume all input".to_string()]) - .into_trans(), + .into(), ) } Err(nom::Err::Incomplete(_)) => { - Err( - ConException::SyntaxError(vec!["there was not enough data".to_string()]) - .into_trans(), - ) + Err(ConException::SyntaxError(vec!["there was not enough data".to_string()]).into()) } Err(nom::Err::Failure(err) | nom::Err::Error(err)) => Err(err), } @@ -197,7 +194,7 @@ where } if max_frame_size != 0 && payload.len() > max_frame_size { - return Err(ConException::FrameError.into_trans()); + return Err(ConException::FrameError.into()); } let kind = parse_frame_type(kind, channel)?; @@ -225,7 +222,7 @@ fn parse_frame_type(kind: u8, channel: ChannelId) -> Result { Err(ProtocolError::ConException(ConException::FrameError).into()) } } - _ => Err(ConException::FrameError.into_trans()), + _ => Err(ConException::FrameError.into()), } } diff --git a/amqp_transport/src/methods/mod.rs b/amqp_transport/src/methods/mod.rs index f413f91..f2cd587 100644 --- a/amqp_transport/src/methods/mod.rs +++ b/amqp_transport/src/methods/mod.rs @@ -1,4 +1,5 @@ -use crate::error::{ConException, TransError}; +use crate::error::TransError; +use amqp_core::error::ConException; use amqp_core::methods::{FieldValue, Method, Table}; use rand::Rng; use std::collections::HashMap; @@ -18,16 +19,10 @@ pub fn parse_method(payload: &[u8]) -> Result { match nom_result { Ok(([], method)) => Ok(method), Ok((_, _)) => { - Err( - ConException::SyntaxError(vec!["could not consume all input".to_string()]) - .into_trans(), - ) + Err(ConException::SyntaxError(vec!["could not consume all input".to_string()]).into()) } Err(nom::Err::Incomplete(_)) => { - Err( - ConException::SyntaxError(vec!["there was not enough data".to_string()]) - .into_trans(), - ) + Err(ConException::SyntaxError(vec!["there was not enough data".to_string()]).into()) } Err(nom::Err::Failure(err) | nom::Err::Error(err)) => Err(err), } diff --git a/amqp_transport/src/methods/parse_helper.rs b/amqp_transport/src/methods/parse_helper.rs index f59d694..0638d43 100644 --- a/amqp_transport/src/methods/parse_helper.rs +++ b/amqp_transport/src/methods/parse_helper.rs @@ -1,5 +1,6 @@ -use crate::error::{ConException, ProtocolError, TransError}; +use crate::error::TransError; use crate::methods::generated::parse::IResult; +use amqp_core::error::{ConException, ProtocolError}; use amqp_core::methods::{ Bit, FieldValue, Long, Longlong, Longstr, Octet, Short, Shortstr, Table, TableFieldName, Timestamp, @@ -15,7 +16,7 @@ use std::collections::HashMap; impl nom::error::ParseError for TransError { fn from_error_kind(_input: T, _kind: ErrorKind) -> Self { - ConException::SyntaxError(vec![]).into_trans() + ConException::SyntaxError(vec![]).into() } fn append(_input: T, _kind: ErrorKind, other: Self) -> Self { @@ -28,7 +29,7 @@ pub fn fail_err>(msg: S) -> impl FnOnce(Err) -> Err< let msg = msg.into(); let stack = match err { Err::Error(e) | Err::Failure(e) => match e { - TransError::Invalid(ProtocolError::ConException(ConException::SyntaxError( + TransError::Protocol(ProtocolError::ConException(ConException::SyntaxError( mut stack, ))) => { stack.push(msg); @@ -38,20 +39,20 @@ pub fn fail_err>(msg: S) -> impl FnOnce(Err) -> Err< }, _ => vec![msg], }; - Err::Failure(ConException::SyntaxError(stack).into_trans()) + Err::Failure(ConException::SyntaxError(stack).into()) } } pub fn err_other>(msg: S) -> impl FnOnce(E) -> Err { - move |_| Err::Error(ConException::SyntaxError(vec![msg.into()]).into_trans()) + move |_| Err::Error(ConException::SyntaxError(vec![msg.into()]).into()) } #[macro_export] macro_rules! fail { ($cause:expr) => { return Err(nom::Err::Failure( - crate::error::ProtocolError::ConException(crate::error::ConException::SyntaxError( - vec![String::from($cause)], - )) + ::amqp_core::error::ProtocolError::ConException( + ::amqp_core::error::ConException::SyntaxError(vec![String::from($cause)]), + ) .into(), )) }; diff --git a/amqp_transport/src/sasl.rs b/amqp_transport/src/sasl.rs index 970d113..18d6992 100644 --- a/amqp_transport/src/sasl.rs +++ b/amqp_transport/src/sasl.rs @@ -2,7 +2,8 @@ //! //! Currently only supports PLAIN (see [RFC 4616](https://datatracker.ietf.org/doc/html/rfc4616)) -use crate::error::{ConException, Result}; +use crate::error::Result; +use amqp_core::error::ConException; pub struct PlainUser { pub authorization_identity: String, @@ -13,17 +14,11 @@ pub struct PlainUser { pub fn parse_sasl_plain_response(response: &[u8]) -> Result { let mut parts = response .split(|&n| n == 0) - .map(|bytes| String::from_utf8(bytes.into()).map_err(|_| ConException::Todo.into_trans())); + .map(|bytes| String::from_utf8(bytes.into()).map_err(|_| ConException::Todo)); - let authorization_identity = parts - .next() - .ok_or_else(|| ConException::Todo.into_trans())??; - let authentication_identity = parts - .next() - .ok_or_else(|| ConException::Todo.into_trans())??; - let password = parts - .next() - .ok_or_else(|| ConException::Todo.into_trans())??; + let authorization_identity = parts.next().ok_or_else(|| ConException::Todo)??; + let authentication_identity = parts.next().ok_or_else(|| ConException::Todo)??; + let password = parts.next().ok_or_else(|| ConException::Todo)??; Ok(PlainUser { authorization_identity, diff --git a/test-js/src/open-channel.js b/test-js/src/open-channel.js index 3d47839..0e636ec 100644 --- a/test-js/src/open-channel.js +++ b/test-js/src/open-channel.js @@ -1,4 +1,4 @@ -import { connectAmqp, sleep } from './utils/utils.js'; +import { connectAmqp } from './utils/utils.js'; const connection = await connectAmqp(); diff --git a/xtask/src/test_js.rs b/xtask/src/test_js.rs index c625d48..1a1f63e 100644 --- a/xtask/src/test_js.rs +++ b/xtask/src/test_js.rs @@ -9,6 +9,7 @@ pub fn main() -> Result<()> { let mut amqp_server = Command::new("cargo") .arg("run") + .env("RUST_LOG", "trace") .spawn() .context("`cargo run` amqp")?;