mirror of
https://github.com/Noratrieb/haesli.git
synced 2026-01-14 19:55:03 +01:00
handle ChannelClose
This commit is contained in:
parent
cb73214bc3
commit
f2195133fb
6 changed files with 38 additions and 7 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
|
@ -48,6 +48,7 @@ name = "amqp_messaging"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"amqp_core",
|
"amqp_core",
|
||||||
|
"tokio",
|
||||||
"tracing",
|
"tracing",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -7,4 +7,5 @@ edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
amqp_core = { path = "../amqp_core" }
|
amqp_core = { path = "../amqp_core" }
|
||||||
tracing = "0.1.31"
|
tracing = "0.1.31"
|
||||||
|
tokio = { version = "1.17.0", features = ["full"] }
|
||||||
|
|
@ -1,4 +1,10 @@
|
||||||
use amqp_core::methods::Method;
|
use amqp_core::methods::Method;
|
||||||
use amqp_core::ChannelHandle;
|
use amqp_core::ChannelHandle;
|
||||||
|
use std::time::Duration;
|
||||||
|
use tokio::time;
|
||||||
|
use tracing::debug;
|
||||||
|
|
||||||
pub async fn handle_method(_channel_handle: ChannelHandle, _method: Method) {}
|
pub async fn handle_method(_channel_handle: ChannelHandle, _method: Method) {
|
||||||
|
debug!("handling method or something in that cool new future");
|
||||||
|
time::sleep(Duration::from_secs(10)).await;
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -199,7 +199,6 @@ impl Connection {
|
||||||
loop {
|
loop {
|
||||||
debug!("Waiting for next frame");
|
debug!("Waiting for next frame");
|
||||||
let frame = frame::read_frame(&mut self.stream, self.max_frame_size).await?;
|
let frame = frame::read_frame(&mut self.stream, self.max_frame_size).await?;
|
||||||
debug!(?frame);
|
|
||||||
self.reset_timeout();
|
self.reset_timeout();
|
||||||
|
|
||||||
match frame.kind {
|
match frame.kind {
|
||||||
|
|
@ -219,6 +218,7 @@ impl Connection {
|
||||||
// todo: handle closing
|
// todo: handle closing
|
||||||
}
|
}
|
||||||
Method::ChannelOpen { .. } => self.channel_open(frame.channel).await?,
|
Method::ChannelOpen { .. } => self.channel_open(frame.channel).await?,
|
||||||
|
Method::ChannelClose { .. } => self.channel_close(frame.channel, method).await?,
|
||||||
_ => {
|
_ => {
|
||||||
let channel_handle = self
|
let channel_handle = self
|
||||||
.channels
|
.channels
|
||||||
|
|
@ -283,6 +283,27 @@ impl Connection {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn channel_close(&mut self, num: u16, method: Method) -> Result<()> {
|
||||||
|
if let Method::ChannelClose {
|
||||||
|
reply_code: code,
|
||||||
|
reply_text: reason,
|
||||||
|
..
|
||||||
|
} = method
|
||||||
|
{
|
||||||
|
info!(%code, %reason, "Closing channel");
|
||||||
|
|
||||||
|
if let Some(channel) = self.channels.remove(&num) {
|
||||||
|
drop(channel);
|
||||||
|
self.send_method(num, Method::ChannelCloseOk).await?;
|
||||||
|
} else {
|
||||||
|
return Err(ConException::Todo.into_trans());
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
unreachable!()
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
fn reset_timeout(&mut self) {
|
fn reset_timeout(&mut self) {
|
||||||
if self.heartbeat_delay != 0 {
|
if self.heartbeat_delay != 0 {
|
||||||
let next = Duration::from_secs(u64::from(self.heartbeat_delay));
|
let next = Duration::from_secs(u64::from(self.heartbeat_delay));
|
||||||
|
|
|
||||||
|
|
@ -66,9 +66,6 @@ pub mod parse {
|
||||||
}
|
}
|
||||||
fn domain_reply_code(input: &[u8]) -> IResult<'_, ReplyCode> {
|
fn domain_reply_code(input: &[u8]) -> IResult<'_, ReplyCode> {
|
||||||
let (input, result) = short(input)?;
|
let (input, result) = short(input)?;
|
||||||
if result == 0 {
|
|
||||||
fail!("number was 0 for field result")
|
|
||||||
}
|
|
||||||
Ok((input, result))
|
Ok((input, result))
|
||||||
}
|
}
|
||||||
fn domain_reply_text(input: &[u8]) -> IResult<'_, ReplyText> {
|
fn domain_reply_text(input: &[u8]) -> IResult<'_, ReplyText> {
|
||||||
|
|
|
||||||
|
|
@ -93,7 +93,12 @@ pub type IResult<'a, T> = nom::IResult<&'a [u8], T, TransError>;
|
||||||
.ok();
|
.ok();
|
||||||
|
|
||||||
for assert in &domain.asserts {
|
for assert in &domain.asserts {
|
||||||
self.assert_check(assert, &type_name, "result");
|
// channel.close requires a reply code, but there exists no reply code for
|
||||||
|
// a regular shutdown, and pythons `pika` just sends 0, even though the spec
|
||||||
|
// technically says that reply-code must be nonnull. Ignore that here.
|
||||||
|
if domain.name != "reply-code" {
|
||||||
|
self.assert_check(assert, &type_name, "result");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
writeln!(self.output, " Ok((input, result))").ok();
|
writeln!(self.output, " Ok((input, result))").ok();
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue