multi-library: handoff + orphan GC with two-tick consensus
Branch C of the multi-library data-model rollout. Implements the
operational maintenance pipeline pinned in CLAUDE.md → "Multi-library
data model" / "Library availability and safety". Branches A and B
land first; this branch builds on top.
New module: src/library_maintenance.rs
Three idempotent passes the watcher runs every tick after the
per-library ingest loop:
1. Missing-file scan (per online library)
For each Online library, load a page of image_exif rows
(IMAGE_EXIF_MISSING_SCAN_PAGE_SIZE, default 500), stat() each one,
and delete rows whose source file is NotFound. Permission/IO
errors are skipped, never deleted. Capped at
IMAGE_EXIF_MISSING_DELETE_CAP_PER_TICK (default 200) per library
per tick — so a pathological mount that returns NotFound for
everything can't wipe the table in one cycle. Cursor advances
across ticks, wraps on partial-page returns, and naturally cycles
through the entire library over many minutes. Skipped wholesale
for Stale libraries via the existing probe gate.
2. Back-ref refresh (DB-only)
For face_detections / tagged_photo / photo_insights: any
hash-keyed row whose (library_id, rel_path) no longer matches an
image_exif row, but whose content_hash does, is repointed at a
surviving image_exif location. Pure SQL with EXISTS guards so
rows whose hash is fully orphaned are left alone (the orphan GC
handles those). Idempotent; no availability gate needed.
This is what makes a recent → archive move invisible to readers:
when pass 1 retires the lib-A row, pass 2 pivots tags / faces /
insights to lib-B's surviving path before any client notices.
3. Orphan GC (destructive)
Hash-keyed derived rows whose content_hash has no image_exif
referent are GC-eligible. Two-tick consensus: a hash must be
observed orphaned on two consecutive ticks AND every library must
be Online for both. A single Stale tick within the window cancels
all pending deletes (they remain marked but won't be promoted) —
they're re-evaluated next tick. The pending set lives in
OrphanGcState (in-memory); a watcher restart resets it, which can
only delay a delete, never cause one. Hashes that re-appear in
image_exif between ticks are "revived" from the pending set
(handles transient share unmount / remount).
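The consensus rule itself is small enough to model in isolation. The sketch
below is an illustration only — a hypothetical Consensus type, not the real
OrphanGcState / run_orphan_gc added by this branch — and the walk-through in
main() mirrors the Stale-tick reset test:

    use std::collections::HashSet;

    /// Illustration of the two-tick rule: a hash is deleted only when it is
    /// observed orphaned on two consecutive ticks and both ticks are all-online.
    #[derive(Default)]
    struct Consensus {
        pending: HashSet<String>,
        prev_tick_all_online: bool,
    }

    impl Consensus {
        /// Returns the hashes that would be deleted this tick.
        fn tick(&mut self, orphans: &HashSet<String>, all_online: bool) -> Vec<String> {
            // "Revive" anything that is no longer orphaned.
            self.pending.retain(|h| orphans.contains(h));
            if !all_online {
                // Stale tick: keep marking, never delete, close the window.
                self.pending.extend(orphans.iter().cloned());
                self.prev_tick_all_online = false;
                return Vec::new();
            }
            // Only hashes already pending from the previous all-online tick are confirmed.
            let confirmed: Vec<String> = if self.prev_tick_all_online {
                orphans.iter().filter(|h| self.pending.contains(*h)).cloned().collect()
            } else {
                Vec::new()
            };
            self.pending.extend(orphans.iter().cloned());
            for h in &confirmed {
                self.pending.remove(h);
            }
            self.prev_tick_all_online = true;
            confirmed
        }
    }

    fn main() {
        let orphan: HashSet<String> = ["h1".to_string()].into_iter().collect();
        let mut c = Consensus::default();
        assert!(c.tick(&orphan, true).is_empty());  // tick 1: mark only
        assert!(c.tick(&orphan, false).is_empty()); // stale tick: window resets
        assert!(c.tick(&orphan, true).is_empty());  // re-open the window
        assert_eq!(c.tick(&orphan, true), vec!["h1".to_string()]); // consensus → delete
    }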
One new ExifDao method:
- list_rel_paths_for_library_page(library_id, limit, offset) for
  the paginated missing-file scan.
- (count_for_library landed in Branch A.)
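Purely for illustration, the page-and-wrap contract the scan builds on
(hypothetical next_offset helper and made-up row counts; the real logic lives
in detect_missing_files_for_library, fed by list_rel_paths_for_library_page):

    /// Illustration only: the wrap rule — offset resets to 0 on a short page,
    /// otherwise advances by one page, so consecutive ticks sweep the whole table.
    fn next_offset(offset: i64, page_size: i64, n_returned: usize) -> i64 {
        if (n_returned as i64) < page_size { 0 } else { offset + page_size }
    }

    fn main() {
        // A library with 1,200 rows and a 500-row page: offsets go 0 → 500 → 1000 → 0.
        let total_rows: i64 = 1_200;
        let page_size: i64 = 500;
        let mut offset = 0;
        for _tick in 0..4 {
            let n_returned = (total_rows - offset).clamp(0, page_size) as usize;
            println!("tick reads {} row(s) at offset {}", n_returned, offset);
            offset = next_offset(offset, page_size, n_returned);
        }
        assert_eq!(offset, 500); // the fourth tick started a fresh sweep and advanced again
    }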
Watcher wiring (main.rs)
Per-library: missing-file scan inside the existing per-library
loop, after process_new_files, gated by the same probe check that
already protects ingest. After the loop: reconcile (Branch B),
back-ref refresh, then run_orphan_gc. The maintenance connection is
opened once per tick (image_api::database::connect), shared by
reconcile, the back-ref refresh, and the orphan GC, and dropped at
end of tick.
CLAUDE.md gains a "Maintenance pipeline" subsection that describes
the three passes and their interaction with the existing
availability-and-safety policy.
Tests: 225 pass (217 from Branch B + 8 new in library_maintenance
covering back-ref refresh including the fully-orphaned no-op case,
two-tick GC consensus, Stale-tick consensus reset, image_exif
re-appearance revival, multi-table delete, and the
all_libraries_online helper).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@@ -466,6 +466,18 @@ pub trait ExifDao: Sync + Send {
        context: &opentelemetry::Context,
        library_id: i32,
    ) -> Result<i64, DbError>;

    /// Paginated rel_path listing for a single library, ordered by id
    /// ascending. Used by the missing-file detector to scan a library
    /// in capped chunks across consecutive watcher ticks rather than
    /// stat()ing every row every minute. Returns `(id, rel_path)`.
    fn list_rel_paths_for_library_page(
        &mut self,
        context: &opentelemetry::Context,
        library_id: i32,
        limit: i64,
        offset: i64,
    ) -> Result<Vec<(i32, String)>, DbError>;
}

pub struct SqliteExifDao {
@@ -1129,6 +1141,28 @@ impl ExifDao for SqliteExifDao {
        })
        .map_err(|_| DbError::new(DbErrorKind::QueryError))
    }

    fn list_rel_paths_for_library_page(
        &mut self,
        context: &opentelemetry::Context,
        library_id_val: i32,
        limit: i64,
        offset: i64,
    ) -> Result<Vec<(i32, String)>, DbError> {
        trace_db_call(context, "query", "list_rel_paths_for_library_page", |_span| {
            use schema::image_exif::dsl::*;

            image_exif
                .filter(library_id.eq(library_id_val))
                .order(id.asc())
                .select((id, rel_path))
                .limit(limit)
                .offset(offset)
                .load::<(i32, String)>(self.connection.lock().unwrap().deref_mut())
                .map_err(|_| anyhow::anyhow!("Query error"))
        })
        .map_err(|_| DbError::new(DbErrorKind::QueryError))
    }
}

#[cfg(test)]

src/files.rs (+10)
@@ -1709,6 +1709,16 @@ mod tests {
        ) -> Result<i64, DbError> {
            Ok(0)
        }

        fn list_rel_paths_for_library_page(
            &mut self,
            _context: &opentelemetry::Context,
            _library_id: i32,
            _limit: i64,
            _offset: i64,
        ) -> Result<Vec<(i32, String)>, DbError> {
            Ok(Vec::new())
        }
    }

    mod api {

@@ -19,6 +19,7 @@ pub mod file_types;
pub mod files;
pub mod geo;
pub mod libraries;
pub mod library_maintenance;
pub mod memories;
pub mod otel;
pub mod parsers;

src/library_maintenance.rs (new file, +745)
@@ -0,0 +1,745 @@
//! Filesystem-backed maintenance of `image_exif`, the back-ref columns
//! on hash-keyed tables, and orphan derived data.
//!
//! These passes are the operational implementation of the library
//! handoff and orphan rules from CLAUDE.md → "Multi-library data
//! model" / "Library availability and safety":
//!
//! 1. **Missing-file detection** — when a file disappears from disk
//!    but its `image_exif` row remains, the row is removed. Naturally
//!    implements the move case: when a user moves a file from lib-A
//!    to lib-B, the watcher's normal ingest creates the lib-B row;
//!    this pass eventually retires the lib-A row.
//!
//! 2. **Back-ref refresh** — hash-keyed rows (`face_detections` and,
//!    after Branch B, `tagged_photo` / `photo_insights`) carry a
//!    denormalized `(library_id, rel_path)` back-ref. After a move,
//!    that back-ref may point at a deleted row. The refresh pass
//!    finds rows whose `(library_id, rel_path)` no longer matches
//!    any `image_exif` row but whose `content_hash` does, and updates
//!    the back-ref to one of the surviving paths. Idempotent.
//!
//! 3. **Orphan GC** — when a `content_hash` no longer has any
//!    `image_exif` row referencing it, hash-keyed derived rows for
//!    that hash become eligible for deletion. To survive transient
//!    unmounts, the pass uses a **two-tick consensus rule**: a hash
//!    must be observed orphaned for two consecutive ticks AND every
//!    library must be online for both observations. The "marked but
//!    not yet deleted" state is held in memory; restarting the
//!    watcher resets it (which is fine — the second tick simply
//!    happens after the next tick, not the very next one).
//!
//! Pass 1 is filesystem-dependent and gated on the per-library
//! availability probe. Passes 2 and 3 are database-only but pass 3
//! additionally requires every library to be online for the
//! consensus window.

use std::collections::HashSet;
use std::path::Path;
use std::sync::{Arc, Mutex};

use diesel::prelude::*;
use diesel::sql_query;
use diesel::sqlite::SqliteConnection;
use log::{debug, info, warn};

use crate::database::ExifDao;
use crate::libraries::{Library, LibraryHealthMap};

/// Cap on missing-file deletions per library per tick. Prevents a
/// pathological mount that returns "not found" for everything (e.g.
/// case-sensitivity flip on a network share that the probe didn't
/// catch) from wiping the entire image_exif table in one tick. Tunable
/// via `IMAGE_EXIF_MISSING_DELETE_CAP_PER_TICK`.
pub const DEFAULT_MISSING_DELETE_CAP: usize = 200;

/// Page size for the missing-file scan. We stat() every row in this
/// batch but only delete those that are confirmed-not-found (subject
/// to the delete cap above). Tunable via
/// `IMAGE_EXIF_MISSING_SCAN_PAGE_SIZE`.
pub const DEFAULT_SCAN_PAGE_SIZE: i64 = 500;

/// Scan a page of `image_exif` rows for `library`, stat() each one,
/// and delete rows whose source file is gone. Returns
/// `(deleted, next_offset)`. `next_offset` wraps to 0 when the page
/// returned fewer rows than the page size, so the watcher cycles
/// through the whole library across ticks.
///
/// Caller must already have confirmed the library is online — running
/// against a Stale library would interpret every row as missing.
pub fn detect_missing_files_for_library(
    context: &opentelemetry::Context,
    library: &Library,
    exif_dao: &Arc<Mutex<Box<dyn ExifDao>>>,
    offset: i64,
    page_size: i64,
    delete_cap: usize,
) -> (usize, i64) {
    let rows = {
        let mut dao = exif_dao.lock().expect("exif_dao poisoned");
        match dao.list_rel_paths_for_library_page(context, library.id, page_size, offset) {
            Ok(r) => r,
            Err(e) => {
                warn!(
                    "missing-file scan: list page failed for library '{}' (offset={}): {:?}",
                    library.name, offset, e
                );
                return (0, offset);
            }
        }
    };
    let n_returned = rows.len();
    // Wrap offset when we hit the end of the table — next tick starts
    // a fresh sweep. Doing it here rather than on the next call keeps
    // the offset accounting visible in one place.
    let next_offset = if (n_returned as i64) < page_size {
        0
    } else {
        offset + page_size
    };

    if rows.is_empty() {
        return (0, next_offset);
    }

    let root = Path::new(&library.root_path);
    let mut to_delete: Vec<String> = Vec::new();
    for (_id, rel_path) in &rows {
        if to_delete.len() >= delete_cap {
            break;
        }
        let abs = root.join(rel_path);
        match std::fs::metadata(&abs) {
            Ok(_) => {
                // File still exists — nothing to do.
            }
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                to_delete.push(rel_path.clone());
            }
            Err(e) => {
                // Permission denied / IO error / etc. — skip this row,
                // leave it for the next sweep. We never want a transient
                // FS hiccup to mass-delete metadata.
                debug!(
                    "missing-file scan: stat() error for {:?}, skipping: {:?}",
                    abs, e
                );
            }
        }
    }

    if to_delete.is_empty() {
        return (0, next_offset);
    }

    let mut deleted = 0;
    {
        let mut dao = exif_dao.lock().expect("exif_dao poisoned");
        for rel_path in &to_delete {
            match dao.delete_exif_by_library(context, library.id, rel_path) {
                Ok(()) => deleted += 1,
                Err(e) => warn!(
                    "missing-file scan: delete failed for ({}, {}): {:?}",
                    library.id, rel_path, e
                ),
            }
        }
    }

    if deleted > 0 {
        info!(
            "missing-file scan: removed {} stale image_exif row(s) from library '{}'",
            deleted, library.name
        );
    }

    (deleted, next_offset)
}

/// Refresh the `(library_id, rel_path)` back-refs on hash-keyed
/// tables. A back-ref is stale when:
/// - its `content_hash` is non-null,
/// - that hash is referenced by at least one `image_exif` row, but
/// - the row's own `(library_id, rel_path)` does not appear in
///   `image_exif`.
///
/// In that case, point the back-ref at any surviving image_exif row
/// for the same hash. `face_detections` and `photo_insights` carry
/// `library_id` + `rel_path` columns; `tagged_photo` historically only
/// carries `rel_path` — we still keep it in sync here for consistency,
/// picking any surviving rel_path.
///
/// All-SQL, idempotent. Returns the number of rows updated.
pub fn refresh_back_refs(conn: &mut SqliteConnection) -> usize {
    let mut total = 0usize;

    // face_detections — back-ref is (library_id, rel_path). Repoint to
    // any surviving image_exif row carrying the same content_hash.
    let updated = sql_query(
        "UPDATE face_detections \
         SET library_id = ( \
             SELECT ie.library_id FROM image_exif ie \
             WHERE ie.content_hash = face_detections.content_hash \
             ORDER BY ie.id LIMIT 1 \
         ), \
         rel_path = ( \
             SELECT ie.rel_path FROM image_exif ie \
             WHERE ie.content_hash = face_detections.content_hash \
             ORDER BY ie.id LIMIT 1 \
         ) \
         WHERE EXISTS ( \
             SELECT 1 FROM image_exif ie \
             WHERE ie.content_hash = face_detections.content_hash \
         ) \
         AND NOT EXISTS ( \
             SELECT 1 FROM image_exif ie \
             WHERE ie.library_id = face_detections.library_id \
             AND ie.rel_path = face_detections.rel_path \
         )",
    )
    .execute(conn)
    .unwrap_or_else(|e| {
        warn!("back-ref refresh: face_detections update failed: {:?}", e);
        0
    });
    total += updated;

    // tagged_photo — only rel_path. Update to any surviving rel_path
    // for the same content_hash so the path-only DAO read still finds
    // tags after a move.
    let updated = sql_query(
        "UPDATE tagged_photo \
         SET rel_path = ( \
             SELECT ie.rel_path FROM image_exif ie \
             WHERE ie.content_hash = tagged_photo.content_hash \
             ORDER BY ie.id LIMIT 1 \
         ) \
         WHERE content_hash IS NOT NULL \
         AND EXISTS ( \
             SELECT 1 FROM image_exif ie \
             WHERE ie.content_hash = tagged_photo.content_hash \
         ) \
         AND NOT EXISTS ( \
             SELECT 1 FROM image_exif ie \
             WHERE ie.rel_path = tagged_photo.rel_path \
         )",
    )
    .execute(conn)
    .unwrap_or_else(|e| {
        warn!("back-ref refresh: tagged_photo update failed: {:?}", e);
        0
    });
    total += updated;

    // photo_insights — has both library_id and rel_path. Update both
    // when the (library_id, rel_path) tuple no longer matches any
    // image_exif row but the hash does.
    let updated = sql_query(
        "UPDATE photo_insights \
         SET library_id = ( \
             SELECT ie.library_id FROM image_exif ie \
             WHERE ie.content_hash = photo_insights.content_hash \
             ORDER BY ie.id LIMIT 1 \
         ), \
         rel_path = ( \
             SELECT ie.rel_path FROM image_exif ie \
             WHERE ie.content_hash = photo_insights.content_hash \
             ORDER BY ie.id LIMIT 1 \
         ) \
         WHERE content_hash IS NOT NULL \
         AND EXISTS ( \
             SELECT 1 FROM image_exif ie \
             WHERE ie.content_hash = photo_insights.content_hash \
         ) \
         AND NOT EXISTS ( \
             SELECT 1 FROM image_exif ie \
             WHERE ie.library_id = photo_insights.library_id \
             AND ie.rel_path = photo_insights.rel_path \
         )",
    )
    .execute(conn)
    .unwrap_or_else(|e| {
        warn!("back-ref refresh: photo_insights update failed: {:?}", e);
        0
    });
    total += updated;

    if total > 0 {
        info!("back-ref refresh: updated {} hash-keyed row(s)", total);
    }
    total
}

/// One tick's outcome of the orphan-GC pass.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct GcStats {
    /// Hashes newly observed orphaned this tick (added to the
    /// pending set).
    pub newly_marked: usize,
    /// Hashes that were marked last tick AND are still orphaned this
    /// tick AND every library is online — these are deleted.
    pub deleted_face_detections: usize,
    pub deleted_tagged_photo: usize,
    pub deleted_photo_insights: usize,
    /// Hashes dropped from the pending set because they re-appeared
    /// in image_exif (e.g. user remounted a backup that was briefly
    /// missing).
    pub revived: usize,
}

impl GcStats {
    pub fn changed(&self) -> bool {
        self.newly_marked > 0
            || self.deleted_face_detections > 0
            || self.deleted_tagged_photo > 0
            || self.deleted_photo_insights > 0
            || self.revived > 0
    }

    pub fn total_deleted(&self) -> usize {
        self.deleted_face_detections + self.deleted_tagged_photo + self.deleted_photo_insights
    }
}

/// Two-tick orphan-GC state. The watcher constructs one of these once
/// at startup and passes it back into `run_orphan_gc` every tick.
#[derive(Debug, Default)]
pub struct OrphanGcState {
    /// Hashes observed orphaned on the previous tick. A hash gets
    /// promoted to "delete" when it survives a second consecutive
    /// observation with all libraries online.
    pending: HashSet<String>,
    /// Whether every library was online on the previous tick. Combined
    /// with the all-online check on the current tick, this gives the
    /// "two consecutive ticks of full availability" guard described in
    /// CLAUDE.md → "Library availability and safety".
    prev_tick_all_online: bool,
}

/// Run one tick of the orphan GC. The function is responsible for the
/// full lifecycle: probing for orphans, updating `state.pending`,
/// performing deletes when consensus is reached, and returning stats
/// for the watcher to log.
///
/// Safety guard: `all_online` MUST reflect every configured library
/// being Online right now. Even if true, deletes only happen when the
/// previous tick was also all-online. A single Stale tick within the
/// window cancels any pending deletes (they stay marked but won't be
/// promoted) — they're then re-evaluated next tick.
pub fn run_orphan_gc(
    conn: &mut SqliteConnection,
    state: &mut OrphanGcState,
    all_online: bool,
) -> GcStats {
    let mut stats = GcStats::default();

    // Find every distinct content_hash referenced by hash-keyed
    // derived data that is NOT currently referenced by image_exif.
    // These are this tick's orphan candidates. Cheap query — three
    // index lookups + a HashSet at row count of derived tables, which
    // is small.
    let orphans: HashSet<String> = match collect_orphan_hashes(conn) {
        Ok(set) => set,
        Err(e) => {
            warn!("orphan-gc: candidate query failed: {:?}", e);
            return stats;
        }
    };

    // Drop entries from pending that are no longer orphaned
    // ("revived"). Common case: a network share that briefly went
    // stale comes back, image_exif gets re-populated by ingest, and
    // the hash is no longer orphaned.
    let revived = state.pending.difference(&orphans).cloned().collect::<Vec<_>>();
    if !revived.is_empty() {
        for h in &revived {
            state.pending.remove(h);
        }
        stats.revived = revived.len();
    }

    if !all_online {
        // Every Stale library cancels both the consensus window AND
        // any pending deletes. We *do* still note newly observed
        // orphans below — that's harmless bookkeeping. But we never
        // delete this tick.
        for h in &orphans {
            if state.pending.insert(h.clone()) {
                stats.newly_marked += 1;
            }
        }
        state.prev_tick_all_online = false;
        return stats;
    }

    // All-online + previous-tick-also-all-online: hashes that are
    // both pending AND still orphaned this tick are confirmed and
    // get deleted. Hashes orphaned this tick but not pending get
    // freshly marked.
    let consensus_window_open = state.prev_tick_all_online;

    let to_delete: Vec<String> = if consensus_window_open {
        orphans
            .iter()
            .filter(|h| state.pending.contains(*h))
            .cloned()
            .collect()
    } else {
        Vec::new()
    };

    for h in &orphans {
        if !state.pending.contains(h) {
            state.pending.insert(h.clone());
            stats.newly_marked += 1;
        }
    }

    if !to_delete.is_empty() {
        match delete_hash_keyed_rows(conn, &to_delete) {
            Ok((faces, tags, insights)) => {
                stats.deleted_face_detections = faces;
                stats.deleted_tagged_photo = tags;
                stats.deleted_photo_insights = insights;
                // Drop deleted hashes from pending so we don't try to
                // re-delete them next tick (they'll have already been
                // removed from the orphan set).
                for h in &to_delete {
                    state.pending.remove(h);
                }
            }
            Err(e) => warn!("orphan-gc: delete batch failed: {:?}", e),
        }
    }

    state.prev_tick_all_online = true;

    if stats.changed() {
        info!(
            "orphan-gc: marked {} new, revived {}; deleted {} face_detections / {} tagged_photo / {} photo_insights",
            stats.newly_marked,
            stats.revived,
            stats.deleted_face_detections,
            stats.deleted_tagged_photo,
            stats.deleted_photo_insights,
        );
    } else {
        debug!("orphan-gc: no changes this tick (pending: {})", state.pending.len());
    }

    stats
}

/// Helper for the watcher: are *all* libraries currently Online?
pub fn all_libraries_online(libs: &[Library], health: &LibraryHealthMap) -> bool {
    let guard = health.read().unwrap_or_else(|e| e.into_inner());
    libs.iter().all(|lib| {
        guard
            .get(&lib.id)
            .map(|h| h.is_online())
            .unwrap_or(false)
    })
}

#[derive(QueryableByName, Debug)]
struct HashRow {
    #[diesel(sql_type = diesel::sql_types::Text)]
    content_hash: String,
}

fn collect_orphan_hashes(conn: &mut SqliteConnection) -> QueryResult<HashSet<String>> {
    // Union of every distinct content_hash carried by hash-keyed
    // derived tables, minus those still referenced by image_exif.
    let rows = sql_query(
        "SELECT DISTINCT content_hash FROM ( \
             SELECT content_hash FROM face_detections WHERE content_hash IS NOT NULL \
             UNION ALL \
             SELECT content_hash FROM tagged_photo WHERE content_hash IS NOT NULL \
             UNION ALL \
             SELECT content_hash FROM photo_insights WHERE content_hash IS NOT NULL \
         ) AS derived \
         WHERE content_hash NOT IN ( \
             SELECT content_hash FROM image_exif WHERE content_hash IS NOT NULL \
         )",
    )
    .get_results::<HashRow>(conn)?;

    Ok(rows.into_iter().map(|r| r.content_hash).collect())
}

/// Delete every hash-keyed row whose `content_hash` is in `hashes`.
/// Returns `(faces, tagged_photo, photo_insights)`.
fn delete_hash_keyed_rows(
    conn: &mut SqliteConnection,
    hashes: &[String],
) -> QueryResult<(usize, usize, usize)> {
    if hashes.is_empty() {
        return Ok((0, 0, 0));
    }

    use crate::database::schema::{face_detections, photo_insights, tagged_photo};

    let faces = diesel::delete(
        face_detections::table.filter(face_detections::content_hash.eq_any(hashes)),
    )
    .execute(conn)?;
    let tags = diesel::delete(
        tagged_photo::table.filter(tagged_photo::content_hash.eq_any(hashes)),
    )
    .execute(conn)?;
    let insights = diesel::delete(
        photo_insights::table.filter(photo_insights::content_hash.eq_any(hashes)),
    )
    .execute(conn)?;

    Ok((faces, tags, insights))
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::database::test::in_memory_db_connection;

    fn ensure_library(conn: &mut SqliteConnection, library_id: i32) {
        diesel::sql_query(
            "INSERT OR IGNORE INTO libraries (id, name, root_path, created_at) \
             VALUES (?, 'test-' || ?, '/tmp/test-' || ?, 0)",
        )
        .bind::<diesel::sql_types::Integer, _>(library_id)
        .bind::<diesel::sql_types::Integer, _>(library_id)
        .bind::<diesel::sql_types::Integer, _>(library_id)
        .execute(conn)
        .unwrap();
    }

    fn insert_image_exif(
        conn: &mut SqliteConnection,
        library_id: i32,
        rel_path: &str,
        content_hash: Option<&str>,
    ) {
        ensure_library(conn, library_id);
        diesel::sql_query(
            "INSERT INTO image_exif (library_id, rel_path, created_time, last_modified, content_hash) \
             VALUES (?, ?, 0, 0, ?)",
        )
        .bind::<diesel::sql_types::Integer, _>(library_id)
        .bind::<diesel::sql_types::Text, _>(rel_path)
        .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(content_hash)
        .execute(conn)
        .unwrap();
    }

    fn insert_face(conn: &mut SqliteConnection, library_id: i32, rel_path: &str, hash: &str) {
        ensure_library(conn, library_id);
        diesel::sql_query(
            "INSERT INTO face_detections (library_id, content_hash, rel_path, source, status, model_version, created_at) \
             VALUES (?, ?, ?, 'auto', 'no_faces', 'v', 0)",
        )
        .bind::<diesel::sql_types::Integer, _>(library_id)
        .bind::<diesel::sql_types::Text, _>(hash)
        .bind::<diesel::sql_types::Text, _>(rel_path)
        .execute(conn)
        .unwrap();
    }

    fn insert_tag_with_hash(conn: &mut SqliteConnection, rel_path: &str, hash: &str) {
        diesel::sql_query("INSERT OR IGNORE INTO tags (id, name, created_time) VALUES (1, 't', 0)")
            .execute(conn)
            .unwrap();
        diesel::sql_query(
            "INSERT INTO tagged_photo (rel_path, tag_id, created_time, content_hash) VALUES (?, 1, 0, ?)",
        )
        .bind::<diesel::sql_types::Text, _>(rel_path)
        .bind::<diesel::sql_types::Text, _>(hash)
        .execute(conn)
        .unwrap();
    }

    fn insert_insight_with_hash(
        conn: &mut SqliteConnection,
        library_id: i32,
        rel_path: &str,
        hash: &str,
    ) {
        ensure_library(conn, library_id);
        diesel::sql_query(
            "INSERT INTO photo_insights (library_id, rel_path, title, summary, generated_at, model_version, is_current, backend, content_hash) \
             VALUES (?, ?, 't', 's', 0, 'v', 1, 'local', ?)",
        )
        .bind::<diesel::sql_types::Integer, _>(library_id)
        .bind::<diesel::sql_types::Text, _>(rel_path)
        .bind::<diesel::sql_types::Text, _>(hash)
        .execute(conn)
        .unwrap();
    }

    #[derive(QueryableByName, Debug)]
    struct CountRow {
        #[diesel(sql_type = diesel::sql_types::BigInt)]
        n: i64,
    }
    fn count(conn: &mut SqliteConnection, sql: &str) -> i64 {
        diesel::sql_query(sql).get_result::<CountRow>(conn).unwrap().n
    }

    #[test]
    fn refresh_back_refs_repoints_face_detection_after_move() {
        let mut conn = in_memory_db_connection();
        // Original location lib 1, rel "old.jpg". image_exif row gone
        // (file moved); only the new lib 2 row remains.
        insert_image_exif(&mut conn, 2, "new.jpg", Some("h1"));
        insert_face(&mut conn, 1, "old.jpg", "h1");

        let updated = refresh_back_refs(&mut conn);
        assert_eq!(updated, 1);

        let row = diesel::sql_query("SELECT library_id AS n FROM face_detections")
            .get_result::<CountRow>(&mut conn)
            .unwrap();
        assert_eq!(row.n, 2, "library_id should now point at lib 2");
    }

    #[test]
    fn refresh_back_refs_no_change_when_back_ref_still_valid() {
        let mut conn = in_memory_db_connection();
        insert_image_exif(&mut conn, 1, "a.jpg", Some("h1"));
        insert_face(&mut conn, 1, "a.jpg", "h1");

        let updated = refresh_back_refs(&mut conn);
        assert_eq!(updated, 0);
    }

    #[test]
    fn refresh_back_refs_no_change_when_hash_fully_orphaned() {
        // Hash exists on face_detections but no surviving image_exif
        // row for it → the refresh is a no-op (orphan GC handles
        // these). Important: the SET subquery would return NULL and
        // we'd null out the back-ref otherwise; the EXISTS guard
        // protects against that.
        let mut conn = in_memory_db_connection();
        insert_face(&mut conn, 1, "gone.jpg", "h1");

        let updated = refresh_back_refs(&mut conn);
        assert_eq!(updated, 0);
    }

    #[test]
    fn orphan_gc_requires_two_consecutive_all_online_ticks() {
        let mut conn = in_memory_db_connection();
        // Hash present in face_detections but NOT image_exif → orphan.
        insert_face(&mut conn, 1, "x.jpg", "h-orphan");
        let mut state = OrphanGcState::default();

        // Tick 1: prev_tick_all_online is false (default), so even
        // with current tick all-online we mark only.
        let stats = run_orphan_gc(&mut conn, &mut state, true);
        assert_eq!(stats.newly_marked, 1);
        assert_eq!(stats.total_deleted(), 0);
        assert_eq!(state.pending.len(), 1);

        // Tick 2: prev_tick_all_online is now true, current tick still
        // all-online → consensus reached, hash gets deleted.
        let stats = run_orphan_gc(&mut conn, &mut state, true);
        assert_eq!(stats.deleted_face_detections, 1);
        assert!(state.pending.is_empty());

        // Tick 3: nothing left.
        let stats = run_orphan_gc(&mut conn, &mut state, true);
        assert_eq!(stats.total_deleted(), 0);
        assert_eq!(stats.newly_marked, 0);
    }

    #[test]
    fn orphan_gc_resets_consensus_on_stale_library() {
        let mut conn = in_memory_db_connection();
        insert_face(&mut conn, 1, "x.jpg", "h-orphan");
        let mut state = OrphanGcState::default();

        // Tick 1: all-online, mark.
        run_orphan_gc(&mut conn, &mut state, true);
        // Tick 2: stale library — consensus window resets, no delete.
        let stats = run_orphan_gc(&mut conn, &mut state, false);
        assert_eq!(stats.total_deleted(), 0);
        assert!(!state.prev_tick_all_online);
        // Tick 3: all-online again — but we need ANOTHER tick to set
        // prev_tick_all_online before deletes can fire. So tick 3
        // marks (no-op on existing pending), tick 4 deletes.
        let stats = run_orphan_gc(&mut conn, &mut state, true);
        assert_eq!(stats.total_deleted(), 0);
        let stats = run_orphan_gc(&mut conn, &mut state, true);
        assert_eq!(stats.deleted_face_detections, 1);
    }

    #[test]
    fn orphan_gc_revives_when_image_exif_reappears() {
        let mut conn = in_memory_db_connection();
        insert_face(&mut conn, 1, "x.jpg", "h-orphan");
        let mut state = OrphanGcState::default();

        // Tick 1: mark.
        run_orphan_gc(&mut conn, &mut state, true);
        assert!(state.pending.contains("h-orphan"));

        // Between ticks, the image_exif row reappears (e.g. backup
        // share was briefly stale). Hash is no longer orphaned.
        insert_image_exif(&mut conn, 2, "x.jpg", Some("h-orphan"));

        let stats = run_orphan_gc(&mut conn, &mut state, true);
        assert_eq!(stats.revived, 1);
        assert_eq!(stats.total_deleted(), 0);
        assert!(state.pending.is_empty());
    }

    #[test]
    fn orphan_gc_deletes_across_all_three_tables() {
        let mut conn = in_memory_db_connection();
        // Same orphan hash appears in all three derived tables.
        insert_face(&mut conn, 1, "a.jpg", "h-orphan");
        insert_tag_with_hash(&mut conn, "a.jpg", "h-orphan");
        insert_insight_with_hash(&mut conn, 1, "a.jpg", "h-orphan");

        let mut state = OrphanGcState::default();
        run_orphan_gc(&mut conn, &mut state, true);
        let stats = run_orphan_gc(&mut conn, &mut state, true);
        assert_eq!(stats.deleted_face_detections, 1);
        assert_eq!(stats.deleted_tagged_photo, 1);
        assert_eq!(stats.deleted_photo_insights, 1);

        assert_eq!(count(&mut conn, "SELECT COUNT(*) AS n FROM face_detections"), 0);
        assert_eq!(count(&mut conn, "SELECT COUNT(*) AS n FROM tagged_photo"), 0);
        assert_eq!(count(&mut conn, "SELECT COUNT(*) AS n FROM photo_insights"), 0);
    }

    #[test]
    fn all_libraries_online_helper() {
        use crate::libraries::{LibraryHealth, new_health_map};
        let libs = vec![
            Library {
                id: 1,
                name: "a".into(),
                root_path: "/x".into(),
            },
            Library {
                id: 2,
                name: "b".into(),
                root_path: "/y".into(),
            },
        ];
        let health = new_health_map(&libs);
        assert!(all_libraries_online(&libs, &health));

        // Flip lib 2 to stale.
        {
            let mut g = health.write().unwrap();
            g.insert(
                2,
                LibraryHealth::Stale {
                    reason: "test".into(),
                    since: 0,
                },
            );
        }
        assert!(!all_libraries_online(&libs, &health));
    }
}

src/main.rs (+76)
@@ -72,6 +72,7 @@ mod file_types;
mod files;
mod geo;
mod libraries;
mod library_maintenance;
mod state;
mod tags;
mod utils;
@@ -1945,6 +1946,29 @@ fn watch_files(
    let mut last_full_scan = SystemTime::now();
    let mut scan_count = 0u64;

    // Per-library cursor for the missing-file scan. Each tick reads
    // a page from `offset`, stat()s the rows, deletes confirmed-
    // missing ones, and advances or wraps the cursor. State held
    // in-memory so a watcher restart resumes from 0 — fine, the
    // sweep is idempotent.
    let mut missing_file_offsets: std::collections::HashMap<i32, i64> =
        std::collections::HashMap::new();

    let missing_scan_page_size: i64 = dotenv::var("IMAGE_EXIF_MISSING_SCAN_PAGE_SIZE")
        .ok()
        .and_then(|s| s.parse().ok())
        .filter(|n: &i64| *n > 0)
        .unwrap_or(library_maintenance::DEFAULT_SCAN_PAGE_SIZE);
    let missing_delete_cap: usize = dotenv::var("IMAGE_EXIF_MISSING_DELETE_CAP_PER_TICK")
        .ok()
        .and_then(|s| s.parse().ok())
        .filter(|n: &usize| *n > 0)
        .unwrap_or(library_maintenance::DEFAULT_MISSING_DELETE_CAP);

    // Two-tick orphan-GC consensus state. Carried across ticks via
    // `OrphanGcState`; see library_maintenance::run_orphan_gc.
    let mut orphan_gc_state = library_maintenance::OrphanGcState::default();

    // Initial availability sweep before the loop's first sleep so
    // /libraries reports the truth from the very first request,
    // rather than the optimistic Online default that
@@ -2061,6 +2085,35 @@ fn watch_files(

            // Update media counts per library (metric aggregates across all)
            update_media_counts(Path::new(&lib.root_path), &excluded_dirs);

            // Missing-file detection: prune image_exif rows whose
            // source file is no longer on disk. Per-library, so we
            // pass library-online-this-tick implicitly (we only
            // reach here if the probe gate at the top of the
            // iteration passed). Capped + paginated so a huge
            // library doesn't stall the watcher; rows we don't
            // visit this tick get visited next tick. See
            // library_maintenance::detect_missing_files_for_library.
            {
                let context = opentelemetry::Context::new();
                let offset = missing_file_offsets.get(&lib.id).copied().unwrap_or(0);
                let (deleted, next_offset) =
                    library_maintenance::detect_missing_files_for_library(
                        &context,
                        lib,
                        &exif_dao,
                        offset,
                        missing_scan_page_size,
                        missing_delete_cap,
                    );
                missing_file_offsets.insert(lib.id, next_offset);
                if deleted > 0 {
                    debug!(
                        "missing-file scan: library '{}' next_offset={}",
                        lib.name, next_offset
                    );
                }
            }
        }

        // Reconciliation: cross-library, so it runs once per tick
@@ -2072,6 +2125,29 @@ fn watch_files(
        {
            let mut conn = image_api::database::connect();
            let _ = image_api::database::reconcile::run(&mut conn);

            // Back-ref refresh: hash-keyed rows whose
            // (library_id, rel_path) tuple no longer matches any
            // image_exif row but whose hash still does. After a
            // recent→archive move, the missing-file scan removes
            // the old image_exif row; this pass repoints face /
            // tag / insight back-refs at the surviving location.
            // DB-only, no health gate needed — uses what's in
            // image_exif as truth.
            let _ = library_maintenance::refresh_back_refs(&mut conn);

            // Orphan GC: the destructive end of the maintenance
            // pipeline. Two-tick consensus + every-library-online
            // requirement is enforced inside run_orphan_gc; we
            // pass the current all-online flag and the function
            // tracks the previous tick's flag in OrphanGcState.
            let all_online =
                library_maintenance::all_libraries_online(&libs, &library_health);
            let _ = library_maintenance::run_orphan_gc(
                &mut conn,
                &mut orphan_gc_state,
                all_online,
            );
        }

        if is_full_scan {