From 4346db648fafad5574ef993e1154508f1aee7bf9 Mon Sep 17 00:00:00 2001 From: Nilstrieb <48135649+Nilstrieb@users.noreply.github.com> Date: Fri, 4 Mar 2022 22:15:19 +0100 Subject: [PATCH] things --- .rustfmt.toml | 3 + Cargo.lock | 17 ++-- Cargo.toml | 1 + amqp_core/src/connection.rs | 55 ++++++------ amqp_core/src/consumer.rs | 4 +- amqp_core/src/lib.rs | 15 ++-- amqp_core/src/message.rs | 3 +- amqp_core/src/queue.rs | 11 ++- amqp_dashboard/src/lib.rs | 10 ++- amqp_messaging/src/methods/consume.rs | 14 +-- amqp_messaging/src/methods/mod.rs | 10 +-- amqp_messaging/src/methods/publish.rs | 14 +-- amqp_messaging/src/methods/queue.rs | 32 +++---- amqp_transport/src/connection.rs | 100 +++++++++++---------- amqp_transport/src/frame.rs | 20 +++-- amqp_transport/src/lib.rs | 6 +- amqp_transport/src/methods/generated.rs | 6 +- amqp_transport/src/methods/mod.rs | 6 +- amqp_transport/src/methods/parse_helper.rs | 31 ++++--- amqp_transport/src/methods/write_helper.rs | 3 +- amqp_transport/src/tests.rs | 9 +- src/main.rs | 44 ++++----- xtask/src/codegen/mod.rs | 16 ++-- xtask/src/test_js.rs | 3 +- 24 files changed, 224 insertions(+), 209 deletions(-) create mode 100644 .rustfmt.toml diff --git a/.rustfmt.toml b/.rustfmt.toml new file mode 100644 index 0000000..2ebd1dc --- /dev/null +++ b/.rustfmt.toml @@ -0,0 +1,3 @@ +reorder_imports = true +imports_granularity = "Crate" +newline_style = "Unix" diff --git a/Cargo.lock b/Cargo.lock index 827eae8..db748d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -19,6 +19,7 @@ dependencies = [ "amqp_dashboard", "amqp_transport", "anyhow", + "clap 3.1.5", "tokio", "tracing", "tracing-subscriber", @@ -223,9 +224,9 @@ dependencies = [ [[package]] name = "clap" -version = "3.1.1" +version = "3.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d76c22c9b9b215eeb8d016ad3a90417bd13cb24cf8142756e6472445876cab7" +checksum = "ced1892c55c910c1219e98d6fc8d71f6bddba7905866ce740066d8bfea859312" dependencies = [ "atty", "bitflags", @@ -235,14 +236,14 @@ dependencies = [ "os_str_bytes", "strsim", "termcolor", - "textwrap 0.14.2", + "textwrap 0.15.0", ] [[package]] name = "clap_derive" -version = "3.1.0" +version = "3.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fd1122e63869df2cb309f449da1ad54a7c6dfeb7c7e6ccd8e0825d9eb93bb72" +checksum = "da95d038ede1a964ce99f49cbe27a7fb538d1da595e4b4f70b8c8f338d17bf16" dependencies = [ "heck", "proc-macro-error", @@ -1138,9 +1139,9 @@ dependencies = [ [[package]] name = "textwrap" -version = "0.14.2" +version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0066c8d12af8b5acd21e00547c3797fde4e8677254a7ee429176ccebbe93dd80" +checksum = "b1141d4d61095b28419e22cb0bbf02755f5e54e0526f97f1e3d1d160e60885fb" [[package]] name = "thiserror" @@ -1562,7 +1563,7 @@ name = "xtask" version = "0.1.0" dependencies = [ "anyhow", - "clap 3.1.1", + "clap 3.1.5", "heck", "itertools", "strong-xml", diff --git a/Cargo.toml b/Cargo.toml index 930220a..a7ed71d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ anyhow = "1.0.53" amqp_core = { path = "./amqp_core" } amqp_dashboard = { path = "./amqp_dashboard" } amqp_transport = { path = "./amqp_transport" } +clap = { version = "3.1.5", features = ["derive"] } tokio = { version = "1.16.1", features = ["full"] } tracing = "0.1.30" tracing-subscriber = { version = "0.3.8", features = ["env-filter"] } diff --git a/amqp_core/src/connection.rs b/amqp_core/src/connection.rs index db07bf0..d111e84 100644 --- a/amqp_core/src/connection.rs +++ b/amqp_core/src/connection.rs @@ -1,12 +1,12 @@ -use crate::methods::Method; -use crate::{methods, newtype_id, GlobalData, Handle, Queue}; +use crate::{methods, methods::Method, newtype_id, GlobalData, Queue}; use bytes::Bytes; -use parking_lot::Mutex; use smallvec::SmallVec; -use std::collections::HashMap; -use std::fmt::{Display, Formatter}; -use std::net::SocketAddr; -use std::sync::Arc; +use std::{ + collections::HashMap, + fmt::{Display, Formatter}, + net::SocketAddr, + sync::Arc, +}; use tokio::sync::mpsc; newtype_id!(pub ConnectionId); @@ -43,16 +43,14 @@ impl Display for ChannelNum { } } -pub type ConnectionHandle = Handle; - #[derive(Debug)] pub struct Connection { pub id: ConnectionId, pub peer_addr: SocketAddr, pub global_data: GlobalData, - pub channels: HashMap, + pub channels: HashMap, pub exclusive_queues: Vec, - _method_queue: MethodSender, + _events: ConEventSender, } #[derive(Debug)] @@ -61,25 +59,25 @@ pub enum QueuedMethod { WithContent(Method, ContentHeader, SmallVec<[Bytes; 1]>), } -pub type MethodSender = mpsc::Sender<(ChannelNum, QueuedMethod)>; -pub type MethodReceiver = mpsc::Receiver<(ChannelNum, QueuedMethod)>; +pub type ConEventSender = mpsc::Sender<(ChannelNum, QueuedMethod)>; +pub type ConEventReceiver = mpsc::Receiver<(ChannelNum, QueuedMethod)>; impl Connection { #[must_use] - pub fn new_handle( + pub fn new( id: ConnectionId, peer_addr: SocketAddr, global_data: GlobalData, - method_queue: MethodSender, - ) -> ConnectionHandle { - Arc::new(Mutex::new(Self { + method_queue: ConEventSender, + ) -> Arc { + Arc::new(Self { id, peer_addr, global_data, channels: HashMap::new(), exclusive_queues: vec![], - _method_queue: method_queue, - })) + _events: method_queue, + }) } pub fn close(&self) { @@ -88,33 +86,31 @@ impl Connection { } } -pub type ChannelHandle = Handle; - #[derive(Debug)] pub struct Channel { pub id: ChannelId, pub num: ChannelNum, - pub connection: ConnectionHandle, + pub connection: Connection, pub global_data: GlobalData, - method_queue: MethodSender, + method_queue: ConEventSender, } impl Channel { #[must_use] - pub fn new_handle( + pub fn new( id: ChannelId, num: ChannelNum, - connection: ConnectionHandle, + connection: Connection, global_data: GlobalData, - method_queue: MethodSender, - ) -> ChannelHandle { - Arc::new(Mutex::new(Self { + method_queue: ConEventSender, + ) -> Arc { + Arc::new(Self { id, num, connection, global_data, method_queue, - })) + }) } pub fn close(&self) { @@ -130,6 +126,7 @@ impl Channel { } } +/// A content frame header. #[derive(Debug, Clone, PartialEq)] pub struct ContentHeader { pub class_id: u16, diff --git a/amqp_core/src/consumer.rs b/amqp_core/src/consumer.rs index fca682f..f729b20 100644 --- a/amqp_core/src/consumer.rs +++ b/amqp_core/src/consumer.rs @@ -1,4 +1,4 @@ -use crate::{newtype_id, ChannelHandle}; +use crate::{newtype_id, Channel}; newtype_id!( pub ConsumerId @@ -8,5 +8,5 @@ newtype_id!( pub struct Consumer { pub id: ConsumerId, pub tag: String, - pub channel: ChannelHandle, + pub channel: Channel, } diff --git a/amqp_core/src/lib.rs b/amqp_core/src/lib.rs index a58a104..da99e03 100644 --- a/amqp_core/src/lib.rs +++ b/amqp_core/src/lib.rs @@ -8,16 +8,15 @@ pub mod message; pub mod methods; pub mod queue; -use crate::connection::{ChannelHandle, ConnectionHandle}; -use crate::queue::{Queue, QueueName}; +use crate::{ + connection::{Channel, Connection}, + queue::{Queue, QueueName}, +}; use connection::{ChannelId, ConnectionId}; use parking_lot::Mutex; -use std::collections::HashMap; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; use uuid::Uuid; -type Handle = Arc>; - #[derive(Debug, Clone)] pub struct GlobalData { inner: Arc>, @@ -44,8 +43,8 @@ impl GlobalData { #[derive(Debug)] pub struct GlobalDataInner { - pub connections: HashMap, - pub channels: HashMap, + pub connections: HashMap, + pub channels: HashMap, pub queues: HashMap, /// Todo: This is just for testing and will be removed later! pub default_exchange: HashMap, diff --git a/amqp_core/src/message.rs b/amqp_core/src/message.rs index cd24183..675eb74 100644 --- a/amqp_core/src/message.rs +++ b/amqp_core/src/message.rs @@ -1,5 +1,4 @@ -use crate::connection::ContentHeader; -use crate::newtype_id; +use crate::{connection::ContentHeader, newtype_id}; use bytes::Bytes; use smallvec::SmallVec; use std::sync::Arc; diff --git a/amqp_core/src/queue.rs b/amqp_core/src/queue.rs index b9464db..31d0b1c 100644 --- a/amqp_core/src/queue.rs +++ b/amqp_core/src/queue.rs @@ -1,10 +1,9 @@ -use crate::consumer::Consumer; -use crate::message::Message; -use crate::{newtype, newtype_id, ChannelId}; +use crate::{consumer::Consumer, message::Message, newtype, newtype_id, ChannelId}; use parking_lot::Mutex; -use std::borrow::Borrow; -use std::sync::atomic::AtomicUsize; -use std::sync::Arc; +use std::{ + borrow::Borrow, + sync::{atomic::AtomicUsize, Arc}, +}; pub type Queue = Arc; diff --git a/amqp_dashboard/src/lib.rs b/amqp_dashboard/src/lib.rs index 6b2b4b3..da5dc3a 100644 --- a/amqp_dashboard/src/lib.rs +++ b/amqp_dashboard/src/lib.rs @@ -1,10 +1,12 @@ #![warn(rust_2018_idioms)] use amqp_core::GlobalData; -use axum::body::{boxed, Full}; -use axum::response::{Html, IntoResponse, Response}; -use axum::routing::get; -use axum::{Json, Router}; +use axum::{ + body::{boxed, Full}, + response::{Html, IntoResponse, Response}, + routing::get, + Json, Router, +}; use serde::Serialize; use tracing::info; diff --git a/amqp_messaging/src/methods/consume.rs b/amqp_messaging/src/methods/consume.rs index 941c2f3..ed8bf1a 100644 --- a/amqp_messaging/src/methods/consume.rs +++ b/amqp_messaging/src/methods/consume.rs @@ -1,13 +1,15 @@ use crate::Result; -use amqp_core::amqp_todo; -use amqp_core::connection::ChannelHandle; -use amqp_core::consumer::{Consumer, ConsumerId}; -use amqp_core::error::{ChannelException}; -use amqp_core::methods::{BasicConsume, BasicConsumeOk, Method}; +use amqp_core::{ + amqp_todo, + connection::Channel, + consumer::{Consumer, ConsumerId}, + error::ChannelException, + methods::{BasicConsume, BasicConsumeOk, Method}, +}; use std::sync::Arc; use tracing::info; -pub fn consume(channel_handle: ChannelHandle, basic_consume: BasicConsume) -> Result { +pub fn consume(channel_handle: Channel, basic_consume: BasicConsume) -> Result { let BasicConsume { queue: queue_name, consumer_tag, diff --git a/amqp_messaging/src/methods/mod.rs b/amqp_messaging/src/methods/mod.rs index 2176258..c901e45 100644 --- a/amqp_messaging/src/methods/mod.rs +++ b/amqp_messaging/src/methods/mod.rs @@ -3,20 +3,18 @@ mod publish; mod queue; use crate::Result; -use amqp_core::amqp_todo; -use amqp_core::connection::ChannelHandle; -use amqp_core::message::Message; -use amqp_core::methods::Method; +use amqp_core::{amqp_todo, connection::Channel, message::Message, methods::Method}; +use std::sync::Arc; use tracing::{error, info}; -pub async fn handle_basic_publish(channel_handle: ChannelHandle, message: Message) { +pub async fn handle_basic_publish(channel_handle: Arc, message: Message) { match publish::publish(channel_handle, message).await { Ok(()) => {} Err(err) => error!(%err, "publish error occurred"), } } -pub async fn handle_method(channel_handle: ChannelHandle, method: Method) -> Result { +pub async fn handle_method(channel_handle: Arc, method: Method) -> Result { info!(?method, "Handling method"); let response = match method { diff --git a/amqp_messaging/src/methods/publish.rs b/amqp_messaging/src/methods/publish.rs index 430e509..49169bc 100644 --- a/amqp_messaging/src/methods/publish.rs +++ b/amqp_messaging/src/methods/publish.rs @@ -1,12 +1,14 @@ use crate::Result; -use amqp_core::amqp_todo; -use amqp_core::connection::{ChannelHandle, QueuedMethod}; -use amqp_core::error::ChannelException; -use amqp_core::message::Message; -use amqp_core::methods::{BasicPublish, Method}; +use amqp_core::{ + amqp_todo, + connection::{Channel, QueuedMethod}, + error::ChannelException, + message::Message, + methods::{BasicPublish, Method}, +}; use tracing::info; -pub async fn publish(channel_handle: ChannelHandle, message: Message) -> Result<()> { +pub async fn publish(channel_handle: Arc, message: Message) -> Result<()> { info!(?message, "Publishing message"); let global_data = channel_handle.lock().global_data.clone(); diff --git a/amqp_messaging/src/methods/queue.rs b/amqp_messaging/src/methods/queue.rs index dd613b2..ebe2222 100644 --- a/amqp_messaging/src/methods/queue.rs +++ b/amqp_messaging/src/methods/queue.rs @@ -1,16 +1,15 @@ -use amqp_core::connection::ChannelHandle; -use amqp_core::methods::{Method, QueueBind, QueueDeclare, QueueDeclareOk}; -use amqp_core::queue::{QueueDeletion, QueueId, QueueName, RawQueue}; -use amqp_core::{amqp_todo, GlobalData}; -use parking_lot::Mutex; -use std::sync::atomic::AtomicUsize; -use std::sync::Arc; use crate::Result; +use amqp_core::{ + amqp_todo, + connection::Channel, + methods::{Method, QueueBind, QueueDeclare, QueueDeclareOk}, + queue::{QueueDeletion, QueueId, QueueName, RawQueue}, + GlobalData, +}; +use parking_lot::Mutex; +use std::sync::{atomic::AtomicUsize, Arc}; -pub fn declare( - channel_handle: ChannelHandle, - queue_declare: QueueDeclare, -) -> Result { +pub fn declare(channel_handle: Channel, queue_declare: QueueDeclare) -> Result { let QueueDeclare { queue: queue_name, passive, @@ -70,18 +69,11 @@ pub fn declare( })) } -pub async fn bind( - _channel_handle: ChannelHandle, - _queue_bind: QueueBind, -) -> Result { +pub async fn bind(_channel_handle: Channel, _queue_bind: QueueBind) -> Result { amqp_todo!(); } -fn bind_queue( - global_data: GlobalData, - _exchange: (), - routing_key: Arc, -) -> Result<()> { +fn bind_queue(global_data: GlobalData, _exchange: (), routing_key: Arc) -> Result<()> { let mut global_data = global_data.lock(); // todo: don't diff --git a/amqp_transport/src/connection.rs b/amqp_transport/src/connection.rs index c22719f..06d2771 100644 --- a/amqp_transport/src/connection.rs +++ b/amqp_transport/src/connection.rs @@ -1,29 +1,34 @@ -use crate::error::{ConException, ProtocolError, Result, TransError}; -use crate::frame::{parse_content_header, Frame, FrameType}; -use crate::{frame, methods, sasl}; -use amqp_core::connection::{ - ChannelHandle, ChannelNum, ConnectionHandle, ConnectionId, ContentHeader, MethodReceiver, - MethodSender, QueuedMethod, +use crate::{ + error::{ConException, ProtocolError, Result, TransError}, + frame, + frame::{parse_content_header, Frame, FrameType}, + methods, sasl, }; -use amqp_core::message::{MessageId, RawMessage, RoutingInformation}; -use amqp_core::methods::{ - BasicPublish, ChannelClose, ChannelCloseOk, ChannelOpenOk, ConnectionClose, ConnectionCloseOk, - ConnectionOpen, ConnectionOpenOk, ConnectionStart, ConnectionStartOk, ConnectionTune, - ConnectionTuneOk, FieldValue, Method, Table, +use amqp_core::{ + amqp_todo, + connection::{ + Channel, ChannelNum, ConEventReceiver, ConEventSender, Connection, ConnectionId, + ContentHeader, QueuedMethod, + }, + message::{MessageId, RawMessage, RoutingInformation}, + methods::{ + BasicPublish, ChannelClose, ChannelCloseOk, ChannelOpenOk, ConnectionClose, + ConnectionCloseOk, ConnectionOpen, ConnectionOpenOk, ConnectionStart, ConnectionStartOk, + ConnectionTune, ConnectionTuneOk, FieldValue, Method, Table, + }, + GlobalData, }; -use amqp_core::{amqp_todo, GlobalData}; use anyhow::Context; use bytes::Bytes; use smallvec::SmallVec; -use std::cmp::Ordering; -use std::collections::HashMap; -use std::net::SocketAddr; -use std::pin::Pin; -use std::sync::Arc; -use std::time::Duration; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tokio::net::TcpStream; -use tokio::{select, time}; +use std::{ + cmp::Ordering, collections::HashMap, net::SocketAddr, pin::Pin, sync::Arc, time::Duration, +}; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + net::TcpStream, + select, time, +}; use tracing::{debug, error, info, trace, warn}; fn ensure_conn(condition: bool) -> Result<()> { @@ -39,16 +44,16 @@ const CHANNEL_MAX: u16 = 0; const FRAME_SIZE_MAX: u32 = 0; const HEARTBEAT_DELAY: u16 = 0; -const BASIC_CLASS_ID: u16 = 60; +const BASIC_CLASS_ID: ChannelNum = ChannelNum::new(60); -pub struct Channel { +pub struct TransportChannel { /// A handle to the global channel representation. Used to remove the channel when it's dropped - handle: ChannelHandle, + global_chan: Arc, /// The current status of the channel, whether it has sent a method that expects a body status: ChannelStatus, } -pub struct Connection { +pub struct TransportConnection { id: ConnectionId, stream: TcpStream, max_frame_size: usize, @@ -56,20 +61,19 @@ pub struct Connection { channel_max: u16, /// When the next heartbeat expires next_timeout: Pin>, - channels: HashMap, - handle: ConnectionHandle, + channels: HashMap, + global_con: Arc, global_data: GlobalData, - method_queue_send: MethodSender, - method_queue_recv: MethodReceiver, + method_queue_send: ConEventSender, + method_queue_recv: ConEventReceiver, } const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); enum ChannelStatus { Default, - /// ClassId // todo: newtype it - NeedHeader(u16, Box), + NeedHeader(ChannelNum, Box), NeedsBody(Box, ContentHeader, SmallVec<[Bytes; 1]>), } @@ -79,14 +83,14 @@ impl ChannelStatus { } } -impl Connection { +impl TransportConnection { pub fn new( id: ConnectionId, stream: TcpStream, - connection_handle: ConnectionHandle, + connection_handle: Arc, global_data: GlobalData, - method_queue_send: MethodSender, - method_queue_recv: MethodReceiver, + method_queue_send: ConEventSender, + method_queue_recv: ConEventReceiver, ) -> Self { Self { id, @@ -95,11 +99,11 @@ impl Connection { heartbeat_delay: HEARTBEAT_DELAY, channel_max: CHANNEL_MAX, next_timeout: Box::pin(time::sleep(DEFAULT_TIMEOUT)), - handle: connection_handle, + global_con: connection_handle, channels: HashMap::with_capacity(4), global_data, method_queue_send, - method_queue_recv: method_queue_recv, + method_queue_recv, } } @@ -140,7 +144,7 @@ impl Connection { Err(err) => error!(%err, "Error during processing of connection"), } - let connection_handle = self.handle.lock(); + let connection_handle = self.global_con.lock(); connection_handle.close(); } @@ -364,7 +368,7 @@ impl Connection { .channels .get(&frame.channel) .ok_or(ConException::Todo)? - .handle + .global_chan .clone(); // call into amqp_messaging to handle the method @@ -470,7 +474,7 @@ impl Connection { // Spawn the handler for the publish. The connection task goes back to handling // just the connection. tokio::spawn(amqp_messaging::methods::handle_basic_publish( - channel.handle.clone(), + channel.global_chan.clone(), message, )); Ok(()) @@ -481,16 +485,16 @@ impl Connection { async fn channel_open(&mut self, channel_num: ChannelNum) -> Result<()> { let id = rand::random(); - let channel_handle = amqp_core::connection::Channel::new_handle( + let channel_handle = amqp_core::connection::c::new_handle( id, channel_num, - self.handle.clone(), + self.global_con.clone(), self.global_data.clone(), self.method_queue_send.clone(), ); - let channel = Channel { - handle: channel_handle.clone(), + let channel = TransportChannel { + global_chan: channel_handle.clone(), status: ChannelStatus::Default, }; @@ -597,15 +601,15 @@ impl Connection { } } -impl Drop for Connection { +impl Drop for TransportConnection { fn drop(&mut self) { - self.handle.lock().close(); + self.global_con.lock().close(); } } -impl Drop for Channel { +impl Drop for TransportChannel { fn drop(&mut self) { - self.handle.lock().close(); + self.global_chan.lock().close(); } } diff --git a/amqp_transport/src/frame.rs b/amqp_transport/src/frame.rs index 679f875..68c7d73 100644 --- a/amqp_transport/src/frame.rs +++ b/amqp_transport/src/frame.rs @@ -33,13 +33,19 @@ pub enum FrameType { } mod content_header_parse { - use crate::error::TransError; - use crate::methods::parse_helper::{octet, shortstr, table, timestamp}; - use amqp_core::connection::ContentHeader; - use amqp_core::methods; - use amqp_core::methods::FieldValue::{FieldTable, ShortShortUInt, ShortString, Timestamp}; - use nom::number::complete::{u16, u64}; - use nom::number::Endianness::Big; + use crate::{ + error::TransError, + methods::parse_helper::{octet, shortstr, table, timestamp}, + }; + use amqp_core::{ + connection::ContentHeader, + methods, + methods::FieldValue::{FieldTable, ShortShortUInt, ShortString, Timestamp}, + }; + use nom::number::{ + complete::{u16, u64}, + Endianness::Big, + }; type IResult<'a, T> = nom::IResult<&'a [u8], T, TransError>; diff --git a/amqp_transport/src/lib.rs b/amqp_transport/src/lib.rs index 2b40d65..f41abd2 100644 --- a/amqp_transport/src/lib.rs +++ b/amqp_transport/src/lib.rs @@ -10,7 +10,7 @@ mod tests; // TODO: handle big types -use crate::connection::Connection; +use crate::connection::TransportConnection; use amqp_core::GlobalData; use anyhow::Result; use tokio::net; @@ -31,7 +31,7 @@ pub async fn do_thing_i_guess(global_data: GlobalData) -> Result<()> { let (method_send, method_recv) = tokio::sync::mpsc::channel(10); - let connection_handle = amqp_core::connection::Connection::new_handle( + let connection_handle = amqp_core::connection::ConnectionInner::new_handle( id, peer_addr, global_data.clone(), @@ -43,7 +43,7 @@ pub async fn do_thing_i_guess(global_data: GlobalData) -> Result<()> { .connections .insert(id, connection_handle.clone()); - let connection = Connection::new( + let connection = TransportConnection::new( id, stream, connection_handle, diff --git a/amqp_transport/src/methods/generated.rs b/amqp_transport/src/methods/generated.rs index 7274a53..b04473c 100644 --- a/amqp_transport/src/methods/generated.rs +++ b/amqp_transport/src/methods/generated.rs @@ -2,8 +2,7 @@ // This file has been generated by `xtask/src/codegen`. Do not edit it manually. pub mod parse { - use crate::error::TransError; - use crate::methods::parse_helper::*; + use crate::{error::TransError, methods::parse_helper::*}; use amqp_core::methods::*; use nom::{branch::alt, bytes::complete::tag}; use once_cell::sync::Lazy; @@ -887,8 +886,7 @@ pub mod parse { } } pub mod write { - use crate::error::TransError; - use crate::methods::write_helper::*; + use crate::{error::TransError, methods::write_helper::*}; use amqp_core::methods::*; use std::io::Write; diff --git a/amqp_transport/src/methods/mod.rs b/amqp_transport/src/methods/mod.rs index 617d6e3..c921b44 100644 --- a/amqp_transport/src/methods/mod.rs +++ b/amqp_transport/src/methods/mod.rs @@ -1,6 +1,8 @@ use crate::error::TransError; -use amqp_core::error::ConException; -use amqp_core::methods::{FieldValue, Method, Table}; +use amqp_core::{ + error::ConException, + methods::{FieldValue, Method, Table}, +}; use rand::Rng; mod generated; diff --git a/amqp_transport/src/methods/parse_helper.rs b/amqp_transport/src/methods/parse_helper.rs index 23a6b2e..5792085 100644 --- a/amqp_transport/src/methods/parse_helper.rs +++ b/amqp_transport/src/methods/parse_helper.rs @@ -1,17 +1,22 @@ -use crate::error::TransError; -use crate::methods::generated::parse::IResult; -use amqp_core::error::{ConException, ProtocolError}; -use amqp_core::methods::{ - Bit, FieldValue, Long, Longlong, Longstr, Octet, Short, Shortstr, Table, TableFieldName, - Timestamp, +use crate::{error::TransError, methods::generated::parse::IResult}; +use amqp_core::{ + error::{ConException, ProtocolError}, + methods::{ + Bit, FieldValue, Long, Longlong, Longstr, Octet, Short, Shortstr, Table, TableFieldName, + Timestamp, + }, +}; +use nom::{ + branch::alt, + bytes::complete::{tag, take}, + error::ErrorKind, + multi::{count, many0}, + number::{ + complete::{f32, f64, i16, i32, i64, i8, u16, u32, u64, u8}, + Endianness::Big, + }, + Err, }; -use nom::branch::alt; -use nom::bytes::complete::{tag, take}; -use nom::error::ErrorKind; -use nom::multi::{count, many0}; -use nom::number::complete::{f32, f64, i16, i32, i64, i8, u16, u32, u64, u8}; -use nom::number::Endianness::Big; -use nom::Err; impl nom::error::ParseError for TransError { fn from_error_kind(_input: T, _kind: ErrorKind) -> Self { diff --git a/amqp_transport/src/methods/write_helper.rs b/amqp_transport/src/methods/write_helper.rs index d141120..89100c3 100644 --- a/amqp_transport/src/methods/write_helper.rs +++ b/amqp_transport/src/methods/write_helper.rs @@ -1,5 +1,4 @@ -use crate::error::TransError; -use crate::methods::FieldValue; +use crate::{error::TransError, methods::FieldValue}; use amqp_core::methods::{Bit, Long, Longlong, Longstr, Octet, Short, Shortstr, Table, Timestamp}; use anyhow::Context; use std::io::Write; diff --git a/amqp_transport/src/tests.rs b/amqp_transport/src/tests.rs index f12c4d7..13425fe 100644 --- a/amqp_transport/src/tests.rs +++ b/amqp_transport/src/tests.rs @@ -1,7 +1,8 @@ -use crate::frame::FrameType; -use crate::{frame, methods}; -use amqp_core::connection::ChannelNum; -use amqp_core::methods::{ConnectionStart, ConnectionStartOk, FieldValue, Method}; +use crate::{frame, frame::FrameType, methods}; +use amqp_core::{ + connection::ChannelNum, + methods::{ConnectionStart, ConnectionStartOk, FieldValue, Method}, +}; use std::collections::HashMap; #[tokio::test] diff --git a/src/main.rs b/src/main.rs index 645accd..144ad3b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,26 +1,29 @@ #![warn(rust_2018_idioms)] use anyhow::Result; -use std::env; +use clap::Parser; use tracing::{info, info_span, Instrument}; +/// An AMQP 0-9-1 broker implementation. +#[derive(Parser)] +struct Args { + /// Whether to serve the dashboard on localhost. Port defaults to 3000. + #[clap(short, long)] + dashboard: bool, + /// The log level of the application. Overwrites the `RUST_LOG` env var. + #[clap(long)] + log_level: Option, +} + #[tokio::main] async fn main() -> Result<()> { - let mut dashboard = false; + let args = Args::parse(); - for arg in env::args().skip(1) { - match arg.as_str() { - "--dashboard" => dashboard = true, - "ignore-this-clippy" => eprintln!("yes please"), - _ => {} - } - } - - setup_tracing(); + setup_tracing(&args); let global_data = amqp_core::GlobalData::default(); - if dashboard { + if args.dashboard { let dashboard_span = info_span!("dashboard"); tokio::spawn(amqp_dashboard::dashboard(global_data.clone()).instrument(dashboard_span)); } @@ -28,21 +31,22 @@ async fn main() -> Result<()> { amqp_transport::do_thing_i_guess(global_data).await } -fn setup_tracing() { +fn setup_tracing(args: &Args) { const DEFAULT_LOG: &str = "hyper=info,debug"; - let rust_log = std::env::var("RUST_LOG"); + + let log_filter = args + .log_level + .clone() + .or_else(|| std::env::var("RUST_LOG").ok()) + .unwrap_or_else(|| DEFAULT_LOG.to_owned()); tracing_subscriber::fmt() .with_level(true) .with_timer(tracing_subscriber::fmt::time::time()) .with_ansi(true) .with_thread_names(true) - .with_env_filter(rust_log.clone().unwrap_or_else(|_| DEFAULT_LOG.to_string())) + .with_env_filter(&log_filter) .init(); - if let Ok(rust_log) = rust_log { - info!(%rust_log, "Using custom log level"); - } else { - info!(%DEFAULT_LOG, "Using default log level"); - } + info!(%log_filter, "Using log filter level"); } diff --git a/xtask/src/codegen/mod.rs b/xtask/src/codegen/mod.rs index bd66ad2..3040271 100644 --- a/xtask/src/codegen/mod.rs +++ b/xtask/src/codegen/mod.rs @@ -6,13 +6,15 @@ mod write; use anyhow::{bail, Context}; use heck::ToUpperCamelCase; -use std::fs; -use std::fs::File; -use std::io::Write; -use std::iter::Peekable; -use std::path::{Path, PathBuf}; -use std::process::Command; -use std::str::FromStr; +use std::{ + fs, + fs::File, + io::Write, + iter::Peekable, + path::{Path, PathBuf}, + process::Command, + str::FromStr, +}; use strong_xml::XmlRead; #[derive(Debug, XmlRead)] diff --git a/xtask/src/test_js.rs b/xtask/src/test_js.rs index eb2d5fa..ad8a42c 100644 --- a/xtask/src/test_js.rs +++ b/xtask/src/test_js.rs @@ -1,7 +1,6 @@ use crate::project_root; use anyhow::{ensure, Context, Result}; -use std::path::Path; -use std::process::Command; +use std::{path::Path, process::Command}; pub fn main() -> Result<()> { let project_root = project_root();