feature/clip-semantic-search #96
23
.env.example
23
.env.example
@@ -54,10 +54,12 @@ AGENTIC_CHAT_MAX_ITERATIONS=6
|
||||
# OPENROUTER_APP_TITLE=ImageApi
|
||||
|
||||
# ── AI Insights — sibling services (optional) ───────────────────────────
|
||||
# Apollo (places + face inference). Single Apollo deploys typically set
|
||||
# only APOLLO_API_BASE_URL and let the face client fall back to it.
|
||||
# Apollo (places, face inference, CLIP encoders). Single-Apollo deploys
|
||||
# typically set only APOLLO_API_BASE_URL and let the face + CLIP
|
||||
# clients fall back to it.
|
||||
# APOLLO_API_BASE_URL=http://apollo.lan:8000
|
||||
# APOLLO_FACE_API_BASE_URL=http://apollo.lan:8000
|
||||
# APOLLO_CLIP_API_BASE_URL=http://apollo.lan:8000
|
||||
# SMS_API_URL=http://localhost:8000
|
||||
# SMS_API_TOKEN=
|
||||
|
||||
@@ -80,6 +82,23 @@ FACE_DETECT_TIMEOUT_SEC=60
|
||||
FACE_BACKLOG_MAX_PER_TICK=64
|
||||
FACE_HASH_BACKFILL_MAX_PER_TICK=2000
|
||||
|
||||
# ── CLIP semantic photo search ──────────────────────────────────────────
|
||||
# ImageApi calls Apollo's /api/internal/clip/{encode_image,encode_text}
|
||||
# to populate per-photo embeddings during the watcher's backlog drain
|
||||
# and to encode user queries at /photos/search time. Disabled when
|
||||
# neither APOLLO_CLIP_API_BASE_URL nor APOLLO_API_BASE_URL is set.
|
||||
#
|
||||
# Per-watcher-tick cap on the encode drain. Default 32 ≈ ~1 photo/sec
|
||||
# on CPU, ~30 photos/sec on a single-GPU host (Apollo's threadpool
|
||||
# is 1 on CUDA, so concurrency is bounded server-side regardless of
|
||||
# our setting). Bump on a fresh deploy to clear the backlog faster.
|
||||
CLIP_BACKLOG_MAX_PER_TICK=32
|
||||
# Client-side parallel encode calls per drain pass. Apollo's GPU pool
|
||||
# serializes server-side; this just overlaps file-IO with inference.
|
||||
CLIP_ENCODE_CONCURRENCY=4
|
||||
# Per-encode HTTP timeout. CPU-only Apollo deploys may need higher.
|
||||
CLIP_REQUEST_TIMEOUT_SEC=60
|
||||
|
||||
# ── RAG / search ────────────────────────────────────────────────────────
|
||||
# Set to `1` to enable cross-encoder reranking on /search results.
|
||||
SEARCH_RAG_RERANK=0
|
||||
|
||||
3
migrations/2026-05-14-000000_add_clip_embedding/down.sql
Normal file
3
migrations/2026-05-14-000000_add_clip_embedding/down.sql
Normal file
@@ -0,0 +1,3 @@
|
||||
DROP INDEX IF EXISTS idx_image_exif_clip_backfill;
|
||||
ALTER TABLE image_exif DROP COLUMN clip_model_version;
|
||||
ALTER TABLE image_exif DROP COLUMN clip_embedding;
|
||||
27
migrations/2026-05-14-000000_add_clip_embedding/up.sql
Normal file
27
migrations/2026-05-14-000000_add_clip_embedding/up.sql
Normal file
@@ -0,0 +1,27 @@
|
||||
-- CLIP semantic photo search: store a per-photo image embedding so
|
||||
-- text queries can rerank against the live library via cosine
|
||||
-- similarity. Apollo encodes the bytes via its CLIP service; ImageApi
|
||||
-- writes the resulting blob here.
|
||||
--
|
||||
-- `clip_embedding` is the raw little-endian float32 buffer of an
|
||||
-- L2-normalized vector (dim depends on the model — 768 floats × 4 bytes
-- for ViT-L/14, 512 floats × 4 bytes for ViT-B/32). Apollo always returns the
|
||||
-- normalized form so the search-time dot product reduces to a plain
|
||||
-- cosine similarity.
|
||||
--
|
||||
-- `clip_model_version` echoes the upstream `APOLLO_CLIP_MODEL` (e.g.
|
||||
-- "ViT-L/14"). A model swap shouldn't silently mix geometries — the
|
||||
-- backfill drain will re-eligibilize rows whose stored model_version
|
||||
-- differs from the live engine's, and the search route refuses to
|
||||
-- mix rows from two model_versions in the same response.
|
||||
ALTER TABLE image_exif ADD COLUMN clip_embedding BLOB;
|
||||
ALTER TABLE image_exif ADD COLUMN clip_model_version TEXT;
|
||||
|
||||
-- Partial index for the backfill drain. Mirrors the shape of
|
||||
-- `idx_image_exif_date_backfill`: candidate rows are those with a
|
||||
-- known content_hash (so we don't race the unhashed backlog) but no
|
||||
-- embedding yet. SELECT cost stays O(missing rows) instead of full
|
||||
-- table scan once the column is mostly populated.
|
||||
CREATE INDEX IF NOT EXISTS idx_image_exif_clip_backfill
|
||||
ON image_exif (id)
|
||||
WHERE clip_embedding IS NULL AND content_hash IS NOT NULL;
|
||||
392
src/ai/clip_client.rs
Normal file
392
src/ai/clip_client.rs
Normal file
@@ -0,0 +1,392 @@
|
||||
//! Thin async HTTP client for Apollo's `/api/internal/clip/*` endpoints.
|
||||
//!
|
||||
//! Apollo hosts the OpenAI CLIP inference service (ViT-L/14 by default,
|
||||
//! configurable via `APOLLO_CLIP_MODEL`). This client is the ImageApi side
|
||||
//! of the contract: shove image bytes through `/encode_image` to populate
|
||||
//! `image_exif.clip_embedding` during backfill, and call `/encode_text` to
|
||||
//! encode a user's natural-language query at search time. The actual
|
||||
//! cosine-similarity rerank runs locally in ImageApi.
|
||||
//!
|
||||
//! Mirrors `face_client.rs` / `tag_client.rs` shape: optional base URL
|
||||
//! (None = disabled — feature off, drain and search no-op), reqwest
|
||||
//! client with a generous timeout because GPU inference under a backlog
|
||||
//! can queue server-side (Apollo's threadpool is bounded to 1 worker on
|
||||
//! CUDA).
|
||||
//!
|
||||
//! Configured via `APOLLO_CLIP_API_BASE_URL`, falling back to
|
||||
//! `APOLLO_API_BASE_URL` when the dedicated var is unset (single-Apollo
|
||||
//! deploys are the common case).
|
||||
//!
|
||||
//! Wire format:
|
||||
//! - `/encode_image`: multipart/form-data with `file=<bytes>` and
|
||||
//! `meta=<json>` (content_hash / library_id / rel_path for logging).
|
||||
//! - `/encode_text`: JSON `{"text": "<query>"}`.
|
||||
//!
|
||||
//! Both return `{model_version, embedding_dim, duration_ms, embedding}`
|
||||
//! where `embedding` is base64 of `dim×4` little-endian float32 bytes,
|
||||
//! L2-normalized so the rerank reduces to a plain dot product.
|
||||
//!
|
||||
//! Error mapping (reflected in [`ClipError`]):
|
||||
//! - 422 `decode_failed` / `empty_text` → permanent: ImageApi marks the
|
||||
//! row failed or surfaces the empty-query error to the search caller.
|
||||
//! - 503 `cuda_oom` / `engine_unavailable` → defer-and-retry: no marker.
|
||||
//! - Any other 5xx / network error → defer.
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use base64::Engine;
|
||||
use reqwest::Client;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::time::Duration;
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct EncodeImageMeta {
|
||||
pub content_hash: String,
|
||||
pub library_id: i32,
|
||||
pub rel_path: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
#[allow(dead_code)] // duration_ms logged by the backfill drain
|
||||
pub struct EncodeResponse {
|
||||
pub model_version: String,
|
||||
pub embedding_dim: i32,
|
||||
pub duration_ms: i64,
|
||||
/// base64 of `embedding_dim * 4` bytes (LE float32). ImageApi stores
|
||||
/// the decoded bytes verbatim as a BLOB.
|
||||
pub embedding: String,
|
||||
}
|
||||
|
||||
impl EncodeResponse {
|
||||
/// Decode the wire-format embedding back into raw bytes for storage.
|
||||
/// Validates the buffer is `embedding_dim * 4` bytes long so a
|
||||
/// malformed response surfaces here rather than as a downstream
|
||||
/// silent length mismatch.
|
||||
pub fn decode_embedding(&self) -> Result<Vec<u8>> {
|
||||
let bytes = base64::engine::general_purpose::STANDARD
|
||||
.decode(self.embedding.as_bytes())
|
||||
.context("clip embedding base64 decode")?;
|
||||
let expected = (self.embedding_dim as usize) * 4;
|
||||
if bytes.len() != expected {
|
||||
anyhow::bail!(
|
||||
"clip embedding wrong size: got {} bytes, expected {} ({} * 4)",
|
||||
bytes.len(),
|
||||
expected,
|
||||
self.embedding_dim
|
||||
);
|
||||
}
|
||||
Ok(bytes)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
#[allow(dead_code)] // load_error consumed by future health probe
|
||||
pub struct ClipHealth {
|
||||
pub loaded: bool,
|
||||
pub device: String,
|
||||
pub model_version: String,
|
||||
pub embedding_dim: i32,
|
||||
#[serde(default)]
|
||||
pub load_error: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum ClipError {
|
||||
/// Apollo refused for a reason that won't change on retry (decode
|
||||
/// failure on /encode_image, empty text on /encode_text).
|
||||
Permanent(anyhow::Error),
|
||||
/// Apollo couldn't process this turn but might next time (CUDA OOM,
|
||||
/// engine not loaded, network hiccup).
|
||||
Transient(anyhow::Error),
|
||||
/// Feature is disabled (no `APOLLO_CLIP_API_BASE_URL` /
|
||||
/// `APOLLO_API_BASE_URL`).
|
||||
Disabled,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for ClipError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
ClipError::Permanent(e) => write!(f, "permanent: {e}"),
|
||||
ClipError::Transient(e) => write!(f, "transient: {e}"),
|
||||
ClipError::Disabled => write!(f, "clip client disabled"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for ClipError {}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ClipClient {
|
||||
client: Client,
|
||||
base_url: Option<String>,
|
||||
}
|
||||
|
||||
impl ClipClient {
|
||||
pub fn new(base_url: Option<String>) -> Self {
|
||||
let timeout_secs = std::env::var("CLIP_REQUEST_TIMEOUT_SEC")
|
||||
.ok()
|
||||
.and_then(|s| s.parse::<u64>().ok())
|
||||
.unwrap_or(60);
|
||||
let client = Client::builder()
|
||||
.timeout(Duration::from_secs(timeout_secs))
|
||||
.build()
|
||||
.expect("reqwest client build");
|
||||
Self {
|
||||
client,
|
||||
base_url: base_url.map(|u| u.trim_end_matches('/').to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Read both standard env vars. `APOLLO_CLIP_API_BASE_URL` wins;
|
||||
/// fallback to `APOLLO_API_BASE_URL`. Both unset → disabled.
|
||||
pub fn from_env() -> Self {
|
||||
let base = std::env::var("APOLLO_CLIP_API_BASE_URL")
|
||||
.ok()
|
||||
.filter(|s| !s.trim().is_empty())
|
||||
.or_else(|| {
|
||||
std::env::var("APOLLO_API_BASE_URL")
|
||||
.ok()
|
||||
.filter(|s| !s.trim().is_empty())
|
||||
});
|
||||
Self::new(base)
|
||||
}
|
||||
|
||||
pub fn is_enabled(&self) -> bool {
|
||||
self.base_url.is_some()
|
||||
}
|
||||
|
||||
/// Encode an image to a 768-d (ViT-L/14) or 512-d (ViT-B/32)
|
||||
/// L2-normalized embedding. Used by the backfill drain.
|
||||
pub async fn encode_image(
|
||||
&self,
|
||||
bytes: Vec<u8>,
|
||||
meta: EncodeImageMeta,
|
||||
) -> std::result::Result<EncodeResponse, ClipError> {
|
||||
let Some(base) = self.base_url.as_deref() else {
|
||||
return Err(ClipError::Disabled);
|
||||
};
|
||||
let url = format!("{}/api/internal/clip/encode_image", base);
|
||||
let meta_json = serde_json::to_string(&meta)
|
||||
.map_err(|e| ClipError::Permanent(anyhow::anyhow!("meta serialize: {e}")))?;
|
||||
let form = reqwest::multipart::Form::new()
|
||||
.text("meta", meta_json)
|
||||
.part(
|
||||
"file",
|
||||
reqwest::multipart::Part::bytes(bytes)
|
||||
.file_name(meta.rel_path.clone())
|
||||
.mime_str("application/octet-stream")
|
||||
.unwrap_or_else(|_| reqwest::multipart::Part::bytes(Vec::new())),
|
||||
);
|
||||
self.send_multipart(&url, form).await
|
||||
}
|
||||
|
||||
/// Encode a natural-language query to an embedding. Used by the
|
||||
/// search route to rank stored image embeddings by cosine sim.
|
||||
pub async fn encode_text(&self, text: &str) -> std::result::Result<EncodeResponse, ClipError> {
|
||||
let Some(base) = self.base_url.as_deref() else {
|
||||
return Err(ClipError::Disabled);
|
||||
};
|
||||
let url = format!("{}/api/internal/clip/encode_text", base);
|
||||
let body = serde_json::json!({ "text": text });
|
||||
|
||||
let resp = match self.client.post(&url).json(&body).send().await {
|
||||
Ok(r) => r,
|
||||
Err(e) if e.is_timeout() || e.is_connect() => {
|
||||
return Err(ClipError::Transient(anyhow::anyhow!(
|
||||
"clip client network: {e}"
|
||||
)));
|
||||
}
|
||||
Err(e) => {
|
||||
return Err(ClipError::Transient(anyhow::anyhow!(
|
||||
"clip client request: {e}"
|
||||
)));
|
||||
}
|
||||
};
|
||||
let status = resp.status();
|
||||
if status.is_success() {
|
||||
let body: EncodeResponse = resp
|
||||
.json()
|
||||
.await
|
||||
.map_err(|e| ClipError::Transient(anyhow::anyhow!("clip response decode: {e}")))?;
|
||||
return Ok(body);
|
||||
}
|
||||
let body_text = resp.text().await.unwrap_or_default();
|
||||
Err(classify_error_response(status.as_u16(), &body_text))
|
||||
}
|
||||
|
||||
/// Engine reachability + device/model report. Used as a startup
|
||||
/// sanity check from the probe binary and (later) the backlog drain.
|
||||
#[allow(dead_code)] // consumed by probe + drain
|
||||
pub async fn health(&self) -> Result<ClipHealth> {
|
||||
let base = self.base_url.as_deref().context("clip client disabled")?;
|
||||
let url = format!("{}/api/internal/clip/health", base);
|
||||
let resp = self.client.get(&url).send().await?.error_for_status()?;
|
||||
let body: ClipHealth = resp.json().await?;
|
||||
Ok(body)
|
||||
}
|
||||
|
||||
async fn send_multipart(
|
||||
&self,
|
||||
url: &str,
|
||||
form: reqwest::multipart::Form,
|
||||
) -> std::result::Result<EncodeResponse, ClipError> {
|
||||
let resp = match self.client.post(url).multipart(form).send().await {
|
||||
Ok(r) => r,
|
||||
Err(e) if e.is_timeout() || e.is_connect() => {
|
||||
return Err(ClipError::Transient(anyhow::anyhow!(
|
||||
"clip client network: {e}"
|
||||
)));
|
||||
}
|
||||
Err(e) => {
|
||||
return Err(ClipError::Transient(anyhow::anyhow!(
|
||||
"clip client request: {e}"
|
||||
)));
|
||||
}
|
||||
};
|
||||
let status = resp.status();
|
||||
if status.is_success() {
|
||||
let body: EncodeResponse = resp
|
||||
.json()
|
||||
.await
|
||||
.map_err(|e| ClipError::Transient(anyhow::anyhow!("clip response decode: {e}")))?;
|
||||
return Ok(body);
|
||||
}
|
||||
let body_text = resp.text().await.unwrap_or_default();
|
||||
Err(classify_error_response(status.as_u16(), &body_text))
|
||||
}
|
||||
}
|
||||
|
||||
/// Pulled out as a pure function so the marker-row contract is unit-
|
||||
/// testable without spinning up an HTTP server. Matches the shape used
|
||||
/// by face_client::classify_error_response so future retry policies
|
||||
/// can share code.
|
||||
fn classify_error_response(status: u16, body_text: &str) -> ClipError {
|
||||
let detail_code = serde_json::from_str::<serde_json::Value>(body_text)
|
||||
.ok()
|
||||
.and_then(|v| {
|
||||
v.get("detail")
|
||||
.and_then(|d| d.as_str().map(str::to_string))
|
||||
.or_else(|| {
|
||||
v.get("detail")
|
||||
.and_then(|d| d.get("code"))
|
||||
.and_then(|c| c.as_str())
|
||||
.map(str::to_string)
|
||||
})
|
||||
})
|
||||
.unwrap_or_default();
|
||||
|
||||
if status == 422 {
|
||||
return ClipError::Permanent(anyhow::anyhow!(
|
||||
"clip {} {}: {}",
|
||||
status,
|
||||
detail_code,
|
||||
body_text
|
||||
));
|
||||
}
|
||||
if status == 503 {
|
||||
return ClipError::Transient(anyhow::anyhow!(
|
||||
"clip {} {}: {}",
|
||||
status,
|
||||
detail_code,
|
||||
body_text
|
||||
));
|
||||
}
|
||||
// 408 / 413 / 429 are operator-fixable infra issues; defer.
|
||||
if matches!(status, 408 | 413 | 429) {
|
||||
return ClipError::Transient(anyhow::anyhow!(
|
||||
"clip {} {}: {}",
|
||||
status,
|
||||
detail_code,
|
||||
body_text
|
||||
));
|
||||
}
|
||||
if (400..500).contains(&status) {
|
||||
ClipError::Permanent(anyhow::anyhow!(
|
||||
"clip {} {}: {}",
|
||||
status,
|
||||
detail_code,
|
||||
body_text
|
||||
))
|
||||
} else {
|
||||
ClipError::Transient(anyhow::anyhow!(
|
||||
"clip {} {}: {}",
|
||||
status,
|
||||
detail_code,
|
||||
body_text
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    /// Classify `(status, body)` and report whether it came back permanent.
    fn permanent(status: u16, body: &str) -> bool {
        matches!(
            classify_error_response(status, body),
            ClipError::Permanent(_)
        )
    }

    /// Classify `(status, body)` and report whether it came back transient.
    fn transient(status: u16, body: &str) -> bool {
        matches!(
            classify_error_response(status, body),
            ClipError::Transient(_)
        )
    }

    /// Build a response whose embedding is the base64 of `payload`.
    fn response_with(dim: i32, payload: &[u8]) -> EncodeResponse {
        use base64::Engine;
        EncodeResponse {
            model_version: "ViT-L/14".into(),
            embedding_dim: dim,
            duration_ms: 0,
            embedding: base64::engine::general_purpose::STANDARD.encode(payload),
        }
    }

    #[test]
    fn classify_422_decode_failed_is_permanent() {
        assert!(permanent(422, r#"{"detail":"decode_failed: bad bytes"}"#));
    }

    #[test]
    fn classify_422_empty_text_is_permanent() {
        assert!(permanent(422, r#"{"detail":"empty_text"}"#));
    }

    #[test]
    fn classify_503_cuda_oom_is_transient() {
        assert!(transient(
            503,
            r#"{"detail":{"code":"cuda_oom","error":"out of memory"}}"#
        ));
    }

    #[test]
    fn classify_5xx_is_transient_other_4xx_is_permanent() {
        assert!(transient(500, ""));
        assert!(permanent(404, "{}"));
    }

    #[test]
    fn classify_infra_4xx_is_transient() {
        assert!(transient(408, ""));
        assert!(transient(413, "<html>"));
        assert!(transient(429, "{}"));
    }

    #[test]
    fn decode_embedding_size_mismatch_errors() {
        // dim=4 says we expect 16 bytes (4 floats × 4 bytes). Encode 8.
        let resp = response_with(4, &[0u8; 8]);
        assert!(resp.decode_embedding().is_err());
    }

    #[test]
    fn decode_embedding_round_trip() {
        let bytes: Vec<u8> = (0..16).collect();
        let resp = response_with(4, &bytes);
        assert_eq!(resp.decode_embedding().unwrap(), bytes);
    }
}
|
||||
@@ -2184,6 +2184,8 @@ mod tests {
|
||||
date_taken_source: None,
|
||||
original_date_taken: None,
|
||||
original_date_taken_source: None,
|
||||
clip_embedding: None,
|
||||
clip_model_version: None,
|
||||
});
|
||||
let out = resolve_date_taken_for_context(&exif, "Screenshot_2014-06-01.png");
|
||||
assert_eq!(out.as_deref(), Some("2021-08-15"));
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
pub mod apollo_client;
|
||||
pub mod clip_client;
|
||||
pub mod daily_summary_job;
|
||||
pub mod face_client;
|
||||
pub mod handlers;
|
||||
|
||||
@@ -220,6 +220,76 @@ pub fn backfill_missing_date_taken(
|
||||
/// unscanned image_exif rows directly via the FaceDao anti-join and
|
||||
/// hands them to the existing detection pass. Runs on every tick (not
|
||||
/// just full scans) so the backlog moves at quick-scan cadence.
|
||||
/// Per-tick CLIP encoding drain. Mirrors `process_face_backlog`: pull
|
||||
/// up to `CLIP_BACKLOG_MAX_PER_TICK` candidates with a known
|
||||
/// `content_hash` but no `clip_embedding`, hand them to
|
||||
/// `clip_watch::run_clip_encoding_pass` for parallel fan-out, and let
|
||||
/// that module write the result back via `backfill_clip_embedding`.
|
||||
///
|
||||
/// Idempotent — a row stays in the candidate set until its embedding
|
||||
/// lands, so a transient failure (Apollo unreachable, CUDA OOM) just
|
||||
/// defers to the next tick. Permanent failures (un-decodable bytes)
|
||||
/// retry every tick at this point; future Branch may add a status
|
||||
/// column like face_detections has.
|
||||
pub fn process_clip_backlog(
|
||||
context: &opentelemetry::Context,
|
||||
library: &libraries::Library,
|
||||
clip_client: &crate::ai::clip_client::ClipClient,
|
||||
exif_dao: &Arc<Mutex<Box<dyn ExifDao>>>,
|
||||
excluded_dirs: &[String],
|
||||
) {
|
||||
if !clip_client.is_enabled() {
|
||||
return;
|
||||
}
|
||||
let cap: i64 = dotenv::var("CLIP_BACKLOG_MAX_PER_TICK")
|
||||
.ok()
|
||||
.and_then(|s| s.parse().ok())
|
||||
.filter(|n: &i64| *n > 0)
|
||||
.unwrap_or(32);
|
||||
|
||||
let rows: Vec<(String, String)> = {
|
||||
let mut dao = exif_dao.lock().expect("exif dao");
|
||||
match dao.list_clip_unencoded_candidates(context, library.id, cap) {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"clip_watch: list_clip_unencoded_candidates failed for library '{}': {:?}",
|
||||
library.name, e
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
};
|
||||
if rows.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
info!(
|
||||
"clip_watch: backlog drain — encoding {} candidate(s) for library '{}' (cap={})",
|
||||
rows.len(),
|
||||
library.name,
|
||||
cap
|
||||
);
|
||||
|
||||
let candidates: Vec<crate::clip_watch::ClipCandidate> = rows
|
||||
.into_iter()
|
||||
.map(
|
||||
|(rel_path, content_hash)| crate::clip_watch::ClipCandidate {
|
||||
rel_path,
|
||||
content_hash,
|
||||
},
|
||||
)
|
||||
.collect();
|
||||
|
||||
crate::clip_watch::run_clip_encoding_pass(
|
||||
library,
|
||||
excluded_dirs,
|
||||
clip_client,
|
||||
Arc::clone(exif_dao),
|
||||
candidates,
|
||||
);
|
||||
}
|
||||
|
||||
pub fn process_face_backlog(
|
||||
context: &opentelemetry::Context,
|
||||
library: &libraries::Library,
|
||||
|
||||
273
src/bin/probe_clip_search.rs
Normal file
273
src/bin/probe_clip_search.rs
Normal file
@@ -0,0 +1,273 @@
|
||||
//! Probe binary for CLIP semantic search.
|
||||
//!
|
||||
//! No DB writes. Walks a library's `image_exif` rows, encodes a sample
|
||||
//! via Apollo's `/encode_image`, encodes the user's --query via
|
||||
//! `/encode_text`, and prints the top-K most similar photos by cosine
|
||||
//! similarity so the operator can eyeball quality before committing to
|
||||
//! the persistence phase (column populated by backlog drain, search
|
||||
//! endpoint, UI).
|
||||
//!
|
||||
//! Usage:
|
||||
//! cargo run --release --bin probe_clip_search -- \
|
||||
//! --library 1 --limit 200 --query "a beach at sunset" --top 10
|
||||
//!
|
||||
//! Env: standard ImageApi `.env`. Requires either
|
||||
//! `APOLLO_CLIP_API_BASE_URL` or `APOLLO_API_BASE_URL` to be set.
|
||||
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Instant;
|
||||
|
||||
use clap::Parser;
|
||||
use log::{info, warn};
|
||||
|
||||
use image_api::ai::clip_client::{ClipClient, ClipError, EncodeImageMeta};
|
||||
use image_api::database::{ExifDao, SqliteExifDao, connect};
|
||||
use image_api::exif;
|
||||
use image_api::file_types;
|
||||
use image_api::libraries::{self, Library};
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[command(name = "probe_clip_search")]
|
||||
#[command(about = "Top-K CLIP semantic search over a sample of image_exif rows")]
|
||||
struct Args {
|
||||
/// Library id to sample from.
|
||||
#[arg(long)]
|
||||
library: i32,
|
||||
|
||||
/// Max files to encode. CPU inference is slow (~1-3 s per photo at
|
||||
/// ViT-L/14); start small and grow once GPU is sorted.
|
||||
#[arg(long, default_value_t = 50)]
|
||||
limit: usize,
|
||||
|
||||
/// Natural-language query. Empty triggers an error from Apollo.
|
||||
#[arg(long)]
|
||||
query: String,
|
||||
|
||||
/// How many top results to print.
|
||||
#[arg(long, default_value_t = 10)]
|
||||
top: usize,
|
||||
|
||||
/// Offset into the library's rel_path listing.
|
||||
#[arg(long, default_value_t = 0)]
|
||||
offset: i64,
|
||||
|
||||
/// How many DB rows to scan before giving up on hitting the limit.
|
||||
#[arg(long, default_value_t = 5000)]
|
||||
max_scan: i64,
|
||||
}
|
||||
|
||||
/// Same as `face_watch::read_image_bytes_for_detect` (which is pub(crate)).
|
||||
/// Inlined for the throwaway probe.
|
||||
fn read_image_bytes(path: &Path) -> std::io::Result<Vec<u8>> {
|
||||
if file_types::needs_ffmpeg_thumbnail(path)
|
||||
&& let Some(preview) = exif::extract_embedded_jpeg_preview(path)
|
||||
{
|
||||
return Ok(preview);
|
||||
}
|
||||
std::fs::read(path)
|
||||
}
|
||||
|
||||
/// Decode a base64'd LE float32 vector to a `Vec<f32>`.
|
||||
fn decode_f32_vec(b64: &str) -> anyhow::Result<Vec<f32>> {
|
||||
use base64::Engine;
|
||||
let bytes = base64::engine::general_purpose::STANDARD.decode(b64.as_bytes())?;
|
||||
if bytes.len() % 4 != 0 {
|
||||
anyhow::bail!("embedding byte length {} not divisible by 4", bytes.len());
|
||||
}
|
||||
let mut out = Vec::with_capacity(bytes.len() / 4);
|
||||
for chunk in bytes.chunks_exact(4) {
|
||||
out.push(f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]));
|
||||
}
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
/// Dot product of two vectors. Apollo L2-normalizes both sides, so for
/// its embeddings this equals cosine similarity.
///
/// Pairs are formed positionally; if the slices differ in length the
/// extra elements of the longer one are ignored.
fn dot(a: &[f32], b: &[f32]) -> f32 {
    a.iter().zip(b).map(|(&lhs, &rhs)| lhs * rhs).sum()
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
env_logger::init();
|
||||
dotenv::dotenv().ok();
|
||||
|
||||
let args = Args::parse();
|
||||
if args.query.trim().is_empty() {
|
||||
anyhow::bail!("--query must not be empty");
|
||||
}
|
||||
|
||||
let client = ClipClient::from_env();
|
||||
if !client.is_enabled() {
|
||||
anyhow::bail!(
|
||||
"ClipClient disabled: set APOLLO_CLIP_API_BASE_URL or APOLLO_API_BASE_URL in .env"
|
||||
);
|
||||
}
|
||||
|
||||
match client.health().await {
|
||||
Ok(h) => info!(
|
||||
"clip engine: loaded={} device={} model={} dim={}",
|
||||
h.loaded, h.device, h.model_version, h.embedding_dim
|
||||
),
|
||||
Err(e) => warn!("health probe failed (continuing): {e}"),
|
||||
}
|
||||
|
||||
let mut seed_conn = connect();
|
||||
if let Some(base) = dotenv::var("BASE_PATH").ok().as_deref() {
|
||||
libraries::seed_or_patch_from_env(&mut seed_conn, base);
|
||||
}
|
||||
let libs = libraries::load_all(&mut seed_conn);
|
||||
drop(seed_conn);
|
||||
let lib: Library = libs
|
||||
.into_iter()
|
||||
.find(|l| l.id == args.library)
|
||||
.ok_or_else(|| anyhow::anyhow!("library id {} not found", args.library))?;
|
||||
info!(
|
||||
"probing library #{} ({}) at {}",
|
||||
lib.id, lib.name, lib.root_path
|
||||
);
|
||||
|
||||
let dao: Arc<Mutex<Box<dyn ExifDao>>> = Arc::new(Mutex::new(Box::new(SqliteExifDao::new())));
|
||||
let ctx = opentelemetry::Context::new();
|
||||
|
||||
// Encode the query up-front so the long image-encode loop doesn't
|
||||
// race a slow query encode. Fails fast on a misspelled query.
|
||||
let query_resp = client
|
||||
.encode_text(&args.query)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("encode_text: {e}"))?;
|
||||
let query_vec = decode_f32_vec(&query_resp.embedding)?;
|
||||
info!(
|
||||
"query encoded ({}d, {}ms): {:?}",
|
||||
query_resp.embedding_dim, query_resp.duration_ms, args.query
|
||||
);
|
||||
|
||||
// Page through (id, rel_path), filter to images on disk, encode up
|
||||
// to `limit`. Each encoded photo gets scored against the query and
|
||||
// kept in a top-K heap.
|
||||
const PAGE: i64 = 500;
|
||||
let mut offset = args.offset;
|
||||
let mut scanned: i64 = 0;
|
||||
let mut encoded = 0usize;
|
||||
let mut perm_fail = 0usize;
|
||||
let mut transient_fail = 0usize;
|
||||
let root = PathBuf::from(&lib.root_path);
|
||||
let started = Instant::now();
|
||||
// (similarity, rel_path) — we keep all scored results and sort at
|
||||
// the end. With limit≤few-hundred this is trivial.
|
||||
let mut scores: Vec<(f32, String)> = Vec::with_capacity(args.limit);
|
||||
|
||||
'outer: loop {
|
||||
if scanned >= args.max_scan {
|
||||
warn!(
|
||||
"scan cap ({}) reached before hitting limit ({}); bump --max-scan to scan deeper",
|
||||
args.max_scan, args.limit
|
||||
);
|
||||
break;
|
||||
}
|
||||
let rows = {
|
||||
let mut guard = dao.lock().expect("dao lock");
|
||||
guard
|
||||
.list_rel_paths_for_library_page(&ctx, lib.id, PAGE, offset)
|
||||
.map_err(|e| anyhow::anyhow!("list rel_paths: {:?}", e))?
|
||||
};
|
||||
if rows.is_empty() {
|
||||
info!("no more rows after offset {}", offset);
|
||||
break;
|
||||
}
|
||||
offset += rows.len() as i64;
|
||||
scanned += rows.len() as i64;
|
||||
|
||||
for (_id, rel_path) in rows {
|
||||
if encoded >= args.limit {
|
||||
break 'outer;
|
||||
}
|
||||
let abs = root.join(&rel_path);
|
||||
if !file_types::is_image_file(&abs) || !abs.exists() {
|
||||
continue;
|
||||
}
|
||||
let bytes = match read_image_bytes(&abs) {
|
||||
Ok(b) => b,
|
||||
Err(e) => {
|
||||
warn!("read {rel_path}: {e}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let meta = EncodeImageMeta {
|
||||
content_hash: String::new(),
|
||||
library_id: lib.id,
|
||||
rel_path: rel_path.clone(),
|
||||
};
|
||||
let call_start = Instant::now();
|
||||
match client.encode_image(bytes, meta).await {
|
||||
Ok(resp) => {
|
||||
encoded += 1;
|
||||
let vec = match decode_f32_vec(&resp.embedding) {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
warn!("decode {rel_path}: {e}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
if vec.len() != query_vec.len() {
|
||||
warn!(
|
||||
"dim mismatch for {rel_path}: image={} query={}",
|
||||
vec.len(),
|
||||
query_vec.len()
|
||||
);
|
||||
continue;
|
||||
}
|
||||
let sim = dot(&vec, &query_vec);
|
||||
scores.push((sim, rel_path.clone()));
|
||||
if encoded % 10 == 0 {
|
||||
info!(
|
||||
"progress: {} encoded, {:.1}s elapsed",
|
||||
encoded,
|
||||
started.elapsed().as_secs_f32()
|
||||
);
|
||||
}
|
||||
let _ = call_start;
|
||||
}
|
||||
Err(ClipError::Permanent(e)) => {
|
||||
perm_fail += 1;
|
||||
warn!("permanent encode failure for {rel_path}: {e}");
|
||||
}
|
||||
Err(ClipError::Transient(e)) => {
|
||||
transient_fail += 1;
|
||||
warn!("transient encode failure for {rel_path}: {e}");
|
||||
}
|
||||
Err(ClipError::Disabled) => {
|
||||
anyhow::bail!("clip client became disabled mid-run; impossible");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
scores.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
|
||||
let elapsed = started.elapsed();
|
||||
println!();
|
||||
println!(
|
||||
"── top {} for query: {:?} ──",
|
||||
args.top.min(scores.len()),
|
||||
args.query
|
||||
);
|
||||
for (i, (sim, path)) in scores.iter().take(args.top).enumerate() {
|
||||
println!("[{:>2}] sim={:.3} {}", i + 1, sim, path);
|
||||
}
|
||||
println!();
|
||||
println!("── summary ─────────────────────────────────────");
|
||||
println!("query : {:?}", args.query);
|
||||
println!("scanned rows : {scanned}");
|
||||
println!("encoded photos : {encoded}");
|
||||
println!("permanent failures : {perm_fail}");
|
||||
println!("transient failures : {transient_fail}");
|
||||
println!("elapsed : {:.1}s", elapsed.as_secs_f32());
|
||||
if encoded > 0 {
|
||||
println!(
|
||||
"throughput : {:.2} photos/s ({:.0}ms/photo avg)",
|
||||
encoded as f32 / elapsed.as_secs_f32().max(0.001),
|
||||
elapsed.as_millis() as f32 / encoded as f32
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
321
src/clip_search.rs
Normal file
321
src/clip_search.rs
Normal file
@@ -0,0 +1,321 @@
|
||||
//! `/photos/search?q=<text>` — CLIP semantic photo search.
|
||||
//!
|
||||
//! The route lives outside `files.rs` to keep that 1500+ line module
|
||||
//! focused on EXIF / tag listing. The flow is:
|
||||
//!
|
||||
//! 1. Parse query params (`q`, `limit`, `threshold`, optional `library`).
|
||||
//! 2. Call Apollo's `/api/internal/clip/encode_text` to get the query
|
||||
//! vector (L2-normalized 768-d f32 for ViT-L/14).
|
||||
//! 3. Load every `(content_hash, clip_embedding)` for the scope from
|
||||
//! `image_exif` via `ExifDao::list_clip_index`. ~28–43 MB for a 14k
|
||||
//! library at ViT-L/14; loaded fresh per request — fast enough for
|
||||
//! v1, optimize via an AppState cache later if needed.
|
||||
//! 4. Dot product (= cosine since both sides are L2-normalized), filter
|
||||
//! above `threshold`, top-K by score.
|
||||
//! 5. Resolve each surviving hash back to a `(library_id, rel_path)` so
|
||||
//! the frontend can render the photo / hand off to the carousel.
|
||||
//!
|
||||
//! Response shape is intentionally minimal — paths + score — so the
|
||||
//! frontend can reuse existing PhotoGrid rendering by joining against
|
||||
//! `/api/photos/match` (or calling `/image/metadata` lazily). Don't
|
||||
//! bake camera/EXIF metadata into this route; it would force a fan-out
|
||||
//! per result and balloon the response.
|
||||
|
||||
use crate::AppState;
|
||||
use crate::ai::clip_client::ClipError;
|
||||
use crate::database::ExifDao;
|
||||
use actix_web::{HttpResponse, Result as ActixResult, web};
|
||||
use base64::Engine;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::sync::Mutex;
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct SearchQuery {
|
||||
/// Natural-language query. Required; empty triggers 400.
|
||||
pub q: String,
|
||||
/// Max results to return in this page. Capped to 200 server-side.
|
||||
/// Defaults to 20. Pair with `offset` for pagination.
|
||||
#[serde(default = "default_limit")]
|
||||
pub limit: usize,
|
||||
/// Zero-based offset into the sorted-and-filtered result set. The
|
||||
/// scoring loop still runs over the full embedding matrix on every
|
||||
/// page (cheap at personal-library scale — sub-100ms — and avoids
|
||||
/// stateful pagination cursors). Defaults to 0.
|
||||
#[serde(default)]
|
||||
pub offset: usize,
|
||||
/// Cosine-similarity floor below which results are dropped.
|
||||
/// 0.20 is the rough "this is plausibly relevant" line for OpenAI
|
||||
/// CLIP; tunable per call when sweeping. Defaults to 0.20.
|
||||
#[serde(default = "default_threshold")]
|
||||
pub threshold: f32,
|
||||
/// Optional single-library scope. When omitted, every enabled
|
||||
/// library is searched. Multi-select isn't supported yet — the
|
||||
/// frontend wires through one or all.
|
||||
pub library: Option<i32>,
|
||||
/// Optional model-version filter. Defaults to the live engine's
|
||||
/// version (queried lazily). Forces a strict join so mid-flight
|
||||
/// model swaps can't mix geometries in a single response.
|
||||
#[serde(default)]
|
||||
pub model_version: Option<String>,
|
||||
}
|
||||
|
||||
/// Serde fallback for the `limit` query parameter: 20 results per page.
fn default_limit() -> usize {
    20_usize
}
|
||||
|
||||
/// Serde fallback for the `threshold` query parameter: a cosine-similarity
/// floor of 0.20.
fn default_threshold() -> f32 {
    0.20_f32
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct SearchHit {
|
||||
pub library_id: i32,
|
||||
pub rel_path: String,
|
||||
pub content_hash: String,
|
||||
/// Cosine similarity in [-1, 1]. In practice OpenAI CLIP returns
|
||||
/// 0.10–0.40 for the typical photo library.
|
||||
pub score: f32,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct SearchResponse {
|
||||
pub query: String,
|
||||
pub model_version: String,
|
||||
pub threshold: f32,
|
||||
/// Total embeddings scored (= every photo in scope with a stored
|
||||
/// embedding). Same value across pages of the same query.
|
||||
pub considered: usize,
|
||||
/// Count of results above threshold, before pagination. Lets the
|
||||
/// client decide whether a "Load more" button is meaningful and
|
||||
/// stop fetching when ``offset + results.len() >= total_matching``.
|
||||
pub total_matching: usize,
|
||||
pub offset: usize,
|
||||
pub results: Vec<SearchHit>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct SearchError {
|
||||
error: String,
|
||||
}
|
||||
|
||||
/// Turns a raw `clip_embedding` BLOB (consecutive little-endian `f32`
/// values) into a vector of floats.
///
/// Yields `None` for an empty buffer or one whose length is not a multiple
/// of four, so callers can skip a malformed row instead of failing the
/// whole request.
fn decode_embedding(bytes: &[u8]) -> Option<Vec<f32>> {
    const WIDTH: usize = std::mem::size_of::<f32>();

    let well_formed = !bytes.is_empty() && bytes.len() % WIDTH == 0;
    if !well_formed {
        return None;
    }

    let floats = bytes
        .chunks_exact(WIDTH)
        .map(|word| f32::from_le_bytes([word[0], word[1], word[2], word[3]]))
        .collect();
    Some(floats)
}
|
||||
|
||||
/// Dot product of two float slices. Pairs are formed with `zip`, so any
/// surplus elements of the longer slice are ignored.
#[inline]
fn dot(a: &[f32], b: &[f32]) -> f32 {
    a.iter().zip(b).map(|(lhs, rhs)| lhs * rhs).sum::<f32>()
}
|
||||
|
||||
pub async fn search_photos(
|
||||
state: web::Data<AppState>,
|
||||
exif_dao: web::Data<Mutex<Box<dyn ExifDao>>>,
|
||||
query: web::Query<SearchQuery>,
|
||||
) -> ActixResult<HttpResponse> {
|
||||
let q_text = query.q.trim().to_string();
|
||||
if q_text.is_empty() {
|
||||
return Ok(HttpResponse::BadRequest().json(SearchError {
|
||||
error: "query parameter `q` is required".into(),
|
||||
}));
|
||||
}
|
||||
if !state.clip_client.is_enabled() {
|
||||
return Ok(HttpResponse::ServiceUnavailable().json(SearchError {
|
||||
error: "CLIP search is disabled (no Apollo CLIP endpoint configured)".into(),
|
||||
}));
|
||||
}
|
||||
|
||||
let limit = query.limit.clamp(1, 200);
|
||||
let offset = query.offset;
|
||||
let threshold = query.threshold.clamp(-1.0, 1.0);
|
||||
|
||||
// 1. Encode the query text. Fast — Apollo's text encoder is ~50ms
|
||||
// on CPU. Bail with a clear error message if Apollo's down so the
|
||||
// user sees "service unavailable" rather than empty results.
|
||||
let query_resp = match state.clip_client.encode_text(&q_text).await {
|
||||
Ok(r) => r,
|
||||
Err(ClipError::Permanent(e)) => {
|
||||
return Ok(HttpResponse::BadRequest().json(SearchError {
|
||||
error: format!("query rejected: {e}"),
|
||||
}));
|
||||
}
|
||||
Err(ClipError::Transient(e)) => {
|
||||
return Ok(HttpResponse::BadGateway().json(SearchError {
|
||||
error: format!("CLIP service unavailable: {e}"),
|
||||
}));
|
||||
}
|
||||
Err(ClipError::Disabled) => {
|
||||
return Ok(HttpResponse::ServiceUnavailable().json(SearchError {
|
||||
error: "CLIP service disabled".into(),
|
||||
}));
|
||||
}
|
||||
};
|
||||
// decode_embedding works on raw bytes; the wire format is b64.
|
||||
let query_bytes = base64::engine::general_purpose::STANDARD
|
||||
.decode(query_resp.embedding.as_bytes())
|
||||
.unwrap_or_default();
|
||||
let query_vec = match decode_embedding(&query_bytes) {
|
||||
Some(v) => v,
|
||||
None => {
|
||||
return Ok(HttpResponse::BadGateway().json(SearchError {
|
||||
error: "CLIP service returned a malformed query embedding".into(),
|
||||
}));
|
||||
}
|
||||
};
|
||||
|
||||
// 2. Decide which library scope to search.
|
||||
let library_ids: Vec<i32> = match query.library {
|
||||
Some(id) => vec![id],
|
||||
None => Vec::new(), // empty = all libraries
|
||||
};
|
||||
|
||||
// 3. Pull the (hash, embedding) matrix. Lock contention here is
|
||||
// bounded — one big SELECT under a mutex Arc<Mutex<dyn ExifDao>>
|
||||
// and then we release before scoring. If this becomes a hotspot
|
||||
// we'll cache the decoded matrix in AppState with TTL.
|
||||
let ctx = opentelemetry::Context::current();
|
||||
let rows: Vec<(String, Vec<u8>)> = {
|
||||
let mut dao = exif_dao.lock().expect("exif dao");
|
||||
match dao.list_clip_index(
|
||||
&ctx,
|
||||
&library_ids,
|
||||
query
|
||||
.model_version
|
||||
.as_deref()
|
||||
.or(Some(&query_resp.model_version)),
|
||||
) {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
log::warn!("clip_search: list_clip_index failed: {:?}", e);
|
||||
return Ok(HttpResponse::InternalServerError().json(SearchError {
|
||||
error: "failed to load search index".into(),
|
||||
}));
|
||||
}
|
||||
}
|
||||
};
|
||||
let considered = rows.len();
|
||||
if considered == 0 {
|
||||
return Ok(HttpResponse::Ok().json(SearchResponse {
|
||||
query: q_text,
|
||||
model_version: query_resp.model_version,
|
||||
threshold,
|
||||
considered,
|
||||
total_matching: 0,
|
||||
offset,
|
||||
results: Vec::new(),
|
||||
}));
|
||||
}
|
||||
|
||||
// 4. Score. Cap the loop's transient allocation; we keep all scores
|
||||
// and sort at the end. With ~14k entries the sort is microseconds.
|
||||
let mut scored: Vec<(f32, String)> = Vec::with_capacity(considered);
|
||||
for (hash, blob) in rows {
|
||||
let Some(emb) = decode_embedding(&blob) else {
|
||||
continue;
|
||||
};
|
||||
if emb.len() != query_vec.len() {
|
||||
continue;
|
||||
}
|
||||
let sim = dot(&emb, &query_vec);
|
||||
if sim < threshold {
|
||||
continue;
|
||||
}
|
||||
scored.push((sim, hash));
|
||||
}
|
||||
scored.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
|
||||
let total_matching = scored.len();
|
||||
// Pagination — slice the sorted list at `[offset, offset+limit)`.
|
||||
// Offsets past the end produce empty pages rather than an error so
|
||||
// the client can stop fetching naturally on "load more" past the end.
|
||||
let scored: Vec<(f32, String)> = if offset >= total_matching {
|
||||
Vec::new()
|
||||
} else {
|
||||
let end = (offset + limit).min(total_matching);
|
||||
scored[offset..end].to_vec()
|
||||
};
|
||||
|
||||
if scored.is_empty() {
|
||||
return Ok(HttpResponse::Ok().json(SearchResponse {
|
||||
query: q_text,
|
||||
model_version: query_resp.model_version,
|
||||
threshold,
|
||||
considered,
|
||||
total_matching,
|
||||
offset,
|
||||
results: Vec::new(),
|
||||
}));
|
||||
}
|
||||
|
||||
// 5. Resolve each surviving hash back to a `(library_id, rel_path)`.
|
||||
// `get_rel_paths_by_hash` returns every rel_path; we pick the first
|
||||
// one for the result. Apollo / the UI can fetch alternatives via
|
||||
// /image/metadata when needed.
|
||||
let hashes: Vec<String> = scored.iter().map(|(_, h)| h.clone()).collect();
|
||||
let path_map = {
|
||||
let mut dao = exif_dao.lock().expect("exif dao");
|
||||
match dao.get_rel_paths_for_hashes(&ctx, &hashes) {
|
||||
Ok(m) => m,
|
||||
Err(e) => {
|
||||
log::warn!("clip_search: get_rel_paths_for_hashes failed: {:?}", e);
|
||||
return Ok(HttpResponse::InternalServerError().json(SearchError {
|
||||
error: "failed to resolve photo paths".into(),
|
||||
}));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// We need (library_id, rel_path) — get_rel_paths_for_hashes only
|
||||
// returns rel_paths. Cross-reference via find_by_content_hash to
|
||||
// pick the library too. Single call per surviving hash; cheap at
|
||||
// top-20.
|
||||
let mut results = Vec::with_capacity(scored.len());
|
||||
{
|
||||
let mut dao = exif_dao.lock().expect("exif dao");
|
||||
for (score, hash) in scored {
|
||||
let row = match dao.find_by_content_hash(&ctx, &hash) {
|
||||
Ok(Some(r)) => r,
|
||||
Ok(None) => continue,
|
||||
Err(e) => {
|
||||
log::warn!(
|
||||
"clip_search: find_by_content_hash failed for {}: {:?}",
|
||||
hash,
|
||||
e
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
// Prefer get_rel_paths_for_hashes's first entry if it
|
||||
// exists (it shares semantics with `image_exif`'s natural
|
||||
// order), falling back to the ImageExif row.
|
||||
let rel_path = path_map
|
||||
.get(&hash)
|
||||
.and_then(|paths| paths.first().cloned())
|
||||
.unwrap_or(row.file_path);
|
||||
results.push(SearchHit {
|
||||
library_id: row.library_id,
|
||||
rel_path,
|
||||
content_hash: hash,
|
||||
score,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Ok(HttpResponse::Ok().json(SearchResponse {
|
||||
query: q_text,
|
||||
model_version: query_resp.model_version,
|
||||
threshold,
|
||||
considered,
|
||||
total_matching,
|
||||
offset,
|
||||
results,
|
||||
}))
|
||||
}
|
||||
246
src/clip_watch.rs
Normal file
246
src/clip_watch.rs
Normal file
@@ -0,0 +1,246 @@
|
||||
//! CLIP-encoding pass for the file watcher.
|
||||
//!
|
||||
//! `process_clip_backlog` in `backfill.rs` calls [`run_clip_encoding_pass`]
|
||||
//! with the page of candidates returned by
|
||||
//! `ExifDao::list_clip_unencoded_candidates`. We walk those, fan out K
|
||||
//! parallel encode calls to Apollo, and persist the resulting embeddings
|
||||
//! into `image_exif.clip_embedding` / `clip_model_version`.
|
||||
//!
|
||||
//! Unlike the face pipeline, CLIP has no marker rows — a permanent
|
||||
//! failure (un-decodable bytes) leaves the row's `clip_embedding` NULL
|
||||
//! and the drain will retry on the next tick. For personal-library
|
||||
//! scale this is fine; the per-tick cap bounds the wasted work, and
|
||||
//! `file_types::is_image_file` filters out videos / non-media client-
|
||||
//! side so most permanent failures are decoded-but-corrupt files (rare).
|
||||
//!
|
||||
//! The watcher thread isn't in any pre-existing async context, so we
|
||||
//! build a short-lived tokio runtime per pass and `block_on` the join
|
||||
//! of K encode futures. Concurrency knob: `CLIP_ENCODE_CONCURRENCY`
|
||||
//! (default 4 — lower than faces because Apollo's CLIP path doesn't
|
||||
//! release the GIL between preprocess and forward as cleanly).
|
||||
|
||||
use crate::ai::clip_client::{ClipClient, ClipError, EncodeImageMeta};
|
||||
use crate::database::ExifDao;
|
||||
use crate::exif;
|
||||
use crate::file_types;
|
||||
use crate::libraries::Library;
|
||||
use crate::memories::PathExcluder;
|
||||
use log::{debug, info, warn};
|
||||
use std::path::Path;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use tokio::sync::Semaphore;
|
||||
|
||||
/// A single file queued for CLIP encoding, as produced from the DAO's
/// `list_clip_unencoded_candidates` rows.
///
/// The embedding is written back keyed on `(library_id, rel_path)`; the
/// `content_hash` travels along so it can be attached to the encode
/// request for traceability.
#[derive(Clone, Debug)]
pub struct ClipCandidate {
    /// Path of the file relative to the library root.
    pub rel_path: String,
    /// Content hash of the file's bytes.
    pub content_hash: String,
}
|
||||
|
||||
/// Synchronous entry point. Returns once every candidate has been
|
||||
/// processed (or definitively skipped). No-op when the client is
|
||||
/// disabled so the caller can call unconditionally.
|
||||
pub fn run_clip_encoding_pass(
|
||||
library: &Library,
|
||||
excluded_dirs: &[String],
|
||||
clip_client: &ClipClient,
|
||||
exif_dao: Arc<Mutex<Box<dyn ExifDao>>>,
|
||||
candidates: Vec<ClipCandidate>,
|
||||
) {
|
||||
if !clip_client.is_enabled() {
|
||||
return;
|
||||
}
|
||||
if candidates.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
let base = Path::new(&library.root_path);
|
||||
let filtered = filter_excluded(base, excluded_dirs, candidates, Some(&library.name));
|
||||
if filtered.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
let concurrency: usize = std::env::var("CLIP_ENCODE_CONCURRENCY")
|
||||
.ok()
|
||||
.and_then(|s| s.parse().ok())
|
||||
.filter(|n: &usize| *n > 0)
|
||||
.unwrap_or(4);
|
||||
|
||||
info!(
|
||||
"clip_watch: encoding {} candidate(s) for library '{}' (concurrency {})",
|
||||
filtered.len(),
|
||||
library.name,
|
||||
concurrency
|
||||
);
|
||||
|
||||
let rt = match tokio::runtime::Builder::new_multi_thread()
|
||||
.worker_threads(2)
|
||||
.enable_all()
|
||||
.build()
|
||||
{
|
||||
Ok(rt) => rt,
|
||||
Err(e) => {
|
||||
warn!("clip_watch: failed to build tokio runtime: {e}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let library_id = library.id;
|
||||
let library_root = library.root_path.clone();
|
||||
rt.block_on(async move {
|
||||
let sem = Arc::new(Semaphore::new(concurrency));
|
||||
let mut handles = Vec::with_capacity(filtered.len());
|
||||
for cand in filtered {
|
||||
let permit_sem = sem.clone();
|
||||
let clip_client = clip_client.clone();
|
||||
let exif_dao = exif_dao.clone();
|
||||
let library_root = library_root.clone();
|
||||
handles.push(tokio::spawn(async move {
|
||||
let _permit = permit_sem.acquire().await.expect("clip semaphore");
|
||||
process_one(library_id, &library_root, cand, &clip_client, exif_dao).await;
|
||||
}));
|
||||
}
|
||||
for h in handles {
|
||||
let _ = h.await;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async fn process_one(
|
||||
library_id: i32,
|
||||
library_root: &str,
|
||||
cand: ClipCandidate,
|
||||
clip_client: &ClipClient,
|
||||
exif_dao: Arc<Mutex<Box<dyn ExifDao>>>,
|
||||
) {
|
||||
let abs = Path::new(library_root).join(&cand.rel_path);
|
||||
let bytes = match read_image_bytes_for_encode(&abs) {
|
||||
Ok(b) => b,
|
||||
Err(e) => {
|
||||
// Same rationale as face_watch: don't mark — the file may
|
||||
// have been moved/renamed mid-scan; let the next pass retry.
|
||||
warn!(
|
||||
"clip_watch: read failed for {} (lib {}): {}",
|
||||
cand.rel_path, library_id, e
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let meta = EncodeImageMeta {
|
||||
content_hash: cand.content_hash.clone(),
|
||||
library_id,
|
||||
rel_path: cand.rel_path.clone(),
|
||||
};
|
||||
let ctx = opentelemetry::Context::current();
|
||||
|
||||
match clip_client.encode_image(bytes, meta).await {
|
||||
Ok(resp) => {
|
||||
let emb_bytes = match resp.decode_embedding() {
|
||||
Ok(b) => b,
|
||||
Err(e) => {
|
||||
warn!("clip_watch: bad embedding for {}: {:?}", cand.rel_path, e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
let mut dao = exif_dao.lock().expect("exif dao");
|
||||
if let Err(e) = dao.backfill_clip_embedding(
|
||||
&ctx,
|
||||
library_id,
|
||||
&cand.rel_path,
|
||||
&emb_bytes,
|
||||
&resp.model_version,
|
||||
) {
|
||||
warn!(
|
||||
"clip_watch: backfill_clip_embedding failed for {}: {:?}",
|
||||
cand.rel_path, e
|
||||
);
|
||||
return;
|
||||
}
|
||||
debug!(
|
||||
"clip_watch: {} → dim={} ({}ms, {})",
|
||||
cand.rel_path, resp.embedding_dim, resp.duration_ms, resp.model_version
|
||||
);
|
||||
}
|
||||
Err(ClipError::Permanent(e)) => {
|
||||
// No marker — the row sits with NULL embedding and the drain
|
||||
// retries next pass. For personal-library scale the cost of
|
||||
// re-attempting permanently-broken files is bounded by the
|
||||
// per-tick cap. If this becomes a recurring noise source,
|
||||
// add a `clip_status` column with `failed` semantics like
|
||||
// face_detections has.
|
||||
warn!(
|
||||
"clip_watch: permanent failure on {} (will retry next pass): {}",
|
||||
cand.rel_path, e
|
||||
);
|
||||
}
|
||||
Err(ClipError::Transient(e)) => {
|
||||
debug!(
|
||||
"clip_watch: transient on {}: {} (will retry next pass)",
|
||||
cand.rel_path, e
|
||||
);
|
||||
}
|
||||
Err(ClipError::Disabled) => {
|
||||
// Defensive — the entry-point already checked is_enabled().
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Drop candidates whose paths land in an excluded dir or whose
|
||||
/// extension isn't an image. Mirrors `face_watch::filter_excluded` so
|
||||
/// the two backlogs stay shape-consistent. Library name is passed
|
||||
/// purely for the log line that surfaces an exclusion hit.
|
||||
pub fn filter_excluded(
|
||||
base: &Path,
|
||||
excluded_dirs: &[String],
|
||||
candidates: Vec<ClipCandidate>,
|
||||
library_name: Option<&str>,
|
||||
) -> Vec<ClipCandidate> {
|
||||
let excluder = if excluded_dirs.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(PathExcluder::new(base, excluded_dirs))
|
||||
};
|
||||
candidates
|
||||
.into_iter()
|
||||
.filter(|c| {
|
||||
let abs = base.join(&c.rel_path);
|
||||
if !file_types::is_image_file(&abs) {
|
||||
debug!(
|
||||
"clip_watch: skipping non-image '{}' (lib {})",
|
||||
c.rel_path,
|
||||
library_name.unwrap_or("<unknown>")
|
||||
);
|
||||
return false;
|
||||
}
|
||||
if let Some(ex) = excluder.as_ref()
|
||||
&& ex.is_excluded(&abs)
|
||||
{
|
||||
debug!(
|
||||
"clip_watch: skipping excluded '{}' (lib {})",
|
||||
c.rel_path,
|
||||
library_name.unwrap_or("<unknown>")
|
||||
);
|
||||
return false;
|
||||
}
|
||||
true
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Read image bytes for CLIP encoding. Same logic as
|
||||
/// `face_watch::read_image_bytes_for_detect` — RAW / HEIC files don't
|
||||
/// decode in Apollo's PIL pipeline, so we pull the embedded JPEG
|
||||
/// preview the thumbnail pipeline already extracts. Plain JPEG / PNG /
|
||||
/// WebP go through a direct read.
|
||||
pub fn read_image_bytes_for_encode(path: &Path) -> std::io::Result<Vec<u8>> {
|
||||
if file_types::needs_ffmpeg_thumbnail(path)
|
||||
&& let Some(preview) = exif::extract_embedded_jpeg_preview(path)
|
||||
{
|
||||
return Ok(preview);
|
||||
}
|
||||
std::fs::read(path)
|
||||
}
|
||||
@@ -470,6 +470,61 @@ pub trait ExifDao: Sync + Send {
|
||||
source: &str,
|
||||
) -> Result<(), DbError>;
|
||||
|
||||
/// Find image_exif rows needing a CLIP embedding for semantic search:
|
||||
/// `clip_embedding IS NULL AND content_hash IS NOT NULL`, ordered by id
|
||||
/// ASC, limited. Hash-less rows wait for `backfill_unhashed_backlog` to
|
||||
/// hash them first — embedding a row we can't key on bytes is wasted
|
||||
/// work that the next library/move detection would invalidate. Backed
|
||||
/// by the partial index `idx_image_exif_clip_backfill`.
|
||||
///
|
||||
/// Returns `(rel_path, content_hash)` for the given library only. Video
|
||||
/// rows are returned too (the underlying anti-join is shape-uniform);
|
||||
/// the caller filters them out via `file_types::is_image_file` before
|
||||
/// sending to Apollo, mirroring `face_watch::filter_excluded`.
|
||||
///
|
||||
/// **Model upgrades** (re-encoding everything on a new
|
||||
/// `APOLLO_CLIP_MODEL`) are handled out-of-band — run
|
||||
/// `UPDATE image_exif SET clip_embedding = NULL
|
||||
/// WHERE clip_model_version != '<new model>';`
|
||||
/// and the drain picks up the freshly-nulled rows on the next tick.
|
||||
/// Mixing in-flight model versions in a single query is intentionally
|
||||
/// not the drain's problem.
|
||||
fn list_clip_unencoded_candidates(
|
||||
&mut self,
|
||||
context: &opentelemetry::Context,
|
||||
library_id: i32,
|
||||
limit: i64,
|
||||
) -> Result<Vec<(String, String)>, DbError>;
|
||||
|
||||
/// Persist a CLIP embedding for an existing row. Touches
|
||||
/// `clip_embedding` and `clip_model_version` only — leaves every
|
||||
/// other column alone so the drain can't accidentally clobber EXIF /
|
||||
/// hash / date-resolver state that other paths have written.
|
||||
fn backfill_clip_embedding(
|
||||
&mut self,
|
||||
context: &opentelemetry::Context,
|
||||
library_id: i32,
|
||||
rel_path: &str,
|
||||
embedding: &[u8],
|
||||
model_version: &str,
|
||||
) -> Result<(), DbError>;
|
||||
|
||||
/// Load every `(content_hash, clip_embedding)` pair from the live
|
||||
/// image_exif rows for the given libraries, optionally filtered to a
|
||||
/// single `model_version` (cosine sim across mixed geometries is
|
||||
/// meaningless). Used by `/photos/search` to rerank against the query
|
||||
/// embedding in-memory.
|
||||
///
|
||||
/// Returns one pair per content_hash. If a hash appears under more
|
||||
/// than one library, the first row wins (Diesel's natural ORDER BY id
|
||||
/// ASC). Hash-less and embedding-less rows are filtered server-side.
|
||||
fn list_clip_index(
|
||||
&mut self,
|
||||
context: &opentelemetry::Context,
|
||||
library_ids: &[i32],
|
||||
model_version: Option<&str>,
|
||||
) -> Result<Vec<(String, Vec<u8>)>, DbError>;
|
||||
|
||||
/// Operator-driven date_taken override (POST /image/exif/date). Snapshots
|
||||
/// the prior `(date_taken, date_taken_source)` into the `original_*`
|
||||
/// pair on first override, then writes the new value with
|
||||
@@ -1387,6 +1442,146 @@ impl ExifDao for SqliteExifDao {
|
||||
})
|
||||
}
|
||||
|
||||
fn list_clip_unencoded_candidates(
|
||||
&mut self,
|
||||
context: &opentelemetry::Context,
|
||||
library_id_val: i32,
|
||||
limit: i64,
|
||||
) -> Result<Vec<(String, String)>, DbError> {
|
||||
trace_db_call(
|
||||
context,
|
||||
"query",
|
||||
"list_clip_unencoded_candidates",
|
||||
|_span| {
|
||||
use schema::image_exif::dsl::*;
|
||||
|
||||
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
|
||||
|
||||
// Partial index `idx_image_exif_clip_backfill` covers the
|
||||
// (clip_embedding IS NULL AND content_hash IS NOT NULL)
|
||||
// filter; the planner hits it directly. ORDER BY id ASC
|
||||
// keeps drain progress monotone across ticks.
|
||||
image_exif
|
||||
.filter(library_id.eq(library_id_val))
|
||||
.filter(clip_embedding.is_null())
|
||||
.filter(content_hash.is_not_null())
|
||||
.select((rel_path, content_hash.assume_not_null()))
|
||||
.order(id.asc())
|
||||
.limit(limit)
|
||||
.load::<(String, String)>(connection.deref_mut())
|
||||
.map_err(|_| anyhow::anyhow!("Query error"))
|
||||
},
|
||||
)
|
||||
.map_err(|_| DbError::new(DbErrorKind::QueryError))
|
||||
}
|
||||
|
||||
fn backfill_clip_embedding(
|
||||
&mut self,
|
||||
context: &opentelemetry::Context,
|
||||
library_id_val: i32,
|
||||
rel_path_val: &str,
|
||||
embedding: &[u8],
|
||||
model_version: &str,
|
||||
) -> Result<(), DbError> {
|
||||
trace_db_call(context, "update", "backfill_clip_embedding", |_span| {
|
||||
use schema::image_exif::dsl::*;
|
||||
|
||||
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
|
||||
|
||||
let result = diesel::update(
|
||||
image_exif
|
||||
.filter(library_id.eq(library_id_val))
|
||||
.filter(rel_path.eq(rel_path_val)),
|
||||
)
|
||||
.set((
|
||||
clip_embedding.eq(embedding),
|
||||
clip_model_version.eq(model_version),
|
||||
))
|
||||
.execute(connection.deref_mut());
|
||||
|
||||
match result {
|
||||
Ok(rows) => {
|
||||
if rows == 0 {
|
||||
// Same race as backfill_date_taken — row vanished
|
||||
// between the candidate query and this write. Not
|
||||
// a hard error; the drain re-scans next tick.
|
||||
log::debug!(
|
||||
"backfill_clip_embedding: 0 rows matched lib={} {} \
|
||||
(row likely retired by missing-file scan)",
|
||||
library_id_val,
|
||||
rel_path_val
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => Err(anyhow::anyhow!(
|
||||
"diesel update failed (lib={}, rel_path={}, model={}): {}",
|
||||
library_id_val,
|
||||
rel_path_val,
|
||||
model_version,
|
||||
e
|
||||
)),
|
||||
}
|
||||
})
|
||||
.map_err(|e| {
|
||||
log::warn!("backfill_clip_embedding: {}", e);
|
||||
DbError::new(DbErrorKind::UpdateError)
|
||||
})
|
||||
}
|
||||
|
||||
fn list_clip_index(
|
||||
&mut self,
|
||||
context: &opentelemetry::Context,
|
||||
library_ids_val: &[i32],
|
||||
model_version_filter: Option<&str>,
|
||||
) -> Result<Vec<(String, Vec<u8>)>, DbError> {
|
||||
trace_db_call(context, "query", "list_clip_index", |_span| {
|
||||
use schema::image_exif::dsl::*;
|
||||
|
||||
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
|
||||
|
||||
// Build the base filter. content_hash + clip_embedding both
|
||||
// need to be present for the row to be searchable.
|
||||
let mut query = image_exif
|
||||
.filter(content_hash.is_not_null())
|
||||
.filter(clip_embedding.is_not_null())
|
||||
.into_boxed();
|
||||
if !library_ids_val.is_empty() {
|
||||
query = query.filter(library_id.eq_any(library_ids_val));
|
||||
}
|
||||
if let Some(mv) = model_version_filter {
|
||||
query = query.filter(clip_model_version.eq(mv));
|
||||
}
|
||||
|
||||
// Order by id ASC so cross-library duplicates pick the
|
||||
// earliest-ingested row (stable across calls; the in-memory
|
||||
// matrix gets a deterministic row order). Group-by on
|
||||
// content_hash via post-filter — Diesel doesn't expose a
|
||||
// clean DISTINCT ON in this query shape.
|
||||
let rows: Vec<(String, Vec<u8>)> = query
|
||||
.select((
|
||||
content_hash.assume_not_null(),
|
||||
clip_embedding.assume_not_null(),
|
||||
))
|
||||
.order(id.asc())
|
||||
.load::<(String, Vec<u8>)>(connection.deref_mut())
|
||||
.map_err(|_| anyhow::anyhow!("Query error"))?;
|
||||
|
||||
// Dedupe by hash, keeping the first occurrence. Cheap; sized
|
||||
// to ~14k entries on this library.
|
||||
let mut seen: std::collections::HashSet<String> =
|
||||
std::collections::HashSet::with_capacity(rows.len());
|
||||
let mut out = Vec::with_capacity(rows.len());
|
||||
for (h, e) in rows {
|
||||
if seen.insert(h.clone()) {
|
||||
out.push((h, e));
|
||||
}
|
||||
}
|
||||
Ok(out)
|
||||
})
|
||||
.map_err(|_| DbError::new(DbErrorKind::QueryError))
|
||||
}
|
||||
|
||||
fn set_manual_date_taken(
|
||||
&mut self,
|
||||
context: &opentelemetry::Context,
|
||||
|
||||
@@ -114,6 +114,15 @@ pub struct ImageExif {
|
||||
/// Snapshot of the prior `date_taken_source` taken on first manual
|
||||
/// override. NULL when no override is active.
|
||||
pub original_date_taken_source: Option<String>,
|
||||
/// L2-normalized CLIP image embedding (raw little-endian float32 bytes;
|
||||
/// length depends on the model — 768×4 for ViT-L/14, 512×4 for ViT-B/32).
|
||||
/// NULL until Apollo's CLIP service has encoded this photo via the
|
||||
/// backfill drain. Used by `/photos/search` for semantic queries.
|
||||
pub clip_embedding: Option<Vec<u8>>,
|
||||
/// Which CLIP model produced `clip_embedding` (e.g. `"ViT-L/14"`). The
/// backfill drain only selects rows whose `clip_embedding` is NULL, so
/// after changing `APOLLO_CLIP_MODEL` the stale rows must be NULLed
/// out-of-band before the drain rebuilds them.
|
||||
pub clip_model_version: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Insertable)]
|
||||
|
||||
@@ -138,6 +138,8 @@ diesel::table! {
|
||||
date_taken_source -> Nullable<Text>,
|
||||
original_date_taken -> Nullable<BigInt>,
|
||||
original_date_taken_source -> Nullable<Text>,
|
||||
clip_embedding -> Nullable<Binary>,
|
||||
clip_model_version -> Nullable<Text>,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
35
src/files.rs
35
src/files.rs
@@ -1511,6 +1511,8 @@ mod tests {
|
||||
date_taken_source,
|
||||
original_date_taken: None,
|
||||
original_date_taken_source: None,
|
||||
clip_embedding: None,
|
||||
clip_model_version: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1550,6 +1552,8 @@ mod tests {
|
||||
date_taken_source: data.date_taken_source.clone(),
|
||||
original_date_taken: None,
|
||||
original_date_taken_source: None,
|
||||
clip_embedding: None,
|
||||
clip_model_version: None,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1596,6 +1600,8 @@ mod tests {
|
||||
date_taken_source: data.date_taken_source.clone(),
|
||||
original_date_taken: None,
|
||||
original_date_taken_source: None,
|
||||
clip_embedding: None,
|
||||
clip_model_version: None,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1932,6 +1938,35 @@ mod tests {
|
||||
) -> Result<(), DbError> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn list_clip_unencoded_candidates(
|
||||
&mut self,
|
||||
_context: &opentelemetry::Context,
|
||||
_library_id: i32,
|
||||
_limit: i64,
|
||||
) -> Result<Vec<(String, String)>, DbError> {
|
||||
Ok(Vec::new())
|
||||
}
|
||||
|
||||
fn backfill_clip_embedding(
|
||||
&mut self,
|
||||
_context: &opentelemetry::Context,
|
||||
_library_id: i32,
|
||||
_rel_path: &str,
|
||||
_embedding: &[u8],
|
||||
_model_version: &str,
|
||||
) -> Result<(), DbError> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn list_clip_index(
|
||||
&mut self,
|
||||
_context: &opentelemetry::Context,
|
||||
_library_ids: &[i32],
|
||||
_model_version: Option<&str>,
|
||||
) -> Result<Vec<(String, Vec<u8>)>, DbError> {
|
||||
Ok(Vec::new())
|
||||
}
|
||||
}
|
||||
|
||||
mod api {
|
||||
|
||||
@@ -7,6 +7,8 @@ pub mod ai;
|
||||
pub mod auth;
|
||||
pub mod bin_progress;
|
||||
pub mod cleanup;
|
||||
pub mod clip_search;
|
||||
pub mod clip_watch;
|
||||
pub mod content_hash;
|
||||
pub mod data;
|
||||
pub mod database;
|
||||
|
||||
@@ -31,6 +31,8 @@ use log::{error, info};
|
||||
mod ai;
|
||||
mod auth;
|
||||
mod backfill;
|
||||
mod clip_search;
|
||||
mod clip_watch;
|
||||
mod content_hash;
|
||||
mod data;
|
||||
mod database;
|
||||
@@ -164,6 +166,7 @@ fn main() -> std::io::Result<()> {
|
||||
playlist_mgr_for_watcher,
|
||||
preview_gen_for_watcher,
|
||||
app_state.face_client.clone(),
|
||||
app_state.clip_client.clone(),
|
||||
app_state.excluded_dirs.clone(),
|
||||
app_state.library_health.clone(),
|
||||
);
|
||||
@@ -280,6 +283,12 @@ fn main() -> std::io::Result<()> {
|
||||
.service(
|
||||
web::resource("/photos/exif").route(web::get().to(files::list_exif_summary)),
|
||||
)
|
||||
.service(
|
||||
// Semantic search via CLIP embeddings. See
|
||||
// src/clip_search.rs for the request/response shape.
|
||||
web::resource("/photos/search")
|
||||
.route(web::get().to(clip_search::search_photos)),
|
||||
)
|
||||
.service(web::resource("/file/move").post(move_file::<RealFileSystem>))
|
||||
.service(handlers::image::get_image)
|
||||
.service(handlers::image::upload_image)
|
||||
|
||||
12
src/state.rs
12
src/state.rs
@@ -1,4 +1,5 @@
|
||||
use crate::ai::apollo_client::ApolloClient;
|
||||
use crate::ai::clip_client::ClipClient;
|
||||
use crate::ai::face_client::FaceClient;
|
||||
use crate::ai::insight_chat::{ChatLockMap, InsightChatService};
|
||||
use crate::ai::openrouter::OpenRouterClient;
|
||||
@@ -70,6 +71,10 @@ pub struct AppState {
|
||||
/// nor `APOLLO_API_BASE_URL` is set; the file-watch hook (Phase 3) and
|
||||
/// manual-face-create handler short-circuit in that case.
|
||||
pub face_client: FaceClient,
|
||||
/// CLIP inference client (calls Apollo's `/api/internal/clip/*`).
|
||||
/// Same disabled semantics as `face_client`: unset env → no-op
|
||||
/// backlog drain, /photos/search returns an empty result.
|
||||
pub clip_client: ClipClient,
|
||||
}
|
||||
|
||||
impl AppState {
|
||||
@@ -105,6 +110,7 @@ impl AppState {
|
||||
insight_chat: Arc<InsightChatService>,
|
||||
preview_dao: Arc<Mutex<Box<dyn PreviewDao>>>,
|
||||
face_client: FaceClient,
|
||||
clip_client: ClipClient,
|
||||
) -> Self {
|
||||
assert!(
|
||||
!libraries_vec.is_empty(),
|
||||
@@ -143,6 +149,7 @@ impl AppState {
|
||||
insight_generator,
|
||||
insight_chat,
|
||||
face_client,
|
||||
clip_client,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -198,6 +205,9 @@ impl Default for AppState {
|
||||
.or_else(|| env::var("APOLLO_API_BASE_URL").ok());
|
||||
let face_client = FaceClient::new(face_client_url);
|
||||
|
||||
// CLIP inference client. Same env var fallback as face_client.
|
||||
let clip_client = ClipClient::from_env();
|
||||
|
||||
// Initialize DAOs
|
||||
let insight_dao: Arc<Mutex<Box<dyn InsightDao>>> =
|
||||
Arc::new(Mutex::new(Box::new(SqliteInsightDao::new())));
|
||||
@@ -289,6 +299,7 @@ impl Default for AppState {
|
||||
insight_chat,
|
||||
preview_dao,
|
||||
face_client,
|
||||
clip_client,
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -439,6 +450,7 @@ impl AppState {
|
||||
insight_chat,
|
||||
preview_dao,
|
||||
FaceClient::new(None), // disabled in test
|
||||
ClipClient::new(None), // disabled in test
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -268,6 +268,7 @@ pub fn watch_files(
|
||||
playlist_manager: Addr<VideoPlaylistManager>,
|
||||
preview_generator: Addr<video::actors::PreviewClipGenerator>,
|
||||
face_client: crate::ai::face_client::FaceClient,
|
||||
clip_client: crate::ai::clip_client::ClipClient,
|
||||
excluded_dirs: Vec<String>,
|
||||
library_health: libraries::LibraryHealthMap,
|
||||
) {
|
||||
@@ -300,6 +301,14 @@ pub fn watch_files(
|
||||
or APOLLO_API_BASE_URL to enable)"
|
||||
);
|
||||
}
|
||||
if clip_client.is_enabled() {
|
||||
info!(" CLIP semantic search: ENABLED");
|
||||
} else {
|
||||
info!(
|
||||
" CLIP semantic search: DISABLED (set APOLLO_CLIP_API_BASE_URL \
|
||||
or APOLLO_API_BASE_URL to enable)"
|
||||
);
|
||||
}
|
||||
{
|
||||
let libs = libs_lock.read().unwrap_or_else(|e| e.into_inner());
|
||||
for lib in libs.iter() {
|
||||
@@ -463,6 +472,21 @@ pub fn watch_files(
|
||||
);
|
||||
}
|
||||
|
||||
// CLIP embedding backlog. Independent of face detection —
|
||||
// drain runs whenever CLIP is enabled, even on deploys
|
||||
// that don't have the face engine wired up. Mirrors the
|
||||
// face drain shape (capped per tick, no-op when disabled).
|
||||
if clip_client.is_enabled() {
|
||||
let context = opentelemetry::Context::new();
|
||||
backfill::process_clip_backlog(
|
||||
&context,
|
||||
lib,
|
||||
&clip_client,
|
||||
&exif_dao,
|
||||
&effective_excludes,
|
||||
);
|
||||
}
|
||||
|
||||
// Date-taken backfill: drain rows whose canonical date is
|
||||
// either unresolved or only fs_time-sourced. Independent
|
||||
// of face detection — runs even on deploys that don't
|
||||
|
||||
Reference in New Issue
Block a user