add channel open support

This commit is contained in:
nora 2022-02-19 23:25:06 +01:00
parent dc8efd4e4e
commit 46cccab748
10 changed files with 1950 additions and 1462 deletions

View file

@ -4,6 +4,8 @@ use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use uuid::Uuid; use uuid::Uuid;
type Handle<T> = Arc<Mutex<T>>;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct GlobalData { pub struct GlobalData {
inner: Arc<Mutex<GlobalDataInner>>, inner: Arc<Mutex<GlobalDataInner>>,
@ -14,6 +16,7 @@ impl Default for GlobalData {
Self { Self {
inner: Arc::new(Mutex::new(GlobalDataInner { inner: Arc::new(Mutex::new(GlobalDataInner {
connections: HashMap::new(), connections: HashMap::new(),
channels: HashMap::new(),
})), })),
} }
} }
@ -28,15 +31,17 @@ impl GlobalData {
#[derive(Debug)] #[derive(Debug)]
pub struct GlobalDataInner { pub struct GlobalDataInner {
pub connections: HashMap<Uuid, ConnectionHandle>, pub connections: HashMap<Uuid, ConnectionHandle>,
pub channels: HashMap<Uuid, ChannelHandle>,
} }
pub type ConnectionHandle = Arc<Mutex<Connection>>; pub type ConnectionHandle = Handle<Connection>;
#[derive(Debug)] #[derive(Debug)]
pub struct Connection { pub struct Connection {
pub id: Uuid, pub id: Uuid,
pub peer_addr: SocketAddr, pub peer_addr: SocketAddr,
pub global_data: GlobalData, pub global_data: GlobalData,
pub channels: HashMap<u16, ChannelHandle>,
} }
impl Connection { impl Connection {
@ -49,6 +54,7 @@ impl Connection {
id, id,
peer_addr, peer_addr,
global_data, global_data,
channels: HashMap::new(),
})) }))
} }
@ -57,3 +63,34 @@ impl Connection {
global_data.connections.remove(&self.id); global_data.connections.remove(&self.id);
} }
} }
pub type ChannelHandle = Handle<Channel>;
#[derive(Debug)]
pub struct Channel {
pub id: Uuid,
pub num: u16,
pub connection: ConnectionHandle,
pub global_data: GlobalData,
}
impl Channel {
pub fn new_handle(
id: Uuid,
num: u16,
connection: ConnectionHandle,
global_data: GlobalData,
) -> ChannelHandle {
Arc::new(Mutex::new(Self {
id,
num,
connection,
global_data,
}))
}
pub fn close(&self) {
let mut global_data = self.global_data.lock();
global_data.channels.remove(&self.id);
}
}

View file

@ -0,0 +1,3 @@
{
"singleQuote": true
}

View file

@ -1,19 +1,20 @@
<!doctype html> <!DOCTYPE html>
<html lang="en"> <html lang="en">
<head> <head>
<meta charset="UTF-8"> <meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, user-scalable=no, initial-scale=1.0, maximum-scale=1.0, minimum-scale=1.0"> <meta
<meta http-equiv="X-UA-Compatible" content="ie=edge"> name="viewport"
content="width=device-width, user-scalable=no, initial-scale=1.0, maximum-scale=1.0, minimum-scale=1.0"
/>
<meta http-equiv="X-UA-Compatible" content="ie=edge" />
<title>AMQP Data</title> <title>AMQP Data</title>
<link rel="stylesheet" href="style.css"> <link rel="stylesheet" href="style.css" />
</head> </head>
<body> <body>
<h1>AMQP Data</h1>
<h2>Connections</h2>
<div id="connection-wrapper"></div>
<h1>AMQP Data</h1> <script src="script.js"></script>
<h2>Connections</h2> </body>
<div id="connection-wrapper"> </html>
</div>
<script src="script.js"></script>
</body>
</html>

View file

@ -1,41 +1,48 @@
const renderTable = (colNames, rows) => { const renderTable = (colNames, rows) => {
const table = document.createElement("table"); const table = document.createElement('table');
const headerRow = document.createElement("tr"); const headerRow = document.createElement('tr');
colNames.forEach((name) => { colNames.forEach((name) => {
const th = document.createElement("th"); const th = document.createElement('th');
th.innerText = name; th.innerText = name;
headerRow.append(th); headerRow.append(th);
});
table.append(headerRow);
rows.forEach((row) => {
const contentRow = document.createElement('tr');
row.forEach((cell) => {
const td = document.createElement('td');
td.innerText = cell;
contentRow.append(td);
}); });
table.append(headerRow); table.append(contentRow);
});
rows.forEach((row) => { return table;
const contentRow = document.createElement("tr"); };
row.forEach((cell) => {
const td = document.createElement("td");
td.innerText = cell;
contentRow.append(td);
});
table.append(contentRow);
})
return table;
}
const renderConnections = (connections) => { const renderConnections = (connections) => {
const wrapper = document.getElementById("connection-wrapper"); const wrapper = document.getElementById('connection-wrapper');
const table = renderTable(['Connection ID', 'Client Address'], connections.map((conn) => const table = renderTable(
[conn.id, conn.peer_addr])); ['Connection ID', 'Client Address', 'Channels'],
wrapper.replaceChildren(table) connections.map((conn) => {
} const channels = conn.channels
.map((chan) => `${chan.number} - ${chan.id}`)
.join('\n');
return [conn.id, conn.peer_addr, channels];
})
);
wrapper.replaceChildren(table);
};
const refresh = async () => { const refresh = async () => {
const fetched = await fetch('http://localhost:3000/api/data'); const fetched = await fetch('http://localhost:3000/api/data');
const data = await fetched.json(); const data = await fetched.json();
renderConnections(data.connections); renderConnections(data.connections);
} };
setInterval(refresh, 1000); setInterval(refresh, 1000);

View file

@ -1,10 +1,12 @@
html { html {
font-family: arial, sans-serif; font-family: arial, sans-serif;
margin: 10px; margin: 10px;
} }
table, th, td { table,
border: 1px solid black; th,
border-collapse: collapse; td {
padding: 10px; border: 1px solid black;
} border-collapse: collapse;
padding: 10px;
}

View file

@ -55,6 +55,13 @@ struct Data {
struct Connection { struct Connection {
id: String, id: String,
peer_addr: String, peer_addr: String,
channels: Vec<Channel>,
}
#[derive(Serialize)]
struct Channel {
id: String,
number: u16,
} }
async fn get_data(global_data: GlobalData) -> impl IntoResponse { async fn get_data(global_data: GlobalData) -> impl IntoResponse {
@ -68,6 +75,17 @@ async fn get_data(global_data: GlobalData) -> impl IntoResponse {
Connection { Connection {
id: conn.id.to_string(), id: conn.id.to_string(),
peer_addr: conn.peer_addr.to_string(), peer_addr: conn.peer_addr.to_string(),
channels: conn
.channels
.values()
.map(|chan| {
let chan = chan.lock();
Channel {
id: chan.id.to_string(),
number: chan.num,
}
})
.collect(),
} }
}) })
.collect(); .collect();

File diff suppressed because it is too large Load diff

View file

@ -1,12 +1,18 @@
use crate::classes::Class;
use crate::error::{ConException, ProtocolError, Result}; use crate::error::{ConException, ProtocolError, Result};
use crate::frame::{Frame, FrameType}; use crate::frame::{Frame, FrameType};
use crate::{classes, frame, sasl}; use crate::{classes, frame, sasl};
use amqp_core::GlobalData;
use anyhow::Context; use anyhow::Context;
use std::collections::HashMap; use std::collections::HashMap;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::pin::Pin;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tracing::{debug, error, info}; use tokio::time;
use tracing::{debug, error, info, warn};
use uuid::Uuid;
fn ensure_conn(condition: bool) -> Result<()> { fn ensure_conn(condition: bool) -> Result<()> {
if condition { if condition {
@ -21,22 +27,42 @@ const CHANNEL_MAX: u16 = 0;
const FRAME_SIZE_MAX: u32 = 0; const FRAME_SIZE_MAX: u32 = 0;
const HEARTBEAT_DELAY: u16 = 0; const HEARTBEAT_DELAY: u16 = 0;
pub struct Channel {
num: u16,
channel_handle: amqp_core::ChannelHandle,
}
pub struct Connection { pub struct Connection {
id: Uuid,
stream: TcpStream, stream: TcpStream,
max_frame_size: usize, max_frame_size: usize,
heartbeat_delay: u16, heartbeat_delay: u16,
channel_max: u16, channel_max: u16,
next_timeout: Pin<Box<time::Sleep>>,
channels: HashMap<u16, Channel>,
connection_handle: amqp_core::ConnectionHandle, connection_handle: amqp_core::ConnectionHandle,
global_data: GlobalData,
} }
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
impl Connection { impl Connection {
pub fn new(stream: TcpStream, connection_handle: amqp_core::ConnectionHandle) -> Self { pub fn new(
id: Uuid,
stream: TcpStream,
connection_handle: amqp_core::ConnectionHandle,
global_data: GlobalData,
) -> Self {
Self { Self {
id,
stream, stream,
max_frame_size: FRAME_SIZE_MIN_MAX, max_frame_size: FRAME_SIZE_MIN_MAX,
heartbeat_delay: HEARTBEAT_DELAY, heartbeat_delay: HEARTBEAT_DELAY,
channel_max: CHANNEL_MAX, channel_max: CHANNEL_MAX,
next_timeout: Box::pin(time::sleep(DEFAULT_TIMEOUT)),
connection_handle, connection_handle,
channels: HashMap::new(),
global_data,
} }
} }
@ -58,10 +84,7 @@ impl Connection {
info!("Connection is ready for usage!"); info!("Connection is ready for usage!");
loop { self.main_loop().await
let method = self.recv_method().await?;
debug!(?method, "Received method");
}
} }
async fn send_method(&mut self, channel: u16, method: classes::Class) -> Result<()> { async fn send_method(&mut self, channel: u16, method: classes::Class) -> Result<()> {
@ -146,6 +169,7 @@ impl Connection {
self.channel_max = channel_max; self.channel_max = channel_max;
self.max_frame_size = usize::try_from(frame_max).unwrap(); self.max_frame_size = usize::try_from(frame_max).unwrap();
self.heartbeat_delay = heartbeat; self.heartbeat_delay = heartbeat;
self.reset_timeout();
} }
Ok(()) Ok(())
@ -170,6 +194,103 @@ impl Connection {
Ok(()) Ok(())
} }
async fn main_loop(&mut self) -> Result<()> {
loop {
tokio::select! {
frame = frame::read_frame(&mut self.stream, self.max_frame_size) => {
debug!(?frame);
let frame = frame?;
self.reset_timeout();
match frame.kind {
FrameType::Method => self.dispatch_method(frame).await?,
FrameType::Heartbeat => {}
_ => warn!(frame_type = ?frame.kind, "TODO"),
}
}
_ = &mut self.next_timeout => {
if self.heartbeat_delay != 0 {
return Err(ProtocolError::CloseNow.into());
}
}
}
}
}
async fn dispatch_method(&mut self, frame: Frame) -> Result<()> {
let method = classes::parse_method(&frame.payload)?;
debug!(?method, "Received method");
match method {
classes::Class::Connection(classes::Connection::Close { .. }) => {
// todo: handle closing
}
classes::Class::Channel(classes::Channel::Open { .. }) => {
self.channel_open(frame.channel).await?
}
_ => {
// we don't handle this here, forward it to *somewhere*
}
}
Ok(())
}
async fn channel_open(&mut self, num: u16) -> Result<()> {
let id = Uuid::from_bytes(rand::random());
let channel_handle = amqp_core::Channel::new_handle(
id,
num,
self.connection_handle.clone(),
self.global_data.clone(),
);
let channel = Channel {
num,
channel_handle: channel_handle.clone(),
};
let prev = self.channels.insert(num, channel);
if let Some(prev) = prev {
self.channels.insert(num, prev); // restore previous state
return Err(ConException::ChannelError.into_trans());
}
{
let mut global_data = self.global_data.lock();
global_data.channels.insert(id, channel_handle.clone());
global_data
.connections
.get_mut(&self.id)
.unwrap()
.lock()
.channels
.insert(num, channel_handle);
}
info!(%num, "Opened new channel");
self.send_method(
num,
Class::Channel(classes::Channel::OpenOk {
reserved_1: Vec::new(),
}),
)
.await?;
time::sleep(Duration::from_secs(1000)).await; // for debugging the dashboard
Ok(())
}
fn reset_timeout(&mut self) {
if self.heartbeat_delay != 0 {
let next = Duration::from_secs(u64::from(self.heartbeat_delay));
self.next_timeout = Box::pin(time::sleep(next));
}
}
async fn negotiate_version(&mut self) -> Result<()> { async fn negotiate_version(&mut self) -> Result<()> {
const HEADER_SIZE: usize = 8; const HEADER_SIZE: usize = 8;
const SUPPORTED_PROTOCOL_VERSION: &[u8] = &[0, 9, 1]; const SUPPORTED_PROTOCOL_VERSION: &[u8] = &[0, 9, 1];

View file

@ -31,12 +31,12 @@ pub async fn do_thing_i_guess(global_data: GlobalData) -> Result<()> {
let connection_handle = let connection_handle =
amqp_core::Connection::new_handle(id, peer_addr, global_data.clone()); amqp_core::Connection::new_handle(id, peer_addr, global_data.clone());
let mut global_data = global_data.lock(); let mut global_data_guard = global_data.lock();
global_data global_data_guard
.connections .connections
.insert(id, connection_handle.clone()); .insert(id, connection_handle.clone());
let connection = Connection::new(stream, connection_handle); let connection = Connection::new(id, stream, connection_handle, global_data.clone());
tokio::spawn(connection.start_connection_processing().instrument(span)); tokio::spawn(connection.start_connection_processing().instrument(span));
} }

View file

@ -1,4 +1,4 @@
//! Partial implementation of the SASL Authentication (see [RFC 4422](https://datatracker.ietf.org/doc/html/rfc4422)) //! (Very) partial implementation of SASL Authentication (see [RFC 4422](https://datatracker.ietf.org/doc/html/rfc4422))
//! //!
//! Currently only supports PLAN (see [RFC 4616](https://datatracker.ietf.org/doc/html/rfc4616)) //! Currently only supports PLAN (see [RFC 4616](https://datatracker.ietf.org/doc/html/rfc4616))