From 5c9ee5652784785bd736c5152bdf9f82165b3074 Mon Sep 17 00:00:00 2001 From: Cameron Cordes Date: Sat, 13 Jun 2026 14:59:00 -0400 Subject: [PATCH] Fix agentic reel audit issues: midnight bug, DAO wiring, dead code, DST timezone, validation Blocking fixes: - secs_until_next_run_hour: same-hour now returns 0 instead of 24h - capture_prefs: called at both handler return points, never fails request - capture_prefs: resolves library param, upserts to user_ai_prefs via DAO - Scheduler: uses AppState DAOs instead of separate connections - Pregen dedup: uses resolved library param instead of hardcoded 'all' - run_readonly_tool_loop: added #[allow(dead_code)] (used in main.rs only) - run_readonly_tool_loop: removed dead messages.push() call - InsightGenerator: added exif_dao() getter for scheduler reuse Medium fixes: - Input validation: run_hour clamped 0-23, week_dow clamped 0-6 - DST-sensitive timezone: fixed_tz_offset() with env var config Low fixes: - Documented REEL_PREGEN_MAX_TOOL_ITERS and REEL_PREGEN_TZ_FIXED_MINUTES - Removed dead test_app_state function and unused imports Also fix: UpsertUserAiPrefs import path, chrono::Local::with_ymd_and_hms requires TimeZone trait + .single(), unwrap_or_else closure simplification --- .env.example | 5 + src/ai/insight_generator.rs | 13 +- src/main.rs | 18 +-- src/reels/mod.rs | 295 ++++++++++++++++-------------------- 4 files changed, 151 insertions(+), 180 deletions(-) diff --git a/.env.example b/.env.example index bafc0c8..a7bd7e5 100644 --- a/.env.example +++ b/.env.example @@ -150,8 +150,13 @@ SEARCH_RAG_RERANK=0 # Timezone offset in minutes from UTC (e.g., -480 = PST). Defaults to # the server's local timezone. # REEL_PREGEN_TZ_OFFSET_MINUTES= +# Fixed timezone offset — overrides auto-detect to avoid DST shifts. +# When set, both the DB fallback and env fallback use this value. +# REEL_PREGEN_TZ_FIXED_MINUTES=-480 # Voice ID for narration (e.g., "grandma"). Falls back to the value # stored in the user_ai_prefs DB row when set. # REEL_PREGEN_VOICE= # Library filter: a library id (e.g. "1") or "all" for every library. # REEL_PREGEN_LIBRARY=all +# Max agentic tool iterations for pre-gen scripter. Default 8. +# REEL_PREGEN_MAX_TOOL_ITERS=8 diff --git a/src/ai/insight_generator.rs b/src/ai/insight_generator.rs index 4871c2e..4ff8494 100644 --- a/src/ai/insight_generator.rs +++ b/src/ai/insight_generator.rs @@ -217,6 +217,13 @@ impl InsightGenerator { &self.insight_dao } + /// Accessor for the EXIF DAO (used by the reel scheduler to resolve + /// GPS enrichment without creating a separate DB connection). + #[allow(dead_code)] + pub fn exif_dao(&self) -> &Arc>> { + &self.exif_dao + } + /// Whether the optional Apollo Places integration is wired up. Drives /// tool-definition gating (no point offering `get_personal_place_at` /// when Apollo is unreachable) — exposed publicly so `insight_chat` @@ -4509,6 +4516,9 @@ Return ONLY the summary, nothing else."#, /// /// Calls `execute_tool` with empty file/image context; enabled tools /// never read those fields. + /// + /// Only used by the `reels` module (compiled in `main.rs`, not `lib.rs`), + /// so the `#[allow(dead_code)]` suppresses the lib-target warning. #[allow(dead_code)] pub(crate) async fn run_readonly_tool_loop( &self, @@ -4592,8 +4602,7 @@ Return ONLY the summary, nothing else."#, .chat() .chat_with_tools(messages.clone(), vec![]) .await?; - final_content = final_response.content.clone(); - messages.push(final_response); + final_content = final_response.content; } Ok(final_content) diff --git a/src/main.rs b/src/main.rs index e3ded45..dd2868f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -268,23 +268,7 @@ fn main() -> std::io::Result<()> { } // Spawn the nightly pre-generation scheduler (Section D). - { - use crate::database::{ - InsightDao, SqliteInsightDao, SqliteUserAiPrefsDao, UserAiPrefsDao, - }; - - let insight_dao: Arc>> = - Arc::new(Mutex::new(Box::new(SqliteInsightDao::new()))); - let prefs_dao: Arc>> = - Arc::new(Mutex::new(Box::new(SqliteUserAiPrefsDao::new()))); - - reels::spawn_pregen_scheduler( - app_state.clone(), - web::Data::new(insight_dao), - web::Data::new(prefs_dao), - ) - .await; - } + reels::spawn_pregen_scheduler(app_state.clone()).await; HttpServer::new(move || { let user_dao = SqliteUserDao::new(); diff --git a/src/reels/mod.rs b/src/reels/mod.rs index c51822c..1fc5b3b 100644 --- a/src/reels/mod.rs +++ b/src/reels/mod.rs @@ -56,9 +56,46 @@ pub fn normalize_library_key(libs: &[Library], param: Option<&str>) -> String { } } +/// Best-effort: mirror the latest client reel params into `user_ai_prefs` +/// so the nightly pre-gen scheduler can pick them up. Never fails the +/// caller regardless of DB errors. +fn capture_prefs( + app_state: &AppState, + prefs_dao: &web::Data>>>, + req: &web::Json, + library_param: Option<&str>, +) -> Result<(), anyhow::Error> { + use crate::database::models::UpsertUserAiPrefs; + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("Time went backwards") + .as_secs(); + let library = match library_param { + Some(p) if !p.is_empty() => { + // Resolve to the actual library id for the DB row. + normalize_library_key(&app_state.libraries, Some(p)) + } + _ => "all".to_string(), + }; + let mut dao = prefs_dao.lock().expect("lock"); + let ctx = opentelemetry::Context::new(); + dao.upsert_prefs( + &ctx, + &UpsertUserAiPrefs { + voice: req.voice.clone().filter(|s| !s.is_empty()), + tz_offset_minutes: Some( + req.timezone_offset_minutes + .unwrap_or_else(|| chrono::Local::now().offset().local_minus_utc()), + ), + library: Some(library), + updated_at: now as i64, + }, + ) + .map_err(|e| anyhow::anyhow!("failed to upsert user_ai_prefs: {e}")) +} + /// Which scripting strategy to use for the reel narration. #[derive(Clone, Copy)] -#[allow(dead_code)] pub enum ScripterMode { /// Fast path: single LLM call via the direct client. Fast, @@ -348,6 +385,7 @@ pub async fn create_reel_handler( app_state: web::Data, exif_dao: web::Data>>, insight_dao: web::Data>>, + prefs_dao: web::Data>>>, ) -> impl Responder { let span_context = extract_context_from_request(&http_request); @@ -416,7 +454,9 @@ pub async fn create_reel_handler( abort: None, }, ); - return HttpResponse::Accepted().json(ReelJobCreatedResponse { + // Capture params for passive prefs mirror (best-effort, never fails). + let _ = capture_prefs(&app_state, &prefs_dao, &req, req.library.as_deref()); + HttpResponse::Accepted().json(ReelJobCreatedResponse { job_id: job_id.to_string(), status: ReelJobStatus::Done, }); @@ -474,6 +514,9 @@ pub async fn create_reel_handler( }); with_job(job_id, |job| job.abort = Some(handle.abort_handle())); + // Capture params for passive prefs mirror (best-effort, never fails). + let _ = capture_prefs(&app_state, &prefs_dao, &req, req.library.as_deref()); + HttpResponse::Accepted().json(ReelJobCreatedResponse { job_id: job_id.to_string(), status: ReelJobStatus::Queued, @@ -861,18 +904,22 @@ fn resolve_media_path(app_state: &AppState, media: &SegmentMedia) -> Option u32 { std::env::var("REEL_PREGEN_HOUR") .ok() - .and_then(|v| v.parse().ok()) + .and_then(|v| v.trim().parse().ok()) + .filter(|h| *h <= 23) .unwrap_or(3) } /// Env: "1" (default, Monday). Day of week for weekly pre-gen (0=Sun, 1=Mon, ...). +/// Clamped to 0-6; invalid values fall back to default. fn pregen_week_dow() -> u32 { std::env::var("REEL_PREGEN_WEEK_DOW") .ok() - .and_then(|v| v.parse().ok()) + .and_then(|v| v.trim().parse().ok()) + .filter(|d| *d <= 6) .unwrap_or(1) } @@ -881,8 +928,10 @@ fn pregen_week_dow() -> u32 { /// DST shifts. pub(crate) fn secs_until_next_run_hour(now: chrono::DateTime, run_hour: u32) -> u64 { let now_hour = now.hour(); - let diff = if now_hour >= run_hour { + let diff = if now_hour > run_hour { 24 - now_hour + run_hour + } else if now_hour == run_hour { + 0 } else { run_hour - now_hour }; @@ -891,26 +940,22 @@ pub(crate) fn secs_until_next_run_hour(now: chrono::DateTime, run /// Load pre-gen parameters: tries the user_ai_prefs DB row first, falls back /// to env vars, then to server-local defaults. -fn load_pregen_params( - prefs_dao: &web::Data>>>, -) -> (i32, Option, String) { +fn load_pregen_params(app_state: &AppState) -> (i32, Option, String) { // Try DB row first - if let Ok(mut dao) = prefs_dao.lock() { + if let Ok(mut dao) = app_state.user_ai_prefs_dao.lock() { let ctx = opentelemetry::Context::new(); if let Ok(Some(prefs)) = dao.get_prefs(&ctx) { - let tz = prefs - .tz_offset_minutes - .unwrap_or_else(|| chrono::Local::now().offset().local_minus_utc()); + let tz = prefs.tz_offset_minutes.unwrap_or_else(fixed_tz_offset); let voice = prefs.voice; let library = prefs.library.unwrap_or_else(|| "all".to_string()); return (tz, voice, library); } } - // Fall back to env + // Fall back to env (explicit offset overrides auto-detect) let tz = std::env::var("REEL_PREGEN_TZ_OFFSET_MINUTES") .ok() .and_then(|v| v.parse().ok()) - .unwrap_or_else(|| chrono::Local::now().offset().local_minus_utc()); + .unwrap_or_else(fixed_tz_offset); let voice = std::env::var("REEL_PREGEN_VOICE").ok(); let library = std::env::var("REEL_PREGEN_LIBRARY") .ok() @@ -918,12 +963,19 @@ fn load_pregen_params( (tz, voice, library) } +/// Fixed timezone offset: reads `REEL_PREGEN_TZ_FIXED_MINUTES` (e.g. "-480" +/// for US Eastern) when set, falling back to the system local offset. Using +/// a fixed offset avoids DST shifts changing the pre-gen schedule halfway +/// through the year. +fn fixed_tz_offset() -> i32 { + std::env::var("REEL_PREGEN_TZ_FIXED_MINUTES") + .ok() + .and_then(|v| v.trim().parse().ok()) + .unwrap_or_else(|| chrono::Local::now().offset().local_minus_utc()) +} + /// Spawn the nightly pre-generation scheduler. Runs behind `REEL_PREGEN_ENABLED`. -pub(crate) async fn spawn_pregen_scheduler( - app_state: web::Data, - insight_dao: web::Data>>>, - prefs_dao: web::Data>>>, -) { +pub(crate) async fn spawn_pregen_scheduler(app_state: web::Data) { if std::env::var("REEL_PREGEN_ENABLED").ok() != Some("1".to_string()) { log::info!("Reel pre-generation scheduler disabled (REEL_PREGEN_ENABLED != 1)"); return; @@ -942,7 +994,7 @@ pub(crate) async fn spawn_pregen_scheduler( log::debug!("Next pre-gen run in {}s", sleep_secs); tokio::time::sleep(std::time::Duration::from_secs(sleep_secs)).await; - if let Err(e) = run_pregen_batch(&app_state, &insight_dao, &prefs_dao).await { + if let Err(e) = run_pregen_batch(&app_state).await { log::error!("Reel pre-generation batch failed: {}", e); } } @@ -950,11 +1002,7 @@ pub(crate) async fn spawn_pregen_scheduler( } /// Run the pre-generation batch for all applicable spans. -async fn run_pregen_batch( - app_state: &AppState, - insight_dao: &web::Data>>>, - prefs_dao: &web::Data>>>, -) -> anyhow::Result<()> { +async fn run_pregen_batch(app_state: &AppState) -> anyhow::Result<()> { let now = chrono::Local::now(); let weekday = now.weekday().num_days_from_sunday(); // 0=Sun, 1=Mon, ... let day_of_month = now.day(); @@ -967,11 +1015,10 @@ async fn run_pregen_batch( spans.push("month"); } - let (tz, voice, library) = load_pregen_params(prefs_dao); + let (tz, voice, library) = load_pregen_params(app_state); for span in spans { - if let Err(e) = pregen_one(app_state, insight_dao, span, tz, voice.clone(), &library).await - { + if let Err(e) = pregen_one(app_state, span, tz, voice.clone(), &library).await { log::error!("Pre-gen failed for span={}: {}", span, e); } } @@ -982,7 +1029,6 @@ async fn run_pregen_batch( /// Pre-generate a single reel for the given span. async fn pregen_one( app_state: &AppState, - insight_dao: &web::Data>>>, span: &str, tz: i32, voice: Option, @@ -1006,11 +1052,10 @@ async fn pregen_one( max_segments: 24, }; - let exif_dao: Arc>> = Arc::new(StdMutex::new(Box::new( - crate::database::SqliteExifDao::new(), - ))); + let exif_dao = app_state.insight_generator.exif_dao(); + let insight_dao = app_state.insight_generator.insight_dao(); let ctx = opentelemetry::Context::new(); - let (planned, reel_meta) = match selector::resolve(app_state, &exif_dao, &ctx, &selector) { + let (planned, reel_meta) = match selector::resolve(app_state, exif_dao, &ctx, &selector) { Ok((p, m)) => (p, m), Err(e) => { log::warn!("Pre-gen resolve failed for span={}: {}", span, e); @@ -1042,7 +1087,7 @@ async fn pregen_one( let is_fresh = { let mut dao = app_state.precomputed_reel_dao.lock().expect("lock"); - dao.exists_fresh(&ctx, span, "all", RENDER_VERSION as i32, min_generated_at) + dao.exists_fresh(&ctx, span, library, RENDER_VERSION as i32, min_generated_at) .unwrap_or(false) }; @@ -1074,7 +1119,7 @@ async fn pregen_one( &ctx, &crate::database::models::InsertablePrecomputedReel { span: span.to_string(), - library_key: "all".to_string(), + library_key: library.to_string(), cache_key: key.clone(), output_path: mp4_path.to_string_lossy().to_string(), title, @@ -1094,7 +1139,7 @@ async fn pregen_one( let (title, mp4) = produce_reel( app_state, insight_dao, - &exif_dao, + exif_dao, planned, reel_meta, voice.clone(), @@ -1110,7 +1155,7 @@ async fn pregen_one( &ctx, &crate::database::models::InsertablePrecomputedReel { span: span.to_string(), - library_key: "all".to_string(), + library_key: library.to_string(), cache_key: key.clone(), output_path: mp4.to_string_lossy().to_string(), title, @@ -1129,9 +1174,8 @@ async fn pregen_one( #[cfg(test)] mod tests { use super::*; - use crate::ai::face_client::FaceClient; use crate::libraries::Library; - use crate::video::actors::StreamActor; + use chrono::TimeZone; fn photo(p: &str, lib: i32) -> SegmentMedia { SegmentMedia::Photo { @@ -1156,128 +1200,6 @@ mod tests { } } - /// Minimal AppState for tests that only need library lookup. - #[allow(dead_code)] - fn test_app_state() -> AppState { - use crate::ai::InsightGenerator; - use crate::ai::insight_chat::{ChatLockMap, InsightChatService}; - use crate::ai::turn_registry::TurnRegistry; - use crate::ai::{OllamaClient, SmsApiClient}; - use crate::database::{ - ExifDao, InsightDao, InsightGenerationJobDao, PreviewDao, SqliteExifDao, - SqliteInsightDao, SqliteInsightGenerationJobDao, SqlitePreviewDao, - }; - use crate::faces; - use crate::state::AppState; - use crate::tags::SqliteTagDao; - use actix::Actor; - use std::sync::Mutex; - - let temp_dir = tempfile::tempdir().expect("Failed to create temp directory"); - let base_path = temp_dir.path().to_path_buf(); - let base_path_str = base_path.to_string_lossy().to_string(); - - let test_lib = Library { - id: crate::libraries::PRIMARY_LIBRARY_ID, - name: "main".to_string(), - root_path: base_path_str.clone(), - enabled: true, - excluded_dirs: Vec::new(), - }; - - let ollama = OllamaClient::new( - "http://localhost:11434".to_string(), - None, - "llama3.2".to_string(), - None, - ); - let sms_client = SmsApiClient::new("http://localhost:8000".to_string(), None); - let apollo_client = crate::ai::apollo_client::ApolloClient::new(None); - - let insight_dao: std::sync::Arc>> = - std::sync::Arc::new(Mutex::new(Box::new(SqliteInsightDao::new()))); - let exif_dao: std::sync::Arc>> = - std::sync::Arc::new(Mutex::new(Box::new(SqliteExifDao::new()))); - let daily_summary_dao: std::sync::Arc>> = - std::sync::Arc::new(Mutex::new(Box::new( - crate::database::SqliteDailySummaryDao::new(), - ))); - let insight_generator = InsightGenerator::new( - ollama.clone(), - None, - None, - sms_client.clone(), - apollo_client.clone(), - insight_dao.clone(), - exif_dao.clone(), - daily_summary_dao, - std::sync::Arc::new(Mutex::new(Box::new( - crate::database::SqliteCalendarEventDao::new(), - ))), - std::sync::Arc::new(Mutex::new(Box::new( - crate::database::SqliteLocationHistoryDao::new(), - ))), - std::sync::Arc::new(Mutex::new(Box::new( - crate::database::SqliteSearchHistoryDao::new(), - ))), - std::sync::Arc::new(Mutex::new(Box::new(SqliteTagDao::default()))), - std::sync::Arc::new(Mutex::new(Box::new(faces::SqliteFaceDao::new()))), - std::sync::Arc::new(Mutex::new(Box::new( - crate::database::SqliteKnowledgeDao::new(), - ))), - std::sync::Arc::new(Mutex::new(Box::new( - crate::database::SqlitePersonaDao::new(), - ))), - vec![test_lib.clone()], - ); - - let chat_locks: ChatLockMap = - std::sync::Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new())); - let insight_chat = std::sync::Arc::new(InsightChatService::new( - std::sync::Arc::new(insight_generator.clone()), - insight_dao.clone(), - chat_locks, - )); - let turn_registry = std::sync::Arc::new(TurnRegistry::new(300)); - let preview_dao: std::sync::Arc>> = - std::sync::Arc::new(Mutex::new(Box::new(SqlitePreviewDao::new()))); - let insight_job_dao: std::sync::Arc>> = - std::sync::Arc::new(Mutex::new(Box::new(SqliteInsightGenerationJobDao::new()))); - let insight_job_handles: std::sync::Arc< - Mutex>, - > = std::sync::Arc::new(Mutex::new(std::collections::HashMap::new())); - - AppState::new( - std::sync::Arc::new(StreamActor {}.start()), - vec![test_lib], - base_path_str.clone(), - base_path_str.clone(), - base_path_str.clone(), - base_path_str.clone(), - Vec::new(), - ollama, - None, - Vec::new(), - None, - Vec::new(), - sms_client, - insight_generator, - insight_chat, - turn_registry, - preview_dao, - FaceClient::new(None), - crate::ai::clip_client::ClipClient::new(None), - insight_job_dao, - insight_job_handles, - std::sync::Arc::new(Mutex::new(Box::new( - crate::database::SqlitePrecomputedReelDao::new(), - ))), - std::sync::Arc::new(Mutex::new(Box::new( - crate::database::SqliteUserAiPrefsDao::new(), - ))), - ) - } - #[test] fn cache_key_is_stable_for_same_inputs() { let media = vec![photo("a.jpg", 1), photo("b.jpg", 1)]; @@ -1443,4 +1365,55 @@ mod tests { }]; assert_eq!(normalize_library_key(&libs, Some("missing")), "all"); } + + #[test] + fn secs_until_next_run_hour_same_hour_returns_zero() { + let dt = chrono::Local + .with_ymd_and_hms(2026, 6, 13, 3, 30, 0) + .single() + .expect("valid datetime"); + assert_eq!(secs_until_next_run_hour(dt, 3), 0); + } + + #[test] + fn secs_until_next_run_hour_future_today_returns_remaining() { + let dt = chrono::Local + .with_ymd_and_hms(2026, 6, 13, 10, 0, 0) + .single() + .expect("valid datetime"); + assert_eq!(secs_until_next_run_hour(dt, 14), 4 * 3600); + } + + #[test] + fn secs_until_next_run_hour_past_today_wraps() { + let dt = chrono::Local + .with_ymd_and_hms(2026, 6, 13, 20, 0, 0) + .single() + .expect("valid datetime"); + assert_eq!(secs_until_next_run_hour(dt, 3), (24 - 20 + 3) * 3600); + } + + #[test] + fn secs_until_next_run_hour_midnight() { + let dt = chrono::Local + .with_ymd_and_hms(2026, 6, 13, 0, 0, 0) + .single() + .expect("valid datetime"); + // 0:00, run at 3 → 3 hours + assert_eq!(secs_until_next_run_hour(dt, 3), 3 * 3600); + // 0:00, run at 0 → 0 (immediate) + assert_eq!(secs_until_next_run_hour(dt, 0), 0); + } + + #[test] + fn secs_until_next_run_hour_last_hour() { + let dt = chrono::Local + .with_ymd_and_hms(2026, 6, 13, 23, 30, 0) + .single() + .expect("valid datetime"); + // 23:30, run at 23 → 0 (still in hour 23) + assert_eq!(secs_until_next_run_hour(dt, 23), 0); + // 23:30, run at 0 → 1 hour + assert_eq!(secs_until_next_run_hour(dt, 0), 3600); + } }