//! CLIP-encoding pass for the file watcher. //! //! `process_clip_backlog` in `backfill.rs` calls [`run_clip_encoding_pass`] //! with the page of candidates returned by //! `ExifDao::list_clip_unencoded_candidates`. We walk those, fan out K //! parallel encode calls to Apollo, and persist the resulting embeddings //! into `image_exif.clip_embedding` / `clip_model_version`. //! //! Unlike the face pipeline, CLIP has no marker rows — a permanent //! failure (un-decodable bytes) leaves the row's `clip_embedding` NULL //! and the drain will retry on the next tick. For personal-library //! scale this is fine; the per-tick cap bounds the wasted work, and //! `file_types::is_image_file` filters out videos / non-media client- //! side so most permanent failures are decoded-but-corrupt files (rare). //! //! The watcher thread isn't in any pre-existing async context, so we //! build a short-lived tokio runtime per pass and `block_on` the join //! of K encode futures. Concurrency knob: `CLIP_ENCODE_CONCURRENCY` //! (default 4 — lower than faces because Apollo's CLIP path doesn't //! release the GIL between preprocess and forward as cleanly). use crate::ai::clip_client::{ClipClient, ClipError, EncodeImageMeta}; use crate::database::ExifDao; use crate::exif; use crate::file_types; use crate::libraries::Library; use crate::memories::PathExcluder; use log::{debug, info, warn}; use std::path::Path; use std::sync::{Arc, Mutex}; use tokio::sync::Semaphore; /// One file the watcher would like to CLIP-encode. Built from the DAO /// `list_clip_unencoded_candidates` result — needs the `content_hash` /// for traceability in Apollo's log lines, even though the embedding /// itself is keyed on `(library_id, rel_path)` for the back-write. #[derive(Debug, Clone)] pub struct ClipCandidate { pub rel_path: String, pub content_hash: String, } /// Synchronous entry point. Returns once every candidate has been /// processed (or definitively skipped). No-op when the client is /// disabled so the caller can call unconditionally. pub fn run_clip_encoding_pass( library: &Library, excluded_dirs: &[String], clip_client: &ClipClient, exif_dao: Arc>>, candidates: Vec, ) { if !clip_client.is_enabled() { return; } if candidates.is_empty() { return; } let base = Path::new(&library.root_path); let filtered = filter_excluded(base, excluded_dirs, candidates, Some(&library.name)); if filtered.is_empty() { return; } let concurrency: usize = std::env::var("CLIP_ENCODE_CONCURRENCY") .ok() .and_then(|s| s.parse().ok()) .filter(|n: &usize| *n > 0) .unwrap_or(4); info!( "clip_watch: encoding {} candidate(s) for library '{}' (concurrency {})", filtered.len(), library.name, concurrency ); let rt = match tokio::runtime::Builder::new_multi_thread() .worker_threads(2) .enable_all() .build() { Ok(rt) => rt, Err(e) => { warn!("clip_watch: failed to build tokio runtime: {e}"); return; } }; let library_id = library.id; let library_root = library.root_path.clone(); rt.block_on(async move { let sem = Arc::new(Semaphore::new(concurrency)); let mut handles = Vec::with_capacity(filtered.len()); for cand in filtered { let permit_sem = sem.clone(); let clip_client = clip_client.clone(); let exif_dao = exif_dao.clone(); let library_root = library_root.clone(); handles.push(tokio::spawn(async move { let _permit = permit_sem.acquire().await.expect("clip semaphore"); process_one(library_id, &library_root, cand, &clip_client, exif_dao).await; })); } for h in handles { let _ = h.await; } }); } async fn process_one( library_id: i32, library_root: &str, cand: ClipCandidate, clip_client: &ClipClient, exif_dao: Arc>>, ) { let abs = Path::new(library_root).join(&cand.rel_path); let bytes = match read_image_bytes_for_encode(&abs) { Ok(b) => b, Err(e) => { // Same rationale as face_watch: don't mark — the file may // have been moved/renamed mid-scan; let the next pass retry. warn!( "clip_watch: read failed for {} (lib {}): {}", cand.rel_path, library_id, e ); return; } }; let meta = EncodeImageMeta { content_hash: cand.content_hash.clone(), library_id, rel_path: cand.rel_path.clone(), }; let ctx = opentelemetry::Context::current(); match clip_client.encode_image(bytes, meta).await { Ok(resp) => { let emb_bytes = match resp.decode_embedding() { Ok(b) => b, Err(e) => { warn!("clip_watch: bad embedding for {}: {:?}", cand.rel_path, e); return; } }; let mut dao = exif_dao.lock().expect("exif dao"); if let Err(e) = dao.backfill_clip_embedding( &ctx, library_id, &cand.rel_path, &emb_bytes, &resp.model_version, ) { warn!( "clip_watch: backfill_clip_embedding failed for {}: {:?}", cand.rel_path, e ); return; } debug!( "clip_watch: {} → dim={} ({}ms, {})", cand.rel_path, resp.embedding_dim, resp.duration_ms, resp.model_version ); } Err(ClipError::Permanent(e)) => { // No marker — the row sits with NULL embedding and the drain // retries next pass. For personal-library scale the cost of // re-attempting permanently-broken files is bounded by the // per-tick cap. If this becomes a recurring noise source, // add a `clip_status` column with `failed` semantics like // face_detections has. warn!( "clip_watch: permanent failure on {} (will retry next pass): {}", cand.rel_path, e ); } Err(ClipError::Transient(e)) => { debug!( "clip_watch: transient on {}: {} (will retry next pass)", cand.rel_path, e ); } Err(ClipError::Disabled) => { // Defensive — the entry-point already checked is_enabled(). } } } /// Drop candidates whose paths land in an excluded dir or whose /// extension isn't an image. Mirrors `face_watch::filter_excluded` so /// the two backlogs stay shape-consistent. Library name is passed /// purely for the log line that surfaces an exclusion hit. pub fn filter_excluded( base: &Path, excluded_dirs: &[String], candidates: Vec, library_name: Option<&str>, ) -> Vec { let excluder = if excluded_dirs.is_empty() { None } else { Some(PathExcluder::new(base, excluded_dirs)) }; candidates .into_iter() .filter(|c| { let abs = base.join(&c.rel_path); if !file_types::is_image_file(&abs) { debug!( "clip_watch: skipping non-image '{}' (lib {})", c.rel_path, library_name.unwrap_or("") ); return false; } if let Some(ex) = excluder.as_ref() && ex.is_excluded(&abs) { debug!( "clip_watch: skipping excluded '{}' (lib {})", c.rel_path, library_name.unwrap_or("") ); return false; } true }) .collect() } /// Read image bytes for CLIP encoding. Same logic as /// `face_watch::read_image_bytes_for_detect` — RAW / HEIC files don't /// decode in Apollo's PIL pipeline, so we pull the embedded JPEG /// preview the thumbnail pipeline already extracts. Plain JPEG / PNG / /// WebP go through a direct read. pub fn read_image_bytes_for_encode(path: &Path) -> std::io::Result> { if file_types::needs_ffmpeg_thumbnail(path) && let Some(preview) = exif::extract_embedded_jpeg_preview(path) { return Ok(preview); } std::fs::read(path) }