feature/knowledge-curation #91

Merged
cameron merged 19 commits from feature/knowledge-curation into master 2026-05-12 15:40:57 +00:00
27 changed files with 3246 additions and 126 deletions

View File

@@ -0,0 +1,5 @@
-- Down migration: remove the valid-time (valid_from / valid_until)
-- columns from entity_facts again.
--
-- SQLite can drop columns since 3.35 (March 2021); embedded
-- libsqlite3-sys is well past that. Drop in reverse insert order so
-- a partial down still leaves the schema valid.
ALTER TABLE entity_facts DROP COLUMN valid_until;
ALTER TABLE entity_facts DROP COLUMN valid_from;

View File

@@ -0,0 +1,25 @@
-- Add valid-time columns to entity_facts.
--
-- entity_facts already has created_at — *transaction time*, the
-- moment WE recorded the fact. That's not the same as the real-world
-- period the fact was true. "Cameron is_in_relationship_with X" was
-- only true during a window; recording it in 2026 doesn't make it
-- true today. Without the distinction, every former relationship,
-- former job, former address reads as currently-true.
--
-- Adding two BIGINT NULL columns: valid_from / valid_until (unix
-- seconds). NULL means "unbounded on that side" — `valid_from IS
-- NULL` reads as "always-true-back-to-the-beginning",
-- `valid_until IS NULL` as "still-true-now-or-unknown". Both NULL =
-- temporal validity unknown (current state of all legacy rows).
--
-- Conflict detection refines accordingly: same-predicate facts with
-- different objects stop flagging when their intervals are disjoint
-- ("lives_in NYC 2018-2020" and "lives_in SF 2020-present" are both
-- valid, just at different times).
--
-- Both columns are nullable with no default, so these ALTERs are
-- schema-only in SQLite — existing rows are not rewritten.
ALTER TABLE entity_facts ADD COLUMN valid_from BIGINT;
ALTER TABLE entity_facts ADD COLUMN valid_until BIGINT;
-- Optional partial index for time-bounded scans. Skipped for now —
-- conflict detection runs per-entity (small N) and doesn't need it.

View File

@@ -0,0 +1,2 @@
-- Down migration: remove the supersession pointer. Drop the partial
-- index before the column it covers; IF EXISTS keeps the index drop
-- safe to re-run on a partially-applied down.
DROP INDEX IF EXISTS idx_entity_facts_superseded_by;
ALTER TABLE entity_facts DROP COLUMN superseded_by;

View File

@@ -0,0 +1,31 @@
-- Add a supersession pointer to entity_facts.
--
-- Status alone is a one-way trapdoor: 'rejected' loses the link
-- between the rejected fact and the one that replaced it. For
-- evolving facts (Cameron's relationship, employer, address) the
-- curator wants to *replace* a stale fact with a new one and keep
-- the history readable: "from 2018 until 2022 this was true, then
-- it became this other thing".
--
-- A nullable INTEGER column pointing at another entity_facts.id —
-- kept as a plain column rather than a foreign key. (SQLite does
-- permit a REFERENCES clause on ALTER TABLE ADD COLUMN when the
-- default is NULL, but enforcement hinges on PRAGMA foreign_keys;
-- rather than rely on that, the DAO's delete_fact clears dangling
-- pointers in the same transaction as the parent delete to keep
-- the column honest.)
--
-- A status of 'superseded' on the old fact (alongside the existing
-- active / reviewed / rejected) signals "replaced by a newer
-- claim". Read paths already filter 'rejected' out of the active
-- view; the curation UI will treat 'superseded' the same way for
-- conflict detection so they don't keep flagging.
--
-- Pairs with the valid-time columns from 2026-05-10-000100: the
-- supersede action auto-stamps the old fact's `valid_until` from
-- the new fact's `valid_from`, closing the interval cleanly.
ALTER TABLE entity_facts ADD COLUMN superseded_by INTEGER;
-- Helpful index for "show me what superseded this fact" walks
-- (rare today; cheap to add now while the table is small).
-- Partial (WHERE ... IS NOT NULL) so only rows that actually point
-- at a successor occupy index space.
CREATE INDEX idx_entity_facts_superseded_by
ON entity_facts(superseded_by)
WHERE superseded_by IS NOT NULL;

View File

@@ -0,0 +1,4 @@
-- Down migration for provenance tracking: drop both partial indexes
-- before the columns they cover, reversing the up migration's
-- creation order. IF EXISTS keeps the index drops safe to re-run.
DROP INDEX IF EXISTS idx_entity_facts_created_by_backend;
DROP INDEX IF EXISTS idx_entity_facts_created_by_model;
ALTER TABLE entity_facts DROP COLUMN created_by_backend;
ALTER TABLE entity_facts DROP COLUMN created_by_model;

View File

@@ -0,0 +1,30 @@
-- Track which model + backend generated each fact so the curator
-- can audit which configurations produce trustworthy knowledge.
--
-- photo_insights already carries `model_version` + `backend`, and
-- entity_facts.source_insight_id links to it — but:
-- 1. source_insight_id is only set after an insight is stored
--    (post-loop), so chat-continuation facts and facts whose insight
--    was regenerated lose the link.
-- 2. JOINing for every read is more friction than just embedding the
--    provenance on the fact row itself.
-- 3. Manual facts (POST /knowledge/facts) have no insight at all and
--    need to record "manual" as their provenance.
--
-- Two nullable TEXT columns are enough for the audit use case: model
-- (e.g. "qwen2.5:7b", "anthropic/claude-sonnet-4") and backend
-- ("local", "hybrid", "manual"). Pre-existing rows leave both NULL —
-- legacy facts predate this tracking and can't be back-filled
-- reliably from training_messages without burning compute.
--
-- Nullable with no default, so both ALTERs are schema-only; no
-- table rewrite.
ALTER TABLE entity_facts ADD COLUMN created_by_model TEXT;
ALTER TABLE entity_facts ADD COLUMN created_by_backend TEXT;
-- Indexes are cheap and useful for "show me all facts from model X"
-- audit queries — partial so the legacy NULL rows don't bloat them.
CREATE INDEX idx_entity_facts_created_by_model
ON entity_facts(created_by_model)
WHERE created_by_model IS NOT NULL;
CREATE INDEX idx_entity_facts_created_by_backend
ON entity_facts(created_by_backend)
WHERE created_by_backend IS NOT NULL;

View File

@@ -0,0 +1 @@
ALTER TABLE personas DROP COLUMN reviewed_only_facts;

View File

@@ -0,0 +1,16 @@
-- Per-persona toggle: when true, agent reads only see facts whose
-- status is exactly 'reviewed' (human-verified). When false (the
-- default), agent reads see 'active' OR 'reviewed' — everything not
-- rejected or superseded.
--
-- The mobile app surfaces this as "Strict mode" on the persona
-- editor: useful when you want a persona's chat to be grounded
-- exclusively on the curated subset, e.g. for tasks where
-- hallucinated agent claims are particularly costly.
--
-- Note: this is separate from `include_all_memories` (which unions
-- across personas for hive-mind reads). Reviewed-only operates on
-- the status axis; include_all_memories operates on the persona-
-- scope axis. They compose freely.
--
-- DEFAULT 0 (off) keeps every existing persona in the permissive
-- read mode it already had; only an explicit opt-in changes reads.
ALTER TABLE personas ADD COLUMN reviewed_only_facts BOOLEAN NOT NULL DEFAULT 0;

View File

@@ -0,0 +1,5 @@
-- Down migration for the agent self-correction slice: undo all
-- three coupled changes (persona gate, audit index, last_modified_*
-- provenance columns) in reverse creation order — the index goes
-- before the column it covers.
ALTER TABLE personas DROP COLUMN allow_agent_corrections;
DROP INDEX IF EXISTS idx_entity_facts_last_modified_at;
ALTER TABLE entity_facts DROP COLUMN last_modified_at;
ALTER TABLE entity_facts DROP COLUMN last_modified_by_backend;
ALTER TABLE entity_facts DROP COLUMN last_modified_by_model;

View File

@@ -0,0 +1,30 @@
-- Three coupled changes for agent self-correction safety:
--
-- 1. `entity_facts.last_modified_by_*` + `last_modified_at` track who
--    most recently mutated each fact. `created_by_*` from migration
--    2026-05-10-000300 records who first wrote the row; this records
--    who last *changed* it. Separate columns so the create vs update
--    audit is independently grep-able ("show me every fact gpt-5
--    altered last week" stays a single index scan).
--
-- 2. `personas.allow_agent_corrections` is the gate for the new
--    agent-side `update_fact` / `supersede_fact` tools. Default OFF —
--    a fresh persona's agent can create but can't alter or replace.
--    Operator opts in per-persona after the model has earned trust,
--    typically via the strict-mode flow (curate, then ratchet up
--    agent autonomy as confidence rises). Parallel in shape to
--    `reviewed_only_facts` from 2026-05-10-000400; they compose.
--
-- 3. Index on `last_modified_at` (partial, NOT NULL) for the
--    audit-feed reads in the curation UI ("show recent agent edits
--    sorted newest first").
--
-- The three fact columns are nullable with no default, so the
-- ALTERs are schema-only; legacy rows read as "never modified".
ALTER TABLE entity_facts ADD COLUMN last_modified_by_model TEXT;
ALTER TABLE entity_facts ADD COLUMN last_modified_by_backend TEXT;
ALTER TABLE entity_facts ADD COLUMN last_modified_at BIGINT;
CREATE INDEX idx_entity_facts_last_modified_at
ON entity_facts(last_modified_at)
WHERE last_modified_at IS NOT NULL;
ALTER TABLE personas ADD COLUMN allow_agent_corrections BOOLEAN NOT NULL DEFAULT 0;

View File

@@ -0,0 +1,6 @@
-- Down migration for the entity_type canonicalization.
--
-- Irreversible: we collapsed multiple raw entity_type strings to
-- canonical forms and don't have a per-row record of the original.
-- The down migration is intentionally a no-op (the rewritten values
-- are still semantically correct), and the up migration is safe to
-- re-run because every UPDATE is conditional on `!= canonical`.
--
-- `SELECT 1` is a harmless placeholder so the migration runner
-- still has one statement to execute.
SELECT 1;

View File

@@ -0,0 +1,43 @@
-- Canonicalize `entities.entity_type` so legacy rows from before
-- `normalize_entity_type` landed in upsert_entity stop polluting
-- client-side filters. Mirrors the synonym map in
-- `src/database/knowledge_dao.rs::normalize_entity_type`:
--   person ← person | people | human | individual | contact
--   place  ← place | location | venue | site | area | landmark
--   event  ← event | occasion | activity | celebration
--   thing  ← thing | object | item | product
-- Types outside the synonym set (e.g. "friend", "family") are not
-- recognized as canonical and get a lowercase+trim pass instead, so
-- at minimum case variants collapse.
--
-- `UPDATE OR IGNORE` skips rows that would violate UNIQUE(name,
-- entity_type) after the rewrite. Two rows like ("Sarah", "person")
-- + ("Sarah", "Person") would otherwise collide — the duplicate
-- survives unchanged so the curator can merge it via the curation
-- UI rather than have the migration silently delete data.
--
-- Each statement's `entity_type != canonical` guard makes re-runs
-- no-ops for rows that are already canonical (idempotent up).
UPDATE OR IGNORE entities
SET entity_type = 'person'
WHERE LOWER(TRIM(entity_type)) IN ('person', 'people', 'human', 'individual', 'contact')
AND entity_type != 'person';
UPDATE OR IGNORE entities
SET entity_type = 'place'
WHERE LOWER(TRIM(entity_type)) IN ('place', 'location', 'venue', 'site', 'area', 'landmark')
AND entity_type != 'place';
UPDATE OR IGNORE entities
SET entity_type = 'event'
WHERE LOWER(TRIM(entity_type)) IN ('event', 'occasion', 'activity', 'celebration')
AND entity_type != 'event';
UPDATE OR IGNORE entities
SET entity_type = 'thing'
WHERE LOWER(TRIM(entity_type)) IN ('thing', 'object', 'item', 'product')
AND entity_type != 'thing';
-- Anything left ("Friend" vs "friend") gets a lowercase+trim sweep
-- so at least case variants of the same custom type collapse.
UPDATE OR IGNORE entities
SET entity_type = LOWER(TRIM(entity_type))
WHERE entity_type != LOWER(TRIM(entity_type));

View File

@@ -885,10 +885,7 @@ pub async fn chat_history_handler(
.flatten()
.unwrap_or_else(|| app_state.primary_library());
match app_state
.insight_chat
.load_history(library.id, &query.path)
{
match app_state.insight_chat.load_history(library.id, &query.path) {
Ok(view) => HttpResponse::Ok().json(ChatHistoryHttpResponse {
messages: view
.messages

View File

@@ -405,7 +405,10 @@ impl InsightChatService {
// and probes the per-table presence flags. Pass `offer_describe_tool`
// directly — the `!is_hybrid && local_first_user_has_image` decision
// is the chat-path's vision predicate.
let gate_opts = self.generator.current_gate_opts(offer_describe_tool);
let gate_opts = self.generator.current_gate_opts_for_persona(
offer_describe_tool,
Some((req.user_id, &active_persona)),
);
let tools = InsightGenerator::build_tool_definitions(gate_opts);
// Image base64 only needed when describe_photo is on the menu. Load
@@ -497,6 +500,8 @@ impl InsightChatService {
&normalized,
req.user_id,
&active_persona,
&model_used,
&effective_backend,
&loop_cx,
)
.await;
@@ -835,7 +840,10 @@ impl InsightChatService {
.map(|imgs| !imgs.is_empty())
.unwrap_or(false);
let offer_describe_tool = !is_hybrid && local_first_user_has_image;
let gate_opts = self.generator.current_gate_opts(offer_describe_tool);
let gate_opts = self.generator.current_gate_opts_for_persona(
offer_describe_tool,
Some((req.user_id, &active_persona)),
);
let tools = InsightGenerator::build_tool_definitions(gate_opts);
let image_base64: Option<String> = if offer_describe_tool {
@@ -870,6 +878,8 @@ impl InsightChatService {
&normalized,
req.user_id,
&active_persona,
&model_used,
&effective_backend,
max_iterations,
&tx,
)
@@ -1022,7 +1032,10 @@ impl InsightChatService {
// the chat model can re-look at the photo on demand. Hybrid:
// already inlined, no tool needed.
let offer_describe_tool = !is_hybrid && image_base64.is_some();
let gate_opts = self.generator.current_gate_opts(offer_describe_tool);
let gate_opts = self.generator.current_gate_opts_for_persona(
offer_describe_tool,
Some((req.user_id, &active_persona)),
);
let tools = InsightGenerator::build_tool_definitions(gate_opts);
// System message = persona + photo context block. Photo context
@@ -1059,6 +1072,8 @@ impl InsightChatService {
&normalized,
req.user_id,
&active_persona,
&model_used,
&effective_backend,
max_iterations,
&tx,
)
@@ -1210,6 +1225,10 @@ impl InsightChatService {
normalized: &str,
user_id: i32,
active_persona: &str,
// Provenance — stamped onto any store_fact tool call made
// during this loop. Mirrors the non-streaming chat path.
model_used: &str,
effective_backend: &str,
max_iterations: usize,
tx: &tokio::sync::mpsc::Sender<ChatStreamEvent>,
) -> Result<AgenticLoopOutcome> {
@@ -1290,6 +1309,8 @@ impl InsightChatService {
normalized,
user_id,
active_persona,
model_used,
effective_backend,
&cx,
)
.await;

View File

@@ -13,7 +13,7 @@ use crate::ai::apollo_client::{ApolloClient, ApolloPlace};
use crate::ai::llm_client::LlmClient;
use crate::ai::ollama::{ChatMessage, OllamaClient, Tool};
use crate::ai::openrouter::OpenRouterClient;
use crate::ai::sms_client::SmsApiClient;
use crate::ai::sms_client::{SmsApiClient, SmsSearchHit, SmsSearchParams};
use crate::ai::user_display_name;
use crate::database::models::InsertPhotoInsight;
use crate::database::{
@@ -89,6 +89,10 @@ pub struct InsightGenerator {
// Knowledge memory
knowledge_dao: Arc<Mutex<Box<dyn KnowledgeDao>>>,
// Persona settings (looked up by recall_facts_for_photo to honour
// the per-persona "reviewed-only" strict-mode toggle).
persona_dao: Arc<Mutex<Box<dyn crate::database::PersonaDao>>>,
libraries: Vec<Library>,
}
@@ -104,6 +108,12 @@ pub struct ToolGateOpts {
pub calendar_present: bool,
pub location_history_present: bool,
pub faces_present: bool,
/// Per-persona toggle from migration 2026-05-10-000500. When
/// false the agent's update_fact / supersede_fact tools aren't
/// in the catalog at all — defense-in-depth so a hallucinated
/// tool name still 404s, and the agent doesn't waste iterations
/// trying corrections it isn't allowed to do.
pub allow_agent_corrections: bool,
}
impl InsightGenerator {
@@ -121,6 +131,7 @@ impl InsightGenerator {
tag_dao: Arc<Mutex<Box<dyn TagDao>>>,
face_dao: Arc<Mutex<Box<dyn crate::faces::FaceDao>>>,
knowledge_dao: Arc<Mutex<Box<dyn KnowledgeDao>>>,
persona_dao: Arc<Mutex<Box<dyn crate::database::PersonaDao>>>,
libraries: Vec<Library>,
) -> Self {
Self {
@@ -137,6 +148,7 @@ impl InsightGenerator {
tag_dao,
face_dao,
knowledge_dao,
persona_dao,
libraries,
}
}
@@ -158,6 +170,20 @@ impl InsightGenerator {
/// supplied by the caller because it depends on the model selected
/// for this turn, not on persistent state.
pub fn current_gate_opts(&self, has_vision: bool) -> ToolGateOpts {
self.current_gate_opts_for_persona(has_vision, None)
}
/// Same as `current_gate_opts` but resolves the per-persona
/// `allow_agent_corrections` flag too. Pass `Some((user_id,
/// persona_id))` when generating in a persona context (every chat
/// turn does); pass `None` for callers that don't have one yet
/// (cold paths, populate_knowledge bin), which defaults the gate
/// to closed — the conservative posture.
pub fn current_gate_opts_for_persona(
&self,
has_vision: bool,
persona: Option<(i32, &str)>,
) -> ToolGateOpts {
let cx = opentelemetry::Context::new();
let calendar_present = {
let mut dao = self
@@ -184,6 +210,13 @@ impl InsightGenerator {
let mut dao = self.face_dao.lock().expect("Unable to lock FaceDao");
dao.has_any_faces(&cx).unwrap_or(false)
};
let allow_agent_corrections = persona
.and_then(|(uid, pid)| {
let mut pdao = self.persona_dao.lock().expect("Unable to lock PersonaDao");
pdao.get_persona(&cx, uid, pid).ok().flatten()
})
.map(|p| p.allow_agent_corrections)
.unwrap_or(false);
ToolGateOpts {
has_vision,
apollo_enabled: self.apollo_enabled(),
@@ -191,6 +224,7 @@ impl InsightGenerator {
calendar_present,
location_history_present,
faces_present,
allow_agent_corrections,
}
}
@@ -1554,6 +1588,13 @@ Return ONLY the summary, nothing else."#,
file_path: &str,
user_id: i32,
persona_id: &str,
// Provenance — written into entity_facts.created_by_* when
// the loop calls store_fact. The caller knows the actual
// chat-runtime model and backend (which may differ from
// ollama.primary_model in hybrid mode where chat lives on
// OpenRouter while Ollama still handles vision).
model: &str,
backend: &str,
cx: &opentelemetry::Context,
) -> String {
let result = match tool_name {
@@ -1574,7 +1615,17 @@ Return ONLY the summary, nothing else."#,
}
"store_entity" => self.tool_store_entity(arguments, ollama, cx).await,
"store_fact" => {
self.tool_store_fact(arguments, file_path, user_id, persona_id, cx)
self.tool_store_fact(
arguments, file_path, user_id, persona_id, model, backend, cx,
)
.await
}
"update_fact" => {
self.tool_update_fact(arguments, user_id, persona_id, model, backend, cx)
.await
}
"supersede_fact" => {
self.tool_supersede_fact(arguments, user_id, persona_id, model, backend, cx)
.await
}
"get_current_datetime" => Self::tool_get_current_datetime(),
@@ -1807,7 +1858,11 @@ Return ONLY the summary, nothing else."#,
/// Tool: search_messages — keyword / semantic / hybrid search over all
/// SMS message bodies via the Django FTS5 + embeddings pipeline. Now
/// supports optional `contact_id`, `start_ts`, `end_ts` filters.
async fn tool_search_messages(&self, args: &serde_json::Value, cx: &opentelemetry::Context) -> String {
async fn tool_search_messages(
&self,
args: &serde_json::Value,
cx: &opentelemetry::Context,
) -> String {
let query = match args.get("query").and_then(|v| v.as_str()) {
Some(q) if !q.trim().is_empty() => q.trim(),
_ => {
@@ -1827,8 +1882,7 @@ Return ONLY the summary, nothing else."#,
.map(|s| !s.trim().is_empty())
.unwrap_or(false);
let has_numeric_contact = args.get("contact_id").is_some();
let has_ts_window =
args.get("start_ts").is_some() || args.get("end_ts").is_some();
let has_ts_window = args.get("start_ts").is_some() || args.get("end_ts").is_some();
if has_date_str && !has_numeric_contact && !has_ts_window {
log::info!(
"search_messages with `date` and no `query` — routing to get_sms_messages"
@@ -1841,8 +1895,7 @@ Return ONLY the summary, nothing else."#,
routed
);
}
let has_contact_name =
args.get("contact").and_then(|v| v.as_str()).is_some();
let has_contact_name = args.get("contact").and_then(|v| v.as_str()).is_some();
if has_ts_window || has_numeric_contact || has_contact_name {
return "Error: search_messages needs a 'query' (keywords/phrase). \
To fetch messages around a date or from a contact without keywords, \
@@ -1874,70 +1927,62 @@ Return ONLY the summary, nothing else."#,
let contact_id = args.get("contact_id").and_then(|v| v.as_i64());
let start_ts = args.get("start_ts").and_then(|v| v.as_i64());
let end_ts = args.get("end_ts").and_then(|v| v.as_i64());
let is_mms = args.get("is_mms").and_then(|v| v.as_bool());
let has_media = args.get("has_media").and_then(|v| v.as_bool());
let has_date_filter = start_ts.is_some() || end_ts.is_some();
// When a date filter is supplied, fetch a larger pool from SMS-API
// so in-window matches that ranked lower than out-of-window ones
// aren't lost.
let fetch_limit = if has_date_filter { 100 } else { user_limit };
log::info!(
"tool_search_messages: query='{}', mode={}, contact_id={:?}, range=[{:?}, {:?}], user_limit={}, fetch_limit={}",
"tool_search_messages: query='{}', mode={}, contact_id={:?}, range=[{:?}, {:?}], is_mms={:?}, has_media={:?}, limit={}",
query,
mode,
contact_id,
start_ts,
end_ts,
user_limit,
fetch_limit
is_mms,
has_media,
user_limit
);
let hits = match self
.sms_client
.search_messages_with_contact(query, &mode, fetch_limit, contact_id)
.await
{
// SMS-API applies all of date/contact/mms/media filtering and
// pagination server-side, so the response is already exactly the
// page we want — no over-fetch, no client-side post-filter.
let params = SmsSearchParams {
mode: mode.as_str(),
limit: user_limit,
contact_id,
date_from: start_ts,
date_to: end_ts,
is_mms,
has_media,
offset: None,
};
let hits = match self.sms_client.search_messages(query, &params).await {
Ok(h) => h,
Err(e) => return format!("Error searching messages: {}", e),
};
// Date-range post-filter on the client side. SMS-API's /search/
// doesn't accept date params; mirroring Apollo's pattern here.
let filtered: Vec<_> = hits
.into_iter()
.filter(|h| {
if let Some(s) = start_ts
&& h.date < s
{
return false;
}
if let Some(e) = end_ts
&& h.date > e
{
return false;
}
true
})
.take(user_limit)
.collect();
if filtered.is_empty() {
if hits.is_empty() {
return "No messages matched.".to_string();
}
Self::format_search_hits(&hits, &mode, has_date_filter)
}
/// Render a list of [`SmsSearchHit`] for the LLM. Prefers the SMS-API
/// snippet (which already excerpts the matched span and is the only
/// preview MMS-attachment-only matches have) over the full body, and
/// strips the `<mark>` tags the snippet ships with.
fn format_search_hits(hits: &[SmsSearchHit], mode: &str, date_filtered: bool) -> String {
let user_name = user_display_name();
let mut out = String::new();
out.push_str(&format!(
"Found {} messages (mode: {}{}):\n\n",
filtered.len(),
hits.len(),
mode,
if has_date_filter {
", date-filtered"
} else {
""
}
if date_filtered { ", date-filtered" } else { "" }
));
for h in filtered {
for h in hits {
let date = chrono::DateTime::from_timestamp(h.date, 0)
.map(|dt| dt.format("%Y-%m-%d").to_string())
.unwrap_or_else(|| h.date.to_string());
@@ -1950,14 +1995,25 @@ Return ONLY the summary, nothing else."#,
.similarity_score
.map(|s| format!(" [score {:.2}]", s))
.unwrap_or_default();
let preview = match h.snippet.as_deref() {
Some(s) if !s.is_empty() => Self::strip_mark_tags(s),
_ => h.body.clone(),
};
out.push_str(&format!(
"[{}]{} {}{}\n\n",
date, score, direction, h.body
date, score, direction, preview
));
}
out
}
/// Remove the `<mark>` / `</mark>` highlight tags that SMS-API wraps
/// matched terms in. The highlighting carries no signal beyond what
/// the query already conveys to the LLM, and leaving the tags in
/// only adds prompt noise.
fn strip_mark_tags(s: &str) -> String {
    // Strip the opening tag first, then the closing tag — same
    // order as a chained `replace`, so behavior is identical.
    ["<mark>", "</mark>"]
        .iter()
        .fold(s.to_owned(), |text, tag| text.replace(tag, ""))
}
/// Tool: get_sms_messages — fetch SMS messages near a date for a contact
async fn tool_get_sms_messages(
&self,
@@ -2401,9 +2457,15 @@ Return ONLY the summary, nothing else."#,
limit
);
// Status scope mirrors recall_facts_for_photo: by default
// include both 'active' (agent-generated, not yet reviewed)
// and 'reviewed' (human-verified). The DAO list_entities only
// takes a single status string, so use 'all' here and filter
// out 'rejected' below — slightly broader but fine for the
// recall use case (entities are global, persona-agnostic).
let filter = EntityFilter {
entity_type,
status: Some("active".to_string()),
status: Some("all".to_string()),
search: name_search,
limit,
offset: 0,
@@ -2414,12 +2476,13 @@ Return ONLY the summary, nothing else."#,
.lock()
.expect("Unable to lock KnowledgeDao");
match kdao.list_entities(cx, filter) {
Ok((entities, _total)) if entities.is_empty() => {
Ok((entities, _total)) if entities.iter().all(|e| e.status == "rejected") => {
"No known entities found matching the query.".to_string()
}
Ok((entities, _total)) => {
let lines: Vec<String> = entities
.iter()
.filter(|e| e.status != "rejected")
.map(|e| {
format!(
"ID:{} | {} | {} | {} | confidence:{:.2}",
@@ -2427,7 +2490,11 @@ Return ONLY the summary, nothing else."#,
)
})
.collect();
format!("Known entities:\n{}", lines.join("\n"))
if lines.is_empty() {
"No known entities found matching the query.".to_string()
} else {
format!("Known entities:\n{}", lines.join("\n"))
}
}
Err(e) => format!("Error recalling entities: {:?}", e),
}
@@ -2453,6 +2520,19 @@ Return ONLY the summary, nothing else."#,
log::info!("tool_recall_facts_for_photo: file_path={}", file_path);
// Resolve the persona's reviewed-only-mode flag once. If the
// persona row is missing (shouldn't happen — composite FK
// enforces existence on writes), fall back to the permissive
// default of active+reviewed.
let reviewed_only = {
let mut pdao = self.persona_dao.lock().expect("Unable to lock PersonaDao");
pdao.get_persona(cx, user_id, persona_id)
.ok()
.flatten()
.map(|p| p.reviewed_only_facts)
.unwrap_or(false)
};
let mut kdao = self
.knowledge_dao
.lock()
@@ -2486,7 +2566,18 @@ Return ONLY the summary, nothing else."#,
e.name, e.entity_type, role
));
if let Ok(facts) = kdao.get_facts_for_entity(cx, entity_id, &persona_filter) {
for f in facts.iter().filter(|f| f.status == "active") {
// Default scope: active + reviewed (everything not
// rejected / superseded). Strict mode trims to
// reviewed only — the persona has opted into seeing
// exclusively human-verified facts.
let allow = |s: &str| -> bool {
if reviewed_only {
s == "reviewed"
} else {
s == "active" || s == "reviewed"
}
};
for f in facts.iter().filter(|f| allow(&f.status)) {
let obj = if let Some(ref v) = f.object_value {
v.clone()
} else if let Some(oid) = f.object_entity_id {
@@ -2632,6 +2723,8 @@ Return ONLY the summary, nothing else."#,
file_path: &str,
user_id: i32,
persona_id: &str,
model: &str,
backend: &str,
cx: &opentelemetry::Context,
) -> String {
use crate::database::models::{InsertEntityFact, InsertEntityPhotoLink};
@@ -2672,6 +2765,19 @@ Return ONLY the summary, nothing else."#,
file_path
);
// Anchor the fact in valid-time using the source photo's
// `date_taken` (Apollo's naive-as-UTC convention is fine
// here — we only care about calendar ordering, not absolute
// UTC). The semantic stretch: a photo *evidences* the fact at
// that date — the fact may have started earlier — so this is
// best read as "no later than this it started being true",
// not a strict lower bound. Still useful: gives the curator a
// calendar anchor and lets supersession (next slice) close
// intervals cleanly when a newer fact arrives. valid_until
// stays NULL — a single photo can't tell us when something
// *stopped* being true.
let valid_from = self.fetch_exif(file_path).and_then(|e| e.date_taken);
let fact = InsertEntityFact {
subject_entity_id,
predicate,
@@ -2684,6 +2790,17 @@ Return ONLY the summary, nothing else."#,
created_at: chrono::Utc::now().timestamp(),
persona_id: persona_id.to_string(),
user_id,
valid_from,
valid_until: None,
superseded_by: None,
created_by_model: Some(model.to_string()),
created_by_backend: Some(backend.to_string()),
// Initial write — no modification yet; last_modified_*
// intentionally NULL so the audit feed only shows real
// post-creation changes.
last_modified_by_model: None,
last_modified_by_backend: None,
last_modified_at: None,
};
let mut kdao = self
@@ -2723,6 +2840,144 @@ Return ONLY the summary, nothing else."#,
)
}
/// Tool: update_fact — patch a fact's mutable fields. Gated by the
/// active persona's `allow_agent_corrections` flag at the schema /
/// catalog layer (build_tool_definitions); rechecked here as a
/// defense in depth in case a hallucinated tool call slips
/// through.
///
/// `args` contract: required `fact_id` (integer); optional
/// `predicate`, `object_value`, `status`, `confidence`,
/// `valid_from`, `valid_until`. Returns a human-readable result
/// string (success summary or "Error: …") that goes straight back
/// into the LLM transcript.
async fn tool_update_fact(
&self,
args: &serde_json::Value,
user_id: i32,
persona_id: &str,
model: &str,
backend: &str,
cx: &opentelemetry::Context,
) -> String {
use crate::database::FactPatch;
// Defense-in-depth gate check. Missing persona rows default to
// disallowed — the conservative posture.
let allowed = {
let mut pdao = self.persona_dao.lock().expect("Unable to lock PersonaDao");
pdao.get_persona(cx, user_id, persona_id)
.ok()
.flatten()
.map(|p| p.allow_agent_corrections)
.unwrap_or(false)
};
if !allowed {
return "Error: agent corrections are disabled for this persona. Ask the operator to flip allow_agent_corrections.".to_string();
}
let fact_id = match args.get("fact_id").and_then(|v| v.as_i64()) {
Some(id) => id as i32,
None => return "Error: missing required parameter 'fact_id'".to_string(),
};
// Build the patch from any fields present on `args`. valid_*
// are tri-state — JSON null → Some(None) → clear back to NULL,
// omitted → None → leave alone, value → Some(Some(value)) →
// set. The match-on-presence pattern below mirrors the HTTP
// PATCH path's serde-helper behaviour.
let parse_optional_i64 = |v: Option<&serde_json::Value>| -> Option<Option<i64>> {
v.map(|val| if val.is_null() { None } else { val.as_i64() })
};
let patch = FactPatch {
predicate: args
.get("predicate")
.and_then(|v| v.as_str())
.map(|s| s.to_string()),
object_value: args
.get("object_value")
.and_then(|v| v.as_str())
.map(|s| s.to_string()),
// Only active / reviewed / rejected pass the filter; anything
// else is silently dropped (field left unpatched). 'superseded'
// is not settable here — supersede_fact is the path for that.
status: args
.get("status")
.and_then(|v| v.as_str())
.filter(|s| matches!(*s, "active" | "reviewed" | "rejected"))
.map(|s| s.to_string()),
// Agent-supplied confidence is capped at 0.95 — the agent can
// never self-assign (near-)certainty.
confidence: args
.get("confidence")
.and_then(|v| v.as_f64())
.map(|f| f.clamp(0.0, 0.95) as f32),
valid_from: parse_optional_i64(args.get("valid_from")),
valid_until: parse_optional_i64(args.get("valid_until")),
};
log::info!("tool_update_fact: fact_id={}", fact_id);
let mut kdao = self
.knowledge_dao
.lock()
.expect("Unable to lock KnowledgeDao");
// Some((model, backend)) lets the DAO stamp the last_modified_*
// provenance columns for the audit feed.
match kdao.update_fact(cx, fact_id, patch, Some((model, backend))) {
Ok(Some(f)) => format!(
"Updated fact ID:{} (status={}, confidence={:.2})",
f.id, f.status, f.confidence
),
Ok(None) => format!("Error: fact ID:{} not found", fact_id),
Err(e) => format!("Error updating fact: {:?}", e),
}
}
/// Tool: supersede_fact — replace one fact with another. Same
/// gating as update_fact.
///
/// `args` contract: required `old_fact_id` + `new_fact_id`
/// (integers, must differ). The DAO does the actual bookkeeping —
/// the success message echoes the old fact's new status and
/// valid_until. Passes (model, backend) through to the DAO,
/// mirroring update_fact's provenance stamping. Returns a
/// transcript-friendly result string.
async fn tool_supersede_fact(
&self,
args: &serde_json::Value,
user_id: i32,
persona_id: &str,
model: &str,
backend: &str,
cx: &opentelemetry::Context,
) -> String {
// Defense-in-depth recheck of the persona gate; a missing
// persona row defaults to disallowed.
let allowed = {
let mut pdao = self.persona_dao.lock().expect("Unable to lock PersonaDao");
pdao.get_persona(cx, user_id, persona_id)
.ok()
.flatten()
.map(|p| p.allow_agent_corrections)
.unwrap_or(false)
};
if !allowed {
return "Error: agent corrections are disabled for this persona.".to_string();
}
let old_fact_id = match args.get("old_fact_id").and_then(|v| v.as_i64()) {
Some(id) => id as i32,
None => return "Error: missing required parameter 'old_fact_id'".to_string(),
};
let new_fact_id = match args.get("new_fact_id").and_then(|v| v.as_i64()) {
Some(id) => id as i32,
None => return "Error: missing required parameter 'new_fact_id'".to_string(),
};
// A fact can't supersede itself.
if old_fact_id == new_fact_id {
return "Error: old_fact_id and new_fact_id must differ".to_string();
}
log::info!(
"tool_supersede_fact: old={}, new={}",
old_fact_id,
new_fact_id
);
let mut kdao = self
.knowledge_dao
.lock()
.expect("Unable to lock KnowledgeDao");
// Ok(None) means one of the two ids didn't resolve to a fact.
match kdao.supersede_fact(cx, old_fact_id, new_fact_id, Some((model, backend))) {
Ok(Some(f)) => format!(
"Superseded fact ID:{} (now status={}, valid_until={:?})",
f.id, f.status, f.valid_until
),
Ok(None) => "Error: old or new fact not found".to_string(),
Err(e) => format!("Error superseding fact: {:?}", e),
}
}
/// Tool: get_current_datetime — returns the current local date and time
fn tool_get_current_datetime() -> String {
let now = Local::now();
@@ -2771,23 +3026,39 @@ Return ONLY the summary, nothing else."#,
tools.push(Tool::function(
"search_messages",
"Search SMS/MMS message bodies. Modes: `fts5` (keyword + phrase + prefix + AND/OR/NOT + NEAR proximity), \
"Search SMS/MMS messages — bodies and (for MMS) attachment text + filenames. \
Modes: `fts5` (keyword + phrase + prefix + AND/OR/NOT + NEAR proximity), \
`semantic` (embedding similarity, requires generated embeddings), `hybrid` (RRF merge, recommended; \
degrades to fts5 when embeddings absent). Optional `start_ts` / `end_ts` (real-UTC unix seconds) and \
`contact_id` filters. For pure date / contact browsing without keywords, prefer `get_sms_messages`. \
Examples: `{query: \"trader joe's\"}` — phrase across all time. \
`{query: \"dinner\", contact_id: 42, start_ts: 1700000000, end_ts: 1700604800}` — keyword within a contact and a week. \
`{query: \"NEAR(meeting work, 5)\"}` — proximity search.",
degrades to fts5 when embeddings absent). Optional filters: `start_ts` / `end_ts` (real-UTC unix \
seconds), `contact_id`, `is_mms` (true = MMS only, false = SMS only), `has_media` (true = messages \
with image/video/audio attachments only). For pure date / contact browsing without keywords, prefer \
`get_sms_messages`. \
\n\nFTS5 query syntax (works in fts5 + hybrid modes):\n\
- Phrase: `\"trader joe's\"` — exact word sequence (use double quotes).\n\
- Prefix: `restaur*` — matches restaurant, restaurants, restauranteur, ….\n\
- Boolean: `dinner AND tahoe`, `wedding OR reception OR ceremony`, `vacation NOT work` (operators must be UPPERCASE).\n\
- Proximity: `NEAR(meeting work, 5)` — both terms within 5 tokens of each other.\n\
- Combine: `(reception OR ceremony) AND tahoe*` — group with parens.\n\
Unquoted multi-word queries are treated as implicit AND. Apostrophes / hyphens / colons are safe — they no longer downgrade to a slow LIKE scan. Use `mode: \"fts5\"` when you want the operators above to be authoritative; `hybrid` still respects them but may surface semantically-similar non-keyword hits alongside.\n\n\
Examples:\n\
- `{query: \"trader joe's\"}` — phrase across all time.\n\
- `{query: \"dinner\", contact_id: 42, start_ts: 1700000000, end_ts: 1700604800}` — keyword within a contact and a week.\n\
- `{query: \"vacation\", has_media: true}` — only matches that include photos / videos.\n\
- `{query: \"wedding OR reception OR ceremony\", mode: \"fts5\"}` — any of several synonyms.\n\
- `{query: \"restaur*\", mode: \"fts5\"}` — prefix expansion for varying word forms.\n\
- `{query: \"NEAR(birthday cake, 5)\", mode: \"fts5\"}` — terms close together but in any order.",
serde_json::json!({
"type": "object",
"required": ["query"],
"properties": {
"query": { "type": "string", "description": "Search query. Min 3 chars. fts5 supports phrase (\"\"), prefix (*), AND/OR/NOT, and NEAR proximity." },
"query": { "type": "string", "description": "Search query. Min 3 chars. fts5 supports phrase (\"\"), prefix (*), AND/OR/NOT, and NEAR proximity. Matches both message body and MMS attachment text/filename." },
"mode": { "type": "string", "enum": ["fts5", "semantic", "hybrid"], "description": "Search strategy. Default: hybrid." },
"limit": { "type": "integer", "description": "Max results (default 20, max 50)." },
"contact_id": { "type": "integer", "description": "Optional numeric contact id to scope the search." },
"start_ts": { "type": "integer", "description": "Optional inclusive lower bound, real-UTC unix seconds." },
"end_ts": { "type": "integer", "description": "Optional inclusive upper bound, real-UTC unix seconds." }
"end_ts": { "type": "integer", "description": "Optional inclusive upper bound, real-UTC unix seconds." },
"is_mms": { "type": "boolean", "description": "Optional: true to restrict to MMS, false to restrict to SMS." },
"has_media":{ "type": "boolean", "description": "Optional: true to restrict to messages with image / video / audio attachments." }
}
}),
));
@@ -2972,6 +3243,55 @@ Return ONLY the summary, nothing else."#,
}),
));
// Self-correction tools — only exposed when the active persona
// has allow_agent_corrections=true. Gating happens here AND in
// the tool method itself (the runtime check is the load-bearing
// one; this just keeps the tool out of the model's catalog so
// it doesn't waste iterations trying calls it can't make).
if opts.allow_agent_corrections {
tools.push(Tool::function(
"update_fact",
"Correct an existing fact in the knowledge memory. Use sparingly — only when you have \
stronger evidence than the original write justified (e.g. a clearer photo, a \
contradicting timestamp on a related fact, or explicit user correction). Common \
patches: tighten `valid_from` / `valid_until` to a known interval, downgrade \
`confidence` after seeing contradicting evidence, or flip `status` to 'reviewed' if \
you've verified the fact independently. Cannot change subject / object — supersede \
instead. Pass `fact_id` plus any subset of patchable fields.",
serde_json::json!({
"type": "object",
"required": ["fact_id"],
"properties": {
"fact_id": { "type": "integer", "description": "ID of the fact to patch (from recall_facts_for_photo or list)." },
"predicate": { "type": "string", "description": "New predicate string. Rare." },
"object_value": { "type": "string", "description": "New free-text object. Use for typed-fact corrections." },
"status": { "type": "string", "description": "'active' | 'reviewed' | 'rejected'. 'superseded' is for the supersede_fact tool." },
"confidence": { "type": "number", "description": "0.00.95. Lower when you've seen contradicting evidence." },
"valid_from": { "type": "integer", "description": "Unix-seconds lower bound on when the fact began being true. Null clears." },
"valid_until": { "type": "integer", "description": "Unix-seconds upper bound on when the fact stopped being true. Null clears." }
}
}),
));
tools.push(Tool::function(
"supersede_fact",
"Mark an old fact as replaced by a newer one. Use when the new fact contradicts the \
old AND the contradiction is a *time-bounded change* (relationship changed, address \
changed, role changed), not a correction of a mistake — for mistakes, set the old \
fact's status to 'rejected' via update_fact. Atomically: flips old.status to \
'superseded', points old.superseded_by at the new fact, and stamps old.valid_until \
from new.valid_from (when not already set) so the two intervals are disjoint.",
serde_json::json!({
"type": "object",
"required": ["old_fact_id", "new_fact_id"],
"properties": {
"old_fact_id": { "type": "integer", "description": "The fact being replaced." },
"new_fact_id": { "type": "integer", "description": "The fact that replaces it. Must already exist (use store_fact first if you're recording a new one)." }
}
}),
));
}
tools.push(Tool::function(
"get_current_datetime",
"Get the current date and time. Useful when reasoning about how long ago a photo was taken.",
@@ -3204,6 +3524,8 @@ Return ONLY the summary, nothing else."#,
— surrounding events matter even when a contact is known.\n\
- Use recall_facts_for_photo + recall_entities to load any prior knowledge about subjects in the photo.\n\
- When you identify people / places / events / things, use store_entity + store_fact to grow the persistent memory.\n\
- Before store_entity, call recall_entities to check whether a similar name already exists; reuse the existing entity_id rather than creating a near-duplicate (e.g. \"Sara\" vs \"Sarah J.\"). The DAO will collapse obvious cosine matches, but choosing the existing id keeps facts and photo links consolidated.\n\
- Predicates should be relationship-shaped verbs that encode a queryable claim — `lives_in`, `works_at`, `attended`, `is_friend_of`, `is_parent_of`, `interested_in`, `married_to`, `owns`. DO NOT use vague speech-act predicates like `expressed`, `said`, `mentioned`, `stated`, `quoted`, `noted`, `discussed`, `thought`, `wondered`. DO NOT store quotations or sentence fragments as `object_value` — paraphrase into a structured claim. Bad: `(Cameron, expressed, \"I'm tempted to get a part-time job there\")`. Good: `(Cameron, considered_employment_at, <Place>)` or `(Cameron, interested_in, \"part-time work\")`.\n\
- A tool returning no results is informative; continue with the others.",
);
@@ -3713,6 +4035,8 @@ Return ONLY the summary, nothing else."#,
&file_path,
user_id,
&persona_id,
chat_backend.primary_model(),
&backend_label,
&loop_cx,
)
.await;
@@ -3928,6 +4252,7 @@ mod tests {
calendar_present: false,
location_history_present: false,
faces_present: false,
allow_agent_corrections: false,
};
let tools = InsightGenerator::build_tool_definitions(opts);
let names: Vec<&str> = tools.iter().map(|t| t.function.name.as_str()).collect();
@@ -3950,6 +4275,9 @@ mod tests {
assert!(!names.contains(&"get_calendar_events"));
assert!(!names.contains(&"get_location_history"));
assert!(!names.contains(&"get_faces_in_photo"));
// Agent-correction tools are absent without the gate.
assert!(!names.contains(&"update_fact"));
assert!(!names.contains(&"supersede_fact"));
}
#[test]
@@ -3961,6 +4289,7 @@ mod tests {
calendar_present: true,
location_history_present: true,
faces_present: true,
allow_agent_corrections: true,
};
let tools = InsightGenerator::build_tool_definitions(opts);
let names: Vec<&str> = tools.iter().map(|t| t.function.name.as_str()).collect();
@@ -3970,6 +4299,8 @@ mod tests {
assert!(names.contains(&"get_calendar_events"));
assert!(names.contains(&"get_location_history"));
assert!(names.contains(&"get_faces_in_photo"));
assert!(names.contains(&"update_fact"));
assert!(names.contains(&"supersede_fact"));
}
fn place(name: &str, description: &str) -> ApolloPlace {
@@ -4143,6 +4474,92 @@ mod tests {
);
}
/// Build a minimal `SmsSearchHit` for the formatter tests. `id` also
/// drives the date (one synthetic day of spacing per id) so fixtures
/// stay distinct and deterministic.
fn make_search_hit(
    id: i64,
    contact: &str,
    body: &str,
    snippet: Option<&str>,
    type_: i32,
) -> SmsSearchHit {
    const BASE_DATE: i64 = 1_700_000_000;
    SmsSearchHit {
        message_id: id,
        contact_name: String::from(contact),
        contact_address: String::from("+15551234567"),
        body: String::from(body),
        date: BASE_DATE + id * 86_400,
        type_,
        similarity_score: None,
        snippet: snippet.map(str::to_string),
    }
}
#[test]
fn format_search_hits_falls_back_to_body_when_no_snippet() {
    // fts5 mode often returns no snippet for messages that didn't need
    // excerpting (whole body short, or semantic-only hit in hybrid).
    let hits = [make_search_hit(1, "Sarah", "see you at the lake tomorrow", None, 1)];
    let rendered = InsightGenerator::format_search_hits(&hits, "fts5", false);
    assert!(rendered.starts_with("Found 1 messages (mode: fts5):"));
    assert!(rendered.contains("see you at the lake tomorrow"));
    assert!(rendered.contains("Sarah —"));
    assert!(!rendered.contains("date-filtered"));
}
#[test]
fn format_search_hits_prefers_snippet_over_body_and_strips_marks() {
    let body = "long body text that should be ignored once the server returns a focused snippet";
    let snippet = Some("…at the <mark>lake</mark> tomorrow…");
    let hits = [make_search_hit(2, "Sarah", body, snippet, 1)];
    let rendered = InsightGenerator::format_search_hits(&hits, "hybrid", true);
    // The focused snippet wins over the body, with <mark> tags stripped.
    assert!(rendered.contains("at the lake tomorrow"));
    assert!(!rendered.contains("<mark>"));
    assert!(!rendered.contains("</mark>"));
    assert!(!rendered.contains("long body text"));
    // The date-filter flag flows through to the header.
    assert!(rendered.contains("date-filtered"));
}
#[test]
fn format_search_hits_surfaces_mms_attachment_only_matches() {
    // Regression guard: pre-snippet, MMS rows that matched only via
    // message_parts_fts (filename / part text) carried an empty `body`
    // and rendered as a blank preview to the LLM.
    let hits = [make_search_hit(3, "Mom", "", Some("<mark>birthday_cake.jpg</mark>"), 1)];
    let rendered = InsightGenerator::format_search_hits(&hits, "fts5", false);
    assert!(rendered.contains("birthday_cake.jpg"));
    assert!(!rendered.contains("<mark>"));
    assert!(rendered.contains("Mom —"));
}
#[test]
fn format_search_hits_empty_snippet_falls_back_to_body() {
    // An empty-string snippet (as opposed to an absent one) must not
    // blank the preview — the body is still the best text available.
    let hits = [make_search_hit(4, "Dad", "fallback body", Some(""), 1)];
    let rendered = InsightGenerator::format_search_hits(&hits, "fts5", false);
    assert!(rendered.contains("fallback body"));
}
#[test]
fn strip_mark_tags_handles_common_patterns() {
    // FTS5 highlights every match — multiple marks per snippet are normal.
    let cases = [
        ("plain text", "plain text"),
        ("…the <mark>lake</mark>…", "…the lake…"),
        ("<mark>dinner</mark> at <mark>tahoe</mark>", "dinner at tahoe"),
    ];
    for (input, expected) in cases {
        assert_eq!(InsightGenerator::strip_mark_tags(input), expected);
    }
}
#[test]
fn summarize_search_rag_counts_hits() {
let raw = "[2023-08-15] Sarah: venue confirmed\n\n[2023-08-14] Mom: travel plans\n\n[2023-08-13] Dad: weather";

View File

@@ -257,30 +257,45 @@ impl SmsApiClient {
}
/// Search message bodies via the Django side's FTS5 / semantic / hybrid
/// endpoint. `mode` selects the ranking strategy:
/// endpoint. `params.mode` selects the ranking strategy:
/// - "fts5" keyword-only, supports phrase / prefix / boolean / NEAR
/// - "semantic" embedding similarity
/// - "hybrid" both merged via reciprocal rank fusion (recommended)
///
/// The SMS-API endpoint accepts `contact_id` natively; date filtering is
/// the caller's responsibility (post-filter on the returned rows).
pub async fn search_messages_with_contact(
/// All of `contact_id`, `date_from` / `date_to` (unix seconds), `is_mms`,
/// `has_media`, and `offset` are pushed to SMS-API server-side so the
/// filtered+paginated result set is exact rather than a client-side
/// over-fetch.
pub async fn search_messages(
&self,
query: &str,
mode: &str,
limit: usize,
contact_id: Option<i64>,
params: &SmsSearchParams<'_>,
) -> Result<Vec<SmsSearchHit>> {
let mut url = format!(
"{}/api/messages/search/?q={}&mode={}&limit={}",
self.base_url,
urlencoding::encode(query),
urlencoding::encode(mode),
limit
urlencoding::encode(params.mode),
params.limit,
);
if let Some(cid) = contact_id {
if let Some(cid) = params.contact_id {
url.push_str(&format!("&contact_id={}", cid));
}
if let Some(off) = params.offset {
url.push_str(&format!("&offset={}", off));
}
if let Some(from) = params.date_from {
url.push_str(&format!("&date_from={}", from));
}
if let Some(to) = params.date_to {
url.push_str(&format!("&date_to={}", to));
}
if let Some(is_mms) = params.is_mms {
url.push_str(&format!("&is_mms={}", is_mms));
}
if let Some(has_media) = params.has_media {
url.push_str(&format!("&has_media={}", has_media));
}
let mut request = self.client.get(&url);
if let Some(token) = &self.token {
@@ -383,6 +398,30 @@ pub struct SmsSearchHit {
/// Present for semantic / hybrid modes; absent for fts5.
#[serde(default)]
pub similarity_score: Option<f32>,
/// SMS-API-generated excerpt around the match, wrapped in `<mark>` tags.
/// For MMS messages that only matched via attachment text / filename
/// (empty `body`), the snippet is the only meaningful preview.
#[serde(default)]
pub snippet: Option<String>,
}
/// Optional filter / paging knobs for [`SmsApiClient::search_messages`].
/// All fields except `mode` and `limit` map 1:1 to the same-named SMS-API
/// query params (added in the 2026-05 search-enhancements release).
#[derive(Debug, Clone)]
pub struct SmsSearchParams<'a> {
    /// Ranking strategy: "fts5", "semantic", or "hybrid".
    pub mode: &'a str,
    /// Maximum number of hits requested from the endpoint.
    pub limit: usize,
    /// Scope the search to a single contact when set.
    pub contact_id: Option<i64>,
    /// Unix-seconds inclusive lower bound on `date`.
    pub date_from: Option<i64>,
    /// Unix-seconds inclusive upper bound on `date`.
    pub date_to: Option<i64>,
    /// `Some(true)` = MMS only, `Some(false)` = SMS only, `None` = both.
    pub is_mms: Option<bool>,
    /// `Some(true)` = only messages with image/video/audio attachments.
    pub has_media: Option<bool>,
    /// Server-side pagination offset into the filtered result set.
    pub offset: Option<usize>,
}
#[derive(Deserialize)]

View File

@@ -185,6 +185,9 @@ async fn main() -> anyhow::Result<()> {
Arc::new(Mutex::new(Box::new(SqliteKnowledgeDao::new())));
let face_dao: Arc<Mutex<Box<dyn FaceDao>>> =
Arc::new(Mutex::new(Box::new(SqliteFaceDao::new())));
let persona_dao: Arc<Mutex<Box<dyn image_api::database::PersonaDao>>> = Arc::new(Mutex::new(
Box::new(image_api::database::SqlitePersonaDao::new()),
));
// Pass the full library set so `resolve_full_path` probes every root,
// even when --library restricts the walk. A rel_path shared across
@@ -203,6 +206,7 @@ async fn main() -> anyhow::Result<()> {
tag_dao,
face_dao,
knowledge_dao,
persona_dao,
all_libs.clone(),
);

View File

@@ -204,19 +204,24 @@ impl InsightDao for SqliteInsightDao {
lib_id: i32,
path: &str,
) -> Result<Option<PhotoInsight>, DbError> {
trace_db_call(context, "query", "get_current_insight_for_library", |_span| {
use schema::photo_insights::dsl::*;
trace_db_call(
context,
"query",
"get_current_insight_for_library",
|_span| {
use schema::photo_insights::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get InsightDao");
let mut connection = self.connection.lock().expect("Unable to get InsightDao");
photo_insights
.filter(library_id.eq(lib_id))
.filter(rel_path.eq(path))
.filter(is_current.eq(true))
.first::<PhotoInsight>(connection.deref_mut())
.optional()
.map_err(|_| anyhow::anyhow!("Query error"))
})
photo_insights
.filter(library_id.eq(lib_id))
.filter(rel_path.eq(path))
.filter(is_current.eq(true))
.first::<PhotoInsight>(connection.deref_mut())
.optional()
.map_err(|_| anyhow::anyhow!("Query error"))
},
)
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}

File diff suppressed because it is too large Load Diff

View File

@@ -59,8 +59,8 @@ pub use calendar_dao::{CalendarEventDao, SqliteCalendarEventDao};
pub use daily_summary_dao::{DailySummaryDao, InsertDailySummary, SqliteDailySummaryDao};
pub use insights_dao::{InsightDao, SqliteInsightDao};
pub use knowledge_dao::{
EntityFilter, EntityPatch, FactFilter, FactPatch, KnowledgeDao, PersonaFilter, RecentActivity,
SqliteKnowledgeDao,
ConsolidationGroup, EntityFilter, EntityGraph, EntityPatch, EntitySort, FactFilter, FactPatch,
KnowledgeDao, PersonaFilter, RecentActivity, SqliteKnowledgeDao,
};
pub use location_dao::{LocationHistoryDao, SqliteLocationHistoryDao};
pub use persona_dao::{ImportPersona, PersonaDao, PersonaPatch, SqlitePersonaDao};

View File

@@ -249,6 +249,33 @@ pub struct InsertEntityFact {
/// persona must not see each other's facts. Always paired with
/// `persona_id` — they're a unit.
pub user_id: i32,
/// Real-world period the fact is/was true (unix seconds). NULL on
/// either side = unbounded — `valid_from IS NULL` reads as
/// "always-true-back-to-the-beginning", `valid_until IS NULL` as
/// "still-true-now-or-unknown". Distinguishes valid time from
/// transaction time (`created_at` is when we recorded the fact,
/// not when it was true in the world). See migration
/// 2026-05-10-000100.
pub valid_from: Option<i64>,
pub valid_until: Option<i64>,
/// Points at the entity_facts.id that replaced this one. Set by
/// the supersede endpoint; status flips to 'superseded' in the
/// same transaction. See migration 2026-05-10-000200.
pub superseded_by: Option<i32>,
/// Provenance for model audit — see migration 2026-05-10-000300.
/// `created_by_model` is the LLM identifier (e.g. "qwen2.5:7b",
/// "anthropic/claude-sonnet-4") or NULL for legacy / manual rows.
/// `created_by_backend` is "local" / "hybrid" / "manual" / NULL.
pub created_by_model: Option<String>,
pub created_by_backend: Option<String>,
/// Audit trail for mutations after creation — see migration
/// 2026-05-10-000500. `last_modified_*` stamp on any update
/// (status flip, valid-time edit, supersede, manual PATCH);
/// `last_modified_at` is unix seconds. NULL on rows that have
/// never been touched since creation.
pub last_modified_by_model: Option<String>,
pub last_modified_by_backend: Option<String>,
pub last_modified_at: Option<i64>,
}
#[derive(Serialize, Queryable, Clone, Debug)]
@@ -265,6 +292,14 @@ pub struct EntityFact {
pub created_at: i64,
pub persona_id: String,
pub user_id: i32,
pub valid_from: Option<i64>,
pub valid_until: Option<i64>,
pub superseded_by: Option<i32>,
pub created_by_model: Option<String>,
pub created_by_backend: Option<String>,
pub last_modified_by_model: Option<String>,
pub last_modified_by_backend: Option<String>,
pub last_modified_at: Option<i64>,
}
#[derive(Insertable)]
@@ -300,6 +335,15 @@ pub struct InsertPersona<'a> {
pub include_all_memories: bool,
pub created_at: i64,
pub updated_at: i64,
/// "Strict mode" — agent reads only see facts with status =
/// 'reviewed' (human-verified). Default false. See migration
/// 2026-05-10-000400.
pub reviewed_only_facts: bool,
/// Gate for the agent's update_fact / supersede_fact tools.
/// Default false — fresh personas let the agent create but not
/// alter or replace. Operator opts in once a model has earned
/// trust. See migration 2026-05-10-000500.
pub allow_agent_corrections: bool,
}
#[derive(Serialize, Queryable, Clone, Debug)]
@@ -313,6 +357,8 @@ pub struct Persona {
pub include_all_memories: bool,
pub created_at: i64,
pub updated_at: i64,
pub reviewed_only_facts: bool,
pub allow_agent_corrections: bool,
}
#[derive(Insertable)]

View File

@@ -17,6 +17,8 @@ pub struct PersonaPatch {
pub name: Option<String>,
pub system_prompt: Option<String>,
pub include_all_memories: Option<bool>,
pub reviewed_only_facts: Option<bool>,
pub allow_agent_corrections: Option<bool>,
}
/// One row of a bulk migration upload. Fields named to match the JSON
@@ -164,6 +166,8 @@ impl PersonaDao for SqlitePersonaDao {
include_all_memories: include_all,
created_at: now,
updated_at: now,
reviewed_only_facts: false,
allow_agent_corrections: false,
})
.execute(conn.deref_mut())
.map_err(|e| anyhow::anyhow!("Insert error: {}", e))?;
@@ -211,6 +215,24 @@ impl PersonaDao for SqlitePersonaDao {
.execute(conn.deref_mut())
.map_err(|e| anyhow::anyhow!("Update include_all error: {}", e))?;
}
if let Some(new_reviewed_only) = patch.reviewed_only_facts {
diesel::update(personas.filter(user_id.eq(uid)).filter(persona_id.eq(pid)))
.set((
reviewed_only_facts.eq(new_reviewed_only),
updated_at.eq(now),
))
.execute(conn.deref_mut())
.map_err(|e| anyhow::anyhow!("Update reviewed_only_facts error: {}", e))?;
}
if let Some(new_allow_corrections) = patch.allow_agent_corrections {
diesel::update(personas.filter(user_id.eq(uid)).filter(persona_id.eq(pid)))
.set((
allow_agent_corrections.eq(new_allow_corrections),
updated_at.eq(now),
))
.execute(conn.deref_mut())
.map_err(|e| anyhow::anyhow!("Update allow_agent_corrections error: {}", e))?;
}
personas
.filter(user_id.eq(uid))
@@ -384,6 +406,8 @@ mod tests {
name: Some("Renamed".into()),
system_prompt: Some("new prompt".into()),
include_all_memories: None,
reviewed_only_facts: None,
allow_agent_corrections: None,
},
)
.unwrap()
@@ -412,6 +436,8 @@ mod tests {
name: None,
system_prompt: None,
include_all_memories: Some(true),
reviewed_only_facts: None,
allow_agent_corrections: None,
},
)
.unwrap()

View File

@@ -59,6 +59,14 @@ diesel::table! {
created_at -> BigInt,
persona_id -> Text,
user_id -> Integer,
valid_from -> Nullable<BigInt>,
valid_until -> Nullable<BigInt>,
superseded_by -> Nullable<Integer>,
created_by_model -> Nullable<Text>,
created_by_backend -> Nullable<Text>,
last_modified_by_model -> Nullable<Text>,
last_modified_by_backend -> Nullable<Text>,
last_modified_at -> Nullable<BigInt>,
}
}
@@ -172,6 +180,8 @@ diesel::table! {
include_all_memories -> Bool,
created_at -> BigInt,
updated_at -> BigInt,
reviewed_only_facts -> Bool,
allow_agent_corrections -> Bool,
}
}

View File

@@ -2992,9 +2992,12 @@ mod tests {
status: "detected".into(),
model_version: "buffalo_l".into(),
};
dao.store_detection(&ctx(), mk_row("a1", Some(alice.id))).unwrap();
dao.store_detection(&ctx(), mk_row("a2", Some(alice.id))).unwrap();
dao.store_detection(&ctx(), mk_row("b1", Some(bob.id))).unwrap();
dao.store_detection(&ctx(), mk_row("a1", Some(alice.id)))
.unwrap();
dao.store_detection(&ctx(), mk_row("a2", Some(alice.id)))
.unwrap();
dao.store_detection(&ctx(), mk_row("b1", Some(bob.id)))
.unwrap();
dao.store_detection(&ctx(), mk_row("u1", None)).unwrap();
// person_id=alice returns only alice's two faces — ignoring the
@@ -3004,9 +3007,11 @@ mod tests {
.list_embeddings(&ctx(), None, true, Some(alice.id), 100, 0)
.unwrap();
assert_eq!(alice_rows.len(), 2);
assert!(alice_rows
.iter()
.all(|(r, _)| r.person_id == Some(alice.id)));
assert!(
alice_rows
.iter()
.all(|(r, _)| r.person_id == Some(alice.id))
);
// unassigned=true with no person_id behaves as before.
let unassigned_rows = dao

View File

@@ -5,11 +5,13 @@ use serde::{Deserialize, Serialize};
use std::sync::Mutex;
use crate::data::Claims;
use crate::database::models::{Entity, EntityFact, EntityPhotoLink};
use crate::database::models::{Entity, EntityFact, EntityPhotoLink, InsertEntityFact};
use crate::database::{
EntityFilter, EntityPatch, FactFilter, FactPatch, KnowledgeDao, PersonaFilter, RecentActivity,
ConsolidationGroup, EntityFilter, EntityGraph, EntityPatch, EntitySort, FactFilter, FactPatch,
KnowledgeDao, PersonaFilter, RecentActivity,
};
use crate::personas::PersonaDaoData;
use crate::state::AppState;
/// Resolve the `X-Persona-Id` header into a `PersonaFilter`. Missing
/// header → `'default'`. If the persona has `include_all_memories=true`,
@@ -57,6 +59,24 @@ pub struct EntitySummary {
pub status: String,
pub created_at: i64,
pub updated_at: i64,
/// Persona-scoped count of non-rejected facts about this entity
/// (subject side). 0 when not provided by the call site, e.g.
/// PATCH responses return the bare entity without scoping context.
#[serde(skip_serializing_if = "Option::is_none")]
pub fact_count: Option<i64>,
/// Per-persona breakdown of fact counts for this entity, scoped
/// to the active user. Lets the curation UI surface "this entity
/// is empty under your active persona but has 12 facts in
/// journal" so you know which persona owns the existing
/// knowledge. Skipped on serialization when None.
#[serde(skip_serializing_if = "Option::is_none")]
pub persona_breakdown: Option<Vec<PersonaCount>>,
}
/// One (persona, fact-count) entry in a per-persona breakdown of an
/// entity's facts, scoped to the active user.
#[derive(Serialize)]
pub struct PersonaCount {
    pub persona_id: String,
    pub count: i64,
}
impl From<Entity> for EntitySummary {
@@ -70,10 +90,30 @@ impl From<Entity> for EntitySummary {
status: e.status,
created_at: e.created_at,
updated_at: e.updated_at,
fact_count: None,
persona_breakdown: None,
}
}
}
impl EntitySummary {
fn from_entity_with_count(e: Entity, fact_count: i64) -> Self {
let mut s = EntitySummary::from(e);
s.fact_count = Some(fact_count);
s
}
fn with_persona_breakdown(mut self, breakdown: Vec<(String, i64)>) -> Self {
self.persona_breakdown = Some(
breakdown
.into_iter()
.map(|(persona_id, count)| PersonaCount { persona_id, count })
.collect(),
);
self
}
}
#[derive(Serialize)]
pub struct EntityListResponse {
pub entities: Vec<EntitySummary>,
@@ -94,10 +134,41 @@ pub struct FactDetail {
pub source_photo: Option<String>,
pub source_insight_id: Option<i32>,
pub created_at: i64,
/// Real-world valid-time interval. NULL on either side means
/// unbounded; both NULL = "always true" / validity unknown.
/// Distinct from `created_at` (transaction time — when we
/// recorded it). See migration 2026-05-10-000100.
pub valid_from: Option<i64>,
pub valid_until: Option<i64>,
/// Points at the entity_facts.id that replaced this one (Phase 2
/// supersession, migration 2026-05-10-000200). Only set when
/// status == 'superseded'.
pub superseded_by: Option<i32>,
/// Provenance — see migration 2026-05-10-000300. NULL on legacy
/// rows. `created_by_backend` is "local" / "hybrid" / "manual".
pub created_by_model: Option<String>,
pub created_by_backend: Option<String>,
/// Audit trail — see migration 2026-05-10-000500. Set on any
/// post-creation mutation. NULL on rows that have never been
/// touched after they were first written.
pub last_modified_by_model: Option<String>,
pub last_modified_by_backend: Option<String>,
pub last_modified_at: Option<i64>,
/// Set when another active fact has the same subject+predicate,
/// a different object, AND their valid-time intervals overlap.
/// Detected at read time by the get_entity handler grouping
/// facts by predicate. Some predicates are legitimately
/// multi-valued ("tagged_in", "friend_of") so this is a *signal*
/// for the curator, not a hard invariant. The interval check
/// keeps "lives_in NYC 2018-2020" + "lives_in SF 2020-present"
/// from false-positive flagging.
#[serde(skip_serializing_if = "std::ops::Not::not")]
pub in_conflict: bool,
}
/// API projection of an entity↔photo link row.
#[derive(Serialize)]
pub struct PhotoLinkDetail {
    /// Library the linked photo belongs to.
    pub library_id: i32,
    /// Path of the photo file within that library.
    pub file_path: String,
    /// Link role, copied verbatim from `EntityPhotoLink.role`.
    /// NOTE(review): role vocabulary not visible here — see the writers
    /// of entity_photo_links for the allowed values.
    pub role: String,
}
@@ -105,6 +176,7 @@ pub struct PhotoLinkDetail {
impl From<EntityPhotoLink> for PhotoLinkDetail {
fn from(l: EntityPhotoLink) -> Self {
PhotoLinkDetail {
library_id: l.library_id,
file_path: l.file_path,
role: l.role,
}
@@ -123,6 +195,11 @@ pub struct EntityDetailResponse {
pub updated_at: i64,
pub facts: Vec<FactDetail>,
pub photo_links: Vec<PhotoLinkDetail>,
/// Per-persona fact counts for the active user. Mirrors the
/// same field on EntitySummary; the detail panel surfaces a
/// clickable list so the curator can switch to the persona
/// that owns existing facts about this entity.
pub persona_breakdown: Vec<PersonaCount>,
}
#[derive(Serialize)]
@@ -171,12 +248,58 @@ pub struct EntityPatchRequest {
pub confidence: Option<f32>,
}
/// Serde helper for the "tri-state" pattern: distinguish "field
/// omitted" from "field sent as null". Used for nullable columns
/// where we want PATCH to support both "leave alone" and "set NULL".
fn deserialize_optional_nullable_i64<'de, D>(d: D) -> Result<Option<Option<i64>>, D::Error>
where
    D: serde::Deserializer<'de>,
{
    // Being invoked at all means the field was present in the JSON, so
    // the outer layer is always Some; `#[serde(default)]` on the field
    // supplies the outer None when it's absent. The inner Option
    // captures JSON null vs. a number.
    Option::<i64>::deserialize(d).map(Some)
}
/// PATCH body for `/knowledge/facts/{id}`. All fields are optional;
/// fields deserialized as `None` are expected to leave the row
/// untouched (applied via `FactPatch` — confirm in the DAO).
#[derive(Deserialize)]
pub struct FactPatchRequest {
    /// New predicate string, when present.
    pub predicate: Option<String>,
    /// New free-text object, when present.
    pub object_value: Option<String>,
    /// New status string, when present.
    pub status: Option<String>,
    /// New confidence score, when present.
    pub confidence: Option<f32>,
    /// Tri-state: missing = leave alone, null = clear to NULL, number
    /// = set. See `deserialize_optional_nullable_i64`.
    #[serde(default, deserialize_with = "deserialize_optional_nullable_i64")]
    pub valid_from: Option<Option<i64>>,
    /// Tri-state like `valid_from`, for the upper valid-time bound.
    #[serde(default, deserialize_with = "deserialize_optional_nullable_i64")]
    pub valid_until: Option<Option<i64>>,
}
/// POST body for `/knowledge/facts/{id}/supersede`.
#[derive(Deserialize)]
pub struct SupersedeRequest {
    /// The id of the new fact that replaces the path-params one.
    pub by_fact_id: i32,
}
/// POST body for `/knowledge/entities/synthesize-merge`: the pair of
/// entities whose merged description should be drafted.
/// NOTE(review): source/target direction inferred from the names —
/// confirm against the synthesize_merge handler.
#[derive(Deserialize)]
pub struct SynthesizeMergeRequest {
    pub source_id: i32,
    pub target_id: i32,
}
#[derive(serde::Serialize)]
pub struct SynthesizeMergeResponse {
pub proposed_description: String,
pub model_used: String,
}
/// POST body for `/knowledge/facts`: manually record a new fact.
#[derive(Deserialize)]
pub struct FactCreateRequest {
    /// Entity the claim is about (subject side).
    pub subject_entity_id: i32,
    /// Relationship-shaped verb, e.g. `lives_in`, `works_at`.
    pub predicate: String,
    /// Object as a known-entity reference, when applicable.
    pub object_entity_id: Option<i32>,
    /// Object as free text, for non-entity (typed) objects.
    pub object_value: Option<String>,
    /// Optional photo path the fact was derived from.
    pub source_photo: Option<String>,
    /// Optional confidence score. NOTE(review): defaulting behavior
    /// when absent lives in the create_fact handler — confirm there.
    pub confidence: Option<f32>,
    /// Valid-time lower bound (unix seconds); absent = unbounded.
    pub valid_from: Option<i64>,
    /// Valid-time upper bound (unix seconds); absent = unbounded /
    /// still true.
    pub valid_until: Option<i64>,
}
#[derive(Deserialize)]
@@ -185,6 +308,9 @@ pub struct EntityListQuery {
pub entity_type: Option<String>,
pub status: Option<String>,
pub search: Option<String>,
/// "updated" (default) | "name" | "count". `count` is persona-scoped
/// via the X-Persona-Id header.
pub sort: Option<String>,
pub limit: Option<i64>,
pub offset: Option<i64>,
}
@@ -204,6 +330,77 @@ pub struct RecentQuery {
pub limit: Option<i64>,
}
/// Query params for `/knowledge/graph`.
#[derive(Deserialize)]
pub struct GraphQuery {
    /// Optional entity-type filter; arrives on the wire as `?type=…`.
    #[serde(rename = "type")]
    pub entity_type: Option<String>,
    /// Optional result cap. NOTE(review): whether it bounds nodes,
    /// edges, or both is decided in the get_graph handler.
    pub limit: Option<i64>,
}
/// One node of the entity graph returned by `/knowledge/graph`.
#[derive(Serialize)]
pub struct GraphNodeView {
    pub id: i32,
    pub name: String,
    pub entity_type: String,
    /// Count of facts for this entity, as computed by the graph
    /// handler (scoping rules live there).
    pub fact_count: i64,
}
/// One aggregated edge of the entity graph: entity-to-entity facts
/// sharing a predicate, collapsed into a single edge with a count.
#[derive(Serialize)]
pub struct GraphEdgeView {
    /// Source entity id.
    pub source: i32,
    /// Target entity id.
    pub target: i32,
    pub predicate: String,
    /// Number of facts this edge aggregates.
    pub count: i64,
}
/// Payload for `/knowledge/graph`: the node list plus aggregated edges.
#[derive(Serialize)]
pub struct GraphResponse {
    pub nodes: Vec<GraphNodeView>,
    pub edges: Vec<GraphEdgeView>,
}
/// Query params for `/knowledge/predicate-stats`.
#[derive(Deserialize)]
pub struct PredicateStatsQuery {
    /// Optional cap on the number of predicates returned.
    pub limit: Option<i64>,
}
/// One (predicate, usage count) row in the predicate statistics.
#[derive(Serialize)]
pub struct PredicateStat {
    pub predicate: String,
    pub count: i64,
}
/// Payload for `/knowledge/predicate-stats`.
#[derive(Serialize)]
pub struct PredicateStatsResponse {
    pub predicates: Vec<PredicateStat>,
}
/// Payload for `/knowledge/predicates/{predicate}/bulk-reject`.
#[derive(Serialize)]
pub struct BulkRejectResponse {
    /// Number of facts flipped to rejected by the bulk operation.
    pub rejected: usize,
}
/// Query params for `/knowledge/consolidation-proposals`.
#[derive(Deserialize)]
pub struct ConsolidationQuery {
    /// Cosine threshold for clustering. Default 0.85 — looser than
    /// the upsert-time guard (0.92) so this view surfaces "probably
    /// same" pairs for human review.
    pub threshold: Option<f32>,
    /// Optional cap on the number of proposal groups returned.
    pub limit: Option<i64>,
}
/// One cluster of probably-duplicate entities proposed for merging.
#[derive(Serialize)]
pub struct ConsolidationGroupView {
    /// Cluster members, each decorated with a per-persona fact-count
    /// breakdown when one is available.
    pub entities: Vec<EntitySummary>,
    /// Cosine-similarity bounds reported by the DAO for this cluster
    /// (presumably min/max over member pairs — see the DAO impl).
    pub min_cosine: f32,
    pub max_cosine: f32,
}
/// Response for GET /knowledge/consolidation-proposals.
#[derive(Serialize)]
pub struct ConsolidationResponse {
    pub groups: Vec<ConsolidationGroupView>,
}
// ---------------------------------------------------------------------------
// Service registration
// ---------------------------------------------------------------------------
@@ -216,19 +413,44 @@ where
web::scope("/knowledge")
.service(web::resource("/entities").route(web::get().to(list_entities::<D>)))
.service(web::resource("/entities/merge").route(web::post().to(merge_entities::<D>)))
.service(
web::resource("/entities/synthesize-merge")
.route(web::post().to(synthesize_merge::<D>)),
)
.service(
web::resource("/entities/{id}")
.route(web::get().to(get_entity::<D>))
.route(web::patch().to(patch_entity::<D>))
.route(web::delete().to(delete_entity::<D>)),
)
.service(web::resource("/facts").route(web::get().to(list_facts::<D>)))
.service(
web::resource("/facts")
.route(web::get().to(list_facts::<D>))
.route(web::post().to(create_fact::<D>)),
)
.service(
web::resource("/facts/{id}")
.route(web::patch().to(patch_fact::<D>))
.route(web::delete().to(delete_fact::<D>)),
)
.service(web::resource("/recent").route(web::get().to(get_recent::<D>))),
.service(
web::resource("/facts/{id}/supersede").route(web::post().to(supersede_fact::<D>)),
)
.service(web::resource("/facts/{id}/restore").route(web::post().to(restore_fact::<D>)))
.service(web::resource("/recent").route(web::get().to(get_recent::<D>)))
.service(
web::resource("/consolidation-proposals")
.route(web::get().to(get_consolidation_proposals::<D>)),
)
.service(web::resource("/graph").route(web::get().to(get_graph::<D>)))
.service(
web::resource("/predicate-stats")
.route(web::get().to(get_predicate_stats::<D>)),
)
.service(
web::resource("/predicates/{predicate}/bulk-reject")
.route(web::post().to(bulk_reject_predicate::<D>)),
),
)
}
@@ -237,9 +459,11 @@ where
// ---------------------------------------------------------------------------
async fn list_entities<D: KnowledgeDao + 'static>(
_claims: Claims,
req: HttpRequest,
claims: Claims,
query: web::Query<EntityListQuery>,
dao: web::Data<Mutex<D>>,
persona_dao: PersonaDaoData,
) -> impl Responder {
let limit = query.limit.unwrap_or(50).min(200);
let offset = query.offset.unwrap_or(0);
@@ -250,6 +474,15 @@ async fn list_entities<D: KnowledgeDao + 'static>(
Some(s) => Some(s.to_string()),
};
let sort = match query.sort.as_deref() {
Some("name") => EntitySort::NameAsc,
Some("count") => EntitySort::FactCountDesc,
// "updated" or anything else falls through to the default.
_ => EntitySort::UpdatedDesc,
};
let persona = resolve_persona_filter(&req, &claims, &persona_dao);
let filter = EntityFilter {
entity_type: query.entity_type.clone(),
status: status_filter,
@@ -260,10 +493,26 @@ async fn list_entities<D: KnowledgeDao + 'static>(
let cx = opentelemetry::Context::current();
let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
match dao.list_entities(&cx, filter) {
Ok((entities, total)) => {
let summaries: Vec<EntitySummary> =
entities.into_iter().map(EntitySummary::from).collect();
match dao.list_entities_with_fact_counts(&cx, filter, sort, &persona) {
Ok((pairs, total)) => {
// Batch fetch persona breakdowns so the list-row tooltip
// and detail panel can show "0 here · 12 in journal".
// One extra query for the visible page.
let entity_ids: Vec<i32> = pairs.iter().map(|(e, _)| e.id).collect();
let breakdowns = dao
.get_persona_breakdowns_for_entities(&cx, &entity_ids, persona.user_id())
.unwrap_or_default();
let summaries: Vec<EntitySummary> = pairs
.into_iter()
.map(|(e, c)| {
let entity_id = e.id;
let summary = EntitySummary::from_entity_with_count(e, c);
match breakdowns.get(&entity_id) {
Some(bd) => summary.with_persona_breakdown(bd.clone()),
None => summary,
}
})
.collect();
HttpResponse::Ok().json(EntityListResponse {
entities: summaries,
total,
@@ -334,9 +583,69 @@ async fn get_entity<D: KnowledgeDao + 'static>(
source_photo: f.source_photo,
source_insight_id: f.source_insight_id,
created_at: f.created_at,
valid_from: f.valid_from,
valid_until: f.valid_until,
superseded_by: f.superseded_by,
created_by_model: f.created_by_model,
created_by_backend: f.created_by_backend,
last_modified_by_model: f.last_modified_by_model,
last_modified_by_backend: f.last_modified_by_backend,
last_modified_at: f.last_modified_at,
in_conflict: false,
});
}
// Conflict detection: within the active set, group by predicate;
// for each pair within a group that disagrees on the object,
// flag both only if their valid-time intervals overlap. NULL on
// either bound treats that side as unbounded — a fact with no
// valid-time data still flags against any time period (worst case
// for legacy data; user adds dates to suppress).
/// True when two valid-time intervals intersect. A `None` bound is
/// treated as unbounded on that side, so a fact with no valid-time
/// data overlaps every period. Strict comparison means intervals that
/// merely touch at an endpoint (a ends exactly where b begins) do
/// NOT count as overlapping.
fn intervals_overlap(a: (Option<i64>, Option<i64>), b: (Option<i64>, Option<i64>)) -> bool {
    // Widen missing bounds to the extremes of the i64 timeline.
    let widen = |iv: (Option<i64>, Option<i64>)| (iv.0.unwrap_or(i64::MIN), iv.1.unwrap_or(i64::MAX));
    let (start_a, end_a) = widen(a);
    let (start_b, end_b) = widen(b);
    start_a < end_b && start_b < end_a
}
{
use std::collections::{HashMap, HashSet};
let mut by_predicate: HashMap<String, Vec<usize>> = HashMap::new();
for (idx, f) in facts.iter().enumerate() {
if f.status == "active" {
by_predicate
.entry(f.predicate.clone())
.or_default()
.push(idx);
}
}
let mut to_flag: HashSet<usize> = HashSet::new();
for indices in by_predicate.values() {
if indices.len() < 2 {
continue;
}
for (a_pos, &i) in indices.iter().enumerate() {
for &j in &indices[a_pos + 1..] {
let same_object = facts[i].object_entity_id == facts[j].object_entity_id
&& facts[i].object_value == facts[j].object_value;
if same_object {
continue;
}
if intervals_overlap(
(facts[i].valid_from, facts[i].valid_until),
(facts[j].valid_from, facts[j].valid_until),
) {
to_flag.insert(i);
to_flag.insert(j);
}
}
}
}
for i in to_flag {
facts[i].in_conflict = true;
}
}
// Fetch photo links
let photo_links: Vec<PhotoLinkDetail> = match dao.get_links_for_entity(&cx, entity_id) {
Ok(links) => links.into_iter().map(PhotoLinkDetail::from).collect(),
@@ -347,6 +656,18 @@ async fn get_entity<D: KnowledgeDao + 'static>(
}
};
// Per-persona breakdown for the detail panel's "facts live in
// {persona}" block — same data the list-row tooltip reads. One
// query, single entity in scope.
let persona_breakdown: Vec<PersonaCount> = dao
.get_persona_breakdowns_for_entities(&cx, &[entity_id], persona.user_id())
.ok()
.and_then(|mut map| map.remove(&entity_id))
.unwrap_or_default()
.into_iter()
.map(|(persona_id, count)| PersonaCount { persona_id, count })
.collect();
HttpResponse::Ok().json(EntityDetailResponse {
id: entity.id,
name: entity.name,
@@ -358,6 +679,7 @@ async fn get_entity<D: KnowledgeDao + 'static>(
updated_at: entity.updated_at,
facts,
photo_links,
persona_breakdown,
})
}
@@ -461,6 +783,138 @@ async fn merge_entities<D: KnowledgeDao + 'static>(
}
}
/// Preview a merged-description before the actual merge fires. Calls
/// the local Ollama with both entities' names + descriptions and
/// returns a synthesized rewrite that combines them. The curator
/// previews, edits, and either accepts (PATCH target's description
/// then POST /merge) or skips (just /merge as-is).
///
/// Deliberately doesn't touch the database — read-only on entities,
/// no LLM call gets to write anything. If the model is unavailable
/// the handler returns 503 so the UI can degrade gracefully (skip
/// the preview, fall back to the existing merge action).
async fn synthesize_merge<D: KnowledgeDao + 'static>(
    _claims: Claims,
    body: web::Json<SynthesizeMergeRequest>,
    dao: web::Data<Mutex<D>>,
    app_state: web::Data<AppState>,
) -> impl Responder {
    // Self-merge is meaningless; reject before any DB work.
    if body.source_id == body.target_id {
        return HttpResponse::BadRequest()
            .json(serde_json::json!({"error": "source_id and target_id must differ"}));
    }
    let cx = opentelemetry::Context::current();
    let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
    // Missing entities are a client error (stale id), not a 404:
    // this endpoint is a sub-step of a merge form submission.
    let source = match dao.get_entity_by_id(&cx, body.source_id) {
        Ok(Some(e)) => e,
        Ok(None) => {
            return HttpResponse::BadRequest()
                .json(serde_json::json!({"error": "source entity not found"}));
        }
        Err(e) => {
            log::error!("synthesize_merge source lookup: {:?}", e);
            return HttpResponse::InternalServerError()
                .json(serde_json::json!({"error": "Database error"}));
        }
    };
    let target = match dao.get_entity_by_id(&cx, body.target_id) {
        Ok(Some(e)) => e,
        Ok(None) => {
            return HttpResponse::BadRequest()
                .json(serde_json::json!({"error": "target entity not found"}));
        }
        Err(e) => {
            log::error!("synthesize_merge target lookup: {:?}", e);
            return HttpResponse::InternalServerError()
                .json(serde_json::json!({"error": "Database error"}));
        }
    };
    // Drop the DAO lock before the LLM call — the generate request
    // is the slow part (seconds) and we don't want to block other
    // knowledge reads while it runs.
    drop(dao);
    // Substitute a visible placeholder for blank descriptions so the
    // prompt never contains an empty "Description:" line.
    let source_desc = if source.description.trim().is_empty() {
        "(none)".to_string()
    } else {
        source.description.clone()
    };
    let target_desc = if target.description.trim().is_empty() {
        "(none)".to_string()
    } else {
        target.description.clone()
    };
    let system = "You are condensing two stored entity descriptions into one. The two \
    entities refer to the same real-world thing and are about to be merged. Write a \
    single neutral third-person description (1-2 sentences, max 300 chars) that \
    preserves any concrete facts in either source. Do not invent details. Do not \
    editorialize. Plain prose only — no markdown, no bold, no italics, no headings, \
    no bullets, no lists, no code fences. Return ONLY the merged description — no \
    preamble, no labels, no quotes.";
    let prompt = format!(
        "Entity A: {} [{}]\nDescription: {}\n\nEntity B: {} [{}]\nDescription: {}\n\nMerged description:",
        source.name, source.entity_type, source_desc, target.name, target.entity_type, target_desc,
    );
    let ollama = app_state.ollama.clone();
    let model_used = ollama.primary_model.clone();
    let proposed = match ollama.generate(&prompt, Some(system)).await {
        Ok(out) => {
            // Strip the framing models reach for even with explicit
            // "no preamble" guidance: leading "Merged description:"
            // labels, wrapping quotes, ``` code fences, leading
            // bullets / hash headings. Belt-and-braces against the
            // system prompt's plain-text directive.
            //
            // NOTE: trim_start_matches/trim_end_matches remove *all*
            // consecutive occurrences of the pattern, not just one.
            let mut s = out.trim().to_string();
            s = s
                .trim_start_matches("Merged description:")
                .trim_start_matches("Merged Description:")
                .trim()
                .to_string();
            // Code fences (``` or ```text). The language-tagged
            // variants must be tried before the bare ``` so the tag
            // is removed along with the backticks.
            s = s
                .trim_start_matches("```text")
                .trim_start_matches("```markdown")
                .trim_start_matches("```")
                .trim_end_matches("```")
                .trim()
                .to_string();
            // Markdown headings / bullets at the very start; loop
            // because headings stack ("## Title" → two '#' prefixes).
            while let Some(stripped) = s
                .strip_prefix('#')
                .or_else(|| s.strip_prefix('*'))
                .or_else(|| s.strip_prefix('-'))
                .or_else(|| s.strip_prefix('>'))
            {
                s = stripped.trim_start().to_string();
            }
            // Wrapping quotes
            s = s.trim_matches(|c| c == '"' || c == '\'').to_string();
            // Inline emphasis: drop standalone `**` / `*` / `__` /
            // `_` markers without trying to parse markdown — just
            // remove the punctuation. Rare enough that this naive
            // replace is fine.
            s = s.replace("**", "").replace("__", "");
            s
        }
        Err(e) => {
            log::warn!("synthesize_merge generate failed: {:?}", e);
            return HttpResponse::ServiceUnavailable().json(serde_json::json!({
                "error": "LLM unavailable; the merge picker should fall back to skip-synthesis."
            }));
        }
    };
    HttpResponse::Ok().json(SynthesizeMergeResponse {
        proposed_description: proposed,
        model_used,
    })
}
async fn list_facts<D: KnowledgeDao + 'static>(
req: HttpRequest,
claims: Claims,
@@ -535,6 +989,114 @@ async fn list_facts<D: KnowledgeDao + 'static>(
}
}
async fn create_fact<D: KnowledgeDao + 'static>(
req: HttpRequest,
claims: Claims,
body: web::Json<FactCreateRequest>,
dao: web::Data<Mutex<D>>,
persona_dao: PersonaDaoData,
) -> impl Responder {
if body.object_entity_id.is_none() && body.object_value.is_none() {
return HttpResponse::BadRequest().json(serde_json::json!({
"error": "object_entity_id or object_value is required"
}));
}
if body.predicate.trim().is_empty() {
return HttpResponse::BadRequest()
.json(serde_json::json!({"error": "predicate must not be empty"}));
}
// Persona scoping: facts are written under the active single persona.
// PersonaFilter::All is read-only ("hive-mind" view); callers should
// pin a specific persona for writes via X-Persona-Id.
let persona = resolve_persona_filter(&req, &claims, &persona_dao);
let (user_id, persona_id) = match &persona {
PersonaFilter::Single {
user_id,
persona_id,
} => (*user_id, persona_id.clone()),
PersonaFilter::All { user_id } => (*user_id, "default".to_string()),
};
let cx = opentelemetry::Context::current();
let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
// Verify subject entity exists.
match dao.get_entity_by_id(&cx, body.subject_entity_id) {
Ok(None) => {
return HttpResponse::BadRequest().json(serde_json::json!({
"error": format!("Subject entity {} not found", body.subject_entity_id)
}));
}
Err(e) => {
log::error!("create_fact subject lookup error: {:?}", e);
return HttpResponse::InternalServerError()
.json(serde_json::json!({"error": "Database error"}));
}
Ok(Some(_)) => {}
}
// Optional object entity validation when supplied.
if let Some(oid) = body.object_entity_id {
match dao.get_entity_by_id(&cx, oid) {
Ok(None) => {
return HttpResponse::BadRequest().json(serde_json::json!({
"error": format!("Object entity {} not found", oid)
}));
}
Err(e) => {
log::error!("create_fact object lookup error: {:?}", e);
return HttpResponse::InternalServerError()
.json(serde_json::json!({"error": "Database error"}));
}
Ok(Some(_)) => {}
}
}
let now = Utc::now().timestamp();
let confidence = body.confidence.unwrap_or(0.6).clamp(0.0, 0.95);
let insert = InsertEntityFact {
subject_entity_id: body.subject_entity_id,
predicate: body.predicate.trim().to_string(),
object_entity_id: body.object_entity_id,
object_value: body.object_value.clone(),
source_photo: body.source_photo.clone(),
source_insight_id: None,
confidence,
status: "active".to_string(),
created_at: now,
persona_id,
user_id,
valid_from: body.valid_from,
valid_until: body.valid_until,
superseded_by: None,
// Manual creation via curation UI — provenance recorded as
// "manual" with no model, distinguishing user-entered facts
// from agent-generated ones in the audit view.
created_by_model: None,
created_by_backend: Some("manual".to_string()),
last_modified_by_model: None,
last_modified_by_backend: None,
last_modified_at: None,
};
match dao.upsert_fact(&cx, insert) {
Ok((fact, is_new)) => {
let status = if is_new {
actix_web::http::StatusCode::CREATED
} else {
actix_web::http::StatusCode::OK
};
HttpResponse::build(status).json(fact)
}
Err(e) => {
log::error!("create_fact upsert error: {:?}", e);
HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
}
}
}
async fn patch_fact<D: KnowledgeDao + 'static>(
_claims: Claims,
id: web::Path<i32>,
@@ -548,10 +1110,15 @@ async fn patch_fact<D: KnowledgeDao + 'static>(
object_value: body.object_value.clone(),
status: body.status.clone(),
confidence: body.confidence,
valid_from: body.valid_from,
valid_until: body.valid_until,
};
let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
match dao.update_fact(&cx, fact_id, patch) {
// Manual PATCH from the curation UI — provenance stamped as
// "manual" so the audit feed can distinguish human edits from
// agent corrections.
match dao.update_fact(&cx, fact_id, patch, Some(("manual", "manual"))) {
Ok(Some(fact)) => HttpResponse::Ok().json(fact),
Ok(None) => HttpResponse::NotFound().json(serde_json::json!({"error": "Fact not found"})),
Err(e) => {
@@ -578,6 +1145,51 @@ async fn delete_fact<D: KnowledgeDao + 'static>(
}
}
/// POST /knowledge/facts/{id}/supersede — mark the path fact as
/// replaced by `by_fact_id`. Provenance is stamped "manual", same
/// rule as the PATCH path, so the audit feed can distinguish human
/// edits from agent corrections.
async fn supersede_fact<D: KnowledgeDao + 'static>(
    _claims: Claims,
    id: web::Path<i32>,
    body: web::Json<SupersedeRequest>,
    dao: web::Data<Mutex<D>>,
) -> impl Responder {
    let superseded_id = id.into_inner();
    let replacement_id = body.by_fact_id;
    // A fact cannot supersede itself.
    if superseded_id == replacement_id {
        return HttpResponse::BadRequest()
            .json(serde_json::json!({"error": "old_id and by_fact_id must differ"}));
    }
    let cx = opentelemetry::Context::current();
    let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
    let outcome = dao.supersede_fact(&cx, superseded_id, replacement_id, Some(("manual", "manual")));
    match outcome {
        Ok(Some(updated)) => HttpResponse::Ok().json(updated),
        Ok(None) => {
            HttpResponse::NotFound().json(serde_json::json!({"error": "Old or new fact not found"}))
        }
        Err(e) => {
            log::error!("supersede_fact error: {:?}", e);
            HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
        }
    }
}
/// POST /knowledge/facts/{id}/restore — undo a supersession on the
/// path fact, stamping the edit as "manual".
async fn restore_fact<D: KnowledgeDao + 'static>(
    _claims: Claims,
    id: web::Path<i32>,
    dao: web::Data<Mutex<D>>,
) -> impl Responder {
    let fact_id = id.into_inner();
    let cx = opentelemetry::Context::current();
    let result = dao
        .lock()
        .expect("Unable to lock KnowledgeDao")
        .revert_supersession(&cx, fact_id, Some(("manual", "manual")));
    match result {
        Ok(Some(restored)) => HttpResponse::Ok().json(restored),
        Ok(None) => HttpResponse::NotFound().json(serde_json::json!({"error": "Fact not found"})),
        Err(e) => {
            log::error!("restore_fact error: {:?}", e);
            HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
        }
    }
}
async fn get_recent<D: KnowledgeDao + 'static>(
req: HttpRequest,
claims: Claims,
@@ -608,3 +1220,157 @@ async fn get_recent<D: KnowledgeDao + 'static>(
}
}
}
/// GET /knowledge/predicate-stats — predicate usage counts within
/// the resolved persona scope. `limit` clamps to 1..=500, default 100.
async fn get_predicate_stats<D: KnowledgeDao + 'static>(
    req: HttpRequest,
    claims: Claims,
    query: web::Query<PredicateStatsQuery>,
    dao: web::Data<Mutex<D>>,
    persona_dao: PersonaDaoData,
) -> impl Responder {
    let cap = query.limit.unwrap_or(100).clamp(1, 500) as usize;
    let persona = resolve_persona_filter(&req, &claims, &persona_dao);
    let cx = opentelemetry::Context::current();
    let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
    match dao.get_predicate_stats(&cx, &persona, cap) {
        Ok(rows) => {
            let predicates = rows
                .into_iter()
                .map(|(predicate, count)| PredicateStat { predicate, count })
                .collect();
            HttpResponse::Ok().json(PredicateStatsResponse { predicates })
        }
        Err(e) => {
            log::error!("get_predicate_stats error: {:?}", e);
            HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
        }
    }
}
/// POST /knowledge/predicates/{predicate}/bulk-reject — reject every
/// fact carrying the path predicate within the resolved persona
/// scope, stamping the edits as "manual".
async fn bulk_reject_predicate<D: KnowledgeDao + 'static>(
    req: HttpRequest,
    claims: Claims,
    predicate: web::Path<String>,
    dao: web::Data<Mutex<D>>,
    persona_dao: PersonaDaoData,
) -> impl Responder {
    let predicate = predicate.into_inner();
    if predicate.trim().is_empty() {
        return HttpResponse::BadRequest()
            .json(serde_json::json!({"error": "predicate must not be empty"}));
    }
    let persona = resolve_persona_filter(&req, &claims, &persona_dao);
    let cx = opentelemetry::Context::current();
    let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
    let outcome =
        dao.bulk_reject_facts_by_predicate(&cx, &persona, &predicate, Some(("manual", "manual")));
    match outcome {
        Ok(rejected) => HttpResponse::Ok().json(BulkRejectResponse { rejected }),
        Err(e) => {
            log::error!("bulk_reject_predicate error: {:?}", e);
            HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
        }
    }
}
async fn get_graph<D: KnowledgeDao + 'static>(
req: HttpRequest,
claims: Claims,
query: web::Query<GraphQuery>,
dao: web::Data<Mutex<D>>,
persona_dao: PersonaDaoData,
) -> impl Responder {
let limit = query.limit.unwrap_or(200).clamp(1, 1000) as usize;
let persona = resolve_persona_filter(&req, &claims, &persona_dao);
let cx = opentelemetry::Context::current();
let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
match dao.build_entity_graph(&cx, query.entity_type.as_deref(), limit, &persona) {
Ok(EntityGraph { nodes, edges }) => HttpResponse::Ok().json(GraphResponse {
nodes: nodes
.into_iter()
.map(|n| GraphNodeView {
id: n.id,
name: n.name,
entity_type: n.entity_type,
fact_count: n.fact_count,
})
.collect(),
edges: edges
.into_iter()
.map(|e| GraphEdgeView {
source: e.source,
target: e.target,
predicate: e.predicate,
count: e.count,
})
.collect(),
}),
Err(e) => {
log::error!("build_entity_graph error: {:?}", e);
HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
}
}
}
/// GET /knowledge/consolidation-proposals — clusters of
/// probably-duplicate entities for human merge review, decorated
/// with per-persona fact counts.
async fn get_consolidation_proposals<D: KnowledgeDao + 'static>(
    req: HttpRequest,
    claims: Claims,
    query: web::Query<ConsolidationQuery>,
    dao: web::Data<Mutex<D>>,
    persona_dao: PersonaDaoData,
) -> impl Responder {
    // Clamp threshold so a curious client can't drag the cosine
    // floor to 0 and pull every entity into one giant cluster.
    let threshold = query.threshold.unwrap_or(0.85).clamp(0.5, 0.99);
    let max_groups = query.limit.unwrap_or(50).clamp(1, 200) as usize;
    let persona = resolve_persona_filter(&req, &claims, &persona_dao);
    let cx = opentelemetry::Context::current();
    let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
    let proposals: Vec<ConsolidationGroup> =
        match dao.find_consolidation_proposals(&cx, threshold, max_groups) {
            Ok(found) => found,
            Err(e) => {
                log::error!("find_consolidation_proposals: {:?}", e);
                return HttpResponse::InternalServerError()
                    .json(serde_json::json!({"error": "Database error"}));
            }
        };
    // Decorate with per-persona fact counts so the curation UI can
    // show "default 8 · journal 3" inline and the curator can pick
    // which entity is the strongest target.
    let mut entity_ids: Vec<i32> = Vec::new();
    for group in &proposals {
        for entity in &group.entities {
            entity_ids.push(entity.id);
        }
    }
    let breakdowns = dao
        .get_persona_breakdowns_for_entities(&cx, &entity_ids, persona.user_id())
        .unwrap_or_default();
    let mut groups: Vec<ConsolidationGroupView> = Vec::with_capacity(proposals.len());
    for group in proposals {
        let mut entities = Vec::with_capacity(group.entities.len());
        for entity in group.entities {
            let entity_id = entity.id;
            let mut summary = EntitySummary::from(entity);
            if let Some(bd) = breakdowns.get(&entity_id) {
                summary = summary.with_persona_breakdown(bd.clone());
            }
            entities.push(summary);
        }
        groups.push(ConsolidationGroupView {
            entities,
            min_cosine: group.min_cosine,
            max_cosine: group.max_cosine,
        });
    }
    HttpResponse::Ok().json(ConsolidationResponse { groups })
}

View File

@@ -35,6 +35,16 @@ pub struct PersonaView {
pub created_at: i64,
#[serde(rename = "updatedAt")]
pub updated_at: i64,
/// "Strict mode" — when true, the agent's recall_* tools return
/// only facts whose status is 'reviewed'. See migration
/// 2026-05-10-000400.
#[serde(rename = "reviewedOnlyFacts")]
pub reviewed_only_facts: bool,
/// Gate for the agent's update_fact / supersede_fact tools.
/// Default false — fresh personas let the agent create but not
/// alter. See migration 2026-05-10-000500.
#[serde(rename = "allowAgentCorrections")]
pub allow_agent_corrections: bool,
}
impl From<Persona> for PersonaView {
@@ -47,6 +57,8 @@ impl From<Persona> for PersonaView {
include_all_memories: p.include_all_memories,
created_at: p.created_at,
updated_at: p.updated_at,
reviewed_only_facts: p.reviewed_only_facts,
allow_agent_corrections: p.allow_agent_corrections,
}
}
}
@@ -72,6 +84,10 @@ pub struct UpdatePersonaRequest {
pub system_prompt: Option<String>,
#[serde(default, rename = "includeAllMemories")]
pub include_all_memories: Option<bool>,
#[serde(default, rename = "reviewedOnlyFacts")]
pub reviewed_only_facts: Option<bool>,
#[serde(default, rename = "allowAgentCorrections")]
pub allow_agent_corrections: Option<bool>,
}
#[derive(Deserialize)]
@@ -225,8 +241,7 @@ async fn update_persona(
// identity. Mirrors the same guard delete_persona enforces below.
match dao.get_persona(&cx, uid, &pid) {
Ok(Some(p)) if p.is_built_in => {
let editing_identity =
body.name.is_some() || body.system_prompt.is_some();
let editing_identity = body.name.is_some() || body.system_prompt.is_some();
if editing_identity {
return HttpResponse::Conflict().json(serde_json::json!({
"error": "Cannot edit name or systemPrompt of a built-in persona"
@@ -249,6 +264,8 @@ async fn update_persona(
name: body.name.clone(),
system_prompt: body.system_prompt.clone(),
include_all_memories: body.include_all_memories,
reviewed_only_facts: body.reviewed_only_facts,
allow_agent_corrections: body.allow_agent_corrections,
};
match dao.update_persona(&cx, uid, &pid, patch) {
Ok(Some(p)) => HttpResponse::Ok().json(PersonaView::from(p)),

View File

@@ -207,6 +207,9 @@ impl Default for AppState {
Arc::new(Mutex::new(Box::new(SqliteTagDao::default())));
let knowledge_dao: Arc<Mutex<Box<dyn KnowledgeDao>>> =
Arc::new(Mutex::new(Box::new(SqliteKnowledgeDao::new())));
let persona_dao: Arc<Mutex<Box<dyn crate::database::PersonaDao>>> = Arc::new(Mutex::new(
Box::new(crate::database::SqlitePersonaDao::new()),
));
let face_dao: Arc<Mutex<Box<dyn faces::FaceDao>>> =
Arc::new(Mutex::new(Box::new(faces::SqliteFaceDao::new())));
@@ -236,6 +239,7 @@ impl Default for AppState {
tag_dao.clone(),
face_dao.clone(),
knowledge_dao,
persona_dao,
libraries_vec.clone(),
);
@@ -352,6 +356,9 @@ impl AppState {
Arc::new(Mutex::new(Box::new(SqliteTagDao::default())));
let knowledge_dao: Arc<Mutex<Box<dyn KnowledgeDao>>> =
Arc::new(Mutex::new(Box::new(SqliteKnowledgeDao::new())));
let persona_dao: Arc<Mutex<Box<dyn crate::database::PersonaDao>>> = Arc::new(Mutex::new(
Box::new(crate::database::SqlitePersonaDao::new()),
));
let face_dao: Arc<Mutex<Box<dyn faces::FaceDao>>> =
Arc::new(Mutex::new(Box::new(faces::SqliteFaceDao::new())));
@@ -378,6 +385,7 @@ impl AppState {
tag_dao.clone(),
face_dao.clone(),
knowledge_dao,
persona_dao,
vec![test_lib],
);