Face Recognition / People Integration #61

Merged
cameron merged 23 commits from feature/face-recog-phase3-file-watch into master 2026-04-30 17:22:09 +00:00
21 changed files with 5412 additions and 24 deletions

85
.env.example Normal file
View File

@@ -0,0 +1,85 @@
# ImageApi configuration template. Copy to `.env` and fill in for your
# deploy. Comments mirror the canonical docs in CLAUDE.md — see there
# for the full picture (especially the AI-Insights / Apollo / face
# integration sections).
# ── Required ────────────────────────────────────────────────────────────
DATABASE_URL=./database.db
BASE_PATH=/path/to/media
THUMBNAILS=/path/to/thumbnails
VIDEO_PATH=/path/to/video/hls
GIFS_DIRECTORY=/path/to/gifs
PREVIEW_CLIPS_DIRECTORY=/path/to/preview-clips
BIND_URL=0.0.0.0:8080
CORS_ALLOWED_ORIGINS=http://localhost:3000
SECRET_KEY=replace-me-with-a-long-random-secret
RUST_LOG=info
# ── File watching ───────────────────────────────────────────────────────
# Quick scan = recently-modified-files only; full scan = comprehensive walk.
WATCH_QUICK_INTERVAL_SECONDS=60
WATCH_FULL_INTERVAL_SECONDS=3600
# Comma-separated path prefixes / component names to skip in /memories
# AND in face detection (e.g. @eaDir, .thumbnails, /private).
EXCLUDED_DIRS=
# ── Video / HLS ─────────────────────────────────────────────────────────
HLS_CONCURRENCY=2
HLS_TIMEOUT_SECONDS=900
PLAYLIST_CLEANUP_INTERVAL_SECONDS=86400
# ── Telemetry (release builds only) ─────────────────────────────────────
# OTLP_OTLS_ENDPOINT=http://localhost:4317
# ── AI Insights — Ollama (local LLM) ────────────────────────────────────
OLLAMA_PRIMARY_URL=http://localhost:11434
OLLAMA_PRIMARY_MODEL=nemotron-3-nano:30b
# Optional fallback server tried on connection failure.
# OLLAMA_FALLBACK_URL=http://server:11434
# OLLAMA_FALLBACK_MODEL=llama3.2:3b
OLLAMA_REQUEST_TIMEOUT_SECONDS=120
# Cap on tool-calling iterations per chat turn / agentic insight.
AGENTIC_MAX_ITERATIONS=6
AGENTIC_CHAT_MAX_ITERATIONS=6
# ── AI Insights — OpenRouter (hybrid backend, optional) ─────────────────
# Set OPENROUTER_API_KEY to enable the hybrid backend (vision stays
# local on Ollama, chat routes to OpenRouter).
# OPENROUTER_API_KEY=sk-or-...
# OPENROUTER_DEFAULT_MODEL=anthropic/claude-sonnet-4
# OPENROUTER_ALLOWED_MODELS=openai/gpt-4o-mini,anthropic/claude-haiku-4-5,google/gemini-2.5-flash
# OPENROUTER_BASE_URL=https://openrouter.ai/api/v1
# OPENROUTER_EMBEDDING_MODEL=openai/text-embedding-3-small
# OPENROUTER_HTTP_REFERER=https://your-site.example
# OPENROUTER_APP_TITLE=ImageApi
# ── AI Insights — sibling services (optional) ───────────────────────────
# Apollo (places + face inference). Single Apollo deploys typically set
# only APOLLO_API_BASE_URL and let the face client fall back to it.
# APOLLO_API_BASE_URL=http://apollo.lan:8000
# APOLLO_FACE_API_BASE_URL=http://apollo.lan:8000
# SMS_API_URL=http://localhost:8000
# SMS_API_TOKEN=
# Display name used in agentic prompts when the LLM refers to "you".
USER_NAME=
# ── Face detection (Phase 3+) ───────────────────────────────────────────
# Cosine-sim floor for auto-binding a detected face to an existing
# same-named person on detection. 0.4 ≈ moderate-confidence match.
FACE_AUTOBIND_MIN_COS=0.4
# Per-scan-tick fan-out into Apollo's detect endpoint. Apollo's GPU
# pool serializes server-side; this just overlaps file-IO with
# inference RTT.
FACE_DETECT_CONCURRENCY=8
# Per-detect HTTP timeout. CPU-only Apollo deploys may need higher.
FACE_DETECT_TIMEOUT_SEC=60
# Per-tick caps on the two backlog drains (independent of WATCH_*
# quick / full scans). Tune up if you have a large unscanned backlog
# and want it to clear faster; tune down if Apollo is overloaded.
FACE_BACKLOG_MAX_PER_TICK=64
FACE_HASH_BACKFILL_MAX_PER_TICK=2000
# ── RAG / search ────────────────────────────────────────────────────────
# Set to `1` to enable cross-encoder reranking on /search results.
SEARCH_RAG_RERANK=0

View File

@@ -210,7 +210,34 @@ Centralized in `file_types.rs` with constants `IMAGE_EXTENSIONS` and `VIDEO_EXTE
All database operations and HTTP handlers wrapped in spans. In release builds, exports to OTLP endpoint via `OTLP_OTLS_ENDPOINT`. Debug builds use basic logger.
**Memory Exclusion:**
`PathExcluder` in `memories.rs` filters out directories from memories API via `EXCLUDED_DIRS` environment variable (comma-separated paths or substring patterns). The same excluder is applied to face-detection candidates (`face_watch::filter_excluded`) so junk directories like `@eaDir` / `.thumbnails` don't burn detect calls on Apollo.
### Face detection system
ImageApi owns the face data; Apollo (sibling repo) hosts the insightface inference service. Inference is triggered automatically by the file watcher and persisted into two tables:
- `persons(id, name UNIQUE COLLATE NOCASE, cover_face_id, entity_id, created_from_tag, notes, ...)` — operator-managed, name is the user-visible identity.
- `face_detections(id, library_id, content_hash, rel_path, bbox_*, embedding BLOB, confidence, source, person_id, status, model_version, ...)` — keyed on `content_hash` so a photo duplicated across libraries is detected once. Marker rows for `status IN ('no_faces','failed')` carry NULL bbox/embedding (CHECK constraint enforces this).
**Why content_hash and not (library_id, rel_path):** ties face data to the bytes, not the path. A backup mount that copies files from the primary library naturally inherits the existing detections without re-running inference.
**File-watch hook** (`src/main.rs::process_new_files`): for each photo with a populated `content_hash`, check `FaceDao::already_scanned(hash)`; if not, send bytes (or embedded JPEG preview for RAW via `exif::extract_embedded_jpeg_preview`) to Apollo's `/api/internal/faces/detect`. K=`FACE_DETECT_CONCURRENCY` (default 8) parallel calls per scan tick; Apollo serializes them via its single-worker GPU pool. `face_watch.rs` is the Tokio orchestration layer.
**Per-tick backlog drain** (also `src/main.rs`): two passes that run on every watcher tick regardless of quick-vs-full scan:
- `backfill_unhashed_backlog` — populates `image_exif.content_hash` for photos that arrived before the hash field was retroactive. Capped by `FACE_HASH_BACKFILL_MAX_PER_TICK` (default 2000); errors don't burn the cap.
- `process_face_backlog` — runs detection on photos that have a hash but no `face_detections` row. Capped by `FACE_BACKLOG_MAX_PER_TICK` (default 64). Selected via a SQL anti-join (`FaceDao::list_unscanned_candidates`); videos and EXCLUDED_DIRS paths filtered out client-side via `face_watch::filter_excluded` so they never reach Apollo. A sketch of this drain follows the list.
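A minimal sketch of that drain, for orientation only. The real `process_face_backlog` body lives in `src/main.rs` and is not reproduced here; the helper name and the exact signature of `FaceDao::list_unscanned_candidates` (cap parameter, return type) are assumptions, while `run_face_detection_pass`, `FaceCandidate`, and `FACE_BACKLOG_MAX_PER_TICK` come from this PR.

```rust
use std::sync::{Arc, Mutex};

// Illustrative sketch (not the merged code). Assumed signature:
//   FaceDao::list_unscanned_candidates(ctx, library_id, cap)
//     -> Result<Vec<FaceCandidate>, DbError>
fn process_face_backlog_sketch(
    ctx: &opentelemetry::Context,
    lib: &libraries::Library,
    face_client: &crate::ai::face_client::FaceClient,
    face_dao: &Arc<Mutex<Box<dyn faces::FaceDao>>>,
    tag_dao: &Arc<Mutex<Box<dyn tags::TagDao>>>,
    excluded_dirs: &[String],
) {
    // Per-tick cap, mirroring the env parsing used elsewhere in the watcher.
    let cap: i64 = std::env::var("FACE_BACKLOG_MAX_PER_TICK")
        .ok()
        .and_then(|s| s.parse().ok())
        .unwrap_or(64);

    // Anti-join: hashed photos with no face_detections row yet.
    let candidates = {
        let mut dao = face_dao.lock().expect("face dao");
        match dao.list_unscanned_candidates(ctx, lib.id, cap) {
            Ok(c) => c,
            Err(e) => {
                log::warn!("face backlog: candidate query failed: {e:?}");
                return;
            }
        }
    };

    // Videos and EXCLUDED_DIRS paths are dropped inside the pass via
    // face_watch::filter_excluded, so they never reach Apollo.
    face_watch::run_face_detection_pass(
        lib,
        excluded_dirs,
        face_client,
        face_dao.clone(),
        tag_dao.clone(),
        candidates,
    );
}
```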
**Auto-bind on detection:** when a photo carries a tag whose name matches a `persons.name` (case-insensitive), the new face binds automatically iff cosine similarity to the person's existing-face mean is ≥ `FACE_AUTOBIND_MIN_COS` (default 0.4). Persons with no existing faces bind unconditionally and the new face becomes the cover.
**Manual face create** (`POST /image/faces`): crops the image to the user-supplied bbox, applies EXIF orientation via `exif::apply_orientation` (the `image` crate hands raw pre-rotation pixels — without this, manually-drawn bboxes never resolved a face on re-detection), pads to ~50% of bbox dims (RetinaFace anchor scales need ~50% face-fill at det_size=640), then calls Apollo's embed endpoint. A `force` flag lets the operator save a face the detector couldn't see (e.g. profile shots, occluded faces) — the row gets a zero-vector embedding so it's manually-bound only and won't participate in clustering.
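For orientation, a rough sketch of the pad step. Illustrative only: the real crop/pad code lives in `faces.rs` (large diff, collapsed below), and both the helper name `padded_crop_px` and the exact pad fraction are assumptions.

```rust
// Hypothetical helper (not the merged code): expand a normalized bbox by
// `pad_frac` of its own width/height on each side, then convert to pixel
// coordinates clamped to the image, ready to crop and send to Apollo.
fn padded_crop_px(
    (bx, by, bw, bh): (f32, f32, f32, f32), // normalized 0..1, like bbox_* in face_detections
    (img_w, img_h): (u32, u32),
    pad_frac: f32, // ~0.5 per the note above; an assumption here
) -> (u32, u32, u32, u32) {
    let x0 = ((bx - bw * pad_frac) * img_w as f32).max(0.0);
    let y0 = ((by - bh * pad_frac) * img_h as f32).max(0.0);
    let x1 = ((bx + bw * (1.0 + pad_frac)) * img_w as f32).min(img_w as f32);
    let y1 = ((by + bh * (1.0 + pad_frac)) * img_h as f32).min(img_h as f32);
    (x0 as u32, y0 as u32, (x1 - x0).round() as u32, (y1 - y0).round() as u32)
}
```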
**Rerun preserves manual rows** (`POST /image/faces/{id}/rerun`): only `source='auto'` rows are deleted before re-running detection. `already_scanned` returns true on ANY row, so a photo whose only faces are manually drawn never auto-redetects.
Module map:
- `src/faces.rs` — `FaceDao` trait + `SqliteFaceDao` impl, route handlers for `/faces/*`, `/image/faces/*`, `/persons/*`. Mirror of `tags.rs` layout.
- `src/face_watch.rs` — Tokio orchestration for the file-watch detect pass; `filter_excluded` (PathExcluder + image-extension filter), `read_image_bytes_for_detect` (RAW preview fallback).
- `src/ai/face_client.rs` — HTTP client for Apollo's inference. Configured by `APOLLO_FACE_API_BASE_URL`, falls back to `APOLLO_API_BASE_URL`. Both unset → feature disabled, file-watch hook is a no-op.
- `migrations/2026-04-29-000000_add_faces/` — schema.
### Startup Sequence
@@ -286,6 +313,15 @@ SMS_API_TOKEN=your-api-token # SMS API authentication token (o
# `get_personal_place_at` tool. Unset = legacy Nominatim-only path.
APOLLO_API_BASE_URL=http://apollo.lan:8000 # Base URL of the sibling Apollo backend
# Face inference (optional). Apollo also hosts the insightface inference
# service; ImageApi calls it from the file-watch hook (Phase 3) and from
# the manual face-create endpoint. Falls back to APOLLO_API_BASE_URL when
# unset (typical single-Apollo deploy). Both unset = feature disabled.
APOLLO_FACE_API_BASE_URL=http://apollo.lan:8000 # Override if face service runs separately
FACE_AUTOBIND_MIN_COS=0.4 # Phase 3: cosine-sim floor for tag-name auto-bind
FACE_DETECT_CONCURRENCY=8 # Phase 3: per-scan-tick parallel detect calls
FACE_DETECT_TIMEOUT_SEC=60 # reqwest client timeout (CPU inference can be slow)
# OpenRouter (Hybrid Backend) - keeps embeddings + vision local, routes chat to OpenRouter
OPENROUTER_API_KEY=sk-or-... # Required to enable hybrid backend
OPENROUTER_DEFAULT_MODEL=anthropic/claude-sonnet-4 # Used when client doesn't pick a model

3
Cargo.lock generated
View File

@@ -1913,7 +1913,7 @@ dependencies = [
[[package]]
name = "image-api"
version = "1.0.0" version = "1.1.0"
dependencies = [
"actix",
"actix-cors",
@@ -3229,6 +3229,7 @@ dependencies = [
"js-sys", "js-sys",
"log", "log",
"mime", "mime",
"mime_guess",
"native-tls", "native-tls",
"percent-encoding", "percent-encoding",
"pin-project-lite", "pin-project-lite",

View File

@@ -1,6 +1,6 @@
[package]
name = "image-api"
version = "1.0.0" version = "1.1.0"
authors = ["Cameron Cordes <cameronc.dev@gmail.com>"]
edition = "2024"
@@ -49,7 +49,7 @@ opentelemetry-appender-log = "0.31.0"
tempfile = "3.20.0"
regex = "1.11.1"
exif = { package = "kamadak-exif", version = "0.6.1" }
reqwest = { version = "0.12", features = ["json", "stream"] } reqwest = { version = "0.12", features = ["json", "stream", "multipart"] }
async-stream = "0.3"
tokio-util = { version = "0.7", features = ["io"] }
bytes = "1"

View File

@@ -159,3 +159,34 @@ Daily conversation summaries are generated automatically on server startup. Conf
- Contacts to process
- Model version used for embeddings: `nomic-embed-text:v1.5`
### Apollo + Face Recognition (Optional)
Apollo (sibling project) hosts both the Places API and the local insightface
inference service. Both integrations are optional and degrade gracefully when
unset.
- `APOLLO_API_BASE_URL` - Base URL of the sibling Apollo backend.
- When set, photo-insight enrichment folds the user's personal place name
(Home, Work, Cabin, ...) into the location string, and the agentic loop
gains a `get_personal_place_at` tool. Unset = legacy Nominatim-only path.
- `APOLLO_FACE_API_BASE_URL` - Base URL for the face-detection service.
- Falls back to `APOLLO_API_BASE_URL` when unset (typical single-Apollo
deploy). Both unset = face feature disabled (file-watch hook and
manual-face endpoints short-circuit silently). See the construction sketch after this list.
- `FACE_AUTOBIND_MIN_COS` (Phase 3) - Cosine-sim floor for auto-binding a
detected face to an existing same-named person via people-tag bootstrap
[default: `0.4`].
- `FACE_DETECT_CONCURRENCY` (Phase 3) - Per-scan-tick concurrent detect
calls fired by the file watcher [default: `8`]. Apollo serializes them
via its single-worker GPU pool.
- `FACE_DETECT_TIMEOUT_SEC` - reqwest client timeout per detect call
[default: `60`]. CPU inference on a backlog can take many seconds.
- `FACE_BACKLOG_MAX_PER_TICK` - Cap on the per-tick backlog drain (photos
with a content_hash but no face_detections row) [default: `64`]. Runs
every watcher tick regardless of quick-vs-full scan, so the unscanned
set drains independently of the file walk.
- `FACE_HASH_BACKFILL_MAX_PER_TICK` - Cap on the per-tick content_hash
backfill (photos that were registered before the hash field was
populated retroactively) [default: `2000`]. Errors don't burn the cap;
only successful hashes count.
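A minimal construction sketch for the fallback order above. This is not the merged app-state wiring (that code is outside this diff); `FaceClient::new` and `is_enabled()` are from `src/ai/face_client.rs`, while the helper name here is hypothetical.

```rust
use crate::ai::face_client::FaceClient;

// Hypothetical helper: resolve the documented env fallback and build the
// client. Both vars unset -> FaceClient::new(None), is_enabled() == false,
// and every face hook (file watcher, manual endpoints) silently no-ops.
fn build_face_client_from_env() -> FaceClient {
    let base = std::env::var("APOLLO_FACE_API_BASE_URL")
        .ok()
        .or_else(|| std::env::var("APOLLO_API_BASE_URL").ok());
    FaceClient::new(base)
}
```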

View File

@@ -0,0 +1,2 @@
DROP TABLE IF EXISTS face_detections;
DROP TABLE IF EXISTS persons;

View File

@@ -0,0 +1,67 @@
-- Local face recognition tables.
--
-- `persons` are visual identities (the "who" of a face). The optional
-- `entity_id` bridges to the existing knowledge graph `entities` table —
-- when set, this person is the visual side of an LLM-extracted entity.
-- Don't auto-create entities from persons; the entity table represents
-- LLM-extracted knowledge with its own confidence semantics, and silently
-- filling it from face detections muddies the provenance.
--
-- `face_detections` carries one row per detected face on a content_hash,
-- plus marker rows with `status='no_faces'` or `status='failed'` so the
-- file watcher knows not to re-scan a hash. Keying on `content_hash`
-- (cross-library dedup) rather than `(library_id, rel_path)` means the
-- same JPEG in two libraries is scanned once. The denormalized `rel_path`
-- carries the most-recently-seen path — useful for cluster-thumb URL
-- generation; canonical path lookup goes through image_exif.
CREATE TABLE persons (
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
name TEXT NOT NULL,
cover_face_id INTEGER, -- backfilled when the first face binds
entity_id INTEGER, -- optional bridge to entities(id)
created_from_tag BOOLEAN NOT NULL DEFAULT 0,
notes TEXT,
created_at BIGINT NOT NULL,
updated_at BIGINT NOT NULL,
CONSTRAINT fk_persons_entity FOREIGN KEY (entity_id) REFERENCES entities(id) ON DELETE SET NULL,
UNIQUE(name COLLATE NOCASE)
);
CREATE INDEX idx_persons_entity ON persons(entity_id);
CREATE TABLE face_detections (
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
library_id INTEGER NOT NULL,
content_hash TEXT NOT NULL, -- canonical key (cross-library dedup)
rel_path TEXT NOT NULL, -- denormalized; most recently seen
bbox_x REAL, -- normalized 0..1; NULL on marker rows
bbox_y REAL,
bbox_w REAL,
bbox_h REAL,
embedding BLOB, -- 512×f32 = 2048 bytes; NULL on marker rows
confidence REAL, -- detector score
source TEXT NOT NULL, -- 'auto' | 'manual'
person_id INTEGER,
status TEXT NOT NULL DEFAULT 'detected', -- 'detected' | 'no_faces' | 'failed'
model_version TEXT NOT NULL, -- e.g. 'buffalo_l'; embedding lineage
created_at BIGINT NOT NULL,
CONSTRAINT fk_fd_library FOREIGN KEY (library_id) REFERENCES libraries(id),
CONSTRAINT fk_fd_person FOREIGN KEY (person_id) REFERENCES persons(id) ON DELETE SET NULL,
-- Detected rows carry geometry + embedding; marker rows ('no_faces',
-- 'failed') carry neither. CHECK enforces the invariant so manual
-- inserts can't slip through with half a row.
CONSTRAINT chk_marker CHECK (
(status = 'detected' AND bbox_x IS NOT NULL AND embedding IS NOT NULL)
OR (status IN ('no_faces','failed') AND bbox_x IS NULL AND embedding IS NULL)
)
);
CREATE INDEX idx_face_detections_hash ON face_detections(content_hash);
CREATE INDEX idx_face_detections_lib_path ON face_detections(library_id, rel_path);
CREATE INDEX idx_face_detections_person ON face_detections(person_id);
CREATE INDEX idx_face_detections_status ON face_detections(status);
-- One marker row per (content_hash, status='no_faces') so the file watcher
-- doesn't double-mark when a hash is seen on multiple full-scan passes.
CREATE UNIQUE INDEX idx_face_detections_no_faces_unique
ON face_detections(content_hash) WHERE status = 'no_faces';

View File

@@ -0,0 +1,2 @@
DROP INDEX IF EXISTS idx_persons_is_ignored;
ALTER TABLE persons DROP COLUMN is_ignored;

View File

@@ -0,0 +1,20 @@
-- IGNORE / junk bucket for the face recognition feature.
--
-- An "Ignored" person is the destination for strangers, faces the user
-- doesn't want tagged, and false detections. It looks like any other
-- person row (so face_detections.person_id stays a clean foreign key)
-- but `is_ignored=1` flags it for special UI treatment:
-- - hidden from the persons list by default
-- - excluded from `find_persons_by_names_ci` so a tag-name match
-- can never auto-bind a real face to the ignore bucket
-- - cluster-suggest already filters by `person_id IS NULL`, so faces
-- bound to an ignored person are naturally excluded from future
-- re-clustering
--
-- Partial index because the matching set is tiny (typically one row),
-- and we only ever query for `is_ignored = 1` to find the bucket.
ALTER TABLE persons ADD COLUMN is_ignored BOOLEAN NOT NULL DEFAULT 0;
CREATE INDEX idx_persons_is_ignored
ON persons(is_ignored) WHERE is_ignored = 1;

370
src/ai/face_client.rs Normal file
View File

@@ -0,0 +1,370 @@
//! Thin async HTTP client for Apollo's `/api/internal/faces/*` endpoints.
//!
//! Apollo (the personal location-history viewer at the sibling repo) hosts the
//! insightface inference service. This client is the ImageApi side of the
//! contract — it shoves image bytes through `/detect` and returns boxes +
//! 512-d ArcFace embeddings, plus a single-embedding `/embed` for the manual
//! face-create flow.
//!
//! Mirrors `apollo_client.rs` shape: optional base URL (None = disabled, the
//! file watcher and manual-create handlers no-op), reqwest client with a
//! generous timeout because CPU inference on a backlog can take many seconds
//! per photo.
//!
//! Configured via `APOLLO_FACE_API_BASE_URL`, falling back to
//! `APOLLO_API_BASE_URL` when the dedicated var is unset (single-Apollo
//! deploys are the common case). Both unset → `is_enabled()` returns false.
//!
//! Wire format: multipart/form-data with `file=<bytes>` and `meta=<json>`.
//! `meta` carries `{content_hash, library_id, rel_path, orientation?,
//! model_version?}` — useful for Apollo-side logging and idempotency, ignored
//! by Apollo today but part of the stable wire contract so future versions
//! can act on it without a client change.
//!
//! Error mapping (reflected in [`FaceDetectError`]):
//! - 422 `decode_failed` → permanent: ImageApi marks `status='failed'` and
//! doesn't retry until manual rerun.
//! - 200 with `faces:[]` → `status='no_faces'` marker row.
//! - 503 `cuda_oom` / `engine_unavailable` → defer-and-retry: no marker
//! written.
//! - Any other 5xx / network error → defer.
use anyhow::{Context, Result};
use base64::Engine;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::time::Duration;
#[derive(Debug, Clone, Serialize)]
pub struct DetectMeta {
pub content_hash: String,
pub library_id: i32,
pub rel_path: String,
/// EXIF orientation int (1..8). Apollo applies `exif_transpose` on the
/// bytes before inference, so this is informational only — supply when
/// the bytes were extracted from a RAW preview that lost the tag.
#[serde(skip_serializing_if = "Option::is_none")]
pub orientation: Option<i32>,
/// Echoed back in the response. ImageApi stores it in
/// `face_detections.model_version`.
#[serde(skip_serializing_if = "Option::is_none")]
pub model_version: Option<String>,
}
// Wire shape for the bbox sub-object Apollo returns. Read by Phase 3's
// file-watch hook; silence the dead-code lint until then.
#[allow(dead_code)]
#[derive(Debug, Clone, Deserialize)]
pub struct DetectedBbox {
pub x: f32,
pub y: f32,
pub w: f32,
pub h: f32,
}
#[allow(dead_code)] // bbox consumed by Phase 3 file-watch hook
#[derive(Debug, Clone, Deserialize)]
pub struct DetectedFace {
pub bbox: DetectedBbox,
pub confidence: f32,
/// base64 of 2048 bytes (512×f32 LE). ImageApi stores the raw bytes
/// verbatim as a BLOB — see `decode_embedding` for the unpack.
pub embedding: String,
}
impl DetectedFace {
/// Decode the wire-format embedding back into raw bytes for storage.
/// Returns the 2048-byte little-endian f32 buffer or an error if the
/// base64 is malformed or the wrong length.
pub fn decode_embedding(&self) -> Result<Vec<u8>> {
let bytes = base64::engine::general_purpose::STANDARD
.decode(self.embedding.as_bytes())
.context("face embedding base64 decode")?;
if bytes.len() != 2048 {
anyhow::bail!(
"face embedding wrong size: got {} bytes, expected 2048",
bytes.len()
);
}
Ok(bytes)
}
}
#[allow(dead_code)] // duration_ms logged by Phase 3 file-watch hook
#[derive(Debug, Clone, Deserialize)]
pub struct DetectResponse {
pub model_version: String,
pub duration_ms: i64,
pub faces: Vec<DetectedFace>,
}
#[derive(Debug, Clone, Deserialize)]
#[allow(dead_code)] // Reported by Apollo; useful for future health-driven backoff
pub struct FaceHealth {
pub loaded: bool,
pub providers: Vec<String>,
pub model_version: String,
pub det_size: i32,
#[serde(default)]
pub load_error: Option<String>,
}
/// Distinguishes permanent failures (don't retry) from transient ones
/// (defer and retry on next scan tick). The file-watch hook keys its
/// marker-row decision on this — a `Permanent` outcome writes
/// `status='failed'`, a `Transient` outcome writes nothing so the next
/// pass tries again.
#[derive(Debug)]
pub enum FaceDetectError {
/// Apollo refused the bytes for a reason that won't change on retry
/// (decode failure, zero-dim image). Mark `status='failed'`.
Permanent(anyhow::Error),
/// Apollo couldn't process this turn but might next time (CUDA OOM,
/// engine not loaded yet, network hiccup). Don't mark anything.
Transient(anyhow::Error),
/// Feature is disabled (no `APOLLO_FACE_API_BASE_URL`). Caller should
/// silently no-op — same shape as `apollo_client::is_enabled()` false.
Disabled,
}
impl std::fmt::Display for FaceDetectError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
FaceDetectError::Permanent(e) => write!(f, "permanent: {e}"),
FaceDetectError::Transient(e) => write!(f, "transient: {e}"),
FaceDetectError::Disabled => write!(f, "face client disabled"),
}
}
}
impl std::error::Error for FaceDetectError {}
#[derive(Clone)]
pub struct FaceClient {
client: Client,
/// `None` → disabled. Trim trailing slash at construction so url
/// building doesn't double up.
base_url: Option<String>,
}
impl FaceClient {
pub fn new(base_url: Option<String>) -> Self {
// 60 s timeout: CPU inference on a backlog can take many seconds
// per photo, especially the first call into a cold GPU. Apollo's
// bounded threadpool (1 worker on CUDA) means concurrent calls
// queue server-side; 60 s is enough headroom for a few items in
// the queue without surfacing a false transient.
let timeout_secs = std::env::var("FACE_DETECT_TIMEOUT_SEC")
.ok()
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(60);
let client = Client::builder()
.timeout(Duration::from_secs(timeout_secs))
.build()
.expect("reqwest client build");
Self {
client,
base_url: base_url.map(|u| u.trim_end_matches('/').to_string()),
}
}
pub fn is_enabled(&self) -> bool {
self.base_url.is_some()
}
/// Detect every face in `bytes`. ImageApi calls this from the file-watch
/// hook (Phase 3) and from the manual rerun handler. Empty `faces[]` in
/// the response is the no-faces signal — caller writes a marker row.
#[allow(dead_code)] // Phase 3 file-watch hook + rerun handler
pub async fn detect(
&self,
bytes: Vec<u8>,
meta: DetectMeta,
) -> std::result::Result<DetectResponse, FaceDetectError> {
let Some(base) = self.base_url.as_deref() else {
return Err(FaceDetectError::Disabled);
};
let url = format!("{}/api/internal/faces/detect", base);
self.post_multipart(&url, bytes, &meta).await
}
/// Single-embedding endpoint for the manual face-create flow. Caller
/// crops the image to the user-drawn bbox and passes those bytes; we
/// run detection inside the crop and return the highest-confidence
/// face's embedding. Apollo returns 422 `no_face_in_crop` when the
/// box missed — surfaced here as `Permanent`.
pub async fn embed(
&self,
bytes: Vec<u8>,
meta: DetectMeta,
) -> std::result::Result<DetectResponse, FaceDetectError> {
let Some(base) = self.base_url.as_deref() else {
return Err(FaceDetectError::Disabled);
};
let url = format!("{}/api/internal/faces/embed", base);
self.post_multipart(&url, bytes, &meta).await
}
/// Engine reachability + provider/model report. Used by ImageApi for a
/// startup sanity check; not on the hot path.
#[allow(dead_code)] // Phase 3 startup probe
pub async fn health(&self) -> Result<FaceHealth> {
let base = self.base_url.as_deref().context("face client disabled")?;
let url = format!("{}/api/internal/faces/health", base);
let resp = self.client.get(&url).send().await?.error_for_status()?;
let body: FaceHealth = resp.json().await?;
Ok(body)
}
async fn post_multipart(
&self,
url: &str,
bytes: Vec<u8>,
meta: &DetectMeta,
) -> std::result::Result<DetectResponse, FaceDetectError> {
let meta_json = serde_json::to_string(meta)
.map_err(|e| FaceDetectError::Permanent(anyhow::anyhow!("meta serialize: {e}")))?;
let form = reqwest::multipart::Form::new()
.text("meta", meta_json)
.part(
"file",
reqwest::multipart::Part::bytes(bytes)
.file_name(meta.rel_path.clone())
.mime_str("application/octet-stream")
.unwrap_or_else(|_| reqwest::multipart::Part::bytes(Vec::new())),
);
let resp = match self.client.post(url).multipart(form).send().await {
Ok(r) => r,
Err(e) if e.is_timeout() || e.is_connect() => {
return Err(FaceDetectError::Transient(anyhow::anyhow!(
"face client network: {e}"
)));
}
Err(e) => {
return Err(FaceDetectError::Transient(anyhow::anyhow!(
"face client request: {e}"
)));
}
};
let status = resp.status();
if status.is_success() {
let body: DetectResponse = resp.json().await.map_err(|e| {
FaceDetectError::Transient(anyhow::anyhow!("face response decode: {e}"))
})?;
return Ok(body);
}
let body_text = resp.text().await.unwrap_or_default();
Err(classify_error_response(status.as_u16(), &body_text))
}
}
/// Map an Apollo HTTP error response to a FaceDetectError. Pulled out as a
/// pure function so the marker-row contract (422 → Permanent, 503 →
/// Transient) is unit-testable without spinning up an HTTP server.
fn classify_error_response(status: u16, body_text: &str) -> FaceDetectError {
// Apollo encodes its error class in the JSON body's `detail`. Try to
// parse it; fall back to status-only classification.
let detail_code = serde_json::from_str::<serde_json::Value>(body_text)
.ok()
.and_then(|v| {
// detail can be a string ("decode_failed") or an object
// ({"code": "cuda_oom", ...}) depending on the endpoint and
// Apollo's response shape — handle both.
v.get("detail")
.and_then(|d| d.as_str().map(str::to_string))
.or_else(|| {
v.get("detail")
.and_then(|d| d.get("code"))
.and_then(|c| c.as_str())
.map(str::to_string)
})
})
.unwrap_or_default();
if status == 422 {
return FaceDetectError::Permanent(anyhow::anyhow!(
"face detect 422 {}: {}",
detail_code,
body_text
));
}
if status == 503 {
return FaceDetectError::Transient(anyhow::anyhow!(
"face detect 503 {}: {}",
detail_code,
body_text
));
}
// Any other 4xx: be conservative and treat as Permanent so we don't
// loop forever on a stable rejection. Any other 5xx: Transient —
// likely intermittent.
if (400..500).contains(&status) {
FaceDetectError::Permanent(anyhow::anyhow!(
"face detect {} {}: {}",
status,
detail_code,
body_text
))
} else {
FaceDetectError::Transient(anyhow::anyhow!(
"face detect {} {}: {}",
status,
detail_code,
body_text
))
}
}
#[cfg(test)]
mod tests {
use super::*;
fn is_permanent(e: &FaceDetectError) -> bool {
matches!(e, FaceDetectError::Permanent(_))
}
fn is_transient(e: &FaceDetectError) -> bool {
matches!(e, FaceDetectError::Transient(_))
}
#[test]
fn classify_422_decode_failed_is_permanent() {
// Permanent → ImageApi marks status='failed' and stops retrying.
let e = classify_error_response(422, r#"{"detail":"decode_failed: bad bytes"}"#);
assert!(is_permanent(&e), "422 decode_failed must be Permanent");
assert!(format!("{e}").contains("decode_failed"));
}
#[test]
fn classify_503_cuda_oom_is_transient() {
// Transient → ImageApi must NOT write a marker so the next scan
// retries. The detail.code is nested in an object rather than a
// bare string; the parser handles both.
let e = classify_error_response(
503,
r#"{"detail":{"code":"cuda_oom","error":"out of memory"}}"#,
);
assert!(is_transient(&e), "503 cuda_oom must be Transient");
assert!(format!("{e}").contains("cuda_oom"));
}
#[test]
fn classify_500_is_transient_other_4xx_is_permanent() {
// Conservative split: 5xx defers (intermittent), other 4xx
// is treated as a stable rejection so we don't loop forever.
assert!(is_transient(&classify_error_response(500, "")));
assert!(is_transient(&classify_error_response(502, "{}")));
assert!(is_permanent(&classify_error_response(400, "{}")));
assert!(is_permanent(&classify_error_response(404, "{}")));
}
#[test]
fn classify_handles_unparseable_body() {
// Apollo can return non-JSON on misroute / proxy errors; the
// classifier must still produce a useful variant.
let e = classify_error_response(503, "<html>nginx</html>");
assert!(is_transient(&e));
}
}

View File

@@ -1,5 +1,6 @@
pub mod apollo_client;
pub mod daily_summary_job;
pub mod face_client;
pub mod handlers;
pub mod insight_chat;
pub mod insight_generator;

View File

@@ -386,6 +386,16 @@ pub trait ExifDao: Sync + Send {
hash: &str,
) -> Result<Vec<String>, DbError>;
/// Batch version of [`get_rel_paths_by_hash`]. Returns a
/// `hash → Vec<rel_path>` map for every hash that has at least one
/// rel_path. Used by the batch tag lookup endpoint to expand
/// content-hash siblings without firing a query per hash.
fn get_rel_paths_for_hashes(
&mut self,
context: &opentelemetry::Context,
hashes: &[String],
) -> Result<std::collections::HashMap<String, Vec<String>>, DbError>;
/// List `(library_id, rel_path)` pairs for the given libraries, optionally
/// restricted to rows whose rel_path starts with `path_prefix`. When
/// `library_ids` is empty, rows from every library are returned. Used by
@@ -956,6 +966,40 @@ impl ExifDao for SqliteExifDao {
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn get_rel_paths_for_hashes(
&mut self,
context: &opentelemetry::Context,
hashes: &[String],
) -> Result<std::collections::HashMap<String, Vec<String>>, DbError> {
use std::collections::HashMap;
let mut out: HashMap<String, Vec<String>> = HashMap::new();
if hashes.is_empty() {
return Ok(out);
}
trace_db_call(context, "query", "get_rel_paths_for_hashes", |_span| {
use schema::image_exif::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
// Chunk the IN clause to stay safely under SQLite's
// SQLITE_LIMIT_VARIABLE_NUMBER (32766 modern, 999 legacy).
const CHUNK: usize = 500;
for chunk in hashes.chunks(CHUNK) {
let rows: Vec<(String, String)> = image_exif
.filter(content_hash.eq_any(chunk))
.select((content_hash.assume_not_null(), rel_path))
.distinct()
.load::<(String, String)>(connection.deref_mut())
.map_err(|_| anyhow::anyhow!("Query error"))?;
for (hash, path) in rows {
out.entry(hash).or_default().push(path);
}
}
Ok(out)
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn list_rel_paths_for_libraries(
&mut self,
context: &opentelemetry::Context,

View File

@@ -70,6 +70,26 @@ diesel::table! {
}
}
diesel::table! {
face_detections (id) {
id -> Integer,
library_id -> Integer,
content_hash -> Text,
rel_path -> Text,
bbox_x -> Nullable<Float>,
bbox_y -> Nullable<Float>,
bbox_w -> Nullable<Float>,
bbox_h -> Nullable<Float>,
embedding -> Nullable<Binary>,
confidence -> Nullable<Float>,
source -> Text,
person_id -> Nullable<Integer>,
status -> Text,
model_version -> Text,
created_at -> BigInt,
}
}
diesel::table! {
favorites (id) {
id -> Integer,
@@ -130,6 +150,20 @@ diesel::table! {
}
}
diesel::table! {
persons (id) {
id -> Integer,
name -> Text,
cover_face_id -> Nullable<Integer>,
entity_id -> Nullable<Integer>,
created_from_tag -> Bool,
notes -> Nullable<Text>,
created_at -> BigInt,
updated_at -> BigInt,
is_ignored -> Bool,
}
}
diesel::table! {
photo_insights (id) {
id -> Integer,
@@ -201,7 +235,10 @@ diesel::table! {
diesel::joinable!(entity_facts -> photo_insights (source_insight_id));
diesel::joinable!(entity_photo_links -> entities (entity_id));
diesel::joinable!(entity_photo_links -> libraries (library_id));
diesel::joinable!(face_detections -> libraries (library_id));
diesel::joinable!(face_detections -> persons (person_id));
diesel::joinable!(image_exif -> libraries (library_id));
diesel::joinable!(persons -> entities (entity_id));
diesel::joinable!(photo_insights -> libraries (library_id));
diesel::joinable!(tagged_photo -> tags (tag_id));
diesel::joinable!(video_preview_clips -> libraries (library_id));
@@ -212,10 +249,12 @@ diesel::allow_tables_to_appear_in_same_query!(
entities,
entity_facts,
entity_photo_links,
face_detections,
favorites,
image_exif,
libraries,
location_history,
persons,
photo_insights,
search_history,
tagged_photo,

590
src/face_watch.rs Normal file
View File

@@ -0,0 +1,590 @@
//! Face-detection pass for the file watcher.
//!
//! `process_new_files` calls [`run_face_detection_pass`] after the EXIF
//! registration loop. We walk the candidates (images, not yet face-scanned,
//! not excluded by EXCLUDED_DIRS), fan out parallel detect calls to Apollo,
//! and persist the results — detected faces, `no_faces` markers when Apollo
//! found nothing, `failed` markers on permanent decode errors, no marker on
//! transient failures so the next scan retries.
//!
//! The watcher runs in a plain `std::thread`, so we build a short-lived
//! tokio runtime per pass and `block_on` a join of K detect futures. K is
//! configurable via `FACE_DETECT_CONCURRENCY` (default 8). Apollo's
//! threadpool is bounded to 12 workers anyway, so the runs queue
//! server-side; the client-side fan-out is purely about overlapping IO
//! (file read + JSON encode) with someone else's inference.
use crate::ai::face_client::{DetectMeta, FaceClient, FaceDetectError};
use crate::exif;
use crate::faces::{self, FaceDao, InsertFaceDetectionInput};
use crate::file_types;
use crate::libraries::Library;
use crate::memories::PathExcluder;
use crate::tags::TagDao;
use log::{debug, info, warn};
use std::path::Path;
use std::sync::{Arc, Mutex};
use tokio::sync::Semaphore;
/// One file the watcher would like to face-scan. Built by the caller from
/// the EXIF batch (we need `content_hash` to key everything against).
#[derive(Debug, Clone)]
pub struct FaceCandidate {
pub rel_path: String,
pub content_hash: String,
}
/// Synchronous entry point. Returns once every candidate has been
/// processed (or definitively skipped). When `face_client.is_enabled()`
/// is false this is a no-op so the watcher can call unconditionally.
pub fn run_face_detection_pass(
library: &Library,
excluded_dirs: &[String],
face_client: &FaceClient,
face_dao: Arc<Mutex<Box<dyn FaceDao>>>,
tag_dao: Arc<Mutex<Box<dyn TagDao>>>,
candidates: Vec<FaceCandidate>,
) {
if !face_client.is_enabled() {
return;
}
if candidates.is_empty() {
return;
}
let base = Path::new(&library.root_path);
let filtered = filter_excluded(base, excluded_dirs, candidates, Some(&library.name));
if filtered.is_empty() {
return;
}
let concurrency: usize = std::env::var("FACE_DETECT_CONCURRENCY")
.ok()
.and_then(|s| s.parse().ok())
.filter(|n: &usize| *n > 0)
.unwrap_or(8);
info!(
"face_watch: running detection on {} candidates (library '{}', concurrency {})",
filtered.len(),
library.name,
concurrency
);
// Per-pass tokio runtime. The watcher thread isn't in any pre-existing
// async context — building one here keeps the rest of the watcher
// sync-only. Worker count is small; the parallelism we care about is
// task-level (semaphore) not thread-level.
let rt = match tokio::runtime::Builder::new_multi_thread()
.worker_threads(2)
.enable_all()
.build()
{
Ok(rt) => rt,
Err(e) => {
warn!("face_watch: failed to build tokio runtime: {e}");
return;
}
};
let library_id = library.id;
let library_root = library.root_path.clone();
rt.block_on(async move {
let sem = Arc::new(Semaphore::new(concurrency));
let mut handles = Vec::with_capacity(filtered.len());
for cand in filtered {
let permit_sem = sem.clone();
let face_client = face_client.clone();
let face_dao = face_dao.clone();
let tag_dao = tag_dao.clone();
let library_root = library_root.clone();
handles.push(tokio::spawn(async move {
// acquire_owned would let us drop the permit explicitly
// before await points; for a one-shot call into Apollo
// the simpler bounded acquire is enough.
let _permit = permit_sem.acquire().await.expect("face semaphore");
process_one(
library_id,
&library_root,
cand,
&face_client,
face_dao,
tag_dao,
)
.await;
}));
}
for h in handles {
// join; per-task panics are logged inside process_one before
// they reach here, so we don't propagate.
let _ = h.await;
}
});
}
async fn process_one(
library_id: i32,
library_root: &str,
cand: FaceCandidate,
face_client: &FaceClient,
face_dao: Arc<Mutex<Box<dyn FaceDao>>>,
tag_dao: Arc<Mutex<Box<dyn TagDao>>>,
) {
let abs = Path::new(library_root).join(&cand.rel_path);
// Read the bytes off disk synchronously on this worker. The filesystem
// IO is blocking but cheap; routing it through spawn_blocking would be overkill.
let bytes = match read_image_bytes_for_detect(&abs) {
Ok(b) => b,
Err(e) => {
// Don't mark — file may have been moved/renamed mid-scan; let
// the next pass try again. Future-bug check: a permanently
// unreadable file would loop forever; we accept that for v1
// because process_new_files already prunes vanished rows on
// full scans.
warn!(
"face_watch: read failed for {} ({}): {}",
cand.rel_path, library_id, e
);
return;
}
};
let meta = DetectMeta {
content_hash: cand.content_hash.clone(),
library_id,
rel_path: cand.rel_path.clone(),
orientation: None,
model_version: None,
};
let ctx = opentelemetry::Context::current();
match face_client.detect(bytes, meta).await {
Ok(resp) => {
// Stage 1: persist detections, holding the dao lock only
// across synchronous DB writes.
let mut stored_for_autobind: Vec<(i32, Vec<f32>)> = Vec::new();
{
let mut dao = face_dao.lock().expect("face dao");
if resp.faces.is_empty() {
if let Err(e) = dao.mark_status(
&ctx,
library_id,
&cand.content_hash,
&cand.rel_path,
"no_faces",
&resp.model_version,
) {
warn!(
"face_watch: mark no_faces failed for {}: {:?}",
cand.rel_path, e
);
}
debug!(
"face_watch: {} → no faces (model {})",
cand.rel_path, resp.model_version
);
} else {
let face_count = resp.faces.len();
for face in &resp.faces {
let emb = match face.decode_embedding() {
Ok(b) => b,
Err(e) => {
warn!("face_watch: bad embedding for {}: {:?}", cand.rel_path, e);
continue;
}
};
// Decode the f32 vector once for auto-bind comparison.
let emb_floats = faces::decode_embedding_bytes(&emb);
match dao.store_detection(
&ctx,
InsertFaceDetectionInput {
library_id,
content_hash: cand.content_hash.clone(),
rel_path: cand.rel_path.clone(),
bbox: Some((face.bbox.x, face.bbox.y, face.bbox.w, face.bbox.h)),
embedding: Some(emb),
confidence: Some(face.confidence),
source: "auto".to_string(),
person_id: None,
status: "detected".to_string(),
model_version: resp.model_version.clone(),
},
) {
Ok(row) => {
if let Some(floats) = emb_floats {
stored_for_autobind.push((row.id, floats));
}
}
Err(e) => warn!(
"face_watch: store_detection failed for {}: {:?}",
cand.rel_path, e
),
}
}
info!(
"face_watch: {} → {} face(s) ({}ms, {})",
cand.rel_path, face_count, resp.duration_ms, resp.model_version
);
}
}
// Stage 2: auto-bind newly-stored faces against same-named
// people-tags. Done outside the dao lock so the lookups don't
// serialize with concurrent detect tasks.
if !stored_for_autobind.is_empty() {
try_auto_bind(
&ctx,
&cand.rel_path,
&resp.model_version,
stored_for_autobind,
&tag_dao,
&face_dao,
);
}
}
Err(FaceDetectError::Permanent(e)) => {
warn!(
"face_watch: permanent failure on {}: {} — marking failed",
cand.rel_path, e
);
let mut dao = face_dao.lock().expect("face dao");
// model_version is best-effort here — the engine that rejected
// the bytes may not have echoed one. Empty string is fine; this
// row is purely a "don't retry" sentinel.
if let Err(e) = dao.mark_status(
&ctx,
library_id,
&cand.content_hash,
&cand.rel_path,
"failed",
"",
) {
warn!(
"face_watch: mark failed errored for {}: {:?}",
cand.rel_path, e
);
}
}
Err(FaceDetectError::Transient(e)) => {
// Don't mark anything; next scan tick retries naturally.
// Demoted to debug because OOM and engine-not-ready are noisy
// and self-resolving.
debug!(
"face_watch: transient on {}: {} (will retry next pass)",
cand.rel_path, e
);
}
Err(FaceDetectError::Disabled) => {
// Caller already checked is_enabled(); this branch is defensive.
}
}
}
/// Auto-bind newly-detected faces to a same-named person, when a tag on the
/// photo unambiguously identifies one. Driven by `FACE_AUTOBIND_MIN_COS`
/// (default 0.4): the new face's embedding must reach this cosine
/// similarity against the L2-normalized mean of the person's existing
/// faces. The first face for a person binds unconditionally — there's
/// nothing to compare against, and the alternative ("never bind without
/// a reference") would mean bootstrap never kicks off.
///
/// Multi-match (the photo carries tags for two different known persons)
/// is intentionally a no-op — we can't tell which face is which without
/// additional matching. Those faces stay unassigned for the cluster
/// suggester (Phase 6) to handle.
fn try_auto_bind(
ctx: &opentelemetry::Context,
rel_path: &str,
model_version: &str,
new_faces: Vec<(i32, Vec<f32>)>, // (face_id, decoded embedding)
tag_dao: &Arc<Mutex<Box<dyn TagDao>>>,
face_dao: &Arc<Mutex<Box<dyn FaceDao>>>,
) {
// 1. Pull the photo's tags.
let tag_names: Vec<String> = {
let mut td = tag_dao.lock().expect("tag dao");
match td.get_tags_for_path(ctx, rel_path) {
Ok(tags) => tags.into_iter().map(|t| t.name).collect(),
Err(e) => {
warn!(
"face_watch: get_tags_for_path failed for {}: {:?}",
rel_path, e
);
return;
}
}
};
if tag_names.is_empty() {
return;
}
// 2. Find tags that map to existing persons (case-insensitive).
let person_for_tag: std::collections::HashMap<String, i32> = {
let mut fd = face_dao.lock().expect("face dao");
match fd.find_persons_by_names_ci(ctx, &tag_names) {
Ok(m) => m,
Err(e) => {
warn!(
"face_watch: find_persons_by_names_ci failed for {}: {:?}",
rel_path, e
);
return;
}
}
};
// 3. Multi-match: ambiguous, skip. Single match: candidate person.
let unique_person_ids: std::collections::HashSet<i32> =
person_for_tag.values().copied().collect();
if unique_person_ids.len() != 1 {
if !unique_person_ids.is_empty() {
debug!(
"face_watch: {} carries tags for {} different persons; skipping auto-bind",
rel_path,
unique_person_ids.len()
);
}
return;
}
let person_id = *unique_person_ids.iter().next().expect("nonempty set");
let threshold: f32 = std::env::var("FACE_AUTOBIND_MIN_COS")
.ok()
.and_then(|s| s.parse().ok())
.filter(|t: &f32| *t >= 0.0 && *t <= 1.0)
.unwrap_or(0.4);
// 4. Reference embedding (if any) under the same model_version.
let reference: Option<Vec<f32>> = {
let mut fd = face_dao.lock().expect("face dao");
match fd.person_reference_embedding(ctx, person_id, model_version) {
Ok(r) => r,
Err(e) => {
warn!(
"face_watch: person_reference_embedding failed for person {}: {:?}",
person_id, e
);
return;
}
}
};
// 5. Bind each new face that meets the criterion. Hold the lock once
// for the whole batch; assign_face_to_person uses its own short
// transaction internally.
let mut fd = face_dao.lock().expect("face dao");
for (face_id, emb) in new_faces {
let bind = match &reference {
None => {
// Person has no faces yet — first one wins so bootstrap
// can ever produce a usable reference. After this row
// commits, future faces evaluate against it.
debug!(
"face_watch: auto-binding first face {} → person {} (no reference yet)",
face_id, person_id
);
true
}
Some(ref_vec) => {
let sim = faces::cosine_similarity(&emb, ref_vec);
if sim >= threshold {
debug!(
"face_watch: auto-binding face {} → person {} (cos={:.3} ≥ {:.3})",
face_id, person_id, sim, threshold
);
true
} else {
debug!(
"face_watch: leaving face {} unassigned (cos={:.3} < {:.3} for person {})",
face_id, sim, threshold, person_id
);
false
}
}
};
if bind && let Err(e) = fd.assign_face_to_person(ctx, face_id, person_id) {
warn!(
"face_watch: assign_face_to_person failed (face={}, person={}): {:?}",
face_id, person_id, e
);
}
}
}
/// Drop candidates whose path matches the watcher's `EXCLUDED_DIRS` rules.
/// Pulled out for unit testing — the same `PathExcluder` /memories uses,
/// just applied at the face-detect candidate set instead of the memories
/// listing. Skip @eaDir / .thumbnails / user-defined paths before we burn
/// a detect call (and Apollo's GPU memory) on junk. Also drops anything
/// that isn't an image file — the backlog drain pulls every hashed row in
/// `image_exif`, which includes videos; sending those to Apollo just
/// produces `failed` markers and inflates the FAILED stat.
pub(crate) fn filter_excluded(
base: &Path,
excluded_dirs: &[String],
candidates: Vec<FaceCandidate>,
library_name: Option<&str>,
) -> Vec<FaceCandidate> {
let excluder = if excluded_dirs.is_empty() {
None
} else {
Some(PathExcluder::new(base, excluded_dirs))
};
candidates
.into_iter()
.filter(|c| {
let abs = base.join(&c.rel_path);
if !file_types::is_image_file(&abs) {
debug!(
"face_watch: skipping non-image path {} (library {})",
c.rel_path,
library_name.unwrap_or("<unknown>")
);
return false;
}
if let Some(ex) = excluder.as_ref()
&& ex.is_excluded(&abs)
{
debug!(
"face_watch: skipping excluded path {} (library {})",
c.rel_path,
library_name.unwrap_or("<unknown>")
);
return false;
}
true
})
.collect()
}
/// Read image bytes for face detection. Insightface (via opencv) can't
/// decode RAW or HEIC — for those we extract the embedded JPEG preview
/// the way the thumbnail pipeline does. Plain JPEG/PNG/WebP/etc. go
/// through a direct read.
pub(crate) fn read_image_bytes_for_detect(path: &Path) -> std::io::Result<Vec<u8>> {
if file_types::needs_ffmpeg_thumbnail(path)
&& let Some(preview) = exif::extract_embedded_jpeg_preview(path)
{
return Ok(preview);
}
// Plain read for everything else. RAW/HEIC files without an embedded
// preview fall through here too; Apollo will then 422 and the caller
// marks the row failed. That's fine; we tried.
std::fs::read(path)
}
#[cfg(test)]
mod tests {
use super::*;
use std::fs;
fn cand(rel_path: &str) -> FaceCandidate {
FaceCandidate {
rel_path: rel_path.to_string(),
content_hash: format!("hash-{rel_path}"),
}
}
#[test]
fn filter_excluded_pattern_drops_dir_components() {
// A pattern matches a path *component* under base, not a substring.
// Phase 3 needs this for @eaDir / .thumbnails skipping.
let tmp = tempfile::tempdir().unwrap();
let base = tmp.path();
let candidates = vec![
cand("photos/a.jpg"), // keep
cand("photos/@eaDir/SYNOPHOTO_THUMB"), // drop (component match)
cand("photos/eaDir-not-a-thing.jpg"), // keep (substring, not component)
];
let kept = filter_excluded(base, &["@eaDir".to_string()], candidates, Some("test"));
let kept_paths: Vec<_> = kept.iter().map(|c| c.rel_path.as_str()).collect();
assert_eq!(
kept_paths,
vec!["photos/a.jpg", "photos/eaDir-not-a-thing.jpg"]
);
}
#[test]
fn filter_excluded_absolute_dir_drops_subtree() {
// Absolute (under-base) entries drop the whole subtree.
let tmp = tempfile::tempdir().unwrap();
let base = tmp.path();
let candidates = vec![
cand("public/a.jpg"),
cand("private/a.jpg"),
cand("private/sub/b.jpg"),
];
let kept = filter_excluded(base, &["/private".to_string()], candidates, None);
let kept_paths: Vec<_> = kept.iter().map(|c| c.rel_path.as_str()).collect();
assert_eq!(kept_paths, vec!["public/a.jpg"]);
}
#[test]
fn filter_excluded_empty_rules_passes_all() {
// EXCLUDED_DIRS unset still lets every image through — only the
// PathExcluder is skipped, the image-extension gate still runs.
let tmp = tempfile::tempdir().unwrap();
let base = tmp.path();
let candidates = vec![cand("a.jpg"), cand("b.jpg")];
let kept = filter_excluded(base, &[], candidates, None);
assert_eq!(kept.len(), 2);
}
#[test]
fn filter_excluded_drops_videos_and_non_media() {
// Backlog drain pulls every hashed row in image_exif (videos
// included). Videos must never reach Apollo — opencv can't
// decode them, every call would 422 and write a `failed` marker.
let tmp = tempfile::tempdir().unwrap();
let base = tmp.path();
let candidates = vec![
cand("photos/a.jpg"),
cand("photos/clip.mp4"),
cand("photos/clip.MOV"),
cand("photos/notes.txt"),
cand("photos/b.heic"),
];
let kept = filter_excluded(base, &[], candidates, Some("test"));
let kept_paths: Vec<_> = kept.iter().map(|c| c.rel_path.as_str()).collect();
assert_eq!(kept_paths, vec!["photos/a.jpg", "photos/b.heic"]);
}
#[test]
fn read_bytes_passes_through_for_jpeg() {
// JPEG goes through plain read — we DON'T want to lose orientation
// metadata or re-encode here; insightface's exif_transpose handles
// orientation on its end.
let tmp = tempfile::tempdir().unwrap();
let path = tmp.path().join("test.jpg");
let mut buf = Vec::new();
// Tiny 4x4 grey JPEG — encoded by image crate so we know it round-trips.
let img = image::DynamicImage::ImageRgb8(image::RgbImage::from_pixel(
4,
4,
image::Rgb([128, 128, 128]),
));
img.write_to(
&mut std::io::Cursor::new(&mut buf),
image::ImageFormat::Jpeg,
)
.unwrap();
fs::write(&path, &buf).unwrap();
let read = read_image_bytes_for_detect(&path).expect("read jpeg");
assert_eq!(read, buf, "JPEG bytes must pass through verbatim");
}
#[test]
fn read_bytes_falls_back_when_raw_has_no_preview() {
// A `.nef` file with non-RAW bytes won't have an embedded preview —
// the helper falls through to plain read rather than refusing. This
// matches the docstring contract; Apollo will then 422 and we'll
// mark the row as failed.
let tmp = tempfile::tempdir().unwrap();
let path = tmp.path().join("not_really.nef");
fs::write(&path, b"definitely-not-a-raw-file").unwrap();
let read = read_image_bytes_for_detect(&path).expect("fallback read");
assert_eq!(read, b"definitely-not-a-raw-file");
}
}

3403
src/faces.rs Normal file

File diff suppressed because it is too large Load Diff
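The `src/faces.rs` diff is collapsed above. For orientation, here is a minimal sketch of the two helpers `face_watch.rs` calls into, with signatures inferred from the call sites (`decode_embedding_bytes` returning an `Option`, `cosine_similarity` over `f32` slices); the bodies are illustrative, not the merged implementation.

```rust
/// Sketch: 2048 little-endian bytes -> 512 f32s. Inferred shape only;
/// the merged faces.rs version may validate length differently.
pub fn decode_embedding_bytes(bytes: &[u8]) -> Option<Vec<f32>> {
    if bytes.is_empty() || bytes.len() % 4 != 0 {
        return None;
    }
    Some(
        bytes
            .chunks_exact(4)
            .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]]))
            .collect(),
    )
}

/// Sketch: plain cosine similarity, 0.0 for mismatched or degenerate input.
pub fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
    if a.len() != b.len() || a.is_empty() {
        return 0.0;
    }
    let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
    let norm_a = a.iter().map(|x| x * x).sum::<f32>().sqrt();
    let norm_b = b.iter().map(|x| x * x).sum::<f32>().sqrt();
    if norm_a == 0.0 || norm_b == 0.0 {
        0.0
    } else {
        dot / (norm_a * norm_b)
    }
}
```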

View File

@@ -1659,6 +1659,14 @@ mod tests {
Ok(vec![])
}
fn get_rel_paths_for_hashes(
&mut self,
_context: &opentelemetry::Context,
_hashes: &[String],
) -> Result<std::collections::HashMap<String, Vec<String>>, DbError> {
Ok(std::collections::HashMap::new())
}
fn list_rel_paths_for_libraries(
&mut self,
_context: &opentelemetry::Context,

View File

@@ -12,6 +12,8 @@ pub mod data;
pub mod database;
pub mod error;
pub mod exif;
pub mod face_watch;
pub mod faces;
pub mod file_types;
pub mod files;
pub mod geo;

View File

@@ -66,6 +66,8 @@ mod data;
mod database;
mod error;
mod exif;
mod face_watch;
mod faces;
mod file_types;
mod files;
mod geo;
@@ -1459,6 +1461,8 @@ fn main() -> std::io::Result<()> {
app_state.libraries.clone(),
playlist_mgr_for_watcher,
preview_gen_for_watcher,
app_state.face_client.clone(),
app_state.excluded_dirs.clone(),
);
// Start orphaned playlist cleanup job
@@ -1518,6 +1522,7 @@ fn main() -> std::io::Result<()> {
let exif_dao = SqliteExifDao::new();
let insight_dao = SqliteInsightDao::new();
let preview_dao = SqlitePreviewDao::new();
let face_dao = faces::SqliteFaceDao::new();
let cors = Cors::default()
.allowed_origin_fn(|origin, _req_head| {
// Allow all origins in development, or check against CORS_ALLOWED_ORIGINS env var
@@ -1595,6 +1600,7 @@ fn main() -> std::io::Result<()> {
.service(libraries::list_libraries)
.add_feature(add_tag_services::<_, SqliteTagDao>)
.add_feature(knowledge::add_knowledge_services::<_, SqliteKnowledgeDao>)
.add_feature(faces::add_face_services::<_, faces::SqliteFaceDao>)
.app_data(app_data.clone())
.app_data::<Data<RealFileSystem>>(Data::new(RealFileSystem::new(
app_data.base_path.clone(),
@@ -1616,6 +1622,10 @@ fn main() -> std::io::Result<()> {
.app_data::<Data<Mutex<SqliteKnowledgeDao>>>(Data::new(Mutex::new(
SqliteKnowledgeDao::new(),
)))
.app_data::<Data<Mutex<faces::SqliteFaceDao>>>(Data::new(Mutex::new(face_dao)))
.app_data::<Data<crate::ai::face_client::FaceClient>>(Data::new(
app_data.face_client.clone(),
))
.app_data(mp::form::MultipartFormConfig::default().total_limit(1024 * 1024 * 1024)) // 1GB upload limit
.app_data(web::JsonConfig::default().error_handler(|err, req| {
let detail = err.to_string();
@@ -1780,6 +1790,8 @@ fn watch_files(
libs: Vec<libraries::Library>,
playlist_manager: Addr<VideoPlaylistManager>,
preview_generator: Addr<video::actors::PreviewClipGenerator>,
face_client: crate::ai::face_client::FaceClient,
excluded_dirs: Vec<String>,
) {
std::thread::spawn(move || {
// Get polling intervals from environment variables
@@ -1798,6 +1810,18 @@ fn watch_files(
info!("Starting optimized file watcher"); info!("Starting optimized file watcher");
info!(" Quick scan interval: {} seconds", quick_interval_secs); info!(" Quick scan interval: {} seconds", quick_interval_secs);
info!(" Full scan interval: {} seconds", full_interval_secs); info!(" Full scan interval: {} seconds", full_interval_secs);
// Surface face-detection state at boot so it's obvious whether
// the watcher will hit Apollo. The branch silently no-ops when
// disabled (intentional for legacy deploys), which makes "why
// aren't faces being detected?" hard to diagnose otherwise.
if face_client.is_enabled() {
info!(" Face detection: ENABLED");
} else {
info!(
" Face detection: DISABLED (set APOLLO_FACE_API_BASE_URL \
or APOLLO_API_BASE_URL to enable)"
);
}
for lib in &libs {
info!(
" Watching library '{}' (id={}) at {}",
@@ -1812,6 +1836,15 @@ fn watch_files(
let preview_dao = Arc::new(Mutex::new(
Box::new(SqlitePreviewDao::new()) as Box<dyn PreviewDao>
));
let face_dao = Arc::new(Mutex::new(
Box::new(faces::SqliteFaceDao::new()) as Box<dyn faces::FaceDao>
));
// tag_dao for the watcher's auto-bind path. Independent of the
// request-handler tag_dao instance — both end up pointing at the
// same SQLite file via SqliteTagDao::default().
let watcher_tag_dao = Arc::new(Mutex::new(
Box::new(SqliteTagDao::default()) as Box<dyn tags::TagDao>
));
let mut last_quick_scan = SystemTime::now();
let mut last_full_scan = SystemTime::now();
@@ -1828,6 +1861,26 @@ fn watch_files(
let is_full_scan = since_last_full.as_secs() >= full_interval_secs;
for lib in &libs {
// Drain the missing-content-hash backlog AND the face-detection
// backlog every tick, regardless of quick/full. Quick
// scans only walk recently-modified files, so the
// pre-Phase-3 backlog never enters their candidate set
// — without these standalone passes, backfill +
// detection only progressed during full scans
// (default once an hour).
if face_client.is_enabled() {
let context = opentelemetry::Context::new();
backfill_unhashed_backlog(&context, lib, &exif_dao);
process_face_backlog(
&context,
lib,
&face_client,
&face_dao,
&watcher_tag_dao,
&excluded_dirs,
);
}
if is_full_scan {
info!(
"Running full scan for library '{}' (scan #{})",
@@ -1837,6 +1890,10 @@ fn watch_files(
lib,
Arc::clone(&exif_dao),
Arc::clone(&preview_dao),
Arc::clone(&face_dao),
Arc::clone(&watcher_tag_dao),
face_client.clone(),
&excluded_dirs,
None,
playlist_manager.clone(),
preview_generator.clone(),
@@ -1854,6 +1911,10 @@ fn watch_files(
lib,
Arc::clone(&exif_dao),
Arc::clone(&preview_dao),
Arc::clone(&face_dao),
Arc::clone(&watcher_tag_dao),
face_client.clone(),
&excluded_dirs,
Some(check_since),
playlist_manager.clone(),
preview_generator.clone(),
@@ -1900,6 +1961,10 @@ fn process_new_files(
library: &libraries::Library,
exif_dao: Arc<Mutex<Box<dyn ExifDao>>>,
preview_dao: Arc<Mutex<Box<dyn PreviewDao>>>,
face_dao: Arc<Mutex<Box<dyn faces::FaceDao>>>,
tag_dao: Arc<Mutex<Box<dyn tags::TagDao>>>,
face_client: crate::ai::face_client::FaceClient,
excluded_dirs: &[String],
modified_since: Option<SystemTime>,
playlist_manager: Addr<VideoPlaylistManager>,
preview_generator: Addr<video::actors::PreviewClipGenerator>,
@@ -2075,6 +2140,43 @@ fn process_new_files(
}
}
// ── Face detection pass ────────────────────────────────────────────
// Run after EXIF writes so newly-registered files have their
// content_hash populated. Skipped wholesale when face_client is
// disabled (no Apollo integration configured) — Phase 3 wires this
// up; the watcher remains usable on legacy deploys.
if face_client.is_enabled() {
// Opportunistic content_hash backfill: photos indexed before
// content-hashing landed (or where the hash compute failed
// silently on insert) end up in image_exif with NULL
// content_hash. build_face_candidates keys on content_hash, so
// those files would never become candidates without backfill.
// Idempotent — subsequent scans see the populated hashes and
// no-op. The dedicated `backfill_hashes` binary is still the
// right tool for very large legacy libraries; this branch
// ensures small/medium deploys self-heal without operator
// action.
backfill_missing_content_hashes(&context, &files, library, &exif_dao);
let candidates = build_face_candidates(&context, &files, &exif_dao, &face_dao);
debug!(
"face_watch: scan tick — {} image file(s) walked, {} candidate(s) (library '{}', modified_since={})",
files.iter().filter(|(p, _)| !is_video_file(p)).count(),
candidates.len(),
library.name,
modified_since.is_some(),
);
if !candidates.is_empty() {
face_watch::run_face_detection_pass(
library,
excluded_dirs,
&face_client,
Arc::clone(&face_dao),
Arc::clone(&tag_dao),
candidates,
);
}
}
// Check for videos that need HLS playlists
let video_path_base = dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set");
let mut videos_needing_playlists = Vec::new();
@@ -2199,6 +2301,312 @@ fn process_new_files(
}
}
/// Compute and persist content_hash for image_exif rows where it's NULL.
///
/// Bounded per call by `FACE_HASH_BACKFILL_MAX_PER_TICK` (default 2000) so
/// a watcher tick on a large legacy library doesn't block for hours
/// blake3-ing every photo at once. Subsequent scans pick up the rest.
/// For 50k+ libraries the dedicated `cargo run --bin backfill_hashes`
/// is still faster (it doesn't fight a watcher loop for the DAO mutex).
///
/// Drains unhashed image_exif rows by querying them directly, independent
/// of the filesystem walk. Quick scans only walk recently-modified
/// files, so a backlog of pre-existing unhashed rows never enters
/// `process_new_files`'s candidate set — left alone, it would only
/// drain on full scans (default once an hour). Calling this every tick
/// keeps the face-detection backlog moving regardless.
///
/// Returns the number of rows successfully backfilled this pass.
fn backfill_unhashed_backlog(
context: &opentelemetry::Context,
library: &libraries::Library,
exif_dao: &Arc<Mutex<Box<dyn ExifDao>>>,
) -> usize {
let cap: i64 = dotenv::var("FACE_HASH_BACKFILL_MAX_PER_TICK")
.ok()
.and_then(|s| s.parse().ok())
.filter(|n: &i64| *n > 0)
.unwrap_or(2000);
// Fetch up to cap+1 rows so we can tell "more remain" without a
// separate count query. The fetch spans all libraries (there's no
// per-library filter on get_rows_missing_hash today), but we only ever
// update rows whose library_id matches the caller's library, so other
// libraries' rows just get skipped here and picked up on the next
// library's tick. Negligible cost given the cap.
let rows: Vec<(i32, String)> = {
let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
dao.get_rows_missing_hash(context, cap + 1).unwrap_or_default()
};
if rows.is_empty() {
return 0;
}
let more_than_cap = rows.len() as i64 > cap;
let base_path = std::path::Path::new(&library.root_path);
let mut backfilled = 0usize;
let mut errors = 0usize;
let mut skipped_other_lib = 0usize;
for (lib_id, rel_path) in rows.iter().take(cap as usize) {
if *lib_id != library.id {
skipped_other_lib += 1;
continue;
}
let abs = base_path.join(rel_path);
if !abs.exists() {
// File walked away — the watcher's reconciliation pass will
// remove the orphan exif row eventually.
continue;
}
match content_hash::compute(&abs) {
Ok(id) => {
let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
if let Err(e) =
dao.backfill_content_hash(context, library.id, rel_path, &id.content_hash, id.size_bytes)
{
warn!(
"face_watch: backfill_content_hash failed for {}: {:?}",
rel_path, e
);
errors += 1;
} else {
backfilled += 1;
}
}
Err(e) => {
debug!("face_watch: hash compute failed for {} ({:?})", abs.display(), e);
errors += 1;
}
}
}
if backfilled > 0 || errors > 0 || more_than_cap {
info!(
"face_watch: backfill pass for library '{}': hashed {} ({} error(s), {} skipped as belonging to other libraries; cap={}, more_remain={})",
library.name, backfilled, errors, skipped_other_lib, cap, more_than_cap
);
}
backfilled
}
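Both per-tick caps are plain environment variables read through dotenv. A minimal `.env` fragment that pins them explicitly, assuming you simply want the code's own fallbacks (2000 hashed rows here, and 64 candidates for the detection drain below), would look like this; tune either independently of the WATCH_* intervals:

```
# Per-tick backlog caps for the face-detection watcher (values shown are
# the in-code fallbacks, not mandated settings).
FACE_HASH_BACKFILL_MAX_PER_TICK=2000
FACE_BACKLOG_MAX_PER_TICK=64
```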
/// Per-tick face-detection drain. Pulls a capped batch of hashed-but-
/// unscanned image_exif rows directly via the FaceDao anti-join and
/// hands them to the existing detection pass. Runs on every tick (not
/// just full scans) so the backlog moves at quick-scan cadence.
fn process_face_backlog(
context: &opentelemetry::Context,
library: &libraries::Library,
face_client: &crate::ai::face_client::FaceClient,
face_dao: &Arc<Mutex<Box<dyn faces::FaceDao>>>,
tag_dao: &Arc<Mutex<Box<dyn tags::TagDao>>>,
excluded_dirs: &[String],
) {
let cap: i64 = dotenv::var("FACE_BACKLOG_MAX_PER_TICK")
.ok()
.and_then(|s| s.parse().ok())
.filter(|n: &i64| *n > 0)
.unwrap_or(64);
let rows: Vec<(String, String)> = {
let mut dao = face_dao.lock().expect("face dao");
match dao.list_unscanned_candidates(context, library.id, cap) {
Ok(r) => r,
Err(e) => {
warn!(
"face_watch: list_unscanned_candidates failed for library '{}': {:?}",
library.name, e
);
return;
}
}
};
if rows.is_empty() {
return;
}
info!(
"face_watch: backlog drain — running detection on {} candidate(s) for library '{}' (cap={})",
rows.len(),
library.name,
cap
);
let candidates: Vec<face_watch::FaceCandidate> = rows
.into_iter()
.map(|(rel_path, content_hash)| face_watch::FaceCandidate {
rel_path,
content_hash,
})
.collect();
face_watch::run_face_detection_pass(
library,
excluded_dirs,
face_client,
Arc::clone(face_dao),
Arc::clone(tag_dao),
candidates,
);
}
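`FaceDao::list_unscanned_candidates` is only named here, not shown. A sketch of the anti-join it presumably runs is below; everything beyond `image_exif.content_hash` and the existence of a `face_detections` table (both referenced elsewhere in this PR) is an assumption about the schema, not the shipped query:

```rust
// Hypothetical shape of the backlog query: hashed image_exif rows for one
// library that have no face_detections row yet, capped by the caller.
// Column names on face_detections are assumed for illustration only.
const LIST_UNSCANNED_SQL: &str = "
    SELECT e.file_path, e.content_hash
    FROM image_exif e
    LEFT JOIN face_detections d ON d.content_hash = e.content_hash
    WHERE e.library_id = ?1
      AND e.content_hash IS NOT NULL
      AND d.content_hash IS NULL
    LIMIT ?2";
```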
fn backfill_missing_content_hashes(
context: &opentelemetry::Context,
files: &[(PathBuf, String)],
library: &libraries::Library,
exif_dao: &Arc<Mutex<Box<dyn ExifDao>>>,
) {
let image_paths: Vec<String> = files
.iter()
.filter(|(p, _)| !is_video_file(p))
.map(|(_, rel)| rel.clone())
.collect();
if image_paths.is_empty() {
return;
}
let exif_records = {
let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
dao.get_exif_batch(context, &image_paths)
.unwrap_or_default()
};
// Cheap lookup back from rel_path → absolute file_path so
// content_hash::compute can read the bytes.
let path_by_rel: HashMap<String, &PathBuf> =
files.iter().map(|(p, rel)| (rel.clone(), p)).collect();
let cap: usize = dotenv::var("FACE_HASH_BACKFILL_MAX_PER_TICK")
.ok()
.and_then(|s| s.parse().ok())
.filter(|n: &usize| *n > 0)
.unwrap_or(2000);
// Count the unhashed backlog up front so we can surface "still needs
// backfill: N" in the log — without it, a face-scan that's stuck at
// 44% looks stalled when really it's chipping through hashes.
let unhashed_total = exif_records
.iter()
.filter(|r| r.content_hash.is_none())
.count();
let mut backfilled = 0usize;
let mut errors = 0usize;
for record in &exif_records {
// Cap on successes only — earlier this counted errors too, so a
// pocket of chronically-unhashable files at the front of the
// table (vanished mid-scan, permission denied, etc.) burned the
// budget every tick and the rest of the backlog never advanced.
// Errors are still bounded by `unhashed_total` (the loop walks
// each unhashed record at most once per tick).
if backfilled >= cap {
break;
}
if record.content_hash.is_some() {
continue;
}
let Some(file_path) = path_by_rel.get(&record.file_path) else {
// Walked file went missing between the directory scan and now;
// next tick will retry naturally.
continue;
};
match content_hash::compute(file_path) {
Ok(id) => {
let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
if let Err(e) = dao.backfill_content_hash(
context,
library.id,
&record.file_path,
&id.content_hash,
id.size_bytes,
) {
warn!(
"face_watch: backfill_content_hash failed for {}: {:?}",
record.file_path, e
);
errors += 1;
} else {
backfilled += 1;
}
}
Err(e) => {
debug!(
"face_watch: hash compute failed for {} ({:?})",
file_path.display(),
e
);
errors += 1;
}
}
}
// Always log when there's an unhashed backlog so an operator
// looking at "scan stuck at 44%" can see backfill is running and
// how much remains. Quiet only when there's nothing to do.
if unhashed_total > 0 || backfilled > 0 || errors > 0 {
let remaining = unhashed_total.saturating_sub(backfilled);
info!(
"face_watch: backfilled {}/{} content_hash for library '{}' ({} error(s); {} still need backfill; cap={})",
backfilled, unhashed_total, library.name, errors, remaining, cap
);
}
}
/// Build the face-detection candidate list for a scan tick.
///
/// We need `(rel_path, content_hash)` for every image file that has a
/// content_hash recorded in image_exif but no row in face_detections yet.
/// Re-querying image_exif here picks up rows the EXIF write loop just
/// inserted alongside any pre-existing rows the watcher walked over —
/// covers both new uploads and the initial backlog scan.
fn build_face_candidates(
context: &opentelemetry::Context,
files: &[(PathBuf, String)],
exif_dao: &Arc<Mutex<Box<dyn ExifDao>>>,
face_dao: &Arc<Mutex<Box<dyn faces::FaceDao>>>,
) -> Vec<face_watch::FaceCandidate> {
// Restrict to image files; videos aren't face-scanned in v1 (kamadak
// doesn't even register them in image_exif).
let image_paths: Vec<String> = files
.iter()
.filter(|(p, _)| !is_video_file(p))
.map(|(_, rel)| rel.clone())
.collect();
if image_paths.is_empty() {
return Vec::new();
}
let exif_records = {
let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
dao.get_exif_batch(context, &image_paths)
.unwrap_or_default()
};
// rel_path → content_hash (only rows with a hash; without one we have
// nothing to key face data against).
let mut hash_by_path: HashMap<String, String> = HashMap::with_capacity(exif_records.len());
for record in exif_records {
if let Some(h) = record.content_hash {
hash_by_path.insert(record.file_path, h);
}
}
let mut candidates = Vec::new();
let mut dao = face_dao.lock().expect("face dao");
for rel_path in image_paths {
let Some(hash) = hash_by_path.get(&rel_path) else {
continue;
};
match dao.already_scanned(context, hash) {
Ok(true) => continue,
Ok(false) => candidates.push(face_watch::FaceCandidate {
rel_path,
content_hash: hash.clone(),
}),
Err(e) => {
warn!("face_watch: already_scanned errored for {}: {:?}", hash, e);
}
}
}
candidates
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -23,7 +23,8 @@ use crate::utils::earliest_fs_time;
// Helper that encapsulates path-exclusion semantics
#[derive(Debug)]
pub struct PathExcluder {
base: PathBuf,
excluded_dirs: Vec<PathBuf>,
excluded_patterns: Vec<String>,
}
@@ -34,9 +35,12 @@ impl PathExcluder {
/// Rules:
/// - Entries starting with '/' are interpreted as "absolute under base"
/// (e.g. "/photos/private" -> base/photos/private).
/// - Entries without '/' are treated as path-component patterns that
/// match a directory or file name *under* `base`. The base prefix is
/// stripped before matching so a system-level component (e.g. the
/// `tmp` in `/tmp/...` when running tests) doesn't masquerade as a
/// user-defined exclude.
pub fn new(base: &Path, raw_excluded: &[String]) -> Self {
let mut excluded_dirs = Vec::new();
let mut excluded_patterns = Vec::new();
@@ -53,18 +57,19 @@ impl PathExcluder {
}
debug!(
"PathExcluder created. base={:?}, dirs={:?}, patterns={:?}",
base, excluded_dirs, excluded_patterns
);
Self {
base: base.to_path_buf(),
excluded_dirs,
excluded_patterns,
}
}
/// Returns true if `path` should be excluded.
pub fn is_excluded(&self, path: &Path) -> bool {
// Directory-based exclusions
for excluded in &self.excluded_dirs {
if path.starts_with(excluded) {
@@ -76,10 +81,16 @@ impl PathExcluder {
}
}
if self.excluded_patterns.is_empty() {
return false;
}
// Strip the base prefix before scanning components. Without this,
// every path component above `base` (e.g. `tmp` in `/tmp/test123`
// under tempdir, or the user's `home` in `/home/user/Pictures`)
// would match user-defined patterns and produce false positives.
let scan_root = path.strip_prefix(&self.base).unwrap_or(path);
for component in scan_root.components() {
if let Some(comp_str) = component.as_os_str().to_str()
&& self.excluded_patterns.iter().any(|pat| pat == comp_str)
{
@@ -90,7 +101,6 @@ impl PathExcluder {
return true;
}
}
false
}
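Since `PathExcluder::new` and `is_excluded` are now `pub`, here is a small illustrative sketch of the two rule kinds described in the doc comment above. Paths and patterns are made up; this is not a shipped test:

```rust
use std::path::Path;

fn path_excluder_examples() {
    let base = Path::new("/tmp/test123/media");

    // "/photos/private" is rooted under base; "@eaDir" is a component pattern.
    let excluder = PathExcluder::new(
        base,
        &["/photos/private".to_string(), "@eaDir".to_string()],
    );
    assert!(excluder.is_excluded(Path::new("/tmp/test123/media/photos/private/a.jpg")));
    assert!(excluder.is_excluded(Path::new("/tmp/test123/media/2024/@eaDir/thumb.jpg")));
    assert!(!excluder.is_excluded(Path::new("/tmp/test123/media/2024/holiday.jpg")));

    // Components above base never match a pattern: "tmp" only appears in the
    // system prefix, so nothing under base is excluded by it. This is exactly
    // the tempdir false positive the strip_prefix change guards against.
    let excluder = PathExcluder::new(base, &["tmp".to_string()]);
    assert!(!excluder.is_excluded(Path::new("/tmp/test123/media/2024/holiday.jpg")));
}
```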

View File

@@ -1,4 +1,5 @@
use crate::ai::apollo_client::ApolloClient;
use crate::ai::face_client::FaceClient;
use crate::ai::insight_chat::{ChatLockMap, InsightChatService};
use crate::ai::openrouter::OpenRouterClient;
use crate::ai::{InsightGenerator, OllamaClient, SmsApiClient};
@@ -48,6 +49,11 @@ pub struct AppState {
pub insight_generator: InsightGenerator,
/// Chat continuation service. Hold an Arc so handlers can clone cheaply.
pub insight_chat: Arc<InsightChatService>,
/// Face inference client (calls Apollo's `/api/internal/faces/*`).
/// Disabled (`is_enabled() == false`) when neither `APOLLO_FACE_API_BASE_URL`
/// nor `APOLLO_API_BASE_URL` is set; the file-watch hook (Phase 3) and
/// manual-face-create handler short-circuit in that case.
pub face_client: FaceClient,
}
impl AppState {
@@ -82,6 +88,7 @@ impl AppState {
insight_generator: InsightGenerator,
insight_chat: Arc<InsightChatService>,
preview_dao: Arc<Mutex<Box<dyn PreviewDao>>>,
face_client: FaceClient,
) -> Self {
assert!(
!libraries_vec.is_empty(),
@@ -115,6 +122,7 @@ impl AppState {
sms_client,
insight_generator,
insight_chat,
face_client,
}
}
@@ -161,6 +169,15 @@ impl Default for AppState {
// generator silently falls through to the legacy Nominatim path.
let apollo_client = ApolloClient::new(env::var("APOLLO_API_BASE_URL").ok());
// Face inference client. Falls back to APOLLO_API_BASE_URL when
// APOLLO_FACE_API_BASE_URL is unset (single-Apollo deploys are the
// common case). Both unset = feature disabled, file-watch hook
// and manual-face handlers short-circuit silently.
let face_client_url = env::var("APOLLO_FACE_API_BASE_URL")
.ok()
.or_else(|| env::var("APOLLO_API_BASE_URL").ok());
let face_client = FaceClient::new(face_client_url);
// Initialize DAOs
let insight_dao: Arc<Mutex<Box<dyn InsightDao>>> =
Arc::new(Mutex::new(Box::new(SqliteInsightDao::new())));
@@ -244,6 +261,7 @@ impl Default for AppState {
insight_generator,
insight_chat,
preview_dao,
face_client,
)
}
}
@@ -382,6 +400,7 @@ impl AppState {
insight_generator,
insight_chat,
preview_dao,
FaceClient::new(None), // disabled in test
)
}
}

View File

@@ -32,6 +32,7 @@ where
)
.service(web::resource("image/tags/all").route(web::get().to(get_all_tags::<TagD>)))
.service(web::resource("image/tags/batch").route(web::post().to(update_tags::<TagD>)))
.service(web::resource("image/tags/lookup").route(web::post().to(lookup_tags_batch::<TagD>)))
}
async fn add_tag<D: TagDao>(
@@ -238,6 +239,149 @@ async fn update_tags<D: TagDao>(
.into_http_internal_err()
}
#[derive(Deserialize, Debug)]
pub struct LookupTagsBatchRequest {
pub paths: Vec<String>,
}
/// Bulk per-path tag lookup with cross-library content-hash sibling
/// expansion. Apollo's photo-match flow used to fan out one
/// ``GET /image/tags?path=`` per record (~4k for a wide window) —
/// each call locked the dao briefly and the round-trip cost dwarfed
/// the actual SQL. This collapses the whole fan-out into:
///
/// 1. one ``image_exif`` batch lookup → query path → content_hash
/// 2. one ``image_exif`` JOIN by content_hash → all sibling rel_paths
/// (so a tag applied under library A surfaces under library B
/// when the content hashes match — important once a backup mount
/// holds copies of files from the primary library)
/// 3. one ``tagged_photo`` JOIN over the union of (query + sibling)
/// rel_paths
///
/// Body: ``{paths: [...]}``; response: ``{path: [{id, name, ...}]}``
/// with only paths that have at least one tag (caller treats absence
/// as empty). Each chunk is capped to stay under SQLite's variable
/// limit; a few chunked queries per 4k photos is still orders of magnitude cheaper than
/// per-path HTTP fan-out.
async fn lookup_tags_batch<D: TagDao>(
_: Claims,
http_request: HttpRequest,
body: web::Json<LookupTagsBatchRequest>,
tag_dao: web::Data<Mutex<D>>,
exif_dao: web::Data<Mutex<Box<dyn ExifDao>>>,
) -> impl Responder {
use std::collections::{HashMap, HashSet};
let context = extract_context_from_request(&http_request);
let span = global_tracer().start_with_context("lookup_tags_batch", &context);
let span_context = opentelemetry::Context::current_with_span(span);
if body.paths.is_empty() {
return HttpResponse::Ok().json(HashMap::<String, Vec<Tag>>::new());
}
let query_paths: Vec<String> = body.paths.iter().map(|p| normalize_path(p)).collect();
// Stage 1: query → content_hash mapping. Files without a hash yet
// (just-indexed, hash compute failed, etc.) skip the sibling
// expansion and only get tags from their own rel_path.
let exif_records = {
let mut dao = exif_dao.lock().expect("Unable to get ExifDao");
match dao.get_exif_batch(&span_context, &query_paths) {
Ok(rows) => rows,
Err(e) => {
return HttpResponse::InternalServerError()
.body(format!("exif batch lookup failed: {:?}", e));
}
}
};
let mut hash_by_path: HashMap<String, String> = HashMap::with_capacity(exif_records.len());
for record in exif_records {
if let Some(h) = record.content_hash {
hash_by_path.insert(record.file_path, h);
}
}
let unique_hashes: Vec<String> = hash_by_path
.values()
.cloned()
.collect::<HashSet<_>>()
.into_iter()
.collect();
// Stage 2: hash → all sibling rel_paths.
let paths_by_hash = if unique_hashes.is_empty() {
HashMap::new()
} else {
let mut dao = exif_dao.lock().expect("Unable to get ExifDao");
match dao.get_rel_paths_for_hashes(&span_context, &unique_hashes) {
Ok(map) => map,
Err(e) => {
return HttpResponse::InternalServerError()
.body(format!("hash sibling lookup failed: {:?}", e));
}
}
};
// Stage 3: build expanded path set and the reverse map
// sibling → [original query paths whose tag bucket should include
// the sibling's tags]. A query path always attributes to itself
// (covers the no-content-hash case).
let mut originals_by_sibling: HashMap<String, Vec<String>> = HashMap::new();
let mut all_paths: HashSet<String> = HashSet::new();
for query_path in &query_paths {
all_paths.insert(query_path.clone());
originals_by_sibling
.entry(query_path.clone())
.or_default()
.push(query_path.clone());
if let Some(hash) = hash_by_path.get(query_path)
&& let Some(siblings) = paths_by_hash.get(hash)
{
for sibling in siblings {
if sibling == query_path {
continue;
}
all_paths.insert(sibling.clone());
originals_by_sibling
.entry(sibling.clone())
.or_default()
.push(query_path.clone());
}
}
}
// Stage 4: tags grouped by rel_path for the union.
let all_paths_vec: Vec<String> = all_paths.into_iter().collect();
let tags_by_sibling = {
let mut dao = tag_dao.lock().expect("Unable to get TagDao");
match dao.get_tags_grouped_by_paths(&span_context, &all_paths_vec) {
Ok(map) => map,
Err(e) => {
return HttpResponse::InternalServerError().body(format!("{}", e));
}
}
};
// Stage 5: aggregate sibling tags back to original query paths,
// de-duped by tag id. Empty buckets stay out of the response so
// the caller's "missing key = []" contract holds.
let mut result: HashMap<String, Vec<Tag>> = HashMap::new();
for (sibling_path, originals) in originals_by_sibling {
if let Some(tags) = tags_by_sibling.get(&sibling_path) {
for orig in originals {
let entry = result.entry(orig).or_default();
for t in tags {
if !entry.iter().any(|e| e.id == t.id) {
entry.push(t.clone());
}
}
}
}
}
span_context.span().set_status(Status::Ok);
HttpResponse::Ok().json(result)
}
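For reference, a request/response pair for the new endpoint. Paths, tag values, and timestamps are invented; the auth token the `Claims` extractor requires and any route prefix in front of `image/tags/lookup` are omitted. Note that `untagged.jpg` is simply absent from the response rather than mapped to an empty array:

```
POST image/tags/lookup
{"paths": ["2024/beach/IMG_0001.jpg", "2024/beach/untagged.jpg"]}

HTTP/1.1 200 OK
{
  "2024/beach/IMG_0001.jpg": [
    {"id": 7, "name": "vacation", "created_time": 1714000000}
  ]
}
```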
#[derive(Serialize, Queryable, Clone, Debug, PartialEq)]
pub struct Tag {
pub id: i32,
@@ -317,6 +461,14 @@ pub trait TagDao: Send + Sync {
context: &opentelemetry::Context,
paths: &[String],
) -> anyhow::Result<Vec<Tag>>;
/// Per-path grouped lookup: ``rel_path → [tags]``. Used by the
/// ``/image/tags/lookup`` batch endpoint. Returns only paths that
/// have at least one tag; the caller treats absence as empty.
fn get_tags_grouped_by_paths(
&mut self,
context: &opentelemetry::Context,
paths: &[String],
) -> anyhow::Result<std::collections::HashMap<String, Vec<Tag>>>;
fn create_tag(&mut self, context: &opentelemetry::Context, name: &str) -> anyhow::Result<Tag>;
fn remove_tag(
&mut self,
@@ -470,6 +622,51 @@ impl TagDao for SqliteTagDao {
})
}
fn get_tags_grouped_by_paths(
&mut self,
context: &opentelemetry::Context,
paths: &[String],
) -> anyhow::Result<std::collections::HashMap<String, Vec<Tag>>> {
use std::collections::HashMap;
let mut out: HashMap<String, Vec<Tag>> = HashMap::new();
if paths.is_empty() {
return Ok(out);
}
let mut conn = self
.connection
.lock()
.expect("Unable to lock SqliteTagDao connection");
trace_db_call(context, "query", "get_tags_grouped_by_paths", |span| {
span.set_attribute(KeyValue::new("path_count", paths.len() as i64));
// SQLite's default SQLITE_LIMIT_VARIABLE_NUMBER is 32766 in
// modern builds (999 in old ones). Chunk at 500 to stay
// safely under both; at 500 per chunk a 4k-photo grid needs ~8
// queries, still orders of magnitude cheaper than 4k single-row HTTP calls.
const CHUNK: usize = 500;
for chunk in paths.chunks(CHUNK) {
let rows: Vec<(String, i32, String, i64)> = tagged_photo::table
.inner_join(tags::table)
.filter(tagged_photo::rel_path.eq_any(chunk))
.select((
tagged_photo::rel_path,
tags::id,
tags::name,
tags::created_time,
))
.get_results(conn.deref_mut())
.with_context(|| "Unable to get tags grouped from Sqlite")?;
for (rel_path, id, name, created_time) in rows {
out.entry(rel_path).or_default().push(Tag {
id,
name,
created_time,
});
}
}
Ok(out)
})
}
fn create_tag(&mut self, context: &opentelemetry::Context, name: &str) -> anyhow::Result<Tag> {
let mut conn = self
.connection
@@ -893,6 +1090,23 @@ mod tests {
Ok(out)
}
fn get_tags_grouped_by_paths(
&mut self,
_context: &opentelemetry::Context,
paths: &[String],
) -> anyhow::Result<std::collections::HashMap<String, Vec<Tag>>> {
let tagged = self.tagged_photos.borrow();
let mut out = std::collections::HashMap::new();
for p in paths {
if let Some(tags) = tagged.get(p)
&& !tags.is_empty()
{
out.insert(p.clone(), tags.clone());
}
}
Ok(out)
}
fn create_tag(
&mut self,
_context: &opentelemetry::Context,
@@ -1026,6 +1240,42 @@ mod tests {
}
}
#[actix_rt::test]
async fn get_tags_grouped_by_paths_returns_per_path_buckets() {
// Backstop for the batch tag-lookup endpoint: confirms the
// grouped variant returns one bucket per path with at least
// one tag, and omits paths with no tags entirely (the caller
// treats absence as []). The handler stacks sibling expansion
// on top via image_exif content_hash; the DAO method itself
// just needs to honour rel_path → tags directly.
let mut dao = TestTagDao::new();
let ctx = opentelemetry::Context::current();
// Seed: two paths tagged, one path untagged.
dao.tagged_photos.borrow_mut().insert(
"a.jpg".into(),
vec![Tag { id: 1, name: "alpha".into(), created_time: 0 }],
);
dao.tagged_photos.borrow_mut().insert(
"b.jpg".into(),
vec![
Tag { id: 2, name: "beta".into(), created_time: 0 },
Tag { id: 3, name: "gamma".into(), created_time: 0 },
],
);
let grouped = dao
.get_tags_grouped_by_paths(
&ctx,
&["a.jpg".into(), "b.jpg".into(), "c.jpg".into()],
)
.unwrap();
assert_eq!(grouped.get("a.jpg").map(|v| v.len()), Some(1));
assert_eq!(grouped.get("b.jpg").map(|v| v.len()), Some(2));
assert!(
!grouped.contains_key("c.jpg"),
"untagged paths must be absent so caller's missing-key=[] contract holds"
);
}
#[actix_rt::test]
async fn add_new_tag_test() {
let tag_dao = TestTagDao::new();