diff --git a/amqp_core/src/error.rs b/amqp_core/src/error.rs index d482d69..2d2f3b8 100644 --- a/amqp_core/src/error.rs +++ b/amqp_core/src/error.rs @@ -1,3 +1,5 @@ +use crate::methods::{ReplyCode, ReplyText}; + #[derive(Debug, thiserror::Error)] pub enum ProtocolError { #[error("fatal error")] @@ -9,7 +11,7 @@ pub enum ProtocolError { #[error("Connection must be closed")] CloseNow, #[error("Graceful connection closing requested")] - GracefulClose, + GracefullyClosed, } #[derive(Debug, thiserror::Error)] @@ -31,5 +33,31 @@ pub enum ConException { Todo, } +impl ConException { + pub fn reply_code(&self) -> ReplyCode { + match self { + ConException::FrameError => 501, + ConException::CommandInvalid => 503, + ConException::SyntaxError(_) => 503, + ConException::ChannelError => 504, + ConException::UnexpectedFrame => 505, + ConException::NotImplemented(_) => 540, + ConException::Todo => 0, + } + } + pub fn reply_text(&self) -> ReplyText { + "cant be bothered yet".to_string() // todo + } +} + #[derive(Debug, thiserror::Error)] pub enum ChannelException {} + +impl ChannelException { + pub fn reply_code(&self) -> ReplyCode { + todo!() + } + pub fn reply_text(&self) -> ReplyText { + todo!() + } +} diff --git a/amqp_messaging/src/methods/queue.rs b/amqp_messaging/src/methods/queue.rs index 9b75a36..88f2b8a 100644 --- a/amqp_messaging/src/methods/queue.rs +++ b/amqp_messaging/src/methods/queue.rs @@ -1,5 +1,5 @@ use amqp_core::connection::ChannelHandle; -use amqp_core::error::{ConException, ProtocolError}; +use amqp_core::error::ProtocolError; use amqp_core::methods::{Method, QueueBind, QueueDeclare, QueueDeclareOk}; use amqp_core::queue::{QueueDeletion, QueueId, QueueName, RawQueue}; use amqp_core::{amqp_todo, GlobalData}; diff --git a/amqp_transport/src/connection.rs b/amqp_transport/src/connection.rs index 54ad139..14eb7c5 100644 --- a/amqp_transport/src/connection.rs +++ b/amqp_transport/src/connection.rs @@ -1,4 +1,4 @@ -use crate::error::{ConException, ProtocolError, Result}; +use crate::error::{ConException, ProtocolError, Result, TransError}; use crate::frame::{ContentHeader, Frame, FrameType}; use crate::{frame, methods, sasl}; use amqp_core::connection::{ChannelHandle, ChannelNum, ConnectionHandle, ConnectionId}; @@ -21,7 +21,7 @@ use std::time::Duration; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpStream; use tokio::time; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, info, trace, warn}; fn ensure_conn(condition: bool) -> Result<()> { if condition { @@ -94,8 +94,39 @@ impl Connection { } pub async fn start_connection_processing(mut self) { - match self.process_connection().await { + let process_result = self.process_connection().await; + + match process_result { Ok(()) => {} + Err(TransError::Protocol(ProtocolError::GracefullyClosed)) => { + /* do nothing, remove below */ + } + Err(TransError::Protocol(ProtocolError::ConException(ex))) => { + warn!(%ex, "Connection exception occured. This indicates a faulty client."); + if let Err(err) = self + .send_method( + ChannelNum::zero(), + Method::ConnectionClose(ConnectionClose { + reply_code: ex.reply_code(), + reply_text: ex.reply_text(), + class_id: 0, // todo: do this + method_id: 0, + }), + ) + .await + { + error!(%ex, %err, "Failed to close connection after ConnectionException"); + } + match self.recv_method().await { + Ok(Method::ConnectionCloseOk(_)) => {} + Ok(method) => { + error!(%ex, ?method, "Received wrong method after ConnectionException") + } + Err(err) => { + error!(%ex, %err, "Failed to receive Connection.CloseOk method after ConnectionException") + } + } + } Err(err) => error!(%err, "Error during processing of connection"), } @@ -115,6 +146,8 @@ impl Connection { } async fn send_method(&mut self, channel: ChannelNum, method: Method) -> Result<()> { + trace!(%channel, ?method, "Sending method"); + let mut payload = Vec::with_capacity(64); methods::write::write_method(method, &mut payload)?; frame::write_frame( @@ -224,17 +257,41 @@ impl Connection { async fn main_loop(&mut self) -> Result<()> { loop { let frame = frame::read_frame(&mut self.stream, self.max_frame_size).await?; - self.reset_timeout(); - - match frame.kind { - FrameType::Method => self.dispatch_method(frame).await?, - FrameType::Heartbeat => { /* Nothing here, just the `reset_timeout` above */ } - FrameType::Header => self.dispatch_header(frame)?, - FrameType::Body => self.dispatch_body(frame)?, + let channel = frame.channel; + let result = self.handle_frame(frame).await; + match result { + Ok(()) => {} + Err(TransError::Protocol(ProtocolError::ChannelException(ex))) => { + self.send_method( + channel, + Method::ChannelClose(ChannelClose { + reply_code: ex.reply_code(), + reply_text: ex.reply_text(), + class_id: 0, // todo: do this + method_id: 0, + }), + ) + .await?; + drop(self.channels.remove(&channel)); + } + Err(other_err) => return Err(other_err), } } } + async fn handle_frame(&mut self, frame: Frame) -> Result<()> { + self.reset_timeout(); + + match frame.kind { + FrameType::Method => self.dispatch_method(frame).await?, + FrameType::Heartbeat => { /* Nothing here, just the `reset_timeout` above */ } + FrameType::Header => self.dispatch_header(frame)?, + FrameType::Body => self.dispatch_body(frame)?, + } + + Ok(()) + } + async fn dispatch_method(&mut self, frame: Frame) -> Result<()> { let method = methods::parse_method(&frame.payload)?; debug!(?method, "Received method"); @@ -257,7 +314,7 @@ impl Connection { Method::ConnectionCloseOk(ConnectionCloseOk), ) .await?; - return Err(ProtocolError::GracefulClose.into()); + return Err(ProtocolError::GracefullyClosed.into()); } Method::ChannelOpen { .. } => self.channel_open(frame.channel).await?, Method::ChannelClose { .. } => self.channel_close(frame.channel, method).await?, diff --git a/test-js/src/declare-queue.js b/test-js/src/declare-queue.js index b81d670..9a5f979 100644 --- a/test-js/src/declare-queue.js +++ b/test-js/src/declare-queue.js @@ -6,7 +6,7 @@ const connection = await connectAmqp(); const channel = await connection.createChannel(); -const reply = await channel.assertQueue(queueName, { durable: false }); +const reply = await channel.assertQueue(queueName, { durable: true }); assert(reply.messageCount === 0, 'Message found in queue'); assert(reply.consumerCount === 0, 'Consumer listening on queue');