From 375551542f8ded99bc80b65b663174e806fa4dda Mon Sep 17 00:00:00 2001 From: Nilstrieb <48135649+Nilstrieb@users.noreply.github.com> Date: Wed, 9 Feb 2022 11:32:51 +0100 Subject: [PATCH] connectoin --- amqp_transport/src/connection.rs | 58 ++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) create mode 100644 amqp_transport/src/connection.rs diff --git a/amqp_transport/src/connection.rs b/amqp_transport/src/connection.rs new file mode 100644 index 0000000..a910702 --- /dev/null +++ b/amqp_transport/src/connection.rs @@ -0,0 +1,58 @@ +use anyhow::{bail, ensure, Result}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpStream; +use tracing::{debug, error}; + +pub struct Connection { + stream: TcpStream, +} + +impl Connection { + pub fn new(stream: TcpStream) -> Self { + Self { stream } + } + + pub async fn start(mut self) { + match self.run().await { + Ok(()) => {} + Err(err) => error!(%err, "Error during processing of connection"), + } + } + + pub async fn run(&mut self) -> Result<()> { + self.negotiate_version().await?; + Ok(()) + } + + async fn negotiate_version(&mut self) -> Result<()> { + const HEADER_SIZE: usize = 8; + const PROTOCOL_VERSION: &[u8] = &[0, 9, 1]; + const PROTOCOL_HEADER: &[u8] = b"AMQP\0\0\x09\x01"; + + debug!("Negotiating version"); + + let mut read_header_buf = [0; HEADER_SIZE]; + + self.stream.read_exact(&mut read_header_buf).await?; + + debug!(received_header = ?read_header_buf,"Received protocol header"); + + ensure!( + &read_header_buf[0..5] == b"AMQP\0", + "Received wrong protocol" + ); + + let version = &read_header_buf[5..8]; + + self.stream.write_all(PROTOCOL_HEADER).await?; + + if version == PROTOCOL_VERSION { + debug!(?version, "Version negotiation successful"); + Ok(()) + } else { + debug!(?version, expected_version = ?PROTOCOL_VERSION, "Version negotiation failed, unsupported version"); + self.stream.shutdown().await?; + bail!("Unsupported protocol version {:?}", version); + } + } +}