#![allow(dead_code)]
use diesel::prelude::*;
use diesel::sqlite::SqliteConnection;
use std::ops::DerefMut;
use std::sync::{Arc, Mutex};

use crate::database::models::{
    Entity, EntityFact, EntityPhotoLink, InsertEntity, InsertEntityFact, InsertEntityPhotoLink,
};
use crate::database::schema;
use crate::database::{DbError, DbErrorKind, connect};
use crate::otel::trace_db_call;

// ---------------------------------------------------------------------------
// Entity type normalisation
// ---------------------------------------------------------------------------

/// Canonicalise a model-supplied entity_type to a consistent lowercase form.
///
/// Weak models frequently vary capitalisation ("Person" vs "person") or use
/// synonym types ("location" vs "place"). Normalising here prevents duplicate
/// entities that differ only by type spelling. Unknown types pass through
/// lowercased rather than erroring, so new categories are preserved.
pub(crate) fn normalize_entity_type(raw: &str) -> String {
    match raw.to_lowercase().as_str() {
        "person" | "people" | "human" | "individual" | "contact" => "person",
        "place" | "location" | "venue" | "site" | "area" | "landmark" => "place",
        "event" | "occasion" | "activity" | "celebration" => "event",
        "thing" | "object" | "item" | "product" => "thing",
        other => other,
    }
    .to_string()
}

// ---------------------------------------------------------------------------
// Filter / patch types
// ---------------------------------------------------------------------------

/// Paged filter for entity listings.
pub struct EntityFilter {
    pub entity_type: Option<String>,
    /// "active" | "reviewed" | "rejected" | "all"
    pub status: Option<String>,
    /// LIKE match on name and description
    pub search: Option<String>,
    pub limit: i64,
    pub offset: i64,
}

/// Sort key for the curation list. Name = alphabetical clustering
/// (good for spotting near-duplicates like Sara / Sarah / Sarah J.).
/// FactCount = surface heavily-used entities first, demote 0-fact
/// noise to the bottom. UpdatedDesc = legacy "newest activity first".
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EntitySort {
    UpdatedDesc,
    NameAsc,
    FactCountDesc,
}

/// Paged filter for fact listings.
pub struct FactFilter {
    pub entity_id: Option<i32>,
    /// "active" | "reviewed" | "rejected" | "all"
    pub status: Option<String>,
    pub predicate: Option<String>,
    pub persona: PersonaFilter,
    pub limit: i64,
    pub offset: i64,
}

/// Persona scoping for fact reads. `Single` filters to one persona's
/// view; `All` is the hive-mind read used when a persona has
/// `include_all_memories=true` in the personas table. Both variants
/// carry `user_id` because facts are user-isolated — two users with
/// the same 'default' persona must not see each other's facts (this
/// is enforced at the schema level by the composite FK in migration
/// 2026-05-10). Entities and photo-links are always shared and don't
/// take a persona filter.
#[derive(Debug, Clone)]
pub enum PersonaFilter {
    Single { user_id: i32, persona_id: String },
    All { user_id: i32 },
}

impl PersonaFilter {
    /// The user this read is scoped to, regardless of variant.
    pub fn user_id(&self) -> i32 {
        match self {
            Self::Single { user_id, .. } => *user_id,
            Self::All { user_id } => *user_id,
        }
    }
}

/// Partial update for an entity; `None` fields are left untouched.
pub struct EntityPatch {
    pub name: Option<String>,
    pub description: Option<String>,
    pub status: Option<String>,
    pub confidence: Option<f32>,
}

/// Partial update for a fact; `None` fields are left untouched.
pub struct FactPatch {
    pub predicate: Option<String>,
    pub object_value: Option<String>,
    pub status: Option<String>,
    pub confidence: Option<f32>,
    /// Real-world valid-time bounds. Outer Some = "patch this column";
    /// inner Some(val) = set to that unix-seconds value; inner None =
    /// clear back to NULL ("unbounded"). The double-Option lets the
    /// HTTP layer distinguish "field omitted" (leave alone) from
    /// "field sent as null" (clear) — needed for these specifically
    /// because there's no sentinel string-empty equivalent like the
    /// other fields have.
pub valid_from: Option>, pub valid_until: Option>, } pub struct RecentActivity { pub entities: Vec, pub facts: Vec, } // --------------------------------------------------------------------------- // Trait // --------------------------------------------------------------------------- pub trait KnowledgeDao: Sync + Send { // --- Entity --- fn upsert_entity( &mut self, cx: &opentelemetry::Context, entity: InsertEntity, ) -> Result; fn get_entity_by_id( &mut self, cx: &opentelemetry::Context, id: i32, ) -> Result, DbError>; fn get_entity_by_name( &mut self, cx: &opentelemetry::Context, name: &str, entity_type: Option<&str>, ) -> Result, DbError>; fn get_entities_with_embeddings( &mut self, cx: &opentelemetry::Context, entity_type: Option<&str>, ) -> Result, DbError>; fn list_entities( &mut self, cx: &opentelemetry::Context, filter: EntityFilter, ) -> Result<(Vec, i64), DbError>; /// List entities alongside a persona-scoped fact count for each. /// Powers the curation surface — sorting by fact count surfaces /// the heavily-used entities and demotes 0-fact noise. Counting /// is restricted to non-rejected facts under the active persona /// scope so a switch in the persona picker re-orders the list. 
fn list_entities_with_fact_counts( &mut self, cx: &opentelemetry::Context, filter: EntityFilter, sort: EntitySort, persona: &PersonaFilter, ) -> Result<(Vec<(Entity, i64)>, i64), DbError>; fn update_entity_status( &mut self, cx: &opentelemetry::Context, id: i32, status: &str, ) -> Result<(), DbError>; fn update_entity( &mut self, cx: &opentelemetry::Context, id: i32, patch: EntityPatch, ) -> Result, DbError>; fn delete_entity(&mut self, cx: &opentelemetry::Context, id: i32) -> Result<(), DbError>; fn merge_entities( &mut self, cx: &opentelemetry::Context, source_id: i32, target_id: i32, ) -> Result<(i64, i64), DbError>; // --- Facts --- fn upsert_fact( &mut self, cx: &opentelemetry::Context, fact: InsertEntityFact, ) -> Result<(EntityFact, bool), DbError>; fn get_facts_for_entity( &mut self, cx: &opentelemetry::Context, entity_id: i32, persona: &PersonaFilter, ) -> Result, DbError>; fn list_facts( &mut self, cx: &opentelemetry::Context, filter: FactFilter, ) -> Result<(Vec, i64), DbError>; fn update_fact( &mut self, cx: &opentelemetry::Context, id: i32, patch: FactPatch, ) -> Result, DbError>; fn update_facts_insight_id( &mut self, cx: &opentelemetry::Context, source_photo: &str, insight_id: i32, ) -> Result<(), DbError>; fn delete_fact(&mut self, cx: &opentelemetry::Context, id: i32) -> Result<(), DbError>; /// Mark an old fact as superseded by a new one. Atomically: /// - reads the new fact's valid_from /// - sets old.superseded_by = new_id /// - sets old.status = 'superseded' /// - stamps old.valid_until = new.valid_from (if not already /// set; otherwise leaves it) /// /// Returns the updated old fact. Errors if either id is missing. 
fn supersede_fact( &mut self, cx: &opentelemetry::Context, old_id: i32, new_id: i32, ) -> Result, DbError>; // --- Photo links --- fn upsert_photo_link( &mut self, cx: &opentelemetry::Context, link: InsertEntityPhotoLink, ) -> Result<(), DbError>; fn delete_photo_links_for_file( &mut self, cx: &opentelemetry::Context, file_path: &str, ) -> Result<(), DbError>; fn get_links_for_photo( &mut self, cx: &opentelemetry::Context, file_path: &str, ) -> Result, DbError>; fn get_links_for_entity( &mut self, cx: &opentelemetry::Context, entity_id: i32, ) -> Result, DbError>; // --- Audit --- fn get_recent_activity( &mut self, cx: &opentelemetry::Context, since: i64, limit: i64, persona: &PersonaFilter, ) -> Result; } // --------------------------------------------------------------------------- // SQLite implementation // --------------------------------------------------------------------------- pub struct SqliteKnowledgeDao { connection: Arc>, } impl Default for SqliteKnowledgeDao { fn default() -> Self { Self::new() } } impl SqliteKnowledgeDao { pub fn new() -> Self { SqliteKnowledgeDao { connection: Arc::new(Mutex::new(connect())), } } pub fn from_connection(conn: Arc>) -> Self { SqliteKnowledgeDao { connection: conn } } fn serialize_embedding(vec: &[f32]) -> Vec { vec.iter().flat_map(|f| f.to_le_bytes()).collect() } fn deserialize_embedding(bytes: &[u8]) -> Result, DbError> { if !bytes.len().is_multiple_of(4) { return Err(DbError::new(DbErrorKind::QueryError)); } Ok(bytes .chunks_exact(4) .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]])) .collect()) } pub fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 { if a.len() != b.len() || a.is_empty() { return 0.0; } let dot: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum(); let mag_a: f32 = a.iter().map(|x| x * x).sum::().sqrt(); let mag_b: f32 = b.iter().map(|x| x * x).sum::().sqrt(); if mag_a == 0.0 || mag_b == 0.0 { 0.0 } else { dot / (mag_a * mag_b) } } } /// Cosine-similarity threshold above which a new entity 
collapses into an /// existing same-type entity at upsert time. The agent's pre-flight name /// search uses FTS5 prefix tokens, which misses near-dupes like /// "Sarah" / "Sara" / "Sarah J." that share a description-rich embedding. /// Override via `ENTITY_DEDUP_COSINE_THRESHOLD` env var when tuning. const ENTITY_DEDUP_COSINE_THRESHOLD_DEFAULT: f32 = 0.92; fn entity_dedup_cosine_threshold() -> f32 { std::env::var("ENTITY_DEDUP_COSINE_THRESHOLD") .ok() .and_then(|v| v.parse::().ok()) .unwrap_or(ENTITY_DEDUP_COSINE_THRESHOLD_DEFAULT) } impl KnowledgeDao for SqliteKnowledgeDao { // ----------------------------------------------------------------------- // Entity operations // ----------------------------------------------------------------------- fn upsert_entity( &mut self, cx: &opentelemetry::Context, entity: InsertEntity, ) -> Result { trace_db_call(cx, "insert", "upsert_entity", |_span| { use schema::entities::dsl::*; let mut conn = self.connection.lock().expect("KnowledgeDao lock"); // Normalise type before lookup and insert so that model variations // ("Person" / "person", "location" / "place") collapse to one row. let entity = InsertEntity { entity_type: normalize_entity_type(&entity.entity_type), ..entity }; // Case-insensitive lookup by name + entity_type. // Use lower() on both sides so existing dirty rows ("Person") still match. let name_lower = entity.name.to_lowercase(); let type_lower = entity.entity_type.to_lowercase(); let mut existing: Option = entities .filter(diesel::dsl::sql::(&format!( "lower(name) = '{}' AND lower(entity_type) = '{}'", name_lower.replace('\'', "''"), type_lower.replace('\'', "''") ))) .first::(conn.deref_mut()) .optional() .map_err(|e| anyhow::anyhow!("Query error: {}", e))?; // Fuzzy-match fallback: if no exact name match and the incoming // entity carries an embedding, compare against same-type entities' // embeddings and collapse if any are above the cosine threshold. 
if existing.is_none() && let Some(new_emb_bytes) = entity.embedding.as_ref() && let Ok(new_vec) = Self::deserialize_embedding(new_emb_bytes) && !new_vec.is_empty() { let threshold = entity_dedup_cosine_threshold(); let candidates: Vec = entities .filter(embedding.is_not_null()) .filter(entity_type.eq(&entity.entity_type)) .filter(status.ne("rejected")) .load::(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Query error: {}", e))?; let mut best: Option<(Entity, f32)> = None; for cand in candidates { let Some(cand_bytes) = cand.embedding.as_ref() else { continue; }; let Ok(cand_vec) = Self::deserialize_embedding(cand_bytes) else { continue; }; let sim = Self::cosine_similarity(&new_vec, &cand_vec); if sim >= threshold && best.as_ref().is_none_or(|(_, s)| sim > *s) { best = Some((cand, sim)); } } if let Some((cand, sim)) = best { log::info!( "entity dedup: collapsing new '{}' ({}) into existing '{}' (id={}, cos={:.3})", entity.name, entity.entity_type, cand.name, cand.id, sim ); existing = Some(cand); } } if let Some(existing_entity) = existing { // Update description, embedding, updated_at diesel::update(entities.filter(id.eq(existing_entity.id))) .set(( description.eq(&entity.description), embedding.eq(&entity.embedding), updated_at.eq(entity.updated_at), )) .execute(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Update error: {}", e))?; entities .filter(id.eq(existing_entity.id)) .first::(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Query error: {}", e)) } else { diesel::insert_into(entities) .values(&entity) .execute(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Insert error: {}", e))?; entities .order(id.desc()) .first::(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Query error: {}", e)) } }) .map_err(|_| DbError::new(DbErrorKind::InsertError)) } fn get_entity_by_id( &mut self, cx: &opentelemetry::Context, entity_id: i32, ) -> Result, DbError> { trace_db_call(cx, "query", "get_entity_by_id", |_span| { use schema::entities::dsl::*; let mut conn = 
self.connection.lock().expect("KnowledgeDao lock"); entities .filter(id.eq(entity_id)) .first::(conn.deref_mut()) .optional() .map_err(|e| anyhow::anyhow!("Query error: {}", e)) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn get_entity_by_name( &mut self, cx: &opentelemetry::Context, entity_name: &str, entity_type_filter: Option<&str>, ) -> Result, DbError> { trace_db_call(cx, "query", "get_entity_by_name", |_span| { use schema::entities::dsl::*; let mut conn = self.connection.lock().expect("KnowledgeDao lock"); let name_lower = entity_name.to_lowercase().replace('\'', "''"); let mut sql = format!("lower(name) = '{}'", name_lower); if let Some(et) = entity_type_filter { sql.push_str(&format!(" AND entity_type = '{}'", et.replace('\'', "''"))); } sql.push_str(" AND status != 'rejected'"); entities .filter(diesel::dsl::sql::(&sql)) .load::(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Query error: {}", e)) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn get_entities_with_embeddings( &mut self, cx: &opentelemetry::Context, entity_type_filter: Option<&str>, ) -> Result, DbError> { trace_db_call(cx, "query", "get_entities_with_embeddings", |_span| { use schema::entities::dsl::*; let mut conn = self.connection.lock().expect("KnowledgeDao lock"); let mut query = entities .filter(embedding.is_not_null()) .filter(status.ne("rejected")) .into_boxed(); if let Some(et) = entity_type_filter { query = query.filter(entity_type.eq(et)); } query .load::(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Query error: {}", e)) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn list_entities( &mut self, cx: &opentelemetry::Context, filter: EntityFilter, ) -> Result<(Vec, i64), DbError> { trace_db_call(cx, "query", "list_entities", |_span| { use diesel::dsl::count_star; use schema::entities::dsl::*; let mut conn = self.connection.lock().expect("KnowledgeDao lock"); let mut query = entities.into_boxed(); if let Some(ref et) = filter.entity_type { query = 
query.filter(entity_type.eq(et)); } let status_val = filter.status.as_deref().unwrap_or("active"); if status_val != "all" { query = query.filter(status.eq(status_val)); } if let Some(ref search_term) = filter.search { let pattern = format!("%{}%", search_term); query = query.filter(name.like(pattern.clone()).or(description.like(pattern))); } // Count with same filters applied (build separately since boxed query is consumed) let mut count_query = entities.into_boxed(); if let Some(ref et) = filter.entity_type { count_query = count_query.filter(entity_type.eq(et)); } let status_val2 = filter.status.as_deref().unwrap_or("active"); if status_val2 != "all" { count_query = count_query.filter(status.eq(status_val2)); } if let Some(ref search_term) = filter.search { let pattern = format!("%{}%", search_term); count_query = count_query.filter(name.like(pattern.clone()).or(description.like(pattern))); } let total: i64 = count_query .select(count_star()) .first(conn.deref_mut()) .unwrap_or(0); let results = query .order(updated_at.desc()) .limit(filter.limit) .offset(filter.offset) .load::(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Query error: {}", e))?; Ok((results, total)) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn list_entities_with_fact_counts( &mut self, cx: &opentelemetry::Context, filter: EntityFilter, sort: EntitySort, persona: &PersonaFilter, ) -> Result<(Vec<(Entity, i64)>, i64), DbError> { trace_db_call(cx, "query", "list_entities_with_fact_counts", |_span| { use diesel::sql_query; use diesel::sql_types::{BigInt, Integer, Text}; let mut conn = self.connection.lock().expect("KnowledgeDao lock"); // Build WHERE fragments. Inline-safe values are bound; status // / sort keywords are validated against fixed sets. 
let mut where_parts: Vec = Vec::new(); let mut bind_types: Vec<&'static str> = Vec::new(); let mut bind_strs: Vec = Vec::new(); if filter.entity_type.is_some() { where_parts.push("e.entity_type = ?".to_string()); bind_types.push("text"); bind_strs.push(filter.entity_type.clone().unwrap()); } let status_val = filter.status.as_deref().unwrap_or("active"); if status_val != "all" { where_parts.push("e.status = ?".to_string()); bind_types.push("text"); bind_strs.push(status_val.to_string()); } if let Some(ref s) = filter.search { where_parts.push("(e.name LIKE ? OR e.description LIKE ?)".to_string()); bind_types.push("text"); bind_types.push("text"); let pat = format!("%{}%", s); bind_strs.push(pat.clone()); bind_strs.push(pat); } let where_clause = if where_parts.is_empty() { String::new() } else { format!("WHERE {}", where_parts.join(" AND ")) }; // Persona-scoped fact-count subquery. Single = filter on // (user_id, persona_id); All = union across the user's // personas (mirror PersonaFilter::All read semantics). let fact_count_join = match persona { PersonaFilter::Single { user_id: _, persona_id: _ } => { "LEFT JOIN (\ SELECT subject_entity_id, COUNT(*) AS fact_count \ FROM entity_facts \ WHERE user_id = ? AND persona_id = ? AND status != 'rejected' \ GROUP BY subject_entity_id\ ) fc ON fc.subject_entity_id = e.id" } PersonaFilter::All { user_id: _ } => { "LEFT JOIN (\ SELECT subject_entity_id, COUNT(*) AS fact_count \ FROM entity_facts \ WHERE user_id = ? 
AND status != 'rejected' \ GROUP BY subject_entity_id\ ) fc ON fc.subject_entity_id = e.id" } }; let order_by = match sort { EntitySort::UpdatedDesc => "e.updated_at DESC", EntitySort::NameAsc => "lower(e.name) ASC", EntitySort::FactCountDesc => { "COALESCE(fc.fact_count, 0) DESC, lower(e.name) ASC" } }; let select_sql = format!( "SELECT e.id, e.name, e.entity_type, e.description, e.embedding, \ e.confidence, e.status, e.created_at, e.updated_at, \ COALESCE(fc.fact_count, 0) AS fact_count \ FROM entities e \ {fact_count_join} \ {where_clause} \ ORDER BY {order_by} \ LIMIT ? OFFSET ?" ); let count_sql = format!( "SELECT COUNT(*) AS total FROM entities e {where_clause}" ); // ── Total count ───────────────────────────────────────── #[derive(diesel::QueryableByName)] struct TotalRow { #[diesel(sql_type = BigInt)] total: i64, } let mut count_q = sql_query(count_sql).into_boxed(); for s in &bind_strs { count_q = count_q.bind::(s.clone()); } let total: i64 = count_q .get_result::(conn.deref_mut()) .map(|r| r.total) .unwrap_or(0); // ── Page query ────────────────────────────────────────── #[derive(diesel::QueryableByName)] struct EntityWithCountRow { #[diesel(sql_type = Integer)] id: i32, #[diesel(sql_type = Text)] name: String, #[diesel(sql_type = Text)] entity_type: String, #[diesel(sql_type = Text)] description: String, #[diesel(sql_type = diesel::sql_types::Nullable)] embedding: Option>, #[diesel(sql_type = diesel::sql_types::Float)] confidence: f32, #[diesel(sql_type = Text)] status: String, #[diesel(sql_type = BigInt)] created_at: i64, #[diesel(sql_type = BigInt)] updated_at: i64, #[diesel(sql_type = BigInt)] fact_count: i64, } let mut q = sql_query(select_sql).into_boxed(); // Persona binds first (they're earlier in the SQL — inside // the subquery LEFT JOIN). 
match persona { PersonaFilter::Single { user_id, persona_id } => { q = q .bind::(*user_id) .bind::(persona_id.clone()); } PersonaFilter::All { user_id } => { q = q.bind::(*user_id); } } // Then WHERE binds in order. for s in &bind_strs { q = q.bind::(s.clone()); } // Then LIMIT / OFFSET. q = q .bind::(filter.limit) .bind::(filter.offset); let rows: Vec = q .load(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Query error: {}", e))?; let pairs: Vec<(Entity, i64)> = rows .into_iter() .map(|r| { ( Entity { id: r.id, name: r.name, entity_type: r.entity_type, description: r.description, embedding: r.embedding, confidence: r.confidence, status: r.status, created_at: r.created_at, updated_at: r.updated_at, }, r.fact_count, ) }) .collect(); // Sink unused `_bind_types`; keeping it as documentation. let _ = bind_types; Ok((pairs, total)) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn update_entity_status( &mut self, cx: &opentelemetry::Context, entity_id: i32, new_status: &str, ) -> Result<(), DbError> { trace_db_call(cx, "update", "update_entity_status", |_span| { use schema::entities::dsl::*; let mut conn = self.connection.lock().expect("KnowledgeDao lock"); diesel::update(entities.filter(id.eq(entity_id))) .set(status.eq(new_status)) .execute(conn.deref_mut()) .map(|_| ()) .map_err(|e| anyhow::anyhow!("Update error: {}", e)) }) .map_err(|_| DbError::new(DbErrorKind::UpdateError)) } fn update_entity( &mut self, cx: &opentelemetry::Context, entity_id: i32, patch: EntityPatch, ) -> Result, DbError> { trace_db_call(cx, "update", "update_entity", |_span| { use schema::entities::dsl::*; let mut conn = self.connection.lock().expect("KnowledgeDao lock"); let now = chrono::Utc::now().timestamp(); if let Some(ref new_name) = patch.name { diesel::update(entities.filter(id.eq(entity_id))) .set((name.eq(new_name), updated_at.eq(now))) .execute(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Update name error: {}", e))?; } if let Some(ref new_desc) = patch.description { 
diesel::update(entities.filter(id.eq(entity_id)))
                    .set((description.eq(new_desc), updated_at.eq(now)))
                    .execute(conn.deref_mut())
                    .map_err(|e| anyhow::anyhow!("Update description error: {}", e))?;
            }
            if let Some(ref new_status) = patch.status {
                diesel::update(entities.filter(id.eq(entity_id)))
                    .set((status.eq(new_status), updated_at.eq(now)))
                    .execute(conn.deref_mut())
                    .map_err(|e| anyhow::anyhow!("Update status error: {}", e))?;
            }
            if let Some(new_confidence) = patch.confidence {
                diesel::update(entities.filter(id.eq(entity_id)))
                    .set((confidence.eq(new_confidence), updated_at.eq(now)))
                    .execute(conn.deref_mut())
                    .map_err(|e| anyhow::anyhow!("Update confidence error: {}", e))?;
            }
            // Return the post-patch row; None if the id never existed.
            entities
                .filter(id.eq(entity_id))
                .first::<Entity>(conn.deref_mut())
                .optional()
                .map_err(|e| anyhow::anyhow!("Query error: {}", e))
        })
        .map_err(|_| DbError::new(DbErrorKind::UpdateError))
    }

    fn delete_entity(
        &mut self,
        cx: &opentelemetry::Context,
        entity_id: i32,
    ) -> Result<(), DbError> {
        trace_db_call(cx, "delete", "delete_entity", |_span| {
            use schema::entities::dsl::*;
            let mut conn = self.connection.lock().expect("KnowledgeDao lock");
            // entity_facts has a CHECK constraint requiring
            // `object_entity_id IS NOT NULL OR object_value IS NOT NULL`.
            // The FK on object_entity_id is ON DELETE SET NULL — but
            // facts that pointed at the deleted entity *only* via the
            // entity reference (the common case for relational facts
            // like "Alice is_friend_of Bob") have no object_value, so
            // SET NULL would leave them with both NULLs and the CHECK
            // aborts the whole DELETE. Pre-delete those facts in a
            // transaction so the CASCADE / SET NULL chain on what
            // remains can fire cleanly.
            //
            // Long-term fix is to change the FK to ON DELETE CASCADE
            // via a table-rebuild migration, but the DAO-side workaround
            // is sufficient and less invasive.
            conn.transaction::<(), diesel::result::Error, _>(|conn| {
                use schema::entity_facts::dsl as ef;
                diesel::delete(
                    ef::entity_facts
                        .filter(ef::object_entity_id.eq(entity_id))
                        .filter(ef::object_value.is_null()),
                )
                .execute(conn)?;
                diesel::delete(entities.filter(id.eq(entity_id))).execute(conn)?;
                Ok(())
            })
            .map_err(|e| anyhow::anyhow!("Delete error: {}", e))
        })
        .map_err(|e| {
            // Surface the actual diesel error string before collapsing
            // to the opaque DbErrorKind::QueryError.
            log::warn!("delete_entity({}) failed: {}", entity_id, e);
            DbError::new(DbErrorKind::QueryError)
        })
    }

    fn merge_entities(
        &mut self,
        cx: &opentelemetry::Context,
        source_id: i32,
        target_id: i32,
    ) -> Result<(i64, i64), DbError> {
        trace_db_call(cx, "update", "merge_entities", |_span| {
            let mut conn = self.connection.lock().expect("KnowledgeDao lock");
            conn.transaction::<(i64, i64), diesel::result::Error, _>(|conn| {
                use schema::entity_facts::dsl as ef;
                // 1. Re-point facts where source is subject
                let facts_updated =
                    diesel::update(ef::entity_facts.filter(ef::subject_entity_id.eq(source_id)))
                        .set(ef::subject_entity_id.eq(target_id))
                        .execute(conn)? as i64;
                // 2. Re-point facts where source is object
                diesel::update(ef::entity_facts.filter(ef::object_entity_id.eq(source_id)))
                    .set(ef::object_entity_id.eq(Some(target_id)))
                    .execute(conn)?;
                // 3. Copy photo links to target (INSERT OR IGNORE to skip duplicates)
                let links_updated = diesel::sql_query(
                    "INSERT OR IGNORE INTO entity_photo_links (entity_id, library_id, rel_path, role) \
                     SELECT ?, library_id, rel_path, role FROM entity_photo_links WHERE entity_id = ?",
                )
                .bind::<diesel::sql_types::Integer, _>(target_id)
                .bind::<diesel::sql_types::Integer, _>(source_id)
                .execute(conn)? as i64;
                // 4. Delete source entity (FK CASCADE removes remaining facts/links)
                diesel::delete(
                    schema::entities::dsl::entities.filter(schema::entities::dsl::id.eq(source_id)),
                )
                .execute(conn)?;
                Ok((facts_updated, links_updated))
            })
            .map_err(|e| anyhow::anyhow!("Merge transaction error: {}", e))
        })
        .map_err(|_| DbError::new(DbErrorKind::UpdateError))
    }

    // -----------------------------------------------------------------------
    // Fact operations
    // -----------------------------------------------------------------------

    fn upsert_fact(
        &mut self,
        cx: &opentelemetry::Context,
        fact: InsertEntityFact,
    ) -> Result<(EntityFact, bool), DbError> {
        trace_db_call(cx, "insert", "upsert_fact", |_span| {
            use schema::entity_facts::dsl::*;
            let mut conn = self.connection.lock().expect("KnowledgeDao lock");
            // Look for an identical active fact AUTHORED BY THE SAME
            // (USER, PERSONA). The same claim from a different persona —
            // or from a different user with the same persona name — is a
            // separate fact (each persona's voice/confidence is its own),
            // not a confidence bump on someone else's row.
let mut dup_query = entity_facts .filter(subject_entity_id.eq(fact.subject_entity_id)) .filter(predicate.eq(&fact.predicate)) .filter(user_id.eq(fact.user_id)) .filter(persona_id.eq(&fact.persona_id)) .filter(status.ne("rejected")) .into_boxed(); match &fact.object_entity_id { Some(oid) => dup_query = dup_query.filter(object_entity_id.eq(oid)), None => dup_query = dup_query.filter(object_entity_id.is_null()), } match &fact.object_value { Some(ov) => dup_query = dup_query.filter(object_value.eq(ov)), None => dup_query = dup_query.filter(object_value.is_null()), } let existing: Option = dup_query .first::(conn.deref_mut()) .optional() .map_err(|e| anyhow::anyhow!("Query error: {}", e))?; if let Some(existing_fact) = existing { // Corroborate: bump confidence by 0.1 capped at 0.95 let new_confidence = (existing_fact.confidence + 0.1).min(0.95); diesel::update(entity_facts.filter(id.eq(existing_fact.id))) .set(confidence.eq(new_confidence)) .execute(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Update confidence error: {}", e))?; let updated = entity_facts .filter(id.eq(existing_fact.id)) .first::(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Query error: {}", e))?; Ok((updated, false)) // false = corroborated, not newly created } else { diesel::insert_into(entity_facts) .values(&fact) .execute(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Insert error: {}", e))?; let inserted = entity_facts .order(id.desc()) .first::(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Query error: {}", e))?; Ok((inserted, true)) // true = newly created } }) .map_err(|_| DbError::new(DbErrorKind::InsertError)) } fn get_facts_for_entity( &mut self, cx: &opentelemetry::Context, entity_id: i32, persona: &PersonaFilter, ) -> Result, DbError> { trace_db_call(cx, "query", "get_facts_for_entity", |_span| { use schema::entity_facts::dsl::*; let mut conn = self.connection.lock().expect("KnowledgeDao lock"); let mut q = entity_facts .filter(subject_entity_id.eq(entity_id)) 
.filter(status.ne("rejected")) .filter(user_id.eq(persona.user_id())) .into_boxed(); if let PersonaFilter::Single { persona_id: pid, .. } = persona { q = q.filter(persona_id.eq(pid.clone())); } q.load::(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Query error: {}", e)) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn list_facts( &mut self, cx: &opentelemetry::Context, filter: FactFilter, ) -> Result<(Vec, i64), DbError> { trace_db_call(cx, "query", "list_facts", |_span| { use diesel::dsl::count_star; use schema::entity_facts::dsl::*; let mut conn = self.connection.lock().expect("KnowledgeDao lock"); let mut query = entity_facts.into_boxed(); let mut count_query = entity_facts.into_boxed(); // user_id always applies — facts are user-isolated. let uid = filter.persona.user_id(); query = query.filter(user_id.eq(uid)); count_query = count_query.filter(user_id.eq(uid)); if let Some(eid) = filter.entity_id { query = query.filter(subject_entity_id.eq(eid)); count_query = count_query.filter(subject_entity_id.eq(eid)); } let status_val = filter.status.as_deref().unwrap_or("active"); if status_val != "all" { query = query.filter(status.eq(status_val)); count_query = count_query.filter(status.eq(status_val)); } if let Some(ref pred) = filter.predicate { query = query.filter(predicate.eq(pred)); count_query = count_query.filter(predicate.eq(pred)); } if let PersonaFilter::Single { persona_id: ref pid, .. 
} = filter.persona { query = query.filter(persona_id.eq(pid.clone())); count_query = count_query.filter(persona_id.eq(pid.clone())); } let total: i64 = count_query .select(count_star()) .first(conn.deref_mut()) .unwrap_or(0); let results = query .order(created_at.desc()) .limit(filter.limit) .offset(filter.offset) .load::(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Query error: {}", e))?; Ok((results, total)) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn update_fact( &mut self, cx: &opentelemetry::Context, fact_id: i32, patch: FactPatch, ) -> Result, DbError> { trace_db_call(cx, "update", "update_fact", |_span| { use schema::entity_facts::dsl::*; let mut conn = self.connection.lock().expect("KnowledgeDao lock"); if let Some(ref new_predicate) = patch.predicate { diesel::update(entity_facts.filter(id.eq(fact_id))) .set(predicate.eq(new_predicate)) .execute(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Update error: {}", e))?; } if let Some(ref new_value) = patch.object_value { diesel::update(entity_facts.filter(id.eq(fact_id))) .set(object_value.eq(new_value)) .execute(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Update error: {}", e))?; } if let Some(ref new_status) = patch.status { diesel::update(entity_facts.filter(id.eq(fact_id))) .set(status.eq(new_status)) .execute(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Update error: {}", e))?; } if let Some(new_confidence) = patch.confidence { diesel::update(entity_facts.filter(id.eq(fact_id))) .set(confidence.eq(new_confidence)) .execute(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Update error: {}", e))?; } if let Some(new_from) = patch.valid_from { diesel::update(entity_facts.filter(id.eq(fact_id))) .set(valid_from.eq(new_from)) .execute(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Update error: {}", e))?; } if let Some(new_until) = patch.valid_until { diesel::update(entity_facts.filter(id.eq(fact_id))) .set(valid_until.eq(new_until)) .execute(conn.deref_mut()) .map_err(|e| 
anyhow::anyhow!("Update error: {}", e))?; } entity_facts .filter(id.eq(fact_id)) .first::(conn.deref_mut()) .optional() .map_err(|e| anyhow::anyhow!("Query error: {}", e)) }) .map_err(|_| DbError::new(DbErrorKind::UpdateError)) } fn update_facts_insight_id( &mut self, cx: &opentelemetry::Context, photo_path: &str, insight_id: i32, ) -> Result<(), DbError> { trace_db_call(cx, "update", "update_facts_insight_id", |_span| { use schema::entity_facts::dsl::*; let mut conn = self.connection.lock().expect("KnowledgeDao lock"); diesel::update( entity_facts .filter(source_photo.eq(photo_path)) .filter(source_insight_id.is_null()), ) .set(source_insight_id.eq(insight_id)) .execute(conn.deref_mut()) .map(|_| ()) .map_err(|e| anyhow::anyhow!("Update error: {}", e)) }) .map_err(|_| DbError::new(DbErrorKind::UpdateError)) } fn delete_fact(&mut self, cx: &opentelemetry::Context, fact_id: i32) -> Result<(), DbError> { trace_db_call(cx, "delete", "delete_fact", |_span| { use schema::entity_facts::dsl::*; let mut conn = self.connection.lock().expect("KnowledgeDao lock"); // Clear dangling supersession pointers from any fact this // one had retired — there's no FK on superseded_by (SQLite // can't ALTER ADD with REFERENCES) so we do it manually. // Sibling rows lose the pointer but stay 'superseded' — // the user's historical correction survives the cleanup. 
conn.transaction::<(), diesel::result::Error, _>(|conn| { diesel::update(entity_facts.filter(superseded_by.eq(fact_id))) .set(superseded_by.eq::>(None)) .execute(conn)?; diesel::delete(entity_facts.filter(id.eq(fact_id))).execute(conn)?; Ok(()) }) .map_err(|e| anyhow::anyhow!("Delete error: {}", e)) }) .map_err(|e| { log::warn!("delete_fact({}) failed: {}", fact_id, e); DbError::new(DbErrorKind::QueryError) }) } fn supersede_fact( &mut self, cx: &opentelemetry::Context, old_id: i32, new_id: i32, ) -> Result, DbError> { trace_db_call(cx, "update", "supersede_fact", |_span| { use schema::entity_facts::dsl::*; let mut conn = self.connection.lock().expect("KnowledgeDao lock"); if old_id == new_id { return Err(anyhow::anyhow!( "supersede_fact: old_id and new_id must differ" )); } conn.transaction::, diesel::result::Error, _>( |conn| { // Pull the new fact's valid_from so we can close // the old fact's interval at the same point. let new_fact: Option = entity_facts .filter(id.eq(new_id)) .first::(conn) .optional()?; let Some(new_fact) = new_fact else { return Ok(None); }; // Verify the old fact exists before touching it — // returning None lets the handler 404 cleanly. let old_fact: Option = entity_facts .filter(id.eq(old_id)) .first::(conn) .optional()?; if old_fact.is_none() { return Ok(None); } // Only stamp valid_until if the user hasn't // already set it — respecting hand-curated bounds. 
let target_valid_until = old_fact .as_ref() .and_then(|f| f.valid_until) .or(new_fact.valid_from); diesel::update(entity_facts.filter(id.eq(old_id))) .set(( status.eq("superseded"), superseded_by.eq(Some(new_id)), valid_until.eq(target_valid_until), )) .execute(conn)?; entity_facts .filter(id.eq(old_id)) .first::(conn) .optional() }, ) .map_err(|e| anyhow::anyhow!("Supersede error: {}", e)) }) .map_err(|e| { log::warn!( "supersede_fact(old={}, new={}) failed: {}", old_id, new_id, e ); DbError::new(DbErrorKind::UpdateError) }) } // ----------------------------------------------------------------------- // Photo link operations // ----------------------------------------------------------------------- fn upsert_photo_link( &mut self, cx: &opentelemetry::Context, link: InsertEntityPhotoLink, ) -> Result<(), DbError> { trace_db_call(cx, "insert", "upsert_photo_link", |_span| { let mut conn = self.connection.lock().expect("KnowledgeDao lock"); // INSERT OR IGNORE respects the UNIQUE(entity_id, library_id, rel_path, role) constraint diesel::sql_query( "INSERT OR IGNORE INTO entity_photo_links (entity_id, library_id, rel_path, role) VALUES (?, ?, ?, ?)" ) .bind::(link.entity_id) .bind::(link.library_id) .bind::(&link.file_path) .bind::(&link.role) .execute(conn.deref_mut()) .map(|_| ()) .map_err(|e| anyhow::anyhow!("Insert error: {}", e)) }) .map_err(|_| DbError::new(DbErrorKind::InsertError)) } fn delete_photo_links_for_file( &mut self, cx: &opentelemetry::Context, file_path_val: &str, ) -> Result<(), DbError> { trace_db_call(cx, "delete", "delete_photo_links_for_file", |_span| { use schema::entity_photo_links::dsl::*; let mut conn = self.connection.lock().expect("KnowledgeDao lock"); diesel::delete(entity_photo_links.filter(rel_path.eq(file_path_val))) .execute(conn.deref_mut()) .map(|_| ()) .map_err(|e| anyhow::anyhow!("Delete error: {}", e)) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn get_links_for_photo( &mut self, cx: &opentelemetry::Context, 
file_path_val: &str, ) -> Result, DbError> { trace_db_call(cx, "query", "get_links_for_photo", |_span| { use schema::entity_photo_links::dsl::*; let mut conn = self.connection.lock().expect("KnowledgeDao lock"); entity_photo_links .filter(rel_path.eq(file_path_val)) .load::(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Query error: {}", e)) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn get_links_for_entity( &mut self, cx: &opentelemetry::Context, entity_id_val: i32, ) -> Result, DbError> { trace_db_call(cx, "query", "get_links_for_entity", |_span| { use schema::entity_photo_links::dsl::*; let mut conn = self.connection.lock().expect("KnowledgeDao lock"); entity_photo_links .filter(entity_id.eq(entity_id_val)) .load::(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Query error: {}", e)) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } // ----------------------------------------------------------------------- // Audit // ----------------------------------------------------------------------- fn get_recent_activity( &mut self, cx: &opentelemetry::Context, since: i64, limit: i64, persona: &PersonaFilter, ) -> Result { trace_db_call(cx, "query", "get_recent_activity", |_span| { use schema::entities::dsl as e; use schema::entity_facts::dsl as ef; let mut conn = self.connection.lock().expect("KnowledgeDao lock"); // Entities are shared — recency is global. let recent_entities = e::entities .filter(e::created_at.gt(since)) .order(e::created_at.desc()) .limit(limit) .load::(conn.deref_mut()) .map_err(|err| anyhow::anyhow!("Query error: {}", err))?; let mut facts_q = ef::entity_facts .filter(ef::created_at.gt(since)) .filter(ef::user_id.eq(persona.user_id())) .into_boxed(); if let PersonaFilter::Single { persona_id: pid, .. 
} = persona { facts_q = facts_q.filter(ef::persona_id.eq(pid.clone())); } let recent_facts = facts_q .order(ef::created_at.desc()) .limit(limit) .load::(conn.deref_mut()) .map_err(|err| anyhow::anyhow!("Query error: {}", err))?; Ok(RecentActivity { entities: recent_entities, facts: recent_facts, }) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } } #[cfg(test)] mod tests { //! Persona scoping + composite-FK invariants for entity_facts. //! //! These tests pin three contracts that are silently regressable: //! //! 1. PersonaFilter::Single isolates per (user_id, persona_id). Two //! users with the same 'default' persona must not see each //! other's facts (multi-user leakage was a latent bug before //! migration 2026-05-10 added user_id + composite FK). //! //! 2. PersonaFilter::All scopes to a single user but unions across //! that user's personas. Hive-mind for human browsing of //! /knowledge/*; never crosses users. //! //! 3. Deleting a persona CASCADEs to the user's facts under that //! persona — and ONLY that user's, ONLY that persona's. Other //! users sharing the persona_id name keep their facts. //! //! FKs aren't enabled by default on Diesel's SQLite connection; //! `connection_with_fks_on()` flips the pragma so the cascade //! actually fires in tests (mirroring runtime in production). 
use super::*; use crate::database::models::{InsertEntity, InsertEntityFact, InsertPersona}; use crate::database::test::in_memory_db_connection; use diesel::connection::SimpleConnection; fn connection_with_fks_on() -> Arc> { let mut conn = in_memory_db_connection(); conn.batch_execute("PRAGMA foreign_keys = ON;") .expect("enable foreign_keys pragma"); Arc::new(Mutex::new(conn)) } fn create_user(conn: &Arc>, username: &str) -> i32 { use crate::database::schema::users::dsl as u; let mut c = conn.lock().unwrap(); diesel::insert_into(u::users) .values((u::username.eq(username), u::password.eq("x"))) .execute(c.deref_mut()) .unwrap(); u::users .filter(u::username.eq(username)) .select(u::id) .first(c.deref_mut()) .unwrap() } fn create_persona_row(conn: &Arc>, uid: i32, pid: &str) { use crate::database::schema::personas::dsl as p; let mut c = conn.lock().unwrap(); diesel::insert_into(p::personas) .values(InsertPersona { user_id: uid, persona_id: pid, name: pid, system_prompt: "test prompt", is_built_in: false, include_all_memories: false, created_at: 0, updated_at: 0, }) .execute(c.deref_mut()) .unwrap(); } fn make_entity(dao: &mut SqliteKnowledgeDao, name: &str) -> Entity { let cx = opentelemetry::Context::new(); dao.upsert_entity( &cx, InsertEntity { name: name.to_string(), entity_type: "person".to_string(), description: String::new(), embedding: None, confidence: 0.6, status: "active".to_string(), created_at: 0, updated_at: 0, }, ) .unwrap() } fn add_fact( dao: &mut SqliteKnowledgeDao, subject: i32, predicate: &str, value: &str, user_id: i32, persona_id: &str, ) -> EntityFact { let cx = opentelemetry::Context::new(); let (fact, _) = dao .upsert_fact( &cx, InsertEntityFact { subject_entity_id: subject, predicate: predicate.to_string(), object_entity_id: None, object_value: Some(value.to_string()), source_photo: None, source_insight_id: None, confidence: 0.6, status: "active".to_string(), created_at: 0, persona_id: persona_id.to_string(), user_id, valid_from: None, 
valid_until: None, superseded_by: None, }, ) .unwrap(); fact } #[test] fn persona_filter_single_isolates_per_user() { // Two users, same persona name. Each user's facts under that // persona must NOT surface to the other user's reads — this is // the multi-user leakage that motivated adding user_id. let cx = opentelemetry::Context::new(); let conn = connection_with_fks_on(); let alice = create_user(&conn, "alice"); let bob = create_user(&conn, "bob"); create_persona_row(&conn, alice, "default"); create_persona_row(&conn, bob, "default"); let mut dao = SqliteKnowledgeDao::from_connection(conn.clone()); let entity = make_entity(&mut dao, "Cabin"); add_fact(&mut dao, entity.id, "located_in", "Vermont", alice, "default"); add_fact(&mut dao, entity.id, "color", "red", bob, "default"); let alice_view = dao .get_facts_for_entity( &cx, entity.id, &PersonaFilter::Single { user_id: alice, persona_id: "default".to_string(), }, ) .unwrap(); assert_eq!(alice_view.len(), 1); assert_eq!(alice_view[0].predicate, "located_in"); let bob_view = dao .get_facts_for_entity( &cx, entity.id, &PersonaFilter::Single { user_id: bob, persona_id: "default".to_string(), }, ) .unwrap(); assert_eq!(bob_view.len(), 1); assert_eq!(bob_view[0].predicate, "color"); } #[test] fn persona_filter_all_unions_across_personas_one_user() { // include_all_memories=true → All variant: see this user's // facts across all their personas. Must NOT include other // users' facts even when they share a persona name. 
let cx = opentelemetry::Context::new(); let conn = connection_with_fks_on(); let alice = create_user(&conn, "alice"); let bob = create_user(&conn, "bob"); create_persona_row(&conn, alice, "default"); create_persona_row(&conn, alice, "journal"); create_persona_row(&conn, bob, "default"); let mut dao = SqliteKnowledgeDao::from_connection(conn.clone()); let entity = make_entity(&mut dao, "Cabin"); add_fact(&mut dao, entity.id, "p1", "v1", alice, "default"); add_fact(&mut dao, entity.id, "p2", "v2", alice, "journal"); add_fact(&mut dao, entity.id, "p3", "v3", bob, "default"); let alice_all = dao .get_facts_for_entity(&cx, entity.id, &PersonaFilter::All { user_id: alice }) .unwrap(); let predicates: Vec<&str> = alice_all.iter().map(|f| f.predicate.as_str()).collect(); assert_eq!(predicates.len(), 2); assert!(predicates.contains(&"p1")); assert!(predicates.contains(&"p2")); assert!( !predicates.contains(&"p3"), "All variant must not leak across users" ); } #[test] fn upsert_fact_dedup_does_not_cross_users() { // Two users insert the SAME claim (same subject + predicate + // object_value) under the same persona name. Pre-fix, the // dedup key was (subject, predicate, persona_id) and bob's // insert would corroborate alice's row instead of creating a // new one. Post-fix the key includes user_id, so each user // gets their own row at confidence=0.6. 
let conn = connection_with_fks_on(); let alice = create_user(&conn, "alice"); let bob = create_user(&conn, "bob"); create_persona_row(&conn, alice, "default"); create_persona_row(&conn, bob, "default"); let mut dao = SqliteKnowledgeDao::from_connection(conn.clone()); let entity = make_entity(&mut dao, "Cabin"); let alice_fact = add_fact(&mut dao, entity.id, "color", "red", alice, "default"); let bob_fact = add_fact(&mut dao, entity.id, "color", "red", bob, "default"); assert_ne!(alice_fact.id, bob_fact.id, "must be separate rows"); assert_eq!(alice_fact.confidence, 0.6); assert_eq!( bob_fact.confidence, 0.6, "bob's row should not have been corroboration-bumped against alice's" ); } #[test] fn deleting_persona_cascades_only_that_users_facts() { // Composite FK + CASCADE: deleting alice's 'journal' persona // wipes alice's journal facts but leaves alice's default // facts AND bob's journal-named facts untouched. let cx = opentelemetry::Context::new(); let conn = connection_with_fks_on(); let alice = create_user(&conn, "alice"); let bob = create_user(&conn, "bob"); create_persona_row(&conn, alice, "default"); create_persona_row(&conn, alice, "journal"); create_persona_row(&conn, bob, "journal"); let mut dao = SqliteKnowledgeDao::from_connection(conn.clone()); let entity = make_entity(&mut dao, "Cabin"); add_fact(&mut dao, entity.id, "p_alice_default", "x", alice, "default"); add_fact(&mut dao, entity.id, "p_alice_journal", "y", alice, "journal"); add_fact(&mut dao, entity.id, "p_bob_journal", "z", bob, "journal"); // Delete alice's journal persona — CASCADE should remove only // alice's journal facts. { use crate::database::schema::personas::dsl as p; let mut c = conn.lock().unwrap(); diesel::delete( p::personas .filter(p::user_id.eq(alice)) .filter(p::persona_id.eq("journal")), ) .execute(c.deref_mut()) .unwrap(); } // alice/default survives. 
let alice_default = dao .get_facts_for_entity( &cx, entity.id, &PersonaFilter::Single { user_id: alice, persona_id: "default".to_string(), }, ) .unwrap(); assert_eq!(alice_default.len(), 1); assert_eq!(alice_default[0].predicate, "p_alice_default"); // alice/journal is gone. let alice_journal = dao .get_facts_for_entity( &cx, entity.id, &PersonaFilter::Single { user_id: alice, persona_id: "journal".to_string(), }, ) .unwrap(); assert!( alice_journal.is_empty(), "CASCADE should have removed alice's journal facts" ); // bob/journal — same persona name, different user — untouched. let bob_journal = dao .get_facts_for_entity( &cx, entity.id, &PersonaFilter::Single { user_id: bob, persona_id: "journal".to_string(), }, ) .unwrap(); assert_eq!(bob_journal.len(), 1); assert_eq!(bob_journal[0].predicate, "p_bob_journal"); } #[test] fn fact_insert_with_unknown_persona_is_rejected() { // FK enforcement: inserting a fact whose (user_id, persona_id) // pair has no matching personas row should fail. Protects // against typo'd persona ids silently leaking into the table. let cx = opentelemetry::Context::new(); let conn = connection_with_fks_on(); let alice = create_user(&conn, "alice"); // Note: NO persona row inserted for alice + 'ghost'. 
let mut dao = SqliteKnowledgeDao::from_connection(conn.clone()); let entity = make_entity(&mut dao, "Cabin"); let result = dao.upsert_fact( &cx, InsertEntityFact { subject_entity_id: entity.id, predicate: "color".to_string(), object_entity_id: None, object_value: Some("red".to_string()), source_photo: None, source_insight_id: None, confidence: 0.6, status: "active".to_string(), created_at: 0, persona_id: "ghost".to_string(), user_id: alice, valid_from: None, valid_until: None, superseded_by: None, }, ); assert!( result.is_err(), "FK should reject fact whose persona doesn't exist" ); } #[test] fn supersede_fact_links_and_stamps_valid_until() { // Supersession: marking an old fact as replaced by a new one // flips its status to 'superseded', points superseded_by at // the new fact, and stamps valid_until from the new fact's // valid_from (when not already set). Pre-existing valid_until // on the old fact is respected. let cx = opentelemetry::Context::new(); let conn = connection_with_fks_on(); let alice = create_user(&conn, "alice"); create_persona_row(&conn, alice, "default"); let mut dao = SqliteKnowledgeDao::from_connection(conn.clone()); let cameron = make_entity(&mut dao, "Cameron"); let old = add_fact( &mut dao, cameron.id, "is_in_relationship_with", "X", alice, "default", ); // The new fact carries a valid_from we expect to be stamped // onto the old fact's valid_until. 
let new = add_fact( &mut dao, cameron.id, "is_in_relationship_with", "Y", alice, "default", ); dao.update_fact( &cx, new.id, FactPatch { predicate: None, object_value: None, status: None, confidence: None, valid_from: Some(Some(1640995200)), // 2022-01-01 valid_until: None, }, ) .unwrap(); let updated = dao .supersede_fact(&cx, old.id, new.id) .unwrap() .expect("supersede returned None"); assert_eq!(updated.status, "superseded"); assert_eq!(updated.superseded_by, Some(new.id)); assert_eq!(updated.valid_until, Some(1640995200)); } #[test] fn delete_fact_clears_dangling_supersession_pointers() { // Deleting the newer fact (the supersedeR) leaves the older // fact's superseded_by dangling — the DAO clears it back to // NULL in the same transaction so the column never points at // a missing row. The old fact's status stays 'superseded' // because the historical correction is still meaningful. let cx = opentelemetry::Context::new(); let conn = connection_with_fks_on(); let alice = create_user(&conn, "alice"); create_persona_row(&conn, alice, "default"); let mut dao = SqliteKnowledgeDao::from_connection(conn.clone()); let cameron = make_entity(&mut dao, "Cameron"); let old = add_fact(&mut dao, cameron.id, "lives_in", "NYC", alice, "default"); let new = add_fact(&mut dao, cameron.id, "lives_in", "SF", alice, "default"); dao.supersede_fact(&cx, old.id, new.id).unwrap().unwrap(); dao.delete_fact(&cx, new.id).unwrap(); let rehydrated = dao .list_facts( &cx, FactFilter { entity_id: Some(cameron.id), // "all" — the old fact is 'superseded' now, so the // default 'active' scope would skip it. 
status: Some("all".to_string()), predicate: None, persona: PersonaFilter::Single { user_id: alice, persona_id: "default".to_string(), }, limit: 10, offset: 0, }, ) .unwrap() .0; let old_row = rehydrated.iter().find(|f| f.id == old.id).unwrap(); assert_eq!( old_row.superseded_by, None, "dangling supersession pointer should be cleared" ); assert_eq!( old_row.status, "superseded", "historical status should survive the supersederr delete" ); } #[test] fn update_fact_can_set_and_clear_valid_time() { // FactPatch.valid_from / valid_until are Option> // so PATCH can distinguish "leave alone" (None) from "set to // value" (Some(Some(n))) and "clear back to NULL" (Some(None)). let cx = opentelemetry::Context::new(); let conn = connection_with_fks_on(); let alice = create_user(&conn, "alice"); create_persona_row(&conn, alice, "default"); let mut dao = SqliteKnowledgeDao::from_connection(conn.clone()); let cameron = make_entity(&mut dao, "Cameron"); let fact = add_fact( &mut dao, cameron.id, "is_in_relationship_with", "Alex", alice, "default", ); assert_eq!(fact.valid_from, None); assert_eq!(fact.valid_until, None); // Set both bounds. let updated = dao .update_fact( &cx, fact.id, FactPatch { predicate: None, object_value: None, status: None, confidence: None, valid_from: Some(Some(1577836800)), // 2020-01-01 valid_until: Some(Some(1640995200)), // 2022-01-01 }, ) .unwrap() .unwrap(); assert_eq!(updated.valid_from, Some(1577836800)); assert_eq!(updated.valid_until, Some(1640995200)); // Leave alone: omit both — values persist. let still = dao .update_fact( &cx, fact.id, FactPatch { predicate: None, object_value: None, status: None, confidence: None, valid_from: None, valid_until: None, }, ) .unwrap() .unwrap(); assert_eq!(still.valid_from, Some(1577836800)); assert_eq!(still.valid_until, Some(1640995200)); // Clear valid_until back to NULL (relationship ongoing again). 
let cleared = dao .update_fact( &cx, fact.id, FactPatch { predicate: None, object_value: None, status: None, confidence: None, valid_from: None, valid_until: Some(None), }, ) .unwrap() .unwrap(); assert_eq!(cleared.valid_from, Some(1577836800)); assert_eq!(cleared.valid_until, None); } #[test] fn delete_entity_clears_relational_facts_that_would_violate_check() { // entity_facts has a CHECK that at least one of object_entity_id / // object_value is non-null. The FK on object_entity_id is // ON DELETE SET NULL, which would leave purely-relational facts // (subject + predicate + object_entity_id, no object_value) // with both nulls and abort the delete. The DAO pre-deletes // those rows in a transaction so the parent delete can succeed. let cx = opentelemetry::Context::new(); let conn = connection_with_fks_on(); let alice = create_user(&conn, "alice"); create_persona_row(&conn, alice, "default"); let mut dao = SqliteKnowledgeDao::from_connection(conn.clone()); let bob = make_entity(&mut dao, "Bob"); let carol = make_entity(&mut dao, "Carol"); // A relational fact where Carol is the object — exactly the // shape the CHECK + SET NULL combination would otherwise break. let (rel_fact, _) = dao .upsert_fact( &cx, InsertEntityFact { subject_entity_id: bob.id, predicate: "is_friend_of".to_string(), object_entity_id: Some(carol.id), object_value: None, source_photo: None, source_insight_id: None, confidence: 0.6, status: "active".to_string(), created_at: 0, persona_id: "default".to_string(), user_id: alice, valid_from: None, valid_until: None, superseded_by: None, }, ) .unwrap(); // A typed fact where Bob is the subject — should survive. add_fact(&mut dao, bob.id, "has_age", "30", alice, "default"); // Delete Carol — should succeed (relational fact pre-deleted). 
dao.delete_entity(&cx, carol.id).unwrap(); assert!( dao.get_entity_by_id(&cx, carol.id).unwrap().is_none(), "Carol should be deleted" ); // The relational fact about Carol should be gone (pre-deleted by // the DAO's transaction, not SET NULL'd). let bob_facts = dao .get_facts_for_entity( &cx, bob.id, &PersonaFilter::Single { user_id: alice, persona_id: "default".to_string(), }, ) .unwrap(); assert!( !bob_facts.iter().any(|f| f.id == rel_fact.id), "relational fact pointing at Carol should be removed" ); // The typed fact survives. assert!( bob_facts.iter().any(|f| f.predicate == "has_age"), "typed fact about Bob should survive Carol's deletion" ); } #[test] fn upsert_entity_collapses_near_duplicate_by_embedding() { // The agent's pre-flight check uses FTS5 prefix tokens, which // miss "Sarah" / "Sara" / "Sarah J." pairs. The DAO upsert is // the safety net: if no exact (name, type) match but the new // entity's embedding sits above the cosine threshold against an // existing same-type entity, we collapse instead of inserting. let cx = opentelemetry::Context::new(); let conn = connection_with_fks_on(); let mut dao = SqliteKnowledgeDao::from_connection(conn.clone()); let mut emb_a = vec![0.0_f32; 64]; emb_a[0] = 1.0; emb_a[1] = 0.5; let mut emb_b_near = emb_a.clone(); emb_b_near[2] = 0.05; // nudge — cosine still well above 0.92 // Seed an existing entity with the embedding. let seeded = dao .upsert_entity( &cx, InsertEntity { name: "Sarah".to_string(), entity_type: "person".to_string(), description: "tagged friend".to_string(), embedding: Some(SqliteKnowledgeDao::serialize_embedding(&emb_a)), confidence: 0.6, status: "active".to_string(), created_at: 0, updated_at: 0, }, ) .unwrap(); // A "different name" with a near-identical embedding should // collapse onto the existing row, not create a new entity. 
let collapsed = dao .upsert_entity( &cx, InsertEntity { name: "Sara".to_string(), entity_type: "person".to_string(), description: "tagged friend".to_string(), embedding: Some(SqliteKnowledgeDao::serialize_embedding(&emb_b_near)), confidence: 0.6, status: "active".to_string(), created_at: 0, updated_at: 0, }, ) .unwrap(); assert_eq!( collapsed.id, seeded.id, "near-duplicate by cosine should reuse the existing entity id" ); // And a clearly-different embedding under a different name should // still create a new row. let mut emb_unrelated = vec![0.0_f32; 64]; emb_unrelated[10] = 1.0; let distinct = dao .upsert_entity( &cx, InsertEntity { name: "Bob".to_string(), entity_type: "person".to_string(), description: String::new(), embedding: Some(SqliteKnowledgeDao::serialize_embedding(&emb_unrelated)), confidence: 0.6, status: "active".to_string(), created_at: 0, updated_at: 0, }, ) .unwrap(); assert_ne!( distinct.id, seeded.id, "unrelated embedding should not collapse" ); } }