From e09bb69dbfb5feeea46c247cb7f4360b7103ee3d Mon Sep 17 00:00:00 2001 From: Markus Brueckner Date: Sat, 24 Jan 2026 15:43:40 +0100 Subject: [PATCH] implement health status endpoint (closes #3) --- README.md | 3 ++ src/app_state.rs | 42 +++++++++++++--- src/disk_cache.rs | 22 ++++++-- src/handlers.rs | 124 +++++++++++++++++++++++++++++++++++----------- src/helpers.rs | 68 +++++++++++++++++++++++++ src/lru_cache.rs | 76 ++++++++++++++++------------ src/main.rs | 26 ++++++++-- 7 files changed, 282 insertions(+), 79 deletions(-) create mode 100644 src/helpers.rs diff --git a/README.md b/README.md index 85decbc..646773f 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,9 @@ following are supported: to the cache. Writing implies being able to read. - `ENEX_BIND_ADDRESSES` - comma-separated list of IP addresses to bind the service to. If empty the service will bind to all available interfaces. +- `ENEX_MAX_CACHE_SIZE` - the maximum size of the cache in bytes. If the cache + exceeds that size, the least-recently used entries will be pruned to make + room. enex-rcache will try to read a `.env` file at startup in order to set the environment. 
Existing environment entries will take precedence over the contents diff --git a/src/app_state.rs b/src/app_state.rs index 9d308d2..2137f23 100644 --- a/src/app_state.rs +++ b/src/app_state.rs @@ -1,25 +1,51 @@ -use std::sync::{Arc, Mutex}; +use std::{ + sync::{Arc, RwLock}, + time::{Instant, SystemTime}, +}; use tracing::error; use crate::lru_cache::WeightedLRUCache; +pub struct Stats { + pub started_at: Instant, + pub cache_size: u64, + pub oldest_entry_at: Option, + pub newest_entry_at: Option, + pub successful_writes: usize, + pub invalid_writes: usize, + pub failed_writes: usize, + pub collisions: usize, + pub invalid_reads: usize, + pub cache_misses: usize, + pub cache_hits: usize, +} + +pub struct CacheEntryMeta { + pub create_at: SystemTime, +} + +pub struct SharedState { + pub lru_cache: WeightedLRUCache, + pub stats: Stats, +} + #[derive(Clone)] pub struct AppState { - pub lru_cache: Arc>, pub config: crate::config::AppConfig, + pub shared: Arc>, } impl AppState { /// little helper to simplify access to the LRU cache - pub fn with_lru_cache(&self, f: impl FnOnce(&mut WeightedLRUCache)) { - let cache = self.lru_cache.lock(); - match cache { - Ok(mut cache) => { - f(&mut cache); + pub fn with_shared(&self, f: impl FnOnce(&mut SharedState)) { + let shared = self.shared.write(); + match shared { + Ok(mut shared) => { + f(&mut shared); } Err(e) => { - error!("Could not promote cache item: {}", e); + error!("Could not acquire shared state write lock: {}", e); } } } diff --git a/src/disk_cache.rs b/src/disk_cache.rs index 5d8cba9..20c71e2 100644 --- a/src/disk_cache.rs +++ b/src/disk_cache.rs @@ -2,7 +2,12 @@ use std::{path::PathBuf, sync::mpsc::Receiver}; use tracing::{debug, warn}; -use crate::{config::AppConfig, lru_cache::WeightedLRUCache}; +use crate::{ + app_state::{CacheEntryMeta, Stats}, + config::AppConfig, + helpers::update_boundary_items, + lru_cache::WeightedLRUCache, +}; pub fn get_cache_entry_path(config: &AppConfig, hash: &str) -> PathBuf {
PathBuf::from(&config.cache_dir).join(hash) @@ -37,9 +42,10 @@ struct SortableCacheEntry { } /// load the existing files from the cache directory into an LRU cache -pub fn fill_lru_cache_from_disk( +pub fn init_from_disk( config: &AppConfig, - lru_cache: &mut WeightedLRUCache, + lru_cache: &mut WeightedLRUCache, + stats: &mut Stats, ) -> std::io::Result<()> { let entries = std::fs::read_dir(&config.cache_dir)?; let mut cache_entries = vec![]; @@ -48,6 +54,7 @@ pub fn fill_lru_cache_from_disk( Ok(entry) => match entry.metadata() { Ok(metadata) => { let weight = metadata.len(); + stats.cache_size += weight; let key = entry.file_name().to_string_lossy().to_string(); match metadata.created() { Ok(creation_time) => cache_entries.push(SortableCacheEntry { @@ -70,7 +77,14 @@ pub fn fill_lru_cache_from_disk( } cache_entries.sort_by(|a, b| b.creation_time.cmp(&a.creation_time)); for entry in cache_entries { - lru_cache.put(entry.key, entry.weight); + lru_cache.put( + entry.key, + entry.weight, + CacheEntryMeta { + create_at: entry.creation_time, + }, + ); } + update_boundary_items(stats, &lru_cache); Ok(()) } diff --git a/src/handlers.rs b/src/handlers.rs index 828a5ab..8df26f4 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -1,17 +1,21 @@ -use std::io::Write; +use std::{ + time::{Instant, SystemTime}, + u64, +}; use actix_web::{ HttpResponse, Responder, get, put, web::{Data, Path, Payload}, }; use actix_web_httpauth::extractors::bearer::BearerAuth; -use futures_util::StreamExt; -use tracing::{debug, error, trace}; +use serde::Serialize; +use tracing::{debug, trace}; use crate::{ access::{AccessLevel, has_access_level}, - app_state::AppState, + app_state::{AppState, CacheEntryMeta}, disk_cache::get_cache_entry_path, + helpers::{StreamingError, stream_to_file, update_boundary_items}, }; #[get("/v1/cache/{hash}")] @@ -26,6 +30,7 @@ pub async fn get_cache_item( "Tried to read cache item {} without valid read access token.", hash.as_str() ); + 
app_data.with_shared(|shared| shared.stats.invalid_reads += 1); return HttpResponse::Unauthorized() .content_type("text/plain") .body("Please provide a valid access token with at least read-level access."); @@ -33,10 +38,14 @@ pub async fn get_cache_item( let path = get_cache_entry_path(&app_data.config, &hash); if !path.exists() { trace!("Cache item not found: {}", hash.as_str()); + app_data.with_shared(|shared| shared.stats.cache_misses += 1); HttpResponse::NotFound().body("The record was not found.") } else { trace!("Returning cache item {}", hash.as_str()); - app_data.with_lru_cache(|cache| cache.promote(hash.to_string())); + app_data.with_shared(|shared| { + shared.lru_cache.promote(hash.to_string()); + shared.stats.cache_hits += 1; + }); HttpResponse::Ok().body(std::fs::read(path).unwrap()) } } @@ -54,39 +63,94 @@ pub async fn put_cache_item( "Tried to write cache item {} without valid read-write access token.", hash.as_str() ); + app_data.with_shared(|shared| shared.stats.invalid_writes += 1); return HttpResponse::Unauthorized() .content_type("text/plain") .body("Please provide a valid access token with read-write access."); } - let path = get_cache_entry_path(&app_data.config, &hash); - let file = std::fs::File::create_new(&path); - match file { - Ok(mut file) => { - let mut complete_size = 0u64; - while let Some(chunk) = body.next().await { - match chunk { - Ok(chunk) => { - complete_size += chunk.len() as u64; - file.write_all(&chunk).expect("This should actually work") - } - Err(e) => { - error!("Could not write cache item chunk: {}", e); - drop(file); - std::fs::remove_file(path).unwrap(); // Clean up to make sure the block doesn't get half-written with the wrong content - complete_size = 0; - break; - } - } - } - if complete_size > 0 { - app_data.with_lru_cache(|cache| cache.put(hash.to_string(), complete_size)); - } + + let stream_result = stream_to_file(&app_data.config, &mut body, &hash).await; + match stream_result { + Ok(size) => { + 
app_data.with_shared(|shared| { + shared.lru_cache.put( + hash.to_string(), + size, + CacheEntryMeta { + create_at: SystemTime::now(), + }, + ); + shared.stats.successful_writes += 1; + update_boundary_items(&mut shared.stats, &shared.lru_cache); + }); debug!("Created cache item {}", hash.as_str()); HttpResponse::Accepted().finish() } - Err(_) => { - trace!("Tried to overwrite existing cache item {}", hash.as_str()); + Err(StreamingError::Collision) => { + app_data.with_shared(|shared| shared.stats.collisions += 1); HttpResponse::Conflict().body("Cannot overwrite an existing record.") } + Err(StreamingError::ReadError) => { + app_data.with_shared(|shared| shared.stats.failed_writes += 1); + HttpResponse::BadRequest().body("Failed fully read payload") // this is technically not in the spec, but BadRequest sounds plausible for an error to read the request payload + } + } +} + +#[derive(Serialize)] +pub struct HealthStatus { + pub cache_entries: usize, + pub cache_size: u64, + pub cache_capacity: u64, + pub oldest_entry_ms: Option, + pub newest_entry_ms: Option, + pub uptime_ms: u64, + pub write_requests: usize, + pub invalid_writes: usize, + pub collisions: usize, + pub read_requests: usize, + pub invalid_reads: usize, + pub cache_misses: usize, + pub cache_hits: usize, +} + +#[get("/health")] +pub async fn get_heath_status(app_data: Data, auth: BearerAuth) -> impl Responder { + trace!("Received health status request"); + if !has_access_level(&app_data.config, auth.token(), AccessLevel::Read) { + HttpResponse::Forbidden().body("Need at least read access to get the health status") + } else { + let now = SystemTime::now(); + if let Ok(shared) = app_data.shared.read() { + HttpResponse::Ok().json(HealthStatus { + cache_entries: shared.lru_cache.len(), + cache_size: shared.stats.cache_size, + cache_capacity: app_data.config.max_cache_size.unwrap_or(u64::MAX), + oldest_entry_ms: shared + .stats + .oldest_entry_at + .and_then(|time| now.duration_since(time).ok()) + 
.map(|elapsed| elapsed.as_millis() as u64), + newest_entry_ms: shared + .stats + .newest_entry_at + .and_then(|time| now.duration_since(time).ok()) + .map(|elapsed| elapsed.as_millis() as u64), + uptime_ms: Instant::now() + .duration_since(shared.stats.started_at) + .as_millis() as u64, + write_requests: shared.stats.successful_writes, + invalid_writes: shared.stats.invalid_writes, + collisions: shared.stats.collisions, + read_requests: shared.stats.cache_misses + + shared.stats.cache_hits + + shared.stats.invalid_reads, + invalid_reads: shared.stats.invalid_reads, + cache_misses: shared.stats.cache_misses, + cache_hits: shared.stats.cache_hits, + }) + } else { + HttpResponse::InternalServerError().body("Could not acquire shared data read lock") + } } } diff --git a/src/helpers.rs b/src/helpers.rs new file mode 100644 index 0000000..ca74696 --- /dev/null +++ b/src/helpers.rs @@ -0,0 +1,68 @@ +use std::io::Write; + +use actix_web::web::Payload; +use futures_util::StreamExt; +use tracing::{error, trace}; + +use crate::{ + app_state::{CacheEntryMeta, Stats}, + config::AppConfig, + disk_cache::get_cache_entry_path, + lru_cache::WeightedLRUCache, +}; + +pub enum StreamingError { + Collision, + ReadError, +} + +pub fn update_boundary_items(stats: &mut Stats, entries: &WeightedLRUCache) { + for (_, value) in entries.iter() { + if stats + .oldest_entry_at + .is_none_or(|oldest_entry| oldest_entry < value.meta.create_at) + { + stats.oldest_entry_at = Some(value.meta.create_at); + } + if stats + .newest_entry_at + .is_none_or(|newest_entry| newest_entry > value.meta.create_at) + { + stats.newest_entry_at = Some(value.meta.create_at); + } + } +} + +pub async fn stream_to_file( + config: &AppConfig, + body: &mut Payload, + hash: &str, +) -> Result { + let path = get_cache_entry_path(config, &hash); + let file = std::fs::File::create_new(&path); + + match file { + Ok(mut file) => { + let mut complete_size = 0u64; + while let Some(chunk) = body.next().await { + match chunk { + 
Ok(chunk) => { + complete_size += chunk.len() as u64; + file.write_all(&chunk).expect("This should actually work") // this might lead to a crash in the service -> not good + } + Err(e) => { + error!("Could not read body chunk: {}", e); + drop(file); + std::fs::remove_file(path).unwrap(); // Clean up to make sure the block doesn't get half-written with the wrong content + return Err(StreamingError::ReadError); + } + } + } + Ok(complete_size) + } + Err(_) => { + trace!("Tried to overwrite existing cache item {}", hash); + return Err(StreamingError::Collision); + } + } +} diff --git a/src/lru_cache.rs b/src/lru_cache.rs index ab294ef..51c9329 100644 --- a/src/lru_cache.rs +++ b/src/lru_cache.rs @@ -1,24 +1,25 @@ use std::{ - collections::{HashMap, VecDeque}, + collections::{HashMap, VecDeque, hash_map::Iter}, sync::mpsc::Sender, }; use tracing::error; -struct CacheEntry { - weight: u64, +pub struct CacheEntry { + pub weight: u64, + pub meta: TMeta, } -pub struct WeightedLRUCache { +pub struct WeightedLRUCache { current_weight: u64, pub max_weight: u64, - entries: HashMap, + entries: HashMap>, lru_list: VecDeque, eviction_notifier: Option>>, } -impl WeightedLRUCache { - pub fn new(max_weight: u64) -> WeightedLRUCache { +impl WeightedLRUCache { + pub fn new(max_weight: u64) -> WeightedLRUCache { WeightedLRUCache { current_weight: 0, max_weight, @@ -32,9 +33,14 @@ impl WeightedLRUCache { self.eviction_notifier = Some(sender); } - pub fn put(&mut self, key: String, weight: u64) { - self.entries - .insert(key.clone(), CacheEntry { weight: weight }); + pub fn put(&mut self, key: String, weight: u64, meta: TMeta) { + self.entries.insert( + key.clone(), + CacheEntry { + weight: weight, + meta, + }, + ); self.current_weight += weight; self.promote(key); if self.current_weight > self.max_weight { @@ -60,6 +66,10 @@ impl WeightedLRUCache { self.entries.len() } + pub fn iter<'a>(&'a self) -> Iter<'a, String, CacheEntry> { + self.entries.iter() + } + fn evict(&mut self) -> Vec { 
let mut removed_keys = Vec::new(); while self.current_weight > self.max_weight { @@ -86,11 +96,11 @@ mod tests { #[test] fn fill_lru_cache() { - let mut lru_cache = WeightedLRUCache::new(100); + let mut lru_cache = WeightedLRUCache::::new(100); - lru_cache.put("key1".to_string(), 10); - lru_cache.put("key2".to_string(), 20); - lru_cache.put("key3".to_string(), 30); + lru_cache.put("key1".to_string(), 10, 0); + lru_cache.put("key2".to_string(), 20, 0); + lru_cache.put("key3".to_string(), 30, 0); assert!(lru_cache.has_entry("key1")); assert!(lru_cache.has_entry("key2")); @@ -99,11 +109,11 @@ mod tests { #[test] fn overflow_lru_cache() { - let mut lru_cache = WeightedLRUCache::new(100); + let mut lru_cache = WeightedLRUCache::::new(100); - lru_cache.put("key1".to_string(), 10); - lru_cache.put("key2".to_string(), 45); - lru_cache.put("key3".to_string(), 50); + lru_cache.put("key1".to_string(), 10, 0); + lru_cache.put("key2".to_string(), 45, 0); + lru_cache.put("key3".to_string(), 50, 0); assert!(!lru_cache.has_entry("key1")); assert!(lru_cache.has_entry("key2")); @@ -112,26 +122,26 @@ mod tests { #[test] fn calls_expunge_cb() { - let mut lru_cache = WeightedLRUCache::new(100); + let mut lru_cache = WeightedLRUCache::::new(100); let (sender, receiver) = std::sync::mpsc::channel(); { lru_cache.on_eviction(sender); } - lru_cache.put("key1".to_string(), 10); - lru_cache.put("key2".to_string(), 45); - lru_cache.put("key3".to_string(), 50); + lru_cache.put("key1".to_string(), 10, 0); + lru_cache.put("key2".to_string(), 45, 0); + lru_cache.put("key3".to_string(), 50, 0); assert_eq!(receiver.recv().unwrap(), vec!["key1".to_string()]); } #[test] fn double_insert_pushes_entries_to_the_back() { - let mut lru_cache = WeightedLRUCache::new(100); + let mut lru_cache = WeightedLRUCache::::new(100); - lru_cache.put("key1".to_string(), 10); - lru_cache.put("key2".to_string(), 45); - lru_cache.put("key1".to_string(), 20); - lru_cache.put("key3".to_string(), 50); + 
lru_cache.put("key1".to_string(), 10, 0); + lru_cache.put("key2".to_string(), 45, 0); + lru_cache.put("key1".to_string(), 20, 0); + lru_cache.put("key3".to_string(), 50, 0); assert!(lru_cache.has_entry("key1")); assert!(!lru_cache.has_entry("key2")); @@ -140,12 +150,12 @@ mod tests { #[test] fn promote_pushes_entries_to_the_back() { - let mut lru_cache = WeightedLRUCache::new(100); + let mut lru_cache = WeightedLRUCache::::new(100); - lru_cache.put("key1".to_string(), 10); - lru_cache.put("key2".to_string(), 45); + lru_cache.put("key1".to_string(), 10, 0); + lru_cache.put("key2".to_string(), 45, 0); lru_cache.promote("key1".to_string()); - lru_cache.put("key3".to_string(), 50); + lru_cache.put("key3".to_string(), 50, 0); assert!(lru_cache.has_entry("key1")); assert!(!lru_cache.has_entry("key2")); @@ -154,8 +164,8 @@ mod tests { #[test] fn gracefully_fails_with_zero_size() { - let mut lru_cache = WeightedLRUCache::new(0); - lru_cache.put("key1".to_string(), 10); + let mut lru_cache = WeightedLRUCache::::new(0); + lru_cache.put("key1".to_string(), 10, 0); assert!(!lru_cache.has_entry("key1")); } diff --git a/src/main.rs b/src/main.rs index f7ea13a..b3d49a0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,21 +1,25 @@ use std::{ - sync::{Arc, Mutex, mpsc::Receiver}, + sync::{Arc, RwLock}, + time::Instant, u64, }; use actix_web::{App, HttpServer, web::Data}; use app_state::AppState; -use disk_cache::{cache_cleanup, fill_lru_cache_from_disk}; +use disk_cache::{cache_cleanup, init_from_disk}; use dotenvy::dotenv; use lru_cache::WeightedLRUCache; use tracing::{Level, debug, warn}; use tracing_subscriber::FmtSubscriber; +use crate::app_state::{SharedState, Stats}; + mod access; mod app_state; mod config; mod disk_cache; mod handlers; +mod helpers; mod lru_cache; #[actix_web::main] @@ -40,13 +44,26 @@ async fn main() -> std::io::Result<()> { } let (tx, rx) = std::sync::mpsc::channel(); lru_cache.on_eviction(tx); + let mut stats = Stats { + started_at: Instant::now(), + 
cache_size: 0, + oldest_entry_at: None, + newest_entry_at: None, + successful_writes: 0, + invalid_writes: 0, + failed_writes: 0, + collisions: 0, + invalid_reads: 0, + cache_misses: 0, + cache_hits: 0, + }; // Load the existing cache entries from disk - fill_lru_cache_from_disk(&config, &mut lru_cache)?; + init_from_disk(&config, &mut lru_cache, &mut stats)?; debug!("Loaded cache with {} entries", lru_cache.len()); let app_state = AppState { - lru_cache: Arc::new(Mutex::new(lru_cache)), config: config.clone(), + shared: Arc::new(RwLock::new(SharedState { lru_cache, stats })), }; let mut server = HttpServer::new({ @@ -56,6 +73,7 @@ async fn main() -> std::io::Result<()> { .wrap(actix_web::middleware::Logger::default()) .service(handlers::get_cache_item) .service(handlers::put_cache_item) + .service(handlers::get_heath_status) } }) .keep_alive(None); // disable HTTP keep-alive because it seems to break NX (at least in version 20.8)