knowledge: consolidation proposals endpoint

Finds near-duplicate entities that the upsert-time cosine guard didn't
catch: typically legacy data from before that guard landed, or pairs
whose cosine similarity sits between 0.85 (the default proposal floor)
and 0.92 (the auto-collapse threshold). Pure read-side feature; the
actual merging still goes through the existing
/knowledge/entities/merge action.

New DAO method `find_consolidation_proposals(threshold,
max_groups)`:
  - Loads every non-rejected entity with an embedding.
  - Partitions by entity_type so a person can't cluster with a
    place.
  - Pairwise cosine; edges above the threshold feed a union-find for
    transitive grouping (Sara → Sarah → Sarah J. all land in one
    cluster; see the sketch after this list).
  - Tracks min/max cosine per component so the UI can show "how
    tight" each cluster is before clicking in.
  - Returns groups of >= 2 entities, sorted by size desc then max
    cosine desc, trimmed to `max_groups`.
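
A toy, self-contained sketch of the transitive-grouping rule (names
and vectors are invented for illustration; this is not the DAO code,
which runs over stored embeddings via its own cosine helper):

    fn cosine(a: &[f32], b: &[f32]) -> f32 {
        let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
        let na = a.iter().map(|x| x * x).sum::<f32>().sqrt();
        let nb = b.iter().map(|x| x * x).sum::<f32>().sqrt();
        if na == 0.0 || nb == 0.0 { 0.0 } else { dot / (na * nb) }
    }

    fn main() {
        let sara = [1.00_f32, 0.00];
        let sarah = [0.95, 0.31]; // cosine(sara, sarah) ~ 0.95
        let sarah_j = [0.81, 0.59]; // cosine(sarah, sarah_j) ~ 0.95
        assert!(cosine(&sara, &sarah) >= 0.85);
        assert!(cosine(&sarah, &sarah_j) >= 0.85);
        // The end-to-end pair scores ~0.81, under the 0.85 floor, yet
        // all three share one cluster because unioning the two strong
        // edges is transitive.
        assert!(cosine(&sara, &sarah_j) < 0.85);
    }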

New endpoint
`GET /knowledge/consolidation-proposals?threshold=&limit=` accepts the
threshold (clamped 0.5–0.99 to prevent the "every entity in one
mega-cluster" case) and returns groups with per-entity persona
fact-count breakdowns baked in, saving the UI a separate query per
cluster member.
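
An example request using the default floor and a 20-group cap (the
parameter values are illustrative; the JSON response shape is
described above in prose and not shown in this diff):

    GET /knowledge/consolidation-proposals?threshold=0.85&limit=20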

ConsolidationGroup is exported through database/mod.rs so the
handler can use it without depending on knowledge_dao internals.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

@@ -117,6 +117,17 @@ pub struct RecentActivity {
    pub facts: Vec<EntityFact>,
}
/// A near-duplicate cluster found by `find_consolidation_proposals`.
/// `min_cosine` / `max_cosine` are summary stats over the pairwise
/// edges inside the group — gives the curator a sense of "how tight"
/// the cluster is before clicking in.
#[derive(Debug, Clone)]
pub struct ConsolidationGroup {
    pub entities: Vec<Entity>,
    pub min_cosine: f32,
    pub max_cosine: f32,
}
// ---------------------------------------------------------------------------
// Trait
// ---------------------------------------------------------------------------
@@ -167,6 +178,21 @@ pub trait KnowledgeDao: Sync + Send {
        persona: &PersonaFilter,
    ) -> Result<(Vec<(Entity, i64)>, i64), DbError>;

    /// Find groups of near-duplicate entities that the upsert-time
    /// cosine guard didn't catch (it runs at ~0.92; this scan runs
    /// at a lower threshold to surface the "probably same" tier that
    /// needs human review). Groups are formed via union-find over
    /// the cosine-adjacency graph, partitioned by entity_type so a
    /// person can't cluster with a place. Returns groups of >= 2
    /// entities, sorted by size desc then by max pairwise cosine.
    /// Trimmed to `max_groups`.
    fn find_consolidation_proposals(
        &mut self,
        cx: &opentelemetry::Context,
        threshold: f32,
        max_groups: usize,
    ) -> Result<Vec<ConsolidationGroup>, DbError>;

    /// Batch fetch per-persona fact counts for a set of entities,
    /// scoped to one user. Returns map of entity_id → list of
    /// (persona_id, count). Used by the curation UI to show "this
@@ -800,6 +826,165 @@ impl KnowledgeDao for SqliteKnowledgeDao {
        .map_err(|_| DbError::new(DbErrorKind::QueryError))
    }

    fn find_consolidation_proposals(
        &mut self,
        cx: &opentelemetry::Context,
        threshold: f32,
        max_groups: usize,
    ) -> Result<Vec<ConsolidationGroup>, DbError> {
        trace_db_call(cx, "query", "find_consolidation_proposals", |_span| {
            use schema::entities::dsl::*;
            let mut conn = self.connection.lock().expect("KnowledgeDao lock");
            // Pull every non-rejected entity with an embedding. We
            // keep 'reviewed' rows in the scan because pre-guard
            // legacy data still needs cleanup even if the curator
            // marked individual entities reviewed.
            let rows: Vec<Entity> = entities
                .filter(embedding.is_not_null())
                .filter(status.ne("rejected"))
                .load::<Entity>(conn.deref_mut())
                .map_err(|e| anyhow::anyhow!("Query error: {}", e))?;
            // Partition by entity_type so a person can't cluster
            // with a place via coincidental embedding closeness.
            let mut by_type: std::collections::HashMap<String, Vec<usize>> =
                std::collections::HashMap::new();
            for (idx, e) in rows.iter().enumerate() {
                by_type.entry(e.entity_type.clone()).or_default().push(idx);
            }
            // Decode embeddings once. Skip rows that don't deserialize
            // cleanly (corrupted or wrong-dim) rather than failing
            // the whole scan.
            let mut decoded: Vec<Option<Vec<f32>>> = Vec::with_capacity(rows.len());
            for e in &rows {
                let v = e
                    .embedding
                    .as_ref()
                    .and_then(|b| Self::deserialize_embedding(b).ok())
                    .filter(|v| !v.is_empty());
                decoded.push(v);
            }
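            // `decoded` is index-aligned with `rows`; a None entry means
            // that row is silently skipped during pairing below.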
            // Union-find for transitive clustering.
            struct UF {
                parent: Vec<usize>,
            }
            impl UF {
                fn new(n: usize) -> Self {
                    UF {
                        parent: (0..n).collect(),
                    }
                }
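                // Iterative find with full path compression: walk to the
                // root first, then re-point every node on the path at it.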
                fn find(&mut self, x: usize) -> usize {
                    let mut r = x;
                    while self.parent[r] != r {
                        r = self.parent[r];
                    }
                    let mut cur = x;
                    while self.parent[cur] != r {
                        let nxt = self.parent[cur];
                        self.parent[cur] = r;
                        cur = nxt;
                    }
                    r
                }
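                // Union by arbitrary root (no rank); path compression in
                // `find` keeps trees shallow enough at this scale.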
                fn union(&mut self, a: usize, b: usize) {
                    let ra = self.find(a);
                    let rb = self.find(b);
                    if ra != rb {
                        self.parent[ra] = rb;
                    }
                }
            }
            let mut uf = UF::new(rows.len());
            let mut group_min: std::collections::HashMap<usize, f32> =
                std::collections::HashMap::new();
            let mut group_max: std::collections::HashMap<usize, f32> =
                std::collections::HashMap::new();
            // First pass: pairwise cosine within each type partition,
            // unioning matches as they're found. Kept edges are recorded
            // so min/max stats can be filled in against final roots below.
            type Edge = (usize, usize, f32);
            let mut edges: Vec<Edge> = Vec::new();
            for indices in by_type.values() {
                for a in 0..indices.len() {
                    let ia = indices[a];
                    let va = match &decoded[ia] {
                        Some(v) => v,
                        None => continue,
                    };
                    for b in (a + 1)..indices.len() {
                        let ib = indices[b];
                        let vb = match &decoded[ib] {
                            Some(v) => v,
                            None => continue,
                        };
                        let sim = Self::cosine_similarity(va, vb);
                        if sim >= threshold {
                            uf.union(ia, ib);
                            edges.push((ia, ib, sim));
                        }
                    }
                }
            }
            // Second pass over the kept edges to populate stats by
            // final root (post-union path compression).
            for (a, _b, sim) in &edges {
                let root = uf.find(*a);
                let mn = group_min.entry(root).or_insert(*sim);
                if *sim < *mn {
                    *mn = *sim;
                }
                let mx = group_max.entry(root).or_insert(*sim);
                if *sim > *mx {
                    *mx = *sim;
                }
            }
            // Bucket entities by root component, skipping singletons.
            let mut groups: std::collections::HashMap<usize, Vec<usize>> =
                std::collections::HashMap::new();
            for i in 0..rows.len() {
                let root = uf.find(i);
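                // Only roots that gained at least one kept edge have a
                // stats entry, so this check also drops singletons.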
                if !group_min.contains_key(&root) {
                    continue;
                }
                groups.entry(root).or_default().push(i);
            }
            let mut result: Vec<ConsolidationGroup> = groups
                .into_iter()
                .filter(|(_, members)| members.len() >= 2)
                .map(|(root, members)| ConsolidationGroup {
                    entities: members.into_iter().map(|i| rows[i].clone()).collect(),
                    min_cosine: *group_min.get(&root).unwrap_or(&0.0),
                    max_cosine: *group_max.get(&root).unwrap_or(&0.0),
                })
                .collect();
            // Biggest clusters first; tie-break on the strongest
            // pair so the most-obvious dupes surface at the top.
            result.sort_by(|a, b| {
                b.entities
                    .len()
                    .cmp(&a.entities.len())
                    .then_with(|| {
                        b.max_cosine
                            .partial_cmp(&a.max_cosine)
                            .unwrap_or(std::cmp::Ordering::Equal)
                    })
            });
            result.truncate(max_groups);
            Ok(result)
        })
        .map_err(|_| DbError::new(DbErrorKind::QueryError))
    }
    fn get_persona_breakdowns_for_entities(
        &mut self,
        cx: &opentelemetry::Context,