feature/multi-library-data-model #67
127
CLAUDE.md
@@ -104,6 +104,131 @@ All database access goes through trait-based DAOs (e.g., `ExifDao`, `SqliteExifD
- `query_by_exif()`: Complex filtering by camera, GPS bounds, date ranges
- Batch operations minimize DB hits during file watching

### Multi-library data model

ImageApi supports more than one library (a library = a `(name, root_path)`
row in the `libraries` table that maps to a mounted directory tree). The
same bytes may exist under more than one library — the typical case is an
"active" library plus an "archive" library that ingests files as they age
out — and the data model is designed so that derived data follows the
**bytes**, not the path, while user-managed data does the same.

**The principle.** A photo's identity is its `content_hash` (blake3, see
`src/content_hash.rs`). Anything we compute from or attach to a photo is
keyed on that hash so it survives:
- the same file appearing in a second library (backup / archive / mirror),
- the file moving between libraries (recent → archive handoff),
- the file moving within a library (re-organized rel_path),
- intra-library duplicates (same bytes at two paths).
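The keying rule above can be sketched in miniature. This is an illustrative, std-only model — `DerivedRow` and `record_move` are hypothetical names, not types in this repo — showing why data keyed by hash survives every case in the list:

```rust
use std::collections::HashMap;

// Illustrative model of the keying rule (not real ImageApi types): derived
// data lives under the content hash; the path is a mutable back-reference.
#[derive(Debug)]
struct DerivedRow {
    insight: String,  // derived data, keyed by hash
    library_id: i32,  // back-reference only
    rel_path: String, // back-reference only
}

// A move (within or between libraries) touches only the back-reference;
// the key, and everything stored under it, is untouched.
fn record_move(rows: &mut HashMap<String, DerivedRow>, hash: &str, lib: i32, path: &str) {
    if let Some(row) = rows.get_mut(hash) {
        row.library_id = lib;
        row.rel_path = path.to_string();
    }
}
```

A rename, a handoff to the archive, or a second copy appearing all reduce to the same operation: update the hint, leave the key alone.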

**Table classification.** Three categories drive the keying decision:

| Category | Key | Rationale | Tables |
|---|---|---|---|
| Intrinsic to bytes | `content_hash` | Rerunning is wasted work (or LLM cost) | `face_detections` ✓, `image_exif` (target), `photo_insights` (target), `video_preview_clips` (target) |
| User intent about a photo | `content_hash` | "Tag this photo" means the bytes, not a path | `tagged_photo` (target), `favorites` (target) |
| Library administrative | `(library_id, rel_path)` | Tied to a specific filesystem location | `libraries`, `entity_photo_links`, the `rel_path` back-ref columns on hash-keyed tables |

✓ = already implemented this way. *(target)* = today still keyed on
`(library_id, rel_path)` and slated for migration. The migration adds a
nullable `content_hash` column and populates it from `image_exif` where
known; read paths fall back to rel_path while the hash is null.

**Carrying a `rel_path` even when hash-keyed.** Hash-keyed tables retain
`(library_id, rel_path)` columns as a denormalized **back-reference**, not
as the key. This lets a single query answer "what is at this path right
now" without joining through `image_exif`, and supports the path-only
endpoints that predate the hash. `face_detections` is the reference
implementation: hash is the truth, path is a hint.

**Merge semantics on read.** When the same hash has rows under more than
one library:
- Set-valued data (tags, favorites, faces, entity links) → **union**.
- Scalar data (current insight, EXIF row, video preview clip) → earliest
  `generated_at` / `created_time` wins. The historical lib1 row beats a
  re-generated lib2 row, so the user's curated insight isn't shadowed by
  a re-run on archive ingest.
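Both merge rules fit in a few lines. A minimal sketch, with illustrative function names (this is not the repo's actual read path):

```rust
use std::collections::BTreeSet;

// Set-valued data from several libraries merges by union (sorted here
// only so the output is deterministic).
fn merge_tags(per_library: &[Vec<&str>]) -> Vec<String> {
    let set: BTreeSet<&str> = per_library.iter().flatten().copied().collect();
    set.into_iter().map(String::from).collect()
}

// Scalar data: the row with the earliest timestamp wins, so a historical
// lib1 insight is not shadowed by a re-generated lib2 one.
fn pick_current(rows: &[(i64, &str)]) -> Option<String> {
    rows.iter()
        .min_by_key(|(generated_at, _)| *generated_at)
        .map(|(_, value)| value.to_string())
}
```

Note the asymmetry: union can only grow the visible set, while earliest-wins deliberately prefers the older row over a fresher re-computation.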

**Write attribution.** A new tag/favorite/insight created while viewing
under lib2 binds to the bytes, not to lib2 — so it shows up under lib1
too. This is by design, but it's the most surprising rule on first
encounter; clients should not assume tags are library-scoped.

**Hash-less rows (transitional state).** During and immediately after a
new mount, `image_exif.content_hash` is being populated by
`backfill_unhashed_backlog` (capped per tick). Rules during this window:
- Writes: if the hash is known, write hash-keyed. If not, write
  `(library_id, rel_path)`-keyed and let the reconciliation job collapse
  duplicates once the hash lands.
- Reads: prefer hash key, fall back to `(library_id, rel_path)`.
- Reconciliation: a one-shot pass after every backfill tick collapses
  rows that now share a hash, applying the merge semantics above.
  Idempotent — safe to re-run.
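The read rule for this window is a plain two-level lookup. A sketch under assumed in-memory maps (`read_tag` and its signature are hypothetical, not a DAO method):

```rust
use std::collections::HashMap;

// Transitional read path: prefer the hash-keyed row, fall back to
// (library_id, rel_path) while the hash is still unknown.
fn read_tag<'a>(
    by_hash: &'a HashMap<String, String>,
    by_path: &'a HashMap<(i32, String), String>,
    hash: Option<&str>,
    library_id: i32,
    rel_path: &str,
) -> Option<&'a String> {
    hash.and_then(|h| by_hash.get(h))
        .or_else(|| by_path.get(&(library_id, rel_path.to_string())))
}
```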

**Library handoff (recent → archive).** When a file moves between
libraries (e.g. operator moves `~/photos/2024/IMG.nef` to the archive
mount), the file watcher sees the disappearance under lib1 and the
appearance under lib2. Hash-keyed rows don't need migration; the
`(library_id, rel_path)` back-ref columns are updated to point to the new
location. Library administrative rows (`entity_photo_links`,
`(library_id, rel_path)` rows in `image_exif` for hash-less items) are
re-keyed by the move detector, which matches a disappearance to an
appearance by `content_hash` within a configurable window.

**Orphans (source deleted while a copy survives).** When an
`image_exif` row for a hash is deleted (file removed from disk), the
hash-keyed derived rows survive **as long as another `image_exif` row
references the same hash**. If the last reference is gone, derived rows
are eligible for GC (deferred — the GC job runs on a slow schedule so
that a brief unmount or rename doesn't wipe history).

**Stats and counts.** When reporting "how many photos do you have," count
`DISTINCT content_hash` over `image_exif`, not row count. Faces stats
already does this (`FaceDao::stats` in `src/faces.rs`); other counters
should follow suit. Numerator and denominator must live in the same
domain — see the face-stats commentary below for the cautionary tale.
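The counting rule in one illustrative function (`photo_count` is hypothetical; the real query would use `COUNT(DISTINCT content_hash)` in SQL):

```rust
use std::collections::HashSet;

// Count photos as distinct content hashes, not rows: a file mirrored
// into a second library must not double the reported total.
fn photo_count(rows: &[(i32, &str)]) -> usize {
    // rows are (library_id, content_hash) pairs.
    rows.iter().map(|&(_lib, hash)| hash).collect::<HashSet<_>>().len()
}
```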

**Per-library scoping when the user asks for it.** A request scoped to
`?library=N` filters the `image_exif` view to that library, and the
hash-keyed derived data is joined through that view. The user sees only
photos that have a copy under lib N, but the derived data attached to
those photos is the merged hash-keyed view. This is the answer to "show
me archive photos with their original tags."

**Library availability and safety.** Libraries can be on network shares
or removable media; the file watcher must not interpret a temporary
unavailability as a mass-deletion event. Every tick begins with a
**presence probe** per library: the library is considered online iff
its `root_path` exists, is readable, and a top-level scan returns at
least one expected entry (or matches a recent file-count high-water
mark within a tolerance). The probe result gates which actions are safe
to run on that library this tick:

| Action | Requires online? |
|---|---|
| Quick / full scan ingest of new files | yes |
| EXIF / face / insight backlog drains | yes — but the work runs against any online library |
| Move-handoff detection (lib1 disappearance ↔ lib2 appearance match) | **both** libraries online |
| `(library_id, rel_path)` re-keying on detected move | **both** libraries online |
| Orphan GC of hash-keyed derived data | all libraries that have *ever* held the hash must be online and confirmed-clean for two consecutive ticks |
| Reads / serving | always allowed; falls back to whichever library is online |
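The two-library rows of the table reduce to a simple gate. A sketch with an assumed `online` map of this tick's probe results (`move_handoff_allowed` is illustrative, not watcher code):

```rust
use std::collections::HashMap;

// A move handoff (or the re-keying it triggers) may only fire when both
// ends passed this tick's presence probe. An unknown library counts as
// offline, which errs on the safe (non-destructive) side.
fn move_handoff_allowed(online: &HashMap<i32, bool>, from_lib: i32, to_lib: i32) -> bool {
    let is_online = |id: i32| online.get(&id).copied().unwrap_or(false);
    is_online(from_lib) && is_online(to_lib)
}
```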

A library that fails the probe enters a "stale" state: writes scoped to
it are paused, its rows are flagged stale (not deleted) in
`/libraries` status, and the watcher logs at `warn` once per
state-transition (not per tick). A library that recovers re-enters the
online set automatically; no operator action required for transient
outages. The intent is that pulling a USB drive, rebooting a NAS, or
losing a VPN never triggers a destructive code path — the worst case is
that derived-data work pauses until the share returns.

The same rule constrains the move-handoff matcher: a disappearance
under lib1 only counts as a "move" if there is a matching appearance
under another **online** library within the window. A bare
disappearance with no matching appearance is treated as
"unavailable-or-deleted, defer judgment" — it does not re-key any rows
and does not enqueue GC.

### File Processing Pipeline

**Thumbnail Generation:**
@@ -219,7 +344,7 @@ ImageApi owns the face data; Apollo (sibling repo) hosts the insightface inferen

- `persons(id, name UNIQUE COLLATE NOCASE, cover_face_id, entity_id, created_from_tag, notes, ...)` — operator-managed, name is the user-visible identity.
- `face_detections(id, library_id, content_hash, rel_path, bbox_*, embedding BLOB, confidence, source, person_id, status, model_version, ...)` — keyed on `content_hash` so a photo duplicated across libraries is detected once. Marker rows for `status IN ('no_faces','failed')` carry NULL bbox/embedding (CHECK constraint enforces this).

**Why content_hash and not (library_id, rel_path):** ties face data to the bytes, not the path. A backup mount that copies files from the primary library naturally inherits the existing detections without re-running inference. This is the reference implementation of the multi-library data model — see "Multi-library data model" above.

**File-watch hook** (`src/main.rs::process_new_files`): for each photo with a populated `content_hash`, check `FaceDao::already_scanned(hash)`; if not, send bytes (or embedded JPEG preview for RAW via `exif::extract_embedded_jpeg_preview`) to Apollo's `/api/internal/faces/detect`. K=`FACE_DETECT_CONCURRENCY` (default 8) parallel calls per scan tick; Apollo serializes them via its single-worker GPU pool. `face_watch.rs` is the Tokio orchestration layer.

@@ -53,12 +53,36 @@ pub fn thumbnail_path(thumbs_dir: &Path, hash: &str) -> PathBuf {
/// Hash-keyed HLS output directory: `<video_dir>/<hash[..2]>/<hash>/`.
/// The playlist lives at `playlist.m3u8` inside this directory and its
/// segments are co-located so HLS relative references Just Work.
///
/// Allow-dead until Branch B/C rewires the HLS pipeline to use it; the
/// helper lives here today so Branch A's path layout decisions stay
/// adjacent to thumbnail/legacy ones.
#[allow(dead_code)]
pub fn hls_dir(video_dir: &Path, hash: &str) -> PathBuf {
    let shard = shard_prefix(hash);
    video_dir.join(shard).join(hash)
}

/// Library-scoped legacy mirrored path:
/// `<derivative_dir>/<library_id>/<rel_path>`. Used as the fallback when
/// `content_hash` isn't available — the library prefix prevents the
/// "lib1 wrote `vacation/IMG.jpg` first, lib2 sees thumb_path.exists()
/// and serves the wrong image" failure mode.
///
/// Existing single-library deployments may already have thumbnails at the
/// bare-legacy `<derivative_dir>/<rel_path>` shape; serving code is
/// expected to check both this scoped path and the bare-legacy path so
/// nothing 404s during the transition.
pub fn library_scoped_legacy_path(
    derivative_dir: &Path,
    library_id: i32,
    rel_path: impl AsRef<Path>,
) -> PathBuf {
    derivative_dir
        .join(library_id.to_string())
        .join(rel_path)
}

fn shard_prefix(hash: &str) -> &str {
    let end = hash
        .char_indices()
@@ -105,4 +129,17 @@ mod tests {
        let d = hls_dir(video, "1234deadbeef");
        assert_eq!(d, PathBuf::from("/tmp/video/12/1234deadbeef"));
    }

    #[test]
    fn library_scoped_legacy_path_prefixes_with_library_id() {
        let thumbs = Path::new("/tmp/thumbs");
        let p = library_scoped_legacy_path(thumbs, 7, "vacation/IMG.jpg");
        assert_eq!(p, PathBuf::from("/tmp/thumbs/7/vacation/IMG.jpg"));

        // Same rel_path, different library — different output. This is
        // the whole point: lib 1 and lib 2 don't clobber each other.
        let p1 = library_scoped_legacy_path(thumbs, 1, "vacation/IMG.jpg");
        let p2 = library_scoped_legacy_path(thumbs, 2, "vacation/IMG.jpg");
        assert_ne!(p1, p2);
    }
}

@@ -295,17 +295,29 @@ pub trait ExifDao: Sync + Send {
        library_id: Option<i32>,
    ) -> Result<Vec<(String, i64)>, DbError>;

    /// Batch load EXIF data for multiple file paths (single query). When
    /// `library_id = Some(id)` the lookup is keyed on `(library_id,
    /// rel_path)`; cross-library duplicates with the same rel_path are
    /// excluded. `None` keeps the legacy rel-path-only behavior — used by
    /// the union-mode `/photos` listing, which already disambiguates by
    /// `(file_path, library_id)` in the caller.
    fn get_exif_batch(
        &mut self,
        context: &opentelemetry::Context,
        library_id: Option<i32>,
        file_paths: &[String],
    ) -> Result<Vec<ImageExif>, DbError>;

    /// Query files by EXIF criteria with optional filters. `library_id =
    /// Some(id)` restricts to that library; `None` spans every library
    /// (used by the unscoped `/photos` form). The composite
    /// `(library_id, date_taken)` index added in the multi_library
    /// migration depends on `library_id` being part of the WHERE clause —
    /// callers that have a library context must pass it.
    fn query_by_exif(
        &mut self,
        context: &opentelemetry::Context,
        library_id: Option<i32>,
        camera_make: Option<&str>,
        camera_model: Option<&str>,
        lens_model: Option<&str>,
@@ -443,6 +455,16 @@ pub trait ExifDao: Sync + Send {
        library_id: i32,
        rel_path: &str,
    ) -> Result<(), DbError>;

    /// Number of image_exif rows for a library. Used by the availability
    /// probe to decide whether an empty mount is "fresh" (zero rows: fine)
    /// or "the share went offline" (non-zero rows: stale). Zero on query
    /// error so a transient DB hiccup doesn't itself cause a Stale flip.
    fn count_for_library(
        &mut self,
        context: &opentelemetry::Context,
        library_id: i32,
    ) -> Result<i64, DbError>;
}

pub struct SqliteExifDao {
@@ -622,6 +644,7 @@ impl ExifDao for SqliteExifDao {
    fn get_exif_batch(
        &mut self,
        context: &opentelemetry::Context,
        library_id_filter: Option<i32>,
        file_paths: &[String],
    ) -> Result<Vec<ImageExif>, DbError> {
        trace_db_call(context, "query", "get_exif_batch", |_span| {
@@ -632,8 +655,11 @@ impl ExifDao for SqliteExifDao {
            }

            let mut connection = self.connection.lock().expect("Unable to get ExifDao");

            let mut query = image_exif.into_boxed();
            if let Some(lib_id) = library_id_filter {
                query = query.filter(library_id.eq(lib_id));
            }
            query
                .filter(rel_path.eq_any(file_paths))
                .load::<ImageExif>(connection.deref_mut())
                .map_err(|_| anyhow::anyhow!("Query error"))
@@ -644,6 +670,7 @@ impl ExifDao for SqliteExifDao {
    fn query_by_exif(
        &mut self,
        context: &opentelemetry::Context,
        library_id_filter: Option<i32>,
        camera_make_filter: Option<&str>,
        camera_model_filter: Option<&str>,
        lens_model_filter: Option<&str>,
@@ -657,6 +684,12 @@ impl ExifDao for SqliteExifDao {
            let mut connection = self.connection.lock().expect("Unable to get ExifDao");
            let mut query = image_exif.into_boxed();

            // Library scope (most-selective filter — apply first so the
            // `(library_id, ...)` indexes are eligible).
            if let Some(lib_id) = library_id_filter {
                query = query.filter(library_id.eq(lib_id));
            }

            // Camera filters (case-insensitive partial match)
            if let Some(make) = camera_make_filter {
                query = query.filter(camera_make.like(format!("%{}%", make)));
@@ -1078,6 +1111,23 @@ impl ExifDao for SqliteExifDao {
        })
        .map_err(|_| DbError::new(DbErrorKind::QueryError))
    }

    fn count_for_library(
        &mut self,
        context: &opentelemetry::Context,
        library_id_val: i32,
    ) -> Result<i64, DbError> {
        trace_db_call(context, "query", "count_for_library", |_span| {
            use schema::image_exif::dsl::*;

            image_exif
                .filter(library_id.eq(library_id_val))
                .count()
                .get_result::<i64>(self.connection.lock().unwrap().deref_mut())
                .map_err(|_| anyhow::anyhow!("Count error"))
        })
        .map_err(|_| DbError::new(DbErrorKind::QueryError))
    }
}

#[cfg(test)]
@@ -1167,4 +1217,61 @@ mod exif_dao_tests {
        let lib1 = dao.get_all_with_date_taken(&ctx(), Some(1)).unwrap();
        assert_eq!(lib1, vec![("main/a.jpg".to_string(), 100)]);
    }

    #[test]
    fn query_by_exif_scopes_by_library_id() {
        let mut dao = setup_two_libraries();
        insert_row(&mut dao, 1, "main/a.jpg", Some(100));
        insert_row(&mut dao, 2, "archive/a.jpg", Some(200));

        // Union: both rows.
        let all = dao
            .query_by_exif(&ctx(), None, None, None, None, None, None, None)
            .unwrap();
        assert_eq!(all.len(), 2);

        // Scoped to lib 2: only archive row.
        let lib2 = dao
            .query_by_exif(&ctx(), Some(2), None, None, None, None, None, None)
            .unwrap();
        assert_eq!(lib2.len(), 1);
        assert_eq!(lib2[0].file_path, "archive/a.jpg");
        assert_eq!(lib2[0].library_id, 2);
    }

    #[test]
    fn get_exif_batch_scopes_by_library_id() {
        let mut dao = setup_two_libraries();
        // Same rel_path, different libraries — the cross-library duplicate
        // case the audit flagged.
        insert_row(&mut dao, 1, "shared/photo.jpg", Some(100));
        insert_row(&mut dao, 2, "shared/photo.jpg", Some(200));

        // None spans both libraries (legacy union behavior).
        let union = dao
            .get_exif_batch(&ctx(), None, &["shared/photo.jpg".to_string()])
            .unwrap();
        assert_eq!(union.len(), 2);

        // Some(2) returns only the archive row.
        let scoped = dao
            .get_exif_batch(&ctx(), Some(2), &["shared/photo.jpg".to_string()])
            .unwrap();
        assert_eq!(scoped.len(), 1);
        assert_eq!(scoped[0].library_id, 2);
        assert_eq!(scoped[0].date_taken, Some(200));
    }

    #[test]
    fn count_for_library_returns_per_library_count() {
        let mut dao = setup_two_libraries();
        insert_row(&mut dao, 1, "main/a.jpg", None);
        insert_row(&mut dao, 1, "main/b.jpg", None);
        insert_row(&mut dao, 2, "archive/a.jpg", None);

        assert_eq!(dao.count_for_library(&ctx(), 1).unwrap(), 2);
        assert_eq!(dao.count_for_library(&ctx(), 2).unwrap(), 1);
        // Unknown library: zero, no error.
        assert_eq!(dao.count_for_library(&ctx(), 999).unwrap(), 0);
    }
}

47
src/files.rs
@@ -110,11 +110,18 @@ fn in_memory_date_sort(
    let total_count = files.len() as i64;
    let file_paths: Vec<String> = files.iter().map(|f| f.file_name.clone()).collect();

    // Batch fetch EXIF data. When every file in this batch belongs to the
    // same library, scope the SQL filter to that library so cross-library
    // duplicates with the same rel_path don't get fetched and discarded.
    // In genuine union mode (mixed libraries) keep the rel-path-only
    // lookup; the caller's `(file_path, library_id)` map below picks the
    // right row.
    let scope_library = match file_libraries.first() {
        Some(&first) if file_libraries.iter().all(|&id| id == first) => Some(first),
        _ => None,
    };
    let exif_rows = exif_dao
        .get_exif_batch(span_context, scope_library, &file_paths)
        .unwrap_or_default();
    let exif_map: std::collections::HashMap<(String, i32), i64> = exif_rows
        .into_iter()
@@ -309,11 +316,15 @@ pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
        None
    };

    // Query EXIF database. When the request named a library, the EXIF
    // filter must be scoped to it — otherwise camera/date/GPS hits
    // from other libraries would pollute the result set even though
    // downstream filesystem walks would never visit those files.
    let mut exif_dao_guard = exif_dao.lock().expect("Unable to get ExifDao");
    let exif_results = exif_dao_guard
        .query_by_exif(
            &span_context,
            library.map(|l| l.id),
            req.camera_make.as_deref(),
            req.camera_model.as_deref(),
            req.lens_model.as_deref(),
@@ -1242,15 +1253,19 @@ pub async fn list_exif_summary(
        .collect();

    let mut exif_dao_guard = exif_dao.lock().expect("Unable to get ExifDao");
    match exif_dao_guard.query_by_exif(
        &cx,
        library_filter,
        None,
        None,
        None,
        None,
        req.date_from,
        req.date_to,
    ) {
        Ok(rows) => {
            let photos: Vec<ExifSummary> = rows
                .into_iter()
                // Library filter post-query: keeps the DAO trait (and its
                // mocks) unchanged. For typical 2–3 library setups the in-
                // memory pass over a date-bounded result set is negligible;
                // can be pushed into SQL later if it ever isn't.
                .filter(|r| library_filter.is_none_or(|id| r.library_id == id))
                .map(|r| ExifSummary {
                    library_name: library_names.get(&r.library_id).cloned(),
                    file_path: r.file_path,
@@ -1549,6 +1564,7 @@ mod tests {
    fn get_exif_batch(
        &mut self,
        _context: &opentelemetry::Context,
        _library_id: Option<i32>,
        _: &[String],
    ) -> Result<Vec<crate::database::models::ImageExif>, DbError> {
        Ok(Vec::new())
@@ -1557,6 +1573,7 @@ mod tests {
    fn query_by_exif(
        &mut self,
        _context: &opentelemetry::Context,
        _library_id: Option<i32>,
        _: Option<&str>,
        _: Option<&str>,
        _: Option<&str>,
@@ -1684,6 +1701,14 @@ mod tests {
    ) -> Result<(), DbError> {
        Ok(())
    }

    fn count_for_library(
        &mut self,
        _context: &opentelemetry::Context,
        _library_id: i32,
    ) -> Result<i64, DbError> {
        Ok(0)
    }
}

mod api {

262
src/libraries.rs
@@ -3,7 +3,9 @@ use chrono::Utc;
use diesel::prelude::*;
use diesel::sqlite::SqliteConnection;
use log::{info, warn};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};

use crate::data::Claims;
use crate::database::models::{InsertLibrary, LibraryRow};
@@ -146,16 +148,165 @@ pub fn resolve_library_param<'a>(
        .ok_or_else(|| format!("unknown library name: {}", raw))
}

/// Health of a library at a point in time. Probed at the top of each
/// file-watcher tick. The `Stale` state is the "be conservative" signal:
/// destructive paths (ingest writes, future move-handoff and orphan GC in
/// branches B/C) skip a stale library, but reads/serving stay unaffected.
///
/// See `CLAUDE.md` → "Library availability and safety" for the policy.
#[derive(Clone, Debug, serde::Serialize, PartialEq, Eq)]
#[serde(tag = "state", rename_all = "snake_case")]
pub enum LibraryHealth {
    Online,
    Stale {
        reason: String,
        /// Unix timestamp (seconds) of the most recent transition into
        /// Stale. Held for telemetry / `/libraries` surfacing only —
        /// gating logic doesn't read it.
        since: i64,
    },
}

impl LibraryHealth {
    pub fn is_online(&self) -> bool {
        matches!(self, LibraryHealth::Online)
    }
}

/// Shared snapshot of every configured library's health, keyed by
/// `library_id`. The watcher writes; HTTP handlers read. RwLock because
/// reads vastly outnumber writes (one tick vs. every status request).
pub type LibraryHealthMap = Arc<RwLock<HashMap<i32, LibraryHealth>>>;

/// Construct an initial health map. Libraries start `Online`; the first
/// probe will downgrade any that fail. Starting `Stale` would block ingest
/// for the watcher's first tick on a healthy mount, which is the wrong
/// default for a server that's just been restarted.
pub fn new_health_map(libs: &[Library]) -> LibraryHealthMap {
    let mut m = HashMap::with_capacity(libs.len());
    for lib in libs {
        m.insert(lib.id, LibraryHealth::Online);
    }
    Arc::new(RwLock::new(m))
}

/// Probe a library's mount point. Cheap: stat + open dir + peek one entry.
///
/// `had_data` is the caller's prior knowledge that this library has been
/// non-empty before — typically `image_exif` row count > 0. When true, an
/// empty directory is suspicious (it's how an unmounted NFS share looks);
/// when false, it's accepted as a fresh mount that simply hasn't been
/// indexed yet.
///
/// Note: stat / read_dir on a hard-mounted, unreachable NFS share can
/// block. The watcher accepts that risk for now — the worst case is that
/// the tick stalls until the mount returns, which is no more destructive
/// than the pre-probe behavior. A future enhancement can wrap this in a
/// thread + timeout if it becomes an operational issue.
pub fn probe_online(lib: &Library, had_data: bool) -> LibraryHealth {
    let now = Utc::now().timestamp();
    let path = Path::new(&lib.root_path);

    let metadata = match std::fs::metadata(path) {
        Ok(m) => m,
        Err(e) => {
            return LibraryHealth::Stale {
                reason: format!("root_path stat failed: {}", e),
                since: now,
            };
        }
    };
    if !metadata.is_dir() {
        return LibraryHealth::Stale {
            reason: format!("root_path is not a directory: {}", lib.root_path),
            since: now,
        };
    }

    let mut entries = match std::fs::read_dir(path) {
        Ok(it) => it,
        Err(e) => {
            return LibraryHealth::Stale {
                reason: format!("read_dir failed: {}", e),
                since: now,
            };
        }
    };

    // Empty directory only counts as Stale when we have prior evidence
    // this library used to have content. A genuinely fresh mount is
    // legitimately empty, and degrading it would block first-time ingest.
    if had_data && entries.next().is_none() {
        return LibraryHealth::Stale {
            reason: "library is empty but image_exif has rows for it".to_string(),
            since: now,
        };
    }

    LibraryHealth::Online
}

/// Probe `lib`, update `map`, and return the new state. Logs only on a
/// state transition (Online↔Stale) so a long outage doesn't spam at every
/// tick — operators get one warn on the way down and one info on the way
/// up.
pub fn refresh_health(map: &LibraryHealthMap, lib: &Library, had_data: bool) -> LibraryHealth {
    let new_state = probe_online(lib, had_data);
    let mut guard = map.write().unwrap_or_else(|e| e.into_inner());
    let prev = guard.get(&lib.id).cloned();
    let transitioned = matches!(
        (&prev, &new_state),
        (None, LibraryHealth::Stale { .. })
            | (Some(LibraryHealth::Online), LibraryHealth::Stale { .. })
            | (Some(LibraryHealth::Stale { .. }), LibraryHealth::Online)
    );
    if transitioned {
        match &new_state {
            LibraryHealth::Online => info!(
                "Library '{}' (id={}) recovered: {} is online",
                lib.name, lib.id, lib.root_path
            ),
            LibraryHealth::Stale { reason, .. } => warn!(
                "Library '{}' (id={}) is STALE — pausing writes. Reason: {}. Path: {}",
                lib.name, lib.id, reason, lib.root_path
            ),
        }
    }
    guard.insert(lib.id, new_state.clone());
    new_state
}

/// Snapshot of one library + its current health, for `/libraries`.
#[derive(serde::Serialize)]
pub struct LibraryStatus {
    #[serde(flatten)]
    pub library: Library,
    pub health: LibraryHealth,
}

#[derive(serde::Serialize)]
pub struct LibrariesResponse {
    pub libraries: Vec<LibraryStatus>,
}

#[get("/libraries")]
pub async fn list_libraries(_claims: Claims, app_state: Data<AppState>) -> impl Responder {
    let health_guard = app_state
        .library_health
        .read()
        .unwrap_or_else(|e| e.into_inner());
    let libraries = app_state
        .libraries
        .iter()
        .map(|lib| LibraryStatus {
            library: lib.clone(),
            health: health_guard
                .get(&lib.id)
                .cloned()
                .unwrap_or(LibraryHealth::Online),
        })
        .collect();
    HttpResponse::Ok().json(LibrariesResponse { libraries })
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -279,4 +430,109 @@ mod tests {
        let err = resolve_library_param(&state, Some("missing")).unwrap_err();
        assert!(err.contains("unknown library name"));
    }

    #[test]
    fn probe_online_for_existing_non_empty_dir() {
        let tmp = tempfile::tempdir().unwrap();
        std::fs::write(tmp.path().join("photo.jpg"), b"hello").unwrap();
        let lib = Library {
            id: 1,
            name: "main".into(),
            root_path: tmp.path().to_string_lossy().into(),
        };
        // had_data doesn't matter when the dir has entries.
        assert!(probe_online(&lib, true).is_online());
        assert!(probe_online(&lib, false).is_online());
    }

    #[test]
    fn probe_stale_when_root_missing() {
        let lib = Library {
            id: 1,
            name: "main".into(),
            root_path: "/nonexistent/definitely/not/here".into(),
        };
        assert!(matches!(
            probe_online(&lib, false),
            LibraryHealth::Stale { .. }
        ));
    }

    #[test]
    fn probe_stale_when_root_is_a_file() {
        let tmp = tempfile::tempdir().unwrap();
        let file = tmp.path().join("not-a-dir");
        std::fs::write(&file, b"x").unwrap();
        let lib = Library {
            id: 1,
            name: "main".into(),
            root_path: file.to_string_lossy().into(),
        };
        assert!(matches!(
            probe_online(&lib, false),
            LibraryHealth::Stale { .. }
        ));
    }

    #[test]
    fn probe_empty_dir_is_online_when_no_prior_data() {
        // Fresh mount: empty directory, no rows in image_exif. Accept it.
        let tmp = tempfile::tempdir().unwrap();
        let lib = Library {
            id: 1,
            name: "main".into(),
            root_path: tmp.path().to_string_lossy().into(),
        };
        assert!(probe_online(&lib, false).is_online());
    }

    #[test]
    fn probe_empty_dir_is_stale_when_prior_data_existed() {
        // The "share went offline" signal: directory exists but is empty,
        // and we know the library used to have content. Treat as Stale.
        let tmp = tempfile::tempdir().unwrap();
        let lib = Library {
            id: 1,
            name: "main".into(),
            root_path: tmp.path().to_string_lossy().into(),
        };
        match probe_online(&lib, true) {
            LibraryHealth::Stale { reason, .. } => {
                assert!(reason.contains("empty"), "unexpected reason: {}", reason)
            }
            other => panic!("expected Stale, got {:?}", other),
        }
    }

    #[test]
    fn refresh_health_logs_only_on_transition() {
        // Smoke test: refresh_health updates the map and reports correctly.
        // (We can't easily assert on logs without a custom logger; the
        // important thing is that the state churns properly.)
        let tmp = tempfile::tempdir().unwrap();
        let lib = Library {
            id: 42,
            name: "test".into(),
            root_path: tmp.path().to_string_lossy().into(),
        };
        let map = new_health_map(&[lib.clone()]);

        // First probe: empty dir, no prior data — Online.
        let s1 = refresh_health(&map, &lib, false);
        assert!(s1.is_online());

        // Probe again with had_data=true on the same empty dir — Stale.
        let s2 = refresh_health(&map, &lib, true);
        assert!(matches!(s2, LibraryHealth::Stale { .. }));
        assert_eq!(
            map.read().unwrap().get(&lib.id).cloned(),
            Some(s2.clone()),
            "map should reflect the latest probe"
        );

        // Recovery: drop a file and probe again.
        std::fs::write(tmp.path().join("photo.jpg"), b"x").unwrap();
        let s3 = refresh_health(&map, &lib, true);
        assert!(s3.is_online());
    }
}
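The probes above pin down a small decision table: a missing or non-directory root is Stale, an empty root is Stale only when the library previously had indexed rows, and anything readable and non-empty is Online. A minimal sketch of that logic, with illustrative names (`probe`, `Health`); the real `probe_online` and `LibraryHealth` live in `src/libraries.rs` and carry more detail than this:

```rust
use std::path::Path;

// Hypothetical mirror of the probe the tests exercise.
#[derive(Debug, Clone, PartialEq)]
enum Health {
    Online,
    Stale { reason: String },
}

fn probe(root: &Path, had_data: bool) -> Health {
    // Missing mount point, or a file squatting on the path: Stale.
    if !root.is_dir() {
        return Health::Stale { reason: format!("{} is not a directory", root.display()) };
    }
    // Unreadable directory: also Stale.
    let mut entries = match std::fs::read_dir(root) {
        Ok(e) => e,
        Err(e) => return Health::Stale { reason: format!("unreadable: {e}") },
    };
    // An empty root is only suspicious when the library used to have
    // content (the "share went offline" signal from the tests above).
    if entries.next().is_none() && had_data {
        return Health::Stale { reason: "root exists but is empty".into() };
    }
    Health::Online
}
```

The asymmetry on `had_data` is the point: it lets a fresh, legitimately empty library come online while still catching a previously populated share that unmounted out from under us.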
src/main.rs
@@ -150,7 +150,12 @@ async fn get_image(
    let relative_path_str = relative_path.to_string_lossy().replace('\\', "/");

    let thumbs = &app_state.thumbnail_path;
-   let legacy_thumb_path = Path::new(&thumbs).join(relative_path);
+   let bare_legacy_thumb_path = Path::new(&thumbs).join(relative_path);
+   let scoped_legacy_thumb_path = content_hash::library_scoped_legacy_path(
+       Path::new(&thumbs),
+       library.id,
+       relative_path,
+   );

    // Gif thumbnails are a separate lookup (video GIF previews).
    // Dual-lookup for gif is out of scope; preserve existing flow.
@@ -168,8 +173,16 @@ async fn get_image(
        }
    }

-   // Resolve the hash-keyed thumbnail (if the row already has a
-   // content_hash) and fall back to the legacy mirrored path.
+   // Lookup chain (most-specific first, falling back as we miss):
+   //   1. hash-keyed (`<thumbs>/<hash[..2]>/<hash>.jpg`) — content
+   //      identity, shared across libraries;
+   //   2. library-scoped legacy (`<thumbs>/<lib_id>/<rel_path>`) —
+   //      written by current generation when hash isn't known;
+   //   3. bare legacy (`<thumbs>/<rel_path>`) — pre-multi-library
+   //      thumbs from the days before library prefixing existed.
+   // Stage (3) goes away once a one-time migration lifts every
+   // bare-legacy file under a library prefix; until then it
+   // prevents needless 404s for already-warmed deployments.
    let hash_thumb_path: Option<PathBuf> = {
        let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
        match dao.get_exif(&context, &relative_path_str) {
@@ -184,7 +197,14 @@ async fn get_image(
        .as_ref()
        .filter(|p| p.exists())
        .cloned()
-       .unwrap_or_else(|| legacy_thumb_path.clone());
+       .or_else(|| {
+           if scoped_legacy_thumb_path.exists() {
+               Some(scoped_legacy_thumb_path.clone())
+           } else {
+               None
+           }
+       })
+       .unwrap_or_else(|| bare_legacy_thumb_path.clone());

    // Handle circular thumbnail request
    if req.shape == Some(ThumbnailShape::Circle) {
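The three-stage fallback above reduces to a small pure function over the candidate paths. A sketch under illustrative names (the real handler threads this through the DAO lookup and response plumbing rather than a helper):

```rust
use std::path::PathBuf;

// Stage 1: the hash-keyed candidate from the content_hash row, if any.
// Stage 2: the library-scoped legacy path <thumbs>/<lib_id>/<rel_path>.
// Stage 3: the bare legacy path <thumbs>/<rel_path>, returned
// unconditionally so a miss surfaces as the caller's open() failure.
fn resolve_thumb(
    hash_thumb: Option<PathBuf>,
    scoped_legacy: PathBuf,
    bare_legacy: PathBuf,
) -> PathBuf {
    hash_thumb
        .filter(|p| p.exists())
        .or_else(|| scoped_legacy.exists().then(|| scoped_legacy.clone()))
        .unwrap_or(bare_legacy)
}
```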
@@ -761,6 +781,15 @@ async fn generate_video(

    if let Some(name) = filename.file_name() {
        let filename = name.to_str().expect("Filename should convert to string");
+       // KNOWN ISSUE (multi-library): playlist filename is the basename
+       // alone, so two source files with the same basename — whether in
+       // different libraries or different subdirs of one library —
+       // overwrite each other's playlists while ffmpeg runs. The
+       // hash-keyed `content_hash::hls_dir` is the long-term answer
+       // (see CLAUDE.md "Multi-library data model"); rewiring the
+       // actor pipeline to use it is out of scope for this branch.
+       // The orphan-cleanup job above already walks every library so
+       // it doesn't false-delete archive playlists.
        let playlist = format!("{}/{}.m3u8", app_state.video_path, filename);

        let library = libraries::resolve_library_param(&app_state, body.library.as_deref())
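The collision the comment describes goes away once playlist paths are keyed on content rather than basename: two different files can share a basename but never a blake3 hash. A sketch of a plausible layout; the actual signature of `content_hash::hls_dir` is not pinned down by this branch, so both the name's shape and the sharding scheme here are assumptions:

```rust
use std::path::{Path, PathBuf};

// Hypothetical hash-keyed HLS directory: shard by the first two hex
// chars of the content hash, mirroring the thumbnail layout
// (<thumbs>/<hash[..2]>/<hash>.jpg) described in the lookup chain.
fn hls_dir(video_root: &Path, content_hash: &str) -> PathBuf {
    video_root.join(&content_hash[..2]).join(content_hash)
}
```

So `hls_dir(Path::new("/videos"), "ab12cd…")` yields `/videos/ab/ab12cd…`, and same-basename sources in different libraries land in distinct directories.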
@@ -1315,9 +1344,27 @@ fn create_thumbnails(libs: &[libraries::Library], excluded_dirs: &[String]) {
        let Ok(relative_path) = src.strip_prefix(&images) else {
            return;
        };
-       let thumb_path = Path::new(thumbnail_directory).join(relative_path);
+       // Library-scoped legacy path: prevents two libraries with
+       // the same rel_path from clobbering each other's thumbs.
+       // Hash-keyed promotion happens lazily on first hash-aware
+       // request — keeping this loop ExifDao-free preserves the
+       // current "cargo build && go" startup story.
+       let thumb_path = content_hash::library_scoped_legacy_path(
+           thumbnail_directory,
+           lib.id,
+           relative_path,
+       );
+       let bare_legacy = thumbnail_directory.join(relative_path);

-       if thumb_path.exists() || unsupported_thumbnail_sentinel(&thumb_path).exists() {
+       // Backwards-compat check: if a single-library install has a
+       // bare-legacy thumb here already, accept it as present.
+       // Same for the sentinel. Means we don't redo work after
+       // upgrade and we don't leave stale duplicates around.
+       if thumb_path.exists()
+           || bare_legacy.exists()
+           || unsupported_thumbnail_sentinel(&thumb_path).exists()
+           || unsupported_thumbnail_sentinel(&bare_legacy).exists()
+       {
            return;
        }
@@ -1462,10 +1509,18 @@ fn main() -> std::io::Result<()> {
        preview_gen_for_watcher,
        app_state.face_client.clone(),
        app_state.excluded_dirs.clone(),
+       app_state.library_health.clone(),
    );

-   // Start orphaned playlist cleanup job
-   cleanup_orphaned_playlists(app_state.excluded_dirs.clone());
+   // Start orphaned playlist cleanup job. Multi-library aware: walks
+   // every configured library when looking for the source video, and
+   // skips the whole cycle while any library is stale (a missing
+   // source is indistinguishable from a transiently-unmounted share).
+   cleanup_orphaned_playlists(
+       app_state.libraries.clone(),
+       app_state.excluded_dirs.clone(),
+       app_state.library_health.clone(),
+   );

    // Spawn background job to generate daily conversation summaries
    {
@@ -1657,10 +1712,13 @@ fn run_migrations(
}

/// Clean up orphaned HLS playlists and segments whose source videos no longer exist
-fn cleanup_orphaned_playlists(excluded_dirs: Vec<String>) {
+fn cleanup_orphaned_playlists(
+    libs: Vec<libraries::Library>,
+    excluded_dirs: Vec<String>,
+    library_health: libraries::LibraryHealthMap,
+) {
    std::thread::spawn(move || {
        let video_path = dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set");
        let base_path = dotenv::var("BASE_PATH").expect("BASE_PATH must be set");

        // Get cleanup interval from environment (default: 24 hours)
        let cleanup_interval_secs = dotenv::var("PLAYLIST_CLEANUP_INTERVAL_SECONDS")
@@ -1671,10 +1729,41 @@ fn cleanup_orphaned_playlists(excluded_dirs: Vec<String>) {
        info!("Starting orphaned playlist cleanup job");
        info!(" Cleanup interval: {} seconds", cleanup_interval_secs);
        info!(" Playlist directory: {}", video_path);
+       for lib in &libs {
+           info!(" Checking sources under '{}' at {}", lib.name, lib.root_path);
+       }

        loop {
            std::thread::sleep(Duration::from_secs(cleanup_interval_secs));

+           // Safety gate: skip the cleanup cycle if any library is
+           // stale. A missing source video on a stale library is
+           // indistinguishable from a transient unmount, and the
+           // cleanup is destructive — we'd rather leak a few playlist
+           // files for a tick than delete one whose source is briefly
+           // unreachable. The cycle re-runs on the next interval.
+           {
+               let guard = library_health.read().unwrap_or_else(|e| e.into_inner());
+               let stale: Vec<String> = libs
+                   .iter()
+                   .filter(|lib| {
+                       guard
+                           .get(&lib.id)
+                           .map(|h| !h.is_online())
+                           .unwrap_or(false)
+                   })
+                   .map(|lib| lib.name.clone())
+                   .collect();
+               if !stale.is_empty() {
+                   warn!(
+                       "Skipping orphaned-playlist cleanup: {} library(ies) stale: [{}]",
+                       stale.len(),
+                       stale.join(", ")
+                   );
+                   continue;
+               }
+           }

            info!("Running orphaned playlist cleanup");
            let start = std::time::Instant::now();
            let mut deleted_count = 0;
@@ -1703,12 +1792,16 @@ fn cleanup_orphaned_playlists(excluded_dirs: Vec<String>) {
            if let Some(filename) = playlist_path.file_stem() {
                let video_filename = filename.to_string_lossy();

-               // Search for this video file in BASE_PATH, respecting
-               // EXCLUDED_DIRS so we don't false-resurrect playlists for
-               // videos that only exist inside an excluded subtree.
+               // Search for this video file across every configured
+               // library, respecting EXCLUDED_DIRS so we don't
+               // false-resurrect playlists for videos that only
+               // exist inside an excluded subtree. As soon as one
+               // library has a matching source, we're done — the
+               // playlist isn't orphaned.
                let mut video_exists = false;
+               'libs: for lib in &libs {
+                   for entry in image_api::file_scan::walk_library_files(
+                       Path::new(&base_path),
+                       Path::new(&lib.root_path),
+                       &excluded_dirs,
+                   ) {
                        if let Some(entry_stem) = entry.path().file_stem()
@@ -1716,7 +1809,8 @@ fn cleanup_orphaned_playlists(excluded_dirs: Vec<String>) {
                            && is_video_file(entry.path())
                        {
                            video_exists = true;
-                           break;
+                           break 'libs;
                        }
                    }
                }
@@ -1792,6 +1886,7 @@ fn watch_files(
    preview_generator: Addr<video::actors::PreviewClipGenerator>,
    face_client: crate::ai::face_client::FaceClient,
    excluded_dirs: Vec<String>,
+   library_health: libraries::LibraryHealthMap,
) {
    std::thread::spawn(move || {
        // Get polling intervals from environment variables
@@ -1850,6 +1945,24 @@ fn watch_files(
        let mut last_full_scan = SystemTime::now();
        let mut scan_count = 0u64;

+       // Initial availability sweep before the loop's first sleep so
+       // /libraries reports the truth from the very first request,
+       // rather than the optimistic Online default that
+       // new_health_map seeds. Without this, an unmounted share would
+       // appear online for up to WATCH_QUICK_INTERVAL_SECONDS (default
+       // 60s) after boot. Same probe logic as the per-tick gate
+       // below; no ingest runs here, just the health update + log.
+       for lib in &libs {
+           let context = opentelemetry::Context::new();
+           let had_data = exif_dao
+               .lock()
+               .expect("exif_dao poisoned")
+               .count_for_library(&context, lib.id)
+               .map(|n| n > 0)
+               .unwrap_or(false);
+           libraries::refresh_health(&library_health, lib, had_data);
+       }

        loop {
            std::thread::sleep(Duration::from_secs(quick_interval_secs));

@@ -1861,6 +1974,31 @@ fn watch_files(
            let is_full_scan = since_last_full.as_secs() >= full_interval_secs;

            for lib in &libs {
+               // Availability probe: every tick checks that the
+               // library's mount is reachable, is a directory, is
+               // readable, and (if image_exif has rows for it) is
+               // non-empty. A Stale library skips ingest, backlog
+               // drains, and metric refresh — reads/serving in HTTP
+               // handlers continue to work. Branches B/C extend the
+               // probe gate to cover handoff and orphan GC. See
+               // CLAUDE.md "Library availability and safety".
+               let had_data = {
+                   let context = opentelemetry::Context::new();
+                   let mut guard = exif_dao.lock().expect("exif_dao poisoned");
+                   guard
+                       .count_for_library(&context, lib.id)
+                       .map(|n| n > 0)
+                       .unwrap_or(false)
+               };
+               let health = libraries::refresh_health(&library_health, lib, had_data);
+               if !health.is_online() {
+                   // Skip every write path for this library this tick.
+                   // Don't refresh the media-count gauge either — a
+                   // probe-failed library would otherwise flap to 0
+                   // image / 0 video and pollute Prometheus.
+                   continue;
+               }

                // Drain the unhashed-hash backlog AND the face-detection
                // backlog every tick, regardless of quick/full. Quick
                // scans only walk recently-modified files, so the
@@ -1992,7 +2130,9 @@ fn process_new_files(

    let existing_exif_paths: HashMap<String, bool> = {
        let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
-       match dao.get_exif_batch(&context, &file_paths) {
+       // Walk is per-library, so scope the lookup so a same-named file
+       // in another library doesn't make this one look already-indexed.
+       match dao.get_exif_batch(&context, Some(library.id), &file_paths) {
            Ok(exif_records) => exif_records
                .into_iter()
                .map(|record| (record.file_path, true))
@@ -2012,9 +2152,19 @@ fn process_new_files(
    // derivative dedup and DB-indexed sort/filter work for every file,
    // not just photos with parseable EXIF.
    for (file_path, relative_path) in &files {
-       let thumb_path = thumbnail_directory.join(relative_path);
-       let needs_thumbnail =
-           !thumb_path.exists() && !unsupported_thumbnail_sentinel(&thumb_path).exists();
+       // Check both the library-scoped legacy path (current shape) and
+       // the bare-legacy path (pre-multi-library shape). Either one
+       // existing means a thumbnail is already on disk for this file.
+       let scoped_thumb_path = content_hash::library_scoped_legacy_path(
+           thumbnail_directory,
+           library.id,
+           relative_path,
+       );
+       let bare_legacy_thumb_path = thumbnail_directory.join(relative_path);
+       let needs_thumbnail = !scoped_thumb_path.exists()
+           && !bare_legacy_thumb_path.exists()
+           && !unsupported_thumbnail_sentinel(&scoped_thumb_path).exists()
+           && !unsupported_thumbnail_sentinel(&bare_legacy_thumb_path).exists();
        let needs_row = !existing_exif_paths.contains_key(relative_path);

        if needs_thumbnail || needs_row {
@@ -2131,7 +2281,7 @@ fn process_new_files(
    // ensures small/medium deploys self-heal without operator
    // action.
    backfill_missing_content_hashes(&context, &files, library, &exif_dao);
-   let candidates = build_face_candidates(&context, &files, &exif_dao, &face_dao);
+   let candidates = build_face_candidates(&context, library, &files, &exif_dao, &face_dao);
    debug!(
        "face_watch: scan tick — {} image file(s) walked, {} candidate(s) (library '{}', modified_since={})",
        files.iter().filter(|(p, _)| !is_video_file(p)).count(),
@@ -2449,7 +2599,7 @@ fn backfill_missing_content_hashes(

    let exif_records = {
        let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
-       dao.get_exif_batch(context, &image_paths)
+       dao.get_exif_batch(context, Some(library.id), &image_paths)
            .unwrap_or_default()
    };
    // Cheap lookup back from rel_path → absolute file_path so
@@ -2541,6 +2691,7 @@ fn backfill_missing_content_hashes(
/// covers both new uploads and the initial backlog scan.
fn build_face_candidates(
    context: &opentelemetry::Context,
+   library: &libraries::Library,
    files: &[(PathBuf, String)],
    exif_dao: &Arc<Mutex<Box<dyn ExifDao>>>,
    face_dao: &Arc<Mutex<Box<dyn faces::FaceDao>>>,
@@ -2558,7 +2709,7 @@ fn build_face_candidates(

    let exif_records = {
        let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
-       dao.get_exif_batch(context, &image_paths)
+       dao.get_exif_batch(context, Some(library.id), &image_paths)
            .unwrap_or_default()
    };
    // rel_path → content_hash (only rows with a hash; without one we have
@@ -10,7 +10,7 @@ use crate::database::{
    connect,
};
use crate::database::{PreviewDao, SqlitePreviewDao};
-use crate::libraries::{self, Library};
+use crate::libraries::{self, Library, LibraryHealthMap};
use crate::tags::{SqliteTagDao, TagDao};
use crate::video::actors::{
    PlaylistGenerator, PreviewClipGenerator, StreamActor, VideoPlaylistManager,
@@ -26,6 +26,11 @@ pub struct AppState {
    /// All configured media libraries. Ordered by `id` ascending; the first
    /// entry is the primary library.
    pub libraries: Vec<Library>,
+   /// Per-library availability snapshot. Updated by the file watcher at
+   /// the top of each tick via `libraries::refresh_health`. HTTP handlers
+   /// read it (e.g. `/libraries` surfacing). See "Library availability
+   /// and safety" in CLAUDE.md.
+   pub library_health: LibraryHealthMap,
    /// Legacy shim equal to `libraries[0].root_path`. Phase 2 transitional —
    /// new code should go through `primary_library()`.
    pub base_path: String,
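The `read().unwrap_or_else(|e| e.into_inner())` pattern in the handler above implies a reader-writer map shared between the watcher thread and HTTP handlers. A plausible shape for `LibraryHealthMap`, as an assumption; the actual alias and seeding live in `src/libraries.rs`, where `new_health_map` takes `&[Library]` rather than the bare ids used here:

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

#[derive(Debug, Clone)]
enum LibraryHealth {
    Online,
    Stale { reason: String },
}

// Shared snapshot: the watcher thread writes, HTTP handlers read.
// Poisoning is tolerated via into_inner() because a panicked writer
// leaves the map readable, just possibly one tick out of date.
type LibraryHealthMap = Arc<RwLock<HashMap<i64, LibraryHealth>>>;

fn new_health_map(ids: &[i64]) -> LibraryHealthMap {
    // Seed every library Online (optimistic default; the initial
    // availability sweep in watch_files corrects it before the first
    // tick's sleep).
    Arc::new(RwLock::new(
        ids.iter().map(|id| (*id, LibraryHealth::Online)).collect(),
    ))
}
```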
@@ -105,11 +110,13 @@ impl AppState {
            preview_dao,
        );

+       let library_health = libraries::new_health_map(&libraries_vec);
        Self {
            stream_manager,
            playlist_manager: Arc::new(video_playlist_manager.start()),
            preview_clip_generator: Arc::new(preview_clip_generator.start()),
            libraries: libraries_vec,
+           library_health,
            base_path,
            thumbnail_path,
            video_path,
@@ -364,9 +364,15 @@ async fn lookup_tags_batch<D: TagDao>(
    // Stage 1: query → content_hash mapping. Files without a hash yet
    // (just-indexed, hash compute failed, etc.) skip the sibling
    // expansion and only get tags from their own rel_path.
+   // Library-agnostic by design: this endpoint takes raw rel_paths from
+   // the client (typically Apollo) with no library context. Span all
+   // libraries and let the hash-keyed sibling expansion below do the
+   // disambiguation. Same-rel_path/different-content collisions across
+   // libraries surface as multiple hashes for one path — fine, we union
+   // every sibling tag set.
    let exif_records = {
        let mut dao = exif_dao.lock().expect("Unable to get ExifDao");
-       match dao.get_exif_batch(&span_context, &query_paths) {
+       match dao.get_exif_batch(&span_context, None, &query_paths) {
            Ok(rows) => rows,
            Err(e) => {
                return HttpResponse::InternalServerError()
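The sibling expansion the comment describes reduces to a union over hashes. A minimal sketch with illustrative in-memory maps standing in for the DAO queries (the real code goes through `ExifDao` and `TagDao`):

```rust
use std::collections::{HashMap, HashSet};

// Given rel_path -> content_hash rows (possibly several hashes per
// rel_path when libraries collide on a path) and hash -> tag sets,
// a query path's tags are the union over every hash it maps to.
fn tags_for_path(
    path: &str,
    path_hashes: &HashMap<String, Vec<String>>,
    hash_tags: &HashMap<String, HashSet<String>>,
) -> HashSet<String> {
    path_hashes
        .get(path)
        .into_iter()
        .flatten()
        .filter_map(|h| hash_tags.get(h))
        .flat_map(|tags| tags.iter().cloned())
        .collect()
}
```

A path with no hash yet simply yields whatever its own rel_path rows contribute upstream; this function returns the empty set for it, which is the "skip the sibling expansion" branch.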