error handling

This commit is contained in:
nora 2022-02-28 12:46:08 +01:00
parent fc131327b2
commit 232c1d2830
4 changed files with 99 additions and 14 deletions

View file

@ -1,3 +1,5 @@
use crate::methods::{ReplyCode, ReplyText};
#[derive(Debug, thiserror::Error)]
pub enum ProtocolError {
#[error("fatal error")]
@ -9,7 +11,7 @@ pub enum ProtocolError {
#[error("Connection must be closed")]
CloseNow,
#[error("Graceful connection closing requested")]
GracefulClose,
GracefullyClosed,
}
#[derive(Debug, thiserror::Error)]
@ -31,5 +33,31 @@ pub enum ConException {
Todo,
}
impl ConException {
pub fn reply_code(&self) -> ReplyCode {
match self {
ConException::FrameError => 501,
ConException::CommandInvalid => 503,
ConException::SyntaxError(_) => 503,
ConException::ChannelError => 504,
ConException::UnexpectedFrame => 505,
ConException::NotImplemented(_) => 540,
ConException::Todo => 0,
}
}
pub fn reply_text(&self) -> ReplyText {
"cant be bothered yet".to_string() // todo
}
}
#[derive(Debug, thiserror::Error)]
pub enum ChannelException {}
impl ChannelException {
pub fn reply_code(&self) -> ReplyCode {
todo!()
}
pub fn reply_text(&self) -> ReplyText {
todo!()
}
}

View file

@ -1,5 +1,5 @@
use amqp_core::connection::ChannelHandle;
use amqp_core::error::{ConException, ProtocolError};
use amqp_core::error::ProtocolError;
use amqp_core::methods::{Method, QueueBind, QueueDeclare, QueueDeclareOk};
use amqp_core::queue::{QueueDeletion, QueueId, QueueName, RawQueue};
use amqp_core::{amqp_todo, GlobalData};

View file

@ -1,4 +1,4 @@
use crate::error::{ConException, ProtocolError, Result};
use crate::error::{ConException, ProtocolError, Result, TransError};
use crate::frame::{ContentHeader, Frame, FrameType};
use crate::{frame, methods, sasl};
use amqp_core::connection::{ChannelHandle, ChannelNum, ConnectionHandle, ConnectionId};
@ -21,7 +21,7 @@ use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::time;
use tracing::{debug, error, info, warn};
use tracing::{debug, error, info, trace, warn};
fn ensure_conn(condition: bool) -> Result<()> {
if condition {
@ -94,8 +94,39 @@ impl Connection {
}
pub async fn start_connection_processing(mut self) {
match self.process_connection().await {
let process_result = self.process_connection().await;
match process_result {
Ok(()) => {}
Err(TransError::Protocol(ProtocolError::GracefullyClosed)) => {
/* do nothing, remove below */
}
Err(TransError::Protocol(ProtocolError::ConException(ex))) => {
warn!(%ex, "Connection exception occured. This indicates a faulty client.");
if let Err(err) = self
.send_method(
ChannelNum::zero(),
Method::ConnectionClose(ConnectionClose {
reply_code: ex.reply_code(),
reply_text: ex.reply_text(),
class_id: 0, // todo: do this
method_id: 0,
}),
)
.await
{
error!(%ex, %err, "Failed to close connection after ConnectionException");
}
match self.recv_method().await {
Ok(Method::ConnectionCloseOk(_)) => {}
Ok(method) => {
error!(%ex, ?method, "Received wrong method after ConnectionException")
}
Err(err) => {
error!(%ex, %err, "Failed to receive Connection.CloseOk method after ConnectionException")
}
}
}
Err(err) => error!(%err, "Error during processing of connection"),
}
@ -115,6 +146,8 @@ impl Connection {
}
async fn send_method(&mut self, channel: ChannelNum, method: Method) -> Result<()> {
trace!(%channel, ?method, "Sending method");
let mut payload = Vec::with_capacity(64);
methods::write::write_method(method, &mut payload)?;
frame::write_frame(
@ -224,17 +257,41 @@ impl Connection {
async fn main_loop(&mut self) -> Result<()> {
loop {
let frame = frame::read_frame(&mut self.stream, self.max_frame_size).await?;
self.reset_timeout();
match frame.kind {
FrameType::Method => self.dispatch_method(frame).await?,
FrameType::Heartbeat => { /* Nothing here, just the `reset_timeout` above */ }
FrameType::Header => self.dispatch_header(frame)?,
FrameType::Body => self.dispatch_body(frame)?,
let channel = frame.channel;
let result = self.handle_frame(frame).await;
match result {
Ok(()) => {}
Err(TransError::Protocol(ProtocolError::ChannelException(ex))) => {
self.send_method(
channel,
Method::ChannelClose(ChannelClose {
reply_code: ex.reply_code(),
reply_text: ex.reply_text(),
class_id: 0, // todo: do this
method_id: 0,
}),
)
.await?;
drop(self.channels.remove(&channel));
}
Err(other_err) => return Err(other_err),
}
}
}
async fn handle_frame(&mut self, frame: Frame) -> Result<()> {
self.reset_timeout();
match frame.kind {
FrameType::Method => self.dispatch_method(frame).await?,
FrameType::Heartbeat => { /* Nothing here, just the `reset_timeout` above */ }
FrameType::Header => self.dispatch_header(frame)?,
FrameType::Body => self.dispatch_body(frame)?,
}
Ok(())
}
async fn dispatch_method(&mut self, frame: Frame) -> Result<()> {
let method = methods::parse_method(&frame.payload)?;
debug!(?method, "Received method");
@ -257,7 +314,7 @@ impl Connection {
Method::ConnectionCloseOk(ConnectionCloseOk),
)
.await?;
return Err(ProtocolError::GracefulClose.into());
return Err(ProtocolError::GracefullyClosed.into());
}
Method::ChannelOpen { .. } => self.channel_open(frame.channel).await?,
Method::ChannelClose { .. } => self.channel_close(frame.channel, method).await?,

View file

@ -6,7 +6,7 @@ const connection = await connectAmqp();
const channel = await connection.createChannel();
const reply = await channel.assertQueue(queueName, { durable: false });
const reply = await channel.assertQueue(queueName, { durable: true });
assert(reply.messageCount === 0, 'Message found in queue');
assert(reply.consumerCount === 0, 'Consumer listening on queue');