Files
ImageApi/src/main.rs
Cameron Cordes b42acbb3f3 fmt: cargo fmt sweep across drifted files
No behavior change — purely whitespace/line-break cleanup that had
accumulated since the last format run.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 16:42:41 -04:00

3526 lines
140 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#![allow(clippy::too_many_arguments)]
#[macro_use]
extern crate diesel;
extern crate rayon;
use actix::Addr;
use actix_web::web::Data;
use actix_web_prom::PrometheusMetricsBuilder;
use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations};
use futures::stream::StreamExt;
use lazy_static::lazy_static;
use prometheus::{self, IntGauge};
use std::error::Error;
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};
use std::{
collections::{HashMap, HashSet},
io::prelude::*,
};
use std::{env, fs::File};
use std::{
io::ErrorKind,
path::{Path, PathBuf},
};
use walkdir::{DirEntry, WalkDir};
use actix_cors::Cors;
use actix_files::NamedFile;
use actix_governor::{Governor, GovernorConfigBuilder};
use actix_multipart as mp;
use actix_web::{
App, HttpRequest, HttpResponse, HttpServer, Responder, delete, get, middleware, post, put,
web::{self, BufMut, BytesMut},
};
use chrono::Utc;
use diesel::sqlite::Sqlite;
use rayon::prelude::*;
use urlencoding::decode;
use crate::ai::InsightGenerator;
use crate::auth::login;
use crate::data::*;
use crate::database::models::{ImageExif, InsertImageExif};
use crate::database::*;
use crate::files::{
RealFileSystem, RefreshThumbnailsMessage, is_image_or_video, is_valid_full_path, move_file,
};
use crate::otel::{extract_context_from_request, global_tracer};
use crate::service::ServiceBuilder;
use crate::state::AppState;
use crate::tags::*;
use crate::video::actors::{
GeneratePreviewClipMessage, ProcessMessage, QueueVideosMessage, ScanDirectoryMessage,
VideoPlaylistManager, create_playlist, generate_image_thumbnail_ffmpeg,
generate_video_thumbnail,
};
use log::{debug, error, info, trace, warn};
use opentelemetry::trace::{Span, Status, TraceContextExt, Tracer};
use opentelemetry::{KeyValue, global};
mod ai;
mod auth;
mod content_hash;
mod data;
mod database;
mod date_resolver;
mod duplicates;
mod error;
mod exif;
mod face_watch;
mod faces;
mod file_types;
mod files;
mod geo;
mod libraries;
mod library_maintenance;
mod perceptual_hash;
mod state;
mod tags;
mod utils;
mod video;
mod knowledge;
mod memories;
mod otel;
mod service;
#[cfg(test)]
mod testhelpers;
// Process-wide Prometheus gauges exported via the metrics endpoint.
// NOTE(review): `std::sync::LazyLock` could replace `lazy_static` on a
// modern toolchain, but the macro is kept as-is to avoid changing the
// statics' types.
lazy_static! {
    // Total count of images known to the server.
    static ref IMAGE_GAUGE: IntGauge = IntGauge::new(
        "imageserver_image_total",
        "Count of the images on the server"
    )
    .unwrap();
    // Total count of videos known to the server.
    static ref VIDEO_GAUGE: IntGauge = IntGauge::new(
        "imageserver_video_total",
        "Count of the videos on the server"
    )
    .unwrap();
}
/// Diesel migrations embedded into the binary at compile time by
/// `embed_migrations!`; applied through a `MigrationHarness`.
pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!();
/// `GET /image` — serve one image/video file, or a thumbnail variant of it.
///
/// Query fields (`ThumbnailRequest`): `path` (library-relative), optional
/// `library` selector, `size` (defaults to `Full`; `Thumb` selects the
/// thumbnail chain), `format` (`Gif` requests the animated video preview),
/// and `shape` (`Circle` requests the cached circular avatar variant).
#[get("/image")]
async fn get_image(
    _claims: Claims,
    request: HttpRequest,
    req: web::Query<ThumbnailRequest>,
    app_state: Data<AppState>,
    exif_dao: Data<Mutex<Box<dyn ExifDao>>>,
) -> impl Responder {
    let tracer = global_tracer();
    let context = extract_context_from_request(&request);
    let mut span = tracer.start_with_context("get_image", &context);
    // Resolve library from query param; default to primary so clients that
    // don't yet send `library=` continue to work.
    let library = match libraries::resolve_library_param(&app_state, req.library.as_deref()) {
        Ok(Some(lib)) => lib,
        Ok(None) => app_state.primary_library(),
        Err(msg) => {
            span.set_status(Status::error(msg.clone()));
            return HttpResponse::BadRequest().body(msg);
        }
    };
    // Union-mode search returns flat rel_paths with no library attribution,
    // so clients may request a file under the wrong library. Try the
    // resolved library first; if the file isn't there, fall back to any
    // other library holding that rel_path on disk.
    let resolved = is_valid_full_path(&library.root_path, &req.path, false)
        .filter(|p| p.exists())
        .map(|p| (library, p))
        .or_else(|| {
            app_state.libraries.iter().find_map(|lib| {
                if lib.id == library.id {
                    return None;
                }
                is_valid_full_path(&lib.root_path, &req.path, false)
                    .filter(|p| p.exists())
                    .map(|p| (lib, p))
            })
        });
    if let Some((library, path)) = resolved {
        let image_size = req.size.unwrap_or(PhotoSize::Full);
        if image_size == PhotoSize::Thumb {
            let relative_path = path
                .strip_prefix(&library.root_path)
                .expect("Error stripping library root prefix from thumbnail");
            // Forward-slash form — EXIF rows are keyed this way regardless
            // of host OS.
            let relative_path_str = relative_path.to_string_lossy().replace('\\', "/");
            let thumbs = &app_state.thumbnail_path;
            let bare_legacy_thumb_path = Path::new(&thumbs).join(relative_path);
            let scoped_legacy_thumb_path = content_hash::library_scoped_legacy_path(
                Path::new(&thumbs),
                library.id,
                relative_path,
            );
            // Gif thumbnails are a separate lookup (video GIF previews).
            // Dual-lookup for gif is out of scope; preserve existing flow.
            if req.format == Some(ThumbnailFormat::Gif) && is_video_file(&path) {
                let mut gif_path = Path::new(&app_state.gif_path).join(relative_path);
                gif_path.set_extension("gif");
                trace!("Gif thumbnail path: {:?}", gif_path);
                if let Ok(file) = NamedFile::open(&gif_path) {
                    span.set_status(Status::Ok);
                    return file
                        .use_etag(true)
                        .use_last_modified(true)
                        .prefer_utf8(true)
                        .into_response(&request);
                }
                // Missing gif falls through to the regular jpeg thumbnail.
            }
            // Lookup chain (most-specific first, falling back as we miss):
            // 1. hash-keyed (`<thumbs>/<hash[..2]>/<hash>.jpg`) — content
            //    identity, shared across libraries;
            // 2. library-scoped legacy (`<thumbs>/<lib_id>/<rel_path>`) —
            //    written by current generation when hash isn't known;
            // 3. bare legacy (`<thumbs>/<rel_path>`) — pre-multi-library
            //    thumbs from the days before library prefixing existed.
            // Stage (3) goes away once a one-time migration lifts every
            // bare-legacy file under a library prefix; until then it
            // prevents needless 404s for already-warmed deployments.
            let hash_thumb_path: Option<PathBuf> = {
                let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
                match dao.get_exif(&context, &relative_path_str) {
                    Ok(Some(row)) => row
                        .content_hash
                        .as_deref()
                        .map(|h| content_hash::thumbnail_path(Path::new(thumbs), h)),
                    _ => None,
                }
            };
            // First existing candidate wins; stage (3) is used unchecked —
            // the NamedFile::open below handles its absence.
            let thumb_path = hash_thumb_path
                .as_ref()
                .filter(|p| p.exists())
                .cloned()
                .or_else(|| {
                    if scoped_legacy_thumb_path.exists() {
                        Some(scoped_legacy_thumb_path.clone())
                    } else {
                        None
                    }
                })
                .unwrap_or_else(|| bare_legacy_thumb_path.clone());
            // Handle circular thumbnail request
            if req.shape == Some(ThumbnailShape::Circle) {
                match create_circular_thumbnail(&thumb_path, thumbs).await {
                    Ok(circular_path) => {
                        if let Ok(file) = NamedFile::open(&circular_path) {
                            span.set_status(Status::Ok);
                            return file
                                .use_etag(true)
                                .use_last_modified(true)
                                .prefer_utf8(true)
                                .into_response(&request);
                        }
                    }
                    Err(e) => {
                        warn!("Failed to create circular thumbnail: {:?}", e);
                        // Fall through to serve square thumbnail
                    }
                }
            }
            trace!("Thumbnail path: {:?}", thumb_path);
            if let Ok(file) = NamedFile::open(&thumb_path) {
                span.set_status(Status::Ok);
                return file
                    .use_etag(true)
                    .use_last_modified(true)
                    .prefer_utf8(true)
                    .into_response(&request);
            }
            // NOTE(review): a thumbnail miss falls through and serves the
            // FULL-SIZE file below rather than a 404 — confirm intended.
        }
        // Full-size requests for RAW formats (NEF/CR2/ARW/etc.) can't just
        // NamedFile-stream the original bytes — browsers won't decode the
        // RAW container, so a `<img src=...>` lands as a broken image. Serve
        // the embedded JPEG preview instead (typically the camera's in-body
        // review JPEG, ~12 MP). Falls through to NamedFile if no preview is
        // available, which preserves the historical behavior for callers
        // that genuinely want the original bytes.
        if image_size == PhotoSize::Full && exif::is_tiff_raw(&path) {
            if let Some(preview) = exif::extract_embedded_jpeg_preview(&path) {
                span.set_status(Status::Ok);
                return HttpResponse::Ok()
                    .content_type("image/jpeg")
                    .insert_header(("Cache-Control", "public, max-age=3600"))
                    .body(preview);
            }
        }
        if let Ok(file) = NamedFile::open(&path) {
            span.set_status(Status::Ok);
            // Enable ETag and set cache headers for full images (1 hour cache)
            return file
                .use_etag(true)
                .use_last_modified(true)
                .prefer_utf8(true)
                .into_response(&request);
        }
        span.set_status(Status::error("Not found"));
        HttpResponse::NotFound().finish()
    } else {
        span.set_status(Status::error("Not found"));
        error!("Path does not exist in any library: {}", req.path);
        HttpResponse::NotFound().finish()
    }
}
fn is_video_file(path: &Path) -> bool {
use image_api::file_types;
file_types::is_video_file(path)
}
/// Produce (and cache) a circular 80×80 PNG variant of an existing square
/// thumbnail, mirroring its relative layout under `<thumbs>/_circular/`.
///
/// Returns the cached file immediately when present. Otherwise the source
/// thumbnail is center-cropped to its largest square and nearest-neighbor
/// sampled into the circle; pixels outside the circle are transparent.
async fn create_circular_thumbnail(
    thumb_path: &Path,
    thumbs_dir: &str,
) -> Result<PathBuf, Box<dyn Error>> {
    use image::{GenericImageView, ImageBuffer, Rgba};
    // Destination mirrors the thumbnail's path under `_circular`, as PNG.
    let rel = thumb_path.strip_prefix(thumbs_dir)?;
    let target = Path::new(thumbs_dir)
        .join("_circular")
        .join(rel)
        .with_extension("png");
    // Cache hit — no image work needed.
    if target.exists() {
        return Ok(target);
    }
    if let Some(dir) = target.parent() {
        std::fs::create_dir_all(dir)?;
    }
    let source = image::open(thumb_path)?;
    let (src_w, src_h) = source.dimensions();
    // Fixed 80×80 canvas; radius is half the edge length.
    let side = 80u32;
    let radius = side as f32 / 2.0;
    // Center-crop offsets for the largest square within the source.
    let crop = src_w.min(src_h);
    let off_x = (src_w - crop) / 2;
    let off_y = (src_h - crop) / 2;
    // Output→crop coordinate scale (loop-invariant, hoisted).
    let scale = crop as f32 / side as f32;
    let canvas = ImageBuffer::from_fn(side, side, |x, y| {
        let dx = x as f32 - radius;
        let dy = y as f32 - radius;
        if (dx * dx + dy * dy).sqrt() <= radius {
            // Inside the circle: nearest-neighbor sample from the crop.
            let sx = off_x + (x as f32 * scale) as u32;
            let sy = off_y + (y as f32 * scale) as u32;
            let px = source.get_pixel(sx, sy);
            Rgba([px[0], px[1], px[2], 255])
        } else {
            // Outside the circle: fully transparent.
            Rgba([0, 0, 0, 0])
        }
    });
    // PNG preserves the alpha channel.
    canvas.save(&target)?;
    Ok(target)
}
/// `GET /image/metadata?path=&library=` — filesystem metadata plus the
/// curated EXIF subset for one file, with the same sibling-library
/// fallback used by `/image`.
#[get("/image/metadata")]
async fn get_file_metadata(
    _: Claims,
    request: HttpRequest,
    path: web::Query<ThumbnailRequest>,
    app_state: Data<AppState>,
    exif_dao: Data<Mutex<Box<dyn ExifDao>>>,
) -> impl Responder {
    let tracer = global_tracer();
    let context = extract_context_from_request(&request);
    let mut span = tracer.start_with_context("get_file_metadata", &context);
    let span_context =
        opentelemetry::Context::new().with_remote_span_context(span.span_context().clone());
    // Bad/absent `library=` silently falls back to the primary library.
    let library = libraries::resolve_library_param(&app_state, path.library.as_deref())
        .ok()
        .flatten()
        .unwrap_or_else(|| app_state.primary_library());
    // Fall back to other libraries if the file isn't under the resolved one,
    // matching the `/image` handler so union-mode search results resolve.
    let resolved = is_valid_full_path(&library.root_path, &path.path, false)
        .filter(|p| p.exists())
        .map(|p| (library, p))
        .or_else(|| {
            app_state.libraries.iter().find_map(|lib| {
                if lib.id == library.id {
                    return None;
                }
                is_valid_full_path(&lib.root_path, &path.path, false)
                    .filter(|p| p.exists())
                    .map(|p| (lib, p))
            })
        });
    // NOTE(review): a missing file becomes `ErrorKind::InvalidData` and is
    // answered with 500 below, while the sibling gps/date routes answer 404
    // for the same condition — worth aligning.
    match resolved
        .ok_or_else(|| ErrorKind::InvalidData.into())
        .and_then(|(lib, full_path)| {
            File::open(&full_path)
                .and_then(|file| file.metadata())
                .map(|metadata| (lib, metadata))
        }) {
        Ok((resolved_library, metadata)) => {
            let mut response: MetadataResponse = metadata.into();
            response.library_id = Some(resolved_library.id);
            response.library_name = Some(resolved_library.name.clone());
            // Extract date from filename if possible
            response.filename_date =
                memories::extract_date_from_filename(&path.path).map(|dt| dt.timestamp());
            // Query EXIF data if available
            if let Ok(mut dao) = exif_dao.lock()
                && let Ok(Some(exif)) = dao.get_exif(&span_context, &path.path)
            {
                response.exif = Some(exif.into());
            }
            span.add_event(
                "Metadata fetched",
                vec![KeyValue::new("file", path.path.clone())],
            );
            span.set_status(Status::Ok);
            HttpResponse::Ok().json(response)
        }
        Err(e) => {
            let message = format!("Error getting metadata for file '{}': {:?}", path.path, e);
            error!("{}", message);
            span.set_status(Status::error(message));
            HttpResponse::InternalServerError().finish()
        }
    }
}
/// Body for `POST /image/exif/gps` — write GPS coordinates into a file's
/// EXIF in place. Only `path` + `latitude` + `longitude` are required.
/// `library` is optional (falls back to the primary library) and matches
/// the convention of the other path-keyed routes.
#[derive(serde::Deserialize)]
struct SetGpsRequest {
    // Library-relative file path; the handler normalizes backslashes.
    path: String,
    // Optional library selector; `None` → primary library.
    library: Option<String>,
    // Coordinates passed straight through to `exif::write_gps`.
    latitude: f64,
    longitude: f64,
}
#[post("/image/exif/gps")]
async fn set_image_gps(
_: Claims,
request: HttpRequest,
body: web::Json<SetGpsRequest>,
app_state: Data<AppState>,
exif_dao: Data<Mutex<Box<dyn ExifDao>>>,
) -> impl Responder {
let tracer = global_tracer();
let context = extract_context_from_request(&request);
let mut span = tracer.start_with_context("set_image_gps", &context);
let span_context =
opentelemetry::Context::new().with_remote_span_context(span.span_context().clone());
let library = libraries::resolve_library_param(&app_state, body.library.as_deref())
.ok()
.flatten()
.unwrap_or_else(|| app_state.primary_library());
// Same fallback as get_file_metadata: union-mode means a file may
// resolve under a sibling library.
let resolved = is_valid_full_path(&library.root_path, &body.path, false)
.filter(|p| p.exists())
.map(|p| (library, p))
.or_else(|| {
app_state.libraries.iter().find_map(|lib| {
if lib.id == library.id {
return None;
}
is_valid_full_path(&lib.root_path, &body.path, false)
.filter(|p| p.exists())
.map(|p| (lib, p))
})
});
let (resolved_library, full_path) = match resolved {
Some(v) => v,
None => {
span.set_status(Status::error("file not found"));
return HttpResponse::NotFound().body("File not found");
}
};
if !exif::supports_exif(&full_path) {
return HttpResponse::BadRequest().body("File format does not support EXIF GPS write");
}
if let Err(e) = exif::write_gps(&full_path, body.latitude, body.longitude) {
let msg = format!("exiftool write failed: {}", e);
error!("{}", msg);
span.set_status(Status::error(msg.clone()));
return HttpResponse::InternalServerError().body(msg);
}
// Re-read EXIF from disk (the write path doesn't tell us the rest of
// the parsed fields back, and we want the DB row to match what
// extract_exif_from_path would now produce). Update the existing row
// rather than insert — this endpoint is invoked on already-indexed
// files only.
let extracted = match exif::extract_exif_from_path(&full_path) {
Ok(d) => d,
Err(e) => {
// GPS was written successfully but re-extraction failed; surface
// a 500 because the DB will now disagree with disk until the
// next file scan rewrites it.
let msg = format!("EXIF re-read failed after write: {}", e);
error!("{}", msg);
return HttpResponse::InternalServerError().body(msg);
}
};
let now = Utc::now().timestamp();
let normalized_path = body.path.replace('\\', "/");
// Re-run the canonical-date waterfall on every GPS write — exiftool
// writing GPS doesn't change the capture date, but if the row was
// previously sourced from `fs_time` the re-read may have given us a
// real EXIF date this time, and we want to upgrade the source.
let resolved_date = date_resolver::resolve_date_taken(&full_path, extracted.date_taken);
let insert_exif = InsertImageExif {
library_id: resolved_library.id,
file_path: normalized_path.clone(),
camera_make: extracted.camera_make,
camera_model: extracted.camera_model,
lens_model: extracted.lens_model,
width: extracted.width,
height: extracted.height,
orientation: extracted.orientation,
gps_latitude: extracted.gps_latitude.map(|v| v as f32),
gps_longitude: extracted.gps_longitude.map(|v| v as f32),
gps_altitude: extracted.gps_altitude.map(|v| v as f32),
focal_length: extracted.focal_length.map(|v| v as f32),
aperture: extracted.aperture.map(|v| v as f32),
shutter_speed: extracted.shutter_speed,
iso: extracted.iso,
date_taken: resolved_date.map(|r| r.timestamp),
// Created_time is preserved by update_exif (it doesn't touch the
// column); pass any int — it's ignored in the UPDATE statement.
created_time: now,
last_modified: now,
// Hash + size aren't touched in update_exif either, but the file
// bytes did change — best-effort recompute so the new hash lands
// on the next call to get_exif. Failure here just leaves the old
// values in place.
content_hash: content_hash::compute(&full_path)
.ok()
.map(|c| c.content_hash),
size_bytes: content_hash::compute(&full_path).ok().map(|c| c.size_bytes),
// GPS-update path doesn't touch perceptual hashes either; columns
// ignored by update_exif. Compute best-effort so a new file lands
// with a usable signal; failure just leaves prior values in place.
phash_64: perceptual_hash::compute(&full_path).map(|h| h.phash_64),
dhash_64: perceptual_hash::compute(&full_path).map(|h| h.dhash_64),
date_taken_source: resolved_date.map(|r| r.source.as_str().to_string()),
};
let updated = {
let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
// If the row doesn't exist yet (file isn't indexed for some reason),
// insert instead so the GPS write is at least visible the moment
// the watcher catches up.
match dao.get_exif(&span_context, &normalized_path) {
Ok(Some(_)) => dao.update_exif(&span_context, insert_exif),
Ok(None) => dao.store_exif(&span_context, insert_exif),
Err(_) => dao.update_exif(&span_context, insert_exif),
}
};
match updated {
Ok(row) => {
// Mirror the file metadata so the client gets the new size /
// mtime in the same response and can refresh its cached
// metadata block in one round-trip.
let fs_meta = std::fs::metadata(&full_path).ok();
let mut response: MetadataResponse = match fs_meta {
Some(m) => m.into(),
None => MetadataResponse {
created: None,
modified: None,
size: 0,
exif: None,
filename_date: None,
library_id: None,
library_name: None,
},
};
response.exif = Some(row.into());
response.library_id = Some(resolved_library.id);
response.library_name = Some(resolved_library.name.clone());
response.filename_date =
memories::extract_date_from_filename(&body.path).map(|dt| dt.timestamp());
span.set_status(Status::Ok);
HttpResponse::Ok().json(response)
}
Err(e) => {
let msg = format!("EXIF DB update failed: {:?}", e);
error!("{}", msg);
span.set_status(Status::error(msg.clone()));
HttpResponse::InternalServerError().body(msg)
}
}
}
/// `GET /image/exif/full?path=&library=` — full per-file EXIF dump via
/// exiftool, for the DETAILS modal's "FULL EXIF" pane. Strictly richer
/// than `/image/metadata`'s curated subset (every group exiftool can
/// see: EXIF, File, MakerNotes, Composite, ICC_Profile, IPTC, …).
///
/// On-demand only — the watcher / indexer never calls this. Falls back
/// to 503 when exiftool isn't installed (deployer guidance is the same
/// as for the RAW preview pipeline: install exiftool for full coverage).
#[get("/image/exif/full")]
async fn get_full_exif(
    _: Claims,
    request: HttpRequest,
    path: web::Query<ThumbnailRequest>,
    app_state: Data<AppState>,
) -> impl Responder {
    let tracer = global_tracer();
    let context = extract_context_from_request(&request);
    let mut span = tracer.start_with_context("get_full_exif", &context);
    // Bad/absent `library=` silently falls back to the primary library.
    let library = libraries::resolve_library_param(&app_state, path.library.as_deref())
        .ok()
        .flatten()
        .unwrap_or_else(|| app_state.primary_library());
    // Same union-mode fallback as get_file_metadata — the file may live
    // under a sibling library when the requested one's path resolves but
    // doesn't actually contain the bytes.
    let resolved = is_valid_full_path(&library.root_path, &path.path, false)
        .filter(|p| p.exists())
        .map(|p| (library, p))
        .or_else(|| {
            app_state.libraries.iter().find_map(|lib| {
                if lib.id == library.id {
                    return None;
                }
                is_valid_full_path(&lib.root_path, &path.path, false)
                    .filter(|p| p.exists())
                    .map(|p| (lib, p))
            })
        });
    let (resolved_library, full_path) = match resolved {
        Some(v) => v,
        None => {
            span.set_status(Status::error("file not found"));
            return HttpResponse::NotFound().body("File not found");
        }
    };
    // exiftool spawn is blocking — keep it off the actix worker by
    // running on the blocking pool. ~50–200 ms typical for a JPEG;
    // longer for RAW with rich MakerNotes.
    let exif_result =
        web::block(move || crate::exif::read_full_exif_via_exiftool(&full_path)).await;
    match exif_result {
        Ok(Ok(Some(tags))) => {
            span.set_status(Status::Ok);
            HttpResponse::Ok().json(serde_json::json!({
                "library_id": resolved_library.id,
                "library_name": resolved_library.name,
                "tags": tags,
            }))
        }
        Ok(Ok(None)) => {
            // exiftool ran but produced no output for this file — treat as
            // empty rather than an error so the modal renders "no tags"
            // gracefully.
            // NOTE(review): this branch never sets a span status (the
            // others all do) — the span ends Unset.
            HttpResponse::Ok().json(serde_json::json!({
                "library_id": resolved_library.id,
                "library_name": resolved_library.name,
                "tags": serde_json::Value::Object(Default::default()),
            }))
        }
        Ok(Err(e)) => {
            let msg = format!("exiftool failed: {}", e);
            error!("{}", msg);
            span.set_status(Status::error(msg.clone()));
            // 503 — typically "exiftool isn't on PATH" or a transient spawn
            // failure. Apollo surfaces a hint in the modal.
            HttpResponse::ServiceUnavailable().body(msg)
        }
        Err(e) => {
            let msg = format!("blocking-pool error: {}", e);
            error!("{}", msg);
            span.set_status(Status::error(msg.clone()));
            HttpResponse::InternalServerError().body(msg)
        }
    }
}
/// Body for `POST /image/exif/date` — operator-driven date_taken override.
/// `date_taken` is unix seconds (matches `image_exif.date_taken`'s convention
/// — naive local reinterpreted as UTC, not real UTC; the Apollo client passes
/// through the same value the photo carousel rendered before edit).
#[derive(serde::Deserialize)]
struct SetDateRequest {
    // Library-relative file path; the handler normalizes backslashes.
    path: String,
    // Optional library selector; `None` → primary library.
    library: Option<String>,
    // Override value, unix seconds (see convention note above).
    date_taken: i64,
}
/// Body for `POST /image/exif/date/clear` — revert a manual override and
/// restore the resolver-derived `(date_taken, date_taken_source)` pair from
/// the snapshot.
#[derive(serde::Deserialize)]
struct ClearDateRequest {
    // Library-relative file path; the handler normalizes backslashes.
    path: String,
    // Optional library selector; `None` → primary library.
    library: Option<String>,
}
/// Assemble the `MetadataResponse` echoed back by the date-mutation
/// endpoints, mirroring `get_file_metadata`'s shape so the client has a
/// single source of truth after every mutation.
///
/// Filesystem metadata is best-effort: when the file can't be resolved or
/// stat'd (stale mount, moved file) the DB-side override still succeeded,
/// and the response carries `created=None, modified=None, size=0` — the
/// updated EXIF row is the part that matters.
fn build_metadata_response_for_date_mutation(
    library: &libraries::Library,
    rel_path: &str,
    exif: ImageExif,
) -> MetadataResponse {
    // Resolve and stat in one Option pipeline; any miss yields None.
    let disk_meta = is_valid_full_path(&library.root_path, &rel_path.to_string(), false)
        .filter(|p| p.exists())
        .and_then(|p| std::fs::metadata(p).ok());
    let mut response: MetadataResponse = if let Some(meta) = disk_meta {
        meta.into()
    } else {
        // Stat failed — fall back to an empty filesystem section.
        MetadataResponse {
            created: None,
            modified: None,
            size: 0,
            exif: None,
            filename_date: None,
            library_id: None,
            library_name: None,
        }
    };
    response.exif = Some(exif.into());
    response.library_id = Some(library.id);
    response.library_name = Some(library.name.clone());
    response.filename_date =
        memories::extract_date_from_filename(rel_path).map(|dt| dt.timestamp());
    response
}
/// `POST /image/exif/date` — apply an operator-supplied manual
/// `date_taken` override to an indexed file and echo the refreshed
/// metadata block back.
#[post("/image/exif/date")]
async fn set_image_date(
    _: Claims,
    request: HttpRequest,
    body: web::Json<SetDateRequest>,
    app_state: Data<AppState>,
    exif_dao: Data<Mutex<Box<dyn ExifDao>>>,
) -> impl Responder {
    let tracer = global_tracer();
    let parent = extract_context_from_request(&request);
    let mut span = tracer.start_with_context("set_image_date", &parent);
    let span_context =
        opentelemetry::Context::new().with_remote_span_context(span.span_context().clone());
    let library = libraries::resolve_library_param(&app_state, body.library.as_deref())
        .ok()
        .flatten()
        .unwrap_or_else(|| app_state.primary_library());
    // Forward slashes only — keeps Windows-client paths pointed at the
    // same DB row (mirrors set_image_gps).
    let rel_path = body.path.replace('\\', "/");
    let outcome = exif_dao
        .lock()
        .expect("Unable to lock ExifDao")
        .set_manual_date_taken(&span_context, library.id, &rel_path, body.date_taken);
    match outcome {
        Ok(row) => {
            span.set_status(Status::Ok);
            let response = build_metadata_response_for_date_mutation(&library, &rel_path, row);
            HttpResponse::Ok().json(response)
        }
        Err(e) => {
            let msg = format!("set_manual_date_taken failed: {:?}", e);
            error!("{}", msg);
            span.set_status(Status::error(msg.clone()));
            // "Row not found" is the expected failure mode — the file
            // isn't indexed under this (library, path) — so answer 404
            // rather than a 5xx.
            HttpResponse::NotFound().body(msg)
        }
    }
}
/// `POST /image/exif/date/clear` — drop a manual date override and restore
/// the resolver-derived snapshot values on the row, echoing the refreshed
/// metadata block back.
#[post("/image/exif/date/clear")]
async fn clear_image_date(
    _: Claims,
    request: HttpRequest,
    body: web::Json<ClearDateRequest>,
    app_state: Data<AppState>,
    exif_dao: Data<Mutex<Box<dyn ExifDao>>>,
) -> impl Responder {
    let tracer = global_tracer();
    let parent = extract_context_from_request(&request);
    let mut span = tracer.start_with_context("clear_image_date", &parent);
    let span_context =
        opentelemetry::Context::new().with_remote_span_context(span.span_context().clone());
    let library = libraries::resolve_library_param(&app_state, body.library.as_deref())
        .ok()
        .flatten()
        .unwrap_or_else(|| app_state.primary_library());
    // Same slash normalization as the other EXIF mutation routes.
    let rel_path = body.path.replace('\\', "/");
    let outcome = exif_dao
        .lock()
        .expect("Unable to lock ExifDao")
        .clear_manual_date_taken(&span_context, library.id, &rel_path);
    match outcome {
        Ok(row) => {
            span.set_status(Status::Ok);
            let response = build_metadata_response_for_date_mutation(&library, &rel_path, row);
            HttpResponse::Ok().json(response)
        }
        Err(e) => {
            let msg = format!("clear_manual_date_taken failed: {:?}", e);
            error!("{}", msg);
            span.set_status(Status::error(msg.clone()));
            // Missing row → 404, matching set_image_date.
            HttpResponse::NotFound().body(msg)
        }
    }
}
/// Query string for `POST /image` — just the optional `library=` selector.
#[derive(serde::Deserialize)]
struct UploadQuery {
    // Optional library selector; `None` → primary library.
    library: Option<String>,
}
/// `POST /image?library=` — multipart upload of a single image/video.
///
/// Multipart parts understood:
/// * a part carrying a `filename` — the file bytes to store;
/// * an optional text part named `path` — destination directory
///   (defaults to the target library's root).
///
/// Duplicate bytes (same blake3 hash anywhere, not soft-marked duplicate)
/// are rejected with 409 plus the canonical sibling's location. On-disk
/// name collisions get a `_<unix-ts>` suffix instead of overwriting.
#[post("/image")]
async fn upload_image(
    _: Claims,
    request: HttpRequest,
    query: web::Query<UploadQuery>,
    mut payload: mp::Multipart,
    app_state: Data<AppState>,
    exif_dao: Data<Mutex<Box<dyn ExifDao>>>,
) -> impl Responder {
    let tracer = global_tracer();
    let context = extract_context_from_request(&request);
    let mut span = tracer.start_with_context("upload_image", &context);
    let span_context =
        opentelemetry::Context::new().with_remote_span_context(span.span_context().clone());
    // Resolve the optional library selector. Absent → primary library
    // (backwards-compatible with clients that don't yet send `library=`).
    let target_library =
        match libraries::resolve_library_param(&app_state, query.library.as_deref()) {
            Ok(Some(lib)) => lib,
            Ok(None) => app_state.primary_library(),
            Err(msg) => {
                span.set_status(Status::error(msg.clone()));
                return HttpResponse::BadRequest().body(msg);
            }
        };
    // Accumulated file bytes plus the optional fields pulled from the
    // multipart stream.
    let mut file_content: BytesMut = BytesMut::new();
    let mut file_name: Option<String> = None;
    let mut file_path: Option<String> = None;
    while let Some(Ok(mut part)) = payload.next().await {
        if let Some(content_type) = part.content_disposition() {
            debug!("{:?}", content_type);
            if let Some(filename) = content_type.get_filename() {
                debug!("Name (raw): {:?}", filename);
                // Decode URL-encoded filename (e.g., "file%20name.jpg" -> "file name.jpg")
                let decoded_filename = decode(filename)
                    .map(|s| s.to_string())
                    .unwrap_or_else(|_| filename.to_string());
                debug!("Name (decoded): {:?}", decoded_filename);
                file_name = Some(decoded_filename);
                while let Some(Ok(data)) = part.next().await {
                    file_content.put(data);
                }
            } else if content_type.get_name() == Some("path") {
                while let Some(Ok(data)) = part.next().await {
                    // NOTE(review): each chunk OVERWRITES the previous
                    // value (no accumulation), and a chunk boundary could
                    // split a UTF-8 sequence — fine for short paths in
                    // practice, but worth confirming.
                    if let Ok(path) = std::str::from_utf8(&data) {
                        file_path = Some(path.to_string())
                    }
                }
            }
        }
    }
    // Destination directory: client-supplied `path` part, else library root.
    let path = file_path.unwrap_or_else(|| target_library.root_path.clone());
    if !file_content.is_empty() {
        if file_name.is_none() {
            span.set_status(Status::error("No filename provided"));
            return HttpResponse::BadRequest().body("No filename provided");
        }
        let full_path = PathBuf::from(&path).join(file_name.unwrap());
        if let Some(full_path) = is_valid_full_path(
            &target_library.root_path,
            &full_path.to_str().unwrap().to_string(),
            true,
        ) {
            // Pre-write content-hash check: if these exact bytes already
            // exist anywhere in any library (and aren't themselves
            // soft-marked as duplicates), don't write the file. Return
            // 409 with the canonical sibling so the mobile app can show
            // a friendly "already in your library" toast.
            let upload_hash = blake3::Hasher::new()
                .update(&file_content)
                .finalize()
                .to_hex()
                .to_string();
            {
                let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
                if let Ok(Some(existing)) = dao.find_by_content_hash(&span_context, &upload_hash)
                    && existing.duplicate_of_hash.is_none()
                {
                    // Resolve the owning library's display name for the toast.
                    let library_name = libraries::load_all(&mut crate::database::connect())
                        .into_iter()
                        .find(|l| l.id == existing.library_id)
                        .map(|l| l.name);
                    span.set_status(Status::Ok);
                    return HttpResponse::Conflict().json(serde_json::json!({
                        "duplicate_of": {
                            "library_id": existing.library_id,
                            "rel_path": existing.file_path,
                        },
                        "content_hash": upload_hash,
                        "library_name": library_name,
                    }));
                }
            }
            // NOTE(review): this "file write" span is never bound to a
            // variable, so it's dropped immediately and records as
            // zero-length — bind it around the write if timing matters.
            let context =
                opentelemetry::Context::new().with_remote_span_context(span.span_context().clone());
            tracer
                .span_builder("file write")
                .start_with_context(&tracer, &context);
            // Fresh name → write directly; collision (or non-media ext) →
            // write under a unix-timestamp-suffixed name, never overwrite.
            let uploaded_path = if !full_path.is_file() && is_image_or_video(&full_path) {
                // NOTE(review): unwrap on create/write panics the worker
                // on I/O failure (disk full, perms) — consider a 500.
                let mut file = File::create(&full_path).unwrap();
                file.write_all(&file_content).unwrap();
                info!("Uploaded: {:?}", full_path);
                full_path
            } else {
                warn!("File already exists: {:?}", full_path);
                let new_path = format!(
                    "{}/{}_{}.{}",
                    full_path.parent().unwrap().to_str().unwrap(),
                    full_path.file_stem().unwrap().to_str().unwrap(),
                    Utc::now().timestamp(),
                    full_path
                        .extension()
                        .expect("Uploaded file should have an extension")
                        .to_str()
                        .unwrap()
                );
                info!("Uploaded: {}", new_path);
                let new_path_buf = PathBuf::from(&new_path);
                let mut file = File::create(&new_path_buf).unwrap();
                file.write_all(&file_content).unwrap();
                new_path_buf
            };
            // Extract and store EXIF data if file supports it
            if exif::supports_exif(&uploaded_path) {
                // Forward-slash rel path — the DB keys EXIF rows this way.
                let relative_path = uploaded_path
                    .strip_prefix(&target_library.root_path)
                    .expect("Error stripping library root prefix")
                    .to_str()
                    .unwrap()
                    .replace('\\', "/");
                match exif::extract_exif_from_path(&uploaded_path) {
                    Ok(exif_data) => {
                        let timestamp = Utc::now().timestamp();
                        // Content hash/size are best-effort: failure stores
                        // NULLs and just logs.
                        let (content_hash, size_bytes) = match content_hash::compute(&uploaded_path)
                        {
                            Ok(id) => (Some(id.content_hash), Some(id.size_bytes)),
                            Err(e) => {
                                warn!(
                                    "Failed to hash uploaded {}: {:?}",
                                    uploaded_path.display(),
                                    e
                                );
                                (None, None)
                            }
                        };
                        let perceptual = perceptual_hash::compute(&uploaded_path);
                        let resolved_date =
                            date_resolver::resolve_date_taken(&uploaded_path, exif_data.date_taken);
                        let insert_exif = InsertImageExif {
                            library_id: target_library.id,
                            file_path: relative_path.clone(),
                            camera_make: exif_data.camera_make,
                            camera_model: exif_data.camera_model,
                            lens_model: exif_data.lens_model,
                            width: exif_data.width,
                            height: exif_data.height,
                            orientation: exif_data.orientation,
                            gps_latitude: exif_data.gps_latitude.map(|v| v as f32),
                            gps_longitude: exif_data.gps_longitude.map(|v| v as f32),
                            gps_altitude: exif_data.gps_altitude.map(|v| v as f32),
                            focal_length: exif_data.focal_length.map(|v| v as f32),
                            aperture: exif_data.aperture.map(|v| v as f32),
                            shutter_speed: exif_data.shutter_speed,
                            iso: exif_data.iso,
                            date_taken: resolved_date.map(|r| r.timestamp),
                            created_time: timestamp,
                            last_modified: timestamp,
                            content_hash,
                            size_bytes,
                            phash_64: perceptual.map(|h| h.phash_64),
                            dhash_64: perceptual.map(|h| h.dhash_64),
                            date_taken_source: resolved_date.map(|r| r.source.as_str().to_string()),
                        };
                        if let Ok(mut dao) = exif_dao.lock() {
                            if let Err(e) = dao.store_exif(&span_context, insert_exif) {
                                error!("Failed to store EXIF data for {}: {:?}", relative_path, e);
                            } else {
                                debug!("EXIF data stored for {}", relative_path);
                            }
                        }
                    }
                    Err(e) => {
                        debug!(
                            "No EXIF data or error extracting from {}: {:?}",
                            uploaded_path.display(),
                            e
                        );
                    }
                }
            }
        } else {
            error!("Invalid path for upload: {:?}", full_path);
            span.set_status(Status::error("Invalid path for upload"));
            return HttpResponse::BadRequest().body("Path was not valid");
        }
    } else {
        span.set_status(Status::error("No file body read"));
        return HttpResponse::BadRequest().body("No file body read");
    }
    // Nudge the thumbnail actor so the new file gets a thumb promptly.
    app_state.stream_manager.do_send(RefreshThumbnailsMessage);
    span.set_status(Status::Ok);
    HttpResponse::Ok().finish()
}
/// POST /video/generate — create an HLS playlist for the requested source
/// video and hand the spawned transcode child off to the stream-manager actor.
///
/// Responds 200 with the playlist path as JSON, or 400 when the request has
/// no usable file name or the file cannot be found in any configured library.
#[post("/video/generate")]
async fn generate_video(
    _claims: Claims,
    request: HttpRequest,
    app_state: Data<AppState>,
    body: web::Json<ThumbnailRequest>,
) -> impl Responder {
    let tracer = global_tracer();
    let context = extract_context_from_request(&request);
    let mut span = tracer.start_with_context("generate_video", &context);
    let filename = PathBuf::from(&body.path);
    if let Some(name) = filename.file_name() {
        // Shadowing: from here on `filename` is the basename &str, not the PathBuf.
        let filename = name.to_str().expect("Filename should convert to string");
        // KNOWN ISSUE (multi-library): playlist filename is the basename
        // alone, so two source files with the same basename — whether in
        // different libraries or different subdirs of one library —
        // overwrite each other's playlists while ffmpeg runs. The
        // hash-keyed `content_hash::hls_dir` is the long-term answer
        // (see CLAUDE.md "Multi-library data model"); rewiring the
        // actor pipeline to use it is out of scope for this branch.
        // The orphan-cleanup job above already walks every library so
        // it doesn't false-delete archive playlists.
        let playlist = format!("{}/{}.m3u8", app_state.video_path, filename);
        // Resolve the library hint from the body; absent/unresolvable hints
        // fall back to the primary library.
        let library = libraries::resolve_library_param(&app_state, body.library.as_deref())
            .ok()
            .flatten()
            .unwrap_or_else(|| app_state.primary_library());
        // Try the resolved library first, then fall back to any other library
        // that actually contains the file — handles union-mode requests where
        // the mobile client passes no library but the file lives in a
        // non-primary library.
        let resolved = is_valid_full_path(&library.root_path, &body.path, false)
            .filter(|p| p.exists())
            .or_else(|| {
                app_state.libraries.iter().find_map(|lib| {
                    if lib.id == library.id {
                        return None;
                    }
                    is_valid_full_path(&lib.root_path, &body.path, false).filter(|p| p.exists())
                })
            });
        if let Some(path) = resolved {
            if let Ok(child) = create_playlist(path.to_str().unwrap(), &playlist).await {
                span.add_event(
                    "playlist_created".to_string(),
                    vec![KeyValue::new("playlist-name", filename.to_string())],
                );
                span.set_status(Status::Ok);
                app_state.stream_manager.do_send(ProcessMessage(
                    playlist.clone(),
                    child,
                    // opentelemetry::Context::new().with_span(span),
                ));
            }
            // NOTE(review): a create_playlist error falls through to the 200
            // response below without marking the span — confirm the client
            // is expected to retry/poll in that case.
        } else {
            span.set_status(Status::error(format!("invalid path {:?}", &body.path)));
            return HttpResponse::BadRequest().finish();
        }
        HttpResponse::Ok().json(playlist)
    } else {
        let message = format!("Unable to get file name: {:?}", filename);
        error!("{}", message);
        span.set_status(Status::error(message));
        HttpResponse::BadRequest().finish()
    }
}
#[get("/video/stream")]
async fn stream_video(
request: HttpRequest,
_: Claims,
path: web::Query<ThumbnailRequest>,
app_state: Data<AppState>,
) -> impl Responder {
let tracer = global::tracer("image-server");
let context = extract_context_from_request(&request);
let mut span = tracer.start_with_context("stream_video", &context);
let playlist = &path.path;
debug!("Playlist: {}", playlist);
// Only serve files under video_path (HLS playlists) or base_path (source videos)
if playlist.starts_with(&app_state.video_path)
|| is_valid_full_path(&app_state.base_path, playlist, false).is_some()
{
match NamedFile::open(playlist) {
Ok(file) => {
span.set_status(Status::Ok);
file.into_response(&request)
}
_ => {
span.set_status(Status::error(format!("playlist not found {}", playlist)));
HttpResponse::NotFound().finish()
}
}
} else {
span.set_status(Status::error(format!("playlist not valid {}", playlist)));
HttpResponse::BadRequest().finish()
}
}
#[get("/video/{path}")]
async fn get_video_part(
request: HttpRequest,
_: Claims,
path: web::Path<ThumbnailRequest>,
app_state: Data<AppState>,
) -> impl Responder {
let tracer = global_tracer();
let context = extract_context_from_request(&request);
let mut span = tracer.start_with_context("get_video_part", &context);
let part = &path.path;
debug!("Video part: {}", part);
let mut file_part = PathBuf::new();
file_part.push(app_state.video_path.clone());
file_part.push(part);
// Guard against directory traversal attacks
let canonical_base = match std::fs::canonicalize(&app_state.video_path) {
Ok(path) => path,
Err(e) => {
error!("Failed to canonicalize video path: {:?}", e);
span.set_status(Status::error("Invalid video path configuration"));
return HttpResponse::InternalServerError().finish();
}
};
let canonical_file = match std::fs::canonicalize(&file_part) {
Ok(path) => path,
Err(_) => {
warn!("Video part not found or invalid: {:?}", file_part);
span.set_status(Status::error(format!("Video part not found '{}'", part)));
return HttpResponse::NotFound().finish();
}
};
// Ensure the resolved path is still within the video directory
if !canonical_file.starts_with(&canonical_base) {
warn!("Directory traversal attempt detected: {:?}", part);
span.set_status(Status::error("Invalid video path"));
return HttpResponse::Forbidden().finish();
}
match NamedFile::open(&canonical_file) {
Ok(file) => {
span.set_status(Status::Ok);
file.into_response(&request)
}
_ => {
error!("Video part not found: {:?}", file_part);
span.set_status(Status::error(format!(
"Video part not found '{}'",
file_part.to_str().unwrap()
)));
HttpResponse::NotFound().finish()
}
}
}
/// GET /video/preview — serve (or lazily generate) a short preview clip
/// for the given video path.
///
/// Branches on the stored preview record for the file:
/// - "complete": stream the clip; if the file is missing on disk, reset the
///   record to "pending", re-queue generation, and return 202;
/// - "processing": 202 with a processing body;
/// - "failed": 500 carrying the stored error message;
/// - "pending"/unknown or no record at all: queue generation, return 202.
#[get("/video/preview")]
async fn get_video_preview(
    _claims: Claims,
    request: HttpRequest,
    req: web::Query<PreviewClipRequest>,
    app_state: Data<AppState>,
    preview_dao: Data<Mutex<Box<dyn PreviewDao>>>,
) -> impl Responder {
    let tracer = global_tracer();
    let context = extract_context_from_request(&request);
    let mut span = tracer.start_with_context("get_video_preview", &context);
    // Validate path
    let full_path = match is_valid_full_path(&app_state.base_path, &req.path, true) {
        Some(path) => path,
        None => {
            span.set_status(Status::error("Invalid path"));
            return HttpResponse::BadRequest().json(serde_json::json!({"error": "Invalid path"}));
        }
    };
    let full_path_str = full_path.to_string_lossy().to_string();
    // Use relative path (from BASE_PATH) for DB storage, consistent with EXIF convention
    let relative_path = full_path_str
        .strip_prefix(&app_state.base_path)
        .unwrap_or(&full_path_str)
        .trim_start_matches(['/', '\\'])
        .to_string();
    // Check preview status in DB.
    // Scoped block so the DAO lock is dropped before we branch on the result.
    let preview = {
        let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao");
        dao.get_preview(&context, &relative_path)
    };
    match preview {
        Ok(Some(clip)) => match clip.status.as_str() {
            "complete" => {
                // Clips mirror the library layout under preview_clips_path,
                // with the extension swapped to .mp4.
                let preview_path = PathBuf::from(&app_state.preview_clips_path)
                    .join(&relative_path)
                    .with_extension("mp4");
                match NamedFile::open(&preview_path) {
                    Ok(file) => {
                        span.set_status(Status::Ok);
                        file.into_response(&request)
                    }
                    Err(_) => {
                        // File missing on disk but DB says complete - reset and regenerate
                        let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao");
                        let _ = dao.update_status(
                            &context,
                            &relative_path,
                            "pending",
                            None,
                            None,
                            None,
                        );
                        app_state
                            .preview_clip_generator
                            .do_send(GeneratePreviewClipMessage {
                                video_path: full_path_str,
                            });
                        span.set_status(Status::Ok);
                        HttpResponse::Accepted().json(serde_json::json!({
                            "status": "processing",
                            "path": req.path
                        }))
                    }
                }
            }
            "processing" => {
                span.set_status(Status::Ok);
                HttpResponse::Accepted().json(serde_json::json!({
                    "status": "processing",
                    "path": req.path
                }))
            }
            "failed" => {
                let error_msg = clip
                    .error_message
                    .unwrap_or_else(|| "Unknown error".to_string());
                span.set_status(Status::error(format!("Generation failed: {}", error_msg)));
                HttpResponse::InternalServerError().json(serde_json::json!({
                    "error": format!("Generation failed: {}", error_msg)
                }))
            }
            _ => {
                // pending or unknown status - trigger generation
                app_state
                    .preview_clip_generator
                    .do_send(GeneratePreviewClipMessage {
                        video_path: full_path_str,
                    });
                span.set_status(Status::Ok);
                HttpResponse::Accepted().json(serde_json::json!({
                    "status": "processing",
                    "path": req.path
                }))
            }
        },
        Ok(None) => {
            // No record exists - insert as pending and trigger generation
            {
                let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao");
                let _ = dao.insert_preview(&context, &relative_path, "pending");
            }
            app_state
                .preview_clip_generator
                .do_send(GeneratePreviewClipMessage {
                    video_path: full_path_str,
                });
            span.set_status(Status::Ok);
            HttpResponse::Accepted().json(serde_json::json!({
                "status": "processing",
                "path": req.path
            }))
        }
        Err(_) => {
            span.set_status(Status::error("Database error"));
            HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
        }
    }
}
/// POST /video/preview/status — batch-poll preview-clip status for up to
/// 200 paths at once.
///
/// Side effects: paths with no record are inserted as "pending", and both
/// missing and stale pending/failed records are (re-)queued with the
/// preview generator actor. Responses include a preview URL only for
/// "complete" clips.
#[post("/video/preview/status")]
async fn get_preview_status(
    _claims: Claims,
    request: HttpRequest,
    body: web::Json<PreviewStatusRequest>,
    app_state: Data<AppState>,
    preview_dao: Data<Mutex<Box<dyn PreviewDao>>>,
) -> impl Responder {
    let tracer = global_tracer();
    let context = extract_context_from_request(&request);
    let mut span = tracer.start_with_context("get_preview_status", &context);
    // Limit to 200 paths per request
    if body.paths.len() > 200 {
        span.set_status(Status::error("Too many paths"));
        return HttpResponse::BadRequest()
            .json(serde_json::json!({"error": "Maximum 200 paths per request"}));
    }
    // Single batched DB read; lock scoped so it's released before the loop.
    let previews = {
        let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao");
        dao.get_previews_batch(&context, &body.paths)
    };
    match previews {
        Ok(clips) => {
            // Build a map of file_path -> VideoPreviewClip for quick lookup
            let clip_map: HashMap<String, _> = clips
                .into_iter()
                .map(|clip| (clip.file_path.clone(), clip))
                .collect();
            let mut items: Vec<PreviewStatusItem> = Vec::with_capacity(body.paths.len());
            for path in &body.paths {
                if let Some(clip) = clip_map.get(path) {
                    // Re-queue generation for stale pending/failed records
                    // NOTE(review): this re-sends on every poll while a clip
                    // is pending — assumes the generator actor dedupes
                    // in-flight work; confirm against PreviewClipGenerator.
                    if clip.status == "pending" || clip.status == "failed" {
                        let full_path = format!(
                            "{}/{}",
                            app_state.base_path.trim_end_matches(['/', '\\']),
                            path.trim_start_matches(['/', '\\'])
                        );
                        app_state
                            .preview_clip_generator
                            .do_send(GeneratePreviewClipMessage {
                                video_path: full_path,
                            });
                    }
                    items.push(PreviewStatusItem {
                        path: path.clone(),
                        status: clip.status.clone(),
                        preview_url: if clip.status == "complete" {
                            Some(format!("/video/preview?path={}", urlencoding::encode(path)))
                        } else {
                            None
                        },
                    });
                } else {
                    // No record exists — insert as pending and trigger generation
                    {
                        let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao");
                        let _ = dao.insert_preview(&context, path, "pending");
                    }
                    // Build full path for ffmpeg (actor needs the absolute path for input)
                    let full_path = format!(
                        "{}/{}",
                        app_state.base_path.trim_end_matches(['/', '\\']),
                        path.trim_start_matches(['/', '\\'])
                    );
                    info!("Triggering preview generation for '{}'", path);
                    app_state
                        .preview_clip_generator
                        .do_send(GeneratePreviewClipMessage {
                            video_path: full_path,
                        });
                    items.push(PreviewStatusItem {
                        path: path.clone(),
                        status: "pending".to_string(),
                        preview_url: None,
                    });
                }
            }
            span.set_status(Status::Ok);
            HttpResponse::Ok().json(PreviewStatusResponse { previews: items })
        }
        Err(_) => {
            span.set_status(Status::error("Database error"));
            HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
        }
    }
}
#[get("image/favorites")]
async fn favorites(
claims: Claims,
request: HttpRequest,
favorites_dao: Data<Mutex<Box<dyn FavoriteDao>>>,
) -> impl Responder {
let tracer = global_tracer();
let context = extract_context_from_request(&request);
let mut span = tracer.start_with_context("get favorites", &context);
match web::block(move || {
favorites_dao
.lock()
.expect("Unable to get FavoritesDao")
.get_favorites(claims.sub.parse::<i32>().unwrap())
})
.await
{
Ok(Ok(favorites)) => {
let favorites = favorites
.into_iter()
.map(|favorite| favorite.path)
.collect::<Vec<String>>();
span.set_status(Status::Ok);
// Favorites are library-agnostic (shared by rel_path), so we
// intentionally leave photo_libraries empty to signal "no badge".
HttpResponse::Ok().json(PhotosResponse {
photos: favorites,
dirs: Vec::new(),
photo_libraries: Vec::new(),
total_count: None,
has_more: None,
next_offset: None,
})
}
Ok(Err(e)) => {
span.set_status(Status::error(format!("Error getting favorites: {:?}", e)));
error!("Error getting favorites: {:?}", e);
HttpResponse::InternalServerError().finish()
}
Err(_) => HttpResponse::InternalServerError().finish(),
}
}
/// PUT image/favorites — record a favorite path for the authenticated user.
///
/// 201 on insert, 200 when the favorite already exists, 400 on a DB error
/// or an unparsable user-id claim, 500 when the blocking task itself fails.
#[put("image/favorites")]
async fn put_add_favorite(
    claims: Claims,
    body: web::Json<AddFavoriteRequest>,
    favorites_dao: Data<Mutex<Box<dyn FavoriteDao>>>,
) -> impl Responder {
    // Guard clause: the token subject must be a numeric user id.
    let Ok(user_id) = claims.sub.parse::<i32>() else {
        error!("Unable to parse sub as i32: {}", claims.sub);
        return HttpResponse::BadRequest();
    };
    let fav_path = body.path.clone();
    // Run the insert on the blocking pool; the DAO is synchronous.
    let insert_result = web::block::<_, Result<usize, DbError>>(move || {
        favorites_dao
            .lock()
            .expect("Unable to get FavoritesDao")
            .add_favorite(user_id, &fav_path)
    })
    .await;
    match insert_result {
        Ok(Ok(_)) => {
            info!("Adding favorite \"{}\" for userid: {}", body.path, user_id);
            HttpResponse::Created()
        }
        // Duplicate inserts are idempotent: report success without writing.
        Ok(Err(e)) if e.kind == DbErrorKind::AlreadyExists => {
            warn!("Favorite: {} exists for user: {}", &body.path, user_id);
            HttpResponse::Ok()
        }
        Ok(Err(e)) => {
            error!("{:?} {}. for user: {}", e, body.path, user_id);
            HttpResponse::BadRequest()
        }
        Err(e) => {
            error!("Blocking error while inserting favorite: {:?}", e);
            HttpResponse::InternalServerError()
        }
    }
}
#[delete("image/favorites")]
async fn delete_favorite(
claims: Claims,
body: web::Query<AddFavoriteRequest>,
favorites_dao: Data<Mutex<Box<dyn FavoriteDao>>>,
) -> impl Responder {
if let Ok(user_id) = claims.sub.parse::<i32>() {
let path = body.path.clone();
web::block(move || {
favorites_dao
.lock()
.expect("Unable to get favorites dao")
.remove_favorite(user_id, path);
})
.await
.unwrap();
info!(
"Removing favorite \"{}\" for userid: {}",
body.path, user_id
);
HttpResponse::Ok()
} else {
error!("Unable to parse sub as i32: {}", claims.sub);
HttpResponse::BadRequest()
}
}
/// Sentinel path recorded beside a would-be thumbnail when the source file
/// cannot be decoded by either the `image` crate or ffmpeg: the thumbnail
/// path with `.unsupported` appended to its full file name. Later scans see
/// the sentinel and skip the file instead of re-logging the failure.
pub fn unsupported_thumbnail_sentinel(thumb_path: &Path) -> PathBuf {
    let mut sentinel = std::ffi::OsString::from(thumb_path.as_os_str());
    sentinel.push(".unsupported");
    sentinel.into()
}
/// Generate a thumbnail for one image file, writing it to `thumb_path`.
///
/// Decode strategy, in order:
/// 1. the file's embedded JPEG preview (typical for RAW formats);
/// 2. ffmpeg, for formats the `image` crate can't decode;
/// 3. the `image` crate directly.
/// Branches 1 and 3 bake the EXIF orientation into the output pixels.
/// Returns an `InvalidData` I/O error when decoding fails and a generic
/// I/O error when saving fails.
fn generate_image_thumbnail(src: &Path, thumb_path: &Path) -> std::io::Result<()> {
    // The `image` crate doesn't auto-apply EXIF Orientation on load, and
    // saving back out as JPEG drops EXIF entirely — so without baking the
    // rotation into the pixels here, browsers see the raw landscape buffer
    // of a portrait phone shot and render it sideways. Read once up front
    // and apply to whichever decode branch we end up taking.
    // (EXIF orientation 1 means "upright/no rotation", hence the default.)
    let orientation = exif::read_orientation(src).unwrap_or(1);
    // RAW formats (ARW/NEF/CR2/etc): try the file's embedded JPEG preview
    // first. Avoids ffmpeg choking on proprietary RAW compression (Sony ARW
    // in particular), and is faster than decoding RAW pixels anyway.
    if let Some(preview) = exif::extract_embedded_jpeg_preview(src) {
        let img = image::load_from_memory(&preview).map_err(|e| {
            std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!("decode embedded preview {:?}: {}", src, e),
            )
        })?;
        let img = exif::apply_orientation(img, orientation);
        // 200px max width; u32::MAX height lets the aspect ratio drive the
        // actual height.
        let scaled = img.thumbnail(200, u32::MAX);
        scaled
            .save_with_format(thumb_path, image::ImageFormat::Jpeg)
            .map_err(|e| std::io::Error::other(format!("save {:?}: {}", thumb_path, e)))?;
        return Ok(());
    }
    if file_types::needs_ffmpeg_thumbnail(src) {
        return generate_image_thumbnail_ffmpeg(src, thumb_path);
    }
    let img = image::open(src).map_err(|e| {
        std::io::Error::new(std::io::ErrorKind::InvalidData, format!("{:?}: {}", src, e))
    })?;
    let img = exif::apply_orientation(img, orientation);
    let scaled = img.thumbnail(200, u32::MAX);
    // Output format is inferred from thumb_path's extension here, unlike the
    // preview branch which forces JPEG.
    scaled
        .save(thumb_path)
        .map_err(|e| std::io::Error::other(format!("save {:?}: {}", thumb_path, e)))?;
    Ok(())
}
/// Startup thumbnail sweep across all configured libraries.
///
/// Walks each library tree (honoring effective excludes), generating any
/// missing thumbnails in parallel via rayon under the THUMBNAILS directory.
/// Files that fail to decode get an `.unsupported` sentinel so later sweeps
/// skip them. Finishes by refreshing the Prometheus image/video gauges.
fn create_thumbnails(libs: &[libraries::Library], excluded_dirs: &[String]) {
    let tracer = global_tracer();
    let span = tracer.start("creating thumbnails");
    let thumbs = &dotenv::var("THUMBNAILS").expect("THUMBNAILS not defined");
    let thumbnail_directory: &Path = Path::new(thumbs);
    for lib in libs {
        info!(
            "Scanning thumbnails for library '{}' at {}",
            lib.name, lib.root_path
        );
        let images = PathBuf::from(&lib.root_path);
        // Effective excludes = global env-var excludes library row's
        // excluded_dirs. Lets a parent-library mount skip the subtree
        // already covered by a child library.
        let effective_excludes = lib.effective_excluded_dirs(excluded_dirs);
        // Prune EXCLUDED_DIRS so we don't generate thumbnails-of-thumbnails
        // for Synology @eaDir trees. file_scan handles filter_entry pruning.
        image_api::file_scan::walk_library_files(&images, &effective_excludes)
            .into_par_iter()
            .for_each(|entry| {
                let src = entry.path();
                // Files outside the library root shouldn't happen; skip defensively.
                let Ok(relative_path) = src.strip_prefix(&images) else {
                    return;
                };
                // Library-scoped legacy path: prevents two libraries with
                // the same rel_path from clobbering each other's thumbs.
                // Hash-keyed promotion happens lazily on first hash-aware
                // request — keeping this loop ExifDao-free preserves the
                // current "cargo build && go" startup story.
                let thumb_path = content_hash::library_scoped_legacy_path(
                    thumbnail_directory,
                    lib.id,
                    relative_path,
                );
                let bare_legacy = thumbnail_directory.join(relative_path);
                // Backwards-compat check: if a single-library install has a
                // bare-legacy thumb here already, accept it as present.
                // Same for the sentinel. Means we don't redo work after
                // upgrade and we don't leave stale duplicates around.
                if thumb_path.exists()
                    || bare_legacy.exists()
                    || unsupported_thumbnail_sentinel(&thumb_path).exists()
                    || unsupported_thumbnail_sentinel(&bare_legacy).exists()
                {
                    return;
                }
                let Some(parent) = thumb_path.parent() else {
                    return;
                };
                if let Err(e) = std::fs::create_dir_all(parent) {
                    error!("Failed to create thumbnail dir {:?}: {}", parent, e);
                    return;
                }
                if is_video(&entry) {
                    // Child span parented to the sweep span so per-video
                    // generation shows up under "creating thumbnails".
                    let mut video_span = tracer.start_with_context(
                        "generate_video_thumbnail",
                        &opentelemetry::Context::new()
                            .with_remote_span_context(span.span_context().clone()),
                    );
                    video_span.set_attributes(vec![
                        KeyValue::new("type", "video"),
                        KeyValue::new("file-name", thumb_path.display().to_string()),
                        KeyValue::new("library", lib.name.clone()),
                    ]);
                    debug!("Generating video thumbnail: {:?}", thumb_path);
                    if let Err(e) = generate_video_thumbnail(src, &thumb_path) {
                        // Write a sentinel so the next sweep skips this file.
                        let sentinel = unsupported_thumbnail_sentinel(&thumb_path);
                        error!(
                            "Unable to thumbnail video {:?}: {}. Writing sentinel {:?}",
                            src, e, sentinel
                        );
                        if let Err(se) = std::fs::write(&sentinel, b"") {
                            warn!("Failed to write sentinel {:?}: {}", sentinel, se);
                        }
                    }
                    video_span.end();
                } else if is_image(&entry) {
                    match generate_image_thumbnail(src, &thumb_path) {
                        Ok(_) => info!("Saved thumbnail: {:?}", thumb_path),
                        Err(e) => {
                            let sentinel = unsupported_thumbnail_sentinel(&thumb_path);
                            error!(
                                "Unable to thumbnail {:?}: {}. Writing sentinel {:?}",
                                src, e, sentinel
                            );
                            if let Err(se) = std::fs::write(&sentinel, b"") {
                                warn!("Failed to write sentinel {:?}: {}", sentinel, se);
                            }
                        }
                    }
                }
            });
    }
    debug!("Finished making thumbnails");
    // Refresh the Prometheus media-count gauges after the sweep.
    for lib in libs {
        let effective_excludes = lib.effective_excluded_dirs(excluded_dirs);
        update_media_counts(Path::new(&lib.root_path), &effective_excludes);
    }
}
/// Recount images and videos under `media_dir` (honoring `excluded_dirs`)
/// and publish the totals to the Prometheus gauges.
fn update_media_counts(media_dir: &Path, excluded_dirs: &[String]) {
    // Single walk; each entry is classified as image, video, or neither.
    let (image_count, video_count) =
        image_api::file_scan::walk_library_files(media_dir, excluded_dirs)
            .into_iter()
            .fold((0i64, 0i64), |(images, videos), entry| {
                if is_image(&entry) {
                    (images + 1, videos)
                } else if is_video(&entry) {
                    (images, videos + 1)
                } else {
                    (images, videos)
                }
            });
    IMAGE_GAUGE.set(image_count);
    VIDEO_GAUGE.set(video_count);
}
fn is_image(entry: &DirEntry) -> bool {
use image_api::file_types;
file_types::direntry_is_image(entry)
}
fn is_video(entry: &DirEntry) -> bool {
use image_api::file_types;
file_types::direntry_is_video(entry)
}
/// Process entry point: loads `.env`, runs DB migrations, spawns the
/// background jobs (thumbnail sweep, playlist scans, file watcher, orphan
/// cleanup, optional daily-summary generation), then serves the HTTP API
/// until shutdown.
fn main() -> std::io::Result<()> {
    if let Err(err) = dotenv::dotenv() {
        println!("Error parsing .env {:?}", err);
    }
    // Migrations run before anything else so the schema is ready for every
    // DAO constructed below.
    run_migrations(&mut connect()).expect("Failed to run migrations");
    let system = actix::System::new();
    system.block_on(async {
        // Just use basic logger when running a non-release build
        #[cfg(debug_assertions)]
        {
            env_logger::init();
        }
        #[cfg(not(debug_assertions))]
        {
            otel::init_logs();
            otel::init_tracing();
        }
        // AppState construction loads (and seeds if needed) the libraries
        // table; we use that list to drive the initial thumbnail sweep.
        let app_data = Data::new(AppState::default());
        // Kick thumbnail generation onto a background thread so the HTTP
        // server can accept traffic while large libraries are backfilling.
        // Existing thumbs are re-used (exists() check inside the walk),
        // so missed files are filled in over successive scans.
        {
            let libs = app_data.libraries.clone();
            let excluded = app_data.excluded_dirs.clone();
            std::thread::spawn(move || {
                create_thumbnails(&libs, &excluded);
            });
        }
        // generate_video_gifs().await;
        // Prometheus middleware plus the custom media-count gauges.
        let labels = HashMap::new();
        let prometheus = PrometheusMetricsBuilder::new("api")
            .const_labels(labels)
            .build()
            .expect("Unable to build prometheus metrics middleware");
        prometheus
            .registry
            .register(Box::new(IMAGE_GAUGE.clone()))
            .unwrap();
        prometheus
            .registry
            .register(Box::new(VIDEO_GAUGE.clone()))
            .unwrap();
        let app_state = app_data.clone();
        // Seed the playlist manager with every library root so existing
        // HLS playlists are discovered at startup.
        for lib in &app_state.libraries {
            app_state.playlist_manager.do_send(ScanDirectoryMessage {
                directory: lib.root_path.clone(),
            });
        }
        // Start file watcher with playlist manager and preview generator
        let playlist_mgr_for_watcher = app_state.playlist_manager.as_ref().clone();
        let preview_gen_for_watcher = app_state.preview_clip_generator.as_ref().clone();
        watch_files(
            app_state.libraries.clone(),
            playlist_mgr_for_watcher,
            preview_gen_for_watcher,
            app_state.face_client.clone(),
            app_state.excluded_dirs.clone(),
            app_state.library_health.clone(),
        );
        // Start orphaned playlist cleanup job. Multi-library aware: walks
        // every configured library when looking for the source video, and
        // skips the whole cycle while any library is stale (a missing
        // source is indistinguishable from a transiently-unmounted share).
        cleanup_orphaned_playlists(
            app_state.libraries.clone(),
            app_state.excluded_dirs.clone(),
            app_state.library_health.clone(),
        );
        // Spawn background job to generate daily conversation summaries
        {
            use crate::ai::generate_daily_summaries;
            use crate::database::{DailySummaryDao, SqliteDailySummaryDao};
            use chrono::NaiveDate;
            // Configure date range for summary generation
            // Currently: 2015-10-01 through 2020-01-01
            // To expand: change start_date and end_date
            let start_date = Some(NaiveDate::from_ymd_opt(2015, 10, 1).unwrap());
            let end_date = Some(NaiveDate::from_ymd_opt(2020, 1, 1).unwrap());
            // let contacts_to_summarize = vec!["Domenique", "Zach", "Paul"]; // Add more contacts as needed
            // NOTE: empty list ⇒ this job is effectively disabled right now.
            let contacts_to_summarize = vec![]; // Add more contacts as needed
            let ollama = app_state.ollama.clone();
            let sms_client = app_state.sms_client.clone();
            for contact in contacts_to_summarize {
                let ollama_clone = ollama.clone();
                let sms_client_clone = sms_client.clone();
                let summary_dao: Arc<Mutex<Box<dyn DailySummaryDao>>> =
                    Arc::new(Mutex::new(Box::new(SqliteDailySummaryDao::new())));
                let start = start_date;
                let end = end_date;
                tokio::spawn(async move {
                    info!("Starting daily summary generation for {}", contact);
                    if let Err(e) = generate_daily_summaries(
                        contact,
                        start,
                        end,
                        &ollama_clone,
                        &sms_client_clone,
                        summary_dao,
                    )
                    .await
                    {
                        error!("Daily summary generation failed for {}: {:?}", contact, e);
                    } else {
                        info!("Daily summary generation completed for {}", contact);
                    }
                });
            }
        }
        // One factory closure per worker: each worker gets its own DAO set.
        HttpServer::new(move || {
            let user_dao = SqliteUserDao::new();
            let favorites_dao = SqliteFavoriteDao::new();
            let tag_dao = SqliteTagDao::default();
            let exif_dao = SqliteExifDao::new();
            let insight_dao = SqliteInsightDao::new();
            let preview_dao = SqlitePreviewDao::new();
            let face_dao = faces::SqliteFaceDao::new();
            let cors = Cors::default()
                .allowed_origin_fn(|origin, _req_head| {
                    // Allow all origins in development, or check against CORS_ALLOWED_ORIGINS env var
                    if let Ok(allowed_origins) = env::var("CORS_ALLOWED_ORIGINS") {
                        allowed_origins
                            .split(',')
                            .any(|allowed| origin.as_bytes() == allowed.trim().as_bytes())
                    } else {
                        // Default: allow all origins if not configured
                        true
                    }
                })
                .allowed_methods(vec!["GET", "POST", "PUT", "DELETE", "OPTIONS"])
                .allowed_headers(vec![
                    actix_web::http::header::AUTHORIZATION,
                    actix_web::http::header::ACCEPT,
                    actix_web::http::header::CONTENT_TYPE,
                ])
                .supports_credentials()
                .max_age(3600);
            // Configure rate limiting for login endpoint (2 requests/sec, burst of 5)
            let governor_conf = GovernorConfigBuilder::default()
                .per_second(2)
                .burst_size(5)
                .finish()
                .unwrap();
            App::new()
                .wrap(middleware::Logger::default())
                .wrap(cors)
                .service(
                    web::resource("/login")
                        .wrap(Governor::new(&governor_conf))
                        .route(web::post().to(login::<SqliteUserDao>)),
                )
                .service(
                    web::resource("/photos")
                        .route(web::get().to(files::list_photos::<SqliteTagDao, RealFileSystem>)),
                )
                .service(
                    web::resource("/photos/gps-summary")
                        .route(web::get().to(files::get_gps_summary)),
                )
                .service(
                    web::resource("/photos/exif").route(web::get().to(files::list_exif_summary)),
                )
                .service(web::resource("/file/move").post(move_file::<RealFileSystem>))
                .service(get_image)
                .service(upload_image)
                .service(generate_video)
                .service(stream_video)
                .service(get_video_preview)
                .service(get_preview_status)
                .service(get_video_part)
                .service(favorites)
                .service(put_add_favorite)
                .service(delete_favorite)
                .service(get_file_metadata)
                .service(set_image_gps)
                .service(set_image_date)
                .service(clear_image_date)
                .service(get_full_exif)
                .service(memories::list_memories)
                .service(ai::generate_insight_handler)
                .service(ai::generate_agentic_insight_handler)
                .service(ai::get_insight_handler)
                .service(ai::delete_insight_handler)
                .service(ai::get_all_insights_handler)
                .service(ai::get_available_models_handler)
                .service(ai::get_openrouter_models_handler)
                .service(ai::chat_turn_handler)
                .service(ai::chat_stream_handler)
                .service(ai::chat_history_handler)
                .service(ai::chat_rewind_handler)
                .service(ai::rate_insight_handler)
                .service(ai::export_training_data_handler)
                .service(libraries::list_libraries)
                .add_feature(add_tag_services::<_, SqliteTagDao>)
                .add_feature(knowledge::add_knowledge_services::<_, SqliteKnowledgeDao>)
                .add_feature(faces::add_face_services::<_, faces::SqliteFaceDao>)
                .add_feature(duplicates::add_duplicate_services)
                .app_data(app_data.clone())
                .app_data::<Data<RealFileSystem>>(Data::new(RealFileSystem::new(
                    app_data.base_path.clone(),
                )))
                .app_data::<Data<Mutex<SqliteUserDao>>>(Data::new(Mutex::new(user_dao)))
                .app_data::<Data<Mutex<Box<dyn FavoriteDao>>>>(Data::new(Mutex::new(Box::new(
                    favorites_dao,
                ))))
                .app_data::<Data<Mutex<SqliteTagDao>>>(Data::new(Mutex::new(tag_dao)))
                .app_data::<Data<Mutex<Box<dyn ExifDao>>>>(Data::new(Mutex::new(Box::new(
                    exif_dao,
                ))))
                .app_data::<Data<Mutex<Box<dyn InsightDao>>>>(Data::new(Mutex::new(Box::new(
                    insight_dao,
                ))))
                .app_data::<Data<Mutex<Box<dyn PreviewDao>>>>(Data::new(Mutex::new(Box::new(
                    preview_dao,
                ))))
                .app_data::<Data<Mutex<SqliteKnowledgeDao>>>(Data::new(Mutex::new(
                    SqliteKnowledgeDao::new(),
                )))
                .app_data::<Data<Mutex<faces::SqliteFaceDao>>>(Data::new(Mutex::new(face_dao)))
                .app_data::<Data<crate::ai::face_client::FaceClient>>(Data::new(
                    app_data.face_client.clone(),
                ))
                .app_data(mp::form::MultipartFormConfig::default().total_limit(1024 * 1024 * 1024)) // 1GB upload limit
                .app_data(web::JsonConfig::default().error_handler(|err, req| {
                    let detail = err.to_string();
                    log::warn!(
                        "JSON parse error on {} {}: {}",
                        req.method(),
                        req.uri(),
                        detail
                    );
                    let response =
                        HttpResponse::BadRequest().json(serde_json::json!({"error": detail}));
                    actix_web::error::InternalError::from_response(err, response).into()
                }))
                .app_data::<Data<InsightGenerator>>(Data::new(app_data.insight_generator.clone()))
                .wrap(prometheus.clone())
        })
        // NOTE(review): binds BIND_URL *and* localhost:8088; the unwrap()
        // aborts startup if BIND_URL is unset — confirm that's the intended
        // failure mode.
        .bind(dotenv::var("BIND_URL").unwrap())?
        .bind("localhost:8088")?
        .run()
        .await
    })
}
/// Apply any embedded Diesel migrations that have not yet run.
///
/// Propagates the migration error unchanged so the caller decides how to
/// surface it (main() currently aborts via expect()).
fn run_migrations(
    connection: &mut impl MigrationHarness<Sqlite>,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
    // The list of applied migration versions isn't interesting here — only
    // success or failure.
    connection.run_pending_migrations(MIGRATIONS).map(|_| ())
}
/// Clean up orphaned HLS playlists and segments whose source videos no longer exist
///
/// Spawns a detached background thread that wakes every
/// PLAYLIST_CLEANUP_INTERVAL_SECONDS (default 24h), skips the cycle if any
/// library is marked stale, and otherwise deletes every `.m3u8` under
/// VIDEO_PATH whose basename matches no video file in any library —
/// together with the playlist's `.ts` segment files.
fn cleanup_orphaned_playlists(
    libs: Vec<libraries::Library>,
    excluded_dirs: Vec<String>,
    library_health: libraries::LibraryHealthMap,
) {
    std::thread::spawn(move || {
        let video_path = dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set");
        // Get cleanup interval from environment (default: 24 hours)
        let cleanup_interval_secs = dotenv::var("PLAYLIST_CLEANUP_INTERVAL_SECONDS")
            .ok()
            .and_then(|s| s.parse::<u64>().ok())
            .unwrap_or(86400); // 24 hours
        info!("Starting orphaned playlist cleanup job");
        info!("  Cleanup interval: {} seconds", cleanup_interval_secs);
        info!("  Playlist directory: {}", video_path);
        for lib in &libs {
            info!(
                "  Checking sources under '{}' at {}",
                lib.name, lib.root_path
            );
        }
        loop {
            // Sleep first: the initial cycle runs one full interval after boot.
            std::thread::sleep(Duration::from_secs(cleanup_interval_secs));
            // Safety gate: skip the cleanup cycle if any library is
            // stale. A missing source video on a stale library is
            // indistinguishable from a transient unmount, and the
            // cleanup is destructive — we'd rather leak a few playlist
            // files for a tick than delete one whose source is briefly
            // unreachable. The cycle re-runs on the next interval.
            {
                let guard = library_health.read().unwrap_or_else(|e| e.into_inner());
                let stale: Vec<String> = libs
                    .iter()
                    .filter(|lib| guard.get(&lib.id).map(|h| !h.is_online()).unwrap_or(false))
                    .map(|lib| lib.name.clone())
                    .collect();
                if !stale.is_empty() {
                    warn!(
                        "Skipping orphaned-playlist cleanup: {} library(ies) stale: [{}]",
                        stale.len(),
                        stale.join(", ")
                    );
                    continue;
                }
            }
            info!("Running orphaned playlist cleanup");
            let start = std::time::Instant::now();
            let mut deleted_count = 0;
            let mut error_count = 0;
            // Find all .m3u8 files in VIDEO_PATH
            let playlists: Vec<PathBuf> = WalkDir::new(&video_path)
                .into_iter()
                .filter_map(|e| e.ok())
                .filter(|e| e.file_type().is_file())
                .filter(|e| {
                    e.path()
                        .extension()
                        .and_then(|s| s.to_str())
                        .map(|ext| ext.eq_ignore_ascii_case("m3u8"))
                        .unwrap_or(false)
                })
                .map(|e| e.path().to_path_buf())
                .collect();
            info!("Found {} playlist files to check", playlists.len());
            for playlist_path in playlists {
                // Extract the original video filename from playlist name
                // Playlist format: {VIDEO_PATH}/{original_filename}.m3u8
                if let Some(filename) = playlist_path.file_stem() {
                    let video_filename = filename.to_string_lossy();
                    // Search for this video file across every configured
                    // library, respecting EXCLUDED_DIRS so we don't
                    // false-resurrect playlists for videos that only
                    // exist inside an excluded subtree. As soon as one
                    // library has a matching source, we're done — the
                    // playlist isn't orphaned.
                    // NOTE(review): this re-walks every library per playlist
                    // (O(playlists × files)); a stem set built once per
                    // cycle would reduce it to one walk — fine at current
                    // scale, flagging for growth.
                    let mut video_exists = false;
                    'libs: for lib in &libs {
                        let effective = lib.effective_excluded_dirs(&excluded_dirs);
                        for entry in image_api::file_scan::walk_library_files(
                            Path::new(&lib.root_path),
                            &effective,
                        ) {
                            if let Some(entry_stem) = entry.path().file_stem()
                                && entry_stem == filename
                                && is_video_file(entry.path())
                            {
                                video_exists = true;
                                break 'libs;
                            }
                        }
                    }
                    if !video_exists {
                        debug!(
                            "Source video for playlist {} no longer exists, deleting",
                            playlist_path.display()
                        );
                        // Delete the playlist file
                        if let Err(e) = std::fs::remove_file(&playlist_path) {
                            warn!(
                                "Failed to delete playlist {}: {}",
                                playlist_path.display(),
                                e
                            );
                            error_count += 1;
                        } else {
                            deleted_count += 1;
                            // Also try to delete associated .ts segment files
                            // They are typically named (unknown)N.ts in the same directory
                            if let Some(parent_dir) = playlist_path.parent() {
                                for entry in WalkDir::new(parent_dir)
                                    .max_depth(1)
                                    .into_iter()
                                    .filter_map(|e| e.ok())
                                    .filter(|e| e.file_type().is_file())
                                {
                                    let entry_path = entry.path();
                                    if let Some(ext) = entry_path.extension()
                                        && ext.eq_ignore_ascii_case("ts")
                                    {
                                        // Check if this .ts file belongs to our playlist
                                        // NOTE(review): starts_with is a prefix
                                        // match — orphaned playlist "a" would
                                        // also match segments of a live "ab…"
                                        // playlist. Confirm the segment naming
                                        // scheme makes collisions impossible.
                                        if let Some(ts_stem) = entry_path.file_stem() {
                                            let ts_name = ts_stem.to_string_lossy();
                                            if ts_name.starts_with(&*video_filename) {
                                                if let Err(e) = std::fs::remove_file(entry_path) {
                                                    debug!(
                                                        "Failed to delete segment {}: {}",
                                                        entry_path.display(),
                                                        e
                                                    );
                                                } else {
                                                    debug!(
                                                        "Deleted segment: {}",
                                                        entry_path.display()
                                                    );
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
            info!(
                "Orphaned playlist cleanup completed in {:?}: deleted {} playlists, {} errors",
                start.elapsed(),
                deleted_count,
                error_count
            );
        }
    });
}
/// Spawn the detached background file-watcher thread.
///
/// The thread loops forever: each tick it sleeps
/// `WATCH_QUICK_INTERVAL_SECONDS` (default 60), probes every enabled
/// library's availability, drains the content-hash / face-detection /
/// date-taken backfill backlogs, runs a quick scan (recently-modified
/// files only) or — once `WATCH_FULL_INTERVAL_SECONDS` (default 3600)
/// has elapsed — a full scan, prunes image_exif rows for missing
/// files, and finishes the tick with cross-library reconciliation,
/// back-ref refresh, and orphan GC.
///
/// Parameters:
/// - `libs`: all configured libraries; disabled ones are skipped
///   entirely (no probe, no ingest, no health entry).
/// - `playlist_manager`: actor receiving HLS-playlist queue requests.
/// - `preview_generator`: actor receiving preview-clip requests.
/// - `face_client`: Apollo face-detection client; the hash backfill and
///   face-backlog drains only run when it is enabled.
/// - `excluded_dirs`: global exclusion list, merged per library via
///   `effective_excluded_dirs`.
/// - `library_health`: shared health map written by the per-tick probe
///   and read by HTTP handlers.
fn watch_files(
    libs: Vec<libraries::Library>,
    playlist_manager: Addr<VideoPlaylistManager>,
    preview_generator: Addr<video::actors::PreviewClipGenerator>,
    face_client: crate::ai::face_client::FaceClient,
    excluded_dirs: Vec<String>,
    library_health: libraries::LibraryHealthMap,
) {
    std::thread::spawn(move || {
        // Get polling intervals from environment variables
        // Quick scan: Check recently modified files (default: 60 seconds)
        let quick_interval_secs = dotenv::var("WATCH_QUICK_INTERVAL_SECONDS")
            .ok()
            .and_then(|s| s.parse::<u64>().ok())
            .unwrap_or(60);
        // Full scan: Check all files regardless of modification time (default: 3600 seconds = 1 hour)
        let full_interval_secs = dotenv::var("WATCH_FULL_INTERVAL_SECONDS")
            .ok()
            .and_then(|s| s.parse::<u64>().ok())
            .unwrap_or(3600);
        info!("Starting optimized file watcher");
        info!(" Quick scan interval: {} seconds", quick_interval_secs);
        info!(" Full scan interval: {} seconds", full_interval_secs);
        // Surface face-detection state at boot so it's obvious whether
        // the watcher will hit Apollo. The branch silently no-ops when
        // disabled (intentional for legacy deploys), which makes "why
        // aren't faces being detected?" hard to diagnose otherwise.
        if face_client.is_enabled() {
            info!(" Face detection: ENABLED");
        } else {
            info!(
                " Face detection: DISABLED (set APOLLO_FACE_API_BASE_URL \
                 or APOLLO_API_BASE_URL to enable)"
            );
        }
        for lib in &libs {
            info!(
                " Watching library '{}' (id={}) at {}",
                lib.name, lib.id, lib.root_path
            );
        }
        // Create DAOs for tracking processed files
        let exif_dao = Arc::new(Mutex::new(
            Box::new(SqliteExifDao::new()) as Box<dyn ExifDao>
        ));
        let preview_dao = Arc::new(Mutex::new(
            Box::new(SqlitePreviewDao::new()) as Box<dyn PreviewDao>
        ));
        let face_dao = Arc::new(Mutex::new(
            Box::new(faces::SqliteFaceDao::new()) as Box<dyn faces::FaceDao>
        ));
        // tag_dao for the watcher's auto-bind path. Independent of the
        // request-handler tag_dao instance — both end up pointing at the
        // same SQLite file via SqliteTagDao::default().
        let watcher_tag_dao = Arc::new(Mutex::new(
            Box::new(SqliteTagDao::default()) as Box<dyn tags::TagDao>
        ));
        let mut last_quick_scan = SystemTime::now();
        let mut last_full_scan = SystemTime::now();
        let mut scan_count = 0u64;
        // Per-library cursor for the missing-file scan. Each tick reads
        // a page from `offset`, stat()s the rows, deletes confirmed-
        // missing ones, and advances or wraps the cursor. State held
        // in-memory so a watcher restart resumes from 0 — fine, the
        // sweep is idempotent.
        let mut missing_file_offsets: std::collections::HashMap<i32, i64> =
            std::collections::HashMap::new();
        let missing_scan_page_size: i64 = dotenv::var("IMAGE_EXIF_MISSING_SCAN_PAGE_SIZE")
            .ok()
            .and_then(|s| s.parse().ok())
            .filter(|n: &i64| *n > 0)
            .unwrap_or(library_maintenance::DEFAULT_SCAN_PAGE_SIZE);
        let missing_delete_cap: usize = dotenv::var("IMAGE_EXIF_MISSING_DELETE_CAP_PER_TICK")
            .ok()
            .and_then(|s| s.parse().ok())
            .filter(|n: &usize| *n > 0)
            .unwrap_or(library_maintenance::DEFAULT_MISSING_DELETE_CAP);
        // Two-tick orphan-GC consensus state. Carried across ticks via
        // `OrphanGcState`; see library_maintenance::run_orphan_gc.
        let mut orphan_gc_state = library_maintenance::OrphanGcState::default();
        // Initial availability sweep before the loop's first sleep so
        // /libraries reports the truth from the very first request,
        // rather than the optimistic Online default that
        // new_health_map seeds. Without this, an unmounted share would
        // appear online for up to WATCH_QUICK_INTERVAL_SECONDS (default
        // 60s) after boot. Same probe logic as the per-tick gate
        // below; no ingest runs here, just the health update + log.
        // Disabled libraries skip the probe entirely — they should
        // never enter the health map (treated as out-of-scope).
        for lib in &libs {
            if !lib.enabled {
                continue;
            }
            let context = opentelemetry::Context::new();
            let had_data = exif_dao
                .lock()
                .expect("exif_dao poisoned")
                .count_for_library(&context, lib.id)
                .map(|n| n > 0)
                .unwrap_or(false);
            libraries::refresh_health(&library_health, lib, had_data);
        }
        loop {
            std::thread::sleep(Duration::from_secs(quick_interval_secs));
            let now = SystemTime::now();
            let since_last_full = now
                .duration_since(last_full_scan)
                .unwrap_or(Duration::from_secs(0));
            // A tick is promoted to a full scan once the full interval
            // has elapsed since the last one.
            let is_full_scan = since_last_full.as_secs() >= full_interval_secs;
            for lib in &libs {
                // Operator kill switch: a disabled library is invisible
                // to the watcher entirely. No probe, no ingest, no
                // maintenance, no health entry. Distinct from Stale —
                // Stale is "we wanted to but couldn't"; Disabled is
                // "we don't want to". Toggle via SQL.
                if !lib.enabled {
                    debug!(
                        "watcher: skipping library '{}' (id={}) — enabled=false",
                        lib.name, lib.id
                    );
                    continue;
                }
                // Availability probe: every tick checks that the
                // library's mount is reachable, is a directory, is
                // readable, and (if image_exif has rows for it) is
                // non-empty. A Stale library skips ingest, backlog
                // drains, and metric refresh — reads/serving in HTTP
                // handlers continue to work. Branches B/C extend the
                // probe gate to cover handoff and orphan GC. See
                // CLAUDE.md "Library availability and safety".
                let had_data = {
                    let context = opentelemetry::Context::new();
                    let mut guard = exif_dao.lock().expect("exif_dao poisoned");
                    guard
                        .count_for_library(&context, lib.id)
                        .map(|n| n > 0)
                        .unwrap_or(false)
                };
                let health = libraries::refresh_health(&library_health, lib, had_data);
                if !health.is_online() {
                    // Skip every write path for this library this tick.
                    // Don't refresh the media-count gauge either — a
                    // probe-failed library would otherwise flap to 0
                    // image / 0 video and pollute Prometheus.
                    continue;
                }
                // Drain the unhashed-hash backlog AND the face-detection
                // backlog every tick, regardless of quick/full. Quick
                // scans only walk recently-modified files, so the
                // pre-Phase-3 backlog never enters their candidate set
                // — without these standalone passes, backfill +
                // detection only progressed during full scans
                // (default once an hour).
                // Effective excludes for this library: global env-var
                // row's excluded_dirs. Compute once per tick — used
                // by every walker below for this library.
                let effective_excludes = lib.effective_excluded_dirs(&excluded_dirs);
                if face_client.is_enabled() {
                    let context = opentelemetry::Context::new();
                    backfill_unhashed_backlog(&context, lib, &exif_dao);
                    process_face_backlog(
                        &context,
                        lib,
                        &face_client,
                        &face_dao,
                        &watcher_tag_dao,
                        &effective_excludes,
                    );
                }
                // Date-taken backfill: drain rows whose canonical date is
                // either unresolved or only fs_time-sourced. Independent
                // of face detection — runs even on deploys that don't
                // configure Apollo, since `/memories` depends on it.
                {
                    let context = opentelemetry::Context::new();
                    backfill_missing_date_taken(&context, lib, &exif_dao);
                }
                if is_full_scan {
                    info!(
                        "Running full scan for library '{}' (scan #{})",
                        lib.name, scan_count
                    );
                    // None for modified_since => walk every file.
                    process_new_files(
                        lib,
                        Arc::clone(&exif_dao),
                        Arc::clone(&preview_dao),
                        Arc::clone(&face_dao),
                        Arc::clone(&watcher_tag_dao),
                        face_client.clone(),
                        &effective_excludes,
                        None,
                        playlist_manager.clone(),
                        preview_generator.clone(),
                    );
                } else {
                    debug!(
                        "Running quick scan for library '{}' (checking files modified in last {} seconds)",
                        lib.name,
                        quick_interval_secs + 10
                    );
                    // 10-second overlap with the previous tick so files
                    // modified right around the scan boundary aren't missed.
                    let check_since = last_quick_scan
                        .checked_sub(Duration::from_secs(10))
                        .unwrap_or(last_quick_scan);
                    process_new_files(
                        lib,
                        Arc::clone(&exif_dao),
                        Arc::clone(&preview_dao),
                        Arc::clone(&face_dao),
                        Arc::clone(&watcher_tag_dao),
                        face_client.clone(),
                        &effective_excludes,
                        Some(check_since),
                        playlist_manager.clone(),
                        preview_generator.clone(),
                    );
                }
                // Update media counts per library (metric aggregates across all)
                update_media_counts(Path::new(&lib.root_path), &effective_excludes);
                // Missing-file detection: prune image_exif rows whose
                // source file is no longer on disk. Per-library, so we
                // pass library-online-this-tick implicitly (we only
                // reach here if the probe gate at the top of the
                // iteration passed). Capped + paginated so a huge
                // library doesn't stall the watcher; rows we don't
                // visit this tick get visited next tick. See
                // library_maintenance::detect_missing_files_for_library.
                {
                    let context = opentelemetry::Context::new();
                    let offset = missing_file_offsets.get(&lib.id).copied().unwrap_or(0);
                    let (deleted, next_offset) =
                        library_maintenance::detect_missing_files_for_library(
                            &context,
                            lib,
                            &exif_dao,
                            offset,
                            missing_scan_page_size,
                            missing_delete_cap,
                        );
                    missing_file_offsets.insert(lib.id, next_offset);
                    if deleted > 0 {
                        debug!(
                            "missing-file scan: library '{}' next_offset={}",
                            lib.name, next_offset
                        );
                    }
                }
            }
            // Reconciliation: cross-library, so it runs once per tick
            // outside the per-library loop. Idempotent — fast no-op when
            // there's nothing to do. Operates on the database alone, no
            // filesystem dependency, so it doesn't need a health gate.
            // See database::reconcile and CLAUDE.md "Multi-library data
            // model" for the rules.
            {
                let mut conn = image_api::database::connect();
                let _ = image_api::database::reconcile::run(&mut conn);
                // Back-ref refresh: hash-keyed rows whose
                // (library_id, rel_path) tuple no longer matches any
                // image_exif row but whose hash still does. After a
                // recent→archive move, the missing-file scan removes
                // the old image_exif row; this pass repoints face /
                // tag / insight back-refs at the surviving location.
                // DB-only, no health gate needed — uses what's in
                // image_exif as truth.
                let _ = library_maintenance::refresh_back_refs(&mut conn);
                // Orphan GC: the destructive end of the maintenance
                // pipeline. Two-tick consensus + every-library-online
                // requirement is enforced inside run_orphan_gc; we
                // pass the current all-online flag and the function
                // tracks the previous tick's flag in OrphanGcState.
                let all_online = library_maintenance::all_libraries_online(&libs, &library_health);
                let _ =
                    library_maintenance::run_orphan_gc(&mut conn, &mut orphan_gc_state, all_online);
            }
            if is_full_scan {
                last_full_scan = now;
            }
            last_quick_scan = now;
            scan_count += 1;
        }
    });
}
/// Decide whether the HLS playlist for `video_path` must be (re)generated.
///
/// Returns `true` when the playlist file is absent, when the source
/// video's mtime is strictly newer than the playlist's, or when either
/// file's metadata/mtime cannot be read (fail toward regeneration).
fn playlist_needs_generation(video_path: &Path, playlist_path: &Path) -> bool {
    if !playlist_path.exists() {
        return true;
    }
    // Fetch a file's modification time, surfacing either the metadata
    // or the mtime error as a single io::Result.
    let mtime = |p: &Path| std::fs::metadata(p).and_then(|m| m.modified());
    match (mtime(video_path), mtime(playlist_path)) {
        // Playlist is stale only if the source is strictly newer.
        (Ok(video_mtime), Ok(playlist_mtime)) => video_mtime > playlist_mtime,
        // Timestamps unavailable — assume regeneration is needed.
        _ => true,
    }
}
/// One scan pass over a single library: register new files, run face
/// detection, queue video playlist/preview work, and (on full scans)
/// reconcile the DB against disk.
///
/// `modified_since == None` means a full scan (every file walked, plus
/// the reconciliation prune at the end); `Some(t)` is a quick scan
/// limited to files modified after `t` — quick scans skip the prune
/// because they can't distinguish "missing" from "unchanged".
///
/// Steps, in order:
/// 1. Walk the library (pruning `excluded_dirs`) and batch-check which
///    files already have image_exif rows / thumbnails.
/// 2. Insert image_exif rows (content hash + perceptual hashes + EXIF
///    + canonical date_taken) for unregistered files.
/// 3. If `face_client` is enabled: backfill missing content hashes and
///    run a face-detection pass over new candidates.
/// 4. Queue HLS playlist generation and preview-clip generation for
///    videos via the two actor addresses.
/// 5. Regenerate thumbnails when any new files were found.
/// 6. Full scans only: delete image_exif rows whose files are gone.
fn process_new_files(
    library: &libraries::Library,
    exif_dao: Arc<Mutex<Box<dyn ExifDao>>>,
    preview_dao: Arc<Mutex<Box<dyn PreviewDao>>>,
    face_dao: Arc<Mutex<Box<dyn faces::FaceDao>>>,
    tag_dao: Arc<Mutex<Box<dyn tags::TagDao>>>,
    face_client: crate::ai::face_client::FaceClient,
    excluded_dirs: &[String],
    modified_since: Option<SystemTime>,
    playlist_manager: Addr<VideoPlaylistManager>,
    preview_generator: Addr<video::actors::PreviewClipGenerator>,
) {
    let context = opentelemetry::Context::new();
    let thumbs = dotenv::var("THUMBNAILS").expect("THUMBNAILS not defined");
    let thumbnail_directory = Path::new(&thumbs);
    let base_path = Path::new(&library.root_path);
    // Walk, prune EXCLUDED_DIRS subtrees, and apply image/video + modified_since
    // filters. See `file_scan` for why exclusion has to happen at WalkDir
    // time (filter_entry) rather than at face-detect time.
    let files: Vec<(PathBuf, String)> =
        image_api::file_scan::enumerate_indexable_files(base_path, excluded_dirs, modified_since);
    if files.is_empty() {
        debug!("No files to process");
        return;
    }
    debug!("Found {} files to check", files.len());
    // Batch query: Get all EXIF data for these files in one query
    let file_paths: Vec<String> = files.iter().map(|(_, rel_path)| rel_path.clone()).collect();
    let existing_exif_paths: HashMap<String, bool> = {
        let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
        // Walk is per-library, so scope the lookup so a same-named file
        // in another library doesn't make this one look already-indexed.
        match dao.get_exif_batch(&context, Some(library.id), &file_paths) {
            Ok(exif_records) => exif_records
                .into_iter()
                .map(|record| (record.file_path, true))
                .collect(),
            Err(e) => {
                error!("Error batch querying EXIF data: {:?}", e);
                HashMap::new()
            }
        }
    };
    let mut new_files_found = false;
    let mut files_needing_row = Vec::new();
    // Register every image/video file in image_exif. Rows without EXIF
    // still carry library_id, rel_path, content_hash, and size_bytes so
    // derivative dedup and DB-indexed sort/filter work for every file,
    // not just photos with parseable EXIF.
    for (file_path, relative_path) in &files {
        // Check both the library-scoped legacy path (current shape) and
        // the bare-legacy path (pre-multi-library shape). Either one
        // existing means a thumbnail is already on disk for this file.
        let scoped_thumb_path = content_hash::library_scoped_legacy_path(
            thumbnail_directory,
            library.id,
            relative_path,
        );
        let bare_legacy_thumb_path = thumbnail_directory.join(relative_path);
        // A sentinel file marks "thumbnailing unsupported" so we don't
        // retry those files every scan.
        let needs_thumbnail = !scoped_thumb_path.exists()
            && !bare_legacy_thumb_path.exists()
            && !unsupported_thumbnail_sentinel(&scoped_thumb_path).exists()
            && !unsupported_thumbnail_sentinel(&bare_legacy_thumb_path).exists();
        let needs_row = !existing_exif_paths.contains_key(relative_path);
        if needs_thumbnail || needs_row {
            new_files_found = true;
            if needs_thumbnail {
                info!("New file detected (missing thumbnail): {}", relative_path);
            }
            if needs_row {
                files_needing_row.push((file_path.clone(), relative_path.clone()));
            }
        }
    }
    if !files_needing_row.is_empty() {
        info!(
            "Registering {} new files in image_exif",
            files_needing_row.len()
        );
        for (file_path, relative_path) in files_needing_row {
            let timestamp = Utc::now().timestamp();
            // Hash + size from filesystem metadata — always attempted so
            // every file gets a content_hash, even when EXIF is absent.
            let (content_hash, size_bytes) = match content_hash::compute(&file_path) {
                Ok(id) => (Some(id.content_hash), Some(id.size_bytes)),
                Err(e) => {
                    warn!("Failed to hash {}: {:?}", file_path.display(), e);
                    (None, None)
                }
            };
            // Perceptual hashes (pHash + dHash). Best-effort — None for
            // videos and decode failures. Drives near-duplicate detection
            // in the Apollo duplicates surface; failure here is non-fatal
            // and never blocks indexing.
            let perceptual = perceptual_hash::compute(&file_path);
            // EXIF is best-effort enrichment. When extraction fails (or the
            // file type doesn't support EXIF) we still store a row with all
            // EXIF fields NULL; the file remains visible to sort-by-date
            // and tag queries via its rel_path and filesystem timestamps.
            let exif_fields = if exif::supports_exif(&file_path) {
                match exif::extract_exif_from_path(&file_path) {
                    Ok(data) => Some(data),
                    Err(e) => {
                        debug!(
                            "No EXIF or parse error for {}: {:?}",
                            file_path.display(),
                            e
                        );
                        None
                    }
                }
            } else {
                None
            };
            // Canonical date_taken via the waterfall — kamadak-exif (already
            // computed above) → exiftool fallback for videos / MakerNote /
            // QuickTime → filename regex → earliest_fs_time. Source is
            // recorded so the per-tick backfill drain can re-run weak
            // resolutions later.
            let resolved_date = date_resolver::resolve_date_taken(
                &file_path,
                exif_fields.as_ref().and_then(|e| e.date_taken),
            );
            let insert_exif = InsertImageExif {
                library_id: library.id,
                file_path: relative_path.clone(),
                camera_make: exif_fields.as_ref().and_then(|e| e.camera_make.clone()),
                camera_model: exif_fields.as_ref().and_then(|e| e.camera_model.clone()),
                lens_model: exif_fields.as_ref().and_then(|e| e.lens_model.clone()),
                width: exif_fields.as_ref().and_then(|e| e.width),
                height: exif_fields.as_ref().and_then(|e| e.height),
                orientation: exif_fields.as_ref().and_then(|e| e.orientation),
                gps_latitude: exif_fields
                    .as_ref()
                    .and_then(|e| e.gps_latitude.map(|v| v as f32)),
                gps_longitude: exif_fields
                    .as_ref()
                    .and_then(|e| e.gps_longitude.map(|v| v as f32)),
                gps_altitude: exif_fields
                    .as_ref()
                    .and_then(|e| e.gps_altitude.map(|v| v as f32)),
                focal_length: exif_fields
                    .as_ref()
                    .and_then(|e| e.focal_length.map(|v| v as f32)),
                aperture: exif_fields
                    .as_ref()
                    .and_then(|e| e.aperture.map(|v| v as f32)),
                shutter_speed: exif_fields.as_ref().and_then(|e| e.shutter_speed.clone()),
                iso: exif_fields.as_ref().and_then(|e| e.iso),
                date_taken: resolved_date.map(|r| r.timestamp),
                created_time: timestamp,
                last_modified: timestamp,
                content_hash,
                size_bytes,
                phash_64: perceptual.map(|h| h.phash_64),
                dhash_64: perceptual.map(|h| h.dhash_64),
                date_taken_source: resolved_date.map(|r| r.source.as_str().to_string()),
            };
            let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
            if let Err(e) = dao.store_exif(&context, insert_exif) {
                error!(
                    "Failed to register {} in image_exif: {:?}",
                    relative_path, e
                );
            } else {
                debug!("Registered {} in image_exif", relative_path);
            }
        }
    }
    // ── Face detection pass ────────────────────────────────────────────
    // Run after EXIF writes so newly-registered files have their
    // content_hash populated. Skipped wholesale when face_client is
    // disabled (no Apollo integration configured) — Phase 3 wires this
    // up; the watcher remains usable on legacy deploys.
    if face_client.is_enabled() {
        // Opportunistic content_hash backfill: photos indexed before
        // content-hashing landed (or where the hash compute failed
        // silently on insert) end up in image_exif with NULL
        // content_hash. build_face_candidates keys on content_hash, so
        // those files would never become candidates without backfill.
        // Idempotent — subsequent scans see the populated hashes and
        // no-op. The dedicated `backfill_hashes` binary is still the
        // right tool for very large legacy libraries; this branch
        // ensures small/medium deploys self-heal without operator
        // action.
        backfill_missing_content_hashes(&context, &files, library, &exif_dao);
        let candidates = build_face_candidates(&context, library, &files, &exif_dao, &face_dao);
        debug!(
            "face_watch: scan tick — {} image file(s) walked, {} candidate(s) (library '{}', modified_since={})",
            files.iter().filter(|(p, _)| !is_video_file(p)).count(),
            candidates.len(),
            library.name,
            modified_since.is_some(),
        );
        if !candidates.is_empty() {
            face_watch::run_face_detection_pass(
                library,
                excluded_dirs,
                &face_client,
                Arc::clone(&face_dao),
                Arc::clone(&tag_dao),
                candidates,
            );
        }
    }
    // Check for videos that need HLS playlists
    let video_path_base = dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set");
    let mut videos_needing_playlists = Vec::new();
    for (file_path, _relative_path) in &files {
        if is_video_file(file_path) {
            // Construct expected playlist path
            let playlist_filename =
                format!("{}.m3u8", file_path.file_name().unwrap().to_string_lossy());
            let playlist_path = Path::new(&video_path_base).join(&playlist_filename);
            // Check if playlist needs (re)generation
            if playlist_needs_generation(file_path, &playlist_path) {
                videos_needing_playlists.push(file_path.clone());
            }
        }
    }
    // Send queue request to playlist manager
    if !videos_needing_playlists.is_empty() {
        playlist_manager.do_send(QueueVideosMessage {
            video_paths: videos_needing_playlists,
        });
    }
    // Check for videos that need preview clips
    // Collect (full_path, relative_path) for video files
    let video_files: Vec<(String, String)> = files
        .iter()
        .filter(|(file_path, _)| is_video_file(file_path))
        .map(|(file_path, rel_path)| (file_path.to_string_lossy().to_string(), rel_path.clone()))
        .collect();
    if !video_files.is_empty() {
        // Query DB using relative paths (consistent with how GET/POST handlers store them)
        let video_rel_paths: Vec<String> = video_files.iter().map(|(_, rel)| rel.clone()).collect();
        let existing_previews: HashMap<String, String> = {
            let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao");
            match dao.get_previews_batch(&context, &video_rel_paths) {
                Ok(clips) => clips
                    .into_iter()
                    .map(|clip| (clip.file_path, clip.status))
                    .collect(),
                Err(e) => {
                    error!("Error batch querying preview clips: {:?}", e);
                    HashMap::new()
                }
            }
        };
        for (full_path, relative_path) in &video_files {
            let status = existing_previews.get(relative_path).map(|s| s.as_str());
            let needs_preview = match status {
                None => true,           // No record at all
                Some("failed") => true, // Retry failed
                Some("pending") => true, // Stale pending from previous run
                _ => false,             // processing or complete
            };
            if needs_preview {
                // Insert pending record using relative path
                if status.is_none() {
                    let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao");
                    let _ = dao.insert_preview(&context, relative_path, "pending");
                }
                // Send full path in the message — the actor will derive relative path from it
                preview_generator.do_send(GeneratePreviewClipMessage {
                    video_path: full_path.clone(),
                });
            }
        }
    }
    // Generate thumbnails for all files that need them
    if new_files_found {
        info!("Processing thumbnails for new files...");
        create_thumbnails(std::slice::from_ref(library), excluded_dirs);
    }
    // Reconciliation: on a full scan, prune image_exif rows whose rel_path no
    // longer exists on disk for this library. Keeps the DB in parity so
    // downstream DB-backed listings (e.g. recursive /photos) don't return
    // phantom files. Skipped on quick scans — those only look at recently
    // modified files and can't distinguish "missing" from "unchanged".
    if modified_since.is_none() {
        let disk_paths: HashSet<String> = files.iter().map(|(_, rel)| rel.clone()).collect();
        let db_paths: Vec<String> = {
            let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
            dao.get_rel_paths_for_library(&context, library.id)
                .unwrap_or_else(|e| {
                    error!(
                        "Reconciliation: failed to load image_exif rel_paths for lib {}: {:?}",
                        library.id, e
                    );
                    Vec::new()
                })
        };
        let stale: Vec<String> = db_paths
            .into_iter()
            .filter(|p| !disk_paths.contains(p))
            .collect();
        if !stale.is_empty() {
            info!(
                "Reconciliation: pruning {} stale image_exif rows for library '{}'",
                stale.len(),
                library.name
            );
            let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
            for rel in &stale {
                if let Err(e) = dao.delete_exif_by_library(&context, library.id, rel) {
                    warn!(
                        "Reconciliation: failed to delete {} (lib {}): {:?}",
                        rel, library.id, e
                    );
                }
            }
        }
    }
}
/// Drain image_exif rows that still lack a `content_hash`, querying the
/// backlog directly instead of relying on the filesystem walk. Quick
/// scans only visit recently-modified files, so pre-existing unhashed
/// rows would never enter `process_new_files`'s candidate set — left
/// alone, the backlog would only drain on full scans (default once an
/// hour). Running this every tick keeps face detection fed.
///
/// Bounded per call by `FACE_HASH_BACKFILL_MAX_PER_TICK` (default
/// 2000) so a tick on a large legacy library doesn't block for hours
/// blake3-ing every photo at once; later ticks pick up the rest. The
/// backlog query is cross-library — rows belonging to other libraries
/// are skipped here and drained on their own library's tick. For 50k+
/// libraries the dedicated `cargo run --bin backfill_hashes` binary is
/// still faster (it doesn't fight the watcher loop for the DAO mutex).
///
/// Returns the number of rows successfully backfilled this pass.
fn backfill_unhashed_backlog(
    context: &opentelemetry::Context,
    library: &libraries::Library,
    exif_dao: &Arc<Mutex<Box<dyn ExifDao>>>,
) -> usize {
    let cap = dotenv::var("FACE_HASH_BACKFILL_MAX_PER_TICK")
        .ok()
        .and_then(|s| s.parse::<i64>().ok())
        .filter(|n| *n > 0)
        .unwrap_or(2000);
    // Ask for one extra row: a page of cap+1 tells us "more remain"
    // without issuing a separate count query.
    let pending: Vec<(i32, String)> = exif_dao
        .lock()
        .expect("Unable to lock ExifDao")
        .get_rows_missing_hash(context, cap + 1)
        .unwrap_or_default();
    if pending.is_empty() {
        return 0;
    }
    let more_remain = pending.len() as i64 > cap;
    let root = std::path::Path::new(&library.root_path);
    let mut hashed = 0usize;
    let mut failed = 0usize;
    let mut foreign = 0usize;
    for (row_lib, rel) in pending.iter().take(cap as usize) {
        if *row_lib != library.id {
            // Belongs to another library — its own tick handles it.
            foreign += 1;
            continue;
        }
        let full = root.join(rel);
        if !full.exists() {
            // File walked away — the watcher's reconciliation pass will
            // remove the orphan exif row eventually.
            continue;
        }
        match content_hash::compute(&full) {
            Err(e) => {
                debug!(
                    "face_watch: hash compute failed for {} ({:?})",
                    full.display(),
                    e
                );
                failed += 1;
            }
            Ok(id) => {
                let stored = exif_dao
                    .lock()
                    .expect("Unable to lock ExifDao")
                    .backfill_content_hash(
                        context,
                        library.id,
                        rel,
                        &id.content_hash,
                        id.size_bytes,
                    );
                match stored {
                    Ok(_) => hashed += 1,
                    Err(e) => {
                        warn!(
                            "face_watch: backfill_content_hash failed for {}: {:?}",
                            rel, e
                        );
                        failed += 1;
                    }
                }
            }
        }
    }
    if hashed > 0 || failed > 0 || more_remain {
        info!(
            "face_watch: backfill pass for library '{}': hashed {} ({} error(s), {} skipped to other libraries; {} cap, more_remain={})",
            library.name, hashed, failed, foreign, cap, more_remain
        );
    }
    hashed
}
/// Drain image_exif rows whose `date_taken` was never resolved or was
/// resolved by the weakest fallback (`fs_time`). Runs the canonical-date
/// waterfall — exiftool batch (one subprocess for the whole tick's
/// rows) → filename regex → earliest_fs_time — and persists each
/// resolution with its source tag. Capped per tick by
/// `DATE_BACKFILL_MAX_PER_TICK` (default 500) so a 14k-row library
/// drains over a few quick-scan ticks without blocking the watcher.
///
/// kamadak-exif is intentionally skipped here: the row already has a
/// NULL date_taken because the ingest path's kamadak-exif call returned
/// nothing, and re-running it would just produce the same answer.
/// exiftool is the meaningful new attempt — it handles videos and
/// MakerNote-hosted dates kamadak can't reach.
///
/// Returns the number of rows whose date was successfully backfilled.
fn backfill_missing_date_taken(
    context: &opentelemetry::Context,
    library: &libraries::Library,
    exif_dao: &Arc<Mutex<Box<dyn ExifDao>>>,
) -> usize {
    let cap: i64 = dotenv::var("DATE_BACKFILL_MAX_PER_TICK")
        .ok()
        .and_then(|s| s.parse().ok())
        .filter(|n: &i64| *n > 0)
        .unwrap_or(500);
    // Fetch cap+1 rows so "more remain" is knowable without a count query.
    let rows: Vec<(i32, String)> = {
        let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
        dao.get_rows_needing_date_backfill(context, library.id, cap + 1)
            .unwrap_or_default()
    };
    if rows.is_empty() {
        return 0;
    }
    let more_than_cap = rows.len() as i64 > cap;
    let base_path = std::path::Path::new(&library.root_path);
    // Build absolute paths and drop rows whose files no longer exist —
    // the missing-file scan in library_maintenance retires deleted rows
    // separately. Without this filter, NULL-date rows for missing files
    // would loop through the drain forever (no source can resolve them).
    // NOTE: `len()` is already usize (the old `as usize` cast was a
    // clippy::unnecessary_cast no-op), and at most `cap` rows are kept.
    let mut existing: Vec<(String, PathBuf)> = Vec::with_capacity(rows.len().min(cap as usize));
    for (_, rel_path) in rows.iter().take(cap as usize) {
        let abs = base_path.join(rel_path);
        if abs.exists() {
            existing.push((rel_path.clone(), abs));
        }
    }
    if existing.is_empty() {
        return 0;
    }
    // One exiftool subprocess for the whole batch; the resolver falls
    // through to filename / fs_time per file when exiftool can't supply
    // a date (or isn't installed at all).
    let paths: Vec<PathBuf> = existing.iter().map(|(_, p)| p.clone()).collect();
    let resolved = date_resolver::resolve_dates_batch(&paths, &HashMap::new());
    let mut backfilled = 0usize;
    let mut unresolved = 0usize;
    let mut by_source: HashMap<&'static str, usize> = HashMap::new();
    {
        let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
        for (rel_path, abs) in &existing {
            let Some(rd) = resolved.get(abs).copied() else {
                unresolved += 1;
                continue;
            };
            match dao.backfill_date_taken(
                context,
                library.id,
                rel_path,
                rd.timestamp,
                rd.source.as_str(),
            ) {
                Ok(()) => {
                    backfilled += 1;
                    *by_source.entry(rd.source.as_str()).or_insert(0) += 1;
                }
                Err(e) => {
                    warn!(
                        "date_backfill: update failed for lib {} {}: {:?}",
                        library.id, rel_path, e
                    );
                }
            }
        }
    }
    if backfilled > 0 || unresolved > 0 || more_than_cap {
        info!(
            "date_backfill: library '{}': resolved {} ({:?}), {} unresolved, cap={}, more_remain={}",
            library.name, backfilled, by_source, unresolved, cap, more_than_cap
        );
    }
    backfilled
}
/// Per-tick face-detection drain. Pulls a capped batch of hashed-but-
/// unscanned image_exif rows directly via the FaceDao anti-join and
/// hands them to the existing detection pass, so the backlog moves at
/// quick-scan cadence instead of waiting for full scans. Batch size is
/// `FACE_BACKLOG_MAX_PER_TICK` (default 64).
fn process_face_backlog(
    context: &opentelemetry::Context,
    library: &libraries::Library,
    face_client: &crate::ai::face_client::FaceClient,
    face_dao: &Arc<Mutex<Box<dyn faces::FaceDao>>>,
    tag_dao: &Arc<Mutex<Box<dyn tags::TagDao>>>,
    excluded_dirs: &[String],
) {
    let cap = dotenv::var("FACE_BACKLOG_MAX_PER_TICK")
        .ok()
        .and_then(|s| s.parse::<i64>().ok())
        .filter(|n| *n > 0)
        .unwrap_or(64);
    // Hold the DAO lock only for the query itself.
    let query_result = {
        let mut dao = face_dao.lock().expect("face dao");
        dao.list_unscanned_candidates(context, library.id, cap)
    };
    let rows: Vec<(String, String)> = match query_result {
        Ok(rows) => rows,
        Err(e) => {
            warn!(
                "face_watch: list_unscanned_candidates failed for library '{}': {:?}",
                library.name, e
            );
            return;
        }
    };
    if rows.is_empty() {
        return;
    }
    info!(
        "face_watch: backlog drain — running detection on {} candidate(s) for library '{}' (cap={})",
        rows.len(),
        library.name,
        cap
    );
    let candidates = rows
        .into_iter()
        .map(|(rel_path, content_hash)| face_watch::FaceCandidate {
            rel_path,
            content_hash,
        })
        .collect::<Vec<face_watch::FaceCandidate>>();
    face_watch::run_face_detection_pass(
        library,
        excluded_dirs,
        face_client,
        Arc::clone(face_dao),
        Arc::clone(tag_dao),
        candidates,
    );
}
/// Computes and stores `content_hash` for image_exif rows that lack one.
///
/// Walks the image (non-video) files from this tick's directory scan,
/// batch-loads their EXIF rows, hashes up to
/// `FACE_HASH_BACKFILL_MAX_PER_TICK` of the unhashed ones, and writes the
/// hash (plus size) back through the ExifDao. Progress is logged whenever
/// a backlog exists so a slow face scan is visibly "hashing", not stalled.
fn backfill_missing_content_hashes(
    context: &opentelemetry::Context,
    files: &[(PathBuf, String)],
    library: &libraries::Library,
    exif_dao: &Arc<Mutex<Box<dyn ExifDao>>>,
) {
    // Videos are never hashed here — consider image rel_paths only.
    let rel_paths: Vec<String> = files
        .iter()
        .filter(|(abs, _)| !is_video_file(abs))
        .map(|(_, rel)| rel.clone())
        .collect();
    if rel_paths.is_empty() {
        return;
    }
    let records = {
        let mut guard = exif_dao.lock().expect("Unable to lock ExifDao");
        guard
            .get_exif_batch(context, Some(library.id), &rel_paths)
            .unwrap_or_default()
    };
    // rel_path → absolute path so content_hash::compute can open the file.
    let abs_by_rel: HashMap<String, &PathBuf> =
        files.iter().map(|(abs, rel)| (rel.clone(), abs)).collect();
    // Per-tick success budget: env-tunable, must be positive, defaults to 2000.
    let budget: usize = dotenv::var("FACE_HASH_BACKFILL_MAX_PER_TICK")
        .ok()
        .and_then(|raw| raw.parse().ok())
        .filter(|n: &usize| *n > 0)
        .unwrap_or(2000);
    // Pre-count the unhashed rows so the log line can report "still needs
    // backfill: N" — without it a face scan stuck at 44% looks stalled
    // when it is actually chipping through hashes.
    let pending_total = records.iter().filter(|r| r.content_hash.is_none()).count();
    let mut hashed = 0usize;
    let mut failed = 0usize;
    for row in records.iter().filter(|r| r.content_hash.is_none()) {
        // The budget counts successes only. Counting failures too let a
        // pocket of chronically-unhashable files (vanished mid-scan,
        // permission denied, …) at the front of the table burn the whole
        // budget each tick so the rest of the backlog never advanced.
        // Failures remain bounded by `pending_total`: every unhashed row
        // is visited at most once per tick.
        if hashed >= budget {
            break;
        }
        let Some(abs_path) = abs_by_rel.get(&row.file_path) else {
            // File vanished between the directory walk and now; the next
            // tick retries it naturally.
            continue;
        };
        match content_hash::compute(abs_path) {
            Ok(id) => {
                // Re-acquire the lock per write so hashing (the slow part)
                // happens outside it.
                let write = {
                    let mut guard = exif_dao.lock().expect("Unable to lock ExifDao");
                    guard.backfill_content_hash(
                        context,
                        library.id,
                        &row.file_path,
                        &id.content_hash,
                        id.size_bytes,
                    )
                };
                if let Err(e) = write {
                    warn!(
                        "face_watch: backfill_content_hash failed for {}: {:?}",
                        row.file_path, e
                    );
                    failed += 1;
                } else {
                    hashed += 1;
                }
            }
            Err(e) => {
                debug!(
                    "face_watch: hash compute failed for {} ({:?})",
                    abs_path.display(),
                    e
                );
                failed += 1;
            }
        }
    }
    // Log whenever any backlog exists (or anything happened) so operators
    // can see backfill progress; stay quiet only when there is no work.
    if pending_total > 0 || hashed > 0 || failed > 0 {
        let remaining = pending_total.saturating_sub(hashed);
        info!(
            "face_watch: backfilled {}/{} content_hash for library '{}' ({} error(s); {} still need backfill; cap={})",
            hashed, pending_total, library.name, failed, remaining, budget
        );
    }
}
/// Assembles the face-detection candidate list for one scan tick.
///
/// Returns a `(rel_path, content_hash)` pair for every image file that
/// has a content_hash recorded in image_exif but no face_detections row
/// yet. image_exif is re-queried here so rows just written by the EXIF
/// loop are seen alongside pre-existing rows the watcher walked over —
/// covering both fresh uploads and the initial backlog scan.
fn build_face_candidates(
    context: &opentelemetry::Context,
    library: &libraries::Library,
    files: &[(PathBuf, String)],
    exif_dao: &Arc<Mutex<Box<dyn ExifDao>>>,
    face_dao: &Arc<Mutex<Box<dyn faces::FaceDao>>>,
) -> Vec<face_watch::FaceCandidate> {
    // Videos aren't face-scanned in v1 (kamadak doesn't even register
    // them in image_exif), so keep image files only.
    let image_paths: Vec<String> = files
        .iter()
        .filter(|(abs, _)| !is_video_file(abs))
        .map(|(_, rel)| rel.clone())
        .collect();
    if image_paths.is_empty() {
        return Vec::new();
    }
    let records = {
        let mut guard = exif_dao.lock().expect("Unable to lock ExifDao");
        guard
            .get_exif_batch(context, Some(library.id), &image_paths)
            .unwrap_or_default()
    };
    // rel_path → content_hash, dropping rows without a hash — with no
    // hash there is nothing to key face data against.
    let hash_by_path: HashMap<String, String> = records
        .into_iter()
        .filter_map(|record| match record.content_hash {
            Some(hash) => Some((record.file_path, hash)),
            None => None,
        })
        .collect();
    let mut dao = face_dao.lock().expect("face dao");
    let mut candidates = Vec::new();
    for rel_path in image_paths {
        let hash = match hash_by_path.get(&rel_path) {
            Some(h) => h,
            None => continue,
        };
        match dao.already_scanned(context, hash) {
            Ok(true) => {}
            Ok(false) => candidates.push(face_watch::FaceCandidate {
                rel_path,
                content_hash: hash.clone(),
            }),
            Err(e) => {
                warn!("face_watch: already_scanned errored for {}: {:?}", hash, e);
            }
        }
    }
    candidates
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::data::Claims;
    use crate::database::PreviewDao;
    use crate::testhelpers::TestPreviewDao;
    use actix_web::web::Data;
    /// Encodes a JWT for user id "1" with the default header, signed with
    /// the literal secret `test_key` — presumably the secret the test
    /// AppState validates against; confirm in AppState::test_state().
    fn make_token() -> String {
        let claims = Claims::valid_user("1".to_string());
        jsonwebtoken::encode(
            &jsonwebtoken::Header::default(),
            &claims,
            &jsonwebtoken::EncodingKey::from_secret(b"test_key"),
        )
        .unwrap()
    }
    /// Wraps a `TestPreviewDao` in the `Data<Mutex<Box<dyn PreviewDao>>>`
    /// app-data shape the preview handlers extract.
    fn make_preview_dao(dao: TestPreviewDao) -> Data<Mutex<Box<dyn PreviewDao>>> {
        Data::new(Mutex::new(Box::new(dao) as Box<dyn PreviewDao>))
    }
    /// A path with no preview record yet is reported as "pending", and the
    /// handler persists a pending row for it as a side effect.
    #[actix_rt::test]
    async fn test_get_preview_status_returns_pending_for_unknown() {
        let dao = TestPreviewDao::new();
        let preview_dao = make_preview_dao(dao);
        let app_state = Data::new(AppState::test_state());
        let token = make_token();
        let app = actix_web::test::init_service(
            App::new()
                .service(get_preview_status)
                .app_data(app_state)
                .app_data(preview_dao.clone()),
        )
        .await;
        let req = actix_web::test::TestRequest::post()
            .uri("/video/preview/status")
            .insert_header(("Authorization", format!("Bearer {}", token)))
            .set_json(serde_json::json!({"paths": ["photos/new_video.mp4"]}))
            .to_request();
        let resp = actix_web::test::call_service(&app, req).await;
        assert_eq!(resp.status(), 200);
        let body: serde_json::Value = actix_web::test::read_body_json(resp).await;
        let previews = body["previews"].as_array().unwrap();
        assert_eq!(previews.len(), 1);
        assert_eq!(previews[0]["status"], "pending");
        // Verify the DAO now has a pending record
        let mut dao_lock = preview_dao.lock().unwrap();
        let ctx = opentelemetry::Context::new();
        let clip = dao_lock.get_preview(&ctx, "photos/new_video.mp4").unwrap();
        assert!(clip.is_some());
        assert_eq!(clip.unwrap().status, "pending");
    }
    /// A record already marked "complete" comes back with that status and
    /// a preview_url containing the URL-encoded path.
    #[actix_rt::test]
    async fn test_get_preview_status_returns_complete_with_url() {
        let mut dao = TestPreviewDao::new();
        let ctx = opentelemetry::Context::new();
        // Seed a record and promote it to complete before the app starts.
        dao.insert_preview(&ctx, "photos/done.mp4", "pending")
            .unwrap();
        dao.update_status(
            &ctx,
            "photos/done.mp4",
            "complete",
            Some(9.5),
            Some(500000),
            None,
        )
        .unwrap();
        let preview_dao = make_preview_dao(dao);
        let app_state = Data::new(AppState::test_state());
        let token = make_token();
        let app = actix_web::test::init_service(
            App::new()
                .service(get_preview_status)
                .app_data(app_state)
                .app_data(preview_dao),
        )
        .await;
        let req = actix_web::test::TestRequest::post()
            .uri("/video/preview/status")
            .insert_header(("Authorization", format!("Bearer {}", token)))
            .set_json(serde_json::json!({"paths": ["photos/done.mp4"]}))
            .to_request();
        let resp = actix_web::test::call_service(&app, req).await;
        assert_eq!(resp.status(), 200);
        let body: serde_json::Value = actix_web::test::read_body_json(resp).await;
        let previews = body["previews"].as_array().unwrap();
        assert_eq!(previews.len(), 1);
        assert_eq!(previews[0]["status"], "complete");
        // "photos/done.mp4" percent-encoded — the slash becomes %2F.
        assert!(
            previews[0]["preview_url"]
                .as_str()
                .unwrap()
                .contains("photos%2Fdone.mp4")
        );
    }
    /// Requests with more than 200 paths are rejected with 400 (the
    /// handler's batch-size guard).
    #[actix_rt::test]
    async fn test_get_preview_status_rejects_over_200_paths() {
        let dao = TestPreviewDao::new();
        let preview_dao = make_preview_dao(dao);
        let app_state = Data::new(AppState::test_state());
        let token = make_token();
        let app = actix_web::test::init_service(
            App::new()
                .service(get_preview_status)
                .app_data(app_state)
                .app_data(preview_dao),
        )
        .await;
        // 201 paths — one over the limit.
        let paths: Vec<String> = (0..201).map(|i| format!("video_{}.mp4", i)).collect();
        let req = actix_web::test::TestRequest::post()
            .uri("/video/preview/status")
            .insert_header(("Authorization", format!("Bearer {}", token)))
            .set_json(serde_json::json!({"paths": paths}))
            .to_request();
        let resp = actix_web::test::call_service(&app, req).await;
        assert_eq!(resp.status(), 400);
    }
    /// One request mixing pending, complete, and never-seen paths returns
    /// each with its own status, preserving request order.
    #[actix_rt::test]
    async fn test_get_preview_status_mixed_statuses() {
        let mut dao = TestPreviewDao::new();
        let ctx = opentelemetry::Context::new();
        dao.insert_preview(&ctx, "a.mp4", "pending").unwrap();
        dao.insert_preview(&ctx, "b.mp4", "pending").unwrap();
        dao.update_status(&ctx, "b.mp4", "complete", Some(10.0), Some(100000), None)
            .unwrap();
        let preview_dao = make_preview_dao(dao);
        let app_state = Data::new(AppState::test_state());
        let token = make_token();
        let app = actix_web::test::init_service(
            App::new()
                .service(get_preview_status)
                .app_data(app_state)
                .app_data(preview_dao),
        )
        .await;
        let req = actix_web::test::TestRequest::post()
            .uri("/video/preview/status")
            .insert_header(("Authorization", format!("Bearer {}", token)))
            .set_json(serde_json::json!({"paths": ["a.mp4", "b.mp4", "c.mp4"]}))
            .to_request();
        let resp = actix_web::test::call_service(&app, req).await;
        assert_eq!(resp.status(), 200);
        let body: serde_json::Value = actix_web::test::read_body_json(resp).await;
        let previews = body["previews"].as_array().unwrap();
        assert_eq!(previews.len(), 3);
        // a.mp4 is pending
        assert_eq!(previews[0]["path"], "a.mp4");
        assert_eq!(previews[0]["status"], "pending");
        // b.mp4 is complete with URL
        assert_eq!(previews[1]["path"], "b.mp4");
        assert_eq!(previews[1]["status"], "complete");
        assert!(previews[1]["preview_url"].is_string());
        // c.mp4 was not found — handler inserts pending
        assert_eq!(previews[2]["path"], "c.mp4");
        assert_eq!(previews[2]["status"], "pending");
    }
    /// Verifies that the status endpoint re-queues generation for stale
    /// "pending" and "failed" records (e.g., after a server restart or
    /// when clip files were deleted). The do_send to the actor exercises
    /// the re-queue code path; the actor runs against temp dirs so it
    /// won't panic.
    #[actix_rt::test]
    async fn test_get_preview_status_requeues_pending_and_failed() {
        let mut dao = TestPreviewDao::new();
        let ctx = opentelemetry::Context::new();
        // Simulate stale records left from a previous server run
        dao.insert_preview(&ctx, "stale/pending.mp4", "pending")
            .unwrap();
        dao.insert_preview(&ctx, "stale/failed.mp4", "pending")
            .unwrap();
        dao.update_status(
            &ctx,
            "stale/failed.mp4",
            "failed",
            None,
            None,
            Some("ffmpeg error"),
        )
        .unwrap();
        let preview_dao = make_preview_dao(dao);
        let app_state = Data::new(AppState::test_state());
        let token = make_token();
        let app = actix_web::test::init_service(
            App::new()
                .service(get_preview_status)
                .app_data(app_state)
                .app_data(preview_dao),
        )
        .await;
        let req = actix_web::test::TestRequest::post()
            .uri("/video/preview/status")
            .insert_header(("Authorization", format!("Bearer {}", token)))
            .set_json(serde_json::json!({
                "paths": ["stale/pending.mp4", "stale/failed.mp4"]
            }))
            .to_request();
        let resp = actix_web::test::call_service(&app, req).await;
        assert_eq!(resp.status(), 200);
        let body: serde_json::Value = actix_web::test::read_body_json(resp).await;
        let previews = body["previews"].as_array().unwrap();
        assert_eq!(previews.len(), 2);
        // Both records are returned with their current status
        assert_eq!(previews[0]["path"], "stale/pending.mp4");
        assert_eq!(previews[0]["status"], "pending");
        assert!(previews[0].get("preview_url").is_none());
        assert_eq!(previews[1]["path"], "stale/failed.mp4");
        assert_eq!(previews[1]["status"], "failed");
        assert!(previews[1].get("preview_url").is_none());
    }
}