use actix_web::dev::{ServiceFactory, ServiceRequest};
use actix_web::{App, HttpRequest, HttpResponse, Responder, web};
use chrono::Utc;
use serde::{Deserialize, Serialize};
use std::sync::Mutex;

use crate::data::Claims;
use crate::database::models::{Entity, EntityFact, EntityPhotoLink, InsertEntityFact};
use crate::database::{
    ConsolidationGroup, EntityFilter, EntityPatch, EntitySort, FactFilter, FactPatch,
    KnowledgeDao, PersonaFilter, RecentActivity,
};
use crate::personas::PersonaDaoData;
use crate::state::AppState;

/// Resolve the `X-Persona-Id` header into a `PersonaFilter`. Missing
/// header → `'default'`. If the persona has `include_all_memories=true`,
/// returns `PersonaFilter::All` so reads see the full hive-mind pool.
/// On JWT-parse failure (sub is not a numeric user_id) the resolver
/// falls back to user_id=1 — the operator convention for service
/// tokens — preserving the historical baseline view. The same fallback
/// applies on any persona-lookup error.
fn resolve_persona_filter(
    req: &HttpRequest,
    claims: &Claims,
    persona_dao: &PersonaDaoData,
) -> PersonaFilter {
    let pid = req
        .headers()
        .get("X-Persona-Id")
        .and_then(|v| v.to_str().ok())
        .map(|s| s.to_string())
        .unwrap_or_else(|| "default".to_string());
    let uid = claims.sub.parse::<i32>().unwrap_or(1);

    let cx = opentelemetry::Context::current();
    let mut dao = persona_dao.lock().expect("Unable to lock PersonaDao");
    match dao.get_persona(&cx, uid, &pid) {
        Ok(Some(p)) if p.include_all_memories => PersonaFilter::All { user_id: uid },
        _ => PersonaFilter::Single {
            user_id: uid,
            persona_id: pid,
        },
    }
}

// ---------------------------------------------------------------------------
// Request / Response types
// ---------------------------------------------------------------------------

#[derive(Serialize)]
pub struct EntitySummary {
    pub id: i32,
    pub name: String,
    pub entity_type: String,
    pub description: String,
    pub confidence: f32,
    pub status: String,
    pub created_at: i64,
    pub updated_at: i64,
    /// Persona-scoped count of non-rejected facts about this entity
    /// (subject side). None — and omitted from the JSON — when the
    /// call site has no scoping context, e.g. PATCH responses return
    /// the bare entity.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub fact_count: Option<i64>,
    /// Per-persona breakdown of fact counts for this entity, scoped
    /// to the active user. Lets the curation UI surface "this entity
    /// is empty under your active persona but has 12 facts in
    /// journal" so you know which persona owns the existing
    /// knowledge. Skipped on serialization when None.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub persona_breakdown: Option<Vec<PersonaCount>>,
}

#[derive(Serialize)]
pub struct PersonaCount {
    pub persona_id: String,
    pub count: i64,
}

impl From<Entity> for EntitySummary {
    fn from(e: Entity) -> Self {
        EntitySummary {
            id: e.id,
            name: e.name,
            entity_type: e.entity_type,
            description: e.description,
            confidence: e.confidence,
            status: e.status,
            created_at: e.created_at,
            updated_at: e.updated_at,
            fact_count: None,
            persona_breakdown: None,
        }
    }
}

impl EntitySummary {
    fn from_entity_with_count(e: Entity, fact_count: i64) -> Self {
        let mut s = EntitySummary::from(e);
        s.fact_count = Some(fact_count);
        s
    }

    fn with_persona_breakdown(mut self, breakdown: Vec<(String, i64)>) -> Self {
        self.persona_breakdown = Some(
            breakdown
                .into_iter()
                .map(|(persona_id, count)| PersonaCount { persona_id, count })
                .collect(),
        );
        self
    }
}

#[derive(Serialize)]
pub struct EntityListResponse {
    pub entities: Vec<EntitySummary>,
    pub total: i64,
    pub limit: i64,
    pub offset: i64,
}

#[derive(Serialize)]
pub struct FactDetail {
    pub id: i32,
    pub predicate: String,
    pub object_entity_id: Option<i32>,
    pub object_entity_name: Option<String>,
    pub object_value: Option<String>,
    pub confidence: f32,
    pub status: String,
    pub source_photo: Option<String>,
    pub source_insight_id: Option<i32>,
    pub created_at: i64,
    /// Real-world valid-time interval. NULL on either side means
    /// unbounded; both NULL = "always true" / validity unknown.
    /// Distinct from `created_at` (transaction time — when we
    /// recorded it). See migration 2026-05-10-000100.
    pub valid_from: Option<i64>,
    pub valid_until: Option<i64>,
    /// Points at the entity_facts.id that replaced this one (Phase 2
    /// supersession, migration 2026-05-10-000200). Only set when
    /// status == 'superseded'.
    pub superseded_by: Option<i32>,
    /// Provenance — see migration 2026-05-10-000300. NULL on legacy
    /// rows. `created_by_backend` is "local" / "hybrid" / "manual".
    pub created_by_model: Option<String>,
    pub created_by_backend: Option<String>,
    /// Audit trail — see migration 2026-05-10-000500. Set on any
    /// post-creation mutation. NULL on rows that have never been
    /// touched after they were first written.
    pub last_modified_by_model: Option<String>,
    pub last_modified_by_backend: Option<String>,
    pub last_modified_at: Option<i64>,
    /// Set when another active fact has the same subject+predicate,
    /// a different object, AND their valid-time intervals overlap.
    /// Detected at read time by the get_entity handler grouping
    /// facts by predicate. Some predicates are legitimately
    /// multi-valued ("tagged_in", "friend_of"), so this is a *signal*
    /// for the curator, not a hard invariant. The interval check
    /// keeps "lives_in NYC 2018-2020" + "lives_in SF 2020-present"
    /// from false-positive flagging.
    #[serde(skip_serializing_if = "std::ops::Not::not")]
    pub in_conflict: bool,
}

#[derive(Serialize)]
pub struct PhotoLinkDetail {
    pub library_id: i32,
    pub file_path: String,
    pub role: String,
}

impl From<EntityPhotoLink> for PhotoLinkDetail {
    fn from(l: EntityPhotoLink) -> Self {
        PhotoLinkDetail {
            library_id: l.library_id,
            file_path: l.file_path,
            role: l.role,
        }
    }
}

#[derive(Serialize)]
pub struct EntityDetailResponse {
    pub id: i32,
    pub name: String,
    pub entity_type: String,
    pub description: String,
    pub confidence: f32,
    pub status: String,
    pub created_at: i64,
    pub updated_at: i64,
    pub facts: Vec<FactDetail>,
    pub photo_links: Vec<PhotoLinkDetail>,
    /// Per-persona fact counts for the active user. Mirrors the
    /// same field on EntitySummary; the detail panel surfaces a
    /// clickable list so the curator can switch to the persona
    /// that owns existing facts about this entity.
    pub persona_breakdown: Vec<PersonaCount>,
}

#[derive(Serialize)]
pub struct FactSummary {
    pub id: i32,
    pub subject_entity_id: i32,
    pub subject_entity_name: Option<String>,
    pub predicate: String,
    pub object_entity_id: Option<i32>,
    pub object_entity_name: Option<String>,
    pub object_value: Option<String>,
    pub confidence: f32,
    pub status: String,
    pub source_photo: Option<String>,
    pub source_insight_id: Option<i32>,
    pub created_at: i64,
}

#[derive(Serialize)]
pub struct FactListResponse {
    pub facts: Vec<FactSummary>,
    pub total: i64,
    pub limit: i64,
    pub offset: i64,
}

#[derive(Deserialize)]
pub struct MergeRequest {
    pub source_id: i32,
    pub target_id: i32,
}

#[derive(Serialize)]
pub struct MergeResponse {
    pub merged_entity_id: i32,
    pub deleted_entity_id: i32,
    pub facts_transferred: i64,
    pub links_transferred: i64,
}

#[derive(Deserialize)]
pub struct EntityPatchRequest {
    pub name: Option<String>,
    pub description: Option<String>,
    pub status: Option<String>,
    pub confidence: Option<f32>,
}

/// Serde helper for the "tri-state" pattern: distinguish "field
/// omitted" from "field sent as null". Used for nullable columns
/// where we want PATCH to support both "leave alone" and "set NULL".
fn deserialize_optional_nullable_i64<'de, D>(d: D) -> Result<Option<Option<i64>>, D::Error>
where
    D: serde::Deserializer<'de>,
{
    Ok(Some(Option::<i64>::deserialize(d)?))
}

#[derive(Deserialize)]
pub struct FactPatchRequest {
    pub predicate: Option<String>,
    pub object_value: Option<String>,
    pub status: Option<String>,
    pub confidence: Option<f32>,
    /// Tri-state: missing = leave alone, null = clear to NULL, number
    /// = set. See `deserialize_optional_nullable_i64`.
    #[serde(default, deserialize_with = "deserialize_optional_nullable_i64")]
    pub valid_from: Option<Option<i64>>,
    #[serde(default, deserialize_with = "deserialize_optional_nullable_i64")]
    pub valid_until: Option<Option<i64>>,
}

#[derive(Deserialize)]
pub struct SupersedeRequest {
    /// The id of the new fact that replaces the one named by the path parameter.
    pub by_fact_id: i32,
}

#[derive(Deserialize)]
pub struct SynthesizeMergeRequest {
    pub source_id: i32,
    pub target_id: i32,
}

#[derive(serde::Serialize)]
pub struct SynthesizeMergeResponse {
    pub proposed_description: String,
    pub model_used: String,
}

#[derive(Deserialize)]
pub struct FactCreateRequest {
    pub subject_entity_id: i32,
    pub predicate: String,
    pub object_entity_id: Option<i32>,
    pub object_value: Option<String>,
    pub source_photo: Option<String>,
    pub confidence: Option<f32>,
    pub valid_from: Option<i64>,
    pub valid_until: Option<i64>,
}

#[derive(Deserialize)]
pub struct EntityListQuery {
    #[serde(rename = "type")]
    pub entity_type: Option<String>,
    pub status: Option<String>,
    pub search: Option<String>,
    /// "updated" (default) | "name" | "count". `count` is persona-scoped
    /// via the X-Persona-Id header.
    pub sort: Option<String>,
    pub limit: Option<i64>,
    pub offset: Option<i64>,
}

#[derive(Deserialize)]
pub struct FactListQuery {
    pub entity_id: Option<i32>,
    pub status: Option<String>,
    pub predicate: Option<String>,
    pub limit: Option<i64>,
    pub offset: Option<i64>,
}

#[derive(Deserialize)]
pub struct RecentQuery {
    pub since: Option<i64>,
    pub limit: Option<i64>,
}

#[derive(Deserialize)]
pub struct ConsolidationQuery {
    /// Cosine threshold for clustering. Default 0.85 — looser than
    /// the upsert-time guard (0.92) so this view surfaces "probably
    /// same" pairs for human review.
    pub threshold: Option<f32>,
    pub limit: Option<i64>,
}

#[derive(Serialize)]
pub struct ConsolidationGroupView {
    pub entities: Vec<EntitySummary>,
    pub min_cosine: f32,
    pub max_cosine: f32,
}

#[derive(Serialize)]
pub struct ConsolidationResponse {
    pub groups: Vec<ConsolidationGroupView>,
}

// ---------------------------------------------------------------------------
// Service registration
// ---------------------------------------------------------------------------

// `D` is the concrete `KnowledgeDao` implementation the caller mounts; the
// handlers below are generic over it so the routing table can be reused.
pub fn add_knowledge_services<T, D: KnowledgeDao + 'static>(app: App<T>) -> App<T>
where
    T: ServiceFactory<ServiceRequest, Config = (), Error = actix_web::Error, InitError = ()>,
{
    app.service(
        web::scope("/knowledge")
            .service(web::resource("/entities").route(web::get().to(list_entities::<D>)))
            .service(web::resource("/entities/merge").route(web::post().to(merge_entities::<D>)))
            .service(
                web::resource("/entities/synthesize-merge")
                    .route(web::post().to(synthesize_merge::<D>)),
            )
            .service(
                web::resource("/entities/{id}")
                    .route(web::get().to(get_entity::<D>))
                    .route(web::patch().to(patch_entity::<D>))
                    .route(web::delete().to(delete_entity::<D>)),
            )
            .service(
                web::resource("/facts")
                    .route(web::get().to(list_facts::<D>))
                    .route(web::post().to(create_fact::<D>)),
            )
            .service(
                web::resource("/facts/{id}")
                    .route(web::patch().to(patch_fact::<D>))
                    .route(web::delete().to(delete_fact::<D>)),
            )
            .service(
                web::resource("/facts/{id}/supersede").route(web::post().to(supersede_fact::<D>)),
            )
            .service(web::resource("/facts/{id}/restore").route(web::post().to(restore_fact::<D>)))
            .service(web::resource("/recent").route(web::get().to(get_recent::<D>)))
            .service(
                web::resource("/consolidation-proposals")
                    .route(web::get().to(get_consolidation_proposals::<D>)),
            ),
    )
}

// ---------------------------------------------------------------------------
// Handlers
// ---------------------------------------------------------------------------

async fn list_entities<D: KnowledgeDao>(
    req: HttpRequest,
    claims: Claims,
    query: web::Query<EntityListQuery>,
    dao: web::Data<Mutex<D>>,
    persona_dao: PersonaDaoData,
) -> impl Responder {
    let limit = query.limit.unwrap_or(50).min(200);
    let offset = query.offset.unwrap_or(0);

    let status_filter = match query.status.as_deref() {
        None | Some("active") => Some("active".to_string()),
        Some("all") => None,
        Some(s) => Some(s.to_string()),
    };
    let sort = match query.sort.as_deref() {
        Some("name") => EntitySort::NameAsc,
        Some("count") => EntitySort::FactCountDesc,
        // "updated" or anything else falls through to the default.
        _ => EntitySort::UpdatedDesc,
    };

    let persona = resolve_persona_filter(&req, &claims, &persona_dao);
    let filter = EntityFilter {
        entity_type: query.entity_type.clone(),
        status: status_filter,
        search: query.search.clone(),
        limit,
        offset,
    };

    let cx = opentelemetry::Context::current();
    let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
    match dao.list_entities_with_fact_counts(&cx, filter, sort, &persona) {
        Ok((pairs, total)) => {
            // Batch fetch persona breakdowns so the list-row tooltip
            // and detail panel can show "0 here · 12 in journal".
            // One extra query for the visible page.
            let entity_ids: Vec<i32> = pairs.iter().map(|(e, _)| e.id).collect();
            let breakdowns = dao
                .get_persona_breakdowns_for_entities(&cx, &entity_ids, persona.user_id())
                .unwrap_or_default();

            let summaries: Vec<EntitySummary> = pairs
                .into_iter()
                .map(|(e, c)| {
                    let entity_id = e.id;
                    let summary = EntitySummary::from_entity_with_count(e, c);
                    match breakdowns.get(&entity_id) {
                        Some(bd) => summary.with_persona_breakdown(bd.clone()),
                        None => summary,
                    }
                })
                .collect();

            HttpResponse::Ok().json(EntityListResponse {
                entities: summaries,
                total,
                limit,
                offset,
            })
        }
        Err(e) => {
            log::error!("list_entities error: {:?}", e);
            HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
        }
    }
}

async fn get_entity<D: KnowledgeDao>(
    req: HttpRequest,
    claims: Claims,
    id: web::Path<i32>,
    dao: web::Data<Mutex<D>>,
    persona_dao: PersonaDaoData,
) -> impl Responder {
    let persona = resolve_persona_filter(&req, &claims, &persona_dao);
    let cx = opentelemetry::Context::current();
    let entity_id = id.into_inner();
    let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");

    let entity = match dao.get_entity_by_id(&cx, entity_id) {
        Ok(Some(e)) => e,
        Ok(None) => {
            return HttpResponse::NotFound().json(serde_json::json!({"error": "Entity not found"}));
        }
        Err(e) => {
            log::error!("get_entity error: {:?}", e);
            return HttpResponse::InternalServerError()
                .json(serde_json::json!({"error": "Database error"}));
        }
    };

    // Fetch all facts (all statuses for audit), scoped to the active persona.
    let raw_facts: Vec<EntityFact> = match dao.get_facts_for_entity(&cx, entity_id, &persona) {
        Ok(f) => f,
        Err(e) => {
            log::error!("get_facts_for_entity error: {:?}", e);
            return HttpResponse::InternalServerError()
                .json(serde_json::json!({"error": "Database error"}));
        }
    };

    // Resolve object entity names
    let mut facts = Vec::with_capacity(raw_facts.len());
    for f in raw_facts {
        let object_entity_name = if let Some(oid) = f.object_entity_id {
            dao.get_entity_by_id(&cx, oid)
                .ok()
                .flatten()
                .map(|e| e.name)
        } else {
            None
        };
        facts.push(FactDetail {
            id: f.id,
            predicate: f.predicate,
            object_entity_id: f.object_entity_id,
            object_entity_name,
            object_value: f.object_value,
            confidence: f.confidence,
            status: f.status,
            source_photo: f.source_photo,
            source_insight_id: f.source_insight_id,
            created_at: f.created_at,
            valid_from: f.valid_from,
            valid_until: f.valid_until,
            superseded_by: f.superseded_by,
            created_by_model: f.created_by_model,
            created_by_backend: f.created_by_backend,
            last_modified_by_model: f.last_modified_by_model,
            last_modified_by_backend: f.last_modified_by_backend,
            last_modified_at: f.last_modified_at,
            in_conflict: false,
        });
    }

    // Conflict detection: within the active set, group by predicate;
    // for each pair within a group that disagrees on the object,
    // flag both only if their valid-time intervals overlap. NULL on
    // either bound treats that side as unbounded — a fact with no
    // valid-time data still flags against any time period (worst case
    // for legacy data; the user adds dates to suppress the flag).
    fn intervals_overlap(a: (Option<i64>, Option<i64>), b: (Option<i64>, Option<i64>)) -> bool {
        let a_lo = a.0.unwrap_or(i64::MIN);
        let a_hi = a.1.unwrap_or(i64::MAX);
        let b_lo = b.0.unwrap_or(i64::MIN);
        let b_hi = b.1.unwrap_or(i64::MAX);
        a_lo < b_hi && b_lo < a_hi
    }
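    // Worked example (illustrative, not load-bearing): the doc comment's
    // "lives_in NYC 2018-2020" vs "lives_in SF 2020-present" case. The
    // strict `<` comparison means intervals that merely touch at a boundary
    // do not overlap, while a None bound overlaps everything.
    debug_assert!(!intervals_overlap(
        (Some(1_514_764_800), Some(1_577_836_800)), // 2018-01-01 .. 2020-01-01
        (Some(1_577_836_800), None),                // 2020-01-01 .. unbounded
    ));
    debug_assert!(intervals_overlap((None, None), (Some(0), Some(1))));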
    {
        use std::collections::{HashMap, HashSet};

        let mut by_predicate: HashMap<String, Vec<usize>> = HashMap::new();
        for (idx, f) in facts.iter().enumerate() {
            if f.status == "active" {
                by_predicate
                    .entry(f.predicate.clone())
                    .or_default()
                    .push(idx);
            }
        }

        let mut to_flag: HashSet<usize> = HashSet::new();
        for indices in by_predicate.values() {
            if indices.len() < 2 {
                continue;
            }
            for (a_pos, &i) in indices.iter().enumerate() {
                for &j in &indices[a_pos + 1..] {
                    let same_object = facts[i].object_entity_id == facts[j].object_entity_id
                        && facts[i].object_value == facts[j].object_value;
                    if same_object {
                        continue;
                    }
                    if intervals_overlap(
                        (facts[i].valid_from, facts[i].valid_until),
                        (facts[j].valid_from, facts[j].valid_until),
                    ) {
                        to_flag.insert(i);
                        to_flag.insert(j);
                    }
                }
            }
        }
        for i in to_flag {
            facts[i].in_conflict = true;
        }
    }

    // Fetch photo links
    let photo_links: Vec<PhotoLinkDetail> = match dao.get_links_for_entity(&cx, entity_id) {
        Ok(links) => links.into_iter().map(PhotoLinkDetail::from).collect(),
        Err(e) => {
            log::error!("get_links_for_entity error: {:?}", e);
            return HttpResponse::InternalServerError()
                .json(serde_json::json!({"error": "Database error"}));
        }
    };

    // Per-persona breakdown for the detail panel's "facts live in
    // {persona}" block — same data the list-row tooltip reads. One
    // query, single entity in scope.
    let persona_breakdown: Vec<PersonaCount> = dao
        .get_persona_breakdowns_for_entities(&cx, &[entity_id], persona.user_id())
        .ok()
        .and_then(|mut map| map.remove(&entity_id))
        .unwrap_or_default()
        .into_iter()
        .map(|(persona_id, count)| PersonaCount { persona_id, count })
        .collect();

    HttpResponse::Ok().json(EntityDetailResponse {
        id: entity.id,
        name: entity.name,
        entity_type: entity.entity_type,
        description: entity.description,
        confidence: entity.confidence,
        status: entity.status,
        created_at: entity.created_at,
        updated_at: entity.updated_at,
        facts,
        photo_links,
        persona_breakdown,
    })
}

async fn patch_entity<D: KnowledgeDao>(
    _claims: Claims,
    id: web::Path<i32>,
    body: web::Json<EntityPatchRequest>,
    dao: web::Data<Mutex<D>>,
) -> impl Responder {
    let cx = opentelemetry::Context::current();
    let entity_id = id.into_inner();

    let patch = EntityPatch {
        name: body.name.clone(),
        description: body.description.clone(),
        status: body.status.clone(),
        confidence: body.confidence,
    };

    let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
    match dao.update_entity(&cx, entity_id, patch) {
        Ok(Some(entity)) => HttpResponse::Ok().json(EntitySummary::from(entity)),
        Ok(None) => HttpResponse::NotFound().json(serde_json::json!({"error": "Entity not found"})),
        Err(e) => {
            log::error!("patch_entity error: {:?}", e);
            HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
        }
    }
}

async fn delete_entity<D: KnowledgeDao>(
    _claims: Claims,
    id: web::Path<i32>,
    dao: web::Data<Mutex<D>>,
) -> impl Responder {
    let cx = opentelemetry::Context::current();
    let entity_id = id.into_inner();
    let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");

    // Verify entity exists before deleting
    match dao.get_entity_by_id(&cx, entity_id) {
        Ok(None) => {
            return HttpResponse::NotFound().json(serde_json::json!({"error": "Entity not found"}));
        }
        Err(e) => {
            log::error!("delete_entity lookup error: {:?}", e);
            return HttpResponse::InternalServerError()
                .json(serde_json::json!({"error": "Database error"}));
        }
        Ok(Some(_)) => {}
    }

    match dao.delete_entity(&cx, entity_id) {
        Ok(()) => HttpResponse::NoContent().finish(),
        Err(e) => {
            log::error!("delete_entity error: {:?}", e);
            HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
        }
    }
}

async fn merge_entities<D: KnowledgeDao>(
    _claims: Claims,
    body: web::Json<MergeRequest>,
    dao: web::Data<Mutex<D>>,
) -> impl Responder {
    if body.source_id == body.target_id {
        return HttpResponse::BadRequest()
            .json(serde_json::json!({"error": "source_id and target_id must be different"}));
    }

    let cx = opentelemetry::Context::current();
    let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");

    // Verify both entities exist
    for id in [body.source_id, body.target_id] {
        match dao.get_entity_by_id(&cx, id) {
            Ok(None) => {
                return HttpResponse::BadRequest()
                    .json(serde_json::json!({"error": format!("Entity {} not found", id)}));
            }
            Err(e) => {
                log::error!("merge_entities lookup error: {:?}", e);
                return HttpResponse::InternalServerError()
                    .json(serde_json::json!({"error": "Database error"}));
            }
            Ok(Some(_)) => {}
        }
    }

    match dao.merge_entities(&cx, body.source_id, body.target_id) {
        Ok((facts_transferred, links_transferred)) => HttpResponse::Ok().json(MergeResponse {
            merged_entity_id: body.target_id,
            deleted_entity_id: body.source_id,
            facts_transferred,
            links_transferred,
        }),
        Err(e) => {
            log::error!("merge_entities error: {:?}", e);
            HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
        }
    }
}

/// Preview a merged description before the actual merge fires. Calls
/// the local Ollama with both entities' names + descriptions and
/// returns a synthesized rewrite that combines them. The curator
/// previews, edits, and either accepts (PATCH the target's description,
/// then POST /merge) or skips (just /merge as-is).
///
/// Deliberately doesn't touch the database — read-only on entities,
/// and the LLM call never gets to write anything. If the model is
/// unavailable the handler returns 503 so the UI can degrade gracefully
/// (skip the preview, fall back to the existing merge action).
async fn synthesize_merge<D: KnowledgeDao>(
    _claims: Claims,
    body: web::Json<SynthesizeMergeRequest>,
    dao: web::Data<Mutex<D>>,
    app_state: web::Data<AppState>,
) -> impl Responder {
    if body.source_id == body.target_id {
        return HttpResponse::BadRequest()
            .json(serde_json::json!({"error": "source_id and target_id must differ"}));
    }

    let cx = opentelemetry::Context::current();
    let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");

    let source = match dao.get_entity_by_id(&cx, body.source_id) {
        Ok(Some(e)) => e,
        Ok(None) => {
            return HttpResponse::BadRequest()
                .json(serde_json::json!({"error": "source entity not found"}));
        }
        Err(e) => {
            log::error!("synthesize_merge source lookup: {:?}", e);
            return HttpResponse::InternalServerError()
                .json(serde_json::json!({"error": "Database error"}));
        }
    };
    let target = match dao.get_entity_by_id(&cx, body.target_id) {
        Ok(Some(e)) => e,
        Ok(None) => {
            return HttpResponse::BadRequest()
                .json(serde_json::json!({"error": "target entity not found"}));
        }
        Err(e) => {
            log::error!("synthesize_merge target lookup: {:?}", e);
            return HttpResponse::InternalServerError()
                .json(serde_json::json!({"error": "Database error"}));
        }
    };

    // Drop the DAO lock before the LLM call — the generate request
    // is the slow part (seconds) and we don't want to block other
    // knowledge reads while it runs.
    drop(dao);

    let source_desc = if source.description.trim().is_empty() {
        "(none)".to_string()
    } else {
        source.description.clone()
    };
    let target_desc = if target.description.trim().is_empty() {
        "(none)".to_string()
    } else {
        target.description.clone()
    };

    let system = "You are condensing two stored entity descriptions into one. The two \
        entities refer to the same real-world thing and are about to be merged. Write a \
        single neutral third-person description (1-2 sentences, max 300 chars) that \
        preserves any concrete facts in either source. Do not invent details. Do not \
        editorialize. Plain prose only — no markdown, no bold, no italics, no headings, \
        no bullets, no lists, no code fences. Return ONLY the merged description — no \
        preamble, no labels, no quotes.";

    let prompt = format!(
        "Entity A: {} [{}]\nDescription: {}\n\nEntity B: {} [{}]\nDescription: {}\n\nMerged description:",
        source.name, source.entity_type, source_desc, target.name, target.entity_type, target_desc,
    );

    let ollama = app_state.ollama.clone();
    let model_used = ollama.primary_model.clone();
    let proposed = match ollama.generate(&prompt, Some(system)).await {
        Ok(out) => {
            // Strip the framing models reach for even with explicit
            // "no preamble" guidance: leading "Merged description:"
            // labels, wrapping quotes, ``` code fences, leading
            // bullets / hash headings. Belt-and-braces against the
            // system prompt's plain-text directive.
            let mut s = out.trim().to_string();
            s = s
                .trim_start_matches("Merged description:")
                .trim_start_matches("Merged Description:")
                .trim()
                .to_string();
            // Code fences (``` or ```text)
            s = s
                .trim_start_matches("```text")
                .trim_start_matches("```markdown")
                .trim_start_matches("```")
                .trim_end_matches("```")
                .trim()
                .to_string();
            // Markdown headings / bullets at the very start
            while let Some(stripped) = s
                .strip_prefix('#')
                .or_else(|| s.strip_prefix('*'))
                .or_else(|| s.strip_prefix('-'))
                .or_else(|| s.strip_prefix('>'))
            {
                s = stripped.trim_start().to_string();
            }
            // Wrapping quotes
            s = s.trim_matches(|c| c == '"' || c == '\'').to_string();
            // Inline emphasis: drop `**` / `__` markers without trying
            // to parse markdown — just remove the punctuation. Rare
            // enough that this naive replace is fine.
            s = s.replace("**", "").replace("__", "");
            s
        }
        Err(e) => {
            log::warn!("synthesize_merge generate failed: {:?}", e);
            return HttpResponse::ServiceUnavailable().json(serde_json::json!({
                "error": "LLM unavailable; the merge picker should fall back to skip-synthesis."
            }));
        }
    };

    HttpResponse::Ok().json(SynthesizeMergeResponse {
        proposed_description: proposed,
        model_used,
    })
}

async fn list_facts<D: KnowledgeDao>(
    req: HttpRequest,
    claims: Claims,
    query: web::Query<FactListQuery>,
    dao: web::Data<Mutex<D>>,
    persona_dao: PersonaDaoData,
) -> impl Responder {
    let limit = query.limit.unwrap_or(50).min(200);
    let offset = query.offset.unwrap_or(0);
    let status_filter = match query.status.as_deref() {
        None | Some("active") => Some("active".to_string()),
        Some("all") => None,
        Some(s) => Some(s.to_string()),
    };

    let persona = resolve_persona_filter(&req, &claims, &persona_dao);
    let filter = FactFilter {
        entity_id: query.entity_id,
        status: status_filter,
        predicate: query.predicate.clone(),
        persona,
        limit,
        offset,
    };

    let cx = opentelemetry::Context::current();
    let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
    match dao.list_facts(&cx, filter) {
        Ok((facts, total)) => {
            let mut summaries = Vec::with_capacity(facts.len());
            for f in facts {
                let subject_entity_name = dao
                    .get_entity_by_id(&cx, f.subject_entity_id)
                    .ok()
                    .flatten()
                    .map(|e| e.name);
                let object_entity_name = if let Some(oid) = f.object_entity_id {
                    dao.get_entity_by_id(&cx, oid)
                        .ok()
                        .flatten()
                        .map(|e| e.name)
                } else {
                    None
                };
                summaries.push(FactSummary {
                    id: f.id,
                    subject_entity_id: f.subject_entity_id,
                    subject_entity_name,
                    predicate: f.predicate,
                    object_entity_id: f.object_entity_id,
                    object_entity_name,
                    object_value: f.object_value,
                    confidence: f.confidence,
                    status: f.status,
                    source_photo: f.source_photo,
                    source_insight_id: f.source_insight_id,
                    created_at: f.created_at,
                });
            }
            HttpResponse::Ok().json(FactListResponse {
                facts: summaries,
                total,
                limit,
                offset,
            })
        }
        Err(e) => {
            log::error!("list_facts error: {:?}", e);
            HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
        }
    }
}

async fn create_fact<D: KnowledgeDao>(
    req: HttpRequest,
    claims: Claims,
    body: web::Json<FactCreateRequest>,
    dao: web::Data<Mutex<D>>,
    persona_dao: PersonaDaoData,
) -> impl Responder {
    if body.object_entity_id.is_none() && body.object_value.is_none() {
        return HttpResponse::BadRequest().json(serde_json::json!({
            "error": "object_entity_id or object_value is required"
        }));
    }
    if body.predicate.trim().is_empty() {
        return HttpResponse::BadRequest()
            .json(serde_json::json!({"error": "predicate must not be empty"}));
    }

    // Persona scoping: facts are written under the active single persona.
    // PersonaFilter::All is read-only (the "hive-mind" view); callers should
    // pin a specific persona for writes via X-Persona-Id.
    let persona = resolve_persona_filter(&req, &claims, &persona_dao);
    let (user_id, persona_id) = match &persona {
        PersonaFilter::Single {
            user_id,
            persona_id,
        } => (*user_id, persona_id.clone()),
        PersonaFilter::All { user_id } => (*user_id, "default".to_string()),
    };

    let cx = opentelemetry::Context::current();
    let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");

    // Verify subject entity exists.
    match dao.get_entity_by_id(&cx, body.subject_entity_id) {
        Ok(None) => {
            return HttpResponse::BadRequest().json(serde_json::json!({
                "error": format!("Subject entity {} not found", body.subject_entity_id)
            }));
        }
        Err(e) => {
            log::error!("create_fact subject lookup error: {:?}", e);
            return HttpResponse::InternalServerError()
                .json(serde_json::json!({"error": "Database error"}));
        }
        Ok(Some(_)) => {}
    }

    // Optional object entity validation when supplied.
    if let Some(oid) = body.object_entity_id {
        match dao.get_entity_by_id(&cx, oid) {
            Ok(None) => {
                return HttpResponse::BadRequest().json(serde_json::json!({
                    "error": format!("Object entity {} not found", oid)
                }));
            }
            Err(e) => {
                log::error!("create_fact object lookup error: {:?}", e);
                return HttpResponse::InternalServerError()
                    .json(serde_json::json!({"error": "Database error"}));
            }
            Ok(Some(_)) => {}
        }
    }

    let now = Utc::now().timestamp();
    let confidence = body.confidence.unwrap_or(0.6).clamp(0.0, 0.95);

    let insert = InsertEntityFact {
        subject_entity_id: body.subject_entity_id,
        predicate: body.predicate.trim().to_string(),
        object_entity_id: body.object_entity_id,
        object_value: body.object_value.clone(),
        source_photo: body.source_photo.clone(),
        source_insight_id: None,
        confidence,
        status: "active".to_string(),
        created_at: now,
        persona_id,
        user_id,
        valid_from: body.valid_from,
        valid_until: body.valid_until,
        superseded_by: None,
        // Manual creation via the curation UI — provenance recorded as
        // "manual" with no model, distinguishing user-entered facts
        // from agent-generated ones in the audit view.
        created_by_model: None,
        created_by_backend: Some("manual".to_string()),
        last_modified_by_model: None,
        last_modified_by_backend: None,
        last_modified_at: None,
    };

    match dao.upsert_fact(&cx, insert) {
        Ok((fact, is_new)) => {
            let status = if is_new {
                actix_web::http::StatusCode::CREATED
            } else {
                actix_web::http::StatusCode::OK
            };
            HttpResponse::build(status).json(fact)
        }
        Err(e) => {
            log::error!("create_fact upsert error: {:?}", e);
            HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
        }
    }
}

async fn patch_fact<D: KnowledgeDao>(
    _claims: Claims,
    id: web::Path<i32>,
    body: web::Json<FactPatchRequest>,
    dao: web::Data<Mutex<D>>,
) -> impl Responder {
    let cx = opentelemetry::Context::current();
    let fact_id = id.into_inner();

    let patch = FactPatch {
        predicate: body.predicate.clone(),
        object_value: body.object_value.clone(),
        status: body.status.clone(),
        confidence: body.confidence,
        valid_from: body.valid_from,
        valid_until: body.valid_until,
    };

    let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
    // Manual PATCH from the curation UI — provenance stamped as
    // "manual" so the audit feed can distinguish human edits from
    // agent corrections.
    match dao.update_fact(&cx, fact_id, patch, Some(("manual", "manual"))) {
        Ok(Some(fact)) => HttpResponse::Ok().json(fact),
        Ok(None) => HttpResponse::NotFound().json(serde_json::json!({"error": "Fact not found"})),
        Err(e) => {
            log::error!("patch_fact error: {:?}", e);
            HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
        }
    }
}

async fn delete_fact<D: KnowledgeDao>(
    _claims: Claims,
    id: web::Path<i32>,
    dao: web::Data<Mutex<D>>,
) -> impl Responder {
    let cx = opentelemetry::Context::current();
    let fact_id = id.into_inner();
    let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
    match dao.delete_fact(&cx, fact_id) {
        Ok(()) => HttpResponse::NoContent().finish(),
        Err(e) => {
            log::warn!("delete_fact({}) error: {:?}", fact_id, e);
            HttpResponse::NotFound().json(serde_json::json!({"error": "Fact not found"}))
        }
    }
}

async fn supersede_fact<D: KnowledgeDao>(
    _claims: Claims,
    id: web::Path<i32>,
    body: web::Json<SupersedeRequest>,
    dao: web::Data<Mutex<D>>,
) -> impl Responder {
    let cx = opentelemetry::Context::current();
    let old_id = id.into_inner();
    if old_id == body.by_fact_id {
        return HttpResponse::BadRequest()
            .json(serde_json::json!({"error": "old_id and by_fact_id must differ"}));
    }

    let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
    // Manual supersede from the curation UI — same stamping rule as
    // the PATCH path.
    match dao.supersede_fact(&cx, old_id, body.by_fact_id, Some(("manual", "manual"))) {
        Ok(Some(fact)) => HttpResponse::Ok().json(fact),
        Ok(None) => {
            HttpResponse::NotFound().json(serde_json::json!({"error": "Old or new fact not found"}))
        }
        Err(e) => {
            log::error!("supersede_fact error: {:?}", e);
            HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
        }
    }
}

async fn restore_fact<D: KnowledgeDao>(
    _claims: Claims,
    id: web::Path<i32>,
    dao: web::Data<Mutex<D>>,
) -> impl Responder {
    let cx = opentelemetry::Context::current();
    let fact_id = id.into_inner();
    let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
    match dao.revert_supersession(&cx, fact_id, Some(("manual", "manual"))) {
        Ok(Some(fact)) => HttpResponse::Ok().json(fact),
        Ok(None) => HttpResponse::NotFound().json(serde_json::json!({"error": "Fact not found"})),
        Err(e) => {
            log::error!("restore_fact error: {:?}", e);
            HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
        }
    }
}

async fn get_recent<D: KnowledgeDao>(
    req: HttpRequest,
    claims: Claims,
    query: web::Query<RecentQuery>,
    dao: web::Data<Mutex<D>>,
    persona_dao: PersonaDaoData,
) -> impl Responder {
    let since = query
        .since
        .unwrap_or_else(|| Utc::now().timestamp() - 86400);
    let limit = query.limit.unwrap_or(20).min(100);
    let persona = resolve_persona_filter(&req, &claims, &persona_dao);

    let cx = opentelemetry::Context::current();
    let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
    match dao.get_recent_activity(&cx, since, limit, &persona) {
        Ok(RecentActivity { entities, facts }) => {
            let entity_summaries: Vec<EntitySummary> =
                entities.into_iter().map(EntitySummary::from).collect();
            HttpResponse::Ok().json(serde_json::json!({
                "entities": entity_summaries,
                "facts": facts
            }))
        }
        Err(e) => {
            log::error!("get_recent error: {:?}", e);
            HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
        }
    }
}

async fn get_consolidation_proposals<D: KnowledgeDao>(
    req: HttpRequest,
    claims: Claims,
    query: web::Query<ConsolidationQuery>,
    dao: web::Data<Mutex<D>>,
    persona_dao: PersonaDaoData,
) -> impl Responder {
    // Clamp the threshold so a curious client can't drag the cosine
    // floor to 0 and pull every entity into one giant cluster.
    let threshold = query.threshold.unwrap_or(0.85).clamp(0.5, 0.99);
    let max_groups = query.limit.unwrap_or(50).clamp(1, 200) as usize;
    let persona = resolve_persona_filter(&req, &claims, &persona_dao);

    let cx = opentelemetry::Context::current();
    let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");

    let groups: Vec<ConsolidationGroup> =
        match dao.find_consolidation_proposals(&cx, threshold, max_groups) {
            Ok(g) => g,
            Err(e) => {
                log::error!("find_consolidation_proposals: {:?}", e);
                return HttpResponse::InternalServerError()
                    .json(serde_json::json!({"error": "Database error"}));
            }
        };

    // Decorate with per-persona fact counts so the curation UI can
    // show "default 8 · journal 3" inline and the curator can pick
    // which entity is the strongest target.
    let entity_ids: Vec<i32> = groups
        .iter()
        .flat_map(|g| g.entities.iter().map(|e| e.id))
        .collect();
    let breakdowns = dao
        .get_persona_breakdowns_for_entities(&cx, &entity_ids, persona.user_id())
        .unwrap_or_default();

    let groups_view: Vec<ConsolidationGroupView> = groups
        .into_iter()
        .map(|g| ConsolidationGroupView {
            entities: g
                .entities
                .into_iter()
                .map(|e| {
                    let id = e.id;
                    let summary = EntitySummary::from(e);
                    match breakdowns.get(&id) {
                        Some(bd) => summary.with_persona_breakdown(bd.clone()),
                        None => summary,
                    }
                })
                .collect(),
            min_cosine: g.min_cosine,
            max_cosine: g.max_cosine,
        })
        .collect();

    HttpResponse::Ok().json(ConsolidationResponse {
        groups: groups_view,
    })
}
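
// Minimal illustrative tests for two serialization behaviours documented
// above: the tri-state `valid_from`/`valid_until` PATCH fields and the
// skip-serializing rules on `EntitySummary`. They only rely on types defined
// in this module plus serde_json, which the handlers already use; they are a
// sketch, not an exhaustive suite.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn fact_patch_valid_from_is_tri_state() {
        // Field omitted entirely -> None ("leave the column alone").
        let p: FactPatchRequest = serde_json::from_str(r#"{"status": "active"}"#).unwrap();
        assert_eq!(p.valid_from, None);

        // Field sent as JSON null -> Some(None) ("clear the column to NULL").
        let p: FactPatchRequest = serde_json::from_str(r#"{"valid_from": null}"#).unwrap();
        assert_eq!(p.valid_from, Some(None));

        // Field sent as a number -> Some(Some(ts)) ("set the column").
        let p: FactPatchRequest = serde_json::from_str(r#"{"valid_from": 1577836800}"#).unwrap();
        assert_eq!(p.valid_from, Some(Some(1_577_836_800)));
    }

    #[test]
    fn entity_summary_omits_unset_optional_fields() {
        let summary = EntitySummary {
            id: 1,
            name: "Alice".to_string(),
            entity_type: "person".to_string(),
            description: String::new(),
            confidence: 0.9,
            status: "active".to_string(),
            created_at: 0,
            updated_at: 0,
            fact_count: None,
            persona_breakdown: None,
        };
        let json = serde_json::to_value(&summary).unwrap();
        // The persona-scoped fields are skipped when the call site had no
        // scoping context (e.g. PATCH responses return the bare entity).
        assert!(json.get("fact_count").is_none());
        assert!(json.get("persona_breakdown").is_none());
    }
}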