From d7aee4f228ce81ce0d5bbd2be4c4b40f9e2c0307 Mon Sep 17 00:00:00 2001 From: Cameron Cordes Date: Sun, 10 May 2026 15:16:05 -0400 Subject: [PATCH 01/19] knowledge: cosine dedup, fact create endpoint, recall nudge Phase 1 of the knowledge curation work. Three small server-side changes to support an Apollo-side curation surface and reduce the agent's near- duplicate output rate going forward: - upsert_entity grows an embedding-cosine fallback after the exact name match misses. New entities whose embedding sits above ENTITY_DEDUP_COSINE_THRESHOLD (default 0.92) against any same-type active entity collapse onto the existing row. Eliminates the Sarah / Sara / Sarah J. trio the FTS5 prefix check was missing. - POST /knowledge/facts symmetric with the existing PATCH/DELETE so the curation UI can create facts directly. Persona-scoped via X-Persona-Id; validates subject (and optional object) entity existence; reuses KnowledgeDao::upsert_fact so corroboration semantics match the agent path. - One sentence in build_system_content telling the agent to call recall_entities before store_entity when a name resembles something already known. Cheap; complements the DAO-layer guard. Includes upsert_entity_collapses_near_duplicate_by_embedding test covering both the collapse-on-near-match path and the don't-collapse-on- unrelated-embedding path. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- src/ai/insight_generator.rs | 1 + src/database/knowledge_dao.rs | 142 +++++++++++++++++++++++++++++++++- src/knowledge.rs | 112 ++++++++++++++++++++++++++- 3 files changed, 252 insertions(+), 3 deletions(-) diff --git a/src/ai/insight_generator.rs b/src/ai/insight_generator.rs index 1e39f9a..49528f7 100644 --- a/src/ai/insight_generator.rs +++ b/src/ai/insight_generator.rs @@ -3204,6 +3204,7 @@ Return ONLY the summary, nothing else."#, — surrounding events matter even when a contact is known.\n\ - Use recall_facts_for_photo + recall_entities to load any prior knowledge about subjects in the photo.\n\ - When you identify people / places / events / things, use store_entity + store_fact to grow the persistent memory.\n\ + - Before store_entity, call recall_entities to check whether a similar name already exists; reuse the existing entity_id rather than creating a near-duplicate (e.g. \"Sara\" vs \"Sarah J.\"). The DAO will collapse obvious cosine matches, but choosing the existing id keeps facts and photo links consolidated.\n\ - A tool returning no results is informative; continue with the others.", ); diff --git a/src/database/knowledge_dao.rs b/src/database/knowledge_dao.rs index f807f23..ffc368f 100644 --- a/src/database/knowledge_dao.rs +++ b/src/database/knowledge_dao.rs @@ -282,6 +282,20 @@ impl SqliteKnowledgeDao { } } +/// Cosine-similarity threshold above which a new entity collapses into an +/// existing same-type entity at upsert time. The agent's pre-flight name +/// search uses FTS5 prefix tokens, which misses near-dupes like +/// "Sarah" / "Sara" / "Sarah J." that share a description-rich embedding. +/// Override via `ENTITY_DEDUP_COSINE_THRESHOLD` env var when tuning. 
+const ENTITY_DEDUP_COSINE_THRESHOLD_DEFAULT: f32 = 0.92; + +fn entity_dedup_cosine_threshold() -> f32 { + std::env::var("ENTITY_DEDUP_COSINE_THRESHOLD") + .ok() + .and_then(|v| v.parse::<f32>().ok()) + .unwrap_or(ENTITY_DEDUP_COSINE_THRESHOLD_DEFAULT) +} + impl KnowledgeDao for SqliteKnowledgeDao { // ----------------------------------------------------------------------- // Entity operations @@ -308,7 +322,7 @@ impl KnowledgeDao for SqliteKnowledgeDao { // Use lower() on both sides so existing dirty rows ("Person") still match. let name_lower = entity.name.to_lowercase(); let type_lower = entity.entity_type.to_lowercase(); - let existing: Option<Entity> = entities + let mut existing: Option<Entity> = entities .filter(diesel::dsl::sql::<diesel::sql_types::Bool>(&format!( "lower(name) = '{}' AND lower(entity_type) = '{}'", name_lower.replace('\'', "''"), @@ -318,6 +332,49 @@ impl KnowledgeDao for SqliteKnowledgeDao { .optional() .map_err(|e| anyhow::anyhow!("Query error: {}", e))?; + // Fuzzy-match fallback: if no exact name match and the incoming + // entity carries an embedding, compare against same-type entities' + // embeddings and collapse if any are above the cosine threshold.
+ if existing.is_none() + && let Some(new_emb_bytes) = entity.embedding.as_ref() + && let Ok(new_vec) = Self::deserialize_embedding(new_emb_bytes) + && !new_vec.is_empty() + { + let threshold = entity_dedup_cosine_threshold(); + let candidates: Vec<Entity> = entities + .filter(embedding.is_not_null()) + .filter(entity_type.eq(&entity.entity_type)) + .filter(status.ne("rejected")) + .load::<Entity>(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Query error: {}", e))?; + + let mut best: Option<(Entity, f32)> = None; + for cand in candidates { + let Some(cand_bytes) = cand.embedding.as_ref() else { + continue; + }; + let Ok(cand_vec) = Self::deserialize_embedding(cand_bytes) else { + continue; + }; + let sim = Self::cosine_similarity(&new_vec, &cand_vec); + if sim >= threshold && best.as_ref().is_none_or(|(_, s)| sim > *s) { + best = Some((cand, sim)); + } + } + + if let Some((cand, sim)) = best { + log::info!( + "entity dedup: collapsing new '{}' ({}) into existing '{}' (id={}, cos={:.3})", + entity.name, + entity.entity_type, + cand.name, + cand.id, + sim + ); + existing = Some(cand); + } + } + if let Some(existing_entity) = existing { // Update description, embedding, updated_at diesel::update(entities.filter(id.eq(existing_entity.id))) @@ -1276,4 +1333,87 @@ mod tests { ); } + + #[test] + fn upsert_entity_collapses_near_duplicate_by_embedding() { + // The agent's pre-flight check uses FTS5 prefix tokens, which + // miss "Sarah" / "Sara" / "Sarah J." pairs. The DAO upsert is + // the safety net: if no exact (name, type) match but the new + // entity's embedding sits above the cosine threshold against an + // existing same-type entity, we collapse instead of inserting.
+ let cx = opentelemetry::Context::new(); + let conn = connection_with_fks_on(); + let mut dao = SqliteKnowledgeDao::from_connection(conn.clone()); + + let mut emb_a = vec![0.0_f32; 64]; + emb_a[0] = 1.0; + emb_a[1] = 0.5; + let mut emb_b_near = emb_a.clone(); + emb_b_near[2] = 0.05; // nudge — cosine still well above 0.92 + + // Seed an existing entity with the embedding. + let seeded = dao + .upsert_entity( + &cx, + InsertEntity { + name: "Sarah".to_string(), + entity_type: "person".to_string(), + description: "tagged friend".to_string(), + embedding: Some(SqliteKnowledgeDao::serialize_embedding(&emb_a)), + confidence: 0.6, + status: "active".to_string(), + created_at: 0, + updated_at: 0, + }, + ) + .unwrap(); + + // A "different name" with a near-identical embedding should + // collapse onto the existing row, not create a new entity. + let collapsed = dao + .upsert_entity( + &cx, + InsertEntity { + name: "Sara".to_string(), + entity_type: "person".to_string(), + description: "tagged friend".to_string(), + embedding: Some(SqliteKnowledgeDao::serialize_embedding(&emb_b_near)), + confidence: 0.6, + status: "active".to_string(), + created_at: 0, + updated_at: 0, + }, + ) + .unwrap(); + + assert_eq!( + collapsed.id, seeded.id, + "near-duplicate by cosine should reuse the existing entity id" + ); + + // And a clearly-different embedding under a different name should + // still create a new row. 
+ let mut emb_unrelated = vec![0.0_f32; 64]; + emb_unrelated[10] = 1.0; + let distinct = dao + .upsert_entity( + &cx, + InsertEntity { + name: "Bob".to_string(), + entity_type: "person".to_string(), + description: String::new(), + embedding: Some(SqliteKnowledgeDao::serialize_embedding(&emb_unrelated)), + confidence: 0.6, + status: "active".to_string(), + created_at: 0, + updated_at: 0, + }, + ) + .unwrap(); + + assert_ne!( + distinct.id, seeded.id, + "unrelated embedding should not collapse" + ); + } } diff --git a/src/knowledge.rs b/src/knowledge.rs index 30ceabd..f37ed1e 100644 --- a/src/knowledge.rs +++ b/src/knowledge.rs @@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize}; use std::sync::Mutex; use crate::data::Claims; -use crate::database::models::{Entity, EntityFact, EntityPhotoLink}; +use crate::database::models::{Entity, EntityFact, EntityPhotoLink, InsertEntityFact}; use crate::database::{ EntityFilter, EntityPatch, FactFilter, FactPatch, KnowledgeDao, PersonaFilter, RecentActivity, }; @@ -179,6 +179,16 @@ pub struct FactPatchRequest { pub confidence: Option, } +#[derive(Deserialize)] +pub struct FactCreateRequest { + pub subject_entity_id: i32, + pub predicate: String, + pub object_entity_id: Option, + pub object_value: Option, + pub source_photo: Option, + pub confidence: Option, +} + #[derive(Deserialize)] pub struct EntityListQuery { #[serde(rename = "type")] @@ -222,7 +232,11 @@ where .route(web::patch().to(patch_entity::)) .route(web::delete().to(delete_entity::)), ) - .service(web::resource("/facts").route(web::get().to(list_facts::))) + .service( + web::resource("/facts") + .route(web::get().to(list_facts::)) + .route(web::post().to(create_fact::)), + ) .service( web::resource("/facts/{id}") .route(web::patch().to(patch_fact::)) @@ -535,6 +549,100 @@ async fn list_facts( } } +async fn create_fact( + req: HttpRequest, + claims: Claims, + body: web::Json, + dao: web::Data>, + persona_dao: PersonaDaoData, +) -> impl Responder { + if 
body.object_entity_id.is_none() && body.object_value.is_none() { + return HttpResponse::BadRequest().json(serde_json::json!({ + "error": "object_entity_id or object_value is required" + })); + } + if body.predicate.trim().is_empty() { + return HttpResponse::BadRequest() + .json(serde_json::json!({"error": "predicate must not be empty"})); + } + + // Persona scoping: facts are written under the active single persona. + // PersonaFilter::All is read-only ("hive-mind" view); callers should + // pin a specific persona for writes via X-Persona-Id. + let persona = resolve_persona_filter(&req, &claims, &persona_dao); + let (user_id, persona_id) = match &persona { + PersonaFilter::Single { user_id, persona_id } => (*user_id, persona_id.clone()), + PersonaFilter::All { user_id } => (*user_id, "default".to_string()), + }; + + let cx = opentelemetry::Context::current(); + let mut dao = dao.lock().expect("Unable to lock KnowledgeDao"); + + // Verify subject entity exists. + match dao.get_entity_by_id(&cx, body.subject_entity_id) { + Ok(None) => { + return HttpResponse::BadRequest().json(serde_json::json!({ + "error": format!("Subject entity {} not found", body.subject_entity_id) + })); + } + Err(e) => { + log::error!("create_fact subject lookup error: {:?}", e); + return HttpResponse::InternalServerError() + .json(serde_json::json!({"error": "Database error"})); + } + Ok(Some(_)) => {} + } + + // Optional object entity validation when supplied. 
+ if let Some(oid) = body.object_entity_id { + match dao.get_entity_by_id(&cx, oid) { + Ok(None) => { + return HttpResponse::BadRequest().json(serde_json::json!({ + "error": format!("Object entity {} not found", oid) + })); + } + Err(e) => { + log::error!("create_fact object lookup error: {:?}", e); + return HttpResponse::InternalServerError() + .json(serde_json::json!({"error": "Database error"})); + } + Ok(Some(_)) => {} + } + } + + let now = Utc::now().timestamp(); + let confidence = body.confidence.unwrap_or(0.6).clamp(0.0, 0.95); + + let insert = InsertEntityFact { + subject_entity_id: body.subject_entity_id, + predicate: body.predicate.trim().to_string(), + object_entity_id: body.object_entity_id, + object_value: body.object_value.clone(), + source_photo: body.source_photo.clone(), + source_insight_id: None, + confidence, + status: "active".to_string(), + created_at: now, + persona_id, + user_id, + }; + + match dao.upsert_fact(&cx, insert) { + Ok((fact, is_new)) => { + let status = if is_new { + actix_web::http::StatusCode::CREATED + } else { + actix_web::http::StatusCode::OK + }; + HttpResponse::build(status).json(fact) + } + Err(e) => { + log::error!("create_fact upsert error: {:?}", e); + HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"})) + } + } +} + async fn patch_fact( _claims: Claims, id: web::Path, -- 2.49.1 From f7ce3d2b226a5c950528c762adcb58c2fb457af1 Mon Sep 17 00:00:00 2001 From: Cameron Cordes Date: Sun, 10 May 2026 15:19:37 -0400 Subject: [PATCH 02/19] knowledge: include library_id in photo_links response The PhotoLinkDetail in /knowledge/entities/{id} was dropping the library_id field, leaving consumers no way to construct a content-routed thumbnail URL. Apollo's curation screen was falling through to library=0 (the FastAPI default) and getting 400s. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- src/knowledge.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/knowledge.rs b/src/knowledge.rs index f37ed1e..49ff307 100644 --- a/src/knowledge.rs +++ b/src/knowledge.rs @@ -98,6 +98,7 @@ pub struct FactDetail { #[derive(Serialize)] pub struct PhotoLinkDetail { + pub library_id: i32, pub file_path: String, pub role: String, } @@ -105,6 +106,7 @@ pub struct PhotoLinkDetail { impl From for PhotoLinkDetail { fn from(l: EntityPhotoLink) -> Self { PhotoLinkDetail { + library_id: l.library_id, file_path: l.file_path, role: l.role, } -- 2.49.1 From 0e2b18224f26b18bdb26e038c61275fd27683f6a Mon Sep 17 00:00:00 2001 From: Cameron Cordes Date: Sun, 10 May 2026 15:44:38 -0400 Subject: [PATCH 03/19] knowledge: pre-delete relational facts so entity delete succeeds MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit DELETE /knowledge/entities/{id} was 500ing on any entity that was the object of a relational fact. entity_facts.object_entity_id has ON DELETE SET NULL, but the table also has CHECK (object_entity_id IS NOT NULL OR object_value IS NOT NULL) — purely relational facts (subject + predicate + object_entity_id, no object_value, like "Alice is_friend_of Bob") would have both NULL after SET NULL fired, the CHECK would abort, and the whole DELETE would fail with a CHECK violation. The user just saw QueryError because the DAO swallowed the diesel error string. Wrap delete_entity in a transaction that first deletes facts where the entity is the object AND object_value is null, then deletes the entity. Surviving siblings (typed facts about the entity as subject) are CASCADE'd by the FK as before. Also start surfacing the actual diesel error in a warn log before collapsing to DbErrorKind so future similar issues don't masquerade as the opaque QueryError. 
A schema-level fix (changing object FK to ON DELETE CASCADE via a table-rebuild migration) is the cleaner long-term resolution and is slated for Phase 2; the DAO-side pre-delete is sufficient and less invasive in the meantime. Test pins the contract: a relational fact pointing at the deleted entity is removed, an unrelated typed fact about an unrelated entity survives, and the entity itself is deleted. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/database/knowledge_dao.rs | 110 ++++++++++++++++++++++++++++++++-- 1 file changed, 105 insertions(+), 5 deletions(-) diff --git a/src/database/knowledge_dao.rs b/src/database/knowledge_dao.rs index ffc368f..091cd9a 100644 --- a/src/database/knowledge_dao.rs +++ b/src/database/knowledge_dao.rs @@ -601,12 +601,41 @@ impl KnowledgeDao for SqliteKnowledgeDao { trace_db_call(cx, "delete", "delete_entity", |_span| { use schema::entities::dsl::*; let mut conn = self.connection.lock().expect("KnowledgeDao lock"); - diesel::delete(entities.filter(id.eq(entity_id))) - .execute(conn.deref_mut()) - .map(|_| ()) - .map_err(|e| anyhow::anyhow!("Delete error: {}", e)) + + // entity_facts has a CHECK constraint requiring + // `object_entity_id IS NOT NULL OR object_value IS NOT NULL`. + // The FK on object_entity_id is ON DELETE SET NULL — but + // facts that pointed at the deleted entity *only* via the + // entity reference (the common case for relational facts + // like "Alice is_friend_of Bob") have no object_value, so + // SET NULL would leave them with both NULLs and the CHECK + // aborts the whole DELETE. Pre-delete those facts in a + // transaction so the CASCADE / SET NULL chain on what + // remains can fire cleanly. + // + // Long-term fix is to change the FK to ON DELETE CASCADE + // via a table-rebuild migration, but the DAO-side workaround + // is sufficient and less invasive. 
+ conn.transaction::<(), diesel::result::Error, _>(|conn| { + use schema::entity_facts::dsl as ef; + diesel::delete( + ef::entity_facts + .filter(ef::object_entity_id.eq(entity_id)) + .filter(ef::object_value.is_null()), + ) + .execute(conn)?; + + diesel::delete(entities.filter(id.eq(entity_id))).execute(conn)?; + Ok(()) + }) + .map_err(|e| anyhow::anyhow!("Delete error: {}", e)) + }) + .map_err(|e| { + // Surface the actual diesel error string before collapsing + // to the opaque DbErrorKind::QueryError. + log::warn!("delete_entity({}) failed: {}", entity_id, e); + DbError::new(DbErrorKind::QueryError) }) - .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn merge_entities( @@ -1334,6 +1363,77 @@ mod tests { ); } + #[test] + fn delete_entity_clears_relational_facts_that_would_violate_check() { + // entity_facts has a CHECK that at least one of object_entity_id / + // object_value is non-null. The FK on object_entity_id is + // ON DELETE SET NULL, which would leave purely-relational facts + // (subject + predicate + object_entity_id, no object_value) + // with both nulls and abort the delete. The DAO pre-deletes + // those rows in a transaction so the parent delete can succeed. + let cx = opentelemetry::Context::new(); + let conn = connection_with_fks_on(); + let alice = create_user(&conn, "alice"); + create_persona_row(&conn, alice, "default"); + + let mut dao = SqliteKnowledgeDao::from_connection(conn.clone()); + let bob = make_entity(&mut dao, "Bob"); + let carol = make_entity(&mut dao, "Carol"); + + // A relational fact where Carol is the object — exactly the + // shape the CHECK + SET NULL combination would otherwise break. 
+ let (rel_fact, _) = dao + .upsert_fact( + &cx, + InsertEntityFact { + subject_entity_id: bob.id, + predicate: "is_friend_of".to_string(), + object_entity_id: Some(carol.id), + object_value: None, + source_photo: None, + source_insight_id: None, + confidence: 0.6, + status: "active".to_string(), + created_at: 0, + persona_id: "default".to_string(), + user_id: alice, + }, + ) + .unwrap(); + + // A typed fact where Bob is the subject — should survive. + add_fact(&mut dao, bob.id, "has_age", "30", alice, "default"); + + // Delete Carol — should succeed (relational fact pre-deleted). + dao.delete_entity(&cx, carol.id).unwrap(); + + assert!( + dao.get_entity_by_id(&cx, carol.id).unwrap().is_none(), + "Carol should be deleted" + ); + // The relational fact about Carol should be gone (pre-deleted by + // the DAO's transaction, not SET NULL'd). + let bob_facts = dao + .get_facts_for_entity( + &cx, + bob.id, + &PersonaFilter::Single { + user_id: alice, + persona_id: "default".to_string(), + }, + ) + .unwrap(); + assert!( + !bob_facts.iter().any(|f| f.id == rel_fact.id), + "relational fact pointing at Carol should be removed" + ); + // The typed fact survives. + assert!( + bob_facts.iter().any(|f| f.predicate == "has_age"), + "typed fact about Bob should survive Carol's deletion" + ); + } + #[test] fn upsert_entity_collapses_near_duplicate_by_embedding() { // The agent's pre-flight check uses FTS5 prefix tokens, which -- 2.49.1 From 0b8478a5e46164193ba3645996aadbf3e31aca57 Mon Sep 17 00:00:00 2001 From: Cameron Cordes Date: Sun, 10 May 2026 16:04:13 -0400 Subject: [PATCH 04/19] knowledge: list sort + persona-scoped fact_count per entity MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two related additions to /knowledge/entities: - New EntitySort enum (UpdatedDesc default, NameAsc, FactCountDesc) surfaced via `?sort=updated|name|count`. 
NameAsc clusters near- duplicate names so dupes stand out at a glance; FactCountDesc surfaces heavily-used entities and demotes 0-fact noise to the bottom. - New `list_entities_with_fact_counts` DAO method that returns each entity alongside a persona-scoped count of its non-rejected facts (subject side). Persona scope follows X-Persona-Id via the existing resolve_persona_filter chain — Single filters on (user_id, persona_id), All unions across the user's personas. Implemented as one raw SQL query with a LEFT JOIN to a fact-count subquery and ORDER BY tied to the chosen sort, so count-sort needs no second round trip. The agent's existing list_entities call site is unchanged — it doesn't need persona-scoped counts and the trait method stays cheap. EntitySummary grows an Option<i64> fact_count (skip_serializing_if none) so PATCH responses stay shaped as before. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/database/knowledge_dao.rs | 210 ++++++++++++++++++++++++++++++++++ src/database/mod.rs | 4 +- src/knowledge.rs | 43 ++++++- 3 files changed, 249 insertions(+), 8 deletions(-) diff --git a/src/database/knowledge_dao.rs b/src/database/knowledge_dao.rs index 091cd9a..70ff711 100644 --- a/src/database/knowledge_dao.rs +++ b/src/database/knowledge_dao.rs @@ -45,6 +45,17 @@ pub struct EntityFilter { pub offset: i64, } +/// Sort key for the curation list. Name = alphabetical clustering +/// (good for spotting near-duplicates like Sara / Sarah / Sarah J.). +/// FactCount = surface heavily-used entities first, demote 0-fact +/// noise to the bottom. UpdatedDesc = legacy "newest activity first".
+#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum EntitySort { + UpdatedDesc, + NameAsc, + FactCountDesc, +} + pub struct FactFilter { pub entity_id: Option<i32>, /// "active" | "reviewed" | "rejected" | "all" @@ -134,6 +145,19 @@ pub trait KnowledgeDao: Sync + Send { filter: EntityFilter, ) -> Result<(Vec<Entity>, i64), DbError>; + /// List entities alongside a persona-scoped fact count for each. + /// Powers the curation surface — sorting by fact count surfaces + /// the heavily-used entities and demotes 0-fact noise. Counting + /// is restricted to non-rejected facts under the active persona + /// scope so a switch in the persona picker re-orders the list. + fn list_entities_with_fact_counts( + &mut self, + cx: &opentelemetry::Context, + filter: EntityFilter, + sort: EntitySort, + persona: &PersonaFilter, + ) -> Result<(Vec<(Entity, i64)>, i64), DbError>; + fn update_entity_status( &mut self, cx: &opentelemetry::Context, @@ -529,6 +553,192 @@ impl KnowledgeDao for SqliteKnowledgeDao { .map_err(|_| DbError::new(DbErrorKind::QueryError)) } + fn list_entities_with_fact_counts( + &mut self, + cx: &opentelemetry::Context, + filter: EntityFilter, + sort: EntitySort, + persona: &PersonaFilter, + ) -> Result<(Vec<(Entity, i64)>, i64), DbError> { + trace_db_call(cx, "query", "list_entities_with_fact_counts", |_span| { + use diesel::sql_query; + use diesel::sql_types::{BigInt, Integer, Text}; + + let mut conn = self.connection.lock().expect("KnowledgeDao lock"); + + // Build WHERE fragments. Inline-safe values are bound; status + // / sort keywords are validated against fixed sets.
+ let mut where_parts: Vec<String> = Vec::new(); + let mut bind_types: Vec<&'static str> = Vec::new(); + let mut bind_strs: Vec<String> = Vec::new(); + + if filter.entity_type.is_some() { + where_parts.push("e.entity_type = ?".to_string()); + bind_types.push("text"); + bind_strs.push(filter.entity_type.clone().unwrap()); + } + + let status_val = filter.status.as_deref().unwrap_or("active"); + if status_val != "all" { + where_parts.push("e.status = ?".to_string()); + bind_types.push("text"); + bind_strs.push(status_val.to_string()); + } + + if let Some(ref s) = filter.search { + where_parts.push("(e.name LIKE ? OR e.description LIKE ?)".to_string()); + bind_types.push("text"); + bind_types.push("text"); + let pat = format!("%{}%", s); + bind_strs.push(pat.clone()); + bind_strs.push(pat); + } + + let where_clause = if where_parts.is_empty() { + String::new() + } else { + format!("WHERE {}", where_parts.join(" AND ")) + }; + + // Persona-scoped fact-count subquery. Single = filter on + // (user_id, persona_id); All = union across the user's + // personas (mirror PersonaFilter::All read semantics). + let fact_count_join = match persona { + PersonaFilter::Single { user_id: _, persona_id: _ } => { + "LEFT JOIN (\ SELECT subject_entity_id, COUNT(*) AS fact_count \ FROM entity_facts \ WHERE user_id = ? AND persona_id = ? AND status != 'rejected' \ GROUP BY subject_entity_id\ ) fc ON fc.subject_entity_id = e.id" + } + PersonaFilter::All { user_id: _ } => { + "LEFT JOIN (\ SELECT subject_entity_id, COUNT(*) AS fact_count \ FROM entity_facts \ WHERE user_id = ? 
AND status != 'rejected' \ + GROUP BY subject_entity_id\ + ) fc ON fc.subject_entity_id = e.id" + } + }; + + let order_by = match sort { + EntitySort::UpdatedDesc => "e.updated_at DESC", + EntitySort::NameAsc => "lower(e.name) ASC", + EntitySort::FactCountDesc => { + "COALESCE(fc.fact_count, 0) DESC, lower(e.name) ASC" + } + }; + + let select_sql = format!( + "SELECT e.id, e.name, e.entity_type, e.description, e.embedding, \ + e.confidence, e.status, e.created_at, e.updated_at, \ + COALESCE(fc.fact_count, 0) AS fact_count \ + FROM entities e \ + {fact_count_join} \ + {where_clause} \ + ORDER BY {order_by} \ + LIMIT ? OFFSET ?" + ); + + let count_sql = format!( + "SELECT COUNT(*) AS total FROM entities e {where_clause}" + ); + + // ── Total count ───────────────────────────────────────── + #[derive(diesel::QueryableByName)] + struct TotalRow { + #[diesel(sql_type = BigInt)] + total: i64, + } + let mut count_q = sql_query(count_sql).into_boxed(); + for s in &bind_strs { + count_q = count_q.bind::<Text, _>(s.clone()); + } + let total: i64 = count_q + .get_result::<TotalRow>(conn.deref_mut()) + .map(|r| r.total) + .unwrap_or(0); + + // ── Page query ────────────────────────────────────────── + #[derive(diesel::QueryableByName)] + struct EntityWithCountRow { + #[diesel(sql_type = Integer)] + id: i32, + #[diesel(sql_type = Text)] + name: String, + #[diesel(sql_type = Text)] + entity_type: String, + #[diesel(sql_type = Text)] + description: String, + #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Binary>)] + embedding: Option<Vec<u8>>, + #[diesel(sql_type = diesel::sql_types::Float)] + confidence: f32, + #[diesel(sql_type = Text)] + status: String, + #[diesel(sql_type = BigInt)] + created_at: i64, + #[diesel(sql_type = BigInt)] + updated_at: i64, + #[diesel(sql_type = BigInt)] + fact_count: i64, + } + + let mut q = sql_query(select_sql).into_boxed(); + // Persona binds first (they're earlier in the SQL — inside + // the subquery LEFT JOIN).
+ match persona { + PersonaFilter::Single { user_id, persona_id } => { + q = q + .bind::<Integer, _>(*user_id) + .bind::<Text, _>(persona_id.clone()); + } + PersonaFilter::All { user_id } => { + q = q.bind::<Integer, _>(*user_id); + } + } + // Then WHERE binds in order. + for s in &bind_strs { + q = q.bind::<Text, _>(s.clone()); + } + // Then LIMIT / OFFSET. + q = q + .bind::<BigInt, _>(filter.limit) + .bind::<BigInt, _>(filter.offset); + + let rows: Vec<EntityWithCountRow> = q + .load(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Query error: {}", e))?; + + let pairs: Vec<(Entity, i64)> = rows + .into_iter() + .map(|r| { + ( + Entity { + id: r.id, + name: r.name, + entity_type: r.entity_type, + description: r.description, + embedding: r.embedding, + confidence: r.confidence, + status: r.status, + created_at: r.created_at, + updated_at: r.updated_at, + }, + r.fact_count, + ) + }) + .collect(); + + // Sink unused `_bind_types`; keeping it as documentation. + let _ = bind_types; + + Ok((pairs, total)) + }) + .map_err(|_| DbError::new(DbErrorKind::QueryError)) + } + fn update_entity_status( &mut self, cx: &opentelemetry::Context, diff --git a/src/database/mod.rs b/src/database/mod.rs index d5dd9cb..3c2fb5d 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -59,8 +59,8 @@ pub use calendar_dao::{CalendarEventDao, SqliteCalendarEventDao}; pub use daily_summary_dao::{DailySummaryDao, InsertDailySummary, SqliteDailySummaryDao}; pub use insights_dao::{InsightDao, SqliteInsightDao}; pub use knowledge_dao::{ - EntityFilter, EntityPatch, FactFilter, FactPatch, KnowledgeDao, PersonaFilter, RecentActivity, - SqliteKnowledgeDao, + EntityFilter, EntityPatch, EntitySort, FactFilter, FactPatch, KnowledgeDao, PersonaFilter, + RecentActivity, SqliteKnowledgeDao, }; pub use location_dao::{LocationHistoryDao, SqliteLocationHistoryDao}; pub use persona_dao::{ImportPersona, PersonaDao, PersonaPatch, SqlitePersonaDao}; diff --git a/src/knowledge.rs b/src/knowledge.rs index 49ff307..b8fc4ba 100644 --- a/src/knowledge.rs +++ b/src/knowledge.rs @@ -5,7 +5,7 @@ 
 use std::sync::Mutex;
 
 use crate::data::Claims;
 use crate::database::models::{Entity, EntityFact, EntityPhotoLink, InsertEntityFact};
 use crate::database::{
-    EntityFilter, EntityPatch, FactFilter, FactPatch, KnowledgeDao, PersonaFilter, RecentActivity,
+    EntityFilter, EntityPatch, EntitySort, FactFilter, FactPatch, KnowledgeDao, PersonaFilter,
+    RecentActivity,
 };
 use crate::personas::PersonaDaoData;
@@ -57,6 +58,11 @@ pub struct EntitySummary {
     pub status: String,
     pub created_at: i64,
     pub updated_at: i64,
+    /// Persona-scoped count of non-rejected facts about this entity
+    /// (subject side). 0 when not provided by the call site, e.g.
+    /// PATCH responses return the bare entity without scoping context.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub fact_count: Option<i64>,
 }
 
 impl From<Entity> for EntitySummary {
@@ -70,10 +76,19 @@ impl From<Entity> for EntitySummary {
             status: e.status,
             created_at: e.created_at,
             updated_at: e.updated_at,
+            fact_count: None,
         }
     }
 }
 
+impl EntitySummary {
+    fn from_entity_with_count(e: Entity, fact_count: i64) -> Self {
+        let mut s = EntitySummary::from(e);
+        s.fact_count = Some(fact_count);
+        s
+    }
+}
+
 #[derive(Serialize)]
 pub struct EntityListResponse {
     pub entities: Vec<EntitySummary>,
@@ -197,6 +212,9 @@ pub struct EntityListQuery {
     pub entity_type: Option<String>,
     pub status: Option<String>,
     pub search: Option<String>,
+    /// "updated" (default) | "name" | "count". `count` is persona-scoped
+    /// via the X-Persona-Id header.
+ pub sort: Option, pub limit: Option, pub offset: Option, } @@ -253,9 +271,11 @@ where // --------------------------------------------------------------------------- async fn list_entities( - _claims: Claims, + req: HttpRequest, + claims: Claims, query: web::Query, dao: web::Data>, + persona_dao: PersonaDaoData, ) -> impl Responder { let limit = query.limit.unwrap_or(50).min(200); let offset = query.offset.unwrap_or(0); @@ -266,6 +286,15 @@ async fn list_entities( Some(s) => Some(s.to_string()), }; + let sort = match query.sort.as_deref() { + Some("name") => EntitySort::NameAsc, + Some("count") => EntitySort::FactCountDesc, + // "updated" or anything else falls through to the default. + _ => EntitySort::UpdatedDesc, + }; + + let persona = resolve_persona_filter(&req, &claims, &persona_dao); + let filter = EntityFilter { entity_type: query.entity_type.clone(), status: status_filter, @@ -276,10 +305,12 @@ async fn list_entities( let cx = opentelemetry::Context::current(); let mut dao = dao.lock().expect("Unable to lock KnowledgeDao"); - match dao.list_entities(&cx, filter) { - Ok((entities, total)) => { - let summaries: Vec = - entities.into_iter().map(EntitySummary::from).collect(); + match dao.list_entities_with_fact_counts(&cx, filter, sort, &persona) { + Ok((pairs, total)) => { + let summaries: Vec = pairs + .into_iter() + .map(|(e, c)| EntitySummary::from_entity_with_count(e, c)) + .collect(); HttpResponse::Ok().json(EntityListResponse { entities: summaries, total, -- 2.49.1 From bcd5312953d7c224b9b6e305e8cb2b9e71a20e6a Mon Sep 17 00:00:00 2001 From: Cameron Cordes Date: Sun, 10 May 2026 19:14:58 -0400 Subject: [PATCH 05/19] knowledge: detect same-predicate object conflicts at read time MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit GET /knowledge/entities/{id} now flags facts as `in_conflict` when another active fact shares the same predicate but disagrees on the object (entity id or text value). 
Pure read-time computation in the handler — group facts by predicate, distinct-object count > 1 flags all members. No schema change; same shape as `is_current` on photo insights. The flag is intentionally a *signal*, not a hard constraint. Some predicates are legitimately multi-valued (friend_of, tagged_in, appears_in) — the curator UI surfaces the amber accent and lets the user reject the stale fact, accept both, or supersede one later once the supersession column lands. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/knowledge.rs | 50 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/src/knowledge.rs b/src/knowledge.rs index b8fc4ba..256c55d 100644 --- a/src/knowledge.rs +++ b/src/knowledge.rs @@ -109,6 +109,15 @@ pub struct FactDetail { pub source_photo: Option, pub source_insight_id: Option, pub created_at: i64, + /// Set when another active fact has the same subject+predicate but + /// a different object. Detected at read time (no schema change) by + /// the get_entity handler grouping facts by predicate. Some + /// predicates are legitimately multi-valued ("tagged_in", + /// "friend_of") so this is a *signal* for the curator, not a hard + /// invariant. Stale-data correction is the common case (Alice + /// lives_in NYC AND SF — one of these is wrong). + #[serde(skip_serializing_if = "std::ops::Not::not")] + pub in_conflict: bool, } #[derive(Serialize)] @@ -381,9 +390,50 @@ async fn get_entity( source_photo: f.source_photo, source_insight_id: f.source_insight_id, created_at: f.created_at, + in_conflict: false, }); } + // Conflict detection: within the active set, group by predicate; + // any predicate group with more than one distinct object (entity + // id or value) flags all its members. Some predicates are + // legitimately multi-valued (e.g. 
"tagged_in", "friend_of") so + // this is a curator hint, not a hard rule — `in_conflict` exists + // to surface stale-data candidates ("lives_in NYC" and + // "lives_in SF" can't both be current). + { + use std::collections::{HashMap, HashSet}; + let mut by_predicate: HashMap> = HashMap::new(); + for (idx, f) in facts.iter().enumerate() { + if f.status == "active" { + by_predicate + .entry(f.predicate.clone()) + .or_default() + .push(idx); + } + } + let mut to_flag: HashSet = HashSet::new(); + for indices in by_predicate.values() { + if indices.len() < 2 { + continue; + } + // Distinct (object_entity_id, object_value) tuples across + // these active facts. + let mut seen: HashSet<(Option, Option)> = HashSet::new(); + for &i in indices { + seen.insert((facts[i].object_entity_id, facts[i].object_value.clone())); + } + if seen.len() > 1 { + for &i in indices { + to_flag.insert(i); + } + } + } + for i in to_flag { + facts[i].in_conflict = true; + } + } + // Fetch photo links let photo_links: Vec = match dao.get_links_for_entity(&cx, entity_id) { Ok(links) => links.into_iter().map(PhotoLinkDetail::from).collect(), -- 2.49.1 From 01f5ad752738db82cc3ca5a0b3d37d2a0221e4a6 Mon Sep 17 00:00:00 2001 From: Cameron Cordes Date: Sun, 10 May 2026 19:25:55 -0400 Subject: [PATCH 06/19] knowledge: valid-time on facts + interval-aware conflict detection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds bitemporal support to entity_facts. Existing `created_at` is transaction time (when we recorded the fact); the new `valid_from` / `valid_until` BIGINT columns are valid time (when the fact is/was true in the real world). NULL on either side = unbounded on that side, both NULL = "always-true / unknown" — matches the default state of every legacy row, no backfill needed. The split matters for time-bounded predicates like is_in_relationship_with / lives_in / works_at: recording the fact once doesn't mean the relationship is still ongoing. 
Same predicate across different windows ("lives_in NYC 2018-2020", "lives_in SF 2020-present") is no longer a conflict — the interval-aware check in get_entity only flags pairs whose windows overlap. Facts with no valid-time data still flag against everything (worst case for legacy rows — user adds dates to suppress). API surface: - POST /knowledge/facts accepts optional valid_from / valid_until. - PATCH /knowledge/facts/{id} accepts both with tri-state semantics: field omitted = leave alone, JSON null = clear to NULL, number = set. Implemented via a small serde helper around Option