From 8d9e76cf15d1815549c3def67b43ed27317a8908 Mon Sep 17 00:00:00 2001 From: Cameron Cordes Date: Thu, 14 May 2026 12:54:07 -0400 Subject: [PATCH 1/5] clip-search: migration + client + probe binary MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Probe-phase scaffolding for CLIP semantic search. Adds the column that will hold per-photo embeddings, the HTTP client to Apollo's inference service, and a throwaway probe binary so we can eyeball search-result quality on the live library before building the persistence layer (backlog drain, /photos/search endpoint, UI). - migrations/2026-05-14-000000_add_clip_embedding/ — adds image_exif.clip_embedding (BLOB) and clip_model_version (TEXT), plus a partial index on (clip_embedding IS NULL AND content_hash IS NOT NULL) for the future backfill drain. - src/database/models.rs — extends ImageExif struct to match. - src/ai/clip_client.rs — encode_image / encode_text / health, same Permanent/Transient/Disabled taxonomy as face_client. - src/bin/probe_clip_search.rs — --query --library N --limit M --top K. Encodes a sample and prints top-K cosine similarities. No DB writes. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../down.sql | 3 + .../up.sql | 27 ++ src/ai/clip_client.rs | 393 ++++++++++++++++++ src/ai/insight_chat.rs | 2 + src/ai/mod.rs | 1 + src/bin/probe_clip_search.rs | 268 ++++++++++++ src/database/models.rs | 9 + src/database/schema.rs | 2 + src/files.rs | 6 + 9 files changed, 711 insertions(+) create mode 100644 migrations/2026-05-14-000000_add_clip_embedding/down.sql create mode 100644 migrations/2026-05-14-000000_add_clip_embedding/up.sql create mode 100644 src/ai/clip_client.rs create mode 100644 src/bin/probe_clip_search.rs diff --git a/migrations/2026-05-14-000000_add_clip_embedding/down.sql b/migrations/2026-05-14-000000_add_clip_embedding/down.sql new file mode 100644 index 0000000..e0dc48b --- /dev/null +++ b/migrations/2026-05-14-000000_add_clip_embedding/down.sql @@ -0,0 +1,3 @@ +DROP INDEX IF EXISTS idx_image_exif_clip_backfill; +ALTER TABLE image_exif DROP COLUMN clip_model_version; +ALTER TABLE image_exif DROP COLUMN clip_embedding; diff --git a/migrations/2026-05-14-000000_add_clip_embedding/up.sql b/migrations/2026-05-14-000000_add_clip_embedding/up.sql new file mode 100644 index 0000000..c14d6da --- /dev/null +++ b/migrations/2026-05-14-000000_add_clip_embedding/up.sql @@ -0,0 +1,27 @@ +-- CLIP semantic photo search: store a per-photo image embedding so +-- text queries can rerank against the live library via cosine +-- similarity. Apollo encodes the bytes via its CLIP service; ImageApi +-- writes the resulting blob here. +-- +-- `clip_embedding` is the raw little-endian float32 buffer of an +-- L2-normalized vector (dim depends on the model — 768 bytes×4 for +-- ViT-L/14, 512 bytes×4 for ViT-B/32). Apollo always returns the +-- normalized form so the search-time dot product reduces to a plain +-- cosine similarity. +-- +-- `clip_model_version` echoes the upstream `APOLLO_CLIP_MODEL` (e.g. +-- "ViT-L/14"). A model swap shouldn't silently mix geometries — the +-- backfill drain will re-eligibilize rows whose stored model_version +-- differs from the live engine's, and the search route refuses to +-- mix rows from two model_versions in the same response. +ALTER TABLE image_exif ADD COLUMN clip_embedding BLOB; +ALTER TABLE image_exif ADD COLUMN clip_model_version TEXT; + +-- Partial index for the backfill drain. Mirrors the shape of +-- `idx_image_exif_date_backfill`: candidate rows are those with a +-- known content_hash (so we don't race the unhashed backlog) but no +-- embedding yet. SELECT cost stays O(missing rows) instead of full +-- table scan once the column is mostly populated. +CREATE INDEX IF NOT EXISTS idx_image_exif_clip_backfill + ON image_exif (id) + WHERE clip_embedding IS NULL AND content_hash IS NOT NULL; diff --git a/src/ai/clip_client.rs b/src/ai/clip_client.rs new file mode 100644 index 0000000..9b38a2a --- /dev/null +++ b/src/ai/clip_client.rs @@ -0,0 +1,393 @@ +//! Thin async HTTP client for Apollo's `/api/internal/clip/*` endpoints. +//! +//! Apollo hosts the OpenAI CLIP inference service (ViT-L/14 by default, +//! configurable via `APOLLO_CLIP_MODEL`). This client is the ImageApi side +//! of the contract: shove image bytes through `/encode_image` to populate +//! `image_exif.clip_embedding` during backfill, and call `/encode_text` to +//! encode a user's natural-language query at search time. The actual +//! cosine-similarity rerank runs locally in ImageApi. +//! +//! Mirrors `face_client.rs` / `tag_client.rs` shape: optional base URL +//! (None = disabled — feature off, drain and search no-op), reqwest +//! client with a generous timeout because GPU inference under a backlog +//! can queue server-side (Apollo's threadpool is bounded to 1 worker on +//! CUDA). +//! +//! Configured via `APOLLO_CLIP_API_BASE_URL`, falling back to +//! `APOLLO_API_BASE_URL` when the dedicated var is unset (single-Apollo +//! deploys are the common case). +//! +//! Wire format: +//! - `/encode_image`: multipart/form-data with `file=` and +//! `meta=` (content_hash / library_id / rel_path for logging). +//! - `/encode_text`: JSON `{"text": ""}`. +//! +//! Both return `{model_version, embedding_dim, duration_ms, embedding}` +//! where `embedding` is base64 of `dim×4` little-endian float32 bytes, +//! L2-normalized so the rerank reduces to a plain dot product. +//! +//! Error mapping (reflected in [`ClipError`]): +//! - 422 `decode_failed` / `empty_text` → permanent: ImageApi marks the +//! row failed or surfaces the empty-query error to the search caller. +//! - 503 `cuda_oom` / `engine_unavailable` → defer-and-retry: no marker. +//! - Any other 5xx / network error → defer. + +use anyhow::{Context, Result}; +use base64::Engine; +use reqwest::Client; +use serde::{Deserialize, Serialize}; +use std::time::Duration; + +#[derive(Debug, Clone, Serialize)] +pub struct EncodeImageMeta { + pub content_hash: String, + pub library_id: i32, + pub rel_path: String, +} + +#[derive(Debug, Clone, Deserialize)] +#[allow(dead_code)] // duration_ms logged by the backfill drain +pub struct EncodeResponse { + pub model_version: String, + pub embedding_dim: i32, + pub duration_ms: i64, + /// base64 of `embedding_dim * 4` bytes (LE float32). ImageApi stores + /// the decoded bytes verbatim as a BLOB. + pub embedding: String, +} + +impl EncodeResponse { + /// Decode the wire-format embedding back into raw bytes for storage. + /// Validates the buffer is `embedding_dim * 4` bytes long so a + /// malformed response surfaces here rather than as a downstream + /// silent length mismatch. + pub fn decode_embedding(&self) -> Result> { + let bytes = base64::engine::general_purpose::STANDARD + .decode(self.embedding.as_bytes()) + .context("clip embedding base64 decode")?; + let expected = (self.embedding_dim as usize) * 4; + if bytes.len() != expected { + anyhow::bail!( + "clip embedding wrong size: got {} bytes, expected {} ({} * 4)", + bytes.len(), + expected, + self.embedding_dim + ); + } + Ok(bytes) + } +} + +#[derive(Debug, Clone, Deserialize)] +#[allow(dead_code)] // load_error consumed by future health probe +pub struct ClipHealth { + pub loaded: bool, + pub device: String, + pub model_version: String, + pub embedding_dim: i32, + #[serde(default)] + pub load_error: Option, +} + +#[derive(Debug)] +pub enum ClipError { + /// Apollo refused for a reason that won't change on retry (decode + /// failure on /encode_image, empty text on /encode_text). + Permanent(anyhow::Error), + /// Apollo couldn't process this turn but might next time (CUDA OOM, + /// engine not loaded, network hiccup). + Transient(anyhow::Error), + /// Feature is disabled (no `APOLLO_CLIP_API_BASE_URL` / + /// `APOLLO_API_BASE_URL`). + Disabled, +} + +impl std::fmt::Display for ClipError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ClipError::Permanent(e) => write!(f, "permanent: {e}"), + ClipError::Transient(e) => write!(f, "transient: {e}"), + ClipError::Disabled => write!(f, "clip client disabled"), + } + } +} + +impl std::error::Error for ClipError {} + +#[derive(Clone)] +pub struct ClipClient { + client: Client, + base_url: Option, +} + +impl ClipClient { + pub fn new(base_url: Option) -> Self { + let timeout_secs = std::env::var("CLIP_REQUEST_TIMEOUT_SEC") + .ok() + .and_then(|s| s.parse::().ok()) + .unwrap_or(60); + let client = Client::builder() + .timeout(Duration::from_secs(timeout_secs)) + .build() + .expect("reqwest client build"); + Self { + client, + base_url: base_url.map(|u| u.trim_end_matches('/').to_string()), + } + } + + /// Read both standard env vars. `APOLLO_CLIP_API_BASE_URL` wins; + /// fallback to `APOLLO_API_BASE_URL`. Both unset → disabled. + pub fn from_env() -> Self { + let base = std::env::var("APOLLO_CLIP_API_BASE_URL") + .ok() + .filter(|s| !s.trim().is_empty()) + .or_else(|| { + std::env::var("APOLLO_API_BASE_URL") + .ok() + .filter(|s| !s.trim().is_empty()) + }); + Self::new(base) + } + + pub fn is_enabled(&self) -> bool { + self.base_url.is_some() + } + + /// Encode an image to a 768-d (ViT-L/14) or 512-d (ViT-B/32) + /// L2-normalized embedding. Used by the backfill drain. + pub async fn encode_image( + &self, + bytes: Vec, + meta: EncodeImageMeta, + ) -> std::result::Result { + let Some(base) = self.base_url.as_deref() else { + return Err(ClipError::Disabled); + }; + let url = format!("{}/api/internal/clip/encode_image", base); + let meta_json = serde_json::to_string(&meta) + .map_err(|e| ClipError::Permanent(anyhow::anyhow!("meta serialize: {e}")))?; + let form = reqwest::multipart::Form::new() + .text("meta", meta_json) + .part( + "file", + reqwest::multipart::Part::bytes(bytes) + .file_name(meta.rel_path.clone()) + .mime_str("application/octet-stream") + .unwrap_or_else(|_| reqwest::multipart::Part::bytes(Vec::new())), + ); + self.send_multipart(&url, form).await + } + + /// Encode a natural-language query to an embedding. Used by the + /// search route to rank stored image embeddings by cosine sim. + pub async fn encode_text( + &self, + text: &str, + ) -> std::result::Result { + let Some(base) = self.base_url.as_deref() else { + return Err(ClipError::Disabled); + }; + let url = format!("{}/api/internal/clip/encode_text", base); + let body = serde_json::json!({ "text": text }); + + let resp = match self.client.post(&url).json(&body).send().await { + Ok(r) => r, + Err(e) if e.is_timeout() || e.is_connect() => { + return Err(ClipError::Transient(anyhow::anyhow!( + "clip client network: {e}" + ))); + } + Err(e) => { + return Err(ClipError::Transient(anyhow::anyhow!( + "clip client request: {e}" + ))); + } + }; + let status = resp.status(); + if status.is_success() { + let body: EncodeResponse = resp.json().await.map_err(|e| { + ClipError::Transient(anyhow::anyhow!("clip response decode: {e}")) + })?; + return Ok(body); + } + let body_text = resp.text().await.unwrap_or_default(); + Err(classify_error_response(status.as_u16(), &body_text)) + } + + /// Engine reachability + device/model report. Used as a startup + /// sanity check from the probe binary and (later) the backlog drain. + #[allow(dead_code)] // consumed by probe + drain + pub async fn health(&self) -> Result { + let base = self.base_url.as_deref().context("clip client disabled")?; + let url = format!("{}/api/internal/clip/health", base); + let resp = self.client.get(&url).send().await?.error_for_status()?; + let body: ClipHealth = resp.json().await?; + Ok(body) + } + + async fn send_multipart( + &self, + url: &str, + form: reqwest::multipart::Form, + ) -> std::result::Result { + let resp = match self.client.post(url).multipart(form).send().await { + Ok(r) => r, + Err(e) if e.is_timeout() || e.is_connect() => { + return Err(ClipError::Transient(anyhow::anyhow!( + "clip client network: {e}" + ))); + } + Err(e) => { + return Err(ClipError::Transient(anyhow::anyhow!( + "clip client request: {e}" + ))); + } + }; + let status = resp.status(); + if status.is_success() { + let body: EncodeResponse = resp.json().await.map_err(|e| { + ClipError::Transient(anyhow::anyhow!("clip response decode: {e}")) + })?; + return Ok(body); + } + let body_text = resp.text().await.unwrap_or_default(); + Err(classify_error_response(status.as_u16(), &body_text)) + } +} + +/// Pulled out as a pure function so the marker-row contract is unit- +/// testable without spinning up an HTTP server. Matches the shape used +/// by face_client::classify_error_response so future retry policies +/// can share code. +fn classify_error_response(status: u16, body_text: &str) -> ClipError { + let detail_code = serde_json::from_str::(body_text) + .ok() + .and_then(|v| { + v.get("detail") + .and_then(|d| d.as_str().map(str::to_string)) + .or_else(|| { + v.get("detail") + .and_then(|d| d.get("code")) + .and_then(|c| c.as_str()) + .map(str::to_string) + }) + }) + .unwrap_or_default(); + + if status == 422 { + return ClipError::Permanent(anyhow::anyhow!( + "clip {} {}: {}", + status, + detail_code, + body_text + )); + } + if status == 503 { + return ClipError::Transient(anyhow::anyhow!( + "clip {} {}: {}", + status, + detail_code, + body_text + )); + } + // 408 / 413 / 429 are operator-fixable infra issues; defer. + if matches!(status, 408 | 413 | 429) { + return ClipError::Transient(anyhow::anyhow!( + "clip {} {}: {}", + status, + detail_code, + body_text + )); + } + if (400..500).contains(&status) { + ClipError::Permanent(anyhow::anyhow!( + "clip {} {}: {}", + status, + detail_code, + body_text + )) + } else { + ClipError::Transient(anyhow::anyhow!( + "clip {} {}: {}", + status, + detail_code, + body_text + )) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn is_permanent(e: &ClipError) -> bool { + matches!(e, ClipError::Permanent(_)) + } + fn is_transient(e: &ClipError) -> bool { + matches!(e, ClipError::Transient(_)) + } + + #[test] + fn classify_422_decode_failed_is_permanent() { + assert!(is_permanent(&classify_error_response( + 422, + r#"{"detail":"decode_failed: bad bytes"}"# + ))); + } + + #[test] + fn classify_422_empty_text_is_permanent() { + assert!(is_permanent(&classify_error_response( + 422, + r#"{"detail":"empty_text"}"# + ))); + } + + #[test] + fn classify_503_cuda_oom_is_transient() { + assert!(is_transient(&classify_error_response( + 503, + r#"{"detail":{"code":"cuda_oom","error":"out of memory"}}"#, + ))); + } + + #[test] + fn classify_5xx_is_transient_other_4xx_is_permanent() { + assert!(is_transient(&classify_error_response(500, ""))); + assert!(is_permanent(&classify_error_response(404, "{}"))); + } + + #[test] + fn classify_infra_4xx_is_transient() { + assert!(is_transient(&classify_error_response(408, ""))); + assert!(is_transient(&classify_error_response(413, ""))); + assert!(is_transient(&classify_error_response(429, "{}"))); + } + + #[test] + fn decode_embedding_size_mismatch_errors() { + // dim=4 says we expect 16 bytes (4 floats × 4 bytes). Encode 8. + use base64::Engine; + let resp = EncodeResponse { + model_version: "ViT-L/14".into(), + embedding_dim: 4, + duration_ms: 0, + embedding: base64::engine::general_purpose::STANDARD.encode([0u8; 8]), + }; + assert!(resp.decode_embedding().is_err()); + } + + #[test] + fn decode_embedding_round_trip() { + use base64::Engine; + let bytes: Vec = (0..16).collect(); + let resp = EncodeResponse { + model_version: "ViT-L/14".into(), + embedding_dim: 4, + duration_ms: 0, + embedding: base64::engine::general_purpose::STANDARD.encode(&bytes), + }; + assert_eq!(resp.decode_embedding().unwrap(), bytes); + } +} diff --git a/src/ai/insight_chat.rs b/src/ai/insight_chat.rs index adb2da4..b2a7af8 100644 --- a/src/ai/insight_chat.rs +++ b/src/ai/insight_chat.rs @@ -2184,6 +2184,8 @@ mod tests { date_taken_source: None, original_date_taken: None, original_date_taken_source: None, + clip_embedding: None, + clip_model_version: None, }); let out = resolve_date_taken_for_context(&exif, "Screenshot_2014-06-01.png"); assert_eq!(out.as_deref(), Some("2021-08-15")); diff --git a/src/ai/mod.rs b/src/ai/mod.rs index d6fda90..3468325 100644 --- a/src/ai/mod.rs +++ b/src/ai/mod.rs @@ -1,4 +1,5 @@ pub mod apollo_client; +pub mod clip_client; pub mod daily_summary_job; pub mod face_client; pub mod handlers; diff --git a/src/bin/probe_clip_search.rs b/src/bin/probe_clip_search.rs new file mode 100644 index 0000000..45686ed --- /dev/null +++ b/src/bin/probe_clip_search.rs @@ -0,0 +1,268 @@ +//! Probe binary for CLIP semantic search. +//! +//! No DB writes. Walks a library's `image_exif` rows, encodes a sample +//! via Apollo's `/encode_image`, encodes the user's --query via +//! `/encode_text`, and prints the top-K most similar photos by cosine +//! similarity so the operator can eyeball quality before committing to +//! the persistence phase (column populated by backlog drain, search +//! endpoint, UI). +//! +//! Usage: +//! cargo run --release --bin probe_clip_search -- \ +//! --library 1 --limit 200 --query "a beach at sunset" --top 10 +//! +//! Env: standard ImageApi `.env`. Requires either +//! `APOLLO_CLIP_API_BASE_URL` or `APOLLO_API_BASE_URL` to be set. + +use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; +use std::time::Instant; + +use clap::Parser; +use log::{info, warn}; + +use image_api::ai::clip_client::{ClipClient, ClipError, EncodeImageMeta}; +use image_api::database::{ExifDao, SqliteExifDao, connect}; +use image_api::exif; +use image_api::file_types; +use image_api::libraries::{self, Library}; + +#[derive(Parser, Debug)] +#[command(name = "probe_clip_search")] +#[command(about = "Top-K CLIP semantic search over a sample of image_exif rows")] +struct Args { + /// Library id to sample from. + #[arg(long)] + library: i32, + + /// Max files to encode. CPU inference is slow (~1-3 s per photo at + /// ViT-L/14); start small and grow once GPU is sorted. + #[arg(long, default_value_t = 50)] + limit: usize, + + /// Natural-language query. Empty triggers an error from Apollo. + #[arg(long)] + query: String, + + /// How many top results to print. + #[arg(long, default_value_t = 10)] + top: usize, + + /// Offset into the library's rel_path listing. + #[arg(long, default_value_t = 0)] + offset: i64, + + /// How many DB rows to scan before giving up on hitting the limit. + #[arg(long, default_value_t = 5000)] + max_scan: i64, +} + +/// Same as `face_watch::read_image_bytes_for_detect` (which is pub(crate)). +/// Inlined for the throwaway probe. +fn read_image_bytes(path: &Path) -> std::io::Result> { + if file_types::needs_ffmpeg_thumbnail(path) + && let Some(preview) = exif::extract_embedded_jpeg_preview(path) + { + return Ok(preview); + } + std::fs::read(path) +} + +/// Decode a base64'd LE float32 vector to a `Vec`. +fn decode_f32_vec(b64: &str) -> anyhow::Result> { + use base64::Engine; + let bytes = base64::engine::general_purpose::STANDARD.decode(b64.as_bytes())?; + if bytes.len() % 4 != 0 { + anyhow::bail!("embedding byte length {} not divisible by 4", bytes.len()); + } + let mut out = Vec::with_capacity(bytes.len() / 4); + for chunk in bytes.chunks_exact(4) { + out.push(f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]])); + } + Ok(out) +} + +/// Plain dot product. Apollo L2-normalizes both sides, so this is cosine sim. +fn dot(a: &[f32], b: &[f32]) -> f32 { + a.iter().zip(b.iter()).map(|(x, y)| x * y).sum() +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + env_logger::init(); + dotenv::dotenv().ok(); + + let args = Args::parse(); + if args.query.trim().is_empty() { + anyhow::bail!("--query must not be empty"); + } + + let client = ClipClient::from_env(); + if !client.is_enabled() { + anyhow::bail!( + "ClipClient disabled: set APOLLO_CLIP_API_BASE_URL or APOLLO_API_BASE_URL in .env" + ); + } + + match client.health().await { + Ok(h) => info!( + "clip engine: loaded={} device={} model={} dim={}", + h.loaded, h.device, h.model_version, h.embedding_dim + ), + Err(e) => warn!("health probe failed (continuing): {e}"), + } + + let mut seed_conn = connect(); + if let Some(base) = dotenv::var("BASE_PATH").ok().as_deref() { + libraries::seed_or_patch_from_env(&mut seed_conn, base); + } + let libs = libraries::load_all(&mut seed_conn); + drop(seed_conn); + let lib: Library = libs + .into_iter() + .find(|l| l.id == args.library) + .ok_or_else(|| anyhow::anyhow!("library id {} not found", args.library))?; + info!("probing library #{} ({}) at {}", lib.id, lib.name, lib.root_path); + + let dao: Arc>> = Arc::new(Mutex::new(Box::new(SqliteExifDao::new()))); + let ctx = opentelemetry::Context::new(); + + // Encode the query up-front so the long image-encode loop doesn't + // race a slow query encode. Fails fast on a misspelled query. + let query_resp = client + .encode_text(&args.query) + .await + .map_err(|e| anyhow::anyhow!("encode_text: {e}"))?; + let query_vec = decode_f32_vec(&query_resp.embedding)?; + info!( + "query encoded ({}d, {}ms): {:?}", + query_resp.embedding_dim, + query_resp.duration_ms, + args.query + ); + + // Page through (id, rel_path), filter to images on disk, encode up + // to `limit`. Each encoded photo gets scored against the query and + // kept in a top-K heap. + const PAGE: i64 = 500; + let mut offset = args.offset; + let mut scanned: i64 = 0; + let mut encoded = 0usize; + let mut perm_fail = 0usize; + let mut transient_fail = 0usize; + let root = PathBuf::from(&lib.root_path); + let started = Instant::now(); + // (similarity, rel_path) — we keep all scored results and sort at + // the end. With limit≤few-hundred this is trivial. + let mut scores: Vec<(f32, String)> = Vec::with_capacity(args.limit); + + 'outer: loop { + if scanned >= args.max_scan { + warn!( + "scan cap ({}) reached before hitting limit ({}); bump --max-scan to scan deeper", + args.max_scan, args.limit + ); + break; + } + let rows = { + let mut guard = dao.lock().expect("dao lock"); + guard + .list_rel_paths_for_library_page(&ctx, lib.id, PAGE, offset) + .map_err(|e| anyhow::anyhow!("list rel_paths: {:?}", e))? + }; + if rows.is_empty() { + info!("no more rows after offset {}", offset); + break; + } + offset += rows.len() as i64; + scanned += rows.len() as i64; + + for (_id, rel_path) in rows { + if encoded >= args.limit { + break 'outer; + } + let abs = root.join(&rel_path); + if !file_types::is_image_file(&abs) || !abs.exists() { + continue; + } + let bytes = match read_image_bytes(&abs) { + Ok(b) => b, + Err(e) => { + warn!("read {rel_path}: {e}"); + continue; + } + }; + let meta = EncodeImageMeta { + content_hash: String::new(), + library_id: lib.id, + rel_path: rel_path.clone(), + }; + let call_start = Instant::now(); + match client.encode_image(bytes, meta).await { + Ok(resp) => { + encoded += 1; + let vec = match decode_f32_vec(&resp.embedding) { + Ok(v) => v, + Err(e) => { + warn!("decode {rel_path}: {e}"); + continue; + } + }; + if vec.len() != query_vec.len() { + warn!( + "dim mismatch for {rel_path}: image={} query={}", + vec.len(), + query_vec.len() + ); + continue; + } + let sim = dot(&vec, &query_vec); + scores.push((sim, rel_path.clone())); + if encoded % 10 == 0 { + info!( + "progress: {} encoded, {:.1}s elapsed", + encoded, + started.elapsed().as_secs_f32() + ); + } + let _ = call_start; + } + Err(ClipError::Permanent(e)) => { + perm_fail += 1; + warn!("permanent encode failure for {rel_path}: {e}"); + } + Err(ClipError::Transient(e)) => { + transient_fail += 1; + warn!("transient encode failure for {rel_path}: {e}"); + } + Err(ClipError::Disabled) => { + anyhow::bail!("clip client became disabled mid-run; impossible"); + } + } + } + } + + scores.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal)); + let elapsed = started.elapsed(); + println!(); + println!("── top {} for query: {:?} ──", args.top.min(scores.len()), args.query); + for (i, (sim, path)) in scores.iter().take(args.top).enumerate() { + println!("[{:>2}] sim={:.3} {}", i + 1, sim, path); + } + println!(); + println!("── summary ─────────────────────────────────────"); + println!("query : {:?}", args.query); + println!("scanned rows : {scanned}"); + println!("encoded photos : {encoded}"); + println!("permanent failures : {perm_fail}"); + println!("transient failures : {transient_fail}"); + println!("elapsed : {:.1}s", elapsed.as_secs_f32()); + if encoded > 0 { + println!( + "throughput : {:.2} photos/s ({:.0}ms/photo avg)", + encoded as f32 / elapsed.as_secs_f32().max(0.001), + elapsed.as_millis() as f32 / encoded as f32 + ); + } + Ok(()) +} diff --git a/src/database/models.rs b/src/database/models.rs index b5e1c1e..9d005f5 100644 --- a/src/database/models.rs +++ b/src/database/models.rs @@ -114,6 +114,15 @@ pub struct ImageExif { /// Snapshot of the prior `date_taken_source` taken on first manual /// override. NULL when no override is active. pub original_date_taken_source: Option, + /// L2-normalized CLIP image embedding (raw little-endian float32 bytes; + /// length depends on the model — 768×4 for ViT-L/14, 512×4 for ViT-B/32). + /// NULL until Apollo's CLIP service has encoded this photo via the + /// backfill drain. Used by `/photos/search` for semantic queries. + pub clip_embedding: Option>, + /// Which CLIP model produced `clip_embedding` (e.g. `"ViT-L/14"`). A + /// swap of `APOLLO_CLIP_MODEL` re-eligibilizes rows whose stored + /// version differs so the drain rebuilds them. + pub clip_model_version: Option, } #[derive(Insertable)] diff --git a/src/database/schema.rs b/src/database/schema.rs index d001d80..28b5d26 100644 --- a/src/database/schema.rs +++ b/src/database/schema.rs @@ -138,6 +138,8 @@ diesel::table! { date_taken_source -> Nullable, original_date_taken -> Nullable, original_date_taken_source -> Nullable, + clip_embedding -> Nullable, + clip_model_version -> Nullable, } } diff --git a/src/files.rs b/src/files.rs index 91904e3..522f042 100644 --- a/src/files.rs +++ b/src/files.rs @@ -1511,6 +1511,8 @@ mod tests { date_taken_source, original_date_taken: None, original_date_taken_source: None, + clip_embedding: None, + clip_model_version: None, } } @@ -1550,6 +1552,8 @@ mod tests { date_taken_source: data.date_taken_source.clone(), original_date_taken: None, original_date_taken_source: None, + clip_embedding: None, + clip_model_version: None, }) } @@ -1596,6 +1600,8 @@ mod tests { date_taken_source: data.date_taken_source.clone(), original_date_taken: None, original_date_taken_source: None, + clip_embedding: None, + clip_model_version: None, }) } From 32195ed89ed495a890467b73dbbd504066412a15 Mon Sep 17 00:00:00 2001 From: Cameron Cordes Date: Thu, 14 May 2026 14:00:41 -0400 Subject: [PATCH 2/5] clip-search: backlog drain + /photos/search endpoint MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wires the persistence layer for CLIP semantic search. The watcher's per-tick drain encodes any image_exif row with a known content_hash but no clip_embedding via Apollo (cap CLIP_BACKLOG_MAX_PER_TICK, default 32). On a query, /photos/search encodes the text via Apollo and reranks every stored embedding in-memory. ExifDao additions: - list_clip_unencoded_candidates — partial-index scan for drain - backfill_clip_embedding — touches only the two new columns - list_clip_index — dedup'd (hash, embedding) pull for search clip_watch::run_clip_encoding_pass is the parallel fan-out — tokio runtime per pass with CLIP_ENCODE_CONCURRENCY (default 4). No marker rows for permanent failures yet; per-tick cap bounds the retry cost. /photos/search params: q, limit, threshold (default 0.20), library, model_version. Response is intentionally minimal (path + score) so the frontend joins against existing photo-metadata routes lazily. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/backfill.rs | 68 +++++++++++ src/clip_search.rs | 288 ++++++++++++++++++++++++++++++++++++++++++++ src/clip_watch.rs | 249 ++++++++++++++++++++++++++++++++++++++ src/database/mod.rs | 195 ++++++++++++++++++++++++++++++ src/files.rs | 29 +++++ src/lib.rs | 2 + src/main.rs | 9 ++ src/state.rs | 11 ++ src/watcher.rs | 24 ++++ 9 files changed, 875 insertions(+) create mode 100644 src/clip_search.rs create mode 100644 src/clip_watch.rs diff --git a/src/backfill.rs b/src/backfill.rs index ad225ae..3de017c 100644 --- a/src/backfill.rs +++ b/src/backfill.rs @@ -220,6 +220,74 @@ pub fn backfill_missing_date_taken( /// unscanned image_exif rows directly via the FaceDao anti-join and /// hands them to the existing detection pass. Runs on every tick (not /// just full scans) so the backlog moves at quick-scan cadence. +/// Per-tick CLIP encoding drain. Mirrors `process_face_backlog`: pull +/// up to `CLIP_BACKLOG_MAX_PER_TICK` candidates with a known +/// `content_hash` but no `clip_embedding`, hand them to +/// `clip_watch::run_clip_encoding_pass` for parallel fan-out, and let +/// that module write the result back via `backfill_clip_embedding`. +/// +/// Idempotent — a row stays in the candidate set until its embedding +/// lands, so a transient failure (Apollo unreachable, CUDA OOM) just +/// defers to the next tick. Permanent failures (un-decodable bytes) +/// retry every tick at this point; future Branch may add a status +/// column like face_detections has. +pub fn process_clip_backlog( + context: &opentelemetry::Context, + library: &libraries::Library, + clip_client: &crate::ai::clip_client::ClipClient, + exif_dao: &Arc>>, + excluded_dirs: &[String], +) { + if !clip_client.is_enabled() { + return; + } + let cap: i64 = dotenv::var("CLIP_BACKLOG_MAX_PER_TICK") + .ok() + .and_then(|s| s.parse().ok()) + .filter(|n: &i64| *n > 0) + .unwrap_or(32); + + let rows: Vec<(String, String)> = { + let mut dao = exif_dao.lock().expect("exif dao"); + match dao.list_clip_unencoded_candidates(context, library.id, cap) { + Ok(r) => r, + Err(e) => { + warn!( + "clip_watch: list_clip_unencoded_candidates failed for library '{}': {:?}", + library.name, e + ); + return; + } + } + }; + if rows.is_empty() { + return; + } + + info!( + "clip_watch: backlog drain — encoding {} candidate(s) for library '{}' (cap={})", + rows.len(), + library.name, + cap + ); + + let candidates: Vec = rows + .into_iter() + .map(|(rel_path, content_hash)| crate::clip_watch::ClipCandidate { + rel_path, + content_hash, + }) + .collect(); + + crate::clip_watch::run_clip_encoding_pass( + library, + excluded_dirs, + clip_client, + Arc::clone(exif_dao), + candidates, + ); +} + pub fn process_face_backlog( context: &opentelemetry::Context, library: &libraries::Library, diff --git a/src/clip_search.rs b/src/clip_search.rs new file mode 100644 index 0000000..a3e3f9b --- /dev/null +++ b/src/clip_search.rs @@ -0,0 +1,288 @@ +//! `/photos/search?q=` — CLIP semantic photo search. +//! +//! The route lives outside `files.rs` to keep that 1500+ line module +//! focused on EXIF / tag listing. The flow is: +//! +//! 1. Parse query params (`q`, `limit`, `threshold`, optional `library`). +//! 2. Call Apollo's `/api/internal/clip/encode_text` to get the query +//! vector (L2-normalized 768-d f32 for ViT-L/14). +//! 3. Load every `(content_hash, clip_embedding)` for the scope from +//! `image_exif` via `ExifDao::list_clip_index`. ~28–43 MB for a 14k +//! library at ViT-L/14; loaded fresh per request — fast enough for +//! v1, optimize via an AppState cache later if needed. +//! 4. Dot product (= cosine since both sides are L2-normalized), filter +//! above `threshold`, top-K by score. +//! 5. Resolve each surviving hash back to a `(library_id, rel_path)` so +//! the frontend can render the photo / hand off to the carousel. +//! +//! Response shape is intentionally minimal — paths + score — so the +//! frontend can reuse existing PhotoGrid rendering by joining against +//! `/api/photos/match` (or calling `/image/metadata` lazily). Don't +//! bake camera/EXIF metadata into this route; it would force a fan-out +//! per result and balloon the response. + +use crate::AppState; +use crate::ai::clip_client::ClipError; +use crate::database::ExifDao; +use actix_web::{HttpResponse, Result as ActixResult, web}; +use base64::Engine; +use serde::{Deserialize, Serialize}; +use std::sync::Mutex; + +#[derive(Debug, Deserialize)] +pub struct SearchQuery { + /// Natural-language query. Required; empty triggers 400. + pub q: String, + /// Max results to return. Capped to 200 server-side; the UI almost + /// always wants ≤50. Defaults to 20. + #[serde(default = "default_limit")] + pub limit: usize, + /// Cosine-similarity floor below which results are dropped. + /// 0.20 is the rough "this is plausibly relevant" line for OpenAI + /// CLIP; tunable per call when sweeping. Defaults to 0.20. + #[serde(default = "default_threshold")] + pub threshold: f32, + /// Optional single-library scope. When omitted, every enabled + /// library is searched. Multi-select isn't supported yet — the + /// frontend wires through one or all. + pub library: Option, + /// Optional model-version filter. Defaults to the live engine's + /// version (queried lazily). Forces a strict join so mid-flight + /// model swaps can't mix geometries in a single response. + #[serde(default)] + pub model_version: Option, +} + +fn default_limit() -> usize { + 20 +} + +fn default_threshold() -> f32 { + 0.20 +} + +#[derive(Debug, Serialize)] +pub struct SearchHit { + pub library_id: i32, + pub rel_path: String, + pub content_hash: String, + /// Cosine similarity in [-1, 1]. In practice OpenAI CLIP returns + /// 0.10–0.40 for the typical photo library. + pub score: f32, +} + +#[derive(Debug, Serialize)] +pub struct SearchResponse { + pub query: String, + pub model_version: String, + pub threshold: f32, + pub considered: usize, + pub results: Vec, +} + +#[derive(Debug, Serialize)] +struct SearchError { + error: String, +} + +/// Decode a stored `clip_embedding` BLOB back into a `Vec`. Returns +/// `None` on malformed bytes — those rows get skipped rather than +/// failing the whole query. +fn decode_embedding(bytes: &[u8]) -> Option> { + if bytes.is_empty() || bytes.len() % 4 != 0 { + return None; + } + let mut out = Vec::with_capacity(bytes.len() / 4); + for chunk in bytes.chunks_exact(4) { + out.push(f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]])); + } + Some(out) +} + +#[inline] +fn dot(a: &[f32], b: &[f32]) -> f32 { + a.iter().zip(b.iter()).map(|(x, y)| x * y).sum() +} + +pub async fn search_photos( + state: web::Data, + exif_dao: web::Data>>, + query: web::Query, +) -> ActixResult { + let q_text = query.q.trim().to_string(); + if q_text.is_empty() { + return Ok(HttpResponse::BadRequest().json(SearchError { + error: "query parameter `q` is required".into(), + })); + } + if !state.clip_client.is_enabled() { + return Ok(HttpResponse::ServiceUnavailable().json(SearchError { + error: "CLIP search is disabled (no Apollo CLIP endpoint configured)".into(), + })); + } + + let limit = query.limit.min(200).max(1); + let threshold = query.threshold.clamp(-1.0, 1.0); + + // 1. Encode the query text. Fast — Apollo's text encoder is ~50ms + // on CPU. Bail with a clear error message if Apollo's down so the + // user sees "service unavailable" rather than empty results. + let query_resp = match state.clip_client.encode_text(&q_text).await { + Ok(r) => r, + Err(ClipError::Permanent(e)) => { + return Ok(HttpResponse::BadRequest().json(SearchError { + error: format!("query rejected: {e}"), + })); + } + Err(ClipError::Transient(e)) => { + return Ok(HttpResponse::BadGateway().json(SearchError { + error: format!("CLIP service unavailable: {e}"), + })); + } + Err(ClipError::Disabled) => { + return Ok(HttpResponse::ServiceUnavailable().json(SearchError { + error: "CLIP service disabled".into(), + })); + } + }; + // decode_embedding works on raw bytes; the wire format is b64. + let query_bytes = base64::engine::general_purpose::STANDARD + .decode(query_resp.embedding.as_bytes()) + .unwrap_or_default(); + let query_vec = match decode_embedding(&query_bytes) { + Some(v) => v, + None => { + return Ok(HttpResponse::BadGateway().json(SearchError { + error: "CLIP service returned a malformed query embedding".into(), + })); + } + }; + + // 2. Decide which library scope to search. + let library_ids: Vec = match query.library { + Some(id) => vec![id], + None => Vec::new(), // empty = all libraries + }; + + // 3. Pull the (hash, embedding) matrix. Lock contention here is + // bounded — one big SELECT under a mutex Arc> + // and then we release before scoring. If this becomes a hotspot + // we'll cache the decoded matrix in AppState with TTL. + let ctx = opentelemetry::Context::current(); + let rows: Vec<(String, Vec)> = { + let mut dao = exif_dao.lock().expect("exif dao"); + match dao.list_clip_index( + &ctx, + &library_ids, + query.model_version.as_deref().or(Some(&query_resp.model_version)), + ) { + Ok(r) => r, + Err(e) => { + log::warn!("clip_search: list_clip_index failed: {:?}", e); + return Ok(HttpResponse::InternalServerError().json(SearchError { + error: "failed to load search index".into(), + })); + } + } + }; + let considered = rows.len(); + if considered == 0 { + return Ok(HttpResponse::Ok().json(SearchResponse { + query: q_text, + model_version: query_resp.model_version, + threshold, + considered, + results: Vec::new(), + })); + } + + // 4. Score. Cap the loop's transient allocation; we keep all scores + // and sort at the end. With ~14k entries the sort is microseconds. + let mut scored: Vec<(f32, String)> = Vec::with_capacity(considered); + for (hash, blob) in rows { + let Some(emb) = decode_embedding(&blob) else { + continue; + }; + if emb.len() != query_vec.len() { + continue; + } + let sim = dot(&emb, &query_vec); + if sim < threshold { + continue; + } + scored.push((sim, hash)); + } + scored.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal)); + scored.truncate(limit); + + if scored.is_empty() { + return Ok(HttpResponse::Ok().json(SearchResponse { + query: q_text, + model_version: query_resp.model_version, + threshold, + considered, + results: Vec::new(), + })); + } + + // 5. Resolve each surviving hash back to a `(library_id, rel_path)`. + // `get_rel_paths_by_hash` returns every rel_path; we pick the first + // one for the result. Apollo / the UI can fetch alternatives via + // /image/metadata when needed. + let hashes: Vec = scored.iter().map(|(_, h)| h.clone()).collect(); + let path_map = { + let mut dao = exif_dao.lock().expect("exif dao"); + match dao.get_rel_paths_for_hashes(&ctx, &hashes) { + Ok(m) => m, + Err(e) => { + log::warn!("clip_search: get_rel_paths_for_hashes failed: {:?}", e); + return Ok(HttpResponse::InternalServerError().json(SearchError { + error: "failed to resolve photo paths".into(), + })); + } + } + }; + + // We need (library_id, rel_path) — get_rel_paths_for_hashes only + // returns rel_paths. Cross-reference via find_by_content_hash to + // pick the library too. Single call per surviving hash; cheap at + // top-20. + let mut results = Vec::with_capacity(scored.len()); + { + let mut dao = exif_dao.lock().expect("exif dao"); + for (score, hash) in scored { + let row = match dao.find_by_content_hash(&ctx, &hash) { + Ok(Some(r)) => r, + Ok(None) => continue, + Err(e) => { + log::warn!( + "clip_search: find_by_content_hash failed for {}: {:?}", + hash, e + ); + continue; + } + }; + // Prefer get_rel_paths_for_hashes's first entry if it + // exists (it shares semantics with `image_exif`'s natural + // order), falling back to the ImageExif row. + let rel_path = path_map + .get(&hash) + .and_then(|paths| paths.first().cloned()) + .unwrap_or(row.file_path); + results.push(SearchHit { + library_id: row.library_id, + rel_path, + content_hash: hash, + score, + }); + } + } + + Ok(HttpResponse::Ok().json(SearchResponse { + query: q_text, + model_version: query_resp.model_version, + threshold, + considered, + results, + })) +} diff --git a/src/clip_watch.rs b/src/clip_watch.rs new file mode 100644 index 0000000..a94e4ab --- /dev/null +++ b/src/clip_watch.rs @@ -0,0 +1,249 @@ +//! CLIP-encoding pass for the file watcher. +//! +//! `process_clip_backlog` in `backfill.rs` calls [`run_clip_encoding_pass`] +//! with the page of candidates returned by +//! `ExifDao::list_clip_unencoded_candidates`. We walk those, fan out K +//! parallel encode calls to Apollo, and persist the resulting embeddings +//! into `image_exif.clip_embedding` / `clip_model_version`. +//! +//! Unlike the face pipeline, CLIP has no marker rows — a permanent +//! failure (un-decodable bytes) leaves the row's `clip_embedding` NULL +//! and the drain will retry on the next tick. For personal-library +//! scale this is fine; the per-tick cap bounds the wasted work, and +//! `file_types::is_image_file` filters out videos / non-media client- +//! side so most permanent failures are decoded-but-corrupt files (rare). +//! +//! The watcher thread isn't in any pre-existing async context, so we +//! build a short-lived tokio runtime per pass and `block_on` the join +//! of K encode futures. Concurrency knob: `CLIP_ENCODE_CONCURRENCY` +//! (default 4 — lower than faces because Apollo's CLIP path doesn't +//! release the GIL between preprocess and forward as cleanly). + +use crate::ai::clip_client::{ClipClient, ClipError, EncodeImageMeta}; +use crate::database::ExifDao; +use crate::exif; +use crate::file_types; +use crate::libraries::Library; +use crate::memories::PathExcluder; +use log::{debug, info, warn}; +use std::path::Path; +use std::sync::{Arc, Mutex}; +use tokio::sync::Semaphore; + +/// One file the watcher would like to CLIP-encode. Built from the DAO +/// `list_clip_unencoded_candidates` result — needs the `content_hash` +/// for traceability in Apollo's log lines, even though the embedding +/// itself is keyed on `(library_id, rel_path)` for the back-write. +#[derive(Debug, Clone)] +pub struct ClipCandidate { + pub rel_path: String, + pub content_hash: String, +} + +/// Synchronous entry point. Returns once every candidate has been +/// processed (or definitively skipped). No-op when the client is +/// disabled so the caller can call unconditionally. +pub fn run_clip_encoding_pass( + library: &Library, + excluded_dirs: &[String], + clip_client: &ClipClient, + exif_dao: Arc>>, + candidates: Vec, +) { + if !clip_client.is_enabled() { + return; + } + if candidates.is_empty() { + return; + } + + let base = Path::new(&library.root_path); + let filtered = filter_excluded(base, excluded_dirs, candidates, Some(&library.name)); + if filtered.is_empty() { + return; + } + + let concurrency: usize = std::env::var("CLIP_ENCODE_CONCURRENCY") + .ok() + .and_then(|s| s.parse().ok()) + .filter(|n: &usize| *n > 0) + .unwrap_or(4); + + info!( + "clip_watch: encoding {} candidate(s) for library '{}' (concurrency {})", + filtered.len(), + library.name, + concurrency + ); + + let rt = match tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_all() + .build() + { + Ok(rt) => rt, + Err(e) => { + warn!("clip_watch: failed to build tokio runtime: {e}"); + return; + } + }; + + let library_id = library.id; + let library_root = library.root_path.clone(); + rt.block_on(async move { + let sem = Arc::new(Semaphore::new(concurrency)); + let mut handles = Vec::with_capacity(filtered.len()); + for cand in filtered { + let permit_sem = sem.clone(); + let clip_client = clip_client.clone(); + let exif_dao = exif_dao.clone(); + let library_root = library_root.clone(); + handles.push(tokio::spawn(async move { + let _permit = permit_sem.acquire().await.expect("clip semaphore"); + process_one(library_id, &library_root, cand, &clip_client, exif_dao).await; + })); + } + for h in handles { + let _ = h.await; + } + }); +} + +async fn process_one( + library_id: i32, + library_root: &str, + cand: ClipCandidate, + clip_client: &ClipClient, + exif_dao: Arc>>, +) { + let abs = Path::new(library_root).join(&cand.rel_path); + let bytes = match read_image_bytes_for_encode(&abs) { + Ok(b) => b, + Err(e) => { + // Same rationale as face_watch: don't mark — the file may + // have been moved/renamed mid-scan; let the next pass retry. + warn!( + "clip_watch: read failed for {} (lib {}): {}", + cand.rel_path, library_id, e + ); + return; + } + }; + + let meta = EncodeImageMeta { + content_hash: cand.content_hash.clone(), + library_id, + rel_path: cand.rel_path.clone(), + }; + let ctx = opentelemetry::Context::current(); + + match clip_client.encode_image(bytes, meta).await { + Ok(resp) => { + let emb_bytes = match resp.decode_embedding() { + Ok(b) => b, + Err(e) => { + warn!( + "clip_watch: bad embedding for {}: {:?}", + cand.rel_path, e + ); + return; + } + }; + let mut dao = exif_dao.lock().expect("exif dao"); + if let Err(e) = dao.backfill_clip_embedding( + &ctx, + library_id, + &cand.rel_path, + &emb_bytes, + &resp.model_version, + ) { + warn!( + "clip_watch: backfill_clip_embedding failed for {}: {:?}", + cand.rel_path, e + ); + return; + } + debug!( + "clip_watch: {} → dim={} ({}ms, {})", + cand.rel_path, resp.embedding_dim, resp.duration_ms, resp.model_version + ); + } + Err(ClipError::Permanent(e)) => { + // No marker — the row sits with NULL embedding and the drain + // retries next pass. For personal-library scale the cost of + // re-attempting permanently-broken files is bounded by the + // per-tick cap. If this becomes a recurring noise source, + // add a `clip_status` column with `failed` semantics like + // face_detections has. + warn!( + "clip_watch: permanent failure on {} (will retry next pass): {}", + cand.rel_path, e + ); + } + Err(ClipError::Transient(e)) => { + debug!( + "clip_watch: transient on {}: {} (will retry next pass)", + cand.rel_path, e + ); + } + Err(ClipError::Disabled) => { + // Defensive — the entry-point already checked is_enabled(). + } + } +} + +/// Drop candidates whose paths land in an excluded dir or whose +/// extension isn't an image. Mirrors `face_watch::filter_excluded` so +/// the two backlogs stay shape-consistent. Library name is passed +/// purely for the log line that surfaces an exclusion hit. +pub fn filter_excluded( + base: &Path, + excluded_dirs: &[String], + candidates: Vec, + library_name: Option<&str>, +) -> Vec { + let excluder = if excluded_dirs.is_empty() { + None + } else { + Some(PathExcluder::new(base, excluded_dirs)) + }; + candidates + .into_iter() + .filter(|c| { + let abs = base.join(&c.rel_path); + if !file_types::is_image_file(&abs) { + debug!( + "clip_watch: skipping non-image '{}' (lib {})", + c.rel_path, + library_name.unwrap_or("") + ); + return false; + } + if let Some(ex) = excluder.as_ref() + && ex.is_excluded(&abs) + { + debug!( + "clip_watch: skipping excluded '{}' (lib {})", + c.rel_path, + library_name.unwrap_or("") + ); + return false; + } + true + }) + .collect() +} + +/// Read image bytes for CLIP encoding. Same logic as +/// `face_watch::read_image_bytes_for_detect` — RAW / HEIC files don't +/// decode in Apollo's PIL pipeline, so we pull the embedded JPEG +/// preview the thumbnail pipeline already extracts. Plain JPEG / PNG / +/// WebP go through a direct read. +pub fn read_image_bytes_for_encode(path: &Path) -> std::io::Result> { + if file_types::needs_ffmpeg_thumbnail(path) + && let Some(preview) = exif::extract_embedded_jpeg_preview(path) + { + return Ok(preview); + } + std::fs::read(path) +} diff --git a/src/database/mod.rs b/src/database/mod.rs index 4a20702..4488a00 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -470,6 +470,61 @@ pub trait ExifDao: Sync + Send { source: &str, ) -> Result<(), DbError>; + /// Find image_exif rows needing a CLIP embedding for semantic search: + /// `clip_embedding IS NULL AND content_hash IS NOT NULL`, ordered by id + /// ASC, limited. Hash-less rows wait for `backfill_unhashed_backlog` to + /// hash them first — embedding a row we can't key on bytes is wasted + /// work that the next library/move detection would invalidate. Backed + /// by the partial index `idx_image_exif_clip_backfill`. + /// + /// Returns `(rel_path, content_hash)` for the given library only. Video + /// rows are returned too (the underlying anti-join is shape-uniform); + /// the caller filters them out via `file_types::is_image_file` before + /// sending to Apollo, mirroring `face_watch::filter_excluded`. + /// + /// **Model upgrades** (re-encoding everything on a new + /// `APOLLO_CLIP_MODEL`) are handled out-of-band — run + /// `UPDATE image_exif SET clip_embedding = NULL + /// WHERE clip_model_version != '';` + /// and the drain picks up the freshly-nulled rows on the next tick. + /// Mixing in-flight model versions in a single query is intentionally + /// not the drain's problem. + fn list_clip_unencoded_candidates( + &mut self, + context: &opentelemetry::Context, + library_id: i32, + limit: i64, + ) -> Result, DbError>; + + /// Persist a CLIP embedding for an existing row. Touches + /// `clip_embedding` and `clip_model_version` only — leaves every + /// other column alone so the drain can't accidentally clobber EXIF / + /// hash / date-resolver state that other paths have written. + fn backfill_clip_embedding( + &mut self, + context: &opentelemetry::Context, + library_id: i32, + rel_path: &str, + embedding: &[u8], + model_version: &str, + ) -> Result<(), DbError>; + + /// Load every `(content_hash, clip_embedding)` pair from the live + /// image_exif rows for the given libraries, optionally filtered to a + /// single `model_version` (cosine sim across mixed geometries is + /// meaningless). Used by `/photos/search` to rerank against the query + /// embedding in-memory. + /// + /// Returns one pair per content_hash. If a hash appears under more + /// than one library, the first row wins (Diesel's natural ORDER BY id + /// ASC). Hash-less and embedding-less rows are filtered server-side. + fn list_clip_index( + &mut self, + context: &opentelemetry::Context, + library_ids: &[i32], + model_version: Option<&str>, + ) -> Result)>, DbError>; + /// Operator-driven date_taken override (POST /image/exif/date). Snapshots /// the prior `(date_taken, date_taken_source)` into the `original_*` /// pair on first override, then writes the new value with @@ -1387,6 +1442,146 @@ impl ExifDao for SqliteExifDao { }) } + fn list_clip_unencoded_candidates( + &mut self, + context: &opentelemetry::Context, + library_id_val: i32, + limit: i64, + ) -> Result, DbError> { + trace_db_call( + context, + "query", + "list_clip_unencoded_candidates", + |_span| { + use schema::image_exif::dsl::*; + + let mut connection = self.connection.lock().expect("Unable to get ExifDao"); + + // Partial index `idx_image_exif_clip_backfill` covers the + // (clip_embedding IS NULL AND content_hash IS NOT NULL) + // filter; the planner hits it directly. ORDER BY id ASC + // keeps drain progress monotone across ticks. + image_exif + .filter(library_id.eq(library_id_val)) + .filter(clip_embedding.is_null()) + .filter(content_hash.is_not_null()) + .select((rel_path, content_hash.assume_not_null())) + .order(id.asc()) + .limit(limit) + .load::<(String, String)>(connection.deref_mut()) + .map_err(|_| anyhow::anyhow!("Query error")) + }, + ) + .map_err(|_| DbError::new(DbErrorKind::QueryError)) + } + + fn backfill_clip_embedding( + &mut self, + context: &opentelemetry::Context, + library_id_val: i32, + rel_path_val: &str, + embedding: &[u8], + model_version: &str, + ) -> Result<(), DbError> { + trace_db_call(context, "update", "backfill_clip_embedding", |_span| { + use schema::image_exif::dsl::*; + + let mut connection = self.connection.lock().expect("Unable to get ExifDao"); + + let result = diesel::update( + image_exif + .filter(library_id.eq(library_id_val)) + .filter(rel_path.eq(rel_path_val)), + ) + .set(( + clip_embedding.eq(embedding), + clip_model_version.eq(model_version), + )) + .execute(connection.deref_mut()); + + match result { + Ok(rows) => { + if rows == 0 { + // Same race as backfill_date_taken — row vanished + // between the candidate query and this write. Not + // a hard error; the drain re-scans next tick. + log::debug!( + "backfill_clip_embedding: 0 rows matched lib={} {} \ + (row likely retired by missing-file scan)", + library_id_val, + rel_path_val + ); + } + Ok(()) + } + Err(e) => Err(anyhow::anyhow!( + "diesel update failed (lib={}, rel_path={}, model={}): {}", + library_id_val, + rel_path_val, + model_version, + e + )), + } + }) + .map_err(|e| { + log::warn!("backfill_clip_embedding: {}", e); + DbError::new(DbErrorKind::UpdateError) + }) + } + + fn list_clip_index( + &mut self, + context: &opentelemetry::Context, + library_ids_val: &[i32], + model_version_filter: Option<&str>, + ) -> Result)>, DbError> { + trace_db_call(context, "query", "list_clip_index", |_span| { + use schema::image_exif::dsl::*; + + let mut connection = self.connection.lock().expect("Unable to get ExifDao"); + + // Build the base filter. content_hash + clip_embedding both + // need to be present for the row to be searchable. + let mut query = image_exif + .filter(content_hash.is_not_null()) + .filter(clip_embedding.is_not_null()) + .into_boxed(); + if !library_ids_val.is_empty() { + query = query.filter(library_id.eq_any(library_ids_val)); + } + if let Some(mv) = model_version_filter { + query = query.filter(clip_model_version.eq(mv)); + } + + // Order by id ASC so cross-library duplicates pick the + // earliest-ingested row (stable across calls; the in-memory + // matrix gets a deterministic row order). Group-by on + // content_hash via post-filter — Diesel doesn't expose a + // clean DISTINCT ON in this query shape. + let rows: Vec<(String, Vec)> = query + .select(( + content_hash.assume_not_null(), + clip_embedding.assume_not_null(), + )) + .order(id.asc()) + .load::<(String, Vec)>(connection.deref_mut()) + .map_err(|_| anyhow::anyhow!("Query error"))?; + + // Dedupe by hash, keeping the first occurrence. Cheap; sized + // to ~14k entries on this library. + let mut seen: std::collections::HashSet = + std::collections::HashSet::with_capacity(rows.len()); + let mut out = Vec::with_capacity(rows.len()); + for (h, e) in rows { + if seen.insert(h.clone()) { + out.push((h, e)); + } + } + Ok(out) + }) + .map_err(|_| DbError::new(DbErrorKind::QueryError)) + } + fn set_manual_date_taken( &mut self, context: &opentelemetry::Context, diff --git a/src/files.rs b/src/files.rs index 522f042..59cd49e 100644 --- a/src/files.rs +++ b/src/files.rs @@ -1938,6 +1938,35 @@ mod tests { ) -> Result<(), DbError> { Ok(()) } + + fn list_clip_unencoded_candidates( + &mut self, + _context: &opentelemetry::Context, + _library_id: i32, + _limit: i64, + ) -> Result, DbError> { + Ok(Vec::new()) + } + + fn backfill_clip_embedding( + &mut self, + _context: &opentelemetry::Context, + _library_id: i32, + _rel_path: &str, + _embedding: &[u8], + _model_version: &str, + ) -> Result<(), DbError> { + Ok(()) + } + + fn list_clip_index( + &mut self, + _context: &opentelemetry::Context, + _library_ids: &[i32], + _model_version: Option<&str>, + ) -> Result)>, DbError> { + Ok(Vec::new()) + } } mod api { diff --git a/src/lib.rs b/src/lib.rs index 04ebc54..0ea7ddb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,6 +7,8 @@ pub mod ai; pub mod auth; pub mod bin_progress; pub mod cleanup; +pub mod clip_search; +pub mod clip_watch; pub mod content_hash; pub mod data; pub mod database; diff --git a/src/main.rs b/src/main.rs index 51583c5..63013ce 100644 --- a/src/main.rs +++ b/src/main.rs @@ -31,6 +31,8 @@ use log::{error, info}; mod ai; mod auth; mod backfill; +mod clip_search; +mod clip_watch; mod content_hash; mod data; mod database; @@ -164,6 +166,7 @@ fn main() -> std::io::Result<()> { playlist_mgr_for_watcher, preview_gen_for_watcher, app_state.face_client.clone(), + app_state.clip_client.clone(), app_state.excluded_dirs.clone(), app_state.library_health.clone(), ); @@ -280,6 +283,12 @@ fn main() -> std::io::Result<()> { .service( web::resource("/photos/exif").route(web::get().to(files::list_exif_summary)), ) + .service( + // Semantic search via CLIP embeddings. See + // src/clip_search.rs for the request/response shape. + web::resource("/photos/search") + .route(web::get().to(clip_search::search_photos)), + ) .service(web::resource("/file/move").post(move_file::)) .service(handlers::image::get_image) .service(handlers::image::upload_image) diff --git a/src/state.rs b/src/state.rs index fd39cba..572a83f 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,4 +1,5 @@ use crate::ai::apollo_client::ApolloClient; +use crate::ai::clip_client::ClipClient; use crate::ai::face_client::FaceClient; use crate::ai::insight_chat::{ChatLockMap, InsightChatService}; use crate::ai::openrouter::OpenRouterClient; @@ -70,6 +71,10 @@ pub struct AppState { /// nor `APOLLO_API_BASE_URL` is set; the file-watch hook (Phase 3) and /// manual-face-create handler short-circuit in that case. pub face_client: FaceClient, + /// CLIP inference client (calls Apollo's `/api/internal/clip/*`). + /// Same disabled semantics as `face_client`: unset env → no-op + /// backlog drain, /photos/search returns an empty result. + pub clip_client: ClipClient, } impl AppState { @@ -105,6 +110,7 @@ impl AppState { insight_chat: Arc, preview_dao: Arc>>, face_client: FaceClient, + clip_client: ClipClient, ) -> Self { assert!( !libraries_vec.is_empty(), @@ -143,6 +149,7 @@ impl AppState { insight_generator, insight_chat, face_client, + clip_client, } } @@ -198,6 +205,9 @@ impl Default for AppState { .or_else(|| env::var("APOLLO_API_BASE_URL").ok()); let face_client = FaceClient::new(face_client_url); + // CLIP inference client. Same env var fallback as face_client. + let clip_client = ClipClient::from_env(); + // Initialize DAOs let insight_dao: Arc>> = Arc::new(Mutex::new(Box::new(SqliteInsightDao::new()))); @@ -289,6 +299,7 @@ impl Default for AppState { insight_chat, preview_dao, face_client, + clip_client, ) } } diff --git a/src/watcher.rs b/src/watcher.rs index 13ac1cd..39127ca 100644 --- a/src/watcher.rs +++ b/src/watcher.rs @@ -268,6 +268,7 @@ pub fn watch_files( playlist_manager: Addr, preview_generator: Addr, face_client: crate::ai::face_client::FaceClient, + clip_client: crate::ai::clip_client::ClipClient, excluded_dirs: Vec, library_health: libraries::LibraryHealthMap, ) { @@ -300,6 +301,14 @@ pub fn watch_files( or APOLLO_API_BASE_URL to enable)" ); } + if clip_client.is_enabled() { + info!(" CLIP semantic search: ENABLED"); + } else { + info!( + " CLIP semantic search: DISABLED (set APOLLO_CLIP_API_BASE_URL \ + or APOLLO_API_BASE_URL to enable)" + ); + } { let libs = libs_lock.read().unwrap_or_else(|e| e.into_inner()); for lib in libs.iter() { @@ -463,6 +472,21 @@ pub fn watch_files( ); } + // CLIP embedding backlog. Independent of face detection — + // drain runs whenever CLIP is enabled, even on deploys + // that don't have the face engine wired up. Mirrors the + // face drain shape (capped per tick, no-op when disabled). + if clip_client.is_enabled() { + let context = opentelemetry::Context::new(); + backfill::process_clip_backlog( + &context, + lib, + &clip_client, + &exif_dao, + &effective_excludes, + ); + } + // Date-taken backfill: drain rows whose canonical date is // either unresolved or only fs_time-sourced. Independent // of face detection — runs even on deploys that don't From 66267cc34554908199110497db477340fa324eee Mon Sep 17 00:00:00 2001 From: Cameron Cordes Date: Thu, 14 May 2026 14:09:20 -0400 Subject: [PATCH 3/5] clip-search: fmt + clippy clamp + test AppState arg MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pulls cargo fmt + clippy pass over the new files only — pre-existing files left untouched even though fmt has drift on them. clamp(1,200) swaps a manual min/max chain that clippy flagged. test AppState constructor needed ClipClient::new(None) so the lib-test target compiles. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/ai/clip_client.rs | 19 +++++++++---------- src/backfill.rs | 10 ++++++---- src/bin/probe_clip_search.rs | 15 ++++++++++----- src/clip_search.rs | 10 +++++++--- src/clip_watch.rs | 5 +---- src/state.rs | 1 + 6 files changed, 34 insertions(+), 26 deletions(-) diff --git a/src/ai/clip_client.rs b/src/ai/clip_client.rs index 9b38a2a..85c66a7 100644 --- a/src/ai/clip_client.rs +++ b/src/ai/clip_client.rs @@ -181,10 +181,7 @@ impl ClipClient { /// Encode a natural-language query to an embedding. Used by the /// search route to rank stored image embeddings by cosine sim. - pub async fn encode_text( - &self, - text: &str, - ) -> std::result::Result { + pub async fn encode_text(&self, text: &str) -> std::result::Result { let Some(base) = self.base_url.as_deref() else { return Err(ClipError::Disabled); }; @@ -206,9 +203,10 @@ impl ClipClient { }; let status = resp.status(); if status.is_success() { - let body: EncodeResponse = resp.json().await.map_err(|e| { - ClipError::Transient(anyhow::anyhow!("clip response decode: {e}")) - })?; + let body: EncodeResponse = resp + .json() + .await + .map_err(|e| ClipError::Transient(anyhow::anyhow!("clip response decode: {e}")))?; return Ok(body); } let body_text = resp.text().await.unwrap_or_default(); @@ -246,9 +244,10 @@ impl ClipClient { }; let status = resp.status(); if status.is_success() { - let body: EncodeResponse = resp.json().await.map_err(|e| { - ClipError::Transient(anyhow::anyhow!("clip response decode: {e}")) - })?; + let body: EncodeResponse = resp + .json() + .await + .map_err(|e| ClipError::Transient(anyhow::anyhow!("clip response decode: {e}")))?; return Ok(body); } let body_text = resp.text().await.unwrap_or_default(); diff --git a/src/backfill.rs b/src/backfill.rs index 3de017c..34baa41 100644 --- a/src/backfill.rs +++ b/src/backfill.rs @@ -273,10 +273,12 @@ pub fn process_clip_backlog( let candidates: Vec = rows .into_iter() - .map(|(rel_path, content_hash)| crate::clip_watch::ClipCandidate { - rel_path, - content_hash, - }) + .map( + |(rel_path, content_hash)| crate::clip_watch::ClipCandidate { + rel_path, + content_hash, + }, + ) .collect(); crate::clip_watch::run_clip_encoding_pass( diff --git a/src/bin/probe_clip_search.rs b/src/bin/probe_clip_search.rs index 45686ed..80d5e7f 100644 --- a/src/bin/probe_clip_search.rs +++ b/src/bin/probe_clip_search.rs @@ -122,7 +122,10 @@ async fn main() -> anyhow::Result<()> { .into_iter() .find(|l| l.id == args.library) .ok_or_else(|| anyhow::anyhow!("library id {} not found", args.library))?; - info!("probing library #{} ({}) at {}", lib.id, lib.name, lib.root_path); + info!( + "probing library #{} ({}) at {}", + lib.id, lib.name, lib.root_path + ); let dao: Arc>> = Arc::new(Mutex::new(Box::new(SqliteExifDao::new()))); let ctx = opentelemetry::Context::new(); @@ -136,9 +139,7 @@ async fn main() -> anyhow::Result<()> { let query_vec = decode_f32_vec(&query_resp.embedding)?; info!( "query encoded ({}d, {}ms): {:?}", - query_resp.embedding_dim, - query_resp.duration_ms, - args.query + query_resp.embedding_dim, query_resp.duration_ms, args.query ); // Page through (id, rel_path), filter to images on disk, encode up @@ -245,7 +246,11 @@ async fn main() -> anyhow::Result<()> { scores.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal)); let elapsed = started.elapsed(); println!(); - println!("── top {} for query: {:?} ──", args.top.min(scores.len()), args.query); + println!( + "── top {} for query: {:?} ──", + args.top.min(scores.len()), + args.query + ); for (i, (sim, path)) in scores.iter().take(args.top).enumerate() { println!("[{:>2}] sim={:.3} {}", i + 1, sim, path); } diff --git a/src/clip_search.rs b/src/clip_search.rs index a3e3f9b..d5a2368 100644 --- a/src/clip_search.rs +++ b/src/clip_search.rs @@ -121,7 +121,7 @@ pub async fn search_photos( })); } - let limit = query.limit.min(200).max(1); + let limit = query.limit.clamp(1, 200); let threshold = query.threshold.clamp(-1.0, 1.0); // 1. Encode the query text. Fast — Apollo's text encoder is ~50ms @@ -174,7 +174,10 @@ pub async fn search_photos( match dao.list_clip_index( &ctx, &library_ids, - query.model_version.as_deref().or(Some(&query_resp.model_version)), + query + .model_version + .as_deref() + .or(Some(&query_resp.model_version)), ) { Ok(r) => r, Err(e) => { @@ -257,7 +260,8 @@ pub async fn search_photos( Err(e) => { log::warn!( "clip_search: find_by_content_hash failed for {}: {:?}", - hash, e + hash, + e ); continue; } diff --git a/src/clip_watch.rs b/src/clip_watch.rs index a94e4ab..16a75fa 100644 --- a/src/clip_watch.rs +++ b/src/clip_watch.rs @@ -142,10 +142,7 @@ async fn process_one( let emb_bytes = match resp.decode_embedding() { Ok(b) => b, Err(e) => { - warn!( - "clip_watch: bad embedding for {}: {:?}", - cand.rel_path, e - ); + warn!("clip_watch: bad embedding for {}: {:?}", cand.rel_path, e); return; } }; diff --git a/src/state.rs b/src/state.rs index 572a83f..8f1bd4e 100644 --- a/src/state.rs +++ b/src/state.rs @@ -450,6 +450,7 @@ impl AppState { insight_chat, preview_dao, FaceClient::new(None), // disabled in test + ClipClient::new(None), // disabled in test ) } } From ee2ed3005bf69734d7ade114499a626efd6bb6e0 Mon Sep 17 00:00:00 2001 From: Cameron Cordes Date: Thu, 14 May 2026 14:12:51 -0400 Subject: [PATCH 4/5] clip-search: document env knobs in .env.example MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit APOLLO_CLIP_API_BASE_URL (falls back to APOLLO_API_BASE_URL), CLIP_BACKLOG_MAX_PER_TICK, CLIP_ENCODE_CONCURRENCY, and CLIP_REQUEST_TIMEOUT_SEC — all of which the code already reads. Apollo's side was documented earlier; this closes the parity gap. Co-Authored-By: Claude Opus 4.7 (1M context) --- .env.example | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/.env.example b/.env.example index b520940..718f6bd 100644 --- a/.env.example +++ b/.env.example @@ -54,10 +54,12 @@ AGENTIC_CHAT_MAX_ITERATIONS=6 # OPENROUTER_APP_TITLE=ImageApi # ── AI Insights — sibling services (optional) ─────────────────────────── -# Apollo (places + face inference). Single Apollo deploys typically set -# only APOLLO_API_BASE_URL and let the face client fall back to it. +# Apollo (places, face inference, CLIP encoders). Single-Apollo deploys +# typically set only APOLLO_API_BASE_URL and let the face + CLIP +# clients fall back to it. # APOLLO_API_BASE_URL=http://apollo.lan:8000 # APOLLO_FACE_API_BASE_URL=http://apollo.lan:8000 +# APOLLO_CLIP_API_BASE_URL=http://apollo.lan:8000 # SMS_API_URL=http://localhost:8000 # SMS_API_TOKEN= @@ -80,6 +82,23 @@ FACE_DETECT_TIMEOUT_SEC=60 FACE_BACKLOG_MAX_PER_TICK=64 FACE_HASH_BACKFILL_MAX_PER_TICK=2000 +# ── CLIP semantic photo search ────────────────────────────────────────── +# ImageApi calls Apollo's /api/internal/clip/{encode_image,encode_text} +# to populate per-photo embeddings during the watcher's backlog drain +# and to encode user queries at /photos/search time. Disabled when +# neither APOLLO_CLIP_API_BASE_URL nor APOLLO_API_BASE_URL is set. +# +# Per-watcher-tick cap on the encode drain. Default 32 ≈ ~1 photo/sec +# on CPU, ~30 photos/sec on a single-GPU host (Apollo's threadpool +# is 1 on CUDA, so concurrency is bounded server-side regardless of +# our setting). Bump on a fresh deploy to clear the backlog faster. +CLIP_BACKLOG_MAX_PER_TICK=32 +# Client-side parallel encode calls per drain pass. Apollo's GPU pool +# serializes server-side; this just overlaps file-IO with inference. +CLIP_ENCODE_CONCURRENCY=4 +# Per-encode HTTP timeout. CPU-only Apollo deploys may need higher. +CLIP_REQUEST_TIMEOUT_SEC=60 + # ── RAG / search ──────────────────────────────────────────────────────── # Set to `1` to enable cross-encoder reranking on /search results. SEARCH_RAG_RERANK=0 From 922f7df8d3a0ece0f99b45b7ba240f03b8f5f666 Mon Sep 17 00:00:00 2001 From: Cameron Cordes Date: Fri, 15 May 2026 16:56:10 -0400 Subject: [PATCH 5/5] clip-search: offset-based pagination on /photos/search MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds `offset` query param (default 0) and `total_matching` + `offset` response fields. Backend already computes the full sorted list of above-threshold matches per query; pagination just slices it at [offset, offset+limit) instead of always returning the top window. Offsets past the end return an empty page cleanly so the client can stop fetching naturally. Re-scores on every page rather than caching the sorted list — at personal-library scale (~14k embeddings, 768d) the dot-product loop is sub-100ms and the lack of state means no eviction / staleness concerns. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/clip_search.rs | 35 ++++++++++++++++++++++++++++++++--- 1 file changed, 32 insertions(+), 3 deletions(-) diff --git a/src/clip_search.rs b/src/clip_search.rs index d5a2368..5e8d5b9 100644 --- a/src/clip_search.rs +++ b/src/clip_search.rs @@ -33,10 +33,16 @@ use std::sync::Mutex; pub struct SearchQuery { /// Natural-language query. Required; empty triggers 400. pub q: String, - /// Max results to return. Capped to 200 server-side; the UI almost - /// always wants ≤50. Defaults to 20. + /// Max results to return in this page. Capped to 200 server-side. + /// Defaults to 20. Pair with `offset` for pagination. #[serde(default = "default_limit")] pub limit: usize, + /// Zero-based offset into the sorted-and-filtered result set. The + /// scoring loop still runs over the full embedding matrix on every + /// page (cheap at personal-library scale — sub-100ms — and avoids + /// stateful pagination cursors). Defaults to 0. + #[serde(default)] + pub offset: usize, /// Cosine-similarity floor below which results are dropped. /// 0.20 is the rough "this is plausibly relevant" line for OpenAI /// CLIP; tunable per call when sweeping. Defaults to 0.20. @@ -76,7 +82,14 @@ pub struct SearchResponse { pub query: String, pub model_version: String, pub threshold: f32, + /// Total embeddings scored (= every photo in scope with a stored + /// embedding). Same value across pages of the same query. pub considered: usize, + /// Count of results above threshold, before pagination. Lets the + /// client decide whether a "Load more" button is meaningful and + /// stop fetching when ``offset + results.len() >= total_matching``. + pub total_matching: usize, + pub offset: usize, pub results: Vec, } @@ -122,6 +135,7 @@ pub async fn search_photos( } let limit = query.limit.clamp(1, 200); + let offset = query.offset; let threshold = query.threshold.clamp(-1.0, 1.0); // 1. Encode the query text. Fast — Apollo's text encoder is ~50ms @@ -195,6 +209,8 @@ pub async fn search_photos( model_version: query_resp.model_version, threshold, considered, + total_matching: 0, + offset, results: Vec::new(), })); } @@ -216,7 +232,16 @@ pub async fn search_photos( scored.push((sim, hash)); } scored.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal)); - scored.truncate(limit); + let total_matching = scored.len(); + // Pagination — slice the sorted list at `[offset, offset+limit)`. + // Offsets past the end produce empty pages rather than an error so + // the client can stop fetching naturally on "load more" past the end. + let scored: Vec<(f32, String)> = if offset >= total_matching { + Vec::new() + } else { + let end = (offset + limit).min(total_matching); + scored[offset..end].to_vec() + }; if scored.is_empty() { return Ok(HttpResponse::Ok().json(SearchResponse { @@ -224,6 +249,8 @@ pub async fn search_photos( model_version: query_resp.model_version, threshold, considered, + total_matching, + offset, results: Vec::new(), })); } @@ -287,6 +314,8 @@ pub async fn search_photos( model_version: query_resp.model_version, threshold, considered, + total_matching, + offset, results, })) }