Face Recognition / People Integration #61
@@ -386,6 +386,16 @@ pub trait ExifDao: Sync + Send {
         hash: &str,
     ) -> Result<Vec<String>, DbError>;
 
+    /// Batch version of [`get_rel_paths_by_hash`]. Returns a
+    /// `hash → Vec<rel_path>` map for every hash that has at least one
+    /// rel_path. Used by the batch tag lookup endpoint to expand
+    /// content-hash siblings without firing a query per hash.
+    fn get_rel_paths_for_hashes(
+        &mut self,
+        context: &opentelemetry::Context,
+        hashes: &[String],
+    ) -> Result<std::collections::HashMap<String, Vec<String>>, DbError>;
+
     /// List `(library_id, rel_path)` pairs for the given libraries, optionally
     /// restricted to rows whose rel_path starts with `path_prefix`. When
     /// `library_ids` is empty, rows from every library are returned. Used by
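A minimal caller sketch for the new trait method (not part of this diff; the `dao` binding and the example hashes are assumptions):

```rust
use std::collections::HashMap;

// Hypothetical caller: expand content-hash siblings for a batch of
// hashes in one round-trip instead of one query per hash.
fn expand_siblings(dao: &mut dyn ExifDao) -> Result<(), DbError> {
    let ctx = opentelemetry::Context::current();
    let hashes = vec!["abc123".to_string(), "def456".to_string()];
    let by_hash: HashMap<String, Vec<String>> =
        dao.get_rel_paths_for_hashes(&ctx, &hashes)?;
    // Hashes with no stored rel_path are simply absent from the map,
    // so callers fall back to the query path itself.
    for (hash, rel_paths) in &by_hash {
        println!("{hash}: {} sibling path(s)", rel_paths.len());
    }
    Ok(())
}
```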
@@ -956,6 +966,40 @@ impl ExifDao for SqliteExifDao {
             .map_err(|_| DbError::new(DbErrorKind::QueryError))
     }
 
+    fn get_rel_paths_for_hashes(
+        &mut self,
+        context: &opentelemetry::Context,
+        hashes: &[String],
+    ) -> Result<std::collections::HashMap<String, Vec<String>>, DbError> {
+        use std::collections::HashMap;
+        let mut out: HashMap<String, Vec<String>> = HashMap::new();
+        if hashes.is_empty() {
+            return Ok(out);
+        }
+        trace_db_call(context, "query", "get_rel_paths_for_hashes", |_span| {
+            use schema::image_exif::dsl::*;
+
+            let mut connection = self.connection.lock().expect("Unable to get ExifDao");
+
+            // Chunk the IN clause to stay safely under SQLite's
+            // SQLITE_LIMIT_VARIABLE_NUMBER (32766 modern, 999 legacy).
+            const CHUNK: usize = 500;
+            for chunk in hashes.chunks(CHUNK) {
+                let rows: Vec<(String, String)> = image_exif
+                    .filter(content_hash.eq_any(chunk))
+                    .select((content_hash.assume_not_null(), rel_path))
+                    .distinct()
+                    .load::<(String, String)>(connection.deref_mut())
+                    .map_err(|_| anyhow::anyhow!("Query error"))?;
+                for (hash, path) in rows {
+                    out.entry(hash).or_default().push(path);
+                }
+            }
+            Ok(out)
+        })
+        .map_err(|_| DbError::new(DbErrorKind::QueryError))
+    }
+
     fn list_rel_paths_for_libraries(
         &mut self,
         context: &opentelemetry::Context,
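The chunking above is a general pattern; a standalone sketch in plain Rust without Diesel (the table and column names echo the diff; `placeholders` is an invented helper):

```rust
// Cap the bind count per statement so even a legacy SQLite build with
// a 999-variable limit stays safe; 500 leaves generous headroom.
const CHUNK: usize = 500;

fn placeholders(n: usize) -> String {
    vec!["?"; n].join(", ")
}

fn main() {
    let hashes: Vec<String> = (0..1234).map(|i| format!("h{i:04}")).collect();
    // 1234 hashes → chunks of 500, 500, 234 → three statements total.
    for (i, chunk) in hashes.chunks(CHUNK).enumerate() {
        let sql = format!(
            "SELECT DISTINCT content_hash, rel_path FROM image_exif \
             WHERE content_hash IN ({})",
            placeholders(chunk.len())
        );
        println!("chunk {i}: {} binds, {} chars of SQL", chunk.len(), sql.len());
    }
}
```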
@@ -1659,6 +1703,14 @@ mod tests {
         Ok(vec![])
     }
 
+    fn get_rel_paths_for_hashes(
+        &mut self,
+        _context: &opentelemetry::Context,
+        _hashes: &[String],
+    ) -> Result<std::collections::HashMap<String, Vec<String>>, DbError> {
+        Ok(std::collections::HashMap::new())
+    }
+
     fn list_rel_paths_for_libraries(
         &mut self,
         _context: &opentelemetry::Context,
176 src/tags.rs
@@ -244,44 +244,142 @@ pub struct LookupTagsBatchRequest {
     pub paths: Vec<String>,
 }
 
-/// Bulk per-path tag lookup. Apollo's photo-match flow used to fan out
-/// one ``GET /image/tags?path=`` per record (~4k for a wide window) —
+/// Bulk per-path tag lookup with cross-library content-hash sibling
+/// expansion. Apollo's photo-match flow used to fan out one
+/// ``GET /image/tags?path=`` per record (~4k for a wide window) —
 /// each call locked the dao briefly and the round-trip cost dwarfed
-/// the actual SQL. This collapses the whole fan-out into one POST and
-/// one (chunked) JOIN. Body: ``{paths: [...]}``; response:
-/// ``{path: [{id, name, ...}]}`` with **only paths that have at least
-/// one tag** in the map (the caller treats absence as empty list).
+/// the actual SQL. This collapses the whole fan-out into:
 ///
-/// Trade-off: this matches by ``rel_path`` directly and does NOT do
-/// the cross-library content-hash sibling expansion that the per-path
-/// ``GET /image/tags`` does. For Apollo's grid view the simpler match
-/// is fine — it's the common case for single-library deploys; the
-/// carousel still uses the per-path endpoint and resolves siblings on
-/// demand. If multi-library content-hash sharing becomes load-bearing
-/// for the grid, extend this to JOIN ``image_exif`` on content_hash.
+/// 1. one ``image_exif`` batch lookup → query path → content_hash
+/// 2. one ``image_exif`` JOIN by content_hash → all sibling rel_paths
+///    (so a tag applied under library A surfaces under library B
+///    when the content hashes match — important once a backup mount
+///    holds copies of files from the primary library)
+/// 3. one ``tagged_photo`` JOIN over the union of (query + sibling)
+///    rel_paths
+///
+/// Body: ``{paths: [...]}``; response: ``{path: [{id, name, ...}]}``
+/// with only paths that have at least one tag (caller treats absence
+/// as empty). Each chunk is capped to stay under SQLite's variable
+/// limit; five queries per 4k photos is still ~800x cheaper than
+/// per-path HTTP fan-out.
 async fn lookup_tags_batch<D: TagDao>(
     _: Claims,
     http_request: HttpRequest,
     body: web::Json<LookupTagsBatchRequest>,
     tag_dao: web::Data<Mutex<D>>,
+    exif_dao: web::Data<Mutex<Box<dyn ExifDao>>>,
 ) -> impl Responder {
+    use std::collections::{HashMap, HashSet};
     let context = extract_context_from_request(&http_request);
     let span = global_tracer().start_with_context("lookup_tags_batch", &context);
     let span_context = opentelemetry::Context::current_with_span(span);
 
     if body.paths.is_empty() {
-        return HttpResponse::Ok().json(std::collections::HashMap::<String, Vec<Tag>>::new());
+        return HttpResponse::Ok().json(HashMap::<String, Vec<Tag>>::new());
     }
 
-    let normalized: Vec<String> = body.paths.iter().map(|p| normalize_path(p)).collect();
+    let query_paths: Vec<String> = body.paths.iter().map(|p| normalize_path(p)).collect();
+
+    // Stage 1: query → content_hash mapping. Files without a hash yet
+    // (just-indexed, hash compute failed, etc.) skip the sibling
+    // expansion and only get tags from their own rel_path.
+    let exif_records = {
+        let mut dao = exif_dao.lock().expect("Unable to get ExifDao");
+        match dao.get_exif_batch(&span_context, &query_paths) {
+            Ok(rows) => rows,
+            Err(e) => {
+                return HttpResponse::InternalServerError()
+                    .body(format!("exif batch lookup failed: {:?}", e));
+            }
+        }
+    };
+    let mut hash_by_path: HashMap<String, String> = HashMap::with_capacity(exif_records.len());
+    for record in exif_records {
+        if let Some(h) = record.content_hash {
+            hash_by_path.insert(record.file_path, h);
+        }
+    }
+    let unique_hashes: Vec<String> = hash_by_path
+        .values()
+        .cloned()
+        .collect::<HashSet<_>>()
+        .into_iter()
+        .collect();
+
+    // Stage 2: hash → all sibling rel_paths.
+    let paths_by_hash = if unique_hashes.is_empty() {
+        HashMap::new()
+    } else {
+        let mut dao = exif_dao.lock().expect("Unable to get ExifDao");
+        match dao.get_rel_paths_for_hashes(&span_context, &unique_hashes) {
+            Ok(map) => map,
+            Err(e) => {
+                return HttpResponse::InternalServerError()
+                    .body(format!("hash sibling lookup failed: {:?}", e));
+            }
+        }
+    };
+
+    // Stage 3: build expanded path set and the reverse map
+    // sibling → [original query paths whose tag bucket should include
+    // the sibling's tags]. A query path always attributes to itself
+    // (covers the no-content-hash case).
+    let mut originals_by_sibling: HashMap<String, Vec<String>> = HashMap::new();
+    let mut all_paths: HashSet<String> = HashSet::new();
+    for query_path in &query_paths {
+        all_paths.insert(query_path.clone());
+        originals_by_sibling
+            .entry(query_path.clone())
+            .or_default()
+            .push(query_path.clone());
+        if let Some(hash) = hash_by_path.get(query_path)
+            && let Some(siblings) = paths_by_hash.get(hash)
+        {
+            for sibling in siblings {
+                if sibling == query_path {
+                    continue;
+                }
+                all_paths.insert(sibling.clone());
+                originals_by_sibling
+                    .entry(sibling.clone())
+                    .or_default()
+                    .push(query_path.clone());
+            }
+        }
+    }
+
+    // Stage 4: tags grouped by rel_path for the union.
+    let all_paths_vec: Vec<String> = all_paths.into_iter().collect();
+    let tags_by_sibling = {
         let mut dao = tag_dao.lock().expect("Unable to get TagDao");
-        match dao.get_tags_grouped_by_paths(&span_context, &normalized) {
-            Ok(grouped) => {
+        match dao.get_tags_grouped_by_paths(&span_context, &all_paths_vec) {
+            Ok(map) => map,
+            Err(e) => {
+                return HttpResponse::InternalServerError().body(format!("{}", e));
+            }
+        }
+    };
+
+    // Stage 5: aggregate sibling tags back to original query paths,
+    // de-duped by tag id. Empty buckets stay out of the response so
+    // the caller's "missing key = []" contract holds.
+    let mut result: HashMap<String, Vec<Tag>> = HashMap::new();
+    for (sibling_path, originals) in originals_by_sibling {
+        if let Some(tags) = tags_by_sibling.get(&sibling_path) {
+            for orig in originals {
+                let entry = result.entry(orig).or_default();
+                for t in tags {
+                    if !entry.iter().any(|e| e.id == t.id) {
+                        entry.push(t.clone());
+                    }
+                }
+            }
+        }
+    }
+
     span_context.span().set_status(Status::Ok);
-    HttpResponse::Ok().json(grouped)
-        }
-        Err(e) => HttpResponse::InternalServerError().body(format!("{}", e)),
-    }
+    HttpResponse::Ok().json(result)
 }
 
 #[derive(Serialize, Queryable, Clone, Debug, PartialEq)]
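Wire shapes for the reworked endpoint, following the doc comment above (a `serde_json` sketch; the paths and tag values are invented):

```rust
use serde_json::json;

fn main() {
    // POST body: the grid sends every rel_path currently in view.
    let request = json!({
        "paths": ["2021/trip/IMG_0001.jpg", "backup/IMG_0001.jpg"]
    });

    // If both paths share a content_hash, a tag stored against either
    // rel_path appears under both keys; untagged paths are omitted.
    let response = json!({
        "2021/trip/IMG_0001.jpg": [{ "id": 7, "name": "beach", "created_time": 0 }],
        "backup/IMG_0001.jpg": [{ "id": 7, "name": "beach", "created_time": 0 }]
    });

    assert_eq!(request["paths"].as_array().unwrap().len(), 2);
    assert!(response["missing.jpg"].is_null()); // absent key == no tags
}
```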
@@ -1142,6 +1240,42 @@ mod tests {
         }
     }
 
+    #[actix_rt::test]
+    async fn get_tags_grouped_by_paths_returns_per_path_buckets() {
+        // Backstop for the batch tag-lookup endpoint: confirms the
+        // grouped variant returns one bucket per path with at least
+        // one tag, and omits paths with no tags entirely (the caller
+        // treats absence as []). The handler stacks sibling expansion
+        // on top via image_exif content_hash; the DAO method itself
+        // just needs to honour rel_path → tags directly.
+        let mut dao = TestTagDao::new();
+        let ctx = opentelemetry::Context::current();
+        // Seed: two paths tagged, one path untagged.
+        dao.tagged_photos.borrow_mut().insert(
+            "a.jpg".into(),
+            vec![Tag { id: 1, name: "alpha".into(), created_time: 0 }],
+        );
+        dao.tagged_photos.borrow_mut().insert(
+            "b.jpg".into(),
+            vec![
+                Tag { id: 2, name: "beta".into(), created_time: 0 },
+                Tag { id: 3, name: "gamma".into(), created_time: 0 },
+            ],
+        );
+        let grouped = dao
+            .get_tags_grouped_by_paths(
+                &ctx,
+                &["a.jpg".into(), "b.jpg".into(), "c.jpg".into()],
+            )
+            .unwrap();
+        assert_eq!(grouped.get("a.jpg").map(|v| v.len()), Some(1));
+        assert_eq!(grouped.get("b.jpg").map(|v| v.len()), Some(2));
+        assert!(
+            !grouped.contains_key("c.jpg"),
+            "untagged paths must be absent so caller's missing-key=[] contract holds"
+        );
+    }
+
     #[actix_rt::test]
     async fn add_new_tag_test() {
         let tag_dao = TestTagDao::new();
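The test above pins the DAO contract; the handler's Stage 5 merge can be pictured in isolation like so (a sketch: `Tag`'s field types are assumptions and `merge_dedup` is an invented helper):

```rust
#[derive(Clone, Debug, PartialEq)]
struct Tag {
    id: i32,
    name: String,
    created_time: i64,
}

// Merge sibling tags into a query path's bucket, de-duplicated by tag
// id; the same membership check the handler performs per entry.
fn merge_dedup(bucket: &mut Vec<Tag>, incoming: &[Tag]) {
    for t in incoming {
        if !bucket.iter().any(|e| e.id == t.id) {
            bucket.push(t.clone());
        }
    }
}

fn main() {
    let mut bucket = vec![Tag { id: 1, name: "alpha".into(), created_time: 0 }];
    // The same tag arriving again via a content-hash sibling is not duplicated.
    merge_dedup(&mut bucket, &[
        Tag { id: 1, name: "alpha".into(), created_time: 0 },
        Tag { id: 2, name: "beta".into(), created_time: 0 },
    ]);
    assert_eq!(bucket.len(), 2);
}
```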