diff --git a/src/ai/insight_generator.rs b/src/ai/insight_generator.rs index 1e39f9a..49528f7 100644 --- a/src/ai/insight_generator.rs +++ b/src/ai/insight_generator.rs @@ -3204,6 +3204,7 @@ Return ONLY the summary, nothing else."#, — surrounding events matter even when a contact is known.\n\ - Use recall_facts_for_photo + recall_entities to load any prior knowledge about subjects in the photo.\n\ - When you identify people / places / events / things, use store_entity + store_fact to grow the persistent memory.\n\ + - Before store_entity, call recall_entities to check whether a similar name already exists; reuse the existing entity_id rather than creating a near-duplicate (e.g. \"Sara\" vs \"Sarah J.\"). The DAO will collapse obvious cosine matches, but choosing the existing id keeps facts and photo links consolidated.\n\ - A tool returning no results is informative; continue with the others.", ); diff --git a/src/database/knowledge_dao.rs b/src/database/knowledge_dao.rs index f807f23..ffc368f 100644 --- a/src/database/knowledge_dao.rs +++ b/src/database/knowledge_dao.rs @@ -282,6 +282,20 @@ impl SqliteKnowledgeDao { } } +/// Cosine-similarity threshold above which a new entity collapses into an +/// existing same-type entity at upsert time. The agent's pre-flight name +/// search uses FTS5 prefix tokens, which misses near-dupes like +/// "Sarah" / "Sara" / "Sarah J." that share a description-rich embedding. +/// Override via `ENTITY_DEDUP_COSINE_THRESHOLD` env var when tuning. +const ENTITY_DEDUP_COSINE_THRESHOLD_DEFAULT: f32 = 0.92; + +fn entity_dedup_cosine_threshold() -> f32 { + std::env::var("ENTITY_DEDUP_COSINE_THRESHOLD") + .ok() + .and_then(|v| v.parse::().ok()) + .unwrap_or(ENTITY_DEDUP_COSINE_THRESHOLD_DEFAULT) +} + impl KnowledgeDao for SqliteKnowledgeDao { // ----------------------------------------------------------------------- // Entity operations @@ -308,7 +322,7 @@ impl KnowledgeDao for SqliteKnowledgeDao { // Use lower() on both sides so existing dirty rows ("Person") still match. let name_lower = entity.name.to_lowercase(); let type_lower = entity.entity_type.to_lowercase(); - let existing: Option = entities + let mut existing: Option = entities .filter(diesel::dsl::sql::(&format!( "lower(name) = '{}' AND lower(entity_type) = '{}'", name_lower.replace('\'', "''"), @@ -318,6 +332,49 @@ impl KnowledgeDao for SqliteKnowledgeDao { .optional() .map_err(|e| anyhow::anyhow!("Query error: {}", e))?; + // Fuzzy-match fallback: if no exact name match and the incoming + // entity carries an embedding, compare against same-type entities' + // embeddings and collapse if any are above the cosine threshold. + if existing.is_none() + && let Some(new_emb_bytes) = entity.embedding.as_ref() + && let Ok(new_vec) = Self::deserialize_embedding(new_emb_bytes) + && !new_vec.is_empty() + { + let threshold = entity_dedup_cosine_threshold(); + let candidates: Vec = entities + .filter(embedding.is_not_null()) + .filter(entity_type.eq(&entity.entity_type)) + .filter(status.ne("rejected")) + .load::(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Query error: {}", e))?; + + let mut best: Option<(Entity, f32)> = None; + for cand in candidates { + let Some(cand_bytes) = cand.embedding.as_ref() else { + continue; + }; + let Ok(cand_vec) = Self::deserialize_embedding(cand_bytes) else { + continue; + }; + let sim = Self::cosine_similarity(&new_vec, &cand_vec); + if sim >= threshold && best.as_ref().is_none_or(|(_, s)| sim > *s) { + best = Some((cand, sim)); + } + } + + if let Some((cand, sim)) = best { + log::info!( + "entity dedup: collapsing new '{}' ({}) into existing '{}' (id={}, cos={:.3})", + entity.name, + entity.entity_type, + cand.name, + cand.id, + sim + ); + existing = Some(cand); + } + } + if let Some(existing_entity) = existing { // Update description, embedding, updated_at diesel::update(entities.filter(id.eq(existing_entity.id))) @@ -1276,4 +1333,87 @@ mod tests { "FK should reject fact whose persona doesn't exist" ); } + + #[test] + fn upsert_entity_collapses_near_duplicate_by_embedding() { + // The agent's pre-flight check uses FTS5 prefix tokens, which + // miss "Sarah" / "Sara" / "Sarah J." pairs. The DAO upsert is + // the safety net: if no exact (name, type) match but the new + // entity's embedding sits above the cosine threshold against an + // existing same-type entity, we collapse instead of inserting. + let cx = opentelemetry::Context::new(); + let conn = connection_with_fks_on(); + let mut dao = SqliteKnowledgeDao::from_connection(conn.clone()); + + let mut emb_a = vec![0.0_f32; 64]; + emb_a[0] = 1.0; + emb_a[1] = 0.5; + let mut emb_b_near = emb_a.clone(); + emb_b_near[2] = 0.05; // nudge — cosine still well above 0.92 + + // Seed an existing entity with the embedding. + let seeded = dao + .upsert_entity( + &cx, + InsertEntity { + name: "Sarah".to_string(), + entity_type: "person".to_string(), + description: "tagged friend".to_string(), + embedding: Some(SqliteKnowledgeDao::serialize_embedding(&emb_a)), + confidence: 0.6, + status: "active".to_string(), + created_at: 0, + updated_at: 0, + }, + ) + .unwrap(); + + // A "different name" with a near-identical embedding should + // collapse onto the existing row, not create a new entity. + let collapsed = dao + .upsert_entity( + &cx, + InsertEntity { + name: "Sara".to_string(), + entity_type: "person".to_string(), + description: "tagged friend".to_string(), + embedding: Some(SqliteKnowledgeDao::serialize_embedding(&emb_b_near)), + confidence: 0.6, + status: "active".to_string(), + created_at: 0, + updated_at: 0, + }, + ) + .unwrap(); + + assert_eq!( + collapsed.id, seeded.id, + "near-duplicate by cosine should reuse the existing entity id" + ); + + // And a clearly-different embedding under a different name should + // still create a new row. + let mut emb_unrelated = vec![0.0_f32; 64]; + emb_unrelated[10] = 1.0; + let distinct = dao + .upsert_entity( + &cx, + InsertEntity { + name: "Bob".to_string(), + entity_type: "person".to_string(), + description: String::new(), + embedding: Some(SqliteKnowledgeDao::serialize_embedding(&emb_unrelated)), + confidence: 0.6, + status: "active".to_string(), + created_at: 0, + updated_at: 0, + }, + ) + .unwrap(); + + assert_ne!( + distinct.id, seeded.id, + "unrelated embedding should not collapse" + ); + } } diff --git a/src/knowledge.rs b/src/knowledge.rs index 30ceabd..f37ed1e 100644 --- a/src/knowledge.rs +++ b/src/knowledge.rs @@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize}; use std::sync::Mutex; use crate::data::Claims; -use crate::database::models::{Entity, EntityFact, EntityPhotoLink}; +use crate::database::models::{Entity, EntityFact, EntityPhotoLink, InsertEntityFact}; use crate::database::{ EntityFilter, EntityPatch, FactFilter, FactPatch, KnowledgeDao, PersonaFilter, RecentActivity, }; @@ -179,6 +179,16 @@ pub struct FactPatchRequest { pub confidence: Option, } +#[derive(Deserialize)] +pub struct FactCreateRequest { + pub subject_entity_id: i32, + pub predicate: String, + pub object_entity_id: Option, + pub object_value: Option, + pub source_photo: Option, + pub confidence: Option, +} + #[derive(Deserialize)] pub struct EntityListQuery { #[serde(rename = "type")] @@ -222,7 +232,11 @@ where .route(web::patch().to(patch_entity::)) .route(web::delete().to(delete_entity::)), ) - .service(web::resource("/facts").route(web::get().to(list_facts::))) + .service( + web::resource("/facts") + .route(web::get().to(list_facts::)) + .route(web::post().to(create_fact::)), + ) .service( web::resource("/facts/{id}") .route(web::patch().to(patch_fact::)) @@ -535,6 +549,100 @@ async fn list_facts( } } +async fn create_fact( + req: HttpRequest, + claims: Claims, + body: web::Json, + dao: web::Data>, + persona_dao: PersonaDaoData, +) -> impl Responder { + if body.object_entity_id.is_none() && body.object_value.is_none() { + return HttpResponse::BadRequest().json(serde_json::json!({ + "error": "object_entity_id or object_value is required" + })); + } + if body.predicate.trim().is_empty() { + return HttpResponse::BadRequest() + .json(serde_json::json!({"error": "predicate must not be empty"})); + } + + // Persona scoping: facts are written under the active single persona. + // PersonaFilter::All is read-only ("hive-mind" view); callers should + // pin a specific persona for writes via X-Persona-Id. + let persona = resolve_persona_filter(&req, &claims, &persona_dao); + let (user_id, persona_id) = match &persona { + PersonaFilter::Single { user_id, persona_id } => (*user_id, persona_id.clone()), + PersonaFilter::All { user_id } => (*user_id, "default".to_string()), + }; + + let cx = opentelemetry::Context::current(); + let mut dao = dao.lock().expect("Unable to lock KnowledgeDao"); + + // Verify subject entity exists. + match dao.get_entity_by_id(&cx, body.subject_entity_id) { + Ok(None) => { + return HttpResponse::BadRequest().json(serde_json::json!({ + "error": format!("Subject entity {} not found", body.subject_entity_id) + })); + } + Err(e) => { + log::error!("create_fact subject lookup error: {:?}", e); + return HttpResponse::InternalServerError() + .json(serde_json::json!({"error": "Database error"})); + } + Ok(Some(_)) => {} + } + + // Optional object entity validation when supplied. + if let Some(oid) = body.object_entity_id { + match dao.get_entity_by_id(&cx, oid) { + Ok(None) => { + return HttpResponse::BadRequest().json(serde_json::json!({ + "error": format!("Object entity {} not found", oid) + })); + } + Err(e) => { + log::error!("create_fact object lookup error: {:?}", e); + return HttpResponse::InternalServerError() + .json(serde_json::json!({"error": "Database error"})); + } + Ok(Some(_)) => {} + } + } + + let now = Utc::now().timestamp(); + let confidence = body.confidence.unwrap_or(0.6).clamp(0.0, 0.95); + + let insert = InsertEntityFact { + subject_entity_id: body.subject_entity_id, + predicate: body.predicate.trim().to_string(), + object_entity_id: body.object_entity_id, + object_value: body.object_value.clone(), + source_photo: body.source_photo.clone(), + source_insight_id: None, + confidence, + status: "active".to_string(), + created_at: now, + persona_id, + user_id, + }; + + match dao.upsert_fact(&cx, insert) { + Ok((fact, is_new)) => { + let status = if is_new { + actix_web::http::StatusCode::CREATED + } else { + actix_web::http::StatusCode::OK + }; + HttpResponse::build(status).json(fact) + } + Err(e) => { + log::error!("create_fact upsert error: {:?}", e); + HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"})) + } + } +} + async fn patch_fact( _claims: Claims, id: web::Path,