Files
ImageApi/src/knowledge.rs
Cameron Cordes d123cde333 knowledge: entity-graph endpoint for force-directed view
New GET /knowledge/graph?type=&limit= returns the data the
curation UI's graph tab needs:
  - nodes = entities with at least one in-scope fact (rejected /
    superseded excluded). Carries fact_count for visual sizing.
    Top-N by count desc; default cap 200 (clamped 1..1000).
  - edges = relational facts (object_entity_id set) grouped by
    (subject, object, predicate) so 3 "is_friend_of" facts
    between the same pair collapse into one edge with count=3.

Two raw SQL queries: an INNER JOIN onto a persona-scoped fact-
count subquery for nodes (skips 0-fact entities entirely so the
sim doesn't waste time on disconnected islands), then a follow-
up GROUP BY over the persona-scoped fact set restricted to the
node id set via IN clauses (ids are i32 so inlining is safe).

Pairs with the Apollo-side GraphPanel that runs d3-force over
the returned payload and renders SVG with click-to-open.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 21:26:02 -04:00

1294 lines
46 KiB
Rust

use actix_web::dev::{ServiceFactory, ServiceRequest};
use actix_web::{App, HttpRequest, HttpResponse, Responder, web};
use chrono::Utc;
use serde::{Deserialize, Serialize};
use std::sync::Mutex;
use crate::data::Claims;
use crate::database::models::{Entity, EntityFact, EntityPhotoLink, InsertEntityFact};
use crate::database::{
ConsolidationGroup, EntityFilter, EntityGraph, EntityPatch, EntitySort, FactFilter, FactPatch,
KnowledgeDao, PersonaFilter, RecentActivity,
};
use crate::personas::PersonaDaoData;
use crate::state::AppState;
/// Resolve the `X-Persona-Id` header into a `PersonaFilter`. Missing
/// header → `'default'`. If the persona has `include_all_memories=true`,
/// returns `PersonaFilter::All` so reads see the full hive-mind pool.
/// On JWT-parse failure (sub is not a numeric user_id) the resolver
/// falls through to user_id=1 — the operator convention for service
/// tokens — preserving the historical baseline view. Same fallback
/// applies on any persona-lookup error.
fn resolve_persona_filter(
    req: &HttpRequest,
    claims: &Claims,
    persona_dao: &PersonaDaoData,
) -> PersonaFilter {
    // Header value; absent or non-UTF8 falls back to the baseline persona.
    let persona_id = match req.headers().get("X-Persona-Id").and_then(|v| v.to_str().ok()) {
        Some(v) => v.to_string(),
        None => "default".to_string(),
    };
    // Non-numeric `sub` → operator-convention user 1.
    let user_id = claims.sub.parse::<i32>().unwrap_or(1);
    let cx = opentelemetry::Context::current();
    // Lock scope is just the lookup; the guard temporary drops right after.
    let lookup = persona_dao
        .lock()
        .expect("Unable to lock PersonaDao")
        .get_persona(&cx, user_id, &persona_id);
    if let Ok(Some(p)) = lookup {
        if p.include_all_memories {
            return PersonaFilter::All { user_id };
        }
    }
    // Lookup error, unknown persona, or scoped persona → single-persona view.
    PersonaFilter::Single { user_id, persona_id }
}
// ---------------------------------------------------------------------------
// Request / Response types
// ---------------------------------------------------------------------------
/// Row shape for entity list responses and PATCH echoes.
#[derive(Serialize)]
pub struct EntitySummary {
    pub id: i32,
    pub name: String,
    pub entity_type: String,
    pub description: String,
    pub confidence: f32,
    pub status: String,
    pub created_at: i64,
    pub updated_at: i64,
    /// Persona-scoped count of non-rejected facts about this entity
    /// (subject side). `None` — and therefore omitted from the JSON —
    /// when the call site has no scoping context, e.g. PATCH responses
    /// return the bare entity.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub fact_count: Option<i64>,
    /// Per-persona breakdown of fact counts for this entity, scoped
    /// to the active user. Lets the curation UI surface "this entity
    /// is empty under your active persona but has 12 facts in
    /// journal" so you know which persona owns the existing
    /// knowledge. Skipped on serialization when None.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub persona_breakdown: Option<Vec<PersonaCount>>,
}
/// One `(persona_id, fact count)` pair in a per-persona breakdown.
#[derive(Serialize)]
pub struct PersonaCount {
    pub persona_id: String,
    pub count: i64,
}
impl From<Entity> for EntitySummary {
fn from(e: Entity) -> Self {
EntitySummary {
id: e.id,
name: e.name,
entity_type: e.entity_type,
description: e.description,
confidence: e.confidence,
status: e.status,
created_at: e.created_at,
updated_at: e.updated_at,
fact_count: None,
persona_breakdown: None,
}
}
}
impl EntitySummary {
fn from_entity_with_count(e: Entity, fact_count: i64) -> Self {
let mut s = EntitySummary::from(e);
s.fact_count = Some(fact_count);
s
}
fn with_persona_breakdown(mut self, breakdown: Vec<(String, i64)>) -> Self {
self.persona_breakdown = Some(
breakdown
.into_iter()
.map(|(persona_id, count)| PersonaCount { persona_id, count })
.collect(),
);
self
}
}
/// Envelope for GET /knowledge/entities: one page of summaries plus
/// paging echo.
#[derive(Serialize)]
pub struct EntityListResponse {
    pub entities: Vec<EntitySummary>,
    /// DAO-reported total for the query (returned alongside the page).
    pub total: i64,
    pub limit: i64,
    pub offset: i64,
}
/// Fact row embedded in the entity detail response. Every status is
/// included (audit view); `in_conflict` is computed at read time by
/// the get_entity handler.
#[derive(Serialize)]
pub struct FactDetail {
    pub id: i32,
    pub predicate: String,
    /// Set when the object is another entity (relational fact).
    pub object_entity_id: Option<i32>,
    /// Display name for `object_entity_id`, resolved by the handler;
    /// None for value-typed facts or when the lookup fails.
    pub object_entity_name: Option<String>,
    /// Literal object for value-typed facts.
    pub object_value: Option<String>,
    pub confidence: f32,
    pub status: String,
    pub source_photo: Option<String>,
    pub source_insight_id: Option<i32>,
    pub created_at: i64,
    /// Real-world valid-time interval. NULL on either side means
    /// unbounded; both NULL = "always true" / validity unknown.
    /// Distinct from `created_at` (transaction time — when we
    /// recorded it). See migration 2026-05-10-000100.
    pub valid_from: Option<i64>,
    pub valid_until: Option<i64>,
    /// Points at the entity_facts.id that replaced this one (Phase 2
    /// supersession, migration 2026-05-10-000200). Only set when
    /// status == 'superseded'.
    pub superseded_by: Option<i32>,
    /// Provenance — see migration 2026-05-10-000300. NULL on legacy
    /// rows. `created_by_backend` is "local" / "hybrid" / "manual".
    pub created_by_model: Option<String>,
    pub created_by_backend: Option<String>,
    /// Audit trail — see migration 2026-05-10-000500. Set on any
    /// post-creation mutation. NULL on rows that have never been
    /// touched after they were first written.
    pub last_modified_by_model: Option<String>,
    pub last_modified_by_backend: Option<String>,
    pub last_modified_at: Option<i64>,
    /// Set when another active fact has the same subject+predicate,
    /// a different object, AND their valid-time intervals overlap.
    /// Detected at read time by the get_entity handler grouping
    /// facts by predicate. Some predicates are legitimately
    /// multi-valued ("tagged_in", "friend_of") so this is a *signal*
    /// for the curator, not a hard invariant. The interval check
    /// keeps "lives_in NYC 2018-2020" + "lives_in SF 2020-present"
    /// from false-positive flagging.
    #[serde(skip_serializing_if = "std::ops::Not::not")]
    pub in_conflict: bool,
}
/// Photo-link row for the entity detail response.
#[derive(Serialize)]
pub struct PhotoLinkDetail {
    pub library_id: i32,
    pub file_path: String,
    /// Link role string, carried as-is from the DB row.
    pub role: String,
}
impl From<EntityPhotoLink> for PhotoLinkDetail {
fn from(l: EntityPhotoLink) -> Self {
PhotoLinkDetail {
library_id: l.library_id,
file_path: l.file_path,
role: l.role,
}
}
}
/// Envelope for GET /knowledge/entities/{id}: entity columns plus
/// facts (all statuses), photo links, and the persona breakdown.
#[derive(Serialize)]
pub struct EntityDetailResponse {
    pub id: i32,
    pub name: String,
    pub entity_type: String,
    pub description: String,
    pub confidence: f32,
    pub status: String,
    pub created_at: i64,
    pub updated_at: i64,
    /// Persona-scoped facts with read-time conflict flags.
    pub facts: Vec<FactDetail>,
    pub photo_links: Vec<PhotoLinkDetail>,
    /// Per-persona fact counts for the active user. Mirrors the
    /// same field on EntitySummary; the detail panel surfaces a
    /// clickable list so the curator can switch to the persona
    /// that owns existing facts about this entity.
    pub persona_breakdown: Vec<PersonaCount>,
}
/// Row shape for GET /knowledge/facts. Both subject and object names
/// are resolved by the list_facts handler for display.
#[derive(Serialize)]
pub struct FactSummary {
    pub id: i32,
    pub subject_entity_id: i32,
    /// None when the subject lookup fails.
    pub subject_entity_name: Option<String>,
    pub predicate: String,
    pub object_entity_id: Option<i32>,
    pub object_entity_name: Option<String>,
    pub object_value: Option<String>,
    pub confidence: f32,
    pub status: String,
    pub source_photo: Option<String>,
    pub source_insight_id: Option<i32>,
    pub created_at: i64,
}
/// Envelope for GET /knowledge/facts: one page plus paging echo.
#[derive(Serialize)]
pub struct FactListResponse {
    pub facts: Vec<FactSummary>,
    /// DAO-reported total for the query (returned alongside the page).
    pub total: i64,
    pub limit: i64,
    pub offset: i64,
}
/// Body for POST /knowledge/entities/merge. `source_id` is folded
/// into `target_id`; the two must differ.
#[derive(Deserialize)]
pub struct MergeRequest {
    pub source_id: i32,
    pub target_id: i32,
}
/// Result of an entity merge: the surviving entity, the removed one,
/// and how many fact / photo-link rows moved.
#[derive(Serialize)]
pub struct MergeResponse {
    pub merged_entity_id: i32,
    pub deleted_entity_id: i32,
    pub facts_transferred: i64,
    pub links_transferred: i64,
}
/// Body for PATCH /knowledge/entities/{id}. Every field is optional;
/// a missing field leaves the corresponding column untouched.
#[derive(Deserialize)]
pub struct EntityPatchRequest {
    pub name: Option<String>,
    pub description: Option<String>,
    pub status: Option<String>,
    pub confidence: Option<f32>,
}
/// Serde helper for the "tri-state" pattern: distinguish "field
/// omitted" from "field sent as null". Used for nullable columns
/// where we want PATCH to support both "leave alone" and "set NULL".
fn deserialize_optional_nullable_i64<'de, D>(d: D) -> Result<Option<Option<i64>>, D::Error>
where
    D: serde::Deserializer<'de>,
{
    // A present-but-null value deserializes to `None`; wrapping in
    // `Some` marks "the field appeared at all". The omitted case is
    // handled by `#[serde(default)]` at the field site.
    Option::<i64>::deserialize(d).map(Some)
}
/// Body for PATCH /knowledge/facts/{id}. Plain fields use the usual
/// "missing = leave alone" convention; the valid-time bounds use the
/// tri-state deserializer so null can clear them.
#[derive(Deserialize)]
pub struct FactPatchRequest {
    pub predicate: Option<String>,
    pub object_value: Option<String>,
    pub status: Option<String>,
    pub confidence: Option<f32>,
    /// Tri-state: missing = leave alone, null = clear to NULL, number
    /// = set. See `deserialize_optional_nullable_i64`.
    #[serde(default, deserialize_with = "deserialize_optional_nullable_i64")]
    pub valid_from: Option<Option<i64>>,
    #[serde(default, deserialize_with = "deserialize_optional_nullable_i64")]
    pub valid_until: Option<Option<i64>>,
}
/// Body for POST /knowledge/facts/{id}/supersede.
#[derive(Deserialize)]
pub struct SupersedeRequest {
    /// The id of the new fact that replaces the path-params one.
    pub by_fact_id: i32,
}
/// Body for POST /knowledge/entities/synthesize-merge — same id pair
/// a subsequent /merge call would take; the two must differ.
#[derive(Deserialize)]
pub struct SynthesizeMergeRequest {
    pub source_id: i32,
    pub target_id: i32,
}
#[derive(serde::Serialize)]
pub struct SynthesizeMergeResponse {
pub proposed_description: String,
pub model_used: String,
}
/// Body for POST /knowledge/facts. The handler requires at least one
/// of `object_entity_id` / `object_value` and a non-empty predicate.
#[derive(Deserialize)]
pub struct FactCreateRequest {
    pub subject_entity_id: i32,
    pub predicate: String,
    /// Relational object; validated to exist when supplied.
    pub object_entity_id: Option<i32>,
    /// Literal object for value-typed facts.
    pub object_value: Option<String>,
    pub source_photo: Option<String>,
    /// Defaults to 0.6 and is clamped to [0.0, 0.95] by the handler.
    pub confidence: Option<f32>,
    pub valid_from: Option<i64>,
    pub valid_until: Option<i64>,
}
/// Query string for GET /knowledge/entities.
#[derive(Deserialize)]
pub struct EntityListQuery {
    /// Entity type filter; `type` in the query string.
    #[serde(rename = "type")]
    pub entity_type: Option<String>,
    /// Status filter: default "active", "all" disables the filter,
    /// any other literal passes through.
    pub status: Option<String>,
    pub search: Option<String>,
    /// "updated" (default) | "name" | "count". `count` is persona-scoped
    /// via the X-Persona-Id header.
    pub sort: Option<String>,
    pub limit: Option<i64>,
    pub offset: Option<i64>,
}
/// Query string for GET /knowledge/facts.
#[derive(Deserialize)]
pub struct FactListQuery {
    /// Restrict to facts whose subject is this entity.
    pub entity_id: Option<i32>,
    /// Same convention as the entity list: default "active",
    /// "all" disables the filter.
    pub status: Option<String>,
    pub predicate: Option<String>,
    pub limit: Option<i64>,
    pub offset: Option<i64>,
}
/// Query string for GET /knowledge/recent.
#[derive(Deserialize)]
pub struct RecentQuery {
    /// Unix-seconds lower bound on activity time.
    pub since: Option<i64>,
    pub limit: Option<i64>,
}
/// Query string for GET /knowledge/graph (`?type=&limit=`).
#[derive(Deserialize)]
pub struct GraphQuery {
    /// Entity type filter; `type` in the query string.
    #[serde(rename = "type")]
    pub entity_type: Option<String>,
    /// Node cap — per the endpoint's contract, default 200,
    /// clamped 1..1000.
    pub limit: Option<i64>,
}
/// Graph node: an entity with at least one in-scope fact.
#[derive(Serialize)]
pub struct GraphNodeView {
    pub id: i32,
    pub name: String,
    pub entity_type: String,
    /// In-scope fact count, used by the UI for visual node sizing.
    pub fact_count: i64,
}
/// Graph edge: relational facts grouped by (subject, object,
/// predicate) — parallel facts collapse into one edge with a count.
#[derive(Serialize)]
pub struct GraphEdgeView {
    /// Subject entity id.
    pub source: i32,
    /// Object entity id.
    pub target: i32,
    pub predicate: String,
    /// Number of facts collapsed into this edge.
    pub count: i64,
}
/// Envelope for GET /knowledge/graph — the payload the force-directed
/// graph tab consumes.
#[derive(Serialize)]
pub struct GraphResponse {
    pub nodes: Vec<GraphNodeView>,
    pub edges: Vec<GraphEdgeView>,
}
/// Query string for GET /knowledge/consolidation-proposals.
#[derive(Deserialize)]
pub struct ConsolidationQuery {
    /// Cosine threshold for clustering. Default 0.85 — looser than
    /// the upsert-time guard (0.92) so this view surfaces "probably
    /// same" pairs for human review.
    pub threshold: Option<f32>,
    pub limit: Option<i64>,
}
/// One cluster of probably-identical entities, with the range of
/// pairwise cosine similarities observed inside the cluster.
#[derive(Serialize)]
pub struct ConsolidationGroupView {
    pub entities: Vec<EntitySummary>,
    pub min_cosine: f32,
    pub max_cosine: f32,
}
/// Envelope for GET /knowledge/consolidation-proposals.
#[derive(Serialize)]
pub struct ConsolidationResponse {
    pub groups: Vec<ConsolidationGroupView>,
}
// ---------------------------------------------------------------------------
// Service registration
// ---------------------------------------------------------------------------
/// Mount every /knowledge route onto the app.
///
/// Registration order is intentional for the entity routes: the
/// literal paths `/entities/merge` and `/entities/synthesize-merge`
/// are registered before the `/entities/{id}` pattern so they match
/// as literals rather than being captured by the `{id}` parameter.
pub fn add_knowledge_services<T, D: KnowledgeDao + 'static>(app: App<T>) -> App<T>
where
    T: ServiceFactory<ServiceRequest, Config = (), Error = actix_web::Error, InitError = ()>,
{
    app.service(
        web::scope("/knowledge")
            .service(web::resource("/entities").route(web::get().to(list_entities::<D>)))
            .service(web::resource("/entities/merge").route(web::post().to(merge_entities::<D>)))
            .service(
                web::resource("/entities/synthesize-merge")
                    .route(web::post().to(synthesize_merge::<D>)),
            )
            .service(
                web::resource("/entities/{id}")
                    .route(web::get().to(get_entity::<D>))
                    .route(web::patch().to(patch_entity::<D>))
                    .route(web::delete().to(delete_entity::<D>)),
            )
            .service(
                web::resource("/facts")
                    .route(web::get().to(list_facts::<D>))
                    .route(web::post().to(create_fact::<D>)),
            )
            .service(
                web::resource("/facts/{id}")
                    .route(web::patch().to(patch_fact::<D>))
                    .route(web::delete().to(delete_fact::<D>)),
            )
            .service(
                web::resource("/facts/{id}/supersede").route(web::post().to(supersede_fact::<D>)),
            )
            .service(web::resource("/facts/{id}/restore").route(web::post().to(restore_fact::<D>)))
            .service(web::resource("/recent").route(web::get().to(get_recent::<D>)))
            .service(
                web::resource("/consolidation-proposals")
                    .route(web::get().to(get_consolidation_proposals::<D>)),
            )
            .service(web::resource("/graph").route(web::get().to(get_graph::<D>))),
    )
}
// ---------------------------------------------------------------------------
// Handlers
// ---------------------------------------------------------------------------
/// GET /knowledge/entities — paged entity list with persona-scoped
/// fact counts plus per-persona breakdowns for the visible page.
async fn list_entities<D: KnowledgeDao + 'static>(
    req: HttpRequest,
    claims: Claims,
    query: web::Query<EntityListQuery>,
    dao: web::Data<Mutex<D>>,
    persona_dao: PersonaDaoData,
) -> impl Responder {
    // Robustness fix: previously a negative (or zero) `limit` and a
    // negative `offset` passed straight through to the DAO — i.e.
    // into SQL LIMIT/OFFSET. Clamp paging inputs to sane ranges
    // instead of trusting the client.
    let limit = query.limit.unwrap_or(50).clamp(1, 200);
    let offset = query.offset.unwrap_or(0).max(0);
    // Default view is "active"; "all" disables the status filter;
    // any other literal passes through unchanged.
    let status_filter = match query.status.as_deref() {
        None | Some("active") => Some("active".to_string()),
        Some("all") => None,
        Some(s) => Some(s.to_string()),
    };
    let sort = match query.sort.as_deref() {
        Some("name") => EntitySort::NameAsc,
        Some("count") => EntitySort::FactCountDesc,
        // "updated" or anything else falls through to the default.
        _ => EntitySort::UpdatedDesc,
    };
    let persona = resolve_persona_filter(&req, &claims, &persona_dao);
    let filter = EntityFilter {
        entity_type: query.entity_type.clone(),
        status: status_filter,
        search: query.search.clone(),
        limit,
        offset,
    };
    let cx = opentelemetry::Context::current();
    let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
    match dao.list_entities_with_fact_counts(&cx, filter, sort, &persona) {
        Ok((pairs, total)) => {
            // Batch fetch persona breakdowns so the list-row tooltip
            // and detail panel can show "0 here · 12 in journal".
            // One extra query for the visible page; a breakdown error
            // is non-fatal and just yields empty breakdowns.
            let entity_ids: Vec<i32> = pairs.iter().map(|(e, _)| e.id).collect();
            let breakdowns = dao
                .get_persona_breakdowns_for_entities(&cx, &entity_ids, persona.user_id())
                .unwrap_or_default();
            let summaries: Vec<EntitySummary> = pairs
                .into_iter()
                .map(|(e, c)| {
                    let entity_id = e.id;
                    let summary = EntitySummary::from_entity_with_count(e, c);
                    match breakdowns.get(&entity_id) {
                        Some(bd) => summary.with_persona_breakdown(bd.clone()),
                        None => summary,
                    }
                })
                .collect();
            HttpResponse::Ok().json(EntityListResponse {
                entities: summaries,
                total,
                limit,
                offset,
            })
        }
        Err(e) => {
            log::error!("list_entities error: {:?}", e);
            HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
        }
    }
}
/// GET /knowledge/entities/{id} — full detail view: the entity's
/// columns, all of its facts (every status, for the audit view)
/// scoped to the active persona, read-time conflict flags, photo
/// links, and a per-persona fact-count breakdown.
async fn get_entity<D: KnowledgeDao + 'static>(
    req: HttpRequest,
    claims: Claims,
    id: web::Path<i32>,
    dao: web::Data<Mutex<D>>,
    persona_dao: PersonaDaoData,
) -> impl Responder {
    let persona = resolve_persona_filter(&req, &claims, &persona_dao);
    let cx = opentelemetry::Context::current();
    let entity_id = id.into_inner();
    let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
    let entity = match dao.get_entity_by_id(&cx, entity_id) {
        Ok(Some(e)) => e,
        Ok(None) => {
            return HttpResponse::NotFound().json(serde_json::json!({"error": "Entity not found"}));
        }
        Err(e) => {
            log::error!("get_entity error: {:?}", e);
            return HttpResponse::InternalServerError()
                .json(serde_json::json!({"error": "Database error"}));
        }
    };
    // Fetch all facts (all statuses for audit), scoped to the active persona.
    let raw_facts: Vec<EntityFact> = match dao.get_facts_for_entity(&cx, entity_id, &persona) {
        Ok(f) => f,
        Err(e) => {
            log::error!("get_facts_for_entity error: {:?}", e);
            return HttpResponse::InternalServerError()
                .json(serde_json::json!({"error": "Database error"}));
        }
    };
    // Resolve object entity names. One DAO lookup per relational
    // fact; a failed lookup degrades to a missing name, not an error.
    let mut facts = Vec::with_capacity(raw_facts.len());
    for f in raw_facts {
        let object_entity_name = if let Some(oid) = f.object_entity_id {
            dao.get_entity_by_id(&cx, oid)
                .ok()
                .flatten()
                .map(|e| e.name)
        } else {
            None
        };
        facts.push(FactDetail {
            id: f.id,
            predicate: f.predicate,
            object_entity_id: f.object_entity_id,
            object_entity_name,
            object_value: f.object_value,
            confidence: f.confidence,
            status: f.status,
            source_photo: f.source_photo,
            source_insight_id: f.source_insight_id,
            created_at: f.created_at,
            valid_from: f.valid_from,
            valid_until: f.valid_until,
            superseded_by: f.superseded_by,
            created_by_model: f.created_by_model,
            created_by_backend: f.created_by_backend,
            last_modified_by_model: f.last_modified_by_model,
            last_modified_by_backend: f.last_modified_by_backend,
            last_modified_at: f.last_modified_at,
            // Filled in by the conflict-detection pass below.
            in_conflict: false,
        });
    }
    // Conflict detection: within the active set, group by predicate;
    // for each pair within a group that disagrees on the object,
    // flag both only if their valid-time intervals overlap. NULL on
    // either bound treats that side as unbounded — a fact with no
    // valid-time data still flags against any time period (worst case
    // for legacy data; user adds dates to suppress).
    //
    // Strict `<` comparison: intervals that merely touch at an
    // endpoint (e.g. one ends exactly when the other starts) are NOT
    // treated as overlapping.
    fn intervals_overlap(a: (Option<i64>, Option<i64>), b: (Option<i64>, Option<i64>)) -> bool {
        let a_lo = a.0.unwrap_or(i64::MIN);
        let a_hi = a.1.unwrap_or(i64::MAX);
        let b_lo = b.0.unwrap_or(i64::MIN);
        let b_hi = b.1.unwrap_or(i64::MAX);
        a_lo < b_hi && b_lo < a_hi
    }
    {
        use std::collections::{HashMap, HashSet};
        // Positions (indices into `facts`) of active facts, keyed by
        // predicate. Only "active" facts participate in conflicts.
        let mut by_predicate: HashMap<String, Vec<usize>> = HashMap::new();
        for (idx, f) in facts.iter().enumerate() {
            if f.status == "active" {
                by_predicate
                    .entry(f.predicate.clone())
                    .or_default()
                    .push(idx);
            }
        }
        let mut to_flag: HashSet<usize> = HashSet::new();
        for indices in by_predicate.values() {
            if indices.len() < 2 {
                continue;
            }
            // Pairwise O(n²) within one predicate group; groups are
            // per-entity and expected to stay small.
            for (a_pos, &i) in indices.iter().enumerate() {
                for &j in &indices[a_pos + 1..] {
                    // Same object (entity id AND literal value) never
                    // conflicts — it's the same assertion twice.
                    let same_object = facts[i].object_entity_id == facts[j].object_entity_id
                        && facts[i].object_value == facts[j].object_value;
                    if same_object {
                        continue;
                    }
                    if intervals_overlap(
                        (facts[i].valid_from, facts[i].valid_until),
                        (facts[j].valid_from, facts[j].valid_until),
                    ) {
                        to_flag.insert(i);
                        to_flag.insert(j);
                    }
                }
            }
        }
        for i in to_flag {
            facts[i].in_conflict = true;
        }
    }
    // Fetch photo links
    let photo_links: Vec<PhotoLinkDetail> = match dao.get_links_for_entity(&cx, entity_id) {
        Ok(links) => links.into_iter().map(PhotoLinkDetail::from).collect(),
        Err(e) => {
            log::error!("get_links_for_entity error: {:?}", e);
            return HttpResponse::InternalServerError()
                .json(serde_json::json!({"error": "Database error"}));
        }
    };
    // Per-persona breakdown for the detail panel's "facts live in
    // {persona}" block — same data the list-row tooltip reads. One
    // query, single entity in scope. Errors degrade to an empty list.
    let persona_breakdown: Vec<PersonaCount> = dao
        .get_persona_breakdowns_for_entities(&cx, &[entity_id], persona.user_id())
        .ok()
        .and_then(|mut map| map.remove(&entity_id))
        .unwrap_or_default()
        .into_iter()
        .map(|(persona_id, count)| PersonaCount { persona_id, count })
        .collect();
    HttpResponse::Ok().json(EntityDetailResponse {
        id: entity.id,
        name: entity.name,
        entity_type: entity.entity_type,
        description: entity.description,
        confidence: entity.confidence,
        status: entity.status,
        created_at: entity.created_at,
        updated_at: entity.updated_at,
        facts,
        photo_links,
        persona_breakdown,
    })
}
async fn patch_entity<D: KnowledgeDao + 'static>(
_claims: Claims,
id: web::Path<i32>,
body: web::Json<EntityPatchRequest>,
dao: web::Data<Mutex<D>>,
) -> impl Responder {
let cx = opentelemetry::Context::current();
let entity_id = id.into_inner();
let patch = EntityPatch {
name: body.name.clone(),
description: body.description.clone(),
status: body.status.clone(),
confidence: body.confidence,
};
let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
match dao.update_entity(&cx, entity_id, patch) {
Ok(Some(entity)) => HttpResponse::Ok().json(EntitySummary::from(entity)),
Ok(None) => HttpResponse::NotFound().json(serde_json::json!({"error": "Entity not found"})),
Err(e) => {
log::error!("patch_entity error: {:?}", e);
HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
}
}
}
/// DELETE /knowledge/entities/{id} — 404 when the entity does not
/// exist, 204 on successful delete.
async fn delete_entity<D: KnowledgeDao + 'static>(
    _claims: Claims,
    id: web::Path<i32>,
    dao: web::Data<Mutex<D>>,
) -> impl Responder {
    let cx = opentelemetry::Context::current();
    let entity_id = id.into_inner();
    let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
    // Existence check first so a missing id yields a clean 404.
    match dao.get_entity_by_id(&cx, entity_id) {
        Ok(Some(_)) => {}
        Ok(None) => {
            return HttpResponse::NotFound().json(serde_json::json!({"error": "Entity not found"}));
        }
        Err(e) => {
            log::error!("delete_entity lookup error: {:?}", e);
            return HttpResponse::InternalServerError()
                .json(serde_json::json!({"error": "Database error"}));
        }
    }
    if let Err(e) = dao.delete_entity(&cx, entity_id) {
        log::error!("delete_entity error: {:?}", e);
        return HttpResponse::InternalServerError()
            .json(serde_json::json!({"error": "Database error"}));
    }
    HttpResponse::NoContent().finish()
}
/// POST /knowledge/entities/merge — fold `source_id` into `target_id`
/// and report how many fact / link rows moved.
async fn merge_entities<D: KnowledgeDao + 'static>(
    _claims: Claims,
    body: web::Json<MergeRequest>,
    dao: web::Data<Mutex<D>>,
) -> impl Responder {
    if body.source_id == body.target_id {
        return HttpResponse::BadRequest()
            .json(serde_json::json!({"error": "source_id and target_id must be different"}));
    }
    let cx = opentelemetry::Context::current();
    let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
    // Both sides must exist before we start moving rows.
    for id in [body.source_id, body.target_id] {
        match dao.get_entity_by_id(&cx, id) {
            Ok(Some(_)) => {}
            Ok(None) => {
                return HttpResponse::BadRequest()
                    .json(serde_json::json!({"error": format!("Entity {} not found", id)}));
            }
            Err(e) => {
                log::error!("merge_entities lookup error: {:?}", e);
                return HttpResponse::InternalServerError()
                    .json(serde_json::json!({"error": "Database error"}));
            }
        }
    }
    let (facts_transferred, links_transferred) =
        match dao.merge_entities(&cx, body.source_id, body.target_id) {
            Ok(counts) => counts,
            Err(e) => {
                log::error!("merge_entities error: {:?}", e);
                return HttpResponse::InternalServerError()
                    .json(serde_json::json!({"error": "Database error"}));
            }
        };
    HttpResponse::Ok().json(MergeResponse {
        merged_entity_id: body.target_id,
        deleted_entity_id: body.source_id,
        facts_transferred,
        links_transferred,
    })
}
/// Preview a merged-description before the actual merge fires. Calls
/// the local Ollama with both entities' names + descriptions and
/// returns a synthesized rewrite that combines them. The curator
/// previews, edits, and either accepts (PATCH target's description
/// then POST /merge) or skips (just /merge as-is).
///
/// Deliberately doesn't touch the database — read-only on entities,
/// no LLM call gets to write anything. If the model is unavailable
/// the handler returns 503 so the UI can degrade gracefully (skip
/// the preview, fall back to the existing merge action).
async fn synthesize_merge<D: KnowledgeDao + 'static>(
    _claims: Claims,
    body: web::Json<SynthesizeMergeRequest>,
    dao: web::Data<Mutex<D>>,
    app_state: web::Data<AppState>,
) -> impl Responder {
    if body.source_id == body.target_id {
        return HttpResponse::BadRequest()
            .json(serde_json::json!({"error": "source_id and target_id must differ"}));
    }
    let cx = opentelemetry::Context::current();
    let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
    let source = match dao.get_entity_by_id(&cx, body.source_id) {
        Ok(Some(e)) => e,
        Ok(None) => {
            return HttpResponse::BadRequest()
                .json(serde_json::json!({"error": "source entity not found"}));
        }
        Err(e) => {
            log::error!("synthesize_merge source lookup: {:?}", e);
            return HttpResponse::InternalServerError()
                .json(serde_json::json!({"error": "Database error"}));
        }
    };
    let target = match dao.get_entity_by_id(&cx, body.target_id) {
        Ok(Some(e)) => e,
        Ok(None) => {
            return HttpResponse::BadRequest()
                .json(serde_json::json!({"error": "target entity not found"}));
        }
        Err(e) => {
            log::error!("synthesize_merge target lookup: {:?}", e);
            return HttpResponse::InternalServerError()
                .json(serde_json::json!({"error": "Database error"}));
        }
    };
    // Drop the DAO lock before the LLM call — the generate request
    // is the slow part (seconds) and we don't want to block other
    // knowledge reads while it runs.
    drop(dao);
    // Empty descriptions are surfaced to the model as "(none)" so the
    // prompt shape stays constant.
    let source_desc = if source.description.trim().is_empty() {
        "(none)".to_string()
    } else {
        source.description.clone()
    };
    let target_desc = if target.description.trim().is_empty() {
        "(none)".to_string()
    } else {
        target.description.clone()
    };
    let system = "You are condensing two stored entity descriptions into one. The two \
    entities refer to the same real-world thing and are about to be merged. Write a \
    single neutral third-person description (1-2 sentences, max 300 chars) that \
    preserves any concrete facts in either source. Do not invent details. Do not \
    editorialize. Plain prose only — no markdown, no bold, no italics, no headings, \
    no bullets, no lists, no code fences. Return ONLY the merged description — no \
    preamble, no labels, no quotes.";
    let prompt = format!(
        "Entity A: {} [{}]\nDescription: {}\n\nEntity B: {} [{}]\nDescription: {}\n\nMerged description:",
        source.name, source.entity_type, source_desc, target.name, target.entity_type, target_desc,
    );
    let ollama = app_state.ollama.clone();
    let model_used = ollama.primary_model.clone();
    let proposed = match ollama.generate(&prompt, Some(system)).await {
        Ok(out) => {
            // Strip the framing models reach for even with explicit
            // "no preamble" guidance: leading "Merged description:"
            // labels, wrapping quotes, ``` code fences, leading
            // bullets / hash headings. Belt-and-braces against the
            // system prompt's plain-text directive.
            let mut s = out.trim().to_string();
            s = s
                .trim_start_matches("Merged description:")
                .trim_start_matches("Merged Description:")
                .trim()
                .to_string();
            // Code fences (``` or ```text). Longer prefixes first so
            // the bare "```" doesn't leave the language tag behind.
            s = s
                .trim_start_matches("```text")
                .trim_start_matches("```markdown")
                .trim_start_matches("```")
                .trim_end_matches("```")
                .trim()
                .to_string();
            // Markdown headings / bullets at the very start — loop
            // because markers can stack (e.g. "> *").
            while let Some(stripped) = s
                .strip_prefix('#')
                .or_else(|| s.strip_prefix('*'))
                .or_else(|| s.strip_prefix('-'))
                .or_else(|| s.strip_prefix('>'))
            {
                s = stripped.trim_start().to_string();
            }
            // Wrapping quotes
            s = s.trim_matches(|c| c == '"' || c == '\'').to_string();
            // Inline emphasis: drop standalone `**` / `*` / `__` /
            // `_` markers without trying to parse markdown — just
            // remove the punctuation. Rare enough that this naive
            // replace is fine.
            s = s.replace("**", "").replace("__", "");
            s
        }
        Err(e) => {
            log::warn!("synthesize_merge generate failed: {:?}", e);
            return HttpResponse::ServiceUnavailable().json(serde_json::json!({
                "error": "LLM unavailable; the merge picker should fall back to skip-synthesis."
            }));
        }
    };
    HttpResponse::Ok().json(SynthesizeMergeResponse {
        proposed_description: proposed,
        model_used,
    })
}
/// GET /knowledge/facts — paged fact list with subject/object entity
/// names resolved for display.
async fn list_facts<D: KnowledgeDao + 'static>(
    req: HttpRequest,
    claims: Claims,
    query: web::Query<FactListQuery>,
    dao: web::Data<Mutex<D>>,
    persona_dao: PersonaDaoData,
) -> impl Responder {
    use std::collections::HashMap;
    // Robustness fix: previously negative `limit`/`offset` values
    // passed straight through to the DAO (SQL LIMIT/OFFSET); clamp
    // paging inputs to sane ranges.
    let limit = query.limit.unwrap_or(50).clamp(1, 200);
    let offset = query.offset.unwrap_or(0).max(0);
    // Default view is "active"; "all" disables the status filter.
    let status_filter = match query.status.as_deref() {
        None | Some("active") => Some("active".to_string()),
        Some("all") => None,
        Some(s) => Some(s.to_string()),
    };
    let persona = resolve_persona_filter(&req, &claims, &persona_dao);
    let filter = FactFilter {
        entity_id: query.entity_id,
        status: status_filter,
        predicate: query.predicate.clone(),
        persona,
        limit,
        offset,
    };
    let cx = opentelemetry::Context::current();
    let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
    match dao.list_facts(&cx, filter) {
        Ok((facts, total)) => {
            // Memoize id → name so repeated ids cost one DAO lookup
            // instead of one per row. This matters most when filtering
            // by entity_id, where every row shares the same subject —
            // the old code re-queried it for each fact. Failed lookups
            // are cached as None, matching the old degrade-to-missing
            // behavior.
            let mut name_cache: HashMap<i32, Option<String>> = HashMap::new();
            let mut summaries = Vec::with_capacity(facts.len());
            for f in facts {
                let subject_entity_name = name_cache
                    .entry(f.subject_entity_id)
                    .or_insert_with(|| {
                        dao.get_entity_by_id(&cx, f.subject_entity_id)
                            .ok()
                            .flatten()
                            .map(|e| e.name)
                    })
                    .clone();
                let object_entity_name = match f.object_entity_id {
                    Some(oid) => name_cache
                        .entry(oid)
                        .or_insert_with(|| {
                            dao.get_entity_by_id(&cx, oid).ok().flatten().map(|e| e.name)
                        })
                        .clone(),
                    None => None,
                };
                summaries.push(FactSummary {
                    id: f.id,
                    subject_entity_id: f.subject_entity_id,
                    subject_entity_name,
                    predicate: f.predicate,
                    object_entity_id: f.object_entity_id,
                    object_entity_name,
                    object_value: f.object_value,
                    confidence: f.confidence,
                    status: f.status,
                    source_photo: f.source_photo,
                    source_insight_id: f.source_insight_id,
                    created_at: f.created_at,
                });
            }
            HttpResponse::Ok().json(FactListResponse {
                facts: summaries,
                total,
                limit,
                offset,
            })
        }
        Err(e) => {
            log::error!("list_facts error: {:?}", e);
            HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
        }
    }
}
/// POST /knowledge/facts — manual fact creation from the curation
/// UI. Requires a non-empty predicate and at least one of
/// `object_entity_id` / `object_value`; validates that referenced
/// entities exist before writing. Responds 201 when the upsert
/// created a new row, 200 when it folded into an existing fact.
async fn create_fact<D: KnowledgeDao + 'static>(
    req: HttpRequest,
    claims: Claims,
    body: web::Json<FactCreateRequest>,
    dao: web::Data<Mutex<D>>,
    persona_dao: PersonaDaoData,
) -> impl Responder {
    // A fact needs an object of one kind or the other.
    if body.object_entity_id.is_none() && body.object_value.is_none() {
        return HttpResponse::BadRequest().json(serde_json::json!({
            "error": "object_entity_id or object_value is required"
        }));
    }
    if body.predicate.trim().is_empty() {
        return HttpResponse::BadRequest()
            .json(serde_json::json!({"error": "predicate must not be empty"}));
    }
    // Persona scoping: facts are written under the active single persona.
    // PersonaFilter::All is read-only ("hive-mind" view); callers should
    // pin a specific persona for writes via X-Persona-Id.
    let persona = resolve_persona_filter(&req, &claims, &persona_dao);
    let (user_id, persona_id) = match &persona {
        PersonaFilter::Single {
            user_id,
            persona_id,
        } => (*user_id, persona_id.clone()),
        // An All-scoped caller writes under the 'default' persona.
        PersonaFilter::All { user_id } => (*user_id, "default".to_string()),
    };
    let cx = opentelemetry::Context::current();
    let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
    // Verify subject entity exists.
    match dao.get_entity_by_id(&cx, body.subject_entity_id) {
        Ok(None) => {
            return HttpResponse::BadRequest().json(serde_json::json!({
                "error": format!("Subject entity {} not found", body.subject_entity_id)
            }));
        }
        Err(e) => {
            log::error!("create_fact subject lookup error: {:?}", e);
            return HttpResponse::InternalServerError()
                .json(serde_json::json!({"error": "Database error"}));
        }
        Ok(Some(_)) => {}
    }
    // Optional object entity validation when supplied.
    if let Some(oid) = body.object_entity_id {
        match dao.get_entity_by_id(&cx, oid) {
            Ok(None) => {
                return HttpResponse::BadRequest().json(serde_json::json!({
                    "error": format!("Object entity {} not found", oid)
                }));
            }
            Err(e) => {
                log::error!("create_fact object lookup error: {:?}", e);
                return HttpResponse::InternalServerError()
                    .json(serde_json::json!({"error": "Database error"}));
            }
            Ok(Some(_)) => {}
        }
    }
    let now = Utc::now().timestamp();
    // Default 0.6, capped at 0.95 — presumably so a manual fact never
    // claims full certainty; confirm against the DAO's conventions.
    let confidence = body.confidence.unwrap_or(0.6).clamp(0.0, 0.95);
    let insert = InsertEntityFact {
        subject_entity_id: body.subject_entity_id,
        predicate: body.predicate.trim().to_string(),
        object_entity_id: body.object_entity_id,
        object_value: body.object_value.clone(),
        source_photo: body.source_photo.clone(),
        source_insight_id: None,
        confidence,
        status: "active".to_string(),
        created_at: now,
        persona_id,
        user_id,
        valid_from: body.valid_from,
        valid_until: body.valid_until,
        superseded_by: None,
        // Manual creation via curation UI — provenance recorded as
        // "manual" with no model, distinguishing user-entered facts
        // from agent-generated ones in the audit view.
        created_by_model: None,
        created_by_backend: Some("manual".to_string()),
        last_modified_by_model: None,
        last_modified_by_backend: None,
        last_modified_at: None,
    };
    match dao.upsert_fact(&cx, insert) {
        Ok((fact, is_new)) => {
            // 201 for a brand-new row, 200 when the upsert matched an
            // existing fact.
            let status = if is_new {
                actix_web::http::StatusCode::CREATED
            } else {
                actix_web::http::StatusCode::OK
            };
            HttpResponse::build(status).json(fact)
        }
        Err(e) => {
            log::error!("create_fact upsert error: {:?}", e);
            HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
        }
    }
}
/// Apply a partial edit to a single fact.
///
/// Builds a `FactPatch` from whichever optional fields the request body
/// carries and hands it to the DAO. Provenance is stamped as
/// ("manual", "manual") so human edits from the curation UI stay
/// distinguishable from agent corrections in the audit feed.
/// Responses: 200 with the updated fact, 404 when the id is unknown,
/// 500 on a database error.
async fn patch_fact<D: KnowledgeDao + 'static>(
    _claims: Claims,
    id: web::Path<i32>,
    body: web::Json<FactPatchRequest>,
    dao: web::Data<Mutex<D>>,
) -> impl Responder {
    let target = id.into_inner();
    let cx = opentelemetry::Context::current();
    let changes = FactPatch {
        predicate: body.predicate.clone(),
        object_value: body.object_value.clone(),
        status: body.status.clone(),
        confidence: body.confidence,
        valid_from: body.valid_from,
        valid_until: body.valid_until,
    };
    // Manual PATCH from the curation UI — provenance stamped as
    // "manual" so the audit feed can distinguish human edits from
    // agent corrections.
    let outcome = dao
        .lock()
        .expect("Unable to lock KnowledgeDao")
        .update_fact(&cx, target, changes, Some(("manual", "manual")));
    match outcome {
        Ok(Some(updated)) => HttpResponse::Ok().json(updated),
        Ok(None) => HttpResponse::NotFound().json(serde_json::json!({"error": "Fact not found"})),
        Err(e) => {
            log::error!("patch_fact error: {:?}", e);
            HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
        }
    }
}
async fn delete_fact<D: KnowledgeDao + 'static>(
_claims: Claims,
id: web::Path<i32>,
dao: web::Data<Mutex<D>>,
) -> impl Responder {
let cx = opentelemetry::Context::current();
let fact_id = id.into_inner();
let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
match dao.delete_fact(&cx, fact_id) {
Ok(()) => HttpResponse::NoContent().finish(),
Err(e) => {
log::warn!("delete_fact({}) error: {:?}", fact_id, e);
HttpResponse::NotFound().json(serde_json::json!({"error": "Fact not found"}))
}
}
}
/// Mark one fact as superseded by another.
///
/// Rejects self-supersession (old id == new id) with 400 before
/// touching the database. Provenance is stamped as ("manual", "manual"),
/// the same rule as the PATCH path. Responses: 200 with the superseded
/// fact, 404 when either fact is missing, 500 on a database error.
async fn supersede_fact<D: KnowledgeDao + 'static>(
    _claims: Claims,
    id: web::Path<i32>,
    body: web::Json<SupersedeRequest>,
    dao: web::Data<Mutex<D>>,
) -> impl Responder {
    let replaced = id.into_inner();
    // A fact cannot supersede itself — guard before locking the DAO.
    if replaced == body.by_fact_id {
        return HttpResponse::BadRequest()
            .json(serde_json::json!({"error": "old_id and by_fact_id must differ"}));
    }
    let cx = opentelemetry::Context::current();
    let outcome = dao
        .lock()
        .expect("Unable to lock KnowledgeDao")
        .supersede_fact(&cx, replaced, body.by_fact_id, Some(("manual", "manual")));
    match outcome {
        Ok(Some(updated)) => HttpResponse::Ok().json(updated),
        Ok(None) => {
            HttpResponse::NotFound().json(serde_json::json!({"error": "Old or new fact not found"}))
        }
        Err(e) => {
            log::error!("supersede_fact error: {:?}", e);
            HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
        }
    }
}
/// Undo a supersession, restoring a fact via the DAO's
/// `revert_supersession`.
///
/// Provenance is stamped as ("manual", "manual"), matching the other
/// curation-UI mutation paths. Responses: 200 with the restored fact,
/// 404 when the id is unknown, 500 on a database error.
async fn restore_fact<D: KnowledgeDao + 'static>(
    _claims: Claims,
    id: web::Path<i32>,
    dao: web::Data<Mutex<D>>,
) -> impl Responder {
    let target = id.into_inner();
    let cx = opentelemetry::Context::current();
    let outcome = dao
        .lock()
        .expect("Unable to lock KnowledgeDao")
        .revert_supersession(&cx, target, Some(("manual", "manual")));
    match outcome {
        Ok(Some(restored)) => HttpResponse::Ok().json(restored),
        Ok(None) => HttpResponse::NotFound().json(serde_json::json!({"error": "Fact not found"})),
        Err(e) => {
            log::error!("restore_fact error: {:?}", e);
            HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
        }
    }
}
/// List recently touched entities and facts for the activity feed.
///
/// Defaults: `since` = 24 hours ago, `limit` = 20. The limit is clamped
/// to 1..=100 — the previous `.min(100)` only capped the upper bound,
/// so a zero (or, if the field is signed, negative) client value was
/// forwarded to the DAO verbatim. Clamping matches the convention of
/// the sibling graph and consolidation handlers. Reads are scoped by
/// the resolved persona filter; entities are projected through
/// `EntitySummary` before serialization.
async fn get_recent<D: KnowledgeDao + 'static>(
    req: HttpRequest,
    claims: Claims,
    query: web::Query<RecentQuery>,
    dao: web::Data<Mutex<D>>,
    persona_dao: PersonaDaoData,
) -> impl Responder {
    // Default window: the last 24 hours (86 400 s).
    let since = query
        .since
        .unwrap_or_else(|| Utc::now().timestamp() - 86400);
    let limit = query.limit.unwrap_or(20).clamp(1, 100);
    let persona = resolve_persona_filter(&req, &claims, &persona_dao);
    let cx = opentelemetry::Context::current();
    let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
    match dao.get_recent_activity(&cx, since, limit, &persona) {
        Ok(RecentActivity { entities, facts }) => {
            let entity_summaries: Vec<EntitySummary> =
                entities.into_iter().map(EntitySummary::from).collect();
            HttpResponse::Ok().json(serde_json::json!({
                "entities": entity_summaries,
                "facts": facts
            }))
        }
        Err(e) => {
            log::error!("get_recent error: {:?}", e);
            HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
        }
    }
}
async fn get_graph<D: KnowledgeDao + 'static>(
req: HttpRequest,
claims: Claims,
query: web::Query<GraphQuery>,
dao: web::Data<Mutex<D>>,
persona_dao: PersonaDaoData,
) -> impl Responder {
let limit = query.limit.unwrap_or(200).clamp(1, 1000) as usize;
let persona = resolve_persona_filter(&req, &claims, &persona_dao);
let cx = opentelemetry::Context::current();
let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
match dao.build_entity_graph(&cx, query.entity_type.as_deref(), limit, &persona) {
Ok(EntityGraph { nodes, edges }) => HttpResponse::Ok().json(GraphResponse {
nodes: nodes
.into_iter()
.map(|n| GraphNodeView {
id: n.id,
name: n.name,
entity_type: n.entity_type,
fact_count: n.fact_count,
})
.collect(),
edges: edges
.into_iter()
.map(|e| GraphEdgeView {
source: e.source,
target: e.target,
predicate: e.predicate,
count: e.count,
})
.collect(),
}),
Err(e) => {
log::error!("build_entity_graph error: {:?}", e);
HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
}
}
}
/// List proposed duplicate-entity merge groups for the curation UI.
///
/// The cosine threshold is clamped to 0.5..=0.99 so a curious client
/// can't drag the floor to 0 and pull every entity into one giant
/// cluster; group count is clamped to 1..=200 (default 50). Each
/// entity in a group is decorated with its per-persona fact breakdown
/// (best-effort — a failed breakdown lookup yields undecorated
/// summaries) so the UI can show "default 8 · journal 3" inline and
/// the curator can pick the strongest merge target.
async fn get_consolidation_proposals<D: KnowledgeDao + 'static>(
    req: HttpRequest,
    claims: Claims,
    query: web::Query<ConsolidationQuery>,
    dao: web::Data<Mutex<D>>,
    persona_dao: PersonaDaoData,
) -> impl Responder {
    let threshold = query.threshold.unwrap_or(0.85).clamp(0.5, 0.99);
    let max_groups = query.limit.unwrap_or(50).clamp(1, 200) as usize;
    let persona = resolve_persona_filter(&req, &claims, &persona_dao);
    let cx = opentelemetry::Context::current();
    let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
    let groups: Vec<ConsolidationGroup> =
        match dao.find_consolidation_proposals(&cx, threshold, max_groups) {
            Ok(found) => found,
            Err(e) => {
                log::error!("find_consolidation_proposals: {:?}", e);
                return HttpResponse::InternalServerError()
                    .json(serde_json::json!({"error": "Database error"}));
            }
        };
    // Gather every proposed entity id up front so the per-persona fact
    // counts can be fetched in a single batch.
    let all_ids: Vec<i32> = groups
        .iter()
        .flat_map(|group| group.entities.iter().map(|entity| entity.id))
        .collect();
    let breakdowns = dao
        .get_persona_breakdowns_for_entities(&cx, &all_ids, persona.user_id())
        .unwrap_or_default();
    let mut decorated = Vec::with_capacity(groups.len());
    for group in groups {
        let mut entity_views = Vec::with_capacity(group.entities.len());
        for entity in group.entities {
            let entity_id = entity.id;
            let mut summary = EntitySummary::from(entity);
            if let Some(bd) = breakdowns.get(&entity_id) {
                summary = summary.with_persona_breakdown(bd.clone());
            }
            entity_views.push(summary);
        }
        decorated.push(ConsolidationGroupView {
            entities: entity_views,
            min_cosine: group.min_cosine,
            max_cosine: group.max_cosine,
        });
    }
    HttpResponse::Ok().json(ConsolidationResponse { groups: decorated })
}