From 8d9e76cf15d1815549c3def67b43ed27317a8908 Mon Sep 17 00:00:00 2001 From: Cameron Cordes Date: Thu, 14 May 2026 12:54:07 -0400 Subject: [PATCH] clip-search: migration + client + probe binary MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Probe-phase scaffolding for CLIP semantic search. Adds the column that will hold per-photo embeddings, the HTTP client to Apollo's inference service, and a throwaway probe binary so we can eyeball search-result quality on the live library before building the persistence layer (backlog drain, /photos/search endpoint, UI). - migrations/2026-05-14-000000_add_clip_embedding/ — adds image_exif.clip_embedding (BLOB) and clip_model_version (TEXT), plus a partial index on (clip_embedding IS NULL AND content_hash IS NOT NULL) for the future backfill drain. - src/database/models.rs — extends ImageExif struct to match. - src/ai/clip_client.rs — encode_image / encode_text / health, same Permanent/Transient/Disabled taxonomy as face_client. - src/bin/probe_clip_search.rs — --query --library N --limit M --top K. Encodes a sample and prints top-K cosine similarities. No DB writes. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../down.sql | 3 + .../up.sql | 27 ++ src/ai/clip_client.rs | 393 ++++++++++++++++++ src/ai/insight_chat.rs | 2 + src/ai/mod.rs | 1 + src/bin/probe_clip_search.rs | 268 ++++++++++++ src/database/models.rs | 9 + src/database/schema.rs | 2 + src/files.rs | 6 + 9 files changed, 711 insertions(+) create mode 100644 migrations/2026-05-14-000000_add_clip_embedding/down.sql create mode 100644 migrations/2026-05-14-000000_add_clip_embedding/up.sql create mode 100644 src/ai/clip_client.rs create mode 100644 src/bin/probe_clip_search.rs diff --git a/migrations/2026-05-14-000000_add_clip_embedding/down.sql b/migrations/2026-05-14-000000_add_clip_embedding/down.sql new file mode 100644 index 0000000..e0dc48b --- /dev/null +++ b/migrations/2026-05-14-000000_add_clip_embedding/down.sql @@ -0,0 +1,3 @@ +DROP INDEX IF EXISTS idx_image_exif_clip_backfill; +ALTER TABLE image_exif DROP COLUMN clip_model_version; +ALTER TABLE image_exif DROP COLUMN clip_embedding; diff --git a/migrations/2026-05-14-000000_add_clip_embedding/up.sql b/migrations/2026-05-14-000000_add_clip_embedding/up.sql new file mode 100644 index 0000000..c14d6da --- /dev/null +++ b/migrations/2026-05-14-000000_add_clip_embedding/up.sql @@ -0,0 +1,27 @@ +-- CLIP semantic photo search: store a per-photo image embedding so +-- text queries can rerank against the live library via cosine +-- similarity. Apollo encodes the bytes via its CLIP service; ImageApi +-- writes the resulting blob here. +-- +-- `clip_embedding` is the raw little-endian float32 buffer of an +-- L2-normalized vector (dim depends on the model — 768 bytes×4 for +-- ViT-L/14, 512 bytes×4 for ViT-B/32). Apollo always returns the +-- normalized form so the search-time dot product reduces to a plain +-- cosine similarity. +-- +-- `clip_model_version` echoes the upstream `APOLLO_CLIP_MODEL` (e.g. +-- "ViT-L/14"). A model swap shouldn't silently mix geometries — the +-- backfill drain will re-eligibilize rows whose stored model_version +-- differs from the live engine's, and the search route refuses to +-- mix rows from two model_versions in the same response. +ALTER TABLE image_exif ADD COLUMN clip_embedding BLOB; +ALTER TABLE image_exif ADD COLUMN clip_model_version TEXT; + +-- Partial index for the backfill drain. Mirrors the shape of +-- `idx_image_exif_date_backfill`: candidate rows are those with a +-- known content_hash (so we don't race the unhashed backlog) but no +-- embedding yet. SELECT cost stays O(missing rows) instead of full +-- table scan once the column is mostly populated. +CREATE INDEX IF NOT EXISTS idx_image_exif_clip_backfill + ON image_exif (id) + WHERE clip_embedding IS NULL AND content_hash IS NOT NULL; diff --git a/src/ai/clip_client.rs b/src/ai/clip_client.rs new file mode 100644 index 0000000..9b38a2a --- /dev/null +++ b/src/ai/clip_client.rs @@ -0,0 +1,393 @@ +//! Thin async HTTP client for Apollo's `/api/internal/clip/*` endpoints. +//! +//! Apollo hosts the OpenAI CLIP inference service (ViT-L/14 by default, +//! configurable via `APOLLO_CLIP_MODEL`). This client is the ImageApi side +//! of the contract: shove image bytes through `/encode_image` to populate +//! `image_exif.clip_embedding` during backfill, and call `/encode_text` to +//! encode a user's natural-language query at search time. The actual +//! cosine-similarity rerank runs locally in ImageApi. +//! +//! Mirrors `face_client.rs` / `tag_client.rs` shape: optional base URL +//! (None = disabled — feature off, drain and search no-op), reqwest +//! client with a generous timeout because GPU inference under a backlog +//! can queue server-side (Apollo's threadpool is bounded to 1 worker on +//! CUDA). +//! +//! Configured via `APOLLO_CLIP_API_BASE_URL`, falling back to +//! `APOLLO_API_BASE_URL` when the dedicated var is unset (single-Apollo +//! deploys are the common case). +//! +//! Wire format: +//! - `/encode_image`: multipart/form-data with `file=` and +//! `meta=` (content_hash / library_id / rel_path for logging). +//! - `/encode_text`: JSON `{"text": ""}`. +//! +//! Both return `{model_version, embedding_dim, duration_ms, embedding}` +//! where `embedding` is base64 of `dim×4` little-endian float32 bytes, +//! L2-normalized so the rerank reduces to a plain dot product. +//! +//! Error mapping (reflected in [`ClipError`]): +//! - 422 `decode_failed` / `empty_text` → permanent: ImageApi marks the +//! row failed or surfaces the empty-query error to the search caller. +//! - 503 `cuda_oom` / `engine_unavailable` → defer-and-retry: no marker. +//! - Any other 5xx / network error → defer. + +use anyhow::{Context, Result}; +use base64::Engine; +use reqwest::Client; +use serde::{Deserialize, Serialize}; +use std::time::Duration; + +#[derive(Debug, Clone, Serialize)] +pub struct EncodeImageMeta { + pub content_hash: String, + pub library_id: i32, + pub rel_path: String, +} + +#[derive(Debug, Clone, Deserialize)] +#[allow(dead_code)] // duration_ms logged by the backfill drain +pub struct EncodeResponse { + pub model_version: String, + pub embedding_dim: i32, + pub duration_ms: i64, + /// base64 of `embedding_dim * 4` bytes (LE float32). ImageApi stores + /// the decoded bytes verbatim as a BLOB. + pub embedding: String, +} + +impl EncodeResponse { + /// Decode the wire-format embedding back into raw bytes for storage. + /// Validates the buffer is `embedding_dim * 4` bytes long so a + /// malformed response surfaces here rather than as a downstream + /// silent length mismatch. + pub fn decode_embedding(&self) -> Result> { + let bytes = base64::engine::general_purpose::STANDARD + .decode(self.embedding.as_bytes()) + .context("clip embedding base64 decode")?; + let expected = (self.embedding_dim as usize) * 4; + if bytes.len() != expected { + anyhow::bail!( + "clip embedding wrong size: got {} bytes, expected {} ({} * 4)", + bytes.len(), + expected, + self.embedding_dim + ); + } + Ok(bytes) + } +} + +#[derive(Debug, Clone, Deserialize)] +#[allow(dead_code)] // load_error consumed by future health probe +pub struct ClipHealth { + pub loaded: bool, + pub device: String, + pub model_version: String, + pub embedding_dim: i32, + #[serde(default)] + pub load_error: Option, +} + +#[derive(Debug)] +pub enum ClipError { + /// Apollo refused for a reason that won't change on retry (decode + /// failure on /encode_image, empty text on /encode_text). + Permanent(anyhow::Error), + /// Apollo couldn't process this turn but might next time (CUDA OOM, + /// engine not loaded, network hiccup). + Transient(anyhow::Error), + /// Feature is disabled (no `APOLLO_CLIP_API_BASE_URL` / + /// `APOLLO_API_BASE_URL`). + Disabled, +} + +impl std::fmt::Display for ClipError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ClipError::Permanent(e) => write!(f, "permanent: {e}"), + ClipError::Transient(e) => write!(f, "transient: {e}"), + ClipError::Disabled => write!(f, "clip client disabled"), + } + } +} + +impl std::error::Error for ClipError {} + +#[derive(Clone)] +pub struct ClipClient { + client: Client, + base_url: Option, +} + +impl ClipClient { + pub fn new(base_url: Option) -> Self { + let timeout_secs = std::env::var("CLIP_REQUEST_TIMEOUT_SEC") + .ok() + .and_then(|s| s.parse::().ok()) + .unwrap_or(60); + let client = Client::builder() + .timeout(Duration::from_secs(timeout_secs)) + .build() + .expect("reqwest client build"); + Self { + client, + base_url: base_url.map(|u| u.trim_end_matches('/').to_string()), + } + } + + /// Read both standard env vars. `APOLLO_CLIP_API_BASE_URL` wins; + /// fallback to `APOLLO_API_BASE_URL`. Both unset → disabled. + pub fn from_env() -> Self { + let base = std::env::var("APOLLO_CLIP_API_BASE_URL") + .ok() + .filter(|s| !s.trim().is_empty()) + .or_else(|| { + std::env::var("APOLLO_API_BASE_URL") + .ok() + .filter(|s| !s.trim().is_empty()) + }); + Self::new(base) + } + + pub fn is_enabled(&self) -> bool { + self.base_url.is_some() + } + + /// Encode an image to a 768-d (ViT-L/14) or 512-d (ViT-B/32) + /// L2-normalized embedding. Used by the backfill drain. + pub async fn encode_image( + &self, + bytes: Vec, + meta: EncodeImageMeta, + ) -> std::result::Result { + let Some(base) = self.base_url.as_deref() else { + return Err(ClipError::Disabled); + }; + let url = format!("{}/api/internal/clip/encode_image", base); + let meta_json = serde_json::to_string(&meta) + .map_err(|e| ClipError::Permanent(anyhow::anyhow!("meta serialize: {e}")))?; + let form = reqwest::multipart::Form::new() + .text("meta", meta_json) + .part( + "file", + reqwest::multipart::Part::bytes(bytes) + .file_name(meta.rel_path.clone()) + .mime_str("application/octet-stream") + .unwrap_or_else(|_| reqwest::multipart::Part::bytes(Vec::new())), + ); + self.send_multipart(&url, form).await + } + + /// Encode a natural-language query to an embedding. Used by the + /// search route to rank stored image embeddings by cosine sim. + pub async fn encode_text( + &self, + text: &str, + ) -> std::result::Result { + let Some(base) = self.base_url.as_deref() else { + return Err(ClipError::Disabled); + }; + let url = format!("{}/api/internal/clip/encode_text", base); + let body = serde_json::json!({ "text": text }); + + let resp = match self.client.post(&url).json(&body).send().await { + Ok(r) => r, + Err(e) if e.is_timeout() || e.is_connect() => { + return Err(ClipError::Transient(anyhow::anyhow!( + "clip client network: {e}" + ))); + } + Err(e) => { + return Err(ClipError::Transient(anyhow::anyhow!( + "clip client request: {e}" + ))); + } + }; + let status = resp.status(); + if status.is_success() { + let body: EncodeResponse = resp.json().await.map_err(|e| { + ClipError::Transient(anyhow::anyhow!("clip response decode: {e}")) + })?; + return Ok(body); + } + let body_text = resp.text().await.unwrap_or_default(); + Err(classify_error_response(status.as_u16(), &body_text)) + } + + /// Engine reachability + device/model report. Used as a startup + /// sanity check from the probe binary and (later) the backlog drain. + #[allow(dead_code)] // consumed by probe + drain + pub async fn health(&self) -> Result { + let base = self.base_url.as_deref().context("clip client disabled")?; + let url = format!("{}/api/internal/clip/health", base); + let resp = self.client.get(&url).send().await?.error_for_status()?; + let body: ClipHealth = resp.json().await?; + Ok(body) + } + + async fn send_multipart( + &self, + url: &str, + form: reqwest::multipart::Form, + ) -> std::result::Result { + let resp = match self.client.post(url).multipart(form).send().await { + Ok(r) => r, + Err(e) if e.is_timeout() || e.is_connect() => { + return Err(ClipError::Transient(anyhow::anyhow!( + "clip client network: {e}" + ))); + } + Err(e) => { + return Err(ClipError::Transient(anyhow::anyhow!( + "clip client request: {e}" + ))); + } + }; + let status = resp.status(); + if status.is_success() { + let body: EncodeResponse = resp.json().await.map_err(|e| { + ClipError::Transient(anyhow::anyhow!("clip response decode: {e}")) + })?; + return Ok(body); + } + let body_text = resp.text().await.unwrap_or_default(); + Err(classify_error_response(status.as_u16(), &body_text)) + } +} + +/// Pulled out as a pure function so the marker-row contract is unit- +/// testable without spinning up an HTTP server. Matches the shape used +/// by face_client::classify_error_response so future retry policies +/// can share code. +fn classify_error_response(status: u16, body_text: &str) -> ClipError { + let detail_code = serde_json::from_str::(body_text) + .ok() + .and_then(|v| { + v.get("detail") + .and_then(|d| d.as_str().map(str::to_string)) + .or_else(|| { + v.get("detail") + .and_then(|d| d.get("code")) + .and_then(|c| c.as_str()) + .map(str::to_string) + }) + }) + .unwrap_or_default(); + + if status == 422 { + return ClipError::Permanent(anyhow::anyhow!( + "clip {} {}: {}", + status, + detail_code, + body_text + )); + } + if status == 503 { + return ClipError::Transient(anyhow::anyhow!( + "clip {} {}: {}", + status, + detail_code, + body_text + )); + } + // 408 / 413 / 429 are operator-fixable infra issues; defer. + if matches!(status, 408 | 413 | 429) { + return ClipError::Transient(anyhow::anyhow!( + "clip {} {}: {}", + status, + detail_code, + body_text + )); + } + if (400..500).contains(&status) { + ClipError::Permanent(anyhow::anyhow!( + "clip {} {}: {}", + status, + detail_code, + body_text + )) + } else { + ClipError::Transient(anyhow::anyhow!( + "clip {} {}: {}", + status, + detail_code, + body_text + )) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn is_permanent(e: &ClipError) -> bool { + matches!(e, ClipError::Permanent(_)) + } + fn is_transient(e: &ClipError) -> bool { + matches!(e, ClipError::Transient(_)) + } + + #[test] + fn classify_422_decode_failed_is_permanent() { + assert!(is_permanent(&classify_error_response( + 422, + r#"{"detail":"decode_failed: bad bytes"}"# + ))); + } + + #[test] + fn classify_422_empty_text_is_permanent() { + assert!(is_permanent(&classify_error_response( + 422, + r#"{"detail":"empty_text"}"# + ))); + } + + #[test] + fn classify_503_cuda_oom_is_transient() { + assert!(is_transient(&classify_error_response( + 503, + r#"{"detail":{"code":"cuda_oom","error":"out of memory"}}"#, + ))); + } + + #[test] + fn classify_5xx_is_transient_other_4xx_is_permanent() { + assert!(is_transient(&classify_error_response(500, ""))); + assert!(is_permanent(&classify_error_response(404, "{}"))); + } + + #[test] + fn classify_infra_4xx_is_transient() { + assert!(is_transient(&classify_error_response(408, ""))); + assert!(is_transient(&classify_error_response(413, ""))); + assert!(is_transient(&classify_error_response(429, "{}"))); + } + + #[test] + fn decode_embedding_size_mismatch_errors() { + // dim=4 says we expect 16 bytes (4 floats × 4 bytes). Encode 8. + use base64::Engine; + let resp = EncodeResponse { + model_version: "ViT-L/14".into(), + embedding_dim: 4, + duration_ms: 0, + embedding: base64::engine::general_purpose::STANDARD.encode([0u8; 8]), + }; + assert!(resp.decode_embedding().is_err()); + } + + #[test] + fn decode_embedding_round_trip() { + use base64::Engine; + let bytes: Vec = (0..16).collect(); + let resp = EncodeResponse { + model_version: "ViT-L/14".into(), + embedding_dim: 4, + duration_ms: 0, + embedding: base64::engine::general_purpose::STANDARD.encode(&bytes), + }; + assert_eq!(resp.decode_embedding().unwrap(), bytes); + } +} diff --git a/src/ai/insight_chat.rs b/src/ai/insight_chat.rs index adb2da4..b2a7af8 100644 --- a/src/ai/insight_chat.rs +++ b/src/ai/insight_chat.rs @@ -2184,6 +2184,8 @@ mod tests { date_taken_source: None, original_date_taken: None, original_date_taken_source: None, + clip_embedding: None, + clip_model_version: None, }); let out = resolve_date_taken_for_context(&exif, "Screenshot_2014-06-01.png"); assert_eq!(out.as_deref(), Some("2021-08-15")); diff --git a/src/ai/mod.rs b/src/ai/mod.rs index d6fda90..3468325 100644 --- a/src/ai/mod.rs +++ b/src/ai/mod.rs @@ -1,4 +1,5 @@ pub mod apollo_client; +pub mod clip_client; pub mod daily_summary_job; pub mod face_client; pub mod handlers; diff --git a/src/bin/probe_clip_search.rs b/src/bin/probe_clip_search.rs new file mode 100644 index 0000000..45686ed --- /dev/null +++ b/src/bin/probe_clip_search.rs @@ -0,0 +1,268 @@ +//! Probe binary for CLIP semantic search. +//! +//! No DB writes. Walks a library's `image_exif` rows, encodes a sample +//! via Apollo's `/encode_image`, encodes the user's --query via +//! `/encode_text`, and prints the top-K most similar photos by cosine +//! similarity so the operator can eyeball quality before committing to +//! the persistence phase (column populated by backlog drain, search +//! endpoint, UI). +//! +//! Usage: +//! cargo run --release --bin probe_clip_search -- \ +//! --library 1 --limit 200 --query "a beach at sunset" --top 10 +//! +//! Env: standard ImageApi `.env`. Requires either +//! `APOLLO_CLIP_API_BASE_URL` or `APOLLO_API_BASE_URL` to be set. + +use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; +use std::time::Instant; + +use clap::Parser; +use log::{info, warn}; + +use image_api::ai::clip_client::{ClipClient, ClipError, EncodeImageMeta}; +use image_api::database::{ExifDao, SqliteExifDao, connect}; +use image_api::exif; +use image_api::file_types; +use image_api::libraries::{self, Library}; + +#[derive(Parser, Debug)] +#[command(name = "probe_clip_search")] +#[command(about = "Top-K CLIP semantic search over a sample of image_exif rows")] +struct Args { + /// Library id to sample from. + #[arg(long)] + library: i32, + + /// Max files to encode. CPU inference is slow (~1-3 s per photo at + /// ViT-L/14); start small and grow once GPU is sorted. + #[arg(long, default_value_t = 50)] + limit: usize, + + /// Natural-language query. Empty triggers an error from Apollo. + #[arg(long)] + query: String, + + /// How many top results to print. + #[arg(long, default_value_t = 10)] + top: usize, + + /// Offset into the library's rel_path listing. + #[arg(long, default_value_t = 0)] + offset: i64, + + /// How many DB rows to scan before giving up on hitting the limit. + #[arg(long, default_value_t = 5000)] + max_scan: i64, +} + +/// Same as `face_watch::read_image_bytes_for_detect` (which is pub(crate)). +/// Inlined for the throwaway probe. +fn read_image_bytes(path: &Path) -> std::io::Result> { + if file_types::needs_ffmpeg_thumbnail(path) + && let Some(preview) = exif::extract_embedded_jpeg_preview(path) + { + return Ok(preview); + } + std::fs::read(path) +} + +/// Decode a base64'd LE float32 vector to a `Vec`. +fn decode_f32_vec(b64: &str) -> anyhow::Result> { + use base64::Engine; + let bytes = base64::engine::general_purpose::STANDARD.decode(b64.as_bytes())?; + if bytes.len() % 4 != 0 { + anyhow::bail!("embedding byte length {} not divisible by 4", bytes.len()); + } + let mut out = Vec::with_capacity(bytes.len() / 4); + for chunk in bytes.chunks_exact(4) { + out.push(f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]])); + } + Ok(out) +} + +/// Plain dot product. Apollo L2-normalizes both sides, so this is cosine sim. +fn dot(a: &[f32], b: &[f32]) -> f32 { + a.iter().zip(b.iter()).map(|(x, y)| x * y).sum() +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + env_logger::init(); + dotenv::dotenv().ok(); + + let args = Args::parse(); + if args.query.trim().is_empty() { + anyhow::bail!("--query must not be empty"); + } + + let client = ClipClient::from_env(); + if !client.is_enabled() { + anyhow::bail!( + "ClipClient disabled: set APOLLO_CLIP_API_BASE_URL or APOLLO_API_BASE_URL in .env" + ); + } + + match client.health().await { + Ok(h) => info!( + "clip engine: loaded={} device={} model={} dim={}", + h.loaded, h.device, h.model_version, h.embedding_dim + ), + Err(e) => warn!("health probe failed (continuing): {e}"), + } + + let mut seed_conn = connect(); + if let Some(base) = dotenv::var("BASE_PATH").ok().as_deref() { + libraries::seed_or_patch_from_env(&mut seed_conn, base); + } + let libs = libraries::load_all(&mut seed_conn); + drop(seed_conn); + let lib: Library = libs + .into_iter() + .find(|l| l.id == args.library) + .ok_or_else(|| anyhow::anyhow!("library id {} not found", args.library))?; + info!("probing library #{} ({}) at {}", lib.id, lib.name, lib.root_path); + + let dao: Arc>> = Arc::new(Mutex::new(Box::new(SqliteExifDao::new()))); + let ctx = opentelemetry::Context::new(); + + // Encode the query up-front so the long image-encode loop doesn't + // race a slow query encode. Fails fast on a misspelled query. + let query_resp = client + .encode_text(&args.query) + .await + .map_err(|e| anyhow::anyhow!("encode_text: {e}"))?; + let query_vec = decode_f32_vec(&query_resp.embedding)?; + info!( + "query encoded ({}d, {}ms): {:?}", + query_resp.embedding_dim, + query_resp.duration_ms, + args.query + ); + + // Page through (id, rel_path), filter to images on disk, encode up + // to `limit`. Each encoded photo gets scored against the query and + // kept in a top-K heap. + const PAGE: i64 = 500; + let mut offset = args.offset; + let mut scanned: i64 = 0; + let mut encoded = 0usize; + let mut perm_fail = 0usize; + let mut transient_fail = 0usize; + let root = PathBuf::from(&lib.root_path); + let started = Instant::now(); + // (similarity, rel_path) — we keep all scored results and sort at + // the end. With limit≤few-hundred this is trivial. + let mut scores: Vec<(f32, String)> = Vec::with_capacity(args.limit); + + 'outer: loop { + if scanned >= args.max_scan { + warn!( + "scan cap ({}) reached before hitting limit ({}); bump --max-scan to scan deeper", + args.max_scan, args.limit + ); + break; + } + let rows = { + let mut guard = dao.lock().expect("dao lock"); + guard + .list_rel_paths_for_library_page(&ctx, lib.id, PAGE, offset) + .map_err(|e| anyhow::anyhow!("list rel_paths: {:?}", e))? + }; + if rows.is_empty() { + info!("no more rows after offset {}", offset); + break; + } + offset += rows.len() as i64; + scanned += rows.len() as i64; + + for (_id, rel_path) in rows { + if encoded >= args.limit { + break 'outer; + } + let abs = root.join(&rel_path); + if !file_types::is_image_file(&abs) || !abs.exists() { + continue; + } + let bytes = match read_image_bytes(&abs) { + Ok(b) => b, + Err(e) => { + warn!("read {rel_path}: {e}"); + continue; + } + }; + let meta = EncodeImageMeta { + content_hash: String::new(), + library_id: lib.id, + rel_path: rel_path.clone(), + }; + let call_start = Instant::now(); + match client.encode_image(bytes, meta).await { + Ok(resp) => { + encoded += 1; + let vec = match decode_f32_vec(&resp.embedding) { + Ok(v) => v, + Err(e) => { + warn!("decode {rel_path}: {e}"); + continue; + } + }; + if vec.len() != query_vec.len() { + warn!( + "dim mismatch for {rel_path}: image={} query={}", + vec.len(), + query_vec.len() + ); + continue; + } + let sim = dot(&vec, &query_vec); + scores.push((sim, rel_path.clone())); + if encoded % 10 == 0 { + info!( + "progress: {} encoded, {:.1}s elapsed", + encoded, + started.elapsed().as_secs_f32() + ); + } + let _ = call_start; + } + Err(ClipError::Permanent(e)) => { + perm_fail += 1; + warn!("permanent encode failure for {rel_path}: {e}"); + } + Err(ClipError::Transient(e)) => { + transient_fail += 1; + warn!("transient encode failure for {rel_path}: {e}"); + } + Err(ClipError::Disabled) => { + anyhow::bail!("clip client became disabled mid-run; impossible"); + } + } + } + } + + scores.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal)); + let elapsed = started.elapsed(); + println!(); + println!("── top {} for query: {:?} ──", args.top.min(scores.len()), args.query); + for (i, (sim, path)) in scores.iter().take(args.top).enumerate() { + println!("[{:>2}] sim={:.3} {}", i + 1, sim, path); + } + println!(); + println!("── summary ─────────────────────────────────────"); + println!("query : {:?}", args.query); + println!("scanned rows : {scanned}"); + println!("encoded photos : {encoded}"); + println!("permanent failures : {perm_fail}"); + println!("transient failures : {transient_fail}"); + println!("elapsed : {:.1}s", elapsed.as_secs_f32()); + if encoded > 0 { + println!( + "throughput : {:.2} photos/s ({:.0}ms/photo avg)", + encoded as f32 / elapsed.as_secs_f32().max(0.001), + elapsed.as_millis() as f32 / encoded as f32 + ); + } + Ok(()) +} diff --git a/src/database/models.rs b/src/database/models.rs index b5e1c1e..9d005f5 100644 --- a/src/database/models.rs +++ b/src/database/models.rs @@ -114,6 +114,15 @@ pub struct ImageExif { /// Snapshot of the prior `date_taken_source` taken on first manual /// override. NULL when no override is active. pub original_date_taken_source: Option, + /// L2-normalized CLIP image embedding (raw little-endian float32 bytes; + /// length depends on the model — 768×4 for ViT-L/14, 512×4 for ViT-B/32). + /// NULL until Apollo's CLIP service has encoded this photo via the + /// backfill drain. Used by `/photos/search` for semantic queries. + pub clip_embedding: Option>, + /// Which CLIP model produced `clip_embedding` (e.g. `"ViT-L/14"`). A + /// swap of `APOLLO_CLIP_MODEL` re-eligibilizes rows whose stored + /// version differs so the drain rebuilds them. + pub clip_model_version: Option, } #[derive(Insertable)] diff --git a/src/database/schema.rs b/src/database/schema.rs index d001d80..28b5d26 100644 --- a/src/database/schema.rs +++ b/src/database/schema.rs @@ -138,6 +138,8 @@ diesel::table! { date_taken_source -> Nullable, original_date_taken -> Nullable, original_date_taken_source -> Nullable, + clip_embedding -> Nullable, + clip_model_version -> Nullable, } } diff --git a/src/files.rs b/src/files.rs index 91904e3..522f042 100644 --- a/src/files.rs +++ b/src/files.rs @@ -1511,6 +1511,8 @@ mod tests { date_taken_source, original_date_taken: None, original_date_taken_source: None, + clip_embedding: None, + clip_model_version: None, } } @@ -1550,6 +1552,8 @@ mod tests { date_taken_source: data.date_taken_source.clone(), original_date_taken: None, original_date_taken_source: None, + clip_embedding: None, + clip_model_version: None, }) } @@ -1596,6 +1600,8 @@ mod tests { date_taken_source: data.date_taken_source.clone(), original_date_taken: None, original_date_taken_source: None, + clip_embedding: None, + clip_model_version: None, }) }