Face Recognition / People Integration #61
85
.env.example
Normal file
85
.env.example
Normal file
@@ -0,0 +1,85 @@
|
||||
# ImageApi configuration template. Copy to `.env` and fill in for your
|
||||
# deploy. Comments mirror the canonical docs in CLAUDE.md — see there
|
||||
# for the full picture (especially the AI-Insights / Apollo / face
|
||||
# integration sections).
|
||||
|
||||
# ── Required ────────────────────────────────────────────────────────────
|
||||
DATABASE_URL=./database.db
|
||||
BASE_PATH=/path/to/media
|
||||
THUMBNAILS=/path/to/thumbnails
|
||||
VIDEO_PATH=/path/to/video/hls
|
||||
GIFS_DIRECTORY=/path/to/gifs
|
||||
PREVIEW_CLIPS_DIRECTORY=/path/to/preview-clips
|
||||
BIND_URL=0.0.0.0:8080
|
||||
CORS_ALLOWED_ORIGINS=http://localhost:3000
|
||||
SECRET_KEY=replace-me-with-a-long-random-secret
|
||||
RUST_LOG=info
|
||||
|
||||
# ── File watching ───────────────────────────────────────────────────────
|
||||
# Quick scan = recently-modified-files only; full scan = comprehensive walk.
|
||||
WATCH_QUICK_INTERVAL_SECONDS=60
|
||||
WATCH_FULL_INTERVAL_SECONDS=3600
|
||||
# Comma-separated path prefixes / component names to skip in /memories
|
||||
# AND in face detection (e.g. @eaDir, .thumbnails, /private).
|
||||
EXCLUDED_DIRS=
|
||||
|
||||
# ── Video / HLS ─────────────────────────────────────────────────────────
|
||||
HLS_CONCURRENCY=2
|
||||
HLS_TIMEOUT_SECONDS=900
|
||||
PLAYLIST_CLEANUP_INTERVAL_SECONDS=86400
|
||||
|
||||
# ── Telemetry (release builds only) ─────────────────────────────────────
|
||||
# OTLP_OTLS_ENDPOINT=http://localhost:4317
|
||||
|
||||
# ── AI Insights — Ollama (local LLM) ────────────────────────────────────
|
||||
OLLAMA_PRIMARY_URL=http://localhost:11434
|
||||
OLLAMA_PRIMARY_MODEL=nemotron-3-nano:30b
|
||||
# Optional fallback server tried on connection failure.
|
||||
# OLLAMA_FALLBACK_URL=http://server:11434
|
||||
# OLLAMA_FALLBACK_MODEL=llama3.2:3b
|
||||
OLLAMA_REQUEST_TIMEOUT_SECONDS=120
|
||||
# Cap on tool-calling iterations per chat turn / agentic insight.
|
||||
AGENTIC_MAX_ITERATIONS=6
|
||||
AGENTIC_CHAT_MAX_ITERATIONS=6
|
||||
|
||||
# ── AI Insights — OpenRouter (hybrid backend, optional) ─────────────────
|
||||
# Set OPENROUTER_API_KEY to enable the hybrid backend (vision stays
|
||||
# local on Ollama, chat routes to OpenRouter).
|
||||
# OPENROUTER_API_KEY=sk-or-...
|
||||
# OPENROUTER_DEFAULT_MODEL=anthropic/claude-sonnet-4
|
||||
# OPENROUTER_ALLOWED_MODELS=openai/gpt-4o-mini,anthropic/claude-haiku-4-5,google/gemini-2.5-flash
|
||||
# OPENROUTER_BASE_URL=https://openrouter.ai/api/v1
|
||||
# OPENROUTER_EMBEDDING_MODEL=openai/text-embedding-3-small
|
||||
# OPENROUTER_HTTP_REFERER=https://your-site.example
|
||||
# OPENROUTER_APP_TITLE=ImageApi
|
||||
|
||||
# ── AI Insights — sibling services (optional) ───────────────────────────
|
||||
# Apollo (places + face inference). Single Apollo deploys typically set
|
||||
# only APOLLO_API_BASE_URL and let the face client fall back to it.
|
||||
# APOLLO_API_BASE_URL=http://apollo.lan:8000
|
||||
# APOLLO_FACE_API_BASE_URL=http://apollo.lan:8000
|
||||
# SMS_API_URL=http://localhost:8000
|
||||
# SMS_API_TOKEN=
|
||||
|
||||
# Display name used in agentic prompts when the LLM refers to "you".
|
||||
USER_NAME=
|
||||
|
||||
# ── Face detection (Phase 3+) ───────────────────────────────────────────
|
||||
# Cosine-sim floor for auto-binding a detected face to an existing
|
||||
# same-named person on detection. 0.4 ≈ moderate-confidence match.
|
||||
FACE_AUTOBIND_MIN_COS=0.4
|
||||
# Per-scan-tick fan-out into Apollo's detect endpoint. Apollo's GPU
|
||||
# pool serializes server-side; this just overlaps file-IO with
|
||||
# inference RTT.
|
||||
FACE_DETECT_CONCURRENCY=8
|
||||
# Per-detect HTTP timeout. CPU-only Apollo deploys may need higher.
|
||||
FACE_DETECT_TIMEOUT_SEC=60
|
||||
# Per-tick caps on the two backlog drains (independent of WATCH_*
|
||||
# quick / full scans). Tune up if you have a large unscanned backlog
|
||||
# and want it to clear faster; tune down if Apollo is overloaded.
|
||||
FACE_BACKLOG_MAX_PER_TICK=64
|
||||
FACE_HASH_BACKFILL_MAX_PER_TICK=2000
|
||||
|
||||
# ── RAG / search ────────────────────────────────────────────────────────
|
||||
# Set to `1` to enable cross-encoder reranking on /search results.
|
||||
SEARCH_RAG_RERANK=0
|
||||
38
CLAUDE.md
38
CLAUDE.md
@@ -210,7 +210,34 @@ Centralized in `file_types.rs` with constants `IMAGE_EXTENSIONS` and `VIDEO_EXTE
|
||||
All database operations and HTTP handlers wrapped in spans. In release builds, exports to OTLP endpoint via `OTLP_OTLS_ENDPOINT`. Debug builds use basic logger.
|
||||
|
||||
**Memory Exclusion:**
|
||||
`PathExcluder` in `memories.rs` filters out directories from memories API via `EXCLUDED_DIRS` environment variable (comma-separated paths or substring patterns).
|
||||
`PathExcluder` in `memories.rs` filters out directories from memories API via `EXCLUDED_DIRS` environment variable (comma-separated paths or substring patterns). The same excluder is applied to face-detection candidates (`face_watch::filter_excluded`) so junk directories like `@eaDir` / `.thumbnails` don't burn detect calls on Apollo.
|
||||
|
||||
### Face detection system
|
||||
|
||||
ImageApi owns the face data; Apollo (sibling repo) hosts the insightface inference service. Inference is triggered automatically by the file watcher and persisted into two tables:
|
||||
|
||||
- `persons(id, name UNIQUE COLLATE NOCASE, cover_face_id, entity_id, created_from_tag, notes, ...)` — operator-managed, name is the user-visible identity.
|
||||
- `face_detections(id, library_id, content_hash, rel_path, bbox_*, embedding BLOB, confidence, source, person_id, status, model_version, ...)` — keyed on `content_hash` so a photo duplicated across libraries is detected once. Marker rows for `status IN ('no_faces','failed')` carry NULL bbox/embedding (CHECK constraint enforces this).
|
||||
|
||||
**Why content_hash and not (library_id, rel_path):** ties face data to the bytes, not the path. A backup mount that copies files from the primary library naturally inherits the existing detections without re-running inference.
|
||||
|
||||
**File-watch hook** (`src/main.rs::process_new_files`): for each photo with a populated `content_hash`, check `FaceDao::already_scanned(hash)`; if not, send bytes (or embedded JPEG preview for RAW via `exif::extract_embedded_jpeg_preview`) to Apollo's `/api/internal/faces/detect`. K=`FACE_DETECT_CONCURRENCY` (default 8) parallel calls per scan tick; Apollo serializes them via its single-worker GPU pool. `face_watch.rs` is the Tokio orchestration layer.
|
||||
|
||||
**Per-tick backlog drain** (also `src/main.rs`): two passes that run on every watcher tick regardless of quick-vs-full scan:
|
||||
- `backfill_unhashed_backlog` — populates `image_exif.content_hash` for photos that arrived before the hash field was retroactive. Capped by `FACE_HASH_BACKFILL_MAX_PER_TICK` (default 2000); errors don't burn the cap.
|
||||
- `process_face_backlog` — runs detection on photos that have a hash but no `face_detections` row. Capped by `FACE_BACKLOG_MAX_PER_TICK` (default 64). Selected via a SQL anti-join (`FaceDao::list_unscanned_candidates`); videos and EXCLUDED_DIRS paths filtered out client-side via `face_watch::filter_excluded` so they never reach Apollo.
|
||||
|
||||
**Auto-bind on detection:** when a photo carries a tag whose name matches a `persons.name` (case-insensitive), the new face binds automatically iff cosine similarity to the person's existing-face mean is ≥ `FACE_AUTOBIND_MIN_COS` (default 0.4). Persons with no existing faces bind unconditionally and the new face becomes the cover.
|
||||
|
||||
**Manual face create** (`POST /image/faces`): crops the image to the user-supplied bbox, applies EXIF orientation via `exif::apply_orientation` (the `image` crate hands raw pre-rotation pixels — without this, manually-drawn bboxes never resolved a face on re-detection), pads to ~50% of bbox dims (RetinaFace anchor scales need ~50% face-fill at det_size=640), then calls Apollo's embed endpoint. A `force` flag lets the operator save a face the detector couldn't see (e.g. profile shots, occluded faces) — the row gets a zero-vector embedding so it's manually-bound only and won't participate in clustering.
|
||||
|
||||
**Rerun preserves manual rows** (`POST /image/faces/{id}/rerun`): only `source='auto'` rows are deleted before re-running detection. `already_scanned` returns true on ANY row, so a photo whose only faces are manually drawn never auto-redetects.
|
||||
|
||||
Module map:
|
||||
- `src/faces.rs` — `FaceDao` trait + `SqliteFaceDao` impl, route handlers for `/faces/*`, `/image/faces/*`, `/persons/*`. Mirror of `tags.rs` layout.
|
||||
- `src/face_watch.rs` — Tokio orchestration for the file-watch detect pass; `filter_excluded` (PathExcluder + image-extension filter), `read_image_bytes_for_detect` (RAW preview fallback).
|
||||
- `src/ai/face_client.rs` — HTTP client for Apollo's inference. Configured by `APOLLO_FACE_API_BASE_URL`, falls back to `APOLLO_API_BASE_URL`. Both unset → feature disabled, file-watch hook is a no-op.
|
||||
- `migrations/2026-04-29-000000_add_faces/` — schema.
|
||||
|
||||
### Startup Sequence
|
||||
|
||||
@@ -286,6 +313,15 @@ SMS_API_TOKEN=your-api-token # SMS API authentication token (o
|
||||
# `get_personal_place_at` tool. Unset = legacy Nominatim-only path.
|
||||
APOLLO_API_BASE_URL=http://apollo.lan:8000 # Base URL of the sibling Apollo backend
|
||||
|
||||
# Face inference (optional). Apollo also hosts the insightface inference
|
||||
# service; ImageApi calls it from the file-watch hook (Phase 3) and from
|
||||
# the manual face-create endpoint. Falls back to APOLLO_API_BASE_URL when
|
||||
# unset (typical single-Apollo deploy). Both unset = feature disabled.
|
||||
APOLLO_FACE_API_BASE_URL=http://apollo.lan:8000 # Override if face service runs separately
|
||||
FACE_AUTOBIND_MIN_COS=0.4 # Phase 3: cosine-sim floor for tag-name auto-bind
|
||||
FACE_DETECT_CONCURRENCY=8 # Phase 3: per-scan-tick parallel detect calls
|
||||
FACE_DETECT_TIMEOUT_SEC=60 # reqwest client timeout (CPU inference can be slow)
|
||||
|
||||
# OpenRouter (Hybrid Backend) - keeps embeddings + vision local, routes chat to OpenRouter
|
||||
OPENROUTER_API_KEY=sk-or-... # Required to enable hybrid backend
|
||||
OPENROUTER_DEFAULT_MODEL=anthropic/claude-sonnet-4 # Used when client doesn't pick a model
|
||||
|
||||
3
Cargo.lock
generated
3
Cargo.lock
generated
@@ -1913,7 +1913,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "image-api"
|
||||
version = "1.0.0"
|
||||
version = "1.1.0"
|
||||
dependencies = [
|
||||
"actix",
|
||||
"actix-cors",
|
||||
@@ -3229,6 +3229,7 @@ dependencies = [
|
||||
"js-sys",
|
||||
"log",
|
||||
"mime",
|
||||
"mime_guess",
|
||||
"native-tls",
|
||||
"percent-encoding",
|
||||
"pin-project-lite",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "image-api"
|
||||
version = "1.0.0"
|
||||
version = "1.1.0"
|
||||
authors = ["Cameron Cordes <cameronc.dev@gmail.com>"]
|
||||
edition = "2024"
|
||||
|
||||
@@ -49,7 +49,7 @@ opentelemetry-appender-log = "0.31.0"
|
||||
tempfile = "3.20.0"
|
||||
regex = "1.11.1"
|
||||
exif = { package = "kamadak-exif", version = "0.6.1" }
|
||||
reqwest = { version = "0.12", features = ["json", "stream"] }
|
||||
reqwest = { version = "0.12", features = ["json", "stream", "multipart"] }
|
||||
async-stream = "0.3"
|
||||
tokio-util = { version = "0.7", features = ["io"] }
|
||||
bytes = "1"
|
||||
|
||||
31
README.md
31
README.md
@@ -159,3 +159,34 @@ Daily conversation summaries are generated automatically on server startup. Conf
|
||||
- Contacts to process
|
||||
- Model version used for embeddings: `nomic-embed-text:v1.5`
|
||||
|
||||
### Apollo + Face Recognition (Optional)
|
||||
|
||||
Apollo (sibling project) hosts both the Places API and the local insightface
|
||||
inference service. Both integrations are optional and degrade gracefully when
|
||||
unset.
|
||||
|
||||
- `APOLLO_API_BASE_URL` - Base URL of the sibling Apollo backend.
|
||||
- When set, photo-insight enrichment folds the user's personal place name
|
||||
(Home, Work, Cabin, ...) into the location string, and the agentic loop
|
||||
gains a `get_personal_place_at` tool. Unset = legacy Nominatim-only path.
|
||||
- `APOLLO_FACE_API_BASE_URL` - Base URL for the face-detection service.
|
||||
- Falls back to `APOLLO_API_BASE_URL` when unset (typical single-Apollo
|
||||
deploy). Both unset = face feature disabled (file-watch hook and
|
||||
manual-face endpoints short-circuit silently).
|
||||
- `FACE_AUTOBIND_MIN_COS` (Phase 3) - Cosine-sim floor for auto-binding a
|
||||
detected face to an existing same-named person via people-tag bootstrap
|
||||
[default: `0.4`].
|
||||
- `FACE_DETECT_CONCURRENCY` (Phase 3) - Per-scan-tick concurrent detect
|
||||
calls fired by the file watcher [default: `8`]. Apollo serializes them
|
||||
via its single-worker GPU pool.
|
||||
- `FACE_DETECT_TIMEOUT_SEC` - reqwest client timeout per detect call
|
||||
[default: `60`]. CPU inference on a backlog can take many seconds.
|
||||
- `FACE_BACKLOG_MAX_PER_TICK` - Cap on the per-tick backlog drain (photos
|
||||
with a content_hash but no face_detections row) [default: `64`]. Runs
|
||||
every watcher tick regardless of quick-vs-full scan, so the unscanned
|
||||
set drains independently of the file walk.
|
||||
- `FACE_HASH_BACKFILL_MAX_PER_TICK` - Cap on the per-tick content_hash
|
||||
backfill (photos that were registered before the hash field was
|
||||
populated retroactively) [default: `2000`]. Errors don't burn the cap;
|
||||
only successful hashes count.
|
||||
|
||||
|
||||
2
migrations/2026-04-29-000000_add_faces/down.sql
Normal file
2
migrations/2026-04-29-000000_add_faces/down.sql
Normal file
@@ -0,0 +1,2 @@
|
||||
-- Reverse of up.sql. Drop the child table first so face_detections'
-- person_id foreign key never dangles mid-migration.
DROP TABLE IF EXISTS face_detections;
DROP TABLE IF EXISTS persons;
|
||||
67
migrations/2026-04-29-000000_add_faces/up.sql
Normal file
67
migrations/2026-04-29-000000_add_faces/up.sql
Normal file
@@ -0,0 +1,67 @@
|
||||
-- Local face recognition tables.
|
||||
--
|
||||
-- `persons` are visual identities (the "who" of a face). The optional
|
||||
-- `entity_id` bridges to the existing knowledge graph `entities` table —
|
||||
-- when set, this person is the visual side of an LLM-extracted entity.
|
||||
-- Don't auto-create entities from persons; the entity table represents
|
||||
-- LLM-extracted knowledge with its own confidence semantics, and silently
|
||||
-- filling it from face detections muddies the provenance.
|
||||
--
|
||||
-- `face_detections` carries one row per detected face on a content_hash,
|
||||
-- plus marker rows with `status='no_faces'` or `status='failed'` so the
|
||||
-- file watcher knows not to re-scan a hash. Keying on `content_hash`
|
||||
-- (cross-library dedup) rather than `(library_id, rel_path)` means the
|
||||
-- same JPEG in two libraries is scanned once. The denormalized `rel_path`
|
||||
-- carries the most-recently-seen path — useful for cluster-thumb URL
|
||||
-- generation; canonical path lookup goes through image_exif.
|
||||
|
||||
CREATE TABLE persons (
    id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
    name TEXT NOT NULL,
    cover_face_id INTEGER, -- backfilled when the first face binds
    entity_id INTEGER, -- optional bridge to entities(id)
    created_from_tag BOOLEAN NOT NULL DEFAULT 0,
    notes TEXT,
    created_at BIGINT NOT NULL,
    updated_at BIGINT NOT NULL,
    -- Deleting an entity orphans the person cleanly rather than cascading.
    CONSTRAINT fk_persons_entity FOREIGN KEY (entity_id) REFERENCES entities(id) ON DELETE SET NULL,
    -- Case-insensitive uniqueness: "alice" and "Alice" are one identity,
    -- which is what the tag-name auto-bind lookup relies on.
    UNIQUE(name COLLATE NOCASE)
);

-- Supports the entity -> person reverse lookup.
CREATE INDEX idx_persons_entity ON persons(entity_id);
|
||||
|
||||
CREATE TABLE face_detections (
    id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
    library_id INTEGER NOT NULL,
    content_hash TEXT NOT NULL, -- canonical key (cross-library dedup)
    rel_path TEXT NOT NULL, -- denormalized; most recently seen
    bbox_x REAL, -- normalized 0..1; NULL on marker rows
    bbox_y REAL,
    bbox_w REAL,
    bbox_h REAL,
    embedding BLOB, -- 512×f32 = 2048 bytes; NULL on marker rows
    confidence REAL, -- detector score
    source TEXT NOT NULL, -- 'auto' | 'manual'
    person_id INTEGER,
    status TEXT NOT NULL DEFAULT 'detected', -- 'detected' | 'no_faces' | 'failed'
    model_version TEXT NOT NULL, -- e.g. 'buffalo_l'; embedding lineage
    created_at BIGINT NOT NULL,
    CONSTRAINT fk_fd_library FOREIGN KEY (library_id) REFERENCES libraries(id),
    -- Unbinding happens by deleting the person; the face survives unbound.
    CONSTRAINT fk_fd_person FOREIGN KEY (person_id) REFERENCES persons(id) ON DELETE SET NULL,
    -- Detected rows carry geometry + embedding; marker rows ('no_faces',
    -- 'failed') carry neither. CHECK enforces the invariant so manual
    -- inserts can't slip through with half a row.
    CONSTRAINT chk_marker CHECK (
        (status = 'detected' AND bbox_x IS NOT NULL AND embedding IS NOT NULL)
        OR (status IN ('no_faces','failed') AND bbox_x IS NULL AND embedding IS NULL)
    )
);

-- Hot path: already_scanned(content_hash) on every watcher tick.
CREATE INDEX idx_face_detections_hash ON face_detections(content_hash);
CREATE INDEX idx_face_detections_lib_path ON face_detections(library_id, rel_path);
CREATE INDEX idx_face_detections_person ON face_detections(person_id);
CREATE INDEX idx_face_detections_status ON face_detections(status);
-- One marker row per (content_hash, status='no_faces') so the file watcher
-- doesn't double-mark when a hash is seen on multiple full-scan passes.
CREATE UNIQUE INDEX idx_face_detections_no_faces_unique
    ON face_detections(content_hash) WHERE status = 'no_faces';
|
||||
2
migrations/2026-04-29-000200_add_is_ignored/down.sql
Normal file
2
migrations/2026-04-29-000200_add_is_ignored/down.sql
Normal file
@@ -0,0 +1,2 @@
|
||||
-- Reverse of up.sql: drop the partial index before removing the column
-- it references.
DROP INDEX IF EXISTS idx_persons_is_ignored;
ALTER TABLE persons DROP COLUMN is_ignored;
|
||||
20
migrations/2026-04-29-000200_add_is_ignored/up.sql
Normal file
20
migrations/2026-04-29-000200_add_is_ignored/up.sql
Normal file
@@ -0,0 +1,20 @@
|
||||
-- IGNORE / junk bucket for the face recognition feature.
|
||||
--
|
||||
-- An "Ignored" person is the destination for strangers, faces the user
|
||||
-- doesn't want tagged, and false detections. It looks like any other
|
||||
-- person row (so face_detections.person_id stays a clean foreign key)
|
||||
-- but `is_ignored=1` flags it for special UI treatment:
|
||||
-- - hidden from the persons list by default
|
||||
-- - excluded from `find_persons_by_names_ci` so a tag-name match
|
||||
-- can never auto-bind a real face to the ignore bucket
|
||||
-- - cluster-suggest already filters by `person_id IS NULL`, so faces
|
||||
-- bound to an ignored person are naturally excluded from future
|
||||
-- re-clustering
|
||||
--
|
||||
-- Partial index because the WHERE-clause is small (typically 1 row),
|
||||
-- and we only ever query for `is_ignored = 1` to find the bucket.
|
||||
|
||||
-- Existing rows default to 0 (normal person); the ignore bucket is
-- created on demand with is_ignored=1.
ALTER TABLE persons ADD COLUMN is_ignored BOOLEAN NOT NULL DEFAULT 0;

-- Partial index: only is_ignored=1 rows (typically a single bucket row)
-- are indexed, matching the only predicate we ever query with.
CREATE INDEX idx_persons_is_ignored
    ON persons(is_ignored) WHERE is_ignored = 1;
|
||||
370
src/ai/face_client.rs
Normal file
370
src/ai/face_client.rs
Normal file
@@ -0,0 +1,370 @@
|
||||
//! Thin async HTTP client for Apollo's `/api/internal/faces/*` endpoints.
|
||||
//!
|
||||
//! Apollo (the personal location-history viewer at the sibling repo) hosts the
|
||||
//! insightface inference service. This client is the ImageApi side of the
|
||||
//! contract — it shoves image bytes through `/detect` and returns boxes +
|
||||
//! 512-d ArcFace embeddings, plus a single-embedding `/embed` for the manual
|
||||
//! face-create flow.
|
||||
//!
|
||||
//! Mirrors `apollo_client.rs` shape: optional base URL (None = disabled, the
|
||||
//! file watcher and manual-create handlers no-op), reqwest client with a
|
||||
//! generous timeout because CPU inference on a backlog can take many seconds
|
||||
//! per photo.
|
||||
//!
|
||||
//! Configured via `APOLLO_FACE_API_BASE_URL`, falling back to
|
||||
//! `APOLLO_API_BASE_URL` when the dedicated var is unset (single-Apollo
|
||||
//! deploys are the common case). Both unset → `is_enabled()` returns false.
|
||||
//!
|
||||
//! Wire format: multipart/form-data with `file=<bytes>` and `meta=<json>`.
|
||||
//! `meta` carries `{content_hash, library_id, rel_path, orientation?,
|
||||
//! model_version?}` — useful for Apollo-side logging and idempotency, ignored
|
||||
//! by Apollo today but part of the stable wire contract so future versions
|
||||
//! can act on it without a client change.
|
||||
//!
|
||||
//! Error mapping (reflected in [`FaceDetectError`]):
|
||||
//! - 422 `decode_failed` → permanent: ImageApi marks `status='failed'` and
|
||||
//! doesn't retry until manual rerun.
|
||||
//! - 200 with `faces:[]` → `status='no_faces'` marker row.
|
||||
//! - 503 `cuda_oom` / `engine_unavailable` → defer-and-retry: no marker
|
||||
//! written.
|
||||
//! - Any other 5xx / network error → defer.
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use base64::Engine;
|
||||
use reqwest::Client;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::time::Duration;
|
||||
|
||||
/// Sidecar metadata sent alongside the image bytes in the multipart
/// `meta` field (serialized to JSON by `post_multipart`). Part of the
/// stable wire contract — ignored by Apollo today (see module docs).
#[derive(Debug, Clone, Serialize)]
pub struct DetectMeta {
    /// Content hash of the image bytes — ImageApi's canonical face key.
    pub content_hash: String,
    /// Owning library row id.
    pub library_id: i32,
    /// Library-relative path; also reused as the multipart file name.
    pub rel_path: String,
    /// EXIF orientation int (1..8). Apollo applies `exif_transpose` on the
    /// bytes before inference, so this is informational only — supply when
    /// the bytes were extracted from a RAW preview that lost the tag.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub orientation: Option<i32>,
    /// Echoed back in the response. ImageApi stores it in
    /// `face_detections.model_version`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model_version: Option<String>,
}
|
||||
|
||||
// Wire shape for the bbox sub-object Apollo returns. Read by Phase 3's
// file-watch hook; silence the dead-code lint until then.
#[allow(dead_code)]
#[derive(Debug, Clone, Deserialize)]
pub struct DetectedBbox {
    // NOTE(review): presumably normalized 0..1 to match the
    // face_detections.bbox_* columns — confirm against Apollo's API.
    pub x: f32,
    pub y: f32,
    pub w: f32,
    pub h: f32,
}
|
||||
|
||||
/// One detected face in a `DetectResponse`.
#[allow(dead_code)] // bbox consumed by Phase 3 file-watch hook
#[derive(Debug, Clone, Deserialize)]
pub struct DetectedFace {
    /// Detection geometry as returned by Apollo.
    pub bbox: DetectedBbox,
    /// Detector confidence score.
    pub confidence: f32,
    /// base64 of 2048 bytes (512×f32 LE). ImageApi stores the raw bytes
    /// verbatim as a BLOB — see `decode_embedding` for the unpack.
    pub embedding: String,
}
|
||||
|
||||
impl DetectedFace {
|
||||
/// Decode the wire-format embedding back into raw bytes for storage.
|
||||
/// Returns the 2048-byte little-endian f32 buffer or an error if the
|
||||
/// base64 is malformed or the wrong length.
|
||||
pub fn decode_embedding(&self) -> Result<Vec<u8>> {
|
||||
let bytes = base64::engine::general_purpose::STANDARD
|
||||
.decode(self.embedding.as_bytes())
|
||||
.context("face embedding base64 decode")?;
|
||||
if bytes.len() != 2048 {
|
||||
anyhow::bail!(
|
||||
"face embedding wrong size: got {} bytes, expected 2048",
|
||||
bytes.len()
|
||||
);
|
||||
}
|
||||
Ok(bytes)
|
||||
}
|
||||
}
|
||||
|
||||
/// Top-level payload of Apollo's `/detect` and `/embed` endpoints.
#[allow(dead_code)] // duration_ms logged by Phase 3 file-watch hook
#[derive(Debug, Clone, Deserialize)]
pub struct DetectResponse {
    /// Model identifier (e.g. 'buffalo_l') — stored per detection row
    /// for embedding lineage.
    pub model_version: String,
    /// Server-side inference wall time; for logging only.
    pub duration_ms: i64,
    /// Empty vec is the no-faces signal — caller writes a marker row.
    pub faces: Vec<DetectedFace>,
}
|
||||
|
||||
/// Payload of Apollo's face-health endpoint (see `FaceClient::health`).
#[derive(Debug, Clone, Deserialize)]
#[allow(dead_code)] // Reported by Apollo; useful for future health-driven backoff
pub struct FaceHealth {
    // Presumably "model loaded and ready" — TODO confirm with Apollo docs.
    pub loaded: bool,
    // Execution providers in use (e.g. CUDA vs CPU) — verify exact values.
    pub providers: Vec<String>,
    pub model_version: String,
    pub det_size: i32,
    /// Absent on healthy deploys; serde default keeps decode lenient.
    #[serde(default)]
    pub load_error: Option<String>,
}
|
||||
|
||||
/// Distinguishes permanent failures (don't retry) from transient ones
/// (defer and retry on next scan tick). The file-watch hook keys its
/// marker-row decision on this — a `Permanent` outcome writes
/// `status='failed'`, a `Transient` outcome writes nothing so the next
/// pass tries again.
///
/// Display and std::error::Error are implemented below so this threads
/// through logging and `?` at call sites.
#[derive(Debug)]
pub enum FaceDetectError {
    /// Apollo refused the bytes for a reason that won't change on retry
    /// (decode failure, zero-dim image). Mark `status='failed'`.
    Permanent(anyhow::Error),
    /// Apollo couldn't process this turn but might next time (CUDA OOM,
    /// engine not loaded yet, network hiccup). Don't mark anything.
    Transient(anyhow::Error),
    /// Feature is disabled (no `APOLLO_FACE_API_BASE_URL`). Caller should
    /// silently no-op — same shape as `apollo_client::is_enabled()` false.
    Disabled,
}
|
||||
|
||||
impl std::fmt::Display for FaceDetectError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
FaceDetectError::Permanent(e) => write!(f, "permanent: {e}"),
|
||||
FaceDetectError::Transient(e) => write!(f, "transient: {e}"),
|
||||
FaceDetectError::Disabled => write!(f, "face client disabled"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for FaceDetectError {}
|
||||
|
||||
/// Handle to Apollo's face-inference endpoints. Clone-able so the file
/// watcher and HTTP handlers can each hold one.
#[derive(Clone)]
pub struct FaceClient {
    client: Client,
    /// `None` → disabled. Trim trailing slash at construction so url
    /// building doesn't double up.
    base_url: Option<String>,
}
|
||||
|
||||
impl FaceClient {
|
||||
pub fn new(base_url: Option<String>) -> Self {
|
||||
// 60 s timeout: CPU inference on a backlog can take many seconds
|
||||
// per photo, especially the first call into a cold GPU. Apollo's
|
||||
// bounded threadpool (1 worker on CUDA) means concurrent calls
|
||||
// queue server-side; 60 s is enough headroom for a few items in
|
||||
// the queue without surfacing a false transient.
|
||||
let timeout_secs = std::env::var("FACE_DETECT_TIMEOUT_SEC")
|
||||
.ok()
|
||||
.and_then(|s| s.parse::<u64>().ok())
|
||||
.unwrap_or(60);
|
||||
let client = Client::builder()
|
||||
.timeout(Duration::from_secs(timeout_secs))
|
||||
.build()
|
||||
.expect("reqwest client build");
|
||||
Self {
|
||||
client,
|
||||
base_url: base_url.map(|u| u.trim_end_matches('/').to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_enabled(&self) -> bool {
|
||||
self.base_url.is_some()
|
||||
}
|
||||
|
||||
/// Detect every face in `bytes`. ImageApi calls this from the file-watch
|
||||
/// hook (Phase 3) and from the manual rerun handler. Empty `faces[]` in
|
||||
/// the response is the no-faces signal — caller writes a marker row.
|
||||
#[allow(dead_code)] // Phase 3 file-watch hook + rerun handler
|
||||
pub async fn detect(
|
||||
&self,
|
||||
bytes: Vec<u8>,
|
||||
meta: DetectMeta,
|
||||
) -> std::result::Result<DetectResponse, FaceDetectError> {
|
||||
let Some(base) = self.base_url.as_deref() else {
|
||||
return Err(FaceDetectError::Disabled);
|
||||
};
|
||||
let url = format!("{}/api/internal/faces/detect", base);
|
||||
self.post_multipart(&url, bytes, &meta).await
|
||||
}
|
||||
|
||||
/// Single-embedding endpoint for the manual face-create flow. Caller
|
||||
/// crops the image to the user-drawn bbox and passes those bytes; we
|
||||
/// run detection inside the crop and return the highest-confidence
|
||||
/// face's embedding. Apollo returns 422 `no_face_in_crop` when the
|
||||
/// box missed — surfaced here as `Permanent`.
|
||||
pub async fn embed(
|
||||
&self,
|
||||
bytes: Vec<u8>,
|
||||
meta: DetectMeta,
|
||||
) -> std::result::Result<DetectResponse, FaceDetectError> {
|
||||
let Some(base) = self.base_url.as_deref() else {
|
||||
return Err(FaceDetectError::Disabled);
|
||||
};
|
||||
let url = format!("{}/api/internal/faces/embed", base);
|
||||
self.post_multipart(&url, bytes, &meta).await
|
||||
}
|
||||
|
||||
/// Engine reachability + provider/model report. Used by ImageApi for a
|
||||
/// startup sanity check; not on the hot path.
|
||||
#[allow(dead_code)] // Phase 3 startup probe
|
||||
pub async fn health(&self) -> Result<FaceHealth> {
|
||||
let base = self.base_url.as_deref().context("face client disabled")?;
|
||||
let url = format!("{}/api/internal/faces/health", base);
|
||||
let resp = self.client.get(&url).send().await?.error_for_status()?;
|
||||
let body: FaceHealth = resp.json().await?;
|
||||
Ok(body)
|
||||
}
|
||||
|
||||
async fn post_multipart(
|
||||
&self,
|
||||
url: &str,
|
||||
bytes: Vec<u8>,
|
||||
meta: &DetectMeta,
|
||||
) -> std::result::Result<DetectResponse, FaceDetectError> {
|
||||
let meta_json = serde_json::to_string(meta)
|
||||
.map_err(|e| FaceDetectError::Permanent(anyhow::anyhow!("meta serialize: {e}")))?;
|
||||
let form = reqwest::multipart::Form::new()
|
||||
.text("meta", meta_json)
|
||||
.part(
|
||||
"file",
|
||||
reqwest::multipart::Part::bytes(bytes)
|
||||
.file_name(meta.rel_path.clone())
|
||||
.mime_str("application/octet-stream")
|
||||
.unwrap_or_else(|_| reqwest::multipart::Part::bytes(Vec::new())),
|
||||
);
|
||||
|
||||
let resp = match self.client.post(url).multipart(form).send().await {
|
||||
Ok(r) => r,
|
||||
Err(e) if e.is_timeout() || e.is_connect() => {
|
||||
return Err(FaceDetectError::Transient(anyhow::anyhow!(
|
||||
"face client network: {e}"
|
||||
)));
|
||||
}
|
||||
Err(e) => {
|
||||
return Err(FaceDetectError::Transient(anyhow::anyhow!(
|
||||
"face client request: {e}"
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
let status = resp.status();
|
||||
if status.is_success() {
|
||||
let body: DetectResponse = resp.json().await.map_err(|e| {
|
||||
FaceDetectError::Transient(anyhow::anyhow!("face response decode: {e}"))
|
||||
})?;
|
||||
return Ok(body);
|
||||
}
|
||||
|
||||
let body_text = resp.text().await.unwrap_or_default();
|
||||
Err(classify_error_response(status.as_u16(), &body_text))
|
||||
}
|
||||
}
|
||||
|
||||
/// Map an Apollo HTTP error response to a FaceDetectError. Pulled out as a
|
||||
/// pure function so the marker-row contract (422 → Permanent, 503 →
|
||||
/// Transient) is unit-testable without spinning up an HTTP server.
|
||||
fn classify_error_response(status: u16, body_text: &str) -> FaceDetectError {
|
||||
// Apollo encodes its error class in the JSON body's `detail`. Try to
|
||||
// parse it; fall back to status-only classification.
|
||||
let detail_code = serde_json::from_str::<serde_json::Value>(body_text)
|
||||
.ok()
|
||||
.and_then(|v| {
|
||||
// detail can be a string ("decode_failed") or an object
|
||||
// ({"code": "cuda_oom", ...}) depending on the endpoint and
|
||||
// Apollo's response shape — handle both.
|
||||
v.get("detail")
|
||||
.and_then(|d| d.as_str().map(str::to_string))
|
||||
.or_else(|| {
|
||||
v.get("detail")
|
||||
.and_then(|d| d.get("code"))
|
||||
.and_then(|c| c.as_str())
|
||||
.map(str::to_string)
|
||||
})
|
||||
})
|
||||
.unwrap_or_default();
|
||||
|
||||
if status == 422 {
|
||||
return FaceDetectError::Permanent(anyhow::anyhow!(
|
||||
"face detect 422 {}: {}",
|
||||
detail_code,
|
||||
body_text
|
||||
));
|
||||
}
|
||||
if status == 503 {
|
||||
return FaceDetectError::Transient(anyhow::anyhow!(
|
||||
"face detect 503 {}: {}",
|
||||
detail_code,
|
||||
body_text
|
||||
));
|
||||
}
|
||||
// Any other 4xx: be conservative and treat as Permanent so we don't
|
||||
// loop forever on a stable rejection. Any other 5xx: Transient —
|
||||
// likely intermittent.
|
||||
if (400..500).contains(&status) {
|
||||
FaceDetectError::Permanent(anyhow::anyhow!(
|
||||
"face detect {} {}: {}",
|
||||
status,
|
||||
detail_code,
|
||||
body_text
|
||||
))
|
||||
} else {
|
||||
FaceDetectError::Transient(anyhow::anyhow!(
|
||||
"face detect {} {}: {}",
|
||||
status,
|
||||
detail_code,
|
||||
body_text
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    /// True when the classifier chose the non-retryable variant.
    fn variant_is_permanent(e: &FaceDetectError) -> bool {
        matches!(e, FaceDetectError::Permanent(_))
    }

    /// True when the classifier chose the retryable variant.
    fn variant_is_transient(e: &FaceDetectError) -> bool {
        matches!(e, FaceDetectError::Transient(_))
    }

    #[test]
    fn classify_422_decode_failed_is_permanent() {
        // Permanent → ImageApi marks status='failed' and stops retrying.
        let err = classify_error_response(422, r#"{"detail":"decode_failed: bad bytes"}"#);
        assert!(
            variant_is_permanent(&err),
            "422 decode_failed must be Permanent"
        );
        let rendered = format!("{err}");
        assert!(rendered.contains("decode_failed"));
    }

    #[test]
    fn classify_503_cuda_oom_is_transient() {
        // Transient → ImageApi must NOT write a marker so the next scan
        // retries. Here detail.code sits inside an object rather than a
        // bare string; the parser copes with both shapes.
        let body = r#"{"detail":{"code":"cuda_oom","error":"out of memory"}}"#;
        let err = classify_error_response(503, body);
        assert!(variant_is_transient(&err), "503 cuda_oom must be Transient");
        assert!(format!("{err}").contains("cuda_oom"));
    }

    #[test]
    fn classify_500_is_transient_other_4xx_is_permanent() {
        // Conservative split: 5xx defers (intermittent), while any other
        // 4xx is treated as a stable rejection so we don't loop forever.
        assert!(variant_is_transient(&classify_error_response(500, "")));
        assert!(variant_is_transient(&classify_error_response(502, "{}")));
        assert!(variant_is_permanent(&classify_error_response(400, "{}")));
        assert!(variant_is_permanent(&classify_error_response(404, "{}")));
    }

    #[test]
    fn classify_handles_unparseable_body() {
        // Apollo can return non-JSON on misroute / proxy errors; the
        // classifier must still produce a useful variant.
        assert!(variant_is_transient(&classify_error_response(
            503,
            "<html>nginx</html>"
        )));
    }
}
|
||||
@@ -1,5 +1,6 @@
|
||||
pub mod apollo_client;
|
||||
pub mod daily_summary_job;
|
||||
pub mod face_client;
|
||||
pub mod handlers;
|
||||
pub mod insight_chat;
|
||||
pub mod insight_generator;
|
||||
|
||||
@@ -386,6 +386,16 @@ pub trait ExifDao: Sync + Send {
|
||||
hash: &str,
|
||||
) -> Result<Vec<String>, DbError>;
|
||||
|
||||
/// Batch version of [`get_rel_paths_by_hash`]. Returns a
|
||||
/// `hash → Vec<rel_path>` map for every hash that has at least one
|
||||
/// rel_path. Used by the batch tag lookup endpoint to expand
|
||||
/// content-hash siblings without firing a query per hash.
|
||||
fn get_rel_paths_for_hashes(
|
||||
&mut self,
|
||||
context: &opentelemetry::Context,
|
||||
hashes: &[String],
|
||||
) -> Result<std::collections::HashMap<String, Vec<String>>, DbError>;
|
||||
|
||||
/// List `(library_id, rel_path)` pairs for the given libraries, optionally
|
||||
/// restricted to rows whose rel_path starts with `path_prefix`. When
|
||||
/// `library_ids` is empty, rows from every library are returned. Used by
|
||||
@@ -956,6 +966,40 @@ impl ExifDao for SqliteExifDao {
|
||||
.map_err(|_| DbError::new(DbErrorKind::QueryError))
|
||||
}
|
||||
|
||||
/// Batch `hash → Vec<rel_path>` lookup; see the trait docs on
/// [`ExifDao::get_rel_paths_for_hashes`]. Chunks the IN clause so an
/// arbitrarily large hash set never exceeds SQLite's bound-variable cap.
fn get_rel_paths_for_hashes(
    &mut self,
    context: &opentelemetry::Context,
    hashes: &[String],
) -> Result<std::collections::HashMap<String, Vec<String>>, DbError> {
    use std::collections::HashMap;
    let mut out: HashMap<String, Vec<String>> = HashMap::new();
    // Fast path: no hashes → no query, no trace span.
    if hashes.is_empty() {
        return Ok(out);
    }
    trace_db_call(context, "query", "get_rel_paths_for_hashes", |_span| {
        use schema::image_exif::dsl::*;

        let mut connection = self.connection.lock().expect("Unable to get ExifDao");

        // Chunk the IN clause to stay safely under SQLite's
        // SQLITE_LIMIT_VARIABLE_NUMBER (32766 modern, 999 legacy).
        const CHUNK: usize = 500;
        for chunk in hashes.chunks(CHUNK) {
            // `assume_not_null` implies content_hash is nullable in the
            // schema; the eq_any filter against concrete strings can only
            // match non-NULL rows, so the cast is sound here.
            let rows: Vec<(String, String)> = image_exif
                .filter(content_hash.eq_any(chunk))
                .select((content_hash.assume_not_null(), rel_path))
                .distinct()
                .load::<(String, String)>(connection.deref_mut())
                .map_err(|_| anyhow::anyhow!("Query error"))?;
            // Accumulate across chunks into the shared map.
            for (hash, path) in rows {
                out.entry(hash).or_default().push(path);
            }
        }
        Ok(out)
    })
    // Collapse any diesel/anyhow error into the DAO's generic query error,
    // matching the error style of the sibling methods in this impl.
    .map_err(|_| DbError::new(DbErrorKind::QueryError))
}
|
||||
|
||||
fn list_rel_paths_for_libraries(
|
||||
&mut self,
|
||||
context: &opentelemetry::Context,
|
||||
|
||||
@@ -70,6 +70,26 @@ diesel::table! {
|
||||
}
|
||||
}
|
||||
|
||||
diesel::table! {
    // Face-detection results: one row per detected face. Rows may also
    // act as per-file markers with NULL geometry (status "no_faces" /
    // "failed" — values written by face_watch via mark_status).
    // NOTE(review): the marker-row shape (NULL bbox/embedding) is
    // inferred from face_watch usage — confirm against faces.rs.
    face_detections (id) {
        id -> Integer,
        // Owning library; see joinable!(face_detections -> libraries).
        library_id -> Integer,
        // Hash of the image bytes — detections are keyed by content,
        // with rel_path kept alongside for path-based lookup/display.
        content_hash -> Text,
        rel_path -> Text,
        // Detected bounding box; nullable (absent on marker rows).
        bbox_x -> Nullable<Float>,
        bbox_y -> Nullable<Float>,
        bbox_w -> Nullable<Float>,
        bbox_h -> Nullable<Float>,
        // Raw embedding bytes; face_watch decodes them to f32s via
        // faces::decode_embedding_bytes for similarity comparisons.
        embedding -> Nullable<Binary>,
        confidence -> Nullable<Float>,
        // Provenance — the watcher writes "auto" for its detections.
        source -> Text,
        // Assigned person once bound; see
        // joinable!(face_detections -> persons).
        person_id -> Nullable<Integer>,
        // "detected" | "no_faces" | "failed" (values seen in face_watch).
        status -> Text,
        // Detection model identifier, echoed from the Apollo response.
        model_version -> Text,
        created_at -> BigInt,
    }
}
|
||||
|
||||
diesel::table! {
|
||||
favorites (id) {
|
||||
id -> Integer,
|
||||
@@ -130,6 +150,20 @@ diesel::table! {
|
||||
}
|
||||
}
|
||||
|
||||
diesel::table! {
    // A named person that face detections can be assigned to. May be
    // bootstrapped from an existing people-tag (`created_from_tag`) or
    // created via the faces API.
    persons (id) {
        id -> Integer,
        name -> Text,
        // Face used as the person's cover image; NULL until chosen.
        // NOTE(review): presumably references face_detections.id — no
        // joinable! is declared for it, confirm that's intentional.
        cover_face_id -> Nullable<Integer>,
        // Optional link into the knowledge-graph entities table; see
        // joinable!(persons -> entities (entity_id)).
        entity_id -> Nullable<Integer>,
        // True when this person row was seeded from a tag name.
        created_from_tag -> Bool,
        notes -> Nullable<Text>,
        created_at -> BigInt,
        updated_at -> BigInt,
        // NOTE(review): flag consumer not visible in this chunk —
        // presumably hides the person from suggestions/UI; confirm.
        is_ignored -> Bool,
    }
}
|
||||
|
||||
diesel::table! {
|
||||
photo_insights (id) {
|
||||
id -> Integer,
|
||||
@@ -201,7 +235,10 @@ diesel::table! {
|
||||
diesel::joinable!(entity_facts -> photo_insights (source_insight_id));
|
||||
diesel::joinable!(entity_photo_links -> entities (entity_id));
|
||||
diesel::joinable!(entity_photo_links -> libraries (library_id));
|
||||
diesel::joinable!(face_detections -> libraries (library_id));
|
||||
diesel::joinable!(face_detections -> persons (person_id));
|
||||
diesel::joinable!(image_exif -> libraries (library_id));
|
||||
diesel::joinable!(persons -> entities (entity_id));
|
||||
diesel::joinable!(photo_insights -> libraries (library_id));
|
||||
diesel::joinable!(tagged_photo -> tags (tag_id));
|
||||
diesel::joinable!(video_preview_clips -> libraries (library_id));
|
||||
@@ -212,10 +249,12 @@ diesel::allow_tables_to_appear_in_same_query!(
|
||||
entities,
|
||||
entity_facts,
|
||||
entity_photo_links,
|
||||
face_detections,
|
||||
favorites,
|
||||
image_exif,
|
||||
libraries,
|
||||
location_history,
|
||||
persons,
|
||||
photo_insights,
|
||||
search_history,
|
||||
tagged_photo,
|
||||
|
||||
590
src/face_watch.rs
Normal file
590
src/face_watch.rs
Normal file
@@ -0,0 +1,590 @@
|
||||
//! Face-detection pass for the file watcher.
|
||||
//!
|
||||
//! `process_new_files` calls [`run_face_detection_pass`] after the EXIF
|
||||
//! registration loop. We walk the candidates (images, not yet face-scanned,
|
||||
//! not excluded by EXCLUDED_DIRS), fan out parallel detect calls to Apollo,
|
||||
//! and persist the results — detected faces, `no_faces` markers when Apollo
|
||||
//! found nothing, `failed` markers on permanent decode errors, no marker on
|
||||
//! transient failures so the next scan retries.
|
||||
//!
|
||||
//! The watcher runs in a plain `std::thread`, so we build a short-lived
|
||||
//! tokio runtime per pass and `block_on` a join of K detect futures. K is
|
||||
//! configurable via `FACE_DETECT_CONCURRENCY` (default 8). Apollo's
|
||||
//! threadpool is bounded to 1–2 workers anyway, so the runs queue
|
||||
//! server-side; the client-side fan-out is purely about overlapping IO
|
||||
//! (file read + JSON encode) with someone else's inference.
|
||||
|
||||
use crate::ai::face_client::{DetectMeta, FaceClient, FaceDetectError};
|
||||
use crate::exif;
|
||||
use crate::faces::{self, FaceDao, InsertFaceDetectionInput};
|
||||
use crate::file_types;
|
||||
use crate::libraries::Library;
|
||||
use crate::memories::PathExcluder;
|
||||
use crate::tags::TagDao;
|
||||
use log::{debug, info, warn};
|
||||
use std::path::Path;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use tokio::sync::Semaphore;
|
||||
|
||||
/// One file the watcher would like to face-scan. Built by the caller from
/// the EXIF batch (we need `content_hash` to key everything against).
#[derive(Debug, Clone)]
pub struct FaceCandidate {
    /// Path relative to the library root; joined with the root before
    /// the file is read for detection.
    pub rel_path: String,
    /// Content hash from the EXIF batch; detections and marker rows are
    /// keyed by this rather than by path.
    pub content_hash: String,
}
|
||||
|
||||
/// Synchronous entry point. Returns once every candidate has been
/// processed (or definitively skipped). When `face_client.is_enabled()`
/// is false this is a no-op so the watcher can call unconditionally.
///
/// Concurrency is bounded by `FACE_DETECT_CONCURRENCY` (default 8,
/// values ≤ 0 or unparseable fall back to the default); a per-pass
/// tokio runtime drives the fan-out — see the module docs.
pub fn run_face_detection_pass(
    library: &Library,
    excluded_dirs: &[String],
    face_client: &FaceClient,
    face_dao: Arc<Mutex<Box<dyn FaceDao>>>,
    tag_dao: Arc<Mutex<Box<dyn TagDao>>>,
    candidates: Vec<FaceCandidate>,
) {
    // Cheap early-outs before building any async machinery.
    if !face_client.is_enabled() {
        return;
    }
    if candidates.is_empty() {
        return;
    }

    // Drop excluded paths and non-image files up front so we never burn
    // a detect call on them.
    let base = Path::new(&library.root_path);
    let filtered = filter_excluded(base, excluded_dirs, candidates, Some(&library.name));
    if filtered.is_empty() {
        return;
    }

    // Env-tunable fan-out; only strictly positive parses are accepted.
    let concurrency: usize = std::env::var("FACE_DETECT_CONCURRENCY")
        .ok()
        .and_then(|s| s.parse().ok())
        .filter(|n: &usize| *n > 0)
        .unwrap_or(8);

    info!(
        "face_watch: running detection on {} candidates (library '{}', concurrency {})",
        filtered.len(),
        library.name,
        concurrency
    );

    // Per-pass tokio runtime. The watcher thread isn't in any pre-existing
    // async context — building one here keeps the rest of the watcher
    // sync-only. Worker count is small; the parallelism we care about is
    // task-level (semaphore) not thread-level.
    let rt = match tokio::runtime::Builder::new_multi_thread()
        .worker_threads(2)
        .enable_all()
        .build()
    {
        Ok(rt) => rt,
        Err(e) => {
            // Runtime build failing is unusual; skip the pass rather than
            // panic the watcher thread — the next tick tries again.
            warn!("face_watch: failed to build tokio runtime: {e}");
            return;
        }
    };

    // Copy what the spawned tasks need by value — tasks outlive this
    // function's borrows of `library`.
    let library_id = library.id;
    let library_root = library.root_path.clone();
    rt.block_on(async move {
        // Semaphore caps in-flight detect calls at `concurrency`.
        let sem = Arc::new(Semaphore::new(concurrency));
        let mut handles = Vec::with_capacity(filtered.len());
        for cand in filtered {
            let permit_sem = sem.clone();
            let face_client = face_client.clone();
            let face_dao = face_dao.clone();
            let tag_dao = tag_dao.clone();
            let library_root = library_root.clone();
            handles.push(tokio::spawn(async move {
                // acquire_owned would let us drop the permit explicitly
                // before await points; for a one-shot call into Apollo
                // the simpler bounded acquire is enough.
                let _permit = permit_sem.acquire().await.expect("face semaphore");
                process_one(
                    library_id,
                    &library_root,
                    cand,
                    &face_client,
                    face_dao,
                    tag_dao,
                )
                .await;
            }));
        }
        for h in handles {
            // join; per-task panics are logged inside process_one before
            // they reach here, so we don't propagate.
            let _ = h.await;
        }
    });
}
|
||||
|
||||
/// Detect faces in one candidate file and persist the outcome.
///
/// Outcomes follow the marker contract:
/// - faces found → one `detected` row per face, then an auto-bind attempt;
/// - no faces → a `no_faces` marker row;
/// - permanent detect error → a `failed` marker row;
/// - transient error or unreadable file → nothing written; the next
///   scan pass retries naturally.
async fn process_one(
    library_id: i32,
    library_root: &str,
    cand: FaceCandidate,
    face_client: &FaceClient,
    face_dao: Arc<Mutex<Box<dyn FaceDao>>>,
    tag_dao: Arc<Mutex<Box<dyn TagDao>>>,
) {
    let abs = Path::new(library_root).join(&cand.rel_path);
    // Read the bytes off disk in a blocking-friendly task. Filesystem IO
    // is sync but cheap; a small spawn_blocking would be overkill.
    let bytes = match read_image_bytes_for_detect(&abs) {
        Ok(b) => b,
        Err(e) => {
            // Don't mark — file may have been moved/renamed mid-scan; let
            // the next pass try again. Future-bug check: a permanently
            // unreadable file would loop forever; we accept that for v1
            // because process_new_files already prunes vanished rows on
            // full scans.
            warn!(
                "face_watch: read failed for {} ({}): {}",
                cand.rel_path, library_id, e
            );
            return;
        }
    };

    // Request metadata for Apollo; orientation/model selection are left
    // to the server side (both None here).
    let meta = DetectMeta {
        content_hash: cand.content_hash.clone(),
        library_id,
        rel_path: cand.rel_path.clone(),
        orientation: None,
        model_version: None,
    };
    let ctx = opentelemetry::Context::current();

    match face_client.detect(bytes, meta).await {
        Ok(resp) => {
            // Stage 1: persist detections, holding the dao lock only
            // across synchronous DB writes.
            let mut stored_for_autobind: Vec<(i32, Vec<f32>)> = Vec::new();
            {
                let mut dao = face_dao.lock().expect("face dao");
                if resp.faces.is_empty() {
                    // Empty result is a definitive answer: write the
                    // no_faces marker so this file isn't rescanned.
                    if let Err(e) = dao.mark_status(
                        &ctx,
                        library_id,
                        &cand.content_hash,
                        &cand.rel_path,
                        "no_faces",
                        &resp.model_version,
                    ) {
                        warn!(
                            "face_watch: mark no_faces failed for {}: {:?}",
                            cand.rel_path, e
                        );
                    }
                    debug!(
                        "face_watch: {} → no faces (model {})",
                        cand.rel_path, resp.model_version
                    );
                } else {
                    let face_count = resp.faces.len();
                    for face in &resp.faces {
                        let emb = match face.decode_embedding() {
                            Ok(b) => b,
                            Err(e) => {
                                // A bad embedding skips this face only;
                                // the rest of the batch still persists.
                                warn!("face_watch: bad embedding for {}: {:?}", cand.rel_path, e);
                                continue;
                            }
                        };
                        // Decode the f32 vector once for auto-bind comparison.
                        let emb_floats = faces::decode_embedding_bytes(&emb);
                        match dao.store_detection(
                            &ctx,
                            InsertFaceDetectionInput {
                                library_id,
                                content_hash: cand.content_hash.clone(),
                                rel_path: cand.rel_path.clone(),
                                bbox: Some((face.bbox.x, face.bbox.y, face.bbox.w, face.bbox.h)),
                                embedding: Some(emb),
                                confidence: Some(face.confidence),
                                source: "auto".to_string(),
                                person_id: None,
                                status: "detected".to_string(),
                                model_version: resp.model_version.clone(),
                            },
                        ) {
                            Ok(row) => {
                                // Only rows with a decodable embedding can
                                // participate in auto-bind.
                                if let Some(floats) = emb_floats {
                                    stored_for_autobind.push((row.id, floats));
                                }
                            }
                            Err(e) => warn!(
                                "face_watch: store_detection failed for {}: {:?}",
                                cand.rel_path, e
                            ),
                        }
                    }
                    info!(
                        "face_watch: {} → {} face(s) ({}ms, {})",
                        cand.rel_path, face_count, resp.duration_ms, resp.model_version
                    );
                }
            }

            // Stage 2: auto-bind newly-stored faces against same-named
            // people-tags. Done outside the dao lock so the lookups don't
            // serialize with concurrent detect tasks.
            if !stored_for_autobind.is_empty() {
                try_auto_bind(
                    &ctx,
                    &cand.rel_path,
                    &resp.model_version,
                    stored_for_autobind,
                    &tag_dao,
                    &face_dao,
                );
            }
        }
        Err(FaceDetectError::Permanent(e)) => {
            warn!(
                "face_watch: permanent failure on {}: {} — marking failed",
                cand.rel_path, e
            );
            let mut dao = face_dao.lock().expect("face dao");
            // model_version is best-effort here — the engine that rejected
            // the bytes may not have echoed one. Empty string is fine; this
            // row is purely a "don't retry" sentinel.
            if let Err(e) = dao.mark_status(
                &ctx,
                library_id,
                &cand.content_hash,
                &cand.rel_path,
                "failed",
                "",
            ) {
                warn!(
                    "face_watch: mark failed errored for {}: {:?}",
                    cand.rel_path, e
                );
            }
        }
        Err(FaceDetectError::Transient(e)) => {
            // Don't mark anything; next scan tick retries naturally.
            // Demoted to debug because OOM and engine-not-ready are noisy
            // and self-resolving.
            debug!(
                "face_watch: transient on {}: {} (will retry next pass)",
                cand.rel_path, e
            );
        }
        Err(FaceDetectError::Disabled) => {
            // Caller already checked is_enabled(); this branch is defensive.
        }
    }
}
|
||||
|
||||
/// Auto-bind newly-detected faces to a same-named person, when a tag on the
/// photo unambiguously identifies one. Driven by `FACE_AUTOBIND_MIN_COS`
/// (default 0.4): the new face's embedding must reach this cosine
/// similarity against the L2-normalized mean of the person's existing
/// faces. The first face for a person binds unconditionally — there's
/// nothing to compare against, and the alternative ("never bind without
/// a reference") would mean bootstrap never kicks off.
///
/// Multi-match (the photo carries tags for two different known persons)
/// is intentionally a no-op — we can't tell which face is which without
/// additional matching. Those faces stay unassigned for the cluster
/// suggester (Phase 6) to handle.
///
/// Lock discipline: tag_dao and face_dao are each locked for one short
/// scope per lookup, then face_dao is re-locked once for the whole bind
/// batch — never both at the same time.
fn try_auto_bind(
    ctx: &opentelemetry::Context,
    rel_path: &str,
    model_version: &str,
    new_faces: Vec<(i32, Vec<f32>)>, // (face_id, decoded embedding)
    tag_dao: &Arc<Mutex<Box<dyn TagDao>>>,
    face_dao: &Arc<Mutex<Box<dyn FaceDao>>>,
) {
    // 1. Pull the photo's tags. Any DAO error aborts the whole attempt —
    // auto-bind is best-effort, never worth failing the detect pass over.
    let tag_names: Vec<String> = {
        let mut td = tag_dao.lock().expect("tag dao");
        match td.get_tags_for_path(ctx, rel_path) {
            Ok(tags) => tags.into_iter().map(|t| t.name).collect(),
            Err(e) => {
                warn!(
                    "face_watch: get_tags_for_path failed for {}: {:?}",
                    rel_path, e
                );
                return;
            }
        }
    };
    if tag_names.is_empty() {
        return;
    }

    // 2. Find tags that map to existing persons (case-insensitive).
    let person_for_tag: std::collections::HashMap<String, i32> = {
        let mut fd = face_dao.lock().expect("face dao");
        match fd.find_persons_by_names_ci(ctx, &tag_names) {
            Ok(m) => m,
            Err(e) => {
                warn!(
                    "face_watch: find_persons_by_names_ci failed for {}: {:?}",
                    rel_path, e
                );
                return;
            }
        }
    };

    // 3. Multi-match: ambiguous, skip. Single match: candidate person.
    let unique_person_ids: std::collections::HashSet<i32> =
        person_for_tag.values().copied().collect();
    if unique_person_ids.len() != 1 {
        if !unique_person_ids.is_empty() {
            debug!(
                "face_watch: {} carries tags for {} different persons; skipping auto-bind",
                rel_path,
                unique_person_ids.len()
            );
        }
        return;
    }
    let person_id = *unique_person_ids.iter().next().expect("nonempty set");

    // Similarity gate; out-of-range or unparseable values fall back to
    // the 0.4 default rather than disabling the check.
    let threshold: f32 = std::env::var("FACE_AUTOBIND_MIN_COS")
        .ok()
        .and_then(|s| s.parse().ok())
        .filter(|t: &f32| *t >= 0.0 && *t <= 1.0)
        .unwrap_or(0.4);

    // 4. Reference embedding (if any) under the same model_version —
    // embeddings from different models aren't comparable.
    let reference: Option<Vec<f32>> = {
        let mut fd = face_dao.lock().expect("face dao");
        match fd.person_reference_embedding(ctx, person_id, model_version) {
            Ok(r) => r,
            Err(e) => {
                warn!(
                    "face_watch: person_reference_embedding failed for person {}: {:?}",
                    person_id, e
                );
                return;
            }
        }
    };

    // 5. Bind each new face that meets the criterion. Hold the lock once
    // for the whole batch; assign_face_to_person uses its own short
    // transaction internally.
    let mut fd = face_dao.lock().expect("face dao");
    for (face_id, emb) in new_faces {
        let bind = match &reference {
            None => {
                // Person has no faces yet — first one wins so bootstrap
                // can ever produce a usable reference. After this row
                // commits, future faces evaluate against it.
                debug!(
                    "face_watch: auto-binding first face {} → person {} (no reference yet)",
                    face_id, person_id
                );
                true
            }
            Some(ref_vec) => {
                let sim = faces::cosine_similarity(&emb, ref_vec);
                if sim >= threshold {
                    debug!(
                        "face_watch: auto-binding face {} → person {} (cos={:.3} ≥ {:.3})",
                        face_id, person_id, sim, threshold
                    );
                    true
                } else {
                    debug!(
                        "face_watch: leaving face {} unassigned (cos={:.3} < {:.3} for person {})",
                        face_id, sim, threshold, person_id
                    );
                    false
                }
            }
        };
        if bind && let Err(e) = fd.assign_face_to_person(ctx, face_id, person_id) {
            warn!(
                "face_watch: assign_face_to_person failed (face={}, person={}): {:?}",
                face_id, person_id, e
            );
        }
    }
}
|
||||
|
||||
/// Drop candidates whose path matches the watcher's `EXCLUDED_DIRS` rules.
|
||||
/// Pulled out for unit testing — the same `PathExcluder` /memories uses,
|
||||
/// just applied at the face-detect candidate set instead of the memories
|
||||
/// listing. Skip @eaDir / .thumbnails / user-defined paths before we burn
|
||||
/// a detect call (and Apollo's GPU memory) on junk. Also drops anything
|
||||
/// that isn't an image file — the backlog drain pulls every hashed row in
|
||||
/// `image_exif`, which includes videos; sending those to Apollo just
|
||||
/// produces `failed` markers and inflates the FAILED stat.
|
||||
pub(crate) fn filter_excluded(
|
||||
base: &Path,
|
||||
excluded_dirs: &[String],
|
||||
candidates: Vec<FaceCandidate>,
|
||||
library_name: Option<&str>,
|
||||
) -> Vec<FaceCandidate> {
|
||||
let excluder = if excluded_dirs.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(PathExcluder::new(base, excluded_dirs))
|
||||
};
|
||||
candidates
|
||||
.into_iter()
|
||||
.filter(|c| {
|
||||
let abs = base.join(&c.rel_path);
|
||||
if !file_types::is_image_file(&abs) {
|
||||
debug!(
|
||||
"face_watch: skipping non-image path {} (library {})",
|
||||
c.rel_path,
|
||||
library_name.unwrap_or("<unknown>")
|
||||
);
|
||||
return false;
|
||||
}
|
||||
if let Some(ex) = excluder.as_ref()
|
||||
&& ex.is_excluded(&abs)
|
||||
{
|
||||
debug!(
|
||||
"face_watch: skipping excluded path {} (library {})",
|
||||
c.rel_path,
|
||||
library_name.unwrap_or("<unknown>")
|
||||
);
|
||||
return false;
|
||||
}
|
||||
true
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Read image bytes for face detection. Insightface (via opencv) can't
|
||||
/// decode RAW or HEIC — for those we extract the embedded JPEG preview
|
||||
/// the way the thumbnail pipeline does. Plain JPEG/PNG/WebP/etc. go
|
||||
/// through a direct read.
|
||||
pub(crate) fn read_image_bytes_for_detect(path: &Path) -> std::io::Result<Vec<u8>> {
|
||||
if file_types::needs_ffmpeg_thumbnail(path)
|
||||
&& let Some(preview) = exif::extract_embedded_jpeg_preview(path)
|
||||
{
|
||||
return Ok(preview);
|
||||
}
|
||||
// Plain read for everything else. RAW/HEIC files without an embedded
|
||||
// preview fall through here too; Apollo will then 422 and the caller
|
||||
// marks the row failed. That's fine; we tried.
|
||||
std::fs::read(path)
|
||||
}
|
||||
|
||||
// Unit tests for the pure parts of the face pass: candidate filtering
// and byte loading. Nothing here touches Apollo or the DAOs.
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;

    // Build a candidate with a deterministic per-path content hash.
    fn cand(rel_path: &str) -> FaceCandidate {
        FaceCandidate {
            rel_path: rel_path.to_string(),
            content_hash: format!("hash-{rel_path}"),
        }
    }

    #[test]
    fn filter_excluded_pattern_drops_dir_components() {
        // A pattern matches a path *component* under base, not a substring.
        // Phase 3 needs this for @eaDir / .thumbnails skipping.
        let tmp = tempfile::tempdir().unwrap();
        let base = tmp.path();
        let candidates = vec![
            cand("photos/a.jpg"),                  // keep
            cand("photos/@eaDir/SYNOPHOTO_THUMB"), // drop (component match)
            cand("photos/eaDir-not-a-thing.jpg"),  // keep (substring, not component)
        ];
        let kept = filter_excluded(base, &["@eaDir".to_string()], candidates, Some("test"));
        let kept_paths: Vec<_> = kept.iter().map(|c| c.rel_path.as_str()).collect();
        assert_eq!(
            kept_paths,
            vec!["photos/a.jpg", "photos/eaDir-not-a-thing.jpg"]
        );
    }

    #[test]
    fn filter_excluded_absolute_dir_drops_subtree() {
        // Absolute (under-base) entries drop the whole subtree.
        let tmp = tempfile::tempdir().unwrap();
        let base = tmp.path();
        let candidates = vec![
            cand("public/a.jpg"),
            cand("private/a.jpg"),
            cand("private/sub/b.jpg"),
        ];
        let kept = filter_excluded(base, &["/private".to_string()], candidates, None);
        let kept_paths: Vec<_> = kept.iter().map(|c| c.rel_path.as_str()).collect();
        assert_eq!(kept_paths, vec!["public/a.jpg"]);
    }

    #[test]
    fn filter_excluded_empty_rules_passes_all() {
        // EXCLUDED_DIRS unset still lets every image through — only the
        // PathExcluder is skipped, the image-extension gate still runs.
        let tmp = tempfile::tempdir().unwrap();
        let base = tmp.path();
        let candidates = vec![cand("a.jpg"), cand("b.jpg")];
        let kept = filter_excluded(base, &[], candidates, None);
        assert_eq!(kept.len(), 2);
    }

    #[test]
    fn filter_excluded_drops_videos_and_non_media() {
        // Backlog drain pulls every hashed row in image_exif (videos
        // included). Videos must never reach Apollo — opencv can't
        // decode them, every call would 422 and write a `failed` marker.
        let tmp = tempfile::tempdir().unwrap();
        let base = tmp.path();
        let candidates = vec![
            cand("photos/a.jpg"),
            cand("photos/clip.mp4"),
            cand("photos/clip.MOV"),
            cand("photos/notes.txt"),
            cand("photos/b.heic"),
        ];
        let kept = filter_excluded(base, &[], candidates, Some("test"));
        let kept_paths: Vec<_> = kept.iter().map(|c| c.rel_path.as_str()).collect();
        assert_eq!(kept_paths, vec!["photos/a.jpg", "photos/b.heic"]);
    }

    #[test]
    fn read_bytes_passes_through_for_jpeg() {
        // JPEG goes through plain read — we DON'T want to lose orientation
        // metadata or re-encode here; insightface's exif_transpose handles
        // orientation on its end.
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("test.jpg");
        let mut buf = Vec::new();
        // Tiny 4x4 grey JPEG — encoded by image crate so we know it round-trips.
        let img = image::DynamicImage::ImageRgb8(image::RgbImage::from_pixel(
            4,
            4,
            image::Rgb([128, 128, 128]),
        ));
        img.write_to(
            &mut std::io::Cursor::new(&mut buf),
            image::ImageFormat::Jpeg,
        )
        .unwrap();
        fs::write(&path, &buf).unwrap();

        let read = read_image_bytes_for_detect(&path).expect("read jpeg");
        assert_eq!(read, buf, "JPEG bytes must pass through verbatim");
    }

    #[test]
    fn read_bytes_falls_back_when_raw_has_no_preview() {
        // A `.nef` file with non-RAW bytes won't have an embedded preview —
        // the helper falls through to plain read rather than refusing. This
        // matches the docstring contract; Apollo will then 422 and we'll
        // mark the row as failed.
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("not_really.nef");
        fs::write(&path, b"definitely-not-a-raw-file").unwrap();

        let read = read_image_bytes_for_detect(&path).expect("fallback read");
        assert_eq!(read, b"definitely-not-a-raw-file");
    }
}
|
||||
3403
src/faces.rs
Normal file
3403
src/faces.rs
Normal file
File diff suppressed because it is too large
Load Diff
@@ -1659,6 +1659,14 @@ mod tests {
|
||||
Ok(vec![])
|
||||
}
|
||||
|
||||
// Test stub: this mock never resolves hash siblings — the batch lookup
// reports "no rel_paths for any hash" regardless of input.
fn get_rel_paths_for_hashes(
    &mut self,
    _context: &opentelemetry::Context,
    _hashes: &[String],
) -> Result<std::collections::HashMap<String, Vec<String>>, DbError> {
    Ok(std::collections::HashMap::new())
}
|
||||
|
||||
fn list_rel_paths_for_libraries(
|
||||
&mut self,
|
||||
_context: &opentelemetry::Context,
|
||||
|
||||
@@ -12,6 +12,8 @@ pub mod data;
|
||||
pub mod database;
|
||||
pub mod error;
|
||||
pub mod exif;
|
||||
pub mod face_watch;
|
||||
pub mod faces;
|
||||
pub mod file_types;
|
||||
pub mod files;
|
||||
pub mod geo;
|
||||
|
||||
408
src/main.rs
408
src/main.rs
@@ -66,6 +66,8 @@ mod data;
|
||||
mod database;
|
||||
mod error;
|
||||
mod exif;
|
||||
mod face_watch;
|
||||
mod faces;
|
||||
mod file_types;
|
||||
mod files;
|
||||
mod geo;
|
||||
@@ -1459,6 +1461,8 @@ fn main() -> std::io::Result<()> {
|
||||
app_state.libraries.clone(),
|
||||
playlist_mgr_for_watcher,
|
||||
preview_gen_for_watcher,
|
||||
app_state.face_client.clone(),
|
||||
app_state.excluded_dirs.clone(),
|
||||
);
|
||||
|
||||
// Start orphaned playlist cleanup job
|
||||
@@ -1518,6 +1522,7 @@ fn main() -> std::io::Result<()> {
|
||||
let exif_dao = SqliteExifDao::new();
|
||||
let insight_dao = SqliteInsightDao::new();
|
||||
let preview_dao = SqlitePreviewDao::new();
|
||||
let face_dao = faces::SqliteFaceDao::new();
|
||||
let cors = Cors::default()
|
||||
.allowed_origin_fn(|origin, _req_head| {
|
||||
// Allow all origins in development, or check against CORS_ALLOWED_ORIGINS env var
|
||||
@@ -1595,6 +1600,7 @@ fn main() -> std::io::Result<()> {
|
||||
.service(libraries::list_libraries)
|
||||
.add_feature(add_tag_services::<_, SqliteTagDao>)
|
||||
.add_feature(knowledge::add_knowledge_services::<_, SqliteKnowledgeDao>)
|
||||
.add_feature(faces::add_face_services::<_, faces::SqliteFaceDao>)
|
||||
.app_data(app_data.clone())
|
||||
.app_data::<Data<RealFileSystem>>(Data::new(RealFileSystem::new(
|
||||
app_data.base_path.clone(),
|
||||
@@ -1616,6 +1622,10 @@ fn main() -> std::io::Result<()> {
|
||||
.app_data::<Data<Mutex<SqliteKnowledgeDao>>>(Data::new(Mutex::new(
|
||||
SqliteKnowledgeDao::new(),
|
||||
)))
|
||||
.app_data::<Data<Mutex<faces::SqliteFaceDao>>>(Data::new(Mutex::new(face_dao)))
|
||||
.app_data::<Data<crate::ai::face_client::FaceClient>>(Data::new(
|
||||
app_data.face_client.clone(),
|
||||
))
|
||||
.app_data(mp::form::MultipartFormConfig::default().total_limit(1024 * 1024 * 1024)) // 1GB upload limit
|
||||
.app_data(web::JsonConfig::default().error_handler(|err, req| {
|
||||
let detail = err.to_string();
|
||||
@@ -1780,6 +1790,8 @@ fn watch_files(
|
||||
libs: Vec<libraries::Library>,
|
||||
playlist_manager: Addr<VideoPlaylistManager>,
|
||||
preview_generator: Addr<video::actors::PreviewClipGenerator>,
|
||||
face_client: crate::ai::face_client::FaceClient,
|
||||
excluded_dirs: Vec<String>,
|
||||
) {
|
||||
std::thread::spawn(move || {
|
||||
// Get polling intervals from environment variables
|
||||
@@ -1798,6 +1810,18 @@ fn watch_files(
|
||||
info!("Starting optimized file watcher");
|
||||
info!(" Quick scan interval: {} seconds", quick_interval_secs);
|
||||
info!(" Full scan interval: {} seconds", full_interval_secs);
|
||||
// Surface face-detection state at boot so it's obvious whether
|
||||
// the watcher will hit Apollo. The branch silently no-ops when
|
||||
// disabled (intentional for legacy deploys), which makes "why
|
||||
// aren't faces being detected?" hard to diagnose otherwise.
|
||||
if face_client.is_enabled() {
|
||||
info!(" Face detection: ENABLED");
|
||||
} else {
|
||||
info!(
|
||||
" Face detection: DISABLED (set APOLLO_FACE_API_BASE_URL \
|
||||
or APOLLO_API_BASE_URL to enable)"
|
||||
);
|
||||
}
|
||||
for lib in &libs {
|
||||
info!(
|
||||
" Watching library '{}' (id={}) at {}",
|
||||
@@ -1812,6 +1836,15 @@ fn watch_files(
|
||||
let preview_dao = Arc::new(Mutex::new(
|
||||
Box::new(SqlitePreviewDao::new()) as Box<dyn PreviewDao>
|
||||
));
|
||||
let face_dao = Arc::new(Mutex::new(
|
||||
Box::new(faces::SqliteFaceDao::new()) as Box<dyn faces::FaceDao>
|
||||
));
|
||||
// tag_dao for the watcher's auto-bind path. Independent of the
|
||||
// request-handler tag_dao instance — both end up pointing at the
|
||||
// same SQLite file via SqliteTagDao::default().
|
||||
let watcher_tag_dao = Arc::new(Mutex::new(
|
||||
Box::new(SqliteTagDao::default()) as Box<dyn tags::TagDao>
|
||||
));
|
||||
|
||||
let mut last_quick_scan = SystemTime::now();
|
||||
let mut last_full_scan = SystemTime::now();
|
||||
@@ -1828,6 +1861,26 @@ fn watch_files(
|
||||
let is_full_scan = since_last_full.as_secs() >= full_interval_secs;
|
||||
|
||||
for lib in &libs {
|
||||
// Drain the unhashed-hash backlog AND the face-detection
|
||||
// backlog every tick, regardless of quick/full. Quick
|
||||
// scans only walk recently-modified files, so the
|
||||
// pre-Phase-3 backlog never enters their candidate set
|
||||
// — without these standalone passes, backfill +
|
||||
// detection only progressed during full scans
|
||||
// (default once an hour).
|
||||
if face_client.is_enabled() {
|
||||
let context = opentelemetry::Context::new();
|
||||
backfill_unhashed_backlog(&context, lib, &exif_dao);
|
||||
process_face_backlog(
|
||||
&context,
|
||||
lib,
|
||||
&face_client,
|
||||
&face_dao,
|
||||
&watcher_tag_dao,
|
||||
&excluded_dirs,
|
||||
);
|
||||
}
|
||||
|
||||
if is_full_scan {
|
||||
info!(
|
||||
"Running full scan for library '{}' (scan #{})",
|
||||
@@ -1837,6 +1890,10 @@ fn watch_files(
|
||||
lib,
|
||||
Arc::clone(&exif_dao),
|
||||
Arc::clone(&preview_dao),
|
||||
Arc::clone(&face_dao),
|
||||
Arc::clone(&watcher_tag_dao),
|
||||
face_client.clone(),
|
||||
&excluded_dirs,
|
||||
None,
|
||||
playlist_manager.clone(),
|
||||
preview_generator.clone(),
|
||||
@@ -1854,6 +1911,10 @@ fn watch_files(
|
||||
lib,
|
||||
Arc::clone(&exif_dao),
|
||||
Arc::clone(&preview_dao),
|
||||
Arc::clone(&face_dao),
|
||||
Arc::clone(&watcher_tag_dao),
|
||||
face_client.clone(),
|
||||
&excluded_dirs,
|
||||
Some(check_since),
|
||||
playlist_manager.clone(),
|
||||
preview_generator.clone(),
|
||||
@@ -1900,6 +1961,10 @@ fn process_new_files(
|
||||
library: &libraries::Library,
|
||||
exif_dao: Arc<Mutex<Box<dyn ExifDao>>>,
|
||||
preview_dao: Arc<Mutex<Box<dyn PreviewDao>>>,
|
||||
face_dao: Arc<Mutex<Box<dyn faces::FaceDao>>>,
|
||||
tag_dao: Arc<Mutex<Box<dyn tags::TagDao>>>,
|
||||
face_client: crate::ai::face_client::FaceClient,
|
||||
excluded_dirs: &[String],
|
||||
modified_since: Option<SystemTime>,
|
||||
playlist_manager: Addr<VideoPlaylistManager>,
|
||||
preview_generator: Addr<video::actors::PreviewClipGenerator>,
|
||||
@@ -2075,6 +2140,43 @@ fn process_new_files(
|
||||
}
|
||||
}
|
||||
|
||||
// ── Face detection pass ────────────────────────────────────────────
|
||||
// Run after EXIF writes so newly-registered files have their
|
||||
// content_hash populated. Skipped wholesale when face_client is
|
||||
// disabled (no Apollo integration configured) — Phase 3 wires this
|
||||
// up; the watcher remains usable on legacy deploys.
|
||||
if face_client.is_enabled() {
|
||||
// Opportunistic content_hash backfill: photos indexed before
|
||||
// content-hashing landed (or where the hash compute failed
|
||||
// silently on insert) end up in image_exif with NULL
|
||||
// content_hash. build_face_candidates keys on content_hash, so
|
||||
// those files would never become candidates without backfill.
|
||||
// Idempotent — subsequent scans see the populated hashes and
|
||||
// no-op. The dedicated `backfill_hashes` binary is still the
|
||||
// right tool for very large legacy libraries; this branch
|
||||
// ensures small/medium deploys self-heal without operator
|
||||
// action.
|
||||
backfill_missing_content_hashes(&context, &files, library, &exif_dao);
|
||||
let candidates = build_face_candidates(&context, &files, &exif_dao, &face_dao);
|
||||
debug!(
|
||||
"face_watch: scan tick — {} image file(s) walked, {} candidate(s) (library '{}', modified_since={})",
|
||||
files.iter().filter(|(p, _)| !is_video_file(p)).count(),
|
||||
candidates.len(),
|
||||
library.name,
|
||||
modified_since.is_some(),
|
||||
);
|
||||
if !candidates.is_empty() {
|
||||
face_watch::run_face_detection_pass(
|
||||
library,
|
||||
excluded_dirs,
|
||||
&face_client,
|
||||
Arc::clone(&face_dao),
|
||||
Arc::clone(&tag_dao),
|
||||
candidates,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Check for videos that need HLS playlists
|
||||
let video_path_base = dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set");
|
||||
let mut videos_needing_playlists = Vec::new();
|
||||
@@ -2199,6 +2301,312 @@ fn process_new_files(
|
||||
}
|
||||
}
|
||||
|
||||
/// Compute and persist content_hash for image_exif rows where it's NULL.
|
||||
///
|
||||
/// Bounded per call by `FACE_HASH_BACKFILL_MAX_PER_TICK` (default 2000) so
|
||||
/// a watcher tick on a large legacy library doesn't block for hours
|
||||
/// blake3-ing every photo at once. Subsequent scans pick up the rest.
|
||||
/// For 50k+ libraries the dedicated `cargo run --bin backfill_hashes`
|
||||
/// is still faster (it doesn't fight a watcher loop for the DAO mutex).
|
||||
/// Drain unhashed image_exif rows by querying them directly, independent
|
||||
/// of the filesystem walk. Quick scans only walk recently-modified
|
||||
/// files, so a backlog of pre-existing unhashed rows never enters
|
||||
/// `process_new_files`'s candidate set — left alone, it would only
|
||||
/// drain on full scans (default once an hour). Calling this every tick
|
||||
/// keeps the face-detection backlog moving regardless.
|
||||
///
|
||||
/// Returns the number of rows successfully backfilled this pass.
|
||||
fn backfill_unhashed_backlog(
|
||||
context: &opentelemetry::Context,
|
||||
library: &libraries::Library,
|
||||
exif_dao: &Arc<Mutex<Box<dyn ExifDao>>>,
|
||||
) -> usize {
|
||||
let cap: i64 = dotenv::var("FACE_HASH_BACKFILL_MAX_PER_TICK")
|
||||
.ok()
|
||||
.and_then(|s| s.parse().ok())
|
||||
.filter(|n: &i64| *n > 0)
|
||||
.unwrap_or(2000);
|
||||
|
||||
// Fetch up to cap+1 rows so we can tell "more remain" without a
|
||||
// separate count query. Across libraries — there's no per-library
|
||||
// filter on get_rows_missing_hash today — but we only ever update
|
||||
// rows whose library_id matches the caller's library, so other
|
||||
// libraries' rows just get skipped here and picked up on the next
|
||||
// library's tick. Negligible cost given the cap.
|
||||
let rows: Vec<(i32, String)> = {
|
||||
let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
|
||||
dao.get_rows_missing_hash(context, cap + 1).unwrap_or_default()
|
||||
};
|
||||
if rows.is_empty() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
let more_than_cap = rows.len() as i64 > cap;
|
||||
let base_path = std::path::Path::new(&library.root_path);
|
||||
|
||||
let mut backfilled = 0usize;
|
||||
let mut errors = 0usize;
|
||||
let mut skipped_other_lib = 0usize;
|
||||
for (lib_id, rel_path) in rows.iter().take(cap as usize) {
|
||||
if *lib_id != library.id {
|
||||
skipped_other_lib += 1;
|
||||
continue;
|
||||
}
|
||||
let abs = base_path.join(rel_path);
|
||||
if !abs.exists() {
|
||||
// File walked away — the watcher's reconciliation pass will
|
||||
// remove the orphan exif row eventually.
|
||||
continue;
|
||||
}
|
||||
match content_hash::compute(&abs) {
|
||||
Ok(id) => {
|
||||
let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
|
||||
if let Err(e) =
|
||||
dao.backfill_content_hash(context, library.id, rel_path, &id.content_hash, id.size_bytes)
|
||||
{
|
||||
warn!(
|
||||
"face_watch: backfill_content_hash failed for {}: {:?}",
|
||||
rel_path, e
|
||||
);
|
||||
errors += 1;
|
||||
} else {
|
||||
backfilled += 1;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("face_watch: hash compute failed for {} ({:?})", abs.display(), e);
|
||||
errors += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if backfilled > 0 || errors > 0 || more_than_cap {
|
||||
info!(
|
||||
"face_watch: backfill pass for library '{}': hashed {} ({} error(s), {} skipped to other libraries; {} cap, more_remain={})",
|
||||
library.name, backfilled, errors, skipped_other_lib, cap, more_than_cap
|
||||
);
|
||||
}
|
||||
backfilled
|
||||
}
|
||||
|
||||
/// Per-tick face-detection drain. Pulls a capped batch of hashed-but-
|
||||
/// unscanned image_exif rows directly via the FaceDao anti-join and
|
||||
/// hands them to the existing detection pass. Runs on every tick (not
|
||||
/// just full scans) so the backlog moves at quick-scan cadence.
|
||||
fn process_face_backlog(
|
||||
context: &opentelemetry::Context,
|
||||
library: &libraries::Library,
|
||||
face_client: &crate::ai::face_client::FaceClient,
|
||||
face_dao: &Arc<Mutex<Box<dyn faces::FaceDao>>>,
|
||||
tag_dao: &Arc<Mutex<Box<dyn tags::TagDao>>>,
|
||||
excluded_dirs: &[String],
|
||||
) {
|
||||
let cap: i64 = dotenv::var("FACE_BACKLOG_MAX_PER_TICK")
|
||||
.ok()
|
||||
.and_then(|s| s.parse().ok())
|
||||
.filter(|n: &i64| *n > 0)
|
||||
.unwrap_or(64);
|
||||
|
||||
let rows: Vec<(String, String)> = {
|
||||
let mut dao = face_dao.lock().expect("face dao");
|
||||
match dao.list_unscanned_candidates(context, library.id, cap) {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"face_watch: list_unscanned_candidates failed for library '{}': {:?}",
|
||||
library.name, e
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
};
|
||||
if rows.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
info!(
|
||||
"face_watch: backlog drain — running detection on {} candidate(s) for library '{}' (cap={})",
|
||||
rows.len(),
|
||||
library.name,
|
||||
cap
|
||||
);
|
||||
|
||||
let candidates: Vec<face_watch::FaceCandidate> = rows
|
||||
.into_iter()
|
||||
.map(|(rel_path, content_hash)| face_watch::FaceCandidate {
|
||||
rel_path,
|
||||
content_hash,
|
||||
})
|
||||
.collect();
|
||||
|
||||
face_watch::run_face_detection_pass(
|
||||
library,
|
||||
excluded_dirs,
|
||||
face_client,
|
||||
Arc::clone(face_dao),
|
||||
Arc::clone(tag_dao),
|
||||
candidates,
|
||||
);
|
||||
}
|
||||
|
||||
/// Opportunistic content_hash backfill over the files walked this tick.
///
/// Photos indexed before content-hashing landed (or where the hash
/// compute failed silently on insert) sit in image_exif with a NULL
/// content_hash; `build_face_candidates` keys on content_hash, so those
/// rows would never become candidates without this pass. Idempotent —
/// once a row is hashed it's skipped on every later tick.
///
/// Bounded per call by `FACE_HASH_BACKFILL_MAX_PER_TICK` (default 2000);
/// the cap counts successes only (see the comment inside the loop).
fn backfill_missing_content_hashes(
    context: &opentelemetry::Context,
    files: &[(PathBuf, String)],
    library: &libraries::Library,
    exif_dao: &Arc<Mutex<Box<dyn ExifDao>>>,
) {
    // Videos aren't face-scanned, so only image files matter here.
    let image_paths: Vec<String> = files
        .iter()
        .filter(|(p, _)| !is_video_file(p))
        .map(|(_, rel)| rel.clone())
        .collect();
    if image_paths.is_empty() {
        return;
    }

    // One batched DB read for every walked image; errors degrade to an
    // empty set rather than aborting the scan tick.
    let exif_records = {
        let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
        dao.get_exif_batch(context, &image_paths)
            .unwrap_or_default()
    };
    // Cheap lookup back from rel_path → absolute file_path so
    // content_hash::compute can read the bytes.
    let path_by_rel: HashMap<String, &PathBuf> =
        files.iter().map(|(p, rel)| (rel.clone(), p)).collect();

    let cap: usize = dotenv::var("FACE_HASH_BACKFILL_MAX_PER_TICK")
        .ok()
        .and_then(|s| s.parse().ok())
        .filter(|n: &usize| *n > 0)
        .unwrap_or(2000);

    // Count the unhashed backlog up front so we can surface "still needs
    // backfill: N" in the log — without it, a face-scan that's stuck at
    // 44% looks stalled when really it's chipping through hashes.
    let unhashed_total = exif_records
        .iter()
        .filter(|r| r.content_hash.is_none())
        .count();

    let mut backfilled = 0usize;
    let mut errors = 0usize;
    for record in &exif_records {
        // Cap on successes only — earlier this counted errors too, so a
        // pocket of chronically-unhashable files at the front of the
        // table (vanished mid-scan, permission denied, etc.) burned the
        // budget every tick and the rest of the backlog never advanced.
        // Errors are still bounded by `unhashed_total` (the loop walks
        // each unhashed record at most once per tick).
        if backfilled >= cap {
            break;
        }
        if record.content_hash.is_some() {
            continue;
        }
        let Some(file_path) = path_by_rel.get(&record.file_path) else {
            // Walked file went missing between the directory scan and now;
            // next tick will retry naturally.
            continue;
        };
        match content_hash::compute(file_path) {
            Ok(id) => {
                // Lock scoped per row so hashing never holds the mutex.
                let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
                if let Err(e) = dao.backfill_content_hash(
                    context,
                    library.id,
                    &record.file_path,
                    &id.content_hash,
                    id.size_bytes,
                ) {
                    warn!(
                        "face_watch: backfill_content_hash failed for {}: {:?}",
                        record.file_path, e
                    );
                    errors += 1;
                } else {
                    backfilled += 1;
                }
            }
            Err(e) => {
                debug!(
                    "face_watch: hash compute failed for {} ({:?})",
                    file_path.display(),
                    e
                );
                errors += 1;
            }
        }
    }
    // Always log when there's an unhashed backlog so an operator
    // looking at "scan stuck at 44%" can see backfill is running and
    // how much remains. Quiet only when there's nothing to do.
    if unhashed_total > 0 || backfilled > 0 || errors > 0 {
        let remaining = unhashed_total.saturating_sub(backfilled);
        info!(
            "face_watch: backfilled {}/{} content_hash for library '{}' ({} error(s); {} still need backfill; cap={})",
            backfilled, unhashed_total, library.name, errors, remaining, cap
        );
    }
}
|
||||
|
||||
/// Build the face-detection candidate list for a scan tick.
|
||||
///
|
||||
/// We need `(rel_path, content_hash)` for every image file that has a
|
||||
/// content_hash recorded in image_exif but no row in face_detections yet.
|
||||
/// Re-querying image_exif here picks up rows the EXIF write loop just
|
||||
/// inserted alongside any pre-existing rows the watcher walked over —
|
||||
/// covers both new uploads and the initial backlog scan.
|
||||
fn build_face_candidates(
|
||||
context: &opentelemetry::Context,
|
||||
files: &[(PathBuf, String)],
|
||||
exif_dao: &Arc<Mutex<Box<dyn ExifDao>>>,
|
||||
face_dao: &Arc<Mutex<Box<dyn faces::FaceDao>>>,
|
||||
) -> Vec<face_watch::FaceCandidate> {
|
||||
// Restrict to image files; videos aren't face-scanned in v1 (kamadak
|
||||
// doesn't even register them in image_exif).
|
||||
let image_paths: Vec<String> = files
|
||||
.iter()
|
||||
.filter(|(p, _)| !is_video_file(p))
|
||||
.map(|(_, rel)| rel.clone())
|
||||
.collect();
|
||||
if image_paths.is_empty() {
|
||||
return Vec::new();
|
||||
}
|
||||
|
||||
let exif_records = {
|
||||
let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
|
||||
dao.get_exif_batch(context, &image_paths)
|
||||
.unwrap_or_default()
|
||||
};
|
||||
// rel_path → content_hash (only rows with a hash; without one we have
|
||||
// nothing to key face data against).
|
||||
let mut hash_by_path: HashMap<String, String> = HashMap::with_capacity(exif_records.len());
|
||||
for record in exif_records {
|
||||
if let Some(h) = record.content_hash {
|
||||
hash_by_path.insert(record.file_path, h);
|
||||
}
|
||||
}
|
||||
|
||||
let mut candidates = Vec::new();
|
||||
let mut dao = face_dao.lock().expect("face dao");
|
||||
for rel_path in image_paths {
|
||||
let Some(hash) = hash_by_path.get(&rel_path) else {
|
||||
continue;
|
||||
};
|
||||
match dao.already_scanned(context, hash) {
|
||||
Ok(true) => continue,
|
||||
Ok(false) => candidates.push(face_watch::FaceCandidate {
|
||||
rel_path,
|
||||
content_hash: hash.clone(),
|
||||
}),
|
||||
Err(e) => {
|
||||
warn!("face_watch: already_scanned errored for {}: {:?}", hash, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
candidates
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -23,7 +23,8 @@ use crate::utils::earliest_fs_time;
|
||||
|
||||
// Helper that encapsulates path-exclusion semantics
|
||||
#[derive(Debug)]
|
||||
struct PathExcluder {
|
||||
pub struct PathExcluder {
|
||||
base: PathBuf,
|
||||
excluded_dirs: Vec<PathBuf>,
|
||||
excluded_patterns: Vec<String>,
|
||||
}
|
||||
@@ -34,9 +35,12 @@ impl PathExcluder {
|
||||
/// Rules:
|
||||
/// - Entries starting with '/' are interpreted as "absolute under base"
|
||||
/// (e.g. "/photos/private" -> base/photos/private).
|
||||
/// - Entries without '/' are treated as substring patterns that match
|
||||
/// anywhere in the full path string (still scoped under base).
|
||||
fn new(base: &Path, raw_excluded: &[String]) -> Self {
|
||||
/// - Entries without '/' are treated as path-component patterns that
|
||||
/// match a directory or file name *under* `base`. The base prefix is
|
||||
/// stripped before matching so a system-level component (e.g. the
|
||||
/// `tmp` in `/tmp/...` when running tests) doesn't masquerade as a
|
||||
/// user-defined exclude.
|
||||
pub fn new(base: &Path, raw_excluded: &[String]) -> Self {
|
||||
let mut excluded_dirs = Vec::new();
|
||||
let mut excluded_patterns = Vec::new();
|
||||
|
||||
@@ -53,18 +57,19 @@ impl PathExcluder {
|
||||
}
|
||||
|
||||
debug!(
|
||||
"PathExcluder created. dirs={:?}, patterns={:?}",
|
||||
excluded_dirs, excluded_patterns
|
||||
"PathExcluder created. base={:?}, dirs={:?}, patterns={:?}",
|
||||
base, excluded_dirs, excluded_patterns
|
||||
);
|
||||
|
||||
Self {
|
||||
base: base.to_path_buf(),
|
||||
excluded_dirs,
|
||||
excluded_patterns,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true if `path` should be excluded.
|
||||
fn is_excluded(&self, path: &Path) -> bool {
|
||||
pub fn is_excluded(&self, path: &Path) -> bool {
|
||||
// Directory-based exclusions
|
||||
for excluded in &self.excluded_dirs {
|
||||
if path.starts_with(excluded) {
|
||||
@@ -76,10 +81,16 @@ impl PathExcluder {
|
||||
}
|
||||
}
|
||||
|
||||
// Pattern-based exclusions: match whole path components (dir or file name),
|
||||
// not substrings.
|
||||
if !self.excluded_patterns.is_empty() {
|
||||
for component in path.components() {
|
||||
if self.excluded_patterns.is_empty() {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Strip the base prefix before scanning components. Without this,
|
||||
// every path component above `base` (e.g. `tmp` in `/tmp/test123`
|
||||
// under tempdir, or the user's `home` in `/home/user/Pictures`)
|
||||
// would match user-defined patterns and produce false positives.
|
||||
let scan_root = path.strip_prefix(&self.base).unwrap_or(path);
|
||||
for component in scan_root.components() {
|
||||
if let Some(comp_str) = component.as_os_str().to_str()
|
||||
&& self.excluded_patterns.iter().any(|pat| pat == comp_str)
|
||||
{
|
||||
@@ -90,7 +101,6 @@ impl PathExcluder {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
false
|
||||
}
|
||||
|
||||
19
src/state.rs
19
src/state.rs
@@ -1,4 +1,5 @@
|
||||
use crate::ai::apollo_client::ApolloClient;
|
||||
use crate::ai::face_client::FaceClient;
|
||||
use crate::ai::insight_chat::{ChatLockMap, InsightChatService};
|
||||
use crate::ai::openrouter::OpenRouterClient;
|
||||
use crate::ai::{InsightGenerator, OllamaClient, SmsApiClient};
|
||||
@@ -48,6 +49,11 @@ pub struct AppState {
|
||||
pub insight_generator: InsightGenerator,
|
||||
/// Chat continuation service. Hold an Arc so handlers can clone cheaply.
|
||||
pub insight_chat: Arc<InsightChatService>,
|
||||
/// Face inference client (calls Apollo's `/api/internal/faces/*`).
|
||||
/// Disabled (`is_enabled() == false`) when neither `APOLLO_FACE_API_BASE_URL`
|
||||
/// nor `APOLLO_API_BASE_URL` is set; the file-watch hook (Phase 3) and
|
||||
/// manual-face-create handler short-circuit in that case.
|
||||
pub face_client: FaceClient,
|
||||
}
|
||||
|
||||
impl AppState {
|
||||
@@ -82,6 +88,7 @@ impl AppState {
|
||||
insight_generator: InsightGenerator,
|
||||
insight_chat: Arc<InsightChatService>,
|
||||
preview_dao: Arc<Mutex<Box<dyn PreviewDao>>>,
|
||||
face_client: FaceClient,
|
||||
) -> Self {
|
||||
assert!(
|
||||
!libraries_vec.is_empty(),
|
||||
@@ -115,6 +122,7 @@ impl AppState {
|
||||
sms_client,
|
||||
insight_generator,
|
||||
insight_chat,
|
||||
face_client,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -161,6 +169,15 @@ impl Default for AppState {
|
||||
// generator silently falls through to the legacy Nominatim path.
|
||||
let apollo_client = ApolloClient::new(env::var("APOLLO_API_BASE_URL").ok());
|
||||
|
||||
// Face inference client. Falls back to APOLLO_API_BASE_URL when
|
||||
// APOLLO_FACE_API_BASE_URL is unset (single-Apollo deploys are the
|
||||
// common case). Both unset = feature disabled, file-watch hook
|
||||
// and manual-face handlers short-circuit silently.
|
||||
let face_client_url = env::var("APOLLO_FACE_API_BASE_URL")
|
||||
.ok()
|
||||
.or_else(|| env::var("APOLLO_API_BASE_URL").ok());
|
||||
let face_client = FaceClient::new(face_client_url);
|
||||
|
||||
// Initialize DAOs
|
||||
let insight_dao: Arc<Mutex<Box<dyn InsightDao>>> =
|
||||
Arc::new(Mutex::new(Box::new(SqliteInsightDao::new())));
|
||||
@@ -244,6 +261,7 @@ impl Default for AppState {
|
||||
insight_generator,
|
||||
insight_chat,
|
||||
preview_dao,
|
||||
face_client,
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -382,6 +400,7 @@ impl AppState {
|
||||
insight_generator,
|
||||
insight_chat,
|
||||
preview_dao,
|
||||
FaceClient::new(None), // disabled in test
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
250
src/tags.rs
250
src/tags.rs
@@ -32,6 +32,7 @@ where
|
||||
)
|
||||
.service(web::resource("image/tags/all").route(web::get().to(get_all_tags::<TagD>)))
|
||||
.service(web::resource("image/tags/batch").route(web::post().to(update_tags::<TagD>)))
|
||||
.service(web::resource("image/tags/lookup").route(web::post().to(lookup_tags_batch::<TagD>)))
|
||||
}
|
||||
|
||||
async fn add_tag<D: TagDao>(
|
||||
@@ -238,6 +239,149 @@ async fn update_tags<D: TagDao>(
|
||||
.into_http_internal_err()
|
||||
}
|
||||
|
||||
/// Request body for the ``POST /image/tags/lookup`` batch endpoint:
/// the set of rel_paths whose tags the caller wants in one round trip.
#[derive(Deserialize, Debug)]
pub struct LookupTagsBatchRequest {
    // Paths are normalized server-side (see `normalize_path` in the
    // handler), so callers may send them as-is.
    pub paths: Vec<String>,
}
|
||||
|
||||
/// Bulk per-path tag lookup with cross-library content-hash sibling
/// expansion. Apollo's photo-match flow used to fan out one
/// ``GET /image/tags?path=`` per record (~4k for a wide window) —
/// each call locked the dao briefly and the round-trip cost dwarfed
/// the actual SQL. This collapses the whole fan-out into:
///
/// 1. one ``image_exif`` batch lookup → query path → content_hash
/// 2. one ``image_exif`` JOIN by content_hash → all sibling rel_paths
///    (so a tag applied under library A surfaces under library B
///    when the content hashes match — important once a backup mount
///    holds copies of files from the primary library)
/// 3. one ``tagged_photo`` JOIN over the union of (query + sibling)
///    rel_paths
///
/// Body: ``{paths: [...]}``; response: ``{path: [{id, name, ...}]}``
/// with only paths that have at least one tag (caller treats absence
/// as empty). Each chunk is capped to stay under SQLite's variable
/// limit; five queries per 4k photos is still ~800x cheaper than
/// per-path HTTP fan-out.
async fn lookup_tags_batch<D: TagDao>(
    _: Claims,
    http_request: HttpRequest,
    body: web::Json<LookupTagsBatchRequest>,
    tag_dao: web::Data<Mutex<D>>,
    exif_dao: web::Data<Mutex<Box<dyn ExifDao>>>,
) -> impl Responder {
    use std::collections::{HashMap, HashSet};
    let context = extract_context_from_request(&http_request);
    let span = global_tracer().start_with_context("lookup_tags_batch", &context);
    let span_context = opentelemetry::Context::current_with_span(span);
    // NOTE(review): the error branches below return without setting a
    // span status — presumably left Unset by design; confirm against
    // the project's tracing conventions.

    if body.paths.is_empty() {
        return HttpResponse::Ok().json(HashMap::<String, Vec<Tag>>::new());
    }

    let query_paths: Vec<String> = body.paths.iter().map(|p| normalize_path(p)).collect();

    // Stage 1: query → content_hash mapping. Files without a hash yet
    // (just-indexed, hash compute failed, etc.) skip the sibling
    // expansion and only get tags from their own rel_path.
    let exif_records = {
        let mut dao = exif_dao.lock().expect("Unable to get ExifDao");
        match dao.get_exif_batch(&span_context, &query_paths) {
            Ok(rows) => rows,
            Err(e) => {
                return HttpResponse::InternalServerError()
                    .body(format!("exif batch lookup failed: {:?}", e));
            }
        }
    };
    let mut hash_by_path: HashMap<String, String> = HashMap::with_capacity(exif_records.len());
    for record in exif_records {
        if let Some(h) = record.content_hash {
            hash_by_path.insert(record.file_path, h);
        }
    }
    // De-dup hashes via a HashSet so Stage 2 queries each hash once.
    let unique_hashes: Vec<String> = hash_by_path
        .values()
        .cloned()
        .collect::<HashSet<_>>()
        .into_iter()
        .collect();

    // Stage 2: hash → all sibling rel_paths.
    let paths_by_hash = if unique_hashes.is_empty() {
        HashMap::new()
    } else {
        let mut dao = exif_dao.lock().expect("Unable to get ExifDao");
        match dao.get_rel_paths_for_hashes(&span_context, &unique_hashes) {
            Ok(map) => map,
            Err(e) => {
                return HttpResponse::InternalServerError()
                    .body(format!("hash sibling lookup failed: {:?}", e));
            }
        }
    };

    // Stage 3: build expanded path set and the reverse map
    // sibling → [original query paths whose tag bucket should include
    // the sibling's tags]. A query path always attributes to itself
    // (covers the no-content-hash case).
    let mut originals_by_sibling: HashMap<String, Vec<String>> = HashMap::new();
    let mut all_paths: HashSet<String> = HashSet::new();
    for query_path in &query_paths {
        all_paths.insert(query_path.clone());
        originals_by_sibling
            .entry(query_path.clone())
            .or_default()
            .push(query_path.clone());
        if let Some(hash) = hash_by_path.get(query_path)
            && let Some(siblings) = paths_by_hash.get(hash)
        {
            for sibling in siblings {
                if sibling == query_path {
                    continue;
                }
                all_paths.insert(sibling.clone());
                originals_by_sibling
                    .entry(sibling.clone())
                    .or_default()
                    .push(query_path.clone());
            }
        }
    }

    // Stage 4: tags grouped by rel_path for the union.
    let all_paths_vec: Vec<String> = all_paths.into_iter().collect();
    let tags_by_sibling = {
        let mut dao = tag_dao.lock().expect("Unable to get TagDao");
        match dao.get_tags_grouped_by_paths(&span_context, &all_paths_vec) {
            Ok(map) => map,
            Err(e) => {
                return HttpResponse::InternalServerError().body(format!("{}", e));
            }
        }
    };

    // Stage 5: aggregate sibling tags back to original query paths,
    // de-duped by tag id. Empty buckets stay out of the response so
    // the caller's "missing key = []" contract holds.
    let mut result: HashMap<String, Vec<Tag>> = HashMap::new();
    for (sibling_path, originals) in originals_by_sibling {
        if let Some(tags) = tags_by_sibling.get(&sibling_path) {
            for orig in originals {
                let entry = result.entry(orig).or_default();
                for t in tags {
                    // Linear dedupe is fine: per-photo tag counts are small.
                    if !entry.iter().any(|e| e.id == t.id) {
                        entry.push(t.clone());
                    }
                }
            }
        }
    }

    span_context.span().set_status(Status::Ok);
    HttpResponse::Ok().json(result)
}
|
||||
|
||||
#[derive(Serialize, Queryable, Clone, Debug, PartialEq)]
|
||||
pub struct Tag {
|
||||
pub id: i32,
|
||||
@@ -317,6 +461,14 @@ pub trait TagDao: Send + Sync {
|
||||
context: &opentelemetry::Context,
|
||||
paths: &[String],
|
||||
) -> anyhow::Result<Vec<Tag>>;
|
||||
/// Per-path grouped lookup: ``rel_path → [tags]``. Used by the
|
||||
/// ``/image/tags/lookup`` batch endpoint. Returns only paths that
|
||||
/// have at least one tag; the caller treats absence as empty.
|
||||
fn get_tags_grouped_by_paths(
|
||||
&mut self,
|
||||
context: &opentelemetry::Context,
|
||||
paths: &[String],
|
||||
) -> anyhow::Result<std::collections::HashMap<String, Vec<Tag>>>;
|
||||
fn create_tag(&mut self, context: &opentelemetry::Context, name: &str) -> anyhow::Result<Tag>;
|
||||
fn remove_tag(
|
||||
&mut self,
|
||||
@@ -470,6 +622,51 @@ impl TagDao for SqliteTagDao {
|
||||
})
|
||||
}
|
||||
|
||||
fn get_tags_grouped_by_paths(
|
||||
&mut self,
|
||||
context: &opentelemetry::Context,
|
||||
paths: &[String],
|
||||
) -> anyhow::Result<std::collections::HashMap<String, Vec<Tag>>> {
|
||||
use std::collections::HashMap;
|
||||
let mut out: HashMap<String, Vec<Tag>> = HashMap::new();
|
||||
if paths.is_empty() {
|
||||
return Ok(out);
|
||||
}
|
||||
let mut conn = self
|
||||
.connection
|
||||
.lock()
|
||||
.expect("Unable to lock SqliteTagDao connection");
|
||||
trace_db_call(context, "query", "get_tags_grouped_by_paths", |span| {
|
||||
span.set_attribute(KeyValue::new("path_count", paths.len() as i64));
|
||||
// SQLite's default SQLITE_LIMIT_VARIABLE_NUMBER is 32766 in
|
||||
// modern builds (999 in old ones). Chunk at 500 to stay
|
||||
// safely under both — five queries for a 4k-photo grid is
|
||||
// still ~800x cheaper than 4k single-row HTTP calls.
|
||||
const CHUNK: usize = 500;
|
||||
for chunk in paths.chunks(CHUNK) {
|
||||
let rows: Vec<(String, i32, String, i64)> = tagged_photo::table
|
||||
.inner_join(tags::table)
|
||||
.filter(tagged_photo::rel_path.eq_any(chunk))
|
||||
.select((
|
||||
tagged_photo::rel_path,
|
||||
tags::id,
|
||||
tags::name,
|
||||
tags::created_time,
|
||||
))
|
||||
.get_results(conn.deref_mut())
|
||||
.with_context(|| "Unable to get tags grouped from Sqlite")?;
|
||||
for (rel_path, id, name, created_time) in rows {
|
||||
out.entry(rel_path).or_default().push(Tag {
|
||||
id,
|
||||
name,
|
||||
created_time,
|
||||
});
|
||||
}
|
||||
}
|
||||
Ok(out)
|
||||
})
|
||||
}
|
||||
|
||||
fn create_tag(&mut self, context: &opentelemetry::Context, name: &str) -> anyhow::Result<Tag> {
|
||||
let mut conn = self
|
||||
.connection
|
||||
@@ -893,6 +1090,23 @@ mod tests {
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
fn get_tags_grouped_by_paths(
|
||||
&mut self,
|
||||
_context: &opentelemetry::Context,
|
||||
paths: &[String],
|
||||
) -> anyhow::Result<std::collections::HashMap<String, Vec<Tag>>> {
|
||||
let tagged = self.tagged_photos.borrow();
|
||||
let mut out = std::collections::HashMap::new();
|
||||
for p in paths {
|
||||
if let Some(tags) = tagged.get(p)
|
||||
&& !tags.is_empty()
|
||||
{
|
||||
out.insert(p.clone(), tags.clone());
|
||||
}
|
||||
}
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
fn create_tag(
|
||||
&mut self,
|
||||
_context: &opentelemetry::Context,
|
||||
@@ -1026,6 +1240,42 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn get_tags_grouped_by_paths_returns_per_path_buckets() {
|
||||
// Backstop for the batch tag-lookup endpoint: confirms the
|
||||
// grouped variant returns one bucket per path with at least
|
||||
// one tag, and omits paths with no tags entirely (the caller
|
||||
// treats absence as []). The handler stacks sibling expansion
|
||||
// on top via image_exif content_hash; the DAO method itself
|
||||
// just needs to honour rel_path → tags directly.
|
||||
let mut dao = TestTagDao::new();
|
||||
let ctx = opentelemetry::Context::current();
|
||||
// Seed: two paths tagged, one path untagged.
|
||||
dao.tagged_photos.borrow_mut().insert(
|
||||
"a.jpg".into(),
|
||||
vec![Tag { id: 1, name: "alpha".into(), created_time: 0 }],
|
||||
);
|
||||
dao.tagged_photos.borrow_mut().insert(
|
||||
"b.jpg".into(),
|
||||
vec![
|
||||
Tag { id: 2, name: "beta".into(), created_time: 0 },
|
||||
Tag { id: 3, name: "gamma".into(), created_time: 0 },
|
||||
],
|
||||
);
|
||||
let grouped = dao
|
||||
.get_tags_grouped_by_paths(
|
||||
&ctx,
|
||||
&["a.jpg".into(), "b.jpg".into(), "c.jpg".into()],
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(grouped.get("a.jpg").map(|v| v.len()), Some(1));
|
||||
assert_eq!(grouped.get("b.jpg").map(|v| v.len()), Some(2));
|
||||
assert!(
|
||||
!grouped.contains_key("c.jpg"),
|
||||
"untagged paths must be absent so caller's missing-key=[] contract holds"
|
||||
);
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn add_new_tag_test() {
|
||||
let tag_dao = TestTagDao::new();
|
||||
|
||||
Reference in New Issue
Block a user