minor fixes

This commit is contained in:
nora 2022-02-28 16:59:30 +01:00
parent 2be26f7e49
commit beb2187cd6
3 changed files with 17 additions and 6 deletions

View file

@ -46,7 +46,7 @@ pub struct Connection {
pub id: ConnectionId, pub id: ConnectionId,
pub peer_addr: SocketAddr, pub peer_addr: SocketAddr,
pub global_data: GlobalData, pub global_data: GlobalData,
pub channels: HashMap<u16, ChannelHandle>, pub channels: HashMap<ChannelNum, ChannelHandle>,
pub exclusive_queues: Vec<Queue>, pub exclusive_queues: Vec<Queue>,
} }

View file

@ -8,8 +8,8 @@ pub enum ProtocolError {
ConException(#[from] ConException), ConException(#[from] ConException),
#[error("{0}")] #[error("{0}")]
ChannelException(#[from] ChannelException), ChannelException(#[from] ChannelException),
#[error("Connection must be closed")] #[error("Protocol negotiation failed")]
CloseNow, ProtocolNegotiationFailed,
#[error("Graceful connection closing requested")] #[error("Graceful connection closing requested")]
GracefullyClosed, GracefullyClosed,
} }

View file

@ -474,7 +474,7 @@ impl Connection {
.unwrap() .unwrap()
.lock() .lock()
.channels .channels
.insert(channel_num.num(), channel_handle); .insert(channel_num, channel_handle);
} }
info!(%channel_num, "Opened new channel"); info!(%channel_num, "Opened new channel");
@ -522,6 +522,7 @@ impl Connection {
async fn negotiate_version(&mut self) -> Result<()> { async fn negotiate_version(&mut self) -> Result<()> {
const HEADER_SIZE: usize = 8; const HEADER_SIZE: usize = 8;
const SUPPORTED_PROTOCOL_VERSION: &[u8] = &[0, 9, 1]; const SUPPORTED_PROTOCOL_VERSION: &[u8] = &[0, 9, 1];
const AMQP_PROTOCOL: &[u8] = b"AMQP";
const OWN_PROTOCOL_HEADER: &[u8] = b"AMQP\0\0\x09\x01"; const OWN_PROTOCOL_HEADER: &[u8] = b"AMQP\0\0\x09\x01";
debug!("Negotiating version"); debug!("Negotiating version");
@ -535,8 +536,18 @@ impl Connection {
debug!(received_header = ?read_header_buf,"Received protocol header"); debug!(received_header = ?read_header_buf,"Received protocol header");
let protocol = &read_header_buf[0..4];
let version = &read_header_buf[5..8]; let version = &read_header_buf[5..8];
if protocol != AMQP_PROTOCOL {
self.stream
.write_all(OWN_PROTOCOL_HEADER)
.await
.context("write protocol header")?;
debug!(?protocol, "Version negotiation failed");
return Err(ProtocolError::ProtocolNegotiationFailed.into());
}
if &read_header_buf[0..5] == b"AMQP\0" && version == SUPPORTED_PROTOCOL_VERSION { if &read_header_buf[0..5] == b"AMQP\0" && version == SUPPORTED_PROTOCOL_VERSION {
debug!(?version, "Version negotiation successful"); debug!(?version, "Version negotiation successful");
Ok(()) Ok(())
@ -545,8 +556,8 @@ impl Connection {
.write_all(OWN_PROTOCOL_HEADER) .write_all(OWN_PROTOCOL_HEADER)
.await .await
.context("write protocol header")?; .context("write protocol header")?;
debug!(?version, expected_version = ?SUPPORTED_PROTOCOL_VERSION, "Version negotiation failed, unsupported version"); debug!(?version, expected_version = ?SUPPORTED_PROTOCOL_VERSION, "Version negotiation failed");
Err(ProtocolError::CloseNow.into()) Err(ProtocolError::ProtocolNegotiationFailed.into())
} }
} }
} }