use diesel::prelude::*; use diesel::sqlite::SqliteConnection; use std::ops::DerefMut; use std::sync::{Arc, Mutex}; use crate::database::models::{InsertablePrecomputedReel, PrecomputedReel}; use crate::database::schema; use crate::database::{DbError, DbErrorKind, connect}; use crate::otel::trace_db_call; /// Ledger for precomputed memory reels. The nightly agentic job writes a /// row after each successful render; the `GET /reels/precomputed` handler /// reads it to gate on freshness and serve the cached MP4. pub trait PrecomputedReelDao: Sync + Send { /// Insert a precomputed reel row. Returns the new row's id. /// Written by the nightly agentic job (Section D). #[allow(dead_code)] fn record_reel( &mut self, context: &opentelemetry::Context, row: &InsertablePrecomputedReel, ) -> Result; /// Find the latest precomputed reel for the given (span, library_key). fn latest_for( &mut self, context: &opentelemetry::Context, span: &str, library_key: &str, ) -> Result, DbError>; /// Return true when a fresh precomputed reel exists for the given /// (span, library_key, render_version) that was generated at or after /// `min_generated_at`. Used as a fast existence gate before falling /// back to `latest_for` (avoids a second query path). fn exists_fresh( &mut self, context: &opentelemetry::Context, span: &str, library_key: &str, render_version: i32, min_generated_at: i64, ) -> Result; /// Delete all but the newest `keep` rows for (span, library_key), returning /// the deleted rows so the caller can unlink their output files. Used by the /// nightly job to retire superseded reels (e.g. yesterday's daily). #[allow(dead_code)] fn prune_superseded( &mut self, context: &opentelemetry::Context, span: &str, library_key: &str, keep: usize, ) -> Result, DbError>; /// Every cache_key currently in the ledger. Used by the on-disk cache sweep /// to protect files a ledger row still points at. #[allow(dead_code)] fn all_cache_keys(&mut self, context: &opentelemetry::Context) -> Result, DbError>; } pub struct SqlitePrecomputedReelDao { connection: Arc>, } impl Default for SqlitePrecomputedReelDao { fn default() -> Self { Self::new() } } impl SqlitePrecomputedReelDao { pub fn new() -> Self { Self { connection: Arc::new(Mutex::new(connect())), } } #[cfg(test)] pub fn from_connection(conn: Arc>) -> Self { Self { connection: conn } } } impl PrecomputedReelDao for SqlitePrecomputedReelDao { fn record_reel( &mut self, context: &opentelemetry::Context, row: &InsertablePrecomputedReel, ) -> Result { trace_db_call(context, "insert", "record_reel", |_span| { use schema::precomputed_reels::dsl; let mut connection = self .connection .lock() .expect("Unable to lock PrecomputedReelDao"); diesel::insert_into(dsl::precomputed_reels) .values(row) .execute(connection.deref_mut()) .map_err(|e| anyhow::anyhow!("Failed to insert reel: {}", e))?; dsl::precomputed_reels .order(dsl::id.desc()) .select(dsl::id) .first::(connection.deref_mut()) .map_err(|e| anyhow::anyhow!("Failed to get reel id: {}", e)) }) .map_err(|e| DbError::log(DbErrorKind::InsertError, e)) } fn latest_for( &mut self, context: &opentelemetry::Context, span: &str, library_key: &str, ) -> Result, DbError> { trace_db_call(context, "query", "latest_for", |_span| { use schema::precomputed_reels::dsl; let mut connection = self .connection .lock() .expect("Unable to lock PrecomputedReelDao"); dsl::precomputed_reels .filter(dsl::span.eq(span)) .filter(dsl::library_key.eq(library_key)) .order(dsl::generated_at.desc()) .first::(connection.deref_mut()) .optional() .map_err(|e| anyhow::anyhow!("Failed to get latest reel: {}", e)) }) .map_err(|e| DbError::log(DbErrorKind::QueryError, e)) } fn exists_fresh( &mut self, context: &opentelemetry::Context, span: &str, library_key: &str, render_version: i32, min_generated_at: i64, ) -> Result { trace_db_call(context, "query", "exists_fresh", |_span| { use schema::precomputed_reels::dsl; let mut connection = self .connection .lock() .expect("Unable to lock PrecomputedReelDao"); let count: i64 = dsl::precomputed_reels .filter(dsl::span.eq(span)) .filter(dsl::library_key.eq(library_key)) .filter(dsl::render_version.eq(render_version)) .filter(dsl::generated_at.ge(min_generated_at)) .count() .get_result(connection.deref_mut()) .map_err(|e| anyhow::anyhow!("Failed to check fresh reel: {}", e))?; Ok(count > 0) }) .map_err(|e| DbError::log(DbErrorKind::QueryError, e)) } fn prune_superseded( &mut self, context: &opentelemetry::Context, span: &str, library_key: &str, keep: usize, ) -> Result, DbError> { trace_db_call(context, "delete", "prune_superseded", |_span| { use schema::precomputed_reels::dsl; let mut connection = self .connection .lock() .expect("Unable to lock PrecomputedReelDao"); // Newest first; everything past `keep` is superseded. The table // holds at most a handful of rows per (span, library), so loading // and slicing in Rust is cheaper than a correlated subquery. let mut rows: Vec = dsl::precomputed_reels .filter(dsl::span.eq(span)) .filter(dsl::library_key.eq(library_key)) .order(dsl::generated_at.desc()) .load::(connection.deref_mut()) .map_err(|e| anyhow::anyhow!("Failed to load reels for prune: {}", e))?; let stale = rows.split_off(rows.len().min(keep)); if !stale.is_empty() { let ids: Vec = stale.iter().map(|r| r.id).collect(); diesel::delete(dsl::precomputed_reels.filter(dsl::id.eq_any(ids))) .execute(connection.deref_mut()) .map_err(|e| anyhow::anyhow!("Failed to delete superseded reels: {}", e))?; } Ok(stale) }) .map_err(|e| DbError::log(DbErrorKind::UpdateError, e)) } fn all_cache_keys(&mut self, context: &opentelemetry::Context) -> Result, DbError> { trace_db_call(context, "query", "all_cache_keys", |_span| { use schema::precomputed_reels::dsl; let mut connection = self .connection .lock() .expect("Unable to lock PrecomputedReelDao"); dsl::precomputed_reels .select(dsl::cache_key) .load::(connection.deref_mut()) .map_err(|e| anyhow::anyhow!("Failed to load cache keys: {}", e)) }) .map_err(|e| DbError::log(DbErrorKind::QueryError, e)) } } #[cfg(test)] mod tests { use super::*; use diesel::Connection; use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations}; const DB_MIGRATIONS: EmbeddedMigrations = embed_migrations!(); fn setup_dao() -> SqlitePrecomputedReelDao { let mut conn = SqliteConnection::establish(":memory:") .expect("Unable to create in-memory db connection"); conn.run_pending_migrations(DB_MIGRATIONS) .expect("Failure running DB migrations"); SqlitePrecomputedReelDao::from_connection(Arc::new(Mutex::new(conn))) } fn ctx() -> opentelemetry::Context { opentelemetry::Context::new() } fn sample_row() -> InsertablePrecomputedReel { InsertablePrecomputedReel { span: "day".to_string(), library_key: "1".to_string(), cache_key: "abc123".to_string(), output_path: "/tmp/reel.mp4".to_string(), title: "Test Reel".to_string(), media_count: 10, render_version: 1, tz_offset_minutes: 0, voice: Some("default".to_string()), generated_at: 1_000_000, } } #[test] fn record_reel_inserts_and_returns_id() { let mut dao = setup_dao(); let ctx = ctx(); let row = sample_row(); let id = dao.record_reel(&ctx, &row).unwrap(); assert!(id > 0, "should return a positive id"); } #[test] fn record_reel_returns_increasing_ids() { let mut dao = setup_dao(); let ctx = ctx(); let row = sample_row(); let id1 = dao.record_reel(&ctx, &row).unwrap(); let id2 = dao.record_reel(&ctx, &row).unwrap(); assert!(id2 > id1, "each insert should get a higher id"); } #[test] fn latest_for_returns_latest() { let mut dao = setup_dao(); let ctx = ctx(); let row1 = InsertablePrecomputedReel { generated_at: 1_000_000, ..sample_row() }; let row2 = InsertablePrecomputedReel { generated_at: 2_000_000, ..sample_row() }; dao.record_reel(&ctx, &row1).unwrap(); dao.record_reel(&ctx, &row2).unwrap(); let latest = dao.latest_for(&ctx, "day", "1").unwrap().unwrap(); assert_eq!(latest.generated_at, 2_000_000); } #[test] fn latest_for_scoped_by_span_and_library() { let mut dao = setup_dao(); let ctx = ctx(); let day_row = InsertablePrecomputedReel { span: "day".to_string(), library_key: "1".to_string(), generated_at: 1_000_000, ..sample_row() }; let week_row = InsertablePrecomputedReel { span: "week".to_string(), library_key: "1".to_string(), generated_at: 2_000_000, ..sample_row() }; dao.record_reel(&ctx, &day_row).unwrap(); dao.record_reel(&ctx, &week_row).unwrap(); let day_latest = dao.latest_for(&ctx, "day", "1").unwrap().unwrap(); assert_eq!(day_latest.span, "day"); let week_latest = dao.latest_for(&ctx, "week", "1").unwrap().unwrap(); assert_eq!(week_latest.span, "week"); // Different library returns None let missing = dao.latest_for(&ctx, "day", "99").unwrap(); assert!(missing.is_none()); } #[test] fn latest_for_returns_none_when_no_rows() { let mut dao = setup_dao(); let ctx = ctx(); let result = dao.latest_for(&ctx, "day", "1").unwrap(); assert!(result.is_none()); } #[test] fn exists_fresh_returns_true_when_present() { let mut dao = setup_dao(); let ctx = ctx(); dao.record_reel(&ctx, &sample_row()).unwrap(); let exists = dao.exists_fresh(&ctx, "day", "1", 1, 900_000).unwrap(); assert!(exists, "should find the row we just inserted"); } #[test] fn exists_fresh_returns_false_when_missing() { let mut dao = setup_dao(); let ctx = ctx(); let exists = dao.exists_fresh(&ctx, "day", "1", 1, 900_000).unwrap(); assert!(!exists, "should not find anything in empty table"); } #[test] fn exists_fresh_respects_min_generated_at() { let mut dao = setup_dao(); let ctx = ctx(); dao.record_reel(&ctx, &sample_row()).unwrap(); // Below the threshold — should exist let exists = dao.exists_fresh(&ctx, "day", "1", 1, 500_000).unwrap(); assert!(exists); // Above the threshold — should not exist let exists = dao.exists_fresh(&ctx, "day", "1", 1, 2_000_000).unwrap(); assert!(!exists); } #[test] fn exists_fresh_respects_render_version() { let mut dao = setup_dao(); let ctx = ctx(); let row_v1 = InsertablePrecomputedReel { render_version: 1, ..sample_row() }; dao.record_reel(&ctx, &row_v1).unwrap(); assert!(dao.exists_fresh(&ctx, "day", "1", 1, 900_000).unwrap()); assert!(!dao.exists_fresh(&ctx, "day", "1", 2, 900_000).unwrap()); } #[test] fn prune_superseded_keeps_newest_and_returns_deleted() { let mut dao = setup_dao(); let ctx = ctx(); // Three day/lib1 reels at increasing timestamps, plus an unrelated one. for (i, key) in ["k1", "k2", "k3"].iter().enumerate() { dao.record_reel( &ctx, &InsertablePrecomputedReel { cache_key: key.to_string(), generated_at: 1_000_000 + i as i64 * 1000, ..sample_row() }, ) .unwrap(); } let other = InsertablePrecomputedReel { library_key: "2".to_string(), cache_key: "other".to_string(), ..sample_row() }; dao.record_reel(&ctx, &other).unwrap(); // Keep the newest 2 of (day, "1"); k1 (oldest) is superseded. let deleted = dao.prune_superseded(&ctx, "day", "1", 2).unwrap(); assert_eq!(deleted.len(), 1); assert_eq!(deleted[0].cache_key, "k1"); // The newest 2 survive; the other-library row is untouched. let keys = dao.all_cache_keys(&ctx).unwrap(); assert_eq!(keys.len(), 3); assert!(keys.contains(&"k2".to_string())); assert!(keys.contains(&"k3".to_string())); assert!(keys.contains(&"other".to_string())); assert!(!keys.contains(&"k1".to_string())); } #[test] fn prune_superseded_noop_when_within_keep() { let mut dao = setup_dao(); let ctx = ctx(); dao.record_reel(&ctx, &sample_row()).unwrap(); let deleted = dao.prune_superseded(&ctx, "day", "1", 2).unwrap(); assert!(deleted.is_empty()); assert_eq!(dao.all_cache_keys(&ctx).unwrap().len(), 1); } }