From 9f8a69fc6d139443aa05c0c52ca5775dc5ebeb50 Mon Sep 17 00:00:00 2001 From: Cameron Cordes Date: Tue, 12 May 2026 12:54:37 -0400 Subject: [PATCH] Split main.rs: extract watcher loop into src/watcher.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit main.rs drops from 1200 → 346 lines (90% smaller than the pre-branch 3542). What's left is the startup wiring it was always meant to be: .env, migrations, AppState construction, route registration, server bind. The four background-loop functions move into src/watcher.rs: - watch_files (310 lines) — quick/full scan tick, per-library probe, backfill drain dispatch, missing-file scan, back-ref refresh, orphan GC. - process_new_files (351 lines) — file walk → EXIF write → face-candidate build → HLS / preview-clip queueing → reconciliation. The "biggest untested chunk" from the earlier audit. - cleanup_orphaned_playlists (167 lines) — separate slower-tick thread. - playlist_needs_generation — small mtime-comparison helper. Plus 4 unit tests for playlist_needs_generation (covers missing playlist, newer playlist, newer video, video-missing-metadata fallback). main.rs's imports correspondingly shrink — Addr, HashSet, WalkDir, Utc, InsertImageExif, and the bulk of video::actors all leave with the watcher. CLAUDE.md updated to reflect the new module layout (layered architecture box + module map for the face-detection section). cargo test --bin image-api: 329 passing (no regression). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- CLAUDE.md | 15 +- src/main.rs | 869 +------------------------------------------- src/watcher.rs | 951 +++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 967 insertions(+), 868 deletions(-) create mode 100644 src/watcher.rs diff --git a/CLAUDE.md b/CLAUDE.md index 1e34b44..3924fb5 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -76,7 +76,10 @@ cargo run --bin cleanup_files -- --base-path /path/to/media --database-url ./dat ### Core Components **Layered Architecture:** -- **HTTP Layer** (`main.rs`): Route handlers for images, videos, metadata, tags, favorites, memories +- **Startup wiring** (`main.rs`): only ~350 lines — env load, migrations, AppState, route registration, server bind. Background jobs are kicked off here but defined elsewhere. +- **HTTP Layer** (`handlers/{image,video,favorites}.rs`, `files.rs`, `tags.rs`, `faces.rs`, `memories.rs`, `ai/handlers.rs`): the route handlers, grouped by domain. +- **Background loops** (`watcher.rs`): the file-watcher tick (`watch_files`, `process_new_files`) and the orphaned-playlist cleanup (`cleanup_orphaned_playlists`). Per-tick drains are factored into `backfill.rs` (`backfill_unhashed_backlog`, `backfill_missing_date_taken`, `backfill_missing_content_hashes`, `process_face_backlog`, `build_face_candidates`). +- **Thumbnails** (`thumbnails.rs`): generation pipeline + the `IMAGE_GAUGE` / `VIDEO_GAUGE` Prometheus metrics. - **Auth Layer** (`auth.rs`): JWT token validation, Claims extraction via FromRequest trait - **Service Layer** (`files.rs`, `exif.rs`, `memories.rs`): Business logic for file operations and EXIF extraction - **DAO Layer** (`database/mod.rs`): Trait-based data access (ExifDao, UserDao, FavoriteDao, TagDao) @@ -392,8 +395,8 @@ under 2021, not 2014 — on the theory that EXIF is more reliable than import-named filenames. The reverse case (no EXIF, filename has a date) is unchanged. 
-The `backfill_missing_date_taken` drain (`src/main.rs`) runs every -watcher tick alongside `backfill_unhashed_backlog`. It loads up to +The `backfill_missing_date_taken` drain (`src/backfill.rs`) runs every +watcher tick alongside `backfill_unhashed_backlog` (also `src/backfill.rs`). It loads up to `DATE_BACKFILL_MAX_PER_TICK` rows (default 500) where `date_taken IS NULL OR date_taken_source = 'fs_time'` (backed by the `idx_image_exif_date_backfill` partial index), runs the waterfall @@ -504,9 +507,9 @@ ImageApi owns the face data; Apollo (sibling repo) hosts the insightface inferen **Why content_hash and not (library_id, rel_path):** ties face data to the bytes, not the path. A backup mount that copies files from the primary library naturally inherits the existing detections without re-running inference. This is the reference implementation of the multi-library data model — see "Multi-library data model" above. -**File-watch hook** (`src/main.rs::process_new_files`): for each photo with a populated `content_hash`, check `FaceDao::already_scanned(hash)`; if not, send bytes (or embedded JPEG preview for RAW via `exif::extract_embedded_jpeg_preview`) to Apollo's `/api/internal/faces/detect`. K=`FACE_DETECT_CONCURRENCY` (default 8) parallel calls per scan tick; Apollo serializes them via its single-worker GPU pool. `face_watch.rs` is the Tokio orchestration layer. +**File-watch hook** (`src/watcher.rs::process_new_files`): for each photo with a populated `content_hash`, check `FaceDao::already_scanned(hash)`; if not, send bytes (or embedded JPEG preview for RAW via `exif::extract_embedded_jpeg_preview`) to Apollo's `/api/internal/faces/detect`. K=`FACE_DETECT_CONCURRENCY` (default 8) parallel calls per scan tick; Apollo serializes them via its single-worker GPU pool. `face_watch.rs` is the Tokio orchestration layer. 
-**Per-tick backlog drain** (also `src/main.rs`): two passes that run on every watcher tick regardless of quick-vs-full scan: +**Per-tick backlog drain** (`src/backfill.rs`): two passes that run on every watcher tick regardless of quick-vs-full scan: - `backfill_unhashed_backlog` — populates `image_exif.content_hash` for photos that arrived before the hash field was retroactive. Capped by `FACE_HASH_BACKFILL_MAX_PER_TICK` (default 2000); errors don't burn the cap. - `process_face_backlog` — runs detection on photos that have a hash but no `face_detections` row. Capped by `FACE_BACKLOG_MAX_PER_TICK` (default 64). Selected via a SQL anti-join (`FaceDao::list_unscanned_candidates`); videos and EXCLUDED_DIRS paths filtered out client-side via `face_watch::filter_excluded` so they never reach Apollo. @@ -521,6 +524,8 @@ ImageApi owns the face data; Apollo (sibling repo) hosts the insightface inferen Module map: - `src/faces.rs` — `FaceDao` trait + `SqliteFaceDao` impl, route handlers for `/faces/*`, `/image/faces/*`, `/persons/*`. Mirror of `tags.rs` layout. - `src/face_watch.rs` — Tokio orchestration for the file-watch detect pass; `filter_excluded` (PathExcluder + image-extension filter), `read_image_bytes_for_detect` (RAW preview fallback). +- `src/backfill.rs` — per-tick drains (unhashed-hash, date_taken, face-backlog, etc.) called from `watcher::watch_files` and `watcher::process_new_files`. +- `src/watcher.rs` — the watcher loop itself and `process_new_files` (file walk → EXIF write → face-candidate build). - `src/ai/face_client.rs` — HTTP client for Apollo's inference. Configured by `APOLLO_FACE_API_BASE_URL`, falls back to `APOLLO_API_BASE_URL`. Both unset → feature disabled, file-watch hook is a no-op. - `migrations/2026-04-29-000000_add_faces/` — schema. 
diff --git a/src/main.rs b/src/main.rs index 8d1836e..335527b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,38 +4,30 @@ extern crate diesel; extern crate rayon; -use actix::Addr; use actix_web::web::Data; use actix_web_prom::PrometheusMetricsBuilder; use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations}; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::env; -use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; -use std::time::{Duration, SystemTime}; -use walkdir::WalkDir; use actix_cors::Cors; use actix_governor::{Governor, GovernorConfigBuilder}; use actix_multipart as mp; use actix_web::{App, HttpResponse, HttpServer, middleware, web}; -use chrono::Utc; use diesel::sqlite::Sqlite; use std::error::Error; use crate::ai::InsightGenerator; use crate::auth::login; use crate::data::*; -use crate::database::models::InsertImageExif; use crate::database::*; use crate::files::{RealFileSystem, move_file}; use crate::service::ServiceBuilder; use crate::state::AppState; use crate::tags::*; -use crate::video::actors::{ - GeneratePreviewClipMessage, QueueVideosMessage, ScanDirectoryMessage, VideoPlaylistManager, -}; -use log::{debug, error, info, warn}; +use crate::video::actors::ScanDirectoryMessage; +use log::{error, info}; mod ai; mod auth; @@ -62,6 +54,7 @@ mod tags; mod thumbnails; mod utils; mod video; +mod watcher; mod knowledge; mod memories; @@ -135,7 +128,7 @@ fn main() -> std::io::Result<()> { // Start file watcher with playlist manager and preview generator let playlist_mgr_for_watcher = app_state.playlist_manager.as_ref().clone(); let preview_gen_for_watcher = app_state.preview_clip_generator.as_ref().clone(); - watch_files( + watcher::watch_files( app_state.libraries.clone(), playlist_mgr_for_watcher, preview_gen_for_watcher, @@ -148,7 +141,7 @@ fn main() -> std::io::Result<()> { // every configured library when looking for the source video, and // skips the whole cycle while any library is 
stale (a missing // source is indistinguishable from a transiently-unmounted share). - cleanup_orphaned_playlists( + watcher::cleanup_orphaned_playlists( app_state.libraries.clone(), app_state.excluded_dirs.clone(), app_state.library_health.clone(), @@ -350,853 +343,3 @@ fn run_migrations( Ok(()) } - -/// Clean up orphaned HLS playlists and segments whose source videos no longer exist -fn cleanup_orphaned_playlists( - libs: Vec, - excluded_dirs: Vec, - library_health: libraries::LibraryHealthMap, -) { - std::thread::spawn(move || { - let video_path = dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set"); - - // Get cleanup interval from environment (default: 24 hours) - let cleanup_interval_secs = dotenv::var("PLAYLIST_CLEANUP_INTERVAL_SECONDS") - .ok() - .and_then(|s| s.parse::().ok()) - .unwrap_or(86400); // 24 hours - - info!("Starting orphaned playlist cleanup job"); - info!(" Cleanup interval: {} seconds", cleanup_interval_secs); - info!(" Playlist directory: {}", video_path); - for lib in &libs { - info!( - " Checking sources under '{}' at {}", - lib.name, lib.root_path - ); - } - - loop { - std::thread::sleep(Duration::from_secs(cleanup_interval_secs)); - - // Safety gate: skip the cleanup cycle if any library is - // stale. A missing source video on a stale library is - // indistinguishable from a transient unmount, and the - // cleanup is destructive — we'd rather leak a few playlist - // files for a tick than delete one whose source is briefly - // unreachable. The cycle re-runs on the next interval. 
- { - let guard = library_health.read().unwrap_or_else(|e| e.into_inner()); - let stale: Vec = libs - .iter() - .filter(|lib| guard.get(&lib.id).map(|h| !h.is_online()).unwrap_or(false)) - .map(|lib| lib.name.clone()) - .collect(); - if !stale.is_empty() { - warn!( - "Skipping orphaned-playlist cleanup: {} library(ies) stale: [{}]", - stale.len(), - stale.join(", ") - ); - continue; - } - } - - info!("Running orphaned playlist cleanup"); - let start = std::time::Instant::now(); - let mut deleted_count = 0; - let mut error_count = 0; - - // Find all .m3u8 files in VIDEO_PATH - let playlists: Vec = WalkDir::new(&video_path) - .into_iter() - .filter_map(|e| e.ok()) - .filter(|e| e.file_type().is_file()) - .filter(|e| { - e.path() - .extension() - .and_then(|s| s.to_str()) - .map(|ext| ext.eq_ignore_ascii_case("m3u8")) - .unwrap_or(false) - }) - .map(|e| e.path().to_path_buf()) - .collect(); - - info!("Found {} playlist files to check", playlists.len()); - - for playlist_path in playlists { - // Extract the original video filename from playlist name - // Playlist format: {VIDEO_PATH}/{original_filename}.m3u8 - if let Some(filename) = playlist_path.file_stem() { - let video_filename = filename.to_string_lossy(); - - // Search for this video file across every configured - // library, respecting EXCLUDED_DIRS so we don't - // false-resurrect playlists for videos that only - // exist inside an excluded subtree. As soon as one - // library has a matching source, we're done — the - // playlist isn't orphaned. 
- let mut video_exists = false; - 'libs: for lib in &libs { - let effective = lib.effective_excluded_dirs(&excluded_dirs); - for entry in image_api::file_scan::walk_library_files( - Path::new(&lib.root_path), - &effective, - ) { - if let Some(entry_stem) = entry.path().file_stem() - && entry_stem == filename - && file_types::is_video_file(entry.path()) - { - video_exists = true; - break 'libs; - } - } - } - - if !video_exists { - debug!( - "Source video for playlist {} no longer exists, deleting", - playlist_path.display() - ); - - // Delete the playlist file - if let Err(e) = std::fs::remove_file(&playlist_path) { - warn!( - "Failed to delete playlist {}: {}", - playlist_path.display(), - e - ); - error_count += 1; - } else { - deleted_count += 1; - - // Also try to delete associated .ts segment files - // They are typically named {filename}N.ts in the same directory - if let Some(parent_dir) = playlist_path.parent() { - for entry in WalkDir::new(parent_dir) - .max_depth(1) - .into_iter() - .filter_map(|e| e.ok()) - .filter(|e| e.file_type().is_file()) - { - let entry_path = entry.path(); - if let Some(ext) = entry_path.extension() - && ext.eq_ignore_ascii_case("ts") - { - // Check if this .ts file belongs to our playlist - if let Some(ts_stem) = entry_path.file_stem() { - let ts_name = ts_stem.to_string_lossy(); - if ts_name.starts_with(&*video_filename) { - if let Err(e) = std::fs::remove_file(entry_path) { - debug!( - "Failed to delete segment {}: {}", - entry_path.display(), - e - ); - } else { - debug!( - "Deleted segment: {}", - entry_path.display() - ); - } - } - } - } - } - } - } - } - } - } - - info!( - "Orphaned playlist cleanup completed in {:?}: deleted {} playlists, {} errors", - start.elapsed(), - deleted_count, - error_count - ); - } - }); -} - -fn watch_files( - libs: Vec, - playlist_manager: Addr, - preview_generator: Addr, - face_client: crate::ai::face_client::FaceClient, - excluded_dirs: Vec, - library_health: libraries::LibraryHealthMap, -) { 
- std::thread::spawn(move || { - // Get polling intervals from environment variables - // Quick scan: Check recently modified files (default: 60 seconds) - let quick_interval_secs = dotenv::var("WATCH_QUICK_INTERVAL_SECONDS") - .ok() - .and_then(|s| s.parse::().ok()) - .unwrap_or(60); - - // Full scan: Check all files regardless of modification time (default: 3600 seconds = 1 hour) - let full_interval_secs = dotenv::var("WATCH_FULL_INTERVAL_SECONDS") - .ok() - .and_then(|s| s.parse::().ok()) - .unwrap_or(3600); - - info!("Starting optimized file watcher"); - info!(" Quick scan interval: {} seconds", quick_interval_secs); - info!(" Full scan interval: {} seconds", full_interval_secs); - // Surface face-detection state at boot so it's obvious whether - // the watcher will hit Apollo. The branch silently no-ops when - // disabled (intentional for legacy deploys), which makes "why - // aren't faces being detected?" hard to diagnose otherwise. - if face_client.is_enabled() { - info!(" Face detection: ENABLED"); - } else { - info!( - " Face detection: DISABLED (set APOLLO_FACE_API_BASE_URL \ - or APOLLO_API_BASE_URL to enable)" - ); - } - for lib in &libs { - info!( - " Watching library '{}' (id={}) at {}", - lib.name, lib.id, lib.root_path - ); - } - - // Create DAOs for tracking processed files - let exif_dao = Arc::new(Mutex::new( - Box::new(SqliteExifDao::new()) as Box - )); - let preview_dao = Arc::new(Mutex::new( - Box::new(SqlitePreviewDao::new()) as Box - )); - let face_dao = Arc::new(Mutex::new( - Box::new(faces::SqliteFaceDao::new()) as Box - )); - // tag_dao for the watcher's auto-bind path. Independent of the - // request-handler tag_dao instance — both end up pointing at the - // same SQLite file via SqliteTagDao::default(). 
- let watcher_tag_dao = Arc::new(Mutex::new( - Box::new(SqliteTagDao::default()) as Box - )); - - let mut last_quick_scan = SystemTime::now(); - let mut last_full_scan = SystemTime::now(); - let mut scan_count = 0u64; - - // Per-library cursor for the missing-file scan. Each tick reads - // a page from `offset`, stat()s the rows, deletes confirmed- - // missing ones, and advances or wraps the cursor. State held - // in-memory so a watcher restart resumes from 0 — fine, the - // sweep is idempotent. - let mut missing_file_offsets: std::collections::HashMap = - std::collections::HashMap::new(); - - let missing_scan_page_size: i64 = dotenv::var("IMAGE_EXIF_MISSING_SCAN_PAGE_SIZE") - .ok() - .and_then(|s| s.parse().ok()) - .filter(|n: &i64| *n > 0) - .unwrap_or(library_maintenance::DEFAULT_SCAN_PAGE_SIZE); - let missing_delete_cap: usize = dotenv::var("IMAGE_EXIF_MISSING_DELETE_CAP_PER_TICK") - .ok() - .and_then(|s| s.parse().ok()) - .filter(|n: &usize| *n > 0) - .unwrap_or(library_maintenance::DEFAULT_MISSING_DELETE_CAP); - - // Two-tick orphan-GC consensus state. Carried across ticks via - // `OrphanGcState`; see library_maintenance::run_orphan_gc. - let mut orphan_gc_state = library_maintenance::OrphanGcState::default(); - - // Initial availability sweep before the loop's first sleep so - // /libraries reports the truth from the very first request, - // rather than the optimistic Online default that - // new_health_map seeds. Without this, an unmounted share would - // appear online for up to WATCH_QUICK_INTERVAL_SECONDS (default - // 60s) after boot. Same probe logic as the per-tick gate - // below; no ingest runs here, just the health update + log. - // Disabled libraries skip the probe entirely — they should - // never enter the health map (treated as out-of-scope). 
- for lib in &libs { - if !lib.enabled { - continue; - } - let context = opentelemetry::Context::new(); - let had_data = exif_dao - .lock() - .expect("exif_dao poisoned") - .count_for_library(&context, lib.id) - .map(|n| n > 0) - .unwrap_or(false); - libraries::refresh_health(&library_health, lib, had_data); - } - - loop { - std::thread::sleep(Duration::from_secs(quick_interval_secs)); - - let now = SystemTime::now(); - let since_last_full = now - .duration_since(last_full_scan) - .unwrap_or(Duration::from_secs(0)); - - let is_full_scan = since_last_full.as_secs() >= full_interval_secs; - - for lib in &libs { - // Operator kill switch: a disabled library is invisible - // to the watcher entirely. No probe, no ingest, no - // maintenance, no health entry. Distinct from Stale — - // Stale is "we wanted to but couldn't"; Disabled is - // "we don't want to". Toggle via SQL. - if !lib.enabled { - debug!( - "watcher: skipping library '{}' (id={}) — enabled=false", - lib.name, lib.id - ); - continue; - } - - // Availability probe: every tick checks that the - // library's mount is reachable, is a directory, is - // readable, and (if image_exif has rows for it) is - // non-empty. A Stale library skips ingest, backlog - // drains, and metric refresh — reads/serving in HTTP - // handlers continue to work. Branches B/C extend the - // probe gate to cover handoff and orphan GC. See - // CLAUDE.md "Library availability and safety". - let had_data = { - let context = opentelemetry::Context::new(); - let mut guard = exif_dao.lock().expect("exif_dao poisoned"); - guard - .count_for_library(&context, lib.id) - .map(|n| n > 0) - .unwrap_or(false) - }; - let health = libraries::refresh_health(&library_health, lib, had_data); - if !health.is_online() { - // Skip every write path for this library this tick. - // Don't refresh the media-count gauge either — a - // probe-failed library would otherwise flap to 0 - // image / 0 video and pollute Prometheus. 
- continue; - } - - // Drain the unhashed-hash backlog AND the face-detection - // backlog every tick, regardless of quick/full. Quick - // scans only walk recently-modified files, so the - // pre-Phase-3 backlog never enters their candidate set - // — without these standalone passes, backfill + - // detection only progressed during full scans - // (default once an hour). - // Effective excludes for this library: global env-var - // ∪ row's excluded_dirs. Compute once per tick — used - // by every walker below for this library. - let effective_excludes = lib.effective_excluded_dirs(&excluded_dirs); - - if face_client.is_enabled() { - let context = opentelemetry::Context::new(); - backfill::backfill_unhashed_backlog(&context, lib, &exif_dao); - backfill::process_face_backlog( - &context, - lib, - &face_client, - &face_dao, - &watcher_tag_dao, - &effective_excludes, - ); - } - - // Date-taken backfill: drain rows whose canonical date is - // either unresolved or only fs_time-sourced. Independent - // of face detection — runs even on deploys that don't - // configure Apollo, since `/memories` depends on it. 
- { - let context = opentelemetry::Context::new(); - backfill::backfill_missing_date_taken(&context, lib, &exif_dao); - } - - if is_full_scan { - info!( - "Running full scan for library '{}' (scan #{})", - lib.name, scan_count - ); - process_new_files( - lib, - Arc::clone(&exif_dao), - Arc::clone(&preview_dao), - Arc::clone(&face_dao), - Arc::clone(&watcher_tag_dao), - face_client.clone(), - &effective_excludes, - None, - playlist_manager.clone(), - preview_generator.clone(), - ); - } else { - debug!( - "Running quick scan for library '{}' (checking files modified in last {} seconds)", - lib.name, - quick_interval_secs + 10 - ); - let check_since = last_quick_scan - .checked_sub(Duration::from_secs(10)) - .unwrap_or(last_quick_scan); - process_new_files( - lib, - Arc::clone(&exif_dao), - Arc::clone(&preview_dao), - Arc::clone(&face_dao), - Arc::clone(&watcher_tag_dao), - face_client.clone(), - &effective_excludes, - Some(check_since), - playlist_manager.clone(), - preview_generator.clone(), - ); - } - - // Update media counts per library (metric aggregates across all) - thumbnails::update_media_counts(Path::new(&lib.root_path), &effective_excludes); - - // Missing-file detection: prune image_exif rows whose - // source file is no longer on disk. Per-library, so we - // pass library-online-this-tick implicitly (we only - // reach here if the probe gate at the top of the - // iteration passed). Capped + paginated so a huge - // library doesn't stall the watcher; rows we don't - // visit this tick get visited next tick. See - // library_maintenance::detect_missing_files_for_library. 
- { - let context = opentelemetry::Context::new(); - let offset = missing_file_offsets.get(&lib.id).copied().unwrap_or(0); - let (deleted, next_offset) = - library_maintenance::detect_missing_files_for_library( - &context, - lib, - &exif_dao, - offset, - missing_scan_page_size, - missing_delete_cap, - ); - missing_file_offsets.insert(lib.id, next_offset); - if deleted > 0 { - debug!( - "missing-file scan: library '{}' next_offset={}", - lib.name, next_offset - ); - } - } - } - - // Reconciliation: cross-library, so it runs once per tick - // outside the per-library loop. Idempotent — fast no-op when - // there's nothing to do. Operates on the database alone, no - // filesystem dependency, so it doesn't need a health gate. - // See database::reconcile and CLAUDE.md "Multi-library data - // model" for the rules. - { - let mut conn = image_api::database::connect(); - let _ = image_api::database::reconcile::run(&mut conn); - - // Back-ref refresh: hash-keyed rows whose - // (library_id, rel_path) tuple no longer matches any - // image_exif row but whose hash still does. After a - // recent→archive move, the missing-file scan removes - // the old image_exif row; this pass repoints face / - // tag / insight back-refs at the surviving location. - // DB-only, no health gate needed — uses what's in - // image_exif as truth. - let _ = library_maintenance::refresh_back_refs(&mut conn); - - // Orphan GC: the destructive end of the maintenance - // pipeline. Two-tick consensus + every-library-online - // requirement is enforced inside run_orphan_gc; we - // pass the current all-online flag and the function - // tracks the previous tick's flag in OrphanGcState. 
- let all_online = library_maintenance::all_libraries_online(&libs, &library_health); - let _ = - library_maintenance::run_orphan_gc(&mut conn, &mut orphan_gc_state, all_online); - } - - if is_full_scan { - last_full_scan = now; - } - last_quick_scan = now; - scan_count += 1; - } - }); -} - -/// Check if a playlist needs to be (re)generated -/// Returns true if: -/// - Playlist doesn't exist, OR -/// - Source video is newer than the playlist -fn playlist_needs_generation(video_path: &Path, playlist_path: &Path) -> bool { - if !playlist_path.exists() { - return true; - } - - // Check if source video is newer than playlist - if let (Ok(video_meta), Ok(playlist_meta)) = ( - std::fs::metadata(video_path), - std::fs::metadata(playlist_path), - ) && let (Ok(video_modified), Ok(playlist_modified)) = - (video_meta.modified(), playlist_meta.modified()) - { - return video_modified > playlist_modified; - } - - // If we can't determine, assume it needs generation - true -} - -fn process_new_files( - library: &libraries::Library, - exif_dao: Arc>>, - preview_dao: Arc>>, - face_dao: Arc>>, - tag_dao: Arc>>, - face_client: crate::ai::face_client::FaceClient, - excluded_dirs: &[String], - modified_since: Option, - playlist_manager: Addr, - preview_generator: Addr, -) { - let context = opentelemetry::Context::new(); - let thumbs = dotenv::var("THUMBNAILS").expect("THUMBNAILS not defined"); - let thumbnail_directory = Path::new(&thumbs); - let base_path = Path::new(&library.root_path); - - // Walk, prune EXCLUDED_DIRS subtrees, and apply image/video + modified_since - // filters. See `file_scan` for why exclusion has to happen at WalkDir - // time (filter_entry) rather than at face-detect time. 
- let files: Vec<(PathBuf, String)> = - image_api::file_scan::enumerate_indexable_files(base_path, excluded_dirs, modified_since); - - if files.is_empty() { - debug!("No files to process"); - return; - } - - debug!("Found {} files to check", files.len()); - - // Batch query: Get all EXIF data for these files in one query - let file_paths: Vec = files.iter().map(|(_, rel_path)| rel_path.clone()).collect(); - - let existing_exif_paths: HashMap = { - let mut dao = exif_dao.lock().expect("Unable to lock ExifDao"); - // Walk is per-library, so scope the lookup so a same-named file - // in another library doesn't make this one look already-indexed. - match dao.get_exif_batch(&context, Some(library.id), &file_paths) { - Ok(exif_records) => exif_records - .into_iter() - .map(|record| (record.file_path, true)) - .collect(), - Err(e) => { - error!("Error batch querying EXIF data: {:?}", e); - HashMap::new() - } - } - }; - - let mut new_files_found = false; - let mut files_needing_row = Vec::new(); - - // Register every image/video file in image_exif. Rows without EXIF - // still carry library_id, rel_path, content_hash, and size_bytes so - // derivative dedup and DB-indexed sort/filter work for every file, - // not just photos with parseable EXIF. - for (file_path, relative_path) in &files { - // Check both the library-scoped legacy path (current shape) and - // the bare-legacy path (pre-multi-library shape). Either one - // existing means a thumbnail is already on disk for this file. 
- let scoped_thumb_path = content_hash::library_scoped_legacy_path( - thumbnail_directory, - library.id, - relative_path, - ); - let bare_legacy_thumb_path = thumbnail_directory.join(relative_path); - let needs_thumbnail = !scoped_thumb_path.exists() - && !bare_legacy_thumb_path.exists() - && !thumbnails::unsupported_thumbnail_sentinel(&scoped_thumb_path).exists() - && !thumbnails::unsupported_thumbnail_sentinel(&bare_legacy_thumb_path).exists(); - let needs_row = !existing_exif_paths.contains_key(relative_path); - - if needs_thumbnail || needs_row { - new_files_found = true; - - if needs_thumbnail { - info!("New file detected (missing thumbnail): {}", relative_path); - } - - if needs_row { - files_needing_row.push((file_path.clone(), relative_path.clone())); - } - } - } - - if !files_needing_row.is_empty() { - info!( - "Registering {} new files in image_exif", - files_needing_row.len() - ); - - for (file_path, relative_path) in files_needing_row { - let timestamp = Utc::now().timestamp(); - - // Hash + size from filesystem metadata — always attempted so - // every file gets a content_hash, even when EXIF is absent. - let (content_hash, size_bytes) = match content_hash::compute(&file_path) { - Ok(id) => (Some(id.content_hash), Some(id.size_bytes)), - Err(e) => { - warn!("Failed to hash {}: {:?}", file_path.display(), e); - (None, None) - } - }; - - // Perceptual hashes (pHash + dHash). Best-effort — None for - // videos and decode failures. Drives near-duplicate detection - // in the Apollo duplicates surface; failure here is non-fatal - // and never blocks indexing. - let perceptual = perceptual_hash::compute(&file_path); - - // EXIF is best-effort enrichment. When extraction fails (or the - // file type doesn't support EXIF) we still store a row with all - // EXIF fields NULL; the file remains visible to sort-by-date - // and tag queries via its rel_path and filesystem timestamps. 
- let exif_fields = if exif::supports_exif(&file_path) { - match exif::extract_exif_from_path(&file_path) { - Ok(data) => Some(data), - Err(e) => { - debug!( - "No EXIF or parse error for {}: {:?}", - file_path.display(), - e - ); - None - } - } - } else { - None - }; - - // Canonical date_taken via the waterfall — kamadak-exif (already - // computed above) → exiftool fallback for videos / MakerNote / - // QuickTime → filename regex → earliest_fs_time. Source is - // recorded so the per-tick backfill drain can re-run weak - // resolutions later. - let resolved_date = date_resolver::resolve_date_taken( - &file_path, - exif_fields.as_ref().and_then(|e| e.date_taken), - ); - - let insert_exif = InsertImageExif { - library_id: library.id, - file_path: relative_path.clone(), - camera_make: exif_fields.as_ref().and_then(|e| e.camera_make.clone()), - camera_model: exif_fields.as_ref().and_then(|e| e.camera_model.clone()), - lens_model: exif_fields.as_ref().and_then(|e| e.lens_model.clone()), - width: exif_fields.as_ref().and_then(|e| e.width), - height: exif_fields.as_ref().and_then(|e| e.height), - orientation: exif_fields.as_ref().and_then(|e| e.orientation), - gps_latitude: exif_fields - .as_ref() - .and_then(|e| e.gps_latitude.map(|v| v as f32)), - gps_longitude: exif_fields - .as_ref() - .and_then(|e| e.gps_longitude.map(|v| v as f32)), - gps_altitude: exif_fields - .as_ref() - .and_then(|e| e.gps_altitude.map(|v| v as f32)), - focal_length: exif_fields - .as_ref() - .and_then(|e| e.focal_length.map(|v| v as f32)), - aperture: exif_fields - .as_ref() - .and_then(|e| e.aperture.map(|v| v as f32)), - shutter_speed: exif_fields.as_ref().and_then(|e| e.shutter_speed.clone()), - iso: exif_fields.as_ref().and_then(|e| e.iso), - date_taken: resolved_date.map(|r| r.timestamp), - created_time: timestamp, - last_modified: timestamp, - content_hash, - size_bytes, - phash_64: perceptual.map(|h| h.phash_64), - dhash_64: perceptual.map(|h| h.dhash_64), - date_taken_source: 
resolved_date.map(|r| r.source.as_str().to_string()), - }; - - let mut dao = exif_dao.lock().expect("Unable to lock ExifDao"); - if let Err(e) = dao.store_exif(&context, insert_exif) { - error!( - "Failed to register {} in image_exif: {:?}", - relative_path, e - ); - } else { - debug!("Registered {} in image_exif", relative_path); - } - } - } - - // ── Face detection pass ──────────────────────────────────────────── - // Run after EXIF writes so newly-registered files have their - // content_hash populated. Skipped wholesale when face_client is - // disabled (no Apollo integration configured) — Phase 3 wires this - // up; the watcher remains usable on legacy deploys. - if face_client.is_enabled() { - // Opportunistic content_hash backfill: photos indexed before - // content-hashing landed (or where the hash compute failed - // silently on insert) end up in image_exif with NULL - // content_hash. build_face_candidates keys on content_hash, so - // those files would never become candidates without backfill. - // Idempotent — subsequent scans see the populated hashes and - // no-op. The dedicated `backfill_hashes` binary is still the - // right tool for very large legacy libraries; this branch - // ensures small/medium deploys self-heal without operator - // action. 
- backfill::backfill_missing_content_hashes(&context, &files, library, &exif_dao); - let candidates = - backfill::build_face_candidates(&context, library, &files, &exif_dao, &face_dao); - debug!( - "face_watch: scan tick — {} image file(s) walked, {} candidate(s) (library '{}', modified_since={})", - files - .iter() - .filter(|(p, _)| !file_types::is_video_file(p)) - .count(), - candidates.len(), - library.name, - modified_since.is_some(), - ); - if !candidates.is_empty() { - face_watch::run_face_detection_pass( - library, - excluded_dirs, - &face_client, - Arc::clone(&face_dao), - Arc::clone(&tag_dao), - candidates, - ); - } - } - - // Check for videos that need HLS playlists - let video_path_base = dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set"); - let mut videos_needing_playlists = Vec::new(); - - for (file_path, _relative_path) in &files { - if file_types::is_video_file(file_path) { - // Construct expected playlist path - let playlist_filename = - format!("{}.m3u8", file_path.file_name().unwrap().to_string_lossy()); - let playlist_path = Path::new(&video_path_base).join(&playlist_filename); - - // Check if playlist needs (re)generation - if playlist_needs_generation(file_path, &playlist_path) { - videos_needing_playlists.push(file_path.clone()); - } - } - } - - // Send queue request to playlist manager - if !videos_needing_playlists.is_empty() { - playlist_manager.do_send(QueueVideosMessage { - video_paths: videos_needing_playlists, - }); - } - - // Check for videos that need preview clips - // Collect (full_path, relative_path) for video files - let video_files: Vec<(String, String)> = files - .iter() - .filter(|(file_path, _)| file_types::is_video_file(file_path)) - .map(|(file_path, rel_path)| (file_path.to_string_lossy().to_string(), rel_path.clone())) - .collect(); - - if !video_files.is_empty() { - // Query DB using relative paths (consistent with how GET/POST handlers store them) - let video_rel_paths: Vec = video_files.iter().map(|(_, rel)| 
rel.clone()).collect(); - - let existing_previews: HashMap = { - let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao"); - match dao.get_previews_batch(&context, &video_rel_paths) { - Ok(clips) => clips - .into_iter() - .map(|clip| (clip.file_path, clip.status)) - .collect(), - Err(e) => { - error!("Error batch querying preview clips: {:?}", e); - HashMap::new() - } - } - }; - - for (full_path, relative_path) in &video_files { - let status = existing_previews.get(relative_path).map(|s| s.as_str()); - let needs_preview = match status { - None => true, // No record at all - Some("failed") => true, // Retry failed - Some("pending") => true, // Stale pending from previous run - _ => false, // processing or complete - }; - - if needs_preview { - // Insert pending record using relative path - if status.is_none() { - let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao"); - let _ = dao.insert_preview(&context, relative_path, "pending"); - } - - // Send full path in the message — the actor will derive relative path from it - preview_generator.do_send(GeneratePreviewClipMessage { - video_path: full_path.clone(), - }); - } - } - } - - // Generate thumbnails for all files that need them - if new_files_found { - info!("Processing thumbnails for new files..."); - thumbnails::create_thumbnails(std::slice::from_ref(library), excluded_dirs); - } - - // Reconciliation: on a full scan, prune image_exif rows whose rel_path no - // longer exists on disk for this library. Keeps the DB in parity so - // downstream DB-backed listings (e.g. recursive /photos) don't return - // phantom files. Skipped on quick scans — those only look at recently - // modified files and can't distinguish "missing" from "unchanged". 
- if modified_since.is_none() { - let disk_paths: HashSet = files.iter().map(|(_, rel)| rel.clone()).collect(); - let db_paths: Vec = { - let mut dao = exif_dao.lock().expect("Unable to lock ExifDao"); - dao.get_rel_paths_for_library(&context, library.id) - .unwrap_or_else(|e| { - error!( - "Reconciliation: failed to load image_exif rel_paths for lib {}: {:?}", - library.id, e - ); - Vec::new() - }) - }; - - let stale: Vec = db_paths - .into_iter() - .filter(|p| !disk_paths.contains(p)) - .collect(); - - if !stale.is_empty() { - info!( - "Reconciliation: pruning {} stale image_exif rows for library '{}'", - stale.len(), - library.name - ); - let mut dao = exif_dao.lock().expect("Unable to lock ExifDao"); - for rel in &stale { - if let Err(e) = dao.delete_exif_by_library(&context, library.id, rel) { - warn!( - "Reconciliation: failed to delete {} (lib {}): {:?}", - rel, library.id, e - ); - } - } - } - } -} diff --git a/src/watcher.rs b/src/watcher.rs new file mode 100644 index 0000000..4528ca8 --- /dev/null +++ b/src/watcher.rs @@ -0,0 +1,951 @@ +//! Background file-watcher loop + the orphaned-playlist cleanup job. +//! +//! `watch_files` spins a thread that, on every tick (default 60 s +//! quick-scan / 3600 s full-scan), probes each library's availability, +//! drains the unhashed / date / face-detection backlogs via +//! [`crate::backfill`], walks newly-modified files through +//! [`process_new_files`], updates the media-count gauges, and runs the +//! three-stage maintenance pipeline (missing-file scan → back-ref +//! refresh → orphan GC). +//! +//! `cleanup_orphaned_playlists` runs on a slower interval (default 24 +//! hours) and reaps HLS playlists whose source videos no longer exist +//! in any library. Both jobs respect [`crate::libraries::LibraryHealthMap`] +//! — a stale library skips destructive paths so transient unmounts +//! don't trigger data loss. 
+ +use std::collections::{HashMap, HashSet}; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, SystemTime}; + +use actix::Addr; +use chrono::Utc; +use log::{debug, error, info, warn}; +use walkdir::WalkDir; + +use crate::backfill; +use crate::content_hash; +use crate::database::models::InsertImageExif; +use crate::database::{ExifDao, PreviewDao, SqliteExifDao, SqlitePreviewDao}; +use crate::date_resolver; +use crate::exif; +use crate::face_watch; +use crate::faces; +use crate::file_types; +use crate::libraries; +use crate::library_maintenance; +use crate::perceptual_hash; +use crate::tags; +use crate::tags::SqliteTagDao; +use crate::thumbnails; +use crate::video; +use crate::video::actors::{GeneratePreviewClipMessage, QueueVideosMessage, VideoPlaylistManager}; + +/// Clean up orphaned HLS playlists and segments whose source videos no longer exist +pub fn cleanup_orphaned_playlists( + libs: Vec, + excluded_dirs: Vec, + library_health: libraries::LibraryHealthMap, +) { + std::thread::spawn(move || { + let video_path = dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set"); + + // Get cleanup interval from environment (default: 24 hours) + let cleanup_interval_secs = dotenv::var("PLAYLIST_CLEANUP_INTERVAL_SECONDS") + .ok() + .and_then(|s| s.parse::().ok()) + .unwrap_or(86400); // 24 hours + + info!("Starting orphaned playlist cleanup job"); + info!(" Cleanup interval: {} seconds", cleanup_interval_secs); + info!(" Playlist directory: {}", video_path); + for lib in &libs { + info!( + " Checking sources under '{}' at {}", + lib.name, lib.root_path + ); + } + + loop { + std::thread::sleep(Duration::from_secs(cleanup_interval_secs)); + + // Safety gate: skip the cleanup cycle if any library is + // stale. 
A missing source video on a stale library is + // indistinguishable from a transient unmount, and the + // cleanup is destructive — we'd rather leak a few playlist + // files for a tick than delete one whose source is briefly + // unreachable. The cycle re-runs on the next interval. + { + let guard = library_health.read().unwrap_or_else(|e| e.into_inner()); + let stale: Vec = libs + .iter() + .filter(|lib| guard.get(&lib.id).map(|h| !h.is_online()).unwrap_or(false)) + .map(|lib| lib.name.clone()) + .collect(); + if !stale.is_empty() { + warn!( + "Skipping orphaned-playlist cleanup: {} library(ies) stale: [{}]", + stale.len(), + stale.join(", ") + ); + continue; + } + } + + info!("Running orphaned playlist cleanup"); + let start = std::time::Instant::now(); + let mut deleted_count = 0; + let mut error_count = 0; + + // Find all .m3u8 files in VIDEO_PATH + let playlists: Vec = WalkDir::new(&video_path) + .into_iter() + .filter_map(|e| e.ok()) + .filter(|e| e.file_type().is_file()) + .filter(|e| { + e.path() + .extension() + .and_then(|s| s.to_str()) + .map(|ext| ext.eq_ignore_ascii_case("m3u8")) + .unwrap_or(false) + }) + .map(|e| e.path().to_path_buf()) + .collect(); + + info!("Found {} playlist files to check", playlists.len()); + + for playlist_path in playlists { + // Extract the original video filename from playlist name + // Playlist format: {VIDEO_PATH}/{original_filename}.m3u8 + if let Some(filename) = playlist_path.file_stem() { + let video_filename = filename.to_string_lossy(); + + // Search for this video file across every configured + // library, respecting EXCLUDED_DIRS so we don't + // false-resurrect playlists for videos that only + // exist inside an excluded subtree. As soon as one + // library has a matching source, we're done — the + // playlist isn't orphaned. 
+ let mut video_exists = false; + 'libs: for lib in &libs { + let effective = lib.effective_excluded_dirs(&excluded_dirs); + for entry in image_api::file_scan::walk_library_files( + Path::new(&lib.root_path), + &effective, + ) { + if let Some(entry_stem) = entry.path().file_stem() + && entry_stem == filename + && file_types::is_video_file(entry.path()) + { + video_exists = true; + break 'libs; + } + } + } + + if !video_exists { + debug!( + "Source video for playlist {} no longer exists, deleting", + playlist_path.display() + ); + + // Delete the playlist file + if let Err(e) = std::fs::remove_file(&playlist_path) { + warn!( + "Failed to delete playlist {}: {}", + playlist_path.display(), + e + ); + error_count += 1; + } else { + deleted_count += 1; + + // Also try to delete associated .ts segment files + // They are typically named {filename}N.ts in the same directory + if let Some(parent_dir) = playlist_path.parent() { + for entry in WalkDir::new(parent_dir) + .max_depth(1) + .into_iter() + .filter_map(|e| e.ok()) + .filter(|e| e.file_type().is_file()) + { + let entry_path = entry.path(); + if let Some(ext) = entry_path.extension() + && ext.eq_ignore_ascii_case("ts") + { + // Check if this .ts file belongs to our playlist + if let Some(ts_stem) = entry_path.file_stem() { + let ts_name = ts_stem.to_string_lossy(); + if ts_name.starts_with(&*video_filename) { + if let Err(e) = std::fs::remove_file(entry_path) { + debug!( + "Failed to delete segment {}: {}", + entry_path.display(), + e + ); + } else { + debug!( + "Deleted segment: {}", + entry_path.display() + ); + } + } + } + } + } + } + } + } + } + } + + info!( + "Orphaned playlist cleanup completed in {:?}: deleted {} playlists, {} errors", + start.elapsed(), + deleted_count, + error_count + ); + } + }); +} + +pub fn watch_files( + libs: Vec, + playlist_manager: Addr, + preview_generator: Addr, + face_client: crate::ai::face_client::FaceClient, + excluded_dirs: Vec, + library_health: libraries::LibraryHealthMap, 
+) { + std::thread::spawn(move || { + // Get polling intervals from environment variables + // Quick scan: Check recently modified files (default: 60 seconds) + let quick_interval_secs = dotenv::var("WATCH_QUICK_INTERVAL_SECONDS") + .ok() + .and_then(|s| s.parse::().ok()) + .unwrap_or(60); + + // Full scan: Check all files regardless of modification time (default: 3600 seconds = 1 hour) + let full_interval_secs = dotenv::var("WATCH_FULL_INTERVAL_SECONDS") + .ok() + .and_then(|s| s.parse::().ok()) + .unwrap_or(3600); + + info!("Starting optimized file watcher"); + info!(" Quick scan interval: {} seconds", quick_interval_secs); + info!(" Full scan interval: {} seconds", full_interval_secs); + // Surface face-detection state at boot so it's obvious whether + // the watcher will hit Apollo. The branch silently no-ops when + // disabled (intentional for legacy deploys), which makes "why + // aren't faces being detected?" hard to diagnose otherwise. + if face_client.is_enabled() { + info!(" Face detection: ENABLED"); + } else { + info!( + " Face detection: DISABLED (set APOLLO_FACE_API_BASE_URL \ + or APOLLO_API_BASE_URL to enable)" + ); + } + for lib in &libs { + info!( + " Watching library '{}' (id={}) at {}", + lib.name, lib.id, lib.root_path + ); + } + + // Create DAOs for tracking processed files + let exif_dao = Arc::new(Mutex::new( + Box::new(SqliteExifDao::new()) as Box + )); + let preview_dao = Arc::new(Mutex::new( + Box::new(SqlitePreviewDao::new()) as Box + )); + let face_dao = Arc::new(Mutex::new( + Box::new(faces::SqliteFaceDao::new()) as Box + )); + // tag_dao for the watcher's auto-bind path. Independent of the + // request-handler tag_dao instance — both end up pointing at the + // same SQLite file via SqliteTagDao::default(). 
+ let watcher_tag_dao = Arc::new(Mutex::new( + Box::new(SqliteTagDao::default()) as Box + )); + + let mut last_quick_scan = SystemTime::now(); + let mut last_full_scan = SystemTime::now(); + let mut scan_count = 0u64; + + // Per-library cursor for the missing-file scan. Each tick reads + // a page from `offset`, stat()s the rows, deletes confirmed- + // missing ones, and advances or wraps the cursor. State held + // in-memory so a watcher restart resumes from 0 — fine, the + // sweep is idempotent. + let mut missing_file_offsets: HashMap = HashMap::new(); + + let missing_scan_page_size: i64 = dotenv::var("IMAGE_EXIF_MISSING_SCAN_PAGE_SIZE") + .ok() + .and_then(|s| s.parse().ok()) + .filter(|n: &i64| *n > 0) + .unwrap_or(library_maintenance::DEFAULT_SCAN_PAGE_SIZE); + let missing_delete_cap: usize = dotenv::var("IMAGE_EXIF_MISSING_DELETE_CAP_PER_TICK") + .ok() + .and_then(|s| s.parse().ok()) + .filter(|n: &usize| *n > 0) + .unwrap_or(library_maintenance::DEFAULT_MISSING_DELETE_CAP); + + // Two-tick orphan-GC consensus state. Carried across ticks via + // `OrphanGcState`; see library_maintenance::run_orphan_gc. + let mut orphan_gc_state = library_maintenance::OrphanGcState::default(); + + // Initial availability sweep before the loop's first sleep so + // /libraries reports the truth from the very first request, + // rather than the optimistic Online default that + // new_health_map seeds. Without this, an unmounted share would + // appear online for up to WATCH_QUICK_INTERVAL_SECONDS (default + // 60s) after boot. Same probe logic as the per-tick gate + // below; no ingest runs here, just the health update + log. + // Disabled libraries skip the probe entirely — they should + // never enter the health map (treated as out-of-scope). 
+ for lib in &libs { + if !lib.enabled { + continue; + } + let context = opentelemetry::Context::new(); + let had_data = exif_dao + .lock() + .expect("exif_dao poisoned") + .count_for_library(&context, lib.id) + .map(|n| n > 0) + .unwrap_or(false); + libraries::refresh_health(&library_health, lib, had_data); + } + + loop { + std::thread::sleep(Duration::from_secs(quick_interval_secs)); + + let now = SystemTime::now(); + let since_last_full = now + .duration_since(last_full_scan) + .unwrap_or(Duration::from_secs(0)); + + let is_full_scan = since_last_full.as_secs() >= full_interval_secs; + + for lib in &libs { + // Operator kill switch: a disabled library is invisible + // to the watcher entirely. No probe, no ingest, no + // maintenance, no health entry. Distinct from Stale — + // Stale is "we wanted to but couldn't"; Disabled is + // "we don't want to". Toggle via SQL. + if !lib.enabled { + debug!( + "watcher: skipping library '{}' (id={}) — enabled=false", + lib.name, lib.id + ); + continue; + } + + // Availability probe: every tick checks that the + // library's mount is reachable, is a directory, is + // readable, and (if image_exif has rows for it) is + // non-empty. A Stale library skips ingest, backlog + // drains, and metric refresh — reads/serving in HTTP + // handlers continue to work. Branches B/C extend the + // probe gate to cover handoff and orphan GC. See + // CLAUDE.md "Library availability and safety". + let had_data = { + let context = opentelemetry::Context::new(); + let mut guard = exif_dao.lock().expect("exif_dao poisoned"); + guard + .count_for_library(&context, lib.id) + .map(|n| n > 0) + .unwrap_or(false) + }; + let health = libraries::refresh_health(&library_health, lib, had_data); + if !health.is_online() { + // Skip every write path for this library this tick. + // Don't refresh the media-count gauge either — a + // probe-failed library would otherwise flap to 0 + // image / 0 video and pollute Prometheus. 
+ continue; + } + + // Drain the unhashed-hash backlog AND the face-detection + // backlog every tick, regardless of quick/full. Quick + // scans only walk recently-modified files, so the + // pre-Phase-3 backlog never enters their candidate set + // — without these standalone passes, backfill + + // detection only progressed during full scans + // (default once an hour). + // Effective excludes for this library: global env-var + // ∪ row's excluded_dirs. Compute once per tick — used + // by every walker below for this library. + let effective_excludes = lib.effective_excluded_dirs(&excluded_dirs); + + if face_client.is_enabled() { + let context = opentelemetry::Context::new(); + backfill::backfill_unhashed_backlog(&context, lib, &exif_dao); + backfill::process_face_backlog( + &context, + lib, + &face_client, + &face_dao, + &watcher_tag_dao, + &effective_excludes, + ); + } + + // Date-taken backfill: drain rows whose canonical date is + // either unresolved or only fs_time-sourced. Independent + // of face detection — runs even on deploys that don't + // configure Apollo, since `/memories` depends on it. 
+ { + let context = opentelemetry::Context::new(); + backfill::backfill_missing_date_taken(&context, lib, &exif_dao); + } + + if is_full_scan { + info!( + "Running full scan for library '{}' (scan #{})", + lib.name, scan_count + ); + process_new_files( + lib, + Arc::clone(&exif_dao), + Arc::clone(&preview_dao), + Arc::clone(&face_dao), + Arc::clone(&watcher_tag_dao), + face_client.clone(), + &effective_excludes, + None, + playlist_manager.clone(), + preview_generator.clone(), + ); + } else { + debug!( + "Running quick scan for library '{}' (checking files modified in last {} seconds)", + lib.name, + quick_interval_secs + 10 + ); + let check_since = last_quick_scan + .checked_sub(Duration::from_secs(10)) + .unwrap_or(last_quick_scan); + process_new_files( + lib, + Arc::clone(&exif_dao), + Arc::clone(&preview_dao), + Arc::clone(&face_dao), + Arc::clone(&watcher_tag_dao), + face_client.clone(), + &effective_excludes, + Some(check_since), + playlist_manager.clone(), + preview_generator.clone(), + ); + } + + // Update media counts per library (metric aggregates across all) + thumbnails::update_media_counts(Path::new(&lib.root_path), &effective_excludes); + + // Missing-file detection: prune image_exif rows whose + // source file is no longer on disk. Per-library, so we + // pass library-online-this-tick implicitly (we only + // reach here if the probe gate at the top of the + // iteration passed). Capped + paginated so a huge + // library doesn't stall the watcher; rows we don't + // visit this tick get visited next tick. See + // library_maintenance::detect_missing_files_for_library. 
+ { + let context = opentelemetry::Context::new(); + let offset = missing_file_offsets.get(&lib.id).copied().unwrap_or(0); + let (deleted, next_offset) = + library_maintenance::detect_missing_files_for_library( + &context, + lib, + &exif_dao, + offset, + missing_scan_page_size, + missing_delete_cap, + ); + missing_file_offsets.insert(lib.id, next_offset); + if deleted > 0 { + debug!( + "missing-file scan: library '{}' next_offset={}", + lib.name, next_offset + ); + } + } + } + + // Reconciliation: cross-library, so it runs once per tick + // outside the per-library loop. Idempotent — fast no-op when + // there's nothing to do. Operates on the database alone, no + // filesystem dependency, so it doesn't need a health gate. + // See database::reconcile and CLAUDE.md "Multi-library data + // model" for the rules. + { + let mut conn = image_api::database::connect(); + let _ = image_api::database::reconcile::run(&mut conn); + + // Back-ref refresh: hash-keyed rows whose + // (library_id, rel_path) tuple no longer matches any + // image_exif row but whose hash still does. After a + // recent→archive move, the missing-file scan removes + // the old image_exif row; this pass repoints face / + // tag / insight back-refs at the surviving location. + // DB-only, no health gate needed — uses what's in + // image_exif as truth. + let _ = library_maintenance::refresh_back_refs(&mut conn); + + // Orphan GC: the destructive end of the maintenance + // pipeline. Two-tick consensus + every-library-online + // requirement is enforced inside run_orphan_gc; we + // pass the current all-online flag and the function + // tracks the previous tick's flag in OrphanGcState. 
+ let all_online = library_maintenance::all_libraries_online(&libs, &library_health); + let _ = + library_maintenance::run_orphan_gc(&mut conn, &mut orphan_gc_state, all_online); + } + + if is_full_scan { + last_full_scan = now; + } + last_quick_scan = now; + scan_count += 1; + } + }); +} + +/// Check if a playlist needs to be (re)generated. +/// +/// Returns true if: +/// - Playlist doesn't exist, OR +/// - Source video is newer than the playlist +/// +/// When metadata for either path is unreadable, returns true so the +/// caller errs on the side of regeneration (a redundant transcode +/// beats a stale playlist). +pub fn playlist_needs_generation(video_path: &Path, playlist_path: &Path) -> bool { + if !playlist_path.exists() { + return true; + } + + // Check if source video is newer than playlist + if let (Ok(video_meta), Ok(playlist_meta)) = ( + std::fs::metadata(video_path), + std::fs::metadata(playlist_path), + ) && let (Ok(video_modified), Ok(playlist_modified)) = + (video_meta.modified(), playlist_meta.modified()) + { + return video_modified > playlist_modified; + } + + // If we can't determine, assume it needs generation + true +} + +pub fn process_new_files( + library: &libraries::Library, + exif_dao: Arc>>, + preview_dao: Arc>>, + face_dao: Arc>>, + tag_dao: Arc>>, + face_client: crate::ai::face_client::FaceClient, + excluded_dirs: &[String], + modified_since: Option, + playlist_manager: Addr, + preview_generator: Addr, +) { + let context = opentelemetry::Context::new(); + let thumbs = dotenv::var("THUMBNAILS").expect("THUMBNAILS not defined"); + let thumbnail_directory = Path::new(&thumbs); + let base_path = Path::new(&library.root_path); + + // Walk, prune EXCLUDED_DIRS subtrees, and apply image/video + modified_since + // filters. See `file_scan` for why exclusion has to happen at WalkDir + // time (filter_entry) rather than at face-detect time. 
+ let files: Vec<(PathBuf, String)> = + image_api::file_scan::enumerate_indexable_files(base_path, excluded_dirs, modified_since); + + if files.is_empty() { + debug!("No files to process"); + return; + } + + debug!("Found {} files to check", files.len()); + + // Batch query: Get all EXIF data for these files in one query + let file_paths: Vec = files.iter().map(|(_, rel_path)| rel_path.clone()).collect(); + + let existing_exif_paths: HashMap = { + let mut dao = exif_dao.lock().expect("Unable to lock ExifDao"); + // Walk is per-library, so scope the lookup so a same-named file + // in another library doesn't make this one look already-indexed. + match dao.get_exif_batch(&context, Some(library.id), &file_paths) { + Ok(exif_records) => exif_records + .into_iter() + .map(|record| (record.file_path, true)) + .collect(), + Err(e) => { + error!("Error batch querying EXIF data: {:?}", e); + HashMap::new() + } + } + }; + + let mut new_files_found = false; + let mut files_needing_row = Vec::new(); + + // Register every image/video file in image_exif. Rows without EXIF + // still carry library_id, rel_path, content_hash, and size_bytes so + // derivative dedup and DB-indexed sort/filter work for every file, + // not just photos with parseable EXIF. + for (file_path, relative_path) in &files { + // Check both the library-scoped legacy path (current shape) and + // the bare-legacy path (pre-multi-library shape). Either one + // existing means a thumbnail is already on disk for this file. 
+ let scoped_thumb_path = content_hash::library_scoped_legacy_path( + thumbnail_directory, + library.id, + relative_path, + ); + let bare_legacy_thumb_path = thumbnail_directory.join(relative_path); + let needs_thumbnail = !scoped_thumb_path.exists() + && !bare_legacy_thumb_path.exists() + && !thumbnails::unsupported_thumbnail_sentinel(&scoped_thumb_path).exists() + && !thumbnails::unsupported_thumbnail_sentinel(&bare_legacy_thumb_path).exists(); + let needs_row = !existing_exif_paths.contains_key(relative_path); + + if needs_thumbnail || needs_row { + new_files_found = true; + + if needs_thumbnail { + info!("New file detected (missing thumbnail): {}", relative_path); + } + + if needs_row { + files_needing_row.push((file_path.clone(), relative_path.clone())); + } + } + } + + if !files_needing_row.is_empty() { + info!( + "Registering {} new files in image_exif", + files_needing_row.len() + ); + + for (file_path, relative_path) in files_needing_row { + let timestamp = Utc::now().timestamp(); + + // Hash + size from filesystem metadata — always attempted so + // every file gets a content_hash, even when EXIF is absent. + let (content_hash, size_bytes) = match content_hash::compute(&file_path) { + Ok(id) => (Some(id.content_hash), Some(id.size_bytes)), + Err(e) => { + warn!("Failed to hash {}: {:?}", file_path.display(), e); + (None, None) + } + }; + + // Perceptual hashes (pHash + dHash). Best-effort — None for + // videos and decode failures. Drives near-duplicate detection + // in the Apollo duplicates surface; failure here is non-fatal + // and never blocks indexing. + let perceptual = perceptual_hash::compute(&file_path); + + // EXIF is best-effort enrichment. When extraction fails (or the + // file type doesn't support EXIF) we still store a row with all + // EXIF fields NULL; the file remains visible to sort-by-date + // and tag queries via its rel_path and filesystem timestamps. 
+ let exif_fields = if exif::supports_exif(&file_path) { + match exif::extract_exif_from_path(&file_path) { + Ok(data) => Some(data), + Err(e) => { + debug!( + "No EXIF or parse error for {}: {:?}", + file_path.display(), + e + ); + None + } + } + } else { + None + }; + + // Canonical date_taken via the waterfall — kamadak-exif (already + // computed above) → exiftool fallback for videos / MakerNote / + // QuickTime → filename regex → earliest_fs_time. Source is + // recorded so the per-tick backfill drain can re-run weak + // resolutions later. + let resolved_date = date_resolver::resolve_date_taken( + &file_path, + exif_fields.as_ref().and_then(|e| e.date_taken), + ); + + let insert_exif = InsertImageExif { + library_id: library.id, + file_path: relative_path.clone(), + camera_make: exif_fields.as_ref().and_then(|e| e.camera_make.clone()), + camera_model: exif_fields.as_ref().and_then(|e| e.camera_model.clone()), + lens_model: exif_fields.as_ref().and_then(|e| e.lens_model.clone()), + width: exif_fields.as_ref().and_then(|e| e.width), + height: exif_fields.as_ref().and_then(|e| e.height), + orientation: exif_fields.as_ref().and_then(|e| e.orientation), + gps_latitude: exif_fields + .as_ref() + .and_then(|e| e.gps_latitude.map(|v| v as f32)), + gps_longitude: exif_fields + .as_ref() + .and_then(|e| e.gps_longitude.map(|v| v as f32)), + gps_altitude: exif_fields + .as_ref() + .and_then(|e| e.gps_altitude.map(|v| v as f32)), + focal_length: exif_fields + .as_ref() + .and_then(|e| e.focal_length.map(|v| v as f32)), + aperture: exif_fields + .as_ref() + .and_then(|e| e.aperture.map(|v| v as f32)), + shutter_speed: exif_fields.as_ref().and_then(|e| e.shutter_speed.clone()), + iso: exif_fields.as_ref().and_then(|e| e.iso), + date_taken: resolved_date.map(|r| r.timestamp), + created_time: timestamp, + last_modified: timestamp, + content_hash, + size_bytes, + phash_64: perceptual.map(|h| h.phash_64), + dhash_64: perceptual.map(|h| h.dhash_64), + date_taken_source: 
resolved_date.map(|r| r.source.as_str().to_string()), + }; + + let mut dao = exif_dao.lock().expect("Unable to lock ExifDao"); + if let Err(e) = dao.store_exif(&context, insert_exif) { + error!( + "Failed to register {} in image_exif: {:?}", + relative_path, e + ); + } else { + debug!("Registered {} in image_exif", relative_path); + } + } + } + + // ── Face detection pass ──────────────────────────────────────────── + // Run after EXIF writes so newly-registered files have their + // content_hash populated. Skipped wholesale when face_client is + // disabled (no Apollo integration configured) — Phase 3 wires this + // up; the watcher remains usable on legacy deploys. + if face_client.is_enabled() { + // Opportunistic content_hash backfill: photos indexed before + // content-hashing landed (or where the hash compute failed + // silently on insert) end up in image_exif with NULL + // content_hash. build_face_candidates keys on content_hash, so + // those files would never become candidates without backfill. + // Idempotent — subsequent scans see the populated hashes and + // no-op. The dedicated `backfill_hashes` binary is still the + // right tool for very large legacy libraries; this branch + // ensures small/medium deploys self-heal without operator + // action. 
+ backfill::backfill_missing_content_hashes(&context, &files, library, &exif_dao); + let candidates = + backfill::build_face_candidates(&context, library, &files, &exif_dao, &face_dao); + debug!( + "face_watch: scan tick — {} image file(s) walked, {} candidate(s) (library '{}', modified_since={})", + files + .iter() + .filter(|(p, _)| !file_types::is_video_file(p)) + .count(), + candidates.len(), + library.name, + modified_since.is_some(), + ); + if !candidates.is_empty() { + face_watch::run_face_detection_pass( + library, + excluded_dirs, + &face_client, + Arc::clone(&face_dao), + Arc::clone(&tag_dao), + candidates, + ); + } + } + + // Check for videos that need HLS playlists + let video_path_base = dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set"); + let mut videos_needing_playlists = Vec::new(); + + for (file_path, _relative_path) in &files { + if file_types::is_video_file(file_path) { + // Construct expected playlist path + let playlist_filename = + format!("{}.m3u8", file_path.file_name().unwrap().to_string_lossy()); + let playlist_path = Path::new(&video_path_base).join(&playlist_filename); + + // Check if playlist needs (re)generation + if playlist_needs_generation(file_path, &playlist_path) { + videos_needing_playlists.push(file_path.clone()); + } + } + } + + // Send queue request to playlist manager + if !videos_needing_playlists.is_empty() { + playlist_manager.do_send(QueueVideosMessage { + video_paths: videos_needing_playlists, + }); + } + + // Check for videos that need preview clips + // Collect (full_path, relative_path) for video files + let video_files: Vec<(String, String)> = files + .iter() + .filter(|(file_path, _)| file_types::is_video_file(file_path)) + .map(|(file_path, rel_path)| (file_path.to_string_lossy().to_string(), rel_path.clone())) + .collect(); + + if !video_files.is_empty() { + // Query DB using relative paths (consistent with how GET/POST handlers store them) + let video_rel_paths: Vec = video_files.iter().map(|(_, rel)| 
rel.clone()).collect(); + + let existing_previews: HashMap = { + let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao"); + match dao.get_previews_batch(&context, &video_rel_paths) { + Ok(clips) => clips + .into_iter() + .map(|clip| (clip.file_path, clip.status)) + .collect(), + Err(e) => { + error!("Error batch querying preview clips: {:?}", e); + HashMap::new() + } + } + }; + + for (full_path, relative_path) in &video_files { + let status = existing_previews.get(relative_path).map(|s| s.as_str()); + let needs_preview = match status { + None => true, // No record at all + Some("failed") => true, // Retry failed + Some("pending") => true, // Stale pending from previous run + _ => false, // processing or complete + }; + + if needs_preview { + // Insert pending record using relative path + if status.is_none() { + let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao"); + let _ = dao.insert_preview(&context, relative_path, "pending"); + } + + // Send full path in the message — the actor will derive relative path from it + preview_generator.do_send(GeneratePreviewClipMessage { + video_path: full_path.clone(), + }); + } + } + } + + // Generate thumbnails for all files that need them + if new_files_found { + info!("Processing thumbnails for new files..."); + thumbnails::create_thumbnails(std::slice::from_ref(library), excluded_dirs); + } + + // Reconciliation: on a full scan, prune image_exif rows whose rel_path no + // longer exists on disk for this library. Keeps the DB in parity so + // downstream DB-backed listings (e.g. recursive /photos) don't return + // phantom files. Skipped on quick scans — those only look at recently + // modified files and can't distinguish "missing" from "unchanged". 
+    if modified_since.is_none() {
+        let disk_paths: HashSet<String> = files.iter().map(|(_, rel)| rel.clone()).collect();
+        let db_paths: Vec<String> = {
+            let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
+            dao.get_rel_paths_for_library(&context, library.id)
+                .unwrap_or_else(|e| {
+                    error!(
+                        "Reconciliation: failed to load image_exif rel_paths for lib {}: {:?}",
+                        library.id, e
+                    );
+                    Vec::new()
+                })
+        };
+
+        let stale: Vec<String> = db_paths
+            .into_iter()
+            .filter(|p| !disk_paths.contains(p))
+            .collect();
+
+        if !stale.is_empty() {
+            info!(
+                "Reconciliation: pruning {} stale image_exif rows for library '{}'",
+                stale.len(),
+                library.name
+            );
+            let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
+            for rel in &stale {
+                if let Err(e) = dao.delete_exif_by_library(&context, library.id, rel) {
+                    warn!(
+                        "Reconciliation: failed to delete {} (lib {}): {:?}",
+                        rel, library.id, e
+                    );
+                }
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    //! Unit tests for `playlist_needs_generation`.
+    //!
+    //! File modification times are set explicitly instead of being separated
+    //! by sleeping between writes, so the tests run instantly and do not
+    //! depend on the timestamp granularity of the filesystem backing the
+    //! temp directory.
+
+    use super::*;
+    use std::fs;
+    use std::time::{Duration as StdDuration, SystemTime};
+    use tempfile::TempDir;
+
+    /// Seconds after the Unix epoch used as the "older" of two mtimes. The
+    /// exact value is arbitrary; only its ordering against `NEWER_SECS`
+    /// matters.
+    const OLDER_SECS: u64 = 1_700_000_000;
+
+    /// One minute after `OLDER_SECS`: a gap wide enough to stay distinct even
+    /// on filesystems that store timestamps with coarse granularity.
+    const NEWER_SECS: u64 = OLDER_SECS + 60;
+
+    /// Writes `contents` to `path`, then pins the file's modification time to
+    /// `secs_after_epoch` seconds after the Unix epoch.
+    ///
+    /// Panics (failing the calling test) if the file cannot be written,
+    /// reopened, or re-stamped.
+    fn write_with_mtime(path: &std::path::Path, contents: &[u8], secs_after_epoch: u64) {
+        fs::write(path, contents).unwrap();
+        let mtime = SystemTime::UNIX_EPOCH + StdDuration::from_secs(secs_after_epoch);
+        // Reopen with write access: changing timestamps through a handle
+        // requires a writable handle on some platforms.
+        fs::OpenOptions::new()
+            .write(true)
+            .open(path)
+            .unwrap()
+            .set_modified(mtime)
+            .unwrap();
+    }
+
+    /// A video with no playlist on disk must be queued for generation.
+    #[test]
+    fn playlist_needs_generation_true_when_playlist_missing() {
+        let tmp = TempDir::new().unwrap();
+        let video = tmp.path().join("clip.mp4");
+        fs::write(&video, b"v").unwrap();
+        let playlist = tmp.path().join("clip.mp4.m3u8");
+        // playlist does not exist
+        assert!(playlist_needs_generation(&video, &playlist));
+    }
+
+    /// A playlist stamped later than its video is up to date.
+    #[test]
+    fn playlist_needs_generation_false_when_playlist_is_newer() {
+        let tmp = TempDir::new().unwrap();
+        let video = tmp.path().join("clip.mp4");
+        write_with_mtime(&video, b"v", OLDER_SECS);
+        let playlist = tmp.path().join("clip.mp4.m3u8");
+        write_with_mtime(&playlist, b"#EXTM3U", NEWER_SECS);
+        assert!(!playlist_needs_generation(&video, &playlist));
+    }
+
+    /// A video stamped later than its playlist means the playlist is stale.
+    #[test]
+    fn playlist_needs_generation_true_when_video_is_newer() {
+        let tmp = TempDir::new().unwrap();
+        let playlist = tmp.path().join("clip.mp4.m3u8");
+        write_with_mtime(&playlist, b"#EXTM3U", OLDER_SECS);
+        let video = tmp.path().join("clip.mp4");
+        write_with_mtime(&video, b"v", NEWER_SECS);
+        assert!(playlist_needs_generation(&video, &playlist));
+    }
+
+    /// When the video's metadata cannot be read, the helper is expected to
+    /// report that generation is needed.
+    #[test]
+    fn playlist_needs_generation_true_when_video_missing_metadata() {
+        // Video doesn't exist; metadata fails for it. Falls through to the
+        // "assume needs regeneration" branch.
+        let tmp = TempDir::new().unwrap();
+        let video = tmp.path().join("missing.mp4");
+        let playlist = tmp.path().join("missing.mp4.m3u8");
+        fs::write(&playlist, b"#EXTM3U").unwrap();
+        assert!(playlist_needs_generation(&video, &playlist));
+    }
+}