This commit is contained in:
nora 2022-03-04 22:15:19 +01:00
parent 93ce632b5d
commit 4346db648f
24 changed files with 224 additions and 209 deletions

View file

@ -1,29 +1,34 @@
use crate::error::{ConException, ProtocolError, Result, TransError};
use crate::frame::{parse_content_header, Frame, FrameType};
use crate::{frame, methods, sasl};
use amqp_core::connection::{
ChannelHandle, ChannelNum, ConnectionHandle, ConnectionId, ContentHeader, MethodReceiver,
MethodSender, QueuedMethod,
use crate::{
error::{ConException, ProtocolError, Result, TransError},
frame,
frame::{parse_content_header, Frame, FrameType},
methods, sasl,
};
use amqp_core::message::{MessageId, RawMessage, RoutingInformation};
use amqp_core::methods::{
BasicPublish, ChannelClose, ChannelCloseOk, ChannelOpenOk, ConnectionClose, ConnectionCloseOk,
ConnectionOpen, ConnectionOpenOk, ConnectionStart, ConnectionStartOk, ConnectionTune,
ConnectionTuneOk, FieldValue, Method, Table,
use amqp_core::{
amqp_todo,
connection::{
Channel, ChannelNum, ConEventReceiver, ConEventSender, Connection, ConnectionId,
ContentHeader, QueuedMethod,
},
message::{MessageId, RawMessage, RoutingInformation},
methods::{
BasicPublish, ChannelClose, ChannelCloseOk, ChannelOpenOk, ConnectionClose,
ConnectionCloseOk, ConnectionOpen, ConnectionOpenOk, ConnectionStart, ConnectionStartOk,
ConnectionTune, ConnectionTuneOk, FieldValue, Method, Table,
},
GlobalData,
};
use amqp_core::{amqp_todo, GlobalData};
use anyhow::Context;
use bytes::Bytes;
use smallvec::SmallVec;
use std::cmp::Ordering;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::{select, time};
use std::{
cmp::Ordering, collections::HashMap, net::SocketAddr, pin::Pin, sync::Arc, time::Duration,
};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream,
select, time,
};
use tracing::{debug, error, info, trace, warn};
fn ensure_conn(condition: bool) -> Result<()> {
@ -39,16 +44,16 @@ const CHANNEL_MAX: u16 = 0;
const FRAME_SIZE_MAX: u32 = 0;
const HEARTBEAT_DELAY: u16 = 0;
const BASIC_CLASS_ID: u16 = 60;
const BASIC_CLASS_ID: ChannelNum = ChannelNum::new(60);
pub struct Channel {
pub struct TransportChannel {
/// A handle to the global channel representation. Used to remove the channel when it's dropped
handle: ChannelHandle,
global_chan: Arc<Channel>,
/// The current status of the channel, whether it has sent a method that expects a body
status: ChannelStatus,
}
pub struct Connection {
pub struct TransportConnection {
id: ConnectionId,
stream: TcpStream,
max_frame_size: usize,
@ -56,20 +61,19 @@ pub struct Connection {
channel_max: u16,
/// When the next heartbeat expires
next_timeout: Pin<Box<time::Sleep>>,
channels: HashMap<ChannelNum, Channel>,
handle: ConnectionHandle,
channels: HashMap<ChannelNum, TransportChannel>,
global_con: Arc<Connection>,
global_data: GlobalData,
method_queue_send: MethodSender,
method_queue_recv: MethodReceiver,
method_queue_send: ConEventSender,
method_queue_recv: ConEventReceiver,
}
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
enum ChannelStatus {
Default,
/// ClassId // todo: newtype it
NeedHeader(u16, Box<Method>),
NeedHeader(ChannelNum, Box<Method>),
NeedsBody(Box<Method>, ContentHeader, SmallVec<[Bytes; 1]>),
}
@ -79,14 +83,14 @@ impl ChannelStatus {
}
}
impl Connection {
impl TransportConnection {
pub fn new(
id: ConnectionId,
stream: TcpStream,
connection_handle: ConnectionHandle,
connection_handle: Arc<GConnection>,
global_data: GlobalData,
method_queue_send: MethodSender,
method_queue_recv: MethodReceiver,
method_queue_send: ConEventSender,
method_queue_recv: ConEventReceiver,
) -> Self {
Self {
id,
@ -95,11 +99,11 @@ impl Connection {
heartbeat_delay: HEARTBEAT_DELAY,
channel_max: CHANNEL_MAX,
next_timeout: Box::pin(time::sleep(DEFAULT_TIMEOUT)),
handle: connection_handle,
global_con: connection_handle,
channels: HashMap::with_capacity(4),
global_data,
method_queue_send,
method_queue_recv: method_queue_recv,
method_queue_recv,
}
}
@ -140,7 +144,7 @@ impl Connection {
Err(err) => error!(%err, "Error during processing of connection"),
}
let connection_handle = self.handle.lock();
let connection_handle = self.global_con.lock();
connection_handle.close();
}
@ -364,7 +368,7 @@ impl Connection {
.channels
.get(&frame.channel)
.ok_or(ConException::Todo)?
.handle
.global_chan
.clone();
// call into amqp_messaging to handle the method
@ -470,7 +474,7 @@ impl Connection {
// Spawn the handler for the publish. The connection task goes back to handling
// just the connection.
tokio::spawn(amqp_messaging::methods::handle_basic_publish(
channel.handle.clone(),
channel.global_chan.clone(),
message,
));
Ok(())
@ -481,16 +485,16 @@ impl Connection {
async fn channel_open(&mut self, channel_num: ChannelNum) -> Result<()> {
let id = rand::random();
let channel_handle = amqp_core::connection::Channel::new_handle(
let channel_handle = amqp_core::connection::c::new_handle(
id,
channel_num,
self.handle.clone(),
self.global_con.clone(),
self.global_data.clone(),
self.method_queue_send.clone(),
);
let channel = Channel {
handle: channel_handle.clone(),
let channel = TransportChannel {
global_chan: channel_handle.clone(),
status: ChannelStatus::Default,
};
@ -597,15 +601,15 @@ impl Connection {
}
}
impl Drop for Connection {
impl Drop for TransportConnection {
fn drop(&mut self) {
self.handle.lock().close();
self.global_con.lock().close();
}
}
impl Drop for Channel {
impl Drop for TransportChannel {
fn drop(&mut self) {
self.handle.lock().close();
self.global_chan.lock().close();
}
}