//! Probe binary for CLIP semantic search. //! //! No DB writes. Walks a library's `image_exif` rows, encodes a sample //! via Apollo's `/encode_image`, encodes the user's --query via //! `/encode_text`, and prints the top-K most similar photos by cosine //! similarity so the operator can eyeball quality before committing to //! the persistence phase (column populated by backlog drain, search //! endpoint, UI). //! //! Usage: //! cargo run --release --bin probe_clip_search -- \ //! --library 1 --limit 200 --query "a beach at sunset" --top 10 //! //! Env: standard ImageApi `.env`. Requires either //! `APOLLO_CLIP_API_BASE_URL` or `APOLLO_API_BASE_URL` to be set. use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; use std::time::Instant; use clap::Parser; use log::{info, warn}; use image_api::ai::clip_client::{ClipClient, ClipError, EncodeImageMeta}; use image_api::database::{ExifDao, SqliteExifDao, connect}; use image_api::exif; use image_api::file_types; use image_api::libraries::{self, Library}; #[derive(Parser, Debug)] #[command(name = "probe_clip_search")] #[command(about = "Top-K CLIP semantic search over a sample of image_exif rows")] struct Args { /// Library id to sample from. #[arg(long)] library: i32, /// Max files to encode. CPU inference is slow (~1-3 s per photo at /// ViT-L/14); start small and grow once GPU is sorted. #[arg(long, default_value_t = 50)] limit: usize, /// Natural-language query. Empty triggers an error from Apollo. #[arg(long)] query: String, /// How many top results to print. #[arg(long, default_value_t = 10)] top: usize, /// Offset into the library's rel_path listing. #[arg(long, default_value_t = 0)] offset: i64, /// How many DB rows to scan before giving up on hitting the limit. #[arg(long, default_value_t = 5000)] max_scan: i64, } /// Same as `face_watch::read_image_bytes_for_detect` (which is pub(crate)). /// Inlined for the throwaway probe. fn read_image_bytes(path: &Path) -> std::io::Result> { if file_types::needs_ffmpeg_thumbnail(path) && let Some(preview) = exif::extract_embedded_jpeg_preview(path) { return Ok(preview); } std::fs::read(path) } /// Decode a base64'd LE float32 vector to a `Vec`. fn decode_f32_vec(b64: &str) -> anyhow::Result> { use base64::Engine; let bytes = base64::engine::general_purpose::STANDARD.decode(b64.as_bytes())?; if bytes.len() % 4 != 0 { anyhow::bail!("embedding byte length {} not divisible by 4", bytes.len()); } let mut out = Vec::with_capacity(bytes.len() / 4); for chunk in bytes.chunks_exact(4) { out.push(f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]])); } Ok(out) } /// Plain dot product. Apollo L2-normalizes both sides, so this is cosine sim. fn dot(a: &[f32], b: &[f32]) -> f32 { a.iter().zip(b.iter()).map(|(x, y)| x * y).sum() } #[tokio::main] async fn main() -> anyhow::Result<()> { env_logger::init(); dotenv::dotenv().ok(); let args = Args::parse(); if args.query.trim().is_empty() { anyhow::bail!("--query must not be empty"); } let client = ClipClient::from_env(); if !client.is_enabled() { anyhow::bail!( "ClipClient disabled: set APOLLO_CLIP_API_BASE_URL or APOLLO_API_BASE_URL in .env" ); } match client.health().await { Ok(h) => info!( "clip engine: loaded={} device={} model={} dim={}", h.loaded, h.device, h.model_version, h.embedding_dim ), Err(e) => warn!("health probe failed (continuing): {e}"), } let mut seed_conn = connect(); if let Some(base) = dotenv::var("BASE_PATH").ok().as_deref() { libraries::seed_or_patch_from_env(&mut seed_conn, base); } let libs = libraries::load_all(&mut seed_conn); drop(seed_conn); let lib: Library = libs .into_iter() .find(|l| l.id == args.library) .ok_or_else(|| anyhow::anyhow!("library id {} not found", args.library))?; info!( "probing library #{} ({}) at {}", lib.id, lib.name, lib.root_path ); let dao: Arc>> = Arc::new(Mutex::new(Box::new(SqliteExifDao::new()))); let ctx = opentelemetry::Context::new(); // Encode the query up-front so the long image-encode loop doesn't // race a slow query encode. Fails fast on a misspelled query. let query_resp = client .encode_text(&args.query) .await .map_err(|e| anyhow::anyhow!("encode_text: {e}"))?; let query_vec = decode_f32_vec(&query_resp.embedding)?; info!( "query encoded ({}d, {}ms): {:?}", query_resp.embedding_dim, query_resp.duration_ms, args.query ); // Page through (id, rel_path), filter to images on disk, encode up // to `limit`. Each encoded photo gets scored against the query and // kept in a top-K heap. const PAGE: i64 = 500; let mut offset = args.offset; let mut scanned: i64 = 0; let mut encoded = 0usize; let mut perm_fail = 0usize; let mut transient_fail = 0usize; let root = PathBuf::from(&lib.root_path); let started = Instant::now(); // (similarity, rel_path) — we keep all scored results and sort at // the end. With limit≤few-hundred this is trivial. let mut scores: Vec<(f32, String)> = Vec::with_capacity(args.limit); 'outer: loop { if scanned >= args.max_scan { warn!( "scan cap ({}) reached before hitting limit ({}); bump --max-scan to scan deeper", args.max_scan, args.limit ); break; } let rows = { let mut guard = dao.lock().expect("dao lock"); guard .list_rel_paths_for_library_page(&ctx, lib.id, PAGE, offset) .map_err(|e| anyhow::anyhow!("list rel_paths: {:?}", e))? }; if rows.is_empty() { info!("no more rows after offset {}", offset); break; } offset += rows.len() as i64; scanned += rows.len() as i64; for (_id, rel_path) in rows { if encoded >= args.limit { break 'outer; } let abs = root.join(&rel_path); if !file_types::is_image_file(&abs) || !abs.exists() { continue; } let bytes = match read_image_bytes(&abs) { Ok(b) => b, Err(e) => { warn!("read {rel_path}: {e}"); continue; } }; let meta = EncodeImageMeta { content_hash: String::new(), library_id: lib.id, rel_path: rel_path.clone(), }; let call_start = Instant::now(); match client.encode_image(bytes, meta).await { Ok(resp) => { encoded += 1; let vec = match decode_f32_vec(&resp.embedding) { Ok(v) => v, Err(e) => { warn!("decode {rel_path}: {e}"); continue; } }; if vec.len() != query_vec.len() { warn!( "dim mismatch for {rel_path}: image={} query={}", vec.len(), query_vec.len() ); continue; } let sim = dot(&vec, &query_vec); scores.push((sim, rel_path.clone())); if encoded % 10 == 0 { info!( "progress: {} encoded, {:.1}s elapsed", encoded, started.elapsed().as_secs_f32() ); } let _ = call_start; } Err(ClipError::Permanent(e)) => { perm_fail += 1; warn!("permanent encode failure for {rel_path}: {e}"); } Err(ClipError::Transient(e)) => { transient_fail += 1; warn!("transient encode failure for {rel_path}: {e}"); } Err(ClipError::Disabled) => { anyhow::bail!("clip client became disabled mid-run; impossible"); } } } } scores.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal)); let elapsed = started.elapsed(); println!(); println!( "── top {} for query: {:?} ──", args.top.min(scores.len()), args.query ); for (i, (sim, path)) in scores.iter().take(args.top).enumerate() { println!("[{:>2}] sim={:.3} {}", i + 1, sim, path); } println!(); println!("── summary ─────────────────────────────────────"); println!("query : {:?}", args.query); println!("scanned rows : {scanned}"); println!("encoded photos : {encoded}"); println!("permanent failures : {perm_fail}"); println!("transient failures : {transient_fail}"); println!("elapsed : {:.1}s", elapsed.as_secs_f32()); if encoded > 0 { println!( "throughput : {:.2} photos/s ({:.0}ms/photo avg)", encoded as f32 / elapsed.as_secs_f32().max(0.001), elapsed.as_millis() as f32 / encoded as f32 ); } Ok(()) }