Files
ImageApi/src/watcher.rs
Cameron Cordes 32195ed89e clip-search: backlog drain + /photos/search endpoint
Wires the persistence layer for CLIP semantic search. The watcher's
per-tick drain encodes any image_exif row with a known content_hash
but no clip_embedding via Apollo (cap CLIP_BACKLOG_MAX_PER_TICK,
default 32). On a query, /photos/search encodes the text via Apollo
and reranks every stored embedding in-memory.

ExifDao additions:
- list_clip_unencoded_candidates — partial-index scan for drain
- backfill_clip_embedding — touches only the two new columns
- list_clip_index — dedup'd (hash, embedding) pull for search

clip_watch::run_clip_encoding_pass is the parallel fan-out — tokio
runtime per pass with CLIP_ENCODE_CONCURRENCY (default 4). No marker
rows for permanent failures yet; per-tick cap bounds the retry cost.

/photos/search params: q, limit, threshold (default 0.20), library,
model_version. Response is intentionally minimal (path + score) so
the frontend joins against existing photo-metadata routes lazily.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-15 16:10:52 -04:00

1103 lines
47 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
//! Background file-watcher loop + the orphaned-playlist cleanup job.
//!
//! `watch_files` spins a thread that, on every tick (default 60 s
//! quick-scan / 3600 s full-scan), probes each library's availability,
//! drains the unhashed / date / face-detection backlogs via
//! [`crate::backfill`], walks newly-modified files through
//! [`process_new_files`], updates the media-count gauges, and runs the
//! three-stage maintenance pipeline (missing-file scan → back-ref
//! refresh → orphan GC).
//!
//! `cleanup_orphaned_playlists` runs on a slower interval (default 24
//! hours) and reaps HLS playlists whose source videos no longer exist
//! in any library. Both jobs respect [`crate::libraries::LibraryHealthMap`]
//! — a stale library skips destructive paths so transient unmounts
//! don't trigger data loss.
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, SystemTime};
use actix::Addr;
use chrono::Utc;
use log::{debug, error, info, warn};
use crate::backfill;
use crate::content_hash;
use crate::database::models::InsertImageExif;
use crate::database::{ExifDao, PreviewDao, SqliteExifDao, SqlitePreviewDao};
use crate::date_resolver;
use crate::exif;
use crate::face_watch;
use crate::faces;
use crate::file_types;
use crate::hls_stats;
use crate::libraries;
use crate::library_maintenance;
use crate::perceptual_hash;
use crate::tags;
use crate::tags::SqliteTagDao;
use crate::thumbnails;
use crate::video;
use crate::video::actors::{
GeneratePreviewClipMessage, QueueVideosMessage, VideoPlaylistManager, VideoToQueue,
};
use crate::video::hls_paths;
/// Clean up orphaned HLS hash directories under `$VIDEO_PATH` whose
/// content_hash no longer appears in `image_exif`.
///
/// Walks `<video_path>/<shard>/<hash>/` — the layout written by the
/// hash-keyed `PlaylistGenerator` — and deletes any hash directory whose
/// hash isn't in the current DISTINCT set of `image_exif.content_hash`
/// values. Empty shard parents are reaped on the same pass.
///
/// Legacy basename-keyed files at `$VIDEO_PATH` root (from the
/// pre-content-hash layout) are left alone here; the one-shot startup
/// migration is responsible for retiring those.
///
/// `libs_lock` is the shared live view of the libraries table — read at the
/// top of each cleanup pass so a PATCH /libraries/{id} that disables or
/// re-mounts a library is picked up without a restart.
pub fn cleanup_orphaned_playlists(
libs_lock: Arc<RwLock<Vec<libraries::Library>>>,
_excluded_dirs: Vec<String>,
library_health: libraries::LibraryHealthMap,
) {
std::thread::spawn(move || {
let video_path_str = dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set");
let video_path = PathBuf::from(&video_path_str);
// Get cleanup interval from environment (default: 24 hours)
let cleanup_interval_secs = dotenv::var("PLAYLIST_CLEANUP_INTERVAL_SECONDS")
.ok()
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(86400); // 24 hours
info!("Starting orphaned HLS cleanup job");
info!(" Cleanup interval: {} seconds", cleanup_interval_secs);
info!(" HLS directory: {}", video_path.display());
let exif_dao: Arc<Mutex<Box<dyn ExifDao>>> = Arc::new(Mutex::new(Box::new(
SqliteExifDao::new(),
)
as Box<dyn ExifDao>));
loop {
std::thread::sleep(Duration::from_secs(cleanup_interval_secs));
// Fresh snapshot per tick so a PATCH /libraries/{id} that
// disabled a library (or rewrote its excluded_dirs) is
// honoured immediately.
let libs: Vec<libraries::Library> =
libs_lock.read().unwrap_or_else(|e| e.into_inner()).clone();
// Safety gate: skip the cleanup cycle if any (enabled)
// library is stale. With hash-keyed layout the orphan
// decision is a pure DB query, but the upstream
// missing-file scan that *removes* image_exif rows already
// pauses for stale libraries — so a stale tick can hold
// hashes alive that would otherwise have been GC'd. The
// safety is then mostly belt-and-suspenders: a hash that
// should have been retired is just kept one tick longer.
// We'd rather leak a few hash dirs for 24h than wipe a
// hash dir whose source was briefly unreachable.
{
let guard = library_health.read().unwrap_or_else(|e| e.into_inner());
let stale: Vec<String> = libs
.iter()
.filter(|lib| lib.enabled)
.filter(|lib| guard.get(&lib.id).map(|h| !h.is_online()).unwrap_or(false))
.map(|lib| lib.name.clone())
.collect();
if !stale.is_empty() {
warn!(
"Skipping orphaned-HLS cleanup: {} library(ies) stale: [{}]",
stale.len(),
stale.join(", ")
);
continue;
}
}
info!("Running orphaned HLS cleanup");
let start = std::time::Instant::now();
// Snapshot every live content_hash currently in image_exif.
// We intentionally don't filter by library here — a hash that
// lives in any library is alive, even if the library a given
// download attributed it to has since been disabled.
let alive_hashes: HashSet<String> = {
let context = opentelemetry::Context::new();
let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
match dao.list_distinct_content_hashes(&context) {
Ok(hashes) => hashes.into_iter().collect(),
Err(e) => {
error!(
"Failed to load distinct content hashes; skipping HLS cleanup: {:?}",
e
);
continue;
}
}
};
let mut deleted_count = 0usize;
let mut error_count = 0usize;
let mut inspected = 0usize;
// Walk top-level entries of VIDEO_PATH. Each is either a
// legacy basename-keyed `.m3u8` / `.ts` (skip — migration
// owns those) or a 2-char shard directory.
let read_root = match std::fs::read_dir(&video_path) {
Ok(r) => r,
Err(e) => {
error!(
"HLS cleanup: failed to read VIDEO_PATH {}: {}",
video_path.display(),
e
);
continue;
}
};
for shard_entry in read_root.flatten() {
let shard_path = shard_entry.path();
if !shard_entry.file_type().map(|t| t.is_dir()).unwrap_or(false) {
continue;
}
let shard_name = match shard_path.file_name().and_then(|n| n.to_str()) {
Some(n) => n.to_owned(),
None => continue,
};
if !is_hash_shard(&shard_name) {
continue;
}
// Hash dirs inside this shard.
let read_shard = match std::fs::read_dir(&shard_path) {
Ok(r) => r,
Err(e) => {
warn!(
"HLS cleanup: failed to read shard {}: {}",
shard_path.display(),
e
);
continue;
}
};
let mut shard_emptied = true;
for hash_entry in read_shard.flatten() {
let hash_path = hash_entry.path();
if !hash_entry.file_type().map(|t| t.is_dir()).unwrap_or(false) {
shard_emptied = false;
continue;
}
let Some(hash_name) = hash_path
.file_name()
.and_then(|n| n.to_str())
.map(|n| n.to_owned())
else {
shard_emptied = false;
continue;
};
if !is_full_hash(&hash_name) {
shard_emptied = false;
continue;
}
inspected += 1;
if alive_hashes.contains(&hash_name) {
shard_emptied = false;
continue;
}
debug!(
"HLS cleanup: removing orphan hash dir {}",
hash_path.display()
);
match std::fs::remove_dir_all(&hash_path) {
Ok(()) => deleted_count += 1,
Err(e) => {
warn!(
"Failed to delete orphan hash dir {}: {}",
hash_path.display(),
e
);
error_count += 1;
shard_emptied = false;
}
}
}
// If this shard now has no surviving hash dirs, reap
// the (empty) shard dir too. remove_dir fails if non-
// empty, which is the guard.
if shard_emptied {
let _ = std::fs::remove_dir(&shard_path);
}
}
info!(
"Orphaned HLS cleanup completed in {:?}: inspected {} hash dirs, deleted {} orphans, {} errors",
start.elapsed(),
inspected,
deleted_count,
error_count
);
}
});
}
/// True iff `s` is a two-character lowercase-hex shard prefix.
fn is_hash_shard(s: &str) -> bool {
s.len() == 2 && s.bytes().all(|b| b.is_ascii_hexdigit())
}
/// True iff `s` looks like a full blake3 hex digest (64 hex chars).
/// Be strict so we don't accidentally rm a non-HLS directory operators
/// have stashed under VIDEO_PATH.
fn is_full_hash(s: &str) -> bool {
s.len() == 64 && s.bytes().all(|b| b.is_ascii_hexdigit())
}
pub fn watch_files(
libs_lock: Arc<RwLock<Vec<libraries::Library>>>,
playlist_manager: Addr<VideoPlaylistManager>,
preview_generator: Addr<video::actors::PreviewClipGenerator>,
face_client: crate::ai::face_client::FaceClient,
clip_client: crate::ai::clip_client::ClipClient,
excluded_dirs: Vec<String>,
library_health: libraries::LibraryHealthMap,
) {
std::thread::spawn(move || {
// Get polling intervals from environment variables
// Quick scan: Check recently modified files (default: 60 seconds)
let quick_interval_secs = dotenv::var("WATCH_QUICK_INTERVAL_SECONDS")
.ok()
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(60);
// Full scan: Check all files regardless of modification time (default: 3600 seconds = 1 hour)
let full_interval_secs = dotenv::var("WATCH_FULL_INTERVAL_SECONDS")
.ok()
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(3600);
info!("Starting optimized file watcher");
info!(" Quick scan interval: {} seconds", quick_interval_secs);
info!(" Full scan interval: {} seconds", full_interval_secs);
// Surface face-detection state at boot so it's obvious whether
// the watcher will hit Apollo. The branch silently no-ops when
// disabled (intentional for legacy deploys), which makes "why
// aren't faces being detected?" hard to diagnose otherwise.
if face_client.is_enabled() {
info!(" Face detection: ENABLED");
} else {
info!(
" Face detection: DISABLED (set APOLLO_FACE_API_BASE_URL \
or APOLLO_API_BASE_URL to enable)"
);
}
if clip_client.is_enabled() {
info!(" CLIP semantic search: ENABLED");
} else {
info!(
" CLIP semantic search: DISABLED (set APOLLO_CLIP_API_BASE_URL \
or APOLLO_API_BASE_URL to enable)"
);
}
{
let libs = libs_lock.read().unwrap_or_else(|e| e.into_inner());
for lib in libs.iter() {
info!(
" Watching library '{}' (id={}) at {}",
lib.name, lib.id, lib.root_path
);
}
}
// Create DAOs for tracking processed files
let exif_dao = Arc::new(Mutex::new(
Box::new(SqliteExifDao::new()) as Box<dyn ExifDao>
));
let preview_dao = Arc::new(Mutex::new(
Box::new(SqlitePreviewDao::new()) as Box<dyn PreviewDao>
));
let face_dao = Arc::new(Mutex::new(
Box::new(faces::SqliteFaceDao::new()) as Box<dyn faces::FaceDao>
));
// tag_dao for the watcher's auto-bind path. Independent of the
// request-handler tag_dao instance — both end up pointing at the
// same SQLite file via SqliteTagDao::default().
let watcher_tag_dao = Arc::new(Mutex::new(
Box::new(SqliteTagDao::default()) as Box<dyn tags::TagDao>
));
let mut last_quick_scan = SystemTime::now();
// Initialize to UNIX_EPOCH so the *first* tick is treated as a
// full scan. That replaces the legacy startup ScanDirectoryMessage
// walk for HLS playlists: every library's existing media gets
// checked once at watcher boot, instead of waiting up to
// full_interval_secs (1h default) for the first natural full scan.
let mut last_full_scan = SystemTime::UNIX_EPOCH;
let mut scan_count = 0u64;
// Per-library cursor for the missing-file scan. Each tick reads
// a page from `offset`, stat()s the rows, deletes confirmed-
// missing ones, and advances or wraps the cursor. State held
// in-memory so a watcher restart resumes from 0 — fine, the
// sweep is idempotent.
let mut missing_file_offsets: HashMap<i32, i64> = HashMap::new();
let missing_scan_page_size: i64 = dotenv::var("IMAGE_EXIF_MISSING_SCAN_PAGE_SIZE")
.ok()
.and_then(|s| s.parse().ok())
.filter(|n: &i64| *n > 0)
.unwrap_or(library_maintenance::DEFAULT_SCAN_PAGE_SIZE);
let missing_delete_cap: usize = dotenv::var("IMAGE_EXIF_MISSING_DELETE_CAP_PER_TICK")
.ok()
.and_then(|s| s.parse().ok())
.filter(|n: &usize| *n > 0)
.unwrap_or(library_maintenance::DEFAULT_MISSING_DELETE_CAP);
// Two-tick orphan-GC consensus state. Carried across ticks via
// `OrphanGcState`; see library_maintenance::run_orphan_gc.
let mut orphan_gc_state = library_maintenance::OrphanGcState::default();
// Initial availability sweep before the loop's first sleep so
// /libraries reports the truth from the very first request,
// rather than the optimistic Online default that
// new_health_map seeds. Without this, an unmounted share would
// appear online for up to WATCH_QUICK_INTERVAL_SECONDS (default
// 60s) after boot. Same probe logic as the per-tick gate
// below; no ingest runs here, just the health update + log.
// Disabled libraries skip the probe entirely — they should
// never enter the health map (treated as out-of-scope).
{
let libs = libs_lock.read().unwrap_or_else(|e| e.into_inner());
for lib in libs.iter() {
if !lib.enabled {
continue;
}
let context = opentelemetry::Context::new();
let had_data = exif_dao
.lock()
.expect("exif_dao poisoned")
.count_for_library(&context, lib.id)
.map(|n| n > 0)
.unwrap_or(false);
libraries::refresh_health(&library_health, lib, had_data);
}
}
loop {
std::thread::sleep(Duration::from_secs(quick_interval_secs));
let now = SystemTime::now();
let since_last_full = now
.duration_since(last_full_scan)
.unwrap_or(Duration::from_secs(0));
let is_full_scan = since_last_full.as_secs() >= full_interval_secs;
// Fresh snapshot per tick — picks up PATCH /libraries/{id}
// mutations to `enabled` / `excluded_dirs` without restart.
let libs: Vec<libraries::Library> =
libs_lock.read().unwrap_or_else(|e| e.into_inner()).clone();
for lib in &libs {
// Operator kill switch: a disabled library is invisible
// to the watcher entirely. No probe, no ingest, no
// maintenance, no health entry. Distinct from Stale —
// Stale is "we wanted to but couldn't"; Disabled is
// "we don't want to". Toggle via SQL.
if !lib.enabled {
debug!(
"watcher: skipping library '{}' (id={}) — enabled=false",
lib.name, lib.id
);
continue;
}
// Availability probe: every tick checks that the
// library's mount is reachable, is a directory, is
// readable, and (if image_exif has rows for it) is
// non-empty. A Stale library skips ingest, backlog
// drains, and metric refresh — reads/serving in HTTP
// handlers continue to work. Branches B/C extend the
// probe gate to cover handoff and orphan GC. See
// CLAUDE.md "Library availability and safety".
let had_data = {
let context = opentelemetry::Context::new();
let mut guard = exif_dao.lock().expect("exif_dao poisoned");
guard
.count_for_library(&context, lib.id)
.map(|n| n > 0)
.unwrap_or(false)
};
let health = libraries::refresh_health(&library_health, lib, had_data);
if !health.is_online() {
// Skip every write path for this library this tick.
// Don't refresh the media-count gauge either — a
// probe-failed library would otherwise flap to 0
// image / 0 video and pollute Prometheus.
continue;
}
// Drain the unhashed-hash backlog AND the face-detection
// backlog every tick, regardless of quick/full. Quick
// scans only walk recently-modified files, so the
// pre-Phase-3 backlog never enters their candidate set
// — without these standalone passes, backfill +
// detection only progressed during full scans
// (default once an hour).
// Effective excludes for this library: global env-var
// row's excluded_dirs. Compute once per tick — used
// by every walker below for this library.
let effective_excludes = lib.effective_excluded_dirs(&excluded_dirs);
if face_client.is_enabled() {
let context = opentelemetry::Context::new();
backfill::backfill_unhashed_backlog(&context, lib, &exif_dao);
backfill::process_face_backlog(
&context,
lib,
&face_client,
&face_dao,
&watcher_tag_dao,
&effective_excludes,
);
}
// CLIP embedding backlog. Independent of face detection —
// drain runs whenever CLIP is enabled, even on deploys
// that don't have the face engine wired up. Mirrors the
// face drain shape (capped per tick, no-op when disabled).
if clip_client.is_enabled() {
let context = opentelemetry::Context::new();
backfill::process_clip_backlog(
&context,
lib,
&clip_client,
&exif_dao,
&effective_excludes,
);
}
// Date-taken backfill: drain rows whose canonical date is
// either unresolved or only fs_time-sourced. Independent
// of face detection — runs even on deploys that don't
// configure Apollo, since `/memories` depends on it.
{
let context = opentelemetry::Context::new();
backfill::backfill_missing_date_taken(&context, lib, &exif_dao);
}
if is_full_scan {
info!(
"Running full scan for library '{}' (scan #{})",
lib.name, scan_count
);
process_new_files(
lib,
Arc::clone(&exif_dao),
Arc::clone(&preview_dao),
Arc::clone(&face_dao),
Arc::clone(&watcher_tag_dao),
face_client.clone(),
&effective_excludes,
None,
playlist_manager.clone(),
preview_generator.clone(),
);
} else {
debug!(
"Running quick scan for library '{}' (checking files modified in last {} seconds)",
lib.name,
quick_interval_secs + 10
);
let check_since = last_quick_scan
.checked_sub(Duration::from_secs(10))
.unwrap_or(last_quick_scan);
process_new_files(
lib,
Arc::clone(&exif_dao),
Arc::clone(&preview_dao),
Arc::clone(&face_dao),
Arc::clone(&watcher_tag_dao),
face_client.clone(),
&effective_excludes,
Some(check_since),
playlist_manager.clone(),
preview_generator.clone(),
);
}
// Update media counts per library (metric aggregates across all)
thumbnails::update_media_counts(Path::new(&lib.root_path), &effective_excludes);
// Missing-file detection: prune image_exif rows whose
// source file is no longer on disk. Per-library, so we
// pass library-online-this-tick implicitly (we only
// reach here if the probe gate at the top of the
// iteration passed). Capped + paginated so a huge
// library doesn't stall the watcher; rows we don't
// visit this tick get visited next tick. See
// library_maintenance::detect_missing_files_for_library.
{
let context = opentelemetry::Context::new();
let offset = missing_file_offsets.get(&lib.id).copied().unwrap_or(0);
let (deleted, next_offset) =
library_maintenance::detect_missing_files_for_library(
&context,
lib,
&exif_dao,
offset,
missing_scan_page_size,
missing_delete_cap,
);
missing_file_offsets.insert(lib.id, next_offset);
if deleted > 0 {
debug!(
"missing-file scan: library '{}' next_offset={}",
lib.name, next_offset
);
}
}
}
// Reconciliation: cross-library, so it runs once per tick
// outside the per-library loop. Idempotent — fast no-op when
// there's nothing to do. Operates on the database alone, no
// filesystem dependency, so it doesn't need a health gate.
// See database::reconcile and CLAUDE.md "Multi-library data
// model" for the rules.
{
let mut conn = image_api::database::connect();
let _ = image_api::database::reconcile::run(&mut conn);
// Back-ref refresh: hash-keyed rows whose
// (library_id, rel_path) tuple no longer matches any
// image_exif row but whose hash still does. After a
// recent→archive move, the missing-file scan removes
// the old image_exif row; this pass repoints face /
// tag / insight back-refs at the surviving location.
// DB-only, no health gate needed — uses what's in
// image_exif as truth.
let _ = library_maintenance::refresh_back_refs(&mut conn);
// Orphan GC: the destructive end of the maintenance
// pipeline. Two-tick consensus + every-library-online
// requirement is enforced inside run_orphan_gc; we
// pass the current all-online flag and the function
// tracks the previous tick's flag in OrphanGcState.
let all_online = library_maintenance::all_libraries_online(&libs, &library_health);
let _ =
library_maintenance::run_orphan_gc(&mut conn, &mut orphan_gc_state, all_online);
}
if is_full_scan {
// End-of-full-scan HLS readiness summary: log a single
// info line + refresh the Prometheus gauges. Skipped on
// quick scans because the cost is non-trivial on big
// libraries and the data only meaningfully changes on
// full passes.
let video_dir_str = dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set");
let stats =
hls_stats::compute_and_publish(&libs, &exif_dao, Path::new(&video_dir_str));
hls_stats::log_summary(&stats);
last_full_scan = now;
}
last_quick_scan = now;
scan_count += 1;
}
});
}
/// Check if a playlist needs to be (re)generated.
///
/// Returns true if:
/// - Playlist doesn't exist, OR
/// - Source video is newer than the playlist
///
/// When metadata for either path is unreadable, returns true so the
/// caller errs on the side of regeneration (a redundant transcode
/// beats a stale playlist).
pub fn playlist_needs_generation(video_path: &Path, playlist_path: &Path) -> bool {
if !playlist_path.exists() {
return true;
}
// Check if source video is newer than playlist
if let (Ok(video_meta), Ok(playlist_meta)) = (
std::fs::metadata(video_path),
std::fs::metadata(playlist_path),
) && let (Ok(video_modified), Ok(playlist_modified)) =
(video_meta.modified(), playlist_meta.modified())
{
return video_modified > playlist_modified;
}
// If we can't determine, assume it needs generation
true
}
pub fn process_new_files(
library: &libraries::Library,
exif_dao: Arc<Mutex<Box<dyn ExifDao>>>,
preview_dao: Arc<Mutex<Box<dyn PreviewDao>>>,
face_dao: Arc<Mutex<Box<dyn faces::FaceDao>>>,
tag_dao: Arc<Mutex<Box<dyn tags::TagDao>>>,
face_client: crate::ai::face_client::FaceClient,
excluded_dirs: &[String],
modified_since: Option<SystemTime>,
playlist_manager: Addr<VideoPlaylistManager>,
preview_generator: Addr<video::actors::PreviewClipGenerator>,
) {
let context = opentelemetry::Context::new();
let thumbs = dotenv::var("THUMBNAILS").expect("THUMBNAILS not defined");
let thumbnail_directory = Path::new(&thumbs);
let base_path = Path::new(&library.root_path);
// Walk, prune EXCLUDED_DIRS subtrees, and apply image/video + modified_since
// filters. See `file_scan` for why exclusion has to happen at WalkDir
// time (filter_entry) rather than at face-detect time.
let files: Vec<(PathBuf, String)> =
image_api::file_scan::enumerate_indexable_files(base_path, excluded_dirs, modified_since);
if files.is_empty() {
debug!("No files to process");
return;
}
debug!("Found {} files to check", files.len());
// Batch query: Get all EXIF data for these files in one query
let file_paths: Vec<String> = files.iter().map(|(_, rel_path)| rel_path.clone()).collect();
// Map of rel_path -> Option<content_hash>. The presence of the key
// tells us "row exists"; the Option value carries the hash for the
// HLS pipeline so video files without a hash (mid-backfill) skip
// this tick rather than fall back to a basename-colliding playlist.
let existing_exif: HashMap<String, Option<String>> = {
let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
// Walk is per-library, so scope the lookup so a same-named file
// in another library doesn't make this one look already-indexed.
match dao.get_exif_batch(&context, Some(library.id), &file_paths) {
Ok(exif_records) => exif_records
.into_iter()
.map(|record| (record.file_path, record.content_hash))
.collect(),
Err(e) => {
error!("Error batch querying EXIF data: {:?}", e);
HashMap::new()
}
}
};
let mut new_files_found = false;
let mut files_needing_row = Vec::new();
// Register every image/video file in image_exif. Rows without EXIF
// still carry library_id, rel_path, content_hash, and size_bytes so
// derivative dedup and DB-indexed sort/filter work for every file,
// not just photos with parseable EXIF.
for (file_path, relative_path) in &files {
// Check both the library-scoped legacy path (current shape) and
// the bare-legacy path (pre-multi-library shape). Either one
// existing means a thumbnail is already on disk for this file.
let scoped_thumb_path = content_hash::library_scoped_legacy_path(
thumbnail_directory,
library.id,
relative_path,
);
let bare_legacy_thumb_path = thumbnail_directory.join(relative_path);
let needs_thumbnail = !scoped_thumb_path.exists()
&& !bare_legacy_thumb_path.exists()
&& !thumbnails::unsupported_thumbnail_sentinel(&scoped_thumb_path).exists()
&& !thumbnails::unsupported_thumbnail_sentinel(&bare_legacy_thumb_path).exists();
let needs_row = !existing_exif.contains_key(relative_path);
if needs_thumbnail || needs_row {
new_files_found = true;
if needs_thumbnail {
info!("New file detected (missing thumbnail): {}", relative_path);
}
if needs_row {
files_needing_row.push((file_path.clone(), relative_path.clone()));
}
}
}
if !files_needing_row.is_empty() {
info!(
"Registering {} new files in image_exif",
files_needing_row.len()
);
for (file_path, relative_path) in files_needing_row {
let timestamp = Utc::now().timestamp();
// Hash + size from filesystem metadata — always attempted so
// every file gets a content_hash, even when EXIF is absent.
let (content_hash, size_bytes) = match content_hash::compute(&file_path) {
Ok(id) => (Some(id.content_hash), Some(id.size_bytes)),
Err(e) => {
warn!("Failed to hash {}: {:?}", file_path.display(), e);
(None, None)
}
};
// Perceptual hashes (pHash + dHash). Best-effort — None for
// videos and decode failures. Drives near-duplicate detection
// in the Apollo duplicates surface; failure here is non-fatal
// and never blocks indexing.
let perceptual = perceptual_hash::compute(&file_path);
// EXIF is best-effort enrichment. When extraction fails (or the
// file type doesn't support EXIF) we still store a row with all
// EXIF fields NULL; the file remains visible to sort-by-date
// and tag queries via its rel_path and filesystem timestamps.
let exif_fields = if exif::supports_exif(&file_path) {
match exif::extract_exif_from_path(&file_path) {
Ok(data) => Some(data),
Err(e) => {
debug!(
"No EXIF or parse error for {}: {:?}",
file_path.display(),
e
);
None
}
}
} else {
None
};
// Canonical date_taken via the waterfall — kamadak-exif (already
// computed above) → exiftool fallback for videos / MakerNote /
// QuickTime → filename regex → earliest_fs_time. Source is
// recorded so the per-tick backfill drain can re-run weak
// resolutions later.
let resolved_date = date_resolver::resolve_date_taken(
&file_path,
exif_fields.as_ref().and_then(|e| e.date_taken),
);
let insert_exif = InsertImageExif {
library_id: library.id,
file_path: relative_path.clone(),
camera_make: exif_fields.as_ref().and_then(|e| e.camera_make.clone()),
camera_model: exif_fields.as_ref().and_then(|e| e.camera_model.clone()),
lens_model: exif_fields.as_ref().and_then(|e| e.lens_model.clone()),
width: exif_fields.as_ref().and_then(|e| e.width),
height: exif_fields.as_ref().and_then(|e| e.height),
orientation: exif_fields.as_ref().and_then(|e| e.orientation),
gps_latitude: exif_fields
.as_ref()
.and_then(|e| e.gps_latitude.map(|v| v as f32)),
gps_longitude: exif_fields
.as_ref()
.and_then(|e| e.gps_longitude.map(|v| v as f32)),
gps_altitude: exif_fields
.as_ref()
.and_then(|e| e.gps_altitude.map(|v| v as f32)),
focal_length: exif_fields
.as_ref()
.and_then(|e| e.focal_length.map(|v| v as f32)),
aperture: exif_fields
.as_ref()
.and_then(|e| e.aperture.map(|v| v as f32)),
shutter_speed: exif_fields.as_ref().and_then(|e| e.shutter_speed.clone()),
iso: exif_fields.as_ref().and_then(|e| e.iso),
date_taken: resolved_date.map(|r| r.timestamp),
created_time: timestamp,
last_modified: timestamp,
content_hash,
size_bytes,
phash_64: perceptual.map(|h| h.phash_64),
dhash_64: perceptual.map(|h| h.dhash_64),
date_taken_source: resolved_date.map(|r| r.source.as_str().to_string()),
};
let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
if let Err(e) = dao.store_exif(&context, insert_exif) {
error!(
"Failed to register {} in image_exif: {:?}",
relative_path, e
);
} else {
debug!("Registered {} in image_exif", relative_path);
}
}
}
// ── Face detection pass ────────────────────────────────────────────
// Run after EXIF writes so newly-registered files have their
// content_hash populated. Skipped wholesale when face_client is
// disabled (no Apollo integration configured) — Phase 3 wires this
// up; the watcher remains usable on legacy deploys.
if face_client.is_enabled() {
// Opportunistic content_hash backfill: photos indexed before
// content-hashing landed (or where the hash compute failed
// silently on insert) end up in image_exif with NULL
// content_hash. build_face_candidates keys on content_hash, so
// those files would never become candidates without backfill.
// Idempotent — subsequent scans see the populated hashes and
// no-op. The dedicated `backfill_hashes` binary is still the
// right tool for very large legacy libraries; this branch
// ensures small/medium deploys self-heal without operator
// action.
backfill::backfill_missing_content_hashes(&context, &files, library, &exif_dao);
let candidates =
backfill::build_face_candidates(&context, library, &files, &exif_dao, &face_dao);
debug!(
"face_watch: scan tick — {} image file(s) walked, {} candidate(s) (library '{}', modified_since={})",
files
.iter()
.filter(|(p, _)| !file_types::is_video_file(p))
.count(),
candidates.len(),
library.name,
modified_since.is_some(),
);
if !candidates.is_empty() {
face_watch::run_face_detection_pass(
library,
excluded_dirs,
&face_client,
Arc::clone(&face_dao),
Arc::clone(&tag_dao),
candidates,
);
}
}
// Check for videos that need HLS playlists. All output is keyed on
// `content_hash` (see `crate::video::hls_paths`), so files whose
// `image_exif.content_hash` is still NULL — typically mid-backfill —
// are skipped this tick and picked up after the unhashed backlog
// drain populates the hash on a subsequent tick. Skipping is the
// correct call: queuing without a hash would either fall back to
// basename keying (the bug this refactor fixes) or fabricate one.
let video_path_base = dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set");
let video_dir = Path::new(&video_path_base);
let mut videos_needing_playlists: Vec<VideoToQueue> = Vec::new();
let mut hashless_video_count = 0usize;
for (file_path, relative_path) in &files {
if !file_types::is_video_file(file_path) {
continue;
}
let Some(hash) = existing_exif.get(relative_path).and_then(|h| h.clone()) else {
hashless_video_count += 1;
continue;
};
let playlist_path = hls_paths::playlist_for_hash(video_dir, &hash);
if playlist_needs_generation(file_path, &playlist_path) {
videos_needing_playlists.push(VideoToQueue {
video_path: file_path.clone(),
content_hash: hash,
});
}
}
if hashless_video_count > 0 {
debug!(
"Watcher tick for '{}': skipped {} video(s) with NULL content_hash (will retry after backfill)",
library.name, hashless_video_count
);
}
if !videos_needing_playlists.is_empty() {
playlist_manager.do_send(QueueVideosMessage {
videos: videos_needing_playlists,
});
}
// Check for videos that need preview clips
// Collect (full_path, relative_path) for video files
let video_files: Vec<(String, String)> = files
.iter()
.filter(|(file_path, _)| file_types::is_video_file(file_path))
.map(|(file_path, rel_path)| (file_path.to_string_lossy().to_string(), rel_path.clone()))
.collect();
if !video_files.is_empty() {
// Query DB using relative paths (consistent with how GET/POST handlers store them)
let video_rel_paths: Vec<String> = video_files.iter().map(|(_, rel)| rel.clone()).collect();
let existing_previews: HashMap<String, String> = {
let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao");
match dao.get_previews_batch(&context, &video_rel_paths) {
Ok(clips) => clips
.into_iter()
.map(|clip| (clip.file_path, clip.status))
.collect(),
Err(e) => {
error!("Error batch querying preview clips: {:?}", e);
HashMap::new()
}
}
};
for (full_path, relative_path) in &video_files {
let status = existing_previews.get(relative_path).map(|s| s.as_str());
let needs_preview = match status {
None => true, // No record at all
Some("failed") => true, // Retry failed
Some("pending") => true, // Stale pending from previous run
_ => false, // processing or complete
};
if needs_preview {
// Insert pending record using relative path
if status.is_none() {
let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao");
let _ = dao.insert_preview(&context, relative_path, "pending");
}
// Send full path in the message — the actor will derive relative path from it
preview_generator.do_send(GeneratePreviewClipMessage {
video_path: full_path.clone(),
});
}
}
}
// Generate thumbnails for all files that need them
if new_files_found {
info!("Processing thumbnails for new files...");
thumbnails::create_thumbnails(std::slice::from_ref(library), excluded_dirs);
}
// Reconciliation: on a full scan, prune image_exif rows whose rel_path no
// longer exists on disk for this library. Keeps the DB in parity so
// downstream DB-backed listings (e.g. recursive /photos) don't return
// phantom files. Skipped on quick scans — those only look at recently
// modified files and can't distinguish "missing" from "unchanged".
if modified_since.is_none() {
let disk_paths: HashSet<String> = files.iter().map(|(_, rel)| rel.clone()).collect();
let db_paths: Vec<String> = {
let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
dao.get_rel_paths_for_library(&context, library.id)
.unwrap_or_else(|e| {
error!(
"Reconciliation: failed to load image_exif rel_paths for lib {}: {:?}",
library.id, e
);
Vec::new()
})
};
let stale: Vec<String> = db_paths
.into_iter()
.filter(|p| !disk_paths.contains(p))
.collect();
if !stale.is_empty() {
info!(
"Reconciliation: pruning {} stale image_exif rows for library '{}'",
stale.len(),
library.name
);
let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
for rel in &stale {
if let Err(e) = dao.delete_exif_by_library(&context, library.id, rel) {
warn!(
"Reconciliation: failed to delete {} (lib {}): {:?}",
rel, library.id, e
);
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::fs;
use std::thread::sleep;
use std::time::Duration as StdDuration;
use tempfile::TempDir;
#[test]
fn playlist_needs_generation_true_when_playlist_missing() {
let tmp = TempDir::new().unwrap();
let video = tmp.path().join("clip.mp4");
fs::write(&video, b"v").unwrap();
let playlist = tmp.path().join("clip.mp4.m3u8");
// playlist does not exist
assert!(playlist_needs_generation(&video, &playlist));
}
#[test]
fn playlist_needs_generation_false_when_playlist_is_newer() {
let tmp = TempDir::new().unwrap();
let video = tmp.path().join("clip.mp4");
fs::write(&video, b"v").unwrap();
// Sleep to guarantee a distinct mtime for the playlist created next.
// Many filesystems have ~10 ms mtime resolution; 50 ms is plenty.
sleep(StdDuration::from_millis(50));
let playlist = tmp.path().join("clip.mp4.m3u8");
fs::write(&playlist, b"#EXTM3U").unwrap();
assert!(!playlist_needs_generation(&video, &playlist));
}
#[test]
fn playlist_needs_generation_true_when_video_is_newer() {
let tmp = TempDir::new().unwrap();
let playlist = tmp.path().join("clip.mp4.m3u8");
fs::write(&playlist, b"#EXTM3U").unwrap();
sleep(StdDuration::from_millis(50));
let video = tmp.path().join("clip.mp4");
fs::write(&video, b"v").unwrap();
assert!(playlist_needs_generation(&video, &playlist));
}
#[test]
fn is_hash_shard_accepts_only_two_hex_chars() {
assert!(is_hash_shard("ab"));
assert!(is_hash_shard("00"));
assert!(is_hash_shard("FF")); // ASCII hexdigit covers upper-case too
assert!(!is_hash_shard("a"));
assert!(!is_hash_shard("abc"));
assert!(!is_hash_shard("zz"));
assert!(!is_hash_shard(""));
assert!(!is_hash_shard("a/"));
}
#[test]
fn is_full_hash_accepts_only_64_hex_chars() {
let h64 = "a".repeat(64);
assert!(is_full_hash(&h64));
let mixed = format!("ab{}", "0".repeat(62));
assert!(is_full_hash(&mixed));
assert!(!is_full_hash(&"a".repeat(63)));
assert!(!is_full_hash(&"a".repeat(65)));
assert!(!is_full_hash(&format!("z{}", "a".repeat(63))));
// Defends against operator stashing e.g. ".tmp" or "Plex" under
// VIDEO_PATH — neither passes the full-hash gate.
assert!(!is_full_hash(".tmp"));
assert!(!is_full_hash("Plex"));
}
#[test]
fn playlist_needs_generation_true_when_video_missing_metadata() {
// Video doesn't exist; metadata fails for it. Falls through to the
// "assume needs regeneration" branch.
let tmp = TempDir::new().unwrap();
let video = tmp.path().join("missing.mp4");
let playlist = tmp.path().join("missing.mp4.m3u8");
fs::write(&playlist, b"#EXTM3U").unwrap();
assert!(playlist_needs_generation(&video, &playlist));
}
}