From 263e27e1081f18fbca8ad8975f12b179332a4086 Mon Sep 17 00:00:00 2001
From: Cameron Cordes
Date: Fri, 1 May 2026 16:27:53 +0000
Subject: [PATCH] multi-library: handoff + orphan GC with two-tick consensus
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Branch C of the multi-library data-model rollout. Implements the
operational maintenance pipeline pinned in CLAUDE.md → "Multi-library
data model" / "Library availability and safety". Branches A and B land
first; this branch builds on top.

New module: src/library_maintenance.rs

Three idempotent passes the watcher runs every tick: pass 1 inside the
per-library ingest loop, passes 2 and 3 after it.

1. Missing-file scan (per online library)
   For each Online library, load a page of image_exif rows
   (IMAGE_EXIF_MISSING_SCAN_PAGE_SIZE, default 500), stat() each one,
   and delete rows whose source file is NotFound. Permission/IO errors
   are skipped, never deleted. Deletions are capped at
   IMAGE_EXIF_MISSING_DELETE_CAP_PER_TICK (default 200) per library per
   tick, so a pathological mount that returns NotFound for everything
   can't wipe the table in one cycle. The cursor advances across
   ticks, wraps to 0 when a partial page comes back, and so cycles
   through the entire library over consecutive ticks. Stale libraries
   are skipped wholesale via the existing probe gate.

2. Back-ref refresh (DB-only)
   For face_detections / tagged_photo / photo_insights: any hash-keyed
   row whose (library_id, rel_path) no longer matches an image_exif
   row, but whose content_hash does, is repointed at a surviving
   image_exif location. Pure SQL with EXISTS guards so rows whose hash
   is fully orphaned are left alone (the orphan GC handles those).
   Idempotent; no availability gate needed. This is what makes a
   recent → archive move invisible to readers: when pass 1 retires the
   lib-A row, pass 2 pivots tags / faces / insights to lib-B's
   surviving path before any client notices.

3. Orphan GC (destructive)
   Hash-keyed derived rows whose content_hash has no image_exif
   referent are GC-eligible. Two-tick consensus: a hash must be
   observed orphaned on two consecutive ticks AND every library must
   be Online for both. A single Stale tick within the window cancels
   promotion of all pending deletes (they remain marked) and restarts
   the window, so a delete then needs two further all-Online ticks.
   The pending set lives in OrphanGcState (in-memory); a watcher
   restart resets it, which can only delay a delete, never cause one.
   Hashes that re-appear in image_exif between ticks are "revived",
   i.e. dropped from the pending set (handles transient share
   unmount / remount).

One new ExifDao method:
  - list_rel_paths_for_library_page(library_id, limit, offset) for
    the paginated missing-file scan.
  - (count_for_library landed in Branch A.)

Watcher wiring (main.rs)
  Per-library: missing-file scan inside the existing per-library
  loop, after process_new_files, gated by the same probe check that
  already protects ingest.
  After the loop: reconcile (Branch B), back-ref refresh, then
  run_orphan_gc. The maintenance connection is opened once per tick
  (image_api::database::connect), used by all three DB-only passes,
  and dropped at end of tick.

CLAUDE.md gains a "Maintenance pipeline" subsection that describes
the three passes and their interaction with the existing
availability-and-safety policy.

Tests: 225 pass (217 from Branch B + 8 new in library_maintenance
covering back-ref refresh including the fully-orphaned no-op case,
two-tick GC consensus, Stale-tick consensus reset, image_exif
re-appearance revival, multi-table delete, and the
all_libraries_online helper).

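For reviewers, the two-tick consensus rule of pass 3 can be sketched in
isolation. This is a minimal illustration under stated assumptions, not
the code in src/library_maintenance.rs: `Consensus` and `tick` are
hypothetical names, and the database query is replaced by an `orphans`
set that the caller passes in.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for OrphanGcState; names are illustrative only.
#[derive(Default)]
struct Consensus {
    /// Hashes observed orphaned on an earlier tick, not yet deleted.
    pending: HashSet<String>,
    /// Whether every library was Online on the previous tick.
    prev_all_online: bool,
}

/// One tick. `orphans` is the set of hashes observed orphaned right now
/// (assumed to come from a query such as the one in the patch).
/// Returns the hashes that reached consensus and may be deleted.
fn tick(state: &mut Consensus, orphans: &HashSet<String>, all_online: bool) -> Vec<String> {
    // Revive: forget pending hashes that are no longer orphaned.
    state.pending.retain(|h| orphans.contains(h));

    // Promote only when this tick AND the previous one were all-online.
    let mut confirmed: Vec<String> = Vec::new();
    if all_online && state.prev_all_online {
        confirmed = orphans
            .iter()
            .filter(|h| state.pending.contains(*h))
            .cloned()
            .collect();
        confirmed.sort();
    }

    // Mark everything orphaned this tick, then drop what was confirmed.
    state.pending.extend(orphans.iter().cloned());
    for h in &confirmed {
        state.pending.remove(h);
    }
    state.prev_all_online = all_online;
    confirmed
}
```

Calling `tick` with the same orphan set and the flags true, false, true,
true yields a delete only on the fourth call: the Stale tick in between
restarts the window, and a fresh `Consensus::default()` (a restart)
behaves the same way.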
Co-Authored-By: Claude Opus 4.7 (1M context) --- CLAUDE.md | 41 ++ src/database/mod.rs | 34 ++ src/files.rs | 10 + src/lib.rs | 1 + src/library_maintenance.rs | 745 +++++++++++++++++++++++++++++++++++++ src/main.rs | 76 ++++ 6 files changed, 907 insertions(+) create mode 100644 src/library_maintenance.rs diff --git a/CLAUDE.md b/CLAUDE.md index acf9b40..82a1e62 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -229,6 +229,47 @@ disappearance with no matching appearance is treated as "unavailable-or-deleted, defer judgment" — it does not re-key any rows and does not enqueue GC. +**Maintenance pipeline (`src/library_maintenance.rs`).** The watcher +runs three maintenance passes per tick that together implement the +move/handoff and orphan rules: + +1. **Missing-file scan** — per online library, paginated. A page of + `image_exif` rows is loaded (`IMAGE_EXIF_MISSING_SCAN_PAGE_SIZE`, + default 500), each row's `(root_path/rel_path)` is `stat()`-ed, + and confirmed-not-found rows are deleted from `image_exif` + (capped at `IMAGE_EXIF_MISSING_DELETE_CAP_PER_TICK`, default 200). + Permission/IO errors are skipped, never deleted — only `NotFound` + triggers a deletion. The cursor wraps every time a partial page + comes back, so the whole library is swept across consecutive ticks. + Skipped wholesale for Stale libraries via the per-library probe + gate at the top of the loop iteration. + +2. **Back-ref refresh** — DB-only. For `face_detections`, + `tagged_photo`, and `photo_insights`: any hash-keyed row whose + `(library_id, rel_path)` no longer matches an `image_exif` row + *but whose `content_hash` does* is repointed at the surviving + `image_exif` location. Idempotent SQL; no health gate needed. + This is what makes the recent → archive handoff invisible to + read paths: when the missing-file scan retires the lib-A row, + tags/faces/insights pivot to lib-B's path before any user + notices. + +3. **Orphan GC** — destructive. 
Hash-keyed derived rows whose + `content_hash` no longer has any `image_exif` row are eligible. + Two-tick consensus: a hash must be observed orphaned on two + consecutive ticks AND every library must be online for both. A + single Stale tick within the window cancels all pending deletes. + The pending set is held in memory (`OrphanGcState`) — restart + resets it, which only delays a delete, never causes one. Tags, + faces, and insights for orphaned hashes are deleted in one batch + per tick. + +A backup library that briefly disappears, then returns within two +ticks, never loses any derived data. A move from lib-A to lib-B +without disappearance flips through pass 1 (lib-A row retired) and +pass 2 (back-refs follow), with pass 3 noting nothing because the +hash is still present in `image_exif` (lib-B's row). + ### File Processing Pipeline **Thumbnail Generation:** diff --git a/src/database/mod.rs b/src/database/mod.rs index 444664f..d591c64 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -466,6 +466,18 @@ pub trait ExifDao: Sync + Send { context: &opentelemetry::Context, library_id: i32, ) -> Result; + + /// Paginated rel_path listing for a single library, ordered by id + /// ascending. Used by the missing-file detector to scan a library + /// in capped chunks across consecutive watcher ticks rather than + /// stat()ing every row every minute. Returns `(id, rel_path)`. 
+ fn list_rel_paths_for_library_page( + &mut self, + context: &opentelemetry::Context, + library_id: i32, + limit: i64, + offset: i64, + ) -> Result, DbError>; } pub struct SqliteExifDao { @@ -1129,6 +1141,28 @@ impl ExifDao for SqliteExifDao { }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } + + fn list_rel_paths_for_library_page( + &mut self, + context: &opentelemetry::Context, + library_id_val: i32, + limit: i64, + offset: i64, + ) -> Result, DbError> { + trace_db_call(context, "query", "list_rel_paths_for_library_page", |_span| { + use schema::image_exif::dsl::*; + + image_exif + .filter(library_id.eq(library_id_val)) + .order(id.asc()) + .select((id, rel_path)) + .limit(limit) + .offset(offset) + .load::<(i32, String)>(self.connection.lock().unwrap().deref_mut()) + .map_err(|_| anyhow::anyhow!("Query error")) + }) + .map_err(|_| DbError::new(DbErrorKind::QueryError)) + } } #[cfg(test)] diff --git a/src/files.rs b/src/files.rs index 67787dd..a1514ab 100644 --- a/src/files.rs +++ b/src/files.rs @@ -1709,6 +1709,16 @@ mod tests { ) -> Result { Ok(0) } + + fn list_rel_paths_for_library_page( + &mut self, + _context: &opentelemetry::Context, + _library_id: i32, + _limit: i64, + _offset: i64, + ) -> Result, DbError> { + Ok(Vec::new()) + } } mod api { diff --git a/src/lib.rs b/src/lib.rs index 2c384e1..52392d1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,6 +19,7 @@ pub mod file_types; pub mod files; pub mod geo; pub mod libraries; +pub mod library_maintenance; pub mod memories; pub mod otel; pub mod parsers; diff --git a/src/library_maintenance.rs b/src/library_maintenance.rs new file mode 100644 index 0000000..4ba5d06 --- /dev/null +++ b/src/library_maintenance.rs @@ -0,0 +1,745 @@ +//! Filesystem-backed maintenance of `image_exif`, the back-ref columns +//! on hash-keyed tables, and orphan derived data. +//! +//! These passes are the operational implementation of the library +//! handoff and orphan rules from CLAUDE.md → "Multi-library data +//! 
model" / "Library availability and safety":
+//!
+//! 1. **Missing-file detection** — when a file disappears from disk
+//!    but its `image_exif` row remains, the row is removed. Naturally
+//!    implements the move case: when a user moves a file from lib-A
+//!    to lib-B, the watcher's normal ingest creates the lib-B row;
+//!    this pass eventually retires the lib-A row.
+//!
+//! 2. **Back-ref refresh** — hash-keyed rows (`face_detections` and,
+//!    after Branch B, `tagged_photo` / `photo_insights`) carry a
+//!    denormalized `(library_id, rel_path)` back-ref. After a move,
+//!    that back-ref may point at a deleted row. The refresh pass
+//!    finds rows whose `(library_id, rel_path)` no longer matches
+//!    any `image_exif` row but whose `content_hash` does, and updates
+//!    the back-ref to one of the surviving paths. Idempotent.
+//!
+//! 3. **Orphan GC** — when a `content_hash` no longer has any
+//!    `image_exif` row referencing it, hash-keyed derived rows for
+//!    that hash become eligible for deletion. To survive transient
+//!    unmounts, the pass uses a **two-tick consensus rule**: a hash
+//!    must be observed orphaned for two consecutive ticks AND every
+//!    library must be online for both observations. The "marked but
+//!    not yet deleted" state is held in memory; restarting the
+//!    watcher resets it, which is fine: consensus starts over, so
+//!    a delete is only delayed, never triggered by the restart.
+//!
+//! Pass 1 is filesystem-dependent and gated on the per-library
+//! availability probe. Passes 2 and 3 are database-only but pass 3
+//! additionally requires every library to be online for the
+//! consensus window.
+
+use std::collections::HashSet;
+use std::path::Path;
+use std::sync::{Arc, Mutex};
+
+use diesel::prelude::*;
+use diesel::sql_query;
+use diesel::sqlite::SqliteConnection;
+use log::{debug, info, warn};
+
+use crate::database::ExifDao;
+use crate::libraries::{Library, LibraryHealthMap};
+
+/// Cap on missing-file deletions per library per tick. 
Prevents a +/// pathological mount that returns "not found" for everything (e.g. +/// case-sensitivity flip on a network share that the probe didn't +/// catch) from wiping the entire image_exif table in one tick. Tunable +/// via `IMAGE_EXIF_MISSING_DELETE_CAP_PER_TICK`. +pub const DEFAULT_MISSING_DELETE_CAP: usize = 200; + +/// Page size for the missing-file scan. We stat() every row in this +/// batch but only delete those that are confirmed-not-found (subject +/// to the delete cap above). Tunable via +/// `IMAGE_EXIF_MISSING_SCAN_PAGE_SIZE`. +pub const DEFAULT_SCAN_PAGE_SIZE: i64 = 500; + +/// Scan a page of `image_exif` rows for `library`, stat() each one, +/// and delete rows whose source file is gone. Returns +/// `(deleted, next_offset)`. `next_offset` wraps to 0 when the page +/// returned fewer rows than the page size, so the watcher cycles +/// through the whole library across ticks. +/// +/// Caller must already have confirmed the library is online — running +/// against a Stale library would interpret every row as missing. +pub fn detect_missing_files_for_library( + context: &opentelemetry::Context, + library: &Library, + exif_dao: &Arc>>, + offset: i64, + page_size: i64, + delete_cap: usize, +) -> (usize, i64) { + let rows = { + let mut dao = exif_dao.lock().expect("exif_dao poisoned"); + match dao.list_rel_paths_for_library_page(context, library.id, page_size, offset) { + Ok(r) => r, + Err(e) => { + warn!( + "missing-file scan: list page failed for library '{}' (offset={}): {:?}", + library.name, offset, e + ); + return (0, offset); + } + } + }; + let n_returned = rows.len(); + // Wrap offset when we hit the end of the table — next tick starts + // a fresh sweep. Doing it here rather than on the next call keeps + // the offset accounting visible in one place. 
+    let next_offset = if (n_returned as i64) < page_size {
+        0
+    } else {
+        offset + page_size
+    };
+
+    if rows.is_empty() {
+        return (0, next_offset);
+    }
+
+    let root = Path::new(&library.root_path);
+    let mut to_delete: Vec<String> = Vec::new();
+    for (_id, rel_path) in &rows {
+        if to_delete.len() >= delete_cap {
+            break;
+        }
+        let abs = root.join(rel_path);
+        match std::fs::metadata(&abs) {
+            Ok(_) => {
+                // File still exists — nothing to do.
+            }
+            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
+                to_delete.push(rel_path.clone());
+            }
+            Err(e) => {
+                // Permission denied / IO error / etc. — skip this row,
+                // leave it for the next sweep. We never want a transient
+                // FS hiccup to mass-delete metadata.
+                debug!(
+                    "missing-file scan: stat() error for {:?}, skipping: {:?}",
+                    abs, e
+                );
+            }
+        }
+    }
+
+    if to_delete.is_empty() {
+        return (0, next_offset);
+    }
+
+    let mut deleted = 0;
+    {
+        let mut dao = exif_dao.lock().expect("exif_dao poisoned");
+        for rel_path in &to_delete {
+            match dao.delete_exif_by_library(context, library.id, rel_path) {
+                Ok(()) => deleted += 1,
+                Err(e) => warn!(
+                    "missing-file scan: delete failed for ({}, {}): {:?}",
+                    library.id, rel_path, e
+                ),
+            }
+        }
+    }
+
+    if deleted > 0 {
+        info!(
+            "missing-file scan: removed {} stale image_exif row(s) from library '{}'",
+            deleted, library.name
+        );
+    }
+
+    (deleted, next_offset)
+}
+
+/// Refresh the `(library_id, rel_path)` back-refs on hash-keyed
+/// tables. A back-ref is stale when:
+///   - its `content_hash` is non-null,
+///   - that hash is referenced by at least one `image_exif` row, but
+///   - the row's own `(library_id, rel_path)` does not appear in
+///     `image_exif`.
+///
+/// In that case, point the back-ref at any surviving image_exif row
+/// for the same hash. `face_detections` is the canonical case (it
+/// carries `library_id` + `rel_path` columns) and `photo_insights`
+/// is handled the same way; `tagged_photo` historically carries only
+/// `rel_path`, which is kept in sync with any surviving rel_path.
+///
+/// All-SQL, idempotent. Returns the number of rows updated.
+pub fn refresh_back_refs(conn: &mut SqliteConnection) -> usize {
+    let mut total = 0usize;
+
+    // face_detections — back-ref is (library_id, rel_path). Repoint to
+    // any surviving image_exif row carrying the same content_hash.
+    let updated = sql_query(
+        "UPDATE face_detections \
+         SET library_id = ( \
+           SELECT ie.library_id FROM image_exif ie \
+           WHERE ie.content_hash = face_detections.content_hash \
+           ORDER BY ie.id LIMIT 1 \
+         ), \
+         rel_path = ( \
+           SELECT ie.rel_path FROM image_exif ie \
+           WHERE ie.content_hash = face_detections.content_hash \
+           ORDER BY ie.id LIMIT 1 \
+         ) \
+         WHERE EXISTS ( \
+           SELECT 1 FROM image_exif ie \
+           WHERE ie.content_hash = face_detections.content_hash \
+         ) \
+         AND NOT EXISTS ( \
+           SELECT 1 FROM image_exif ie \
+           WHERE ie.library_id = face_detections.library_id \
+             AND ie.rel_path = face_detections.rel_path \
+         )",
+    )
+    .execute(conn)
+    .unwrap_or_else(|e| {
+        warn!("back-ref refresh: face_detections update failed: {:?}", e);
+        0
+    });
+    total += updated;
+
+    // tagged_photo — only rel_path. Update to any surviving rel_path
+    // for the same content_hash so the path-only DAO read still finds
+    // tags after a move. 
+ let updated = sql_query( + "UPDATE tagged_photo \ + SET rel_path = ( \ + SELECT ie.rel_path FROM image_exif ie \ + WHERE ie.content_hash = tagged_photo.content_hash \ + ORDER BY ie.id LIMIT 1 \ + ) \ + WHERE content_hash IS NOT NULL \ + AND EXISTS ( \ + SELECT 1 FROM image_exif ie \ + WHERE ie.content_hash = tagged_photo.content_hash \ + ) \ + AND NOT EXISTS ( \ + SELECT 1 FROM image_exif ie \ + WHERE ie.rel_path = tagged_photo.rel_path \ + )", + ) + .execute(conn) + .unwrap_or_else(|e| { + warn!("back-ref refresh: tagged_photo update failed: {:?}", e); + 0 + }); + total += updated; + + // photo_insights — has both library_id and rel_path. Update both + // when the (library_id, rel_path) tuple no longer matches any + // image_exif row but the hash does. + let updated = sql_query( + "UPDATE photo_insights \ + SET library_id = ( \ + SELECT ie.library_id FROM image_exif ie \ + WHERE ie.content_hash = photo_insights.content_hash \ + ORDER BY ie.id LIMIT 1 \ + ), \ + rel_path = ( \ + SELECT ie.rel_path FROM image_exif ie \ + WHERE ie.content_hash = photo_insights.content_hash \ + ORDER BY ie.id LIMIT 1 \ + ) \ + WHERE content_hash IS NOT NULL \ + AND EXISTS ( \ + SELECT 1 FROM image_exif ie \ + WHERE ie.content_hash = photo_insights.content_hash \ + ) \ + AND NOT EXISTS ( \ + SELECT 1 FROM image_exif ie \ + WHERE ie.library_id = photo_insights.library_id \ + AND ie.rel_path = photo_insights.rel_path \ + )", + ) + .execute(conn) + .unwrap_or_else(|e| { + warn!("back-ref refresh: photo_insights update failed: {:?}", e); + 0 + }); + total += updated; + + if total > 0 { + info!("back-ref refresh: updated {} hash-keyed row(s)", total); + } + total +} + +/// One tick's outcome of the orphan-GC pass. +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] +pub struct GcStats { + /// Hashes newly observed orphaned this tick (added to the + /// pending set). 
+    pub newly_marked: usize,
+    /// Rows deleted per table, for hashes that were marked last tick,
+    /// are still orphaned this tick, and had every library online.
+    pub deleted_face_detections: usize,
+    pub deleted_tagged_photo: usize,
+    pub deleted_photo_insights: usize,
+    /// Hashes dropped from the pending set because they re-appeared
+    /// in image_exif (e.g. user remounted a backup that was briefly
+    /// missing).
+    pub revived: usize,
+}
+
+impl GcStats {
+    pub fn changed(&self) -> bool {
+        self.newly_marked > 0
+            || self.deleted_face_detections > 0
+            || self.deleted_tagged_photo > 0
+            || self.deleted_photo_insights > 0
+            || self.revived > 0
+    }
+
+    pub fn total_deleted(&self) -> usize {
+        self.deleted_face_detections + self.deleted_tagged_photo + self.deleted_photo_insights
+    }
+}
+
+/// Two-tick orphan-GC state. The watcher constructs one of these once
+/// at startup and passes it back into `run_orphan_gc` every tick.
+#[derive(Debug, Default)]
+pub struct OrphanGcState {
+    /// Hashes observed orphaned on the previous tick. A hash gets
+    /// promoted to "delete" when it survives a second consecutive
+    /// observation with all libraries online.
+    pending: HashSet<String>,
+    /// Whether every library was online on the previous tick. Combined
+    /// with the all-online check on the current tick, this gives the
+    /// "two consecutive ticks of full availability" guard described in
+    /// CLAUDE.md → "Library availability and safety".
+    prev_tick_all_online: bool,
+}
+
+/// Run one tick of the orphan GC. The function is responsible for the
+/// full lifecycle: probing for orphans, updating `state.pending`,
+/// performing deletes when consensus is reached, and returning stats
+/// for the watcher to log.
+///
+/// Safety guard: `all_online` MUST reflect every configured library
+/// being Online right now. Even if true, deletes only happen when the
+/// previous tick was also all-online. A single Stale tick within the
+/// window cancels any pending deletes (they stay marked but won't be
+/// promoted); they're then re-evaluated next tick.
+pub fn run_orphan_gc(
+    conn: &mut SqliteConnection,
+    state: &mut OrphanGcState,
+    all_online: bool,
+) -> GcStats {
+    let mut stats = GcStats::default();
+
+    // Find every distinct content_hash referenced by hash-keyed
+    // derived data that is NOT currently referenced by image_exif.
+    // These are this tick's orphan candidates. The query is cheap:
+    // three index lookups plus a HashSet sized by the derived tables'
+    // row count, which is small.
+    let orphans: HashSet<String> = match collect_orphan_hashes(conn) {
+        Ok(set) => set,
+        Err(e) => {
+            warn!("orphan-gc: candidate query failed: {:?}", e);
+            return stats;
+        }
+    };
+
+    // Drop entries from pending that are no longer orphaned
+    // ("revived"). Common case: a network share that briefly went
+    // stale comes back, image_exif gets re-populated by ingest, and
+    // the hash is no longer orphaned.
+    let revived = state.pending.difference(&orphans).cloned().collect::<Vec<_>>();
+    if !revived.is_empty() {
+        for h in &revived {
+            state.pending.remove(h);
+        }
+        stats.revived = revived.len();
+    }
+
+    if !all_online {
+        // Any Stale library cancels both the consensus window AND
+        // any pending deletes. We *do* still note newly observed
+        // orphans here, which is harmless bookkeeping. But we never
+        // delete this tick.
+        for h in &orphans {
+            if state.pending.insert(h.clone()) {
+                stats.newly_marked += 1;
+            }
+        }
+        state.prev_tick_all_online = false;
+        return stats;
+    }
+
+    // All-online + previous-tick-also-all-online: hashes that are
+    // both pending AND still orphaned this tick are confirmed and
+    // get deleted. Hashes orphaned this tick but not pending get
+    // freshly marked. 
+    let consensus_window_open = state.prev_tick_all_online;
+
+    let to_delete: Vec<String> = if consensus_window_open {
+        orphans
+            .iter()
+            .filter(|h| state.pending.contains(*h))
+            .cloned()
+            .collect()
+    } else {
+        Vec::new()
+    };
+
+    for h in &orphans {
+        if !state.pending.contains(h) {
+            state.pending.insert(h.clone());
+            stats.newly_marked += 1;
+        }
+    }
+
+    if !to_delete.is_empty() {
+        match delete_hash_keyed_rows(conn, &to_delete) {
+            Ok((faces, tags, insights)) => {
+                stats.deleted_face_detections = faces;
+                stats.deleted_tagged_photo = tags;
+                stats.deleted_photo_insights = insights;
+                // Drop deleted hashes from pending so we don't try to
+                // re-delete them next tick (they'll have already been
+                // removed from the orphan set).
+                for h in &to_delete {
+                    state.pending.remove(h);
+                }
+            }
+            Err(e) => warn!("orphan-gc: delete batch failed: {:?}", e),
+        }
+    }
+
+    state.prev_tick_all_online = true;
+
+    if stats.changed() {
+        info!(
+            "orphan-gc: marked {} new, revived {}; deleted {} face_detections / {} tagged_photo / {} photo_insights",
+            stats.newly_marked,
+            stats.revived,
+            stats.deleted_face_detections,
+            stats.deleted_tagged_photo,
+            stats.deleted_photo_insights,
+        );
+    } else {
+        debug!("orphan-gc: no changes this tick (pending: {})", state.pending.len());
+    }
+
+    stats
+}
+
+/// Helper for the watcher: are *all* libraries currently Online?
+pub fn all_libraries_online(libs: &[Library], health: &LibraryHealthMap) -> bool {
+    let guard = health.read().unwrap_or_else(|e| e.into_inner());
+    libs.iter().all(|lib| {
+        guard
+            .get(&lib.id)
+            .map(|h| h.is_online())
+            .unwrap_or(false)
+    })
+}
+
+#[derive(QueryableByName, Debug)]
+struct HashRow {
+    #[diesel(sql_type = diesel::sql_types::Text)]
+    content_hash: String,
+}
+
+fn collect_orphan_hashes(conn: &mut SqliteConnection) -> QueryResult<HashSet<String>> {
+    // Union of every distinct content_hash carried by hash-keyed
+    // derived tables, minus those still referenced by image_exif. 
+ let rows = sql_query( + "SELECT DISTINCT content_hash FROM ( \ + SELECT content_hash FROM face_detections WHERE content_hash IS NOT NULL \ + UNION ALL \ + SELECT content_hash FROM tagged_photo WHERE content_hash IS NOT NULL \ + UNION ALL \ + SELECT content_hash FROM photo_insights WHERE content_hash IS NOT NULL \ + ) AS derived \ + WHERE content_hash NOT IN ( \ + SELECT content_hash FROM image_exif WHERE content_hash IS NOT NULL \ + )", + ) + .get_results::(conn)?; + + Ok(rows.into_iter().map(|r| r.content_hash).collect()) +} + +/// Delete every hash-keyed row whose `content_hash` is in `hashes`. +/// Returns `(faces, tagged_photo, photo_insights)`. +fn delete_hash_keyed_rows( + conn: &mut SqliteConnection, + hashes: &[String], +) -> QueryResult<(usize, usize, usize)> { + if hashes.is_empty() { + return Ok((0, 0, 0)); + } + + use crate::database::schema::{face_detections, photo_insights, tagged_photo}; + + let faces = diesel::delete( + face_detections::table.filter(face_detections::content_hash.eq_any(hashes)), + ) + .execute(conn)?; + let tags = diesel::delete( + tagged_photo::table.filter(tagged_photo::content_hash.eq_any(hashes)), + ) + .execute(conn)?; + let insights = diesel::delete( + photo_insights::table.filter(photo_insights::content_hash.eq_any(hashes)), + ) + .execute(conn)?; + + Ok((faces, tags, insights)) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::database::test::in_memory_db_connection; + + fn ensure_library(conn: &mut SqliteConnection, library_id: i32) { + diesel::sql_query( + "INSERT OR IGNORE INTO libraries (id, name, root_path, created_at) \ + VALUES (?, 'test-' || ?, '/tmp/test-' || ?, 0)", + ) + .bind::(library_id) + .bind::(library_id) + .bind::(library_id) + .execute(conn) + .unwrap(); + } + + fn insert_image_exif( + conn: &mut SqliteConnection, + library_id: i32, + rel_path: &str, + content_hash: Option<&str>, + ) { + ensure_library(conn, library_id); + diesel::sql_query( + "INSERT INTO image_exif (library_id, rel_path, 
created_time, last_modified, content_hash) \ + VALUES (?, ?, 0, 0, ?)", + ) + .bind::(library_id) + .bind::(rel_path) + .bind::, _>(content_hash) + .execute(conn) + .unwrap(); + } + + fn insert_face(conn: &mut SqliteConnection, library_id: i32, rel_path: &str, hash: &str) { + ensure_library(conn, library_id); + diesel::sql_query( + "INSERT INTO face_detections (library_id, content_hash, rel_path, source, status, model_version, created_at) \ + VALUES (?, ?, ?, 'auto', 'no_faces', 'v', 0)", + ) + .bind::(library_id) + .bind::(hash) + .bind::(rel_path) + .execute(conn) + .unwrap(); + } + + fn insert_tag_with_hash(conn: &mut SqliteConnection, rel_path: &str, hash: &str) { + diesel::sql_query("INSERT OR IGNORE INTO tags (id, name, created_time) VALUES (1, 't', 0)") + .execute(conn) + .unwrap(); + diesel::sql_query( + "INSERT INTO tagged_photo (rel_path, tag_id, created_time, content_hash) VALUES (?, 1, 0, ?)", + ) + .bind::(rel_path) + .bind::(hash) + .execute(conn) + .unwrap(); + } + + fn insert_insight_with_hash( + conn: &mut SqliteConnection, + library_id: i32, + rel_path: &str, + hash: &str, + ) { + ensure_library(conn, library_id); + diesel::sql_query( + "INSERT INTO photo_insights (library_id, rel_path, title, summary, generated_at, model_version, is_current, backend, content_hash) \ + VALUES (?, ?, 't', 's', 0, 'v', 1, 'local', ?)", + ) + .bind::(library_id) + .bind::(rel_path) + .bind::(hash) + .execute(conn) + .unwrap(); + } + + #[derive(QueryableByName, Debug)] + struct CountRow { + #[diesel(sql_type = diesel::sql_types::BigInt)] + n: i64, + } + fn count(conn: &mut SqliteConnection, sql: &str) -> i64 { + diesel::sql_query(sql).get_result::(conn).unwrap().n + } + + #[test] + fn refresh_back_refs_repoints_face_detection_after_move() { + let mut conn = in_memory_db_connection(); + // Original location lib 1, rel "old.jpg". image_exif row gone + // (file moved); only the new lib 2 row remains. 
+ insert_image_exif(&mut conn, 2, "new.jpg", Some("h1")); + insert_face(&mut conn, 1, "old.jpg", "h1"); + + let updated = refresh_back_refs(&mut conn); + assert_eq!(updated, 1); + + let row = diesel::sql_query("SELECT library_id AS n FROM face_detections") + .get_result::(&mut conn) + .unwrap(); + assert_eq!(row.n, 2, "library_id should now point at lib 2"); + } + + #[test] + fn refresh_back_refs_no_change_when_back_ref_still_valid() { + let mut conn = in_memory_db_connection(); + insert_image_exif(&mut conn, 1, "a.jpg", Some("h1")); + insert_face(&mut conn, 1, "a.jpg", "h1"); + + let updated = refresh_back_refs(&mut conn); + assert_eq!(updated, 0); + } + + #[test] + fn refresh_back_refs_no_change_when_hash_fully_orphaned() { + // Hash exists on face_detections but no surviving image_exif + // row for it → the refresh is a no-op (orphan GC handles + // these). Important: the SET subquery would return NULL and + // we'd null out the back-ref otherwise; the EXISTS guard + // protects against that. + let mut conn = in_memory_db_connection(); + insert_face(&mut conn, 1, "gone.jpg", "h1"); + + let updated = refresh_back_refs(&mut conn); + assert_eq!(updated, 0); + } + + #[test] + fn orphan_gc_requires_two_consecutive_all_online_ticks() { + let mut conn = in_memory_db_connection(); + // Hash present in face_detections but NOT image_exif → orphan. + insert_face(&mut conn, 1, "x.jpg", "h-orphan"); + let mut state = OrphanGcState::default(); + + // Tick 1: prev_tick_all_online is false (default), so even + // with current tick all-online we mark only. + let stats = run_orphan_gc(&mut conn, &mut state, true); + assert_eq!(stats.newly_marked, 1); + assert_eq!(stats.total_deleted(), 0); + assert_eq!(state.pending.len(), 1); + + // Tick 2: prev_tick_all_online is now true, current tick still + // all-online → consensus reached, hash gets deleted. 
+ let stats = run_orphan_gc(&mut conn, &mut state, true); + assert_eq!(stats.deleted_face_detections, 1); + assert!(state.pending.is_empty()); + + // Tick 3: nothing left. + let stats = run_orphan_gc(&mut conn, &mut state, true); + assert_eq!(stats.total_deleted(), 0); + assert_eq!(stats.newly_marked, 0); + } + + #[test] + fn orphan_gc_resets_consensus_on_stale_library() { + let mut conn = in_memory_db_connection(); + insert_face(&mut conn, 1, "x.jpg", "h-orphan"); + let mut state = OrphanGcState::default(); + + // Tick 1: all-online, mark. + run_orphan_gc(&mut conn, &mut state, true); + // Tick 2: stale library — consensus window resets, no delete. + let stats = run_orphan_gc(&mut conn, &mut state, false); + assert_eq!(stats.total_deleted(), 0); + assert!(!state.prev_tick_all_online); + // Tick 3: all-online again — but we need ANOTHER tick to set + // prev_tick_all_online before deletes can fire. So tick 3 + // marks (no-op on existing pending), tick 4 deletes. + let stats = run_orphan_gc(&mut conn, &mut state, true); + assert_eq!(stats.total_deleted(), 0); + let stats = run_orphan_gc(&mut conn, &mut state, true); + assert_eq!(stats.deleted_face_detections, 1); + } + + #[test] + fn orphan_gc_revives_when_image_exif_reappears() { + let mut conn = in_memory_db_connection(); + insert_face(&mut conn, 1, "x.jpg", "h-orphan"); + let mut state = OrphanGcState::default(); + + // Tick 1: mark. + run_orphan_gc(&mut conn, &mut state, true); + assert!(state.pending.contains("h-orphan")); + + // Between ticks, the image_exif row reappears (e.g. backup + // share was briefly stale). Hash is no longer orphaned. 
+ insert_image_exif(&mut conn, 2, "x.jpg", Some("h-orphan")); + + let stats = run_orphan_gc(&mut conn, &mut state, true); + assert_eq!(stats.revived, 1); + assert_eq!(stats.total_deleted(), 0); + assert!(state.pending.is_empty()); + } + + #[test] + fn orphan_gc_deletes_across_all_three_tables() { + let mut conn = in_memory_db_connection(); + // Same orphan hash appears in all three derived tables. + insert_face(&mut conn, 1, "a.jpg", "h-orphan"); + insert_tag_with_hash(&mut conn, "a.jpg", "h-orphan"); + insert_insight_with_hash(&mut conn, 1, "a.jpg", "h-orphan"); + + let mut state = OrphanGcState::default(); + run_orphan_gc(&mut conn, &mut state, true); + let stats = run_orphan_gc(&mut conn, &mut state, true); + assert_eq!(stats.deleted_face_detections, 1); + assert_eq!(stats.deleted_tagged_photo, 1); + assert_eq!(stats.deleted_photo_insights, 1); + + assert_eq!(count(&mut conn, "SELECT COUNT(*) AS n FROM face_detections"), 0); + assert_eq!(count(&mut conn, "SELECT COUNT(*) AS n FROM tagged_photo"), 0); + assert_eq!(count(&mut conn, "SELECT COUNT(*) AS n FROM photo_insights"), 0); + } + + #[test] + fn all_libraries_online_helper() { + use crate::libraries::{LibraryHealth, new_health_map}; + let libs = vec![ + Library { + id: 1, + name: "a".into(), + root_path: "/x".into(), + }, + Library { + id: 2, + name: "b".into(), + root_path: "/y".into(), + }, + ]; + let health = new_health_map(&libs); + assert!(all_libraries_online(&libs, &health)); + + // Flip lib 2 to stale. 
+ { + let mut g = health.write().unwrap(); + g.insert( + 2, + LibraryHealth::Stale { + reason: "test".into(), + since: 0, + }, + ); + } + assert!(!all_libraries_online(&libs, &health)); + } +} diff --git a/src/main.rs b/src/main.rs index f9f9dbe..57a3761 100644 --- a/src/main.rs +++ b/src/main.rs @@ -72,6 +72,7 @@ mod file_types; mod files; mod geo; mod libraries; +mod library_maintenance; mod state; mod tags; mod utils; @@ -1945,6 +1946,29 @@ fn watch_files( let mut last_full_scan = SystemTime::now(); let mut scan_count = 0u64; + // Per-library cursor for the missing-file scan. Each tick reads + // a page from `offset`, stat()s the rows, deletes confirmed- + // missing ones, and advances or wraps the cursor. State held + // in-memory so a watcher restart resumes from 0 — fine, the + // sweep is idempotent. + let mut missing_file_offsets: std::collections::HashMap<i32, i64> = + std::collections::HashMap::new(); + + let missing_scan_page_size: i64 = dotenv::var("IMAGE_EXIF_MISSING_SCAN_PAGE_SIZE") + .ok() + .and_then(|s| s.parse().ok()) + .filter(|n: &i64| *n > 0) + .unwrap_or(library_maintenance::DEFAULT_SCAN_PAGE_SIZE); + let missing_delete_cap: usize = dotenv::var("IMAGE_EXIF_MISSING_DELETE_CAP_PER_TICK") + .ok() + .and_then(|s| s.parse().ok()) + .filter(|n: &usize| *n > 0) + .unwrap_or(library_maintenance::DEFAULT_MISSING_DELETE_CAP); + + // Two-tick orphan-GC consensus state. Carried across ticks via + // `OrphanGcState`; see library_maintenance::run_orphan_gc. + let mut orphan_gc_state = library_maintenance::OrphanGcState::default(); + // Initial availability sweep before the loop's first sleep so // /libraries reports the truth from the very first request, // rather than the optimistic Online default that @@ -2061,6 +2085,35 @@ fn watch_files( // Update media counts per library (metric aggregates across all) update_media_counts(Path::new(&lib.root_path), &excluded_dirs); + + // Missing-file detection: prune image_exif rows whose + // source file is no longer on disk.
Per-library, so we + // pass library-online-this-tick implicitly (we only + // reach here if the probe gate at the top of the + // iteration passed). Capped + paginated so a huge + // library doesn't stall the watcher; rows we don't + // visit this tick get visited next tick. See + // library_maintenance::detect_missing_files_for_library. + { + let context = opentelemetry::Context::new(); + let offset = missing_file_offsets.get(&lib.id).copied().unwrap_or(0); + let (deleted, next_offset) = + library_maintenance::detect_missing_files_for_library( + &context, + lib, + &exif_dao, + offset, + missing_scan_page_size, + missing_delete_cap, + ); + missing_file_offsets.insert(lib.id, next_offset); + if deleted > 0 { + debug!( + "missing-file scan: library '{}' next_offset={}", + lib.name, next_offset + ); + } + } } // Reconciliation: cross-library, so it runs once per tick @@ -2072,6 +2125,29 @@ fn watch_files( { let mut conn = image_api::database::connect(); let _ = image_api::database::reconcile::run(&mut conn); + + // Back-ref refresh: hash-keyed rows whose + // (library_id, rel_path) tuple no longer matches any + // image_exif row but whose hash still does. After a + // recent→archive move, the missing-file scan removes + // the old image_exif row; this pass repoints face / + // tag / insight back-refs at the surviving location. + // DB-only, no health gate needed — uses what's in + // image_exif as truth. + let _ = library_maintenance::refresh_back_refs(&mut conn); + + // Orphan GC: the destructive end of the maintenance + // pipeline. Two-tick consensus + every-library-online + // requirement is enforced inside run_orphan_gc; we + // pass the current all-online flag and the function + // tracks the previous tick's flag in OrphanGcState. + let all_online = + library_maintenance::all_libraries_online(&libs, &library_health); + let _ = library_maintenance::run_orphan_gc( + &mut conn, + &mut orphan_gc_state, + all_online, + ); } if is_full_scan {