Merge pull request 'feature/split-main-rs' (#92) from feature/split-main-rs into master

Reviewed-on: #92
This commit was merged in pull request #92.
2026-05-12 17:02:06 +00:00
14 changed files with 3804 additions and 3254 deletions


@@ -76,7 +76,10 @@ cargo run --bin cleanup_files -- --base-path /path/to/media --database-url ./dat
### Core Components
**Layered Architecture:**
-- **HTTP Layer** (`main.rs`): Route handlers for images, videos, metadata, tags, favorites, memories
+- **Startup wiring** (`main.rs`): only ~350 lines — env load, migrations, AppState, route registration, server bind. Background jobs are kicked off here but defined elsewhere.
+- **HTTP Layer** (`handlers/{image,video,favorites}.rs`, `files.rs`, `tags.rs`, `faces.rs`, `memories.rs`, `ai/handlers.rs`): the route handlers, grouped by domain.
+- **Background loops** (`watcher.rs`): the file-watcher tick (`watch_files`, `process_new_files`) and the orphaned-playlist cleanup (`cleanup_orphaned_playlists`). Per-tick drains are factored into `backfill.rs` (`backfill_unhashed_backlog`, `backfill_missing_date_taken`, `backfill_missing_content_hashes`, `process_face_backlog`, `build_face_candidates`).
+- **Thumbnails** (`thumbnails.rs`): generation pipeline + the `IMAGE_GAUGE` / `VIDEO_GAUGE` Prometheus metrics.
- **Auth Layer** (`auth.rs`): JWT token validation, Claims extraction via FromRequest trait
- **Service Layer** (`files.rs`, `exif.rs`, `memories.rs`): Business logic for file operations and EXIF extraction
- **DAO Layer** (`database/mod.rs`): Trait-based data access (ExifDao, UserDao, FavoriteDao, TagDao)
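The trait-based DAO wiring described above can be sketched roughly as follows. This is a minimal illustration only: `UserDao`'s method and the `MemUserDao` type are hypothetical stand-ins, not the repo's actual signatures; what's faithful is the `Arc<Mutex<Box<dyn Trait>>>` shape the handlers receive.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for the repo's DAO traits (ExifDao, UserDao, ...);
// the method name and shape here are illustrative, not the real API.
trait UserDao: Send {
    fn find_name(&mut self, id: i32) -> Option<String>;
}

// In-memory implementation standing in for the SQLite-backed DAOs.
struct MemUserDao {
    rows: Vec<(i32, String)>,
}

impl UserDao for MemUserDao {
    fn find_name(&mut self, id: i32) -> Option<String> {
        self.rows.iter().find(|(i, _)| *i == id).map(|(_, n)| n.clone())
    }
}

fn main() {
    // Handlers receive DAOs behind Arc<Mutex<Box<dyn Trait>>>, the same
    // shape as the Arc<Mutex<Box<dyn ExifDao>>> used throughout this PR.
    let dao: Arc<Mutex<Box<dyn UserDao>>> = Arc::new(Mutex::new(Box::new(MemUserDao {
        rows: vec![(1, "alice".to_string())],
    })));
    let name = dao.lock().unwrap().find_name(1);
    assert_eq!(name.as_deref(), Some("alice"));
    assert_eq!(dao.lock().unwrap().find_name(2), None);
    println!("ok");
}
```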
@@ -392,8 +395,8 @@ under 2021, not 2014 — on the theory that EXIF is more reliable than
import-named filenames. The reverse case (no EXIF, filename has a
date) is unchanged.
-The `backfill_missing_date_taken` drain (`src/main.rs`) runs every
-watcher tick alongside `backfill_unhashed_backlog`. It loads up to
+The `backfill_missing_date_taken` drain (`src/backfill.rs`) runs every
+watcher tick alongside `backfill_unhashed_backlog` (also `src/backfill.rs`). It loads up to
`DATE_BACKFILL_MAX_PER_TICK` rows (default 500) where
`date_taken IS NULL OR date_taken_source = 'fs_time'` (backed by the
`idx_image_exif_date_backfill` partial index), runs the waterfall
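The waterfall the drain runs can be sketched as a pure fallback chain. This is a schematic only: `date_from_filename` and `resolve` are hypothetical names, and the exiftool result is modeled as an input `Option` (the real logic lives in `date_resolver` and shells out to exiftool in one batch per tick).

```rust
/// Try a filename like "IMG_20210514_123000.jpg" for an 8-digit date.
/// Hypothetical simplification of the filename-regex step.
fn date_from_filename(name: &str) -> Option<String> {
    let digits: String = name.chars().filter(|c| c.is_ascii_digit()).collect();
    if digits.len() >= 8 {
        let ymd = &digits[..8];
        let (y, m, d) = (&ymd[..4], &ymd[4..6], &ymd[6..8]);
        // Loose plausibility check so a random serial number doesn't pass.
        if ("1990".."2100").contains(&y) && ("01".."13").contains(&m) && ("01".."32").contains(&d) {
            return Some(format!("{}-{}-{}", y, m, d));
        }
    }
    None
}

/// Waterfall: an exiftool-supplied date wins, then the filename,
/// then the earliest filesystem time as the weakest fallback.
fn resolve(exiftool: Option<&str>, filename: &str, fs_time: &str) -> (String, &'static str) {
    if let Some(d) = exiftool {
        return (d.to_string(), "exiftool");
    }
    if let Some(d) = date_from_filename(filename) {
        return (d, "filename");
    }
    (fs_time.to_string(), "fs_time")
}

fn main() {
    assert_eq!(resolve(Some("2014-03-01"), "IMG_20210514_123000.jpg", "2023-01-01").1, "exiftool");
    assert_eq!(resolve(None, "IMG_20210514_123000.jpg", "2023-01-01"),
               ("2021-05-14".to_string(), "filename"));
    assert_eq!(resolve(None, "clip.mp4", "2023-01-01").1, "fs_time");
    println!("ok");
}
```

Each resolution is persisted together with its source tag, which is what lets the drain later re-target only `fs_time`-sourced rows.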
@@ -504,9 +507,9 @@ ImageApi owns the face data; Apollo (sibling repo) hosts the insightface inferen
**Why content_hash and not (library_id, rel_path):** ties face data to the bytes, not the path. A backup mount that copies files from the primary library naturally inherits the existing detections without re-running inference. This is the reference implementation of the multi-library data model — see "Multi-library data model" above.
-**File-watch hook** (`src/main.rs::process_new_files`): for each photo with a populated `content_hash`, check `FaceDao::already_scanned(hash)`; if not, send bytes (or embedded JPEG preview for RAW via `exif::extract_embedded_jpeg_preview`) to Apollo's `/api/internal/faces/detect`. K=`FACE_DETECT_CONCURRENCY` (default 8) parallel calls per scan tick; Apollo serializes them via its single-worker GPU pool. `face_watch.rs` is the Tokio orchestration layer.
+**File-watch hook** (`src/watcher.rs::process_new_files`): for each photo with a populated `content_hash`, check `FaceDao::already_scanned(hash)`; if not, send bytes (or embedded JPEG preview for RAW via `exif::extract_embedded_jpeg_preview`) to Apollo's `/api/internal/faces/detect`. K=`FACE_DETECT_CONCURRENCY` (default 8) parallel calls per scan tick; Apollo serializes them via its single-worker GPU pool. `face_watch.rs` is the Tokio orchestration layer.
-**Per-tick backlog drain** (also `src/main.rs`): two passes that run on every watcher tick regardless of quick-vs-full scan:
+**Per-tick backlog drain** (`src/backfill.rs`): two passes that run on every watcher tick regardless of quick-vs-full scan:
- `backfill_unhashed_backlog` — populates `image_exif.content_hash` for photos ingested before the hash column existed and never retroactively hashed. Capped by `FACE_HASH_BACKFILL_MAX_PER_TICK` (default 2000); errors don't burn the cap.
- `process_face_backlog` — runs detection on photos that have a hash but no `face_detections` row. Capped by `FACE_BACKLOG_MAX_PER_TICK` (default 64). Selected via a SQL anti-join (`FaceDao::list_unscanned_candidates`); videos and EXCLUDED_DIRS paths filtered out client-side via `face_watch::filter_excluded` so they never reach Apollo.
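The anti-join selection behind `process_face_backlog` can be modeled in a few lines. Schematic only: the real version is SQL inside `FaceDao::list_unscanned_candidates`; this `unscanned` function is a hypothetical in-memory mimic of its shape (hashed rows with no detection row, capped per tick).

```rust
use std::collections::HashSet;

// Schematic model of the anti-join drain selection described above.
fn unscanned(
    image_exif: &[(&str, Option<&str>)], // (rel_path, content_hash)
    face_detections: &HashSet<&str>,     // content_hashes already scanned
    cap: usize,
) -> Vec<(String, String)> {
    image_exif
        .iter()
        .copied()
        .filter_map(|(path, hash)| hash.map(|h| (path, h)))
        .filter(|(_, h)| !face_detections.contains(h)) // the anti-join
        .take(cap) // stands in for FACE_BACKLOG_MAX_PER_TICK
        .map(|(p, h)| (p.to_string(), h.to_string()))
        .collect()
}

fn main() {
    let rows = [
        ("a.jpg", Some("h-a")),
        ("b.jpg", Some("h-b")),
        ("c.jpg", None), // unhashed: not a candidate yet
    ];
    let scanned: HashSet<&str> = ["h-b"].into_iter().collect();
    let out = unscanned(&rows, &scanned, 64);
    assert_eq!(out, vec![("a.jpg".to_string(), "h-a".to_string())]);
    println!("{:?}", out);
}
```

Capping the take on candidates (rather than on attempts) mirrors the "errors don't burn the cap" behavior called out above.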
@@ -521,6 +524,8 @@ ImageApi owns the face data; Apollo (sibling repo) hosts the insightface inferen
Module map:
- `src/faces.rs` — `FaceDao` trait + `SqliteFaceDao` impl, route handlers for `/faces/*`, `/image/faces/*`, `/persons/*`. Mirror of `tags.rs` layout.
- `src/face_watch.rs` — Tokio orchestration for the file-watch detect pass; `filter_excluded` (PathExcluder + image-extension filter), `read_image_bytes_for_detect` (RAW preview fallback).
+- `src/backfill.rs` — per-tick drains (unhashed-hash, date_taken, face-backlog, etc.) called from `watcher::watch_files` and `watcher::process_new_files`.
+- `src/watcher.rs` — the watcher loop itself and `process_new_files` (file walk → EXIF write → face-candidate build).
- `src/ai/face_client.rs` — HTTP client for Apollo's inference. Configured by `APOLLO_FACE_API_BASE_URL`, falls back to `APOLLO_API_BASE_URL`. Both unset → feature disabled, file-watch hook is a no-op.
- `migrations/2026-04-29-000000_add_faces/` — schema.
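The `APOLLO_FACE_API_BASE_URL` → `APOLLO_API_BASE_URL` → disabled fallback reduces to a tiny pure function. A sketch under assumptions: `face_api_base` is a hypothetical name, and the real `FaceClient` presumably reads the variables itself at construction time.

```rust
// Sketch of the base-URL selection described above. The two inputs stand
// in for the two env vars; None means the variable is unset. A None
// result means face detection is disabled and the hook is a no-op.
fn face_api_base(face_var: Option<String>, api_var: Option<String>) -> Option<String> {
    face_var.or(api_var)
}

fn main() {
    assert_eq!(
        face_api_base(Some("http://apollo:9000".into()), Some("http://api:8000".into())).as_deref(),
        Some("http://apollo:9000")
    );
    assert_eq!(
        face_api_base(None, Some("http://api:8000".into())).as_deref(),
        Some("http://api:8000")
    );
    assert_eq!(face_api_base(None, None), None);
    println!("ok");
}
```

Keeping the precedence in one function makes the "both unset → feature off" contract trivially testable.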

src/backfill.rs Normal file

@@ -0,0 +1,721 @@
//! Per-tick drains the watcher runs alongside ingest.
//!
//! These passes were previously inlined in `main.rs`; they exist because
//! a quick scan only walks recently-modified files, so any backlog of
//! rows missing a `content_hash` / `date_taken` / face detection
//! wouldn't otherwise drain except during the once-an-hour full scan.
//! Each function is bounded per call by a `*_PER_TICK` env-var cap.
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use log::{debug, info, warn};
use crate::content_hash;
use crate::database::ExifDao;
use crate::date_resolver;
use crate::face_watch;
use crate::faces;
use crate::file_types;
use crate::libraries;
use crate::tags;
/// Compute and persist content_hash for image_exif rows where it's NULL.
///
/// Bounded per call by `FACE_HASH_BACKFILL_MAX_PER_TICK` (default 2000)
/// so a watcher tick on a large legacy library doesn't block for hours
/// blake3-ing every photo at once. Subsequent scans pick up the rest.
/// For 50k+ libraries the dedicated `cargo run --bin backfill_hashes`
/// is still faster (it doesn't fight a watcher loop for the DAO mutex).
///
/// Drains unhashed image_exif rows by querying them directly, independent
/// of the filesystem walk. Quick scans only walk recently-modified files,
/// so a backlog of pre-existing unhashed rows never enters
/// `process_new_files`'s candidate set — left alone, it would only drain
/// on full scans (default once an hour). Calling this every tick keeps
/// the face-detection backlog moving regardless.
///
/// Returns the number of rows successfully backfilled this pass.
pub fn backfill_unhashed_backlog(
context: &opentelemetry::Context,
library: &libraries::Library,
exif_dao: &Arc<Mutex<Box<dyn ExifDao>>>,
) -> usize {
let cap: i64 = dotenv::var("FACE_HASH_BACKFILL_MAX_PER_TICK")
.ok()
.and_then(|s| s.parse().ok())
.filter(|n: &i64| *n > 0)
.unwrap_or(2000);
// Fetch up to cap+1 rows so we can tell "more remain" without a
// separate count query. The query spans all libraries — there's no
// per-library filter on get_rows_missing_hash today — but we only
// ever update rows whose library_id matches the caller's library, so
// other libraries' rows are skipped here and picked up on their own
// library's tick. Negligible cost given the cap.
let rows: Vec<(i32, String)> = {
let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
dao.get_rows_missing_hash(context, cap + 1)
.unwrap_or_default()
};
if rows.is_empty() {
return 0;
}
let more_than_cap = rows.len() as i64 > cap;
let base_path = std::path::Path::new(&library.root_path);
let mut backfilled = 0usize;
let mut errors = 0usize;
let mut skipped_other_lib = 0usize;
for (lib_id, rel_path) in rows.iter().take(cap as usize) {
if *lib_id != library.id {
skipped_other_lib += 1;
continue;
}
let abs = base_path.join(rel_path);
if !abs.exists() {
// File walked away — the watcher's reconciliation pass will
// remove the orphan exif row eventually.
continue;
}
match content_hash::compute(&abs) {
Ok(id) => {
let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
if let Err(e) = dao.backfill_content_hash(
context,
library.id,
rel_path,
&id.content_hash,
id.size_bytes,
) {
warn!(
"face_watch: backfill_content_hash failed for {}: {:?}",
rel_path, e
);
errors += 1;
} else {
backfilled += 1;
}
}
Err(e) => {
debug!(
"face_watch: hash compute failed for {} ({:?})",
abs.display(),
e
);
errors += 1;
}
}
}
if backfilled > 0 || errors > 0 || more_than_cap {
info!(
"face_watch: backfill pass for library '{}': hashed {} ({} error(s), {} skipped as other-library rows; cap={}, more_remain={})",
library.name, backfilled, errors, skipped_other_lib, cap, more_than_cap
);
}
backfilled
}
/// Drain image_exif rows whose `date_taken` was never resolved or was
/// resolved by the weakest fallback (`fs_time`). Runs the canonical-date
/// waterfall — exiftool batch (one subprocess for the whole tick's
/// rows) → filename regex → earliest_fs_time — and persists each
/// resolution with its source tag. Capped per tick by
/// `DATE_BACKFILL_MAX_PER_TICK` (default 500) so a 14k-row library
/// drains over a few quick-scan ticks without blocking the watcher.
///
/// kamadak-exif is intentionally skipped here: the row already has a
/// NULL date_taken because the ingest path's kamadak-exif call returned
/// nothing, and re-running it would just produce the same answer.
/// exiftool is the meaningful new attempt — it handles videos and
/// MakerNote-hosted dates kamadak can't reach.
pub fn backfill_missing_date_taken(
context: &opentelemetry::Context,
library: &libraries::Library,
exif_dao: &Arc<Mutex<Box<dyn ExifDao>>>,
) -> usize {
let cap: i64 = dotenv::var("DATE_BACKFILL_MAX_PER_TICK")
.ok()
.and_then(|s| s.parse().ok())
.filter(|n: &i64| *n > 0)
.unwrap_or(500);
let rows: Vec<(i32, String)> = {
let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
dao.get_rows_needing_date_backfill(context, library.id, cap + 1)
.unwrap_or_default()
};
if rows.is_empty() {
return 0;
}
let more_than_cap = rows.len() as i64 > cap;
let base_path = std::path::Path::new(&library.root_path);
// Build absolute paths and drop rows whose files no longer exist —
// the missing-file scan in library_maintenance retires deleted rows
// separately. Without this filter, NULL-date rows for missing files
// would loop through the drain forever (no source can resolve them).
let mut existing: Vec<(String, PathBuf)> = Vec::with_capacity(rows.len());
for (_, rel_path) in rows.iter().take(cap as usize) {
let abs = base_path.join(rel_path);
if abs.exists() {
existing.push((rel_path.clone(), abs));
}
}
if existing.is_empty() {
return 0;
}
// One exiftool subprocess for the whole batch; the resolver falls
// through to filename / fs_time per file when exiftool can't supply
// a date (or isn't installed at all).
let paths: Vec<PathBuf> = existing.iter().map(|(_, p)| p.clone()).collect();
let resolved = date_resolver::resolve_dates_batch(&paths, &HashMap::new());
let mut backfilled = 0usize;
let mut unresolved = 0usize;
let mut by_source: HashMap<&'static str, usize> = HashMap::new();
{
let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
for (rel_path, abs) in &existing {
let Some(rd) = resolved.get(abs).copied() else {
unresolved += 1;
continue;
};
match dao.backfill_date_taken(
context,
library.id,
rel_path,
rd.timestamp,
rd.source.as_str(),
) {
Ok(()) => {
backfilled += 1;
*by_source.entry(rd.source.as_str()).or_insert(0) += 1;
}
Err(e) => {
warn!(
"date_backfill: update failed for lib {} {}: {:?}",
library.id, rel_path, e
);
}
}
}
}
if backfilled > 0 || unresolved > 0 || more_than_cap {
info!(
"date_backfill: library '{}': resolved {} ({:?}), {} unresolved, cap={}, more_remain={}",
library.name, backfilled, by_source, unresolved, cap, more_than_cap
);
}
backfilled
}
/// Per-tick face-detection drain. Pulls a capped batch of hashed-but-
/// unscanned image_exif rows directly via the FaceDao anti-join and
/// hands them to the existing detection pass. Runs on every tick (not
/// just full scans) so the backlog moves at quick-scan cadence.
pub fn process_face_backlog(
context: &opentelemetry::Context,
library: &libraries::Library,
face_client: &crate::ai::face_client::FaceClient,
face_dao: &Arc<Mutex<Box<dyn faces::FaceDao>>>,
tag_dao: &Arc<Mutex<Box<dyn tags::TagDao>>>,
excluded_dirs: &[String],
) {
let cap: i64 = dotenv::var("FACE_BACKLOG_MAX_PER_TICK")
.ok()
.and_then(|s| s.parse().ok())
.filter(|n: &i64| *n > 0)
.unwrap_or(64);
let rows: Vec<(String, String)> = {
let mut dao = face_dao.lock().expect("face dao");
match dao.list_unscanned_candidates(context, library.id, cap) {
Ok(r) => r,
Err(e) => {
warn!(
"face_watch: list_unscanned_candidates failed for library '{}': {:?}",
library.name, e
);
return;
}
}
};
if rows.is_empty() {
return;
}
info!(
"face_watch: backlog drain — running detection on {} candidate(s) for library '{}' (cap={})",
rows.len(),
library.name,
cap
);
let candidates: Vec<face_watch::FaceCandidate> = rows
.into_iter()
.map(|(rel_path, content_hash)| face_watch::FaceCandidate {
rel_path,
content_hash,
})
.collect();
face_watch::run_face_detection_pass(
library,
excluded_dirs,
face_client,
Arc::clone(face_dao),
Arc::clone(tag_dao),
candidates,
);
}
/// Compute content_hash for any image rows the walker just touched
/// whose stored EXIF row is still hash-less. Called from
/// `process_new_files` so freshly-ingested files don't have to wait for
/// the next standalone `backfill_unhashed_backlog` tick before face
/// detection can key on their bytes.
///
/// Cap is on **successes only**. An earlier version counted errors too,
/// so a pocket of chronically-unhashable files at the front of the
/// table (vanished mid-scan, permission denied, etc.) burned the budget
/// every tick and the rest of the backlog never advanced.
pub fn backfill_missing_content_hashes(
context: &opentelemetry::Context,
files: &[(PathBuf, String)],
library: &libraries::Library,
exif_dao: &Arc<Mutex<Box<dyn ExifDao>>>,
) {
let image_paths: Vec<String> = files
.iter()
.filter(|(p, _)| !file_types::is_video_file(p))
.map(|(_, rel)| rel.clone())
.collect();
if image_paths.is_empty() {
return;
}
let exif_records = {
let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
dao.get_exif_batch(context, Some(library.id), &image_paths)
.unwrap_or_default()
};
// Cheap lookup back from rel_path → absolute file_path so
// content_hash::compute can read the bytes.
let path_by_rel: HashMap<String, &PathBuf> =
files.iter().map(|(p, rel)| (rel.clone(), p)).collect();
let cap: usize = dotenv::var("FACE_HASH_BACKFILL_MAX_PER_TICK")
.ok()
.and_then(|s| s.parse().ok())
.filter(|n: &usize| *n > 0)
.unwrap_or(2000);
// Count the unhashed backlog up front so we can surface "still needs
// backfill: N" in the log — without it, a face-scan that's stuck at
// 44% looks stalled when really it's chipping through hashes.
let unhashed_total = exif_records
.iter()
.filter(|r| r.content_hash.is_none())
.count();
let mut backfilled = 0usize;
let mut errors = 0usize;
for record in &exif_records {
if backfilled >= cap {
break;
}
if record.content_hash.is_some() {
continue;
}
let Some(file_path) = path_by_rel.get(&record.file_path) else {
// Walked file went missing between the directory scan and now;
// next tick will retry naturally.
continue;
};
match content_hash::compute(file_path) {
Ok(id) => {
let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
if let Err(e) = dao.backfill_content_hash(
context,
library.id,
&record.file_path,
&id.content_hash,
id.size_bytes,
) {
warn!(
"face_watch: backfill_content_hash failed for {}: {:?}",
record.file_path, e
);
errors += 1;
} else {
backfilled += 1;
}
}
Err(e) => {
debug!(
"face_watch: hash compute failed for {} ({:?})",
file_path.display(),
e
);
errors += 1;
}
}
}
// Always log when there's an unhashed backlog so an operator
// looking at "scan stuck at 44%" can see backfill is running and
// how much remains. Quiet only when there's nothing to do.
if unhashed_total > 0 || backfilled > 0 || errors > 0 {
let remaining = unhashed_total.saturating_sub(backfilled);
info!(
"face_watch: backfilled {}/{} content_hash for library '{}' ({} error(s); {} still need backfill; cap={})",
backfilled, unhashed_total, library.name, errors, remaining, cap
);
}
}
/// Build the face-detection candidate list for a scan tick.
///
/// Returns `(rel_path, content_hash)` for every image file that has a
/// content_hash recorded in image_exif but no row in face_detections
/// yet. Re-querying image_exif here picks up rows the EXIF write loop
/// just inserted alongside any pre-existing rows the watcher walked
/// over — covers both new uploads and the initial backlog scan.
pub fn build_face_candidates(
context: &opentelemetry::Context,
library: &libraries::Library,
files: &[(PathBuf, String)],
exif_dao: &Arc<Mutex<Box<dyn ExifDao>>>,
face_dao: &Arc<Mutex<Box<dyn faces::FaceDao>>>,
) -> Vec<face_watch::FaceCandidate> {
// Restrict to image files; videos aren't face-scanned in v1 (kamadak
// doesn't even register them in image_exif).
let image_paths: Vec<String> = files
.iter()
.filter(|(p, _)| !file_types::is_video_file(p))
.map(|(_, rel)| rel.clone())
.collect();
if image_paths.is_empty() {
return Vec::new();
}
let exif_records = {
let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
dao.get_exif_batch(context, Some(library.id), &image_paths)
.unwrap_or_default()
};
// rel_path → content_hash (only rows with a hash; without one we have
// nothing to key face data against).
let mut hash_by_path: HashMap<String, String> = HashMap::with_capacity(exif_records.len());
for record in exif_records {
if let Some(h) = record.content_hash {
hash_by_path.insert(record.file_path, h);
}
}
let mut candidates = Vec::new();
let mut dao = face_dao.lock().expect("face dao");
for rel_path in image_paths {
let Some(hash) = hash_by_path.get(&rel_path) else {
continue;
};
match dao.already_scanned(context, hash) {
Ok(true) => continue,
Ok(false) => candidates.push(face_watch::FaceCandidate {
rel_path,
content_hash: hash.clone(),
}),
Err(e) => {
warn!("face_watch: already_scanned errored for {}: {:?}", hash, e);
}
}
}
candidates
}
#[cfg(test)]
mod tests {
use super::*;
use std::fs;
use std::sync::{Arc, Mutex};
use diesel::prelude::*;
use tempfile::TempDir;
use crate::database::models::{InsertImageExif, InsertLibrary};
use crate::database::test::in_memory_db_connection;
use crate::database::{ExifDao, SqliteExifDao, schema};
use crate::faces::{FaceDao, SqliteFaceDao};
use crate::libraries::Library;
fn ctx() -> opentelemetry::Context {
opentelemetry::Context::new()
}
/// Build a tempdir-backed library + DAOs sharing a single in-memory
/// SQLite connection (so cross-table joins like
/// `list_unscanned_candidates` see consistent state).
fn setup() -> (
TempDir,
Library,
Arc<Mutex<diesel::SqliteConnection>>,
Arc<Mutex<Box<dyn ExifDao>>>,
Arc<Mutex<Box<dyn FaceDao>>>,
) {
let tmp = TempDir::new().expect("tempdir");
let mut conn = in_memory_db_connection();
// Migration seeds library id=1 with a placeholder root; rewrite it
// to point at the tempdir so `<root>/<rel_path>` resolves to real
// files this test creates.
diesel::update(schema::libraries::table.filter(schema::libraries::id.eq(1)))
.set(schema::libraries::root_path.eq(tmp.path().to_string_lossy().to_string()))
.execute(&mut conn)
.expect("rewrite library 1 root");
// Add a second library so cross-library skip cases have somewhere
// to put their rows.
diesel::insert_into(schema::libraries::table)
.values(InsertLibrary {
name: "other",
root_path: "/tmp/other-test-lib",
created_at: 0,
enabled: true,
excluded_dirs: None,
})
.execute(&mut conn)
.expect("seed second library");
let library = Library {
id: 1,
name: "main".to_string(),
root_path: tmp.path().to_string_lossy().to_string(),
enabled: true,
excluded_dirs: Vec::new(),
};
let shared = Arc::new(Mutex::new(conn));
let exif_dao: Arc<Mutex<Box<dyn ExifDao>>> = Arc::new(Mutex::new(Box::new(
SqliteExifDao::from_shared(Arc::clone(&shared)),
)));
let face_dao: Arc<Mutex<Box<dyn FaceDao>>> = Arc::new(Mutex::new(Box::new(
SqliteFaceDao::from_connection(Arc::clone(&shared)),
)));
(tmp, library, shared, exif_dao, face_dao)
}
fn insert_exif(
exif_dao: &Arc<Mutex<Box<dyn ExifDao>>>,
lib_id: i32,
rel: &str,
content_hash: Option<&str>,
) {
let mut dao = exif_dao.lock().unwrap();
dao.store_exif(
&ctx(),
InsertImageExif {
library_id: lib_id,
file_path: rel.to_string(),
camera_make: None,
camera_model: None,
lens_model: None,
width: None,
height: None,
orientation: None,
gps_latitude: None,
gps_longitude: None,
gps_altitude: None,
focal_length: None,
aperture: None,
shutter_speed: None,
iso: None,
date_taken: None,
created_time: 0,
last_modified: 0,
content_hash: content_hash.map(|s| s.to_string()),
size_bytes: None,
phash_64: None,
dhash_64: None,
date_taken_source: None,
},
)
.expect("insert");
}
fn write_image(root: &std::path::Path, rel: &str, bytes: &[u8]) {
let abs = root.join(rel);
if let Some(parent) = abs.parent() {
fs::create_dir_all(parent).expect("mkdir");
}
fs::write(abs, bytes).expect("write file");
}
#[test]
fn backfill_unhashed_backlog_hashes_missing_rows_in_this_library() {
let (tmp, library, _conn, exif_dao, _face_dao) = setup();
write_image(tmp.path(), "a.jpg", b"alpha-bytes");
write_image(tmp.path(), "b.jpg", b"bravo-bytes");
insert_exif(&exif_dao, 1, "a.jpg", None);
insert_exif(&exif_dao, 1, "b.jpg", None);
let backfilled = backfill_unhashed_backlog(&ctx(), &library, &exif_dao);
assert_eq!(backfilled, 2);
let mut dao = exif_dao.lock().unwrap();
let rows = dao
.get_exif_batch(&ctx(), Some(1), &["a.jpg".to_string(), "b.jpg".to_string()])
.unwrap();
assert_eq!(rows.len(), 2);
for r in rows {
assert!(
r.content_hash.is_some(),
"row {} should have a hash",
r.file_path
);
}
}
#[test]
fn backfill_unhashed_backlog_skips_other_libraries_and_missing_files() {
let (tmp, library, _conn, exif_dao, _face_dao) = setup();
write_image(tmp.path(), "exists.jpg", b"hello");
// Row for this library whose file is missing on disk:
insert_exif(&exif_dao, 1, "ghost.jpg", None);
insert_exif(&exif_dao, 1, "exists.jpg", None);
// Row in the other library — must be skipped (different lib_id).
insert_exif(&exif_dao, 2, "other.jpg", None);
let backfilled = backfill_unhashed_backlog(&ctx(), &library, &exif_dao);
assert_eq!(backfilled, 1, "only the existing in-library file hashes");
let mut dao = exif_dao.lock().unwrap();
let other = dao
.get_exif_batch(&ctx(), Some(2), &["other.jpg".to_string()])
.unwrap();
assert_eq!(other.len(), 1);
assert!(
other[0].content_hash.is_none(),
"other-library row must remain unhashed"
);
let ghost = dao
.get_exif_batch(&ctx(), Some(1), &["ghost.jpg".to_string()])
.unwrap();
assert_eq!(ghost.len(), 1);
assert!(
ghost[0].content_hash.is_none(),
"missing-on-disk row stays unhashed (reconciliation removes it later)"
);
}
#[test]
fn backfill_unhashed_backlog_respects_per_tick_cap() {
// Env-var-driven cap; the function reads it on every call, so we
// can set it just for this test and unset before returning.
// Serial guard: tests in the same binary may share env, but each
// backfill call re-reads — and we only care that the cap shape
// (success count <= cap, more_remain logged) holds.
unsafe {
std::env::set_var("FACE_HASH_BACKFILL_MAX_PER_TICK", "2");
}
let (tmp, library, _conn, exif_dao, _face_dao) = setup();
for i in 0..5 {
let rel = format!("img_{}.jpg", i);
write_image(tmp.path(), &rel, format!("bytes-{}", i).as_bytes());
insert_exif(&exif_dao, 1, &rel, None);
}
let backfilled = backfill_unhashed_backlog(&ctx(), &library, &exif_dao);
assert_eq!(backfilled, 2, "cap=2 must bound the per-tick successes");
unsafe {
std::env::remove_var("FACE_HASH_BACKFILL_MAX_PER_TICK");
}
}
#[test]
fn backfill_missing_content_hashes_skips_videos_and_hashed_rows() {
let (tmp, library, _conn, exif_dao, _face_dao) = setup();
// Two image rows (one already hashed, one not), one video.
write_image(tmp.path(), "fresh.jpg", b"fresh-pixels");
write_image(tmp.path(), "already.jpg", b"already-pixels");
write_image(tmp.path(), "clip.mp4", b"video-bytes");
insert_exif(&exif_dao, 1, "fresh.jpg", None);
insert_exif(&exif_dao, 1, "already.jpg", Some("pre-existing-hash"));
insert_exif(&exif_dao, 1, "clip.mp4", None);
let files: Vec<(PathBuf, String)> = vec![
(tmp.path().join("fresh.jpg"), "fresh.jpg".to_string()),
(tmp.path().join("already.jpg"), "already.jpg".to_string()),
(tmp.path().join("clip.mp4"), "clip.mp4".to_string()),
];
backfill_missing_content_hashes(&ctx(), &files, &library, &exif_dao);
let mut dao = exif_dao.lock().unwrap();
let rows = dao
.get_exif_batch(
&ctx(),
Some(1),
&[
"fresh.jpg".to_string(),
"already.jpg".to_string(),
"clip.mp4".to_string(),
],
)
.unwrap();
let by_path: HashMap<String, Option<String>> = rows
.into_iter()
.map(|r| (r.file_path, r.content_hash))
.collect();
assert!(
by_path["fresh.jpg"].is_some(),
"fresh image must get a hash"
);
assert_eq!(
by_path["already.jpg"].as_deref(),
Some("pre-existing-hash"),
"already-hashed image left untouched"
);
assert!(
by_path["clip.mp4"].is_none(),
"video skipped (not face-scanned, no hash needed via this path)"
);
}
#[test]
fn build_face_candidates_filters_videos_unhashed_and_already_scanned() {
let (tmp, library, _conn, exif_dao, face_dao) = setup();
// Seed image_exif with: hashed unscanned, hashed scanned, unhashed,
// and a video. Files don't need to exist on disk — the function
// doesn't read them, only the DB rows.
insert_exif(&exif_dao, 1, "fresh.jpg", Some("hash-fresh"));
insert_exif(&exif_dao, 1, "scanned.jpg", Some("hash-scanned"));
insert_exif(&exif_dao, 1, "unhashed.jpg", None);
insert_exif(&exif_dao, 1, "clip.mp4", Some("hash-video"));
// Mark `scanned.jpg`'s hash as already detected.
{
let mut dao = face_dao.lock().unwrap();
dao.mark_status(&ctx(), 1, "hash-scanned", "scanned.jpg", "no_faces", "test")
.expect("mark scanned");
}
let files: Vec<(PathBuf, String)> = vec![
(tmp.path().join("fresh.jpg"), "fresh.jpg".to_string()),
(tmp.path().join("scanned.jpg"), "scanned.jpg".to_string()),
(tmp.path().join("unhashed.jpg"), "unhashed.jpg".to_string()),
(tmp.path().join("clip.mp4"), "clip.mp4".to_string()),
];
let candidates = build_face_candidates(&ctx(), &library, &files, &exif_dao, &face_dao);
assert_eq!(
candidates.len(),
1,
"exactly fresh.jpg should be a candidate"
);
assert_eq!(candidates[0].rel_path, "fresh.jpg");
assert_eq!(candidates[0].content_hash, "hash-fresh");
}
}


@@ -797,6 +797,15 @@ impl SqliteExifDao {
connection: Arc::new(Mutex::new(conn)),
}
}
+/// Test-only constructor that shares an already-wrapped connection.
+/// Required when another DAO (e.g. `SqliteFaceDao`) needs to read
+/// rows this DAO writes, so cross-table joins resolve against the
+/// same in-memory SQLite instance.
+#[cfg(test)]
+pub fn from_shared(connection: Arc<Mutex<SqliteConnection>>) -> Self {
+    SqliteExifDao { connection }
+}
}
impl ExifDao for SqliteExifDao {


@@ -10,6 +10,7 @@ use std::path::{Path, PathBuf};
use std::sync::Mutex;
use std::time::SystemTime;
+use crate::AppState;
use crate::data::{
Claims, ExifBatchRequest, ExifBatchResponse, ExifSummary, FilesRequest, FilterMode, MediaType,
PhotosResponse, SortType,
@@ -18,8 +19,8 @@ use crate::database::ExifDao;
use crate::file_types;
use crate::geo::{gps_bounding_box, haversine_distance};
use crate::memories::extract_date_from_filename;
+use crate::thumbnails::create_thumbnails;
use crate::utils::earliest_fs_time;
-use crate::{AppState, create_thumbnails};
use actix_web::web::Data;
use actix_web::{
HttpRequest, HttpResponse,

src/handlers/favorites.rs Normal file

@@ -0,0 +1,128 @@
//! User-favorites endpoints. Favorites are keyed on `(user_id, rel_path)`
//! and shared across libraries — a favorite created in lib1 is visible
//! under lib2 if the same rel_path resolves there too.
use std::sync::Mutex;
use actix_web::{
HttpRequest, HttpResponse, Responder, delete, get, put,
web::{self, Data},
};
use log::{error, info, warn};
use opentelemetry::trace::{Span, Status, Tracer};
use crate::data::{AddFavoriteRequest, Claims, PhotosResponse};
use crate::database::{DbError, DbErrorKind, FavoriteDao};
use crate::otel::{extract_context_from_request, global_tracer};
#[get("image/favorites")]
pub async fn favorites(
claims: Claims,
request: HttpRequest,
favorites_dao: Data<Mutex<Box<dyn FavoriteDao>>>,
) -> impl Responder {
let tracer = global_tracer();
let context = extract_context_from_request(&request);
let mut span = tracer.start_with_context("get favorites", &context);
match web::block(move || {
favorites_dao
.lock()
.expect("Unable to get FavoritesDao")
.get_favorites(claims.sub.parse::<i32>().unwrap())
})
.await
{
Ok(Ok(favorites)) => {
let favorites = favorites
.into_iter()
.map(|favorite| favorite.path)
.collect::<Vec<String>>();
span.set_status(Status::Ok);
// Favorites are library-agnostic (shared by rel_path), so we
// intentionally leave photo_libraries empty to signal "no badge".
HttpResponse::Ok().json(PhotosResponse {
photos: favorites,
dirs: Vec::new(),
photo_libraries: Vec::new(),
total_count: None,
has_more: None,
next_offset: None,
})
}
Ok(Err(e)) => {
span.set_status(Status::error(format!("Error getting favorites: {:?}", e)));
error!("Error getting favorites: {:?}", e);
HttpResponse::InternalServerError().finish()
}
Err(_) => HttpResponse::InternalServerError().finish(),
}
}
#[put("image/favorites")]
pub async fn put_add_favorite(
claims: Claims,
body: web::Json<AddFavoriteRequest>,
favorites_dao: Data<Mutex<Box<dyn FavoriteDao>>>,
) -> impl Responder {
if let Ok(user_id) = claims.sub.parse::<i32>() {
let path = body.path.clone();
match web::block::<_, Result<usize, DbError>>(move || {
favorites_dao
.lock()
.expect("Unable to get FavoritesDao")
.add_favorite(user_id, &path)
})
.await
{
Ok(Err(e)) if e.kind == DbErrorKind::AlreadyExists => {
warn!("Favorite: {} exists for user: {}", &body.path, user_id);
HttpResponse::Ok()
}
Ok(Err(e)) => {
error!("Error adding favorite \"{}\" for user {}: {:?}", body.path, user_id, e);
HttpResponse::BadRequest()
}
Ok(Ok(_)) => {
info!("Adding favorite \"{}\" for userid: {}", body.path, user_id);
HttpResponse::Created()
}
Err(e) => {
error!("Blocking error while inserting favorite: {:?}", e);
HttpResponse::InternalServerError()
}
}
} else {
error!("Unable to parse sub as i32: {}", claims.sub);
HttpResponse::BadRequest()
}
}
#[delete("image/favorites")]
pub async fn delete_favorite(
claims: Claims,
body: web::Query<AddFavoriteRequest>,
favorites_dao: Data<Mutex<Box<dyn FavoriteDao>>>,
) -> impl Responder {
if let Ok(user_id) = claims.sub.parse::<i32>() {
let path = body.path.clone();
web::block(move || {
favorites_dao
.lock()
.expect("Unable to get favorites dao")
.remove_favorite(user_id, path);
})
.await
.unwrap();
info!(
"Removing favorite \"{}\" for userid: {}",
body.path, user_id
);
HttpResponse::Ok()
} else {
error!("Unable to parse sub as i32: {}", claims.sub);
HttpResponse::BadRequest()
}
}

src/handlers/image.rs Normal file

@@ -0,0 +1,999 @@
//! `/image*` endpoints: image serving (with hash/library-scoped/bare
//! legacy thumbnail lookup), upload, EXIF metadata read + GPS / date
//! mutation, and the full exiftool dump used by Apollo's details modal.
use std::error::Error;
use std::fs::File;
use std::io::ErrorKind;
use std::io::prelude::*;
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use actix_files::NamedFile;
use actix_multipart as mp;
use actix_web::{
HttpRequest, HttpResponse, Responder, get, post,
web::{self, BufMut, BytesMut, Data},
};
use chrono::Utc;
use futures::stream::StreamExt;
use log::{debug, error, info, trace, warn};
use opentelemetry::KeyValue;
use opentelemetry::trace::{Span, Status, TraceContextExt, Tracer};
use urlencoding::decode;
use crate::content_hash;
use crate::data::{
Claims, MetadataResponse, PhotoSize, ThumbnailFormat, ThumbnailRequest, ThumbnailShape,
};
use crate::database::models::{ImageExif, InsertImageExif};
use crate::database::{DbErrorKind, ExifDao};
use crate::date_resolver;
use crate::exif;
use crate::file_types;
use crate::files::{RefreshThumbnailsMessage, is_image_or_video, is_valid_full_path};
use crate::libraries;
use crate::memories;
use crate::otel::{extract_context_from_request, global_tracer};
use crate::perceptual_hash;
use crate::state::AppState;
#[get("/image")]
pub async fn get_image(
_claims: Claims,
request: HttpRequest,
req: web::Query<ThumbnailRequest>,
app_state: Data<AppState>,
exif_dao: Data<Mutex<Box<dyn ExifDao>>>,
) -> impl Responder {
let tracer = global_tracer();
let context = extract_context_from_request(&request);
let mut span = tracer.start_with_context("get_image", &context);
// Resolve library from query param; default to primary so clients that
// don't yet send `library=` continue to work.
let library = match libraries::resolve_library_param(&app_state, req.library.as_deref()) {
Ok(Some(lib)) => lib,
Ok(None) => app_state.primary_library(),
Err(msg) => {
span.set_status(Status::error(msg.clone()));
return HttpResponse::BadRequest().body(msg);
}
};
// Union-mode search returns flat rel_paths with no library attribution,
// so clients may request a file under the wrong library. Try the
// resolved library first; if the file isn't there, fall back to any
// other library holding that rel_path on disk.
let resolved = is_valid_full_path(&library.root_path, &req.path, false)
.filter(|p| p.exists())
.map(|p| (library, p))
.or_else(|| {
app_state.libraries.iter().find_map(|lib| {
if lib.id == library.id {
return None;
}
is_valid_full_path(&lib.root_path, &req.path, false)
.filter(|p| p.exists())
.map(|p| (lib, p))
})
});
if let Some((library, path)) = resolved {
let image_size = req.size.unwrap_or(PhotoSize::Full);
if image_size == PhotoSize::Thumb {
let relative_path = path
.strip_prefix(&library.root_path)
.expect("Error stripping library root prefix from thumbnail");
let relative_path_str = relative_path.to_string_lossy().replace('\\', "/");
let thumbs = &app_state.thumbnail_path;
let bare_legacy_thumb_path = Path::new(&thumbs).join(relative_path);
let scoped_legacy_thumb_path = content_hash::library_scoped_legacy_path(
Path::new(&thumbs),
library.id,
relative_path,
);
// Gif thumbnails are a separate lookup (video GIF previews).
// Dual-lookup for gif is out of scope; preserve existing flow.
if req.format == Some(ThumbnailFormat::Gif) && file_types::is_video_file(&path) {
let mut gif_path = Path::new(&app_state.gif_path).join(relative_path);
gif_path.set_extension("gif");
trace!("Gif thumbnail path: {:?}", gif_path);
if let Ok(file) = NamedFile::open(&gif_path) {
span.set_status(Status::Ok);
return file
.use_etag(true)
.use_last_modified(true)
.prefer_utf8(true)
.into_response(&request);
}
}
// Lookup chain (most-specific first, falling back as we miss):
// 1. hash-keyed (`<thumbs>/<hash[..2]>/<hash>.jpg`) — content
// identity, shared across libraries;
// 2. library-scoped legacy (`<thumbs>/<lib_id>/<rel_path>`) —
// written by current generation when hash isn't known;
// 3. bare legacy (`<thumbs>/<rel_path>`) — pre-multi-library
// thumbs from the days before library prefixing existed.
// Stage (3) goes away once a one-time migration lifts every
// bare-legacy file under a library prefix; until then it
// prevents needless 404s for already-warmed deployments.
let hash_thumb_path: Option<PathBuf> = {
let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
match dao.get_exif(&context, &relative_path_str) {
Ok(Some(row)) => row
.content_hash
.as_deref()
.map(|h| content_hash::thumbnail_path(Path::new(thumbs), h)),
_ => None,
}
};
let thumb_path = hash_thumb_path
.as_ref()
.filter(|p| p.exists())
.cloned()
.or_else(|| {
if scoped_legacy_thumb_path.exists() {
Some(scoped_legacy_thumb_path.clone())
} else {
None
}
})
.unwrap_or_else(|| bare_legacy_thumb_path.clone());
// Handle circular thumbnail request
if req.shape == Some(ThumbnailShape::Circle) {
match create_circular_thumbnail(&thumb_path, thumbs).await {
Ok(circular_path) => {
if let Ok(file) = NamedFile::open(&circular_path) {
span.set_status(Status::Ok);
return file
.use_etag(true)
.use_last_modified(true)
.prefer_utf8(true)
.into_response(&request);
}
}
Err(e) => {
warn!("Failed to create circular thumbnail: {:?}", e);
// Fall through to serve square thumbnail
}
}
}
trace!("Thumbnail path: {:?}", thumb_path);
if let Ok(file) = NamedFile::open(&thumb_path) {
span.set_status(Status::Ok);
return file
.use_etag(true)
.use_last_modified(true)
.prefer_utf8(true)
.into_response(&request);
}
}
// Full-size requests for RAW formats (NEF/CR2/ARW/etc.) can't just
// NamedFile-stream the original bytes — browsers won't decode the
// RAW container, so a `<img src=...>` lands as a broken image. Serve
// the embedded JPEG preview instead (typically the camera's in-body
// review JPEG, ~12 MP). Falls through to NamedFile if no preview is
// available, which preserves the historical behavior for callers
// that genuinely want the original bytes.
if image_size == PhotoSize::Full && exif::is_tiff_raw(&path) {
if let Some(preview) = exif::extract_embedded_jpeg_preview(&path) {
span.set_status(Status::Ok);
return HttpResponse::Ok()
.content_type("image/jpeg")
.insert_header(("Cache-Control", "public, max-age=3600"))
.body(preview);
}
}
if let Ok(file) = NamedFile::open(&path) {
span.set_status(Status::Ok);
// Enable ETag and set cache headers for full images (1 hour cache)
return file
.use_etag(true)
.use_last_modified(true)
.prefer_utf8(true)
.into_response(&request);
}
span.set_status(Status::error("Not found"));
HttpResponse::NotFound().finish()
} else {
span.set_status(Status::error("Not found"));
error!("Path does not exist in any library: {}", req.path);
HttpResponse::NotFound().finish()
}
}
async fn create_circular_thumbnail(
thumb_path: &Path,
thumbs_dir: &str,
) -> Result<PathBuf, Box<dyn Error>> {
use image::{GenericImageView, ImageBuffer, Rgba};
// Create circular thumbnails directory
let circular_dir = Path::new(thumbs_dir).join("_circular");
// Get relative path from thumbs_dir to create same structure
let relative_to_thumbs = thumb_path.strip_prefix(thumbs_dir)?;
let circular_path = circular_dir.join(relative_to_thumbs).with_extension("png");
// Check if circular thumbnail already exists
if circular_path.exists() {
return Ok(circular_path);
}
// Create parent directory if needed
if let Some(parent) = circular_path.parent() {
std::fs::create_dir_all(parent)?;
}
// Load the square thumbnail
let img = image::open(thumb_path)?;
let (width, height) = img.dimensions();
// Fixed output size for consistency
let output_size = 80u32;
let radius = output_size as f32 / 2.0;
// Calculate crop area to get square center of original image
let crop_size = width.min(height);
let crop_x = (width - crop_size) / 2;
let crop_y = (height - crop_size) / 2;
// Create a new RGBA image with transparency
let output = ImageBuffer::from_fn(output_size, output_size, |x, y| {
let dx = x as f32 - radius;
let dy = y as f32 - radius;
let distance = (dx * dx + dy * dy).sqrt();
if distance <= radius {
// Inside circle - map to cropped source area
// Scale from output coordinates to crop coordinates
let scale = crop_size as f32 / output_size as f32;
let src_x = crop_x + (x as f32 * scale) as u32;
let src_y = crop_y + (y as f32 * scale) as u32;
let pixel = img.get_pixel(src_x, src_y);
Rgba([pixel[0], pixel[1], pixel[2], 255])
} else {
// Outside circle - transparent
Rgba([0, 0, 0, 0])
}
});
// Save as PNG (supports transparency)
output.save(&circular_path)?;
Ok(circular_path)
}
#[get("/image/metadata")]
pub async fn get_file_metadata(
_: Claims,
request: HttpRequest,
path: web::Query<ThumbnailRequest>,
app_state: Data<AppState>,
exif_dao: Data<Mutex<Box<dyn ExifDao>>>,
) -> impl Responder {
let tracer = global_tracer();
let context = extract_context_from_request(&request);
let mut span = tracer.start_with_context("get_file_metadata", &context);
let span_context =
opentelemetry::Context::new().with_remote_span_context(span.span_context().clone());
let library = libraries::resolve_library_param(&app_state, path.library.as_deref())
.ok()
.flatten()
.unwrap_or_else(|| app_state.primary_library());
// Fall back to other libraries if the file isn't under the resolved one,
// matching the `/image` handler so union-mode search results resolve.
let resolved = is_valid_full_path(&library.root_path, &path.path, false)
.filter(|p| p.exists())
.map(|p| (library, p))
.or_else(|| {
app_state.libraries.iter().find_map(|lib| {
if lib.id == library.id {
return None;
}
is_valid_full_path(&lib.root_path, &path.path, false)
.filter(|p| p.exists())
.map(|p| (lib, p))
})
});
match resolved
.ok_or_else(|| ErrorKind::InvalidData.into())
.and_then(|(lib, full_path)| {
File::open(&full_path)
.and_then(|file| file.metadata())
.map(|metadata| (lib, metadata))
}) {
Ok((resolved_library, metadata)) => {
let mut response: MetadataResponse = metadata.into();
response.library_id = Some(resolved_library.id);
response.library_name = Some(resolved_library.name.clone());
// Extract date from filename if possible
response.filename_date =
memories::extract_date_from_filename(&path.path).map(|dt| dt.timestamp());
// Query EXIF data if available
if let Ok(mut dao) = exif_dao.lock()
&& let Ok(Some(exif)) = dao.get_exif(&span_context, &path.path)
{
response.exif = Some(exif.into());
}
span.add_event(
"Metadata fetched",
vec![KeyValue::new("file", path.path.clone())],
);
span.set_status(Status::Ok);
HttpResponse::Ok().json(response)
}
Err(e) => {
let message = format!("Error getting metadata for file '{}': {:?}", path.path, e);
error!("{}", message);
span.set_status(Status::error(message));
HttpResponse::InternalServerError().finish()
}
}
}
/// Body for `POST /image/exif/gps` — write GPS coordinates into a file's
/// EXIF in place. Only `path` + `latitude` + `longitude` are required.
/// `library` is optional (falls back to the primary library) and matches
/// the convention of the other path-keyed routes.
#[derive(serde::Deserialize)]
struct SetGpsRequest {
path: String,
library: Option<String>,
latitude: f64,
longitude: f64,
}
#[post("/image/exif/gps")]
pub async fn set_image_gps(
_: Claims,
request: HttpRequest,
body: web::Json<SetGpsRequest>,
app_state: Data<AppState>,
exif_dao: Data<Mutex<Box<dyn ExifDao>>>,
) -> impl Responder {
let tracer = global_tracer();
let context = extract_context_from_request(&request);
let mut span = tracer.start_with_context("set_image_gps", &context);
let span_context =
opentelemetry::Context::new().with_remote_span_context(span.span_context().clone());
let library = libraries::resolve_library_param(&app_state, body.library.as_deref())
.ok()
.flatten()
.unwrap_or_else(|| app_state.primary_library());
// Same fallback as get_file_metadata: union-mode means a file may
// resolve under a sibling library.
let resolved = is_valid_full_path(&library.root_path, &body.path, false)
.filter(|p| p.exists())
.map(|p| (library, p))
.or_else(|| {
app_state.libraries.iter().find_map(|lib| {
if lib.id == library.id {
return None;
}
is_valid_full_path(&lib.root_path, &body.path, false)
.filter(|p| p.exists())
.map(|p| (lib, p))
})
});
let (resolved_library, full_path) = match resolved {
Some(v) => v,
None => {
span.set_status(Status::error("file not found"));
return HttpResponse::NotFound().body("File not found");
}
};
if !exif::supports_exif(&full_path) {
span.set_status(Status::error("unsupported format"));
return HttpResponse::BadRequest().body("File format does not support EXIF GPS write");
}
if let Err(e) = exif::write_gps(&full_path, body.latitude, body.longitude) {
let msg = format!("exiftool write failed: {}", e);
error!("{}", msg);
span.set_status(Status::error(msg.clone()));
return HttpResponse::InternalServerError().body(msg);
}
// Re-read EXIF from disk (the write path doesn't tell us the rest of
// the parsed fields back, and we want the DB row to match what
// extract_exif_from_path would now produce). Update the existing row
// rather than insert — this endpoint is invoked on already-indexed
// files only.
let extracted = match exif::extract_exif_from_path(&full_path) {
Ok(d) => d,
Err(e) => {
// GPS was written successfully but re-extraction failed; surface
// a 500 because the DB will now disagree with disk until the
// next file scan rewrites it.
let msg = format!("EXIF re-read failed after write: {}", e);
error!("{}", msg);
span.set_status(Status::error(msg.clone()));
return HttpResponse::InternalServerError().body(msg);
}
};
let now = Utc::now().timestamp();
let normalized_path = body.path.replace('\\', "/");
// Re-run the canonical-date waterfall on every GPS write — exiftool
// writing GPS doesn't change the capture date, but if the row was
// previously sourced from `fs_time` the re-read may have given us a
// real EXIF date this time, and we want to upgrade the source.
let resolved_date = date_resolver::resolve_date_taken(&full_path, extracted.date_taken);
// Hash + size aren't touched in update_exif, but the file bytes did
// change: best-effort recompute so the new hash lands on the next
// call to get_exif. Perceptual hashes likewise. Compute each once
// (compute() re-reads the whole file) and reuse for both columns;
// failure just leaves the prior values in place.
let recomputed = content_hash::compute(&full_path).ok();
let perceptual = perceptual_hash::compute(&full_path);
let insert_exif = InsertImageExif {
library_id: resolved_library.id,
file_path: normalized_path.clone(),
camera_make: extracted.camera_make,
camera_model: extracted.camera_model,
lens_model: extracted.lens_model,
width: extracted.width,
height: extracted.height,
orientation: extracted.orientation,
gps_latitude: extracted.gps_latitude.map(|v| v as f32),
gps_longitude: extracted.gps_longitude.map(|v| v as f32),
gps_altitude: extracted.gps_altitude.map(|v| v as f32),
focal_length: extracted.focal_length.map(|v| v as f32),
aperture: extracted.aperture.map(|v| v as f32),
shutter_speed: extracted.shutter_speed,
iso: extracted.iso,
date_taken: resolved_date.map(|r| r.timestamp),
// created_time is preserved by update_exif (it doesn't touch the
// column); pass any int — it's ignored in the UPDATE statement.
created_time: now,
last_modified: now,
content_hash: recomputed.as_ref().map(|c| c.content_hash.clone()),
size_bytes: recomputed.as_ref().map(|c| c.size_bytes),
phash_64: perceptual.map(|h| h.phash_64),
dhash_64: perceptual.map(|h| h.dhash_64),
date_taken_source: resolved_date.map(|r| r.source.as_str().to_string()),
};
let updated = {
let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
// If the row doesn't exist yet (file isn't indexed for some reason),
// insert instead so the GPS write is at least visible the moment
// the watcher catches up.
match dao.get_exif(&span_context, &normalized_path) {
Ok(Some(_)) => dao.update_exif(&span_context, insert_exif),
Ok(None) => dao.store_exif(&span_context, insert_exif),
Err(_) => dao.update_exif(&span_context, insert_exif),
}
};
match updated {
Ok(row) => {
// Mirror the file metadata so the client gets the new size /
// mtime in the same response and can refresh its cached
// metadata block in one round-trip.
let fs_meta = std::fs::metadata(&full_path).ok();
let mut response: MetadataResponse = match fs_meta {
Some(m) => m.into(),
None => MetadataResponse {
created: None,
modified: None,
size: 0,
exif: None,
filename_date: None,
library_id: None,
library_name: None,
},
};
response.exif = Some(row.into());
response.library_id = Some(resolved_library.id);
response.library_name = Some(resolved_library.name.clone());
response.filename_date =
memories::extract_date_from_filename(&body.path).map(|dt| dt.timestamp());
span.set_status(Status::Ok);
HttpResponse::Ok().json(response)
}
Err(e) => {
let msg = format!("EXIF DB update failed: {:?}", e);
error!("{}", msg);
span.set_status(Status::error(msg.clone()));
HttpResponse::InternalServerError().body(msg)
}
}
}
/// `GET /image/exif/full?path=&library=` — full per-file EXIF dump via
/// exiftool, for the DETAILS modal's "FULL EXIF" pane. Strictly richer
/// than `/image/metadata`'s curated subset (every group exiftool can
/// see: EXIF, File, MakerNotes, Composite, ICC_Profile, IPTC, …).
///
/// On-demand only — the watcher / indexer never calls this. Falls back
/// to 503 when exiftool isn't installed (deployer guidance is the same
/// as for the RAW preview pipeline: install exiftool for full coverage).
#[get("/image/exif/full")]
pub async fn get_full_exif(
_: Claims,
request: HttpRequest,
path: web::Query<ThumbnailRequest>,
app_state: Data<AppState>,
) -> impl Responder {
let tracer = global_tracer();
let context = extract_context_from_request(&request);
let mut span = tracer.start_with_context("get_full_exif", &context);
let library = libraries::resolve_library_param(&app_state, path.library.as_deref())
.ok()
.flatten()
.unwrap_or_else(|| app_state.primary_library());
// Same union-mode fallback as get_file_metadata — the file may live
// under a sibling library when the requested one's path resolves but
// doesn't actually contain the bytes.
let resolved = is_valid_full_path(&library.root_path, &path.path, false)
.filter(|p| p.exists())
.map(|p| (library, p))
.or_else(|| {
app_state.libraries.iter().find_map(|lib| {
if lib.id == library.id {
return None;
}
is_valid_full_path(&lib.root_path, &path.path, false)
.filter(|p| p.exists())
.map(|p| (lib, p))
})
});
let (resolved_library, full_path) = match resolved {
Some(v) => v,
None => {
span.set_status(Status::error("file not found"));
return HttpResponse::NotFound().body("File not found");
}
};
// exiftool spawn is blocking — keep it off the actix worker by
// running on the blocking pool. ~50-200 ms typical for a JPEG;
// longer for RAW with rich MakerNotes.
let exif_result =
web::block(move || crate::exif::read_full_exif_via_exiftool(&full_path)).await;
match exif_result {
Ok(Ok(Some(tags))) => {
span.set_status(Status::Ok);
HttpResponse::Ok().json(serde_json::json!({
"library_id": resolved_library.id,
"library_name": resolved_library.name,
"tags": tags,
}))
}
Ok(Ok(None)) => {
// exiftool ran but produced no output for this file — treat as
// empty rather than an error so the modal renders "no tags"
// gracefully.
span.set_status(Status::Ok);
HttpResponse::Ok().json(serde_json::json!({
"library_id": resolved_library.id,
"library_name": resolved_library.name,
"tags": serde_json::Value::Object(Default::default()),
}))
}
Ok(Err(e)) => {
let msg = format!("exiftool failed: {}", e);
error!("{}", msg);
span.set_status(Status::error(msg.clone()));
// 503 — typically "exiftool isn't on PATH" or a transient spawn
// failure. Apollo surfaces a hint in the modal.
HttpResponse::ServiceUnavailable().body(msg)
}
Err(e) => {
let msg = format!("blocking-pool error: {}", e);
error!("{}", msg);
span.set_status(Status::error(msg.clone()));
HttpResponse::InternalServerError().body(msg)
}
}
}
/// Body for `POST /image/exif/date` — operator-driven date_taken override.
/// `date_taken` is unix seconds (matches `image_exif.date_taken`'s convention
/// — naive local reinterpreted as UTC, not real UTC; the Apollo client passes
/// through the same value the photo carousel rendered before edit).
#[derive(serde::Deserialize)]
struct SetDateRequest {
path: String,
library: Option<String>,
date_taken: i64,
}
/// Body for `POST /image/exif/date/clear` — revert a manual override and
/// restore the resolver-derived `(date_taken, date_taken_source)` pair from
/// the snapshot.
#[derive(serde::Deserialize)]
struct ClearDateRequest {
path: String,
library: Option<String>,
}
/// Build a `MetadataResponse` for the date endpoints. Mirrors
/// `get_file_metadata`'s shape so the client gets a single source of truth
/// after every mutation. Filesystem metadata is best-effort: if the file is
/// on a stale mount or moved, the DB-side override still succeeds and the
/// response carries `created=None, modified=None, size=0`. The DB row's
/// updated EXIF is what matters here.
fn build_metadata_response_for_date_mutation(
library: &libraries::Library,
rel_path: &str,
exif: ImageExif,
) -> MetadataResponse {
let full_path = is_valid_full_path(&library.root_path, &rel_path.to_string(), false);
let fs_meta = full_path
.as_ref()
.filter(|p| p.exists())
.and_then(|p| std::fs::metadata(p).ok());
let mut response: MetadataResponse = match fs_meta {
Some(m) => m.into(),
None => MetadataResponse {
created: None,
modified: None,
size: 0,
exif: None,
filename_date: None,
library_id: None,
library_name: None,
},
};
response.exif = Some(exif.into());
response.library_id = Some(library.id);
response.library_name = Some(library.name.clone());
response.filename_date =
memories::extract_date_from_filename(rel_path).map(|dt| dt.timestamp());
response
}
#[post("/image/exif/date")]
pub async fn set_image_date(
_: Claims,
request: HttpRequest,
body: web::Json<SetDateRequest>,
app_state: Data<AppState>,
exif_dao: Data<Mutex<Box<dyn ExifDao>>>,
) -> impl Responder {
let tracer = global_tracer();
let context = extract_context_from_request(&request);
let mut span = tracer.start_with_context("set_image_date", &context);
let span_context =
opentelemetry::Context::new().with_remote_span_context(span.span_context().clone());
let library = match libraries::resolve_library_param(&app_state, body.library.as_deref()) {
Ok(Some(lib)) => lib,
Ok(None) => app_state.primary_library(),
Err(msg) => {
span.set_status(Status::error(msg.clone()));
return HttpResponse::BadRequest().body(msg);
}
};
// Path normalization matches set_image_gps so a Windows-import client
// doesn't end up with a backslash variant that misses the row.
let normalized_path = body.path.replace('\\', "/");
let updated = {
let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
dao.set_manual_date_taken(&span_context, library.id, &normalized_path, body.date_taken)
};
match updated {
Ok(row) => {
span.set_status(Status::Ok);
HttpResponse::Ok().json(build_metadata_response_for_date_mutation(
&library,
&normalized_path,
row,
))
}
Err(e) => {
let msg = format!("set_manual_date_taken failed: {:?}", e);
error!("{}", msg);
span.set_status(Status::error(msg.clone()));
match e.kind {
DbErrorKind::NotFound => HttpResponse::NotFound().body(msg),
_ => HttpResponse::InternalServerError().body(msg),
}
}
}
}
#[post("/image/exif/date/clear")]
pub async fn clear_image_date(
_: Claims,
request: HttpRequest,
body: web::Json<ClearDateRequest>,
app_state: Data<AppState>,
exif_dao: Data<Mutex<Box<dyn ExifDao>>>,
) -> impl Responder {
let tracer = global_tracer();
let context = extract_context_from_request(&request);
let mut span = tracer.start_with_context("clear_image_date", &context);
let span_context =
opentelemetry::Context::new().with_remote_span_context(span.span_context().clone());
let library = match libraries::resolve_library_param(&app_state, body.library.as_deref()) {
Ok(Some(lib)) => lib,
Ok(None) => app_state.primary_library(),
Err(msg) => {
span.set_status(Status::error(msg.clone()));
return HttpResponse::BadRequest().body(msg);
}
};
let normalized_path = body.path.replace('\\', "/");
let updated = {
let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
dao.clear_manual_date_taken(&span_context, library.id, &normalized_path)
};
match updated {
Ok(row) => {
span.set_status(Status::Ok);
HttpResponse::Ok().json(build_metadata_response_for_date_mutation(
&library,
&normalized_path,
row,
))
}
Err(e) => {
let msg = format!("clear_manual_date_taken failed: {:?}", e);
error!("{}", msg);
span.set_status(Status::error(msg.clone()));
match e.kind {
DbErrorKind::NotFound => HttpResponse::NotFound().body(msg),
_ => HttpResponse::InternalServerError().body(msg),
}
}
}
}
#[derive(serde::Deserialize)]
struct UploadQuery {
library: Option<String>,
}
#[post("/image")]
pub async fn upload_image(
_: Claims,
request: HttpRequest,
query: web::Query<UploadQuery>,
mut payload: mp::Multipart,
app_state: Data<AppState>,
exif_dao: Data<Mutex<Box<dyn ExifDao>>>,
) -> impl Responder {
let tracer = global_tracer();
let context = extract_context_from_request(&request);
let mut span = tracer.start_with_context("upload_image", &context);
let span_context =
opentelemetry::Context::new().with_remote_span_context(span.span_context().clone());
// Resolve the optional library selector. Absent → primary library
// (backwards-compatible with clients that don't yet send `library=`).
let target_library =
match libraries::resolve_library_param(&app_state, query.library.as_deref()) {
Ok(Some(lib)) => lib,
Ok(None) => app_state.primary_library(),
Err(msg) => {
span.set_status(Status::error(msg.clone()));
return HttpResponse::BadRequest().body(msg);
}
};
let mut file_content: BytesMut = BytesMut::new();
let mut file_name: Option<String> = None;
let mut file_path: Option<String> = None;
while let Some(Ok(mut part)) = payload.next().await {
if let Some(content_type) = part.content_disposition() {
debug!("{:?}", content_type);
if let Some(filename) = content_type.get_filename() {
debug!("Name (raw): {:?}", filename);
// Decode URL-encoded filename (e.g., "file%20name.jpg" -> "file name.jpg")
let decoded_filename = decode(filename)
.map(|s| s.to_string())
.unwrap_or_else(|_| filename.to_string());
debug!("Name (decoded): {:?}", decoded_filename);
file_name = Some(decoded_filename);
while let Some(Ok(data)) = part.next().await {
file_content.put(data);
}
} else if content_type.get_name() == Some("path") {
while let Some(Ok(data)) = part.next().await {
if let Ok(path) = std::str::from_utf8(&data) {
file_path = Some(path.to_string())
}
}
}
}
}
let path = file_path.unwrap_or_else(|| target_library.root_path.clone());
if !file_content.is_empty() {
if file_name.is_none() {
span.set_status(Status::error("No filename provided"));
return HttpResponse::BadRequest().body("No filename provided");
}
let full_path = PathBuf::from(&path).join(file_name.unwrap());
if let Some(full_path) = is_valid_full_path(
&target_library.root_path,
&full_path.to_str().unwrap().to_string(),
true,
) {
// Pre-write content-hash check: if these exact bytes already
// exist anywhere in any library (and aren't themselves
// soft-marked as duplicates), don't write the file. Return
// 409 with the canonical sibling so the mobile app can show
// a friendly "already in your library" toast.
let upload_hash = blake3::Hasher::new()
.update(&file_content)
.finalize()
.to_hex()
.to_string();
{
let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
if let Ok(Some(existing)) = dao.find_by_content_hash(&span_context, &upload_hash)
&& existing.duplicate_of_hash.is_none()
{
let library_name = libraries::load_all(&mut crate::database::connect())
.into_iter()
.find(|l| l.id == existing.library_id)
.map(|l| l.name);
span.set_status(Status::Ok);
return HttpResponse::Conflict().json(serde_json::json!({
"duplicate_of": {
"library_id": existing.library_id,
"rel_path": existing.file_path,
},
"content_hash": upload_hash,
"library_name": library_name,
}));
}
}
let context =
opentelemetry::Context::new().with_remote_span_context(span.span_context().clone());
// Bind the span so it covers the write instead of being created
// and immediately dropped (which would end it instantly).
let _write_span = tracer
.span_builder("file write")
.start_with_context(&tracer, &context);
let uploaded_path = if !full_path.is_file() && is_image_or_video(&full_path) {
let mut file = File::create(&full_path).unwrap();
file.write_all(&file_content).unwrap();
info!("Uploaded: {:?}", full_path);
full_path
} else {
warn!("File already exists: {:?}", full_path);
let new_path = format!(
"{}/{}_{}.{}",
full_path.parent().unwrap().to_str().unwrap(),
full_path.file_stem().unwrap().to_str().unwrap(),
Utc::now().timestamp(),
full_path
.extension()
.expect("Uploaded file should have an extension")
.to_str()
.unwrap()
);
info!("Uploaded: {}", new_path);
let new_path_buf = PathBuf::from(&new_path);
let mut file = File::create(&new_path_buf).unwrap();
file.write_all(&file_content).unwrap();
new_path_buf
};
// Extract and store EXIF data if file supports it
if exif::supports_exif(&uploaded_path) {
let relative_path = uploaded_path
.strip_prefix(&target_library.root_path)
.expect("Error stripping library root prefix")
.to_str()
.unwrap()
.replace('\\', "/");
match exif::extract_exif_from_path(&uploaded_path) {
Ok(exif_data) => {
let timestamp = Utc::now().timestamp();
let (content_hash, size_bytes) = match content_hash::compute(&uploaded_path)
{
Ok(id) => (Some(id.content_hash), Some(id.size_bytes)),
Err(e) => {
warn!(
"Failed to hash uploaded {}: {:?}",
uploaded_path.display(),
e
);
(None, None)
}
};
let perceptual = perceptual_hash::compute(&uploaded_path);
let resolved_date =
date_resolver::resolve_date_taken(&uploaded_path, exif_data.date_taken);
let insert_exif = InsertImageExif {
library_id: target_library.id,
file_path: relative_path.clone(),
camera_make: exif_data.camera_make,
camera_model: exif_data.camera_model,
lens_model: exif_data.lens_model,
width: exif_data.width,
height: exif_data.height,
orientation: exif_data.orientation,
gps_latitude: exif_data.gps_latitude.map(|v| v as f32),
gps_longitude: exif_data.gps_longitude.map(|v| v as f32),
gps_altitude: exif_data.gps_altitude.map(|v| v as f32),
focal_length: exif_data.focal_length.map(|v| v as f32),
aperture: exif_data.aperture.map(|v| v as f32),
shutter_speed: exif_data.shutter_speed,
iso: exif_data.iso,
date_taken: resolved_date.map(|r| r.timestamp),
created_time: timestamp,
last_modified: timestamp,
content_hash,
size_bytes,
phash_64: perceptual.map(|h| h.phash_64),
dhash_64: perceptual.map(|h| h.dhash_64),
date_taken_source: resolved_date.map(|r| r.source.as_str().to_string()),
};
if let Ok(mut dao) = exif_dao.lock() {
if let Err(e) = dao.store_exif(&span_context, insert_exif) {
error!("Failed to store EXIF data for {}: {:?}", relative_path, e);
} else {
debug!("EXIF data stored for {}", relative_path);
}
}
}
Err(e) => {
debug!(
"No EXIF data or error extracting from {}: {:?}",
uploaded_path.display(),
e
);
}
}
}
} else {
error!("Invalid path for upload: {:?}", full_path);
span.set_status(Status::error("Invalid path for upload"));
return HttpResponse::BadRequest().body("Path was not valid");
}
} else {
span.set_status(Status::error("No file body read"));
return HttpResponse::BadRequest().body("No file body read");
}
app_state.stream_manager.do_send(RefreshThumbnailsMessage);
span.set_status(Status::Ok);
HttpResponse::Ok().finish()
}
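The pixel geometry inside `create_circular_thumbnail` (center-crop to a square, scale onto an 80 px canvas, transparent outside the inscribed circle) can be checked in isolation. A std-only sketch mirroring that math, factored as a pure function:

```rust
/// For output pixel (x, y) in a `size`-px circular thumbnail, return
/// the source pixel inside the centered square crop of a
/// `width` x `height` image, or None when (x, y) falls outside the
/// inscribed circle (rendered transparent). Mirrors the geometry in
/// `create_circular_thumbnail`.
fn circular_source_pixel(x: u32, y: u32, size: u32, width: u32, height: u32) -> Option<(u32, u32)> {
    let radius = size as f32 / 2.0;
    let dx = x as f32 - radius;
    let dy = y as f32 - radius;
    // Outside the circle: caller emits a fully transparent pixel.
    if (dx * dx + dy * dy).sqrt() > radius {
        return None;
    }
    // Centered square crop of the source image.
    let crop_size = width.min(height);
    let crop_x = (width - crop_size) / 2;
    let crop_y = (height - crop_size) / 2;
    // Scale output coordinates into crop coordinates.
    let scale = crop_size as f32 / size as f32;
    Some((
        crop_x + (x as f32 * scale) as u32,
        crop_y + (y as f32 * scale) as u32,
    ))
}
```

Because `x, y < size` and the scale is `crop_size / size`, the mapped coordinates always stay inside the crop, so the `get_pixel` call in the real handler can't go out of bounds.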

src/handlers/mod.rs Normal file

@@ -0,0 +1,9 @@
//! HTTP route handlers, grouped by domain.
//!
//! These were previously inlined in `main.rs`; moving them out keeps
//! `main()` focused on startup wiring and makes each domain
//! independently testable with `actix_web::test::init_service`.
pub mod favorites;
pub mod image;
pub mod video;
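Nearly every handler in these modules repeats the same resolution chain: try the requested (or primary) library first, then fall back to any sibling library that actually holds the rel_path. A std-only sketch of that chain, with a caller-supplied `exists` predicate standing in for `is_valid_full_path(...).filter(|p| p.exists())`; the `Library` struct and function name here are stubs for illustration:

```rust
#[derive(Clone, Debug)]
struct Library {
    id: i32,
    root_path: String,
}

/// Resolve `rel_path` against `preferred` first, then against any
/// sibling in `all` that contains it. Returns the winning library and
/// the joined path, or None when no library holds the file.
fn resolve_in_libraries<'a, F>(
    preferred: &'a Library,
    all: &'a [Library],
    rel_path: &str,
    exists: F,
) -> Option<(&'a Library, String)>
where
    F: Fn(&Library, &str) -> bool,
{
    let full = |lib: &Library| format!("{}/{}", lib.root_path, rel_path);
    // Happy path: the requested library has the file.
    if exists(preferred, rel_path) {
        return Some((preferred, full(preferred)));
    }
    // Union-mode fallback: scan every other library for the rel_path.
    all.iter()
        .filter(|lib| lib.id != preferred.id)
        .find(|lib| exists(lib, rel_path))
        .map(|lib| (lib, full(lib)))
}
```

Extracting the chain like this would also shrink the four near-identical copies in `image.rs` and `video.rs` to one call site each.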

src/handlers/video.rs Normal file

@@ -0,0 +1,665 @@
//! Video-related endpoints: HLS playlist generation, segment streaming,
//! and the short-clip preview pipeline.
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Mutex;
use actix_files::NamedFile;
use actix_web::{
HttpRequest, HttpResponse, Responder, get, post,
web::{self, Data},
};
use log::{debug, error, info, warn};
use opentelemetry::trace::{Span, Status, Tracer};
use opentelemetry::{KeyValue, global};
use crate::data::{
Claims, PreviewClipRequest, PreviewStatusItem, PreviewStatusRequest, PreviewStatusResponse,
ThumbnailRequest,
};
use crate::database::PreviewDao;
use crate::files::is_valid_full_path;
use crate::libraries;
use crate::otel::{extract_context_from_request, global_tracer};
use crate::state::AppState;
use crate::video::actors::{GeneratePreviewClipMessage, ProcessMessage, create_playlist};
#[post("/video/generate")]
pub async fn generate_video(
_claims: Claims,
request: HttpRequest,
app_state: Data<AppState>,
body: web::Json<ThumbnailRequest>,
) -> impl Responder {
let tracer = global_tracer();
let context = extract_context_from_request(&request);
let mut span = tracer.start_with_context("generate_video", &context);
let filename = PathBuf::from(&body.path);
if let Some(name) = filename.file_name() {
let filename = name.to_str().expect("Filename should convert to string");
// KNOWN ISSUE (multi-library): playlist filename is the basename
// alone, so two source files with the same basename — whether in
// different libraries or different subdirs of one library —
// overwrite each other's playlists while ffmpeg runs. The
// hash-keyed `content_hash::hls_dir` is the long-term answer
// (see CLAUDE.md "Multi-library data model"); rewiring the
// actor pipeline to use it is out of scope for this branch.
// The orphan-cleanup job (`cleanup_orphaned_playlists` in
// `watcher.rs`) already walks every library so it doesn't
// false-delete archive playlists.
let playlist = format!("{}/{}.m3u8", app_state.video_path, filename);
let library = libraries::resolve_library_param(&app_state, body.library.as_deref())
.ok()
.flatten()
.unwrap_or_else(|| app_state.primary_library());
// Try the resolved library first, then fall back to any other library
// that actually contains the file — handles union-mode requests where
// the mobile client passes no library but the file lives in a
// non-primary library.
let resolved = is_valid_full_path(&library.root_path, &body.path, false)
.filter(|p| p.exists())
.or_else(|| {
app_state.libraries.iter().find_map(|lib| {
if lib.id == library.id {
return None;
}
is_valid_full_path(&lib.root_path, &body.path, false).filter(|p| p.exists())
})
});
if let Some(path) = resolved {
if let Ok(child) = create_playlist(path.to_str().unwrap(), &playlist).await {
span.add_event(
"playlist_created".to_string(),
vec![KeyValue::new("playlist-name", filename.to_string())],
);
span.set_status(Status::Ok);
app_state.stream_manager.do_send(ProcessMessage(
playlist.clone(),
child,
// opentelemetry::Context::new().with_span(span),
));
}
} else {
span.set_status(Status::error(format!("invalid path {:?}", &body.path)));
return HttpResponse::BadRequest().finish();
}
HttpResponse::Ok().json(playlist)
} else {
let message = format!("Unable to get file name: {:?}", filename);
error!("{}", message);
span.set_status(Status::error(message));
HttpResponse::BadRequest().finish()
}
}
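The KNOWN ISSUE comment in `generate_video` names the hash-keyed `content_hash::hls_dir` as the long-term fix. A hypothetical std-only sketch of such a layout (the function shape is assumed for illustration; it is not the real module) shows why hash keys remove the basename collision:

```rust
use std::path::{Path, PathBuf};

// Hypothetical hash-keyed HLS layout, as described in the KNOWN ISSUE
// comment: two source files sharing a basename land in distinct
// directories because the key is the content hash, not the file name.
fn hls_dir(video_root: &Path, content_hash: &str) -> PathBuf {
    video_root.join(content_hash).join("hls")
}

fn main() {
    // Same basename on disk, different hashes -> different output dirs.
    let a = hls_dir(Path::new("/videos"), "abc123");
    let b = hls_dir(Path::new("/videos"), "def456");
    assert_ne!(a, b);
    assert_eq!(a, PathBuf::from("/videos/abc123/hls"));
}
```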
#[get("/video/stream")]
pub async fn stream_video(
request: HttpRequest,
_: Claims,
path: web::Query<ThumbnailRequest>,
app_state: Data<AppState>,
) -> impl Responder {
let tracer = global::tracer("image-server");
let context = extract_context_from_request(&request);
let mut span = tracer.start_with_context("stream_video", &context);
let playlist = &path.path;
debug!("Playlist: {}", playlist);
// Only serve files under video_path (HLS playlists) or base_path (source videos)
if playlist.starts_with(&app_state.video_path)
|| is_valid_full_path(&app_state.base_path, playlist, false).is_some()
{
match NamedFile::open(playlist) {
Ok(file) => {
span.set_status(Status::Ok);
file.into_response(&request)
}
_ => {
span.set_status(Status::error(format!("playlist not found {}", playlist)));
HttpResponse::NotFound().finish()
}
}
} else {
span.set_status(Status::error(format!("playlist not valid {}", playlist)));
HttpResponse::BadRequest().finish()
}
}
#[get("/video/{path}")]
pub async fn get_video_part(
request: HttpRequest,
_: Claims,
path: web::Path<ThumbnailRequest>,
app_state: Data<AppState>,
) -> impl Responder {
let tracer = global_tracer();
let context = extract_context_from_request(&request);
let mut span = tracer.start_with_context("get_video_part", &context);
let part = &path.path;
debug!("Video part: {}", part);
let mut file_part = PathBuf::new();
file_part.push(app_state.video_path.clone());
file_part.push(part);
// Guard against directory traversal attacks
let canonical_base = match std::fs::canonicalize(&app_state.video_path) {
Ok(path) => path,
Err(e) => {
error!("Failed to canonicalize video path: {:?}", e);
span.set_status(Status::error("Invalid video path configuration"));
return HttpResponse::InternalServerError().finish();
}
};
let canonical_file = match std::fs::canonicalize(&file_part) {
Ok(path) => path,
Err(_) => {
warn!("Video part not found or invalid: {:?}", file_part);
span.set_status(Status::error(format!("Video part not found '{}'", part)));
return HttpResponse::NotFound().finish();
}
};
// Ensure the resolved path is still within the video directory
if !canonical_file.starts_with(&canonical_base) {
warn!("Directory traversal attempt detected: {:?}", part);
span.set_status(Status::error("Invalid video path"));
return HttpResponse::Forbidden().finish();
}
match NamedFile::open(&canonical_file) {
Ok(file) => {
span.set_status(Status::Ok);
file.into_response(&request)
}
_ => {
error!("Video part not found: {:?}", file_part);
span.set_status(Status::error(format!(
"Video part not found '{}'",
file_part.to_str().unwrap()
)));
HttpResponse::NotFound().finish()
}
}
}
#[get("/video/preview")]
pub async fn get_video_preview(
_claims: Claims,
request: HttpRequest,
req: web::Query<PreviewClipRequest>,
app_state: Data<AppState>,
preview_dao: Data<Mutex<Box<dyn PreviewDao>>>,
) -> impl Responder {
let tracer = global_tracer();
let context = extract_context_from_request(&request);
let mut span = tracer.start_with_context("get_video_preview", &context);
// Validate path
let full_path = match is_valid_full_path(&app_state.base_path, &req.path, true) {
Some(path) => path,
None => {
span.set_status(Status::error("Invalid path"));
return HttpResponse::BadRequest().json(serde_json::json!({"error": "Invalid path"}));
}
};
let full_path_str = full_path.to_string_lossy().to_string();
// Use relative path (from BASE_PATH) for DB storage, consistent with EXIF convention
let relative_path = full_path_str
.strip_prefix(&app_state.base_path)
.unwrap_or(&full_path_str)
.trim_start_matches(['/', '\\'])
.to_string();
// Check preview status in DB
let preview = {
let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao");
dao.get_preview(&context, &relative_path)
};
match preview {
Ok(Some(clip)) => match clip.status.as_str() {
"complete" => {
let preview_path = PathBuf::from(&app_state.preview_clips_path)
.join(&relative_path)
.with_extension("mp4");
match NamedFile::open(&preview_path) {
Ok(file) => {
span.set_status(Status::Ok);
file.into_response(&request)
}
Err(_) => {
// File missing on disk but DB says complete - reset and regenerate
let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao");
let _ = dao.update_status(
&context,
&relative_path,
"pending",
None,
None,
None,
);
app_state
.preview_clip_generator
.do_send(GeneratePreviewClipMessage {
video_path: full_path_str,
});
span.set_status(Status::Ok);
HttpResponse::Accepted().json(serde_json::json!({
"status": "processing",
"path": req.path
}))
}
}
}
"processing" => {
span.set_status(Status::Ok);
HttpResponse::Accepted().json(serde_json::json!({
"status": "processing",
"path": req.path
}))
}
"failed" => {
let error_msg = clip
.error_message
.unwrap_or_else(|| "Unknown error".to_string());
span.set_status(Status::error(format!("Generation failed: {}", error_msg)));
HttpResponse::InternalServerError().json(serde_json::json!({
"error": format!("Generation failed: {}", error_msg)
}))
}
_ => {
// pending or unknown status - trigger generation
app_state
.preview_clip_generator
.do_send(GeneratePreviewClipMessage {
video_path: full_path_str,
});
span.set_status(Status::Ok);
HttpResponse::Accepted().json(serde_json::json!({
"status": "processing",
"path": req.path
}))
}
},
Ok(None) => {
// No record exists - insert as pending and trigger generation
{
let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao");
let _ = dao.insert_preview(&context, &relative_path, "pending");
}
app_state
.preview_clip_generator
.do_send(GeneratePreviewClipMessage {
video_path: full_path_str,
});
span.set_status(Status::Ok);
HttpResponse::Accepted().json(serde_json::json!({
"status": "processing",
"path": req.path
}))
}
Err(_) => {
span.set_status(Status::error("Database error"));
HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
}
}
}
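The relative-path derivation at the top of `get_video_preview` is the DB convention shared with the EXIF tables: strip `BASE_PATH`, then any leading separator. A minimal std-only restatement (the helper name `to_relative` is illustrative; the handler inlines this logic):

```rust
// Sketch of the relative-path convention used by the preview handlers:
// strip BASE_PATH if present, then trim leading '/' or '\' so the DB
// key is always separator-free at the front.
fn to_relative(base_path: &str, full: &str) -> String {
    full.strip_prefix(base_path)
        .unwrap_or(full)
        .trim_start_matches(['/', '\\'])
        .to_string()
}

fn main() {
    assert_eq!(to_relative("/media", "/media/photos/a.mp4"), "photos/a.mp4");
    // Paths outside BASE_PATH pass through, minus any leading separator.
    assert_eq!(to_relative("/media", "/other/b.mp4"), "other/b.mp4");
}
```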
#[post("/video/preview/status")]
pub async fn get_preview_status(
_claims: Claims,
request: HttpRequest,
body: web::Json<PreviewStatusRequest>,
app_state: Data<AppState>,
preview_dao: Data<Mutex<Box<dyn PreviewDao>>>,
) -> impl Responder {
let tracer = global_tracer();
let context = extract_context_from_request(&request);
let mut span = tracer.start_with_context("get_preview_status", &context);
// Limit to 200 paths per request
if body.paths.len() > 200 {
span.set_status(Status::error("Too many paths"));
return HttpResponse::BadRequest()
.json(serde_json::json!({"error": "Maximum 200 paths per request"}));
}
let previews = {
let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao");
dao.get_previews_batch(&context, &body.paths)
};
match previews {
Ok(clips) => {
// Build a map of file_path -> VideoPreviewClip for quick lookup
let clip_map: HashMap<String, _> = clips
.into_iter()
.map(|clip| (clip.file_path.clone(), clip))
.collect();
let mut items: Vec<PreviewStatusItem> = Vec::with_capacity(body.paths.len());
for path in &body.paths {
if let Some(clip) = clip_map.get(path) {
// Re-queue generation for stale pending/failed records
if clip.status == "pending" || clip.status == "failed" {
let full_path = format!(
"{}/{}",
app_state.base_path.trim_end_matches(['/', '\\']),
path.trim_start_matches(['/', '\\'])
);
app_state
.preview_clip_generator
.do_send(GeneratePreviewClipMessage {
video_path: full_path,
});
}
items.push(PreviewStatusItem {
path: path.clone(),
status: clip.status.clone(),
preview_url: if clip.status == "complete" {
Some(format!("/video/preview?path={}", urlencoding::encode(path)))
} else {
None
},
});
} else {
// No record exists — insert as pending and trigger generation
{
let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao");
let _ = dao.insert_preview(&context, path, "pending");
}
// Build full path for ffmpeg (actor needs the absolute path for input)
let full_path = format!(
"{}/{}",
app_state.base_path.trim_end_matches(['/', '\\']),
path.trim_start_matches(['/', '\\'])
);
info!("Triggering preview generation for '{}'", path);
app_state
.preview_clip_generator
.do_send(GeneratePreviewClipMessage {
video_path: full_path,
});
items.push(PreviewStatusItem {
path: path.clone(),
status: "pending".to_string(),
preview_url: None,
});
}
}
span.set_status(Status::Ok);
HttpResponse::Ok().json(PreviewStatusResponse { previews: items })
}
Err(_) => {
span.set_status(Status::error("Database error"));
HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::data::Claims;
use crate::database::PreviewDao;
use crate::testhelpers::TestPreviewDao;
use actix_web::App;
fn make_token() -> String {
let claims = Claims::valid_user("1".to_string());
jsonwebtoken::encode(
&jsonwebtoken::Header::default(),
&claims,
&jsonwebtoken::EncodingKey::from_secret(b"test_key"),
)
.unwrap()
}
fn make_preview_dao(dao: TestPreviewDao) -> Data<Mutex<Box<dyn PreviewDao>>> {
Data::new(Mutex::new(Box::new(dao) as Box<dyn PreviewDao>))
}
#[actix_rt::test]
async fn test_get_preview_status_returns_pending_for_unknown() {
let dao = TestPreviewDao::new();
let preview_dao = make_preview_dao(dao);
let app_state = Data::new(AppState::test_state());
let token = make_token();
let app = actix_web::test::init_service(
App::new()
.service(get_preview_status)
.app_data(app_state)
.app_data(preview_dao.clone()),
)
.await;
let req = actix_web::test::TestRequest::post()
.uri("/video/preview/status")
.insert_header(("Authorization", format!("Bearer {}", token)))
.set_json(serde_json::json!({"paths": ["photos/new_video.mp4"]}))
.to_request();
let resp = actix_web::test::call_service(&app, req).await;
assert_eq!(resp.status(), 200);
let body: serde_json::Value = actix_web::test::read_body_json(resp).await;
let previews = body["previews"].as_array().unwrap();
assert_eq!(previews.len(), 1);
assert_eq!(previews[0]["status"], "pending");
// Verify the DAO now has a pending record
let mut dao_lock = preview_dao.lock().unwrap();
let ctx = opentelemetry::Context::new();
let clip = dao_lock.get_preview(&ctx, "photos/new_video.mp4").unwrap();
assert!(clip.is_some());
assert_eq!(clip.unwrap().status, "pending");
}
#[actix_rt::test]
async fn test_get_preview_status_returns_complete_with_url() {
let mut dao = TestPreviewDao::new();
let ctx = opentelemetry::Context::new();
dao.insert_preview(&ctx, "photos/done.mp4", "pending")
.unwrap();
dao.update_status(
&ctx,
"photos/done.mp4",
"complete",
Some(9.5),
Some(500000),
None,
)
.unwrap();
let preview_dao = make_preview_dao(dao);
let app_state = Data::new(AppState::test_state());
let token = make_token();
let app = actix_web::test::init_service(
App::new()
.service(get_preview_status)
.app_data(app_state)
.app_data(preview_dao),
)
.await;
let req = actix_web::test::TestRequest::post()
.uri("/video/preview/status")
.insert_header(("Authorization", format!("Bearer {}", token)))
.set_json(serde_json::json!({"paths": ["photos/done.mp4"]}))
.to_request();
let resp = actix_web::test::call_service(&app, req).await;
assert_eq!(resp.status(), 200);
let body: serde_json::Value = actix_web::test::read_body_json(resp).await;
let previews = body["previews"].as_array().unwrap();
assert_eq!(previews.len(), 1);
assert_eq!(previews[0]["status"], "complete");
assert!(
previews[0]["preview_url"]
.as_str()
.unwrap()
.contains("photos%2Fdone.mp4")
);
}
#[actix_rt::test]
async fn test_get_preview_status_rejects_over_200_paths() {
let dao = TestPreviewDao::new();
let preview_dao = make_preview_dao(dao);
let app_state = Data::new(AppState::test_state());
let token = make_token();
let app = actix_web::test::init_service(
App::new()
.service(get_preview_status)
.app_data(app_state)
.app_data(preview_dao),
)
.await;
let paths: Vec<String> = (0..201).map(|i| format!("video_{}.mp4", i)).collect();
let req = actix_web::test::TestRequest::post()
.uri("/video/preview/status")
.insert_header(("Authorization", format!("Bearer {}", token)))
.set_json(serde_json::json!({"paths": paths}))
.to_request();
let resp = actix_web::test::call_service(&app, req).await;
assert_eq!(resp.status(), 400);
}
#[actix_rt::test]
async fn test_get_preview_status_mixed_statuses() {
let mut dao = TestPreviewDao::new();
let ctx = opentelemetry::Context::new();
dao.insert_preview(&ctx, "a.mp4", "pending").unwrap();
dao.insert_preview(&ctx, "b.mp4", "pending").unwrap();
dao.update_status(&ctx, "b.mp4", "complete", Some(10.0), Some(100000), None)
.unwrap();
let preview_dao = make_preview_dao(dao);
let app_state = Data::new(AppState::test_state());
let token = make_token();
let app = actix_web::test::init_service(
App::new()
.service(get_preview_status)
.app_data(app_state)
.app_data(preview_dao),
)
.await;
let req = actix_web::test::TestRequest::post()
.uri("/video/preview/status")
.insert_header(("Authorization", format!("Bearer {}", token)))
.set_json(serde_json::json!({"paths": ["a.mp4", "b.mp4", "c.mp4"]}))
.to_request();
let resp = actix_web::test::call_service(&app, req).await;
assert_eq!(resp.status(), 200);
let body: serde_json::Value = actix_web::test::read_body_json(resp).await;
let previews = body["previews"].as_array().unwrap();
assert_eq!(previews.len(), 3);
// a.mp4 is pending
assert_eq!(previews[0]["path"], "a.mp4");
assert_eq!(previews[0]["status"], "pending");
// b.mp4 is complete with URL
assert_eq!(previews[1]["path"], "b.mp4");
assert_eq!(previews[1]["status"], "complete");
assert!(previews[1]["preview_url"].is_string());
// c.mp4 was not found — handler inserts pending
assert_eq!(previews[2]["path"], "c.mp4");
assert_eq!(previews[2]["status"], "pending");
}
/// Verifies that the status endpoint re-queues generation for stale
/// "pending" and "failed" records (e.g., after a server restart or
/// when clip files were deleted). The do_send to the actor exercises
/// the re-queue code path; the actor runs against temp dirs so it
/// won't panic.
#[actix_rt::test]
async fn test_get_preview_status_requeues_pending_and_failed() {
let mut dao = TestPreviewDao::new();
let ctx = opentelemetry::Context::new();
// Simulate stale records left from a previous server run
dao.insert_preview(&ctx, "stale/pending.mp4", "pending")
.unwrap();
dao.insert_preview(&ctx, "stale/failed.mp4", "pending")
.unwrap();
dao.update_status(
&ctx,
"stale/failed.mp4",
"failed",
None,
None,
Some("ffmpeg error"),
)
.unwrap();
let preview_dao = make_preview_dao(dao);
let app_state = Data::new(AppState::test_state());
let token = make_token();
let app = actix_web::test::init_service(
App::new()
.service(get_preview_status)
.app_data(app_state)
.app_data(preview_dao),
)
.await;
let req = actix_web::test::TestRequest::post()
.uri("/video/preview/status")
.insert_header(("Authorization", format!("Bearer {}", token)))
.set_json(serde_json::json!({
"paths": ["stale/pending.mp4", "stale/failed.mp4"]
}))
.to_request();
let resp = actix_web::test::call_service(&app, req).await;
assert_eq!(resp.status(), 200);
let body: serde_json::Value = actix_web::test::read_body_json(resp).await;
let previews = body["previews"].as_array().unwrap();
assert_eq!(previews.len(), 2);
// Both records are returned with their current status
assert_eq!(previews[0]["path"], "stale/pending.mp4");
assert_eq!(previews[0]["status"], "pending");
assert!(previews[0].get("preview_url").is_none());
assert_eq!(previews[1]["path"], "stale/failed.mp4");
assert_eq!(previews[1]["status"], "failed");
assert!(previews[1].get("preview_url").is_none());
}
}


@@ -32,6 +32,7 @@ pub mod state;
pub mod tags;
#[cfg(test)]
pub mod testhelpers;
pub mod thumbnails;
pub mod utils;
pub mod video;
@@ -39,20 +40,3 @@ pub mod video;
pub use data::{Claims, ThumbnailRequest};
pub use database::{connect, schema};
pub use state::AppState;
// Stub functions for modules that reference main.rs
// These are not used by cleanup_files binary
use std::path::Path;
use walkdir::DirEntry;
pub fn create_thumbnails(_libs: &[libraries::Library], _excluded_dirs: &[String]) {
// Stub - implemented in main.rs
}
pub fn update_media_counts(_media_dir: &Path, _excluded_dirs: &[String]) {
// Stub - implemented in main.rs
}
pub fn is_video(entry: &DirEntry) -> bool {
file_types::direntry_is_video(entry)
}

File diff suppressed because it is too large

275
src/thumbnails.rs Normal file

@@ -0,0 +1,275 @@
//! Thumbnail generation + the media-count Prometheus gauges.
//!
//! Startup and per-tick scans walk each library and produce a
//! 200-px-wide thumbnail under `THUMBNAILS/<library_id>/<rel_path>`,
//! trying the RAW-preview path first (`exif::extract_embedded_jpeg_preview`),
//! then ffmpeg for video / HEIF / NEF / ARW, then the fast path
//! (`image` crate). Files that fail every decoder get a sibling
//! `.unsupported` sentinel so subsequent scans skip them silently.
use std::path::{Path, PathBuf};
use lazy_static::lazy_static;
use log::{debug, error, info, warn};
use opentelemetry::{
KeyValue,
trace::{Span, TraceContextExt, Tracer},
};
use prometheus::IntGauge;
use rayon::prelude::*;
use walkdir::DirEntry;
use crate::content_hash;
use crate::exif;
use crate::file_types;
use crate::libraries;
use crate::otel::global_tracer;
use crate::video::actors::{generate_image_thumbnail_ffmpeg, generate_video_thumbnail};
lazy_static! {
pub static ref IMAGE_GAUGE: IntGauge = IntGauge::new(
"imageserver_image_total",
"Count of the images on the server"
)
.unwrap();
pub static ref VIDEO_GAUGE: IntGauge = IntGauge::new(
"imageserver_video_total",
"Count of the videos on the server"
)
.unwrap();
}
/// Sentinel path written next to a would-be thumbnail when a file cannot be
/// decoded by either the `image` crate or ffmpeg. Its presence causes future
/// scans to skip the file instead of re-logging the failure.
pub fn unsupported_thumbnail_sentinel(thumb_path: &Path) -> PathBuf {
let mut s = thumb_path.as_os_str().to_owned();
s.push(".unsupported");
PathBuf::from(s)
}
pub fn generate_image_thumbnail(src: &Path, thumb_path: &Path) -> std::io::Result<()> {
// The `image` crate doesn't auto-apply EXIF Orientation on load, and
// saving back out as JPEG drops EXIF entirely — so without baking the
// rotation into the pixels here, browsers see the raw landscape buffer
// of a portrait phone shot and render it sideways. Read once up front
// and apply to whichever decode branch we end up taking.
let orientation = exif::read_orientation(src).unwrap_or(1);
// RAW formats (ARW/NEF/CR2/etc): try the file's embedded JPEG preview
// first. Avoids ffmpeg choking on proprietary RAW compression (Sony ARW
// in particular), and is faster than decoding RAW pixels anyway.
if let Some(preview) = exif::extract_embedded_jpeg_preview(src) {
let img = image::load_from_memory(&preview).map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("decode embedded preview {:?}: {}", src, e),
)
})?;
let img = exif::apply_orientation(img, orientation);
let scaled = img.thumbnail(200, u32::MAX);
scaled
.save_with_format(thumb_path, image::ImageFormat::Jpeg)
.map_err(|e| std::io::Error::other(format!("save {:?}: {}", thumb_path, e)))?;
return Ok(());
}
if file_types::needs_ffmpeg_thumbnail(src) {
return generate_image_thumbnail_ffmpeg(src, thumb_path);
}
let img = image::open(src).map_err(|e| {
std::io::Error::new(std::io::ErrorKind::InvalidData, format!("{:?}: {}", src, e))
})?;
let img = exif::apply_orientation(img, orientation);
let scaled = img.thumbnail(200, u32::MAX);
scaled
.save(thumb_path)
.map_err(|e| std::io::Error::other(format!("save {:?}: {}", thumb_path, e)))?;
Ok(())
}
pub fn create_thumbnails(libs: &[libraries::Library], excluded_dirs: &[String]) {
let tracer = global_tracer();
let span = tracer.start("creating thumbnails");
let thumbs = &dotenv::var("THUMBNAILS").expect("THUMBNAILS not defined");
let thumbnail_directory: &Path = Path::new(thumbs);
for lib in libs {
info!(
"Scanning thumbnails for library '{}' at {}",
lib.name, lib.root_path
);
let images = PathBuf::from(&lib.root_path);
// Effective excludes = the global env-var excludes plus the
// library row's own excluded_dirs. Lets a parent-library mount
// skip the subtree already covered by a child library.
let effective_excludes = lib.effective_excluded_dirs(excluded_dirs);
// Prune EXCLUDED_DIRS (e.g. Synology @eaDir trees) so we don't
// generate thumbnails-of-thumbnails; walk_library_files applies
// the pruning via filter_entry.
crate::file_scan::walk_library_files(&images, &effective_excludes)
.into_par_iter()
.for_each(|entry| {
let src = entry.path();
let Ok(relative_path) = src.strip_prefix(&images) else {
return;
};
// Library-scoped legacy path: prevents two libraries with
// the same rel_path from clobbering each other's thumbs.
// Hash-keyed promotion happens lazily on first hash-aware
// request — keeping this loop ExifDao-free preserves the
// current "cargo build && go" startup story.
let thumb_path = content_hash::library_scoped_legacy_path(
thumbnail_directory,
lib.id,
relative_path,
);
let bare_legacy = thumbnail_directory.join(relative_path);
// Backwards-compat check: if a single-library install has a
// bare-legacy thumb here already, accept it as present.
// Same for the sentinel. Means we don't redo work after
// upgrade and we don't leave stale duplicates around.
if thumb_path.exists()
|| bare_legacy.exists()
|| unsupported_thumbnail_sentinel(&thumb_path).exists()
|| unsupported_thumbnail_sentinel(&bare_legacy).exists()
{
return;
}
let Some(parent) = thumb_path.parent() else {
return;
};
if let Err(e) = std::fs::create_dir_all(parent) {
error!("Failed to create thumbnail dir {:?}: {}", parent, e);
return;
}
if is_video(&entry) {
let mut video_span = tracer.start_with_context(
"generate_video_thumbnail",
&opentelemetry::Context::new()
.with_remote_span_context(span.span_context().clone()),
);
video_span.set_attributes(vec![
KeyValue::new("type", "video"),
KeyValue::new("file-name", thumb_path.display().to_string()),
KeyValue::new("library", lib.name.clone()),
]);
debug!("Generating video thumbnail: {:?}", thumb_path);
if let Err(e) = generate_video_thumbnail(src, &thumb_path) {
let sentinel = unsupported_thumbnail_sentinel(&thumb_path);
error!(
"Unable to thumbnail video {:?}: {}. Writing sentinel {:?}",
src, e, sentinel
);
if let Err(se) = std::fs::write(&sentinel, b"") {
warn!("Failed to write sentinel {:?}: {}", sentinel, se);
}
}
video_span.end();
} else if is_image(&entry) {
match generate_image_thumbnail(src, &thumb_path) {
Ok(_) => info!("Saved thumbnail: {:?}", thumb_path),
Err(e) => {
let sentinel = unsupported_thumbnail_sentinel(&thumb_path);
error!(
"Unable to thumbnail {:?}: {}. Writing sentinel {:?}",
src, e, sentinel
);
if let Err(se) = std::fs::write(&sentinel, b"") {
warn!("Failed to write sentinel {:?}: {}", sentinel, se);
}
}
}
}
});
}
debug!("Finished making thumbnails");
for lib in libs {
let effective_excludes = lib.effective_excluded_dirs(excluded_dirs);
update_media_counts(Path::new(&lib.root_path), &effective_excludes);
}
}
pub fn update_media_counts(media_dir: &Path, excluded_dirs: &[String]) {
let mut image_count = 0;
let mut video_count = 0;
for entry in crate::file_scan::walk_library_files(media_dir, excluded_dirs) {
if is_image(&entry) {
image_count += 1;
} else if is_video(&entry) {
video_count += 1;
}
}
IMAGE_GAUGE.set(image_count);
VIDEO_GAUGE.set(video_count);
}
pub fn is_image(entry: &DirEntry) -> bool {
file_types::direntry_is_image(entry)
}
pub fn is_video(entry: &DirEntry) -> bool {
file_types::direntry_is_video(entry)
}
#[cfg(test)]
mod tests {
use super::*;
use std::fs;
use tempfile::TempDir;
#[test]
fn unsupported_thumbnail_sentinel_appends_suffix() {
let p = Path::new("/thumbs/lib1/photo.jpg");
let s = unsupported_thumbnail_sentinel(p);
assert_eq!(s, PathBuf::from("/thumbs/lib1/photo.jpg.unsupported"));
}
#[test]
fn unsupported_thumbnail_sentinel_preserves_extension_so_existing_thumb_is_distinct() {
// A future scan checks both `thumb.exists()` and
// `sentinel.exists()` — they must be distinct paths.
let p = Path::new("foo.jpeg");
let s = unsupported_thumbnail_sentinel(p);
assert_ne!(s, PathBuf::from("foo.jpeg"));
assert!(s.to_string_lossy().ends_with(".unsupported"));
}
#[test]
fn unsupported_thumbnail_sentinel_handles_paths_without_extension() {
let p = Path::new("/thumbs/notes");
let s = unsupported_thumbnail_sentinel(p);
assert_eq!(s, PathBuf::from("/thumbs/notes.unsupported"));
}
/// Smoke-test update_media_counts: build a tempdir with two images
/// and one video, run the walker, and assert the gauges line up.
/// Exercises the is_image / is_video classifier on real DirEntry
/// values without needing a Prometheus registry.
#[test]
fn update_media_counts_counts_images_and_videos_in_tempdir() {
let tmp = TempDir::new().expect("tempdir");
fs::write(tmp.path().join("a.jpg"), b"").unwrap();
fs::write(tmp.path().join("b.png"), b"").unwrap();
fs::write(tmp.path().join("c.mp4"), b"").unwrap();
fs::write(tmp.path().join("notes.txt"), b"").unwrap();
// Reset gauges first in case another test mutated them — the
// gauges are process-global statics.
IMAGE_GAUGE.set(0);
VIDEO_GAUGE.set(0);
update_media_counts(tmp.path(), &[]);
assert_eq!(IMAGE_GAUGE.get(), 2, "jpg + png");
assert_eq!(VIDEO_GAUGE.get(), 1, "mp4");
}
}
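The skip logic in `create_thumbnails` checks four paths before doing any work: the library-scoped thumb, the bare legacy thumb, and each path's `.unsupported` sentinel. A std-only sketch of that predicate (function names here are for illustration; the real loop inlines the checks):

```rust
use std::path::{Path, PathBuf};

// Append ".unsupported" after the full file name, mirroring
// unsupported_thumbnail_sentinel in thumbnails.rs.
fn sentinel(thumb: &Path) -> PathBuf {
    let mut s = thumb.as_os_str().to_owned();
    s.push(".unsupported");
    PathBuf::from(s)
}

// A file is skipped when any of the four candidate paths exists.
fn should_skip(scoped: &Path, bare_legacy: &Path) -> bool {
    scoped.exists()
        || bare_legacy.exists()
        || sentinel(scoped).exists()
        || sentinel(bare_legacy).exists()
}

fn main() {
    let d = std::env::temp_dir().join("thumb_skip_demo");
    std::fs::create_dir_all(&d).unwrap();
    let scoped = d.join("a.jpg");
    let bare = d.join("a_legacy.jpg");
    let _ = std::fs::remove_file(sentinel(&scoped)); // clean slate
    assert!(!should_skip(&scoped, &bare));
    // Writing the sentinel makes future scans skip the file.
    std::fs::write(sentinel(&scoped), b"").unwrap();
    assert!(should_skip(&scoped, &bare));
}
```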


@@ -1,7 +1,7 @@
use crate::database::PreviewDao;
use crate::is_video;
use crate::libraries::Library;
use crate::otel::global_tracer;
use crate::thumbnails::is_video;
use crate::video::ffmpeg::{generate_preview_clip, get_duration_seconds_blocking};
use actix::prelude::*;
use log::{debug, error, info, trace, warn};


@@ -1,6 +1,6 @@
use crate::otel::global_tracer;
use crate::thumbnails::{is_video, update_media_counts};
use crate::video::ffmpeg::{Ffmpeg, GifType};
use crate::{is_video, update_media_counts};
use log::info;
use opentelemetry::trace::Tracer;
use std::fs;

951
src/watcher.rs Normal file

@@ -0,0 +1,951 @@
//! Background file-watcher loop + the orphaned-playlist cleanup job.
//!
//! `watch_files` spins a thread that, on every tick (default 60 s
//! quick-scan / 3600 s full-scan), probes each library's availability,
//! drains the unhashed / date / face-detection backlogs via
//! [`crate::backfill`], walks newly-modified files through
//! [`process_new_files`], updates the media-count gauges, and runs the
//! three-stage maintenance pipeline (missing-file scan → back-ref
//! refresh → orphan GC).
//!
//! `cleanup_orphaned_playlists` runs on a slower interval (default 24
//! hours) and reaps HLS playlists whose source videos no longer exist
//! in any library. Both jobs respect [`crate::libraries::LibraryHealthMap`]
//! — a stale library skips destructive paths so transient unmounts
//! don't trigger data loss.
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};
use actix::Addr;
use chrono::Utc;
use log::{debug, error, info, warn};
use walkdir::WalkDir;
use crate::backfill;
use crate::content_hash;
use crate::database::models::InsertImageExif;
use crate::database::{ExifDao, PreviewDao, SqliteExifDao, SqlitePreviewDao};
use crate::date_resolver;
use crate::exif;
use crate::face_watch;
use crate::faces;
use crate::file_types;
use crate::libraries;
use crate::library_maintenance;
use crate::perceptual_hash;
use crate::tags;
use crate::tags::SqliteTagDao;
use crate::thumbnails;
use crate::video;
use crate::video::actors::{GeneratePreviewClipMessage, QueueVideosMessage, VideoPlaylistManager};
/// Clean up orphaned HLS playlists and segments whose source videos no longer exist
pub fn cleanup_orphaned_playlists(
libs: Vec<libraries::Library>,
excluded_dirs: Vec<String>,
library_health: libraries::LibraryHealthMap,
) {
std::thread::spawn(move || {
let video_path = dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set");
// Get cleanup interval from environment (default: 24 hours)
let cleanup_interval_secs = dotenv::var("PLAYLIST_CLEANUP_INTERVAL_SECONDS")
.ok()
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(86400); // 24 hours
info!("Starting orphaned playlist cleanup job");
info!(" Cleanup interval: {} seconds", cleanup_interval_secs);
info!(" Playlist directory: {}", video_path);
for lib in &libs {
info!(
" Checking sources under '{}' at {}",
lib.name, lib.root_path
);
}
loop {
std::thread::sleep(Duration::from_secs(cleanup_interval_secs));
// Safety gate: skip the cleanup cycle if any library is
// stale. A missing source video on a stale library is
// indistinguishable from a transient unmount, and the
// cleanup is destructive — we'd rather leak a few playlist
// files for a tick than delete one whose source is briefly
// unreachable. The cycle re-runs on the next interval.
{
let guard = library_health.read().unwrap_or_else(|e| e.into_inner());
let stale: Vec<String> = libs
.iter()
.filter(|lib| guard.get(&lib.id).map(|h| !h.is_online()).unwrap_or(false))
.map(|lib| lib.name.clone())
.collect();
if !stale.is_empty() {
warn!(
"Skipping orphaned-playlist cleanup: {} library(ies) stale: [{}]",
stale.len(),
stale.join(", ")
);
continue;
}
}
info!("Running orphaned playlist cleanup");
let start = std::time::Instant::now();
let mut deleted_count = 0;
let mut error_count = 0;
// Find all .m3u8 files in VIDEO_PATH
let playlists: Vec<PathBuf> = WalkDir::new(&video_path)
.into_iter()
.filter_map(|e| e.ok())
.filter(|e| e.file_type().is_file())
.filter(|e| {
e.path()
.extension()
.and_then(|s| s.to_str())
.map(|ext| ext.eq_ignore_ascii_case("m3u8"))
.unwrap_or(false)
})
.map(|e| e.path().to_path_buf())
.collect();
info!("Found {} playlist files to check", playlists.len());
for playlist_path in playlists {
// Extract the original video filename from playlist name
// Playlist format: {VIDEO_PATH}/{original_filename}.m3u8
if let Some(filename) = playlist_path.file_stem() {
let video_filename = filename.to_string_lossy();
// Search for this video file across every configured
// library, respecting EXCLUDED_DIRS so we don't
// false-resurrect playlists for videos that only
// exist inside an excluded subtree. As soon as one
// library has a matching source, we're done — the
// playlist isn't orphaned.
let mut video_exists = false;
'libs: for lib in &libs {
let effective = lib.effective_excluded_dirs(&excluded_dirs);
for entry in image_api::file_scan::walk_library_files(
Path::new(&lib.root_path),
&effective,
) {
if let Some(entry_name) = entry.path().file_name()
&& entry_name == filename
&& file_types::is_video_file(entry.path())
{
video_exists = true;
break 'libs;
}
}
}
if !video_exists {
debug!(
"Source video for playlist {} no longer exists, deleting",
playlist_path.display()
);
// Delete the playlist file
if let Err(e) = std::fs::remove_file(&playlist_path) {
warn!(
"Failed to delete playlist {}: {}",
playlist_path.display(),
e
);
error_count += 1;
} else {
deleted_count += 1;
// Also try to delete associated .ts segment files
// They are typically named {video_filename}N.ts in the same directory
if let Some(parent_dir) = playlist_path.parent() {
for entry in WalkDir::new(parent_dir)
.max_depth(1)
.into_iter()
.filter_map(|e| e.ok())
.filter(|e| e.file_type().is_file())
{
let entry_path = entry.path();
if let Some(ext) = entry_path.extension()
&& ext.eq_ignore_ascii_case("ts")
{
// Check if this .ts file belongs to our playlist
if let Some(ts_stem) = entry_path.file_stem() {
let ts_name = ts_stem.to_string_lossy();
if ts_name.starts_with(&*video_filename) {
if let Err(e) = std::fs::remove_file(entry_path) {
debug!(
"Failed to delete segment {}: {}",
entry_path.display(),
e
);
} else {
debug!(
"Deleted segment: {}",
entry_path.display()
);
}
}
}
}
}
}
}
}
}
}
info!(
"Orphaned playlist cleanup completed in {:?}: deleted {} playlists, {} errors",
start.elapsed(),
deleted_count,
error_count
);
}
});
}
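The stale-library safety gate above reduces to a small pure predicate over the health map. A minimal sketch of that decision, using hypothetical stand-in types (`Lib`, `Health` here are illustrative only, not the real `libraries::Library` / health types):

```rust
use std::collections::HashMap;

// Illustrative stand-ins for the real library and health types.
#[derive(Clone)]
struct Lib {
    id: i32,
    name: String,
}

#[derive(Clone, Copy)]
enum Health {
    Online,
    Stale,
}

impl Health {
    fn is_online(self) -> bool {
        matches!(self, Health::Online)
    }
}

/// Mirror of the cleanup loop's gate: collect the names of stale
/// libraries; the destructive pass runs only when this is empty.
fn stale_libraries(libs: &[Lib], health: &HashMap<i32, Health>) -> Vec<String> {
    libs.iter()
        .filter(|lib| health.get(&lib.id).map(|h| !h.is_online()).unwrap_or(false))
        .map(|lib| lib.name.clone())
        .collect()
}

fn main() {
    let libs = vec![
        Lib { id: 1, name: "recent".into() },
        Lib { id: 2, name: "archive".into() },
    ];
    let mut health = HashMap::new();
    health.insert(1, Health::Online);
    health.insert(2, Health::Stale);
    // One stale library is enough to skip the whole cleanup cycle.
    assert_eq!(stale_libraries(&libs, &health), vec!["archive".to_string()]);
    // A library with no health entry counts as online (unwrap_or(false)),
    // matching the loop's optimistic default.
    health.remove(&2);
    assert!(stale_libraries(&libs, &health).is_empty());
}
```

Note the asymmetry: an unknown library id is treated as online, so only a positive "not online" probe result can veto the cleanup.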
pub fn watch_files(
libs: Vec<libraries::Library>,
playlist_manager: Addr<VideoPlaylistManager>,
preview_generator: Addr<video::actors::PreviewClipGenerator>,
face_client: crate::ai::face_client::FaceClient,
excluded_dirs: Vec<String>,
library_health: libraries::LibraryHealthMap,
) {
std::thread::spawn(move || {
// Get polling intervals from environment variables
// Quick scan: Check recently modified files (default: 60 seconds)
let quick_interval_secs = dotenv::var("WATCH_QUICK_INTERVAL_SECONDS")
.ok()
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(60);
// Full scan: Check all files regardless of modification time (default: 3600 seconds = 1 hour)
let full_interval_secs = dotenv::var("WATCH_FULL_INTERVAL_SECONDS")
.ok()
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(3600);
info!("Starting optimized file watcher");
info!(" Quick scan interval: {} seconds", quick_interval_secs);
info!(" Full scan interval: {} seconds", full_interval_secs);
// Surface face-detection state at boot so it's obvious whether
// the watcher will hit Apollo. The branch silently no-ops when
// disabled (intentional for legacy deploys), which makes "why
// aren't faces being detected?" hard to diagnose otherwise.
if face_client.is_enabled() {
info!(" Face detection: ENABLED");
} else {
info!(
" Face detection: DISABLED (set APOLLO_FACE_API_BASE_URL \
or APOLLO_API_BASE_URL to enable)"
);
}
for lib in &libs {
info!(
" Watching library '{}' (id={}) at {}",
lib.name, lib.id, lib.root_path
);
}
// Create DAOs for tracking processed files
let exif_dao = Arc::new(Mutex::new(
Box::new(SqliteExifDao::new()) as Box<dyn ExifDao>
));
let preview_dao = Arc::new(Mutex::new(
Box::new(SqlitePreviewDao::new()) as Box<dyn PreviewDao>
));
let face_dao = Arc::new(Mutex::new(
Box::new(faces::SqliteFaceDao::new()) as Box<dyn faces::FaceDao>
));
// tag_dao for the watcher's auto-bind path. Independent of the
// request-handler tag_dao instance — both end up pointing at the
// same SQLite file via SqliteTagDao::default().
let watcher_tag_dao = Arc::new(Mutex::new(
Box::new(SqliteTagDao::default()) as Box<dyn tags::TagDao>
));
let mut last_quick_scan = SystemTime::now();
let mut last_full_scan = SystemTime::now();
let mut scan_count = 0u64;
// Per-library cursor for the missing-file scan. Each tick reads
// a page from `offset`, stat()s the rows, deletes confirmed-
// missing ones, and advances or wraps the cursor. State held
// in-memory so a watcher restart resumes from 0 — fine, the
// sweep is idempotent.
let mut missing_file_offsets: HashMap<i32, i64> = HashMap::new();
let missing_scan_page_size: i64 = dotenv::var("IMAGE_EXIF_MISSING_SCAN_PAGE_SIZE")
.ok()
.and_then(|s| s.parse().ok())
.filter(|n: &i64| *n > 0)
.unwrap_or(library_maintenance::DEFAULT_SCAN_PAGE_SIZE);
let missing_delete_cap: usize = dotenv::var("IMAGE_EXIF_MISSING_DELETE_CAP_PER_TICK")
.ok()
.and_then(|s| s.parse().ok())
.filter(|n: &usize| *n > 0)
.unwrap_or(library_maintenance::DEFAULT_MISSING_DELETE_CAP);
// Two-tick orphan-GC consensus state. Carried across ticks via
// `OrphanGcState`; see library_maintenance::run_orphan_gc.
let mut orphan_gc_state = library_maintenance::OrphanGcState::default();
// Initial availability sweep before the loop's first sleep so
// /libraries reports the truth from the very first request,
// rather than the optimistic Online default that
// new_health_map seeds. Without this, an unmounted share would
// appear online for up to WATCH_QUICK_INTERVAL_SECONDS (default
// 60s) after boot. Same probe logic as the per-tick gate
// below; no ingest runs here, just the health update + log.
// Disabled libraries skip the probe entirely — they should
// never enter the health map (treated as out-of-scope).
for lib in &libs {
if !lib.enabled {
continue;
}
let context = opentelemetry::Context::new();
let had_data = exif_dao
.lock()
.expect("exif_dao poisoned")
.count_for_library(&context, lib.id)
.map(|n| n > 0)
.unwrap_or(false);
libraries::refresh_health(&library_health, lib, had_data);
}
loop {
std::thread::sleep(Duration::from_secs(quick_interval_secs));
let now = SystemTime::now();
let since_last_full = now
.duration_since(last_full_scan)
.unwrap_or(Duration::from_secs(0));
let is_full_scan = since_last_full.as_secs() >= full_interval_secs;
for lib in &libs {
// Operator kill switch: a disabled library is invisible
// to the watcher entirely. No probe, no ingest, no
// maintenance, no health entry. Distinct from Stale —
// Stale is "we wanted to but couldn't"; Disabled is
// "we don't want to". Toggle via SQL.
if !lib.enabled {
debug!(
"watcher: skipping library '{}' (id={}) — enabled=false",
lib.name, lib.id
);
continue;
}
// Availability probe: every tick checks that the
// library's mount is reachable, is a directory, is
// readable, and (if image_exif has rows for it) is
// non-empty. A Stale library skips ingest, backlog
// drains, and metric refresh — reads/serving in HTTP
// handlers continue to work. Branches B/C extend the
// probe gate to cover handoff and orphan GC. See
// CLAUDE.md "Library availability and safety".
let had_data = {
let context = opentelemetry::Context::new();
let mut guard = exif_dao.lock().expect("exif_dao poisoned");
guard
.count_for_library(&context, lib.id)
.map(|n| n > 0)
.unwrap_or(false)
};
let health = libraries::refresh_health(&library_health, lib, had_data);
if !health.is_online() {
// Skip every write path for this library this tick.
// Don't refresh the media-count gauge either — a
// probe-failed library would otherwise flap to 0
// image / 0 video and pollute Prometheus.
continue;
}
// Drain the unhashed backlog AND the face-detection
// backlog every tick, regardless of quick/full (both
// passes are gated on face detection being enabled).
// Quick scans only walk recently-modified files, so the
// pre-Phase-3 backlog never enters their candidate set
// — without these standalone passes, backfill +
// detection only progressed during full scans
// (default once an hour).
// Effective excludes for this library: the global
// env-var EXCLUDED_DIRS merged with the library row's
// excluded_dirs. Compute once per tick — used by every
// walker below for this library.
let effective_excludes = lib.effective_excluded_dirs(&excluded_dirs);
if face_client.is_enabled() {
let context = opentelemetry::Context::new();
backfill::backfill_unhashed_backlog(&context, lib, &exif_dao);
backfill::process_face_backlog(
&context,
lib,
&face_client,
&face_dao,
&watcher_tag_dao,
&effective_excludes,
);
}
// Date-taken backfill: drain rows whose canonical date is
// either unresolved or only fs_time-sourced. Independent
// of face detection — runs even on deploys that don't
// configure Apollo, since `/memories` depends on it.
{
let context = opentelemetry::Context::new();
backfill::backfill_missing_date_taken(&context, lib, &exif_dao);
}
if is_full_scan {
info!(
"Running full scan for library '{}' (scan #{})",
lib.name, scan_count
);
process_new_files(
lib,
Arc::clone(&exif_dao),
Arc::clone(&preview_dao),
Arc::clone(&face_dao),
Arc::clone(&watcher_tag_dao),
face_client.clone(),
&effective_excludes,
None,
playlist_manager.clone(),
preview_generator.clone(),
);
} else {
debug!(
"Running quick scan for library '{}' (checking files modified in last {} seconds)",
lib.name,
quick_interval_secs + 10
);
let check_since = last_quick_scan
.checked_sub(Duration::from_secs(10))
.unwrap_or(last_quick_scan);
process_new_files(
lib,
Arc::clone(&exif_dao),
Arc::clone(&preview_dao),
Arc::clone(&face_dao),
Arc::clone(&watcher_tag_dao),
face_client.clone(),
&effective_excludes,
Some(check_since),
playlist_manager.clone(),
preview_generator.clone(),
);
}
// Update media counts per library (metric aggregates across all)
thumbnails::update_media_counts(Path::new(&lib.root_path), &effective_excludes);
// Missing-file detection: prune image_exif rows whose
// source file is no longer on disk. Per-library, so we
// pass library-online-this-tick implicitly (we only
// reach here if the probe gate at the top of the
// iteration passed). Capped + paginated so a huge
// library doesn't stall the watcher; rows we don't
// visit this tick get visited next tick. See
// library_maintenance::detect_missing_files_for_library.
{
let context = opentelemetry::Context::new();
let offset = missing_file_offsets.get(&lib.id).copied().unwrap_or(0);
let (deleted, next_offset) =
library_maintenance::detect_missing_files_for_library(
&context,
lib,
&exif_dao,
offset,
missing_scan_page_size,
missing_delete_cap,
);
missing_file_offsets.insert(lib.id, next_offset);
if deleted > 0 {
debug!(
"missing-file scan: library '{}' deleted={} next_offset={}",
lib.name, deleted, next_offset
);
}
}
}
// Reconciliation: cross-library, so it runs once per tick
// outside the per-library loop. Idempotent — fast no-op when
// there's nothing to do. Operates on the database alone, no
// filesystem dependency, so it doesn't need a health gate.
// See database::reconcile and CLAUDE.md "Multi-library data
// model" for the rules.
{
let mut conn = image_api::database::connect();
let _ = image_api::database::reconcile::run(&mut conn);
// Back-ref refresh: hash-keyed rows whose
// (library_id, rel_path) tuple no longer matches any
// image_exif row but whose hash still does. After a
// recent→archive move, the missing-file scan removes
// the old image_exif row; this pass repoints face /
// tag / insight back-refs at the surviving location.
// DB-only, no health gate needed — uses what's in
// image_exif as truth.
let _ = library_maintenance::refresh_back_refs(&mut conn);
// Orphan GC: the destructive end of the maintenance
// pipeline. Two-tick consensus + every-library-online
// requirement is enforced inside run_orphan_gc; we
// pass the current all-online flag and the function
// tracks the previous tick's flag in OrphanGcState.
let all_online = library_maintenance::all_libraries_online(&libs, &library_health);
let _ =
library_maintenance::run_orphan_gc(&mut conn, &mut orphan_gc_state, all_online);
}
if is_full_scan {
last_full_scan = now;
}
last_quick_scan = now;
scan_count += 1;
}
});
}
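The quick/full cadence inside `watch_files` comes down to one comparison against `WATCH_FULL_INTERVAL_SECONDS`. A minimal sketch of that decision as a pure function (the real code computes it inline from `last_full_scan`):

```rust
use std::time::Duration;

/// A tick promotes to a full scan when the elapsed time since the
/// last full scan reaches the full-scan interval; otherwise it is a
/// quick scan that only looks at recently modified files.
fn is_full_scan(since_last_full: Duration, full_interval_secs: u64) -> bool {
    since_last_full.as_secs() >= full_interval_secs
}

fn main() {
    // With the defaults (60 s quick / 3600 s full), the ticks within
    // the first hour after a full scan stay quick; the first tick at
    // or past the hour mark promotes to a full scan.
    assert!(!is_full_scan(Duration::from_secs(3599), 3600));
    assert!(is_full_scan(Duration::from_secs(3600), 3600));
}
```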
/// Check if a playlist needs to be (re)generated.
///
/// Returns true if:
/// - Playlist doesn't exist, OR
/// - Source video is newer than the playlist
///
/// When metadata for either path is unreadable, returns true so the
/// caller errs on the side of regeneration (a redundant transcode
/// beats a stale playlist).
pub fn playlist_needs_generation(video_path: &Path, playlist_path: &Path) -> bool {
if !playlist_path.exists() {
return true;
}
// Check if source video is newer than playlist
if let (Ok(video_meta), Ok(playlist_meta)) = (
std::fs::metadata(video_path),
std::fs::metadata(playlist_path),
) && let (Ok(video_modified), Ok(playlist_modified)) =
(video_meta.modified(), playlist_meta.modified())
{
return video_modified > playlist_modified;
}
// If we can't determine, assume it needs generation
true
}
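The mtime comparison can be exercised end to end with stdlib temp files. This is a stdlib-only restatement of `playlist_needs_generation` for illustration (the real function above is the source of truth; the `playlist_demo` directory name is arbitrary):

```rust
use std::fs;
use std::path::Path;
use std::thread::sleep;
use std::time::Duration;

// Stdlib-only restatement of playlist_needs_generation, kept
// behaviorally identical: missing or unreadable playlist metadata
// means "regenerate".
fn needs_generation(video: &Path, playlist: &Path) -> bool {
    if !playlist.exists() {
        return true;
    }
    if let (Ok(v), Ok(p)) = (fs::metadata(video), fs::metadata(playlist)) {
        if let (Ok(vm), Ok(pm)) = (v.modified(), p.modified()) {
            return vm > pm;
        }
    }
    true
}

fn main() {
    let dir = std::env::temp_dir().join("playlist_demo");
    fs::create_dir_all(&dir).unwrap();
    let video = dir.join("clip.mp4");
    let playlist = dir.join("clip.mp4.m3u8");
    let _ = fs::remove_file(&playlist);
    fs::write(&video, b"v").unwrap();
    // Missing playlist always regenerates.
    assert!(needs_generation(&video, &playlist));
    // Sleep so the playlist gets a strictly later mtime.
    sleep(Duration::from_millis(50));
    fs::write(&playlist, b"#EXTM3U").unwrap();
    // Playlist at least as new as its source: nothing to do.
    assert!(!needs_generation(&video, &playlist));
}
```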
pub fn process_new_files(
library: &libraries::Library,
exif_dao: Arc<Mutex<Box<dyn ExifDao>>>,
preview_dao: Arc<Mutex<Box<dyn PreviewDao>>>,
face_dao: Arc<Mutex<Box<dyn faces::FaceDao>>>,
tag_dao: Arc<Mutex<Box<dyn tags::TagDao>>>,
face_client: crate::ai::face_client::FaceClient,
excluded_dirs: &[String],
modified_since: Option<SystemTime>,
playlist_manager: Addr<VideoPlaylistManager>,
preview_generator: Addr<video::actors::PreviewClipGenerator>,
) {
let context = opentelemetry::Context::new();
let thumbs = dotenv::var("THUMBNAILS").expect("THUMBNAILS not defined");
let thumbnail_directory = Path::new(&thumbs);
let base_path = Path::new(&library.root_path);
// Walk, prune EXCLUDED_DIRS subtrees, and apply image/video + modified_since
// filters. See `file_scan` for why exclusion has to happen at WalkDir
// time (filter_entry) rather than at face-detect time.
let files: Vec<(PathBuf, String)> =
image_api::file_scan::enumerate_indexable_files(base_path, excluded_dirs, modified_since);
if files.is_empty() {
debug!("No files to process");
return;
}
debug!("Found {} files to check", files.len());
// Batch query: Get all EXIF data for these files in one query
let file_paths: Vec<String> = files.iter().map(|(_, rel_path)| rel_path.clone()).collect();
let existing_exif_paths: HashMap<String, bool> = {
let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
// Walk is per-library, so scope the lookup so a same-named file
// in another library doesn't make this one look already-indexed.
match dao.get_exif_batch(&context, Some(library.id), &file_paths) {
Ok(exif_records) => exif_records
.into_iter()
.map(|record| (record.file_path, true))
.collect(),
Err(e) => {
error!("Error batch querying EXIF data: {:?}", e);
HashMap::new()
}
}
};
let mut new_files_found = false;
let mut files_needing_row = Vec::new();
// Register every image/video file in image_exif. Rows without EXIF
// still carry library_id, rel_path, content_hash, and size_bytes so
// derivative dedup and DB-indexed sort/filter work for every file,
// not just photos with parseable EXIF.
for (file_path, relative_path) in &files {
// Check both the library-scoped legacy path (current shape) and
// the bare-legacy path (pre-multi-library shape). Either one
// existing means a thumbnail is already on disk for this file.
let scoped_thumb_path = content_hash::library_scoped_legacy_path(
thumbnail_directory,
library.id,
relative_path,
);
let bare_legacy_thumb_path = thumbnail_directory.join(relative_path);
let needs_thumbnail = !scoped_thumb_path.exists()
&& !bare_legacy_thumb_path.exists()
&& !thumbnails::unsupported_thumbnail_sentinel(&scoped_thumb_path).exists()
&& !thumbnails::unsupported_thumbnail_sentinel(&bare_legacy_thumb_path).exists();
let needs_row = !existing_exif_paths.contains_key(relative_path);
if needs_thumbnail || needs_row {
new_files_found = true;
if needs_thumbnail {
info!("New file detected (missing thumbnail): {}", relative_path);
}
if needs_row {
files_needing_row.push((file_path.clone(), relative_path.clone()));
}
}
}
if !files_needing_row.is_empty() {
info!(
"Registering {} new files in image_exif",
files_needing_row.len()
);
for (file_path, relative_path) in files_needing_row {
let timestamp = Utc::now().timestamp();
// Hash + size from filesystem metadata — always attempted so
// every file gets a content_hash, even when EXIF is absent.
let (content_hash, size_bytes) = match content_hash::compute(&file_path) {
Ok(id) => (Some(id.content_hash), Some(id.size_bytes)),
Err(e) => {
warn!("Failed to hash {}: {:?}", file_path.display(), e);
(None, None)
}
};
// Perceptual hashes (pHash + dHash). Best-effort — None for
// videos and decode failures. Drives near-duplicate detection
// in the Apollo duplicates surface; failure here is non-fatal
// and never blocks indexing.
let perceptual = perceptual_hash::compute(&file_path);
// EXIF is best-effort enrichment. When extraction fails (or the
// file type doesn't support EXIF) we still store a row with all
// EXIF fields NULL; the file remains visible to sort-by-date
// and tag queries via its rel_path and filesystem timestamps.
let exif_fields = if exif::supports_exif(&file_path) {
match exif::extract_exif_from_path(&file_path) {
Ok(data) => Some(data),
Err(e) => {
debug!(
"No EXIF or parse error for {}: {:?}",
file_path.display(),
e
);
None
}
}
} else {
None
};
// Canonical date_taken via the waterfall — kamadak-exif (already
// computed above) → exiftool fallback for videos / MakerNote /
// QuickTime → filename regex → earliest_fs_time. Source is
// recorded so the per-tick backfill drain can re-run weak
// resolutions later.
let resolved_date = date_resolver::resolve_date_taken(
&file_path,
exif_fields.as_ref().and_then(|e| e.date_taken),
);
let insert_exif = InsertImageExif {
library_id: library.id,
file_path: relative_path.clone(),
camera_make: exif_fields.as_ref().and_then(|e| e.camera_make.clone()),
camera_model: exif_fields.as_ref().and_then(|e| e.camera_model.clone()),
lens_model: exif_fields.as_ref().and_then(|e| e.lens_model.clone()),
width: exif_fields.as_ref().and_then(|e| e.width),
height: exif_fields.as_ref().and_then(|e| e.height),
orientation: exif_fields.as_ref().and_then(|e| e.orientation),
gps_latitude: exif_fields
.as_ref()
.and_then(|e| e.gps_latitude.map(|v| v as f32)),
gps_longitude: exif_fields
.as_ref()
.and_then(|e| e.gps_longitude.map(|v| v as f32)),
gps_altitude: exif_fields
.as_ref()
.and_then(|e| e.gps_altitude.map(|v| v as f32)),
focal_length: exif_fields
.as_ref()
.and_then(|e| e.focal_length.map(|v| v as f32)),
aperture: exif_fields
.as_ref()
.and_then(|e| e.aperture.map(|v| v as f32)),
shutter_speed: exif_fields.as_ref().and_then(|e| e.shutter_speed.clone()),
iso: exif_fields.as_ref().and_then(|e| e.iso),
date_taken: resolved_date.map(|r| r.timestamp),
created_time: timestamp,
last_modified: timestamp,
content_hash,
size_bytes,
phash_64: perceptual.as_ref().map(|h| h.phash_64),
dhash_64: perceptual.map(|h| h.dhash_64),
date_taken_source: resolved_date.map(|r| r.source.as_str().to_string()),
};
let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
if let Err(e) = dao.store_exif(&context, insert_exif) {
error!(
"Failed to register {} in image_exif: {:?}",
relative_path, e
);
} else {
debug!("Registered {} in image_exif", relative_path);
}
}
}
// ── Face detection pass ────────────────────────────────────────────
// Run after EXIF writes so newly-registered files have their
// content_hash populated. Skipped wholesale when face_client is
// disabled (no Apollo integration configured) — Phase 3 wires this
// up; the watcher remains usable on legacy deploys.
if face_client.is_enabled() {
// Opportunistic content_hash backfill: photos indexed before
// content-hashing landed (or where the hash compute failed
// silently on insert) end up in image_exif with NULL
// content_hash. build_face_candidates keys on content_hash, so
// those files would never become candidates without backfill.
// Idempotent — subsequent scans see the populated hashes and
// no-op. The dedicated `backfill_hashes` binary is still the
// right tool for very large legacy libraries; this branch
// ensures small/medium deploys self-heal without operator
// action.
backfill::backfill_missing_content_hashes(&context, &files, library, &exif_dao);
let candidates =
backfill::build_face_candidates(&context, library, &files, &exif_dao, &face_dao);
debug!(
"face_watch: scan tick — {} image file(s) walked, {} candidate(s) (library '{}', modified_since={})",
files
.iter()
.filter(|(p, _)| !file_types::is_video_file(p))
.count(),
candidates.len(),
library.name,
modified_since.is_some(),
);
if !candidates.is_empty() {
face_watch::run_face_detection_pass(
library,
excluded_dirs,
&face_client,
Arc::clone(&face_dao),
Arc::clone(&tag_dao),
candidates,
);
}
}
// Check for videos that need HLS playlists
let video_path_base = dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set");
let mut videos_needing_playlists = Vec::new();
for (file_path, _relative_path) in &files {
if file_types::is_video_file(file_path) {
// Construct expected playlist path
let playlist_filename =
format!("{}.m3u8", file_path.file_name().unwrap().to_string_lossy());
let playlist_path = Path::new(&video_path_base).join(&playlist_filename);
// Check if playlist needs (re)generation
if playlist_needs_generation(file_path, &playlist_path) {
videos_needing_playlists.push(file_path.clone());
}
}
}
// Send queue request to playlist manager
if !videos_needing_playlists.is_empty() {
playlist_manager.do_send(QueueVideosMessage {
video_paths: videos_needing_playlists,
});
}
// Check for videos that need preview clips
// Collect (full_path, relative_path) for video files
let video_files: Vec<(String, String)> = files
.iter()
.filter(|(file_path, _)| file_types::is_video_file(file_path))
.map(|(file_path, rel_path)| (file_path.to_string_lossy().to_string(), rel_path.clone()))
.collect();
if !video_files.is_empty() {
// Query DB using relative paths (consistent with how GET/POST handlers store them)
let video_rel_paths: Vec<String> = video_files.iter().map(|(_, rel)| rel.clone()).collect();
let existing_previews: HashMap<String, String> = {
let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao");
match dao.get_previews_batch(&context, &video_rel_paths) {
Ok(clips) => clips
.into_iter()
.map(|clip| (clip.file_path, clip.status))
.collect(),
Err(e) => {
error!("Error batch querying preview clips: {:?}", e);
HashMap::new()
}
}
};
for (full_path, relative_path) in &video_files {
let status = existing_previews.get(relative_path).map(|s| s.as_str());
let needs_preview = match status {
None => true, // No record at all
Some("failed") => true, // Retry failed
Some("pending") => true, // Stale pending from previous run
_ => false, // processing or complete
};
if needs_preview {
// Insert pending record using relative path
if status.is_none() {
let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao");
let _ = dao.insert_preview(&context, relative_path, "pending");
}
// Send full path in the message — the actor will derive relative path from it
preview_generator.do_send(GeneratePreviewClipMessage {
video_path: full_path.clone(),
});
}
}
}
// Generate thumbnails for all files that need them
if new_files_found {
info!("Processing thumbnails for new files...");
thumbnails::create_thumbnails(std::slice::from_ref(library), excluded_dirs);
}
// Reconciliation: on a full scan, prune image_exif rows whose rel_path no
// longer exists on disk for this library. Keeps the DB in parity so
// downstream DB-backed listings (e.g. recursive /photos) don't return
// phantom files. Skipped on quick scans — those only look at recently
// modified files and can't distinguish "missing" from "unchanged".
if modified_since.is_none() {
let disk_paths: HashSet<String> = files.iter().map(|(_, rel)| rel.clone()).collect();
let db_paths: Vec<String> = {
let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
dao.get_rel_paths_for_library(&context, library.id)
.unwrap_or_else(|e| {
error!(
"Reconciliation: failed to load image_exif rel_paths for lib {}: {:?}",
library.id, e
);
Vec::new()
})
};
let stale: Vec<String> = db_paths
.into_iter()
.filter(|p| !disk_paths.contains(p))
.collect();
if !stale.is_empty() {
info!(
"Reconciliation: pruning {} stale image_exif rows for library '{}'",
stale.len(),
library.name
);
let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
for rel in &stale {
if let Err(e) = dao.delete_exif_by_library(&context, library.id, rel) {
warn!(
"Reconciliation: failed to delete {} (lib {}): {:?}",
rel, library.id, e
);
}
}
}
}
}
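The preview-queue decision inside `process_new_files` is a small status table. Extracted here as a pure function purely for illustration (the real code matches inline on the DAO's status string):

```rust
/// Regenerate a preview clip when there is no record at all, a prior
/// attempt failed, or a "pending" row was left stale by a previous
/// run; leave in-flight ("processing") and finished ("complete")
/// clips alone.
fn needs_preview(status: Option<&str>) -> bool {
    matches!(status, None | Some("failed") | Some("pending"))
}

fn main() {
    assert!(needs_preview(None));
    assert!(needs_preview(Some("failed")));
    assert!(needs_preview(Some("pending")));
    assert!(!needs_preview(Some("processing")));
    assert!(!needs_preview(Some("complete")));
}
```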
#[cfg(test)]
mod tests {
use super::*;
use std::fs;
use std::thread::sleep;
use std::time::Duration as StdDuration;
use tempfile::TempDir;
#[test]
fn playlist_needs_generation_true_when_playlist_missing() {
let tmp = TempDir::new().unwrap();
let video = tmp.path().join("clip.mp4");
fs::write(&video, b"v").unwrap();
let playlist = tmp.path().join("clip.mp4.m3u8");
// playlist does not exist
assert!(playlist_needs_generation(&video, &playlist));
}
#[test]
fn playlist_needs_generation_false_when_playlist_is_newer() {
let tmp = TempDir::new().unwrap();
let video = tmp.path().join("clip.mp4");
fs::write(&video, b"v").unwrap();
// Sleep to guarantee a distinct mtime for the playlist created next.
// Many filesystems have ~10 ms mtime resolution; 50 ms is plenty.
sleep(StdDuration::from_millis(50));
let playlist = tmp.path().join("clip.mp4.m3u8");
fs::write(&playlist, b"#EXTM3U").unwrap();
assert!(!playlist_needs_generation(&video, &playlist));
}
#[test]
fn playlist_needs_generation_true_when_video_is_newer() {
let tmp = TempDir::new().unwrap();
let playlist = tmp.path().join("clip.mp4.m3u8");
fs::write(&playlist, b"#EXTM3U").unwrap();
sleep(StdDuration::from_millis(50));
let video = tmp.path().join("clip.mp4");
fs::write(&video, b"v").unwrap();
assert!(playlist_needs_generation(&video, &playlist));
}
#[test]
fn playlist_needs_generation_true_when_video_missing_metadata() {
// Video doesn't exist; metadata fails for it. Falls through to the
// "assume needs regeneration" branch.
let tmp = TempDir::new().unwrap();
let video = tmp.path().join("missing.mp4");
let playlist = tmp.path().join("missing.mp4.m3u8");
fs::write(&playlist, b"#EXTM3U").unwrap();
assert!(playlist_needs_generation(&video, &playlist));
}
}