update website to use ranges for the bars

This commit is contained in:
nora 2024-02-03 21:30:04 +01:00
parent 5a521d00d0
commit 8b2ec568c3
5 changed files with 194 additions and 72 deletions

164
src/db.rs
View file

@ -1,4 +1,4 @@
use std::{str::FromStr, time::Duration};
use std::{collections::HashMap, str::FromStr, time::Duration};
use chrono::Utc;
use eyre::{Context, Result};
@ -6,7 +6,7 @@ use sqlx::{migrate::Migrator, sqlite::SqliteConnectOptions, Pool, Sqlite};
pub static MIGRATOR: Migrator = sqlx::migrate!();
use crate::client::{CheckState, Results};
use crate::client::{CheckResult, CheckState, Results};
#[derive(sqlx::FromRow)]
pub struct Check {
@ -61,38 +61,20 @@ pub async fn insert_results(db: &Pool<Sqlite>, results: &Results) -> Result<()>
}
}
pub async fn insert_results_series(db: &Pool<Sqlite>, interval_seconds: u64, results: &Results) -> Result<()> {
pub async fn insert_results_series(
db: &Pool<Sqlite>,
interval_seconds: u64,
results: &Results,
) -> Result<()> {
let mut errors = Vec::new();
for (website, check) in results.states.iter() {
let latest = get_latest_series_for_website(db, website)
.await
.wrap_err("getting the latest series record")?;
let threshold = chrono::Duration::from_std(Duration::from_secs(interval_seconds * 5))
.wrap_err("cannot create threshold, interval_seconds too high or low")?;
let result = match latest {
Some(latest) if latest.result == check.state && (latest.request_time_range_end < (check.time.checked_add_signed(threshold).unwrap())) => {
sqlx::query("UPDATE checks_series SET request_time_range_end = ? WHERE rowid = ?")
.bind(check.time)
.bind(latest.id)
.execute(db)
.await
.wrap_err_with(|| format!("updating series record for {website}"))
}
_ => {
sqlx::query("INSERT INTO checks_series (request_time_range_start, request_time_range_end, website, result) VALUES (?, ?, ?, ?);")
.bind(check.time)
.bind(check.time)
.bind(website)
.bind(&check.state)
.execute(db)
.await
.wrap_err_with(|| format!("inserting new series record for {website}"))
}
};
let mut trans = db.begin().await.wrap_err("starting transaction")?;
let result =
insert_single_result_series(&mut trans, interval_seconds, website, check).await;
if let Err(err) = result {
errors.push(err);
} else {
trans.commit().await.wrap_err("comitting transaction")?;
}
}
@ -106,6 +88,76 @@ pub async fn insert_results_series(db: &Pool<Sqlite>, interval_seconds: u64, res
}
}
pub async fn insert_single_result_series(
db: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
interval_seconds: u64,
website: &str,
check: &CheckResult,
) -> Result<()> {
let latest = get_latest_series_for_website(db, website)
.await
.wrap_err("getting the latest series record")?;
let threshold = chrono::Duration::from_std(Duration::from_secs(interval_seconds * 5))
.wrap_err("cannot create threshold, interval_seconds too high or low")?;
match latest {
Some(latest) if latest.result == check.state && (latest.request_time_range_end < (check.time.checked_add_signed(threshold).unwrap())) => {
sqlx::query("UPDATE checks_series SET request_time_range_end = ? WHERE rowid = ?")
.bind(check.time)
.bind(latest.id)
.execute(&mut **db)
.await
.wrap_err_with(|| format!("updating series record for {website}"))
.map(drop)
}
_ => {
sqlx::query("INSERT INTO checks_series (request_time_range_start, request_time_range_end, website, result) VALUES (?, ?, ?, ?);")
.bind(check.time)
.bind(check.time)
.bind(website)
.bind(&check.state)
.execute(&mut **db)
.await
.wrap_err_with(|| format!("inserting new series record for {website}"))
.map(drop)
}
}
}
pub fn insert_single_result_series_in_memory(
table: &mut Vec<CheckSeries>,
latest_cache: &mut HashMap<String, usize>,
interval_seconds: u64,
website: &str,
check: &CheckResult,
) {
let latest = latest_cache.get(website).map(|idx| &mut table[*idx]);
let threshold = chrono::Duration::from_std(Duration::from_secs(interval_seconds * 5)).unwrap();
match latest {
Some(latest)
if latest.result == check.state
&& (latest.request_time_range_end
< (check.time.checked_add_signed(threshold).unwrap())) =>
{
latest.request_time_range_end = check.time;
}
_ => {
let idx = table.len();
table.push(CheckSeries {
id: 0,
request_time_range_start: check.time,
request_time_range_end: check.time,
website: website.to_owned(),
result: check.state,
});
*latest_cache.entry(website.to_owned()).or_default() = idx;
}
}
}
pub async fn get_checks(db: &Pool<Sqlite>) -> Result<Vec<Check>> {
sqlx::query_as::<_, Check>("SELECT id, request_time, website, result FROM checks")
.fetch_all(db)
@ -120,8 +172,56 @@ pub async fn get_checks_series(db: &Pool<Sqlite>) -> Result<Vec<CheckSeries>> {
.wrap_err("getting all checks")
}
pub async fn migrate_checks(db: &Pool<Sqlite>, interval_seconds: u64) -> Result<()> {
info!("Migrating checks to check_series");
let Ok(mut checks) = get_checks(db).await else {
return Ok(());
};
info!("Computing checks");
checks.sort_unstable_by_key(|check| check.request_time);
let mut table = Vec::new();
let mut latest_cache = HashMap::new();
for check in checks.iter() {
let check_result = CheckResult {
time: check.request_time,
state: check.result,
};
insert_single_result_series_in_memory(
&mut table,
&mut latest_cache,
interval_seconds,
&check.website,
&check_result,
);
}
info!("Inserting checks");
let mut db = db.begin().await.wrap_err("starting transaction")?;
for check in table.iter() {
sqlx::query("INSERT INTO checks_series (request_time_range_start, request_time_range_end, website, result) VALUES (?, ?, ?, ?);")
.bind(check.request_time_range_start)
.bind(check.request_time_range_end)
.bind(&check.website)
.bind(&check.result)
.execute(&mut *db)
.await
.wrap_err_with(|| format!("inserting new series record for {}", check.website))?;
}
info!("Dropping old table");
sqlx::query("DROP TABLE checks")
.execute(&mut *db)
.await
.wrap_err("dropping table checks")?;
db.commit().await.wrap_err("committing transaction")?;
Ok(())
}
pub async fn get_latest_series_for_website(
db: &Pool<Sqlite>,
db: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
website: &str,
) -> Result<Option<CheckSeries>> {
sqlx::query_as::<_, CheckSeries>(
@ -133,7 +233,7 @@ pub async fn get_latest_series_for_website(
",
)
.bind(website)
.fetch_all(db)
.fetch_all(&mut **db)
.await
.wrap_err("getting all checks")
.map(|elems| -> Option<CheckSeries> { elems.get(0).cloned() })