Files
ImageApi/src/duplicates.rs
Cameron Cordes 67cf0c7f73 duplicates: folder-pair view of exact dups
Bucket exact-dup rows by (library_id, dirname) pair on each side, then
filter by coverage = shared / min(folder_a_total, folder_b_total) and
an absolute floor on shared count. Surfaces "this folder is mostly
contained in that folder" matches that the per-file EXACT view buries
under one row each — e.g. an old phone-backup tree shadowing the
organized library, or a topic-grouped folder duplicating a date-grouped
one within the same library.

New endpoint: GET /duplicates/folder-pairs?library=&include_resolved=
&min_coverage=&min_shared=. Cached 5 min keyed on (library, include_resolved);
the user-tunable thresholds filter the cached unfiltered pair list so
slider drags don't re-bucket. Shares the resolve / unresolve flow with
the existing tabs — the frontend fans out N parallel /resolve calls,
one per shared content_hash.

Folder names carry no signal (BMW lives under Night Photos, not BMW_backup),
so bucketing is purely on (library_id, dirname) co-occurrence in
exact-dup groups. Within-folder dups (same hash twice in the same
folder) are skipped — those belong to the EXACT tab.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-06 12:43:29 -04:00

1369 lines
49 KiB
Rust

//! Duplicate detection surface — exact (blake3) and perceptual
//! (pHash + Hamming) groups, plus the soft-mark resolve flow that
//! Apollo's DUPLICATES modal drives.
//!
//! All routes require auth (Claims). Endpoints:
//!
//! - `GET /duplicates/exact?library=&include_resolved=` — count>1 byte-identical groups.
//! - `GET /duplicates/perceptual?library=&threshold=&include_resolved=` — Hamming-clustered groups.
//! - `GET /duplicates/folder-pairs?library=&include_resolved=&min_coverage=&min_shared=` — folder pairs that share exact duplicates.
//! - `POST /duplicates/resolve` — soft-mark demoted siblings.
//! - `POST /duplicates/unresolve` — clear a prior soft-mark.
//!
//! Perceptual clustering caches the BK-tree result for 5 minutes so
//! repeated opens of the modal don't re-cluster the whole library; the
//! folder-pair view caches its unfiltered pair list the same way.
//! Cache invalidation is best-effort: resolve/unresolve clear both
//! caches, but new files arriving via the watcher don't (the next
//! 5-minute window picks them up). For a single-user personal tool
//! that's the right trade-off.
use std::collections::HashMap;
use std::sync::Mutex;
use std::time::{Duration, Instant};
use actix_web::{App, HttpRequest, HttpResponse, Responder, dev::ServiceFactory, web};
use bk_tree::{BKTree, Metric};
use lazy_static::lazy_static;
use opentelemetry::trace::{TraceContextExt, Tracer};
use serde::{Deserialize, Serialize};
use crate::data::Claims;
use crate::database::{DuplicateRow, ExifDao};
use crate::libraries;
use crate::otel::{extract_context_from_request, global_tracer};
use crate::state::AppState;
// ── Cache ────────────────────────────────────────────────────────────────
/// Maximum age (300 s = 5 min) of a perceptual-cluster cache entry before
/// the handler recomputes it.
const PERCEPTUAL_CACHE_TTL: Duration = Duration::from_secs(300);
/// Maximum age (300 s = 5 min) of a folder-pair cache entry before the
/// handler re-buckets.
const FOLDER_PAIR_CACHE_TTL: Duration = Duration::from_secs(300);
/// Cached result of one perceptual-duplicate listing: the request
/// parameters it was computed for, when it was computed, and the groups.
#[derive(Clone)]
struct PerceptualCacheEntry {
/// Cache key: (library_id, threshold, include_resolved). `library_id`
/// is `None` for "all libraries". Cluster output is the same shape we
/// return on the wire so we can serve cached requests with zero work.
library_id: Option<i32>,
/// Hamming threshold the groups were clustered with (part of the key).
threshold: u32,
/// Whether resolved rows were included in the query (part of the key).
include_resolved: bool,
/// When the entry was stored; compared against `PERCEPTUAL_CACHE_TTL`.
computed_at: Instant,
/// The clustered groups, cloned into the response on a cache hit.
groups: Vec<DuplicateGroup>,
}
/// Cached result of one folder-pair bucketing pass, keyed on
/// (library_id, include_resolved).
#[derive(Clone)]
struct FolderPairCacheEntry {
/// Library the pairs were computed for (part of the key).
library_id: Option<i32>,
/// Whether resolved rows were included in the query (part of the key).
include_resolved: bool,
/// When the entry was stored; compared against `FOLDER_PAIR_CACHE_TTL`.
computed_at: Instant,
/// Pre-filter cache: every detected folder pair, regardless of
/// min_coverage / min_shared. The handler applies the user-supplied
/// thresholds on top so a slider drag is a memcpy + a filter, not a
/// re-bucket. Bucketing dominates the cost (O(N²) over members of
/// big exact-dup groups), so this is the right cache boundary.
pairs: Vec<FolderPair>,
}
lazy_static! {
// Process-wide caches. Each holds at most one entry (the most recently
// computed result); `None` means nothing is cached.
static ref PERCEPTUAL_CACHE: Mutex<Option<PerceptualCacheEntry>> = Mutex::new(None);
static ref FOLDER_PAIR_CACHE: Mutex<Option<FolderPairCacheEntry>> = Mutex::new(None);
}
/// Empty the perceptual-cluster cache slot.
///
/// `resolve` / `unresolve` call this so the next modal open recomputes
/// the clusters instead of serving a pre-change result.
fn invalidate_perceptual_cache() {
    // A poisoned mutex is deliberately ignored: the read path skips the
    // cache in that case too, so there is nothing stale to serve.
    if let Ok(mut slot) = PERCEPTUAL_CACHE.lock() {
        slot.take();
    }
}
/// Empty the folder-pair cache slot.
///
/// A resolve / unresolve changes which rows take part in the exact-dup
/// query, so the next folder-pair request has to re-bucket from scratch.
fn invalidate_folder_pair_cache() {
    // A poisoned mutex is deliberately ignored: the read path skips the
    // cache in that case too, so there is nothing stale to serve.
    if let Ok(mut slot) = FOLDER_PAIR_CACHE.lock() {
        slot.take();
    }
}
// ── Wire shapes ──────────────────────────────────────────────────────────
/// One file inside a duplicate group, as serialized to the client.
/// Built from a `DuplicateRow` by copying the fields listed here.
#[derive(Serialize, Debug, Clone)]
pub struct DuplicateMember {
/// Library the file belongs to.
pub library_id: i32,
/// File path relative to the library root.
pub rel_path: String,
/// Content hash of the file; identical across an exact-dup group.
pub content_hash: String,
pub size_bytes: Option<i64>,
pub date_taken: Option<i64>,
pub width: Option<i32>,
pub height: Option<i32>,
/// Soft-mark: `Some` once the row has been demoted via the resolve
/// flow, which records the survivor's content hash here.
pub duplicate_of_hash: Option<String>,
/// Timestamp recorded with the soft-mark (the resolve handler passes
/// `chrono::Utc::now().timestamp()`).
pub duplicate_decided_at: Option<i64>,
}
impl From<DuplicateRow> for DuplicateMember {
fn from(r: DuplicateRow) -> Self {
Self {
library_id: r.library_id,
rel_path: r.rel_path,
content_hash: r.content_hash,
size_bytes: r.size_bytes,
date_taken: r.date_taken,
width: r.width,
height: r.height,
duplicate_of_hash: r.duplicate_of_hash,
duplicate_decided_at: r.duplicate_decided_at,
}
}
}
/// How a group's members were matched. Serialized in lowercase
/// (`"exact"` / `"perceptual"`) via `rename_all`.
#[derive(Serialize, Debug, Clone)]
#[serde(rename_all = "lowercase")]
pub enum DuplicateKind {
/// Members share the same content hash.
Exact,
/// Members were clustered by perceptual-hash Hamming distance.
Perceptual,
}
/// One group of duplicate files returned by the exact / perceptual
/// listing endpoints.
#[derive(Serialize, Debug, Clone)]
pub struct DuplicateGroup {
/// Whether the group came from exact or perceptual matching.
pub kind: DuplicateKind,
/// Representative content_hash. For exact groups, the shared hash
/// (every member has the same one). For perceptual groups, an
/// arbitrary cluster member's hash, used only as a stable id for
/// the UI to key off.
pub representative_hash: String,
/// The files in the group.
pub members: Vec<DuplicateMember>,
}
/// Query-string parameters shared by the exact and perceptual listing
/// endpoints.
#[derive(Deserialize, Debug)]
pub struct ListDuplicatesQuery {
/// Library selector handed to `libraries::resolve_library_param`.
/// When that yields no library the handlers query with no library
/// filter.
pub library: Option<String>,
/// Include rows that are already soft-marked. Handlers default this
/// to `false` when absent.
#[serde(default)]
pub include_resolved: Option<bool>,
/// Perceptual only — Hamming-distance threshold. Ignored on the
/// exact endpoint. Defaults to 8 (~12% similarity tolerance, the
/// sweet spot for resized/recompressed copies).
#[serde(default)]
pub threshold: Option<u32>,
}
/// Identifies one file in a resolve request by library and relative
/// path; the handler looks the row up from these two values.
#[derive(Deserialize, Debug)]
pub struct DuplicateMemberRef {
pub library_id: i32,
pub rel_path: String,
}
/// Body of `POST /duplicates/resolve`.
#[derive(Deserialize, Debug)]
pub struct ResolveDuplicatesReq {
/// The file to keep; its content hash is what demoted rows are
/// marked as duplicates of.
pub survivor: DuplicateMemberRef,
/// Files to soft-mark as duplicates of the survivor. An empty list
/// is rejected with 400 by the handler.
pub demoted: Vec<DuplicateMemberRef>,
}
/// Response of `POST /duplicates/resolve`.
#[derive(Serialize, Debug)]
pub struct ResolveResponse {
/// Number of demoted rows actually soft-marked. Unknown rows and the
/// survivor itself are skipped and not counted.
pub resolved_count: usize,
}
/// Body of `POST /duplicates/unresolve`: the row whose soft-mark should
/// be cleared, identified by library and relative path.
#[derive(Deserialize, Debug)]
pub struct UnresolveDuplicateReq {
pub library_id: i32,
pub rel_path: String,
}
/// Query-string parameters of `GET /duplicates/folder-pairs`.
#[derive(Deserialize, Debug)]
pub struct ListFolderPairsQuery {
/// Library selector handed to `libraries::resolve_library_param`.
pub library: Option<String>,
/// Include rows that are already soft-marked. The handler defaults
/// this to `false` when absent.
#[serde(default)]
pub include_resolved: Option<bool>,
/// Coverage floor: `shared / min(side_a.total, side_b.total)`.
/// Default 0.5 — surfaces "at least half of the smaller folder is
/// duplicated in the other folder", which is the threshold above
/// which "demote one whole side" feels safe. The handler clamps the
/// value to `[0.0, 1.0]`.
#[serde(default)]
pub min_coverage: Option<f32>,
/// Absolute floor on shared-file count. Default 3 — anything less
/// is incidental noise (e.g. two folders that happen to share a
/// stock background image). The handler raises values below 1 to 1.
#[serde(default)]
pub min_shared: Option<u32>,
}
/// One side of a folder pair: a folder inside a specific library plus
/// its file count.
#[derive(Serialize, Debug, Clone)]
pub struct FolderEndpoint {
/// Library the folder belongs to.
pub library_id: i32,
/// Folder path relative to library root, e.g. `Cars/BMW`. Empty
/// string for files at the library root (no leading slash).
pub folder: String,
/// Total count of `image_exif` rows in this folder, applied with
/// the same `include_resolved` filter as the dup query so the
/// numerator and denominator come from the same population.
pub total_files: i64,
}
/// One content hash shared by both folders of a pair, with the
/// representative file path on each side.
#[derive(Serialize, Debug, Clone)]
pub struct FolderPairFile {
/// The shared content hash.
pub content_hash: String,
/// Representative path inside side A's folder.
pub a_rel_path: String,
/// Representative path inside side B's folder.
pub b_rel_path: String,
// The metadata below is taken from side A's row, falling back to
// side B's when A's value is missing.
pub size_bytes: Option<i64>,
pub date_taken: Option<i64>,
pub width: Option<i32>,
pub height: Option<i32>,
}
/// Two folders that share at least one exact-duplicate file. Sides are
/// in canonical order: `side_a <= side_b` on `(library_id, folder)`.
#[derive(Serialize, Debug, Clone)]
pub struct FolderPair {
pub side_a: FolderEndpoint,
pub side_b: FolderEndpoint,
/// Number of entries in `shared_files`.
pub shared_count: i64,
/// `shared_count / min(side_a.total_files, side_b.total_files)`,
/// in `[0.0, 1.0]`. A coverage of 1.0 means the smaller folder is
/// fully contained in the other.
pub coverage: f32,
/// One entry per shared content hash, sorted by `a_rel_path`.
pub shared_files: Vec<FolderPairFile>,
}
// ── Handlers ─────────────────────────────────────────────────────────────
/// `GET /duplicates/exact` — list groups of byte-identical files.
///
/// Responds 200 with a `GroupsResponse`, or 500 carrying the
/// debug-formatted DAO error.
///
/// NOTE(review): a failure from `resolve_library_param` is discarded by
/// `.ok()`, so an unresolvable `library` value is treated the same as no
/// library filter at all.
async fn list_exact_handler(
    _: Claims,
    request: HttpRequest,
    app_state: web::Data<AppState>,
    query: web::Query<ListDuplicatesQuery>,
    exif_dao: web::Data<Mutex<Box<dyn ExifDao>>>,
) -> impl Responder {
    let parent_cx = extract_context_from_request(&request);
    let span = global_tracer().start_with_context("duplicates.list_exact", &parent_cx);
    let cx = opentelemetry::Context::current_with_span(span);

    let include_resolved = query.include_resolved.unwrap_or(false);
    let library_id = libraries::resolve_library_param(&app_state, query.library.as_deref())
        .ok()
        .flatten()
        .map(|l| l.id);

    // Hold the DAO lock only for the query; grouping happens after release.
    let fetched = {
        let mut dao = exif_dao.lock().expect("exif dao lock");
        dao.list_duplicates_exact(&cx, library_id, include_resolved)
            .map_err(|e| format!("{:?}", e))
    };

    match fetched {
        Ok(rows) => HttpResponse::Ok().json(GroupsResponse {
            groups: group_exact(rows),
        }),
        Err(msg) => HttpResponse::InternalServerError().body(msg),
    }
}
/// `GET /duplicates/perceptual` — list perceptually similar groups.
///
/// `threshold` defaults to 8 and is capped at 32. A cached result is
/// served when its (library, threshold, include_resolved) key matches
/// and it is younger than `PERCEPTUAL_CACHE_TTL`; otherwise the
/// candidates are re-read and re-clustered and the cache slot replaced.
/// Responds 500 with the debug-formatted DAO error on query failure.
async fn list_perceptual_handler(
    _: Claims,
    request: HttpRequest,
    app_state: web::Data<AppState>,
    query: web::Query<ListDuplicatesQuery>,
    exif_dao: web::Data<Mutex<Box<dyn ExifDao>>>,
) -> impl Responder {
    let parent_cx = extract_context_from_request(&request);
    let span = global_tracer().start_with_context("duplicates.list_perceptual", &parent_cx);
    let cx = opentelemetry::Context::current_with_span(span);

    let library_id = libraries::resolve_library_param(&app_state, query.library.as_deref())
        .ok()
        .flatten()
        .map(|l| l.id);
    // u32 has no values below zero, so capping at 32 is the whole clamp.
    let threshold = query.threshold.unwrap_or(8).min(32);
    let include_resolved = query.include_resolved.unwrap_or(false);

    // Serve from cache when the key matches and the entry is fresh.
    if let Ok(slot) = PERCEPTUAL_CACHE.lock()
        && let Some(hit) = slot.as_ref()
        && (hit.library_id, hit.threshold, hit.include_resolved)
            == (library_id, threshold, include_resolved)
        && hit.computed_at.elapsed() < PERCEPTUAL_CACHE_TTL
    {
        return HttpResponse::Ok().json(GroupsResponse {
            groups: hit.groups.clone(),
        });
    }

    // Hold the DAO lock only for the query itself.
    let fetched = {
        let mut dao = exif_dao.lock().expect("exif dao lock");
        dao.list_perceptual_candidates(&cx, library_id, include_resolved)
            .map_err(|e| format!("{:?}", e))
    };
    let rows = match fetched {
        Ok(rows) => rows,
        Err(msg) => return HttpResponse::InternalServerError().body(msg),
    };

    let groups = cluster_perceptual(rows, threshold);

    if let Ok(mut slot) = PERCEPTUAL_CACHE.lock() {
        *slot = Some(PerceptualCacheEntry {
            library_id,
            threshold,
            include_resolved,
            computed_at: Instant::now(),
            groups: groups.clone(),
        });
    }

    HttpResponse::Ok().json(GroupsResponse { groups })
}
/// `POST /duplicates/resolve` — soft-mark each `demoted` row as a
/// duplicate of `survivor`.
///
/// Responses:
/// - 400 when `demoted` is empty.
/// - 404 when the survivor row does not exist.
/// - 409 when the survivor is itself soft-marked.
/// - 500 (debug-formatted error) when a DAO call fails.
/// - 200 with `ResolveResponse` otherwise.
///
/// Demoted rows are processed one at a time, so a failure part-way
/// through can leave earlier rows already soft-marked. For that reason
/// both caches are invalidated once the loop has started, whether it
/// finished cleanly or stopped on an error — otherwise a failed request
/// could leave stale cached results being served until the TTL expires.
async fn resolve_handler(
    _: Claims,
    request: HttpRequest,
    body: web::Json<ResolveDuplicatesReq>,
    exif_dao: web::Data<Mutex<Box<dyn ExifDao>>>,
) -> impl Responder {
    let context = extract_context_from_request(&request);
    let span = global_tracer().start_with_context("duplicates.resolve", &context);
    let span_context = opentelemetry::Context::current_with_span(span);
    if body.demoted.is_empty() {
        return HttpResponse::BadRequest().body("demoted list is empty");
    }
    let mut dao = exif_dao.lock().expect("exif dao lock");
    // Resolve survivor → its content_hash, plus the canonical rel_path
    // we'll use as the destination for any tag-union INSERTs.
    let survivor = match dao.lookup_duplicate_row(
        &span_context,
        body.survivor.library_id,
        &body.survivor.rel_path,
    ) {
        Ok(Some(row)) => row,
        Ok(None) => return HttpResponse::NotFound().body("survivor not found"),
        Err(e) => return HttpResponse::InternalServerError().body(format!("{:?}", e)),
    };
    // Survivor must not itself be soft-marked — otherwise the modal is
    // pointing at a row we've already demoted, which would create a chain.
    if survivor.duplicate_of_hash.is_some() {
        return HttpResponse::Conflict().body("survivor is itself soft-marked as a duplicate");
    }
    let now = chrono::Utc::now().timestamp();
    let mut resolved_count = 0usize;
    // First hard failure inside the loop, if any. We stop at it but still
    // fall through to the cache invalidation below.
    let mut failure: Option<String> = None;
    for member_ref in &body.demoted {
        let demoted = match dao.lookup_duplicate_row(
            &span_context,
            member_ref.library_id,
            &member_ref.rel_path,
        ) {
            Ok(Some(row)) => row,
            Ok(None) => {
                log::warn!(
                    "duplicates.resolve: skipping unknown demoted ({}, {})",
                    member_ref.library_id,
                    member_ref.rel_path
                );
                continue;
            }
            Err(e) => {
                failure = Some(format!("{:?}", e));
                break;
            }
        };
        // Survivor and demoted must not be the same row (would set
        // duplicate_of_hash to its own hash — recursive nonsense).
        if demoted.library_id == survivor.library_id && demoted.rel_path == survivor.rel_path {
            continue;
        }
        // For perceptual dups (different content_hash), union the
        // demoted's tag set onto the survivor before flipping the
        // soft-mark. For exact dups (same content_hash), tags are
        // already shared at the bytes layer — the union is a no-op.
        if demoted.content_hash != survivor.content_hash
            && let Err(e) = dao.union_perceptual_tags(
                &span_context,
                &survivor.content_hash,
                &demoted.content_hash,
                &survivor.rel_path,
            )
        {
            log::warn!(
                "duplicates.resolve: tag union failed for {}: {:?}",
                demoted.rel_path,
                e
            );
            // Continue with the soft-mark anyway — losing tag
            // continuity is recoverable (unresolve restores the
            // demoted row's grid presence, and the original tags
            // never moved off the demoted hash).
        }
        if let Err(e) = dao.set_duplicate_of(
            &span_context,
            demoted.library_id,
            &demoted.rel_path,
            &survivor.content_hash,
            now,
        ) {
            failure = Some(format!("{:?}", e));
            break;
        }
        resolved_count += 1;
    }
    drop(dao);
    // Invalidate on both the success and the failure path: rows handled
    // before a failure have already been soft-marked.
    invalidate_perceptual_cache();
    invalidate_folder_pair_cache();
    match failure {
        Some(msg) => HttpResponse::InternalServerError().body(msg),
        None => HttpResponse::Ok().json(ResolveResponse { resolved_count }),
    }
}
/// `POST /duplicates/unresolve` — clear the soft-mark on one row.
///
/// Responds 200 with an empty body on success, after dropping both
/// caches; responds 500 with the debug-formatted DAO error on failure
/// (the caches are left untouched in that case).
async fn unresolve_handler(
    _: Claims,
    request: HttpRequest,
    body: web::Json<UnresolveDuplicateReq>,
    exif_dao: web::Data<Mutex<Box<dyn ExifDao>>>,
) -> impl Responder {
    let parent_cx = extract_context_from_request(&request);
    let span = global_tracer().start_with_context("duplicates.unresolve", &parent_cx);
    let cx = opentelemetry::Context::current_with_span(span);

    // Scope the DAO lock to the single write.
    let outcome = {
        let mut dao = exif_dao.lock().expect("exif dao lock");
        dao.clear_duplicate_of(&cx, body.library_id, &body.rel_path)
            .map_err(|e| format!("{:?}", e))
    };

    match outcome {
        Err(msg) => HttpResponse::InternalServerError().body(msg),
        Ok(_) => {
            invalidate_perceptual_cache();
            invalidate_folder_pair_cache();
            HttpResponse::Ok().finish()
        }
    }
}
/// `GET /duplicates/folder-pairs` — list folder pairs that share exact
/// duplicates, filtered by coverage and shared-file count.
///
/// Parameter handling:
/// - `include_resolved` defaults to `false`.
/// - `min_coverage` defaults to 0.5 and is clamped to `[0.0, 1.0]`. A
///   NaN value is treated as absent: `f32::clamp` passes NaN through and
///   every `coverage >= NaN` comparison is false, which would otherwise
///   turn the request into a silent empty result.
/// - `min_shared` defaults to 3 and is raised to at least 1.
///
/// The unfiltered pair list is cached per (library, include_resolved)
/// for `FOLDER_PAIR_CACHE_TTL`; the thresholds are applied on top of the
/// cached list. Responds 500 with the debug-formatted DAO error when
/// either query fails.
async fn list_folder_pairs_handler(
    _: Claims,
    request: HttpRequest,
    app_state: web::Data<AppState>,
    query: web::Query<ListFolderPairsQuery>,
    exif_dao: web::Data<Mutex<Box<dyn ExifDao>>>,
) -> impl Responder {
    let context = extract_context_from_request(&request);
    let span = global_tracer().start_with_context("duplicates.list_folder_pairs", &context);
    let span_context = opentelemetry::Context::current_with_span(span);
    let library_id = libraries::resolve_library_param(&app_state, query.library.as_deref())
        .ok()
        .flatten()
        .map(|l| l.id);
    let include_resolved = query.include_resolved.unwrap_or(false);
    // Discard NaN before clamping; ±infinity is handled by the clamp.
    let min_coverage = query
        .min_coverage
        .filter(|c| !c.is_nan())
        .unwrap_or(0.5)
        .clamp(0.0, 1.0);
    let min_shared = query.min_shared.unwrap_or(3).max(1);
    // Cache hit on the (library, include_resolved) tuple — coverage /
    // min_shared are user-tunable filters applied AFTER bucketing, so
    // the cache stores the unfiltered pair list.
    if let Ok(guard) = FOLDER_PAIR_CACHE.lock()
        && let Some(entry) = guard.as_ref()
        && entry.library_id == library_id
        && entry.include_resolved == include_resolved
        && entry.computed_at.elapsed() < FOLDER_PAIR_CACHE_TTL
    {
        let filtered = filter_folder_pairs(entry.pairs.clone(), min_coverage, min_shared);
        return HttpResponse::Ok().json(FolderPairsResponse { pairs: filtered });
    }
    // Both queries run under one DAO lock acquisition; the guard is
    // released before the bucketing work starts.
    let (dup_rows, all_paths) = {
        let mut dao = exif_dao.lock().expect("exif dao lock");
        let dup_rows = match dao.list_duplicates_exact(&span_context, library_id, include_resolved)
        {
            Ok(rows) => rows,
            Err(e) => return HttpResponse::InternalServerError().body(format!("{:?}", e)),
        };
        let all_paths = match dao.list_image_paths(&span_context, library_id, include_resolved) {
            Ok(rows) => rows,
            Err(e) => return HttpResponse::InternalServerError().body(format!("{:?}", e)),
        };
        (dup_rows, all_paths)
    };
    let totals = folder_totals(&all_paths);
    let pairs = bucket_folder_pairs(dup_rows, &totals);
    if let Ok(mut guard) = FOLDER_PAIR_CACHE.lock() {
        *guard = Some(FolderPairCacheEntry {
            library_id,
            include_resolved,
            computed_at: Instant::now(),
            pairs: pairs.clone(),
        });
    }
    let filtered = filter_folder_pairs(pairs, min_coverage, min_shared);
    HttpResponse::Ok().json(FolderPairsResponse { pairs: filtered })
}
// ── Grouping / clustering ────────────────────────────────────────────────
/// JSON envelope returned by the exact and perceptual listing handlers.
#[derive(Serialize, Debug)]
struct GroupsResponse {
groups: Vec<DuplicateGroup>,
}
/// JSON envelope returned by the folder-pairs handler.
#[derive(Serialize, Debug)]
struct FolderPairsResponse {
pairs: Vec<FolderPair>,
}
/// Directory part of a library-relative path: the text before the final
/// `/`, without the slash itself. A path containing no `/` (a file at
/// the library root) yields the empty string.
fn folder_dir(rel_path: &str) -> &str {
    rel_path.rsplit_once('/').map_or("", |(dir, _file)| dir)
}
/// Count files per `(library_id, folder)` from a flat listing of
/// `(library_id, rel_path)` rows.
///
/// The result is the denominator of the coverage metric. The caller
/// passes rows selected with the same filter as the dup query so that
/// shared-file counts and folder totals describe the same population.
fn folder_totals(rows: &[(i32, String)]) -> HashMap<(i32, String), i64> {
    rows.iter()
        .fold(HashMap::new(), |mut counts, (library_id, rel_path)| {
            let folder = folder_dir(rel_path).to_string();
            *counts.entry((*library_id, folder)).or_insert(0) += 1;
            counts
        })
}
/// Put two folder keys into canonical order — the smaller
/// `(library_id, folder)` tuple first — so that `(F1, F2)` and
/// `(F2, F1)` land on the same bucket key no matter which member of an
/// exact-dup group is seen first. Equal keys are returned unchanged.
fn canonical_pair<'a>(
    a: &'a (i32, String),
    b: &'a (i32, String),
) -> (&'a (i32, String), &'a (i32, String)) {
    match a.cmp(b) {
        std::cmp::Ordering::Greater => (b, a),
        _ => (a, b),
    }
}
/// Bucket exact-dup rows into folder-pair edges. For each exact-dup
/// group, pick one representative member per (library, folder) tuple
/// (lex-smallest rel_path) so within-folder duplicates collapse to one
/// edge endpoint each — those are an EXACT-tab concern, not a
/// folder-pair one. Then for every distinct unordered pair of folders
/// the hash touches, record one shared-file entry.
///
/// `totals` supplies the per-folder file counts used as the coverage
/// denominator; a folder missing from it counts as 0 files, and the
/// denominator is floored at 1 so the division is always defined.
///
/// Output is unfiltered; the caller applies `min_coverage` /
/// `min_shared` thresholds on top so the slider UX doesn't have to
/// re-bucket on every drag. Pairs come back sorted by shared count
/// (descending), then coverage (descending), then folder identity.
fn bucket_folder_pairs(
rows: Vec<DuplicateRow>,
totals: &HashMap<(i32, String), i64>,
) -> Vec<FolderPair> {
// Group dup rows by content_hash (the rows are already ordered by
// hash in the SQL query, but don't rely on that for correctness).
let mut by_hash: HashMap<String, Vec<DuplicateRow>> = HashMap::new();
for row in rows {
by_hash
.entry(row.content_hash.clone())
.or_default()
.push(row);
}
// Edge accumulator: pair-key → list of shared files. The pair key
// owns its strings so we can serialize folder names back into the
// wire shape without juggling lifetimes through the response.
type PairKey = ((i32, String), (i32, String));
let mut edges: HashMap<PairKey, Vec<FolderPairFile>> = HashMap::new();
for (_, members) in by_hash {
// A hash with a single row cannot connect two folders.
if members.len() < 2 {
continue;
}
// One representative per (library_id, folder). Lex-smallest
// rel_path wins — deterministic, and matches the SQL ORDER BY
// so the visible thumbnail is stable across requests.
let mut by_folder: HashMap<(i32, String), &DuplicateRow> = HashMap::new();
for m in &members {
let key = (m.library_id, folder_dir(&m.rel_path).to_string());
by_folder
.entry(key)
.and_modify(|existing| {
if m.rel_path < existing.rel_path {
*existing = m;
}
})
.or_insert(m);
}
// Skip degenerate hashes that all live in one folder — those
// are within-folder dups, surfaced by the EXACT tab.
if by_folder.len() < 2 {
continue;
}
// For every unordered pair of folders this hash touches, record
// one shared-file entry (one rep per side). HashMap key order is
// arbitrary, which is why each pair goes through `canonical_pair`.
let folder_keys: Vec<&(i32, String)> = by_folder.keys().collect();
for i in 0..folder_keys.len() {
for j in (i + 1)..folder_keys.len() {
let (a_key, b_key) = canonical_pair(folder_keys[i], folder_keys[j]);
let a_row = by_folder[a_key];
let b_row = by_folder[b_key];
edges
.entry((a_key.clone(), b_key.clone()))
.or_default()
.push(FolderPairFile {
content_hash: a_row.content_hash.clone(),
a_rel_path: a_row.rel_path.clone(),
b_rel_path: b_row.rel_path.clone(),
// Size / dims should agree across exact dups, so prefer
// side A's value and fall back to side B's only when
// side A's is NULL.
size_bytes: a_row.size_bytes.or(b_row.size_bytes),
date_taken: a_row.date_taken.or(b_row.date_taken),
width: a_row.width.or(b_row.width),
height: a_row.height.or(b_row.height),
});
}
}
}
let mut pairs: Vec<FolderPair> = edges
.into_iter()
.map(|((a, b), mut shared_files)| {
// Stable rel_path order inside the response so the
// frontend's thumbnail strip doesn't reshuffle on refetch.
shared_files.sort_by(|x, y| x.a_rel_path.cmp(&y.a_rel_path));
let total_a = totals.get(&a).copied().unwrap_or(0);
let total_b = totals.get(&b).copied().unwrap_or(0);
let shared_count = shared_files.len() as i64;
// Denominator is the smaller folder's size, floored at 1 to
// avoid dividing by zero when a total is missing.
let denom = total_a.min(total_b).max(1) as f32;
let coverage = (shared_count as f32 / denom).clamp(0.0, 1.0);
FolderPair {
side_a: FolderEndpoint {
library_id: a.0,
folder: a.1,
total_files: total_a,
},
side_b: FolderEndpoint {
library_id: b.0,
folder: b.1,
total_files: total_b,
},
shared_count,
coverage,
shared_files,
}
})
.collect();
// Largest shared_count first (most reward per click), tie-break on
// higher coverage (subset-into-superset matches above scattered
// fragments of equal size), then deterministic by folder names.
pairs.sort_by(|a, b| {
b.shared_count
.cmp(&a.shared_count)
.then_with(|| {
b.coverage
.partial_cmp(&a.coverage)
.unwrap_or(std::cmp::Ordering::Equal)
})
.then_with(|| a.side_a.library_id.cmp(&b.side_a.library_id))
.then_with(|| a.side_a.folder.cmp(&b.side_a.folder))
.then_with(|| a.side_b.library_id.cmp(&b.side_b.library_id))
.then_with(|| a.side_b.folder.cmp(&b.side_b.folder))
});
pairs
}
/// Keep only the pairs whose coverage is at least `min_coverage` and
/// whose shared-file count is at least `min_shared`. The relative order
/// of the surviving pairs is unchanged.
fn filter_folder_pairs(
    pairs: Vec<FolderPair>,
    min_coverage: f32,
    min_shared: u32,
) -> Vec<FolderPair> {
    let shared_floor = i64::from(min_shared);
    let mut kept = pairs;
    kept.retain(|pair| pair.coverage >= min_coverage && pair.shared_count >= shared_floor);
    kept
}
/// Collapse exact-dup rows into one group per content hash.
///
/// Hashes that occur on a single row are dropped. Groups are returned
/// largest first, with ties broken by ascending hash so the order is
/// deterministic.
fn group_exact(rows: Vec<DuplicateRow>) -> Vec<DuplicateGroup> {
    let mut buckets: HashMap<String, Vec<DuplicateRow>> = HashMap::new();
    for row in rows {
        let key = row.content_hash.clone();
        buckets.entry(key).or_default().push(row);
    }

    let mut groups = Vec::new();
    for (hash, members) in buckets {
        if members.len() <= 1 {
            continue;
        }
        groups.push(DuplicateGroup {
            kind: DuplicateKind::Exact,
            representative_hash: hash,
            members: members.into_iter().map(DuplicateMember::from).collect(),
        });
    }

    // Tuple comparison: member count descending (operands swapped), then
    // representative hash ascending.
    groups.sort_by(|a, b| {
        (b.members.len(), &a.representative_hash).cmp(&(a.members.len(), &b.representative_hash))
    });
    groups
}
/// Lower bound on the number of set bits in a "useful" 64-bit
/// perceptual hash. Real photographic content produces roughly 50/50
/// bit distributions; hashes outside the [16, 48] band come from
/// low-entropy structure (uniform skies, black frames, monochrome
/// scans, faded film), where the hash collapses to near-uniform values
/// that sit within a trivial Hamming distance of hundreds of unrelated
/// images. The 8/56 band that shipped first was too permissive — even
/// at threshold=4 the false-positive cluster persisted.
const MIN_INFORMATIVE_POPCOUNT: u32 = 16;
/// Upper bound, mirrored around 32 so the band is symmetric.
const MAX_INFORMATIVE_POPCOUNT: u32 = 64 - MIN_INFORMATIVE_POPCOUNT;
/// True when the hash's set-bit count lies inside the informative band
/// (both bounds inclusive).
#[inline]
fn is_informative_hash(h: i64) -> bool {
    matches!(
        h.count_ones(),
        MIN_INFORMATIVE_POPCOUNT..=MAX_INFORMATIVE_POPCOUNT
    )
}
/// Maximum dHash distance allowed for a candidate pair, derived from the
/// pHash threshold: half of it (integer division), but never below 2.
///
/// pHash is the candidate-discovery signal (BK-tree neighbourhood
/// lookup); dHash is the validation signal that must also agree before
/// two rows are unioned. Giving dHash the tighter budget lets a real
/// near-dup (close on both) through while an incidental pHash collision
/// on uniform content is vetoed.
///
/// The floor only matters for pHash thresholds below 4: it keeps a
/// couple of bits of dHash jitter allowed, since genuine resampling can
/// flip a low-frequency gradient bit even when the images look the same.
#[inline]
fn dhash_threshold(phash_threshold: u32) -> u32 {
    let half = phash_threshold / 2;
    if half < 2 { 2 } else { half }
}
/// Single-link cluster the input rows by Hamming distance over their
/// pHash, with `threshold` as the maximum distance for an edge. Rows
/// without a pHash, or with a degenerate (low-entropy) pHash, are
/// excluded — they'd chain together unrelated images.
///
/// Two-signal validation: the BK-tree gives candidate pairs cheaply,
/// then we additionally require dHash agreement before unioning. pHash
/// alone is too permissive; pairing it with dHash collapses the false-
/// positive cluster significantly (different DCT vs gradient
/// signatures on real near-dups still both stay close, but spurious
/// pHash collisions on uniform images don't survive the dHash check).
///
/// Implementation: BK-tree neighbourhood lookup per row, union-find
/// over the validated edges. O(N log N) instead of the O(N²) naive
/// pairwise scan; on a 1.26M-row library that's the difference between
/// "responds in 1.5 s" and "responds in 25 minutes".
///
/// Returns groups of at least two members, largest first, ties broken
/// by ascending representative hash. Returns an empty vector when fewer
/// than two rows pass the pHash filter.
fn cluster_perceptual(rows: Vec<DuplicateRow>, threshold: u32) -> Vec<DuplicateGroup> {
// Keep only rows that have a pHash whose popcount is informative.
let candidates: Vec<DuplicateRow> = rows
.into_iter()
.filter(|r| r.phash_64.is_some_and(is_informative_hash))
.collect();
if candidates.len() < 2 {
return Vec::new();
}
// Build BK-tree keyed on (phash_u64, index-in-candidates).
let mut tree: BKTree<HashKey, HammingMetric> = BKTree::new(HammingMetric);
for (idx, row) in candidates.iter().enumerate() {
if let Some(p) = row.phash_64 {
tree.add(HashKey {
phash: p as u64,
idx,
});
}
}
// Union-find over edges within `threshold`. For a candidate pair
// surfaced by the pHash BK-tree, require dHash within a *stricter*
// threshold (`dhash_threshold(threshold)`) before unioning. pHash
// agreement on low-entropy structure can be incidental; pHash
// agreement AND dHash within roughly half that distance is a
// strong near-dup signal. dHash on either side missing → reject
// (was: trust pHash alone). Missing dHash means we can't validate
// the candidate, and the false-positive cost outweighs the rare
// case of a partial backfill.
let dhash_max = dhash_threshold(threshold);
let mut uf = UnionFind::new(candidates.len());
for (idx, row) in candidates.iter().enumerate() {
let Some(p) = row.phash_64 else { continue };
let key = HashKey {
phash: p as u64,
idx,
};
for (_, neighbour) in tree.find(&key, threshold) {
// The lookup also returns the query row itself; skip it.
if neighbour.idx == idx {
continue;
}
let other = &candidates[neighbour.idx];
// Both dHashes must exist, be within `dhash_max` bits of each
// other, and each pass the same popcount filter as the pHash.
let dhash_ok = match (row.dhash_64, other.dhash_64) {
(Some(a), Some(b)) => {
(a as u64 ^ b as u64).count_ones() <= dhash_max
&& is_informative_hash(a)
&& is_informative_hash(b)
}
_ => false,
};
if dhash_ok {
uf.union(idx, neighbour.idx);
}
}
}
// Bucket by root.
let mut by_root: HashMap<usize, Vec<DuplicateRow>> = HashMap::new();
for (idx, row) in candidates.into_iter().enumerate() {
let root = uf.find(idx);
by_root.entry(root).or_default().push(row);
}
// Medoid-validate each cluster to break single-link chains.
// Single-link unions any pair within threshold; that means a chain
// A↔B↔C can collapse into one cluster even when A and C aren't
// similar. The medoid pass picks the cluster's most-central member
// and drops any other whose distance to it exceeds threshold —
// chains lose their tail, dense real-near-dup clusters keep all
// members. Discard clusters that drop below 2 after refinement.
let groups: Vec<DuplicateGroup> = by_root
.into_values()
.filter_map(|cluster| refine_cluster(cluster, threshold, dhash_max))
.map(|cluster| {
// The first surviving member's hash serves as the group id.
let representative_hash = cluster[0].content_hash.clone();
DuplicateGroup {
kind: DuplicateKind::Perceptual,
representative_hash,
members: cluster.into_iter().map(DuplicateMember::from).collect(),
}
})
.collect();
// Largest groups first, then by representative hash so the order does
// not depend on HashMap iteration order.
let mut groups = groups;
groups.sort_by(|a, b| {
b.members
.len()
.cmp(&a.members.len())
.then_with(|| a.representative_hash.cmp(&b.representative_hash))
});
groups
}
/// Shrink a single-link cluster to the neighbourhood of its medoid.
///
/// The medoid is the member with the smallest summed pHash + dHash
/// Hamming distance to all other members; on a tie the earliest member
/// in `cluster` wins. Every other member is kept only if it is within
/// `phash_max` of the medoid on pHash AND within `dhash_max` on dHash.
/// A missing hash is treated as 0 for these distance computations.
///
/// Returns `None` when fewer than two members survive (including when
/// the input itself has fewer than two); a two-member cluster is
/// returned as-is because no chain can exist between two members.
fn refine_cluster(
    cluster: Vec<DuplicateRow>,
    phash_max: u32,
    dhash_max: u32,
) -> Option<Vec<DuplicateRow>> {
    match cluster.len() {
        0 | 1 => return None,
        // Both signals were already validated when the pair was unioned.
        2 => return Some(cluster),
        _ => {}
    }

    // (pHash, dHash) per member, in cluster order.
    let signatures: Vec<(u64, u64)> = cluster
        .iter()
        .map(|row| {
            (
                row.phash_64.unwrap_or(0) as u64,
                row.dhash_64.unwrap_or(0) as u64,
            )
        })
        .collect();

    // Summed distance from member `i` to every member. The self term is
    // x ^ x == 0, so including it leaves the sum unchanged.
    let spread = |i: usize| -> u32 {
        let (p_i, d_i) = signatures[i];
        signatures.iter().fold(0u32, |acc, &(p_j, d_j)| {
            acc.saturating_add((p_i ^ p_j).count_ones())
                .saturating_add((d_i ^ d_j).count_ones())
        })
    };

    // `min_by_key` returns the first of several equal minima.
    let medoid = (0..signatures.len())
        .min_by_key(|&i| spread(i))
        .unwrap_or(0);
    let (medoid_p, medoid_d) = signatures[medoid];

    let survivors: Vec<DuplicateRow> = cluster
        .into_iter()
        .zip(signatures.iter())
        .enumerate()
        .filter(|(idx, (_, (p, d)))| {
            *idx == medoid
                || ((*p ^ medoid_p).count_ones() <= phash_max
                    && (*d ^ medoid_d).count_ones() <= dhash_max)
        })
        .map(|(_, (row, _))| row)
        .collect();

    (survivors.len() >= 2).then_some(survivors)
}
/// BK-tree element: a pHash plus the index of the row it came from in
/// the `candidates` vector, so a neighbour hit can be mapped back to
/// its row.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct HashKey {
/// The row's perceptual hash, reinterpreted as unsigned bits.
phash: u64,
/// Position of the row in the candidate list.
idx: usize,
}
/// Distance metric handed to the BK-tree: Hamming distance over
/// `HashKey::phash`. `HashKey::idx` does not take part in the distance.
struct HammingMetric;
impl Metric<HashKey> for HammingMetric {
/// Number of bit positions in which the two pHashes differ.
fn distance(&self, a: &HashKey, b: &HashKey) -> u32 {
(a.phash ^ b.phash).count_ones()
}
/// Always returns the exact distance; the threshold argument is
/// ignored rather than used to bail out early.
fn threshold_distance(&self, a: &HashKey, b: &HashKey, _: u32) -> Option<u32> {
Some(self.distance(a, b))
}
}
/// Disjoint-set forest over the indices `0..n`, with path compression
/// in `find` and union by rank in `union`.
struct UnionFind {
    /// `parent[i]` is `i`'s parent; a root is its own parent.
    parent: Vec<usize>,
    /// Rank of each root, used to decide which tree is attached to which.
    rank: Vec<u8>,
}
impl UnionFind {
    /// Create `n` singleton sets.
    fn new(n: usize) -> Self {
        let parent = (0..n).collect();
        let rank = vec![0; n];
        Self { parent, rank }
    }

    /// Return the root of `x`'s set, re-pointing every node on the
    /// walked path directly at that root. Panics if `x` is out of range.
    fn find(&mut self, x: usize) -> usize {
        // Pass 1: walk up to the root.
        let mut root = x;
        while self.parent[root] != root {
            root = self.parent[root];
        }
        // Pass 2: compress the path just walked.
        let mut node = x;
        while node != root {
            let next = self.parent[node];
            self.parent[node] = root;
            node = next;
        }
        root
    }

    /// Merge the sets containing `a` and `b`. The root with the lower
    /// rank is attached under the other; on equal ranks `b`'s root goes
    /// under `a`'s and `a`'s rank is bumped.
    fn union(&mut self, a: usize, b: usize) {
        let root_a = self.find(a);
        let root_b = self.find(b);
        if root_a == root_b {
            return;
        }
        match self.rank[root_a].cmp(&self.rank[root_b]) {
            std::cmp::Ordering::Less => self.parent[root_a] = root_b,
            std::cmp::Ordering::Greater => self.parent[root_b] = root_a,
            std::cmp::Ordering::Equal => {
                self.parent[root_b] = root_a;
                self.rank[root_a] += 1;
            }
        }
    }
}
// ── Routing ──────────────────────────────────────────────────────────────
/// Register the duplicate-detection routes on `app` and return it:
/// GET `/duplicates/exact`, `/duplicates/perceptual` and
/// `/duplicates/folder-pairs`, plus POST `/duplicates/resolve` and
/// `/duplicates/unresolve`.
pub fn add_duplicate_services<T>(app: App<T>) -> App<T>
where
    T: ServiceFactory<
            actix_web::dev::ServiceRequest,
            Config = (),
            Error = actix_web::Error,
            InitError = (),
        >,
{
    let exact = web::resource("/duplicates/exact").route(web::get().to(list_exact_handler));
    let perceptual =
        web::resource("/duplicates/perceptual").route(web::get().to(list_perceptual_handler));
    let folder_pairs =
        web::resource("/duplicates/folder-pairs").route(web::get().to(list_folder_pairs_handler));
    let resolve = web::resource("/duplicates/resolve").route(web::post().to(resolve_handler));
    let unresolve =
        web::resource("/duplicates/unresolve").route(web::post().to(unresolve_handler));

    // Registration order is kept the same as before.
    app.service(exact)
        .service(perceptual)
        .service(folder_pairs)
        .service(resolve)
        .service(unresolve)
}
// ── Tests ────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
use super::*;
/// Fixture: builds a `DuplicateRow` for `(library_id, rel)` with the given
/// content hash and optional pHash. `size_bytes` is fixed at `Some(1000)`;
/// the dHash and every other optional field are `None`.
fn row(library_id: i32, rel: &str, hash: &str, phash: Option<i64>) -> DuplicateRow {
DuplicateRow {
library_id,
rel_path: rel.into(),
content_hash: hash.into(),
size_bytes: Some(1000),
date_taken: None,
width: None,
height: None,
phash_64: phash,
dhash_64: None,
duplicate_of_hash: None,
duplicate_decided_at: None,
}
}
#[test]
fn group_exact_collapses_by_hash() {
    // Three copies of h1 spread over two libraries, plus a unique h2 that
    // must not surface as a group of its own.
    let rows: Vec<DuplicateRow> = [
        (1, "a.jpg", "h1"),
        (1, "b.jpg", "h1"),
        (2, "c.jpg", "h1"),
        (1, "lonely.jpg", "h2"),
    ]
    .into_iter()
    .map(|(lib, rel, hash)| row(lib, rel, hash, None))
    .collect();

    let groups = group_exact(rows);
    assert_eq!(groups.len(), 1);

    let only = &groups[0];
    assert_eq!(only.representative_hash, "h1");
    assert_eq!(only.members.len(), 3);
}
/// All three fixture hashes below have popcount in the "informative"
/// 8..=56 band so they survive the entropy filter that keeps
/// solid-colour images out of the cluster graph.
const INFORMATIVE_BASE: i64 = 0x55AA_55AA_55AA_55AA; // popcount = 32
const INFORMATIVE_NEAR: i64 = 0x55AA_55AA_55AA_55AB; // 1 bit away from BASE (popcount = 33)
const INFORMATIVE_FAR: i64 = 0x6996_6996_6996_6996; // 32 bits away from BASE (popcount = 32)
/// Fixture: like `row`, but lets the caller set the dHash as well as the
/// pHash. `size_bytes` is fixed at `Some(1000)`; every remaining optional
/// field is `None`.
fn row_with_dhash(
library_id: i32,
rel: &str,
hash: &str,
phash: Option<i64>,
dhash: Option<i64>,
) -> DuplicateRow {
DuplicateRow {
library_id,
rel_path: rel.into(),
content_hash: hash.into(),
size_bytes: Some(1000),
date_taken: None,
width: None,
height: None,
phash_64: phash,
dhash_64: dhash,
duplicate_of_hash: None,
duplicate_decided_at: None,
}
}
#[test]
fn cluster_perceptual_unites_close_hashes() {
    // a and b sit 1 bit apart on both pHash and dHash; c is 32 bits away
    // from a on both. Threshold 4 should merge only the close pair.
    let same_bits =
        |rel: &str, hash: &str, bits: i64| row_with_dhash(1, rel, hash, Some(bits), Some(bits));
    let rows = vec![
        same_bits("a.jpg", "h1", INFORMATIVE_BASE),
        same_bits("b.jpg", "h2", INFORMATIVE_NEAR),
        same_bits("c.jpg", "h3", INFORMATIVE_FAR),
    ];

    let groups = cluster_perceptual(rows, 4);
    assert_eq!(groups.len(), 1);

    let members = &groups[0].members;
    assert_eq!(members.len(), 2);
    for expected in ["a.jpg", "b.jpg"] {
        assert!(members.iter().any(|m| m.rel_path.as_str() == expected));
    }
}
#[test]
fn cluster_perceptual_threshold_zero_drops_distinct() {
// BASE and NEAR differ by a single bit on both pHash and dHash. With
// the threshold at 0 even that one-bit gap must keep them apart, so
// no group is produced.
let rows = vec![
row_with_dhash(
1,
"a.jpg",
"h1",
Some(INFORMATIVE_BASE),
Some(INFORMATIVE_BASE),
),
row_with_dhash(
1,
"b.jpg",
"h2",
Some(INFORMATIVE_NEAR),
Some(INFORMATIVE_NEAR),
),
];
let groups = cluster_perceptual(rows, 0);
assert!(groups.is_empty());
}
#[test]
fn cluster_perceptual_skips_singletons() {
// A lone row has nothing to pair with, so no group may be reported
// however generous the threshold is.
let rows = vec![row(1, "alone.jpg", "h1", Some(INFORMATIVE_BASE))];
assert!(cluster_perceptual(rows, 8).is_empty());
}
#[test]
fn cluster_perceptual_filters_low_entropy_hashes() {
// Both 0 (popcount 0) and i64::MAX (popcount 63) fall outside
// the informative band. A pair of these would trivially match
// (Hamming distance to each other small or zero) without the
// entropy filter — that's exactly the regression that was
// producing a giant first cluster of solid-colour images.
//
// NOTE(review): rows built with `row` carry `dhash_64: None`. If
// `cluster_perceptual` already discards rows without a dHash, this
// test passes without ever reaching the entropy filter — confirm,
// and consider `row_with_dhash` fixtures here if so.
let rows = vec![
row(1, "blank-a.jpg", "h1", Some(0)),
row(1, "blank-b.jpg", "h2", Some(0)),
row(1, "white-a.jpg", "h3", Some(i64::MAX)),
row(1, "white-b.jpg", "h4", Some(i64::MAX)),
];
assert!(cluster_perceptual(rows, 8).is_empty());
}
#[test]
fn cluster_perceptual_requires_dhash_agreement() {
// pHash within threshold but dHash far apart — the candidate
// edge from the BK-tree must be rejected. Without the dHash
// double-check this would form a 2-member cluster.
// Concretely: pHashes are BASE vs NEAR (1 bit apart), dHashes are
// BASE vs FAR (32 bits apart), and the threshold is 4.
let rows = vec![
row_with_dhash(
1,
"a.jpg",
"h1",
Some(INFORMATIVE_BASE),
Some(INFORMATIVE_BASE),
),
row_with_dhash(
1,
"b.jpg",
"h2",
Some(INFORMATIVE_NEAR),
Some(INFORMATIVE_FAR),
),
];
assert!(cluster_perceptual(rows, 4).is_empty());
}
#[test]
fn cluster_perceptual_breaks_long_chain_at_medoid() {
// 4-node chain (three links) at threshold=2, with pairwise
// distances chosen so single-link unions all four but one
// endpoint sits past the medoid's neighbourhood. Each step flips
// 2 bits in a different, non-overlapping bit pair of the low
// byte, so consecutive hops add up to wider distances:
// A↔B = 2, B↔C = 2, C↔D = 2,
// A↔C = 4, B↔D = 4, A↔D = 6.
// Medoid (B or C) keeps Δ ≤ 2 of itself; the far endpoint
// gets chopped, leaving exactly 3 members.
const A: i64 = 0x55AA_55AA_55AA_55AA;
const B: i64 = 0x55AA_55AA_55AA_55A9; // A ^ 0x03 (low byte)
const C: i64 = 0x55AA_55AA_55AA_55A5; // B ^ 0x0C
const D: i64 = 0x55AA_55AA_55AA_5595; // C ^ 0x30
let rows = vec![
row_with_dhash(1, "a.jpg", "h1", Some(A), Some(A)),
row_with_dhash(1, "b.jpg", "h2", Some(B), Some(B)),
row_with_dhash(1, "c.jpg", "h3", Some(C), Some(C)),
row_with_dhash(1, "d.jpg", "h4", Some(D), Some(D)),
];
let groups = cluster_perceptual(rows, 2);
assert_eq!(groups.len(), 1);
assert_eq!(
groups[0].members.len(),
3,
"medoid pass should chop one chain endpoint past Δ=2"
);
}
/// Fixture for the folder-pair tests: a `DuplicateRow` at
/// `(library_id, rel)` with the given content hash and no perceptual
/// hashes. `size_bytes` is fixed at `Some(1000)`; every other optional
/// field is `None`.
fn dup_row_at(library_id: i32, rel: &str, hash: &str) -> DuplicateRow {
DuplicateRow {
library_id,
rel_path: rel.into(),
content_hash: hash.into(),
size_bytes: Some(1000),
date_taken: None,
width: None,
height: None,
phash_64: None,
dhash_64: None,
duplicate_of_hash: None,
duplicate_decided_at: None,
}
}
#[test]
fn folder_dir_strips_basename() {
    // (input path, expected directory part); a bare filename maps to "".
    let cases = [
        ("Cars/BMW/DSC_5530.NEF", "Cars/BMW"),
        ("IMG_1.jpg", ""),
        ("a/b/c/d.jpg", "a/b/c"),
    ];
    for (input, expected) in cases {
        assert_eq!(folder_dir(input), expected);
    }
}
#[test]
fn folder_totals_groups_by_dir() {
    // Folder "a" exists in both libraries; the two must be counted apart.
    let rows: Vec<_> = [
        (1, "a/x.jpg"),
        (1, "a/y.jpg"),
        (1, "b/z.jpg"),
        (2, "a/x.jpg"),
    ]
    .into_iter()
    .map(|(lib, rel)| (lib, rel.to_string()))
    .collect();

    let totals = folder_totals(&rows);

    for ((lib, dir), expected) in [((1, "a"), 2), ((1, "b"), 1), ((2, "a"), 1)] {
        assert_eq!(totals.get(&(lib, dir.into())).copied(), Some(expected));
    }
}
#[test]
fn bucket_folder_pairs_collapses_within_folder_dups() {
    // The same hash at two paths inside ONE folder is a within-folder
    // dup, not an edge between two folders, so no pair may be emitted.
    let totals: HashMap<(i32, String), i64> =
        std::iter::once(((1, "f1".to_string()), 2)).collect();
    let rows = vec![
        dup_row_at(1, "f1/a.jpg", "h1"),
        dup_row_at(1, "f1/a_copy.jpg", "h1"),
    ];

    let pairs = bucket_folder_pairs(rows, &totals);

    assert_eq!(pairs.len(), 0);
}
#[test]
fn bucket_folder_pairs_canonicalizes_pair_order() {
    // h1 is listed lib1-first and h2 lib2-first. Both span (lib1, "f1")
    // and (lib2, "f2"), so they must land in ONE pair no matter which
    // side is encountered first.
    let rows: Vec<DuplicateRow> = [
        (1, "f1/a.jpg", "h1"),
        (2, "f2/a.jpg", "h1"),
        (2, "f2/b.jpg", "h2"),
        (1, "f1/b.jpg", "h2"),
    ]
    .into_iter()
    .map(|(lib, rel, hash)| dup_row_at(lib, rel, hash))
    .collect();

    let mut totals: HashMap<(i32, String), i64> = HashMap::new();
    totals.insert((1, "f1".to_string()), 2);
    totals.insert((2, "f2".to_string()), 2);

    let pairs = bucket_folder_pairs(rows, &totals);
    assert_eq!(pairs.len(), 1);

    let pair = &pairs[0];
    assert_eq!(pair.shared_count, 2);
    assert!((pair.coverage - 1.0).abs() < 1e-6);
}
#[test]
fn bucket_folder_pairs_subset_coverage() {
// BMW (5 files) and Night Photos/2015/July (20 files) share 3
// hashes. Coverage is measured against the smaller folder, so it
// should be 3/5 = 0.6 rather than 3/20 — a small folder that is
// mostly contained in a big one still scores high.
let rows = vec![
dup_row_at(1, "Cars/BMW/a.NEF", "h1"),
dup_row_at(1, "Cars/Night Photos/2015/July/a.NEF", "h1"),
dup_row_at(1, "Cars/BMW/b.NEF", "h2"),
dup_row_at(1, "Cars/Night Photos/2015/July/b.NEF", "h2"),
dup_row_at(1, "Cars/BMW/c.NEF", "h3"),
dup_row_at(1, "Cars/Night Photos/2015/July/c.NEF", "h3"),
];
let totals: HashMap<(i32, String), i64> = [
((1, "Cars/BMW".to_string()), 5),
((1, "Cars/Night Photos/2015/July".to_string()), 20),
]
.into_iter()
.collect();
let pairs = bucket_folder_pairs(rows, &totals);
assert_eq!(pairs.len(), 1);
assert_eq!(pairs[0].shared_count, 3);
// Smaller side (BMW, 5 files) hits coverage 3/5 = 0.6.
assert!((pairs[0].coverage - 0.6).abs() < 1e-6);
}
#[test]
fn bucket_folder_pairs_picks_lex_smallest_rep() {
// Two copies in folder A, one in folder B. The shared_files
// entry should reference A's lex-smallest rel_path. The rows list
// "A/z.jpg" before "A/a.jpg" on purpose, so input order alone would
// pick the wrong representative.
let rows = vec![
dup_row_at(1, "A/z.jpg", "h1"),
dup_row_at(1, "A/a.jpg", "h1"),
dup_row_at(1, "B/q.jpg", "h1"),
];
let totals: HashMap<(i32, String), i64> =
[((1, "A".to_string()), 2), ((1, "B".to_string()), 1)]
.into_iter()
.collect();
let pairs = bucket_folder_pairs(rows, &totals);
assert_eq!(pairs.len(), 1);
let f = &pairs[0].shared_files[0];
// Pair is canonicalized so A < B; A's rep wins lex.
assert_eq!(f.a_rel_path, "A/a.jpg");
assert_eq!(f.b_rel_path, "B/q.jpg");
}
#[test]
fn filter_folder_pairs_applies_thresholds() {
    let totals: HashMap<(i32, String), i64> = [
        ((1, "tiny".to_string()), 1),
        ((1, "tinier".to_string()), 1),
        ((1, "big-a".to_string()), 100),
        ((1, "big-b".to_string()), 100),
    ]
    .into_iter()
    .collect();

    // tiny↔tinier: 1 shared, coverage 1.0 — must be dropped by
    // min_shared=3 (incidental, not a folder-level signal).
    let mut rows: Vec<DuplicateRow> = vec![
        dup_row_at(1, "tiny/x.jpg", "ht"),
        dup_row_at(1, "tinier/x.jpg", "ht"),
    ];
    // big-a↔big-b: 50 shared out of 100 each — must survive both thresholds.
    rows.extend((0..50).flat_map(|i| {
        let hash = format!("h{}", i);
        [
            dup_row_at(1, &format!("big-a/{}.jpg", i), &hash),
            dup_row_at(1, &format!("big-b/{}.jpg", i), &hash),
        ]
    }));

    let kept = filter_folder_pairs(bucket_folder_pairs(rows, &totals), 0.5, 3);

    assert_eq!(kept.len(), 1);
    assert_eq!(kept[0].shared_count, 50);
}
/// Sanity-check the metric handed to the BK-tree: swapping the operands
/// must not change the distance, and the value is the plain bit-difference
/// count of the two pHashes.
#[test]
fn hamming_metric_is_symmetric() {
    // 0b1010 and 0b0101 disagree in all four low bits.
    let left = HashKey { phash: 0b1010, idx: 0 };
    let right = HashKey { phash: 0b0101, idx: 1 };

    let forward = HammingMetric.distance(&left, &right);
    let backward = HammingMetric.distance(&right, &left);

    assert_eq!(forward, backward);
    assert_eq!(forward, 4);
}
}