diff --git a/Cargo.lock b/Cargo.lock index f512767..6efc24c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,7 +26,6 @@ dependencies = [ name = "amqp_codegen" version = "0.1.0" dependencies = [ - "anyhow", "heck", "itertools", "strong-xml", diff --git a/amqp_codegen/Cargo.toml b/amqp_codegen/Cargo.toml index 3f293f0..56fd914 100644 --- a/amqp_codegen/Cargo.toml +++ b/amqp_codegen/Cargo.toml @@ -6,7 +6,8 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -anyhow = "1.0.53" heck = "0.4.0" itertools = "0.10.3" strong-xml = "0.6.3" + +[features] diff --git a/amqp_codegen/README.md b/amqp_codegen/README.md new file mode 100644 index 0000000..ca5a61c --- /dev/null +++ b/amqp_codegen/README.md @@ -0,0 +1,3 @@ +codegen for method serialization/deserialization + +run using `cargo r > ../amqp_transport/src/classes/generated.rs` diff --git a/amqp_codegen/src/main.rs b/amqp_codegen/src/main.rs index 9eb94c8..5f0cb35 100644 --- a/amqp_codegen/src/main.rs +++ b/amqp_codegen/src/main.rs @@ -1,7 +1,6 @@ mod parser; use crate::parser::codegen_parser; -use anyhow::Result; use heck::ToUpperCamelCase; use std::fs; use strong_xml::XmlRead; @@ -76,21 +75,21 @@ struct Field { asserts: Vec, } -fn main() -> Result<()> { +fn main() { let content = fs::read_to_string("./amqp-0-9-1.xml").unwrap(); - let amqp = Amqp::from_str(&content)?; - codegen(&amqp) + let amqp = Amqp::from_str(&content).unwrap(); + codegen(&amqp); } -fn codegen(amqp: &Amqp) -> Result<()> { +fn codegen(amqp: &Amqp) { println!("// This file has been generated by `amqp_codegen`. Do not edit it manually.\n"); - codegen_domain_defs(amqp)?; - codegen_class_defs(amqp)?; - codegen_parser(amqp) + codegen_domain_defs(amqp); + codegen_class_defs(amqp); + codegen_parser(amqp); } -fn codegen_domain_defs(amqp: &Amqp) -> Result<()> { +fn codegen_domain_defs(amqp: &Amqp) { for domain in &amqp.domains { let invariants = invariants(domain.asserts.iter()); @@ -103,11 +102,10 @@ fn codegen_domain_defs(amqp: &Amqp) -> Result<()> { amqp_type_to_rust_type(&domain.kind), ); } - - Ok(()) } -fn codegen_class_defs(amqp: &Amqp) -> Result<()> { +fn codegen_class_defs(amqp: &Amqp) { + println!("#[derive(Debug, Clone, PartialEq)]"); println!("pub enum Class {{"); for class in &amqp.classes { let class_name = class.name.to_upper_camel_case(); @@ -118,6 +116,7 @@ fn codegen_class_defs(amqp: &Amqp) -> Result<()> { for class in &amqp.classes { let enum_name = class.name.to_upper_camel_case(); println!("/// Index {}, handler = {}", class.index, class.handler); + println!("#[derive(Debug, Clone, PartialEq)]"); println!("pub enum {enum_name} {{"); for method in &class.methods { let method_name = method.name.to_upper_camel_case(); @@ -127,10 +126,8 @@ fn codegen_class_defs(amqp: &Amqp) -> Result<()> { println!(" {{"); for field in &method.fields { let field_name = snake_case(&field.name); - let (field_type, field_docs) = resolve_type( - field.domain.as_ref().or(field.kind.as_ref()).unwrap(), - field.asserts.as_ref(), - )?; + let (field_type, field_docs) = + get_invariants_with_type(field_type(field), field.asserts.as_ref()); if !field_docs.is_empty() { println!(" /// {field_docs}"); } @@ -143,8 +140,6 @@ fn codegen_class_defs(amqp: &Amqp) -> Result<()> { } println!("}}"); } - - Ok(()) } fn amqp_type_to_rust_type(amqp_type: &str) -> &'static str { @@ -161,13 +156,25 @@ fn amqp_type_to_rust_type(amqp_type: &str) -> &'static str { } } +fn field_type(field: &Field) -> &String { + field.domain.as_ref().or(field.kind.as_ref()).unwrap() +} + +fn resolve_type_from_domain(amqp: &Amqp, domain: &str) -> String { + amqp.domains + .iter() + .find(|d| d.name == domain) + .map(|d| d.kind.clone()) + .unwrap() +} + /// returns (type name, invariant docs) -fn resolve_type(domain: &str, asserts: &[Assert]) -> Result<(String, String)> { +fn get_invariants_with_type(domain: &str, asserts: &[Assert]) -> (String, String) { let additional_docs = invariants(asserts.iter()); let type_name = domain.to_upper_camel_case(); - Ok((type_name, additional_docs)) + (type_name, additional_docs) } fn snake_case(ident: &str) -> String { diff --git a/amqp_codegen/src/parser.rs b/amqp_codegen/src/parser.rs index 7e21e37..79c4a63 100644 --- a/amqp_codegen/src/parser.rs +++ b/amqp_codegen/src/parser.rs @@ -1,5 +1,6 @@ -use crate::{Amqp, Class, Domain, Method}; -use anyhow::Result; +use crate::{ + field_type, resolve_type_from_domain, snake_case, Amqp, Assert, Class, Domain, Method, +}; use heck::{ToSnakeCase, ToUpperCamelCase}; use itertools::Itertools; @@ -15,7 +16,7 @@ fn domain_function_name(domain_name: &str) -> String { format!("domain_{domain_name}") } -pub(crate) fn codegen_parser(amqp: &Amqp) -> Result<()> { +pub(crate) fn codegen_parser(amqp: &Amqp) { println!( "pub mod parse {{ use super::*; @@ -39,7 +40,7 @@ pub type IResult<'a, T> = nom::IResult<&'a [u8], T, TransError>; ); for domain in &amqp.domains { - domain_parser(domain)?; + domain_parser(domain); } for class in &amqp.classes { @@ -56,78 +57,120 @@ pub type IResult<'a, T> = nom::IResult<&'a [u8], T, TransError>; " let (input, _) = tag([{class_index}])(input)?; alt(({all_methods}))(input)" ); - - Ok(()) - })?; + }); for method in &class.methods { - method_parser(class, method)?; + method_parser(amqp, class, method); } } println!("\n}}"); - Ok(()) } -fn domain_parser(domain: &Domain) -> Result<()> { +fn domain_parser(domain: &Domain) { let fn_name = domain_function_name(&domain.name); let type_name = domain.kind.to_snake_case(); - function(&fn_name, &domain.name.to_upper_camel_case(), || { - if domain.asserts.is_empty() { - if type_name == "bit" { - println!(" todo!() // bit") - } else { + // don't even bother with bit domains, do them manually at call site + if type_name != "bit" { + function(&fn_name, &domain.name.to_upper_camel_case(), || { + if domain.asserts.is_empty() { println!(" {type_name}(input)"); - } - } else { - println!(" let (input, result) = {type_name}(input)?;"); + } else { + println!(" let (input, result) = {type_name}(input)?;"); - for assert in &domain.asserts { - match &*assert.check { - "notnull" => { /* todo */ } - "regexp" => { - let value = assert.value.as_ref().unwrap(); - println!( - r#" static REGEX: Lazy = Lazy::new(|| Regex::new(r"{value}").unwrap());"# - ); - println!(" if !REGEX.is_match(&result) {{ fail!() }}"); - } - "le" => {} // can't validate this here - "length" => { - let length = assert.value.as_ref().unwrap(); - println!(" if result.len() > {length} {{ fail!() }}"); - } - _ => unimplemented!(), + for assert in &domain.asserts { + assert_check(assert, &type_name, "result"); } + println!(" Ok((input, result))"); } - println!(" Ok((input, result))"); - } - Ok(()) - }) + }); + } } -fn method_parser(class: &Class, method: &Method) -> Result<()> { +fn method_parser(amqp: &Amqp, class: &Class, method: &Method) { let class_name = class.name.to_snake_case(); let function_name = method_function_name(&class_name)(method); function(&function_name, "Class", || { let method_index = method.index; println!(" let (input, _) = tag([{method_index}])(input)?;"); - println!(" todo!()"); - for _field in &method.fields {} - Ok(()) - })?; + let mut iter = method.fields.iter().peekable(); + while let Some(field) = iter.next() { + let type_name = resolve_type_from_domain(amqp, field_type(field)); - Ok(()) + if type_name == "bit" { + let mut fields_with_bit = vec![field]; + + loop { + if iter + .peek() + .map(|f| resolve_type_from_domain(amqp, field_type(f)) == "bit") + .unwrap_or(false) + { + fields_with_bit.push(iter.next().unwrap()); + } else { + break; + } + } + + let amount = fields_with_bit.len(); + println!(" let (input, bits) = bit(input, {amount})?;"); + + for (i, field) in fields_with_bit.iter().enumerate() { + let field_name = snake_case(&field.name); + println!(" let {field_name} = bits[{i}];"); + } + } else { + let fn_name = domain_function_name(field_type(field)); + let field_name = snake_case(&field.name); + println!(" let (input, {field_name}) = {fn_name}(input)?;"); + + for assert in &field.asserts { + assert_check(assert, &type_name, &field_name); + } + } + } + let class_name = class_name.to_upper_camel_case(); + let method_name = method.name.to_upper_camel_case(); + println!(" Ok((input, Class::{class_name}({class_name}::{method_name} {{"); + for field in &method.fields { + let field_name = snake_case(&field.name); + println!(" {field_name},"); + } + println!(" }})))"); + }); } -fn function(name: &str, ret_ty: &str, body: F) -> Result<()> +fn assert_check(assert: &Assert, type_name: &str, var_name: &str) { + match &*assert.check { + "notnull" => match type_name { + "shortstr" | "longstr" => { + println!(" if {var_name}.is_empty() {{ fail!() }}") + } + "short" => println!(" if {var_name} == 0 {{ fail!() }}"), + _ => unimplemented!(), + }, + "regexp" => { + let value = assert.value.as_ref().unwrap(); + println!( + r#" static REGEX: Lazy = Lazy::new(|| Regex::new(r"{value}").unwrap());"# + ); + println!(" if !REGEX.is_match(&{var_name}) {{ fail!() }}"); + } + "le" => {} // can't validate this here + "length" => { + let length = assert.value.as_ref().unwrap(); + println!(" if {var_name}.len() > {length} {{ fail!() }}"); + } + _ => unimplemented!(), + } +} + +fn function(name: &str, ret_ty: &str, body: F) where - F: FnOnce() -> Result<()>, + F: FnOnce(), { println!("fn {name}(input: &[u8]) -> IResult<{ret_ty}> {{"); - body()?; + body(); println!("}}"); - - Ok(()) } diff --git a/amqp_transport/src/classes/generated.rs b/amqp_transport/src/classes/generated.rs index c08c4f6..b0f50a4 100644 --- a/amqp_transport/src/classes/generated.rs +++ b/amqp_transport/src/classes/generated.rs @@ -1,5 +1,3 @@ -#![allow(unused_variables)] - // This file has been generated by `amqp_codegen`. Do not edit it manually. pub type ClassId = u16; @@ -55,6 +53,7 @@ pub type Timestamp = u64; pub type Table = super::Table; +#[derive(Debug, Clone, PartialEq)] pub enum Class { Connection(Connection), Channel(Channel), @@ -65,6 +64,7 @@ pub enum Class { } /// Index 10, handler = connection +#[derive(Debug, Clone, PartialEq)] pub enum Connection { /// Index 10 Start { @@ -87,7 +87,9 @@ pub enum Connection { locale: Shortstr, }, /// Index 20 - Secure { challenge: Longstr }, + Secure { + challenge: Longstr, + }, /// Index 21 SecureOk { /// must not be null @@ -113,7 +115,9 @@ pub enum Connection { reserved_2: Bit, }, /// Index 41 - OpenOk { reserved_1: Shortstr }, + OpenOk { + reserved_1: Shortstr, + }, /// Index 50 Close { reply_code: ReplyCode, @@ -124,20 +128,31 @@ pub enum Connection { /// Index 51 CloseOk, /// Index 60 - Blocked { reason: Shortstr }, + Blocked { + reason: Shortstr, + }, /// Index 61 Unblocked, } /// Index 20, handler = channel +#[derive(Debug, Clone, PartialEq)] pub enum Channel { /// Index 10 - Open { reserved_1: Shortstr }, + Open { + reserved_1: Shortstr, + }, /// Index 11 - OpenOk { reserved_1: Longstr }, + OpenOk { + reserved_1: Longstr, + }, /// Index 20 - Flow { active: Bit }, + Flow { + active: Bit, + }, /// Index 21 - FlowOk { active: Bit }, + FlowOk { + active: Bit, + }, /// Index 40 Close { reply_code: ReplyCode, @@ -149,6 +164,7 @@ pub enum Channel { CloseOk, } /// Index 40, handler = channel +#[derive(Debug, Clone, PartialEq)] pub enum Exchange { /// Index 10 Declare { @@ -177,6 +193,7 @@ pub enum Exchange { DeleteOk, } /// Index 50, handler = channel +#[derive(Debug, Clone, PartialEq)] pub enum Queue { /// Index 10 Declare { @@ -224,7 +241,9 @@ pub enum Queue { no_wait: NoWait, }, /// Index 31 - PurgeOk { message_count: MessageCount }, + PurgeOk { + message_count: MessageCount, + }, /// Index 40 Delete { reserved_1: Short, @@ -234,9 +253,12 @@ pub enum Queue { no_wait: NoWait, }, /// Index 41 - DeleteOk { message_count: MessageCount }, + DeleteOk { + message_count: MessageCount, + }, } /// Index 60, handler = channel +#[derive(Debug, Clone, PartialEq)] pub enum Basic { /// Index 10 Qos { @@ -258,14 +280,18 @@ pub enum Basic { arguments: Table, }, /// Index 21 - ConsumeOk { consumer_tag: ConsumerTag }, + ConsumeOk { + consumer_tag: ConsumerTag, + }, /// Index 30 Cancel { consumer_tag: ConsumerTag, no_wait: NoWait, }, /// Index 31 - CancelOk { consumer_tag: ConsumerTag }, + CancelOk { + consumer_tag: ConsumerTag, + }, /// Index 40 Publish { reserved_1: Short, @@ -304,7 +330,9 @@ pub enum Basic { message_count: MessageCount, }, /// Index 72 - GetEmpty { reserved_1: Shortstr }, + GetEmpty { + reserved_1: Shortstr, + }, /// Index 80 Ack { delivery_tag: DeliveryTag, @@ -316,13 +344,18 @@ pub enum Basic { requeue: Bit, }, /// Index 100 - RecoverAsync { requeue: Bit }, + RecoverAsync { + requeue: Bit, + }, /// Index 110 - Recover { requeue: Bit }, + Recover { + requeue: Bit, + }, /// Index 111 RecoverOk, } /// Index 90, handler = channel +#[derive(Debug, Clone, PartialEq)] pub enum Tx { /// Index 10 Select, @@ -338,415 +371,664 @@ pub enum Tx { RollbackOk, } pub mod parse { - use super::*; - use crate::classes::parse_helper::*; - use crate::error::TransError; - use nom::{branch::alt, bytes::complete::tag}; - use once_cell::sync::Lazy; - use regex::Regex; +use super::*; +use crate::classes::parse_helper::*; +use crate::error::TransError; +use nom::{branch::alt, bytes::complete::tag}; +use regex::Regex; +use once_cell::sync::Lazy; - pub type IResult<'a, T> = nom::IResult<&'a [u8], T, TransError>; +pub type IResult<'a, T> = nom::IResult<&'a [u8], T, TransError>; + +pub fn parse_method(input: &[u8]) -> Result<(&[u8], Class), nom::Err> { + alt((connection, channel, exchange, queue, basic, tx))(input) +} +fn domain_class_id(input: &[u8]) -> IResult { + short(input) +} +fn domain_consumer_tag(input: &[u8]) -> IResult { + shortstr(input) +} +fn domain_delivery_tag(input: &[u8]) -> IResult { + longlong(input) +} +fn domain_exchange_name(input: &[u8]) -> IResult { + let (input, result) = shortstr(input)?; + if result.len() > 127 { fail!() } + static REGEX: Lazy = Lazy::new(|| Regex::new(r"^[a-zA-Z0-9-_.:]*$").unwrap()); + if !REGEX.is_match(&result) { fail!() } + Ok((input, result)) +} +fn domain_method_id(input: &[u8]) -> IResult { + short(input) +} +fn domain_path(input: &[u8]) -> IResult { + let (input, result) = shortstr(input)?; + if result.is_empty() { fail!() } + if result.len() > 127 { fail!() } + Ok((input, result)) +} +fn domain_peer_properties(input: &[u8]) -> IResult { + table(input) +} +fn domain_queue_name(input: &[u8]) -> IResult { + let (input, result) = shortstr(input)?; + if result.len() > 127 { fail!() } + static REGEX: Lazy = Lazy::new(|| Regex::new(r"^[a-zA-Z0-9-_.:]*$").unwrap()); + if !REGEX.is_match(&result) { fail!() } + Ok((input, result)) +} +fn domain_message_count(input: &[u8]) -> IResult { + long(input) +} +fn domain_reply_code(input: &[u8]) -> IResult { + let (input, result) = short(input)?; + if result == 0 { fail!() } + Ok((input, result)) +} +fn domain_reply_text(input: &[u8]) -> IResult { + let (input, result) = shortstr(input)?; + if result.is_empty() { fail!() } + Ok((input, result)) +} +fn domain_octet(input: &[u8]) -> IResult { + octet(input) +} +fn domain_short(input: &[u8]) -> IResult { + short(input) +} +fn domain_long(input: &[u8]) -> IResult { + long(input) +} +fn domain_longlong(input: &[u8]) -> IResult { + longlong(input) +} +fn domain_shortstr(input: &[u8]) -> IResult { + shortstr(input) +} +fn domain_longstr(input: &[u8]) -> IResult { + longstr(input) +} +fn domain_timestamp(input: &[u8]) -> IResult { + timestamp(input) +} +fn domain_table(input: &[u8]) -> IResult { + table(input) +} +fn connection(input: &[u8]) -> IResult { + let (input, _) = tag([10])(input)?; + alt((connection_start, connection_start_ok, connection_secure, connection_secure_ok, connection_tune, connection_tune_ok, connection_open, connection_open_ok, connection_close, connection_close_ok, connection_blocked, connection_unblocked))(input) +} +fn connection_start(input: &[u8]) -> IResult { + let (input, _) = tag([10])(input)?; + let (input, version_major) = domain_octet(input)?; + let (input, version_minor) = domain_octet(input)?; + let (input, server_properties) = domain_peer_properties(input)?; + let (input, mechanisms) = domain_longstr(input)?; + if mechanisms.is_empty() { fail!() } + let (input, locales) = domain_longstr(input)?; + if locales.is_empty() { fail!() } + Ok((input, Class::Connection(Connection::Start { + version_major, + version_minor, + server_properties, + mechanisms, + locales, + }))) +} +fn connection_start_ok(input: &[u8]) -> IResult { + let (input, _) = tag([11])(input)?; + let (input, client_properties) = domain_peer_properties(input)?; + let (input, mechanism) = domain_shortstr(input)?; + if mechanism.is_empty() { fail!() } + let (input, response) = domain_longstr(input)?; + if response.is_empty() { fail!() } + let (input, locale) = domain_shortstr(input)?; + if locale.is_empty() { fail!() } + Ok((input, Class::Connection(Connection::StartOk { + client_properties, + mechanism, + response, + locale, + }))) +} +fn connection_secure(input: &[u8]) -> IResult { + let (input, _) = tag([20])(input)?; + let (input, challenge) = domain_longstr(input)?; + Ok((input, Class::Connection(Connection::Secure { + challenge, + }))) +} +fn connection_secure_ok(input: &[u8]) -> IResult { + let (input, _) = tag([21])(input)?; + let (input, response) = domain_longstr(input)?; + if response.is_empty() { fail!() } + Ok((input, Class::Connection(Connection::SecureOk { + response, + }))) +} +fn connection_tune(input: &[u8]) -> IResult { + let (input, _) = tag([30])(input)?; + let (input, channel_max) = domain_short(input)?; + let (input, frame_max) = domain_long(input)?; + let (input, heartbeat) = domain_short(input)?; + Ok((input, Class::Connection(Connection::Tune { + channel_max, + frame_max, + heartbeat, + }))) +} +fn connection_tune_ok(input: &[u8]) -> IResult { + let (input, _) = tag([31])(input)?; + let (input, channel_max) = domain_short(input)?; + if channel_max == 0 { fail!() } + let (input, frame_max) = domain_long(input)?; + let (input, heartbeat) = domain_short(input)?; + Ok((input, Class::Connection(Connection::TuneOk { + channel_max, + frame_max, + heartbeat, + }))) +} +fn connection_open(input: &[u8]) -> IResult { + let (input, _) = tag([40])(input)?; + let (input, virtual_host) = domain_path(input)?; + let (input, reserved_1) = domain_shortstr(input)?; + let (input, bits) = bit(input, 1)?; + let reserved_2 = bits[0]; + Ok((input, Class::Connection(Connection::Open { + virtual_host, + reserved_1, + reserved_2, + }))) +} +fn connection_open_ok(input: &[u8]) -> IResult { + let (input, _) = tag([41])(input)?; + let (input, reserved_1) = domain_shortstr(input)?; + Ok((input, Class::Connection(Connection::OpenOk { + reserved_1, + }))) +} +fn connection_close(input: &[u8]) -> IResult { + let (input, _) = tag([50])(input)?; + let (input, reply_code) = domain_reply_code(input)?; + let (input, reply_text) = domain_reply_text(input)?; + let (input, class_id) = domain_class_id(input)?; + let (input, method_id) = domain_method_id(input)?; + Ok((input, Class::Connection(Connection::Close { + reply_code, + reply_text, + class_id, + method_id, + }))) +} +fn connection_close_ok(input: &[u8]) -> IResult { + let (input, _) = tag([51])(input)?; + Ok((input, Class::Connection(Connection::CloseOk { + }))) +} +fn connection_blocked(input: &[u8]) -> IResult { + let (input, _) = tag([60])(input)?; + let (input, reason) = domain_shortstr(input)?; + Ok((input, Class::Connection(Connection::Blocked { + reason, + }))) +} +fn connection_unblocked(input: &[u8]) -> IResult { + let (input, _) = tag([61])(input)?; + Ok((input, Class::Connection(Connection::Unblocked { + }))) +} +fn channel(input: &[u8]) -> IResult { + let (input, _) = tag([20])(input)?; + alt((channel_open, channel_open_ok, channel_flow, channel_flow_ok, channel_close, channel_close_ok))(input) +} +fn channel_open(input: &[u8]) -> IResult { + let (input, _) = tag([10])(input)?; + let (input, reserved_1) = domain_shortstr(input)?; + Ok((input, Class::Channel(Channel::Open { + reserved_1, + }))) +} +fn channel_open_ok(input: &[u8]) -> IResult { + let (input, _) = tag([11])(input)?; + let (input, reserved_1) = domain_longstr(input)?; + Ok((input, Class::Channel(Channel::OpenOk { + reserved_1, + }))) +} +fn channel_flow(input: &[u8]) -> IResult { + let (input, _) = tag([20])(input)?; + let (input, bits) = bit(input, 1)?; + let active = bits[0]; + Ok((input, Class::Channel(Channel::Flow { + active, + }))) +} +fn channel_flow_ok(input: &[u8]) -> IResult { + let (input, _) = tag([21])(input)?; + let (input, bits) = bit(input, 1)?; + let active = bits[0]; + Ok((input, Class::Channel(Channel::FlowOk { + active, + }))) +} +fn channel_close(input: &[u8]) -> IResult { + let (input, _) = tag([40])(input)?; + let (input, reply_code) = domain_reply_code(input)?; + let (input, reply_text) = domain_reply_text(input)?; + let (input, class_id) = domain_class_id(input)?; + let (input, method_id) = domain_method_id(input)?; + Ok((input, Class::Channel(Channel::Close { + reply_code, + reply_text, + class_id, + method_id, + }))) +} +fn channel_close_ok(input: &[u8]) -> IResult { + let (input, _) = tag([41])(input)?; + Ok((input, Class::Channel(Channel::CloseOk { + }))) +} +fn exchange(input: &[u8]) -> IResult { + let (input, _) = tag([40])(input)?; + alt((exchange_declare, exchange_declare_ok, exchange_delete, exchange_delete_ok))(input) +} +fn exchange_declare(input: &[u8]) -> IResult { + let (input, _) = tag([10])(input)?; + let (input, reserved_1) = domain_short(input)?; + let (input, exchange) = domain_exchange_name(input)?; + if exchange.is_empty() { fail!() } + let (input, r#type) = domain_shortstr(input)?; + let (input, bits) = bit(input, 5)?; + let passive = bits[0]; + let durable = bits[1]; + let reserved_2 = bits[2]; + let reserved_3 = bits[3]; + let no_wait = bits[4]; + let (input, arguments) = domain_table(input)?; + Ok((input, Class::Exchange(Exchange::Declare { + reserved_1, + exchange, + r#type, + passive, + durable, + reserved_2, + reserved_3, + no_wait, + arguments, + }))) +} +fn exchange_declare_ok(input: &[u8]) -> IResult { + let (input, _) = tag([11])(input)?; + Ok((input, Class::Exchange(Exchange::DeclareOk { + }))) +} +fn exchange_delete(input: &[u8]) -> IResult { + let (input, _) = tag([20])(input)?; + let (input, reserved_1) = domain_short(input)?; + let (input, exchange) = domain_exchange_name(input)?; + if exchange.is_empty() { fail!() } + let (input, bits) = bit(input, 2)?; + let if_unused = bits[0]; + let no_wait = bits[1]; + Ok((input, Class::Exchange(Exchange::Delete { + reserved_1, + exchange, + if_unused, + no_wait, + }))) +} +fn exchange_delete_ok(input: &[u8]) -> IResult { + let (input, _) = tag([21])(input)?; + Ok((input, Class::Exchange(Exchange::DeleteOk { + }))) +} +fn queue(input: &[u8]) -> IResult { + let (input, _) = tag([50])(input)?; + alt((queue_declare, queue_declare_ok, queue_bind, queue_bind_ok, queue_unbind, queue_unbind_ok, queue_purge, queue_purge_ok, queue_delete, queue_delete_ok))(input) +} +fn queue_declare(input: &[u8]) -> IResult { + let (input, _) = tag([10])(input)?; + let (input, reserved_1) = domain_short(input)?; + let (input, queue) = domain_queue_name(input)?; + let (input, bits) = bit(input, 5)?; + let passive = bits[0]; + let durable = bits[1]; + let exclusive = bits[2]; + let auto_delete = bits[3]; + let no_wait = bits[4]; + let (input, arguments) = domain_table(input)?; + Ok((input, Class::Queue(Queue::Declare { + reserved_1, + queue, + passive, + durable, + exclusive, + auto_delete, + no_wait, + arguments, + }))) +} +fn queue_declare_ok(input: &[u8]) -> IResult { + let (input, _) = tag([11])(input)?; + let (input, queue) = domain_queue_name(input)?; + if queue.is_empty() { fail!() } + let (input, message_count) = domain_message_count(input)?; + let (input, consumer_count) = domain_long(input)?; + Ok((input, Class::Queue(Queue::DeclareOk { + queue, + message_count, + consumer_count, + }))) +} +fn queue_bind(input: &[u8]) -> IResult { + let (input, _) = tag([20])(input)?; + let (input, reserved_1) = domain_short(input)?; + let (input, queue) = domain_queue_name(input)?; + let (input, exchange) = domain_exchange_name(input)?; + let (input, routing_key) = domain_shortstr(input)?; + let (input, bits) = bit(input, 1)?; + let no_wait = bits[0]; + let (input, arguments) = domain_table(input)?; + Ok((input, Class::Queue(Queue::Bind { + reserved_1, + queue, + exchange, + routing_key, + no_wait, + arguments, + }))) +} +fn queue_bind_ok(input: &[u8]) -> IResult { + let (input, _) = tag([21])(input)?; + Ok((input, Class::Queue(Queue::BindOk { + }))) +} +fn queue_unbind(input: &[u8]) -> IResult { + let (input, _) = tag([50])(input)?; + let (input, reserved_1) = domain_short(input)?; + let (input, queue) = domain_queue_name(input)?; + let (input, exchange) = domain_exchange_name(input)?; + let (input, routing_key) = domain_shortstr(input)?; + let (input, arguments) = domain_table(input)?; + Ok((input, Class::Queue(Queue::Unbind { + reserved_1, + queue, + exchange, + routing_key, + arguments, + }))) +} +fn queue_unbind_ok(input: &[u8]) -> IResult { + let (input, _) = tag([51])(input)?; + Ok((input, Class::Queue(Queue::UnbindOk { + }))) +} +fn queue_purge(input: &[u8]) -> IResult { + let (input, _) = tag([30])(input)?; + let (input, reserved_1) = domain_short(input)?; + let (input, queue) = domain_queue_name(input)?; + let (input, bits) = bit(input, 1)?; + let no_wait = bits[0]; + Ok((input, Class::Queue(Queue::Purge { + reserved_1, + queue, + no_wait, + }))) +} +fn queue_purge_ok(input: &[u8]) -> IResult { + let (input, _) = tag([31])(input)?; + let (input, message_count) = domain_message_count(input)?; + Ok((input, Class::Queue(Queue::PurgeOk { + message_count, + }))) +} +fn queue_delete(input: &[u8]) -> IResult { + let (input, _) = tag([40])(input)?; + let (input, reserved_1) = domain_short(input)?; + let (input, queue) = domain_queue_name(input)?; + let (input, bits) = bit(input, 3)?; + let if_unused = bits[0]; + let if_empty = bits[1]; + let no_wait = bits[2]; + Ok((input, Class::Queue(Queue::Delete { + reserved_1, + queue, + if_unused, + if_empty, + no_wait, + }))) +} +fn queue_delete_ok(input: &[u8]) -> IResult { + let (input, _) = tag([41])(input)?; + let (input, message_count) = domain_message_count(input)?; + Ok((input, Class::Queue(Queue::DeleteOk { + message_count, + }))) +} +fn basic(input: &[u8]) -> IResult { + let (input, _) = tag([60])(input)?; + alt((basic_qos, basic_qos_ok, basic_consume, basic_consume_ok, basic_cancel, basic_cancel_ok, basic_publish, basic_return, basic_deliver, basic_get, basic_get_ok, basic_get_empty, basic_ack, basic_reject, basic_recover_async, basic_recover, basic_recover_ok))(input) +} +fn basic_qos(input: &[u8]) -> IResult { + let (input, _) = tag([10])(input)?; + let (input, prefetch_size) = domain_long(input)?; + let (input, prefetch_count) = domain_short(input)?; + let (input, bits) = bit(input, 1)?; + let global = bits[0]; + Ok((input, Class::Basic(Basic::Qos { + prefetch_size, + prefetch_count, + global, + }))) +} +fn basic_qos_ok(input: &[u8]) -> IResult { + let (input, _) = tag([11])(input)?; + Ok((input, Class::Basic(Basic::QosOk { + }))) +} +fn basic_consume(input: &[u8]) -> IResult { + let (input, _) = tag([20])(input)?; + let (input, reserved_1) = domain_short(input)?; + let (input, queue) = domain_queue_name(input)?; + let (input, consumer_tag) = domain_consumer_tag(input)?; + let (input, bits) = bit(input, 4)?; + let no_local = bits[0]; + let no_ack = bits[1]; + let exclusive = bits[2]; + let no_wait = bits[3]; + let (input, arguments) = domain_table(input)?; + Ok((input, Class::Basic(Basic::Consume { + reserved_1, + queue, + consumer_tag, + no_local, + no_ack, + exclusive, + no_wait, + arguments, + }))) +} +fn basic_consume_ok(input: &[u8]) -> IResult { + let (input, _) = tag([21])(input)?; + let (input, consumer_tag) = domain_consumer_tag(input)?; + Ok((input, Class::Basic(Basic::ConsumeOk { + consumer_tag, + }))) +} +fn basic_cancel(input: &[u8]) -> IResult { + let (input, _) = tag([30])(input)?; + let (input, consumer_tag) = domain_consumer_tag(input)?; + let (input, bits) = bit(input, 1)?; + let no_wait = bits[0]; + Ok((input, Class::Basic(Basic::Cancel { + consumer_tag, + no_wait, + }))) +} +fn basic_cancel_ok(input: &[u8]) -> IResult { + let (input, _) = tag([31])(input)?; + let (input, consumer_tag) = domain_consumer_tag(input)?; + Ok((input, Class::Basic(Basic::CancelOk { + consumer_tag, + }))) +} +fn basic_publish(input: &[u8]) -> IResult { + let (input, _) = tag([40])(input)?; + let (input, reserved_1) = domain_short(input)?; + let (input, exchange) = domain_exchange_name(input)?; + let (input, routing_key) = domain_shortstr(input)?; + let (input, bits) = bit(input, 2)?; + let mandatory = bits[0]; + let immediate = bits[1]; + Ok((input, Class::Basic(Basic::Publish { + reserved_1, + exchange, + routing_key, + mandatory, + immediate, + }))) +} +fn basic_return(input: &[u8]) -> IResult { + let (input, _) = tag([50])(input)?; + let (input, reply_code) = domain_reply_code(input)?; + let (input, reply_text) = domain_reply_text(input)?; + let (input, exchange) = domain_exchange_name(input)?; + let (input, routing_key) = domain_shortstr(input)?; + Ok((input, Class::Basic(Basic::Return { + reply_code, + reply_text, + exchange, + routing_key, + }))) +} +fn basic_deliver(input: &[u8]) -> IResult { + let (input, _) = tag([60])(input)?; + let (input, consumer_tag) = domain_consumer_tag(input)?; + let (input, delivery_tag) = domain_delivery_tag(input)?; + let (input, bits) = bit(input, 1)?; + let redelivered = bits[0]; + let (input, exchange) = domain_exchange_name(input)?; + let (input, routing_key) = domain_shortstr(input)?; + Ok((input, Class::Basic(Basic::Deliver { + consumer_tag, + delivery_tag, + redelivered, + exchange, + routing_key, + }))) +} +fn basic_get(input: &[u8]) -> IResult { + let (input, _) = tag([70])(input)?; + let (input, reserved_1) = domain_short(input)?; + let (input, queue) = domain_queue_name(input)?; + let (input, bits) = bit(input, 1)?; + let no_ack = bits[0]; + Ok((input, Class::Basic(Basic::Get { + reserved_1, + queue, + no_ack, + }))) +} +fn basic_get_ok(input: &[u8]) -> IResult { + let (input, _) = tag([71])(input)?; + let (input, delivery_tag) = domain_delivery_tag(input)?; + let (input, bits) = bit(input, 1)?; + let redelivered = bits[0]; + let (input, exchange) = domain_exchange_name(input)?; + let (input, routing_key) = domain_shortstr(input)?; + let (input, message_count) = domain_message_count(input)?; + Ok((input, Class::Basic(Basic::GetOk { + delivery_tag, + redelivered, + exchange, + routing_key, + message_count, + }))) +} +fn basic_get_empty(input: &[u8]) -> IResult { + let (input, _) = tag([72])(input)?; + let (input, reserved_1) = domain_shortstr(input)?; + Ok((input, Class::Basic(Basic::GetEmpty { + reserved_1, + }))) +} +fn basic_ack(input: &[u8]) -> IResult { + let (input, _) = tag([80])(input)?; + let (input, delivery_tag) = domain_delivery_tag(input)?; + let (input, bits) = bit(input, 1)?; + let multiple = bits[0]; + Ok((input, Class::Basic(Basic::Ack { + delivery_tag, + multiple, + }))) +} +fn basic_reject(input: &[u8]) -> IResult { + let (input, _) = tag([90])(input)?; + let (input, delivery_tag) = domain_delivery_tag(input)?; + let (input, bits) = bit(input, 1)?; + let requeue = bits[0]; + Ok((input, Class::Basic(Basic::Reject { + delivery_tag, + requeue, + }))) +} +fn basic_recover_async(input: &[u8]) -> IResult { + let (input, _) = tag([100])(input)?; + let (input, bits) = bit(input, 1)?; + let requeue = bits[0]; + Ok((input, Class::Basic(Basic::RecoverAsync { + requeue, + }))) +} +fn basic_recover(input: &[u8]) -> IResult { + let (input, _) = tag([110])(input)?; + let (input, bits) = bit(input, 1)?; + let requeue = bits[0]; + Ok((input, Class::Basic(Basic::Recover { + requeue, + }))) +} +fn basic_recover_ok(input: &[u8]) -> IResult { + let (input, _) = tag([111])(input)?; + Ok((input, Class::Basic(Basic::RecoverOk { + }))) +} +fn tx(input: &[u8]) -> IResult { + let (input, _) = tag([90])(input)?; + alt((tx_select, tx_select_ok, tx_commit, tx_commit_ok, tx_rollback, tx_rollback_ok))(input) +} +fn tx_select(input: &[u8]) -> IResult { + let (input, _) = tag([10])(input)?; + Ok((input, Class::Tx(Tx::Select { + }))) +} +fn tx_select_ok(input: &[u8]) -> IResult { + let (input, _) = tag([11])(input)?; + Ok((input, Class::Tx(Tx::SelectOk { + }))) +} +fn tx_commit(input: &[u8]) -> IResult { + let (input, _) = tag([20])(input)?; + Ok((input, Class::Tx(Tx::Commit { + }))) +} +fn tx_commit_ok(input: &[u8]) -> IResult { + let (input, _) = tag([21])(input)?; + Ok((input, Class::Tx(Tx::CommitOk { + }))) +} +fn tx_rollback(input: &[u8]) -> IResult { + let (input, _) = tag([30])(input)?; + Ok((input, Class::Tx(Tx::Rollback { + }))) +} +fn tx_rollback_ok(input: &[u8]) -> IResult { + let (input, _) = tag([31])(input)?; + Ok((input, Class::Tx(Tx::RollbackOk { + }))) +} - pub fn parse_method(input: &[u8]) -> Result<(&[u8], Class), nom::Err> { - alt((connection, channel, exchange, queue, basic, tx))(input) - } - fn domain_class_id(input: &[u8]) -> IResult { - short(input) - } - fn domain_consumer_tag(input: &[u8]) -> IResult { - shortstr(input) - } - fn domain_delivery_tag(input: &[u8]) -> IResult { - longlong(input) - } - fn domain_exchange_name(input: &[u8]) -> IResult { - let (input, result) = shortstr(input)?; - if result.len() > 127 { - fail!() - } - static REGEX: Lazy = Lazy::new(|| Regex::new(r"^[a-zA-Z0-9-_.:]*$").unwrap()); - if !REGEX.is_match(&result) { - fail!() - } - Ok((input, result)) - } - fn domain_method_id(input: &[u8]) -> IResult { - short(input) - } - fn domain_no_ack(input: &[u8]) -> IResult { - todo!() // bit - } - fn domain_no_local(input: &[u8]) -> IResult { - todo!() // bit - } - fn domain_no_wait(input: &[u8]) -> IResult { - todo!() // bit - } - fn domain_path(input: &[u8]) -> IResult { - let (input, result) = shortstr(input)?; - if result.len() > 127 { - fail!() - } - Ok((input, result)) - } - fn domain_peer_properties(input: &[u8]) -> IResult { - table(input) - } - fn domain_queue_name(input: &[u8]) -> IResult { - let (input, result) = shortstr(input)?; - if result.len() > 127 { - fail!() - } - static REGEX: Lazy = Lazy::new(|| Regex::new(r"^[a-zA-Z0-9-_.:]*$").unwrap()); - if !REGEX.is_match(&result) { - fail!() - } - Ok((input, result)) - } - fn domain_redelivered(input: &[u8]) -> IResult { - todo!() // bit - } - fn domain_message_count(input: &[u8]) -> IResult { - long(input) - } - fn domain_reply_code(input: &[u8]) -> IResult { - let (input, result) = short(input)?; - Ok((input, result)) - } - fn domain_reply_text(input: &[u8]) -> IResult { - let (input, result) = shortstr(input)?; - Ok((input, result)) - } - fn domain_bit(input: &[u8]) -> IResult { - todo!() // bit - } - fn domain_octet(input: &[u8]) -> IResult { - octet(input) - } - fn domain_short(input: &[u8]) -> IResult { - short(input) - } - fn domain_long(input: &[u8]) -> IResult { - long(input) - } - fn domain_longlong(input: &[u8]) -> IResult { - longlong(input) - } - fn domain_shortstr(input: &[u8]) -> IResult { - shortstr(input) - } - fn domain_longstr(input: &[u8]) -> IResult { - longstr(input) - } - fn domain_timestamp(input: &[u8]) -> IResult { - timestamp(input) - } - fn domain_table(input: &[u8]) -> IResult
{ - table(input) - } - fn connection(input: &[u8]) -> IResult { - let (input, _) = tag([10])(input)?; - alt(( - connection_start, - connection_start_ok, - connection_secure, - connection_secure_ok, - connection_tune, - connection_tune_ok, - connection_open, - connection_open_ok, - connection_close, - connection_close_ok, - connection_blocked, - connection_unblocked, - ))(input) - } - fn connection_start(input: &[u8]) -> IResult { - let (input, _) = tag([10])(input)?; - todo!() - } - fn connection_start_ok(input: &[u8]) -> IResult { - let (input, _) = tag([11])(input)?; - todo!() - } - fn connection_secure(input: &[u8]) -> IResult { - let (input, _) = tag([20])(input)?; - todo!() - } - fn connection_secure_ok(input: &[u8]) -> IResult { - let (input, _) = tag([21])(input)?; - todo!() - } - fn connection_tune(input: &[u8]) -> IResult { - let (input, _) = tag([30])(input)?; - todo!() - } - fn connection_tune_ok(input: &[u8]) -> IResult { - let (input, _) = tag([31])(input)?; - todo!() - } - fn connection_open(input: &[u8]) -> IResult { - let (input, _) = tag([40])(input)?; - todo!() - } - fn connection_open_ok(input: &[u8]) -> IResult { - let (input, _) = tag([41])(input)?; - todo!() - } - fn connection_close(input: &[u8]) -> IResult { - let (input, _) = tag([50])(input)?; - todo!() - } - fn connection_close_ok(input: &[u8]) -> IResult { - let (input, _) = tag([51])(input)?; - todo!() - } - fn connection_blocked(input: &[u8]) -> IResult { - let (input, _) = tag([60])(input)?; - todo!() - } - fn connection_unblocked(input: &[u8]) -> IResult { - let (input, _) = tag([61])(input)?; - todo!() - } - fn channel(input: &[u8]) -> IResult { - let (input, _) = tag([20])(input)?; - alt(( - channel_open, - channel_open_ok, - channel_flow, - channel_flow_ok, - channel_close, - channel_close_ok, - ))(input) - } - fn channel_open(input: &[u8]) -> IResult { - let (input, _) = tag([10])(input)?; - todo!() - } - fn channel_open_ok(input: &[u8]) -> IResult { - let (input, _) = tag([11])(input)?; - todo!() - } - fn channel_flow(input: &[u8]) -> IResult { - let (input, _) = tag([20])(input)?; - todo!() - } - fn channel_flow_ok(input: &[u8]) -> IResult { - let (input, _) = tag([21])(input)?; - todo!() - } - fn channel_close(input: &[u8]) -> IResult { - let (input, _) = tag([40])(input)?; - todo!() - } - fn channel_close_ok(input: &[u8]) -> IResult { - let (input, _) = tag([41])(input)?; - todo!() - } - fn exchange(input: &[u8]) -> IResult { - let (input, _) = tag([40])(input)?; - alt(( - exchange_declare, - exchange_declare_ok, - exchange_delete, - exchange_delete_ok, - ))(input) - } - fn exchange_declare(input: &[u8]) -> IResult { - let (input, _) = tag([10])(input)?; - todo!() - } - fn exchange_declare_ok(input: &[u8]) -> IResult { - let (input, _) = tag([11])(input)?; - todo!() - } - fn exchange_delete(input: &[u8]) -> IResult { - let (input, _) = tag([20])(input)?; - todo!() - } - fn exchange_delete_ok(input: &[u8]) -> IResult { - let (input, _) = tag([21])(input)?; - todo!() - } - fn queue(input: &[u8]) -> IResult { - let (input, _) = tag([50])(input)?; - alt(( - queue_declare, - queue_declare_ok, - queue_bind, - queue_bind_ok, - queue_unbind, - queue_unbind_ok, - queue_purge, - queue_purge_ok, - queue_delete, - queue_delete_ok, - ))(input) - } - fn queue_declare(input: &[u8]) -> IResult { - let (input, _) = tag([10])(input)?; - todo!() - } - fn queue_declare_ok(input: &[u8]) -> IResult { - let (input, _) = tag([11])(input)?; - todo!() - } - fn queue_bind(input: &[u8]) -> IResult { - let (input, _) = tag([20])(input)?; - todo!() - } - fn queue_bind_ok(input: &[u8]) -> IResult { - let (input, _) = tag([21])(input)?; - todo!() - } - fn queue_unbind(input: &[u8]) -> IResult { - let (input, _) = tag([50])(input)?; - todo!() - } - fn queue_unbind_ok(input: &[u8]) -> IResult { - let (input, _) = tag([51])(input)?; - todo!() - } - fn queue_purge(input: &[u8]) -> IResult { - let (input, _) = tag([30])(input)?; - todo!() - } - fn queue_purge_ok(input: &[u8]) -> IResult { - let (input, _) = tag([31])(input)?; - todo!() - } - fn queue_delete(input: &[u8]) -> IResult { - let (input, _) = tag([40])(input)?; - todo!() - } - fn queue_delete_ok(input: &[u8]) -> IResult { - let (input, _) = tag([41])(input)?; - todo!() - } - fn basic(input: &[u8]) -> IResult { - let (input, _) = tag([60])(input)?; - alt(( - basic_qos, - basic_qos_ok, - basic_consume, - basic_consume_ok, - basic_cancel, - basic_cancel_ok, - basic_publish, - basic_return, - basic_deliver, - basic_get, - basic_get_ok, - basic_get_empty, - basic_ack, - basic_reject, - basic_recover_async, - basic_recover, - basic_recover_ok, - ))(input) - } - fn basic_qos(input: &[u8]) -> IResult { - let (input, _) = tag([10])(input)?; - todo!() - } - fn basic_qos_ok(input: &[u8]) -> IResult { - let (input, _) = tag([11])(input)?; - todo!() - } - fn basic_consume(input: &[u8]) -> IResult { - let (input, _) = tag([20])(input)?; - todo!() - } - fn basic_consume_ok(input: &[u8]) -> IResult { - let (input, _) = tag([21])(input)?; - todo!() - } - fn basic_cancel(input: &[u8]) -> IResult { - let (input, _) = tag([30])(input)?; - todo!() - } - fn basic_cancel_ok(input: &[u8]) -> IResult { - let (input, _) = tag([31])(input)?; - todo!() - } - fn basic_publish(input: &[u8]) -> IResult { - let (input, _) = tag([40])(input)?; - todo!() - } - fn basic_return(input: &[u8]) -> IResult { - let (input, _) = tag([50])(input)?; - todo!() - } - fn basic_deliver(input: &[u8]) -> IResult { - let (input, _) = tag([60])(input)?; - todo!() - } - fn basic_get(input: &[u8]) -> IResult { - let (input, _) = tag([70])(input)?; - todo!() - } - fn basic_get_ok(input: &[u8]) -> IResult { - let (input, _) = tag([71])(input)?; - todo!() - } - fn basic_get_empty(input: &[u8]) -> IResult { - let (input, _) = tag([72])(input)?; - todo!() - } - fn basic_ack(input: &[u8]) -> IResult { - let (input, _) = tag([80])(input)?; - todo!() - } - fn basic_reject(input: &[u8]) -> IResult { - let (input, _) = tag([90])(input)?; - todo!() - } - fn basic_recover_async(input: &[u8]) -> IResult { - let (input, _) = tag([100])(input)?; - todo!() - } - fn basic_recover(input: &[u8]) -> IResult { - let (input, _) = tag([110])(input)?; - todo!() - } - fn basic_recover_ok(input: &[u8]) -> IResult { - let (input, _) = tag([111])(input)?; - todo!() - } - fn tx(input: &[u8]) -> IResult { - let (input, _) = tag([90])(input)?; - alt(( - tx_select, - tx_select_ok, - tx_commit, - tx_commit_ok, - tx_rollback, - tx_rollback_ok, - ))(input) - } - fn tx_select(input: &[u8]) -> IResult { - let (input, _) = tag([10])(input)?; - todo!() - } - fn tx_select_ok(input: &[u8]) -> IResult { - let (input, _) = tag([11])(input)?; - todo!() - } - fn tx_commit(input: &[u8]) -> IResult { - let (input, _) = tag([20])(input)?; - todo!() - } - fn tx_commit_ok(input: &[u8]) -> IResult { - let (input, _) = tag([21])(input)?; - todo!() - } - fn tx_rollback(input: &[u8]) -> IResult { - let (input, _) = tag([30])(input)?; - todo!() - } - fn tx_rollback_ok(input: &[u8]) -> IResult { - let (input, _) = tag([31])(input)?; - todo!() - } } diff --git a/amqp_transport/src/classes/mod.rs b/amqp_transport/src/classes/mod.rs index 45b5129..fcbd28c 100644 --- a/amqp_transport/src/classes/mod.rs +++ b/amqp_transport/src/classes/mod.rs @@ -1,3 +1,5 @@ +use crate::classes::generated::Class; +use crate::error::{ConException, ProtocolError, TransError}; use std::collections::HashMap; mod generated; @@ -5,6 +7,7 @@ mod parse_helper; pub type Table = HashMap; +#[derive(Debug, Clone, PartialEq)] pub enum FieldValue { Boolean(bool), ShortShortInt(i8), @@ -25,3 +28,19 @@ pub enum FieldValue { FieldTable(Table), Void, } + +pub use generated::*; + +/// Parses the payload of a method frame into the class/method +pub fn parse_method(payload: &[u8]) -> Result { + let nom_result = generated::parse::parse_method(payload); + + match nom_result { + Ok(([], class)) => Ok(class), + Ok((_, _)) => Err(ProtocolError::ConException(ConException::SyntaxError).into()), + Err(nom::Err::Incomplete(_)) => { + Err(ProtocolError::ConException(ConException::SyntaxError).into()) + } + Err(nom::Err::Failure(err) | nom::Err::Error(err)) => Err(err), + } +} diff --git a/amqp_transport/src/classes/parse_helper.rs b/amqp_transport/src/classes/parse_helper.rs index fef4a88..07bcd0e 100644 --- a/amqp_transport/src/classes/parse_helper.rs +++ b/amqp_transport/src/classes/parse_helper.rs @@ -39,6 +39,7 @@ pub fn long(input: &[u8]) -> IResult { pub fn longlong(input: &[u8]) -> IResult { todo!() } +// todo: doing this using a vec is a bit wasteful, consider not doing that pub fn bit(input: &[u8], amount: u8) -> IResult> { todo!() } diff --git a/amqp_transport/src/connection.rs b/amqp_transport/src/connection.rs index e79a21a..8c57d4c 100644 --- a/amqp_transport/src/connection.rs +++ b/amqp_transport/src/connection.rs @@ -1,5 +1,6 @@ use crate::error::{ProtocolError, TransError}; use crate::frame; +use crate::frame::FrameType; use anyhow::Context; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpStream; @@ -27,6 +28,10 @@ impl Connection { loop { let frame = frame::read_frame(&mut self.stream, 10000).await?; debug!(?frame, "received frame"); + if frame.kind == FrameType::Method { + let class = super::classes::parse_method(&frame.payload)?; + debug!(?class, "was method frame"); + } } } diff --git a/amqp_transport/src/frame.rs b/amqp_transport/src/frame.rs index d1b2b88..bcc2eaf 100644 --- a/amqp_transport/src/frame.rs +++ b/amqp_transport/src/frame.rs @@ -14,10 +14,10 @@ mod frame_type { #[derive(Debug, Clone, PartialEq, Eq)] pub struct Frame { /// The type of the frame including its parsed metadata. - kind: FrameType, - channel: u16, + pub kind: FrameType, + pub channel: u16, /// Includes the whole payload, also including the metadata from each type. - payload: Vec, + pub payload: Vec, } #[derive(Debug, Clone, PartialEq, Eq)] @@ -29,23 +29,6 @@ pub enum FrameType { Heartbeat = 8, } -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum FrameTypeEnum { - /// 1 - Method, - /// 2 - Header { - class_id: u16, - body_size: u64, - /// Ordered from high to low - property_flags: u16, - }, - /// 3 - Body, - /// 8 - Heartbeat, -} - pub async fn read_frame(r: &mut R, max_frame_size: usize) -> Result where R: AsyncReadExt + Unpin,