From 32195ed89ed495a890467b73dbbd504066412a15 Mon Sep 17 00:00:00 2001 From: Cameron Cordes Date: Thu, 14 May 2026 14:00:41 -0400 Subject: [PATCH] clip-search: backlog drain + /photos/search endpoint MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wires the persistence layer for CLIP semantic search. The watcher's per-tick drain encodes any image_exif row with a known content_hash but no clip_embedding via Apollo (cap CLIP_BACKLOG_MAX_PER_TICK, default 32). On a query, /photos/search encodes the text via Apollo and reranks every stored embedding in-memory. ExifDao additions: - list_clip_unencoded_candidates — partial-index scan for drain - backfill_clip_embedding — touches only the two new columns - list_clip_index — dedup'd (hash, embedding) pull for search clip_watch::run_clip_encoding_pass is the parallel fan-out — tokio runtime per pass with CLIP_ENCODE_CONCURRENCY (default 4). No marker rows for permanent failures yet; per-tick cap bounds the retry cost. /photos/search params: q, limit, threshold (default 0.20), library, model_version. Response is intentionally minimal (path + score) so the frontend joins against existing photo-metadata routes lazily. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/backfill.rs | 68 +++++++++++ src/clip_search.rs | 288 ++++++++++++++++++++++++++++++++++++++++++++ src/clip_watch.rs | 249 ++++++++++++++++++++++++++++++++++++++ src/database/mod.rs | 195 ++++++++++++++++++++++++++++++ src/files.rs | 29 +++++ src/lib.rs | 2 + src/main.rs | 9 ++ src/state.rs | 11 ++ src/watcher.rs | 24 ++++ 9 files changed, 875 insertions(+) create mode 100644 src/clip_search.rs create mode 100644 src/clip_watch.rs diff --git a/src/backfill.rs b/src/backfill.rs index ad225ae..3de017c 100644 --- a/src/backfill.rs +++ b/src/backfill.rs @@ -220,6 +220,74 @@ pub fn backfill_missing_date_taken( /// unscanned image_exif rows directly via the FaceDao anti-join and /// hands them to the existing detection pass. Runs on every tick (not /// just full scans) so the backlog moves at quick-scan cadence. +/// Per-tick CLIP encoding drain. Mirrors `process_face_backlog`: pull +/// up to `CLIP_BACKLOG_MAX_PER_TICK` candidates with a known +/// `content_hash` but no `clip_embedding`, hand them to +/// `clip_watch::run_clip_encoding_pass` for parallel fan-out, and let +/// that module write the result back via `backfill_clip_embedding`. +/// +/// Idempotent — a row stays in the candidate set until its embedding +/// lands, so a transient failure (Apollo unreachable, CUDA OOM) just +/// defers to the next tick. Permanent failures (un-decodable bytes) +/// retry every tick at this point; future Branch may add a status +/// column like face_detections has. +pub fn process_clip_backlog( + context: &opentelemetry::Context, + library: &libraries::Library, + clip_client: &crate::ai::clip_client::ClipClient, + exif_dao: &Arc>>, + excluded_dirs: &[String], +) { + if !clip_client.is_enabled() { + return; + } + let cap: i64 = dotenv::var("CLIP_BACKLOG_MAX_PER_TICK") + .ok() + .and_then(|s| s.parse().ok()) + .filter(|n: &i64| *n > 0) + .unwrap_or(32); + + let rows: Vec<(String, String)> = { + let mut dao = exif_dao.lock().expect("exif dao"); + match dao.list_clip_unencoded_candidates(context, library.id, cap) { + Ok(r) => r, + Err(e) => { + warn!( + "clip_watch: list_clip_unencoded_candidates failed for library '{}': {:?}", + library.name, e + ); + return; + } + } + }; + if rows.is_empty() { + return; + } + + info!( + "clip_watch: backlog drain — encoding {} candidate(s) for library '{}' (cap={})", + rows.len(), + library.name, + cap + ); + + let candidates: Vec = rows + .into_iter() + .map(|(rel_path, content_hash)| crate::clip_watch::ClipCandidate { + rel_path, + content_hash, + }) + .collect(); + + crate::clip_watch::run_clip_encoding_pass( + library, + excluded_dirs, + clip_client, + Arc::clone(exif_dao), + candidates, + ); +} + pub fn process_face_backlog( context: &opentelemetry::Context, library: &libraries::Library, diff --git a/src/clip_search.rs b/src/clip_search.rs new file mode 100644 index 0000000..a3e3f9b --- /dev/null +++ b/src/clip_search.rs @@ -0,0 +1,288 @@ +//! `/photos/search?q=` — CLIP semantic photo search. +//! +//! The route lives outside `files.rs` to keep that 1500+ line module +//! focused on EXIF / tag listing. The flow is: +//! +//! 1. Parse query params (`q`, `limit`, `threshold`, optional `library`). +//! 2. Call Apollo's `/api/internal/clip/encode_text` to get the query +//! vector (L2-normalized 768-d f32 for ViT-L/14). +//! 3. Load every `(content_hash, clip_embedding)` for the scope from +//! `image_exif` via `ExifDao::list_clip_index`. ~28–43 MB for a 14k +//! library at ViT-L/14; loaded fresh per request — fast enough for +//! v1, optimize via an AppState cache later if needed. +//! 4. Dot product (= cosine since both sides are L2-normalized), filter +//! above `threshold`, top-K by score. +//! 5. Resolve each surviving hash back to a `(library_id, rel_path)` so +//! the frontend can render the photo / hand off to the carousel. +//! +//! Response shape is intentionally minimal — paths + score — so the +//! frontend can reuse existing PhotoGrid rendering by joining against +//! `/api/photos/match` (or calling `/image/metadata` lazily). Don't +//! bake camera/EXIF metadata into this route; it would force a fan-out +//! per result and balloon the response. + +use crate::AppState; +use crate::ai::clip_client::ClipError; +use crate::database::ExifDao; +use actix_web::{HttpResponse, Result as ActixResult, web}; +use base64::Engine; +use serde::{Deserialize, Serialize}; +use std::sync::Mutex; + +#[derive(Debug, Deserialize)] +pub struct SearchQuery { + /// Natural-language query. Required; empty triggers 400. + pub q: String, + /// Max results to return. Capped to 200 server-side; the UI almost + /// always wants ≤50. Defaults to 20. + #[serde(default = "default_limit")] + pub limit: usize, + /// Cosine-similarity floor below which results are dropped. + /// 0.20 is the rough "this is plausibly relevant" line for OpenAI + /// CLIP; tunable per call when sweeping. Defaults to 0.20. + #[serde(default = "default_threshold")] + pub threshold: f32, + /// Optional single-library scope. When omitted, every enabled + /// library is searched. Multi-select isn't supported yet — the + /// frontend wires through one or all. + pub library: Option, + /// Optional model-version filter. Defaults to the live engine's + /// version (queried lazily). Forces a strict join so mid-flight + /// model swaps can't mix geometries in a single response. + #[serde(default)] + pub model_version: Option, +} + +fn default_limit() -> usize { + 20 +} + +fn default_threshold() -> f32 { + 0.20 +} + +#[derive(Debug, Serialize)] +pub struct SearchHit { + pub library_id: i32, + pub rel_path: String, + pub content_hash: String, + /// Cosine similarity in [-1, 1]. In practice OpenAI CLIP returns + /// 0.10–0.40 for the typical photo library. + pub score: f32, +} + +#[derive(Debug, Serialize)] +pub struct SearchResponse { + pub query: String, + pub model_version: String, + pub threshold: f32, + pub considered: usize, + pub results: Vec, +} + +#[derive(Debug, Serialize)] +struct SearchError { + error: String, +} + +/// Decode a stored `clip_embedding` BLOB back into a `Vec`. Returns +/// `None` on malformed bytes — those rows get skipped rather than +/// failing the whole query. +fn decode_embedding(bytes: &[u8]) -> Option> { + if bytes.is_empty() || bytes.len() % 4 != 0 { + return None; + } + let mut out = Vec::with_capacity(bytes.len() / 4); + for chunk in bytes.chunks_exact(4) { + out.push(f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]])); + } + Some(out) +} + +#[inline] +fn dot(a: &[f32], b: &[f32]) -> f32 { + a.iter().zip(b.iter()).map(|(x, y)| x * y).sum() +} + +pub async fn search_photos( + state: web::Data, + exif_dao: web::Data>>, + query: web::Query, +) -> ActixResult { + let q_text = query.q.trim().to_string(); + if q_text.is_empty() { + return Ok(HttpResponse::BadRequest().json(SearchError { + error: "query parameter `q` is required".into(), + })); + } + if !state.clip_client.is_enabled() { + return Ok(HttpResponse::ServiceUnavailable().json(SearchError { + error: "CLIP search is disabled (no Apollo CLIP endpoint configured)".into(), + })); + } + + let limit = query.limit.min(200).max(1); + let threshold = query.threshold.clamp(-1.0, 1.0); + + // 1. Encode the query text. Fast — Apollo's text encoder is ~50ms + // on CPU. Bail with a clear error message if Apollo's down so the + // user sees "service unavailable" rather than empty results. + let query_resp = match state.clip_client.encode_text(&q_text).await { + Ok(r) => r, + Err(ClipError::Permanent(e)) => { + return Ok(HttpResponse::BadRequest().json(SearchError { + error: format!("query rejected: {e}"), + })); + } + Err(ClipError::Transient(e)) => { + return Ok(HttpResponse::BadGateway().json(SearchError { + error: format!("CLIP service unavailable: {e}"), + })); + } + Err(ClipError::Disabled) => { + return Ok(HttpResponse::ServiceUnavailable().json(SearchError { + error: "CLIP service disabled".into(), + })); + } + }; + // decode_embedding works on raw bytes; the wire format is b64. + let query_bytes = base64::engine::general_purpose::STANDARD + .decode(query_resp.embedding.as_bytes()) + .unwrap_or_default(); + let query_vec = match decode_embedding(&query_bytes) { + Some(v) => v, + None => { + return Ok(HttpResponse::BadGateway().json(SearchError { + error: "CLIP service returned a malformed query embedding".into(), + })); + } + }; + + // 2. Decide which library scope to search. + let library_ids: Vec = match query.library { + Some(id) => vec![id], + None => Vec::new(), // empty = all libraries + }; + + // 3. Pull the (hash, embedding) matrix. Lock contention here is + // bounded — one big SELECT under a mutex Arc> + // and then we release before scoring. If this becomes a hotspot + // we'll cache the decoded matrix in AppState with TTL. + let ctx = opentelemetry::Context::current(); + let rows: Vec<(String, Vec)> = { + let mut dao = exif_dao.lock().expect("exif dao"); + match dao.list_clip_index( + &ctx, + &library_ids, + query.model_version.as_deref().or(Some(&query_resp.model_version)), + ) { + Ok(r) => r, + Err(e) => { + log::warn!("clip_search: list_clip_index failed: {:?}", e); + return Ok(HttpResponse::InternalServerError().json(SearchError { + error: "failed to load search index".into(), + })); + } + } + }; + let considered = rows.len(); + if considered == 0 { + return Ok(HttpResponse::Ok().json(SearchResponse { + query: q_text, + model_version: query_resp.model_version, + threshold, + considered, + results: Vec::new(), + })); + } + + // 4. Score. Cap the loop's transient allocation; we keep all scores + // and sort at the end. With ~14k entries the sort is microseconds. + let mut scored: Vec<(f32, String)> = Vec::with_capacity(considered); + for (hash, blob) in rows { + let Some(emb) = decode_embedding(&blob) else { + continue; + }; + if emb.len() != query_vec.len() { + continue; + } + let sim = dot(&emb, &query_vec); + if sim < threshold { + continue; + } + scored.push((sim, hash)); + } + scored.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal)); + scored.truncate(limit); + + if scored.is_empty() { + return Ok(HttpResponse::Ok().json(SearchResponse { + query: q_text, + model_version: query_resp.model_version, + threshold, + considered, + results: Vec::new(), + })); + } + + // 5. Resolve each surviving hash back to a `(library_id, rel_path)`. + // `get_rel_paths_by_hash` returns every rel_path; we pick the first + // one for the result. Apollo / the UI can fetch alternatives via + // /image/metadata when needed. + let hashes: Vec = scored.iter().map(|(_, h)| h.clone()).collect(); + let path_map = { + let mut dao = exif_dao.lock().expect("exif dao"); + match dao.get_rel_paths_for_hashes(&ctx, &hashes) { + Ok(m) => m, + Err(e) => { + log::warn!("clip_search: get_rel_paths_for_hashes failed: {:?}", e); + return Ok(HttpResponse::InternalServerError().json(SearchError { + error: "failed to resolve photo paths".into(), + })); + } + } + }; + + // We need (library_id, rel_path) — get_rel_paths_for_hashes only + // returns rel_paths. Cross-reference via find_by_content_hash to + // pick the library too. Single call per surviving hash; cheap at + // top-20. + let mut results = Vec::with_capacity(scored.len()); + { + let mut dao = exif_dao.lock().expect("exif dao"); + for (score, hash) in scored { + let row = match dao.find_by_content_hash(&ctx, &hash) { + Ok(Some(r)) => r, + Ok(None) => continue, + Err(e) => { + log::warn!( + "clip_search: find_by_content_hash failed for {}: {:?}", + hash, e + ); + continue; + } + }; + // Prefer get_rel_paths_for_hashes's first entry if it + // exists (it shares semantics with `image_exif`'s natural + // order), falling back to the ImageExif row. + let rel_path = path_map + .get(&hash) + .and_then(|paths| paths.first().cloned()) + .unwrap_or(row.file_path); + results.push(SearchHit { + library_id: row.library_id, + rel_path, + content_hash: hash, + score, + }); + } + } + + Ok(HttpResponse::Ok().json(SearchResponse { + query: q_text, + model_version: query_resp.model_version, + threshold, + considered, + results, + })) +} diff --git a/src/clip_watch.rs b/src/clip_watch.rs new file mode 100644 index 0000000..a94e4ab --- /dev/null +++ b/src/clip_watch.rs @@ -0,0 +1,249 @@ +//! CLIP-encoding pass for the file watcher. +//! +//! `process_clip_backlog` in `backfill.rs` calls [`run_clip_encoding_pass`] +//! with the page of candidates returned by +//! `ExifDao::list_clip_unencoded_candidates`. We walk those, fan out K +//! parallel encode calls to Apollo, and persist the resulting embeddings +//! into `image_exif.clip_embedding` / `clip_model_version`. +//! +//! Unlike the face pipeline, CLIP has no marker rows — a permanent +//! failure (un-decodable bytes) leaves the row's `clip_embedding` NULL +//! and the drain will retry on the next tick. For personal-library +//! scale this is fine; the per-tick cap bounds the wasted work, and +//! `file_types::is_image_file` filters out videos / non-media client- +//! side so most permanent failures are decoded-but-corrupt files (rare). +//! +//! The watcher thread isn't in any pre-existing async context, so we +//! build a short-lived tokio runtime per pass and `block_on` the join +//! of K encode futures. Concurrency knob: `CLIP_ENCODE_CONCURRENCY` +//! (default 4 — lower than faces because Apollo's CLIP path doesn't +//! release the GIL between preprocess and forward as cleanly). + +use crate::ai::clip_client::{ClipClient, ClipError, EncodeImageMeta}; +use crate::database::ExifDao; +use crate::exif; +use crate::file_types; +use crate::libraries::Library; +use crate::memories::PathExcluder; +use log::{debug, info, warn}; +use std::path::Path; +use std::sync::{Arc, Mutex}; +use tokio::sync::Semaphore; + +/// One file the watcher would like to CLIP-encode. Built from the DAO +/// `list_clip_unencoded_candidates` result — needs the `content_hash` +/// for traceability in Apollo's log lines, even though the embedding +/// itself is keyed on `(library_id, rel_path)` for the back-write. +#[derive(Debug, Clone)] +pub struct ClipCandidate { + pub rel_path: String, + pub content_hash: String, +} + +/// Synchronous entry point. Returns once every candidate has been +/// processed (or definitively skipped). No-op when the client is +/// disabled so the caller can call unconditionally. +pub fn run_clip_encoding_pass( + library: &Library, + excluded_dirs: &[String], + clip_client: &ClipClient, + exif_dao: Arc>>, + candidates: Vec, +) { + if !clip_client.is_enabled() { + return; + } + if candidates.is_empty() { + return; + } + + let base = Path::new(&library.root_path); + let filtered = filter_excluded(base, excluded_dirs, candidates, Some(&library.name)); + if filtered.is_empty() { + return; + } + + let concurrency: usize = std::env::var("CLIP_ENCODE_CONCURRENCY") + .ok() + .and_then(|s| s.parse().ok()) + .filter(|n: &usize| *n > 0) + .unwrap_or(4); + + info!( + "clip_watch: encoding {} candidate(s) for library '{}' (concurrency {})", + filtered.len(), + library.name, + concurrency + ); + + let rt = match tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_all() + .build() + { + Ok(rt) => rt, + Err(e) => { + warn!("clip_watch: failed to build tokio runtime: {e}"); + return; + } + }; + + let library_id = library.id; + let library_root = library.root_path.clone(); + rt.block_on(async move { + let sem = Arc::new(Semaphore::new(concurrency)); + let mut handles = Vec::with_capacity(filtered.len()); + for cand in filtered { + let permit_sem = sem.clone(); + let clip_client = clip_client.clone(); + let exif_dao = exif_dao.clone(); + let library_root = library_root.clone(); + handles.push(tokio::spawn(async move { + let _permit = permit_sem.acquire().await.expect("clip semaphore"); + process_one(library_id, &library_root, cand, &clip_client, exif_dao).await; + })); + } + for h in handles { + let _ = h.await; + } + }); +} + +async fn process_one( + library_id: i32, + library_root: &str, + cand: ClipCandidate, + clip_client: &ClipClient, + exif_dao: Arc>>, +) { + let abs = Path::new(library_root).join(&cand.rel_path); + let bytes = match read_image_bytes_for_encode(&abs) { + Ok(b) => b, + Err(e) => { + // Same rationale as face_watch: don't mark — the file may + // have been moved/renamed mid-scan; let the next pass retry. + warn!( + "clip_watch: read failed for {} (lib {}): {}", + cand.rel_path, library_id, e + ); + return; + } + }; + + let meta = EncodeImageMeta { + content_hash: cand.content_hash.clone(), + library_id, + rel_path: cand.rel_path.clone(), + }; + let ctx = opentelemetry::Context::current(); + + match clip_client.encode_image(bytes, meta).await { + Ok(resp) => { + let emb_bytes = match resp.decode_embedding() { + Ok(b) => b, + Err(e) => { + warn!( + "clip_watch: bad embedding for {}: {:?}", + cand.rel_path, e + ); + return; + } + }; + let mut dao = exif_dao.lock().expect("exif dao"); + if let Err(e) = dao.backfill_clip_embedding( + &ctx, + library_id, + &cand.rel_path, + &emb_bytes, + &resp.model_version, + ) { + warn!( + "clip_watch: backfill_clip_embedding failed for {}: {:?}", + cand.rel_path, e + ); + return; + } + debug!( + "clip_watch: {} → dim={} ({}ms, {})", + cand.rel_path, resp.embedding_dim, resp.duration_ms, resp.model_version + ); + } + Err(ClipError::Permanent(e)) => { + // No marker — the row sits with NULL embedding and the drain + // retries next pass. For personal-library scale the cost of + // re-attempting permanently-broken files is bounded by the + // per-tick cap. If this becomes a recurring noise source, + // add a `clip_status` column with `failed` semantics like + // face_detections has. + warn!( + "clip_watch: permanent failure on {} (will retry next pass): {}", + cand.rel_path, e + ); + } + Err(ClipError::Transient(e)) => { + debug!( + "clip_watch: transient on {}: {} (will retry next pass)", + cand.rel_path, e + ); + } + Err(ClipError::Disabled) => { + // Defensive — the entry-point already checked is_enabled(). + } + } +} + +/// Drop candidates whose paths land in an excluded dir or whose +/// extension isn't an image. Mirrors `face_watch::filter_excluded` so +/// the two backlogs stay shape-consistent. Library name is passed +/// purely for the log line that surfaces an exclusion hit. +pub fn filter_excluded( + base: &Path, + excluded_dirs: &[String], + candidates: Vec, + library_name: Option<&str>, +) -> Vec { + let excluder = if excluded_dirs.is_empty() { + None + } else { + Some(PathExcluder::new(base, excluded_dirs)) + }; + candidates + .into_iter() + .filter(|c| { + let abs = base.join(&c.rel_path); + if !file_types::is_image_file(&abs) { + debug!( + "clip_watch: skipping non-image '{}' (lib {})", + c.rel_path, + library_name.unwrap_or("") + ); + return false; + } + if let Some(ex) = excluder.as_ref() + && ex.is_excluded(&abs) + { + debug!( + "clip_watch: skipping excluded '{}' (lib {})", + c.rel_path, + library_name.unwrap_or("") + ); + return false; + } + true + }) + .collect() +} + +/// Read image bytes for CLIP encoding. Same logic as +/// `face_watch::read_image_bytes_for_detect` — RAW / HEIC files don't +/// decode in Apollo's PIL pipeline, so we pull the embedded JPEG +/// preview the thumbnail pipeline already extracts. Plain JPEG / PNG / +/// WebP go through a direct read. +pub fn read_image_bytes_for_encode(path: &Path) -> std::io::Result> { + if file_types::needs_ffmpeg_thumbnail(path) + && let Some(preview) = exif::extract_embedded_jpeg_preview(path) + { + return Ok(preview); + } + std::fs::read(path) +} diff --git a/src/database/mod.rs b/src/database/mod.rs index 4a20702..4488a00 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -470,6 +470,61 @@ pub trait ExifDao: Sync + Send { source: &str, ) -> Result<(), DbError>; + /// Find image_exif rows needing a CLIP embedding for semantic search: + /// `clip_embedding IS NULL AND content_hash IS NOT NULL`, ordered by id + /// ASC, limited. Hash-less rows wait for `backfill_unhashed_backlog` to + /// hash them first — embedding a row we can't key on bytes is wasted + /// work that the next library/move detection would invalidate. Backed + /// by the partial index `idx_image_exif_clip_backfill`. + /// + /// Returns `(rel_path, content_hash)` for the given library only. Video + /// rows are returned too (the underlying anti-join is shape-uniform); + /// the caller filters them out via `file_types::is_image_file` before + /// sending to Apollo, mirroring `face_watch::filter_excluded`. + /// + /// **Model upgrades** (re-encoding everything on a new + /// `APOLLO_CLIP_MODEL`) are handled out-of-band — run + /// `UPDATE image_exif SET clip_embedding = NULL + /// WHERE clip_model_version != '';` + /// and the drain picks up the freshly-nulled rows on the next tick. + /// Mixing in-flight model versions in a single query is intentionally + /// not the drain's problem. + fn list_clip_unencoded_candidates( + &mut self, + context: &opentelemetry::Context, + library_id: i32, + limit: i64, + ) -> Result, DbError>; + + /// Persist a CLIP embedding for an existing row. Touches + /// `clip_embedding` and `clip_model_version` only — leaves every + /// other column alone so the drain can't accidentally clobber EXIF / + /// hash / date-resolver state that other paths have written. + fn backfill_clip_embedding( + &mut self, + context: &opentelemetry::Context, + library_id: i32, + rel_path: &str, + embedding: &[u8], + model_version: &str, + ) -> Result<(), DbError>; + + /// Load every `(content_hash, clip_embedding)` pair from the live + /// image_exif rows for the given libraries, optionally filtered to a + /// single `model_version` (cosine sim across mixed geometries is + /// meaningless). Used by `/photos/search` to rerank against the query + /// embedding in-memory. + /// + /// Returns one pair per content_hash. If a hash appears under more + /// than one library, the first row wins (Diesel's natural ORDER BY id + /// ASC). Hash-less and embedding-less rows are filtered server-side. + fn list_clip_index( + &mut self, + context: &opentelemetry::Context, + library_ids: &[i32], + model_version: Option<&str>, + ) -> Result)>, DbError>; + /// Operator-driven date_taken override (POST /image/exif/date). Snapshots /// the prior `(date_taken, date_taken_source)` into the `original_*` /// pair on first override, then writes the new value with @@ -1387,6 +1442,146 @@ impl ExifDao for SqliteExifDao { }) } + fn list_clip_unencoded_candidates( + &mut self, + context: &opentelemetry::Context, + library_id_val: i32, + limit: i64, + ) -> Result, DbError> { + trace_db_call( + context, + "query", + "list_clip_unencoded_candidates", + |_span| { + use schema::image_exif::dsl::*; + + let mut connection = self.connection.lock().expect("Unable to get ExifDao"); + + // Partial index `idx_image_exif_clip_backfill` covers the + // (clip_embedding IS NULL AND content_hash IS NOT NULL) + // filter; the planner hits it directly. ORDER BY id ASC + // keeps drain progress monotone across ticks. + image_exif + .filter(library_id.eq(library_id_val)) + .filter(clip_embedding.is_null()) + .filter(content_hash.is_not_null()) + .select((rel_path, content_hash.assume_not_null())) + .order(id.asc()) + .limit(limit) + .load::<(String, String)>(connection.deref_mut()) + .map_err(|_| anyhow::anyhow!("Query error")) + }, + ) + .map_err(|_| DbError::new(DbErrorKind::QueryError)) + } + + fn backfill_clip_embedding( + &mut self, + context: &opentelemetry::Context, + library_id_val: i32, + rel_path_val: &str, + embedding: &[u8], + model_version: &str, + ) -> Result<(), DbError> { + trace_db_call(context, "update", "backfill_clip_embedding", |_span| { + use schema::image_exif::dsl::*; + + let mut connection = self.connection.lock().expect("Unable to get ExifDao"); + + let result = diesel::update( + image_exif + .filter(library_id.eq(library_id_val)) + .filter(rel_path.eq(rel_path_val)), + ) + .set(( + clip_embedding.eq(embedding), + clip_model_version.eq(model_version), + )) + .execute(connection.deref_mut()); + + match result { + Ok(rows) => { + if rows == 0 { + // Same race as backfill_date_taken — row vanished + // between the candidate query and this write. Not + // a hard error; the drain re-scans next tick. + log::debug!( + "backfill_clip_embedding: 0 rows matched lib={} {} \ + (row likely retired by missing-file scan)", + library_id_val, + rel_path_val + ); + } + Ok(()) + } + Err(e) => Err(anyhow::anyhow!( + "diesel update failed (lib={}, rel_path={}, model={}): {}", + library_id_val, + rel_path_val, + model_version, + e + )), + } + }) + .map_err(|e| { + log::warn!("backfill_clip_embedding: {}", e); + DbError::new(DbErrorKind::UpdateError) + }) + } + + fn list_clip_index( + &mut self, + context: &opentelemetry::Context, + library_ids_val: &[i32], + model_version_filter: Option<&str>, + ) -> Result)>, DbError> { + trace_db_call(context, "query", "list_clip_index", |_span| { + use schema::image_exif::dsl::*; + + let mut connection = self.connection.lock().expect("Unable to get ExifDao"); + + // Build the base filter. content_hash + clip_embedding both + // need to be present for the row to be searchable. + let mut query = image_exif + .filter(content_hash.is_not_null()) + .filter(clip_embedding.is_not_null()) + .into_boxed(); + if !library_ids_val.is_empty() { + query = query.filter(library_id.eq_any(library_ids_val)); + } + if let Some(mv) = model_version_filter { + query = query.filter(clip_model_version.eq(mv)); + } + + // Order by id ASC so cross-library duplicates pick the + // earliest-ingested row (stable across calls; the in-memory + // matrix gets a deterministic row order). Group-by on + // content_hash via post-filter — Diesel doesn't expose a + // clean DISTINCT ON in this query shape. + let rows: Vec<(String, Vec)> = query + .select(( + content_hash.assume_not_null(), + clip_embedding.assume_not_null(), + )) + .order(id.asc()) + .load::<(String, Vec)>(connection.deref_mut()) + .map_err(|_| anyhow::anyhow!("Query error"))?; + + // Dedupe by hash, keeping the first occurrence. Cheap; sized + // to ~14k entries on this library. + let mut seen: std::collections::HashSet = + std::collections::HashSet::with_capacity(rows.len()); + let mut out = Vec::with_capacity(rows.len()); + for (h, e) in rows { + if seen.insert(h.clone()) { + out.push((h, e)); + } + } + Ok(out) + }) + .map_err(|_| DbError::new(DbErrorKind::QueryError)) + } + fn set_manual_date_taken( &mut self, context: &opentelemetry::Context, diff --git a/src/files.rs b/src/files.rs index 522f042..59cd49e 100644 --- a/src/files.rs +++ b/src/files.rs @@ -1938,6 +1938,35 @@ mod tests { ) -> Result<(), DbError> { Ok(()) } + + fn list_clip_unencoded_candidates( + &mut self, + _context: &opentelemetry::Context, + _library_id: i32, + _limit: i64, + ) -> Result, DbError> { + Ok(Vec::new()) + } + + fn backfill_clip_embedding( + &mut self, + _context: &opentelemetry::Context, + _library_id: i32, + _rel_path: &str, + _embedding: &[u8], + _model_version: &str, + ) -> Result<(), DbError> { + Ok(()) + } + + fn list_clip_index( + &mut self, + _context: &opentelemetry::Context, + _library_ids: &[i32], + _model_version: Option<&str>, + ) -> Result)>, DbError> { + Ok(Vec::new()) + } } mod api { diff --git a/src/lib.rs b/src/lib.rs index 04ebc54..0ea7ddb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,6 +7,8 @@ pub mod ai; pub mod auth; pub mod bin_progress; pub mod cleanup; +pub mod clip_search; +pub mod clip_watch; pub mod content_hash; pub mod data; pub mod database; diff --git a/src/main.rs b/src/main.rs index 51583c5..63013ce 100644 --- a/src/main.rs +++ b/src/main.rs @@ -31,6 +31,8 @@ use log::{error, info}; mod ai; mod auth; mod backfill; +mod clip_search; +mod clip_watch; mod content_hash; mod data; mod database; @@ -164,6 +166,7 @@ fn main() -> std::io::Result<()> { playlist_mgr_for_watcher, preview_gen_for_watcher, app_state.face_client.clone(), + app_state.clip_client.clone(), app_state.excluded_dirs.clone(), app_state.library_health.clone(), ); @@ -280,6 +283,12 @@ fn main() -> std::io::Result<()> { .service( web::resource("/photos/exif").route(web::get().to(files::list_exif_summary)), ) + .service( + // Semantic search via CLIP embeddings. See + // src/clip_search.rs for the request/response shape. + web::resource("/photos/search") + .route(web::get().to(clip_search::search_photos)), + ) .service(web::resource("/file/move").post(move_file::)) .service(handlers::image::get_image) .service(handlers::image::upload_image) diff --git a/src/state.rs b/src/state.rs index fd39cba..572a83f 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,4 +1,5 @@ use crate::ai::apollo_client::ApolloClient; +use crate::ai::clip_client::ClipClient; use crate::ai::face_client::FaceClient; use crate::ai::insight_chat::{ChatLockMap, InsightChatService}; use crate::ai::openrouter::OpenRouterClient; @@ -70,6 +71,10 @@ pub struct AppState { /// nor `APOLLO_API_BASE_URL` is set; the file-watch hook (Phase 3) and /// manual-face-create handler short-circuit in that case. pub face_client: FaceClient, + /// CLIP inference client (calls Apollo's `/api/internal/clip/*`). + /// Same disabled semantics as `face_client`: unset env → no-op + /// backlog drain, /photos/search returns an empty result. + pub clip_client: ClipClient, } impl AppState { @@ -105,6 +110,7 @@ impl AppState { insight_chat: Arc, preview_dao: Arc>>, face_client: FaceClient, + clip_client: ClipClient, ) -> Self { assert!( !libraries_vec.is_empty(), @@ -143,6 +149,7 @@ impl AppState { insight_generator, insight_chat, face_client, + clip_client, } } @@ -198,6 +205,9 @@ impl Default for AppState { .or_else(|| env::var("APOLLO_API_BASE_URL").ok()); let face_client = FaceClient::new(face_client_url); + // CLIP inference client. Same env var fallback as face_client. + let clip_client = ClipClient::from_env(); + // Initialize DAOs let insight_dao: Arc>> = Arc::new(Mutex::new(Box::new(SqliteInsightDao::new()))); @@ -289,6 +299,7 @@ impl Default for AppState { insight_chat, preview_dao, face_client, + clip_client, ) } } diff --git a/src/watcher.rs b/src/watcher.rs index 13ac1cd..39127ca 100644 --- a/src/watcher.rs +++ b/src/watcher.rs @@ -268,6 +268,7 @@ pub fn watch_files( playlist_manager: Addr, preview_generator: Addr, face_client: crate::ai::face_client::FaceClient, + clip_client: crate::ai::clip_client::ClipClient, excluded_dirs: Vec, library_health: libraries::LibraryHealthMap, ) { @@ -300,6 +301,14 @@ pub fn watch_files( or APOLLO_API_BASE_URL to enable)" ); } + if clip_client.is_enabled() { + info!(" CLIP semantic search: ENABLED"); + } else { + info!( + " CLIP semantic search: DISABLED (set APOLLO_CLIP_API_BASE_URL \ + or APOLLO_API_BASE_URL to enable)" + ); + } { let libs = libs_lock.read().unwrap_or_else(|e| e.into_inner()); for lib in libs.iter() { @@ -463,6 +472,21 @@ pub fn watch_files( ); } + // CLIP embedding backlog. Independent of face detection — + // drain runs whenever CLIP is enabled, even on deploys + // that don't have the face engine wired up. Mirrors the + // face drain shape (capped per tick, no-op when disabled). + if clip_client.is_enabled() { + let context = opentelemetry::Context::new(); + backfill::process_clip_backlog( + &context, + lib, + &clip_client, + &exif_dao, + &effective_excludes, + ); + } + // Date-taken backfill: drain rows whose canonical date is // either unresolved or only fs_time-sourced. Independent // of face detection — runs even on deploys that don't