Files
ImageApi/src/knowledge.rs
Cameron Cordes e67e00ef8a knowledge: predicate-quality nudge + bulk-reject endpoint
Two coupled changes to fight the speech-act-predicate problem
(facts like (Cameron, expressed, "I'm tempted to...")):

1. System prompt grows an explicit predicate-quality rule. The
   agent is told to use relationship-shaped verbs (lives_in,
   works_at, attended, is_friend_of, interested_in), and is
   given an explicit DON'T list (expressed, said, mentioned,
   stated, quoted, noted, discussed, thought, wondered). Plus a
   concrete Bad / Good example contrasting the noise pattern
   with the structured paraphrase the agent should be writing.
   Stops the bleed for new insights.

2. Cleanup tools for the legacy noise that's already in the
   table:
   - get_predicate_stats(persona, limit) returns
     [(predicate, count)] sorted desc — feeds the curation UI's
     PREDICATES tab.
   - bulk_reject_facts_by_predicate(persona, predicate, audit)
     flips every ACTIVE fact under that predicate to 'rejected'
     in one transaction, stamping last_modified_* so the action
     is attributable + reversible per-fact through the entity
     detail panel. REVIEWED facts under the same predicate are
     left alone — the curator may have hand-approved an
     exception ("discussed" might be largely noise but a
     reviewed entry is intentional).

New HTTP endpoints:
   GET  /knowledge/predicate-stats?limit=
   POST /knowledge/predicates/{predicate}/bulk-reject

Persona-scoped via the existing X-Persona-Id header.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 21:50:26 -04:00

1377 lines
48 KiB
Rust

use actix_web::dev::{ServiceFactory, ServiceRequest};
use actix_web::{App, HttpRequest, HttpResponse, Responder, web};
use chrono::Utc;
use serde::{Deserialize, Serialize};
use std::sync::Mutex;
use crate::data::Claims;
use crate::database::models::{Entity, EntityFact, EntityPhotoLink, InsertEntityFact};
use crate::database::{
ConsolidationGroup, EntityFilter, EntityGraph, EntityPatch, EntitySort, FactFilter, FactPatch,
KnowledgeDao, PersonaFilter, RecentActivity,
};
use crate::personas::PersonaDaoData;
use crate::state::AppState;
/// Resolve the `X-Persona-Id` header into a `PersonaFilter`.
///
/// - The persona id is the trimmed header value. A missing header, a
///   value that is not valid visible ASCII, or an empty/whitespace-only
///   value all resolve to `'default'` (an empty id can never name a real
///   persona, so it would otherwise silently scope reads and writes to
///   nothing).
/// - If that persona exists and has `include_all_memories=true`, returns
///   `PersonaFilter::All` so reads see the full hive-mind pool.
/// - Otherwise (persona not found, or the persona lookup itself errored)
///   returns `PersonaFilter::Single` for the requested persona id.
///
/// The user id is `claims.sub` parsed as an integer. When `sub` is not a
/// numeric user_id the resolver falls through to user_id=1 — the operator
/// convention for service tokens — preserving the historical baseline view.
///
/// # Panics
/// Panics if the `PersonaDao` mutex is poisoned.
fn resolve_persona_filter(
    req: &HttpRequest,
    claims: &Claims,
    persona_dao: &PersonaDaoData,
) -> PersonaFilter {
    let pid = req
        .headers()
        .get("X-Persona-Id")
        .and_then(|v| v.to_str().ok())
        .map(str::trim)
        .filter(|s| !s.is_empty())
        .unwrap_or("default")
        .to_string();
    let uid = claims.sub.parse::<i32>().unwrap_or(1);
    let cx = opentelemetry::Context::current();
    let mut dao = persona_dao.lock().expect("Unable to lock PersonaDao");
    match dao.get_persona(&cx, uid, &pid) {
        Ok(Some(p)) if p.include_all_memories => PersonaFilter::All { user_id: uid },
        // Unknown persona or lookup error: stay scoped to the requested id.
        _ => PersonaFilter::Single {
            user_id: uid,
            persona_id: pid,
        },
    }
}
// ---------------------------------------------------------------------------
// Request / Response types
// ---------------------------------------------------------------------------
/// Entity row as returned by the list endpoint and by PATCH responses.
/// The persona-scoped extras are optional and left out of the JSON when
/// the call site has no scoping context.
#[derive(Serialize)]
pub struct EntitySummary {
    pub id: i32,
    pub name: String,
    pub entity_type: String,
    pub description: String,
    pub confidence: f32,
    pub status: String,
    pub created_at: i64,
    pub updated_at: i64,
    /// Persona-scoped count of non-rejected facts about this entity
    /// (subject side). `None` — and therefore omitted from the JSON —
    /// when the call site has no scoping context, e.g. PATCH responses
    /// return the bare entity.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub fact_count: Option<i64>,
    /// Per-persona breakdown of fact counts for this entity, scoped
    /// to the active user. Lets the curation UI surface "this entity
    /// is empty under your active persona but has 12 facts in
    /// journal" so you know which persona owns the existing
    /// knowledge. Skipped on serialization when None.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub persona_breakdown: Option<Vec<PersonaCount>>,
}
/// One entry of a per-persona fact-count breakdown: how many facts
/// `persona_id` holds about the entity in question.
#[derive(Serialize)]
pub struct PersonaCount {
    pub persona_id: String,
    pub count: i64,
}
impl From<Entity> for EntitySummary {
fn from(e: Entity) -> Self {
EntitySummary {
id: e.id,
name: e.name,
entity_type: e.entity_type,
description: e.description,
confidence: e.confidence,
status: e.status,
created_at: e.created_at,
updated_at: e.updated_at,
fact_count: None,
persona_breakdown: None,
}
}
}
impl EntitySummary {
fn from_entity_with_count(e: Entity, fact_count: i64) -> Self {
let mut s = EntitySummary::from(e);
s.fact_count = Some(fact_count);
s
}
fn with_persona_breakdown(mut self, breakdown: Vec<(String, i64)>) -> Self {
self.persona_breakdown = Some(
breakdown
.into_iter()
.map(|(persona_id, count)| PersonaCount { persona_id, count })
.collect(),
);
self
}
}
/// Body of `GET /knowledge/entities`: one page of entity summaries.
#[derive(Serialize)]
pub struct EntityListResponse {
    pub entities: Vec<EntitySummary>,
    /// Total reported by the DAO alongside the page — presumably the
    /// unpaged match count; confirm against the DAO implementation.
    pub total: i64,
    /// Effective page size / offset after the handler's defaults and caps.
    pub limit: i64,
    pub offset: i64,
}
/// One fact about an entity as embedded in `EntityDetailResponse`
/// (`GET /knowledge/entities/{id}`). Carries the valid-time, supersession,
/// provenance and audit columns; list views use the lighter `FactSummary`.
#[derive(Serialize)]
pub struct FactDetail {
    pub id: i32,
    pub predicate: String,
    /// Object side of the triple: another entity (id plus its resolved
    /// display name, when the lookup succeeds) and/or a literal value.
    pub object_entity_id: Option<i32>,
    pub object_entity_name: Option<String>,
    pub object_value: Option<String>,
    pub confidence: f32,
    pub status: String,
    pub source_photo: Option<String>,
    pub source_insight_id: Option<i32>,
    pub created_at: i64,
    /// Real-world valid-time interval. NULL on either side means
    /// unbounded; both NULL = "always true" / validity unknown.
    /// Distinct from `created_at` (transaction time — when we
    /// recorded it). See migration 2026-05-10-000100.
    pub valid_from: Option<i64>,
    pub valid_until: Option<i64>,
    /// Points at the entity_facts.id that replaced this one (Phase 2
    /// supersession, migration 2026-05-10-000200). Only set when
    /// status == 'superseded'.
    pub superseded_by: Option<i32>,
    /// Provenance — see migration 2026-05-10-000300. NULL on legacy
    /// rows. `created_by_backend` is "local" / "hybrid" / "manual".
    pub created_by_model: Option<String>,
    pub created_by_backend: Option<String>,
    /// Audit trail — see migration 2026-05-10-000500. Set on any
    /// post-creation mutation. NULL on rows that have never been
    /// touched after they were first written.
    pub last_modified_by_model: Option<String>,
    pub last_modified_by_backend: Option<String>,
    pub last_modified_at: Option<i64>,
    /// Set when another active fact has the same subject+predicate,
    /// a different object, AND their valid-time intervals overlap.
    /// Detected at read time by the get_entity handler grouping
    /// facts by predicate. Some predicates are legitimately
    /// multi-valued ("tagged_in", "friend_of") so this is a *signal*
    /// for the curator, not a hard invariant. The interval check
    /// keeps "lives_in NYC 2018-2020" + "lives_in SF 2020-present"
    /// from false-positive flagging. Omitted from the JSON when false.
    #[serde(skip_serializing_if = "std::ops::Not::not")]
    pub in_conflict: bool,
}
/// A photo linked to an entity, as listed in `EntityDetailResponse`.
#[derive(Serialize)]
pub struct PhotoLinkDetail {
    pub library_id: i32,
    pub file_path: String,
    /// Link role string copied verbatim from the stored `EntityPhotoLink`.
    pub role: String,
}
impl From<EntityPhotoLink> for PhotoLinkDetail {
fn from(l: EntityPhotoLink) -> Self {
PhotoLinkDetail {
library_id: l.library_id,
file_path: l.file_path,
role: l.role,
}
}
}
/// Body of `GET /knowledge/entities/{id}`: the entity plus its
/// persona-scoped facts (all statuses), photo links and per-persona
/// fact counts.
#[derive(Serialize)]
pub struct EntityDetailResponse {
    pub id: i32,
    pub name: String,
    pub entity_type: String,
    pub description: String,
    pub confidence: f32,
    pub status: String,
    pub created_at: i64,
    pub updated_at: i64,
    pub facts: Vec<FactDetail>,
    pub photo_links: Vec<PhotoLinkDetail>,
    /// Per-persona fact counts for the active user. Mirrors the
    /// same field on EntitySummary; the detail panel surfaces a
    /// clickable list so the curator can switch to the persona
    /// that owns existing facts about this entity. Always serialized
    /// (possibly empty), unlike the optional field on EntitySummary.
    pub persona_breakdown: Vec<PersonaCount>,
}
/// One row of `GET /knowledge/facts`: a fact with subject and object
/// display names resolved (each `None` when the entity lookup fails or
/// there is no object entity).
#[derive(Serialize)]
pub struct FactSummary {
    pub id: i32,
    pub subject_entity_id: i32,
    pub subject_entity_name: Option<String>,
    pub predicate: String,
    pub object_entity_id: Option<i32>,
    pub object_entity_name: Option<String>,
    pub object_value: Option<String>,
    pub confidence: f32,
    pub status: String,
    pub source_photo: Option<String>,
    pub source_insight_id: Option<i32>,
    pub created_at: i64,
}
/// Body of `GET /knowledge/facts`: one page of fact summaries.
#[derive(Serialize)]
pub struct FactListResponse {
    pub facts: Vec<FactSummary>,
    /// Total reported by the DAO alongside the page (presumably the
    /// unpaged match count — confirm against the DAO).
    pub total: i64,
    /// Effective page size / offset after the handler's defaults and caps.
    pub limit: i64,
    pub offset: i64,
}
/// Body of `POST /knowledge/entities/merge`: fold `source_id` into
/// `target_id`. The handler rejects equal ids.
#[derive(Deserialize)]
pub struct MergeRequest {
    pub source_id: i32,
    pub target_id: i32,
}
/// Result of a merge: the surviving entity, the one that was removed,
/// and the fact / photo-link counts the DAO reports as transferred.
#[derive(Serialize)]
pub struct MergeResponse {
    pub merged_entity_id: i32,
    pub deleted_entity_id: i32,
    pub facts_transferred: i64,
    pub links_transferred: i64,
}
/// Body of `PATCH /knowledge/entities/{id}`. Every field is optional;
/// an omitted field presumably leaves that column untouched (see how the
/// DAO applies `EntityPatch`).
#[derive(Deserialize)]
pub struct EntityPatchRequest {
    pub name: Option<String>,
    pub description: Option<String>,
    pub status: Option<String>,
    pub confidence: Option<f32>,
}
/// Serde `deserialize_with` helper for tri-state nullable fields.
///
/// Serde only invokes it when the field is present, so pair it with
/// `#[serde(default)]`: an omitted field becomes the outer `None`
/// ("leave alone"), an explicit JSON `null` becomes `Some(None)` ("set
/// NULL"), and a number `n` becomes `Some(Some(n))` ("set").
fn deserialize_optional_nullable_i64<'de, D>(d: D) -> Result<Option<Option<i64>>, D::Error>
where
    D: serde::Deserializer<'de>,
{
    // Whatever was present (null or number) is wrapped as "provided".
    Option::<i64>::deserialize(d).map(Some)
}
/// Body of `PATCH /knowledge/facts/{id}`. Plain `Option` fields mean
/// "omitted = presumably leave alone" (confirm against the DAO's
/// `FactPatch` handling); the valid-time bounds are tri-state so the
/// curator can also clear them.
#[derive(Deserialize)]
pub struct FactPatchRequest {
    pub predicate: Option<String>,
    pub object_value: Option<String>,
    pub status: Option<String>,
    pub confidence: Option<f32>,
    /// Tri-state: missing = leave alone, null = clear to NULL, number
    /// = set. See `deserialize_optional_nullable_i64`.
    #[serde(default, deserialize_with = "deserialize_optional_nullable_i64")]
    pub valid_from: Option<Option<i64>>,
    #[serde(default, deserialize_with = "deserialize_optional_nullable_i64")]
    pub valid_until: Option<Option<i64>>,
}
/// Body of `POST /knowledge/facts/{id}/supersede`.
#[derive(Deserialize)]
pub struct SupersedeRequest {
    /// The id of the new fact that replaces the path-params one.
    pub by_fact_id: i32,
}
/// Body of `POST /knowledge/entities/synthesize-merge`: the pair whose
/// descriptions should be combined into a preview.
#[derive(Deserialize)]
pub struct SynthesizeMergeRequest {
    pub source_id: i32,
    pub target_id: i32,
}
#[derive(serde::Serialize)]
pub struct SynthesizeMergeResponse {
pub proposed_description: String,
pub model_used: String,
}
/// Body of `POST /knowledge/facts` (manual creation from the curation
/// UI). At least one of `object_entity_id` / `object_value` is required;
/// `confidence` defaults to 0.6 and is clamped to [0.0, 0.95] by the
/// handler.
#[derive(Deserialize)]
pub struct FactCreateRequest {
    pub subject_entity_id: i32,
    pub predicate: String,
    pub object_entity_id: Option<i32>,
    pub object_value: Option<String>,
    pub source_photo: Option<String>,
    pub confidence: Option<f32>,
    /// Optional valid-time bounds; `None` leaves that side unbounded.
    pub valid_from: Option<i64>,
    pub valid_until: Option<i64>,
}
/// Query string of `GET /knowledge/entities`.
#[derive(Deserialize)]
pub struct EntityListQuery {
    #[serde(rename = "type")]
    pub entity_type: Option<String>,
    /// Omitted or "active" → active only; "all" → no status filter; any
    /// other value filters on exactly that status.
    pub status: Option<String>,
    pub search: Option<String>,
    /// "updated" (default) | "name" | "count". `count` is persona-scoped
    /// via the X-Persona-Id header.
    pub sort: Option<String>,
    /// Page size: defaults to 50, capped at 200 by the handler.
    pub limit: Option<i64>,
    pub offset: Option<i64>,
}
/// Query string of `GET /knowledge/facts`. `status`, `limit` and
/// `offset` follow the same conventions as `EntityListQuery`.
#[derive(Deserialize)]
pub struct FactListQuery {
    pub entity_id: Option<i32>,
    pub status: Option<String>,
    pub predicate: Option<String>,
    pub limit: Option<i64>,
    pub offset: Option<i64>,
}
/// Query string of `GET /knowledge/recent`.
#[derive(Deserialize)]
pub struct RecentQuery {
    /// NOTE(review): presumably a unix-seconds lower bound like the
    /// `created_at` columns — confirm against `get_recent`.
    pub since: Option<i64>,
    pub limit: Option<i64>,
}
/// Query string of `GET /knowledge/graph`.
#[derive(Deserialize)]
pub struct GraphQuery {
    #[serde(rename = "type")]
    pub entity_type: Option<String>,
    pub limit: Option<i64>,
}
/// A node of the `GET /knowledge/graph` response.
#[derive(Serialize)]
pub struct GraphNodeView {
    pub id: i32,
    pub name: String,
    pub entity_type: String,
    pub fact_count: i64,
}
/// An edge of the `GET /knowledge/graph` response: `source` → `target`
/// entity ids under `predicate`, with a `count` (presumably the number
/// of facts collapsed into this edge — confirm in `get_graph`).
#[derive(Serialize)]
pub struct GraphEdgeView {
    pub source: i32,
    pub target: i32,
    pub predicate: String,
    pub count: i64,
}
/// Body of `GET /knowledge/graph`.
#[derive(Serialize)]
pub struct GraphResponse {
    pub nodes: Vec<GraphNodeView>,
    pub edges: Vec<GraphEdgeView>,
}
/// Query string of `GET /knowledge/predicate-stats`.
#[derive(Deserialize)]
pub struct PredicateStatsQuery {
    pub limit: Option<i64>,
}
/// One `(predicate, count)` row of the predicate-stats response.
#[derive(Serialize)]
pub struct PredicateStat {
    pub predicate: String,
    pub count: i64,
}
/// Body of `GET /knowledge/predicate-stats`.
#[derive(Serialize)]
pub struct PredicateStatsResponse {
    pub predicates: Vec<PredicateStat>,
}
/// Body of `POST /knowledge/predicates/{predicate}/bulk-reject`.
#[derive(Serialize)]
pub struct BulkRejectResponse {
    /// Number of facts the bulk-reject reports as flipped to 'rejected'
    /// (per the commit notes, only ACTIVE facts are touched — confirm
    /// in the handler/DAO).
    pub rejected: usize,
}
/// Query string of `GET /knowledge/consolidation-proposals`.
#[derive(Deserialize)]
pub struct ConsolidationQuery {
    /// Cosine threshold for clustering. Default 0.85 — looser than
    /// the upsert-time guard (0.92) so this view surfaces "probably
    /// same" pairs for human review.
    pub threshold: Option<f32>,
    pub limit: Option<i64>,
}
/// One proposed group of likely-duplicate entities, with the range of
/// pairwise cosine similarities inside the group.
#[derive(Serialize)]
pub struct ConsolidationGroupView {
    pub entities: Vec<EntitySummary>,
    pub min_cosine: f32,
    pub max_cosine: f32,
}
/// Body of `GET /knowledge/consolidation-proposals`.
#[derive(Serialize)]
pub struct ConsolidationResponse {
    pub groups: Vec<ConsolidationGroupView>,
}
// ---------------------------------------------------------------------------
// Service registration
// ---------------------------------------------------------------------------
/// Register every `/knowledge/...` endpoint on `app`, with each handler
/// instantiated for the concrete `KnowledgeDao` implementation `D`.
///
/// Registration order matters: actix-web checks resources in the order
/// they are registered, so the literal `/entities/merge` and
/// `/entities/synthesize-merge` paths must stay ahead of the
/// `/entities/{id}` pattern, which would otherwise capture them.
pub fn add_knowledge_services<T, D: KnowledgeDao + 'static>(app: App<T>) -> App<T>
where
    T: ServiceFactory<ServiceRequest, Config = (), Error = actix_web::Error, InitError = ()>,
{
    app.service(
        web::scope("/knowledge")
            .service(web::resource("/entities").route(web::get().to(list_entities::<D>)))
            // Literal sub-paths first — see the ordering note above.
            .service(web::resource("/entities/merge").route(web::post().to(merge_entities::<D>)))
            .service(
                web::resource("/entities/synthesize-merge")
                    .route(web::post().to(synthesize_merge::<D>)),
            )
            .service(
                web::resource("/entities/{id}")
                    .route(web::get().to(get_entity::<D>))
                    .route(web::patch().to(patch_entity::<D>))
                    .route(web::delete().to(delete_entity::<D>)),
            )
            .service(
                web::resource("/facts")
                    .route(web::get().to(list_facts::<D>))
                    .route(web::post().to(create_fact::<D>)),
            )
            .service(
                web::resource("/facts/{id}")
                    .route(web::patch().to(patch_fact::<D>))
                    .route(web::delete().to(delete_fact::<D>)),
            )
            .service(
                web::resource("/facts/{id}/supersede").route(web::post().to(supersede_fact::<D>)),
            )
            .service(web::resource("/facts/{id}/restore").route(web::post().to(restore_fact::<D>)))
            .service(web::resource("/recent").route(web::get().to(get_recent::<D>)))
            .service(
                web::resource("/consolidation-proposals")
                    .route(web::get().to(get_consolidation_proposals::<D>)),
            )
            .service(web::resource("/graph").route(web::get().to(get_graph::<D>)))
            .service(
                web::resource("/predicate-stats")
                    .route(web::get().to(get_predicate_stats::<D>)),
            )
            // `{predicate}` is a single path segment, so predicates
            // containing '/' cannot be addressed through this route.
            .service(
                web::resource("/predicates/{predicate}/bulk-reject")
                    .route(web::post().to(bulk_reject_predicate::<D>)),
            ),
    )
}
// ---------------------------------------------------------------------------
// Handlers
// ---------------------------------------------------------------------------
async fn list_entities<D: KnowledgeDao + 'static>(
req: HttpRequest,
claims: Claims,
query: web::Query<EntityListQuery>,
dao: web::Data<Mutex<D>>,
persona_dao: PersonaDaoData,
) -> impl Responder {
let limit = query.limit.unwrap_or(50).min(200);
let offset = query.offset.unwrap_or(0);
let status_filter = match query.status.as_deref() {
None | Some("active") => Some("active".to_string()),
Some("all") => None,
Some(s) => Some(s.to_string()),
};
let sort = match query.sort.as_deref() {
Some("name") => EntitySort::NameAsc,
Some("count") => EntitySort::FactCountDesc,
// "updated" or anything else falls through to the default.
_ => EntitySort::UpdatedDesc,
};
let persona = resolve_persona_filter(&req, &claims, &persona_dao);
let filter = EntityFilter {
entity_type: query.entity_type.clone(),
status: status_filter,
search: query.search.clone(),
limit,
offset,
};
let cx = opentelemetry::Context::current();
let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
match dao.list_entities_with_fact_counts(&cx, filter, sort, &persona) {
Ok((pairs, total)) => {
// Batch fetch persona breakdowns so the list-row tooltip
// and detail panel can show "0 here · 12 in journal".
// One extra query for the visible page.
let entity_ids: Vec<i32> = pairs.iter().map(|(e, _)| e.id).collect();
let breakdowns = dao
.get_persona_breakdowns_for_entities(&cx, &entity_ids, persona.user_id())
.unwrap_or_default();
let summaries: Vec<EntitySummary> = pairs
.into_iter()
.map(|(e, c)| {
let entity_id = e.id;
let summary = EntitySummary::from_entity_with_count(e, c);
match breakdowns.get(&entity_id) {
Some(bd) => summary.with_persona_breakdown(bd.clone()),
None => summary,
}
})
.collect();
HttpResponse::Ok().json(EntityListResponse {
entities: summaries,
total,
limit,
offset,
})
}
Err(e) => {
log::error!("list_entities error: {:?}", e);
HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
}
}
}
async fn get_entity<D: KnowledgeDao + 'static>(
req: HttpRequest,
claims: Claims,
id: web::Path<i32>,
dao: web::Data<Mutex<D>>,
persona_dao: PersonaDaoData,
) -> impl Responder {
let persona = resolve_persona_filter(&req, &claims, &persona_dao);
let cx = opentelemetry::Context::current();
let entity_id = id.into_inner();
let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
let entity = match dao.get_entity_by_id(&cx, entity_id) {
Ok(Some(e)) => e,
Ok(None) => {
return HttpResponse::NotFound().json(serde_json::json!({"error": "Entity not found"}));
}
Err(e) => {
log::error!("get_entity error: {:?}", e);
return HttpResponse::InternalServerError()
.json(serde_json::json!({"error": "Database error"}));
}
};
// Fetch all facts (all statuses for audit), scoped to the active persona.
let raw_facts: Vec<EntityFact> = match dao.get_facts_for_entity(&cx, entity_id, &persona) {
Ok(f) => f,
Err(e) => {
log::error!("get_facts_for_entity error: {:?}", e);
return HttpResponse::InternalServerError()
.json(serde_json::json!({"error": "Database error"}));
}
};
// Resolve object entity names
let mut facts = Vec::with_capacity(raw_facts.len());
for f in raw_facts {
let object_entity_name = if let Some(oid) = f.object_entity_id {
dao.get_entity_by_id(&cx, oid)
.ok()
.flatten()
.map(|e| e.name)
} else {
None
};
facts.push(FactDetail {
id: f.id,
predicate: f.predicate,
object_entity_id: f.object_entity_id,
object_entity_name,
object_value: f.object_value,
confidence: f.confidence,
status: f.status,
source_photo: f.source_photo,
source_insight_id: f.source_insight_id,
created_at: f.created_at,
valid_from: f.valid_from,
valid_until: f.valid_until,
superseded_by: f.superseded_by,
created_by_model: f.created_by_model,
created_by_backend: f.created_by_backend,
last_modified_by_model: f.last_modified_by_model,
last_modified_by_backend: f.last_modified_by_backend,
last_modified_at: f.last_modified_at,
in_conflict: false,
});
}
// Conflict detection: within the active set, group by predicate;
// for each pair within a group that disagrees on the object,
// flag both only if their valid-time intervals overlap. NULL on
// either bound treats that side as unbounded — a fact with no
// valid-time data still flags against any time period (worst case
// for legacy data; user adds dates to suppress).
fn intervals_overlap(a: (Option<i64>, Option<i64>), b: (Option<i64>, Option<i64>)) -> bool {
let a_lo = a.0.unwrap_or(i64::MIN);
let a_hi = a.1.unwrap_or(i64::MAX);
let b_lo = b.0.unwrap_or(i64::MIN);
let b_hi = b.1.unwrap_or(i64::MAX);
a_lo < b_hi && b_lo < a_hi
}
{
use std::collections::{HashMap, HashSet};
let mut by_predicate: HashMap<String, Vec<usize>> = HashMap::new();
for (idx, f) in facts.iter().enumerate() {
if f.status == "active" {
by_predicate
.entry(f.predicate.clone())
.or_default()
.push(idx);
}
}
let mut to_flag: HashSet<usize> = HashSet::new();
for indices in by_predicate.values() {
if indices.len() < 2 {
continue;
}
for (a_pos, &i) in indices.iter().enumerate() {
for &j in &indices[a_pos + 1..] {
let same_object = facts[i].object_entity_id == facts[j].object_entity_id
&& facts[i].object_value == facts[j].object_value;
if same_object {
continue;
}
if intervals_overlap(
(facts[i].valid_from, facts[i].valid_until),
(facts[j].valid_from, facts[j].valid_until),
) {
to_flag.insert(i);
to_flag.insert(j);
}
}
}
}
for i in to_flag {
facts[i].in_conflict = true;
}
}
// Fetch photo links
let photo_links: Vec<PhotoLinkDetail> = match dao.get_links_for_entity(&cx, entity_id) {
Ok(links) => links.into_iter().map(PhotoLinkDetail::from).collect(),
Err(e) => {
log::error!("get_links_for_entity error: {:?}", e);
return HttpResponse::InternalServerError()
.json(serde_json::json!({"error": "Database error"}));
}
};
// Per-persona breakdown for the detail panel's "facts live in
// {persona}" block — same data the list-row tooltip reads. One
// query, single entity in scope.
let persona_breakdown: Vec<PersonaCount> = dao
.get_persona_breakdowns_for_entities(&cx, &[entity_id], persona.user_id())
.ok()
.and_then(|mut map| map.remove(&entity_id))
.unwrap_or_default()
.into_iter()
.map(|(persona_id, count)| PersonaCount { persona_id, count })
.collect();
HttpResponse::Ok().json(EntityDetailResponse {
id: entity.id,
name: entity.name,
entity_type: entity.entity_type,
description: entity.description,
confidence: entity.confidence,
status: entity.status,
created_at: entity.created_at,
updated_at: entity.updated_at,
facts,
photo_links,
persona_breakdown,
})
}
async fn patch_entity<D: KnowledgeDao + 'static>(
_claims: Claims,
id: web::Path<i32>,
body: web::Json<EntityPatchRequest>,
dao: web::Data<Mutex<D>>,
) -> impl Responder {
let cx = opentelemetry::Context::current();
let entity_id = id.into_inner();
let patch = EntityPatch {
name: body.name.clone(),
description: body.description.clone(),
status: body.status.clone(),
confidence: body.confidence,
};
let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
match dao.update_entity(&cx, entity_id, patch) {
Ok(Some(entity)) => HttpResponse::Ok().json(EntitySummary::from(entity)),
Ok(None) => HttpResponse::NotFound().json(serde_json::json!({"error": "Entity not found"})),
Err(e) => {
log::error!("patch_entity error: {:?}", e);
HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
}
}
}
async fn delete_entity<D: KnowledgeDao + 'static>(
_claims: Claims,
id: web::Path<i32>,
dao: web::Data<Mutex<D>>,
) -> impl Responder {
let cx = opentelemetry::Context::current();
let entity_id = id.into_inner();
let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
// Verify entity exists before deleting
match dao.get_entity_by_id(&cx, entity_id) {
Ok(None) => {
return HttpResponse::NotFound().json(serde_json::json!({"error": "Entity not found"}));
}
Err(e) => {
log::error!("delete_entity lookup error: {:?}", e);
return HttpResponse::InternalServerError()
.json(serde_json::json!({"error": "Database error"}));
}
Ok(Some(_)) => {}
}
match dao.delete_entity(&cx, entity_id) {
Ok(()) => HttpResponse::NoContent().finish(),
Err(e) => {
log::error!("delete_entity error: {:?}", e);
HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
}
}
}
async fn merge_entities<D: KnowledgeDao + 'static>(
_claims: Claims,
body: web::Json<MergeRequest>,
dao: web::Data<Mutex<D>>,
) -> impl Responder {
if body.source_id == body.target_id {
return HttpResponse::BadRequest()
.json(serde_json::json!({"error": "source_id and target_id must be different"}));
}
let cx = opentelemetry::Context::current();
let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
// Verify both entities exist
for id in [body.source_id, body.target_id] {
match dao.get_entity_by_id(&cx, id) {
Ok(None) => {
return HttpResponse::BadRequest()
.json(serde_json::json!({"error": format!("Entity {} not found", id)}));
}
Err(e) => {
log::error!("merge_entities lookup error: {:?}", e);
return HttpResponse::InternalServerError()
.json(serde_json::json!({"error": "Database error"}));
}
Ok(Some(_)) => {}
}
}
match dao.merge_entities(&cx, body.source_id, body.target_id) {
Ok((facts_transferred, links_transferred)) => HttpResponse::Ok().json(MergeResponse {
merged_entity_id: body.target_id,
deleted_entity_id: body.source_id,
facts_transferred,
links_transferred,
}),
Err(e) => {
log::error!("merge_entities error: {:?}", e);
HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
}
}
}
/// Preview a merged description before the actual merge fires. Calls
/// the local Ollama with both entities' names + descriptions and
/// returns a synthesized rewrite that combines them. The curator
/// previews, edits, and either accepts (PATCH target's description
/// then POST /merge) or skips (just /merge as-is).
///
/// Deliberately doesn't touch the database — read-only on entities,
/// no LLM call gets to write anything. Responses: 200 with
/// `SynthesizeMergeResponse`; 400 when the ids are equal or either
/// entity is missing; 500 on a database error; 503 when the model is
/// unavailable, so the UI can degrade gracefully (skip the preview,
/// fall back to the existing merge action).
async fn synthesize_merge<D: KnowledgeDao + 'static>(
    _claims: Claims,
    body: web::Json<SynthesizeMergeRequest>,
    dao: web::Data<Mutex<D>>,
    app_state: web::Data<AppState>,
) -> impl Responder {
    /// Strip the framing models reach for even with explicit "no
    /// preamble" guidance — belt-and-braces against the system prompt's
    /// plain-text directive.
    fn clean_synthesized_description(raw: &str) -> String {
        /// Peel one matching pair of wrapping quotes, if present.
        fn unwrap_quotes(s: &str) -> Option<&str> {
            const PAIRS: [(char, char); 3] = [('"', '"'), ('\'', '\''), ('\u{201C}', '\u{201D}')];
            PAIRS
                .iter()
                .find_map(|&(open, close)| s.strip_prefix(open)?.strip_suffix(close))
        }

        // Leading "Merged description:" label (either capitalization).
        let mut s = raw
            .trim()
            .trim_start_matches("Merged description:")
            .trim_start_matches("Merged Description:")
            .trim();
        // Code fences (``` optionally tagged text / markdown).
        s = s
            .trim_start_matches("```text")
            .trim_start_matches("```markdown")
            .trim_start_matches("```")
            .trim_end_matches("```")
            .trim();
        // Markdown headings / bullets / blockquote markers at the very start.
        while let Some(rest) = s.strip_prefix(|c: char| matches!(c, '#' | '*' | '-' | '>')) {
            s = rest.trim_start();
        }
        // Wrapping quotes: only remove a matching open/close pair, so
        // quotes that belong to the text (`... said "hi"`, `the Smiths'`)
        // survive.
        while let Some(inner) = unwrap_quotes(s) {
            s = inner.trim();
        }
        // Inline bold/underline emphasis: drop `**` and `__` markers
        // without parsing markdown. Single `*` / `_` are left alone since
        // they are ambiguous with ordinary text.
        s.replace("**", "").replace("__", "")
    }

    if body.source_id == body.target_id {
        return HttpResponse::BadRequest()
            .json(serde_json::json!({"error": "source_id and target_id must differ"}));
    }
    let cx = opentelemetry::Context::current();
    let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
    let source = match dao.get_entity_by_id(&cx, body.source_id) {
        Ok(Some(e)) => e,
        Ok(None) => {
            return HttpResponse::BadRequest()
                .json(serde_json::json!({"error": "source entity not found"}));
        }
        Err(e) => {
            log::error!("synthesize_merge source lookup: {:?}", e);
            return HttpResponse::InternalServerError()
                .json(serde_json::json!({"error": "Database error"}));
        }
    };
    let target = match dao.get_entity_by_id(&cx, body.target_id) {
        Ok(Some(e)) => e,
        Ok(None) => {
            return HttpResponse::BadRequest()
                .json(serde_json::json!({"error": "target entity not found"}));
        }
        Err(e) => {
            log::error!("synthesize_merge target lookup: {:?}", e);
            return HttpResponse::InternalServerError()
                .json(serde_json::json!({"error": "Database error"}));
        }
    };
    // Drop the DAO lock before the LLM call — the generate request
    // is the slow part (seconds) and we don't want to block other
    // knowledge reads while it runs.
    drop(dao);
    let source_desc = if source.description.trim().is_empty() {
        "(none)".to_string()
    } else {
        source.description.clone()
    };
    let target_desc = if target.description.trim().is_empty() {
        "(none)".to_string()
    } else {
        target.description.clone()
    };
    let system = "You are condensing two stored entity descriptions into one. The two \
        entities refer to the same real-world thing and are about to be merged. Write a \
        single neutral third-person description (1-2 sentences, max 300 chars) that \
        preserves any concrete facts in either source. Do not invent details. Do not \
        editorialize. Plain prose only — no markdown, no bold, no italics, no headings, \
        no bullets, no lists, no code fences. Return ONLY the merged description — no \
        preamble, no labels, no quotes.";
    let prompt = format!(
        "Entity A: {} [{}]\nDescription: {}\n\nEntity B: {} [{}]\nDescription: {}\n\nMerged description:",
        source.name, source.entity_type, source_desc, target.name, target.entity_type, target_desc,
    );
    let ollama = app_state.ollama.clone();
    let model_used = ollama.primary_model.clone();
    let proposed = match ollama.generate(&prompt, Some(system)).await {
        Ok(out) => clean_synthesized_description(&out),
        Err(e) => {
            log::warn!("synthesize_merge generate failed: {:?}", e);
            return HttpResponse::ServiceUnavailable().json(serde_json::json!({
                "error": "LLM unavailable; the merge picker should fall back to skip-synthesis."
            }));
        }
    };
    HttpResponse::Ok().json(SynthesizeMergeResponse {
        proposed_description: proposed,
        model_used,
    })
}
async fn list_facts<D: KnowledgeDao + 'static>(
req: HttpRequest,
claims: Claims,
query: web::Query<FactListQuery>,
dao: web::Data<Mutex<D>>,
persona_dao: PersonaDaoData,
) -> impl Responder {
let limit = query.limit.unwrap_or(50).min(200);
let offset = query.offset.unwrap_or(0);
let status_filter = match query.status.as_deref() {
None | Some("active") => Some("active".to_string()),
Some("all") => None,
Some(s) => Some(s.to_string()),
};
let persona = resolve_persona_filter(&req, &claims, &persona_dao);
let filter = FactFilter {
entity_id: query.entity_id,
status: status_filter,
predicate: query.predicate.clone(),
persona,
limit,
offset,
};
let cx = opentelemetry::Context::current();
let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
match dao.list_facts(&cx, filter) {
Ok((facts, total)) => {
let mut summaries = Vec::with_capacity(facts.len());
for f in facts {
let subject_entity_name = dao
.get_entity_by_id(&cx, f.subject_entity_id)
.ok()
.flatten()
.map(|e| e.name);
let object_entity_name = if let Some(oid) = f.object_entity_id {
dao.get_entity_by_id(&cx, oid)
.ok()
.flatten()
.map(|e| e.name)
} else {
None
};
summaries.push(FactSummary {
id: f.id,
subject_entity_id: f.subject_entity_id,
subject_entity_name,
predicate: f.predicate,
object_entity_id: f.object_entity_id,
object_entity_name,
object_value: f.object_value,
confidence: f.confidence,
status: f.status,
source_photo: f.source_photo,
source_insight_id: f.source_insight_id,
created_at: f.created_at,
});
}
HttpResponse::Ok().json(FactListResponse {
facts: summaries,
total,
limit,
offset,
})
}
Err(e) => {
log::error!("list_facts error: {:?}", e);
HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
}
}
}
/// POST handler that records one fact under the caller's active persona.
///
/// Validation runs in this order:
/// 1. an object (entity id or literal value) must be supplied;
/// 2. the predicate must not be blank;
/// 3. the subject entity must exist;
/// 4. when an object entity id is supplied, that entity must exist as well.
///
/// The fact is then upserted with "manual" provenance. The response is
/// `201 Created` when the upsert reports a new row and `200 OK` otherwise.
/// Lookup or upsert failures are logged and answered with a generic 500.
async fn create_fact<D: KnowledgeDao + 'static>(
    req: HttpRequest,
    claims: Claims,
    body: web::Json<FactCreateRequest>,
    dao: web::Data<Mutex<D>>,
    persona_dao: PersonaDaoData,
) -> impl Responder {
    let has_object = body.object_entity_id.is_some() || body.object_value.is_some();
    if !has_object {
        return HttpResponse::BadRequest().json(serde_json::json!({
            "error": "object_entity_id or object_value is required"
        }));
    }
    let predicate = body.predicate.trim();
    if predicate.is_empty() {
        return HttpResponse::BadRequest()
            .json(serde_json::json!({"error": "predicate must not be empty"}));
    }

    // Writes land under one concrete persona. The "All" (hive-mind) filter is
    // intended as a read-only view, so a write issued under it is filed under
    // the "default" persona; callers should pin a persona via X-Persona-Id.
    let (user_id, persona_id) = match resolve_persona_filter(&req, &claims, &persona_dao) {
        PersonaFilter::Single {
            user_id,
            persona_id,
        } => (user_id, persona_id),
        PersonaFilter::All { user_id } => (user_id, "default".to_string()),
    };

    let cx = opentelemetry::Context::current();
    let mut store = dao.lock().expect("Unable to lock KnowledgeDao");

    // The subject entity is mandatory and must already exist.
    match store.get_entity_by_id(&cx, body.subject_entity_id) {
        Ok(Some(_)) => {}
        Ok(None) => {
            return HttpResponse::BadRequest().json(serde_json::json!({
                "error": format!("Subject entity {} not found", body.subject_entity_id)
            }));
        }
        Err(e) => {
            log::error!("create_fact subject lookup error: {:?}", e);
            return HttpResponse::InternalServerError()
                .json(serde_json::json!({"error": "Database error"}));
        }
    }

    // The object entity is optional, but if one is named it must exist too.
    if let Some(object_id) = body.object_entity_id {
        match store.get_entity_by_id(&cx, object_id) {
            Ok(Some(_)) => {}
            Ok(None) => {
                return HttpResponse::BadRequest().json(serde_json::json!({
                    "error": format!("Object entity {} not found", object_id)
                }));
            }
            Err(e) => {
                log::error!("create_fact object lookup error: {:?}", e);
                return HttpResponse::InternalServerError()
                    .json(serde_json::json!({"error": "Database error"}));
            }
        }
    }

    let created_at = Utc::now().timestamp();
    // Default confidence is 0.6; manual input is capped at 0.95.
    let confidence = body.confidence.unwrap_or(0.6).clamp(0.0, 0.95);
    let insert = InsertEntityFact {
        subject_entity_id: body.subject_entity_id,
        predicate: predicate.to_string(),
        object_entity_id: body.object_entity_id,
        object_value: body.object_value.clone(),
        source_photo: body.source_photo.clone(),
        source_insight_id: None,
        confidence,
        status: "active".to_string(),
        created_at,
        persona_id,
        user_id,
        valid_from: body.valid_from,
        valid_until: body.valid_until,
        superseded_by: None,
        // Facts entered through the curation UI carry "manual" backend
        // provenance and no model, which separates them from agent output
        // in the audit view.
        created_by_model: None,
        created_by_backend: Some("manual".to_string()),
        last_modified_by_model: None,
        last_modified_by_backend: None,
        last_modified_at: None,
    };

    match store.upsert_fact(&cx, insert) {
        Ok((fact, inserted)) => {
            let code = if inserted {
                actix_web::http::StatusCode::CREATED
            } else {
                actix_web::http::StatusCode::OK
            };
            HttpResponse::build(code).json(fact)
        }
        Err(e) => {
            log::error!("create_fact upsert error: {:?}", e);
            HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
        }
    }
}
async fn patch_fact<D: KnowledgeDao + 'static>(
_claims: Claims,
id: web::Path<i32>,
body: web::Json<FactPatchRequest>,
dao: web::Data<Mutex<D>>,
) -> impl Responder {
let cx = opentelemetry::Context::current();
let fact_id = id.into_inner();
let patch = FactPatch {
predicate: body.predicate.clone(),
object_value: body.object_value.clone(),
status: body.status.clone(),
confidence: body.confidence,
valid_from: body.valid_from,
valid_until: body.valid_until,
};
let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
// Manual PATCH from the curation UI — provenance stamped as
// "manual" so the audit feed can distinguish human edits from
// agent corrections.
match dao.update_fact(&cx, fact_id, patch, Some(("manual", "manual"))) {
Ok(Some(fact)) => HttpResponse::Ok().json(fact),
Ok(None) => HttpResponse::NotFound().json(serde_json::json!({"error": "Fact not found"})),
Err(e) => {
log::error!("patch_fact error: {:?}", e);
HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
}
}
}
async fn delete_fact<D: KnowledgeDao + 'static>(
_claims: Claims,
id: web::Path<i32>,
dao: web::Data<Mutex<D>>,
) -> impl Responder {
let cx = opentelemetry::Context::current();
let fact_id = id.into_inner();
let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
match dao.delete_fact(&cx, fact_id) {
Ok(()) => HttpResponse::NoContent().finish(),
Err(e) => {
log::warn!("delete_fact({}) error: {:?}", fact_id, e);
HttpResponse::NotFound().json(serde_json::json!({"error": "Fact not found"}))
}
}
}
/// POST handler that marks fact `id` as superseded by `body.by_fact_id`.
///
/// A fact cannot supersede itself; that request gets a 400. The DAO call is
/// stamped with ("manual", "manual") provenance, the same as the PATCH path.
///
/// Responses:
/// - the resulting fact;
/// - 404 when the DAO returns `None`;
/// - a logged 500 on a DAO error.
async fn supersede_fact<D: KnowledgeDao + 'static>(
    _claims: Claims,
    id: web::Path<i32>,
    body: web::Json<SupersedeRequest>,
    dao: web::Data<Mutex<D>>,
) -> impl Responder {
    let old_id = id.into_inner();
    let new_id = body.by_fact_id;
    if old_id == new_id {
        return HttpResponse::BadRequest()
            .json(serde_json::json!({"error": "old_id and by_fact_id must differ"}));
    }

    let cx = opentelemetry::Context::current();
    let provenance = Some(("manual", "manual"));
    let mut store = dao.lock().expect("Unable to lock KnowledgeDao");
    match store.supersede_fact(&cx, old_id, new_id, provenance) {
        Ok(Some(result)) => HttpResponse::Ok().json(result),
        Ok(None) => {
            HttpResponse::NotFound().json(serde_json::json!({"error": "Old or new fact not found"}))
        }
        Err(e) => {
            log::error!("supersede_fact error: {:?}", e);
            HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
        }
    }
}
/// POST handler that reverts a supersession on the fact with the given id.
///
/// Delegates to `revert_supersession` with ("manual", "manual") provenance.
///
/// Responses:
/// - the restored fact;
/// - 404 when the DAO returns `None`;
/// - a logged 500 on a DAO error.
async fn restore_fact<D: KnowledgeDao + 'static>(
    _claims: Claims,
    id: web::Path<i32>,
    dao: web::Data<Mutex<D>>,
) -> impl Responder {
    let target = id.into_inner();
    let cx = opentelemetry::Context::current();
    let provenance = Some(("manual", "manual"));
    let mut store = dao.lock().expect("Unable to lock KnowledgeDao");
    let outcome = store.revert_supersession(&cx, target, provenance);
    match outcome {
        Ok(Some(restored)) => HttpResponse::Ok().json(restored),
        Ok(None) => HttpResponse::NotFound().json(serde_json::json!({"error": "Fact not found"})),
        Err(e) => {
            log::error!("restore_fact error: {:?}", e);
            HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
        }
    }
}
/// GET handler returning entities and facts touched since a point in time.
///
/// Query parameters:
/// - `since`: Unix timestamp in seconds. Defaults to 24 hours (86 400 s)
///   before now.
/// - `limit`: defaults to 20 and is bounded to `1..=100`.
///
/// Results are scoped by the persona filter resolved from the request. The
/// response body is `{"entities": [EntitySummary…], "facts": [...]}`; DAO
/// errors are logged and answered with a generic 500.
async fn get_recent<D: KnowledgeDao + 'static>(
    req: HttpRequest,
    claims: Claims,
    query: web::Query<RecentQuery>,
    dao: web::Data<Mutex<D>>,
    persona_dao: PersonaDaoData,
) -> impl Responder {
    let since = query
        .since
        .unwrap_or_else(|| Utc::now().timestamp() - 86400);
    // Bound both ends, matching the other list endpoints (predicate stats,
    // graph, consolidation). Only capping the top let zero — and, if the
    // field is a signed integer, negative values — through to the DAO, where
    // e.g. a negative SQL LIMIT can mean "no limit" and defeat the cap.
    let limit = query.limit.unwrap_or(20).clamp(1, 100);
    let persona = resolve_persona_filter(&req, &claims, &persona_dao);
    let cx = opentelemetry::Context::current();
    let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
    match dao.get_recent_activity(&cx, since, limit, &persona) {
        Ok(RecentActivity { entities, facts }) => {
            let entity_summaries: Vec<EntitySummary> =
                entities.into_iter().map(EntitySummary::from).collect();
            HttpResponse::Ok().json(serde_json::json!({
                "entities": entity_summaries,
                "facts": facts
            }))
        }
        Err(e) => {
            log::error!("get_recent error: {:?}", e);
            HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
        }
    }
}
async fn get_predicate_stats<D: KnowledgeDao + 'static>(
req: HttpRequest,
claims: Claims,
query: web::Query<PredicateStatsQuery>,
dao: web::Data<Mutex<D>>,
persona_dao: PersonaDaoData,
) -> impl Responder {
let limit = query.limit.unwrap_or(100).clamp(1, 500) as usize;
let persona = resolve_persona_filter(&req, &claims, &persona_dao);
let cx = opentelemetry::Context::current();
let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
match dao.get_predicate_stats(&cx, &persona, limit) {
Ok(rows) => HttpResponse::Ok().json(PredicateStatsResponse {
predicates: rows
.into_iter()
.map(|(predicate, count)| PredicateStat { predicate, count })
.collect(),
}),
Err(e) => {
log::error!("get_predicate_stats error: {:?}", e);
HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
}
}
}
/// POST handler that bulk-rejects facts sharing the predicate in the URL path.
///
/// A blank (whitespace-only) predicate is refused with 400. Otherwise the
/// path value is forwarded to the DAO as-is — untrimmed — together with the
/// resolved persona filter and ("manual", "manual") audit provenance. The
/// response is `BulkRejectResponse { rejected }`; DAO errors are logged and
/// answered with a generic 500.
///
/// NOTE(review): which facts are touched (status filter, persona scope under
/// the "All" filter) is decided inside the DAO; confirm there.
async fn bulk_reject_predicate<D: KnowledgeDao + 'static>(
    req: HttpRequest,
    claims: Claims,
    predicate: web::Path<String>,
    dao: web::Data<Mutex<D>>,
    persona_dao: PersonaDaoData,
) -> impl Responder {
    let predicate = predicate.into_inner();
    let is_blank = predicate.trim().is_empty();
    if is_blank {
        return HttpResponse::BadRequest()
            .json(serde_json::json!({"error": "predicate must not be empty"}));
    }

    let scope = resolve_persona_filter(&req, &claims, &persona_dao);
    let cx = opentelemetry::Context::current();
    let audit = Some(("manual", "manual"));
    let mut store = dao.lock().expect("Unable to lock KnowledgeDao");
    let outcome = store.bulk_reject_facts_by_predicate(&cx, &scope, &predicate, audit);
    match outcome {
        Ok(rejected) => HttpResponse::Ok().json(BulkRejectResponse { rejected }),
        Err(e) => {
            log::error!("bulk_reject_predicate error: {:?}", e);
            HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
        }
    }
}
async fn get_graph<D: KnowledgeDao + 'static>(
req: HttpRequest,
claims: Claims,
query: web::Query<GraphQuery>,
dao: web::Data<Mutex<D>>,
persona_dao: PersonaDaoData,
) -> impl Responder {
let limit = query.limit.unwrap_or(200).clamp(1, 1000) as usize;
let persona = resolve_persona_filter(&req, &claims, &persona_dao);
let cx = opentelemetry::Context::current();
let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
match dao.build_entity_graph(&cx, query.entity_type.as_deref(), limit, &persona) {
Ok(EntityGraph { nodes, edges }) => HttpResponse::Ok().json(GraphResponse {
nodes: nodes
.into_iter()
.map(|n| GraphNodeView {
id: n.id,
name: n.name,
entity_type: n.entity_type,
fact_count: n.fact_count,
})
.collect(),
edges: edges
.into_iter()
.map(|e| GraphEdgeView {
source: e.source,
target: e.target,
predicate: e.predicate,
count: e.count,
})
.collect(),
}),
Err(e) => {
log::error!("build_entity_graph error: {:?}", e);
HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
}
}
}
/// GET handler proposing groups of similar entities that may be merged.
///
/// Query parameters:
/// - `threshold`: cosine floor. Defaults to 0.85 and is clamped to
///   `0.5..=0.99`; a NaN value is treated as absent.
/// - `limit`: maximum group count. Defaults to 50 and is clamped to `1..=200`.
///
/// Each entity in the result is decorated with its per-persona fact
/// breakdown for the requesting user. That decoration is best-effort: if the
/// breakdown query fails, the error is logged and entities are returned
/// undecorated. A failure of the proposal query itself is logged and answered
/// with a generic 500.
///
/// NOTE(review): `find_consolidation_proposals` receives no persona/user
/// filter here — only the breakdown lookup uses `persona.user_id()`. Confirm
/// that entities are meant to be shared across users.
async fn get_consolidation_proposals<D: KnowledgeDao + 'static>(
    req: HttpRequest,
    claims: Claims,
    query: web::Query<ConsolidationQuery>,
    dao: web::Data<Mutex<D>>,
    persona_dao: PersonaDaoData,
) -> impl Responder {
    // Clamp threshold so a curious client can't drag the cosine floor to 0
    // and pull every entity into one giant cluster. `clamp` passes NaN
    // through unchanged (and a query string can spell "NaN"), so NaN is
    // mapped to the default before clamping.
    let threshold = query
        .threshold
        .filter(|t| !t.is_nan())
        .unwrap_or(0.85)
        .clamp(0.5, 0.99);
    let max_groups = query.limit.unwrap_or(50).clamp(1, 200) as usize;
    let persona = resolve_persona_filter(&req, &claims, &persona_dao);
    let cx = opentelemetry::Context::current();
    let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
    let groups: Vec<ConsolidationGroup> =
        match dao.find_consolidation_proposals(&cx, threshold, max_groups) {
            Ok(g) => g,
            Err(e) => {
                log::error!("find_consolidation_proposals: {:?}", e);
                return HttpResponse::InternalServerError()
                    .json(serde_json::json!({"error": "Database error"}));
            }
        };
    // Decorate with per-persona fact counts so the curation UI can show
    // "default 8 · journal 3" inline and the curator can pick which entity is
    // the strongest target.
    let entity_ids: Vec<i32> = groups
        .iter()
        .flat_map(|g| g.entities.iter().map(|e| e.id))
        .collect();
    // Best-effort decoration: a failure here should not hide the proposals,
    // but it must not vanish silently either.
    let breakdowns = dao
        .get_persona_breakdowns_for_entities(&cx, &entity_ids, persona.user_id())
        .unwrap_or_else(|e| {
            log::warn!(
                "get_persona_breakdowns_for_entities failed; returning undecorated proposals: {:?}",
                e
            );
            Default::default()
        });
    let groups_view: Vec<ConsolidationGroupView> = groups
        .into_iter()
        .map(|g| ConsolidationGroupView {
            entities: g
                .entities
                .into_iter()
                .map(|e| {
                    let id = e.id;
                    let summary = EntitySummary::from(e);
                    match breakdowns.get(&id) {
                        Some(bd) => summary.with_persona_breakdown(bd.clone()),
                        None => summary,
                    }
                })
                .collect(),
            min_cosine: g.min_cosine,
            max_cosine: g.max_cosine,
        })
        .collect();
    HttpResponse::Ok().json(ConsolidationResponse {
        groups: groups_view,
    })
}