Split main.rs: extract watcher loop into src/watcher.rs
main.rs drops from 1200 → 346 lines (90% smaller than the pre-branch 3542). What's left is the startup wiring it was always meant to be: .env, migrations, AppState construction, route registration, server bind.

The four background-loop functions move into src/watcher.rs:

- watch_files (310 lines) — quick/full scan tick, per-library probe, backfill drain dispatch, missing-file scan, back-ref refresh, orphan GC.
- process_new_files (351 lines) — file walk → EXIF write → face-candidate build → HLS / preview-clip queueing → reconciliation. The "biggest untested chunk" from the earlier audit.
- cleanup_orphaned_playlists (167 lines) — separate slower-tick thread.
- playlist_needs_generation — small mtime-comparison helper.

Plus 4 unit tests for playlist_needs_generation (covers missing playlist, newer playlist, newer video, video-missing-metadata fallback).

main.rs's imports correspondingly shrink — Addr, HashSet, WalkDir, Utc, InsertImageExif, and the bulk of video::actors all leave with the watcher.

CLAUDE.md updated to reflect the new module layout (layered architecture box + module map for the face-detection section).

cargo test --bin image-api: 329 passing (no regression).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
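The mtime-comparison helper named above is small enough to sketch in full. This is a condensed restatement of the moved function (a `match` in place of the original let-chain, same logic), showing the fallback behavior the four unit tests exercise:

```rust
use std::path::Path;

/// Returns true when the HLS playlist must be (re)generated:
/// it is missing, the source video is newer, or mtimes can't be read.
fn playlist_needs_generation(video_path: &Path, playlist_path: &Path) -> bool {
    if !playlist_path.exists() {
        return true;
    }
    match (
        std::fs::metadata(video_path).and_then(|m| m.modified()),
        std::fs::metadata(playlist_path).and_then(|m| m.modified()),
    ) {
        (Ok(video_mtime), Ok(playlist_mtime)) => video_mtime > playlist_mtime,
        // Can't compare the timestamps: err on the side of regenerating.
        _ => true,
    }
}

fn main() {
    // Missing playlist → regenerate (the first of the four tested cases).
    assert!(playlist_needs_generation(
        Path::new("video.mp4"),
        Path::new("/nonexistent/playlist.m3u8"),
    ));
}
```

The video-missing-metadata fallback is the `_ => true` arm: an unreadable source stat forces regeneration rather than silently keeping a possibly stale playlist.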
CLAUDE.md (15 lines changed)
@@ -76,7 +76,10 @@ cargo run --bin cleanup_files -- --base-path /path/to/media --database-url ./dat
 ### Core Components
 
 **Layered Architecture:**
-- **HTTP Layer** (`main.rs`): Route handlers for images, videos, metadata, tags, favorites, memories
+- **Startup wiring** (`main.rs`): only ~350 lines — env load, migrations, AppState, route registration, server bind. Background jobs are kicked off here but defined elsewhere.
+- **HTTP Layer** (`handlers/{image,video,favorites}.rs`, `files.rs`, `tags.rs`, `faces.rs`, `memories.rs`, `ai/handlers.rs`): the route handlers, grouped by domain.
+- **Background loops** (`watcher.rs`): the file-watcher tick (`watch_files`, `process_new_files`) and the orphaned-playlist cleanup (`cleanup_orphaned_playlists`). Per-tick drains are factored into `backfill.rs` (`backfill_unhashed_backlog`, `backfill_missing_date_taken`, `backfill_missing_content_hashes`, `process_face_backlog`, `build_face_candidates`).
+- **Thumbnails** (`thumbnails.rs`): generation pipeline + the `IMAGE_GAUGE` / `VIDEO_GAUGE` Prometheus metrics.
 - **Auth Layer** (`auth.rs`): JWT token validation, Claims extraction via FromRequest trait
 - **Service Layer** (`files.rs`, `exif.rs`, `memories.rs`): Business logic for file operations and EXIF extraction
 - **DAO Layer** (`database/mod.rs`): Trait-based data access (ExifDao, UserDao, FavoriteDao, TagDao)
@@ -392,8 +395,8 @@ under 2021, not 2014 — on the theory that EXIF is more reliable than
 import-named filenames. The reverse case (no EXIF, filename has a
 date) is unchanged.
 
-The `backfill_missing_date_taken` drain (`src/main.rs`) runs every
-watcher tick alongside `backfill_unhashed_backlog`. It loads up to
+The `backfill_missing_date_taken` drain (`src/backfill.rs`) runs every
+watcher tick alongside `backfill_unhashed_backlog` (also `src/backfill.rs`). It loads up to
 `DATE_BACKFILL_MAX_PER_TICK` rows (default 500) where
 `date_taken IS NULL OR date_taken_source = 'fs_time'` (backed by the
 `idx_image_exif_date_backfill` partial index), runs the waterfall
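The per-tick caps described above all follow one env-var parsing convention: parse the string, keep only positive values, fall back to the documented default. A standalone sketch of that convention (`parse_cap` is an illustrative name, not project code):

```rust
// Illustrative helper (not project code): turn an optional env-var string
// into a positive per-tick cap, falling back to the documented default.
fn parse_cap(raw: Option<&str>, default: i64) -> i64 {
    raw.and_then(|s| s.parse::<i64>().ok())
        .filter(|n| *n > 0)
        .unwrap_or(default)
}

fn main() {
    assert_eq!(parse_cap(None, 500), 500); // unset → default
    assert_eq!(parse_cap(Some("50"), 500), 50); // valid override
    assert_eq!(parse_cap(Some("-3"), 500), 500); // non-positive → default
    assert_eq!(parse_cap(Some("abc"), 500), 500); // unparsable → default
}
```

In the real code the `raw` argument comes from `dotenv::var(...)`; the validation and fallback shape is the same.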
@@ -504,9 +507,9 @@ ImageApi owns the face data; Apollo (sibling repo) hosts the insightface inferen
 
 **Why content_hash and not (library_id, rel_path):** ties face data to the bytes, not the path. A backup mount that copies files from the primary library naturally inherits the existing detections without re-running inference. This is the reference implementation of the multi-library data model — see "Multi-library data model" above.
 
-**File-watch hook** (`src/main.rs::process_new_files`): for each photo with a populated `content_hash`, check `FaceDao::already_scanned(hash)`; if not, send bytes (or embedded JPEG preview for RAW via `exif::extract_embedded_jpeg_preview`) to Apollo's `/api/internal/faces/detect`. K=`FACE_DETECT_CONCURRENCY` (default 8) parallel calls per scan tick; Apollo serializes them via its single-worker GPU pool. `face_watch.rs` is the Tokio orchestration layer.
+**File-watch hook** (`src/watcher.rs::process_new_files`): for each photo with a populated `content_hash`, check `FaceDao::already_scanned(hash)`; if not, send bytes (or embedded JPEG preview for RAW via `exif::extract_embedded_jpeg_preview`) to Apollo's `/api/internal/faces/detect`. K=`FACE_DETECT_CONCURRENCY` (default 8) parallel calls per scan tick; Apollo serializes them via its single-worker GPU pool. `face_watch.rs` is the Tokio orchestration layer.
 
-**Per-tick backlog drain** (also `src/main.rs`): two passes that run on every watcher tick regardless of quick-vs-full scan:
+**Per-tick backlog drain** (`src/backfill.rs`): two passes that run on every watcher tick regardless of quick-vs-full scan:
 - `backfill_unhashed_backlog` — populates `image_exif.content_hash` for photos that arrived before the hash field was retroactive. Capped by `FACE_HASH_BACKFILL_MAX_PER_TICK` (default 2000); errors don't burn the cap.
 - `process_face_backlog` — runs detection on photos that have a hash but no `face_detections` row. Capped by `FACE_BACKLOG_MAX_PER_TICK` (default 64). Selected via a SQL anti-join (`FaceDao::list_unscanned_candidates`); videos and EXCLUDED_DIRS paths filtered out client-side via `face_watch::filter_excluded` so they never reach Apollo.
 
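The "errors don't burn the cap" behavior noted for `backfill_unhashed_backlog` above is worth pinning down: only successful items count against the per-tick budget, so a run of failures cannot starve the backlog forever. A minimal sketch of that pattern (names are illustrative, not the project's code):

```rust
// Sketch of a capped per-tick drain where failures do not consume the
// budget: we stop after `cap` successes, not after `cap` attempts.
fn drain<T>(items: Vec<T>, cap: usize, mut process: impl FnMut(&T) -> Result<(), ()>) -> usize {
    let mut done = 0;
    for item in &items {
        if done >= cap {
            break;
        }
        if process(item).is_ok() {
            done += 1; // failures fall through without consuming the cap
        }
    }
    done
}

fn main() {
    // 2 failures followed by 3 successes, cap 2: the failures don't eat
    // the budget, so two items are still drained this tick.
    let items = vec![0, 0, 1, 1, 1];
    let done = drain(items, 2, |&x| if x == 1 { Ok(()) } else { Err(()) });
    assert_eq!(done, 2);
}
```

The trade-off is that a tick full of errors does more work than `cap` attempts; the real drains bound that by only loading a limited candidate page per tick.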
@@ -521,6 +524,8 @@ ImageApi owns the face data; Apollo (sibling repo) hosts the insightface inferen
 Module map:
 - `src/faces.rs` — `FaceDao` trait + `SqliteFaceDao` impl, route handlers for `/faces/*`, `/image/faces/*`, `/persons/*`. Mirror of `tags.rs` layout.
 - `src/face_watch.rs` — Tokio orchestration for the file-watch detect pass; `filter_excluded` (PathExcluder + image-extension filter), `read_image_bytes_for_detect` (RAW preview fallback).
+- `src/backfill.rs` — per-tick drains (unhashed-hash, date_taken, face-backlog, etc.) called from `watcher::watch_files` and `watcher::process_new_files`.
+- `src/watcher.rs` — the watcher loop itself and `process_new_files` (file walk → EXIF write → face-candidate build).
 - `src/ai/face_client.rs` — HTTP client for Apollo's inference. Configured by `APOLLO_FACE_API_BASE_URL`, falls back to `APOLLO_API_BASE_URL`. Both unset → feature disabled, file-watch hook is a no-op.
 - `migrations/2026-04-29-000000_add_faces/` — schema.
 
src/main.rs (869 lines changed)
@@ -4,38 +4,30 @@
 extern crate diesel;
 extern crate rayon;
 
-use actix::Addr;
 use actix_web::web::Data;
 use actix_web_prom::PrometheusMetricsBuilder;
 use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations};
-use std::collections::{HashMap, HashSet};
+use std::collections::HashMap;
 use std::env;
-use std::path::{Path, PathBuf};
 use std::sync::{Arc, Mutex};
-use std::time::{Duration, SystemTime};
-use walkdir::WalkDir;
 
 use actix_cors::Cors;
 use actix_governor::{Governor, GovernorConfigBuilder};
 use actix_multipart as mp;
 use actix_web::{App, HttpResponse, HttpServer, middleware, web};
-use chrono::Utc;
 use diesel::sqlite::Sqlite;
 use std::error::Error;
 
 use crate::ai::InsightGenerator;
 use crate::auth::login;
 use crate::data::*;
-use crate::database::models::InsertImageExif;
 use crate::database::*;
 use crate::files::{RealFileSystem, move_file};
 use crate::service::ServiceBuilder;
 use crate::state::AppState;
 use crate::tags::*;
-use crate::video::actors::{
-    GeneratePreviewClipMessage, QueueVideosMessage, ScanDirectoryMessage, VideoPlaylistManager,
-};
-use log::{debug, error, info, warn};
+use crate::video::actors::ScanDirectoryMessage;
+use log::{error, info};
 
 mod ai;
 mod auth;
@@ -62,6 +54,7 @@ mod tags;
 mod thumbnails;
 mod utils;
 mod video;
+mod watcher;
 
 mod knowledge;
 mod memories;
@@ -135,7 +128,7 @@ fn main() -> std::io::Result<()> {
     // Start file watcher with playlist manager and preview generator
     let playlist_mgr_for_watcher = app_state.playlist_manager.as_ref().clone();
     let preview_gen_for_watcher = app_state.preview_clip_generator.as_ref().clone();
-    watch_files(
+    watcher::watch_files(
         app_state.libraries.clone(),
         playlist_mgr_for_watcher,
         preview_gen_for_watcher,
@@ -148,7 +141,7 @@ fn main() -> std::io::Result<()> {
     // every configured library when looking for the source video, and
    // skips the whole cycle while any library is stale (a missing
    // source is indistinguishable from a transiently-unmounted share).
-    cleanup_orphaned_playlists(
+    watcher::cleanup_orphaned_playlists(
         app_state.libraries.clone(),
         app_state.excluded_dirs.clone(),
         app_state.library_health.clone(),
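Both calls above hand work to the same loop skeleton: a detached OS thread that sleeps for a configured interval, then runs one tick. A hedged sketch of that shape (not the project's code, and bounded so it can terminate, unlike the real detached loops):

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

// Sketch of the background-loop skeleton shared by watch_files and
// cleanup_orphaned_playlists: spawn a plain OS thread, sleep, tick.
// Here the loop runs a fixed number of ticks so the example finishes.
fn spawn_ticker(
    ticks: u32,
    interval: Duration,
    counter: Arc<AtomicU64>,
) -> std::thread::JoinHandle<()> {
    std::thread::spawn(move || {
        for _ in 0..ticks {
            std::thread::sleep(interval); // sleep first, like the real loops
            counter.fetch_add(1, Ordering::SeqCst); // one "scan tick"
        }
    })
}

fn main() {
    let counter = Arc::new(AtomicU64::new(0));
    let handle = spawn_ticker(3, Duration::from_millis(1), Arc::clone(&counter));
    handle.join().unwrap();
    assert_eq!(counter.load(Ordering::SeqCst), 3);
}
```

Sleeping before the first tick matches the real loops, which is why `watch_files` also does an initial availability sweep before entering the loop.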
@@ -350,853 +343,3 @@ fn run_migrations(
 
     Ok(())
 }
-
-/// Clean up orphaned HLS playlists and segments whose source videos no longer exist
-fn cleanup_orphaned_playlists(
-    libs: Vec<libraries::Library>,
-    excluded_dirs: Vec<String>,
-    library_health: libraries::LibraryHealthMap,
-) {
-    std::thread::spawn(move || {
-        let video_path = dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set");
-
-        // Get cleanup interval from environment (default: 24 hours)
-        let cleanup_interval_secs = dotenv::var("PLAYLIST_CLEANUP_INTERVAL_SECONDS")
-            .ok()
-            .and_then(|s| s.parse::<u64>().ok())
-            .unwrap_or(86400); // 24 hours
-
-        info!("Starting orphaned playlist cleanup job");
-        info!(" Cleanup interval: {} seconds", cleanup_interval_secs);
-        info!(" Playlist directory: {}", video_path);
-        for lib in &libs {
-            info!(
-                " Checking sources under '{}' at {}",
-                lib.name, lib.root_path
-            );
-        }
-
-        loop {
-            std::thread::sleep(Duration::from_secs(cleanup_interval_secs));
-
-            // Safety gate: skip the cleanup cycle if any library is
-            // stale. A missing source video on a stale library is
-            // indistinguishable from a transient unmount, and the
-            // cleanup is destructive — we'd rather leak a few playlist
-            // files for a tick than delete one whose source is briefly
-            // unreachable. The cycle re-runs on the next interval.
-            {
-                let guard = library_health.read().unwrap_or_else(|e| e.into_inner());
-                let stale: Vec<String> = libs
-                    .iter()
-                    .filter(|lib| guard.get(&lib.id).map(|h| !h.is_online()).unwrap_or(false))
-                    .map(|lib| lib.name.clone())
-                    .collect();
-                if !stale.is_empty() {
-                    warn!(
-                        "Skipping orphaned-playlist cleanup: {} library(ies) stale: [{}]",
-                        stale.len(),
-                        stale.join(", ")
-                    );
-                    continue;
-                }
-            }
-
-            info!("Running orphaned playlist cleanup");
-            let start = std::time::Instant::now();
-            let mut deleted_count = 0;
-            let mut error_count = 0;
-
-            // Find all .m3u8 files in VIDEO_PATH
-            let playlists: Vec<PathBuf> = WalkDir::new(&video_path)
-                .into_iter()
-                .filter_map(|e| e.ok())
-                .filter(|e| e.file_type().is_file())
-                .filter(|e| {
-                    e.path()
-                        .extension()
-                        .and_then(|s| s.to_str())
-                        .map(|ext| ext.eq_ignore_ascii_case("m3u8"))
-                        .unwrap_or(false)
-                })
-                .map(|e| e.path().to_path_buf())
-                .collect();
-
-            info!("Found {} playlist files to check", playlists.len());
-
-            for playlist_path in playlists {
-                // Extract the original video filename from playlist name
-                // Playlist format: {VIDEO_PATH}/{original_filename}.m3u8
-                if let Some(filename) = playlist_path.file_stem() {
-                    let video_filename = filename.to_string_lossy();
-
-                    // Search for this video file across every configured
-                    // library, respecting EXCLUDED_DIRS so we don't
-                    // false-resurrect playlists for videos that only
-                    // exist inside an excluded subtree. As soon as one
-                    // library has a matching source, we're done — the
-                    // playlist isn't orphaned.
-                    let mut video_exists = false;
-                    'libs: for lib in &libs {
-                        let effective = lib.effective_excluded_dirs(&excluded_dirs);
-                        for entry in image_api::file_scan::walk_library_files(
-                            Path::new(&lib.root_path),
-                            &effective,
-                        ) {
-                            if let Some(entry_stem) = entry.path().file_stem()
-                                && entry_stem == filename
-                                && file_types::is_video_file(entry.path())
-                            {
-                                video_exists = true;
-                                break 'libs;
-                            }
-                        }
-                    }
-
-                    if !video_exists {
-                        debug!(
-                            "Source video for playlist {} no longer exists, deleting",
-                            playlist_path.display()
-                        );
-
-                        // Delete the playlist file
-                        if let Err(e) = std::fs::remove_file(&playlist_path) {
-                            warn!(
-                                "Failed to delete playlist {}: {}",
-                                playlist_path.display(),
-                                e
-                            );
-                            error_count += 1;
-                        } else {
-                            deleted_count += 1;
-
-                            // Also try to delete associated .ts segment files
-                            // They are typically named (unknown)N.ts in the same directory
-                            if let Some(parent_dir) = playlist_path.parent() {
-                                for entry in WalkDir::new(parent_dir)
-                                    .max_depth(1)
-                                    .into_iter()
-                                    .filter_map(|e| e.ok())
-                                    .filter(|e| e.file_type().is_file())
-                                {
-                                    let entry_path = entry.path();
-                                    if let Some(ext) = entry_path.extension()
-                                        && ext.eq_ignore_ascii_case("ts")
-                                    {
-                                        // Check if this .ts file belongs to our playlist
-                                        if let Some(ts_stem) = entry_path.file_stem() {
-                                            let ts_name = ts_stem.to_string_lossy();
-                                            if ts_name.starts_with(&*video_filename) {
-                                                if let Err(e) = std::fs::remove_file(entry_path) {
-                                                    debug!(
-                                                        "Failed to delete segment {}: {}",
-                                                        entry_path.display(),
-                                                        e
-                                                    );
-                                                } else {
-                                                    debug!(
-                                                        "Deleted segment: {}",
-                                                        entry_path.display()
-                                                    );
-                                                }
-                                            }
-                                        }
-                                    }
-                                }
-                            }
-                        }
-                    }
-                }
-            }
-
-            info!(
-                "Orphaned playlist cleanup completed in {:?}: deleted {} playlists, {} errors",
-                start.elapsed(),
-                deleted_count,
-                error_count
-            );
-        }
-    });
-}
-
-fn watch_files(
-    libs: Vec<libraries::Library>,
-    playlist_manager: Addr<VideoPlaylistManager>,
-    preview_generator: Addr<video::actors::PreviewClipGenerator>,
-    face_client: crate::ai::face_client::FaceClient,
-    excluded_dirs: Vec<String>,
-    library_health: libraries::LibraryHealthMap,
-) {
-    std::thread::spawn(move || {
-        // Get polling intervals from environment variables
-        // Quick scan: Check recently modified files (default: 60 seconds)
-        let quick_interval_secs = dotenv::var("WATCH_QUICK_INTERVAL_SECONDS")
-            .ok()
-            .and_then(|s| s.parse::<u64>().ok())
-            .unwrap_or(60);
-
-        // Full scan: Check all files regardless of modification time (default: 3600 seconds = 1 hour)
-        let full_interval_secs = dotenv::var("WATCH_FULL_INTERVAL_SECONDS")
-            .ok()
-            .and_then(|s| s.parse::<u64>().ok())
-            .unwrap_or(3600);
-
-        info!("Starting optimized file watcher");
-        info!(" Quick scan interval: {} seconds", quick_interval_secs);
-        info!(" Full scan interval: {} seconds", full_interval_secs);
-        // Surface face-detection state at boot so it's obvious whether
-        // the watcher will hit Apollo. The branch silently no-ops when
-        // disabled (intentional for legacy deploys), which makes "why
-        // aren't faces being detected?" hard to diagnose otherwise.
-        if face_client.is_enabled() {
-            info!(" Face detection: ENABLED");
-        } else {
-            info!(
-                " Face detection: DISABLED (set APOLLO_FACE_API_BASE_URL \
-                 or APOLLO_API_BASE_URL to enable)"
-            );
-        }
-        for lib in &libs {
-            info!(
-                " Watching library '{}' (id={}) at {}",
-                lib.name, lib.id, lib.root_path
-            );
-        }
-
-        // Create DAOs for tracking processed files
-        let exif_dao = Arc::new(Mutex::new(
-            Box::new(SqliteExifDao::new()) as Box<dyn ExifDao>
-        ));
-        let preview_dao = Arc::new(Mutex::new(
-            Box::new(SqlitePreviewDao::new()) as Box<dyn PreviewDao>
-        ));
-        let face_dao = Arc::new(Mutex::new(
-            Box::new(faces::SqliteFaceDao::new()) as Box<dyn faces::FaceDao>
-        ));
-        // tag_dao for the watcher's auto-bind path. Independent of the
-        // request-handler tag_dao instance — both end up pointing at the
-        // same SQLite file via SqliteTagDao::default().
-        let watcher_tag_dao = Arc::new(Mutex::new(
-            Box::new(SqliteTagDao::default()) as Box<dyn tags::TagDao>
-        ));
-
-        let mut last_quick_scan = SystemTime::now();
-        let mut last_full_scan = SystemTime::now();
-        let mut scan_count = 0u64;
-
-        // Per-library cursor for the missing-file scan. Each tick reads
-        // a page from `offset`, stat()s the rows, deletes confirmed-
-        // missing ones, and advances or wraps the cursor. State held
-        // in-memory so a watcher restart resumes from 0 — fine, the
-        // sweep is idempotent.
-        let mut missing_file_offsets: std::collections::HashMap<i32, i64> =
-            std::collections::HashMap::new();
-
-        let missing_scan_page_size: i64 = dotenv::var("IMAGE_EXIF_MISSING_SCAN_PAGE_SIZE")
-            .ok()
-            .and_then(|s| s.parse().ok())
-            .filter(|n: &i64| *n > 0)
-            .unwrap_or(library_maintenance::DEFAULT_SCAN_PAGE_SIZE);
-        let missing_delete_cap: usize = dotenv::var("IMAGE_EXIF_MISSING_DELETE_CAP_PER_TICK")
-            .ok()
-            .and_then(|s| s.parse().ok())
-            .filter(|n: &usize| *n > 0)
-            .unwrap_or(library_maintenance::DEFAULT_MISSING_DELETE_CAP);
-
-        // Two-tick orphan-GC consensus state. Carried across ticks via
-        // `OrphanGcState`; see library_maintenance::run_orphan_gc.
-        let mut orphan_gc_state = library_maintenance::OrphanGcState::default();
-
-        // Initial availability sweep before the loop's first sleep so
-        // /libraries reports the truth from the very first request,
-        // rather than the optimistic Online default that
-        // new_health_map seeds. Without this, an unmounted share would
-        // appear online for up to WATCH_QUICK_INTERVAL_SECONDS (default
-        // 60s) after boot. Same probe logic as the per-tick gate
-        // below; no ingest runs here, just the health update + log.
-        // Disabled libraries skip the probe entirely — they should
-        // never enter the health map (treated as out-of-scope).
-        for lib in &libs {
-            if !lib.enabled {
-                continue;
-            }
-            let context = opentelemetry::Context::new();
-            let had_data = exif_dao
-                .lock()
-                .expect("exif_dao poisoned")
-                .count_for_library(&context, lib.id)
-                .map(|n| n > 0)
-                .unwrap_or(false);
-            libraries::refresh_health(&library_health, lib, had_data);
-        }
-
-        loop {
-            std::thread::sleep(Duration::from_secs(quick_interval_secs));
-
-            let now = SystemTime::now();
-            let since_last_full = now
-                .duration_since(last_full_scan)
-                .unwrap_or(Duration::from_secs(0));
-
-            let is_full_scan = since_last_full.as_secs() >= full_interval_secs;
-
-            for lib in &libs {
-                // Operator kill switch: a disabled library is invisible
-                // to the watcher entirely. No probe, no ingest, no
-                // maintenance, no health entry. Distinct from Stale —
-                // Stale is "we wanted to but couldn't"; Disabled is
-                // "we don't want to". Toggle via SQL.
-                if !lib.enabled {
-                    debug!(
-                        "watcher: skipping library '{}' (id={}) — enabled=false",
-                        lib.name, lib.id
-                    );
-                    continue;
-                }
-
-                // Availability probe: every tick checks that the
-                // library's mount is reachable, is a directory, is
-                // readable, and (if image_exif has rows for it) is
-                // non-empty. A Stale library skips ingest, backlog
-                // drains, and metric refresh — reads/serving in HTTP
-                // handlers continue to work. Branches B/C extend the
-                // probe gate to cover handoff and orphan GC. See
-                // CLAUDE.md "Library availability and safety".
-                let had_data = {
-                    let context = opentelemetry::Context::new();
-                    let mut guard = exif_dao.lock().expect("exif_dao poisoned");
-                    guard
-                        .count_for_library(&context, lib.id)
-                        .map(|n| n > 0)
-                        .unwrap_or(false)
-                };
-                let health = libraries::refresh_health(&library_health, lib, had_data);
-                if !health.is_online() {
-                    // Skip every write path for this library this tick.
-                    // Don't refresh the media-count gauge either — a
-                    // probe-failed library would otherwise flap to 0
-                    // image / 0 video and pollute Prometheus.
-                    continue;
-                }
-
-                // Drain the unhashed-hash backlog AND the face-detection
-                // backlog every tick, regardless of quick/full. Quick
-                // scans only walk recently-modified files, so the
-                // pre-Phase-3 backlog never enters their candidate set
-                // — without these standalone passes, backfill +
-                // detection only progressed during full scans
-                // (default once an hour).
-                // Effective excludes for this library: global env-var
-                // ∪ row's excluded_dirs. Compute once per tick — used
-                // by every walker below for this library.
-                let effective_excludes = lib.effective_excluded_dirs(&excluded_dirs);
-
-                if face_client.is_enabled() {
-                    let context = opentelemetry::Context::new();
-                    backfill::backfill_unhashed_backlog(&context, lib, &exif_dao);
-                    backfill::process_face_backlog(
-                        &context,
-                        lib,
-                        &face_client,
-                        &face_dao,
-                        &watcher_tag_dao,
-                        &effective_excludes,
-                    );
-                }
-
-                // Date-taken backfill: drain rows whose canonical date is
-                // either unresolved or only fs_time-sourced. Independent
-                // of face detection — runs even on deploys that don't
-                // configure Apollo, since `/memories` depends on it.
-                {
-                    let context = opentelemetry::Context::new();
-                    backfill::backfill_missing_date_taken(&context, lib, &exif_dao);
-                }
-
-                if is_full_scan {
-                    info!(
-                        "Running full scan for library '{}' (scan #{})",
-                        lib.name, scan_count
-                    );
-                    process_new_files(
-                        lib,
-                        Arc::clone(&exif_dao),
-                        Arc::clone(&preview_dao),
-                        Arc::clone(&face_dao),
-                        Arc::clone(&watcher_tag_dao),
-                        face_client.clone(),
-                        &effective_excludes,
-                        None,
-                        playlist_manager.clone(),
-                        preview_generator.clone(),
-                    );
-                } else {
-                    debug!(
-                        "Running quick scan for library '{}' (checking files modified in last {} seconds)",
-                        lib.name,
-                        quick_interval_secs + 10
-                    );
-                    let check_since = last_quick_scan
-                        .checked_sub(Duration::from_secs(10))
-                        .unwrap_or(last_quick_scan);
-                    process_new_files(
-                        lib,
-                        Arc::clone(&exif_dao),
-                        Arc::clone(&preview_dao),
-                        Arc::clone(&face_dao),
-                        Arc::clone(&watcher_tag_dao),
-                        face_client.clone(),
-                        &effective_excludes,
-                        Some(check_since),
-                        playlist_manager.clone(),
-                        preview_generator.clone(),
-                    );
-                }
-
-                // Update media counts per library (metric aggregates across all)
-                thumbnails::update_media_counts(Path::new(&lib.root_path), &effective_excludes);
-
-                // Missing-file detection: prune image_exif rows whose
-                // source file is no longer on disk. Per-library, so we
-                // pass library-online-this-tick implicitly (we only
-                // reach here if the probe gate at the top of the
-                // iteration passed). Capped + paginated so a huge
-                // library doesn't stall the watcher; rows we don't
-                // visit this tick get visited next tick. See
-                // library_maintenance::detect_missing_files_for_library.
-                {
-                    let context = opentelemetry::Context::new();
-                    let offset = missing_file_offsets.get(&lib.id).copied().unwrap_or(0);
-                    let (deleted, next_offset) =
-                        library_maintenance::detect_missing_files_for_library(
-                            &context,
-                            lib,
-                            &exif_dao,
-                            offset,
-                            missing_scan_page_size,
-                            missing_delete_cap,
-                        );
-                    missing_file_offsets.insert(lib.id, next_offset);
-                    if deleted > 0 {
-                        debug!(
-                            "missing-file scan: library '{}' next_offset={}",
-                            lib.name, next_offset
-                        );
-                    }
-                }
-            }
-
-            // Reconciliation: cross-library, so it runs once per tick
-            // outside the per-library loop. Idempotent — fast no-op when
-            // there's nothing to do. Operates on the database alone, no
-            // filesystem dependency, so it doesn't need a health gate.
-            // See database::reconcile and CLAUDE.md "Multi-library data
-            // model" for the rules.
-            {
-                let mut conn = image_api::database::connect();
-                let _ = image_api::database::reconcile::run(&mut conn);
-
-                // Back-ref refresh: hash-keyed rows whose
-                // (library_id, rel_path) tuple no longer matches any
-                // image_exif row but whose hash still does. After a
-                // recent→archive move, the missing-file scan removes
-                // the old image_exif row; this pass repoints face /
-                // tag / insight back-refs at the surviving location.
-                // DB-only, no health gate needed — uses what's in
-                // image_exif as truth.
-                let _ = library_maintenance::refresh_back_refs(&mut conn);
-
-                // Orphan GC: the destructive end of the maintenance
-                // pipeline. Two-tick consensus + every-library-online
-                // requirement is enforced inside run_orphan_gc; we
-                // pass the current all-online flag and the function
-                // tracks the previous tick's flag in OrphanGcState.
-                let all_online = library_maintenance::all_libraries_online(&libs, &library_health);
-                let _ =
-                    library_maintenance::run_orphan_gc(&mut conn, &mut orphan_gc_state, all_online);
-            }
-
-            if is_full_scan {
-                last_full_scan = now;
-            }
-            last_quick_scan = now;
-            scan_count += 1;
-        }
-    });
-}
-
/// Check if a playlist needs to be (re)generated
/// Returns true if:
/// - Playlist doesn't exist, OR
/// - Source video is newer than the playlist
fn playlist_needs_generation(video_path: &Path, playlist_path: &Path) -> bool {
    if !playlist_path.exists() {
        return true;
    }

    // Check if source video is newer than playlist
    if let (Ok(video_meta), Ok(playlist_meta)) = (
        std::fs::metadata(video_path),
        std::fs::metadata(playlist_path),
    ) && let (Ok(video_modified), Ok(playlist_modified)) =
        (video_meta.modified(), playlist_meta.modified())
    {
        return video_modified > playlist_modified;
    }

    // If we can't determine, assume it needs generation
    true
}

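The mtime rule above can be exercised standalone. A minimal sketch, assuming nothing beyond std — the helper name `needs_regen` and the temp-file paths are illustrative, mirroring `playlist_needs_generation`'s logic rather than calling it:

```rust
use std::fs;
use std::path::Path;

// Regenerate when the playlist is missing, or when the source video's
// mtime is newer than the playlist's. Unknown mtimes fall through to
// "needs generation", the safe default.
fn needs_regen(video: &Path, playlist: &Path) -> bool {
    if !playlist.exists() {
        return true;
    }
    match (
        fs::metadata(video).and_then(|m| m.modified()),
        fs::metadata(playlist).and_then(|m| m.modified()),
    ) {
        (Ok(v), Ok(p)) => v > p,
        _ => true,
    }
}

fn main() {
    let dir = std::env::temp_dir();
    let video = dir.join("watcher_demo_video.mp4");
    let playlist = dir.join("watcher_demo_video.mp4.m3u8");
    fs::write(&video, b"v").unwrap();
    let _ = fs::remove_file(&playlist);
    // Missing playlist: must regenerate.
    assert!(needs_regen(&video, &playlist));
    // Playlist written after (or at the same instant as) the video: up to date.
    fs::write(&playlist, b"#EXTM3U").unwrap();
    assert!(!needs_regen(&video, &playlist));
    println!("ok");
    let _ = fs::remove_file(&video);
    let _ = fs::remove_file(&playlist);
}
```

Using strict `>` (not `>=`) means an equal-mtime pair counts as up to date, which keeps the check stable on filesystems with coarse timestamp granularity.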
fn process_new_files(
    library: &libraries::Library,
    exif_dao: Arc<Mutex<Box<dyn ExifDao>>>,
    preview_dao: Arc<Mutex<Box<dyn PreviewDao>>>,
    face_dao: Arc<Mutex<Box<dyn faces::FaceDao>>>,
    tag_dao: Arc<Mutex<Box<dyn tags::TagDao>>>,
    face_client: crate::ai::face_client::FaceClient,
    excluded_dirs: &[String],
    modified_since: Option<SystemTime>,
    playlist_manager: Addr<VideoPlaylistManager>,
    preview_generator: Addr<video::actors::PreviewClipGenerator>,
) {
    let context = opentelemetry::Context::new();
    let thumbs = dotenv::var("THUMBNAILS").expect("THUMBNAILS not defined");
    let thumbnail_directory = Path::new(&thumbs);
    let base_path = Path::new(&library.root_path);

    // Walk, prune EXCLUDED_DIRS subtrees, and apply image/video + modified_since
    // filters. See `file_scan` for why exclusion has to happen at WalkDir
    // time (filter_entry) rather than at face-detect time.
    let files: Vec<(PathBuf, String)> =
        image_api::file_scan::enumerate_indexable_files(base_path, excluded_dirs, modified_since);

    if files.is_empty() {
        debug!("No files to process");
        return;
    }

    debug!("Found {} files to check", files.len());

    // Batch query: Get all EXIF data for these files in one query
    let file_paths: Vec<String> = files.iter().map(|(_, rel_path)| rel_path.clone()).collect();

    let existing_exif_paths: HashMap<String, bool> = {
        let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
        // Walk is per-library, so scope the lookup so a same-named file
        // in another library doesn't make this one look already-indexed.
        match dao.get_exif_batch(&context, Some(library.id), &file_paths) {
            Ok(exif_records) => exif_records
                .into_iter()
                .map(|record| (record.file_path, true))
                .collect(),
            Err(e) => {
                error!("Error batch querying EXIF data: {:?}", e);
                HashMap::new()
            }
        }
    };

    let mut new_files_found = false;
    let mut files_needing_row = Vec::new();

    // Register every image/video file in image_exif. Rows without EXIF
    // still carry library_id, rel_path, content_hash, and size_bytes so
    // derivative dedup and DB-indexed sort/filter work for every file,
    // not just photos with parseable EXIF.
    for (file_path, relative_path) in &files {
        // Check both the library-scoped legacy path (current shape) and
        // the bare-legacy path (pre-multi-library shape). Either one
        // existing means a thumbnail is already on disk for this file.
        let scoped_thumb_path = content_hash::library_scoped_legacy_path(
            thumbnail_directory,
            library.id,
            relative_path,
        );
        let bare_legacy_thumb_path = thumbnail_directory.join(relative_path);
        let needs_thumbnail = !scoped_thumb_path.exists()
            && !bare_legacy_thumb_path.exists()
            && !thumbnails::unsupported_thumbnail_sentinel(&scoped_thumb_path).exists()
            && !thumbnails::unsupported_thumbnail_sentinel(&bare_legacy_thumb_path).exists();
        let needs_row = !existing_exif_paths.contains_key(relative_path);

        if needs_thumbnail || needs_row {
            new_files_found = true;

            if needs_thumbnail {
                info!("New file detected (missing thumbnail): {}", relative_path);
            }

            if needs_row {
                files_needing_row.push((file_path.clone(), relative_path.clone()));
            }
        }
    }

    if !files_needing_row.is_empty() {
        info!(
            "Registering {} new files in image_exif",
            files_needing_row.len()
        );

        for (file_path, relative_path) in files_needing_row {
            let timestamp = Utc::now().timestamp();

            // Hash + size from filesystem metadata — always attempted so
            // every file gets a content_hash, even when EXIF is absent.
            let (content_hash, size_bytes) = match content_hash::compute(&file_path) {
                Ok(id) => (Some(id.content_hash), Some(id.size_bytes)),
                Err(e) => {
                    warn!("Failed to hash {}: {:?}", file_path.display(), e);
                    (None, None)
                }
            };

            // Perceptual hashes (pHash + dHash). Best-effort — None for
            // videos and decode failures. Drives near-duplicate detection
            // in the Apollo duplicates surface; failure here is non-fatal
            // and never blocks indexing.
            let perceptual = perceptual_hash::compute(&file_path);

            // EXIF is best-effort enrichment. When extraction fails (or the
            // file type doesn't support EXIF) we still store a row with all
            // EXIF fields NULL; the file remains visible to sort-by-date
            // and tag queries via its rel_path and filesystem timestamps.
            let exif_fields = if exif::supports_exif(&file_path) {
                match exif::extract_exif_from_path(&file_path) {
                    Ok(data) => Some(data),
                    Err(e) => {
                        debug!(
                            "No EXIF or parse error for {}: {:?}",
                            file_path.display(),
                            e
                        );
                        None
                    }
                }
            } else {
                None
            };

            // Canonical date_taken via the waterfall — kamadak-exif (already
            // computed above) → exiftool fallback for videos / MakerNote /
            // QuickTime → filename regex → earliest_fs_time. Source is
            // recorded so the per-tick backfill drain can re-run weak
            // resolutions later.
            let resolved_date = date_resolver::resolve_date_taken(
                &file_path,
                exif_fields.as_ref().and_then(|e| e.date_taken),
            );

            let insert_exif = InsertImageExif {
                library_id: library.id,
                file_path: relative_path.clone(),
                camera_make: exif_fields.as_ref().and_then(|e| e.camera_make.clone()),
                camera_model: exif_fields.as_ref().and_then(|e| e.camera_model.clone()),
                lens_model: exif_fields.as_ref().and_then(|e| e.lens_model.clone()),
                width: exif_fields.as_ref().and_then(|e| e.width),
                height: exif_fields.as_ref().and_then(|e| e.height),
                orientation: exif_fields.as_ref().and_then(|e| e.orientation),
                gps_latitude: exif_fields
                    .as_ref()
                    .and_then(|e| e.gps_latitude.map(|v| v as f32)),
                gps_longitude: exif_fields
                    .as_ref()
                    .and_then(|e| e.gps_longitude.map(|v| v as f32)),
                gps_altitude: exif_fields
                    .as_ref()
                    .and_then(|e| e.gps_altitude.map(|v| v as f32)),
                focal_length: exif_fields
                    .as_ref()
                    .and_then(|e| e.focal_length.map(|v| v as f32)),
                aperture: exif_fields
                    .as_ref()
                    .and_then(|e| e.aperture.map(|v| v as f32)),
                shutter_speed: exif_fields.as_ref().and_then(|e| e.shutter_speed.clone()),
                iso: exif_fields.as_ref().and_then(|e| e.iso),
                date_taken: resolved_date.map(|r| r.timestamp),
                created_time: timestamp,
                last_modified: timestamp,
                content_hash,
                size_bytes,
                phash_64: perceptual.map(|h| h.phash_64),
                dhash_64: perceptual.map(|h| h.dhash_64),
                date_taken_source: resolved_date.map(|r| r.source.as_str().to_string()),
            };

            let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
            if let Err(e) = dao.store_exif(&context, insert_exif) {
                error!(
                    "Failed to register {} in image_exif: {:?}",
                    relative_path, e
                );
            } else {
                debug!("Registered {} in image_exif", relative_path);
            }
        }
    }

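The date_taken waterfall described in the comments above is a try-sources-in-priority-order fold. A minimal sketch, assuming nothing about the real `date_resolver` internals — source names and the `resolve` helper are purely illustrative:

```rust
// Illustrative date-resolution waterfall: walk candidate sources in
// priority order and return the first one that produced a timestamp,
// together with its name (mirroring how date_taken_source is recorded).
fn resolve(sources: &[(&str, Option<i64>)]) -> Option<(String, i64)> {
    sources
        .iter()
        .find_map(|(name, ts)| ts.map(|t| (name.to_string(), t)))
}

fn main() {
    // EXIF and exiftool both failed; the filename regex wins even though
    // a (weaker) filesystem time is also available further down the list.
    let sources = [
        ("exif", None),
        ("exiftool", None),
        ("filename", Some(1_700_000_000_i64)),
        ("fs_time", Some(1_650_000_000_i64)),
    ];
    assert_eq!(
        resolve(&sources),
        Some(("filename".to_string(), 1_700_000_000))
    );
    println!("ok");
}
```

Recording which source won is what lets the per-tick backfill drain revisit rows resolved from a weak source later.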
    // ── Face detection pass ────────────────────────────────────────────
    // Run after EXIF writes so newly-registered files have their
    // content_hash populated. Skipped wholesale when face_client is
    // disabled (no Apollo integration configured) — Phase 3 wires this
    // up; the watcher remains usable on legacy deploys.
    if face_client.is_enabled() {
        // Opportunistic content_hash backfill: photos indexed before
        // content-hashing landed (or where the hash compute failed
        // silently on insert) end up in image_exif with NULL
        // content_hash. build_face_candidates keys on content_hash, so
        // those files would never become candidates without backfill.
        // Idempotent — subsequent scans see the populated hashes and
        // no-op. The dedicated `backfill_hashes` binary is still the
        // right tool for very large legacy libraries; this branch
        // ensures small/medium deploys self-heal without operator
        // action.
        backfill::backfill_missing_content_hashes(&context, &files, library, &exif_dao);
        let candidates =
            backfill::build_face_candidates(&context, library, &files, &exif_dao, &face_dao);
        debug!(
            "face_watch: scan tick — {} image file(s) walked, {} candidate(s) (library '{}', modified_since={})",
            files
                .iter()
                .filter(|(p, _)| !file_types::is_video_file(p))
                .count(),
            candidates.len(),
            library.name,
            modified_since.is_some(),
        );
        if !candidates.is_empty() {
            face_watch::run_face_detection_pass(
                library,
                excluded_dirs,
                &face_client,
                Arc::clone(&face_dao),
                Arc::clone(&tag_dao),
                candidates,
            );
        }
    }

    // Check for videos that need HLS playlists
    let video_path_base = dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set");
    let mut videos_needing_playlists = Vec::new();

    for (file_path, _relative_path) in &files {
        if file_types::is_video_file(file_path) {
            // Construct expected playlist path
            let playlist_filename =
                format!("{}.m3u8", file_path.file_name().unwrap().to_string_lossy());
            let playlist_path = Path::new(&video_path_base).join(&playlist_filename);

            // Check if playlist needs (re)generation
            if playlist_needs_generation(file_path, &playlist_path) {
                videos_needing_playlists.push(file_path.clone());
            }
        }
    }

    // Send queue request to playlist manager
    if !videos_needing_playlists.is_empty() {
        playlist_manager.do_send(QueueVideosMessage {
            video_paths: videos_needing_playlists,
        });
    }

    // Check for videos that need preview clips
    // Collect (full_path, relative_path) for video files
    let video_files: Vec<(String, String)> = files
        .iter()
        .filter(|(file_path, _)| file_types::is_video_file(file_path))
        .map(|(file_path, rel_path)| (file_path.to_string_lossy().to_string(), rel_path.clone()))
        .collect();

    if !video_files.is_empty() {
        // Query DB using relative paths (consistent with how GET/POST handlers store them)
        let video_rel_paths: Vec<String> = video_files.iter().map(|(_, rel)| rel.clone()).collect();

        let existing_previews: HashMap<String, String> = {
            let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao");
            match dao.get_previews_batch(&context, &video_rel_paths) {
                Ok(clips) => clips
                    .into_iter()
                    .map(|clip| (clip.file_path, clip.status))
                    .collect(),
                Err(e) => {
                    error!("Error batch querying preview clips: {:?}", e);
                    HashMap::new()
                }
            }
        };

        for (full_path, relative_path) in &video_files {
            let status = existing_previews.get(relative_path).map(|s| s.as_str());
            let needs_preview = match status {
                None => true,            // No record at all
                Some("failed") => true,  // Retry failed
                Some("pending") => true, // Stale pending from previous run
                _ => false,              // processing or complete
            };

            if needs_preview {
                // Insert pending record using relative path
                if status.is_none() {
                    let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao");
                    let _ = dao.insert_preview(&context, relative_path, "pending");
                }

                // Send full path in the message — the actor will derive relative path from it
                preview_generator.do_send(GeneratePreviewClipMessage {
                    video_path: full_path.clone(),
                });
            }
        }
    }

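The preview-clip retry decision above is a pure function of the stored status string, which makes it easy to pin down in isolation. A minimal sketch (the `needs_preview` helper is illustrative; the real logic is the inline `match` in `process_new_files`):

```rust
// Regenerate a preview when there's no record, a prior attempt failed,
// or a "pending" row is left over from a previous run; leave
// "processing" and "complete" (and any other status) alone.
fn needs_preview(status: Option<&str>) -> bool {
    matches!(status, None | Some("failed") | Some("pending"))
}

fn main() {
    assert!(needs_preview(None));
    assert!(needs_preview(Some("failed")));
    assert!(needs_preview(Some("pending")));
    assert!(!needs_preview(Some("processing")));
    assert!(!needs_preview(Some("complete")));
    println!("ok");
}
```

Treating stale "pending" as retryable is what lets a crashed run self-heal on the next scan tick.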
    // Generate thumbnails for all files that need them
    if new_files_found {
        info!("Processing thumbnails for new files...");
        thumbnails::create_thumbnails(std::slice::from_ref(library), excluded_dirs);
    }

    // Reconciliation: on a full scan, prune image_exif rows whose rel_path no
    // longer exists on disk for this library. Keeps the DB in parity so
    // downstream DB-backed listings (e.g. recursive /photos) don't return
    // phantom files. Skipped on quick scans — those only look at recently
    // modified files and can't distinguish "missing" from "unchanged".
    if modified_since.is_none() {
        let disk_paths: HashSet<String> = files.iter().map(|(_, rel)| rel.clone()).collect();
        let db_paths: Vec<String> = {
            let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
            dao.get_rel_paths_for_library(&context, library.id)
                .unwrap_or_else(|e| {
                    error!(
                        "Reconciliation: failed to load image_exif rel_paths for lib {}: {:?}",
                        library.id, e
                    );
                    Vec::new()
                })
        };

        let stale: Vec<String> = db_paths
            .into_iter()
            .filter(|p| !disk_paths.contains(p))
            .collect();

        if !stale.is_empty() {
            info!(
                "Reconciliation: pruning {} stale image_exif rows for library '{}'",
                stale.len(),
                library.name
            );
            let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
            for rel in &stale {
                if let Err(e) = dao.delete_exif_by_library(&context, library.id, rel) {
                    warn!(
                        "Reconciliation: failed to delete {} (lib {}): {:?}",
                        rel, library.id, e
                    );
                }
            }
        }
    }
}
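The full-scan reconciliation above is a set difference: DB rel_paths minus the on-disk walk. A minimal sketch of that computation (helper name and sample paths are illustrative):

```rust
use std::collections::HashSet;

// Rows present in the DB whose rel_path no longer appears in the
// on-disk walk are stale and get pruned on the next full scan.
fn stale_paths(disk: &[&str], db: &[&str]) -> Vec<String> {
    let on_disk: HashSet<&str> = disk.iter().copied().collect();
    db.iter()
        .filter(|p| !on_disk.contains(*p))
        .map(|p| p.to_string())
        .collect()
}

fn main() {
    let disk = ["2024/a.jpg", "2024/b.jpg"];
    let db = ["2024/a.jpg", "2024/b.jpg", "2024/gone.jpg"];
    assert_eq!(stale_paths(&disk, &db), vec!["2024/gone.jpg".to_string()]);
    println!("ok");
}
```

Building the disk side as a `HashSet` keeps the pass O(disk + db) rather than O(disk × db), which matters on large libraries.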
951 src/watcher.rs (new file)
@@ -0,0 +1,951 @@
//! Background file-watcher loop + the orphaned-playlist cleanup job.
//!
//! `watch_files` spins a thread that, on every tick (default 60 s
//! quick-scan / 3600 s full-scan), probes each library's availability,
//! drains the unhashed / date / face-detection backlogs via
//! [`crate::backfill`], walks newly-modified files through
//! [`process_new_files`], updates the media-count gauges, and runs the
//! three-stage maintenance pipeline (missing-file scan → back-ref
//! refresh → orphan GC).
//!
//! `cleanup_orphaned_playlists` runs on a slower interval (default 24
//! hours) and reaps HLS playlists whose source videos no longer exist
//! in any library. Both jobs respect [`crate::libraries::LibraryHealthMap`]
//! — a stale library skips destructive paths so transient unmounts
//! don't trigger data loss.

use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};

use actix::Addr;
use chrono::Utc;
use log::{debug, error, info, warn};
use walkdir::WalkDir;

use crate::backfill;
use crate::content_hash;
use crate::database::models::InsertImageExif;
use crate::database::{ExifDao, PreviewDao, SqliteExifDao, SqlitePreviewDao};
use crate::date_resolver;
use crate::exif;
use crate::face_watch;
use crate::faces;
use crate::file_types;
use crate::libraries;
use crate::library_maintenance;
use crate::perceptual_hash;
use crate::tags;
use crate::tags::SqliteTagDao;
use crate::thumbnails;
use crate::video;
use crate::video::actors::{GeneratePreviewClipMessage, QueueVideosMessage, VideoPlaylistManager};

/// Clean up orphaned HLS playlists and segments whose source videos no longer exist
pub fn cleanup_orphaned_playlists(
    libs: Vec<libraries::Library>,
    excluded_dirs: Vec<String>,
    library_health: libraries::LibraryHealthMap,
) {
    std::thread::spawn(move || {
        let video_path = dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set");

        // Get cleanup interval from environment (default: 24 hours)
        let cleanup_interval_secs = dotenv::var("PLAYLIST_CLEANUP_INTERVAL_SECONDS")
            .ok()
            .and_then(|s| s.parse::<u64>().ok())
            .unwrap_or(86400); // 24 hours

        info!("Starting orphaned playlist cleanup job");
        info!("  Cleanup interval: {} seconds", cleanup_interval_secs);
        info!("  Playlist directory: {}", video_path);
        for lib in &libs {
            info!(
                "  Checking sources under '{}' at {}",
                lib.name, lib.root_path
            );
        }

        loop {
            std::thread::sleep(Duration::from_secs(cleanup_interval_secs));

            // Safety gate: skip the cleanup cycle if any library is
            // stale. A missing source video on a stale library is
            // indistinguishable from a transient unmount, and the
            // cleanup is destructive — we'd rather leak a few playlist
            // files for a tick than delete one whose source is briefly
            // unreachable. The cycle re-runs on the next interval.
            {
                let guard = library_health.read().unwrap_or_else(|e| e.into_inner());
                let stale: Vec<String> = libs
                    .iter()
                    .filter(|lib| guard.get(&lib.id).map(|h| !h.is_online()).unwrap_or(false))
                    .map(|lib| lib.name.clone())
                    .collect();
                if !stale.is_empty() {
                    warn!(
                        "Skipping orphaned-playlist cleanup: {} library(ies) stale: [{}]",
                        stale.len(),
                        stale.join(", ")
                    );
                    continue;
                }
            }

            info!("Running orphaned playlist cleanup");
            let start = std::time::Instant::now();
            let mut deleted_count = 0;
            let mut error_count = 0;

            // Find all .m3u8 files in VIDEO_PATH
            let playlists: Vec<PathBuf> = WalkDir::new(&video_path)
                .into_iter()
                .filter_map(|e| e.ok())
                .filter(|e| e.file_type().is_file())
                .filter(|e| {
                    e.path()
                        .extension()
                        .and_then(|s| s.to_str())
                        .map(|ext| ext.eq_ignore_ascii_case("m3u8"))
                        .unwrap_or(false)
                })
                .map(|e| e.path().to_path_buf())
                .collect();

            info!("Found {} playlist files to check", playlists.len());

            for playlist_path in playlists {
                // Extract the original video filename from playlist name
                // Playlist format: {VIDEO_PATH}/{original_filename}.m3u8
                if let Some(filename) = playlist_path.file_stem() {
                    let video_filename = filename.to_string_lossy();

                    // Search for this video file across every configured
                    // library, respecting EXCLUDED_DIRS so we don't
                    // false-resurrect playlists for videos that only
                    // exist inside an excluded subtree. As soon as one
                    // library has a matching source, we're done — the
                    // playlist isn't orphaned.
                    let mut video_exists = false;
                    'libs: for lib in &libs {
                        let effective = lib.effective_excluded_dirs(&excluded_dirs);
                        for entry in image_api::file_scan::walk_library_files(
                            Path::new(&lib.root_path),
                            &effective,
                        ) {
                            if let Some(entry_stem) = entry.path().file_stem()
                                && entry_stem == filename
                                && file_types::is_video_file(entry.path())
                            {
                                video_exists = true;
                                break 'libs;
                            }
                        }
                    }

                    if !video_exists {
                        debug!(
                            "Source video for playlist {} no longer exists, deleting",
                            playlist_path.display()
                        );

                        // Delete the playlist file
                        if let Err(e) = std::fs::remove_file(&playlist_path) {
                            warn!(
                                "Failed to delete playlist {}: {}",
                                playlist_path.display(),
                                e
                            );
                            error_count += 1;
                        } else {
                            deleted_count += 1;

                            // Also try to delete associated .ts segment files;
                            // they live in the same directory and carry the
                            // playlist's stem as a filename prefix (checked via
                            // starts_with below)
                            if let Some(parent_dir) = playlist_path.parent() {
                                for entry in WalkDir::new(parent_dir)
                                    .max_depth(1)
                                    .into_iter()
                                    .filter_map(|e| e.ok())
                                    .filter(|e| e.file_type().is_file())
                                {
                                    let entry_path = entry.path();
                                    if let Some(ext) = entry_path.extension()
                                        && ext.eq_ignore_ascii_case("ts")
                                    {
                                        // Check if this .ts file belongs to our playlist
                                        if let Some(ts_stem) = entry_path.file_stem() {
                                            let ts_name = ts_stem.to_string_lossy();
                                            if ts_name.starts_with(&*video_filename) {
                                                if let Err(e) = std::fs::remove_file(entry_path) {
                                                    debug!(
                                                        "Failed to delete segment {}: {}",
                                                        entry_path.display(),
                                                        e
                                                    );
                                                } else {
                                                    debug!(
                                                        "Deleted segment: {}",
                                                        entry_path.display()
                                                    );
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }

            info!(
                "Orphaned playlist cleanup completed in {:?}: deleted {} playlists, {} errors",
                start.elapsed(),
                deleted_count,
                error_count
            );
        }
    });
}

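The segment-matching rule in the cleanup loop can be isolated as a predicate. A minimal sketch, assuming ffmpeg-style `{stem}{N}.ts` segment naming (an assumption here, mirrored from the `starts_with` check above; the helper name is illustrative):

```rust
use std::path::Path;

// A .ts file is treated as belonging to a playlist when its stem starts
// with the playlist's stem. NOTE: the "{stem}{N}.ts" naming convention
// is assumed, not guaranteed by this diff.
fn segment_belongs_to(playlist_stem: &str, ts_path: &Path) -> bool {
    ts_path
        .extension()
        .and_then(|e| e.to_str())
        .is_some_and(|e| e.eq_ignore_ascii_case("ts"))
        && ts_path
            .file_stem()
            .and_then(|s| s.to_str())
            .is_some_and(|s| s.starts_with(playlist_stem))
}

fn main() {
    // The playlist stem keeps the source's extension ("movie.mp4.m3u8"
    // → stem "movie.mp4"), so segments look like "movie.mp4{N}.ts".
    assert!(segment_belongs_to("movie.mp4", Path::new("movie.mp40.ts")));
    assert!(!segment_belongs_to("movie.mp4", Path::new("other0.ts")));
    assert!(!segment_belongs_to("movie.mp4", Path::new("movie.mp4.m3u8")));
    println!("ok");
}
```

Prefix matching is deliberately loose: it can over-match a sibling video whose name extends the stem, which is another reason the cleanup only runs behind the all-libraries-online safety gate.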
pub fn watch_files(
|
||||||
|
libs: Vec<libraries::Library>,
|
||||||
|
playlist_manager: Addr<VideoPlaylistManager>,
|
||||||
|
preview_generator: Addr<video::actors::PreviewClipGenerator>,
|
||||||
|
face_client: crate::ai::face_client::FaceClient,
|
||||||
|
excluded_dirs: Vec<String>,
|
||||||
|
library_health: libraries::LibraryHealthMap,
|
||||||
|
) {
|
||||||
|
std::thread::spawn(move || {
|
||||||
|
// Get polling intervals from environment variables
|
||||||
|
// Quick scan: Check recently modified files (default: 60 seconds)
|
||||||
|
let quick_interval_secs = dotenv::var("WATCH_QUICK_INTERVAL_SECONDS")
|
||||||
|
.ok()
|
||||||
|
.and_then(|s| s.parse::<u64>().ok())
|
||||||
|
.unwrap_or(60);
|
||||||
|
|
||||||
|
// Full scan: Check all files regardless of modification time (default: 3600 seconds = 1 hour)
|
||||||
|
let full_interval_secs = dotenv::var("WATCH_FULL_INTERVAL_SECONDS")
|
||||||
|
.ok()
|
||||||
|
.and_then(|s| s.parse::<u64>().ok())
|
||||||
|
.unwrap_or(3600);
|
||||||
|
|
||||||
|
info!("Starting optimized file watcher");
|
||||||
|
info!(" Quick scan interval: {} seconds", quick_interval_secs);
|
||||||
|
info!(" Full scan interval: {} seconds", full_interval_secs);
|
||||||
|
// Surface face-detection state at boot so it's obvious whether
|
||||||
|
// the watcher will hit Apollo. The branch silently no-ops when
|
||||||
|
// disabled (intentional for legacy deploys), which makes "why
|
||||||
|
// aren't faces being detected?" hard to diagnose otherwise.
|
||||||
|
if face_client.is_enabled() {
|
||||||
|
info!(" Face detection: ENABLED");
|
||||||
|
} else {
|
||||||
|
info!(
|
||||||
|
" Face detection: DISABLED (set APOLLO_FACE_API_BASE_URL \
|
||||||
|
or APOLLO_API_BASE_URL to enable)"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
for lib in &libs {
|
||||||
|
info!(
|
||||||
|
" Watching library '{}' (id={}) at {}",
|
||||||
|
lib.name, lib.id, lib.root_path
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create DAOs for tracking processed files
|
||||||
|
let exif_dao = Arc::new(Mutex::new(
|
||||||
|
Box::new(SqliteExifDao::new()) as Box<dyn ExifDao>
|
||||||
|
));
|
||||||
|
let preview_dao = Arc::new(Mutex::new(
|
||||||
|
Box::new(SqlitePreviewDao::new()) as Box<dyn PreviewDao>
|
||||||
|
));
|
||||||
|
let face_dao = Arc::new(Mutex::new(
|
||||||
|
Box::new(faces::SqliteFaceDao::new()) as Box<dyn faces::FaceDao>
|
||||||
|
));
|
||||||
|
// tag_dao for the watcher's auto-bind path. Independent of the
|
||||||
|
// request-handler tag_dao instance — both end up pointing at the
|
||||||
|
// same SQLite file via SqliteTagDao::default().
|
||||||
|
let watcher_tag_dao = Arc::new(Mutex::new(
|
||||||
|
Box::new(SqliteTagDao::default()) as Box<dyn tags::TagDao>
|
||||||
|
));
|
||||||
|
|
||||||
|
let mut last_quick_scan = SystemTime::now();
|
||||||
|
let mut last_full_scan = SystemTime::now();
|
||||||
|
let mut scan_count = 0u64;
|
||||||
|
|
||||||
|
// Per-library cursor for the missing-file scan. Each tick reads
|
||||||
|
// a page from `offset`, stat()s the rows, deletes confirmed-
|
||||||
|
// missing ones, and advances or wraps the cursor. State held
|
||||||
|
// in-memory so a watcher restart resumes from 0 — fine, the
|
||||||
|
// sweep is idempotent.
|
||||||
|
let mut missing_file_offsets: HashMap<i32, i64> = HashMap::new();
|
||||||
|
|
||||||
|
let missing_scan_page_size: i64 = dotenv::var("IMAGE_EXIF_MISSING_SCAN_PAGE_SIZE")
|
||||||
|
.ok()
|
||||||
|
.and_then(|s| s.parse().ok())
|
||||||
|
.filter(|n: &i64| *n > 0)
|
||||||
|
.unwrap_or(library_maintenance::DEFAULT_SCAN_PAGE_SIZE);
|
||||||
|
let missing_delete_cap: usize = dotenv::var("IMAGE_EXIF_MISSING_DELETE_CAP_PER_TICK")
|
||||||
|
.ok()
|
||||||
|
.and_then(|s| s.parse().ok())
|
||||||
|
.filter(|n: &usize| *n > 0)
|
||||||
|
.unwrap_or(library_maintenance::DEFAULT_MISSING_DELETE_CAP);
|
||||||
|
|
||||||
|
// Two-tick orphan-GC consensus state. Carried across ticks via
|
||||||
|
// `OrphanGcState`; see library_maintenance::run_orphan_gc.
|
||||||
|
let mut orphan_gc_state = library_maintenance::OrphanGcState::default();
|
||||||
|
|
||||||
|
// Initial availability sweep before the loop's first sleep so
|
||||||
|
// /libraries reports the truth from the very first request,
|
||||||
|
// rather than the optimistic Online default that
|
||||||
|
// new_health_map seeds. Without this, an unmounted share would
|
||||||
|
// appear online for up to WATCH_QUICK_INTERVAL_SECONDS (default
|
||||||
|
// 60s) after boot. Same probe logic as the per-tick gate
|
||||||
|
// below; no ingest runs here, just the health update + log.
|
||||||
|
// Disabled libraries skip the probe entirely — they should
|
||||||
|
// never enter the health map (treated as out-of-scope).
|
||||||
|
for lib in &libs {
|
||||||
|
if !lib.enabled {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let context = opentelemetry::Context::new();
|
||||||
|
let had_data = exif_dao
|
||||||
|
.lock()
|
||||||
|
.expect("exif_dao poisoned")
|
||||||
|
.count_for_library(&context, lib.id)
|
||||||
|
.map(|n| n > 0)
|
||||||
|
.unwrap_or(false);
|
||||||
|
libraries::refresh_health(&library_health, lib, had_data);
|
||||||
|
}
|
||||||
|
|
||||||
|
        loop {
            std::thread::sleep(Duration::from_secs(quick_interval_secs));

            let now = SystemTime::now();
            let since_last_full = now
                .duration_since(last_full_scan)
                .unwrap_or(Duration::from_secs(0));

            let is_full_scan = since_last_full.as_secs() >= full_interval_secs;

            for lib in &libs {
                // Operator kill switch: a disabled library is invisible
                // to the watcher entirely. No probe, no ingest, no
                // maintenance, no health entry. Distinct from Stale —
                // Stale is "we wanted to but couldn't"; Disabled is
                // "we don't want to". Toggle via SQL.
                if !lib.enabled {
                    debug!(
                        "watcher: skipping library '{}' (id={}) — enabled=false",
                        lib.name, lib.id
                    );
                    continue;
                }

                // Availability probe: every tick checks that the
                // library's mount is reachable, is a directory, is
                // readable, and (if image_exif has rows for it) is
                // non-empty. A Stale library skips ingest, backlog
                // drains, and metric refresh — reads/serving in HTTP
                // handlers continue to work. Branches B/C extend the
                // probe gate to cover handoff and orphan GC. See
                // CLAUDE.md "Library availability and safety".
                let had_data = {
                    let context = opentelemetry::Context::new();
                    let mut guard = exif_dao.lock().expect("exif_dao poisoned");
                    guard
                        .count_for_library(&context, lib.id)
                        .map(|n| n > 0)
                        .unwrap_or(false)
                };
                let health = libraries::refresh_health(&library_health, lib, had_data);
                if !health.is_online() {
                    // Skip every write path for this library this tick.
                    // Don't refresh the media-count gauge either — a
                    // probe-failed library would otherwise flap to 0
                    // image / 0 video and pollute Prometheus.
                    continue;
                }

                // Drain the unhashed-hash backlog AND the face-detection
                // backlog every tick, regardless of quick/full. Quick
                // scans only walk recently-modified files, so the
                // pre-Phase-3 backlog never enters their candidate set
                // — without these standalone passes, backfill +
                // detection only progressed during full scans
                // (default once an hour).
                // Effective excludes for this library: global env-var
                // ∪ row's excluded_dirs. Compute once per tick — used
                // by every walker below for this library.
                let effective_excludes = lib.effective_excluded_dirs(&excluded_dirs);

                if face_client.is_enabled() {
                    let context = opentelemetry::Context::new();
                    backfill::backfill_unhashed_backlog(&context, lib, &exif_dao);
                    backfill::process_face_backlog(
                        &context,
                        lib,
                        &face_client,
                        &face_dao,
                        &watcher_tag_dao,
                        &effective_excludes,
                    );
                }

                // Date-taken backfill: drain rows whose canonical date is
                // either unresolved or only fs_time-sourced. Independent
                // of face detection — runs even on deploys that don't
                // configure Apollo, since `/memories` depends on it.
                {
                    let context = opentelemetry::Context::new();
                    backfill::backfill_missing_date_taken(&context, lib, &exif_dao);
                }

                if is_full_scan {
                    info!(
                        "Running full scan for library '{}' (scan #{})",
                        lib.name, scan_count
                    );
                    process_new_files(
                        lib,
                        Arc::clone(&exif_dao),
                        Arc::clone(&preview_dao),
                        Arc::clone(&face_dao),
                        Arc::clone(&watcher_tag_dao),
                        face_client.clone(),
                        &effective_excludes,
                        None,
                        playlist_manager.clone(),
                        preview_generator.clone(),
                    );
                } else {
                    debug!(
                        "Running quick scan for library '{}' (checking files modified in last {} seconds)",
                        lib.name,
                        quick_interval_secs + 10
                    );
                    let check_since = last_quick_scan
                        .checked_sub(Duration::from_secs(10))
                        .unwrap_or(last_quick_scan);
                    process_new_files(
                        lib,
                        Arc::clone(&exif_dao),
                        Arc::clone(&preview_dao),
                        Arc::clone(&face_dao),
                        Arc::clone(&watcher_tag_dao),
                        face_client.clone(),
                        &effective_excludes,
                        Some(check_since),
                        playlist_manager.clone(),
                        preview_generator.clone(),
                    );
                }

                // Update media counts per library (metric aggregates across all)
                thumbnails::update_media_counts(Path::new(&lib.root_path), &effective_excludes);

                // Missing-file detection: prune image_exif rows whose
                // source file is no longer on disk. Per-library, so we
                // pass library-online-this-tick implicitly (we only
                // reach here if the probe gate at the top of the
                // iteration passed). Capped + paginated so a huge
                // library doesn't stall the watcher; rows we don't
                // visit this tick get visited next tick. See
                // library_maintenance::detect_missing_files_for_library.
                {
                    let context = opentelemetry::Context::new();
                    let offset = missing_file_offsets.get(&lib.id).copied().unwrap_or(0);
                    let (deleted, next_offset) =
                        library_maintenance::detect_missing_files_for_library(
                            &context,
                            lib,
                            &exif_dao,
                            offset,
                            missing_scan_page_size,
                            missing_delete_cap,
                        );
                    missing_file_offsets.insert(lib.id, next_offset);
                    if deleted > 0 {
                        debug!(
                            "missing-file scan: library '{}' next_offset={}",
                            lib.name, next_offset
                        );
                    }
                }
            }

            // Reconciliation: cross-library, so it runs once per tick
            // outside the per-library loop. Idempotent — fast no-op when
            // there's nothing to do. Operates on the database alone, no
            // filesystem dependency, so it doesn't need a health gate.
            // See database::reconcile and CLAUDE.md "Multi-library data
            // model" for the rules.
            {
                let mut conn = image_api::database::connect();
                let _ = image_api::database::reconcile::run(&mut conn);

                // Back-ref refresh: hash-keyed rows whose
                // (library_id, rel_path) tuple no longer matches any
                // image_exif row but whose hash still does. After a
                // recent→archive move, the missing-file scan removes
                // the old image_exif row; this pass repoints face /
                // tag / insight back-refs at the surviving location.
                // DB-only, no health gate needed — uses what's in
                // image_exif as truth.
                let _ = library_maintenance::refresh_back_refs(&mut conn);

                // Orphan GC: the destructive end of the maintenance
                // pipeline. Two-tick consensus + every-library-online
                // requirement is enforced inside run_orphan_gc; we
                // pass the current all-online flag and the function
                // tracks the previous tick's flag in OrphanGcState.
                let all_online = library_maintenance::all_libraries_online(&libs, &library_health);
                let _ =
                    library_maintenance::run_orphan_gc(&mut conn, &mut orphan_gc_state, all_online);
            }

            if is_full_scan {
                last_full_scan = now;
            }
            last_quick_scan = now;
            scan_count += 1;
        }
    });
}

/// Check if a playlist needs to be (re)generated.
///
/// Returns true if:
/// - Playlist doesn't exist, OR
/// - Source video is newer than the playlist
///
/// When metadata for either path is unreadable, returns true so the
/// caller errs on the side of regeneration (a redundant transcode
/// beats a stale playlist).
pub fn playlist_needs_generation(video_path: &Path, playlist_path: &Path) -> bool {
    if !playlist_path.exists() {
        return true;
    }

    // Check if source video is newer than playlist
    if let (Ok(video_meta), Ok(playlist_meta)) = (
        std::fs::metadata(video_path),
        std::fs::metadata(playlist_path),
    ) && let (Ok(video_modified), Ok(playlist_modified)) =
        (video_meta.modified(), playlist_meta.modified())
    {
        return video_modified > playlist_modified;
    }

    // If we can't determine, assume it needs generation
    true
}

pub fn process_new_files(
    library: &libraries::Library,
    exif_dao: Arc<Mutex<Box<dyn ExifDao>>>,
    preview_dao: Arc<Mutex<Box<dyn PreviewDao>>>,
    face_dao: Arc<Mutex<Box<dyn faces::FaceDao>>>,
    tag_dao: Arc<Mutex<Box<dyn tags::TagDao>>>,
    face_client: crate::ai::face_client::FaceClient,
    excluded_dirs: &[String],
    modified_since: Option<SystemTime>,
    playlist_manager: Addr<VideoPlaylistManager>,
    preview_generator: Addr<video::actors::PreviewClipGenerator>,
) {
    let context = opentelemetry::Context::new();
    let thumbs = dotenv::var("THUMBNAILS").expect("THUMBNAILS not defined");
    let thumbnail_directory = Path::new(&thumbs);
    let base_path = Path::new(&library.root_path);

    // Walk, prune EXCLUDED_DIRS subtrees, and apply image/video + modified_since
    // filters. See `file_scan` for why exclusion has to happen at WalkDir
    // time (filter_entry) rather than at face-detect time.
    let files: Vec<(PathBuf, String)> =
        image_api::file_scan::enumerate_indexable_files(base_path, excluded_dirs, modified_since);

    if files.is_empty() {
        debug!("No files to process");
        return;
    }

    debug!("Found {} files to check", files.len());

    // Batch query: Get all EXIF data for these files in one query
    let file_paths: Vec<String> = files.iter().map(|(_, rel_path)| rel_path.clone()).collect();

    let existing_exif_paths: HashMap<String, bool> = {
        let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
        // Walk is per-library, so scope the lookup so a same-named file
        // in another library doesn't make this one look already-indexed.
        match dao.get_exif_batch(&context, Some(library.id), &file_paths) {
            Ok(exif_records) => exif_records
                .into_iter()
                .map(|record| (record.file_path, true))
                .collect(),
            Err(e) => {
                error!("Error batch querying EXIF data: {:?}", e);
                HashMap::new()
            }
        }
    };

    let mut new_files_found = false;
    let mut files_needing_row = Vec::new();

    // Register every image/video file in image_exif. Rows without EXIF
    // still carry library_id, rel_path, content_hash, and size_bytes so
    // derivative dedup and DB-indexed sort/filter work for every file,
    // not just photos with parseable EXIF.
    for (file_path, relative_path) in &files {
        // Check both the library-scoped legacy path (current shape) and
        // the bare-legacy path (pre-multi-library shape). Either one
        // existing means a thumbnail is already on disk for this file.
        let scoped_thumb_path = content_hash::library_scoped_legacy_path(
            thumbnail_directory,
            library.id,
            relative_path,
        );
        let bare_legacy_thumb_path = thumbnail_directory.join(relative_path);
        let needs_thumbnail = !scoped_thumb_path.exists()
            && !bare_legacy_thumb_path.exists()
            && !thumbnails::unsupported_thumbnail_sentinel(&scoped_thumb_path).exists()
            && !thumbnails::unsupported_thumbnail_sentinel(&bare_legacy_thumb_path).exists();
        let needs_row = !existing_exif_paths.contains_key(relative_path);

        if needs_thumbnail || needs_row {
            new_files_found = true;

            if needs_thumbnail {
                info!("New file detected (missing thumbnail): {}", relative_path);
            }

            if needs_row {
                files_needing_row.push((file_path.clone(), relative_path.clone()));
            }
        }
    }

    if !files_needing_row.is_empty() {
        info!(
            "Registering {} new files in image_exif",
            files_needing_row.len()
        );

        for (file_path, relative_path) in files_needing_row {
            let timestamp = Utc::now().timestamp();

            // Hash + size from filesystem metadata — always attempted so
            // every file gets a content_hash, even when EXIF is absent.
            let (content_hash, size_bytes) = match content_hash::compute(&file_path) {
                Ok(id) => (Some(id.content_hash), Some(id.size_bytes)),
                Err(e) => {
                    warn!("Failed to hash {}: {:?}", file_path.display(), e);
                    (None, None)
                }
            };

            // Perceptual hashes (pHash + dHash). Best-effort — None for
            // videos and decode failures. Drives near-duplicate detection
            // in the Apollo duplicates surface; failure here is non-fatal
            // and never blocks indexing.
            let perceptual = perceptual_hash::compute(&file_path);

            // EXIF is best-effort enrichment. When extraction fails (or the
            // file type doesn't support EXIF) we still store a row with all
            // EXIF fields NULL; the file remains visible to sort-by-date
            // and tag queries via its rel_path and filesystem timestamps.
            let exif_fields = if exif::supports_exif(&file_path) {
                match exif::extract_exif_from_path(&file_path) {
                    Ok(data) => Some(data),
                    Err(e) => {
                        debug!(
                            "No EXIF or parse error for {}: {:?}",
                            file_path.display(),
                            e
                        );
                        None
                    }
                }
            } else {
                None
            };

            // Canonical date_taken via the waterfall — kamadak-exif (already
            // computed above) → exiftool fallback for videos / MakerNote /
            // QuickTime → filename regex → earliest_fs_time. Source is
            // recorded so the per-tick backfill drain can re-run weak
            // resolutions later.
            let resolved_date = date_resolver::resolve_date_taken(
                &file_path,
                exif_fields.as_ref().and_then(|e| e.date_taken),
            );

            let insert_exif = InsertImageExif {
                library_id: library.id,
                file_path: relative_path.clone(),
                camera_make: exif_fields.as_ref().and_then(|e| e.camera_make.clone()),
                camera_model: exif_fields.as_ref().and_then(|e| e.camera_model.clone()),
                lens_model: exif_fields.as_ref().and_then(|e| e.lens_model.clone()),
                width: exif_fields.as_ref().and_then(|e| e.width),
                height: exif_fields.as_ref().and_then(|e| e.height),
                orientation: exif_fields.as_ref().and_then(|e| e.orientation),
                gps_latitude: exif_fields
                    .as_ref()
                    .and_then(|e| e.gps_latitude.map(|v| v as f32)),
                gps_longitude: exif_fields
                    .as_ref()
                    .and_then(|e| e.gps_longitude.map(|v| v as f32)),
                gps_altitude: exif_fields
                    .as_ref()
                    .and_then(|e| e.gps_altitude.map(|v| v as f32)),
                focal_length: exif_fields
                    .as_ref()
                    .and_then(|e| e.focal_length.map(|v| v as f32)),
                aperture: exif_fields
                    .as_ref()
                    .and_then(|e| e.aperture.map(|v| v as f32)),
                shutter_speed: exif_fields.as_ref().and_then(|e| e.shutter_speed.clone()),
                iso: exif_fields.as_ref().and_then(|e| e.iso),
                date_taken: resolved_date.map(|r| r.timestamp),
                created_time: timestamp,
                last_modified: timestamp,
                content_hash,
                size_bytes,
                phash_64: perceptual.map(|h| h.phash_64),
                dhash_64: perceptual.map(|h| h.dhash_64),
                date_taken_source: resolved_date.map(|r| r.source.as_str().to_string()),
            };

            let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
            if let Err(e) = dao.store_exif(&context, insert_exif) {
                error!(
                    "Failed to register {} in image_exif: {:?}",
                    relative_path, e
                );
            } else {
                debug!("Registered {} in image_exif", relative_path);
            }
        }
    }

    // ── Face detection pass ────────────────────────────────────────────
    // Run after EXIF writes so newly-registered files have their
    // content_hash populated. Skipped wholesale when face_client is
    // disabled (no Apollo integration configured) — Phase 3 wires this
    // up; the watcher remains usable on legacy deploys.
    if face_client.is_enabled() {
        // Opportunistic content_hash backfill: photos indexed before
        // content-hashing landed (or where the hash compute failed
        // silently on insert) end up in image_exif with NULL
        // content_hash. build_face_candidates keys on content_hash, so
        // those files would never become candidates without backfill.
        // Idempotent — subsequent scans see the populated hashes and
        // no-op. The dedicated `backfill_hashes` binary is still the
        // right tool for very large legacy libraries; this branch
        // ensures small/medium deploys self-heal without operator
        // action.
        backfill::backfill_missing_content_hashes(&context, &files, library, &exif_dao);
        let candidates =
            backfill::build_face_candidates(&context, library, &files, &exif_dao, &face_dao);
        debug!(
            "face_watch: scan tick — {} image file(s) walked, {} candidate(s) (library '{}', modified_since={})",
            files
                .iter()
                .filter(|(p, _)| !file_types::is_video_file(p))
                .count(),
            candidates.len(),
            library.name,
            modified_since.is_some(),
        );
        if !candidates.is_empty() {
            face_watch::run_face_detection_pass(
                library,
                excluded_dirs,
                &face_client,
                Arc::clone(&face_dao),
                Arc::clone(&tag_dao),
                candidates,
            );
        }
    }

    // Check for videos that need HLS playlists
    let video_path_base = dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set");
    let mut videos_needing_playlists = Vec::new();

    for (file_path, _relative_path) in &files {
        if file_types::is_video_file(file_path) {
            // Construct expected playlist path
            let playlist_filename =
                format!("{}.m3u8", file_path.file_name().unwrap().to_string_lossy());
            let playlist_path = Path::new(&video_path_base).join(&playlist_filename);

            // Check if playlist needs (re)generation
            if playlist_needs_generation(file_path, &playlist_path) {
                videos_needing_playlists.push(file_path.clone());
            }
        }
    }

    // Send queue request to playlist manager
    if !videos_needing_playlists.is_empty() {
        playlist_manager.do_send(QueueVideosMessage {
            video_paths: videos_needing_playlists,
        });
    }

    // Check for videos that need preview clips
    // Collect (full_path, relative_path) for video files
    let video_files: Vec<(String, String)> = files
        .iter()
        .filter(|(file_path, _)| file_types::is_video_file(file_path))
        .map(|(file_path, rel_path)| (file_path.to_string_lossy().to_string(), rel_path.clone()))
        .collect();

    if !video_files.is_empty() {
        // Query DB using relative paths (consistent with how GET/POST handlers store them)
        let video_rel_paths: Vec<String> = video_files.iter().map(|(_, rel)| rel.clone()).collect();

        let existing_previews: HashMap<String, String> = {
            let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao");
            match dao.get_previews_batch(&context, &video_rel_paths) {
                Ok(clips) => clips
                    .into_iter()
                    .map(|clip| (clip.file_path, clip.status))
                    .collect(),
                Err(e) => {
                    error!("Error batch querying preview clips: {:?}", e);
                    HashMap::new()
                }
            }
        };

        for (full_path, relative_path) in &video_files {
            let status = existing_previews.get(relative_path).map(|s| s.as_str());
            let needs_preview = match status {
                None => true,            // No record at all
                Some("failed") => true,  // Retry failed
                Some("pending") => true, // Stale pending from previous run
                _ => false,              // processing or complete
            };

            if needs_preview {
                // Insert pending record using relative path
                if status.is_none() {
                    let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao");
                    let _ = dao.insert_preview(&context, relative_path, "pending");
                }

                // Send full path in the message — the actor will derive relative path from it
                preview_generator.do_send(GeneratePreviewClipMessage {
                    video_path: full_path.clone(),
                });
            }
        }
    }

    // Generate thumbnails for all files that need them
    if new_files_found {
        info!("Processing thumbnails for new files...");
        thumbnails::create_thumbnails(std::slice::from_ref(library), excluded_dirs);
    }

    // Reconciliation: on a full scan, prune image_exif rows whose rel_path no
    // longer exists on disk for this library. Keeps the DB in parity so
    // downstream DB-backed listings (e.g. recursive /photos) don't return
    // phantom files. Skipped on quick scans — those only look at recently
    // modified files and can't distinguish "missing" from "unchanged".
    if modified_since.is_none() {
        let disk_paths: HashSet<String> = files.iter().map(|(_, rel)| rel.clone()).collect();
        let db_paths: Vec<String> = {
            let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
            dao.get_rel_paths_for_library(&context, library.id)
                .unwrap_or_else(|e| {
                    error!(
                        "Reconciliation: failed to load image_exif rel_paths for lib {}: {:?}",
                        library.id, e
                    );
                    Vec::new()
                })
        };

        let stale: Vec<String> = db_paths
            .into_iter()
            .filter(|p| !disk_paths.contains(p))
            .collect();

        if !stale.is_empty() {
            info!(
                "Reconciliation: pruning {} stale image_exif rows for library '{}'",
                stale.len(),
                library.name
            );
            let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
            for rel in &stale {
                if let Err(e) = dao.delete_exif_by_library(&context, library.id, rel) {
                    warn!(
                        "Reconciliation: failed to delete {} (lib {}): {:?}",
                        rel, library.id, e
                    );
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use std::thread::sleep;
    use std::time::Duration as StdDuration;
    use tempfile::TempDir;

    #[test]
    fn playlist_needs_generation_true_when_playlist_missing() {
        let tmp = TempDir::new().unwrap();
        let video = tmp.path().join("clip.mp4");
        fs::write(&video, b"v").unwrap();
        let playlist = tmp.path().join("clip.mp4.m3u8");
        // playlist does not exist
        assert!(playlist_needs_generation(&video, &playlist));
    }

    #[test]
    fn playlist_needs_generation_false_when_playlist_is_newer() {
        let tmp = TempDir::new().unwrap();
        let video = tmp.path().join("clip.mp4");
        fs::write(&video, b"v").unwrap();
        // Sleep to guarantee a distinct mtime for the playlist created next.
        // Many filesystems have ~10 ms mtime resolution; 50 ms is plenty.
        sleep(StdDuration::from_millis(50));
        let playlist = tmp.path().join("clip.mp4.m3u8");
        fs::write(&playlist, b"#EXTM3U").unwrap();
        assert!(!playlist_needs_generation(&video, &playlist));
    }

    #[test]
    fn playlist_needs_generation_true_when_video_is_newer() {
        let tmp = TempDir::new().unwrap();
        let playlist = tmp.path().join("clip.mp4.m3u8");
        fs::write(&playlist, b"#EXTM3U").unwrap();
        sleep(StdDuration::from_millis(50));
        let video = tmp.path().join("clip.mp4");
        fs::write(&video, b"v").unwrap();
        assert!(playlist_needs_generation(&video, &playlist));
    }

    #[test]
    fn playlist_needs_generation_true_when_video_missing_metadata() {
        // Video doesn't exist; metadata fails for it. Falls through to the
        // "assume needs regeneration" branch.
        let tmp = TempDir::new().unwrap();
        let video = tmp.path().join("missing.mp4");
        let playlist = tmp.path().join("missing.mp4.m3u8");
        fs::write(&playlist, b"#EXTM3U").unwrap();
        assert!(playlist_needs_generation(&video, &playlist));
    }
}