clip-search: backlog drain + /photos/search endpoint
Wires the persistence layer for CLIP semantic search. The watcher's per-tick drain encodes any image_exif row with a known content_hash but no clip_embedding via Apollo (cap CLIP_BACKLOG_MAX_PER_TICK, default 32). On a query, /photos/search encodes the text via Apollo and reranks every stored embedding in-memory.

ExifDao additions:
- list_clip_unencoded_candidates — partial-index scan for drain
- backfill_clip_embedding — touches only the two new columns
- list_clip_index — dedup'd (hash, embedding) pull for search

clip_watch::run_clip_encoding_pass is the parallel fan-out — tokio runtime per pass with CLIP_ENCODE_CONCURRENCY (default 4). No marker rows for permanent failures yet; per-tick cap bounds the retry cost.

/photos/search params: q, limit, threshold (default 0.20), library, model_version. Response is intentionally minimal (path + score) so the frontend joins against existing photo-metadata routes lazily.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
249
src/clip_watch.rs
Normal file
249
src/clip_watch.rs
Normal file
@@ -0,0 +1,249 @@
|
||||
//! CLIP-encoding pass for the file watcher.
|
||||
//!
|
||||
//! `process_clip_backlog` in `backfill.rs` calls [`run_clip_encoding_pass`]
|
||||
//! with the page of candidates returned by
|
||||
//! `ExifDao::list_clip_unencoded_candidates`. We walk those, fan out K
|
||||
//! parallel encode calls to Apollo, and persist the resulting embeddings
|
||||
//! into `image_exif.clip_embedding` / `clip_model_version`.
|
||||
//!
|
||||
//! Unlike the face pipeline, CLIP has no marker rows — a permanent
|
||||
//! failure (un-decodable bytes) leaves the row's `clip_embedding` NULL
|
||||
//! and the drain will retry on the next tick. For personal-library
|
||||
//! scale this is fine; the per-tick cap bounds the wasted work, and
|
||||
//! `file_types::is_image_file` filters out videos / non-media client-
|
||||
//! side so most permanent failures are decoded-but-corrupt files (rare).
|
||||
//!
|
||||
//! The watcher thread isn't in any pre-existing async context, so we
|
||||
//! build a short-lived tokio runtime per pass and `block_on` the join
|
||||
//! of K encode futures. Concurrency knob: `CLIP_ENCODE_CONCURRENCY`
|
||||
//! (default 4 — lower than faces because Apollo's CLIP path doesn't
|
||||
//! release the GIL between preprocess and forward as cleanly).
|
||||
|
||||
use crate::ai::clip_client::{ClipClient, ClipError, EncodeImageMeta};
|
||||
use crate::database::ExifDao;
|
||||
use crate::exif;
|
||||
use crate::file_types;
|
||||
use crate::libraries::Library;
|
||||
use crate::memories::PathExcluder;
|
||||
use log::{debug, info, warn};
|
||||
use std::path::Path;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use tokio::sync::Semaphore;
|
||||
|
||||
/// One file the watcher would like to CLIP-encode.
///
/// Built from the DAO `list_clip_unencoded_candidates` result. The
/// `content_hash` is carried along for traceability in Apollo's log
/// lines, even though the embedding itself is keyed on
/// `(library_id, rel_path)` for the back-write.
///
/// Candidates compare by value (`PartialEq` / `Eq`) so filtered lists can
/// be checked directly in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClipCandidate {
    /// Path of the file relative to the library root; joined onto the
    /// library's `root_path` to locate the bytes on disk.
    pub rel_path: String,
    /// Content hash of the file, forwarded to Apollo in the encode metadata.
    pub content_hash: String,
}
|
||||
|
||||
/// Synchronous entry point. Returns once every candidate has been
|
||||
/// processed (or definitively skipped). No-op when the client is
|
||||
/// disabled so the caller can call unconditionally.
|
||||
pub fn run_clip_encoding_pass(
|
||||
library: &Library,
|
||||
excluded_dirs: &[String],
|
||||
clip_client: &ClipClient,
|
||||
exif_dao: Arc<Mutex<Box<dyn ExifDao>>>,
|
||||
candidates: Vec<ClipCandidate>,
|
||||
) {
|
||||
if !clip_client.is_enabled() {
|
||||
return;
|
||||
}
|
||||
if candidates.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
let base = Path::new(&library.root_path);
|
||||
let filtered = filter_excluded(base, excluded_dirs, candidates, Some(&library.name));
|
||||
if filtered.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
let concurrency: usize = std::env::var("CLIP_ENCODE_CONCURRENCY")
|
||||
.ok()
|
||||
.and_then(|s| s.parse().ok())
|
||||
.filter(|n: &usize| *n > 0)
|
||||
.unwrap_or(4);
|
||||
|
||||
info!(
|
||||
"clip_watch: encoding {} candidate(s) for library '{}' (concurrency {})",
|
||||
filtered.len(),
|
||||
library.name,
|
||||
concurrency
|
||||
);
|
||||
|
||||
let rt = match tokio::runtime::Builder::new_multi_thread()
|
||||
.worker_threads(2)
|
||||
.enable_all()
|
||||
.build()
|
||||
{
|
||||
Ok(rt) => rt,
|
||||
Err(e) => {
|
||||
warn!("clip_watch: failed to build tokio runtime: {e}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let library_id = library.id;
|
||||
let library_root = library.root_path.clone();
|
||||
rt.block_on(async move {
|
||||
let sem = Arc::new(Semaphore::new(concurrency));
|
||||
let mut handles = Vec::with_capacity(filtered.len());
|
||||
for cand in filtered {
|
||||
let permit_sem = sem.clone();
|
||||
let clip_client = clip_client.clone();
|
||||
let exif_dao = exif_dao.clone();
|
||||
let library_root = library_root.clone();
|
||||
handles.push(tokio::spawn(async move {
|
||||
let _permit = permit_sem.acquire().await.expect("clip semaphore");
|
||||
process_one(library_id, &library_root, cand, &clip_client, exif_dao).await;
|
||||
}));
|
||||
}
|
||||
for h in handles {
|
||||
let _ = h.await;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async fn process_one(
|
||||
library_id: i32,
|
||||
library_root: &str,
|
||||
cand: ClipCandidate,
|
||||
clip_client: &ClipClient,
|
||||
exif_dao: Arc<Mutex<Box<dyn ExifDao>>>,
|
||||
) {
|
||||
let abs = Path::new(library_root).join(&cand.rel_path);
|
||||
let bytes = match read_image_bytes_for_encode(&abs) {
|
||||
Ok(b) => b,
|
||||
Err(e) => {
|
||||
// Same rationale as face_watch: don't mark — the file may
|
||||
// have been moved/renamed mid-scan; let the next pass retry.
|
||||
warn!(
|
||||
"clip_watch: read failed for {} (lib {}): {}",
|
||||
cand.rel_path, library_id, e
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let meta = EncodeImageMeta {
|
||||
content_hash: cand.content_hash.clone(),
|
||||
library_id,
|
||||
rel_path: cand.rel_path.clone(),
|
||||
};
|
||||
let ctx = opentelemetry::Context::current();
|
||||
|
||||
match clip_client.encode_image(bytes, meta).await {
|
||||
Ok(resp) => {
|
||||
let emb_bytes = match resp.decode_embedding() {
|
||||
Ok(b) => b,
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"clip_watch: bad embedding for {}: {:?}",
|
||||
cand.rel_path, e
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
let mut dao = exif_dao.lock().expect("exif dao");
|
||||
if let Err(e) = dao.backfill_clip_embedding(
|
||||
&ctx,
|
||||
library_id,
|
||||
&cand.rel_path,
|
||||
&emb_bytes,
|
||||
&resp.model_version,
|
||||
) {
|
||||
warn!(
|
||||
"clip_watch: backfill_clip_embedding failed for {}: {:?}",
|
||||
cand.rel_path, e
|
||||
);
|
||||
return;
|
||||
}
|
||||
debug!(
|
||||
"clip_watch: {} → dim={} ({}ms, {})",
|
||||
cand.rel_path, resp.embedding_dim, resp.duration_ms, resp.model_version
|
||||
);
|
||||
}
|
||||
Err(ClipError::Permanent(e)) => {
|
||||
// No marker — the row sits with NULL embedding and the drain
|
||||
// retries next pass. For personal-library scale the cost of
|
||||
// re-attempting permanently-broken files is bounded by the
|
||||
// per-tick cap. If this becomes a recurring noise source,
|
||||
// add a `clip_status` column with `failed` semantics like
|
||||
// face_detections has.
|
||||
warn!(
|
||||
"clip_watch: permanent failure on {} (will retry next pass): {}",
|
||||
cand.rel_path, e
|
||||
);
|
||||
}
|
||||
Err(ClipError::Transient(e)) => {
|
||||
debug!(
|
||||
"clip_watch: transient on {}: {} (will retry next pass)",
|
||||
cand.rel_path, e
|
||||
);
|
||||
}
|
||||
Err(ClipError::Disabled) => {
|
||||
// Defensive — the entry-point already checked is_enabled().
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Drop candidates whose paths land in an excluded dir or whose
|
||||
/// extension isn't an image. Mirrors `face_watch::filter_excluded` so
|
||||
/// the two backlogs stay shape-consistent. Library name is passed
|
||||
/// purely for the log line that surfaces an exclusion hit.
|
||||
pub fn filter_excluded(
|
||||
base: &Path,
|
||||
excluded_dirs: &[String],
|
||||
candidates: Vec<ClipCandidate>,
|
||||
library_name: Option<&str>,
|
||||
) -> Vec<ClipCandidate> {
|
||||
let excluder = if excluded_dirs.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(PathExcluder::new(base, excluded_dirs))
|
||||
};
|
||||
candidates
|
||||
.into_iter()
|
||||
.filter(|c| {
|
||||
let abs = base.join(&c.rel_path);
|
||||
if !file_types::is_image_file(&abs) {
|
||||
debug!(
|
||||
"clip_watch: skipping non-image '{}' (lib {})",
|
||||
c.rel_path,
|
||||
library_name.unwrap_or("<unknown>")
|
||||
);
|
||||
return false;
|
||||
}
|
||||
if let Some(ex) = excluder.as_ref()
|
||||
&& ex.is_excluded(&abs)
|
||||
{
|
||||
debug!(
|
||||
"clip_watch: skipping excluded '{}' (lib {})",
|
||||
c.rel_path,
|
||||
library_name.unwrap_or("<unknown>")
|
||||
);
|
||||
return false;
|
||||
}
|
||||
true
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Read image bytes for CLIP encoding. Same logic as
|
||||
/// `face_watch::read_image_bytes_for_detect` — RAW / HEIC files don't
|
||||
/// decode in Apollo's PIL pipeline, so we pull the embedded JPEG
|
||||
/// preview the thumbnail pipeline already extracts. Plain JPEG / PNG /
|
||||
/// WebP go through a direct read.
|
||||
pub fn read_image_bytes_for_encode(path: &Path) -> std::io::Result<Vec<u8>> {
|
||||
if file_types::needs_ffmpeg_thumbnail(path)
|
||||
&& let Some(preview) = exif::extract_embedded_jpeg_preview(path)
|
||||
{
|
||||
return Ok(preview);
|
||||
}
|
||||
std::fs::read(path)
|
||||
}
|
||||
Reference in New Issue
Block a user