Files
ImageApi/src/database/knowledge_dao.rs
Cameron Cordes e67e00ef8a knowledge: predicate-quality nudge + bulk-reject endpoint
Two coupled changes to fight the speech-act-predicate problem
(facts like (Cameron, expressed, "I'm tempted to...")):

1. System prompt grows an explicit predicate-quality rule. The
   agent is told to use relationship-shaped verbs (lives_in,
   works_at, attended, is_friend_of, interested_in), and is
   given an explicit DON'T list (expressed, said, mentioned,
   stated, quoted, noted, discussed, thought, wondered). Plus a
   concrete Bad / Good example contrasting the noise pattern
   with the structured paraphrase the agent should be writing.
   Stops the bleed for new insights.

2. Cleanup tools for the legacy noise that's already in the
   table:
   - get_predicate_stats(persona, limit) returns
     [(predicate, count)] sorted desc — feeds the curation UI's
     PREDICATES tab.
   - bulk_reject_facts_by_predicate(persona, predicate, audit)
     flips every ACTIVE fact under that predicate to 'rejected'
     in one transaction, stamping last_modified_* so the action
     is attributable + reversible per-fact through the entity
     detail panel. REVIEWED facts under the same predicate are
     left alone — the curator may have hand-approved an
     exception ("interested_in" might be largely noise but a
     reviewed entry is intentional).

New HTTP endpoints:
   GET  /knowledge/predicate-stats?limit=
   POST /knowledge/predicates/{predicate}/bulk-reject

Persona-scoped via the existing X-Persona-Id header.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 21:50:26 -04:00

2841 lines
109 KiB
Rust

#![allow(dead_code)]
use diesel::prelude::*;
use diesel::sqlite::SqliteConnection;
use std::ops::DerefMut;
use std::sync::{Arc, Mutex};
use crate::database::models::{
Entity, EntityFact, EntityPhotoLink, InsertEntity, InsertEntityFact, InsertEntityPhotoLink,
};
use crate::database::schema;
use crate::database::{DbError, DbErrorKind, connect};
use crate::otel::trace_db_call;
// ---------------------------------------------------------------------------
// Entity type normalisation
// ---------------------------------------------------------------------------
/// Canonicalise a model-supplied entity_type to a consistent lowercase form.
/// Weak models frequently vary capitalisation ("Person" vs "person") or use
/// synonym types ("location" vs "place"). Normalising here prevents duplicate
/// entities that differ only by type spelling.
/// Canonicalise a model-supplied entity_type to a consistent lowercase form.
/// Weak models frequently vary capitalisation ("Person" vs "person") or use
/// synonym types ("location" vs "place"). Normalising here prevents duplicate
/// entities that differ only by type spelling.
pub(crate) fn normalize_entity_type(raw: &str) -> String {
    // Synonym table: each canonical type with every spelling that maps to it.
    const SYNONYMS: [(&str, &[&str]); 4] = [
        ("person", &["person", "people", "human", "individual", "contact"]),
        ("place", &["place", "location", "venue", "site", "area", "landmark"]),
        ("event", &["event", "occasion", "activity", "celebration"]),
        ("thing", &["thing", "object", "item", "product"]),
    ];
    let lowered = raw.to_lowercase();
    for (canonical, variants) in SYNONYMS {
        if variants.contains(&lowered.as_str()) {
            return canonical.to_string();
        }
    }
    // Unknown types pass through lowercased rather than being rejected.
    lowered
}
// ---------------------------------------------------------------------------
// Filter / patch types
// ---------------------------------------------------------------------------
/// Filter for entity listing queries. All set fields combine with AND.
pub struct EntityFilter {
    /// Exact match on the (normalised) entity_type column when set.
    pub entity_type: Option<String>,
    /// "active" | "reviewed" | "rejected" | "all"
    pub status: Option<String>,
    /// LIKE match on name and description
    pub search: Option<String>,
    // Pagination: page size and starting row offset.
    pub limit: i64,
    pub offset: i64,
}
/// Sort key for the curation list. Name = alphabetical clustering
/// (good for spotting near-duplicates like Sara / Sarah / Sarah J.).
/// FactCount = surface heavily-used entities first, demote 0-fact
/// noise to the bottom. UpdatedDesc = legacy "newest activity first".
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EntitySort {
    /// Newest activity first (legacy default).
    UpdatedDesc,
    /// Alphabetical — clusters near-duplicate names together.
    NameAsc,
    /// Most-facted entities first; 0-fact noise sinks to the bottom.
    FactCountDesc,
}
/// Filter for fact listing queries. All set fields combine with AND;
/// `persona` scoping is always applied.
pub struct FactFilter {
    /// Restrict to facts whose subject is this entity, when set.
    pub entity_id: Option<i32>,
    /// "active" | "reviewed" | "rejected" | "all"
    pub status: Option<String>,
    /// Exact match on the predicate column, when set.
    pub predicate: Option<String>,
    pub persona: PersonaFilter,
    // Pagination: page size and starting row offset.
    pub limit: i64,
    pub offset: i64,
}
/// Persona scoping for fact reads. `Single` filters to one persona's
/// view; `All` is the hive-mind read used when a persona has
/// `include_all_memories=true` in the personas table. Both variants
/// carry `user_id` because facts are user-isolated — two users with
/// the same 'default' persona must not see each other's facts (this
/// is enforced at the schema level by the composite FK in migration
/// 2026-05-10). Entities and photo-links are always shared and don't
/// take a persona filter.
#[derive(Debug, Clone)]
pub enum PersonaFilter {
    /// One persona's view for one user.
    Single { user_id: i32, persona_id: String },
    /// Hive-mind read: every persona belonging to one user.
    All { user_id: i32 },
}
impl PersonaFilter {
    /// The user this filter is scoped to — present on both variants,
    /// so a single or-pattern extracts it.
    pub fn user_id(&self) -> i32 {
        match self {
            Self::Single { user_id, .. } | Self::All { user_id } => *user_id,
        }
    }
}
/// Partial update for an entity row. `None` fields are left untouched.
pub struct EntityPatch {
    pub name: Option<String>,
    pub description: Option<String>,
    /// "active" | "reviewed" | "rejected" — NOTE(review): presumably
    /// validated at the HTTP layer; confirm before relying on it here.
    pub status: Option<String>,
    pub confidence: Option<f32>,
}
/// Partial update for a fact row. `None` fields are left untouched;
/// the valid-time fields use a double-Option (see below) so "omit"
/// and "clear" remain distinguishable.
pub struct FactPatch {
    pub predicate: Option<String>,
    pub object_value: Option<String>,
    /// "active" | "reviewed" | "rejected" | "superseded" — NOTE(review):
    /// exact accepted set is enforced elsewhere; confirm at the caller.
    pub status: Option<String>,
    pub confidence: Option<f32>,
    /// Real-world valid-time bounds. Outer Some = "patch this column";
    /// inner Some(val) = set to that unix-seconds value; inner None =
    /// clear back to NULL ("unbounded"). The double-Option lets the
    /// HTTP layer distinguish "field omitted" (leave alone) from
    /// "field sent as null" (clear) — needed for these specifically
    /// because there's no sentinel string-empty equivalent like the
    /// other fields have.
    pub valid_from: Option<Option<i64>>,
    pub valid_until: Option<Option<i64>>,
}
/// Payload for the recent-edits audit feed: entities and facts
/// touched since a given timestamp (see `get_recent_activity`).
pub struct RecentActivity {
    pub entities: Vec<Entity>,
    pub facts: Vec<EntityFact>,
}
/// A near-duplicate cluster found by `find_consolidation_proposals`.
/// `min_cosine` / `max_cosine` are summary stats over the pairwise
/// edges inside the group — gives the curator a sense of "how tight"
/// the cluster is before clicking in.
#[derive(Debug, Clone)]
pub struct ConsolidationGroup {
    /// The near-duplicate entities in this cluster (always >= 2).
    pub entities: Vec<Entity>,
    /// Smallest pairwise cosine among the cluster's edges.
    pub min_cosine: f32,
    /// Largest pairwise cosine among the cluster's edges.
    pub max_cosine: f32,
}
/// Graph view payload: every entity that has at least one fact
/// becomes a node; every relational fact (object_entity_id set)
/// becomes an edge between subject and object. Multiple facts with
/// the same (subject, object, predicate) collapse into one edge
/// with a count so the UI can fan them out as one weighted line.
#[derive(Debug, Clone)]
pub struct GraphNode {
    /// Entity primary key — referenced by GraphEdge source/target.
    pub id: i32,
    pub name: String,
    pub entity_type: String,
    /// Persona-scoped count of in-scope facts on this entity.
    pub fact_count: i64,
}
#[derive(Debug, Clone)]
pub struct GraphEdge {
    /// Subject entity id.
    pub source: i32,
    /// Object entity id.
    pub target: i32,
    pub predicate: String,
    /// How many facts collapsed into this edge (UI renders as weight).
    pub count: i64,
}
/// Complete graph snapshot returned by `build_entity_graph`.
#[derive(Debug, Clone)]
pub struct EntityGraph {
    pub nodes: Vec<GraphNode>,
    pub edges: Vec<GraphEdge>,
}
// ---------------------------------------------------------------------------
// Trait
// ---------------------------------------------------------------------------
/// Data-access interface for the knowledge graph: entities, facts,
/// photo links and audit reads. `Sync + Send` so it can sit behind
/// the shared application state used by the HTTP layer and the agent.
pub trait KnowledgeDao: Sync + Send {
    // --- Entity ---
    /// Insert a new entity or fold it into an existing one (case-
    /// insensitive name+type match with an embedding-based fuzzy
    /// fallback in the SQLite impl). Returns the stored row.
    fn upsert_entity(
        &mut self,
        cx: &opentelemetry::Context,
        entity: InsertEntity,
    ) -> Result<Entity, DbError>;
    /// Fetch one entity by primary key; `Ok(None)` when absent.
    fn get_entity_by_id(
        &mut self,
        cx: &opentelemetry::Context,
        id: i32,
    ) -> Result<Option<Entity>, DbError>;
    /// Case-insensitive name lookup, optionally restricted by type.
    /// Rejected entities are excluded.
    fn get_entity_by_name(
        &mut self,
        cx: &opentelemetry::Context,
        name: &str,
        entity_type: Option<&str>,
    ) -> Result<Vec<Entity>, DbError>;
    /// All non-rejected entities carrying an embedding, optionally
    /// restricted by type. Feeds the similarity / dedup scans.
    fn get_entities_with_embeddings(
        &mut self,
        cx: &opentelemetry::Context,
        entity_type: Option<&str>,
    ) -> Result<Vec<Entity>, DbError>;
    /// Paged entity listing. Returns (page, total-matching count).
    fn list_entities(
        &mut self,
        cx: &opentelemetry::Context,
        filter: EntityFilter,
    ) -> Result<(Vec<Entity>, i64), DbError>;
    /// List entities alongside a persona-scoped fact count for each.
    /// Powers the curation surface — sorting by fact count surfaces
    /// the heavily-used entities and demotes 0-fact noise. Counting
    /// is restricted to non-rejected facts under the active persona
    /// scope so a switch in the persona picker re-orders the list.
    fn list_entities_with_fact_counts(
        &mut self,
        cx: &opentelemetry::Context,
        filter: EntityFilter,
        sort: EntitySort,
        persona: &PersonaFilter,
    ) -> Result<(Vec<(Entity, i64)>, i64), DbError>;
    /// Aggregate the user's active+reviewed facts by predicate so
    /// the curation UI can flag noisy verbs ("expressed", "said") and
    /// bulk-reject. Persona-scoped via the existing PersonaFilter
    /// pattern. Sorted by count desc.
    fn get_predicate_stats(
        &mut self,
        cx: &opentelemetry::Context,
        persona: &PersonaFilter,
        limit: usize,
    ) -> Result<Vec<(String, i64)>, DbError>;
    /// Bulk reject every active fact under a given predicate
    /// (persona-scoped). Returns the number of rows touched. Used by
    /// the predicate-cleanup UI to nuke noise verbs in one click.
    /// Stamps last_modified_* with the caller-supplied audit so the
    /// action shows up in the recent-edits feed.
    fn bulk_reject_facts_by_predicate(
        &mut self,
        cx: &opentelemetry::Context,
        persona: &PersonaFilter,
        predicate: &str,
        audit: Option<(&str, &str)>,
    ) -> Result<usize, DbError>;
    /// Build a graph snapshot — entities as nodes (fact count from
    /// the active persona scope), relational facts as edges. Used
    /// by the curation UI's graph view. Filters:
    /// - entity_type: optional, restricts nodes to one type
    /// - node_limit: caps the number of nodes; lower-fact-count
    ///   entities drop first
    /// Edges between dropped entities are pruned. Persona scoping
    /// affects fact_count + edge inclusion (rejected / superseded
    /// excluded; All vs Single mirrors the existing pattern).
    fn build_entity_graph(
        &mut self,
        cx: &opentelemetry::Context,
        entity_type: Option<&str>,
        node_limit: usize,
        persona: &PersonaFilter,
    ) -> Result<EntityGraph, DbError>;
    /// Find groups of near-duplicate entities that the upsert-time
    /// cosine guard didn't catch (it runs at ~0.92; this scan runs
    /// at a lower threshold to surface the "probably same" tier that
    /// needs human review). Groups are formed via union-find over
    /// the cosine-adjacency graph, partitioned by entity_type so a
    /// person can't cluster with a place. Returns groups of >= 2
    /// entities, sorted by size desc then by max pairwise cosine.
    /// Trimmed to `max_groups`.
    fn find_consolidation_proposals(
        &mut self,
        cx: &opentelemetry::Context,
        threshold: f32,
        max_groups: usize,
    ) -> Result<Vec<ConsolidationGroup>, DbError>;
    /// Batch fetch per-persona fact counts for a set of entities,
    /// scoped to one user. Returns map of entity_id → list of
    /// (persona_id, count). Used by the curation UI to show "this
    /// entity has 0 facts in your active persona but 12 in journal"
    /// so the curator knows where to find the existing knowledge.
    /// Rejected facts excluded; superseded included (they're history,
    /// not noise).
    fn get_persona_breakdowns_for_entities(
        &mut self,
        cx: &opentelemetry::Context,
        entity_ids: &[i32],
        user_id: i32,
    ) -> Result<std::collections::HashMap<i32, Vec<(String, i64)>>, DbError>;
    /// Set only the status column of one entity.
    fn update_entity_status(
        &mut self,
        cx: &opentelemetry::Context,
        id: i32,
        status: &str,
    ) -> Result<(), DbError>;
    /// Apply a partial update (see [`EntityPatch`]); returns the
    /// updated row, or `Ok(None)` when the id doesn't exist.
    fn update_entity(
        &mut self,
        cx: &opentelemetry::Context,
        id: i32,
        patch: EntityPatch,
    ) -> Result<Option<Entity>, DbError>;
    /// Delete one entity by id.
    fn delete_entity(&mut self, cx: &opentelemetry::Context, id: i32) -> Result<(), DbError>;
    /// Fold `source_id` into `target_id`. Returns two counts —
    /// NOTE(review): presumably (facts moved, photo links moved);
    /// confirm against the implementation before documenting further.
    fn merge_entities(
        &mut self,
        cx: &opentelemetry::Context,
        source_id: i32,
        target_id: i32,
    ) -> Result<(i64, i64), DbError>;
    // --- Facts ---
    /// Insert or update a fact. The bool flags — NOTE(review):
    /// presumably whether a new row was created (vs updated);
    /// confirm against the implementation.
    fn upsert_fact(
        &mut self,
        cx: &opentelemetry::Context,
        fact: InsertEntityFact,
    ) -> Result<(EntityFact, bool), DbError>;
    /// All facts whose subject is `entity_id`, persona-scoped.
    fn get_facts_for_entity(
        &mut self,
        cx: &opentelemetry::Context,
        entity_id: i32,
        persona: &PersonaFilter,
    ) -> Result<Vec<EntityFact>, DbError>;
    /// Paged fact listing. Returns (page, total-matching count).
    fn list_facts(
        &mut self,
        cx: &opentelemetry::Context,
        filter: FactFilter,
    ) -> Result<(Vec<EntityFact>, i64), DbError>;
    /// Update a fact. `audit` stamps the row's `last_modified_*`
    /// columns — None = legacy internal callers without provenance
    /// context; HTTP passes `Some(("manual", "manual"))`; the agent
    /// passes its loop-time model + backend so the audit trail can
    /// distinguish human edits from agent corrections.
    fn update_fact(
        &mut self,
        cx: &opentelemetry::Context,
        id: i32,
        patch: FactPatch,
        audit: Option<(&str, &str)>,
    ) -> Result<Option<EntityFact>, DbError>;
    /// Stamp `insight_id` on every fact extracted from `source_photo`.
    fn update_facts_insight_id(
        &mut self,
        cx: &opentelemetry::Context,
        source_photo: &str,
        insight_id: i32,
    ) -> Result<(), DbError>;
    /// Delete one fact by id.
    fn delete_fact(&mut self, cx: &opentelemetry::Context, id: i32) -> Result<(), DbError>;
    /// Mark an old fact as superseded by a new one. Atomically:
    /// - reads the new fact's valid_from
    /// - sets old.superseded_by = new_id
    /// - sets old.status = 'superseded'
    /// - stamps old.valid_until = new.valid_from (if not already
    ///   set; otherwise leaves it)
    /// - stamps old.last_modified_* from `audit`
    ///
    /// Returns the updated old fact. Errors if either id is missing.
    fn supersede_fact(
        &mut self,
        cx: &opentelemetry::Context,
        old_id: i32,
        new_id: i32,
        audit: Option<(&str, &str)>,
    ) -> Result<Option<EntityFact>, DbError>;
    /// Undo a supersession: clear `superseded_by`, flip status back to
    /// 'active', clear `valid_until` (we don't know if it was auto-
    /// stamped by the supersede or hand-set, so the conservative reset
    /// is to clear it — user can re-bound after). Stamps the audit
    /// columns so the revert is itself attributable.
    ///
    /// Returns the restored fact. Errors if the fact doesn't exist or
    /// wasn't superseded in the first place (no-op semantics).
    fn revert_supersession(
        &mut self,
        cx: &opentelemetry::Context,
        fact_id: i32,
        audit: Option<(&str, &str)>,
    ) -> Result<Option<EntityFact>, DbError>;
    // --- Photo links ---
    /// Insert or update one entity↔photo link row.
    fn upsert_photo_link(
        &mut self,
        cx: &opentelemetry::Context,
        link: InsertEntityPhotoLink,
    ) -> Result<(), DbError>;
    /// Remove every link attached to one photo file (re-index path).
    fn delete_photo_links_for_file(
        &mut self,
        cx: &opentelemetry::Context,
        file_path: &str,
    ) -> Result<(), DbError>;
    /// All links attached to one photo file.
    fn get_links_for_photo(
        &mut self,
        cx: &opentelemetry::Context,
        file_path: &str,
    ) -> Result<Vec<EntityPhotoLink>, DbError>;
    /// All links attached to one entity.
    fn get_links_for_entity(
        &mut self,
        cx: &opentelemetry::Context,
        entity_id: i32,
    ) -> Result<Vec<EntityPhotoLink>, DbError>;
    // --- Audit ---
    /// Entities and facts touched since unix-seconds `since`,
    /// capped at `limit`, persona-scoped. Feeds the recent-edits UI.
    fn get_recent_activity(
        &mut self,
        cx: &opentelemetry::Context,
        since: i64,
        limit: i64,
        persona: &PersonaFilter,
    ) -> Result<RecentActivity, DbError>;
}
// ---------------------------------------------------------------------------
// SQLite implementation
// ---------------------------------------------------------------------------
/// Diesel/SQLite-backed implementation of [`KnowledgeDao`]. Every
/// operation serialises through one `Mutex`-guarded connection.
pub struct SqliteKnowledgeDao {
    connection: Arc<Mutex<SqliteConnection>>,
}
impl Default for SqliteKnowledgeDao {
    /// Same as [`SqliteKnowledgeDao::new`]: opens a fresh connection.
    fn default() -> Self {
        Self::new()
    }
}
impl SqliteKnowledgeDao {
    /// Open a DAO over a brand-new connection from `connect()`.
    pub fn new() -> Self {
        SqliteKnowledgeDao {
            connection: Arc::new(Mutex::new(connect())),
        }
    }
    /// Wrap an existing shared connection (lets callers multiplex one
    /// connection across several DAOs).
    pub fn from_connection(conn: Arc<Mutex<SqliteConnection>>) -> Self {
        SqliteKnowledgeDao { connection: conn }
    }
    /// Pack an f32 vector into little-endian bytes for BLOB storage.
    fn serialize_embedding(vec: &[f32]) -> Vec<u8> {
        let mut out = Vec::with_capacity(vec.len() * 4);
        for value in vec {
            out.extend_from_slice(&value.to_le_bytes());
        }
        out
    }
    /// Unpack a BLOB written by `serialize_embedding`. Errors when the
    /// byte length is not a whole number of 4-byte floats.
    fn deserialize_embedding(bytes: &[u8]) -> Result<Vec<f32>, DbError> {
        if bytes.len() % 4 != 0 {
            return Err(DbError::new(DbErrorKind::QueryError));
        }
        let mut out = Vec::with_capacity(bytes.len() / 4);
        for chunk in bytes.chunks_exact(4) {
            out.push(f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]));
        }
        Ok(out)
    }
    /// Cosine similarity of two vectors. Returns 0.0 for mismatched
    /// lengths, empty input, or a zero-magnitude side.
    pub fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
        if a.len() != b.len() || a.is_empty() {
            return 0.0;
        }
        let mut dot = 0.0f32;
        let mut sq_a = 0.0f32;
        let mut sq_b = 0.0f32;
        for (x, y) in a.iter().zip(b.iter()) {
            dot += x * y;
            sq_a += x * x;
            sq_b += y * y;
        }
        let mag_a = sq_a.sqrt();
        let mag_b = sq_b.sqrt();
        // Guard each magnitude separately (not their product) so tiny
        // magnitudes that would underflow when multiplied behave the
        // same as the original two-sided check.
        if mag_a == 0.0 || mag_b == 0.0 {
            0.0
        } else {
            dot / (mag_a * mag_b)
        }
    }
}
/// Cosine-similarity threshold above which a new entity collapses into an
/// existing same-type entity at upsert time. The agent's pre-flight name
/// search uses FTS5 prefix tokens, which misses near-dupes like
/// "Sarah" / "Sara" / "Sarah J." that share a description-rich embedding.
/// Override via `ENTITY_DEDUP_COSINE_THRESHOLD` env var when tuning.
const ENTITY_DEDUP_COSINE_THRESHOLD_DEFAULT: f32 = 0.92;
fn entity_dedup_cosine_threshold() -> f32 {
    // Any failure (var unset, non-UTF8, unparsable) falls back to the
    // compiled-in default — exactly as the combinator chain did.
    match std::env::var("ENTITY_DEDUP_COSINE_THRESHOLD") {
        Ok(raw) => raw
            .parse::<f32>()
            .unwrap_or(ENTITY_DEDUP_COSINE_THRESHOLD_DEFAULT),
        Err(_) => ENTITY_DEDUP_COSINE_THRESHOLD_DEFAULT,
    }
}
impl KnowledgeDao for SqliteKnowledgeDao {
// -----------------------------------------------------------------------
// Entity operations
// -----------------------------------------------------------------------
    /// Insert a new entity, or fold it into an existing one found by
    /// (a) case-insensitive name+type match or (b) cosine-similarity
    /// fallback against same-type embeddings. On a match the existing
    /// row's description / embedding / updated_at are overwritten.
    fn upsert_entity(
        &mut self,
        cx: &opentelemetry::Context,
        entity: InsertEntity,
    ) -> Result<Entity, DbError> {
        trace_db_call(cx, "insert", "upsert_entity", |_span| {
            use schema::entities::dsl::*;
            let mut conn = self.connection.lock().expect("KnowledgeDao lock");
            // Normalise type before lookup and insert so that model variations
            // ("Person" / "person", "location" / "place") collapse to one row.
            let entity = InsertEntity {
                entity_type: normalize_entity_type(&entity.entity_type),
                ..entity
            };
            // Case-insensitive lookup by name + entity_type.
            // Use lower() on both sides so existing dirty rows ("Person") still match.
            // NOTE(review): this clause is string-built with manual quote
            // doubling instead of bound parameters — handles ' but would be
            // safer as binds if the expression ever grows.
            let name_lower = entity.name.to_lowercase();
            let type_lower = entity.entity_type.to_lowercase();
            let mut existing: Option<Entity> = entities
                .filter(diesel::dsl::sql::<diesel::sql_types::Bool>(&format!(
                    "lower(name) = '{}' AND lower(entity_type) = '{}'",
                    name_lower.replace('\'', "''"),
                    type_lower.replace('\'', "''")
                )))
                .first::<Entity>(conn.deref_mut())
                .optional()
                .map_err(|e| anyhow::anyhow!("Query error: {}", e))?;
            // Fuzzy-match fallback: if no exact name match and the incoming
            // entity carries an embedding, compare against same-type entities'
            // embeddings and collapse if any are above the cosine threshold.
            if existing.is_none()
                && let Some(new_emb_bytes) = entity.embedding.as_ref()
                && let Ok(new_vec) = Self::deserialize_embedding(new_emb_bytes)
                && !new_vec.is_empty()
            {
                let threshold = entity_dedup_cosine_threshold();
                // Loads every same-type embedded entity into memory for the
                // scan — fine at current table sizes; revisit if it grows.
                let candidates: Vec<Entity> = entities
                    .filter(embedding.is_not_null())
                    .filter(entity_type.eq(&entity.entity_type))
                    .filter(status.ne("rejected"))
                    .load::<Entity>(conn.deref_mut())
                    .map_err(|e| anyhow::anyhow!("Query error: {}", e))?;
                // Keep only the single best candidate at or above threshold.
                let mut best: Option<(Entity, f32)> = None;
                for cand in candidates {
                    let Some(cand_bytes) = cand.embedding.as_ref() else {
                        continue;
                    };
                    let Ok(cand_vec) = Self::deserialize_embedding(cand_bytes) else {
                        continue;
                    };
                    let sim = Self::cosine_similarity(&new_vec, &cand_vec);
                    if sim >= threshold && best.as_ref().is_none_or(|(_, s)| sim > *s) {
                        best = Some((cand, sim));
                    }
                }
                if let Some((cand, sim)) = best {
                    log::info!(
                        "entity dedup: collapsing new '{}' ({}) into existing '{}' (id={}, cos={:.3})",
                        entity.name,
                        entity.entity_type,
                        cand.name,
                        cand.id,
                        sim
                    );
                    existing = Some(cand);
                }
            }
            if let Some(existing_entity) = existing {
                // Update description, embedding, updated_at
                diesel::update(entities.filter(id.eq(existing_entity.id)))
                    .set((
                        description.eq(&entity.description),
                        embedding.eq(&entity.embedding),
                        updated_at.eq(entity.updated_at),
                    ))
                    .execute(conn.deref_mut())
                    .map_err(|e| anyhow::anyhow!("Update error: {}", e))?;
                // Re-read so the caller gets the post-update row.
                entities
                    .filter(id.eq(existing_entity.id))
                    .first::<Entity>(conn.deref_mut())
                    .map_err(|e| anyhow::anyhow!("Query error: {}", e))
            } else {
                diesel::insert_into(entities)
                    .values(&entity)
                    .execute(conn.deref_mut())
                    .map_err(|e| anyhow::anyhow!("Insert error: {}", e))?;
                // NOTE(review): highest-id stands in for last_insert_rowid().
                // The Mutex serialises writes through this DAO, but another
                // handle on the same database could insert between these two
                // statements — consider last_insert_rowid() to close the race.
                entities
                    .order(id.desc())
                    .first::<Entity>(conn.deref_mut())
                    .map_err(|e| anyhow::anyhow!("Query error: {}", e))
            }
        })
        .map_err(|_| DbError::new(DbErrorKind::InsertError))
    }
fn get_entity_by_id(
&mut self,
cx: &opentelemetry::Context,
entity_id: i32,
) -> Result<Option<Entity>, DbError> {
trace_db_call(cx, "query", "get_entity_by_id", |_span| {
use schema::entities::dsl::*;
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
entities
.filter(id.eq(entity_id))
.first::<Entity>(conn.deref_mut())
.optional()
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn get_entity_by_name(
&mut self,
cx: &opentelemetry::Context,
entity_name: &str,
entity_type_filter: Option<&str>,
) -> Result<Vec<Entity>, DbError> {
trace_db_call(cx, "query", "get_entity_by_name", |_span| {
use schema::entities::dsl::*;
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
let name_lower = entity_name.to_lowercase().replace('\'', "''");
let mut sql = format!("lower(name) = '{}'", name_lower);
if let Some(et) = entity_type_filter {
sql.push_str(&format!(" AND entity_type = '{}'", et.replace('\'', "''")));
}
sql.push_str(" AND status != 'rejected'");
entities
.filter(diesel::dsl::sql::<diesel::sql_types::Bool>(&sql))
.load::<Entity>(conn.deref_mut())
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn get_entities_with_embeddings(
&mut self,
cx: &opentelemetry::Context,
entity_type_filter: Option<&str>,
) -> Result<Vec<Entity>, DbError> {
trace_db_call(cx, "query", "get_entities_with_embeddings", |_span| {
use schema::entities::dsl::*;
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
let mut query = entities
.filter(embedding.is_not_null())
.filter(status.ne("rejected"))
.into_boxed();
if let Some(et) = entity_type_filter {
query = query.filter(entity_type.eq(et));
}
query
.load::<Entity>(conn.deref_mut())
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn list_entities(
&mut self,
cx: &opentelemetry::Context,
filter: EntityFilter,
) -> Result<(Vec<Entity>, i64), DbError> {
trace_db_call(cx, "query", "list_entities", |_span| {
use diesel::dsl::count_star;
use schema::entities::dsl::*;
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
let mut query = entities.into_boxed();
if let Some(ref et) = filter.entity_type {
query = query.filter(entity_type.eq(et));
}
let status_val = filter.status.as_deref().unwrap_or("active");
if status_val != "all" {
query = query.filter(status.eq(status_val));
}
if let Some(ref search_term) = filter.search {
let pattern = format!("%{}%", search_term);
query = query.filter(name.like(pattern.clone()).or(description.like(pattern)));
}
// Count with same filters applied (build separately since boxed query is consumed)
let mut count_query = entities.into_boxed();
if let Some(ref et) = filter.entity_type {
count_query = count_query.filter(entity_type.eq(et));
}
let status_val2 = filter.status.as_deref().unwrap_or("active");
if status_val2 != "all" {
count_query = count_query.filter(status.eq(status_val2));
}
if let Some(ref search_term) = filter.search {
let pattern = format!("%{}%", search_term);
count_query =
count_query.filter(name.like(pattern.clone()).or(description.like(pattern)));
}
let total: i64 = count_query
.select(count_star())
.first(conn.deref_mut())
.unwrap_or(0);
let results = query
.order(updated_at.desc())
.limit(filter.limit)
.offset(filter.offset)
.load::<Entity>(conn.deref_mut())
.map_err(|e| anyhow::anyhow!("Query error: {}", e))?;
Ok((results, total))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
    /// Entities plus a persona-scoped fact count each, via raw SQL
    /// (diesel's DSL can't express the scoped LEFT JOIN subquery).
    /// Bind order is positional and must mirror the SQL text:
    /// persona binds (inside the JOIN) → WHERE binds → LIMIT/OFFSET.
    fn list_entities_with_fact_counts(
        &mut self,
        cx: &opentelemetry::Context,
        filter: EntityFilter,
        sort: EntitySort,
        persona: &PersonaFilter,
    ) -> Result<(Vec<(Entity, i64)>, i64), DbError> {
        trace_db_call(cx, "query", "list_entities_with_fact_counts", |_span| {
            use diesel::sql_query;
            use diesel::sql_types::{BigInt, Integer, Text};
            let mut conn = self.connection.lock().expect("KnowledgeDao lock");
            // Build WHERE fragments. Inline-safe values are bound; status
            // / sort keywords are validated against fixed sets.
            let mut where_parts: Vec<String> = Vec::new();
            let mut bind_types: Vec<&'static str> = Vec::new();
            let mut bind_strs: Vec<String> = Vec::new();
            if filter.entity_type.is_some() {
                where_parts.push("e.entity_type = ?".to_string());
                bind_types.push("text");
                bind_strs.push(filter.entity_type.clone().unwrap());
            }
            let status_val = filter.status.as_deref().unwrap_or("active");
            if status_val != "all" {
                where_parts.push("e.status = ?".to_string());
                bind_types.push("text");
                bind_strs.push(status_val.to_string());
            }
            if let Some(ref s) = filter.search {
                // Search binds twice: once for name, once for description.
                where_parts.push("(e.name LIKE ? OR e.description LIKE ?)".to_string());
                bind_types.push("text");
                bind_types.push("text");
                let pat = format!("%{}%", s);
                bind_strs.push(pat.clone());
                bind_strs.push(pat);
            }
            let where_clause = if where_parts.is_empty() {
                String::new()
            } else {
                format!("WHERE {}", where_parts.join(" AND "))
            };
            // Persona-scoped fact-count subquery. Single = filter on
            // (user_id, persona_id); All = union across the user's
            // personas (mirror PersonaFilter::All read semantics).
            let fact_count_join = match persona {
                PersonaFilter::Single {
                    user_id: _,
                    persona_id: _,
                } => {
                    "LEFT JOIN (\
                    SELECT subject_entity_id, COUNT(*) AS fact_count \
                    FROM entity_facts \
                    WHERE user_id = ? AND persona_id = ? AND status != 'rejected' \
                    GROUP BY subject_entity_id\
                    ) fc ON fc.subject_entity_id = e.id"
                }
                PersonaFilter::All { user_id: _ } => {
                    "LEFT JOIN (\
                    SELECT subject_entity_id, COUNT(*) AS fact_count \
                    FROM entity_facts \
                    WHERE user_id = ? AND status != 'rejected' \
                    GROUP BY subject_entity_id\
                    ) fc ON fc.subject_entity_id = e.id"
                }
            };
            // Sort keyword comes from a closed enum — never from input —
            // so interpolating it into the SQL text is safe.
            let order_by = match sort {
                EntitySort::UpdatedDesc => "e.updated_at DESC",
                EntitySort::NameAsc => "lower(e.name) ASC",
                EntitySort::FactCountDesc => "COALESCE(fc.fact_count, 0) DESC, lower(e.name) ASC",
            };
            let select_sql = format!(
                "SELECT e.id, e.name, e.entity_type, e.description, e.embedding, \
                 e.confidence, e.status, e.created_at, e.updated_at, \
                 COALESCE(fc.fact_count, 0) AS fact_count \
                 FROM entities e \
                 {fact_count_join} \
                 {where_clause} \
                 ORDER BY {order_by} \
                 LIMIT ? OFFSET ?"
            );
            // Count has no join, so it takes only the WHERE binds.
            let count_sql = format!("SELECT COUNT(*) AS total FROM entities e {where_clause}");
            // ── Total count ─────────────────────────────────────────
            #[derive(diesel::QueryableByName)]
            struct TotalRow {
                #[diesel(sql_type = BigInt)]
                total: i64,
            }
            let mut count_q = sql_query(count_sql).into_boxed();
            for s in &bind_strs {
                count_q = count_q.bind::<Text, _>(s.clone());
            }
            // Errors degrade to 0 — best-effort total, page still returns.
            let total: i64 = count_q
                .get_result::<TotalRow>(conn.deref_mut())
                .map(|r| r.total)
                .unwrap_or(0);
            // ── Page query ──────────────────────────────────────────
            // Row shape mirrors the SELECT column list above, one-to-one.
            #[derive(diesel::QueryableByName)]
            struct EntityWithCountRow {
                #[diesel(sql_type = Integer)]
                id: i32,
                #[diesel(sql_type = Text)]
                name: String,
                #[diesel(sql_type = Text)]
                entity_type: String,
                #[diesel(sql_type = Text)]
                description: String,
                #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Binary>)]
                embedding: Option<Vec<u8>>,
                #[diesel(sql_type = diesel::sql_types::Float)]
                confidence: f32,
                #[diesel(sql_type = Text)]
                status: String,
                #[diesel(sql_type = BigInt)]
                created_at: i64,
                #[diesel(sql_type = BigInt)]
                updated_at: i64,
                #[diesel(sql_type = BigInt)]
                fact_count: i64,
            }
            let mut q = sql_query(select_sql).into_boxed();
            // Persona binds first (they're earlier in the SQL — inside
            // the subquery LEFT JOIN).
            match persona {
                PersonaFilter::Single {
                    user_id,
                    persona_id,
                } => {
                    q = q
                        .bind::<Integer, _>(*user_id)
                        .bind::<Text, _>(persona_id.clone());
                }
                PersonaFilter::All { user_id } => {
                    q = q.bind::<Integer, _>(*user_id);
                }
            }
            // Then WHERE binds in order.
            for s in &bind_strs {
                q = q.bind::<Text, _>(s.clone());
            }
            // Then LIMIT / OFFSET.
            q = q
                .bind::<BigInt, _>(filter.limit)
                .bind::<BigInt, _>(filter.offset);
            let rows: Vec<EntityWithCountRow> = q
                .load(conn.deref_mut())
                .map_err(|e| anyhow::anyhow!("Query error: {}", e))?;
            let pairs: Vec<(Entity, i64)> = rows
                .into_iter()
                .map(|r| {
                    (
                        Entity {
                            id: r.id,
                            name: r.name,
                            entity_type: r.entity_type,
                            description: r.description,
                            embedding: r.embedding,
                            confidence: r.confidence,
                            status: r.status,
                            created_at: r.created_at,
                            updated_at: r.updated_at,
                        },
                        r.fact_count,
                    )
                })
                .collect();
            // Sink unused `_bind_types`; keeping it as documentation.
            let _ = bind_types;
            Ok((pairs, total))
        })
        .map_err(|_| DbError::new(DbErrorKind::QueryError))
    }
    /// Count active+reviewed facts per predicate under the given
    /// persona scope, sorted by count desc, capped at `limit`. Feeds
    /// the curation UI's PREDICATES tab.
    fn get_predicate_stats(
        &mut self,
        cx: &opentelemetry::Context,
        persona: &PersonaFilter,
        limit: usize,
    ) -> Result<Vec<(String, i64)>, DbError> {
        trace_db_call(cx, "query", "get_predicate_stats", |_span| {
            use diesel::sql_query;
            use diesel::sql_types::{BigInt, Integer, Text};
            // Active + reviewed only — rejected / superseded are
            // already off the agent's read path so they shouldn't
            // count toward "what predicates are noisy in production".
            let where_sql = match persona {
                PersonaFilter::Single { .. } => {
                    "WHERE user_id = ? AND persona_id = ? \
                     AND status IN ('active','reviewed')"
                }
                PersonaFilter::All { .. } => {
                    "WHERE user_id = ? AND status IN ('active','reviewed')"
                }
            };
            let sql = format!(
                "SELECT predicate, COUNT(*) AS cnt FROM entity_facts \
                 {where_sql} \
                 GROUP BY predicate \
                 ORDER BY cnt DESC \
                 LIMIT ?",
            );
            #[derive(diesel::QueryableByName)]
            struct Row {
                #[diesel(sql_type = Text)]
                predicate: String,
                #[diesel(sql_type = BigInt)]
                cnt: i64,
            }
            let mut conn = self.connection.lock().expect("KnowledgeDao lock");
            // Bind order mirrors the SQL text: persona binds, then LIMIT.
            let mut q = sql_query(sql).into_boxed();
            match persona {
                PersonaFilter::Single { user_id, persona_id } => {
                    q = q
                        .bind::<Integer, _>(*user_id)
                        .bind::<Text, _>(persona_id.clone());
                }
                PersonaFilter::All { user_id } => {
                    q = q.bind::<Integer, _>(*user_id);
                }
            }
            q = q.bind::<BigInt, _>(limit as i64);
            let rows: Vec<Row> = q
                .load(conn.deref_mut())
                .map_err(|e| anyhow::anyhow!("Query error: {}", e))?;
            Ok(rows.into_iter().map(|r| (r.predicate, r.cnt)).collect())
        })
        .map_err(|_| DbError::new(DbErrorKind::QueryError))
    }
    /// Flip every ACTIVE fact under `target_predicate` to 'rejected'
    /// (persona-scoped) and stamp the audit columns. Returns rows
    /// touched. A single UPDATE statement, so it is atomic without an
    /// explicit transaction. The match arms duplicate the `.set(...)`
    /// because diesel's typed update statements differ per filter
    /// chain and can't be unified without boxing.
    fn bulk_reject_facts_by_predicate(
        &mut self,
        cx: &opentelemetry::Context,
        persona: &PersonaFilter,
        target_predicate: &str,
        audit: Option<(&str, &str)>,
    ) -> Result<usize, DbError> {
        trace_db_call(cx, "update", "bulk_reject_facts_by_predicate", |_span| {
            use schema::entity_facts::dsl::*;
            let mut conn = self.connection.lock().expect("KnowledgeDao lock");
            let now = chrono::Utc::now().timestamp();
            // None audit clears the last_modified_* model/backend columns.
            let (audit_model, audit_backend) = match audit {
                Some((m, b)) => (Some(m.to_string()), Some(b.to_string())),
                None => (None, None),
            };
            // Persona scoping mirrors get_predicate_stats. Only ACTIVE
            // rows flip — REVIEWED survives so the curator can preserve
            // a hand-approved exception under the same predicate.
            let touched = match persona {
                PersonaFilter::Single { user_id: uid, persona_id: pid } => diesel::update(
                    entity_facts
                        .filter(predicate.eq(target_predicate))
                        .filter(user_id.eq(*uid))
                        .filter(persona_id.eq(pid))
                        .filter(status.eq("active")),
                )
                .set((
                    status.eq("rejected"),
                    last_modified_by_model.eq(audit_model.clone()),
                    last_modified_by_backend.eq(audit_backend.clone()),
                    last_modified_at.eq(Some(now)),
                ))
                .execute(conn.deref_mut())
                .map_err(|e| anyhow::anyhow!("Bulk reject error: {}", e))?,
                PersonaFilter::All { user_id: uid } => diesel::update(
                    entity_facts
                        .filter(predicate.eq(target_predicate))
                        .filter(user_id.eq(*uid))
                        .filter(status.eq("active")),
                )
                .set((
                    status.eq("rejected"),
                    last_modified_by_model.eq(audit_model.clone()),
                    last_modified_by_backend.eq(audit_backend.clone()),
                    last_modified_at.eq(Some(now)),
                ))
                .execute(conn.deref_mut())
                .map_err(|e| anyhow::anyhow!("Bulk reject error: {}", e))?,
            };
            Ok(touched)
        })
        .map_err(|_| DbError::new(DbErrorKind::UpdateError))
    }
/// Build a bounded entity graph for the curation UI.
///
/// Nodes: the top-`node_limit` non-rejected entities ranked by their
/// in-scope fact count (entities with zero in-scope facts are omitted
/// via the INNER JOIN). Edges: relational facts whose subject AND
/// object both survived the node cap, grouped per (subject, object,
/// predicate) with a multiplicity count.
///
/// `persona` scopes fact visibility (Single = one (user, persona);
/// All = the whole user); `entity_type_filter` optionally restricts
/// node entity_type. Returns `DbErrorKind::QueryError` on any failure.
fn build_entity_graph(
    &mut self,
    cx: &opentelemetry::Context,
    entity_type_filter: Option<&str>,
    node_limit: usize,
    persona: &PersonaFilter,
) -> Result<EntityGraph, DbError> {
    trace_db_call(cx, "query", "build_entity_graph", |_span| {
        use diesel::sql_query;
        use diesel::sql_types::{BigInt, Integer, Text};
        let mut conn = self.connection.lock().expect("KnowledgeDao lock");
        // ── Nodes: entities with non-rejected facts under the
        // active scope, plus their fact count. Cap to node_limit
        // by count desc so the graph stays drawable; lower-count
        // entities drop. Excludes 'rejected' entity rows too.
        let (persona_filter_sql, persona_binds_count) = match persona {
            PersonaFilter::Single { .. } => (
                "AND ef.user_id = ? AND ef.persona_id = ? AND ef.status NOT IN ('rejected','superseded')",
                2,
            ),
            PersonaFilter::All { .. } => (
                "AND ef.user_id = ? AND ef.status NOT IN ('rejected','superseded')",
                1,
            ),
        };
        let mut where_parts: Vec<&str> = vec!["e.status != 'rejected'"];
        if entity_type_filter.is_some() {
            where_parts.push("e.entity_type = ?");
        }
        let where_clause = format!("WHERE {}", where_parts.join(" AND "));
        // SQL: join entities to their (persona-scoped) fact count,
        // sort by count desc, limit. Including entities with 0
        // facts would clutter the view — skip them via INNER JOIN
        // (subquery on entity_facts) so only entities with at
        // least one in-scope fact show up.
        //
        // NOTE: bind order must follow textual '?' order — persona
        // binds (subquery) first, then the optional entity-type bind,
        // then LIMIT. The .bind() calls below mirror this exactly.
        let node_sql = format!(
            "SELECT e.id, e.name, e.entity_type, fc.fact_count \
             FROM entities e \
             INNER JOIN ( \
             SELECT subject_entity_id AS sid, COUNT(*) AS fact_count \
             FROM entity_facts ef \
             WHERE 1=1 {persona_filter_sql} \
             GROUP BY subject_entity_id \
             ) fc ON fc.sid = e.id \
             {where_clause} \
             ORDER BY fc.fact_count DESC, e.id ASC \
             LIMIT ?",
        );
        // Row shape for the node query (diesel raw-SQL mapping).
        #[derive(diesel::QueryableByName)]
        struct NodeRow {
            #[diesel(sql_type = Integer)]
            id: i32,
            #[diesel(sql_type = Text)]
            name: String,
            #[diesel(sql_type = Text)]
            entity_type: String,
            #[diesel(sql_type = BigInt)]
            fact_count: i64,
        }
        let mut nq = sql_query(node_sql).into_boxed();
        // Persona binds (inside the subquery — earlier in the SQL).
        match persona {
            PersonaFilter::Single { user_id, persona_id } => {
                nq = nq
                    .bind::<Integer, _>(*user_id)
                    .bind::<Text, _>(persona_id.clone());
            }
            PersonaFilter::All { user_id } => {
                nq = nq.bind::<Integer, _>(*user_id);
            }
        }
        // Entity-type filter bind, if any.
        if let Some(t) = entity_type_filter {
            nq = nq.bind::<Text, _>(t.to_string());
        }
        // LIMIT.
        nq = nq.bind::<BigInt, _>(node_limit as i64);
        let node_rows: Vec<NodeRow> = nq
            .load(conn.deref_mut())
            .map_err(|e| anyhow::anyhow!("Node query error: {}", e))?;
        let _ = persona_binds_count; // documentary
        // Keep the surviving node ids for the edge query's IN-lists.
        let node_ids: std::collections::HashSet<i32> =
            node_rows.iter().map(|r| r.id).collect();
        let nodes: Vec<GraphNode> = node_rows
            .into_iter()
            .map(|r| GraphNode {
                id: r.id,
                name: r.name,
                entity_type: r.entity_type,
                fact_count: r.fact_count,
            })
            .collect();
        // No nodes ⇒ no possible edges; skip the second query.
        if nodes.is_empty() {
            return Ok(EntityGraph {
                nodes,
                edges: Vec::new(),
            });
        }
        // ── Edges: relational facts where BOTH subject and
        // object survived the node cap. Grouped by (subject,
        // object, predicate) so 3 "is_friend_of Bob" facts
        // become one edge with count=3.
        let id_list: Vec<String> = node_ids.iter().map(|i| i.to_string()).collect();
        let in_clause = id_list.join(", ");
        // Note: ids are i32, inlined safely; predicates use binds.
        let (edge_persona_sql, _) = match persona {
            PersonaFilter::Single { .. } => (
                "user_id = ? AND persona_id = ? AND status NOT IN ('rejected','superseded')",
                2,
            ),
            PersonaFilter::All { .. } => (
                "user_id = ? AND status NOT IN ('rejected','superseded')",
                1,
            ),
        };
        let edge_sql = format!(
            "SELECT subject_entity_id, object_entity_id, predicate, COUNT(*) AS cnt \
             FROM entity_facts \
             WHERE {edge_persona_sql} \
             AND object_entity_id IS NOT NULL \
             AND subject_entity_id IN ({in_clause}) \
             AND object_entity_id IN ({in_clause}) \
             GROUP BY subject_entity_id, object_entity_id, predicate",
        );
        // Row shape for the edge query.
        #[derive(diesel::QueryableByName)]
        struct EdgeRow {
            #[diesel(sql_type = Integer)]
            subject_entity_id: i32,
            #[diesel(sql_type = Integer)]
            object_entity_id: i32,
            #[diesel(sql_type = Text)]
            predicate: String,
            #[diesel(sql_type = BigInt)]
            cnt: i64,
        }
        let mut eq = sql_query(edge_sql).into_boxed();
        // Same bind-order discipline as the node query.
        match persona {
            PersonaFilter::Single { user_id, persona_id } => {
                eq = eq
                    .bind::<Integer, _>(*user_id)
                    .bind::<Text, _>(persona_id.clone());
            }
            PersonaFilter::All { user_id } => {
                eq = eq.bind::<Integer, _>(*user_id);
            }
        }
        let edge_rows: Vec<EdgeRow> = eq
            .load(conn.deref_mut())
            .map_err(|e| anyhow::anyhow!("Edge query error: {}", e))?;
        let edges: Vec<GraphEdge> = edge_rows
            .into_iter()
            .map(|r| GraphEdge {
                source: r.subject_entity_id,
                target: r.object_entity_id,
                predicate: r.predicate,
                count: r.cnt,
            })
            .collect();
        Ok(EntityGraph { nodes, edges })
    })
    .map_err(|_| DbError::new(DbErrorKind::QueryError))
}
/// Propose duplicate-entity clusters for the curation UI.
///
/// Scans every non-rejected entity that has an embedding, and — within
/// each entity_type partition — links pairs whose cosine similarity is
/// ≥ `threshold`. Clusters are the transitive closure of those links
/// (union-find), so A~B and B~C group A, B, C even if A~C is below
/// threshold. Returns at most `max_groups` clusters, biggest first,
/// each carrying the min/max similarity among its threshold-passing
/// pairs.
///
/// NOTE(review): the pairwise scan is O(n²) per type partition —
/// acceptable while entity counts stay modest; revisit if they grow.
fn find_consolidation_proposals(
    &mut self,
    cx: &opentelemetry::Context,
    threshold: f32,
    max_groups: usize,
) -> Result<Vec<ConsolidationGroup>, DbError> {
    trace_db_call(cx, "query", "find_consolidation_proposals", |_span| {
        use schema::entities::dsl::*;
        let mut conn = self.connection.lock().expect("KnowledgeDao lock");
        // Pull every non-rejected entity with an embedding. We
        // keep 'reviewed' rows in the scan because pre-guard
        // legacy data still needs cleanup even if the curator
        // marked individual entities reviewed.
        let rows: Vec<Entity> = entities
            .filter(embedding.is_not_null())
            .filter(status.ne("rejected"))
            .load::<Entity>(conn.deref_mut())
            .map_err(|e| anyhow::anyhow!("Query error: {}", e))?;
        // Partition by entity_type so a person can't cluster
        // with a place via coincidental embedding closeness.
        let mut by_type: std::collections::HashMap<String, Vec<usize>> =
            std::collections::HashMap::new();
        for (idx, e) in rows.iter().enumerate() {
            by_type.entry(e.entity_type.clone()).or_default().push(idx);
        }
        // Decode embeddings once. Skip rows that don't deserialize
        // cleanly (corrupted or wrong-dim) rather than failing
        // the whole scan.
        let mut decoded: Vec<Option<Vec<f32>>> = Vec::with_capacity(rows.len());
        for e in &rows {
            let v = e
                .embedding
                .as_ref()
                .and_then(|b| Self::deserialize_embedding(b).ok())
                .filter(|v| !v.is_empty());
            decoded.push(v);
        }
        // Union-find for transitive clustering.
        struct UF {
            parent: Vec<usize>,
        }
        impl UF {
            fn new(n: usize) -> Self {
                UF {
                    parent: (0..n).collect(),
                }
            }
            // find(): locate the root, then path-compress the walked
            // chain so later finds are near-O(1).
            fn find(&mut self, x: usize) -> usize {
                let mut r = x;
                while self.parent[r] != r {
                    r = self.parent[r];
                }
                let mut cur = x;
                while self.parent[cur] != r {
                    let nxt = self.parent[cur];
                    self.parent[cur] = r;
                    cur = nxt;
                }
                r
            }
            fn union(&mut self, a: usize, b: usize) {
                let ra = self.find(a);
                let rb = self.find(b);
                if ra != rb {
                    self.parent[ra] = rb;
                }
            }
        }
        let mut uf = UF::new(rows.len());
        // Per-cluster similarity stats, keyed by the FINAL root
        // (populated only after all unions settle; see below).
        let mut group_min: std::collections::HashMap<usize, f32> =
            std::collections::HashMap::new();
        let mut group_max: std::collections::HashMap<usize, f32> =
            std::collections::HashMap::new();
        // Single pass: union and update per-component stats in
        // one go. Stats are tracked per root; final pass after
        // all unions corrects roots that moved.
        type Edge = (usize, usize, f32);
        let mut edges: Vec<Edge> = Vec::new();
        for indices in by_type.values() {
            for a in 0..indices.len() {
                let ia = indices[a];
                let va = match &decoded[ia] {
                    Some(v) => v,
                    None => continue, // undecodable embedding — skip
                };
                for b in (a + 1)..indices.len() {
                    let ib = indices[b];
                    let vb = match &decoded[ib] {
                        Some(v) => v,
                        None => continue,
                    };
                    let sim = Self::cosine_similarity(va, vb);
                    if sim >= threshold {
                        uf.union(ia, ib);
                        edges.push((ia, ib, sim));
                    }
                }
            }
        }
        // Second pass over the kept edges to populate stats by
        // final root (post-union path compression).
        for (a, _b, sim) in &edges {
            let root = uf.find(*a);
            let mn = group_min.entry(root).or_insert(*sim);
            if *sim < *mn {
                *mn = *sim;
            }
            let mx = group_max.entry(root).or_insert(*sim);
            if *sim > *mx {
                *mx = *sim;
            }
        }
        // Bucket entities by root component, skipping singletons.
        // (A root absent from group_min never appeared in a kept
        // edge, i.e. the entity matched nothing.)
        let mut groups: std::collections::HashMap<usize, Vec<usize>> =
            std::collections::HashMap::new();
        for i in 0..rows.len() {
            let root = uf.find(i);
            if !group_min.contains_key(&root) {
                continue;
            }
            groups.entry(root).or_default().push(i);
        }
        let mut result: Vec<ConsolidationGroup> = groups
            .into_iter()
            .filter(|(_, members)| members.len() >= 2)
            .map(|(root, members)| ConsolidationGroup {
                entities: members.into_iter().map(|i| rows[i].clone()).collect(),
                min_cosine: *group_min.get(&root).unwrap_or(&0.0),
                max_cosine: *group_max.get(&root).unwrap_or(&0.0),
            })
            .collect();
        // Biggest clusters first; tie-break on the strongest
        // pair so the most-obvious dupes surface at the top.
        result.sort_by(|a, b| {
            b.entities.len().cmp(&a.entities.len()).then_with(|| {
                b.max_cosine
                    .partial_cmp(&a.max_cosine)
                    .unwrap_or(std::cmp::Ordering::Equal)
            })
        });
        result.truncate(max_groups);
        Ok(result)
    })
    .map_err(|_| DbError::new(DbErrorKind::QueryError))
}
/// Per-entity fact counts grouped by persona, for a user's entities.
/// Returns entity_id → [(persona_id, count)], counting only
/// non-rejected facts. Entities with zero in-scope facts are simply
/// absent from the map.
fn get_persona_breakdowns_for_entities(
    &mut self,
    cx: &opentelemetry::Context,
    entity_ids: &[i32],
    user_id: i32,
) -> Result<std::collections::HashMap<i32, Vec<(String, i64)>>, DbError> {
    trace_db_call(cx, "query", "get_persona_breakdowns", |_span| {
        use diesel::sql_query;
        use diesel::sql_types::{BigInt, Integer, Text};
        // Nothing to group over — short-circuit before building SQL.
        if entity_ids.is_empty() {
            return Ok(std::collections::HashMap::new());
        }
        // One "?" per entity id. user_id binds first, then the ids,
        // matching their textual order in the statement. Everything
        // is a typed bind — no string interpolation of values.
        let id_placeholders: Vec<&str> = entity_ids.iter().map(|_| "?").collect();
        let sql = format!(
            "SELECT subject_entity_id, persona_id, COUNT(*) AS cnt \
             FROM entity_facts \
             WHERE user_id = ? \
             AND status != 'rejected' \
             AND subject_entity_id IN ({}) \
             GROUP BY subject_entity_id, persona_id \
             ORDER BY subject_entity_id, persona_id",
            id_placeholders.join(", ")
        );
        // Row shape for the raw-SQL mapping.
        #[derive(diesel::QueryableByName)]
        struct BreakdownRow {
            #[diesel(sql_type = Integer)]
            subject_entity_id: i32,
            #[diesel(sql_type = Text)]
            persona_id: String,
            #[diesel(sql_type = BigInt)]
            cnt: i64,
        }
        let mut conn = self.connection.lock().expect("KnowledgeDao lock");
        let mut stmt = sql_query(sql).into_boxed();
        stmt = stmt.bind::<Integer, _>(user_id);
        for eid in entity_ids {
            stmt = stmt.bind::<Integer, _>(*eid);
        }
        let found: Vec<BreakdownRow> = stmt
            .load(conn.deref_mut())
            .map_err(|e| anyhow::anyhow!("Query error: {}", e))?;
        // Fold rows into the per-entity map; SQL ordering keeps each
        // entity's persona list sorted.
        let mut breakdowns: std::collections::HashMap<i32, Vec<(String, i64)>> =
            std::collections::HashMap::with_capacity(entity_ids.len());
        for row in found {
            breakdowns
                .entry(row.subject_entity_id)
                .or_insert_with(Vec::new)
                .push((row.persona_id, row.cnt));
        }
        Ok(breakdowns)
    })
    .map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn update_entity_status(
&mut self,
cx: &opentelemetry::Context,
entity_id: i32,
new_status: &str,
) -> Result<(), DbError> {
trace_db_call(cx, "update", "update_entity_status", |_span| {
use schema::entities::dsl::*;
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
diesel::update(entities.filter(id.eq(entity_id)))
.set(status.eq(new_status))
.execute(conn.deref_mut())
.map(|_| ())
.map_err(|e| anyhow::anyhow!("Update error: {}", e))
})
.map_err(|_| DbError::new(DbErrorKind::UpdateError))
}
/// Apply a partial patch (name / description / status / confidence)
/// to an entity. Fields absent from the patch are untouched; every
/// applied field also bumps `updated_at`. Returns the post-update row,
/// or `None` if no entity has that id.
fn update_entity(
    &mut self,
    cx: &opentelemetry::Context,
    entity_id: i32,
    patch: EntityPatch,
) -> Result<Option<Entity>, DbError> {
    trace_db_call(cx, "update", "update_entity", |_span| {
        use schema::entities::dsl::*;
        let mut conn = self.connection.lock().expect("KnowledgeDao lock");
        let now = chrono::Utc::now().timestamp();
        // Run all per-field UPDATEs in one transaction so a failure
        // partway through cannot persist a half-applied patch (the
        // previous version issued four independent statements).
        conn.transaction::<(), anyhow::Error, _>(|conn| {
            if let Some(ref new_name) = patch.name {
                diesel::update(entities.filter(id.eq(entity_id)))
                    .set((name.eq(new_name), updated_at.eq(now)))
                    .execute(conn)
                    .map_err(|e| anyhow::anyhow!("Update name error: {}", e))?;
            }
            if let Some(ref new_desc) = patch.description {
                diesel::update(entities.filter(id.eq(entity_id)))
                    .set((description.eq(new_desc), updated_at.eq(now)))
                    .execute(conn)
                    .map_err(|e| anyhow::anyhow!("Update description error: {}", e))?;
            }
            if let Some(ref new_status) = patch.status {
                diesel::update(entities.filter(id.eq(entity_id)))
                    .set((status.eq(new_status), updated_at.eq(now)))
                    .execute(conn)
                    .map_err(|e| anyhow::anyhow!("Update status error: {}", e))?;
            }
            if let Some(new_confidence) = patch.confidence {
                diesel::update(entities.filter(id.eq(entity_id)))
                    .set((confidence.eq(new_confidence), updated_at.eq(now)))
                    .execute(conn)
                    .map_err(|e| anyhow::anyhow!("Update confidence error: {}", e))?;
            }
            Ok(())
        })?;
        // Reload the (possibly unchanged) row for the caller.
        entities
            .filter(id.eq(entity_id))
            .first::<Entity>(conn.deref_mut())
            .optional()
            .map_err(|e| anyhow::anyhow!("Query error: {}", e))
    })
    .map_err(|_| DbError::new(DbErrorKind::UpdateError))
}
fn delete_entity(
&mut self,
cx: &opentelemetry::Context,
entity_id: i32,
) -> Result<(), DbError> {
trace_db_call(cx, "delete", "delete_entity", |_span| {
use schema::entities::dsl::*;
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
// entity_facts has a CHECK constraint requiring
// `object_entity_id IS NOT NULL OR object_value IS NOT NULL`.
// The FK on object_entity_id is ON DELETE SET NULL — but
// facts that pointed at the deleted entity *only* via the
// entity reference (the common case for relational facts
// like "Alice is_friend_of Bob") have no object_value, so
// SET NULL would leave them with both NULLs and the CHECK
// aborts the whole DELETE. Pre-delete those facts in a
// transaction so the CASCADE / SET NULL chain on what
// remains can fire cleanly.
//
// Long-term fix is to change the FK to ON DELETE CASCADE
// via a table-rebuild migration, but the DAO-side workaround
// is sufficient and less invasive.
conn.transaction::<(), diesel::result::Error, _>(|conn| {
use schema::entity_facts::dsl as ef;
diesel::delete(
ef::entity_facts
.filter(ef::object_entity_id.eq(entity_id))
.filter(ef::object_value.is_null()),
)
.execute(conn)?;
diesel::delete(entities.filter(id.eq(entity_id))).execute(conn)?;
Ok(())
})
.map_err(|e| anyhow::anyhow!("Delete error: {}", e))
})
.map_err(|e| {
// Surface the actual diesel error string before collapsing
// to the opaque DbErrorKind::QueryError.
log::warn!("delete_entity({}) failed: {}", entity_id, e);
DbError::new(DbErrorKind::QueryError)
})
}
/// Merge `source_id` into `target_id`: re-point facts, copy photo
/// links, then delete the source entity. Returns (facts re-pointed as
/// subject, photo links copied).
fn merge_entities(
    &mut self,
    cx: &opentelemetry::Context,
    source_id: i32,
    target_id: i32,
) -> Result<(i64, i64), DbError> {
    trace_db_call(cx, "update", "merge_entities", |_span| {
        // Guard: a self-merge would re-point facts as a no-op and then
        // step 4 would DELETE the entity — destroying the "target" and
        // cascading away its facts/links. Refuse up front, mirroring
        // supersede_fact's same-id check.
        if source_id == target_id {
            return Err(anyhow::anyhow!(
                "merge_entities: source_id and target_id must differ"
            ));
        }
        let mut conn = self.connection.lock().expect("KnowledgeDao lock");
        conn.transaction::<(i64, i64), diesel::result::Error, _>(|conn| {
            use schema::entity_facts::dsl as ef;
            // 1. Re-point facts where source is subject
            let facts_updated =
                diesel::update(ef::entity_facts.filter(ef::subject_entity_id.eq(source_id)))
                    .set(ef::subject_entity_id.eq(target_id))
                    .execute(conn)? as i64;
            // 2. Re-point facts where source is object
            diesel::update(ef::entity_facts.filter(ef::object_entity_id.eq(source_id)))
                .set(ef::object_entity_id.eq(Some(target_id)))
                .execute(conn)?;
            // 3. Copy photo links to target (INSERT OR IGNORE to skip duplicates)
            let links_updated = diesel::sql_query(
                "INSERT OR IGNORE INTO entity_photo_links (entity_id, library_id, rel_path, role) \
                 SELECT ?, library_id, rel_path, role FROM entity_photo_links WHERE entity_id = ?",
            )
            .bind::<diesel::sql_types::Integer, _>(target_id)
            .bind::<diesel::sql_types::Integer, _>(source_id)
            .execute(conn)? as i64;
            // 4. Delete source entity (FK CASCADE removes remaining facts/links)
            diesel::delete(
                schema::entities::dsl::entities.filter(schema::entities::dsl::id.eq(source_id)),
            )
            .execute(conn)?;
            Ok((facts_updated, links_updated))
        })
        .map_err(|e| anyhow::anyhow!("Merge transaction error: {}", e))
    })
    .map_err(|_| DbError::new(DbErrorKind::UpdateError))
}
// -----------------------------------------------------------------------
// Fact operations
// -----------------------------------------------------------------------
/// Insert a fact, or corroborate an identical non-rejected one.
///
/// Duplicate detection is scoped to the SAME (user, persona): the same
/// claim from a different persona — or a different user sharing the
/// persona name — is a separate row, not a confidence bump on someone
/// else's. Returns (fact, created) where created=false means an
/// existing row was corroborated instead.
fn upsert_fact(
    &mut self,
    cx: &opentelemetry::Context,
    fact: InsertEntityFact,
) -> Result<(EntityFact, bool), DbError> {
    trace_db_call(cx, "insert", "upsert_fact", |_span| {
        use schema::entity_facts::dsl::*;
        let mut conn = self.connection.lock().expect("KnowledgeDao lock");
        // Look for an identical active fact authored by the same
        // (user, persona). NULL object columns must match NULLs, so
        // each side of the object gets an explicit is_null branch.
        let mut dup_query = entity_facts
            .filter(subject_entity_id.eq(fact.subject_entity_id))
            .filter(predicate.eq(&fact.predicate))
            .filter(user_id.eq(fact.user_id))
            .filter(persona_id.eq(&fact.persona_id))
            .filter(status.ne("rejected"))
            .into_boxed();
        match &fact.object_entity_id {
            Some(oid) => dup_query = dup_query.filter(object_entity_id.eq(oid)),
            None => dup_query = dup_query.filter(object_entity_id.is_null()),
        }
        match &fact.object_value {
            Some(ov) => dup_query = dup_query.filter(object_value.eq(ov)),
            None => dup_query = dup_query.filter(object_value.is_null()),
        }
        let existing: Option<EntityFact> = dup_query
            .first::<EntityFact>(conn.deref_mut())
            .optional()
            .map_err(|e| anyhow::anyhow!("Query error: {}", e))?;
        if let Some(existing_fact) = existing {
            // Corroborate: bump confidence by 0.1, capped at 0.95 —
            // but never BELOW the current value. The previous
            // `(c + 0.1).min(0.95)` would drag a fact already above
            // 0.95 (e.g. a hand-curated 0.97) DOWN to 0.95 on
            // re-observation; the extra .max() makes the bump
            // monotone non-decreasing.
            let new_confidence = (existing_fact.confidence + 0.1)
                .min(0.95)
                .max(existing_fact.confidence);
            diesel::update(entity_facts.filter(id.eq(existing_fact.id)))
                .set(confidence.eq(new_confidence))
                .execute(conn.deref_mut())
                .map_err(|e| anyhow::anyhow!("Update confidence error: {}", e))?;
            let updated = entity_facts
                .filter(id.eq(existing_fact.id))
                .first::<EntityFact>(conn.deref_mut())
                .map_err(|e| anyhow::anyhow!("Query error: {}", e))?;
            Ok((updated, false)) // false = corroborated, not newly created
        } else {
            diesel::insert_into(entity_facts)
                .values(&fact)
                .execute(conn.deref_mut())
                .map_err(|e| anyhow::anyhow!("Insert error: {}", e))?;
            // Safe to read back by max id: the connection mutex is
            // held across insert + fetch.
            let inserted = entity_facts
                .order(id.desc())
                .first::<EntityFact>(conn.deref_mut())
                .map_err(|e| anyhow::anyhow!("Query error: {}", e))?;
            Ok((inserted, true)) // true = newly created
        }
    })
    .map_err(|_| DbError::new(DbErrorKind::InsertError))
}
/// All non-rejected facts on one subject entity, scoped to the
/// caller's persona view (Single = one persona, All = the whole user).
fn get_facts_for_entity(
    &mut self,
    cx: &opentelemetry::Context,
    entity_id: i32,
    persona: &PersonaFilter,
) -> Result<Vec<EntityFact>, DbError> {
    trace_db_call(cx, "query", "get_facts_for_entity", |_span| {
        use schema::entity_facts::dsl::*;
        let mut conn = self.connection.lock().expect("KnowledgeDao lock");
        // Base scope: this subject, non-rejected, owned by the user.
        let base = entity_facts
            .filter(subject_entity_id.eq(entity_id))
            .filter(status.ne("rejected"))
            .filter(user_id.eq(persona.user_id()));
        // Single-persona view narrows further; All stays user-wide.
        let scoped = match persona {
            PersonaFilter::Single {
                persona_id: pid, ..
            } => base.filter(persona_id.eq(pid.clone())).into_boxed(),
            PersonaFilter::All { .. } => base.into_boxed(),
        };
        scoped
            .load::<EntityFact>(conn.deref_mut())
            .map_err(|e| anyhow::anyhow!("Query error: {}", e))
    })
    .map_err(|_| DbError::new(DbErrorKind::QueryError))
}
/// Paged fact listing with an overall total. Filters: owning user
/// (always), optional subject entity, status (default 'active',
/// sentinel "all" disables), optional predicate, optional persona.
fn list_facts(
    &mut self,
    cx: &opentelemetry::Context,
    filter: FactFilter,
) -> Result<(Vec<EntityFact>, i64), DbError> {
    trace_db_call(cx, "query", "list_facts", |_span| {
        use diesel::dsl::count_star;
        use schema::entity_facts::dsl::*;
        let mut conn = self.connection.lock().expect("KnowledgeDao lock");
        // Two parallel boxed queries — one for the page, one for the
        // total — so each filter below is applied to both.
        let uid = filter.persona.user_id();
        let mut page_q = entity_facts.filter(user_id.eq(uid)).into_boxed();
        let mut total_q = entity_facts.filter(user_id.eq(uid)).into_boxed();
        if let Some(eid) = filter.entity_id {
            page_q = page_q.filter(subject_entity_id.eq(eid));
            total_q = total_q.filter(subject_entity_id.eq(eid));
        }
        // "all" is the explicit opt-out of status filtering.
        match filter.status.as_deref().unwrap_or("active") {
            "all" => {}
            s => {
                page_q = page_q.filter(status.eq(s));
                total_q = total_q.filter(status.eq(s));
            }
        }
        if let Some(ref pred) = filter.predicate {
            page_q = page_q.filter(predicate.eq(pred));
            total_q = total_q.filter(predicate.eq(pred));
        }
        if let PersonaFilter::Single {
            persona_id: ref pid,
            ..
        } = filter.persona
        {
            page_q = page_q.filter(persona_id.eq(pid.clone()));
            total_q = total_q.filter(persona_id.eq(pid.clone()));
        }
        // Count failure degrades to 0 rather than failing the page.
        let total: i64 = total_q
            .select(count_star())
            .first(conn.deref_mut())
            .unwrap_or(0);
        let page = page_q
            .order(created_at.desc())
            .limit(filter.limit)
            .offset(filter.offset)
            .load::<EntityFact>(conn.deref_mut())
            .map_err(|e| anyhow::anyhow!("Query error: {}", e))?;
        Ok((page, total))
    })
    .map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn update_fact(
&mut self,
cx: &opentelemetry::Context,
fact_id: i32,
patch: FactPatch,
audit: Option<(&str, &str)>,
) -> Result<Option<EntityFact>, DbError> {
trace_db_call(cx, "update", "update_fact", |_span| {
use schema::entity_facts::dsl::*;
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
let mut touched = false;
if let Some(ref new_predicate) = patch.predicate {
diesel::update(entity_facts.filter(id.eq(fact_id)))
.set(predicate.eq(new_predicate))
.execute(conn.deref_mut())
.map_err(|e| anyhow::anyhow!("Update error: {}", e))?;
touched = true;
}
if let Some(ref new_value) = patch.object_value {
diesel::update(entity_facts.filter(id.eq(fact_id)))
.set(object_value.eq(new_value))
.execute(conn.deref_mut())
.map_err(|e| anyhow::anyhow!("Update error: {}", e))?;
touched = true;
}
if let Some(ref new_status) = patch.status {
diesel::update(entity_facts.filter(id.eq(fact_id)))
.set(status.eq(new_status))
.execute(conn.deref_mut())
.map_err(|e| anyhow::anyhow!("Update error: {}", e))?;
touched = true;
}
if let Some(new_confidence) = patch.confidence {
diesel::update(entity_facts.filter(id.eq(fact_id)))
.set(confidence.eq(new_confidence))
.execute(conn.deref_mut())
.map_err(|e| anyhow::anyhow!("Update error: {}", e))?;
touched = true;
}
if let Some(new_from) = patch.valid_from {
diesel::update(entity_facts.filter(id.eq(fact_id)))
.set(valid_from.eq(new_from))
.execute(conn.deref_mut())
.map_err(|e| anyhow::anyhow!("Update error: {}", e))?;
touched = true;
}
if let Some(new_until) = patch.valid_until {
diesel::update(entity_facts.filter(id.eq(fact_id)))
.set(valid_until.eq(new_until))
.execute(conn.deref_mut())
.map_err(|e| anyhow::anyhow!("Update error: {}", e))?;
touched = true;
}
// Only stamp the audit columns if we actually changed
// something — empty patches stay quiet.
if touched {
let now = chrono::Utc::now().timestamp();
let (model_str, backend_str) = match audit {
Some((m, b)) => (Some(m.to_string()), Some(b.to_string())),
None => (None, None),
};
diesel::update(entity_facts.filter(id.eq(fact_id)))
.set((
last_modified_by_model.eq(model_str),
last_modified_by_backend.eq(backend_str),
last_modified_at.eq(Some(now)),
))
.execute(conn.deref_mut())
.map_err(|e| anyhow::anyhow!("Audit-stamp error: {}", e))?;
}
entity_facts
.filter(id.eq(fact_id))
.first::<EntityFact>(conn.deref_mut())
.optional()
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
})
.map_err(|_| DbError::new(DbErrorKind::UpdateError))
}
/// Back-fill the insight pointer on a photo's facts — but only where
/// it is still NULL, so already-linked facts keep their pointer.
fn update_facts_insight_id(
    &mut self,
    cx: &opentelemetry::Context,
    photo_path: &str,
    insight_id: i32,
) -> Result<(), DbError> {
    trace_db_call(cx, "update", "update_facts_insight_id", |_span| {
        use schema::entity_facts::dsl::*;
        let mut conn = self.connection.lock().expect("KnowledgeDao lock");
        let scope = entity_facts
            .filter(source_photo.eq(photo_path))
            .filter(source_insight_id.is_null());
        diesel::update(scope)
            .set(source_insight_id.eq(insight_id))
            .execute(conn.deref_mut())
            .map(|_| ())
            .map_err(|e| anyhow::anyhow!("Update error: {}", e))
    })
    .map_err(|_| DbError::new(DbErrorKind::UpdateError))
}
fn delete_fact(&mut self, cx: &opentelemetry::Context, fact_id: i32) -> Result<(), DbError> {
trace_db_call(cx, "delete", "delete_fact", |_span| {
use schema::entity_facts::dsl::*;
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
// Clear dangling supersession pointers from any fact this
// one had retired — there's no FK on superseded_by (SQLite
// can't ALTER ADD with REFERENCES) so we do it manually.
// Sibling rows lose the pointer but stay 'superseded' —
// the user's historical correction survives the cleanup.
conn.transaction::<(), diesel::result::Error, _>(|conn| {
diesel::update(entity_facts.filter(superseded_by.eq(fact_id)))
.set(superseded_by.eq::<Option<i32>>(None))
.execute(conn)?;
diesel::delete(entity_facts.filter(id.eq(fact_id))).execute(conn)?;
Ok(())
})
.map_err(|e| anyhow::anyhow!("Delete error: {}", e))
})
.map_err(|e| {
log::warn!("delete_fact({}) failed: {}", fact_id, e);
DbError::new(DbErrorKind::QueryError)
})
}
/// Mark `old_id` as superseded by `new_id`: flips status, records the
/// pointer, closes the old fact's validity interval at the new fact's
/// valid_from (unless the old fact already had a hand-set valid_until),
/// and stamps the audit columns. Returns the updated old fact, or
/// `None` when either fact id doesn't exist (handler 404s on that).
/// Errors if old_id == new_id.
fn supersede_fact(
    &mut self,
    cx: &opentelemetry::Context,
    old_id: i32,
    new_id: i32,
    audit: Option<(&str, &str)>,
) -> Result<Option<EntityFact>, DbError> {
    trace_db_call(cx, "update", "supersede_fact", |_span| {
        use schema::entity_facts::dsl::*;
        let mut conn = self.connection.lock().expect("KnowledgeDao lock");
        // A fact can't supersede itself — reject before touching rows.
        if old_id == new_id {
            return Err(anyhow::anyhow!(
                "supersede_fact: old_id and new_id must differ"
            ));
        }
        let now = chrono::Utc::now().timestamp();
        let (audit_model, audit_backend) = match audit {
            Some((m, b)) => (Some(m.to_string()), Some(b.to_string())),
            None => (None, None),
        };
        conn.transaction::<Option<EntityFact>, diesel::result::Error, _>(|conn| {
            // Pull the new fact's valid_from so we can close
            // the old fact's interval at the same point.
            let new_fact: Option<EntityFact> = entity_facts
                .filter(id.eq(new_id))
                .first::<EntityFact>(conn)
                .optional()?;
            let Some(new_fact) = new_fact else {
                return Ok(None);
            };
            // Verify the old fact exists before touching it —
            // returning None lets the handler 404 cleanly.
            let old_fact: Option<EntityFact> = entity_facts
                .filter(id.eq(old_id))
                .first::<EntityFact>(conn)
                .optional()?;
            if old_fact.is_none() {
                return Ok(None);
            }
            // Only stamp valid_until if the user hasn't
            // already set it — respecting hand-curated bounds.
            let target_valid_until = old_fact
                .as_ref()
                .and_then(|f| f.valid_until)
                .or(new_fact.valid_from);
            diesel::update(entity_facts.filter(id.eq(old_id)))
                .set((
                    status.eq("superseded"),
                    superseded_by.eq(Some(new_id)),
                    valid_until.eq(target_valid_until),
                    last_modified_by_model.eq(audit_model.clone()),
                    last_modified_by_backend.eq(audit_backend.clone()),
                    last_modified_at.eq(Some(now)),
                ))
                .execute(conn)?;
            // Reload so the caller sees the stamped row.
            entity_facts
                .filter(id.eq(old_id))
                .first::<EntityFact>(conn)
                .optional()
        })
        .map_err(|e| anyhow::anyhow!("Supersede error: {}", e))
    })
    .map_err(|e| {
        log::warn!(
            "supersede_fact(old={}, new={}) failed: {}",
            old_id,
            new_id,
            e
        );
        DbError::new(DbErrorKind::UpdateError)
    })
}
/// Undo a supersession: restore status='active', clear the pointer and
/// the auto-stamped valid_until, and stamp the audit columns. Returns
/// `None` for an unknown id; returns the row untouched when it was
/// never superseded (handler picks the status code).
fn revert_supersession(
    &mut self,
    cx: &opentelemetry::Context,
    fact_id: i32,
    audit: Option<(&str, &str)>,
) -> Result<Option<EntityFact>, DbError> {
    trace_db_call(cx, "update", "revert_supersession", |_span| {
        use schema::entity_facts::dsl::*;
        let mut conn = self.connection.lock().expect("KnowledgeDao lock");
        // Load first so missing ids and not-superseded rows can take
        // their early exits before any write happens.
        let current: Option<EntityFact> = entity_facts
            .filter(id.eq(fact_id))
            .first::<EntityFact>(conn.deref_mut())
            .optional()
            .map_err(|e| anyhow::anyhow!("Query error: {}", e))?;
        let Some(row) = current else {
            return Ok(None);
        };
        let was_superseded = row.status == "superseded" || row.superseded_by.is_some();
        if !was_superseded {
            // Nothing to revert — hand the current row back and let
            // the handler choose the response.
            return Ok(Some(row));
        }
        let stamp_time = chrono::Utc::now().timestamp();
        let (audit_model, audit_backend) = audit
            .map(|(m, b)| (Some(m.to_string()), Some(b.to_string())))
            .unwrap_or((None, None));
        diesel::update(entity_facts.filter(id.eq(fact_id)))
            .set((
                status.eq("active"),
                superseded_by.eq::<Option<i32>>(None),
                // Clear the auto-stamped valid_until. If the user had
                // hand-set it pre-supersede we can't tell — the loss
                // is the price of a clean revert; the curator can
                // re-bound afterwards.
                valid_until.eq::<Option<i64>>(None),
                last_modified_by_model.eq(audit_model),
                last_modified_by_backend.eq(audit_backend),
                last_modified_at.eq(Some(stamp_time)),
            ))
            .execute(conn.deref_mut())
            .map_err(|e| anyhow::anyhow!("Revert error: {}", e))?;
        entity_facts
            .filter(id.eq(fact_id))
            .first::<EntityFact>(conn.deref_mut())
            .optional()
            .map_err(|e| anyhow::anyhow!("Query error: {}", e))
    })
    .map_err(|e| {
        log::warn!("revert_supersession({}) failed: {}", fact_id, e);
        DbError::new(DbErrorKind::UpdateError)
    })
}
// -----------------------------------------------------------------------
// Photo link operations
// -----------------------------------------------------------------------
/// Link an entity to a photo; duplicate links are silent no-ops.
fn upsert_photo_link(
    &mut self,
    cx: &opentelemetry::Context,
    link: InsertEntityPhotoLink,
) -> Result<(), DbError> {
    trace_db_call(cx, "insert", "upsert_photo_link", |_span| {
        use diesel::sql_types::{Integer, Text};
        let mut conn = self.connection.lock().expect("KnowledgeDao lock");
        // INSERT OR IGNORE piggybacks on the
        // UNIQUE(entity_id, library_id, rel_path, role) constraint:
        // re-linking an existing row does nothing.
        let stmt = diesel::sql_query(
            "INSERT OR IGNORE INTO entity_photo_links (entity_id, library_id, rel_path, role) VALUES (?, ?, ?, ?)"
        )
        .bind::<Integer, _>(link.entity_id)
        .bind::<Integer, _>(link.library_id)
        .bind::<Text, _>(&link.file_path)
        .bind::<Text, _>(&link.role);
        stmt.execute(conn.deref_mut())
            .map(|_| ())
            .map_err(|e| anyhow::anyhow!("Insert error: {}", e))
    })
    .map_err(|_| DbError::new(DbErrorKind::InsertError))
}
fn delete_photo_links_for_file(
&mut self,
cx: &opentelemetry::Context,
file_path_val: &str,
) -> Result<(), DbError> {
trace_db_call(cx, "delete", "delete_photo_links_for_file", |_span| {
use schema::entity_photo_links::dsl::*;
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
diesel::delete(entity_photo_links.filter(rel_path.eq(file_path_val)))
.execute(conn.deref_mut())
.map(|_| ())
.map_err(|e| anyhow::anyhow!("Delete error: {}", e))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
/// All entity links attached to one photo path.
fn get_links_for_photo(
    &mut self,
    cx: &opentelemetry::Context,
    file_path_val: &str,
) -> Result<Vec<EntityPhotoLink>, DbError> {
    trace_db_call(cx, "query", "get_links_for_photo", |_span| {
        use schema::entity_photo_links::dsl::*;
        let mut conn = self.connection.lock().expect("KnowledgeDao lock");
        let matching = entity_photo_links.filter(rel_path.eq(file_path_val));
        matching
            .load::<EntityPhotoLink>(conn.deref_mut())
            .map_err(|e| anyhow::anyhow!("Query error: {}", e))
    })
    .map_err(|_| DbError::new(DbErrorKind::QueryError))
}
/// All photo links attached to one entity.
fn get_links_for_entity(
    &mut self,
    cx: &opentelemetry::Context,
    entity_id_val: i32,
) -> Result<Vec<EntityPhotoLink>, DbError> {
    trace_db_call(cx, "query", "get_links_for_entity", |_span| {
        use schema::entity_photo_links::dsl::*;
        let mut conn = self.connection.lock().expect("KnowledgeDao lock");
        let matching = entity_photo_links.filter(entity_id.eq(entity_id_val));
        matching
            .load::<EntityPhotoLink>(conn.deref_mut())
            .map_err(|e| anyhow::anyhow!("Query error: {}", e))
    })
    .map_err(|_| DbError::new(DbErrorKind::QueryError))
}
// -----------------------------------------------------------------------
// Audit
// -----------------------------------------------------------------------
/// Entities and facts created after `since`, newest first, capped at
/// `limit` each. Entities are shared, so their recency is global;
/// facts are user-isolated and optionally persona-narrowed.
fn get_recent_activity(
    &mut self,
    cx: &opentelemetry::Context,
    since: i64,
    limit: i64,
    persona: &PersonaFilter,
) -> Result<RecentActivity, DbError> {
    trace_db_call(cx, "query", "get_recent_activity", |_span| {
        use schema::entities::dsl as e;
        use schema::entity_facts::dsl as ef;
        let mut conn = self.connection.lock().expect("KnowledgeDao lock");
        // Entities: no persona scoping — they're shared rows.
        let entities_recent: Vec<Entity> = e::entities
            .filter(e::created_at.gt(since))
            .order(e::created_at.desc())
            .limit(limit)
            .load::<Entity>(conn.deref_mut())
            .map_err(|err| anyhow::anyhow!("Query error: {}", err))?;
        // Facts: always user-scoped; Single view adds the persona.
        let facts_scope = {
            let base = ef::entity_facts
                .filter(ef::created_at.gt(since))
                .filter(ef::user_id.eq(persona.user_id()));
            match persona {
                PersonaFilter::Single {
                    persona_id: pid, ..
                } => base.filter(ef::persona_id.eq(pid.clone())).into_boxed(),
                PersonaFilter::All { .. } => base.into_boxed(),
            }
        };
        let facts_recent = facts_scope
            .order(ef::created_at.desc())
            .limit(limit)
            .load::<EntityFact>(conn.deref_mut())
            .map_err(|err| anyhow::anyhow!("Query error: {}", err))?;
        Ok(RecentActivity {
            entities: entities_recent,
            facts: facts_recent,
        })
    })
    .map_err(|_| DbError::new(DbErrorKind::QueryError))
}
}
#[cfg(test)]
mod tests {
//! Persona scoping + composite-FK invariants for entity_facts.
//!
//! These tests pin three contracts that are silently regressable:
//!
//! 1. PersonaFilter::Single isolates per (user_id, persona_id). Two
//! users with the same 'default' persona must not see each
//! other's facts (multi-user leakage was a latent bug before
//! migration 2026-05-10 added user_id + composite FK).
//!
//! 2. PersonaFilter::All scopes to a single user but unions across
//! that user's personas. Hive-mind for human browsing of
//! /knowledge/*; never crosses users.
//!
//! 3. Deleting a persona CASCADEs to the user's facts under that
//! persona — and ONLY that user's, ONLY that persona's. Other
//! users sharing the persona_id name keep their facts.
//!
//! FKs aren't enabled by default on Diesel's SQLite connection;
//! `connection_with_fks_on()` flips the pragma so the cascade
//! actually fires in tests (mirroring runtime in production).
use super::*;
use crate::database::models::{InsertEntity, InsertEntityFact, InsertPersona};
use crate::database::test::in_memory_db_connection;
use diesel::connection::SimpleConnection;
/// In-memory test connection with SQLite foreign keys enabled —
/// Diesel leaves the pragma OFF by default, so cascades wouldn't fire
/// in tests without this.
fn connection_with_fks_on() -> Arc<Mutex<SqliteConnection>> {
    let mut conn = in_memory_db_connection();
    conn.batch_execute("PRAGMA foreign_keys = ON;")
        .expect("enable foreign_keys pragma");
    let shared = Arc::new(Mutex::new(conn));
    shared
}
/// Insert a users row (throwaway password) and return its generated id.
fn create_user(conn: &Arc<Mutex<SqliteConnection>>, username: &str) -> i32 {
    use crate::database::schema::users::dsl as u;
    let mut guard = conn.lock().unwrap();
    diesel::insert_into(u::users)
        .values((u::username.eq(username), u::password.eq("x")))
        .execute(guard.deref_mut())
        .unwrap();
    // Read the id back by username — SQLite assigns it on insert.
    u::users
        .filter(u::username.eq(username))
        .select(u::id)
        .first(guard.deref_mut())
        .unwrap()
}
/// Insert a personas row for (uid, pid) so the composite FK on
/// entity_facts has a parent to point at. The display name mirrors
/// the id and every flag is off — these tests exercise identity and
/// scoping, not persona configuration.
fn create_persona_row(conn: &Arc<Mutex<SqliteConnection>>, uid: i32, pid: &str) {
    use crate::database::schema::personas::dsl as p;
    let row = InsertPersona {
        user_id: uid,
        persona_id: pid,
        name: pid,
        system_prompt: "test prompt",
        is_built_in: false,
        include_all_memories: false,
        created_at: 0,
        updated_at: 0,
        reviewed_only_facts: false,
        allow_agent_corrections: false,
    };
    let mut guard = conn.lock().unwrap();
    diesel::insert_into(p::personas)
        .values(row)
        .execute(guard.deref_mut())
        .unwrap();
}
/// Upsert a minimal active 'person' entity; only the name varies
/// across tests.
fn make_entity(dao: &mut SqliteKnowledgeDao, name: &str) -> Entity {
    let ctx = opentelemetry::Context::new();
    let row = InsertEntity {
        name: name.to_string(),
        entity_type: "person".to_string(),
        description: String::new(),
        embedding: None,
        confidence: 0.6,
        status: "active".to_string(),
        created_at: 0,
        updated_at: 0,
    };
    dao.upsert_entity(&ctx, row).unwrap()
}
/// Upsert a typed (string-valued) fact under (user_id, persona_id)
/// and return the stored row, discarding the second element of the
/// tuple upsert_fact returns.
fn add_fact(
    dao: &mut SqliteKnowledgeDao,
    subject: i32,
    predicate: &str,
    value: &str,
    user_id: i32,
    persona_id: &str,
) -> EntityFact {
    let ctx = opentelemetry::Context::new();
    let row = InsertEntityFact {
        subject_entity_id: subject,
        predicate: predicate.to_string(),
        object_entity_id: None,
        object_value: Some(value.to_string()),
        source_photo: None,
        source_insight_id: None,
        confidence: 0.6,
        status: "active".to_string(),
        created_at: 0,
        persona_id: persona_id.to_string(),
        user_id,
        valid_from: None,
        valid_until: None,
        superseded_by: None,
        created_by_model: None,
        created_by_backend: None,
        last_modified_by_model: None,
        last_modified_by_backend: None,
        last_modified_at: None,
    };
    dao.upsert_fact(&ctx, row).map(|(fact, _)| fact).unwrap()
}
#[test]
fn persona_filter_single_isolates_per_user() {
    // Two users, same persona name. Each user's facts under that
    // persona must NOT surface to the other user's reads — this is
    // the multi-user leakage that motivated adding user_id.
    let ctx = opentelemetry::Context::new();
    let conn = connection_with_fks_on();
    let alice = create_user(&conn, "alice");
    let bob = create_user(&conn, "bob");
    create_persona_row(&conn, alice, "default");
    create_persona_row(&conn, bob, "default");
    let mut dao = SqliteKnowledgeDao::from_connection(conn.clone());
    let cabin = make_entity(&mut dao, "Cabin");
    add_fact(&mut dao, cabin.id, "located_in", "Vermont", alice, "default");
    add_fact(&mut dao, cabin.id, "color", "red", bob, "default");
    // Both reads use the same persona name; only user_id differs.
    let filter_for = |uid: i32| PersonaFilter::Single {
        user_id: uid,
        persona_id: "default".to_string(),
    };
    let alice_view = dao
        .get_facts_for_entity(&ctx, cabin.id, &filter_for(alice))
        .unwrap();
    assert_eq!(alice_view.len(), 1);
    assert_eq!(alice_view[0].predicate, "located_in");
    let bob_view = dao
        .get_facts_for_entity(&ctx, cabin.id, &filter_for(bob))
        .unwrap();
    assert_eq!(bob_view.len(), 1);
    assert_eq!(bob_view[0].predicate, "color");
}
#[test]
fn persona_filter_all_unions_across_personas_one_user() {
    // include_all_memories=true → All variant: see this user's
    // facts across all their personas. Must NOT include other
    // users' facts even when they share a persona name.
    let ctx = opentelemetry::Context::new();
    let conn = connection_with_fks_on();
    let alice = create_user(&conn, "alice");
    let bob = create_user(&conn, "bob");
    create_persona_row(&conn, alice, "default");
    create_persona_row(&conn, alice, "journal");
    create_persona_row(&conn, bob, "default");
    let mut dao = SqliteKnowledgeDao::from_connection(conn.clone());
    let cabin = make_entity(&mut dao, "Cabin");
    add_fact(&mut dao, cabin.id, "p1", "v1", alice, "default");
    add_fact(&mut dao, cabin.id, "p2", "v2", alice, "journal");
    add_fact(&mut dao, cabin.id, "p3", "v3", bob, "default");
    let union_view = dao
        .get_facts_for_entity(&ctx, cabin.id, &PersonaFilter::All { user_id: alice })
        .unwrap();
    let seen: Vec<&str> = union_view.iter().map(|f| f.predicate.as_str()).collect();
    // Exactly alice's two personas' facts; bob's row is excluded.
    assert_eq!(seen.len(), 2);
    assert!(seen.contains(&"p1"));
    assert!(seen.contains(&"p2"));
    assert!(
        !seen.contains(&"p3"),
        "All variant must not leak across users"
    );
}
#[test]
fn upsert_fact_dedup_does_not_cross_users() {
    // Two users insert the SAME claim (same subject + predicate +
    // object_value) under the same persona name. Pre-fix, the
    // dedup key was (subject, predicate, persona_id) and bob's
    // insert would corroborate alice's row instead of creating a
    // new one. Post-fix the key includes user_id, so each user
    // gets their own row at confidence=0.6.
    let conn = connection_with_fks_on();
    let alice = create_user(&conn, "alice");
    let bob = create_user(&conn, "bob");
    create_persona_row(&conn, alice, "default");
    create_persona_row(&conn, bob, "default");
    let mut dao = SqliteKnowledgeDao::from_connection(conn.clone());
    let cabin = make_entity(&mut dao, "Cabin");
    let alice_row = add_fact(&mut dao, cabin.id, "color", "red", alice, "default");
    let bob_row = add_fact(&mut dao, cabin.id, "color", "red", bob, "default");
    assert_ne!(alice_row.id, bob_row.id, "must be separate rows");
    assert_eq!(alice_row.confidence, 0.6);
    assert_eq!(
        bob_row.confidence, 0.6,
        "bob's row should not have been corroboration-bumped against alice's"
    );
}
#[test]
fn deleting_persona_cascades_only_that_users_facts() {
    // Composite FK + CASCADE: deleting alice's 'journal' persona
    // wipes alice's journal facts but leaves alice's default
    // facts AND bob's journal-named facts untouched.
    let ctx = opentelemetry::Context::new();
    let conn = connection_with_fks_on();
    let alice = create_user(&conn, "alice");
    let bob = create_user(&conn, "bob");
    create_persona_row(&conn, alice, "default");
    create_persona_row(&conn, alice, "journal");
    create_persona_row(&conn, bob, "journal");
    let mut dao = SqliteKnowledgeDao::from_connection(conn.clone());
    let cabin = make_entity(&mut dao, "Cabin");
    add_fact(&mut dao, cabin.id, "p_alice_default", "x", alice, "default");
    add_fact(&mut dao, cabin.id, "p_alice_journal", "y", alice, "journal");
    add_fact(&mut dao, cabin.id, "p_bob_journal", "z", bob, "journal");
    // Delete alice's journal persona directly on the table; the
    // CASCADE should remove only alice's journal facts.
    {
        use crate::database::schema::personas::dsl as p;
        let mut guard = conn.lock().unwrap();
        diesel::delete(
            p::personas
                .filter(p::user_id.eq(alice))
                .filter(p::persona_id.eq("journal")),
        )
        .execute(guard.deref_mut())
        .unwrap();
    }
    // One read path, three (user, persona) combinations.
    let view = |dao: &mut SqliteKnowledgeDao, uid: i32, pid: &str| {
        dao.get_facts_for_entity(
            &ctx,
            cabin.id,
            &PersonaFilter::Single {
                user_id: uid,
                persona_id: pid.to_string(),
            },
        )
        .unwrap()
    };
    // alice/default survives.
    let alice_default = view(&mut dao, alice, "default");
    assert_eq!(alice_default.len(), 1);
    assert_eq!(alice_default[0].predicate, "p_alice_default");
    // alice/journal is gone.
    assert!(
        view(&mut dao, alice, "journal").is_empty(),
        "CASCADE should have removed alice's journal facts"
    );
    // bob/journal — same persona name, different user — untouched.
    let bob_journal = view(&mut dao, bob, "journal");
    assert_eq!(bob_journal.len(), 1);
    assert_eq!(bob_journal[0].predicate, "p_bob_journal");
}
#[test]
fn fact_insert_with_unknown_persona_is_rejected() {
    // FK enforcement: inserting a fact whose (user_id, persona_id)
    // pair has no matching personas row should fail. Protects
    // against typo'd persona ids silently leaking into the table.
    let ctx = opentelemetry::Context::new();
    let conn = connection_with_fks_on();
    let alice = create_user(&conn, "alice");
    // Deliberately NO persona row created for (alice, "ghost").
    let mut dao = SqliteKnowledgeDao::from_connection(conn.clone());
    let cabin = make_entity(&mut dao, "Cabin");
    let orphan_fact = InsertEntityFact {
        subject_entity_id: cabin.id,
        predicate: "color".to_string(),
        object_entity_id: None,
        object_value: Some("red".to_string()),
        source_photo: None,
        source_insight_id: None,
        confidence: 0.6,
        status: "active".to_string(),
        created_at: 0,
        persona_id: "ghost".to_string(),
        user_id: alice,
        valid_from: None,
        valid_until: None,
        superseded_by: None,
        created_by_model: None,
        created_by_backend: None,
        last_modified_by_model: None,
        last_modified_by_backend: None,
        last_modified_at: None,
    };
    assert!(
        dao.upsert_fact(&ctx, orphan_fact).is_err(),
        "FK should reject fact whose persona doesn't exist"
    );
}
#[test]
fn supersede_fact_links_and_stamps_valid_until() {
    // Supersession: marking an old fact as replaced by a new one
    // flips its status to 'superseded', points superseded_by at
    // the new fact, and stamps valid_until from the new fact's
    // valid_from (when not already set). Pre-existing valid_until
    // on the old fact is respected.
    let ctx = opentelemetry::Context::new();
    let conn = connection_with_fks_on();
    let alice = create_user(&conn, "alice");
    create_persona_row(&conn, alice, "default");
    let mut dao = SqliteKnowledgeDao::from_connection(conn.clone());
    let cameron = make_entity(&mut dao, "Cameron");
    let prior = add_fact(
        &mut dao,
        cameron.id,
        "is_in_relationship_with",
        "X",
        alice,
        "default",
    );
    let replacement = add_fact(
        &mut dao,
        cameron.id,
        "is_in_relationship_with",
        "Y",
        alice,
        "default",
    );
    // Give the replacement a valid_from; supersession should copy it
    // onto the prior fact's valid_until.
    dao.update_fact(
        &ctx,
        replacement.id,
        FactPatch {
            predicate: None,
            object_value: None,
            status: None,
            confidence: None,
            valid_from: Some(Some(1640995200)), // 2022-01-01
            valid_until: None,
        },
        None,
    )
    .unwrap();
    let stamped = dao
        .supersede_fact(&ctx, prior.id, replacement.id, None)
        .unwrap()
        .expect("supersede returned None");
    assert_eq!(stamped.status, "superseded");
    assert_eq!(stamped.superseded_by, Some(replacement.id));
    assert_eq!(stamped.valid_until, Some(1640995200));
}
#[test]
fn delete_fact_clears_dangling_supersession_pointers() {
    // Deleting the newer fact (the supersedeR) leaves the older
    // fact's superseded_by dangling — the DAO clears it back to
    // NULL in the same transaction so the column never points at
    // a missing row. The old fact's status stays 'superseded'
    // because the historical correction is still meaningful.
    let cx = opentelemetry::Context::new();
    let conn = connection_with_fks_on();
    let alice = create_user(&conn, "alice");
    create_persona_row(&conn, alice, "default");
    let mut dao = SqliteKnowledgeDao::from_connection(conn.clone());
    let cameron = make_entity(&mut dao, "Cameron");
    let old = add_fact(&mut dao, cameron.id, "lives_in", "NYC", alice, "default");
    let new = add_fact(&mut dao, cameron.id, "lives_in", "SF", alice, "default");
    dao.supersede_fact(&cx, old.id, new.id, None)
        .unwrap()
        .unwrap();
    dao.delete_fact(&cx, new.id).unwrap();
    let rehydrated = dao
        .list_facts(
            &cx,
            FactFilter {
                entity_id: Some(cameron.id),
                // "all" — the old fact is 'superseded' now, so the
                // default 'active' scope would skip it.
                status: Some("all".to_string()),
                predicate: None,
                persona: PersonaFilter::Single {
                    user_id: alice,
                    persona_id: "default".to_string(),
                },
                limit: 10,
                offset: 0,
            },
        )
        .unwrap()
        .0;
    let old_row = rehydrated.iter().find(|f| f.id == old.id).unwrap();
    assert_eq!(
        old_row.superseded_by, None,
        "dangling supersession pointer should be cleared"
    );
    assert_eq!(
        old_row.status, "superseded",
        "historical status should survive the superseder delete"
    );
}
#[test]
fn update_fact_can_set_and_clear_valid_time() {
    // FactPatch.valid_from / valid_until are Option<Option<i64>>
    // so PATCH can distinguish "leave alone" (None) from "set to
    // value" (Some(Some(n))) and "clear back to NULL" (Some(None)).
    let ctx = opentelemetry::Context::new();
    let conn = connection_with_fks_on();
    let alice = create_user(&conn, "alice");
    create_persona_row(&conn, alice, "default");
    let mut dao = SqliteKnowledgeDao::from_connection(conn.clone());
    let cameron = make_entity(&mut dao, "Cameron");
    let fact = add_fact(
        &mut dao,
        cameron.id,
        "is_in_relationship_with",
        "Alex",
        alice,
        "default",
    );
    assert_eq!(fact.valid_from, None);
    assert_eq!(fact.valid_until, None);
    // Patch builder that touches only the temporal bounds.
    let time_patch = |from: Option<Option<i64>>, until: Option<Option<i64>>| FactPatch {
        predicate: None,
        object_value: None,
        status: None,
        confidence: None,
        valid_from: from,
        valid_until: until,
    };
    // Set both bounds: 2020-01-01 .. 2022-01-01.
    let set_both = time_patch(Some(Some(1577836800)), Some(Some(1640995200)));
    let updated = dao
        .update_fact(&ctx, fact.id, set_both, None)
        .unwrap()
        .unwrap();
    assert_eq!(updated.valid_from, Some(1577836800));
    assert_eq!(updated.valid_until, Some(1640995200));
    // Leave alone: omit both — values persist.
    let untouched = dao
        .update_fact(&ctx, fact.id, time_patch(None, None), None)
        .unwrap()
        .unwrap();
    assert_eq!(untouched.valid_from, Some(1577836800));
    assert_eq!(untouched.valid_until, Some(1640995200));
    // Clear valid_until back to NULL (relationship ongoing again).
    let reopened = dao
        .update_fact(&ctx, fact.id, time_patch(None, Some(None)), None)
        .unwrap()
        .unwrap();
    assert_eq!(reopened.valid_from, Some(1577836800));
    assert_eq!(reopened.valid_until, None);
}
#[test]
fn delete_entity_clears_relational_facts_that_would_violate_check() {
    // entity_facts has a CHECK that at least one of object_entity_id /
    // object_value is non-null. The FK on object_entity_id is
    // ON DELETE SET NULL, which would leave purely-relational facts
    // (subject + predicate + object_entity_id, no object_value)
    // with both nulls and abort the delete. The DAO pre-deletes
    // those rows in a transaction so the parent delete can succeed.
    let ctx = opentelemetry::Context::new();
    let conn = connection_with_fks_on();
    let alice = create_user(&conn, "alice");
    create_persona_row(&conn, alice, "default");
    let mut dao = SqliteKnowledgeDao::from_connection(conn.clone());
    let bob = make_entity(&mut dao, "Bob");
    let carol = make_entity(&mut dao, "Carol");
    // Purely-relational fact with Carol as the object — exactly the
    // shape the CHECK + SET NULL combination would otherwise break.
    let friendship = dao
        .upsert_fact(
            &ctx,
            InsertEntityFact {
                subject_entity_id: bob.id,
                predicate: "is_friend_of".to_string(),
                object_entity_id: Some(carol.id),
                object_value: None,
                source_photo: None,
                source_insight_id: None,
                confidence: 0.6,
                status: "active".to_string(),
                created_at: 0,
                persona_id: "default".to_string(),
                user_id: alice,
                valid_from: None,
                valid_until: None,
                superseded_by: None,
                created_by_model: None,
                created_by_backend: None,
                last_modified_by_model: None,
                last_modified_by_backend: None,
                last_modified_at: None,
            },
        )
        .map(|(fact, _)| fact)
        .unwrap();
    // A typed (string-valued) fact about Bob — should survive.
    add_fact(&mut dao, bob.id, "has_age", "30", alice, "default");
    // Delete Carol — should succeed (relational fact pre-deleted).
    dao.delete_entity(&ctx, carol.id).unwrap();
    assert!(
        dao.get_entity_by_id(&ctx, carol.id).unwrap().is_none(),
        "Carol should be deleted"
    );
    let remaining = dao
        .get_facts_for_entity(
            &ctx,
            bob.id,
            &PersonaFilter::Single {
                user_id: alice,
                persona_id: "default".to_string(),
            },
        )
        .unwrap();
    // The relational fact is gone (removed by the DAO's transaction,
    // not left behind SET-NULL'd).
    assert!(
        !remaining.iter().any(|f| f.id == friendship.id),
        "relational fact pointing at Carol should be removed"
    );
    // The typed fact survives.
    assert!(
        remaining.iter().any(|f| f.predicate == "has_age"),
        "typed fact about Bob should survive Carol's deletion"
    );
}
#[test]
fn upsert_entity_collapses_near_duplicate_by_embedding() {
    // The agent's pre-flight check uses FTS5 prefix tokens, which
    // miss "Sarah" / "Sara" / "Sarah J." pairs. The DAO upsert is
    // the safety net: if no exact (name, type) match but the new
    // entity's embedding sits above the cosine threshold against an
    // existing same-type entity, we collapse instead of inserting.
    let ctx = opentelemetry::Context::new();
    let conn = connection_with_fks_on();
    let mut dao = SqliteKnowledgeDao::from_connection(conn.clone());
    let mut base_embedding = vec![0.0_f32; 64];
    base_embedding[0] = 1.0;
    base_embedding[1] = 0.5;
    let mut nearby_embedding = base_embedding.clone();
    nearby_embedding[2] = 0.05; // nudge — cosine still well above 0.92
    // Seed "Sarah" carrying the base embedding.
    let original = dao
        .upsert_entity(
            &ctx,
            InsertEntity {
                name: "Sarah".to_string(),
                entity_type: "person".to_string(),
                description: "tagged friend".to_string(),
                embedding: Some(SqliteKnowledgeDao::serialize_embedding(&base_embedding)),
                confidence: 0.6,
                status: "active".to_string(),
                created_at: 0,
                updated_at: 0,
            },
        )
        .unwrap();
    // "Sara" with a near-identical embedding must collapse onto the
    // existing row rather than minting a new entity.
    let merged = dao
        .upsert_entity(
            &ctx,
            InsertEntity {
                name: "Sara".to_string(),
                entity_type: "person".to_string(),
                description: "tagged friend".to_string(),
                embedding: Some(SqliteKnowledgeDao::serialize_embedding(&nearby_embedding)),
                confidence: 0.6,
                status: "active".to_string(),
                created_at: 0,
                updated_at: 0,
            },
        )
        .unwrap();
    assert_eq!(
        merged.id, original.id,
        "near-duplicate by cosine should reuse the existing entity id"
    );
    // A clearly-different embedding under a different name should
    // still create a new row.
    let mut far_embedding = vec![0.0_f32; 64];
    far_embedding[10] = 1.0;
    let fresh = dao
        .upsert_entity(
            &ctx,
            InsertEntity {
                name: "Bob".to_string(),
                entity_type: "person".to_string(),
                description: String::new(),
                embedding: Some(SqliteKnowledgeDao::serialize_embedding(&far_embedding)),
                confidence: 0.6,
                status: "active".to_string(),
                created_at: 0,
                updated_at: 0,
            },
        )
        .unwrap();
    assert_ne!(
        fresh.id, original.id,
        "unrelated embedding should not collapse"
    );
}
}