faces: drain backfill + detection backlog every tick, not just full scans
Symptom: ImageApi restart, then ~60 minutes of silence — no
face_watch lines at all. Cause: backfill + face-detection candidate
build were both gated inside process_new_files, which during quick
scans (every 60s) only walks files modified in the last interval.
The pre-existing unhashed / unscanned backlog never entered the
candidate set, so it only drained on the full-scan path (default
once per hour). Surfaced as "scan stuck at 1101/13118" — most of
those rows were waiting on the next full scan.
Fix: two new per-tick passes that work directly off the DB:
(1) backfill_unhashed_backlog uses ExifDao::get_rows_missing_hash to
pull unhashed rows in id order, capped (FACE_HASH_BACKFILL_MAX_PER_TICK
default 2000), and writes content_hash for each. No filesystem
walk — the walk was the gating filter that hid the backlog.
(2) process_face_backlog uses a new FaceDao::list_unscanned_candidates
(a LEFT anti-join on content_hash via raw SQL, GROUP BY hash so
duplicates fire one detect call; query shape sketched below) to pull
a capped batch of hashed-but-unscanned rows (FACE_BACKLOG_MAX_PER_TICK,
default 64) and runs the existing face_watch detection pipeline on them.
Both run only when face_client.is_enabled(). The cap on (2) is small
because each candidate is a real Apollo round-trip — 64/tick at the
60s quick interval ≈ 64 detections/min, a pace that 8-core CPU
inference handles comfortably while keeping a steady flow visible in
the logs.
process_new_files's own backfill stays in place for the same-tick
flow (a brand-new upload gets hashed AND face-scanned in the tick
where it's discovered) but is now belt-and-suspenders.
A test backstop pins the new DAO method's filter contract (sketched
below): only hashed, unscanned, in-library rows are returned; scanned
rows, unhashed rows, and other-library rows are filtered out.
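
For reference, a minimal sketch of the query shape behind
FaceDao::list_unscanned_candidates. Table and column names here
(image_exif, face_scans, relative_path) are illustrative assumptions;
the commit only pins the LEFT-anti-join-on-content_hash, GROUP BY,
and cap contract:

    // Sketch only: table/column names are assumptions, not the real schema.
    const LIST_UNSCANNED_CANDIDATES_SQL: &str = r#"
        SELECT MIN(ie.relative_path) AS rel_path, ie.content_hash
        FROM image_exif ie
        LEFT JOIN face_scans fs ON fs.content_hash = ie.content_hash
        WHERE ie.library_id = ?1
          AND ie.content_hash IS NOT NULL -- hashed rows only
          AND fs.content_hash IS NULL     -- anti-join: no scan row yet
        GROUP BY ie.content_hash          -- duplicate files fire one detect call
        LIMIT ?2
    "#;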
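
And the shape of the test backstop (the fixture and seed_* helpers
are hypothetical stand-ins for the real test setup; only the filter
assertions mirror what the commit describes):

    // Hypothetical fixture/helper names; the contract being pinned is real.
    #[test]
    fn list_unscanned_candidates_filter_contract() {
        let (mut dao, lib_a, lib_b) = two_library_face_dao(); // hypothetical fixture
        let ctx = opentelemetry::Context::new();

        seed_exif(&mut dao, lib_a, "kept.jpg", Some("hash-kept"));       // hashed, unscanned
        seed_exif(&mut dao, lib_a, "scanned.jpg", Some("hash-scanned")); // hashed...
        seed_scan(&mut dao, "hash-scanned");                             // ...but already scanned
        seed_exif(&mut dao, lib_a, "unhashed.jpg", None);                // no content_hash yet
        seed_exif(&mut dao, lib_b, "other.jpg", Some("hash-other"));     // wrong library

        // Only the hashed, unscanned, in-library row comes back.
        let rows = dao.list_unscanned_candidates(&ctx, lib_a, 10).unwrap();
        assert_eq!(
            rows,
            vec![("kept.jpg".to_string(), "hash-kept".to_string())]
        );
    }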
src/main.rs | 161
@@ -1861,6 +1861,26 @@ fn watch_files(
        let is_full_scan = since_last_full.as_secs() >= full_interval_secs;

        for lib in &libs {
            // Drain the unhashed backlog AND the face-detection
            // backlog every tick, regardless of quick/full. Quick
            // scans only walk recently-modified files, so the
            // pre-Phase-3 backlog never enters their candidate set
            // — without these standalone passes, backfill +
            // detection only progressed during full scans
            // (default once an hour).
            if face_client.is_enabled() {
                let context = opentelemetry::Context::new();
                backfill_unhashed_backlog(&context, lib, &exif_dao);
                process_face_backlog(
                    &context,
                    lib,
                    &face_client,
                    &face_dao,
                    &watcher_tag_dao,
                    &excluded_dirs,
                );
            }

            if is_full_scan {
                info!(
                    "Running full scan for library '{}' (scan #{})",
@@ -2288,6 +2308,147 @@ fn process_new_files(
/// blake3-ing every photo at once. Subsequent scans pick up the rest.
/// For 50k+ libraries the dedicated `cargo run --bin backfill_hashes`
/// is still faster (it doesn't fight a watcher loop for the DAO mutex).
/// Drain unhashed image_exif rows by querying them directly, independent
/// of the filesystem walk. Quick scans only walk recently-modified
/// files, so a backlog of pre-existing unhashed rows never enters
/// `process_new_files`'s candidate set — left alone, it would only
/// drain on full scans (default once an hour). Calling this every tick
/// keeps the face-detection backlog moving regardless.
///
/// Returns the number of rows successfully backfilled this pass.
fn backfill_unhashed_backlog(
    context: &opentelemetry::Context,
    library: &libraries::Library,
    exif_dao: &Arc<Mutex<Box<dyn ExifDao>>>,
) -> usize {
    let cap: i64 = dotenv::var("FACE_HASH_BACKFILL_MAX_PER_TICK")
        .ok()
        .and_then(|s| s.parse().ok())
        .filter(|n: &i64| *n > 0)
        .unwrap_or(2000);

    // Fetch up to cap+1 rows so we can tell "more remain" without a
    // separate count query. Across libraries — there's no per-library
    // filter on get_rows_missing_hash today — but we only ever update
    // rows whose library_id matches the caller's library, so other
    // libraries' rows just get skipped here and picked up on the next
    // library's tick. Negligible cost given the cap.
    let rows: Vec<(i32, String)> = {
        let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
        dao.get_rows_missing_hash(context, cap + 1).unwrap_or_default()
    };
    if rows.is_empty() {
        return 0;
    }

    let more_than_cap = rows.len() as i64 > cap;
    let base_path = std::path::Path::new(&library.root_path);

    let mut backfilled = 0usize;
    let mut errors = 0usize;
    let mut skipped_other_lib = 0usize;
    for (lib_id, rel_path) in rows.iter().take(cap as usize) {
        if *lib_id != library.id {
            skipped_other_lib += 1;
            continue;
        }
        let abs = base_path.join(rel_path);
        if !abs.exists() {
            // File walked away — the watcher's reconciliation pass will
            // remove the orphan exif row eventually.
            continue;
        }
        match content_hash::compute(&abs) {
            Ok(id) => {
                let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
                if let Err(e) =
                    dao.backfill_content_hash(context, library.id, rel_path, &id.content_hash, id.size_bytes)
                {
                    warn!(
                        "face_watch: backfill_content_hash failed for {}: {:?}",
                        rel_path, e
                    );
                    errors += 1;
                } else {
                    backfilled += 1;
                }
            }
            Err(e) => {
                debug!("face_watch: hash compute failed for {} ({:?})", abs.display(), e);
                errors += 1;
            }
        }
    }

    if backfilled > 0 || errors > 0 || more_than_cap {
        info!(
            "face_watch: backfill pass for library '{}': hashed {} ({} error(s), {} skipped (other libraries); cap={}, more_remain={})",
            library.name, backfilled, errors, skipped_other_lib, cap, more_than_cap
        );
    }
    backfilled
}

/// Per-tick face-detection drain. Pulls a capped batch of hashed-but-
/// unscanned image_exif rows directly via the FaceDao anti-join and
/// hands them to the existing detection pass. Runs on every tick (not
/// just full scans) so the backlog moves at quick-scan cadence.
fn process_face_backlog(
    context: &opentelemetry::Context,
    library: &libraries::Library,
    face_client: &crate::ai::face_client::FaceClient,
    face_dao: &Arc<Mutex<Box<dyn faces::FaceDao>>>,
    tag_dao: &Arc<Mutex<Box<dyn tags::TagDao>>>,
    excluded_dirs: &[String],
) {
    let cap: i64 = dotenv::var("FACE_BACKLOG_MAX_PER_TICK")
        .ok()
        .and_then(|s| s.parse().ok())
        .filter(|n: &i64| *n > 0)
        .unwrap_or(64);

    let rows: Vec<(String, String)> = {
        let mut dao = face_dao.lock().expect("face dao");
        match dao.list_unscanned_candidates(context, library.id, cap) {
            Ok(r) => r,
            Err(e) => {
                warn!(
                    "face_watch: list_unscanned_candidates failed for library '{}': {:?}",
                    library.name, e
                );
                return;
            }
        }
    };
    if rows.is_empty() {
        return;
    }

    info!(
        "face_watch: backlog drain — running detection on {} candidate(s) for library '{}' (cap={})",
        rows.len(),
        library.name,
        cap
    );

    let candidates: Vec<face_watch::FaceCandidate> = rows
        .into_iter()
        .map(|(rel_path, content_hash)| face_watch::FaceCandidate {
            rel_path,
            content_hash,
        })
        .collect();

    face_watch::run_face_detection_pass(
        library,
        excluded_dirs,
        face_client,
        Arc::clone(face_dao),
        Arc::clone(tag_dao),
        candidates,
    );
}

fn backfill_missing_content_hashes(
    context: &opentelemetry::Context,
    files: &[(PathBuf, String)],
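
For reference, the two pacing knobs this change introduces, as they
might appear in a .env file. The values shown are just the code
defaults; anything that fails to parse as a positive integer falls
back to them:

    # Hash backfill: unhashed rows hashed per tick (default 2000).
    FACE_HASH_BACKFILL_MAX_PER_TICK=2000
    # Detection drain: candidates sent through detection per tick (default 64).
    FACE_BACKLOG_MAX_PER_TICK=64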