diff --git a/src/face_watch.rs b/src/face_watch.rs new file mode 100644 index 0000000..87d0103 --- /dev/null +++ b/src/face_watch.rs @@ -0,0 +1,389 @@ +//! Face-detection pass for the file watcher. +//! +//! `process_new_files` calls [`run_face_detection_pass`] after the EXIF +//! registration loop. We walk the candidates (images, not yet face-scanned, +//! not excluded by EXCLUDED_DIRS), fan out parallel detect calls to Apollo, +//! and persist the results — detected faces, `no_faces` markers when Apollo +//! found nothing, `failed` markers on permanent decode errors, no marker on +//! transient failures so the next scan retries. +//! +//! The watcher runs in a plain `std::thread`, so we build a short-lived +//! tokio runtime per pass and `block_on` a join of K detect futures. K is +//! configurable via `FACE_DETECT_CONCURRENCY` (default 8). Apollo's +//! threadpool is bounded to 1–2 workers anyway, so the runs queue +//! server-side; the client-side fan-out is purely about overlapping IO +//! (file read + JSON encode) with someone else's inference. + +use crate::ai::face_client::{DetectMeta, FaceClient, FaceDetectError}; +use crate::exif; +use crate::faces::{FaceDao, InsertFaceDetectionInput}; +use crate::file_types; +use crate::libraries::Library; +use crate::memories::PathExcluder; +use log::{debug, info, warn}; +use std::path::Path; +use std::sync::{Arc, Mutex}; +use tokio::sync::Semaphore; + +/// One file the watcher would like to face-scan. Built by the caller from +/// the EXIF batch (we need `content_hash` to key everything against). +#[derive(Debug, Clone)] +pub struct FaceCandidate { + pub rel_path: String, + pub content_hash: String, +} + +/// Synchronous entry point. Returns once every candidate has been +/// processed (or definitively skipped). When `face_client.is_enabled()` +/// is false this is a no-op so the watcher can call unconditionally. +pub fn run_face_detection_pass( + library: &Library, + excluded_dirs: &[String], + face_client: &FaceClient, + face_dao: Arc>>, + candidates: Vec, +) { + if !face_client.is_enabled() { + return; + } + if candidates.is_empty() { + return; + } + + let base = Path::new(&library.root_path); + let filtered = filter_excluded(base, excluded_dirs, candidates, Some(&library.name)); + if filtered.is_empty() { + return; + } + + let concurrency: usize = std::env::var("FACE_DETECT_CONCURRENCY") + .ok() + .and_then(|s| s.parse().ok()) + .filter(|n: &usize| *n > 0) + .unwrap_or(8); + + info!( + "face_watch: running detection on {} candidates (library '{}', concurrency {})", + filtered.len(), + library.name, + concurrency + ); + + // Per-pass tokio runtime. The watcher thread isn't in any pre-existing + // async context — building one here keeps the rest of the watcher + // sync-only. Worker count is small; the parallelism we care about is + // task-level (semaphore) not thread-level. + let rt = match tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_all() + .build() + { + Ok(rt) => rt, + Err(e) => { + warn!("face_watch: failed to build tokio runtime: {e}"); + return; + } + }; + + let library_id = library.id; + let library_root = library.root_path.clone(); + rt.block_on(async move { + let sem = Arc::new(Semaphore::new(concurrency)); + let mut handles = Vec::with_capacity(filtered.len()); + for cand in filtered { + let permit_sem = sem.clone(); + let face_client = face_client.clone(); + let face_dao = face_dao.clone(); + let library_root = library_root.clone(); + handles.push(tokio::spawn(async move { + // acquire_owned would let us drop the permit explicitly + // before await points; for a one-shot call into Apollo + // the simpler bounded acquire is enough. + let _permit = permit_sem.acquire().await.expect("face semaphore"); + process_one(library_id, &library_root, cand, &face_client, face_dao).await; + })); + } + for h in handles { + // join; per-task panics are logged inside process_one before + // they reach here, so we don't propagate. + let _ = h.await; + } + }); +} + +async fn process_one( + library_id: i32, + library_root: &str, + cand: FaceCandidate, + face_client: &FaceClient, + face_dao: Arc>>, +) { + let abs = Path::new(library_root).join(&cand.rel_path); + // Read the bytes off disk in a blocking-friendly task. Filesystem IO + // is sync but cheap; a small spawn_blocking would be overkill. + let bytes = match read_image_bytes_for_detect(&abs) { + Ok(b) => b, + Err(e) => { + // Don't mark — file may have been moved/renamed mid-scan; let + // the next pass try again. Future-bug check: a permanently + // unreadable file would loop forever; we accept that for v1 + // because process_new_files already prunes vanished rows on + // full scans. + warn!( + "face_watch: read failed for {} ({}): {}", + cand.rel_path, library_id, e + ); + return; + } + }; + + let meta = DetectMeta { + content_hash: cand.content_hash.clone(), + library_id, + rel_path: cand.rel_path.clone(), + orientation: None, + model_version: None, + }; + let ctx = opentelemetry::Context::current(); + + match face_client.detect(bytes, meta).await { + Ok(resp) => { + // Hold the dao lock only across the synchronous DB writes. + let mut dao = face_dao.lock().expect("face dao"); + if resp.faces.is_empty() { + if let Err(e) = dao.mark_status( + &ctx, + library_id, + &cand.content_hash, + &cand.rel_path, + "no_faces", + &resp.model_version, + ) { + warn!( + "face_watch: mark no_faces failed for {}: {:?}", + cand.rel_path, e + ); + } + debug!( + "face_watch: {} → no faces (model {})", + cand.rel_path, resp.model_version + ); + } else { + let face_count = resp.faces.len(); + for face in &resp.faces { + let emb = match face.decode_embedding() { + Ok(b) => b, + Err(e) => { + warn!("face_watch: bad embedding for {}: {:?}", cand.rel_path, e); + continue; + } + }; + if let Err(e) = dao.store_detection( + &ctx, + InsertFaceDetectionInput { + library_id, + content_hash: cand.content_hash.clone(), + rel_path: cand.rel_path.clone(), + bbox: Some((face.bbox.x, face.bbox.y, face.bbox.w, face.bbox.h)), + embedding: Some(emb), + confidence: Some(face.confidence), + source: "auto".to_string(), + person_id: None, + status: "detected".to_string(), + model_version: resp.model_version.clone(), + }, + ) { + warn!( + "face_watch: store_detection failed for {}: {:?}", + cand.rel_path, e + ); + } + } + info!( + "face_watch: {} → {} face(s) ({}ms, {})", + cand.rel_path, face_count, resp.duration_ms, resp.model_version + ); + } + } + Err(FaceDetectError::Permanent(e)) => { + warn!( + "face_watch: permanent failure on {}: {} — marking failed", + cand.rel_path, e + ); + let mut dao = face_dao.lock().expect("face dao"); + // model_version is best-effort here — the engine that rejected + // the bytes may not have echoed one. Empty string is fine; this + // row is purely a "don't retry" sentinel. + if let Err(e) = dao.mark_status( + &ctx, + library_id, + &cand.content_hash, + &cand.rel_path, + "failed", + "", + ) { + warn!( + "face_watch: mark failed errored for {}: {:?}", + cand.rel_path, e + ); + } + } + Err(FaceDetectError::Transient(e)) => { + // Don't mark anything; next scan tick retries naturally. + // Demoted to debug because OOM and engine-not-ready are noisy + // and self-resolving. + debug!( + "face_watch: transient on {}: {} (will retry next pass)", + cand.rel_path, e + ); + } + Err(FaceDetectError::Disabled) => { + // Caller already checked is_enabled(); this branch is defensive. + } + } +} + +/// Drop candidates whose path matches the watcher's `EXCLUDED_DIRS` rules. +/// Pulled out for unit testing — the same `PathExcluder` /memories uses, +/// just applied at the face-detect candidate set instead of the memories +/// listing. Skip @eaDir / .thumbnails / user-defined paths before we burn +/// a detect call (and Apollo's GPU memory) on junk. +pub(crate) fn filter_excluded( + base: &Path, + excluded_dirs: &[String], + candidates: Vec, + library_name: Option<&str>, +) -> Vec { + if excluded_dirs.is_empty() { + return candidates; + } + let excluder = PathExcluder::new(base, excluded_dirs); + candidates + .into_iter() + .filter(|c| { + let abs = base.join(&c.rel_path); + if excluder.is_excluded(&abs) { + debug!( + "face_watch: skipping excluded path {} (library {})", + c.rel_path, + library_name.unwrap_or("") + ); + return false; + } + true + }) + .collect() +} + +/// Read image bytes for face detection. Insightface (via opencv) can't +/// decode RAW or HEIC — for those we extract the embedded JPEG preview +/// the way the thumbnail pipeline does. Plain JPEG/PNG/WebP/etc. go +/// through a direct read. +pub(crate) fn read_image_bytes_for_detect(path: &Path) -> std::io::Result> { + if file_types::needs_ffmpeg_thumbnail(path) + && let Some(preview) = exif::extract_embedded_jpeg_preview(path) + { + return Ok(preview); + } + // Plain read for everything else. RAW/HEIC files without an embedded + // preview fall through here too; Apollo will then 422 and the caller + // marks the row failed. That's fine; we tried. + std::fs::read(path) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::fs; + + fn cand(rel_path: &str) -> FaceCandidate { + FaceCandidate { + rel_path: rel_path.to_string(), + content_hash: format!("hash-{rel_path}"), + } + } + + #[test] + fn filter_excluded_pattern_drops_dir_components() { + // A pattern matches a path *component* under base, not a substring. + // Phase 3 needs this for @eaDir / .thumbnails skipping. + let tmp = tempfile::tempdir().unwrap(); + let base = tmp.path(); + let candidates = vec![ + cand("photos/a.jpg"), // keep + cand("photos/@eaDir/SYNOPHOTO_THUMB"), // drop (component match) + cand("photos/eaDir-not-a-thing.jpg"), // keep (substring, not component) + ]; + let kept = filter_excluded(base, &["@eaDir".to_string()], candidates, Some("test")); + let kept_paths: Vec<_> = kept.iter().map(|c| c.rel_path.as_str()).collect(); + assert_eq!( + kept_paths, + vec!["photos/a.jpg", "photos/eaDir-not-a-thing.jpg"] + ); + } + + #[test] + fn filter_excluded_absolute_dir_drops_subtree() { + // Absolute (under-base) entries drop the whole subtree. + let tmp = tempfile::tempdir().unwrap(); + let base = tmp.path(); + let candidates = vec![ + cand("public/a.jpg"), + cand("private/a.jpg"), + cand("private/sub/b.jpg"), + ]; + let kept = filter_excluded(base, &["/private".to_string()], candidates, None); + let kept_paths: Vec<_> = kept.iter().map(|c| c.rel_path.as_str()).collect(); + assert_eq!(kept_paths, vec!["public/a.jpg"]); + } + + #[test] + fn filter_excluded_empty_rules_passes_all() { + // Skip the PathExcluder build entirely on the common path where + // EXCLUDED_DIRS is unset — saves an allocation per pass. + let tmp = tempfile::tempdir().unwrap(); + let base = tmp.path(); + let candidates = vec![cand("a.jpg"), cand("b.jpg")]; + let kept = filter_excluded(base, &[], candidates, None); + assert_eq!(kept.len(), 2); + } + + #[test] + fn read_bytes_passes_through_for_jpeg() { + // JPEG goes through plain read — we DON'T want to lose orientation + // metadata or re-encode here; insightface's exif_transpose handles + // orientation on its end. + let tmp = tempfile::tempdir().unwrap(); + let path = tmp.path().join("test.jpg"); + let mut buf = Vec::new(); + // Tiny 4x4 grey JPEG — encoded by image crate so we know it round-trips. + let img = image::DynamicImage::ImageRgb8(image::RgbImage::from_pixel( + 4, + 4, + image::Rgb([128, 128, 128]), + )); + img.write_to( + &mut std::io::Cursor::new(&mut buf), + image::ImageFormat::Jpeg, + ) + .unwrap(); + fs::write(&path, &buf).unwrap(); + + let read = read_image_bytes_for_detect(&path).expect("read jpeg"); + assert_eq!(read, buf, "JPEG bytes must pass through verbatim"); + } + + #[test] + fn read_bytes_falls_back_when_raw_has_no_preview() { + // A `.nef` file with non-RAW bytes won't have an embedded preview — + // the helper falls through to plain read rather than refusing. This + // matches the docstring contract; Apollo will then 422 and we'll + // mark the row as failed. + let tmp = tempfile::tempdir().unwrap(); + let path = tmp.path().join("not_really.nef"); + fs::write(&path, b"definitely-not-a-raw-file").unwrap(); + + let read = read_image_bytes_for_detect(&path).expect("fallback read"); + assert_eq!(read, b"definitely-not-a-raw-file"); + } +} diff --git a/src/lib.rs b/src/lib.rs index ad1c595..12de818 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,6 +12,7 @@ pub mod data; pub mod database; pub mod error; pub mod exif; +pub mod face_watch; pub mod faces; pub mod file_types; pub mod files; diff --git a/src/main.rs b/src/main.rs index 8cee679..4e628d0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -66,6 +66,7 @@ mod data; mod database; mod error; mod exif; +mod face_watch; mod faces; mod file_types; mod files; @@ -1460,6 +1461,8 @@ fn main() -> std::io::Result<()> { app_state.libraries.clone(), playlist_mgr_for_watcher, preview_gen_for_watcher, + app_state.face_client.clone(), + app_state.excluded_dirs.clone(), ); // Start orphaned playlist cleanup job @@ -1787,6 +1790,8 @@ fn watch_files( libs: Vec, playlist_manager: Addr, preview_generator: Addr, + face_client: crate::ai::face_client::FaceClient, + excluded_dirs: Vec, ) { std::thread::spawn(move || { // Get polling intervals from environment variables @@ -1819,6 +1824,9 @@ fn watch_files( let preview_dao = Arc::new(Mutex::new( Box::new(SqlitePreviewDao::new()) as Box )); + let face_dao = Arc::new(Mutex::new( + Box::new(faces::SqliteFaceDao::new()) as Box + )); let mut last_quick_scan = SystemTime::now(); let mut last_full_scan = SystemTime::now(); @@ -1844,6 +1852,9 @@ fn watch_files( lib, Arc::clone(&exif_dao), Arc::clone(&preview_dao), + Arc::clone(&face_dao), + face_client.clone(), + &excluded_dirs, None, playlist_manager.clone(), preview_generator.clone(), @@ -1861,6 +1872,9 @@ fn watch_files( lib, Arc::clone(&exif_dao), Arc::clone(&preview_dao), + Arc::clone(&face_dao), + face_client.clone(), + &excluded_dirs, Some(check_since), playlist_manager.clone(), preview_generator.clone(), @@ -1907,6 +1921,9 @@ fn process_new_files( library: &libraries::Library, exif_dao: Arc>>, preview_dao: Arc>>, + face_dao: Arc>>, + face_client: crate::ai::face_client::FaceClient, + excluded_dirs: &[String], modified_since: Option, playlist_manager: Addr, preview_generator: Addr, @@ -2082,6 +2099,24 @@ fn process_new_files( } } + // ── Face detection pass ──────────────────────────────────────────── + // Run after EXIF writes so newly-registered files have their + // content_hash populated. Skipped wholesale when face_client is + // disabled (no Apollo integration configured) — Phase 3 wires this + // up; the watcher remains usable on legacy deploys. + if face_client.is_enabled() { + let candidates = build_face_candidates(&context, &files, &exif_dao, &face_dao); + if !candidates.is_empty() { + face_watch::run_face_detection_pass( + library, + excluded_dirs, + &face_client, + Arc::clone(&face_dao), + candidates, + ); + } + } + // Check for videos that need HLS playlists let video_path_base = dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set"); let mut videos_needing_playlists = Vec::new(); @@ -2206,6 +2241,64 @@ fn process_new_files( } } +/// Build the face-detection candidate list for a scan tick. +/// +/// We need `(rel_path, content_hash)` for every image file that has a +/// content_hash recorded in image_exif but no row in face_detections yet. +/// Re-querying image_exif here picks up rows the EXIF write loop just +/// inserted alongside any pre-existing rows the watcher walked over — +/// covers both new uploads and the initial backlog scan. +fn build_face_candidates( + context: &opentelemetry::Context, + files: &[(PathBuf, String)], + exif_dao: &Arc>>, + face_dao: &Arc>>, +) -> Vec { + // Restrict to image files; videos aren't face-scanned in v1 (kamadak + // doesn't even register them in image_exif). + let image_paths: Vec = files + .iter() + .filter(|(p, _)| !is_video_file(p)) + .map(|(_, rel)| rel.clone()) + .collect(); + if image_paths.is_empty() { + return Vec::new(); + } + + let exif_records = { + let mut dao = exif_dao.lock().expect("Unable to lock ExifDao"); + dao.get_exif_batch(context, &image_paths) + .unwrap_or_default() + }; + // rel_path → content_hash (only rows with a hash; without one we have + // nothing to key face data against). + let mut hash_by_path: HashMap = HashMap::with_capacity(exif_records.len()); + for record in exif_records { + if let Some(h) = record.content_hash { + hash_by_path.insert(record.file_path, h); + } + } + + let mut candidates = Vec::new(); + let mut dao = face_dao.lock().expect("face dao"); + for rel_path in image_paths { + let Some(hash) = hash_by_path.get(&rel_path) else { + continue; + }; + match dao.already_scanned(context, hash) { + Ok(true) => continue, + Ok(false) => candidates.push(face_watch::FaceCandidate { + rel_path, + content_hash: hash.clone(), + }), + Err(e) => { + warn!("face_watch: already_scanned errored for {}: {:?}", hash, e); + } + } + } + candidates +} + #[cfg(test)] mod tests { use super::*;