From f707353807327d3ab3a88c039308ba953be0159a Mon Sep 17 00:00:00 2001 From: Cameron Cordes Date: Sat, 13 Jun 2026 14:29:34 -0400 Subject: [PATCH] feat: nightly agentic pre-generation of memory reels Implement end-to-end nightly pre-generation of memory reels with agentic scripting that grounds narration in calendar, location, messages, and RAG. Sections A-E from the plan: A. Extract produce_reel pipeline core from run_reel_job with ScripterMode::Fast/Agentic and progress callbacks. B. Agentic scripter: factor run_readonly_tool_loop from the insight generator, build read-only tool gate, prompt builder with GPS, and generate_script_agentic with fallback to fast path. C. Precomputed reels ledger (SQLite table + DAO), GET /reels/precomputed handler with validity gate, GET /reels/by-key/{key}/video streaming, and normalize_library_key helper. D. Nightly scheduler: spawn_pregen_scheduler with configurable hour, run_pregen_batch (day/week/month spans), pregen_one with dedup and disk-check, secs_until_next_run_hour time math. E. user_ai_prefs passive mirror table + DAO for param capture in create_reel_handler and replay in the scheduler. Also fixes resolve_library_param signature to take &[Library] and adds resolve_library_param_state wrapper for AppState callers. 
New files: migrations/2026-06-13-000000_add_precomputed_reels/, migrations/2026-06-13-000010_add_user_ai_prefs/, src/database/precomputed_reel_dao.rs, src/database/user_ai_prefs_dao.rs --- .env.example | 16 + .../down.sql | 2 + .../up.sql | 14 + .../down.sql | 1 + .../up.sql | 7 + src/ai/handlers.rs | 91 ++- src/ai/insight_generator.rs | 102 +++ src/ai/tts.rs | 2 +- src/database/mod.rs | 4 + src/database/models.rs | 56 +- src/database/precomputed_reel_dao.rs | 321 ++++++++ src/database/schema.rs | 28 + src/database/user_ai_prefs_dao.rs | 212 +++++ src/duplicates.rs | 6 +- src/faces.rs | 19 +- src/files.rs | 18 +- src/handlers/image.rs | 16 +- src/handlers/video.rs | 9 +- src/libraries.rs | 79 +- src/main.rs | 21 + src/memories.rs | 2 +- src/reels/mod.rs | 735 +++++++++++++++++- src/reels/script.rs | 159 +++- src/reels/selector.rs | 24 +- src/state.rs | 32 +- src/tags.rs | 2 +- 26 files changed, 1825 insertions(+), 153 deletions(-) create mode 100644 migrations/2026-06-13-000000_add_precomputed_reels/down.sql create mode 100644 migrations/2026-06-13-000000_add_precomputed_reels/up.sql create mode 100644 migrations/2026-06-13-000010_add_user_ai_prefs/down.sql create mode 100644 migrations/2026-06-13-000010_add_user_ai_prefs/up.sql create mode 100644 src/database/precomputed_reel_dao.rs create mode 100644 src/database/user_ai_prefs_dao.rs diff --git a/.env.example b/.env.example index a45fdd5..bafc0c8 100644 --- a/.env.example +++ b/.env.example @@ -139,3 +139,19 @@ CLIP_REQUEST_TIMEOUT_SEC=60 # ── RAG / search ──────────────────────────────────────────────────────── # Set to `1` to enable cross-encoder reranking on /search results. SEARCH_RAG_RERANK=0 + +# ── Nightly reel pre-generation (Phase 3+) ────────────────────────────── +# Set to `1` to enable the scheduler. Disabled by default. +# REEL_PREGEN_ENABLED=1 +# Hour (0-23) when the nightly batch fires. Default 3 AM. +# REEL_PREGEN_HOUR=3 +# Day of week for weekly reels (0=Sun, 1=Mon, …). Default Monday. 
+# REEL_PREGEN_WEEK_DOW=1 +# Timezone offset in minutes from UTC (e.g., -480 = PST). Defaults to +# the server's local timezone. +# REEL_PREGEN_TZ_OFFSET_MINUTES= +# Voice ID for narration (e.g., "grandma"). When unset, falls back to +# the voice stored in the user_ai_prefs DB row, if that row has one. +# REEL_PREGEN_VOICE= +# Library filter: a library id (e.g. "1") or "all" for every library. +# REEL_PREGEN_LIBRARY=all diff --git a/migrations/2026-06-13-000000_add_precomputed_reels/down.sql b/migrations/2026-06-13-000000_add_precomputed_reels/down.sql new file mode 100644 index 0000000..91863c2 --- /dev/null +++ b/migrations/2026-06-13-000000_add_precomputed_reels/down.sql @@ -0,0 +1,2 @@ +DROP INDEX IF EXISTS idx_precomputed_reels_span_library; +DROP TABLE IF EXISTS precomputed_reels; diff --git a/migrations/2026-06-13-000000_add_precomputed_reels/up.sql b/migrations/2026-06-13-000000_add_precomputed_reels/up.sql new file mode 100644 index 0000000..ba49b72 --- /dev/null +++ b/migrations/2026-06-13-000000_add_precomputed_reels/up.sql @@ -0,0 +1,14 @@ +CREATE TABLE precomputed_reels ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + span TEXT NOT NULL, + library_key TEXT NOT NULL, + cache_key TEXT NOT NULL, + output_path TEXT NOT NULL, + title TEXT NOT NULL, + media_count INT NOT NULL, + render_version INT NOT NULL DEFAULT 1, + tz_offset_minutes INT NOT NULL, + voice TEXT, + generated_at BIGINT NOT NULL +); +CREATE INDEX idx_precomputed_reels_span_library ON precomputed_reels(span, library_key, generated_at DESC); diff --git a/migrations/2026-06-13-000010_add_user_ai_prefs/down.sql b/migrations/2026-06-13-000010_add_user_ai_prefs/down.sql new file mode 100644 index 0000000..83b82a3 --- /dev/null +++ b/migrations/2026-06-13-000010_add_user_ai_prefs/down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS user_ai_prefs; diff --git a/migrations/2026-06-13-000010_add_user_ai_prefs/up.sql b/migrations/2026-06-13-000010_add_user_ai_prefs/up.sql new file mode 100644 index 0000000..fd8f6f2 --- /dev/null +++
b/migrations/2026-06-13-000010_add_user_ai_prefs/up.sql @@ -0,0 +1,7 @@ +CREATE TABLE user_ai_prefs ( + id INTEGER PRIMARY KEY CHECK(id=1), + voice TEXT, + tz_offset_minutes INTEGER, + library TEXT, + updated_at BIGINT NOT NULL +); diff --git a/src/ai/handlers.rs b/src/ai/handlers.rs index cb21b14..c6bc212 100644 --- a/src/ai/handlers.rs +++ b/src/ai/handlers.rs @@ -120,7 +120,7 @@ pub async fn generation_status_handler( } if let Some(ref fp) = query.path { - let library = libraries::resolve_library_param(&app_state, query.library.as_deref()) + let library = libraries::resolve_library_param_state(&app_state, query.library.as_deref()) .ok() .flatten() .unwrap_or_else(|| app_state.primary_library()); @@ -218,10 +218,11 @@ pub async fn cancel_generation_handler( } if let Some(ref fp) = request.file_path { - let library = libraries::resolve_library_param(&app_state, request.library.as_deref()) - .ok() - .flatten() - .unwrap_or_else(|| app_state.primary_library()); + let library = + libraries::resolve_library_param_state(&app_state, request.library.as_deref()) + .ok() + .flatten() + .unwrap_or_else(|| app_state.primary_library()); let normalized = normalize_path(fp); // Get active job ids first, then cancel in DB, then abort tasks @@ -580,7 +581,7 @@ pub async fn get_insight_handler( // Expand to rel_paths sharing content so an insight generated under // library 1 still shows when the same photo is viewed from library 2. 
- let library = libraries::resolve_library_param(&app_state, query.library.as_deref()) + let library = libraries::resolve_library_param_state(&app_state, query.library.as_deref()) .ok() .flatten() .unwrap_or_else(|| app_state.primary_library()); @@ -1218,15 +1219,16 @@ pub async fn chat_turn_handler( let mut span = tracer.start_with_context("http.insights.chat", &parent_context); span.set_attribute(KeyValue::new("file_path", request.file_path.clone())); - let library = match libraries::resolve_library_param(&app_state, request.library.as_deref()) { - Ok(Some(lib)) => lib, - Ok(None) => app_state.primary_library(), - Err(e) => { - return HttpResponse::BadRequest().json(serde_json::json!({ - "error": format!("invalid library: {}", e) - })); - } - }; + let library = + match libraries::resolve_library_param_state(&app_state, request.library.as_deref()) { + Ok(Some(lib)) => lib, + Ok(None) => app_state.primary_library(), + Err(e) => { + return HttpResponse::BadRequest().json(serde_json::json!({ + "error": format!("invalid library: {}", e) + })); + } + }; // Service-token claims (sub: "service:apollo") fall through to // user_id=1 — the operator convention. 
Mobile/web clients have a @@ -1344,15 +1346,16 @@ pub async fn chat_rewind_handler( request: web::Json, app_state: web::Data, ) -> impl Responder { - let library = match libraries::resolve_library_param(&app_state, request.library.as_deref()) { - Ok(Some(lib)) => lib, - Ok(None) => app_state.primary_library(), - Err(e) => { - return HttpResponse::BadRequest().json(serde_json::json!({ - "error": format!("invalid library: {}", e) - })); - } - }; + let library = + match libraries::resolve_library_param_state(&app_state, request.library.as_deref()) { + Ok(Some(lib)) => lib, + Ok(None) => app_state.primary_library(), + Err(e) => { + return HttpResponse::BadRequest().json(serde_json::json!({ + "error": format!("invalid library: {}", e) + })); + } + }; match app_state .insight_chat @@ -1393,7 +1396,7 @@ pub async fn chat_history_handler( // cross-library lookup when the scoped one misses, so a photo // with no insight in this library but one in another still // surfaces (the "show this photo's primary insight" merge case). 
- let library = libraries::resolve_library_param(&app_state, query.library.as_deref()) + let library = libraries::resolve_library_param_state(&app_state, query.library.as_deref()) .ok() .flatten() .unwrap_or_else(|| app_state.primary_library()); @@ -1444,15 +1447,16 @@ pub async fn chat_stream_handler( request: web::Json, app_state: web::Data, ) -> HttpResponse { - let library = match libraries::resolve_library_param(&app_state, request.library.as_deref()) { - Ok(Some(lib)) => lib, - Ok(None) => app_state.primary_library(), - Err(e) => { - return HttpResponse::BadRequest().json(serde_json::json!({ - "error": format!("invalid library: {}", e) - })); - } - }; + let library = + match libraries::resolve_library_param_state(&app_state, request.library.as_deref()) { + Ok(Some(lib)) => lib, + Ok(None) => app_state.primary_library(), + Err(e) => { + return HttpResponse::BadRequest().json(serde_json::json!({ + "error": format!("invalid library: {}", e) + })); + } + }; // Service-token sub falls through to user_id=1 (see chat_turn_handler). 
let user_id = claims.sub.parse::().unwrap_or(1); @@ -1589,15 +1593,16 @@ pub async fn turn_async_handler( let mut span = tracer.start_with_context("http.insights.chat_turn_async", &parent_context); span.set_attribute(KeyValue::new("file_path", request.file_path.clone())); - let library = match libraries::resolve_library_param(&app_state, request.library.as_deref()) { - Ok(Some(lib)) => lib, - Ok(None) => app_state.primary_library(), - Err(e) => { - return HttpResponse::BadRequest().json(serde_json::json!({ - "error": format!("invalid library: {}", e) - })); - } - }; + let library = + match libraries::resolve_library_param_state(&app_state, request.library.as_deref()) { + Ok(Some(lib)) => lib, + Ok(None) => app_state.primary_library(), + Err(e) => { + return HttpResponse::BadRequest().json(serde_json::json!({ + "error": format!("invalid library: {}", e) + })); + } + }; let user_id = claims.sub.parse::().unwrap_or(1); diff --git a/src/ai/insight_generator.rs b/src/ai/insight_generator.rs index 3673c43..4871c2e 100644 --- a/src/ai/insight_generator.rs +++ b/src/ai/insight_generator.rs @@ -4497,6 +4497,108 @@ Return ONLY the summary, nothing else."#, )) } + /// A read-only agentic tool loop: chat with tools until the model stops + /// calling them, then return the final content. + /// + /// This is the loop body extracted from + /// `generate_agentic_insight_for_photo` (lines 4316-4377) so it can be + /// reused by the reel-scripter without the photo-specific context + /// (image_base64, file_path, persona_id). The photo insight loop still + /// has its own copy because it threads image/file context through + /// `execute_tool`. + /// + /// Calls `execute_tool` with empty file/image context; enabled tools + /// never read those fields. 
+ #[allow(dead_code)] + pub(crate) async fn run_readonly_tool_loop( + &self, + backend: &ResolvedBackend, + mut messages: Vec, + tools: Vec, + max_iter: usize, + ) -> Result { + let mut final_content = String::new(); + + for iteration in 0..max_iter { + log::info!("Agentic iteration {}/{}", iteration + 1, max_iter); + + let (response, _prompt_tokens, _eval_tokens) = backend + .chat() + .chat_with_tools(messages.clone(), tools.clone()) + .await?; + + // Sanitize tool call arguments before pushing back into history. + // Some models occasionally return non-object arguments (bool, + // string, null) which Ollama rejects when they are re-sent in + // a subsequent request. + let mut response = response; + if let Some(ref mut tool_calls) = response.tool_calls { + for tc in tool_calls.iter_mut() { + if !tc.function.arguments.is_object() { + log::warn!( + "Tool '{}' returned non-object arguments ({:?}), normalising to {{}}", + tc.function.name, + tc.function.arguments + ); + tc.function.arguments = serde_json::Value::Object(Default::default()); + } + } + } + + messages.push(response.clone()); + + if let Some(ref tool_calls) = response.tool_calls + && !tool_calls.is_empty() + { + for tool_call in tool_calls { + log::info!( + "Agentic tool call [{}]: {} {}", + iteration, + tool_call.function.name, + tool_call.function.arguments + ); + let result = self + .execute_tool( + &tool_call.function.name, + &tool_call.function.arguments, + backend, + &None, + "", + 0, + "", + &opentelemetry::Context::new(), + ) + .await; + messages.push(ChatMessage::tool_result(result)); + } + continue; + } + + // No tool calls — this is the final answer + final_content = response.content; + break; + } + + // If loop exhausted without final answer, ask for one + if final_content.is_empty() { + log::info!( + "Agentic loop exhausted after {} iterations, requesting final answer", + max_iter + ); + messages.push(ChatMessage::user( + "Based on the context gathered, please write the final answer. 
Return ONLY the JSON object, no prose or code fences.", + )); + let (final_response, _, _) = backend + .chat() + .chat_with_tools(messages.clone(), vec![]) + .await?; + final_content = final_response.content.clone(); + messages.push(final_response); + } + + Ok(final_content) + } + /// Reverse geocode GPS coordinates to human-readable place names async fn reverse_geocode(&self, lat: f64, lon: f64) -> Option { let url = format!( diff --git a/src/ai/tts.rs b/src/ai/tts.rs index a9a610a..d6ef89d 100644 --- a/src/ai/tts.rs +++ b/src/ai/tts.rs @@ -1020,7 +1020,7 @@ pub async fn create_voice_from_library_handler( let voice_name = append_ref_window(&voice_name, ref_start, ref_duration.round().max(1.0) as u32); - let library = match libraries::resolve_library_param(&app_state, req.library.as_deref()) { + let library = match libraries::resolve_library_param_state(&app_state, req.library.as_deref()) { Ok(Some(l)) => l, Ok(None) => app_state.primary_library(), Err(msg) => { diff --git a/src/database/mod.rs b/src/database/mod.rs index d063bd0..981f6a4 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -51,10 +51,12 @@ pub mod knowledge_dao; pub mod location_dao; pub mod models; pub mod persona_dao; +pub mod precomputed_reel_dao; pub mod preview_dao; pub mod reconcile; pub mod schema; pub mod search_dao; +pub mod user_ai_prefs_dao; pub use calendar_dao::{CalendarEventDao, SqliteCalendarEventDao}; pub use daily_summary_dao::{DailySummaryDao, InsertDailySummary, SqliteDailySummaryDao}; @@ -66,8 +68,10 @@ pub use knowledge_dao::{ }; pub use location_dao::{LocationHistoryDao, SqliteLocationHistoryDao}; pub use persona_dao::{ImportPersona, PersonaDao, PersonaPatch, SqlitePersonaDao}; +pub use precomputed_reel_dao::{PrecomputedReelDao, SqlitePrecomputedReelDao}; pub use preview_dao::{PreviewDao, SqlitePreviewDao}; pub use search_dao::{SearchHistoryDao, SqliteSearchHistoryDao}; +pub use user_ai_prefs_dao::{SqliteUserAiPrefsDao, UserAiPrefsDao}; pub trait UserDao { fn 
create_user(&mut self, user: &str, password: &str) -> Option; diff --git a/src/database/models.rs b/src/database/models.rs index 62274e2..d3d5440 100644 --- a/src/database/models.rs +++ b/src/database/models.rs @@ -1,6 +1,7 @@ use crate::database::schema::{ entities, entity_facts, entity_photo_links, favorites, image_exif, insight_generation_jobs, - libraries, personas, photo_insights, users, video_preview_clips, + libraries, personas, photo_insights, precomputed_reels, user_ai_prefs, users, + video_preview_clips, }; use serde::Serialize; @@ -505,3 +506,56 @@ pub struct InsightGenerationJob { pub result_insight_id: Option, pub error_message: Option, } + +// --- Precomputed reels ------------------------------------------------------- + +#[derive(Insertable)] +#[diesel(table_name = precomputed_reels)] +pub struct InsertablePrecomputedReel { + pub span: String, + pub library_key: String, + pub cache_key: String, + pub output_path: String, + pub title: String, + pub media_count: i32, + pub render_version: i32, + pub tz_offset_minutes: i32, + pub voice: Option, + pub generated_at: i64, +} + +#[derive(Serialize, Queryable, Clone, Debug)] +pub struct PrecomputedReel { + pub id: i32, + pub span: String, + pub library_key: String, + pub cache_key: String, + pub output_path: String, + pub title: String, + pub media_count: i32, + pub render_version: i32, + pub tz_offset_minutes: i32, + pub voice: Option, + pub generated_at: i64, +} + +// --- User AI preferences (Section E) ---------------------------------------- + +#[derive(Queryable, Insertable, Debug, Clone, serde::Deserialize, serde::Serialize)] +#[diesel(table_name = user_ai_prefs)] +pub struct UserAiPrefs { + pub id: i32, + pub voice: Option, + pub tz_offset_minutes: Option, + pub library: Option, + pub updated_at: i64, +} + +#[derive(Insertable, Debug, Clone, serde::Deserialize, serde::Serialize)] +#[diesel(table_name = user_ai_prefs)] +pub struct UpsertUserAiPrefs { + pub voice: Option, + pub tz_offset_minutes: 
Option, + pub library: Option, + pub updated_at: i64, +} diff --git a/src/database/precomputed_reel_dao.rs b/src/database/precomputed_reel_dao.rs new file mode 100644 index 0000000..7acc098 --- /dev/null +++ b/src/database/precomputed_reel_dao.rs @@ -0,0 +1,321 @@ +use diesel::prelude::*; +use diesel::sqlite::SqliteConnection; +use std::ops::DerefMut; +use std::sync::{Arc, Mutex}; + +use crate::database::models::{InsertablePrecomputedReel, PrecomputedReel}; +use crate::database::schema; +use crate::database::{DbError, DbErrorKind, connect}; +use crate::otel::trace_db_call; + +/// Ledger for precomputed memory reels. The nightly agentic job writes a +/// row after each successful render; the `GET /reels/precomputed` handler +/// reads it to gate on freshness and serve the cached MP4. +pub trait PrecomputedReelDao: Sync + Send { + /// Insert a precomputed reel row. Returns the new row's id. + /// Written by the nightly agentic job (Section D). + #[allow(dead_code)] + fn record_reel( + &mut self, + context: &opentelemetry::Context, + row: &InsertablePrecomputedReel, + ) -> Result; + + /// Find the latest precomputed reel for the given (span, library_key). + fn latest_for( + &mut self, + context: &opentelemetry::Context, + span: &str, + library_key: &str, + ) -> Result, DbError>; + + /// Return true when a fresh precomputed reel exists for the given + /// (span, library_key, render_version) that was generated at or after + /// `min_generated_at`. Used as a fast existence gate before falling + /// back to `latest_for` (avoids a second query path). 
+ fn exists_fresh( + &mut self, + context: &opentelemetry::Context, + span: &str, + library_key: &str, + render_version: i32, + min_generated_at: i64, + ) -> Result; +} + +pub struct SqlitePrecomputedReelDao { + connection: Arc>, +} + +impl Default for SqlitePrecomputedReelDao { + fn default() -> Self { + Self::new() + } +} + +impl SqlitePrecomputedReelDao { + pub fn new() -> Self { + Self { + connection: Arc::new(Mutex::new(connect())), + } + } + + #[cfg(test)] + pub fn from_connection(conn: Arc>) -> Self { + Self { connection: conn } + } +} + +impl PrecomputedReelDao for SqlitePrecomputedReelDao { + fn record_reel( + &mut self, + context: &opentelemetry::Context, + row: &InsertablePrecomputedReel, + ) -> Result { + trace_db_call(context, "insert", "record_reel", |_span| { + use schema::precomputed_reels::dsl; + + let mut connection = self + .connection + .lock() + .expect("Unable to lock PrecomputedReelDao"); + + diesel::insert_into(dsl::precomputed_reels) + .values(row) + .execute(connection.deref_mut()) + .map_err(|e| anyhow::anyhow!("Failed to insert reel: {}", e))?; + + dsl::precomputed_reels + .order(dsl::id.desc()) + .select(dsl::id) + .first::(connection.deref_mut()) + .map_err(|e| anyhow::anyhow!("Failed to get reel id: {}", e)) + }) + .map_err(|e| DbError::log(DbErrorKind::InsertError, e)) + } + + fn latest_for( + &mut self, + context: &opentelemetry::Context, + span: &str, + library_key: &str, + ) -> Result, DbError> { + trace_db_call(context, "query", "latest_for", |_span| { + use schema::precomputed_reels::dsl; + + let mut connection = self + .connection + .lock() + .expect("Unable to lock PrecomputedReelDao"); + + dsl::precomputed_reels + .filter(dsl::span.eq(span)) + .filter(dsl::library_key.eq(library_key)) + .order(dsl::generated_at.desc()) + .first::(connection.deref_mut()) + .optional() + .map_err(|e| anyhow::anyhow!("Failed to get latest reel: {}", e)) + }) + .map_err(|e| DbError::log(DbErrorKind::QueryError, e)) + } + + fn exists_fresh( + &mut 
self, + context: &opentelemetry::Context, + span: &str, + library_key: &str, + render_version: i32, + min_generated_at: i64, + ) -> Result { + trace_db_call(context, "query", "exists_fresh", |_span| { + use schema::precomputed_reels::dsl; + + let mut connection = self + .connection + .lock() + .expect("Unable to lock PrecomputedReelDao"); + + let count: i64 = dsl::precomputed_reels + .filter(dsl::span.eq(span)) + .filter(dsl::library_key.eq(library_key)) + .filter(dsl::render_version.eq(render_version)) + .filter(dsl::generated_at.ge(min_generated_at)) + .count() + .get_result(connection.deref_mut()) + .map_err(|e| anyhow::anyhow!("Failed to check fresh reel: {}", e))?; + + Ok(count > 0) + }) + .map_err(|e| DbError::log(DbErrorKind::QueryError, e)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use diesel::Connection; + use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations}; + + const DB_MIGRATIONS: EmbeddedMigrations = embed_migrations!(); + + fn setup_dao() -> SqlitePrecomputedReelDao { + let mut conn = SqliteConnection::establish(":memory:") + .expect("Unable to create in-memory db connection"); + conn.run_pending_migrations(DB_MIGRATIONS) + .expect("Failure running DB migrations"); + SqlitePrecomputedReelDao::from_connection(Arc::new(Mutex::new(conn))) + } + + fn ctx() -> opentelemetry::Context { + opentelemetry::Context::new() + } + + fn sample_row() -> InsertablePrecomputedReel { + InsertablePrecomputedReel { + span: "day".to_string(), + library_key: "1".to_string(), + cache_key: "abc123".to_string(), + output_path: "/tmp/reel.mp4".to_string(), + title: "Test Reel".to_string(), + media_count: 10, + render_version: 1, + tz_offset_minutes: 0, + voice: Some("default".to_string()), + generated_at: 1_000_000, + } + } + + #[test] + fn record_reel_inserts_and_returns_id() { + let mut dao = setup_dao(); + let ctx = ctx(); + let row = sample_row(); + + let id = dao.record_reel(&ctx, &row).unwrap(); + assert!(id > 0, "should return a 
positive id"); + } + + #[test] + fn record_reel_returns_increasing_ids() { + let mut dao = setup_dao(); + let ctx = ctx(); + let row = sample_row(); + + let id1 = dao.record_reel(&ctx, &row).unwrap(); + let id2 = dao.record_reel(&ctx, &row).unwrap(); + assert!(id2 > id1, "each insert should get a higher id"); + } + + #[test] + fn latest_for_returns_latest() { + let mut dao = setup_dao(); + let ctx = ctx(); + + let row1 = InsertablePrecomputedReel { + generated_at: 1_000_000, + ..sample_row() + }; + let row2 = InsertablePrecomputedReel { + generated_at: 2_000_000, + ..sample_row() + }; + + dao.record_reel(&ctx, &row1).unwrap(); + dao.record_reel(&ctx, &row2).unwrap(); + + let latest = dao.latest_for(&ctx, "day", "1").unwrap().unwrap(); + assert_eq!(latest.generated_at, 2_000_000); + } + + #[test] + fn latest_for_scoped_by_span_and_library() { + let mut dao = setup_dao(); + let ctx = ctx(); + + let day_row = InsertablePrecomputedReel { + span: "day".to_string(), + library_key: "1".to_string(), + generated_at: 1_000_000, + ..sample_row() + }; + let week_row = InsertablePrecomputedReel { + span: "week".to_string(), + library_key: "1".to_string(), + generated_at: 2_000_000, + ..sample_row() + }; + + dao.record_reel(&ctx, &day_row).unwrap(); + dao.record_reel(&ctx, &week_row).unwrap(); + + let day_latest = dao.latest_for(&ctx, "day", "1").unwrap().unwrap(); + assert_eq!(day_latest.span, "day"); + + let week_latest = dao.latest_for(&ctx, "week", "1").unwrap().unwrap(); + assert_eq!(week_latest.span, "week"); + + // Different library returns None + let missing = dao.latest_for(&ctx, "day", "99").unwrap(); + assert!(missing.is_none()); + } + + #[test] + fn latest_for_returns_none_when_no_rows() { + let mut dao = setup_dao(); + let ctx = ctx(); + + let result = dao.latest_for(&ctx, "day", "1").unwrap(); + assert!(result.is_none()); + } + + #[test] + fn exists_fresh_returns_true_when_present() { + let mut dao = setup_dao(); + let ctx = ctx(); + + dao.record_reel(&ctx, 
&sample_row()).unwrap(); + + let exists = dao.exists_fresh(&ctx, "day", "1", 1, 900_000).unwrap(); + assert!(exists, "should find the row we just inserted"); + } + + #[test] + fn exists_fresh_returns_false_when_missing() { + let mut dao = setup_dao(); + let ctx = ctx(); + + let exists = dao.exists_fresh(&ctx, "day", "1", 1, 900_000).unwrap(); + assert!(!exists, "should not find anything in empty table"); + } + + #[test] + fn exists_fresh_respects_min_generated_at() { + let mut dao = setup_dao(); + let ctx = ctx(); + + dao.record_reel(&ctx, &sample_row()).unwrap(); + + // Below the threshold — should exist + let exists = dao.exists_fresh(&ctx, "day", "1", 1, 500_000).unwrap(); + assert!(exists); + + // Above the threshold — should not exist + let exists = dao.exists_fresh(&ctx, "day", "1", 1, 2_000_000).unwrap(); + assert!(!exists); + } + + #[test] + fn exists_fresh_respects_render_version() { + let mut dao = setup_dao(); + let ctx = ctx(); + + let row_v1 = InsertablePrecomputedReel { + render_version: 1, + ..sample_row() + }; + dao.record_reel(&ctx, &row_v1).unwrap(); + + assert!(dao.exists_fresh(&ctx, "day", "1", 1, 900_000).unwrap()); + assert!(!dao.exists_fresh(&ctx, "day", "1", 2, 900_000).unwrap()); + } +} diff --git a/src/database/schema.rs b/src/database/schema.rs index bf5791b..846542d 100644 --- a/src/database/schema.rs +++ b/src/database/schema.rs @@ -266,6 +266,16 @@ diesel::table! { } } +diesel::table! { + user_ai_prefs (id) { + id -> Integer, + voice -> Nullable, + tz_offset_minutes -> Nullable, + library -> Nullable, + updated_at -> BigInt, + } +} + diesel::table! { video_preview_clips (id) { id -> Integer, @@ -294,6 +304,22 @@ diesel::table! { } } +diesel::table! 
{ + precomputed_reels (id) { + id -> Integer, + span -> Text, + library_key -> Text, + cache_key -> Text, + output_path -> Text, + title -> Text, + media_count -> Integer, + render_version -> Integer, + tz_offset_minutes -> Integer, + voice -> Nullable, + generated_at -> BigInt, + } +} + diesel::joinable!(entity_facts -> photo_insights (source_insight_id)); diesel::joinable!(entity_photo_links -> entities (entity_id)); diesel::joinable!(entity_photo_links -> libraries (library_id)); @@ -322,9 +348,11 @@ diesel::allow_tables_to_appear_in_same_query!( personas, persons, photo_insights, + precomputed_reels, search_history, tagged_photo, tags, + user_ai_prefs, users, video_preview_clips, ); diff --git a/src/database/user_ai_prefs_dao.rs b/src/database/user_ai_prefs_dao.rs new file mode 100644 index 0000000..d58a56c --- /dev/null +++ b/src/database/user_ai_prefs_dao.rs @@ -0,0 +1,212 @@ +use diesel::prelude::*; +use diesel::sqlite::SqliteConnection; +use std::ops::DerefMut; +use std::sync::{Arc, Mutex}; + +use crate::database::models::{UpsertUserAiPrefs, UserAiPrefs}; +use crate::database::schema; +use crate::database::{DbError, DbErrorKind, connect}; +use crate::otel::trace_db_call; + +/// Generic single-row table that passively mirrors the latest client AI +/// request parameters (voice, timezone, library). Read by the nightly +/// pre-generation scheduler (Section D) to pick up user preferences. +pub trait UserAiPrefsDao: Sync + Send { + /// Read the single row; `None` when it hasn't been populated yet. + fn get_prefs( + &mut self, + context: &opentelemetry::Context, + ) -> Result, DbError>; + + /// Upsert the single row (id is always 1). 
+ #[allow(dead_code)] + fn upsert_prefs( + &mut self, + context: &opentelemetry::Context, + prefs: &UpsertUserAiPrefs, + ) -> Result<(), DbError>; +} + +pub struct SqliteUserAiPrefsDao { + connection: Arc>, +} + +impl Default for SqliteUserAiPrefsDao { + fn default() -> Self { + Self::new() + } +} + +impl SqliteUserAiPrefsDao { + pub fn new() -> Self { + Self { + connection: Arc::new(Mutex::new(connect())), + } + } + + #[cfg(test)] + pub fn from_connection(conn: Arc>) -> Self { + Self { connection: conn } + } +} + +impl UserAiPrefsDao for SqliteUserAiPrefsDao { + fn get_prefs( + &mut self, + context: &opentelemetry::Context, + ) -> Result, DbError> { + trace_db_call(context, "query", "get_prefs", |_span| { + use schema::user_ai_prefs::dsl; + + let mut connection = self + .connection + .lock() + .expect("Unable to lock UserAiPrefsDao"); + + dsl::user_ai_prefs + .first::(connection.deref_mut()) + .optional() + .map_err(|e| anyhow::anyhow!("Failed to get prefs: {}", e)) + }) + .map_err(|e| DbError::log(DbErrorKind::QueryError, e)) + } + + fn upsert_prefs( + &mut self, + context: &opentelemetry::Context, + prefs: &UpsertUserAiPrefs, + ) -> Result<(), DbError> { + trace_db_call(context, "upsert", "upsert_prefs", |_span| { + use schema::user_ai_prefs::dsl; + + let mut connection = self + .connection + .lock() + .expect("Unable to lock UserAiPrefsDao"); + + // SQLite: INSERT on first call, UPDATE on subsequent calls. + // `prefs` carries no id, so the first INSERT is assigned id=1. + // Later INSERTs violate CHECK(id=1) and fall through to the UPDATE. + let result = diesel::insert_into(dsl::user_ai_prefs) + .values(prefs) + .execute(connection.deref_mut()); + + match result { + Ok(_) => { + // First insert succeeded. + Ok(()) + } + Err(_e) => { + // Insert failed (normally the CHECK(id=1) violation). Update instead.
+ diesel::update(dsl::user_ai_prefs.filter(dsl::id.eq(1))) + .set(( + dsl::voice.eq(&prefs.voice), + dsl::tz_offset_minutes.eq(&prefs.tz_offset_minutes), + dsl::library.eq(&prefs.library), + dsl::updated_at.eq(&prefs.updated_at), + )) + .execute(connection.deref_mut()) + .map_err(|e| anyhow::anyhow!("Failed to upsert prefs: {}", e))?; + Ok(()) + } + } + }) + .map_err(|e| DbError::log(DbErrorKind::InsertError, e)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use diesel::Connection; + use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations}; + + const DB_MIGRATIONS: EmbeddedMigrations = embed_migrations!(); + + fn setup_dao() -> SqliteUserAiPrefsDao { + let mut conn = SqliteConnection::establish(":memory:") + .expect("Unable to create in-memory db connection"); + conn.run_pending_migrations(DB_MIGRATIONS) + .expect("Failure running DB migrations"); + SqliteUserAiPrefsDao::from_connection(Arc::new(Mutex::new(conn))) + } + + fn ctx() -> opentelemetry::Context { + opentelemetry::Context::new() + } + + #[test] + fn get_prefs_returns_none_when_empty() { + let mut dao = setup_dao(); + let result = dao.get_prefs(&ctx()).unwrap(); + assert!(result.is_none()); + } + + #[test] + fn upsert_prefs_inserts_row() { + let mut dao = setup_dao(); + let now = 1_700_000_000i64; + let prefs = UpsertUserAiPrefs { + voice: Some("grandma".to_string()), + tz_offset_minutes: Some(-480), + library: Some("1".to_string()), + updated_at: now, + }; + dao.upsert_prefs(&ctx(), &prefs).unwrap(); + + let row = dao.get_prefs(&ctx()).unwrap().unwrap(); + assert_eq!(row.id, 1); + assert_eq!(row.voice, Some("grandma".to_string())); + assert_eq!(row.tz_offset_minutes, Some(-480)); + assert_eq!(row.library, Some("1".to_string())); + assert_eq!(row.updated_at, now); + } + + #[test] + fn upsert_prefs_replaces_existing() { + let mut dao = setup_dao(); + let now1 = 1_700_000_000i64; + let now2 = 1_800_000_000i64; + + let prefs1 = UpsertUserAiPrefs { + voice: 
Some("grandma".to_string()), + tz_offset_minutes: Some(-480), + library: Some("1".to_string()), + updated_at: now1, + }; + dao.upsert_prefs(&ctx(), &prefs1).unwrap(); + + let prefs2 = UpsertUserAiPrefs { + voice: Some("dad".to_string()), + tz_offset_minutes: Some(-300), + library: None, + updated_at: now2, + }; + dao.upsert_prefs(&ctx(), &prefs2).unwrap(); + + let row = dao.get_prefs(&ctx()).unwrap().unwrap(); + assert_eq!(row.voice, Some("dad".to_string())); + assert_eq!(row.tz_offset_minutes, Some(-300)); + assert!(row.library.is_none()); + assert_eq!(row.updated_at, now2); + } + + #[test] + fn upsert_partial_fields() { + let mut dao = setup_dao(); + let now = 1_700_000_000i64; + + let prefs = UpsertUserAiPrefs { + voice: None, + tz_offset_minutes: Some(-480), + library: None, + updated_at: now, + }; + dao.upsert_prefs(&ctx(), &prefs).unwrap(); + + let row = dao.get_prefs(&ctx()).unwrap().unwrap(); + assert_eq!(row.tz_offset_minutes, Some(-480)); + assert!(row.voice.is_none()); + assert!(row.library.is_none()); + } +} diff --git a/src/duplicates.rs b/src/duplicates.rs index 372415b..32ed92b 100644 --- a/src/duplicates.rs +++ b/src/duplicates.rs @@ -234,7 +234,7 @@ async fn list_exact_handler( let span = global_tracer().start_with_context("duplicates.list_exact", &context); let span_context = opentelemetry::Context::current_with_span(span); - let library_id = libraries::resolve_library_param(&app_state, query.library.as_deref()) + let library_id = libraries::resolve_library_param_state(&app_state, query.library.as_deref()) .ok() .flatten() .map(|l| l.id); @@ -265,7 +265,7 @@ async fn list_perceptual_handler( let span = global_tracer().start_with_context("duplicates.list_perceptual", &context); let span_context = opentelemetry::Context::current_with_span(span); - let library_id = libraries::resolve_library_param(&app_state, query.library.as_deref()) + let library_id = libraries::resolve_library_param_state(&app_state, query.library.as_deref()) .ok() .flatten() 
.map(|l| l.id); @@ -449,7 +449,7 @@ async fn list_folder_pairs_handler( let span = global_tracer().start_with_context("duplicates.list_folder_pairs", &context); let span_context = opentelemetry::Context::current_with_span(span); - let library_id = libraries::resolve_library_param(&app_state, query.library.as_deref()) + let library_id = libraries::resolve_library_param_state(&app_state, query.library.as_deref()) .ok() .flatten() .map(|l| l.id); diff --git a/src/faces.rs b/src/faces.rs index 3288aa3..f619966 100644 --- a/src/faces.rs +++ b/src/faces.rs @@ -1755,7 +1755,7 @@ async fn stats_handler( let span = global_tracer().start_with_context("faces.stats", &context); let span_context = opentelemetry::Context::current_with_span(span); - let library_id = libraries::resolve_library_param(&app_state, query.library.as_deref()) + let library_id = libraries::resolve_library_param_state(&app_state, query.library.as_deref()) .ok() .flatten() .map(|l| l.id); @@ -1782,11 +1782,12 @@ async fn list_faces_handler( let normalized_path = normalize_path(&query.path); // resolve_library_param returns Option<&Library>; clone so the result // is owned (matching the primary_library fallback's type). 
- let library: Library = libraries::resolve_library_param(&app_state, query.library.as_deref()) - .ok() - .flatten() - .cloned() - .unwrap_or_else(|| app_state.primary_library().clone()); + let library: Library = + libraries::resolve_library_param_state(&app_state, query.library.as_deref()) + .ok() + .flatten() + .cloned() + .unwrap_or_else(|| app_state.primary_library().clone()); let mut dao = face_dao.lock().expect("face dao lock"); let hash = match dao.resolve_content_hash(&span_context, library.id, &normalized_path) { @@ -1870,7 +1871,7 @@ async fn create_face_handler( } let normalized_path = normalize_path(&body.path); - let library: Library = match libraries::resolve_library_param( + let library: Library = match libraries::resolve_library_param_state( &app_state, body.library.as_ref().map(|i| i.to_string()).as_deref(), ) { @@ -2192,7 +2193,7 @@ async fn list_persons_handler( let span = global_tracer().start_with_context("persons.list", &context); let span_context = opentelemetry::Context::current_with_span(span); - let library_id = libraries::resolve_library_param(&app_state, query.library.as_deref()) + let library_id = libraries::resolve_library_param_state(&app_state, query.library.as_deref()) .ok() .flatten() .map(|l| l.id); @@ -2345,7 +2346,7 @@ async fn person_faces_handler( let context = extract_context_from_request(&request); let span = global_tracer().start_with_context("persons.faces", &context); let span_context = opentelemetry::Context::current_with_span(span); - let library_id = libraries::resolve_library_param(&app_state, query.library.as_deref()) + let library_id = libraries::resolve_library_param_state(&app_state, query.library.as_deref()) .ok() .flatten() .map(|l| l.id); diff --git a/src/files.rs b/src/files.rs index 59cd49e..920540e 100644 --- a/src/files.rs +++ b/src/files.rs @@ -275,14 +275,14 @@ pub async fn list_photos( // Resolve the optional library filter. Unknown values return 400. 
A // `None` result means "union across all libraries" and downstream // walks iterate every configured library root. - let library = match crate::libraries::resolve_library_param(&app_state, req.library.as_deref()) - { - Ok(lib) => lib, - Err(msg) => { - log::warn!("Rejecting /photos request: {}", msg); - return HttpResponse::BadRequest().body(msg); - } - }; + let library = + match crate::libraries::resolve_library_param_state(&app_state, req.library.as_deref()) { + Ok(lib) => lib, + Err(msg) => { + log::warn!("Rejecting /photos request: {}", msg); + return HttpResponse::BadRequest().body(msg); + } + }; let span_context = opentelemetry::Context::current_with_span(span); @@ -1238,7 +1238,7 @@ pub async fn list_exif_summary( // Resolve the library filter up front so a bad id/name 400s before we // ever take the DAO mutex. None == union across all libraries. let library_filter = - match crate::libraries::resolve_library_param(&app_state, req.library.as_deref()) { + match crate::libraries::resolve_library_param_state(&app_state, req.library.as_deref()) { Ok(lib) => lib.map(|l| l.id), Err(msg) => { span.set_status(Status::error(msg.clone())); diff --git a/src/handlers/image.rs b/src/handlers/image.rs index f0d2310..923fff3 100644 --- a/src/handlers/image.rs +++ b/src/handlers/image.rs @@ -53,7 +53,7 @@ pub async fn get_image( // Resolve library from query param; default to primary so clients that // don't yet send `library=` continue to work. 
- let library = match libraries::resolve_library_param(&app_state, req.library.as_deref()) { + let library = match libraries::resolve_library_param_state(&app_state, req.library.as_deref()) { Ok(Some(lib)) => lib, Ok(None) => app_state.primary_library(), Err(msg) => { @@ -492,7 +492,7 @@ pub async fn get_file_metadata( let span_context = opentelemetry::Context::new().with_remote_span_context(span.span_context().clone()); - let library = libraries::resolve_library_param(&app_state, path.library.as_deref()) + let library = libraries::resolve_library_param_state(&app_state, path.library.as_deref()) .ok() .flatten() .unwrap_or_else(|| app_state.primary_library()); @@ -580,7 +580,7 @@ pub async fn set_image_gps( let span_context = opentelemetry::Context::new().with_remote_span_context(span.span_context().clone()); - let library = libraries::resolve_library_param(&app_state, body.library.as_deref()) + let library = libraries::resolve_library_param_state(&app_state, body.library.as_deref()) .ok() .flatten() .unwrap_or_else(|| app_state.primary_library()); @@ -746,7 +746,7 @@ pub async fn get_full_exif( let context = extract_context_from_request(&request); let mut span = tracer.start_with_context("get_full_exif", &context); - let library = libraries::resolve_library_param(&app_state, path.library.as_deref()) + let library = libraries::resolve_library_param_state(&app_state, path.library.as_deref()) .ok() .flatten() .unwrap_or_else(|| app_state.primary_library()); @@ -888,7 +888,8 @@ pub async fn set_image_date( let span_context = opentelemetry::Context::new().with_remote_span_context(span.span_context().clone()); - let library = match libraries::resolve_library_param(&app_state, body.library.as_deref()) { + let library = match libraries::resolve_library_param_state(&app_state, body.library.as_deref()) + { Ok(Some(lib)) => lib, Ok(None) => app_state.primary_library(), Err(msg) => { @@ -941,7 +942,8 @@ pub async fn clear_image_date( let span_context = 
opentelemetry::Context::new().with_remote_span_context(span.span_context().clone()); - let library = match libraries::resolve_library_param(&app_state, body.library.as_deref()) { + let library = match libraries::resolve_library_param_state(&app_state, body.library.as_deref()) + { Ok(Some(lib)) => lib, Ok(None) => app_state.primary_library(), Err(msg) => { @@ -1001,7 +1003,7 @@ pub async fn upload_image( // Resolve the optional library selector. Absent → primary library // (backwards-compatible with clients that don't yet send `library=`). let target_library = - match libraries::resolve_library_param(&app_state, query.library.as_deref()) { + match libraries::resolve_library_param_state(&app_state, query.library.as_deref()) { Ok(Some(lib)) => lib, Ok(None) => app_state.primary_library(), Err(msg) => { diff --git a/src/handlers/video.rs b/src/handlers/video.rs index f9f4e64..b56a67e 100644 --- a/src/handlers/video.rs +++ b/src/handlers/video.rs @@ -67,10 +67,11 @@ pub async fn generate_video( let context = extract_context_from_request(&request); let mut span = tracer.start_with_context("generate_video", &context); - let preferred_library = libraries::resolve_library_param(&app_state, body.library.as_deref()) - .ok() - .flatten() - .unwrap_or_else(|| app_state.primary_library()); + let preferred_library = + libraries::resolve_library_param_state(&app_state, body.library.as_deref()) + .ok() + .flatten() + .unwrap_or_else(|| app_state.primary_library()); // Try the resolved library first, then fall back to any other library // that actually contains the file — handles union-mode requests where diff --git a/src/libraries.rs b/src/libraries.rs index 55bf5c1..377b442 100644 --- a/src/libraries.rs +++ b/src/libraries.rs @@ -291,11 +291,11 @@ pub fn seed_or_patch_from_env(conn: &mut SqliteConnection, base_path: &str) { } /// Resolve a library request parameter (accepts numeric id as string or name) -/// against the configured libraries. 
Returns `Ok(None)` when the param is +/// against a list of libraries. Returns `Ok(None)` when the param is /// absent, meaning "span all libraries". Returns `Err` when a value is /// provided but does not match any library. pub fn resolve_library_param<'a>( - state: &'a AppState, + libs: &'a [Library], param: Option<&str>, ) -> Result, String> { let Some(raw) = param.map(str::trim).filter(|s| !s.is_empty()) else { @@ -303,18 +303,29 @@ pub fn resolve_library_param<'a>( }; if let Ok(id) = raw.parse::() { - return state - .library_by_id(id) + return libs + .iter() + .find(|l| l.id == id) .map(Some) .ok_or_else(|| format!("unknown library id: {}", id)); } - state - .library_by_name(raw) + libs.iter() + .find(|l| l.name == raw) .map(Some) .ok_or_else(|| format!("unknown library name: {}", raw)) } +/// Resolve a library request parameter against the AppState's libraries. +/// Returns `Ok(None)` when the param is absent, meaning "span all libraries". +/// Returns `Err` when a value is provided but does not match any library. +pub fn resolve_library_param_state<'a>( + state: &'a AppState, + param: Option<&str>, +) -> Result, String> { + resolve_library_param(&state.libraries, param) +} + /// Health of a library at a point in time. Probed at the top of each /// file-watcher tick. 
The `Stale` state is the "be conservative" signal: /// destructive paths (ingest writes, future move-handoff and orphan GC in @@ -662,12 +673,6 @@ mod tests { assert_eq!(abs, PathBuf::from("/tmp/media/2024/photo.jpg")); } - fn state_with_libraries(libs: Vec) -> AppState { - let mut state = AppState::test_state(); - state.libraries = libs; - state - } - fn sample_libraries() -> Vec { vec![ Library { @@ -687,52 +692,52 @@ mod tests { ] } - #[actix_rt::test] - async fn resolve_library_param_absent_is_union() { - let state = state_with_libraries(sample_libraries()); - assert!(matches!(resolve_library_param(&state, None), Ok(None))); + #[test] + fn resolve_library_param_absent_is_union() { + let libs = sample_libraries(); + assert!(matches!(resolve_library_param(&libs, None), Ok(None))); } - #[actix_rt::test] - async fn resolve_library_param_empty_or_whitespace_is_union() { - let state = state_with_libraries(sample_libraries()); - assert!(matches!(resolve_library_param(&state, Some("")), Ok(None))); + #[test] + fn resolve_library_param_empty_or_whitespace_is_union() { + let libs = sample_libraries(); + assert!(matches!(resolve_library_param(&libs, Some("")), Ok(None))); assert!(matches!( - resolve_library_param(&state, Some(" ")), + resolve_library_param(&libs, Some(" ")), Ok(None) )); } - #[actix_rt::test] - async fn resolve_library_param_numeric_id_matches() { - let state = state_with_libraries(sample_libraries()); - let lib = resolve_library_param(&state, Some("7")) + #[test] + fn resolve_library_param_numeric_id_matches() { + let libs = sample_libraries(); + let lib = resolve_library_param(&libs, Some("7")) .expect("valid id") .expect("some library"); assert_eq!(lib.id, 7); assert_eq!(lib.name, "archive"); } - #[actix_rt::test] - async fn resolve_library_param_name_matches() { - let state = state_with_libraries(sample_libraries()); - let lib = resolve_library_param(&state, Some("main")) + #[test] + fn resolve_library_param_name_matches() { + let libs = 
sample_libraries(); + let lib = resolve_library_param(&libs, Some("main")) .expect("valid name") .expect("some library"); assert_eq!(lib.id, 1); } - #[actix_rt::test] - async fn resolve_library_param_unknown_id_errs() { - let state = state_with_libraries(sample_libraries()); - let err = resolve_library_param(&state, Some("999")).unwrap_err(); + #[test] + fn resolve_library_param_unknown_id_errs() { + let libs = sample_libraries(); + let err = resolve_library_param(&libs, Some("999")).unwrap_err(); assert!(err.contains("unknown library id")); } - #[actix_rt::test] - async fn resolve_library_param_unknown_name_errs() { - let state = state_with_libraries(sample_libraries()); - let err = resolve_library_param(&state, Some("missing")).unwrap_err(); + #[test] + fn resolve_library_param_unknown_name_errs() { + let libs = sample_libraries(); + let err = resolve_library_param(&libs, Some("missing")).unwrap_err(); assert!(err.contains("unknown library name")); } diff --git a/src/main.rs b/src/main.rs index b059e9b..e3ded45 100644 --- a/src/main.rs +++ b/src/main.rs @@ -267,6 +267,25 @@ fn main() -> std::io::Result<()> { } } + // Spawn the nightly pre-generation scheduler (Section D). 
+ { + use crate::database::{ + InsightDao, SqliteInsightDao, SqliteUserAiPrefsDao, UserAiPrefsDao, + }; + + let insight_dao: Arc>> = + Arc::new(Mutex::new(Box::new(SqliteInsightDao::new()))); + let prefs_dao: Arc>> = + Arc::new(Mutex::new(Box::new(SqliteUserAiPrefsDao::new()))); + + reels::spawn_pregen_scheduler( + app_state.clone(), + web::Data::new(insight_dao), + web::Data::new(prefs_dao), + ) + .await; + } + HttpServer::new(move || { let user_dao = SqliteUserDao::new(); let favorites_dao = SqliteFavoriteDao::new(); @@ -348,6 +367,8 @@ fn main() -> std::io::Result<()> { .service(reels::create_reel_handler) .service(reels::reel_status_handler) .service(reels::reel_video_handler) + .service(reels::precomputed_reel_handler) + .service(reels::precomputed_video_handler) .service(ai::generate_insight_handler) .service(ai::generate_agentic_insight_handler) .service(ai::generation_status_handler) diff --git a/src/memories.rs b/src/memories.rs index c877981..2b1f473 100644 --- a/src/memories.rs +++ b/src/memories.rs @@ -419,7 +419,7 @@ pub fn gather_memory_items( span_mode, tz_offset_minutes, years_back ); - let library = crate::libraries::resolve_library_param(app_state, library_param)?; + let library = crate::libraries::resolve_library_param_state(app_state, library_param)?; let libraries_to_scan: Vec<&crate::libraries::Library> = match library { Some(lib) => vec![lib], None => app_state.libraries.iter().collect(), diff --git a/src/reels/mod.rs b/src/reels/mod.rs index 32635a9..c51822c 100644 --- a/src/reels/mod.rs +++ b/src/reels/mod.rs @@ -18,24 +18,59 @@ pub mod selector; use std::collections::HashMap; use std::path::{Path, PathBuf}; -use std::sync::{LazyLock, Mutex as StdMutex}; +use std::sync::{Arc, LazyLock, Mutex, Mutex as StdMutex}; use std::time::{Duration, Instant}; use actix_files::NamedFile; use actix_web::{HttpRequest, HttpResponse, Responder, get, post, web}; -use chrono::DateTime; +use anyhow::{Context, anyhow}; +use chrono::{DateTime, Datelike, 
Timelike}; use serde::{Deserialize, Serialize}; use serde_json::json; -use std::sync::Mutex; use uuid::Uuid; use crate::data::Claims; -use crate::database::{ExifDao, InsightDao}; +use crate::database::{ExifDao, InsightDao, PrecomputedReelDao, UserAiPrefsDao}; +use crate::libraries::{Library, resolve_library_param}; use crate::memories::MemoriesSpan; use crate::otel::extract_context_from_request; use crate::state::AppState; use selector::ReelSelector; +// --- Precomputed reel age limits (hours) ------------------------------------- + +/// Maximum age for a precomputed day reel before it's considered stale. +const REEL_PRECOMPUTED_DAY_MAX_AGE_HOURS: u64 = 26; +/// Maximum age for a precomputed week reel. +const REEL_PRECOMPUTED_WEEK_MAX_AGE_HOURS: u64 = 192; +/// Maximum age for a precomputed month reel. +const REEL_PRECOMPUTED_MONTH_MAX_AGE_HOURS: u64 = 768; + +/// Resolve a library request parameter to a stable key string. +/// Returns the library's id as a string when found, or `"all"` when +/// the param is absent or the lookup fails. +pub fn normalize_library_key(libs: &[Library], param: Option<&str>) -> String { + match resolve_library_param(libs, param) { + Ok(Some(lib)) => lib.id.to_string(), + _ => "all".to_string(), + } +} + +/// Which scripting strategy to use for the reel narration. +#[derive(Clone, Copy)] +#[allow(dead_code)] +pub enum ScripterMode { + /// Fast path: single LLM call via the direct client. + Fast, + /// Agentic path: resolves the backend through the InsightGenerator + /// (honouring LLM_BACKEND, model overrides, etc.). Falls back to + /// Fast on error so a scripting failure never sinks a reel. + Agentic, +} + +/// Progress callback type — receives a static-stage label. +pub type ProgressFn<'a> = dyn Fn(&'static str) + Send + Sync + 'a; + /// The media behind one shot: a still photo, or a short section of a source /// video (played with its live audio ducked under the narration). 
Both carry /// just the library-relative path; the renderer applies fixed clip framing @@ -73,6 +108,8 @@ pub struct PlannedBeat { pub date: Option, pub insight_title: Option, pub insight_summary: Option, + /// GPS coordinates of the lead media item, when available. + pub gps: Option<(f64, f64)>, } impl PlannedBeat { @@ -292,6 +329,13 @@ pub struct ReelStatusResponse { pub error: Option, } +/// Response shape for `GET /reels/precomputed`. +#[derive(Debug, Serialize)] +pub struct PrecomputedReelResponse { + pub video_url: String, + pub title: String, +} + // --- Handlers ---------------------------------------------------------------- /// POST /reels — start (or instantly serve from cache) a memory reel for the @@ -399,8 +443,20 @@ pub async fn create_reel_handler( let state = app_state.clone(); let insight_dao = insight_dao.clone(); + let exif_dao = exif_dao.clone(); let handle = tokio::spawn(async move { - match run_reel_job(&state, &insight_dao, job_id, planned, meta, voice, &key).await { + match run_reel_job( + &state, + &insight_dao, + &exif_dao, + job_id, + planned, + meta, + voice, + &key, + ) + .await + { Ok((title, path)) => { finish_job(job_id, ReelJobStatus::Done, Some(title), Some(path), None) } @@ -471,25 +527,131 @@ pub async fn reel_video_handler( } } +/// GET /reels/precomputed?span=&library= +/// +/// Look up the latest precomputed reel for the given span and library key. +/// Validity gate (all must hold, else 404): +/// 1. `render_version == RENDER_VERSION` +/// 2. `output_path` exists on disk +/// 3. age <= max_age(span) (Day 26h, Week 8d, Month 32d) +/// +/// Returns `{ video_url: "/reels/by-key/{cache_key}/video", title }`. 
+#[get("/reels/precomputed")] +pub async fn precomputed_reel_handler( + _claims: Claims, + query: web::Query>, + app_state: web::Data, + reel_dao: web::Data>>, +) -> impl Responder { + let span = query.get("span").map(|s| s.as_str()).unwrap_or("day"); + let library_key = normalize_library_key( + &app_state.libraries, + query.get("library").map(|s| s.as_str()), + ); + + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("Time went backwards") + .as_secs() as i64; + + let max_age_hours = match span { + "week" => REEL_PRECOMPUTED_WEEK_MAX_AGE_HOURS as i64, + "month" => REEL_PRECOMPUTED_MONTH_MAX_AGE_HOURS as i64, + _ => REEL_PRECOMPUTED_DAY_MAX_AGE_HOURS as i64, + }; + let min_generated_at = now - (max_age_hours * 3600); + + let ctx = opentelemetry::Context::new(); + let mut dao = reel_dao.lock().expect("Unable to lock PrecomputedReelDao"); + + // Fast existence gate: is there a fresh row at all? + if !dao + .exists_fresh( + &ctx, + span, + &library_key, + RENDER_VERSION as i32, + min_generated_at, + ) + .unwrap_or(false) + { + return HttpResponse::NotFound().json(json!({ "error": "no precomputed reel found" })); + } + + // Fetch the latest row for full validity checks. + let reel = match dao.latest_for(&ctx, span, &library_key) { + Ok(Some(r)) => r, + _ => { + return HttpResponse::NotFound().json(json!({ "error": "no precomputed reel found" })); + } + }; + + // Validity gate 1: render version must match. + if reel.render_version != RENDER_VERSION as i32 { + return HttpResponse::NotFound() + .json(json!({ "error": "precomputed reel is stale (render version mismatch)" })); + } + + // Validity gate 2: output_path must exist. + let output = std::path::Path::new(&reel.output_path); + if !output.exists() { + return HttpResponse::NotFound().json(json!({ "error": "precomputed reel file missing" })); + } + + // Validity gate 3: age <= max_age (re-checked via min_generated_at). 
+    if reel.generated_at < min_generated_at {
+        return HttpResponse::NotFound().json(json!({ "error": "precomputed reel has expired" }));
+    }
+
+    HttpResponse::Ok().json(PrecomputedReelResponse {
+        video_url: format!("/reels/by-key/{}/video", reel.cache_key),
+        title: reel.title,
+    })
+}
+
+/// GET /reels/by-key/{key}/video — stream a precomputed reel MP4 by cache key.
+///
+/// The key arrives straight from the URL and is turned into a file path by
+/// `reel_mp4_path`, so anything other than a plain token (ASCII letters,
+/// digits, `-`, `_`) is rejected with 400 before the filesystem is touched.
+/// Responds 404 when the MP4 for a well-formed key cannot be opened.
+#[get("/reels/by-key/{key}/video")]
+pub async fn precomputed_video_handler(
+    _claims: Claims,
+    request: HttpRequest,
+    path: web::Path<String>,
+    app_state: web::Data<AppState>,
+) -> impl Responder {
+    let key = path.into_inner();
+
+    // Refuse separators, dots and any other path syntax (`../`, absolute
+    // paths, …) so a crafted key cannot point outside the reels directory.
+    // NOTE(review): assumes `cache_key` only ever emits [A-Za-z0-9_-];
+    // widen this allow-list if that turns out not to hold.
+    let key_is_plain_token = !key.is_empty()
+        && key
+            .bytes()
+            .all(|b| b.is_ascii_alphanumeric() || b == b'-' || b == b'_');
+    if !key_is_plain_token {
+        log::warn!("rejecting precomputed reel request with malformed key");
+        return HttpResponse::BadRequest().json(json!({ "error": "invalid reel key" }));
+    }
+
+    let mp4 = reel_mp4_path(&app_state, &key);
+    match NamedFile::open(&mp4) {
+        Ok(file) => file.into_response(&request),
+        Err(e) => {
+            log::error!("opening precomputed reel {key} failed: {e:?}");
+            HttpResponse::NotFound().json(json!({ "error": "precomputed reel file missing" }))
+        }
+    }
+}
+
 // --- Pipeline ----------------------------------------------------------------
 
 /// Run the full reel pipeline: enrich → script → narrate → render → concat,
 /// then publish the MP4 into the cache. Returns (title, mp4_path).
-async fn run_reel_job(
+///
+/// The `scripter` parameter controls which narration-generation strategy is
+/// used (fast single-call vs. agentic backend resolution). On scripting
+/// failure in Agentic mode the pipeline falls back to the fast path so a
+/// single LLM failure never sinks a reel.
+pub(crate) async fn produce_reel( app_state: &AppState, insight_dao: &Mutex>, - job_id: Uuid, + exif_dao: &Mutex>, mut planned: Vec, meta: ReelMeta, voice: Option, key: &str, + scripter: ScripterMode, + progress: Option<&ProgressFn<'_>>, ) -> anyhow::Result<(String, PathBuf)> { - use anyhow::{Context, anyhow}; - let started = Instant::now(); let total_photos: usize = planned.iter().map(|b| b.media.len()).sum(); log::info!( - "reel {job_id}: starting — span {:?}, {} beats, {} photos, voice={}", + "reel produce_reel: starting — span {:?}, {} beats, {} photos, voice={}", meta.span, planned.len(), total_photos, @@ -499,18 +661,33 @@ async fn run_reel_job( let client = app_state .llamacpp .as_ref() - .ok_or_else(|| anyhow!("TTS/LLM backend not configured"))? + .ok_or_else(|| anyhow::anyhow!("TTS/LLM backend not configured"))? .clone(); // 1. Enrich each beat with its lead photo's cached insight, then script // (one LLM call → one narration line per beat). - set_stage(job_id, "scripting"); - log::info!("reel {job_id}: scripting narration via LLM…"); + emit_progress(progress, "scripting"); + log::info!("reel produce_reel: scripting narration via LLM…"); let span_context = opentelemetry::Context::new(); - selector::enrich(insight_dao, &span_context, &mut planned); - let script = script::generate_script(&client, &meta, &planned).await?; + selector::enrich(insight_dao, exif_dao, &span_context, &mut planned); + let script = match scripter { + ScripterMode::Fast => script::generate_script(&client, &meta, &planned).await?, + ScripterMode::Agentic => { + match script::generate_script_agentic(&app_state.insight_generator, &meta, &planned) + .await + { + Ok(s) => s, + Err(e) => { + log::warn!( + "reel produce_reel: agentic script failed, falling back to fast: {e}" + ); + script::generate_script(&client, &meta, &planned).await? 
+ } + } + } + }; log::info!( - "reel {job_id}: scripted \"{}\" ({} lines)", + "reel produce_reel: scripted \"{}\" ({} lines)", script.title, script.lines.len() ); @@ -519,11 +696,11 @@ async fn run_reel_job( // sequence under that one narration). A beat whose audio or render fails // is skipped (logged) rather than sinking the whole reel — handles an // odd HEIC/corrupt file gracefully. - set_stage(job_id, "narrating"); + emit_progress(progress, "narrating"); let work = tempfile::tempdir().context("creating reel work dir")?; let nvenc = render::is_nvenc_available().await; log::info!( - "reel {job_id}: narrating + rendering {} beats (encoder: {})", + "reel produce_reel: narrating + rendering {} beats (encoder: {})", planned.len(), if nvenc { "nvenc" } else { "cpu" } ); @@ -543,7 +720,7 @@ async fn run_reel_job( .filter_map(|m| resolve_media_path(app_state, m)) .collect(); if paths.is_empty() { - log::warn!("reel {job_id}: skipping beat {i}, no media paths resolved"); + log::warn!("reel produce_reel: skipping beat {i}, no media paths resolved"); continue; } @@ -558,13 +735,13 @@ async fn run_reel_job( { Ok(b) => b, Err(e) => { - log::warn!("reel {job_id}: skipping beat {i}, TTS failed: {e}"); + log::warn!("reel produce_reel: skipping beat {i}, TTS failed: {e}"); continue; } }; let audio_path = work.path().join(format!("narration_{i:03}.wav")); if let Err(e) = tokio::fs::write(&audio_path, &audio_bytes).await { - log::warn!("reel {job_id}: skipping beat {i}, writing audio failed: {e}"); + log::warn!("reel produce_reel: skipping beat {i}, writing audio failed: {e}"); continue; } @@ -575,11 +752,11 @@ async fn run_reel_job( .flatten() .unwrap_or(render::MIN_SEGMENT_SECONDS); - set_stage(job_id, "rendering"); + emit_progress(progress, "rendering"); let beat_out = work.path().join(format!("beat_{i:03}.mp4")); let render_result = if beat.is_clip() { log::info!( - "reel {job_id}: beat {}/{} — video clip, narration {:.1}s", + "reel produce_reel: beat {}/{} — video clip, 
narration {:.1}s", i + 1, beat_total, narration_secs @@ -587,7 +764,7 @@ async fn run_reel_job( render::render_clip_beat(&paths[0], &audio_path, &beat_out, narration_secs, &opts).await } else { log::info!( - "reel {job_id}: beat {}/{} — {} photo(s), narration {:.1}s", + "reel produce_reel: beat {}/{} — {} photo(s), narration {:.1}s", i + 1, beat_total, paths.len(), @@ -596,7 +773,7 @@ async fn run_reel_job( render::render_beat(&paths, &audio_path, &beat_out, narration_secs, &opts).await }; if let Err(e) = render_result { - log::warn!("reel {job_id}: skipping beat {i}, render failed: {e}"); + log::warn!("reel produce_reel: skipping beat {i}, render failed: {e}"); continue; } beat_files.push(beat_out.to_string_lossy().to_string()); @@ -609,9 +786,9 @@ async fn run_reel_job( // 4. Concat into the cache. Write to a temp name in the reels dir, then // rename atomically (same filesystem) so a reader never sees a partial. - set_stage(job_id, "rendering"); + emit_progress(progress, "rendering"); log::info!( - "reel {job_id}: joining {} rendered beats into the final reel", + "reel produce_reel: joining {} rendered beats into the final reel", segment_files.len() ); std::fs::create_dir_all(&app_state.reels_path).context("creating reels dir")?; @@ -629,7 +806,7 @@ async fn run_reel_job( let _ = std::fs::write(reel_sidecar_path(app_state, key), sidecar); log::info!( - "reel {job_id}: done in {:.1}s — {} beats → {}", + "reel produce_reel: done in {:.1}s — {} beats → {}", started.elapsed().as_secs_f64(), segment_files.len(), final_path.display() @@ -637,6 +814,42 @@ async fn run_reel_job( Ok((script.title, final_path)) } +/// Emit a progress stage label via the optional callback. +fn emit_progress(progress: Option<&ProgressFn<'_>>, stage: &'static str) { + if let Some(p) = progress { + p(stage); + } +} + +/// Run the full reel pipeline and publish the MP4 into the cache. +/// Thin wrapper around [`produce_reel`] that wires up job-stage tracking. 
+async fn run_reel_job(
+    app_state: &AppState,
+    insight_dao: &Mutex<Box<dyn InsightDao>>,
+    exif_dao: &Mutex<Box<dyn ExifDao>>,
+    job_id: Uuid,
+    planned: Vec<PlannedBeat>,
+    meta: ReelMeta,
+    voice: Option<String>,
+    key: &str,
+) -> anyhow::Result<(String, PathBuf)> {
+    // Forward every pipeline stage label to this job's status entry.
+    let progress = move |stage: &'static str| {
+        set_stage(job_id, stage);
+    };
+    produce_reel(
+        app_state,
+        insight_dao,
+        exif_dao,
+        planned,
+        meta,
+        voice,
+        key,
+        ScripterMode::Fast,
+        Some(&progress),
+    )
+    .await
+}
+
 /// Resolve a media item's library-relative path to a validated absolute path
 /// under its library root (works for both photos and clips).
 fn resolve_media_path(app_state: &AppState, media: &SegmentMedia) -> Option<PathBuf> {
@@ -645,9 +858,280 @@ fn resolve_media_path(app_state: &AppState, media: &SegmentMedia) -> Option<Path
+/// Env: "3" (default, 3 AM). Hour of day (0-23) when the nightly batch fires.
+/// Unparsable or out-of-range values fall back to the default.
+fn pregen_run_hour() -> u32 {
+    std::env::var("REEL_PREGEN_HOUR")
+        .ok()
+        .and_then(|v| v.parse().ok())
+        .filter(|hour| *hour <= 23)
+        .unwrap_or(3)
+}
+
+/// Env: "1" (default, Monday). Day of week for weekly pre-gen (0=Sun, 1=Mon, ...).
+/// Unparsable or out-of-range values fall back to the default; a value above 6
+/// could never equal a real weekday and would silently disable weekly reels.
+fn pregen_week_dow() -> u32 {
+    std::env::var("REEL_PREGEN_WEEK_DOW")
+        .ok()
+        .and_then(|v| v.parse().ok())
+        .filter(|dow| *dow <= 6)
+        .unwrap_or(1)
+}
+
+/// Pure: seconds from `now` until the next wall-clock `run_hour`:00:00.
+/// Handles same-day vs wrap-around. Recomputed each loop iteration to absorb
+/// DST shifts.
+///
+/// Minutes and seconds of `now` are taken into account, so the result lands on
+/// the top of the hour instead of on whatever minute the process started at.
+/// When `now` is at or past today's run time (including exactly on it) the
+/// result targets tomorrow, so it is always in `1..=86_400`. `run_hour` is
+/// reduced modulo 24.
+pub(crate) fn secs_until_next_run_hour<Tz: chrono::TimeZone>(
+    now: chrono::DateTime<Tz>,
+    run_hour: u32,
+) -> u64 {
+    const SECS_PER_DAY: u64 = 24 * 3600;
+
+    let target = u64::from(run_hour % 24) * 3600;
+    let elapsed = u64::from(now.num_seconds_from_midnight());
+    if elapsed >= target {
+        SECS_PER_DAY - elapsed + target
+    } else {
+        target - elapsed
+    }
+}
+
+/// Load pre-gen parameters: tries the user_ai_prefs DB row first, falls back
+/// to env vars, then to server-local defaults.
+///
+/// Returns `(tz_offset_minutes, voice, library)`; `library` is `"all"` when
+/// neither source names one.
+fn load_pregen_params(
+    prefs_dao: &web::Data<Arc<Mutex<Box<dyn UserAiPrefsDao>>>>,
+) -> (i32, Option<String>, String) {
+    // chrono's `local_minus_utc()` is expressed in *seconds*, while this
+    // value is consumed as minutes, so convert once here.
+    fn server_tz_offset_minutes() -> i32 {
+        chrono::Local::now().offset().local_minus_utc() / 60
+    }
+
+    // Try DB row first
+    match prefs_dao.lock() {
+        Ok(mut dao) => {
+            let ctx = opentelemetry::Context::new();
+            match dao.get_prefs(&ctx) {
+                Ok(Some(prefs)) => {
+                    let tz = prefs
+                        .tz_offset_minutes
+                        .unwrap_or_else(server_tz_offset_minutes);
+                    let library = prefs.library.unwrap_or_else(|| "all".to_string());
+                    return (tz, prefs.voice, library);
+                }
+                Ok(None) => {}
+                // Keep the best-effort fallback, but leave a trace of why the
+                // stored prefs were not used.
+                Err(e) => log::warn!(
+                    "Reading user_ai_prefs failed, falling back to env defaults: {:?}",
+                    e
+                ),
+            }
+        }
+        Err(_) => log::warn!("UserAiPrefsDao lock poisoned, falling back to env defaults"),
+    }
+    // Fall back to env
+    let tz = std::env::var("REEL_PREGEN_TZ_OFFSET_MINUTES")
+        .ok()
+        .and_then(|v| v.parse().ok())
+        .unwrap_or_else(server_tz_offset_minutes);
+    let voice = std::env::var("REEL_PREGEN_VOICE").ok();
+    let library = std::env::var("REEL_PREGEN_LIBRARY").unwrap_or_else(|_| "all".to_string());
+    (tz, voice, library)
+}
+
+/// Spawn the nightly pre-generation scheduler. Runs behind `REEL_PREGEN_ENABLED`.
+///
+/// Returns immediately. When enabled, a detached tokio task sleeps until the
+/// configured hour, runs one batch, logs (but survives) a failed batch, and
+/// repeats.
+pub(crate) async fn spawn_pregen_scheduler(
+    app_state: web::Data<AppState>,
+    insight_dao: web::Data<Arc<Mutex<Box<dyn InsightDao>>>>,
+    prefs_dao: web::Data<Arc<Mutex<Box<dyn UserAiPrefsDao>>>>,
+) {
+    if std::env::var("REEL_PREGEN_ENABLED").as_deref() != Ok("1") {
+        log::info!("Reel pre-generation scheduler disabled (REEL_PREGEN_ENABLED != 1)");
+        return;
+    }
+
+    let run_hour = pregen_run_hour();
+    log::info!(
+        "Reel pre-generation scheduler enabled, running at hour {} local",
+        run_hour
+    );
+
+    tokio::spawn(async move {
+        loop {
+            // Recompute from the current wall clock on every pass rather than
+            // sleeping a fixed 24h.
+            let now = chrono::Local::now();
+            let sleep_secs = secs_until_next_run_hour(now, run_hour);
+            log::debug!("Next pre-gen run in {}s", sleep_secs);
+            tokio::time::sleep(std::time::Duration::from_secs(sleep_secs)).await;
+
+            if let Err(e) = run_pregen_batch(&app_state, &insight_dao, &prefs_dao).await {
+                log::error!("Reel pre-generation batch failed: {}", e);
+            }
+        }
+    });
+}
+
+/// Run the pre-generation batch for all applicable spans.
+async fn run_pregen_batch( + app_state: &AppState, + insight_dao: &web::Data>>>, + prefs_dao: &web::Data>>>, +) -> anyhow::Result<()> { + let now = chrono::Local::now(); + let weekday = now.weekday().num_days_from_sunday(); // 0=Sun, 1=Mon, ... + let day_of_month = now.day(); + + let mut spans = vec!["day"]; + if weekday == pregen_week_dow() { + spans.push("week"); + } + if day_of_month == 1 { + spans.push("month"); + } + + let (tz, voice, library) = load_pregen_params(prefs_dao); + + for span in spans { + if let Err(e) = pregen_one(app_state, insight_dao, span, tz, voice.clone(), &library).await + { + log::error!("Pre-gen failed for span={}: {}", span, e); + } + } + + Ok(()) +} + +/// Pre-generate a single reel for the given span. +async fn pregen_one( + app_state: &AppState, + insight_dao: &web::Data>>>, + span: &str, + tz: i32, + voice: Option, + library: &str, +) -> anyhow::Result<()> { + let memories_span = match span { + "day" => MemoriesSpan::Day, + "week" => MemoriesSpan::Week, + "month" => MemoriesSpan::Month, + _ => MemoriesSpan::Day, + }; + + let selector = ReelSelector::Memories { + span: memories_span, + tz_offset_minutes: tz, + library: if library == "all" { + None + } else { + Some(library.to_string()) + }, + max_segments: 24, + }; + + let exif_dao: Arc>> = Arc::new(StdMutex::new(Box::new( + crate::database::SqliteExifDao::new(), + ))); + let ctx = opentelemetry::Context::new(); + let (planned, reel_meta) = match selector::resolve(app_state, &exif_dao, &ctx, &selector) { + Ok((p, m)) => (p, m), + Err(e) => { + log::warn!("Pre-gen resolve failed for span={}: {}", span, e); + return Ok(()); + } + }; + + if planned.is_empty() { + log::info!("No beats for span={}, skipping", span); + return Ok(()); + } + + // Flatten every media item across beats (in order) into the cache key. 
+ let media: Vec = planned.iter().flat_map(|b| b.media.clone()).collect(); + let key = cache_key(&selector, &media, voice.as_deref()); + + // Dedup: check if fresh ledger row exists + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("Time went backwards") + .as_secs() as i64; + + let max_age_hours = match span { + "week" => REEL_PRECOMPUTED_WEEK_MAX_AGE_HOURS, + "month" => REEL_PRECOMPUTED_MONTH_MAX_AGE_HOURS, + _ => REEL_PRECOMPUTED_DAY_MAX_AGE_HOURS, + }; + let min_generated_at = now - (max_age_hours as i64 * 3600); + + let is_fresh = { + let mut dao = app_state.precomputed_reel_dao.lock().expect("lock"); + dao.exists_fresh(&ctx, span, "all", RENDER_VERSION as i32, min_generated_at) + .unwrap_or(false) + }; + + if is_fresh { + log::info!("Fresh precomputed reel exists for span={}, skipping", span); + return Ok(()); + } + + // Check if MP4 already on disk (from a previous run that crashed after render) + let mp4_path = reel_mp4_path(app_state, &key); + if mp4_path.exists() { + log::info!( + "Precomputed reel MP4 already exists for key={}, recording ledger and skipping render", + key + ); + // Read title from sidecar if available + let sidecar_path = mp4_path.with_extension("json"); + let title = if sidecar_path.exists() { + let sidecar = tokio::fs::read_to_string(&sidecar_path).await.ok(); + sidecar + .and_then(|s| serde_json::from_str::(&s).ok()) + .map(|s| s.title) + .unwrap_or_else(|| format!("{} reel", span)) + } else { + format!("{} reel", span) + }; + let mut reel_dao = app_state.precomputed_reel_dao.lock().expect("lock"); + reel_dao.record_reel( + &ctx, + &crate::database::models::InsertablePrecomputedReel { + span: span.to_string(), + library_key: "all".to_string(), + cache_key: key.clone(), + output_path: mp4_path.to_string_lossy().to_string(), + title, + media_count: planned.len() as i32, + render_version: RENDER_VERSION as i32, + tz_offset_minutes: tz, + voice: voice.clone(), + generated_at: now, + }, + )?; + 
return Ok(()); + } + + // Generate the reel + log::info!("Generating precomputed reel for span={}, key={}", span, key); + let photo_count = planned.len() as i32; + let (title, mp4) = produce_reel( + app_state, + insight_dao, + &exif_dao, + planned, + reel_meta, + voice.clone(), + &key, + ScripterMode::Agentic, + None, + ) + .await?; + + // Record to ledger + let mut reel_dao = app_state.precomputed_reel_dao.lock().expect("lock"); + reel_dao.record_reel( + &ctx, + &crate::database::models::InsertablePrecomputedReel { + span: span.to_string(), + library_key: "all".to_string(), + cache_key: key.clone(), + output_path: mp4.to_string_lossy().to_string(), + title, + media_count: photo_count, + render_version: RENDER_VERSION as i32, + tz_offset_minutes: tz, + voice: voice.clone(), + generated_at: now, + }, + )?; + + log::info!("Precomputed reel generated for span={}, key={}", span, key); + Ok(()) +} + #[cfg(test)] mod tests { use super::*; + use crate::ai::face_client::FaceClient; + use crate::libraries::Library; + use crate::video::actors::StreamActor; fn photo(p: &str, lib: i32) -> SegmentMedia { SegmentMedia::Photo { @@ -672,6 +1156,128 @@ mod tests { } } + /// Minimal AppState for tests that only need library lookup. 
+ #[allow(dead_code)] + fn test_app_state() -> AppState { + use crate::ai::InsightGenerator; + use crate::ai::insight_chat::{ChatLockMap, InsightChatService}; + use crate::ai::turn_registry::TurnRegistry; + use crate::ai::{OllamaClient, SmsApiClient}; + use crate::database::{ + ExifDao, InsightDao, InsightGenerationJobDao, PreviewDao, SqliteExifDao, + SqliteInsightDao, SqliteInsightGenerationJobDao, SqlitePreviewDao, + }; + use crate::faces; + use crate::state::AppState; + use crate::tags::SqliteTagDao; + use actix::Actor; + use std::sync::Mutex; + + let temp_dir = tempfile::tempdir().expect("Failed to create temp directory"); + let base_path = temp_dir.path().to_path_buf(); + let base_path_str = base_path.to_string_lossy().to_string(); + + let test_lib = Library { + id: crate::libraries::PRIMARY_LIBRARY_ID, + name: "main".to_string(), + root_path: base_path_str.clone(), + enabled: true, + excluded_dirs: Vec::new(), + }; + + let ollama = OllamaClient::new( + "http://localhost:11434".to_string(), + None, + "llama3.2".to_string(), + None, + ); + let sms_client = SmsApiClient::new("http://localhost:8000".to_string(), None); + let apollo_client = crate::ai::apollo_client::ApolloClient::new(None); + + let insight_dao: std::sync::Arc>> = + std::sync::Arc::new(Mutex::new(Box::new(SqliteInsightDao::new()))); + let exif_dao: std::sync::Arc>> = + std::sync::Arc::new(Mutex::new(Box::new(SqliteExifDao::new()))); + let daily_summary_dao: std::sync::Arc>> = + std::sync::Arc::new(Mutex::new(Box::new( + crate::database::SqliteDailySummaryDao::new(), + ))); + let insight_generator = InsightGenerator::new( + ollama.clone(), + None, + None, + sms_client.clone(), + apollo_client.clone(), + insight_dao.clone(), + exif_dao.clone(), + daily_summary_dao, + std::sync::Arc::new(Mutex::new(Box::new( + crate::database::SqliteCalendarEventDao::new(), + ))), + std::sync::Arc::new(Mutex::new(Box::new( + crate::database::SqliteLocationHistoryDao::new(), + ))), + 
std::sync::Arc::new(Mutex::new(Box::new( + crate::database::SqliteSearchHistoryDao::new(), + ))), + std::sync::Arc::new(Mutex::new(Box::new(SqliteTagDao::default()))), + std::sync::Arc::new(Mutex::new(Box::new(faces::SqliteFaceDao::new()))), + std::sync::Arc::new(Mutex::new(Box::new( + crate::database::SqliteKnowledgeDao::new(), + ))), + std::sync::Arc::new(Mutex::new(Box::new( + crate::database::SqlitePersonaDao::new(), + ))), + vec![test_lib.clone()], + ); + + let chat_locks: ChatLockMap = + std::sync::Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new())); + let insight_chat = std::sync::Arc::new(InsightChatService::new( + std::sync::Arc::new(insight_generator.clone()), + insight_dao.clone(), + chat_locks, + )); + let turn_registry = std::sync::Arc::new(TurnRegistry::new(300)); + let preview_dao: std::sync::Arc>> = + std::sync::Arc::new(Mutex::new(Box::new(SqlitePreviewDao::new()))); + let insight_job_dao: std::sync::Arc>> = + std::sync::Arc::new(Mutex::new(Box::new(SqliteInsightGenerationJobDao::new()))); + let insight_job_handles: std::sync::Arc< + Mutex>, + > = std::sync::Arc::new(Mutex::new(std::collections::HashMap::new())); + + AppState::new( + std::sync::Arc::new(StreamActor {}.start()), + vec![test_lib], + base_path_str.clone(), + base_path_str.clone(), + base_path_str.clone(), + base_path_str.clone(), + Vec::new(), + ollama, + None, + Vec::new(), + None, + Vec::new(), + sms_client, + insight_generator, + insight_chat, + turn_registry, + preview_dao, + FaceClient::new(None), + crate::ai::clip_client::ClipClient::new(None), + insight_job_dao, + insight_job_handles, + std::sync::Arc::new(Mutex::new(Box::new( + crate::database::SqlitePrecomputedReelDao::new(), + ))), + std::sync::Arc::new(Mutex::new(Box::new( + crate::database::SqliteUserAiPrefsDao::new(), + ))), + ) + } + #[test] fn cache_key_is_stable_for_same_inputs() { let media = vec![photo("a.jpg", 1), photo("b.jpg", 1)]; @@ -724,12 +1330,14 @@ mod tests { date: None, insight_title: None, 
insight_summary: None, + gps: None, }; let photo_beat = PlannedBeat { media: vec![photo("a.jpg", 1), photo("b.jpg", 1)], date: None, insight_title: None, insight_summary: None, + gps: None, }; assert!(clip_beat.is_clip()); assert!(!photo_beat.is_clip()); @@ -753,6 +1361,7 @@ mod tests { date: Some(1_560_384_000), // 2019-06-13 UTC insight_title: None, insight_summary: None, + gps: None, }; assert!(beat.date_label().unwrap().contains("2019")); @@ -761,7 +1370,77 @@ mod tests { date: None, insight_title: None, insight_summary: None, + gps: None, }; assert_eq!(undated.date_label(), None); } + + #[test] + fn normalize_library_key_returns_id_when_found_numeric() { + let libs = vec![ + Library { + id: 1, + name: "main".to_string(), + root_path: "/tmp/main".to_string(), + enabled: true, + excluded_dirs: Vec::new(), + }, + Library { + id: 7, + name: "archive".to_string(), + root_path: "/tmp/archive".to_string(), + enabled: true, + excluded_dirs: Vec::new(), + }, + ]; + assert_eq!(normalize_library_key(&libs, Some("1")), "1"); + } + + #[test] + fn normalize_library_key_returns_id_when_found_by_name() { + let libs = vec![Library { + id: 1, + name: "main".to_string(), + root_path: "/tmp/main".to_string(), + enabled: true, + excluded_dirs: Vec::new(), + }]; + assert_eq!(normalize_library_key(&libs, Some("main")), "1"); + } + + #[test] + fn normalize_library_key_returns_all_when_absent() { + let libs = vec![Library { + id: 1, + name: "main".to_string(), + root_path: "/tmp/main".to_string(), + enabled: true, + excluded_dirs: Vec::new(), + }]; + assert_eq!(normalize_library_key(&libs, None), "all"); + } + + #[test] + fn normalize_library_key_returns_all_when_empty() { + let libs = vec![Library { + id: 1, + name: "main".to_string(), + root_path: "/tmp/main".to_string(), + enabled: true, + excluded_dirs: Vec::new(), + }]; + assert_eq!(normalize_library_key(&libs, Some("")), "all"); + } + + #[test] + fn normalize_library_key_returns_all_when_unknown() { + let libs = vec![Library { + 
id: 1, + name: "main".to_string(), + root_path: "/tmp/main".to_string(), + enabled: true, + excluded_dirs: Vec::new(), + }]; + assert_eq!(normalize_library_key(&libs, Some("missing")), "all"); + } } diff --git a/src/reels/script.rs b/src/reels/script.rs index 5be3d64..202a22c 100644 --- a/src/reels/script.rs +++ b/src/reels/script.rs @@ -9,13 +9,20 @@ //! //! The prompt builder and response parser are pure so the contract is //! unit-testable; `generate_script` wires them to the LLM client. +//! +//! The agentic scripter (pre-generation) resolves the backend through the +//! InsightGenerator, builds a read-only tool set, and runs a tool loop to +//! ground the narration in retrieved context before asking for the final JSON. use anyhow::{Context, Result}; use std::sync::Arc; use super::{PlannedBeat, ReelMeta}; +use crate::ai::backend::{BackendKind, SamplingOverrides}; +use crate::ai::insight_generator::InsightGenerator; use crate::ai::llamacpp::LlamaCppClient; -use crate::ai::llm_client::LlmClient; +use crate::ai::llm_client::{LlmClient, Tool}; +use crate::ai::ollama::ChatMessage; /// The narration for a whole reel: a title and one line per beat, in order. #[derive(Debug, Clone, PartialEq)] @@ -35,6 +42,32 @@ can be read aloud in a few seconds. Avoid generic filler like \"what a \ wonderful day\" — if you have little to go on, simply describe the moment \ plainly."; +/// Agentic scripter system prompt: richer version that tells the model it may +/// call read-only tools to ground each line. +const AGENTIC_SYSTEM_PROMPT: &str = "You are narrating a personal memory reel — a short \ +slideshow of someone's own photos set to a spoken voiceover. Write warm, \ +specific, first-person narration as if the person is gently looking back on \ +their own memories. Each line plays over one moment, which may be a quick burst \ +of several photos, so narrate the moment as a whole rather than a single frame. 
/// Upper bound on tool-loop iterations for the agentic (pre-generation)
/// scripter.
///
/// Taken from `REEL_PREGEN_MAX_TOOL_ITERS` after trimming whitespace. The
/// default of 8 applies when the variable is unset, is not a valid `usize`,
/// or is zero.
fn reel_pregen_max_tool_iters() -> usize {
    const DEFAULT_MAX_ITERS: usize = 8;

    match std::env::var("REEL_PREGEN_MAX_TOOL_ITERS") {
        Ok(raw) => match raw.trim().parse::<usize>() {
            Ok(iters) if iters > 0 => iters,
            _ => DEFAULT_MAX_ITERS,
        },
        Err(_) => DEFAULT_MAX_ITERS,
    }
}
+pub fn build_agentic_script_messages(meta: &ReelMeta, beats: &[PlannedBeat]) -> Vec { + let mut user = String::new(); + user.push_str(&format!( + "This reel has {} moments surfaced as memories {}.\n\n", + beats.len(), + meta.span_phrase() + )); + if !meta.years.is_empty() { + let years: Vec = meta.years.iter().map(|y| y.to_string()).collect(); + user.push_str(&format!("They span the years: {}.\n\n", years.join(", "))); + } + user.push_str("Moments, in the order they will appear:\n"); + for (i, beat) in beats.iter().enumerate() { + user.push_str(&format!("\n[{}]", i + 1)); + if let Some(date) = beat.date_label() { + user.push_str(&format!(" {date}")); + } + if beat.is_clip() { + user.push_str(" (a video clip)"); + } else if beat.media.len() > 1 { + user.push_str(&format!(" (a burst of {} photos)", beat.media.len())); + } + if let Some((lat, lon)) = beat.gps { + user.push_str(&format!("\n GPS: {:.4}, {:.4}", lat, lon)); + } + user.push('\n'); + match (&beat.insight_title, &beat.insight_summary) { + (Some(t), Some(s)) if !s.trim().is_empty() => { + user.push_str(&format!(" Known context: {t} — {s}\n")); + } + (Some(t), _) => user.push_str(&format!(" Known context: {t}\n")), + (_, Some(s)) if !s.trim().is_empty() => { + user.push_str(&format!(" Known context: {s}\n")); + } + _ => user.push_str(" (no extra context — narrate plainly from the date)\n"), + } + } + user.push_str(&format!( + "\nReturn ONLY a JSON object, no prose or code fences, shaped exactly:\n\ + {{\"title\": \"\", \"segments\": [\"\", \ + \"\", ... ]}}\n\ + The \"segments\" array MUST have exactly {} items, one per moment in order.", + beats.len() + )); + + vec![ + ChatMessage::system(AGENTIC_SYSTEM_PROMPT.to_string()), + ChatMessage::user(user), + ] +} + /// Parse the model's response into a script with exactly `n` lines. Tolerant of /// code fences and surrounding prose, and of both `segments: [".."]` and /// `segments: [{"narration": ".."}]` shapes. 
Missing/extra lines are padded or @@ -198,6 +286,74 @@ pub async fn generate_script( Ok(parse_script_response(&raw, beats.len())) } +/// Agentic version of script generation: resolves the backend via the +/// InsightGenerator (honouring LLM_BACKEND, model overrides, etc.), builds +/// a read-only tool set, runs the tool loop, then parses the JSON response. +/// Returns the same ReelScript shape. On failure the caller may fall back to +/// `generate_script`. +pub async fn generate_script_agentic( + generator: &InsightGenerator, + meta: &ReelMeta, + beats: &[PlannedBeat], +) -> Result { + // 1. Resolve the backend. Bail if the local model lacks tool-calling. + let backend = generator + .resolve_backend( + BackendKind::Local, + &SamplingOverrides { + model: None, + num_ctx: None, + temperature: None, + top_p: None, + top_k: None, + min_p: None, + }, + ) + .await + .context("resolving backend for agentic script")?; + + // 2. Build the read-only tool set. Start from the persona gate (no + // persona context, so corrections are closed), force has_vision=false, + // then filter out write tools. + let gate = generator.current_gate_opts_for_persona(false, None); + let all_tools = InsightGenerator::build_tool_definitions(gate); + let read_only_names: std::collections::HashSet<&str> = [ + "search_rag", + "search_messages", + "get_sms_messages", + "get_calendar_events", + "get_location_history", + "get_file_tags", + "get_faces_in_photo", + "reverse_geocode", + "get_personal_place_at", + "recall_entities", + "recall_facts_for_photo", + "recall_facts_for_entity", + "get_current_datetime", + ] + .into_iter() + .collect(); + let tools: Vec = all_tools + .into_iter() + .filter(|t| read_only_names.contains(t.function.name.as_str())) + .collect(); + + // 3. Build the agentic prompt messages. + let messages = build_agentic_script_messages(meta, beats); + + // 4. Run the tool loop. 
+ let max_iter = reel_pregen_max_tool_iters(); + let raw = generator + .run_readonly_tool_loop(&backend, messages, tools, max_iter) + .await + .context("agentic tool loop failed")?; + + // 5. Strip any think-blocks the model may have emitted, then parse. + let raw = crate::ai::llm_client::strip_think_blocks(&raw); + Ok(parse_script_response(&raw, beats.len())) +} + #[cfg(test)] mod tests { use super::*; @@ -220,6 +376,7 @@ mod tests { date: Some(1_560_000_000 + i as i64 * 86_400), insight_title: None, insight_summary: None, + gps: None, }) .collect() } diff --git a/src/reels/selector.rs b/src/reels/selector.rs index d096f6d..a02cbb8 100644 --- a/src/reels/selector.rs +++ b/src/reels/selector.rs @@ -207,6 +207,7 @@ fn form_photo_beats( date, insight_title: None, insight_summary: None, + gps: None, } }) .collect() @@ -255,6 +256,7 @@ pub fn form_beats( date: v.created, insight_title: None, insight_summary: None, + gps: None, }); } @@ -334,15 +336,20 @@ fn distinct_years(items: &[memories::MemoryItem], tz: Option) -> Ve years } -/// Background pass: fill each beat's cached insight (title + summary) from its -/// lead photo, where one exists. Best-effort — a missing or errored lookup -/// leaves the fields `None` and the scripter narrates from the date alone. +/// Background pass: fill each beat's cached insight (title + summary) and +/// GPS coordinates from its lead photo, where one exists. Best-effort — a +/// missing or errored lookup leaves the fields `None` and the scripter +/// narrates from the date alone. 
pub fn enrich( insight_dao: &Mutex>, + exif_dao: &Mutex>, span_context: &opentelemetry::Context, beats: &mut [PlannedBeat], ) { - let Ok(mut dao) = insight_dao.lock() else { + let Ok(mut insight_dao) = insight_dao.lock() else { + return; + }; + let Ok(mut exif_dao) = exif_dao.lock() else { return; }; for beat in beats.iter_mut() { @@ -352,10 +359,17 @@ pub fn enrich( } None => continue, }; - if let Ok(Some(insight)) = dao.get_insight(span_context, &rel_path) { + if let Ok(Some(insight)) = insight_dao.get_insight(span_context, &rel_path) { beat.insight_title = Some(insight.title); beat.insight_summary = Some(insight.summary); } + // Enrich GPS from EXIF when the lead media is a photo. + if let Some(SegmentMedia::Photo { .. }) = beat.media.first() + && let Ok(Some(exif)) = exif_dao.get_exif(span_context, &rel_path) + && let (Some(lat), Some(lon)) = (exif.gps_latitude, exif.gps_longitude) + { + beat.gps = Some((lat as f64, lon as f64)); + } } } diff --git a/src/state.rs b/src/state.rs index bf894f3..33e8e3f 100644 --- a/src/state.rs +++ b/src/state.rs @@ -8,9 +8,10 @@ use crate::ai::turn_registry::TurnRegistry; use crate::ai::{InsightGenerator, OllamaClient, SmsApiClient}; use crate::database::{ CalendarEventDao, DailySummaryDao, ExifDao, InsightDao, InsightGenerationJobDao, KnowledgeDao, - LocationHistoryDao, SearchHistoryDao, SqliteCalendarEventDao, SqliteDailySummaryDao, - SqliteExifDao, SqliteInsightDao, SqliteInsightGenerationJobDao, SqliteKnowledgeDao, - SqliteLocationHistoryDao, SqliteSearchHistoryDao, connect, + LocationHistoryDao, PrecomputedReelDao, SearchHistoryDao, SqliteCalendarEventDao, + SqliteDailySummaryDao, SqliteExifDao, SqliteInsightDao, SqliteInsightGenerationJobDao, + SqliteKnowledgeDao, SqliteLocationHistoryDao, SqlitePrecomputedReelDao, SqliteSearchHistoryDao, + SqliteUserAiPrefsDao, UserAiPrefsDao, connect, }; use crate::database::{PreviewDao, SqlitePreviewDao}; use crate::faces; @@ -88,6 +89,14 @@ pub struct AppState { pub clip_client: 
ClipClient, pub insight_job_dao: Arc>>, pub insight_job_handles: Arc>>, + /// Ledger for precomputed memory reels. Written by the nightly agentic + /// job (Section D); read by `GET /reels/precomputed` (Section C). + #[allow(dead_code)] + pub precomputed_reel_dao: Arc>>, + /// User AI preferences (voice, timezone, library). Mirrored by the + /// client; read by the nightly pre-generation scheduler. + #[allow(dead_code)] + pub user_ai_prefs_dao: Arc>>, } impl AppState { @@ -101,6 +110,7 @@ impl AppState { self.libraries.iter().find(|l| l.id == id) } + #[allow(dead_code)] pub fn library_by_name(&self, name: &str) -> Option<&Library> { self.libraries.iter().find(|l| l.name == name) } @@ -129,6 +139,8 @@ impl AppState { clip_client: ClipClient, insight_job_dao: Arc>>, insight_job_handles: Arc>>, + precomputed_reel_dao: Arc>>, + user_ai_prefs_dao: Arc>>, ) -> Self { assert!( !libraries_vec.is_empty(), @@ -187,6 +199,8 @@ impl AppState { clip_client, insight_job_dao, insight_job_handles, + precomputed_reel_dao, + user_ai_prefs_dao, } } @@ -267,6 +281,14 @@ impl Default for AppState { let insight_job_handles: Arc>> = Arc::new(Mutex::new(HashMap::new())); + // Initialize precomputed reel DAO (nightly pre-generation ledger) + let precomputed_reel_dao: Arc>> = + Arc::new(Mutex::new(Box::new(SqlitePrecomputedReelDao::new()))); + + // Initialize user AI preferences DAO (Section E) + let user_ai_prefs_dao: Arc>> = + Arc::new(Mutex::new(Box::new(SqliteUserAiPrefsDao::new()))); + // Load base path and ensure the primary library row reflects it. 
let base_path = env::var("BASE_PATH").expect("BASE_PATH was not set in the env"); let mut seed_conn = connect(); @@ -344,6 +366,8 @@ impl Default for AppState { clip_client, insight_job_dao, insight_job_handles, + precomputed_reel_dao, + user_ai_prefs_dao, ) } } @@ -553,6 +577,8 @@ impl AppState { ClipClient::new(None), // disabled in test Arc::new(Mutex::new(Box::new(SqliteInsightGenerationJobDao::new()))), // placeholder for test Arc::new(Mutex::new(HashMap::new())), // placeholder for test + Arc::new(Mutex::new(Box::new(SqlitePrecomputedReelDao::new()))), // placeholder for test + Arc::new(Mutex::new(Box::new(SqliteUserAiPrefsDao::new()))), // placeholder for test ) } } diff --git a/src/tags.rs b/src/tags.rs index f3e0135..3dc0859 100644 --- a/src/tags.rs +++ b/src/tags.rs @@ -168,7 +168,7 @@ async fn get_tags( // this file, so tags added under one library show up under the // others when they hold the same file. Falls back to direct rel_path // match when the file hasn't been hashed yet. - let library = libraries::resolve_library_param(&app_state, request.library.as_deref()) + let library = libraries::resolve_library_param_state(&app_state, request.library.as_deref()) .ok() .flatten() .unwrap_or_else(|| app_state.primary_library());