send message!

This commit is contained in:
nora 2022-02-23 20:08:03 +01:00
parent b50634841d
commit 99ce586dec
8 changed files with 169 additions and 66 deletions

View file

@ -29,5 +29,5 @@ jobs:
run: cargo fmt --verbose --all -- --check run: cargo fmt --verbose --all -- --check
- name: Run tests - name: Run tests
run: cargo test --verbose --all run: cargo test --verbose --all
# - name: Run client integration tests - name: Run client integration tests
# run: cargo xtask test-js run: cargo xtask test-js

View file

@ -1,6 +1,6 @@
#![warn(rust_2018_idioms)] #![warn(rust_2018_idioms)]
mod message; pub mod message;
pub mod methods; pub mod methods;
use parking_lot::Mutex; use parking_lot::Mutex;

View file

@ -8,13 +8,15 @@ use uuid::Uuid;
pub type Message = Arc<RawMessage>; pub type Message = Arc<RawMessage>;
#[derive(Debug)]
pub struct RawMessage { pub struct RawMessage {
id: Uuid, pub id: Uuid,
properties: methods::Table, pub properties: methods::Table,
routing: RoutingInformation, pub routing: RoutingInformation,
content: SmallVec<[Bytes; 1]>, pub content: SmallVec<[Bytes; 1]>,
} }
#[derive(Debug)]
pub struct RoutingInformation { pub struct RoutingInformation {
pub exchange: String, pub exchange: String,
pub routing_key: String, pub routing_key: String,

View file

@ -1,8 +1,16 @@
use amqp_core::message::Message;
use amqp_core::methods::Method; use amqp_core::methods::Method;
use amqp_core::ChannelHandle; use amqp_core::ChannelHandle;
use std::time::Duration; use std::time::Duration;
use tokio::time; use tokio::time;
use tracing::debug; use tracing::{debug, info};
pub async fn handle_basic_publish(_channel_handle: ChannelHandle, message: Message) {
info!(
?message,
"Someone has summoned the almighty Basic.Publish handler"
);
}
pub async fn handle_method(_channel_handle: ChannelHandle, _method: Method) { pub async fn handle_method(_channel_handle: ChannelHandle, _method: Method) {
debug!("handling method or something in that cool new future"); debug!("handling method or something in that cool new future");

View file

@ -2,6 +2,7 @@ use std::cmp::Ordering;
use std::collections::HashMap; use std::collections::HashMap;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use anyhow::Context; use anyhow::Context;
@ -13,6 +14,7 @@ use tokio::time;
use tracing::{debug, error, info, warn}; use tracing::{debug, error, info, warn};
use uuid::Uuid; use uuid::Uuid;
use amqp_core::message::{RawMessage, RoutingInformation};
use amqp_core::methods::{FieldValue, Method, Table}; use amqp_core::methods::{FieldValue, Method, Table};
use amqp_core::GlobalData; use amqp_core::GlobalData;
@ -33,6 +35,8 @@ const CHANNEL_MAX: u16 = 0;
const FRAME_SIZE_MAX: u32 = 0; const FRAME_SIZE_MAX: u32 = 0;
const HEARTBEAT_DELAY: u16 = 0; const HEARTBEAT_DELAY: u16 = 0;
const BASIC_CLASS_ID: u16 = 60;
pub struct Channel { pub struct Channel {
/// A handle to the global channel representation. Used to remove the channel when it's dropped /// A handle to the global channel representation. Used to remove the channel when it's dropped
handle: amqp_core::ChannelHandle, handle: amqp_core::ChannelHandle,
@ -254,15 +258,12 @@ impl Connection {
} }
Method::ChannelOpen { .. } => self.channel_open(frame.channel).await?, Method::ChannelOpen { .. } => self.channel_open(frame.channel).await?,
Method::ChannelClose { .. } => self.channel_close(frame.channel, method).await?, Method::ChannelClose { .. } => self.channel_close(frame.channel, method).await?,
Method::BasicPublish { .. } => { Method::BasicPublish { .. } => match self.channels.get_mut(&frame.channel) {
const BASIC_CLASS_ID: u16 = 60; Some(channel) => {
match self.channels.get_mut(&frame.channel) { channel.status = ChannelStatus::NeedHeader(BASIC_CLASS_ID, Box::new(method))
Some(channel) => {
channel.status = ChannelStatus::NeedHeader(BASIC_CLASS_ID, Box::new(method))
}
None => return Err(ConException::Todo.into_trans()),
} }
} None => return Err(ConException::Todo.into_trans()),
},
_ => { _ => {
let channel_handle = self let channel_handle = self
.channels .channels
@ -305,33 +306,84 @@ impl Connection {
} }
fn dispatch_body(&mut self, frame: Frame) -> Result<()> { fn dispatch_body(&mut self, frame: Frame) -> Result<()> {
self.channels let channel = self
.channels
.get_mut(&frame.channel) .get_mut(&frame.channel)
.ok_or_else(|| ConException::Todo.into_trans()) .ok_or_else(|| ConException::Todo.into_trans())?;
.and_then(|channel| match channel.status.take() {
ChannelStatus::Default => { match channel.status.take() {
warn!(channel = %frame.channel, "unexpected body"); ChannelStatus::Default => {
Err(ConException::UnexpectedFrame.into_trans()) warn!(channel = %frame.channel, "unexpected body");
} Err(ConException::UnexpectedFrame.into_trans())
ChannelStatus::NeedHeader(_, _) => { }
warn!(channel = %frame.channel, "unexpected body"); ChannelStatus::NeedHeader(_, _) => {
Err(ConException::UnexpectedFrame.into_trans()) warn!(channel = %frame.channel, "unexpected body");
} Err(ConException::UnexpectedFrame.into_trans())
ChannelStatus::NeedsBody(_, header, mut vec) => { }
vec.push(frame.payload); ChannelStatus::NeedsBody(method, header, mut vec) => {
match vec vec.push(frame.payload);
.iter() match vec
.map(Bytes::len) .iter()
.sum::<usize>() .map(Bytes::len)
.cmp(&usize::try_from(header.body_size).unwrap()) .sum::<usize>()
{ .cmp(&usize::try_from(header.body_size).unwrap())
Ordering::Equal => todo!("process body"), {
Ordering::Greater => todo!("too much data!"), Ordering::Equal => {
Ordering::Less => {} // wait for next body self.process_method_with_body(*method, *header, vec, frame.channel)
} }
Ok(()) Ordering::Greater => Err(ConException::Todo.into_trans()),
Ordering::Less => Ok(()), // wait for next body
} }
}) }
}
}
fn process_method_with_body(
&mut self,
method: Method,
header: ContentHeader,
payloads: SmallVec<[Bytes; 1]>,
channel: ChannelId,
) -> Result<()> {
// The only method with content that is sent to the server is Basic.Publish.
ensure_conn(header.class_id == BASIC_CLASS_ID)?;
if let Method::BasicPublish {
exchange,
routing_key,
mandatory,
immediate,
..
} = method
{
let message = RawMessage {
id: Uuid::from_bytes(rand::random()),
properties: header.property_fields,
routing: RoutingInformation {
exchange,
routing_key,
mandatory,
immediate,
},
content: payloads,
};
let message = Arc::new(message);
let channel = self
.channels
.get(&channel)
.ok_or_else(|| ConException::Todo.into_trans())?;
// Spawn the handler for the publish. The connection task goes back to handling
// just the connection.
tokio::spawn(amqp_messaging::methods::handle_basic_publish(
channel.handle.clone(),
message,
));
Ok(())
} else {
Err(ConException::Todo.into_trans())
}
} }
async fn channel_open(&mut self, channel_id: ChannelId) -> Result<()> { async fn channel_open(&mut self, channel_id: ChannelId) -> Result<()> {

View file

@ -56,42 +56,66 @@ pub enum FrameType {
Heartbeat = 8, Heartbeat = 8,
} }
#[derive(Debug, Clone, PartialEq)]
pub struct BasicProperties {
content_type: Option<methods::Shortstr>,
content_encoding: Option<methods::Shortstr>,
headers: Option<methods::Table>,
delivery_mode: Option<methods::Octet>,
priority: Option<methods::Octet>,
correlation_id: Option<methods::Shortstr>,
reply_to: Option<methods::Shortstr>,
expiration: Option<methods::Shortstr>,
message_id: Option<methods::Shortstr>,
timestamp: Option<methods::Timestamp>,
r#type: Option<methods::Shortstr>,
user_id: Option<methods::Shortstr>,
app_id: Option<methods::Shortstr>,
reserved: Option<methods::Shortstr>,
}
#[derive(Debug, Clone, PartialEq)] #[derive(Debug, Clone, PartialEq)]
pub struct ContentHeader { pub struct ContentHeader {
pub class_id: u16, pub class_id: u16,
pub weight: u16, pub weight: u16,
pub body_size: u64, pub body_size: u64,
pub property_fields: BasicProperties, pub property_fields: methods::Table,
} }
mod content_header_parse { mod content_header_parse {
use crate::error::TransError; use crate::error::TransError;
use crate::frame::{BasicProperties, ContentHeader}; use crate::frame::ContentHeader;
use crate::methods::parse_helper::{octet, shortstr, table, timestamp};
use amqp_core::methods;
use amqp_core::methods::FieldValue::{FieldTable, ShortShortUInt, ShortString, Timestamp};
use nom::number::complete::{u16, u64}; use nom::number::complete::{u16, u64};
use nom::number::Endianness::Big; use nom::number::Endianness::Big;
type IResult<'a, T> = nom::IResult<&'a [u8], T, TransError>; type IResult<'a, T> = nom::IResult<&'a [u8], T, TransError>;
pub fn basic_properties(_property_flags: u16, _input: &[u8]) -> IResult<'_, BasicProperties> { pub fn basic_properties(flags: u16, input: &[u8]) -> IResult<'_, methods::Table> {
todo!() macro_rules! parse_property {
(if $flags:ident >> $n:literal, $parser:ident($input:ident)?, $map:ident.insert($name:expr, $ctor:path)) => {
if (($flags >> $n) & 1) == 1 {
let (input, value) = $parser($input)?;
$map.insert(String::from($name), $ctor(value));
input
} else {
$input
}
};
}
let mut map = methods::Table::new();
let input = parse_property!(if flags >> 15, shortstr(input)?, map.insert("content-type", ShortString));
let input = parse_property!(if flags >> 14, shortstr(input)?, map.insert("content-encoding", ShortString));
let input =
parse_property!(if flags >> 13, table(input)?, map.insert("headers", FieldTable));
let input = parse_property!(if flags >> 12, octet(input)?, map.insert("delivery-mode", ShortShortUInt));
let input =
parse_property!(if flags >> 11, octet(input)?, map.insert("priority", ShortShortUInt));
let input = parse_property!(if flags >> 10, shortstr(input)?, map.insert("correlation-id", ShortString));
let input =
parse_property!(if flags >> 9, shortstr(input)?, map.insert("reply-to", ShortString));
let input =
parse_property!(if flags >> 8, shortstr(input)?, map.insert("expiration", ShortString));
let input =
parse_property!(if flags >> 7, shortstr(input)?, map.insert("message-id", ShortString));
let input =
parse_property!(if flags >> 6, timestamp(input)?, map.insert("timestamp", Timestamp));
let input =
parse_property!(if flags >> 5, shortstr(input)?, map.insert("type", ShortString));
let input =
parse_property!(if flags >> 4, shortstr(input)?, map.insert("user-id", ShortString));
let input =
parse_property!(if flags >> 3, shortstr(input)?, map.insert("app-id", ShortString));
let input =
parse_property!(if flags >> 2, shortstr(input)?, map.insert("reserved", ShortString));
Ok((input, map))
} }
pub fn header(input: &[u8]) -> IResult<'_, Box<ContentHeader>> { pub fn header(input: &[u8]) -> IResult<'_, Box<ContentHeader>> {
@ -101,6 +125,7 @@ mod content_header_parse {
// I do not quite understand this here. Apparently, there can be more than 15 flags? // I do not quite understand this here. Apparently, there can be more than 15 flags?
// But the Basic class only specifies 15, so idk. Don't care about this for now // But the Basic class only specifies 15, so idk. Don't care about this for now
// Todo: But probably later.
let (input, property_flags) = u16(Big)(input)?; let (input, property_flags) = u16(Big)(input)?;
let (input, property_fields) = basic_properties(property_flags, input)?; let (input, property_fields) = basic_properties(property_flags, input)?;

View file

@ -4,7 +4,7 @@ use rand::Rng;
use std::collections::HashMap; use std::collections::HashMap;
mod generated; mod generated;
mod parse_helper; pub mod parse_helper;
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
mod write_helper; mod write_helper;

View file

@ -1,9 +1,25 @@
use crate::project_root; use crate::project_root;
use anyhow::{bail, Context, Result}; use anyhow::{bail, Context, Result};
use std::process::Command; use std::path::Path;
use std::process::{Command, Stdio};
pub fn main() -> Result<()> { pub fn main() -> Result<()> {
let test_js_root = project_root().join("test-js"); let project_root = project_root();
let test_js_root = project_root.join("test-js");
let mut amqp_server = Command::new("cargo")
.arg("run")
.spawn()
.context("`cargo run` amqp")?;
let test_result = run_js(&test_js_root);
amqp_server.kill()?;
test_result
}
fn run_js(test_js_root: &Path) -> Result<()> {
let status = Command::new("yarn") let status = Command::new("yarn")
.current_dir(&test_js_root) .current_dir(&test_js_root)
.status() .status()
@ -16,9 +32,9 @@ pub fn main() -> Result<()> {
.current_dir(&test_js_root) .current_dir(&test_js_root)
.status() .status()
.context("yarn test tests")?; .context("yarn test tests")?;
if !status.success() { if !status.success() {
bail!("yarn tests failed"); bail!("yarn tests failed");
} }
Ok(()) Ok(())
} }