feature/library-handoff-and-gc #69
CLAUDE.md (+72)

@@ -229,6 +229,78 @@ disappearance with no matching appearance is treated as
"unavailable-or-deleted, defer judgment" — it does not re-key any rows
and does not enqueue GC.

**Maintenance pipeline (`src/library_maintenance.rs`).** The watcher
runs three maintenance passes per tick that together implement the
move/handoff and orphan rules:

1. **Missing-file scan** — per online library, paginated. A page of
   `image_exif` rows is loaded (`IMAGE_EXIF_MISSING_SCAN_PAGE_SIZE`,
   default 500), each row's `(root_path/rel_path)` is `stat()`-ed,
   and confirmed-not-found rows are deleted from `image_exif`
   (capped at `IMAGE_EXIF_MISSING_DELETE_CAP_PER_TICK`, default 200).
   Permission/IO errors are skipped, never deleted — only `NotFound`
   triggers a deletion. The cursor wraps whenever a partial page
   comes back, so the whole library is swept across consecutive
   ticks. Skipped wholesale for Stale libraries via the per-library
   probe gate at the top of the loop iteration.

2. **Back-ref refresh** — DB-only. For `face_detections`,
   `tagged_photo`, and `photo_insights`: any hash-keyed row whose
   `(library_id, rel_path)` no longer matches an `image_exif` row
   *but whose `content_hash` does* is repointed at the surviving
   `image_exif` location. Idempotent SQL; no health gate needed.
   This is what makes the recent → archive handoff invisible to
   read paths: when the missing-file scan retires the lib-A row,
   tags/faces/insights pivot to lib-B's path before any user
   notices.

3. **Orphan GC** — destructive. Hash-keyed derived rows whose
   `content_hash` no longer has any `image_exif` row are eligible.
   Two-tick consensus: a hash must be observed orphaned on two
   consecutive ticks AND every library must be online for both. A
   single Stale tick within the window cancels all pending deletes.
   The pending set is held in memory (`OrphanGcState`) — a restart
   resets it, which only delays a delete, never causes one. Tags,
   faces, and insights for orphaned hashes are deleted in one batch
   per tick.

A backup library that briefly disappears, then returns within two
ticks, never loses any derived data. A move from lib-A to lib-B
without disappearance flows through pass 1 (lib-A row retired) and
pass 2 (back-refs follow), with pass 3 noting nothing because the
hash is still present in `image_exif` (lib-B's row).
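
The consensus rule in pass 3 compresses to a single predicate. This is
an illustrative condensation only; the real logic lives in
`run_orphan_gc`, added later in this PR:

```rust
use std::collections::HashSet;

// Sketch: a pending orphan is promoted to deletion only when it is
// still orphaned this tick and both this tick and the previous one
// saw every library online. `pending` holds hashes observed orphaned
// on the previous tick.
fn should_delete(
    pending: &HashSet<String>,
    hash: &str,
    orphaned_now: bool,
    prev_tick_all_online: bool,
    all_online_now: bool,
) -> bool {
    orphaned_now && pending.contains(hash) && prev_tick_all_online && all_online_now
}
```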

**Known gap: in-place content changes (future Branch D).** The
maintenance pipeline assumes a `(library_id, rel_path)`'s bytes are
stable for as long as the file exists at that path. If a user edits
a file in place (crop, re-export) without renaming it, the watcher's
quick scan walks the file (its mtime is recent) but `process_new_files`
short-circuits because `(library_id, rel_path)` already has an
`image_exif` row — no re-hash, no re-EXIF, no face redetection. The
row's `content_hash` keeps pointing at the original bytes. Tags /
faces / insights stay attached to the original hash and continue to
display because the rel_path back-ref still resolves; new faces
introduced by the edit are never detected.

The right place to fix this is a **stale-content detection pass**
that compares `image_exif.last_modified` / `size_bytes` against
`fs::metadata` for rows the quick scan would otherwise skip. On a
mismatch, recompute the hash, update `image_exif`, and apply the
"content branched" semantics:

- **Faces** re-run (faces are fully derived from bytes).
- **Tags** migrate to the new hash (user intent — "this photo is
  vacation" survives a crop). Insights migrate forward as a
  starting point and are flagged for re-generation.
- **Favorites** (when migrated to hash-keyed) follow the path /
  user intent.

The interesting case is the operator who keeps an unedited copy in
the archive library and edits the local copy: post-detection, the
archive copy stays on the original hash, the local copy branches to
the new hash, and the two histories cleanly split. Apollo's
`derived.db` cache will need an invalidation hook for the changed
hash — design it alongside Branch D.
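
A minimal sketch of the detection half, assuming the pass is handed the
stored `last_modified` (unix seconds) and `size_bytes` values from the
`image_exif` row; the re-hash and branch steps are Branch D work, not
code in this PR:

```rust
use std::fs;
use std::path::Path;
use std::time::UNIX_EPOCH;

// Sketch only: true when the bytes at `abs` changed since ingest. Any
// metadata mismatch flags the row for re-hash and branch handling.
fn content_is_stale(abs: &Path, stored_mtime: i64, stored_size: u64) -> std::io::Result<bool> {
    let meta = fs::metadata(abs)?;
    let mtime = meta
        .modified()?
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_secs() as i64)
        .unwrap_or(0);
    Ok(mtime != stored_mtime || meta.len() != stored_size)
}
```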

### File Processing Pipeline

**Thumbnail Generation:**
@@ -466,6 +466,18 @@ pub trait ExifDao: Sync + Send {
        context: &opentelemetry::Context,
        library_id: i32,
    ) -> Result<i64, DbError>;

    /// Paginated rel_path listing for a single library, ordered by id
    /// ascending. Used by the missing-file detector to scan a library
    /// in capped chunks across consecutive watcher ticks rather than
    /// stat()ing every row every minute. Returns `(id, rel_path)`.
    fn list_rel_paths_for_library_page(
        &mut self,
        context: &opentelemetry::Context,
        library_id: i32,
        limit: i64,
        offset: i64,
    ) -> Result<Vec<(i32, String)>, DbError>;
}

pub struct SqliteExifDao {

@@ -1129,6 +1141,28 @@ impl ExifDao for SqliteExifDao {
        })
        .map_err(|_| DbError::new(DbErrorKind::QueryError))
    }

    fn list_rel_paths_for_library_page(
        &mut self,
        context: &opentelemetry::Context,
        library_id_val: i32,
        limit: i64,
        offset: i64,
    ) -> Result<Vec<(i32, String)>, DbError> {
        trace_db_call(context, "query", "list_rel_paths_for_library_page", |_span| {
            use schema::image_exif::dsl::*;

            image_exif
                .filter(library_id.eq(library_id_val))
                .order(id.asc())
                .select((id, rel_path))
                .limit(limit)
                .offset(offset)
                .load::<(i32, String)>(self.connection.lock().unwrap().deref_mut())
                .map_err(|_| anyhow::anyhow!("Query error"))
        })
        .map_err(|_| DbError::new(DbErrorKind::QueryError))
    }
}

#[cfg(test)]
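
For orientation, the cursor rule the watcher applies to this page API,
as a standalone sketch: wrap to 0 on a partial page, otherwise advance
by one page. `detect_missing_files_for_library`, shown in full later in
this diff, inlines the same arithmetic:

```rust
// Sketch: a partial page means the sweep reached the end of the
// table; wrap the cursor so the next tick starts a fresh sweep.
fn next_scan_offset(rows_returned: usize, offset: i64, page_size: i64) -> i64 {
    if (rows_returned as i64) < page_size {
        0
    } else {
        offset + page_size
    }
}
```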

src/files.rs (+10)

@@ -1709,6 +1709,16 @@ mod tests {
        ) -> Result<i64, DbError> {
            Ok(0)
        }

        fn list_rel_paths_for_library_page(
            &mut self,
            _context: &opentelemetry::Context,
            _library_id: i32,
            _limit: i64,
            _offset: i64,
        ) -> Result<Vec<(i32, String)>, DbError> {
            Ok(Vec::new())
        }
    }

    mod api {

@@ -19,6 +19,7 @@ pub mod file_types;
pub mod files;
pub mod geo;
pub mod libraries;
pub mod library_maintenance;
pub mod memories;
pub mod otel;
pub mod parsers;

src/library_maintenance.rs (new file, +759)

@@ -0,0 +1,759 @@
//! Filesystem-backed maintenance of `image_exif`, the back-ref columns
//! on hash-keyed tables, and orphan derived data.
//!
//! These passes are the operational implementation of the library
//! handoff and orphan rules from CLAUDE.md → "Multi-library data
//! model" / "Library availability and safety":
//!
//! 1. **Missing-file detection** — when a file disappears from disk
//!    but its `image_exif` row remains, the row is removed. Naturally
//!    implements the move case: when a user moves a file from lib-A
//!    to lib-B, the watcher's normal ingest creates the lib-B row;
//!    this pass eventually retires the lib-A row.
//!
//! 2. **Back-ref refresh** — hash-keyed rows (`face_detections` and,
//!    after Branch B, `tagged_photo` / `photo_insights`) carry a
//!    denormalized `(library_id, rel_path)` back-ref. After a move,
//!    that back-ref may point at a deleted row. The refresh pass
//!    finds rows whose `(library_id, rel_path)` no longer matches
//!    any `image_exif` row but whose `content_hash` does, and updates
//!    the back-ref to one of the surviving paths. Idempotent.
//!
//! 3. **Orphan GC** — when a `content_hash` no longer has any
//!    `image_exif` row referencing it, hash-keyed derived rows for
//!    that hash become eligible for deletion. To survive transient
//!    unmounts, the pass uses a **two-tick consensus rule**: a hash
//!    must be observed orphaned for two consecutive ticks AND every
//!    library must be online for both observations. The "marked but
//!    not yet deleted" state is held in memory; restarting the
//!    watcher resets it (which is fine — a restart only delays a
//!    delete by the extra observation, never causes one).
//!
//! Pass 1 is filesystem-dependent and gated on the per-library
//! availability probe. Passes 2 and 3 are database-only, but pass 3
//! additionally requires every library to be online for the
//! consensus window.

use std::collections::HashSet;
use std::path::Path;
use std::sync::{Arc, Mutex};

use diesel::prelude::*;
use diesel::sql_query;
use diesel::sqlite::SqliteConnection;
use log::{debug, info, warn};

use crate::database::ExifDao;
use crate::libraries::{Library, LibraryHealthMap};

/// Cap on missing-file deletions per library per tick. Prevents a
/// pathological mount that returns "not found" for everything (e.g.
/// a case-sensitivity flip on a network share that the probe didn't
/// catch) from wiping the entire image_exif table in one tick. Tunable
/// via `IMAGE_EXIF_MISSING_DELETE_CAP_PER_TICK`.
pub const DEFAULT_MISSING_DELETE_CAP: usize = 200;

/// Page size for the missing-file scan. We stat() every row in this
/// batch but only delete those that are confirmed-not-found (subject
/// to the delete cap above). Tunable via
/// `IMAGE_EXIF_MISSING_SCAN_PAGE_SIZE`.
pub const DEFAULT_SCAN_PAGE_SIZE: i64 = 500;

/// Scan a page of `image_exif` rows for `library`, stat() each one,
/// and delete rows whose source file is gone. Returns
/// `(deleted, next_offset)`. `next_offset` wraps to 0 when the page
/// returned fewer rows than the page size, so the watcher cycles
/// through the whole library across ticks.
///
/// The caller must already have confirmed the library is online —
/// running against a Stale library would interpret every row as
/// missing.
pub fn detect_missing_files_for_library(
    context: &opentelemetry::Context,
    library: &Library,
    exif_dao: &Arc<Mutex<Box<dyn ExifDao>>>,
    offset: i64,
    page_size: i64,
    delete_cap: usize,
) -> (usize, i64) {
    let rows = {
        let mut dao = exif_dao.lock().expect("exif_dao poisoned");
        match dao.list_rel_paths_for_library_page(context, library.id, page_size, offset) {
            Ok(r) => r,
            Err(e) => {
                warn!(
                    "missing-file scan: list page failed for library '{}' (offset={}): {:?}",
                    library.name, offset, e
                );
                return (0, offset);
            }
        }
    };
    let n_returned = rows.len();
    // Wrap the offset when we hit the end of the table — the next tick
    // starts a fresh sweep. Doing it here rather than on the next call
    // keeps the offset accounting visible in one place.
    let next_offset = if (n_returned as i64) < page_size {
        0
    } else {
        offset + page_size
    };

    if rows.is_empty() {
        return (0, next_offset);
    }

    let root = Path::new(&library.root_path);
    let mut to_delete: Vec<String> = Vec::new();
    for (_id, rel_path) in &rows {
        if to_delete.len() >= delete_cap {
            break;
        }
        let abs = root.join(rel_path);
        match std::fs::metadata(&abs) {
            Ok(_) => {
                // File still exists — nothing to do.
            }
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                to_delete.push(rel_path.clone());
            }
            Err(e) => {
                // Permission denied / IO error / etc. — skip this row,
                // leave it for the next sweep. We never want a transient
                // FS hiccup to mass-delete metadata.
                debug!(
                    "missing-file scan: stat() error for {:?}, skipping: {:?}",
                    abs, e
                );
            }
        }
    }

    if to_delete.is_empty() {
        return (0, next_offset);
    }

    let mut deleted = 0;
    {
        let mut dao = exif_dao.lock().expect("exif_dao poisoned");
        for rel_path in &to_delete {
            match dao.delete_exif_by_library(context, library.id, rel_path) {
                Ok(()) => deleted += 1,
                Err(e) => warn!(
                    "missing-file scan: delete failed for ({}, {}): {:?}",
                    library.id, rel_path, e
                ),
            }
        }
    }

    if deleted > 0 {
        info!(
            "missing-file scan: removed {} stale image_exif row(s) from library '{}'",
            deleted, library.name
        );
    }

    (deleted, next_offset)
}

/// Refresh the `(library_id, rel_path)` back-refs on hash-keyed
/// tables. A back-ref is stale when:
/// - its `content_hash` is non-null,
/// - that hash is referenced by at least one `image_exif` row, but
/// - the row's own `(library_id, rel_path)` does not appear in
///   `image_exif`.
///
/// In that case, point the back-ref at any surviving image_exif row
/// for the same hash. `face_detections` is the canonical case (it
/// carries `library_id` + `rel_path` columns); `tagged_photo`
/// historically carries only `rel_path`, while `photo_insights`
/// carries both — either way we keep them in sync here, picking any
/// surviving location.
///
/// All-SQL, idempotent. Returns the number of rows updated.
pub fn refresh_back_refs(conn: &mut SqliteConnection) -> usize {
    let mut total = 0usize;

    // face_detections — back-ref is (library_id, rel_path). Repoint to
    // any surviving image_exif row carrying the same content_hash.
    let updated = sql_query(
        "UPDATE face_detections \
         SET library_id = ( \
             SELECT ie.library_id FROM image_exif ie \
             WHERE ie.content_hash = face_detections.content_hash \
             ORDER BY ie.id LIMIT 1 \
         ), \
         rel_path = ( \
             SELECT ie.rel_path FROM image_exif ie \
             WHERE ie.content_hash = face_detections.content_hash \
             ORDER BY ie.id LIMIT 1 \
         ) \
         WHERE EXISTS ( \
             SELECT 1 FROM image_exif ie \
             WHERE ie.content_hash = face_detections.content_hash \
         ) \
         AND NOT EXISTS ( \
             SELECT 1 FROM image_exif ie \
             WHERE ie.library_id = face_detections.library_id \
             AND ie.rel_path = face_detections.rel_path \
         )",
    )
    .execute(conn)
    .unwrap_or_else(|e| {
        warn!("back-ref refresh: face_detections update failed: {:?}", e);
        0
    });
    total += updated;

    // tagged_photo — only rel_path. Update to any surviving rel_path
    // for the same content_hash so the path-only DAO read still finds
    // tags after a move.
    let updated = sql_query(
        "UPDATE tagged_photo \
         SET rel_path = ( \
             SELECT ie.rel_path FROM image_exif ie \
             WHERE ie.content_hash = tagged_photo.content_hash \
             ORDER BY ie.id LIMIT 1 \
         ) \
         WHERE content_hash IS NOT NULL \
         AND EXISTS ( \
             SELECT 1 FROM image_exif ie \
             WHERE ie.content_hash = tagged_photo.content_hash \
         ) \
         AND NOT EXISTS ( \
             SELECT 1 FROM image_exif ie \
             WHERE ie.rel_path = tagged_photo.rel_path \
         )",
    )
    .execute(conn)
    .unwrap_or_else(|e| {
        warn!("back-ref refresh: tagged_photo update failed: {:?}", e);
        0
    });
    total += updated;

    // photo_insights — has both library_id and rel_path. Update both
    // when the (library_id, rel_path) tuple no longer matches any
    // image_exif row but the hash does.
    let updated = sql_query(
        "UPDATE photo_insights \
         SET library_id = ( \
             SELECT ie.library_id FROM image_exif ie \
             WHERE ie.content_hash = photo_insights.content_hash \
             ORDER BY ie.id LIMIT 1 \
         ), \
         rel_path = ( \
             SELECT ie.rel_path FROM image_exif ie \
             WHERE ie.content_hash = photo_insights.content_hash \
             ORDER BY ie.id LIMIT 1 \
         ) \
         WHERE content_hash IS NOT NULL \
         AND EXISTS ( \
             SELECT 1 FROM image_exif ie \
             WHERE ie.content_hash = photo_insights.content_hash \
         ) \
         AND NOT EXISTS ( \
             SELECT 1 FROM image_exif ie \
             WHERE ie.library_id = photo_insights.library_id \
             AND ie.rel_path = photo_insights.rel_path \
         )",
    )
    .execute(conn)
    .unwrap_or_else(|e| {
        warn!("back-ref refresh: photo_insights update failed: {:?}", e);
        0
    });
    total += updated;

    if total > 0 {
        info!("back-ref refresh: updated {} hash-keyed row(s)", total);
    }
    total
}

/// One tick's outcome of the orphan-GC pass.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct GcStats {
    /// Hashes newly observed orphaned this tick (added to the
    /// pending set).
    pub newly_marked: usize,
    /// Rows deleted this tick, for hashes that were marked last tick,
    /// are still orphaned this tick, and saw every library online.
    pub deleted_face_detections: usize,
    pub deleted_tagged_photo: usize,
    pub deleted_photo_insights: usize,
    /// Hashes dropped from the pending set because they reappeared
    /// in image_exif (e.g. the user remounted a backup that was
    /// briefly missing).
    pub revived: usize,
}

impl GcStats {
    pub fn changed(&self) -> bool {
        self.newly_marked > 0
            || self.deleted_face_detections > 0
            || self.deleted_tagged_photo > 0
            || self.deleted_photo_insights > 0
            || self.revived > 0
    }

    pub fn total_deleted(&self) -> usize {
        self.deleted_face_detections + self.deleted_tagged_photo + self.deleted_photo_insights
    }
}

/// Two-tick orphan-GC state. The watcher constructs one of these once
/// at startup and passes it back into `run_orphan_gc` every tick.
#[derive(Debug, Default)]
pub struct OrphanGcState {
    /// Hashes observed orphaned on the previous tick. A hash gets
    /// promoted to "delete" when it survives a second consecutive
    /// observation with all libraries online.
    pending: HashSet<String>,
    /// Whether every library was online on the previous tick. Combined
    /// with the all-online check on the current tick, this gives the
    /// "two consecutive ticks of full availability" guard described in
    /// CLAUDE.md → "Library availability and safety".
    prev_tick_all_online: bool,
}

/// Run one tick of the orphan GC. The function is responsible for the
/// full lifecycle: probing for orphans, updating `state.pending`,
/// performing deletes when consensus is reached, and returning stats
/// for the watcher to log.
///
/// Safety guard: `all_online` MUST reflect every configured library
/// being Online right now. Even if true, deletes only happen when the
/// previous tick was also all-online. A single Stale tick within the
/// window cancels any pending deletes (they stay marked but won't be
/// promoted) — they're then re-evaluated next tick.
pub fn run_orphan_gc(
    conn: &mut SqliteConnection,
    state: &mut OrphanGcState,
    all_online: bool,
) -> GcStats {
    let mut stats = GcStats::default();

    // Find every distinct content_hash referenced by hash-keyed
    // derived data that is NOT currently referenced by image_exif.
    // These are this tick's orphan candidates. Cheap query — three
    // index scans plus a HashSet build at the row count of the
    // derived tables, which is small.
    let orphans: HashSet<String> = match collect_orphan_hashes(conn) {
        Ok(set) => set,
        Err(e) => {
            warn!("orphan-gc: candidate query failed: {:?}", e);
            return stats;
        }
    };

    // Drop entries from pending that are no longer orphaned
    // ("revived"). Common case: a network share that briefly went
    // stale comes back, image_exif gets re-populated by ingest, and
    // the hash is no longer orphaned.
    let revived = state.pending.difference(&orphans).cloned().collect::<Vec<_>>();
    if !revived.is_empty() {
        for h in &revived {
            state.pending.remove(h);
        }
        stats.revived = revived.len();
    }

    if !all_online {
        // Any Stale library cancels both the consensus window AND
        // any pending deletes. We *do* still note newly observed
        // orphans below — that's harmless bookkeeping. But we never
        // delete this tick.
        for h in &orphans {
            if state.pending.insert(h.clone()) {
                stats.newly_marked += 1;
            }
        }
        state.prev_tick_all_online = false;
        if stats.changed() {
            info!(
                "orphan-gc: {} new orphan hash(es) marked, {} revived (deferred — at least one library Stale; pending: {})",
                stats.newly_marked,
                stats.revived,
                state.pending.len()
            );
        } else {
            debug!(
                "orphan-gc: stale library, no changes (pending: {})",
                state.pending.len()
            );
        }
        return stats;
    }

    // All-online + previous-tick-also-all-online: hashes that are
    // both pending AND still orphaned this tick are confirmed and
    // get deleted. Hashes orphaned this tick but not yet pending get
    // freshly marked.
    let consensus_window_open = state.prev_tick_all_online;

    let to_delete: Vec<String> = if consensus_window_open {
        orphans
            .iter()
            .filter(|h| state.pending.contains(*h))
            .cloned()
            .collect()
    } else {
        Vec::new()
    };

    for h in &orphans {
        if !state.pending.contains(h) {
            state.pending.insert(h.clone());
            stats.newly_marked += 1;
        }
    }

    if !to_delete.is_empty() {
        match delete_hash_keyed_rows(conn, &to_delete) {
            Ok((faces, tags, insights)) => {
                stats.deleted_face_detections = faces;
                stats.deleted_tagged_photo = tags;
                stats.deleted_photo_insights = insights;
                // Drop deleted hashes from pending so we don't try to
                // re-delete them next tick (they'll have already been
                // removed from the orphan set).
                for h in &to_delete {
                    state.pending.remove(h);
                }
            }
            Err(e) => warn!("orphan-gc: delete batch failed: {:?}", e),
        }
    }

    state.prev_tick_all_online = true;

    if stats.changed() {
        info!(
            "orphan-gc: {} new orphan hash(es) marked, {} revived; deleted {} face_detections / {} tagged_photo / {} photo_insights row(s) (pending: {})",
            stats.newly_marked,
            stats.revived,
            stats.deleted_face_detections,
            stats.deleted_tagged_photo,
            stats.deleted_photo_insights,
            state.pending.len(),
        );
    } else {
        debug!("orphan-gc: no changes this tick (pending: {})", state.pending.len());
    }

    stats
}

/// Helper for the watcher: are *all* libraries currently Online?
pub fn all_libraries_online(libs: &[Library], health: &LibraryHealthMap) -> bool {
    let guard = health.read().unwrap_or_else(|e| e.into_inner());
    libs.iter().all(|lib| {
        guard
            .get(&lib.id)
            .map(|h| h.is_online())
            .unwrap_or(false)
    })
}

#[derive(QueryableByName, Debug)]
struct HashRow {
    #[diesel(sql_type = diesel::sql_types::Text)]
    content_hash: String,
}

fn collect_orphan_hashes(conn: &mut SqliteConnection) -> QueryResult<HashSet<String>> {
    // Union of every distinct content_hash carried by hash-keyed
    // derived tables, minus those still referenced by image_exif.
    let rows = sql_query(
        "SELECT DISTINCT content_hash FROM ( \
             SELECT content_hash FROM face_detections WHERE content_hash IS NOT NULL \
             UNION ALL \
             SELECT content_hash FROM tagged_photo WHERE content_hash IS NOT NULL \
             UNION ALL \
             SELECT content_hash FROM photo_insights WHERE content_hash IS NOT NULL \
         ) AS derived \
         WHERE content_hash NOT IN ( \
             SELECT content_hash FROM image_exif WHERE content_hash IS NOT NULL \
         )",
    )
    .get_results::<HashRow>(conn)?;

    Ok(rows.into_iter().map(|r| r.content_hash).collect())
}

/// Delete every hash-keyed row whose `content_hash` is in `hashes`.
/// Returns `(faces, tagged_photo, photo_insights)`.
fn delete_hash_keyed_rows(
    conn: &mut SqliteConnection,
    hashes: &[String],
) -> QueryResult<(usize, usize, usize)> {
    if hashes.is_empty() {
        return Ok((0, 0, 0));
    }

    use crate::database::schema::{face_detections, photo_insights, tagged_photo};

    let faces = diesel::delete(
        face_detections::table.filter(face_detections::content_hash.eq_any(hashes)),
    )
    .execute(conn)?;
    let tags = diesel::delete(
        tagged_photo::table.filter(tagged_photo::content_hash.eq_any(hashes)),
    )
    .execute(conn)?;
    let insights = diesel::delete(
        photo_insights::table.filter(photo_insights::content_hash.eq_any(hashes)),
    )
    .execute(conn)?;

    Ok((faces, tags, insights))
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::database::test::in_memory_db_connection;

    fn ensure_library(conn: &mut SqliteConnection, library_id: i32) {
        diesel::sql_query(
            "INSERT OR IGNORE INTO libraries (id, name, root_path, created_at) \
             VALUES (?, 'test-' || ?, '/tmp/test-' || ?, 0)",
        )
        .bind::<diesel::sql_types::Integer, _>(library_id)
        .bind::<diesel::sql_types::Integer, _>(library_id)
        .bind::<diesel::sql_types::Integer, _>(library_id)
        .execute(conn)
        .unwrap();
    }

    fn insert_image_exif(
        conn: &mut SqliteConnection,
        library_id: i32,
        rel_path: &str,
        content_hash: Option<&str>,
    ) {
        ensure_library(conn, library_id);
        diesel::sql_query(
            "INSERT INTO image_exif (library_id, rel_path, created_time, last_modified, content_hash) \
             VALUES (?, ?, 0, 0, ?)",
        )
        .bind::<diesel::sql_types::Integer, _>(library_id)
        .bind::<diesel::sql_types::Text, _>(rel_path)
        .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(content_hash)
        .execute(conn)
        .unwrap();
    }

    fn insert_face(conn: &mut SqliteConnection, library_id: i32, rel_path: &str, hash: &str) {
        ensure_library(conn, library_id);
        diesel::sql_query(
            "INSERT INTO face_detections (library_id, content_hash, rel_path, source, status, model_version, created_at) \
             VALUES (?, ?, ?, 'auto', 'no_faces', 'v', 0)",
        )
        .bind::<diesel::sql_types::Integer, _>(library_id)
        .bind::<diesel::sql_types::Text, _>(hash)
        .bind::<diesel::sql_types::Text, _>(rel_path)
        .execute(conn)
        .unwrap();
    }

    fn insert_tag_with_hash(conn: &mut SqliteConnection, rel_path: &str, hash: &str) {
        diesel::sql_query("INSERT OR IGNORE INTO tags (id, name, created_time) VALUES (1, 't', 0)")
            .execute(conn)
            .unwrap();
        diesel::sql_query(
            "INSERT INTO tagged_photo (rel_path, tag_id, created_time, content_hash) VALUES (?, 1, 0, ?)",
        )
        .bind::<diesel::sql_types::Text, _>(rel_path)
        .bind::<diesel::sql_types::Text, _>(hash)
        .execute(conn)
        .unwrap();
    }

    fn insert_insight_with_hash(
        conn: &mut SqliteConnection,
        library_id: i32,
        rel_path: &str,
        hash: &str,
    ) {
        ensure_library(conn, library_id);
        diesel::sql_query(
            "INSERT INTO photo_insights (library_id, rel_path, title, summary, generated_at, model_version, is_current, backend, content_hash) \
             VALUES (?, ?, 't', 's', 0, 'v', 1, 'local', ?)",
        )
        .bind::<diesel::sql_types::Integer, _>(library_id)
        .bind::<diesel::sql_types::Text, _>(rel_path)
        .bind::<diesel::sql_types::Text, _>(hash)
        .execute(conn)
        .unwrap();
    }

    #[derive(QueryableByName, Debug)]
    struct CountRow {
        #[diesel(sql_type = diesel::sql_types::BigInt)]
        n: i64,
    }

    fn count(conn: &mut SqliteConnection, sql: &str) -> i64 {
        diesel::sql_query(sql).get_result::<CountRow>(conn).unwrap().n
    }

    #[test]
    fn refresh_back_refs_repoints_face_detection_after_move() {
        let mut conn = in_memory_db_connection();
        // Original location: lib 1, rel "old.jpg". Its image_exif row
        // is gone (file moved); only the new lib 2 row remains.
        insert_image_exif(&mut conn, 2, "new.jpg", Some("h1"));
        insert_face(&mut conn, 1, "old.jpg", "h1");

        let updated = refresh_back_refs(&mut conn);
        assert_eq!(updated, 1);

        let row = diesel::sql_query("SELECT library_id AS n FROM face_detections")
            .get_result::<CountRow>(&mut conn)
            .unwrap();
        assert_eq!(row.n, 2, "library_id should now point at lib 2");
    }

    #[test]
    fn refresh_back_refs_no_change_when_back_ref_still_valid() {
        let mut conn = in_memory_db_connection();
        insert_image_exif(&mut conn, 1, "a.jpg", Some("h1"));
        insert_face(&mut conn, 1, "a.jpg", "h1");

        let updated = refresh_back_refs(&mut conn);
        assert_eq!(updated, 0);
    }

    #[test]
    fn refresh_back_refs_no_change_when_hash_fully_orphaned() {
        // Hash exists on face_detections but has no surviving
        // image_exif row → the refresh is a no-op (orphan GC handles
        // these). Important: the SET subquery would return NULL and
        // we'd null out the back-ref otherwise; the EXISTS guard
        // protects against that.
        let mut conn = in_memory_db_connection();
        insert_face(&mut conn, 1, "gone.jpg", "h1");

        let updated = refresh_back_refs(&mut conn);
        assert_eq!(updated, 0);
    }

    #[test]
    fn orphan_gc_requires_two_consecutive_all_online_ticks() {
        let mut conn = in_memory_db_connection();
        // Hash present in face_detections but NOT image_exif → orphan.
        insert_face(&mut conn, 1, "x.jpg", "h-orphan");
        let mut state = OrphanGcState::default();

        // Tick 1: prev_tick_all_online is false (default), so even
        // with the current tick all-online we mark only.
        let stats = run_orphan_gc(&mut conn, &mut state, true);
        assert_eq!(stats.newly_marked, 1);
        assert_eq!(stats.total_deleted(), 0);
        assert_eq!(state.pending.len(), 1);

        // Tick 2: prev_tick_all_online is now true, current tick still
        // all-online → consensus reached, hash gets deleted.
        let stats = run_orphan_gc(&mut conn, &mut state, true);
        assert_eq!(stats.deleted_face_detections, 1);
        assert!(state.pending.is_empty());

        // Tick 3: nothing left.
        let stats = run_orphan_gc(&mut conn, &mut state, true);
        assert_eq!(stats.total_deleted(), 0);
        assert_eq!(stats.newly_marked, 0);
    }

    #[test]
    fn orphan_gc_resets_consensus_on_stale_library() {
        let mut conn = in_memory_db_connection();
        insert_face(&mut conn, 1, "x.jpg", "h-orphan");
        let mut state = OrphanGcState::default();

        // Tick 1: all-online, mark.
        run_orphan_gc(&mut conn, &mut state, true);
        // Tick 2: stale library — consensus window resets, no delete.
        let stats = run_orphan_gc(&mut conn, &mut state, false);
        assert_eq!(stats.total_deleted(), 0);
        assert!(!state.prev_tick_all_online);
        // Tick 3: all-online again — but we need ANOTHER tick to set
        // prev_tick_all_online before deletes can fire. So tick 3
        // marks (a no-op on the existing pending entry), tick 4 deletes.
        let stats = run_orphan_gc(&mut conn, &mut state, true);
        assert_eq!(stats.total_deleted(), 0);
        let stats = run_orphan_gc(&mut conn, &mut state, true);
        assert_eq!(stats.deleted_face_detections, 1);
    }

    #[test]
    fn orphan_gc_revives_when_image_exif_reappears() {
        let mut conn = in_memory_db_connection();
        insert_face(&mut conn, 1, "x.jpg", "h-orphan");
        let mut state = OrphanGcState::default();

        // Tick 1: mark.
        run_orphan_gc(&mut conn, &mut state, true);
        assert!(state.pending.contains("h-orphan"));

        // Between ticks, the image_exif row reappears (e.g. a backup
        // share was briefly stale). The hash is no longer orphaned.
        insert_image_exif(&mut conn, 2, "x.jpg", Some("h-orphan"));

        let stats = run_orphan_gc(&mut conn, &mut state, true);
        assert_eq!(stats.revived, 1);
        assert_eq!(stats.total_deleted(), 0);
        assert!(state.pending.is_empty());
    }

    #[test]
    fn orphan_gc_deletes_across_all_three_tables() {
        let mut conn = in_memory_db_connection();
        // The same orphan hash appears in all three derived tables.
        insert_face(&mut conn, 1, "a.jpg", "h-orphan");
        insert_tag_with_hash(&mut conn, "a.jpg", "h-orphan");
        insert_insight_with_hash(&mut conn, 1, "a.jpg", "h-orphan");

        let mut state = OrphanGcState::default();
        run_orphan_gc(&mut conn, &mut state, true);
        let stats = run_orphan_gc(&mut conn, &mut state, true);
        assert_eq!(stats.deleted_face_detections, 1);
        assert_eq!(stats.deleted_tagged_photo, 1);
        assert_eq!(stats.deleted_photo_insights, 1);

        assert_eq!(count(&mut conn, "SELECT COUNT(*) AS n FROM face_detections"), 0);
        assert_eq!(count(&mut conn, "SELECT COUNT(*) AS n FROM tagged_photo"), 0);
        assert_eq!(count(&mut conn, "SELECT COUNT(*) AS n FROM photo_insights"), 0);
    }

    #[test]
    fn all_libraries_online_helper() {
        use crate::libraries::{LibraryHealth, new_health_map};
        let libs = vec![
            Library {
                id: 1,
                name: "a".into(),
                root_path: "/x".into(),
            },
            Library {
                id: 2,
                name: "b".into(),
                root_path: "/y".into(),
            },
        ];
        let health = new_health_map(&libs);
        assert!(all_libraries_online(&libs, &health));

        // Flip lib 2 to stale.
        {
            let mut g = health.write().unwrap();
            g.insert(
                2,
                LibraryHealth::Stale {
                    reason: "test".into(),
                    since: 0,
                },
            );
        }
        assert!(!all_libraries_online(&libs, &health));
    }
}

src/main.rs (+76)

@@ -72,6 +72,7 @@ mod file_types;
mod files;
mod geo;
mod libraries;
mod library_maintenance;
mod state;
mod tags;
mod utils;

@@ -1945,6 +1946,29 @@ fn watch_files(
    let mut last_full_scan = SystemTime::now();
    let mut scan_count = 0u64;

    // Per-library cursor for the missing-file scan. Each tick reads
    // a page from `offset`, stat()s the rows, deletes confirmed-missing
    // ones, and advances or wraps the cursor. State is held in memory,
    // so a watcher restart resumes from 0 — fine, the sweep is
    // idempotent.
    let mut missing_file_offsets: std::collections::HashMap<i32, i64> =
        std::collections::HashMap::new();

    let missing_scan_page_size: i64 = dotenv::var("IMAGE_EXIF_MISSING_SCAN_PAGE_SIZE")
        .ok()
        .and_then(|s| s.parse().ok())
        .filter(|n: &i64| *n > 0)
        .unwrap_or(library_maintenance::DEFAULT_SCAN_PAGE_SIZE);
    let missing_delete_cap: usize = dotenv::var("IMAGE_EXIF_MISSING_DELETE_CAP_PER_TICK")
        .ok()
        .and_then(|s| s.parse().ok())
        .filter(|n: &usize| *n > 0)
        .unwrap_or(library_maintenance::DEFAULT_MISSING_DELETE_CAP);

    // Two-tick orphan-GC consensus state. Carried across ticks via
    // `OrphanGcState`; see library_maintenance::run_orphan_gc.
    let mut orphan_gc_state = library_maintenance::OrphanGcState::default();

    // Initial availability sweep before the loop's first sleep so
    // /libraries reports the truth from the very first request,
    // rather than the optimistic Online default that

@@ -2061,6 +2085,35 @@ fn watch_files(

            // Update media counts per library (metric aggregates across all)
            update_media_counts(Path::new(&lib.root_path), &excluded_dirs);

            // Missing-file detection: prune image_exif rows whose
            // source file is no longer on disk. Per-library, so
            // library-online-this-tick is implied (we only reach
            // here if the probe gate at the top of the iteration
            // passed). Capped + paginated so a huge library doesn't
            // stall the watcher; rows we don't visit this tick get
            // visited next tick. See
            // library_maintenance::detect_missing_files_for_library.
            {
                let context = opentelemetry::Context::new();
                let offset = missing_file_offsets.get(&lib.id).copied().unwrap_or(0);
                let (deleted, next_offset) =
                    library_maintenance::detect_missing_files_for_library(
                        &context,
                        lib,
                        &exif_dao,
                        offset,
                        missing_scan_page_size,
                        missing_delete_cap,
                    );
                missing_file_offsets.insert(lib.id, next_offset);
                if deleted > 0 {
                    debug!(
                        "missing-file scan: library '{}' next_offset={}",
                        lib.name, next_offset
                    );
                }
            }
        }

        // Reconciliation: cross-library, so it runs once per tick

@@ -2072,6 +2125,29 @@ fn watch_files(
        {
            let mut conn = image_api::database::connect();
            let _ = image_api::database::reconcile::run(&mut conn);

            // Back-ref refresh: hash-keyed rows whose
            // (library_id, rel_path) tuple no longer matches any
            // image_exif row but whose hash still does. After a
            // recent → archive move, the missing-file scan removes
            // the old image_exif row; this pass repoints face /
            // tag / insight back-refs at the surviving location.
            // DB-only, no health gate needed — uses what's in
            // image_exif as truth.
            let _ = library_maintenance::refresh_back_refs(&mut conn);

            // Orphan GC: the destructive end of the maintenance
            // pipeline. Two-tick consensus + every-library-online
            // requirement is enforced inside run_orphan_gc; we
            // pass the current all-online flag and the function
            // tracks the previous tick's flag in OrphanGcState.
            let all_online =
                library_maintenance::all_libraries_online(&libs, &library_health);
            let _ = library_maintenance::run_orphan_gc(
                &mut conn,
                &mut orphan_gc_state,
                all_online,
            );
        }

        if is_full_scan {