This commit is contained in:
nora 2022-03-20 22:18:32 +01:00
parent 96f2d9f4f0
commit 6f6c0848ac
2 changed files with 67 additions and 41 deletions

View file

@ -0,0 +1,11 @@
use haesli_core::{
amqp_todo,
connection::Channel,
methods::{ExchangeDeclare, Method},
};
use crate::Result;
pub fn declare(_channel: Channel, _exchange_declare: ExchangeDeclare) -> Result<Method> {
amqp_todo!()
}

View file

@ -1,59 +1,74 @@
mod consume;
mod exchange;
mod publish;
mod queue;
use haesli_core::{amqp_todo, connection::Channel, methods::Method};
use haesli_core::{amqp_todo, connection::Channel, error::ConException, methods::Method};
pub use publish::publish;
use tracing::info;
use tracing::{info, warn};
use crate::Result;
/// This is the entrypoint of methods not handled by the connection itself.
/// Note that Basic.Publish is *not* sent here, but to [`handle_basic_publish`](crate::handle_basic_publish)
pub fn handle_method(channel_handle: Channel, method: Method) -> Result<Method> {
pub fn handle_method(channel: Channel, method: Method) -> Result<Method> {
use Method::*;
info!(?method, "Handling method");
let response = match method {
Method::ExchangeDeclare(_) => amqp_todo!(),
Method::ExchangeDeclareOk(_) => amqp_todo!(),
Method::ExchangeDelete(_) => amqp_todo!(),
Method::ExchangeDeleteOk(_) => amqp_todo!(),
Method::QueueDeclare(queue_declare) => queue::declare(channel_handle, queue_declare)?,
Method::QueueDeclareOk { .. } => amqp_todo!(),
Method::QueueBind(queue_bind) => queue::bind(channel_handle, queue_bind)?,
Method::QueueBindOk(_) => amqp_todo!(),
Method::QueueUnbind { .. } => amqp_todo!(),
Method::QueueUnbindOk(_) => amqp_todo!(),
Method::QueuePurge { .. } => amqp_todo!(),
Method::QueuePurgeOk { .. } => amqp_todo!(),
Method::QueueDelete { .. } => amqp_todo!(),
Method::QueueDeleteOk { .. } => amqp_todo!(),
Method::BasicQos { .. } => amqp_todo!(),
Method::BasicQosOk(_) => amqp_todo!(),
Method::BasicConsume(consume) => consume::consume(channel_handle, consume)?,
Method::BasicConsumeOk { .. } => amqp_todo!(),
Method::BasicCancel { .. } => amqp_todo!(),
Method::BasicCancelOk { .. } => amqp_todo!(),
Method::BasicReturn { .. } => amqp_todo!(),
Method::BasicDeliver { .. } => amqp_todo!(),
Method::BasicGet { .. } => amqp_todo!(),
Method::BasicGetOk { .. } => amqp_todo!(),
Method::BasicGetEmpty { .. } => amqp_todo!(),
Method::BasicAck { .. } => amqp_todo!(),
Method::BasicReject { .. } => amqp_todo!(),
Method::BasicRecoverAsync { .. } => amqp_todo!(),
Method::BasicRecover { .. } => amqp_todo!(),
Method::BasicRecoverOk(_) => amqp_todo!(),
Method::TxSelect(_)
| Method::TxSelectOk(_)
| Method::TxCommit(_)
| Method::TxCommitOk(_)
| Method::TxRollback(_)
| Method::TxRollbackOk(_) => amqp_todo!(),
Method::BasicPublish { .. } => {
ExchangeDeclare(exchange_declare) => exchange::declare(channel, exchange_declare)?,
ExchangeDelete(_) => amqp_todo!(),
QueueDeclare(queue_declare) => queue::declare(channel, queue_declare)?,
QueueBind(queue_bind) => queue::bind(channel, queue_bind)?,
QueueUnbind(_) => amqp_todo!(),
QueuePurge(_) => amqp_todo!(),
QueueDelete(_) => amqp_todo!(),
BasicQos(_) => amqp_todo!(),
BasicConsume(consume) => consume::consume(channel, consume)?,
BasicCancel(_) => amqp_todo!(),
BasicGet(_) => amqp_todo!(),
BasicAck(_) => amqp_todo!(),
BasicReject(_) => amqp_todo!(),
BasicRecoverAsync(_) => amqp_todo!(),
BasicRecover(_) => amqp_todo!(),
TxSelect(_) => amqp_todo!(),
TxSelectOk(_) => amqp_todo!(),
TxCommit(_) => amqp_todo!(),
TxRollback(_) => amqp_todo!(),
BasicPublish(_) => {
unreachable!("Basic.Publish is handled somewhere else because it has a body")
}
_ => unreachable!("Method handled by transport layer"),
ConnectionStartOk(_)
| ConnectionSecureOk(_)
| ConnectionTuneOk(_)
| ConnectionOpenOk(_)
| ConnectionCloseOk(_)
| ChannelOpenOk(_)
| ChannelFlowOk(_)
| ChannelCloseOk(_)
| ExchangeDeclareOk(_)
| ExchangeDeleteOk(_)
| QueueDeclareOk(_)
| QueueBindOk(_)
| QueueUnbindOk(_)
| QueuePurgeOk(_)
| QueueDeleteOk(_)
| BasicQosOk(_)
| BasicCancelOk(_)
| BasicConsumeOk(_)
| BasicReturn(_)
| BasicDeliver(_)
| BasicGetOk(_)
| BasicGetEmpty(_)
| BasicRecoverOk(_)
| TxCommitOk(_)
| TxRollbackOk(_) => return Err(ConException::NotAllowed.into()), // only sent by server
ConnectionStart(_) | ConnectionSecure(_) | ConnectionTune(_) | ConnectionOpen(_)
| ConnectionClose(_) | ChannelOpen(_) | ChannelFlow(_) | ChannelClose(_) => {
warn!("method should be processed by transport layer");
return Err(ConException::NotAllowed.into());
}
};
Ok(response)