parse frame payload

This commit is contained in:
nora 2022-02-09 14:51:44 +01:00
parent 706219c046
commit ccee6c36f6
3 changed files with 55 additions and 33 deletions

View file

@ -1,4 +1,4 @@
use crate::error::{ConError, ProtocolError};
use crate::error::{ProtocolError, TransError};
use crate::frame;
use anyhow::{ensure, Context};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
@ -21,7 +21,7 @@ impl Connection {
}
}
pub async fn run(&mut self) -> Result<(), ConError> {
pub async fn run(&mut self) -> Result<(), TransError> {
self.negotiate_version().await?;
loop {
@ -30,7 +30,7 @@ impl Connection {
}
}
async fn negotiate_version(&mut self) -> Result<(), ConError> {
async fn negotiate_version(&mut self) -> Result<(), TransError> {
const HEADER_SIZE: usize = 8;
const PROTOCOL_VERSION: &[u8] = &[0, 9, 1];
const PROTOCOL_HEADER: &[u8] = b"AMQP\0\0\x09\x01";

View file

@ -1,5 +1,5 @@
#[derive(Debug, thiserror::Error)]
pub enum ConError {
pub enum TransError {
#[error("{0}")]
Invalid(#[from] ProtocolError),
#[error("connection error: `{0}`")]
@ -24,6 +24,10 @@ pub enum ConException {
FrameError,
#[error("503 Command invalid")]
CommandInvalid,
#[error("503 Syntax error")]
SyntaxError,
#[error("504 Channel error")]
ChannelError,
}
#[derive(Debug, thiserror::Error)]

View file

@ -1,45 +1,48 @@
use crate::error::{ConError, ConException, ProtocolError};
use crate::error::{ConException, ProtocolError, TransError};
use anyhow::Context;
use tokio::io::AsyncReadExt;
const REQUIRED_FRAME_END: u8 = 0xCE;
#[derive(Debug, Clone, PartialEq, Eq)]
#[repr(u8)]
pub enum FrameType {
Method = 1,
Header = 2,
Body = 3,
Heartbeat = 4,
}
impl TryFrom<u8> for FrameType {
type Error = ConError;
fn try_from(value: u8) -> Result<Self, Self::Error> {
Ok(match value {
1 => Self::Method,
2 => Self::Header,
3 => Self::Body,
4 => Self::Heartbeat,
_ => return Err(ProtocolError::Fatal.into()),
})
}
mod frame_type {
pub const METHOD: u8 = 1;
pub const HEADER: u8 = 2;
pub const BODY: u8 = 3;
pub const HEARTBEAT: u8 = 4;
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Frame {
r#type: FrameType,
/// type
kind: FrameType,
channel: u16,
size: u32,
payload: Vec<u8>,
}
pub async fn read_frame<R>(r: &mut R, max_frame_size: usize) -> Result<Frame, ConError>
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FrameType {
/// 1
Method { class_id: u16, method_id: u16 },
/// 2
Header {
class_id: u16,
/// Unused, must always be 0
weight: u16,
body_size: u64,
/// Ordered from high to low
property_flags: u16,
},
/// 3
Body,
/// 4
Heartbeat,
}
pub async fn read_frame<R>(r: &mut R, max_frame_size: usize) -> Result<Frame, TransError>
where
R: AsyncReadExt + Unpin,
{
let r#type = r.read_u8().await.context("read type")?;
let kind = r.read_u8().await.context("read type")?;
let channel = r.read_u16().await.context("read channel")?;
let size = r.read_u32().await.context("read size")?;
@ -56,14 +59,29 @@ where
return Err(ProtocolError::ConException(ConException::FrameError).into());
}
let kind = parse_frame_type(kind, &payload)?;
Ok(Frame {
r#type: r#type.try_into()?,
kind,
channel,
size,
payload,
})
}
fn parse_frame_type(kind: u8, payload: &[u8], channel: u16) -> Result<FrameType, TransError> {
match kind {
frame_type::METHOD => todo!(),
frame_type::HEARTBEAT => {
if channel != 0 {
Err(ProtocolError::ConException(ConException::FrameError).into())
} else {
Ok(FrameType::Heartbeat)
}
}
_ => todo!(),
}
}
#[cfg(test)]
mod tests {
use crate::frame::{Frame, FrameType};
@ -88,7 +106,7 @@ mod tests {
assert_eq!(
frame,
Frame {
r#type: FrameType::Method,
kind: FrameType::Method,
channel: 0,
size: 3,
payload: vec![1, 2, 3],