diff --git a/Cargo.lock b/Cargo.lock index 4c6a23f..6c38649 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,21 +17,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "amqp" -version = "0.1.0" -dependencies = [ - "anyhow", - "clap 3.1.6", - "haesli_core", - "haesli_dashboard", - "haesli_transport", - "tokio", - "tracing", - "tracing-subscriber", - "tracing-tree", -] - [[package]] name = "ansi_term" version = "0.12.1" @@ -434,6 +419,21 @@ dependencies = [ "wasi 0.10.2+wasi-snapshot-preview1", ] +[[package]] +name = "haesli" +version = "0.1.0" +dependencies = [ + "anyhow", + "clap 3.1.6", + "haesli_core", + "haesli_dashboard", + "haesli_transport", + "tokio", + "tracing", + "tracing-subscriber", + "tracing-tree", +] + [[package]] name = "haesli_core" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 327a416..99b5f86 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ members = [ ] [package] -name = "amqp" +name = "haesli" version = "0.1.0" edition = "2021" diff --git a/haesli_messaging/src/methods/queue.rs b/haesli_messaging/src/methods/queue.rs index 75409e0..dddb5aa 100644 --- a/haesli_messaging/src/methods/queue.rs +++ b/haesli_messaging/src/methods/queue.rs @@ -9,6 +9,7 @@ use haesli_core::{ }; use parking_lot::Mutex; use tokio::sync::mpsc; +use tracing::debug; use crate::{queue_worker::QueueTask, Result}; @@ -25,7 +26,7 @@ pub fn declare(channel: Channel, queue_declare: QueueDeclare) -> Result } = queue_declare; // 2.1.4.1 - If no queue name is given, chose a name - let queue_name = if queue_name.is_empty() { + let queue_name = if !queue_name.is_empty() { queue_name } else { format!("q_{}", haesli_core::random_uuid()) @@ -63,6 +64,8 @@ pub fn declare(channel: Channel, queue_declare: QueueDeclare) -> Result event_send, }); + debug!(%queue_name, "Creating queue"); + { let mut global_data_lock = global_data.lock(); diff --git a/haesli_transport/src/connection.rs b/haesli_transport/src/connection.rs index a2de542..2832f46 100644 --- a/haesli_transport/src/connection.rs +++ b/haesli_transport/src/connection.rs @@ -224,7 +224,9 @@ impl TransportConnection { } async fn recv_method(&mut self) -> Result { - let start_ok_frame = frame::read_frame(&mut self.stream, self.max_frame_size).await?; + let start_ok_frame = frame::read_frame(&mut self.stream, self.max_frame_size) + .await + .context("read from stream, peer disconnected")?; ensure_conn(start_ok_frame.kind == FrameType::Method)?; @@ -320,7 +322,7 @@ impl TransportConnection { loop { select! { frame = frame::read_frame(&mut self.stream, self.max_frame_size) => { - let frame = frame?; + let frame = frame.context("read from stream, peer disconnected")?; self.handle_frame(frame).await?; } queued_method = self.event_receiver.recv() => { diff --git a/haesli_transport/src/frame.rs b/haesli_transport/src/frame.rs index 56583aa..89caa5b 100644 --- a/haesli_transport/src/frame.rs +++ b/haesli_transport/src/frame.rs @@ -283,15 +283,15 @@ pub async fn read_frame(r: &mut R, max_frame_size: MaxFrameSize) -> Result Result<()> { .context("cargo build")?; ensure!(status.success(), "cargo build failed"); - let mut haesli_server = Command::new("target/debug/amqp") + let server_binary = project_root.join("target/debug/haesli"); + let mut server_process = Command::new(&server_binary) .env("RUST_LOG", "trace") .spawn() - .context("target/debug/amqp run")?; + .context(server_binary.display().to_string()) + .context("run server binary")?; // give it time for startup - sleep(Duration::from_secs(1)); + thread::sleep(Duration::from_secs(1)); let test_result = run_js(&test_js_root); - haesli_server.kill().context("killing amqp server")?; + server_process.kill().context("killing amqp server")?; test_result }