From 99ce586dec7be3ffd1ca01532141bf8dde3c00c3 Mon Sep 17 00:00:00 2001 From: Nilstrieb <48135649+Nilstrieb@users.noreply.github.com> Date: Wed, 23 Feb 2022 20:08:03 +0100 Subject: [PATCH] send message! --- .github/workflows/ci.yml | 4 +- amqp_core/src/lib.rs | 2 +- amqp_core/src/message.rs | 10 +-- amqp_messaging/src/methods.rs | 10 ++- amqp_transport/src/connection.rs | 116 +++++++++++++++++++++--------- amqp_transport/src/frame.rs | 69 ++++++++++++------ amqp_transport/src/methods/mod.rs | 2 +- xtask/src/test_js.rs | 22 +++++- 8 files changed, 169 insertions(+), 66 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c975e90..f4c72be 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -29,5 +29,5 @@ jobs: run: cargo fmt --verbose --all -- --check - name: Run tests run: cargo test --verbose --all - # - name: Run client integration tests - # run: cargo xtask test-js + - name: Run client integration tests + run: cargo xtask test-js diff --git a/amqp_core/src/lib.rs b/amqp_core/src/lib.rs index f92416d..dd3ab6c 100644 --- a/amqp_core/src/lib.rs +++ b/amqp_core/src/lib.rs @@ -1,6 +1,6 @@ #![warn(rust_2018_idioms)] -mod message; +pub mod message; pub mod methods; use parking_lot::Mutex; diff --git a/amqp_core/src/message.rs b/amqp_core/src/message.rs index eba4c8d..2b190f2 100644 --- a/amqp_core/src/message.rs +++ b/amqp_core/src/message.rs @@ -8,13 +8,15 @@ use uuid::Uuid; pub type Message = Arc; +#[derive(Debug)] pub struct RawMessage { - id: Uuid, - properties: methods::Table, - routing: RoutingInformation, - content: SmallVec<[Bytes; 1]>, + pub id: Uuid, + pub properties: methods::Table, + pub routing: RoutingInformation, + pub content: SmallVec<[Bytes; 1]>, } +#[derive(Debug)] pub struct RoutingInformation { pub exchange: String, pub routing_key: String, diff --git a/amqp_messaging/src/methods.rs b/amqp_messaging/src/methods.rs index 700c97d..8d1248f 100644 --- a/amqp_messaging/src/methods.rs +++ b/amqp_messaging/src/methods.rs @@ -1,8 +1,16 @@ +use amqp_core::message::Message; use amqp_core::methods::Method; use amqp_core::ChannelHandle; use std::time::Duration; use tokio::time; -use tracing::debug; +use tracing::{debug, info}; + +pub async fn handle_basic_publish(_channel_handle: ChannelHandle, message: Message) { + info!( + ?message, + "Someone has summoned the almighty Basic.Publish handler" + ); +} pub async fn handle_method(_channel_handle: ChannelHandle, _method: Method) { debug!("handling method or something in that cool new future"); diff --git a/amqp_transport/src/connection.rs b/amqp_transport/src/connection.rs index 57abbf3..3678eb3 100644 --- a/amqp_transport/src/connection.rs +++ b/amqp_transport/src/connection.rs @@ -2,6 +2,7 @@ use std::cmp::Ordering; use std::collections::HashMap; use std::net::SocketAddr; use std::pin::Pin; +use std::sync::Arc; use std::time::Duration; use anyhow::Context; @@ -13,6 +14,7 @@ use tokio::time; use tracing::{debug, error, info, warn}; use uuid::Uuid; +use amqp_core::message::{RawMessage, RoutingInformation}; use amqp_core::methods::{FieldValue, Method, Table}; use amqp_core::GlobalData; @@ -33,6 +35,8 @@ const CHANNEL_MAX: u16 = 0; const FRAME_SIZE_MAX: u32 = 0; const HEARTBEAT_DELAY: u16 = 0; +const BASIC_CLASS_ID: u16 = 60; + pub struct Channel { /// A handle to the global channel representation. Used to remove the channel when it's dropped handle: amqp_core::ChannelHandle, @@ -254,15 +258,12 @@ impl Connection { } Method::ChannelOpen { .. } => self.channel_open(frame.channel).await?, Method::ChannelClose { .. } => self.channel_close(frame.channel, method).await?, - Method::BasicPublish { .. } => { - const BASIC_CLASS_ID: u16 = 60; - match self.channels.get_mut(&frame.channel) { - Some(channel) => { - channel.status = ChannelStatus::NeedHeader(BASIC_CLASS_ID, Box::new(method)) - } - None => return Err(ConException::Todo.into_trans()), + Method::BasicPublish { .. } => match self.channels.get_mut(&frame.channel) { + Some(channel) => { + channel.status = ChannelStatus::NeedHeader(BASIC_CLASS_ID, Box::new(method)) } - } + None => return Err(ConException::Todo.into_trans()), + }, _ => { let channel_handle = self .channels @@ -305,33 +306,84 @@ impl Connection { } fn dispatch_body(&mut self, frame: Frame) -> Result<()> { - self.channels + let channel = self + .channels .get_mut(&frame.channel) - .ok_or_else(|| ConException::Todo.into_trans()) - .and_then(|channel| match channel.status.take() { - ChannelStatus::Default => { - warn!(channel = %frame.channel, "unexpected body"); - Err(ConException::UnexpectedFrame.into_trans()) - } - ChannelStatus::NeedHeader(_, _) => { - warn!(channel = %frame.channel, "unexpected body"); - Err(ConException::UnexpectedFrame.into_trans()) - } - ChannelStatus::NeedsBody(_, header, mut vec) => { - vec.push(frame.payload); - match vec - .iter() - .map(Bytes::len) - .sum::() - .cmp(&usize::try_from(header.body_size).unwrap()) - { - Ordering::Equal => todo!("process body"), - Ordering::Greater => todo!("too much data!"), - Ordering::Less => {} // wait for next body + .ok_or_else(|| ConException::Todo.into_trans())?; + + match channel.status.take() { + ChannelStatus::Default => { + warn!(channel = %frame.channel, "unexpected body"); + Err(ConException::UnexpectedFrame.into_trans()) + } + ChannelStatus::NeedHeader(_, _) => { + warn!(channel = %frame.channel, "unexpected body"); + Err(ConException::UnexpectedFrame.into_trans()) + } + ChannelStatus::NeedsBody(method, header, mut vec) => { + vec.push(frame.payload); + match vec + .iter() + .map(Bytes::len) + .sum::() + .cmp(&usize::try_from(header.body_size).unwrap()) + { + Ordering::Equal => { + self.process_method_with_body(*method, *header, vec, frame.channel) } - Ok(()) + Ordering::Greater => Err(ConException::Todo.into_trans()), + Ordering::Less => Ok(()), // wait for next body } - }) + } + } + } + + fn process_method_with_body( + &mut self, + method: Method, + header: ContentHeader, + payloads: SmallVec<[Bytes; 1]>, + channel: ChannelId, + ) -> Result<()> { + // The only method with content that is sent to the server is Basic.Publish. + ensure_conn(header.class_id == BASIC_CLASS_ID)?; + + if let Method::BasicPublish { + exchange, + routing_key, + mandatory, + immediate, + .. + } = method + { + let message = RawMessage { + id: Uuid::from_bytes(rand::random()), + properties: header.property_fields, + routing: RoutingInformation { + exchange, + routing_key, + mandatory, + immediate, + }, + content: payloads, + }; + let message = Arc::new(message); + + let channel = self + .channels + .get(&channel) + .ok_or_else(|| ConException::Todo.into_trans())?; + + // Spawn the handler for the publish. The connection task goes back to handling + // just the connection. + tokio::spawn(amqp_messaging::methods::handle_basic_publish( + channel.handle.clone(), + message, + )); + Ok(()) + } else { + Err(ConException::Todo.into_trans()) + } } async fn channel_open(&mut self, channel_id: ChannelId) -> Result<()> { diff --git a/amqp_transport/src/frame.rs b/amqp_transport/src/frame.rs index cd16ea6..df81f8b 100644 --- a/amqp_transport/src/frame.rs +++ b/amqp_transport/src/frame.rs @@ -56,42 +56,66 @@ pub enum FrameType { Heartbeat = 8, } -#[derive(Debug, Clone, PartialEq)] -pub struct BasicProperties { - content_type: Option, - content_encoding: Option, - headers: Option, - delivery_mode: Option, - priority: Option, - correlation_id: Option, - reply_to: Option, - expiration: Option, - message_id: Option, - timestamp: Option, - r#type: Option, - user_id: Option, - app_id: Option, - reserved: Option, -} - #[derive(Debug, Clone, PartialEq)] pub struct ContentHeader { pub class_id: u16, pub weight: u16, pub body_size: u64, - pub property_fields: BasicProperties, + pub property_fields: methods::Table, } mod content_header_parse { use crate::error::TransError; - use crate::frame::{BasicProperties, ContentHeader}; + use crate::frame::ContentHeader; + use crate::methods::parse_helper::{octet, shortstr, table, timestamp}; + use amqp_core::methods; + use amqp_core::methods::FieldValue::{FieldTable, ShortShortUInt, ShortString, Timestamp}; use nom::number::complete::{u16, u64}; use nom::number::Endianness::Big; type IResult<'a, T> = nom::IResult<&'a [u8], T, TransError>; - pub fn basic_properties(_property_flags: u16, _input: &[u8]) -> IResult<'_, BasicProperties> { - todo!() + pub fn basic_properties(flags: u16, input: &[u8]) -> IResult<'_, methods::Table> { + macro_rules! parse_property { + (if $flags:ident >> $n:literal, $parser:ident($input:ident)?, $map:ident.insert($name:expr, $ctor:path)) => { + if (($flags >> $n) & 1) == 1 { + let (input, value) = $parser($input)?; + $map.insert(String::from($name), $ctor(value)); + input + } else { + $input + } + }; + } + + let mut map = methods::Table::new(); + + let input = parse_property!(if flags >> 15, shortstr(input)?, map.insert("content-type", ShortString)); + let input = parse_property!(if flags >> 14, shortstr(input)?, map.insert("content-encoding", ShortString)); + let input = + parse_property!(if flags >> 13, table(input)?, map.insert("headers", FieldTable)); + let input = parse_property!(if flags >> 12, octet(input)?, map.insert("delivery-mode", ShortShortUInt)); + let input = + parse_property!(if flags >> 11, octet(input)?, map.insert("priority", ShortShortUInt)); + let input = parse_property!(if flags >> 10, shortstr(input)?, map.insert("correlation-id", ShortString)); + let input = + parse_property!(if flags >> 9, shortstr(input)?, map.insert("reply-to", ShortString)); + let input = + parse_property!(if flags >> 8, shortstr(input)?, map.insert("expiration", ShortString)); + let input = + parse_property!(if flags >> 7, shortstr(input)?, map.insert("message-id", ShortString)); + let input = + parse_property!(if flags >> 6, timestamp(input)?, map.insert("timestamp", Timestamp)); + let input = + parse_property!(if flags >> 5, shortstr(input)?, map.insert("type", ShortString)); + let input = + parse_property!(if flags >> 4, shortstr(input)?, map.insert("user-id", ShortString)); + let input = + parse_property!(if flags >> 3, shortstr(input)?, map.insert("app-id", ShortString)); + let input = + parse_property!(if flags >> 2, shortstr(input)?, map.insert("reserved", ShortString)); + + Ok((input, map)) } pub fn header(input: &[u8]) -> IResult<'_, Box> { @@ -101,6 +125,7 @@ mod content_header_parse { // I do not quite understand this here. Apparently, there can be more than 15 flags? // But the Basic class only specifies 15, so idk. Don't care about this for now + // Todo: But probably later. let (input, property_flags) = u16(Big)(input)?; let (input, property_fields) = basic_properties(property_flags, input)?; diff --git a/amqp_transport/src/methods/mod.rs b/amqp_transport/src/methods/mod.rs index 0118c7c..f413f91 100644 --- a/amqp_transport/src/methods/mod.rs +++ b/amqp_transport/src/methods/mod.rs @@ -4,7 +4,7 @@ use rand::Rng; use std::collections::HashMap; mod generated; -mod parse_helper; +pub mod parse_helper; #[cfg(test)] mod tests; mod write_helper; diff --git a/xtask/src/test_js.rs b/xtask/src/test_js.rs index f61343f..cc94422 100644 --- a/xtask/src/test_js.rs +++ b/xtask/src/test_js.rs @@ -1,9 +1,25 @@ use crate::project_root; use anyhow::{bail, Context, Result}; -use std::process::Command; +use std::path::Path; +use std::process::{Command, Stdio}; pub fn main() -> Result<()> { - let test_js_root = project_root().join("test-js"); + let project_root = project_root(); + let test_js_root = project_root.join("test-js"); + + let mut amqp_server = Command::new("cargo") + .arg("run") + .spawn() + .context("`cargo run` amqp")?; + + let test_result = run_js(&test_js_root); + + amqp_server.kill()?; + + test_result +} + +fn run_js(test_js_root: &Path) -> Result<()> { let status = Command::new("yarn") .current_dir(&test_js_root) .status() @@ -16,9 +32,9 @@ pub fn main() -> Result<()> { .current_dir(&test_js_root) .status() .context("yarn test tests")?; + if !status.success() { bail!("yarn tests failed"); } - Ok(()) }