more parser generation

This commit is contained in:
nora 2022-02-12 18:54:58 +01:00
parent 6f45a52871
commit c43126af1f
10 changed files with 904 additions and 252 deletions

68
Cargo.lock generated
View file

@ -2,6 +2,15 @@
# It is not intended for manual editing.
version = 3
[[package]]
name = "aho-corasick"
version = "0.7.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e37cfd5e7657ada45f742d6e99ca5788580b5c529dc78faf11ece6dc702656f"
dependencies = [
"memchr",
]
[[package]]
name = "amqp"
version = "0.1.0"
@ -19,6 +28,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"heck",
"itertools",
"strong-xml",
]
@ -27,7 +37,10 @@ name = "amqp_transport"
version = "0.1.0"
dependencies = [
"anyhow",
"nom",
"once_cell",
"rand",
"regex",
"thiserror",
"tokio",
"tracing",
@ -68,6 +81,12 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "either"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457"
[[package]]
name = "getrandom"
version = "0.2.4"
@ -103,6 +122,15 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "itertools"
version = "0.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9a9d19fa1e79b6215ff29b9d6880b706147f16e9b1dbb1e4e5947b5b02bc5e3"
dependencies = [
"either",
]
[[package]]
name = "jetscii"
version = "0.5.1"
@ -145,6 +173,12 @@ version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a"
[[package]]
name = "minimal-lexical"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
[[package]]
name = "mio"
version = "0.7.14"
@ -167,6 +201,17 @@ dependencies = [
"winapi",
]
[[package]]
name = "nom"
version = "7.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b1d11e1ef389c76fe5b81bcaf2ea32cf88b62bc494e19f493d0b30e7a930109"
dependencies = [
"memchr",
"minimal-lexical",
"version_check",
]
[[package]]
name = "ntapi"
version = "0.3.6"
@ -296,6 +341,23 @@ dependencies = [
"bitflags",
]
[[package]]
name = "regex"
version = "1.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax",
]
[[package]]
name = "regex-syntax"
version = "0.6.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b"
[[package]]
name = "scopeguard"
version = "1.1.0"
@ -496,6 +558,12 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]]
name = "version_check"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]]
name = "wasi"
version = "0.10.2+wasi-snapshot-preview1"

View file

@ -1,4 +1,5 @@
# amqp
https://www.rabbitmq.com/resources/specs/amqp0-9-1.pdf
https://www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf
https://www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf

View file

@ -8,4 +8,5 @@ edition = "2021"
[dependencies]
anyhow = "1.0.53"
heck = "0.4.0"
itertools = "0.10.3"
strong-xml = "0.6.3"

View file

@ -1,4 +1,7 @@
use anyhow::{Context, Result};
mod parser;
use crate::parser::codegen_parser;
use anyhow::Result;
use heck::ToUpperCamelCase;
use std::fs;
use strong_xml::XmlRead;
@ -81,12 +84,13 @@ fn main() -> Result<()> {
}
fn codegen(amqp: &Amqp) -> Result<()> {
println!("use std::collections::HashMap;\n");
domain_defs(amqp)?;
class_defs(amqp)
println!("// This file has been generated by `amqp_codegen`. Do not edit it manually.\n");
codegen_domain_defs(amqp)?;
codegen_class_defs(amqp)?;
codegen_parser(amqp)
}
fn domain_defs(amqp: &Amqp) -> Result<()> {
fn codegen_domain_defs(amqp: &Amqp) -> Result<()> {
for domain in &amqp.domains {
let invariants = invariants(domain.asserts.iter());
@ -94,7 +98,7 @@ fn domain_defs(amqp: &Amqp) -> Result<()> {
println!("/// {invariants}");
}
println!(
"type {} = {};\n",
"pub type {} = {};\n",
domain.name.to_upper_camel_case(),
amqp_type_to_rust_type(&domain.kind),
);
@ -103,7 +107,7 @@ fn domain_defs(amqp: &Amqp) -> Result<()> {
Ok(())
}
fn class_defs(amqp: &Amqp) -> Result<()> {
fn codegen_class_defs(amqp: &Amqp) -> Result<()> {
println!("pub enum Class {{");
for class in &amqp.classes {
let class_name = class.name.to_upper_camel_case();
@ -111,12 +115,6 @@ fn class_defs(amqp: &Amqp) -> Result<()> {
}
println!("}}\n");
println!(
"pub enum TableValue {{
"
);
for class in &amqp.classes {
let enum_name = class.name.to_upper_camel_case();
println!("/// Index {}, handler = {}", class.index, class.handler);
@ -125,13 +123,12 @@ fn class_defs(amqp: &Amqp) -> Result<()> {
let method_name = method.name.to_upper_camel_case();
println!(" /// Index {}", method.index);
print!(" {method_name}");
if method.fields.len() > 0 {
if !method.fields.is_empty() {
println!(" {{");
for field in &method.fields {
let field_name = snake_case(&field.name);
let (field_type, field_docs) = resolve_type(
amqp,
&field.domain.as_ref().or(field.kind.as_ref()).unwrap(),
field.domain.as_ref().or(field.kind.as_ref()).unwrap(),
field.asserts.as_ref(),
)?;
if !field_docs.is_empty() {
@ -150,7 +147,7 @@ fn class_defs(amqp: &Amqp) -> Result<()> {
Ok(())
}
fn amqp_type_to_rust_type<'a>(amqp_type: &str) -> &'static str {
fn amqp_type_to_rust_type(amqp_type: &str) -> &'static str {
match amqp_type {
"octet" => "u8",
"short" => "u16",
@ -159,37 +156,18 @@ fn amqp_type_to_rust_type<'a>(amqp_type: &str) -> &'static str {
"bit" => "u8",
"shortstr" | "longstr" => "String",
"timestamp" => "u64",
"table" => "HashMap<Shortstr, (Octet, /* todo */ Box<dyn std::any::Any>)>",
"table" => "super::Table",
_ => unreachable!("invalid type {}", amqp_type),
}
}
/// returns (type name, invariant docs)
fn resolve_type(amqp: &Amqp, domain: &str, asserts: &[Assert]) -> Result<(String, String)> {
let kind = amqp
.domains
.iter()
.find(|d| &d.name == domain)
.context("domain not found")?;
let is_nonnull = is_nonnull(asserts.iter().chain(kind.asserts.iter()));
fn resolve_type(domain: &str, asserts: &[Assert]) -> Result<(String, String)> {
let additional_docs = invariants(asserts.iter());
let type_name = domain.to_upper_camel_case();
Ok((
if is_nonnull {
type_name
} else {
format!("Option<{type_name}>")
},
additional_docs,
))
}
fn is_nonnull<'a>(mut asserts: impl Iterator<Item = &'a Assert>) -> bool {
asserts.find(|assert| assert.check == "notnull").is_some()
Ok((type_name, additional_docs))
}
fn snake_case(ident: &str) -> String {
@ -204,18 +182,17 @@ fn snake_case(ident: &str) -> String {
fn invariants<'a>(asserts: impl Iterator<Item = &'a Assert>) -> String {
asserts
.filter_map(|assert| match &*assert.check {
"notnull" => None,
"length" => Some(format!(
"must be shorter than {}",
assert.value.as_ref().unwrap()
)),
"regexp" => Some(format!("must match `{}`", assert.value.as_ref().unwrap())),
"le" => Some(format!(
"must be less than the {} field of the method {}",
assert.method.as_ref().unwrap(),
assert.field.as_ref().unwrap()
)),
.map(|assert| match &*assert.check {
"notnull" => "must not be null".to_string(),
"length" => format!("must be shorter than {}", assert.value.as_ref().unwrap()),
"regexp" => format!("must match `{}`", assert.value.as_ref().unwrap()),
"le" => {
format!(
"must be less than the {} field of the method {}",
assert.method.as_ref().unwrap(),
assert.field.as_ref().unwrap()
)
}
_ => unimplemented!(),
})
.collect::<Vec<_>>()

133
amqp_codegen/src/parser.rs Normal file
View file

@ -0,0 +1,133 @@
use crate::{Amqp, Class, Domain, Method};
use anyhow::Result;
use heck::{ToSnakeCase, ToUpperCamelCase};
use itertools::Itertools;
fn method_function_name(class_name: &str) -> impl Fn(&Method) -> String + '_ {
move |method| {
let method_name = method.name.to_snake_case();
format!("{class_name}_{method_name}")
}
}
fn domain_function_name(domain_name: &str) -> String {
let domain_name = domain_name.to_snake_case();
format!("domain_{domain_name}")
}
pub(crate) fn codegen_parser(amqp: &Amqp) -> Result<()> {
println!(
"pub mod parse {{
use super::*;
use crate::classes::parse_helper::*;
use crate::error::TransError;
use nom::{{branch::alt, bytes::complete::tag}};
use regex::Regex;
use once_cell::sync::Lazy;
pub type IResult<'a, T> = nom::IResult<&'a [u8], T, TransError>;
"
);
println!(
"pub fn parse_method(input: &[u8]) -> Result<(&[u8], Class), nom::Err<TransError>> {{
alt(({}))(input)
}}",
amqp.classes
.iter()
.map(|class| class.name.to_snake_case())
.join(", ")
);
for domain in &amqp.domains {
domain_parser(domain)?;
}
for class in &amqp.classes {
let class_name = class.name.to_snake_case();
function(&class_name, "Class", || {
let class_index = class.index;
let all_methods = class
.methods
.iter()
.map(method_function_name(&class_name))
.join(", ");
println!(
" let (input, _) = tag([{class_index}])(input)?;
alt(({all_methods}))(input)"
);
Ok(())
})?;
for method in &class.methods {
method_parser(class, method)?;
}
}
println!("\n}}");
Ok(())
}
fn domain_parser(domain: &Domain) -> Result<()> {
let fn_name = domain_function_name(&domain.name);
let type_name = domain.kind.to_snake_case();
function(&fn_name, &domain.name.to_upper_camel_case(), || {
if domain.asserts.is_empty() {
if type_name == "bit" {
println!(" todo!() // bit")
} else {
println!(" {type_name}(input)");
}
} else {
println!(" let (input, result) = {type_name}(input)?;");
for assert in &domain.asserts {
match &*assert.check {
"notnull" => { /* todo */ }
"regexp" => {
let value = assert.value.as_ref().unwrap();
println!(
r#" static REGEX: Lazy<Regex> = Lazy::new(|| Regex::new(r"{value}").unwrap());"#
);
println!(" if !REGEX.is_match(&result) {{ fail!() }}");
}
"le" => {} // can't validate this here
"length" => {
let length = assert.value.as_ref().unwrap();
println!(" if result.len() > {length} {{ fail!() }}");
}
_ => unimplemented!(),
}
}
println!(" Ok((input, result))");
}
Ok(())
})
}
fn method_parser(class: &Class, method: &Method) -> Result<()> {
let class_name = class.name.to_snake_case();
let function_name = method_function_name(&class_name)(method);
function(&function_name, "Class", || {
let method_index = method.index;
println!(" let (input, _) = tag([{method_index}])(input)?;");
println!(" todo!()");
for _field in &method.fields {}
Ok(())
})?;
Ok(())
}
fn function<F>(name: &str, ret_ty: &str, body: F) -> Result<()>
where
F: FnOnce() -> Result<()>,
{
println!("fn {name}(input: &[u8]) -> IResult<{ret_ty}> {{");
body()?;
println!("}}");
Ok(())
}

View file

@ -7,9 +7,14 @@ edition = "2021"
[dependencies]
anyhow = "1.0.53"
nom = "7.1.0"
once_cell = "1.9.0"
rand = "0.8.4"
regex = "1.5.4"
thiserror = "1.0.30"
tokio = { version = "1.16.1", features = ["full"] }
tracing = "0.1.30"
tracing-subscriber = "0.3.8"
uuid = "0.8.2"
[features]

View file

@ -1,55 +1,59 @@
use std::collections::HashMap;
#![allow(unused_variables)]
type ClassId = u16;
// This file has been generated by `amqp_codegen`. Do not edit it manually.
type ConsumerTag = String;
pub type ClassId = u16;
type DeliveryTag = u64;
pub type ConsumerTag = String;
pub type DeliveryTag = u64;
/// must be shorter than 127, must match `^[a-zA-Z0-9-_.:]*$`
type ExchangeName = String;
pub type ExchangeName = String;
type MethodId = u16;
pub type MethodId = u16;
type NoAck = u8;
pub type NoAck = u8;
type NoLocal = u8;
pub type NoLocal = u8;
type NoWait = u8;
pub type NoWait = u8;
/// must be shorter than 127
type Path = String;
/// must not be null, must be shorter than 127
pub type Path = String;
type PeerProperties = HashMap<Shortstr, (Octet, /* todo */ Box<dyn std::any::Any>)>;
pub type PeerProperties = super::Table;
/// must be shorter than 127, must match `^[a-zA-Z0-9-_.:]*$`
type QueueName = String;
pub type QueueName = String;
type Redelivered = u8;
pub type Redelivered = u8;
type MessageCount = u32;
pub type MessageCount = u32;
type ReplyCode = u16;
/// must not be null
pub type ReplyCode = u16;
type ReplyText = String;
/// must not be null
pub type ReplyText = String;
type Bit = u8;
pub type Bit = u8;
type Octet = u8;
pub type Octet = u8;
type Short = u16;
pub type Short = u16;
type Long = u32;
pub type Long = u32;
type Longlong = u64;
pub type Longlong = u64;
type Shortstr = String;
pub type Shortstr = String;
type Longstr = String;
pub type Longstr = String;
type Timestamp = u64;
pub type Timestamp = u64;
type Table = HashMap<Shortstr, (Octet, /* todo */ Box<dyn std::any::Any>)>;
pub type Table = super::Table;
pub enum Class {
Connection(Connection),
@ -64,90 +68,82 @@ pub enum Class {
pub enum Connection {
/// Index 10
Start {
version_major: Option<Octet>,
version_minor: Option<Octet>,
server_properties: Option<PeerProperties>,
version_major: Octet,
version_minor: Octet,
server_properties: PeerProperties,
/// must not be null
mechanisms: Longstr,
/// must not be null
locales: Longstr,
},
/// Index 11
StartOk {
client_properties: Option<PeerProperties>,
client_properties: PeerProperties,
/// must not be null
mechanism: Shortstr,
/// must not be null
response: Longstr,
/// must not be null
locale: Shortstr,
},
/// Index 20
Secure {
challenge: Option<Longstr>,
},
Secure { challenge: Longstr },
/// Index 21
SecureOk {
/// must not be null
response: Longstr,
},
/// Index 30
Tune {
channel_max: Option<Short>,
frame_max: Option<Long>,
heartbeat: Option<Short>,
channel_max: Short,
frame_max: Long,
heartbeat: Short,
},
/// Index 31
TuneOk {
/// must be less than the tune field of the method channel-max
/// must not be null, must be less than the tune field of the method channel-max
channel_max: Short,
frame_max: Option<Long>,
heartbeat: Option<Short>,
frame_max: Long,
heartbeat: Short,
},
/// Index 40
Open {
virtual_host: Path,
reserved_1: Option<Shortstr>,
reserved_2: Option<Bit>,
reserved_1: Shortstr,
reserved_2: Bit,
},
/// Index 41
OpenOk {
reserved_1: Option<Shortstr>,
},
OpenOk { reserved_1: Shortstr },
/// Index 50
Close {
reply_code: ReplyCode,
reply_text: ReplyText,
class_id: Option<ClassId>,
method_id: Option<MethodId>,
class_id: ClassId,
method_id: MethodId,
},
/// Index 51
CloseOk,
/// Index 60
Blocked {
reason: Option<Shortstr>,
},
Blocked { reason: Shortstr },
/// Index 61
Unblocked,
}
/// Index 20, handler = channel
pub enum Channel {
/// Index 10
Open {
reserved_1: Option<Shortstr>,
},
Open { reserved_1: Shortstr },
/// Index 11
OpenOk {
reserved_1: Option<Longstr>,
},
OpenOk { reserved_1: Longstr },
/// Index 20
Flow {
active: Option<Bit>,
},
Flow { active: Bit },
/// Index 21
FlowOk {
active: Option<Bit>,
},
FlowOk { active: Bit },
/// Index 40
Close {
reply_code: ReplyCode,
reply_text: ReplyText,
class_id: Option<ClassId>,
method_id: Option<MethodId>,
class_id: ClassId,
method_id: MethodId,
},
/// Index 41
CloseOk,
@ -156,24 +152,26 @@ pub enum Channel {
pub enum Exchange {
/// Index 10
Declare {
reserved_1: Option<Short>,
reserved_1: Short,
/// must not be null
exchange: ExchangeName,
r#type: Option<Shortstr>,
passive: Option<Bit>,
durable: Option<Bit>,
reserved_2: Option<Bit>,
reserved_3: Option<Bit>,
no_wait: Option<NoWait>,
arguments: Option<Table>,
r#type: Shortstr,
passive: Bit,
durable: Bit,
reserved_2: Bit,
reserved_3: Bit,
no_wait: NoWait,
arguments: Table,
},
/// Index 11
DeclareOk,
/// Index 20
Delete {
reserved_1: Option<Short>,
reserved_1: Short,
/// must not be null
exchange: ExchangeName,
if_unused: Option<Bit>,
no_wait: Option<NoWait>,
if_unused: Bit,
no_wait: NoWait,
},
/// Index 21
DeleteOk,
@ -182,158 +180,145 @@ pub enum Exchange {
pub enum Queue {
/// Index 10
Declare {
reserved_1: Option<Short>,
queue: Option<QueueName>,
passive: Option<Bit>,
durable: Option<Bit>,
exclusive: Option<Bit>,
auto_delete: Option<Bit>,
no_wait: Option<NoWait>,
arguments: Option<Table>,
reserved_1: Short,
queue: QueueName,
passive: Bit,
durable: Bit,
exclusive: Bit,
auto_delete: Bit,
no_wait: NoWait,
arguments: Table,
},
/// Index 11
DeclareOk {
/// must not be null
queue: QueueName,
message_count: Option<MessageCount>,
consumer_count: Option<Long>,
message_count: MessageCount,
consumer_count: Long,
},
/// Index 20
Bind {
reserved_1: Option<Short>,
queue: Option<QueueName>,
exchange: Option<ExchangeName>,
routing_key: Option<Shortstr>,
no_wait: Option<NoWait>,
arguments: Option<Table>,
reserved_1: Short,
queue: QueueName,
exchange: ExchangeName,
routing_key: Shortstr,
no_wait: NoWait,
arguments: Table,
},
/// Index 21
BindOk,
/// Index 50
Unbind {
reserved_1: Option<Short>,
queue: Option<QueueName>,
exchange: Option<ExchangeName>,
routing_key: Option<Shortstr>,
arguments: Option<Table>,
reserved_1: Short,
queue: QueueName,
exchange: ExchangeName,
routing_key: Shortstr,
arguments: Table,
},
/// Index 51
UnbindOk,
/// Index 30
Purge {
reserved_1: Option<Short>,
queue: Option<QueueName>,
no_wait: Option<NoWait>,
reserved_1: Short,
queue: QueueName,
no_wait: NoWait,
},
/// Index 31
PurgeOk {
message_count: Option<MessageCount>,
},
PurgeOk { message_count: MessageCount },
/// Index 40
Delete {
reserved_1: Option<Short>,
queue: Option<QueueName>,
if_unused: Option<Bit>,
if_empty: Option<Bit>,
no_wait: Option<NoWait>,
reserved_1: Short,
queue: QueueName,
if_unused: Bit,
if_empty: Bit,
no_wait: NoWait,
},
/// Index 41
DeleteOk {
message_count: Option<MessageCount>,
},
DeleteOk { message_count: MessageCount },
}
/// Index 60, handler = channel
pub enum Basic {
/// Index 10
Qos {
prefetch_size: Option<Long>,
prefetch_count: Option<Short>,
global: Option<Bit>,
prefetch_size: Long,
prefetch_count: Short,
global: Bit,
},
/// Index 11
QosOk,
/// Index 20
Consume {
reserved_1: Option<Short>,
queue: Option<QueueName>,
consumer_tag: Option<ConsumerTag>,
no_local: Option<NoLocal>,
no_ack: Option<NoAck>,
exclusive: Option<Bit>,
no_wait: Option<NoWait>,
arguments: Option<Table>,
reserved_1: Short,
queue: QueueName,
consumer_tag: ConsumerTag,
no_local: NoLocal,
no_ack: NoAck,
exclusive: Bit,
no_wait: NoWait,
arguments: Table,
},
/// Index 21
ConsumeOk {
consumer_tag: Option<ConsumerTag>,
},
ConsumeOk { consumer_tag: ConsumerTag },
/// Index 30
Cancel {
consumer_tag: Option<ConsumerTag>,
no_wait: Option<NoWait>,
consumer_tag: ConsumerTag,
no_wait: NoWait,
},
/// Index 31
CancelOk {
consumer_tag: Option<ConsumerTag>,
},
CancelOk { consumer_tag: ConsumerTag },
/// Index 40
Publish {
reserved_1: Option<Short>,
exchange: Option<ExchangeName>,
routing_key: Option<Shortstr>,
mandatory: Option<Bit>,
immediate: Option<Bit>,
reserved_1: Short,
exchange: ExchangeName,
routing_key: Shortstr,
mandatory: Bit,
immediate: Bit,
},
/// Index 50
Return {
reply_code: ReplyCode,
reply_text: ReplyText,
exchange: Option<ExchangeName>,
routing_key: Option<Shortstr>,
exchange: ExchangeName,
routing_key: Shortstr,
},
/// Index 60
Deliver {
consumer_tag: Option<ConsumerTag>,
delivery_tag: Option<DeliveryTag>,
redelivered: Option<Redelivered>,
exchange: Option<ExchangeName>,
routing_key: Option<Shortstr>,
consumer_tag: ConsumerTag,
delivery_tag: DeliveryTag,
redelivered: Redelivered,
exchange: ExchangeName,
routing_key: Shortstr,
},
/// Index 70
Get {
reserved_1: Option<Short>,
queue: Option<QueueName>,
no_ack: Option<NoAck>,
reserved_1: Short,
queue: QueueName,
no_ack: NoAck,
},
/// Index 71
GetOk {
delivery_tag: Option<DeliveryTag>,
redelivered: Option<Redelivered>,
exchange: Option<ExchangeName>,
routing_key: Option<Shortstr>,
message_count: Option<MessageCount>,
delivery_tag: DeliveryTag,
redelivered: Redelivered,
exchange: ExchangeName,
routing_key: Shortstr,
message_count: MessageCount,
},
/// Index 72
GetEmpty {
reserved_1: Option<Shortstr>,
},
GetEmpty { reserved_1: Shortstr },
/// Index 80
Ack {
delivery_tag: Option<DeliveryTag>,
multiple: Option<Bit>,
delivery_tag: DeliveryTag,
multiple: Bit,
},
/// Index 90
Reject {
delivery_tag: Option<DeliveryTag>,
requeue: Option<Bit>,
delivery_tag: DeliveryTag,
requeue: Bit,
},
/// Index 100
RecoverAsync {
requeue: Option<Bit>,
},
RecoverAsync { requeue: Bit },
/// Index 110
Recover {
requeue: Option<Bit>,
},
Recover { requeue: Bit },
/// Index 111
RecoverOk,
}
@ -352,3 +337,416 @@ pub enum Tx {
/// Index 31
RollbackOk,
}
pub mod parse {
use super::*;
use crate::classes::parse_helper::*;
use crate::error::TransError;
use nom::{branch::alt, bytes::complete::tag};
use once_cell::sync::Lazy;
use regex::Regex;
pub type IResult<'a, T> = nom::IResult<&'a [u8], T, TransError>;
pub fn parse_method(input: &[u8]) -> Result<(&[u8], Class), nom::Err<TransError>> {
alt((connection, channel, exchange, queue, basic, tx))(input)
}
fn domain_class_id(input: &[u8]) -> IResult<ClassId> {
short(input)
}
fn domain_consumer_tag(input: &[u8]) -> IResult<ConsumerTag> {
shortstr(input)
}
fn domain_delivery_tag(input: &[u8]) -> IResult<DeliveryTag> {
longlong(input)
}
fn domain_exchange_name(input: &[u8]) -> IResult<ExchangeName> {
let (input, result) = shortstr(input)?;
if result.len() > 127 {
fail!()
}
static REGEX: Lazy<Regex> = Lazy::new(|| Regex::new(r"^[a-zA-Z0-9-_.:]*$").unwrap());
if !REGEX.is_match(&result) {
fail!()
}
Ok((input, result))
}
fn domain_method_id(input: &[u8]) -> IResult<MethodId> {
short(input)
}
fn domain_no_ack(input: &[u8]) -> IResult<NoAck> {
todo!() // bit
}
fn domain_no_local(input: &[u8]) -> IResult<NoLocal> {
todo!() // bit
}
fn domain_no_wait(input: &[u8]) -> IResult<NoWait> {
todo!() // bit
}
fn domain_path(input: &[u8]) -> IResult<Path> {
let (input, result) = shortstr(input)?;
if result.len() > 127 {
fail!()
}
Ok((input, result))
}
fn domain_peer_properties(input: &[u8]) -> IResult<PeerProperties> {
table(input)
}
fn domain_queue_name(input: &[u8]) -> IResult<QueueName> {
let (input, result) = shortstr(input)?;
if result.len() > 127 {
fail!()
}
static REGEX: Lazy<Regex> = Lazy::new(|| Regex::new(r"^[a-zA-Z0-9-_.:]*$").unwrap());
if !REGEX.is_match(&result) {
fail!()
}
Ok((input, result))
}
fn domain_redelivered(input: &[u8]) -> IResult<Redelivered> {
todo!() // bit
}
fn domain_message_count(input: &[u8]) -> IResult<MessageCount> {
long(input)
}
fn domain_reply_code(input: &[u8]) -> IResult<ReplyCode> {
let (input, result) = short(input)?;
Ok((input, result))
}
fn domain_reply_text(input: &[u8]) -> IResult<ReplyText> {
let (input, result) = shortstr(input)?;
Ok((input, result))
}
fn domain_bit(input: &[u8]) -> IResult<Bit> {
todo!() // bit
}
fn domain_octet(input: &[u8]) -> IResult<Octet> {
octet(input)
}
fn domain_short(input: &[u8]) -> IResult<Short> {
short(input)
}
fn domain_long(input: &[u8]) -> IResult<Long> {
long(input)
}
fn domain_longlong(input: &[u8]) -> IResult<Longlong> {
longlong(input)
}
fn domain_shortstr(input: &[u8]) -> IResult<Shortstr> {
shortstr(input)
}
fn domain_longstr(input: &[u8]) -> IResult<Longstr> {
longstr(input)
}
fn domain_timestamp(input: &[u8]) -> IResult<Timestamp> {
timestamp(input)
}
fn domain_table(input: &[u8]) -> IResult<Table> {
table(input)
}
fn connection(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([10])(input)?;
alt((
connection_start,
connection_start_ok,
connection_secure,
connection_secure_ok,
connection_tune,
connection_tune_ok,
connection_open,
connection_open_ok,
connection_close,
connection_close_ok,
connection_blocked,
connection_unblocked,
))(input)
}
fn connection_start(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([10])(input)?;
todo!()
}
fn connection_start_ok(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([11])(input)?;
todo!()
}
fn connection_secure(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([20])(input)?;
todo!()
}
fn connection_secure_ok(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([21])(input)?;
todo!()
}
fn connection_tune(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([30])(input)?;
todo!()
}
fn connection_tune_ok(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([31])(input)?;
todo!()
}
fn connection_open(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([40])(input)?;
todo!()
}
fn connection_open_ok(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([41])(input)?;
todo!()
}
fn connection_close(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([50])(input)?;
todo!()
}
fn connection_close_ok(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([51])(input)?;
todo!()
}
fn connection_blocked(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([60])(input)?;
todo!()
}
fn connection_unblocked(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([61])(input)?;
todo!()
}
fn channel(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([20])(input)?;
alt((
channel_open,
channel_open_ok,
channel_flow,
channel_flow_ok,
channel_close,
channel_close_ok,
))(input)
}
fn channel_open(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([10])(input)?;
todo!()
}
fn channel_open_ok(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([11])(input)?;
todo!()
}
fn channel_flow(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([20])(input)?;
todo!()
}
fn channel_flow_ok(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([21])(input)?;
todo!()
}
fn channel_close(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([40])(input)?;
todo!()
}
fn channel_close_ok(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([41])(input)?;
todo!()
}
fn exchange(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([40])(input)?;
alt((
exchange_declare,
exchange_declare_ok,
exchange_delete,
exchange_delete_ok,
))(input)
}
fn exchange_declare(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([10])(input)?;
todo!()
}
fn exchange_declare_ok(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([11])(input)?;
todo!()
}
fn exchange_delete(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([20])(input)?;
todo!()
}
fn exchange_delete_ok(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([21])(input)?;
todo!()
}
fn queue(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([50])(input)?;
alt((
queue_declare,
queue_declare_ok,
queue_bind,
queue_bind_ok,
queue_unbind,
queue_unbind_ok,
queue_purge,
queue_purge_ok,
queue_delete,
queue_delete_ok,
))(input)
}
fn queue_declare(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([10])(input)?;
todo!()
}
fn queue_declare_ok(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([11])(input)?;
todo!()
}
fn queue_bind(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([20])(input)?;
todo!()
}
fn queue_bind_ok(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([21])(input)?;
todo!()
}
fn queue_unbind(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([50])(input)?;
todo!()
}
fn queue_unbind_ok(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([51])(input)?;
todo!()
}
fn queue_purge(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([30])(input)?;
todo!()
}
fn queue_purge_ok(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([31])(input)?;
todo!()
}
fn queue_delete(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([40])(input)?;
todo!()
}
fn queue_delete_ok(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([41])(input)?;
todo!()
}
fn basic(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([60])(input)?;
alt((
basic_qos,
basic_qos_ok,
basic_consume,
basic_consume_ok,
basic_cancel,
basic_cancel_ok,
basic_publish,
basic_return,
basic_deliver,
basic_get,
basic_get_ok,
basic_get_empty,
basic_ack,
basic_reject,
basic_recover_async,
basic_recover,
basic_recover_ok,
))(input)
}
fn basic_qos(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([10])(input)?;
todo!()
}
fn basic_qos_ok(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([11])(input)?;
todo!()
}
fn basic_consume(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([20])(input)?;
todo!()
}
fn basic_consume_ok(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([21])(input)?;
todo!()
}
fn basic_cancel(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([30])(input)?;
todo!()
}
fn basic_cancel_ok(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([31])(input)?;
todo!()
}
fn basic_publish(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([40])(input)?;
todo!()
}
fn basic_return(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([50])(input)?;
todo!()
}
fn basic_deliver(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([60])(input)?;
todo!()
}
fn basic_get(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([70])(input)?;
todo!()
}
fn basic_get_ok(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([71])(input)?;
todo!()
}
fn basic_get_empty(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([72])(input)?;
todo!()
}
fn basic_ack(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([80])(input)?;
todo!()
}
fn basic_reject(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([90])(input)?;
todo!()
}
fn basic_recover_async(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([100])(input)?;
todo!()
}
fn basic_recover(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([110])(input)?;
todo!()
}
fn basic_recover_ok(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([111])(input)?;
todo!()
}
fn tx(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([90])(input)?;
alt((
tx_select,
tx_select_ok,
tx_commit,
tx_commit_ok,
tx_rollback,
tx_rollback_ok,
))(input)
}
fn tx_select(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([10])(input)?;
todo!()
}
fn tx_select_ok(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([11])(input)?;
todo!()
}
fn tx_commit(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([20])(input)?;
todo!()
}
fn tx_commit_ok(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([21])(input)?;
todo!()
}
fn tx_rollback(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([30])(input)?;
todo!()
}
fn tx_rollback_ok(input: &[u8]) -> IResult<Class> {
let (input, _) = tag([31])(input)?;
todo!()
}
}

View file

@ -1,3 +1,27 @@
mod generated;
use std::collections::HashMap;
static TABLE_VALUES: &[(char, &str)] = &[[]];
mod generated;
mod parse_helper;
pub type Table = HashMap<String, FieldValue>;
pub enum FieldValue {
Boolean(bool),
ShortShortInt(i8),
ShortShortUInt(u8),
ShortInt(i16),
ShortUInt(u16),
LongInt(i32),
LongUInt(u32),
LongLongInt(i64),
LongLongUInt(u64),
Float(f32),
Double(f64),
DecimalValue(u8, u32),
ShortString(String),
LongString(String),
FieldArray(Vec<FieldValue>),
Timestamp(u64),
FieldTable(Table),
Void,
}

View file

@ -0,0 +1,56 @@
use crate::classes::generated::parse::IResult;
use crate::classes::generated::{
Bit, Long, Longlong, Longstr, Octet, Short, Shortstr, Table, Timestamp,
};
use crate::error::{ConException, ProtocolError, TransError};
use nom::error::ErrorKind;
impl nom::error::ParseError<&[u8]> for TransError {
fn from_error_kind(_input: &[u8], _kind: ErrorKind) -> Self {
ProtocolError::ConException(ConException::SyntaxError).into()
}
fn append(_input: &[u8], _kind: ErrorKind, other: Self) -> Self {
other
}
}
#[macro_export]
macro_rules! fail {
() => {
return Err(nom::Err::Failure(
crate::error::ProtocolError::ConException(crate::error::ConException::SyntaxError)
.into(),
))
};
}
pub use fail;
pub fn octet(input: &[u8]) -> IResult<Octet> {
todo!()
}
pub fn short(input: &[u8]) -> IResult<Short> {
todo!()
}
pub fn long(input: &[u8]) -> IResult<Long> {
todo!()
}
pub fn longlong(input: &[u8]) -> IResult<Longlong> {
todo!()
}
pub fn bit(input: &[u8], amount: u8) -> IResult<Vec<Bit>> {
todo!()
}
pub fn shortstr(input: &[u8]) -> IResult<Shortstr> {
todo!()
}
pub fn longstr(input: &[u8]) -> IResult<Longstr> {
todo!()
}
pub fn timestamp(input: &[u8]) -> IResult<Timestamp> {
todo!()
}
pub fn table(input: &[u8]) -> IResult<Table> {
todo!()
}

View file

@ -21,9 +21,18 @@ pub struct Frame {
}
#[derive(Debug, Clone, PartialEq, Eq)]
#[repr(u8)]
pub enum FrameType {
Method = 1,
Header = 2,
Body = 3,
Heartbeat = 8,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FrameTypeEnum {
/// 1
Method { class_id: u16, method_id: u16 },
Method,
/// 2
Header {
class_id: u16,
@ -33,7 +42,7 @@ pub enum FrameType {
},
/// 3
Body,
/// 4
/// 8
Heartbeat,
}
@ -58,7 +67,7 @@ where
return Err(ProtocolError::ConException(ConException::FrameError).into());
}
let kind = parse_frame_type(kind, &payload, channel)?;
let kind = parse_frame_type(kind, channel)?;
Ok(Frame {
kind,
@ -67,34 +76,10 @@ where
})
}
fn parse_frame_type(kind: u8, payload: &[u8], channel: u16) -> Result<FrameType, TransError> {
fn parse_frame_type(kind: u8, channel: u16) -> Result<FrameType, TransError> {
match kind {
frame_type::METHOD => {
let class_id = u16::from_be_bytes(payload[0..2].try_into().unwrap());
let method_id = u16::from_be_bytes(payload[2..4].try_into().unwrap());
Ok(FrameType::Method {
class_id,
method_id,
})
}
frame_type::HEADER => {
let class_id = u16::from_be_bytes(payload[0..2].try_into().unwrap());
let weight = u16::from_be_bytes(payload[2..4].try_into().unwrap());
// weight is unused and must always be 0
if weight != 0 {
return Err(ProtocolError::ConException(ConException::FrameError).into());
}
let body_size = u64::from_be_bytes(payload[4..12].try_into().unwrap());
let property_flags = u16::from_be_bytes(payload[12..14].try_into().unwrap());
Ok(FrameType::Header {
class_id,
body_size,
property_flags,
})
}
frame_type::METHOD => Ok(FrameType::Method),
frame_type::HEADER => Ok(FrameType::Header),
frame_type::BODY => Ok(FrameType::Body),
frame_type::HEARTBEAT => {
if channel != 0 {
@ -103,7 +88,7 @@ fn parse_frame_type(kind: u8, payload: &[u8], channel: u16) -> Result<FrameType,
Ok(FrameType::Heartbeat)
}
}
_ => todo!(),
_ => Err(ProtocolError::ConException(ConException::FrameError).into()),
}
}
@ -114,17 +99,22 @@ mod tests {
#[tokio::test]
async fn read_small_body() {
let mut bytes: &[u8] = &[
/*type*/ 1,
/*channel*/ 0,
/*type*/
1,
/*channel*/
0,
0,
/*size*/
0,
/*size*/ 0,
0,
0,
3,
/*payload*/ 1,
/*payload*/
1,
2,
3,
/*frame-end*/ super::REQUIRED_FRAME_END,
/*frame-end*/
super::REQUIRED_FRAME_END,
];
let frame = super::read_frame(&mut bytes, 10000).await.unwrap();
@ -133,7 +123,6 @@ mod tests {
Frame {
kind: FrameType::Method,
channel: 0,
size: 3,
payload: vec![1, 2, 3],
}
);