diff --git a/src/backfill.rs b/src/backfill.rs new file mode 100644 index 0000000..ad225ae --- /dev/null +++ b/src/backfill.rs @@ -0,0 +1,721 @@ +//! Per-tick drains the watcher runs alongside ingest. +//! +//! These passes were previously inlined in `main.rs`; they exist because +//! a quick scan only walks recently-modified files, so any backlog of +//! rows missing a `content_hash` / `date_taken` / face detection +//! wouldn't otherwise drain except during the once-an-hour full scan. +//! Each function is bounded per call by a `*_PER_TICK` env-var cap. + +use std::collections::HashMap; +use std::path::PathBuf; +use std::sync::{Arc, Mutex}; + +use log::{debug, info, warn}; + +use crate::content_hash; +use crate::database::ExifDao; +use crate::date_resolver; +use crate::face_watch; +use crate::faces; +use crate::file_types; +use crate::libraries; +use crate::tags; + +/// Compute and persist content_hash for image_exif rows where it's NULL. +/// +/// Bounded per call by `FACE_HASH_BACKFILL_MAX_PER_TICK` (default 2000) +/// so a watcher tick on a large legacy library doesn't block for hours +/// blake3-ing every photo at once. Subsequent scans pick up the rest. +/// For 50k+ libraries the dedicated `cargo run --bin backfill_hashes` +/// is still faster (it doesn't fight a watcher loop for the DAO mutex). +/// +/// Drains unhashed image_exif rows by querying them directly, independent +/// of the filesystem walk. Quick scans only walk recently-modified files, +/// so a backlog of pre-existing unhashed rows never enters +/// `process_new_files`'s candidate set — left alone, it would only drain +/// on full scans (default once an hour). Calling this every tick keeps +/// the face-detection backlog moving regardless. +/// +/// Returns the number of rows successfully backfilled this pass. +pub fn backfill_unhashed_backlog( + context: &opentelemetry::Context, + library: &libraries::Library, + exif_dao: &Arc>>, +) -> usize { + let cap: i64 = dotenv::var("FACE_HASH_BACKFILL_MAX_PER_TICK") + .ok() + .and_then(|s| s.parse().ok()) + .filter(|n: &i64| *n > 0) + .unwrap_or(2000); + + // Fetch up to cap+1 rows so we can tell "more remain" without a + // separate count query. Across libraries — there's no per-library + // filter on get_rows_missing_hash today — but we only ever update + // rows whose library_id matches the caller's library, so other + // libraries' rows just get skipped here and picked up on the next + // library's tick. Negligible cost given the cap. + let rows: Vec<(i32, String)> = { + let mut dao = exif_dao.lock().expect("Unable to lock ExifDao"); + dao.get_rows_missing_hash(context, cap + 1) + .unwrap_or_default() + }; + if rows.is_empty() { + return 0; + } + + let more_than_cap = rows.len() as i64 > cap; + let base_path = std::path::Path::new(&library.root_path); + + let mut backfilled = 0usize; + let mut errors = 0usize; + let mut skipped_other_lib = 0usize; + for (lib_id, rel_path) in rows.iter().take(cap as usize) { + if *lib_id != library.id { + skipped_other_lib += 1; + continue; + } + let abs = base_path.join(rel_path); + if !abs.exists() { + // File walked away — the watcher's reconciliation pass will + // remove the orphan exif row eventually. + continue; + } + match content_hash::compute(&abs) { + Ok(id) => { + let mut dao = exif_dao.lock().expect("Unable to lock ExifDao"); + if let Err(e) = dao.backfill_content_hash( + context, + library.id, + rel_path, + &id.content_hash, + id.size_bytes, + ) { + warn!( + "face_watch: backfill_content_hash failed for {}: {:?}", + rel_path, e + ); + errors += 1; + } else { + backfilled += 1; + } + } + Err(e) => { + debug!( + "face_watch: hash compute failed for {} ({:?})", + abs.display(), + e + ); + errors += 1; + } + } + } + + if backfilled > 0 || errors > 0 || more_than_cap { + info!( + "face_watch: backfill pass for library '{}': hashed {} ({} error(s), {} skipped to other libraries; {} cap, more_remain={})", + library.name, backfilled, errors, skipped_other_lib, cap, more_than_cap + ); + } + backfilled +} + +/// Drain image_exif rows whose `date_taken` was never resolved or was +/// resolved by the weakest fallback (`fs_time`). Runs the canonical-date +/// waterfall — exiftool batch (one subprocess for the whole tick's +/// rows) → filename regex → earliest_fs_time — and persists each +/// resolution with its source tag. Capped per tick by +/// `DATE_BACKFILL_MAX_PER_TICK` (default 500) so a 14k-row library +/// drains over a few quick-scan ticks without blocking the watcher. +/// +/// kamadak-exif is intentionally skipped here: the row already has a +/// NULL date_taken because the ingest path's kamadak-exif call returned +/// nothing, and re-running it would just produce the same answer. +/// exiftool is the meaningful new attempt — it handles videos and +/// MakerNote-hosted dates kamadak can't reach. +pub fn backfill_missing_date_taken( + context: &opentelemetry::Context, + library: &libraries::Library, + exif_dao: &Arc>>, +) -> usize { + let cap: i64 = dotenv::var("DATE_BACKFILL_MAX_PER_TICK") + .ok() + .and_then(|s| s.parse().ok()) + .filter(|n: &i64| *n > 0) + .unwrap_or(500); + + let rows: Vec<(i32, String)> = { + let mut dao = exif_dao.lock().expect("Unable to lock ExifDao"); + dao.get_rows_needing_date_backfill(context, library.id, cap + 1) + .unwrap_or_default() + }; + if rows.is_empty() { + return 0; + } + + let more_than_cap = rows.len() as i64 > cap; + let base_path = std::path::Path::new(&library.root_path); + + // Build absolute paths and drop rows whose files no longer exist — + // the missing-file scan in library_maintenance retires deleted rows + // separately. Without this filter, NULL-date rows for missing files + // would loop through the drain forever (no source can resolve them). + let mut existing: Vec<(String, PathBuf)> = Vec::with_capacity(rows.len()); + for (_, rel_path) in rows.iter().take(cap as usize) { + let abs = base_path.join(rel_path); + if abs.exists() { + existing.push((rel_path.clone(), abs)); + } + } + if existing.is_empty() { + return 0; + } + + // One exiftool subprocess for the whole batch; the resolver falls + // through to filename / fs_time per file when exiftool can't supply + // a date (or isn't installed at all). + let paths: Vec = existing.iter().map(|(_, p)| p.clone()).collect(); + let resolved = date_resolver::resolve_dates_batch(&paths, &HashMap::new()); + + let mut backfilled = 0usize; + let mut unresolved = 0usize; + let mut by_source: HashMap<&'static str, usize> = HashMap::new(); + { + let mut dao = exif_dao.lock().expect("Unable to lock ExifDao"); + for (rel_path, abs) in &existing { + let Some(rd) = resolved.get(abs).copied() else { + unresolved += 1; + continue; + }; + match dao.backfill_date_taken( + context, + library.id, + rel_path, + rd.timestamp, + rd.source.as_str(), + ) { + Ok(()) => { + backfilled += 1; + *by_source.entry(rd.source.as_str()).or_insert(0) += 1; + } + Err(e) => { + warn!( + "date_backfill: update failed for lib {} {}: {:?}", + library.id, rel_path, e + ); + } + } + } + } + + if backfilled > 0 || unresolved > 0 || more_than_cap { + info!( + "date_backfill: library '{}': resolved {} ({:?}), {} unresolved, cap={}, more_remain={}", + library.name, backfilled, by_source, unresolved, cap, more_than_cap + ); + } + backfilled +} + +/// Per-tick face-detection drain. Pulls a capped batch of hashed-but- +/// unscanned image_exif rows directly via the FaceDao anti-join and +/// hands them to the existing detection pass. Runs on every tick (not +/// just full scans) so the backlog moves at quick-scan cadence. +pub fn process_face_backlog( + context: &opentelemetry::Context, + library: &libraries::Library, + face_client: &crate::ai::face_client::FaceClient, + face_dao: &Arc>>, + tag_dao: &Arc>>, + excluded_dirs: &[String], +) { + let cap: i64 = dotenv::var("FACE_BACKLOG_MAX_PER_TICK") + .ok() + .and_then(|s| s.parse().ok()) + .filter(|n: &i64| *n > 0) + .unwrap_or(64); + + let rows: Vec<(String, String)> = { + let mut dao = face_dao.lock().expect("face dao"); + match dao.list_unscanned_candidates(context, library.id, cap) { + Ok(r) => r, + Err(e) => { + warn!( + "face_watch: list_unscanned_candidates failed for library '{}': {:?}", + library.name, e + ); + return; + } + } + }; + if rows.is_empty() { + return; + } + + info!( + "face_watch: backlog drain — running detection on {} candidate(s) for library '{}' (cap={})", + rows.len(), + library.name, + cap + ); + + let candidates: Vec = rows + .into_iter() + .map(|(rel_path, content_hash)| face_watch::FaceCandidate { + rel_path, + content_hash, + }) + .collect(); + + face_watch::run_face_detection_pass( + library, + excluded_dirs, + face_client, + Arc::clone(face_dao), + Arc::clone(tag_dao), + candidates, + ); +} + +/// Compute content_hash for any image rows the walker just touched +/// whose stored EXIF row is still hash-less. Called from +/// `process_new_files` so freshly-ingested files don't have to wait for +/// the next standalone `backfill_unhashed_backlog` tick before face +/// detection can key on their bytes. +/// +/// Cap is on **successes only**. An earlier version counted errors too, +/// so a pocket of chronically-unhashable files at the front of the +/// table (vanished mid-scan, permission denied, etc.) burned the budget +/// every tick and the rest of the backlog never advanced. +pub fn backfill_missing_content_hashes( + context: &opentelemetry::Context, + files: &[(PathBuf, String)], + library: &libraries::Library, + exif_dao: &Arc>>, +) { + let image_paths: Vec = files + .iter() + .filter(|(p, _)| !file_types::is_video_file(p)) + .map(|(_, rel)| rel.clone()) + .collect(); + if image_paths.is_empty() { + return; + } + + let exif_records = { + let mut dao = exif_dao.lock().expect("Unable to lock ExifDao"); + dao.get_exif_batch(context, Some(library.id), &image_paths) + .unwrap_or_default() + }; + // Cheap lookup back from rel_path → absolute file_path so + // content_hash::compute can read the bytes. + let path_by_rel: HashMap = + files.iter().map(|(p, rel)| (rel.clone(), p)).collect(); + + let cap: usize = dotenv::var("FACE_HASH_BACKFILL_MAX_PER_TICK") + .ok() + .and_then(|s| s.parse().ok()) + .filter(|n: &usize| *n > 0) + .unwrap_or(2000); + + // Count the unhashed backlog up front so we can surface "still needs + // backfill: N" in the log — without it, a face-scan that's stuck at + // 44% looks stalled when really it's chipping through hashes. + let unhashed_total = exif_records + .iter() + .filter(|r| r.content_hash.is_none()) + .count(); + + let mut backfilled = 0usize; + let mut errors = 0usize; + for record in &exif_records { + if backfilled >= cap { + break; + } + if record.content_hash.is_some() { + continue; + } + let Some(file_path) = path_by_rel.get(&record.file_path) else { + // Walked file went missing between the directory scan and now; + // next tick will retry naturally. + continue; + }; + match content_hash::compute(file_path) { + Ok(id) => { + let mut dao = exif_dao.lock().expect("Unable to lock ExifDao"); + if let Err(e) = dao.backfill_content_hash( + context, + library.id, + &record.file_path, + &id.content_hash, + id.size_bytes, + ) { + warn!( + "face_watch: backfill_content_hash failed for {}: {:?}", + record.file_path, e + ); + errors += 1; + } else { + backfilled += 1; + } + } + Err(e) => { + debug!( + "face_watch: hash compute failed for {} ({:?})", + file_path.display(), + e + ); + errors += 1; + } + } + } + // Always log when there's an unhashed backlog so an operator + // looking at "scan stuck at 44%" can see backfill is running and + // how much remains. Quiet only when there's nothing to do. + if unhashed_total > 0 || backfilled > 0 || errors > 0 { + let remaining = unhashed_total.saturating_sub(backfilled); + info!( + "face_watch: backfilled {}/{} content_hash for library '{}' ({} error(s); {} still need backfill; cap={})", + backfilled, unhashed_total, library.name, errors, remaining, cap + ); + } +} + +/// Build the face-detection candidate list for a scan tick. +/// +/// Returns `(rel_path, content_hash)` for every image file that has a +/// content_hash recorded in image_exif but no row in face_detections +/// yet. Re-querying image_exif here picks up rows the EXIF write loop +/// just inserted alongside any pre-existing rows the watcher walked +/// over — covers both new uploads and the initial backlog scan. +pub fn build_face_candidates( + context: &opentelemetry::Context, + library: &libraries::Library, + files: &[(PathBuf, String)], + exif_dao: &Arc>>, + face_dao: &Arc>>, +) -> Vec { + // Restrict to image files; videos aren't face-scanned in v1 (kamadak + // doesn't even register them in image_exif). + let image_paths: Vec = files + .iter() + .filter(|(p, _)| !file_types::is_video_file(p)) + .map(|(_, rel)| rel.clone()) + .collect(); + if image_paths.is_empty() { + return Vec::new(); + } + + let exif_records = { + let mut dao = exif_dao.lock().expect("Unable to lock ExifDao"); + dao.get_exif_batch(context, Some(library.id), &image_paths) + .unwrap_or_default() + }; + // rel_path → content_hash (only rows with a hash; without one we have + // nothing to key face data against). + let mut hash_by_path: HashMap = HashMap::with_capacity(exif_records.len()); + for record in exif_records { + if let Some(h) = record.content_hash { + hash_by_path.insert(record.file_path, h); + } + } + + let mut candidates = Vec::new(); + let mut dao = face_dao.lock().expect("face dao"); + for rel_path in image_paths { + let Some(hash) = hash_by_path.get(&rel_path) else { + continue; + }; + match dao.already_scanned(context, hash) { + Ok(true) => continue, + Ok(false) => candidates.push(face_watch::FaceCandidate { + rel_path, + content_hash: hash.clone(), + }), + Err(e) => { + warn!("face_watch: already_scanned errored for {}: {:?}", hash, e); + } + } + } + candidates +} + +#[cfg(test)] +mod tests { + use super::*; + + use std::fs; + use std::sync::{Arc, Mutex}; + + use diesel::prelude::*; + use tempfile::TempDir; + + use crate::database::models::{InsertImageExif, InsertLibrary}; + use crate::database::test::in_memory_db_connection; + use crate::database::{ExifDao, SqliteExifDao, schema}; + use crate::faces::{FaceDao, SqliteFaceDao}; + use crate::libraries::Library; + + fn ctx() -> opentelemetry::Context { + opentelemetry::Context::new() + } + + /// Build a tempdir-backed library + DAOs sharing a single in-memory + /// SQLite connection (so cross-table joins like + /// `list_unscanned_candidates` see consistent state). + fn setup() -> ( + TempDir, + Library, + Arc>, + Arc>>, + Arc>>, + ) { + let tmp = TempDir::new().expect("tempdir"); + let mut conn = in_memory_db_connection(); + // Migration seeds library id=1 with a placeholder root; rewrite it + // to point at the tempdir so `/` resolves to real + // files this test creates. + diesel::update(schema::libraries::table.filter(schema::libraries::id.eq(1))) + .set(schema::libraries::root_path.eq(tmp.path().to_string_lossy().to_string())) + .execute(&mut conn) + .expect("rewrite library 1 root"); + // Add a second library so cross-library skip cases have somewhere + // to put their rows. + diesel::insert_into(schema::libraries::table) + .values(InsertLibrary { + name: "other", + root_path: "/tmp/other-test-lib", + created_at: 0, + enabled: true, + excluded_dirs: None, + }) + .execute(&mut conn) + .expect("seed second library"); + + let library = Library { + id: 1, + name: "main".to_string(), + root_path: tmp.path().to_string_lossy().to_string(), + enabled: true, + excluded_dirs: Vec::new(), + }; + let shared = Arc::new(Mutex::new(conn)); + let exif_dao: Arc>> = Arc::new(Mutex::new(Box::new( + SqliteExifDao::from_shared(Arc::clone(&shared)), + ))); + let face_dao: Arc>> = Arc::new(Mutex::new(Box::new( + SqliteFaceDao::from_connection(Arc::clone(&shared)), + ))); + (tmp, library, shared, exif_dao, face_dao) + } + + fn insert_exif( + exif_dao: &Arc>>, + lib_id: i32, + rel: &str, + content_hash: Option<&str>, + ) { + let mut dao = exif_dao.lock().unwrap(); + dao.store_exif( + &ctx(), + InsertImageExif { + library_id: lib_id, + file_path: rel.to_string(), + camera_make: None, + camera_model: None, + lens_model: None, + width: None, + height: None, + orientation: None, + gps_latitude: None, + gps_longitude: None, + gps_altitude: None, + focal_length: None, + aperture: None, + shutter_speed: None, + iso: None, + date_taken: None, + created_time: 0, + last_modified: 0, + content_hash: content_hash.map(|s| s.to_string()), + size_bytes: None, + phash_64: None, + dhash_64: None, + date_taken_source: None, + }, + ) + .expect("insert"); + } + + fn write_image(root: &std::path::Path, rel: &str, bytes: &[u8]) { + let abs = root.join(rel); + if let Some(parent) = abs.parent() { + fs::create_dir_all(parent).expect("mkdir"); + } + fs::write(abs, bytes).expect("write file"); + } + + #[test] + fn backfill_unhashed_backlog_hashes_missing_rows_in_this_library() { + let (tmp, library, _conn, exif_dao, _face_dao) = setup(); + write_image(tmp.path(), "a.jpg", b"alpha-bytes"); + write_image(tmp.path(), "b.jpg", b"bravo-bytes"); + insert_exif(&exif_dao, 1, "a.jpg", None); + insert_exif(&exif_dao, 1, "b.jpg", None); + + let backfilled = backfill_unhashed_backlog(&ctx(), &library, &exif_dao); + assert_eq!(backfilled, 2); + + let mut dao = exif_dao.lock().unwrap(); + let rows = dao + .get_exif_batch(&ctx(), Some(1), &["a.jpg".to_string(), "b.jpg".to_string()]) + .unwrap(); + assert_eq!(rows.len(), 2); + for r in rows { + assert!( + r.content_hash.is_some(), + "row {} should have a hash", + r.file_path + ); + } + } + + #[test] + fn backfill_unhashed_backlog_skips_other_libraries_and_missing_files() { + let (tmp, library, _conn, exif_dao, _face_dao) = setup(); + write_image(tmp.path(), "exists.jpg", b"hello"); + // Row for this library whose file is missing on disk: + insert_exif(&exif_dao, 1, "ghost.jpg", None); + insert_exif(&exif_dao, 1, "exists.jpg", None); + // Row in the other library — must be skipped (different lib_id). + insert_exif(&exif_dao, 2, "other.jpg", None); + + let backfilled = backfill_unhashed_backlog(&ctx(), &library, &exif_dao); + assert_eq!(backfilled, 1, "only the existing in-library file hashes"); + + let mut dao = exif_dao.lock().unwrap(); + let other = dao + .get_exif_batch(&ctx(), Some(2), &["other.jpg".to_string()]) + .unwrap(); + assert_eq!(other.len(), 1); + assert!( + other[0].content_hash.is_none(), + "other-library row must remain unhashed" + ); + let ghost = dao + .get_exif_batch(&ctx(), Some(1), &["ghost.jpg".to_string()]) + .unwrap(); + assert_eq!(ghost.len(), 1); + assert!( + ghost[0].content_hash.is_none(), + "missing-on-disk row stays unhashed (reconciliation removes it later)" + ); + } + + #[test] + fn backfill_unhashed_backlog_respects_per_tick_cap() { + // Env-var-driven cap; the function reads it on every call, so we + // can set it just for this test and unset before returning. + // Serial guard: tests in the same binary may share env, but each + // backfill call re-reads — and we only care that the cap shape + // (success count <= cap, more_remain logged) holds. + unsafe { + std::env::set_var("FACE_HASH_BACKFILL_MAX_PER_TICK", "2"); + } + let (tmp, library, _conn, exif_dao, _face_dao) = setup(); + for i in 0..5 { + let rel = format!("img_{}.jpg", i); + write_image(tmp.path(), &rel, format!("bytes-{}", i).as_bytes()); + insert_exif(&exif_dao, 1, &rel, None); + } + + let backfilled = backfill_unhashed_backlog(&ctx(), &library, &exif_dao); + assert_eq!(backfilled, 2, "cap=2 must bound the per-tick successes"); + unsafe { + std::env::remove_var("FACE_HASH_BACKFILL_MAX_PER_TICK"); + } + } + + #[test] + fn backfill_missing_content_hashes_skips_videos_and_hashed_rows() { + let (tmp, library, _conn, exif_dao, _face_dao) = setup(); + // Two image rows (one already hashed, one not), one video. + write_image(tmp.path(), "fresh.jpg", b"fresh-pixels"); + write_image(tmp.path(), "already.jpg", b"already-pixels"); + write_image(tmp.path(), "clip.mp4", b"video-bytes"); + insert_exif(&exif_dao, 1, "fresh.jpg", None); + insert_exif(&exif_dao, 1, "already.jpg", Some("pre-existing-hash")); + insert_exif(&exif_dao, 1, "clip.mp4", None); + + let files: Vec<(PathBuf, String)> = vec![ + (tmp.path().join("fresh.jpg"), "fresh.jpg".to_string()), + (tmp.path().join("already.jpg"), "already.jpg".to_string()), + (tmp.path().join("clip.mp4"), "clip.mp4".to_string()), + ]; + backfill_missing_content_hashes(&ctx(), &files, &library, &exif_dao); + + let mut dao = exif_dao.lock().unwrap(); + let rows = dao + .get_exif_batch( + &ctx(), + Some(1), + &[ + "fresh.jpg".to_string(), + "already.jpg".to_string(), + "clip.mp4".to_string(), + ], + ) + .unwrap(); + let by_path: HashMap> = rows + .into_iter() + .map(|r| (r.file_path, r.content_hash)) + .collect(); + assert!( + by_path["fresh.jpg"].is_some(), + "fresh image must get a hash" + ); + assert_eq!( + by_path["already.jpg"].as_deref(), + Some("pre-existing-hash"), + "already-hashed image left untouched" + ); + assert!( + by_path["clip.mp4"].is_none(), + "video skipped (not face-scanned, no hash needed via this path)" + ); + } + + #[test] + fn build_face_candidates_filters_videos_unhashed_and_already_scanned() { + let (tmp, library, _conn, exif_dao, face_dao) = setup(); + + // Seed image_exif with: hashed unscanned, hashed scanned, unhashed, + // and a video. Files don't need to exist on disk — the function + // doesn't read them, only the DB rows. + insert_exif(&exif_dao, 1, "fresh.jpg", Some("hash-fresh")); + insert_exif(&exif_dao, 1, "scanned.jpg", Some("hash-scanned")); + insert_exif(&exif_dao, 1, "unhashed.jpg", None); + insert_exif(&exif_dao, 1, "clip.mp4", Some("hash-video")); + // Mark `scanned.jpg`'s hash as already detected. + { + let mut dao = face_dao.lock().unwrap(); + dao.mark_status(&ctx(), 1, "hash-scanned", "scanned.jpg", "no_faces", "test") + .expect("mark scanned"); + } + + let files: Vec<(PathBuf, String)> = vec![ + (tmp.path().join("fresh.jpg"), "fresh.jpg".to_string()), + (tmp.path().join("scanned.jpg"), "scanned.jpg".to_string()), + (tmp.path().join("unhashed.jpg"), "unhashed.jpg".to_string()), + (tmp.path().join("clip.mp4"), "clip.mp4".to_string()), + ]; + let candidates = build_face_candidates(&ctx(), &library, &files, &exif_dao, &face_dao); + + assert_eq!( + candidates.len(), + 1, + "exactly fresh.jpg should be a candidate" + ); + assert_eq!(candidates[0].rel_path, "fresh.jpg"); + assert_eq!(candidates[0].content_hash, "hash-fresh"); + } +} diff --git a/src/database/mod.rs b/src/database/mod.rs index 2e919f6..32b6ab5 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -797,6 +797,15 @@ impl SqliteExifDao { connection: Arc::new(Mutex::new(conn)), } } + + /// Test-only constructor that shares an already-wrapped connection. + /// Required when another DAO (e.g. `SqliteFaceDao`) needs to read + /// rows this DAO writes, so cross-table joins resolve against the + /// same in-memory SQLite instance. + #[cfg(test)] + pub fn from_shared(connection: Arc>) -> Self { + SqliteExifDao { connection } + } } impl ExifDao for SqliteExifDao { diff --git a/src/files.rs b/src/files.rs index 90d6eca..b7b035c 100644 --- a/src/files.rs +++ b/src/files.rs @@ -10,6 +10,7 @@ use std::path::{Path, PathBuf}; use std::sync::Mutex; use std::time::SystemTime; +use crate::AppState; use crate::data::{ Claims, ExifBatchRequest, ExifBatchResponse, ExifSummary, FilesRequest, FilterMode, MediaType, PhotosResponse, SortType, @@ -18,8 +19,8 @@ use crate::database::ExifDao; use crate::file_types; use crate::geo::{gps_bounding_box, haversine_distance}; use crate::memories::extract_date_from_filename; +use crate::thumbnails::create_thumbnails; use crate::utils::earliest_fs_time; -use crate::{AppState, create_thumbnails}; use actix_web::web::Data; use actix_web::{ HttpRequest, HttpResponse, diff --git a/src/lib.rs b/src/lib.rs index 187d743..04ebc54 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -32,6 +32,7 @@ pub mod state; pub mod tags; #[cfg(test)] pub mod testhelpers; +pub mod thumbnails; pub mod utils; pub mod video; @@ -39,20 +40,3 @@ pub mod video; pub use data::{Claims, ThumbnailRequest}; pub use database::{connect, schema}; pub use state::AppState; - -// Stub functions for modules that reference main.rs -// These are not used by cleanup_files binary -use std::path::Path; -use walkdir::DirEntry; - -pub fn create_thumbnails(_libs: &[libraries::Library], _excluded_dirs: &[String]) { - // Stub - implemented in main.rs -} - -pub fn update_media_counts(_media_dir: &Path, _excluded_dirs: &[String]) { - // Stub - implemented in main.rs -} - -pub fn is_video(entry: &DirEntry) -> bool { - file_types::direntry_is_video(entry) -} diff --git a/src/main.rs b/src/main.rs index 373513f..9fcd96f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,8 +9,6 @@ use actix_web::web::Data; use actix_web_prom::PrometheusMetricsBuilder; use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations}; use futures::stream::StreamExt; -use lazy_static::lazy_static; -use prometheus::{self, IntGauge}; use std::error::Error; use std::sync::{Arc, Mutex}; use std::time::{Duration, SystemTime}; @@ -23,7 +21,7 @@ use std::{ io::ErrorKind, path::{Path, PathBuf}, }; -use walkdir::{DirEntry, WalkDir}; +use walkdir::WalkDir; use actix_cors::Cors; use actix_files::NamedFile; @@ -35,7 +33,6 @@ use actix_web::{ }; use chrono::Utc; use diesel::sqlite::Sqlite; -use rayon::prelude::*; use urlencoding::decode; use crate::ai::InsightGenerator; @@ -52,8 +49,7 @@ use crate::state::AppState; use crate::tags::*; use crate::video::actors::{ GeneratePreviewClipMessage, ProcessMessage, QueueVideosMessage, ScanDirectoryMessage, - VideoPlaylistManager, create_playlist, generate_image_thumbnail_ffmpeg, - generate_video_thumbnail, + VideoPlaylistManager, create_playlist, }; use log::{debug, error, info, trace, warn}; use opentelemetry::trace::{Span, Status, TraceContextExt, Tracer}; @@ -61,6 +57,7 @@ use opentelemetry::{KeyValue, global}; mod ai; mod auth; +mod backfill; mod content_hash; mod data; mod database; @@ -70,6 +67,7 @@ mod error; mod exif; mod face_watch; mod faces; +mod file_scan; mod file_types; mod files; mod geo; @@ -78,6 +76,7 @@ mod library_maintenance; mod perceptual_hash; mod state; mod tags; +mod thumbnails; mod utils; mod video; @@ -89,19 +88,6 @@ mod service; #[cfg(test)] mod testhelpers; -lazy_static! { - static ref IMAGE_GAUGE: IntGauge = IntGauge::new( - "imageserver_image_total", - "Count of the images on the server" - ) - .unwrap(); - static ref VIDEO_GAUGE: IntGauge = IntGauge::new( - "imageserver_video_total", - "Count of the videos on the server" - ) - .unwrap(); -} - pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!(); #[get("/image")] @@ -1575,190 +1561,6 @@ async fn delete_favorite( } } -/// Sentinel path written next to a would-be thumbnail when a file cannot be -/// decoded by either the `image` crate or ffmpeg. Its presence causes future -/// scans to skip the file instead of re-logging the failure. -pub fn unsupported_thumbnail_sentinel(thumb_path: &Path) -> PathBuf { - let mut s = thumb_path.as_os_str().to_owned(); - s.push(".unsupported"); - PathBuf::from(s) -} - -fn generate_image_thumbnail(src: &Path, thumb_path: &Path) -> std::io::Result<()> { - // The `image` crate doesn't auto-apply EXIF Orientation on load, and - // saving back out as JPEG drops EXIF entirely — so without baking the - // rotation into the pixels here, browsers see the raw landscape buffer - // of a portrait phone shot and render it sideways. Read once up front - // and apply to whichever decode branch we end up taking. - let orientation = exif::read_orientation(src).unwrap_or(1); - - // RAW formats (ARW/NEF/CR2/etc): try the file's embedded JPEG preview - // first. Avoids ffmpeg choking on proprietary RAW compression (Sony ARW - // in particular), and is faster than decoding RAW pixels anyway. - if let Some(preview) = exif::extract_embedded_jpeg_preview(src) { - let img = image::load_from_memory(&preview).map_err(|e| { - std::io::Error::new( - std::io::ErrorKind::InvalidData, - format!("decode embedded preview {:?}: {}", src, e), - ) - })?; - let img = exif::apply_orientation(img, orientation); - let scaled = img.thumbnail(200, u32::MAX); - scaled - .save_with_format(thumb_path, image::ImageFormat::Jpeg) - .map_err(|e| std::io::Error::other(format!("save {:?}: {}", thumb_path, e)))?; - return Ok(()); - } - - if file_types::needs_ffmpeg_thumbnail(src) { - return generate_image_thumbnail_ffmpeg(src, thumb_path); - } - - let img = image::open(src).map_err(|e| { - std::io::Error::new(std::io::ErrorKind::InvalidData, format!("{:?}: {}", src, e)) - })?; - let img = exif::apply_orientation(img, orientation); - let scaled = img.thumbnail(200, u32::MAX); - scaled - .save(thumb_path) - .map_err(|e| std::io::Error::other(format!("save {:?}: {}", thumb_path, e)))?; - Ok(()) -} - -fn create_thumbnails(libs: &[libraries::Library], excluded_dirs: &[String]) { - let tracer = global_tracer(); - let span = tracer.start("creating thumbnails"); - - let thumbs = &dotenv::var("THUMBNAILS").expect("THUMBNAILS not defined"); - let thumbnail_directory: &Path = Path::new(thumbs); - - for lib in libs { - info!( - "Scanning thumbnails for library '{}' at {}", - lib.name, lib.root_path - ); - let images = PathBuf::from(&lib.root_path); - // Effective excludes = global env-var excludes ∪ library row's - // excluded_dirs. Lets a parent-library mount skip the subtree - // already covered by a child library. - let effective_excludes = lib.effective_excluded_dirs(excluded_dirs); - - // Prune EXCLUDED_DIRS so we don't generate thumbnails-of-thumbnails - // for Synology @eaDir trees. file_scan handles filter_entry pruning. - image_api::file_scan::walk_library_files(&images, &effective_excludes) - .into_par_iter() - .for_each(|entry| { - let src = entry.path(); - let Ok(relative_path) = src.strip_prefix(&images) else { - return; - }; - // Library-scoped legacy path: prevents two libraries with - // the same rel_path from clobbering each other's thumbs. - // Hash-keyed promotion happens lazily on first hash-aware - // request — keeping this loop ExifDao-free preserves the - // current "cargo build && go" startup story. - let thumb_path = content_hash::library_scoped_legacy_path( - thumbnail_directory, - lib.id, - relative_path, - ); - let bare_legacy = thumbnail_directory.join(relative_path); - - // Backwards-compat check: if a single-library install has a - // bare-legacy thumb here already, accept it as present. - // Same for the sentinel. Means we don't redo work after - // upgrade and we don't leave stale duplicates around. - if thumb_path.exists() - || bare_legacy.exists() - || unsupported_thumbnail_sentinel(&thumb_path).exists() - || unsupported_thumbnail_sentinel(&bare_legacy).exists() - { - return; - } - - let Some(parent) = thumb_path.parent() else { - return; - }; - if let Err(e) = std::fs::create_dir_all(parent) { - error!("Failed to create thumbnail dir {:?}: {}", parent, e); - return; - } - - if is_video(&entry) { - let mut video_span = tracer.start_with_context( - "generate_video_thumbnail", - &opentelemetry::Context::new() - .with_remote_span_context(span.span_context().clone()), - ); - video_span.set_attributes(vec![ - KeyValue::new("type", "video"), - KeyValue::new("file-name", thumb_path.display().to_string()), - KeyValue::new("library", lib.name.clone()), - ]); - - debug!("Generating video thumbnail: {:?}", thumb_path); - if let Err(e) = generate_video_thumbnail(src, &thumb_path) { - let sentinel = unsupported_thumbnail_sentinel(&thumb_path); - error!( - "Unable to thumbnail video {:?}: {}. Writing sentinel {:?}", - src, e, sentinel - ); - if let Err(se) = std::fs::write(&sentinel, b"") { - warn!("Failed to write sentinel {:?}: {}", sentinel, se); - } - } - video_span.end(); - } else if is_image(&entry) { - match generate_image_thumbnail(src, &thumb_path) { - Ok(_) => info!("Saved thumbnail: {:?}", thumb_path), - Err(e) => { - let sentinel = unsupported_thumbnail_sentinel(&thumb_path); - error!( - "Unable to thumbnail {:?}: {}. Writing sentinel {:?}", - src, e, sentinel - ); - if let Err(se) = std::fs::write(&sentinel, b"") { - warn!("Failed to write sentinel {:?}: {}", sentinel, se); - } - } - } - } - }); - } - - debug!("Finished making thumbnails"); - - for lib in libs { - let effective_excludes = lib.effective_excluded_dirs(excluded_dirs); - update_media_counts(Path::new(&lib.root_path), &effective_excludes); - } -} - -fn update_media_counts(media_dir: &Path, excluded_dirs: &[String]) { - let mut image_count = 0; - let mut video_count = 0; - for entry in image_api::file_scan::walk_library_files(media_dir, excluded_dirs) { - if is_image(&entry) { - image_count += 1; - } else if is_video(&entry) { - video_count += 1; - } - } - - IMAGE_GAUGE.set(image_count); - VIDEO_GAUGE.set(video_count); -} - -fn is_image(entry: &DirEntry) -> bool { - use image_api::file_types; - file_types::direntry_is_image(entry) -} - -fn is_video(entry: &DirEntry) -> bool { - use image_api::file_types; - file_types::direntry_is_video(entry) -} - fn main() -> std::io::Result<()> { if let Err(err) = dotenv::dotenv() { println!("Error parsing .env {:?}", err); @@ -1791,7 +1593,7 @@ fn main() -> std::io::Result<()> { let libs = app_data.libraries.clone(); let excluded = app_data.excluded_dirs.clone(); std::thread::spawn(move || { - create_thumbnails(&libs, &excluded); + thumbnails::create_thumbnails(&libs, &excluded); }); } // generate_video_gifs().await; @@ -1804,11 +1606,11 @@ fn main() -> std::io::Result<()> { prometheus .registry - .register(Box::new(IMAGE_GAUGE.clone())) + .register(Box::new(thumbnails::IMAGE_GAUGE.clone())) .unwrap(); prometheus .registry - .register(Box::new(VIDEO_GAUGE.clone())) + .register(Box::new(thumbnails::VIDEO_GAUGE.clone())) .unwrap(); let app_state = app_data.clone(); @@ -2379,8 +2181,8 @@ fn watch_files( if face_client.is_enabled() { let context = opentelemetry::Context::new(); - backfill_unhashed_backlog(&context, lib, &exif_dao); - process_face_backlog( + backfill::backfill_unhashed_backlog(&context, lib, &exif_dao); + backfill::process_face_backlog( &context, lib, &face_client, @@ -2396,7 +2198,7 @@ fn watch_files( // configure Apollo, since `/memories` depends on it. { let context = opentelemetry::Context::new(); - backfill_missing_date_taken(&context, lib, &exif_dao); + backfill::backfill_missing_date_taken(&context, lib, &exif_dao); } if is_full_scan { @@ -2440,7 +2242,7 @@ fn watch_files( } // Update media counts per library (metric aggregates across all) - update_media_counts(Path::new(&lib.root_path), &effective_excludes); + thumbnails::update_media_counts(Path::new(&lib.root_path), &effective_excludes); // Missing-file detection: prune image_exif rows whose // source file is no longer on disk. Per-library, so we @@ -2602,8 +2404,8 @@ fn process_new_files( let bare_legacy_thumb_path = thumbnail_directory.join(relative_path); let needs_thumbnail = !scoped_thumb_path.exists() && !bare_legacy_thumb_path.exists() - && !unsupported_thumbnail_sentinel(&scoped_thumb_path).exists() - && !unsupported_thumbnail_sentinel(&bare_legacy_thumb_path).exists(); + && !thumbnails::unsupported_thumbnail_sentinel(&scoped_thumb_path).exists() + && !thumbnails::unsupported_thumbnail_sentinel(&bare_legacy_thumb_path).exists(); let needs_row = !existing_exif_paths.contains_key(relative_path); if needs_thumbnail || needs_row { @@ -2738,8 +2540,9 @@ fn process_new_files( // right tool for very large legacy libraries; this branch // ensures small/medium deploys self-heal without operator // action. - backfill_missing_content_hashes(&context, &files, library, &exif_dao); - let candidates = build_face_candidates(&context, library, &files, &exif_dao, &face_dao); + backfill::backfill_missing_content_hashes(&context, &files, library, &exif_dao); + let candidates = + backfill::build_face_candidates(&context, library, &files, &exif_dao, &face_dao); debug!( "face_watch: scan tick — {} image file(s) walked, {} candidate(s) (library '{}', modified_since={})", files.iter().filter(|(p, _)| !is_video_file(p)).count(), @@ -2837,7 +2640,7 @@ fn process_new_files( // Generate thumbnails for all files that need them if new_files_found { info!("Processing thumbnails for new files..."); - create_thumbnails(std::slice::from_ref(library), excluded_dirs); + thumbnails::create_thumbnails(std::slice::from_ref(library), excluded_dirs); } // Reconciliation: on a full scan, prune image_exif rows whose rel_path no @@ -2883,419 +2686,6 @@ fn process_new_files( } } -/// Compute and persist content_hash for image_exif rows where it's NULL. -/// -/// Bounded per call by `FACE_HASH_BACKFILL_MAX_PER_TICK` (default 500) so -/// a watcher tick on a large legacy library doesn't block for hours -/// blake3-ing every photo at once. Subsequent scans pick up the rest. -/// For 50k+ libraries the dedicated `cargo run --bin backfill_hashes` -/// is still faster (it doesn't fight a watcher loop for the DAO mutex). -/// Drain unhashed image_exif rows by querying them directly, independent -/// of the filesystem walk. Quick scans only walk recently-modified -/// files, so a backlog of pre-existing unhashed rows never enters -/// `process_new_files`'s candidate set — left alone, it would only -/// drain on full scans (default once an hour). Calling this every tick -/// keeps the face-detection backlog moving regardless. -/// -/// Returns the number of rows successfully backfilled this pass. -fn backfill_unhashed_backlog( - context: &opentelemetry::Context, - library: &libraries::Library, - exif_dao: &Arc>>, -) -> usize { - let cap: i64 = dotenv::var("FACE_HASH_BACKFILL_MAX_PER_TICK") - .ok() - .and_then(|s| s.parse().ok()) - .filter(|n: &i64| *n > 0) - .unwrap_or(2000); - - // Fetch up to cap+1 rows so we can tell "more remain" without a - // separate count query. Across libraries — there's no per-library - // filter on get_rows_missing_hash today — but we only ever update - // rows whose library_id matches the caller's library, so other - // libraries' rows just get skipped here and picked up on the next - // library's tick. Negligible cost given the cap. - let rows: Vec<(i32, String)> = { - let mut dao = exif_dao.lock().expect("Unable to lock ExifDao"); - dao.get_rows_missing_hash(context, cap + 1) - .unwrap_or_default() - }; - if rows.is_empty() { - return 0; - } - - let more_than_cap = rows.len() as i64 > cap; - let base_path = std::path::Path::new(&library.root_path); - - let mut backfilled = 0usize; - let mut errors = 0usize; - let mut skipped_other_lib = 0usize; - for (lib_id, rel_path) in rows.iter().take(cap as usize) { - if *lib_id != library.id { - skipped_other_lib += 1; - continue; - } - let abs = base_path.join(rel_path); - if !abs.exists() { - // File walked away — the watcher's reconciliation pass will - // remove the orphan exif row eventually. - continue; - } - match content_hash::compute(&abs) { - Ok(id) => { - let mut dao = exif_dao.lock().expect("Unable to lock ExifDao"); - if let Err(e) = dao.backfill_content_hash( - context, - library.id, - rel_path, - &id.content_hash, - id.size_bytes, - ) { - warn!( - "face_watch: backfill_content_hash failed for {}: {:?}", - rel_path, e - ); - errors += 1; - } else { - backfilled += 1; - } - } - Err(e) => { - debug!( - "face_watch: hash compute failed for {} ({:?})", - abs.display(), - e - ); - errors += 1; - } - } - } - - if backfilled > 0 || errors > 0 || more_than_cap { - info!( - "face_watch: backfill pass for library '{}': hashed {} ({} error(s), {} skipped to other libraries; {} cap, more_remain={})", - library.name, backfilled, errors, skipped_other_lib, cap, more_than_cap - ); - } - backfilled -} - -/// Drain image_exif rows whose `date_taken` was never resolved or was -/// resolved by the weakest fallback (`fs_time`). Runs the canonical-date -/// waterfall — exiftool batch (one subprocess for the whole tick's -/// rows) → filename regex → earliest_fs_time — and persists each -/// resolution with its source tag. Capped per tick by -/// `DATE_BACKFILL_MAX_PER_TICK` (default 500) so a 14k-row library -/// drains over a few quick-scan ticks without blocking the watcher. -/// -/// kamadak-exif is intentionally skipped here: the row already has a -/// NULL date_taken because the ingest path's kamadak-exif call returned -/// nothing, and re-running it would just produce the same answer. -/// exiftool is the meaningful new attempt — it handles videos and -/// MakerNote-hosted dates kamadak can't reach. -fn backfill_missing_date_taken( - context: &opentelemetry::Context, - library: &libraries::Library, - exif_dao: &Arc>>, -) -> usize { - let cap: i64 = dotenv::var("DATE_BACKFILL_MAX_PER_TICK") - .ok() - .and_then(|s| s.parse().ok()) - .filter(|n: &i64| *n > 0) - .unwrap_or(500); - - let rows: Vec<(i32, String)> = { - let mut dao = exif_dao.lock().expect("Unable to lock ExifDao"); - dao.get_rows_needing_date_backfill(context, library.id, cap + 1) - .unwrap_or_default() - }; - if rows.is_empty() { - return 0; - } - - let more_than_cap = rows.len() as i64 > cap; - let base_path = std::path::Path::new(&library.root_path); - - // Build absolute paths and drop rows whose files no longer exist — - // the missing-file scan in library_maintenance retires deleted rows - // separately. Without this filter, NULL-date rows for missing files - // would loop through the drain forever (no source can resolve them). - let mut existing: Vec<(String, PathBuf)> = Vec::with_capacity(rows.len() as usize); - for (_, rel_path) in rows.iter().take(cap as usize) { - let abs = base_path.join(rel_path); - if abs.exists() { - existing.push((rel_path.clone(), abs)); - } - } - if existing.is_empty() { - return 0; - } - - // One exiftool subprocess for the whole batch; the resolver falls - // through to filename / fs_time per file when exiftool can't supply - // a date (or isn't installed at all). - let paths: Vec = existing.iter().map(|(_, p)| p.clone()).collect(); - let resolved = date_resolver::resolve_dates_batch(&paths, &HashMap::new()); - - let mut backfilled = 0usize; - let mut unresolved = 0usize; - let mut by_source: HashMap<&'static str, usize> = HashMap::new(); - { - let mut dao = exif_dao.lock().expect("Unable to lock ExifDao"); - for (rel_path, abs) in &existing { - let Some(rd) = resolved.get(abs).copied() else { - unresolved += 1; - continue; - }; - match dao.backfill_date_taken( - context, - library.id, - rel_path, - rd.timestamp, - rd.source.as_str(), - ) { - Ok(()) => { - backfilled += 1; - *by_source.entry(rd.source.as_str()).or_insert(0) += 1; - } - Err(e) => { - warn!( - "date_backfill: update failed for lib {} {}: {:?}", - library.id, rel_path, e - ); - } - } - } - } - - if backfilled > 0 || unresolved > 0 || more_than_cap { - info!( - "date_backfill: library '{}': resolved {} ({:?}), {} unresolved, cap={}, more_remain={}", - library.name, backfilled, by_source, unresolved, cap, more_than_cap - ); - } - backfilled -} - -/// Per-tick face-detection drain. Pulls a capped batch of hashed-but- -/// unscanned image_exif rows directly via the FaceDao anti-join and -/// hands them to the existing detection pass. Runs on every tick (not -/// just full scans) so the backlog moves at quick-scan cadence. -fn process_face_backlog( - context: &opentelemetry::Context, - library: &libraries::Library, - face_client: &crate::ai::face_client::FaceClient, - face_dao: &Arc>>, - tag_dao: &Arc>>, - excluded_dirs: &[String], -) { - let cap: i64 = dotenv::var("FACE_BACKLOG_MAX_PER_TICK") - .ok() - .and_then(|s| s.parse().ok()) - .filter(|n: &i64| *n > 0) - .unwrap_or(64); - - let rows: Vec<(String, String)> = { - let mut dao = face_dao.lock().expect("face dao"); - match dao.list_unscanned_candidates(context, library.id, cap) { - Ok(r) => r, - Err(e) => { - warn!( - "face_watch: list_unscanned_candidates failed for library '{}': {:?}", - library.name, e - ); - return; - } - } - }; - if rows.is_empty() { - return; - } - - info!( - "face_watch: backlog drain — running detection on {} candidate(s) for library '{}' (cap={})", - rows.len(), - library.name, - cap - ); - - let candidates: Vec = rows - .into_iter() - .map(|(rel_path, content_hash)| face_watch::FaceCandidate { - rel_path, - content_hash, - }) - .collect(); - - face_watch::run_face_detection_pass( - library, - excluded_dirs, - face_client, - Arc::clone(face_dao), - Arc::clone(tag_dao), - candidates, - ); -} - -fn backfill_missing_content_hashes( - context: &opentelemetry::Context, - files: &[(PathBuf, String)], - library: &libraries::Library, - exif_dao: &Arc>>, -) { - let image_paths: Vec = files - .iter() - .filter(|(p, _)| !is_video_file(p)) - .map(|(_, rel)| rel.clone()) - .collect(); - if image_paths.is_empty() { - return; - } - - let exif_records = { - let mut dao = exif_dao.lock().expect("Unable to lock ExifDao"); - dao.get_exif_batch(context, Some(library.id), &image_paths) - .unwrap_or_default() - }; - // Cheap lookup back from rel_path → absolute file_path so - // content_hash::compute can read the bytes. - let path_by_rel: HashMap = - files.iter().map(|(p, rel)| (rel.clone(), p)).collect(); - - let cap: usize = dotenv::var("FACE_HASH_BACKFILL_MAX_PER_TICK") - .ok() - .and_then(|s| s.parse().ok()) - .filter(|n: &usize| *n > 0) - .unwrap_or(2000); - - // Count the unhashed backlog up front so we can surface "still needs - // backfill: N" in the log — without it, a face-scan that's stuck at - // 44% looks stalled when really it's chipping through hashes. - let unhashed_total = exif_records - .iter() - .filter(|r| r.content_hash.is_none()) - .count(); - - let mut backfilled = 0usize; - let mut errors = 0usize; - for record in &exif_records { - // Cap on successes only — earlier this counted errors too, so a - // pocket of chronically-unhashable files at the front of the - // table (vanished mid-scan, permission denied, etc.) burned the - // budget every tick and the rest of the backlog never advanced. - // Errors are still bounded by `unhashed_total` (the loop walks - // each unhashed record at most once per tick). - if backfilled >= cap { - break; - } - if record.content_hash.is_some() { - continue; - } - let Some(file_path) = path_by_rel.get(&record.file_path) else { - // Walked file went missing between the directory scan and now; - // next tick will retry naturally. - continue; - }; - match content_hash::compute(file_path) { - Ok(id) => { - let mut dao = exif_dao.lock().expect("Unable to lock ExifDao"); - if let Err(e) = dao.backfill_content_hash( - context, - library.id, - &record.file_path, - &id.content_hash, - id.size_bytes, - ) { - warn!( - "face_watch: backfill_content_hash failed for {}: {:?}", - record.file_path, e - ); - errors += 1; - } else { - backfilled += 1; - } - } - Err(e) => { - debug!( - "face_watch: hash compute failed for {} ({:?})", - file_path.display(), - e - ); - errors += 1; - } - } - } - // Always log when there's an unhashed backlog so an operator - // looking at "scan stuck at 44%" can see backfill is running and - // how much remains. Quiet only when there's nothing to do. - if unhashed_total > 0 || backfilled > 0 || errors > 0 { - let remaining = unhashed_total.saturating_sub(backfilled); - info!( - "face_watch: backfilled {}/{} content_hash for library '{}' ({} error(s); {} still need backfill; cap={})", - backfilled, unhashed_total, library.name, errors, remaining, cap - ); - } -} - -/// Build the face-detection candidate list for a scan tick. -/// -/// We need `(rel_path, content_hash)` for every image file that has a -/// content_hash recorded in image_exif but no row in face_detections yet. -/// Re-querying image_exif here picks up rows the EXIF write loop just -/// inserted alongside any pre-existing rows the watcher walked over — -/// covers both new uploads and the initial backlog scan. -fn build_face_candidates( - context: &opentelemetry::Context, - library: &libraries::Library, - files: &[(PathBuf, String)], - exif_dao: &Arc>>, - face_dao: &Arc>>, -) -> Vec { - // Restrict to image files; videos aren't face-scanned in v1 (kamadak - // doesn't even register them in image_exif). - let image_paths: Vec = files - .iter() - .filter(|(p, _)| !is_video_file(p)) - .map(|(_, rel)| rel.clone()) - .collect(); - if image_paths.is_empty() { - return Vec::new(); - } - - let exif_records = { - let mut dao = exif_dao.lock().expect("Unable to lock ExifDao"); - dao.get_exif_batch(context, Some(library.id), &image_paths) - .unwrap_or_default() - }; - // rel_path → content_hash (only rows with a hash; without one we have - // nothing to key face data against). - let mut hash_by_path: HashMap = HashMap::with_capacity(exif_records.len()); - for record in exif_records { - if let Some(h) = record.content_hash { - hash_by_path.insert(record.file_path, h); - } - } - - let mut candidates = Vec::new(); - let mut dao = face_dao.lock().expect("face dao"); - for rel_path in image_paths { - let Some(hash) = hash_by_path.get(&rel_path) else { - continue; - }; - match dao.already_scanned(context, hash) { - Ok(true) => continue, - Ok(false) => candidates.push(face_watch::FaceCandidate { - rel_path, - content_hash: hash.clone(), - }), - Err(e) => { - warn!("face_watch: already_scanned errored for {}: {:?}", hash, e); - } - } - } - candidates -} - #[cfg(test)] mod tests { use super::*; diff --git a/src/thumbnails.rs b/src/thumbnails.rs new file mode 100644 index 0000000..c2ae23b --- /dev/null +++ b/src/thumbnails.rs @@ -0,0 +1,275 @@ +//! Thumbnail generation + the media-count Prometheus gauges. +//! +//! Startup and per-tick scans walk each library and produce a 200×200 +//! thumbnail under `THUMBNAILS//`, falling through +//! a fast path (`image` crate), a RAW-preview path (`exif::extract_embedded_jpeg_preview`), +//! and ffmpeg for video / HEIF / NEF / ARW. Files that fail every +//! decoder get a sibling `.unsupported` sentinel so subsequent scans +//! skip them silently. + +use std::path::{Path, PathBuf}; + +use lazy_static::lazy_static; +use log::{debug, error, info, warn}; +use opentelemetry::{ + KeyValue, + trace::{Span, TraceContextExt, Tracer}, +}; +use prometheus::IntGauge; +use rayon::prelude::*; +use walkdir::DirEntry; + +use crate::content_hash; +use crate::exif; +use crate::file_types; +use crate::libraries; +use crate::otel::global_tracer; +use crate::video::actors::{generate_image_thumbnail_ffmpeg, generate_video_thumbnail}; + +lazy_static! { + pub static ref IMAGE_GAUGE: IntGauge = IntGauge::new( + "imageserver_image_total", + "Count of the images on the server" + ) + .unwrap(); + pub static ref VIDEO_GAUGE: IntGauge = IntGauge::new( + "imageserver_video_total", + "Count of the videos on the server" + ) + .unwrap(); +} + +/// Sentinel path written next to a would-be thumbnail when a file cannot be +/// decoded by either the `image` crate or ffmpeg. Its presence causes future +/// scans to skip the file instead of re-logging the failure. +pub fn unsupported_thumbnail_sentinel(thumb_path: &Path) -> PathBuf { + let mut s = thumb_path.as_os_str().to_owned(); + s.push(".unsupported"); + PathBuf::from(s) +} + +pub fn generate_image_thumbnail(src: &Path, thumb_path: &Path) -> std::io::Result<()> { + // The `image` crate doesn't auto-apply EXIF Orientation on load, and + // saving back out as JPEG drops EXIF entirely — so without baking the + // rotation into the pixels here, browsers see the raw landscape buffer + // of a portrait phone shot and render it sideways. Read once up front + // and apply to whichever decode branch we end up taking. + let orientation = exif::read_orientation(src).unwrap_or(1); + + // RAW formats (ARW/NEF/CR2/etc): try the file's embedded JPEG preview + // first. Avoids ffmpeg choking on proprietary RAW compression (Sony ARW + // in particular), and is faster than decoding RAW pixels anyway. + if let Some(preview) = exif::extract_embedded_jpeg_preview(src) { + let img = image::load_from_memory(&preview).map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("decode embedded preview {:?}: {}", src, e), + ) + })?; + let img = exif::apply_orientation(img, orientation); + let scaled = img.thumbnail(200, u32::MAX); + scaled + .save_with_format(thumb_path, image::ImageFormat::Jpeg) + .map_err(|e| std::io::Error::other(format!("save {:?}: {}", thumb_path, e)))?; + return Ok(()); + } + + if file_types::needs_ffmpeg_thumbnail(src) { + return generate_image_thumbnail_ffmpeg(src, thumb_path); + } + + let img = image::open(src).map_err(|e| { + std::io::Error::new(std::io::ErrorKind::InvalidData, format!("{:?}: {}", src, e)) + })?; + let img = exif::apply_orientation(img, orientation); + let scaled = img.thumbnail(200, u32::MAX); + scaled + .save(thumb_path) + .map_err(|e| std::io::Error::other(format!("save {:?}: {}", thumb_path, e)))?; + Ok(()) +} + +pub fn create_thumbnails(libs: &[libraries::Library], excluded_dirs: &[String]) { + let tracer = global_tracer(); + let span = tracer.start("creating thumbnails"); + + let thumbs = &dotenv::var("THUMBNAILS").expect("THUMBNAILS not defined"); + let thumbnail_directory: &Path = Path::new(thumbs); + + for lib in libs { + info!( + "Scanning thumbnails for library '{}' at {}", + lib.name, lib.root_path + ); + let images = PathBuf::from(&lib.root_path); + // Effective excludes = global env-var excludes ∪ library row's + // excluded_dirs. Lets a parent-library mount skip the subtree + // already covered by a child library. + let effective_excludes = lib.effective_excluded_dirs(excluded_dirs); + + // Prune EXCLUDED_DIRS so we don't generate thumbnails-of-thumbnails + // for Synology @eaDir trees. file_scan handles filter_entry pruning. + crate::file_scan::walk_library_files(&images, &effective_excludes) + .into_par_iter() + .for_each(|entry| { + let src = entry.path(); + let Ok(relative_path) = src.strip_prefix(&images) else { + return; + }; + // Library-scoped legacy path: prevents two libraries with + // the same rel_path from clobbering each other's thumbs. + // Hash-keyed promotion happens lazily on first hash-aware + // request — keeping this loop ExifDao-free preserves the + // current "cargo build && go" startup story. + let thumb_path = content_hash::library_scoped_legacy_path( + thumbnail_directory, + lib.id, + relative_path, + ); + let bare_legacy = thumbnail_directory.join(relative_path); + + // Backwards-compat check: if a single-library install has a + // bare-legacy thumb here already, accept it as present. + // Same for the sentinel. Means we don't redo work after + // upgrade and we don't leave stale duplicates around. + if thumb_path.exists() + || bare_legacy.exists() + || unsupported_thumbnail_sentinel(&thumb_path).exists() + || unsupported_thumbnail_sentinel(&bare_legacy).exists() + { + return; + } + + let Some(parent) = thumb_path.parent() else { + return; + }; + if let Err(e) = std::fs::create_dir_all(parent) { + error!("Failed to create thumbnail dir {:?}: {}", parent, e); + return; + } + + if is_video(&entry) { + let mut video_span = tracer.start_with_context( + "generate_video_thumbnail", + &opentelemetry::Context::new() + .with_remote_span_context(span.span_context().clone()), + ); + video_span.set_attributes(vec![ + KeyValue::new("type", "video"), + KeyValue::new("file-name", thumb_path.display().to_string()), + KeyValue::new("library", lib.name.clone()), + ]); + + debug!("Generating video thumbnail: {:?}", thumb_path); + if let Err(e) = generate_video_thumbnail(src, &thumb_path) { + let sentinel = unsupported_thumbnail_sentinel(&thumb_path); + error!( + "Unable to thumbnail video {:?}: {}. Writing sentinel {:?}", + src, e, sentinel + ); + if let Err(se) = std::fs::write(&sentinel, b"") { + warn!("Failed to write sentinel {:?}: {}", sentinel, se); + } + } + video_span.end(); + } else if is_image(&entry) { + match generate_image_thumbnail(src, &thumb_path) { + Ok(_) => info!("Saved thumbnail: {:?}", thumb_path), + Err(e) => { + let sentinel = unsupported_thumbnail_sentinel(&thumb_path); + error!( + "Unable to thumbnail {:?}: {}. Writing sentinel {:?}", + src, e, sentinel + ); + if let Err(se) = std::fs::write(&sentinel, b"") { + warn!("Failed to write sentinel {:?}: {}", sentinel, se); + } + } + } + } + }); + } + + debug!("Finished making thumbnails"); + + for lib in libs { + let effective_excludes = lib.effective_excluded_dirs(excluded_dirs); + update_media_counts(Path::new(&lib.root_path), &effective_excludes); + } +} + +pub fn update_media_counts(media_dir: &Path, excluded_dirs: &[String]) { + let mut image_count = 0; + let mut video_count = 0; + for entry in crate::file_scan::walk_library_files(media_dir, excluded_dirs) { + if is_image(&entry) { + image_count += 1; + } else if is_video(&entry) { + video_count += 1; + } + } + + IMAGE_GAUGE.set(image_count); + VIDEO_GAUGE.set(video_count); +} + +pub fn is_image(entry: &DirEntry) -> bool { + file_types::direntry_is_image(entry) +} + +pub fn is_video(entry: &DirEntry) -> bool { + file_types::direntry_is_video(entry) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::fs; + use tempfile::TempDir; + + #[test] + fn unsupported_thumbnail_sentinel_appends_suffix() { + let p = Path::new("/thumbs/lib1/photo.jpg"); + let s = unsupported_thumbnail_sentinel(p); + assert_eq!(s, PathBuf::from("/thumbs/lib1/photo.jpg.unsupported")); + } + + #[test] + fn unsupported_thumbnail_sentinel_preserves_extension_so_existing_thumb_is_distinct() { + // A future scan checks both `thumb.exists()` and + // `sentinel.exists()` — they must be distinct paths. + let p = Path::new("foo.jpeg"); + let s = unsupported_thumbnail_sentinel(p); + assert_ne!(s, PathBuf::from("foo.jpeg")); + assert!(s.to_string_lossy().ends_with(".unsupported")); + } + + #[test] + fn unsupported_thumbnail_sentinel_handles_paths_without_extension() { + let p = Path::new("/thumbs/notes"); + let s = unsupported_thumbnail_sentinel(p); + assert_eq!(s, PathBuf::from("/thumbs/notes.unsupported")); + } + + /// Smoke-test update_media_counts: build a tempdir with two images + /// and one video, run the walker, and assert the gauges line up. + /// Exercises the is_image / is_video classifier on real DirEntry + /// values without needing a Prometheus registry. + #[test] + fn update_media_counts_counts_images_and_videos_in_tempdir() { + let tmp = TempDir::new().expect("tempdir"); + fs::write(tmp.path().join("a.jpg"), b"").unwrap(); + fs::write(tmp.path().join("b.png"), b"").unwrap(); + fs::write(tmp.path().join("c.mp4"), b"").unwrap(); + fs::write(tmp.path().join("notes.txt"), b"").unwrap(); + // Reset gauges first in case another test mutated them — the + // gauges are process-global statics. + IMAGE_GAUGE.set(0); + VIDEO_GAUGE.set(0); + + update_media_counts(tmp.path(), &[]); + + assert_eq!(IMAGE_GAUGE.get(), 2, "jpg + png"); + assert_eq!(VIDEO_GAUGE.get(), 1, "mp4"); + } +} diff --git a/src/video/actors.rs b/src/video/actors.rs index 808824b..eb539ef 100644 --- a/src/video/actors.rs +++ b/src/video/actors.rs @@ -1,7 +1,7 @@ use crate::database::PreviewDao; -use crate::is_video; use crate::libraries::Library; use crate::otel::global_tracer; +use crate::thumbnails::is_video; use crate::video::ffmpeg::{generate_preview_clip, get_duration_seconds_blocking}; use actix::prelude::*; use log::{debug, error, info, trace, warn}; diff --git a/src/video/mod.rs b/src/video/mod.rs index 8d9c3b0..8078a1f 100644 --- a/src/video/mod.rs +++ b/src/video/mod.rs @@ -1,6 +1,6 @@ use crate::otel::global_tracer; +use crate::thumbnails::{is_video, update_media_counts}; use crate::video::ffmpeg::{Ffmpeg, GifType}; -use crate::{is_video, update_media_counts}; use log::info; use opentelemetry::trace::Tracer; use std::fs;