From c43126af1ff1ecdc62b80530569435ab1b57e428 Mon Sep 17 00:00:00 2001 From: Nilstrieb <48135649+Nilstrieb@users.noreply.github.com> Date: Sat, 12 Feb 2022 18:54:58 +0100 Subject: [PATCH] more parser generation --- Cargo.lock | 68 ++ README.md | 3 +- amqp_codegen/Cargo.toml | 1 + amqp_codegen/src/main.rs | 79 +-- amqp_codegen/src/parser.rs | 133 ++++ amqp_transport/Cargo.toml | 5 + amqp_transport/src/classes/generated.rs | 720 ++++++++++++++++----- amqp_transport/src/classes/mod.rs | 28 +- amqp_transport/src/classes/parse_helper.rs | 56 ++ amqp_transport/src/frame.rs | 63 +- 10 files changed, 904 insertions(+), 252 deletions(-) create mode 100644 amqp_codegen/src/parser.rs create mode 100644 amqp_transport/src/classes/parse_helper.rs diff --git a/Cargo.lock b/Cargo.lock index dffc482..f512767 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,15 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "aho-corasick" +version = "0.7.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e37cfd5e7657ada45f742d6e99ca5788580b5c529dc78faf11ece6dc702656f" +dependencies = [ + "memchr", +] + [[package]] name = "amqp" version = "0.1.0" @@ -19,6 +28,7 @@ version = "0.1.0" dependencies = [ "anyhow", "heck", + "itertools", "strong-xml", ] @@ -27,7 +37,10 @@ name = "amqp_transport" version = "0.1.0" dependencies = [ "anyhow", + "nom", + "once_cell", "rand", + "regex", "thiserror", "tokio", "tracing", @@ -68,6 +81,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "either" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" + [[package]] name = "getrandom" version = "0.2.4" @@ -103,6 +122,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "itertools" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9a9d19fa1e79b6215ff29b9d6880b706147f16e9b1dbb1e4e5947b5b02bc5e3" +dependencies = [ + "either", +] + [[package]] name = "jetscii" version = "0.5.1" @@ -145,6 +173,12 @@ version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a" +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "mio" version = "0.7.14" @@ -167,6 +201,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "nom" +version = "7.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b1d11e1ef389c76fe5b81bcaf2ea32cf88b62bc494e19f493d0b30e7a930109" +dependencies = [ + "memchr", + "minimal-lexical", + "version_check", +] + [[package]] name = "ntapi" version = "0.3.6" @@ -296,6 +341,23 @@ dependencies = [ "bitflags", ] +[[package]] +name = "regex" +version = "1.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.6.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b" + [[package]] name = "scopeguard" version = "1.1.0" @@ -496,6 +558,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" +[[package]] +name = "version_check" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" + [[package]] name = "wasi" version = "0.10.2+wasi-snapshot-preview1" diff --git a/README.md b/README.md index 661bfcc..837c71c 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,5 @@ # amqp https://www.rabbitmq.com/resources/specs/amqp0-9-1.pdf -https://www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf \ No newline at end of file + +https://www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf diff --git a/amqp_codegen/Cargo.toml b/amqp_codegen/Cargo.toml index 6c0d404..3f293f0 100644 --- a/amqp_codegen/Cargo.toml +++ b/amqp_codegen/Cargo.toml @@ -8,4 +8,5 @@ edition = "2021" [dependencies] anyhow = "1.0.53" heck = "0.4.0" +itertools = "0.10.3" strong-xml = "0.6.3" diff --git a/amqp_codegen/src/main.rs b/amqp_codegen/src/main.rs index 6158fba..9eb94c8 100644 --- a/amqp_codegen/src/main.rs +++ b/amqp_codegen/src/main.rs @@ -1,4 +1,7 @@ -use anyhow::{Context, Result}; +mod parser; + +use crate::parser::codegen_parser; +use anyhow::Result; use heck::ToUpperCamelCase; use std::fs; use strong_xml::XmlRead; @@ -81,12 +84,13 @@ fn main() -> Result<()> { } fn codegen(amqp: &Amqp) -> Result<()> { - println!("use std::collections::HashMap;\n"); - domain_defs(amqp)?; - class_defs(amqp) + println!("// This file has been generated by `amqp_codegen`. Do not edit it manually.\n"); + codegen_domain_defs(amqp)?; + codegen_class_defs(amqp)?; + codegen_parser(amqp) } -fn domain_defs(amqp: &Amqp) -> Result<()> { +fn codegen_domain_defs(amqp: &Amqp) -> Result<()> { for domain in &amqp.domains { let invariants = invariants(domain.asserts.iter()); @@ -94,7 +98,7 @@ fn domain_defs(amqp: &Amqp) -> Result<()> { println!("/// {invariants}"); } println!( - "type {} = {};\n", + "pub type {} = {};\n", domain.name.to_upper_camel_case(), amqp_type_to_rust_type(&domain.kind), ); @@ -103,7 +107,7 @@ fn domain_defs(amqp: &Amqp) -> Result<()> { Ok(()) } -fn class_defs(amqp: &Amqp) -> Result<()> { +fn codegen_class_defs(amqp: &Amqp) -> Result<()> { println!("pub enum Class {{"); for class in &amqp.classes { let class_name = class.name.to_upper_camel_case(); @@ -111,12 +115,6 @@ fn class_defs(amqp: &Amqp) -> Result<()> { } println!("}}\n"); - println!( - "pub enum TableValue {{ - - " - ); - for class in &amqp.classes { let enum_name = class.name.to_upper_camel_case(); println!("/// Index {}, handler = {}", class.index, class.handler); @@ -125,13 +123,12 @@ fn class_defs(amqp: &Amqp) -> Result<()> { let method_name = method.name.to_upper_camel_case(); println!(" /// Index {}", method.index); print!(" {method_name}"); - if method.fields.len() > 0 { + if !method.fields.is_empty() { println!(" {{"); for field in &method.fields { let field_name = snake_case(&field.name); let (field_type, field_docs) = resolve_type( - amqp, - &field.domain.as_ref().or(field.kind.as_ref()).unwrap(), + field.domain.as_ref().or(field.kind.as_ref()).unwrap(), field.asserts.as_ref(), )?; if !field_docs.is_empty() { @@ -150,7 +147,7 @@ fn class_defs(amqp: &Amqp) -> Result<()> { Ok(()) } -fn amqp_type_to_rust_type<'a>(amqp_type: &str) -> &'static str { +fn amqp_type_to_rust_type(amqp_type: &str) -> &'static str { match amqp_type { "octet" => "u8", "short" => "u16", @@ -159,37 +156,18 @@ fn amqp_type_to_rust_type<'a>(amqp_type: &str) -> &'static str { "bit" => "u8", "shortstr" | "longstr" => "String", "timestamp" => "u64", - "table" => "HashMap)>", + "table" => "super::Table", _ => unreachable!("invalid type {}", amqp_type), } } /// returns (type name, invariant docs) -fn resolve_type(amqp: &Amqp, domain: &str, asserts: &[Assert]) -> Result<(String, String)> { - let kind = amqp - .domains - .iter() - .find(|d| &d.name == domain) - .context("domain not found")?; - - let is_nonnull = is_nonnull(asserts.iter().chain(kind.asserts.iter())); - +fn resolve_type(domain: &str, asserts: &[Assert]) -> Result<(String, String)> { let additional_docs = invariants(asserts.iter()); let type_name = domain.to_upper_camel_case(); - Ok(( - if is_nonnull { - type_name - } else { - format!("Option<{type_name}>") - }, - additional_docs, - )) -} - -fn is_nonnull<'a>(mut asserts: impl Iterator) -> bool { - asserts.find(|assert| assert.check == "notnull").is_some() + Ok((type_name, additional_docs)) } fn snake_case(ident: &str) -> String { @@ -204,18 +182,17 @@ fn snake_case(ident: &str) -> String { fn invariants<'a>(asserts: impl Iterator) -> String { asserts - .filter_map(|assert| match &*assert.check { - "notnull" => None, - "length" => Some(format!( - "must be shorter than {}", - assert.value.as_ref().unwrap() - )), - "regexp" => Some(format!("must match `{}`", assert.value.as_ref().unwrap())), - "le" => Some(format!( - "must be less than the {} field of the method {}", - assert.method.as_ref().unwrap(), - assert.field.as_ref().unwrap() - )), + .map(|assert| match &*assert.check { + "notnull" => "must not be null".to_string(), + "length" => format!("must be shorter than {}", assert.value.as_ref().unwrap()), + "regexp" => format!("must match `{}`", assert.value.as_ref().unwrap()), + "le" => { + format!( + "must be less than the {} field of the method {}", + assert.method.as_ref().unwrap(), + assert.field.as_ref().unwrap() + ) + } _ => unimplemented!(), }) .collect::>() diff --git a/amqp_codegen/src/parser.rs b/amqp_codegen/src/parser.rs new file mode 100644 index 0000000..7e21e37 --- /dev/null +++ b/amqp_codegen/src/parser.rs @@ -0,0 +1,133 @@ +use crate::{Amqp, Class, Domain, Method}; +use anyhow::Result; +use heck::{ToSnakeCase, ToUpperCamelCase}; +use itertools::Itertools; + +fn method_function_name(class_name: &str) -> impl Fn(&Method) -> String + '_ { + move |method| { + let method_name = method.name.to_snake_case(); + format!("{class_name}_{method_name}") + } +} + +fn domain_function_name(domain_name: &str) -> String { + let domain_name = domain_name.to_snake_case(); + format!("domain_{domain_name}") +} + +pub(crate) fn codegen_parser(amqp: &Amqp) -> Result<()> { + println!( + "pub mod parse {{ +use super::*; +use crate::classes::parse_helper::*; +use crate::error::TransError; +use nom::{{branch::alt, bytes::complete::tag}}; +use regex::Regex; +use once_cell::sync::Lazy; + +pub type IResult<'a, T> = nom::IResult<&'a [u8], T, TransError>; +" + ); + println!( + "pub fn parse_method(input: &[u8]) -> Result<(&[u8], Class), nom::Err> {{ + alt(({}))(input) +}}", + amqp.classes + .iter() + .map(|class| class.name.to_snake_case()) + .join(", ") + ); + + for domain in &amqp.domains { + domain_parser(domain)?; + } + + for class in &amqp.classes { + let class_name = class.name.to_snake_case(); + + function(&class_name, "Class", || { + let class_index = class.index; + let all_methods = class + .methods + .iter() + .map(method_function_name(&class_name)) + .join(", "); + println!( + " let (input, _) = tag([{class_index}])(input)?; + alt(({all_methods}))(input)" + ); + + Ok(()) + })?; + + for method in &class.methods { + method_parser(class, method)?; + } + } + + println!("\n}}"); + Ok(()) +} + +fn domain_parser(domain: &Domain) -> Result<()> { + let fn_name = domain_function_name(&domain.name); + let type_name = domain.kind.to_snake_case(); + function(&fn_name, &domain.name.to_upper_camel_case(), || { + if domain.asserts.is_empty() { + if type_name == "bit" { + println!(" todo!() // bit") + } else { + println!(" {type_name}(input)"); + } + } else { + println!(" let (input, result) = {type_name}(input)?;"); + + for assert in &domain.asserts { + match &*assert.check { + "notnull" => { /* todo */ } + "regexp" => { + let value = assert.value.as_ref().unwrap(); + println!( + r#" static REGEX: Lazy = Lazy::new(|| Regex::new(r"{value}").unwrap());"# + ); + println!(" if !REGEX.is_match(&result) {{ fail!() }}"); + } + "le" => {} // can't validate this here + "length" => { + let length = assert.value.as_ref().unwrap(); + println!(" if result.len() > {length} {{ fail!() }}"); + } + _ => unimplemented!(), + } + } + println!(" Ok((input, result))"); + } + Ok(()) + }) +} + +fn method_parser(class: &Class, method: &Method) -> Result<()> { + let class_name = class.name.to_snake_case(); + + let function_name = method_function_name(&class_name)(method); + function(&function_name, "Class", || { + let method_index = method.index; + println!(" let (input, _) = tag([{method_index}])(input)?;"); + println!(" todo!()"); + for _field in &method.fields {} + Ok(()) + })?; + + Ok(()) +} + +fn function(name: &str, ret_ty: &str, body: F) -> Result<()> +where + F: FnOnce() -> Result<()>, +{ + println!("fn {name}(input: &[u8]) -> IResult<{ret_ty}> {{"); + body()?; + println!("}}"); + + Ok(()) +} diff --git a/amqp_transport/Cargo.toml b/amqp_transport/Cargo.toml index c596e00..c9fd802 100644 --- a/amqp_transport/Cargo.toml +++ b/amqp_transport/Cargo.toml @@ -7,9 +7,14 @@ edition = "2021" [dependencies] anyhow = "1.0.53" +nom = "7.1.0" +once_cell = "1.9.0" rand = "0.8.4" +regex = "1.5.4" thiserror = "1.0.30" tokio = { version = "1.16.1", features = ["full"] } tracing = "0.1.30" tracing-subscriber = "0.3.8" uuid = "0.8.2" + +[features] diff --git a/amqp_transport/src/classes/generated.rs b/amqp_transport/src/classes/generated.rs index 5346807..c08c4f6 100644 --- a/amqp_transport/src/classes/generated.rs +++ b/amqp_transport/src/classes/generated.rs @@ -1,55 +1,59 @@ -use std::collections::HashMap; +#![allow(unused_variables)] -type ClassId = u16; +// This file has been generated by `amqp_codegen`. Do not edit it manually. -type ConsumerTag = String; +pub type ClassId = u16; -type DeliveryTag = u64; +pub type ConsumerTag = String; + +pub type DeliveryTag = u64; /// must be shorter than 127, must match `^[a-zA-Z0-9-_.:]*$` -type ExchangeName = String; +pub type ExchangeName = String; -type MethodId = u16; +pub type MethodId = u16; -type NoAck = u8; +pub type NoAck = u8; -type NoLocal = u8; +pub type NoLocal = u8; -type NoWait = u8; +pub type NoWait = u8; -/// must be shorter than 127 -type Path = String; +/// must not be null, must be shorter than 127 +pub type Path = String; -type PeerProperties = HashMap)>; +pub type PeerProperties = super::Table; /// must be shorter than 127, must match `^[a-zA-Z0-9-_.:]*$` -type QueueName = String; +pub type QueueName = String; -type Redelivered = u8; +pub type Redelivered = u8; -type MessageCount = u32; +pub type MessageCount = u32; -type ReplyCode = u16; +/// must not be null +pub type ReplyCode = u16; -type ReplyText = String; +/// must not be null +pub type ReplyText = String; -type Bit = u8; +pub type Bit = u8; -type Octet = u8; +pub type Octet = u8; -type Short = u16; +pub type Short = u16; -type Long = u32; +pub type Long = u32; -type Longlong = u64; +pub type Longlong = u64; -type Shortstr = String; +pub type Shortstr = String; -type Longstr = String; +pub type Longstr = String; -type Timestamp = u64; +pub type Timestamp = u64; -type Table = HashMap)>; +pub type Table = super::Table; pub enum Class { Connection(Connection), @@ -64,90 +68,82 @@ pub enum Class { pub enum Connection { /// Index 10 Start { - version_major: Option, - version_minor: Option, - server_properties: Option, + version_major: Octet, + version_minor: Octet, + server_properties: PeerProperties, + /// must not be null mechanisms: Longstr, + /// must not be null locales: Longstr, }, /// Index 11 StartOk { - client_properties: Option, + client_properties: PeerProperties, + /// must not be null mechanism: Shortstr, + /// must not be null response: Longstr, + /// must not be null locale: Shortstr, }, /// Index 20 - Secure { - challenge: Option, - }, + Secure { challenge: Longstr }, /// Index 21 SecureOk { + /// must not be null response: Longstr, }, /// Index 30 Tune { - channel_max: Option, - frame_max: Option, - heartbeat: Option, + channel_max: Short, + frame_max: Long, + heartbeat: Short, }, /// Index 31 TuneOk { - /// must be less than the tune field of the method channel-max + /// must not be null, must be less than the tune field of the method channel-max channel_max: Short, - frame_max: Option, - heartbeat: Option, + frame_max: Long, + heartbeat: Short, }, /// Index 40 Open { virtual_host: Path, - reserved_1: Option, - reserved_2: Option, + reserved_1: Shortstr, + reserved_2: Bit, }, /// Index 41 - OpenOk { - reserved_1: Option, - }, + OpenOk { reserved_1: Shortstr }, /// Index 50 Close { reply_code: ReplyCode, reply_text: ReplyText, - class_id: Option, - method_id: Option, + class_id: ClassId, + method_id: MethodId, }, /// Index 51 CloseOk, /// Index 60 - Blocked { - reason: Option, - }, + Blocked { reason: Shortstr }, /// Index 61 Unblocked, } /// Index 20, handler = channel pub enum Channel { /// Index 10 - Open { - reserved_1: Option, - }, + Open { reserved_1: Shortstr }, /// Index 11 - OpenOk { - reserved_1: Option, - }, + OpenOk { reserved_1: Longstr }, /// Index 20 - Flow { - active: Option, - }, + Flow { active: Bit }, /// Index 21 - FlowOk { - active: Option, - }, + FlowOk { active: Bit }, /// Index 40 Close { reply_code: ReplyCode, reply_text: ReplyText, - class_id: Option, - method_id: Option, + class_id: ClassId, + method_id: MethodId, }, /// Index 41 CloseOk, @@ -156,24 +152,26 @@ pub enum Channel { pub enum Exchange { /// Index 10 Declare { - reserved_1: Option, + reserved_1: Short, + /// must not be null exchange: ExchangeName, - r#type: Option, - passive: Option, - durable: Option, - reserved_2: Option, - reserved_3: Option, - no_wait: Option, - arguments: Option, + r#type: Shortstr, + passive: Bit, + durable: Bit, + reserved_2: Bit, + reserved_3: Bit, + no_wait: NoWait, + arguments: Table, }, /// Index 11 DeclareOk, /// Index 20 Delete { - reserved_1: Option, + reserved_1: Short, + /// must not be null exchange: ExchangeName, - if_unused: Option, - no_wait: Option, + if_unused: Bit, + no_wait: NoWait, }, /// Index 21 DeleteOk, @@ -182,158 +180,145 @@ pub enum Exchange { pub enum Queue { /// Index 10 Declare { - reserved_1: Option, - queue: Option, - passive: Option, - durable: Option, - exclusive: Option, - auto_delete: Option, - no_wait: Option, - arguments: Option
, + reserved_1: Short, + queue: QueueName, + passive: Bit, + durable: Bit, + exclusive: Bit, + auto_delete: Bit, + no_wait: NoWait, + arguments: Table, }, /// Index 11 DeclareOk { + /// must not be null queue: QueueName, - message_count: Option, - consumer_count: Option, + message_count: MessageCount, + consumer_count: Long, }, /// Index 20 Bind { - reserved_1: Option, - queue: Option, - exchange: Option, - routing_key: Option, - no_wait: Option, - arguments: Option
, + reserved_1: Short, + queue: QueueName, + exchange: ExchangeName, + routing_key: Shortstr, + no_wait: NoWait, + arguments: Table, }, /// Index 21 BindOk, /// Index 50 Unbind { - reserved_1: Option, - queue: Option, - exchange: Option, - routing_key: Option, - arguments: Option
, + reserved_1: Short, + queue: QueueName, + exchange: ExchangeName, + routing_key: Shortstr, + arguments: Table, }, /// Index 51 UnbindOk, /// Index 30 Purge { - reserved_1: Option, - queue: Option, - no_wait: Option, + reserved_1: Short, + queue: QueueName, + no_wait: NoWait, }, /// Index 31 - PurgeOk { - message_count: Option, - }, + PurgeOk { message_count: MessageCount }, /// Index 40 Delete { - reserved_1: Option, - queue: Option, - if_unused: Option, - if_empty: Option, - no_wait: Option, + reserved_1: Short, + queue: QueueName, + if_unused: Bit, + if_empty: Bit, + no_wait: NoWait, }, /// Index 41 - DeleteOk { - message_count: Option, - }, + DeleteOk { message_count: MessageCount }, } /// Index 60, handler = channel pub enum Basic { /// Index 10 Qos { - prefetch_size: Option, - prefetch_count: Option, - global: Option, + prefetch_size: Long, + prefetch_count: Short, + global: Bit, }, /// Index 11 QosOk, /// Index 20 Consume { - reserved_1: Option, - queue: Option, - consumer_tag: Option, - no_local: Option, - no_ack: Option, - exclusive: Option, - no_wait: Option, - arguments: Option
, + reserved_1: Short, + queue: QueueName, + consumer_tag: ConsumerTag, + no_local: NoLocal, + no_ack: NoAck, + exclusive: Bit, + no_wait: NoWait, + arguments: Table, }, /// Index 21 - ConsumeOk { - consumer_tag: Option, - }, + ConsumeOk { consumer_tag: ConsumerTag }, /// Index 30 Cancel { - consumer_tag: Option, - no_wait: Option, + consumer_tag: ConsumerTag, + no_wait: NoWait, }, /// Index 31 - CancelOk { - consumer_tag: Option, - }, + CancelOk { consumer_tag: ConsumerTag }, /// Index 40 Publish { - reserved_1: Option, - exchange: Option, - routing_key: Option, - mandatory: Option, - immediate: Option, + reserved_1: Short, + exchange: ExchangeName, + routing_key: Shortstr, + mandatory: Bit, + immediate: Bit, }, /// Index 50 Return { reply_code: ReplyCode, reply_text: ReplyText, - exchange: Option, - routing_key: Option, + exchange: ExchangeName, + routing_key: Shortstr, }, /// Index 60 Deliver { - consumer_tag: Option, - delivery_tag: Option, - redelivered: Option, - exchange: Option, - routing_key: Option, + consumer_tag: ConsumerTag, + delivery_tag: DeliveryTag, + redelivered: Redelivered, + exchange: ExchangeName, + routing_key: Shortstr, }, /// Index 70 Get { - reserved_1: Option, - queue: Option, - no_ack: Option, + reserved_1: Short, + queue: QueueName, + no_ack: NoAck, }, /// Index 71 GetOk { - delivery_tag: Option, - redelivered: Option, - exchange: Option, - routing_key: Option, - message_count: Option, + delivery_tag: DeliveryTag, + redelivered: Redelivered, + exchange: ExchangeName, + routing_key: Shortstr, + message_count: MessageCount, }, /// Index 72 - GetEmpty { - reserved_1: Option, - }, + GetEmpty { reserved_1: Shortstr }, /// Index 80 Ack { - delivery_tag: Option, - multiple: Option, + delivery_tag: DeliveryTag, + multiple: Bit, }, /// Index 90 Reject { - delivery_tag: Option, - requeue: Option, + delivery_tag: DeliveryTag, + requeue: Bit, }, /// Index 100 - RecoverAsync { - requeue: Option, - }, + RecoverAsync { requeue: Bit }, /// Index 110 - Recover { - requeue: Option, - }, + Recover { requeue: Bit }, /// Index 111 RecoverOk, } @@ -352,3 +337,416 @@ pub enum Tx { /// Index 31 RollbackOk, } +pub mod parse { + use super::*; + use crate::classes::parse_helper::*; + use crate::error::TransError; + use nom::{branch::alt, bytes::complete::tag}; + use once_cell::sync::Lazy; + use regex::Regex; + + pub type IResult<'a, T> = nom::IResult<&'a [u8], T, TransError>; + + pub fn parse_method(input: &[u8]) -> Result<(&[u8], Class), nom::Err> { + alt((connection, channel, exchange, queue, basic, tx))(input) + } + fn domain_class_id(input: &[u8]) -> IResult { + short(input) + } + fn domain_consumer_tag(input: &[u8]) -> IResult { + shortstr(input) + } + fn domain_delivery_tag(input: &[u8]) -> IResult { + longlong(input) + } + fn domain_exchange_name(input: &[u8]) -> IResult { + let (input, result) = shortstr(input)?; + if result.len() > 127 { + fail!() + } + static REGEX: Lazy = Lazy::new(|| Regex::new(r"^[a-zA-Z0-9-_.:]*$").unwrap()); + if !REGEX.is_match(&result) { + fail!() + } + Ok((input, result)) + } + fn domain_method_id(input: &[u8]) -> IResult { + short(input) + } + fn domain_no_ack(input: &[u8]) -> IResult { + todo!() // bit + } + fn domain_no_local(input: &[u8]) -> IResult { + todo!() // bit + } + fn domain_no_wait(input: &[u8]) -> IResult { + todo!() // bit + } + fn domain_path(input: &[u8]) -> IResult { + let (input, result) = shortstr(input)?; + if result.len() > 127 { + fail!() + } + Ok((input, result)) + } + fn domain_peer_properties(input: &[u8]) -> IResult { + table(input) + } + fn domain_queue_name(input: &[u8]) -> IResult { + let (input, result) = shortstr(input)?; + if result.len() > 127 { + fail!() + } + static REGEX: Lazy = Lazy::new(|| Regex::new(r"^[a-zA-Z0-9-_.:]*$").unwrap()); + if !REGEX.is_match(&result) { + fail!() + } + Ok((input, result)) + } + fn domain_redelivered(input: &[u8]) -> IResult { + todo!() // bit + } + fn domain_message_count(input: &[u8]) -> IResult { + long(input) + } + fn domain_reply_code(input: &[u8]) -> IResult { + let (input, result) = short(input)?; + Ok((input, result)) + } + fn domain_reply_text(input: &[u8]) -> IResult { + let (input, result) = shortstr(input)?; + Ok((input, result)) + } + fn domain_bit(input: &[u8]) -> IResult { + todo!() // bit + } + fn domain_octet(input: &[u8]) -> IResult { + octet(input) + } + fn domain_short(input: &[u8]) -> IResult { + short(input) + } + fn domain_long(input: &[u8]) -> IResult { + long(input) + } + fn domain_longlong(input: &[u8]) -> IResult { + longlong(input) + } + fn domain_shortstr(input: &[u8]) -> IResult { + shortstr(input) + } + fn domain_longstr(input: &[u8]) -> IResult { + longstr(input) + } + fn domain_timestamp(input: &[u8]) -> IResult { + timestamp(input) + } + fn domain_table(input: &[u8]) -> IResult
{ + table(input) + } + fn connection(input: &[u8]) -> IResult { + let (input, _) = tag([10])(input)?; + alt(( + connection_start, + connection_start_ok, + connection_secure, + connection_secure_ok, + connection_tune, + connection_tune_ok, + connection_open, + connection_open_ok, + connection_close, + connection_close_ok, + connection_blocked, + connection_unblocked, + ))(input) + } + fn connection_start(input: &[u8]) -> IResult { + let (input, _) = tag([10])(input)?; + todo!() + } + fn connection_start_ok(input: &[u8]) -> IResult { + let (input, _) = tag([11])(input)?; + todo!() + } + fn connection_secure(input: &[u8]) -> IResult { + let (input, _) = tag([20])(input)?; + todo!() + } + fn connection_secure_ok(input: &[u8]) -> IResult { + let (input, _) = tag([21])(input)?; + todo!() + } + fn connection_tune(input: &[u8]) -> IResult { + let (input, _) = tag([30])(input)?; + todo!() + } + fn connection_tune_ok(input: &[u8]) -> IResult { + let (input, _) = tag([31])(input)?; + todo!() + } + fn connection_open(input: &[u8]) -> IResult { + let (input, _) = tag([40])(input)?; + todo!() + } + fn connection_open_ok(input: &[u8]) -> IResult { + let (input, _) = tag([41])(input)?; + todo!() + } + fn connection_close(input: &[u8]) -> IResult { + let (input, _) = tag([50])(input)?; + todo!() + } + fn connection_close_ok(input: &[u8]) -> IResult { + let (input, _) = tag([51])(input)?; + todo!() + } + fn connection_blocked(input: &[u8]) -> IResult { + let (input, _) = tag([60])(input)?; + todo!() + } + fn connection_unblocked(input: &[u8]) -> IResult { + let (input, _) = tag([61])(input)?; + todo!() + } + fn channel(input: &[u8]) -> IResult { + let (input, _) = tag([20])(input)?; + alt(( + channel_open, + channel_open_ok, + channel_flow, + channel_flow_ok, + channel_close, + channel_close_ok, + ))(input) + } + fn channel_open(input: &[u8]) -> IResult { + let (input, _) = tag([10])(input)?; + todo!() + } + fn channel_open_ok(input: &[u8]) -> IResult { + let (input, _) = tag([11])(input)?; + todo!() + } + fn channel_flow(input: &[u8]) -> IResult { + let (input, _) = tag([20])(input)?; + todo!() + } + fn channel_flow_ok(input: &[u8]) -> IResult { + let (input, _) = tag([21])(input)?; + todo!() + } + fn channel_close(input: &[u8]) -> IResult { + let (input, _) = tag([40])(input)?; + todo!() + } + fn channel_close_ok(input: &[u8]) -> IResult { + let (input, _) = tag([41])(input)?; + todo!() + } + fn exchange(input: &[u8]) -> IResult { + let (input, _) = tag([40])(input)?; + alt(( + exchange_declare, + exchange_declare_ok, + exchange_delete, + exchange_delete_ok, + ))(input) + } + fn exchange_declare(input: &[u8]) -> IResult { + let (input, _) = tag([10])(input)?; + todo!() + } + fn exchange_declare_ok(input: &[u8]) -> IResult { + let (input, _) = tag([11])(input)?; + todo!() + } + fn exchange_delete(input: &[u8]) -> IResult { + let (input, _) = tag([20])(input)?; + todo!() + } + fn exchange_delete_ok(input: &[u8]) -> IResult { + let (input, _) = tag([21])(input)?; + todo!() + } + fn queue(input: &[u8]) -> IResult { + let (input, _) = tag([50])(input)?; + alt(( + queue_declare, + queue_declare_ok, + queue_bind, + queue_bind_ok, + queue_unbind, + queue_unbind_ok, + queue_purge, + queue_purge_ok, + queue_delete, + queue_delete_ok, + ))(input) + } + fn queue_declare(input: &[u8]) -> IResult { + let (input, _) = tag([10])(input)?; + todo!() + } + fn queue_declare_ok(input: &[u8]) -> IResult { + let (input, _) = tag([11])(input)?; + todo!() + } + fn queue_bind(input: &[u8]) -> IResult { + let (input, _) = tag([20])(input)?; + todo!() + } + fn queue_bind_ok(input: &[u8]) -> IResult { + let (input, _) = tag([21])(input)?; + todo!() + } + fn queue_unbind(input: &[u8]) -> IResult { + let (input, _) = tag([50])(input)?; + todo!() + } + fn queue_unbind_ok(input: &[u8]) -> IResult { + let (input, _) = tag([51])(input)?; + todo!() + } + fn queue_purge(input: &[u8]) -> IResult { + let (input, _) = tag([30])(input)?; + todo!() + } + fn queue_purge_ok(input: &[u8]) -> IResult { + let (input, _) = tag([31])(input)?; + todo!() + } + fn queue_delete(input: &[u8]) -> IResult { + let (input, _) = tag([40])(input)?; + todo!() + } + fn queue_delete_ok(input: &[u8]) -> IResult { + let (input, _) = tag([41])(input)?; + todo!() + } + fn basic(input: &[u8]) -> IResult { + let (input, _) = tag([60])(input)?; + alt(( + basic_qos, + basic_qos_ok, + basic_consume, + basic_consume_ok, + basic_cancel, + basic_cancel_ok, + basic_publish, + basic_return, + basic_deliver, + basic_get, + basic_get_ok, + basic_get_empty, + basic_ack, + basic_reject, + basic_recover_async, + basic_recover, + basic_recover_ok, + ))(input) + } + fn basic_qos(input: &[u8]) -> IResult { + let (input, _) = tag([10])(input)?; + todo!() + } + fn basic_qos_ok(input: &[u8]) -> IResult { + let (input, _) = tag([11])(input)?; + todo!() + } + fn basic_consume(input: &[u8]) -> IResult { + let (input, _) = tag([20])(input)?; + todo!() + } + fn basic_consume_ok(input: &[u8]) -> IResult { + let (input, _) = tag([21])(input)?; + todo!() + } + fn basic_cancel(input: &[u8]) -> IResult { + let (input, _) = tag([30])(input)?; + todo!() + } + fn basic_cancel_ok(input: &[u8]) -> IResult { + let (input, _) = tag([31])(input)?; + todo!() + } + fn basic_publish(input: &[u8]) -> IResult { + let (input, _) = tag([40])(input)?; + todo!() + } + fn basic_return(input: &[u8]) -> IResult { + let (input, _) = tag([50])(input)?; + todo!() + } + fn basic_deliver(input: &[u8]) -> IResult { + let (input, _) = tag([60])(input)?; + todo!() + } + fn basic_get(input: &[u8]) -> IResult { + let (input, _) = tag([70])(input)?; + todo!() + } + fn basic_get_ok(input: &[u8]) -> IResult { + let (input, _) = tag([71])(input)?; + todo!() + } + fn basic_get_empty(input: &[u8]) -> IResult { + let (input, _) = tag([72])(input)?; + todo!() + } + fn basic_ack(input: &[u8]) -> IResult { + let (input, _) = tag([80])(input)?; + todo!() + } + fn basic_reject(input: &[u8]) -> IResult { + let (input, _) = tag([90])(input)?; + todo!() + } + fn basic_recover_async(input: &[u8]) -> IResult { + let (input, _) = tag([100])(input)?; + todo!() + } + fn basic_recover(input: &[u8]) -> IResult { + let (input, _) = tag([110])(input)?; + todo!() + } + fn basic_recover_ok(input: &[u8]) -> IResult { + let (input, _) = tag([111])(input)?; + todo!() + } + fn tx(input: &[u8]) -> IResult { + let (input, _) = tag([90])(input)?; + alt(( + tx_select, + tx_select_ok, + tx_commit, + tx_commit_ok, + tx_rollback, + tx_rollback_ok, + ))(input) + } + fn tx_select(input: &[u8]) -> IResult { + let (input, _) = tag([10])(input)?; + todo!() + } + fn tx_select_ok(input: &[u8]) -> IResult { + let (input, _) = tag([11])(input)?; + todo!() + } + fn tx_commit(input: &[u8]) -> IResult { + let (input, _) = tag([20])(input)?; + todo!() + } + fn tx_commit_ok(input: &[u8]) -> IResult { + let (input, _) = tag([21])(input)?; + todo!() + } + fn tx_rollback(input: &[u8]) -> IResult { + let (input, _) = tag([30])(input)?; + todo!() + } + fn tx_rollback_ok(input: &[u8]) -> IResult { + let (input, _) = tag([31])(input)?; + todo!() + } +} diff --git a/amqp_transport/src/classes/mod.rs b/amqp_transport/src/classes/mod.rs index 915cf7d..45b5129 100644 --- a/amqp_transport/src/classes/mod.rs +++ b/amqp_transport/src/classes/mod.rs @@ -1,3 +1,27 @@ -mod generated; +use std::collections::HashMap; -static TABLE_VALUES: &[(char, &str)] = &[[]]; +mod generated; +mod parse_helper; + +pub type Table = HashMap; + +pub enum FieldValue { + Boolean(bool), + ShortShortInt(i8), + ShortShortUInt(u8), + ShortInt(i16), + ShortUInt(u16), + LongInt(i32), + LongUInt(u32), + LongLongInt(i64), + LongLongUInt(u64), + Float(f32), + Double(f64), + DecimalValue(u8, u32), + ShortString(String), + LongString(String), + FieldArray(Vec), + Timestamp(u64), + FieldTable(Table), + Void, +} diff --git a/amqp_transport/src/classes/parse_helper.rs b/amqp_transport/src/classes/parse_helper.rs new file mode 100644 index 0000000..fef4a88 --- /dev/null +++ b/amqp_transport/src/classes/parse_helper.rs @@ -0,0 +1,56 @@ +use crate::classes::generated::parse::IResult; +use crate::classes::generated::{ + Bit, Long, Longlong, Longstr, Octet, Short, Shortstr, Table, Timestamp, +}; +use crate::error::{ConException, ProtocolError, TransError}; +use nom::error::ErrorKind; + +impl nom::error::ParseError<&[u8]> for TransError { + fn from_error_kind(_input: &[u8], _kind: ErrorKind) -> Self { + ProtocolError::ConException(ConException::SyntaxError).into() + } + + fn append(_input: &[u8], _kind: ErrorKind, other: Self) -> Self { + other + } +} + +#[macro_export] +macro_rules! fail { + () => { + return Err(nom::Err::Failure( + crate::error::ProtocolError::ConException(crate::error::ConException::SyntaxError) + .into(), + )) + }; +} + +pub use fail; + +pub fn octet(input: &[u8]) -> IResult { + todo!() +} +pub fn short(input: &[u8]) -> IResult { + todo!() +} +pub fn long(input: &[u8]) -> IResult { + todo!() +} +pub fn longlong(input: &[u8]) -> IResult { + todo!() +} +pub fn bit(input: &[u8], amount: u8) -> IResult> { + todo!() +} +pub fn shortstr(input: &[u8]) -> IResult { + todo!() +} +pub fn longstr(input: &[u8]) -> IResult { + todo!() +} +pub fn timestamp(input: &[u8]) -> IResult { + todo!() +} +pub fn table(input: &[u8]) -> IResult
{ + todo!() +} diff --git a/amqp_transport/src/frame.rs b/amqp_transport/src/frame.rs index 6ae2042..d1b2b88 100644 --- a/amqp_transport/src/frame.rs +++ b/amqp_transport/src/frame.rs @@ -21,9 +21,18 @@ pub struct Frame { } #[derive(Debug, Clone, PartialEq, Eq)] +#[repr(u8)] pub enum FrameType { + Method = 1, + Header = 2, + Body = 3, + Heartbeat = 8, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum FrameTypeEnum { /// 1 - Method { class_id: u16, method_id: u16 }, + Method, /// 2 Header { class_id: u16, @@ -33,7 +42,7 @@ pub enum FrameType { }, /// 3 Body, - /// 4 + /// 8 Heartbeat, } @@ -58,7 +67,7 @@ where return Err(ProtocolError::ConException(ConException::FrameError).into()); } - let kind = parse_frame_type(kind, &payload, channel)?; + let kind = parse_frame_type(kind, channel)?; Ok(Frame { kind, @@ -67,34 +76,10 @@ where }) } -fn parse_frame_type(kind: u8, payload: &[u8], channel: u16) -> Result { +fn parse_frame_type(kind: u8, channel: u16) -> Result { match kind { - frame_type::METHOD => { - let class_id = u16::from_be_bytes(payload[0..2].try_into().unwrap()); - let method_id = u16::from_be_bytes(payload[2..4].try_into().unwrap()); - - Ok(FrameType::Method { - class_id, - method_id, - }) - } - frame_type::HEADER => { - let class_id = u16::from_be_bytes(payload[0..2].try_into().unwrap()); - let weight = u16::from_be_bytes(payload[2..4].try_into().unwrap()); - // weight is unused and must always be 0 - if weight != 0 { - return Err(ProtocolError::ConException(ConException::FrameError).into()); - } - - let body_size = u64::from_be_bytes(payload[4..12].try_into().unwrap()); - let property_flags = u16::from_be_bytes(payload[12..14].try_into().unwrap()); - - Ok(FrameType::Header { - class_id, - body_size, - property_flags, - }) - } + frame_type::METHOD => Ok(FrameType::Method), + frame_type::HEADER => Ok(FrameType::Header), frame_type::BODY => Ok(FrameType::Body), frame_type::HEARTBEAT => { if channel != 0 { @@ -103,7 +88,7 @@ fn parse_frame_type(kind: u8, payload: &[u8], channel: u16) -> Result todo!(), + _ => Err(ProtocolError::ConException(ConException::FrameError).into()), } } @@ -114,17 +99,22 @@ mod tests { #[tokio::test] async fn read_small_body() { let mut bytes: &[u8] = &[ - /*type*/ 1, - /*channel*/ 0, + /*type*/ + 1, + /*channel*/ + 0, + 0, + /*size*/ 0, - /*size*/ 0, 0, 0, 3, - /*payload*/ 1, + /*payload*/ + 1, 2, 3, - /*frame-end*/ super::REQUIRED_FRAME_END, + /*frame-end*/ + super::REQUIRED_FRAME_END, ]; let frame = super::read_frame(&mut bytes, 10000).await.unwrap(); @@ -133,7 +123,6 @@ mod tests { Frame { kind: FrameType::Method, channel: 0, - size: 3, payload: vec![1, 2, 3], } );