mirror of
https://github.com/Noratrieb/haesli.git
synced 2026-01-17 05:05:03 +01:00
Merge remote-tracking branch 'origin/main' into main
# Conflicts: # xtask/src/test_js.rs
This commit is contained in:
commit
fc131327b2
38 changed files with 2037 additions and 1242 deletions
|
|
@ -1,6 +1,2 @@
|
|||
[alias]
|
||||
xtask = "run --package xtask --"
|
||||
|
||||
[build]
|
||||
# note: if this doesn't apply, update your global rustflags in "~/.cargo/config.toml"
|
||||
rustflags = ["--cfg", "tokio_unstable"]
|
||||
8
Cargo.lock
generated
8
Cargo.lock
generated
|
|
@ -30,7 +30,9 @@ version = "0.1.0"
|
|||
dependencies = [
|
||||
"bytes",
|
||||
"parking_lot",
|
||||
"rand",
|
||||
"smallvec",
|
||||
"thiserror",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
|
|
@ -50,6 +52,7 @@ name = "amqp_messaging"
|
|||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"amqp_core",
|
||||
"parking_lot",
|
||||
"tokio",
|
||||
"tracing",
|
||||
]
|
||||
|
|
@ -71,7 +74,6 @@ dependencies = [
|
|||
"thiserror",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
|
@ -85,9 +87,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "anyhow"
|
||||
version = "1.0.54"
|
||||
version = "1.0.55"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7a99269dff3bc004caa411f38845c20303f1e393ca2bd6581576fa3a7f59577d"
|
||||
checksum = "159bb86af3a200e19a068f4224eae4c8bb2d0fa054c7e5d1cacd5cef95e684cd"
|
||||
|
||||
[[package]]
|
||||
name = "async-trait"
|
||||
|
|
|
|||
|
|
@ -8,5 +8,7 @@ edition = "2021"
|
|||
[dependencies]
|
||||
bytes = "1.1.0"
|
||||
parking_lot = "0.12.0"
|
||||
rand = "0.8.5"
|
||||
smallvec = { version = "1.8.0", features = ["union"] }
|
||||
thiserror = "1.0.30"
|
||||
uuid = "0.8.2"
|
||||
|
|
|
|||
105
amqp_core/src/connection.rs
Normal file
105
amqp_core/src/connection.rs
Normal file
|
|
@ -0,0 +1,105 @@
|
|||
use crate::{newtype_id, GlobalData, Handle, Queue};
|
||||
use parking_lot::Mutex;
|
||||
use std::collections::HashMap;
|
||||
use std::fmt::{Display, Formatter};
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
|
||||
newtype_id!(pub ConnectionId);
|
||||
newtype_id!(pub ChannelId);
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
||||
pub struct ChannelNum(u16);
|
||||
|
||||
impl ChannelNum {
|
||||
#[must_use]
|
||||
pub fn new(num: u16) -> Self {
|
||||
Self(num)
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn num(self) -> u16 {
|
||||
self.0
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn is_zero(self) -> bool {
|
||||
self.0 == 0
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn zero() -> Self {
|
||||
Self(0)
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for ChannelNum {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
self.0.fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
pub type ConnectionHandle = Handle<Connection>;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Connection {
|
||||
pub id: ConnectionId,
|
||||
pub peer_addr: SocketAddr,
|
||||
pub global_data: GlobalData,
|
||||
pub channels: HashMap<u16, ChannelHandle>,
|
||||
pub exclusive_queues: Vec<Queue>,
|
||||
}
|
||||
|
||||
impl Connection {
|
||||
#[must_use]
|
||||
pub fn new_handle(
|
||||
id: ConnectionId,
|
||||
peer_addr: SocketAddr,
|
||||
global_data: GlobalData,
|
||||
) -> ConnectionHandle {
|
||||
Arc::new(Mutex::new(Self {
|
||||
id,
|
||||
peer_addr,
|
||||
global_data,
|
||||
channels: HashMap::new(),
|
||||
exclusive_queues: vec![],
|
||||
}))
|
||||
}
|
||||
|
||||
pub fn close(&self) {
|
||||
let mut global_data = self.global_data.lock();
|
||||
global_data.connections.remove(&self.id);
|
||||
}
|
||||
}
|
||||
|
||||
pub type ChannelHandle = Handle<Channel>;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Channel {
|
||||
pub id: ChannelId,
|
||||
pub num: u16,
|
||||
pub connection: ConnectionHandle,
|
||||
pub global_data: GlobalData,
|
||||
}
|
||||
|
||||
impl Channel {
|
||||
#[must_use]
|
||||
pub fn new_handle(
|
||||
id: ChannelId,
|
||||
num: u16,
|
||||
connection: ConnectionHandle,
|
||||
global_data: GlobalData,
|
||||
) -> ChannelHandle {
|
||||
Arc::new(Mutex::new(Self {
|
||||
id,
|
||||
num,
|
||||
connection,
|
||||
global_data,
|
||||
}))
|
||||
}
|
||||
|
||||
pub fn close(&self) {
|
||||
let mut global_data = self.global_data.lock();
|
||||
global_data.channels.remove(&self.id);
|
||||
}
|
||||
}
|
||||
35
amqp_core/src/error.rs
Normal file
35
amqp_core/src/error.rs
Normal file
|
|
@ -0,0 +1,35 @@
|
|||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ProtocolError {
|
||||
#[error("fatal error")]
|
||||
Fatal,
|
||||
#[error("{0}")]
|
||||
ConException(#[from] ConException),
|
||||
#[error("{0}")]
|
||||
ChannelException(#[from] ChannelException),
|
||||
#[error("Connection must be closed")]
|
||||
CloseNow,
|
||||
#[error("Graceful connection closing requested")]
|
||||
GracefulClose,
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ConException {
|
||||
#[error("501 Frame error")]
|
||||
FrameError,
|
||||
#[error("503 Command invalid")]
|
||||
CommandInvalid,
|
||||
#[error("503 Syntax error | {0:?}")]
|
||||
/// A method was received but there was a syntax error. The string stores where it occurred.
|
||||
SyntaxError(Vec<String>),
|
||||
#[error("504 Channel error")]
|
||||
ChannelError,
|
||||
#[error("505 Unexpected Frame")]
|
||||
UnexpectedFrame,
|
||||
#[error("540 Not implemented. '{0}'")]
|
||||
NotImplemented(&'static str),
|
||||
#[error("xxx Not decided yet")]
|
||||
Todo,
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ChannelException {}
|
||||
|
|
@ -1,13 +1,18 @@
|
|||
#![warn(rust_2018_idioms)]
|
||||
|
||||
mod message;
|
||||
pub mod connection;
|
||||
pub mod error;
|
||||
mod macros;
|
||||
pub mod message;
|
||||
pub mod methods;
|
||||
pub mod queue;
|
||||
|
||||
use crate::connection::{ChannelHandle, ConnectionHandle};
|
||||
use crate::queue::{Queue, QueueName};
|
||||
use connection::{ChannelId, ConnectionId};
|
||||
use parking_lot::Mutex;
|
||||
use std::collections::HashMap;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use uuid::Uuid;
|
||||
|
||||
type Handle<T> = Arc<Mutex<T>>;
|
||||
|
||||
|
|
@ -22,6 +27,8 @@ impl Default for GlobalData {
|
|||
inner: Arc::new(Mutex::new(GlobalDataInner {
|
||||
connections: HashMap::new(),
|
||||
channels: HashMap::new(),
|
||||
queues: HashMap::new(),
|
||||
default_exchange: HashMap::new(),
|
||||
})),
|
||||
}
|
||||
}
|
||||
|
|
@ -35,67 +42,9 @@ impl GlobalData {
|
|||
|
||||
#[derive(Debug)]
|
||||
pub struct GlobalDataInner {
|
||||
pub connections: HashMap<Uuid, ConnectionHandle>,
|
||||
pub channels: HashMap<Uuid, ChannelHandle>,
|
||||
}
|
||||
|
||||
pub type ConnectionHandle = Handle<Connection>;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Connection {
|
||||
pub id: Uuid,
|
||||
pub peer_addr: SocketAddr,
|
||||
pub global_data: GlobalData,
|
||||
pub channels: HashMap<u16, ChannelHandle>,
|
||||
}
|
||||
|
||||
impl Connection {
|
||||
pub fn new_handle(
|
||||
id: Uuid,
|
||||
peer_addr: SocketAddr,
|
||||
global_data: GlobalData,
|
||||
) -> ConnectionHandle {
|
||||
Arc::new(Mutex::new(Self {
|
||||
id,
|
||||
peer_addr,
|
||||
global_data,
|
||||
channels: HashMap::new(),
|
||||
}))
|
||||
}
|
||||
|
||||
pub fn close(&self) {
|
||||
let mut global_data = self.global_data.lock();
|
||||
global_data.connections.remove(&self.id);
|
||||
}
|
||||
}
|
||||
|
||||
pub type ChannelHandle = Handle<Channel>;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Channel {
|
||||
pub id: Uuid,
|
||||
pub num: u16,
|
||||
pub connection: ConnectionHandle,
|
||||
pub global_data: GlobalData,
|
||||
}
|
||||
|
||||
impl Channel {
|
||||
pub fn new_handle(
|
||||
id: Uuid,
|
||||
num: u16,
|
||||
connection: ConnectionHandle,
|
||||
global_data: GlobalData,
|
||||
) -> ChannelHandle {
|
||||
Arc::new(Mutex::new(Self {
|
||||
id,
|
||||
num,
|
||||
connection,
|
||||
global_data,
|
||||
}))
|
||||
}
|
||||
|
||||
pub fn close(&self) {
|
||||
let mut global_data = self.global_data.lock();
|
||||
global_data.channels.remove(&self.id);
|
||||
}
|
||||
pub connections: HashMap<ConnectionId, ConnectionHandle>,
|
||||
pub channels: HashMap<ChannelId, ChannelHandle>,
|
||||
pub queues: HashMap<QueueName, Queue>,
|
||||
/// Todo: This is just for testing and will be removed later!
|
||||
pub default_exchange: HashMap<String, Queue>,
|
||||
}
|
||||
|
|
|
|||
70
amqp_core/src/macros.rs
Normal file
70
amqp_core/src/macros.rs
Normal file
|
|
@ -0,0 +1,70 @@
|
|||
#[macro_export]
|
||||
macro_rules! newtype_id {
|
||||
($vis:vis $name:ident) => {
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
||||
$vis struct $name(::uuid::Uuid);
|
||||
|
||||
impl $name {
|
||||
#[must_use]
|
||||
pub fn random() -> Self {
|
||||
::rand::random()
|
||||
}
|
||||
}
|
||||
|
||||
impl ::std::fmt::Display for $name {
|
||||
fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
|
||||
self.0.fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
impl ::rand::prelude::Distribution<$name> for ::rand::distributions::Standard {
|
||||
fn sample<R: ::rand::Rng + ?Sized>(&self, rng: &mut R) -> $name {
|
||||
$name(::uuid::Uuid::from_bytes(rng.gen()))
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! newtype {
|
||||
($(#[$meta:meta])* $vis:vis $name:ident: $ty:ty) => {
|
||||
$(#[$meta])*
|
||||
$vis struct $name($ty);
|
||||
|
||||
impl $name {
|
||||
pub fn new(inner: $ty) -> Self {
|
||||
Self(inner)
|
||||
}
|
||||
|
||||
pub fn into_inner(self) -> $ty {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::Deref for $name {
|
||||
type Target = $ty;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> std::convert::From<T> for $name
|
||||
where
|
||||
$ty: From<T>,
|
||||
{
|
||||
fn from(other: T) -> Self {
|
||||
Self(other.into())
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! amqp_todo {
|
||||
() => {
|
||||
return Err(
|
||||
::amqp_core::error::ConException::NotImplemented(concat!(file!(), ":", line!())).into(),
|
||||
)
|
||||
};
|
||||
}
|
||||
|
|
@ -1,20 +1,24 @@
|
|||
#![allow(dead_code)]
|
||||
|
||||
use crate::methods;
|
||||
use crate::newtype_id;
|
||||
use bytes::Bytes;
|
||||
use smallvec::SmallVec;
|
||||
use std::sync::Arc;
|
||||
use uuid::Uuid;
|
||||
|
||||
pub type Message = Arc<RawMessage>;
|
||||
|
||||
newtype_id!(pub MessageId);
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct RawMessage {
|
||||
id: Uuid,
|
||||
properties: methods::Table,
|
||||
routing: RoutingInformation,
|
||||
content: SmallVec<[Bytes; 1]>,
|
||||
pub id: MessageId,
|
||||
pub properties: methods::Table,
|
||||
pub routing: RoutingInformation,
|
||||
pub content: SmallVec<[Bytes; 1]>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct RoutingInformation {
|
||||
pub exchange: String,
|
||||
pub routing_key: String,
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load diff
34
amqp_core/src/queue.rs
Normal file
34
amqp_core/src/queue.rs
Normal file
|
|
@ -0,0 +1,34 @@
|
|||
use crate::message::Message;
|
||||
use crate::{newtype, newtype_id, ChannelId};
|
||||
use parking_lot::Mutex;
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::sync::Arc;
|
||||
|
||||
pub type Queue = Arc<RawQueue>;
|
||||
|
||||
newtype_id!(pub QueueId);
|
||||
|
||||
newtype!(
|
||||
/// The name of a queue. A newtype wrapper around `Arc<str>`, which guarantees cheap clones.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
pub QueueName: Arc<str>
|
||||
);
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct RawQueue {
|
||||
pub id: QueueId,
|
||||
pub name: QueueName,
|
||||
pub messages: Mutex<Vec<Message>>, // use a concurrent linked list???
|
||||
pub durable: bool,
|
||||
pub exclusive: Option<ChannelId>,
|
||||
/// Whether the queue will automatically be deleted when no consumers uses it anymore.
|
||||
/// The queue can always be manually deleted.
|
||||
/// If auto-delete is enabled, it keeps track of the consumer count.
|
||||
pub deletion: QueueDeletion,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum QueueDeletion {
|
||||
Auto(AtomicUsize),
|
||||
Manual,
|
||||
}
|
||||
|
|
@ -14,6 +14,8 @@
|
|||
<h1>AMQP Data</h1>
|
||||
<h2>Connections</h2>
|
||||
<div id="connection-wrapper"></div>
|
||||
<h2>Queues</h2>
|
||||
<div id="queue-wrapper"></div>
|
||||
|
||||
<script src="script.js"></script>
|
||||
</body>
|
||||
|
|
|
|||
|
|
@ -39,10 +39,23 @@ const renderConnections = (connections) => {
|
|||
wrapper.replaceChildren(table);
|
||||
};
|
||||
|
||||
const renderQueues = (queues) => {
|
||||
const wrapper = document.getElementById('queue-wrapper');
|
||||
|
||||
const table = renderTable(
|
||||
['Queue ID', 'Name', 'Durable'],
|
||||
queues.map((queue) => {
|
||||
return [queue.id, queue.name, queue.durable ? 'Yes' : 'No'];
|
||||
})
|
||||
);
|
||||
wrapper.replaceChildren(table);
|
||||
};
|
||||
|
||||
const refresh = async () => {
|
||||
const fetched = await fetch('api/data');
|
||||
const data = await fetched.json();
|
||||
renderConnections(data.connections);
|
||||
renderQueues(data.queues);
|
||||
};
|
||||
|
||||
setInterval(refresh, 1000);
|
||||
|
|
|
|||
|
|
@ -51,6 +51,7 @@ async fn get_style_css() -> Response {
|
|||
#[derive(Serialize)]
|
||||
struct Data {
|
||||
connections: Vec<Connection>,
|
||||
queues: Vec<Queue>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
|
|
@ -66,6 +67,13 @@ struct Channel {
|
|||
number: u16,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct Queue {
|
||||
id: String,
|
||||
name: String,
|
||||
durable: bool,
|
||||
}
|
||||
|
||||
async fn get_data(global_data: GlobalData) -> impl IntoResponse {
|
||||
let global_data = global_data.lock();
|
||||
|
||||
|
|
@ -92,7 +100,20 @@ async fn get_data(global_data: GlobalData) -> impl IntoResponse {
|
|||
})
|
||||
.collect();
|
||||
|
||||
let data = Data { connections };
|
||||
let queues = global_data
|
||||
.queues
|
||||
.values()
|
||||
.map(|queue| Queue {
|
||||
id: queue.id.to_string(),
|
||||
name: queue.name.to_string(),
|
||||
durable: queue.durable,
|
||||
})
|
||||
.collect();
|
||||
|
||||
let data = Data {
|
||||
connections,
|
||||
queues,
|
||||
};
|
||||
|
||||
Json(data)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,5 +7,6 @@ edition = "2021"
|
|||
|
||||
[dependencies]
|
||||
amqp_core = { path = "../amqp_core" }
|
||||
parking_lot = "0.12.0"
|
||||
tracing = "0.1.31"
|
||||
tokio = { version = "1.17.0", features = ["full"] }
|
||||
|
|
@ -1,10 +0,0 @@
|
|||
use amqp_core::methods::Method;
|
||||
use amqp_core::ChannelHandle;
|
||||
use std::time::Duration;
|
||||
use tokio::time;
|
||||
use tracing::debug;
|
||||
|
||||
pub async fn handle_method(_channel_handle: ChannelHandle, _method: Method) {
|
||||
debug!("handling method or something in that cool new future");
|
||||
time::sleep(Duration::from_secs(10)).await;
|
||||
}
|
||||
13
amqp_messaging/src/methods/consume.rs
Normal file
13
amqp_messaging/src/methods/consume.rs
Normal file
|
|
@ -0,0 +1,13 @@
|
|||
use amqp_core::amqp_todo;
|
||||
use amqp_core::connection::ChannelHandle;
|
||||
use amqp_core::error::ProtocolError;
|
||||
use amqp_core::methods::{BasicConsume, Method};
|
||||
|
||||
pub async fn consume(
|
||||
channel_handle: ChannelHandle,
|
||||
_basic_consume: BasicConsume,
|
||||
) -> Result<Method, ProtocolError> {
|
||||
let _channel = channel_handle.lock();
|
||||
|
||||
amqp_todo!()
|
||||
}
|
||||
70
amqp_messaging/src/methods/mod.rs
Normal file
70
amqp_messaging/src/methods/mod.rs
Normal file
|
|
@ -0,0 +1,70 @@
|
|||
mod consume;
|
||||
mod queue;
|
||||
|
||||
use amqp_core::amqp_todo;
|
||||
use amqp_core::connection::ChannelHandle;
|
||||
use amqp_core::error::ProtocolError;
|
||||
use amqp_core::message::Message;
|
||||
use amqp_core::methods::Method;
|
||||
use tracing::info;
|
||||
|
||||
pub async fn handle_basic_publish(_channel_handle: ChannelHandle, message: Message) {
|
||||
info!(
|
||||
?message,
|
||||
"Someone has summoned the almighty Basic.Publish handler"
|
||||
);
|
||||
}
|
||||
|
||||
pub async fn handle_method(
|
||||
channel_handle: ChannelHandle,
|
||||
method: Method,
|
||||
) -> Result<Method, ProtocolError> {
|
||||
info!(?method, "Handling method");
|
||||
|
||||
let response = match method {
|
||||
Method::ExchangeDeclare(_) => amqp_todo!(),
|
||||
Method::ExchangeDeclareOk(_) => amqp_todo!(),
|
||||
Method::ExchangeDelete(_) => amqp_todo!(),
|
||||
Method::ExchangeDeleteOk(_) => amqp_todo!(),
|
||||
Method::QueueDeclare(queue_declare) => {
|
||||
queue::declare(channel_handle, queue_declare).await?
|
||||
}
|
||||
Method::QueueDeclareOk { .. } => amqp_todo!(),
|
||||
Method::QueueBind(queue_bind) => queue::bind(channel_handle, queue_bind).await?,
|
||||
Method::QueueBindOk(_) => amqp_todo!(),
|
||||
Method::QueueUnbind { .. } => amqp_todo!(),
|
||||
Method::QueueUnbindOk(_) => amqp_todo!(),
|
||||
Method::QueuePurge { .. } => amqp_todo!(),
|
||||
Method::QueuePurgeOk { .. } => amqp_todo!(),
|
||||
Method::QueueDelete { .. } => amqp_todo!(),
|
||||
Method::QueueDeleteOk { .. } => amqp_todo!(),
|
||||
Method::BasicQos { .. } => amqp_todo!(),
|
||||
Method::BasicQosOk(_) => amqp_todo!(),
|
||||
Method::BasicConsume(consume) => consume::consume(channel_handle, consume).await?,
|
||||
Method::BasicConsumeOk { .. } => amqp_todo!(),
|
||||
Method::BasicCancel { .. } => amqp_todo!(),
|
||||
Method::BasicCancelOk { .. } => amqp_todo!(),
|
||||
Method::BasicReturn { .. } => amqp_todo!(),
|
||||
Method::BasicDeliver { .. } => amqp_todo!(),
|
||||
Method::BasicGet { .. } => amqp_todo!(),
|
||||
Method::BasicGetOk { .. } => amqp_todo!(),
|
||||
Method::BasicGetEmpty { .. } => amqp_todo!(),
|
||||
Method::BasicAck { .. } => amqp_todo!(),
|
||||
Method::BasicReject { .. } => amqp_todo!(),
|
||||
Method::BasicRecoverAsync { .. } => amqp_todo!(),
|
||||
Method::BasicRecover { .. } => amqp_todo!(),
|
||||
Method::BasicRecoverOk(_) => amqp_todo!(),
|
||||
Method::TxSelect(_)
|
||||
| Method::TxSelectOk(_)
|
||||
| Method::TxCommit(_)
|
||||
| Method::TxCommitOk(_)
|
||||
| Method::TxRollback(_)
|
||||
| Method::TxRollbackOk(_) => amqp_todo!(),
|
||||
Method::BasicPublish { .. } => {
|
||||
unreachable!("Basic.Publish is handled somewhere else because it has a body")
|
||||
}
|
||||
_ => unreachable!("Method handled by transport layer"),
|
||||
};
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
95
amqp_messaging/src/methods/queue.rs
Normal file
95
amqp_messaging/src/methods/queue.rs
Normal file
|
|
@ -0,0 +1,95 @@
|
|||
use amqp_core::connection::ChannelHandle;
|
||||
use amqp_core::error::{ConException, ProtocolError};
|
||||
use amqp_core::methods::{Method, QueueBind, QueueDeclare, QueueDeclareOk};
|
||||
use amqp_core::queue::{QueueDeletion, QueueId, QueueName, RawQueue};
|
||||
use amqp_core::{amqp_todo, GlobalData};
|
||||
use parking_lot::Mutex;
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::sync::Arc;
|
||||
|
||||
pub async fn declare(
|
||||
channel_handle: ChannelHandle,
|
||||
queue_declare: QueueDeclare,
|
||||
) -> Result<Method, ProtocolError> {
|
||||
let QueueDeclare {
|
||||
queue: queue_name,
|
||||
passive,
|
||||
durable,
|
||||
exclusive,
|
||||
auto_delete,
|
||||
no_wait,
|
||||
arguments,
|
||||
..
|
||||
} = queue_declare;
|
||||
|
||||
let queue_name = QueueName::new(queue_name.into());
|
||||
|
||||
if !arguments.is_empty() {
|
||||
amqp_todo!();
|
||||
}
|
||||
|
||||
if passive || no_wait || durable {
|
||||
amqp_todo!();
|
||||
}
|
||||
|
||||
let global_data = {
|
||||
let channel = channel_handle.lock();
|
||||
let global_data = channel.global_data.clone();
|
||||
|
||||
let id = QueueId::random();
|
||||
let queue = Arc::new(RawQueue {
|
||||
id,
|
||||
name: queue_name.clone(),
|
||||
messages: Mutex::default(),
|
||||
durable,
|
||||
exclusive: exclusive.then(|| channel.id),
|
||||
deletion: if auto_delete {
|
||||
QueueDeletion::Auto(AtomicUsize::default())
|
||||
} else {
|
||||
QueueDeletion::Manual
|
||||
},
|
||||
});
|
||||
|
||||
{
|
||||
let mut global_data_lock = global_data.lock();
|
||||
global_data_lock.queues.insert(queue_name.clone(), queue);
|
||||
}
|
||||
|
||||
global_data
|
||||
};
|
||||
|
||||
bind_queue(global_data, (), queue_name.clone().into_inner()).await?;
|
||||
|
||||
Ok(Method::QueueDeclareOk(QueueDeclareOk {
|
||||
queue: queue_name.to_string(),
|
||||
message_count: 0,
|
||||
consumer_count: 0,
|
||||
}))
|
||||
}
|
||||
|
||||
pub async fn bind(
|
||||
_channel_handle: ChannelHandle,
|
||||
_queue_bind: QueueBind,
|
||||
) -> Result<Method, ProtocolError> {
|
||||
amqp_todo!();
|
||||
}
|
||||
|
||||
async fn bind_queue(
|
||||
global_data: GlobalData,
|
||||
_exchange: (),
|
||||
routing_key: Arc<str>,
|
||||
) -> Result<(), ProtocolError> {
|
||||
let mut global_data = global_data.lock();
|
||||
|
||||
// todo: don't
|
||||
let queue = global_data
|
||||
.queues
|
||||
.get(&QueueName::new(routing_key.clone()))
|
||||
.unwrap()
|
||||
.clone();
|
||||
global_data
|
||||
.default_exchange
|
||||
.insert(routing_key.to_string(), queue);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -18,7 +18,6 @@ smallvec = { version = "1.8.0", features = ["union"] }
|
|||
thiserror = "1.0.30"
|
||||
tokio = { version = "1.16.1", features = ["full"] }
|
||||
tracing = "0.1.30"
|
||||
uuid = "0.8.2"
|
||||
|
||||
[features]
|
||||
|
||||
|
|
|
|||
|
|
@ -1,30 +1,33 @@
|
|||
use crate::error::{ConException, ProtocolError, Result};
|
||||
use crate::frame::{ContentHeader, Frame, FrameType};
|
||||
use crate::{frame, methods, sasl};
|
||||
use amqp_core::connection::{ChannelHandle, ChannelNum, ConnectionHandle, ConnectionId};
|
||||
use amqp_core::message::{MessageId, RawMessage, RoutingInformation};
|
||||
use amqp_core::methods::{
|
||||
BasicPublish, ChannelClose, ChannelCloseOk, ChannelOpenOk, ConnectionClose, ConnectionCloseOk,
|
||||
ConnectionOpen, ConnectionOpenOk, ConnectionStart, ConnectionStartOk, ConnectionTune,
|
||||
ConnectionTuneOk, FieldValue, Method, Table,
|
||||
};
|
||||
use amqp_core::GlobalData;
|
||||
use anyhow::Context;
|
||||
use bytes::Bytes;
|
||||
use smallvec::SmallVec;
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::HashMap;
|
||||
use std::net::SocketAddr;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Context;
|
||||
use bytes::Bytes;
|
||||
use smallvec::SmallVec;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::time;
|
||||
use tracing::{debug, error, info, warn};
|
||||
use uuid::Uuid;
|
||||
|
||||
use amqp_core::methods::{FieldValue, Method, Table};
|
||||
use amqp_core::GlobalData;
|
||||
|
||||
use crate::error::{ConException, ProtocolError, Result};
|
||||
use crate::frame::{ContentHeader, Frame, FrameType};
|
||||
use crate::{frame, methods, sasl};
|
||||
|
||||
fn ensure_conn(condition: bool) -> Result<()> {
|
||||
if condition {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(ConException::Todo.into_trans())
|
||||
Err(ConException::Todo.into())
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -33,37 +36,48 @@ const CHANNEL_MAX: u16 = 0;
|
|||
const FRAME_SIZE_MAX: u32 = 0;
|
||||
const HEARTBEAT_DELAY: u16 = 0;
|
||||
|
||||
#[allow(dead_code)]
|
||||
const BASIC_CLASS_ID: u16 = 60;
|
||||
|
||||
pub struct Channel {
|
||||
num: u16,
|
||||
channel_handle: amqp_core::ChannelHandle,
|
||||
/// A handle to the global channel representation. Used to remove the channel when it's dropped
|
||||
handle: ChannelHandle,
|
||||
/// The current status of the channel, whether it has sent a method that expects a body
|
||||
status: ChannelStatus,
|
||||
}
|
||||
|
||||
pub struct Connection {
|
||||
id: Uuid,
|
||||
id: ConnectionId,
|
||||
stream: TcpStream,
|
||||
max_frame_size: usize,
|
||||
heartbeat_delay: u16,
|
||||
channel_max: u16,
|
||||
/// When the next heartbeat expires
|
||||
next_timeout: Pin<Box<time::Sleep>>,
|
||||
channels: HashMap<u16, Channel>,
|
||||
connection_handle: amqp_core::ConnectionHandle,
|
||||
channels: HashMap<ChannelNum, Channel>,
|
||||
handle: ConnectionHandle,
|
||||
global_data: GlobalData,
|
||||
}
|
||||
|
||||
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
|
||||
enum WaitForBodyStatus {
|
||||
Method(Method),
|
||||
Header(Method, ContentHeader, SmallVec<[Bytes; 1]>),
|
||||
None,
|
||||
enum ChannelStatus {
|
||||
Default,
|
||||
/// ClassId // todo: newtype it
|
||||
NeedHeader(u16, Box<Method>),
|
||||
NeedsBody(Box<Method>, Box<ContentHeader>, SmallVec<[Bytes; 1]>),
|
||||
}
|
||||
|
||||
impl ChannelStatus {
|
||||
fn take(&mut self) -> Self {
|
||||
std::mem::replace(self, Self::Default)
|
||||
}
|
||||
}
|
||||
|
||||
impl Connection {
|
||||
pub fn new(
|
||||
id: Uuid,
|
||||
id: ConnectionId,
|
||||
stream: TcpStream,
|
||||
connection_handle: amqp_core::ConnectionHandle,
|
||||
connection_handle: ConnectionHandle,
|
||||
global_data: GlobalData,
|
||||
) -> Self {
|
||||
Self {
|
||||
|
|
@ -73,8 +87,8 @@ impl Connection {
|
|||
heartbeat_delay: HEARTBEAT_DELAY,
|
||||
channel_max: CHANNEL_MAX,
|
||||
next_timeout: Box::pin(time::sleep(DEFAULT_TIMEOUT)),
|
||||
connection_handle,
|
||||
channels: HashMap::new(),
|
||||
handle: connection_handle,
|
||||
channels: HashMap::with_capacity(4),
|
||||
global_data,
|
||||
}
|
||||
}
|
||||
|
|
@ -85,7 +99,7 @@ impl Connection {
|
|||
Err(err) => error!(%err, "Error during processing of connection"),
|
||||
}
|
||||
|
||||
let connection_handle = self.connection_handle.lock();
|
||||
let connection_handle = self.handle.lock();
|
||||
connection_handle.close();
|
||||
}
|
||||
|
||||
|
|
@ -100,7 +114,7 @@ impl Connection {
|
|||
self.main_loop().await
|
||||
}
|
||||
|
||||
async fn send_method(&mut self, channel: u16, method: Method) -> Result<()> {
|
||||
async fn send_method(&mut self, channel: ChannelNum, method: Method) -> Result<()> {
|
||||
let mut payload = Vec::with_capacity(64);
|
||||
methods::write::write_method(method, &mut payload)?;
|
||||
frame::write_frame(
|
||||
|
|
@ -124,7 +138,7 @@ impl Connection {
|
|||
}
|
||||
|
||||
async fn start(&mut self) -> Result<()> {
|
||||
let start_method = Method::ConnectionStart {
|
||||
let start_method = Method::ConnectionStart(ConnectionStart {
|
||||
version_major: 0,
|
||||
version_minor: 9,
|
||||
server_properties: server_properties(
|
||||
|
|
@ -134,50 +148,50 @@ impl Connection {
|
|||
),
|
||||
mechanisms: "PLAIN".into(),
|
||||
locales: "en_US".into(),
|
||||
};
|
||||
});
|
||||
|
||||
debug!(?start_method, "Sending Start method");
|
||||
self.send_method(0, start_method).await?;
|
||||
self.send_method(ChannelNum::zero(), start_method).await?;
|
||||
|
||||
let start_ok = self.recv_method().await?;
|
||||
debug!(?start_ok, "Received Start-Ok");
|
||||
|
||||
if let Method::ConnectionStartOk {
|
||||
if let Method::ConnectionStartOk(ConnectionStartOk {
|
||||
mechanism,
|
||||
locale,
|
||||
response,
|
||||
..
|
||||
} = start_ok
|
||||
}) = start_ok
|
||||
{
|
||||
ensure_conn(mechanism == "PLAIN")?;
|
||||
ensure_conn(locale == "en_US")?;
|
||||
let plain_user = sasl::parse_sasl_plain_response(&response)?;
|
||||
info!(username = %plain_user.authentication_identity, "SASL Authentication successful")
|
||||
info!(username = %plain_user.authentication_identity, "SASL Authentication successful");
|
||||
} else {
|
||||
return Err(ConException::Todo.into_trans());
|
||||
return Err(ConException::Todo.into());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn tune(&mut self) -> Result<()> {
|
||||
let tune_method = Method::ConnectionTune {
|
||||
let tune_method = Method::ConnectionTune(ConnectionTune {
|
||||
channel_max: CHANNEL_MAX,
|
||||
frame_max: FRAME_SIZE_MAX,
|
||||
heartbeat: HEARTBEAT_DELAY,
|
||||
};
|
||||
});
|
||||
|
||||
debug!("Sending Tune method");
|
||||
self.send_method(0, tune_method).await?;
|
||||
self.send_method(ChannelNum::zero(), tune_method).await?;
|
||||
|
||||
let tune_ok = self.recv_method().await?;
|
||||
debug!(?tune_ok, "Received Tune-Ok method");
|
||||
|
||||
if let Method::ConnectionTuneOk {
|
||||
if let Method::ConnectionTuneOk(ConnectionTuneOk {
|
||||
channel_max,
|
||||
frame_max,
|
||||
heartbeat,
|
||||
} = tune_ok
|
||||
}) = tune_ok
|
||||
{
|
||||
self.channel_max = channel_max;
|
||||
self.max_frame_size = usize::try_from(frame_max).unwrap();
|
||||
|
|
@ -192,15 +206,15 @@ impl Connection {
|
|||
let open = self.recv_method().await?;
|
||||
debug!(?open, "Received Open method");
|
||||
|
||||
if let Method::ConnectionOpen { virtual_host, .. } = open {
|
||||
if let Method::ConnectionOpen(ConnectionOpen { virtual_host, .. }) = open {
|
||||
ensure_conn(virtual_host == "/")?;
|
||||
}
|
||||
|
||||
self.send_method(
|
||||
0,
|
||||
Method::ConnectionOpenOk {
|
||||
ChannelNum::zero(),
|
||||
Method::ConnectionOpenOk(ConnectionOpenOk {
|
||||
reserved_1: "".to_string(),
|
||||
},
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
|
@ -208,33 +222,109 @@ impl Connection {
|
|||
}
|
||||
|
||||
async fn main_loop(&mut self) -> Result<()> {
|
||||
// todo: find out how header/body frames can interleave between channels
|
||||
let mut wait_for_body = WaitForBodyStatus::None;
|
||||
|
||||
loop {
|
||||
debug!("Waiting for next frame");
|
||||
let frame = frame::read_frame(&mut self.stream, self.max_frame_size).await?;
|
||||
self.reset_timeout();
|
||||
|
||||
match frame.kind {
|
||||
FrameType::Method => wait_for_body = self.dispatch_method(frame).await?,
|
||||
FrameType::Heartbeat => {}
|
||||
FrameType::Header => match wait_for_body {
|
||||
WaitForBodyStatus::None => warn!(channel = %frame.channel, "unexpected header"),
|
||||
WaitForBodyStatus::Method(method) => {
|
||||
wait_for_body =
|
||||
WaitForBodyStatus::Header(method, ContentHeader::new(), SmallVec::new())
|
||||
FrameType::Method => self.dispatch_method(frame).await?,
|
||||
FrameType::Heartbeat => { /* Nothing here, just the `reset_timeout` above */ }
|
||||
FrameType::Header => self.dispatch_header(frame)?,
|
||||
FrameType::Body => self.dispatch_body(frame)?,
|
||||
}
|
||||
WaitForBodyStatus::Header(_, _, _) => {
|
||||
warn!(channel = %frame.channel, "already got header")
|
||||
}
|
||||
}
|
||||
|
||||
async fn dispatch_method(&mut self, frame: Frame) -> Result<()> {
|
||||
let method = methods::parse_method(&frame.payload)?;
|
||||
debug!(?method, "Received method");
|
||||
|
||||
// Sending a method implicitly cancels the content frames that might be ongoing
|
||||
self.channels
|
||||
.get_mut(&frame.channel)
|
||||
.map(|channel| channel.status.take());
|
||||
|
||||
match method {
|
||||
Method::ConnectionClose(ConnectionClose {
|
||||
reply_code,
|
||||
reply_text,
|
||||
class_id,
|
||||
method_id,
|
||||
}) => {
|
||||
info!(%reply_code, %reply_text, %class_id, %method_id, "Closing connection");
|
||||
self.send_method(
|
||||
ChannelNum::zero(),
|
||||
Method::ConnectionCloseOk(ConnectionCloseOk),
|
||||
)
|
||||
.await?;
|
||||
return Err(ProtocolError::GracefulClose.into());
|
||||
}
|
||||
Method::ChannelOpen { .. } => self.channel_open(frame.channel).await?,
|
||||
Method::ChannelClose { .. } => self.channel_close(frame.channel, method).await?,
|
||||
Method::BasicPublish { .. } => match self.channels.get_mut(&frame.channel) {
|
||||
Some(channel) => {
|
||||
channel.status = ChannelStatus::NeedHeader(BASIC_CLASS_ID, Box::new(method));
|
||||
}
|
||||
None => return Err(ConException::Todo.into()),
|
||||
},
|
||||
FrameType::Body => match &mut wait_for_body {
|
||||
WaitForBodyStatus::None => warn!(channel = %frame.channel, "unexpected body"),
|
||||
WaitForBodyStatus::Method(_) => {
|
||||
warn!(channel = %frame.channel, "unexpected body")
|
||||
_ => {
|
||||
let channel_handle = self
|
||||
.channels
|
||||
.get(&frame.channel)
|
||||
.ok_or(ConException::Todo)?
|
||||
.handle
|
||||
.clone();
|
||||
|
||||
// call into amqp_messaging to handle the method
|
||||
// it returns the response method that we are supposed to send
|
||||
// maybe this might become an `Option` in the future
|
||||
let return_method =
|
||||
amqp_messaging::methods::handle_method(channel_handle, method).await?;
|
||||
self.send_method(frame.channel, return_method).await?;
|
||||
}
|
||||
WaitForBodyStatus::Header(_, header, vec) => {
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn dispatch_header(&mut self, frame: Frame) -> Result<()> {
|
||||
self.channels
|
||||
.get_mut(&frame.channel)
|
||||
.ok_or_else(|| ConException::Todo.into())
|
||||
.and_then(|channel| match channel.status.take() {
|
||||
ChannelStatus::Default => {
|
||||
warn!(channel = %frame.channel, "unexpected header");
|
||||
Err(ConException::UnexpectedFrame.into())
|
||||
}
|
||||
ChannelStatus::NeedHeader(class_id, method) => {
|
||||
let header = ContentHeader::parse(&frame.payload)?;
|
||||
ensure_conn(header.class_id == class_id)?;
|
||||
|
||||
channel.status = ChannelStatus::NeedsBody(method, header, SmallVec::new());
|
||||
Ok(())
|
||||
}
|
||||
ChannelStatus::NeedsBody(_, _, _) => {
|
||||
warn!(channel = %frame.channel, "already got header");
|
||||
Err(ConException::UnexpectedFrame.into())
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn dispatch_body(&mut self, frame: Frame) -> Result<()> {
|
||||
let channel = self
|
||||
.channels
|
||||
.get_mut(&frame.channel)
|
||||
.ok_or(ConException::Todo)?;
|
||||
|
||||
match channel.status.take() {
|
||||
ChannelStatus::Default => {
|
||||
warn!(channel = %frame.channel, "unexpected body");
|
||||
Err(ConException::UnexpectedFrame.into())
|
||||
}
|
||||
ChannelStatus::NeedHeader(_, _) => {
|
||||
warn!(channel = %frame.channel, "unexpected body");
|
||||
Err(ConException::UnexpectedFrame.into())
|
||||
}
|
||||
ChannelStatus::NeedsBody(method, header, mut vec) => {
|
||||
vec.push(frame.payload);
|
||||
match vec
|
||||
.iter()
|
||||
|
|
@ -242,71 +332,79 @@ impl Connection {
|
|||
.sum::<usize>()
|
||||
.cmp(&usize::try_from(header.body_size).unwrap())
|
||||
{
|
||||
Ordering::Equal => todo!("process body"),
|
||||
Ordering::Greater => todo!("too much data!"),
|
||||
Ordering::Less => {} // wait for next body
|
||||
Ordering::Equal => {
|
||||
self.process_method_with_body(*method, *header, vec, frame.channel)
|
||||
}
|
||||
Ordering::Greater => Err(ConException::Todo.into()),
|
||||
Ordering::Less => Ok(()), // wait for next body
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn process_method_with_body(
|
||||
&mut self,
|
||||
method: Method,
|
||||
header: ContentHeader,
|
||||
payloads: SmallVec<[Bytes; 1]>,
|
||||
channel: ChannelNum,
|
||||
) -> Result<()> {
|
||||
// The only method with content that is sent to the server is Basic.Publish.
|
||||
ensure_conn(header.class_id == BASIC_CLASS_ID)?;
|
||||
|
||||
if let Method::BasicPublish(BasicPublish {
|
||||
exchange,
|
||||
routing_key,
|
||||
mandatory,
|
||||
immediate,
|
||||
..
|
||||
}) = method
|
||||
{
|
||||
let message = RawMessage {
|
||||
id: MessageId::random(),
|
||||
properties: header.property_fields,
|
||||
routing: RoutingInformation {
|
||||
exchange,
|
||||
routing_key,
|
||||
mandatory,
|
||||
immediate,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
content: payloads,
|
||||
};
|
||||
let message = Arc::new(message);
|
||||
|
||||
async fn dispatch_method(&mut self, frame: Frame) -> Result<WaitForBodyStatus> {
|
||||
let method = methods::parse_method(&frame.payload)?;
|
||||
debug!(?method, "Received method");
|
||||
let channel = self.channels.get(&channel).ok_or(ConException::Todo)?;
|
||||
|
||||
match method {
|
||||
Method::ConnectionClose {
|
||||
reply_code,
|
||||
reply_text,
|
||||
class_id,
|
||||
method_id,
|
||||
} => {
|
||||
info!(%reply_code, %reply_text, %class_id, %method_id, "Closing connection");
|
||||
self.send_method(0, Method::ConnectionCloseOk {}).await?;
|
||||
return Err(ProtocolError::GracefulClose.into());
|
||||
}
|
||||
Method::ChannelOpen { .. } => self.channel_open(frame.channel).await?,
|
||||
Method::ChannelClose { .. } => self.channel_close(frame.channel, method).await?,
|
||||
Method::BasicPublish { .. } => return Ok(WaitForBodyStatus::Method(method)),
|
||||
_ => {
|
||||
let channel_handle = self
|
||||
.channels
|
||||
.get(&frame.channel)
|
||||
.ok_or_else(|| ConException::Todo.into_trans())?
|
||||
.channel_handle
|
||||
.clone();
|
||||
|
||||
tokio::spawn(amqp_messaging::methods::handle_method(
|
||||
channel_handle,
|
||||
method,
|
||||
// Spawn the handler for the publish. The connection task goes back to handling
|
||||
// just the connection.
|
||||
tokio::spawn(amqp_messaging::methods::handle_basic_publish(
|
||||
channel.handle.clone(),
|
||||
message,
|
||||
));
|
||||
// we don't handle this here, forward it to *somewhere*
|
||||
Ok(())
|
||||
} else {
|
||||
Err(ConException::Todo.into())
|
||||
}
|
||||
}
|
||||
|
||||
Ok(WaitForBodyStatus::None)
|
||||
}
|
||||
|
||||
async fn channel_open(&mut self, num: u16) -> Result<()> {
|
||||
let id = Uuid::from_bytes(rand::random());
|
||||
let channel_handle = amqp_core::Channel::new_handle(
|
||||
async fn channel_open(&mut self, channel_num: ChannelNum) -> Result<()> {
|
||||
let id = rand::random();
|
||||
let channel_handle = amqp_core::connection::Channel::new_handle(
|
||||
id,
|
||||
num,
|
||||
self.connection_handle.clone(),
|
||||
channel_num.num(),
|
||||
self.handle.clone(),
|
||||
self.global_data.clone(),
|
||||
);
|
||||
|
||||
let channel = Channel {
|
||||
num,
|
||||
channel_handle: channel_handle.clone(),
|
||||
handle: channel_handle.clone(),
|
||||
status: ChannelStatus::Default,
|
||||
};
|
||||
|
||||
let prev = self.channels.insert(num, channel);
|
||||
let prev = self.channels.insert(channel_num, channel);
|
||||
if let Some(prev) = prev {
|
||||
self.channels.insert(num, prev); // restore previous state
|
||||
return Err(ConException::ChannelError.into_trans());
|
||||
self.channels.insert(channel_num, prev); // restore previous state
|
||||
return Err(ConException::ChannelError.into());
|
||||
}
|
||||
|
||||
{
|
||||
|
|
@ -318,36 +416,37 @@ impl Connection {
|
|||
.unwrap()
|
||||
.lock()
|
||||
.channels
|
||||
.insert(num, channel_handle);
|
||||
.insert(channel_num.num(), channel_handle);
|
||||
}
|
||||
|
||||
info!(%num, "Opened new channel");
|
||||
info!(%channel_num, "Opened new channel");
|
||||
|
||||
self.send_method(
|
||||
num,
|
||||
Method::ChannelOpenOk {
|
||||
channel_num,
|
||||
Method::ChannelOpenOk(ChannelOpenOk {
|
||||
reserved_1: Vec::new(),
|
||||
},
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn channel_close(&mut self, num: u16, method: Method) -> Result<()> {
|
||||
if let Method::ChannelClose {
|
||||
async fn channel_close(&mut self, channel_id: ChannelNum, method: Method) -> Result<()> {
|
||||
if let Method::ChannelClose(ChannelClose {
|
||||
reply_code: code,
|
||||
reply_text: reason,
|
||||
..
|
||||
} = method
|
||||
}) = method
|
||||
{
|
||||
info!(%code, %reason, "Closing channel");
|
||||
|
||||
if let Some(channel) = self.channels.remove(&num) {
|
||||
if let Some(channel) = self.channels.remove(&channel_id) {
|
||||
drop(channel);
|
||||
self.send_method(num, Method::ChannelCloseOk).await?;
|
||||
self.send_method(channel_id, Method::ChannelCloseOk(ChannelCloseOk))
|
||||
.await?;
|
||||
} else {
|
||||
return Err(ConException::Todo.into_trans());
|
||||
return Err(ConException::Todo.into());
|
||||
}
|
||||
} else {
|
||||
unreachable!()
|
||||
|
|
@ -357,7 +456,7 @@ impl Connection {
|
|||
|
||||
fn reset_timeout(&mut self) {
|
||||
if self.heartbeat_delay != 0 {
|
||||
let next = Duration::from_secs(u64::from(self.heartbeat_delay));
|
||||
let next = Duration::from_secs(u64::from(self.heartbeat_delay / 2));
|
||||
self.next_timeout = Box::pin(time::sleep(next));
|
||||
}
|
||||
}
|
||||
|
|
@ -396,13 +495,13 @@ impl Connection {
|
|||
|
||||
impl Drop for Connection {
|
||||
fn drop(&mut self) {
|
||||
self.connection_handle.lock().close();
|
||||
self.handle.lock().close();
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Channel {
|
||||
fn drop(&mut self) {
|
||||
self.channel_handle.lock().close();
|
||||
self.handle.lock().close();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,15 +1,15 @@
|
|||
#![allow(dead_code)]
|
||||
|
||||
use std::io::Error;
|
||||
|
||||
pub type StdResult<T, E> = std::result::Result<T, E>;
|
||||
pub use amqp_core::error::{ConException, ProtocolError};
|
||||
|
||||
type StdResult<T, E> = std::result::Result<T, E>;
|
||||
|
||||
pub type Result<T> = StdResult<T, TransError>;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum TransError {
|
||||
#[error("{0}")]
|
||||
Invalid(#[from] ProtocolError),
|
||||
Protocol(#[from] ProtocolError),
|
||||
#[error("connection error: `{0}`")]
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
|
@ -20,40 +20,8 @@ impl From<std::io::Error> for TransError {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ProtocolError {
|
||||
#[error("fatal error")]
|
||||
Fatal,
|
||||
#[error("{0}")]
|
||||
ConException(#[from] ConException),
|
||||
#[error("{0}")]
|
||||
ChannelException(#[from] ChannelException),
|
||||
#[error("Connection must be closed")]
|
||||
CloseNow,
|
||||
#[error("Graceful connection closing requested")]
|
||||
GracefulClose,
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ConException {
|
||||
#[error("501 Frame error")]
|
||||
FrameError,
|
||||
#[error("503 Command invalid")]
|
||||
CommandInvalid,
|
||||
#[error("503 Syntax error | {0:?}")]
|
||||
/// A method was received but there was a syntax error. The string stores where it occurred.
|
||||
SyntaxError(Vec<String>),
|
||||
#[error("504 Channel error")]
|
||||
ChannelError,
|
||||
#[error("xxx Not decided yet")]
|
||||
Todo,
|
||||
}
|
||||
|
||||
impl ConException {
|
||||
pub fn into_trans(self) -> TransError {
|
||||
TransError::Invalid(ProtocolError::ConException(self))
|
||||
impl From<amqp_core::error::ConException> for TransError {
|
||||
fn from(err: ConException) -> Self {
|
||||
Self::Protocol(ProtocolError::ConException(err))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ChannelException {}
|
||||
|
|
|
|||
|
|
@ -1,8 +1,8 @@
|
|||
use crate::error::{ConException, ProtocolError, Result};
|
||||
use amqp_core::methods::FieldValue;
|
||||
use amqp_core::connection::ChannelNum;
|
||||
use amqp_core::methods;
|
||||
use anyhow::Context;
|
||||
use bytes::Bytes;
|
||||
use smallvec::SmallVec;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tracing::trace;
|
||||
|
||||
|
|
@ -19,7 +19,7 @@ mod frame_type {
|
|||
pub struct Frame {
|
||||
/// The type of the frame including its parsed metadata.
|
||||
pub kind: FrameType,
|
||||
pub channel: u16,
|
||||
pub channel: ChannelNum,
|
||||
/// Includes the whole payload, also including the metadata from each type.
|
||||
pub payload: Bytes,
|
||||
}
|
||||
|
|
@ -38,24 +38,112 @@ pub struct ContentHeader {
|
|||
pub class_id: u16,
|
||||
pub weight: u16,
|
||||
pub body_size: u64,
|
||||
pub property_flags: SmallVec<[u16; 1]>,
|
||||
pub property_fields: Vec<FieldValue>,
|
||||
pub property_fields: methods::Table,
|
||||
}
|
||||
|
||||
mod content_header_parse {
|
||||
use crate::error::TransError;
|
||||
use crate::frame::ContentHeader;
|
||||
use crate::methods::parse_helper::{octet, shortstr, table, timestamp};
|
||||
use amqp_core::methods;
|
||||
use amqp_core::methods::FieldValue::{FieldTable, ShortShortUInt, ShortString, Timestamp};
|
||||
use nom::number::complete::{u16, u64};
|
||||
use nom::number::Endianness::Big;
|
||||
|
||||
type IResult<'a, T> = nom::IResult<&'a [u8], T, TransError>;
|
||||
|
||||
pub fn basic_properties(flags: u16, input: &[u8]) -> IResult<'_, methods::Table> {
|
||||
macro_rules! parse_property {
|
||||
(if $flags:ident >> $n:literal, $parser:ident($input:ident)?, $map:ident.insert($name:expr, $ctor:path)) => {
|
||||
if (($flags >> $n) & 1) == 1 {
|
||||
let (input, value) = $parser($input)?;
|
||||
$map.insert(String::from($name), $ctor(value));
|
||||
input
|
||||
} else {
|
||||
$input
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
let mut map = methods::Table::new();
|
||||
|
||||
let input = parse_property!(if flags >> 15, shortstr(input)?, map.insert("content-type", ShortString));
|
||||
let input = parse_property!(if flags >> 14, shortstr(input)?, map.insert("content-encoding", ShortString));
|
||||
let input =
|
||||
parse_property!(if flags >> 13, table(input)?, map.insert("headers", FieldTable));
|
||||
let input = parse_property!(if flags >> 12, octet(input)?, map.insert("delivery-mode", ShortShortUInt));
|
||||
let input =
|
||||
parse_property!(if flags >> 11, octet(input)?, map.insert("priority", ShortShortUInt));
|
||||
let input = parse_property!(if flags >> 10, shortstr(input)?, map.insert("correlation-id", ShortString));
|
||||
let input =
|
||||
parse_property!(if flags >> 9, shortstr(input)?, map.insert("reply-to", ShortString));
|
||||
let input =
|
||||
parse_property!(if flags >> 8, shortstr(input)?, map.insert("expiration", ShortString));
|
||||
let input =
|
||||
parse_property!(if flags >> 7, shortstr(input)?, map.insert("message-id", ShortString));
|
||||
let input =
|
||||
parse_property!(if flags >> 6, timestamp(input)?, map.insert("timestamp", Timestamp));
|
||||
let input =
|
||||
parse_property!(if flags >> 5, shortstr(input)?, map.insert("type", ShortString));
|
||||
let input =
|
||||
parse_property!(if flags >> 4, shortstr(input)?, map.insert("user-id", ShortString));
|
||||
let input =
|
||||
parse_property!(if flags >> 3, shortstr(input)?, map.insert("app-id", ShortString));
|
||||
let input =
|
||||
parse_property!(if flags >> 2, shortstr(input)?, map.insert("reserved", ShortString));
|
||||
|
||||
Ok((input, map))
|
||||
}
|
||||
|
||||
pub fn header(input: &[u8]) -> IResult<'_, Box<ContentHeader>> {
|
||||
let (input, class_id) = u16(Big)(input)?;
|
||||
let (input, weight) = u16(Big)(input)?;
|
||||
let (input, body_size) = u64(Big)(input)?;
|
||||
|
||||
// I do not quite understand this here. Apparently, there can be more than 15 flags?
|
||||
// But the Basic class only specifies 15, so idk. Don't care about this for now
|
||||
// Todo: But probably later.
|
||||
let (input, property_flags) = u16(Big)(input)?;
|
||||
let (input, property_fields) = basic_properties(property_flags, input)?;
|
||||
|
||||
Ok((
|
||||
input,
|
||||
Box::new(ContentHeader {
|
||||
class_id,
|
||||
weight,
|
||||
body_size,
|
||||
property_fields,
|
||||
}),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
impl ContentHeader {
|
||||
pub fn new() -> Self {
|
||||
todo!()
|
||||
pub fn parse(input: &[u8]) -> Result<Box<Self>> {
|
||||
match content_header_parse::header(input) {
|
||||
Ok(([], header)) => Ok(header),
|
||||
Ok((_, _)) => {
|
||||
Err(
|
||||
ConException::SyntaxError(vec!["could not consume all input".to_string()])
|
||||
.into(),
|
||||
)
|
||||
}
|
||||
Err(nom::Err::Incomplete(_)) => {
|
||||
Err(ConException::SyntaxError(vec!["there was not enough data".to_string()]).into())
|
||||
}
|
||||
Err(nom::Err::Failure(err) | nom::Err::Error(err)) => Err(err),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn write_frame<W>(frame: &Frame, mut w: W) -> Result<()>
|
||||
where
|
||||
W: AsyncWriteExt + Unpin,
|
||||
W: AsyncWriteExt + Unpin + Send,
|
||||
{
|
||||
trace!(?frame, "Sending frame");
|
||||
|
||||
w.write_u8(frame.kind as u8).await?;
|
||||
w.write_u16(frame.channel).await?;
|
||||
w.write_u16(frame.channel.num()).await?;
|
||||
w.write_u32(u32::try_from(frame.payload.len()).context("frame size too big")?)
|
||||
.await?;
|
||||
w.write_all(&frame.payload).await?;
|
||||
|
|
@ -66,10 +154,11 @@ where
|
|||
|
||||
pub async fn read_frame<R>(r: &mut R, max_frame_size: usize) -> Result<Frame>
|
||||
where
|
||||
R: AsyncReadExt + Unpin,
|
||||
R: AsyncReadExt + Unpin + Send,
|
||||
{
|
||||
let kind = r.read_u8().await.context("read type")?;
|
||||
let channel = r.read_u16().await.context("read channel")?;
|
||||
let channel = ChannelNum::new(channel);
|
||||
let size = r.read_u32().await.context("read size")?;
|
||||
|
||||
let mut payload = vec![0; size.try_into().unwrap()];
|
||||
|
|
@ -82,7 +171,7 @@ where
|
|||
}
|
||||
|
||||
if max_frame_size != 0 && payload.len() > max_frame_size {
|
||||
return Err(ConException::FrameError.into_trans());
|
||||
return Err(ConException::FrameError.into());
|
||||
}
|
||||
|
||||
let kind = parse_frame_type(kind, channel)?;
|
||||
|
|
@ -98,25 +187,25 @@ where
|
|||
Ok(frame)
|
||||
}
|
||||
|
||||
fn parse_frame_type(kind: u8, channel: u16) -> Result<FrameType> {
|
||||
fn parse_frame_type(kind: u8, channel: ChannelNum) -> Result<FrameType> {
|
||||
match kind {
|
||||
frame_type::METHOD => Ok(FrameType::Method),
|
||||
frame_type::HEADER => Ok(FrameType::Header),
|
||||
frame_type::BODY => Ok(FrameType::Body),
|
||||
frame_type::HEARTBEAT => {
|
||||
if channel != 0 {
|
||||
Err(ProtocolError::ConException(ConException::FrameError).into())
|
||||
} else {
|
||||
if channel.is_zero() {
|
||||
Ok(FrameType::Heartbeat)
|
||||
} else {
|
||||
Err(ProtocolError::ConException(ConException::FrameError).into())
|
||||
}
|
||||
}
|
||||
_ => Err(ConException::FrameError.into_trans()),
|
||||
_ => Err(ConException::FrameError.into()),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::frame::{Frame, FrameType};
|
||||
use crate::frame::{ChannelNum, Frame, FrameType};
|
||||
use bytes::Bytes;
|
||||
|
||||
#[tokio::test]
|
||||
|
|
@ -145,7 +234,7 @@ mod tests {
|
|||
frame,
|
||||
Frame {
|
||||
kind: FrameType::Method,
|
||||
channel: 0,
|
||||
channel: ChannelNum::new(0),
|
||||
payload: Bytes::from_static(&[1, 2, 3]),
|
||||
}
|
||||
);
|
||||
|
|
|
|||
|
|
@ -8,12 +8,13 @@ mod sasl;
|
|||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
// TODO: handle big types
|
||||
|
||||
use crate::connection::Connection;
|
||||
use amqp_core::GlobalData;
|
||||
use anyhow::Result;
|
||||
use tokio::net;
|
||||
use tracing::{info, info_span, Instrument};
|
||||
use uuid::Uuid;
|
||||
|
||||
pub async fn do_thing_i_guess(global_data: GlobalData) -> Result<()> {
|
||||
info!("Binding TCP listener...");
|
||||
|
|
@ -23,13 +24,13 @@ pub async fn do_thing_i_guess(global_data: GlobalData) -> Result<()> {
|
|||
loop {
|
||||
let (stream, peer_addr) = listener.accept().await?;
|
||||
|
||||
let id = Uuid::from_bytes(rand::random());
|
||||
let id = rand::random();
|
||||
|
||||
info!(local_addr = ?stream.local_addr(), %id, "Accepted new connection");
|
||||
let span = info_span!("client-connection", %id);
|
||||
|
||||
let connection_handle =
|
||||
amqp_core::Connection::new_handle(id, peer_addr, global_data.clone());
|
||||
amqp_core::connection::Connection::new_handle(id, peer_addr, global_data.clone());
|
||||
|
||||
let mut global_data_guard = global_data.lock();
|
||||
global_data_guard
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load diff
|
|
@ -1,10 +1,10 @@
|
|||
use crate::error::{ConException, TransError};
|
||||
use crate::error::TransError;
|
||||
use amqp_core::error::ConException;
|
||||
use amqp_core::methods::{FieldValue, Method, Table};
|
||||
use rand::Rng;
|
||||
use std::collections::HashMap;
|
||||
|
||||
mod generated;
|
||||
mod parse_helper;
|
||||
pub mod parse_helper;
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
mod write_helper;
|
||||
|
|
@ -18,16 +18,10 @@ pub fn parse_method(payload: &[u8]) -> Result<Method, TransError> {
|
|||
match nom_result {
|
||||
Ok(([], method)) => Ok(method),
|
||||
Ok((_, _)) => {
|
||||
Err(
|
||||
ConException::SyntaxError(vec!["could not consume all input".to_string()])
|
||||
.into_trans(),
|
||||
)
|
||||
Err(ConException::SyntaxError(vec!["could not consume all input".to_string()]).into())
|
||||
}
|
||||
Err(nom::Err::Incomplete(_)) => {
|
||||
Err(
|
||||
ConException::SyntaxError(vec!["there was not enough data".to_string()])
|
||||
.into_trans(),
|
||||
)
|
||||
Err(ConException::SyntaxError(vec!["there was not enough data".to_string()]).into())
|
||||
}
|
||||
Err(nom::Err::Failure(err) | nom::Err::Error(err)) => Err(err),
|
||||
}
|
||||
|
|
@ -70,7 +64,9 @@ rand_random_method!(bool, u8, i8, u16, i16, u32, i32, u64, i64, f32, f64);
|
|||
impl<R: Rng> RandomMethod<R> for Table {
|
||||
fn random(rng: &mut R) -> Self {
|
||||
let len = rng.gen_range(0..3);
|
||||
HashMap::from_iter((0..len).map(|_| (String::random(rng), FieldValue::random(rng))))
|
||||
(0..len)
|
||||
.map(|_| (String::random(rng), FieldValue::random(rng)))
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
use crate::error::{ConException, ProtocolError, TransError};
|
||||
use crate::error::TransError;
|
||||
use crate::methods::generated::parse::IResult;
|
||||
use amqp_core::error::{ConException, ProtocolError};
|
||||
use amqp_core::methods::{
|
||||
Bit, FieldValue, Long, Longlong, Longstr, Octet, Short, Shortstr, Table, TableFieldName,
|
||||
Timestamp,
|
||||
|
|
@ -11,11 +12,10 @@ use nom::multi::{count, many0};
|
|||
use nom::number::complete::{f32, f64, i16, i32, i64, i8, u16, u32, u64, u8};
|
||||
use nom::number::Endianness::Big;
|
||||
use nom::Err;
|
||||
use std::collections::HashMap;
|
||||
|
||||
impl<T> nom::error::ParseError<T> for TransError {
|
||||
fn from_error_kind(_input: T, _kind: ErrorKind) -> Self {
|
||||
ConException::SyntaxError(vec![]).into_trans()
|
||||
ConException::SyntaxError(vec![]).into()
|
||||
}
|
||||
|
||||
fn append(_input: T, _kind: ErrorKind, other: Self) -> Self {
|
||||
|
|
@ -28,7 +28,7 @@ pub fn fail_err<S: Into<String>>(msg: S) -> impl FnOnce(Err<TransError>) -> Err<
|
|||
let msg = msg.into();
|
||||
let stack = match err {
|
||||
Err::Error(e) | Err::Failure(e) => match e {
|
||||
TransError::Invalid(ProtocolError::ConException(ConException::SyntaxError(
|
||||
TransError::Protocol(ProtocolError::ConException(ConException::SyntaxError(
|
||||
mut stack,
|
||||
))) => {
|
||||
stack.push(msg);
|
||||
|
|
@ -36,22 +36,22 @@ pub fn fail_err<S: Into<String>>(msg: S) -> impl FnOnce(Err<TransError>) -> Err<
|
|||
}
|
||||
_ => vec![msg],
|
||||
},
|
||||
_ => vec![msg],
|
||||
Err::Incomplete(_) => vec![msg],
|
||||
};
|
||||
Err::Failure(ConException::SyntaxError(stack).into_trans())
|
||||
Err::Failure(ConException::SyntaxError(stack).into())
|
||||
}
|
||||
}
|
||||
pub fn err_other<E, S: Into<String>>(msg: S) -> impl FnOnce(E) -> Err<TransError> {
|
||||
move |_| Err::Error(ConException::SyntaxError(vec![msg.into()]).into_trans())
|
||||
pub fn other_fail<E, S: Into<String>>(msg: S) -> impl FnOnce(E) -> Err<TransError> {
|
||||
move |_| Err::Failure(ConException::SyntaxError(vec![msg.into()]).into())
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! fail {
|
||||
($cause:expr) => {
|
||||
return Err(nom::Err::Failure(
|
||||
crate::error::ProtocolError::ConException(crate::error::ConException::SyntaxError(
|
||||
vec![String::from($cause)],
|
||||
))
|
||||
::amqp_core::error::ProtocolError::ConException(
|
||||
::amqp_core::error::ConException::SyntaxError(vec![String::from($cause)]),
|
||||
)
|
||||
.into(),
|
||||
))
|
||||
};
|
||||
|
|
@ -104,7 +104,7 @@ pub fn bit(input: &[u8], amount: usize) -> IResult<'_, Vec<Bit>> {
|
|||
pub fn shortstr(input: &[u8]) -> IResult<'_, Shortstr> {
|
||||
let (input, len) = u8(input)?;
|
||||
let (input, str_data) = take(usize::from(len))(input)?;
|
||||
let data = String::from_utf8(str_data.into()).map_err(err_other("shortstr"))?;
|
||||
let data = String::from_utf8(str_data.into()).map_err(other_fail("shortstr"))?;
|
||||
Ok((input, data))
|
||||
}
|
||||
|
||||
|
|
@ -132,7 +132,7 @@ pub fn table(input: &[u8]) -> IResult<'_, Table> {
|
|||
));
|
||||
}
|
||||
|
||||
let table = HashMap::from_iter(values.into_iter());
|
||||
let table = values.into_iter().collect();
|
||||
Ok((rest_input, table))
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -2,7 +2,8 @@
|
|||
//!
|
||||
//! Currently only supports PLAIN (see [RFC 4616](https://datatracker.ietf.org/doc/html/rfc4616))
|
||||
|
||||
use crate::error::{ConException, Result};
|
||||
use crate::error::Result;
|
||||
use amqp_core::error::ConException;
|
||||
|
||||
pub struct PlainUser {
|
||||
pub authorization_identity: String,
|
||||
|
|
@ -13,17 +14,11 @@ pub struct PlainUser {
|
|||
pub fn parse_sasl_plain_response(response: &[u8]) -> Result<PlainUser> {
|
||||
let mut parts = response
|
||||
.split(|&n| n == 0)
|
||||
.map(|bytes| String::from_utf8(bytes.into()).map_err(|_| ConException::Todo.into_trans()));
|
||||
.map(|bytes| String::from_utf8(bytes.into()).map_err(|_| ConException::Todo));
|
||||
|
||||
let authorization_identity = parts
|
||||
.next()
|
||||
.ok_or_else(|| ConException::Todo.into_trans())??;
|
||||
let authentication_identity = parts
|
||||
.next()
|
||||
.ok_or_else(|| ConException::Todo.into_trans())??;
|
||||
let password = parts
|
||||
.next()
|
||||
.ok_or_else(|| ConException::Todo.into_trans())??;
|
||||
let authorization_identity = parts.next().ok_or(ConException::Todo)??;
|
||||
let authentication_identity = parts.next().ok_or(ConException::Todo)??;
|
||||
let password = parts.next().ok_or(ConException::Todo)??;
|
||||
|
||||
Ok(PlainUser {
|
||||
authorization_identity,
|
||||
|
|
|
|||
|
|
@ -1,12 +1,13 @@
|
|||
use crate::frame::FrameType;
|
||||
use crate::{frame, methods};
|
||||
use amqp_core::methods::{FieldValue, Method};
|
||||
use amqp_core::connection::ChannelNum;
|
||||
use amqp_core::methods::{ConnectionStart, ConnectionStartOk, FieldValue, Method};
|
||||
use std::collections::HashMap;
|
||||
|
||||
#[tokio::test]
|
||||
async fn write_start_ok_frame() {
|
||||
let mut payload = Vec::new();
|
||||
let method = Method::ConnectionStart {
|
||||
let method = Method::ConnectionStart(ConnectionStart {
|
||||
version_major: 0,
|
||||
version_minor: 9,
|
||||
server_properties: HashMap::from([(
|
||||
|
|
@ -15,13 +16,13 @@ async fn write_start_ok_frame() {
|
|||
)]),
|
||||
mechanisms: "PLAIN".into(),
|
||||
locales: "en_US".into(),
|
||||
};
|
||||
});
|
||||
|
||||
methods::write::write_method(method, &mut payload).unwrap();
|
||||
|
||||
let frame = frame::Frame {
|
||||
kind: FrameType::Method,
|
||||
channel: 0,
|
||||
channel: ChannelNum::zero(),
|
||||
payload: payload.into(),
|
||||
};
|
||||
|
||||
|
|
@ -140,7 +141,7 @@ fn read_start_ok_payload() {
|
|||
|
||||
assert_eq!(
|
||||
method,
|
||||
Method::ConnectionStartOk {
|
||||
Method::ConnectionStartOk(ConnectionStartOk {
|
||||
client_properties: HashMap::from([
|
||||
(
|
||||
"product".to_string(),
|
||||
|
|
@ -178,6 +179,6 @@ fn read_start_ok_payload() {
|
|||
mechanism: "PLAIN".to_string(),
|
||||
response: "\x00admin\x00".into(),
|
||||
locale: "en_US".to_string()
|
||||
}
|
||||
})
|
||||
);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,8 +29,8 @@ async fn main() -> Result<()> {
|
|||
}
|
||||
|
||||
fn setup_tracing() {
|
||||
let rust_log = std::env::var("RUST_LOG");
|
||||
const DEFAULT_LOG: &str = "hyper=info,debug";
|
||||
let rust_log = std::env::var("RUST_LOG");
|
||||
|
||||
tracing_subscriber::fmt()
|
||||
.with_level(true)
|
||||
|
|
|
|||
18
test-js/src/declare-queue.js
Normal file
18
test-js/src/declare-queue.js
Normal file
|
|
@ -0,0 +1,18 @@
|
|||
import { assert, connectAmqp } from './utils/utils.js';
|
||||
|
||||
const queueName = 'test-queue-124';
|
||||
|
||||
const connection = await connectAmqp();
|
||||
|
||||
const channel = await connection.createChannel();
|
||||
|
||||
const reply = await channel.assertQueue(queueName, { durable: false });
|
||||
|
||||
assert(reply.messageCount === 0, 'Message found in queue');
|
||||
assert(reply.consumerCount === 0, 'Consumer listening on queue');
|
||||
assert(reply.queue === queueName, 'Wrong queue name returned');
|
||||
|
||||
console.log(`created queue '${queueName}'`);
|
||||
|
||||
await channel.close();
|
||||
await connection.close();
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
import { connectAmqp, sleep } from './utils/utils.js';
|
||||
import { connectAmqp } from './utils/utils.js';
|
||||
|
||||
const connection = await connectAmqp();
|
||||
|
||||
|
|
|
|||
|
|
@ -16,3 +16,9 @@ export const connectAmqp = async () => {
|
|||
{}
|
||||
);
|
||||
};
|
||||
|
||||
export const assert = (cond, msg) => {
|
||||
if (!cond) {
|
||||
throw new Error(`Assertion failed: ${msg}`);
|
||||
}
|
||||
};
|
||||
|
|
|
|||
|
|
@ -211,11 +211,31 @@ impl Codegen {
|
|||
|
||||
for class in &amqp.classes {
|
||||
let enum_name = class.name.to_upper_camel_case();
|
||||
for method in &class.methods {
|
||||
let method_name = method.name.to_upper_camel_case();
|
||||
write!(
|
||||
self.output,
|
||||
" {enum_name}{method_name}({enum_name}{method_name}),"
|
||||
)
|
||||
.ok();
|
||||
}
|
||||
}
|
||||
|
||||
writeln!(self.output, "}}\n").ok();
|
||||
|
||||
// now codegen the individual structs
|
||||
for class in &amqp.classes {
|
||||
let class_name = class.name.to_upper_camel_case();
|
||||
for method in &class.methods {
|
||||
let method_name = method.name.to_upper_camel_case();
|
||||
self.doc_comment(&class.doc, 4);
|
||||
self.doc_comment(&method.doc, 4);
|
||||
write!(self.output, " {enum_name}{method_name}").ok();
|
||||
writeln!(
|
||||
self.output,
|
||||
"#[derive(Debug, Clone, PartialEq)]
|
||||
pub struct {class_name}{method_name}"
|
||||
)
|
||||
.ok();
|
||||
if !method.fields.is_empty() {
|
||||
writeln!(self.output, " {{").ok();
|
||||
for field in &method.fields {
|
||||
|
|
@ -228,21 +248,19 @@ impl Codegen {
|
|||
writeln!(self.output, " /// {field_docs}").ok();
|
||||
if !field.doc.is_empty() {
|
||||
writeln!(self.output, " ///").ok();
|
||||
self.doc_comment(&field.doc, 8);
|
||||
self.doc_comment(&field.doc, 4);
|
||||
}
|
||||
} else {
|
||||
self.doc_comment(&field.doc, 8);
|
||||
self.doc_comment(&field.doc, 4);
|
||||
}
|
||||
writeln!(self.output, " {field_name}: {field_type},").ok();
|
||||
writeln!(self.output, " pub {field_name}: {field_type},").ok();
|
||||
}
|
||||
writeln!(self.output, " }},").ok();
|
||||
writeln!(self.output, " }}\n").ok();
|
||||
} else {
|
||||
writeln!(self.output, ",").ok();
|
||||
writeln!(self.output, ";\n").ok();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
writeln!(self.output, "}}\n").ok();
|
||||
}
|
||||
|
||||
fn amqp_type_to_rust_type(&self, amqp_type: &str) -> &'static str {
|
||||
|
|
|
|||
|
|
@ -21,8 +21,8 @@ impl Codegen {
|
|||
self.output,
|
||||
"pub mod parse {{
|
||||
use amqp_core::methods::*;
|
||||
use crate::methods::parse_helper::*;
|
||||
use crate::error::TransError;
|
||||
use crate::methods::parse_helper::*;
|
||||
use nom::{{branch::alt, bytes::complete::tag}};
|
||||
use regex::Regex;
|
||||
use once_cell::sync::Lazy;
|
||||
|
|
@ -154,14 +154,14 @@ pub type IResult<'a, T> = nom::IResult<&'a [u8], T, TransError>;
|
|||
let method_name = method.name.to_upper_camel_case();
|
||||
writeln!(
|
||||
self.output,
|
||||
" Ok((input, Method::{class_name}{method_name} {{"
|
||||
" Ok((input, Method::{class_name}{method_name}({class_name}{method_name} {{"
|
||||
)
|
||||
.ok();
|
||||
for field in &method.fields {
|
||||
let field_name = self.snake_case(&field.name);
|
||||
writeln!(self.output, " {field_name},").ok();
|
||||
}
|
||||
writeln!(self.output, " }}))").ok();
|
||||
writeln!(self.output, " }})))").ok();
|
||||
|
||||
writeln!(self.output, "}}").ok();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -43,7 +43,7 @@ use crate::methods::RandomMethod;
|
|||
let method_name = method.name.to_upper_camel_case();
|
||||
writeln!(
|
||||
self.output,
|
||||
" {i} => Method::{class_name}{method_name} {{"
|
||||
" {i} => Method::{class_name}{method_name}( {class_name}{method_name}{{"
|
||||
)
|
||||
.ok();
|
||||
for field in &method.fields {
|
||||
|
|
@ -54,7 +54,7 @@ use crate::methods::RandomMethod;
|
|||
)
|
||||
.ok();
|
||||
}
|
||||
writeln!(self.output, " }},").ok();
|
||||
writeln!(self.output, " }}),").ok();
|
||||
}
|
||||
writeln!(
|
||||
self.output,
|
||||
|
|
|
|||
|
|
@ -7,8 +7,8 @@ impl Codegen {
|
|||
self.output,
|
||||
"pub mod write {{
|
||||
use amqp_core::methods::*;
|
||||
use crate::methods::write_helper::*;
|
||||
use crate::error::TransError;
|
||||
use crate::methods::write_helper::*;
|
||||
use std::io::Write;
|
||||
|
||||
pub fn write_method<W: Write>(method: Method, mut writer: W) -> Result<(), TransError> {{
|
||||
|
|
@ -22,12 +22,16 @@ pub fn write_method<W: Write>(method: Method, mut writer: W) -> Result<(), Trans
|
|||
for method in &class.methods {
|
||||
let method_name = method.name.to_upper_camel_case();
|
||||
let method_index = method.index;
|
||||
writeln!(self.output, " Method::{class_name}{method_name} {{").ok();
|
||||
writeln!(
|
||||
self.output,
|
||||
" Method::{class_name}{method_name}({class_name}{method_name} {{"
|
||||
)
|
||||
.ok();
|
||||
for field in &method.fields {
|
||||
let field_name = self.snake_case(&field.name);
|
||||
writeln!(self.output, " {field_name},").ok();
|
||||
}
|
||||
writeln!(self.output, " }} => {{").ok();
|
||||
writeln!(self.output, " }}) => {{").ok();
|
||||
let [ci0, ci1] = class_index.to_be_bytes();
|
||||
let [mi0, mi1] = method_index.to_be_bytes();
|
||||
writeln!(
|
||||
|
|
|
|||
|
|
@ -1,10 +1,26 @@
|
|||
use crate::project_root;
|
||||
use anyhow::{ensure, Context, Result};
|
||||
use std::path::Path;
|
||||
use std::process::Command;
|
||||
|
||||
pub fn main() -> Result<()> {
|
||||
let test_js_root = project_root().join("test-js");
|
||||
println!("$ yarn");
|
||||
let project_root = project_root();
|
||||
let test_js_root = project_root.join("test-js");
|
||||
|
||||
let mut amqp_server = Command::new("cargo")
|
||||
.arg("run")
|
||||
.env("RUST_LOG", "trace")
|
||||
.spawn()
|
||||
.context("`cargo run` amqp")?;
|
||||
|
||||
let test_result = run_js(&test_js_root);
|
||||
|
||||
amqp_server.kill()?;
|
||||
|
||||
test_result
|
||||
}
|
||||
|
||||
fn run_js(test_js_root: &Path) -> Result<()> {
|
||||
let status = Command::new("yarn")
|
||||
.current_dir(&test_js_root)
|
||||
.status()
|
||||
|
|
|
|||
|
|
@ -1,4 +0,0 @@
|
|||
# THIS IS AN AUTOGENERATED FILE. DO NOT EDIT THIS FILE DIRECTLY.
|
||||
# yarn lockfile v1
|
||||
|
||||
|
||||
Loading…
Add table
Add a link
Reference in a new issue