queue declare

This commit is contained in:
nora 2022-02-26 21:55:17 +01:00
parent 606438f301
commit 8532d454c3
13 changed files with 255 additions and 71 deletions

3
Cargo.lock generated
View file

@ -30,6 +30,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"bytes", "bytes",
"parking_lot", "parking_lot",
"rand",
"smallvec", "smallvec",
"thiserror", "thiserror",
"uuid", "uuid",
@ -51,8 +52,10 @@ name = "amqp_messaging"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"amqp_core", "amqp_core",
"parking_lot",
"tokio", "tokio",
"tracing", "tracing",
"uuid",
] ]
[[package]] [[package]]

View file

@ -8,6 +8,7 @@ edition = "2021"
[dependencies] [dependencies]
bytes = "1.1.0" bytes = "1.1.0"
parking_lot = "0.12.0" parking_lot = "0.12.0"
rand = "0.8.5"
smallvec = { version = "1.8.0", features = ["union"] } smallvec = { version = "1.8.0", features = ["union"] }
thiserror = "1.0.30" thiserror = "1.0.30"
uuid = "0.8.2" uuid = "0.8.2"

View file

@ -25,6 +25,8 @@ impl Default for GlobalData {
inner: Arc::new(Mutex::new(GlobalDataInner { inner: Arc::new(Mutex::new(GlobalDataInner {
connections: HashMap::new(), connections: HashMap::new(),
channels: HashMap::new(), channels: HashMap::new(),
queues: HashMap::new(),
default_exchange: HashMap::new(),
})), })),
} }
} }
@ -40,6 +42,9 @@ impl GlobalData {
pub struct GlobalDataInner { pub struct GlobalDataInner {
pub connections: HashMap<Uuid, ConnectionHandle>, pub connections: HashMap<Uuid, ConnectionHandle>,
pub channels: HashMap<Uuid, ChannelHandle>, pub channels: HashMap<Uuid, ChannelHandle>,
pub queues: HashMap<Uuid, Queue>,
/// Todo: This is just for testing and will be removed later!
pub default_exchange: HashMap<String, Queue>,
} }
pub type ConnectionHandle = Handle<Connection>; pub type ConnectionHandle = Handle<Connection>;
@ -104,3 +109,14 @@ impl Channel {
global_data.channels.remove(&self.id); global_data.channels.remove(&self.id);
} }
} }
pub fn gen_uuid() -> Uuid {
Uuid::from_bytes(rand::random())
}
#[macro_export]
macro_rules! amqp_todo {
() => {
return Err(::amqp_core::error::ConException::NotImplemented.into())
};
}

View file

@ -12,6 +12,7 @@ pub struct RawQueue {
pub name: String, pub name: String,
pub messages: Mutex<Vec<Message>>, // use a concurrent linked list??? pub messages: Mutex<Vec<Message>>, // use a concurrent linked list???
pub durable: bool, pub durable: bool,
pub exclusive: Option<Uuid>,
/// Whether the queue will automatically be deleted when no consumers uses it anymore. /// Whether the queue will automatically be deleted when no consumers uses it anymore.
/// The queue can always be manually deleted. /// The queue can always be manually deleted.
/// If auto-delete is enabled, it keeps track of the consumer count. /// If auto-delete is enabled, it keeps track of the consumer count.

View file

@ -7,5 +7,7 @@ edition = "2021"
[dependencies] [dependencies]
amqp_core = { path = "../amqp_core" } amqp_core = { path = "../amqp_core" }
parking_lot = "0.12.0"
tracing = "0.1.31" tracing = "0.1.31"
tokio = { version = "1.17.0", features = ["full"] } tokio = { version = "1.17.0", features = ["full"] }
uuid = "0.8.2"

View file

@ -1,60 +0,0 @@
use amqp_core::error::{ConException, ProtocolError};
use amqp_core::message::Message;
use amqp_core::methods::Method;
use amqp_core::ChannelHandle;
use tracing::{debug, info};
pub async fn handle_basic_publish(_channel_handle: ChannelHandle, message: Message) {
info!(
?message,
"Someone has summoned the almighty Basic.Publish handler"
);
}
pub async fn handle_method(
_channel_handle: ChannelHandle,
method: Method,
) -> Result<(), ProtocolError> {
match method {
Method::ExchangeDeclare { .. } => Err(ConException::NotImplemented.into()),
Method::ExchangeDeclareOk => Err(ConException::NotImplemented.into()),
Method::ExchangeDelete { .. } => Err(ConException::NotImplemented.into()),
Method::ExchangeDeleteOk => Err(ConException::NotImplemented.into()),
Method::QueueDeclare { .. } => Err(ConException::NotImplemented.into()),
Method::QueueDeclareOk { .. } => Err(ConException::NotImplemented.into()),
Method::QueueBind { .. } => Err(ConException::NotImplemented.into()),
Method::QueueBindOk => Err(ConException::NotImplemented.into()),
Method::QueueUnbind { .. } => Err(ConException::NotImplemented.into()),
Method::QueueUnbindOk => Err(ConException::NotImplemented.into()),
Method::QueuePurge { .. } => Err(ConException::NotImplemented.into()),
Method::QueuePurgeOk { .. } => Err(ConException::NotImplemented.into()),
Method::QueueDelete { .. } => Err(ConException::NotImplemented.into()),
Method::QueueDeleteOk { .. } => Err(ConException::NotImplemented.into()),
Method::BasicQos { .. } => Err(ConException::NotImplemented.into()),
Method::BasicQosOk => Err(ConException::NotImplemented.into()),
Method::BasicConsume { .. } => Err(ConException::NotImplemented.into()),
Method::BasicConsumeOk { .. } => Err(ConException::NotImplemented.into()),
Method::BasicCancel { .. } => Err(ConException::NotImplemented.into()),
Method::BasicCancelOk { .. } => Err(ConException::NotImplemented.into()),
Method::BasicReturn { .. } => Err(ConException::NotImplemented.into()),
Method::BasicDeliver { .. } => Err(ConException::NotImplemented.into()),
Method::BasicGet { .. } => Err(ConException::NotImplemented.into()),
Method::BasicGetOk { .. } => Err(ConException::NotImplemented.into()),
Method::BasicGetEmpty { .. } => Err(ConException::NotImplemented.into()),
Method::BasicAck { .. } => Err(ConException::NotImplemented.into()),
Method::BasicReject { .. } => Err(ConException::NotImplemented.into()),
Method::BasicRecoverAsync { .. } => Err(ConException::NotImplemented.into()),
Method::BasicRecover { .. } => Err(ConException::NotImplemented.into()),
Method::BasicRecoverOk => Err(ConException::NotImplemented.into()),
Method::TxSelect
| Method::TxSelectOk
| Method::TxCommit
| Method::TxCommitOk
| Method::TxRollback
| Method::TxRollbackOk => Err(ConException::NotImplemented.into()),
Method::BasicPublish { .. } => {
unreachable!("Basic.Publish is handled somewhere else because it has a body")
}
_ => unreachable!("Method handled by transport layer"),
}
}

View file

@ -0,0 +1,17 @@
use amqp_core::error::ProtocolError;
use amqp_core::methods::{Bit, ConsumerTag, NoAck, NoLocal, NoWait, QueueName, Table};
use amqp_core::ChannelHandle;
#[allow(clippy::too_many_arguments)]
pub async fn consume(
_channel_handle: ChannelHandle,
_queue: QueueName,
_consumer_tag: ConsumerTag,
_no_local: NoLocal,
_no_ack: NoAck,
_exclusive: Bit,
_no_wait: NoWait,
_arguments: Table,
) -> Result<(), ProtocolError> {
Ok(())
}

View file

@ -0,0 +1,125 @@
mod consume;
mod queue;
use amqp_core::amqp_todo;
use amqp_core::error::ProtocolError;
use amqp_core::message::Message;
use amqp_core::methods::Method;
use amqp_core::ChannelHandle;
use tracing::info;
pub async fn handle_basic_publish(_channel_handle: ChannelHandle, message: Message) {
info!(
?message,
"Someone has summoned the almighty Basic.Publish handler"
);
}
pub async fn handle_method(
channel_handle: ChannelHandle,
method: Method,
) -> Result<(), ProtocolError> {
info!(?method, "Handling method");
match method {
Method::ExchangeDeclare { .. } => amqp_todo!(),
Method::ExchangeDeclareOk => amqp_todo!(),
Method::ExchangeDelete { .. } => amqp_todo!(),
Method::ExchangeDeleteOk => amqp_todo!(),
Method::QueueDeclare {
queue,
passive,
durable,
exclusive,
auto_delete,
no_wait,
arguments,
..
} => {
queue::declare(
channel_handle,
queue,
passive,
durable,
exclusive,
auto_delete,
no_wait,
arguments,
)
.await
}
Method::QueueDeclareOk { .. } => amqp_todo!(),
Method::QueueBind {
queue,
exchange,
routing_key,
no_wait,
arguments,
..
} => {
queue::bind(
channel_handle,
queue,
exchange,
routing_key,
no_wait,
arguments,
)
.await
}
Method::QueueBindOk => amqp_todo!(),
Method::QueueUnbind { .. } => amqp_todo!(),
Method::QueueUnbindOk => amqp_todo!(),
Method::QueuePurge { .. } => amqp_todo!(),
Method::QueuePurgeOk { .. } => amqp_todo!(),
Method::QueueDelete { .. } => amqp_todo!(),
Method::QueueDeleteOk { .. } => amqp_todo!(),
Method::BasicQos { .. } => amqp_todo!(),
Method::BasicQosOk => amqp_todo!(),
Method::BasicConsume {
queue,
consumer_tag,
no_local,
no_ack,
exclusive,
no_wait,
arguments,
..
} => {
consume::consume(
channel_handle,
queue,
consumer_tag,
no_local,
no_ack,
exclusive,
no_wait,
arguments,
)
.await
}
Method::BasicConsumeOk { .. } => amqp_todo!(),
Method::BasicCancel { .. } => amqp_todo!(),
Method::BasicCancelOk { .. } => amqp_todo!(),
Method::BasicReturn { .. } => amqp_todo!(),
Method::BasicDeliver { .. } => amqp_todo!(),
Method::BasicGet { .. } => amqp_todo!(),
Method::BasicGetOk { .. } => amqp_todo!(),
Method::BasicGetEmpty { .. } => amqp_todo!(),
Method::BasicAck { .. } => amqp_todo!(),
Method::BasicReject { .. } => amqp_todo!(),
Method::BasicRecoverAsync { .. } => amqp_todo!(),
Method::BasicRecover { .. } => amqp_todo!(),
Method::BasicRecoverOk => amqp_todo!(),
Method::TxSelect
| Method::TxSelectOk
| Method::TxCommit
| Method::TxCommitOk
| Method::TxRollback
| Method::TxRollbackOk => amqp_todo!(),
Method::BasicPublish { .. } => {
unreachable!("Basic.Publish is handled somewhere else because it has a body")
}
_ => unreachable!("Method handled by transport layer"),
}
}

View file

@ -0,0 +1,80 @@
#![deny(clippy::future_not_send)]
use amqp_core::error::{ConException, ProtocolError};
use amqp_core::methods::{Bit, ExchangeName, NoWait, QueueName, Shortstr, Table};
use amqp_core::queue::{QueueDeletion, RawQueue};
use amqp_core::ChannelHandle;
use amqp_core::{amqp_todo, GlobalData};
use parking_lot::Mutex;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use uuid::Uuid;
#[allow(clippy::too_many_arguments)]
pub async fn declare(
channel_handle: ChannelHandle,
queue_name: QueueName,
passive: Bit,
durable: Bit,
exclusive: Bit,
auto_delete: Bit,
no_wait: NoWait,
arguments: Table,
) -> Result<(), ProtocolError> {
if !arguments.is_empty() {
return Err(ConException::Todo.into());
}
let (global_data, id) = {
let channel = channel_handle.lock();
if passive || no_wait {
amqp_todo!();
}
let id = amqp_core::gen_uuid();
let queue = Arc::new(RawQueue {
id,
name: queue_name.clone(),
messages: Mutex::default(),
durable,
exclusive: exclusive.then(|| channel.id),
deletion: if auto_delete {
QueueDeletion::Auto(AtomicUsize::default())
} else {
QueueDeletion::Manual
},
});
let global_data = channel.global_data.clone();
{
let mut global_data_lock = global_data.lock();
global_data_lock.queues.insert(id, queue);
}
(global_data, id)
};
bind_queue(global_data, id, (), queue_name).await
}
pub async fn bind(
_channel_handle: ChannelHandle,
_queue: QueueName,
_exchange: ExchangeName,
_routing_key: Shortstr,
_no_wait: NoWait,
_arguments: Table,
) -> Result<(), ProtocolError> {
amqp_todo!();
}
async fn bind_queue(
_global_data: GlobalData,
_queue: Uuid,
_exchange: (),
_routing_key: String,
) -> Result<(), ProtocolError> {
amqp_todo!();
}

View file

@ -353,7 +353,7 @@ impl Connection {
} = method } = method
{ {
let message = RawMessage { let message = RawMessage {
id: Uuid::from_bytes(rand::random()), id: amqp_core::gen_uuid(),
properties: header.property_fields, properties: header.property_fields,
routing: RoutingInformation { routing: RoutingInformation {
exchange, exchange,
@ -380,7 +380,7 @@ impl Connection {
} }
async fn channel_open(&mut self, channel_id: ChannelId) -> Result<()> { async fn channel_open(&mut self, channel_id: ChannelId) -> Result<()> {
let id = Uuid::from_bytes(rand::random()); let id = amqp_core::gen_uuid();
let channel_handle = amqp_core::Channel::new_handle( let channel_handle = amqp_core::Channel::new_handle(
id, id,
channel_id.num(), channel_id.num(),

View file

@ -15,7 +15,6 @@ use amqp_core::GlobalData;
use anyhow::Result; use anyhow::Result;
use tokio::net; use tokio::net;
use tracing::{info, info_span, Instrument}; use tracing::{info, info_span, Instrument};
use uuid::Uuid;
pub async fn do_thing_i_guess(global_data: GlobalData) -> Result<()> { pub async fn do_thing_i_guess(global_data: GlobalData) -> Result<()> {
info!("Binding TCP listener..."); info!("Binding TCP listener...");
@ -25,7 +24,7 @@ pub async fn do_thing_i_guess(global_data: GlobalData) -> Result<()> {
loop { loop {
let (stream, peer_addr) = listener.accept().await?; let (stream, peer_addr) = listener.accept().await?;
let id = Uuid::from_bytes(rand::random()); let id = amqp_core::gen_uuid();
info!(local_addr = ?stream.local_addr(), %id, "Accepted new connection"); info!(local_addr = ?stream.local_addr(), %id, "Accepted new connection");
let span = info_span!("client-connection", %id); let span = info_span!("client-connection", %id);

View file

@ -42,8 +42,8 @@ pub fn fail_err<S: Into<String>>(msg: S) -> impl FnOnce(Err<TransError>) -> Err<
Err::Failure(ConException::SyntaxError(stack).into()) Err::Failure(ConException::SyntaxError(stack).into())
} }
} }
pub fn err_other<E, S: Into<String>>(msg: S) -> impl FnOnce(E) -> Err<TransError> { pub fn other_fail<E, S: Into<String>>(msg: S) -> impl FnOnce(E) -> Err<TransError> {
move |_| Err::Error(ConException::SyntaxError(vec![msg.into()]).into()) move |_| Err::Failure(ConException::SyntaxError(vec![msg.into()]).into())
} }
#[macro_export] #[macro_export]
@ -105,7 +105,7 @@ pub fn bit(input: &[u8], amount: usize) -> IResult<'_, Vec<Bit>> {
pub fn shortstr(input: &[u8]) -> IResult<'_, Shortstr> { pub fn shortstr(input: &[u8]) -> IResult<'_, Shortstr> {
let (input, len) = u8(input)?; let (input, len) = u8(input)?;
let (input, str_data) = take(usize::from(len))(input)?; let (input, str_data) = take(usize::from(len))(input)?;
let data = String::from_utf8(str_data.into()).map_err(err_other("shortstr"))?; let data = String::from_utf8(str_data.into()).map_err(other_fail("shortstr"))?;
Ok((input, data)) Ok((input, data))
} }

View file

@ -16,9 +16,9 @@ pub fn parse_sasl_plain_response(response: &[u8]) -> Result<PlainUser> {
.split(|&n| n == 0) .split(|&n| n == 0)
.map(|bytes| String::from_utf8(bytes.into()).map_err(|_| ConException::Todo)); .map(|bytes| String::from_utf8(bytes.into()).map_err(|_| ConException::Todo));
let authorization_identity = parts.next().ok_or_else(|| ConException::Todo)??; let authorization_identity = parts.next().ok_or(ConException::Todo)??;
let authentication_identity = parts.next().ok_or_else(|| ConException::Todo)??; let authentication_identity = parts.next().ok_or(ConException::Todo)??;
let password = parts.next().ok_or_else(|| ConException::Todo)??; let password = parts.next().ok_or(ConException::Todo)??;
Ok(PlainUser { Ok(PlainUser {
authorization_identity, authorization_identity,