faces: phase 3 — file-watch hook drives auto detection

Wire face detection into ImageApi's existing scan loop so new uploads
pick up faces automatically and the initial backlog grinds through on
full-scan ticks. No new job system; Phase 2's already_scanned check
makes the work implicitly idempotent (one face_detections row per
content_hash, including no_faces / failed marker rows).
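The Phase 2 code isn't in this diff, so as a distilled sketch of the contract the watcher leans on (illustrative in-memory stand-in, not the real DAO): any existing face_detections row for a content_hash — detected, no_faces, or failed — means "already scanned, skip".

```rust
/// Hypothetical stand-in for the Phase 2 `already_scanned` check.
/// `rows` models face_detections as (content_hash, status) pairs;
/// marker rows (no_faces / failed) count as scanned too, which is
/// what makes the watcher pass implicitly idempotent.
fn already_scanned(rows: &[(String, String)], content_hash: &str) -> bool {
    rows.iter().any(|(hash, _status)| hash == content_hash)
}
```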

face_watch.rs (new):
  - run_face_detection_pass(library, excluded_dirs, face_client,
    face_dao, candidates) — sync entry point. Builds a per-pass tokio
    runtime and fans out detect calls bounded by FACE_DETECT_CONCURRENCY
    (default 8). The watcher thread itself stays sync.
  - filter_excluded — applies the same PathExcluder /memories uses, so
    @eaDir / .thumbnails / EXCLUDED_DIRS-listed paths skip detection
    before we burn a detect call (and Apollo's GPU memory) on junk.
  - read_image_bytes_for_detect — RAW/HEIC route through
    extract_embedded_jpeg_preview because opencv-python-headless can't
    decode either; everything else gets a plain std::fs::read so EXIF
    orientation reaches Apollo's exif_transpose intact.
  - process_one — translates Apollo's response into the Phase 2 marker
    contract: faces[] empty → no_faces; FaceDetectError::Permanent →
    failed (don't retry); Transient → no marker (next scan retries);
    success with N faces → N detected rows with the embeddings unpacked.
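The four-way outcome mapping above can be distilled to a small decision function (names here are illustrative, not the real API — the shipped `process_one` below interleaves this with DAO writes and logging):

```rust
/// Illustrative distillation of the Phase 2 marker contract.
#[derive(Debug, PartialEq)]
enum Marker {
    Detected(usize), // N rows with embeddings
    NoFaces,         // marker row: scanned, nothing found
    Failed,          // marker row: permanent error, don't retry
    None,            // no marker: transient, next scan retries
}

enum DetectResult {
    Ok { face_count: usize },
    Permanent,
    Transient,
}

fn classify(r: DetectResult) -> Marker {
    match r {
        DetectResult::Ok { face_count: 0 } => Marker::NoFaces,
        DetectResult::Ok { face_count } => Marker::Detected(face_count),
        DetectResult::Permanent => Marker::Failed,
        DetectResult::Transient => Marker::None,
    }
}
```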

main.rs (process_new_files + watch_files):
  - watch_files now also takes face_client + excluded_dirs; the watcher
    thread builds a SqliteFaceDao the same way it builds ExifDao /
    PreviewDao.
  - After the EXIF write loop, build_face_candidates queries image_exif
    for the just-walked image paths' content_hashes (covers new uploads
    and pre-existing backlog), filters out anything already_scanned, and
    hands the rest to face_watch::run_face_detection_pass.
  - Bypassed wholesale when face_client.is_enabled() is false — keeps
    the watcher usable on legacy deploys where Apollo isn't configured.

Tests: 5 face_watch unit tests cover the parts that don't need a real
Apollo:
  - filter_excluded drops dir-component patterns (@eaDir) without
    matching substring file names (eaDir-not-a-thing.jpg is kept).
  - filter_excluded drops absolute-under-base subtrees (/private).
  - empty EXCLUDED_DIRS short-circuits cleanly.
  - read_image_bytes_for_detect passes JPEG bytes through verbatim
    (orientation must reach Apollo unmodified).
  - read_image_bytes_for_detect falls through to a plain read when a
    RAW-extension file has no embedded preview, so Apollo gets a chance
    to 422 and we mark the row failed rather than retrying forever.

cargo test --lib: 170 passed / 0 failed; fmt and clippy clean for new code.
End-to-end (drop a photo → face_detections row appears) needs Apollo
running and is deferred to deploy-time verification.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Author: Cameron Cordes
Date: 2026-04-29 18:21:19 +00:00
Parent: f77e44b34d
Commit: 4dee7b6f73
3 changed files with 483 additions and 0 deletions

src/face_watch.rs (new file):

@@ -0,0 +1,389 @@
//! Face-detection pass for the file watcher.
//!
//! `process_new_files` calls [`run_face_detection_pass`] after the EXIF
//! registration loop. We walk the candidates (images, not yet face-scanned,
//! not excluded by EXCLUDED_DIRS), fan out parallel detect calls to Apollo,
//! and persist the results — detected faces, `no_faces` markers when Apollo
//! found nothing, `failed` markers on permanent decode errors, no marker on
//! transient failures so the next scan retries.
//!
//! The watcher runs in a plain `std::thread`, so we build a short-lived
//! tokio runtime per pass and `block_on` a join of K detect futures. K is
//! configurable via `FACE_DETECT_CONCURRENCY` (default 8). Apollo's
//! threadpool is bounded to 12 workers anyway, so the runs queue
//! server-side; the client-side fan-out is purely about overlapping IO
//! (file read + JSON encode) with someone else's inference.
use crate::ai::face_client::{DetectMeta, FaceClient, FaceDetectError};
use crate::exif;
use crate::faces::{FaceDao, InsertFaceDetectionInput};
use crate::file_types;
use crate::libraries::Library;
use crate::memories::PathExcluder;
use log::{debug, info, warn};
use std::path::Path;
use std::sync::{Arc, Mutex};
use tokio::sync::Semaphore;
/// One file the watcher would like to face-scan. Built by the caller from
/// the EXIF batch (we need `content_hash` to key everything against).
#[derive(Debug, Clone)]
pub struct FaceCandidate {
pub rel_path: String,
pub content_hash: String,
}
/// Synchronous entry point. Returns once every candidate has been
/// processed (or definitively skipped). When `face_client.is_enabled()`
/// is false this is a no-op so the watcher can call unconditionally.
pub fn run_face_detection_pass(
library: &Library,
excluded_dirs: &[String],
face_client: &FaceClient,
face_dao: Arc<Mutex<Box<dyn FaceDao>>>,
candidates: Vec<FaceCandidate>,
) {
if !face_client.is_enabled() {
return;
}
if candidates.is_empty() {
return;
}
let base = Path::new(&library.root_path);
let filtered = filter_excluded(base, excluded_dirs, candidates, Some(&library.name));
if filtered.is_empty() {
return;
}
let concurrency: usize = std::env::var("FACE_DETECT_CONCURRENCY")
.ok()
.and_then(|s| s.parse().ok())
.filter(|n: &usize| *n > 0)
.unwrap_or(8);
info!(
"face_watch: running detection on {} candidates (library '{}', concurrency {})",
filtered.len(),
library.name,
concurrency
);
// Per-pass tokio runtime. The watcher thread isn't in any pre-existing
// async context — building one here keeps the rest of the watcher
// sync-only. Worker count is small; the parallelism we care about is
// task-level (semaphore) not thread-level.
let rt = match tokio::runtime::Builder::new_multi_thread()
.worker_threads(2)
.enable_all()
.build()
{
Ok(rt) => rt,
Err(e) => {
warn!("face_watch: failed to build tokio runtime: {e}");
return;
}
};
let library_id = library.id;
let library_root = library.root_path.clone();
rt.block_on(async move {
let sem = Arc::new(Semaphore::new(concurrency));
let mut handles = Vec::with_capacity(filtered.len());
for cand in filtered {
let permit_sem = sem.clone();
let face_client = face_client.clone();
let face_dao = face_dao.clone();
let library_root = library_root.clone();
handles.push(tokio::spawn(async move {
// acquire_owned would yield a 'static permit we could move
// between tasks; the Arc'd semaphore already lives inside this
// task, so the simpler borrowed acquire is enough.
let _permit = permit_sem.acquire().await.expect("face semaphore");
process_one(library_id, &library_root, cand, &face_client, face_dao).await;
}));
}
for h in handles {
// join; a panicked task surfaces here as a JoinError. process_one
// logs its own failures, so we just swallow the join result.
let _ = h.await;
}
});
}
async fn process_one(
library_id: i32,
library_root: &str,
cand: FaceCandidate,
face_client: &FaceClient,
face_dao: Arc<Mutex<Box<dyn FaceDao>>>,
) {
let abs = Path::new(library_root).join(&cand.rel_path);
// Read the bytes off disk with a plain sync call on the worker.
// Filesystem IO is cheap here; spawn_blocking would be overkill.
let bytes = match read_image_bytes_for_detect(&abs) {
Ok(b) => b,
Err(e) => {
// Don't mark — file may have been moved/renamed mid-scan; let
// the next pass try again. Future-bug check: a permanently
// unreadable file would loop forever; we accept that for v1
// because process_new_files already prunes vanished rows on
// full scans.
warn!(
"face_watch: read failed for {} ({}): {}",
cand.rel_path, library_id, e
);
return;
}
};
let meta = DetectMeta {
content_hash: cand.content_hash.clone(),
library_id,
rel_path: cand.rel_path.clone(),
orientation: None,
model_version: None,
};
let ctx = opentelemetry::Context::current();
match face_client.detect(bytes, meta).await {
Ok(resp) => {
// Hold the dao lock only across the synchronous DB writes.
let mut dao = face_dao.lock().expect("face dao");
if resp.faces.is_empty() {
if let Err(e) = dao.mark_status(
&ctx,
library_id,
&cand.content_hash,
&cand.rel_path,
"no_faces",
&resp.model_version,
) {
warn!(
"face_watch: mark no_faces failed for {}: {:?}",
cand.rel_path, e
);
}
debug!(
"face_watch: {} → no faces (model {})",
cand.rel_path, resp.model_version
);
} else {
let face_count = resp.faces.len();
for face in &resp.faces {
let emb = match face.decode_embedding() {
Ok(b) => b,
Err(e) => {
warn!("face_watch: bad embedding for {}: {:?}", cand.rel_path, e);
continue;
}
};
if let Err(e) = dao.store_detection(
&ctx,
InsertFaceDetectionInput {
library_id,
content_hash: cand.content_hash.clone(),
rel_path: cand.rel_path.clone(),
bbox: Some((face.bbox.x, face.bbox.y, face.bbox.w, face.bbox.h)),
embedding: Some(emb),
confidence: Some(face.confidence),
source: "auto".to_string(),
person_id: None,
status: "detected".to_string(),
model_version: resp.model_version.clone(),
},
) {
warn!(
"face_watch: store_detection failed for {}: {:?}",
cand.rel_path, e
);
}
}
info!(
"face_watch: {} → {} face(s) ({}ms, {})",
cand.rel_path, face_count, resp.duration_ms, resp.model_version
);
}
}
Err(FaceDetectError::Permanent(e)) => {
warn!(
"face_watch: permanent failure on {}: {} — marking failed",
cand.rel_path, e
);
let mut dao = face_dao.lock().expect("face dao");
// model_version is best-effort here — the engine that rejected
// the bytes may not have echoed one. Empty string is fine; this
// row is purely a "don't retry" sentinel.
if let Err(e) = dao.mark_status(
&ctx,
library_id,
&cand.content_hash,
&cand.rel_path,
"failed",
"",
) {
warn!(
"face_watch: mark failed errored for {}: {:?}",
cand.rel_path, e
);
}
}
Err(FaceDetectError::Transient(e)) => {
// Don't mark anything; next scan tick retries naturally.
// Demoted to debug because OOM and engine-not-ready are noisy
// and self-resolving.
debug!(
"face_watch: transient on {}: {} (will retry next pass)",
cand.rel_path, e
);
}
Err(FaceDetectError::Disabled) => {
// Caller already checked is_enabled(); this branch is defensive.
}
}
}
/// Drop candidates whose path matches the watcher's `EXCLUDED_DIRS` rules.
/// Pulled out for unit testing — the same `PathExcluder` /memories uses,
/// just applied at the face-detect candidate set instead of the memories
/// listing. Skip @eaDir / .thumbnails / user-defined paths before we burn
/// a detect call (and Apollo's GPU memory) on junk.
pub(crate) fn filter_excluded(
base: &Path,
excluded_dirs: &[String],
candidates: Vec<FaceCandidate>,
library_name: Option<&str>,
) -> Vec<FaceCandidate> {
if excluded_dirs.is_empty() {
return candidates;
}
let excluder = PathExcluder::new(base, excluded_dirs);
candidates
.into_iter()
.filter(|c| {
let abs = base.join(&c.rel_path);
if excluder.is_excluded(&abs) {
debug!(
"face_watch: skipping excluded path {} (library {})",
c.rel_path,
library_name.unwrap_or("<unknown>")
);
return false;
}
true
})
.collect()
}
/// Read image bytes for face detection. Insightface (via opencv) can't
/// decode RAW or HEIC — for those we extract the embedded JPEG preview
/// the way the thumbnail pipeline does. Plain JPEG/PNG/WebP/etc. go
/// through a direct read.
pub(crate) fn read_image_bytes_for_detect(path: &Path) -> std::io::Result<Vec<u8>> {
if file_types::needs_ffmpeg_thumbnail(path)
&& let Some(preview) = exif::extract_embedded_jpeg_preview(path)
{
return Ok(preview);
}
// Plain read for everything else. RAW/HEIC files without an embedded
// preview fall through here too; Apollo will then 422 and the caller
// marks the row failed. That's fine; we tried.
std::fs::read(path)
}
#[cfg(test)]
mod tests {
use super::*;
use std::fs;
fn cand(rel_path: &str) -> FaceCandidate {
FaceCandidate {
rel_path: rel_path.to_string(),
content_hash: format!("hash-{rel_path}"),
}
}
#[test]
fn filter_excluded_pattern_drops_dir_components() {
// A pattern matches a path *component* under base, not a substring.
// Phase 3 needs this for @eaDir / .thumbnails skipping.
let tmp = tempfile::tempdir().unwrap();
let base = tmp.path();
let candidates = vec![
cand("photos/a.jpg"), // keep
cand("photos/@eaDir/SYNOPHOTO_THUMB"), // drop (component match)
cand("photos/eaDir-not-a-thing.jpg"), // keep (substring, not component)
];
let kept = filter_excluded(base, &["@eaDir".to_string()], candidates, Some("test"));
let kept_paths: Vec<_> = kept.iter().map(|c| c.rel_path.as_str()).collect();
assert_eq!(
kept_paths,
vec!["photos/a.jpg", "photos/eaDir-not-a-thing.jpg"]
);
}
#[test]
fn filter_excluded_absolute_dir_drops_subtree() {
// Absolute (under-base) entries drop the whole subtree.
let tmp = tempfile::tempdir().unwrap();
let base = tmp.path();
let candidates = vec![
cand("public/a.jpg"),
cand("private/a.jpg"),
cand("private/sub/b.jpg"),
];
let kept = filter_excluded(base, &["/private".to_string()], candidates, None);
let kept_paths: Vec<_> = kept.iter().map(|c| c.rel_path.as_str()).collect();
assert_eq!(kept_paths, vec!["public/a.jpg"]);
}
#[test]
fn filter_excluded_empty_rules_passes_all() {
// Skip the PathExcluder build entirely on the common path where
// EXCLUDED_DIRS is unset — saves an allocation per pass.
let tmp = tempfile::tempdir().unwrap();
let base = tmp.path();
let candidates = vec![cand("a.jpg"), cand("b.jpg")];
let kept = filter_excluded(base, &[], candidates, None);
assert_eq!(kept.len(), 2);
}
#[test]
fn read_bytes_passes_through_for_jpeg() {
// JPEG goes through plain read — we DON'T want to lose orientation
// metadata or re-encode here; insightface's exif_transpose handles
// orientation on its end.
let tmp = tempfile::tempdir().unwrap();
let path = tmp.path().join("test.jpg");
let mut buf = Vec::new();
// Tiny 4x4 grey JPEG — encoded by image crate so we know it round-trips.
let img = image::DynamicImage::ImageRgb8(image::RgbImage::from_pixel(
4,
4,
image::Rgb([128, 128, 128]),
));
img.write_to(
&mut std::io::Cursor::new(&mut buf),
image::ImageFormat::Jpeg,
)
.unwrap();
fs::write(&path, &buf).unwrap();
let read = read_image_bytes_for_detect(&path).expect("read jpeg");
assert_eq!(read, buf, "JPEG bytes must pass through verbatim");
}
#[test]
fn read_bytes_falls_back_when_raw_has_no_preview() {
// A `.nef` file with non-RAW bytes won't have an embedded preview —
// the helper falls through to plain read rather than refusing. This
// matches the docstring contract; Apollo will then 422 and we'll
// mark the row as failed.
let tmp = tempfile::tempdir().unwrap();
let path = tmp.path().join("not_really.nef");
fs::write(&path, b"definitely-not-a-raw-file").unwrap();
let read = read_image_bytes_for_detect(&path).expect("fallback read");
assert_eq!(read, b"definitely-not-a-raw-file");
}
}


@@ -12,6 +12,7 @@ pub mod data;
pub mod database;
pub mod error;
pub mod exif;
pub mod face_watch;
pub mod faces;
pub mod file_types;
pub mod files;


@@ -66,6 +66,7 @@ mod data;
mod database;
mod error;
mod exif;
mod face_watch;
mod faces;
mod file_types;
mod files;
@@ -1460,6 +1461,8 @@ fn main() -> std::io::Result<()> {
app_state.libraries.clone(),
playlist_mgr_for_watcher,
preview_gen_for_watcher,
app_state.face_client.clone(),
app_state.excluded_dirs.clone(),
);
// Start orphaned playlist cleanup job
@@ -1787,6 +1790,8 @@ fn watch_files(
libs: Vec<libraries::Library>,
playlist_manager: Addr<VideoPlaylistManager>,
preview_generator: Addr<video::actors::PreviewClipGenerator>,
face_client: crate::ai::face_client::FaceClient,
excluded_dirs: Vec<String>,
) {
std::thread::spawn(move || {
// Get polling intervals from environment variables
@@ -1819,6 +1824,9 @@ fn watch_files(
let preview_dao = Arc::new(Mutex::new(
Box::new(SqlitePreviewDao::new()) as Box<dyn PreviewDao>
));
let face_dao = Arc::new(Mutex::new(
Box::new(faces::SqliteFaceDao::new()) as Box<dyn faces::FaceDao>
));
let mut last_quick_scan = SystemTime::now();
let mut last_full_scan = SystemTime::now();
@@ -1844,6 +1852,9 @@ fn watch_files(
lib,
Arc::clone(&exif_dao),
Arc::clone(&preview_dao),
Arc::clone(&face_dao),
face_client.clone(),
&excluded_dirs,
None,
playlist_manager.clone(),
preview_generator.clone(),
@@ -1861,6 +1872,9 @@ fn watch_files(
lib,
Arc::clone(&exif_dao),
Arc::clone(&preview_dao),
Arc::clone(&face_dao),
face_client.clone(),
&excluded_dirs,
Some(check_since),
playlist_manager.clone(),
preview_generator.clone(),
@@ -1907,6 +1921,9 @@ fn process_new_files(
library: &libraries::Library,
exif_dao: Arc<Mutex<Box<dyn ExifDao>>>,
preview_dao: Arc<Mutex<Box<dyn PreviewDao>>>,
face_dao: Arc<Mutex<Box<dyn faces::FaceDao>>>,
face_client: crate::ai::face_client::FaceClient,
excluded_dirs: &[String],
modified_since: Option<SystemTime>,
playlist_manager: Addr<VideoPlaylistManager>,
preview_generator: Addr<video::actors::PreviewClipGenerator>,
@@ -2082,6 +2099,24 @@ fn process_new_files(
}
}
// ── Face detection pass ────────────────────────────────────────────
// Run after EXIF writes so newly-registered files have their
// content_hash populated. Skipped wholesale when face_client is
// disabled (no Apollo integration configured) — Phase 3 wires this
// up; the watcher remains usable on legacy deploys.
if face_client.is_enabled() {
let candidates = build_face_candidates(&context, &files, &exif_dao, &face_dao);
if !candidates.is_empty() {
face_watch::run_face_detection_pass(
library,
excluded_dirs,
&face_client,
Arc::clone(&face_dao),
candidates,
);
}
}
// Check for videos that need HLS playlists
let video_path_base = dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set");
let mut videos_needing_playlists = Vec::new();
@@ -2206,6 +2241,64 @@ fn process_new_files(
}
}
/// Build the face-detection candidate list for a scan tick.
///
/// We need `(rel_path, content_hash)` for every image file that has a
/// content_hash recorded in image_exif but no row in face_detections yet.
/// Re-querying image_exif here picks up rows the EXIF write loop just
/// inserted alongside any pre-existing rows the watcher walked over —
/// covers both new uploads and the initial backlog scan.
fn build_face_candidates(
context: &opentelemetry::Context,
files: &[(PathBuf, String)],
exif_dao: &Arc<Mutex<Box<dyn ExifDao>>>,
face_dao: &Arc<Mutex<Box<dyn faces::FaceDao>>>,
) -> Vec<face_watch::FaceCandidate> {
// Restrict to image files; videos aren't face-scanned in v1 (kamadak
// doesn't even register them in image_exif).
let image_paths: Vec<String> = files
.iter()
.filter(|(p, _)| !is_video_file(p))
.map(|(_, rel)| rel.clone())
.collect();
if image_paths.is_empty() {
return Vec::new();
}
let exif_records = {
let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
dao.get_exif_batch(context, &image_paths)
.unwrap_or_default()
};
// rel_path → content_hash (only rows with a hash; without one we have
// nothing to key face data against).
let mut hash_by_path: HashMap<String, String> = HashMap::with_capacity(exif_records.len());
for record in exif_records {
if let Some(h) = record.content_hash {
hash_by_path.insert(record.file_path, h);
}
}
let mut candidates = Vec::new();
let mut dao = face_dao.lock().expect("face dao");
for rel_path in image_paths {
let Some(hash) = hash_by_path.get(&rel_path) else {
continue;
};
match dao.already_scanned(context, hash) {
Ok(true) => continue,
Ok(false) => candidates.push(face_watch::FaceCandidate {
rel_path,
content_hash: hash.clone(),
}),
Err(e) => {
warn!("face_watch: already_scanned errored for {}: {:?}", hash, e);
}
}
}
candidates
}
#[cfg(test)]
mod tests {
use super::*;