This commit is contained in:
nora 2022-03-19 21:38:27 +01:00
parent dbc577abbc
commit 504757b324
7 changed files with 52 additions and 29 deletions

30
Cargo.lock generated
View file

@ -17,21 +17,6 @@ dependencies = [
"memchr",
]
[[package]]
name = "amqp"
version = "0.1.0"
dependencies = [
"anyhow",
"clap 3.1.6",
"haesli_core",
"haesli_dashboard",
"haesli_transport",
"tokio",
"tracing",
"tracing-subscriber",
"tracing-tree",
]
[[package]]
name = "ansi_term"
version = "0.12.1"
@ -434,6 +419,21 @@ dependencies = [
"wasi 0.10.2+wasi-snapshot-preview1",
]
[[package]]
name = "haesli"
version = "0.1.0"
dependencies = [
"anyhow",
"clap 3.1.6",
"haesli_core",
"haesli_dashboard",
"haesli_transport",
"tokio",
"tracing",
"tracing-subscriber",
"tracing-tree",
]
[[package]]
name = "haesli_core"
version = "0.1.0"

View file

@ -10,7 +10,7 @@ members = [
]
[package]
name = "amqp"
name = "haesli"
version = "0.1.0"
edition = "2021"

View file

@ -9,6 +9,7 @@ use haesli_core::{
};
use parking_lot::Mutex;
use tokio::sync::mpsc;
use tracing::debug;
use crate::{queue_worker::QueueTask, Result};
@ -25,7 +26,7 @@ pub fn declare(channel: Channel, queue_declare: QueueDeclare) -> Result<Method>
} = queue_declare;
// 2.1.4.1 - If no queue name is given, chose a name
let queue_name = if queue_name.is_empty() {
let queue_name = if !queue_name.is_empty() {
queue_name
} else {
format!("q_{}", haesli_core::random_uuid())
@ -63,6 +64,8 @@ pub fn declare(channel: Channel, queue_declare: QueueDeclare) -> Result<Method>
event_send,
});
debug!(%queue_name, "Creating queue");
{
let mut global_data_lock = global_data.lock();

View file

@ -224,7 +224,9 @@ impl TransportConnection {
}
async fn recv_method(&mut self) -> Result<Method> {
let start_ok_frame = frame::read_frame(&mut self.stream, self.max_frame_size).await?;
let start_ok_frame = frame::read_frame(&mut self.stream, self.max_frame_size)
.await
.context("read from stream, peer disconnected")?;
ensure_conn(start_ok_frame.kind == FrameType::Method)?;
@ -320,7 +322,7 @@ impl TransportConnection {
loop {
select! {
frame = frame::read_frame(&mut self.stream, self.max_frame_size) => {
let frame = frame?;
let frame = frame.context("read from stream, peer disconnected")?;
self.handle_frame(frame).await?;
}
queued_method = self.event_receiver.recv() => {

View file

@ -283,15 +283,15 @@ pub async fn read_frame<R>(r: &mut R, max_frame_size: MaxFrameSize) -> Result<Fr
where
R: AsyncReadExt + Unpin + Send,
{
let kind = r.read_u8().await.context("read type")?;
let channel = r.read_u16().await.context("read channel")?;
let kind = r.read_u8().await?;
let channel = r.read_u16().await?;
let channel = ChannelNum::new(channel);
let size = r.read_u32().await.context("read size")?;
let size = r.read_u32().await?;
let mut payload = vec![0; size.try_into().unwrap()];
r.read_exact(&mut payload).await.context("read payload")?;
r.read_exact(&mut payload).await?;
let frame_end = r.read_u8().await.context("read frame end")?;
let frame_end = r.read_u8().await?;
if frame_end != REQUIRED_FRAME_END {
return Err(ProtocolError::Fatal.into());

View file

@ -0,0 +1,16 @@
import { assert, connectAmqp } from './utils/utils.js';
const connection = await connectAmqp();
const channel = await connection.createChannel();
const reply = await channel.assertQueue('');
assert(reply.messageCount === 0, 'Message found in queue');
assert(reply.consumerCount === 0, 'Consumer listening on queue');
assert(reply.queue !== '', 'Wrong queue name returned');
console.log(`created queue '${reply.queue}'`);
await channel.close();
await connection.close();

View file

@ -1,4 +1,4 @@
use std::{path::Path, process::Command, thread::sleep, time::Duration};
use std::{path::Path, process::Command, thread, time::Duration};
use anyhow::{ensure, Context, Result};
@ -15,17 +15,19 @@ pub fn main() -> Result<()> {
.context("cargo build")?;
ensure!(status.success(), "cargo build failed");
let mut haesli_server = Command::new("target/debug/amqp")
let server_binary = project_root.join("target/debug/haesli");
let mut server_process = Command::new(&server_binary)
.env("RUST_LOG", "trace")
.spawn()
.context("target/debug/amqp run")?;
.context(server_binary.display().to_string())
.context("run server binary")?;
// give it time for startup
sleep(Duration::from_secs(1));
thread::sleep(Duration::from_secs(1));
let test_result = run_js(&test_js_root);
haesli_server.kill().context("killing amqp server")?;
server_process.kill().context("killing amqp server")?;
test_result
}