From 1971eeccd64bf761772d4cb84cd3f9b3e584348b Mon Sep 17 00:00:00 2001 From: Cameron Cordes Date: Thu, 30 Apr 2026 01:46:49 +0000 Subject: [PATCH] faces: drain backfill + detection backlog every tick, not just full scans MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Symptom: ImageApi restart, then ~60 minutes of silence — no face_watch lines at all. Cause: backfill + face-detection candidate build were both gated inside process_new_files, which during quick scans (every 60s) only walks files modified in the last interval. The pre-existing unhashed / unscanned backlog never entered the candidate set, so it only drained on the full-scan path (default once per hour). Surfaced as "scan stuck at 1101/13118" — most of those rows were waiting on the next full scan. Two new per-tick passes that work directly off the DB: (1) backfill_unhashed_backlog uses ExifDao::get_rows_missing_hash to pull unhashed rows in id order, capped (FACE_HASH_BACKFILL_MAX_PER_TICK default 2000), and writes content_hash for each. No filesystem walk — the walk was the gating filter that hid the backlog. (2) process_face_backlog uses a new FaceDao::list_unscanned_candidates (LEFT-anti-join on content_hash via raw SQL, GROUP BY hash so duplicates fire one detect call) to pull a capped batch of hashed-but-unscanned rows (FACE_BACKLOG_MAX_PER_TICK default 64) and runs the existing face_watch detection pipeline on them. Both run only when face_client.is_enabled(). The cap on (2) is small because each candidate is a real Apollo round-trip — 64/tick at 60s quick interval ≈ 64 detections/min, which paces an 8-core CPU inference comfortably while keeping a steady flow visible in logs. process_new_files's own backfill stays in place for the same-tick flow (a brand-new upload gets hashed AND face-scanned in the tick where it's discovered) but is now belt-and-suspenders. 
Test backstop pinning the new DAO method's filter contract: only hashed, unscanned, in-library rows are returned; scanned rows, unhashed rows, and other-library rows are filtered out. --- src/faces.rs | 109 ++++++++++++++++++++++++++++++++++ src/main.rs | 161 +++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 270 insertions(+) diff --git a/src/faces.rs b/src/faces.rs index f45afa4..20fd700 100644 --- a/src/faces.rs +++ b/src/faces.rs @@ -99,6 +99,17 @@ pub struct FaceDetectionRow { pub created_at: i64, } +/// Row shape for `list_unscanned_candidates`'s raw SQL. Diesel's +/// `sql_query` requires a `QueryableByName` row type with explicit +/// column SQL types; using a tuple isn't supported. +#[derive(diesel::QueryableByName, Debug)] +struct UnscannedRow { + #[diesel(sql_type = diesel::sql_types::Text)] + rel_path: String, + #[diesel(sql_type = diesel::sql_types::Text)] + content_hash: String, +} + #[derive(Insertable, Debug)] #[diesel(table_name = face_detections)] struct InsertFaceDetection { @@ -354,6 +365,18 @@ pub trait FaceDao: Send + Sync { ctx: &opentelemetry::Context, content_hash: &str, ) -> anyhow::Result; + /// Find image_exif rows in `library_id` that have a populated + /// content_hash but no matching face_detections row yet. Used by + /// the watcher's quick-scan path to drain the backlog without + /// re-walking the filesystem. Returns `(rel_path, content_hash)` + /// pairs, capped at `limit`. Distinct on content_hash so the same + /// hash that lives at multiple rel_paths only fires one detection. 
+ fn list_unscanned_candidates( + &mut self, + ctx: &opentelemetry::Context, + library_id: i32, + limit: i64, + ) -> anyhow::Result>; fn store_detection( &mut self, ctx: &opentelemetry::Context, @@ -565,6 +588,43 @@ impl FaceDao for SqliteFaceDao { }) } + fn list_unscanned_candidates( + &mut self, + ctx: &opentelemetry::Context, + library_id: i32, + limit: i64, + ) -> anyhow::Result> { + let mut conn = self.connection.lock().expect("face dao lock"); + trace_db_call(ctx, "query", "list_unscanned_candidates", |span| { + span.set_attribute(KeyValue::new("library_id", library_id as i64)); + // GROUP BY content_hash collapses duplicates to a single + // rel_path per hash (an arbitrary row under SQLite's + // bare-column semantics — NOT necessarily the smallest id), + // so a hash that lives under several rel_paths in the same + // library only fires one detect call. The + // anti-join (NOT EXISTS) drains hashes that have no row in + // face_detections at all. + let rows: Vec<(String, String)> = diesel::sql_query( + "SELECT rel_path, content_hash \ + FROM image_exif e \ + WHERE library_id = ? \ + AND content_hash IS NOT NULL \ + AND NOT EXISTS ( \ + SELECT 1 FROM face_detections f \ + WHERE f.content_hash = e.content_hash \ + ) \ + GROUP BY content_hash \ + LIMIT ?", + ) + .bind::(library_id) + .bind::(limit) + .load::(conn.deref_mut()) + .with_context(|| "list_unscanned_candidates")? + .into_iter() + .map(|r| (r.rel_path, r.content_hash)) + .collect(); + Ok(rows) + }) + } + fn store_detection( + &mut self, + ctx: &opentelemetry::Context, @@ -3291,4 +3351,53 @@ mod tests { assert!(joined.person_name.is_none()); } + #[test] + fn list_unscanned_candidates_filters_to_hashed_unscanned_in_library() { + // The watcher's per-tick backlog drain depends on this query + // returning *only* image_exif rows with a populated + // content_hash and no matching face_detections row in the + // requested library. A regression here would either silently + // re-scan files (waste of inference) or skip files that need + // scanning (the symptom we just shipped a fix for). 
+ let mut dao = fresh_dao(); + diesel::sql_query( + "INSERT OR IGNORE INTO libraries (id, name, root_path, created_at) \ + VALUES (1, 'main', '/tmp', 0), (2, 'other', '/tmp2', 0)", + ) + .execute(dao.connection.lock().unwrap().deref_mut()) + .expect("seed libraries"); + + // Seed image_exif: mix of hashed/unhashed/scanned/cross-library. + diesel::sql_query( + "INSERT INTO image_exif \ + (library_id, rel_path, content_hash, created_time, last_modified) VALUES \ + (1, 'a.jpg', 'h-a', 0, 0), \ + (1, 'b.jpg', 'h-b', 0, 0), \ + (1, 'c.jpg', NULL, 0, 0), \ + (1, 'd.jpg', 'h-d', 0, 0), \ + (2, 'e.jpg', 'h-e', 0, 0)", + ) + .execute(dao.connection.lock().unwrap().deref_mut()) + .expect("seed image_exif"); + + // 'b' has been scanned (no_faces marker) — expect it filtered out. + dao.mark_status(&ctx(), 1, "h-b", "b.jpg", "no_faces", "buffalo_l") + .expect("scanned marker"); + + let cands = dao + .list_unscanned_candidates(&ctx(), 1, 10) + .expect("list unscanned"); + + let hashes: std::collections::HashSet<_> = + cands.iter().map(|(_, h)| h.clone()).collect(); + + // Should contain a and d (hashed, unscanned, library 1). + assert!(hashes.contains("h-a"), "missing h-a: {:?}", hashes); + assert!(hashes.contains("h-d"), "missing h-d: {:?}", hashes); + // Should NOT contain b (scanned), c (no hash), e (other library). + assert!(!hashes.contains("h-b"), "expected h-b filtered (scanned)"); + assert!(!hashes.contains("h-e"), "expected h-e filtered (other library)"); + assert_eq!(cands.len(), 2, "unexpected candidates: {:?}", cands); + } + } diff --git a/src/main.rs b/src/main.rs index f943810..3e85cbd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1861,6 +1861,26 @@ fn watch_files( let is_full_scan = since_last_full.as_secs() >= full_interval_secs; for lib in &libs { + // Drain the unhashed-hash backlog AND the face-detection + // backlog every tick, regardless of quick/full. 
Quick + // scans only walk recently-modified files, so the + // pre-Phase-3 backlog never enters their candidate set + // — without these standalone passes, backfill + + // detection only progressed during full scans + // (default once an hour). + if face_client.is_enabled() { + let context = opentelemetry::Context::new(); + backfill_unhashed_backlog(&context, lib, &exif_dao); + process_face_backlog( + &context, + lib, + &face_client, + &face_dao, + &watcher_tag_dao, + &excluded_dirs, + ); + } + if is_full_scan { info!( "Running full scan for library '{}' (scan #{})", @@ -2288,6 +2308,147 @@ fn process_new_files( /// blake3-ing every photo at once. Subsequent scans pick up the rest. /// For 50k+ libraries the dedicated `cargo run --bin backfill_hashes` /// is still faster (it doesn't fight a watcher loop for the DAO mutex). +/// Drain unhashed image_exif rows by querying them directly, independent +/// of the filesystem walk. Quick scans only walk recently-modified +/// files, so a backlog of pre-existing unhashed rows never enters +/// `process_new_files`'s candidate set — left alone, it would only +/// drain on full scans (default once an hour). Calling this every tick +/// keeps the face-detection backlog moving regardless. +/// +/// Returns the number of rows successfully backfilled this pass. +fn backfill_unhashed_backlog( + context: &opentelemetry::Context, + library: &libraries::Library, + exif_dao: &Arc>>, +) -> usize { + let cap: i64 = dotenv::var("FACE_HASH_BACKFILL_MAX_PER_TICK") + .ok() + .and_then(|s| s.parse().ok()) + .filter(|n: &i64| *n > 0) + .unwrap_or(2000); + + // Fetch up to cap+1 rows so we can tell "more remain" without a + // separate count query. Across libraries — there's no per-library + // filter on get_rows_missing_hash today — but we only ever update + // rows whose library_id matches the caller's library, so other + // libraries' rows just get skipped here and picked up on the next + // library's tick. Negligible cost given the cap. 
+ let rows: Vec<(i32, String)> = { + let mut dao = exif_dao.lock().expect("Unable to lock ExifDao"); + dao.get_rows_missing_hash(context, cap + 1).unwrap_or_default() + }; + if rows.is_empty() { + return 0; + } + + let more_than_cap = rows.len() as i64 > cap; + let base_path = std::path::Path::new(&library.root_path); + + let mut backfilled = 0usize; + let mut errors = 0usize; + let mut skipped_other_lib = 0usize; + for (lib_id, rel_path) in rows.iter().take(cap as usize) { + if *lib_id != library.id { + skipped_other_lib += 1; + continue; + } + let abs = base_path.join(rel_path); + if !abs.exists() { + // File walked away — the watcher's reconciliation pass will + // remove the orphan exif row eventually. + continue; + } + match content_hash::compute(&abs) { + Ok(id) => { + let mut dao = exif_dao.lock().expect("Unable to lock ExifDao"); + if let Err(e) = + dao.backfill_content_hash(context, library.id, rel_path, &id.content_hash, id.size_bytes) + { + warn!( + "face_watch: backfill_content_hash failed for {}: {:?}", + rel_path, e + ); + errors += 1; + } else { + backfilled += 1; + } + } + Err(e) => { + debug!("face_watch: hash compute failed for {} ({:?})", abs.display(), e); + errors += 1; + } + } + } + + if backfilled > 0 || errors > 0 || more_than_cap { + info!( + "face_watch: backfill pass for library '{}': hashed {} ({} error(s), {} skipped to other libraries; {} cap, more_remain={})", + library.name, backfilled, errors, skipped_other_lib, cap, more_than_cap + ); + } + backfilled +} + +/// Per-tick face-detection drain. Pulls a capped batch of hashed-but- +/// unscanned image_exif rows directly via the FaceDao anti-join and +/// hands them to the existing detection pass. Runs on every tick (not +/// just full scans) so the backlog moves at quick-scan cadence. 
+fn process_face_backlog( + context: &opentelemetry::Context, + library: &libraries::Library, + face_client: &crate::ai::face_client::FaceClient, + face_dao: &Arc>>, + tag_dao: &Arc>>, + excluded_dirs: &[String], +) { + let cap: i64 = dotenv::var("FACE_BACKLOG_MAX_PER_TICK") + .ok() + .and_then(|s| s.parse().ok()) + .filter(|n: &i64| *n > 0) + .unwrap_or(64); + + let rows: Vec<(String, String)> = { + let mut dao = face_dao.lock().expect("face dao"); + match dao.list_unscanned_candidates(context, library.id, cap) { + Ok(r) => r, + Err(e) => { + warn!( + "face_watch: list_unscanned_candidates failed for library '{}': {:?}", + library.name, e + ); + return; + } + } + }; + if rows.is_empty() { + return; + } + + info!( + "face_watch: backlog drain — running detection on {} candidate(s) for library '{}' (cap={})", + rows.len(), + library.name, + cap + ); + + let candidates: Vec = rows + .into_iter() + .map(|(rel_path, content_hash)| face_watch::FaceCandidate { + rel_path, + content_hash, + }) + .collect(); + + face_watch::run_face_detection_pass( + library, + excluded_dirs, + face_client, + Arc::clone(face_dao), + Arc::clone(tag_dao), + candidates, + ); +} + fn backfill_missing_content_hashes( context: &opentelemetry::Context, files: &[(PathBuf, String)],