sending frames works

This commit is contained in:
nora 2022-02-13 16:29:14 +01:00
parent fcf531df43
commit 217a419ef1
4 changed files with 296 additions and 299 deletions

View file

@ -7,9 +7,9 @@ pub(crate) fn codegen_write(amqp: &Amqp) {
use super::*;
use crate::classes::write_helper::*;
use crate::error::TransError;
use tokio::io::AsyncWriteExt;
use std::io::Write;
pub async fn write_method<W: AsyncWriteExt + Unpin>(class: Class, mut writer: W) -> Result<(), TransError> {{
pub fn write_method<W: Write>(class: Class, mut writer: W) -> Result<(), TransError> {{
match class {{"
);
@ -25,7 +25,7 @@ pub async fn write_method<W: AsyncWriteExt + Unpin>(class: Class, mut writer: W)
println!(" {field_name},");
}
println!(" }}) => {{");
println!(" writer.write_all(&[{class_index}, {method_index}]).await?;");
println!(" writer.write_all(&[{class_index}, {method_index}])?;");
let mut iter = method.fields.iter().peekable();
while let Some(field) = iter.next() {
@ -38,9 +38,9 @@ pub async fn write_method<W: AsyncWriteExt + Unpin>(class: Class, mut writer: W)
let field_name = snake_case(&field.name);
print!("{field_name}, ");
}
println!("], &mut writer).await?;");
println!("], &mut writer)?;");
} else {
println!(" {type_name}({field_name}, &mut writer).await?;");
println!(" {type_name}({field_name}, &mut writer)?;");
}
}
println!(" }}");

View file

@ -1134,12 +1134,9 @@ pub mod write {
use super::*;
use crate::classes::write_helper::*;
use crate::error::TransError;
use tokio::io::AsyncWriteExt;
use std::io::Write;
pub async fn write_method<W: AsyncWriteExt + Unpin>(
class: Class,
mut writer: W,
) -> Result<(), TransError> {
pub fn write_method<W: Write>(class: Class, mut writer: W) -> Result<(), TransError> {
match class {
Class::Connection(Connection::Start {
version_major,
@ -1148,12 +1145,12 @@ pub mod write {
mechanisms,
locales,
}) => {
writer.write_all(&[10, 10]).await?;
octet(version_major, &mut writer).await?;
octet(version_minor, &mut writer).await?;
table(server_properties, &mut writer).await?;
longstr(mechanisms, &mut writer).await?;
longstr(locales, &mut writer).await?;
writer.write_all(&[10, 10])?;
octet(version_major, &mut writer)?;
octet(version_minor, &mut writer)?;
table(server_properties, &mut writer)?;
longstr(mechanisms, &mut writer)?;
longstr(locales, &mut writer)?;
}
Class::Connection(Connection::StartOk {
client_properties,
@ -1161,53 +1158,53 @@ pub mod write {
response,
locale,
}) => {
writer.write_all(&[10, 11]).await?;
table(client_properties, &mut writer).await?;
shortstr(mechanism, &mut writer).await?;
longstr(response, &mut writer).await?;
shortstr(locale, &mut writer).await?;
writer.write_all(&[10, 11])?;
table(client_properties, &mut writer)?;
shortstr(mechanism, &mut writer)?;
longstr(response, &mut writer)?;
shortstr(locale, &mut writer)?;
}
Class::Connection(Connection::Secure { challenge }) => {
writer.write_all(&[10, 20]).await?;
longstr(challenge, &mut writer).await?;
writer.write_all(&[10, 20])?;
longstr(challenge, &mut writer)?;
}
Class::Connection(Connection::SecureOk { response }) => {
writer.write_all(&[10, 21]).await?;
longstr(response, &mut writer).await?;
writer.write_all(&[10, 21])?;
longstr(response, &mut writer)?;
}
Class::Connection(Connection::Tune {
channel_max,
frame_max,
heartbeat,
}) => {
writer.write_all(&[10, 30]).await?;
short(channel_max, &mut writer).await?;
long(frame_max, &mut writer).await?;
short(heartbeat, &mut writer).await?;
writer.write_all(&[10, 30])?;
short(channel_max, &mut writer)?;
long(frame_max, &mut writer)?;
short(heartbeat, &mut writer)?;
}
Class::Connection(Connection::TuneOk {
channel_max,
frame_max,
heartbeat,
}) => {
writer.write_all(&[10, 31]).await?;
short(channel_max, &mut writer).await?;
long(frame_max, &mut writer).await?;
short(heartbeat, &mut writer).await?;
writer.write_all(&[10, 31])?;
short(channel_max, &mut writer)?;
long(frame_max, &mut writer)?;
short(heartbeat, &mut writer)?;
}
Class::Connection(Connection::Open {
virtual_host,
reserved_1,
reserved_2,
}) => {
writer.write_all(&[10, 40]).await?;
shortstr(virtual_host, &mut writer).await?;
shortstr(reserved_1, &mut writer).await?;
bit(&[reserved_2], &mut writer).await?;
writer.write_all(&[10, 40])?;
shortstr(virtual_host, &mut writer)?;
shortstr(reserved_1, &mut writer)?;
bit(&[reserved_2], &mut writer)?;
}
Class::Connection(Connection::OpenOk { reserved_1 }) => {
writer.write_all(&[10, 41]).await?;
shortstr(reserved_1, &mut writer).await?;
writer.write_all(&[10, 41])?;
shortstr(reserved_1, &mut writer)?;
}
Class::Connection(Connection::Close {
reply_code,
@ -1215,37 +1212,37 @@ pub mod write {
class_id,
method_id,
}) => {
writer.write_all(&[10, 50]).await?;
short(reply_code, &mut writer).await?;
shortstr(reply_text, &mut writer).await?;
short(class_id, &mut writer).await?;
short(method_id, &mut writer).await?;
writer.write_all(&[10, 50])?;
short(reply_code, &mut writer)?;
shortstr(reply_text, &mut writer)?;
short(class_id, &mut writer)?;
short(method_id, &mut writer)?;
}
Class::Connection(Connection::CloseOk {}) => {
writer.write_all(&[10, 51]).await?;
writer.write_all(&[10, 51])?;
}
Class::Connection(Connection::Blocked { reason }) => {
writer.write_all(&[10, 60]).await?;
shortstr(reason, &mut writer).await?;
writer.write_all(&[10, 60])?;
shortstr(reason, &mut writer)?;
}
Class::Connection(Connection::Unblocked {}) => {
writer.write_all(&[10, 61]).await?;
writer.write_all(&[10, 61])?;
}
Class::Channel(Channel::Open { reserved_1 }) => {
writer.write_all(&[20, 10]).await?;
shortstr(reserved_1, &mut writer).await?;
writer.write_all(&[20, 10])?;
shortstr(reserved_1, &mut writer)?;
}
Class::Channel(Channel::OpenOk { reserved_1 }) => {
writer.write_all(&[20, 11]).await?;
longstr(reserved_1, &mut writer).await?;
writer.write_all(&[20, 11])?;
longstr(reserved_1, &mut writer)?;
}
Class::Channel(Channel::Flow { active }) => {
writer.write_all(&[20, 20]).await?;
bit(&[active], &mut writer).await?;
writer.write_all(&[20, 20])?;
bit(&[active], &mut writer)?;
}
Class::Channel(Channel::FlowOk { active }) => {
writer.write_all(&[20, 21]).await?;
bit(&[active], &mut writer).await?;
writer.write_all(&[20, 21])?;
bit(&[active], &mut writer)?;
}
Class::Channel(Channel::Close {
reply_code,
@ -1253,14 +1250,14 @@ pub mod write {
class_id,
method_id,
}) => {
writer.write_all(&[20, 40]).await?;
short(reply_code, &mut writer).await?;
shortstr(reply_text, &mut writer).await?;
short(class_id, &mut writer).await?;
short(method_id, &mut writer).await?;
writer.write_all(&[20, 40])?;
short(reply_code, &mut writer)?;
shortstr(reply_text, &mut writer)?;
short(class_id, &mut writer)?;
short(method_id, &mut writer)?;
}
Class::Channel(Channel::CloseOk {}) => {
writer.write_all(&[20, 41]).await?;
writer.write_all(&[20, 41])?;
}
Class::Exchange(Exchange::Declare {
reserved_1,
@ -1273,19 +1270,18 @@ pub mod write {
no_wait,
arguments,
}) => {
writer.write_all(&[40, 10]).await?;
short(reserved_1, &mut writer).await?;
shortstr(exchange, &mut writer).await?;
shortstr(r#type, &mut writer).await?;
writer.write_all(&[40, 10])?;
short(reserved_1, &mut writer)?;
shortstr(exchange, &mut writer)?;
shortstr(r#type, &mut writer)?;
bit(
&[passive, durable, reserved_2, reserved_3, no_wait],
&mut writer,
)
.await?;
table(arguments, &mut writer).await?;
)?;
table(arguments, &mut writer)?;
}
Class::Exchange(Exchange::DeclareOk {}) => {
writer.write_all(&[40, 11]).await?;
writer.write_all(&[40, 11])?;
}
Class::Exchange(Exchange::Delete {
reserved_1,
@ -1293,13 +1289,13 @@ pub mod write {
if_unused,
no_wait,
}) => {
writer.write_all(&[40, 20]).await?;
short(reserved_1, &mut writer).await?;
shortstr(exchange, &mut writer).await?;
bit(&[if_unused, no_wait], &mut writer).await?;
writer.write_all(&[40, 20])?;
short(reserved_1, &mut writer)?;
shortstr(exchange, &mut writer)?;
bit(&[if_unused, no_wait], &mut writer)?;
}
Class::Exchange(Exchange::DeleteOk {}) => {
writer.write_all(&[40, 21]).await?;
writer.write_all(&[40, 21])?;
}
Class::Queue(Queue::Declare {
reserved_1,
@ -1311,25 +1307,24 @@ pub mod write {
no_wait,
arguments,
}) => {
writer.write_all(&[50, 10]).await?;
short(reserved_1, &mut writer).await?;
shortstr(queue, &mut writer).await?;
writer.write_all(&[50, 10])?;
short(reserved_1, &mut writer)?;
shortstr(queue, &mut writer)?;
bit(
&[passive, durable, exclusive, auto_delete, no_wait],
&mut writer,
)
.await?;
table(arguments, &mut writer).await?;
)?;
table(arguments, &mut writer)?;
}
Class::Queue(Queue::DeclareOk {
queue,
message_count,
consumer_count,
}) => {
writer.write_all(&[50, 11]).await?;
shortstr(queue, &mut writer).await?;
long(message_count, &mut writer).await?;
long(consumer_count, &mut writer).await?;
writer.write_all(&[50, 11])?;
shortstr(queue, &mut writer)?;
long(message_count, &mut writer)?;
long(consumer_count, &mut writer)?;
}
Class::Queue(Queue::Bind {
reserved_1,
@ -1339,16 +1334,16 @@ pub mod write {
no_wait,
arguments,
}) => {
writer.write_all(&[50, 20]).await?;
short(reserved_1, &mut writer).await?;
shortstr(queue, &mut writer).await?;
shortstr(exchange, &mut writer).await?;
shortstr(routing_key, &mut writer).await?;
bit(&[no_wait], &mut writer).await?;
table(arguments, &mut writer).await?;
writer.write_all(&[50, 20])?;
short(reserved_1, &mut writer)?;
shortstr(queue, &mut writer)?;
shortstr(exchange, &mut writer)?;
shortstr(routing_key, &mut writer)?;
bit(&[no_wait], &mut writer)?;
table(arguments, &mut writer)?;
}
Class::Queue(Queue::BindOk {}) => {
writer.write_all(&[50, 21]).await?;
writer.write_all(&[50, 21])?;
}
Class::Queue(Queue::Unbind {
reserved_1,
@ -1357,29 +1352,29 @@ pub mod write {
routing_key,
arguments,
}) => {
writer.write_all(&[50, 50]).await?;
short(reserved_1, &mut writer).await?;
shortstr(queue, &mut writer).await?;
shortstr(exchange, &mut writer).await?;
shortstr(routing_key, &mut writer).await?;
table(arguments, &mut writer).await?;
writer.write_all(&[50, 50])?;
short(reserved_1, &mut writer)?;
shortstr(queue, &mut writer)?;
shortstr(exchange, &mut writer)?;
shortstr(routing_key, &mut writer)?;
table(arguments, &mut writer)?;
}
Class::Queue(Queue::UnbindOk {}) => {
writer.write_all(&[50, 51]).await?;
writer.write_all(&[50, 51])?;
}
Class::Queue(Queue::Purge {
reserved_1,
queue,
no_wait,
}) => {
writer.write_all(&[50, 30]).await?;
short(reserved_1, &mut writer).await?;
shortstr(queue, &mut writer).await?;
bit(&[no_wait], &mut writer).await?;
writer.write_all(&[50, 30])?;
short(reserved_1, &mut writer)?;
shortstr(queue, &mut writer)?;
bit(&[no_wait], &mut writer)?;
}
Class::Queue(Queue::PurgeOk { message_count }) => {
writer.write_all(&[50, 31]).await?;
long(message_count, &mut writer).await?;
writer.write_all(&[50, 31])?;
long(message_count, &mut writer)?;
}
Class::Queue(Queue::Delete {
reserved_1,
@ -1388,27 +1383,27 @@ pub mod write {
if_empty,
no_wait,
}) => {
writer.write_all(&[50, 40]).await?;
short(reserved_1, &mut writer).await?;
shortstr(queue, &mut writer).await?;
bit(&[if_unused, if_empty, no_wait], &mut writer).await?;
writer.write_all(&[50, 40])?;
short(reserved_1, &mut writer)?;
shortstr(queue, &mut writer)?;
bit(&[if_unused, if_empty, no_wait], &mut writer)?;
}
Class::Queue(Queue::DeleteOk { message_count }) => {
writer.write_all(&[50, 41]).await?;
long(message_count, &mut writer).await?;
writer.write_all(&[50, 41])?;
long(message_count, &mut writer)?;
}
Class::Basic(Basic::Qos {
prefetch_size,
prefetch_count,
global,
}) => {
writer.write_all(&[60, 10]).await?;
long(prefetch_size, &mut writer).await?;
short(prefetch_count, &mut writer).await?;
bit(&[global], &mut writer).await?;
writer.write_all(&[60, 10])?;
long(prefetch_size, &mut writer)?;
short(prefetch_count, &mut writer)?;
bit(&[global], &mut writer)?;
}
Class::Basic(Basic::QosOk {}) => {
writer.write_all(&[60, 11]).await?;
writer.write_all(&[60, 11])?;
}
Class::Basic(Basic::Consume {
reserved_1,
@ -1420,28 +1415,28 @@ pub mod write {
no_wait,
arguments,
}) => {
writer.write_all(&[60, 20]).await?;
short(reserved_1, &mut writer).await?;
shortstr(queue, &mut writer).await?;
shortstr(consumer_tag, &mut writer).await?;
bit(&[no_local, no_ack, exclusive, no_wait], &mut writer).await?;
table(arguments, &mut writer).await?;
writer.write_all(&[60, 20])?;
short(reserved_1, &mut writer)?;
shortstr(queue, &mut writer)?;
shortstr(consumer_tag, &mut writer)?;
bit(&[no_local, no_ack, exclusive, no_wait], &mut writer)?;
table(arguments, &mut writer)?;
}
Class::Basic(Basic::ConsumeOk { consumer_tag }) => {
writer.write_all(&[60, 21]).await?;
shortstr(consumer_tag, &mut writer).await?;
writer.write_all(&[60, 21])?;
shortstr(consumer_tag, &mut writer)?;
}
Class::Basic(Basic::Cancel {
consumer_tag,
no_wait,
}) => {
writer.write_all(&[60, 30]).await?;
shortstr(consumer_tag, &mut writer).await?;
bit(&[no_wait], &mut writer).await?;
writer.write_all(&[60, 30])?;
shortstr(consumer_tag, &mut writer)?;
bit(&[no_wait], &mut writer)?;
}
Class::Basic(Basic::CancelOk { consumer_tag }) => {
writer.write_all(&[60, 31]).await?;
shortstr(consumer_tag, &mut writer).await?;
writer.write_all(&[60, 31])?;
shortstr(consumer_tag, &mut writer)?;
}
Class::Basic(Basic::Publish {
reserved_1,
@ -1450,11 +1445,11 @@ pub mod write {
mandatory,
immediate,
}) => {
writer.write_all(&[60, 40]).await?;
short(reserved_1, &mut writer).await?;
shortstr(exchange, &mut writer).await?;
shortstr(routing_key, &mut writer).await?;
bit(&[mandatory, immediate], &mut writer).await?;
writer.write_all(&[60, 40])?;
short(reserved_1, &mut writer)?;
shortstr(exchange, &mut writer)?;
shortstr(routing_key, &mut writer)?;
bit(&[mandatory, immediate], &mut writer)?;
}
Class::Basic(Basic::Return {
reply_code,
@ -1462,11 +1457,11 @@ pub mod write {
exchange,
routing_key,
}) => {
writer.write_all(&[60, 50]).await?;
short(reply_code, &mut writer).await?;
shortstr(reply_text, &mut writer).await?;
shortstr(exchange, &mut writer).await?;
shortstr(routing_key, &mut writer).await?;
writer.write_all(&[60, 50])?;
short(reply_code, &mut writer)?;
shortstr(reply_text, &mut writer)?;
shortstr(exchange, &mut writer)?;
shortstr(routing_key, &mut writer)?;
}
Class::Basic(Basic::Deliver {
consumer_tag,
@ -1475,22 +1470,22 @@ pub mod write {
exchange,
routing_key,
}) => {
writer.write_all(&[60, 60]).await?;
shortstr(consumer_tag, &mut writer).await?;
longlong(delivery_tag, &mut writer).await?;
bit(&[redelivered], &mut writer).await?;
shortstr(exchange, &mut writer).await?;
shortstr(routing_key, &mut writer).await?;
writer.write_all(&[60, 60])?;
shortstr(consumer_tag, &mut writer)?;
longlong(delivery_tag, &mut writer)?;
bit(&[redelivered], &mut writer)?;
shortstr(exchange, &mut writer)?;
shortstr(routing_key, &mut writer)?;
}
Class::Basic(Basic::Get {
reserved_1,
queue,
no_ack,
}) => {
writer.write_all(&[60, 70]).await?;
short(reserved_1, &mut writer).await?;
shortstr(queue, &mut writer).await?;
bit(&[no_ack], &mut writer).await?;
writer.write_all(&[60, 70])?;
short(reserved_1, &mut writer)?;
shortstr(queue, &mut writer)?;
bit(&[no_ack], &mut writer)?;
}
Class::Basic(Basic::GetOk {
delivery_tag,
@ -1499,61 +1494,61 @@ pub mod write {
routing_key,
message_count,
}) => {
writer.write_all(&[60, 71]).await?;
longlong(delivery_tag, &mut writer).await?;
bit(&[redelivered], &mut writer).await?;
shortstr(exchange, &mut writer).await?;
shortstr(routing_key, &mut writer).await?;
long(message_count, &mut writer).await?;
writer.write_all(&[60, 71])?;
longlong(delivery_tag, &mut writer)?;
bit(&[redelivered], &mut writer)?;
shortstr(exchange, &mut writer)?;
shortstr(routing_key, &mut writer)?;
long(message_count, &mut writer)?;
}
Class::Basic(Basic::GetEmpty { reserved_1 }) => {
writer.write_all(&[60, 72]).await?;
shortstr(reserved_1, &mut writer).await?;
writer.write_all(&[60, 72])?;
shortstr(reserved_1, &mut writer)?;
}
Class::Basic(Basic::Ack {
delivery_tag,
multiple,
}) => {
writer.write_all(&[60, 80]).await?;
longlong(delivery_tag, &mut writer).await?;
bit(&[multiple], &mut writer).await?;
writer.write_all(&[60, 80])?;
longlong(delivery_tag, &mut writer)?;
bit(&[multiple], &mut writer)?;
}
Class::Basic(Basic::Reject {
delivery_tag,
requeue,
}) => {
writer.write_all(&[60, 90]).await?;
longlong(delivery_tag, &mut writer).await?;
bit(&[requeue], &mut writer).await?;
writer.write_all(&[60, 90])?;
longlong(delivery_tag, &mut writer)?;
bit(&[requeue], &mut writer)?;
}
Class::Basic(Basic::RecoverAsync { requeue }) => {
writer.write_all(&[60, 100]).await?;
bit(&[requeue], &mut writer).await?;
writer.write_all(&[60, 100])?;
bit(&[requeue], &mut writer)?;
}
Class::Basic(Basic::Recover { requeue }) => {
writer.write_all(&[60, 110]).await?;
bit(&[requeue], &mut writer).await?;
writer.write_all(&[60, 110])?;
bit(&[requeue], &mut writer)?;
}
Class::Basic(Basic::RecoverOk {}) => {
writer.write_all(&[60, 111]).await?;
writer.write_all(&[60, 111])?;
}
Class::Tx(Tx::Select {}) => {
writer.write_all(&[90, 10]).await?;
writer.write_all(&[90, 10])?;
}
Class::Tx(Tx::SelectOk {}) => {
writer.write_all(&[90, 11]).await?;
writer.write_all(&[90, 11])?;
}
Class::Tx(Tx::Commit {}) => {
writer.write_all(&[90, 20]).await?;
writer.write_all(&[90, 20])?;
}
Class::Tx(Tx::CommitOk {}) => {
writer.write_all(&[90, 21]).await?;
writer.write_all(&[90, 21])?;
}
Class::Tx(Tx::Rollback {}) => {
writer.write_all(&[90, 30]).await?;
writer.write_all(&[90, 30])?;
}
Class::Tx(Tx::RollbackOk {}) => {
writer.write_all(&[90, 31]).await?;
writer.write_all(&[90, 31])?;
}
}
Ok(())

View file

@ -1,32 +1,33 @@
use crate::classes::FieldValue;
use crate::classes::{Bit, Long, Longlong, Longstr, Octet, Short, Shortstr, Table, Timestamp};
use crate::error::Result;
use crate::classes::generated::{
Bit, Long, Longlong, Longstr, Octet, Short, Shortstr, Table, Timestamp,
};
use crate::classes::{FieldValue, TableFieldName};
use crate::error::TransError;
use anyhow::Context;
use std::future::Future;
use std::pin::Pin;
use tokio::io::AsyncWriteExt;
use std::io;
use std::io::Write;
pub async fn octet<W: AsyncWriteExt + Unpin>(value: Octet, writer: &mut W) -> Result<()> {
writer.write_u8(value).await?;
pub fn octet<W: Write>(value: Octet, writer: &mut W) -> Result<(), TransError> {
writer.write_all(&[value])?;
Ok(())
}
pub async fn short<W: AsyncWriteExt + Unpin>(value: Short, writer: &mut W) -> Result<()> {
writer.write_u16(value).await?;
pub fn short<W: Write>(value: Short, writer: &mut W) -> Result<(), TransError> {
writer.write_all(&value.to_be_bytes())?;
Ok(())
}
pub async fn long<W: AsyncWriteExt + Unpin>(value: Long, writer: &mut W) -> Result<()> {
writer.write_u32(value).await?;
pub fn long<W: Write>(value: Long, writer: &mut W) -> Result<(), TransError> {
writer.write_all(&value.to_be_bytes())?;
Ok(())
}
pub async fn longlong<W: AsyncWriteExt + Unpin>(value: Longlong, writer: &mut W) -> Result<()> {
writer.write_u64(value).await?;
pub fn longlong<W: Write>(value: Longlong, writer: &mut W) -> Result<(), TransError> {
writer.write_all(&value.to_be_bytes())?;
Ok(())
}
pub async fn bit<W: AsyncWriteExt + Unpin>(value: &[Bit], writer: &mut W) -> Result<()> {
pub fn bit<W: Write>(value: &[Bit], writer: &mut W) -> Result<(), TransError> {
// accumulate bits into bytes, starting from the least significant bit in each byte
// how many bits have already been packed into `current_buf`
@ -35,7 +36,7 @@ pub async fn bit<W: AsyncWriteExt + Unpin>(value: &[Bit], writer: &mut W) -> Res
for &bit in value {
if already_filled >= 8 {
writer.write_u8(current_buf).await?;
writer.write_all(&[current_buf])?;
current_buf = 0;
already_filled = 0;
}
@ -46,151 +47,146 @@ pub async fn bit<W: AsyncWriteExt + Unpin>(value: &[Bit], writer: &mut W) -> Res
}
if already_filled > 0 {
writer.write_u8(current_buf).await?;
writer.write_all(&[current_buf])?;
}
Ok(())
}
pub async fn shortstr<W: AsyncWriteExt + Unpin>(value: Shortstr, writer: &mut W) -> Result<()> {
pub fn shortstr<W: Write>(value: Shortstr, writer: &mut W) -> Result<(), TransError> {
let len = u8::try_from(value.len()).context("shortstr too long")?;
writer.write_u8(len).await?;
writer.write_all(value.as_bytes()).await?;
writer.write_all(&[len])?;
writer.write_all(value.as_bytes())?;
Ok(())
}
pub async fn longstr<W: AsyncWriteExt + Unpin>(value: Longstr, writer: &mut W) -> Result<()> {
pub fn longstr<W: Write>(value: Longstr, writer: &mut W) -> Result<(), TransError> {
let len = u32::try_from(value.len()).context("longstr too long")?;
writer.write_u32(len).await?;
writer.write_all(value.as_slice()).await?;
writer.write_all(&len.to_be_bytes())?;
writer.write_all(value.as_slice())?;
Ok(())
}
pub async fn timestamp<W: AsyncWriteExt + Unpin>(value: Timestamp, writer: &mut W) -> Result<()> {
writer.write_u64(value).await?;
pub fn timestamp<W: Write>(value: Timestamp, writer: &mut W) -> Result<(), TransError> {
writer.write_all(&value.to_be_bytes())?;
Ok(())
}
pub async fn table<W: AsyncWriteExt + Unpin>(table: Table, writer: &mut W) -> Result<()> {
pub fn table<W: Write>(table: Table, writer: &mut W) -> Result<(), TransError> {
let len = u32::try_from(table.len()).context("table too big")?;
writer.write_u32(len).await?;
writer.write_all(&len.to_be_bytes())?;
for (field_name, value) in table {
shortstr(field_name, writer).await?;
field_value(value, writer).await?;
shortstr(field_name, writer)?;
field_value(value, writer)?;
}
Ok(())
}
fn field_value<W: AsyncWriteExt + Unpin>(
value: FieldValue,
writer: &mut W,
) -> Pin<Box<dyn Future<Output = Result<()>> + '_>> {
Box::pin(async {
fn field_value<W: Write>(value: FieldValue, writer: &mut W) -> Result<(), TransError> {
match value {
FieldValue::Boolean(bool) => {
writer.write_all(&[b't', u8::from(bool)]).await?;
writer.write_all(&[b't', u8::from(bool)])?;
}
FieldValue::ShortShortInt(int) => {
writer.write_all(b"b").await?;
writer.write_all(&int.to_be_bytes()).await?;
writer.write_all(b"b")?;
writer.write_all(&int.to_be_bytes())?;
}
FieldValue::ShortShortUInt(int) => {
writer.write_all(&[b'B', int]).await?;
writer.write_all(&[b'B', int])?;
}
FieldValue::ShortInt(int) => {
writer.write_all(b"U").await?;
writer.write_all(&int.to_be_bytes()).await?;
writer.write_all(b"U")?;
writer.write_all(&int.to_be_bytes())?;
}
FieldValue::ShortUInt(int) => {
writer.write_all(b"u").await?;
writer.write_all(&int.to_be_bytes()).await?;
writer.write_all(b"u")?;
writer.write_all(&int.to_be_bytes())?;
}
FieldValue::LongInt(int) => {
writer.write_all(b"I").await?;
writer.write_all(&int.to_be_bytes()).await?;
writer.write_all(b"I")?;
writer.write_all(&int.to_be_bytes())?;
}
FieldValue::LongUInt(int) => {
writer.write_all(b"i").await?;
writer.write_all(&int.to_be_bytes()).await?;
writer.write_all(b"i")?;
writer.write_all(&int.to_be_bytes())?;
}
FieldValue::LongLongInt(int) => {
writer.write_all(b"L").await?;
writer.write_all(&int.to_be_bytes()).await?;
writer.write_all(b"L")?;
writer.write_all(&int.to_be_bytes())?;
}
FieldValue::LongLongUInt(int) => {
writer.write_all(b"l").await?;
writer.write_all(&int.to_be_bytes()).await?;
writer.write_all(b"l")?;
writer.write_all(&int.to_be_bytes())?;
}
FieldValue::Float(float) => {
writer.write_all(b"f").await?;
writer.write_all(&float.to_be_bytes()).await?;
writer.write_all(b"f")?;
writer.write_all(&float.to_be_bytes())?;
}
FieldValue::Double(float) => {
writer.write_all(b"d").await?;
writer.write_all(&float.to_be_bytes()).await?;
writer.write_all(b"d")?;
writer.write_all(&float.to_be_bytes())?;
}
FieldValue::DecimalValue(scale, long) => {
writer.write_all(&[b'D', scale]).await?;
writer.write_all(&long.to_be_bytes()).await?;
writer.write_all(&[b'D', scale])?;
writer.write_all(&long.to_be_bytes())?;
}
FieldValue::ShortString(str) => {
writer.write_all(b"s").await?;
shortstr(str, writer).await?;
writer.write_all(b"s")?;
shortstr(str, writer)?;
}
FieldValue::LongString(str) => {
writer.write_all(b"S").await?;
longstr(str, writer).await?;
writer.write_all(b"S")?;
longstr(str, writer)?;
}
FieldValue::FieldArray(array) => {
writer.write_all(b"A").await?;
writer.write_all(b"A")?;
let len = u32::try_from(array.len()).context("array too long")?;
writer.write_all(&len.to_be_bytes()).await?;
writer.write_all(&len.to_be_bytes())?;
for element in array {
field_value(element, writer).await?;
field_value(element, writer)?;
}
}
FieldValue::Timestamp(time) => {
writer.write_all(b"T").await?;
writer.write_all(&time.to_be_bytes()).await?;
writer.write_all(b"T")?;
writer.write_all(&time.to_be_bytes())?;
}
FieldValue::FieldTable(_) => {
writer.write_all(b"F").await?;
writer.write_all(b"F")?;
}
FieldValue::Void => {
writer.write_all(b"V").await?;
writer.write_all(b"V")?;
}
}
Ok(())
})
}
#[cfg(test)]
mod tests {
#[tokio::test]
async fn pack_few_bits() {
let bits = vec![true, false, true];
#[test]
fn pack_few_bits() {
let bits = [true, false, true];
let mut buffer = Vec::new();
super::bit(&bits, &mut buffer).await.unwrap();
let mut buffer = [0u8; 1];
super::bit(&bits, &mut buffer.as_mut_slice()).unwrap();
assert_eq!(buffer.as_slice(), &[0b00000101])
assert_eq!(buffer, [0b00000101])
}
#[tokio::test]
async fn pack_many_bits() {
let bits = vec![
#[test]
fn pack_many_bits() {
let bits = [
/* first 8 */
true, true, true, true, false, false, false, false, /* second 4 */
true, false, true, true,
];
let mut buffer = Vec::new();
super::bit(&bits, &mut buffer).await.unwrap();
let mut buffer = [0u8; 2];
super::bit(&bits, &mut buffer.as_mut_slice()).unwrap();
assert_eq!(buffer, [0b00001111, 0b00001101]);
}

View file

@ -51,11 +51,17 @@ impl Connection {
locales: vec![],
});
let fut = classes::write::write_method(start_method, &mut self.stream);
warn!(size = %std::mem::size_of_val(&fut), "that future is big");
// todo fix out_buffer buffer things :spiral_eyes:
// maybe have a `size` method on `Class` and use `AsyncWrite`? oh god no that's horrible
// frame::write_frame(&mut self.stream, Frame {})?;
let mut payload = Vec::with_capacity(64);
classes::write::write_method(start_method, &mut payload)?;
frame::write_frame(
&mut self.stream,
&Frame {
kind: FrameType::Method,
channel: 0,
payload,
},
)
.await?;
Ok(())
}