dad0220587
Every `map_err(|_| DbError::new(...))` and `map_err(|_| anyhow!("..."))`
in the database layer discarded the underlying Diesel/SQLite error,
making failures impossible to diagnose from the logs.
- Add `DbError::log()`, which logs the source error before converting it
- Replace all ~130 error-swallowing outer `map_err` closures with `DbError::log`
- Rewrite all ~47 error-swallowing inner `anyhow!` closures so that the
  message includes the source error
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2847 lines
109 KiB
Rust
2847 lines
109 KiB
Rust
#![allow(dead_code)]
|
|
|
|
use diesel::prelude::*;
|
|
use diesel::sqlite::SqliteConnection;
|
|
use std::ops::DerefMut;
|
|
use std::sync::{Arc, Mutex};
|
|
|
|
use crate::database::models::{
|
|
Entity, EntityFact, EntityPhotoLink, InsertEntity, InsertEntityFact, InsertEntityPhotoLink,
|
|
};
|
|
use crate::database::schema;
|
|
use crate::database::{DbError, DbErrorKind, connect};
|
|
use crate::otel::trace_db_call;
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Entity type normalisation
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/// Canonicalise a model-supplied `entity_type` to a consistent lowercase form.
///
/// Weak models frequently vary capitalisation ("Person" vs "person"), pad the
/// value with whitespace, or use synonym types ("location" vs "place").
/// Normalising here prevents duplicate entities that differ only by type
/// spelling.
///
/// Known synonyms map to one of `person`, `place`, `event`, `thing`; any other
/// value is returned trimmed and lowercased.
pub(crate) fn normalize_entity_type(raw: &str) -> String {
    // Trim first: " Person" / "place\n" must collapse the same way as the
    // bare spelling, otherwise they would create a distinct type.
    let lowered = raw.trim().to_lowercase();
    let canonical = match lowered.as_str() {
        "person" | "people" | "human" | "individual" | "contact" => "person",
        "place" | "location" | "venue" | "site" | "area" | "landmark" => "place",
        "event" | "occasion" | "activity" | "celebration" => "event",
        "thing" | "object" | "item" | "product" => "thing",
        // Unknown types pass through (already trimmed + lowercased).
        _ => return lowered,
    };
    canonical.to_string()
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Filter / patch types
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/// Filter and pagination options for listing entities.
#[derive(Debug, Clone)]
pub struct EntityFilter {
    /// Exact `entity_type` to match; `None` = any type.
    pub entity_type: Option<String>,
    /// "active" | "reviewed" | "rejected" | "all". The SQLite implementation
    /// treats `None` as "active".
    pub status: Option<String>,
    /// LIKE match on name and description
    pub search: Option<String>,
    /// Maximum number of rows in the returned page.
    pub limit: i64,
    /// Number of matching rows to skip before the page starts.
    pub offset: i64,
}
|
|
|
|
/// Ordering applied to the entity curation list.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EntitySort {
    /// Most recently updated first (the legacy "newest activity" ordering).
    UpdatedDesc,
    /// Alphabetical by name. Puts near-duplicates such as
    /// "Sara" / "Sarah" / "Sarah J." next to each other.
    NameAsc,
    /// Heaviest-used entities first; entities with no facts sink to the end.
    FactCountDesc,
}
|
|
|
|
pub struct FactFilter {
|
|
pub entity_id: Option<i32>,
|
|
/// "active" | "reviewed" | "rejected" | "all"
|
|
pub status: Option<String>,
|
|
pub predicate: Option<String>,
|
|
pub persona: PersonaFilter,
|
|
pub limit: i64,
|
|
pub offset: i64,
|
|
}
|
|
|
|
/// Which persona's facts a read may see.
///
/// Facts are isolated per user: two users who both have a 'default' persona
/// must never see each other's facts (the schema enforces this through the
/// composite FK added in migration 2026-05-10), so every variant carries a
/// `user_id`. Entities and photo-links are shared and take no persona filter.
#[derive(Debug, Clone)]
pub enum PersonaFilter {
    /// One persona's view of one user's facts.
    Single { user_id: i32, persona_id: String },
    /// Every persona of one user — the "hive-mind" read used when the persona
    /// row has `include_all_memories=true`.
    All { user_id: i32 },
}

impl PersonaFilter {
    /// The user whose facts are in scope (present in every variant).
    pub fn user_id(&self) -> i32 {
        match self {
            Self::Single { user_id, .. } | Self::All { user_id } => *user_id,
        }
    }
}
|
|
|
|
/// Partial update for an entity. `Default` is the empty patch (every field
/// `None`).
#[derive(Debug, Clone, Default)]
pub struct EntityPatch {
    pub name: Option<String>,
    pub description: Option<String>,
    pub status: Option<String>,
    pub confidence: Option<f32>,
}

/// Partial update for a fact. `Default` is the empty patch (every field
/// `None`).
#[derive(Debug, Clone, Default)]
pub struct FactPatch {
    pub predicate: Option<String>,
    pub object_value: Option<String>,
    pub status: Option<String>,
    pub confidence: Option<f32>,
    /// Real-world valid-time bounds. Outer Some = "patch this column";
    /// inner Some(val) = set to that unix-seconds value; inner None =
    /// clear back to NULL ("unbounded"). The double-Option lets the
    /// HTTP layer distinguish "field omitted" (leave alone) from
    /// "field sent as null" (clear) — needed for these specifically
    /// because there's no sentinel string-empty equivalent like the
    /// other fields have.
    pub valid_from: Option<Option<i64>>,
    pub valid_until: Option<Option<i64>>,
}
|
|
|
|
pub struct RecentActivity {
|
|
pub entities: Vec<Entity>,
|
|
pub facts: Vec<EntityFact>,
|
|
}
|
|
|
|
/// A near-duplicate cluster found by `find_consolidation_proposals`.
|
|
/// `min_cosine` / `max_cosine` are summary stats over the pairwise
|
|
/// edges inside the group — gives the curator a sense of "how tight"
|
|
/// the cluster is before clicking in.
|
|
#[derive(Debug, Clone)]
|
|
pub struct ConsolidationGroup {
|
|
pub entities: Vec<Entity>,
|
|
pub min_cosine: f32,
|
|
pub max_cosine: f32,
|
|
}
|
|
|
|
/// A node of the graph view: one entity that has at least one fact.
#[derive(Debug, Clone)]
pub struct GraphNode {
    /// Entity id.
    pub id: i32,
    /// Entity display name.
    pub name: String,
    /// Entity type (e.g. "person", "place").
    pub entity_type: String,
    /// Facts counted for this entity under the active persona scope.
    pub fact_count: i64,
}

/// An edge of the graph view. Each relational fact (one with
/// `object_entity_id` set) links its subject to its object. Facts that share
/// the same (subject, object, predicate) are merged into one edge, and
/// `count` records how many were merged, so the UI can draw a single
/// weighted line.
#[derive(Debug, Clone)]
pub struct GraphEdge {
    /// Subject entity id.
    pub source: i32,
    /// Object entity id.
    pub target: i32,
    /// Predicate shared by the merged facts.
    pub predicate: String,
    /// Number of facts merged into this edge.
    pub count: i64,
}

/// Complete graph-view snapshot: the nodes plus the edges between them.
#[derive(Debug, Clone)]
pub struct EntityGraph {
    pub nodes: Vec<GraphNode>,
    pub edges: Vec<GraphEdge>,
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Trait
|
|
// ---------------------------------------------------------------------------
|
|
|
|
pub trait KnowledgeDao: Sync + Send {
|
|
// --- Entity ---
|
|
fn upsert_entity(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
entity: InsertEntity,
|
|
) -> Result<Entity, DbError>;
|
|
|
|
fn get_entity_by_id(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
id: i32,
|
|
) -> Result<Option<Entity>, DbError>;
|
|
|
|
fn get_entity_by_name(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
name: &str,
|
|
entity_type: Option<&str>,
|
|
) -> Result<Vec<Entity>, DbError>;
|
|
|
|
fn get_entities_with_embeddings(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
entity_type: Option<&str>,
|
|
) -> Result<Vec<Entity>, DbError>;
|
|
|
|
fn list_entities(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
filter: EntityFilter,
|
|
) -> Result<(Vec<Entity>, i64), DbError>;
|
|
|
|
/// List entities alongside a persona-scoped fact count for each.
|
|
/// Powers the curation surface — sorting by fact count surfaces
|
|
/// the heavily-used entities and demotes 0-fact noise. Counting
|
|
/// is restricted to non-rejected facts under the active persona
|
|
/// scope so a switch in the persona picker re-orders the list.
|
|
fn list_entities_with_fact_counts(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
filter: EntityFilter,
|
|
sort: EntitySort,
|
|
persona: &PersonaFilter,
|
|
) -> Result<(Vec<(Entity, i64)>, i64), DbError>;
|
|
|
|
/// Aggregate the user's active+reviewed facts by predicate so
|
|
/// the curation UI can flag noisy verbs ("expressed", "said") and
|
|
/// bulk-reject. Persona-scoped via the existing PersonaFilter
|
|
/// pattern. Sorted by count desc.
|
|
fn get_predicate_stats(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
persona: &PersonaFilter,
|
|
limit: usize,
|
|
) -> Result<Vec<(String, i64)>, DbError>;
|
|
|
|
/// Bulk reject every active fact under a given predicate
|
|
/// (persona-scoped). Returns the number of rows touched. Used by
|
|
/// the predicate-cleanup UI to nuke noise verbs in one click.
|
|
/// Stamps last_modified_* with the caller-supplied audit so the
|
|
/// action shows up in the recent-edits feed.
|
|
fn bulk_reject_facts_by_predicate(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
persona: &PersonaFilter,
|
|
predicate: &str,
|
|
audit: Option<(&str, &str)>,
|
|
) -> Result<usize, DbError>;
|
|
|
|
/// Build a graph snapshot — entities as nodes (fact count from
|
|
/// the active persona scope), relational facts as edges. Used
|
|
/// by the curation UI's graph view. Filters:
|
|
/// - entity_type: optional, restricts nodes to one type
|
|
/// - node_limit: caps the number of nodes; lower-fact-count
|
|
/// entities drop first
|
|
///
|
|
/// Edges between dropped entities are pruned. Persona scoping
|
|
/// affects fact_count + edge inclusion (rejected / superseded
|
|
/// excluded; All vs Single mirrors the existing pattern).
|
|
fn build_entity_graph(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
entity_type: Option<&str>,
|
|
node_limit: usize,
|
|
persona: &PersonaFilter,
|
|
) -> Result<EntityGraph, DbError>;
|
|
|
|
/// Find groups of near-duplicate entities that the upsert-time
|
|
/// cosine guard didn't catch (it runs at ~0.92; this scan runs
|
|
/// at a lower threshold to surface the "probably same" tier that
|
|
/// needs human review). Groups are formed via union-find over
|
|
/// the cosine-adjacency graph, partitioned by entity_type so a
|
|
/// person can't cluster with a place. Returns groups of >= 2
|
|
/// entities, sorted by size desc then by max pairwise cosine.
|
|
/// Trimmed to `max_groups`.
|
|
fn find_consolidation_proposals(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
threshold: f32,
|
|
max_groups: usize,
|
|
) -> Result<Vec<ConsolidationGroup>, DbError>;
|
|
|
|
/// Batch fetch per-persona fact counts for a set of entities,
|
|
/// scoped to one user. Returns map of entity_id → list of
|
|
/// (persona_id, count). Used by the curation UI to show "this
|
|
/// entity has 0 facts in your active persona but 12 in journal"
|
|
/// so the curator knows where to find the existing knowledge.
|
|
/// Rejected facts excluded; superseded included (they're history,
|
|
/// not noise).
|
|
fn get_persona_breakdowns_for_entities(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
entity_ids: &[i32],
|
|
user_id: i32,
|
|
) -> Result<std::collections::HashMap<i32, Vec<(String, i64)>>, DbError>;
|
|
|
|
fn update_entity_status(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
id: i32,
|
|
status: &str,
|
|
) -> Result<(), DbError>;
|
|
|
|
fn update_entity(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
id: i32,
|
|
patch: EntityPatch,
|
|
) -> Result<Option<Entity>, DbError>;
|
|
|
|
fn delete_entity(&mut self, cx: &opentelemetry::Context, id: i32) -> Result<(), DbError>;
|
|
|
|
fn merge_entities(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
source_id: i32,
|
|
target_id: i32,
|
|
) -> Result<(i64, i64), DbError>;
|
|
|
|
// --- Facts ---
|
|
fn upsert_fact(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
fact: InsertEntityFact,
|
|
) -> Result<(EntityFact, bool), DbError>;
|
|
|
|
fn get_facts_for_entity(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
entity_id: i32,
|
|
persona: &PersonaFilter,
|
|
) -> Result<Vec<EntityFact>, DbError>;
|
|
|
|
fn list_facts(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
filter: FactFilter,
|
|
) -> Result<(Vec<EntityFact>, i64), DbError>;
|
|
|
|
/// Update a fact. `audit` stamps the row's `last_modified_*`
|
|
/// columns — None = legacy internal callers without provenance
|
|
/// context; HTTP passes `Some(("manual", "manual"))`; the agent
|
|
/// passes its loop-time model + backend so the audit trail can
|
|
/// distinguish human edits from agent corrections.
|
|
fn update_fact(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
id: i32,
|
|
patch: FactPatch,
|
|
audit: Option<(&str, &str)>,
|
|
) -> Result<Option<EntityFact>, DbError>;
|
|
|
|
fn update_facts_insight_id(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
source_photo: &str,
|
|
insight_id: i32,
|
|
) -> Result<(), DbError>;
|
|
|
|
fn delete_fact(&mut self, cx: &opentelemetry::Context, id: i32) -> Result<(), DbError>;
|
|
|
|
/// Mark an old fact as superseded by a new one. Atomically:
|
|
/// - reads the new fact's valid_from
|
|
/// - sets old.superseded_by = new_id
|
|
/// - sets old.status = 'superseded'
|
|
/// - stamps old.valid_until = new.valid_from (if not already
|
|
/// set; otherwise leaves it)
|
|
/// - stamps old.last_modified_* from `audit`
|
|
///
|
|
/// Returns the updated old fact. Errors if either id is missing.
|
|
fn supersede_fact(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
old_id: i32,
|
|
new_id: i32,
|
|
audit: Option<(&str, &str)>,
|
|
) -> Result<Option<EntityFact>, DbError>;
|
|
|
|
/// Undo a supersession: clear `superseded_by`, flip status back to
|
|
/// 'active', clear `valid_until` (we don't know if it was auto-
|
|
/// stamped by the supersede or hand-set, so the conservative reset
|
|
/// is to clear it — user can re-bound after). Stamps the audit
|
|
/// columns so the revert is itself attributable.
|
|
///
|
|
/// Returns the restored fact. Errors if the fact doesn't exist or
|
|
/// wasn't superseded in the first place (no-op semantics).
|
|
fn revert_supersession(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
fact_id: i32,
|
|
audit: Option<(&str, &str)>,
|
|
) -> Result<Option<EntityFact>, DbError>;
|
|
|
|
// --- Photo links ---
|
|
fn upsert_photo_link(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
link: InsertEntityPhotoLink,
|
|
) -> Result<(), DbError>;
|
|
|
|
fn delete_photo_links_for_file(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
file_path: &str,
|
|
) -> Result<(), DbError>;
|
|
|
|
fn get_links_for_photo(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
file_path: &str,
|
|
) -> Result<Vec<EntityPhotoLink>, DbError>;
|
|
|
|
fn get_links_for_entity(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
entity_id: i32,
|
|
) -> Result<Vec<EntityPhotoLink>, DbError>;
|
|
|
|
// --- Audit ---
|
|
fn get_recent_activity(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
since: i64,
|
|
limit: i64,
|
|
persona: &PersonaFilter,
|
|
) -> Result<RecentActivity, DbError>;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// SQLite implementation
|
|
// ---------------------------------------------------------------------------
|
|
|
|
pub struct SqliteKnowledgeDao {
|
|
connection: Arc<Mutex<SqliteConnection>>,
|
|
}
|
|
|
|
impl Default for SqliteKnowledgeDao {
|
|
fn default() -> Self {
|
|
Self::new()
|
|
}
|
|
}
|
|
|
|
impl SqliteKnowledgeDao {
|
|
pub fn new() -> Self {
|
|
SqliteKnowledgeDao {
|
|
connection: Arc::new(Mutex::new(connect())),
|
|
}
|
|
}
|
|
|
|
pub fn from_connection(conn: Arc<Mutex<SqliteConnection>>) -> Self {
|
|
SqliteKnowledgeDao { connection: conn }
|
|
}
|
|
|
|
fn serialize_embedding(vec: &[f32]) -> Vec<u8> {
|
|
vec.iter().flat_map(|f| f.to_le_bytes()).collect()
|
|
}
|
|
|
|
fn deserialize_embedding(bytes: &[u8]) -> Result<Vec<f32>, DbError> {
|
|
if !bytes.len().is_multiple_of(4) {
|
|
return Err(DbError::new(DbErrorKind::QueryError));
|
|
}
|
|
Ok(bytes
|
|
.chunks_exact(4)
|
|
.map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]]))
|
|
.collect())
|
|
}
|
|
|
|
pub fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
|
|
if a.len() != b.len() || a.is_empty() {
|
|
return 0.0;
|
|
}
|
|
let dot: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum();
|
|
let mag_a: f32 = a.iter().map(|x| x * x).sum::<f32>().sqrt();
|
|
let mag_b: f32 = b.iter().map(|x| x * x).sum::<f32>().sqrt();
|
|
if mag_a == 0.0 || mag_b == 0.0 {
|
|
0.0
|
|
} else {
|
|
dot / (mag_a * mag_b)
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Cosine-similarity threshold above which a new entity collapses into an
/// existing same-type entity at upsert time. The agent's pre-flight name
/// search uses FTS5 prefix tokens, which misses near-dupes like
/// "Sarah" / "Sara" / "Sarah J." that share a description-rich embedding.
/// Override via `ENTITY_DEDUP_COSINE_THRESHOLD` env var when tuning.
const ENTITY_DEDUP_COSINE_THRESHOLD_DEFAULT: f32 = 0.92;

/// Effective dedup threshold.
///
/// Reads `ENTITY_DEDUP_COSINE_THRESHOLD` (surrounding whitespace ignored).
/// Falls back to [`ENTITY_DEDUP_COSINE_THRESHOLD_DEFAULT`] when the variable
/// is unset, unparsable, or not a finite number.
fn entity_dedup_cosine_threshold() -> f32 {
    std::env::var("ENTITY_DEDUP_COSINE_THRESHOLD")
        .ok()
        .and_then(|v| v.trim().parse::<f32>().ok())
        // "NaN" / "inf" / "-inf" parse successfully. NaN and +inf would
        // silently disable dedup, and -inf would make *every* comparison pass,
        // collapsing unrelated entities. Treat them as misconfiguration.
        .filter(|t| t.is_finite())
        .unwrap_or(ENTITY_DEDUP_COSINE_THRESHOLD_DEFAULT)
}
|
|
|
|
impl KnowledgeDao for SqliteKnowledgeDao {
|
|
// -----------------------------------------------------------------------
|
|
// Entity operations
|
|
// -----------------------------------------------------------------------
|
|
|
|
fn upsert_entity(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
entity: InsertEntity,
|
|
) -> Result<Entity, DbError> {
|
|
trace_db_call(cx, "insert", "upsert_entity", |_span| {
|
|
use schema::entities::dsl::*;
|
|
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
|
|
// Normalise type before lookup and insert so that model variations
|
|
// ("Person" / "person", "location" / "place") collapse to one row.
|
|
let entity = InsertEntity {
|
|
entity_type: normalize_entity_type(&entity.entity_type),
|
|
..entity
|
|
};
|
|
|
|
// Case-insensitive lookup by name + entity_type.
|
|
// Use lower() on both sides so existing dirty rows ("Person") still match.
|
|
let name_lower = entity.name.to_lowercase();
|
|
let type_lower = entity.entity_type.to_lowercase();
|
|
let mut existing: Option<Entity> = entities
|
|
.filter(diesel::dsl::sql::<diesel::sql_types::Bool>(&format!(
|
|
"lower(name) = '{}' AND lower(entity_type) = '{}'",
|
|
name_lower.replace('\'', "''"),
|
|
type_lower.replace('\'', "''")
|
|
)))
|
|
.first::<Entity>(conn.deref_mut())
|
|
.optional()
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))?;
|
|
|
|
// Fuzzy-match fallback: if no exact name match and the incoming
|
|
// entity carries an embedding, compare against same-type entities'
|
|
// embeddings and collapse if any are above the cosine threshold.
|
|
if existing.is_none()
|
|
&& let Some(new_emb_bytes) = entity.embedding.as_ref()
|
|
&& let Ok(new_vec) = Self::deserialize_embedding(new_emb_bytes)
|
|
&& !new_vec.is_empty()
|
|
{
|
|
let threshold = entity_dedup_cosine_threshold();
|
|
let candidates: Vec<Entity> = entities
|
|
.filter(embedding.is_not_null())
|
|
.filter(entity_type.eq(&entity.entity_type))
|
|
.filter(status.ne("rejected"))
|
|
.load::<Entity>(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))?;
|
|
|
|
let mut best: Option<(Entity, f32)> = None;
|
|
for cand in candidates {
|
|
let Some(cand_bytes) = cand.embedding.as_ref() else {
|
|
continue;
|
|
};
|
|
let Ok(cand_vec) = Self::deserialize_embedding(cand_bytes) else {
|
|
continue;
|
|
};
|
|
let sim = Self::cosine_similarity(&new_vec, &cand_vec);
|
|
if sim >= threshold && best.as_ref().is_none_or(|(_, s)| sim > *s) {
|
|
best = Some((cand, sim));
|
|
}
|
|
}
|
|
|
|
if let Some((cand, sim)) = best {
|
|
log::info!(
|
|
"entity dedup: collapsing new '{}' ({}) into existing '{}' (id={}, cos={:.3})",
|
|
entity.name,
|
|
entity.entity_type,
|
|
cand.name,
|
|
cand.id,
|
|
sim
|
|
);
|
|
existing = Some(cand);
|
|
}
|
|
}
|
|
|
|
if let Some(existing_entity) = existing {
|
|
// Update description, embedding, updated_at
|
|
diesel::update(entities.filter(id.eq(existing_entity.id)))
|
|
.set((
|
|
description.eq(&entity.description),
|
|
embedding.eq(&entity.embedding),
|
|
updated_at.eq(entity.updated_at),
|
|
))
|
|
.execute(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Update error: {}", e))?;
|
|
|
|
entities
|
|
.filter(id.eq(existing_entity.id))
|
|
.first::<Entity>(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
|
|
} else {
|
|
diesel::insert_into(entities)
|
|
.values(&entity)
|
|
.execute(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Insert error: {}", e))?;
|
|
|
|
entities
|
|
.order(id.desc())
|
|
.first::<Entity>(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
|
|
}
|
|
})
|
|
.map_err(|e| DbError::log(DbErrorKind::InsertError, e))
|
|
}
|
|
|
|
fn get_entity_by_id(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
entity_id: i32,
|
|
) -> Result<Option<Entity>, DbError> {
|
|
trace_db_call(cx, "query", "get_entity_by_id", |_span| {
|
|
use schema::entities::dsl::*;
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
entities
|
|
.filter(id.eq(entity_id))
|
|
.first::<Entity>(conn.deref_mut())
|
|
.optional()
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
|
|
})
|
|
.map_err(|e| DbError::log(DbErrorKind::QueryError, e))
|
|
}
|
|
|
|
fn get_entity_by_name(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
entity_name: &str,
|
|
entity_type_filter: Option<&str>,
|
|
) -> Result<Vec<Entity>, DbError> {
|
|
trace_db_call(cx, "query", "get_entity_by_name", |_span| {
|
|
use schema::entities::dsl::*;
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
|
|
let name_lower = entity_name.to_lowercase().replace('\'', "''");
|
|
let mut sql = format!("lower(name) = '{}'", name_lower);
|
|
if let Some(et) = entity_type_filter {
|
|
sql.push_str(&format!(" AND entity_type = '{}'", et.replace('\'', "''")));
|
|
}
|
|
sql.push_str(" AND status != 'rejected'");
|
|
|
|
entities
|
|
.filter(diesel::dsl::sql::<diesel::sql_types::Bool>(&sql))
|
|
.load::<Entity>(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
|
|
})
|
|
.map_err(|e| DbError::log(DbErrorKind::QueryError, e))
|
|
}
|
|
|
|
fn get_entities_with_embeddings(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
entity_type_filter: Option<&str>,
|
|
) -> Result<Vec<Entity>, DbError> {
|
|
trace_db_call(cx, "query", "get_entities_with_embeddings", |_span| {
|
|
use schema::entities::dsl::*;
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
|
|
let mut query = entities
|
|
.filter(embedding.is_not_null())
|
|
.filter(status.ne("rejected"))
|
|
.into_boxed();
|
|
|
|
if let Some(et) = entity_type_filter {
|
|
query = query.filter(entity_type.eq(et));
|
|
}
|
|
|
|
query
|
|
.load::<Entity>(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
|
|
})
|
|
.map_err(|e| DbError::log(DbErrorKind::QueryError, e))
|
|
}
|
|
|
|
fn list_entities(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
filter: EntityFilter,
|
|
) -> Result<(Vec<Entity>, i64), DbError> {
|
|
trace_db_call(cx, "query", "list_entities", |_span| {
|
|
use diesel::dsl::count_star;
|
|
use schema::entities::dsl::*;
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
|
|
let mut query = entities.into_boxed();
|
|
|
|
if let Some(ref et) = filter.entity_type {
|
|
query = query.filter(entity_type.eq(et));
|
|
}
|
|
|
|
let status_val = filter.status.as_deref().unwrap_or("active");
|
|
if status_val != "all" {
|
|
query = query.filter(status.eq(status_val));
|
|
}
|
|
|
|
if let Some(ref search_term) = filter.search {
|
|
let pattern = format!("%{}%", search_term);
|
|
query = query.filter(name.like(pattern.clone()).or(description.like(pattern)));
|
|
}
|
|
|
|
// Count with same filters applied (build separately since boxed query is consumed)
|
|
let mut count_query = entities.into_boxed();
|
|
if let Some(ref et) = filter.entity_type {
|
|
count_query = count_query.filter(entity_type.eq(et));
|
|
}
|
|
let status_val2 = filter.status.as_deref().unwrap_or("active");
|
|
if status_val2 != "all" {
|
|
count_query = count_query.filter(status.eq(status_val2));
|
|
}
|
|
if let Some(ref search_term) = filter.search {
|
|
let pattern = format!("%{}%", search_term);
|
|
count_query =
|
|
count_query.filter(name.like(pattern.clone()).or(description.like(pattern)));
|
|
}
|
|
let total: i64 = count_query
|
|
.select(count_star())
|
|
.first(conn.deref_mut())
|
|
.unwrap_or(0);
|
|
|
|
let results = query
|
|
.order(updated_at.desc())
|
|
.limit(filter.limit)
|
|
.offset(filter.offset)
|
|
.load::<Entity>(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))?;
|
|
|
|
Ok((results, total))
|
|
})
|
|
.map_err(|e| DbError::log(DbErrorKind::QueryError, e))
|
|
}
|
|
|
|
fn list_entities_with_fact_counts(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
filter: EntityFilter,
|
|
sort: EntitySort,
|
|
persona: &PersonaFilter,
|
|
) -> Result<(Vec<(Entity, i64)>, i64), DbError> {
|
|
trace_db_call(cx, "query", "list_entities_with_fact_counts", |_span| {
|
|
use diesel::sql_query;
|
|
use diesel::sql_types::{BigInt, Integer, Text};
|
|
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
|
|
// Build WHERE fragments. Inline-safe values are bound; status
|
|
// / sort keywords are validated against fixed sets.
|
|
let mut where_parts: Vec<String> = Vec::new();
|
|
let mut bind_types: Vec<&'static str> = Vec::new();
|
|
let mut bind_strs: Vec<String> = Vec::new();
|
|
|
|
if filter.entity_type.is_some() {
|
|
where_parts.push("e.entity_type = ?".to_string());
|
|
bind_types.push("text");
|
|
bind_strs.push(filter.entity_type.clone().unwrap());
|
|
}
|
|
|
|
let status_val = filter.status.as_deref().unwrap_or("active");
|
|
if status_val != "all" {
|
|
where_parts.push("e.status = ?".to_string());
|
|
bind_types.push("text");
|
|
bind_strs.push(status_val.to_string());
|
|
}
|
|
|
|
if let Some(ref s) = filter.search {
|
|
where_parts.push("(e.name LIKE ? OR e.description LIKE ?)".to_string());
|
|
bind_types.push("text");
|
|
bind_types.push("text");
|
|
let pat = format!("%{}%", s);
|
|
bind_strs.push(pat.clone());
|
|
bind_strs.push(pat);
|
|
}
|
|
|
|
let where_clause = if where_parts.is_empty() {
|
|
String::new()
|
|
} else {
|
|
format!("WHERE {}", where_parts.join(" AND "))
|
|
};
|
|
|
|
// Persona-scoped fact-count subquery. Single = filter on
|
|
// (user_id, persona_id); All = union across the user's
|
|
// personas (mirror PersonaFilter::All read semantics).
|
|
let fact_count_join = match persona {
|
|
PersonaFilter::Single {
|
|
user_id: _,
|
|
persona_id: _,
|
|
} => {
|
|
"LEFT JOIN (\
|
|
SELECT subject_entity_id, COUNT(*) AS fact_count \
|
|
FROM entity_facts \
|
|
WHERE user_id = ? AND persona_id = ? AND status != 'rejected' \
|
|
GROUP BY subject_entity_id\
|
|
) fc ON fc.subject_entity_id = e.id"
|
|
}
|
|
PersonaFilter::All { user_id: _ } => {
|
|
"LEFT JOIN (\
|
|
SELECT subject_entity_id, COUNT(*) AS fact_count \
|
|
FROM entity_facts \
|
|
WHERE user_id = ? AND status != 'rejected' \
|
|
GROUP BY subject_entity_id\
|
|
) fc ON fc.subject_entity_id = e.id"
|
|
}
|
|
};
|
|
|
|
let order_by = match sort {
|
|
EntitySort::UpdatedDesc => "e.updated_at DESC",
|
|
EntitySort::NameAsc => "lower(e.name) ASC",
|
|
EntitySort::FactCountDesc => "COALESCE(fc.fact_count, 0) DESC, lower(e.name) ASC",
|
|
};
|
|
|
|
let select_sql = format!(
|
|
"SELECT e.id, e.name, e.entity_type, e.description, e.embedding, \
|
|
e.confidence, e.status, e.created_at, e.updated_at, \
|
|
COALESCE(fc.fact_count, 0) AS fact_count \
|
|
FROM entities e \
|
|
{fact_count_join} \
|
|
{where_clause} \
|
|
ORDER BY {order_by} \
|
|
LIMIT ? OFFSET ?"
|
|
);
|
|
|
|
let count_sql = format!("SELECT COUNT(*) AS total FROM entities e {where_clause}");
|
|
|
|
// ── Total count ─────────────────────────────────────────
|
|
#[derive(diesel::QueryableByName)]
|
|
struct TotalRow {
|
|
#[diesel(sql_type = BigInt)]
|
|
total: i64,
|
|
}
|
|
let mut count_q = sql_query(count_sql).into_boxed();
|
|
for s in &bind_strs {
|
|
count_q = count_q.bind::<Text, _>(s.clone());
|
|
}
|
|
let total: i64 = count_q
|
|
.get_result::<TotalRow>(conn.deref_mut())
|
|
.map(|r| r.total)
|
|
.unwrap_or(0);
|
|
|
|
// ── Page query ──────────────────────────────────────────
|
|
#[derive(diesel::QueryableByName)]
|
|
struct EntityWithCountRow {
|
|
#[diesel(sql_type = Integer)]
|
|
id: i32,
|
|
#[diesel(sql_type = Text)]
|
|
name: String,
|
|
#[diesel(sql_type = Text)]
|
|
entity_type: String,
|
|
#[diesel(sql_type = Text)]
|
|
description: String,
|
|
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Binary>)]
|
|
embedding: Option<Vec<u8>>,
|
|
#[diesel(sql_type = diesel::sql_types::Float)]
|
|
confidence: f32,
|
|
#[diesel(sql_type = Text)]
|
|
status: String,
|
|
#[diesel(sql_type = BigInt)]
|
|
created_at: i64,
|
|
#[diesel(sql_type = BigInt)]
|
|
updated_at: i64,
|
|
#[diesel(sql_type = BigInt)]
|
|
fact_count: i64,
|
|
}
|
|
|
|
let mut q = sql_query(select_sql).into_boxed();
|
|
// Persona binds first (they're earlier in the SQL — inside
|
|
// the subquery LEFT JOIN).
|
|
match persona {
|
|
PersonaFilter::Single {
|
|
user_id,
|
|
persona_id,
|
|
} => {
|
|
q = q
|
|
.bind::<Integer, _>(*user_id)
|
|
.bind::<Text, _>(persona_id.clone());
|
|
}
|
|
PersonaFilter::All { user_id } => {
|
|
q = q.bind::<Integer, _>(*user_id);
|
|
}
|
|
}
|
|
// Then WHERE binds in order.
|
|
for s in &bind_strs {
|
|
q = q.bind::<Text, _>(s.clone());
|
|
}
|
|
// Then LIMIT / OFFSET.
|
|
q = q
|
|
.bind::<BigInt, _>(filter.limit)
|
|
.bind::<BigInt, _>(filter.offset);
|
|
|
|
let rows: Vec<EntityWithCountRow> = q
|
|
.load(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))?;
|
|
|
|
let pairs: Vec<(Entity, i64)> = rows
|
|
.into_iter()
|
|
.map(|r| {
|
|
(
|
|
Entity {
|
|
id: r.id,
|
|
name: r.name,
|
|
entity_type: r.entity_type,
|
|
description: r.description,
|
|
embedding: r.embedding,
|
|
confidence: r.confidence,
|
|
status: r.status,
|
|
created_at: r.created_at,
|
|
updated_at: r.updated_at,
|
|
},
|
|
r.fact_count,
|
|
)
|
|
})
|
|
.collect();
|
|
|
|
// Sink unused `_bind_types`; keeping it as documentation.
|
|
let _ = bind_types;
|
|
|
|
Ok((pairs, total))
|
|
})
|
|
.map_err(|e| DbError::log(DbErrorKind::QueryError, e))
|
|
}
|
|
|
|
fn get_predicate_stats(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
persona: &PersonaFilter,
|
|
limit: usize,
|
|
) -> Result<Vec<(String, i64)>, DbError> {
|
|
trace_db_call(cx, "query", "get_predicate_stats", |_span| {
|
|
use diesel::sql_query;
|
|
use diesel::sql_types::{BigInt, Integer, Text};
|
|
|
|
// Active + reviewed only — rejected / superseded are
|
|
// already off the agent's read path so they shouldn't
|
|
// count toward "what predicates are noisy in production".
|
|
let where_sql = match persona {
|
|
PersonaFilter::Single { .. } => {
|
|
"WHERE user_id = ? AND persona_id = ? \
|
|
AND status IN ('active','reviewed')"
|
|
}
|
|
PersonaFilter::All { .. } => {
|
|
"WHERE user_id = ? AND status IN ('active','reviewed')"
|
|
}
|
|
};
|
|
let sql = format!(
|
|
"SELECT predicate, COUNT(*) AS cnt FROM entity_facts \
|
|
{where_sql} \
|
|
GROUP BY predicate \
|
|
ORDER BY cnt DESC \
|
|
LIMIT ?",
|
|
);
|
|
|
|
#[derive(diesel::QueryableByName)]
|
|
struct Row {
|
|
#[diesel(sql_type = Text)]
|
|
predicate: String,
|
|
#[diesel(sql_type = BigInt)]
|
|
cnt: i64,
|
|
}
|
|
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
let mut q = sql_query(sql).into_boxed();
|
|
match persona {
|
|
PersonaFilter::Single {
|
|
user_id,
|
|
persona_id,
|
|
} => {
|
|
q = q
|
|
.bind::<Integer, _>(*user_id)
|
|
.bind::<Text, _>(persona_id.clone());
|
|
}
|
|
PersonaFilter::All { user_id } => {
|
|
q = q.bind::<Integer, _>(*user_id);
|
|
}
|
|
}
|
|
q = q.bind::<BigInt, _>(limit as i64);
|
|
|
|
let rows: Vec<Row> = q
|
|
.load(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))?;
|
|
Ok(rows.into_iter().map(|r| (r.predicate, r.cnt)).collect())
|
|
})
|
|
.map_err(|e| DbError::log(DbErrorKind::QueryError, e))
|
|
}
|
|
|
|
fn bulk_reject_facts_by_predicate(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
persona: &PersonaFilter,
|
|
target_predicate: &str,
|
|
audit: Option<(&str, &str)>,
|
|
) -> Result<usize, DbError> {
|
|
trace_db_call(cx, "update", "bulk_reject_facts_by_predicate", |_span| {
|
|
use schema::entity_facts::dsl::*;
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
|
|
let now = chrono::Utc::now().timestamp();
|
|
let (audit_model, audit_backend) = match audit {
|
|
Some((m, b)) => (Some(m.to_string()), Some(b.to_string())),
|
|
None => (None, None),
|
|
};
|
|
|
|
// Persona scoping mirrors get_predicate_stats. Only ACTIVE
|
|
// rows flip — REVIEWED survives so the curator can preserve
|
|
// a hand-approved exception under the same predicate.
|
|
let touched = match persona {
|
|
PersonaFilter::Single {
|
|
user_id: uid,
|
|
persona_id: pid,
|
|
} => diesel::update(
|
|
entity_facts
|
|
.filter(predicate.eq(target_predicate))
|
|
.filter(user_id.eq(*uid))
|
|
.filter(persona_id.eq(pid))
|
|
.filter(status.eq("active")),
|
|
)
|
|
.set((
|
|
status.eq("rejected"),
|
|
last_modified_by_model.eq(audit_model.clone()),
|
|
last_modified_by_backend.eq(audit_backend.clone()),
|
|
last_modified_at.eq(Some(now)),
|
|
))
|
|
.execute(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Bulk reject error: {}", e))?,
|
|
PersonaFilter::All { user_id: uid } => diesel::update(
|
|
entity_facts
|
|
.filter(predicate.eq(target_predicate))
|
|
.filter(user_id.eq(*uid))
|
|
.filter(status.eq("active")),
|
|
)
|
|
.set((
|
|
status.eq("rejected"),
|
|
last_modified_by_model.eq(audit_model.clone()),
|
|
last_modified_by_backend.eq(audit_backend.clone()),
|
|
last_modified_at.eq(Some(now)),
|
|
))
|
|
.execute(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Bulk reject error: {}", e))?,
|
|
};
|
|
Ok(touched)
|
|
})
|
|
.map_err(|e| DbError::log(DbErrorKind::UpdateError, e))
|
|
}
|
|
|
|
fn build_entity_graph(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
entity_type_filter: Option<&str>,
|
|
node_limit: usize,
|
|
persona: &PersonaFilter,
|
|
) -> Result<EntityGraph, DbError> {
|
|
trace_db_call(cx, "query", "build_entity_graph", |_span| {
|
|
use diesel::sql_query;
|
|
use diesel::sql_types::{BigInt, Integer, Text};
|
|
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
|
|
// ── Nodes: entities with non-rejected facts under the
|
|
// active scope, plus their fact count. Cap to node_limit
|
|
// by count desc so the graph stays drawable; lower-count
|
|
// entities drop. Excludes 'rejected' entity rows too.
|
|
let (persona_filter_sql, persona_binds_count) = match persona {
|
|
PersonaFilter::Single { .. } => (
|
|
"AND ef.user_id = ? AND ef.persona_id = ? AND ef.status NOT IN ('rejected','superseded')",
|
|
2,
|
|
),
|
|
PersonaFilter::All { .. } => (
|
|
"AND ef.user_id = ? AND ef.status NOT IN ('rejected','superseded')",
|
|
1,
|
|
),
|
|
};
|
|
|
|
let mut where_parts: Vec<&str> = vec!["e.status != 'rejected'"];
|
|
if entity_type_filter.is_some() {
|
|
where_parts.push("e.entity_type = ?");
|
|
}
|
|
let where_clause = format!("WHERE {}", where_parts.join(" AND "));
|
|
|
|
// SQL: join entities to their (persona-scoped) fact count,
|
|
// sort by count desc, limit. Including entities with 0
|
|
// facts would clutter the view — skip them via INNER JOIN
|
|
// (subquery on entity_facts) so only entities with at
|
|
// least one in-scope fact show up.
|
|
let node_sql = format!(
|
|
"SELECT e.id, e.name, e.entity_type, fc.fact_count \
|
|
FROM entities e \
|
|
INNER JOIN ( \
|
|
SELECT subject_entity_id AS sid, COUNT(*) AS fact_count \
|
|
FROM entity_facts ef \
|
|
WHERE 1=1 {persona_filter_sql} \
|
|
GROUP BY subject_entity_id \
|
|
) fc ON fc.sid = e.id \
|
|
{where_clause} \
|
|
ORDER BY fc.fact_count DESC, e.id ASC \
|
|
LIMIT ?",
|
|
);
|
|
|
|
#[derive(diesel::QueryableByName)]
|
|
struct NodeRow {
|
|
#[diesel(sql_type = Integer)]
|
|
id: i32,
|
|
#[diesel(sql_type = Text)]
|
|
name: String,
|
|
#[diesel(sql_type = Text)]
|
|
entity_type: String,
|
|
#[diesel(sql_type = BigInt)]
|
|
fact_count: i64,
|
|
}
|
|
|
|
let mut nq = sql_query(node_sql).into_boxed();
|
|
// Persona binds (inside the subquery — earlier in the SQL).
|
|
match persona {
|
|
PersonaFilter::Single { user_id, persona_id } => {
|
|
nq = nq
|
|
.bind::<Integer, _>(*user_id)
|
|
.bind::<Text, _>(persona_id.clone());
|
|
}
|
|
PersonaFilter::All { user_id } => {
|
|
nq = nq.bind::<Integer, _>(*user_id);
|
|
}
|
|
}
|
|
// Entity-type filter bind, if any.
|
|
if let Some(t) = entity_type_filter {
|
|
nq = nq.bind::<Text, _>(t.to_string());
|
|
}
|
|
// LIMIT.
|
|
nq = nq.bind::<BigInt, _>(node_limit as i64);
|
|
|
|
let node_rows: Vec<NodeRow> = nq
|
|
.load(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Node query error: {}", e))?;
|
|
|
|
let _ = persona_binds_count; // documentary
|
|
|
|
let node_ids: std::collections::HashSet<i32> =
|
|
node_rows.iter().map(|r| r.id).collect();
|
|
let nodes: Vec<GraphNode> = node_rows
|
|
.into_iter()
|
|
.map(|r| GraphNode {
|
|
id: r.id,
|
|
name: r.name,
|
|
entity_type: r.entity_type,
|
|
fact_count: r.fact_count,
|
|
})
|
|
.collect();
|
|
|
|
if nodes.is_empty() {
|
|
return Ok(EntityGraph {
|
|
nodes,
|
|
edges: Vec::new(),
|
|
});
|
|
}
|
|
|
|
// ── Edges: relational facts where BOTH subject and
|
|
// object survived the node cap. Grouped by (subject,
|
|
// object, predicate) so 3 "is_friend_of Bob" facts
|
|
// become one edge with count=3.
|
|
let id_list: Vec<String> = node_ids.iter().map(|i| i.to_string()).collect();
|
|
let in_clause = id_list.join(", ");
|
|
// Note: ids are i32, inlined safely; predicates use binds.
|
|
let (edge_persona_sql, _) = match persona {
|
|
PersonaFilter::Single { .. } => (
|
|
"user_id = ? AND persona_id = ? AND status NOT IN ('rejected','superseded')",
|
|
2,
|
|
),
|
|
PersonaFilter::All { .. } => (
|
|
"user_id = ? AND status NOT IN ('rejected','superseded')",
|
|
1,
|
|
),
|
|
};
|
|
let edge_sql = format!(
|
|
"SELECT subject_entity_id, object_entity_id, predicate, COUNT(*) AS cnt \
|
|
FROM entity_facts \
|
|
WHERE {edge_persona_sql} \
|
|
AND object_entity_id IS NOT NULL \
|
|
AND subject_entity_id IN ({in_clause}) \
|
|
AND object_entity_id IN ({in_clause}) \
|
|
GROUP BY subject_entity_id, object_entity_id, predicate",
|
|
);
|
|
|
|
#[derive(diesel::QueryableByName)]
|
|
struct EdgeRow {
|
|
#[diesel(sql_type = Integer)]
|
|
subject_entity_id: i32,
|
|
#[diesel(sql_type = Integer)]
|
|
object_entity_id: i32,
|
|
#[diesel(sql_type = Text)]
|
|
predicate: String,
|
|
#[diesel(sql_type = BigInt)]
|
|
cnt: i64,
|
|
}
|
|
|
|
let mut eq = sql_query(edge_sql).into_boxed();
|
|
match persona {
|
|
PersonaFilter::Single { user_id, persona_id } => {
|
|
eq = eq
|
|
.bind::<Integer, _>(*user_id)
|
|
.bind::<Text, _>(persona_id.clone());
|
|
}
|
|
PersonaFilter::All { user_id } => {
|
|
eq = eq.bind::<Integer, _>(*user_id);
|
|
}
|
|
}
|
|
let edge_rows: Vec<EdgeRow> = eq
|
|
.load(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Edge query error: {}", e))?;
|
|
|
|
let edges: Vec<GraphEdge> = edge_rows
|
|
.into_iter()
|
|
.map(|r| GraphEdge {
|
|
source: r.subject_entity_id,
|
|
target: r.object_entity_id,
|
|
predicate: r.predicate,
|
|
count: r.cnt,
|
|
})
|
|
.collect();
|
|
|
|
Ok(EntityGraph { nodes, edges })
|
|
})
|
|
.map_err(|e| DbError::log(DbErrorKind::QueryError, e))
|
|
}
|
|
|
|
fn find_consolidation_proposals(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
threshold: f32,
|
|
max_groups: usize,
|
|
) -> Result<Vec<ConsolidationGroup>, DbError> {
|
|
trace_db_call(cx, "query", "find_consolidation_proposals", |_span| {
|
|
use schema::entities::dsl::*;
|
|
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
|
|
// Pull every non-rejected entity with an embedding. We
|
|
// keep 'reviewed' rows in the scan because pre-guard
|
|
// legacy data still needs cleanup even if the curator
|
|
// marked individual entities reviewed.
|
|
let rows: Vec<Entity> = entities
|
|
.filter(embedding.is_not_null())
|
|
.filter(status.ne("rejected"))
|
|
.load::<Entity>(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))?;
|
|
|
|
// Partition by entity_type so a person can't cluster
|
|
// with a place via coincidental embedding closeness.
|
|
let mut by_type: std::collections::HashMap<String, Vec<usize>> =
|
|
std::collections::HashMap::new();
|
|
for (idx, e) in rows.iter().enumerate() {
|
|
by_type.entry(e.entity_type.clone()).or_default().push(idx);
|
|
}
|
|
|
|
// Decode embeddings once. Skip rows that don't deserialize
|
|
// cleanly (corrupted or wrong-dim) rather than failing
|
|
// the whole scan.
|
|
let mut decoded: Vec<Option<Vec<f32>>> = Vec::with_capacity(rows.len());
|
|
for e in &rows {
|
|
let v = e
|
|
.embedding
|
|
.as_ref()
|
|
.and_then(|b| Self::deserialize_embedding(b).ok())
|
|
.filter(|v| !v.is_empty());
|
|
decoded.push(v);
|
|
}
|
|
|
|
// Union-find for transitive clustering.
|
|
struct UF {
|
|
parent: Vec<usize>,
|
|
}
|
|
impl UF {
|
|
fn new(n: usize) -> Self {
|
|
UF {
|
|
parent: (0..n).collect(),
|
|
}
|
|
}
|
|
fn find(&mut self, x: usize) -> usize {
|
|
let mut r = x;
|
|
while self.parent[r] != r {
|
|
r = self.parent[r];
|
|
}
|
|
let mut cur = x;
|
|
while self.parent[cur] != r {
|
|
let nxt = self.parent[cur];
|
|
self.parent[cur] = r;
|
|
cur = nxt;
|
|
}
|
|
r
|
|
}
|
|
fn union(&mut self, a: usize, b: usize) {
|
|
let ra = self.find(a);
|
|
let rb = self.find(b);
|
|
if ra != rb {
|
|
self.parent[ra] = rb;
|
|
}
|
|
}
|
|
}
|
|
|
|
let mut uf = UF::new(rows.len());
|
|
let mut group_min: std::collections::HashMap<usize, f32> =
|
|
std::collections::HashMap::new();
|
|
let mut group_max: std::collections::HashMap<usize, f32> =
|
|
std::collections::HashMap::new();
|
|
|
|
// Single pass: union and update per-component stats in
|
|
// one go. Stats are tracked per root; final pass after
|
|
// all unions corrects roots that moved.
|
|
type Edge = (usize, usize, f32);
|
|
let mut edges: Vec<Edge> = Vec::new();
|
|
for indices in by_type.values() {
|
|
for a in 0..indices.len() {
|
|
let ia = indices[a];
|
|
let va = match &decoded[ia] {
|
|
Some(v) => v,
|
|
None => continue,
|
|
};
|
|
for &ib in &indices[a + 1..] {
|
|
let vb = match &decoded[ib] {
|
|
Some(v) => v,
|
|
None => continue,
|
|
};
|
|
let sim = Self::cosine_similarity(va, vb);
|
|
if sim >= threshold {
|
|
uf.union(ia, ib);
|
|
edges.push((ia, ib, sim));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
// Second pass over the kept edges to populate stats by
|
|
// final root (post-union path compression).
|
|
for (a, _b, sim) in &edges {
|
|
let root = uf.find(*a);
|
|
let mn = group_min.entry(root).or_insert(*sim);
|
|
if *sim < *mn {
|
|
*mn = *sim;
|
|
}
|
|
let mx = group_max.entry(root).or_insert(*sim);
|
|
if *sim > *mx {
|
|
*mx = *sim;
|
|
}
|
|
}
|
|
|
|
// Bucket entities by root component, skipping singletons.
|
|
let mut groups: std::collections::HashMap<usize, Vec<usize>> =
|
|
std::collections::HashMap::new();
|
|
for i in 0..rows.len() {
|
|
let root = uf.find(i);
|
|
if !group_min.contains_key(&root) {
|
|
continue;
|
|
}
|
|
groups.entry(root).or_default().push(i);
|
|
}
|
|
|
|
let mut result: Vec<ConsolidationGroup> = groups
|
|
.into_iter()
|
|
.filter(|(_, members)| members.len() >= 2)
|
|
.map(|(root, members)| ConsolidationGroup {
|
|
entities: members.into_iter().map(|i| rows[i].clone()).collect(),
|
|
min_cosine: *group_min.get(&root).unwrap_or(&0.0),
|
|
max_cosine: *group_max.get(&root).unwrap_or(&0.0),
|
|
})
|
|
.collect();
|
|
|
|
// Biggest clusters first; tie-break on the strongest
|
|
// pair so the most-obvious dupes surface at the top.
|
|
result.sort_by(|a, b| {
|
|
b.entities.len().cmp(&a.entities.len()).then_with(|| {
|
|
b.max_cosine
|
|
.partial_cmp(&a.max_cosine)
|
|
.unwrap_or(std::cmp::Ordering::Equal)
|
|
})
|
|
});
|
|
result.truncate(max_groups);
|
|
Ok(result)
|
|
})
|
|
.map_err(|e| DbError::log(DbErrorKind::QueryError, e))
|
|
}
|
|
|
|
fn get_persona_breakdowns_for_entities(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
entity_ids: &[i32],
|
|
user_id: i32,
|
|
) -> Result<std::collections::HashMap<i32, Vec<(String, i64)>>, DbError> {
|
|
trace_db_call(cx, "query", "get_persona_breakdowns", |_span| {
|
|
use diesel::sql_query;
|
|
use diesel::sql_types::{BigInt, Integer, Text};
|
|
|
|
if entity_ids.is_empty() {
|
|
return Ok(std::collections::HashMap::new());
|
|
}
|
|
|
|
// Build the `IN (?, ?, ?…)` placeholder list. We bind
|
|
// user_id first, then the entity ids. No real escape risk
|
|
// since the values are typed ints, but bound parameters
|
|
// are cleaner than format!() either way.
|
|
let placeholders = vec!["?"; entity_ids.len()].join(", ");
|
|
let sql = format!(
|
|
"SELECT subject_entity_id, persona_id, COUNT(*) AS cnt \
|
|
FROM entity_facts \
|
|
WHERE user_id = ? \
|
|
AND status != 'rejected' \
|
|
AND subject_entity_id IN ({}) \
|
|
GROUP BY subject_entity_id, persona_id \
|
|
ORDER BY subject_entity_id, persona_id",
|
|
placeholders
|
|
);
|
|
|
|
#[derive(diesel::QueryableByName)]
|
|
struct Row {
|
|
#[diesel(sql_type = Integer)]
|
|
subject_entity_id: i32,
|
|
#[diesel(sql_type = Text)]
|
|
persona_id: String,
|
|
#[diesel(sql_type = BigInt)]
|
|
cnt: i64,
|
|
}
|
|
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
let mut q = sql_query(sql).into_boxed();
|
|
q = q.bind::<Integer, _>(user_id);
|
|
for id in entity_ids {
|
|
q = q.bind::<Integer, _>(*id);
|
|
}
|
|
let rows: Vec<Row> = q
|
|
.load(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))?;
|
|
|
|
let mut out: std::collections::HashMap<i32, Vec<(String, i64)>> =
|
|
std::collections::HashMap::with_capacity(entity_ids.len());
|
|
for r in rows {
|
|
out.entry(r.subject_entity_id)
|
|
.or_default()
|
|
.push((r.persona_id, r.cnt));
|
|
}
|
|
Ok(out)
|
|
})
|
|
.map_err(|e| DbError::log(DbErrorKind::QueryError, e))
|
|
}
|
|
|
|
fn update_entity_status(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
entity_id: i32,
|
|
new_status: &str,
|
|
) -> Result<(), DbError> {
|
|
trace_db_call(cx, "update", "update_entity_status", |_span| {
|
|
use schema::entities::dsl::*;
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
diesel::update(entities.filter(id.eq(entity_id)))
|
|
.set(status.eq(new_status))
|
|
.execute(conn.deref_mut())
|
|
.map(|_| ())
|
|
.map_err(|e| anyhow::anyhow!("Update error: {}", e))
|
|
})
|
|
.map_err(|e| DbError::log(DbErrorKind::UpdateError, e))
|
|
}
|
|
|
|
fn update_entity(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
entity_id: i32,
|
|
patch: EntityPatch,
|
|
) -> Result<Option<Entity>, DbError> {
|
|
trace_db_call(cx, "update", "update_entity", |_span| {
|
|
use schema::entities::dsl::*;
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
|
|
let now = chrono::Utc::now().timestamp();
|
|
|
|
if let Some(ref new_name) = patch.name {
|
|
diesel::update(entities.filter(id.eq(entity_id)))
|
|
.set((name.eq(new_name), updated_at.eq(now)))
|
|
.execute(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Update name error: {}", e))?;
|
|
}
|
|
if let Some(ref new_desc) = patch.description {
|
|
diesel::update(entities.filter(id.eq(entity_id)))
|
|
.set((description.eq(new_desc), updated_at.eq(now)))
|
|
.execute(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Update description error: {}", e))?;
|
|
}
|
|
if let Some(ref new_status) = patch.status {
|
|
diesel::update(entities.filter(id.eq(entity_id)))
|
|
.set((status.eq(new_status), updated_at.eq(now)))
|
|
.execute(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Update status error: {}", e))?;
|
|
}
|
|
if let Some(new_confidence) = patch.confidence {
|
|
diesel::update(entities.filter(id.eq(entity_id)))
|
|
.set((confidence.eq(new_confidence), updated_at.eq(now)))
|
|
.execute(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Update confidence error: {}", e))?;
|
|
}
|
|
|
|
entities
|
|
.filter(id.eq(entity_id))
|
|
.first::<Entity>(conn.deref_mut())
|
|
.optional()
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
|
|
})
|
|
.map_err(|e| DbError::log(DbErrorKind::UpdateError, e))
|
|
}
|
|
|
|
fn delete_entity(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
entity_id: i32,
|
|
) -> Result<(), DbError> {
|
|
trace_db_call(cx, "delete", "delete_entity", |_span| {
|
|
use schema::entities::dsl::*;
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
|
|
// entity_facts has a CHECK constraint requiring
|
|
// `object_entity_id IS NOT NULL OR object_value IS NOT NULL`.
|
|
// The FK on object_entity_id is ON DELETE SET NULL — but
|
|
// facts that pointed at the deleted entity *only* via the
|
|
// entity reference (the common case for relational facts
|
|
// like "Alice is_friend_of Bob") have no object_value, so
|
|
// SET NULL would leave them with both NULLs and the CHECK
|
|
// aborts the whole DELETE. Pre-delete those facts in a
|
|
// transaction so the CASCADE / SET NULL chain on what
|
|
// remains can fire cleanly.
|
|
//
|
|
// Long-term fix is to change the FK to ON DELETE CASCADE
|
|
// via a table-rebuild migration, but the DAO-side workaround
|
|
// is sufficient and less invasive.
|
|
conn.transaction::<(), diesel::result::Error, _>(|conn| {
|
|
use schema::entity_facts::dsl as ef;
|
|
diesel::delete(
|
|
ef::entity_facts
|
|
.filter(ef::object_entity_id.eq(entity_id))
|
|
.filter(ef::object_value.is_null()),
|
|
)
|
|
.execute(conn)?;
|
|
|
|
diesel::delete(entities.filter(id.eq(entity_id))).execute(conn)?;
|
|
Ok(())
|
|
})
|
|
.map_err(|e| anyhow::anyhow!("Delete error: {}", e))
|
|
})
|
|
.map_err(|e| {
|
|
// Surface the actual diesel error string before collapsing
|
|
// to the opaque DbErrorKind::QueryError.
|
|
log::warn!("delete_entity({}) failed: {}", entity_id, e);
|
|
DbError::new(DbErrorKind::QueryError)
|
|
})
|
|
}
|
|
|
|
fn merge_entities(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
source_id: i32,
|
|
target_id: i32,
|
|
) -> Result<(i64, i64), DbError> {
|
|
trace_db_call(cx, "update", "merge_entities", |_span| {
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
|
|
conn.transaction::<(i64, i64), diesel::result::Error, _>(|conn| {
|
|
use schema::entity_facts::dsl as ef;
|
|
|
|
// 1. Re-point facts where source is subject
|
|
let facts_updated =
|
|
diesel::update(ef::entity_facts.filter(ef::subject_entity_id.eq(source_id)))
|
|
.set(ef::subject_entity_id.eq(target_id))
|
|
.execute(conn)? as i64;
|
|
|
|
// 2. Re-point facts where source is object
|
|
diesel::update(ef::entity_facts.filter(ef::object_entity_id.eq(source_id)))
|
|
.set(ef::object_entity_id.eq(Some(target_id)))
|
|
.execute(conn)?;
|
|
|
|
// 3. Copy photo links to target (INSERT OR IGNORE to skip duplicates)
|
|
let links_updated = diesel::sql_query(
|
|
"INSERT OR IGNORE INTO entity_photo_links (entity_id, library_id, rel_path, role) \
|
|
SELECT ?, library_id, rel_path, role FROM entity_photo_links WHERE entity_id = ?",
|
|
)
|
|
.bind::<diesel::sql_types::Integer, _>(target_id)
|
|
.bind::<diesel::sql_types::Integer, _>(source_id)
|
|
.execute(conn)? as i64;
|
|
|
|
// 4. Delete source entity (FK CASCADE removes remaining facts/links)
|
|
diesel::delete(
|
|
schema::entities::dsl::entities.filter(schema::entities::dsl::id.eq(source_id)),
|
|
)
|
|
.execute(conn)?;
|
|
|
|
Ok((facts_updated, links_updated))
|
|
})
|
|
.map_err(|e| anyhow::anyhow!("Merge transaction error: {}", e))
|
|
})
|
|
.map_err(|e| DbError::log(DbErrorKind::UpdateError, e))
|
|
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// Fact operations
|
|
// -----------------------------------------------------------------------
|
|
|
|
fn upsert_fact(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
fact: InsertEntityFact,
|
|
) -> Result<(EntityFact, bool), DbError> {
|
|
trace_db_call(cx, "insert", "upsert_fact", |_span| {
|
|
use schema::entity_facts::dsl::*;
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
|
|
// Look for an identical active fact AUTHORED BY THE SAME
|
|
// (USER, PERSONA). The same claim from a different persona —
|
|
// or from a different user with the same persona name — is a
|
|
// separate fact (each persona's voice/confidence is its own),
|
|
// not a confidence bump on someone else's row.
|
|
let mut dup_query = entity_facts
|
|
.filter(subject_entity_id.eq(fact.subject_entity_id))
|
|
.filter(predicate.eq(&fact.predicate))
|
|
.filter(user_id.eq(fact.user_id))
|
|
.filter(persona_id.eq(&fact.persona_id))
|
|
.filter(status.ne("rejected"))
|
|
.into_boxed();
|
|
|
|
match &fact.object_entity_id {
|
|
Some(oid) => dup_query = dup_query.filter(object_entity_id.eq(oid)),
|
|
None => dup_query = dup_query.filter(object_entity_id.is_null()),
|
|
}
|
|
match &fact.object_value {
|
|
Some(ov) => dup_query = dup_query.filter(object_value.eq(ov)),
|
|
None => dup_query = dup_query.filter(object_value.is_null()),
|
|
}
|
|
|
|
let existing: Option<EntityFact> = dup_query
|
|
.first::<EntityFact>(conn.deref_mut())
|
|
.optional()
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))?;
|
|
|
|
if let Some(existing_fact) = existing {
|
|
// Corroborate: bump confidence by 0.1 capped at 0.95
|
|
let new_confidence = (existing_fact.confidence + 0.1).min(0.95);
|
|
diesel::update(entity_facts.filter(id.eq(existing_fact.id)))
|
|
.set(confidence.eq(new_confidence))
|
|
.execute(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Update confidence error: {}", e))?;
|
|
|
|
let updated = entity_facts
|
|
.filter(id.eq(existing_fact.id))
|
|
.first::<EntityFact>(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))?;
|
|
|
|
Ok((updated, false)) // false = corroborated, not newly created
|
|
} else {
|
|
diesel::insert_into(entity_facts)
|
|
.values(&fact)
|
|
.execute(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Insert error: {}", e))?;
|
|
|
|
let inserted = entity_facts
|
|
.order(id.desc())
|
|
.first::<EntityFact>(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))?;
|
|
|
|
Ok((inserted, true)) // true = newly created
|
|
}
|
|
})
|
|
.map_err(|e| DbError::log(DbErrorKind::InsertError, e))
|
|
}
|
|
|
|
fn get_facts_for_entity(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
entity_id: i32,
|
|
persona: &PersonaFilter,
|
|
) -> Result<Vec<EntityFact>, DbError> {
|
|
trace_db_call(cx, "query", "get_facts_for_entity", |_span| {
|
|
use schema::entity_facts::dsl::*;
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
let mut q = entity_facts
|
|
.filter(subject_entity_id.eq(entity_id))
|
|
.filter(status.ne("rejected"))
|
|
.filter(user_id.eq(persona.user_id()))
|
|
.into_boxed();
|
|
if let PersonaFilter::Single {
|
|
persona_id: pid, ..
|
|
} = persona
|
|
{
|
|
q = q.filter(persona_id.eq(pid.clone()));
|
|
}
|
|
q.load::<EntityFact>(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
|
|
})
|
|
.map_err(|e| DbError::log(DbErrorKind::QueryError, e))
|
|
}
|
|
|
|
fn list_facts(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
filter: FactFilter,
|
|
) -> Result<(Vec<EntityFact>, i64), DbError> {
|
|
trace_db_call(cx, "query", "list_facts", |_span| {
|
|
use diesel::dsl::count_star;
|
|
use schema::entity_facts::dsl::*;
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
|
|
let mut query = entity_facts.into_boxed();
|
|
let mut count_query = entity_facts.into_boxed();
|
|
|
|
// user_id always applies — facts are user-isolated.
|
|
let uid = filter.persona.user_id();
|
|
query = query.filter(user_id.eq(uid));
|
|
count_query = count_query.filter(user_id.eq(uid));
|
|
|
|
if let Some(eid) = filter.entity_id {
|
|
query = query.filter(subject_entity_id.eq(eid));
|
|
count_query = count_query.filter(subject_entity_id.eq(eid));
|
|
}
|
|
let status_val = filter.status.as_deref().unwrap_or("active");
|
|
if status_val != "all" {
|
|
query = query.filter(status.eq(status_val));
|
|
count_query = count_query.filter(status.eq(status_val));
|
|
}
|
|
if let Some(ref pred) = filter.predicate {
|
|
query = query.filter(predicate.eq(pred));
|
|
count_query = count_query.filter(predicate.eq(pred));
|
|
}
|
|
if let PersonaFilter::Single {
|
|
persona_id: ref pid,
|
|
..
|
|
} = filter.persona
|
|
{
|
|
query = query.filter(persona_id.eq(pid.clone()));
|
|
count_query = count_query.filter(persona_id.eq(pid.clone()));
|
|
}
|
|
|
|
let total: i64 = count_query
|
|
.select(count_star())
|
|
.first(conn.deref_mut())
|
|
.unwrap_or(0);
|
|
|
|
let results = query
|
|
.order(created_at.desc())
|
|
.limit(filter.limit)
|
|
.offset(filter.offset)
|
|
.load::<EntityFact>(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))?;
|
|
|
|
Ok((results, total))
|
|
})
|
|
.map_err(|e| DbError::log(DbErrorKind::QueryError, e))
|
|
}
|
|
|
|
fn update_fact(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
fact_id: i32,
|
|
patch: FactPatch,
|
|
audit: Option<(&str, &str)>,
|
|
) -> Result<Option<EntityFact>, DbError> {
|
|
trace_db_call(cx, "update", "update_fact", |_span| {
|
|
use schema::entity_facts::dsl::*;
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
|
|
let mut touched = false;
|
|
if let Some(ref new_predicate) = patch.predicate {
|
|
diesel::update(entity_facts.filter(id.eq(fact_id)))
|
|
.set(predicate.eq(new_predicate))
|
|
.execute(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Update error: {}", e))?;
|
|
touched = true;
|
|
}
|
|
if let Some(ref new_value) = patch.object_value {
|
|
diesel::update(entity_facts.filter(id.eq(fact_id)))
|
|
.set(object_value.eq(new_value))
|
|
.execute(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Update error: {}", e))?;
|
|
touched = true;
|
|
}
|
|
if let Some(ref new_status) = patch.status {
|
|
diesel::update(entity_facts.filter(id.eq(fact_id)))
|
|
.set(status.eq(new_status))
|
|
.execute(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Update error: {}", e))?;
|
|
touched = true;
|
|
}
|
|
if let Some(new_confidence) = patch.confidence {
|
|
diesel::update(entity_facts.filter(id.eq(fact_id)))
|
|
.set(confidence.eq(new_confidence))
|
|
.execute(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Update error: {}", e))?;
|
|
touched = true;
|
|
}
|
|
if let Some(new_from) = patch.valid_from {
|
|
diesel::update(entity_facts.filter(id.eq(fact_id)))
|
|
.set(valid_from.eq(new_from))
|
|
.execute(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Update error: {}", e))?;
|
|
touched = true;
|
|
}
|
|
if let Some(new_until) = patch.valid_until {
|
|
diesel::update(entity_facts.filter(id.eq(fact_id)))
|
|
.set(valid_until.eq(new_until))
|
|
.execute(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Update error: {}", e))?;
|
|
touched = true;
|
|
}
|
|
|
|
// Only stamp the audit columns if we actually changed
|
|
// something — empty patches stay quiet.
|
|
if touched {
|
|
let now = chrono::Utc::now().timestamp();
|
|
let (model_str, backend_str) = match audit {
|
|
Some((m, b)) => (Some(m.to_string()), Some(b.to_string())),
|
|
None => (None, None),
|
|
};
|
|
diesel::update(entity_facts.filter(id.eq(fact_id)))
|
|
.set((
|
|
last_modified_by_model.eq(model_str),
|
|
last_modified_by_backend.eq(backend_str),
|
|
last_modified_at.eq(Some(now)),
|
|
))
|
|
.execute(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Audit-stamp error: {}", e))?;
|
|
}
|
|
|
|
entity_facts
|
|
.filter(id.eq(fact_id))
|
|
.first::<EntityFact>(conn.deref_mut())
|
|
.optional()
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
|
|
})
|
|
.map_err(|e| DbError::log(DbErrorKind::UpdateError, e))
|
|
}
|
|
|
|
fn update_facts_insight_id(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
photo_path: &str,
|
|
insight_id: i32,
|
|
) -> Result<(), DbError> {
|
|
trace_db_call(cx, "update", "update_facts_insight_id", |_span| {
|
|
use schema::entity_facts::dsl::*;
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
diesel::update(
|
|
entity_facts
|
|
.filter(source_photo.eq(photo_path))
|
|
.filter(source_insight_id.is_null()),
|
|
)
|
|
.set(source_insight_id.eq(insight_id))
|
|
.execute(conn.deref_mut())
|
|
.map(|_| ())
|
|
.map_err(|e| anyhow::anyhow!("Update error: {}", e))
|
|
})
|
|
.map_err(|e| DbError::log(DbErrorKind::UpdateError, e))
|
|
}
|
|
|
|
fn delete_fact(&mut self, cx: &opentelemetry::Context, fact_id: i32) -> Result<(), DbError> {
|
|
trace_db_call(cx, "delete", "delete_fact", |_span| {
|
|
use schema::entity_facts::dsl::*;
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
// Clear dangling supersession pointers from any fact this
|
|
// one had retired — there's no FK on superseded_by (SQLite
|
|
// can't ALTER ADD with REFERENCES) so we do it manually.
|
|
// Sibling rows lose the pointer but stay 'superseded' —
|
|
// the user's historical correction survives the cleanup.
|
|
conn.transaction::<(), diesel::result::Error, _>(|conn| {
|
|
diesel::update(entity_facts.filter(superseded_by.eq(fact_id)))
|
|
.set(superseded_by.eq::<Option<i32>>(None))
|
|
.execute(conn)?;
|
|
diesel::delete(entity_facts.filter(id.eq(fact_id))).execute(conn)?;
|
|
Ok(())
|
|
})
|
|
.map_err(|e| anyhow::anyhow!("Delete error: {}", e))
|
|
})
|
|
.map_err(|e| {
|
|
log::warn!("delete_fact({}) failed: {}", fact_id, e);
|
|
DbError::new(DbErrorKind::QueryError)
|
|
})
|
|
}
|
|
|
|
fn supersede_fact(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
old_id: i32,
|
|
new_id: i32,
|
|
audit: Option<(&str, &str)>,
|
|
) -> Result<Option<EntityFact>, DbError> {
|
|
trace_db_call(cx, "update", "supersede_fact", |_span| {
|
|
use schema::entity_facts::dsl::*;
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
|
|
if old_id == new_id {
|
|
return Err(anyhow::anyhow!(
|
|
"supersede_fact: old_id and new_id must differ"
|
|
));
|
|
}
|
|
|
|
let now = chrono::Utc::now().timestamp();
|
|
let (audit_model, audit_backend) = match audit {
|
|
Some((m, b)) => (Some(m.to_string()), Some(b.to_string())),
|
|
None => (None, None),
|
|
};
|
|
|
|
conn.transaction::<Option<EntityFact>, diesel::result::Error, _>(|conn| {
|
|
// Pull the new fact's valid_from so we can close
|
|
// the old fact's interval at the same point.
|
|
let new_fact: Option<EntityFact> = entity_facts
|
|
.filter(id.eq(new_id))
|
|
.first::<EntityFact>(conn)
|
|
.optional()?;
|
|
let Some(new_fact) = new_fact else {
|
|
return Ok(None);
|
|
};
|
|
|
|
// Verify the old fact exists before touching it —
|
|
// returning None lets the handler 404 cleanly.
|
|
let old_fact: Option<EntityFact> = entity_facts
|
|
.filter(id.eq(old_id))
|
|
.first::<EntityFact>(conn)
|
|
.optional()?;
|
|
if old_fact.is_none() {
|
|
return Ok(None);
|
|
}
|
|
|
|
// Only stamp valid_until if the user hasn't
|
|
// already set it — respecting hand-curated bounds.
|
|
let target_valid_until = old_fact
|
|
.as_ref()
|
|
.and_then(|f| f.valid_until)
|
|
.or(new_fact.valid_from);
|
|
|
|
diesel::update(entity_facts.filter(id.eq(old_id)))
|
|
.set((
|
|
status.eq("superseded"),
|
|
superseded_by.eq(Some(new_id)),
|
|
valid_until.eq(target_valid_until),
|
|
last_modified_by_model.eq(audit_model.clone()),
|
|
last_modified_by_backend.eq(audit_backend.clone()),
|
|
last_modified_at.eq(Some(now)),
|
|
))
|
|
.execute(conn)?;
|
|
|
|
entity_facts
|
|
.filter(id.eq(old_id))
|
|
.first::<EntityFact>(conn)
|
|
.optional()
|
|
})
|
|
.map_err(|e| anyhow::anyhow!("Supersede error: {}", e))
|
|
})
|
|
.map_err(|e| {
|
|
log::warn!(
|
|
"supersede_fact(old={}, new={}) failed: {}",
|
|
old_id,
|
|
new_id,
|
|
e
|
|
);
|
|
DbError::new(DbErrorKind::UpdateError)
|
|
})
|
|
}
|
|
|
|
fn revert_supersession(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
fact_id: i32,
|
|
audit: Option<(&str, &str)>,
|
|
) -> Result<Option<EntityFact>, DbError> {
|
|
trace_db_call(cx, "update", "revert_supersession", |_span| {
|
|
use schema::entity_facts::dsl::*;
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
|
|
// Verify the fact exists and was in fact superseded —
|
|
// reverting an already-active fact is a no-op and the
|
|
// handler can 404 / 409 on the None.
|
|
let existing: Option<EntityFact> = entity_facts
|
|
.filter(id.eq(fact_id))
|
|
.first::<EntityFact>(conn.deref_mut())
|
|
.optional()
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))?;
|
|
let Some(row) = existing else {
|
|
return Ok(None);
|
|
};
|
|
if row.status != "superseded" && row.superseded_by.is_none() {
|
|
// Not superseded — nothing to revert. Returning the
|
|
// current row is friendlier than 404 here; the
|
|
// handler decides what status to return.
|
|
return Ok(Some(row));
|
|
}
|
|
|
|
let now = chrono::Utc::now().timestamp();
|
|
let (audit_model, audit_backend) = match audit {
|
|
Some((m, b)) => (Some(m.to_string()), Some(b.to_string())),
|
|
None => (None, None),
|
|
};
|
|
|
|
diesel::update(entity_facts.filter(id.eq(fact_id)))
|
|
.set((
|
|
status.eq("active"),
|
|
superseded_by.eq::<Option<i32>>(None),
|
|
// Clear the auto-stamped valid_until. If the user
|
|
// had hand-set it pre-supersede we don't have a
|
|
// way to know — accepting the loss as the cost of
|
|
// a clean revert. Curator can re-bound after.
|
|
valid_until.eq::<Option<i64>>(None),
|
|
last_modified_by_model.eq(audit_model),
|
|
last_modified_by_backend.eq(audit_backend),
|
|
last_modified_at.eq(Some(now)),
|
|
))
|
|
.execute(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Revert error: {}", e))?;
|
|
|
|
entity_facts
|
|
.filter(id.eq(fact_id))
|
|
.first::<EntityFact>(conn.deref_mut())
|
|
.optional()
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
|
|
})
|
|
.map_err(|e| {
|
|
log::warn!("revert_supersession({}) failed: {}", fact_id, e);
|
|
DbError::new(DbErrorKind::UpdateError)
|
|
})
|
|
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// Photo link operations
|
|
// -----------------------------------------------------------------------
|
|
|
|
fn upsert_photo_link(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
link: InsertEntityPhotoLink,
|
|
) -> Result<(), DbError> {
|
|
trace_db_call(cx, "insert", "upsert_photo_link", |_span| {
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
// INSERT OR IGNORE respects the UNIQUE(entity_id, library_id, rel_path, role) constraint
|
|
diesel::sql_query(
|
|
"INSERT OR IGNORE INTO entity_photo_links (entity_id, library_id, rel_path, role) VALUES (?, ?, ?, ?)"
|
|
)
|
|
.bind::<diesel::sql_types::Integer, _>(link.entity_id)
|
|
.bind::<diesel::sql_types::Integer, _>(link.library_id)
|
|
.bind::<diesel::sql_types::Text, _>(&link.file_path)
|
|
.bind::<diesel::sql_types::Text, _>(&link.role)
|
|
.execute(conn.deref_mut())
|
|
.map(|_| ())
|
|
.map_err(|e| anyhow::anyhow!("Insert error: {}", e))
|
|
})
|
|
.map_err(|e| DbError::log(DbErrorKind::InsertError, e))
|
|
}
|
|
|
|
fn delete_photo_links_for_file(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
file_path_val: &str,
|
|
) -> Result<(), DbError> {
|
|
trace_db_call(cx, "delete", "delete_photo_links_for_file", |_span| {
|
|
use schema::entity_photo_links::dsl::*;
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
diesel::delete(entity_photo_links.filter(rel_path.eq(file_path_val)))
|
|
.execute(conn.deref_mut())
|
|
.map(|_| ())
|
|
.map_err(|e| anyhow::anyhow!("Delete error: {}", e))
|
|
})
|
|
.map_err(|e| DbError::log(DbErrorKind::QueryError, e))
|
|
}
|
|
|
|
fn get_links_for_photo(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
file_path_val: &str,
|
|
) -> Result<Vec<EntityPhotoLink>, DbError> {
|
|
trace_db_call(cx, "query", "get_links_for_photo", |_span| {
|
|
use schema::entity_photo_links::dsl::*;
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
entity_photo_links
|
|
.filter(rel_path.eq(file_path_val))
|
|
.load::<EntityPhotoLink>(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
|
|
})
|
|
.map_err(|e| DbError::log(DbErrorKind::QueryError, e))
|
|
}
|
|
|
|
fn get_links_for_entity(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
entity_id_val: i32,
|
|
) -> Result<Vec<EntityPhotoLink>, DbError> {
|
|
trace_db_call(cx, "query", "get_links_for_entity", |_span| {
|
|
use schema::entity_photo_links::dsl::*;
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
entity_photo_links
|
|
.filter(entity_id.eq(entity_id_val))
|
|
.load::<EntityPhotoLink>(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
|
|
})
|
|
.map_err(|e| DbError::log(DbErrorKind::QueryError, e))
|
|
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// Audit
|
|
// -----------------------------------------------------------------------
|
|
|
|
fn get_recent_activity(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
since: i64,
|
|
limit: i64,
|
|
persona: &PersonaFilter,
|
|
) -> Result<RecentActivity, DbError> {
|
|
trace_db_call(cx, "query", "get_recent_activity", |_span| {
|
|
use schema::entities::dsl as e;
|
|
use schema::entity_facts::dsl as ef;
|
|
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
|
|
|
// Entities are shared — recency is global.
|
|
let recent_entities = e::entities
|
|
.filter(e::created_at.gt(since))
|
|
.order(e::created_at.desc())
|
|
.limit(limit)
|
|
.load::<Entity>(conn.deref_mut())
|
|
.map_err(|err| anyhow::anyhow!("Query error: {}", err))?;
|
|
|
|
let mut facts_q = ef::entity_facts
|
|
.filter(ef::created_at.gt(since))
|
|
.filter(ef::user_id.eq(persona.user_id()))
|
|
.into_boxed();
|
|
if let PersonaFilter::Single {
|
|
persona_id: pid, ..
|
|
} = persona
|
|
{
|
|
facts_q = facts_q.filter(ef::persona_id.eq(pid.clone()));
|
|
}
|
|
let recent_facts = facts_q
|
|
.order(ef::created_at.desc())
|
|
.limit(limit)
|
|
.load::<EntityFact>(conn.deref_mut())
|
|
.map_err(|err| anyhow::anyhow!("Query error: {}", err))?;
|
|
|
|
Ok(RecentActivity {
|
|
entities: recent_entities,
|
|
facts: recent_facts,
|
|
})
|
|
})
|
|
.map_err(|e| DbError::log(DbErrorKind::QueryError, e))
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    //! Tests for the SQLite knowledge DAO.
    //!
    //! The persona tests pin three contracts that are silently regressable:
    //!
    //! 1. PersonaFilter::Single isolates per (user_id, persona_id). Two
    //!    users with the same 'default' persona must not see each
    //!    other's facts (multi-user leakage was a latent bug before
    //!    migration 2026-05-10 added user_id + composite FK).
    //!
    //! 2. PersonaFilter::All scopes to a single user but unions across
    //!    that user's personas. Hive-mind for human browsing of
    //!    /knowledge/*; never crosses users.
    //!
    //! 3. Deleting a persona CASCADEs to the user's facts under that
    //!    persona — and ONLY that user's, ONLY that persona's. Other
    //!    users sharing the persona_id name keep their facts.
    //!
    //! The remaining tests cover fact supersession (and reverting it),
    //! valid-time patching, entity deletion in the presence of relational
    //! facts, and embedding-based entity dedup.
    //!
    //! FKs aren't enabled by default on Diesel's SQLite connection;
    //! `connection_with_fks_on()` flips the pragma so the cascade
    //! actually fires in tests (mirroring runtime in production).

    use super::*;
    use crate::database::models::{InsertEntity, InsertEntityFact, InsertPersona};
    use crate::database::test::in_memory_db_connection;
    use diesel::connection::SimpleConnection;

    /// In-memory database with `PRAGMA foreign_keys = ON`, so the FK and
    /// CASCADE behaviour under test is actually enforced.
    fn connection_with_fks_on() -> Arc<Mutex<SqliteConnection>> {
        let mut conn = in_memory_db_connection();
        conn.batch_execute("PRAGMA foreign_keys = ON;")
            .expect("enable foreign_keys pragma");
        Arc::new(Mutex::new(conn))
    }

    /// Insert a user row and return its generated id.
    fn create_user(conn: &Arc<Mutex<SqliteConnection>>, username: &str) -> i32 {
        use crate::database::schema::users::dsl as u;
        let mut c = conn.lock().unwrap();
        diesel::insert_into(u::users)
            .values((u::username.eq(username), u::password.eq("x")))
            .execute(c.deref_mut())
            .unwrap();
        u::users
            .filter(u::username.eq(username))
            .select(u::id)
            .first(c.deref_mut())
            .unwrap()
    }

    /// Insert a persona row for `uid` so facts under `pid` satisfy the
    /// composite FK.
    fn create_persona_row(conn: &Arc<Mutex<SqliteConnection>>, uid: i32, pid: &str) {
        use crate::database::schema::personas::dsl as p;
        let mut c = conn.lock().unwrap();
        diesel::insert_into(p::personas)
            .values(InsertPersona {
                user_id: uid,
                persona_id: pid,
                name: pid,
                system_prompt: "test prompt",
                is_built_in: false,
                include_all_memories: false,
                created_at: 0,
                updated_at: 0,
                reviewed_only_facts: false,
                allow_agent_corrections: false,
            })
            .execute(c.deref_mut())
            .unwrap();
    }

    /// Upsert a plain `person` entity with no embedding.
    fn make_entity(dao: &mut SqliteKnowledgeDao, name: &str) -> Entity {
        let cx = opentelemetry::Context::new();
        dao.upsert_entity(
            &cx,
            InsertEntity {
                name: name.to_string(),
                entity_type: "person".to_string(),
                description: String::new(),
                embedding: None,
                confidence: 0.6,
                status: "active".to_string(),
                created_at: 0,
                updated_at: 0,
            },
        )
        .unwrap()
    }

    /// Upsert an active literal-valued fact about `subject`, owned by
    /// (`user_id`, `persona_id`), and return the stored row.
    fn add_fact(
        dao: &mut SqliteKnowledgeDao,
        subject: i32,
        predicate: &str,
        value: &str,
        user_id: i32,
        persona_id: &str,
    ) -> EntityFact {
        let cx = opentelemetry::Context::new();
        let (fact, _) = dao
            .upsert_fact(
                &cx,
                InsertEntityFact {
                    subject_entity_id: subject,
                    predicate: predicate.to_string(),
                    object_entity_id: None,
                    object_value: Some(value.to_string()),
                    source_photo: None,
                    source_insight_id: None,
                    confidence: 0.6,
                    status: "active".to_string(),
                    created_at: 0,
                    persona_id: persona_id.to_string(),
                    user_id,
                    valid_from: None,
                    valid_until: None,
                    superseded_by: None,
                    created_by_model: None,
                    created_by_backend: None,
                    last_modified_by_model: None,
                    last_modified_by_backend: None,
                    last_modified_at: None,
                },
            )
            .unwrap();
        fact
    }

    /// FactPatch that only touches the valid-time bounds; every other field
    /// is left alone (`None`).
    fn valid_time_patch(
        valid_from: Option<Option<i64>>,
        valid_until: Option<Option<i64>>,
    ) -> FactPatch {
        FactPatch {
            predicate: None,
            object_value: None,
            status: None,
            confidence: None,
            valid_from,
            valid_until,
        }
    }

    #[test]
    fn persona_filter_single_isolates_per_user() {
        // Two users, same persona name. Each user's facts under that
        // persona must NOT surface to the other user's reads — this is
        // the multi-user leakage that motivated adding user_id.
        let cx = opentelemetry::Context::new();
        let conn = connection_with_fks_on();
        let alice = create_user(&conn, "alice");
        let bob = create_user(&conn, "bob");
        create_persona_row(&conn, alice, "default");
        create_persona_row(&conn, bob, "default");

        let mut dao = SqliteKnowledgeDao::from_connection(conn.clone());
        let entity = make_entity(&mut dao, "Cabin");

        add_fact(
            &mut dao,
            entity.id,
            "located_in",
            "Vermont",
            alice,
            "default",
        );
        add_fact(&mut dao, entity.id, "color", "red", bob, "default");

        let alice_view = dao
            .get_facts_for_entity(
                &cx,
                entity.id,
                &PersonaFilter::Single {
                    user_id: alice,
                    persona_id: "default".to_string(),
                },
            )
            .unwrap();
        assert_eq!(alice_view.len(), 1);
        assert_eq!(alice_view[0].predicate, "located_in");

        let bob_view = dao
            .get_facts_for_entity(
                &cx,
                entity.id,
                &PersonaFilter::Single {
                    user_id: bob,
                    persona_id: "default".to_string(),
                },
            )
            .unwrap();
        assert_eq!(bob_view.len(), 1);
        assert_eq!(bob_view[0].predicate, "color");
    }

    #[test]
    fn persona_filter_all_unions_across_personas_one_user() {
        // include_all_memories=true → All variant: see this user's
        // facts across all their personas. Must NOT include other
        // users' facts even when they share a persona name.
        let cx = opentelemetry::Context::new();
        let conn = connection_with_fks_on();
        let alice = create_user(&conn, "alice");
        let bob = create_user(&conn, "bob");
        create_persona_row(&conn, alice, "default");
        create_persona_row(&conn, alice, "journal");
        create_persona_row(&conn, bob, "default");

        let mut dao = SqliteKnowledgeDao::from_connection(conn.clone());
        let entity = make_entity(&mut dao, "Cabin");

        add_fact(&mut dao, entity.id, "p1", "v1", alice, "default");
        add_fact(&mut dao, entity.id, "p2", "v2", alice, "journal");
        add_fact(&mut dao, entity.id, "p3", "v3", bob, "default");

        let alice_all = dao
            .get_facts_for_entity(&cx, entity.id, &PersonaFilter::All { user_id: alice })
            .unwrap();
        let predicates: Vec<&str> = alice_all.iter().map(|f| f.predicate.as_str()).collect();
        assert_eq!(predicates.len(), 2);
        assert!(predicates.contains(&"p1"));
        assert!(predicates.contains(&"p2"));
        assert!(
            !predicates.contains(&"p3"),
            "All variant must not leak across users"
        );
    }

    #[test]
    fn upsert_fact_dedup_does_not_cross_users() {
        // Two users insert the SAME claim (same subject + predicate +
        // object_value) under the same persona name. Pre-fix, the
        // dedup key was (subject, predicate, persona_id) and bob's
        // insert would corroborate alice's row instead of creating a
        // new one. Post-fix the key includes user_id, so each user
        // gets their own row at confidence=0.6.
        let conn = connection_with_fks_on();
        let alice = create_user(&conn, "alice");
        let bob = create_user(&conn, "bob");
        create_persona_row(&conn, alice, "default");
        create_persona_row(&conn, bob, "default");

        let mut dao = SqliteKnowledgeDao::from_connection(conn.clone());
        let entity = make_entity(&mut dao, "Cabin");

        let alice_fact = add_fact(&mut dao, entity.id, "color", "red", alice, "default");
        let bob_fact = add_fact(&mut dao, entity.id, "color", "red", bob, "default");

        assert_ne!(alice_fact.id, bob_fact.id, "must be separate rows");
        assert_eq!(alice_fact.confidence, 0.6);
        assert_eq!(
            bob_fact.confidence, 0.6,
            "bob's row should not have been corroboration-bumped against alice's"
        );
    }

    #[test]
    fn deleting_persona_cascades_only_that_users_facts() {
        // Composite FK + CASCADE: deleting alice's 'journal' persona
        // wipes alice's journal facts but leaves alice's default
        // facts AND bob's journal-named facts untouched.
        let cx = opentelemetry::Context::new();
        let conn = connection_with_fks_on();
        let alice = create_user(&conn, "alice");
        let bob = create_user(&conn, "bob");
        create_persona_row(&conn, alice, "default");
        create_persona_row(&conn, alice, "journal");
        create_persona_row(&conn, bob, "journal");

        let mut dao = SqliteKnowledgeDao::from_connection(conn.clone());
        let entity = make_entity(&mut dao, "Cabin");

        add_fact(
            &mut dao,
            entity.id,
            "p_alice_default",
            "x",
            alice,
            "default",
        );
        add_fact(
            &mut dao,
            entity.id,
            "p_alice_journal",
            "y",
            alice,
            "journal",
        );
        add_fact(&mut dao, entity.id, "p_bob_journal", "z", bob, "journal");

        // Delete alice's journal persona — CASCADE should remove only
        // alice's journal facts.
        {
            use crate::database::schema::personas::dsl as p;
            let mut c = conn.lock().unwrap();
            diesel::delete(
                p::personas
                    .filter(p::user_id.eq(alice))
                    .filter(p::persona_id.eq("journal")),
            )
            .execute(c.deref_mut())
            .unwrap();
        }

        // alice/default survives.
        let alice_default = dao
            .get_facts_for_entity(
                &cx,
                entity.id,
                &PersonaFilter::Single {
                    user_id: alice,
                    persona_id: "default".to_string(),
                },
            )
            .unwrap();
        assert_eq!(alice_default.len(), 1);
        assert_eq!(alice_default[0].predicate, "p_alice_default");

        // alice/journal is gone.
        let alice_journal = dao
            .get_facts_for_entity(
                &cx,
                entity.id,
                &PersonaFilter::Single {
                    user_id: alice,
                    persona_id: "journal".to_string(),
                },
            )
            .unwrap();
        assert!(
            alice_journal.is_empty(),
            "CASCADE should have removed alice's journal facts"
        );

        // bob/journal — same persona name, different user — untouched.
        let bob_journal = dao
            .get_facts_for_entity(
                &cx,
                entity.id,
                &PersonaFilter::Single {
                    user_id: bob,
                    persona_id: "journal".to_string(),
                },
            )
            .unwrap();
        assert_eq!(bob_journal.len(), 1);
        assert_eq!(bob_journal[0].predicate, "p_bob_journal");
    }

    #[test]
    fn fact_insert_with_unknown_persona_is_rejected() {
        // FK enforcement: inserting a fact whose (user_id, persona_id)
        // pair has no matching personas row should fail. Protects
        // against typo'd persona ids silently leaking into the table.
        let cx = opentelemetry::Context::new();
        let conn = connection_with_fks_on();
        let alice = create_user(&conn, "alice");
        // Note: NO persona row inserted for alice + 'ghost'.

        let mut dao = SqliteKnowledgeDao::from_connection(conn.clone());
        let entity = make_entity(&mut dao, "Cabin");

        let result = dao.upsert_fact(
            &cx,
            InsertEntityFact {
                subject_entity_id: entity.id,
                predicate: "color".to_string(),
                object_entity_id: None,
                object_value: Some("red".to_string()),
                source_photo: None,
                source_insight_id: None,
                confidence: 0.6,
                status: "active".to_string(),
                created_at: 0,
                persona_id: "ghost".to_string(),
                user_id: alice,
                valid_from: None,
                valid_until: None,
                superseded_by: None,
                created_by_model: None,
                created_by_backend: None,
                last_modified_by_model: None,
                last_modified_by_backend: None,
                last_modified_at: None,
            },
        );
        assert!(
            result.is_err(),
            "FK should reject fact whose persona doesn't exist"
        );
    }

    #[test]
    fn supersede_fact_links_and_stamps_valid_until() {
        // Supersession: marking an old fact as replaced by a new one
        // flips its status to 'superseded', points superseded_by at
        // the new fact, and stamps valid_until from the new fact's
        // valid_from (when not already set). The "already set" case is
        // covered by supersede_fact_respects_existing_valid_until.
        let cx = opentelemetry::Context::new();
        let conn = connection_with_fks_on();
        let alice = create_user(&conn, "alice");
        create_persona_row(&conn, alice, "default");

        let mut dao = SqliteKnowledgeDao::from_connection(conn.clone());
        let cameron = make_entity(&mut dao, "Cameron");
        let old = add_fact(
            &mut dao,
            cameron.id,
            "is_in_relationship_with",
            "X",
            alice,
            "default",
        );
        // The new fact carries a valid_from we expect to be stamped
        // onto the old fact's valid_until.
        let new = add_fact(
            &mut dao,
            cameron.id,
            "is_in_relationship_with",
            "Y",
            alice,
            "default",
        );
        dao.update_fact(
            &cx,
            new.id,
            valid_time_patch(Some(Some(1640995200)), None), // 2022-01-01
            None,
        )
        .unwrap();

        let updated = dao
            .supersede_fact(&cx, old.id, new.id, None)
            .unwrap()
            .expect("supersede returned None");

        assert_eq!(updated.status, "superseded");
        assert_eq!(updated.superseded_by, Some(new.id));
        assert_eq!(updated.valid_until, Some(1640995200));
    }

    #[test]
    fn supersede_fact_respects_existing_valid_until() {
        // A hand-curated valid_until on the old fact wins over the new
        // fact's valid_from: supersession only stamps the bound when the
        // old fact doesn't already have one.
        let cx = opentelemetry::Context::new();
        let conn = connection_with_fks_on();
        let alice = create_user(&conn, "alice");
        create_persona_row(&conn, alice, "default");

        let mut dao = SqliteKnowledgeDao::from_connection(conn.clone());
        let cameron = make_entity(&mut dao, "Cameron");
        let old = add_fact(&mut dao, cameron.id, "lives_in", "NYC", alice, "default");
        let new = add_fact(&mut dao, cameron.id, "lives_in", "SF", alice, "default");

        dao.update_fact(
            &cx,
            old.id,
            valid_time_patch(None, Some(Some(1609459200))), // 2021-01-01
            None,
        )
        .unwrap();
        dao.update_fact(
            &cx,
            new.id,
            valid_time_patch(Some(Some(1640995200)), None), // 2022-01-01
            None,
        )
        .unwrap();

        let updated = dao
            .supersede_fact(&cx, old.id, new.id, None)
            .unwrap()
            .expect("supersede returned None");

        assert_eq!(updated.status, "superseded");
        assert_eq!(updated.superseded_by, Some(new.id));
        assert_eq!(
            updated.valid_until,
            Some(1609459200),
            "pre-existing valid_until must not be overwritten"
        );
    }

    #[test]
    fn revert_supersession_restores_active_fact() {
        // Reverting flips the old fact back to 'active' and clears both
        // the supersession link and the auto-stamped valid_until. Facts
        // that were never superseded come back unchanged, and unknown ids
        // come back as None so the handler can 404.
        let cx = opentelemetry::Context::new();
        let conn = connection_with_fks_on();
        let alice = create_user(&conn, "alice");
        create_persona_row(&conn, alice, "default");

        let mut dao = SqliteKnowledgeDao::from_connection(conn.clone());
        let cameron = make_entity(&mut dao, "Cameron");
        let old = add_fact(&mut dao, cameron.id, "lives_in", "NYC", alice, "default");
        let new = add_fact(&mut dao, cameron.id, "lives_in", "SF", alice, "default");
        dao.update_fact(
            &cx,
            new.id,
            valid_time_patch(Some(Some(1640995200)), None), // 2022-01-01
            None,
        )
        .unwrap();

        let superseded = dao
            .supersede_fact(&cx, old.id, new.id, None)
            .unwrap()
            .expect("supersede returned None");
        assert_eq!(superseded.valid_until, Some(1640995200));

        let reverted = dao
            .revert_supersession(&cx, old.id, None)
            .unwrap()
            .expect("revert returned None for an existing fact");
        assert_eq!(reverted.status, "active");
        assert_eq!(reverted.superseded_by, None);
        assert_eq!(reverted.valid_until, None);

        // The superseding fact itself was never superseded: no-op.
        let untouched = dao
            .revert_supersession(&cx, new.id, None)
            .unwrap()
            .expect("revert returned None for an existing fact");
        assert_eq!(untouched.id, new.id);
        assert_eq!(untouched.status, "active");
        assert_eq!(untouched.valid_from, Some(1640995200));

        assert!(
            dao.revert_supersession(&cx, i32::MAX, None)
                .unwrap()
                .is_none(),
            "unknown fact id should yield None"
        );
    }

    #[test]
    fn delete_fact_clears_dangling_supersession_pointers() {
        // Deleting the newer fact (the superseder) leaves the older
        // fact's superseded_by dangling — the DAO clears it back to
        // NULL in the same transaction so the column never points at
        // a missing row. The old fact's status stays 'superseded'
        // because the historical correction is still meaningful.
        let cx = opentelemetry::Context::new();
        let conn = connection_with_fks_on();
        let alice = create_user(&conn, "alice");
        create_persona_row(&conn, alice, "default");

        let mut dao = SqliteKnowledgeDao::from_connection(conn.clone());
        let cameron = make_entity(&mut dao, "Cameron");
        let old = add_fact(&mut dao, cameron.id, "lives_in", "NYC", alice, "default");
        let new = add_fact(&mut dao, cameron.id, "lives_in", "SF", alice, "default");

        dao.supersede_fact(&cx, old.id, new.id, None)
            .unwrap()
            .unwrap();
        dao.delete_fact(&cx, new.id).unwrap();

        let rehydrated = dao
            .list_facts(
                &cx,
                FactFilter {
                    entity_id: Some(cameron.id),
                    // "all" — the old fact is 'superseded' now, so the
                    // default 'active' scope would skip it.
                    status: Some("all".to_string()),
                    predicate: None,
                    persona: PersonaFilter::Single {
                        user_id: alice,
                        persona_id: "default".to_string(),
                    },
                    limit: 10,
                    offset: 0,
                },
            )
            .unwrap()
            .0;
        let old_row = rehydrated.iter().find(|f| f.id == old.id).unwrap();
        assert_eq!(
            old_row.superseded_by, None,
            "dangling supersession pointer should be cleared"
        );
        assert_eq!(
            old_row.status, "superseded",
            "historical status should survive the superseder's deletion"
        );
    }

    #[test]
    fn update_fact_can_set_and_clear_valid_time() {
        // FactPatch.valid_from / valid_until are Option<Option<i64>>
        // so PATCH can distinguish "leave alone" (None) from "set to
        // value" (Some(Some(n))) and "clear back to NULL" (Some(None)).
        let cx = opentelemetry::Context::new();
        let conn = connection_with_fks_on();
        let alice = create_user(&conn, "alice");
        create_persona_row(&conn, alice, "default");

        let mut dao = SqliteKnowledgeDao::from_connection(conn.clone());
        let cameron = make_entity(&mut dao, "Cameron");
        let fact = add_fact(
            &mut dao,
            cameron.id,
            "is_in_relationship_with",
            "Alex",
            alice,
            "default",
        );
        assert_eq!(fact.valid_from, None);
        assert_eq!(fact.valid_until, None);

        // Set both bounds: 2020-01-01 .. 2022-01-01.
        let updated = dao
            .update_fact(
                &cx,
                fact.id,
                valid_time_patch(Some(Some(1577836800)), Some(Some(1640995200))),
                None,
            )
            .unwrap()
            .unwrap();
        assert_eq!(updated.valid_from, Some(1577836800));
        assert_eq!(updated.valid_until, Some(1640995200));

        // Leave alone: omit both — values persist.
        let still = dao
            .update_fact(&cx, fact.id, valid_time_patch(None, None), None)
            .unwrap()
            .unwrap();
        assert_eq!(still.valid_from, Some(1577836800));
        assert_eq!(still.valid_until, Some(1640995200));

        // Clear valid_until back to NULL (relationship ongoing again).
        let cleared = dao
            .update_fact(&cx, fact.id, valid_time_patch(None, Some(None)), None)
            .unwrap()
            .unwrap();
        assert_eq!(cleared.valid_from, Some(1577836800));
        assert_eq!(cleared.valid_until, None);
    }

    #[test]
    fn delete_entity_clears_relational_facts_that_would_violate_check() {
        // entity_facts has a CHECK that at least one of object_entity_id /
        // object_value is non-null. The FK on object_entity_id is
        // ON DELETE SET NULL, which would leave purely-relational facts
        // (subject + predicate + object_entity_id, no object_value)
        // with both nulls and abort the delete. The DAO pre-deletes
        // those rows in a transaction so the parent delete can succeed.
        let cx = opentelemetry::Context::new();
        let conn = connection_with_fks_on();
        let alice = create_user(&conn, "alice");
        create_persona_row(&conn, alice, "default");

        let mut dao = SqliteKnowledgeDao::from_connection(conn.clone());
        let bob = make_entity(&mut dao, "Bob");
        let carol = make_entity(&mut dao, "Carol");

        // A relational fact where Carol is the object — exactly the
        // shape the CHECK + SET NULL combination would otherwise break.
        let (rel_fact, _) = dao
            .upsert_fact(
                &cx,
                InsertEntityFact {
                    subject_entity_id: bob.id,
                    predicate: "is_friend_of".to_string(),
                    object_entity_id: Some(carol.id),
                    object_value: None,
                    source_photo: None,
                    source_insight_id: None,
                    confidence: 0.6,
                    status: "active".to_string(),
                    created_at: 0,
                    persona_id: "default".to_string(),
                    user_id: alice,
                    valid_from: None,
                    valid_until: None,
                    superseded_by: None,
                    created_by_model: None,
                    created_by_backend: None,
                    last_modified_by_model: None,
                    last_modified_by_backend: None,
                    last_modified_at: None,
                },
            )
            .unwrap();

        // A typed fact where Bob is the subject — should survive.
        add_fact(&mut dao, bob.id, "has_age", "30", alice, "default");

        // Delete Carol — should succeed (relational fact pre-deleted).
        dao.delete_entity(&cx, carol.id).unwrap();

        assert!(
            dao.get_entity_by_id(&cx, carol.id).unwrap().is_none(),
            "Carol should be deleted"
        );
        // The relational fact about Carol should be gone (pre-deleted by
        // the DAO's transaction, not SET NULL'd).
        let bob_facts = dao
            .get_facts_for_entity(
                &cx,
                bob.id,
                &PersonaFilter::Single {
                    user_id: alice,
                    persona_id: "default".to_string(),
                },
            )
            .unwrap();
        assert!(
            !bob_facts.iter().any(|f| f.id == rel_fact.id),
            "relational fact pointing at Carol should be removed"
        );
        // The typed fact survives.
        assert!(
            bob_facts.iter().any(|f| f.predicate == "has_age"),
            "typed fact about Bob should survive Carol's deletion"
        );
    }

    #[test]
    fn upsert_entity_collapses_near_duplicate_by_embedding() {
        // The agent's pre-flight check uses FTS5 prefix tokens, which
        // miss "Sarah" / "Sara" / "Sarah J." pairs. The DAO upsert is
        // the safety net: if no exact (name, type) match but the new
        // entity's embedding sits above the cosine threshold against an
        // existing same-type entity, we collapse instead of inserting.
        let cx = opentelemetry::Context::new();
        let conn = connection_with_fks_on();
        let mut dao = SqliteKnowledgeDao::from_connection(conn.clone());

        let mut emb_a = vec![0.0_f32; 64];
        emb_a[0] = 1.0;
        emb_a[1] = 0.5;
        let mut emb_b_near = emb_a.clone();
        emb_b_near[2] = 0.05; // nudge — cosine still well above 0.92

        // Seed an existing entity with the embedding.
        let seeded = dao
            .upsert_entity(
                &cx,
                InsertEntity {
                    name: "Sarah".to_string(),
                    entity_type: "person".to_string(),
                    description: "tagged friend".to_string(),
                    embedding: Some(SqliteKnowledgeDao::serialize_embedding(&emb_a)),
                    confidence: 0.6,
                    status: "active".to_string(),
                    created_at: 0,
                    updated_at: 0,
                },
            )
            .unwrap();

        // A "different name" with a near-identical embedding should
        // collapse onto the existing row, not create a new entity.
        let collapsed = dao
            .upsert_entity(
                &cx,
                InsertEntity {
                    name: "Sara".to_string(),
                    entity_type: "person".to_string(),
                    description: "tagged friend".to_string(),
                    embedding: Some(SqliteKnowledgeDao::serialize_embedding(&emb_b_near)),
                    confidence: 0.6,
                    status: "active".to_string(),
                    created_at: 0,
                    updated_at: 0,
                },
            )
            .unwrap();

        assert_eq!(
            collapsed.id, seeded.id,
            "near-duplicate by cosine should reuse the existing entity id"
        );

        // And a clearly-different embedding under a different name should
        // still create a new row.
        let mut emb_unrelated = vec![0.0_f32; 64];
        emb_unrelated[10] = 1.0;
        let distinct = dao
            .upsert_entity(
                &cx,
                InsertEntity {
                    name: "Bob".to_string(),
                    entity_type: "person".to_string(),
                    description: String::new(),
                    embedding: Some(SqliteKnowledgeDao::serialize_embedding(&emb_unrelated)),
                    confidence: 0.6,
                    status: "active".to_string(),
                    created_at: 0,
                    updated_at: 0,
                },
            )
            .unwrap();

        assert_ne!(
            distinct.id, seeded.id,
            "unrelated embedding should not collapse"
        );
    }
}
|