Files
ImageApi/src/database/mod.rs
Cameron Cordes 8503ef7884 chore: cargo fmt + clippy --fix sweep across the crate
Pure mechanical cleanup of accumulated drift in files outside the
HLS-content-hash branch's main change set. No behavior change.

- `cargo fmt` on every previously-misformatted file
  (`ai/insight_generator.rs`, `database/knowledge_dao.rs`,
  `faces.rs`, `knowledge.rs`, `libraries.rs`).
- `cargo clippy --fix`:
  - `needless_borrow`: `&library` → `library` in `handlers/image.rs`
    (two sites in the photo-listing path).
- Manual clippy pass for warnings clippy emits but can't auto-apply:
  - `field_reassign_with_default` in `database/reconcile.rs::run` —
    consolidated into a struct-literal initializer.
  - `needless_range_loop` in `database/knowledge_dao.rs::union_perceptual_tags`
    — inner `for b in (a+1)..indices.len() { let ib = indices[b]; ... }`
    becomes `for &ib in &indices[a + 1..] { ... }`.
  - Doc-list indentation: continuation lines under nested bullets in
    `database/mod.rs::get_memories_in_window` and
    `database/knowledge_dao.rs::build_entity_graph` realigned to the
    list-item content column.

Deliberately not touched (each deserves its own focused commit, with
testing, rather than getting bundled into a sweep):
- 4× `deprecated count_distinct` in `faces.rs` — diesel API migration
  to `AggregateExpressionMethods::aggregate_distinct` may shift result
  types; needs verification against the existing stats queries.
- `await_holding_lock` in `knowledge.rs:807` — `std::sync::Mutex` held
  across `ollama.generate(...).await`. Genuine concurrency bug; fix
  requires understanding the surrounding flow before just dropping
  the guard.
- 2× `type_complexity` in `database/mod.rs` — cosmetic, would need a
  `type` alias and corresponding callers updated.
- Dead `total_deleted` on `library_maintenance::GcStats` and
  `file_scan::enumerate_indexable_files` — both are public surface
  retained for future use; deletion is a separate decision.

All 707 tests still pass. Release build clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-14 16:25:05 -04:00

2629 lines
98 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use bcrypt::{DEFAULT_COST, hash, verify};
use diesel::prelude::*;
use diesel::sqlite::SqliteConnection;
use std::ops::DerefMut;
use std::sync::{Arc, Mutex};
use crate::database::models::{
Favorite, ImageExif, InsertFavorite, InsertImageExif, InsertUser, User,
};
use crate::otel::trace_db_call;
/// Decoded shape for `get_memories_in_window`'s raw `sql_query`. Diesel's
/// query DSL doesn't expose strftime, so the memories filter is hand-
/// written SQL — but the returned columns are simple enough that a small
/// `QueryableByName` struct suffices, kept private to this module.
///
/// The three fields line up, in order, with the
/// `(rel_path, date_taken, last_modified)` tuple that
/// `ExifDao::get_memories_in_window` returns.
#[derive(diesel::QueryableByName)]
#[allow(dead_code)] // fields read via Diesel's QueryableByName derive
struct MemoriesWindowRow {
/// `rel_path` column, decoded as SQL `Text`.
#[diesel(sql_type = diesel::sql_types::Text)]
rel_path: String,
/// `date_taken` column, decoded as SQL `BigInt`.
/// NOTE(review): presumably Unix epoch seconds — confirm.
#[diesel(sql_type = diesel::sql_types::BigInt)]
date_taken: i64,
/// `last_modified` column, decoded as SQL `BigInt`.
/// NOTE(review): presumably the same unit as `date_taken` — confirm.
#[diesel(sql_type = diesel::sql_types::BigInt)]
last_modified: i64,
}
/// Wire shape for a single member of a duplicate group, returned by
/// `list_duplicates_*` and `lookup_duplicate_row`. Carries everything
/// the Apollo modal needs to render a member tile and its meta line —
/// thumbnails are derived from `(library_id, rel_path)` upstream.
///
/// Serialized through `serde::Serialize` with no rename attributes, so
/// the field names below are what appears on the wire.
#[derive(Debug, Clone, serde::Serialize)]
pub struct DuplicateRow {
/// Library the row belongs to; one half of the `(library_id, rel_path)` key.
pub library_id: i32,
/// Path of the file; the other half of the `(library_id, rel_path)` key.
pub rel_path: String,
/// Content hash of the file. Members of an exact-duplicate group share
/// this value.
pub content_hash: String,
/// File size in bytes, when it has been recorded.
pub size_bytes: Option<i64>,
/// `date_taken` of the row, when set.
/// NOTE(review): presumably Unix epoch seconds — confirm.
pub date_taken: Option<i64>,
/// Image width, when known.
pub width: Option<i32>,
/// Image height, when known.
pub height: Option<i32>,
/// Perceptual hash (pHash); `None` until it has been computed.
pub phash_64: Option<i64>,
/// Difference hash (dHash); `None` until it has been computed.
pub dhash_64: Option<i64>,
/// Hash of the survivor this row was soft-marked as a duplicate of;
/// `None` while the row is unresolved.
pub duplicate_of_hash: Option<String>,
/// Timestamp recorded together with `duplicate_of_hash` when the
/// soft-mark was written.
pub duplicate_decided_at: Option<i64>,
}
pub mod calendar_dao;
pub mod daily_summary_dao;
pub mod insights_dao;
pub mod knowledge_dao;
pub mod location_dao;
pub mod models;
pub mod persona_dao;
pub mod preview_dao;
pub mod reconcile;
pub mod schema;
pub mod search_dao;
pub use calendar_dao::{CalendarEventDao, SqliteCalendarEventDao};
pub use daily_summary_dao::{DailySummaryDao, InsertDailySummary, SqliteDailySummaryDao};
pub use insights_dao::{InsightDao, SqliteInsightDao};
pub use knowledge_dao::{
ConsolidationGroup, EntityFilter, EntityGraph, EntityPatch, EntitySort, FactFilter, FactPatch,
KnowledgeDao, PersonaFilter, RecentActivity, SqliteKnowledgeDao,
};
pub use location_dao::{LocationHistoryDao, SqliteLocationHistoryDao};
pub use persona_dao::{ImportPersona, PersonaDao, PersonaPatch, SqlitePersonaDao};
pub use preview_dao::{PreviewDao, SqlitePreviewDao};
pub use search_dao::{SearchHistoryDao, SqliteSearchHistoryDao};
/// Storage contract for user accounts: creation, credential check, and
/// existence lookup.
pub trait UserDao {
/// Store a new account for `user` with the plaintext `password`.
/// `None` means no user row could be returned. The SQLite
/// implementation stores a bcrypt hash, never the plaintext.
fn create_user(&mut self, user: &str, password: &str) -> Option<User>;
/// Return the account named `user` only when `password` verifies
/// against its stored hash; `None` otherwise.
fn get_user(&mut self, user: &str, password: &str) -> Option<User>;
/// Whether at least one account named `user` is stored. The SQLite
/// implementation also answers `false` when the lookup itself fails.
fn user_exists(&mut self, user: &str) -> bool;
}
/// [`UserDao`] implementation that owns one dedicated SQLite connection.
pub struct SqliteUserDao {
    /// Connection every query issued by this DAO runs on.
    connection: SqliteConnection,
}

impl SqliteUserDao {
    /// Builds a DAO on top of a fresh connection obtained from [`connect`].
    ///
    /// # Panics
    /// Panics whenever [`connect`] does.
    pub fn new() -> Self {
        let connection = connect();
        Self { connection }
    }
}

impl Default for SqliteUserDao {
    /// Equivalent to [`SqliteUserDao::new`].
    fn default() -> Self {
        SqliteUserDao::new()
    }
}
#[cfg(test)]
pub mod test {
    use diesel::{Connection, SqliteConnection};
    use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations};

    /// Migrations compiled into the test binary by `embed_migrations!`.
    const DB_MIGRATIONS: EmbeddedMigrations = embed_migrations!();

    /// Opens a private `:memory:` SQLite database and brings it up to the
    /// current schema by running every pending embedded migration.
    ///
    /// # Panics
    /// Panics if the connection cannot be opened or a migration fails.
    pub fn in_memory_db_connection() -> SqliteConnection {
        let mut db = SqliteConnection::establish(":memory:")
            .expect("Unable to create in-memory db connection");
        db.run_pending_migrations(DB_MIGRATIONS)
            .expect("Failure running DB migrations");
        db
    }
}
impl UserDao for SqliteUserDao {
// TODO: Should probably use Result here
fn create_user(&mut self, user: &str, pass: &str) -> Option<User> {
use schema::users::dsl::*;
let hashed = hash(pass, DEFAULT_COST);
if let Ok(hash) = hashed {
diesel::insert_into(users)
.values(InsertUser {
username: user,
password: &hash,
})
.execute(&mut self.connection)
.unwrap();
users
.filter(username.eq(username))
.load::<User>(&mut self.connection)
.unwrap()
.first()
.cloned()
} else {
None
}
}
fn get_user(&mut self, user: &str, pass: &str) -> Option<User> {
use schema::users::dsl::*;
match users
.filter(username.eq(user))
.load::<User>(&mut self.connection)
.unwrap_or_default()
.first()
{
Some(u) if verify(pass, &u.password).unwrap_or(false) => Some(u.clone()),
_ => None,
}
}
fn user_exists(&mut self, user: &str) -> bool {
use schema::users::dsl::*;
!users
.filter(username.eq(user))
.load::<User>(&mut self.connection)
.unwrap_or_default()
.is_empty()
}
}
pub fn connect() -> SqliteConnection {
let db_url = dotenv::var("DATABASE_URL").expect("DATABASE_URL must be set");
let mut conn = SqliteConnection::establish(&db_url).expect("Error connecting to DB");
// Each DAO opens its own connection (13+ across the app) and they all
// share one DB file. Without WAL, a writer holds an exclusive lock
// that blocks readers — `load_persons` racing the face-watch write
// storm errors instantly with `database is locked`. WAL lets readers
// and one writer coexist; busy_timeout makes any remaining
// writer-vs-writer contention wait instead of failing fast.
// synchronous=NORMAL is the standard WAL pairing (FULL is for
// rollback-journal durability; we accept the narrow last-fsync
// window for the 210× write throughput).
use diesel::connection::SimpleConnection;
// foreign_keys = ON is per-connection in SQLite (off by default), so
// it has to be set here alongside the other pragmas. Without it
// every `REFERENCES … ON DELETE CASCADE / SET NULL` clause in the
// schema is documentation-only — orphan rows would survive the
// referenced row's deletion. With it, the cascade fires
// automatically and code that previously did manual two-step
// cleanup (delete child rows, then parent) becomes redundant but
// still correct.
conn.batch_execute(
"PRAGMA journal_mode = WAL; \
PRAGMA busy_timeout = 5000; \
PRAGMA synchronous = NORMAL; \
PRAGMA foreign_keys = ON;",
)
.expect("set sqlite pragmas");
conn
}
/// Error type returned by the DAO traits in this module. It carries only
/// a coarse category; the underlying driver error is not retained.
#[derive(Debug)]
pub struct DbError {
    /// Category of the failure.
    pub kind: DbErrorKind,
}

impl DbError {
    /// Wraps `kind` in a `DbError`.
    fn new(kind: DbErrorKind) -> Self {
        Self { kind }
    }

    /// Shorthand for an error of kind [`DbErrorKind::AlreadyExists`].
    fn exists() -> Self {
        Self::new(DbErrorKind::AlreadyExists)
    }
}

/// Coarse classification of a [`DbError`].
#[derive(Debug, PartialEq)]
pub enum DbErrorKind {
    /// The record being added is already stored.
    AlreadyExists,
    /// An insert statement failed.
    InsertError,
    /// A read (or, in `delete_exif`, a delete) statement failed.
    QueryError,
    /// An update statement failed.
    UpdateError,
    /// The targeted row does not exist.
    NotFound,
}
/// Persistence contract for per-user favorites, keyed by
/// `(user_id, path)`.
pub trait FavoriteDao: Sync + Send {
/// Record `favorite_path` as a favorite of `user_id`. The SQLite
/// implementation returns the inserted-row count on success,
/// `DbErrorKind::AlreadyExists` when the pair is already stored, and
/// `DbErrorKind::InsertError` when the insert fails.
fn add_favorite(&mut self, user_id: i32, favorite_path: &str) -> Result<usize, DbError>;
/// Remove the `(user_id, favorite_path)` favorite. Nothing is reported
/// back; the SQLite implementation panics if the delete fails.
fn remove_favorite(&mut self, user_id: i32, favorite_path: String);
/// All favorites stored for `user_id`.
fn get_favorites(&mut self, user_id: i32) -> Result<Vec<Favorite>, DbError>;
/// Rewrite every favorite stored under `old_path` to `new_path`,
/// across all users.
#[allow(dead_code)]
fn update_path(&mut self, old_path: &str, new_path: &str) -> Result<(), DbError>;
/// Distinct paths that appear in any user's favorites.
#[allow(dead_code)]
fn get_all_paths(&mut self) -> Result<Vec<String>, DbError>;
}
/// [`FavoriteDao`] implementation backed by a mutex-guarded SQLite
/// connection.
pub struct SqliteFavoriteDao {
    /// Connection shared behind `Arc<Mutex<_>>`; every method locks it for
    /// the duration of its statement(s).
    connection: Arc<Mutex<SqliteConnection>>,
}

impl SqliteFavoriteDao {
    /// Builds a DAO on top of a fresh connection obtained from [`connect`].
    ///
    /// # Panics
    /// Panics whenever [`connect`] does.
    pub fn new() -> Self {
        let shared = Arc::new(Mutex::new(connect()));
        Self { connection: shared }
    }
}

impl Default for SqliteFavoriteDao {
    /// Equivalent to [`SqliteFavoriteDao::new`].
    fn default() -> Self {
        SqliteFavoriteDao::new()
    }
}
impl FavoriteDao for SqliteFavoriteDao {
    /// Stores `(user_id, favorite_path)` unless that pair is already
    /// present.
    ///
    /// The existence probe and the insert run under a single lock
    /// acquisition. Any failure of the probe (including a plain "no rows")
    /// is taken to mean "not stored yet". Returns the inserted-row count,
    /// `AlreadyExists` when the pair is present, or `InsertError` when the
    /// insert fails.
    fn add_favorite(&mut self, user_id: i32, favorite_path: &str) -> Result<usize, DbError> {
        use schema::favorites::dsl::*;

        let mut conn_guard = self.connection.lock().expect("Unable to get FavoriteDao");

        let already_stored = favorites
            .filter(userid.eq(user_id).and(rel_path.eq(&favorite_path)))
            .first::<Favorite>(&mut *conn_guard)
            .is_ok();
        if already_stored {
            return Err(DbError::exists());
        }

        let insert_outcome = diesel::insert_into(favorites)
            .values(InsertFavorite {
                userid: &user_id,
                path: favorite_path,
            })
            .execute(&mut *conn_guard);
        insert_outcome.map_err(|_| DbError::new(DbErrorKind::InsertError))
    }

    /// Deletes the `(user_id, favorite_path)` favorite.
    ///
    /// # Panics
    /// Panics if the connection mutex is poisoned or the delete fails.
    fn remove_favorite(&mut self, user_id: i32, favorite_path: String) {
        use schema::favorites::dsl::*;

        let doomed = favorites.filter(userid.eq(user_id).and(rel_path.eq(favorite_path)));
        let mut conn_guard = self.connection.lock().unwrap();
        diesel::delete(doomed).execute(&mut *conn_guard).unwrap();
    }

    /// Loads every favorite stored for `user_id`; a failed query becomes
    /// `QueryError`.
    fn get_favorites(&mut self, user_id: i32) -> Result<Vec<Favorite>, DbError> {
        use schema::favorites::dsl::*;

        let mut conn_guard = self.connection.lock().unwrap();
        let loaded = favorites
            .filter(userid.eq(user_id))
            .load::<Favorite>(&mut *conn_guard);
        match loaded {
            Ok(found) => Ok(found),
            Err(_) => Err(DbError::new(DbErrorKind::QueryError)),
        }
    }

    /// Rewrites every favorite stored under `old_path` to `new_path`; a
    /// failed statement becomes `UpdateError`.
    fn update_path(&mut self, old_path: &str, new_path: &str) -> Result<(), DbError> {
        use schema::favorites::dsl::*;

        let mut conn_guard = self.connection.lock().unwrap();
        let update_outcome = diesel::update(favorites.filter(rel_path.eq(old_path)))
            .set(rel_path.eq(new_path))
            .execute(&mut *conn_guard);
        match update_outcome {
            Ok(_) => Ok(()),
            Err(_) => Err(DbError::new(DbErrorKind::UpdateError)),
        }
    }

    /// Returns the distinct set of favorited paths across all users; a
    /// failed query becomes `QueryError`.
    fn get_all_paths(&mut self) -> Result<Vec<String>, DbError> {
        use schema::favorites::dsl::*;

        let mut conn_guard = self.connection.lock().unwrap();
        let loaded = favorites
            .select(rel_path)
            .distinct()
            .load::<String>(&mut *conn_guard);
        loaded.map_err(|_| DbError::new(DbErrorKind::QueryError))
    }
}
/// Data-access contract for the `image_exif` table: EXIF metadata, content
/// and perceptual hashes, canonical dates, and duplicate soft-marks.
///
/// Every method takes an `opentelemetry::Context`; the SQLite
/// implementation hands it to `trace_db_call` together with an operation
/// label for the statement being run.
#[allow(dead_code)]
pub trait ExifDao: Sync + Send {
/// Insert a new `image_exif` row and return it as stored. The SQLite
/// implementation re-reads the row by `(library_id, rel_path)` after the
/// insert and reports any failure as `DbErrorKind::InsertError`.
fn store_exif(
&mut self,
context: &opentelemetry::Context,
exif_data: InsertImageExif,
) -> Result<ImageExif, DbError>;
/// Fetch the EXIF row for `file_path`, matching on `rel_path` only (no
/// library scoping). The SQLite implementation accepts both the
/// forward-slash and the backslash spelling of the path and returns
/// `Ok(None)` when no row matches.
fn get_exif(
&mut self,
context: &opentelemetry::Context,
file_path: &str,
) -> Result<Option<ImageExif>, DbError>;
/// Overwrite the EXIF-derived columns (camera, lens, dimensions,
/// orientation, GPS, exposure, `date_taken`, `date_taken_source`,
/// `last_modified`) of the row matching `(library_id, rel_path)` and
/// return the row as re-read afterwards.
fn update_exif(
&mut self,
context: &opentelemetry::Context,
exif_data: InsertImageExif,
) -> Result<ImageExif, DbError>;
/// Delete every row whose `rel_path` equals `file_path`, regardless of
/// library. See `delete_exif_by_library` for the library-scoped variant.
fn delete_exif(
&mut self,
context: &opentelemetry::Context,
file_path: &str,
) -> Result<(), DbError>;
/// Return `(rel_path, date_taken)` for rows whose `date_taken` is set.
/// NOTE(review): `library_id` presumably narrows the result to one
/// library when `Some` — confirm against the implementation.
fn get_all_with_date_taken(
&mut self,
context: &opentelemetry::Context,
library_id: Option<i32>,
) -> Result<Vec<(String, i64)>, DbError>;
/// Batch load EXIF data for multiple file paths (single query). When
/// `library_id = Some(id)` the lookup is keyed on `(library_id,
/// rel_path)`; cross-library duplicates with the same rel_path are
/// excluded. `None` keeps the legacy rel-path-only behavior — used by
/// the union-mode `/photos` listing, which already disambiguates by
/// `(file_path, library_id)` in the caller.
fn get_exif_batch(
&mut self,
context: &opentelemetry::Context,
library_id: Option<i32>,
file_paths: &[String],
) -> Result<Vec<ImageExif>, DbError>;
/// Query files by EXIF criteria with optional filters. `library_id =
/// Some(id)` restricts to that library; `None` spans every library
/// (used by the unscoped `/photos` form). The composite
/// `(library_id, date_taken)` index added in the multi_library
/// migration depends on `library_id` being part of the WHERE clause —
/// callers that have a library context must pass it.
fn query_by_exif(
&mut self,
context: &opentelemetry::Context,
library_id: Option<i32>,
camera_make: Option<&str>,
camera_model: Option<&str>,
lens_model: Option<&str>,
gps_bounds: Option<(f64, f64, f64, f64)>, // (min_lat, max_lat, min_lon, max_lon)
date_from: Option<i64>,
date_to: Option<i64>,
) -> Result<Vec<ImageExif>, DbError>;
/// Get distinct camera makes with counts
fn get_camera_makes(
&mut self,
context: &opentelemetry::Context,
) -> Result<Vec<(String, i64)>, DbError>;
/// Update file path in EXIF database
fn update_file_path(
&mut self,
context: &opentelemetry::Context,
old_path: &str,
new_path: &str,
) -> Result<(), DbError>;
/// Get all file paths from EXIF database
fn get_all_file_paths(
&mut self,
context: &opentelemetry::Context,
) -> Result<Vec<String>, DbError>;
/// Get all photos with GPS coordinates
/// Returns `Vec<(file_path, latitude, longitude, date_taken)>`
#[allow(clippy::type_complexity)]
fn get_all_with_gps(
&mut self,
context: &opentelemetry::Context,
base_path: &str,
recursive: bool,
) -> Result<Vec<(String, f64, f64, Option<i64>)>, DbError>;
/// Return rows that still lack a `content_hash`, oldest first. Used by
/// the `backfill_hashes` binary to batch through the historical
/// backlog. Returns `(library_id, rel_path)` tuples so the caller can
/// resolve each file on disk.
fn get_rows_missing_hash(
&mut self,
context: &opentelemetry::Context,
limit: i64,
) -> Result<Vec<(i32, String)>, DbError>;
/// Persist the computed blake3 hash + file size for an existing row.
fn backfill_content_hash(
&mut self,
context: &opentelemetry::Context,
library_id: i32,
rel_path: &str,
hash: &str,
size_bytes: i64,
) -> Result<(), DbError>;
/// Every distinct non-NULL `content_hash` across all libraries. Used
/// by HLS orphan cleanup to identify hash dirs under `$VIDEO_PATH`
/// whose source video no longer exists. Cheap query (single column,
/// indexed) but unbounded in size — the result is a HashSet membership
/// check, so a 100k-photo library produces ~100k strings.
fn list_distinct_content_hashes(
&mut self,
context: &opentelemetry::Context,
) -> Result<Vec<String>, DbError>;
/// Every row in `image_exif` for `library_id`, as
/// `(rel_path, content_hash)`. The hash is Option because rows
/// mid-backfill carry NULL. Used by HLS readiness stats; callers
/// filter by extension client-side because the DB schema doesn't
/// carry media type.
fn list_paths_and_hashes_for_library(
&mut self,
context: &opentelemetry::Context,
library_id: i32,
) -> Result<Vec<(String, Option<String>)>, DbError>;
/// Return image_exif rows that need their `date_taken` resolved by the
/// canonical-date waterfall (see `crate::date_resolver`): `date_taken
/// IS NULL`. Returns `(library_id, rel_path)`. The caller filters to
/// its own library on the way through; rows from other libraries fall
/// to the next library's tick. Backed by the partial index
/// `idx_image_exif_date_backfill`.
///
/// `fs_time`-sourced rows are intentionally excluded even though they
/// represent the weakest resolution: the resolver is deterministic on
/// file bytes + filename + fs metadata, so a row that landed on
/// fs_time once will land there again on every retry. Including them
/// in the drain caused it to spin on the same lowest-id rows forever
/// and starve other SQLite writers (face PATCHes hitting busy_timeout).
/// If a stronger tool comes online (exiftool install, new filename
/// regex), an operator can issue a one-shot re-resolve out-of-band.
///
/// NOTE(review): the signature takes `library_id`, which sits oddly
/// with "the caller filters to its own library" above — confirm which
/// side does the filtering.
fn get_rows_needing_date_backfill(
&mut self,
context: &opentelemetry::Context,
library_id: i32,
limit: i64,
) -> Result<Vec<(i32, String)>, DbError>;
/// Persist a resolver result for an existing row. Touches `date_taken`
/// and `date_taken_source` only — leaves all other columns alone so
/// the drain doesn't accidentally clobber EXIF/hash/perceptual data
/// the watcher / GPS-write path may have already written.
fn backfill_date_taken(
&mut self,
context: &opentelemetry::Context,
library_id: i32,
rel_path: &str,
date_taken: i64,
source: &str,
) -> Result<(), DbError>;
/// Operator-driven date_taken override (POST /image/exif/date). Snapshots
/// the prior `(date_taken, date_taken_source)` into the `original_*`
/// pair on first override, then writes the new value with
/// `date_taken_source = 'manual'`. Subsequent overrides keep the
/// original snapshot intact so a single revert restores the resolver
/// result, not whatever override was set just before. Returns the
/// post-update row.
fn set_manual_date_taken(
&mut self,
context: &opentelemetry::Context,
library_id: i32,
rel_path: &str,
date_taken: i64,
) -> Result<ImageExif, DbError>;
/// Revert a manual override (POST /image/exif/date/clear): restore
/// `date_taken` + `date_taken_source` from the `original_*` snapshot,
/// then null both originals. No-op (returns current row unchanged) when
/// no override is active.
fn clear_manual_date_taken(
&mut self,
context: &opentelemetry::Context,
library_id: i32,
rel_path: &str,
) -> Result<ImageExif, DbError>;
/// Single-query backend for `/memories`. Returns
/// `(rel_path, date_taken, last_modified)` for rows in `library_id`
/// whose `date_taken` falls within `[now - years_back y, now]` and
/// whose calendar position matches the request's span:
/// - `"day"` — same month + day-of-month (any year)
/// - `"week"` — same week-of-year (SQLite `%W`, Monday-anchored —
/// close to but not exactly ISO week 8601; the boundary cases
/// at year-start/end can shift by ±1 vs the prior request-time
/// `iso_week()` filter)
/// - `"month"` — same month (any year)
///
/// `tz_offset_minutes` is applied to both sides of the strftime
/// comparison so the calendar match is in the user's local time.
/// Backed by the `(library_id, date_taken)` index.
///
/// This is the single-SQL replacement for the EXIF-loop +
/// WalkDir-fallback that powered `/memories` previously; it's
/// correct only because the canonical-date waterfall at ingest
/// (`crate::date_resolver`) populates `date_taken` for every row
/// it can resolve.
fn get_memories_in_window(
&mut self,
context: &opentelemetry::Context,
library_id: i32,
span_token: &str,
years_back: i32,
tz_offset_minutes: i32,
) -> Result<Vec<(String, i64, i64)>, DbError>;
/// Return image rows that have a `content_hash` but no `phash_64`,
/// oldest first. Used by the `backfill_perceptual_hash` binary.
/// Filters by image extension at the DB layer to avoid ever asking
/// `image_hasher` to decode a video. Returns `(library_id, rel_path)`.
fn get_rows_missing_perceptual_hash(
&mut self,
context: &opentelemetry::Context,
limit: i64,
) -> Result<Vec<(i32, String)>, DbError>;
/// Persist computed perceptual hashes (pHash + dHash) for an
/// existing image_exif row. Either column may be left NULL by
/// passing `None`, but in practice the binary computes both or
/// neither — `image_hasher` either decodes the image and produces
/// both signals, or fails entirely.
fn backfill_perceptual_hash(
&mut self,
context: &opentelemetry::Context,
library_id: i32,
rel_path: &str,
phash_64: Option<i64>,
dhash_64: Option<i64>,
) -> Result<(), DbError>;
/// Group exact-hash duplicates: rows whose `content_hash` appears
/// more than once across the (optionally library-scoped) corpus.
/// Returns one [`DuplicateRow`] per member; callers group by
/// `content_hash`. When `include_resolved=false`, rows already
/// soft-marked (`duplicate_of_hash IS NOT NULL`) are excluded so
/// the modal doesn't re-surface decisions the user already made.
fn list_duplicates_exact(
&mut self,
context: &opentelemetry::Context,
library_id: Option<i32>,
include_resolved: bool,
) -> Result<Vec<DuplicateRow>, DbError>;
/// Return all rows with a non-null `phash_64` (optionally library-
/// scoped), used by the perceptual-cluster routine in
/// [`crate::main`] to single-link cluster via Hamming distance.
/// Each returned row is a *distinct content_hash* — exact duplicates
/// are collapsed at the DB layer so the in-memory clusterer doesn't
/// rediscover them.
fn list_perceptual_candidates(
&mut self,
context: &opentelemetry::Context,
library_id: Option<i32>,
include_resolved: bool,
) -> Result<Vec<DuplicateRow>, DbError>;
/// Lightweight `(library_id, rel_path)` listing for every hashed
/// image_exif row, used to compute per-folder file totals for the
/// folder-pair duplicate view. Filters mirror `list_duplicates_exact`
/// so the denominator (folder population) and numerator (shared
/// dups between two folders) come from the same row population.
fn list_image_paths(
&mut self,
context: &opentelemetry::Context,
library_id: Option<i32>,
include_resolved: bool,
) -> Result<Vec<(i32, String)>, DbError>;
/// Look up a single row's metadata by `(library_id, rel_path)`. Used
/// by the resolve endpoint to map the request payload to the
/// underlying `content_hash` before writing the soft-mark. Returns
/// `Ok(None)` if the file doesn't exist in `image_exif`.
fn lookup_duplicate_row(
&mut self,
context: &opentelemetry::Context,
library_id: i32,
rel_path: &str,
) -> Result<Option<DuplicateRow>, DbError>;
/// Soft-mark a file as a duplicate of `survivor_hash`. Sets
/// `duplicate_of_hash` and `duplicate_decided_at` on the row(s)
/// matching `(library_id, rel_path)`. The file stays on disk; the
/// default `/photos` listing hides it because of the
/// `duplicate_of_hash IS NULL` filter.
fn set_duplicate_of(
&mut self,
context: &opentelemetry::Context,
library_id: i32,
rel_path: &str,
survivor_hash: &str,
decided_at: i64,
) -> Result<(), DbError>;
/// Reverse a soft-mark: clears `duplicate_of_hash` and
/// `duplicate_decided_at`. Used by the modal's UNRESOLVE chip.
fn clear_duplicate_of(
&mut self,
context: &opentelemetry::Context,
library_id: i32,
rel_path: &str,
) -> Result<(), DbError>;
/// Union the tags from `demoted_hash` onto `survivor_hash`. Used at
/// resolve time for *perceptual* duplicates (different content_hashes,
/// independent tag sets) so the user doesn't lose their tagging work
/// when promoting a survivor. Idempotent: a tag already on the survivor
/// is left alone. Exact duplicates (same content_hash) don't need this
/// because their tag rows are already shared.
fn union_perceptual_tags(
&mut self,
context: &opentelemetry::Context,
survivor_hash: &str,
demoted_hash: &str,
survivor_rel_path: &str,
) -> Result<(), DbError>;
/// Return the first EXIF row with the given content hash (any library).
/// Used by thumbnail/HLS generation to detect pre-existing derivatives
/// from another library before regenerating.
fn find_by_content_hash(
&mut self,
context: &opentelemetry::Context,
hash: &str,
) -> Result<Option<ImageExif>, DbError>;
/// Given a file instance `(library_id, rel_path)`, return every distinct
/// rel_path in `image_exif` whose `content_hash` matches this file's.
/// Used by tag and insight read-paths so annotations follow content
/// rather than path, even when the same file is indexed under
/// different library roots. Falls back to `[rel_path]` when the file
/// hasn't been hashed yet.
fn get_rel_paths_sharing_content(
&mut self,
context: &opentelemetry::Context,
library_id: i32,
rel_path: &str,
) -> Result<Vec<String>, DbError>;
/// All rel_paths known to live in a given library. Used by search to
/// scope tag-based (path-keyed) hits to a single library after joining
/// through the library-agnostic tag tables.
fn get_rel_paths_for_library(
&mut self,
context: &opentelemetry::Context,
library_id: i32,
) -> Result<Vec<String>, DbError>;
/// Look up a content_hash for a rel_path in *any* library. Useful when
/// the caller has a library-agnostic rel_path (e.g. from tagged_photo)
/// and wants to find content-equivalent siblings without knowing the
/// file's original library.
fn find_content_hash_anywhere(
&mut self,
context: &opentelemetry::Context,
rel_path: &str,
) -> Result<Option<String>, DbError>;
/// Given a content_hash, return all rel_paths carrying that hash.
fn get_rel_paths_by_hash(
&mut self,
context: &opentelemetry::Context,
hash: &str,
) -> Result<Vec<String>, DbError>;
/// Batch version of [`get_rel_paths_by_hash`]. Returns a
/// `hash → Vec<rel_path>` map for every hash that has at least one
/// rel_path. Used by the batch tag lookup endpoint to expand
/// content-hash siblings without firing a query per hash.
fn get_rel_paths_for_hashes(
&mut self,
context: &opentelemetry::Context,
hashes: &[String],
) -> Result<std::collections::HashMap<String, Vec<String>>, DbError>;
/// List `(library_id, rel_path)` pairs for the given libraries, optionally
/// restricted to rows whose rel_path starts with `path_prefix`. When
/// `library_ids` is empty, rows from every library are returned. Used by
/// `/photos` recursive listing to skip the filesystem walk — the watcher
/// keeps image_exif in parity with disk via the reconciliation pass.
///
/// `include_duplicates=false` filters out rows soft-marked with
/// `duplicate_of_hash IS NOT NULL` so the default photo listing hides
/// demoted siblings; the Apollo duplicates modal passes `true` to
/// see both survivors and demoted members inside a group.
fn list_rel_paths_for_libraries(
&mut self,
context: &opentelemetry::Context,
library_ids: &[i32],
path_prefix: Option<&str>,
include_duplicates: bool,
) -> Result<Vec<(i32, String)>, DbError>;
/// Delete a single image_exif row scoped to `(library_id, rel_path)`.
/// Distinct from `delete_exif`, which matches on rel_path alone and
/// would clobber same-named files across libraries.
fn delete_exif_by_library(
&mut self,
context: &opentelemetry::Context,
library_id: i32,
rel_path: &str,
) -> Result<(), DbError>;
/// Number of image_exif rows for a library. Used by the availability
/// probe to decide whether an empty mount is "fresh" (zero rows: fine)
/// or "the share went offline" (non-zero rows: stale). Zero on query
/// error so a transient DB hiccup doesn't itself cause a Stale flip.
///
/// NOTE(review): the signature returns `Result`, so "zero on query
/// error" presumably describes what callers do with an `Err` — confirm.
fn count_for_library(
&mut self,
context: &opentelemetry::Context,
library_id: i32,
) -> Result<i64, DbError>;
/// Paginated rel_path listing for a single library, ordered by id
/// ascending. Used by the missing-file detector to scan a library
/// in capped chunks across consecutive watcher ticks rather than
/// stat()ing every row every minute. Returns `(id, rel_path)`.
fn list_rel_paths_for_library_page(
&mut self,
context: &opentelemetry::Context,
library_id: i32,
limit: i64,
offset: i64,
) -> Result<Vec<(i32, String)>, DbError>;
}
/// Locate an `image_exif` row for a `(library_id, rel_path)` mutation,
/// mirroring `get_exif`'s union semantics so the write side is no
/// stricter than the read side. Tries the requested library first
/// (forward-slash and Windows backslash forms), then falls back to any
/// library with a matching `rel_path`. The latter handles two cases
/// `/image/metadata` already absorbs but the date-override path used to
/// 404 on:
/// 1. The metadata endpoint resolves `library_id` from the filesystem
/// (where the file currently lives), but the only `image_exif` row
/// sits under a different library_id (older mount, multi-library
/// duplicate). The mutation by `(library_id, rel_path)` would miss.
/// 2. A row imported from Windows still has backslash separators.
fn locate_image_exif_row(
connection: &mut SqliteConnection,
library_id_val: i32,
rel_path_val: &str,
) -> anyhow::Result<ImageExif> {
use schema::image_exif::dsl::*;
let normalized = rel_path_val.replace('\\', "/");
let windows_form = rel_path_val.replace('/', "\\");
let scoped = image_exif
.filter(library_id.eq(library_id_val))
.filter(rel_path.eq(&normalized).or(rel_path.eq(&windows_form)))
.first::<ImageExif>(connection);
match scoped {
Ok(row) => Ok(row),
Err(diesel::result::Error::NotFound) => {
// Fall back to any library with this rel_path. `image_exif`
// can carry the same path under multiple library_ids; the
// metadata endpoint already resolves loosely (see `get_exif`),
// and the date-override write must agree.
image_exif
.filter(rel_path.eq(&normalized).or(rel_path.eq(&windows_form)))
.first::<ImageExif>(connection)
.map_err(|e| match e {
diesel::result::Error::NotFound => anyhow::Error::msg("row_not_found"),
other => anyhow::Error::from(other).context("lookup_failed"),
})
}
Err(other) => Err(anyhow::Error::from(other).context("lookup_failed")),
}
}
/// Translates an error produced by `locate_image_exif_row` (or by a
/// mutation closure built on it) into a `DbErrorKind`.
///
/// If any link of the error chain renders exactly as `row_not_found` —
/// the sentinel `locate_image_exif_row` emits — the result is `NotFound`,
/// which lets the handler answer 404. Every other error is treated as a
/// genuine database failure and mapped to `UpdateError` (→ 500).
fn classify_image_exif_error(e: &anyhow::Error) -> DbErrorKind {
    let row_is_missing = e
        .chain()
        .map(|cause| cause.to_string())
        .any(|rendered| rendered == "row_not_found");

    match row_is_missing {
        true => DbErrorKind::NotFound,
        false => DbErrorKind::UpdateError,
    }
}
/// [`ExifDao`] implementation backed by a mutex-guarded SQLite
/// connection.
pub struct SqliteExifDao {
    /// Connection shared behind `Arc<Mutex<_>>`; locked once per call.
    connection: Arc<Mutex<SqliteConnection>>,
}

impl SqliteExifDao {
    /// Builds a DAO on top of a fresh connection obtained from [`connect`].
    ///
    /// # Panics
    /// Panics whenever [`connect`] does.
    pub fn new() -> Self {
        let shared = Arc::new(Mutex::new(connect()));
        Self { connection: shared }
    }

    /// Test-only constructor that takes ownership of an existing
    /// connection and wraps it.
    #[cfg(test)]
    pub fn from_connection(conn: SqliteConnection) -> Self {
        Self {
            connection: Arc::new(Mutex::new(conn)),
        }
    }

    /// Test-only constructor that reuses a connection which is already
    /// wrapped. Needed when a second DAO (e.g. `SqliteFaceDao`) has to
    /// read rows written through this one, so that cross-table joins
    /// resolve against the same in-memory SQLite instance.
    #[cfg(test)]
    pub fn from_shared(connection: Arc<Mutex<SqliteConnection>>) -> Self {
        Self { connection }
    }
}

impl Default for SqliteExifDao {
    /// Equivalent to [`SqliteExifDao::new`].
    fn default() -> Self {
        SqliteExifDao::new()
    }
}
impl ExifDao for SqliteExifDao {
fn store_exif(
&mut self,
context: &opentelemetry::Context,
exif_data: InsertImageExif,
) -> Result<ImageExif, DbError> {
trace_db_call(context, "insert", "store_exif", |_span| {
use schema::image_exif::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
diesel::insert_into(image_exif)
.values(&exif_data)
.execute(connection.deref_mut())
.map_err(|e| {
log::warn!(
"image_exif insert failed (lib={}, rel_path={:?}): {}",
exif_data.library_id,
exif_data.file_path,
e
);
anyhow::anyhow!("Insert error: {}", e)
})?;
image_exif
.filter(library_id.eq(exif_data.library_id))
.filter(rel_path.eq(&exif_data.file_path))
.first::<ImageExif>(connection.deref_mut())
.map_err(|e| anyhow::anyhow!("Post-insert lookup failed: {}", e))
})
.map_err(|_| DbError::new(DbErrorKind::InsertError))
}
fn get_exif(
&mut self,
context: &opentelemetry::Context,
path: &str,
) -> Result<Option<ImageExif>, DbError> {
trace_db_call(context, "query", "get_exif", |_span| {
use schema::image_exif::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
// Try both normalized (forward slash) and Windows (backslash) paths
// since database may contain either format
let normalized = path.replace('\\', "/");
let windows_path = path.replace('/', "\\");
match image_exif
.filter(rel_path.eq(&normalized).or(rel_path.eq(&windows_path)))
.first::<ImageExif>(connection.deref_mut())
{
Ok(exif) => Ok(Some(exif)),
Err(diesel::result::Error::NotFound) => Ok(None),
Err(_) => Err(anyhow::anyhow!("Query error")),
}
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
/// Overwrites the EXIF columns of the row identified by
/// `(exif_data.library_id, exif_data.file_path)` and returns the row as
/// re-read after the update.
///
/// If no row matches, the update itself succeeds but the re-read fails.
/// Either failure is reported as `DbErrorKind::UpdateError`.
fn update_exif(
    &mut self,
    context: &opentelemetry::Context,
    exif_data: InsertImageExif,
) -> Result<ImageExif, DbError> {
    let traced = trace_db_call(context, "update", "update_exif", |_span| {
        use schema::image_exif::dsl::*;

        let mut guard = self.connection.lock().expect("Unable to get ExifDao");

        let target = image_exif
            .filter(library_id.eq(exif_data.library_id))
            .filter(rel_path.eq(&exif_data.file_path));
        let new_values = (
            camera_make.eq(&exif_data.camera_make),
            camera_model.eq(&exif_data.camera_model),
            lens_model.eq(&exif_data.lens_model),
            width.eq(&exif_data.width),
            height.eq(&exif_data.height),
            orientation.eq(&exif_data.orientation),
            gps_latitude.eq(&exif_data.gps_latitude),
            gps_longitude.eq(&exif_data.gps_longitude),
            gps_altitude.eq(&exif_data.gps_altitude),
            focal_length.eq(&exif_data.focal_length),
            aperture.eq(&exif_data.aperture),
            shutter_speed.eq(&exif_data.shutter_speed),
            iso.eq(&exif_data.iso),
            date_taken.eq(&exif_data.date_taken),
            date_taken_source.eq(&exif_data.date_taken_source),
            last_modified.eq(&exif_data.last_modified),
        );
        diesel::update(target)
            .set(new_values)
            .execute(&mut *guard)
            .map_err(|_| anyhow::anyhow!("Update error"))?;

        // Hand back the row as it now stands in the table.
        image_exif
            .filter(library_id.eq(exif_data.library_id))
            .filter(rel_path.eq(&exif_data.file_path))
            .first::<ImageExif>(&mut *guard)
            .map_err(|_| anyhow::anyhow!("Query error"))
    });
    traced.map_err(|_| DbError::new(DbErrorKind::UpdateError))
}
fn delete_exif(&mut self, context: &opentelemetry::Context, path: &str) -> Result<(), DbError> {
trace_db_call(context, "delete", "delete_exif", |_span| {
use schema::image_exif::dsl::*;
diesel::delete(image_exif.filter(rel_path.eq(path)))
.execute(self.connection.lock().unwrap().deref_mut())
.map(|_| ())
.map_err(|_| anyhow::anyhow!("Delete error"))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn get_all_with_date_taken(
&mut self,
context: &opentelemetry::Context,
lib_id: Option<i32>,
) -> Result<Vec<(String, i64)>, DbError> {
trace_db_call(context, "query", "get_all_with_date_taken", |_span| {
use schema::image_exif::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
let query = image_exif
.select((rel_path, date_taken))
.filter(date_taken.is_not_null())
.into_boxed();
let query = match lib_id {
Some(filter_id) => query.filter(library_id.eq(filter_id)),
None => query,
};
query
.load::<(String, Option<i64>)>(connection.deref_mut())
.map(|records| {
records
.into_iter()
.filter_map(|(path, dt)| dt.map(|ts| (path, ts)))
.collect()
})
.map_err(|_| anyhow::anyhow!("Query error"))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn get_exif_batch(
&mut self,
context: &opentelemetry::Context,
library_id_filter: Option<i32>,
file_paths: &[String],
) -> Result<Vec<ImageExif>, DbError> {
trace_db_call(context, "query", "get_exif_batch", |_span| {
use schema::image_exif::dsl::*;
if file_paths.is_empty() {
return Ok(Vec::new());
}
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
let mut query = image_exif.into_boxed();
if let Some(lib_id) = library_id_filter {
query = query.filter(library_id.eq(lib_id));
}
query
.filter(rel_path.eq_any(file_paths))
.load::<ImageExif>(connection.deref_mut())
.map_err(|_| anyhow::anyhow!("Query error"))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
/// Searches `image_exif` with any combination of optional filters; every
/// supplied filter is ANDed onto the query.
///
/// * `library_id_filter` — exact library match.
/// * `camera_make_filter` / `camera_model_filter` / `lens_model_filter` —
///   `LIKE '%text%'` substring match. The text is interpolated as-is, so
///   `%` and `_` inside it act as wildcards.
/// * `gps_bounds` — `(min_lat, max_lat, min_lon, max_lon)`; values are
///   narrowed to `f32` before comparison and rows without coordinates are
///   excluded.
/// * `date_from` / `date_to` — inclusive bounds on `date_taken`; rows
///   without a date are excluded when either bound is given.
///
/// Failures are reported as `DbErrorKind::QueryError`.
fn query_by_exif(
    &mut self,
    context: &opentelemetry::Context,
    library_id_filter: Option<i32>,
    camera_make_filter: Option<&str>,
    camera_model_filter: Option<&str>,
    lens_model_filter: Option<&str>,
    gps_bounds: Option<(f64, f64, f64, f64)>,
    date_from: Option<i64>,
    date_to: Option<i64>,
) -> Result<Vec<ImageExif>, DbError> {
    trace_db_call(context, "query", "query_by_exif", |_span| {
        use schema::image_exif::dsl::*;

        // Builds the `%text%` pattern shared by the substring filters.
        let contains = |needle: &str| format!("%{}%", needle);

        let mut guard = self.connection.lock().expect("Unable to get ExifDao");
        let mut stmt = image_exif.into_boxed();

        // Library scope goes first so the `(library_id, ...)` indexes
        // are eligible.
        if let Some(lib_id) = library_id_filter {
            stmt = stmt.filter(library_id.eq(lib_id));
        }

        // Camera / lens substring filters.
        if let Some(make) = camera_make_filter {
            stmt = stmt.filter(camera_make.like(contains(make)));
        }
        if let Some(model) = camera_model_filter {
            stmt = stmt.filter(camera_model.like(contains(model)));
        }
        if let Some(lens) = lens_model_filter {
            stmt = stmt.filter(lens_model.like(contains(lens)));
        }

        // GPS bounding box.
        if let Some((min_lat, max_lat, min_lon, max_lon)) = gps_bounds {
            let lat_range = gps_latitude.between(min_lat as f32, max_lat as f32);
            let lon_range = gps_longitude.between(min_lon as f32, max_lon as f32);
            stmt = stmt
                .filter(lat_range)
                .filter(lon_range)
                .filter(gps_latitude.is_not_null())
                .filter(gps_longitude.is_not_null());
        }

        // Date range (inclusive on both ends).
        if let Some(from) = date_from {
            stmt = stmt.filter(date_taken.ge(from));
        }
        if let Some(to) = date_to {
            stmt = stmt.filter(date_taken.le(to));
        }
        if date_from.or(date_to).is_some() {
            stmt = stmt.filter(date_taken.is_not_null());
        }

        stmt.load::<ImageExif>(&mut *guard)
            .map_err(|_| anyhow::anyhow!("Query error"))
    })
    .map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn get_camera_makes(
&mut self,
context: &opentelemetry::Context,
) -> Result<Vec<(String, i64)>, DbError> {
trace_db_call(context, "query", "get_camera_makes", |_span| {
use diesel::dsl::count;
use schema::image_exif::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
image_exif
.filter(camera_make.is_not_null())
.group_by(camera_make)
.select((camera_make, count(id)))
.order(count(id).desc())
.load::<(Option<String>, i64)>(connection.deref_mut())
.map(|records| {
records
.into_iter()
.filter_map(|(make, cnt)| make.map(|m| (m, cnt)))
.collect()
})
.map_err(|_| anyhow::anyhow!("Query error"))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn update_file_path(
&mut self,
context: &opentelemetry::Context,
old_path: &str,
new_path: &str,
) -> Result<(), DbError> {
trace_db_call(context, "update", "update_file_path", |_span| {
use schema::image_exif::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
diesel::update(image_exif.filter(rel_path.eq(old_path)))
.set(rel_path.eq(new_path))
.execute(connection.deref_mut())
.map_err(|_| anyhow::anyhow!("Update error"))?;
Ok(())
})
.map_err(|_| DbError::new(DbErrorKind::UpdateError))
}
/// Returns the `rel_path` of every row in `image_exif`, across all
/// libraries and in no particular order.
///
/// Failures are reported as `DbErrorKind::QueryError`.
fn get_all_file_paths(
    &mut self,
    context: &opentelemetry::Context,
) -> Result<Vec<String>, DbError> {
    trace_db_call(context, "query", "get_all_file_paths", |_span| {
        use schema::image_exif::dsl::*;

        let mut guard = self.connection.lock().expect("Unable to get ExifDao");
        let all_paths = image_exif.select(rel_path);
        all_paths
            .load::<String>(&mut *guard)
            .map_err(|_| anyhow::anyhow!("Query error"))
    })
    .map_err(|_| DbError::new(DbErrorKind::QueryError))
}
/// Returns `(rel_path, latitude, longitude, date_taken)` for every row
/// that has both GPS coordinates, optionally limited to paths that start
/// with `base_path`.
///
/// * `base_path` — an empty string or `"/"` disables the path filter.
///   Otherwise rows whose `rel_path` starts with this text are returned.
///   The text is matched literally: `%`, `_` and `\` in it are escaped so
///   they are not treated as LIKE wildcards.
/// * `recursive` — recorded on the trace span only.
///   NOTE(review): the flag does not influence the query here; confirm
///   whether a non-recursive mode is expected by callers.
///
/// Coordinates are widened from `f32` to `f64` for the caller. Failures
/// are reported as `DbErrorKind::QueryError`.
fn get_all_with_gps(
    &mut self,
    context: &opentelemetry::Context,
    base_path: &str,
    recursive: bool,
) -> Result<Vec<(String, f64, f64, Option<i64>)>, DbError> {
    trace_db_call(context, "query", "get_all_with_gps", |span| {
        use opentelemetry::KeyValue;
        use opentelemetry::trace::Span;
        use schema::image_exif::dsl::*;

        span.set_attributes(vec![
            KeyValue::new("base_path", base_path.to_string()),
            KeyValue::new("recursive", recursive.to_string()),
        ]);

        let mut guard = self.connection.lock().unwrap();

        // Only rows with both coordinates present.
        let mut query = image_exif
            .filter(gps_latitude.is_not_null().and(gps_longitude.is_not_null()))
            .into_boxed();

        // Path filtering: empty or "/" means "everything"; anything else
        // is a literal prefix match.
        if !base_path.is_empty() && base_path != "/" {
            // Escape the escape character first, then the two LIKE
            // wildcards, so directory names such as `my_photos` or
            // `100%` only match themselves.
            let escaped = base_path
                .replace('\\', "\\\\")
                .replace('%', "\\%")
                .replace('_', "\\_");
            query = query.filter(rel_path.like(format!("{}%", escaped)).escape('\\'));
            span.set_attribute(KeyValue::new("path_filter_applied", true));
        } else {
            span.set_attribute(KeyValue::new("path_filter_applied", false));
            span.set_attribute(KeyValue::new("returning_all_gps_photos", true));
        }

        let results: Vec<ImageExif> = query
            .load::<ImageExif>(&mut *guard)
            .map_err(|e| anyhow::anyhow!("GPS query error: {}", e))?;

        // Convert to the tuple shape. The `?`s are a belt-and-braces
        // guard: the SQL filter already excludes rows without GPS.
        let filtered: Vec<(String, f64, f64, Option<i64>)> = results
            .into_iter()
            .filter_map(|exif| {
                let lat_val = exif.gps_latitude?;
                let lon_val = exif.gps_longitude?;
                Some((
                    exif.file_path,
                    lat_val as f64,
                    lon_val as f64,
                    exif.date_taken,
                ))
            })
            .collect();

        span.set_attribute(KeyValue::new("result_count", filtered.len() as i64));
        Ok(filtered)
    })
    .map_err(|_| DbError::new(DbErrorKind::QueryError))
}
/// Returns up to `limit` `(library_id, rel_path)` pairs for rows whose
/// `content_hash` is still NULL, oldest row id first.
///
/// Failures are reported as `DbErrorKind::QueryError`.
fn get_rows_missing_hash(
    &mut self,
    context: &opentelemetry::Context,
    limit: i64,
) -> Result<Vec<(i32, String)>, DbError> {
    trace_db_call(context, "query", "get_rows_missing_hash", |_span| {
        use schema::image_exif::dsl::*;

        let mut guard = self.connection.lock().expect("Unable to get ExifDao");
        let unhashed = image_exif
            .filter(content_hash.is_null())
            .select((library_id, rel_path))
            .order(id.asc())
            .limit(limit);
        unhashed
            .load::<(i32, String)>(&mut *guard)
            .map_err(|_| anyhow::anyhow!("Query error"))
    })
    .map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn backfill_content_hash(
&mut self,
context: &opentelemetry::Context,
library_id_val: i32,
rel_path_val: &str,
hash: &str,
size_val: i64,
) -> Result<(), DbError> {
trace_db_call(context, "update", "backfill_content_hash", |_span| {
use schema::image_exif::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
diesel::update(
image_exif
.filter(library_id.eq(library_id_val))
.filter(rel_path.eq(rel_path_val)),
)
.set((content_hash.eq(hash), size_bytes.eq(size_val)))
.execute(connection.deref_mut())
.map(|_| ())
.map_err(|_| anyhow::anyhow!("Update error"))
})
.map_err(|_| DbError::new(DbErrorKind::UpdateError))
}
fn list_distinct_content_hashes(
&mut self,
context: &opentelemetry::Context,
) -> Result<Vec<String>, DbError> {
trace_db_call(context, "query", "list_distinct_content_hashes", |_span| {
use schema::image_exif::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
image_exif
.filter(content_hash.is_not_null())
.select(content_hash)
.distinct()
.load::<Option<String>>(connection.deref_mut())
.map(|rows| rows.into_iter().flatten().collect())
.map_err(|_| anyhow::anyhow!("Query error"))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
/// Returns `(rel_path, content_hash)` for every row in library `lib_id`.
/// The hash is `None` for rows that have not been hashed yet.
///
/// Failures are reported as `DbErrorKind::QueryError`.
fn list_paths_and_hashes_for_library(
    &mut self,
    context: &opentelemetry::Context,
    lib_id: i32,
) -> Result<Vec<(String, Option<String>)>, DbError> {
    let traced = trace_db_call(
        context,
        "query",
        "list_paths_and_hashes_for_library",
        |_span| {
            use schema::image_exif::dsl::*;

            let mut guard = self.connection.lock().expect("Unable to get ExifDao");
            let in_library = image_exif
                .filter(library_id.eq(lib_id))
                .select((rel_path, content_hash));
            in_library
                .load::<(String, Option<String>)>(&mut *guard)
                .map_err(|_| anyhow::anyhow!("Query error"))
        },
    );
    traced.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
/// Returns up to `limit` `(library_id, rel_path)` pairs from library
/// `library_id_val` whose `date_taken` is NULL, oldest row id first.
///
/// Failures are reported as `DbErrorKind::QueryError`.
fn get_rows_needing_date_backfill(
    &mut self,
    context: &opentelemetry::Context,
    library_id_val: i32,
    limit: i64,
) -> Result<Vec<(i32, String)>, DbError> {
    let traced = trace_db_call(
        context,
        "query",
        "get_rows_needing_date_backfill",
        |_span| {
            use schema::image_exif::dsl::*;

            let mut guard = self.connection.lock().expect("Unable to get ExifDao");

            // Filter + ordering are shaped to match the partial index on
            // `(library_id, id) WHERE date_taken IS NULL`, so the
            // planner can hit it directly.
            let undated = image_exif
                .filter(library_id.eq(library_id_val))
                .filter(date_taken.is_null())
                .select((library_id, rel_path))
                .order(id.asc())
                .limit(limit);
            undated
                .load::<(i32, String)>(&mut *guard)
                .map_err(|_| anyhow::anyhow!("Query error"))
        },
    );
    traced.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
/// Writes `date_taken_val` and `source` into `date_taken` /
/// `date_taken_source` for the row `(library_id_val, rel_path_val)`.
///
/// A zero-row update is logged at `debug` level and still returns
/// `Ok(())`. A database failure is logged at `warn` level with the
/// underlying diesel error, then reported as `DbErrorKind::UpdateError`.
fn backfill_date_taken(
    &mut self,
    context: &opentelemetry::Context,
    library_id_val: i32,
    rel_path_val: &str,
    date_taken_val: i64,
    source: &str,
) -> Result<(), DbError> {
    let traced = trace_db_call(context, "update", "backfill_date_taken", |_span| {
        use schema::image_exif::dsl::*;

        let mut guard = self.connection.lock().expect("Unable to get ExifDao");
        let target = image_exif
            .filter(library_id.eq(library_id_val))
            .filter(rel_path.eq(rel_path_val));

        diesel::update(target)
            .set((date_taken.eq(date_taken_val), date_taken_source.eq(source)))
            .execute(&mut *guard)
            .map(|rows| {
                // Zero matched rows means the row vanished between the
                // "needs backfill" query and this update — rare, but
                // possible when the file watcher races the drain. It is
                // not a hard error for the drain, so only note it.
                if rows == 0 {
                    log::debug!(
                        "backfill_date_taken: 0 rows matched lib={} {} (row likely retired by missing-file scan)",
                        library_id_val,
                        rel_path_val
                    );
                }
            })
            // Keep the diesel error in the message so the warning emitted
            // below can state the actual cause.
            .map_err(|e| {
                anyhow::anyhow!(
                    "diesel update failed (lib={}, rel_path={}, date_taken={}, source={}): {}",
                    library_id_val,
                    rel_path_val,
                    date_taken_val,
                    source,
                    e
                )
            })
    });

    match traced {
        Ok(()) => Ok(()),
        Err(e) => {
            // Log now: the `DbError` return type cannot carry the
            // anyhow message.
            log::warn!("backfill_date_taken: {}", e);
            Err(DbError::new(DbErrorKind::UpdateError))
        }
    }
}
/// Applies a manual `date_taken` override to the row located for
/// `(library_id_val, rel_path_val)` and returns the updated row.
///
/// The first override snapshots the existing `date_taken` /
/// `date_taken_source` into the `original_*` columns. Later overrides
/// leave that snapshot untouched, so a single revert restores the
/// pre-override value. The new source is always `"manual"`.
///
/// Errors are logged at `warn` level and classified through
/// `classify_image_exif_error`.
fn set_manual_date_taken(
    &mut self,
    context: &opentelemetry::Context,
    library_id_val: i32,
    rel_path_val: &str,
    date_taken_val: i64,
) -> Result<ImageExif, DbError> {
    let traced = trace_db_call(context, "update", "set_manual_date_taken", |_span| {
        use schema::image_exif::dsl::*;

        // The whole read-modify-write runs with the dao mutex held, so
        // the snapshot is consistent with the value being overwritten.
        let mut guard = self.connection.lock().expect("Unable to get ExifDao");
        let current = locate_image_exif_row(&mut *guard, library_id_val, rel_path_val)?;

        // Pick what goes into the `original_*` columns: the live values
        // on the first override, the already-stored snapshot afterwards.
        let (orig_dt, orig_src) = match current.original_date_taken {
            None => (current.date_taken, current.date_taken_source.clone()),
            Some(_) => (
                current.original_date_taken,
                current.original_date_taken_source.clone(),
            ),
        };

        // Address the row by primary key so the write lands on exactly
        // the row that was read, even when the lookup fell back across
        // libraries / slash forms.
        let row_id = current.id;
        let new_values = (
            date_taken.eq(Some(date_taken_val)),
            date_taken_source.eq(Some("manual".to_string())),
            original_date_taken.eq(orig_dt),
            original_date_taken_source.eq(orig_src),
        );
        diesel::update(image_exif.find(row_id))
            .set(new_values)
            .execute(&mut *guard)
            .map_err(|e| anyhow::Error::from(e).context("update_failed"))?;

        image_exif
            .find(row_id)
            .first::<ImageExif>(&mut *guard)
            .map_err(|e| anyhow::Error::from(e).context("reread_failed"))
    });

    traced.map_err(|e| {
        log::warn!(
            "set_manual_date_taken(library_id={}, rel_path={:?}): {:#}",
            library_id_val,
            rel_path_val,
            e,
        );
        DbError::new(classify_image_exif_error(&e))
    })
}
/// Reverts a manual `date_taken` override on the row located for
/// `(library_id_val, rel_path_val)` and returns the resulting row.
///
/// When no snapshot is stored (`original_date_taken` is NULL) the row is
/// returned unchanged, which makes the call idempotent. Otherwise the
/// snapshot is copied back into `date_taken` / `date_taken_source` and
/// both `original_*` columns are cleared.
///
/// Errors are logged at `warn` level and classified through
/// `classify_image_exif_error`.
fn clear_manual_date_taken(
    &mut self,
    context: &opentelemetry::Context,
    library_id_val: i32,
    rel_path_val: &str,
) -> Result<ImageExif, DbError> {
    let traced = trace_db_call(context, "update", "clear_manual_date_taken", |_span| {
        use schema::image_exif::dsl::*;

        let mut guard = self.connection.lock().expect("Unable to get ExifDao");
        let current = locate_image_exif_row(&mut *guard, library_id_val, rel_path_val)?;

        // Nothing to revert without a stored snapshot.
        let ts = match current.original_date_taken {
            Some(ts) => ts,
            None => return Ok(current),
        };

        let row_id = current.id;
        let restored = (
            date_taken.eq(Some(ts)),
            date_taken_source.eq(current.original_date_taken_source.clone()),
            original_date_taken.eq::<Option<i64>>(None),
            original_date_taken_source.eq::<Option<String>>(None),
        );
        diesel::update(image_exif.find(row_id))
            .set(restored)
            .execute(&mut *guard)
            .map_err(|e| anyhow::Error::from(e).context("update_failed"))?;

        image_exif
            .find(row_id)
            .first::<ImageExif>(&mut *guard)
            .map_err(|e| anyhow::Error::from(e).context("reread_failed"))
    });

    traced.map_err(|e| {
        log::warn!(
            "clear_manual_date_taken(library_id={}, rel_path={:?}): {:#}",
            library_id_val,
            rel_path_val,
            e,
        );
        DbError::new(classify_image_exif_error(&e))
    })
}
fn get_memories_in_window(
&mut self,
context: &opentelemetry::Context,
library_id: i32,
span_token: &str,
years_back: i32,
tz_offset_minutes: i32,
) -> Result<Vec<(String, i64, i64)>, DbError> {
trace_db_call(context, "query", "get_memories_in_window", |_span| {
// strftime pattern is span-dependent; the rest of the WHERE
// clause is shared. Only `%m-%d`, `%W`, `%m` are accepted —
// anything else is a programmer error.
let pattern = match span_token {
"day" => "%m-%d",
"week" => "%W",
"month" => "%m",
_ => return Err(anyhow::anyhow!("invalid span token: {}", span_token)),
};
// SQLite's date modifiers want a string like `'-480 minutes'`
// (signed) or `'-15 years'`. Use the `+` flag so positive
// offsets render as `+480 minutes`.
let tz_modifier = format!("{:+} minutes", tz_offset_minutes);
let years_modifier = format!("-{} years", years_back);
let sql = format!(
"SELECT rel_path, date_taken, last_modified \
FROM image_exif \
WHERE library_id = ?1 \
AND date_taken IS NOT NULL \
AND date_taken <= unixepoch('now') \
AND date_taken >= unixepoch('now', ?2) \
AND strftime('{p}', date_taken, 'unixepoch', ?3) \
= strftime('{p}', 'now', ?3)",
p = pattern,
);
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
diesel::sql_query(sql)
.bind::<diesel::sql_types::Integer, _>(library_id)
.bind::<diesel::sql_types::Text, _>(years_modifier)
.bind::<diesel::sql_types::Text, _>(tz_modifier)
.load::<MemoriesWindowRow>(connection.deref_mut())
.map(|rows| {
rows.into_iter()
.map(|r| (r.rel_path, r.date_taken, r.last_modified))
.collect()
})
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn find_by_content_hash(
&mut self,
context: &opentelemetry::Context,
hash: &str,
) -> Result<Option<ImageExif>, DbError> {
trace_db_call(context, "query", "find_by_content_hash", |_span| {
use schema::image_exif::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
image_exif
.filter(content_hash.eq(hash))
.first::<ImageExif>(connection.deref_mut())
.optional()
.map_err(|_| anyhow::anyhow!("Query error"))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn get_rel_paths_sharing_content(
&mut self,
context: &opentelemetry::Context,
library_id_val: i32,
rel_path_val: &str,
) -> Result<Vec<String>, DbError> {
trace_db_call(context, "query", "get_rel_paths_sharing_content", |_span| {
use schema::image_exif::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
// Look up this file's content_hash. Missing row or NULL hash
// means we can't expand the match set; return the given
// rel_path so callers fall through to direct-match behavior.
let hash: Option<String> = image_exif
.filter(library_id.eq(library_id_val))
.filter(rel_path.eq(rel_path_val))
.select(content_hash)
.first::<Option<String>>(connection.deref_mut())
.optional()
.map_err(|_| anyhow::anyhow!("Query error"))?
.flatten();
let paths = match hash {
Some(h) => image_exif
.filter(content_hash.eq(h))
.select(rel_path)
.distinct()
.load::<String>(connection.deref_mut())
.map_err(|_| anyhow::anyhow!("Query error"))?,
None => vec![rel_path_val.to_string()],
};
Ok(paths)
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
/// Returns the `rel_path` of every row in library `library_id_val`.
///
/// Failures are reported as `DbErrorKind::QueryError`.
fn get_rel_paths_for_library(
    &mut self,
    context: &opentelemetry::Context,
    library_id_val: i32,
) -> Result<Vec<String>, DbError> {
    trace_db_call(context, "query", "get_rel_paths_for_library", |_span| {
        use schema::image_exif::dsl::*;

        let mut guard = self.connection.lock().expect("Unable to get ExifDao");
        let in_library = image_exif
            .filter(library_id.eq(library_id_val))
            .select(rel_path);
        in_library
            .load::<String>(&mut *guard)
            .map_err(|_| anyhow::anyhow!("Query error"))
    })
    .map_err(|_| DbError::new(DbErrorKind::QueryError))
}
/// Returns the `content_hash` of the first hashed row whose `rel_path`
/// equals `rel_path_val`, in any library, or `Ok(None)` when no such
/// row exists.
///
/// Failures are reported as `DbErrorKind::QueryError`.
fn find_content_hash_anywhere(
    &mut self,
    context: &opentelemetry::Context,
    rel_path_val: &str,
) -> Result<Option<String>, DbError> {
    trace_db_call(context, "query", "find_content_hash_anywhere", |_span| {
        use schema::image_exif::dsl::*;

        let mut guard = self.connection.lock().expect("Unable to get ExifDao");
        let hashed_at_path = image_exif
            .filter(rel_path.eq(rel_path_val))
            .filter(content_hash.is_not_null())
            .select(content_hash);
        hashed_at_path
            .first::<Option<String>>(&mut *guard)
            .optional()
            // Outer Option: row found or not. Inner: the nullable column.
            .map(Option::flatten)
            .map_err(|_| anyhow::anyhow!("Query error"))
    })
    .map_err(|_| DbError::new(DbErrorKind::QueryError))
}
/// Returns the distinct `rel_path`s of all rows whose `content_hash`
/// equals `hash`.
///
/// Failures are reported as `DbErrorKind::QueryError`.
fn get_rel_paths_by_hash(
    &mut self,
    context: &opentelemetry::Context,
    hash: &str,
) -> Result<Vec<String>, DbError> {
    trace_db_call(context, "query", "get_rel_paths_by_hash", |_span| {
        use schema::image_exif::dsl::*;

        let mut guard = self.connection.lock().expect("Unable to get ExifDao");
        let same_bytes = image_exif
            .filter(content_hash.eq(hash))
            .select(rel_path)
            .distinct();
        same_bytes
            .load::<String>(&mut *guard)
            .map_err(|_| anyhow::anyhow!("Query error"))
    })
    .map_err(|_| DbError::new(DbErrorKind::QueryError))
}
/// Maps each hash in `hashes` that is present in `image_exif` to the
/// distinct `rel_path`s carrying it. Hashes with no matching row are
/// absent from the map.
///
/// An empty `hashes` returns an empty map without opening a trace span
/// or touching the database. Failures are reported as
/// `DbErrorKind::QueryError`.
fn get_rel_paths_for_hashes(
    &mut self,
    context: &opentelemetry::Context,
    hashes: &[String],
) -> Result<std::collections::HashMap<String, Vec<String>>, DbError> {
    use std::collections::HashMap;

    if hashes.is_empty() {
        return Ok(HashMap::new());
    }

    trace_db_call(context, "query", "get_rel_paths_for_hashes", |_span| {
        use schema::image_exif::dsl::*;

        // IN-list batch size, kept well below SQLite's
        // SQLITE_LIMIT_VARIABLE_NUMBER (32766 modern, 999 legacy).
        const CHUNK: usize = 500;

        let mut guard = self.connection.lock().expect("Unable to get ExifDao");
        let mut by_hash: HashMap<String, Vec<String>> = HashMap::new();

        for chunk in hashes.chunks(CHUNK) {
            let pairs = image_exif
                .filter(content_hash.eq_any(chunk))
                .select((content_hash.assume_not_null(), rel_path))
                .distinct()
                .load::<(String, String)>(&mut *guard)
                .map_err(|_| anyhow::anyhow!("Query error"))?;
            for (hash, path) in pairs {
                by_hash.entry(hash).or_default().push(path);
            }
        }
        Ok(by_hash)
    })
    .map_err(|_| DbError::new(DbErrorKind::QueryError))
}
/// Lists `(library_id, rel_path)` for rows in the given libraries.
///
/// * `library_ids` — libraries to include; an empty slice means all
///   libraries.
/// * `path_prefix` — optional directory prefix. It is trimmed, trailing
///   `/` is removed, and only paths under `<prefix>/` match, so `"2024"`
///   matches `2024/...` but not `2024-archive/...`. The prefix is
///   matched literally: `\`, `%` and `_` in it are escaped.
/// * `include_duplicates` — when `false`, rows soft-marked as duplicates
///   (`duplicate_of_hash` set) are hidden. In a library-scoped view a
///   marked row is only hidden if its survivor lives in one of the
///   listed libraries.
///
/// Failures are reported as `DbErrorKind::QueryError`.
fn list_rel_paths_for_libraries(
    &mut self,
    context: &opentelemetry::Context,
    library_ids: &[i32],
    path_prefix: Option<&str>,
    include_duplicates: bool,
) -> Result<Vec<(i32, String)>, DbError> {
    trace_db_call(context, "query", "list_rel_paths_for_libraries", |_span| {
        use schema::image_exif::dsl::*;
        let mut connection = self.connection.lock().expect("Unable to get ExifDao");
        let mut query = image_exif.select((library_id, rel_path)).into_boxed();
        if !library_ids.is_empty() {
            query = query.filter(library_id.eq_any(library_ids.to_vec()));
        }
        if let Some(prefix) = path_prefix.map(str::trim).filter(|s| !s.is_empty()) {
            // Trailing slash normalization so "2024" matches "2024/..."
            // without also matching "2024-archive/...".
            let prefix = prefix.trim_end_matches('/');
            // Escape the escape character itself first. Otherwise a
            // backslash in the prefix would be read as "take the next
            // character literally" and be dropped from the pattern, so
            // backslash-separated paths could never match.
            let escaped = prefix
                .replace('\\', "\\\\")
                .replace('%', "\\%")
                .replace('_', "\\_");
            let pattern = format!("{}/%", escaped);
            query = query.filter(rel_path.like(pattern).escape('\\'));
        }
        if !include_duplicates {
            if library_ids.is_empty() {
                // Unscoped (all-libraries) view — every survivor is
                // reachable somewhere, so a soft-marked row is
                // genuinely a duplicate from the user's perspective.
                // Hide it.
                query = query.filter(duplicate_of_hash.is_null());
            } else {
                // Scoped to specific libraries: only hide a
                // soft-marked row when the survivor is reachable
                // *in this view*. If the survivor lives in a
                // library the user can't see right now, the
                // demoted file is the only copy of those bytes
                // they have access to — keep it visible.
                //
                // Implemented as a correlated NOT EXISTS subquery
                // over an aliased image_exif. Library ids are i32
                // so format!-inlining the integer list is safe.
                use diesel::sql_types::Bool;
                let lib_list = library_ids
                    .iter()
                    .map(i32::to_string)
                    .collect::<Vec<_>>()
                    .join(",");
                let raw = format!(
                    "(image_exif.duplicate_of_hash IS NULL OR NOT EXISTS \
                     (SELECT 1 FROM image_exif AS survivor \
                     WHERE survivor.content_hash = image_exif.duplicate_of_hash \
                     AND survivor.library_id IN ({})))",
                    lib_list
                );
                query = query.filter(diesel::dsl::sql::<Bool>(&raw));
            }
        }
        query
            .load::<(i32, String)>(connection.deref_mut())
            .map_err(|_| anyhow::anyhow!("Query error"))
    })
    .map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn delete_exif_by_library(
&mut self,
context: &opentelemetry::Context,
library_id_val: i32,
rel_path_val: &str,
) -> Result<(), DbError> {
trace_db_call(context, "delete", "delete_exif_by_library", |_span| {
use schema::image_exif::dsl::*;
diesel::delete(
image_exif
.filter(library_id.eq(library_id_val))
.filter(rel_path.eq(rel_path_val)),
)
.execute(self.connection.lock().unwrap().deref_mut())
.map(|_| ())
.map_err(|_| anyhow::anyhow!("Delete error"))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
/// Returns the number of `image_exif` rows in library `library_id_val`.
///
/// Failures are reported as `DbErrorKind::QueryError`.
fn count_for_library(
    &mut self,
    context: &opentelemetry::Context,
    library_id_val: i32,
) -> Result<i64, DbError> {
    trace_db_call(context, "query", "count_for_library", |_span| {
        use schema::image_exif::dsl::*;

        let mut guard = self.connection.lock().unwrap();
        let tally = image_exif.filter(library_id.eq(library_id_val)).count();
        tally
            .get_result::<i64>(&mut *guard)
            .map_err(|_| anyhow::anyhow!("Count error"))
    })
    .map_err(|_| DbError::new(DbErrorKind::QueryError))
}
/// Returns one page of rows from library `library_id_val`, ordered by
/// ascending row id, skipping `offset` rows and returning at most
/// `limit`.
///
/// Each tuple is `(id, rel_path)`: the first element is the row's
/// primary key, not the library id.
/// NOTE(review): sibling listings return `(library_id, rel_path)`;
/// confirm callers expect the row id here.
///
/// Failures are reported as `DbErrorKind::QueryError`.
fn list_rel_paths_for_library_page(
    &mut self,
    context: &opentelemetry::Context,
    library_id_val: i32,
    limit: i64,
    offset: i64,
) -> Result<Vec<(i32, String)>, DbError> {
    let traced = trace_db_call(
        context,
        "query",
        "list_rel_paths_for_library_page",
        |_span| {
            use schema::image_exif::dsl::*;

            let mut guard = self.connection.lock().unwrap();
            let page = image_exif
                .filter(library_id.eq(library_id_val))
                .order(id.asc())
                .select((id, rel_path))
                .limit(limit)
                .offset(offset);
            page.load::<(i32, String)>(&mut *guard)
                .map_err(|_| anyhow::anyhow!("Query error"))
        },
    );
    traced.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
/// Returns up to `limit` `(library_id, rel_path)` pairs for rows that
/// already have a `content_hash` but no `phash_64`, and whose path ends
/// in a listed image extension. Oldest row id first.
///
/// Failures are reported as `DbErrorKind::QueryError`.
fn get_rows_missing_perceptual_hash(
    &mut self,
    context: &opentelemetry::Context,
    limit: i64,
) -> Result<Vec<(i32, String)>, DbError> {
    let traced = trace_db_call(
        context,
        "query",
        "get_rows_missing_perceptual_hash",
        |_span| {
            use schema::image_exif::dsl::*;

            let mut guard = self.connection.lock().expect("Unable to get ExifDao");

            // Image-only filter by file extension. Without it, videos
            // and files that fail to decode would stay NULL forever and
            // be retried on every run. The list mirrors what `image`
            // 0.25 / `image_hasher` 3.x can decode.
            // NOTE(review): SQLite's LIKE ignores ASCII case by
            // default, so the upper-case variants may be redundant
            // unless `case_sensitive_like` is enabled — confirm.
            let decodable_ext = rel_path
                .like("%.jpg")
                .or(rel_path.like("%.jpeg"))
                .or(rel_path.like("%.JPG"))
                .or(rel_path.like("%.JPEG"))
                .or(rel_path.like("%.png"))
                .or(rel_path.like("%.PNG"))
                .or(rel_path.like("%.webp"))
                .or(rel_path.like("%.WEBP"))
                .or(rel_path.like("%.tif"))
                .or(rel_path.like("%.tiff"))
                .or(rel_path.like("%.TIF"))
                .or(rel_path.like("%.TIFF"))
                .or(rel_path.like("%.avif"))
                .or(rel_path.like("%.AVIF"));

            image_exif
                .filter(content_hash.is_not_null())
                .filter(phash_64.is_null())
                .filter(decodable_ext)
                .select((library_id, rel_path))
                .order(id.asc())
                .limit(limit)
                .load::<(i32, String)>(&mut *guard)
                .map_err(|_| anyhow::anyhow!("Query error"))
        },
    );
    traced.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn backfill_perceptual_hash(
&mut self,
context: &opentelemetry::Context,
library_id_val: i32,
rel_path_val: &str,
phash_val: Option<i64>,
dhash_val: Option<i64>,
) -> Result<(), DbError> {
trace_db_call(context, "update", "backfill_perceptual_hash", |_span| {
use schema::image_exif::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
diesel::update(
image_exif
.filter(library_id.eq(library_id_val))
.filter(rel_path.eq(rel_path_val)),
)
.set((phash_64.eq(phash_val), dhash_64.eq(dhash_val)))
.execute(connection.deref_mut())
.map(|_| ())
.map_err(|_| anyhow::anyhow!("Update error"))
})
.map_err(|_| DbError::new(DbErrorKind::UpdateError))
}
/// Lists every row that belongs to an exact-duplicate group, i.e. whose
/// `content_hash` occurs on more than one row.
///
/// * `library_id_filter` — when set, both the "occurs more than once"
///   count and the returned rows are restricted to that library.
/// * `include_resolved` — when `false`, rows that already have
///   `duplicate_of_hash` set are left out of the result.
///
/// Rows come back ordered by hash, then library, then path, so callers
/// can group them as a stream. Failures are reported as
/// `DbErrorKind::QueryError`.
fn list_duplicates_exact(
    &mut self,
    context: &opentelemetry::Context,
    library_id_filter: Option<i32>,
    include_resolved: bool,
) -> Result<Vec<DuplicateRow>, DbError> {
    trace_db_call(context, "query", "list_duplicates_exact", |_span| {
        use schema::image_exif::dsl::*;

        let mut guard = self.connection.lock().expect("Unable to get ExifDao");

        // Pass 1: which hashes appear on more than one row?
        let mut repeated = image_exif
            .filter(content_hash.is_not_null())
            .group_by(content_hash)
            .select(content_hash.assume_not_null())
            .having(diesel::dsl::count_star().gt(1))
            .into_boxed();
        if let Some(lib) = library_id_filter {
            repeated = repeated.filter(library_id.eq(lib));
        }
        let dup_hashes: Vec<String> = repeated
            .load::<String>(&mut *guard)
            .map_err(|_| anyhow::anyhow!("Query error"))?;
        if dup_hashes.is_empty() {
            return Ok(Vec::new());
        }

        // Pass 2: load every member row of those hashes.
        // NOTE(review): all hashes are bound in one IN list here; a very
        // large duplicate set could approach SQLite's variable limit.
        let mut q = image_exif
            .filter(content_hash.eq_any(&dup_hashes))
            .select((
                library_id,
                rel_path,
                content_hash.assume_not_null(),
                size_bytes,
                date_taken,
                width,
                height,
                phash_64,
                dhash_64,
                duplicate_of_hash,
                duplicate_decided_at,
            ))
            .order((content_hash.asc(), library_id.asc(), rel_path.asc()))
            .into_boxed();
        if let Some(lib) = library_id_filter {
            q = q.filter(library_id.eq(lib));
        }
        if !include_resolved {
            q = q.filter(duplicate_of_hash.is_null());
        }

        let rows = q
            .load::<(
                i32,
                String,
                String,
                Option<i64>,
                Option<i64>,
                Option<i32>,
                Option<i32>,
                Option<i64>,
                Option<i64>,
                Option<String>,
                Option<i64>,
            )>(&mut *guard)
            .map_err(|_| anyhow::anyhow!("Query error"))?;

        // Tuple positions follow the `select` column order above.
        let members: Vec<DuplicateRow> = rows
            .into_iter()
            .map(
                |(lib, path, hash, bytes, taken_at, px_w, px_h, p64, d64, dup_of, decided_at)| {
                    DuplicateRow {
                        library_id: lib,
                        rel_path: path,
                        content_hash: hash,
                        size_bytes: bytes,
                        date_taken: taken_at,
                        width: px_w,
                        height: px_h,
                        phash_64: p64,
                        dhash_64: d64,
                        duplicate_of_hash: dup_of,
                        duplicate_decided_at: decided_at,
                    }
                },
            )
            .collect();
        Ok(members)
    })
    .map_err(|_| DbError::new(DbErrorKind::QueryError))
}
/// Lists one row per distinct `content_hash` among rows that have both
/// a content hash and a `phash_64`, as input for perceptual clustering.
///
/// * `library_id_filter` — optional library restriction.
/// * `include_resolved` — when `false`, rows with `duplicate_of_hash`
///   set are skipped.
///
/// For each hash the kept row is the first in SQL order: lowest
/// `library_id`, then lexicographically smallest `rel_path`. Failures
/// are reported as `DbErrorKind::QueryError`.
fn list_perceptual_candidates(
    &mut self,
    context: &opentelemetry::Context,
    library_id_filter: Option<i32>,
    include_resolved: bool,
) -> Result<Vec<DuplicateRow>, DbError> {
    trace_db_call(context, "query", "list_perceptual_candidates", |_span| {
        use schema::image_exif::dsl::*;

        let mut guard = self.connection.lock().expect("Unable to get ExifDao");

        // One canonical row per content_hash is wanted: exact dups are
        // clustered by the exact-dup query and would only add
        // zero-distance edges to the perceptual graph. Diesel has no
        // clean `DISTINCT ON`, so all rows are loaded and de-duplicated
        // client-side. Only rows with a phash are loaded, and the cost
        // is small next to the BK-tree clustering that follows.
        let mut q = image_exif
            .filter(content_hash.is_not_null())
            .filter(phash_64.is_not_null())
            .select((
                library_id,
                rel_path,
                content_hash.assume_not_null(),
                size_bytes,
                date_taken,
                width,
                height,
                phash_64,
                dhash_64,
                duplicate_of_hash,
                duplicate_decided_at,
            ))
            .order((content_hash.asc(), library_id.asc(), rel_path.asc()))
            .into_boxed();
        if let Some(lib) = library_id_filter {
            q = q.filter(library_id.eq(lib));
        }
        if !include_resolved {
            q = q.filter(duplicate_of_hash.is_null());
        }

        let rows = q
            .load::<(
                i32,
                String,
                String,
                Option<i64>,
                Option<i64>,
                Option<i32>,
                Option<i32>,
                Option<i64>,
                Option<i64>,
                Option<String>,
                Option<i64>,
            )>(&mut *guard)
            .map_err(|_| anyhow::anyhow!("Query error"))?;

        // Keep the first row seen for each hash; `insert` returns false
        // for a hash that is already in the set.
        let mut seen = std::collections::HashSet::new();
        let out: Vec<DuplicateRow> = rows
            .into_iter()
            .filter(|r| seen.insert(r.2.clone()))
            .map(|r| DuplicateRow {
                library_id: r.0,
                rel_path: r.1,
                content_hash: r.2,
                size_bytes: r.3,
                date_taken: r.4,
                width: r.5,
                height: r.6,
                phash_64: r.7,
                dhash_64: r.8,
                duplicate_of_hash: r.9,
                duplicate_decided_at: r.10,
            })
            .collect();
        Ok(out)
    })
    .map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn list_image_paths(
&mut self,
context: &opentelemetry::Context,
library_id_filter: Option<i32>,
include_resolved: bool,
) -> Result<Vec<(i32, String)>, DbError> {
trace_db_call(context, "query", "list_image_paths", |_span| {
use schema::image_exif::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
let mut q = image_exif
.filter(content_hash.is_not_null())
.select((library_id, rel_path))
.into_boxed();
if let Some(lib) = library_id_filter {
q = q.filter(library_id.eq(lib));
}
if !include_resolved {
q = q.filter(duplicate_of_hash.is_null());
}
q.load::<(i32, String)>(connection.deref_mut())
.map_err(|_| anyhow::anyhow!("Query error"))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn lookup_duplicate_row(
&mut self,
context: &opentelemetry::Context,
library_id_val: i32,
rel_path_val: &str,
) -> Result<Option<DuplicateRow>, DbError> {
trace_db_call(context, "query", "lookup_duplicate_row", |_span| {
use schema::image_exif::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
image_exif
.filter(library_id.eq(library_id_val))
.filter(rel_path.eq(rel_path_val))
.filter(content_hash.is_not_null())
.select((
library_id,
rel_path,
content_hash.assume_not_null(),
size_bytes,
date_taken,
width,
height,
phash_64,
dhash_64,
duplicate_of_hash,
duplicate_decided_at,
))
.first::<(
i32,
String,
String,
Option<i64>,
Option<i64>,
Option<i32>,
Option<i32>,
Option<i64>,
Option<i64>,
Option<String>,
Option<i64>,
)>(connection.deref_mut())
.optional()
.map(|opt| {
opt.map(|r| DuplicateRow {
library_id: r.0,
rel_path: r.1,
content_hash: r.2,
size_bytes: r.3,
date_taken: r.4,
width: r.5,
height: r.6,
phash_64: r.7,
dhash_64: r.8,
duplicate_of_hash: r.9,
duplicate_decided_at: r.10,
})
})
.map_err(|_| anyhow::anyhow!("Query error"))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn set_duplicate_of(
&mut self,
context: &opentelemetry::Context,
library_id_val: i32,
rel_path_val: &str,
survivor_hash: &str,
decided_at: i64,
) -> Result<(), DbError> {
trace_db_call(context, "update", "set_duplicate_of", |_span| {
use schema::image_exif::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
diesel::update(
image_exif
.filter(library_id.eq(library_id_val))
.filter(rel_path.eq(rel_path_val)),
)
.set((
duplicate_of_hash.eq(survivor_hash),
duplicate_decided_at.eq(decided_at),
))
.execute(connection.deref_mut())
.map(|_| ())
.map_err(|_| anyhow::anyhow!("Update error"))
})
.map_err(|_| DbError::new(DbErrorKind::UpdateError))
}
fn clear_duplicate_of(
&mut self,
context: &opentelemetry::Context,
library_id_val: i32,
rel_path_val: &str,
) -> Result<(), DbError> {
trace_db_call(context, "update", "clear_duplicate_of", |_span| {
use schema::image_exif::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
diesel::update(
image_exif
.filter(library_id.eq(library_id_val))
.filter(rel_path.eq(rel_path_val)),
)
.set((
duplicate_of_hash.eq::<Option<String>>(None),
duplicate_decided_at.eq::<Option<i64>>(None),
))
.execute(connection.deref_mut())
.map(|_| ())
.map_err(|_| anyhow::anyhow!("Update error"))
})
.map_err(|_| DbError::new(DbErrorKind::UpdateError))
}
fn union_perceptual_tags(
&mut self,
context: &opentelemetry::Context,
survivor_hash: &str,
demoted_hash: &str,
survivor_rel_path: &str,
) -> Result<(), DbError> {
trace_db_call(context, "update", "union_perceptual_tags", |_span| {
// INSERT OR IGNORE handles two relevant uniqueness paths:
// - tagged_photo (rel_path, tag_id) is the historical key,
// so existing tag rows under the survivor's path collide
// and stay put.
// - The (rel_path, tag_id) collision is the one that
// matters for idempotence; (content_hash, tag_id) at the
// bytes level isn't enforced by SQLite but the read path
// dedups on it, so an extra row would be cosmetic.
// Tags whose rel_path differs are inserted, picking up the
// survivor's content_hash so they live under the right bytes.
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
diesel::sql_query(
"INSERT OR IGNORE INTO tagged_photo (rel_path, tag_id, created_time, content_hash) \
SELECT ?, tag_id, strftime('%s','now'), ? \
FROM tagged_photo \
WHERE content_hash = ? \
AND tag_id NOT IN ( \
SELECT tag_id FROM tagged_photo WHERE content_hash = ? \
)",
)
.bind::<diesel::sql_types::Text, _>(survivor_rel_path)
.bind::<diesel::sql_types::Text, _>(survivor_hash)
.bind::<diesel::sql_types::Text, _>(demoted_hash)
.bind::<diesel::sql_types::Text, _>(survivor_hash)
.execute(connection.deref_mut())
.map(|_| ())
.map_err(|_| anyhow::anyhow!("Tag union error"))
})
.map_err(|_| DbError::new(DbErrorKind::UpdateError))
}
}
#[cfg(test)]
mod exif_dao_tests {
use super::*;
use crate::database::models::InsertLibrary;
use crate::database::test::in_memory_db_connection;
/// Returns a new `opentelemetry::Context` to hand to the DAO calls in
/// these tests.
fn ctx() -> opentelemetry::Context {
opentelemetry::Context::new()
}
fn insert_row(dao: &mut SqliteExifDao, lib_id: i32, rel: &str, date: Option<i64>) {
dao.store_exif(
&ctx(),
InsertImageExif {
library_id: lib_id,
file_path: rel.to_string(),
camera_make: None,
camera_model: None,
lens_model: None,
width: None,
height: None,
orientation: None,
gps_latitude: None,
gps_longitude: None,
gps_altitude: None,
focal_length: None,
aperture: None,
shutter_speed: None,
iso: None,
date_taken: date,
created_time: 0,
last_modified: 0,
content_hash: None,
size_bytes: None,
phash_64: None,
dhash_64: None,
date_taken_source: None,
},
)
.expect("insert exif row");
}
/// Builds a DAO over an in-memory database that contains two libraries:
/// the one seeded by the migrations plus an extra "archive" library.
fn setup_two_libraries() -> SqliteExifDao {
    let mut conn = in_memory_db_connection();

    // Migration seeds library id=1 with a placeholder root; add id=2.
    let archive = InsertLibrary {
        name: "archive",
        root_path: "/tmp/archive",
        created_at: 0,
        enabled: true,
        excluded_dirs: None,
    };
    diesel::insert_into(schema::libraries::table)
        .values(archive)
        .execute(&mut conn)
        .expect("seed second library");

    SqliteExifDao::from_connection(conn)
}
/// With no library filter, dated rows from every library are returned and
/// undated rows are dropped.
#[test]
fn get_all_with_date_taken_union_returns_all_libraries() {
    let mut dao = setup_two_libraries();

    let fixtures = [
        (1, "main/a.jpg", Some(100)),
        (2, "archive/b.jpg", Some(200)),
        // Row without a date must be excluded even in union mode.
        (2, "archive/c.jpg", None),
    ];
    for (lib, rel, date) in fixtures {
        insert_row(&mut dao, lib, rel, date);
    }

    let mut rows = dao.get_all_with_date_taken(&ctx(), None).unwrap();
    // Result order is not asserted, so normalise by timestamp first.
    rows.sort_by(|lhs, rhs| lhs.1.cmp(&rhs.1));

    let expected = vec![
        ("main/a.jpg".to_string(), 100),
        ("archive/b.jpg".to_string(), 200),
    ];
    assert_eq!(rows, expected);
}
/// A `Some(library_id)` filter restricts `get_all_with_date_taken` to that
/// library's rows.
#[test]
fn get_all_with_date_taken_scopes_by_library_id() {
    let mut dao = setup_two_libraries();
    for (lib, rel, ts) in [
        (1, "main/a.jpg", 100),
        (2, "archive/b.jpg", 200),
        (2, "archive/c.jpg", 300),
    ] {
        insert_row(&mut dao, lib, rel, Some(ts));
    }

    // Library 2: compare paths only, order-independent.
    let mut lib2_paths: Vec<String> = dao
        .get_all_with_date_taken(&ctx(), Some(2))
        .unwrap()
        .into_iter()
        .map(|(path, _)| path)
        .collect();
    lib2_paths.sort();
    assert_eq!(lib2_paths, vec!["archive/b.jpg", "archive/c.jpg"]);

    // Library 1: single row, compare path and timestamp together.
    let lib1_rows = dao.get_all_with_date_taken(&ctx(), Some(1)).unwrap();
    assert_eq!(lib1_rows, vec![("main/a.jpg".to_string(), 100)]);
}
/// `query_by_exif` returns rows from all libraries when unscoped and only
/// the requested library's rows when scoped.
#[test]
fn query_by_exif_scopes_by_library_id() {
    let mut dao = setup_two_libraries();
    insert_row(&mut dao, 1, "main/a.jpg", Some(100));
    insert_row(&mut dao, 2, "archive/a.jpg", Some(200));

    // Union: both rows.
    let unscoped = dao.query_by_exif(&ctx(), None, None, None, None, None, None, None);
    assert_eq!(unscoped.unwrap().len(), 2);

    // Scoped to lib 2: only archive row.
    let scoped = dao
        .query_by_exif(&ctx(), Some(2), None, None, None, None, None, None)
        .unwrap();
    assert_eq!(scoped.len(), 1);
    let only = &scoped[0];
    assert_eq!(only.file_path, "archive/a.jpg");
    assert_eq!(only.library_id, 2);
}
/// `get_exif_batch` must tell apart rows that share a rel_path but live in
/// different libraries.
#[test]
fn get_exif_batch_scopes_by_library_id() {
    let mut dao = setup_two_libraries();

    // Same rel_path, different libraries — the cross-library duplicate
    // case the audit flagged.
    insert_row(&mut dao, 1, "shared/photo.jpg", Some(100));
    insert_row(&mut dao, 2, "shared/photo.jpg", Some(200));

    let wanted = ["shared/photo.jpg".to_string()];

    // None spans both libraries (legacy union behavior).
    let across_libraries = dao.get_exif_batch(&ctx(), None, &wanted).unwrap();
    assert_eq!(across_libraries.len(), 2);

    // Some(2) returns only the archive row.
    let archive_only = dao.get_exif_batch(&ctx(), Some(2), &wanted).unwrap();
    assert_eq!(archive_only.len(), 1);
    let hit = &archive_only[0];
    assert_eq!(hit.library_id, 2);
    assert_eq!(hit.date_taken, Some(200));
}
/// `count_for_library` counts rows per library and yields zero (not an
/// error) for a library id that has no rows.
#[test]
fn count_for_library_returns_per_library_count() {
    let mut dao = setup_two_libraries();
    for (lib, rel) in [(1, "main/a.jpg"), (1, "main/b.jpg"), (2, "archive/a.jpg")] {
        insert_row(&mut dao, lib, rel, None);
    }

    let count_lib1 = dao.count_for_library(&ctx(), 1).unwrap();
    let count_lib2 = dao.count_for_library(&ctx(), 2).unwrap();
    // Unknown library: zero, no error.
    let count_unknown = dao.count_for_library(&ctx(), 999).unwrap();

    assert_eq!(count_lib1, 2);
    assert_eq!(count_lib2, 1);
    assert_eq!(count_unknown, 0);
}
/// Stores a bare EXIF row whose only meaningful columns are the library,
/// path, `date_taken` and `date_taken_source` — the fixture used by the
/// canonical-date drain tests below.
///
/// Every other optional column is `None` and both timestamps are `0`.
/// Panics if the insert fails.
fn insert_row_with_source(
    dao: &mut SqliteExifDao,
    lib_id: i32,
    rel: &str,
    date: Option<i64>,
    source: Option<&str>,
) {
    let record = InsertImageExif {
        // Identity.
        library_id: lib_id,
        file_path: rel.to_owned(),
        // Camera / lens metadata: unused by these tests.
        camera_make: None,
        camera_model: None,
        lens_model: None,
        width: None,
        height: None,
        orientation: None,
        gps_latitude: None,
        gps_longitude: None,
        gps_altitude: None,
        focal_length: None,
        aperture: None,
        shutter_speed: None,
        iso: None,
        // The columns under test.
        date_taken: date,
        created_time: 0,
        last_modified: 0,
        // Hash columns start empty.
        content_hash: None,
        size_bytes: None,
        phash_64: None,
        dhash_64: None,
        date_taken_source: source.map(str::to_owned),
    };

    let context = ctx();
    dao.store_exif(&context, record).expect("insert exif row");
}
/// Only rows with no date source in the requested library are eligible for
/// the date backfill drain.
#[test]
fn get_rows_needing_date_backfill_returns_null_only() {
    let mut dao = setup_two_libraries();

    // Each row exercises a different source. Only NULL is eligible —
    // fs_time was removed from the drain because re-resolving it is
    // deterministic-no-op work that starves other writers.
    let fixtures = [
        (1, "main/null.jpg", None, None),
        (1, "main/fs.jpg", Some(123), Some("fs_time")),
        (1, "main/name.jpg", Some(456), Some("filename")),
        (1, "main/real.jpg", Some(789), Some("exif")),
        // Other library — never returned even when eligible.
        (2, "archive/null.jpg", None, None),
    ];
    for (lib, rel, date, source) in fixtures {
        insert_row_with_source(&mut dao, lib, rel, date, source);
    }

    let eligible: Vec<String> = dao
        .get_rows_needing_date_backfill(&ctx(), 1, 100)
        .unwrap()
        .into_iter()
        .map(|(_, path)| path)
        .collect();

    assert_eq!(eligible.len(), 1, "expected only NULL-date rows");
    assert!(eligible.iter().any(|path| path == "main/null.jpg"));
}
/// `backfill_date_taken` sets the date and its source and leaves the
/// content hash and size alone.
#[test]
fn backfill_date_taken_writes_date_and_source_only() {
    let mut dao = setup_two_libraries();
    let rel = "main/x.jpg";
    insert_row_with_source(&mut dao, 1, rel, None, None);

    // Set a content_hash on the row to verify backfill_date_taken
    // doesn't disturb other columns. Using the existing
    // backfill_content_hash for this verifies via a separate path.
    dao.backfill_content_hash(&ctx(), 1, rel, "deadbeef", 1024)
        .unwrap();
    dao.backfill_date_taken(&ctx(), 1, rel, 1700000000, "exiftool")
        .unwrap();

    let stored = dao.get_exif(&ctx(), rel).unwrap().unwrap();

    // Columns the backfill is supposed to write.
    assert_eq!(stored.date_taken, Some(1700000000));
    assert_eq!(stored.date_taken_source.as_deref(), Some("exiftool"));

    // Untouched columns survive.
    assert_eq!(stored.content_hash.as_deref(), Some("deadbeef"));
    assert_eq!(stored.size_bytes, Some(1024));
}
/// Exercises the `"month"` span of `get_memories_in_window` with a 20-year
/// look-back: rows from the current calendar month in earlier years are
/// returned, while rows older than the window, rows in the future and rows
/// without a date are not.
///
/// Despite the function name, day-level matching is not asserted here.
#[test]
fn get_memories_in_window_day_matches_only_same_md_in_year_window() {
    let mut dao = setup_two_libraries();

    // Step back by whole calendar years instead of `N * 365 * 86_400`
    // seconds. The 365-day approximation ignores leap days, so a
    // "10 years ago" row lands roughly three days *later* in the calendar;
    // when the test runs on the last days of a month that row spills into
    // the following month and the month assertion fails. Month arithmetic
    // keeps the calendar month fixed (Feb 29 clamps to Feb 28).
    let now = chrono::Utc::now();
    let years_ago = |years: u32| -> i64 {
        now.checked_sub_months(chrono::Months::new(12 * years))
            .expect("calendar arithmetic stays within chrono's range")
            .timestamp()
    };

    // 1y, 5y, 10y back from 'now': all inside the 20-year window.
    for (rel, years) in [("y1.jpg", 1), ("y5.jpg", 5), ("y10.jpg", 10)] {
        insert_row_with_source(&mut dao, 1, rel, Some(years_ago(years)), Some("exif"));
    }
    // Outside the 20-year window:
    insert_row_with_source(&mut dao, 1, "y21.jpg", Some(years_ago(21)), Some("exif"));
    // Future row: must be excluded by the `<= now` clause.
    insert_row_with_source(
        &mut dao,
        1,
        "future.jpg",
        Some(now.timestamp() + 86_400),
        Some("exif"),
    );
    // No date — never returned regardless of source.
    insert_row_with_source(&mut dao, 1, "nodate.jpg", None, None);

    // Month span returns rows from the same calendar month over the
    // window — y1, y5, y10 should all qualify (same month any year),
    // y21 trims (out of years_back), future trims (> now), nodate
    // never qualifies.
    let rows = dao
        .get_memories_in_window(&ctx(), 1, "month", 20, 0)
        .unwrap();
    let paths: std::collections::HashSet<String> =
        rows.into_iter().map(|(p, _, _)| p).collect();

    assert!(
        paths.contains("y1.jpg") && paths.contains("y5.jpg") && paths.contains("y10.jpg"),
        "month span should include all in-window rows: {:?}",
        paths
    );
    assert!(
        !paths.contains("y21.jpg"),
        "21-year-old row should fall outside the years_back window"
    );
    assert!(!paths.contains("future.jpg"), "future row must be excluded");
    assert!(
        !paths.contains("nodate.jpg"),
        "row without date must never appear"
    );
}
/// `get_memories_in_window` only returns rows that belong to the requested
/// library.
#[test]
fn get_memories_in_window_scopes_by_library_id() {
    let mut dao = setup_two_libraries();

    // Exactly one calendar year back. A fixed `365 * 86_400` offset is one
    // day short whenever a Feb 29 falls inside the past year, which pushes
    // the row into the next calendar month when the test runs on the last
    // day of a month and makes the "month" span miss it.
    let one_year_ago = chrono::Utc::now()
        .checked_sub_months(chrono::Months::new(12))
        .expect("calendar arithmetic stays within chrono's range")
        .timestamp();

    insert_row_with_source(&mut dao, 1, "main/x.jpg", Some(one_year_ago), Some("exif"));
    insert_row_with_source(
        &mut dao,
        2,
        "archive/x.jpg",
        Some(one_year_ago),
        Some("exif"),
    );

    let lib1 = dao
        .get_memories_in_window(&ctx(), 1, "month", 20, 0)
        .unwrap();
    let lib2 = dao
        .get_memories_in_window(&ctx(), 2, "month", 20, 0)
        .unwrap();

    // Each library sees its own row and nothing from the other one.
    assert_eq!(lib1.len(), 1);
    assert_eq!(lib1[0].0, "main/x.jpg");
    assert_eq!(lib2.len(), 1);
    assert_eq!(lib2[0].0, "archive/x.jpg");
}
/// A span token the DAO does not recognise must produce an error rather
/// than a result set.
#[test]
fn get_memories_in_window_rejects_unknown_span_token() {
    let mut dao = setup_two_libraries();
    let outcome = dao.get_memories_in_window(&ctx(), 1, "decade", 20, 0);
    assert!(outcome.is_err());
}
}