Files
ImageApi/src/database/mod.rs
Cameron Cordes 263e27e108 multi-library: handoff + orphan GC with two-tick consensus
Branch C of the multi-library data-model rollout. Implements the
operational maintenance pipeline pinned in CLAUDE.md → "Multi-library
data model" / "Library availability and safety". Branches A and B
land first; this branch builds on top.

New module: src/library_maintenance.rs

Three idempotent passes the watcher runs every tick after the
per-library ingest loop:

1. Missing-file scan (per online library)

   For each Online library, load one page of image_exif rows
   (IMAGE_EXIF_MISSING_SCAN_PAGE_SIZE, default 500), stat() each one,
   and delete rows whose source file is NotFound. Rows that hit
   permission or other IO errors are skipped, never deleted. Capped at
   IMAGE_EXIF_MISSING_DELETE_CAP_PER_TICK (default 200) per library
   per tick — so a pathological mount that returns NotFound for
   everything can't wipe the table in one cycle. Cursor advances
   across ticks, wraps on partial-page returns, and naturally cycles
   through the entire library over many minutes. Skipped wholesale
   for Stale libraries via the existing probe gate.
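
The per-page logic above can be sketched as pure Rust (names like
MissingScanCursor and the stat closure are illustrative stand-ins, not
the actual library_maintenance API):

```rust
/// Sketch of one library's missing-file pass. `stat` stands in for a
/// real std::fs::metadata call: Ok(true) = file exists, Ok(false) =
/// NotFound, Err(_) = permission/IO error (skipped, never deleted).
const PAGE_SIZE: i64 = 500; // IMAGE_EXIF_MISSING_SCAN_PAGE_SIZE
const DELETE_CAP: usize = 200; // IMAGE_EXIF_MISSING_DELETE_CAP_PER_TICK

struct MissingScanCursor {
    offset: i64,
}

/// Returns the rel_paths to delete this tick and advances the cursor.
fn scan_page(
    cursor: &mut MissingScanCursor,
    page: &[(i32, String)], // (id, rel_path) rows from the paginated DAO call
    stat: impl Fn(&str) -> Result<bool, ()>, // hypothetical filesystem probe
) -> Vec<String> {
    let mut deletes = Vec::new();
    for (_id, rel_path) in page {
        if deletes.len() >= DELETE_CAP {
            break; // cap reached: a pathological mount can't wipe the table
        }
        match stat(rel_path) {
            Ok(false) => deletes.push(rel_path.clone()), // NotFound: delete
            Ok(true) | Err(_) => {}                      // present or IO error: keep
        }
    }
    // A partial page means we walked off the end of the table: wrap.
    if (page.len() as i64) < PAGE_SIZE {
        cursor.offset = 0;
    } else {
        cursor.offset += PAGE_SIZE;
    }
    deletes
}
```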

2. Back-ref refresh (DB-only)

   For face_detections / tagged_photo / photo_insights: any
   hash-keyed row whose (library_id, rel_path) no longer matches an
   image_exif row, but whose content_hash does, is repointed at a
   surviving image_exif location. Pure SQL with EXISTS guards so
   rows whose hash is fully orphaned are left alone (the orphan GC
   handles those). Idempotent; no availability gate needed.

   This is what makes a recent → archive move invisible to readers:
   when pass 1 retires the lib-A row, pass 2 pivots tags / faces /
   insights to lib-B's surviving path before any client notices.
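
The repoint decision for a single derived row can be modeled like this
(a sketch of the decision only; the real pass is a single SQL UPDATE
with EXISTS guards, and ExifRow here is a hypothetical stand-in for an
image_exif row):

```rust
struct ExifRow {
    library_id: i32,
    rel_path: String,
    content_hash: String,
}

/// Decide where a hash-keyed derived row should point. Mirrors the two
/// guards: repoint only when the current (library_id, rel_path) is gone
/// AND the same content_hash survives at another location. A fully
/// orphaned hash returns None — left alone for the orphan GC.
fn repoint_target<'a>(
    current_lib: i32,
    current_path: &str,
    hash: &str,
    image_exif: &'a [ExifRow],
) -> Option<&'a ExifRow> {
    let current_alive = image_exif
        .iter()
        .any(|r| r.library_id == current_lib && r.rel_path == current_path);
    if current_alive {
        return None; // NOT EXISTS guard: row still valid, leave it
    }
    // EXISTS guard: the hash must survive somewhere else.
    image_exif.iter().find(|r| r.content_hash == hash)
}
```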

3. Orphan GC (destructive)

   Hash-keyed derived rows whose content_hash has no image_exif
   referent are GC-eligible. Two-tick consensus: a hash must be
   observed orphaned on two consecutive ticks AND every library must
   be Online for both. A single Stale tick within the window cancels
   every pending delete: nothing is promoted that tick, and candidates
   must be re-observed on fresh consecutive ticks. The pending set lives in
   OrphanGcState (in-memory); a watcher restart resets it, which can
   only delay a delete, never cause one. Hashes that re-appear in
   image_exif between ticks are "revived" from the pending set
   (handles transient share unmount / remount).
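
The consensus can be sketched as a small state machine (field and
method names are illustrative; the real OrphanGcState lives in
library_maintenance.rs):

```rust
use std::collections::HashSet;

/// In-memory pending set; a watcher restart drops it, which can only
/// delay a delete, never cause one.
#[derive(Default)]
struct OrphanGcState {
    pending: HashSet<String>, // hashes seen orphaned on the previous tick
}

impl OrphanGcState {
    /// One GC tick. `orphaned` = hashes with no image_exif referent right
    /// now; `all_online` = every library probed Online this tick. Returns
    /// the hashes whose derived rows may be deleted.
    fn tick(&mut self, orphaned: &HashSet<String>, all_online: bool) -> Vec<String> {
        if !all_online {
            // Stale tick: promote nothing; consensus restarts from scratch.
            self.pending.clear();
            return Vec::new();
        }
        // Promote = orphaned on the previous Online tick AND this one.
        // A hash that re-appeared in image_exif drops out here ("revived").
        let promote: Vec<String> =
            self.pending.intersection(orphaned).cloned().collect();
        self.pending = orphaned.clone();
        promote
    }
}
```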

Two new ExifDao methods:
  - list_rel_paths_for_library_page(library_id, limit, offset) for
    the paginated missing-file scan.
  - (count_for_library landed in Branch A.)

Watcher wiring (main.rs)

Per-library: missing-file scan inside the existing per-library
loop, after process_new_files, gated by the same probe check that
already protects ingest. After the loop: reconcile (Branch B),
back-ref refresh, then run_orphan_gc. The maintenance connection is
opened once per tick (image_api::database::connect), used by all
three DB-only passes, and dropped at end of tick.
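
The per-tick ordering might look roughly like this (the closures are
descriptive stand-ins, not the real signatures in main.rs):

```rust
/// Sketch of one watcher tick. The real code threads a diesel
/// connection and per-library probe results through each pass.
fn run_tick(
    libraries: &[(i32, bool)],        // (library_id, probe_online)
    mut per_library: impl FnMut(i32), // ingest + missing-file scan
    mut db_only: impl FnMut(&str),    // the after-loop DB-only passes
) {
    for &(lib_id, online) in libraries {
        if !online {
            continue; // Stale: same probe gate that already protects ingest
        }
        per_library(lib_id); // process_new_files, then the missing-file scan
    }
    // One maintenance connection for the tick, shared by the DB-only
    // passes, dropped when this function returns.
    for pass in ["reconcile", "backref_refresh", "orphan_gc"] {
        db_only(pass);
    }
}
```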

CLAUDE.md gains a "Maintenance pipeline" subsection that describes
the three passes and their interaction with the existing
availability-and-safety policy.

Tests: 225 pass (217 from Branch B + 8 new in library_maintenance
covering back-ref refresh including the fully-orphaned no-op case,
two-tick GC consensus, Stale-tick consensus reset, image_exif
re-appearance revival, multi-table delete, and the
all_libraries_online helper).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 16:27:53 +00:00

use bcrypt::{DEFAULT_COST, hash, verify};
use diesel::prelude::*;
use diesel::sqlite::SqliteConnection;
use std::ops::DerefMut;
use std::sync::{Arc, Mutex};
use crate::database::models::{
Favorite, ImageExif, InsertFavorite, InsertImageExif, InsertUser, User,
};
use crate::otel::trace_db_call;
pub mod calendar_dao;
pub mod daily_summary_dao;
pub mod insights_dao;
pub mod knowledge_dao;
pub mod location_dao;
pub mod models;
pub mod preview_dao;
pub mod reconcile;
pub mod schema;
pub mod search_dao;
pub use calendar_dao::{CalendarEventDao, SqliteCalendarEventDao};
pub use daily_summary_dao::{DailySummaryDao, InsertDailySummary, SqliteDailySummaryDao};
pub use insights_dao::{InsightDao, SqliteInsightDao};
pub use knowledge_dao::{
EntityFilter, EntityPatch, FactFilter, FactPatch, KnowledgeDao, RecentActivity,
SqliteKnowledgeDao,
};
pub use location_dao::{LocationHistoryDao, SqliteLocationHistoryDao};
pub use preview_dao::{PreviewDao, SqlitePreviewDao};
pub use search_dao::{SearchHistoryDao, SqliteSearchHistoryDao};
pub trait UserDao {
fn create_user(&mut self, user: &str, password: &str) -> Option<User>;
fn get_user(&mut self, user: &str, password: &str) -> Option<User>;
fn user_exists(&mut self, user: &str) -> bool;
}
pub struct SqliteUserDao {
connection: SqliteConnection,
}
impl Default for SqliteUserDao {
fn default() -> Self {
Self::new()
}
}
impl SqliteUserDao {
pub fn new() -> Self {
Self {
connection: connect(),
}
}
}
#[cfg(test)]
pub mod test {
use diesel::{Connection, SqliteConnection};
use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations};
const DB_MIGRATIONS: EmbeddedMigrations = embed_migrations!();
pub fn in_memory_db_connection() -> SqliteConnection {
let mut connection = SqliteConnection::establish(":memory:")
.expect("Unable to create in-memory db connection");
connection
.run_pending_migrations(DB_MIGRATIONS)
.expect("Failure running DB migrations");
connection
}
}
impl UserDao for SqliteUserDao {
// TODO: Should probably use Result here
fn create_user(&mut self, user: &str, pass: &str) -> Option<User> {
use schema::users::dsl::*;
let hashed = hash(pass, DEFAULT_COST);
if let Ok(hash) = hashed {
diesel::insert_into(users)
.values(InsertUser {
username: user,
password: &hash,
})
.execute(&mut self.connection)
.unwrap();
users
.filter(username.eq(user))
.load::<User>(&mut self.connection)
.unwrap()
.first()
.cloned()
} else {
None
}
}
fn get_user(&mut self, user: &str, pass: &str) -> Option<User> {
use schema::users::dsl::*;
match users
.filter(username.eq(user))
.load::<User>(&mut self.connection)
.unwrap_or_default()
.first()
{
Some(u) if verify(pass, &u.password).unwrap_or(false) => Some(u.clone()),
_ => None,
}
}
fn user_exists(&mut self, user: &str) -> bool {
use schema::users::dsl::*;
!users
.filter(username.eq(user))
.load::<User>(&mut self.connection)
.unwrap_or_default()
.is_empty()
}
}
pub fn connect() -> SqliteConnection {
let db_url = dotenv::var("DATABASE_URL").expect("DATABASE_URL must be set");
let mut conn = SqliteConnection::establish(&db_url).expect("Error connecting to DB");
// Each DAO opens its own connection (13+ across the app) and they all
// share one DB file. Without WAL, a writer holds an exclusive lock
// that blocks readers — `load_persons` racing the face-watch write
// storm errors instantly with `database is locked`. WAL lets readers
// and one writer coexist; busy_timeout makes any remaining
// writer-vs-writer contention wait instead of failing fast.
// synchronous=NORMAL is the standard WAL pairing (FULL is for
// rollback-journal durability; we accept the narrow last-fsync
// window for the 210× write throughput).
use diesel::connection::SimpleConnection;
// foreign_keys = ON is per-connection in SQLite (off by default), so
// it has to be set here alongside the other pragmas. Without it
// every `REFERENCES … ON DELETE CASCADE / SET NULL` clause in the
// schema is documentation-only — orphan rows would survive the
// referenced row's deletion. With it, the cascade fires
// automatically and code that previously did manual two-step
// cleanup (delete child rows, then parent) becomes redundant but
// still correct.
conn.batch_execute(
"PRAGMA journal_mode = WAL; \
PRAGMA busy_timeout = 5000; \
PRAGMA synchronous = NORMAL; \
PRAGMA foreign_keys = ON;",
)
.expect("set sqlite pragmas");
conn
}
#[derive(Debug)]
pub struct DbError {
pub kind: DbErrorKind,
}
impl DbError {
fn new(kind: DbErrorKind) -> Self {
DbError { kind }
}
fn exists() -> Self {
DbError::new(DbErrorKind::AlreadyExists)
}
}
#[derive(Debug, PartialEq)]
pub enum DbErrorKind {
AlreadyExists,
InsertError,
QueryError,
UpdateError,
}
pub trait FavoriteDao: Sync + Send {
fn add_favorite(&mut self, user_id: i32, favorite_path: &str) -> Result<usize, DbError>;
fn remove_favorite(&mut self, user_id: i32, favorite_path: String);
fn get_favorites(&mut self, user_id: i32) -> Result<Vec<Favorite>, DbError>;
#[allow(dead_code)]
fn update_path(&mut self, old_path: &str, new_path: &str) -> Result<(), DbError>;
#[allow(dead_code)]
fn get_all_paths(&mut self) -> Result<Vec<String>, DbError>;
}
pub struct SqliteFavoriteDao {
connection: Arc<Mutex<SqliteConnection>>,
}
impl Default for SqliteFavoriteDao {
fn default() -> Self {
Self::new()
}
}
impl SqliteFavoriteDao {
pub fn new() -> Self {
SqliteFavoriteDao {
connection: Arc::new(Mutex::new(connect())),
}
}
}
impl FavoriteDao for SqliteFavoriteDao {
fn add_favorite(&mut self, user_id: i32, favorite_path: &str) -> Result<usize, DbError> {
use schema::favorites::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get FavoriteDao");
if favorites
.filter(userid.eq(user_id).and(rel_path.eq(&favorite_path)))
.first::<Favorite>(connection.deref_mut())
.is_err()
{
diesel::insert_into(favorites)
.values(InsertFavorite {
userid: &user_id,
path: favorite_path,
})
.execute(connection.deref_mut())
.map_err(|_| DbError::new(DbErrorKind::InsertError))
} else {
Err(DbError::exists())
}
}
fn remove_favorite(&mut self, user_id: i32, favorite_path: String) {
use schema::favorites::dsl::*;
diesel::delete(favorites)
.filter(userid.eq(user_id).and(rel_path.eq(favorite_path)))
.execute(self.connection.lock().unwrap().deref_mut())
.unwrap();
}
fn get_favorites(&mut self, user_id: i32) -> Result<Vec<Favorite>, DbError> {
use schema::favorites::dsl::*;
favorites
.filter(userid.eq(user_id))
.load::<Favorite>(self.connection.lock().unwrap().deref_mut())
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn update_path(&mut self, old_path: &str, new_path: &str) -> Result<(), DbError> {
use schema::favorites::dsl::*;
diesel::update(favorites.filter(rel_path.eq(old_path)))
.set(rel_path.eq(new_path))
.execute(self.connection.lock().unwrap().deref_mut())
.map_err(|_| DbError::new(DbErrorKind::UpdateError))?;
Ok(())
}
fn get_all_paths(&mut self) -> Result<Vec<String>, DbError> {
use schema::favorites::dsl::*;
favorites
.select(rel_path)
.distinct()
.load(self.connection.lock().unwrap().deref_mut())
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
}
#[allow(dead_code)]
pub trait ExifDao: Sync + Send {
fn store_exif(
&mut self,
context: &opentelemetry::Context,
exif_data: InsertImageExif,
) -> Result<ImageExif, DbError>;
fn get_exif(
&mut self,
context: &opentelemetry::Context,
file_path: &str,
) -> Result<Option<ImageExif>, DbError>;
fn update_exif(
&mut self,
context: &opentelemetry::Context,
exif_data: InsertImageExif,
) -> Result<ImageExif, DbError>;
fn delete_exif(
&mut self,
context: &opentelemetry::Context,
file_path: &str,
) -> Result<(), DbError>;
fn get_all_with_date_taken(
&mut self,
context: &opentelemetry::Context,
library_id: Option<i32>,
) -> Result<Vec<(String, i64)>, DbError>;
/// Batch load EXIF data for multiple file paths (single query). When
/// `library_id = Some(id)` the lookup is keyed on `(library_id,
/// rel_path)`; cross-library duplicates with the same rel_path are
/// excluded. `None` keeps the legacy rel-path-only behavior — used by
/// the union-mode `/photos` listing, which already disambiguates by
/// `(file_path, library_id)` in the caller.
fn get_exif_batch(
&mut self,
context: &opentelemetry::Context,
library_id: Option<i32>,
file_paths: &[String],
) -> Result<Vec<ImageExif>, DbError>;
/// Query files by EXIF criteria with optional filters. `library_id =
/// Some(id)` restricts to that library; `None` spans every library
/// (used by the unscoped `/photos` form). The composite
/// `(library_id, date_taken)` index added in the multi_library
/// migration depends on `library_id` being part of the WHERE clause —
/// callers that have a library context must pass it.
fn query_by_exif(
&mut self,
context: &opentelemetry::Context,
library_id: Option<i32>,
camera_make: Option<&str>,
camera_model: Option<&str>,
lens_model: Option<&str>,
gps_bounds: Option<(f64, f64, f64, f64)>, // (min_lat, max_lat, min_lon, max_lon)
date_from: Option<i64>,
date_to: Option<i64>,
) -> Result<Vec<ImageExif>, DbError>;
/// Get distinct camera makes with counts
fn get_camera_makes(
&mut self,
context: &opentelemetry::Context,
) -> Result<Vec<(String, i64)>, DbError>;
/// Update file path in EXIF database
fn update_file_path(
&mut self,
context: &opentelemetry::Context,
old_path: &str,
new_path: &str,
) -> Result<(), DbError>;
/// Get all file paths from EXIF database
fn get_all_file_paths(
&mut self,
context: &opentelemetry::Context,
) -> Result<Vec<String>, DbError>;
/// Get all photos with GPS coordinates
/// Returns Vec<(file_path, latitude, longitude, date_taken)>
#[allow(clippy::type_complexity)]
fn get_all_with_gps(
&mut self,
context: &opentelemetry::Context,
base_path: &str,
recursive: bool,
) -> Result<Vec<(String, f64, f64, Option<i64>)>, DbError>;
/// Return rows that still lack a `content_hash`, oldest first. Used by
/// the `backfill_hashes` binary to batch through the historical
/// backlog. Returns `(library_id, rel_path)` tuples so the caller can
/// resolve each file on disk.
fn get_rows_missing_hash(
&mut self,
context: &opentelemetry::Context,
limit: i64,
) -> Result<Vec<(i32, String)>, DbError>;
/// Persist the computed blake3 hash + file size for an existing row.
fn backfill_content_hash(
&mut self,
context: &opentelemetry::Context,
library_id: i32,
rel_path: &str,
hash: &str,
size_bytes: i64,
) -> Result<(), DbError>;
/// Return the first EXIF row with the given content hash (any library).
/// Used by thumbnail/HLS generation to detect pre-existing derivatives
/// from another library before regenerating.
fn find_by_content_hash(
&mut self,
context: &opentelemetry::Context,
hash: &str,
) -> Result<Option<ImageExif>, DbError>;
/// Given a file instance `(library_id, rel_path)`, return every distinct
/// rel_path in `image_exif` whose `content_hash` matches this file's.
/// Used by tag and insight read-paths so annotations follow content
/// rather than path, even when the same file is indexed under
/// different library roots. Falls back to `[rel_path]` when the file
/// hasn't been hashed yet.
fn get_rel_paths_sharing_content(
&mut self,
context: &opentelemetry::Context,
library_id: i32,
rel_path: &str,
) -> Result<Vec<String>, DbError>;
/// All rel_paths known to live in a given library. Used by search to
/// scope tag-based (path-keyed) hits to a single library after joining
/// through the library-agnostic tag tables.
fn get_rel_paths_for_library(
&mut self,
context: &opentelemetry::Context,
library_id: i32,
) -> Result<Vec<String>, DbError>;
/// Look up a content_hash for a rel_path in *any* library. Useful when
/// the caller has a library-agnostic rel_path (e.g. from tagged_photo)
/// and wants to find content-equivalent siblings without knowing the
/// file's original library.
fn find_content_hash_anywhere(
&mut self,
context: &opentelemetry::Context,
rel_path: &str,
) -> Result<Option<String>, DbError>;
/// Given a content_hash, return all rel_paths carrying that hash.
fn get_rel_paths_by_hash(
&mut self,
context: &opentelemetry::Context,
hash: &str,
) -> Result<Vec<String>, DbError>;
/// Batch version of [`get_rel_paths_by_hash`]. Returns a
/// `hash → Vec<rel_path>` map for every hash that has at least one
/// rel_path. Used by the batch tag lookup endpoint to expand
/// content-hash siblings without firing a query per hash.
fn get_rel_paths_for_hashes(
&mut self,
context: &opentelemetry::Context,
hashes: &[String],
) -> Result<std::collections::HashMap<String, Vec<String>>, DbError>;
/// List `(library_id, rel_path)` pairs for the given libraries, optionally
/// restricted to rows whose rel_path starts with `path_prefix`. When
/// `library_ids` is empty, rows from every library are returned. Used by
/// `/photos` recursive listing to skip the filesystem walk — the watcher
/// keeps image_exif in parity with disk via the reconciliation pass.
fn list_rel_paths_for_libraries(
&mut self,
context: &opentelemetry::Context,
library_ids: &[i32],
path_prefix: Option<&str>,
) -> Result<Vec<(i32, String)>, DbError>;
/// Delete a single image_exif row scoped to `(library_id, rel_path)`.
/// Distinct from `delete_exif`, which matches on rel_path alone and
/// would clobber same-named files across libraries.
fn delete_exif_by_library(
&mut self,
context: &opentelemetry::Context,
library_id: i32,
rel_path: &str,
) -> Result<(), DbError>;
/// Number of image_exif rows for a library. Used by the availability
/// probe to decide whether an empty mount is "fresh" (zero rows: fine)
/// or "the share went offline" (non-zero rows: stale). Zero on query
/// error so a transient DB hiccup doesn't itself cause a Stale flip.
fn count_for_library(
&mut self,
context: &opentelemetry::Context,
library_id: i32,
) -> Result<i64, DbError>;
/// Paginated rel_path listing for a single library, ordered by id
/// ascending. Used by the missing-file detector to scan a library
/// in capped chunks across consecutive watcher ticks rather than
/// stat()ing every row every minute. Returns `(id, rel_path)`.
fn list_rel_paths_for_library_page(
&mut self,
context: &opentelemetry::Context,
library_id: i32,
limit: i64,
offset: i64,
) -> Result<Vec<(i32, String)>, DbError>;
}
pub struct SqliteExifDao {
connection: Arc<Mutex<SqliteConnection>>,
}
impl Default for SqliteExifDao {
fn default() -> Self {
Self::new()
}
}
impl SqliteExifDao {
pub fn new() -> Self {
SqliteExifDao {
connection: Arc::new(Mutex::new(connect())),
}
}
#[cfg(test)]
pub fn from_connection(conn: SqliteConnection) -> Self {
SqliteExifDao {
connection: Arc::new(Mutex::new(conn)),
}
}
}
impl ExifDao for SqliteExifDao {
fn store_exif(
&mut self,
context: &opentelemetry::Context,
exif_data: InsertImageExif,
) -> Result<ImageExif, DbError> {
trace_db_call(context, "insert", "store_exif", |_span| {
use schema::image_exif::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
diesel::insert_into(image_exif)
.values(&exif_data)
.execute(connection.deref_mut())
.map_err(|e| {
log::warn!(
"image_exif insert failed (lib={}, rel_path={:?}): {}",
exif_data.library_id,
exif_data.file_path,
e
);
anyhow::anyhow!("Insert error: {}", e)
})?;
image_exif
.filter(library_id.eq(exif_data.library_id))
.filter(rel_path.eq(&exif_data.file_path))
.first::<ImageExif>(connection.deref_mut())
.map_err(|e| anyhow::anyhow!("Post-insert lookup failed: {}", e))
})
.map_err(|_| DbError::new(DbErrorKind::InsertError))
}
fn get_exif(
&mut self,
context: &opentelemetry::Context,
path: &str,
) -> Result<Option<ImageExif>, DbError> {
trace_db_call(context, "query", "get_exif", |_span| {
use schema::image_exif::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
// Try both normalized (forward slash) and Windows (backslash) paths
// since database may contain either format
let normalized = path.replace('\\', "/");
let windows_path = path.replace('/', "\\");
match image_exif
.filter(rel_path.eq(&normalized).or(rel_path.eq(&windows_path)))
.first::<ImageExif>(connection.deref_mut())
{
Ok(exif) => Ok(Some(exif)),
Err(diesel::result::Error::NotFound) => Ok(None),
Err(_) => Err(anyhow::anyhow!("Query error")),
}
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn update_exif(
&mut self,
context: &opentelemetry::Context,
exif_data: InsertImageExif,
) -> Result<ImageExif, DbError> {
trace_db_call(context, "update", "update_exif", |_span| {
use schema::image_exif::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
diesel::update(
image_exif
.filter(library_id.eq(exif_data.library_id))
.filter(rel_path.eq(&exif_data.file_path)),
)
.set((
camera_make.eq(&exif_data.camera_make),
camera_model.eq(&exif_data.camera_model),
lens_model.eq(&exif_data.lens_model),
width.eq(&exif_data.width),
height.eq(&exif_data.height),
orientation.eq(&exif_data.orientation),
gps_latitude.eq(&exif_data.gps_latitude),
gps_longitude.eq(&exif_data.gps_longitude),
gps_altitude.eq(&exif_data.gps_altitude),
focal_length.eq(&exif_data.focal_length),
aperture.eq(&exif_data.aperture),
shutter_speed.eq(&exif_data.shutter_speed),
iso.eq(&exif_data.iso),
date_taken.eq(&exif_data.date_taken),
last_modified.eq(&exif_data.last_modified),
))
.execute(connection.deref_mut())
.map_err(|_| anyhow::anyhow!("Update error"))?;
image_exif
.filter(library_id.eq(exif_data.library_id))
.filter(rel_path.eq(&exif_data.file_path))
.first::<ImageExif>(connection.deref_mut())
.map_err(|_| anyhow::anyhow!("Query error"))
})
.map_err(|_| DbError::new(DbErrorKind::UpdateError))
}
fn delete_exif(&mut self, context: &opentelemetry::Context, path: &str) -> Result<(), DbError> {
trace_db_call(context, "delete", "delete_exif", |_span| {
use schema::image_exif::dsl::*;
diesel::delete(image_exif.filter(rel_path.eq(path)))
.execute(self.connection.lock().unwrap().deref_mut())
.map(|_| ())
.map_err(|_| anyhow::anyhow!("Delete error"))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn get_all_with_date_taken(
&mut self,
context: &opentelemetry::Context,
lib_id: Option<i32>,
) -> Result<Vec<(String, i64)>, DbError> {
trace_db_call(context, "query", "get_all_with_date_taken", |_span| {
use schema::image_exif::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
let query = image_exif
.select((rel_path, date_taken))
.filter(date_taken.is_not_null())
.into_boxed();
let query = match lib_id {
Some(filter_id) => query.filter(library_id.eq(filter_id)),
None => query,
};
query
.load::<(String, Option<i64>)>(connection.deref_mut())
.map(|records| {
records
.into_iter()
.filter_map(|(path, dt)| dt.map(|ts| (path, ts)))
.collect()
})
.map_err(|_| anyhow::anyhow!("Query error"))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn get_exif_batch(
&mut self,
context: &opentelemetry::Context,
library_id_filter: Option<i32>,
file_paths: &[String],
) -> Result<Vec<ImageExif>, DbError> {
trace_db_call(context, "query", "get_exif_batch", |_span| {
use schema::image_exif::dsl::*;
if file_paths.is_empty() {
return Ok(Vec::new());
}
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
let mut query = image_exif.into_boxed();
if let Some(lib_id) = library_id_filter {
query = query.filter(library_id.eq(lib_id));
}
query
.filter(rel_path.eq_any(file_paths))
.load::<ImageExif>(connection.deref_mut())
.map_err(|_| anyhow::anyhow!("Query error"))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn query_by_exif(
&mut self,
context: &opentelemetry::Context,
library_id_filter: Option<i32>,
camera_make_filter: Option<&str>,
camera_model_filter: Option<&str>,
lens_model_filter: Option<&str>,
gps_bounds: Option<(f64, f64, f64, f64)>,
date_from: Option<i64>,
date_to: Option<i64>,
) -> Result<Vec<ImageExif>, DbError> {
trace_db_call(context, "query", "query_by_exif", |_span| {
use schema::image_exif::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
let mut query = image_exif.into_boxed();
// Library scope (most-selective filter — apply first so the
// `(library_id, ...)` indexes are eligible).
if let Some(lib_id) = library_id_filter {
query = query.filter(library_id.eq(lib_id));
}
// Camera filters (case-insensitive partial match)
if let Some(make) = camera_make_filter {
query = query.filter(camera_make.like(format!("%{}%", make)));
}
if let Some(model) = camera_model_filter {
query = query.filter(camera_model.like(format!("%{}%", model)));
}
if let Some(lens) = lens_model_filter {
query = query.filter(lens_model.like(format!("%{}%", lens)));
}
// GPS bounding box
if let Some((min_lat, max_lat, min_lon, max_lon)) = gps_bounds {
query = query
.filter(gps_latitude.between(min_lat as f32, max_lat as f32))
.filter(gps_longitude.between(min_lon as f32, max_lon as f32))
.filter(gps_latitude.is_not_null())
.filter(gps_longitude.is_not_null());
}
// Date range
if let Some(from) = date_from {
query = query.filter(date_taken.ge(from));
}
if let Some(to) = date_to {
query = query.filter(date_taken.le(to));
}
if date_from.is_some() || date_to.is_some() {
query = query.filter(date_taken.is_not_null());
}
query
.load::<ImageExif>(connection.deref_mut())
.map_err(|_| anyhow::anyhow!("Query error"))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn get_camera_makes(
&mut self,
context: &opentelemetry::Context,
) -> Result<Vec<(String, i64)>, DbError> {
trace_db_call(context, "query", "get_camera_makes", |_span| {
use diesel::dsl::count;
use schema::image_exif::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
image_exif
.filter(camera_make.is_not_null())
.group_by(camera_make)
.select((camera_make, count(id)))
.order(count(id).desc())
.load::<(Option<String>, i64)>(connection.deref_mut())
.map(|records| {
records
.into_iter()
.filter_map(|(make, cnt)| make.map(|m| (m, cnt)))
.collect()
})
.map_err(|_| anyhow::anyhow!("Query error"))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn update_file_path(
&mut self,
context: &opentelemetry::Context,
old_path: &str,
new_path: &str,
) -> Result<(), DbError> {
trace_db_call(context, "update", "update_file_path", |_span| {
use schema::image_exif::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
diesel::update(image_exif.filter(rel_path.eq(old_path)))
.set(rel_path.eq(new_path))
.execute(connection.deref_mut())
.map_err(|_| anyhow::anyhow!("Update error"))?;
Ok(())
})
.map_err(|_| DbError::new(DbErrorKind::UpdateError))
}
fn get_all_file_paths(
&mut self,
context: &opentelemetry::Context,
) -> Result<Vec<String>, DbError> {
trace_db_call(context, "query", "get_all_file_paths", |_span| {
use schema::image_exif::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
image_exif
.select(rel_path)
.load(connection.deref_mut())
.map_err(|_| anyhow::anyhow!("Query error"))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn get_all_with_gps(
&mut self,
context: &opentelemetry::Context,
base_path: &str,
recursive: bool,
) -> Result<Vec<(String, f64, f64, Option<i64>)>, DbError> {
trace_db_call(context, "query", "get_all_with_gps", |span| {
use opentelemetry::KeyValue;
use opentelemetry::trace::Span;
use schema::image_exif::dsl::*;
span.set_attributes(vec![
KeyValue::new("base_path", base_path.to_string()),
KeyValue::new("recursive", recursive.to_string()),
]);
let connection = &mut *self.connection.lock().unwrap();
// Query all photos with non-null GPS coordinates
let mut query = image_exif
.filter(gps_latitude.is_not_null().and(gps_longitude.is_not_null()))
.into_boxed();
// Apply path filtering
// If base_path is empty or "/", return all GPS photos (no filter)
// Otherwise filter by path prefix
if !base_path.is_empty() && base_path != "/" {
// Match base path as prefix (with wildcard)
query = query.filter(rel_path.like(format!("{}%", base_path)));
span.set_attribute(KeyValue::new("path_filter_applied", true));
} else {
span.set_attribute(KeyValue::new("path_filter_applied", false));
span.set_attribute(KeyValue::new("returning_all_gps_photos", true));
}
// Load full ImageExif records
let results: Vec<ImageExif> = query
.load::<ImageExif>(connection)
.map_err(|e| anyhow::anyhow!("GPS query error: {}", e))?;
// Convert to tuple format (path, lat, lon, date_taken)
// Filter out any rows where GPS is still None (shouldn't happen due to filter)
// Cast f32 GPS values to f64 for API compatibility
let filtered: Vec<(String, f64, f64, Option<i64>)> = results
.into_iter()
.filter_map(|exif| {
if let (Some(lat_val), Some(lon_val)) = (exif.gps_latitude, exif.gps_longitude)
{
Some((
exif.file_path,
lat_val as f64,
lon_val as f64,
exif.date_taken,
))
} else {
None
}
})
.collect();
span.set_attribute(KeyValue::new("result_count", filtered.len() as i64));
Ok(filtered)
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn get_rows_missing_hash(
&mut self,
context: &opentelemetry::Context,
limit: i64,
) -> Result<Vec<(i32, String)>, DbError> {
trace_db_call(context, "query", "get_rows_missing_hash", |_span| {
use schema::image_exif::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
image_exif
.filter(content_hash.is_null())
.select((library_id, rel_path))
.order(id.asc())
.limit(limit)
.load::<(i32, String)>(connection.deref_mut())
.map_err(|_| anyhow::anyhow!("Query error"))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn backfill_content_hash(
&mut self,
context: &opentelemetry::Context,
library_id_val: i32,
rel_path_val: &str,
hash: &str,
size_val: i64,
) -> Result<(), DbError> {
trace_db_call(context, "update", "backfill_content_hash", |_span| {
use schema::image_exif::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
diesel::update(
image_exif
.filter(library_id.eq(library_id_val))
.filter(rel_path.eq(rel_path_val)),
)
.set((content_hash.eq(hash), size_bytes.eq(size_val)))
.execute(connection.deref_mut())
.map(|_| ())
.map_err(|_| anyhow::anyhow!("Update error"))
})
.map_err(|_| DbError::new(DbErrorKind::UpdateError))
}
fn find_by_content_hash(
&mut self,
context: &opentelemetry::Context,
hash: &str,
) -> Result<Option<ImageExif>, DbError> {
trace_db_call(context, "query", "find_by_content_hash", |_span| {
use schema::image_exif::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
image_exif
.filter(content_hash.eq(hash))
.first::<ImageExif>(connection.deref_mut())
.optional()
.map_err(|_| anyhow::anyhow!("Query error"))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn get_rel_paths_sharing_content(
&mut self,
context: &opentelemetry::Context,
library_id_val: i32,
rel_path_val: &str,
) -> Result<Vec<String>, DbError> {
trace_db_call(context, "query", "get_rel_paths_sharing_content", |_span| {
use schema::image_exif::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
// Look up this file's content_hash. Missing row or NULL hash
// means we can't expand the match set; return the given
// rel_path so callers fall through to direct-match behavior.
let hash: Option<String> = image_exif
.filter(library_id.eq(library_id_val))
.filter(rel_path.eq(rel_path_val))
.select(content_hash)
.first::<Option<String>>(connection.deref_mut())
.optional()
.map_err(|_| anyhow::anyhow!("Query error"))?
.flatten();
let paths = match hash {
Some(h) => image_exif
.filter(content_hash.eq(h))
.select(rel_path)
.distinct()
.load::<String>(connection.deref_mut())
.map_err(|_| anyhow::anyhow!("Query error"))?,
None => vec![rel_path_val.to_string()],
};
Ok(paths)
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn get_rel_paths_for_library(
&mut self,
context: &opentelemetry::Context,
library_id_val: i32,
) -> Result<Vec<String>, DbError> {
trace_db_call(context, "query", "get_rel_paths_for_library", |_span| {
use schema::image_exif::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
image_exif
.filter(library_id.eq(library_id_val))
.select(rel_path)
.load::<String>(connection.deref_mut())
.map_err(|_| anyhow::anyhow!("Query error"))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn find_content_hash_anywhere(
&mut self,
context: &opentelemetry::Context,
rel_path_val: &str,
) -> Result<Option<String>, DbError> {
trace_db_call(context, "query", "find_content_hash_anywhere", |_span| {
use schema::image_exif::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
// rel_path is not unique across libraries and there is no ORDER BY,
// so when duplicates have diverged the returned hash comes from an
// arbitrary matching row.
image_exif
.filter(rel_path.eq(rel_path_val))
.filter(content_hash.is_not_null())
.select(content_hash)
.first::<Option<String>>(connection.deref_mut())
.optional()
.map(|opt| opt.flatten())
.map_err(|_| anyhow::anyhow!("Query error"))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn get_rel_paths_by_hash(
&mut self,
context: &opentelemetry::Context,
hash: &str,
) -> Result<Vec<String>, DbError> {
trace_db_call(context, "query", "get_rel_paths_by_hash", |_span| {
use schema::image_exif::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
image_exif
.filter(content_hash.eq(hash))
.select(rel_path)
.distinct()
.load::<String>(connection.deref_mut())
.map_err(|_| anyhow::anyhow!("Query error"))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn get_rel_paths_for_hashes(
&mut self,
context: &opentelemetry::Context,
hashes: &[String],
) -> Result<std::collections::HashMap<String, Vec<String>>, DbError> {
use std::collections::HashMap;
let mut out: HashMap<String, Vec<String>> = HashMap::new();
if hashes.is_empty() {
return Ok(out);
}
trace_db_call(context, "query", "get_rel_paths_for_hashes", |_span| {
use schema::image_exif::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
// Chunk the IN clause to stay safely under SQLite's
// SQLITE_LIMIT_VARIABLE_NUMBER (32766 modern, 999 legacy).
const CHUNK: usize = 500;
for chunk in hashes.chunks(CHUNK) {
let rows: Vec<(String, String)> = image_exif
.filter(content_hash.eq_any(chunk))
.select((content_hash.assume_not_null(), rel_path))
.distinct()
.load::<(String, String)>(connection.deref_mut())
.map_err(|_| anyhow::anyhow!("Query error"))?;
for (hash, path) in rows {
out.entry(hash).or_default().push(path);
}
}
Ok(out)
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn list_rel_paths_for_libraries(
&mut self,
context: &opentelemetry::Context,
library_ids: &[i32],
path_prefix: Option<&str>,
) -> Result<Vec<(i32, String)>, DbError> {
trace_db_call(context, "query", "list_rel_paths_for_libraries", |_span| {
use schema::image_exif::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
let mut query = image_exif.select((library_id, rel_path)).into_boxed();
if !library_ids.is_empty() {
query = query.filter(library_id.eq_any(library_ids.to_vec()));
}
if let Some(prefix) = path_prefix.map(str::trim).filter(|s| !s.is_empty()) {
// Trailing slash normalization so "2024" matches "2024/..."
// without also matching "2024-archive/...".
let prefix = prefix.trim_end_matches('/');
// Escape LIKE metacharacters; backslash first, so the escapes
// added for % and _ are not themselves re-escaped.
let escaped = prefix
.replace('\\', "\\\\")
.replace('%', "\\%")
.replace('_', "\\_");
let pattern = format!("{escaped}/%");
query = query.filter(rel_path.like(pattern).escape('\\'));
}
query
.load::<(i32, String)>(connection.deref_mut())
.map_err(|_| anyhow::anyhow!("Query error"))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn delete_exif_by_library(
&mut self,
context: &opentelemetry::Context,
library_id_val: i32,
rel_path_val: &str,
) -> Result<(), DbError> {
trace_db_call(context, "delete", "delete_exif_by_library", |_span| {
use schema::image_exif::dsl::*;
diesel::delete(
image_exif
.filter(library_id.eq(library_id_val))
.filter(rel_path.eq(rel_path_val)),
)
.execute(self.connection.lock().expect("Unable to get ExifDao").deref_mut())
.map(|_| ())
.map_err(|_| anyhow::anyhow!("Delete error"))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn count_for_library(
&mut self,
context: &opentelemetry::Context,
library_id_val: i32,
) -> Result<i64, DbError> {
trace_db_call(context, "query", "count_for_library", |_span| {
use schema::image_exif::dsl::*;
image_exif
.filter(library_id.eq(library_id_val))
.count()
.get_result::<i64>(self.connection.lock().expect("Unable to get ExifDao").deref_mut())
.map_err(|_| anyhow::anyhow!("Count error"))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn list_rel_paths_for_library_page(
&mut self,
context: &opentelemetry::Context,
library_id_val: i32,
limit: i64,
offset: i64,
) -> Result<Vec<(i32, String)>, DbError> {
trace_db_call(context, "query", "list_rel_paths_for_library_page", |_span| {
use schema::image_exif::dsl::*;
image_exif
.filter(library_id.eq(library_id_val))
// Ordered by primary key so offset paging is stable across calls;
// the i32 in each tuple is the image_exif row id, not a library id.
.order(id.asc())
.select((id, rel_path))
.limit(limit)
.offset(offset)
.load::<(i32, String)>(self.connection.lock().expect("Unable to get ExifDao").deref_mut())
.map_err(|_| anyhow::anyhow!("Query error"))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
}
#[cfg(test)]
mod exif_dao_tests {
use super::*;
use crate::database::models::InsertLibrary;
use crate::database::test::in_memory_db_connection;
fn ctx() -> opentelemetry::Context {
opentelemetry::Context::new()
}
fn insert_row(dao: &mut SqliteExifDao, lib_id: i32, rel: &str, date: Option<i64>) {
dao.store_exif(
&ctx(),
InsertImageExif {
library_id: lib_id,
file_path: rel.to_string(),
camera_make: None,
camera_model: None,
lens_model: None,
width: None,
height: None,
orientation: None,
gps_latitude: None,
gps_longitude: None,
gps_altitude: None,
focal_length: None,
aperture: None,
shutter_speed: None,
iso: None,
date_taken: date,
created_time: 0,
last_modified: 0,
content_hash: None,
size_bytes: None,
},
)
.expect("insert exif row");
}
fn setup_two_libraries() -> SqliteExifDao {
let mut conn = in_memory_db_connection();
// Migration seeds library id=1 with a placeholder root; add id=2.
diesel::insert_into(schema::libraries::table)
.values(InsertLibrary {
name: "archive",
root_path: "/tmp/archive",
created_at: 0,
})
.execute(&mut conn)
.expect("seed second library");
SqliteExifDao::from_connection(conn)
}
#[test]
fn get_all_with_date_taken_union_returns_all_libraries() {
let mut dao = setup_two_libraries();
insert_row(&mut dao, 1, "main/a.jpg", Some(100));
insert_row(&mut dao, 2, "archive/b.jpg", Some(200));
// Row without a date must be excluded even in union mode.
insert_row(&mut dao, 2, "archive/c.jpg", None);
let mut rows = dao.get_all_with_date_taken(&ctx(), None).unwrap();
rows.sort_by_key(|(_, ts)| *ts);
assert_eq!(
rows,
vec![
("main/a.jpg".to_string(), 100),
("archive/b.jpg".to_string(), 200),
]
);
}
#[test]
fn get_all_with_date_taken_scopes_by_library_id() {
let mut dao = setup_two_libraries();
insert_row(&mut dao, 1, "main/a.jpg", Some(100));
insert_row(&mut dao, 2, "archive/b.jpg", Some(200));
insert_row(&mut dao, 2, "archive/c.jpg", Some(300));
let lib2 = dao.get_all_with_date_taken(&ctx(), Some(2)).unwrap();
let mut paths: Vec<String> = lib2.into_iter().map(|(p, _)| p).collect();
paths.sort();
assert_eq!(paths, vec!["archive/b.jpg", "archive/c.jpg"]);
let lib1 = dao.get_all_with_date_taken(&ctx(), Some(1)).unwrap();
assert_eq!(lib1, vec![("main/a.jpg".to_string(), 100)]);
}
#[test]
fn query_by_exif_scopes_by_library_id() {
let mut dao = setup_two_libraries();
insert_row(&mut dao, 1, "main/a.jpg", Some(100));
insert_row(&mut dao, 2, "archive/a.jpg", Some(200));
// Union: both rows.
let all = dao
.query_by_exif(&ctx(), None, None, None, None, None, None, None)
.unwrap();
assert_eq!(all.len(), 2);
// Scoped to lib 2: only archive row.
let lib2 = dao
.query_by_exif(&ctx(), Some(2), None, None, None, None, None, None)
.unwrap();
assert_eq!(lib2.len(), 1);
assert_eq!(lib2[0].file_path, "archive/a.jpg");
assert_eq!(lib2[0].library_id, 2);
}
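// Sketch test for the NULL-hash fallback documented in
// get_rel_paths_sharing_content: rows inserted via insert_row carry
// content_hash = None, so the match set cannot expand and the given
// rel_path is echoed back. Assumes store_exif inserts a fresh row per
// (library_id, rel_path), as the tests above do.
#[test]
fn get_rel_paths_sharing_content_falls_back_without_hash() {
let mut dao = setup_two_libraries();
insert_row(&mut dao, 1, "main/a.jpg", Some(100));
let paths = dao
.get_rel_paths_sharing_content(&ctx(), 1, "main/a.jpg")
.unwrap();
assert_eq!(paths, vec!["main/a.jpg"]);
// Missing row behaves the same: caller's path comes back unchanged.
let missing = dao
.get_rel_paths_sharing_content(&ctx(), 1, "main/nope.jpg")
.unwrap();
assert_eq!(missing, vec!["main/nope.jpg"]);
}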
#[test]
fn get_exif_batch_scopes_by_library_id() {
let mut dao = setup_two_libraries();
// Same rel_path, different libraries — the cross-library duplicate
// case the audit flagged.
insert_row(&mut dao, 1, "shared/photo.jpg", Some(100));
insert_row(&mut dao, 2, "shared/photo.jpg", Some(200));
// None spans both libraries (legacy union behavior).
let union = dao
.get_exif_batch(&ctx(), None, &["shared/photo.jpg".to_string()])
.unwrap();
assert_eq!(union.len(), 2);
// Some(2) returns only the archive row.
let scoped = dao
.get_exif_batch(&ctx(), Some(2), &["shared/photo.jpg".to_string()])
.unwrap();
assert_eq!(scoped.len(), 1);
assert_eq!(scoped[0].library_id, 2);
assert_eq!(scoped[0].date_taken, Some(200));
}
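// Sketch coverage for the hash-keyed lookups: a variant of insert_row
// that sets content_hash (all other fields mirror insert_row's
// defaults), plus a test that get_rel_paths_for_hashes groups paths by
// hash across libraries and omits unknown hashes rather than inserting
// empty entries. The hash values ("h1" etc.) are arbitrary test data.
fn insert_row_hashed(dao: &mut SqliteExifDao, lib_id: i32, rel: &str, hash: &str) {
dao.store_exif(
&ctx(),
InsertImageExif {
library_id: lib_id,
file_path: rel.to_string(),
camera_make: None,
camera_model: None,
lens_model: None,
width: None,
height: None,
orientation: None,
gps_latitude: None,
gps_longitude: None,
gps_altitude: None,
focal_length: None,
aperture: None,
shutter_speed: None,
iso: None,
date_taken: None,
created_time: 0,
last_modified: 0,
content_hash: Some(hash.to_string()),
size_bytes: None,
},
)
.expect("insert hashed exif row");
}
#[test]
fn get_rel_paths_for_hashes_groups_paths_by_hash() {
let mut dao = setup_two_libraries();
insert_row_hashed(&mut dao, 1, "main/a.jpg", "h1");
insert_row_hashed(&mut dao, 2, "archive/a-copy.jpg", "h1");
insert_row_hashed(&mut dao, 2, "archive/b.jpg", "h2");
let map = dao
.get_rel_paths_for_hashes(
&ctx(),
&["h1".to_string(), "h2".to_string(), "h3".to_string()],
)
.unwrap();
let mut h1 = map.get("h1").cloned().unwrap_or_default();
h1.sort();
assert_eq!(h1, vec!["archive/a-copy.jpg", "main/a.jpg"]);
assert_eq!(map.get("h2").map(Vec::len), Some(1));
// Unknown hash: absent from the map, not an empty entry.
assert!(!map.contains_key("h3"));
}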
#[test]
fn count_for_library_returns_per_library_count() {
let mut dao = setup_two_libraries();
insert_row(&mut dao, 1, "main/a.jpg", None);
insert_row(&mut dao, 1, "main/b.jpg", None);
insert_row(&mut dao, 2, "archive/a.jpg", None);
assert_eq!(dao.count_for_library(&ctx(), 1).unwrap(), 2);
assert_eq!(dao.count_for_library(&ctx(), 2).unwrap(), 1);
// Unknown library: zero, no error.
assert_eq!(dao.count_for_library(&ctx(), 999).unwrap(), 0);
}
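// Sketch test for the trailing-slash normalization in
// list_rel_paths_for_libraries: prefix "2024" must match "2024/..."
// without also matching "2024-archive/...", and library scoping must
// hold at the same time.
#[test]
fn list_rel_paths_for_libraries_prefix_is_directory_scoped() {
let mut dao = setup_two_libraries();
insert_row(&mut dao, 1, "2024/a.jpg", None);
insert_row(&mut dao, 1, "2024-archive/b.jpg", None);
insert_row(&mut dao, 2, "2024/c.jpg", None);
let mut rows = dao
.list_rel_paths_for_libraries(&ctx(), &[1], Some("2024"))
.unwrap();
rows.sort();
assert_eq!(rows, vec![(1, "2024/a.jpg".to_string())]);
}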
}