From f2195133fb712814468d357c3b282217f33cdf61 Mon Sep 17 00:00:00 2001 From: Nilstrieb <48135649+Nilstrieb@users.noreply.github.com> Date: Sun, 20 Feb 2022 21:50:50 +0100 Subject: [PATCH] handle ChannelClose --- Cargo.lock | 1 + amqp_messaging/Cargo.toml | 3 ++- amqp_messaging/src/methods.rs | 8 +++++++- amqp_transport/src/connection.rs | 23 ++++++++++++++++++++++- amqp_transport/src/methods/generated.rs | 3 --- xtask/src/codegen/parser.rs | 7 ++++++- 6 files changed, 38 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 04a1a6e..5fce753 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -48,6 +48,7 @@ name = "amqp_messaging" version = "0.1.0" dependencies = [ "amqp_core", + "tokio", "tracing", ] diff --git a/amqp_messaging/Cargo.toml b/amqp_messaging/Cargo.toml index 371f3c9..4e17b35 100644 --- a/amqp_messaging/Cargo.toml +++ b/amqp_messaging/Cargo.toml @@ -7,4 +7,5 @@ edition = "2021" [dependencies] amqp_core = { path = "../amqp_core" } -tracing = "0.1.31" \ No newline at end of file +tracing = "0.1.31" +tokio = { version = "1.17.0", features = ["full"] } \ No newline at end of file diff --git a/amqp_messaging/src/methods.rs b/amqp_messaging/src/methods.rs index 28be325..700c97d 100644 --- a/amqp_messaging/src/methods.rs +++ b/amqp_messaging/src/methods.rs @@ -1,4 +1,10 @@ use amqp_core::methods::Method; use amqp_core::ChannelHandle; +use std::time::Duration; +use tokio::time; +use tracing::debug; -pub async fn handle_method(_channel_handle: ChannelHandle, _method: Method) {} +pub async fn handle_method(_channel_handle: ChannelHandle, _method: Method) { + debug!("handling method or something in that cool new future"); + time::sleep(Duration::from_secs(10)).await; +} diff --git a/amqp_transport/src/connection.rs b/amqp_transport/src/connection.rs index 2628fca..3a02055 100644 --- a/amqp_transport/src/connection.rs +++ b/amqp_transport/src/connection.rs @@ -199,7 +199,6 @@ impl Connection { loop { debug!("Waiting for next frame"); let frame = frame::read_frame(&mut self.stream, self.max_frame_size).await?; - debug!(?frame); self.reset_timeout(); match frame.kind { @@ -219,6 +218,7 @@ impl Connection { // todo: handle closing } Method::ChannelOpen { .. } => self.channel_open(frame.channel).await?, + Method::ChannelClose { .. } => self.channel_close(frame.channel, method).await?, _ => { let channel_handle = self .channels @@ -283,6 +283,27 @@ impl Connection { Ok(()) } + async fn channel_close(&mut self, num: u16, method: Method) -> Result<()> { + if let Method::ChannelClose { + reply_code: code, + reply_text: reason, + .. + } = method + { + info!(%code, %reason, "Closing channel"); + + if let Some(channel) = self.channels.remove(&num) { + drop(channel); + self.send_method(num, Method::ChannelCloseOk).await?; + } else { + return Err(ConException::Todo.into_trans()); + } + } else { + unreachable!() + } + Ok(()) + } + fn reset_timeout(&mut self) { if self.heartbeat_delay != 0 { let next = Duration::from_secs(u64::from(self.heartbeat_delay)); diff --git a/amqp_transport/src/methods/generated.rs b/amqp_transport/src/methods/generated.rs index 9ab93ca..9da8657 100644 --- a/amqp_transport/src/methods/generated.rs +++ b/amqp_transport/src/methods/generated.rs @@ -66,9 +66,6 @@ pub mod parse { } fn domain_reply_code(input: &[u8]) -> IResult<'_, ReplyCode> { let (input, result) = short(input)?; - if result == 0 { - fail!("number was 0 for field result") - } Ok((input, result)) } fn domain_reply_text(input: &[u8]) -> IResult<'_, ReplyText> { diff --git a/xtask/src/codegen/parser.rs b/xtask/src/codegen/parser.rs index 58ed2b2..60499d2 100644 --- a/xtask/src/codegen/parser.rs +++ b/xtask/src/codegen/parser.rs @@ -93,7 +93,12 @@ pub type IResult<'a, T> = nom::IResult<&'a [u8], T, TransError>; .ok(); for assert in &domain.asserts { - self.assert_check(assert, &type_name, "result"); + // channel.close requires a reply code, but there exists no reply code for + // a regular shutdown, and pythons `pika` just sends 0, even though the spec + // technically says that reply-code must be nonnull. Ignore that here. + if domain.name != "reply-code" { + self.assert_check(assert, &type_name, "result"); + } } writeln!(self.output, " Ok((input, result))").ok(); }