mirror of
https://github.com/Noratrieb/haesli.git
synced 2026-01-14 19:55:03 +01:00
return exchanges from dashboard
This commit is contained in:
parent
40f25d1db7
commit
d77c31aaad
2 changed files with 71 additions and 2 deletions
|
|
@ -1,4 +1,9 @@
|
||||||
use std::{borrow::Borrow, collections::HashMap, sync::Arc};
|
use std::{
|
||||||
|
borrow::Borrow,
|
||||||
|
collections::HashMap,
|
||||||
|
fmt::{Display, Formatter},
|
||||||
|
sync::Arc,
|
||||||
|
};
|
||||||
|
|
||||||
use crate::{newtype, Queue};
|
use crate::{newtype, Queue};
|
||||||
|
|
||||||
|
|
@ -9,6 +14,16 @@ pub enum TopicSegment {
|
||||||
MultiWildcard,
|
MultiWildcard,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Display for TopicSegment {
|
||||||
|
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||||
|
match self {
|
||||||
|
Self::Word(str) => str.fmt(f),
|
||||||
|
Self::SingleWildcard => f.write_str("*"),
|
||||||
|
Self::MultiWildcard => f.write_str("#"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum ExchangeType {
|
pub enum ExchangeType {
|
||||||
/// Routes a message to a queue if the routing-keys are equal
|
/// Routes a message to a queue if the routing-keys are equal
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,7 @@ use axum::{
|
||||||
routing::{get, get_service},
|
routing::{get, get_service},
|
||||||
Json, Router,
|
Json, Router,
|
||||||
};
|
};
|
||||||
use haesli_core::GlobalData;
|
use haesli_core::{exchange::ExchangeType, GlobalData};
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use tower_http::cors::{Any, CorsLayer};
|
use tower_http::cors::{Any, CorsLayer};
|
||||||
use tracing::{error, info};
|
use tracing::{error, info};
|
||||||
|
|
@ -56,6 +56,7 @@ pub async fn dashboard(global_data: GlobalData) -> anyhow::Result<()> {
|
||||||
struct Data {
|
struct Data {
|
||||||
connections: Vec<Connection>,
|
connections: Vec<Connection>,
|
||||||
queues: Vec<Queue>,
|
queues: Vec<Queue>,
|
||||||
|
exchanges: Vec<Exchange>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize)]
|
#[derive(Serialize)]
|
||||||
|
|
@ -80,6 +81,20 @@ struct Queue {
|
||||||
messages: usize,
|
messages: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize)]
|
||||||
|
struct Exchange {
|
||||||
|
name: String,
|
||||||
|
durable: bool,
|
||||||
|
bindings: Vec<Binding>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
struct Binding {
|
||||||
|
queue: String,
|
||||||
|
routing_key: String,
|
||||||
|
}
|
||||||
|
|
||||||
async fn get_data(global_data: GlobalData) -> impl IntoResponse {
|
async fn get_data(global_data: GlobalData) -> impl IntoResponse {
|
||||||
let global_data = global_data.lock();
|
let global_data = global_data.lock();
|
||||||
|
|
||||||
|
|
@ -112,10 +127,49 @@ async fn get_data(global_data: GlobalData) -> impl IntoResponse {
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
|
let exchanges = global_data.exchanges.values().map(map_exchange).collect();
|
||||||
|
|
||||||
let data = Data {
|
let data = Data {
|
||||||
connections,
|
connections,
|
||||||
queues,
|
queues,
|
||||||
|
exchanges,
|
||||||
};
|
};
|
||||||
|
|
||||||
Json(data)
|
Json(data)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn map_exchange(exch: &haesli_core::exchange::Exchange) -> Exchange {
|
||||||
|
Exchange {
|
||||||
|
name: exch.name.to_string(),
|
||||||
|
durable: exch.durable,
|
||||||
|
bindings: match &exch.kind {
|
||||||
|
ExchangeType::Direct { bindings } => bindings
|
||||||
|
.iter()
|
||||||
|
.map(|(name, _)| Binding {
|
||||||
|
queue: name.clone(),
|
||||||
|
routing_key: name.clone(),
|
||||||
|
})
|
||||||
|
.collect(),
|
||||||
|
ExchangeType::Fanout { bindings } => bindings
|
||||||
|
.iter()
|
||||||
|
.map(|q| Binding {
|
||||||
|
queue: q.name.to_string(),
|
||||||
|
routing_key: "".to_owned(),
|
||||||
|
})
|
||||||
|
.collect(),
|
||||||
|
ExchangeType::Topic { bindings } => bindings
|
||||||
|
.iter()
|
||||||
|
.map(|(segs, q)| Binding {
|
||||||
|
queue: q.name.to_string(),
|
||||||
|
routing_key: segs
|
||||||
|
.iter()
|
||||||
|
.map(|seg| seg.to_string())
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.join("."),
|
||||||
|
})
|
||||||
|
.collect(),
|
||||||
|
ExchangeType::Headers => Vec::new(),
|
||||||
|
ExchangeType::System => Vec::new(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue