use actix_web::dev::{ServiceFactory, ServiceRequest}; use actix_web::{App, HttpRequest, HttpResponse, Responder, web}; use chrono::Utc; use serde::{Deserialize, Serialize}; use std::sync::Mutex; use crate::data::Claims; use crate::database::models::{Entity, EntityFact, EntityPhotoLink, InsertEntityFact}; use crate::database::{ EntityFilter, EntityPatch, FactFilter, FactPatch, KnowledgeDao, PersonaFilter, RecentActivity, }; use crate::personas::PersonaDaoData; /// Resolve the `X-Persona-Id` header into a `PersonaFilter`. Missing /// header → `'default'`. If the persona has `include_all_memories=true`, /// returns `PersonaFilter::All` so reads see the full hive-mind pool. /// On JWT-parse failure (sub is not a numeric user_id) the resolver /// falls through to user_id=1 — the operator convention for service /// tokens — preserving the historical baseline view. Same fallback /// applies on any persona-lookup error. fn resolve_persona_filter( req: &HttpRequest, claims: &Claims, persona_dao: &PersonaDaoData, ) -> PersonaFilter { let pid = req .headers() .get("X-Persona-Id") .and_then(|v| v.to_str().ok()) .map(|s| s.to_string()) .unwrap_or_else(|| "default".to_string()); let uid = claims.sub.parse::().unwrap_or(1); let cx = opentelemetry::Context::current(); let mut dao = persona_dao.lock().expect("Unable to lock PersonaDao"); match dao.get_persona(&cx, uid, &pid) { Ok(Some(p)) if p.include_all_memories => PersonaFilter::All { user_id: uid }, _ => PersonaFilter::Single { user_id: uid, persona_id: pid, }, } } // --------------------------------------------------------------------------- // Request / Response types // --------------------------------------------------------------------------- #[derive(Serialize)] pub struct EntitySummary { pub id: i32, pub name: String, pub entity_type: String, pub description: String, pub confidence: f32, pub status: String, pub created_at: i64, pub updated_at: i64, } impl From for EntitySummary { fn from(e: Entity) -> Self { EntitySummary { id: e.id, name: e.name, entity_type: e.entity_type, description: e.description, confidence: e.confidence, status: e.status, created_at: e.created_at, updated_at: e.updated_at, } } } #[derive(Serialize)] pub struct EntityListResponse { pub entities: Vec, pub total: i64, pub limit: i64, pub offset: i64, } #[derive(Serialize)] pub struct FactDetail { pub id: i32, pub predicate: String, pub object_entity_id: Option, pub object_entity_name: Option, pub object_value: Option, pub confidence: f32, pub status: String, pub source_photo: Option, pub source_insight_id: Option, pub created_at: i64, } #[derive(Serialize)] pub struct PhotoLinkDetail { pub file_path: String, pub role: String, } impl From for PhotoLinkDetail { fn from(l: EntityPhotoLink) -> Self { PhotoLinkDetail { file_path: l.file_path, role: l.role, } } } #[derive(Serialize)] pub struct EntityDetailResponse { pub id: i32, pub name: String, pub entity_type: String, pub description: String, pub confidence: f32, pub status: String, pub created_at: i64, pub updated_at: i64, pub facts: Vec, pub photo_links: Vec, } #[derive(Serialize)] pub struct FactSummary { pub id: i32, pub subject_entity_id: i32, pub subject_entity_name: Option, pub predicate: String, pub object_entity_id: Option, pub object_entity_name: Option, pub object_value: Option, pub confidence: f32, pub status: String, pub source_photo: Option, pub source_insight_id: Option, pub created_at: i64, } #[derive(Serialize)] pub struct FactListResponse { pub facts: Vec, pub total: i64, pub limit: i64, pub offset: i64, } #[derive(Deserialize)] pub struct MergeRequest { pub source_id: i32, pub target_id: i32, } #[derive(Serialize)] pub struct MergeResponse { pub merged_entity_id: i32, pub deleted_entity_id: i32, pub facts_transferred: i64, pub links_transferred: i64, } #[derive(Deserialize)] pub struct EntityPatchRequest { pub name: Option, pub description: Option, pub status: Option, pub confidence: Option, } #[derive(Deserialize)] pub struct FactPatchRequest { pub predicate: Option, pub object_value: Option, pub status: Option, pub confidence: Option, } #[derive(Deserialize)] pub struct FactCreateRequest { pub subject_entity_id: i32, pub predicate: String, pub object_entity_id: Option, pub object_value: Option, pub source_photo: Option, pub confidence: Option, } #[derive(Deserialize)] pub struct EntityListQuery { #[serde(rename = "type")] pub entity_type: Option, pub status: Option, pub search: Option, pub limit: Option, pub offset: Option, } #[derive(Deserialize)] pub struct FactListQuery { pub entity_id: Option, pub status: Option, pub predicate: Option, pub limit: Option, pub offset: Option, } #[derive(Deserialize)] pub struct RecentQuery { pub since: Option, pub limit: Option, } // --------------------------------------------------------------------------- // Service registration // --------------------------------------------------------------------------- pub fn add_knowledge_services(app: App) -> App where T: ServiceFactory, { app.service( web::scope("/knowledge") .service(web::resource("/entities").route(web::get().to(list_entities::))) .service(web::resource("/entities/merge").route(web::post().to(merge_entities::))) .service( web::resource("/entities/{id}") .route(web::get().to(get_entity::)) .route(web::patch().to(patch_entity::)) .route(web::delete().to(delete_entity::)), ) .service( web::resource("/facts") .route(web::get().to(list_facts::)) .route(web::post().to(create_fact::)), ) .service( web::resource("/facts/{id}") .route(web::patch().to(patch_fact::)) .route(web::delete().to(delete_fact::)), ) .service(web::resource("/recent").route(web::get().to(get_recent::))), ) } // --------------------------------------------------------------------------- // Handlers // --------------------------------------------------------------------------- async fn list_entities( _claims: Claims, query: web::Query, dao: web::Data>, ) -> impl Responder { let limit = query.limit.unwrap_or(50).min(200); let offset = query.offset.unwrap_or(0); let status_filter = match query.status.as_deref() { None | Some("active") => Some("active".to_string()), Some("all") => None, Some(s) => Some(s.to_string()), }; let filter = EntityFilter { entity_type: query.entity_type.clone(), status: status_filter, search: query.search.clone(), limit, offset, }; let cx = opentelemetry::Context::current(); let mut dao = dao.lock().expect("Unable to lock KnowledgeDao"); match dao.list_entities(&cx, filter) { Ok((entities, total)) => { let summaries: Vec = entities.into_iter().map(EntitySummary::from).collect(); HttpResponse::Ok().json(EntityListResponse { entities: summaries, total, limit, offset, }) } Err(e) => { log::error!("list_entities error: {:?}", e); HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"})) } } } async fn get_entity( req: HttpRequest, claims: Claims, id: web::Path, dao: web::Data>, persona_dao: PersonaDaoData, ) -> impl Responder { let persona = resolve_persona_filter(&req, &claims, &persona_dao); let cx = opentelemetry::Context::current(); let entity_id = id.into_inner(); let mut dao = dao.lock().expect("Unable to lock KnowledgeDao"); let entity = match dao.get_entity_by_id(&cx, entity_id) { Ok(Some(e)) => e, Ok(None) => { return HttpResponse::NotFound().json(serde_json::json!({"error": "Entity not found"})); } Err(e) => { log::error!("get_entity error: {:?}", e); return HttpResponse::InternalServerError() .json(serde_json::json!({"error": "Database error"})); } }; // Fetch all facts (all statuses for audit), scoped to the active persona. let raw_facts: Vec = match dao.get_facts_for_entity(&cx, entity_id, &persona) { Ok(f) => f, Err(e) => { log::error!("get_facts_for_entity error: {:?}", e); return HttpResponse::InternalServerError() .json(serde_json::json!({"error": "Database error"})); } }; // Resolve object entity names let mut facts = Vec::with_capacity(raw_facts.len()); for f in raw_facts { let object_entity_name = if let Some(oid) = f.object_entity_id { dao.get_entity_by_id(&cx, oid) .ok() .flatten() .map(|e| e.name) } else { None }; facts.push(FactDetail { id: f.id, predicate: f.predicate, object_entity_id: f.object_entity_id, object_entity_name, object_value: f.object_value, confidence: f.confidence, status: f.status, source_photo: f.source_photo, source_insight_id: f.source_insight_id, created_at: f.created_at, }); } // Fetch photo links let photo_links: Vec = match dao.get_links_for_entity(&cx, entity_id) { Ok(links) => links.into_iter().map(PhotoLinkDetail::from).collect(), Err(e) => { log::error!("get_links_for_entity error: {:?}", e); return HttpResponse::InternalServerError() .json(serde_json::json!({"error": "Database error"})); } }; HttpResponse::Ok().json(EntityDetailResponse { id: entity.id, name: entity.name, entity_type: entity.entity_type, description: entity.description, confidence: entity.confidence, status: entity.status, created_at: entity.created_at, updated_at: entity.updated_at, facts, photo_links, }) } async fn patch_entity( _claims: Claims, id: web::Path, body: web::Json, dao: web::Data>, ) -> impl Responder { let cx = opentelemetry::Context::current(); let entity_id = id.into_inner(); let patch = EntityPatch { name: body.name.clone(), description: body.description.clone(), status: body.status.clone(), confidence: body.confidence, }; let mut dao = dao.lock().expect("Unable to lock KnowledgeDao"); match dao.update_entity(&cx, entity_id, patch) { Ok(Some(entity)) => HttpResponse::Ok().json(EntitySummary::from(entity)), Ok(None) => HttpResponse::NotFound().json(serde_json::json!({"error": "Entity not found"})), Err(e) => { log::error!("patch_entity error: {:?}", e); HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"})) } } } async fn delete_entity( _claims: Claims, id: web::Path, dao: web::Data>, ) -> impl Responder { let cx = opentelemetry::Context::current(); let entity_id = id.into_inner(); let mut dao = dao.lock().expect("Unable to lock KnowledgeDao"); // Verify entity exists before deleting match dao.get_entity_by_id(&cx, entity_id) { Ok(None) => { return HttpResponse::NotFound().json(serde_json::json!({"error": "Entity not found"})); } Err(e) => { log::error!("delete_entity lookup error: {:?}", e); return HttpResponse::InternalServerError() .json(serde_json::json!({"error": "Database error"})); } Ok(Some(_)) => {} } match dao.delete_entity(&cx, entity_id) { Ok(()) => HttpResponse::NoContent().finish(), Err(e) => { log::error!("delete_entity error: {:?}", e); HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"})) } } } async fn merge_entities( _claims: Claims, body: web::Json, dao: web::Data>, ) -> impl Responder { if body.source_id == body.target_id { return HttpResponse::BadRequest() .json(serde_json::json!({"error": "source_id and target_id must be different"})); } let cx = opentelemetry::Context::current(); let mut dao = dao.lock().expect("Unable to lock KnowledgeDao"); // Verify both entities exist for id in [body.source_id, body.target_id] { match dao.get_entity_by_id(&cx, id) { Ok(None) => { return HttpResponse::BadRequest() .json(serde_json::json!({"error": format!("Entity {} not found", id)})); } Err(e) => { log::error!("merge_entities lookup error: {:?}", e); return HttpResponse::InternalServerError() .json(serde_json::json!({"error": "Database error"})); } Ok(Some(_)) => {} } } match dao.merge_entities(&cx, body.source_id, body.target_id) { Ok((facts_transferred, links_transferred)) => HttpResponse::Ok().json(MergeResponse { merged_entity_id: body.target_id, deleted_entity_id: body.source_id, facts_transferred, links_transferred, }), Err(e) => { log::error!("merge_entities error: {:?}", e); HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"})) } } } async fn list_facts( req: HttpRequest, claims: Claims, query: web::Query, dao: web::Data>, persona_dao: PersonaDaoData, ) -> impl Responder { let limit = query.limit.unwrap_or(50).min(200); let offset = query.offset.unwrap_or(0); let status_filter = match query.status.as_deref() { None | Some("active") => Some("active".to_string()), Some("all") => None, Some(s) => Some(s.to_string()), }; let persona = resolve_persona_filter(&req, &claims, &persona_dao); let filter = FactFilter { entity_id: query.entity_id, status: status_filter, predicate: query.predicate.clone(), persona, limit, offset, }; let cx = opentelemetry::Context::current(); let mut dao = dao.lock().expect("Unable to lock KnowledgeDao"); match dao.list_facts(&cx, filter) { Ok((facts, total)) => { let mut summaries = Vec::with_capacity(facts.len()); for f in facts { let subject_entity_name = dao .get_entity_by_id(&cx, f.subject_entity_id) .ok() .flatten() .map(|e| e.name); let object_entity_name = if let Some(oid) = f.object_entity_id { dao.get_entity_by_id(&cx, oid) .ok() .flatten() .map(|e| e.name) } else { None }; summaries.push(FactSummary { id: f.id, subject_entity_id: f.subject_entity_id, subject_entity_name, predicate: f.predicate, object_entity_id: f.object_entity_id, object_entity_name, object_value: f.object_value, confidence: f.confidence, status: f.status, source_photo: f.source_photo, source_insight_id: f.source_insight_id, created_at: f.created_at, }); } HttpResponse::Ok().json(FactListResponse { facts: summaries, total, limit, offset, }) } Err(e) => { log::error!("list_facts error: {:?}", e); HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"})) } } } async fn create_fact( req: HttpRequest, claims: Claims, body: web::Json, dao: web::Data>, persona_dao: PersonaDaoData, ) -> impl Responder { if body.object_entity_id.is_none() && body.object_value.is_none() { return HttpResponse::BadRequest().json(serde_json::json!({ "error": "object_entity_id or object_value is required" })); } if body.predicate.trim().is_empty() { return HttpResponse::BadRequest() .json(serde_json::json!({"error": "predicate must not be empty"})); } // Persona scoping: facts are written under the active single persona. // PersonaFilter::All is read-only ("hive-mind" view); callers should // pin a specific persona for writes via X-Persona-Id. let persona = resolve_persona_filter(&req, &claims, &persona_dao); let (user_id, persona_id) = match &persona { PersonaFilter::Single { user_id, persona_id } => (*user_id, persona_id.clone()), PersonaFilter::All { user_id } => (*user_id, "default".to_string()), }; let cx = opentelemetry::Context::current(); let mut dao = dao.lock().expect("Unable to lock KnowledgeDao"); // Verify subject entity exists. match dao.get_entity_by_id(&cx, body.subject_entity_id) { Ok(None) => { return HttpResponse::BadRequest().json(serde_json::json!({ "error": format!("Subject entity {} not found", body.subject_entity_id) })); } Err(e) => { log::error!("create_fact subject lookup error: {:?}", e); return HttpResponse::InternalServerError() .json(serde_json::json!({"error": "Database error"})); } Ok(Some(_)) => {} } // Optional object entity validation when supplied. if let Some(oid) = body.object_entity_id { match dao.get_entity_by_id(&cx, oid) { Ok(None) => { return HttpResponse::BadRequest().json(serde_json::json!({ "error": format!("Object entity {} not found", oid) })); } Err(e) => { log::error!("create_fact object lookup error: {:?}", e); return HttpResponse::InternalServerError() .json(serde_json::json!({"error": "Database error"})); } Ok(Some(_)) => {} } } let now = Utc::now().timestamp(); let confidence = body.confidence.unwrap_or(0.6).clamp(0.0, 0.95); let insert = InsertEntityFact { subject_entity_id: body.subject_entity_id, predicate: body.predicate.trim().to_string(), object_entity_id: body.object_entity_id, object_value: body.object_value.clone(), source_photo: body.source_photo.clone(), source_insight_id: None, confidence, status: "active".to_string(), created_at: now, persona_id, user_id, }; match dao.upsert_fact(&cx, insert) { Ok((fact, is_new)) => { let status = if is_new { actix_web::http::StatusCode::CREATED } else { actix_web::http::StatusCode::OK }; HttpResponse::build(status).json(fact) } Err(e) => { log::error!("create_fact upsert error: {:?}", e); HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"})) } } } async fn patch_fact( _claims: Claims, id: web::Path, body: web::Json, dao: web::Data>, ) -> impl Responder { let cx = opentelemetry::Context::current(); let fact_id = id.into_inner(); let patch = FactPatch { predicate: body.predicate.clone(), object_value: body.object_value.clone(), status: body.status.clone(), confidence: body.confidence, }; let mut dao = dao.lock().expect("Unable to lock KnowledgeDao"); match dao.update_fact(&cx, fact_id, patch) { Ok(Some(fact)) => HttpResponse::Ok().json(fact), Ok(None) => HttpResponse::NotFound().json(serde_json::json!({"error": "Fact not found"})), Err(e) => { log::error!("patch_fact error: {:?}", e); HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"})) } } } async fn delete_fact( _claims: Claims, id: web::Path, dao: web::Data>, ) -> impl Responder { let cx = opentelemetry::Context::current(); let fact_id = id.into_inner(); let mut dao = dao.lock().expect("Unable to lock KnowledgeDao"); match dao.delete_fact(&cx, fact_id) { Ok(()) => HttpResponse::NoContent().finish(), Err(e) => { log::warn!("delete_fact({}) error: {:?}", fact_id, e); HttpResponse::NotFound().json(serde_json::json!({"error": "Fact not found"})) } } } async fn get_recent( req: HttpRequest, claims: Claims, query: web::Query, dao: web::Data>, persona_dao: PersonaDaoData, ) -> impl Responder { let since = query .since .unwrap_or_else(|| Utc::now().timestamp() - 86400); let limit = query.limit.unwrap_or(20).min(100); let persona = resolve_persona_filter(&req, &claims, &persona_dao); let cx = opentelemetry::Context::current(); let mut dao = dao.lock().expect("Unable to lock KnowledgeDao"); match dao.get_recent_activity(&cx, since, limit, &persona) { Ok(RecentActivity { entities, facts }) => { let entity_summaries: Vec = entities.into_iter().map(EntitySummary::from).collect(); HttpResponse::Ok().json(serde_json::json!({ "entities": entity_summaries, "facts": facts })) } Err(e) => { log::error!("get_recent error: {:?}", e); HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"})) } } }