feature/clip-semantic-search #96

Merged
cameron merged 5 commits from feature/clip-semantic-search into master 2026-05-16 00:32:33 +00:00
18 changed files with 1644 additions and 2 deletions

View File

@@ -54,10 +54,12 @@ AGENTIC_CHAT_MAX_ITERATIONS=6
# OPENROUTER_APP_TITLE=ImageApi
# ── AI Insights — sibling services (optional) ───────────────────────────
# Apollo (places + face inference). Single Apollo deploys typically set
# only APOLLO_API_BASE_URL and let the face client fall back to it.
# Apollo (places, face inference, CLIP encoders). Single-Apollo deploys
# typically set only APOLLO_API_BASE_URL and let the face + CLIP
# clients fall back to it.
# APOLLO_API_BASE_URL=http://apollo.lan:8000
# APOLLO_FACE_API_BASE_URL=http://apollo.lan:8000
# APOLLO_CLIP_API_BASE_URL=http://apollo.lan:8000
# SMS_API_URL=http://localhost:8000
# SMS_API_TOKEN=
@@ -80,6 +82,23 @@ FACE_DETECT_TIMEOUT_SEC=60
FACE_BACKLOG_MAX_PER_TICK=64
FACE_HASH_BACKFILL_MAX_PER_TICK=2000
# ── CLIP semantic photo search ──────────────────────────────────────────
# ImageApi calls Apollo's /api/internal/clip/{encode_image,encode_text}
# to populate per-photo embeddings during the watcher's backlog drain
# and to encode user queries at /photos/search time. Disabled when
# neither APOLLO_CLIP_API_BASE_URL nor APOLLO_API_BASE_URL is set.
#
# Per-watcher-tick cap on the encode drain. Default 32 ≈ ~1 photo/sec
# on CPU, ~30 photos/sec on a single-GPU host (Apollo's threadpool
# is 1 on CUDA, so concurrency is bounded server-side regardless of
# our setting). Bump on a fresh deploy to clear the backlog faster.
CLIP_BACKLOG_MAX_PER_TICK=32
# Client-side parallel encode calls per drain pass. Apollo's GPU pool
# serializes server-side; this just overlaps file-IO with inference.
CLIP_ENCODE_CONCURRENCY=4
# Per-encode HTTP timeout. CPU-only Apollo deploys may need higher.
CLIP_REQUEST_TIMEOUT_SEC=60
# ── RAG / search ────────────────────────────────────────────────────────
# Set to `1` to enable cross-encoder reranking on /search results.
SEARCH_RAG_RERANK=0

View File

@@ -0,0 +1,3 @@
DROP INDEX IF EXISTS idx_image_exif_clip_backfill;
ALTER TABLE image_exif DROP COLUMN clip_model_version;
ALTER TABLE image_exif DROP COLUMN clip_embedding;

View File

@@ -0,0 +1,27 @@
-- CLIP semantic photo search: store a per-photo image embedding so
-- text queries can rerank against the live library via cosine
-- similarity. Apollo encodes the bytes via its CLIP service; ImageApi
-- writes the resulting blob here.
--
-- `clip_embedding` is the raw little-endian float32 buffer of an
-- L2-normalized vector (dim depends on the model — 768 bytes×4 for
-- ViT-L/14, 512 bytes×4 for ViT-B/32). Apollo always returns the
-- normalized form so the search-time dot product reduces to a plain
-- cosine similarity.
--
-- `clip_model_version` echoes the upstream `APOLLO_CLIP_MODEL` (e.g.
-- "ViT-L/14"). A model swap shouldn't silently mix geometries — the
-- backfill drain will re-eligibilize rows whose stored model_version
-- differs from the live engine's, and the search route refuses to
-- mix rows from two model_versions in the same response.
ALTER TABLE image_exif ADD COLUMN clip_embedding BLOB;
ALTER TABLE image_exif ADD COLUMN clip_model_version TEXT;
-- Partial index for the backfill drain. Mirrors the shape of
-- `idx_image_exif_date_backfill`: candidate rows are those with a
-- known content_hash (so we don't race the unhashed backlog) but no
-- embedding yet. SELECT cost stays O(missing rows) instead of full
-- table scan once the column is mostly populated.
CREATE INDEX IF NOT EXISTS idx_image_exif_clip_backfill
ON image_exif (id)
WHERE clip_embedding IS NULL AND content_hash IS NOT NULL;

392
src/ai/clip_client.rs Normal file
View File

@@ -0,0 +1,392 @@
//! Thin async HTTP client for Apollo's `/api/internal/clip/*` endpoints.
//!
//! Apollo hosts the OpenAI CLIP inference service (ViT-L/14 by default,
//! configurable via `APOLLO_CLIP_MODEL`). This client is the ImageApi side
//! of the contract: shove image bytes through `/encode_image` to populate
//! `image_exif.clip_embedding` during backfill, and call `/encode_text` to
//! encode a user's natural-language query at search time. The actual
//! cosine-similarity rerank runs locally in ImageApi.
//!
//! Mirrors `face_client.rs` / `tag_client.rs` shape: optional base URL
//! (None = disabled — feature off, drain and search no-op), reqwest
//! client with a generous timeout because GPU inference under a backlog
//! can queue server-side (Apollo's threadpool is bounded to 1 worker on
//! CUDA).
//!
//! Configured via `APOLLO_CLIP_API_BASE_URL`, falling back to
//! `APOLLO_API_BASE_URL` when the dedicated var is unset (single-Apollo
//! deploys are the common case).
//!
//! Wire format:
//! - `/encode_image`: multipart/form-data with `file=<bytes>` and
//! `meta=<json>` (content_hash / library_id / rel_path for logging).
//! - `/encode_text`: JSON `{"text": "<query>"}`.
//!
//! Both return `{model_version, embedding_dim, duration_ms, embedding}`
//! where `embedding` is base64 of `dim×4` little-endian float32 bytes,
//! L2-normalized so the rerank reduces to a plain dot product.
//!
//! Error mapping (reflected in [`ClipError`]):
//! - 422 `decode_failed` / `empty_text` → permanent: ImageApi marks the
//! row failed or surfaces the empty-query error to the search caller.
//! - 503 `cuda_oom` / `engine_unavailable` → defer-and-retry: no marker.
//! - Any other 5xx / network error → defer.
use anyhow::{Context, Result};
use base64::Engine;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::time::Duration;
#[derive(Debug, Clone, Serialize)]
pub struct EncodeImageMeta {
pub content_hash: String,
pub library_id: i32,
pub rel_path: String,
}
#[derive(Debug, Clone, Deserialize)]
#[allow(dead_code)] // duration_ms logged by the backfill drain
pub struct EncodeResponse {
pub model_version: String,
pub embedding_dim: i32,
pub duration_ms: i64,
/// base64 of `embedding_dim * 4` bytes (LE float32). ImageApi stores
/// the decoded bytes verbatim as a BLOB.
pub embedding: String,
}
impl EncodeResponse {
/// Decode the wire-format embedding back into raw bytes for storage.
/// Validates the buffer is `embedding_dim * 4` bytes long so a
/// malformed response surfaces here rather than as a downstream
/// silent length mismatch.
pub fn decode_embedding(&self) -> Result<Vec<u8>> {
let bytes = base64::engine::general_purpose::STANDARD
.decode(self.embedding.as_bytes())
.context("clip embedding base64 decode")?;
let expected = (self.embedding_dim as usize) * 4;
if bytes.len() != expected {
anyhow::bail!(
"clip embedding wrong size: got {} bytes, expected {} ({} * 4)",
bytes.len(),
expected,
self.embedding_dim
);
}
Ok(bytes)
}
}
#[derive(Debug, Clone, Deserialize)]
#[allow(dead_code)] // load_error consumed by future health probe
pub struct ClipHealth {
pub loaded: bool,
pub device: String,
pub model_version: String,
pub embedding_dim: i32,
#[serde(default)]
pub load_error: Option<String>,
}
#[derive(Debug)]
pub enum ClipError {
/// Apollo refused for a reason that won't change on retry (decode
/// failure on /encode_image, empty text on /encode_text).
Permanent(anyhow::Error),
/// Apollo couldn't process this turn but might next time (CUDA OOM,
/// engine not loaded, network hiccup).
Transient(anyhow::Error),
/// Feature is disabled (no `APOLLO_CLIP_API_BASE_URL` /
/// `APOLLO_API_BASE_URL`).
Disabled,
}
impl std::fmt::Display for ClipError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ClipError::Permanent(e) => write!(f, "permanent: {e}"),
ClipError::Transient(e) => write!(f, "transient: {e}"),
ClipError::Disabled => write!(f, "clip client disabled"),
}
}
}
impl std::error::Error for ClipError {}
#[derive(Clone)]
pub struct ClipClient {
client: Client,
base_url: Option<String>,
}
impl ClipClient {
pub fn new(base_url: Option<String>) -> Self {
let timeout_secs = std::env::var("CLIP_REQUEST_TIMEOUT_SEC")
.ok()
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(60);
let client = Client::builder()
.timeout(Duration::from_secs(timeout_secs))
.build()
.expect("reqwest client build");
Self {
client,
base_url: base_url.map(|u| u.trim_end_matches('/').to_string()),
}
}
/// Read both standard env vars. `APOLLO_CLIP_API_BASE_URL` wins;
/// fallback to `APOLLO_API_BASE_URL`. Both unset → disabled.
pub fn from_env() -> Self {
let base = std::env::var("APOLLO_CLIP_API_BASE_URL")
.ok()
.filter(|s| !s.trim().is_empty())
.or_else(|| {
std::env::var("APOLLO_API_BASE_URL")
.ok()
.filter(|s| !s.trim().is_empty())
});
Self::new(base)
}
pub fn is_enabled(&self) -> bool {
self.base_url.is_some()
}
/// Encode an image to a 768-d (ViT-L/14) or 512-d (ViT-B/32)
/// L2-normalized embedding. Used by the backfill drain.
pub async fn encode_image(
&self,
bytes: Vec<u8>,
meta: EncodeImageMeta,
) -> std::result::Result<EncodeResponse, ClipError> {
let Some(base) = self.base_url.as_deref() else {
return Err(ClipError::Disabled);
};
let url = format!("{}/api/internal/clip/encode_image", base);
let meta_json = serde_json::to_string(&meta)
.map_err(|e| ClipError::Permanent(anyhow::anyhow!("meta serialize: {e}")))?;
let form = reqwest::multipart::Form::new()
.text("meta", meta_json)
.part(
"file",
reqwest::multipart::Part::bytes(bytes)
.file_name(meta.rel_path.clone())
.mime_str("application/octet-stream")
.unwrap_or_else(|_| reqwest::multipart::Part::bytes(Vec::new())),
);
self.send_multipart(&url, form).await
}
/// Encode a natural-language query to an embedding. Used by the
/// search route to rank stored image embeddings by cosine sim.
pub async fn encode_text(&self, text: &str) -> std::result::Result<EncodeResponse, ClipError> {
let Some(base) = self.base_url.as_deref() else {
return Err(ClipError::Disabled);
};
let url = format!("{}/api/internal/clip/encode_text", base);
let body = serde_json::json!({ "text": text });
let resp = match self.client.post(&url).json(&body).send().await {
Ok(r) => r,
Err(e) if e.is_timeout() || e.is_connect() => {
return Err(ClipError::Transient(anyhow::anyhow!(
"clip client network: {e}"
)));
}
Err(e) => {
return Err(ClipError::Transient(anyhow::anyhow!(
"clip client request: {e}"
)));
}
};
let status = resp.status();
if status.is_success() {
let body: EncodeResponse = resp
.json()
.await
.map_err(|e| ClipError::Transient(anyhow::anyhow!("clip response decode: {e}")))?;
return Ok(body);
}
let body_text = resp.text().await.unwrap_or_default();
Err(classify_error_response(status.as_u16(), &body_text))
}
/// Engine reachability + device/model report. Used as a startup
/// sanity check from the probe binary and (later) the backlog drain.
#[allow(dead_code)] // consumed by probe + drain
pub async fn health(&self) -> Result<ClipHealth> {
let base = self.base_url.as_deref().context("clip client disabled")?;
let url = format!("{}/api/internal/clip/health", base);
let resp = self.client.get(&url).send().await?.error_for_status()?;
let body: ClipHealth = resp.json().await?;
Ok(body)
}
async fn send_multipart(
&self,
url: &str,
form: reqwest::multipart::Form,
) -> std::result::Result<EncodeResponse, ClipError> {
let resp = match self.client.post(url).multipart(form).send().await {
Ok(r) => r,
Err(e) if e.is_timeout() || e.is_connect() => {
return Err(ClipError::Transient(anyhow::anyhow!(
"clip client network: {e}"
)));
}
Err(e) => {
return Err(ClipError::Transient(anyhow::anyhow!(
"clip client request: {e}"
)));
}
};
let status = resp.status();
if status.is_success() {
let body: EncodeResponse = resp
.json()
.await
.map_err(|e| ClipError::Transient(anyhow::anyhow!("clip response decode: {e}")))?;
return Ok(body);
}
let body_text = resp.text().await.unwrap_or_default();
Err(classify_error_response(status.as_u16(), &body_text))
}
}
/// Pulled out as a pure function so the marker-row contract is unit-
/// testable without spinning up an HTTP server. Matches the shape used
/// by face_client::classify_error_response so future retry policies
/// can share code.
fn classify_error_response(status: u16, body_text: &str) -> ClipError {
let detail_code = serde_json::from_str::<serde_json::Value>(body_text)
.ok()
.and_then(|v| {
v.get("detail")
.and_then(|d| d.as_str().map(str::to_string))
.or_else(|| {
v.get("detail")
.and_then(|d| d.get("code"))
.and_then(|c| c.as_str())
.map(str::to_string)
})
})
.unwrap_or_default();
if status == 422 {
return ClipError::Permanent(anyhow::anyhow!(
"clip {} {}: {}",
status,
detail_code,
body_text
));
}
if status == 503 {
return ClipError::Transient(anyhow::anyhow!(
"clip {} {}: {}",
status,
detail_code,
body_text
));
}
// 408 / 413 / 429 are operator-fixable infra issues; defer.
if matches!(status, 408 | 413 | 429) {
return ClipError::Transient(anyhow::anyhow!(
"clip {} {}: {}",
status,
detail_code,
body_text
));
}
if (400..500).contains(&status) {
ClipError::Permanent(anyhow::anyhow!(
"clip {} {}: {}",
status,
detail_code,
body_text
))
} else {
ClipError::Transient(anyhow::anyhow!(
"clip {} {}: {}",
status,
detail_code,
body_text
))
}
}
#[cfg(test)]
mod tests {
use super::*;
fn is_permanent(e: &ClipError) -> bool {
matches!(e, ClipError::Permanent(_))
}
fn is_transient(e: &ClipError) -> bool {
matches!(e, ClipError::Transient(_))
}
#[test]
fn classify_422_decode_failed_is_permanent() {
assert!(is_permanent(&classify_error_response(
422,
r#"{"detail":"decode_failed: bad bytes"}"#
)));
}
#[test]
fn classify_422_empty_text_is_permanent() {
assert!(is_permanent(&classify_error_response(
422,
r#"{"detail":"empty_text"}"#
)));
}
#[test]
fn classify_503_cuda_oom_is_transient() {
assert!(is_transient(&classify_error_response(
503,
r#"{"detail":{"code":"cuda_oom","error":"out of memory"}}"#,
)));
}
#[test]
fn classify_5xx_is_transient_other_4xx_is_permanent() {
assert!(is_transient(&classify_error_response(500, "")));
assert!(is_permanent(&classify_error_response(404, "{}")));
}
#[test]
fn classify_infra_4xx_is_transient() {
assert!(is_transient(&classify_error_response(408, "")));
assert!(is_transient(&classify_error_response(413, "<html>")));
assert!(is_transient(&classify_error_response(429, "{}")));
}
#[test]
fn decode_embedding_size_mismatch_errors() {
// dim=4 says we expect 16 bytes (4 floats × 4 bytes). Encode 8.
use base64::Engine;
let resp = EncodeResponse {
model_version: "ViT-L/14".into(),
embedding_dim: 4,
duration_ms: 0,
embedding: base64::engine::general_purpose::STANDARD.encode([0u8; 8]),
};
assert!(resp.decode_embedding().is_err());
}
#[test]
fn decode_embedding_round_trip() {
use base64::Engine;
let bytes: Vec<u8> = (0..16).collect();
let resp = EncodeResponse {
model_version: "ViT-L/14".into(),
embedding_dim: 4,
duration_ms: 0,
embedding: base64::engine::general_purpose::STANDARD.encode(&bytes),
};
assert_eq!(resp.decode_embedding().unwrap(), bytes);
}
}

View File

@@ -2184,6 +2184,8 @@ mod tests {
date_taken_source: None,
original_date_taken: None,
original_date_taken_source: None,
clip_embedding: None,
clip_model_version: None,
});
let out = resolve_date_taken_for_context(&exif, "Screenshot_2014-06-01.png");
assert_eq!(out.as_deref(), Some("2021-08-15"));

View File

@@ -1,4 +1,5 @@
pub mod apollo_client;
pub mod clip_client;
pub mod daily_summary_job;
pub mod face_client;
pub mod handlers;

View File

@@ -220,6 +220,76 @@ pub fn backfill_missing_date_taken(
/// unscanned image_exif rows directly via the FaceDao anti-join and
/// hands them to the existing detection pass. Runs on every tick (not
/// just full scans) so the backlog moves at quick-scan cadence.
/// Per-tick CLIP encoding drain. Mirrors `process_face_backlog`: pull
/// up to `CLIP_BACKLOG_MAX_PER_TICK` candidates with a known
/// `content_hash` but no `clip_embedding`, hand them to
/// `clip_watch::run_clip_encoding_pass` for parallel fan-out, and let
/// that module write the result back via `backfill_clip_embedding`.
///
/// Idempotent — a row stays in the candidate set until its embedding
/// lands, so a transient failure (Apollo unreachable, CUDA OOM) just
/// defers to the next tick. Permanent failures (un-decodable bytes)
/// retry every tick at this point; future Branch may add a status
/// column like face_detections has.
pub fn process_clip_backlog(
context: &opentelemetry::Context,
library: &libraries::Library,
clip_client: &crate::ai::clip_client::ClipClient,
exif_dao: &Arc<Mutex<Box<dyn ExifDao>>>,
excluded_dirs: &[String],
) {
if !clip_client.is_enabled() {
return;
}
let cap: i64 = dotenv::var("CLIP_BACKLOG_MAX_PER_TICK")
.ok()
.and_then(|s| s.parse().ok())
.filter(|n: &i64| *n > 0)
.unwrap_or(32);
let rows: Vec<(String, String)> = {
let mut dao = exif_dao.lock().expect("exif dao");
match dao.list_clip_unencoded_candidates(context, library.id, cap) {
Ok(r) => r,
Err(e) => {
warn!(
"clip_watch: list_clip_unencoded_candidates failed for library '{}': {:?}",
library.name, e
);
return;
}
}
};
if rows.is_empty() {
return;
}
info!(
"clip_watch: backlog drain — encoding {} candidate(s) for library '{}' (cap={})",
rows.len(),
library.name,
cap
);
let candidates: Vec<crate::clip_watch::ClipCandidate> = rows
.into_iter()
.map(
|(rel_path, content_hash)| crate::clip_watch::ClipCandidate {
rel_path,
content_hash,
},
)
.collect();
crate::clip_watch::run_clip_encoding_pass(
library,
excluded_dirs,
clip_client,
Arc::clone(exif_dao),
candidates,
);
}
pub fn process_face_backlog(
context: &opentelemetry::Context,
library: &libraries::Library,

View File

@@ -0,0 +1,273 @@
//! Probe binary for CLIP semantic search.
//!
//! No DB writes. Walks a library's `image_exif` rows, encodes a sample
//! via Apollo's `/encode_image`, encodes the user's --query via
//! `/encode_text`, and prints the top-K most similar photos by cosine
//! similarity so the operator can eyeball quality before committing to
//! the persistence phase (column populated by backlog drain, search
//! endpoint, UI).
//!
//! Usage:
//! cargo run --release --bin probe_clip_search -- \
//! --library 1 --limit 200 --query "a beach at sunset" --top 10
//!
//! Env: standard ImageApi `.env`. Requires either
//! `APOLLO_CLIP_API_BASE_URL` or `APOLLO_API_BASE_URL` to be set.
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use clap::Parser;
use log::{info, warn};
use image_api::ai::clip_client::{ClipClient, ClipError, EncodeImageMeta};
use image_api::database::{ExifDao, SqliteExifDao, connect};
use image_api::exif;
use image_api::file_types;
use image_api::libraries::{self, Library};
#[derive(Parser, Debug)]
#[command(name = "probe_clip_search")]
#[command(about = "Top-K CLIP semantic search over a sample of image_exif rows")]
struct Args {
/// Library id to sample from.
#[arg(long)]
library: i32,
/// Max files to encode. CPU inference is slow (~1-3 s per photo at
/// ViT-L/14); start small and grow once GPU is sorted.
#[arg(long, default_value_t = 50)]
limit: usize,
/// Natural-language query. Empty triggers an error from Apollo.
#[arg(long)]
query: String,
/// How many top results to print.
#[arg(long, default_value_t = 10)]
top: usize,
/// Offset into the library's rel_path listing.
#[arg(long, default_value_t = 0)]
offset: i64,
/// How many DB rows to scan before giving up on hitting the limit.
#[arg(long, default_value_t = 5000)]
max_scan: i64,
}
/// Same as `face_watch::read_image_bytes_for_detect` (which is pub(crate)).
/// Inlined for the throwaway probe.
fn read_image_bytes(path: &Path) -> std::io::Result<Vec<u8>> {
if file_types::needs_ffmpeg_thumbnail(path)
&& let Some(preview) = exif::extract_embedded_jpeg_preview(path)
{
return Ok(preview);
}
std::fs::read(path)
}
/// Decode a base64'd LE float32 vector to a `Vec<f32>`.
fn decode_f32_vec(b64: &str) -> anyhow::Result<Vec<f32>> {
use base64::Engine;
let bytes = base64::engine::general_purpose::STANDARD.decode(b64.as_bytes())?;
if bytes.len() % 4 != 0 {
anyhow::bail!("embedding byte length {} not divisible by 4", bytes.len());
}
let mut out = Vec::with_capacity(bytes.len() / 4);
for chunk in bytes.chunks_exact(4) {
out.push(f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]));
}
Ok(out)
}
/// Plain dot product. Apollo L2-normalizes both sides, so this is cosine sim.
fn dot(a: &[f32], b: &[f32]) -> f32 {
a.iter().zip(b.iter()).map(|(x, y)| x * y).sum()
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
env_logger::init();
dotenv::dotenv().ok();
let args = Args::parse();
if args.query.trim().is_empty() {
anyhow::bail!("--query must not be empty");
}
let client = ClipClient::from_env();
if !client.is_enabled() {
anyhow::bail!(
"ClipClient disabled: set APOLLO_CLIP_API_BASE_URL or APOLLO_API_BASE_URL in .env"
);
}
match client.health().await {
Ok(h) => info!(
"clip engine: loaded={} device={} model={} dim={}",
h.loaded, h.device, h.model_version, h.embedding_dim
),
Err(e) => warn!("health probe failed (continuing): {e}"),
}
let mut seed_conn = connect();
if let Some(base) = dotenv::var("BASE_PATH").ok().as_deref() {
libraries::seed_or_patch_from_env(&mut seed_conn, base);
}
let libs = libraries::load_all(&mut seed_conn);
drop(seed_conn);
let lib: Library = libs
.into_iter()
.find(|l| l.id == args.library)
.ok_or_else(|| anyhow::anyhow!("library id {} not found", args.library))?;
info!(
"probing library #{} ({}) at {}",
lib.id, lib.name, lib.root_path
);
let dao: Arc<Mutex<Box<dyn ExifDao>>> = Arc::new(Mutex::new(Box::new(SqliteExifDao::new())));
let ctx = opentelemetry::Context::new();
// Encode the query up-front so the long image-encode loop doesn't
// race a slow query encode. Fails fast on a misspelled query.
let query_resp = client
.encode_text(&args.query)
.await
.map_err(|e| anyhow::anyhow!("encode_text: {e}"))?;
let query_vec = decode_f32_vec(&query_resp.embedding)?;
info!(
"query encoded ({}d, {}ms): {:?}",
query_resp.embedding_dim, query_resp.duration_ms, args.query
);
// Page through (id, rel_path), filter to images on disk, encode up
// to `limit`. Each encoded photo gets scored against the query and
// kept in a top-K heap.
const PAGE: i64 = 500;
let mut offset = args.offset;
let mut scanned: i64 = 0;
let mut encoded = 0usize;
let mut perm_fail = 0usize;
let mut transient_fail = 0usize;
let root = PathBuf::from(&lib.root_path);
let started = Instant::now();
// (similarity, rel_path) — we keep all scored results and sort at
// the end. With limit≤few-hundred this is trivial.
let mut scores: Vec<(f32, String)> = Vec::with_capacity(args.limit);
'outer: loop {
if scanned >= args.max_scan {
warn!(
"scan cap ({}) reached before hitting limit ({}); bump --max-scan to scan deeper",
args.max_scan, args.limit
);
break;
}
let rows = {
let mut guard = dao.lock().expect("dao lock");
guard
.list_rel_paths_for_library_page(&ctx, lib.id, PAGE, offset)
.map_err(|e| anyhow::anyhow!("list rel_paths: {:?}", e))?
};
if rows.is_empty() {
info!("no more rows after offset {}", offset);
break;
}
offset += rows.len() as i64;
scanned += rows.len() as i64;
for (_id, rel_path) in rows {
if encoded >= args.limit {
break 'outer;
}
let abs = root.join(&rel_path);
if !file_types::is_image_file(&abs) || !abs.exists() {
continue;
}
let bytes = match read_image_bytes(&abs) {
Ok(b) => b,
Err(e) => {
warn!("read {rel_path}: {e}");
continue;
}
};
let meta = EncodeImageMeta {
content_hash: String::new(),
library_id: lib.id,
rel_path: rel_path.clone(),
};
let call_start = Instant::now();
match client.encode_image(bytes, meta).await {
Ok(resp) => {
encoded += 1;
let vec = match decode_f32_vec(&resp.embedding) {
Ok(v) => v,
Err(e) => {
warn!("decode {rel_path}: {e}");
continue;
}
};
if vec.len() != query_vec.len() {
warn!(
"dim mismatch for {rel_path}: image={} query={}",
vec.len(),
query_vec.len()
);
continue;
}
let sim = dot(&vec, &query_vec);
scores.push((sim, rel_path.clone()));
if encoded % 10 == 0 {
info!(
"progress: {} encoded, {:.1}s elapsed",
encoded,
started.elapsed().as_secs_f32()
);
}
let _ = call_start;
}
Err(ClipError::Permanent(e)) => {
perm_fail += 1;
warn!("permanent encode failure for {rel_path}: {e}");
}
Err(ClipError::Transient(e)) => {
transient_fail += 1;
warn!("transient encode failure for {rel_path}: {e}");
}
Err(ClipError::Disabled) => {
anyhow::bail!("clip client became disabled mid-run; impossible");
}
}
}
}
scores.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
let elapsed = started.elapsed();
println!();
println!(
"── top {} for query: {:?} ──",
args.top.min(scores.len()),
args.query
);
for (i, (sim, path)) in scores.iter().take(args.top).enumerate() {
println!("[{:>2}] sim={:.3} {}", i + 1, sim, path);
}
println!();
println!("── summary ─────────────────────────────────────");
println!("query : {:?}", args.query);
println!("scanned rows : {scanned}");
println!("encoded photos : {encoded}");
println!("permanent failures : {perm_fail}");
println!("transient failures : {transient_fail}");
println!("elapsed : {:.1}s", elapsed.as_secs_f32());
if encoded > 0 {
println!(
"throughput : {:.2} photos/s ({:.0}ms/photo avg)",
encoded as f32 / elapsed.as_secs_f32().max(0.001),
elapsed.as_millis() as f32 / encoded as f32
);
}
Ok(())
}

321
src/clip_search.rs Normal file
View File

@@ -0,0 +1,321 @@
//! `/photos/search?q=<text>` — CLIP semantic photo search.
//!
//! The route lives outside `files.rs` to keep that 1500+ line module
//! focused on EXIF / tag listing. The flow is:
//!
//! 1. Parse query params (`q`, `limit`, `threshold`, optional `library`).
//! 2. Call Apollo's `/api/internal/clip/encode_text` to get the query
//! vector (L2-normalized 768-d f32 for ViT-L/14).
//! 3. Load every `(content_hash, clip_embedding)` for the scope from
//! `image_exif` via `ExifDao::list_clip_index`. ~2843 MB for a 14k
//! library at ViT-L/14; loaded fresh per request — fast enough for
//! v1, optimize via an AppState cache later if needed.
//! 4. Dot product (= cosine since both sides are L2-normalized), filter
//! above `threshold`, top-K by score.
//! 5. Resolve each surviving hash back to a `(library_id, rel_path)` so
//! the frontend can render the photo / hand off to the carousel.
//!
//! Response shape is intentionally minimal — paths + score — so the
//! frontend can reuse existing PhotoGrid rendering by joining against
//! `/api/photos/match` (or calling `/image/metadata` lazily). Don't
//! bake camera/EXIF metadata into this route; it would force a fan-out
//! per result and balloon the response.
use crate::AppState;
use crate::ai::clip_client::ClipError;
use crate::database::ExifDao;
use actix_web::{HttpResponse, Result as ActixResult, web};
use base64::Engine;
use serde::{Deserialize, Serialize};
use std::sync::Mutex;
#[derive(Debug, Deserialize)]
pub struct SearchQuery {
/// Natural-language query. Required; empty triggers 400.
pub q: String,
/// Max results to return in this page. Capped to 200 server-side.
/// Defaults to 20. Pair with `offset` for pagination.
#[serde(default = "default_limit")]
pub limit: usize,
/// Zero-based offset into the sorted-and-filtered result set. The
/// scoring loop still runs over the full embedding matrix on every
/// page (cheap at personal-library scale — sub-100ms — and avoids
/// stateful pagination cursors). Defaults to 0.
#[serde(default)]
pub offset: usize,
/// Cosine-similarity floor below which results are dropped.
/// 0.20 is the rough "this is plausibly relevant" line for OpenAI
/// CLIP; tunable per call when sweeping. Defaults to 0.20.
#[serde(default = "default_threshold")]
pub threshold: f32,
/// Optional single-library scope. When omitted, every enabled
/// library is searched. Multi-select isn't supported yet — the
/// frontend wires through one or all.
pub library: Option<i32>,
/// Optional model-version filter. Defaults to the live engine's
/// version (queried lazily). Forces a strict join so mid-flight
/// model swaps can't mix geometries in a single response.
#[serde(default)]
pub model_version: Option<String>,
}
fn default_limit() -> usize {
20
}
fn default_threshold() -> f32 {
0.20
}
#[derive(Debug, Serialize)]
pub struct SearchHit {
pub library_id: i32,
pub rel_path: String,
pub content_hash: String,
/// Cosine similarity in [-1, 1]. In practice OpenAI CLIP returns
/// 0.100.40 for the typical photo library.
pub score: f32,
}
#[derive(Debug, Serialize)]
pub struct SearchResponse {
pub query: String,
pub model_version: String,
pub threshold: f32,
/// Total embeddings scored (= every photo in scope with a stored
/// embedding). Same value across pages of the same query.
pub considered: usize,
/// Count of results above threshold, before pagination. Lets the
/// client decide whether a "Load more" button is meaningful and
/// stop fetching when ``offset + results.len() >= total_matching``.
pub total_matching: usize,
pub offset: usize,
pub results: Vec<SearchHit>,
}
#[derive(Debug, Serialize)]
struct SearchError {
error: String,
}
/// Decode a stored `clip_embedding` BLOB back into a `Vec<f32>`. Returns
/// `None` on malformed bytes — those rows get skipped rather than
/// failing the whole query.
fn decode_embedding(bytes: &[u8]) -> Option<Vec<f32>> {
if bytes.is_empty() || bytes.len() % 4 != 0 {
return None;
}
let mut out = Vec::with_capacity(bytes.len() / 4);
for chunk in bytes.chunks_exact(4) {
out.push(f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]));
}
Some(out)
}
#[inline]
fn dot(a: &[f32], b: &[f32]) -> f32 {
a.iter().zip(b.iter()).map(|(x, y)| x * y).sum()
}
pub async fn search_photos(
state: web::Data<AppState>,
exif_dao: web::Data<Mutex<Box<dyn ExifDao>>>,
query: web::Query<SearchQuery>,
) -> ActixResult<HttpResponse> {
let q_text = query.q.trim().to_string();
if q_text.is_empty() {
return Ok(HttpResponse::BadRequest().json(SearchError {
error: "query parameter `q` is required".into(),
}));
}
if !state.clip_client.is_enabled() {
return Ok(HttpResponse::ServiceUnavailable().json(SearchError {
error: "CLIP search is disabled (no Apollo CLIP endpoint configured)".into(),
}));
}
let limit = query.limit.clamp(1, 200);
let offset = query.offset;
let threshold = query.threshold.clamp(-1.0, 1.0);
// 1. Encode the query text. Fast — Apollo's text encoder is ~50ms
// on CPU. Bail with a clear error message if Apollo's down so the
// user sees "service unavailable" rather than empty results.
let query_resp = match state.clip_client.encode_text(&q_text).await {
Ok(r) => r,
Err(ClipError::Permanent(e)) => {
return Ok(HttpResponse::BadRequest().json(SearchError {
error: format!("query rejected: {e}"),
}));
}
Err(ClipError::Transient(e)) => {
return Ok(HttpResponse::BadGateway().json(SearchError {
error: format!("CLIP service unavailable: {e}"),
}));
}
Err(ClipError::Disabled) => {
return Ok(HttpResponse::ServiceUnavailable().json(SearchError {
error: "CLIP service disabled".into(),
}));
}
};
// decode_embedding works on raw bytes; the wire format is b64.
let query_bytes = base64::engine::general_purpose::STANDARD
.decode(query_resp.embedding.as_bytes())
.unwrap_or_default();
let query_vec = match decode_embedding(&query_bytes) {
Some(v) => v,
None => {
return Ok(HttpResponse::BadGateway().json(SearchError {
error: "CLIP service returned a malformed query embedding".into(),
}));
}
};
// 2. Decide which library scope to search.
let library_ids: Vec<i32> = match query.library {
Some(id) => vec![id],
None => Vec::new(), // empty = all libraries
};
// 3. Pull the (hash, embedding) matrix. Lock contention here is
// bounded — one big SELECT under a mutex Arc<Mutex<dyn ExifDao>>
// and then we release before scoring. If this becomes a hotspot
// we'll cache the decoded matrix in AppState with TTL.
let ctx = opentelemetry::Context::current();
let rows: Vec<(String, Vec<u8>)> = {
let mut dao = exif_dao.lock().expect("exif dao");
match dao.list_clip_index(
&ctx,
&library_ids,
query
.model_version
.as_deref()
.or(Some(&query_resp.model_version)),
) {
Ok(r) => r,
Err(e) => {
log::warn!("clip_search: list_clip_index failed: {:?}", e);
return Ok(HttpResponse::InternalServerError().json(SearchError {
error: "failed to load search index".into(),
}));
}
}
};
let considered = rows.len();
if considered == 0 {
return Ok(HttpResponse::Ok().json(SearchResponse {
query: q_text,
model_version: query_resp.model_version,
threshold,
considered,
total_matching: 0,
offset,
results: Vec::new(),
}));
}
// 4. Score. Cap the loop's transient allocation; we keep all scores
// and sort at the end. With ~14k entries the sort is microseconds.
let mut scored: Vec<(f32, String)> = Vec::with_capacity(considered);
for (hash, blob) in rows {
let Some(emb) = decode_embedding(&blob) else {
continue;
};
if emb.len() != query_vec.len() {
continue;
}
let sim = dot(&emb, &query_vec);
if sim < threshold {
continue;
}
scored.push((sim, hash));
}
scored.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
let total_matching = scored.len();
// Pagination — slice the sorted list at `[offset, offset+limit)`.
// Offsets past the end produce empty pages rather than an error so
// the client can stop fetching naturally on "load more" past the end.
let scored: Vec<(f32, String)> = if offset >= total_matching {
Vec::new()
} else {
let end = (offset + limit).min(total_matching);
scored[offset..end].to_vec()
};
if scored.is_empty() {
return Ok(HttpResponse::Ok().json(SearchResponse {
query: q_text,
model_version: query_resp.model_version,
threshold,
considered,
total_matching,
offset,
results: Vec::new(),
}));
}
// 5. Resolve each surviving hash back to a `(library_id, rel_path)`.
// `get_rel_paths_by_hash` returns every rel_path; we pick the first
// one for the result. Apollo / the UI can fetch alternatives via
// /image/metadata when needed.
let hashes: Vec<String> = scored.iter().map(|(_, h)| h.clone()).collect();
let path_map = {
let mut dao = exif_dao.lock().expect("exif dao");
match dao.get_rel_paths_for_hashes(&ctx, &hashes) {
Ok(m) => m,
Err(e) => {
log::warn!("clip_search: get_rel_paths_for_hashes failed: {:?}", e);
return Ok(HttpResponse::InternalServerError().json(SearchError {
error: "failed to resolve photo paths".into(),
}));
}
}
};
// We need (library_id, rel_path) — get_rel_paths_for_hashes only
// returns rel_paths. Cross-reference via find_by_content_hash to
// pick the library too. Single call per surviving hash; cheap at
// top-20.
let mut results = Vec::with_capacity(scored.len());
{
let mut dao = exif_dao.lock().expect("exif dao");
for (score, hash) in scored {
let row = match dao.find_by_content_hash(&ctx, &hash) {
Ok(Some(r)) => r,
Ok(None) => continue,
Err(e) => {
log::warn!(
"clip_search: find_by_content_hash failed for {}: {:?}",
hash,
e
);
continue;
}
};
// Prefer get_rel_paths_for_hashes's first entry if it
// exists (it shares semantics with `image_exif`'s natural
// order), falling back to the ImageExif row.
let rel_path = path_map
.get(&hash)
.and_then(|paths| paths.first().cloned())
.unwrap_or(row.file_path);
results.push(SearchHit {
library_id: row.library_id,
rel_path,
content_hash: hash,
score,
});
}
}
Ok(HttpResponse::Ok().json(SearchResponse {
query: q_text,
model_version: query_resp.model_version,
threshold,
considered,
total_matching,
offset,
results,
}))
}

246
src/clip_watch.rs Normal file
View File

@@ -0,0 +1,246 @@
//! CLIP-encoding pass for the file watcher.
//!
//! `process_clip_backlog` in `backfill.rs` calls [`run_clip_encoding_pass`]
//! with the page of candidates returned by
//! `ExifDao::list_clip_unencoded_candidates`. We walk those, fan out K
//! parallel encode calls to Apollo, and persist the resulting embeddings
//! into `image_exif.clip_embedding` / `clip_model_version`.
//!
//! Unlike the face pipeline, CLIP has no marker rows — a permanent
//! failure (un-decodable bytes) leaves the row's `clip_embedding` NULL
//! and the drain will retry on the next tick. For personal-library
//! scale this is fine; the per-tick cap bounds the wasted work, and
//! `file_types::is_image_file` filters out videos / non-media client-
//! side so most permanent failures are decoded-but-corrupt files (rare).
//!
//! The watcher thread isn't in any pre-existing async context, so we
//! build a short-lived tokio runtime per pass and `block_on` the join
//! of K encode futures. Concurrency knob: `CLIP_ENCODE_CONCURRENCY`
//! (default 4 — lower than faces because Apollo's CLIP path doesn't
//! release the GIL between preprocess and forward as cleanly).
use crate::ai::clip_client::{ClipClient, ClipError, EncodeImageMeta};
use crate::database::ExifDao;
use crate::exif;
use crate::file_types;
use crate::libraries::Library;
use crate::memories::PathExcluder;
use log::{debug, info, warn};
use std::path::Path;
use std::sync::{Arc, Mutex};
use tokio::sync::Semaphore;
/// One file the watcher would like to CLIP-encode. Built from the DAO
/// `list_clip_unencoded_candidates` result — needs the `content_hash`
/// for traceability in Apollo's log lines, even though the embedding
/// itself is keyed on `(library_id, rel_path)` for the back-write.
#[derive(Debug, Clone)]
pub struct ClipCandidate {
pub rel_path: String,
pub content_hash: String,
}
/// Synchronous entry point. Returns once every candidate has been
/// processed (or definitively skipped). No-op when the client is
/// disabled so the caller can call unconditionally.
pub fn run_clip_encoding_pass(
library: &Library,
excluded_dirs: &[String],
clip_client: &ClipClient,
exif_dao: Arc<Mutex<Box<dyn ExifDao>>>,
candidates: Vec<ClipCandidate>,
) {
if !clip_client.is_enabled() {
return;
}
if candidates.is_empty() {
return;
}
let base = Path::new(&library.root_path);
let filtered = filter_excluded(base, excluded_dirs, candidates, Some(&library.name));
if filtered.is_empty() {
return;
}
let concurrency: usize = std::env::var("CLIP_ENCODE_CONCURRENCY")
.ok()
.and_then(|s| s.parse().ok())
.filter(|n: &usize| *n > 0)
.unwrap_or(4);
info!(
"clip_watch: encoding {} candidate(s) for library '{}' (concurrency {})",
filtered.len(),
library.name,
concurrency
);
let rt = match tokio::runtime::Builder::new_multi_thread()
.worker_threads(2)
.enable_all()
.build()
{
Ok(rt) => rt,
Err(e) => {
warn!("clip_watch: failed to build tokio runtime: {e}");
return;
}
};
let library_id = library.id;
let library_root = library.root_path.clone();
rt.block_on(async move {
let sem = Arc::new(Semaphore::new(concurrency));
let mut handles = Vec::with_capacity(filtered.len());
for cand in filtered {
let permit_sem = sem.clone();
let clip_client = clip_client.clone();
let exif_dao = exif_dao.clone();
let library_root = library_root.clone();
handles.push(tokio::spawn(async move {
let _permit = permit_sem.acquire().await.expect("clip semaphore");
process_one(library_id, &library_root, cand, &clip_client, exif_dao).await;
}));
}
for h in handles {
let _ = h.await;
}
});
}
async fn process_one(
library_id: i32,
library_root: &str,
cand: ClipCandidate,
clip_client: &ClipClient,
exif_dao: Arc<Mutex<Box<dyn ExifDao>>>,
) {
let abs = Path::new(library_root).join(&cand.rel_path);
let bytes = match read_image_bytes_for_encode(&abs) {
Ok(b) => b,
Err(e) => {
// Same rationale as face_watch: don't mark — the file may
// have been moved/renamed mid-scan; let the next pass retry.
warn!(
"clip_watch: read failed for {} (lib {}): {}",
cand.rel_path, library_id, e
);
return;
}
};
let meta = EncodeImageMeta {
content_hash: cand.content_hash.clone(),
library_id,
rel_path: cand.rel_path.clone(),
};
let ctx = opentelemetry::Context::current();
match clip_client.encode_image(bytes, meta).await {
Ok(resp) => {
let emb_bytes = match resp.decode_embedding() {
Ok(b) => b,
Err(e) => {
warn!("clip_watch: bad embedding for {}: {:?}", cand.rel_path, e);
return;
}
};
let mut dao = exif_dao.lock().expect("exif dao");
if let Err(e) = dao.backfill_clip_embedding(
&ctx,
library_id,
&cand.rel_path,
&emb_bytes,
&resp.model_version,
) {
warn!(
"clip_watch: backfill_clip_embedding failed for {}: {:?}",
cand.rel_path, e
);
return;
}
debug!(
"clip_watch: {} → dim={} ({}ms, {})",
cand.rel_path, resp.embedding_dim, resp.duration_ms, resp.model_version
);
}
Err(ClipError::Permanent(e)) => {
// No marker — the row sits with NULL embedding and the drain
// retries next pass. For personal-library scale the cost of
// re-attempting permanently-broken files is bounded by the
// per-tick cap. If this becomes a recurring noise source,
// add a `clip_status` column with `failed` semantics like
// face_detections has.
warn!(
"clip_watch: permanent failure on {} (will retry next pass): {}",
cand.rel_path, e
);
}
Err(ClipError::Transient(e)) => {
debug!(
"clip_watch: transient on {}: {} (will retry next pass)",
cand.rel_path, e
);
}
Err(ClipError::Disabled) => {
// Defensive — the entry-point already checked is_enabled().
}
}
}
/// Drop candidates whose paths land in an excluded dir or whose
/// extension isn't an image. Mirrors `face_watch::filter_excluded` so
/// the two backlogs stay shape-consistent. Library name is passed
/// purely for the log line that surfaces an exclusion hit.
pub fn filter_excluded(
base: &Path,
excluded_dirs: &[String],
candidates: Vec<ClipCandidate>,
library_name: Option<&str>,
) -> Vec<ClipCandidate> {
let excluder = if excluded_dirs.is_empty() {
None
} else {
Some(PathExcluder::new(base, excluded_dirs))
};
candidates
.into_iter()
.filter(|c| {
let abs = base.join(&c.rel_path);
if !file_types::is_image_file(&abs) {
debug!(
"clip_watch: skipping non-image '{}' (lib {})",
c.rel_path,
library_name.unwrap_or("<unknown>")
);
return false;
}
if let Some(ex) = excluder.as_ref()
&& ex.is_excluded(&abs)
{
debug!(
"clip_watch: skipping excluded '{}' (lib {})",
c.rel_path,
library_name.unwrap_or("<unknown>")
);
return false;
}
true
})
.collect()
}
/// Read image bytes for CLIP encoding. Same logic as
/// `face_watch::read_image_bytes_for_detect` — RAW / HEIC files don't
/// decode in Apollo's PIL pipeline, so we pull the embedded JPEG
/// preview the thumbnail pipeline already extracts. Plain JPEG / PNG /
/// WebP go through a direct read.
pub fn read_image_bytes_for_encode(path: &Path) -> std::io::Result<Vec<u8>> {
if file_types::needs_ffmpeg_thumbnail(path)
&& let Some(preview) = exif::extract_embedded_jpeg_preview(path)
{
return Ok(preview);
}
std::fs::read(path)
}

View File

@@ -470,6 +470,61 @@ pub trait ExifDao: Sync + Send {
source: &str,
) -> Result<(), DbError>;
/// Find image_exif rows needing a CLIP embedding for semantic search:
/// `clip_embedding IS NULL AND content_hash IS NOT NULL`, ordered by id
/// ASC, limited. Hash-less rows wait for `backfill_unhashed_backlog` to
/// hash them first — embedding a row we can't key on bytes is wasted
/// work that the next library/move detection would invalidate. Backed
/// by the partial index `idx_image_exif_clip_backfill`.
///
/// Returns `(rel_path, content_hash)` for the given library only. Video
/// rows are returned too (the underlying anti-join is shape-uniform);
/// the caller filters them out via `file_types::is_image_file` before
/// sending to Apollo, mirroring `face_watch::filter_excluded`.
///
/// **Model upgrades** (re-encoding everything on a new
/// `APOLLO_CLIP_MODEL`) are handled out-of-band — run
/// `UPDATE image_exif SET clip_embedding = NULL
/// WHERE clip_model_version != '<new model>';`
/// and the drain picks up the freshly-nulled rows on the next tick.
/// Mixing in-flight model versions in a single query is intentionally
/// not the drain's problem.
fn list_clip_unencoded_candidates(
&mut self,
context: &opentelemetry::Context,
library_id: i32,
limit: i64,
) -> Result<Vec<(String, String)>, DbError>;
/// Persist a CLIP embedding for an existing row. Touches
/// `clip_embedding` and `clip_model_version` only — leaves every
/// other column alone so the drain can't accidentally clobber EXIF /
/// hash / date-resolver state that other paths have written.
fn backfill_clip_embedding(
&mut self,
context: &opentelemetry::Context,
library_id: i32,
rel_path: &str,
embedding: &[u8],
model_version: &str,
) -> Result<(), DbError>;
/// Load every `(content_hash, clip_embedding)` pair from the live
/// image_exif rows for the given libraries, optionally filtered to a
/// single `model_version` (cosine sim across mixed geometries is
/// meaningless). Used by `/photos/search` to rerank against the query
/// embedding in-memory.
///
/// Returns one pair per content_hash. If a hash appears under more
/// than one library, the first row wins (Diesel's natural ORDER BY id
/// ASC). Hash-less and embedding-less rows are filtered server-side.
fn list_clip_index(
&mut self,
context: &opentelemetry::Context,
library_ids: &[i32],
model_version: Option<&str>,
) -> Result<Vec<(String, Vec<u8>)>, DbError>;
/// Operator-driven date_taken override (POST /image/exif/date). Snapshots
/// the prior `(date_taken, date_taken_source)` into the `original_*`
/// pair on first override, then writes the new value with
@@ -1387,6 +1442,146 @@ impl ExifDao for SqliteExifDao {
})
}
fn list_clip_unencoded_candidates(
&mut self,
context: &opentelemetry::Context,
library_id_val: i32,
limit: i64,
) -> Result<Vec<(String, String)>, DbError> {
trace_db_call(
context,
"query",
"list_clip_unencoded_candidates",
|_span| {
use schema::image_exif::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
// Partial index `idx_image_exif_clip_backfill` covers the
// (clip_embedding IS NULL AND content_hash IS NOT NULL)
// filter; the planner hits it directly. ORDER BY id ASC
// keeps drain progress monotone across ticks.
image_exif
.filter(library_id.eq(library_id_val))
.filter(clip_embedding.is_null())
.filter(content_hash.is_not_null())
.select((rel_path, content_hash.assume_not_null()))
.order(id.asc())
.limit(limit)
.load::<(String, String)>(connection.deref_mut())
.map_err(|_| anyhow::anyhow!("Query error"))
},
)
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn backfill_clip_embedding(
&mut self,
context: &opentelemetry::Context,
library_id_val: i32,
rel_path_val: &str,
embedding: &[u8],
model_version: &str,
) -> Result<(), DbError> {
trace_db_call(context, "update", "backfill_clip_embedding", |_span| {
use schema::image_exif::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
let result = diesel::update(
image_exif
.filter(library_id.eq(library_id_val))
.filter(rel_path.eq(rel_path_val)),
)
.set((
clip_embedding.eq(embedding),
clip_model_version.eq(model_version),
))
.execute(connection.deref_mut());
match result {
Ok(rows) => {
if rows == 0 {
// Same race as backfill_date_taken — row vanished
// between the candidate query and this write. Not
// a hard error; the drain re-scans next tick.
log::debug!(
"backfill_clip_embedding: 0 rows matched lib={} {} \
(row likely retired by missing-file scan)",
library_id_val,
rel_path_val
);
}
Ok(())
}
Err(e) => Err(anyhow::anyhow!(
"diesel update failed (lib={}, rel_path={}, model={}): {}",
library_id_val,
rel_path_val,
model_version,
e
)),
}
})
.map_err(|e| {
log::warn!("backfill_clip_embedding: {}", e);
DbError::new(DbErrorKind::UpdateError)
})
}
fn list_clip_index(
&mut self,
context: &opentelemetry::Context,
library_ids_val: &[i32],
model_version_filter: Option<&str>,
) -> Result<Vec<(String, Vec<u8>)>, DbError> {
trace_db_call(context, "query", "list_clip_index", |_span| {
use schema::image_exif::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
// Build the base filter. content_hash + clip_embedding both
// need to be present for the row to be searchable.
let mut query = image_exif
.filter(content_hash.is_not_null())
.filter(clip_embedding.is_not_null())
.into_boxed();
if !library_ids_val.is_empty() {
query = query.filter(library_id.eq_any(library_ids_val));
}
if let Some(mv) = model_version_filter {
query = query.filter(clip_model_version.eq(mv));
}
// Order by id ASC so cross-library duplicates pick the
// earliest-ingested row (stable across calls; the in-memory
// matrix gets a deterministic row order). Group-by on
// content_hash via post-filter — Diesel doesn't expose a
// clean DISTINCT ON in this query shape.
let rows: Vec<(String, Vec<u8>)> = query
.select((
content_hash.assume_not_null(),
clip_embedding.assume_not_null(),
))
.order(id.asc())
.load::<(String, Vec<u8>)>(connection.deref_mut())
.map_err(|_| anyhow::anyhow!("Query error"))?;
// Dedupe by hash, keeping the first occurrence. Cheap; sized
// to ~14k entries on this library.
let mut seen: std::collections::HashSet<String> =
std::collections::HashSet::with_capacity(rows.len());
let mut out = Vec::with_capacity(rows.len());
for (h, e) in rows {
if seen.insert(h.clone()) {
out.push((h, e));
}
}
Ok(out)
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn set_manual_date_taken(
&mut self,
context: &opentelemetry::Context,

View File

@@ -114,6 +114,15 @@ pub struct ImageExif {
/// Snapshot of the prior `date_taken_source` taken on first manual
/// override. NULL when no override is active.
pub original_date_taken_source: Option<String>,
/// L2-normalized CLIP image embedding (raw little-endian float32 bytes;
/// length depends on the model — 768×4 for ViT-L/14, 512×4 for ViT-B/32).
/// NULL until Apollo's CLIP service has encoded this photo via the
/// backfill drain. Used by `/photos/search` for semantic queries.
pub clip_embedding: Option<Vec<u8>>,
/// Which CLIP model produced `clip_embedding` (e.g. `"ViT-L/14"`). A
/// swap of `APOLLO_CLIP_MODEL` re-eligibilizes rows whose stored
/// version differs so the drain rebuilds them.
pub clip_model_version: Option<String>,
}
#[derive(Insertable)]

View File

@@ -138,6 +138,8 @@ diesel::table! {
date_taken_source -> Nullable<Text>,
original_date_taken -> Nullable<BigInt>,
original_date_taken_source -> Nullable<Text>,
clip_embedding -> Nullable<Binary>,
clip_model_version -> Nullable<Text>,
}
}

View File

@@ -1511,6 +1511,8 @@ mod tests {
date_taken_source,
original_date_taken: None,
original_date_taken_source: None,
clip_embedding: None,
clip_model_version: None,
}
}
@@ -1550,6 +1552,8 @@ mod tests {
date_taken_source: data.date_taken_source.clone(),
original_date_taken: None,
original_date_taken_source: None,
clip_embedding: None,
clip_model_version: None,
})
}
@@ -1596,6 +1600,8 @@ mod tests {
date_taken_source: data.date_taken_source.clone(),
original_date_taken: None,
original_date_taken_source: None,
clip_embedding: None,
clip_model_version: None,
})
}
@@ -1932,6 +1938,35 @@ mod tests {
) -> Result<(), DbError> {
Ok(())
}
fn list_clip_unencoded_candidates(
&mut self,
_context: &opentelemetry::Context,
_library_id: i32,
_limit: i64,
) -> Result<Vec<(String, String)>, DbError> {
Ok(Vec::new())
}
fn backfill_clip_embedding(
&mut self,
_context: &opentelemetry::Context,
_library_id: i32,
_rel_path: &str,
_embedding: &[u8],
_model_version: &str,
) -> Result<(), DbError> {
Ok(())
}
fn list_clip_index(
&mut self,
_context: &opentelemetry::Context,
_library_ids: &[i32],
_model_version: Option<&str>,
) -> Result<Vec<(String, Vec<u8>)>, DbError> {
Ok(Vec::new())
}
}
mod api {

View File

@@ -7,6 +7,8 @@ pub mod ai;
pub mod auth;
pub mod bin_progress;
pub mod cleanup;
pub mod clip_search;
pub mod clip_watch;
pub mod content_hash;
pub mod data;
pub mod database;

View File

@@ -31,6 +31,8 @@ use log::{error, info};
mod ai;
mod auth;
mod backfill;
mod clip_search;
mod clip_watch;
mod content_hash;
mod data;
mod database;
@@ -164,6 +166,7 @@ fn main() -> std::io::Result<()> {
playlist_mgr_for_watcher,
preview_gen_for_watcher,
app_state.face_client.clone(),
app_state.clip_client.clone(),
app_state.excluded_dirs.clone(),
app_state.library_health.clone(),
);
@@ -280,6 +283,12 @@ fn main() -> std::io::Result<()> {
.service(
web::resource("/photos/exif").route(web::get().to(files::list_exif_summary)),
)
.service(
// Semantic search via CLIP embeddings. See
// src/clip_search.rs for the request/response shape.
web::resource("/photos/search")
.route(web::get().to(clip_search::search_photos)),
)
.service(web::resource("/file/move").post(move_file::<RealFileSystem>))
.service(handlers::image::get_image)
.service(handlers::image::upload_image)

View File

@@ -1,4 +1,5 @@
use crate::ai::apollo_client::ApolloClient;
use crate::ai::clip_client::ClipClient;
use crate::ai::face_client::FaceClient;
use crate::ai::insight_chat::{ChatLockMap, InsightChatService};
use crate::ai::openrouter::OpenRouterClient;
@@ -70,6 +71,10 @@ pub struct AppState {
/// nor `APOLLO_API_BASE_URL` is set; the file-watch hook (Phase 3) and
/// manual-face-create handler short-circuit in that case.
pub face_client: FaceClient,
/// CLIP inference client (calls Apollo's `/api/internal/clip/*`).
/// Same disabled semantics as `face_client`: unset env → no-op
/// backlog drain, /photos/search returns an empty result.
pub clip_client: ClipClient,
}
impl AppState {
@@ -105,6 +110,7 @@ impl AppState {
insight_chat: Arc<InsightChatService>,
preview_dao: Arc<Mutex<Box<dyn PreviewDao>>>,
face_client: FaceClient,
clip_client: ClipClient,
) -> Self {
assert!(
!libraries_vec.is_empty(),
@@ -143,6 +149,7 @@ impl AppState {
insight_generator,
insight_chat,
face_client,
clip_client,
}
}
@@ -198,6 +205,9 @@ impl Default for AppState {
.or_else(|| env::var("APOLLO_API_BASE_URL").ok());
let face_client = FaceClient::new(face_client_url);
// CLIP inference client. Same env var fallback as face_client.
let clip_client = ClipClient::from_env();
// Initialize DAOs
let insight_dao: Arc<Mutex<Box<dyn InsightDao>>> =
Arc::new(Mutex::new(Box::new(SqliteInsightDao::new())));
@@ -289,6 +299,7 @@ impl Default for AppState {
insight_chat,
preview_dao,
face_client,
clip_client,
)
}
}
@@ -439,6 +450,7 @@ impl AppState {
insight_chat,
preview_dao,
FaceClient::new(None), // disabled in test
ClipClient::new(None), // disabled in test
)
}
}

View File

@@ -268,6 +268,7 @@ pub fn watch_files(
playlist_manager: Addr<VideoPlaylistManager>,
preview_generator: Addr<video::actors::PreviewClipGenerator>,
face_client: crate::ai::face_client::FaceClient,
clip_client: crate::ai::clip_client::ClipClient,
excluded_dirs: Vec<String>,
library_health: libraries::LibraryHealthMap,
) {
@@ -300,6 +301,14 @@ pub fn watch_files(
or APOLLO_API_BASE_URL to enable)"
);
}
if clip_client.is_enabled() {
info!(" CLIP semantic search: ENABLED");
} else {
info!(
" CLIP semantic search: DISABLED (set APOLLO_CLIP_API_BASE_URL \
or APOLLO_API_BASE_URL to enable)"
);
}
{
let libs = libs_lock.read().unwrap_or_else(|e| e.into_inner());
for lib in libs.iter() {
@@ -463,6 +472,21 @@ pub fn watch_files(
);
}
// CLIP embedding backlog. Independent of face detection —
// drain runs whenever CLIP is enabled, even on deploys
// that don't have the face engine wired up. Mirrors the
// face drain shape (capped per tick, no-op when disabled).
if clip_client.is_enabled() {
let context = opentelemetry::Context::new();
backfill::process_clip_backlog(
&context,
lib,
&clip_client,
&exif_dao,
&effective_excludes,
);
}
// Date-taken backfill: drain rows whose canonical date is
// either unresolved or only fs_time-sourced. Independent
// of face detection — runs even on deploys that don't