feat: nightly agentic pre-generation of memory reels
Implement end-to-end nightly pre-generation of memory reels with agentic
scripting that grounds narration in calendar, location, messages, and RAG.
Sections A-E from the plan:
A. Extract produce_reel pipeline core from run_reel_job with
ScripterMode::Fast/Agentic and progress callbacks.
B. Agentic scripter: factor run_readonly_tool_loop from the insight
generator, build read-only tool gate, prompt builder with GPS, and
generate_script_agentic with fallback to fast path.
C. Precomputed reels ledger (SQLite table + DAO), GET /reels/precomputed
handler with validity gate, GET /reels/by-key/{key}/video streaming,
and normalize_library_key helper.
D. Nightly scheduler: spawn_pregen_scheduler with configurable hour,
run_pregen_batch (day/week/month spans), pregen_one with dedup and
disk-check, secs_until_next_run_hour time math.
E. user_ai_prefs passive mirror table + DAO for param capture in
create_reel_handler and replay in the scheduler.
Also fixes resolve_library_param signature to take &[Library] and adds
resolve_library_param_state wrapper for AppState callers.
New files: migrations/2026-06-13-000000_add_precomputed_reels/,
migrations/2026-06-13-000010_add_user_ai_prefs/,
src/database/precomputed_reel_dao.rs,
src/database/user_ai_prefs_dao.rs
This commit is contained in:
+707
-28
@@ -18,24 +18,59 @@ pub mod selector;
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::{LazyLock, Mutex as StdMutex};
|
||||
use std::sync::{Arc, LazyLock, Mutex, Mutex as StdMutex};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use actix_files::NamedFile;
|
||||
use actix_web::{HttpRequest, HttpResponse, Responder, get, post, web};
|
||||
use chrono::DateTime;
|
||||
use anyhow::{Context, anyhow};
|
||||
use chrono::{DateTime, Datelike, Timelike};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::json;
|
||||
use std::sync::Mutex;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::data::Claims;
|
||||
use crate::database::{ExifDao, InsightDao};
|
||||
use crate::database::{ExifDao, InsightDao, PrecomputedReelDao, UserAiPrefsDao};
|
||||
use crate::libraries::{Library, resolve_library_param};
|
||||
use crate::memories::MemoriesSpan;
|
||||
use crate::otel::extract_context_from_request;
|
||||
use crate::state::AppState;
|
||||
use selector::ReelSelector;
|
||||
|
||||
// --- Precomputed reel age limits (hours) -------------------------------------
|
||||
|
||||
/// Maximum age for a precomputed day reel before it's considered stale.
|
||||
const REEL_PRECOMPUTED_DAY_MAX_AGE_HOURS: u64 = 26;
|
||||
/// Maximum age for a precomputed week reel.
|
||||
const REEL_PRECOMPUTED_WEEK_MAX_AGE_HOURS: u64 = 192;
|
||||
/// Maximum age for a precomputed month reel.
|
||||
const REEL_PRECOMPUTED_MONTH_MAX_AGE_HOURS: u64 = 768;
|
||||
|
||||
/// Resolve a library request parameter to a stable key string.
|
||||
/// Returns the library's id as a string when found, or `"all"` when
|
||||
/// the param is absent or the lookup fails.
|
||||
pub fn normalize_library_key(libs: &[Library], param: Option<&str>) -> String {
|
||||
match resolve_library_param(libs, param) {
|
||||
Ok(Some(lib)) => lib.id.to_string(),
|
||||
_ => "all".to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Which scripting strategy to use for the reel narration.
|
||||
#[derive(Clone, Copy)]
|
||||
#[allow(dead_code)]
|
||||
pub enum ScripterMode {
|
||||
/// Fast path: single LLM call via the direct client.
|
||||
Fast,
|
||||
/// Agentic path: resolves the backend through the InsightGenerator
|
||||
/// (honouring LLM_BACKEND, model overrides, etc.). Falls back to
|
||||
/// Fast on error so a scripting failure never sinks a reel.
|
||||
Agentic,
|
||||
}
|
||||
|
||||
/// Progress callback type — receives a static-stage label.
|
||||
pub type ProgressFn<'a> = dyn Fn(&'static str) + Send + Sync + 'a;
|
||||
|
||||
/// The media behind one shot: a still photo, or a short section of a source
|
||||
/// video (played with its live audio ducked under the narration). Both carry
|
||||
/// just the library-relative path; the renderer applies fixed clip framing
|
||||
@@ -73,6 +108,8 @@ pub struct PlannedBeat {
|
||||
pub date: Option<i64>,
|
||||
pub insight_title: Option<String>,
|
||||
pub insight_summary: Option<String>,
|
||||
/// GPS coordinates of the lead media item, when available.
|
||||
pub gps: Option<(f64, f64)>,
|
||||
}
|
||||
|
||||
impl PlannedBeat {
|
||||
@@ -292,6 +329,13 @@ pub struct ReelStatusResponse {
|
||||
pub error: Option<String>,
|
||||
}
|
||||
|
||||
/// Response shape for `GET /reels/precomputed`.
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct PrecomputedReelResponse {
|
||||
pub video_url: String,
|
||||
pub title: String,
|
||||
}
|
||||
|
||||
// --- Handlers ----------------------------------------------------------------
|
||||
|
||||
/// POST /reels — start (or instantly serve from cache) a memory reel for the
|
||||
@@ -399,8 +443,20 @@ pub async fn create_reel_handler(
|
||||
|
||||
let state = app_state.clone();
|
||||
let insight_dao = insight_dao.clone();
|
||||
let exif_dao = exif_dao.clone();
|
||||
let handle = tokio::spawn(async move {
|
||||
match run_reel_job(&state, &insight_dao, job_id, planned, meta, voice, &key).await {
|
||||
match run_reel_job(
|
||||
&state,
|
||||
&insight_dao,
|
||||
&exif_dao,
|
||||
job_id,
|
||||
planned,
|
||||
meta,
|
||||
voice,
|
||||
&key,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok((title, path)) => {
|
||||
finish_job(job_id, ReelJobStatus::Done, Some(title), Some(path), None)
|
||||
}
|
||||
@@ -471,25 +527,131 @@ pub async fn reel_video_handler(
|
||||
}
|
||||
}
|
||||
|
||||
/// GET /reels/precomputed?span=&library=
|
||||
///
|
||||
/// Look up the latest precomputed reel for the given span and library key.
|
||||
/// Validity gate (all must hold, else 404):
|
||||
/// 1. `render_version == RENDER_VERSION`
|
||||
/// 2. `output_path` exists on disk
|
||||
/// 3. age <= max_age(span) (Day 26h, Week 8d, Month 32d)
|
||||
///
|
||||
/// Returns `{ video_url: "/reels/by-key/{cache_key}/video", title }`.
|
||||
#[get("/reels/precomputed")]
|
||||
pub async fn precomputed_reel_handler(
|
||||
_claims: Claims,
|
||||
query: web::Query<HashMap<String, String>>,
|
||||
app_state: web::Data<AppState>,
|
||||
reel_dao: web::Data<Mutex<Box<dyn PrecomputedReelDao>>>,
|
||||
) -> impl Responder {
|
||||
let span = query.get("span").map(|s| s.as_str()).unwrap_or("day");
|
||||
let library_key = normalize_library_key(
|
||||
&app_state.libraries,
|
||||
query.get("library").map(|s| s.as_str()),
|
||||
);
|
||||
|
||||
let now = std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.expect("Time went backwards")
|
||||
.as_secs() as i64;
|
||||
|
||||
let max_age_hours = match span {
|
||||
"week" => REEL_PRECOMPUTED_WEEK_MAX_AGE_HOURS as i64,
|
||||
"month" => REEL_PRECOMPUTED_MONTH_MAX_AGE_HOURS as i64,
|
||||
_ => REEL_PRECOMPUTED_DAY_MAX_AGE_HOURS as i64,
|
||||
};
|
||||
let min_generated_at = now - (max_age_hours * 3600);
|
||||
|
||||
let ctx = opentelemetry::Context::new();
|
||||
let mut dao = reel_dao.lock().expect("Unable to lock PrecomputedReelDao");
|
||||
|
||||
// Fast existence gate: is there a fresh row at all?
|
||||
if !dao
|
||||
.exists_fresh(
|
||||
&ctx,
|
||||
span,
|
||||
&library_key,
|
||||
RENDER_VERSION as i32,
|
||||
min_generated_at,
|
||||
)
|
||||
.unwrap_or(false)
|
||||
{
|
||||
return HttpResponse::NotFound().json(json!({ "error": "no precomputed reel found" }));
|
||||
}
|
||||
|
||||
// Fetch the latest row for full validity checks.
|
||||
let reel = match dao.latest_for(&ctx, span, &library_key) {
|
||||
Ok(Some(r)) => r,
|
||||
_ => {
|
||||
return HttpResponse::NotFound().json(json!({ "error": "no precomputed reel found" }));
|
||||
}
|
||||
};
|
||||
|
||||
// Validity gate 1: render version must match.
|
||||
if reel.render_version != RENDER_VERSION as i32 {
|
||||
return HttpResponse::NotFound()
|
||||
.json(json!({ "error": "precomputed reel is stale (render version mismatch)" }));
|
||||
}
|
||||
|
||||
// Validity gate 2: output_path must exist.
|
||||
let output = std::path::Path::new(&reel.output_path);
|
||||
if !output.exists() {
|
||||
return HttpResponse::NotFound().json(json!({ "error": "precomputed reel file missing" }));
|
||||
}
|
||||
|
||||
// Validity gate 3: age <= max_age (re-checked via min_generated_at).
|
||||
if reel.generated_at < min_generated_at {
|
||||
return HttpResponse::NotFound().json(json!({ "error": "precomputed reel has expired" }));
|
||||
}
|
||||
|
||||
HttpResponse::Ok().json(PrecomputedReelResponse {
|
||||
video_url: format!("/reels/by-key/{}/video", reel.cache_key),
|
||||
title: reel.title,
|
||||
})
|
||||
}
|
||||
|
||||
/// GET /reels/by-key/{key}/video — stream a precomputed reel MP4 by cache key.
|
||||
#[get("/reels/by-key/{key}/video")]
|
||||
pub async fn precomputed_video_handler(
|
||||
_claims: Claims,
|
||||
request: HttpRequest,
|
||||
path: web::Path<String>,
|
||||
app_state: web::Data<AppState>,
|
||||
) -> impl Responder {
|
||||
let key = path.into_inner();
|
||||
let mp4 = reel_mp4_path(&app_state, &key);
|
||||
match NamedFile::open(&mp4) {
|
||||
Ok(file) => file.into_response(&request),
|
||||
Err(e) => {
|
||||
log::error!("opening precomputed reel {key} failed: {e:?}");
|
||||
HttpResponse::NotFound().json(json!({ "error": "precomputed reel file missing" }))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// --- Pipeline ----------------------------------------------------------------
|
||||
|
||||
/// Run the full reel pipeline: enrich → script → narrate → render → concat,
|
||||
/// then publish the MP4 into the cache. Returns (title, mp4_path).
|
||||
async fn run_reel_job(
|
||||
///
|
||||
/// The `scripter` parameter controls which narration-generation strategy is
|
||||
/// used (fast single-call vs. agentic backend resolution). On scripting
|
||||
/// failure in Agentic mode the pipeline falls back to the fast path so a
|
||||
/// single LLM failure never sinks a reel.
|
||||
pub(crate) async fn produce_reel(
|
||||
app_state: &AppState,
|
||||
insight_dao: &Mutex<Box<dyn InsightDao>>,
|
||||
job_id: Uuid,
|
||||
exif_dao: &Mutex<Box<dyn ExifDao>>,
|
||||
mut planned: Vec<PlannedBeat>,
|
||||
meta: ReelMeta,
|
||||
voice: Option<String>,
|
||||
key: &str,
|
||||
scripter: ScripterMode,
|
||||
progress: Option<&ProgressFn<'_>>,
|
||||
) -> anyhow::Result<(String, PathBuf)> {
|
||||
use anyhow::{Context, anyhow};
|
||||
|
||||
let started = Instant::now();
|
||||
let total_photos: usize = planned.iter().map(|b| b.media.len()).sum();
|
||||
log::info!(
|
||||
"reel {job_id}: starting — span {:?}, {} beats, {} photos, voice={}",
|
||||
"reel produce_reel: starting — span {:?}, {} beats, {} photos, voice={}",
|
||||
meta.span,
|
||||
planned.len(),
|
||||
total_photos,
|
||||
@@ -499,18 +661,33 @@ async fn run_reel_job(
|
||||
let client = app_state
|
||||
.llamacpp
|
||||
.as_ref()
|
||||
.ok_or_else(|| anyhow!("TTS/LLM backend not configured"))?
|
||||
.ok_or_else(|| anyhow::anyhow!("TTS/LLM backend not configured"))?
|
||||
.clone();
|
||||
|
||||
// 1. Enrich each beat with its lead photo's cached insight, then script
|
||||
// (one LLM call → one narration line per beat).
|
||||
set_stage(job_id, "scripting");
|
||||
log::info!("reel {job_id}: scripting narration via LLM…");
|
||||
emit_progress(progress, "scripting");
|
||||
log::info!("reel produce_reel: scripting narration via LLM…");
|
||||
let span_context = opentelemetry::Context::new();
|
||||
selector::enrich(insight_dao, &span_context, &mut planned);
|
||||
let script = script::generate_script(&client, &meta, &planned).await?;
|
||||
selector::enrich(insight_dao, exif_dao, &span_context, &mut planned);
|
||||
let script = match scripter {
|
||||
ScripterMode::Fast => script::generate_script(&client, &meta, &planned).await?,
|
||||
ScripterMode::Agentic => {
|
||||
match script::generate_script_agentic(&app_state.insight_generator, &meta, &planned)
|
||||
.await
|
||||
{
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
log::warn!(
|
||||
"reel produce_reel: agentic script failed, falling back to fast: {e}"
|
||||
);
|
||||
script::generate_script(&client, &meta, &planned).await?
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
log::info!(
|
||||
"reel {job_id}: scripted \"{}\" ({} lines)",
|
||||
"reel produce_reel: scripted \"{}\" ({} lines)",
|
||||
script.title,
|
||||
script.lines.len()
|
||||
);
|
||||
@@ -519,11 +696,11 @@ async fn run_reel_job(
|
||||
// sequence under that one narration). A beat whose audio or render fails
|
||||
// is skipped (logged) rather than sinking the whole reel — handles an
|
||||
// odd HEIC/corrupt file gracefully.
|
||||
set_stage(job_id, "narrating");
|
||||
emit_progress(progress, "narrating");
|
||||
let work = tempfile::tempdir().context("creating reel work dir")?;
|
||||
let nvenc = render::is_nvenc_available().await;
|
||||
log::info!(
|
||||
"reel {job_id}: narrating + rendering {} beats (encoder: {})",
|
||||
"reel produce_reel: narrating + rendering {} beats (encoder: {})",
|
||||
planned.len(),
|
||||
if nvenc { "nvenc" } else { "cpu" }
|
||||
);
|
||||
@@ -543,7 +720,7 @@ async fn run_reel_job(
|
||||
.filter_map(|m| resolve_media_path(app_state, m))
|
||||
.collect();
|
||||
if paths.is_empty() {
|
||||
log::warn!("reel {job_id}: skipping beat {i}, no media paths resolved");
|
||||
log::warn!("reel produce_reel: skipping beat {i}, no media paths resolved");
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -558,13 +735,13 @@ async fn run_reel_job(
|
||||
{
|
||||
Ok(b) => b,
|
||||
Err(e) => {
|
||||
log::warn!("reel {job_id}: skipping beat {i}, TTS failed: {e}");
|
||||
log::warn!("reel produce_reel: skipping beat {i}, TTS failed: {e}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let audio_path = work.path().join(format!("narration_{i:03}.wav"));
|
||||
if let Err(e) = tokio::fs::write(&audio_path, &audio_bytes).await {
|
||||
log::warn!("reel {job_id}: skipping beat {i}, writing audio failed: {e}");
|
||||
log::warn!("reel produce_reel: skipping beat {i}, writing audio failed: {e}");
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -575,11 +752,11 @@ async fn run_reel_job(
|
||||
.flatten()
|
||||
.unwrap_or(render::MIN_SEGMENT_SECONDS);
|
||||
|
||||
set_stage(job_id, "rendering");
|
||||
emit_progress(progress, "rendering");
|
||||
let beat_out = work.path().join(format!("beat_{i:03}.mp4"));
|
||||
let render_result = if beat.is_clip() {
|
||||
log::info!(
|
||||
"reel {job_id}: beat {}/{} — video clip, narration {:.1}s",
|
||||
"reel produce_reel: beat {}/{} — video clip, narration {:.1}s",
|
||||
i + 1,
|
||||
beat_total,
|
||||
narration_secs
|
||||
@@ -587,7 +764,7 @@ async fn run_reel_job(
|
||||
render::render_clip_beat(&paths[0], &audio_path, &beat_out, narration_secs, &opts).await
|
||||
} else {
|
||||
log::info!(
|
||||
"reel {job_id}: beat {}/{} — {} photo(s), narration {:.1}s",
|
||||
"reel produce_reel: beat {}/{} — {} photo(s), narration {:.1}s",
|
||||
i + 1,
|
||||
beat_total,
|
||||
paths.len(),
|
||||
@@ -596,7 +773,7 @@ async fn run_reel_job(
|
||||
render::render_beat(&paths, &audio_path, &beat_out, narration_secs, &opts).await
|
||||
};
|
||||
if let Err(e) = render_result {
|
||||
log::warn!("reel {job_id}: skipping beat {i}, render failed: {e}");
|
||||
log::warn!("reel produce_reel: skipping beat {i}, render failed: {e}");
|
||||
continue;
|
||||
}
|
||||
beat_files.push(beat_out.to_string_lossy().to_string());
|
||||
@@ -609,9 +786,9 @@ async fn run_reel_job(
|
||||
|
||||
// 4. Concat into the cache. Write to a temp name in the reels dir, then
|
||||
// rename atomically (same filesystem) so a reader never sees a partial.
|
||||
set_stage(job_id, "rendering");
|
||||
emit_progress(progress, "rendering");
|
||||
log::info!(
|
||||
"reel {job_id}: joining {} rendered beats into the final reel",
|
||||
"reel produce_reel: joining {} rendered beats into the final reel",
|
||||
segment_files.len()
|
||||
);
|
||||
std::fs::create_dir_all(&app_state.reels_path).context("creating reels dir")?;
|
||||
@@ -629,7 +806,7 @@ async fn run_reel_job(
|
||||
let _ = std::fs::write(reel_sidecar_path(app_state, key), sidecar);
|
||||
|
||||
log::info!(
|
||||
"reel {job_id}: done in {:.1}s — {} beats → {}",
|
||||
"reel produce_reel: done in {:.1}s — {} beats → {}",
|
||||
started.elapsed().as_secs_f64(),
|
||||
segment_files.len(),
|
||||
final_path.display()
|
||||
@@ -637,6 +814,42 @@ async fn run_reel_job(
|
||||
Ok((script.title, final_path))
|
||||
}
|
||||
|
||||
/// Emit a progress stage label via the optional callback.
|
||||
fn emit_progress(progress: Option<&ProgressFn<'_>>, stage: &'static str) {
|
||||
if let Some(p) = progress {
|
||||
p(stage);
|
||||
}
|
||||
}
|
||||
|
||||
/// Run the full reel pipeline and publish the MP4 into the cache.
|
||||
/// Thin wrapper around [`produce_reel`] that wires up job-stage tracking.
|
||||
async fn run_reel_job(
|
||||
app_state: &AppState,
|
||||
insight_dao: &Mutex<Box<dyn InsightDao>>,
|
||||
exif_dao: &Mutex<Box<dyn ExifDao>>,
|
||||
job_id: Uuid,
|
||||
planned: Vec<PlannedBeat>,
|
||||
meta: ReelMeta,
|
||||
voice: Option<String>,
|
||||
key: &str,
|
||||
) -> anyhow::Result<(String, PathBuf)> {
|
||||
let progress = move |stage: &'static str| {
|
||||
set_stage(job_id, stage);
|
||||
};
|
||||
produce_reel(
|
||||
app_state,
|
||||
insight_dao,
|
||||
exif_dao,
|
||||
planned,
|
||||
meta,
|
||||
voice,
|
||||
key,
|
||||
ScripterMode::Fast,
|
||||
Some(&progress),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Resolve a media item's library-relative path to a validated absolute path
|
||||
/// under its library root (works for both photos and clips).
|
||||
fn resolve_media_path(app_state: &AppState, media: &SegmentMedia) -> Option<PathBuf> {
|
||||
@@ -645,9 +858,280 @@ fn resolve_media_path(app_state: &AppState, media: &SegmentMedia) -> Option<Path
|
||||
crate::files::is_valid_full_path(&lib.root_path, &rel, false)
|
||||
}
|
||||
|
||||
// --- Nightly pre-generation scheduler (Section D) ----------------------------
|
||||
|
||||
/// Env: "3" (default). The hour (0-23) when the nightly pre-gen batch fires.
|
||||
fn pregen_run_hour() -> u32 {
|
||||
std::env::var("REEL_PREGEN_HOUR")
|
||||
.ok()
|
||||
.and_then(|v| v.parse().ok())
|
||||
.unwrap_or(3)
|
||||
}
|
||||
|
||||
/// Env: "1" (default, Monday). Day of week for weekly pre-gen (0=Sun, 1=Mon, ...).
|
||||
fn pregen_week_dow() -> u32 {
|
||||
std::env::var("REEL_PREGEN_WEEK_DOW")
|
||||
.ok()
|
||||
.and_then(|v| v.parse().ok())
|
||||
.unwrap_or(1)
|
||||
}
|
||||
|
||||
/// Pure: seconds until the next run of `run_hour` given the current local time.
|
||||
/// Handles same-day vs wrap-around. Recomputed each loop iteration to absorb
|
||||
/// DST shifts.
|
||||
pub(crate) fn secs_until_next_run_hour(now: chrono::DateTime<chrono::Local>, run_hour: u32) -> u64 {
|
||||
let now_hour = now.hour();
|
||||
let diff = if now_hour >= run_hour {
|
||||
24 - now_hour + run_hour
|
||||
} else {
|
||||
run_hour - now_hour
|
||||
};
|
||||
(diff * 3600) as u64
|
||||
}
|
||||
|
||||
/// Load pre-gen parameters: tries the user_ai_prefs DB row first, falls back
|
||||
/// to env vars, then to server-local defaults.
|
||||
fn load_pregen_params(
|
||||
prefs_dao: &web::Data<Arc<Mutex<Box<dyn UserAiPrefsDao>>>>,
|
||||
) -> (i32, Option<String>, String) {
|
||||
// Try DB row first
|
||||
if let Ok(mut dao) = prefs_dao.lock() {
|
||||
let ctx = opentelemetry::Context::new();
|
||||
if let Ok(Some(prefs)) = dao.get_prefs(&ctx) {
|
||||
let tz = prefs
|
||||
.tz_offset_minutes
|
||||
.unwrap_or_else(|| chrono::Local::now().offset().local_minus_utc());
|
||||
let voice = prefs.voice;
|
||||
let library = prefs.library.unwrap_or_else(|| "all".to_string());
|
||||
return (tz, voice, library);
|
||||
}
|
||||
}
|
||||
// Fall back to env
|
||||
let tz = std::env::var("REEL_PREGEN_TZ_OFFSET_MINUTES")
|
||||
.ok()
|
||||
.and_then(|v| v.parse().ok())
|
||||
.unwrap_or_else(|| chrono::Local::now().offset().local_minus_utc());
|
||||
let voice = std::env::var("REEL_PREGEN_VOICE").ok();
|
||||
let library = std::env::var("REEL_PREGEN_LIBRARY")
|
||||
.ok()
|
||||
.unwrap_or_else(|| "all".to_string());
|
||||
(tz, voice, library)
|
||||
}
|
||||
|
||||
/// Spawn the nightly pre-generation scheduler. Runs behind `REEL_PREGEN_ENABLED`.
|
||||
pub(crate) async fn spawn_pregen_scheduler(
|
||||
app_state: web::Data<AppState>,
|
||||
insight_dao: web::Data<Arc<Mutex<Box<dyn InsightDao>>>>,
|
||||
prefs_dao: web::Data<Arc<Mutex<Box<dyn UserAiPrefsDao>>>>,
|
||||
) {
|
||||
if std::env::var("REEL_PREGEN_ENABLED").ok() != Some("1".to_string()) {
|
||||
log::info!("Reel pre-generation scheduler disabled (REEL_PREGEN_ENABLED != 1)");
|
||||
return;
|
||||
}
|
||||
|
||||
let run_hour = pregen_run_hour();
|
||||
log::info!(
|
||||
"Reel pre-generation scheduler enabled, running at hour {} local",
|
||||
run_hour
|
||||
);
|
||||
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
let now = chrono::Local::now();
|
||||
let sleep_secs = secs_until_next_run_hour(now, run_hour);
|
||||
log::debug!("Next pre-gen run in {}s", sleep_secs);
|
||||
tokio::time::sleep(std::time::Duration::from_secs(sleep_secs)).await;
|
||||
|
||||
if let Err(e) = run_pregen_batch(&app_state, &insight_dao, &prefs_dao).await {
|
||||
log::error!("Reel pre-generation batch failed: {}", e);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// Run the pre-generation batch for all applicable spans.
|
||||
async fn run_pregen_batch(
|
||||
app_state: &AppState,
|
||||
insight_dao: &web::Data<Arc<Mutex<Box<dyn InsightDao>>>>,
|
||||
prefs_dao: &web::Data<Arc<Mutex<Box<dyn UserAiPrefsDao>>>>,
|
||||
) -> anyhow::Result<()> {
|
||||
let now = chrono::Local::now();
|
||||
let weekday = now.weekday().num_days_from_sunday(); // 0=Sun, 1=Mon, ...
|
||||
let day_of_month = now.day();
|
||||
|
||||
let mut spans = vec!["day"];
|
||||
if weekday == pregen_week_dow() {
|
||||
spans.push("week");
|
||||
}
|
||||
if day_of_month == 1 {
|
||||
spans.push("month");
|
||||
}
|
||||
|
||||
let (tz, voice, library) = load_pregen_params(prefs_dao);
|
||||
|
||||
for span in spans {
|
||||
if let Err(e) = pregen_one(app_state, insight_dao, span, tz, voice.clone(), &library).await
|
||||
{
|
||||
log::error!("Pre-gen failed for span={}: {}", span, e);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Pre-generate a single reel for the given span.
|
||||
async fn pregen_one(
|
||||
app_state: &AppState,
|
||||
insight_dao: &web::Data<Arc<Mutex<Box<dyn InsightDao>>>>,
|
||||
span: &str,
|
||||
tz: i32,
|
||||
voice: Option<String>,
|
||||
library: &str,
|
||||
) -> anyhow::Result<()> {
|
||||
let memories_span = match span {
|
||||
"day" => MemoriesSpan::Day,
|
||||
"week" => MemoriesSpan::Week,
|
||||
"month" => MemoriesSpan::Month,
|
||||
_ => MemoriesSpan::Day,
|
||||
};
|
||||
|
||||
let selector = ReelSelector::Memories {
|
||||
span: memories_span,
|
||||
tz_offset_minutes: tz,
|
||||
library: if library == "all" {
|
||||
None
|
||||
} else {
|
||||
Some(library.to_string())
|
||||
},
|
||||
max_segments: 24,
|
||||
};
|
||||
|
||||
let exif_dao: Arc<StdMutex<Box<dyn ExifDao>>> = Arc::new(StdMutex::new(Box::new(
|
||||
crate::database::SqliteExifDao::new(),
|
||||
)));
|
||||
let ctx = opentelemetry::Context::new();
|
||||
let (planned, reel_meta) = match selector::resolve(app_state, &exif_dao, &ctx, &selector) {
|
||||
Ok((p, m)) => (p, m),
|
||||
Err(e) => {
|
||||
log::warn!("Pre-gen resolve failed for span={}: {}", span, e);
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
if planned.is_empty() {
|
||||
log::info!("No beats for span={}, skipping", span);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Flatten every media item across beats (in order) into the cache key.
|
||||
let media: Vec<SegmentMedia> = planned.iter().flat_map(|b| b.media.clone()).collect();
|
||||
let key = cache_key(&selector, &media, voice.as_deref());
|
||||
|
||||
// Dedup: check if fresh ledger row exists
|
||||
let now = std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.expect("Time went backwards")
|
||||
.as_secs() as i64;
|
||||
|
||||
let max_age_hours = match span {
|
||||
"week" => REEL_PRECOMPUTED_WEEK_MAX_AGE_HOURS,
|
||||
"month" => REEL_PRECOMPUTED_MONTH_MAX_AGE_HOURS,
|
||||
_ => REEL_PRECOMPUTED_DAY_MAX_AGE_HOURS,
|
||||
};
|
||||
let min_generated_at = now - (max_age_hours as i64 * 3600);
|
||||
|
||||
let is_fresh = {
|
||||
let mut dao = app_state.precomputed_reel_dao.lock().expect("lock");
|
||||
dao.exists_fresh(&ctx, span, "all", RENDER_VERSION as i32, min_generated_at)
|
||||
.unwrap_or(false)
|
||||
};
|
||||
|
||||
if is_fresh {
|
||||
log::info!("Fresh precomputed reel exists for span={}, skipping", span);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Check if MP4 already on disk (from a previous run that crashed after render)
|
||||
let mp4_path = reel_mp4_path(app_state, &key);
|
||||
if mp4_path.exists() {
|
||||
log::info!(
|
||||
"Precomputed reel MP4 already exists for key={}, recording ledger and skipping render",
|
||||
key
|
||||
);
|
||||
// Read title from sidecar if available
|
||||
let sidecar_path = mp4_path.with_extension("json");
|
||||
let title = if sidecar_path.exists() {
|
||||
let sidecar = tokio::fs::read_to_string(&sidecar_path).await.ok();
|
||||
sidecar
|
||||
.and_then(|s| serde_json::from_str::<ReelSidecar>(&s).ok())
|
||||
.map(|s| s.title)
|
||||
.unwrap_or_else(|| format!("{} reel", span))
|
||||
} else {
|
||||
format!("{} reel", span)
|
||||
};
|
||||
let mut reel_dao = app_state.precomputed_reel_dao.lock().expect("lock");
|
||||
reel_dao.record_reel(
|
||||
&ctx,
|
||||
&crate::database::models::InsertablePrecomputedReel {
|
||||
span: span.to_string(),
|
||||
library_key: "all".to_string(),
|
||||
cache_key: key.clone(),
|
||||
output_path: mp4_path.to_string_lossy().to_string(),
|
||||
title,
|
||||
media_count: planned.len() as i32,
|
||||
render_version: RENDER_VERSION as i32,
|
||||
tz_offset_minutes: tz,
|
||||
voice: voice.clone(),
|
||||
generated_at: now,
|
||||
},
|
||||
)?;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Generate the reel
|
||||
log::info!("Generating precomputed reel for span={}, key={}", span, key);
|
||||
let photo_count = planned.len() as i32;
|
||||
let (title, mp4) = produce_reel(
|
||||
app_state,
|
||||
insight_dao,
|
||||
&exif_dao,
|
||||
planned,
|
||||
reel_meta,
|
||||
voice.clone(),
|
||||
&key,
|
||||
ScripterMode::Agentic,
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Record to ledger
|
||||
let mut reel_dao = app_state.precomputed_reel_dao.lock().expect("lock");
|
||||
reel_dao.record_reel(
|
||||
&ctx,
|
||||
&crate::database::models::InsertablePrecomputedReel {
|
||||
span: span.to_string(),
|
||||
library_key: "all".to_string(),
|
||||
cache_key: key.clone(),
|
||||
output_path: mp4.to_string_lossy().to_string(),
|
||||
title,
|
||||
media_count: photo_count,
|
||||
render_version: RENDER_VERSION as i32,
|
||||
tz_offset_minutes: tz,
|
||||
voice: voice.clone(),
|
||||
generated_at: now,
|
||||
},
|
||||
)?;
|
||||
|
||||
log::info!("Precomputed reel generated for span={}, key={}", span, key);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::ai::face_client::FaceClient;
|
||||
use crate::libraries::Library;
|
||||
use crate::video::actors::StreamActor;
|
||||
|
||||
fn photo(p: &str, lib: i32) -> SegmentMedia {
|
||||
SegmentMedia::Photo {
|
||||
@@ -672,6 +1156,128 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
/// Minimal AppState for tests that only need library lookup.
|
||||
#[allow(dead_code)]
|
||||
fn test_app_state() -> AppState {
|
||||
use crate::ai::InsightGenerator;
|
||||
use crate::ai::insight_chat::{ChatLockMap, InsightChatService};
|
||||
use crate::ai::turn_registry::TurnRegistry;
|
||||
use crate::ai::{OllamaClient, SmsApiClient};
|
||||
use crate::database::{
|
||||
ExifDao, InsightDao, InsightGenerationJobDao, PreviewDao, SqliteExifDao,
|
||||
SqliteInsightDao, SqliteInsightGenerationJobDao, SqlitePreviewDao,
|
||||
};
|
||||
use crate::faces;
|
||||
use crate::state::AppState;
|
||||
use crate::tags::SqliteTagDao;
|
||||
use actix::Actor;
|
||||
use std::sync::Mutex;
|
||||
|
||||
let temp_dir = tempfile::tempdir().expect("Failed to create temp directory");
|
||||
let base_path = temp_dir.path().to_path_buf();
|
||||
let base_path_str = base_path.to_string_lossy().to_string();
|
||||
|
||||
let test_lib = Library {
|
||||
id: crate::libraries::PRIMARY_LIBRARY_ID,
|
||||
name: "main".to_string(),
|
||||
root_path: base_path_str.clone(),
|
||||
enabled: true,
|
||||
excluded_dirs: Vec::new(),
|
||||
};
|
||||
|
||||
let ollama = OllamaClient::new(
|
||||
"http://localhost:11434".to_string(),
|
||||
None,
|
||||
"llama3.2".to_string(),
|
||||
None,
|
||||
);
|
||||
let sms_client = SmsApiClient::new("http://localhost:8000".to_string(), None);
|
||||
let apollo_client = crate::ai::apollo_client::ApolloClient::new(None);
|
||||
|
||||
let insight_dao: std::sync::Arc<Mutex<Box<dyn InsightDao>>> =
|
||||
std::sync::Arc::new(Mutex::new(Box::new(SqliteInsightDao::new())));
|
||||
let exif_dao: std::sync::Arc<Mutex<Box<dyn ExifDao>>> =
|
||||
std::sync::Arc::new(Mutex::new(Box::new(SqliteExifDao::new())));
|
||||
let daily_summary_dao: std::sync::Arc<Mutex<Box<dyn crate::database::DailySummaryDao>>> =
|
||||
std::sync::Arc::new(Mutex::new(Box::new(
|
||||
crate::database::SqliteDailySummaryDao::new(),
|
||||
)));
|
||||
let insight_generator = InsightGenerator::new(
|
||||
ollama.clone(),
|
||||
None,
|
||||
None,
|
||||
sms_client.clone(),
|
||||
apollo_client.clone(),
|
||||
insight_dao.clone(),
|
||||
exif_dao.clone(),
|
||||
daily_summary_dao,
|
||||
std::sync::Arc::new(Mutex::new(Box::new(
|
||||
crate::database::SqliteCalendarEventDao::new(),
|
||||
))),
|
||||
std::sync::Arc::new(Mutex::new(Box::new(
|
||||
crate::database::SqliteLocationHistoryDao::new(),
|
||||
))),
|
||||
std::sync::Arc::new(Mutex::new(Box::new(
|
||||
crate::database::SqliteSearchHistoryDao::new(),
|
||||
))),
|
||||
std::sync::Arc::new(Mutex::new(Box::new(SqliteTagDao::default()))),
|
||||
std::sync::Arc::new(Mutex::new(Box::new(faces::SqliteFaceDao::new()))),
|
||||
std::sync::Arc::new(Mutex::new(Box::new(
|
||||
crate::database::SqliteKnowledgeDao::new(),
|
||||
))),
|
||||
std::sync::Arc::new(Mutex::new(Box::new(
|
||||
crate::database::SqlitePersonaDao::new(),
|
||||
))),
|
||||
vec![test_lib.clone()],
|
||||
);
|
||||
|
||||
let chat_locks: ChatLockMap =
|
||||
std::sync::Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new()));
|
||||
let insight_chat = std::sync::Arc::new(InsightChatService::new(
|
||||
std::sync::Arc::new(insight_generator.clone()),
|
||||
insight_dao.clone(),
|
||||
chat_locks,
|
||||
));
|
||||
let turn_registry = std::sync::Arc::new(TurnRegistry::new(300));
|
||||
let preview_dao: std::sync::Arc<Mutex<Box<dyn PreviewDao>>> =
|
||||
std::sync::Arc::new(Mutex::new(Box::new(SqlitePreviewDao::new())));
|
||||
let insight_job_dao: std::sync::Arc<Mutex<Box<dyn InsightGenerationJobDao>>> =
|
||||
std::sync::Arc::new(Mutex::new(Box::new(SqliteInsightGenerationJobDao::new())));
|
||||
let insight_job_handles: std::sync::Arc<
|
||||
Mutex<std::collections::HashMap<i32, tokio::task::AbortHandle>>,
|
||||
> = std::sync::Arc::new(Mutex::new(std::collections::HashMap::new()));
|
||||
|
||||
AppState::new(
|
||||
std::sync::Arc::new(StreamActor {}.start()),
|
||||
vec![test_lib],
|
||||
base_path_str.clone(),
|
||||
base_path_str.clone(),
|
||||
base_path_str.clone(),
|
||||
base_path_str.clone(),
|
||||
Vec::new(),
|
||||
ollama,
|
||||
None,
|
||||
Vec::new(),
|
||||
None,
|
||||
Vec::new(),
|
||||
sms_client,
|
||||
insight_generator,
|
||||
insight_chat,
|
||||
turn_registry,
|
||||
preview_dao,
|
||||
FaceClient::new(None),
|
||||
crate::ai::clip_client::ClipClient::new(None),
|
||||
insight_job_dao,
|
||||
insight_job_handles,
|
||||
std::sync::Arc::new(Mutex::new(Box::new(
|
||||
crate::database::SqlitePrecomputedReelDao::new(),
|
||||
))),
|
||||
std::sync::Arc::new(Mutex::new(Box::new(
|
||||
crate::database::SqliteUserAiPrefsDao::new(),
|
||||
))),
|
||||
)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cache_key_is_stable_for_same_inputs() {
|
||||
let media = vec![photo("a.jpg", 1), photo("b.jpg", 1)];
|
||||
@@ -724,12 +1330,14 @@ mod tests {
|
||||
date: None,
|
||||
insight_title: None,
|
||||
insight_summary: None,
|
||||
gps: None,
|
||||
};
|
||||
let photo_beat = PlannedBeat {
|
||||
media: vec![photo("a.jpg", 1), photo("b.jpg", 1)],
|
||||
date: None,
|
||||
insight_title: None,
|
||||
insight_summary: None,
|
||||
gps: None,
|
||||
};
|
||||
assert!(clip_beat.is_clip());
|
||||
assert!(!photo_beat.is_clip());
|
||||
@@ -753,6 +1361,7 @@ mod tests {
|
||||
date: Some(1_560_384_000), // 2019-06-13 UTC
|
||||
insight_title: None,
|
||||
insight_summary: None,
|
||||
gps: None,
|
||||
};
|
||||
assert!(beat.date_label().unwrap().contains("2019"));
|
||||
|
||||
@@ -761,7 +1370,77 @@ mod tests {
|
||||
date: None,
|
||||
insight_title: None,
|
||||
insight_summary: None,
|
||||
gps: None,
|
||||
};
|
||||
assert_eq!(undated.date_label(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn normalize_library_key_returns_id_when_found_numeric() {
|
||||
let libs = vec![
|
||||
Library {
|
||||
id: 1,
|
||||
name: "main".to_string(),
|
||||
root_path: "/tmp/main".to_string(),
|
||||
enabled: true,
|
||||
excluded_dirs: Vec::new(),
|
||||
},
|
||||
Library {
|
||||
id: 7,
|
||||
name: "archive".to_string(),
|
||||
root_path: "/tmp/archive".to_string(),
|
||||
enabled: true,
|
||||
excluded_dirs: Vec::new(),
|
||||
},
|
||||
];
|
||||
assert_eq!(normalize_library_key(&libs, Some("1")), "1");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn normalize_library_key_returns_id_when_found_by_name() {
|
||||
let libs = vec![Library {
|
||||
id: 1,
|
||||
name: "main".to_string(),
|
||||
root_path: "/tmp/main".to_string(),
|
||||
enabled: true,
|
||||
excluded_dirs: Vec::new(),
|
||||
}];
|
||||
assert_eq!(normalize_library_key(&libs, Some("main")), "1");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn normalize_library_key_returns_all_when_absent() {
|
||||
let libs = vec![Library {
|
||||
id: 1,
|
||||
name: "main".to_string(),
|
||||
root_path: "/tmp/main".to_string(),
|
||||
enabled: true,
|
||||
excluded_dirs: Vec::new(),
|
||||
}];
|
||||
assert_eq!(normalize_library_key(&libs, None), "all");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn normalize_library_key_returns_all_when_empty() {
|
||||
let libs = vec![Library {
|
||||
id: 1,
|
||||
name: "main".to_string(),
|
||||
root_path: "/tmp/main".to_string(),
|
||||
enabled: true,
|
||||
excluded_dirs: Vec::new(),
|
||||
}];
|
||||
assert_eq!(normalize_library_key(&libs, Some("")), "all");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn normalize_library_key_returns_all_when_unknown() {
|
||||
let libs = vec![Library {
|
||||
id: 1,
|
||||
name: "main".to_string(),
|
||||
root_path: "/tmp/main".to_string(),
|
||||
enabled: true,
|
||||
excluded_dirs: Vec::new(),
|
||||
}];
|
||||
assert_eq!(normalize_library_key(&libs, Some("missing")), "all");
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user