//! Face-detection pass for the file watcher. //! //! `process_new_files` calls [`run_face_detection_pass`] after the EXIF //! registration loop. We walk the candidates (images, not yet face-scanned, //! not excluded by EXCLUDED_DIRS), fan out parallel detect calls to Apollo, //! and persist the results — detected faces, `no_faces` markers when Apollo //! found nothing, `failed` markers on permanent decode errors, no marker on //! transient failures so the next scan retries. //! //! The watcher runs in a plain `std::thread`, so we build a short-lived //! tokio runtime per pass and `block_on` a join of K detect futures. K is //! configurable via `FACE_DETECT_CONCURRENCY` (default 8). Apollo's //! threadpool is bounded to 1–2 workers anyway, so the runs queue //! server-side; the client-side fan-out is purely about overlapping IO //! (file read + JSON encode) with someone else's inference. use crate::ai::face_client::{DetectMeta, FaceClient, FaceDetectError}; use crate::exif; use crate::faces::{self, FaceDao, InsertFaceDetectionInput}; use crate::file_types; use crate::libraries::Library; use crate::memories::PathExcluder; use crate::tags::TagDao; use log::{debug, info, warn}; use std::path::Path; use std::sync::{Arc, Mutex}; use tokio::sync::Semaphore; /// One file the watcher would like to face-scan. Built by the caller from /// the EXIF batch (we need `content_hash` to key everything against). #[derive(Debug, Clone)] pub struct FaceCandidate { pub rel_path: String, pub content_hash: String, } /// Synchronous entry point. Returns once every candidate has been /// processed (or definitively skipped). When `face_client.is_enabled()` /// is false this is a no-op so the watcher can call unconditionally. 
pub fn run_face_detection_pass(
    library: &Library,
    excluded_dirs: &[String],
    face_client: &FaceClient,
    face_dao: Arc>>,
    tag_dao: Arc>>,
    candidates: Vec,
) {
    // NOTE(review): generic arguments look truncated throughout this copy
    // ("Arc>>", bare "Vec", "HashMap =") — presumably Arc<Mutex<FaceDao>>,
    // Vec<FaceCandidate>, etc.; confirm against version control.
    //
    // Fast-outs: detection disabled, or nothing to do this pass.
    if !face_client.is_enabled() {
        return;
    }
    if candidates.is_empty() {
        return;
    }
    let base = Path::new(&library.root_path);
    // Drop excluded paths before spending a detect call on them.
    let filtered = filter_excluded(base, excluded_dirs, candidates, Some(&library.name));
    if filtered.is_empty() {
        return;
    }
    // Fan-out width; unparsable or non-positive values fall back to 8.
    let concurrency: usize = std::env::var("FACE_DETECT_CONCURRENCY")
        .ok()
        .and_then(|s| s.parse().ok())
        .filter(|n: &usize| *n > 0)
        .unwrap_or(8);
    info!(
        "face_watch: running detection on {} candidates (library '{}', concurrency {})",
        filtered.len(),
        library.name,
        concurrency
    );
    // Per-pass tokio runtime. The watcher thread isn't in any pre-existing
    // async context — building one here keeps the rest of the watcher
    // sync-only. Worker count is small; the parallelism we care about is
    // task-level (semaphore) not thread-level.
    let rt = match tokio::runtime::Builder::new_multi_thread()
        .worker_threads(2)
        .enable_all()
        .build()
    {
        Ok(rt) => rt,
        Err(e) => {
            // Best-effort pass: log and skip rather than crash the watcher.
            warn!("face_watch: failed to build tokio runtime: {e}");
            return;
        }
    };
    // Copy the bits each spawned task needs so the async move blocks below
    // can own ('static) their data.
    let library_id = library.id;
    let library_root = library.root_path.clone();
    rt.block_on(async move {
        // Semaphore bounds in-flight detect calls; surplus tasks queue on it.
        let sem = Arc::new(Semaphore::new(concurrency));
        let mut handles = Vec::with_capacity(filtered.len());
        for cand in filtered {
            let permit_sem = sem.clone();
            let face_client = face_client.clone();
            let face_dao = face_dao.clone();
            let tag_dao = tag_dao.clone();
            let library_root = library_root.clone();
            handles.push(tokio::spawn(async move {
                // acquire_owned would let us drop the permit explicitly
                // before await points; for a one-shot call into Apollo
                // the simpler bounded acquire is enough.
                let _permit = permit_sem.acquire().await.expect("face semaphore");
                process_one(
                    library_id,
                    &library_root,
                    cand,
                    &face_client,
                    face_dao,
                    tag_dao,
                )
                .await;
            }));
        }
        for h in handles {
            // join; per-task panics are logged inside process_one before
            // they reach here, so we don't propagate.
            let _ = h.await;
        }
    });
}

/// Handle one candidate end-to-end: read bytes → detect via Apollo →
/// persist detections or a status marker → attempt tag-driven auto-bind.
/// Never returns an error; every failure mode is logged and absorbed so a
/// bad file can't take down the pass.
async fn process_one(
    library_id: i32,
    library_root: &str,
    cand: FaceCandidate,
    face_client: &FaceClient,
    face_dao: Arc>>,
    tag_dao: Arc>>,
) {
    let abs = Path::new(library_root).join(&cand.rel_path);
    // Read the bytes off disk. Filesystem IO is sync but cheap; a small
    // spawn_blocking would be overkill.
    let bytes = match read_image_bytes_for_detect(&abs) {
        Ok(b) => b,
        Err(e) => {
            // Don't mark — file may have been moved/renamed mid-scan; let
            // the next pass try again. Future-bug check: a permanently
            // unreadable file would loop forever; we accept that for v1
            // because process_new_files already prunes vanished rows on
            // full scans.
            warn!(
                "face_watch: read failed for {} ({}): {}",
                cand.rel_path, library_id, e
            );
            return;
        }
    };
    let meta = DetectMeta {
        content_hash: cand.content_hash.clone(),
        library_id,
        rel_path: cand.rel_path.clone(),
        orientation: None,
        model_version: None,
    };
    // OTel context threaded through every DAO call below for tracing.
    let ctx = opentelemetry::Context::current();
    match face_client.detect(bytes, meta).await {
        Ok(resp) => {
            // Stage 1: persist detections, holding the dao lock only
            // across synchronous DB writes (no .await inside this scope —
            // it's a std::sync::Mutex).
            let mut stored_for_autobind: Vec<(i32, Vec)> = Vec::new();
            {
                let mut dao = face_dao.lock().expect("face dao");
                if resp.faces.is_empty() {
                    // Persist a "no_faces" marker so this file isn't rescanned.
                    if let Err(e) = dao.mark_status(
                        &ctx,
                        library_id,
                        &cand.content_hash,
                        &cand.rel_path,
                        "no_faces",
                        &resp.model_version,
                    ) {
                        warn!(
                            "face_watch: mark no_faces failed for {}: {:?}",
                            cand.rel_path, e
                        );
                    }
                    debug!(
                        "face_watch: {} → no faces (model {})",
                        cand.rel_path, resp.model_version
                    );
                } else {
                    let face_count = resp.faces.len();
                    for face in &resp.faces {
                        // A bad embedding skips just this face, not the file.
                        let emb = match face.decode_embedding() {
                            Ok(b) => b,
                            Err(e) => {
                                warn!("face_watch: bad embedding for {}: {:?}", cand.rel_path, e);
                                continue;
                            }
                        };
                        // Decode the f32 vector once for auto-bind comparison.
                        let emb_floats = faces::decode_embedding_bytes(&emb);
                        match dao.store_detection(
                            &ctx,
                            InsertFaceDetectionInput {
                                library_id,
                                content_hash: cand.content_hash.clone(),
                                rel_path: cand.rel_path.clone(),
                                bbox: Some((face.bbox.x, face.bbox.y, face.bbox.w, face.bbox.h)),
                                embedding: Some(emb),
                                confidence: Some(face.confidence),
                                source: "auto".to_string(),
                                person_id: None,
                                status: "detected".to_string(),
                                model_version: resp.model_version.clone(),
                            },
                        ) {
                            Ok(row) => {
                                // Only faces whose embedding decoded cleanly are
                                // eligible for auto-bind.
                                if let Some(floats) = emb_floats {
                                    stored_for_autobind.push((row.id, floats));
                                }
                            }
                            Err(e) => warn!(
                                "face_watch: store_detection failed for {}: {:?}",
                                cand.rel_path, e
                            ),
                        }
                    }
                    info!(
                        "face_watch: {} → {} face(s) ({}ms, {})",
                        cand.rel_path, face_count, resp.duration_ms, resp.model_version
                    );
                }
            }
            // Stage 2: auto-bind newly-stored faces against same-named
            // people-tags. Done outside the dao lock so the lookups don't
            // serialize with concurrent detect tasks.
            if !stored_for_autobind.is_empty() {
                try_auto_bind(
                    &ctx,
                    &cand.rel_path,
                    &resp.model_version,
                    stored_for_autobind,
                    &tag_dao,
                    &face_dao,
                );
            }
        }
        Err(FaceDetectError::Permanent(e)) => {
            warn!(
                "face_watch: permanent failure on {}: {} — marking failed",
                cand.rel_path, e
            );
            let mut dao = face_dao.lock().expect("face dao");
            // model_version is best-effort here — the engine that rejected
            // the bytes may not have echoed one. Empty string is fine; this
            // row is purely a "don't retry" sentinel.
            if let Err(e) = dao.mark_status(
                &ctx,
                library_id,
                &cand.content_hash,
                &cand.rel_path,
                "failed",
                "",
            ) {
                warn!(
                    "face_watch: mark failed errored for {}: {:?}",
                    cand.rel_path, e
                );
            }
        }
        Err(FaceDetectError::Transient(e)) => {
            // Don't mark anything; next scan tick retries naturally.
            // Demoted to debug because OOM and engine-not-ready are noisy
            // and self-resolving.
            debug!(
                "face_watch: transient on {}: {} (will retry next pass)",
                cand.rel_path, e
            );
        }
        Err(FaceDetectError::Disabled) => {
            // Caller already checked is_enabled(); this branch is defensive.
        }
    }
}

/// Auto-bind newly-detected faces to a same-named person, when a tag on the
/// photo unambiguously identifies one. Driven by `FACE_AUTOBIND_MIN_COS`
/// (default 0.4): the new face's embedding must reach this cosine
/// similarity against the L2-normalized mean of the person's existing
/// faces. The first face for a person binds unconditionally — there's
/// nothing to compare against, and the alternative ("never bind without
/// a reference") would mean bootstrap never kicks off.
///
/// Multi-match (the photo carries tags for two different known persons)
/// is intentionally a no-op — we can't tell which face is which without
/// additional matching. Those faces stay unassigned for the cluster
/// suggester (Phase 6) to handle.
fn try_auto_bind(
    ctx: &opentelemetry::Context,
    rel_path: &str,
    model_version: &str,
    new_faces: Vec<(i32, Vec)>, // (face_id, decoded embedding)
    tag_dao: &Arc>>,
    face_dao: &Arc>>,
) {
    // 1. Pull the photo's tags. Any DAO error aborts the whole auto-bind
    //    attempt (best-effort feature; the faces simply stay unassigned).
    let tag_names: Vec = {
        let mut td = tag_dao.lock().expect("tag dao");
        match td.get_tags_for_path(ctx, rel_path) {
            Ok(tags) => tags.into_iter().map(|t| t.name).collect(),
            Err(e) => {
                warn!(
                    "face_watch: get_tags_for_path failed for {}: {:?}",
                    rel_path, e
                );
                return;
            }
        }
    };
    if tag_names.is_empty() {
        return;
    }
    // 2. Find tags that map to existing persons (case-insensitive).
    let person_for_tag: std::collections::HashMap = {
        let mut fd = face_dao.lock().expect("face dao");
        match fd.find_persons_by_names_ci(ctx, &tag_names) {
            Ok(m) => m,
            Err(e) => {
                warn!(
                    "face_watch: find_persons_by_names_ci failed for {}: {:?}",
                    rel_path, e
                );
                return;
            }
        }
    };
    // 3. Multi-match: ambiguous, skip. Single match: candidate person.
    let unique_person_ids: std::collections::HashSet =
        person_for_tag.values().copied().collect();
    if unique_person_ids.len() != 1 {
        if !unique_person_ids.is_empty() {
            debug!(
                "face_watch: {} carries tags for {} different persons; skipping auto-bind",
                rel_path,
                unique_person_ids.len()
            );
        }
        return;
    }
    let person_id = *unique_person_ids.iter().next().expect("nonempty set");
    // Similarity threshold; out-of-range or unparsable values fall back to 0.4.
    let threshold: f32 = std::env::var("FACE_AUTOBIND_MIN_COS")
        .ok()
        .and_then(|s| s.parse().ok())
        .filter(|t: &f32| *t >= 0.0 && *t <= 1.0)
        .unwrap_or(0.4);
    // 4. Reference embedding (if any) under the same model_version.
    let reference: Option> = {
        let mut fd = face_dao.lock().expect("face dao");
        match fd.person_reference_embedding(ctx, person_id, model_version) {
            Ok(r) => r,
            Err(e) => {
                warn!(
                    "face_watch: person_reference_embedding failed for person {}: {:?}",
                    person_id, e
                );
                return;
            }
        }
    };
    // 5. Bind each new face that meets the criterion. Hold the lock once
    //    for the whole batch; assign_face_to_person uses its own short
    //    transaction internally.
    let mut fd = face_dao.lock().expect("face dao");
    for (face_id, emb) in new_faces {
        let bind = match &reference {
            None => {
                // Person has no faces yet — first one wins so bootstrap
                // can ever produce a usable reference. After this row
                // commits, future faces evaluate against it.
                debug!(
                    "face_watch: auto-binding first face {} → person {} (no reference yet)",
                    face_id, person_id
                );
                true
            }
            Some(ref_vec) => {
                let sim = faces::cosine_similarity(&emb, ref_vec);
                if sim >= threshold {
                    debug!(
                        "face_watch: auto-binding face {} → person {} (cos={:.3} ≥ {:.3})",
                        face_id, person_id, sim, threshold
                    );
                    true
                } else {
                    debug!(
                        "face_watch: leaving face {} unassigned (cos={:.3} < {:.3} for person {})",
                        face_id, sim, threshold, person_id
                    );
                    false
                }
            }
        };
        // Let-chain: only hit the DB for faces we actually decided to bind.
        if bind && let Err(e) = fd.assign_face_to_person(ctx, face_id, person_id) {
            warn!(
                "face_watch: assign_face_to_person failed (face={}, person={}): {:?}",
                face_id, person_id, e
            );
        }
    }
}

/// Drop candidates whose path matches the watcher's `EXCLUDED_DIRS` rules.
/// Pulled out for unit testing — the same `PathExcluder` /memories uses,
/// just applied at the face-detect candidate set instead of the memories
/// listing. Skip @eaDir / .thumbnails / user-defined paths before we burn
/// a detect call (and Apollo's GPU memory) on junk.
pub(crate) fn filter_excluded(
    base: &Path,
    excluded_dirs: &[String],
    candidates: Vec,
    library_name: Option<&str>,
) -> Vec {
    // Common case: no rules configured — skip building the excluder entirely.
    if excluded_dirs.is_empty() {
        return candidates;
    }
    let excluder = PathExcluder::new(base, excluded_dirs);
    candidates
        .into_iter()
        .filter(|c| {
            // PathExcluder matches on the absolute path, so join first.
            let abs = base.join(&c.rel_path);
            if excluder.is_excluded(&abs) {
                debug!(
                    "face_watch: skipping excluded path {} (library {})",
                    c.rel_path,
                    library_name.unwrap_or("")
                );
                return false;
            }
            true
        })
        .collect()
}

/// Read image bytes for face detection. Insightface (via opencv) can't
/// decode RAW or HEIC — for those we extract the embedded JPEG preview
/// the way the thumbnail pipeline does. Plain JPEG/PNG/WebP/etc. go
/// through a direct read.
pub(crate) fn read_image_bytes_for_detect(path: &Path) -> std::io::Result> { if file_types::needs_ffmpeg_thumbnail(path) && let Some(preview) = exif::extract_embedded_jpeg_preview(path) { return Ok(preview); } // Plain read for everything else. RAW/HEIC files without an embedded // preview fall through here too; Apollo will then 422 and the caller // marks the row failed. That's fine; we tried. std::fs::read(path) } #[cfg(test)] mod tests { use super::*; use std::fs; fn cand(rel_path: &str) -> FaceCandidate { FaceCandidate { rel_path: rel_path.to_string(), content_hash: format!("hash-{rel_path}"), } } #[test] fn filter_excluded_pattern_drops_dir_components() { // A pattern matches a path *component* under base, not a substring. // Phase 3 needs this for @eaDir / .thumbnails skipping. let tmp = tempfile::tempdir().unwrap(); let base = tmp.path(); let candidates = vec![ cand("photos/a.jpg"), // keep cand("photos/@eaDir/SYNOPHOTO_THUMB"), // drop (component match) cand("photos/eaDir-not-a-thing.jpg"), // keep (substring, not component) ]; let kept = filter_excluded(base, &["@eaDir".to_string()], candidates, Some("test")); let kept_paths: Vec<_> = kept.iter().map(|c| c.rel_path.as_str()).collect(); assert_eq!( kept_paths, vec!["photos/a.jpg", "photos/eaDir-not-a-thing.jpg"] ); } #[test] fn filter_excluded_absolute_dir_drops_subtree() { // Absolute (under-base) entries drop the whole subtree. let tmp = tempfile::tempdir().unwrap(); let base = tmp.path(); let candidates = vec![ cand("public/a.jpg"), cand("private/a.jpg"), cand("private/sub/b.jpg"), ]; let kept = filter_excluded(base, &["/private".to_string()], candidates, None); let kept_paths: Vec<_> = kept.iter().map(|c| c.rel_path.as_str()).collect(); assert_eq!(kept_paths, vec!["public/a.jpg"]); } #[test] fn filter_excluded_empty_rules_passes_all() { // Skip the PathExcluder build entirely on the common path where // EXCLUDED_DIRS is unset — saves an allocation per pass. 
let tmp = tempfile::tempdir().unwrap(); let base = tmp.path(); let candidates = vec![cand("a.jpg"), cand("b.jpg")]; let kept = filter_excluded(base, &[], candidates, None); assert_eq!(kept.len(), 2); } #[test] fn read_bytes_passes_through_for_jpeg() { // JPEG goes through plain read — we DON'T want to lose orientation // metadata or re-encode here; insightface's exif_transpose handles // orientation on its end. let tmp = tempfile::tempdir().unwrap(); let path = tmp.path().join("test.jpg"); let mut buf = Vec::new(); // Tiny 4x4 grey JPEG — encoded by image crate so we know it round-trips. let img = image::DynamicImage::ImageRgb8(image::RgbImage::from_pixel( 4, 4, image::Rgb([128, 128, 128]), )); img.write_to( &mut std::io::Cursor::new(&mut buf), image::ImageFormat::Jpeg, ) .unwrap(); fs::write(&path, &buf).unwrap(); let read = read_image_bytes_for_detect(&path).expect("read jpeg"); assert_eq!(read, buf, "JPEG bytes must pass through verbatim"); } #[test] fn read_bytes_falls_back_when_raw_has_no_preview() { // A `.nef` file with non-RAW bytes won't have an embedded preview — // the helper falls through to plain read rather than refusing. This // matches the docstring contract; Apollo will then 422 and we'll // mark the row as failed. let tmp = tempfile::tempdir().unwrap(); let path = tmp.path().join("not_really.nef"); fs::write(&path, b"definitely-not-a-raw-file").unwrap(); let read = read_image_bytes_for_detect(&path).expect("fallback read"); assert_eq!(read, b"definitely-not-a-raw-file"); } }