Feature/unified nl search #106
@@ -160,3 +160,10 @@ SEARCH_RAG_RERANK=0
|
|||||||
# REEL_PREGEN_LIBRARY=all
|
# REEL_PREGEN_LIBRARY=all
|
||||||
# Max agentic tool iterations for pre-gen scripter. Default 8.
|
# Max agentic tool iterations for pre-gen scripter. Default 8.
|
||||||
# REEL_PREGEN_MAX_TOOL_ITERS=8
|
# REEL_PREGEN_MAX_TOOL_ITERS=8
|
||||||
|
#
|
||||||
|
# On-disk reel cache sweep (runs every 24h, independent of pre-gen). Removes
|
||||||
|
# reel MP4s with no ledger row + no live job that are older than the max age —
|
||||||
|
# i.e. the on-demand cache, which otherwise grows forever. Set to 0 to disable.
|
||||||
|
# REEL_CACHE_SWEEP_ENABLED=1
|
||||||
|
# Age (days) before an unreferenced reel MP4 is swept. Default 7.
|
||||||
|
# REEL_CACHE_MAX_AGE_DAYS=7
|
||||||
|
|||||||
@@ -41,6 +41,23 @@ pub trait PrecomputedReelDao: Sync + Send {
|
|||||||
render_version: i32,
|
render_version: i32,
|
||||||
min_generated_at: i64,
|
min_generated_at: i64,
|
||||||
) -> Result<bool, DbError>;
|
) -> Result<bool, DbError>;
|
||||||
|
|
||||||
|
/// Delete all but the newest `keep` rows for (span, library_key), returning
|
||||||
|
/// the deleted rows so the caller can unlink their output files. Used by the
|
||||||
|
/// nightly job to retire superseded reels (e.g. yesterday's daily).
///
/// Only ledger rows are removed here; deleting the files named by the
/// returned rows is the caller's job. When the scope holds `keep` rows or
/// fewer, nothing is deleted and the returned list is empty.
///
/// # Errors
/// Returns a `DbError` when the underlying load or delete fails.
|
||||||
|
// NOTE(review): no reason is recorded for this allow — confirm it is still
// needed now that the method has callers.
#[allow(dead_code)]
|
||||||
|
fn prune_superseded(
|
||||||
|
&mut self,
|
||||||
|
context: &opentelemetry::Context,
|
||||||
|
span: &str,
|
||||||
|
library_key: &str,
|
||||||
|
keep: usize,
|
||||||
|
) -> Result<Vec<PrecomputedReel>, DbError>;
|
||||||
|
|
||||||
|
/// Every cache_key currently in the ledger. Used by the on-disk cache sweep
|
||||||
|
/// to protect files a ledger row still points at.
///
/// No ordering is promised by this signature; callers should treat the
/// result as a set.
///
/// # Errors
/// Returns a `DbError` when the underlying query fails.
|
||||||
|
// NOTE(review): no reason is recorded for this allow — confirm it is still
// needed now that the method has callers.
#[allow(dead_code)]
|
||||||
|
fn all_cache_keys(&mut self, context: &opentelemetry::Context) -> Result<Vec<String>, DbError>;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct SqlitePrecomputedReelDao {
|
pub struct SqlitePrecomputedReelDao {
|
||||||
@@ -148,6 +165,60 @@ impl PrecomputedReelDao for SqlitePrecomputedReelDao {
|
|||||||
})
|
})
|
||||||
.map_err(|e| DbError::log(DbErrorKind::QueryError, e))
|
.map_err(|e| DbError::log(DbErrorKind::QueryError, e))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn prune_superseded(
|
||||||
|
&mut self,
|
||||||
|
context: &opentelemetry::Context,
|
||||||
|
span: &str,
|
||||||
|
library_key: &str,
|
||||||
|
keep: usize,
|
||||||
|
) -> Result<Vec<PrecomputedReel>, DbError> {
|
||||||
|
trace_db_call(context, "delete", "prune_superseded", |_span| {
|
||||||
|
use schema::precomputed_reels::dsl;
|
||||||
|
|
||||||
|
let mut connection = self
|
||||||
|
.connection
|
||||||
|
.lock()
|
||||||
|
.expect("Unable to lock PrecomputedReelDao");
|
||||||
|
|
||||||
|
// Newest first; everything past `keep` is superseded. The table
|
||||||
|
// holds at most a handful of rows per (span, library), so loading
|
||||||
|
// and slicing in Rust is cheaper than a correlated subquery.
|
||||||
|
let mut rows: Vec<PrecomputedReel> = dsl::precomputed_reels
|
||||||
|
.filter(dsl::span.eq(span))
|
||||||
|
.filter(dsl::library_key.eq(library_key))
|
||||||
|
.order(dsl::generated_at.desc())
|
||||||
|
.load::<PrecomputedReel>(connection.deref_mut())
|
||||||
|
.map_err(|e| anyhow::anyhow!("Failed to load reels for prune: {}", e))?;
|
||||||
|
|
||||||
|
let stale = rows.split_off(rows.len().min(keep));
|
||||||
|
if !stale.is_empty() {
|
||||||
|
let ids: Vec<i32> = stale.iter().map(|r| r.id).collect();
|
||||||
|
diesel::delete(dsl::precomputed_reels.filter(dsl::id.eq_any(ids)))
|
||||||
|
.execute(connection.deref_mut())
|
||||||
|
.map_err(|e| anyhow::anyhow!("Failed to delete superseded reels: {}", e))?;
|
||||||
|
}
|
||||||
|
Ok(stale)
|
||||||
|
})
|
||||||
|
.map_err(|e| DbError::log(DbErrorKind::UpdateError, e))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn all_cache_keys(&mut self, context: &opentelemetry::Context) -> Result<Vec<String>, DbError> {
|
||||||
|
trace_db_call(context, "query", "all_cache_keys", |_span| {
|
||||||
|
use schema::precomputed_reels::dsl;
|
||||||
|
|
||||||
|
let mut connection = self
|
||||||
|
.connection
|
||||||
|
.lock()
|
||||||
|
.expect("Unable to lock PrecomputedReelDao");
|
||||||
|
|
||||||
|
dsl::precomputed_reels
|
||||||
|
.select(dsl::cache_key)
|
||||||
|
.load::<String>(connection.deref_mut())
|
||||||
|
.map_err(|e| anyhow::anyhow!("Failed to load cache keys: {}", e))
|
||||||
|
})
|
||||||
|
.map_err(|e| DbError::log(DbErrorKind::QueryError, e))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@@ -318,4 +389,51 @@ mod tests {
|
|||||||
assert!(dao.exists_fresh(&ctx, "day", "1", 1, 900_000).unwrap());
|
assert!(dao.exists_fresh(&ctx, "day", "1", 1, 900_000).unwrap());
|
||||||
assert!(!dao.exists_fresh(&ctx, "day", "1", 2, 900_000).unwrap());
|
assert!(!dao.exists_fresh(&ctx, "day", "1", 2, 900_000).unwrap());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
fn prune_superseded_keeps_newest_and_returns_deleted() {
    let mut dao = setup_dao();
    let ctx = ctx();

    // Seed three reels for the sample scope, one second-thousand apart, so
    // "k1" is the oldest and "k3" the newest.
    let mut generated_at: i64 = 1_000_000;
    for key in ["k1", "k2", "k3"] {
        let row = InsertablePrecomputedReel {
            cache_key: key.to_string(),
            generated_at,
            ..sample_row()
        };
        dao.record_reel(&ctx, &row).unwrap();
        generated_at += 1000;
    }

    // A reel belonging to a different library must never be touched.
    let unrelated = InsertablePrecomputedReel {
        library_key: "2".to_string(),
        cache_key: "other".to_string(),
        ..sample_row()
    };
    dao.record_reel(&ctx, &unrelated).unwrap();

    // Keeping two of (day, "1") retires exactly the oldest row.
    let deleted = dao.prune_superseded(&ctx, "day", "1", 2).unwrap();
    let deleted_keys: Vec<&str> = deleted.iter().map(|row| row.cache_key.as_str()).collect();
    assert_eq!(deleted_keys, ["k1"]);

    // Survivors: the two newest plus the other-library row.
    let remaining = dao.all_cache_keys(&ctx).unwrap();
    assert_eq!(remaining.len(), 3);
    for survivor in ["k2", "k3", "other"] {
        assert!(remaining.iter().any(|k| k == survivor));
    }
    assert!(remaining.iter().all(|k| k != "k1"));
}
|
||||||
|
|
||||||
|
#[test]
fn prune_superseded_noop_when_within_keep() {
    let mut dao = setup_dao();
    let ctx = ctx();

    // A single row is already inside the keep window of two.
    dao.record_reel(&ctx, &sample_row()).unwrap();

    let pruned = dao.prune_superseded(&ctx, "day", "1", 2).unwrap();
    assert!(pruned.is_empty());

    let remaining = dao.all_cache_keys(&ctx).unwrap();
    assert_eq!(remaining.len(), 1);
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -269,6 +269,8 @@ fn main() -> std::io::Result<()> {
|
|||||||
|
|
||||||
// Spawn the nightly pre-generation scheduler (Section D).
|
// Spawn the nightly pre-generation scheduler (Section D).
|
||||||
reels::spawn_pregen_scheduler(app_state.clone()).await;
|
reels::spawn_pregen_scheduler(app_state.clone()).await;
|
||||||
|
// Spawn the on-disk reel-cache sweeper (bounds pre-gen + on-demand reels).
|
||||||
|
reels::spawn_reel_cache_sweeper(app_state.clone()).await;
|
||||||
|
|
||||||
HttpServer::new(move || {
|
HttpServer::new(move || {
|
||||||
let user_dao = SqliteUserDao::new();
|
let user_dao = SqliteUserDao::new();
|
||||||
|
|||||||
+169
-17
@@ -46,6 +46,21 @@ const REEL_PRECOMPUTED_WEEK_MAX_AGE_HOURS: u64 = 192;
|
|||||||
/// Maximum age for a precomputed month reel.
|
/// Maximum age for a precomputed month reel.
|
||||||
const REEL_PRECOMPUTED_MONTH_MAX_AGE_HOURS: u64 = 768;
|
const REEL_PRECOMPUTED_MONTH_MAX_AGE_HOURS: u64 = 768;
|
||||||
|
|
||||||
|
/// How many precomputed reels to keep per (span, library). The newest is the
|
||||||
|
/// one served; one extra is a grace window so a regen mid-flight (or a client
|
||||||
|
/// that started a fetch just before the swap) isn't left without a file.
|
||||||
|
const PREGEN_KEEP_PER_SCOPE: usize = 2;
|
||||||
|
|
||||||
|
/// On-disk reel cache sweep: an unreferenced reel MP4 older than this is
|
||||||
|
/// removed. Catches the on-demand cache (which has no ledger row and otherwise
|
||||||
|
/// grows forever) and any pre-gen orphans. Tunable via `REEL_CACHE_MAX_AGE_DAYS`.
/// Expressed in days. This default also applies when the env var is unset,
/// unparsable, or 0.
|
||||||
|
const REEL_CACHE_MAX_AGE_DAYS_DEFAULT: u64 = 7;
|
||||||
|
/// Interval between on-disk cache sweeps.
/// Expressed in seconds (24 hours).
|
||||||
|
const REEL_CACHE_SWEEP_INTERVAL_SECS: u64 = 24 * 3600;
|
||||||
|
/// Transient render artifacts (`.mp4.tmp`, `.concat.txt`, orphaned sidecars)
|
||||||
|
/// older than this are leftovers from a crashed render and safe to remove.
/// Expressed in seconds (one hour).
|
||||||
|
const REEL_TMP_MAX_AGE_SECS: u64 = 3600;
|
||||||
|
|
||||||
/// Resolve a library request parameter to a stable key string.
|
/// Resolve a library request parameter to a stable key string.
|
||||||
/// Returns the library's id as a string when found, or `"all"` when
|
/// Returns the library's id as a string when found, or `"all"` when
|
||||||
/// the param is absent or the lookup fails.
|
/// the param is absent or the lookup fails.
|
||||||
@@ -1142,28 +1157,165 @@ async fn pregen_one(
|
|||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
// Record to ledger
|
// Record to ledger, then retire superseded reels for this (span, library)
|
||||||
let mut reel_dao = app_state.precomputed_reel_dao.lock().expect("lock");
|
// — yesterday's daily, an older render-version, etc. — keeping a small
|
||||||
reel_dao.record_reel(
|
// grace window. Done under one lock so the prune sees the row we just wrote.
|
||||||
&ctx,
|
let superseded = {
|
||||||
&crate::database::models::InsertablePrecomputedReel {
|
let mut reel_dao = app_state.precomputed_reel_dao.lock().expect("lock");
|
||||||
span: span.to_string(),
|
reel_dao.record_reel(
|
||||||
library_key: library.to_string(),
|
&ctx,
|
||||||
cache_key: key.clone(),
|
&crate::database::models::InsertablePrecomputedReel {
|
||||||
output_path: mp4.to_string_lossy().to_string(),
|
span: span.to_string(),
|
||||||
title,
|
library_key: library.to_string(),
|
||||||
media_count,
|
cache_key: key.clone(),
|
||||||
render_version: RENDER_VERSION as i32,
|
output_path: mp4.to_string_lossy().to_string(),
|
||||||
tz_offset_minutes: tz,
|
title,
|
||||||
voice: voice.clone(),
|
media_count,
|
||||||
generated_at: now,
|
render_version: RENDER_VERSION as i32,
|
||||||
},
|
tz_offset_minutes: tz,
|
||||||
)?;
|
voice: voice.clone(),
|
||||||
|
generated_at: now,
|
||||||
|
},
|
||||||
|
)?;
|
||||||
|
reel_dao
|
||||||
|
.prune_superseded(&ctx, span, library, PREGEN_KEEP_PER_SCOPE)
|
||||||
|
.unwrap_or_default()
|
||||||
|
};
|
||||||
|
for row in &superseded {
|
||||||
|
delete_reel_files(&row.output_path);
|
||||||
|
}
|
||||||
|
if !superseded.is_empty() {
|
||||||
|
log::info!(
|
||||||
|
"Pruned {} superseded precomputed reel(s) for span={}",
|
||||||
|
superseded.len(),
|
||||||
|
span
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
log::info!("Precomputed reel generated for span={}, key={}", span, key);
|
log::info!("Precomputed reel generated for span={}, key={}", span, key);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// --- On-disk cache sweep -----------------------------------------------------
|
||||||
|
|
||||||
|
/// Remove a reel's MP4 and the `.json` sidecar that sits next to it.
///
/// Strictly best-effort: a missing file or any other IO error is ignored, so
/// this is safe to call for paths that were already cleaned up.
fn delete_reel_files(mp4_output_path: &str) {
    let video = Path::new(mp4_output_path);
    let sidecar = video.with_extension("json");
    // MP4 first, then its sidecar; failures are deliberately discarded.
    for target in [video, sidecar.as_path()] {
        let _ = std::fs::remove_file(target);
    }
}
|
||||||
|
|
||||||
|
/// Max age (seconds) before an unreferenced reel MP4 is swept.
|
||||||
|
fn reel_cache_max_age_secs() -> u64 {
|
||||||
|
std::env::var("REEL_CACHE_MAX_AGE_DAYS")
|
||||||
|
.ok()
|
||||||
|
.and_then(|v| v.trim().parse::<u64>().ok())
|
||||||
|
.filter(|d| *d > 0)
|
||||||
|
.unwrap_or(REEL_CACHE_MAX_AGE_DAYS_DEFAULT)
|
||||||
|
* 86_400
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Spawn the periodic on-disk reel-cache sweeper. Runs independently of the
|
||||||
|
/// pre-gen scheduler because the on-demand cache grows whether or not pre-gen
|
||||||
|
/// is enabled. Disable with `REEL_CACHE_SWEEP_ENABLED=0`.
|
||||||
|
pub(crate) async fn spawn_reel_cache_sweeper(app_state: web::Data<AppState>) {
|
||||||
|
if std::env::var("REEL_CACHE_SWEEP_ENABLED").ok().as_deref() == Some("0") {
|
||||||
|
log::info!("Reel cache sweeper disabled (REEL_CACHE_SWEEP_ENABLED=0)");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
tokio::spawn(async move {
|
||||||
|
// Settle after startup, then sweep on a fixed cadence.
|
||||||
|
tokio::time::sleep(Duration::from_secs(300)).await;
|
||||||
|
loop {
|
||||||
|
let removed = sweep_reel_cache(&app_state);
|
||||||
|
if removed > 0 {
|
||||||
|
log::info!("Reel cache sweep removed {removed} stale file(s)");
|
||||||
|
}
|
||||||
|
tokio::time::sleep(Duration::from_secs(REEL_CACHE_SWEEP_INTERVAL_SECS)).await;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/// One sweep of `reels_path`. Removes: stale render artifacts (`.mp4.tmp`,
|
||||||
|
/// `.concat.txt`, orphaned sidecars) from crashed runs; and reel MP4s that no
|
||||||
|
/// ledger row references, that no live job points at, and that are older than
|
||||||
|
/// the cache max age (the on-demand cache, which has no ledger row). Returns the
|
||||||
|
/// number of files removed. Best-effort — any IO error on one entry is skipped.
|
||||||
|
fn sweep_reel_cache(app_state: &AppState) -> usize {
|
||||||
|
let dir = Path::new(&app_state.reels_path);
|
||||||
|
let read_dir = match std::fs::read_dir(dir) {
|
||||||
|
Ok(rd) => rd,
|
||||||
|
Err(_) => return 0, // dir not created yet → nothing to sweep
|
||||||
|
};
|
||||||
|
|
||||||
|
// Files a ledger row still points at (current pre-gen reels).
|
||||||
|
let protected: std::collections::HashSet<String> = {
|
||||||
|
let ctx = opentelemetry::Context::new();
|
||||||
|
let mut dao = app_state.precomputed_reel_dao.lock().expect("lock");
|
||||||
|
dao.all_cache_keys(&ctx)
|
||||||
|
.unwrap_or_default()
|
||||||
|
.into_iter()
|
||||||
|
.collect()
|
||||||
|
};
|
||||||
|
// Outputs of live in-memory jobs (a Done reel a client may still be fetching).
|
||||||
|
let active: std::collections::HashSet<String> = {
|
||||||
|
let jobs = REEL_JOBS.lock().unwrap();
|
||||||
|
jobs.values()
|
||||||
|
.filter_map(|j| j.output_path.as_ref())
|
||||||
|
.map(|p| p.to_string_lossy().to_string())
|
||||||
|
.collect()
|
||||||
|
};
|
||||||
|
|
||||||
|
let now = std::time::SystemTime::now();
|
||||||
|
let max_age = Duration::from_secs(reel_cache_max_age_secs());
|
||||||
|
let tmp_max_age = Duration::from_secs(REEL_TMP_MAX_AGE_SECS);
|
||||||
|
let mut removed = 0usize;
|
||||||
|
|
||||||
|
for entry in read_dir.flatten() {
|
||||||
|
let path = entry.path();
|
||||||
|
let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
let age = entry
|
||||||
|
.metadata()
|
||||||
|
.and_then(|m| m.modified())
|
||||||
|
.ok()
|
||||||
|
.and_then(|t| now.duration_since(t).ok())
|
||||||
|
.unwrap_or_default();
|
||||||
|
|
||||||
|
// Transient render artifacts from a crashed run.
|
||||||
|
if name.ends_with(".mp4.tmp") || name.ends_with(".concat.txt") {
|
||||||
|
if age > tmp_max_age && std::fs::remove_file(&path).is_ok() {
|
||||||
|
removed += 1;
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reel MP4: keep if referenced (ledger or live job) or still recent.
|
||||||
|
if let Some(key) = name.strip_suffix(".mp4") {
|
||||||
|
let p = path.to_string_lossy().to_string();
|
||||||
|
if protected.contains(key) || active.contains(&p) || age < max_age {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if std::fs::remove_file(&path).is_ok() {
|
||||||
|
let _ = std::fs::remove_file(path.with_extension("json"));
|
||||||
|
removed += 1;
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Orphaned sidecar (its MP4 is gone).
|
||||||
|
if name.ends_with(".json")
|
||||||
|
&& !path.with_extension("mp4").exists()
|
||||||
|
&& age > tmp_max_age
|
||||||
|
&& std::fs::remove_file(&path).is_ok()
|
||||||
|
{
|
||||||
|
removed += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
removed
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|||||||
Reference in New Issue
Block a user