diff --git a/.rustfmt.toml b/.rustfmt.toml index 2ebd1dc..eba1ff0 100644 --- a/.rustfmt.toml +++ b/.rustfmt.toml @@ -1,3 +1,3 @@ -reorder_imports = true imports_granularity = "Crate" newline_style = "Unix" +group_imports = "StdExternalCrate" diff --git a/amqp_core/src/connection.rs b/amqp_core/src/connection.rs index e0116bd..b80ab75 100644 --- a/amqp_core/src/connection.rs +++ b/amqp_core/src/connection.rs @@ -1,15 +1,21 @@ -use crate::{consumer::Consumer, methods, methods::Method, newtype_id, GlobalData, Queue}; -use bytes::Bytes; -use parking_lot::Mutex; -use smallvec::SmallVec; use std::{ collections::HashMap, fmt::{Display, Formatter}, net::SocketAddr, sync::Arc, }; + +use bytes::Bytes; +use parking_lot::Mutex; +use smallvec::SmallVec; use tokio::sync::mpsc; +use crate::{ + consumer::Consumer, + methods::{self, Method}, + newtype_id, GlobalData, Queue, +}; + newtype_id!(pub ConnectionId); newtype_id!(pub ChannelId); diff --git a/amqp_core/src/lib.rs b/amqp_core/src/lib.rs index 67cf362..701a4a1 100644 --- a/amqp_core/src/lib.rs +++ b/amqp_core/src/lib.rs @@ -8,19 +8,21 @@ pub mod message; pub mod methods; pub mod queue; -use crate::{ - connection::{Channel, Connection}, - queue::{Queue, QueueName}, -}; -use connection::{ChannelId, ConnectionId}; -use parking_lot::Mutex; use std::{ collections::HashMap, fmt::{Debug, Formatter}, sync::Arc, }; + +use connection::{ChannelId, ConnectionId}; +use parking_lot::Mutex; use uuid::Uuid; +use crate::{ + connection::{Channel, Connection}, + queue::{Queue, QueueName}, +}; + #[derive(Clone)] pub struct GlobalData { inner: Arc>, diff --git a/amqp_core/src/message.rs b/amqp_core/src/message.rs index 4bb831e..b37bbde 100644 --- a/amqp_core/src/message.rs +++ b/amqp_core/src/message.rs @@ -1,7 +1,9 @@ -use crate::{connection::ContentHeader, newtype_id}; +use std::sync::Arc; + use bytes::Bytes; use smallvec::SmallVec; -use std::sync::Arc; + +use crate::{connection::ContentHeader, newtype_id}; pub type Message = Arc; diff --git a/amqp_core/src/queue.rs b/amqp_core/src/queue.rs index 62a7eac..4d9c924 100644 --- a/amqp_core/src/queue.rs +++ b/amqp_core/src/queue.rs @@ -1,17 +1,19 @@ -use crate::{ - consumer::{Consumer, ConsumerId}, - message::Message, - newtype, newtype_id, ChannelId, -}; -use parking_lot::Mutex; use std::{ borrow::Borrow, collections::HashMap, fmt::{Debug, Display, Formatter}, sync::{atomic::AtomicUsize, Arc}, }; + +use parking_lot::Mutex; use tokio::sync::mpsc; +use crate::{ + consumer::{Consumer, ConsumerId}, + message::Message, + newtype, newtype_id, ChannelId, +}; + pub type Queue = Arc; #[derive(Debug)] diff --git a/amqp_dashboard/build.rs b/amqp_dashboard/build.rs index 1d1f7f0..db7ddf5 100644 --- a/amqp_dashboard/build.rs +++ b/amqp_dashboard/build.rs @@ -1,10 +1,11 @@ -use anyhow::{ensure, Context, Result}; use std::{ env, fs::File, path::{Path, PathBuf}, process::Command, }; + +use anyhow::{ensure, Context, Result}; use walkdir::WalkDir; use zip::{write::FileOptions, ZipWriter}; diff --git a/amqp_dashboard/src/archive.rs b/amqp_dashboard/src/archive.rs index 849c660..0754ff9 100644 --- a/amqp_dashboard/src/archive.rs +++ b/amqp_dashboard/src/archive.rs @@ -1,8 +1,3 @@ -use axum::{ - body::Body, - http::{header, Request, Response, StatusCode}, -}; -use mime_guess::mime; use std::{ collections::HashMap, fmt::{Debug, Formatter}, @@ -11,6 +6,12 @@ use std::{ path::Path, task::{Context, Poll}, }; + +use axum::{ + body::Body, + http::{header, Request, Response, StatusCode}, +}; +use mime_guess::mime; use tracing::trace; use zip::ZipArchive; diff --git a/amqp_dashboard/src/lib.rs b/amqp_dashboard/src/lib.rs index f71cc6c..a6334bb 100644 --- a/amqp_dashboard/src/lib.rs +++ b/amqp_dashboard/src/lib.rs @@ -2,7 +2,6 @@ mod archive; -use crate::archive::StaticFileService; use amqp_core::GlobalData; use axum::{ http::{Method, StatusCode}, @@ -14,6 +13,8 @@ use serde::Serialize; use tower_http::cors::{Any, CorsLayer}; use tracing::{error, info}; +use crate::archive::StaticFileService; + const DATA_ZIP: &[u8] = include_bytes!(concat!(env!("OUT_DIR"), "/frontend.zip")); pub async fn start_dashboard(global_data: GlobalData) { diff --git a/amqp_messaging/src/methods/consume.rs b/amqp_messaging/src/methods/consume.rs index 0753b49..92f1b91 100644 --- a/amqp_messaging/src/methods/consume.rs +++ b/amqp_messaging/src/methods/consume.rs @@ -1,4 +1,5 @@ -use crate::Result; +use std::sync::Arc; + use amqp_core::{ amqp_todo, connection::Channel, @@ -6,9 +7,10 @@ use amqp_core::{ error::ChannelException, methods::{BasicConsume, BasicConsumeOk, Method}, }; -use std::sync::Arc; use tracing::info; +use crate::Result; + pub fn consume(channel: Channel, basic_consume: BasicConsume) -> Result { let BasicConsume { queue: queue_name, diff --git a/amqp_messaging/src/methods/mod.rs b/amqp_messaging/src/methods/mod.rs index 158ee5a..8b5a0e7 100644 --- a/amqp_messaging/src/methods/mod.rs +++ b/amqp_messaging/src/methods/mod.rs @@ -2,10 +2,11 @@ mod consume; mod publish; mod queue; -use crate::Result; use amqp_core::{amqp_todo, connection::Channel, message::Message, methods::Method}; use tracing::info; +use crate::Result; + pub fn handle_basic_publish(channel_handle: Channel, message: Message) -> Result<()> { publish::publish(channel_handle, message) } diff --git a/amqp_messaging/src/methods/publish.rs b/amqp_messaging/src/methods/publish.rs index a029021..4a1c0dd 100644 --- a/amqp_messaging/src/methods/publish.rs +++ b/amqp_messaging/src/methods/publish.rs @@ -1,4 +1,3 @@ -use crate::Result; use amqp_core::{ amqp_todo, connection::Channel, @@ -8,6 +7,8 @@ use amqp_core::{ }; use tracing::{debug, error}; +use crate::Result; + pub fn publish(channel_handle: Channel, message: Message) -> Result<()> { debug!(?message, "Publishing message"); diff --git a/amqp_messaging/src/methods/queue.rs b/amqp_messaging/src/methods/queue.rs index 6247725..2752ecf 100644 --- a/amqp_messaging/src/methods/queue.rs +++ b/amqp_messaging/src/methods/queue.rs @@ -1,4 +1,5 @@ -use crate::{queue_worker::QueueTask, Result}; +use std::sync::{atomic::AtomicUsize, Arc}; + use amqp_core::{ amqp_todo, connection::Channel, @@ -7,9 +8,10 @@ use amqp_core::{ GlobalData, }; use parking_lot::Mutex; -use std::sync::{atomic::AtomicUsize, Arc}; use tokio::sync::mpsc; +use crate::{queue_worker::QueueTask, Result}; + pub fn declare(channel: Channel, queue_declare: QueueDeclare) -> Result { let QueueDeclare { queue: queue_name, diff --git a/amqp_messaging/src/queue_worker.rs b/amqp_messaging/src/queue_worker.rs index d76f463..16a1a78 100644 --- a/amqp_messaging/src/queue_worker.rs +++ b/amqp_messaging/src/queue_worker.rs @@ -1,3 +1,5 @@ +use std::borrow::Borrow; + use amqp_core::{ connection::ConnectionEvent, consumer::Consumer, @@ -6,7 +8,6 @@ use amqp_core::{ queue::{Queue, QueueEvent, QueueEventReceiver}, GlobalData, }; -use std::borrow::Borrow; use tracing::info; #[derive(Debug)] diff --git a/amqp_transport/benches/parser.rs b/amqp_transport/benches/parser.rs index 4a25923..cd78e87 100644 --- a/amqp_transport/benches/parser.rs +++ b/amqp_transport/benches/parser.rs @@ -1,5 +1,7 @@ use amqp_core::methods::Method; -use amqp_transport::methods::{self, RandomMethod}; +use amqp_transport::methods::{ + RandomMethod, {self}, +}; use criterion::{black_box, criterion_group, criterion_main, Criterion}; use rand::SeedableRng; diff --git a/amqp_transport/src/connection.rs b/amqp_transport/src/connection.rs index f35fc63..15022f6 100644 --- a/amqp_transport/src/connection.rs +++ b/amqp_transport/src/connection.rs @@ -1,9 +1,7 @@ -use crate::{ - error::{ConException, ProtocolError, Result, TransError}, - frame, - frame::{parse_content_header, Frame, FrameType, MaxFrameSize}, - methods, sasl, +use std::{ + cmp::Ordering, collections::HashMap, net::SocketAddr, pin::Pin, sync::Arc, time::Duration, }; + use amqp_core::{ connection::{ Channel, ChannelInner, ChannelNum, ConEventReceiver, ConEventSender, Connection, @@ -20,9 +18,6 @@ use amqp_core::{ use anyhow::{anyhow, Context}; use bytes::Bytes; use smallvec::SmallVec; -use std::{ - cmp::Ordering, collections::HashMap, net::SocketAddr, pin::Pin, sync::Arc, time::Duration, -}; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, net::TcpStream, @@ -30,6 +25,12 @@ use tokio::{ }; use tracing::{debug, error, info, trace, warn}; +use crate::{ + error::{ConException, ProtocolError, Result, TransError}, + frame::{self, parse_content_header, Frame, FrameType, MaxFrameSize}, + methods, sasl, +}; + fn ensure_conn(condition: bool) -> Result<()> { if condition { Ok(()) diff --git a/amqp_transport/src/frame.rs b/amqp_transport/src/frame.rs index 347afe3..ddb4d79 100644 --- a/amqp_transport/src/frame.rs +++ b/amqp_transport/src/frame.rs @@ -1,14 +1,16 @@ -use crate::error::{ConException, ProtocolError, Result}; -use amqp_core::connection::{ChannelNum, ContentHeader}; -use anyhow::Context; -use bytes::Bytes; use std::{ fmt::{Debug, Formatter}, num::NonZeroUsize, }; + +use amqp_core::connection::{ChannelNum, ContentHeader}; +use anyhow::Context; +use bytes::Bytes; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tracing::trace; +use crate::error::{ConException, ProtocolError, Result}; + const REQUIRED_FRAME_END: u8 = 0xCE; mod frame_type { @@ -37,20 +39,23 @@ pub enum FrameType { } mod content_header_parse { - use crate::{ - error::TransError, - methods::parse_helper::{octet, shortstr, table, timestamp}, - }; use amqp_core::{ connection::ContentHeader, - methods, - methods::FieldValue::{FieldTable, ShortShortUInt, ShortString, Timestamp}, + methods::{ + self, + FieldValue::{FieldTable, ShortShortUInt, ShortString, Timestamp}, + }, }; use nom::number::{ complete::{u16, u64}, Endianness::Big, }; + use crate::{ + error::TransError, + methods::parse_helper::{octet, shortstr, table, timestamp}, + }; + type IResult<'a, T> = nom::IResult<&'a [u8], T, TransError>; pub fn basic_properties(flags: u16, input: &[u8]) -> IResult<'_, methods::Table> { @@ -133,10 +138,8 @@ pub fn parse_content_header(input: &[u8]) -> Result { } mod content_header_write { - use crate::{ - error::Result, - methods::write_helper::{longlong, octet, short, shortstr, table, timestamp}, - }; + use std::io::Write; + use amqp_core::{ connection::ContentHeader, methods::{ @@ -144,7 +147,11 @@ mod content_header_write { Table, }, }; - use std::io::Write; + + use crate::{ + error::Result, + methods::write_helper::{longlong, octet, short, shortstr, table, timestamp}, + }; pub fn write_content_header(buf: &mut W, header: &ContentHeader) -> Result<()> { short(&header.class_id, buf)?; @@ -325,9 +332,10 @@ fn parse_frame_type(kind: u8, channel: ChannelNum) -> Result { #[cfg(test)] mod tests { - use crate::frame::{ChannelNum, Frame, FrameType, MaxFrameSize}; use bytes::Bytes; + use crate::frame::{ChannelNum, Frame, FrameType, MaxFrameSize}; + #[tokio::test] async fn read_small_body() { let mut bytes: &[u8] = &[ diff --git a/amqp_transport/src/lib.rs b/amqp_transport/src/lib.rs index 04e10b3..17822ca 100644 --- a/amqp_transport/src/lib.rs +++ b/amqp_transport/src/lib.rs @@ -10,13 +10,15 @@ mod tests; // TODO: handle big types -use crate::connection::TransportConnection; +use std::{future::Future, net::SocketAddr}; + use amqp_core::{connection::ConnectionEvent, queue::QueueEvent, GlobalData}; use anyhow::Context; -use std::{future::Future, net::SocketAddr}; use tokio::{net, net::TcpStream, select}; use tracing::{info, info_span, Instrument}; +use crate::connection::TransportConnection; + pub async fn do_thing_i_guess( global_data: GlobalData, terminate: impl Future + Send, diff --git a/amqp_transport/src/methods/generated.rs b/amqp_transport/src/methods/generated.rs index 566b520..82fda42 100644 --- a/amqp_transport/src/methods/generated.rs +++ b/amqp_transport/src/methods/generated.rs @@ -2,12 +2,13 @@ // This file has been generated by `xtask/src/codegen`. Do not edit it manually. pub mod parse { - use crate::{error::TransError, methods::parse_helper::*}; use amqp_core::methods::*; use nom::{branch::alt, bytes::complete::tag}; use once_cell::sync::Lazy; use regex::Regex; + use crate::{error::TransError, methods::parse_helper::*}; + pub type IResult<'a, T> = nom::IResult<&'a [u8], T, TransError>; pub fn parse_method(input: &[u8]) -> Result<(&[u8], Method), nom::Err> { @@ -886,10 +887,12 @@ pub mod parse { } } pub mod write { - use crate::{error::TransError, methods::write_helper::*}; - use amqp_core::methods::*; use std::io::Write; + use amqp_core::methods::*; + + use crate::{error::TransError, methods::write_helper::*}; + pub fn write_method(method: &Method, mut writer: W) -> Result<(), TransError> { match method { Method::ConnectionStart(ConnectionStart { @@ -1303,10 +1306,11 @@ pub mod write { } mod random { - use crate::methods::RandomMethod; use amqp_core::methods::*; use rand::Rng; + use crate::methods::RandomMethod; + impl RandomMethod for Method { #[allow(unused_variables)] fn random(rng: &mut R) -> Self { diff --git a/amqp_transport/src/methods/mod.rs b/amqp_transport/src/methods/mod.rs index c219b44..4153db1 100644 --- a/amqp_transport/src/methods/mod.rs +++ b/amqp_transport/src/methods/mod.rs @@ -1,10 +1,11 @@ -use crate::error::TransError; use amqp_core::{ error::ConException, methods::{FieldValue, Method, Table}, }; use rand::Rng; +use crate::error::TransError; + mod generated; pub mod parse_helper; #[cfg(test)] diff --git a/amqp_transport/src/methods/parse_helper.rs b/amqp_transport/src/methods/parse_helper.rs index 5792085..56a24fb 100644 --- a/amqp_transport/src/methods/parse_helper.rs +++ b/amqp_transport/src/methods/parse_helper.rs @@ -1,4 +1,3 @@ -use crate::{error::TransError, methods::generated::parse::IResult}; use amqp_core::{ error::{ConException, ProtocolError}, methods::{ @@ -18,6 +17,8 @@ use nom::{ Err, }; +use crate::{error::TransError, methods::generated::parse::IResult}; + impl nom::error::ParseError for TransError { fn from_error_kind(_input: T, _kind: ErrorKind) -> Self { ConException::SyntaxError(vec![]).into() diff --git a/amqp_transport/src/methods/tests.rs b/amqp_transport/src/methods/tests.rs index 3620eb6..86a8216 100644 --- a/amqp_transport/src/methods/tests.rs +++ b/amqp_transport/src/methods/tests.rs @@ -1,10 +1,12 @@ // create random methods to test the ser/de code together. if they diverge, we have a bug // this is not perfect, if they both have the same bug it won't be found, but that's an ok tradeoff -use crate::methods::{FieldValue, Method, RandomMethod}; -use rand::SeedableRng; use std::collections::HashMap; +use rand::SeedableRng; + +use crate::methods::{FieldValue, Method, RandomMethod}; + #[test] fn pack_few_bits() { let bits = [true, false, true]; diff --git a/amqp_transport/src/methods/write_helper.rs b/amqp_transport/src/methods/write_helper.rs index f98d09b..69add00 100644 --- a/amqp_transport/src/methods/write_helper.rs +++ b/amqp_transport/src/methods/write_helper.rs @@ -1,7 +1,9 @@ -use crate::{error::TransError, methods::FieldValue}; +use std::io::Write; + use amqp_core::methods::{Bit, Long, Longlong, Longstr, Octet, Short, Shortstr, Table, Timestamp}; use anyhow::Context; -use std::io::Write; + +use crate::{error::TransError, methods::FieldValue}; pub fn octet(value: &Octet, writer: &mut W) -> Result<(), TransError> { writer.write_all(&[*value])?; diff --git a/amqp_transport/src/sasl.rs b/amqp_transport/src/sasl.rs index 494c433..89a2a5e 100644 --- a/amqp_transport/src/sasl.rs +++ b/amqp_transport/src/sasl.rs @@ -2,9 +2,10 @@ //! //! Currently only supports PLAIN (see [RFC 4616](https://datatracker.ietf.org/doc/html/rfc4616)) -use crate::error::Result; use amqp_core::error::ConException; +use crate::error::Result; + pub struct PlainUser { pub authorization_identity: String, pub authentication_identity: String, diff --git a/amqp_transport/src/tests.rs b/amqp_transport/src/tests.rs index 3127d64..c44412c 100644 --- a/amqp_transport/src/tests.rs +++ b/amqp_transport/src/tests.rs @@ -1,9 +1,11 @@ -use crate::{frame, frame::FrameType, methods}; +use std::collections::HashMap; + use amqp_core::{ connection::ChannelNum, methods::{ConnectionStart, ConnectionStartOk, FieldValue, Method}, }; -use std::collections::HashMap; + +use crate::{frame, frame::FrameType, methods}; #[tokio::test] async fn write_start_ok_frame() { diff --git a/src/main.rs b/src/main.rs index dc00d1c..2ce6942 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,8 +1,9 @@ #![warn(rust_2018_idioms)] +use std::str::FromStr; + use anyhow::Result; use clap::Parser; -use std::str::FromStr; use tracing::info; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Registry}; diff --git a/xtask/src/check_fmt.rs b/xtask/src/check_fmt.rs index a4d5808..8e861b2 100644 --- a/xtask/src/check_fmt.rs +++ b/xtask/src/check_fmt.rs @@ -1,7 +1,9 @@ -use crate::{project_root, yarn_install}; -use anyhow::ensure; use std::process::Command; +use anyhow::ensure; + +use crate::{project_root, yarn_install}; + pub fn main() -> anyhow::Result<()> { println!("$ cargo fmt --check"); let status = Command::new("cargo") diff --git a/xtask/src/codegen/mod.rs b/xtask/src/codegen/mod.rs index 5c672ea..3143594 100644 --- a/xtask/src/codegen/mod.rs +++ b/xtask/src/codegen/mod.rs @@ -4,8 +4,6 @@ mod parser; mod random; mod write; -use anyhow::{bail, Context}; -use heck::ToUpperCamelCase; use std::{ fs, fs::File, @@ -15,6 +13,9 @@ use std::{ process::Command, str::FromStr, }; + +use anyhow::{bail, Context}; +use heck::ToUpperCamelCase; use strong_xml::XmlRead; #[derive(Debug, XmlRead)] diff --git a/xtask/src/codegen/parser.rs b/xtask/src/codegen/parser.rs index 4234d1e..1914953 100644 --- a/xtask/src/codegen/parser.rs +++ b/xtask/src/codegen/parser.rs @@ -1,8 +1,9 @@ -use super::{Amqp, Assert, Class, Domain, Method}; -use crate::codegen::Codegen; use heck::{ToSnakeCase, ToUpperCamelCase}; use itertools::Itertools; +use super::{Amqp, Assert, Class, Domain, Method}; +use crate::codegen::Codegen; + fn method_function_name(class_name: &str) -> impl Fn(&Method) -> String + '_ { move |method| { let method_name = method.name.to_snake_case(); diff --git a/xtask/src/codegen/random.rs b/xtask/src/codegen/random.rs index 9ab1442..06226fb 100644 --- a/xtask/src/codegen/random.rs +++ b/xtask/src/codegen/random.rs @@ -1,6 +1,7 @@ -use crate::codegen::{Amqp, Codegen}; use heck::ToUpperCamelCase; +use crate::codegen::{Amqp, Codegen}; + impl Codegen { pub fn codegen_random(&mut self, amqp: &Amqp) { writeln!( diff --git a/xtask/src/codegen/write.rs b/xtask/src/codegen/write.rs index 2a31db5..8da7beb 100644 --- a/xtask/src/codegen/write.rs +++ b/xtask/src/codegen/write.rs @@ -1,6 +1,7 @@ -use crate::codegen::{Amqp, Codegen}; use heck::ToUpperCamelCase; +use crate::codegen::{Amqp, Codegen}; + impl Codegen { pub fn codegen_write(&mut self, amqp: &Amqp) { writeln!( diff --git a/xtask/src/fmt.rs b/xtask/src/fmt.rs index e425ef2..e59f90a 100644 --- a/xtask/src/fmt.rs +++ b/xtask/src/fmt.rs @@ -1,7 +1,9 @@ -use crate::{project_root, yarn_install}; -use anyhow::ensure; use std::process::Command; +use anyhow::ensure; + +use crate::{project_root, yarn_install}; + pub fn main() -> anyhow::Result<()> { println!("$ cargo fmt"); let status = Command::new("cargo") diff --git a/xtask/src/main.rs b/xtask/src/main.rs index 6303a03..fa6b1d7 100644 --- a/xtask/src/main.rs +++ b/xtask/src/main.rs @@ -1,9 +1,10 @@ -use anyhow::{ensure, Context, Result}; use std::{ path::{Path, PathBuf}, process::Command, }; +use anyhow::{ensure, Context, Result}; + mod check_fmt; mod codegen; mod fmt; diff --git a/xtask/src/test_js.rs b/xtask/src/test_js.rs index 6bed801..ddfdc9f 100644 --- a/xtask/src/test_js.rs +++ b/xtask/src/test_js.rs @@ -1,7 +1,9 @@ -use crate::{project_root, yarn_install}; -use anyhow::{ensure, Context, Result}; use std::{path::Path, process::Command, thread::sleep, time::Duration}; +use anyhow::{ensure, Context, Result}; + +use crate::{project_root, yarn_install}; + pub fn main() -> Result<()> { let project_root = project_root(); let test_js_root = project_root.join("test-js");