date_backfill: per-tick drain for unresolved date_taken rows

Adds two ExifDao methods (`get_rows_needing_date_backfill` /
`backfill_date_taken`) and a `backfill_missing_date_taken` watcher pass
that runs on every tick alongside `backfill_unhashed_backlog`.

The drain queries the partial index for rows where `date_taken IS NULL`
or `date_taken_source = 'fs_time'`, batches up to
`DATE_BACKFILL_MAX_PER_TICK` paths (default 500), and feeds them through
`date_resolver::resolve_dates_batch` — a single exiftool subprocess
covers the whole tick. Rows that newly resolve to `exiftool` /
`filename` / `fs_time` get persisted via `backfill_date_taken` (touches
only `date_taken` + `date_taken_source` so EXIF / hash / perceptual
columns survive).

`filename`-sourced rows are intentionally not re-resolved — the regex
is authoritative when it matches and re-running exiftool wouldn't
change the answer. Files that have disappeared from disk are skipped
so a ghost row doesn't loop through the drain forever; the
missing-file scan in `library_maintenance` retires those separately.

Comes with two DAO unit tests (eligibility filter + column-isolation).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Cameron Cordes
2026-05-06 16:03:03 -04:00
parent 2d14291733
commit 54e0635a98
3 changed files with 291 additions and 0 deletions

View File

@@ -396,6 +396,34 @@ pub trait ExifDao: Sync + Send {
size_bytes: i64,
) -> Result<(), DbError>;
/// Return up to `limit` image_exif rows in `library_id` that need their
/// `date_taken` re-resolved by the canonical-date waterfall (see
/// `crate::date_resolver`): either no source ever ran
/// (`date_taken IS NULL`), or only the weakest fallback resolved it
/// (`date_taken_source = 'fs_time'`). Filtering to the requested
/// library happens in the DAO itself; rows from other libraries are
/// never returned. Yields `(library_id, rel_path)` pairs. Backed by the
/// partial index `idx_image_exif_date_backfill`.
fn get_rows_needing_date_backfill(
&mut self,
context: &opentelemetry::Context,
library_id: i32,
limit: i64,
) -> Result<Vec<(i32, String)>, DbError>;
/// Persist a resolver result for an existing row, identified by
/// `(library_id, rel_path)`. Touches `date_taken` and
/// `date_taken_source` only — leaves all other columns alone so
/// the drain doesn't accidentally clobber EXIF/hash/perceptual data
/// the watcher / GPS-write path may have already written.
fn backfill_date_taken(
&mut self,
context: &opentelemetry::Context,
library_id: i32,
rel_path: &str,
date_taken: i64,
source: &str,
) -> Result<(), DbError>;
/// Return image rows that have a `content_hash` but no `phash_64`,
/// oldest first. Used by the `backfill_perceptual_hash` binary.
/// Filters by image extension at the DB layer to avoid ever asking
@@ -1056,6 +1084,61 @@ impl ExifDao for SqliteExifDao {
.map_err(|_| DbError::new(DbErrorKind::UpdateError))
}
fn get_rows_needing_date_backfill(
    &mut self,
    context: &opentelemetry::Context,
    library_id_val: i32,
    limit: i64,
) -> Result<Vec<(i32, String)>, DbError> {
    trace_db_call(context, "query", "get_rows_needing_date_backfill", |_span| {
        use schema::image_exif::dsl::*;
        let mut connection = self.connection.lock().expect("Unable to get ExifDao");
        // Eligibility: either no date source ever ran, or only the
        // weakest (fs_time) fallback did. Combined with the library
        // filter this lines up with the partial index
        // `idx_image_exif_date_backfill`, so the planner hits it directly.
        let eligible = date_taken.is_null().or(date_taken_source.eq("fs_time"));
        let query = image_exif
            .filter(library_id.eq(library_id_val))
            .filter(eligible)
            .select((library_id, rel_path))
            .order(id.asc())
            .limit(limit);
        query
            .load::<(i32, String)>(connection.deref_mut())
            .map_err(|_| anyhow::anyhow!("Query error"))
    })
    .map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn backfill_date_taken(
    &mut self,
    context: &opentelemetry::Context,
    library_id_val: i32,
    rel_path_val: &str,
    date_taken_val: i64,
    source: &str,
) -> Result<(), DbError> {
    trace_db_call(context, "update", "backfill_date_taken", |_span| {
        use schema::image_exif::dsl::*;
        let mut connection = self.connection.lock().expect("Unable to get ExifDao");
        // Only the two date columns are written; everything else on the
        // row (EXIF fields, hashes, GPS data) is left exactly as-is.
        let target = image_exif
            .filter(library_id.eq(library_id_val))
            .filter(rel_path.eq(rel_path_val));
        diesel::update(target)
            .set((date_taken.eq(date_taken_val), date_taken_source.eq(source)))
            .execute(connection.deref_mut())
            .map(|_| ())
            .map_err(|_| anyhow::anyhow!("Update error"))
    })
    .map_err(|_| DbError::new(DbErrorKind::UpdateError))
}
fn find_by_content_hash(
&mut self,
context: &opentelemetry::Context,
@@ -1933,4 +2016,86 @@ mod exif_dao_tests {
// Unknown library: zero, no error.
assert_eq!(dao.count_for_library(&ctx(), 999).unwrap(), 0);
}
/// Insert an image_exif row carrying only the date fields the
/// canonical-date drain tests care about; every other column is
/// None / zero.
fn insert_row_with_source(
    dao: &mut SqliteExifDao,
    lib_id: i32,
    rel: &str,
    date: Option<i64>,
    source: Option<&str>,
) {
    let row = InsertImageExif {
        library_id: lib_id,
        file_path: rel.to_string(),
        date_taken: date,
        date_taken_source: source.map(String::from),
        created_time: 0,
        last_modified: 0,
        camera_make: None,
        camera_model: None,
        lens_model: None,
        width: None,
        height: None,
        orientation: None,
        gps_latitude: None,
        gps_longitude: None,
        gps_altitude: None,
        focal_length: None,
        aperture: None,
        shutter_speed: None,
        iso: None,
        content_hash: None,
        size_bytes: None,
        phash_64: None,
        dhash_64: None,
    };
    dao.store_exif(&ctx(), row).expect("insert exif row");
}
#[test]
fn get_rows_needing_date_backfill_returns_null_and_fs_time() {
    let mut dao = setup_two_libraries();
    // Eligible: never-resolved (NULL) and fs_time-sourced rows.
    insert_row_with_source(&mut dao, 1, "main/null.jpg", None, None);
    insert_row_with_source(&mut dao, 1, "main/fs.jpg", Some(123), Some("fs_time"));
    // Ineligible: filename- and exif-sourced rows are already canonical.
    insert_row_with_source(&mut dao, 1, "main/name.jpg", Some(456), Some("filename"));
    insert_row_with_source(&mut dao, 1, "main/real.jpg", Some(789), Some("exif"));
    // A different library is never returned, even when eligible.
    insert_row_with_source(&mut dao, 2, "archive/null.jpg", None, None);
    let mut paths: Vec<String> = dao
        .get_rows_needing_date_backfill(&ctx(), 1, 100)
        .unwrap()
        .into_iter()
        .map(|(_, p)| p)
        .collect();
    paths.sort();
    assert_eq!(
        paths,
        vec!["main/fs.jpg".to_string(), "main/null.jpg".to_string()],
        "expected null + fs_time eligible only"
    );
}
#[test]
fn backfill_date_taken_writes_date_and_source_only() {
    let mut dao = setup_two_libraries();
    insert_row_with_source(&mut dao, 1, "main/x.jpg", None, None);
    // Seed an unrelated column through a different write path so we can
    // prove the date backfill leaves it untouched.
    dao.backfill_content_hash(&ctx(), 1, "main/x.jpg", "deadbeef", 1024)
        .unwrap();
    dao.backfill_date_taken(&ctx(), 1, "main/x.jpg", 1700000000, "exiftool")
        .unwrap();
    let row = dao.get_exif(&ctx(), "main/x.jpg").unwrap().unwrap();
    // The two date columns were written…
    assert_eq!(row.date_taken, Some(1700000000));
    assert_eq!(row.date_taken_source, Some("exiftool".to_string()));
    // …and nothing else moved.
    assert_eq!(row.content_hash, Some("deadbeef".to_string()));
    assert_eq!(row.size_bytes, Some(1024));
}
}

View File

@@ -1646,6 +1646,26 @@ mod tests {
Ok(())
}
fn get_rows_needing_date_backfill(
    &mut self,
    _context: &opentelemetry::Context,
    _library_id: i32,
    _limit: i64,
) -> Result<Vec<(i32, String)>, DbError> {
    // Test double: the watcher tests never exercise the date drain.
    Ok(vec![])
}
fn backfill_date_taken(
    &mut self,
    _context: &opentelemetry::Context,
    _library_id: i32,
    _rel_path: &str,
    _date_taken: i64,
    _source: &str,
) -> Result<(), DbError> {
    // Test double: accept and discard the write.
    Ok(())
}
fn find_by_content_hash(
&mut self,
_context: &opentelemetry::Context,

View File

@@ -2125,6 +2125,15 @@ fn watch_files(
);
}
// Date-taken backfill: drain rows whose canonical date is
// either unresolved or only fs_time-sourced. Independent
// of face detection — runs even on deploys that don't
// configure Apollo, since `/memories` depends on it.
{
let context = opentelemetry::Context::new();
backfill_missing_date_taken(&context, lib, &exif_dao);
}
if is_full_scan {
info!(
"Running full scan for library '{}' (scan #{})",
@@ -2706,6 +2715,103 @@ fn backfill_unhashed_backlog(
backfilled
}
/// Drain image_exif rows whose `date_taken` was never resolved or was
/// resolved by the weakest fallback (`fs_time`). Runs the canonical-date
/// waterfall — exiftool batch (one subprocess for the whole tick's
/// rows) → filename regex → earliest_fs_time — and persists each
/// resolution with its source tag. Capped per tick by
/// `DATE_BACKFILL_MAX_PER_TICK` (default 500) so a 14k-row library
/// drains over a few quick-scan ticks without blocking the watcher.
///
/// kamadak-exif is intentionally skipped here: the row already has a
/// NULL date_taken because the ingest path's kamadak-exif call returned
/// nothing, and re-running it would just produce the same answer.
/// exiftool is the meaningful new attempt — it handles videos and
/// MakerNote-hosted dates kamadak can't reach.
///
/// Returns the number of rows actually backfilled this tick.
fn backfill_missing_date_taken(
    context: &opentelemetry::Context,
    library: &libraries::Library,
    exif_dao: &Arc<Mutex<Box<dyn ExifDao>>>,
) -> usize {
    // Per-tick cap; unparsable or non-positive values fall back to 500.
    let cap: i64 = dotenv::var("DATE_BACKFILL_MAX_PER_TICK")
        .ok()
        .and_then(|s| s.parse().ok())
        .filter(|n: &i64| *n > 0)
        .unwrap_or(500);
    // Fetch cap+1 so we can tell whether rows remain beyond this tick.
    let rows: Vec<(i32, String)> = {
        let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
        dao.get_rows_needing_date_backfill(context, library.id, cap + 1)
            .unwrap_or_default()
    };
    if rows.is_empty() {
        return 0;
    }
    let more_than_cap = rows.len() as i64 > cap;
    let base_path = std::path::Path::new(&library.root_path);
    // Build absolute paths and drop rows whose files no longer exist —
    // the missing-file scan in library_maintenance retires deleted rows
    // separately. Without this filter, NULL-date rows for missing files
    // would loop through the drain forever (no source can resolve them).
    // Capacity is bounded by the cap since we only take that many rows.
    let mut existing: Vec<(String, PathBuf)> =
        Vec::with_capacity(rows.len().min(cap as usize));
    for (_, rel_path) in rows.iter().take(cap as usize) {
        let abs = base_path.join(rel_path);
        if abs.exists() {
            existing.push((rel_path.clone(), abs));
        }
    }
    if existing.is_empty() {
        // The id-ordered batch head is entirely ghost rows. If more rows
        // remain past the cap, the drain is blocked until maintenance
        // retires the ghosts — surface that instead of returning silently.
        if more_than_cap {
            info!(
                "date_backfill: library '{}': batch head is all missing files; \
                 waiting on library_maintenance to retire them",
                library.name
            );
        }
        return 0;
    }
    // One exiftool subprocess for the whole batch; the resolver falls
    // through to filename / fs_time per file when exiftool can't supply
    // a date (or isn't installed at all).
    let paths: Vec<PathBuf> = existing.iter().map(|(_, p)| p.clone()).collect();
    let resolved = date_resolver::resolve_dates_batch(&paths, &HashMap::new());
    let mut backfilled = 0usize;
    let mut unresolved = 0usize;
    let mut by_source: HashMap<&'static str, usize> = HashMap::new();
    {
        let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
        for (rel_path, abs) in &existing {
            let Some(rd) = resolved.get(abs).copied() else {
                unresolved += 1;
                continue;
            };
            match dao.backfill_date_taken(
                context,
                library.id,
                rel_path,
                rd.timestamp,
                rd.source.as_str(),
            ) {
                Ok(()) => {
                    backfilled += 1;
                    *by_source.entry(rd.source.as_str()).or_insert(0) += 1;
                }
                Err(e) => {
                    // Leave the row eligible; it will be retried next tick.
                    warn!(
                        "date_backfill: update failed for lib {} {}: {:?}",
                        library.id, rel_path, e
                    );
                }
            }
        }
    }
    if backfilled > 0 || unresolved > 0 || more_than_cap {
        info!(
            "date_backfill: library '{}': resolved {} ({:?}), {} unresolved, cap={}, more_remain={}",
            library.name, backfilled, by_source, unresolved, cap, more_than_cap
        );
    }
    backfilled
}
/// Per-tick face-detection drain. Pulls a capped batch of hashed-but-
/// unscanned image_exif rows directly via the FaceDao anti-join and
/// hands them to the existing detection pass. Runs on every tick (not