diff --git a/.env.example b/.env.example index a7bd7e5..2e431bc 100644 --- a/.env.example +++ b/.env.example @@ -160,3 +160,10 @@ SEARCH_RAG_RERANK=0 # REEL_PREGEN_LIBRARY=all # Max agentic tool iterations for pre-gen scripter. Default 8. # REEL_PREGEN_MAX_TOOL_ITERS=8 +# +# On-disk reel cache sweep (runs every 24h, independent of pre-gen). Removes +# reel MP4s with no ledger row + no live job that are older than the max age — +# i.e. the on-demand cache, which otherwise grows forever. Set to 0 to disable. +# REEL_CACHE_SWEEP_ENABLED=1 +# Age (days) before an unreferenced reel MP4 is swept. Default 7. +# REEL_CACHE_MAX_AGE_DAYS=7 diff --git a/src/database/precomputed_reel_dao.rs b/src/database/precomputed_reel_dao.rs index 7acc098..b66573b 100644 --- a/src/database/precomputed_reel_dao.rs +++ b/src/database/precomputed_reel_dao.rs @@ -41,6 +41,23 @@ pub trait PrecomputedReelDao: Sync + Send { render_version: i32, min_generated_at: i64, ) -> Result; + + /// Delete all but the newest `keep` rows for (span, library_key), returning + /// the deleted rows so the caller can unlink their output files. Used by the + /// nightly job to retire superseded reels (e.g. yesterday's daily). + #[allow(dead_code)] + fn prune_superseded( + &mut self, + context: &opentelemetry::Context, + span: &str, + library_key: &str, + keep: usize, + ) -> Result, DbError>; + + /// Every cache_key currently in the ledger. Used by the on-disk cache sweep + /// to protect files a ledger row still points at. 
+ #[allow(dead_code)] + fn all_cache_keys(&mut self, context: &opentelemetry::Context) -> Result, DbError>; } pub struct SqlitePrecomputedReelDao { @@ -148,6 +165,60 @@ impl PrecomputedReelDao for SqlitePrecomputedReelDao { }) .map_err(|e| DbError::log(DbErrorKind::QueryError, e)) } + + fn prune_superseded( + &mut self, + context: &opentelemetry::Context, + span: &str, + library_key: &str, + keep: usize, + ) -> Result, DbError> { + trace_db_call(context, "delete", "prune_superseded", |_span| { + use schema::precomputed_reels::dsl; + + let mut connection = self + .connection + .lock() + .expect("Unable to lock PrecomputedReelDao"); + + // Newest first; everything past `keep` is superseded. The table + // holds at most a handful of rows per (span, library), so loading + // and slicing in Rust is cheaper than a correlated subquery. + let mut rows: Vec = dsl::precomputed_reels + .filter(dsl::span.eq(span)) + .filter(dsl::library_key.eq(library_key)) + .order(dsl::generated_at.desc()) + .load::(connection.deref_mut()) + .map_err(|e| anyhow::anyhow!("Failed to load reels for prune: {}", e))?; + + let stale = rows.split_off(rows.len().min(keep)); + if !stale.is_empty() { + let ids: Vec = stale.iter().map(|r| r.id).collect(); + diesel::delete(dsl::precomputed_reels.filter(dsl::id.eq_any(ids))) + .execute(connection.deref_mut()) + .map_err(|e| anyhow::anyhow!("Failed to delete superseded reels: {}", e))?; + } + Ok(stale) + }) + .map_err(|e| DbError::log(DbErrorKind::UpdateError, e)) + } + + fn all_cache_keys(&mut self, context: &opentelemetry::Context) -> Result, DbError> { + trace_db_call(context, "query", "all_cache_keys", |_span| { + use schema::precomputed_reels::dsl; + + let mut connection = self + .connection + .lock() + .expect("Unable to lock PrecomputedReelDao"); + + dsl::precomputed_reels + .select(dsl::cache_key) + .load::(connection.deref_mut()) + .map_err(|e| anyhow::anyhow!("Failed to load cache keys: {}", e)) + }) + .map_err(|e| 
DbError::log(DbErrorKind::QueryError, e)) + } } #[cfg(test)] @@ -318,4 +389,51 @@ mod tests { assert!(dao.exists_fresh(&ctx, "day", "1", 1, 900_000).unwrap()); assert!(!dao.exists_fresh(&ctx, "day", "1", 2, 900_000).unwrap()); } + + #[test] + fn prune_superseded_keeps_newest_and_returns_deleted() { + let mut dao = setup_dao(); + let ctx = ctx(); + // Three day/lib1 reels at increasing timestamps, plus an unrelated one. + for (i, key) in ["k1", "k2", "k3"].iter().enumerate() { + dao.record_reel( + &ctx, + &InsertablePrecomputedReel { + cache_key: key.to_string(), + generated_at: 1_000_000 + i as i64 * 1000, + ..sample_row() + }, + ) + .unwrap(); + } + let other = InsertablePrecomputedReel { + library_key: "2".to_string(), + cache_key: "other".to_string(), + ..sample_row() + }; + dao.record_reel(&ctx, &other).unwrap(); + + // Keep the newest 2 of (day, "1"); k1 (oldest) is superseded. + let deleted = dao.prune_superseded(&ctx, "day", "1", 2).unwrap(); + assert_eq!(deleted.len(), 1); + assert_eq!(deleted[0].cache_key, "k1"); + + // The newest 2 survive; the other-library row is untouched. + let keys = dao.all_cache_keys(&ctx).unwrap(); + assert_eq!(keys.len(), 3); + assert!(keys.contains(&"k2".to_string())); + assert!(keys.contains(&"k3".to_string())); + assert!(keys.contains(&"other".to_string())); + assert!(!keys.contains(&"k1".to_string())); + } + + #[test] + fn prune_superseded_noop_when_within_keep() { + let mut dao = setup_dao(); + let ctx = ctx(); + dao.record_reel(&ctx, &sample_row()).unwrap(); + let deleted = dao.prune_superseded(&ctx, "day", "1", 2).unwrap(); + assert!(deleted.is_empty()); + assert_eq!(dao.all_cache_keys(&ctx).unwrap().len(), 1); + } } diff --git a/src/main.rs b/src/main.rs index dd2868f..e420d8b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -269,6 +269,8 @@ fn main() -> std::io::Result<()> { // Spawn the nightly pre-generation scheduler (Section D). 
reels::spawn_pregen_scheduler(app_state.clone()).await; + // Spawn the on-disk reel-cache sweeper (bounds pre-gen + on-demand reels). + reels::spawn_reel_cache_sweeper(app_state.clone()).await; HttpServer::new(move || { let user_dao = SqliteUserDao::new(); diff --git a/src/reels/mod.rs b/src/reels/mod.rs index 95769ad..afe2ced 100644 --- a/src/reels/mod.rs +++ b/src/reels/mod.rs @@ -46,6 +46,21 @@ const REEL_PRECOMPUTED_WEEK_MAX_AGE_HOURS: u64 = 192; /// Maximum age for a precomputed month reel. const REEL_PRECOMPUTED_MONTH_MAX_AGE_HOURS: u64 = 768; +/// How many precomputed reels to keep per (span, library). The newest is the +/// one served; one extra is a grace window so a regen mid-flight (or a client +/// that started a fetch just before the swap) isn't left without a file. +const PREGEN_KEEP_PER_SCOPE: usize = 2; + +/// On-disk reel cache sweep: an unreferenced reel MP4 older than this is +/// removed. Catches the on-demand cache (which has no ledger row and otherwise +/// grows forever) and any pre-gen orphans. Tunable via `REEL_CACHE_MAX_AGE_DAYS`. +const REEL_CACHE_MAX_AGE_DAYS_DEFAULT: u64 = 7; +/// Interval between on-disk cache sweeps. +const REEL_CACHE_SWEEP_INTERVAL_SECS: u64 = 24 * 3600; +/// Transient render artifacts (`.mp4.tmp`, `.concat.txt`, orphaned sidecars) +/// older than this are leftovers from a crashed render and safe to remove. +const REEL_TMP_MAX_AGE_SECS: u64 = 3600; + /// Resolve a library request parameter to a stable key string. /// Returns the library's id as a string when found, or `"all"` when /// the param is absent or the lookup fails. 
@@ -1142,28 +1157,189 @@ async fn pregen_one(
     )
     .await?;
 
-    // Record to ledger
-    let mut reel_dao = app_state.precomputed_reel_dao.lock().expect("lock");
-    reel_dao.record_reel(
-        &ctx,
-        &crate::database::models::InsertablePrecomputedReel {
-            span: span.to_string(),
-            library_key: library.to_string(),
-            cache_key: key.clone(),
-            output_path: mp4.to_string_lossy().to_string(),
-            title,
-            media_count,
-            render_version: RENDER_VERSION as i32,
-            tz_offset_minutes: tz,
-            voice: voice.clone(),
-            generated_at: now,
-        },
-    )?;
+    // Record to ledger, then retire superseded reels for this (span, library)
+    // — yesterday's daily, an older render-version, etc. — keeping a small
+    // grace window. Done under one lock so the prune sees the row we just wrote.
+    let superseded = {
+        let mut reel_dao = app_state.precomputed_reel_dao.lock().expect("lock");
+        reel_dao.record_reel(
+            &ctx,
+            &crate::database::models::InsertablePrecomputedReel {
+                span: span.to_string(),
+                library_key: library.to_string(),
+                cache_key: key.clone(),
+                output_path: mp4.to_string_lossy().to_string(),
+                title,
+                media_count,
+                render_version: RENDER_VERSION as i32,
+                tz_offset_minutes: tz,
+                voice: voice.clone(),
+                generated_at: now,
+            },
+        )?;
+        reel_dao
+            .prune_superseded(&ctx, span, library, PREGEN_KEEP_PER_SCOPE)
+            .unwrap_or_default()
+    };
+    for row in &superseded {
+        delete_reel_files(&row.output_path);
+    }
+    if !superseded.is_empty() {
+        log::info!(
+            "Pruned {} superseded precomputed reel(s) for span={}",
+            superseded.len(),
+            span
+        );
+    }
 
     log::info!("Precomputed reel generated for span={}, key={}", span, key);
     Ok(())
 }
 
+// --- On-disk cache sweep -----------------------------------------------------
+
+/// Best-effort unlink of a reel's MP4 and its `.json` sidecar.
+fn delete_reel_files(mp4_output_path: &str) {
+    let mp4 = Path::new(mp4_output_path);
+    let _ = std::fs::remove_file(mp4);
+    let _ = std::fs::remove_file(mp4.with_extension("json"));
+}
+
+/// Max age (seconds) before an unreferenced reel MP4 is swept.
+///
+/// Reads `REEL_CACHE_MAX_AGE_DAYS`; an unset, unparsable or zero value falls
+/// back to `REEL_CACHE_MAX_AGE_DAYS_DEFAULT`. The day-to-second conversion
+/// saturates so an absurdly large setting cannot wrap around to a tiny age.
+fn reel_cache_max_age_secs() -> u64 {
+    std::env::var("REEL_CACHE_MAX_AGE_DAYS")
+        .ok()
+        .and_then(|v| v.trim().parse::<u64>().ok())
+        .filter(|d| *d > 0)
+        .unwrap_or(REEL_CACHE_MAX_AGE_DAYS_DEFAULT)
+        .saturating_mul(86_400)
+}
+
+/// Spawn the periodic on-disk reel-cache sweeper. Runs independently of the
+/// pre-gen scheduler because the on-demand cache grows whether or not pre-gen
+/// is enabled. Disable with `REEL_CACHE_SWEEP_ENABLED=0`.
+///
+/// The first sweep runs 5 minutes after startup; the task then sleeps
+/// `REEL_CACHE_SWEEP_INTERVAL_SECS` between sweeps. Each sweep runs on Tokio's
+/// blocking pool because it performs synchronous filesystem and ledger work.
+pub(crate) async fn spawn_reel_cache_sweeper(app_state: web::Data<AppState>) {
+    if std::env::var("REEL_CACHE_SWEEP_ENABLED").ok().as_deref() == Some("0") {
+        log::info!("Reel cache sweeper disabled (REEL_CACHE_SWEEP_ENABLED=0)");
+        return;
+    }
+    tokio::spawn(async move {
+        // Settle after startup, then sweep on a fixed cadence.
+        tokio::time::sleep(Duration::from_secs(300)).await;
+        loop {
+            // `sweep_reel_cache` does synchronous filesystem and ledger I/O, so
+            // keep it off the async worker. A panic inside one sweep comes back
+            // as a JoinError; log it and carry on to the next cycle.
+            let state = app_state.clone();
+            match tokio::task::spawn_blocking(move || sweep_reel_cache(&state)).await {
+                Ok(0) => {}
+                Ok(removed) => log::info!("Reel cache sweep removed {removed} stale file(s)"),
+                Err(e) => log::warn!("Reel cache sweep task failed: {e}"),
+            }
+            tokio::time::sleep(Duration::from_secs(REEL_CACHE_SWEEP_INTERVAL_SECS)).await;
+        }
+    });
+}
+
+/// One sweep of `reels_path`. Removes: stale render artifacts (`.mp4.tmp`,
+/// `.concat.txt`, orphaned sidecars) from crashed runs; and reel MP4s that no
+/// ledger row references, that no live job points at, and that are older than
+/// the cache max age (the on-demand cache, which has no ledger row). Returns the
+/// number of files removed. Best-effort — any IO error on one entry is skipped.
+///
+/// If the ledger cannot be read the whole sweep is skipped (returns 0): without
+/// the protected set, the age check alone would delete pre-gen reels that are
+/// still referenced.
+fn sweep_reel_cache(app_state: &AppState) -> usize {
+    let dir = Path::new(&app_state.reels_path);
+    let read_dir = match std::fs::read_dir(dir) {
+        Ok(rd) => rd,
+        Err(_) => return 0, // dir not created yet → nothing to sweep
+    };
+
+    // Files a ledger row still points at (current pre-gen reels). These can be
+    // legitimately older than the cache max age (a month reel stays valid for
+    // `REEL_PRECOMPUTED_MONTH_MAX_AGE_HOURS`), so never fall back to an empty
+    // set when the query fails.
+    let protected: std::collections::HashSet<String> = {
+        let ctx = opentelemetry::Context::new();
+        let mut dao = app_state.precomputed_reel_dao.lock().expect("lock");
+        match dao.all_cache_keys(&ctx) {
+            Ok(keys) => keys.into_iter().collect(),
+            Err(e) => {
+                log::warn!("Reel cache sweep skipped: cannot read reel ledger: {e:?}");
+                return 0;
+            }
+        }
+    };
+    // Outputs of live in-memory jobs (a Done reel a client may still be fetching).
+    let active: std::collections::HashSet<String> = {
+        let jobs = REEL_JOBS.lock().unwrap();
+        jobs.values()
+            .filter_map(|j| j.output_path.as_ref())
+            .map(|p| p.to_string_lossy().to_string())
+            .collect()
+    };
+
+    let now = std::time::SystemTime::now();
+    let max_age = Duration::from_secs(reel_cache_max_age_secs());
+    let tmp_max_age = Duration::from_secs(REEL_TMP_MAX_AGE_SECS);
+    let mut removed = 0usize;
+
+    for entry in read_dir.flatten() {
+        let path = entry.path();
+        let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
+            continue;
+        };
+        // An unreadable or future mtime counts as age zero, so the file is kept.
+        let age = entry
+            .metadata()
+            .and_then(|m| m.modified())
+            .ok()
+            .and_then(|t| now.duration_since(t).ok())
+            .unwrap_or_default();
+
+        // Transient render artifacts from a crashed run.
+        if name.ends_with(".mp4.tmp") || name.ends_with(".concat.txt") {
+            if age > tmp_max_age && std::fs::remove_file(&path).is_ok() {
+                removed += 1;
+            }
+            continue;
+        }
+
+        // Reel MP4: keep if referenced (ledger or live job) or still recent.
+        if let Some(key) = name.strip_suffix(".mp4") {
+            let p = path.to_string_lossy().to_string();
+            if protected.contains(key) || active.contains(&p) || age < max_age {
+                continue;
+            }
+            if std::fs::remove_file(&path).is_ok() {
+                let _ = std::fs::remove_file(path.with_extension("json"));
+                removed += 1;
+            }
+            continue;
+        }
+
+        // Orphaned sidecar (its MP4 is gone).
+        if name.ends_with(".json")
+            && !path.with_extension("mp4").exists()
+            && age > tmp_max_age
+            && std::fs::remove_file(&path).is_ok()
+        {
+            removed += 1;
+        }
+    }
+    removed
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;