Files
ImageApi/src/reels/mod.rs
T
Cameron Cordes ca007a618d Reels pre-gen: record true media count + real upsert for user_ai_prefs
- pregen_one recorded media_count as planned.len() (beat count); record
  the actual media item total (media.len(), photos + clips) in both the
  cache-hit and freshly-rendered ledger paths. Drops the redundant
  photo_count binding.
- Replace upsert_prefs's insert-then-catch-error-then-update dance with a
  single atomic INSERT ... ON CONFLICT(id) DO UPDATE. Explicit id=1 makes
  the conflict target deterministic; explicit column .set((...)) keeps
  None -> NULL overwrite semantics so the row mirrors the latest request
  exactly, and genuine insert errors surface instead of being swallowed.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-13 15:19:41 -04:00

1429 lines
49 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
//! Memory reels: render an MP4 slideshow of a selection of photos with an
//! LLM-written, voice-cloned narration over it.
//!
//! Pipeline: a [`selector`] resolves *which* photos (and the reel metadata),
//! the [`script`] module writes per-photo narration via the LLM, each line is
//! synthesized to speech, and [`render`] assembles the stills + narration into
//! one MP4. Jobs run in the background (mirroring the TTS speech-job registry)
//! because a reel takes minutes; the finished MP4 is cached on disk keyed by
//! the selection so a repeat request is instant.
//!
//! Phase 1 is on-demand and photos-only. The segment model is media-typed so a
//! video-clip segment (phase 2) and a nightly pre-render (phase 3) slot in
//! without reworking the pipeline.
pub mod render;
pub mod script;
pub mod selector;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::{LazyLock, Mutex, Mutex as StdMutex};
use std::time::{Duration, Instant};
use actix_files::NamedFile;
use actix_web::{HttpRequest, HttpResponse, Responder, get, post, web};
use anyhow::{Context, anyhow};
use chrono::{DateTime, Datelike, Timelike};
use serde::{Deserialize, Serialize};
use serde_json::json;
use uuid::Uuid;
use crate::data::Claims;
use crate::database::{ExifDao, InsightDao};
use crate::libraries::{Library, resolve_library_param};
use crate::memories::MemoriesSpan;
use crate::otel::extract_context_from_request;
use crate::state::AppState;
use selector::ReelSelector;
// --- Precomputed reel age limits (hours) -------------------------------------
/// Maximum age for a precomputed day reel before it's considered stale
/// (26 hours).
const REEL_PRECOMPUTED_DAY_MAX_AGE_HOURS: u64 = 26;
/// Maximum age for a precomputed week reel (192 hours = 8 days).
const REEL_PRECOMPUTED_WEEK_MAX_AGE_HOURS: u64 = 192;
/// Maximum age for a precomputed month reel (768 hours = 32 days).
const REEL_PRECOMPUTED_MONTH_MAX_AGE_HOURS: u64 = 768;
/// Turn an optional library request parameter into a stable key string.
///
/// Yields the matched library's id rendered as a string. When the parameter
/// is absent, matches nothing, or the lookup errors, the key is `"all"`.
pub fn normalize_library_key(libs: &[Library], param: Option<&str>) -> String {
    if let Ok(Some(library)) = resolve_library_param(libs, param) {
        return library.id.to_string();
    }
    String::from("all")
}
/// Best-effort: mirror the latest client reel params into `user_ai_prefs`
/// so the nightly pre-gen scheduler can pick them up.
///
/// Every failure (clock before the epoch, poisoned DAO mutex, DB error) is
/// returned as an `Err` rather than panicking, so a caller that ignores or
/// logs the result is never taken down by this mirror.
///
/// * `req` — the reel request whose voice / timezone offset are recorded.
/// * `library_param` — raw library parameter; empty or absent is stored as
///   `"all"`, anything else is normalized via [`normalize_library_key`].
fn capture_prefs(
    app_state: &AppState,
    req: &web::Json<CreateReelRequest>,
    library_param: Option<&str>,
) -> Result<(), anyhow::Error> {
    use crate::database::models::UpsertUserAiPrefs;

    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_err(|e| anyhow::anyhow!("system clock is before the UNIX epoch: {e}"))?;
    let updated_at = i64::try_from(since_epoch.as_secs())
        .map_err(|e| anyhow::anyhow!("UNIX timestamp does not fit in i64: {e}"))?;

    let library = match library_param {
        // Resolve to the actual library id for the DB row.
        Some(p) if !p.is_empty() => normalize_library_key(&app_state.libraries, Some(p)),
        _ => "all".to_string(),
    };

    // `FixedOffset::local_minus_utc` reports SECONDS; the column holds minutes.
    let tz_offset_minutes = req
        .timezone_offset_minutes
        .unwrap_or_else(|| chrono::Local::now().offset().local_minus_utc() / 60);

    let mut dao = app_state
        .user_ai_prefs_dao
        .lock()
        .map_err(|_| anyhow::anyhow!("user_ai_prefs DAO mutex is poisoned"))?;
    let ctx = opentelemetry::Context::new();
    dao.upsert_prefs(
        &ctx,
        &UpsertUserAiPrefs {
            voice: req.voice.clone().filter(|s| !s.is_empty()),
            tz_offset_minutes: Some(tz_offset_minutes),
            library: Some(library),
            updated_at,
        },
    )
    .map_err(|e| anyhow::anyhow!("failed to upsert user_ai_prefs: {e}"))
}
/// Which scripting strategy to use for the reel narration.
///
/// Passed to `produce_reel`; the on-demand job wrapper (`run_reel_job`)
/// always passes `Fast`.
#[derive(Clone, Copy)]
pub enum ScripterMode {
/// Fast path: single LLM call via the direct client.
Fast,
/// Agentic path: resolves the backend through the InsightGenerator
/// (honouring LLM_BACKEND, model overrides, etc.). Falls back to
/// Fast on error so a scripting failure never sinks a reel.
Agentic,
}
/// Progress callback type — invoked with a `&'static str` stage label
/// (the pipeline emits "scripting", "narrating" and "rendering"). Must be
/// `Send + Sync`; the lifetime `'a` bounds whatever the callback borrows.
pub type ProgressFn<'a> = dyn Fn(&'static str) + Send + Sync + 'a;
/// The media behind one shot: a still photo, or a short section of a source
/// video (played with its live audio ducked under the narration). Both carry
/// just the library-relative path; the renderer applies fixed clip framing
/// (start/length) from constants.
///
/// `library_id` identifies the library whose root `rel_path` is relative to;
/// it is looked up via `AppState::library_by_id` when the path is resolved.
#[derive(Debug, Clone)]
pub enum SegmentMedia {
Photo { rel_path: String, library_id: i32 },
Clip { rel_path: String, library_id: i32 },
}
impl SegmentMedia {
fn rel_path(&self) -> &str {
match self {
SegmentMedia::Photo { rel_path, .. } | SegmentMedia::Clip { rel_path, .. } => rel_path,
}
}
fn library_id(&self) -> i32 {
match self {
SegmentMedia::Photo { library_id, .. } | SegmentMedia::Clip { library_id, .. } => {
*library_id
}
}
}
}
/// A beat: one narration line over its media. A photo beat holds one still (a
/// held shot) or several (a quick burst that flashes through moments of an
/// event while the line is read). A clip beat holds a single video clip. Either
/// way one narration line covers the whole beat, so a week/month reel can
/// *show* everything it spans without a narration line — and the seconds that
/// come with it — per item.
#[derive(Debug, Clone)]
pub struct PlannedBeat {
/// Media shown during this beat, in display order.
pub media: Vec<SegmentMedia>,
/// Timestamp of the beat in seconds (fed to `DateTime::from_timestamp` by
/// `date_label`); `None` when undated.
pub date: Option<i64>,
/// Insight title for the beat, when one is available.
pub insight_title: Option<String>,
/// Insight summary for the beat, when one is available.
pub insight_summary: Option<String>,
/// GPS coordinates of the lead media item, when available.
// NOTE(review): tuple order (lat, lon) is not established in this file — confirm.
pub gps: Option<(f64, f64)>,
}
impl PlannedBeat {
    /// Human-readable date for the prompt, e.g. "June 12, 2019".
    ///
    /// `None` when the beat is undated or the timestamp is out of range.
    pub fn date_label(&self) -> Option<String> {
        self.date
            .and_then(|ts| DateTime::from_timestamp(ts, 0))
            .map(|dt| dt.format("%B %-d, %Y").to_string())
    }

    /// True when the beat consists of exactly one item and that item is a
    /// video clip; photo beats (one or many stills) return false.
    pub fn is_clip(&self) -> bool {
        self.media.len() == 1 && matches!(self.media.first(), Some(SegmentMedia::Clip { .. }))
    }
}
/// Reel-wide metadata the scripter uses for framing.
#[derive(Debug, Clone)]
pub struct ReelMeta {
/// The span (day / week / month) the reel covers.
pub span: MemoriesSpan,
/// Years associated with the reel.
// NOTE(review): presumably the distinct years of the selected media — confirm in `selector`.
pub years: Vec<i32>,
}
impl ReelMeta {
/// Natural-language phrase for the span, e.g. "on this day".
pub fn span_phrase(&self) -> &'static str {
match self.span {
MemoriesSpan::Day => "on this day",
MemoriesSpan::Week => "this week",
MemoriesSpan::Month => "this month",
}
}
}
// --- Job registry ------------------------------------------------------------
//
// In-memory, same shape as the TTS speech-job registry: a reel takes minutes,
// too long to hold one HTTP request from a phone. POST /reels returns a job id;
// the client polls GET /reels/{id} until the video URL appears. The heavy
// artifact (the MP4) lives on disk, not in this map — jobs only carry status +
// the output path. State is intentionally not durable across restarts; the
// on-disk cache is what makes a repeat request cheap, not the registry.
/// Lifecycle state of a reel job. Serialized in snake_case ("queued",
/// "running", "done", "error") in API responses.
#[derive(Clone, Copy, PartialEq, Eq, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReelJobStatus {
/// Registered but no pipeline stage reported yet.
Queued,
/// A pipeline stage has been reported via `set_stage`.
Running,
/// Terminal: the MP4 is available.
Done,
/// Terminal: the job failed; the job's `error` holds the message.
Error,
}
impl ReelJobStatus {
    /// Whether the job has reached a final state (`Done` or `Error`).
    fn is_terminal(self) -> bool {
        match self {
            Self::Done | Self::Error => true,
            Self::Queued | Self::Running => false,
        }
    }
}
/// In-memory record for one reel job, stored in `REEL_JOBS` keyed by job id.
struct ReelJob {
/// Current lifecycle state.
status: ReelJobStatus,
/// Coarse progress label for the client ("scripting", "narrating", …).
stage: &'static str,
/// Reel title; set when the job finishes, or read from the sidecar on a
/// cache hit.
title: Option<String>,
/// Location of the finished MP4 on disk; `None` until the job is done.
output_path: Option<PathBuf>,
/// Failure message; set only when the job ends in `Error`.
error: Option<String>,
/// When the job was registered (drives the max-age sweep).
created_at: Instant,
/// When the job reached a terminal state (drives the result-TTL sweep).
finished_at: Option<Instant>,
/// Handle used to abort the background task if the job is swept while
/// still running; cleared once the job finishes.
abort: Option<tokio::task::AbortHandle>,
}
/// Finished jobs linger so a client that lost connectivity can still collect
/// the result; anything older than MAX_AGE is dropped (aborted first if somehow
/// still running). Swept lazily on each create.
///
/// How long a finished job's result stays in the registry (30 minutes).
const REEL_JOB_RESULT_TTL: Duration = Duration::from_secs(30 * 60);
/// Hard cap on a job's lifetime in the registry, finished or not (60 minutes).
const REEL_JOB_MAX_AGE: Duration = Duration::from_secs(60 * 60);
/// Process-wide job registry: job id → job record, guarded by a std mutex.
static REEL_JOBS: LazyLock<StdMutex<HashMap<Uuid, ReelJob>>> =
LazyLock::new(|| StdMutex::new(HashMap::new()));
/// Drop jobs that have outlived their usefulness as of `now`.
///
/// A job is removed when it is older than `REEL_JOB_MAX_AGE` (its background
/// task is aborted first if a handle is still attached), or when it finished
/// at least `REEL_JOB_RESULT_TTL` ago.
fn sweep_stale_jobs(jobs: &mut HashMap<Uuid, ReelJob>, now: Instant) {
    jobs.retain(|_, job| {
        let age = now.duration_since(job.created_at);
        if age >= REEL_JOB_MAX_AGE {
            if let Some(handle) = job.abort.take() {
                handle.abort();
            }
            return false;
        }
        match job.finished_at {
            Some(done_at) => now.duration_since(done_at) < REEL_JOB_RESULT_TTL,
            None => true,
        }
    });
}
/// Run `f` against the registered job `id` while holding the registry lock.
///
/// Returns `None` when no such job exists, otherwise `Some` of `f`'s result.
/// Panics if the registry mutex is poisoned.
fn with_job<R>(id: Uuid, f: impl FnOnce(&mut ReelJob) -> R) -> Option<R> {
    let mut registry = REEL_JOBS.lock().unwrap();
    let job = registry.get_mut(&id)?;
    Some(f(job))
}
/// Record a progress stage for job `id` and mark it `Running`.
///
/// No-op when the job is unknown or has already reached a terminal state.
fn set_stage(id: Uuid, stage: &'static str) {
    with_job(id, |job| {
        if job.status.is_terminal() {
            return;
        }
        job.stage = stage;
        job.status = ReelJobStatus::Running;
    });
}
/// Transition job `id` into a terminal state; the first terminal write wins.
///
/// Stores the outcome (`title`, `output_path`, `error`), stamps the finish
/// time, sets the stage label to "done" for `Done` and "error" otherwise, and
/// drops the abort handle. Unknown or already-finished jobs are left alone.
fn finish_job(
    id: Uuid,
    status: ReelJobStatus,
    title: Option<String>,
    output_path: Option<PathBuf>,
    error: Option<String>,
) {
    let final_stage = if status == ReelJobStatus::Done {
        "done"
    } else {
        "error"
    };
    with_job(id, |job| {
        if !job.status.is_terminal() {
            job.status = status;
            job.stage = final_stage;
            job.title = title;
            job.output_path = output_path;
            job.error = error;
            job.finished_at = Some(Instant::now());
            job.abort = None;
        }
    });
}
// --- On-disk cache -----------------------------------------------------------
/// Render version: bump to invalidate every cached reel after a rendering /
/// scripting change that should produce a fresh result.
///
/// It is part of the on-disk cache key and is also compared against the
/// `render_version` recorded on precomputed reel rows.
const RENDER_VERSION: u32 = 7;
/// Narration expressiveness — Chatterbox's `exaggeration` knob. A slight bump
/// over the ~0.5 default warms up otherwise-flat narration without over-acting.
///
/// Tune via the `REEL_TTS_EXAGGERATION` env var. Unset, unparsable or
/// non-finite values yield the default of 0.6; finite values are clamped to
/// the supported range 0.25–2.0 so a typo cannot push the TTS far out of range.
fn reel_tts_exaggeration() -> f32 {
    const DEFAULT: f32 = 0.6;
    const MIN: f32 = 0.25;
    const MAX: f32 = 2.0;

    std::env::var("REEL_TTS_EXAGGERATION")
        .ok()
        .and_then(|s| s.trim().parse::<f32>().ok())
        .filter(|x| x.is_finite())
        .map_or(DEFAULT, |x| x.clamp(MIN, MAX))
}
/// Derive the cache key for a reel from everything that decides *which* media
/// is shown and *how* it is voiced — deliberately excluding the
/// (non-deterministic) narration text, so identical inputs map to the same MP4.
///
/// The key is the hex blake3 digest of
/// `v{RENDER_VERSION}|{selector descriptor}|voice={voice or "default"}|`
/// followed by one `{P|C}{library_id}:{rel_path}|` entry per media item, in
/// order. The `P`/`C` tag keeps a path used as a still distinct from the same
/// path used as a video clip.
fn cache_key(selector: &ReelSelector, media: &[SegmentMedia], voice: Option<&str>) -> String {
    let header = format!(
        "v{}|{}|voice={}|",
        RENDER_VERSION,
        selector.descriptor(),
        voice.unwrap_or("default")
    );
    let entries: String = media
        .iter()
        .map(|item| {
            let tag = if matches!(item, SegmentMedia::Clip { .. }) {
                'C'
            } else {
                'P'
            };
            format!("{tag}{}:{}|", item.library_id(), item.rel_path())
        })
        .collect();
    let material = format!("{header}{entries}");
    blake3::hash(material.as_bytes()).to_hex().to_string()
}
fn reel_mp4_path(app_state: &AppState, key: &str) -> PathBuf {
Path::new(&app_state.reels_path).join(format!("{key}.mp4"))
}
fn reel_sidecar_path(app_state: &AppState, key: &str) -> PathBuf {
Path::new(&app_state.reels_path).join(format!("{key}.json"))
}
/// JSON sidecar written next to a cached reel MP4 so a later cache hit can
/// return the title without re-running the pipeline.
#[derive(Serialize, Deserialize)]
struct ReelSidecar {
/// Title of the reel as produced by the script step.
title: String,
}
// --- HTTP types --------------------------------------------------------------
/// JSON body for `POST /reels`. Every field is optional (`#[serde(default)]`).
#[derive(Debug, Deserialize)]
pub struct CreateReelRequest {
/// Span to build the reel for; the handler uses `MemoriesSpan::Day` when
/// omitted.
#[serde(default)]
pub span: Option<MemoriesSpan>,
/// Client timezone offset in minutes; the handler uses 0 for selection when
/// omitted.
#[serde(default)]
pub timezone_offset_minutes: Option<i32>,
/// Library selector, passed through to the reel selector as-is.
#[serde(default)]
pub library: Option<String>,
/// Cloned TTS voice for the narration; server default when omitted.
#[serde(default)]
pub voice: Option<String>,
/// Cap on photos in the reel (clamped server-side).
// NOTE(review): no clamp is visible in the handler — presumably applied in `selector`; confirm.
#[serde(default)]
pub max_segments: Option<usize>,
}
/// Response body for `POST /reels`.
#[derive(Debug, Serialize)]
pub struct ReelJobCreatedResponse {
/// Job id to poll via `GET /reels/{id}`.
pub job_id: String,
/// Initial status: `Done` on a cache hit, otherwise `Queued`.
pub status: ReelJobStatus,
}
/// Response body for `GET /reels/{id}`. Optional fields are omitted from the
/// JSON when `None`.
#[derive(Debug, Serialize)]
pub struct ReelStatusResponse {
/// The job id that was polled.
pub job_id: String,
/// Current lifecycle state.
pub status: ReelJobStatus,
/// Coarse progress label for the current stage.
pub stage: String,
/// Reel title, once known.
#[serde(skip_serializing_if = "Option::is_none")]
pub title: Option<String>,
/// URL of the finished video; present only when the job is `Done`.
#[serde(skip_serializing_if = "Option::is_none")]
pub video_url: Option<String>,
/// Failure message, when the job ended in error.
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
}
/// Response shape for `GET /reels/precomputed`.
#[derive(Debug, Serialize)]
pub struct PrecomputedReelResponse {
/// URL of the precomputed MP4, of the form `/reels/by-key/{cache_key}/video`.
pub video_url: String,
/// Title recorded for the precomputed reel.
pub title: String,
}
// --- Handlers ----------------------------------------------------------------
/// POST /reels — start (or instantly serve from cache) a memory reel for the
/// requested span. Returns 202 + a job id; the client polls GET /reels/{id}.
///
/// Responses:
/// * 503 when no LLM/TTS backend is configured.
/// * 400 when the selector cannot be resolved.
/// * 422 when the span has no media.
/// * 202 with status `done` on a cache hit, or `queued` when a background job
///   was spawned.
///
/// The request params are mirrored into `user_ai_prefs` exactly once per
/// accepted request, before the job registry lock is taken, so the DB upsert
/// never runs while the global registry mutex is held. A failure there is
/// logged and otherwise ignored.
#[post("/reels")]
pub async fn create_reel_handler(
    http_request: HttpRequest,
    _claims: Claims,
    req: web::Json<CreateReelRequest>,
    app_state: web::Data<AppState>,
    exif_dao: web::Data<Mutex<Box<dyn ExifDao>>>,
    insight_dao: web::Data<Mutex<Box<dyn InsightDao>>>,
) -> impl Responder {
    let span_context = extract_context_from_request(&http_request);
    if app_state.llamacpp.is_none() {
        return HttpResponse::ServiceUnavailable().json(json!({
            "error": "Reel narration needs the LLM/TTS backend (set LLAMA_SWAP_URL)"
        }));
    }

    let span = req.span.unwrap_or(MemoriesSpan::Day);
    let max_segments = req.max_segments.unwrap_or(selector::DEFAULT_MAX_SEGMENTS);
    let selector = ReelSelector::Memories {
        span,
        tz_offset_minutes: req.timezone_offset_minutes.unwrap_or(0),
        library: req.library.clone(),
        max_segments,
    };

    // Cheap pass: resolve the media set for the cache key and the emptiness
    // check. Insight enrichment + scripting happen in the background job.
    let (planned, meta) = match selector::resolve(&app_state, &exif_dao, &span_context, &selector) {
        Ok(r) => r,
        Err(msg) => return HttpResponse::BadRequest().body(msg),
    };
    if planned.is_empty() {
        return HttpResponse::UnprocessableEntity().json(json!({
            "error": "No photo memories found for this span"
        }));
    }

    // Flatten every media item across beats (in order) into the cache key — the
    // key tracks exactly which photos/clips appear and in what sequence.
    let media: Vec<SegmentMedia> = planned.iter().flat_map(|b| b.media.clone()).collect();
    let voice = req.voice.clone().filter(|s| !s.is_empty());
    let key = cache_key(&selector, &media, voice.as_deref());
    let job_id = Uuid::new_v4();
    log::info!(
        "reel {job_id}: request span={:?} → {} beats, {} photos",
        span,
        planned.len(),
        media.len()
    );

    // Passive prefs mirror: best-effort, done up front so it covers both the
    // cache-hit and the queued path and never runs under the registry lock.
    if let Err(e) = capture_prefs(&app_state, &req, req.library.as_deref()) {
        log::warn!("reel {job_id}: could not mirror request params into user_ai_prefs: {e}");
    }

    // Cache hit: register an already-Done job pointing at the existing MP4 so
    // the client's first poll returns the video URL immediately.
    let mp4 = reel_mp4_path(&app_state, &key);
    if mp4.exists() {
        log::info!("reel {job_id}: cache hit, serving existing reel");
        let title = std::fs::read(reel_sidecar_path(&app_state, &key))
            .ok()
            .and_then(|b| serde_json::from_slice::<ReelSidecar>(&b).ok())
            .map(|s| s.title);
        let now = Instant::now();
        {
            let mut jobs = REEL_JOBS.lock().unwrap();
            sweep_stale_jobs(&mut jobs, now);
            jobs.insert(
                job_id,
                ReelJob {
                    status: ReelJobStatus::Done,
                    stage: "done",
                    title,
                    output_path: Some(mp4),
                    error: None,
                    created_at: now,
                    finished_at: Some(now),
                    abort: None,
                },
            );
        }
        return HttpResponse::Accepted().json(ReelJobCreatedResponse {
            job_id: job_id.to_string(),
            status: ReelJobStatus::Done,
        });
    }

    {
        let now = Instant::now();
        let mut jobs = REEL_JOBS.lock().unwrap();
        sweep_stale_jobs(&mut jobs, now);
        jobs.insert(
            job_id,
            ReelJob {
                status: ReelJobStatus::Queued,
                stage: "queued",
                title: None,
                output_path: None,
                error: None,
                created_at: now,
                finished_at: None,
                abort: None,
            },
        );
    }
    log::info!("reel {job_id}: queued for generation");

    let state = app_state.clone();
    let insight_dao = insight_dao.clone();
    let exif_dao = exif_dao.clone();
    let handle = tokio::spawn(async move {
        match run_reel_job(
            &state,
            &insight_dao,
            &exif_dao,
            job_id,
            planned,
            meta,
            voice,
            &key,
        )
        .await
        {
            Ok((title, path)) => {
                finish_job(job_id, ReelJobStatus::Done, Some(title), Some(path), None)
            }
            Err(e) => {
                log::error!("reel job {job_id} failed: {e:?}");
                finish_job(
                    job_id,
                    ReelJobStatus::Error,
                    None,
                    None,
                    Some(format!("{e}")),
                )
            }
        }
    });
    // Attach the abort handle so the stale-job sweep can cancel a runaway task.
    with_job(job_id, |job| job.abort = Some(handle.abort_handle()));

    HttpResponse::Accepted().json(ReelJobCreatedResponse {
        job_id: job_id.to_string(),
        status: ReelJobStatus::Queued,
    })
}
/// GET /reels/{id} — poll a reel job. Done jobs carry a `video_url`.
#[get("/reels/{id}")]
pub async fn reel_status_handler(_claims: Claims, path: web::Path<String>) -> impl Responder {
let id_str = path.into_inner();
let Ok(id) = Uuid::parse_str(&id_str) else {
return HttpResponse::BadRequest().json(json!({ "error": "invalid job id" }));
};
let resp = with_job(id, |job| ReelStatusResponse {
job_id: id_str.clone(),
status: job.status,
stage: job.stage.to_string(),
title: job.title.clone(),
video_url: matches!(job.status, ReelJobStatus::Done)
.then(|| format!("/reels/{id_str}/video")),
error: job.error.clone(),
});
match resp {
Some(r) => HttpResponse::Ok().json(r),
None => HttpResponse::NotFound().json(json!({ "error": "job not found or expired" })),
}
}
/// GET /reels/{id}/video — stream the finished MP4 (supports range requests via
/// NamedFile, so the mobile player can seek).
#[get("/reels/{id}/video")]
pub async fn reel_video_handler(
_claims: Claims,
request: HttpRequest,
path: web::Path<String>,
) -> impl Responder {
let id_str = path.into_inner();
let Ok(id) = Uuid::parse_str(&id_str) else {
return HttpResponse::BadRequest().json(json!({ "error": "invalid job id" }));
};
let output = with_job(id, |job| job.output_path.clone()).flatten();
let Some(path) = output else {
return HttpResponse::NotFound().json(json!({ "error": "reel not ready" }));
};
match NamedFile::open(&path) {
Ok(file) => file.into_response(&request),
Err(e) => {
log::error!("opening reel mp4 {path:?} failed: {e:?}");
HttpResponse::NotFound().json(json!({ "error": "reel file missing" }))
}
}
}
/// GET /reels/precomputed?span=&library=
///
/// Look up the latest precomputed reel for the given span and library key.
/// Validity gate (all must hold, else 404):
/// 1. `render_version == RENDER_VERSION`
/// 2. `output_path` exists on disk
/// 3. age <= max_age(span) (Day 26h, Week 8d, Month 32d)
///
/// Returns `{ video_url: "/reels/by-key/{cache_key}/video", title }`.
#[get("/reels/precomputed")]
pub async fn precomputed_reel_handler(
_claims: Claims,
query: web::Query<HashMap<String, String>>,
app_state: web::Data<AppState>,
) -> impl Responder {
let span = query.get("span").map(|s| s.as_str()).unwrap_or("day");
let library_key = normalize_library_key(
&app_state.libraries,
query.get("library").map(|s| s.as_str()),
);
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("Time went backwards")
.as_secs() as i64;
let max_age_hours = match span {
"week" => REEL_PRECOMPUTED_WEEK_MAX_AGE_HOURS as i64,
"month" => REEL_PRECOMPUTED_MONTH_MAX_AGE_HOURS as i64,
_ => REEL_PRECOMPUTED_DAY_MAX_AGE_HOURS as i64,
};
let min_generated_at = now - (max_age_hours * 3600);
let ctx = opentelemetry::Context::new();
let mut dao = app_state
.precomputed_reel_dao
.lock()
.expect("Unable to lock PrecomputedReelDao");
// Fast existence gate: is there a fresh row at all?
if !dao
.exists_fresh(
&ctx,
span,
&library_key,
RENDER_VERSION as i32,
min_generated_at,
)
.unwrap_or(false)
{
return HttpResponse::NotFound().json(json!({ "error": "no precomputed reel found" }));
}
// Fetch the latest row for full validity checks.
let reel = match dao.latest_for(&ctx, span, &library_key) {
Ok(Some(r)) => r,
_ => {
return HttpResponse::NotFound().json(json!({ "error": "no precomputed reel found" }));
}
};
// Validity gate 1: render version must match.
if reel.render_version != RENDER_VERSION as i32 {
return HttpResponse::NotFound()
.json(json!({ "error": "precomputed reel is stale (render version mismatch)" }));
}
// Validity gate 2: output_path must exist.
let output = std::path::Path::new(&reel.output_path);
if !output.exists() {
return HttpResponse::NotFound().json(json!({ "error": "precomputed reel file missing" }));
}
// Validity gate 3: age <= max_age (re-checked via min_generated_at).
if reel.generated_at < min_generated_at {
return HttpResponse::NotFound().json(json!({ "error": "precomputed reel has expired" }));
}
HttpResponse::Ok().json(PrecomputedReelResponse {
video_url: format!("/reels/by-key/{}/video", reel.cache_key),
title: reel.title,
})
}
/// GET /reels/by-key/{key}/video — stream a precomputed reel MP4 by cache key.
#[get("/reels/by-key/{key}/video")]
pub async fn precomputed_video_handler(
_claims: Claims,
request: HttpRequest,
path: web::Path<String>,
app_state: web::Data<AppState>,
) -> impl Responder {
let key = path.into_inner();
let mp4 = reel_mp4_path(&app_state, &key);
match NamedFile::open(&mp4) {
Ok(file) => file.into_response(&request),
Err(e) => {
log::error!("opening precomputed reel {key} failed: {e:?}");
HttpResponse::NotFound().json(json!({ "error": "precomputed reel file missing" }))
}
}
}
// --- Pipeline ----------------------------------------------------------------
/// Run the full reel pipeline: enrich → script → narrate → render → concat,
/// then publish the MP4 into the cache. Returns (title, mp4_path).
///
/// The `scripter` parameter controls which narration-generation strategy is
/// used (fast single-call vs. agentic backend resolution). On scripting
/// failure in Agentic mode the pipeline falls back to the fast path so a
/// single LLM failure never sinks a reel.
pub(crate) async fn produce_reel(
app_state: &AppState,
insight_dao: &Mutex<Box<dyn InsightDao>>,
exif_dao: &Mutex<Box<dyn ExifDao>>,
mut planned: Vec<PlannedBeat>,
meta: ReelMeta,
voice: Option<String>,
key: &str,
scripter: ScripterMode,
progress: Option<&ProgressFn<'_>>,
) -> anyhow::Result<(String, PathBuf)> {
let started = Instant::now();
let total_photos: usize = planned.iter().map(|b| b.media.len()).sum();
log::info!(
"reel produce_reel: starting — span {:?}, {} beats, {} photos, voice={}",
meta.span,
planned.len(),
total_photos,
voice.as_deref().unwrap_or("default")
);
let client = app_state
.llamacpp
.as_ref()
.ok_or_else(|| anyhow::anyhow!("TTS/LLM backend not configured"))?
.clone();
// 1. Enrich each beat with its lead photo's cached insight, then script
// (one LLM call → one narration line per beat).
emit_progress(progress, "scripting");
log::info!("reel produce_reel: scripting narration via LLM…");
let span_context = opentelemetry::Context::new();
selector::enrich(insight_dao, exif_dao, &span_context, &mut planned);
let script = match scripter {
ScripterMode::Fast => script::generate_script(&client, &meta, &planned).await?,
ScripterMode::Agentic => {
match script::generate_script_agentic(&app_state.insight_generator, &meta, &planned)
.await
{
Ok(s) => s,
Err(e) => {
log::warn!(
"reel produce_reel: agentic script failed, falling back to fast: {e}"
);
script::generate_script(&client, &meta, &planned).await?
}
}
}
};
log::info!(
"reel produce_reel: scripted \"{}\" ({} lines)",
script.title,
script.lines.len()
);
// 2. Narrate each beat's line and 3. render the beat (its photos shown in
// sequence under that one narration). A beat whose audio or render fails
// is skipped (logged) rather than sinking the whole reel — handles an
// odd HEIC/corrupt file gracefully.
emit_progress(progress, "narrating");
let work = tempfile::tempdir().context("creating reel work dir")?;
let nvenc = render::is_nvenc_available().await;
log::info!(
"reel produce_reel: narrating + rendering {} beats (encoder: {})",
planned.len(),
if nvenc { "nvenc" } else { "cpu" }
);
let opts = render::SegmentOpts {
nvenc,
..Default::default()
};
let beat_total = planned.len();
let mut beat_files: Vec<String> = Vec::new();
for (i, (beat, line)) in planned.iter().zip(script.lines.iter()).enumerate() {
// Resolve the beat's media to absolute paths; drop any that don't
// resolve. An empty beat is skipped.
let paths: Vec<PathBuf> = beat
.media
.iter()
.filter_map(|m| resolve_media_path(app_state, m))
.collect();
if paths.is_empty() {
log::warn!("reel produce_reel: skipping beat {i}, no media paths resolved");
continue;
}
let audio_bytes = match crate::ai::tts::synthesize_serialized(
&client,
line,
voice.as_deref(),
"wav",
Some(reel_tts_exaggeration()),
)
.await
{
Ok(b) => b,
Err(e) => {
log::warn!("reel produce_reel: skipping beat {i}, TTS failed: {e}");
continue;
}
};
let audio_path = work.path().join(format!("narration_{i:03}.wav"));
if let Err(e) = tokio::fs::write(&audio_path, &audio_bytes).await {
log::warn!("reel produce_reel: skipping beat {i}, writing audio failed: {e}");
continue;
}
let narration_secs =
crate::video::ffmpeg::get_duration_seconds(&audio_path.to_string_lossy())
.await
.ok()
.flatten()
.unwrap_or(render::MIN_SEGMENT_SECONDS);
emit_progress(progress, "rendering");
let beat_out = work.path().join(format!("beat_{i:03}.mp4"));
let render_result = if beat.is_clip() {
log::info!(
"reel produce_reel: beat {}/{} — video clip, narration {:.1}s",
i + 1,
beat_total,
narration_secs
);
render::render_clip_beat(&paths[0], &audio_path, &beat_out, narration_secs, &opts).await
} else {
log::info!(
"reel produce_reel: beat {}/{} — {} photo(s), narration {:.1}s",
i + 1,
beat_total,
paths.len(),
narration_secs
);
render::render_beat(&paths, &audio_path, &beat_out, narration_secs, &opts).await
};
if let Err(e) = render_result {
log::warn!("reel produce_reel: skipping beat {i}, render failed: {e}");
continue;
}
beat_files.push(beat_out.to_string_lossy().to_string());
}
let segment_files = beat_files;
if segment_files.is_empty() {
return Err(anyhow!("no beats rendered successfully"));
}
// 4. Concat into the cache. Write to a temp name in the reels dir, then
// rename atomically (same filesystem) so a reader never sees a partial.
emit_progress(progress, "rendering");
log::info!(
"reel produce_reel: joining {} rendered beats into the final reel",
segment_files.len()
);
std::fs::create_dir_all(&app_state.reels_path).context("creating reels dir")?;
let final_path = reel_mp4_path(app_state, key);
let tmp_path = final_path.with_extension("mp4.tmp");
render::concat_segments(&segment_files, &tmp_path).await?;
std::fs::rename(&tmp_path, &final_path).context("publishing reel mp4")?;
// Sidecar carries the title so a future cache hit can return it without
// re-running the pipeline.
let sidecar = serde_json::to_vec(&ReelSidecar {
title: script.title.clone(),
})
.context("serializing reel sidecar")?;
let _ = std::fs::write(reel_sidecar_path(app_state, key), sidecar);
log::info!(
"reel produce_reel: done in {:.1}s — {} beats → {}",
started.elapsed().as_secs_f64(),
segment_files.len(),
final_path.display()
);
Ok((script.title, final_path))
}
/// Forward a stage label to the progress callback, if one was supplied.
fn emit_progress(progress: Option<&ProgressFn<'_>>, stage: &'static str) {
    match progress {
        Some(report) => report(stage),
        None => {}
    }
}
/// Run the full reel pipeline and publish the MP4 into the cache.
/// Thin wrapper around [`produce_reel`] that wires up job-stage tracking.
async fn run_reel_job(
app_state: &AppState,
insight_dao: &Mutex<Box<dyn InsightDao>>,
exif_dao: &Mutex<Box<dyn ExifDao>>,
job_id: Uuid,
planned: Vec<PlannedBeat>,
meta: ReelMeta,
voice: Option<String>,
key: &str,
) -> anyhow::Result<(String, PathBuf)> {
let progress = move |stage: &'static str| {
set_stage(job_id, stage);
};
produce_reel(
app_state,
insight_dao,
exif_dao,
planned,
meta,
voice,
key,
ScripterMode::Fast,
Some(&progress),
)
.await
}
/// Map a media item (photo or clip) to an absolute path under its library
/// root. `None` when the library id is unknown or the path fails validation
/// by `is_valid_full_path`.
fn resolve_media_path(app_state: &AppState, media: &SegmentMedia) -> Option<PathBuf> {
    app_state
        .library_by_id(media.library_id())
        .and_then(|library| {
            let rel = media.rel_path().to_string();
            crate::files::is_valid_full_path(&library.root_path, &rel, false)
        })
}
// --- Nightly pre-generation scheduler (Section D) ----------------------------
/// Hour of day (0-23) at which the nightly pre-gen batch fires, read from
/// `REEL_PREGEN_HOUR`. Unset, unparsable or out-of-range values (> 23) fall
/// back to the default of 3 — they are rejected, not clamped.
fn pregen_run_hour() -> u32 {
    const DEFAULT_HOUR: u32 = 3;

    match std::env::var("REEL_PREGEN_HOUR") {
        Ok(raw) => match raw.trim().parse::<u32>() {
            Ok(hour) if hour <= 23 => hour,
            _ => DEFAULT_HOUR,
        },
        Err(_) => DEFAULT_HOUR,
    }
}
/// Day of week on which the weekly pre-gen runs (0=Sun, 1=Mon, …, 6=Sat),
/// read from `REEL_PREGEN_WEEK_DOW`. Unset, unparsable or out-of-range values
/// (> 6) fall back to the default of 1 (Monday) — they are rejected, not
/// clamped.
fn pregen_week_dow() -> u32 {
    const DEFAULT_DOW: u32 = 1;

    match std::env::var("REEL_PREGEN_WEEK_DOW") {
        Ok(raw) => match raw.trim().parse::<u32>() {
            Ok(dow) if dow <= 6 => dow,
            _ => DEFAULT_DOW,
        },
        Err(_) => DEFAULT_DOW,
    }
}
/// Pure: seconds from `now` until the next `run_hour:00:00` that lies strictly
/// in the future, computed from `now`'s wall-clock hour/minute/second.
///
/// When `now` is exactly at or past today's target the result wraps to the
/// same hour tomorrow (a full 86 400 s when exactly on target). That way a
/// batch finishing inside the run hour sleeps ~24h instead of waking and
/// re-running repeatedly for the rest of that hour; the cost is that booting
/// at or after `run_hour` waits until the next day. Callers recompute this
/// from `Local::now()` on every loop iteration, so DST shifts are absorbed.
pub(crate) fn secs_until_next_run_hour(now: chrono::DateTime<chrono::Local>, run_hour: u32) -> u64 {
    const SECS_PER_DAY: u32 = 86_400;

    let elapsed_today = now.hour() * 3600 + now.minute() * 60 + now.second();
    let target = run_hour * 3600;
    let wait = match target.checked_sub(elapsed_today) {
        // Target is still ahead of us today.
        Some(ahead) if ahead > 0 => ahead,
        // At or past the target: roll over to tomorrow.
        _ => SECS_PER_DAY - elapsed_today + target,
    };
    u64::from(wait)
}
/// Load pre-gen parameters as `(tz_offset_minutes, voice, library)`.
///
/// Source order:
/// 1. the `user_ai_prefs` DB row, when the DAO can be locked and a row exists;
/// 2. env vars `REEL_PREGEN_TZ_OFFSET_MINUTES`, `REEL_PREGEN_VOICE`,
///    `REEL_PREGEN_LIBRARY`;
/// 3. defaults: [`fixed_tz_offset`] for the offset, no voice, library `"all"`.
///
/// Empty voice / library strings are treated as unset (matching how the
/// on-demand handler filters an empty voice), so an empty env var does not
/// turn into `Some("")` and end up in the cache key or the TTS request.
fn load_pregen_params(app_state: &AppState) -> (i32, Option<String>, String) {
    // Try the DB row first.
    if let Ok(mut dao) = app_state.user_ai_prefs_dao.lock() {
        let ctx = opentelemetry::Context::new();
        if let Ok(Some(prefs)) = dao.get_prefs(&ctx) {
            let tz = prefs.tz_offset_minutes.unwrap_or_else(fixed_tz_offset);
            let voice = prefs.voice.filter(|v| !v.is_empty());
            let library = prefs
                .library
                .filter(|l| !l.is_empty())
                .unwrap_or_else(|| "all".to_string());
            return (tz, voice, library);
        }
    }

    // Fall back to env (explicit offset overrides auto-detect).
    let tz = std::env::var("REEL_PREGEN_TZ_OFFSET_MINUTES")
        .ok()
        .and_then(|v| v.trim().parse().ok())
        .unwrap_or_else(fixed_tz_offset);
    let voice = std::env::var("REEL_PREGEN_VOICE")
        .ok()
        .filter(|v| !v.is_empty());
    let library = std::env::var("REEL_PREGEN_LIBRARY")
        .ok()
        .filter(|l| !l.is_empty())
        .unwrap_or_else(|| "all".to_string());
    (tz, voice, library)
}
/// Timezone offset in minutes (local minus UTC) used for pre-generation.
///
/// Reads `REEL_PREGEN_TZ_FIXED_MINUTES` when it is set and parses as an
/// integer (e.g. "-480" for UTC−8, "-300" for UTC−5); otherwise falls back to
/// the server's current local offset. Pinning a fixed value keeps DST changes
/// from shifting the pre-gen day boundary halfway through the year.
///
/// `FixedOffset::local_minus_utc` reports seconds, so the fallback is
/// converted to minutes before being returned.
fn fixed_tz_offset() -> i32 {
    std::env::var("REEL_PREGEN_TZ_FIXED_MINUTES")
        .ok()
        .and_then(|v| v.trim().parse().ok())
        .unwrap_or_else(|| chrono::Local::now().offset().local_minus_utc() / 60)
}
/// Spawn the nightly pre-generation scheduler. Runs behind `REEL_PREGEN_ENABLED`.
pub(crate) async fn spawn_pregen_scheduler(app_state: web::Data<AppState>) {
if std::env::var("REEL_PREGEN_ENABLED").ok() != Some("1".to_string()) {
log::info!("Reel pre-generation scheduler disabled (REEL_PREGEN_ENABLED != 1)");
return;
}
let run_hour = pregen_run_hour();
log::info!(
"Reel pre-generation scheduler enabled, running at hour {} local",
run_hour
);
tokio::spawn(async move {
loop {
let now = chrono::Local::now();
let sleep_secs = secs_until_next_run_hour(now, run_hour);
log::debug!("Next pre-gen run in {}s", sleep_secs);
tokio::time::sleep(std::time::Duration::from_secs(sleep_secs)).await;
if let Err(e) = run_pregen_batch(&app_state).await {
log::error!("Reel pre-generation batch failed: {}", e);
}
}
});
}
/// Pre-generate every reel span that is due right now.
///
/// The "day" span is always attempted; "week" is added when today's local
/// weekday (0 = Sunday) equals `pregen_week_dow()`, and "month" on the first
/// day of the local month. Spans are processed in that order. A failure in one
/// span is logged and the remaining spans still run, so this currently always
/// returns `Ok(())`.
async fn run_pregen_batch(app_state: &AppState) -> anyhow::Result<()> {
    let today = chrono::Local::now();
    let is_week_day = today.weekday().num_days_from_sunday() == pregen_week_dow();
    let is_month_start = today.day() == 1;

    let spans: Vec<&str> = std::iter::once("day")
        .chain(is_week_day.then_some("week"))
        .chain(is_month_start.then_some("month"))
        .collect();

    let (tz, voice, library) = load_pregen_params(app_state);
    for span in spans {
        let outcome = pregen_one(app_state, span, tz, voice.clone(), &library).await;
        if let Err(e) = outcome {
            log::error!("Pre-gen failed for span={}: {}", span, e);
        }
    }
    Ok(())
}
/// Pre-generate a single reel for the given span and record it in the
/// precomputed-reel ledger.
///
/// * `span` - `"day"`, `"week"` or `"month"`; any other value is treated as
///   `"day"` for both selection and the freshness window.
/// * `tz` - timezone offset in minutes handed to the memories selector.
/// * `voice` - optional narration voice; part of the cache key.
/// * `library` - library key, where `"all"` means no library filter.
///
/// Steps:
/// 1. Resolve the beats for the span. A resolve failure or an empty plan is
///    logged and treated as "nothing to do" (`Ok(())`).
/// 2. Skip when the ledger already holds a fresh row for this span/library.
/// 3. If the MP4 for the cache key is already on disk (e.g. a previous run
///    crashed after rendering), reuse it and take the title from its JSON
///    sidecar when readable; otherwise render the reel.
/// 4. Record the ledger row. `generated_at` is the time this function started,
///    not the time rendering finished.
///
/// # Errors
///
/// Returns an error when the ledger mutex is poisoned, the system clock is
/// before the Unix epoch, rendering fails, or the ledger write fails. These
/// are returned rather than panicking because this runs inside the scheduler's
/// spawned task, where a panic would end the scheduler loop.
async fn pregen_one(
    app_state: &AppState,
    span: &str,
    tz: i32,
    voice: Option<String>,
    library: &str,
) -> anyhow::Result<()> {
    let memories_span = match span {
        "week" => MemoriesSpan::Week,
        "month" => MemoriesSpan::Month,
        // "day" and anything unrecognised.
        _ => MemoriesSpan::Day,
    };
    let selector = ReelSelector::Memories {
        span: memories_span,
        tz_offset_minutes: tz,
        library: (library != "all").then(|| library.to_string()),
        max_segments: 24,
    };
    let exif_dao = app_state.insight_generator.exif_dao();
    let insight_dao = app_state.insight_generator.insight_dao();
    let ctx = opentelemetry::Context::new();

    let (planned, reel_meta) = match selector::resolve(app_state, exif_dao, &ctx, &selector) {
        Ok((p, m)) => (p, m),
        Err(e) => {
            log::warn!("Pre-gen resolve failed for span={}: {}", span, e);
            return Ok(());
        }
    };
    if planned.is_empty() {
        log::info!("No beats for span={}, skipping", span);
        return Ok(());
    }

    // Flatten every media item across beats (in order) into the cache key.
    let media: Vec<SegmentMedia> = planned
        .iter()
        .flat_map(|beat| beat.media.iter().cloned())
        .collect();
    let key = cache_key(&selector, &media, voice.as_deref());
    // Total media items shown (photos + clips), not beat count.
    let media_count = media.len() as i32;

    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_err(|e| anyhow::anyhow!("System clock is before the Unix epoch: {}", e))?
        .as_secs() as i64;
    let max_age_hours = match span {
        "week" => REEL_PRECOMPUTED_WEEK_MAX_AGE_HOURS,
        "month" => REEL_PRECOMPUTED_MONTH_MAX_AGE_HOURS,
        _ => REEL_PRECOMPUTED_DAY_MAX_AGE_HOURS,
    };
    let min_generated_at = now - (max_age_hours as i64 * 3600);

    // Dedup: skip when a fresh ledger row already exists. The guard lives only
    // inside this block so it is never held across an `.await`.
    let is_fresh = {
        let mut dao = app_state
            .precomputed_reel_dao
            .lock()
            .map_err(|_| anyhow::anyhow!("precomputed_reel_dao mutex poisoned"))?;
        dao.exists_fresh(&ctx, span, library, RENDER_VERSION as i32, min_generated_at)
            .unwrap_or_else(|e| {
                // Best effort: a failed lookup means "not fresh", so we regenerate.
                log::warn!(
                    "Freshness check failed for span={}, regenerating: {:?}",
                    span,
                    e
                );
                false
            })
    };
    if is_fresh {
        log::info!("Fresh precomputed reel exists for span={}, skipping", span);
        return Ok(());
    }

    // Either reuse an MP4 left on disk by a previous run that crashed after
    // rendering, or render a new one.
    let mp4_path = reel_mp4_path(app_state, &key);
    let (title, output_path, rendered) = if mp4_path.exists() {
        log::info!(
            "Precomputed reel MP4 already exists for key={}, recording ledger and skipping render",
            key
        );
        // Take the title from the sidecar when it is readable and well-formed.
        let title = tokio::fs::read_to_string(mp4_path.with_extension("json"))
            .await
            .ok()
            .and_then(|raw| serde_json::from_str::<ReelSidecar>(&raw).ok())
            .map(|sidecar| sidecar.title)
            .unwrap_or_else(|| format!("{} reel", span));
        (title, mp4_path.to_string_lossy().to_string(), false)
    } else {
        log::info!("Generating precomputed reel for span={}, key={}", span, key);
        let (title, mp4) = produce_reel(
            app_state,
            insight_dao,
            exif_dao,
            planned,
            reel_meta,
            voice.clone(),
            &key,
            ScripterMode::Agentic,
            None,
        )
        .await?;
        (title, mp4.to_string_lossy().to_string(), true)
    };

    // Record to ledger (single write shared by both paths).
    {
        let mut reel_dao = app_state
            .precomputed_reel_dao
            .lock()
            .map_err(|_| anyhow::anyhow!("precomputed_reel_dao mutex poisoned"))?;
        reel_dao.record_reel(
            &ctx,
            &crate::database::models::InsertablePrecomputedReel {
                span: span.to_string(),
                library_key: library.to_string(),
                cache_key: key.clone(),
                output_path,
                title,
                media_count,
                render_version: RENDER_VERSION as i32,
                tz_offset_minutes: tz,
                voice,
                generated_at: now,
            },
        )?;
    }

    if rendered {
        log::info!("Precomputed reel generated for span={}, key={}", span, key);
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::libraries::Library;
    use chrono::TimeZone;

    // ---- fixtures ---------------------------------------------------------

    /// Still-photo media item.
    fn photo(rel_path: &str, library_id: i32) -> SegmentMedia {
        SegmentMedia::Photo {
            rel_path: rel_path.to_string(),
            library_id,
        }
    }

    /// Video-clip media item.
    fn clip(rel_path: &str, library_id: i32) -> SegmentMedia {
        SegmentMedia::Clip {
            rel_path: rel_path.to_string(),
            library_id,
        }
    }

    /// Memories selector for `span` with UTC offset, no library filter and the
    /// default segment cap.
    fn memories_selector(span: MemoriesSpan) -> ReelSelector {
        ReelSelector::Memories {
            span,
            tz_offset_minutes: 0,
            library: None,
            max_segments: 24,
        }
    }

    fn day_selector() -> ReelSelector {
        memories_selector(MemoriesSpan::Day)
    }

    /// Beat carrying `media` and no date, insight or GPS metadata.
    fn undated_beat(media: Vec<SegmentMedia>) -> PlannedBeat {
        PlannedBeat {
            media,
            date: None,
            insight_title: None,
            insight_summary: None,
            gps: None,
        }
    }

    /// The enabled "main" library (id 1) with no excluded directories.
    fn main_library() -> Library {
        Library {
            id: 1,
            name: "main".to_string(),
            root_path: "/tmp/main".to_string(),
            enabled: true,
            excluded_dirs: Vec::new(),
        }
    }

    /// Library list containing only [`main_library`].
    fn only_main() -> Vec<Library> {
        vec![main_library()]
    }

    /// Local wall-clock time on 2026-06-13 at `hour:minute:00`.
    fn local_at(hour: u32, minute: u32) -> chrono::DateTime<chrono::Local> {
        chrono::Local
            .with_ymd_and_hms(2026, 6, 13, hour, minute, 0)
            .single()
            .expect("valid datetime")
    }

    // ---- cache_key --------------------------------------------------------

    #[test]
    fn cache_key_is_stable_for_same_inputs() {
        let media = vec![photo("a.jpg", 1), photo("b.jpg", 1)];
        let first = cache_key(&day_selector(), &media, Some("grandma"));
        let second = cache_key(&day_selector(), &media, Some("grandma"));
        assert_eq!(first, second);
        // 64-hex blake3.
        assert_eq!(first.len(), 64);
        assert!(first.chars().all(|c| c.is_ascii_hexdigit()));
    }

    #[test]
    fn cache_key_changes_with_media_order_voice_and_selector() {
        let media = vec![photo("a.jpg", 1), photo("b.jpg", 1)];
        let reordered = vec![photo("b.jpg", 1), photo("a.jpg", 1)];
        let base = cache_key(&day_selector(), &media, Some("grandma"));

        // Order matters (the reel sequence differs).
        assert_ne!(
            base,
            cache_key(&day_selector(), &reordered, Some("grandma"))
        );

        // Voice matters.
        assert_ne!(base, cache_key(&day_selector(), &media, Some("dad")));
        assert_ne!(base, cache_key(&day_selector(), &media, None));

        // Span matters.
        let week = memories_selector(MemoriesSpan::Week);
        assert_ne!(base, cache_key(&week, &media, Some("grandma")));
    }

    #[test]
    fn cache_key_distinguishes_photo_from_clip() {
        // Same path/library used as a still vs a video clip must differ.
        let as_photo = vec![photo("v.mp4", 1)];
        let as_clip = vec![clip("v.mp4", 1)];
        assert_ne!(
            cache_key(&day_selector(), &as_photo, None),
            cache_key(&day_selector(), &as_clip, None)
        );
    }

    // ---- PlannedBeat / ReelMeta -------------------------------------------

    #[test]
    fn is_clip_only_for_single_clip_beat() {
        let clip_beat = undated_beat(vec![clip("v.mp4", 1)]);
        let photo_beat = undated_beat(vec![photo("a.jpg", 1), photo("b.jpg", 1)]);
        assert!(clip_beat.is_clip());
        assert!(!photo_beat.is_clip());
    }

    #[test]
    fn span_phrase_maps_each_span() {
        let meta_for = |span| ReelMeta {
            span,
            years: vec![],
        };
        assert_eq!(meta_for(MemoriesSpan::Day).span_phrase(), "on this day");
        assert_eq!(meta_for(MemoriesSpan::Week).span_phrase(), "this week");
        assert_eq!(meta_for(MemoriesSpan::Month).span_phrase(), "this month");
    }

    #[test]
    fn date_label_formats_or_none() {
        let dated = PlannedBeat {
            date: Some(1_560_384_000), // 2019-06-13 UTC
            ..undated_beat(vec![photo("a.jpg", 1)])
        };
        assert!(dated.date_label().unwrap().contains("2019"));

        let undated = undated_beat(vec![photo("a.jpg", 1)]);
        assert_eq!(undated.date_label(), None);
    }

    // ---- normalize_library_key --------------------------------------------

    #[test]
    fn normalize_library_key_returns_id_when_found_numeric() {
        let archive = Library {
            id: 7,
            name: "archive".to_string(),
            root_path: "/tmp/archive".to_string(),
            ..main_library()
        };
        let libs = vec![main_library(), archive];
        assert_eq!(normalize_library_key(&libs, Some("1")), "1");
    }

    #[test]
    fn normalize_library_key_returns_id_when_found_by_name() {
        assert_eq!(normalize_library_key(&only_main(), Some("main")), "1");
    }

    #[test]
    fn normalize_library_key_returns_all_when_absent() {
        assert_eq!(normalize_library_key(&only_main(), None), "all");
    }

    #[test]
    fn normalize_library_key_returns_all_when_empty() {
        assert_eq!(normalize_library_key(&only_main(), Some("")), "all");
    }

    #[test]
    fn normalize_library_key_returns_all_when_unknown() {
        assert_eq!(normalize_library_key(&only_main(), Some("missing")), "all");
    }

    // ---- secs_until_next_run_hour -----------------------------------------

    #[test]
    fn secs_until_next_run_hour_within_run_hour_wraps_to_tomorrow() {
        // 03:30, run 3: today's 03:00 has already passed, so the wait runs to
        // tomorrow 03:00 (23h30m). Crucially NOT 0, which would busy-loop the
        // scheduler for the rest of the hour.
        let at = local_at(3, 30);
        assert_eq!(secs_until_next_run_hour(at, 3), 23 * 3600 + 30 * 60);
    }

    #[test]
    fn secs_until_next_run_hour_future_today_counts_minutes() {
        // 10:15 to 14:00 is 3h45m, not a whole-hour 4h (minutes count).
        let at = local_at(10, 15);
        assert_eq!(secs_until_next_run_hour(at, 14), 3 * 3600 + 45 * 60);
    }

    #[test]
    fn secs_until_next_run_hour_past_today_wraps() {
        let at = local_at(20, 0);
        assert_eq!(secs_until_next_run_hour(at, 3), (24 - 20 + 3) * 3600);
    }

    #[test]
    fn secs_until_next_run_hour_midnight() {
        let at = local_at(0, 0);
        // 0:00, run at 3: three hours.
        assert_eq!(secs_until_next_run_hour(at, 3), 3 * 3600);
        // 0:00 exactly, run at 0: wraps to next midnight (not 0, so no busy loop).
        assert_eq!(secs_until_next_run_hour(at, 0), 86_400);
    }

    #[test]
    fn secs_until_next_run_hour_just_before_target() {
        let at = local_at(23, 30);
        // 23:30, run 0: 30 minutes to midnight (minute-accurate, not 1h).
        assert_eq!(secs_until_next_run_hour(at, 0), 30 * 60);
        // 23:30, run 23: today's 23:00 has already passed, wait until tomorrow.
        assert_eq!(secs_until_next_run_hour(at, 23), 86_400 - 30 * 60);
    }
}