diff --git a/Cargo.lock b/Cargo.lock index 8b6f3b6..8b3cc6b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -43,11 +43,20 @@ dependencies = [ "tracing", ] +[[package]] +name = "amqp_messaging" +version = "0.1.0" +dependencies = [ + "amqp_core", + "tracing", +] + [[package]] name = "amqp_transport" version = "0.1.0" dependencies = [ "amqp_core", + "amqp_messaging", "anyhow", "criterion", "nom", @@ -528,6 +537,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata", +] + [[package]] name = "matches" version = "0.1.9" @@ -843,6 +861,9 @@ name = "regex-automata" version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax", +] [[package]] name = "regex-syntax" @@ -1220,9 +1241,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74786ce43333fcf51efe947aed9718fbe46d5c7328ec3f1029e818083966d9aa" dependencies = [ "ansi_term", + "lazy_static", + "matchers", + "regex", "sharded-slab", "smallvec", "thread_local", + "tracing", "tracing-core", "tracing-log", ] diff --git a/Cargo.toml b/Cargo.toml index eec63dd..930220a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = [".", "amqp_core", "amqp_dashboard", "amqp_transport","xtask"] +members = [".", "amqp_core", "amqp_dashboard", "amqp_messaging", "amqp_transport","xtask"] [package] name = "amqp" @@ -15,4 +15,4 @@ amqp_dashboard = { path = "./amqp_dashboard" } amqp_transport = { path = "./amqp_transport" } tokio = { version = "1.16.1", features = ["full"] } tracing = "0.1.30" -tracing-subscriber = "0.3.8" +tracing-subscriber = { version = "0.3.8", features = ["env-filter"] } diff --git a/amqp_messaging/Cargo.toml b/amqp_messaging/Cargo.toml new file mode 100644 index 0000000..371f3c9 --- /dev/null +++ b/amqp_messaging/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "amqp_messaging" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +amqp_core = { path = "../amqp_core" } +tracing = "0.1.31" \ No newline at end of file diff --git a/amqp_messaging/src/lib.rs b/amqp_messaging/src/lib.rs new file mode 100644 index 0000000..020bd3b --- /dev/null +++ b/amqp_messaging/src/lib.rs @@ -0,0 +1,3 @@ +#![warn(rust_2018_idioms)] + +mod method; diff --git a/amqp_messaging/src/method.rs b/amqp_messaging/src/method.rs new file mode 100644 index 0000000..b49b6ab --- /dev/null +++ b/amqp_messaging/src/method.rs @@ -0,0 +1,3 @@ +use amqp_core::ChannelHandle; + +pub async fn handle_method(channel_handle: ChannelHandle) {} diff --git a/amqp_transport/Cargo.toml b/amqp_transport/Cargo.toml index cb0199c..a69d7a7 100644 --- a/amqp_transport/Cargo.toml +++ b/amqp_transport/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" [dependencies] amqp_core = { path = "../amqp_core" } +amqp_messaging = { path = "../amqp_messaging" } anyhow = "1.0.53" nom = "7.1.0" once_cell = "1.9.0" diff --git a/amqp_transport/src/connection.rs b/amqp_transport/src/connection.rs index 6662933..7ba509c 100644 --- a/amqp_transport/src/connection.rs +++ b/amqp_transport/src/connection.rs @@ -197,6 +197,7 @@ impl Connection { async fn main_loop(&mut self) -> Result<()> { loop { + debug!("Waiting for next frame"); let frame = frame::read_frame(&mut self.stream, self.max_frame_size).await?; debug!(?frame); self.reset_timeout(); @@ -218,8 +219,8 @@ impl Connection { // todo: handle closing } Method::ChannelOpen { .. } => self.channel_open(frame.channel).await?, - _ => { + tokio::spawn(amqp_core::method::handle_method()) // we don't handle this here, forward it to *somewhere* } } @@ -269,8 +270,6 @@ impl Connection { ) .await?; - time::sleep(Duration::from_secs(1000)).await; // for debugging the dashboard - Ok(()) } diff --git a/src/main.rs b/src/main.rs index 31638d4..a18e469 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,25 +2,21 @@ use anyhow::Result; use std::env; -use tracing::Level; use tracing::{info_span, Instrument}; #[tokio::main] async fn main() -> Result<()> { let mut dashboard = false; - let mut level = Level::INFO; for arg in env::args().skip(1) { match arg.as_str() { - "--debug" => level = Level::DEBUG, - "--trace" => level = Level::TRACE, "--dashboard" => dashboard = true, "ignore-this-clippy" => eprintln!("yes please"), _ => {} } } - setup_tracing(level); + setup_tracing(); let global_data = amqp_core::GlobalData::default(); @@ -32,12 +28,14 @@ async fn main() -> Result<()> { amqp_transport::do_thing_i_guess(global_data).await } -fn setup_tracing(level: Level) { +fn setup_tracing() { tracing_subscriber::fmt() .with_level(true) .with_timer(tracing_subscriber::fmt::time::time()) .with_ansi(true) .with_thread_names(true) - .with_max_level(level) + .with_env_filter( + std::env::var("RUST_LOG").unwrap_or_else(|_| "hyper=info,debug".to_string()), + ) .init() }