Two persona-infrastructure correctness fixes that go together because
the second one (FK with CASCADE) requires the first (preventing the
persona row from being mutated out from under its facts).
1. update_persona handler refuses name/systemPrompt edits to built-ins
(409). includeAllMemories stays editable — that's a per-user
preference, not the persona's identity. Mirrors the existing
delete_persona guard. The DAO is intentionally permissive so the
guard sits at the HTTP layer; persona_dao test pins that contract.
2. Migration 2026-05-10 adds user_id to entity_facts and a composite
FK (user_id, persona_id) -> personas(user_id, persona_id) ON DELETE
CASCADE. This closes two issues at once:
- Persona orphans: deleting a custom persona used to leave its
facts dangling forever, readable only via PersonaFilter::All.
CASCADE now wipes them with the persona row.
- Multi-user fact leakage: PersonaFilter::Single("default") used
to surface every user's default-scoped facts. PersonaFilter is
now { user_id, persona_id } and all read paths
(get_facts_for_entity, list_facts, get_recent_activity) filter
on user_id first. upsert_fact's dedup key extends to user_id so
identical claims under shared persona names from different
users no longer corroborate-bump each other's confidence.
- user_id threads from Claims.sub.parse::<i32>().unwrap_or(1) at
the chat / insight handlers through ChatTurnRequest, the
streaming agentic loop, execute_tool, and into the leaf tools
(tool_store_fact, tool_recall_facts_for_photo). The ".unwrap_or(1)"
accommodates Apollo's service token whose sub is non-numeric on
legacy mints.
- Backfill picks the smallest user_id matching each legacy fact's
persona_id so the FK holds for already-stored rows.
Five new knowledge_dao tests with FK-on connection: persona scoping
isolation, All-variant union per-user, dedup not crossing users,
CASCADE delete, FK rejection of unknown personas. Plus
dao_update_does_not_block_built_ins documenting where the
HTTP-layer guard lives.
Apollo coordinates separately — the matching changes there add the
/api/personas proxy and start sending persona_id on photo-chat turns.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1280 lines
47 KiB
Rust
1280 lines
47 KiB
Rust
#![allow(dead_code)]
|
|
|
|
use diesel::prelude::*;
|
|
use diesel::sqlite::SqliteConnection;
|
|
use std::ops::DerefMut;
|
|
use std::sync::{Arc, Mutex};
|
|
|
|
use crate::database::models::{
|
|
Entity, EntityFact, EntityPhotoLink, InsertEntity, InsertEntityFact, InsertEntityPhotoLink,
|
|
};
|
|
use crate::database::schema;
|
|
use crate::database::{DbError, DbErrorKind, connect};
|
|
use crate::otel::trace_db_call;
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Entity type normalisation
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/// Canonicalise a model-supplied entity_type to a consistent lowercase form.
|
|
/// Weak models frequently vary capitalisation ("Person" vs "person") or use
|
|
/// synonym types ("location" vs "place"). Normalising here prevents duplicate
|
|
/// entities that differ only by type spelling.
|
|
pub(crate) fn normalize_entity_type(raw: &str) -> String {
|
|
match raw.to_lowercase().as_str() {
|
|
"person" | "people" | "human" | "individual" | "contact" => "person",
|
|
"place" | "location" | "venue" | "site" | "area" | "landmark" => "place",
|
|
"event" | "occasion" | "activity" | "celebration" => "event",
|
|
"thing" | "object" | "item" | "product" => "thing",
|
|
other => other,
|
|
}
|
|
.to_string()
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Filter / patch types
|
|
// ---------------------------------------------------------------------------
|
|
|
|
pub struct EntityFilter {
|
|
pub entity_type: Option<String>,
|
|
/// "active" | "reviewed" | "rejected" | "all"
|
|
pub status: Option<String>,
|
|
/// LIKE match on name and description
|
|
pub search: Option<String>,
|
|
pub limit: i64,
|
|
pub offset: i64,
|
|
}
|
|
|
|
pub struct FactFilter {
|
|
pub entity_id: Option<i32>,
|
|
/// "active" | "reviewed" | "rejected" | "all"
|
|
pub status: Option<String>,
|
|
pub predicate: Option<String>,
|
|
pub persona: PersonaFilter,
|
|
pub limit: i64,
|
|
pub offset: i64,
|
|
}
|
|
|
|
/// Persona scoping for fact reads. `Single` filters to one persona's
|
|
/// view; `All` is the hive-mind read used when a persona has
|
|
/// `include_all_memories=true` in the personas table. Both variants
|
|
/// carry `user_id` because facts are user-isolated — two users with
|
|
/// the same 'default' persona must not see each other's facts (this
|
|
/// is enforced at the schema level by the composite FK in migration
|
|
/// 2026-05-10). Entities and photo-links are always shared and don't
|
|
/// take a persona filter.
|
|
#[derive(Debug, Clone)]
|
|
pub enum PersonaFilter {
|
|
Single { user_id: i32, persona_id: String },
|
|
All { user_id: i32 },
|
|
}
|
|
|
|
impl PersonaFilter {
|
|
pub fn user_id(&self) -> i32 {
|
|
match self {
|
|
Self::Single { user_id, .. } => *user_id,
|
|
Self::All { user_id } => *user_id,
|
|
}
|
|
}
|
|
}
|
|
|
|
pub struct EntityPatch {
|
|
pub name: Option<String>,
|
|
pub description: Option<String>,
|
|
pub status: Option<String>,
|
|
pub confidence: Option<f32>,
|
|
}
|
|
|
|
pub struct FactPatch {
|
|
pub predicate: Option<String>,
|
|
pub object_value: Option<String>,
|
|
pub status: Option<String>,
|
|
pub confidence: Option<f32>,
|
|
}
|
|
|
|
pub struct RecentActivity {
|
|
pub entities: Vec<Entity>,
|
|
pub facts: Vec<EntityFact>,
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Trait
|
|
// ---------------------------------------------------------------------------
|
|
|
|
pub trait KnowledgeDao: Sync + Send {
|
|
// --- Entity ---
|
|
fn upsert_entity(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
entity: InsertEntity,
|
|
) -> Result<Entity, DbError>;
|
|
|
|
fn get_entity_by_id(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
id: i32,
|
|
) -> Result<Option<Entity>, DbError>;
|
|
|
|
fn get_entity_by_name(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
name: &str,
|
|
entity_type: Option<&str>,
|
|
) -> Result<Vec<Entity>, DbError>;
|
|
|
|
fn get_entities_with_embeddings(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
entity_type: Option<&str>,
|
|
) -> Result<Vec<Entity>, DbError>;
|
|
|
|
fn list_entities(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
filter: EntityFilter,
|
|
) -> Result<(Vec<Entity>, i64), DbError>;
|
|
|
|
fn update_entity_status(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
id: i32,
|
|
status: &str,
|
|
) -> Result<(), DbError>;
|
|
|
|
fn update_entity(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
id: i32,
|
|
patch: EntityPatch,
|
|
) -> Result<Option<Entity>, DbError>;
|
|
|
|
fn delete_entity(&mut self, cx: &opentelemetry::Context, id: i32) -> Result<(), DbError>;
|
|
|
|
fn merge_entities(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
source_id: i32,
|
|
target_id: i32,
|
|
) -> Result<(i64, i64), DbError>;
|
|
|
|
// --- Facts ---
|
|
fn upsert_fact(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
fact: InsertEntityFact,
|
|
) -> Result<(EntityFact, bool), DbError>;
|
|
|
|
fn get_facts_for_entity(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
entity_id: i32,
|
|
persona: &PersonaFilter,
|
|
) -> Result<Vec<EntityFact>, DbError>;
|
|
|
|
fn list_facts(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
filter: FactFilter,
|
|
) -> Result<(Vec<EntityFact>, i64), DbError>;
|
|
|
|
fn update_fact(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
id: i32,
|
|
patch: FactPatch,
|
|
) -> Result<Option<EntityFact>, DbError>;
|
|
|
|
fn update_facts_insight_id(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
source_photo: &str,
|
|
insight_id: i32,
|
|
) -> Result<(), DbError>;
|
|
|
|
fn delete_fact(&mut self, cx: &opentelemetry::Context, id: i32) -> Result<(), DbError>;
|
|
|
|
// --- Photo links ---
|
|
fn upsert_photo_link(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
link: InsertEntityPhotoLink,
|
|
) -> Result<(), DbError>;
|
|
|
|
fn delete_photo_links_for_file(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
file_path: &str,
|
|
) -> Result<(), DbError>;
|
|
|
|
fn get_links_for_photo(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
file_path: &str,
|
|
) -> Result<Vec<EntityPhotoLink>, DbError>;
|
|
|
|
fn get_links_for_entity(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
entity_id: i32,
|
|
) -> Result<Vec<EntityPhotoLink>, DbError>;
|
|
|
|
// --- Audit ---
|
|
fn get_recent_activity(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
since: i64,
|
|
limit: i64,
|
|
persona: &PersonaFilter,
|
|
) -> Result<RecentActivity, DbError>;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// SQLite implementation
|
|
// ---------------------------------------------------------------------------
|
|
|
|
pub struct SqliteKnowledgeDao {
|
|
connection: Arc<Mutex<SqliteConnection>>,
|
|
}
|
|
|
|
impl Default for SqliteKnowledgeDao {
|
|
fn default() -> Self {
|
|
Self::new()
|
|
}
|
|
}
|
|
|
|
impl SqliteKnowledgeDao {
|
|
pub fn new() -> Self {
|
|
SqliteKnowledgeDao {
|
|
connection: Arc::new(Mutex::new(connect())),
|
|
}
|
|
}
|
|
|
|
pub fn from_connection(conn: Arc<Mutex<SqliteConnection>>) -> Self {
|
|
SqliteKnowledgeDao { connection: conn }
|
|
}
|
|
|
|
fn serialize_embedding(vec: &[f32]) -> Vec<u8> {
|
|
vec.iter().flat_map(|f| f.to_le_bytes()).collect()
|
|
}
|
|
|
|
fn deserialize_embedding(bytes: &[u8]) -> Result<Vec<f32>, DbError> {
|
|
if !bytes.len().is_multiple_of(4) {
|
|
return Err(DbError::new(DbErrorKind::QueryError));
|
|
}
|
|
Ok(bytes
|
|
.chunks_exact(4)
|
|
.map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]]))
|
|
.collect())
|
|
}
|
|
|
|
pub fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
|
|
if a.len() != b.len() || a.is_empty() {
|
|
return 0.0;
|
|
}
|
|
let dot: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum();
|
|
let mag_a: f32 = a.iter().map(|x| x * x).sum::<f32>().sqrt();
|
|
let mag_b: f32 = b.iter().map(|x| x * x).sum::<f32>().sqrt();
|
|
if mag_a == 0.0 || mag_b == 0.0 {
|
|
0.0
|
|
} else {
|
|
dot / (mag_a * mag_b)
|
|
}
|
|
}
|
|
}
|
|
|
|
impl KnowledgeDao for SqliteKnowledgeDao {
|
|
// -----------------------------------------------------------------------
|
|
// Entity operations
|
|
// -----------------------------------------------------------------------
|
|
|
|
fn upsert_entity(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
entity: InsertEntity,
|
|
) -> Result<Entity, DbError> {
|
|
trace_db_call(cx, "insert", "upsert_entity", |_span| {
|
|
use schema::entities::dsl::*;
|
|
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
|
|
// Normalise type before lookup and insert so that model variations
|
|
// ("Person" / "person", "location" / "place") collapse to one row.
|
|
let entity = InsertEntity {
|
|
entity_type: normalize_entity_type(&entity.entity_type),
|
|
..entity
|
|
};
|
|
|
|
// Case-insensitive lookup by name + entity_type.
|
|
// Use lower() on both sides so existing dirty rows ("Person") still match.
|
|
let name_lower = entity.name.to_lowercase();
|
|
let type_lower = entity.entity_type.to_lowercase();
|
|
let existing: Option<Entity> = entities
|
|
.filter(diesel::dsl::sql::<diesel::sql_types::Bool>(&format!(
|
|
"lower(name) = '{}' AND lower(entity_type) = '{}'",
|
|
name_lower.replace('\'', "''"),
|
|
type_lower.replace('\'', "''")
|
|
)))
|
|
.first::<Entity>(conn.deref_mut())
|
|
.optional()
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))?;
|
|
|
|
if let Some(existing_entity) = existing {
|
|
// Update description, embedding, updated_at
|
|
diesel::update(entities.filter(id.eq(existing_entity.id)))
|
|
.set((
|
|
description.eq(&entity.description),
|
|
embedding.eq(&entity.embedding),
|
|
updated_at.eq(entity.updated_at),
|
|
))
|
|
.execute(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Update error: {}", e))?;
|
|
|
|
entities
|
|
.filter(id.eq(existing_entity.id))
|
|
.first::<Entity>(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
|
|
} else {
|
|
diesel::insert_into(entities)
|
|
.values(&entity)
|
|
.execute(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Insert error: {}", e))?;
|
|
|
|
entities
|
|
.order(id.desc())
|
|
.first::<Entity>(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
|
|
}
|
|
})
|
|
.map_err(|_| DbError::new(DbErrorKind::InsertError))
|
|
}
|
|
|
|
fn get_entity_by_id(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
entity_id: i32,
|
|
) -> Result<Option<Entity>, DbError> {
|
|
trace_db_call(cx, "query", "get_entity_by_id", |_span| {
|
|
use schema::entities::dsl::*;
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
entities
|
|
.filter(id.eq(entity_id))
|
|
.first::<Entity>(conn.deref_mut())
|
|
.optional()
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
|
|
})
|
|
.map_err(|_| DbError::new(DbErrorKind::QueryError))
|
|
}
|
|
|
|
fn get_entity_by_name(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
entity_name: &str,
|
|
entity_type_filter: Option<&str>,
|
|
) -> Result<Vec<Entity>, DbError> {
|
|
trace_db_call(cx, "query", "get_entity_by_name", |_span| {
|
|
use schema::entities::dsl::*;
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
|
|
let name_lower = entity_name.to_lowercase().replace('\'', "''");
|
|
let mut sql = format!("lower(name) = '{}'", name_lower);
|
|
if let Some(et) = entity_type_filter {
|
|
sql.push_str(&format!(" AND entity_type = '{}'", et.replace('\'', "''")));
|
|
}
|
|
sql.push_str(" AND status != 'rejected'");
|
|
|
|
entities
|
|
.filter(diesel::dsl::sql::<diesel::sql_types::Bool>(&sql))
|
|
.load::<Entity>(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
|
|
})
|
|
.map_err(|_| DbError::new(DbErrorKind::QueryError))
|
|
}
|
|
|
|
fn get_entities_with_embeddings(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
entity_type_filter: Option<&str>,
|
|
) -> Result<Vec<Entity>, DbError> {
|
|
trace_db_call(cx, "query", "get_entities_with_embeddings", |_span| {
|
|
use schema::entities::dsl::*;
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
|
|
let mut query = entities
|
|
.filter(embedding.is_not_null())
|
|
.filter(status.ne("rejected"))
|
|
.into_boxed();
|
|
|
|
if let Some(et) = entity_type_filter {
|
|
query = query.filter(entity_type.eq(et));
|
|
}
|
|
|
|
query
|
|
.load::<Entity>(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
|
|
})
|
|
.map_err(|_| DbError::new(DbErrorKind::QueryError))
|
|
}
|
|
|
|
fn list_entities(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
filter: EntityFilter,
|
|
) -> Result<(Vec<Entity>, i64), DbError> {
|
|
trace_db_call(cx, "query", "list_entities", |_span| {
|
|
use diesel::dsl::count_star;
|
|
use schema::entities::dsl::*;
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
|
|
let mut query = entities.into_boxed();
|
|
|
|
if let Some(ref et) = filter.entity_type {
|
|
query = query.filter(entity_type.eq(et));
|
|
}
|
|
|
|
let status_val = filter.status.as_deref().unwrap_or("active");
|
|
if status_val != "all" {
|
|
query = query.filter(status.eq(status_val));
|
|
}
|
|
|
|
if let Some(ref search_term) = filter.search {
|
|
let pattern = format!("%{}%", search_term);
|
|
query = query.filter(name.like(pattern.clone()).or(description.like(pattern)));
|
|
}
|
|
|
|
// Count with same filters applied (build separately since boxed query is consumed)
|
|
let mut count_query = entities.into_boxed();
|
|
if let Some(ref et) = filter.entity_type {
|
|
count_query = count_query.filter(entity_type.eq(et));
|
|
}
|
|
let status_val2 = filter.status.as_deref().unwrap_or("active");
|
|
if status_val2 != "all" {
|
|
count_query = count_query.filter(status.eq(status_val2));
|
|
}
|
|
if let Some(ref search_term) = filter.search {
|
|
let pattern = format!("%{}%", search_term);
|
|
count_query =
|
|
count_query.filter(name.like(pattern.clone()).or(description.like(pattern)));
|
|
}
|
|
let total: i64 = count_query
|
|
.select(count_star())
|
|
.first(conn.deref_mut())
|
|
.unwrap_or(0);
|
|
|
|
let results = query
|
|
.order(updated_at.desc())
|
|
.limit(filter.limit)
|
|
.offset(filter.offset)
|
|
.load::<Entity>(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))?;
|
|
|
|
Ok((results, total))
|
|
})
|
|
.map_err(|_| DbError::new(DbErrorKind::QueryError))
|
|
}
|
|
|
|
fn update_entity_status(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
entity_id: i32,
|
|
new_status: &str,
|
|
) -> Result<(), DbError> {
|
|
trace_db_call(cx, "update", "update_entity_status", |_span| {
|
|
use schema::entities::dsl::*;
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
diesel::update(entities.filter(id.eq(entity_id)))
|
|
.set(status.eq(new_status))
|
|
.execute(conn.deref_mut())
|
|
.map(|_| ())
|
|
.map_err(|e| anyhow::anyhow!("Update error: {}", e))
|
|
})
|
|
.map_err(|_| DbError::new(DbErrorKind::UpdateError))
|
|
}
|
|
|
|
fn update_entity(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
entity_id: i32,
|
|
patch: EntityPatch,
|
|
) -> Result<Option<Entity>, DbError> {
|
|
trace_db_call(cx, "update", "update_entity", |_span| {
|
|
use schema::entities::dsl::*;
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
|
|
let now = chrono::Utc::now().timestamp();
|
|
|
|
if let Some(ref new_name) = patch.name {
|
|
diesel::update(entities.filter(id.eq(entity_id)))
|
|
.set((name.eq(new_name), updated_at.eq(now)))
|
|
.execute(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Update name error: {}", e))?;
|
|
}
|
|
if let Some(ref new_desc) = patch.description {
|
|
diesel::update(entities.filter(id.eq(entity_id)))
|
|
.set((description.eq(new_desc), updated_at.eq(now)))
|
|
.execute(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Update description error: {}", e))?;
|
|
}
|
|
if let Some(ref new_status) = patch.status {
|
|
diesel::update(entities.filter(id.eq(entity_id)))
|
|
.set((status.eq(new_status), updated_at.eq(now)))
|
|
.execute(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Update status error: {}", e))?;
|
|
}
|
|
if let Some(new_confidence) = patch.confidence {
|
|
diesel::update(entities.filter(id.eq(entity_id)))
|
|
.set((confidence.eq(new_confidence), updated_at.eq(now)))
|
|
.execute(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Update confidence error: {}", e))?;
|
|
}
|
|
|
|
entities
|
|
.filter(id.eq(entity_id))
|
|
.first::<Entity>(conn.deref_mut())
|
|
.optional()
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
|
|
})
|
|
.map_err(|_| DbError::new(DbErrorKind::UpdateError))
|
|
}
|
|
|
|
fn delete_entity(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
entity_id: i32,
|
|
) -> Result<(), DbError> {
|
|
trace_db_call(cx, "delete", "delete_entity", |_span| {
|
|
use schema::entities::dsl::*;
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
diesel::delete(entities.filter(id.eq(entity_id)))
|
|
.execute(conn.deref_mut())
|
|
.map(|_| ())
|
|
.map_err(|e| anyhow::anyhow!("Delete error: {}", e))
|
|
})
|
|
.map_err(|_| DbError::new(DbErrorKind::QueryError))
|
|
}
|
|
|
|
fn merge_entities(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
source_id: i32,
|
|
target_id: i32,
|
|
) -> Result<(i64, i64), DbError> {
|
|
trace_db_call(cx, "update", "merge_entities", |_span| {
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
|
|
conn.transaction::<(i64, i64), diesel::result::Error, _>(|conn| {
|
|
use schema::entity_facts::dsl as ef;
|
|
|
|
// 1. Re-point facts where source is subject
|
|
let facts_updated =
|
|
diesel::update(ef::entity_facts.filter(ef::subject_entity_id.eq(source_id)))
|
|
.set(ef::subject_entity_id.eq(target_id))
|
|
.execute(conn)? as i64;
|
|
|
|
// 2. Re-point facts where source is object
|
|
diesel::update(ef::entity_facts.filter(ef::object_entity_id.eq(source_id)))
|
|
.set(ef::object_entity_id.eq(Some(target_id)))
|
|
.execute(conn)?;
|
|
|
|
// 3. Copy photo links to target (INSERT OR IGNORE to skip duplicates)
|
|
let links_updated = diesel::sql_query(
|
|
"INSERT OR IGNORE INTO entity_photo_links (entity_id, library_id, rel_path, role) \
|
|
SELECT ?, library_id, rel_path, role FROM entity_photo_links WHERE entity_id = ?",
|
|
)
|
|
.bind::<diesel::sql_types::Integer, _>(target_id)
|
|
.bind::<diesel::sql_types::Integer, _>(source_id)
|
|
.execute(conn)? as i64;
|
|
|
|
// 4. Delete source entity (FK CASCADE removes remaining facts/links)
|
|
diesel::delete(
|
|
schema::entities::dsl::entities.filter(schema::entities::dsl::id.eq(source_id)),
|
|
)
|
|
.execute(conn)?;
|
|
|
|
Ok((facts_updated, links_updated))
|
|
})
|
|
.map_err(|e| anyhow::anyhow!("Merge transaction error: {}", e))
|
|
})
|
|
.map_err(|_| DbError::new(DbErrorKind::UpdateError))
|
|
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// Fact operations
|
|
// -----------------------------------------------------------------------
|
|
|
|
fn upsert_fact(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
fact: InsertEntityFact,
|
|
) -> Result<(EntityFact, bool), DbError> {
|
|
trace_db_call(cx, "insert", "upsert_fact", |_span| {
|
|
use schema::entity_facts::dsl::*;
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
|
|
// Look for an identical active fact AUTHORED BY THE SAME
|
|
// (USER, PERSONA). The same claim from a different persona —
|
|
// or from a different user with the same persona name — is a
|
|
// separate fact (each persona's voice/confidence is its own),
|
|
// not a confidence bump on someone else's row.
|
|
let mut dup_query = entity_facts
|
|
.filter(subject_entity_id.eq(fact.subject_entity_id))
|
|
.filter(predicate.eq(&fact.predicate))
|
|
.filter(user_id.eq(fact.user_id))
|
|
.filter(persona_id.eq(&fact.persona_id))
|
|
.filter(status.ne("rejected"))
|
|
.into_boxed();
|
|
|
|
match &fact.object_entity_id {
|
|
Some(oid) => dup_query = dup_query.filter(object_entity_id.eq(oid)),
|
|
None => dup_query = dup_query.filter(object_entity_id.is_null()),
|
|
}
|
|
match &fact.object_value {
|
|
Some(ov) => dup_query = dup_query.filter(object_value.eq(ov)),
|
|
None => dup_query = dup_query.filter(object_value.is_null()),
|
|
}
|
|
|
|
let existing: Option<EntityFact> = dup_query
|
|
.first::<EntityFact>(conn.deref_mut())
|
|
.optional()
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))?;
|
|
|
|
if let Some(existing_fact) = existing {
|
|
// Corroborate: bump confidence by 0.1 capped at 0.95
|
|
let new_confidence = (existing_fact.confidence + 0.1).min(0.95);
|
|
diesel::update(entity_facts.filter(id.eq(existing_fact.id)))
|
|
.set(confidence.eq(new_confidence))
|
|
.execute(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Update confidence error: {}", e))?;
|
|
|
|
let updated = entity_facts
|
|
.filter(id.eq(existing_fact.id))
|
|
.first::<EntityFact>(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))?;
|
|
|
|
Ok((updated, false)) // false = corroborated, not newly created
|
|
} else {
|
|
diesel::insert_into(entity_facts)
|
|
.values(&fact)
|
|
.execute(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Insert error: {}", e))?;
|
|
|
|
let inserted = entity_facts
|
|
.order(id.desc())
|
|
.first::<EntityFact>(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))?;
|
|
|
|
Ok((inserted, true)) // true = newly created
|
|
}
|
|
})
|
|
.map_err(|_| DbError::new(DbErrorKind::InsertError))
|
|
}
|
|
|
|
fn get_facts_for_entity(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
entity_id: i32,
|
|
persona: &PersonaFilter,
|
|
) -> Result<Vec<EntityFact>, DbError> {
|
|
trace_db_call(cx, "query", "get_facts_for_entity", |_span| {
|
|
use schema::entity_facts::dsl::*;
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
let mut q = entity_facts
|
|
.filter(subject_entity_id.eq(entity_id))
|
|
.filter(status.ne("rejected"))
|
|
.filter(user_id.eq(persona.user_id()))
|
|
.into_boxed();
|
|
if let PersonaFilter::Single { persona_id: pid, .. } = persona {
|
|
q = q.filter(persona_id.eq(pid.clone()));
|
|
}
|
|
q.load::<EntityFact>(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
|
|
})
|
|
.map_err(|_| DbError::new(DbErrorKind::QueryError))
|
|
}
|
|
|
|
fn list_facts(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
filter: FactFilter,
|
|
) -> Result<(Vec<EntityFact>, i64), DbError> {
|
|
trace_db_call(cx, "query", "list_facts", |_span| {
|
|
use diesel::dsl::count_star;
|
|
use schema::entity_facts::dsl::*;
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
|
|
let mut query = entity_facts.into_boxed();
|
|
let mut count_query = entity_facts.into_boxed();
|
|
|
|
// user_id always applies — facts are user-isolated.
|
|
let uid = filter.persona.user_id();
|
|
query = query.filter(user_id.eq(uid));
|
|
count_query = count_query.filter(user_id.eq(uid));
|
|
|
|
if let Some(eid) = filter.entity_id {
|
|
query = query.filter(subject_entity_id.eq(eid));
|
|
count_query = count_query.filter(subject_entity_id.eq(eid));
|
|
}
|
|
let status_val = filter.status.as_deref().unwrap_or("active");
|
|
if status_val != "all" {
|
|
query = query.filter(status.eq(status_val));
|
|
count_query = count_query.filter(status.eq(status_val));
|
|
}
|
|
if let Some(ref pred) = filter.predicate {
|
|
query = query.filter(predicate.eq(pred));
|
|
count_query = count_query.filter(predicate.eq(pred));
|
|
}
|
|
if let PersonaFilter::Single { persona_id: ref pid, .. } = filter.persona {
|
|
query = query.filter(persona_id.eq(pid.clone()));
|
|
count_query = count_query.filter(persona_id.eq(pid.clone()));
|
|
}
|
|
|
|
let total: i64 = count_query
|
|
.select(count_star())
|
|
.first(conn.deref_mut())
|
|
.unwrap_or(0);
|
|
|
|
let results = query
|
|
.order(created_at.desc())
|
|
.limit(filter.limit)
|
|
.offset(filter.offset)
|
|
.load::<EntityFact>(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))?;
|
|
|
|
Ok((results, total))
|
|
})
|
|
.map_err(|_| DbError::new(DbErrorKind::QueryError))
|
|
}
|
|
|
|
fn update_fact(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
fact_id: i32,
|
|
patch: FactPatch,
|
|
) -> Result<Option<EntityFact>, DbError> {
|
|
trace_db_call(cx, "update", "update_fact", |_span| {
|
|
use schema::entity_facts::dsl::*;
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
|
|
if let Some(ref new_predicate) = patch.predicate {
|
|
diesel::update(entity_facts.filter(id.eq(fact_id)))
|
|
.set(predicate.eq(new_predicate))
|
|
.execute(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Update error: {}", e))?;
|
|
}
|
|
if let Some(ref new_value) = patch.object_value {
|
|
diesel::update(entity_facts.filter(id.eq(fact_id)))
|
|
.set(object_value.eq(new_value))
|
|
.execute(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Update error: {}", e))?;
|
|
}
|
|
if let Some(ref new_status) = patch.status {
|
|
diesel::update(entity_facts.filter(id.eq(fact_id)))
|
|
.set(status.eq(new_status))
|
|
.execute(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Update error: {}", e))?;
|
|
}
|
|
if let Some(new_confidence) = patch.confidence {
|
|
diesel::update(entity_facts.filter(id.eq(fact_id)))
|
|
.set(confidence.eq(new_confidence))
|
|
.execute(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Update error: {}", e))?;
|
|
}
|
|
|
|
entity_facts
|
|
.filter(id.eq(fact_id))
|
|
.first::<EntityFact>(conn.deref_mut())
|
|
.optional()
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
|
|
})
|
|
.map_err(|_| DbError::new(DbErrorKind::UpdateError))
|
|
}
|
|
|
|
fn update_facts_insight_id(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
photo_path: &str,
|
|
insight_id: i32,
|
|
) -> Result<(), DbError> {
|
|
trace_db_call(cx, "update", "update_facts_insight_id", |_span| {
|
|
use schema::entity_facts::dsl::*;
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
diesel::update(
|
|
entity_facts
|
|
.filter(source_photo.eq(photo_path))
|
|
.filter(source_insight_id.is_null()),
|
|
)
|
|
.set(source_insight_id.eq(insight_id))
|
|
.execute(conn.deref_mut())
|
|
.map(|_| ())
|
|
.map_err(|e| anyhow::anyhow!("Update error: {}", e))
|
|
})
|
|
.map_err(|_| DbError::new(DbErrorKind::UpdateError))
|
|
}
|
|
|
|
fn delete_fact(&mut self, cx: &opentelemetry::Context, fact_id: i32) -> Result<(), DbError> {
|
|
trace_db_call(cx, "delete", "delete_fact", |_span| {
|
|
use schema::entity_facts::dsl::*;
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
diesel::delete(entity_facts.filter(id.eq(fact_id)))
|
|
.execute(conn.deref_mut())
|
|
.map(|_| ())
|
|
.map_err(|e| anyhow::anyhow!("Delete error: {}", e))
|
|
})
|
|
.map_err(|_| DbError::new(DbErrorKind::QueryError))
|
|
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// Photo link operations
|
|
// -----------------------------------------------------------------------
|
|
|
|
fn upsert_photo_link(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
link: InsertEntityPhotoLink,
|
|
) -> Result<(), DbError> {
|
|
trace_db_call(cx, "insert", "upsert_photo_link", |_span| {
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
// INSERT OR IGNORE respects the UNIQUE(entity_id, library_id, rel_path, role) constraint
|
|
diesel::sql_query(
|
|
"INSERT OR IGNORE INTO entity_photo_links (entity_id, library_id, rel_path, role) VALUES (?, ?, ?, ?)"
|
|
)
|
|
.bind::<diesel::sql_types::Integer, _>(link.entity_id)
|
|
.bind::<diesel::sql_types::Integer, _>(link.library_id)
|
|
.bind::<diesel::sql_types::Text, _>(&link.file_path)
|
|
.bind::<diesel::sql_types::Text, _>(&link.role)
|
|
.execute(conn.deref_mut())
|
|
.map(|_| ())
|
|
.map_err(|e| anyhow::anyhow!("Insert error: {}", e))
|
|
})
|
|
.map_err(|_| DbError::new(DbErrorKind::InsertError))
|
|
}
|
|
|
|
fn delete_photo_links_for_file(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
file_path_val: &str,
|
|
) -> Result<(), DbError> {
|
|
trace_db_call(cx, "delete", "delete_photo_links_for_file", |_span| {
|
|
use schema::entity_photo_links::dsl::*;
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
diesel::delete(entity_photo_links.filter(rel_path.eq(file_path_val)))
|
|
.execute(conn.deref_mut())
|
|
.map(|_| ())
|
|
.map_err(|e| anyhow::anyhow!("Delete error: {}", e))
|
|
})
|
|
.map_err(|_| DbError::new(DbErrorKind::QueryError))
|
|
}
|
|
|
|
fn get_links_for_photo(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
file_path_val: &str,
|
|
) -> Result<Vec<EntityPhotoLink>, DbError> {
|
|
trace_db_call(cx, "query", "get_links_for_photo", |_span| {
|
|
use schema::entity_photo_links::dsl::*;
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
entity_photo_links
|
|
.filter(rel_path.eq(file_path_val))
|
|
.load::<EntityPhotoLink>(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
|
|
})
|
|
.map_err(|_| DbError::new(DbErrorKind::QueryError))
|
|
}
|
|
|
|
fn get_links_for_entity(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
entity_id_val: i32,
|
|
) -> Result<Vec<EntityPhotoLink>, DbError> {
|
|
trace_db_call(cx, "query", "get_links_for_entity", |_span| {
|
|
use schema::entity_photo_links::dsl::*;
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
entity_photo_links
|
|
.filter(entity_id.eq(entity_id_val))
|
|
.load::<EntityPhotoLink>(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
|
|
})
|
|
.map_err(|_| DbError::new(DbErrorKind::QueryError))
|
|
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// Audit
|
|
// -----------------------------------------------------------------------
|
|
|
|
fn get_recent_activity(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
since: i64,
|
|
limit: i64,
|
|
persona: &PersonaFilter,
|
|
) -> Result<RecentActivity, DbError> {
|
|
trace_db_call(cx, "query", "get_recent_activity", |_span| {
|
|
use schema::entities::dsl as e;
|
|
use schema::entity_facts::dsl as ef;
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
|
|
// Entities are shared — recency is global.
|
|
let recent_entities = e::entities
|
|
.filter(e::created_at.gt(since))
|
|
.order(e::created_at.desc())
|
|
.limit(limit)
|
|
.load::<Entity>(conn.deref_mut())
|
|
.map_err(|err| anyhow::anyhow!("Query error: {}", err))?;
|
|
|
|
let mut facts_q = ef::entity_facts
|
|
.filter(ef::created_at.gt(since))
|
|
.filter(ef::user_id.eq(persona.user_id()))
|
|
.into_boxed();
|
|
if let PersonaFilter::Single { persona_id: pid, .. } = persona {
|
|
facts_q = facts_q.filter(ef::persona_id.eq(pid.clone()));
|
|
}
|
|
let recent_facts = facts_q
|
|
.order(ef::created_at.desc())
|
|
.limit(limit)
|
|
.load::<EntityFact>(conn.deref_mut())
|
|
.map_err(|err| anyhow::anyhow!("Query error: {}", err))?;
|
|
|
|
Ok(RecentActivity {
|
|
entities: recent_entities,
|
|
facts: recent_facts,
|
|
})
|
|
})
|
|
.map_err(|_| DbError::new(DbErrorKind::QueryError))
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
//! Persona scoping + composite-FK invariants for entity_facts.
|
|
//!
|
|
//! These tests pin three contracts that are silently regressable:
|
|
//!
|
|
//! 1. PersonaFilter::Single isolates per (user_id, persona_id). Two
|
|
//! users with the same 'default' persona must not see each
|
|
//! other's facts (multi-user leakage was a latent bug before
|
|
//! migration 2026-05-10 added user_id + composite FK).
|
|
//!
|
|
//! 2. PersonaFilter::All scopes to a single user but unions across
|
|
//! that user's personas. Hive-mind for human browsing of
|
|
//! /knowledge/*; never crosses users.
|
|
//!
|
|
//! 3. Deleting a persona CASCADEs to the user's facts under that
|
|
//! persona — and ONLY that user's, ONLY that persona's. Other
|
|
//! users sharing the persona_id name keep their facts.
|
|
//!
|
|
//! FKs aren't enabled by default on Diesel's SQLite connection;
|
|
//! `connection_with_fks_on()` flips the pragma so the cascade
|
|
//! actually fires in tests (mirroring runtime in production).
|
|
|
|
use super::*;
|
|
use crate::database::models::{InsertEntity, InsertEntityFact, InsertPersona};
|
|
use crate::database::test::in_memory_db_connection;
|
|
use diesel::connection::SimpleConnection;
|
|
|
|
fn connection_with_fks_on() -> Arc<Mutex<SqliteConnection>> {
|
|
let mut conn = in_memory_db_connection();
|
|
conn.batch_execute("PRAGMA foreign_keys = ON;")
|
|
.expect("enable foreign_keys pragma");
|
|
Arc::new(Mutex::new(conn))
|
|
}
|
|
|
|
fn create_user(conn: &Arc<Mutex<SqliteConnection>>, username: &str) -> i32 {
|
|
use crate::database::schema::users::dsl as u;
|
|
let mut c = conn.lock().unwrap();
|
|
diesel::insert_into(u::users)
|
|
.values((u::username.eq(username), u::password.eq("x")))
|
|
.execute(c.deref_mut())
|
|
.unwrap();
|
|
u::users
|
|
.filter(u::username.eq(username))
|
|
.select(u::id)
|
|
.first(c.deref_mut())
|
|
.unwrap()
|
|
}
|
|
|
|
fn create_persona_row(conn: &Arc<Mutex<SqliteConnection>>, uid: i32, pid: &str) {
|
|
use crate::database::schema::personas::dsl as p;
|
|
let mut c = conn.lock().unwrap();
|
|
diesel::insert_into(p::personas)
|
|
.values(InsertPersona {
|
|
user_id: uid,
|
|
persona_id: pid,
|
|
name: pid,
|
|
system_prompt: "test prompt",
|
|
is_built_in: false,
|
|
include_all_memories: false,
|
|
created_at: 0,
|
|
updated_at: 0,
|
|
})
|
|
.execute(c.deref_mut())
|
|
.unwrap();
|
|
}
|
|
|
|
fn make_entity(dao: &mut SqliteKnowledgeDao, name: &str) -> Entity {
|
|
let cx = opentelemetry::Context::new();
|
|
dao.upsert_entity(
|
|
&cx,
|
|
InsertEntity {
|
|
name: name.to_string(),
|
|
entity_type: "person".to_string(),
|
|
description: String::new(),
|
|
embedding: None,
|
|
confidence: 0.6,
|
|
status: "active".to_string(),
|
|
created_at: 0,
|
|
updated_at: 0,
|
|
},
|
|
)
|
|
.unwrap()
|
|
}
|
|
|
|
fn add_fact(
|
|
dao: &mut SqliteKnowledgeDao,
|
|
subject: i32,
|
|
predicate: &str,
|
|
value: &str,
|
|
user_id: i32,
|
|
persona_id: &str,
|
|
) -> EntityFact {
|
|
let cx = opentelemetry::Context::new();
|
|
let (fact, _) = dao
|
|
.upsert_fact(
|
|
&cx,
|
|
InsertEntityFact {
|
|
subject_entity_id: subject,
|
|
predicate: predicate.to_string(),
|
|
object_entity_id: None,
|
|
object_value: Some(value.to_string()),
|
|
source_photo: None,
|
|
source_insight_id: None,
|
|
confidence: 0.6,
|
|
status: "active".to_string(),
|
|
created_at: 0,
|
|
persona_id: persona_id.to_string(),
|
|
user_id,
|
|
},
|
|
)
|
|
.unwrap();
|
|
fact
|
|
}
|
|
|
|
#[test]
|
|
fn persona_filter_single_isolates_per_user() {
|
|
// Two users, same persona name. Each user's facts under that
|
|
// persona must NOT surface to the other user's reads — this is
|
|
// the multi-user leakage that motivated adding user_id.
|
|
let cx = opentelemetry::Context::new();
|
|
let conn = connection_with_fks_on();
|
|
let alice = create_user(&conn, "alice");
|
|
let bob = create_user(&conn, "bob");
|
|
create_persona_row(&conn, alice, "default");
|
|
create_persona_row(&conn, bob, "default");
|
|
|
|
let mut dao = SqliteKnowledgeDao::from_connection(conn.clone());
|
|
let entity = make_entity(&mut dao, "Cabin");
|
|
|
|
add_fact(&mut dao, entity.id, "located_in", "Vermont", alice, "default");
|
|
add_fact(&mut dao, entity.id, "color", "red", bob, "default");
|
|
|
|
let alice_view = dao
|
|
.get_facts_for_entity(
|
|
&cx,
|
|
entity.id,
|
|
&PersonaFilter::Single {
|
|
user_id: alice,
|
|
persona_id: "default".to_string(),
|
|
},
|
|
)
|
|
.unwrap();
|
|
assert_eq!(alice_view.len(), 1);
|
|
assert_eq!(alice_view[0].predicate, "located_in");
|
|
|
|
let bob_view = dao
|
|
.get_facts_for_entity(
|
|
&cx,
|
|
entity.id,
|
|
&PersonaFilter::Single {
|
|
user_id: bob,
|
|
persona_id: "default".to_string(),
|
|
},
|
|
)
|
|
.unwrap();
|
|
assert_eq!(bob_view.len(), 1);
|
|
assert_eq!(bob_view[0].predicate, "color");
|
|
}
|
|
|
|
#[test]
|
|
fn persona_filter_all_unions_across_personas_one_user() {
|
|
// include_all_memories=true → All variant: see this user's
|
|
// facts across all their personas. Must NOT include other
|
|
// users' facts even when they share a persona name.
|
|
let cx = opentelemetry::Context::new();
|
|
let conn = connection_with_fks_on();
|
|
let alice = create_user(&conn, "alice");
|
|
let bob = create_user(&conn, "bob");
|
|
create_persona_row(&conn, alice, "default");
|
|
create_persona_row(&conn, alice, "journal");
|
|
create_persona_row(&conn, bob, "default");
|
|
|
|
let mut dao = SqliteKnowledgeDao::from_connection(conn.clone());
|
|
let entity = make_entity(&mut dao, "Cabin");
|
|
|
|
add_fact(&mut dao, entity.id, "p1", "v1", alice, "default");
|
|
add_fact(&mut dao, entity.id, "p2", "v2", alice, "journal");
|
|
add_fact(&mut dao, entity.id, "p3", "v3", bob, "default");
|
|
|
|
let alice_all = dao
|
|
.get_facts_for_entity(&cx, entity.id, &PersonaFilter::All { user_id: alice })
|
|
.unwrap();
|
|
let predicates: Vec<&str> = alice_all.iter().map(|f| f.predicate.as_str()).collect();
|
|
assert_eq!(predicates.len(), 2);
|
|
assert!(predicates.contains(&"p1"));
|
|
assert!(predicates.contains(&"p2"));
|
|
assert!(
|
|
!predicates.contains(&"p3"),
|
|
"All variant must not leak across users"
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn upsert_fact_dedup_does_not_cross_users() {
|
|
// Two users insert the SAME claim (same subject + predicate +
|
|
// object_value) under the same persona name. Pre-fix, the
|
|
// dedup key was (subject, predicate, persona_id) and bob's
|
|
// insert would corroborate alice's row instead of creating a
|
|
// new one. Post-fix the key includes user_id, so each user
|
|
// gets their own row at confidence=0.6.
|
|
let conn = connection_with_fks_on();
|
|
let alice = create_user(&conn, "alice");
|
|
let bob = create_user(&conn, "bob");
|
|
create_persona_row(&conn, alice, "default");
|
|
create_persona_row(&conn, bob, "default");
|
|
|
|
let mut dao = SqliteKnowledgeDao::from_connection(conn.clone());
|
|
let entity = make_entity(&mut dao, "Cabin");
|
|
|
|
let alice_fact = add_fact(&mut dao, entity.id, "color", "red", alice, "default");
|
|
let bob_fact = add_fact(&mut dao, entity.id, "color", "red", bob, "default");
|
|
|
|
assert_ne!(alice_fact.id, bob_fact.id, "must be separate rows");
|
|
assert_eq!(alice_fact.confidence, 0.6);
|
|
assert_eq!(
|
|
bob_fact.confidence, 0.6,
|
|
"bob's row should not have been corroboration-bumped against alice's"
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn deleting_persona_cascades_only_that_users_facts() {
|
|
// Composite FK + CASCADE: deleting alice's 'journal' persona
|
|
// wipes alice's journal facts but leaves alice's default
|
|
// facts AND bob's journal-named facts untouched.
|
|
let cx = opentelemetry::Context::new();
|
|
let conn = connection_with_fks_on();
|
|
let alice = create_user(&conn, "alice");
|
|
let bob = create_user(&conn, "bob");
|
|
create_persona_row(&conn, alice, "default");
|
|
create_persona_row(&conn, alice, "journal");
|
|
create_persona_row(&conn, bob, "journal");
|
|
|
|
let mut dao = SqliteKnowledgeDao::from_connection(conn.clone());
|
|
let entity = make_entity(&mut dao, "Cabin");
|
|
|
|
add_fact(&mut dao, entity.id, "p_alice_default", "x", alice, "default");
|
|
add_fact(&mut dao, entity.id, "p_alice_journal", "y", alice, "journal");
|
|
add_fact(&mut dao, entity.id, "p_bob_journal", "z", bob, "journal");
|
|
|
|
// Delete alice's journal persona — CASCADE should remove only
|
|
// alice's journal facts.
|
|
{
|
|
use crate::database::schema::personas::dsl as p;
|
|
let mut c = conn.lock().unwrap();
|
|
diesel::delete(
|
|
p::personas
|
|
.filter(p::user_id.eq(alice))
|
|
.filter(p::persona_id.eq("journal")),
|
|
)
|
|
.execute(c.deref_mut())
|
|
.unwrap();
|
|
}
|
|
|
|
// alice/default survives.
|
|
let alice_default = dao
|
|
.get_facts_for_entity(
|
|
&cx,
|
|
entity.id,
|
|
&PersonaFilter::Single {
|
|
user_id: alice,
|
|
persona_id: "default".to_string(),
|
|
},
|
|
)
|
|
.unwrap();
|
|
assert_eq!(alice_default.len(), 1);
|
|
assert_eq!(alice_default[0].predicate, "p_alice_default");
|
|
|
|
// alice/journal is gone.
|
|
let alice_journal = dao
|
|
.get_facts_for_entity(
|
|
&cx,
|
|
entity.id,
|
|
&PersonaFilter::Single {
|
|
user_id: alice,
|
|
persona_id: "journal".to_string(),
|
|
},
|
|
)
|
|
.unwrap();
|
|
assert!(
|
|
alice_journal.is_empty(),
|
|
"CASCADE should have removed alice's journal facts"
|
|
);
|
|
|
|
// bob/journal — same persona name, different user — untouched.
|
|
let bob_journal = dao
|
|
.get_facts_for_entity(
|
|
&cx,
|
|
entity.id,
|
|
&PersonaFilter::Single {
|
|
user_id: bob,
|
|
persona_id: "journal".to_string(),
|
|
},
|
|
)
|
|
.unwrap();
|
|
assert_eq!(bob_journal.len(), 1);
|
|
assert_eq!(bob_journal[0].predicate, "p_bob_journal");
|
|
}
|
|
|
|
#[test]
|
|
fn fact_insert_with_unknown_persona_is_rejected() {
|
|
// FK enforcement: inserting a fact whose (user_id, persona_id)
|
|
// pair has no matching personas row should fail. Protects
|
|
// against typo'd persona ids silently leaking into the table.
|
|
let cx = opentelemetry::Context::new();
|
|
let conn = connection_with_fks_on();
|
|
let alice = create_user(&conn, "alice");
|
|
// Note: NO persona row inserted for alice + 'ghost'.
|
|
|
|
let mut dao = SqliteKnowledgeDao::from_connection(conn.clone());
|
|
let entity = make_entity(&mut dao, "Cabin");
|
|
|
|
let result = dao.upsert_fact(
|
|
&cx,
|
|
InsertEntityFact {
|
|
subject_entity_id: entity.id,
|
|
predicate: "color".to_string(),
|
|
object_entity_id: None,
|
|
object_value: Some("red".to_string()),
|
|
source_photo: None,
|
|
source_insight_id: None,
|
|
confidence: 0.6,
|
|
status: "active".to_string(),
|
|
created_at: 0,
|
|
persona_id: "ghost".to_string(),
|
|
user_id: alice,
|
|
},
|
|
);
|
|
assert!(
|
|
result.is_err(),
|
|
"FK should reject fact whose persona doesn't exist"
|
|
);
|
|
}
|
|
}
|