Files
ImageApi/src/ai/face_client.rs
Cameron Cordes 67abd8d8ff style: cargo fmt
Pre-existing whitespace drift in test bodies, normalized by rustfmt.
No behavior change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-30 21:16:34 +00:00

401 lines
15 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
//! Thin async HTTP client for Apollo's `/api/internal/faces/*` endpoints.
//!
//! Apollo (the personal location-history viewer at the sibling repo) hosts the
//! insightface inference service. This client is the ImageApi side of the
//! contract — it shoves image bytes through `/detect` and returns boxes +
//! 512-d ArcFace embeddings, plus a single-embedding `/embed` for the manual
//! face-create flow.
//!
//! Mirrors `apollo_client.rs` shape: optional base URL (None = disabled, the
//! file watcher and manual-create handlers no-op), reqwest client with a
//! generous timeout because CPU inference on a backlog can take many seconds
//! per photo.
//!
//! Configured via `APOLLO_FACE_API_BASE_URL`, falling back to
//! `APOLLO_API_BASE_URL` when the dedicated var is unset (single-Apollo
//! deploys are the common case). Both unset → `is_enabled()` returns false.
//!
//! Wire format: multipart/form-data with `file=<bytes>` and `meta=<json>`.
//! `meta` carries `{content_hash, library_id, rel_path, orientation?,
//! model_version?}` — useful for Apollo-side logging and idempotency, ignored
//! by Apollo today but part of the stable wire contract so future versions
//! can act on it without a client change.
//!
//! Error mapping (reflected in [`FaceDetectError`]):
//! - 422 `decode_failed` → permanent: ImageApi marks `status='failed'` and
//! doesn't retry until manual rerun.
//! - 200 with `faces:[]` → `status='no_faces'` marker row.
//! - 503 `cuda_oom` / `engine_unavailable` → defer-and-retry: no marker
//! written.
//! - Any other 5xx / network error → defer.
use anyhow::{Context, Result};
use base64::Engine;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::time::Duration;
#[derive(Debug, Clone, Serialize)]
pub struct DetectMeta {
pub content_hash: String,
pub library_id: i32,
pub rel_path: String,
/// EXIF orientation int (1..8). Apollo applies `exif_transpose` on the
/// bytes before inference, so this is informational only — supply when
/// the bytes were extracted from a RAW preview that lost the tag.
#[serde(skip_serializing_if = "Option::is_none")]
pub orientation: Option<i32>,
/// Echoed back in the response. ImageApi stores it in
/// `face_detections.model_version`.
#[serde(skip_serializing_if = "Option::is_none")]
pub model_version: Option<String>,
}
// Wire shape for the bbox sub-object Apollo returns. Read by Phase 3's
// file-watch hook; silence the dead-code lint until then.
#[allow(dead_code)]
#[derive(Debug, Clone, Deserialize)]
pub struct DetectedBbox {
pub x: f32,
pub y: f32,
pub w: f32,
pub h: f32,
}
#[allow(dead_code)] // bbox consumed by Phase 3 file-watch hook
#[derive(Debug, Clone, Deserialize)]
pub struct DetectedFace {
pub bbox: DetectedBbox,
pub confidence: f32,
/// base64 of 2048 bytes (512×f32 LE). ImageApi stores the raw bytes
/// verbatim as a BLOB — see `decode_embedding` for the unpack.
pub embedding: String,
}
impl DetectedFace {
/// Decode the wire-format embedding back into raw bytes for storage.
/// Returns the 2048-byte little-endian f32 buffer or an error if the
/// base64 is malformed or the wrong length.
pub fn decode_embedding(&self) -> Result<Vec<u8>> {
let bytes = base64::engine::general_purpose::STANDARD
.decode(self.embedding.as_bytes())
.context("face embedding base64 decode")?;
if bytes.len() != 2048 {
anyhow::bail!(
"face embedding wrong size: got {} bytes, expected 2048",
bytes.len()
);
}
Ok(bytes)
}
}
#[allow(dead_code)] // duration_ms logged by Phase 3 file-watch hook
#[derive(Debug, Clone, Deserialize)]
pub struct DetectResponse {
pub model_version: String,
pub duration_ms: i64,
pub faces: Vec<DetectedFace>,
}
#[derive(Debug, Clone, Deserialize)]
#[allow(dead_code)] // Reported by Apollo; useful for future health-driven backoff
pub struct FaceHealth {
pub loaded: bool,
pub providers: Vec<String>,
pub model_version: String,
pub det_size: i32,
#[serde(default)]
pub load_error: Option<String>,
}
/// Distinguishes permanent failures (don't retry) from transient ones
/// (defer and retry on next scan tick). The file-watch hook keys its
/// marker-row decision on this — a `Permanent` outcome writes
/// `status='failed'`, a `Transient` outcome writes nothing so the next
/// pass tries again.
#[derive(Debug)]
pub enum FaceDetectError {
/// Apollo refused the bytes for a reason that won't change on retry
/// (decode failure, zero-dim image). Mark `status='failed'`.
Permanent(anyhow::Error),
/// Apollo couldn't process this turn but might next time (CUDA OOM,
/// engine not loaded yet, network hiccup). Don't mark anything.
Transient(anyhow::Error),
/// Feature is disabled (no `APOLLO_FACE_API_BASE_URL`). Caller should
/// silently no-op — same shape as `apollo_client::is_enabled()` false.
Disabled,
}
impl std::fmt::Display for FaceDetectError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
FaceDetectError::Permanent(e) => write!(f, "permanent: {e}"),
FaceDetectError::Transient(e) => write!(f, "transient: {e}"),
FaceDetectError::Disabled => write!(f, "face client disabled"),
}
}
}
impl std::error::Error for FaceDetectError {}
#[derive(Clone)]
pub struct FaceClient {
client: Client,
/// `None` → disabled. Trim trailing slash at construction so url
/// building doesn't double up.
base_url: Option<String>,
}
impl FaceClient {
pub fn new(base_url: Option<String>) -> Self {
// 60 s timeout: CPU inference on a backlog can take many seconds
// per photo, especially the first call into a cold GPU. Apollo's
// bounded threadpool (1 worker on CUDA) means concurrent calls
// queue server-side; 60 s is enough headroom for a few items in
// the queue without surfacing a false transient.
let timeout_secs = std::env::var("FACE_DETECT_TIMEOUT_SEC")
.ok()
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(60);
let client = Client::builder()
.timeout(Duration::from_secs(timeout_secs))
.build()
.expect("reqwest client build");
Self {
client,
base_url: base_url.map(|u| u.trim_end_matches('/').to_string()),
}
}
pub fn is_enabled(&self) -> bool {
self.base_url.is_some()
}
/// Detect every face in `bytes`. ImageApi calls this from the file-watch
/// hook (Phase 3) and from the manual rerun handler. Empty `faces[]` in
/// the response is the no-faces signal — caller writes a marker row.
#[allow(dead_code)] // Phase 3 file-watch hook + rerun handler
pub async fn detect(
&self,
bytes: Vec<u8>,
meta: DetectMeta,
) -> std::result::Result<DetectResponse, FaceDetectError> {
let Some(base) = self.base_url.as_deref() else {
return Err(FaceDetectError::Disabled);
};
let url = format!("{}/api/internal/faces/detect", base);
self.post_multipart(&url, bytes, &meta).await
}
/// Single-embedding endpoint for the manual face-create flow. Caller
/// crops the image to the user-drawn bbox and passes those bytes; we
/// run detection inside the crop and return the highest-confidence
/// face's embedding. Apollo returns 422 `no_face_in_crop` when the
/// box missed — surfaced here as `Permanent`.
pub async fn embed(
&self,
bytes: Vec<u8>,
meta: DetectMeta,
) -> std::result::Result<DetectResponse, FaceDetectError> {
let Some(base) = self.base_url.as_deref() else {
return Err(FaceDetectError::Disabled);
};
let url = format!("{}/api/internal/faces/embed", base);
self.post_multipart(&url, bytes, &meta).await
}
/// Engine reachability + provider/model report. Used by ImageApi for a
/// startup sanity check; not on the hot path.
#[allow(dead_code)] // Phase 3 startup probe
pub async fn health(&self) -> Result<FaceHealth> {
let base = self.base_url.as_deref().context("face client disabled")?;
let url = format!("{}/api/internal/faces/health", base);
let resp = self.client.get(&url).send().await?.error_for_status()?;
let body: FaceHealth = resp.json().await?;
Ok(body)
}
async fn post_multipart(
&self,
url: &str,
bytes: Vec<u8>,
meta: &DetectMeta,
) -> std::result::Result<DetectResponse, FaceDetectError> {
let meta_json = serde_json::to_string(meta)
.map_err(|e| FaceDetectError::Permanent(anyhow::anyhow!("meta serialize: {e}")))?;
let form = reqwest::multipart::Form::new()
.text("meta", meta_json)
.part(
"file",
reqwest::multipart::Part::bytes(bytes)
.file_name(meta.rel_path.clone())
.mime_str("application/octet-stream")
.unwrap_or_else(|_| reqwest::multipart::Part::bytes(Vec::new())),
);
let resp = match self.client.post(url).multipart(form).send().await {
Ok(r) => r,
Err(e) if e.is_timeout() || e.is_connect() => {
return Err(FaceDetectError::Transient(anyhow::anyhow!(
"face client network: {e}"
)));
}
Err(e) => {
return Err(FaceDetectError::Transient(anyhow::anyhow!(
"face client request: {e}"
)));
}
};
let status = resp.status();
if status.is_success() {
let body: DetectResponse = resp.json().await.map_err(|e| {
FaceDetectError::Transient(anyhow::anyhow!("face response decode: {e}"))
})?;
return Ok(body);
}
let body_text = resp.text().await.unwrap_or_default();
Err(classify_error_response(status.as_u16(), &body_text))
}
}
/// Map an Apollo HTTP error response to a FaceDetectError. Pulled out as a
/// pure function so the marker-row contract (422 → Permanent, 503 →
/// Transient) is unit-testable without spinning up an HTTP server.
fn classify_error_response(status: u16, body_text: &str) -> FaceDetectError {
// Apollo encodes its error class in the JSON body's `detail`. Try to
// parse it; fall back to status-only classification.
let detail_code = serde_json::from_str::<serde_json::Value>(body_text)
.ok()
.and_then(|v| {
// detail can be a string ("decode_failed") or an object
// ({"code": "cuda_oom", ...}) depending on the endpoint and
// Apollo's response shape — handle both.
v.get("detail")
.and_then(|d| d.as_str().map(str::to_string))
.or_else(|| {
v.get("detail")
.and_then(|d| d.get("code"))
.and_then(|c| c.as_str())
.map(str::to_string)
})
})
.unwrap_or_default();
if status == 422 {
return FaceDetectError::Permanent(anyhow::anyhow!(
"face detect 422 {}: {}",
detail_code,
body_text
));
}
if status == 503 {
return FaceDetectError::Transient(anyhow::anyhow!(
"face detect 503 {}: {}",
detail_code,
body_text
));
}
// Infra-level 4xx that an operator can fix without re-encoding the
// bytes: 408 (proxy timeout), 413 (request too large — reverse-proxy
// body cap), 429 (rate limit). Treating these as Permanent poisons
// every photo that hit the misconfig with `status='failed'` and
// requires a manual DELETE to recover. Defer instead so the next
// scan tick retries naturally once the proxy is fixed.
if matches!(status, 408 | 413 | 429) {
return FaceDetectError::Transient(anyhow::anyhow!(
"face detect {} {}: {}",
status,
detail_code,
body_text
));
}
// Any other 4xx: be conservative and treat as Permanent so we don't
// loop forever on a stable rejection. Any other 5xx: Transient —
// likely intermittent.
if (400..500).contains(&status) {
FaceDetectError::Permanent(anyhow::anyhow!(
"face detect {} {}: {}",
status,
detail_code,
body_text
))
} else {
FaceDetectError::Transient(anyhow::anyhow!(
"face detect {} {}: {}",
status,
detail_code,
body_text
))
}
}
#[cfg(test)]
mod tests {
use super::*;
fn is_permanent(e: &FaceDetectError) -> bool {
matches!(e, FaceDetectError::Permanent(_))
}
fn is_transient(e: &FaceDetectError) -> bool {
matches!(e, FaceDetectError::Transient(_))
}
#[test]
fn classify_422_decode_failed_is_permanent() {
// Permanent → ImageApi marks status='failed' and stops retrying.
let e = classify_error_response(422, r#"{"detail":"decode_failed: bad bytes"}"#);
assert!(is_permanent(&e), "422 decode_failed must be Permanent");
assert!(format!("{e}").contains("decode_failed"));
}
#[test]
fn classify_503_cuda_oom_is_transient() {
// Transient → ImageApi must NOT write a marker so the next scan
// retries. The detail.code is nested in an object rather than a
// bare string; the parser handles both.
let e = classify_error_response(
503,
r#"{"detail":{"code":"cuda_oom","error":"out of memory"}}"#,
);
assert!(is_transient(&e), "503 cuda_oom must be Transient");
assert!(format!("{e}").contains("cuda_oom"));
}
#[test]
fn classify_500_is_transient_other_4xx_is_permanent() {
// Conservative split: 5xx defers (intermittent), other 4xx
// is treated as a stable rejection so we don't loop forever.
assert!(is_transient(&classify_error_response(500, "")));
assert!(is_transient(&classify_error_response(502, "{}")));
assert!(is_permanent(&classify_error_response(400, "{}")));
assert!(is_permanent(&classify_error_response(404, "{}")));
}
#[test]
fn classify_infra_4xx_is_transient() {
// 408 / 413 / 429 are operator-fixable proxy/infra errors.
// Marking them Permanent poisons every affected photo with
// status='failed' and requires manual SQL to recover. The
// 413 path specifically bit us when nginx defaulted to a 1 MB
// body cap and rejected normal-size photos before they reached
// the backend.
assert!(is_transient(&classify_error_response(408, "")));
assert!(is_transient(&classify_error_response(
413,
"<html>nginx</html>"
)));
assert!(is_transient(&classify_error_response(429, "{}")));
}
#[test]
fn classify_handles_unparseable_body() {
// Apollo can return non-JSON on misroute / proxy errors; the
// classifier must still produce a useful variant.
let e = classify_error_response(503, "<html>nginx</html>");
assert!(is_transient(&e));
}
}