From 6620fa48d70fb2ff8c3ec97632c87775817860bf Mon Sep 17 00:00:00 2001 From: Cameron Cordes Date: Mon, 11 May 2026 18:43:11 -0400 Subject: [PATCH] knowledge: consolidation proposals endpoint MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Finds near-duplicate entities the upsert-time cosine guard didn't catch — typically legacy data from before that guard landed, or pairs whose embeddings sit between 0.85 (default proposal floor) and 0.92 (auto-collapse threshold). Pure read-side feature; the actual merging still goes through the existing /knowledge/entities/merge action. New DAO method `find_consolidation_proposals(threshold, max_groups)`: - Loads every non-rejected entity with an embedding. - Partitions by entity_type so a person can't cluster with a place. - Pairwise cosine, edges above threshold feed a union-find for transitive grouping (Sara → Sarah → Sarah J. all land in one cluster). - Tracks min/max cosine per component so the UI can show "how tight" each cluster is before clicking in. - Returns groups of >= 2 sorted by size desc then max cosine desc; trimmed to `max_groups`. New endpoint `GET /knowledge/consolidation-proposals?threshold= &limit=` accepts the threshold (clamped 0.5–0.99 to prevent the "every entity in one mega-cluster" case) and returns groups with per-entity persona fact-count breakdowns baked in — saves the UI a separate query per cluster member. ConsolidationGroup is exported through database/mod.rs so the handler can use it without depending on knowledge_dao internals. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- src/database/knowledge_dao.rs | 185 ++++++++++++++++++++++++++++++++++ src/database/mod.rs | 4 +- src/knowledge.rs | 92 ++++++++++++++++- 3 files changed, 276 insertions(+), 5 deletions(-) diff --git a/src/database/knowledge_dao.rs b/src/database/knowledge_dao.rs index 330ad40..f500011 100644 --- a/src/database/knowledge_dao.rs +++ b/src/database/knowledge_dao.rs @@ -117,6 +117,17 @@ pub struct RecentActivity { pub facts: Vec<EntityFact>, } +/// A near-duplicate cluster found by `find_consolidation_proposals`. +/// `min_cosine` / `max_cosine` are summary stats over the pairwise +/// edges inside the group — gives the curator a sense of "how tight" +/// the cluster is before clicking in. +#[derive(Debug, Clone)] +pub struct ConsolidationGroup { + pub entities: Vec<Entity>, + pub min_cosine: f32, + pub max_cosine: f32, +} + // --------------------------------------------------------------------------- // Trait // --------------------------------------------------------------------------- @@ -167,6 +178,21 @@ pub trait KnowledgeDao: Sync + Send { persona: &PersonaFilter, ) -> Result<(Vec<(Entity, i64)>, i64), DbError>; + /// Find groups of near-duplicate entities that the upsert-time + /// cosine guard didn't catch (it runs at ~0.92; this scan runs + /// at a lower threshold to surface the "probably same" tier that + /// needs human review). Groups are formed via union-find over + /// the cosine-adjacency graph, partitioned by entity_type so a + /// person can't cluster with a place. Returns groups of >= 2 + /// entities, sorted by size desc then by max pairwise cosine. + /// Trimmed to `max_groups`. + fn find_consolidation_proposals( + &mut self, + cx: &opentelemetry::Context, + threshold: f32, + max_groups: usize, + ) -> Result<Vec<ConsolidationGroup>, DbError>; + /// Batch fetch per-persona fact counts for a set of entities, /// scoped to one user. Returns map of entity_id → list of /// (persona_id, count). 
Used by the curation UI to show "this @@ -800,6 +826,165 @@ impl KnowledgeDao for SqliteKnowledgeDao { .map_err(|_| DbError::new(DbErrorKind::QueryError)) } + fn find_consolidation_proposals( + &mut self, + cx: &opentelemetry::Context, + threshold: f32, + max_groups: usize, + ) -> Result<Vec<ConsolidationGroup>, DbError> { + trace_db_call(cx, "query", "find_consolidation_proposals", |_span| { + use schema::entities::dsl::*; + + let mut conn = self.connection.lock().expect("KnowledgeDao lock"); + + // Pull every non-rejected entity with an embedding. We + // keep 'reviewed' rows in the scan because pre-guard + // legacy data still needs cleanup even if the curator + // marked individual entities reviewed. + let rows: Vec<Entity> = entities + .filter(embedding.is_not_null()) + .filter(status.ne("rejected")) + .load::<Entity>(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Query error: {}", e))?; + + // Partition by entity_type so a person can't cluster + // with a place via coincidental embedding closeness. + let mut by_type: std::collections::HashMap<String, Vec<usize>> = + std::collections::HashMap::new(); + for (idx, e) in rows.iter().enumerate() { + by_type.entry(e.entity_type.clone()).or_default().push(idx); + } + + // Decode embeddings once. Skip rows that don't deserialize + // cleanly (corrupted or wrong-dim) rather than failing + // the whole scan. + let mut decoded: Vec<Option<Vec<f32>>> = Vec::with_capacity(rows.len()); + for e in &rows { + let v = e + .embedding + .as_ref() + .and_then(|b| Self::deserialize_embedding(b).ok()) + .filter(|v| !v.is_empty()); + decoded.push(v); + } + + // Union-find for transitive clustering. 
+ struct UF { + parent: Vec<usize>, + } + impl UF { + fn new(n: usize) -> Self { + UF { + parent: (0..n).collect(), + } + } + fn find(&mut self, x: usize) -> usize { + let mut r = x; + while self.parent[r] != r { + r = self.parent[r]; + } + let mut cur = x; + while self.parent[cur] != r { + let nxt = self.parent[cur]; + self.parent[cur] = r; + cur = nxt; + } + r + } + fn union(&mut self, a: usize, b: usize) { + let ra = self.find(a); + let rb = self.find(b); + if ra != rb { + self.parent[ra] = rb; + } + } + } + + let mut uf = UF::new(rows.len()); + let mut group_min: std::collections::HashMap<usize, f32> = + std::collections::HashMap::new(); + let mut group_max: std::collections::HashMap<usize, f32> = + std::collections::HashMap::new(); + + // Single pass: union and update per-component stats in + // one go. Stats are tracked per root; final pass after + // all unions corrects roots that moved. + type Edge = (usize, usize, f32); + let mut edges: Vec<Edge> = Vec::new(); + for indices in by_type.values() { + for a in 0..indices.len() { + let ia = indices[a]; + let va = match &decoded[ia] { + Some(v) => v, + None => continue, + }; + for b in (a + 1)..indices.len() { + let ib = indices[b]; + let vb = match &decoded[ib] { + Some(v) => v, + None => continue, + }; + let sim = Self::cosine_similarity(va, vb); + if sim >= threshold { + uf.union(ia, ib); + edges.push((ia, ib, sim)); + } + } + } + } + // Second pass over the kept edges to populate stats by + // final root (post-union path compression). + for (a, _b, sim) in &edges { + let root = uf.find(*a); + let mn = group_min.entry(root).or_insert(*sim); + if *sim < *mn { + *mn = *sim; + } + let mx = group_max.entry(root).or_insert(*sim); + if *sim > *mx { + *mx = *sim; + } + } + + // Bucket entities by root component, skipping singletons. 
+ let mut groups: std::collections::HashMap<usize, Vec<usize>> = + std::collections::HashMap::new(); + for i in 0..rows.len() { + let root = uf.find(i); + if !group_min.contains_key(&root) { + continue; + } + groups.entry(root).or_default().push(i); + } + + let mut result: Vec<ConsolidationGroup> = groups + .into_iter() + .filter(|(_, members)| members.len() >= 2) + .map(|(root, members)| ConsolidationGroup { + entities: members.into_iter().map(|i| rows[i].clone()).collect(), + min_cosine: *group_min.get(&root).unwrap_or(&0.0), + max_cosine: *group_max.get(&root).unwrap_or(&0.0), + }) + .collect(); + + // Biggest clusters first; tie-break on the strongest + // pair so the most-obvious dupes surface at the top. + result.sort_by(|a, b| { + b.entities + .len() + .cmp(&a.entities.len()) + .then_with(|| { + b.max_cosine + .partial_cmp(&a.max_cosine) + .unwrap_or(std::cmp::Ordering::Equal) + }) + }); + result.truncate(max_groups); + Ok(result) + }) + .map_err(|_| DbError::new(DbErrorKind::QueryError)) + } + fn get_persona_breakdowns_for_entities( + &mut self, + cx: &opentelemetry::Context, diff --git a/src/database/mod.rs b/src/database/mod.rs index 3c2fb5d..5a26048 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -59,8 +59,8 @@ pub use calendar_dao::{CalendarEventDao, SqliteCalendarEventDao}; pub use daily_summary_dao::{DailySummaryDao, InsertDailySummary, SqliteDailySummaryDao}; pub use insights_dao::{InsightDao, SqliteInsightDao}; pub use knowledge_dao::{ -EntityFilter, EntityPatch, EntitySort, FactFilter, FactPatch, KnowledgeDao, PersonaFilter, -RecentActivity, SqliteKnowledgeDao, +ConsolidationGroup, EntityFilter, EntityPatch, EntitySort, FactFilter, FactPatch, KnowledgeDao, +PersonaFilter, RecentActivity, SqliteKnowledgeDao, }; pub use location_dao::{LocationHistoryDao, SqliteLocationHistoryDao}; pub use persona_dao::{ImportPersona, PersonaDao, PersonaPatch, SqlitePersonaDao}; diff --git a/src/knowledge.rs b/src/knowledge.rs index 69d81f4..2e26a53 100644 --- a/src/knowledge.rs +++ 
b/src/knowledge.rs @@ -7,8 +7,8 @@ use std::sync::Mutex; use crate::data::Claims; use crate::database::models::{Entity, EntityFact, EntityPhotoLink, InsertEntityFact}; use crate::database::{ -EntityFilter, EntityPatch, EntitySort, FactFilter, FactPatch, KnowledgeDao, PersonaFilter, -RecentActivity, +ConsolidationGroup, EntityFilter, EntityPatch, EntitySort, FactFilter, FactPatch, KnowledgeDao, +PersonaFilter, RecentActivity, }; use crate::personas::PersonaDaoData; use crate::state::AppState; @@ -330,6 +330,27 @@ pub struct RecentQuery { pub limit: Option<i64>, } +#[derive(Deserialize)] +pub struct ConsolidationQuery { + /// Cosine threshold for clustering. Default 0.85 — looser than + /// the upsert-time guard (0.92) so this view surfaces "probably + /// same" pairs for human review. + pub threshold: Option<f32>, + pub limit: Option<i64>, +} + +#[derive(Serialize)] +pub struct ConsolidationGroupView { + pub entities: Vec<EntitySummary>, + pub min_cosine: f32, + pub max_cosine: f32, +} + +#[derive(Serialize)] +pub struct ConsolidationResponse { + pub groups: Vec<ConsolidationGroupView>, +} + // --------------------------------------------------------------------------- // Service registration // --------------------------------------------------------------------------- @@ -370,7 +391,11 @@ where web::resource("/facts/{id}/restore") .route(web::post().to(restore_fact::<K>)), ) - .service(web::resource("/recent").route(web::get().to(get_recent::<K>))), + .service(web::resource("/recent").route(web::get().to(get_recent::<K>))) + .service( + web::resource("/consolidation-proposals") + .route(web::get().to(get_consolidation_proposals::<K>)), + ), ) } @@ -1146,3 +1171,64 @@ async fn get_recent<K: KnowledgeDao>( } } } + +async fn get_consolidation_proposals<K: KnowledgeDao>( + req: HttpRequest, + claims: Claims, + query: web::Query<ConsolidationQuery>, + dao: web::Data<Mutex<K>>, + persona_dao: PersonaDaoData, +) -> impl Responder { + // Clamp threshold so a curious client can't drag the cosine + // floor to 0 and pull every entity into one giant cluster. 
+ let threshold = query.threshold.unwrap_or(0.85).clamp(0.5, 0.99); + let max_groups = query.limit.unwrap_or(50).clamp(1, 200) as usize; + + let persona = resolve_persona_filter(&req, &claims, &persona_dao); + let cx = opentelemetry::Context::current(); + let mut dao = dao.lock().expect("Unable to lock KnowledgeDao"); + let groups: Vec<ConsolidationGroup> = + match dao.find_consolidation_proposals(&cx, threshold, max_groups) { + Ok(g) => g, + Err(e) => { + log::error!("find_consolidation_proposals: {:?}", e); + return HttpResponse::InternalServerError() + .json(serde_json::json!({"error": "Database error"})); + } + }; + + // Decorate with per-persona fact counts so the curation UI can + // show "default 8 · journal 3" inline and the curator can pick + // which entity is the strongest target. + let entity_ids: Vec<i64> = groups + .iter() + .flat_map(|g| g.entities.iter().map(|e| e.id)) + .collect(); + let breakdowns = dao + .get_persona_breakdowns_for_entities(&cx, &entity_ids, persona.user_id()) + .unwrap_or_default(); + + let groups_view: Vec<ConsolidationGroupView> = groups + .into_iter() + .map(|g| ConsolidationGroupView { + entities: g + .entities + .into_iter() + .map(|e| { + let id = e.id; + let summary = EntitySummary::from(e); + match breakdowns.get(&id) { + Some(bd) => summary.with_persona_breakdown(bd.clone()), + None => summary, + } + }) + .collect(), + min_cosine: g.min_cosine, + max_cosine: g.max_cosine, + }) + .collect(); + + HttpResponse::Ok().json(ConsolidationResponse { + groups: groups_view, + }) +}