duplicates: perceptual hash + soft-mark resolution + upload 409

Adds pHash + dHash columns alongside the existing blake3 content_hash so
near-duplicates (re-encoded, resized, format-converted copies) become
queryable. /duplicates/{exact,perceptual} return groups; /duplicates/
{resolve,unresolve} flip a duplicate_of_hash soft-mark on losing rows
and union perceptual-only tag sets onto the survivor. The default
/photos listing filters duplicate_of_hash IS NULL so demoted siblings
stop cluttering the grid; include_duplicates=true opts back in for
Apollo's review modal. Upload now hashes bytes pre-write and returns
409 with the canonical sibling when a file's bytes already exist.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Cameron Cordes
2026-05-03 17:36:01 -04:00
parent 4340b164eb
commit 7584cd8792
14 changed files with 1852 additions and 1 deletions

622
src/duplicates.rs Normal file
View File

@@ -0,0 +1,622 @@
//! Duplicate detection surface — exact (blake3) and perceptual
//! (pHash + Hamming) groups, plus the soft-mark resolve flow that
//! Apollo's DUPLICATES modal drives.
//!
//! All routes require auth (Claims). Endpoints:
//!
//! - `GET /duplicates/exact?library=&include_resolved=` — count>1 byte-identical groups.
//! - `GET /duplicates/perceptual?library=&threshold=&include_resolved=` — Hamming-clustered groups.
//! - `POST /duplicates/resolve` — soft-mark demoted siblings.
//! - `POST /duplicates/unresolve` — clear a prior soft-mark.
//!
//! Perceptual clustering caches the BK-tree result for 5 minutes so
//! repeated opens of the modal don't re-cluster the whole library.
//! Cache invalidation is best-effort: resolve/unresolve clear the
//! cache, but new files arriving via the watcher don't (the next
//! 5-minute window picks them up). For a single-user personal tool
//! that's the right trade-off.
use std::collections::HashMap;
use std::sync::Mutex;
use std::time::{Duration, Instant};
use actix_web::{App, HttpRequest, HttpResponse, Responder, dev::ServiceFactory, web};
use bk_tree::{BKTree, Metric};
use lazy_static::lazy_static;
use opentelemetry::trace::{TraceContextExt, Tracer};
use serde::{Deserialize, Serialize};
use crate::data::Claims;
use crate::database::{DuplicateRow, ExifDao};
use crate::libraries;
use crate::otel::{extract_context_from_request, global_tracer};
use crate::state::AppState;
// ── Cache ────────────────────────────────────────────────────────────────
const PERCEPTUAL_CACHE_TTL: Duration = Duration::from_secs(300);
#[derive(Clone)]
struct PerceptualCacheEntry {
/// Cache key: (library_id, threshold, include_resolved). `library_id`
/// is `None` for "all libraries". Cluster output is the same shape we
/// return on the wire so we can serve cached requests with zero work.
library_id: Option<i32>,
threshold: u32,
include_resolved: bool,
computed_at: Instant,
groups: Vec<DuplicateGroup>,
}
lazy_static! {
static ref PERCEPTUAL_CACHE: Mutex<Option<PerceptualCacheEntry>> = Mutex::new(None);
}
/// Drop the perceptual-cluster cache. Called from `resolve`/`unresolve`
/// so the next modal open reflects the soft-mark change immediately.
fn invalidate_perceptual_cache() {
if let Ok(mut guard) = PERCEPTUAL_CACHE.lock() {
*guard = None;
}
}
// ── Wire shapes ──────────────────────────────────────────────────────────
#[derive(Serialize, Debug, Clone)]
pub struct DuplicateMember {
pub library_id: i32,
pub rel_path: String,
pub content_hash: String,
pub size_bytes: Option<i64>,
pub date_taken: Option<i64>,
pub width: Option<i32>,
pub height: Option<i32>,
pub duplicate_of_hash: Option<String>,
pub duplicate_decided_at: Option<i64>,
}
impl From<DuplicateRow> for DuplicateMember {
fn from(r: DuplicateRow) -> Self {
Self {
library_id: r.library_id,
rel_path: r.rel_path,
content_hash: r.content_hash,
size_bytes: r.size_bytes,
date_taken: r.date_taken,
width: r.width,
height: r.height,
duplicate_of_hash: r.duplicate_of_hash,
duplicate_decided_at: r.duplicate_decided_at,
}
}
}
#[derive(Serialize, Debug, Clone)]
#[serde(rename_all = "lowercase")]
pub enum DuplicateKind {
Exact,
Perceptual,
}
#[derive(Serialize, Debug, Clone)]
pub struct DuplicateGroup {
pub kind: DuplicateKind,
/// Representative content_hash. For exact groups, the shared hash
/// (every member has the same one). For perceptual groups, an
/// arbitrary cluster member's hash, used only as a stable id for
/// the UI to key off.
pub representative_hash: String,
pub members: Vec<DuplicateMember>,
}
#[derive(Deserialize, Debug)]
pub struct ListDuplicatesQuery {
pub library: Option<String>,
#[serde(default)]
pub include_resolved: Option<bool>,
/// Perceptual only — Hamming-distance threshold. Ignored on the
/// exact endpoint. Defaults to 8 (~12% similarity tolerance, the
/// sweet spot for resized/recompressed copies).
#[serde(default)]
pub threshold: Option<u32>,
}
#[derive(Deserialize, Debug)]
pub struct DuplicateMemberRef {
pub library_id: i32,
pub rel_path: String,
}
#[derive(Deserialize, Debug)]
pub struct ResolveDuplicatesReq {
pub survivor: DuplicateMemberRef,
pub demoted: Vec<DuplicateMemberRef>,
}
#[derive(Serialize, Debug)]
pub struct ResolveResponse {
pub resolved_count: usize,
}
#[derive(Deserialize, Debug)]
pub struct UnresolveDuplicateReq {
pub library_id: i32,
pub rel_path: String,
}
// ── Handlers ─────────────────────────────────────────────────────────────
async fn list_exact_handler(
_: Claims,
request: HttpRequest,
app_state: web::Data<AppState>,
query: web::Query<ListDuplicatesQuery>,
exif_dao: web::Data<Mutex<Box<dyn ExifDao>>>,
) -> impl Responder {
let context = extract_context_from_request(&request);
let span = global_tracer().start_with_context("duplicates.list_exact", &context);
let span_context = opentelemetry::Context::current_with_span(span);
let library_id = libraries::resolve_library_param(&app_state, query.library.as_deref())
.ok()
.flatten()
.map(|l| l.id);
let include_resolved = query.include_resolved.unwrap_or(false);
let rows = {
let mut dao = exif_dao.lock().expect("exif dao lock");
match dao.list_duplicates_exact(&span_context, library_id, include_resolved) {
Ok(rows) => rows,
Err(e) => {
return HttpResponse::InternalServerError().body(format!("{:?}", e));
}
}
};
let groups = group_exact(rows);
HttpResponse::Ok().json(GroupsResponse { groups })
}
async fn list_perceptual_handler(
_: Claims,
request: HttpRequest,
app_state: web::Data<AppState>,
query: web::Query<ListDuplicatesQuery>,
exif_dao: web::Data<Mutex<Box<dyn ExifDao>>>,
) -> impl Responder {
let context = extract_context_from_request(&request);
let span = global_tracer().start_with_context("duplicates.list_perceptual", &context);
let span_context = opentelemetry::Context::current_with_span(span);
let library_id = libraries::resolve_library_param(&app_state, query.library.as_deref())
.ok()
.flatten()
.map(|l| l.id);
let threshold = query.threshold.unwrap_or(8).clamp(0, 32);
let include_resolved = query.include_resolved.unwrap_or(false);
// Cache hit?
if let Ok(guard) = PERCEPTUAL_CACHE.lock() {
if let Some(entry) = guard.as_ref() {
if entry.library_id == library_id
&& entry.threshold == threshold
&& entry.include_resolved == include_resolved
&& entry.computed_at.elapsed() < PERCEPTUAL_CACHE_TTL
{
return HttpResponse::Ok().json(GroupsResponse {
groups: entry.groups.clone(),
});
}
}
}
let rows = {
let mut dao = exif_dao.lock().expect("exif dao lock");
match dao.list_perceptual_candidates(&span_context, library_id, include_resolved) {
Ok(rows) => rows,
Err(e) => {
return HttpResponse::InternalServerError().body(format!("{:?}", e));
}
}
};
let groups = cluster_perceptual(rows, threshold);
if let Ok(mut guard) = PERCEPTUAL_CACHE.lock() {
*guard = Some(PerceptualCacheEntry {
library_id,
threshold,
include_resolved,
computed_at: Instant::now(),
groups: groups.clone(),
});
}
HttpResponse::Ok().json(GroupsResponse { groups })
}
async fn resolve_handler(
_: Claims,
request: HttpRequest,
body: web::Json<ResolveDuplicatesReq>,
exif_dao: web::Data<Mutex<Box<dyn ExifDao>>>,
) -> impl Responder {
let context = extract_context_from_request(&request);
let span = global_tracer().start_with_context("duplicates.resolve", &context);
let span_context = opentelemetry::Context::current_with_span(span);
if body.demoted.is_empty() {
return HttpResponse::BadRequest().body("demoted list is empty");
}
let mut dao = exif_dao.lock().expect("exif dao lock");
// Resolve survivor → its content_hash, plus the canonical rel_path
// we'll use as the destination for any tag-union INSERTs.
let survivor = match dao.lookup_duplicate_row(
&span_context,
body.survivor.library_id,
&body.survivor.rel_path,
) {
Ok(Some(row)) => row,
Ok(None) => return HttpResponse::NotFound().body("survivor not found"),
Err(e) => return HttpResponse::InternalServerError().body(format!("{:?}", e)),
};
// Survivor must not itself be soft-marked — otherwise the modal is
// pointing at a row we've already demoted, which would create a chain.
if survivor.duplicate_of_hash.is_some() {
return HttpResponse::Conflict().body("survivor is itself soft-marked as a duplicate");
}
let now = chrono::Utc::now().timestamp();
let mut resolved_count = 0usize;
for member_ref in &body.demoted {
let demoted = match dao.lookup_duplicate_row(
&span_context,
member_ref.library_id,
&member_ref.rel_path,
) {
Ok(Some(row)) => row,
Ok(None) => {
log::warn!(
"duplicates.resolve: skipping unknown demoted ({}, {})",
member_ref.library_id,
member_ref.rel_path
);
continue;
}
Err(e) => {
return HttpResponse::InternalServerError().body(format!("{:?}", e));
}
};
// Survivor and demoted must not be the same row (would set
// duplicate_of_hash to its own hash — recursive nonsense).
if demoted.library_id == survivor.library_id && demoted.rel_path == survivor.rel_path {
continue;
}
// For perceptual dups (different content_hash), union the
// demoted's tag set onto the survivor before flipping the
// soft-mark. For exact dups (same content_hash), tags are
// already shared at the bytes layer — the union is a no-op.
if demoted.content_hash != survivor.content_hash {
if let Err(e) = dao.union_perceptual_tags(
&span_context,
&survivor.content_hash,
&demoted.content_hash,
&survivor.rel_path,
) {
log::warn!(
"duplicates.resolve: tag union failed for {}: {:?}",
demoted.rel_path,
e
);
// Continue with the soft-mark anyway — losing tag
// continuity is recoverable (unresolve restores the
// demoted row's grid presence, and the original tags
// never moved off the demoted hash).
}
}
if let Err(e) = dao.set_duplicate_of(
&span_context,
demoted.library_id,
&demoted.rel_path,
&survivor.content_hash,
now,
) {
return HttpResponse::InternalServerError().body(format!("{:?}", e));
}
resolved_count += 1;
}
drop(dao);
invalidate_perceptual_cache();
HttpResponse::Ok().json(ResolveResponse { resolved_count })
}
async fn unresolve_handler(
_: Claims,
request: HttpRequest,
body: web::Json<UnresolveDuplicateReq>,
exif_dao: web::Data<Mutex<Box<dyn ExifDao>>>,
) -> impl Responder {
let context = extract_context_from_request(&request);
let span = global_tracer().start_with_context("duplicates.unresolve", &context);
let span_context = opentelemetry::Context::current_with_span(span);
let mut dao = exif_dao.lock().expect("exif dao lock");
if let Err(e) = dao.clear_duplicate_of(&span_context, body.library_id, &body.rel_path) {
return HttpResponse::InternalServerError().body(format!("{:?}", e));
}
drop(dao);
invalidate_perceptual_cache();
HttpResponse::Ok().finish()
}
// ── Grouping / clustering ────────────────────────────────────────────────
#[derive(Serialize, Debug)]
struct GroupsResponse {
groups: Vec<DuplicateGroup>,
}
fn group_exact(rows: Vec<DuplicateRow>) -> Vec<DuplicateGroup> {
let mut by_hash: HashMap<String, Vec<DuplicateRow>> = HashMap::new();
for row in rows {
by_hash
.entry(row.content_hash.clone())
.or_default()
.push(row);
}
let mut groups: Vec<DuplicateGroup> = by_hash
.into_iter()
.filter(|(_, members)| members.len() > 1)
.map(|(hash, members)| DuplicateGroup {
kind: DuplicateKind::Exact,
representative_hash: hash,
members: members.into_iter().map(DuplicateMember::from).collect(),
})
.collect();
// Largest groups first (most reward per click), then deterministic.
groups.sort_by(|a, b| {
b.members
.len()
.cmp(&a.members.len())
.then_with(|| a.representative_hash.cmp(&b.representative_hash))
});
groups
}
/// Single-link cluster the input rows by Hamming distance over their
/// pHash, with `threshold` as the maximum distance for an edge. Rows
/// without a pHash are skipped (we already filter at the SQL layer but
/// the type carries an Option for safety).
///
/// Implementation: BK-tree neighbourhood lookup per row, union-find
/// over the resulting edges. O(N log N) instead of the O(N²) naive
/// pairwise scan; on a 1.26M-row library that's the difference between
/// "responds in 1.5 s" and "responds in 25 minutes".
fn cluster_perceptual(rows: Vec<DuplicateRow>, threshold: u32) -> Vec<DuplicateGroup> {
let candidates: Vec<DuplicateRow> = rows.into_iter().filter(|r| r.phash_64.is_some()).collect();
if candidates.len() < 2 {
return Vec::new();
}
// Build BK-tree keyed on (phash_u64, index-in-candidates).
let mut tree: BKTree<HashKey, HammingMetric> = BKTree::new(HammingMetric);
for (idx, row) in candidates.iter().enumerate() {
if let Some(p) = row.phash_64 {
tree.add(HashKey {
phash: p as u64,
idx,
});
}
}
// Union-find over edges within `threshold`.
let mut uf = UnionFind::new(candidates.len());
for (idx, row) in candidates.iter().enumerate() {
let Some(p) = row.phash_64 else { continue };
let key = HashKey {
phash: p as u64,
idx,
};
for (_, neighbour) in tree.find(&key, threshold) {
if neighbour.idx != idx {
uf.union(idx, neighbour.idx);
}
}
}
// Bucket by root.
let mut by_root: HashMap<usize, Vec<DuplicateRow>> = HashMap::new();
for (idx, row) in candidates.into_iter().enumerate() {
let root = uf.find(idx);
by_root.entry(root).or_default().push(row);
}
let mut groups: Vec<DuplicateGroup> = by_root
.into_values()
.filter(|cluster| cluster.len() > 1)
.map(|cluster| {
let representative_hash = cluster[0].content_hash.clone();
DuplicateGroup {
kind: DuplicateKind::Perceptual,
representative_hash,
members: cluster.into_iter().map(DuplicateMember::from).collect(),
}
})
.collect();
groups.sort_by(|a, b| {
b.members
.len()
.cmp(&a.members.len())
.then_with(|| a.representative_hash.cmp(&b.representative_hash))
});
groups
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct HashKey {
phash: u64,
idx: usize,
}
struct HammingMetric;
impl Metric<HashKey> for HammingMetric {
fn distance(&self, a: &HashKey, b: &HashKey) -> u32 {
(a.phash ^ b.phash).count_ones()
}
fn threshold_distance(&self, a: &HashKey, b: &HashKey, _: u32) -> Option<u32> {
Some(self.distance(a, b))
}
}
struct UnionFind {
parent: Vec<usize>,
rank: Vec<u8>,
}
impl UnionFind {
fn new(n: usize) -> Self {
Self {
parent: (0..n).collect(),
rank: vec![0; n],
}
}
fn find(&mut self, x: usize) -> usize {
if self.parent[x] != x {
let root = self.find(self.parent[x]);
self.parent[x] = root;
}
self.parent[x]
}
fn union(&mut self, a: usize, b: usize) {
let ra = self.find(a);
let rb = self.find(b);
if ra == rb {
return;
}
if self.rank[ra] < self.rank[rb] {
self.parent[ra] = rb;
} else if self.rank[ra] > self.rank[rb] {
self.parent[rb] = ra;
} else {
self.parent[rb] = ra;
self.rank[ra] += 1;
}
}
}
// ── Routing ──────────────────────────────────────────────────────────────
pub fn add_duplicate_services<T>(app: App<T>) -> App<T>
where
T: ServiceFactory<actix_web::dev::ServiceRequest, Config = (), Error = actix_web::Error, InitError = ()>,
{
app.service(web::resource("/duplicates/exact").route(web::get().to(list_exact_handler)))
.service(
web::resource("/duplicates/perceptual").route(web::get().to(list_perceptual_handler)),
)
.service(web::resource("/duplicates/resolve").route(web::post().to(resolve_handler)))
.service(web::resource("/duplicates/unresolve").route(web::post().to(unresolve_handler)))
}
// ── Tests ────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
use super::*;
fn row(library_id: i32, rel: &str, hash: &str, phash: Option<i64>) -> DuplicateRow {
DuplicateRow {
library_id,
rel_path: rel.into(),
content_hash: hash.into(),
size_bytes: Some(1000),
date_taken: None,
width: None,
height: None,
phash_64: phash,
dhash_64: None,
duplicate_of_hash: None,
duplicate_decided_at: None,
}
}
#[test]
fn group_exact_collapses_by_hash() {
let rows = vec![
row(1, "a.jpg", "h1", None),
row(1, "b.jpg", "h1", None),
row(2, "c.jpg", "h1", None),
row(1, "lonely.jpg", "h2", None),
];
let groups = group_exact(rows);
assert_eq!(groups.len(), 1);
assert_eq!(groups[0].representative_hash, "h1");
assert_eq!(groups[0].members.len(), 3);
}
#[test]
fn cluster_perceptual_unites_close_hashes() {
// Three rows: two near each other (phash differs by 1 bit),
// one far away. Threshold 4 should merge the close pair.
let rows = vec![
row(1, "a.jpg", "h1", Some(0b0000)),
row(1, "b.jpg", "h2", Some(0b0001)),
row(1, "c.jpg", "h3", Some(i64::MAX)),
];
let groups = cluster_perceptual(rows, 4);
assert_eq!(groups.len(), 1);
assert_eq!(groups[0].members.len(), 2);
let paths: Vec<&str> = groups[0]
.members
.iter()
.map(|m| m.rel_path.as_str())
.collect();
assert!(paths.contains(&"a.jpg"));
assert!(paths.contains(&"b.jpg"));
}
#[test]
fn cluster_perceptual_threshold_zero_drops_distinct() {
let rows = vec![
row(1, "a.jpg", "h1", Some(0b0000)),
row(1, "b.jpg", "h2", Some(0b0001)),
];
let groups = cluster_perceptual(rows, 0);
assert!(groups.is_empty());
}
#[test]
fn cluster_perceptual_skips_singletons() {
let rows = vec![row(1, "alone.jpg", "h1", Some(0))];
assert!(cluster_perceptual(rows, 8).is_empty());
}
/// Sanity-check the BK-tree's metric, which is what the duplicates
/// path actually clusters on.
#[test]
fn hamming_metric_is_symmetric() {
let m = HammingMetric;
let a = HashKey { phash: 0b1010, idx: 0 };
let b = HashKey { phash: 0b0101, idx: 1 };
let d1 = m.distance(&a, &b);
let d2 = m.distance(&b, &a);
assert_eq!(d1, d2);
assert_eq!(d1, 4);
}
}