From de027d9f5a84ec2abe882dde6c4e97f8f2272c44 Mon Sep 17 00:00:00 2001 From: Nilstrieb <48135649+Nilstrieb@users.noreply.github.com> Date: Sat, 26 Feb 2022 23:24:08 +0100 Subject: [PATCH] more cleanup --- amqp_core/src/connection.rs | 6 +++++ amqp_core/src/macros.rs | 29 +++++++++++++++++++++- amqp_messaging/src/methods/queue.rs | 2 -- amqp_transport/src/connection.rs | 4 +-- amqp_transport/src/frame.rs | 4 +-- amqp_transport/src/methods/mod.rs | 5 ++-- amqp_transport/src/methods/parse_helper.rs | 5 ++-- amqp_transport/src/tests.rs | 3 ++- src/main.rs | 2 +- 9 files changed, 46 insertions(+), 14 deletions(-) diff --git a/amqp_core/src/connection.rs b/amqp_core/src/connection.rs index 615f670..79fb1c6 100644 --- a/amqp_core/src/connection.rs +++ b/amqp_core/src/connection.rs @@ -12,18 +12,22 @@ newtype_id!(pub ChannelId); pub struct ChannelNum(u16); impl ChannelNum { + #[must_use] pub fn new(num: u16) -> Self { Self(num) } + #[must_use] pub fn num(self) -> u16 { self.0 } + #[must_use] pub fn is_zero(self) -> bool { self.0 == 0 } + #[must_use] pub fn zero() -> Self { Self(0) } @@ -47,6 +51,7 @@ pub struct Connection { } impl Connection { + #[must_use] pub fn new_handle( id: ConnectionId, peer_addr: SocketAddr, @@ -78,6 +83,7 @@ pub struct Channel { } impl Channel { + #[must_use] pub fn new_handle( id: ChannelId, num: u16, diff --git a/amqp_core/src/macros.rs b/amqp_core/src/macros.rs index 2a4c2bd..c5d791f 100644 --- a/amqp_core/src/macros.rs +++ b/amqp_core/src/macros.rs @@ -3,8 +3,9 @@ macro_rules! newtype_id { ($vis:vis $name:ident) => { #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] $vis struct $name(::uuid::Uuid); - + impl $name { + #[must_use] pub fn random() -> Self { ::rand::random() } @@ -24,6 +25,32 @@ macro_rules! newtype_id { }; } +#[macro_export] +macro_rules! newtype { + ($(#[$meta:meta])* $vis:vis $name:ident: $ty:ty) => { + $(#[$meta])* + $vis struct $name($ty); + + impl $name { + pub fn new(inner: $ty) -> Self { + Self(inner) + } + + pub fn into_inner(self) -> $ty { + self.0 + } + } + + impl std::ops::Deref for $name { + type Target = $ty; + + fn deref(&self) -> &Self::Target { + &self.0 + } + } + }; +} + #[macro_export] macro_rules! amqp_todo { () => { diff --git a/amqp_messaging/src/methods/queue.rs b/amqp_messaging/src/methods/queue.rs index 08a5fc4..fb7406f 100644 --- a/amqp_messaging/src/methods/queue.rs +++ b/amqp_messaging/src/methods/queue.rs @@ -1,5 +1,3 @@ -#![deny(clippy::future_not_send)] - use amqp_core::connection::ChannelHandle; use amqp_core::error::{ConException, ProtocolError}; use amqp_core::methods::{Bit, ExchangeName, NoWait, QueueName, Shortstr, Table}; diff --git a/amqp_transport/src/connection.rs b/amqp_transport/src/connection.rs index 4268a25..c485b5d 100644 --- a/amqp_transport/src/connection.rs +++ b/amqp_transport/src/connection.rs @@ -162,7 +162,7 @@ impl Connection { ensure_conn(mechanism == "PLAIN")?; ensure_conn(locale == "en_US")?; let plain_user = sasl::parse_sasl_plain_response(&response)?; - info!(username = %plain_user.authentication_identity, "SASL Authentication successful") + info!(username = %plain_user.authentication_identity, "SASL Authentication successful"); } else { return Err(ConException::Todo.into()); } @@ -257,7 +257,7 @@ impl Connection { Method::ChannelClose { .. } => self.channel_close(frame.channel, method).await?, Method::BasicPublish { .. } => match self.channels.get_mut(&frame.channel) { Some(channel) => { - channel.status = ChannelStatus::NeedHeader(BASIC_CLASS_ID, Box::new(method)) + channel.status = ChannelStatus::NeedHeader(BASIC_CLASS_ID, Box::new(method)); } None => return Err(ConException::Todo.into()), }, diff --git a/amqp_transport/src/frame.rs b/amqp_transport/src/frame.rs index e957afb..cfb9220 100644 --- a/amqp_transport/src/frame.rs +++ b/amqp_transport/src/frame.rs @@ -138,7 +138,7 @@ impl ContentHeader { pub async fn write_frame(frame: &Frame, mut w: W) -> Result<()> where - W: AsyncWriteExt + Unpin, + W: AsyncWriteExt + Unpin + Send, { trace!(?frame, "Sending frame"); @@ -154,7 +154,7 @@ where pub async fn read_frame(r: &mut R, max_frame_size: usize) -> Result where - R: AsyncReadExt + Unpin, + R: AsyncReadExt + Unpin + Send, { let kind = r.read_u8().await.context("read type")?; let channel = r.read_u16().await.context("read channel")?; diff --git a/amqp_transport/src/methods/mod.rs b/amqp_transport/src/methods/mod.rs index f2cd587..617d6e3 100644 --- a/amqp_transport/src/methods/mod.rs +++ b/amqp_transport/src/methods/mod.rs @@ -2,7 +2,6 @@ use crate::error::TransError; use amqp_core::error::ConException; use amqp_core::methods::{FieldValue, Method, Table}; use rand::Rng; -use std::collections::HashMap; mod generated; pub mod parse_helper; @@ -65,7 +64,9 @@ rand_random_method!(bool, u8, i8, u16, i16, u32, i32, u64, i64, f32, f64); impl RandomMethod for Table { fn random(rng: &mut R) -> Self { let len = rng.gen_range(0..3); - HashMap::from_iter((0..len).map(|_| (String::random(rng), FieldValue::random(rng)))) + (0..len) + .map(|_| (String::random(rng), FieldValue::random(rng))) + .collect() } } diff --git a/amqp_transport/src/methods/parse_helper.rs b/amqp_transport/src/methods/parse_helper.rs index dfdc3d0..23a6b2e 100644 --- a/amqp_transport/src/methods/parse_helper.rs +++ b/amqp_transport/src/methods/parse_helper.rs @@ -12,7 +12,6 @@ use nom::multi::{count, many0}; use nom::number::complete::{f32, f64, i16, i32, i64, i8, u16, u32, u64, u8}; use nom::number::Endianness::Big; use nom::Err; -use std::collections::HashMap; impl nom::error::ParseError for TransError { fn from_error_kind(_input: T, _kind: ErrorKind) -> Self { @@ -37,7 +36,7 @@ pub fn fail_err>(msg: S) -> impl FnOnce(Err) -> Err< } _ => vec![msg], }, - _ => vec![msg], + Err::Incomplete(_) => vec![msg], }; Err::Failure(ConException::SyntaxError(stack).into()) } @@ -133,7 +132,7 @@ pub fn table(input: &[u8]) -> IResult<'_, Table> { )); } - let table = HashMap::from_iter(values.into_iter()); + let table = values.into_iter().collect(); Ok((rest_input, table)) } diff --git a/amqp_transport/src/tests.rs b/amqp_transport/src/tests.rs index fd89146..cb61337 100644 --- a/amqp_transport/src/tests.rs +++ b/amqp_transport/src/tests.rs @@ -1,5 +1,6 @@ -use crate::frame::{ChannelNum, FrameType}; +use crate::frame::FrameType; use crate::{frame, methods}; +use amqp_core::connection::ChannelNum; use amqp_core::methods::{FieldValue, Method}; use std::collections::HashMap; diff --git a/src/main.rs b/src/main.rs index 182a8e8..645accd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -29,8 +29,8 @@ async fn main() -> Result<()> { } fn setup_tracing() { - let rust_log = std::env::var("RUST_LOG"); const DEFAULT_LOG: &str = "hyper=info,debug"; + let rust_log = std::env::var("RUST_LOG"); tracing_subscriber::fmt() .with_level(true)