fix some things

This commit is contained in:
nora 2022-02-21 21:09:04 +01:00
parent b67c722c19
commit 6f5fef2f23
9 changed files with 55 additions and 55 deletions

27
Cargo.lock generated
View file

@ -50,6 +50,7 @@ name = "amqp_messaging"
version = "0.1.0"
dependencies = [
"amqp_core",
"tokio",
"tracing",
]
@ -565,9 +566,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
version = "0.2.117"
version = "0.2.119"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e74d72e0f9b65b5b4ca49a346af3976df0f9c61d550727f349ecd559f251a26c"
checksum = "1bf2e165bb3457c8e098ea76f3e3bc9db55f87aa90d52d0e6be741470916aaa4"
[[package]]
name = "lock_api"
@ -670,9 +671,9 @@ dependencies = [
[[package]]
name = "ntapi"
version = "0.3.6"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f6bb902e437b6d86e03cce10a7e2af662292c5dfef23b65899ea3ac9354ad44"
checksum = "c28774a7fd2fbb4f0babd8237ce554b73af68021b5f695a3cebd6c59bac0980f"
dependencies = [
"winapi",
]
@ -856,14 +857,13 @@ dependencies = [
[[package]]
name = "rand"
version = "0.8.4"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e7573632e6454cf6b99d7aac4ccca54be06da05aca2ef7423d22d27d4d4bcd8"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
"rand_chacha",
"rand_core",
"rand_hc",
]
[[package]]
@ -885,15 +885,6 @@ dependencies = [
"getrandom",
]
[[package]]
name = "rand_hc"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d51e9f596de227fda2ea6c84607f5558e196eeaf43c986b724ba4fb8fdf497e7"
dependencies = [
"rand_core",
]
[[package]]
name = "rayon"
version = "1.5.1"
@ -1340,9 +1331,9 @@ dependencies = [
[[package]]
name = "tracing-subscriber"
version = "0.3.8"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74786ce43333fcf51efe947aed9718fbe46d5c7328ec3f1029e818083966d9aa"
checksum = "9e0ab7bdc962035a87fba73f3acca9b8a8d0034c2e6f60b84aeaaddddc155dce"
dependencies = [
"ansi_term",
"lazy_static",

View file

@ -13,7 +13,6 @@ anyhow = "1.0.53"
amqp_core = { path = "./amqp_core" }
amqp_dashboard = { path = "./amqp_dashboard" }
amqp_transport = { path = "./amqp_transport" }
tokio = { version = "1.16.1", features = ["full", "tracing"] }
tokio = { version = "1.16.1", features = ["full"] }
tracing = "0.1.30"
tracing-subscriber = { version = "0.3.8", features = ["env-filter"] }
console-subscriber = "0.1.3"

View file

@ -16,7 +16,7 @@ rand = "0.8.4"
regex = "1.5.4"
smallvec = { version = "1.8.0", features = ["union"] }
thiserror = "1.0.30"
tokio = { version = "1.16.1", features = ["full", "tracing"] }
tokio = { version = "1.16.1", features = ["full"] }
tracing = "0.1.30"
uuid = "0.8.2"

View file

@ -252,7 +252,7 @@ impl Connection {
}
}
async fn dispatch_method(&mut self, frame: Frame) -> Result<()> {
async fn dispatch_method(&mut self, frame: Frame) -> Result<WaitForBodyStatus> {
let method = methods::parse_method(&frame.payload)?;
debug!(?method, "Received method");
@ -373,15 +373,14 @@ impl Connection {
let version = &read_header_buf[5..8];
self.stream
.write_all(OWN_PROTOCOL_HEADER)
.await
.context("write protocol header")?;
if &read_header_buf[0..5] == b"AMQP\0" && version == SUPPORTED_PROTOCOL_VERSION {
debug!(?version, "Version negotiation successful");
Ok(())
} else {
self.stream
.write_all(OWN_PROTOCOL_HEADER)
.await
.context("write protocol header")?;
debug!(?version, expected_version = ?SUPPORTED_PROTOCOL_VERSION, "Version negotiation failed, unsupported version");
Err(ProtocolError::CloseNow.into())
}

View file

@ -38,8 +38,6 @@ pub async fn do_thing_i_guess(global_data: GlobalData) -> Result<()> {
let connection = Connection::new(id, stream, connection_handle, global_data.clone());
tokio::task::Builder::new()
.name(&format!("connection {id}"))
.spawn(connection.start_connection_processing().instrument(span));
tokio::spawn(connection.start_connection_processing().instrument(span));
}
}

View file

@ -2,49 +2,47 @@
use anyhow::Result;
use std::env;
use tracing::{info_span, Instrument};
use tracing::{info, info_span, Instrument};
#[tokio::main]
async fn main() -> Result<()> {
let mut dashboard = false;
let mut console = false;
for arg in env::args().skip(1) {
match arg.as_str() {
"--dashboard" => dashboard = true,
"--console" => console = true,
"ignore-this-clippy" => eprintln!("yes please"),
_ => {}
}
}
setup_tracing(console);
setup_tracing();
let global_data = amqp_core::GlobalData::default();
if dashboard {
let dashboard_span = info_span!("dashboard");
tokio::task::Builder::new()
.name("dashboard")
.spawn(amqp_dashboard::dashboard(global_data.clone()).instrument(dashboard_span));
tokio::spawn(amqp_dashboard::dashboard(global_data.clone()).instrument(dashboard_span));
}
amqp_transport::do_thing_i_guess(global_data).await
}
fn setup_tracing(console: bool) {
if console {
console_subscriber::init();
fn setup_tracing() {
let rust_log = std::env::var("RUST_LOG");
const DEFAULT_LOG: &str = "hyper=info,debug";
tracing_subscriber::fmt()
.with_level(true)
.with_timer(tracing_subscriber::fmt::time::time())
.with_ansi(true)
.with_thread_names(true)
.with_env_filter(rust_log.clone().unwrap_or_else(|_| DEFAULT_LOG.to_string()))
.init();
if let Ok(rust_log) = rust_log {
info!(%rust_log, "Using custom log level");
} else {
tracing_subscriber::fmt()
.with_level(true)
.with_timer(tracing_subscriber::fmt::time::time())
.with_ansi(true)
.with_thread_names(true)
.with_env_filter(
std::env::var("RUST_LOG")
.unwrap_or_else(|_| "hyper=info,tokio=trace,runtime=trace,debug".to_string()),
)
.init();
info!(%DEFAULT_LOG, "Using default log level");
}
}

View file

@ -1,7 +1,6 @@
import { connect } from 'amqplib';
import { sleep } from './utils/utils.js';
import {connectAmqp, sleep} from './utils/utils.js';
const connection = await connect('amqp://localhost');
const connection = await connectAmqp();
const channel = await connection.createChannel();

View file

@ -1,6 +1,6 @@
import { connect } from 'amqplib';
import {connectAmqp} from "./utils/utils.js";
const connection = await connect('amqp://localhost');
const connection = await connectAmqp();
const channel = await connection.createChannel();

View file

@ -1 +1,17 @@
import {connect} from "amqplib";
export const sleep = (ms) => new Promise((res) => setTimeout(res, ms));
export const connectAmqp = async () => {
return connect({
protocol: 'amqp',
hostname: 'localhost',
port: 5672,
username: 'admin',
password: '',
frameMax: 238556565673829,
}, {
});
}