diff --git a/CLAUDE.md b/CLAUDE.md index 86515d2..1968167 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -286,6 +286,15 @@ SMS_API_TOKEN=your-api-token # SMS API authentication token (o # `get_personal_place_at` tool. Unset = legacy Nominatim-only path. APOLLO_API_BASE_URL=http://apollo.lan:8000 # Base URL of the sibling Apollo backend +# Face inference (optional). Apollo also hosts the insightface inference +# service; ImageApi calls it from the file-watch hook (Phase 3) and from +# the manual face-create endpoint. Falls back to APOLLO_API_BASE_URL when +# unset (typical single-Apollo deploy). Both unset = feature disabled. +APOLLO_FACE_API_BASE_URL=http://apollo.lan:8000 # Override if face service runs separately +FACE_AUTOBIND_MIN_COS=0.4 # Phase 3: cosine-sim floor for tag-name auto-bind +FACE_DETECT_CONCURRENCY=8 # Phase 3: per-scan-tick parallel detect calls +FACE_DETECT_TIMEOUT_SEC=60 # reqwest client timeout (CPU inference can be slow) + # OpenRouter (Hybrid Backend) - keeps embeddings + vision local, routes chat to OpenRouter OPENROUTER_API_KEY=sk-or-... # Required to enable hybrid backend OPENROUTER_DEFAULT_MODEL=anthropic/claude-sonnet-4 # Used when client doesn't pick a model diff --git a/Cargo.lock b/Cargo.lock index 891a2f9..2023d51 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3229,6 +3229,7 @@ dependencies = [ "js-sys", "log", "mime", + "mime_guess", "native-tls", "percent-encoding", "pin-project-lite", diff --git a/Cargo.toml b/Cargo.toml index 0a25252..1c89808 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,7 +49,7 @@ opentelemetry-appender-log = "0.31.0" tempfile = "3.20.0" regex = "1.11.1" exif = { package = "kamadak-exif", version = "0.6.1" } -reqwest = { version = "0.12", features = ["json", "stream"] } +reqwest = { version = "0.12", features = ["json", "stream", "multipart"] } async-stream = "0.3" tokio-util = { version = "0.7", features = ["io"] } bytes = "1" diff --git a/README.md b/README.md index fceba81..caa3de0 100644 --- a/README.md +++ b/README.md @@ -159,3 +159,26 @@ Daily conversation summaries are generated automatically on server startup. Conf - Contacts to process - Model version used for embeddings: `nomic-embed-text:v1.5` +### Apollo + Face Recognition (Optional) + +Apollo (sibling project) hosts both the Places API and the local insightface +inference service. Both integrations are optional and degrade gracefully when +unset. + +- `APOLLO_API_BASE_URL` - Base URL of the sibling Apollo backend. + - When set, photo-insight enrichment folds the user's personal place name + (Home, Work, Cabin, ...) into the location string, and the agentic loop + gains a `get_personal_place_at` tool. Unset = legacy Nominatim-only path. +- `APOLLO_FACE_API_BASE_URL` - Base URL for the face-detection service. + - Falls back to `APOLLO_API_BASE_URL` when unset (typical single-Apollo + deploy). Both unset = face feature disabled (file-watch hook and + manual-face endpoints short-circuit silently). +- `FACE_AUTOBIND_MIN_COS` (Phase 3) - Cosine-sim floor for auto-binding a + detected face to an existing same-named person via people-tag bootstrap + [default: `0.4`]. +- `FACE_DETECT_CONCURRENCY` (Phase 3) - Per-scan-tick concurrent detect + calls fired by the file watcher [default: `8`]. Apollo serializes them + via its single-worker GPU pool. +- `FACE_DETECT_TIMEOUT_SEC` - reqwest client timeout per detect call + [default: `60`]. CPU inference on a backlog can take many seconds. + diff --git a/migrations/2026-04-29-000000_add_faces/down.sql b/migrations/2026-04-29-000000_add_faces/down.sql new file mode 100644 index 0000000..bae8303 --- /dev/null +++ b/migrations/2026-04-29-000000_add_faces/down.sql @@ -0,0 +1,2 @@ +DROP TABLE IF EXISTS face_detections; +DROP TABLE IF EXISTS persons; diff --git a/migrations/2026-04-29-000000_add_faces/up.sql b/migrations/2026-04-29-000000_add_faces/up.sql new file mode 100644 index 0000000..4f4f4c8 --- /dev/null +++ b/migrations/2026-04-29-000000_add_faces/up.sql @@ -0,0 +1,67 @@ +-- Local face recognition tables. +-- +-- `persons` are visual identities (the "who" of a face). The optional +-- `entity_id` bridges to the existing knowledge graph `entities` table — +-- when set, this person is the visual side of an LLM-extracted entity. +-- Don't auto-create entities from persons; the entity table represents +-- LLM-extracted knowledge with its own confidence semantics, and silently +-- filling it from face detections muddies the provenance. +-- +-- `face_detections` carries one row per detected face on a content_hash, +-- plus marker rows with `status='no_faces'` or `status='failed'` so the +-- file watcher knows not to re-scan a hash. Keying on `content_hash` +-- (cross-library dedup) rather than `(library_id, rel_path)` means the +-- same JPEG in two libraries is scanned once. The denormalized `rel_path` +-- carries the most-recently-seen path — useful for cluster-thumb URL +-- generation; canonical path lookup goes through image_exif. + +CREATE TABLE persons ( + id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, + name TEXT NOT NULL, + cover_face_id INTEGER, -- backfilled when the first face binds + entity_id INTEGER, -- optional bridge to entities(id) + created_from_tag BOOLEAN NOT NULL DEFAULT 0, + notes TEXT, + created_at BIGINT NOT NULL, + updated_at BIGINT NOT NULL, + CONSTRAINT fk_persons_entity FOREIGN KEY (entity_id) REFERENCES entities(id) ON DELETE SET NULL, + UNIQUE(name COLLATE NOCASE) +); + +CREATE INDEX idx_persons_entity ON persons(entity_id); + +CREATE TABLE face_detections ( + id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, + library_id INTEGER NOT NULL, + content_hash TEXT NOT NULL, -- canonical key (cross-library dedup) + rel_path TEXT NOT NULL, -- denormalized; most recently seen + bbox_x REAL, -- normalized 0..1; NULL on marker rows + bbox_y REAL, + bbox_w REAL, + bbox_h REAL, + embedding BLOB, -- 512×f32 = 2048 bytes; NULL on marker rows + confidence REAL, -- detector score + source TEXT NOT NULL, -- 'auto' | 'manual' + person_id INTEGER, + status TEXT NOT NULL DEFAULT 'detected', -- 'detected' | 'no_faces' | 'failed' + model_version TEXT NOT NULL, -- e.g. 'buffalo_l'; embedding lineage + created_at BIGINT NOT NULL, + CONSTRAINT fk_fd_library FOREIGN KEY (library_id) REFERENCES libraries(id), + CONSTRAINT fk_fd_person FOREIGN KEY (person_id) REFERENCES persons(id) ON DELETE SET NULL, + -- Detected rows carry geometry + embedding; marker rows ('no_faces', + -- 'failed') carry neither. CHECK enforces the invariant so manual + -- inserts can't slip through with half a row. + CONSTRAINT chk_marker CHECK ( + (status = 'detected' AND bbox_x IS NOT NULL AND embedding IS NOT NULL) + OR (status IN ('no_faces','failed') AND bbox_x IS NULL AND embedding IS NULL) + ) +); + +CREATE INDEX idx_face_detections_hash ON face_detections(content_hash); +CREATE INDEX idx_face_detections_lib_path ON face_detections(library_id, rel_path); +CREATE INDEX idx_face_detections_person ON face_detections(person_id); +CREATE INDEX idx_face_detections_status ON face_detections(status); +-- One marker row per (content_hash, status='no_faces') so the file watcher +-- doesn't double-mark when a hash is seen on multiple full-scan passes. +CREATE UNIQUE INDEX idx_face_detections_no_faces_unique + ON face_detections(content_hash) WHERE status = 'no_faces'; diff --git a/src/ai/face_client.rs b/src/ai/face_client.rs new file mode 100644 index 0000000..3bd2472 --- /dev/null +++ b/src/ai/face_client.rs @@ -0,0 +1,312 @@ +//! Thin async HTTP client for Apollo's `/api/internal/faces/*` endpoints. +//! +//! Apollo (the personal location-history viewer at the sibling repo) hosts the +//! insightface inference service. This client is the ImageApi side of the +//! contract — it shoves image bytes through `/detect` and returns boxes + +//! 512-d ArcFace embeddings, plus a single-embedding `/embed` for the manual +//! face-create flow. +//! +//! Mirrors `apollo_client.rs` shape: optional base URL (None = disabled, the +//! file watcher and manual-create handlers no-op), reqwest client with a +//! generous timeout because CPU inference on a backlog can take many seconds +//! per photo. +//! +//! Configured via `APOLLO_FACE_API_BASE_URL`, falling back to +//! `APOLLO_API_BASE_URL` when the dedicated var is unset (single-Apollo +//! deploys are the common case). Both unset → `is_enabled()` returns false. +//! +//! Wire format: multipart/form-data with `file=` and `meta=`. +//! `meta` carries `{content_hash, library_id, rel_path, orientation?, +//! model_version?}` — useful for Apollo-side logging and idempotency, ignored +//! by Apollo today but part of the stable wire contract so future versions +//! can act on it without a client change. +//! +//! Error mapping (reflected in [`FaceDetectError`]): +//! - 422 `decode_failed` → permanent: ImageApi marks `status='failed'` and +//! doesn't retry until manual rerun. +//! - 200 with `faces:[]` → `status='no_faces'` marker row. +//! - 503 `cuda_oom` / `engine_unavailable` → defer-and-retry: no marker +//! written. +//! - Any other 5xx / network error → defer. + +use anyhow::{Context, Result}; +use base64::Engine; +use reqwest::Client; +use serde::{Deserialize, Serialize}; +use std::time::Duration; + +#[derive(Debug, Clone, Serialize)] +pub struct DetectMeta { + pub content_hash: String, + pub library_id: i32, + pub rel_path: String, + /// EXIF orientation int (1..8). Apollo applies `exif_transpose` on the + /// bytes before inference, so this is informational only — supply when + /// the bytes were extracted from a RAW preview that lost the tag. + #[serde(skip_serializing_if = "Option::is_none")] + pub orientation: Option, + /// Echoed back in the response. ImageApi stores it in + /// `face_detections.model_version`. + #[serde(skip_serializing_if = "Option::is_none")] + pub model_version: Option, +} + +// Wire shape for the bbox sub-object Apollo returns. Read by Phase 3's +// file-watch hook; silence the dead-code lint until then. +#[allow(dead_code)] +#[derive(Debug, Clone, Deserialize)] +pub struct DetectedBbox { + pub x: f32, + pub y: f32, + pub w: f32, + pub h: f32, +} + +#[allow(dead_code)] // bbox consumed by Phase 3 file-watch hook +#[derive(Debug, Clone, Deserialize)] +pub struct DetectedFace { + pub bbox: DetectedBbox, + pub confidence: f32, + /// base64 of 2048 bytes (512×f32 LE). ImageApi stores the raw bytes + /// verbatim as a BLOB — see `decode_embedding` for the unpack. + pub embedding: String, +} + +impl DetectedFace { + /// Decode the wire-format embedding back into raw bytes for storage. + /// Returns the 2048-byte little-endian f32 buffer or an error if the + /// base64 is malformed or the wrong length. + pub fn decode_embedding(&self) -> Result> { + let bytes = base64::engine::general_purpose::STANDARD + .decode(self.embedding.as_bytes()) + .context("face embedding base64 decode")?; + if bytes.len() != 2048 { + anyhow::bail!( + "face embedding wrong size: got {} bytes, expected 2048", + bytes.len() + ); + } + Ok(bytes) + } +} + +#[allow(dead_code)] // duration_ms logged by Phase 3 file-watch hook +#[derive(Debug, Clone, Deserialize)] +pub struct DetectResponse { + pub model_version: String, + pub duration_ms: i64, + pub faces: Vec, +} + +#[derive(Debug, Clone, Deserialize)] +#[allow(dead_code)] // Reported by Apollo; useful for future health-driven backoff +pub struct FaceHealth { + pub loaded: bool, + pub providers: Vec, + pub model_version: String, + pub det_size: i32, + #[serde(default)] + pub load_error: Option, +} + +/// Distinguishes permanent failures (don't retry) from transient ones +/// (defer and retry on next scan tick). The file-watch hook keys its +/// marker-row decision on this — a `Permanent` outcome writes +/// `status='failed'`, a `Transient` outcome writes nothing so the next +/// pass tries again. +#[derive(Debug)] +pub enum FaceDetectError { + /// Apollo refused the bytes for a reason that won't change on retry + /// (decode failure, zero-dim image). Mark `status='failed'`. + Permanent(anyhow::Error), + /// Apollo couldn't process this turn but might next time (CUDA OOM, + /// engine not loaded yet, network hiccup). Don't mark anything. + Transient(anyhow::Error), + /// Feature is disabled (no `APOLLO_FACE_API_BASE_URL`). Caller should + /// silently no-op — same shape as `apollo_client::is_enabled()` false. + Disabled, +} + +impl std::fmt::Display for FaceDetectError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + FaceDetectError::Permanent(e) => write!(f, "permanent: {e}"), + FaceDetectError::Transient(e) => write!(f, "transient: {e}"), + FaceDetectError::Disabled => write!(f, "face client disabled"), + } + } +} + +impl std::error::Error for FaceDetectError {} + +#[derive(Clone)] +pub struct FaceClient { + client: Client, + /// `None` → disabled. Trim trailing slash at construction so url + /// building doesn't double up. + base_url: Option, +} + +impl FaceClient { + pub fn new(base_url: Option) -> Self { + // 60 s timeout: CPU inference on a backlog can take many seconds + // per photo, especially the first call into a cold GPU. Apollo's + // bounded threadpool (1 worker on CUDA) means concurrent calls + // queue server-side; 60 s is enough headroom for a few items in + // the queue without surfacing a false transient. + let timeout_secs = std::env::var("FACE_DETECT_TIMEOUT_SEC") + .ok() + .and_then(|s| s.parse::().ok()) + .unwrap_or(60); + let client = Client::builder() + .timeout(Duration::from_secs(timeout_secs)) + .build() + .expect("reqwest client build"); + Self { + client, + base_url: base_url.map(|u| u.trim_end_matches('/').to_string()), + } + } + + pub fn is_enabled(&self) -> bool { + self.base_url.is_some() + } + + /// Detect every face in `bytes`. ImageApi calls this from the file-watch + /// hook (Phase 3) and from the manual rerun handler. Empty `faces[]` in + /// the response is the no-faces signal — caller writes a marker row. + #[allow(dead_code)] // Phase 3 file-watch hook + rerun handler + pub async fn detect( + &self, + bytes: Vec, + meta: DetectMeta, + ) -> std::result::Result { + let Some(base) = self.base_url.as_deref() else { + return Err(FaceDetectError::Disabled); + }; + let url = format!("{}/api/internal/faces/detect", base); + self.post_multipart(&url, bytes, &meta).await + } + + /// Single-embedding endpoint for the manual face-create flow. Caller + /// crops the image to the user-drawn bbox and passes those bytes; we + /// run detection inside the crop and return the highest-confidence + /// face's embedding. Apollo returns 422 `no_face_in_crop` when the + /// box missed — surfaced here as `Permanent`. + pub async fn embed( + &self, + bytes: Vec, + meta: DetectMeta, + ) -> std::result::Result { + let Some(base) = self.base_url.as_deref() else { + return Err(FaceDetectError::Disabled); + }; + let url = format!("{}/api/internal/faces/embed", base); + self.post_multipart(&url, bytes, &meta).await + } + + /// Engine reachability + provider/model report. Used by ImageApi for a + /// startup sanity check; not on the hot path. + #[allow(dead_code)] // Phase 3 startup probe + pub async fn health(&self) -> Result { + let base = self.base_url.as_deref().context("face client disabled")?; + let url = format!("{}/api/internal/faces/health", base); + let resp = self.client.get(&url).send().await?.error_for_status()?; + let body: FaceHealth = resp.json().await?; + Ok(body) + } + + async fn post_multipart( + &self, + url: &str, + bytes: Vec, + meta: &DetectMeta, + ) -> std::result::Result { + let meta_json = serde_json::to_string(meta) + .map_err(|e| FaceDetectError::Permanent(anyhow::anyhow!("meta serialize: {e}")))?; + let form = reqwest::multipart::Form::new() + .text("meta", meta_json) + .part( + "file", + reqwest::multipart::Part::bytes(bytes) + .file_name(meta.rel_path.clone()) + .mime_str("application/octet-stream") + .unwrap_or_else(|_| reqwest::multipart::Part::bytes(Vec::new())), + ); + + let resp = match self.client.post(url).multipart(form).send().await { + Ok(r) => r, + Err(e) if e.is_timeout() || e.is_connect() => { + return Err(FaceDetectError::Transient(anyhow::anyhow!( + "face client network: {e}" + ))); + } + Err(e) => { + return Err(FaceDetectError::Transient(anyhow::anyhow!( + "face client request: {e}" + ))); + } + }; + + let status = resp.status(); + if status.is_success() { + let body: DetectResponse = resp.json().await.map_err(|e| { + FaceDetectError::Transient(anyhow::anyhow!("face response decode: {e}")) + })?; + return Ok(body); + } + + let body_text = resp.text().await.unwrap_or_default(); + // Apollo encodes its error class in the JSON body's `detail`. Try + // to parse it; fall back to status-only classification. + let detail_code = serde_json::from_str::(&body_text) + .ok() + .and_then(|v| { + // detail can be a string ("decode_failed") or an object + // ({"code": "cuda_oom", ...}) depending on the endpoint + // and Apollo's response shape — handle both. + v.get("detail") + .and_then(|d| d.as_str().map(str::to_string)) + .or_else(|| { + v.get("detail") + .and_then(|d| d.get("code")) + .and_then(|c| c.as_str()) + .map(str::to_string) + }) + }) + .unwrap_or_default(); + + if status == reqwest::StatusCode::UNPROCESSABLE_ENTITY { + return Err(FaceDetectError::Permanent(anyhow::anyhow!( + "face detect 422 {}: {}", + detail_code, + body_text + ))); + } + if status == reqwest::StatusCode::SERVICE_UNAVAILABLE { + return Err(FaceDetectError::Transient(anyhow::anyhow!( + "face detect 503 {}: {}", + detail_code, + body_text + ))); + } + // Any other 4xx: be conservative and treat as Permanent so we + // don't loop forever on a stable rejection. Any other 5xx: + // Transient — likely intermittent. + if status.is_client_error() { + Err(FaceDetectError::Permanent(anyhow::anyhow!( + "face detect {} {}: {}", + status.as_u16(), + detail_code, + body_text + ))) + } else { + Err(FaceDetectError::Transient(anyhow::anyhow!( + "face detect {} {}: {}", + status.as_u16(), + detail_code, + body_text + ))) + } + } +} diff --git a/src/ai/mod.rs b/src/ai/mod.rs index a9d55bf..d6fda90 100644 --- a/src/ai/mod.rs +++ b/src/ai/mod.rs @@ -1,5 +1,6 @@ pub mod apollo_client; pub mod daily_summary_job; +pub mod face_client; pub mod handlers; pub mod insight_chat; pub mod insight_generator; diff --git a/src/database/schema.rs b/src/database/schema.rs index e49f21f..5427ef7 100644 --- a/src/database/schema.rs +++ b/src/database/schema.rs @@ -70,6 +70,26 @@ diesel::table! { } } +diesel::table! { + face_detections (id) { + id -> Integer, + library_id -> Integer, + content_hash -> Text, + rel_path -> Text, + bbox_x -> Nullable, + bbox_y -> Nullable, + bbox_w -> Nullable, + bbox_h -> Nullable, + embedding -> Nullable, + confidence -> Nullable, + source -> Text, + person_id -> Nullable, + status -> Text, + model_version -> Text, + created_at -> BigInt, + } +} + diesel::table! { favorites (id) { id -> Integer, @@ -130,6 +150,19 @@ diesel::table! { } } +diesel::table! { + persons (id) { + id -> Integer, + name -> Text, + cover_face_id -> Nullable, + entity_id -> Nullable, + created_from_tag -> Bool, + notes -> Nullable, + created_at -> BigInt, + updated_at -> BigInt, + } +} + diesel::table! { photo_insights (id) { id -> Integer, @@ -201,7 +234,10 @@ diesel::table! { diesel::joinable!(entity_facts -> photo_insights (source_insight_id)); diesel::joinable!(entity_photo_links -> entities (entity_id)); diesel::joinable!(entity_photo_links -> libraries (library_id)); +diesel::joinable!(face_detections -> libraries (library_id)); +diesel::joinable!(face_detections -> persons (person_id)); diesel::joinable!(image_exif -> libraries (library_id)); +diesel::joinable!(persons -> entities (entity_id)); diesel::joinable!(photo_insights -> libraries (library_id)); diesel::joinable!(tagged_photo -> tags (tag_id)); diesel::joinable!(video_preview_clips -> libraries (library_id)); @@ -212,10 +248,12 @@ diesel::allow_tables_to_appear_in_same_query!( entities, entity_facts, entity_photo_links, + face_detections, favorites, image_exif, libraries, location_history, + persons, photo_insights, search_history, tagged_photo, diff --git a/src/faces.rs b/src/faces.rs new file mode 100644 index 0000000..aa79e14 --- /dev/null +++ b/src/faces.rs @@ -0,0 +1,1863 @@ +//! Local face recognition: data layer + HTTP surface. +//! +//! Phase 2 ships the persistence model and the manual CRUD endpoints; the +//! file-watch hook that drives automatic detection lives in `process_new_files` +//! (Phase 3) and is not registered yet. Inference is delegated to Apollo over +//! HTTP via [`crate::ai::face_client`]; this module never imports onnxruntime. +//! +//! Data model: +//! - `persons` are visual identities (the "who" of a face). +//! - `face_detections` rows are either real detections (`status='detected'`) +//! or markers (`status='no_faces' | 'failed'`). Both are keyed on +//! `content_hash` so the same JPEG in two libraries is scanned once. +//! - The `(library_id, rel_path)` pair is the *display* lookup; we resolve +//! it through `image_exif.content_hash` on every read so renames don't +//! strand face rows. +//! +//! The `FaceDao` trait abstracts persistence; `SqliteFaceDao` is the +//! production impl. The Phase 2 endpoints use it directly. A test impl +//! (in-memory) lives at the bottom of the module behind `#[cfg(test)]`. + +use crate::Claims; +use crate::ai::face_client::{DetectMeta, FaceClient, FaceDetectError}; +use crate::database::schema::{face_detections, image_exif, persons}; +use crate::error::IntoHttpError; +use crate::libraries::{self, Library}; +use crate::otel::{extract_context_from_request, global_tracer, trace_db_call}; +use crate::state::AppState; +use crate::utils::normalize_path; +use crate::{ThumbnailRequest, connect}; +use actix_web::dev::{ServiceFactory, ServiceRequest}; +use actix_web::{App, HttpRequest, HttpResponse, Responder, web}; +use anyhow::{Context, anyhow}; +use chrono::Utc; +use diesel::prelude::*; +use image::GenericImageView; +use log::{info, warn}; +use opentelemetry::KeyValue; +use opentelemetry::trace::{Span, Status, TraceContextExt, Tracer}; +use serde::{Deserialize, Serialize}; +use std::ops::DerefMut; +use std::sync::{Arc, Mutex}; + +// ── Wire types ────────────────────────────────────────────────────────────── + +/// Visual identity. The optional `entity_id` bridges this person to an +/// LLM-extracted knowledge-graph entity (textual side). Persons are NOT +/// auto-bridged at creation — only when the user explicitly links them in +/// the management UI, or when bootstrap finds an exact-name match. +#[derive(Serialize, Queryable, Clone, Debug)] +pub struct Person { + pub id: i32, + pub name: String, + pub cover_face_id: Option, + pub entity_id: Option, + pub created_from_tag: bool, + pub notes: Option, + pub created_at: i64, + pub updated_at: i64, +} + +#[derive(Insertable, Debug)] +#[diesel(table_name = persons)] +struct InsertPerson { + name: String, + notes: Option, + created_from_tag: bool, + created_at: i64, + updated_at: i64, +} + +#[derive(Serialize, Queryable, Clone, Debug)] +pub struct FaceDetectionRow { + pub id: i32, + pub library_id: i32, + pub content_hash: String, + pub rel_path: String, + pub bbox_x: Option, + pub bbox_y: Option, + pub bbox_w: Option, + pub bbox_h: Option, + /// Skip on the wire — clients call /faces/embeddings explicitly when + /// they need it. Saves ~2 KB per face on every list response. + #[serde(skip_serializing)] + pub embedding: Option>, + pub confidence: Option, + pub source: String, + pub person_id: Option, + pub status: String, + pub model_version: String, + pub created_at: i64, +} + +#[derive(Insertable, Debug)] +#[diesel(table_name = face_detections)] +struct InsertFaceDetection { + library_id: i32, + content_hash: String, + rel_path: String, + bbox_x: Option, + bbox_y: Option, + bbox_w: Option, + bbox_h: Option, + embedding: Option>, + confidence: Option, + source: String, + person_id: Option, + status: String, + model_version: String, + created_at: i64, +} + +/// Face row decorated with its assigned person's name. Returned by +/// `/image/faces` for the rendering side (carousel overlay, person chips). +#[derive(Serialize, Debug, Clone)] +pub struct FaceWithPerson { + pub id: i32, + pub bbox_x: f32, + pub bbox_y: f32, + pub bbox_w: f32, + pub bbox_h: f32, + pub confidence: f32, + pub source: String, + pub person_id: Option, + pub person_name: Option, + pub model_version: String, +} + +/// Face row plus the photo it lives on. Powers the per-person photo grid +/// (`GET /persons/{id}/faces`) and unassigned-cluster surfacing in Apollo. +#[derive(Serialize, Debug, Clone)] +pub struct FaceWithPath { + pub id: i32, + pub library_id: i32, + pub rel_path: String, + pub bbox_x: f32, + pub bbox_y: f32, + pub bbox_w: f32, + pub bbox_h: f32, + pub confidence: f32, + pub person_id: Option, + pub model_version: String, +} + +/// Embedding-bearing face row. Returned by `/faces/embeddings` for Apollo's +/// clustering layer; embedding is base64-encoded so the JSON payload is +/// self-contained (Apollo's DBSCAN runs over numpy arrays decoded from this). +#[derive(Serialize, Debug, Clone)] +pub struct FaceEmbeddingRow { + pub id: i32, + pub library_id: i32, + pub rel_path: String, + pub content_hash: String, + pub person_id: Option, + pub model_version: String, + /// base64 of 2048 bytes (512×f32 LE). + pub embedding: String, +} + +#[derive(Serialize, Debug, Default)] +pub struct FaceStats { + pub library_id: Option, + pub total_photos: i64, + pub scanned: i64, + pub with_faces: i64, + pub no_faces: i64, + pub failed: i64, + pub persons_count: i64, + pub unassigned_faces: i64, +} + +#[derive(Serialize, Debug, Clone)] +pub struct PersonSummary { + pub id: i32, + pub name: String, + pub cover_face_id: Option, + pub entity_id: Option, + pub created_from_tag: bool, + pub notes: Option, + pub face_count: i64, +} + +// ── Request bodies ────────────────────────────────────────────────────────── + +#[derive(Deserialize, Debug)] +pub struct CreatePersonReq { + pub name: String, + #[serde(default)] + pub notes: Option, + /// Optional bridge to an existing entity. NULL/missing leaves it + /// unbridged; set explicitly to wire the person to LLM-extracted facts. + #[serde(default)] + pub entity_id: Option, +} + +#[derive(Deserialize, Debug)] +pub struct UpdatePersonReq { + #[serde(default)] + pub name: Option, + #[serde(default)] + pub notes: Option, + #[serde(default)] + pub cover_face_id: Option, + #[serde(default)] + pub entity_id: Option, +} + +#[derive(Deserialize, Debug)] +pub struct MergePersonsReq { + /// Person id to merge *into*. The source (`{id}` in the path) is + /// re-pointed to this id, then deleted. + pub into: i32, +} + +#[derive(Deserialize, Debug)] +pub struct DeletePersonQuery { + /// `set_null` (default) leaves face rows orphaned (person_id NULL); + /// `delete` cascades through and removes the face rows entirely. + /// Default is set_null because deleting the person almost never means + /// "delete every photo of them ever existed." + #[serde(default)] + pub cascade: Option, +} + +#[derive(Deserialize, Debug)] +pub struct CreateFaceReq { + /// Photo path (library-relative). Resolved to content_hash via + /// image_exif before any face row is inserted. + pub path: String, + pub library: Option, + pub bbox: BboxReq, + /// Optional initial person assignment. Use this when the user draws a + /// box and immediately picks a name from the autocomplete. + #[serde(default)] + pub person_id: Option, +} + +#[derive(Deserialize, Debug)] +pub struct BboxReq { + pub x: f32, + pub y: f32, + pub w: f32, + pub h: f32, +} + +#[derive(Deserialize, Debug)] +pub struct UpdateFaceReq { + /// `null` literally clears the assignment; missing leaves it alone. + /// Distinguish via `Option>` is tricky in serde without + /// custom deserialization; encode "clear" as `clear_person: true` + /// instead. + #[serde(default)] + pub person_id: Option, + #[serde(default)] + pub clear_person: bool, + #[serde(default)] + pub bbox: Option, +} + +#[derive(Deserialize, Debug)] +pub struct EmbeddingsQuery { + pub library: Option, + /// Default true — clustering only cares about unassigned faces. Set + /// false to dump all embeddings (e.g. for re-clustering everything). + #[serde(default = "default_unassigned")] + pub unassigned: bool, + #[serde(default = "default_embeddings_limit")] + pub limit: i64, + #[serde(default)] + pub offset: i64, +} + +fn default_unassigned() -> bool { + true +} +fn default_embeddings_limit() -> i64 { + 500 +} + +// ── DAO trait ─────────────────────────────────────────────────────────────── + +// File-watch hook (Phase 3) and the rerun handler (Phase 6) consume the +// methods the Phase 2 routes don't. Allow dead_code on the trait so we +// don't have to sprinkle attributes on every method that's wired up later. +#[allow(dead_code)] +pub trait FaceDao: Send + Sync { + fn already_scanned( + &mut self, + ctx: &opentelemetry::Context, + content_hash: &str, + ) -> anyhow::Result; + fn store_detection( + &mut self, + ctx: &opentelemetry::Context, + row: InsertFaceDetectionInput, + ) -> anyhow::Result; + fn mark_status( + &mut self, + ctx: &opentelemetry::Context, + library_id: i32, + content_hash: &str, + rel_path: &str, + status: &str, + model_version: &str, + ) -> anyhow::Result<()>; + fn list_for_content_hash( + &mut self, + ctx: &opentelemetry::Context, + content_hash: &str, + ) -> anyhow::Result>; + fn list_for_person( + &mut self, + ctx: &opentelemetry::Context, + person_id: i32, + library_id: Option, + ) -> anyhow::Result>; + fn list_embeddings( + &mut self, + ctx: &opentelemetry::Context, + library_id: Option, + unassigned: bool, + limit: i64, + offset: i64, + ) -> anyhow::Result>; + fn get_face( + &mut self, + ctx: &opentelemetry::Context, + id: i32, + ) -> anyhow::Result>; + fn update_face( + &mut self, + ctx: &opentelemetry::Context, + id: i32, + person_id: Option>, // None=leave; Some(None)=clear; Some(Some(id))=set + bbox: Option<(f32, f32, f32, f32)>, + embedding: Option>, + ) -> anyhow::Result; + fn delete_face(&mut self, ctx: &opentelemetry::Context, id: i32) -> anyhow::Result; + fn delete_auto_for_hash( + &mut self, + ctx: &opentelemetry::Context, + content_hash: &str, + ) -> anyhow::Result; + fn stats( + &mut self, + ctx: &opentelemetry::Context, + library_id: Option, + ) -> anyhow::Result; + + // ── Persons ───────────────────────────────────────────────────────── + fn create_person( + &mut self, + ctx: &opentelemetry::Context, + req: &CreatePersonReq, + from_tag: bool, + ) -> anyhow::Result; + fn get_person( + &mut self, + ctx: &opentelemetry::Context, + id: i32, + ) -> anyhow::Result>; + fn list_persons( + &mut self, + ctx: &opentelemetry::Context, + library_id: Option, + ) -> anyhow::Result>; + fn update_person( + &mut self, + ctx: &opentelemetry::Context, + id: i32, + patch: &UpdatePersonReq, + ) -> anyhow::Result; + /// Delete a person. `cascade=true` removes face rows; otherwise the + /// rows have their `person_id` set NULL by the FK constraint. + fn delete_person( + &mut self, + ctx: &opentelemetry::Context, + id: i32, + cascade_delete_faces: bool, + ) -> anyhow::Result; + fn merge_persons( + &mut self, + ctx: &opentelemetry::Context, + src: i32, + into: i32, + ) -> anyhow::Result; + + /// Resolve `(library_id, rel_path)` → `content_hash` via image_exif. + /// Returns None when the photo hasn't been EXIF-indexed yet (no row + /// in image_exif) or when the row exists but content_hash is NULL. + fn resolve_content_hash( + &mut self, + ctx: &opentelemetry::Context, + library_id: i32, + rel_path: &str, + ) -> anyhow::Result>; +} + +/// Free-standing input struct; the DAO copies it into [`InsertFaceDetection`] +/// so callers don't need to import the diesel-derived insertable. +#[derive(Debug, Clone)] +pub struct InsertFaceDetectionInput { + pub library_id: i32, + pub content_hash: String, + pub rel_path: String, + pub bbox: Option<(f32, f32, f32, f32)>, + pub embedding: Option>, + pub confidence: Option, + pub source: String, + pub person_id: Option, + pub status: String, + pub model_version: String, +} + +// ── SqliteFaceDao impl ────────────────────────────────────────────────────── + +pub struct SqliteFaceDao { + connection: Arc>, +} + +impl SqliteFaceDao { + pub fn new() -> Self { + Self { + connection: Arc::new(Mutex::new(connect())), + } + } + + /// Test helper — bind to a pre-built (typically in-memory) connection. + #[cfg(test)] + pub fn from_connection(connection: Arc>) -> Self { + Self { connection } + } +} + +impl Default for SqliteFaceDao { + fn default() -> Self { + Self::new() + } +} + +impl FaceDao for SqliteFaceDao { + fn already_scanned( + &mut self, + ctx: &opentelemetry::Context, + content_hash: &str, + ) -> anyhow::Result { + let mut conn = self.connection.lock().expect("face dao lock"); + trace_db_call(ctx, "query", "face_already_scanned", |span| { + span.set_attribute(KeyValue::new("content_hash", content_hash.to_string())); + face_detections::table + .filter(face_detections::content_hash.eq(content_hash)) + .select(face_detections::id) + .first::(conn.deref_mut()) + .optional() + .map(|x| x.is_some()) + .with_context(|| "already_scanned query") + }) + } + + fn store_detection( + &mut self, + ctx: &opentelemetry::Context, + row: InsertFaceDetectionInput, + ) -> anyhow::Result { + let mut conn = self.connection.lock().expect("face dao lock"); + trace_db_call(ctx, "insert", "store_detection", |span| { + span.set_attribute(KeyValue::new("status", row.status.clone())); + span.set_attribute(KeyValue::new("source", row.source.clone())); + let now = Utc::now().timestamp(); + let (bx, by, bw, bh) = match row.bbox { + Some((x, y, w, h)) => (Some(x), Some(y), Some(w), Some(h)), + None => (None, None, None, None), + }; + let insert = InsertFaceDetection { + library_id: row.library_id, + content_hash: row.content_hash, + rel_path: row.rel_path, + bbox_x: bx, + bbox_y: by, + bbox_w: bw, + bbox_h: bh, + embedding: row.embedding, + confidence: row.confidence, + source: row.source, + person_id: row.person_id, + status: row.status, + model_version: row.model_version, + created_at: now, + }; + diesel::insert_into(face_detections::table) + .values(&insert) + .execute(conn.deref_mut()) + .with_context(|| "insert face_detection")?; + define_sql_function! { fn last_insert_rowid() -> diesel::sql_types::Integer; } + let id = diesel::select(last_insert_rowid()) + .get_result::(conn.deref_mut()) + .with_context(|| "last_insert_rowid")?; + face_detections::table + .find(id) + .first::(conn.deref_mut()) + .with_context(|| "fetch inserted face") + }) + } + + fn mark_status( + &mut self, + ctx: &opentelemetry::Context, + library_id: i32, + content_hash: &str, + rel_path: &str, + status: &str, + model_version: &str, + ) -> anyhow::Result<()> { + // Marker rows have NULL bbox + NULL embedding (CHECK enforces + // this). We let the UNIQUE partial index on (content_hash) WHERE + // status='no_faces' guard against double-marking; for 'failed' we + // do a manual exists-check. + let exists = self.already_scanned(ctx, content_hash)?; + if exists { + // Don't write a second marker if any row already exists for + // this hash — that includes detected rows from a prior run + // that succeeded; the file watcher's already_scanned() check + // should have caught this, but stay idempotent. + return Ok(()); + } + self.store_detection( + ctx, + InsertFaceDetectionInput { + library_id, + content_hash: content_hash.to_string(), + rel_path: rel_path.to_string(), + bbox: None, + embedding: None, + confidence: None, + source: "auto".to_string(), + person_id: None, + status: status.to_string(), + model_version: model_version.to_string(), + }, + )?; + Ok(()) + } + + fn list_for_content_hash( + &mut self, + ctx: &opentelemetry::Context, + content_hash: &str, + ) -> anyhow::Result> { + let mut conn = self.connection.lock().expect("face dao lock"); + trace_db_call(ctx, "query", "faces_for_hash", |span| { + span.set_attribute(KeyValue::new("content_hash", content_hash.to_string())); + face_detections::table + .left_join(persons::table.on(persons::id.nullable().eq(face_detections::person_id))) + .filter(face_detections::content_hash.eq(content_hash)) + .filter(face_detections::status.eq("detected")) + .select(( + face_detections::id, + face_detections::bbox_x, + face_detections::bbox_y, + face_detections::bbox_w, + face_detections::bbox_h, + face_detections::confidence, + face_detections::source, + face_detections::person_id, + persons::name.nullable(), + face_detections::model_version, + )) + .load::<( + i32, + Option, + Option, + Option, + Option, + Option, + String, + Option, + Option, + String, + )>(conn.deref_mut()) + .with_context(|| "list faces for hash") + .map(|rows| { + rows.into_iter() + .map(|r| FaceWithPerson { + id: r.0, + bbox_x: r.1.unwrap_or(0.0), + bbox_y: r.2.unwrap_or(0.0), + bbox_w: r.3.unwrap_or(0.0), + bbox_h: r.4.unwrap_or(0.0), + confidence: r.5.unwrap_or(0.0), + source: r.6, + person_id: r.7, + person_name: r.8, + model_version: r.9, + }) + .collect() + }) + }) + } + + fn list_for_person( + &mut self, + ctx: &opentelemetry::Context, + person_id: i32, + library_id: Option, + ) -> anyhow::Result> { + let mut conn = self.connection.lock().expect("face dao lock"); + trace_db_call(ctx, "query", "faces_for_person", |span| { + span.set_attribute(KeyValue::new("person_id", person_id as i64)); + let mut query = face_detections::table + .filter(face_detections::person_id.eq(person_id)) + .filter(face_detections::status.eq("detected")) + .into_boxed(); + if let Some(lib) = library_id { + query = query.filter(face_detections::library_id.eq(lib)); + } + query + .select(( + face_detections::id, + face_detections::library_id, + face_detections::rel_path, + face_detections::bbox_x, + face_detections::bbox_y, + face_detections::bbox_w, + face_detections::bbox_h, + face_detections::confidence, + face_detections::person_id, + face_detections::model_version, + )) + .load::<( + i32, + i32, + String, + Option, + Option, + Option, + Option, + Option, + Option, + String, + )>(conn.deref_mut()) + .with_context(|| "list faces for person") + .map(|rows| { + rows.into_iter() + .map(|r| FaceWithPath { + id: r.0, + library_id: r.1, + rel_path: r.2, + bbox_x: r.3.unwrap_or(0.0), + bbox_y: r.4.unwrap_or(0.0), + bbox_w: r.5.unwrap_or(0.0), + bbox_h: r.6.unwrap_or(0.0), + confidence: r.7.unwrap_or(0.0), + person_id: r.8, + model_version: r.9, + }) + .collect() + }) + }) + } + + fn list_embeddings( + &mut self, + ctx: &opentelemetry::Context, + library_id: Option, + unassigned: bool, + limit: i64, + offset: i64, + ) -> anyhow::Result> { + let mut conn = self.connection.lock().expect("face dao lock"); + trace_db_call(ctx, "query", "list_embeddings", |span| { + span.set_attribute(KeyValue::new("limit", limit)); + span.set_attribute(KeyValue::new("offset", offset)); + let mut query = face_detections::table + .filter(face_detections::status.eq("detected")) + .into_boxed(); + if let Some(lib) = library_id { + query = query.filter(face_detections::library_id.eq(lib)); + } + if unassigned { + query = query.filter(face_detections::person_id.is_null()); + } + let rows = query + .order(face_detections::id.asc()) + .limit(limit) + .offset(offset) + .load::(conn.deref_mut()) + .with_context(|| "list embeddings")?; + // Pair with the base64-encoded embedding string so the handler + // doesn't need to know the wire format. Skip rows with NULL + // embedding (shouldn't happen on detected rows, but defensive). + use base64::Engine; + Ok(rows + .into_iter() + .filter_map(|r| { + r.embedding.as_ref().map(|bytes| { + let b64 = base64::engine::general_purpose::STANDARD.encode(bytes); + (r.clone(), b64) + }) + }) + .collect()) + }) + } + + fn get_face( + &mut self, + ctx: &opentelemetry::Context, + id: i32, + ) -> anyhow::Result> { + let mut conn = self.connection.lock().expect("face dao lock"); + trace_db_call(ctx, "query", "get_face", |span| { + span.set_attribute(KeyValue::new("id", id as i64)); + face_detections::table + .find(id) + .first::(conn.deref_mut()) + .optional() + .with_context(|| "get_face") + }) + } + + fn update_face( + &mut self, + ctx: &opentelemetry::Context, + id: i32, + person_id: Option>, + bbox: Option<(f32, f32, f32, f32)>, + embedding: Option>, + ) -> anyhow::Result { + let mut conn = self.connection.lock().expect("face dao lock"); + trace_db_call(ctx, "update", "update_face", |span| { + span.set_attribute(KeyValue::new("id", id as i64)); + // Apply patches one at a time so each set() has the right type. + // Diesel's update DSL is type-driven and combining heterogeneous + // optional sets in one statement is awkward. + if let Some(pid) = person_id { + diesel::update(face_detections::table.find(id)) + .set(face_detections::person_id.eq(pid)) + .execute(conn.deref_mut()) + .with_context(|| "update person_id")?; + } + if let Some((x, y, w, h)) = bbox { + diesel::update(face_detections::table.find(id)) + .set(( + face_detections::bbox_x.eq(x), + face_detections::bbox_y.eq(y), + face_detections::bbox_w.eq(w), + face_detections::bbox_h.eq(h), + )) + .execute(conn.deref_mut()) + .with_context(|| "update bbox")?; + } + if let Some(emb) = embedding { + diesel::update(face_detections::table.find(id)) + .set(face_detections::embedding.eq(emb)) + .execute(conn.deref_mut()) + .with_context(|| "update embedding")?; + } + face_detections::table + .find(id) + .first::(conn.deref_mut()) + .with_context(|| "fetch updated face") + }) + } + + fn delete_face(&mut self, ctx: &opentelemetry::Context, id: i32) -> anyhow::Result { + let mut conn = self.connection.lock().expect("face dao lock"); + trace_db_call(ctx, "delete", "delete_face", |span| { + span.set_attribute(KeyValue::new("id", id as i64)); + let n = diesel::delete(face_detections::table.find(id)) + .execute(conn.deref_mut()) + .with_context(|| "delete face")?; + Ok(n > 0) + }) + } + + fn delete_auto_for_hash( + &mut self, + ctx: &opentelemetry::Context, + content_hash: &str, + ) -> anyhow::Result { + let mut conn = self.connection.lock().expect("face dao lock"); + trace_db_call(ctx, "delete", "delete_auto_for_hash", |span| { + span.set_attribute(KeyValue::new("content_hash", content_hash.to_string())); + diesel::delete( + face_detections::table + .filter(face_detections::content_hash.eq(content_hash)) + .filter(face_detections::source.eq("auto")), + ) + .execute(conn.deref_mut()) + .with_context(|| "delete auto rows") + }) + } + + fn stats( + &mut self, + ctx: &opentelemetry::Context, + library_id: Option, + ) -> anyhow::Result { + let mut conn = self.connection.lock().expect("face dao lock"); + trace_db_call(ctx, "query", "face_stats", |span| { + if let Some(lib) = library_id { + span.set_attribute(KeyValue::new("library_id", lib as i64)); + } + // Count distinct content_hashes per status by status — one + // hash can have many rows (multiple detected faces) but we + // want it counted once. + let scanned: i64 = { + let mut q = face_detections::table.into_boxed(); + if let Some(lib) = library_id { + q = q.filter(face_detections::library_id.eq(lib)); + } + q.select(diesel::dsl::count_distinct(face_detections::content_hash)) + .first(conn.deref_mut()) + .with_context(|| "stats: scanned")? + }; + let with_faces: i64 = { + let mut q = face_detections::table + .filter(face_detections::status.eq("detected")) + .into_boxed(); + if let Some(lib) = library_id { + q = q.filter(face_detections::library_id.eq(lib)); + } + q.select(diesel::dsl::count_distinct(face_detections::content_hash)) + .first(conn.deref_mut()) + .with_context(|| "stats: with_faces")? + }; + let no_faces: i64 = { + let mut q = face_detections::table + .filter(face_detections::status.eq("no_faces")) + .into_boxed(); + if let Some(lib) = library_id { + q = q.filter(face_detections::library_id.eq(lib)); + } + q.select(diesel::dsl::count_distinct(face_detections::content_hash)) + .first(conn.deref_mut()) + .with_context(|| "stats: no_faces")? + }; + let failed: i64 = { + let mut q = face_detections::table + .filter(face_detections::status.eq("failed")) + .into_boxed(); + if let Some(lib) = library_id { + q = q.filter(face_detections::library_id.eq(lib)); + } + q.select(diesel::dsl::count_distinct(face_detections::content_hash)) + .first(conn.deref_mut()) + .with_context(|| "stats: failed")? + }; + let total_photos: i64 = { + let mut q = image_exif::table.into_boxed(); + if let Some(lib) = library_id { + q = q.filter(image_exif::library_id.eq(lib)); + } + q.select(diesel::dsl::count_star()) + .first(conn.deref_mut()) + .with_context(|| "stats: total_photos")? + }; + let persons_count: i64 = persons::table + .select(diesel::dsl::count_star()) + .first(conn.deref_mut()) + .with_context(|| "stats: persons")?; + let unassigned_faces: i64 = { + let mut q = face_detections::table + .filter(face_detections::status.eq("detected")) + .filter(face_detections::person_id.is_null()) + .into_boxed(); + if let Some(lib) = library_id { + q = q.filter(face_detections::library_id.eq(lib)); + } + q.select(diesel::dsl::count_star()) + .first(conn.deref_mut()) + .with_context(|| "stats: unassigned")? + }; + + Ok(FaceStats { + library_id, + total_photos, + scanned, + with_faces, + no_faces, + failed, + persons_count, + unassigned_faces, + }) + }) + } + + fn create_person( + &mut self, + ctx: &opentelemetry::Context, + req: &CreatePersonReq, + from_tag: bool, + ) -> anyhow::Result { + let mut conn = self.connection.lock().expect("face dao lock"); + trace_db_call(ctx, "insert", "create_person", |span| { + span.set_attribute(KeyValue::new("name", req.name.clone())); + let now = Utc::now().timestamp(); + let insert = InsertPerson { + name: req.name.clone(), + notes: req.notes.clone(), + created_from_tag: from_tag, + created_at: now, + updated_at: now, + }; + diesel::insert_into(persons::table) + .values(&insert) + .execute(conn.deref_mut()) + .with_context(|| format!("insert person {}", req.name))?; + define_sql_function! { fn last_insert_rowid() -> diesel::sql_types::Integer; } + let id = diesel::select(last_insert_rowid()) + .get_result::(conn.deref_mut()) + .with_context(|| "last_insert_rowid persons")?; + // Optional entity bridge — do this as a follow-up update so + // schema's UNIQUE(name COLLATE NOCASE) can fire on insert + // before we touch entity_id. + if let Some(entity_id) = req.entity_id { + diesel::update(persons::table.find(id)) + .set(persons::entity_id.eq(entity_id)) + .execute(conn.deref_mut()) + .with_context(|| "set entity_id on new person")?; + } + persons::table + .find(id) + .first::(conn.deref_mut()) + .with_context(|| "fetch new person") + }) + } + + fn get_person( + &mut self, + ctx: &opentelemetry::Context, + id: i32, + ) -> anyhow::Result> { + let mut conn = self.connection.lock().expect("face dao lock"); + trace_db_call(ctx, "query", "get_person", |span| { + span.set_attribute(KeyValue::new("id", id as i64)); + persons::table + .find(id) + .first::(conn.deref_mut()) + .optional() + .with_context(|| "get_person") + }) + } + + fn list_persons( + &mut self, + ctx: &opentelemetry::Context, + library_id: Option, + ) -> anyhow::Result> { + let mut conn = self.connection.lock().expect("face dao lock"); + trace_db_call(ctx, "query", "list_persons", |_| { + // Two-step: load all persons, then a single grouped count + // query for face counts. Using a LEFT JOIN + GROUP BY in + // Diesel here gets noisy with the optional library filter; a + // second roundtrip is cheap and clearer. + let person_rows: Vec = persons::table + .order(persons::name.asc()) + .load::(conn.deref_mut()) + .with_context(|| "load persons")?; + + // Diesel's BoxedSelectStatement + group_by trips the trait + // resolver into recursion, so this aggregation goes through + // sql_query. The shape is small and the bind list is at most + // one parameter — readability isn't really worse than the DSL. + let counts: Vec<(i32, i64)> = { + use diesel::sql_types::*; + #[derive(QueryableByName)] + struct PersonCountRow { + #[diesel(sql_type = Integer)] + person_id: i32, + #[diesel(sql_type = BigInt)] + count: i64, + } + let sql = if library_id.is_some() { + "SELECT person_id, COUNT(*) AS count FROM face_detections \ + WHERE status='detected' AND person_id IS NOT NULL AND library_id = ? \ + GROUP BY person_id" + } else { + "SELECT person_id, COUNT(*) AS count FROM face_detections \ + WHERE status='detected' AND person_id IS NOT NULL \ + GROUP BY person_id" + }; + let mut q = diesel::sql_query(sql).into_boxed(); + if let Some(lib) = library_id { + q = q.bind::(lib); + } + q.load::(conn.deref_mut()) + .with_context(|| "person face counts")? + .into_iter() + .map(|r| (r.person_id, r.count)) + .collect() + }; + use std::collections::HashMap; + let count_map: HashMap = counts.into_iter().collect(); + + Ok(person_rows + .into_iter() + .map(|p| { + let face_count = count_map.get(&p.id).copied().unwrap_or(0); + PersonSummary { + id: p.id, + name: p.name, + cover_face_id: p.cover_face_id, + entity_id: p.entity_id, + created_from_tag: p.created_from_tag, + notes: p.notes, + face_count, + } + }) + .collect()) + }) + } + + fn update_person( + &mut self, + ctx: &opentelemetry::Context, + id: i32, + patch: &UpdatePersonReq, + ) -> anyhow::Result { + let mut conn = self.connection.lock().expect("face dao lock"); + trace_db_call(ctx, "update", "update_person", |span| { + span.set_attribute(KeyValue::new("id", id as i64)); + let now = Utc::now().timestamp(); + // Apply each patched column individually for the same + // reason as update_face — heterogeneous optional sets are + // painful in Diesel's type-driven update DSL. + if let Some(name) = &patch.name { + diesel::update(persons::table.find(id)) + .set((persons::name.eq(name), persons::updated_at.eq(now))) + .execute(conn.deref_mut()) + .with_context(|| "update person name")?; + } + if let Some(notes) = &patch.notes { + diesel::update(persons::table.find(id)) + .set((persons::notes.eq(notes), persons::updated_at.eq(now))) + .execute(conn.deref_mut()) + .with_context(|| "update person notes")?; + } + if let Some(cover) = patch.cover_face_id { + diesel::update(persons::table.find(id)) + .set(( + persons::cover_face_id.eq(cover), + persons::updated_at.eq(now), + )) + .execute(conn.deref_mut()) + .with_context(|| "update person cover")?; + } + if let Some(eid) = patch.entity_id { + diesel::update(persons::table.find(id)) + .set((persons::entity_id.eq(eid), persons::updated_at.eq(now))) + .execute(conn.deref_mut()) + .with_context(|| "update person entity_id")?; + } + persons::table + .find(id) + .first::(conn.deref_mut()) + .with_context(|| "fetch updated person") + }) + } + + fn delete_person( + &mut self, + ctx: &opentelemetry::Context, + id: i32, + cascade_delete_faces: bool, + ) -> anyhow::Result { + let mut conn = self.connection.lock().expect("face dao lock"); + trace_db_call(ctx, "delete", "delete_person", |span| { + span.set_attribute(KeyValue::new("id", id as i64)); + span.set_attribute(KeyValue::new("cascade", cascade_delete_faces)); + if cascade_delete_faces { + diesel::delete(face_detections::table.filter(face_detections::person_id.eq(id))) + .execute(conn.deref_mut()) + .with_context(|| "cascade delete faces for person")?; + } + // Always clear cover_face_id pointers that referenced this + // person's faces (otherwise the FK from persons.cover_face_id + // could hang). cover_face_id has no FK constraint in SQLite + // so this is documentation-only — the explicit nuke is on + // the face rows above. + let n = diesel::delete(persons::table.find(id)) + .execute(conn.deref_mut()) + .with_context(|| "delete person")?; + Ok(n > 0) + }) + } + + fn merge_persons( + &mut self, + ctx: &opentelemetry::Context, + src: i32, + into: i32, + ) -> anyhow::Result { + if src == into { + anyhow::bail!("cannot merge a person into itself"); + } + let mut conn = self.connection.lock().expect("face dao lock"); + trace_db_call(ctx, "update", "merge_persons", |span| { + span.set_attribute(KeyValue::new("src", src as i64)); + span.set_attribute(KeyValue::new("into", into as i64)); + // Wrap in a transaction so a half-merged state can't survive + // a SQLite write error mid-operation. + conn.deref_mut().transaction::<_, anyhow::Error, _>(|tx| { + // Re-point face_detections. + diesel::update(face_detections::table.filter(face_detections::person_id.eq(src))) + .set(face_detections::person_id.eq(into)) + .execute(tx) + .with_context(|| "repoint faces on merge")?; + // Copy notes from src into target if the target is empty. + let src_person: Person = persons::table + .find(src) + .first(tx) + .with_context(|| "load src person for merge")?; + let into_person: Person = persons::table + .find(into) + .first(tx) + .with_context(|| "load target person for merge")?; + if into_person.notes.as_deref().unwrap_or("").is_empty() + && src_person + .notes + .as_deref() + .map(|s| !s.is_empty()) + .unwrap_or(false) + { + diesel::update(persons::table.find(into)) + .set(persons::notes.eq(src_person.notes)) + .execute(tx) + .with_context(|| "copy notes on merge")?; + } + diesel::delete(persons::table.find(src)) + .execute(tx) + .with_context(|| "delete src person on merge")?; + persons::table + .find(into) + .first::(tx) + .with_context(|| "fetch merged person") + }) + }) + } + + fn resolve_content_hash( + &mut self, + ctx: &opentelemetry::Context, + library_id: i32, + rel_path: &str, + ) -> anyhow::Result> { + let mut conn = self.connection.lock().expect("face dao lock"); + trace_db_call(ctx, "query", "resolve_content_hash", |_| { + image_exif::table + .filter(image_exif::library_id.eq(library_id)) + .filter(image_exif::rel_path.eq(rel_path)) + .select(image_exif::content_hash) + .first::>(conn.deref_mut()) + .optional() + .map(|outer| outer.and_then(|inner| inner)) + .with_context(|| "resolve content_hash") + }) + } +} + +// ── Handlers ──────────────────────────────────────────────────────────────── + +pub fn add_face_services(app: App) -> App +where + T: ServiceFactory, +{ + app.service(web::resource("/faces/stats").route(web::get().to(stats_handler::))) + .service(web::resource("/faces/embeddings").route(web::get().to(embeddings_handler::))) + .service( + web::resource("/image/faces") + .route(web::get().to(list_faces_handler::)) + .route(web::post().to(create_face_handler::)), + ) + .service( + web::resource("/image/faces/{id}") + .route(web::patch().to(update_face_handler::)) + .route(web::delete().to(delete_face_handler::)), + ) + .service( + web::resource("/persons") + .route(web::get().to(list_persons_handler::)) + .route(web::post().to(create_person_handler::)), + ) + .service( + web::resource("/persons/{id}") + .route(web::get().to(get_person_handler::)) + .route(web::patch().to(update_person_handler::)) + .route(web::delete().to(delete_person_handler::)), + ) + .service( + web::resource("/persons/{id}/merge").route(web::post().to(merge_persons_handler::)), + ) + .service( + web::resource("/persons/{id}/faces").route(web::get().to(person_faces_handler::)), + ) +} + +// ── Stats / list ──────────────────────────────────────────────────────────── + +#[derive(Deserialize)] +pub struct LibraryQuery { + pub library: Option, +} + +async fn stats_handler( + _: Claims, + request: HttpRequest, + app_state: web::Data, + query: web::Query, + face_dao: web::Data>, +) -> impl Responder { + let context = extract_context_from_request(&request); + let span = global_tracer().start_with_context("faces.stats", &context); + let span_context = opentelemetry::Context::current_with_span(span); + + let library_id = libraries::resolve_library_param(&app_state, query.library.as_deref()) + .ok() + .flatten() + .map(|l| l.id); + let mut dao = face_dao.lock().expect("face dao lock"); + dao.stats(&span_context, library_id) + .map(|s| { + span_context.span().set_status(Status::Ok); + HttpResponse::Ok().json(s) + }) + .into_http_internal_err() +} + +async fn list_faces_handler( + _: Claims, + request: HttpRequest, + query: web::Query, + app_state: web::Data, + face_dao: web::Data>, +) -> impl Responder { + let context = extract_context_from_request(&request); + let span = global_tracer().start_with_context("faces.list", &context); + let span_context = opentelemetry::Context::current_with_span(span); + + let normalized_path = normalize_path(&query.path); + // resolve_library_param returns Option<&Library>; clone so the result + // is owned (matching the primary_library fallback's type). + let library: Library = libraries::resolve_library_param(&app_state, query.library.as_deref()) + .ok() + .flatten() + .cloned() + .unwrap_or_else(|| app_state.primary_library().clone()); + + let mut dao = face_dao.lock().expect("face dao lock"); + let hash = match dao.resolve_content_hash(&span_context, library.id, &normalized_path) { + Ok(Some(h)) => h, + Ok(None) => { + // Photo not yet hashed — empty face list is a graceful answer. + // The carousel falls back to "no overlay" which is fine until + // the watcher catches up. + return HttpResponse::Ok().json(Vec::::new()); + } + Err(e) => return HttpResponse::InternalServerError().body(e.to_string()), + }; + match dao.list_for_content_hash(&span_context, &hash) { + Ok(faces) => HttpResponse::Ok().json(faces), + Err(e) => HttpResponse::InternalServerError().body(e.to_string()), + } +} + +async fn embeddings_handler( + _: Claims, + request: HttpRequest, + query: web::Query, + face_dao: web::Data>, +) -> impl Responder { + let context = extract_context_from_request(&request); + let span = global_tracer().start_with_context("faces.embeddings", &context); + let span_context = opentelemetry::Context::current_with_span(span); + + let limit = query.limit.clamp(1, 5_000); + let offset = query.offset.max(0); + let mut dao = face_dao.lock().expect("face dao lock"); + dao.list_embeddings( + &span_context, + query.library, + query.unassigned, + limit, + offset, + ) + .map(|rows| { + let out: Vec = rows + .into_iter() + .map(|(r, b64)| FaceEmbeddingRow { + id: r.id, + library_id: r.library_id, + rel_path: r.rel_path, + content_hash: r.content_hash, + person_id: r.person_id, + model_version: r.model_version, + embedding: b64, + }) + .collect(); + HttpResponse::Ok().json(out) + }) + .into_http_internal_err() +} + +// ── Manual face create / update / delete ──────────────────────────────────── + +async fn create_face_handler( + _: Claims, + request: HttpRequest, + body: web::Json, + app_state: web::Data, + face_client: web::Data, + face_dao: web::Data>, +) -> impl Responder { + let context = extract_context_from_request(&request); + let span = global_tracer().start_with_context("faces.create_manual", &context); + let span_context = opentelemetry::Context::current_with_span(span); + + if !face_client.is_enabled() { + return HttpResponse::ServiceUnavailable().body("face client disabled"); + } + + let normalized_path = normalize_path(&body.path); + let library: Library = match libraries::resolve_library_param( + &app_state, + body.library.as_ref().map(|i| i.to_string()).as_deref(), + ) { + Ok(Some(lib)) => lib.clone(), + _ => app_state.primary_library().clone(), + }; + + // 1. Resolve content_hash for the photo. + let hash = { + let mut dao = face_dao.lock().expect("face dao lock"); + match dao.resolve_content_hash(&span_context, library.id, &normalized_path) { + Ok(Some(h)) => h, + Ok(None) => { + return HttpResponse::Conflict() + .body("photo not yet hashed; wait for next watcher pass"); + } + Err(e) => return HttpResponse::InternalServerError().body(e.to_string()), + } + }; + + // 2. Read full image, crop to bbox, encode as JPEG for transport. + let abs_path = library.resolve(&normalized_path); + let crop_bytes = match crop_image_to_bbox( + &abs_path, + body.bbox.x, + body.bbox.y, + body.bbox.w, + body.bbox.h, + ) { + Ok(b) => b, + Err(e) => { + warn!("crop_image_to_bbox failed for {:?}: {:?}", abs_path, e); + return HttpResponse::BadRequest().body(format!("cannot crop photo: {}", e)); + } + }; + + // 3. Send the crop to Apollo for embedding extraction. + let meta = DetectMeta { + content_hash: hash.clone(), + library_id: library.id, + rel_path: normalized_path.clone(), + orientation: None, + model_version: None, + }; + let detect = match face_client.embed(crop_bytes, meta).await { + Ok(r) => r, + Err(FaceDetectError::Permanent(e)) => { + return HttpResponse::UnprocessableEntity().body(format!("{}", e)); + } + Err(FaceDetectError::Transient(e)) => { + return HttpResponse::ServiceUnavailable().body(format!("{}", e)); + } + Err(FaceDetectError::Disabled) => { + return HttpResponse::ServiceUnavailable().body("face client disabled"); + } + }; + + let detected = match detect.faces.first() { + Some(f) => f.clone(), + None => { + // Apollo would have returned 422 on no_face_in_crop; defensive. + return HttpResponse::UnprocessableEntity().body("no face in crop"); + } + }; + let embedding_bytes = match detected.decode_embedding() { + Ok(b) => b, + Err(e) => { + warn!("manual face: decode embedding failed: {:?}", e); + return HttpResponse::BadGateway().body("invalid embedding from face service"); + } + }; + + // 4. Insert the manual row using the bbox the user drew (NOT the + // detector's tighter box around their drawing — they get what they + // asked for; cluster matching uses the embedding which is from the + // detector's true box anyway). + let mut dao = face_dao.lock().expect("face dao lock"); + let row = match dao.store_detection( + &span_context, + InsertFaceDetectionInput { + library_id: library.id, + content_hash: hash, + rel_path: normalized_path, + bbox: Some((body.bbox.x, body.bbox.y, body.bbox.w, body.bbox.h)), + embedding: Some(embedding_bytes), + confidence: Some(detected.confidence), + source: "manual".to_string(), + person_id: body.person_id, + status: "detected".to_string(), + model_version: detect.model_version, + }, + ) { + Ok(r) => r, + Err(e) => return HttpResponse::InternalServerError().body(e.to_string()), + }; + info!( + "Created manual face id={} library={} hash={} person_id={:?}", + row.id, row.library_id, row.content_hash, row.person_id + ); + HttpResponse::Created().json(row) +} + +async fn update_face_handler( + _: Claims, + request: HttpRequest, + path: web::Path, + body: web::Json, + face_dao: web::Data>, +) -> impl Responder { + let context = extract_context_from_request(&request); + let span = global_tracer().start_with_context("faces.update", &context); + let span_context = opentelemetry::Context::current_with_span(span); + let id = path.into_inner(); + + let person_patch: Option> = if body.clear_person { + Some(None) + } else { + body.person_id.map(Some) + }; + let bbox_patch = body.bbox.as_ref().map(|b| (b.x, b.y, b.w, b.h)); + + // bbox change → embedding becomes stale. Phase 2 only stores the new + // bbox; re-embed is a Phase 3 concern (it requires reading the image + // off disk and going back through face_client.embed). For now log a + // warning so we can spot orphan-embedding rows. + if bbox_patch.is_some() { + warn!( + "PATCH /image/faces/{}: bbox updated; embedding now stale (Phase 3 will re-embed)", + id + ); + } + + let mut dao = face_dao.lock().expect("face dao lock"); + dao.update_face(&span_context, id, person_patch, bbox_patch, None) + .map(|row| HttpResponse::Ok().json(row)) + .into_http_internal_err() +} + +async fn delete_face_handler( + _: Claims, + request: HttpRequest, + path: web::Path, + face_dao: web::Data>, +) -> impl Responder { + let context = extract_context_from_request(&request); + let span = global_tracer().start_with_context("faces.delete", &context); + let span_context = opentelemetry::Context::current_with_span(span); + + let mut dao = face_dao.lock().expect("face dao lock"); + match dao.delete_face(&span_context, path.into_inner()) { + Ok(true) => HttpResponse::NoContent().finish(), + Ok(false) => HttpResponse::NotFound().finish(), + Err(e) => HttpResponse::InternalServerError().body(e.to_string()), + } +} + +// ── Persons ───────────────────────────────────────────────────────────────── + +async fn list_persons_handler( + _: Claims, + request: HttpRequest, + app_state: web::Data, + query: web::Query, + face_dao: web::Data>, +) -> impl Responder { + let context = extract_context_from_request(&request); + let span = global_tracer().start_with_context("persons.list", &context); + let span_context = opentelemetry::Context::current_with_span(span); + + let library_id = libraries::resolve_library_param(&app_state, query.library.as_deref()) + .ok() + .flatten() + .map(|l| l.id); + let mut dao = face_dao.lock().expect("face dao lock"); + dao.list_persons(&span_context, library_id) + .map(|p| HttpResponse::Ok().json(p)) + .into_http_internal_err() +} + +async fn create_person_handler( + _: Claims, + request: HttpRequest, + body: web::Json, + face_dao: web::Data>, +) -> impl Responder { + let context = extract_context_from_request(&request); + let span = global_tracer().start_with_context("persons.create", &context); + let span_context = opentelemetry::Context::current_with_span(span); + if body.name.trim().is_empty() { + return HttpResponse::BadRequest().body("name required"); + } + + let mut dao = face_dao.lock().expect("face dao lock"); + match dao.create_person(&span_context, &body, /*from_tag*/ false) { + Ok(p) => HttpResponse::Created().json(p), + Err(e) => { + // SQLite UNIQUE(name COLLATE NOCASE) → 409 Conflict so the UI + // can show "name already exists" without parsing. + let msg = format!("{}", e); + if msg.to_lowercase().contains("unique") { + HttpResponse::Conflict().body("person name already exists") + } else { + HttpResponse::InternalServerError().body(msg) + } + } + } +} + +async fn get_person_handler( + _: Claims, + request: HttpRequest, + path: web::Path, + face_dao: web::Data>, +) -> impl Responder { + let context = extract_context_from_request(&request); + let span = global_tracer().start_with_context("persons.get", &context); + let span_context = opentelemetry::Context::current_with_span(span); + + let mut dao = face_dao.lock().expect("face dao lock"); + match dao.get_person(&span_context, path.into_inner()) { + Ok(Some(p)) => HttpResponse::Ok().json(p), + Ok(None) => HttpResponse::NotFound().finish(), + Err(e) => HttpResponse::InternalServerError().body(e.to_string()), + } +} + +async fn update_person_handler( + _: Claims, + request: HttpRequest, + path: web::Path, + body: web::Json, + face_dao: web::Data>, +) -> impl Responder { + let context = extract_context_from_request(&request); + let span = global_tracer().start_with_context("persons.update", &context); + let span_context = opentelemetry::Context::current_with_span(span); + let mut dao = face_dao.lock().expect("face dao lock"); + match dao.update_person(&span_context, path.into_inner(), &body) { + Ok(p) => HttpResponse::Ok().json(p), + Err(e) => { + let msg = format!("{}", e); + if msg.to_lowercase().contains("unique") { + HttpResponse::Conflict().body("person name already exists") + } else { + HttpResponse::InternalServerError().body(msg) + } + } + } +} + +async fn delete_person_handler( + _: Claims, + request: HttpRequest, + path: web::Path, + query: web::Query, + face_dao: web::Data>, +) -> impl Responder { + let context = extract_context_from_request(&request); + let span = global_tracer().start_with_context("persons.delete", &context); + let span_context = opentelemetry::Context::current_with_span(span); + // Default cascade=set_null — don't destroy face history just because + // the user renamed/removed the identity. + let cascade = matches!(query.cascade.as_deref(), Some("delete")); + let mut dao = face_dao.lock().expect("face dao lock"); + match dao.delete_person(&span_context, path.into_inner(), cascade) { + Ok(true) => HttpResponse::NoContent().finish(), + Ok(false) => HttpResponse::NotFound().finish(), + Err(e) => HttpResponse::InternalServerError().body(e.to_string()), + } +} + +async fn merge_persons_handler( + _: Claims, + request: HttpRequest, + path: web::Path, + body: web::Json, + face_dao: web::Data>, +) -> impl Responder { + let context = extract_context_from_request(&request); + let span = global_tracer().start_with_context("persons.merge", &context); + let span_context = opentelemetry::Context::current_with_span(span); + let src = path.into_inner(); + let mut dao = face_dao.lock().expect("face dao lock"); + match dao.merge_persons(&span_context, src, body.into) { + Ok(p) => HttpResponse::Ok().json(p), + Err(e) => { + let msg = format!("{}", e); + if msg.contains("itself") { + HttpResponse::BadRequest().body(msg) + } else { + HttpResponse::InternalServerError().body(msg) + } + } + } +} + +async fn person_faces_handler( + _: Claims, + request: HttpRequest, + path: web::Path, + app_state: web::Data, + query: web::Query, + face_dao: web::Data>, +) -> impl Responder { + let context = extract_context_from_request(&request); + let span = global_tracer().start_with_context("persons.faces", &context); + let span_context = opentelemetry::Context::current_with_span(span); + let library_id = libraries::resolve_library_param(&app_state, query.library.as_deref()) + .ok() + .flatten() + .map(|l| l.id); + let mut dao = face_dao.lock().expect("face dao lock"); + dao.list_for_person(&span_context, path.into_inner(), library_id) + .map(|faces| HttpResponse::Ok().json(faces)) + .into_http_internal_err() +} + +// ── Helpers ───────────────────────────────────────────────────────────────── + +/// Crop `abs_path` to the normalized bbox and re-encode as JPEG for the +/// face service. `image::open` decodes most photo formats Apollo will see; +/// HEIC/RAW are out of scope for the manual flow (the user can't draw a +/// face on a thumbnail of a non-decodable file anyway). +fn crop_image_to_bbox( + abs_path: &std::path::Path, + nx: f32, + ny: f32, + nw: f32, + nh: f32, +) -> anyhow::Result> { + if !(0.0..=1.0).contains(&nx) || !(0.0..=1.0).contains(&ny) { + return Err(anyhow!("bbox xy out of [0,1]")); + } + if nw <= 0.0 || nh <= 0.0 || nx + nw > 1.001 || ny + nh > 1.001 { + return Err(anyhow!("bbox wh out of bounds or zero")); + } + let img = image::open(abs_path).with_context(|| format!("open {:?}", abs_path))?; + let (w, h) = img.dimensions(); + let px = (nx * w as f32).round().clamp(0.0, w as f32 - 1.0) as u32; + let py = (ny * h as f32).round().clamp(0.0, h as f32 - 1.0) as u32; + let pw = ((nw * w as f32).round() as u32).min(w.saturating_sub(px)); + let ph = ((nh * h as f32).round() as u32).min(h.saturating_sub(py)); + if pw == 0 || ph == 0 { + return Err(anyhow!("crop produced zero-dim image")); + } + // Pad the crop a bit so the detector has context — a tightly-drawn + // bbox often clips ears/jaw which hurts the embedding. 10% on each + // side is a reasonable default. + let pad_x = (pw / 10).max(1); + let pad_y = (ph / 10).max(1); + let cx = px.saturating_sub(pad_x); + let cy = py.saturating_sub(pad_y); + let cw = (pw + 2 * pad_x).min(w - cx); + let ch = (ph + 2 * pad_y).min(h - cy); + let cropped = img.crop_imm(cx, cy, cw, ch); + let mut out = std::io::Cursor::new(Vec::new()); + cropped + .write_to(&mut out, image::ImageFormat::Jpeg) + .with_context(|| "encode crop as JPEG")?; + Ok(out.into_inner()) +} + +// ── Tests ─────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + use crate::database::test::in_memory_db_connection; + + fn fresh_dao() -> SqliteFaceDao { + SqliteFaceDao::from_connection(Arc::new(Mutex::new(in_memory_db_connection()))) + } + + fn ctx() -> opentelemetry::Context { + opentelemetry::Context::current() + } + + #[test] + fn person_crud_roundtrip() { + let mut dao = fresh_dao(); + let p = dao + .create_person( + &ctx(), + &CreatePersonReq { + name: "Alice".into(), + notes: Some("the boss".into()), + entity_id: None, + }, + false, + ) + .expect("create person"); + assert_eq!(p.name, "Alice"); + assert_eq!(p.notes.as_deref(), Some("the boss")); + assert!(!p.created_from_tag); + + // Case-insensitive uniqueness — second create with same name in + // different case must fail with a UNIQUE violation, surfacing + // as 409 Conflict at the handler layer. + let dup = dao.create_person( + &ctx(), + &CreatePersonReq { + name: "alice".into(), + notes: None, + entity_id: None, + }, + false, + ); + assert!(dup.is_err(), "case-insensitive UNIQUE must reject 'alice'"); + + // Update notes; verify updated_at moves forward. + let prev_updated = p.updated_at; + std::thread::sleep(std::time::Duration::from_millis(1100)); // boundary cross + let updated = dao + .update_person( + &ctx(), + p.id, + &UpdatePersonReq { + name: None, + notes: Some("a new note".into()), + cover_face_id: None, + entity_id: None, + }, + ) + .expect("update"); + assert_eq!(updated.notes.as_deref(), Some("a new note")); + assert!(updated.updated_at >= prev_updated); + + // List + delete. + let listed = dao.list_persons(&ctx(), None).expect("list"); + assert_eq!(listed.len(), 1); + assert_eq!(listed[0].face_count, 0); + assert!(dao.delete_person(&ctx(), p.id, false).expect("delete")); + assert!(dao.list_persons(&ctx(), None).expect("list").is_empty()); + } + + #[test] + fn marker_rows_idempotent() { + let mut dao = fresh_dao(); + // Need a libraries row to satisfy face_detections.library_id FK + // without DEFERRED — SQLite enforces FK immediately by default. + // The :memory: DB already has the libraries seed via + // seed_or_patch_from_env? No — in_memory_db_connection just runs + // migrations; the libraries seed is a runtime path. Insert one + // manually for the test. + // Migrations may seed libraries(id=1); INSERT OR IGNORE keeps the + // test runnable either way. + diesel::sql_query( + "INSERT OR IGNORE INTO libraries (id, name, root_path, created_at) \ + VALUES (1, 'main', '/tmp', 0)", + ) + .execute(dao.connection.lock().unwrap().deref_mut()) + .expect("seed libraries"); + + // Marker insert. + dao.mark_status(&ctx(), 1, "abc123", "x.jpg", "no_faces", "buffalo_l") + .expect("first mark"); + assert!( + dao.already_scanned(&ctx(), "abc123").expect("scan"), + "already_scanned should report true after marker" + ); + + // Second mark for the same hash is a no-op (the partial UNIQUE + // index would otherwise reject; the DAO short-circuits before the + // insert). + dao.mark_status(&ctx(), 1, "abc123", "x.jpg", "no_faces", "buffalo_l") + .expect("second mark idempotent"); + + // Stats reflect the no_faces marker. + let stats = dao.stats(&ctx(), Some(1)).expect("stats"); + assert_eq!(stats.no_faces, 1); + assert_eq!(stats.scanned, 1); + assert_eq!(stats.with_faces, 0); + } + + #[test] + fn merge_persons_repoints_faces() { + let mut dao = fresh_dao(); + // Migrations may seed libraries(id=1); INSERT OR IGNORE keeps the + // test runnable either way. + diesel::sql_query( + "INSERT OR IGNORE INTO libraries (id, name, root_path, created_at) \ + VALUES (1, 'main', '/tmp', 0)", + ) + .execute(dao.connection.lock().unwrap().deref_mut()) + .expect("seed libraries"); + + let alice = dao + .create_person( + &ctx(), + &CreatePersonReq { + name: "Alice".into(), + notes: None, + entity_id: None, + }, + false, + ) + .unwrap(); + let alyse = dao + .create_person( + &ctx(), + &CreatePersonReq { + name: "Alyse".into(), + notes: Some("dup of alice".into()), + entity_id: None, + }, + false, + ) + .unwrap(); + + // Insert a detected face row owned by `alyse`. + let _ = dao + .store_detection( + &ctx(), + InsertFaceDetectionInput { + library_id: 1, + content_hash: "h1".into(), + rel_path: "p1.jpg".into(), + bbox: Some((0.1, 0.1, 0.2, 0.2)), + embedding: Some(vec![0u8; 2048]), + confidence: Some(0.9), + source: "auto".into(), + person_id: Some(alyse.id), + status: "detected".into(), + model_version: "buffalo_l".into(), + }, + ) + .unwrap(); + + // Merge alyse → alice. Notes from src copy when target empty. + let merged = dao.merge_persons(&ctx(), alyse.id, alice.id).unwrap(); + assert_eq!(merged.id, alice.id); + assert_eq!(merged.notes.as_deref(), Some("dup of alice")); + + // alyse is gone. + assert!(dao.get_person(&ctx(), alyse.id).unwrap().is_none()); + + // The face is now alice's. + let faces = dao.list_for_person(&ctx(), alice.id, Some(1)).unwrap(); + assert_eq!(faces.len(), 1); + assert_eq!(faces[0].person_id, Some(alice.id)); + } +} diff --git a/src/lib.rs b/src/lib.rs index e6d2cc1..ad1c595 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,6 +12,7 @@ pub mod data; pub mod database; pub mod error; pub mod exif; +pub mod faces; pub mod file_types; pub mod files; pub mod geo; diff --git a/src/main.rs b/src/main.rs index ccdb14b..8cee679 100644 --- a/src/main.rs +++ b/src/main.rs @@ -66,6 +66,7 @@ mod data; mod database; mod error; mod exif; +mod faces; mod file_types; mod files; mod geo; @@ -1518,6 +1519,7 @@ fn main() -> std::io::Result<()> { let exif_dao = SqliteExifDao::new(); let insight_dao = SqliteInsightDao::new(); let preview_dao = SqlitePreviewDao::new(); + let face_dao = faces::SqliteFaceDao::new(); let cors = Cors::default() .allowed_origin_fn(|origin, _req_head| { // Allow all origins in development, or check against CORS_ALLOWED_ORIGINS env var @@ -1595,6 +1597,7 @@ fn main() -> std::io::Result<()> { .service(libraries::list_libraries) .add_feature(add_tag_services::<_, SqliteTagDao>) .add_feature(knowledge::add_knowledge_services::<_, SqliteKnowledgeDao>) + .add_feature(faces::add_face_services::<_, faces::SqliteFaceDao>) .app_data(app_data.clone()) .app_data::>(Data::new(RealFileSystem::new( app_data.base_path.clone(), @@ -1616,6 +1619,10 @@ fn main() -> std::io::Result<()> { .app_data::>>(Data::new(Mutex::new( SqliteKnowledgeDao::new(), ))) + .app_data::>>(Data::new(Mutex::new(face_dao))) + .app_data::>(Data::new( + app_data.face_client.clone(), + )) .app_data(mp::form::MultipartFormConfig::default().total_limit(1024 * 1024 * 1024)) // 1GB upload limit .app_data(web::JsonConfig::default().error_handler(|err, req| { let detail = err.to_string(); diff --git a/src/state.rs b/src/state.rs index 5682d43..18eab29 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,4 +1,5 @@ use crate::ai::apollo_client::ApolloClient; +use crate::ai::face_client::FaceClient; use crate::ai::insight_chat::{ChatLockMap, InsightChatService}; use crate::ai::openrouter::OpenRouterClient; use crate::ai::{InsightGenerator, OllamaClient, SmsApiClient}; @@ -48,6 +49,11 @@ pub struct AppState { pub insight_generator: InsightGenerator, /// Chat continuation service. Hold an Arc so handlers can clone cheaply. pub insight_chat: Arc, + /// Face inference client (calls Apollo's `/api/internal/faces/*`). + /// Disabled (`is_enabled() == false`) when neither `APOLLO_FACE_API_BASE_URL` + /// nor `APOLLO_API_BASE_URL` is set; the file-watch hook (Phase 3) and + /// manual-face-create handler short-circuit in that case. + pub face_client: FaceClient, } impl AppState { @@ -82,6 +88,7 @@ impl AppState { insight_generator: InsightGenerator, insight_chat: Arc, preview_dao: Arc>>, + face_client: FaceClient, ) -> Self { assert!( !libraries_vec.is_empty(), @@ -115,6 +122,7 @@ impl AppState { sms_client, insight_generator, insight_chat, + face_client, } } @@ -161,6 +169,15 @@ impl Default for AppState { // generator silently falls through to the legacy Nominatim path. let apollo_client = ApolloClient::new(env::var("APOLLO_API_BASE_URL").ok()); + // Face inference client. Falls back to APOLLO_API_BASE_URL when + // APOLLO_FACE_API_BASE_URL is unset (single-Apollo deploys are the + // common case). Both unset = feature disabled, file-watch hook + // and manual-face handlers short-circuit silently. + let face_client_url = env::var("APOLLO_FACE_API_BASE_URL") + .ok() + .or_else(|| env::var("APOLLO_API_BASE_URL").ok()); + let face_client = FaceClient::new(face_client_url); + // Initialize DAOs let insight_dao: Arc>> = Arc::new(Mutex::new(Box::new(SqliteInsightDao::new()))); @@ -244,6 +261,7 @@ impl Default for AppState { insight_generator, insight_chat, preview_dao, + face_client, ) } } @@ -382,6 +400,7 @@ impl AppState { insight_generator, insight_chat, preview_dao, + FaceClient::new(None), // disabled in test ) } }