implement health status endpoint (closes #3)

This commit is contained in:
Markus Brueckner 2026-01-24 15:43:40 +01:00
parent 11b9af5b85
commit e09bb69dbf
7 changed files with 282 additions and 79 deletions

View file

@ -18,6 +18,9 @@ following are supported:
to the cache. Writing implies being able to read. to the cache. Writing implies being able to read.
- `ENEX_BIND_ADDRESSES` - comma-separated list of IP addresses to bind the - `ENEX_BIND_ADDRESSES` - comma-separated list of IP addresses to bind the
service to. If empty the service will bind to all available interfaces. service to. If empty the service will bind to all available interfaces.
- `ENEX_MAX_CACHE_SIZE` - the maximum size of the cache in bytes. If the cache
exceeds that size, the least-recently used entries will be pruned to make
room.
enex-rcache will try to read a `.env` file at startup in order to set the enex-rcache will try to read a `.env` file at startup in order to set the
environment. Existing environment entries will take precedence over the contents

View file

@ -1,25 +1,51 @@
use std::sync::{Arc, Mutex}; use std::{
sync::{Arc, RwLock},
time::{Instant, SystemTime},
};
use tracing::error; use tracing::error;
use crate::lru_cache::WeightedLRUCache; use crate::lru_cache::WeightedLRUCache;
/// Runtime statistics for the cache service, reported by the `/health` endpoint.
// Debug derived so the stats can be dumped in logs/diagnostics; all fields are
// plain std types, so the derive is free.
#[derive(Debug)]
pub struct Stats {
    // Service start time; used to compute uptime.
    pub started_at: Instant,
    // Total size of all cache entries in bytes.
    pub cache_size: u64,
    // Creation time of the oldest cache entry, None while the cache is empty.
    pub oldest_entry_at: Option<SystemTime>,
    // Creation time of the newest cache entry, None while the cache is empty.
    pub newest_entry_at: Option<SystemTime>,
    // Writes that completed successfully.
    pub successful_writes: usize,
    // Write attempts rejected for a missing/invalid access token.
    pub invalid_writes: usize,
    // Writes that failed while streaming the payload to disk.
    pub failed_writes: usize,
    // Writes rejected because the entry already existed.
    pub collisions: usize,
    // Read attempts rejected for a missing/invalid access token.
    pub invalid_reads: usize,
    // Reads for keys not present in the cache.
    pub cache_misses: usize,
    // Reads served from the cache.
    pub cache_hits: usize,
}
/// Per-entry metadata stored alongside each LRU cache entry.
// SystemTime is Copy, so the whole struct can be Copy — cheap to pass around.
#[derive(Debug, Clone, Copy)]
pub struct CacheEntryMeta {
    // When the entry was created on disk.
    // NOTE(review): field is named `create_at` (not `created_at`); kept as-is
    // because other modules reference it by this name.
    pub create_at: SystemTime,
}
/// Mutable state shared between request handlers; wrapped in `Arc<RwLock<..>>`
/// inside `AppState` so many readers (e.g. the health endpoint) can inspect it
/// while writers take exclusive access.
pub struct SharedState {
// LRU bookkeeping for the on-disk cache, with per-entry creation-time metadata.
pub lru_cache: WeightedLRUCache<CacheEntryMeta>,
// Counters and boundary timestamps reported by the health endpoint.
pub stats: Stats,
}
#[derive(Clone)] #[derive(Clone)]
pub struct AppState { pub struct AppState {
pub lru_cache: Arc<Mutex<WeightedLRUCache>>,
pub config: crate::config::AppConfig, pub config: crate::config::AppConfig,
pub shared: Arc<RwLock<SharedState>>,
} }
impl AppState { impl AppState {
/// little helper to simplify access to the LRU cache /// little helper to simplify access to the LRU cache
pub fn with_lru_cache(&self, f: impl FnOnce(&mut WeightedLRUCache)) { pub fn with_shared(&self, f: impl FnOnce(&mut SharedState)) {
let cache = self.lru_cache.lock(); let shared = self.shared.write();
match cache { match shared {
Ok(mut cache) => { Ok(mut shared) => {
f(&mut cache); f(&mut shared);
} }
Err(e) => { Err(e) => {
error!("Could not promote cache item: {}", e); error!("Could not acquire shared state write lock: {}", e);
} }
} }
} }

View file

@ -2,7 +2,12 @@ use std::{path::PathBuf, sync::mpsc::Receiver};
use tracing::{debug, warn}; use tracing::{debug, warn};
use crate::{config::AppConfig, lru_cache::WeightedLRUCache}; use crate::{
app_state::{CacheEntryMeta, Stats},
config::AppConfig,
helpers::update_boundary_items,
lru_cache::WeightedLRUCache,
};
pub fn get_cache_entry_path(config: &AppConfig, hash: &str) -> PathBuf { pub fn get_cache_entry_path(config: &AppConfig, hash: &str) -> PathBuf {
PathBuf::from(&config.cache_dir).join(hash) PathBuf::from(&config.cache_dir).join(hash)
@ -37,9 +42,10 @@ struct SortableCacheEntry {
} }
/// load the existing files from the cache directory into an LRU cache /// load the existing files from the cache directory into an LRU cache
pub fn fill_lru_cache_from_disk( pub fn init_from_disk(
config: &AppConfig, config: &AppConfig,
lru_cache: &mut WeightedLRUCache, lru_cache: &mut WeightedLRUCache<CacheEntryMeta>,
stats: &mut Stats,
) -> std::io::Result<()> { ) -> std::io::Result<()> {
let entries = std::fs::read_dir(&config.cache_dir)?; let entries = std::fs::read_dir(&config.cache_dir)?;
let mut cache_entries = vec![]; let mut cache_entries = vec![];
@ -48,6 +54,7 @@ pub fn fill_lru_cache_from_disk(
Ok(entry) => match entry.metadata() { Ok(entry) => match entry.metadata() {
Ok(metadata) => { Ok(metadata) => {
let weight = metadata.len(); let weight = metadata.len();
stats.cache_size += weight;
let key = entry.file_name().to_string_lossy().to_string(); let key = entry.file_name().to_string_lossy().to_string();
match metadata.created() { match metadata.created() {
Ok(creation_time) => cache_entries.push(SortableCacheEntry { Ok(creation_time) => cache_entries.push(SortableCacheEntry {
@ -70,7 +77,14 @@ pub fn fill_lru_cache_from_disk(
} }
cache_entries.sort_by(|a, b| b.creation_time.cmp(&a.creation_time)); cache_entries.sort_by(|a, b| b.creation_time.cmp(&a.creation_time));
for entry in cache_entries { for entry in cache_entries {
lru_cache.put(entry.key, entry.weight); lru_cache.put(
entry.key,
entry.weight,
CacheEntryMeta {
create_at: entry.creation_time,
},
);
} }
update_boundary_items(stats, &lru_cache);
Ok(()) Ok(())
} }

View file

@ -1,17 +1,21 @@
use std::io::Write; use std::{
time::{Instant, SystemTime},
u64,
};
use actix_web::{ use actix_web::{
HttpResponse, Responder, get, put, HttpResponse, Responder, get, put,
web::{Data, Path, Payload}, web::{Data, Path, Payload},
}; };
use actix_web_httpauth::extractors::bearer::BearerAuth; use actix_web_httpauth::extractors::bearer::BearerAuth;
use futures_util::StreamExt; use serde::Serialize;
use tracing::{debug, error, trace}; use tracing::{debug, trace};
use crate::{ use crate::{
access::{AccessLevel, has_access_level}, access::{AccessLevel, has_access_level},
app_state::AppState, app_state::{AppState, CacheEntryMeta},
disk_cache::get_cache_entry_path, disk_cache::get_cache_entry_path,
helpers::{StreamingError, stream_to_file, update_boundary_items},
}; };
#[get("/v1/cache/{hash}")] #[get("/v1/cache/{hash}")]
@ -26,6 +30,7 @@ pub async fn get_cache_item(
"Tried to read cache item {} without valid read access token.", "Tried to read cache item {} without valid read access token.",
hash.as_str() hash.as_str()
); );
app_data.with_shared(|shared| shared.stats.invalid_reads += 1);
return HttpResponse::Unauthorized() return HttpResponse::Unauthorized()
.content_type("text/plain") .content_type("text/plain")
.body("Please provide a valid access token with at least read-level access."); .body("Please provide a valid access token with at least read-level access.");
@ -33,10 +38,14 @@ pub async fn get_cache_item(
let path = get_cache_entry_path(&app_data.config, &hash); let path = get_cache_entry_path(&app_data.config, &hash);
if !path.exists() { if !path.exists() {
trace!("Cache item not found: {}", hash.as_str()); trace!("Cache item not found: {}", hash.as_str());
app_data.with_shared(|shared| shared.stats.cache_misses += 1);
HttpResponse::NotFound().body("The record was not found.") HttpResponse::NotFound().body("The record was not found.")
} else { } else {
trace!("Returning cache item {}", hash.as_str()); trace!("Returning cache item {}", hash.as_str());
app_data.with_lru_cache(|cache| cache.promote(hash.to_string())); app_data.with_shared(|shared| {
shared.lru_cache.promote(hash.to_string());
shared.stats.cache_hits += 1;
});
HttpResponse::Ok().body(std::fs::read(path).unwrap()) HttpResponse::Ok().body(std::fs::read(path).unwrap())
} }
} }
@ -54,39 +63,94 @@ pub async fn put_cache_item(
"Tried to write cache item {} without valid read-write access token.", "Tried to write cache item {} without valid read-write access token.",
hash.as_str() hash.as_str()
); );
app_data.with_shared(|shared| shared.stats.invalid_writes += 1);
return HttpResponse::Unauthorized() return HttpResponse::Unauthorized()
.content_type("text/plain") .content_type("text/plain")
.body("Please provide a valid access token with read-write access."); .body("Please provide a valid access token with read-write access.");
} }
let path = get_cache_entry_path(&app_data.config, &hash);
let file = std::fs::File::create_new(&path); let stream_result = stream_to_file(&app_data.config, &mut body, &hash).await;
match file { match stream_result {
Ok(mut file) => { Ok(size) => {
let mut complete_size = 0u64; app_data.with_shared(|shared| {
while let Some(chunk) = body.next().await { shared.lru_cache.put(
match chunk { hash.to_string(),
Ok(chunk) => { size,
complete_size += chunk.len() as u64; CacheEntryMeta {
file.write_all(&chunk).expect("This should actually work") create_at: SystemTime::now(),
} },
Err(e) => { );
error!("Could not write cache item chunk: {}", e); shared.stats.successful_writes += 1;
drop(file); update_boundary_items(&mut shared.stats, &shared.lru_cache);
std::fs::remove_file(path).unwrap(); // Clean up to make sure the block doesn't get half-written with the wrong content });
complete_size = 0;
break;
}
}
}
if complete_size > 0 {
app_data.with_lru_cache(|cache| cache.put(hash.to_string(), complete_size));
}
debug!("Created cache item {}", hash.as_str()); debug!("Created cache item {}", hash.as_str());
HttpResponse::Accepted().finish() HttpResponse::Accepted().finish()
} }
Err(_) => { Err(StreamingError::Collision) => {
trace!("Tried to overwrite existing cache item {}", hash.as_str()); app_data.with_shared(|shared| shared.stats.collisions += 1);
HttpResponse::Conflict().body("Cannot overwrite an existing record.") HttpResponse::Conflict().body("Cannot overwrite an existing record.")
} }
Err(StreamingError::ReadError) => {
app_data.with_shared(|shared| shared.stats.failed_writes += 1);
HttpResponse::BadRequest().body("Failed fully read payload") // this is technically not in the spec, but BadRequest sounds plausible for an error to read the request payload
}
}
}
/// JSON response body for the `/health` endpoint.
/// All `*_ms` fields are durations in milliseconds relative to "now".
#[derive(Serialize)]
pub struct HealthStatus {
// Number of entries currently tracked by the LRU cache.
pub cache_entries: usize,
// Total size of all cache entries in bytes.
pub cache_size: u64,
// Configured maximum cache size in bytes (u64::MAX when unlimited).
pub cache_capacity: u64,
// Age of the oldest entry in milliseconds; None when the cache is empty.
pub oldest_entry_ms: Option<u64>,
// Age of the newest entry in milliseconds; None when the cache is empty.
pub newest_entry_ms: Option<u64>,
// Milliseconds since the service started.
pub uptime_ms: u64,
// Successful write requests.
pub write_requests: usize,
// Writes rejected for missing/invalid credentials.
pub invalid_writes: usize,
// Writes rejected because the entry already existed.
pub collisions: usize,
// Total read requests (hits + misses + unauthorized).
pub read_requests: usize,
// Reads rejected for missing/invalid credentials.
pub invalid_reads: usize,
// Reads for keys not in the cache.
pub cache_misses: usize,
// Reads served from the cache.
pub cache_hits: usize,
}
#[get("/health")]
pub async fn get_heath_status(app_data: Data<AppState>, auth: BearerAuth) -> impl Responder {
trace!("Received health status request");
if !has_access_level(&app_data.config, auth.token(), AccessLevel::Read) {
HttpResponse::Forbidden().body("Need at least read access to get the health status")
} else {
let now = SystemTime::now();
if let Ok(shared) = app_data.shared.read() {
HttpResponse::Ok().json(HealthStatus {
cache_entries: shared.lru_cache.len(),
cache_size: shared.stats.cache_size,
cache_capacity: app_data.config.max_cache_size.unwrap_or(u64::MAX),
oldest_entry_ms: shared
.stats
.oldest_entry_at
.and_then(|time| now.duration_since(time).ok())
.map(|elapsed| elapsed.as_millis() as u64),
newest_entry_ms: shared
.stats
.newest_entry_at
.and_then(|time| now.duration_since(time).ok())
.map(|elapsed| elapsed.as_millis() as u64),
uptime_ms: Instant::now()
.duration_since(shared.stats.started_at)
.as_millis() as u64,
write_requests: shared.stats.successful_writes,
invalid_writes: shared.stats.invalid_writes,
collisions: shared.stats.collisions,
read_requests: shared.stats.cache_misses
+ shared.stats.cache_hits
+ shared.stats.invalid_reads,
invalid_reads: shared.stats.invalid_reads,
cache_misses: shared.stats.cache_misses,
cache_hits: shared.stats.cache_hits,
})
} else {
HttpResponse::InternalServerError().body("Could not acquire shared data read lock")
}
} }
} }

68
src/helpers.rs Normal file
View file

@ -0,0 +1,68 @@
use std::io::Write;
use actix_web::web::Payload;
use futures_util::StreamExt;
use tracing::{error, trace};
use crate::{
app_state::{CacheEntryMeta, Stats},
config::AppConfig,
disk_cache::get_cache_entry_path,
lru_cache::WeightedLRUCache,
};
/// Errors that can occur while streaming a request payload into a cache file.
// Debug/PartialEq derived so the error can be logged and matched in tests;
// the variants carry no payload, so Copy is free as well.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamingError {
    // The target cache file already exists; entries are immutable.
    Collision,
    // The request body could not be streamed completely; any partial file
    // has been removed.
    ReadError,
}
/// Update `stats.oldest_entry_at` / `stats.newest_entry_at` from the cache contents.
///
/// Bug fix: the comparisons were inverted — the old code replaced the stored
/// "oldest" timestamp whenever it was *earlier* than an entry's creation time
/// (`oldest_entry < create_at`), so `oldest_entry_at` ended up holding the NEWEST
/// timestamp, and `newest_entry_at` the oldest. The oldest entry is the one with
/// the smallest creation time; the newest, the largest.
///
/// NOTE(review): boundaries only ever widen across calls (existing values are kept
/// when no entry beats them), so after evictions the stored oldest timestamp may be
/// stale — confirm this is acceptable for the health report.
pub fn update_boundary_items(stats: &mut Stats, entries: &WeightedLRUCache<CacheEntryMeta>) {
    for (_, value) in entries.iter() {
        // An entry is older than the current oldest if it was created earlier.
        if stats
            .oldest_entry_at
            .is_none_or(|oldest_entry| oldest_entry > value.meta.create_at)
        {
            stats.oldest_entry_at = Some(value.meta.create_at);
        }
        // An entry is newer than the current newest if it was created later.
        if stats
            .newest_entry_at
            .is_none_or(|newest_entry| newest_entry < value.meta.create_at)
        {
            stats.newest_entry_at = Some(value.meta.create_at);
        }
    }
}
pub async fn stream_to_file(
config: &AppConfig,
body: &mut Payload,
hash: &str,
) -> Result<u64, StreamingError> {
let path = get_cache_entry_path(config, &hash);
let file = std::fs::File::create_new(&path);
match file {
Ok(mut file) => {
let mut complete_size = 0u64;
while let Some(chunk) = body.next().await {
match chunk {
Ok(chunk) => {
complete_size += chunk.len() as u64;
file.write_all(&chunk).expect("This should actually work") // this might lead to a crash in the service -> not good
}
Err(e) => {
error!("Could not read body chunk: {}", e);
drop(file);
std::fs::remove_file(path).unwrap(); // Clean up to make sure the block doesn't get half-written with the wrong content
return Err(StreamingError::ReadError);
}
}
}
Ok(complete_size)
}
Err(_) => {
trace!("Tried to overwrite existing cache item {}", hash);
return Err(StreamingError::Collision);
}
}
}

View file

@ -1,24 +1,25 @@
use std::{ use std::{
collections::{HashMap, VecDeque}, collections::{HashMap, VecDeque, hash_map::Iter},
sync::mpsc::Sender, sync::mpsc::Sender,
}; };
use tracing::error; use tracing::error;
struct CacheEntry { pub struct CacheEntry<TMeta> {
weight: u64, pub weight: u64,
pub meta: TMeta,
} }
pub struct WeightedLRUCache { pub struct WeightedLRUCache<TMeta> {
current_weight: u64, current_weight: u64,
pub max_weight: u64, pub max_weight: u64,
entries: HashMap<String, CacheEntry>, entries: HashMap<String, CacheEntry<TMeta>>,
lru_list: VecDeque<String>, lru_list: VecDeque<String>,
eviction_notifier: Option<Sender<Vec<String>>>, eviction_notifier: Option<Sender<Vec<String>>>,
} }
impl WeightedLRUCache { impl<TMeta> WeightedLRUCache<TMeta> {
pub fn new(max_weight: u64) -> WeightedLRUCache { pub fn new(max_weight: u64) -> WeightedLRUCache<TMeta> {
WeightedLRUCache { WeightedLRUCache {
current_weight: 0, current_weight: 0,
max_weight, max_weight,
@ -32,9 +33,14 @@ impl WeightedLRUCache {
self.eviction_notifier = Some(sender); self.eviction_notifier = Some(sender);
} }
pub fn put(&mut self, key: String, weight: u64) { pub fn put(&mut self, key: String, weight: u64, meta: TMeta) {
self.entries self.entries.insert(
.insert(key.clone(), CacheEntry { weight: weight }); key.clone(),
CacheEntry {
weight: weight,
meta,
},
);
self.current_weight += weight; self.current_weight += weight;
self.promote(key); self.promote(key);
if self.current_weight > self.max_weight { if self.current_weight > self.max_weight {
@ -60,6 +66,10 @@ impl WeightedLRUCache {
self.entries.len() self.entries.len()
} }
/// Returns a borrowing iterator over all (key, entry) pairs in the cache.
/// Iteration order is that of the underlying HashMap, i.e. unspecified —
/// it does NOT reflect LRU order.
pub fn iter<'a>(&'a self) -> Iter<'a, String, CacheEntry<TMeta>> {
self.entries.iter()
}
fn evict(&mut self) -> Vec<String> { fn evict(&mut self) -> Vec<String> {
let mut removed_keys = Vec::new(); let mut removed_keys = Vec::new();
while self.current_weight > self.max_weight { while self.current_weight > self.max_weight {
@ -86,11 +96,11 @@ mod tests {
#[test] #[test]
fn fill_lru_cache() { fn fill_lru_cache() {
let mut lru_cache = WeightedLRUCache::new(100); let mut lru_cache = WeightedLRUCache::<u8>::new(100);
lru_cache.put("key1".to_string(), 10); lru_cache.put("key1".to_string(), 10, 0);
lru_cache.put("key2".to_string(), 20); lru_cache.put("key2".to_string(), 20, 0);
lru_cache.put("key3".to_string(), 30); lru_cache.put("key3".to_string(), 30, 0);
assert!(lru_cache.has_entry("key1")); assert!(lru_cache.has_entry("key1"));
assert!(lru_cache.has_entry("key2")); assert!(lru_cache.has_entry("key2"));
@ -99,11 +109,11 @@ mod tests {
#[test] #[test]
fn overflow_lru_cache() { fn overflow_lru_cache() {
let mut lru_cache = WeightedLRUCache::new(100); let mut lru_cache = WeightedLRUCache::<u8>::new(100);
lru_cache.put("key1".to_string(), 10); lru_cache.put("key1".to_string(), 10, 0);
lru_cache.put("key2".to_string(), 45); lru_cache.put("key2".to_string(), 45, 0);
lru_cache.put("key3".to_string(), 50); lru_cache.put("key3".to_string(), 50, 0);
assert!(!lru_cache.has_entry("key1")); assert!(!lru_cache.has_entry("key1"));
assert!(lru_cache.has_entry("key2")); assert!(lru_cache.has_entry("key2"));
@ -112,26 +122,26 @@ mod tests {
#[test] #[test]
fn calls_expunge_cb() { fn calls_expunge_cb() {
let mut lru_cache = WeightedLRUCache::new(100); let mut lru_cache = WeightedLRUCache::<u8>::new(100);
let (sender, receiver) = std::sync::mpsc::channel(); let (sender, receiver) = std::sync::mpsc::channel();
{ {
lru_cache.on_eviction(sender); lru_cache.on_eviction(sender);
} }
lru_cache.put("key1".to_string(), 10); lru_cache.put("key1".to_string(), 10, 0);
lru_cache.put("key2".to_string(), 45); lru_cache.put("key2".to_string(), 45, 0);
lru_cache.put("key3".to_string(), 50); lru_cache.put("key3".to_string(), 50, 0);
assert_eq!(receiver.recv().unwrap(), vec!["key1".to_string()]); assert_eq!(receiver.recv().unwrap(), vec!["key1".to_string()]);
} }
#[test] #[test]
fn double_insert_pushes_entries_to_the_back() { fn double_insert_pushes_entries_to_the_back() {
let mut lru_cache = WeightedLRUCache::new(100); let mut lru_cache = WeightedLRUCache::<u8>::new(100);
lru_cache.put("key1".to_string(), 10); lru_cache.put("key1".to_string(), 10, 0);
lru_cache.put("key2".to_string(), 45); lru_cache.put("key2".to_string(), 45, 0);
lru_cache.put("key1".to_string(), 20); lru_cache.put("key1".to_string(), 20, 0);
lru_cache.put("key3".to_string(), 50); lru_cache.put("key3".to_string(), 50, 0);
assert!(lru_cache.has_entry("key1")); assert!(lru_cache.has_entry("key1"));
assert!(!lru_cache.has_entry("key2")); assert!(!lru_cache.has_entry("key2"));
@ -140,12 +150,12 @@ mod tests {
#[test] #[test]
fn promote_pushes_entries_to_the_back() { fn promote_pushes_entries_to_the_back() {
let mut lru_cache = WeightedLRUCache::new(100); let mut lru_cache = WeightedLRUCache::<u8>::new(100);
lru_cache.put("key1".to_string(), 10); lru_cache.put("key1".to_string(), 10, 0);
lru_cache.put("key2".to_string(), 45); lru_cache.put("key2".to_string(), 45, 0);
lru_cache.promote("key1".to_string()); lru_cache.promote("key1".to_string());
lru_cache.put("key3".to_string(), 50); lru_cache.put("key3".to_string(), 50, 0);
assert!(lru_cache.has_entry("key1")); assert!(lru_cache.has_entry("key1"));
assert!(!lru_cache.has_entry("key2")); assert!(!lru_cache.has_entry("key2"));
@ -154,8 +164,8 @@ mod tests {
#[test] #[test]
fn gracefully_fails_with_zero_size() { fn gracefully_fails_with_zero_size() {
let mut lru_cache = WeightedLRUCache::new(0); let mut lru_cache = WeightedLRUCache::<u8>::new(0);
lru_cache.put("key1".to_string(), 10); lru_cache.put("key1".to_string(), 10, 0);
assert!(!lru_cache.has_entry("key1")); assert!(!lru_cache.has_entry("key1"));
} }

View file

@ -1,21 +1,25 @@
use std::{ use std::{
sync::{Arc, Mutex, mpsc::Receiver}, sync::{Arc, RwLock},
time::Instant,
u64, u64,
}; };
use actix_web::{App, HttpServer, web::Data}; use actix_web::{App, HttpServer, web::Data};
use app_state::AppState; use app_state::AppState;
use disk_cache::{cache_cleanup, fill_lru_cache_from_disk}; use disk_cache::{cache_cleanup, init_from_disk};
use dotenvy::dotenv; use dotenvy::dotenv;
use lru_cache::WeightedLRUCache; use lru_cache::WeightedLRUCache;
use tracing::{Level, debug, warn}; use tracing::{Level, debug, warn};
use tracing_subscriber::FmtSubscriber; use tracing_subscriber::FmtSubscriber;
use crate::app_state::{SharedState, Stats};
mod access; mod access;
mod app_state; mod app_state;
mod config; mod config;
mod disk_cache; mod disk_cache;
mod handlers; mod handlers;
mod helpers;
mod lru_cache; mod lru_cache;
#[actix_web::main] #[actix_web::main]
@ -40,13 +44,26 @@ async fn main() -> std::io::Result<()> {
} }
let (tx, rx) = std::sync::mpsc::channel(); let (tx, rx) = std::sync::mpsc::channel();
lru_cache.on_eviction(tx); lru_cache.on_eviction(tx);
let mut stats = Stats {
started_at: Instant::now(),
cache_size: 0,
oldest_entry_at: None,
newest_entry_at: None,
successful_writes: 0,
invalid_writes: 0,
failed_writes: 0,
collisions: 0,
invalid_reads: 0,
cache_misses: 0,
cache_hits: 0,
};
// Load the existing cache entries from disk // Load the existing cache entries from disk
fill_lru_cache_from_disk(&config, &mut lru_cache)?; init_from_disk(&config, &mut lru_cache, &mut stats)?;
debug!("Loaded cache with {} entries", lru_cache.len()); debug!("Loaded cache with {} entries", lru_cache.len());
let app_state = AppState { let app_state = AppState {
lru_cache: Arc::new(Mutex::new(lru_cache)),
config: config.clone(), config: config.clone(),
shared: Arc::new(RwLock::new(SharedState { lru_cache, stats })),
}; };
let mut server = HttpServer::new({ let mut server = HttpServer::new({
@ -56,6 +73,7 @@ async fn main() -> std::io::Result<()> {
.wrap(actix_web::middleware::Logger::default()) .wrap(actix_web::middleware::Logger::default())
.service(handlers::get_cache_item) .service(handlers::get_cache_item)
.service(handlers::put_cache_item) .service(handlers::put_cache_item)
.service(handlers::get_heath_status)
} }
}) })
.keep_alive(None); // disable HTTP keep-alive because it seems to break NX (at least in version 20.8) .keep_alive(None); // disable HTTP keep-alive because it seems to break NX (at least in version 20.8)