more cleanup

This commit is contained in:
nora 2022-02-26 23:47:47 +01:00
parent 439696cf3f
commit 14ad4e1011
11 changed files with 1199 additions and 1048 deletions

View file

@ -3,7 +3,11 @@ use crate::frame::{ContentHeader, Frame, FrameType};
use crate::{frame, methods, sasl};
use amqp_core::connection::{ChannelHandle, ChannelNum, ConnectionHandle, ConnectionId};
use amqp_core::message::{MessageId, RawMessage, RoutingInformation};
use amqp_core::methods::{FieldValue, Method, Table};
use amqp_core::methods::{
BasicPublish, ChannelClose, ChannelCloseOk, ChannelOpenOk, ConnectionClose, ConnectionCloseOk,
ConnectionOpen, ConnectionOpenOk, ConnectionStart, ConnectionStartOk, ConnectionTune,
ConnectionTuneOk, FieldValue, Method, Table,
};
use amqp_core::GlobalData;
use anyhow::Context;
use bytes::Bytes;
@ -134,7 +138,7 @@ impl Connection {
}
async fn start(&mut self) -> Result<()> {
let start_method = Method::ConnectionStart {
let start_method = Method::ConnectionStart(ConnectionStart {
version_major: 0,
version_minor: 9,
server_properties: server_properties(
@ -144,7 +148,7 @@ impl Connection {
),
mechanisms: "PLAIN".into(),
locales: "en_US".into(),
};
});
debug!(?start_method, "Sending Start method");
self.send_method(ChannelNum::zero(), start_method).await?;
@ -152,12 +156,12 @@ impl Connection {
let start_ok = self.recv_method().await?;
debug!(?start_ok, "Received Start-Ok");
if let Method::ConnectionStartOk {
if let Method::ConnectionStartOk(ConnectionStartOk {
mechanism,
locale,
response,
..
} = start_ok
}) = start_ok
{
ensure_conn(mechanism == "PLAIN")?;
ensure_conn(locale == "en_US")?;
@ -171,11 +175,11 @@ impl Connection {
}
async fn tune(&mut self) -> Result<()> {
let tune_method = Method::ConnectionTune {
let tune_method = Method::ConnectionTune(ConnectionTune {
channel_max: CHANNEL_MAX,
frame_max: FRAME_SIZE_MAX,
heartbeat: HEARTBEAT_DELAY,
};
});
debug!("Sending Tune method");
self.send_method(ChannelNum::zero(), tune_method).await?;
@ -183,11 +187,11 @@ impl Connection {
let tune_ok = self.recv_method().await?;
debug!(?tune_ok, "Received Tune-Ok method");
if let Method::ConnectionTuneOk {
if let Method::ConnectionTuneOk(ConnectionTuneOk {
channel_max,
frame_max,
heartbeat,
} = tune_ok
}) = tune_ok
{
self.channel_max = channel_max;
self.max_frame_size = usize::try_from(frame_max).unwrap();
@ -202,15 +206,15 @@ impl Connection {
let open = self.recv_method().await?;
debug!(?open, "Received Open method");
if let Method::ConnectionOpen { virtual_host, .. } = open {
if let Method::ConnectionOpen(ConnectionOpen { virtual_host, .. }) = open {
ensure_conn(virtual_host == "/")?;
}
self.send_method(
ChannelNum::zero(),
Method::ConnectionOpenOk {
Method::ConnectionOpenOk(ConnectionOpenOk {
reserved_1: "".to_string(),
},
}),
)
.await?;
@ -242,15 +246,18 @@ impl Connection {
.map(|channel| channel.status.take());
match method {
Method::ConnectionClose {
Method::ConnectionClose(ConnectionClose {
reply_code,
reply_text,
class_id,
method_id,
} => {
}) => {
info!(%reply_code, %reply_text, %class_id, %method_id, "Closing connection");
self.send_method(ChannelNum::zero(), Method::ConnectionCloseOk {})
.await?;
self.send_method(
ChannelNum::zero(),
Method::ConnectionCloseOk(ConnectionCloseOk),
)
.await?;
return Err(ProtocolError::GracefulClose.into());
}
Method::ChannelOpen { .. } => self.channel_open(frame.channel).await?,
@ -344,13 +351,13 @@ impl Connection {
// The only method with content that is sent to the server is Basic.Publish.
ensure_conn(header.class_id == BASIC_CLASS_ID)?;
if let Method::BasicPublish {
if let Method::BasicPublish(BasicPublish {
exchange,
routing_key,
mandatory,
immediate,
..
} = method
}) = method
{
let message = RawMessage {
id: MessageId::random(),
@ -415,9 +422,9 @@ impl Connection {
self.send_method(
channel_num,
Method::ChannelOpenOk {
Method::ChannelOpenOk(ChannelOpenOk {
reserved_1: Vec::new(),
},
}),
)
.await?;
@ -425,17 +432,18 @@ impl Connection {
}
async fn channel_close(&mut self, channel_id: ChannelNum, method: Method) -> Result<()> {
if let Method::ChannelClose {
if let Method::ChannelClose(ChannelClose {
reply_code: code,
reply_text: reason,
..
} = method
}) = method
{
info!(%code, %reason, "Closing channel");
if let Some(channel) = self.channels.remove(&channel_id) {
drop(channel);
self.send_method(channel_id, Method::ChannelCloseOk).await?;
self.send_method(channel_id, Method::ChannelCloseOk(ChannelCloseOk))
.await?;
} else {
return Err(ConException::Todo.into());
}

File diff suppressed because it is too large Load diff

View file

@ -1,13 +1,13 @@
use crate::frame::FrameType;
use crate::{frame, methods};
use amqp_core::connection::ChannelNum;
use amqp_core::methods::{FieldValue, Method};
use amqp_core::methods::{ConnectionStart, ConnectionStartOk, FieldValue, Method};
use std::collections::HashMap;
#[tokio::test]
async fn write_start_ok_frame() {
let mut payload = Vec::new();
let method = Method::ConnectionStart {
let method = Method::ConnectionStart(ConnectionStart {
version_major: 0,
version_minor: 9,
server_properties: HashMap::from([(
@ -16,7 +16,7 @@ async fn write_start_ok_frame() {
)]),
mechanisms: "PLAIN".into(),
locales: "en_US".into(),
};
});
methods::write::write_method(method, &mut payload).unwrap();
@ -141,7 +141,7 @@ fn read_start_ok_payload() {
assert_eq!(
method,
Method::ConnectionStartOk {
Method::ConnectionStartOk(ConnectionStartOk {
client_properties: HashMap::from([
(
"product".to_string(),
@ -179,6 +179,6 @@ fn read_start_ok_payload() {
mechanism: "PLAIN".to_string(),
response: "\x00admin\x00".into(),
locale: "en_US".to_string()
}
})
);
}