mirror of
https://github.com/Noratrieb/haesli.git
synced 2026-01-16 20:55:03 +01:00
big future
This commit is contained in:
parent
2aeb588ab3
commit
fcf531df43
8 changed files with 1405 additions and 1297 deletions
|
|
@ -1,32 +1,39 @@
|
|||
use crate::error::{ProtocolError, TransError};
|
||||
use crate::frame;
|
||||
use crate::frame::FrameType;
|
||||
use crate::error::{ProtocolError, Result};
|
||||
use crate::frame::{Frame, FrameType};
|
||||
use crate::{classes, frame};
|
||||
use anyhow::Context;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::net::TcpStream;
|
||||
use tracing::{debug, error};
|
||||
use tracing::{debug, error, warn};
|
||||
|
||||
const MIN_MAX_FRAME_SIZE: usize = 4096;
|
||||
|
||||
pub struct Connection {
|
||||
stream: TcpStream,
|
||||
max_frame_size: usize,
|
||||
}
|
||||
|
||||
impl Connection {
|
||||
pub fn new(stream: TcpStream) -> Self {
|
||||
Self { stream }
|
||||
Self {
|
||||
stream,
|
||||
max_frame_size: MIN_MAX_FRAME_SIZE,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn start(mut self) {
|
||||
pub async fn open_connection(mut self) {
|
||||
match self.run().await {
|
||||
Ok(()) => {}
|
||||
Err(err) => error!(%err, "Error during processing of connection"),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn run(&mut self) -> Result<(), TransError> {
|
||||
pub async fn run(&mut self) -> Result<()> {
|
||||
self.negotiate_version().await?;
|
||||
self.start().await?;
|
||||
|
||||
loop {
|
||||
let frame = frame::read_frame(&mut self.stream, 10000).await?;
|
||||
let frame = frame::read_frame(&mut self.stream, self.max_frame_size).await?;
|
||||
debug!(?frame, "received frame");
|
||||
if frame.kind == FrameType::Method {
|
||||
let class = super::classes::parse_method(&frame.payload)?;
|
||||
|
|
@ -35,7 +42,25 @@ impl Connection {
|
|||
}
|
||||
}
|
||||
|
||||
async fn negotiate_version(&mut self) -> Result<(), TransError> {
|
||||
async fn start(&mut self) -> Result<()> {
|
||||
let start_method = classes::Class::Connection(classes::Connection::Start {
|
||||
version_major: 0,
|
||||
version_minor: 9,
|
||||
server_properties: Default::default(),
|
||||
mechanisms: vec![],
|
||||
locales: vec![],
|
||||
});
|
||||
|
||||
let fut = classes::write::write_method(start_method, &mut self.stream);
|
||||
warn!(size = %std::mem::size_of_val(&fut), "that future is big");
|
||||
// todo fix out_buffer buffer things :spiral_eyes:
|
||||
// maybe have a `size` method on `Class` and use `AsyncWrite`? oh god no that's horrible
|
||||
// frame::write_frame(&mut self.stream, Frame {})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn negotiate_version(&mut self) -> Result<()> {
|
||||
const HEADER_SIZE: usize = 8;
|
||||
const SUPPORTED_PROTOCOL_VERSION: &[u8] = &[0, 9, 1];
|
||||
const OWN_PROTOCOL_HEADER: &[u8] = b"AMQP\0\0\x09\x01";
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue