diff --git a/migrations/2026-05-10-000100_entity_facts_valid_time/down.sql b/migrations/2026-05-10-000100_entity_facts_valid_time/down.sql new file mode 100644 index 0000000..63bc68d --- /dev/null +++ b/migrations/2026-05-10-000100_entity_facts_valid_time/down.sql @@ -0,0 +1,5 @@ +-- SQLite can drop columns since 3.35 (March 2021); embedded +-- libsqlite3-sys is well past that. Drop in reverse insert order so +-- a partial down still leaves the schema valid. +ALTER TABLE entity_facts DROP COLUMN valid_until; +ALTER TABLE entity_facts DROP COLUMN valid_from; diff --git a/migrations/2026-05-10-000100_entity_facts_valid_time/up.sql b/migrations/2026-05-10-000100_entity_facts_valid_time/up.sql new file mode 100644 index 0000000..4f0bb01 --- /dev/null +++ b/migrations/2026-05-10-000100_entity_facts_valid_time/up.sql @@ -0,0 +1,25 @@ +-- Add valid-time columns to entity_facts. +-- +-- entity_facts already has created_at — *transaction time*, the +-- moment WE recorded the fact. That's not the same as the real-world +-- period the fact was true. "Cameron is_in_relationship_with X" was +-- only true during a window; recording it in 2026 doesn't make it +-- true today. Without the distinction, every former relationship, +-- former job, former address reads as currently-true. +-- +-- Adding two BIGINT NULL columns: valid_from / valid_until (unix +-- seconds). NULL means "unbounded on that side" — `valid_from IS +-- NULL` reads as "always-true-back-to-the-beginning", +-- `valid_until IS NULL` as "still-true-now-or-unknown". Both NULL = +-- temporal validity unknown (current state of all legacy rows). +-- +-- Conflict detection refines accordingly: same-predicate facts with +-- different objects stop flagging when their intervals are disjoint +-- ("lives_in NYC 2018-2020" and "lives_in SF 2020-present" are both +-- valid, just at different times). 
+
+ALTER TABLE entity_facts ADD COLUMN valid_from BIGINT;
+ALTER TABLE entity_facts ADD COLUMN valid_until BIGINT;
+
+-- Optional partial index for time-bounded scans. Skipped for now —
+-- conflict detection runs per-entity (small N) and doesn't need it.
diff --git a/migrations/2026-05-10-000200_entity_facts_superseded_by/down.sql b/migrations/2026-05-10-000200_entity_facts_superseded_by/down.sql
new file mode 100644
index 0000000..1e09629
--- /dev/null
+++ b/migrations/2026-05-10-000200_entity_facts_superseded_by/down.sql
@@ -0,0 +1,2 @@
+DROP INDEX IF EXISTS idx_entity_facts_superseded_by;
+ALTER TABLE entity_facts DROP COLUMN superseded_by;
diff --git a/migrations/2026-05-10-000200_entity_facts_superseded_by/up.sql b/migrations/2026-05-10-000200_entity_facts_superseded_by/up.sql
new file mode 100644
index 0000000..be04ae0
--- /dev/null
+++ b/migrations/2026-05-10-000200_entity_facts_superseded_by/up.sql
@@ -0,0 +1,31 @@
+-- Add a supersession pointer to entity_facts.
+--
+-- Status alone is a one-way trapdoor: 'rejected' loses the link
+-- between the rejected fact and the one that replaced it. For
+-- evolving facts (Cameron's relationship, employer, address) the
+-- curator wants to *replace* a stale fact with a new one and keep
+-- the history readable: "from 2018 until 2022 this was true, then
+-- it became this other thing".
+--
+-- A nullable INTEGER column pointing at another entity_facts.id with
+-- no FK constraint — ADD COLUMN does allow REFERENCES (NULL default
+-- required), but integrity stays app-side: delete_fact clears dangling
+-- pointers in the same transaction as the parent delete.
+--
+-- A status of 'superseded' on the old fact (alongside the existing
+-- active / reviewed / rejected) signals "replaced by a newer
+-- claim". Read paths already filter 'rejected' out of the active
+-- view; the curation UI will treat 'superseded' the same way for
+-- conflict detection so they don't keep flagging.
+-- +-- Pairs with the valid-time columns from 2026-05-10-000100: the +-- supersede action auto-stamps the old fact's `valid_until` from +-- the new fact's `valid_from`, closing the interval cleanly. + +ALTER TABLE entity_facts ADD COLUMN superseded_by INTEGER; + +-- Helpful index for "show me what superseded this fact" walks +-- (rare today; cheap to add now while the table is small). +CREATE INDEX idx_entity_facts_superseded_by + ON entity_facts(superseded_by) + WHERE superseded_by IS NOT NULL; diff --git a/migrations/2026-05-10-000300_entity_facts_provenance/down.sql b/migrations/2026-05-10-000300_entity_facts_provenance/down.sql new file mode 100644 index 0000000..b0ab263 --- /dev/null +++ b/migrations/2026-05-10-000300_entity_facts_provenance/down.sql @@ -0,0 +1,4 @@ +DROP INDEX IF EXISTS idx_entity_facts_created_by_backend; +DROP INDEX IF EXISTS idx_entity_facts_created_by_model; +ALTER TABLE entity_facts DROP COLUMN created_by_backend; +ALTER TABLE entity_facts DROP COLUMN created_by_model; diff --git a/migrations/2026-05-10-000300_entity_facts_provenance/up.sql b/migrations/2026-05-10-000300_entity_facts_provenance/up.sql new file mode 100644 index 0000000..41db18b --- /dev/null +++ b/migrations/2026-05-10-000300_entity_facts_provenance/up.sql @@ -0,0 +1,30 @@ +-- Track which model + backend generated each fact so the curator +-- can audit which configurations produce trustworthy knowledge. +-- +-- photo_insights already carries `model_version` + `backend`, and +-- entity_facts.source_insight_id links to it — but: +-- 1. source_insight_id is only set after an insight is stored +-- (post-loop), so chat-continuation facts and facts whose insight +-- was regenerated lose the link. +-- 2. JOINing for every read is more friction than just embedding the +-- provenance on the fact row itself. +-- 3. Manual facts (POST /knowledge/facts) have no insight at all and +-- need to record "manual" as their provenance. 
+-- +-- Two nullable TEXT columns are enough for the audit use case: model +-- (e.g. "qwen2.5:7b", "anthropic/claude-sonnet-4") and backend +-- ("local", "hybrid", "manual"). Pre-existing rows leave both NULL — +-- legacy facts predate this tracking and can't be back-filled +-- reliably from training_messages without burning compute. + +ALTER TABLE entity_facts ADD COLUMN created_by_model TEXT; +ALTER TABLE entity_facts ADD COLUMN created_by_backend TEXT; + +-- Indexes are cheap and useful for "show me all facts from model X" +-- audit queries — partial so the legacy NULL rows don't bloat them. +CREATE INDEX idx_entity_facts_created_by_model + ON entity_facts(created_by_model) + WHERE created_by_model IS NOT NULL; +CREATE INDEX idx_entity_facts_created_by_backend + ON entity_facts(created_by_backend) + WHERE created_by_backend IS NOT NULL; diff --git a/migrations/2026-05-10-000400_personas_reviewed_only/down.sql b/migrations/2026-05-10-000400_personas_reviewed_only/down.sql new file mode 100644 index 0000000..46af3cf --- /dev/null +++ b/migrations/2026-05-10-000400_personas_reviewed_only/down.sql @@ -0,0 +1 @@ +ALTER TABLE personas DROP COLUMN reviewed_only_facts; diff --git a/migrations/2026-05-10-000400_personas_reviewed_only/up.sql b/migrations/2026-05-10-000400_personas_reviewed_only/up.sql new file mode 100644 index 0000000..0d8302e --- /dev/null +++ b/migrations/2026-05-10-000400_personas_reviewed_only/up.sql @@ -0,0 +1,16 @@ +-- Per-persona toggle: when true, agent reads only see facts whose +-- status is exactly 'reviewed' (human-verified). When false (the +-- default), agent reads see 'active' OR 'reviewed' — everything not +-- rejected or superseded. +-- +-- The mobile app surfaces this as "Strict mode" on the persona +-- editor: useful when you want a persona's chat to be grounded +-- exclusively on the curated subset, e.g. for tasks where +-- hallucinated agent claims are particularly costly. 
+-- +-- Note: this is separate from `include_all_memories` (which unions +-- across personas for hive-mind reads). Reviewed-only operates on +-- the status axis; include_all_memories operates on the persona- +-- scope axis. They compose freely. + +ALTER TABLE personas ADD COLUMN reviewed_only_facts BOOLEAN NOT NULL DEFAULT 0; diff --git a/migrations/2026-05-10-000500_entity_facts_audit_and_agent_gate/down.sql b/migrations/2026-05-10-000500_entity_facts_audit_and_agent_gate/down.sql new file mode 100644 index 0000000..0d83de6 --- /dev/null +++ b/migrations/2026-05-10-000500_entity_facts_audit_and_agent_gate/down.sql @@ -0,0 +1,5 @@ +ALTER TABLE personas DROP COLUMN allow_agent_corrections; +DROP INDEX IF EXISTS idx_entity_facts_last_modified_at; +ALTER TABLE entity_facts DROP COLUMN last_modified_at; +ALTER TABLE entity_facts DROP COLUMN last_modified_by_backend; +ALTER TABLE entity_facts DROP COLUMN last_modified_by_model; diff --git a/migrations/2026-05-10-000500_entity_facts_audit_and_agent_gate/up.sql b/migrations/2026-05-10-000500_entity_facts_audit_and_agent_gate/up.sql new file mode 100644 index 0000000..1951131 --- /dev/null +++ b/migrations/2026-05-10-000500_entity_facts_audit_and_agent_gate/up.sql @@ -0,0 +1,30 @@ +-- Three coupled changes for agent self-correction safety: +-- +-- 1. `entity_facts.last_modified_by_*` + `last_modified_at` track who +-- most recently mutated each fact. `created_by_*` from migration +-- 2026-05-10-000300 records who first wrote the row; this records +-- who last *changed* it. Separate columns so the create vs update +-- audit is independently grep-able ("show me every fact gpt-5 +-- altered last week" stays a single index scan). +-- +-- 2. `personas.allow_agent_corrections` is the gate for the new +-- agent-side `update_fact` / `supersede_fact` tools. Default OFF — +-- a fresh persona's agent can create but can't alter or replace. 
+-- Operator opts in per-persona after the model has earned trust, +-- typically via the strict-mode flow (curate, then ratchet up +-- agent autonomy as confidence rises). Parallel in shape to +-- `reviewed_only_facts` from 2026-05-10-000400; they compose. +-- +-- 3. Index on `last_modified_at` (partial, NOT NULL) for the +-- audit-feed reads in the curation UI ("show recent agent edits +-- sorted newest first"). + +ALTER TABLE entity_facts ADD COLUMN last_modified_by_model TEXT; +ALTER TABLE entity_facts ADD COLUMN last_modified_by_backend TEXT; +ALTER TABLE entity_facts ADD COLUMN last_modified_at BIGINT; + +CREATE INDEX idx_entity_facts_last_modified_at + ON entity_facts(last_modified_at) + WHERE last_modified_at IS NOT NULL; + +ALTER TABLE personas ADD COLUMN allow_agent_corrections BOOLEAN NOT NULL DEFAULT 0; diff --git a/migrations/2026-05-11-000000_normalize_entity_types/down.sql b/migrations/2026-05-11-000000_normalize_entity_types/down.sql new file mode 100644 index 0000000..4c1a2f2 --- /dev/null +++ b/migrations/2026-05-11-000000_normalize_entity_types/down.sql @@ -0,0 +1,6 @@ +-- Irreversible: we collapsed multiple raw entity_type strings to +-- canonical forms and don't have a per-row record of the original. +-- The down migration is intentionally a no-op (the rewritten values +-- are still semantically correct), and the up migration is safe to +-- re-run because every UPDATE is conditional on `!= canonical`. +SELECT 1; diff --git a/migrations/2026-05-11-000000_normalize_entity_types/up.sql b/migrations/2026-05-11-000000_normalize_entity_types/up.sql new file mode 100644 index 0000000..def6ab4 --- /dev/null +++ b/migrations/2026-05-11-000000_normalize_entity_types/up.sql @@ -0,0 +1,43 @@ +-- Canonicalize `entities.entity_type` so legacy rows from before +-- `normalize_entity_type` landed in upsert_entity stop polluting +-- client-side filters. 
Mirrors the synonym map in +-- `src/database/knowledge_dao.rs::normalize_entity_type`: +-- person ← person | people | human | individual | contact +-- place ← place | location | venue | site | area | landmark +-- event ← event | occasion | activity | celebration +-- thing ← thing | object | item | product +-- Types outside the synonym set (e.g. "friend", "family") are not +-- recognized as canonical and get a lowercase+trim pass instead, so +-- at minimum case variants collapse. +-- +-- `UPDATE OR IGNORE` skips rows that would violate UNIQUE(name, +-- entity_type) after the rewrite. Two rows like ("Sarah", "person") +-- + ("Sarah", "Person") would otherwise collide — the duplicate +-- survives unchanged so the curator can merge it via the curation +-- UI rather than have the migration silently delete data. + +UPDATE OR IGNORE entities +SET entity_type = 'person' +WHERE LOWER(TRIM(entity_type)) IN ('person', 'people', 'human', 'individual', 'contact') + AND entity_type != 'person'; + +UPDATE OR IGNORE entities +SET entity_type = 'place' +WHERE LOWER(TRIM(entity_type)) IN ('place', 'location', 'venue', 'site', 'area', 'landmark') + AND entity_type != 'place'; + +UPDATE OR IGNORE entities +SET entity_type = 'event' +WHERE LOWER(TRIM(entity_type)) IN ('event', 'occasion', 'activity', 'celebration') + AND entity_type != 'event'; + +UPDATE OR IGNORE entities +SET entity_type = 'thing' +WHERE LOWER(TRIM(entity_type)) IN ('thing', 'object', 'item', 'product') + AND entity_type != 'thing'; + +-- Anything left ("Friend" vs "friend") gets a lowercase+trim sweep +-- so at least case variants of the same custom type collapse. 
+UPDATE OR IGNORE entities +SET entity_type = LOWER(TRIM(entity_type)) +WHERE entity_type != LOWER(TRIM(entity_type)); diff --git a/src/ai/handlers.rs b/src/ai/handlers.rs index f6189ba..0e46057 100644 --- a/src/ai/handlers.rs +++ b/src/ai/handlers.rs @@ -885,10 +885,7 @@ pub async fn chat_history_handler( .flatten() .unwrap_or_else(|| app_state.primary_library()); - match app_state - .insight_chat - .load_history(library.id, &query.path) - { + match app_state.insight_chat.load_history(library.id, &query.path) { Ok(view) => HttpResponse::Ok().json(ChatHistoryHttpResponse { messages: view .messages diff --git a/src/ai/insight_chat.rs b/src/ai/insight_chat.rs index 5a0e3f9..adb2da4 100644 --- a/src/ai/insight_chat.rs +++ b/src/ai/insight_chat.rs @@ -405,7 +405,10 @@ impl InsightChatService { // and probes the per-table presence flags. Pass `offer_describe_tool` // directly — the `!is_hybrid && local_first_user_has_image` decision // is the chat-path's vision predicate. - let gate_opts = self.generator.current_gate_opts(offer_describe_tool); + let gate_opts = self.generator.current_gate_opts_for_persona( + offer_describe_tool, + Some((req.user_id, &active_persona)), + ); let tools = InsightGenerator::build_tool_definitions(gate_opts); // Image base64 only needed when describe_photo is on the menu. 
Load @@ -497,6 +500,8 @@ impl InsightChatService { &normalized, req.user_id, &active_persona, + &model_used, + &effective_backend, &loop_cx, ) .await; @@ -835,7 +840,10 @@ impl InsightChatService { .map(|imgs| !imgs.is_empty()) .unwrap_or(false); let offer_describe_tool = !is_hybrid && local_first_user_has_image; - let gate_opts = self.generator.current_gate_opts(offer_describe_tool); + let gate_opts = self.generator.current_gate_opts_for_persona( + offer_describe_tool, + Some((req.user_id, &active_persona)), + ); let tools = InsightGenerator::build_tool_definitions(gate_opts); let image_base64: Option = if offer_describe_tool { @@ -870,6 +878,8 @@ impl InsightChatService { &normalized, req.user_id, &active_persona, + &model_used, + &effective_backend, max_iterations, &tx, ) @@ -1022,7 +1032,10 @@ impl InsightChatService { // the chat model can re-look at the photo on demand. Hybrid: // already inlined, no tool needed. let offer_describe_tool = !is_hybrid && image_base64.is_some(); - let gate_opts = self.generator.current_gate_opts(offer_describe_tool); + let gate_opts = self.generator.current_gate_opts_for_persona( + offer_describe_tool, + Some((req.user_id, &active_persona)), + ); let tools = InsightGenerator::build_tool_definitions(gate_opts); // System message = persona + photo context block. Photo context @@ -1059,6 +1072,8 @@ impl InsightChatService { &normalized, req.user_id, &active_persona, + &model_used, + &effective_backend, max_iterations, &tx, ) @@ -1210,6 +1225,10 @@ impl InsightChatService { normalized: &str, user_id: i32, active_persona: &str, + // Provenance — stamped onto any store_fact tool call made + // during this loop. Mirrors the non-streaming chat path. 
+ model_used: &str, + effective_backend: &str, max_iterations: usize, tx: &tokio::sync::mpsc::Sender, ) -> Result { @@ -1290,6 +1309,8 @@ impl InsightChatService { normalized, user_id, active_persona, + model_used, + effective_backend, &cx, ) .await; diff --git a/src/ai/insight_generator.rs b/src/ai/insight_generator.rs index 1e39f9a..3617261 100644 --- a/src/ai/insight_generator.rs +++ b/src/ai/insight_generator.rs @@ -13,7 +13,7 @@ use crate::ai::apollo_client::{ApolloClient, ApolloPlace}; use crate::ai::llm_client::LlmClient; use crate::ai::ollama::{ChatMessage, OllamaClient, Tool}; use crate::ai::openrouter::OpenRouterClient; -use crate::ai::sms_client::SmsApiClient; +use crate::ai::sms_client::{SmsApiClient, SmsSearchHit, SmsSearchParams}; use crate::ai::user_display_name; use crate::database::models::InsertPhotoInsight; use crate::database::{ @@ -89,6 +89,10 @@ pub struct InsightGenerator { // Knowledge memory knowledge_dao: Arc>>, + // Persona settings (looked up by recall_facts_for_photo to honour + // the per-persona "reviewed-only" strict-mode toggle). + persona_dao: Arc>>, + libraries: Vec, } @@ -104,6 +108,12 @@ pub struct ToolGateOpts { pub calendar_present: bool, pub location_history_present: bool, pub faces_present: bool, + /// Per-persona toggle from migration 2026-05-10-000500. When + /// false the agent's update_fact / supersede_fact tools aren't + /// in the catalog at all — defense-in-depth so a hallucinated + /// tool name still 404s, and the agent doesn't waste iterations + /// trying corrections it isn't allowed to do. 
+ pub allow_agent_corrections: bool, } impl InsightGenerator { @@ -121,6 +131,7 @@ impl InsightGenerator { tag_dao: Arc>>, face_dao: Arc>>, knowledge_dao: Arc>>, + persona_dao: Arc>>, libraries: Vec, ) -> Self { Self { @@ -137,6 +148,7 @@ impl InsightGenerator { tag_dao, face_dao, knowledge_dao, + persona_dao, libraries, } } @@ -158,6 +170,20 @@ impl InsightGenerator { /// supplied by the caller because it depends on the model selected /// for this turn, not on persistent state. pub fn current_gate_opts(&self, has_vision: bool) -> ToolGateOpts { + self.current_gate_opts_for_persona(has_vision, None) + } + + /// Same as `current_gate_opts` but resolves the per-persona + /// `allow_agent_corrections` flag too. Pass `Some((user_id, + /// persona_id))` when generating in a persona context (every chat + /// turn does); pass `None` for callers that don't have one yet + /// (cold paths, populate_knowledge bin), which defaults the gate + /// to closed — the conservative posture. + pub fn current_gate_opts_for_persona( + &self, + has_vision: bool, + persona: Option<(i32, &str)>, + ) -> ToolGateOpts { let cx = opentelemetry::Context::new(); let calendar_present = { let mut dao = self @@ -184,6 +210,13 @@ impl InsightGenerator { let mut dao = self.face_dao.lock().expect("Unable to lock FaceDao"); dao.has_any_faces(&cx).unwrap_or(false) }; + let allow_agent_corrections = persona + .and_then(|(uid, pid)| { + let mut pdao = self.persona_dao.lock().expect("Unable to lock PersonaDao"); + pdao.get_persona(&cx, uid, pid).ok().flatten() + }) + .map(|p| p.allow_agent_corrections) + .unwrap_or(false); ToolGateOpts { has_vision, apollo_enabled: self.apollo_enabled(), @@ -191,6 +224,7 @@ impl InsightGenerator { calendar_present, location_history_present, faces_present, + allow_agent_corrections, } } @@ -1554,6 +1588,13 @@ Return ONLY the summary, nothing else."#, file_path: &str, user_id: i32, persona_id: &str, + // Provenance — written into entity_facts.created_by_* when + // the loop 
calls store_fact. The caller knows the actual + // chat-runtime model and backend (which may differ from + // ollama.primary_model in hybrid mode where chat lives on + // OpenRouter while Ollama still handles vision). + model: &str, + backend: &str, cx: &opentelemetry::Context, ) -> String { let result = match tool_name { @@ -1574,7 +1615,17 @@ Return ONLY the summary, nothing else."#, } "store_entity" => self.tool_store_entity(arguments, ollama, cx).await, "store_fact" => { - self.tool_store_fact(arguments, file_path, user_id, persona_id, cx) + self.tool_store_fact( + arguments, file_path, user_id, persona_id, model, backend, cx, + ) + .await + } + "update_fact" => { + self.tool_update_fact(arguments, user_id, persona_id, model, backend, cx) + .await + } + "supersede_fact" => { + self.tool_supersede_fact(arguments, user_id, persona_id, model, backend, cx) .await } "get_current_datetime" => Self::tool_get_current_datetime(), @@ -1807,7 +1858,11 @@ Return ONLY the summary, nothing else."#, /// Tool: search_messages — keyword / semantic / hybrid search over all /// SMS message bodies via the Django FTS5 + embeddings pipeline. Now /// supports optional `contact_id`, `start_ts`, `end_ts` filters. 
- async fn tool_search_messages(&self, args: &serde_json::Value, cx: &opentelemetry::Context) -> String { + async fn tool_search_messages( + &self, + args: &serde_json::Value, + cx: &opentelemetry::Context, + ) -> String { let query = match args.get("query").and_then(|v| v.as_str()) { Some(q) if !q.trim().is_empty() => q.trim(), _ => { @@ -1827,8 +1882,7 @@ Return ONLY the summary, nothing else."#, .map(|s| !s.trim().is_empty()) .unwrap_or(false); let has_numeric_contact = args.get("contact_id").is_some(); - let has_ts_window = - args.get("start_ts").is_some() || args.get("end_ts").is_some(); + let has_ts_window = args.get("start_ts").is_some() || args.get("end_ts").is_some(); if has_date_str && !has_numeric_contact && !has_ts_window { log::info!( "search_messages with `date` and no `query` — routing to get_sms_messages" @@ -1841,8 +1895,7 @@ Return ONLY the summary, nothing else."#, routed ); } - let has_contact_name = - args.get("contact").and_then(|v| v.as_str()).is_some(); + let has_contact_name = args.get("contact").and_then(|v| v.as_str()).is_some(); if has_ts_window || has_numeric_contact || has_contact_name { return "Error: search_messages needs a 'query' (keywords/phrase). \ To fetch messages around a date or from a contact without keywords, \ @@ -1874,70 +1927,62 @@ Return ONLY the summary, nothing else."#, let contact_id = args.get("contact_id").and_then(|v| v.as_i64()); let start_ts = args.get("start_ts").and_then(|v| v.as_i64()); let end_ts = args.get("end_ts").and_then(|v| v.as_i64()); + let is_mms = args.get("is_mms").and_then(|v| v.as_bool()); + let has_media = args.get("has_media").and_then(|v| v.as_bool()); let has_date_filter = start_ts.is_some() || end_ts.is_some(); - // When a date filter is supplied, fetch a larger pool from SMS-API - // so in-window matches that ranked lower than out-of-window ones - // aren't lost. 
- let fetch_limit = if has_date_filter { 100 } else { user_limit }; - log::info!( - "tool_search_messages: query='{}', mode={}, contact_id={:?}, range=[{:?}, {:?}], user_limit={}, fetch_limit={}", + "tool_search_messages: query='{}', mode={}, contact_id={:?}, range=[{:?}, {:?}], is_mms={:?}, has_media={:?}, limit={}", query, mode, contact_id, start_ts, end_ts, - user_limit, - fetch_limit + is_mms, + has_media, + user_limit ); - let hits = match self - .sms_client - .search_messages_with_contact(query, &mode, fetch_limit, contact_id) - .await - { + // SMS-API applies all of date/contact/mms/media filtering and + // pagination server-side, so the response is already exactly the + // page we want — no over-fetch, no client-side post-filter. + let params = SmsSearchParams { + mode: mode.as_str(), + limit: user_limit, + contact_id, + date_from: start_ts, + date_to: end_ts, + is_mms, + has_media, + offset: None, + }; + + let hits = match self.sms_client.search_messages(query, ¶ms).await { Ok(h) => h, Err(e) => return format!("Error searching messages: {}", e), }; - // Date-range post-filter on the client side. SMS-API's /search/ - // doesn't accept date params; mirroring Apollo's pattern here. - let filtered: Vec<_> = hits - .into_iter() - .filter(|h| { - if let Some(s) = start_ts - && h.date < s - { - return false; - } - if let Some(e) = end_ts - && h.date > e - { - return false; - } - true - }) - .take(user_limit) - .collect(); - - if filtered.is_empty() { + if hits.is_empty() { return "No messages matched.".to_string(); } + Self::format_search_hits(&hits, &mode, has_date_filter) + } + + /// Render a list of [`SmsSearchHit`] for the LLM. Prefers the SMS-API + /// snippet (which already excerpts the matched span and is the only + /// preview MMS-attachment-only matches have) over the full body, and + /// strips the `` tags the snippet ships with. 
+ fn format_search_hits(hits: &[SmsSearchHit], mode: &str, date_filtered: bool) -> String { let user_name = user_display_name(); let mut out = String::new(); out.push_str(&format!( "Found {} messages (mode: {}{}):\n\n", - filtered.len(), + hits.len(), mode, - if has_date_filter { - ", date-filtered" - } else { - "" - } + if date_filtered { ", date-filtered" } else { "" } )); - for h in filtered { + for h in hits { let date = chrono::DateTime::from_timestamp(h.date, 0) .map(|dt| dt.format("%Y-%m-%d").to_string()) .unwrap_or_else(|| h.date.to_string()); @@ -1950,14 +1995,25 @@ Return ONLY the summary, nothing else."#, .similarity_score .map(|s| format!(" [score {:.2}]", s)) .unwrap_or_default(); + let preview = match h.snippet.as_deref() { + Some(s) if !s.is_empty() => Self::strip_mark_tags(s), + _ => h.body.clone(), + }; out.push_str(&format!( "[{}]{} {} — {}\n\n", - date, score, direction, h.body + date, score, direction, preview )); } out } + /// Strip the `` / `` highlight tags that SMS-API wraps + /// matched terms in. The tags carry no signal beyond what the query + /// already conveys to the LLM, and leaving them in adds prompt noise. + fn strip_mark_tags(s: &str) -> String { + s.replace("", "").replace("", "") + } + /// Tool: get_sms_messages — fetch SMS messages near a date for a contact async fn tool_get_sms_messages( &self, @@ -2401,9 +2457,15 @@ Return ONLY the summary, nothing else."#, limit ); + // Status scope mirrors recall_facts_for_photo: by default + // include both 'active' (agent-generated, not yet reviewed) + // and 'reviewed' (human-verified). The DAO list_entities only + // takes a single status string, so use 'all' here and filter + // out 'rejected' below — slightly broader but fine for the + // recall use case (entities are global, persona-agnostic). 
let filter = EntityFilter { entity_type, - status: Some("active".to_string()), + status: Some("all".to_string()), search: name_search, limit, offset: 0, @@ -2414,12 +2476,13 @@ Return ONLY the summary, nothing else."#, .lock() .expect("Unable to lock KnowledgeDao"); match kdao.list_entities(cx, filter) { - Ok((entities, _total)) if entities.is_empty() => { + Ok((entities, _total)) if entities.iter().all(|e| e.status == "rejected") => { "No known entities found matching the query.".to_string() } Ok((entities, _total)) => { let lines: Vec = entities .iter() + .filter(|e| e.status != "rejected") .map(|e| { format!( "ID:{} | {} | {} | {} | confidence:{:.2}", @@ -2427,7 +2490,11 @@ Return ONLY the summary, nothing else."#, ) }) .collect(); - format!("Known entities:\n{}", lines.join("\n")) + if lines.is_empty() { + "No known entities found matching the query.".to_string() + } else { + format!("Known entities:\n{}", lines.join("\n")) + } } Err(e) => format!("Error recalling entities: {:?}", e), } @@ -2453,6 +2520,19 @@ Return ONLY the summary, nothing else."#, log::info!("tool_recall_facts_for_photo: file_path={}", file_path); + // Resolve the persona's reviewed-only-mode flag once. If the + // persona row is missing (shouldn't happen — composite FK + // enforces existence on writes), fall back to the permissive + // default of active+reviewed. + let reviewed_only = { + let mut pdao = self.persona_dao.lock().expect("Unable to lock PersonaDao"); + pdao.get_persona(cx, user_id, persona_id) + .ok() + .flatten() + .map(|p| p.reviewed_only_facts) + .unwrap_or(false) + }; + let mut kdao = self .knowledge_dao .lock() @@ -2486,7 +2566,18 @@ Return ONLY the summary, nothing else."#, e.name, e.entity_type, role )); if let Ok(facts) = kdao.get_facts_for_entity(cx, entity_id, &persona_filter) { - for f in facts.iter().filter(|f| f.status == "active") { + // Default scope: active + reviewed (everything not + // rejected / superseded). 
Strict mode trims to + // reviewed only — the persona has opted into seeing + // exclusively human-verified facts. + let allow = |s: &str| -> bool { + if reviewed_only { + s == "reviewed" + } else { + s == "active" || s == "reviewed" + } + }; + for f in facts.iter().filter(|f| allow(&f.status)) { let obj = if let Some(ref v) = f.object_value { v.clone() } else if let Some(oid) = f.object_entity_id { @@ -2632,6 +2723,8 @@ Return ONLY the summary, nothing else."#, file_path: &str, user_id: i32, persona_id: &str, + model: &str, + backend: &str, cx: &opentelemetry::Context, ) -> String { use crate::database::models::{InsertEntityFact, InsertEntityPhotoLink}; @@ -2672,6 +2765,19 @@ Return ONLY the summary, nothing else."#, file_path ); + // Anchor the fact in valid-time using the source photo's + // `date_taken` (Apollo's naive-as-UTC convention is fine + // here — we only care about calendar ordering, not absolute + // UTC). The semantic stretch: a photo *evidences* the fact at + // that date — the fact may have started earlier — so this is + // best read as "no later than this it started being true", + // not a strict lower bound. Still useful: gives the curator a + // calendar anchor and lets supersession (next slice) close + // intervals cleanly when a newer fact arrives. valid_until + // stays NULL — a single photo can't tell us when something + // *stopped* being true. + let valid_from = self.fetch_exif(file_path).and_then(|e| e.date_taken); + let fact = InsertEntityFact { subject_entity_id, predicate, @@ -2684,6 +2790,17 @@ Return ONLY the summary, nothing else."#, created_at: chrono::Utc::now().timestamp(), persona_id: persona_id.to_string(), user_id, + valid_from, + valid_until: None, + superseded_by: None, + created_by_model: Some(model.to_string()), + created_by_backend: Some(backend.to_string()), + // Initial write — no modification yet; last_modified_* + // intentionally NULL so the audit feed only shows real + // post-creation changes. 
+ last_modified_by_model: None, + last_modified_by_backend: None, + last_modified_at: None, }; let mut kdao = self @@ -2723,6 +2840,144 @@ Return ONLY the summary, nothing else."#, ) } + /// Tool: update_fact — patch a fact's mutable fields. Gated by the + /// active persona's `allow_agent_corrections` flag at the schema / + /// catalog layer (build_tool_definitions); rechecked here as a + /// defense in depth in case a hallucinated tool call slips + /// through. + async fn tool_update_fact( + &self, + args: &serde_json::Value, + user_id: i32, + persona_id: &str, + model: &str, + backend: &str, + cx: &opentelemetry::Context, + ) -> String { + use crate::database::FactPatch; + + // Defense-in-depth gate check. + let allowed = { + let mut pdao = self.persona_dao.lock().expect("Unable to lock PersonaDao"); + pdao.get_persona(cx, user_id, persona_id) + .ok() + .flatten() + .map(|p| p.allow_agent_corrections) + .unwrap_or(false) + }; + if !allowed { + return "Error: agent corrections are disabled for this persona. Ask the operator to flip allow_agent_corrections.".to_string(); + } + + let fact_id = match args.get("fact_id").and_then(|v| v.as_i64()) { + Some(id) => id as i32, + None => return "Error: missing required parameter 'fact_id'".to_string(), + }; + + // Build the patch from any fields present on `args`. valid_* + // are tri-state — JSON null → Some(None) → clear back to NULL, + // omitted → None → leave alone, value → Some(Some(value)) → + // set. The match-on-presence pattern below mirrors the HTTP + // PATCH path's serde-helper behaviour. 
+ let parse_optional_i64 = |v: Option<&serde_json::Value>| -> Option> { + v.map(|val| if val.is_null() { None } else { val.as_i64() }) + }; + + let patch = FactPatch { + predicate: args + .get("predicate") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + object_value: args + .get("object_value") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + status: args + .get("status") + .and_then(|v| v.as_str()) + .filter(|s| matches!(*s, "active" | "reviewed" | "rejected")) + .map(|s| s.to_string()), + confidence: args + .get("confidence") + .and_then(|v| v.as_f64()) + .map(|f| f.clamp(0.0, 0.95) as f32), + valid_from: parse_optional_i64(args.get("valid_from")), + valid_until: parse_optional_i64(args.get("valid_until")), + }; + + log::info!("tool_update_fact: fact_id={}", fact_id); + + let mut kdao = self + .knowledge_dao + .lock() + .expect("Unable to lock KnowledgeDao"); + + match kdao.update_fact(cx, fact_id, patch, Some((model, backend))) { + Ok(Some(f)) => format!( + "Updated fact ID:{} (status={}, confidence={:.2})", + f.id, f.status, f.confidence + ), + Ok(None) => format!("Error: fact ID:{} not found", fact_id), + Err(e) => format!("Error updating fact: {:?}", e), + } + } + + /// Tool: supersede_fact — replace one fact with another. Same + /// gating as update_fact. 
+ async fn tool_supersede_fact( + &self, + args: &serde_json::Value, + user_id: i32, + persona_id: &str, + model: &str, + backend: &str, + cx: &opentelemetry::Context, + ) -> String { + let allowed = { + let mut pdao = self.persona_dao.lock().expect("Unable to lock PersonaDao"); + pdao.get_persona(cx, user_id, persona_id) + .ok() + .flatten() + .map(|p| p.allow_agent_corrections) + .unwrap_or(false) + }; + if !allowed { + return "Error: agent corrections are disabled for this persona.".to_string(); + } + + let old_fact_id = match args.get("old_fact_id").and_then(|v| v.as_i64()) { + Some(id) => id as i32, + None => return "Error: missing required parameter 'old_fact_id'".to_string(), + }; + let new_fact_id = match args.get("new_fact_id").and_then(|v| v.as_i64()) { + Some(id) => id as i32, + None => return "Error: missing required parameter 'new_fact_id'".to_string(), + }; + if old_fact_id == new_fact_id { + return "Error: old_fact_id and new_fact_id must differ".to_string(); + } + + log::info!( + "tool_supersede_fact: old={}, new={}", + old_fact_id, + new_fact_id + ); + + let mut kdao = self + .knowledge_dao + .lock() + .expect("Unable to lock KnowledgeDao"); + + match kdao.supersede_fact(cx, old_fact_id, new_fact_id, Some((model, backend))) { + Ok(Some(f)) => format!( + "Superseded fact ID:{} (now status={}, valid_until={:?})", + f.id, f.status, f.valid_until + ), + Ok(None) => "Error: old or new fact not found".to_string(), + Err(e) => format!("Error superseding fact: {:?}", e), + } + } + /// Tool: get_current_datetime — returns the current local date and time fn tool_get_current_datetime() -> String { let now = Local::now(); @@ -2771,23 +3026,39 @@ Return ONLY the summary, nothing else."#, tools.push(Tool::function( "search_messages", - "Search SMS/MMS message bodies. Modes: `fts5` (keyword + phrase + prefix + AND/OR/NOT + NEAR proximity), \ + "Search SMS/MMS messages — bodies and (for MMS) attachment text + filenames. 
\ + Modes: `fts5` (keyword + phrase + prefix + AND/OR/NOT + NEAR proximity), \ `semantic` (embedding similarity, requires generated embeddings), `hybrid` (RRF merge, recommended; \ - degrades to fts5 when embeddings absent). Optional `start_ts` / `end_ts` (real-UTC unix seconds) and \ - `contact_id` filters. For pure date / contact browsing without keywords, prefer `get_sms_messages`. \ - Examples: `{query: \"trader joe's\"}` — phrase across all time. \ - `{query: \"dinner\", contact_id: 42, start_ts: 1700000000, end_ts: 1700604800}` — keyword within a contact and a week. \ - `{query: \"NEAR(meeting work, 5)\"}` — proximity search.", + degrades to fts5 when embeddings absent). Optional filters: `start_ts` / `end_ts` (real-UTC unix \ + seconds), `contact_id`, `is_mms` (true = MMS only, false = SMS only), `has_media` (true = messages \ + with image/video/audio attachments only). For pure date / contact browsing without keywords, prefer \ + `get_sms_messages`. \ + \n\nFTS5 query syntax (works in fts5 + hybrid modes):\n\ + - Phrase: `\"trader joe's\"` — exact word sequence (use double quotes).\n\ + - Prefix: `restaur*` — matches restaurant, restaurants, restauranteur, ….\n\ + - Boolean: `dinner AND tahoe`, `wedding OR reception OR ceremony`, `vacation NOT work` (operators must be UPPERCASE).\n\ + - Proximity: `NEAR(meeting work, 5)` — both terms within 5 tokens of each other.\n\ + - Combine: `(reception OR ceremony) AND tahoe*` — group with parens.\n\ + Unquoted multi-word queries are treated as implicit AND. Apostrophes / hyphens / colons are safe — they no longer downgrade to a slow LIKE scan. 
Use `mode: \"fts5\"` when you want the operators above to be authoritative; `hybrid` still respects them but may surface semantically-similar non-keyword hits alongside.\n\n\ + Examples:\n\ + - `{query: \"trader joe's\"}` — phrase across all time.\n\ + - `{query: \"dinner\", contact_id: 42, start_ts: 1700000000, end_ts: 1700604800}` — keyword within a contact and a week.\n\ + - `{query: \"vacation\", has_media: true}` — only matches that include photos / videos.\n\ + - `{query: \"wedding OR reception OR ceremony\", mode: \"fts5\"}` — any of several synonyms.\n\ + - `{query: \"restaur*\", mode: \"fts5\"}` — prefix expansion for varying word forms.\n\ + - `{query: \"NEAR(birthday cake, 5)\", mode: \"fts5\"}` — terms close together but in any order.", serde_json::json!({ "type": "object", "required": ["query"], "properties": { - "query": { "type": "string", "description": "Search query. Min 3 chars. fts5 supports phrase (\"\"), prefix (*), AND/OR/NOT, and NEAR proximity." }, + "query": { "type": "string", "description": "Search query. Min 3 chars. fts5 supports phrase (\"\"), prefix (*), AND/OR/NOT, and NEAR proximity. Matches both message body and MMS attachment text/filename." }, "mode": { "type": "string", "enum": ["fts5", "semantic", "hybrid"], "description": "Search strategy. Default: hybrid." }, "limit": { "type": "integer", "description": "Max results (default 20, max 50)." }, "contact_id": { "type": "integer", "description": "Optional numeric contact id to scope the search." }, "start_ts": { "type": "integer", "description": "Optional inclusive lower bound, real-UTC unix seconds." }, - "end_ts": { "type": "integer", "description": "Optional inclusive upper bound, real-UTC unix seconds." } + "end_ts": { "type": "integer", "description": "Optional inclusive upper bound, real-UTC unix seconds." }, + "is_mms": { "type": "boolean", "description": "Optional: true to restrict to MMS, false to restrict to SMS." 
}, + "has_media":{ "type": "boolean", "description": "Optional: true to restrict to messages with image / video / audio attachments." } } }), )); @@ -2972,6 +3243,55 @@ Return ONLY the summary, nothing else."#, }), )); + // Self-correction tools — only exposed when the active persona + // has allow_agent_corrections=true. Gating happens here AND in + // the tool method itself (the runtime check is the load-bearing + // one; this just keeps the tool out of the model's catalog so + // it doesn't waste iterations trying calls it can't make). + if opts.allow_agent_corrections { + tools.push(Tool::function( + "update_fact", + "Correct an existing fact in the knowledge memory. Use sparingly — only when you have \ + stronger evidence than the original write justified (e.g. a clearer photo, a \ + contradicting timestamp on a related fact, or explicit user correction). Common \ + patches: tighten `valid_from` / `valid_until` to a known interval, downgrade \ + `confidence` after seeing contradicting evidence, or flip `status` to 'reviewed' if \ + you've verified the fact independently. Cannot change subject / object — supersede \ + instead. Pass `fact_id` plus any subset of patchable fields.", + serde_json::json!({ + "type": "object", + "required": ["fact_id"], + "properties": { + "fact_id": { "type": "integer", "description": "ID of the fact to patch (from recall_facts_for_photo or list)." }, + "predicate": { "type": "string", "description": "New predicate string. Rare." }, + "object_value": { "type": "string", "description": "New free-text object. Use for typed-fact corrections." }, + "status": { "type": "string", "description": "'active' | 'reviewed' | 'rejected'. 'superseded' is for the supersede_fact tool." }, + "confidence": { "type": "number", "description": "0.0–0.95. Lower when you've seen contradicting evidence." }, + "valid_from": { "type": "integer", "description": "Unix-seconds lower bound on when the fact began being true. Null clears." 
}, + "valid_until": { "type": "integer", "description": "Unix-seconds upper bound on when the fact stopped being true. Null clears." } + } + }), + )); + + tools.push(Tool::function( + "supersede_fact", + "Mark an old fact as replaced by a newer one. Use when the new fact contradicts the \ + old AND the contradiction is a *time-bounded change* (relationship changed, address \ + changed, role changed), not a correction of a mistake — for mistakes, set the old \ + fact's status to 'rejected' via update_fact. Atomically: flips old.status to \ + 'superseded', points old.superseded_by at the new fact, and stamps old.valid_until \ + from new.valid_from (when not already set) so the two intervals are disjoint.", + serde_json::json!({ + "type": "object", + "required": ["old_fact_id", "new_fact_id"], + "properties": { + "old_fact_id": { "type": "integer", "description": "The fact being replaced." }, + "new_fact_id": { "type": "integer", "description": "The fact that replaces it. Must already exist (use store_fact first if you're recording a new one)." } + } + }), + )); + } + tools.push(Tool::function( "get_current_datetime", "Get the current date and time. Useful when reasoning about how long ago a photo was taken.", @@ -3204,6 +3524,8 @@ Return ONLY the summary, nothing else."#, — surrounding events matter even when a contact is known.\n\ - Use recall_facts_for_photo + recall_entities to load any prior knowledge about subjects in the photo.\n\ - When you identify people / places / events / things, use store_entity + store_fact to grow the persistent memory.\n\ + - Before store_entity, call recall_entities to check whether a similar name already exists; reuse the existing entity_id rather than creating a near-duplicate (e.g. \"Sara\" vs \"Sarah J.\"). 
The DAO will collapse obvious cosine matches, but choosing the existing id keeps facts and photo links consolidated.\n\ + - Predicates should be relationship-shaped verbs that encode a queryable claim — `lives_in`, `works_at`, `attended`, `is_friend_of`, `is_parent_of`, `interested_in`, `married_to`, `owns`. DO NOT use vague speech-act predicates like `expressed`, `said`, `mentioned`, `stated`, `quoted`, `noted`, `discussed`, `thought`, `wondered`. DO NOT store quotations or sentence fragments as `object_value` — paraphrase into a structured claim. Bad: `(Cameron, expressed, \"I'm tempted to get a part-time job there\")`. Good: `(Cameron, considered_employment_at, )` or `(Cameron, interested_in, \"part-time work\")`.\n\ - A tool returning no results is informative; continue with the others.", ); @@ -3713,6 +4035,8 @@ Return ONLY the summary, nothing else."#, &file_path, user_id, &persona_id, + chat_backend.primary_model(), + &backend_label, &loop_cx, ) .await; @@ -3928,6 +4252,7 @@ mod tests { calendar_present: false, location_history_present: false, faces_present: false, + allow_agent_corrections: false, }; let tools = InsightGenerator::build_tool_definitions(opts); let names: Vec<&str> = tools.iter().map(|t| t.function.name.as_str()).collect(); @@ -3950,6 +4275,9 @@ mod tests { assert!(!names.contains(&"get_calendar_events")); assert!(!names.contains(&"get_location_history")); assert!(!names.contains(&"get_faces_in_photo")); + // Agent-correction tools are absent without the gate. 
+ assert!(!names.contains(&"update_fact")); + assert!(!names.contains(&"supersede_fact")); } #[test] @@ -3961,6 +4289,7 @@ mod tests { calendar_present: true, location_history_present: true, faces_present: true, + allow_agent_corrections: true, }; let tools = InsightGenerator::build_tool_definitions(opts); let names: Vec<&str> = tools.iter().map(|t| t.function.name.as_str()).collect(); @@ -3970,6 +4299,8 @@ mod tests { assert!(names.contains(&"get_calendar_events")); assert!(names.contains(&"get_location_history")); assert!(names.contains(&"get_faces_in_photo")); + assert!(names.contains(&"update_fact")); + assert!(names.contains(&"supersede_fact")); } fn place(name: &str, description: &str) -> ApolloPlace { @@ -4143,6 +4474,92 @@ mod tests { ); } + fn make_search_hit( + id: i64, + contact: &str, + body: &str, + snippet: Option<&str>, + type_: i32, + ) -> SmsSearchHit { + SmsSearchHit { + message_id: id, + contact_name: contact.to_string(), + contact_address: "+15551234567".to_string(), + body: body.to_string(), + date: 1_700_000_000 + id * 86_400, + type_, + similarity_score: None, + snippet: snippet.map(|s| s.to_string()), + } + } + + #[test] + fn format_search_hits_falls_back_to_body_when_no_snippet() { + // fts5 mode often returns no snippet for messages that didn't need + // excerpting (whole body short, or semantic-only hit in hybrid). 
+        let hit = make_search_hit(1, "Sarah", "see you at the lake tomorrow", None, 1);
+        let out = InsightGenerator::format_search_hits(&[hit], "fts5", false);
+
+        assert!(out.starts_with("Found 1 messages (mode: fts5):"));
+        assert!(out.contains("see you at the lake tomorrow"));
+        assert!(out.contains("Sarah —"));
+        assert!(!out.contains("date-filtered"));
+    }
+
+    #[test]
+    fn format_search_hits_prefers_snippet_over_body_and_strips_marks() {
+        let hit = make_search_hit(
+            2,
+            "Sarah",
+            "long body text that should be ignored once the server returns a focused snippet",
+            Some("…at the <mark>lake</mark> tomorrow…"),
+            1,
+        );
+        let out = InsightGenerator::format_search_hits(&[hit], "hybrid", true);
+
+        // Snippet wins over body, tags stripped.
+        assert!(out.contains("at the lake tomorrow"));
+        assert!(!out.contains("<mark>"));
+        assert!(!out.contains("</mark>"));
+        assert!(!out.contains("long body text"));
+        // Date-filter flag flows through to the header.
+        assert!(out.contains("date-filtered"));
+    }
+
+    #[test]
+    fn format_search_hits_surfaces_mms_attachment_only_matches() {
+        // Regression guard: pre-snippet, MMS rows that matched via
+        // message_parts_fts (filename / part text) had an empty `body`
+        // and rendered as a blank preview to the LLM.
+        let hit = make_search_hit(3, "Mom", "", Some("<mark>birthday_cake.jpg</mark>"), 1);
+        let out = InsightGenerator::format_search_hits(&[hit], "fts5", false);
+
+        assert!(out.contains("birthday_cake.jpg"));
+        assert!(!out.contains("<mark>"));
+        assert!(out.contains("Mom —"));
+    }
+
+    #[test]
+    fn format_search_hits_empty_snippet_falls_back_to_body() {
+        let hit = make_search_hit(4, "Dad", "fallback body", Some(""), 1);
+        let out = InsightGenerator::format_search_hits(&[hit], "fts5", false);
+        assert!(out.contains("fallback body"));
+    }
+
+    #[test]
+    fn strip_mark_tags_handles_common_patterns() {
+        assert_eq!(InsightGenerator::strip_mark_tags("plain text"), "plain text");
+        assert_eq!(
+            InsightGenerator::strip_mark_tags("…the <mark>lake</mark>…"),
+            "…the lake…"
+        );
+        // FTS5 highlights every match — multiple marks per snippet are normal.
+        assert_eq!(
+            InsightGenerator::strip_mark_tags("<mark>dinner</mark> at <mark>tahoe</mark>"),
+            "dinner at tahoe"
+        );
+    }
+
     #[test]
     fn summarize_search_rag_counts_hits() {
         let raw = "[2023-08-15] Sarah: venue confirmed\n\n[2023-08-14] Mom: travel plans\n\n[2023-08-13] Dad: weather";
 
diff --git a/src/ai/sms_client.rs b/src/ai/sms_client.rs
index b59c266..6661bac 100644
--- a/src/ai/sms_client.rs
+++ b/src/ai/sms_client.rs
@@ -257,30 +257,45 @@ impl SmsApiClient {
     }
 
     /// Search message bodies via the Django side's FTS5 / semantic / hybrid
-    /// endpoint. `mode` selects the ranking strategy:
+    /// endpoint. `params.mode` selects the ranking strategy:
     ///   - "fts5"     keyword-only, supports phrase / prefix / boolean / NEAR
     ///   - "semantic" embedding similarity
     ///   - "hybrid"   both merged via reciprocal rank fusion (recommended)
     ///
-    /// The SMS-API endpoint accepts `contact_id` natively; date filtering is
-    /// the caller's responsibility (post-filter on the returned rows).
-    pub async fn search_messages_with_contact(
+    /// All of `contact_id`, `date_from` / `date_to` (unix seconds), `is_mms`,
+    /// `has_media`, and `offset` are pushed to SMS-API server-side so the
+    /// filtered+paginated result set is exact rather than a client-side
+    /// over-fetch.
+    pub async fn search_messages(
         &self,
         query: &str,
-        mode: &str,
-        limit: usize,
-        contact_id: Option<i64>,
+        params: &SmsSearchParams<'_>,
     ) -> Result<Vec<SmsSearchHit>> {
         let mut url = format!(
             "{}/api/messages/search/?q={}&mode={}&limit={}",
             self.base_url,
             urlencoding::encode(query),
-            urlencoding::encode(mode),
-            limit
+            urlencoding::encode(params.mode),
+            params.limit,
         );
-        if let Some(cid) = contact_id {
+        if let Some(cid) = params.contact_id {
             url.push_str(&format!("&contact_id={}", cid));
         }
+        if let Some(off) = params.offset {
+            url.push_str(&format!("&offset={}", off));
+        }
+        if let Some(from) = params.date_from {
+            url.push_str(&format!("&date_from={}", from));
+        }
+        if let Some(to) = params.date_to {
+            url.push_str(&format!("&date_to={}", to));
+        }
+        if let Some(is_mms) = params.is_mms {
+            url.push_str(&format!("&is_mms={}", is_mms));
+        }
+        if let Some(has_media) = params.has_media {
+            url.push_str(&format!("&has_media={}", has_media));
+        }
 
         let mut request = self.client.get(&url);
         if let Some(token) = &self.token {
@@ -383,6 +398,30 @@ pub struct SmsSearchHit {
     /// Present for semantic / hybrid modes; absent for fts5.
     #[serde(default)]
     pub similarity_score: Option<f32>,
+    /// SMS-API-generated excerpt around the match, wrapped in `<mark>` tags.
+    /// For MMS messages that only matched via attachment text / filename
+    /// (empty `body`), the snippet is the only meaningful preview.
+    #[serde(default)]
+    pub snippet: Option<String>,
+}
+
+/// Optional filter / paging knobs for [`SmsApiClient::search_messages`].
+/// All fields except `mode` and `limit` map 1:1 to the same-named SMS-API
+/// query params (added in the 2026-05 search-enhancements release).
+#[derive(Debug, Clone)]
+pub struct SmsSearchParams<'a> {
+    pub mode: &'a str,
+    pub limit: usize,
+    pub contact_id: Option<i64>,
+    /// Unix-seconds inclusive lower bound on `date`.
+    pub date_from: Option<i64>,
+    /// Unix-seconds inclusive upper bound on `date`.
+    pub date_to: Option<i64>,
+    /// `Some(true)` = MMS only, `Some(false)` = SMS only, `None` = both.
+    pub is_mms: Option<bool>,
+    /// `Some(true)` = only messages with image/video/audio attachments.
+    pub has_media: Option<bool>,
+    pub offset: Option<usize>,
+}
 
 #[derive(Deserialize)]
diff --git a/src/bin/populate_knowledge.rs b/src/bin/populate_knowledge.rs
index a7cdd27..29945d7 100644
--- a/src/bin/populate_knowledge.rs
+++ b/src/bin/populate_knowledge.rs
@@ -185,6 +185,9 @@ async fn main() -> anyhow::Result<()> {
         Arc::new(Mutex::new(Box::new(SqliteKnowledgeDao::new())));
     let face_dao: Arc<Mutex<Box<dyn FaceDao>>> =
         Arc::new(Mutex::new(Box::new(SqliteFaceDao::new())));
+    let persona_dao: Arc<Mutex<Box<dyn PersonaDao>>> = Arc::new(Mutex::new(
+        Box::new(image_api::database::SqlitePersonaDao::new()),
+    ));
 
     // Pass the full library set so `resolve_full_path` probes every root,
     // even when --library restricts the walk.
A rel_path shared across @@ -203,6 +206,7 @@ async fn main() -> anyhow::Result<()> { tag_dao, face_dao, knowledge_dao, + persona_dao, all_libs.clone(), ); diff --git a/src/database/insights_dao.rs b/src/database/insights_dao.rs index 8c7551c..86c51aa 100644 --- a/src/database/insights_dao.rs +++ b/src/database/insights_dao.rs @@ -204,19 +204,24 @@ impl InsightDao for SqliteInsightDao { lib_id: i32, path: &str, ) -> Result, DbError> { - trace_db_call(context, "query", "get_current_insight_for_library", |_span| { - use schema::photo_insights::dsl::*; + trace_db_call( + context, + "query", + "get_current_insight_for_library", + |_span| { + use schema::photo_insights::dsl::*; - let mut connection = self.connection.lock().expect("Unable to get InsightDao"); + let mut connection = self.connection.lock().expect("Unable to get InsightDao"); - photo_insights - .filter(library_id.eq(lib_id)) - .filter(rel_path.eq(path)) - .filter(is_current.eq(true)) - .first::(connection.deref_mut()) - .optional() - .map_err(|_| anyhow::anyhow!("Query error")) - }) + photo_insights + .filter(library_id.eq(lib_id)) + .filter(rel_path.eq(path)) + .filter(is_current.eq(true)) + .first::(connection.deref_mut()) + .optional() + .map_err(|_| anyhow::anyhow!("Query error")) + }, + ) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } diff --git a/src/database/knowledge_dao.rs b/src/database/knowledge_dao.rs index f807f23..069dd38 100644 --- a/src/database/knowledge_dao.rs +++ b/src/database/knowledge_dao.rs @@ -45,6 +45,17 @@ pub struct EntityFilter { pub offset: i64, } +/// Sort key for the curation list. Name = alphabetical clustering +/// (good for spotting near-duplicates like Sara / Sarah / Sarah J.). +/// FactCount = surface heavily-used entities first, demote 0-fact +/// noise to the bottom. UpdatedDesc = legacy "newest activity first". 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum EntitySort { + UpdatedDesc, + NameAsc, + FactCountDesc, +} + pub struct FactFilter { pub entity_id: Option, /// "active" | "reviewed" | "rejected" | "all" @@ -90,6 +101,15 @@ pub struct FactPatch { pub object_value: Option, pub status: Option, pub confidence: Option, + /// Real-world valid-time bounds. Outer Some = "patch this column"; + /// inner Some(val) = set to that unix-seconds value; inner None = + /// clear back to NULL ("unbounded"). The double-Option lets the + /// HTTP layer distinguish "field omitted" (leave alone) from + /// "field sent as null" (clear) — needed for these specifically + /// because there's no sentinel string-empty equivalent like the + /// other fields have. + pub valid_from: Option>, + pub valid_until: Option>, } pub struct RecentActivity { @@ -97,6 +117,44 @@ pub struct RecentActivity { pub facts: Vec, } +/// A near-duplicate cluster found by `find_consolidation_proposals`. +/// `min_cosine` / `max_cosine` are summary stats over the pairwise +/// edges inside the group — gives the curator a sense of "how tight" +/// the cluster is before clicking in. +#[derive(Debug, Clone)] +pub struct ConsolidationGroup { + pub entities: Vec, + pub min_cosine: f32, + pub max_cosine: f32, +} + +/// Graph view payload: every entity that has at least one fact +/// becomes a node; every relational fact (object_entity_id set) +/// becomes an edge between subject and object. Multiple facts with +/// the same (subject, object, predicate) collapse into one edge +/// with a count so the UI can fan them out as one weighted line. 
+#[derive(Debug, Clone)] +pub struct GraphNode { + pub id: i32, + pub name: String, + pub entity_type: String, + pub fact_count: i64, +} + +#[derive(Debug, Clone)] +pub struct GraphEdge { + pub source: i32, + pub target: i32, + pub predicate: String, + pub count: i64, +} + +#[derive(Debug, Clone)] +pub struct EntityGraph { + pub nodes: Vec, + pub edges: Vec, +} + // --------------------------------------------------------------------------- // Trait // --------------------------------------------------------------------------- @@ -134,6 +192,89 @@ pub trait KnowledgeDao: Sync + Send { filter: EntityFilter, ) -> Result<(Vec, i64), DbError>; + /// List entities alongside a persona-scoped fact count for each. + /// Powers the curation surface — sorting by fact count surfaces + /// the heavily-used entities and demotes 0-fact noise. Counting + /// is restricted to non-rejected facts under the active persona + /// scope so a switch in the persona picker re-orders the list. + fn list_entities_with_fact_counts( + &mut self, + cx: &opentelemetry::Context, + filter: EntityFilter, + sort: EntitySort, + persona: &PersonaFilter, + ) -> Result<(Vec<(Entity, i64)>, i64), DbError>; + + /// Aggregate the user's active+reviewed facts by predicate so + /// the curation UI can flag noisy verbs ("expressed", "said") and + /// bulk-reject. Persona-scoped via the existing PersonaFilter + /// pattern. Sorted by count desc. + fn get_predicate_stats( + &mut self, + cx: &opentelemetry::Context, + persona: &PersonaFilter, + limit: usize, + ) -> Result, DbError>; + + /// Bulk reject every active fact under a given predicate + /// (persona-scoped). Returns the number of rows touched. Used by + /// the predicate-cleanup UI to nuke noise verbs in one click. + /// Stamps last_modified_* with the caller-supplied audit so the + /// action shows up in the recent-edits feed. 
+ fn bulk_reject_facts_by_predicate( + &mut self, + cx: &opentelemetry::Context, + persona: &PersonaFilter, + predicate: &str, + audit: Option<(&str, &str)>, + ) -> Result; + + /// Build a graph snapshot — entities as nodes (fact count from + /// the active persona scope), relational facts as edges. Used + /// by the curation UI's graph view. Filters: + /// - entity_type: optional, restricts nodes to one type + /// - node_limit: caps the number of nodes; lower-fact-count + /// entities drop first + /// Edges between dropped entities are pruned. Persona scoping + /// affects fact_count + edge inclusion (rejected / superseded + /// excluded; All vs Single mirrors the existing pattern). + fn build_entity_graph( + &mut self, + cx: &opentelemetry::Context, + entity_type: Option<&str>, + node_limit: usize, + persona: &PersonaFilter, + ) -> Result; + + /// Find groups of near-duplicate entities that the upsert-time + /// cosine guard didn't catch (it runs at ~0.92; this scan runs + /// at a lower threshold to surface the "probably same" tier that + /// needs human review). Groups are formed via union-find over + /// the cosine-adjacency graph, partitioned by entity_type so a + /// person can't cluster with a place. Returns groups of >= 2 + /// entities, sorted by size desc then by max pairwise cosine. + /// Trimmed to `max_groups`. + fn find_consolidation_proposals( + &mut self, + cx: &opentelemetry::Context, + threshold: f32, + max_groups: usize, + ) -> Result, DbError>; + + /// Batch fetch per-persona fact counts for a set of entities, + /// scoped to one user. Returns map of entity_id → list of + /// (persona_id, count). Used by the curation UI to show "this + /// entity has 0 facts in your active persona but 12 in journal" + /// so the curator knows where to find the existing knowledge. + /// Rejected facts excluded; superseded included (they're history, + /// not noise). 
+ fn get_persona_breakdowns_for_entities( + &mut self, + cx: &opentelemetry::Context, + entity_ids: &[i32], + user_id: i32, + ) -> Result>, DbError>; + fn update_entity_status( &mut self, cx: &opentelemetry::Context, @@ -177,11 +318,17 @@ pub trait KnowledgeDao: Sync + Send { filter: FactFilter, ) -> Result<(Vec, i64), DbError>; + /// Update a fact. `audit` stamps the row's `last_modified_*` + /// columns — None = legacy internal callers without provenance + /// context; HTTP passes `Some(("manual", "manual"))`; the agent + /// passes its loop-time model + backend so the audit trail can + /// distinguish human edits from agent corrections. fn update_fact( &mut self, cx: &opentelemetry::Context, id: i32, patch: FactPatch, + audit: Option<(&str, &str)>, ) -> Result, DbError>; fn update_facts_insight_id( @@ -193,6 +340,38 @@ pub trait KnowledgeDao: Sync + Send { fn delete_fact(&mut self, cx: &opentelemetry::Context, id: i32) -> Result<(), DbError>; + /// Mark an old fact as superseded by a new one. Atomically: + /// - reads the new fact's valid_from + /// - sets old.superseded_by = new_id + /// - sets old.status = 'superseded' + /// - stamps old.valid_until = new.valid_from (if not already + /// set; otherwise leaves it) + /// - stamps old.last_modified_* from `audit` + /// + /// Returns the updated old fact. Errors if either id is missing. + fn supersede_fact( + &mut self, + cx: &opentelemetry::Context, + old_id: i32, + new_id: i32, + audit: Option<(&str, &str)>, + ) -> Result, DbError>; + + /// Undo a supersession: clear `superseded_by`, flip status back to + /// 'active', clear `valid_until` (we don't know if it was auto- + /// stamped by the supersede or hand-set, so the conservative reset + /// is to clear it — user can re-bound after). Stamps the audit + /// columns so the revert is itself attributable. + /// + /// Returns the restored fact. Errors if the fact doesn't exist or + /// wasn't superseded in the first place (no-op semantics). 
+ fn revert_supersession( + &mut self, + cx: &opentelemetry::Context, + fact_id: i32, + audit: Option<(&str, &str)>, + ) -> Result, DbError>; + // --- Photo links --- fn upsert_photo_link( &mut self, @@ -282,6 +461,20 @@ impl SqliteKnowledgeDao { } } +/// Cosine-similarity threshold above which a new entity collapses into an +/// existing same-type entity at upsert time. The agent's pre-flight name +/// search uses FTS5 prefix tokens, which misses near-dupes like +/// "Sarah" / "Sara" / "Sarah J." that share a description-rich embedding. +/// Override via `ENTITY_DEDUP_COSINE_THRESHOLD` env var when tuning. +const ENTITY_DEDUP_COSINE_THRESHOLD_DEFAULT: f32 = 0.92; + +fn entity_dedup_cosine_threshold() -> f32 { + std::env::var("ENTITY_DEDUP_COSINE_THRESHOLD") + .ok() + .and_then(|v| v.parse::().ok()) + .unwrap_or(ENTITY_DEDUP_COSINE_THRESHOLD_DEFAULT) +} + impl KnowledgeDao for SqliteKnowledgeDao { // ----------------------------------------------------------------------- // Entity operations @@ -308,7 +501,7 @@ impl KnowledgeDao for SqliteKnowledgeDao { // Use lower() on both sides so existing dirty rows ("Person") still match. let name_lower = entity.name.to_lowercase(); let type_lower = entity.entity_type.to_lowercase(); - let existing: Option = entities + let mut existing: Option = entities .filter(diesel::dsl::sql::(&format!( "lower(name) = '{}' AND lower(entity_type) = '{}'", name_lower.replace('\'', "''"), @@ -318,6 +511,49 @@ impl KnowledgeDao for SqliteKnowledgeDao { .optional() .map_err(|e| anyhow::anyhow!("Query error: {}", e))?; + // Fuzzy-match fallback: if no exact name match and the incoming + // entity carries an embedding, compare against same-type entities' + // embeddings and collapse if any are above the cosine threshold. 
+ if existing.is_none() + && let Some(new_emb_bytes) = entity.embedding.as_ref() + && let Ok(new_vec) = Self::deserialize_embedding(new_emb_bytes) + && !new_vec.is_empty() + { + let threshold = entity_dedup_cosine_threshold(); + let candidates: Vec = entities + .filter(embedding.is_not_null()) + .filter(entity_type.eq(&entity.entity_type)) + .filter(status.ne("rejected")) + .load::(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Query error: {}", e))?; + + let mut best: Option<(Entity, f32)> = None; + for cand in candidates { + let Some(cand_bytes) = cand.embedding.as_ref() else { + continue; + }; + let Ok(cand_vec) = Self::deserialize_embedding(cand_bytes) else { + continue; + }; + let sim = Self::cosine_similarity(&new_vec, &cand_vec); + if sim >= threshold && best.as_ref().is_none_or(|(_, s)| sim > *s) { + best = Some((cand, sim)); + } + } + + if let Some((cand, sim)) = best { + log::info!( + "entity dedup: collapsing new '{}' ({}) into existing '{}' (id={}, cos={:.3})", + entity.name, + entity.entity_type, + cand.name, + cand.id, + sim + ); + existing = Some(cand); + } + } + if let Some(existing_entity) = existing { // Update description, embedding, updated_at diesel::update(entities.filter(id.eq(existing_entity.id))) @@ -472,6 +708,706 @@ impl KnowledgeDao for SqliteKnowledgeDao { .map_err(|_| DbError::new(DbErrorKind::QueryError)) } + fn list_entities_with_fact_counts( + &mut self, + cx: &opentelemetry::Context, + filter: EntityFilter, + sort: EntitySort, + persona: &PersonaFilter, + ) -> Result<(Vec<(Entity, i64)>, i64), DbError> { + trace_db_call(cx, "query", "list_entities_with_fact_counts", |_span| { + use diesel::sql_query; + use diesel::sql_types::{BigInt, Integer, Text}; + + let mut conn = self.connection.lock().expect("KnowledgeDao lock"); + + // Build WHERE fragments. Inline-safe values are bound; status + // / sort keywords are validated against fixed sets. 
+ let mut where_parts: Vec = Vec::new(); + let mut bind_types: Vec<&'static str> = Vec::new(); + let mut bind_strs: Vec = Vec::new(); + + if filter.entity_type.is_some() { + where_parts.push("e.entity_type = ?".to_string()); + bind_types.push("text"); + bind_strs.push(filter.entity_type.clone().unwrap()); + } + + let status_val = filter.status.as_deref().unwrap_or("active"); + if status_val != "all" { + where_parts.push("e.status = ?".to_string()); + bind_types.push("text"); + bind_strs.push(status_val.to_string()); + } + + if let Some(ref s) = filter.search { + where_parts.push("(e.name LIKE ? OR e.description LIKE ?)".to_string()); + bind_types.push("text"); + bind_types.push("text"); + let pat = format!("%{}%", s); + bind_strs.push(pat.clone()); + bind_strs.push(pat); + } + + let where_clause = if where_parts.is_empty() { + String::new() + } else { + format!("WHERE {}", where_parts.join(" AND ")) + }; + + // Persona-scoped fact-count subquery. Single = filter on + // (user_id, persona_id); All = union across the user's + // personas (mirror PersonaFilter::All read semantics). + let fact_count_join = match persona { + PersonaFilter::Single { + user_id: _, + persona_id: _, + } => { + "LEFT JOIN (\ + SELECT subject_entity_id, COUNT(*) AS fact_count \ + FROM entity_facts \ + WHERE user_id = ? AND persona_id = ? AND status != 'rejected' \ + GROUP BY subject_entity_id\ + ) fc ON fc.subject_entity_id = e.id" + } + PersonaFilter::All { user_id: _ } => { + "LEFT JOIN (\ + SELECT subject_entity_id, COUNT(*) AS fact_count \ + FROM entity_facts \ + WHERE user_id = ? 
AND status != 'rejected' \ + GROUP BY subject_entity_id\ + ) fc ON fc.subject_entity_id = e.id" + } + }; + + let order_by = match sort { + EntitySort::UpdatedDesc => "e.updated_at DESC", + EntitySort::NameAsc => "lower(e.name) ASC", + EntitySort::FactCountDesc => "COALESCE(fc.fact_count, 0) DESC, lower(e.name) ASC", + }; + + let select_sql = format!( + "SELECT e.id, e.name, e.entity_type, e.description, e.embedding, \ + e.confidence, e.status, e.created_at, e.updated_at, \ + COALESCE(fc.fact_count, 0) AS fact_count \ + FROM entities e \ + {fact_count_join} \ + {where_clause} \ + ORDER BY {order_by} \ + LIMIT ? OFFSET ?" + ); + + let count_sql = format!("SELECT COUNT(*) AS total FROM entities e {where_clause}"); + + // ── Total count ───────────────────────────────────────── + #[derive(diesel::QueryableByName)] + struct TotalRow { + #[diesel(sql_type = BigInt)] + total: i64, + } + let mut count_q = sql_query(count_sql).into_boxed(); + for s in &bind_strs { + count_q = count_q.bind::(s.clone()); + } + let total: i64 = count_q + .get_result::(conn.deref_mut()) + .map(|r| r.total) + .unwrap_or(0); + + // ── Page query ────────────────────────────────────────── + #[derive(diesel::QueryableByName)] + struct EntityWithCountRow { + #[diesel(sql_type = Integer)] + id: i32, + #[diesel(sql_type = Text)] + name: String, + #[diesel(sql_type = Text)] + entity_type: String, + #[diesel(sql_type = Text)] + description: String, + #[diesel(sql_type = diesel::sql_types::Nullable)] + embedding: Option>, + #[diesel(sql_type = diesel::sql_types::Float)] + confidence: f32, + #[diesel(sql_type = Text)] + status: String, + #[diesel(sql_type = BigInt)] + created_at: i64, + #[diesel(sql_type = BigInt)] + updated_at: i64, + #[diesel(sql_type = BigInt)] + fact_count: i64, + } + + let mut q = sql_query(select_sql).into_boxed(); + // Persona binds first (they're earlier in the SQL — inside + // the subquery LEFT JOIN). 
+ match persona { + PersonaFilter::Single { + user_id, + persona_id, + } => { + q = q + .bind::(*user_id) + .bind::(persona_id.clone()); + } + PersonaFilter::All { user_id } => { + q = q.bind::(*user_id); + } + } + // Then WHERE binds in order. + for s in &bind_strs { + q = q.bind::(s.clone()); + } + // Then LIMIT / OFFSET. + q = q + .bind::(filter.limit) + .bind::(filter.offset); + + let rows: Vec = q + .load(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Query error: {}", e))?; + + let pairs: Vec<(Entity, i64)> = rows + .into_iter() + .map(|r| { + ( + Entity { + id: r.id, + name: r.name, + entity_type: r.entity_type, + description: r.description, + embedding: r.embedding, + confidence: r.confidence, + status: r.status, + created_at: r.created_at, + updated_at: r.updated_at, + }, + r.fact_count, + ) + }) + .collect(); + + // Sink unused `_bind_types`; keeping it as documentation. + let _ = bind_types; + + Ok((pairs, total)) + }) + .map_err(|_| DbError::new(DbErrorKind::QueryError)) + } + + fn get_predicate_stats( + &mut self, + cx: &opentelemetry::Context, + persona: &PersonaFilter, + limit: usize, + ) -> Result, DbError> { + trace_db_call(cx, "query", "get_predicate_stats", |_span| { + use diesel::sql_query; + use diesel::sql_types::{BigInt, Integer, Text}; + + // Active + reviewed only — rejected / superseded are + // already off the agent's read path so they shouldn't + // count toward "what predicates are noisy in production". + let where_sql = match persona { + PersonaFilter::Single { .. } => { + "WHERE user_id = ? AND persona_id = ? \ + AND status IN ('active','reviewed')" + } + PersonaFilter::All { .. } => { + "WHERE user_id = ? 
AND status IN ('active','reviewed')" + } + }; + let sql = format!( + "SELECT predicate, COUNT(*) AS cnt FROM entity_facts \ + {where_sql} \ + GROUP BY predicate \ + ORDER BY cnt DESC \ + LIMIT ?", + ); + + #[derive(diesel::QueryableByName)] + struct Row { + #[diesel(sql_type = Text)] + predicate: String, + #[diesel(sql_type = BigInt)] + cnt: i64, + } + + let mut conn = self.connection.lock().expect("KnowledgeDao lock"); + let mut q = sql_query(sql).into_boxed(); + match persona { + PersonaFilter::Single { user_id, persona_id } => { + q = q + .bind::(*user_id) + .bind::(persona_id.clone()); + } + PersonaFilter::All { user_id } => { + q = q.bind::(*user_id); + } + } + q = q.bind::(limit as i64); + + let rows: Vec = q + .load(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Query error: {}", e))?; + Ok(rows.into_iter().map(|r| (r.predicate, r.cnt)).collect()) + }) + .map_err(|_| DbError::new(DbErrorKind::QueryError)) + } + + fn bulk_reject_facts_by_predicate( + &mut self, + cx: &opentelemetry::Context, + persona: &PersonaFilter, + target_predicate: &str, + audit: Option<(&str, &str)>, + ) -> Result { + trace_db_call(cx, "update", "bulk_reject_facts_by_predicate", |_span| { + use schema::entity_facts::dsl::*; + let mut conn = self.connection.lock().expect("KnowledgeDao lock"); + + let now = chrono::Utc::now().timestamp(); + let (audit_model, audit_backend) = match audit { + Some((m, b)) => (Some(m.to_string()), Some(b.to_string())), + None => (None, None), + }; + + // Persona scoping mirrors get_predicate_stats. Only ACTIVE + // rows flip — REVIEWED survives so the curator can preserve + // a hand-approved exception under the same predicate. 
+ let touched = match persona { + PersonaFilter::Single { user_id: uid, persona_id: pid } => diesel::update( + entity_facts + .filter(predicate.eq(target_predicate)) + .filter(user_id.eq(*uid)) + .filter(persona_id.eq(pid)) + .filter(status.eq("active")), + ) + .set(( + status.eq("rejected"), + last_modified_by_model.eq(audit_model.clone()), + last_modified_by_backend.eq(audit_backend.clone()), + last_modified_at.eq(Some(now)), + )) + .execute(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Bulk reject error: {}", e))?, + PersonaFilter::All { user_id: uid } => diesel::update( + entity_facts + .filter(predicate.eq(target_predicate)) + .filter(user_id.eq(*uid)) + .filter(status.eq("active")), + ) + .set(( + status.eq("rejected"), + last_modified_by_model.eq(audit_model.clone()), + last_modified_by_backend.eq(audit_backend.clone()), + last_modified_at.eq(Some(now)), + )) + .execute(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Bulk reject error: {}", e))?, + }; + Ok(touched) + }) + .map_err(|_| DbError::new(DbErrorKind::UpdateError)) + } + + fn build_entity_graph( + &mut self, + cx: &opentelemetry::Context, + entity_type_filter: Option<&str>, + node_limit: usize, + persona: &PersonaFilter, + ) -> Result { + trace_db_call(cx, "query", "build_entity_graph", |_span| { + use diesel::sql_query; + use diesel::sql_types::{BigInt, Integer, Text}; + + let mut conn = self.connection.lock().expect("KnowledgeDao lock"); + + // ── Nodes: entities with non-rejected facts under the + // active scope, plus their fact count. Cap to node_limit + // by count desc so the graph stays drawable; lower-count + // entities drop. Excludes 'rejected' entity rows too. + let (persona_filter_sql, persona_binds_count) = match persona { + PersonaFilter::Single { .. } => ( + "AND ef.user_id = ? AND ef.persona_id = ? AND ef.status NOT IN ('rejected','superseded')", + 2, + ), + PersonaFilter::All { .. } => ( + "AND ef.user_id = ? 
AND ef.status NOT IN ('rejected','superseded')", + 1, + ), + }; + + let mut where_parts: Vec<&str> = vec!["e.status != 'rejected'"]; + if entity_type_filter.is_some() { + where_parts.push("e.entity_type = ?"); + } + let where_clause = format!("WHERE {}", where_parts.join(" AND ")); + + // SQL: join entities to their (persona-scoped) fact count, + // sort by count desc, limit. Including entities with 0 + // facts would clutter the view — skip them via INNER JOIN + // (subquery on entity_facts) so only entities with at + // least one in-scope fact show up. + let node_sql = format!( + "SELECT e.id, e.name, e.entity_type, fc.fact_count \ + FROM entities e \ + INNER JOIN ( \ + SELECT subject_entity_id AS sid, COUNT(*) AS fact_count \ + FROM entity_facts ef \ + WHERE 1=1 {persona_filter_sql} \ + GROUP BY subject_entity_id \ + ) fc ON fc.sid = e.id \ + {where_clause} \ + ORDER BY fc.fact_count DESC, e.id ASC \ + LIMIT ?", + ); + + #[derive(diesel::QueryableByName)] + struct NodeRow { + #[diesel(sql_type = Integer)] + id: i32, + #[diesel(sql_type = Text)] + name: String, + #[diesel(sql_type = Text)] + entity_type: String, + #[diesel(sql_type = BigInt)] + fact_count: i64, + } + + let mut nq = sql_query(node_sql).into_boxed(); + // Persona binds (inside the subquery — earlier in the SQL). + match persona { + PersonaFilter::Single { user_id, persona_id } => { + nq = nq + .bind::(*user_id) + .bind::(persona_id.clone()); + } + PersonaFilter::All { user_id } => { + nq = nq.bind::(*user_id); + } + } + // Entity-type filter bind, if any. + if let Some(t) = entity_type_filter { + nq = nq.bind::(t.to_string()); + } + // LIMIT. 
+ nq = nq.bind::(node_limit as i64); + + let node_rows: Vec = nq + .load(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Node query error: {}", e))?; + + let _ = persona_binds_count; // documentary + + let node_ids: std::collections::HashSet = + node_rows.iter().map(|r| r.id).collect(); + let nodes: Vec = node_rows + .into_iter() + .map(|r| GraphNode { + id: r.id, + name: r.name, + entity_type: r.entity_type, + fact_count: r.fact_count, + }) + .collect(); + + if nodes.is_empty() { + return Ok(EntityGraph { + nodes, + edges: Vec::new(), + }); + } + + // ── Edges: relational facts where BOTH subject and + // object survived the node cap. Grouped by (subject, + // object, predicate) so 3 "is_friend_of Bob" facts + // become one edge with count=3. + let id_list: Vec = node_ids.iter().map(|i| i.to_string()).collect(); + let in_clause = id_list.join(", "); + // Note: ids are i32, inlined safely; predicates use binds. + let (edge_persona_sql, _) = match persona { + PersonaFilter::Single { .. } => ( + "user_id = ? AND persona_id = ? AND status NOT IN ('rejected','superseded')", + 2, + ), + PersonaFilter::All { .. } => ( + "user_id = ? 
AND status NOT IN ('rejected','superseded')", + 1, + ), + }; + let edge_sql = format!( + "SELECT subject_entity_id, object_entity_id, predicate, COUNT(*) AS cnt \ + FROM entity_facts \ + WHERE {edge_persona_sql} \ + AND object_entity_id IS NOT NULL \ + AND subject_entity_id IN ({in_clause}) \ + AND object_entity_id IN ({in_clause}) \ + GROUP BY subject_entity_id, object_entity_id, predicate", + ); + + #[derive(diesel::QueryableByName)] + struct EdgeRow { + #[diesel(sql_type = Integer)] + subject_entity_id: i32, + #[diesel(sql_type = Integer)] + object_entity_id: i32, + #[diesel(sql_type = Text)] + predicate: String, + #[diesel(sql_type = BigInt)] + cnt: i64, + } + + let mut eq = sql_query(edge_sql).into_boxed(); + match persona { + PersonaFilter::Single { user_id, persona_id } => { + eq = eq + .bind::(*user_id) + .bind::(persona_id.clone()); + } + PersonaFilter::All { user_id } => { + eq = eq.bind::(*user_id); + } + } + let edge_rows: Vec = eq + .load(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Edge query error: {}", e))?; + + let edges: Vec = edge_rows + .into_iter() + .map(|r| GraphEdge { + source: r.subject_entity_id, + target: r.object_entity_id, + predicate: r.predicate, + count: r.cnt, + }) + .collect(); + + Ok(EntityGraph { nodes, edges }) + }) + .map_err(|_| DbError::new(DbErrorKind::QueryError)) + } + + fn find_consolidation_proposals( + &mut self, + cx: &opentelemetry::Context, + threshold: f32, + max_groups: usize, + ) -> Result, DbError> { + trace_db_call(cx, "query", "find_consolidation_proposals", |_span| { + use schema::entities::dsl::*; + + let mut conn = self.connection.lock().expect("KnowledgeDao lock"); + + // Pull every non-rejected entity with an embedding. We + // keep 'reviewed' rows in the scan because pre-guard + // legacy data still needs cleanup even if the curator + // marked individual entities reviewed. 
+ let rows: Vec = entities + .filter(embedding.is_not_null()) + .filter(status.ne("rejected")) + .load::(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Query error: {}", e))?; + + // Partition by entity_type so a person can't cluster + // with a place via coincidental embedding closeness. + let mut by_type: std::collections::HashMap> = + std::collections::HashMap::new(); + for (idx, e) in rows.iter().enumerate() { + by_type.entry(e.entity_type.clone()).or_default().push(idx); + } + + // Decode embeddings once. Skip rows that don't deserialize + // cleanly (corrupted or wrong-dim) rather than failing + // the whole scan. + let mut decoded: Vec>> = Vec::with_capacity(rows.len()); + for e in &rows { + let v = e + .embedding + .as_ref() + .and_then(|b| Self::deserialize_embedding(b).ok()) + .filter(|v| !v.is_empty()); + decoded.push(v); + } + + // Union-find for transitive clustering. + struct UF { + parent: Vec, + } + impl UF { + fn new(n: usize) -> Self { + UF { + parent: (0..n).collect(), + } + } + fn find(&mut self, x: usize) -> usize { + let mut r = x; + while self.parent[r] != r { + r = self.parent[r]; + } + let mut cur = x; + while self.parent[cur] != r { + let nxt = self.parent[cur]; + self.parent[cur] = r; + cur = nxt; + } + r + } + fn union(&mut self, a: usize, b: usize) { + let ra = self.find(a); + let rb = self.find(b); + if ra != rb { + self.parent[ra] = rb; + } + } + } + + let mut uf = UF::new(rows.len()); + let mut group_min: std::collections::HashMap = + std::collections::HashMap::new(); + let mut group_max: std::collections::HashMap = + std::collections::HashMap::new(); + + // Single pass: union and update per-component stats in + // one go. Stats are tracked per root; final pass after + // all unions corrects roots that moved. 
+ type Edge = (usize, usize, f32); + let mut edges: Vec = Vec::new(); + for indices in by_type.values() { + for a in 0..indices.len() { + let ia = indices[a]; + let va = match &decoded[ia] { + Some(v) => v, + None => continue, + }; + for b in (a + 1)..indices.len() { + let ib = indices[b]; + let vb = match &decoded[ib] { + Some(v) => v, + None => continue, + }; + let sim = Self::cosine_similarity(va, vb); + if sim >= threshold { + uf.union(ia, ib); + edges.push((ia, ib, sim)); + } + } + } + } + // Second pass over the kept edges to populate stats by + // final root (post-union path compression). + for (a, _b, sim) in &edges { + let root = uf.find(*a); + let mn = group_min.entry(root).or_insert(*sim); + if *sim < *mn { + *mn = *sim; + } + let mx = group_max.entry(root).or_insert(*sim); + if *sim > *mx { + *mx = *sim; + } + } + + // Bucket entities by root component, skipping singletons. + let mut groups: std::collections::HashMap> = + std::collections::HashMap::new(); + for i in 0..rows.len() { + let root = uf.find(i); + if !group_min.contains_key(&root) { + continue; + } + groups.entry(root).or_default().push(i); + } + + let mut result: Vec = groups + .into_iter() + .filter(|(_, members)| members.len() >= 2) + .map(|(root, members)| ConsolidationGroup { + entities: members.into_iter().map(|i| rows[i].clone()).collect(), + min_cosine: *group_min.get(&root).unwrap_or(&0.0), + max_cosine: *group_max.get(&root).unwrap_or(&0.0), + }) + .collect(); + + // Biggest clusters first; tie-break on the strongest + // pair so the most-obvious dupes surface at the top. 
+ result.sort_by(|a, b| { + b.entities.len().cmp(&a.entities.len()).then_with(|| { + b.max_cosine + .partial_cmp(&a.max_cosine) + .unwrap_or(std::cmp::Ordering::Equal) + }) + }); + result.truncate(max_groups); + Ok(result) + }) + .map_err(|_| DbError::new(DbErrorKind::QueryError)) + } + + fn get_persona_breakdowns_for_entities( + &mut self, + cx: &opentelemetry::Context, + entity_ids: &[i32], + user_id: i32, + ) -> Result>, DbError> { + trace_db_call(cx, "query", "get_persona_breakdowns", |_span| { + use diesel::sql_query; + use diesel::sql_types::{BigInt, Integer, Text}; + + if entity_ids.is_empty() { + return Ok(std::collections::HashMap::new()); + } + + // Build the `IN (?, ?, ?…)` placeholder list. We bind + // user_id first, then the entity ids. No real escape risk + // since the values are typed ints, but bound parameters + // are cleaner than format!() either way. + let placeholders = vec!["?"; entity_ids.len()].join(", "); + let sql = format!( + "SELECT subject_entity_id, persona_id, COUNT(*) AS cnt \ + FROM entity_facts \ + WHERE user_id = ? 
\ + AND status != 'rejected' \ + AND subject_entity_id IN ({}) \ + GROUP BY subject_entity_id, persona_id \ + ORDER BY subject_entity_id, persona_id", + placeholders + ); + + #[derive(diesel::QueryableByName)] + struct Row { + #[diesel(sql_type = Integer)] + subject_entity_id: i32, + #[diesel(sql_type = Text)] + persona_id: String, + #[diesel(sql_type = BigInt)] + cnt: i64, + } + + let mut conn = self.connection.lock().expect("KnowledgeDao lock"); + let mut q = sql_query(sql).into_boxed(); + q = q.bind::(user_id); + for id in entity_ids { + q = q.bind::(*id); + } + let rows: Vec = q + .load(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Query error: {}", e))?; + + let mut out: std::collections::HashMap> = + std::collections::HashMap::with_capacity(entity_ids.len()); + for r in rows { + out.entry(r.subject_entity_id) + .or_default() + .push((r.persona_id, r.cnt)); + } + Ok(out) + }) + .map_err(|_| DbError::new(DbErrorKind::QueryError)) + } + fn update_entity_status( &mut self, cx: &opentelemetry::Context, @@ -544,12 +1480,41 @@ impl KnowledgeDao for SqliteKnowledgeDao { trace_db_call(cx, "delete", "delete_entity", |_span| { use schema::entities::dsl::*; let mut conn = self.connection.lock().expect("KnowledgeDao lock"); - diesel::delete(entities.filter(id.eq(entity_id))) - .execute(conn.deref_mut()) - .map(|_| ()) - .map_err(|e| anyhow::anyhow!("Delete error: {}", e)) + + // entity_facts has a CHECK constraint requiring + // `object_entity_id IS NOT NULL OR object_value IS NOT NULL`. + // The FK on object_entity_id is ON DELETE SET NULL — but + // facts that pointed at the deleted entity *only* via the + // entity reference (the common case for relational facts + // like "Alice is_friend_of Bob") have no object_value, so + // SET NULL would leave them with both NULLs and the CHECK + // aborts the whole DELETE. Pre-delete those facts in a + // transaction so the CASCADE / SET NULL chain on what + // remains can fire cleanly. 
+ // + // Long-term fix is to change the FK to ON DELETE CASCADE + // via a table-rebuild migration, but the DAO-side workaround + // is sufficient and less invasive. + conn.transaction::<(), diesel::result::Error, _>(|conn| { + use schema::entity_facts::dsl as ef; + diesel::delete( + ef::entity_facts + .filter(ef::object_entity_id.eq(entity_id)) + .filter(ef::object_value.is_null()), + ) + .execute(conn)?; + + diesel::delete(entities.filter(id.eq(entity_id))).execute(conn)?; + Ok(()) + }) + .map_err(|e| anyhow::anyhow!("Delete error: {}", e)) + }) + .map_err(|e| { + // Surface the actual diesel error string before collapsing + // to the opaque DbErrorKind::QueryError. + log::warn!("delete_entity({}) failed: {}", entity_id, e); + DbError::new(DbErrorKind::QueryError) }) - .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn merge_entities( @@ -682,7 +1647,10 @@ impl KnowledgeDao for SqliteKnowledgeDao { .filter(status.ne("rejected")) .filter(user_id.eq(persona.user_id())) .into_boxed(); - if let PersonaFilter::Single { persona_id: pid, .. } = persona { + if let PersonaFilter::Single { + persona_id: pid, .. + } = persona + { q = q.filter(persona_id.eq(pid.clone())); } q.load::(conn.deref_mut()) @@ -722,7 +1690,11 @@ impl KnowledgeDao for SqliteKnowledgeDao { query = query.filter(predicate.eq(pred)); count_query = count_query.filter(predicate.eq(pred)); } - if let PersonaFilter::Single { persona_id: ref pid, .. } = filter.persona { + if let PersonaFilter::Single { + persona_id: ref pid, + .. 
+ } = filter.persona + { query = query.filter(persona_id.eq(pid.clone())); count_query = count_query.filter(persona_id.eq(pid.clone())); } @@ -749,34 +1721,72 @@ impl KnowledgeDao for SqliteKnowledgeDao { cx: &opentelemetry::Context, fact_id: i32, patch: FactPatch, + audit: Option<(&str, &str)>, ) -> Result, DbError> { trace_db_call(cx, "update", "update_fact", |_span| { use schema::entity_facts::dsl::*; let mut conn = self.connection.lock().expect("KnowledgeDao lock"); + let mut touched = false; if let Some(ref new_predicate) = patch.predicate { diesel::update(entity_facts.filter(id.eq(fact_id))) .set(predicate.eq(new_predicate)) .execute(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Update error: {}", e))?; + touched = true; } if let Some(ref new_value) = patch.object_value { diesel::update(entity_facts.filter(id.eq(fact_id))) .set(object_value.eq(new_value)) .execute(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Update error: {}", e))?; + touched = true; } if let Some(ref new_status) = patch.status { diesel::update(entity_facts.filter(id.eq(fact_id))) .set(status.eq(new_status)) .execute(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Update error: {}", e))?; + touched = true; } if let Some(new_confidence) = patch.confidence { diesel::update(entity_facts.filter(id.eq(fact_id))) .set(confidence.eq(new_confidence)) .execute(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Update error: {}", e))?; + touched = true; + } + if let Some(new_from) = patch.valid_from { + diesel::update(entity_facts.filter(id.eq(fact_id))) + .set(valid_from.eq(new_from)) + .execute(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Update error: {}", e))?; + touched = true; + } + if let Some(new_until) = patch.valid_until { + diesel::update(entity_facts.filter(id.eq(fact_id))) + .set(valid_until.eq(new_until)) + .execute(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Update error: {}", e))?; + touched = true; + } + + // Only stamp the audit columns if we actually changed + // 
something — empty patches stay quiet. + if touched { + let now = chrono::Utc::now().timestamp(); + let (model_str, backend_str) = match audit { + Some((m, b)) => (Some(m.to_string()), Some(b.to_string())), + None => (None, None), + }; + diesel::update(entity_facts.filter(id.eq(fact_id))) + .set(( + last_modified_by_model.eq(model_str), + last_modified_by_backend.eq(backend_str), + last_modified_at.eq(Some(now)), + )) + .execute(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Audit-stamp error: {}", e))?; } entity_facts @@ -814,12 +1824,166 @@ impl KnowledgeDao for SqliteKnowledgeDao { trace_db_call(cx, "delete", "delete_fact", |_span| { use schema::entity_facts::dsl::*; let mut conn = self.connection.lock().expect("KnowledgeDao lock"); - diesel::delete(entity_facts.filter(id.eq(fact_id))) - .execute(conn.deref_mut()) - .map(|_| ()) - .map_err(|e| anyhow::anyhow!("Delete error: {}", e)) + // Clear dangling supersession pointers from any fact this + // one had retired — there's no FK on superseded_by (SQLite + // can't ALTER ADD with REFERENCES) so we do it manually. + // Sibling rows lose the pointer but stay 'superseded' — + // the user's historical correction survives the cleanup. 
+ conn.transaction::<(), diesel::result::Error, _>(|conn| { + diesel::update(entity_facts.filter(superseded_by.eq(fact_id))) + .set(superseded_by.eq::>(None)) + .execute(conn)?; + diesel::delete(entity_facts.filter(id.eq(fact_id))).execute(conn)?; + Ok(()) + }) + .map_err(|e| anyhow::anyhow!("Delete error: {}", e)) + }) + .map_err(|e| { + log::warn!("delete_fact({}) failed: {}", fact_id, e); + DbError::new(DbErrorKind::QueryError) + }) + } + + fn supersede_fact( + &mut self, + cx: &opentelemetry::Context, + old_id: i32, + new_id: i32, + audit: Option<(&str, &str)>, + ) -> Result, DbError> { + trace_db_call(cx, "update", "supersede_fact", |_span| { + use schema::entity_facts::dsl::*; + let mut conn = self.connection.lock().expect("KnowledgeDao lock"); + + if old_id == new_id { + return Err(anyhow::anyhow!( + "supersede_fact: old_id and new_id must differ" + )); + } + + let now = chrono::Utc::now().timestamp(); + let (audit_model, audit_backend) = match audit { + Some((m, b)) => (Some(m.to_string()), Some(b.to_string())), + None => (None, None), + }; + + conn.transaction::, diesel::result::Error, _>(|conn| { + // Pull the new fact's valid_from so we can close + // the old fact's interval at the same point. + let new_fact: Option = entity_facts + .filter(id.eq(new_id)) + .first::(conn) + .optional()?; + let Some(new_fact) = new_fact else { + return Ok(None); + }; + + // Verify the old fact exists before touching it — + // returning None lets the handler 404 cleanly. + let old_fact: Option = entity_facts + .filter(id.eq(old_id)) + .first::(conn) + .optional()?; + if old_fact.is_none() { + return Ok(None); + } + + // Only stamp valid_until if the user hasn't + // already set it — respecting hand-curated bounds. 
+ let target_valid_until = old_fact + .as_ref() + .and_then(|f| f.valid_until) + .or(new_fact.valid_from); + + diesel::update(entity_facts.filter(id.eq(old_id))) + .set(( + status.eq("superseded"), + superseded_by.eq(Some(new_id)), + valid_until.eq(target_valid_until), + last_modified_by_model.eq(audit_model.clone()), + last_modified_by_backend.eq(audit_backend.clone()), + last_modified_at.eq(Some(now)), + )) + .execute(conn)?; + + entity_facts + .filter(id.eq(old_id)) + .first::(conn) + .optional() + }) + .map_err(|e| anyhow::anyhow!("Supersede error: {}", e)) + }) + .map_err(|e| { + log::warn!( + "supersede_fact(old={}, new={}) failed: {}", + old_id, + new_id, + e + ); + DbError::new(DbErrorKind::UpdateError) + }) + } + + fn revert_supersession( + &mut self, + cx: &opentelemetry::Context, + fact_id: i32, + audit: Option<(&str, &str)>, + ) -> Result, DbError> { + trace_db_call(cx, "update", "revert_supersession", |_span| { + use schema::entity_facts::dsl::*; + let mut conn = self.connection.lock().expect("KnowledgeDao lock"); + + // Verify the fact exists and was in fact superseded — + // reverting an already-active fact is a no-op and the + // handler can 404 / 409 on the None. + let existing: Option = entity_facts + .filter(id.eq(fact_id)) + .first::(conn.deref_mut()) + .optional() + .map_err(|e| anyhow::anyhow!("Query error: {}", e))?; + let Some(row) = existing else { + return Ok(None); + }; + if row.status != "superseded" && row.superseded_by.is_none() { + // Not superseded — nothing to revert. Returning the + // current row is friendlier than 404 here; the + // handler decides what status to return. 
+ return Ok(Some(row)); + } + + let now = chrono::Utc::now().timestamp(); + let (audit_model, audit_backend) = match audit { + Some((m, b)) => (Some(m.to_string()), Some(b.to_string())), + None => (None, None), + }; + + diesel::update(entity_facts.filter(id.eq(fact_id))) + .set(( + status.eq("active"), + superseded_by.eq::>(None), + // Clear the auto-stamped valid_until. If the user + // had hand-set it pre-supersede we don't have a + // way to know — accepting the loss as the cost of + // a clean revert. Curator can re-bound after. + valid_until.eq::>(None), + last_modified_by_model.eq(audit_model), + last_modified_by_backend.eq(audit_backend), + last_modified_at.eq(Some(now)), + )) + .execute(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Revert error: {}", e))?; + + entity_facts + .filter(id.eq(fact_id)) + .first::(conn.deref_mut()) + .optional() + .map_err(|e| anyhow::anyhow!("Query error: {}", e)) + }) + .map_err(|e| { + log::warn!("revert_supersession({}) failed: {}", fact_id, e); + DbError::new(DbErrorKind::UpdateError) }) - .map_err(|_| DbError::new(DbErrorKind::QueryError)) } // ----------------------------------------------------------------------- @@ -924,7 +2088,10 @@ impl KnowledgeDao for SqliteKnowledgeDao { .filter(ef::created_at.gt(since)) .filter(ef::user_id.eq(persona.user_id())) .into_boxed(); - if let PersonaFilter::Single { persona_id: pid, .. } = persona { + if let PersonaFilter::Single { + persona_id: pid, .. 
+ } = persona + { facts_q = facts_q.filter(ef::persona_id.eq(pid.clone())); } let recent_facts = facts_q @@ -1004,6 +2171,8 @@ mod tests { include_all_memories: false, created_at: 0, updated_at: 0, + reviewed_only_facts: false, + allow_agent_corrections: false, }) .execute(c.deref_mut()) .unwrap(); @@ -1051,6 +2220,14 @@ mod tests { created_at: 0, persona_id: persona_id.to_string(), user_id, + valid_from: None, + valid_until: None, + superseded_by: None, + created_by_model: None, + created_by_backend: None, + last_modified_by_model: None, + last_modified_by_backend: None, + last_modified_at: None, }, ) .unwrap(); @@ -1072,7 +2249,14 @@ mod tests { let mut dao = SqliteKnowledgeDao::from_connection(conn.clone()); let entity = make_entity(&mut dao, "Cabin"); - add_fact(&mut dao, entity.id, "located_in", "Vermont", alice, "default"); + add_fact( + &mut dao, + entity.id, + "located_in", + "Vermont", + alice, + "default", + ); add_fact(&mut dao, entity.id, "color", "red", bob, "default"); let alice_view = dao @@ -1179,8 +2363,22 @@ mod tests { let mut dao = SqliteKnowledgeDao::from_connection(conn.clone()); let entity = make_entity(&mut dao, "Cabin"); - add_fact(&mut dao, entity.id, "p_alice_default", "x", alice, "default"); - add_fact(&mut dao, entity.id, "p_alice_journal", "y", alice, "journal"); + add_fact( + &mut dao, + entity.id, + "p_alice_default", + "x", + alice, + "default", + ); + add_fact( + &mut dao, + entity.id, + "p_alice_journal", + "y", + alice, + "journal", + ); add_fact(&mut dao, entity.id, "p_bob_journal", "z", bob, "journal"); // Delete alice's journal persona — CASCADE should remove only @@ -1269,6 +2467,14 @@ mod tests { created_at: 0, persona_id: "ghost".to_string(), user_id: alice, + valid_from: None, + valid_until: None, + superseded_by: None, + created_by_model: None, + created_by_backend: None, + last_modified_by_model: None, + last_modified_by_backend: None, + last_modified_at: None, }, ); assert!( @@ -1276,4 +2482,359 @@ mod tests { "FK 
should reject fact whose persona doesn't exist" ); } + + #[test] + fn supersede_fact_links_and_stamps_valid_until() { + // Supersession: marking an old fact as replaced by a new one + // flips its status to 'superseded', points superseded_by at + // the new fact, and stamps valid_until from the new fact's + // valid_from (when not already set). Pre-existing valid_until + // on the old fact is respected. + let cx = opentelemetry::Context::new(); + let conn = connection_with_fks_on(); + let alice = create_user(&conn, "alice"); + create_persona_row(&conn, alice, "default"); + + let mut dao = SqliteKnowledgeDao::from_connection(conn.clone()); + let cameron = make_entity(&mut dao, "Cameron"); + let old = add_fact( + &mut dao, + cameron.id, + "is_in_relationship_with", + "X", + alice, + "default", + ); + // The new fact carries a valid_from we expect to be stamped + // onto the old fact's valid_until. + let new = add_fact( + &mut dao, + cameron.id, + "is_in_relationship_with", + "Y", + alice, + "default", + ); + dao.update_fact( + &cx, + new.id, + FactPatch { + predicate: None, + object_value: None, + status: None, + confidence: None, + valid_from: Some(Some(1640995200)), // 2022-01-01 + valid_until: None, + }, + None, + ) + .unwrap(); + + let updated = dao + .supersede_fact(&cx, old.id, new.id, None) + .unwrap() + .expect("supersede returned None"); + + assert_eq!(updated.status, "superseded"); + assert_eq!(updated.superseded_by, Some(new.id)); + assert_eq!(updated.valid_until, Some(1640995200)); + } + + #[test] + fn delete_fact_clears_dangling_supersession_pointers() { + // Deleting the newer fact (the supersedeR) leaves the older + // fact's superseded_by dangling — the DAO clears it back to + // NULL in the same transaction so the column never points at + // a missing row. The old fact's status stays 'superseded' + // because the historical correction is still meaningful. 
+ let cx = opentelemetry::Context::new(); + let conn = connection_with_fks_on(); + let alice = create_user(&conn, "alice"); + create_persona_row(&conn, alice, "default"); + + let mut dao = SqliteKnowledgeDao::from_connection(conn.clone()); + let cameron = make_entity(&mut dao, "Cameron"); + let old = add_fact(&mut dao, cameron.id, "lives_in", "NYC", alice, "default"); + let new = add_fact(&mut dao, cameron.id, "lives_in", "SF", alice, "default"); + + dao.supersede_fact(&cx, old.id, new.id, None) + .unwrap() + .unwrap(); + dao.delete_fact(&cx, new.id).unwrap(); + + let rehydrated = dao + .list_facts( + &cx, + FactFilter { + entity_id: Some(cameron.id), + // "all" — the old fact is 'superseded' now, so the + // default 'active' scope would skip it. + status: Some("all".to_string()), + predicate: None, + persona: PersonaFilter::Single { + user_id: alice, + persona_id: "default".to_string(), + }, + limit: 10, + offset: 0, + }, + ) + .unwrap() + .0; + let old_row = rehydrated.iter().find(|f| f.id == old.id).unwrap(); + assert_eq!( + old_row.superseded_by, None, + "dangling supersession pointer should be cleared" + ); + assert_eq!( + old_row.status, "superseded", + "historical status should survive the supersederr delete" + ); + } + + #[test] + fn update_fact_can_set_and_clear_valid_time() { + // FactPatch.valid_from / valid_until are Option> + // so PATCH can distinguish "leave alone" (None) from "set to + // value" (Some(Some(n))) and "clear back to NULL" (Some(None)). + let cx = opentelemetry::Context::new(); + let conn = connection_with_fks_on(); + let alice = create_user(&conn, "alice"); + create_persona_row(&conn, alice, "default"); + + let mut dao = SqliteKnowledgeDao::from_connection(conn.clone()); + let cameron = make_entity(&mut dao, "Cameron"); + let fact = add_fact( + &mut dao, + cameron.id, + "is_in_relationship_with", + "Alex", + alice, + "default", + ); + assert_eq!(fact.valid_from, None); + assert_eq!(fact.valid_until, None); + + // Set both bounds. 
+ let updated = dao + .update_fact( + &cx, + fact.id, + FactPatch { + predicate: None, + object_value: None, + status: None, + confidence: None, + valid_from: Some(Some(1577836800)), // 2020-01-01 + valid_until: Some(Some(1640995200)), // 2022-01-01 + }, + None, + ) + .unwrap() + .unwrap(); + assert_eq!(updated.valid_from, Some(1577836800)); + assert_eq!(updated.valid_until, Some(1640995200)); + + // Leave alone: omit both — values persist. + let still = dao + .update_fact( + &cx, + fact.id, + FactPatch { + predicate: None, + object_value: None, + status: None, + confidence: None, + valid_from: None, + valid_until: None, + }, + None, + ) + .unwrap() + .unwrap(); + assert_eq!(still.valid_from, Some(1577836800)); + assert_eq!(still.valid_until, Some(1640995200)); + + // Clear valid_until back to NULL (relationship ongoing again). + let cleared = dao + .update_fact( + &cx, + fact.id, + FactPatch { + predicate: None, + object_value: None, + status: None, + confidence: None, + valid_from: None, + valid_until: Some(None), + }, + None, + ) + .unwrap() + .unwrap(); + assert_eq!(cleared.valid_from, Some(1577836800)); + assert_eq!(cleared.valid_until, None); + } + + #[test] + fn delete_entity_clears_relational_facts_that_would_violate_check() { + // entity_facts has a CHECK that at least one of object_entity_id / + // object_value is non-null. The FK on object_entity_id is + // ON DELETE SET NULL, which would leave purely-relational facts + // (subject + predicate + object_entity_id, no object_value) + // with both nulls and abort the delete. The DAO pre-deletes + // those rows in a transaction so the parent delete can succeed. 
+ let cx = opentelemetry::Context::new(); + let conn = connection_with_fks_on(); + let alice = create_user(&conn, "alice"); + create_persona_row(&conn, alice, "default"); + + let mut dao = SqliteKnowledgeDao::from_connection(conn.clone()); + let bob = make_entity(&mut dao, "Bob"); + let carol = make_entity(&mut dao, "Carol"); + + // A relational fact where Carol is the object — exactly the + // shape the CHECK + SET NULL combination would otherwise break. + let (rel_fact, _) = dao + .upsert_fact( + &cx, + InsertEntityFact { + subject_entity_id: bob.id, + predicate: "is_friend_of".to_string(), + object_entity_id: Some(carol.id), + object_value: None, + source_photo: None, + source_insight_id: None, + confidence: 0.6, + status: "active".to_string(), + created_at: 0, + persona_id: "default".to_string(), + user_id: alice, + valid_from: None, + valid_until: None, + superseded_by: None, + created_by_model: None, + created_by_backend: None, + last_modified_by_model: None, + last_modified_by_backend: None, + last_modified_at: None, + }, + ) + .unwrap(); + + // A typed fact where Bob is the subject — should survive. + add_fact(&mut dao, bob.id, "has_age", "30", alice, "default"); + + // Delete Carol — should succeed (relational fact pre-deleted). + dao.delete_entity(&cx, carol.id).unwrap(); + + assert!( + dao.get_entity_by_id(&cx, carol.id).unwrap().is_none(), + "Carol should be deleted" + ); + // The relational fact about Carol should be gone (pre-deleted by + // the DAO's transaction, not SET NULL'd). + let bob_facts = dao + .get_facts_for_entity( + &cx, + bob.id, + &PersonaFilter::Single { + user_id: alice, + persona_id: "default".to_string(), + }, + ) + .unwrap(); + assert!( + !bob_facts.iter().any(|f| f.id == rel_fact.id), + "relational fact pointing at Carol should be removed" + ); + // The typed fact survives. 
+ assert!( + bob_facts.iter().any(|f| f.predicate == "has_age"), + "typed fact about Bob should survive Carol's deletion" + ); + } + + #[test] + fn upsert_entity_collapses_near_duplicate_by_embedding() { + // The agent's pre-flight check uses FTS5 prefix tokens, which + // miss "Sarah" / "Sara" / "Sarah J." pairs. The DAO upsert is + // the safety net: if no exact (name, type) match but the new + // entity's embedding sits above the cosine threshold against an + // existing same-type entity, we collapse instead of inserting. + let cx = opentelemetry::Context::new(); + let conn = connection_with_fks_on(); + let mut dao = SqliteKnowledgeDao::from_connection(conn.clone()); + + let mut emb_a = vec![0.0_f32; 64]; + emb_a[0] = 1.0; + emb_a[1] = 0.5; + let mut emb_b_near = emb_a.clone(); + emb_b_near[2] = 0.05; // nudge — cosine still well above 0.92 + + // Seed an existing entity with the embedding. + let seeded = dao + .upsert_entity( + &cx, + InsertEntity { + name: "Sarah".to_string(), + entity_type: "person".to_string(), + description: "tagged friend".to_string(), + embedding: Some(SqliteKnowledgeDao::serialize_embedding(&emb_a)), + confidence: 0.6, + status: "active".to_string(), + created_at: 0, + updated_at: 0, + }, + ) + .unwrap(); + + // A "different name" with a near-identical embedding should + // collapse onto the existing row, not create a new entity. + let collapsed = dao + .upsert_entity( + &cx, + InsertEntity { + name: "Sara".to_string(), + entity_type: "person".to_string(), + description: "tagged friend".to_string(), + embedding: Some(SqliteKnowledgeDao::serialize_embedding(&emb_b_near)), + confidence: 0.6, + status: "active".to_string(), + created_at: 0, + updated_at: 0, + }, + ) + .unwrap(); + + assert_eq!( + collapsed.id, seeded.id, + "near-duplicate by cosine should reuse the existing entity id" + ); + + // And a clearly-different embedding under a different name should + // still create a new row. 
+ let mut emb_unrelated = vec![0.0_f32; 64]; + emb_unrelated[10] = 1.0; + let distinct = dao + .upsert_entity( + &cx, + InsertEntity { + name: "Bob".to_string(), + entity_type: "person".to_string(), + description: String::new(), + embedding: Some(SqliteKnowledgeDao::serialize_embedding(&emb_unrelated)), + confidence: 0.6, + status: "active".to_string(), + created_at: 0, + updated_at: 0, + }, + ) + .unwrap(); + + assert_ne!( + distinct.id, seeded.id, + "unrelated embedding should not collapse" + ); + } } diff --git a/src/database/mod.rs b/src/database/mod.rs index d5dd9cb..2e919f6 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -59,8 +59,8 @@ pub use calendar_dao::{CalendarEventDao, SqliteCalendarEventDao}; pub use daily_summary_dao::{DailySummaryDao, InsertDailySummary, SqliteDailySummaryDao}; pub use insights_dao::{InsightDao, SqliteInsightDao}; pub use knowledge_dao::{ - EntityFilter, EntityPatch, FactFilter, FactPatch, KnowledgeDao, PersonaFilter, RecentActivity, - SqliteKnowledgeDao, + ConsolidationGroup, EntityFilter, EntityGraph, EntityPatch, EntitySort, FactFilter, FactPatch, + KnowledgeDao, PersonaFilter, RecentActivity, SqliteKnowledgeDao, }; pub use location_dao::{LocationHistoryDao, SqliteLocationHistoryDao}; pub use persona_dao::{ImportPersona, PersonaDao, PersonaPatch, SqlitePersonaDao}; diff --git a/src/database/models.rs b/src/database/models.rs index 479b631..b5e1c1e 100644 --- a/src/database/models.rs +++ b/src/database/models.rs @@ -249,6 +249,33 @@ pub struct InsertEntityFact { /// persona must not see each other's facts. Always paired with /// `persona_id` — they're a unit. pub user_id: i32, + /// Real-world period the fact is/was true (unix seconds). NULL on + /// either side = unbounded — `valid_from IS NULL` reads as + /// "always-true-back-to-the-beginning", `valid_until IS NULL` as + /// "still-true-now-or-unknown". 
Distinguishes valid time from + /// transaction time (`created_at` is when we recorded the fact, + /// not when it was true in the world). See migration + /// 2026-05-10-000100. + pub valid_from: Option, + pub valid_until: Option, + /// Points at the entity_facts.id that replaced this one. Set by + /// the supersede endpoint; status flips to 'superseded' in the + /// same transaction. See migration 2026-05-10-000200. + pub superseded_by: Option, + /// Provenance for model audit — see migration 2026-05-10-000300. + /// `created_by_model` is the LLM identifier (e.g. "qwen2.5:7b", + /// "anthropic/claude-sonnet-4") or NULL for legacy / manual rows. + /// `created_by_backend` is "local" / "hybrid" / "manual" / NULL. + pub created_by_model: Option, + pub created_by_backend: Option, + /// Audit trail for mutations after creation — see migration + /// 2026-05-10-000500. `last_modified_*` stamp on any update + /// (status flip, valid-time edit, supersede, manual PATCH); + /// `last_modified_at` is unix seconds. NULL on rows that have + /// never been touched since creation. + pub last_modified_by_model: Option, + pub last_modified_by_backend: Option, + pub last_modified_at: Option, } #[derive(Serialize, Queryable, Clone, Debug)] @@ -265,6 +292,14 @@ pub struct EntityFact { pub created_at: i64, pub persona_id: String, pub user_id: i32, + pub valid_from: Option, + pub valid_until: Option, + pub superseded_by: Option, + pub created_by_model: Option, + pub created_by_backend: Option, + pub last_modified_by_model: Option, + pub last_modified_by_backend: Option, + pub last_modified_at: Option, } #[derive(Insertable)] @@ -300,6 +335,15 @@ pub struct InsertPersona<'a> { pub include_all_memories: bool, pub created_at: i64, pub updated_at: i64, + /// "Strict mode" — agent reads only see facts with status = + /// 'reviewed' (human-verified). Default false. See migration + /// 2026-05-10-000400. 
+ pub reviewed_only_facts: bool, + /// Gate for the agent's update_fact / supersede_fact tools. + /// Default false — fresh personas let the agent create but not + /// alter or replace. Operator opts in once a model has earned + /// trust. See migration 2026-05-10-000500. + pub allow_agent_corrections: bool, } #[derive(Serialize, Queryable, Clone, Debug)] @@ -313,6 +357,8 @@ pub struct Persona { pub include_all_memories: bool, pub created_at: i64, pub updated_at: i64, + pub reviewed_only_facts: bool, + pub allow_agent_corrections: bool, } #[derive(Insertable)] diff --git a/src/database/persona_dao.rs b/src/database/persona_dao.rs index 8ea404d..4924244 100644 --- a/src/database/persona_dao.rs +++ b/src/database/persona_dao.rs @@ -17,6 +17,8 @@ pub struct PersonaPatch { pub name: Option, pub system_prompt: Option, pub include_all_memories: Option, + pub reviewed_only_facts: Option, + pub allow_agent_corrections: Option, } /// One row of a bulk migration upload. Fields named to match the JSON @@ -164,6 +166,8 @@ impl PersonaDao for SqlitePersonaDao { include_all_memories: include_all, created_at: now, updated_at: now, + reviewed_only_facts: false, + allow_agent_corrections: false, }) .execute(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Insert error: {}", e))?; @@ -211,6 +215,24 @@ impl PersonaDao for SqlitePersonaDao { .execute(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Update include_all error: {}", e))?; } + if let Some(new_reviewed_only) = patch.reviewed_only_facts { + diesel::update(personas.filter(user_id.eq(uid)).filter(persona_id.eq(pid))) + .set(( + reviewed_only_facts.eq(new_reviewed_only), + updated_at.eq(now), + )) + .execute(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Update reviewed_only_facts error: {}", e))?; + } + if let Some(new_allow_corrections) = patch.allow_agent_corrections { + diesel::update(personas.filter(user_id.eq(uid)).filter(persona_id.eq(pid))) + .set(( + allow_agent_corrections.eq(new_allow_corrections), + 
updated_at.eq(now), + )) + .execute(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Update allow_agent_corrections error: {}", e))?; + } personas .filter(user_id.eq(uid)) @@ -384,6 +406,8 @@ mod tests { name: Some("Renamed".into()), system_prompt: Some("new prompt".into()), include_all_memories: None, + reviewed_only_facts: None, + allow_agent_corrections: None, }, ) .unwrap() @@ -412,6 +436,8 @@ mod tests { name: None, system_prompt: None, include_all_memories: Some(true), + reviewed_only_facts: None, + allow_agent_corrections: None, }, ) .unwrap() diff --git a/src/database/schema.rs b/src/database/schema.rs index d93d583..d001d80 100644 --- a/src/database/schema.rs +++ b/src/database/schema.rs @@ -59,6 +59,14 @@ diesel::table! { created_at -> BigInt, persona_id -> Text, user_id -> Integer, + valid_from -> Nullable, + valid_until -> Nullable, + superseded_by -> Nullable, + created_by_model -> Nullable, + created_by_backend -> Nullable, + last_modified_by_model -> Nullable, + last_modified_by_backend -> Nullable, + last_modified_at -> Nullable, } } @@ -172,6 +180,8 @@ diesel::table! 
{ include_all_memories -> Bool, created_at -> BigInt, updated_at -> BigInt, + reviewed_only_facts -> Bool, + allow_agent_corrections -> Bool, } } diff --git a/src/faces.rs b/src/faces.rs index 1dc6c6c..c81a270 100644 --- a/src/faces.rs +++ b/src/faces.rs @@ -2992,9 +2992,12 @@ mod tests { status: "detected".into(), model_version: "buffalo_l".into(), }; - dao.store_detection(&ctx(), mk_row("a1", Some(alice.id))).unwrap(); - dao.store_detection(&ctx(), mk_row("a2", Some(alice.id))).unwrap(); - dao.store_detection(&ctx(), mk_row("b1", Some(bob.id))).unwrap(); + dao.store_detection(&ctx(), mk_row("a1", Some(alice.id))) + .unwrap(); + dao.store_detection(&ctx(), mk_row("a2", Some(alice.id))) + .unwrap(); + dao.store_detection(&ctx(), mk_row("b1", Some(bob.id))) + .unwrap(); dao.store_detection(&ctx(), mk_row("u1", None)).unwrap(); // person_id=alice returns only alice's two faces — ignoring the @@ -3004,9 +3007,11 @@ mod tests { .list_embeddings(&ctx(), None, true, Some(alice.id), 100, 0) .unwrap(); assert_eq!(alice_rows.len(), 2); - assert!(alice_rows - .iter() - .all(|(r, _)| r.person_id == Some(alice.id))); + assert!( + alice_rows + .iter() + .all(|(r, _)| r.person_id == Some(alice.id)) + ); // unassigned=true with no person_id behaves as before. 
let unassigned_rows = dao diff --git a/src/knowledge.rs b/src/knowledge.rs index 30ceabd..4c3f5a8 100644 --- a/src/knowledge.rs +++ b/src/knowledge.rs @@ -5,11 +5,13 @@ use serde::{Deserialize, Serialize}; use std::sync::Mutex; use crate::data::Claims; -use crate::database::models::{Entity, EntityFact, EntityPhotoLink}; +use crate::database::models::{Entity, EntityFact, EntityPhotoLink, InsertEntityFact}; use crate::database::{ - EntityFilter, EntityPatch, FactFilter, FactPatch, KnowledgeDao, PersonaFilter, RecentActivity, + ConsolidationGroup, EntityFilter, EntityGraph, EntityPatch, EntitySort, FactFilter, FactPatch, + KnowledgeDao, PersonaFilter, RecentActivity, }; use crate::personas::PersonaDaoData; +use crate::state::AppState; /// Resolve the `X-Persona-Id` header into a `PersonaFilter`. Missing /// header → `'default'`. If the persona has `include_all_memories=true`, @@ -57,6 +59,24 @@ pub struct EntitySummary { pub status: String, pub created_at: i64, pub updated_at: i64, + /// Persona-scoped count of non-rejected facts about this entity + /// (subject side). 0 when not provided by the call site, e.g. + /// PATCH responses return the bare entity without scoping context. + #[serde(skip_serializing_if = "Option::is_none")] + pub fact_count: Option, + /// Per-persona breakdown of fact counts for this entity, scoped + /// to the active user. Lets the curation UI surface "this entity + /// is empty under your active persona but has 12 facts in + /// journal" so you know which persona owns the existing + /// knowledge. Skipped on serialization when None. 
+ #[serde(skip_serializing_if = "Option::is_none")] + pub persona_breakdown: Option>, +} + +#[derive(Serialize)] +pub struct PersonaCount { + pub persona_id: String, + pub count: i64, } impl From for EntitySummary { @@ -70,10 +90,30 @@ impl From for EntitySummary { status: e.status, created_at: e.created_at, updated_at: e.updated_at, + fact_count: None, + persona_breakdown: None, } } } +impl EntitySummary { + fn from_entity_with_count(e: Entity, fact_count: i64) -> Self { + let mut s = EntitySummary::from(e); + s.fact_count = Some(fact_count); + s + } + + fn with_persona_breakdown(mut self, breakdown: Vec<(String, i64)>) -> Self { + self.persona_breakdown = Some( + breakdown + .into_iter() + .map(|(persona_id, count)| PersonaCount { persona_id, count }) + .collect(), + ); + self + } +} + #[derive(Serialize)] pub struct EntityListResponse { pub entities: Vec, @@ -94,10 +134,41 @@ pub struct FactDetail { pub source_photo: Option, pub source_insight_id: Option, pub created_at: i64, + /// Real-world valid-time interval. NULL on either side means + /// unbounded; both NULL = "always true" / validity unknown. + /// Distinct from `created_at` (transaction time — when we + /// recorded it). See migration 2026-05-10-000100. + pub valid_from: Option, + pub valid_until: Option, + /// Points at the entity_facts.id that replaced this one (Phase 2 + /// supersession, migration 2026-05-10-000200). Only set when + /// status == 'superseded'. + pub superseded_by: Option, + /// Provenance — see migration 2026-05-10-000300. NULL on legacy + /// rows. `created_by_backend` is "local" / "hybrid" / "manual". + pub created_by_model: Option, + pub created_by_backend: Option, + /// Audit trail — see migration 2026-05-10-000500. Set on any + /// post-creation mutation. NULL on rows that have never been + /// touched after they were first written. 
+ pub last_modified_by_model: Option, + pub last_modified_by_backend: Option, + pub last_modified_at: Option, + /// Set when another active fact has the same subject+predicate, + /// a different object, AND their valid-time intervals overlap. + /// Detected at read time by the get_entity handler grouping + /// facts by predicate. Some predicates are legitimately + /// multi-valued ("tagged_in", "friend_of") so this is a *signal* + /// for the curator, not a hard invariant. The interval check + /// keeps "lives_in NYC 2018-2020" + "lives_in SF 2020-present" + /// from false-positive flagging. + #[serde(skip_serializing_if = "std::ops::Not::not")] + pub in_conflict: bool, } #[derive(Serialize)] pub struct PhotoLinkDetail { + pub library_id: i32, pub file_path: String, pub role: String, } @@ -105,6 +176,7 @@ pub struct PhotoLinkDetail { impl From for PhotoLinkDetail { fn from(l: EntityPhotoLink) -> Self { PhotoLinkDetail { + library_id: l.library_id, file_path: l.file_path, role: l.role, } @@ -123,6 +195,11 @@ pub struct EntityDetailResponse { pub updated_at: i64, pub facts: Vec, pub photo_links: Vec, + /// Per-persona fact counts for the active user. Mirrors the + /// same field on EntitySummary; the detail panel surfaces a + /// clickable list so the curator can switch to the persona + /// that owns existing facts about this entity. + pub persona_breakdown: Vec, } #[derive(Serialize)] @@ -171,12 +248,58 @@ pub struct EntityPatchRequest { pub confidence: Option, } +/// Serde helper for the "tri-state" pattern: distinguish "field +/// omitted" from "field sent as null". Used for nullable columns +/// where we want PATCH to support both "leave alone" and "set NULL". 
+fn deserialize_optional_nullable_i64<'de, D>(d: D) -> Result>, D::Error> +where + D: serde::Deserializer<'de>, +{ + Ok(Some(Option::::deserialize(d)?)) +} + #[derive(Deserialize)] pub struct FactPatchRequest { pub predicate: Option, pub object_value: Option, pub status: Option, pub confidence: Option, + /// Tri-state: missing = leave alone, null = clear to NULL, number + /// = set. See `deserialize_optional_nullable_i64`. + #[serde(default, deserialize_with = "deserialize_optional_nullable_i64")] + pub valid_from: Option>, + #[serde(default, deserialize_with = "deserialize_optional_nullable_i64")] + pub valid_until: Option>, +} + +#[derive(Deserialize)] +pub struct SupersedeRequest { + /// The id of the new fact that replaces the path-params one. + pub by_fact_id: i32, +} + +#[derive(Deserialize)] +pub struct SynthesizeMergeRequest { + pub source_id: i32, + pub target_id: i32, +} + +#[derive(serde::Serialize)] +pub struct SynthesizeMergeResponse { + pub proposed_description: String, + pub model_used: String, +} + +#[derive(Deserialize)] +pub struct FactCreateRequest { + pub subject_entity_id: i32, + pub predicate: String, + pub object_entity_id: Option, + pub object_value: Option, + pub source_photo: Option, + pub confidence: Option, + pub valid_from: Option, + pub valid_until: Option, } #[derive(Deserialize)] @@ -185,6 +308,9 @@ pub struct EntityListQuery { pub entity_type: Option, pub status: Option, pub search: Option, + /// "updated" (default) | "name" | "count". `count` is persona-scoped + /// via the X-Persona-Id header. 
+ pub sort: Option, pub limit: Option, pub offset: Option, } @@ -204,6 +330,77 @@ pub struct RecentQuery { pub limit: Option, } +#[derive(Deserialize)] +pub struct GraphQuery { + #[serde(rename = "type")] + pub entity_type: Option, + pub limit: Option, +} + +#[derive(Serialize)] +pub struct GraphNodeView { + pub id: i32, + pub name: String, + pub entity_type: String, + pub fact_count: i64, +} + +#[derive(Serialize)] +pub struct GraphEdgeView { + pub source: i32, + pub target: i32, + pub predicate: String, + pub count: i64, +} + +#[derive(Serialize)] +pub struct GraphResponse { + pub nodes: Vec, + pub edges: Vec, +} + +#[derive(Deserialize)] +pub struct PredicateStatsQuery { + pub limit: Option, +} + +#[derive(Serialize)] +pub struct PredicateStat { + pub predicate: String, + pub count: i64, +} + +#[derive(Serialize)] +pub struct PredicateStatsResponse { + pub predicates: Vec, +} + +#[derive(Serialize)] +pub struct BulkRejectResponse { + pub rejected: usize, +} + +#[derive(Deserialize)] +pub struct ConsolidationQuery { + /// Cosine threshold for clustering. Default 0.85 — looser than + /// the upsert-time guard (0.92) so this view surfaces "probably + /// same" pairs for human review. 
+ pub threshold: Option, + pub limit: Option, +} + +#[derive(Serialize)] +pub struct ConsolidationGroupView { + pub entities: Vec, + pub min_cosine: f32, + pub max_cosine: f32, +} + +#[derive(Serialize)] +pub struct ConsolidationResponse { + pub groups: Vec, +} + // --------------------------------------------------------------------------- // Service registration // --------------------------------------------------------------------------- @@ -216,19 +413,44 @@ where web::scope("/knowledge") .service(web::resource("/entities").route(web::get().to(list_entities::))) .service(web::resource("/entities/merge").route(web::post().to(merge_entities::))) + .service( + web::resource("/entities/synthesize-merge") + .route(web::post().to(synthesize_merge::)), + ) .service( web::resource("/entities/{id}") .route(web::get().to(get_entity::)) .route(web::patch().to(patch_entity::)) .route(web::delete().to(delete_entity::)), ) - .service(web::resource("/facts").route(web::get().to(list_facts::))) + .service( + web::resource("/facts") + .route(web::get().to(list_facts::)) + .route(web::post().to(create_fact::)), + ) .service( web::resource("/facts/{id}") .route(web::patch().to(patch_fact::)) .route(web::delete().to(delete_fact::)), ) - .service(web::resource("/recent").route(web::get().to(get_recent::))), + .service( + web::resource("/facts/{id}/supersede").route(web::post().to(supersede_fact::)), + ) + .service(web::resource("/facts/{id}/restore").route(web::post().to(restore_fact::))) + .service(web::resource("/recent").route(web::get().to(get_recent::))) + .service( + web::resource("/consolidation-proposals") + .route(web::get().to(get_consolidation_proposals::)), + ) + .service(web::resource("/graph").route(web::get().to(get_graph::))) + .service( + web::resource("/predicate-stats") + .route(web::get().to(get_predicate_stats::)), + ) + .service( + web::resource("/predicates/{predicate}/bulk-reject") + .route(web::post().to(bulk_reject_predicate::)), + ), ) } @@ -237,9 
+459,11 @@ where // --------------------------------------------------------------------------- async fn list_entities( - _claims: Claims, + req: HttpRequest, + claims: Claims, query: web::Query, dao: web::Data>, + persona_dao: PersonaDaoData, ) -> impl Responder { let limit = query.limit.unwrap_or(50).min(200); let offset = query.offset.unwrap_or(0); @@ -250,6 +474,15 @@ async fn list_entities( Some(s) => Some(s.to_string()), }; + let sort = match query.sort.as_deref() { + Some("name") => EntitySort::NameAsc, + Some("count") => EntitySort::FactCountDesc, + // "updated" or anything else falls through to the default. + _ => EntitySort::UpdatedDesc, + }; + + let persona = resolve_persona_filter(&req, &claims, &persona_dao); + let filter = EntityFilter { entity_type: query.entity_type.clone(), status: status_filter, @@ -260,10 +493,26 @@ async fn list_entities( let cx = opentelemetry::Context::current(); let mut dao = dao.lock().expect("Unable to lock KnowledgeDao"); - match dao.list_entities(&cx, filter) { - Ok((entities, total)) => { - let summaries: Vec = - entities.into_iter().map(EntitySummary::from).collect(); + match dao.list_entities_with_fact_counts(&cx, filter, sort, &persona) { + Ok((pairs, total)) => { + // Batch fetch persona breakdowns so the list-row tooltip + // and detail panel can show "0 here · 12 in journal". + // One extra query for the visible page. 
+ let entity_ids: Vec = pairs.iter().map(|(e, _)| e.id).collect(); + let breakdowns = dao + .get_persona_breakdowns_for_entities(&cx, &entity_ids, persona.user_id()) + .unwrap_or_default(); + let summaries: Vec = pairs + .into_iter() + .map(|(e, c)| { + let entity_id = e.id; + let summary = EntitySummary::from_entity_with_count(e, c); + match breakdowns.get(&entity_id) { + Some(bd) => summary.with_persona_breakdown(bd.clone()), + None => summary, + } + }) + .collect(); HttpResponse::Ok().json(EntityListResponse { entities: summaries, total, @@ -334,9 +583,69 @@ async fn get_entity( source_photo: f.source_photo, source_insight_id: f.source_insight_id, created_at: f.created_at, + valid_from: f.valid_from, + valid_until: f.valid_until, + superseded_by: f.superseded_by, + created_by_model: f.created_by_model, + created_by_backend: f.created_by_backend, + last_modified_by_model: f.last_modified_by_model, + last_modified_by_backend: f.last_modified_by_backend, + last_modified_at: f.last_modified_at, + in_conflict: false, }); } + // Conflict detection: within the active set, group by predicate; + // for each pair within a group that disagrees on the object, + // flag both only if their valid-time intervals overlap. NULL on + // either bound treats that side as unbounded — a fact with no + // valid-time data still flags against any time period (worst case + // for legacy data; user adds dates to suppress). 
+ fn intervals_overlap(a: (Option, Option), b: (Option, Option)) -> bool { + let a_lo = a.0.unwrap_or(i64::MIN); + let a_hi = a.1.unwrap_or(i64::MAX); + let b_lo = b.0.unwrap_or(i64::MIN); + let b_hi = b.1.unwrap_or(i64::MAX); + a_lo < b_hi && b_lo < a_hi + } + { + use std::collections::{HashMap, HashSet}; + let mut by_predicate: HashMap> = HashMap::new(); + for (idx, f) in facts.iter().enumerate() { + if f.status == "active" { + by_predicate + .entry(f.predicate.clone()) + .or_default() + .push(idx); + } + } + let mut to_flag: HashSet = HashSet::new(); + for indices in by_predicate.values() { + if indices.len() < 2 { + continue; + } + for (a_pos, &i) in indices.iter().enumerate() { + for &j in &indices[a_pos + 1..] { + let same_object = facts[i].object_entity_id == facts[j].object_entity_id + && facts[i].object_value == facts[j].object_value; + if same_object { + continue; + } + if intervals_overlap( + (facts[i].valid_from, facts[i].valid_until), + (facts[j].valid_from, facts[j].valid_until), + ) { + to_flag.insert(i); + to_flag.insert(j); + } + } + } + } + for i in to_flag { + facts[i].in_conflict = true; + } + } + // Fetch photo links let photo_links: Vec = match dao.get_links_for_entity(&cx, entity_id) { Ok(links) => links.into_iter().map(PhotoLinkDetail::from).collect(), @@ -347,6 +656,18 @@ async fn get_entity( } }; + // Per-persona breakdown for the detail panel's "facts live in + // {persona}" block — same data the list-row tooltip reads. One + // query, single entity in scope. 
+ let persona_breakdown: Vec = dao + .get_persona_breakdowns_for_entities(&cx, &[entity_id], persona.user_id()) + .ok() + .and_then(|mut map| map.remove(&entity_id)) + .unwrap_or_default() + .into_iter() + .map(|(persona_id, count)| PersonaCount { persona_id, count }) + .collect(); + HttpResponse::Ok().json(EntityDetailResponse { id: entity.id, name: entity.name, @@ -358,6 +679,7 @@ async fn get_entity( updated_at: entity.updated_at, facts, photo_links, + persona_breakdown, }) } @@ -461,6 +783,138 @@ async fn merge_entities( } } +/// Preview a merged-description before the actual merge fires. Calls +/// the local Ollama with both entities' names + descriptions and +/// returns a synthesized rewrite that combines them. The curator +/// previews, edits, and either accepts (PATCH target's description +/// then POST /merge) or skips (just /merge as-is). +/// +/// Deliberately doesn't touch the database — read-only on entities, +/// no LLM call gets to write anything. If the model is unavailable +/// the handler returns 503 so the UI can degrade gracefully (skip +/// the preview, fall back to the existing merge action). 
+async fn synthesize_merge( + _claims: Claims, + body: web::Json, + dao: web::Data>, + app_state: web::Data, +) -> impl Responder { + if body.source_id == body.target_id { + return HttpResponse::BadRequest() + .json(serde_json::json!({"error": "source_id and target_id must differ"})); + } + + let cx = opentelemetry::Context::current(); + let mut dao = dao.lock().expect("Unable to lock KnowledgeDao"); + + let source = match dao.get_entity_by_id(&cx, body.source_id) { + Ok(Some(e)) => e, + Ok(None) => { + return HttpResponse::BadRequest() + .json(serde_json::json!({"error": "source entity not found"})); + } + Err(e) => { + log::error!("synthesize_merge source lookup: {:?}", e); + return HttpResponse::InternalServerError() + .json(serde_json::json!({"error": "Database error"})); + } + }; + let target = match dao.get_entity_by_id(&cx, body.target_id) { + Ok(Some(e)) => e, + Ok(None) => { + return HttpResponse::BadRequest() + .json(serde_json::json!({"error": "target entity not found"})); + } + Err(e) => { + log::error!("synthesize_merge target lookup: {:?}", e); + return HttpResponse::InternalServerError() + .json(serde_json::json!({"error": "Database error"})); + } + }; + + // Drop the DAO lock before the LLM call — the generate request + // is the slow part (seconds) and we don't want to block other + // knowledge reads while it runs. + drop(dao); + + let source_desc = if source.description.trim().is_empty() { + "(none)".to_string() + } else { + source.description.clone() + }; + let target_desc = if target.description.trim().is_empty() { + "(none)".to_string() + } else { + target.description.clone() + }; + + let system = "You are condensing two stored entity descriptions into one. The two \ + entities refer to the same real-world thing and are about to be merged. Write a \ + single neutral third-person description (1-2 sentences, max 300 chars) that \ + preserves any concrete facts in either source. Do not invent details. Do not \ + editorialize. 
Plain prose only — no markdown, no bold, no italics, no headings, \ + no bullets, no lists, no code fences. Return ONLY the merged description — no \ + preamble, no labels, no quotes."; + let prompt = format!( + "Entity A: {} [{}]\nDescription: {}\n\nEntity B: {} [{}]\nDescription: {}\n\nMerged description:", + source.name, source.entity_type, source_desc, target.name, target.entity_type, target_desc, + ); + + let ollama = app_state.ollama.clone(); + let model_used = ollama.primary_model.clone(); + let proposed = match ollama.generate(&prompt, Some(system)).await { + Ok(out) => { + // Strip the framing models reach for even with explicit + // "no preamble" guidance: leading "Merged description:" + // labels, wrapping quotes, ``` code fences, leading + // bullets / hash headings. Belt-and-braces against the + // system prompt's plain-text directive. + let mut s = out.trim().to_string(); + s = s + .trim_start_matches("Merged description:") + .trim_start_matches("Merged Description:") + .trim() + .to_string(); + // Code fences (``` or ```text) + s = s + .trim_start_matches("```text") + .trim_start_matches("```markdown") + .trim_start_matches("```") + .trim_end_matches("```") + .trim() + .to_string(); + // Markdown headings / bullets at the very start + while let Some(stripped) = s + .strip_prefix('#') + .or_else(|| s.strip_prefix('*')) + .or_else(|| s.strip_prefix('-')) + .or_else(|| s.strip_prefix('>')) + { + s = stripped.trim_start().to_string(); + } + // Wrapping quotes + s = s.trim_matches(|c| c == '"' || c == '\'').to_string(); + // Inline emphasis: drop standalone `**` / `*` / `__` / + // `_` markers without trying to parse markdown — just + // remove the punctuation. Rare enough that this naive + // replace is fine. 
+ s = s.replace("**", "").replace("__", ""); + s + } + Err(e) => { + log::warn!("synthesize_merge generate failed: {:?}", e); + return HttpResponse::ServiceUnavailable().json(serde_json::json!({ + "error": "LLM unavailable; the merge picker should fall back to skip-synthesis." + })); + } + }; + + HttpResponse::Ok().json(SynthesizeMergeResponse { + proposed_description: proposed, + model_used, + }) +} + async fn list_facts( req: HttpRequest, claims: Claims, @@ -535,6 +989,114 @@ async fn list_facts( } } +async fn create_fact( + req: HttpRequest, + claims: Claims, + body: web::Json, + dao: web::Data>, + persona_dao: PersonaDaoData, +) -> impl Responder { + if body.object_entity_id.is_none() && body.object_value.is_none() { + return HttpResponse::BadRequest().json(serde_json::json!({ + "error": "object_entity_id or object_value is required" + })); + } + if body.predicate.trim().is_empty() { + return HttpResponse::BadRequest() + .json(serde_json::json!({"error": "predicate must not be empty"})); + } + + // Persona scoping: facts are written under the active single persona. + // PersonaFilter::All is read-only ("hive-mind" view); callers should + // pin a specific persona for writes via X-Persona-Id. + let persona = resolve_persona_filter(&req, &claims, &persona_dao); + let (user_id, persona_id) = match &persona { + PersonaFilter::Single { + user_id, + persona_id, + } => (*user_id, persona_id.clone()), + PersonaFilter::All { user_id } => (*user_id, "default".to_string()), + }; + + let cx = opentelemetry::Context::current(); + let mut dao = dao.lock().expect("Unable to lock KnowledgeDao"); + + // Verify subject entity exists. 
+ match dao.get_entity_by_id(&cx, body.subject_entity_id) { + Ok(None) => { + return HttpResponse::BadRequest().json(serde_json::json!({ + "error": format!("Subject entity {} not found", body.subject_entity_id) + })); + } + Err(e) => { + log::error!("create_fact subject lookup error: {:?}", e); + return HttpResponse::InternalServerError() + .json(serde_json::json!({"error": "Database error"})); + } + Ok(Some(_)) => {} + } + + // Optional object entity validation when supplied. + if let Some(oid) = body.object_entity_id { + match dao.get_entity_by_id(&cx, oid) { + Ok(None) => { + return HttpResponse::BadRequest().json(serde_json::json!({ + "error": format!("Object entity {} not found", oid) + })); + } + Err(e) => { + log::error!("create_fact object lookup error: {:?}", e); + return HttpResponse::InternalServerError() + .json(serde_json::json!({"error": "Database error"})); + } + Ok(Some(_)) => {} + } + } + + let now = Utc::now().timestamp(); + let confidence = body.confidence.unwrap_or(0.6).clamp(0.0, 0.95); + + let insert = InsertEntityFact { + subject_entity_id: body.subject_entity_id, + predicate: body.predicate.trim().to_string(), + object_entity_id: body.object_entity_id, + object_value: body.object_value.clone(), + source_photo: body.source_photo.clone(), + source_insight_id: None, + confidence, + status: "active".to_string(), + created_at: now, + persona_id, + user_id, + valid_from: body.valid_from, + valid_until: body.valid_until, + superseded_by: None, + // Manual creation via curation UI — provenance recorded as + // "manual" with no model, distinguishing user-entered facts + // from agent-generated ones in the audit view. 
+ created_by_model: None, + created_by_backend: Some("manual".to_string()), + last_modified_by_model: None, + last_modified_by_backend: None, + last_modified_at: None, + }; + + match dao.upsert_fact(&cx, insert) { + Ok((fact, is_new)) => { + let status = if is_new { + actix_web::http::StatusCode::CREATED + } else { + actix_web::http::StatusCode::OK + }; + HttpResponse::build(status).json(fact) + } + Err(e) => { + log::error!("create_fact upsert error: {:?}", e); + HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"})) + } + } +} + async fn patch_fact( _claims: Claims, id: web::Path, @@ -548,10 +1110,15 @@ async fn patch_fact( object_value: body.object_value.clone(), status: body.status.clone(), confidence: body.confidence, + valid_from: body.valid_from, + valid_until: body.valid_until, }; let mut dao = dao.lock().expect("Unable to lock KnowledgeDao"); - match dao.update_fact(&cx, fact_id, patch) { + // Manual PATCH from the curation UI — provenance stamped as + // "manual" so the audit feed can distinguish human edits from + // agent corrections. + match dao.update_fact(&cx, fact_id, patch, Some(("manual", "manual"))) { Ok(Some(fact)) => HttpResponse::Ok().json(fact), Ok(None) => HttpResponse::NotFound().json(serde_json::json!({"error": "Fact not found"})), Err(e) => { @@ -578,6 +1145,51 @@ async fn delete_fact( } } +async fn supersede_fact( + _claims: Claims, + id: web::Path, + body: web::Json, + dao: web::Data>, +) -> impl Responder { + let cx = opentelemetry::Context::current(); + let old_id = id.into_inner(); + if old_id == body.by_fact_id { + return HttpResponse::BadRequest() + .json(serde_json::json!({"error": "old_id and by_fact_id must differ"})); + } + let mut dao = dao.lock().expect("Unable to lock KnowledgeDao"); + // Manual supersede from the curation UI — same stamping rule as + // the PATCH path. 
+ match dao.supersede_fact(&cx, old_id, body.by_fact_id, Some(("manual", "manual"))) { + Ok(Some(fact)) => HttpResponse::Ok().json(fact), + Ok(None) => { + HttpResponse::NotFound().json(serde_json::json!({"error": "Old or new fact not found"})) + } + Err(e) => { + log::error!("supersede_fact error: {:?}", e); + HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"})) + } + } +} + +async fn restore_fact( + _claims: Claims, + id: web::Path, + dao: web::Data>, +) -> impl Responder { + let cx = opentelemetry::Context::current(); + let fact_id = id.into_inner(); + let mut dao = dao.lock().expect("Unable to lock KnowledgeDao"); + match dao.revert_supersession(&cx, fact_id, Some(("manual", "manual"))) { + Ok(Some(fact)) => HttpResponse::Ok().json(fact), + Ok(None) => HttpResponse::NotFound().json(serde_json::json!({"error": "Fact not found"})), + Err(e) => { + log::error!("restore_fact error: {:?}", e); + HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"})) + } + } +} + async fn get_recent( req: HttpRequest, claims: Claims, @@ -608,3 +1220,157 @@ async fn get_recent( } } } + +async fn get_predicate_stats( + req: HttpRequest, + claims: Claims, + query: web::Query, + dao: web::Data>, + persona_dao: PersonaDaoData, +) -> impl Responder { + let limit = query.limit.unwrap_or(100).clamp(1, 500) as usize; + let persona = resolve_persona_filter(&req, &claims, &persona_dao); + let cx = opentelemetry::Context::current(); + let mut dao = dao.lock().expect("Unable to lock KnowledgeDao"); + match dao.get_predicate_stats(&cx, &persona, limit) { + Ok(rows) => HttpResponse::Ok().json(PredicateStatsResponse { + predicates: rows + .into_iter() + .map(|(predicate, count)| PredicateStat { predicate, count }) + .collect(), + }), + Err(e) => { + log::error!("get_predicate_stats error: {:?}", e); + HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"})) + } + } +} + +async fn 
bulk_reject_predicate( + req: HttpRequest, + claims: Claims, + predicate: web::Path, + dao: web::Data>, + persona_dao: PersonaDaoData, +) -> impl Responder { + let predicate = predicate.into_inner(); + if predicate.trim().is_empty() { + return HttpResponse::BadRequest() + .json(serde_json::json!({"error": "predicate must not be empty"})); + } + let persona = resolve_persona_filter(&req, &claims, &persona_dao); + let cx = opentelemetry::Context::current(); + let mut dao = dao.lock().expect("Unable to lock KnowledgeDao"); + match dao.bulk_reject_facts_by_predicate( + &cx, + &persona, + &predicate, + Some(("manual", "manual")), + ) { + Ok(rejected) => HttpResponse::Ok().json(BulkRejectResponse { rejected }), + Err(e) => { + log::error!("bulk_reject_predicate error: {:?}", e); + HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"})) + } + } +} + +async fn get_graph( + req: HttpRequest, + claims: Claims, + query: web::Query, + dao: web::Data>, + persona_dao: PersonaDaoData, +) -> impl Responder { + let limit = query.limit.unwrap_or(200).clamp(1, 1000) as usize; + let persona = resolve_persona_filter(&req, &claims, &persona_dao); + let cx = opentelemetry::Context::current(); + let mut dao = dao.lock().expect("Unable to lock KnowledgeDao"); + match dao.build_entity_graph(&cx, query.entity_type.as_deref(), limit, &persona) { + Ok(EntityGraph { nodes, edges }) => HttpResponse::Ok().json(GraphResponse { + nodes: nodes + .into_iter() + .map(|n| GraphNodeView { + id: n.id, + name: n.name, + entity_type: n.entity_type, + fact_count: n.fact_count, + }) + .collect(), + edges: edges + .into_iter() + .map(|e| GraphEdgeView { + source: e.source, + target: e.target, + predicate: e.predicate, + count: e.count, + }) + .collect(), + }), + Err(e) => { + log::error!("build_entity_graph error: {:?}", e); + HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"})) + } + } +} + +async fn get_consolidation_proposals( + req: 
HttpRequest, + claims: Claims, + query: web::Query, + dao: web::Data>, + persona_dao: PersonaDaoData, +) -> impl Responder { + // Clamp threshold so a curious client can't drag the cosine + // floor to 0 and pull every entity into one giant cluster. + let threshold = query.threshold.unwrap_or(0.85).clamp(0.5, 0.99); + let max_groups = query.limit.unwrap_or(50).clamp(1, 200) as usize; + + let persona = resolve_persona_filter(&req, &claims, &persona_dao); + let cx = opentelemetry::Context::current(); + let mut dao = dao.lock().expect("Unable to lock KnowledgeDao"); + let groups: Vec = + match dao.find_consolidation_proposals(&cx, threshold, max_groups) { + Ok(g) => g, + Err(e) => { + log::error!("find_consolidation_proposals: {:?}", e); + return HttpResponse::InternalServerError() + .json(serde_json::json!({"error": "Database error"})); + } + }; + + // Decorate with per-persona fact counts so the curation UI can + // show "default 8 · journal 3" inline and the curator can pick + // which entity is the strongest target. 
+ let entity_ids: Vec = groups + .iter() + .flat_map(|g| g.entities.iter().map(|e| e.id)) + .collect(); + let breakdowns = dao + .get_persona_breakdowns_for_entities(&cx, &entity_ids, persona.user_id()) + .unwrap_or_default(); + + let groups_view: Vec = groups + .into_iter() + .map(|g| ConsolidationGroupView { + entities: g + .entities + .into_iter() + .map(|e| { + let id = e.id; + let summary = EntitySummary::from(e); + match breakdowns.get(&id) { + Some(bd) => summary.with_persona_breakdown(bd.clone()), + None => summary, + } + }) + .collect(), + min_cosine: g.min_cosine, + max_cosine: g.max_cosine, + }) + .collect(); + + HttpResponse::Ok().json(ConsolidationResponse { + groups: groups_view, + }) +} diff --git a/src/personas.rs b/src/personas.rs index 4b1fe53..6e2b4bc 100644 --- a/src/personas.rs +++ b/src/personas.rs @@ -35,6 +35,16 @@ pub struct PersonaView { pub created_at: i64, #[serde(rename = "updatedAt")] pub updated_at: i64, + /// "Strict mode" — when true, the agent's recall_* tools return + /// only facts whose status is 'reviewed'. See migration + /// 2026-05-10-000400. + #[serde(rename = "reviewedOnlyFacts")] + pub reviewed_only_facts: bool, + /// Gate for the agent's update_fact / supersede_fact tools. + /// Default false — fresh personas let the agent create but not + /// alter. See migration 2026-05-10-000500. 
+ #[serde(rename = "allowAgentCorrections")] + pub allow_agent_corrections: bool, } impl From for PersonaView { @@ -47,6 +57,8 @@ impl From for PersonaView { include_all_memories: p.include_all_memories, created_at: p.created_at, updated_at: p.updated_at, + reviewed_only_facts: p.reviewed_only_facts, + allow_agent_corrections: p.allow_agent_corrections, } } } @@ -72,6 +84,10 @@ pub struct UpdatePersonaRequest { pub system_prompt: Option, #[serde(default, rename = "includeAllMemories")] pub include_all_memories: Option, + #[serde(default, rename = "reviewedOnlyFacts")] + pub reviewed_only_facts: Option, + #[serde(default, rename = "allowAgentCorrections")] + pub allow_agent_corrections: Option, } #[derive(Deserialize)] @@ -225,8 +241,7 @@ async fn update_persona( // identity. Mirrors the same guard delete_persona enforces below. match dao.get_persona(&cx, uid, &pid) { Ok(Some(p)) if p.is_built_in => { - let editing_identity = - body.name.is_some() || body.system_prompt.is_some(); + let editing_identity = body.name.is_some() || body.system_prompt.is_some(); if editing_identity { return HttpResponse::Conflict().json(serde_json::json!({ "error": "Cannot edit name or systemPrompt of a built-in persona" @@ -249,6 +264,8 @@ async fn update_persona( name: body.name.clone(), system_prompt: body.system_prompt.clone(), include_all_memories: body.include_all_memories, + reviewed_only_facts: body.reviewed_only_facts, + allow_agent_corrections: body.allow_agent_corrections, }; match dao.update_persona(&cx, uid, &pid, patch) { Ok(Some(p)) => HttpResponse::Ok().json(PersonaView::from(p)), diff --git a/src/state.rs b/src/state.rs index e0234a2..739cb6c 100644 --- a/src/state.rs +++ b/src/state.rs @@ -207,6 +207,9 @@ impl Default for AppState { Arc::new(Mutex::new(Box::new(SqliteTagDao::default()))); let knowledge_dao: Arc>> = Arc::new(Mutex::new(Box::new(SqliteKnowledgeDao::new()))); + let persona_dao: Arc>> = Arc::new(Mutex::new( + 
Box::new(crate::database::SqlitePersonaDao::new()), + )); let face_dao: Arc>> = Arc::new(Mutex::new(Box::new(faces::SqliteFaceDao::new()))); @@ -236,6 +239,7 @@ impl Default for AppState { tag_dao.clone(), face_dao.clone(), knowledge_dao, + persona_dao, libraries_vec.clone(), ); @@ -352,6 +356,9 @@ impl AppState { Arc::new(Mutex::new(Box::new(SqliteTagDao::default()))); let knowledge_dao: Arc>> = Arc::new(Mutex::new(Box::new(SqliteKnowledgeDao::new()))); + let persona_dao: Arc>> = Arc::new(Mutex::new( + Box::new(crate::database::SqlitePersonaDao::new()), + )); let face_dao: Arc>> = Arc::new(Mutex::new(Box::new(faces::SqliteFaceDao::new()))); @@ -378,6 +385,7 @@ impl AppState { tag_dao.clone(), face_dao.clone(), knowledge_dao, + persona_dao, vec![test_lib], );