content frames

This commit is contained in:
nora 2022-02-10 02:50:53 +01:00
parent 970fdbb9b5
commit 4cf7d7558b
7 changed files with 101 additions and 10 deletions

3
Cargo.lock generated
View file

@ -28,7 +28,9 @@ dependencies = [
name = "amqp_core" name = "amqp_core"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"bytes",
"parking_lot", "parking_lot",
"smallvec",
"uuid", "uuid",
] ]
@ -64,6 +66,7 @@ dependencies = [
"once_cell", "once_cell",
"rand", "rand",
"regex", "regex",
"smallvec",
"thiserror", "thiserror",
"tokio", "tokio",
"tracing", "tracing",

View file

@ -6,5 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
bytes = "1.1.0"
parking_lot = "0.12.0" parking_lot = "0.12.0"
smallvec = { version = "1.8.0", features = ["union"] }
uuid = "0.8.2" uuid = "0.8.2"

View file

@ -1,5 +1,6 @@
#![warn(rust_2018_idioms)] #![warn(rust_2018_idioms)]
mod message;
pub mod methods; pub mod methods;
use parking_lot::Mutex; use parking_lot::Mutex;

23
amqp_core/src/message.rs Normal file
View file

@ -0,0 +1,23 @@
#![allow(dead_code)]
use crate::methods;
use bytes::Bytes;
use smallvec::SmallVec;
use std::sync::Arc;
use uuid::Uuid;
pub type Message = Arc<RawMessage>;
pub struct RawMessage {
id: Uuid,
properties: methods::Table,
routing: RoutingInformation,
content: SmallVec<[Bytes; 1]>,
}
pub struct RoutingInformation {
pub exchange: String,
pub routing_key: String,
pub mandatory: bool,
pub immediate: bool,
}

View file

@ -14,6 +14,7 @@ nom = "7.1.0"
once_cell = "1.9.0" once_cell = "1.9.0"
rand = "0.8.4" rand = "0.8.4"
regex = "1.5.4" regex = "1.5.4"
smallvec = { version = "1.8.0", features = ["union"] }
thiserror = "1.0.30" thiserror = "1.0.30"
tokio = { version = "1.16.1", features = ["full"] } tokio = { version = "1.16.1", features = ["full"] }
tracing = "0.1.30" tracing = "0.1.30"

View file

@ -1,19 +1,25 @@
use crate::error::{ConException, ProtocolError, Result}; use std::cmp::Ordering;
use crate::frame::{Frame, FrameType};
use crate::{frame, methods, sasl};
use amqp_core::methods::{FieldValue, Method, Table};
use amqp_core::GlobalData;
use anyhow::Context;
use std::collections::HashMap; use std::collections::HashMap;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::pin::Pin; use std::pin::Pin;
use std::time::Duration; use std::time::Duration;
use anyhow::Context;
use bytes::Bytes;
use smallvec::SmallVec;
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio::time; use tokio::time;
use tracing::{debug, error, info, warn}; use tracing::{debug, error, info, warn};
use uuid::Uuid; use uuid::Uuid;
use amqp_core::methods::{FieldValue, Method, Table};
use amqp_core::GlobalData;
use crate::error::{ConException, ProtocolError, Result};
use crate::frame::{ContentHeader, Frame, FrameType};
use crate::{frame, methods, sasl};
fn ensure_conn(condition: bool) -> Result<()> { fn ensure_conn(condition: bool) -> Result<()> {
if condition { if condition {
Ok(()) Ok(())
@ -47,6 +53,12 @@ pub struct Connection {
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
enum WaitForBodyStatus {
Method(Method),
Header(Method, ContentHeader, SmallVec<[Bytes; 1]>),
None,
}
impl Connection { impl Connection {
pub fn new( pub fn new(
id: Uuid, id: Uuid,
@ -196,6 +208,9 @@ impl Connection {
} }
async fn main_loop(&mut self) -> Result<()> { async fn main_loop(&mut self) -> Result<()> {
// todo: find out how header/body frames can interleave between channels
let mut wait_for_body = WaitForBodyStatus::None;
loop { loop {
debug!("Waiting for next frame"); debug!("Waiting for next frame");
let frame = frame::read_frame(&mut self.stream, self.max_frame_size).await?; let frame = frame::read_frame(&mut self.stream, self.max_frame_size).await?;
@ -203,14 +218,42 @@ impl Connection {
self.reset_timeout(); self.reset_timeout();
match frame.kind { match frame.kind {
FrameType::Method => self.dispatch_method(frame).await?, FrameType::Method => wait_for_body = self.dispatch_method(frame).await?,
FrameType::Heartbeat => {} FrameType::Heartbeat => {}
_ => warn!(frame_type = ?frame.kind, "TODO"), FrameType::Header => match wait_for_body {
WaitForBodyStatus::None => warn!(channel = %frame.channel, "unexpected header"),
WaitForBodyStatus::Method(method) => {
wait_for_body =
WaitForBodyStatus::Header(method, ContentHeader::new(), SmallVec::new())
}
WaitForBodyStatus::Header(_, _, _) => {
warn!(channel = %frame.channel, "already got header")
}
},
FrameType::Body => match &mut wait_for_body {
WaitForBodyStatus::None => warn!(channel = %frame.channel, "unexpected body"),
WaitForBodyStatus::Method(_) => {
warn!(channel = %frame.channel, "unexpected body")
}
WaitForBodyStatus::Header(_, header, vec) => {
vec.push(frame.payload);
match vec
.iter()
.map(Bytes::len)
.sum::<usize>()
.cmp(&usize::try_from(header.body_size).unwrap())
{
Ordering::Equal => todo!("process body"),
Ordering::Greater => todo!("too much data!"),
Ordering::Less => {} // wait for next body
}
}
},
} }
} }
} }
async fn dispatch_method(&mut self, frame: Frame) -> Result<()> { async fn dispatch_method(&mut self, frame: Frame) -> Result<WaitForBodyStatus> {
let method = methods::parse_method(&frame.payload)?; let method = methods::parse_method(&frame.payload)?;
debug!(?method, "Received method"); debug!(?method, "Received method");
@ -219,6 +262,7 @@ impl Connection {
// todo: handle closing // todo: handle closing
} }
Method::ChannelOpen { .. } => self.channel_open(frame.channel).await?, Method::ChannelOpen { .. } => self.channel_open(frame.channel).await?,
Method::BasicPublish { .. } => return Ok(WaitForBodyStatus::Method(method)),
_ => { _ => {
let channel_handle = self let channel_handle = self
.channels .channels
@ -235,7 +279,7 @@ impl Connection {
} }
} }
Ok(()) Ok(WaitForBodyStatus::None)
} }
async fn channel_open(&mut self, num: u16) -> Result<()> { async fn channel_open(&mut self, num: u16) -> Result<()> {

View file

@ -1,6 +1,8 @@
use crate::error::{ConException, ProtocolError, Result}; use crate::error::{ConException, ProtocolError, Result};
use amqp_core::methods::FieldValue;
use anyhow::Context; use anyhow::Context;
use bytes::Bytes; use bytes::Bytes;
use smallvec::SmallVec;
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tracing::trace; use tracing::trace;
@ -31,6 +33,21 @@ pub enum FrameType {
Heartbeat = 8, Heartbeat = 8,
} }
#[derive(Debug, Clone, PartialEq)]
pub struct ContentHeader {
pub class_id: u16,
pub weight: u16,
pub body_size: u64,
pub property_flags: SmallVec<[u16; 1]>,
pub property_fields: Vec<FieldValue>,
}
impl ContentHeader {
pub fn new() -> Self {
todo!()
}
}
pub async fn write_frame<W>(frame: &Frame, mut w: W) -> Result<()> pub async fn write_frame<W>(frame: &Frame, mut w: W) -> Result<()>
where where
W: AsyncWriteExt + Unpin, W: AsyncWriteExt + Unpin,