From 8b2ec568c3269a903278dfab60abd3454192c363 Mon Sep 17 00:00:00 2001
From: Nilstrieb <48135649+Nilstrieb@users.noreply.github.com>
Date: Sat, 3 Feb 2024 21:30:04 +0100
Subject: [PATCH] update website to use ranges for the bars

---
 src/client.rs |   2 +-
 src/db.rs     | 164 ++++++++++++++++++++++++++++++++++++++++----------
 src/lib.rs    |   8 +--
 src/main.rs   |   4 ++
 src/web.rs    |  88 ++++++++++++++++-----------
 5 files changed, 194 insertions(+), 72 deletions(-)

diff --git a/src/client.rs b/src/client.rs
index b60ece1..22316bb 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -18,7 +18,7 @@ pub struct CheckResult {
     pub state: CheckState,
 }

-#[derive(Debug, PartialEq, Clone, sqlx::Type)]
+#[derive(Debug, PartialEq, Clone, Copy, sqlx::Type)]
 #[sqlx(rename_all = "snake_case")]
 pub enum CheckState {
     Ok,
diff --git a/src/db.rs b/src/db.rs
index 60f76f6..6793c08 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -1,4 +1,4 @@
-use std::{str::FromStr, time::Duration};
+use std::{collections::HashMap, str::FromStr, time::Duration};

 use chrono::Utc;
 use eyre::{Context, Result};
@@ -6,7 +6,7 @@ use sqlx::{migrate::Migrator, sqlite::SqliteConnectOptions, Pool, Sqlite};

 pub static MIGRATOR: Migrator = sqlx::migrate!();

-use crate::client::{CheckState, Results};
+use crate::client::{CheckResult, CheckState, Results};

 #[derive(sqlx::FromRow)]
 pub struct Check {
@@ -61,38 +61,20 @@ pub async fn insert_results(db: &Pool<Sqlite>, results: &Results) -> Result<()>
     }
 }

-pub async fn insert_results_series(db: &Pool<Sqlite>, interval_seconds: u64, results: &Results) -> Result<()> {
+pub async fn insert_results_series(
+    db: &Pool<Sqlite>,
+    interval_seconds: u64,
+    results: &Results,
+) -> Result<()> {
     let mut errors = Vec::new();
     for (website, check) in results.states.iter() {
-        let latest = get_latest_series_for_website(db, website)
-            .await
-            .wrap_err("getting the latest series record")?;
-
-        let threshold = chrono::Duration::from_std(Duration::from_secs(interval_seconds * 5))
-            .wrap_err("cannot create threshold, interval_seconds too high or low")?;
-
-        let result = match latest {
-            Some(latest) if latest.result == check.state && (latest.request_time_range_end < (check.time.checked_add_signed(threshold).unwrap())) => {
-                sqlx::query("UPDATE checks_series SET request_time_range_end = ? WHERE rowid = ?")
-                    .bind(check.time)
-                    .bind(latest.id)
-                    .execute(db)
-                    .await
-                    .wrap_err_with(|| format!("updating series record for {website}"))
-            }
-            _ => {
-                sqlx::query("INSERT INTO checks_series (request_time_range_start, request_time_range_end, website, result) VALUES (?, ?, ?, ?);")
-                    .bind(check.time)
-                    .bind(check.time)
-                    .bind(website)
-                    .bind(&check.state)
-                    .execute(db)
-                    .await
-                    .wrap_err_with(|| format!("inserting new series record for {website}"))
-            }
-        };
+        let mut trans = db.begin().await.wrap_err("starting transaction")?;
+        let result =
+            insert_single_result_series(&mut trans, interval_seconds, website, check).await;
         if let Err(err) = result {
             errors.push(err);
+        } else {
+            trans.commit().await.wrap_err("committing transaction")?;
         }
     }
@@ -106,6 +88,76 @@
 }

+pub async fn insert_single_result_series(
+    db: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
+    interval_seconds: u64,
+    website: &str,
+    check: &CheckResult,
+) -> Result<()> {
+    let latest = get_latest_series_for_website(db, website)
+        .await
+        .wrap_err("getting the latest series record")?;
+
+    let threshold = chrono::Duration::from_std(Duration::from_secs(interval_seconds * 5))
+        .wrap_err("cannot create threshold, interval_seconds too high or low")?;
+
+    // Extend the latest matching series row, or open a new row for this state.
+    match latest {
+        Some(latest) if latest.result == check.state && (latest.request_time_range_end < (check.time.checked_add_signed(threshold).unwrap())) => {
+            sqlx::query("UPDATE checks_series SET request_time_range_end = ? WHERE rowid = ?")
+                .bind(check.time)
+                .bind(latest.id)
+                .execute(&mut **db)
+                .await
+                .wrap_err_with(|| format!("updating series record for {website}"))
+                .map(drop)
+        }
+        _ => {
+            sqlx::query("INSERT INTO checks_series (request_time_range_start, request_time_range_end, website, result) VALUES (?, ?, ?, ?);")
+                .bind(check.time)
+                .bind(check.time)
+                .bind(website)
+                .bind(&check.state)
+                .execute(&mut **db)
+                .await
+                .wrap_err_with(|| format!("inserting new series record for {website}"))
+                .map(drop)
+        }
+    }
+}
+
+pub fn insert_single_result_series_in_memory(
+    table: &mut Vec<CheckSeries>,
+    latest_cache: &mut HashMap<String, usize>,
+    interval_seconds: u64,
+    website: &str,
+    check: &CheckResult,
+) {
+    let latest = latest_cache.get(website).map(|idx| &mut table[*idx]);
+
+    let threshold = chrono::Duration::from_std(Duration::from_secs(interval_seconds * 5)).unwrap();
+
+    match latest {
+        Some(latest)
+            if latest.result == check.state
+                && (latest.request_time_range_end
+                    < (check.time.checked_add_signed(threshold).unwrap())) =>
+        {
+            latest.request_time_range_end = check.time;
+        }
+        _ => {
+            let idx = table.len();
+            table.push(CheckSeries {
+                id: 0,
+                request_time_range_start: check.time,
+                request_time_range_end: check.time,
+                website: website.to_owned(),
+                result: check.state,
+            });
+            *latest_cache.entry(website.to_owned()).or_default() = idx;
+        }
+    }
+}
+
 pub async fn get_checks(db: &Pool<Sqlite>) -> Result<Vec<Check>> {
     sqlx::query_as::<_, Check>("SELECT id, request_time, website, result FROM checks")
         .fetch_all(db)
@@ -120,8 +172,56 @@ pub async fn get_checks_series(db: &Pool<Sqlite>) -> Result<Vec<CheckSeries>> {
         .wrap_err("getting all checks")
 }

+pub async fn migrate_checks(db: &Pool<Sqlite>, interval_seconds: u64) -> Result<()> {
+    info!("Migrating checks to check_series");
+    // An error here most likely means the old `checks` table is already gone,
+    // so there is nothing left to migrate.
+    let Ok(mut checks) = get_checks(db).await else {
+        return Ok(());
+    };
+    info!("Computing checks");
+    checks.sort_unstable_by_key(|check| check.request_time);
+    let mut table = Vec::new();
+    let mut latest_cache = HashMap::new();
+
+    for check in checks.iter() {
+        let check_result = CheckResult {
+            time: check.request_time,
+            state: check.result,
+        };
+        insert_single_result_series_in_memory(
+            &mut table,
+            &mut latest_cache,
+            interval_seconds,
+            &check.website,
+            &check_result,
+        );
+    }
+
+    info!("Inserting checks");
+    let mut db = db.begin().await.wrap_err("starting transaction")?;
+    for check in table.iter() {
+        sqlx::query("INSERT INTO checks_series (request_time_range_start, request_time_range_end, website, result) VALUES (?, ?, ?, ?);")
+            .bind(check.request_time_range_start)
+            .bind(check.request_time_range_end)
+            .bind(&check.website)
+            .bind(&check.result)
+            .execute(&mut *db)
+            .await
+            .wrap_err_with(|| format!("inserting new series record for {}", check.website))?;
+    }
+    info!("Dropping old table");
+
+    sqlx::query("DROP TABLE checks")
+        .execute(&mut *db)
+        .await
+        .wrap_err("dropping table checks")?;
+
+    db.commit().await.wrap_err("committing transaction")?;
+
+    Ok(())
+}
+
 pub async fn get_latest_series_for_website(
-    db: &Pool<Sqlite>,
+    db: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
     website: &str,
 ) -> Result<Option<CheckSeries>> {
     sqlx::query_as::<_, CheckSeries>(
@@ -133,7 +233,7 @@ pub async fn get_latest_series_for_website(
         ",
     )
     .bind(website)
-    .fetch_all(db)
+    .fetch_all(&mut **db)
     .await
     .wrap_err("getting all checks")
     .map(|elems| -> Option<CheckSeries> { elems.get(0).cloned() })
diff --git a/src/lib.rs b/src/lib.rs
index 4402282..84699f0 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -47,6 +47,10 @@ pub async fn init() -> Result<(Config, Arc<Pool<Sqlite>>)> {
         .await
         .wrap_err("running migrations")?;

+    db::migrate_checks(&db, config.interval_seconds)
+        .await
+        .wrap_err("migrating old checks to series")?;
+
     Ok((config, db))
 }

@@ -71,10 +75,6 @@ pub async fn check_timer(config: Config, db: Arc<Pool<Sqlite>>) -> Result<()> {
         let results = client::do_checks(&client).await;

-        if let Err(err) = db::insert_results(&db, &results).await {
-            error!(?err);
-        }
-
         if let Err(err) = db::insert_results_series(&db, config.interval_seconds, &results).await {
             error!(?err);
         }
diff --git a/src/main.rs b/src/main.rs
index 7db10bd..ad3f8db 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -32,6 +32,10 @@ async fn main() -> eyre::Result<()> {
         .await
         .wrap_err("running migrations")?;

+    uptime::db::migrate_checks(&db, config.interval_seconds)
+        .await
+        .wrap_err("migrating old checks to series")?;
+
     info!("Started up.");

     let checker = uptime::check_timer(config, db.clone());
diff --git a/src/web.rs b/src/web.rs
index aee311b..3613ade 100644
--- a/src/web.rs
+++ b/src/web.rs
@@ -1,4 +1,4 @@
-use std::{collections::BTreeMap, sync::Arc};
+use std::{collections::BTreeMap, ops::Range, sync::Arc};

 use askama::Template;
 use axum::{
@@ -12,7 +12,7 @@ use eyre::{Context, Result};
 use http::StatusCode;
 use sqlx::{Pool, Sqlite};

-use crate::{client::CheckState, db::Check};
+use crate::{client::CheckState, db::CheckSeries};

 trait RenderDate {
     fn render_nicely(&self) -> String;
@@ -47,30 +47,33 @@ async fn root(State(db): State<Arc<Pool<Sqlite>>>) -> Response {
 }

 pub async fn render_root(db: Arc<Pool<Sqlite>>) -> Result<String> {
-    let checks = crate::db::get_checks(&db).await?;
+    let checks = crate::db::get_checks_series(&db).await?;

     let status = compute_status(checks);

-    let html = RootTemplate { status, version: crate::VERSION }
-        .render()
-        .wrap_err("error rendering template")?;
+    let html = RootTemplate {
+        status,
+        version: crate::VERSION,
+    }
+    .render()
+    .wrap_err("error rendering template")?;

     Ok(html)
 }

-fn compute_status(checks: Vec<Check>) -> Vec<WebsiteStatus> {
+fn compute_status(checks: Vec<CheckSeries>) -> Vec<WebsiteStatus> {
     let mut websites = BTreeMap::new();

     checks.into_iter().for_each(|check| {
-        websites
-            .entry(check.website)
-            .or_insert(Vec::new())
-            .push((check.request_time, check.result));
+        websites.entry(check.website).or_insert(Vec::new()).push((
+            check.request_time_range_start..check.request_time_range_end,
+            check.result,
+        ));
     });

     websites
         .into_iter()
         .map(|(website, mut checks)| {
-            checks.sort_by_key(|check| check.0);
+            checks.sort_by_key(|check| check.0.start);

             let mut last_ok = None;
             let mut count_ok = 0;
@@ -81,7 +84,7 @@ fn compute_status(checks: Vec<Check>) -> Vec<WebsiteStatus> {
             let len = checks.len();
             checks.into_iter().for_each(|(time, result)| {
                 if let CheckState::Ok = result {
-                    last_ok = std::cmp::max(last_ok, Some(time));
+                    last_ok = std::cmp::max(last_ok, Some(time.end));
                     count_ok += 1;
                 }
             });
@@ -132,40 +135,51 @@ struct BarInfo {
 /// frontend, in a fixed sensical timeline.
 /// We slice the time from the first check to the last check (maybe something like last check-30d
 /// in the future) into slices and aggregate all checks from these times into these slices.
-fn checks_to_classes(checks: &[(DateTime<Utc>, CheckState)], classes: usize) -> BarInfo {
+fn checks_to_classes(
+    checks_series: &[(Range<DateTime<Utc>>, CheckState)],
+    classes: usize,
+) -> BarInfo {
     assert_ne!(classes, 0);
-    let Some(first) = checks.first() else {
+    let Some(first) = checks_series.first() else {
         return BarInfo {
             elems: Vec::new(),
             first_time: None,
             last_time: None,
         };
     };
-    let last = checks.last().unwrap();
+    let last = checks_series.last().unwrap();

     let mut bins = vec![vec![]; classes];

-    let first_m = first.0.timestamp_millis();
-    let last_m = last.0.timestamp_millis();
+    let first_event = first.0.start.timestamp_millis() as f64; // welcome to float land, where we float
+    let last_event = last.0.end.timestamp_millis() as f64;

-    let last_rel = last_m - first_m;
-    assert!(last_m.is_positive(), "checks not ordered correctly");
+    let event_time_range = last_event - first_event;
+    assert!(
+        event_time_range.is_sign_positive(),
+        "checks not ordered correctly"
+    );

-    for check in checks {
-        let time_rel = check.0.timestamp_millis() - first_m;
-        assert!(first_m.is_positive(), "checks not ordered correctly");
+    let bin_diff = event_time_range / (classes as f64);

-        /*
-        5 bins:
-        |   |   |   |   |   |
-        0.0 0.2 0.4 0.6 0.8 1.0 division
-        0.0 1.0 2.0 3.0 4.0 5.0 after multiply
-        */
+    let bin_ranges = (0..classes).map(|i| {
+        // we DO NOT want to miss the last event due to imprecision, so widen the range for the last event
+        let end_factor_range = if i == (classes - 1) { 2.0 } else { 1.0 };
+        let i = i as f64;
+        (i * bin_diff)..((i + end_factor_range) * bin_diff)
+    });

-        let bin = (time_rel as f64) / (last_rel as f64) * ((classes) as f64);
-        let bin = bin as usize; // flooring on purpose
-        let bin = if bin == classes { bin - 1 } else { bin };
-        bins[bin].push(check);
+    for series in checks_series {
+        for (i, bin_range) in bin_ranges.clone().enumerate() {
+            let start = (series.0.start.timestamp_millis() as f64) - first_event;
+            let end = (series.0.end.timestamp_millis() as f64) - first_event;
+            assert!(start.is_sign_positive(), "checks not ordered correctly");
+            assert!(end.is_sign_positive(), "checks not ordered correctly");
+
+            // a series shows up in every bin its time range overlaps
+            if !range_disjoint(bin_range, start..end) {
+                bins[i].push(series);
+            }
+        }
     }

     let elems = bins
@@ -193,11 +207,15 @@ fn checks_to_classes(checks: &[(DateTime<Utc>, CheckState)], classes: usize) -> BarInfo {

     BarInfo {
         elems,
-        first_time: Some(first.0),
-        last_time: Some(last.0),
+        first_time: Some(first.0.start),
+        last_time: Some(last.0.end),
     }
 }

+fn range_disjoint(a: Range<f64>, b: Range<f64>) -> bool {
+    (a.end < b.start) || (a.start > b.end)
+}
+
 #[derive(Debug)]
 struct WebsiteStatus {
     website: String,
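
Not part of the commit, but handy for review: a stripped-down model of the
series-merging rule from `insert_single_result_series`, using plain integer
seconds instead of `DateTime<Utc>` so the five-interval threshold is easy to
eyeball. One place where the sketch deliberately deviates: the patch's guard,
`latest.request_time_range_end < check.time + threshold`, appears to hold for
any check that arrives after the current range end (time only moves forward),
so as written it never splits a same-state run. The sketch uses
`check.time < latest.end + threshold`, which seems to be the intended
"at most five missed intervals" rule; everything else is illustrative.

    use std::collections::HashMap;

    #[derive(Clone, Copy, PartialEq, Debug)]
    enum State { Ok, Err }

    #[derive(Debug)]
    struct Series { start: u64, end: u64, state: State }

    fn record(table: &mut Vec<Series>, latest: &mut HashMap<&'static str, usize>,
              interval_seconds: u64, website: &'static str, time: u64, state: State) {
        match latest.get(website).map(|&i| &mut table[i]) {
            // Same state and a gap of at most five intervals: extend the run.
            Some(s) if s.state == state && time < s.end + interval_seconds * 5 => s.end = time,
            // Otherwise open a new run and remember it as the latest for this site.
            _ => {
                table.push(Series { start: time, end: time, state });
                latest.insert(website, table.len() - 1);
            }
        }
    }

    fn main() {
        let (mut table, mut latest) = (Vec::new(), HashMap::new());
        for (t, s) in [(0, State::Ok), (60, State::Ok), (120, State::Err), (180, State::Ok)] {
            record(&mut table, &mut latest, 60, "example.com", t, s);
        }
        // Three runs: Ok 0..60, Err 120..120, Ok 180..180.
        println!("{table:?}");
    }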
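
In the same spirit, here is a self-contained sketch of the overlap-based
binning that `checks_to_classes` now performs, with made-up millisecond
offsets and five bins; `range_disjoint` is copied verbatim from the patch,
the rest is illustrative:

    use std::ops::Range;

    // Copied from src/web.rs above. Ranges that merely touch at an endpoint
    // count as overlapping, because the comparisons are strict.
    fn range_disjoint(a: Range<f64>, b: Range<f64>) -> bool {
        (a.end < b.start) || (a.start > b.end)
    }

    fn main() {
        // Three series over a 10-second window (offsets from the first event),
        // sliced into 5 bins of 2000 ms each, as checks_to_classes does.
        let classes = 5;
        let series: &[Range<f64>] = &[0.0..3500.0, 4000.0..4500.0, 9000.0..10_000.0];
        let bin_diff = 10_000.0 / classes as f64;

        let mut bins: Vec<Vec<&Range<f64>>> = vec![Vec::new(); classes];
        for s in series {
            for i in 0..classes {
                // Widen the last bin so the final event cannot be lost to
                // float imprecision, mirroring the patch.
                let end_factor = if i == classes - 1 { 2.0 } else { 1.0 };
                let bin_range = (i as f64 * bin_diff)..((i as f64 + end_factor) * bin_diff);
                if !range_disjoint(bin_range, s.start..s.end) {
                    bins[i].push(s);
                }
            }
        }

        // 0..3500 lands in bins 0 and 1; 4000..4500 lands in bins 1 and 2
        // (it touches the 4000 boundary); 9000..10000 lands in bin 4.
        for (i, b) in bins.iter().enumerate() {
            println!("bin {i}: {b:?}");
        }
    }

Because assignment is by overlap rather than by flooring a single timestamp
into one bin, a long series now shows up in every bar it spans, which is
exactly what rendering the bars as ranges needs.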