//! Duplicate detection surface — exact (blake3) and perceptual
//! (pHash + Hamming) groups, plus the soft-mark resolve flow that
//! Apollo's DUPLICATES modal drives.
//!
//! All routes require auth (Claims). Endpoints:
//!
//! - `GET /duplicates/exact?library=&include_resolved=` — byte-identical groups with more than one member.
//! - `GET /duplicates/perceptual?library=&threshold=&include_resolved=` — Hamming-clustered groups.
//! - `GET /duplicates/folder-pairs?library=&include_resolved=&min_coverage=&min_shared=` — folder-level overlap edges.
//! - `POST /duplicates/resolve` — soft-mark demoted siblings.
//! - `POST /duplicates/unresolve` — clear a prior soft-mark.
//!
//! Perceptual clustering caches the BK-tree result for 5 minutes
//! (folder-pair bucketing gets the same treatment) so repeated opens
//! of the modal don't re-cluster the whole library. Cache invalidation
//! is best-effort: resolve/unresolve clear the caches, but new files
//! arriving via the watcher don't (the next 5-minute window picks them
//! up). For a single-user personal tool that's the right trade-off.

use std::collections::HashMap;
use std::sync::Mutex;
use std::time::{Duration, Instant};

use actix_web::{App, HttpRequest, HttpResponse, Responder, dev::ServiceFactory, web};
use bk_tree::{BKTree, Metric};
use lazy_static::lazy_static;
use opentelemetry::trace::{TraceContextExt, Tracer};
use serde::{Deserialize, Serialize};

use crate::data::Claims;
use crate::database::{DuplicateRow, ExifDao};
use crate::libraries;
use crate::otel::{extract_context_from_request, global_tracer};
use crate::state::AppState;

// ── Cache ────────────────────────────────────────────────────────────────

const PERCEPTUAL_CACHE_TTL: Duration = Duration::from_secs(300);
const FOLDER_PAIR_CACHE_TTL: Duration = Duration::from_secs(300);

#[derive(Clone)]
struct PerceptualCacheEntry {
    /// Cache key: (library_id, threshold, include_resolved). `library_id`
    /// is `None` for "all libraries". Cluster output is the same shape we
    /// return on the wire so we can serve cached requests with zero work.
    library_id: Option<i32>,
    threshold: u32,
    include_resolved: bool,
    computed_at: Instant,
    groups: Vec<DuplicateGroup>,
}

#[derive(Clone)]
struct FolderPairCacheEntry {
    library_id: Option<i32>,
    include_resolved: bool,
    computed_at: Instant,
    /// Pre-filter cache: every detected folder pair, regardless of
    /// min_coverage / min_shared. The handler applies the user-supplied
    /// thresholds on top so a slider drag is a memcpy + a filter, not a
    /// re-bucket. Bucketing dominates the cost (O(N²) over members of
    /// big exact-dup groups), so this is the right cache boundary.
    pairs: Vec<FolderPair>,
}

lazy_static! {
    static ref PERCEPTUAL_CACHE: Mutex<Option<PerceptualCacheEntry>> = Mutex::new(None);
    static ref FOLDER_PAIR_CACHE: Mutex<Option<FolderPairCacheEntry>> = Mutex::new(None);
}

/// Drop the perceptual-cluster cache. Called from `resolve`/`unresolve`
/// so the next modal open reflects the soft-mark change immediately.
fn invalidate_perceptual_cache() {
    if let Ok(mut guard) = PERCEPTUAL_CACHE.lock() {
        *guard = None;
    }
}

/// Drop the folder-pair cache. Same rationale as
/// `invalidate_perceptual_cache`: a resolve mutates which rows
/// participate, so the next folder-pair fetch must re-bucket.
fn invalidate_folder_pair_cache() {
    if let Ok(mut guard) = FOLDER_PAIR_CACHE.lock() {
        *guard = None;
    }
}
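// A minimal inline sanity check for the two invalidation helpers — a
// sketch, not exercised by any handler. The entry fields below are
// dummies, not values a real request would ever cache.
#[cfg(test)]
mod cache_invalidation_tests {
    use super::*;

    #[test]
    fn invalidate_helpers_clear_their_caches() {
        if let Ok(mut guard) = PERCEPTUAL_CACHE.lock() {
            *guard = Some(PerceptualCacheEntry {
                library_id: None,
                threshold: 8,
                include_resolved: false,
                computed_at: Instant::now(),
                groups: Vec::new(),
            });
        }
        invalidate_perceptual_cache();
        assert!(PERCEPTUAL_CACHE.lock().unwrap().is_none());

        if let Ok(mut guard) = FOLDER_PAIR_CACHE.lock() {
            *guard = Some(FolderPairCacheEntry {
                library_id: None,
                include_resolved: false,
                computed_at: Instant::now(),
                pairs: Vec::new(),
            });
        }
        invalidate_folder_pair_cache();
        assert!(FOLDER_PAIR_CACHE.lock().unwrap().is_none());
    }
}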
// ── Wire shapes ──────────────────────────────────────────────────────────

#[derive(Serialize, Debug, Clone)]
pub struct DuplicateMember {
    pub library_id: i32,
    pub rel_path: String,
    pub content_hash: String,
    pub size_bytes: Option<i64>,
    pub date_taken: Option<i64>,
    pub width: Option<i32>,
    pub height: Option<i32>,
    pub duplicate_of_hash: Option<String>,
    pub duplicate_decided_at: Option<i64>,
}

impl From<DuplicateRow> for DuplicateMember {
    fn from(r: DuplicateRow) -> Self {
        Self {
            library_id: r.library_id,
            rel_path: r.rel_path,
            content_hash: r.content_hash,
            size_bytes: r.size_bytes,
            date_taken: r.date_taken,
            width: r.width,
            height: r.height,
            duplicate_of_hash: r.duplicate_of_hash,
            duplicate_decided_at: r.duplicate_decided_at,
        }
    }
}

#[derive(Serialize, Debug, Clone)]
#[serde(rename_all = "lowercase")]
pub enum DuplicateKind {
    Exact,
    Perceptual,
}

#[derive(Serialize, Debug, Clone)]
pub struct DuplicateGroup {
    pub kind: DuplicateKind,
    /// Representative content_hash. For exact groups, the shared hash
    /// (every member has the same one). For perceptual groups, an
    /// arbitrary cluster member's hash, used only as a stable id for
    /// the UI to key off.
    pub representative_hash: String,
    pub members: Vec<DuplicateMember>,
}

#[derive(Deserialize, Debug)]
pub struct ListDuplicatesQuery {
    pub library: Option<String>,
    #[serde(default)]
    pub include_resolved: Option<bool>,
    /// Perceptual only — Hamming-distance threshold. Ignored on the
    /// exact endpoint. Defaults to 8 (~12% similarity tolerance, the
    /// sweet spot for resized/recompressed copies).
    #[serde(default)]
    pub threshold: Option<u32>,
}

#[derive(Deserialize, Debug)]
pub struct DuplicateMemberRef {
    pub library_id: i32,
    pub rel_path: String,
}

#[derive(Deserialize, Debug)]
pub struct ResolveDuplicatesReq {
    pub survivor: DuplicateMemberRef,
    pub demoted: Vec<DuplicateMemberRef>,
}

#[derive(Serialize, Debug)]
pub struct ResolveResponse {
    pub resolved_count: usize,
}

#[derive(Deserialize, Debug)]
pub struct UnresolveDuplicateReq {
    pub library_id: i32,
    pub rel_path: String,
}

#[derive(Deserialize, Debug)]
pub struct ListFolderPairsQuery {
    pub library: Option<String>,
    #[serde(default)]
    pub include_resolved: Option<bool>,
    /// Coverage floor: `shared / min(side_a.total, side_b.total)`.
    /// Default 0.5 — surfaces "at least half of the smaller folder is
    /// duplicated in the other folder", which is the threshold above
    /// which "demote one whole side" feels safe.
    #[serde(default)]
    pub min_coverage: Option<f32>,
    /// Absolute floor on shared-file count. Default 3 — anything less
    /// is incidental noise (e.g. two folders that happen to share a
    /// stock background image).
    #[serde(default)]
    pub min_shared: Option<u32>,
}

#[derive(Serialize, Debug, Clone)]
pub struct FolderEndpoint {
    pub library_id: i32,
    /// Folder path relative to library root, e.g. `Cars/BMW`. Empty
    /// string for files at the library root (no leading slash).
    pub folder: String,
    /// Total count of `image_exif` rows in this folder, applied with
    /// the same `include_resolved` filter as the dup query so the
    /// numerator and denominator come from the same population.
    pub total_files: i64,
}

#[derive(Serialize, Debug, Clone)]
pub struct FolderPairFile {
    pub content_hash: String,
    pub a_rel_path: String,
    pub b_rel_path: String,
    pub size_bytes: Option<i64>,
    pub date_taken: Option<i64>,
    pub width: Option<i32>,
    pub height: Option<i32>,
}

#[derive(Serialize, Debug, Clone)]
pub struct FolderPair {
    pub side_a: FolderEndpoint,
    pub side_b: FolderEndpoint,
    pub shared_count: i64,
    /// `shared_count / min(side_a.total_files, side_b.total_files)`,
    /// in `[0.0, 1.0]`. A coverage of 1.0 means the smaller folder is
    /// fully contained in the other.
    pub coverage: f32,
    pub shared_files: Vec<FolderPairFile>,
}

// ── Handlers ─────────────────────────────────────────────────────────────

async fn list_exact_handler(
    _: Claims,
    request: HttpRequest,
    app_state: web::Data<AppState>,
    query: web::Query<ListDuplicatesQuery>,
    exif_dao: web::Data<Mutex<ExifDao>>,
) -> impl Responder {
    let context = extract_context_from_request(&request);
    let span = global_tracer().start_with_context("duplicates.list_exact", &context);
    let span_context = opentelemetry::Context::current_with_span(span);

    let library_id = libraries::resolve_library_param(&app_state, query.library.as_deref())
        .ok()
        .flatten()
        .map(|l| l.id);
    let include_resolved = query.include_resolved.unwrap_or(false);

    let rows = {
        let mut dao = exif_dao.lock().expect("exif dao lock");
        match dao.list_duplicates_exact(&span_context, library_id, include_resolved) {
            Ok(rows) => rows,
            Err(e) => {
                return HttpResponse::InternalServerError().body(format!("{:?}", e));
            }
        }
    };

    let groups = group_exact(rows);
    HttpResponse::Ok().json(GroupsResponse { groups })
}

async fn list_perceptual_handler(
    _: Claims,
    request: HttpRequest,
    app_state: web::Data<AppState>,
    query: web::Query<ListDuplicatesQuery>,
    exif_dao: web::Data<Mutex<ExifDao>>,
) -> impl Responder {
    let context = extract_context_from_request(&request);
    let span = global_tracer().start_with_context("duplicates.list_perceptual", &context);
    let span_context = opentelemetry::Context::current_with_span(span);

    let library_id = libraries::resolve_library_param(&app_state, query.library.as_deref())
        .ok()
        .flatten()
        .map(|l| l.id);
    let threshold = query.threshold.unwrap_or(8).clamp(0, 32);
    let include_resolved = query.include_resolved.unwrap_or(false);

    // Cache hit?
    if let Ok(guard) = PERCEPTUAL_CACHE.lock()
        && let Some(entry) = guard.as_ref()
        && entry.library_id == library_id
        && entry.threshold == threshold
        && entry.include_resolved == include_resolved
        && entry.computed_at.elapsed() < PERCEPTUAL_CACHE_TTL
    {
        return HttpResponse::Ok().json(GroupsResponse {
            groups: entry.groups.clone(),
        });
    }

    let rows = {
        let mut dao = exif_dao.lock().expect("exif dao lock");
        match dao.list_perceptual_candidates(&span_context, library_id, include_resolved) {
            Ok(rows) => rows,
            Err(e) => {
                return HttpResponse::InternalServerError().body(format!("{:?}", e));
            }
        }
    };

    let groups = cluster_perceptual(rows, threshold);

    if let Ok(mut guard) = PERCEPTUAL_CACHE.lock() {
        *guard = Some(PerceptualCacheEntry {
            library_id,
            threshold,
            include_resolved,
            computed_at: Instant::now(),
            groups: groups.clone(),
        });
    }

    HttpResponse::Ok().json(GroupsResponse { groups })
}
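// A minimal sketch of the JSON shape the modal POSTs to
// `/duplicates/resolve`, pinned as a deserialization test. The paths are
// made up, and this assumes `serde_json` is available as a
// (dev-)dependency alongside serde.
#[cfg(test)]
mod resolve_wire_shape_tests {
    use super::*;

    #[test]
    fn resolve_request_deserializes() {
        let body = r#"{
            "survivor": { "library_id": 1, "rel_path": "Cars/BMW/a.NEF" },
            "demoted": [{ "library_id": 1, "rel_path": "Cars/BMW/a_copy.NEF" }]
        }"#;
        let req: ResolveDuplicatesReq =
            serde_json::from_str(body).expect("resolve request shape");
        assert_eq!(req.survivor.rel_path, "Cars/BMW/a.NEF");
        assert_eq!(req.demoted.len(), 1);
    }
}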
async fn resolve_handler(
    _: Claims,
    request: HttpRequest,
    body: web::Json<ResolveDuplicatesReq>,
    exif_dao: web::Data<Mutex<ExifDao>>,
) -> impl Responder {
    let context = extract_context_from_request(&request);
    let span = global_tracer().start_with_context("duplicates.resolve", &context);
    let span_context = opentelemetry::Context::current_with_span(span);

    if body.demoted.is_empty() {
        return HttpResponse::BadRequest().body("demoted list is empty");
    }

    let mut dao = exif_dao.lock().expect("exif dao lock");

    // Resolve survivor → its content_hash, plus the canonical rel_path
    // we'll use as the destination for any tag-union INSERTs.
    let survivor = match dao.lookup_duplicate_row(
        &span_context,
        body.survivor.library_id,
        &body.survivor.rel_path,
    ) {
        Ok(Some(row)) => row,
        Ok(None) => return HttpResponse::NotFound().body("survivor not found"),
        Err(e) => return HttpResponse::InternalServerError().body(format!("{:?}", e)),
    };

    // Survivor must not itself be soft-marked — otherwise the modal is
    // pointing at a row we've already demoted, which would create a chain.
    if survivor.duplicate_of_hash.is_some() {
        return HttpResponse::Conflict().body("survivor is itself soft-marked as a duplicate");
    }

    let now = chrono::Utc::now().timestamp();
    let mut resolved_count = 0usize;

    for member_ref in &body.demoted {
        let demoted = match dao.lookup_duplicate_row(
            &span_context,
            member_ref.library_id,
            &member_ref.rel_path,
        ) {
            Ok(Some(row)) => row,
            Ok(None) => {
                log::warn!(
                    "duplicates.resolve: skipping unknown demoted ({}, {})",
                    member_ref.library_id,
                    member_ref.rel_path
                );
                continue;
            }
            Err(e) => {
                return HttpResponse::InternalServerError().body(format!("{:?}", e));
            }
        };

        // Survivor and demoted must not be the same row (would set
        // duplicate_of_hash to its own hash — recursive nonsense).
        if demoted.library_id == survivor.library_id && demoted.rel_path == survivor.rel_path {
            continue;
        }

        // For perceptual dups (different content_hash), union the
        // demoted's tag set onto the survivor before flipping the
        // soft-mark. For exact dups (same content_hash), tags are
        // already shared at the bytes layer — the union is a no-op.
        if demoted.content_hash != survivor.content_hash
            && let Err(e) = dao.union_perceptual_tags(
                &span_context,
                &survivor.content_hash,
                &demoted.content_hash,
                &survivor.rel_path,
            )
        {
            log::warn!(
                "duplicates.resolve: tag union failed for {}: {:?}",
                demoted.rel_path,
                e
            );
            // Continue with the soft-mark anyway — losing tag
            // continuity is recoverable (unresolve restores the
            // demoted row's grid presence, and the original tags
            // never moved off the demoted hash).
        }

        if let Err(e) = dao.set_duplicate_of(
            &span_context,
            demoted.library_id,
            &demoted.rel_path,
            &survivor.content_hash,
            now,
        ) {
            return HttpResponse::InternalServerError().body(format!("{:?}", e));
        }
        resolved_count += 1;
    }

    drop(dao);
    invalidate_perceptual_cache();
    invalidate_folder_pair_cache();

    HttpResponse::Ok().json(ResolveResponse { resolved_count })
}

async fn unresolve_handler(
    _: Claims,
    request: HttpRequest,
    body: web::Json<UnresolveDuplicateReq>,
    exif_dao: web::Data<Mutex<ExifDao>>,
) -> impl Responder {
    let context = extract_context_from_request(&request);
    let span = global_tracer().start_with_context("duplicates.unresolve", &context);
    let span_context = opentelemetry::Context::current_with_span(span);

    let mut dao = exif_dao.lock().expect("exif dao lock");
    if let Err(e) = dao.clear_duplicate_of(&span_context, body.library_id, &body.rel_path) {
        return HttpResponse::InternalServerError().body(format!("{:?}", e));
    }
    drop(dao);

    invalidate_perceptual_cache();
    invalidate_folder_pair_cache();
    HttpResponse::Ok().finish()
}

async fn list_folder_pairs_handler(
    _: Claims,
    request: HttpRequest,
    app_state: web::Data<AppState>,
    query: web::Query<ListFolderPairsQuery>,
    exif_dao: web::Data<Mutex<ExifDao>>,
) -> impl Responder {
    let context = extract_context_from_request(&request);
    let span = global_tracer().start_with_context("duplicates.list_folder_pairs", &context);
    let span_context = opentelemetry::Context::current_with_span(span);

    let library_id = libraries::resolve_library_param(&app_state, query.library.as_deref())
        .ok()
        .flatten()
        .map(|l| l.id);
    let include_resolved = query.include_resolved.unwrap_or(false);
    let min_coverage = query.min_coverage.unwrap_or(0.5).clamp(0.0, 1.0);
    let min_shared = query.min_shared.unwrap_or(3).max(1);

    // Cache hit on the (library, include_resolved) tuple — coverage /
    // min_shared are user-tunable filters applied AFTER bucketing, so
    // the cache stores the unfiltered pair list.
    if let Ok(guard) = FOLDER_PAIR_CACHE.lock()
        && let Some(entry) = guard.as_ref()
        && entry.library_id == library_id
        && entry.include_resolved == include_resolved
        && entry.computed_at.elapsed() < FOLDER_PAIR_CACHE_TTL
    {
        let filtered = filter_folder_pairs(entry.pairs.clone(), min_coverage, min_shared);
        return HttpResponse::Ok().json(FolderPairsResponse { pairs: filtered });
    }

    let (dup_rows, all_paths) = {
        let mut dao = exif_dao.lock().expect("exif dao lock");
        let dup_rows =
            match dao.list_duplicates_exact(&span_context, library_id, include_resolved) {
                Ok(rows) => rows,
                Err(e) => return HttpResponse::InternalServerError().body(format!("{:?}", e)),
            };
        let all_paths = match dao.list_image_paths(&span_context, library_id, include_resolved) {
            Ok(rows) => rows,
            Err(e) => return HttpResponse::InternalServerError().body(format!("{:?}", e)),
        };
        (dup_rows, all_paths)
    };

    let totals = folder_totals(&all_paths);
    let pairs = bucket_folder_pairs(dup_rows, &totals);

    if let Ok(mut guard) = FOLDER_PAIR_CACHE.lock() {
        *guard = Some(FolderPairCacheEntry {
            library_id,
            include_resolved,
            computed_at: Instant::now(),
            pairs: pairs.clone(),
        });
    }

    let filtered = filter_folder_pairs(pairs, min_coverage, min_shared);
    HttpResponse::Ok().json(FolderPairsResponse { pairs: filtered })
}

// ── Grouping / clustering ────────────────────────────────────────────────

#[derive(Serialize, Debug)]
struct GroupsResponse {
    groups: Vec<DuplicateGroup>,
}

#[derive(Serialize, Debug)]
struct FolderPairsResponse {
    pairs: Vec<FolderPair>,
}
/// Folder portion of `rel_path`: everything up to (and excluding) the
/// last `/`. Returns an empty string for top-level files. Library-root
/// agnostic — the rel_path is already relative to the library root.
fn folder_dir(rel_path: &str) -> &str {
    match rel_path.rfind('/') {
        Some(i) => &rel_path[..i],
        None => "",
    }
}

/// Per-folder file totals from a flat `(library_id, rel_path)` listing.
/// Used as the denominator for the coverage metric. We count every
/// hashed image_exif row that matches the dup query's filter, so the
/// numerator (shared dups) and denominator (folder population) come
/// from the same population.
fn folder_totals(rows: &[(i32, String)]) -> HashMap<(i32, String), i64> {
    let mut out: HashMap<(i32, String), i64> = HashMap::new();
    for (lib, rel_path) in rows {
        let dir = folder_dir(rel_path).to_string();
        *out.entry((*lib, dir)).or_insert(0) += 1;
    }
    out
}

/// Canonical ordering for a folder pair: lexicographic on
/// `(library_id, folder)`. Ensures we bucket `(F1, F2)` and `(F2, F1)`
/// onto the same key regardless of which member of an exact-dup group
/// we encounter first.
fn canonical_pair<'a>(
    a: &'a (i32, String),
    b: &'a (i32, String),
) -> (&'a (i32, String), &'a (i32, String)) {
    if a <= b { (a, b) } else { (b, a) }
}

/// Bucket exact-dup rows into folder-pair edges. For each exact-dup
/// group, pick one representative member per (library, folder) tuple
/// (lex-smallest rel_path) so within-folder duplicates collapse to one
/// edge endpoint each — those are an EXACT-tab concern, not a
/// folder-pair one. Then for every distinct unordered pair of folders
/// the hash touches, record one shared-file entry.
///
/// Output is unfiltered; the caller applies `min_coverage` /
/// `min_shared` thresholds on top so the slider UX doesn't have to
/// re-bucket on every drag.
fn bucket_folder_pairs(
    rows: Vec<DuplicateRow>,
    totals: &HashMap<(i32, String), i64>,
) -> Vec<FolderPair> {
    // Group dup rows by content_hash (the rows are already ordered by
    // hash in the SQL query, but don't rely on that for correctness).
    let mut by_hash: HashMap<String, Vec<DuplicateRow>> = HashMap::new();
    for row in rows {
        by_hash
            .entry(row.content_hash.clone())
            .or_default()
            .push(row);
    }

    // Edge accumulator: pair-key → list of shared files. The pair key
    // owns its strings so we can serialize folder names back into the
    // wire shape without juggling lifetimes through the response.
    type PairKey = ((i32, String), (i32, String));
    let mut edges: HashMap<PairKey, Vec<FolderPairFile>> = HashMap::new();

    for (_, members) in by_hash {
        if members.len() < 2 {
            continue;
        }

        // One representative per (library_id, folder). Lex-smallest
        // rel_path wins — deterministic, and matches the SQL ORDER BY
        // so the visible thumbnail is stable across requests.
        let mut by_folder: HashMap<(i32, String), &DuplicateRow> = HashMap::new();
        for m in &members {
            let key = (m.library_id, folder_dir(&m.rel_path).to_string());
            by_folder
                .entry(key)
                .and_modify(|existing| {
                    if m.rel_path < existing.rel_path {
                        *existing = m;
                    }
                })
                .or_insert(m);
        }

        // Skip degenerate hashes that all live in one folder — those
        // are within-folder dups, surfaced by the EXACT tab.
        if by_folder.len() < 2 {
            continue;
        }

        // For every unordered pair of folders this hash touches, record
        // one shared-file entry (one rep per side).
        let folder_keys: Vec<&(i32, String)> = by_folder.keys().collect();
        for i in 0..folder_keys.len() {
            for j in (i + 1)..folder_keys.len() {
                let (a_key, b_key) = canonical_pair(folder_keys[i], folder_keys[j]);
                let a_row = by_folder[a_key];
                let b_row = by_folder[b_key];
                edges
                    .entry((a_key.clone(), b_key.clone()))
                    .or_default()
                    .push(FolderPairFile {
                        content_hash: a_row.content_hash.clone(),
                        a_rel_path: a_row.rel_path.clone(),
                        b_rel_path: b_row.rel_path.clone(),
                        // Size / dims should agree across exact dups,
                        // but if a NULL slipped in we just take side A's.
                        size_bytes: a_row.size_bytes.or(b_row.size_bytes),
                        date_taken: a_row.date_taken.or(b_row.date_taken),
                        width: a_row.width.or(b_row.width),
                        height: a_row.height.or(b_row.height),
                    });
            }
        }
    }

    let mut pairs: Vec<FolderPair> = edges
        .into_iter()
        .map(|((a, b), mut shared_files)| {
            // Stable rel_path order inside the response so the
            // frontend's thumbnail strip doesn't reshuffle on refetch.
            shared_files.sort_by(|x, y| x.a_rel_path.cmp(&y.a_rel_path));
            let total_a = totals.get(&a).copied().unwrap_or(0);
            let total_b = totals.get(&b).copied().unwrap_or(0);
            let shared_count = shared_files.len() as i64;
            let denom = total_a.min(total_b).max(1) as f32;
            let coverage = (shared_count as f32 / denom).clamp(0.0, 1.0);
            FolderPair {
                side_a: FolderEndpoint {
                    library_id: a.0,
                    folder: a.1,
                    total_files: total_a,
                },
                side_b: FolderEndpoint {
                    library_id: b.0,
                    folder: b.1,
                    total_files: total_b,
                },
                shared_count,
                coverage,
                shared_files,
            }
        })
        .collect();

    // Largest shared_count first (most reward per click), tie-break on
    // higher coverage (subset-into-superset matches rank above scattered
    // fragments of equal size), then deterministic by folder names.
    pairs.sort_by(|a, b| {
        b.shared_count
            .cmp(&a.shared_count)
            .then_with(|| {
                b.coverage
                    .partial_cmp(&a.coverage)
                    .unwrap_or(std::cmp::Ordering::Equal)
            })
            .then_with(|| a.side_a.library_id.cmp(&b.side_a.library_id))
            .then_with(|| a.side_a.folder.cmp(&b.side_a.folder))
            .then_with(|| a.side_b.library_id.cmp(&b.side_b.library_id))
            .then_with(|| a.side_b.folder.cmp(&b.side_b.folder))
    });

    pairs
}

fn filter_folder_pairs(
    pairs: Vec<FolderPair>,
    min_coverage: f32,
    min_shared: u32,
) -> Vec<FolderPair> {
    pairs
        .into_iter()
        .filter(|p| p.coverage >= min_coverage && p.shared_count >= min_shared as i64)
        .collect()
}

fn group_exact(rows: Vec<DuplicateRow>) -> Vec<DuplicateGroup> {
    let mut by_hash: HashMap<String, Vec<DuplicateRow>> = HashMap::new();
    for row in rows {
        by_hash
            .entry(row.content_hash.clone())
            .or_default()
            .push(row);
    }

    let mut groups: Vec<DuplicateGroup> = by_hash
        .into_iter()
        .filter(|(_, members)| members.len() > 1)
        .map(|(hash, members)| DuplicateGroup {
            kind: DuplicateKind::Exact,
            representative_hash: hash,
            members: members.into_iter().map(DuplicateMember::from).collect(),
        })
        .collect();

    // Largest groups first (most reward per click), then deterministic.
    groups.sort_by(|a, b| {
        b.members
            .len()
            .cmp(&a.members.len())
            .then_with(|| a.representative_hash.cmp(&b.representative_hash))
    });

    groups
}

/// Bits set in a "useful" perceptual hash. Real photographic content
/// produces ~50/50 bit distributions; anything outside the [16, 48]
/// band is low-entropy structure (uniform skies, black frames,
/// monochrome scans, faded film) where pHash collapses to near-uniform
/// values that land trivially close in Hamming distance across hundreds
/// of unrelated images. The 8/56 band that shipped first was too
/// permissive — even at threshold=4 the false-positive cluster
/// persisted.
const MIN_INFORMATIVE_POPCOUNT: u32 = 16;
const MAX_INFORMATIVE_POPCOUNT: u32 = 64 - MIN_INFORMATIVE_POPCOUNT;

#[inline]
fn is_informative_hash(h: i64) -> bool {
    let pop = (h as u64).count_ones();
    (MIN_INFORMATIVE_POPCOUNT..=MAX_INFORMATIVE_POPCOUNT).contains(&pop)
}

/// dHash gets a stricter threshold than pHash. pHash is the
/// candidate-discovery signal (BK-tree neighbourhood lookup); dHash
/// is the validation signal that has to actively agree before we
/// union. Splitting the budget asymmetrically means a real near-dup
/// (which scores well on both) survives while an incidental pHash
/// collision (uniform-content false positive) gets vetoed.
///
/// Floor of 2 so threshold=4 still allows a 1-bit jitter in dHash —
/// genuine resampling can flip a low-frequency gradient bit even
/// when the visual content is identical.
#[inline]
fn dhash_threshold(phash_threshold: u32) -> u32 {
    (phash_threshold / 2).max(2)
}

/// Single-link cluster the input rows by Hamming distance over their
/// pHash, with `threshold` as the maximum distance for an edge. Rows
/// without a pHash, or with a degenerate (low-entropy) pHash, are
/// excluded — they'd chain together unrelated images.
///
/// Two-signal validation: the BK-tree gives candidate pairs cheaply,
/// then we additionally require dHash agreement before unioning. pHash
/// alone is too permissive; pairing it with dHash collapses the false-
/// positive cluster significantly (different DCT vs gradient
/// signatures on real near-dups still both stay close, but spurious
/// pHash collisions on uniform images don't survive the dHash check).
///
/// Implementation: BK-tree neighbourhood lookup per row, union-find
/// over the validated edges. O(N log N) instead of the O(N²) naive
/// pairwise scan; on a 1.26M-row library that's the difference between
/// "responds in 1.5 s" and "responds in 25 minutes".
fn cluster_perceptual(rows: Vec<DuplicateRow>, threshold: u32) -> Vec<DuplicateGroup> {
    let candidates: Vec<DuplicateRow> = rows
        .into_iter()
        .filter(|r| r.phash_64.is_some_and(is_informative_hash))
        .collect();
    if candidates.len() < 2 {
        return Vec::new();
    }

    // Build BK-tree keyed on (phash_u64, index-in-candidates).
    let mut tree: BKTree<HashKey, HammingMetric> = BKTree::new(HammingMetric);
    for (idx, row) in candidates.iter().enumerate() {
        if let Some(p) = row.phash_64 {
            tree.add(HashKey {
                phash: p as u64,
                idx,
            });
        }
    }

    // Union-find over edges within `threshold`. For a candidate pair
    // surfaced by the pHash BK-tree, require dHash within a *stricter*
    // threshold (`dhash_threshold(threshold)`) before unioning. pHash
    // agreement on low-entropy structure can be incidental; pHash
    // agreement AND dHash within roughly half that distance is a
    // strong near-dup signal. dHash on either side missing → reject
    // (was: trust pHash alone). Missing dHash means we can't validate
    // the candidate, and the false-positive cost outweighs the rare
    // case of a partial backfill.
    let dhash_max = dhash_threshold(threshold);
    let mut uf = UnionFind::new(candidates.len());
    for (idx, row) in candidates.iter().enumerate() {
        let Some(p) = row.phash_64 else { continue };
        let key = HashKey {
            phash: p as u64,
            idx,
        };
        for (_, neighbour) in tree.find(&key, threshold) {
            if neighbour.idx == idx {
                continue;
            }
            let other = &candidates[neighbour.idx];
            let dhash_ok = match (row.dhash_64, other.dhash_64) {
                (Some(a), Some(b)) => {
                    (a as u64 ^ b as u64).count_ones() <= dhash_max
                        && is_informative_hash(a)
                        && is_informative_hash(b)
                }
                _ => false,
            };
            if dhash_ok {
                uf.union(idx, neighbour.idx);
            }
        }
    }

    // Bucket by root.
    let mut by_root: HashMap<usize, Vec<DuplicateRow>> = HashMap::new();
    for (idx, row) in candidates.into_iter().enumerate() {
        let root = uf.find(idx);
        by_root.entry(root).or_default().push(row);
    }

    // Medoid-validate each cluster to break single-link chains.
    // Single-link unions any pair within threshold; that means a chain
    // A↔B↔C can collapse into one cluster even when A and C aren't
    // similar. The medoid pass picks the cluster's most-central member
    // and drops any other whose distance to it exceeds threshold —
    // chains lose their tail, dense real-near-dup clusters keep all
    // members. Discard clusters that drop below 2 after refinement.
    let groups: Vec<DuplicateGroup> = by_root
        .into_values()
        .filter_map(|cluster| refine_cluster(cluster, threshold, dhash_max))
        .map(|cluster| {
            let representative_hash = cluster[0].content_hash.clone();
            DuplicateGroup {
                kind: DuplicateKind::Perceptual,
                representative_hash,
                members: cluster.into_iter().map(DuplicateMember::from).collect(),
            }
        })
        .collect();

    let mut groups = groups;
    groups.sort_by(|a, b| {
        b.members
            .len()
            .cmp(&a.members.len())
            .then_with(|| a.representative_hash.cmp(&b.representative_hash))
    });
    groups
}

/// Tighten a single-link cluster to its medoid neighbourhood. Returns
/// `None` when fewer than 2 members survive — caller drops the cluster.
fn refine_cluster(
    cluster: Vec<DuplicateRow>,
    phash_max: u32,
    dhash_max: u32,
) -> Option<Vec<DuplicateRow>> {
    if cluster.len() < 2 {
        return None;
    }
    if cluster.len() == 2 {
        // No chain can exist with only two members; the union-find
        // already guaranteed both signals validated when joining.
        return Some(cluster);
    }

    // Pick the medoid: the member whose summed pHash+dHash distance to
    // the rest of the cluster is smallest. Deterministic via the
    // first-best-wins tie break — on equal scores the earliest member
    // in input order keeps the medoid slot.
    let phashes: Vec<u64> = cluster
        .iter()
        .map(|r| r.phash_64.unwrap_or(0) as u64)
        .collect();
    let dhashes: Vec<u64> = cluster
        .iter()
        .map(|r| r.dhash_64.unwrap_or(0) as u64)
        .collect();

    let mut best_idx = 0usize;
    let mut best_score = u32::MAX;
    for i in 0..cluster.len() {
        let mut score: u32 = 0;
        for j in 0..cluster.len() {
            if i == j {
                continue;
            }
            score = score.saturating_add((phashes[i] ^ phashes[j]).count_ones());
            score = score.saturating_add((dhashes[i] ^ dhashes[j]).count_ones());
        }
        if score < best_score {
            best_score = score;
            best_idx = i;
        }
    }

    let medoid_phash = phashes[best_idx];
    let medoid_dhash = dhashes[best_idx];
    let kept: Vec<DuplicateRow> = cluster
        .into_iter()
        .enumerate()
        .filter(|(i, _)| {
            *i == best_idx
                || ((phashes[*i] ^ medoid_phash).count_ones() <= phash_max
                    && (dhashes[*i] ^ medoid_dhash).count_ones() <= dhash_max)
        })
        .map(|(_, r)| r)
        .collect();

    if kept.len() < 2 { None } else { Some(kept) }
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct HashKey {
    phash: u64,
    idx: usize,
}

struct HammingMetric;

impl Metric<HashKey> for HammingMetric {
    fn distance(&self, a: &HashKey, b: &HashKey) -> u32 {
        (a.phash ^ b.phash).count_ones()
    }

    fn threshold_distance(&self, a: &HashKey, b: &HashKey, _: u32) -> Option<u32> {
        Some(self.distance(a, b))
    }
}

struct UnionFind {
    parent: Vec<usize>,
    rank: Vec<u32>,
}

impl UnionFind {
    fn new(n: usize) -> Self {
        Self {
            parent: (0..n).collect(),
            rank: vec![0; n],
        }
    }

    fn find(&mut self, x: usize) -> usize {
        if self.parent[x] != x {
            let root = self.find(self.parent[x]);
            self.parent[x] = root;
        }
        self.parent[x]
    }

    fn union(&mut self, a: usize, b: usize) {
        let ra = self.find(a);
        let rb = self.find(b);
        if ra == rb {
            return;
        }
        if self.rank[ra] < self.rank[rb] {
            self.parent[ra] = rb;
        } else if self.rank[ra] > self.rank[rb] {
            self.parent[rb] = ra;
        } else {
            self.parent[rb] = ra;
            self.rank[ra] += 1;
        }
    }
}

// ── Routing ──────────────────────────────────────────────────────────────

pub fn add_duplicate_services<T>(app: App<T>) -> App<T>
where
    T: ServiceFactory<
        actix_web::dev::ServiceRequest,
        Config = (),
        Error = actix_web::Error,
        InitError = (),
    >,
{
    app.service(web::resource("/duplicates/exact").route(web::get().to(list_exact_handler)))
        .service(
            web::resource("/duplicates/perceptual").route(web::get().to(list_perceptual_handler)),
        )
        .service(
            web::resource("/duplicates/folder-pairs")
                .route(web::get().to(list_folder_pairs_handler)),
        )
        .service(web::resource("/duplicates/resolve").route(web::post().to(resolve_handler)))
        .service(web::resource("/duplicates/unresolve").route(web::post().to(unresolve_handler)))
}

// ── Tests ────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;

    fn row(library_id: i32, rel: &str, hash: &str, phash: Option<i64>) -> DuplicateRow {
        DuplicateRow {
            library_id,
            rel_path: rel.into(),
            content_hash: hash.into(),
            size_bytes: Some(1000),
            date_taken: None,
            width: None,
            height: None,
            phash_64: phash,
            dhash_64: None,
            duplicate_of_hash: None,
            duplicate_decided_at: None,
        }
    }

    #[test]
    fn group_exact_collapses_by_hash() {
        let rows = vec![
            row(1, "a.jpg", "h1", None),
            row(1, "b.jpg", "h1", None),
            row(2, "c.jpg", "h1", None),
            row(1, "lonely.jpg", "h2", None),
        ];
        let groups = group_exact(rows);
        assert_eq!(groups.len(), 1);
        assert_eq!(groups[0].representative_hash, "h1");
        assert_eq!(groups[0].members.len(), 3);
    }
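    // Sketch tests pinning the constants documented on the clustering
    // helpers: the [16, 48] informative popcount band and the
    // halved-with-floor dHash budget. Boundary values are synthetic,
    // not drawn from real images.
    #[test]
    fn informative_band_edges_and_dhash_budget() {
        assert!(!is_informative_hash(0)); // popcount 0
        assert!(!is_informative_hash(-1)); // popcount 64
        assert!(is_informative_hash(0xFFFF)); // popcount 16, lower edge
        assert!(is_informative_hash(0x0000_FFFF_FFFF_FFFF)); // popcount 48, upper edge
        assert!(!is_informative_hash(0x00FF_FFFF_FFFF_FFFF)); // popcount 56

        assert_eq!(dhash_threshold(8), 4);
        assert_eq!(dhash_threshold(4), 2);
        assert_eq!(dhash_threshold(0), 2); // floor
    }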
    /// All hashes used below have popcount in the "informative"
    /// 16..=48 band so they survive the entropy filter that keeps
    /// solid-colour images out of the cluster graph.
    const INFORMATIVE_BASE: i64 = 0x55AA_55AA_55AA_55AA; // popcount = 32
    const INFORMATIVE_NEAR: i64 = 0x55AA_55AA_55AA_55AB; // 1 bit away from BASE
    const INFORMATIVE_FAR: i64 = 0x6996_6996_6996_6996; // 32 bits away from BASE

    fn row_with_dhash(
        library_id: i32,
        rel: &str,
        hash: &str,
        phash: Option<i64>,
        dhash: Option<i64>,
    ) -> DuplicateRow {
        DuplicateRow {
            library_id,
            rel_path: rel.into(),
            content_hash: hash.into(),
            size_bytes: Some(1000),
            date_taken: None,
            width: None,
            height: None,
            phash_64: phash,
            dhash_64: dhash,
            duplicate_of_hash: None,
            duplicate_decided_at: None,
        }
    }

    #[test]
    fn cluster_perceptual_unites_close_hashes() {
        // Two rows near each other on both pHash and dHash; one far
        // on both. Threshold 4 should merge only the close pair.
        let rows = vec![
            row_with_dhash(1, "a.jpg", "h1", Some(INFORMATIVE_BASE), Some(INFORMATIVE_BASE)),
            row_with_dhash(1, "b.jpg", "h2", Some(INFORMATIVE_NEAR), Some(INFORMATIVE_NEAR)),
            row_with_dhash(1, "c.jpg", "h3", Some(INFORMATIVE_FAR), Some(INFORMATIVE_FAR)),
        ];
        let groups = cluster_perceptual(rows, 4);
        assert_eq!(groups.len(), 1);
        assert_eq!(groups[0].members.len(), 2);
        let paths: Vec<&str> = groups[0]
            .members
            .iter()
            .map(|m| m.rel_path.as_str())
            .collect();
        assert!(paths.contains(&"a.jpg"));
        assert!(paths.contains(&"b.jpg"));
    }

    #[test]
    fn cluster_perceptual_threshold_zero_drops_distinct() {
        let rows = vec![
            row_with_dhash(1, "a.jpg", "h1", Some(INFORMATIVE_BASE), Some(INFORMATIVE_BASE)),
            row_with_dhash(1, "b.jpg", "h2", Some(INFORMATIVE_NEAR), Some(INFORMATIVE_NEAR)),
        ];
        let groups = cluster_perceptual(rows, 0);
        assert!(groups.is_empty());
    }

    #[test]
    fn cluster_perceptual_skips_singletons() {
        let rows = vec![row(1, "alone.jpg", "h1", Some(INFORMATIVE_BASE))];
        assert!(cluster_perceptual(rows, 8).is_empty());
    }

    #[test]
    fn cluster_perceptual_filters_low_entropy_hashes() {
        // Both 0 (popcount 0) and i64::MAX (popcount 63) fall outside
        // the informative band. A pair of these would trivially match
        // (Hamming distance to each other small or zero) without the
        // entropy filter — that's exactly the regression that was
        // producing a giant first cluster of solid-colour images.
        let rows = vec![
            row(1, "blank-a.jpg", "h1", Some(0)),
            row(1, "blank-b.jpg", "h2", Some(0)),
            row(1, "white-a.jpg", "h3", Some(i64::MAX)),
            row(1, "white-b.jpg", "h4", Some(i64::MAX)),
        ];
        assert!(cluster_perceptual(rows, 8).is_empty());
    }

    #[test]
    fn cluster_perceptual_requires_dhash_agreement() {
        // pHash within threshold but dHash far apart — the candidate
        // edge from the BK-tree must be rejected. Without the dHash
        // double-check this would form a 2-member cluster.
        let rows = vec![
            row_with_dhash(1, "a.jpg", "h1", Some(INFORMATIVE_BASE), Some(INFORMATIVE_BASE)),
            row_with_dhash(1, "b.jpg", "h2", Some(INFORMATIVE_NEAR), Some(INFORMATIVE_FAR)),
        ];
        assert!(cluster_perceptual(rows, 4).is_empty());
    }
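    // A sketch of the missing-dHash policy documented on
    // `cluster_perceptual`: a pHash-close candidate pair where one side
    // has no dHash cannot be validated, so it must not union.
    #[test]
    fn cluster_perceptual_rejects_missing_dhash() {
        let rows = vec![
            row_with_dhash(1, "a.jpg", "h1", Some(INFORMATIVE_BASE), Some(INFORMATIVE_BASE)),
            row_with_dhash(1, "b.jpg", "h2", Some(INFORMATIVE_NEAR), None),
        ];
        assert!(cluster_perceptual(rows, 4).is_empty());
    }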
    #[test]
    fn cluster_perceptual_breaks_long_chain_at_medoid() {
        // 4-link chain at threshold=2 with pairwise distances chosen
        // so single-link unions all four but the endpoints sit past
        // the medoid's neighbourhood. Bit positions hop by exactly 2
        // bits per step, in non-overlapping nibbles, so consecutive
        // hops compose into wider distant-pair distances:
        //   A↔B = 2, B↔C = 2, C↔D = 2,
        //   A↔C = 4, B↔D = 4, A↔D = 6.
        // Medoid (B or C) keeps Δ ≤ 2 of itself; the far endpoint
        // gets chopped, leaving exactly 3 members.
        const A: i64 = 0x55AA_55AA_55AA_55AA;
        const B: i64 = 0x55AA_55AA_55AA_55A9; // A ^ 0x03 in the last byte
        const C: i64 = 0x55AA_55AA_55AA_55A5; // B ^ 0x0C
        const D: i64 = 0x55AA_55AA_55AA_5595; // C ^ 0x30

        let rows = vec![
            row_with_dhash(1, "a.jpg", "h1", Some(A), Some(A)),
            row_with_dhash(1, "b.jpg", "h2", Some(B), Some(B)),
            row_with_dhash(1, "c.jpg", "h3", Some(C), Some(C)),
            row_with_dhash(1, "d.jpg", "h4", Some(D), Some(D)),
        ];
        let groups = cluster_perceptual(rows, 2);
        assert_eq!(groups.len(), 1);
        assert_eq!(
            groups[0].members.len(),
            3,
            "medoid pass should chop one chain endpoint past Δ=2"
        );
    }

    fn dup_row_at(library_id: i32, rel: &str, hash: &str) -> DuplicateRow {
        DuplicateRow {
            library_id,
            rel_path: rel.into(),
            content_hash: hash.into(),
            size_bytes: Some(1000),
            date_taken: None,
            width: None,
            height: None,
            phash_64: None,
            dhash_64: None,
            duplicate_of_hash: None,
            duplicate_decided_at: None,
        }
    }

    #[test]
    fn folder_dir_strips_basename() {
        assert_eq!(folder_dir("Cars/BMW/DSC_5530.NEF"), "Cars/BMW");
        assert_eq!(folder_dir("IMG_1.jpg"), "");
        assert_eq!(folder_dir("a/b/c/d.jpg"), "a/b/c");
    }

    #[test]
    fn folder_totals_groups_by_dir() {
        let rows = vec![
            (1, "a/x.jpg".to_string()),
            (1, "a/y.jpg".to_string()),
            (1, "b/z.jpg".to_string()),
            (2, "a/x.jpg".to_string()),
        ];
        let t = folder_totals(&rows);
        assert_eq!(t.get(&(1, "a".into())).copied(), Some(2));
        assert_eq!(t.get(&(1, "b".into())).copied(), Some(1));
        assert_eq!(t.get(&(2, "a".into())).copied(), Some(1));
    }

    #[test]
    fn bucket_folder_pairs_collapses_within_folder_dups() {
        // A hash that exists at TWO paths in the same folder isn't a
        // folder-pair edge — it's a within-folder dup. Should produce
        // zero pairs.
        let rows = vec![
            dup_row_at(1, "f1/a.jpg", "h1"),
            dup_row_at(1, "f1/a_copy.jpg", "h1"),
        ];
        let totals: HashMap<(i32, String), i64> =
            [((1, "f1".to_string()), 2)].into_iter().collect();
        let pairs = bucket_folder_pairs(rows, &totals);
        assert!(pairs.is_empty());
    }

    #[test]
    fn bucket_folder_pairs_canonicalizes_pair_order() {
        // Two hashes both span (lib1, "f1") and (lib2, "f2") — should
        // bucket onto the SAME pair, regardless of which side the dup
        // query encounters first.
        let rows = vec![
            dup_row_at(1, "f1/a.jpg", "h1"),
            dup_row_at(2, "f2/a.jpg", "h1"),
            dup_row_at(2, "f2/b.jpg", "h2"),
            dup_row_at(1, "f1/b.jpg", "h2"),
        ];
        let totals: HashMap<(i32, String), i64> =
            [((1, "f1".to_string()), 2), ((2, "f2".to_string()), 2)]
                .into_iter()
                .collect();
        let pairs = bucket_folder_pairs(rows, &totals);
        assert_eq!(pairs.len(), 1);
        assert_eq!(pairs[0].shared_count, 2);
        assert!((pairs[0].coverage - 1.0).abs() < 1e-6);
    }

    #[test]
    fn bucket_folder_pairs_subset_coverage() {
        // Partial overlap: 3 hashes shared between BMW (5 files) and
        // Night Photos (20 files). Coverage should be 3/5 = 0.6 — the
        // smaller side drives the metric, which is the "mostly
        // contained in the other folder" signal.
        let rows = vec![
            dup_row_at(1, "Cars/BMW/a.NEF", "h1"),
            dup_row_at(1, "Cars/Night Photos/2015/July/a.NEF", "h1"),
            dup_row_at(1, "Cars/BMW/b.NEF", "h2"),
            dup_row_at(1, "Cars/Night Photos/2015/July/b.NEF", "h2"),
            dup_row_at(1, "Cars/BMW/c.NEF", "h3"),
            dup_row_at(1, "Cars/Night Photos/2015/July/c.NEF", "h3"),
        ];
        let totals: HashMap<(i32, String), i64> = [
            ((1, "Cars/BMW".to_string()), 5),
            ((1, "Cars/Night Photos/2015/July".to_string()), 20),
        ]
        .into_iter()
        .collect();
        let pairs = bucket_folder_pairs(rows, &totals);
        assert_eq!(pairs.len(), 1);
        assert_eq!(pairs[0].shared_count, 3);
        // Smaller side (BMW, 5 files) hits coverage 3/5 = 0.6.
        assert!((pairs[0].coverage - 0.6).abs() < 1e-6);
    }
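    // Sketch of the "every unordered pair of folders the hash touches"
    // rule documented on `bucket_folder_pairs`: one hash spanning three
    // folders should yield three pair edges (folder names are arbitrary).
    #[test]
    fn bucket_folder_pairs_emits_every_unordered_pair() {
        let rows = vec![
            dup_row_at(1, "f1/a.jpg", "h1"),
            dup_row_at(1, "f2/a.jpg", "h1"),
            dup_row_at(1, "f3/a.jpg", "h1"),
        ];
        let totals: HashMap<(i32, String), i64> = [
            ((1, "f1".to_string()), 1),
            ((1, "f2".to_string()), 1),
            ((1, "f3".to_string()), 1),
        ]
        .into_iter()
        .collect();
        let pairs = bucket_folder_pairs(rows, &totals);
        assert_eq!(pairs.len(), 3); // (f1,f2), (f1,f3), (f2,f3)
        assert!(pairs.iter().all(|p| p.shared_count == 1));
    }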
    #[test]
    fn bucket_folder_pairs_picks_lex_smallest_rep() {
        // Two copies in folder A, one in folder B. The shared_files
        // entry should reference A's lex-smallest rel_path.
        let rows = vec![
            dup_row_at(1, "A/z.jpg", "h1"),
            dup_row_at(1, "A/a.jpg", "h1"),
            dup_row_at(1, "B/q.jpg", "h1"),
        ];
        let totals: HashMap<(i32, String), i64> =
            [((1, "A".to_string()), 2), ((1, "B".to_string()), 1)]
                .into_iter()
                .collect();
        let pairs = bucket_folder_pairs(rows, &totals);
        assert_eq!(pairs.len(), 1);
        let f = &pairs[0].shared_files[0];
        // Pair is canonicalized so A < B; A's rep wins lex.
        assert_eq!(f.a_rel_path, "A/a.jpg");
        assert_eq!(f.b_rel_path, "B/q.jpg");
    }

    #[test]
    fn filter_folder_pairs_applies_thresholds() {
        let totals: HashMap<(i32, String), i64> = [
            ((1, "tiny".to_string()), 1),
            ((1, "tinier".to_string()), 1),
            ((1, "big-a".to_string()), 100),
            ((1, "big-b".to_string()), 100),
        ]
        .into_iter()
        .collect();

        // tiny↔tinier: 1 shared, coverage 1.0 — should be filtered out
        // by min_shared=3 (incidental, not a folder-level signal).
        // big-a↔big-b: 50 shared — should pass.
        let mut rows: Vec<DuplicateRow> = Vec::new();
        rows.push(dup_row_at(1, "tiny/x.jpg", "ht"));
        rows.push(dup_row_at(1, "tinier/x.jpg", "ht"));
        for i in 0..50 {
            let h = format!("h{}", i);
            rows.push(dup_row_at(1, &format!("big-a/{}.jpg", i), &h));
            rows.push(dup_row_at(1, &format!("big-b/{}.jpg", i), &h));
        }

        let pairs = bucket_folder_pairs(rows, &totals);
        let kept = filter_folder_pairs(pairs, 0.5, 3);
        assert_eq!(kept.len(), 1);
        assert_eq!(kept[0].shared_count, 50);
    }

    /// Sanity-check the BK-tree's metric, which is what the duplicates
    /// path actually clusters on.
    #[test]
    fn hamming_metric_is_symmetric() {
        let m = HammingMetric;
        let a = HashKey { phash: 0b1010, idx: 0 };
        let b = HashKey { phash: 0b0101, idx: 1 };
        let d1 = m.distance(&a, &b);
        let d2 = m.distance(&b, &a);
        assert_eq!(d1, d2);
        assert_eq!(d1, 4);
    }
}
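// A minimal wiring sketch for `add_duplicate_services` — not the app's
// real startup code. It assumes `AppState` and `ExifDao` values are
// constructed elsewhere (as the real main presumably does) and only
// demonstrates how the duplicate routes compose onto an actix `App`.
#[allow(dead_code)]
fn example_wiring(app_state: web::Data<AppState>, exif_dao: web::Data<Mutex<ExifDao>>) {
    let app = App::new().app_data(app_state).app_data(exif_dao);
    // Registers /duplicates/{exact,perceptual,folder-pairs,resolve,unresolve}.
    let _app = add_duplicate_services(app);
}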