Files
ImageApi/src/knowledge.rs
Cameron 191ccc0d77 feat: add entity-relationship knowledge memory to agentic insights
Implements persistent cross-photo knowledge memory so the agentic
insight loop can learn and recall facts about people, places, and
events across the photo collection.

Changes:
- photo_insights: drop UNIQUE(file_path) + INSERT OR REPLACE, replace
  with append-only rows + is_current flag for insight history retention
- New tables: entities, entity_facts, entity_photo_links with FK
  constraints and confidence scoring
- KnowledgeDao trait + SqliteKnowledgeDao with upsert, merge, and
  corroboration (confidence +0.1 on duplicate fact detection)
- Four new agent tools: recall_entities, recall_facts_for_photo,
  store_entity, store_fact (with object_entity_id FK support)
- Cameron entity auto-seeded with stable ID injected into system prompt
- Pre-run photo link clearing + post-loop source_insight_id backfill
- Audit REST API: GET/PATCH/DELETE /knowledge/entities/{id},
  POST /knowledge/entities/merge, GET/PATCH/DELETE /knowledge/facts/{id},
  GET /knowledge/recent

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-03 17:27:49 -04:00

568 lines
18 KiB
Rust

use actix_web::dev::{ServiceFactory, ServiceRequest};
use actix_web::{App, HttpResponse, Responder, web};
use chrono::Utc;
use serde::{Deserialize, Serialize};
use std::sync::Mutex;
use crate::data::Claims;
use crate::database::models::{Entity, EntityFact, EntityPhotoLink};
use crate::database::{
EntityFilter, EntityPatch, FactFilter, FactPatch, KnowledgeDao, RecentActivity,
};
// ---------------------------------------------------------------------------
// Request / Response types
// ---------------------------------------------------------------------------
/// Flat, serializable view of an [`Entity`] without its facts or photo links.
///
/// Returned by the entity list endpoint, by entity PATCH, and inside the
/// "recent activity" payload. Every field is copied verbatim from the
/// database row (see the `From<Entity>` impl).
#[derive(Serialize)]
pub struct EntitySummary {
pub id: i32,
pub name: String,
pub entity_type: String,
pub description: String,
pub confidence: f32,
pub status: String,
// NOTE(review): timestamps are presumably Unix epoch seconds — confirm against the DAO.
pub created_at: i64,
pub updated_at: i64,
}
impl From<Entity> for EntitySummary {
fn from(e: Entity) -> Self {
EntitySummary {
id: e.id,
name: e.name,
entity_type: e.entity_type,
description: e.description,
confidence: e.confidence,
status: e.status,
created_at: e.created_at,
updated_at: e.updated_at,
}
}
}
/// Response body of the entity list endpoint: one page of entities plus the
/// paging parameters that were actually applied.
#[derive(Serialize)]
pub struct EntityListResponse {
pub entities: Vec<EntitySummary>,
// Count reported by the DAO alongside the page.
// NOTE(review): presumably the number of rows matching the filter before paging — confirm.
pub total: i64,
// Effective limit/offset after defaults and capping, echoed back to the client.
pub limit: i64,
pub offset: i64,
}
/// A single fact as embedded in [`EntityDetailResponse`].
///
/// Carries no subject id; the fact is listed under the entity whose detail
/// is being returned.
#[derive(Serialize)]
pub struct FactDetail {
pub id: i32,
pub predicate: String,
pub object_entity_id: Option<i32>,
// Name resolved from `object_entity_id` by the handler; `None` when there is
// no object entity or the lookup did not yield one.
pub object_entity_name: Option<String>,
pub object_value: Option<String>,
pub confidence: f32,
pub status: String,
pub source_photo: Option<String>,
pub source_insight_id: Option<i32>,
pub created_at: i64,
}
/// Serializable view of an [`EntityPhotoLink`], exposing only the photo path
/// and the role recorded on the link.
#[derive(Serialize)]
pub struct PhotoLinkDetail {
pub file_path: String,
pub role: String,
}
impl From<EntityPhotoLink> for PhotoLinkDetail {
fn from(l: EntityPhotoLink) -> Self {
PhotoLinkDetail {
file_path: l.file_path,
role: l.role,
}
}
}
/// Response body of `GET /knowledge/entities/{id}`: the entity's own columns
/// plus the facts and photo links fetched for it.
#[derive(Serialize)]
pub struct EntityDetailResponse {
pub id: i32,
pub name: String,
pub entity_type: String,
pub description: String,
pub confidence: f32,
pub status: String,
pub created_at: i64,
pub updated_at: i64,
// Facts returned by the DAO for this entity, with object entity names resolved.
pub facts: Vec<FactDetail>,
// Photo links returned by the DAO for this entity.
pub photo_links: Vec<PhotoLinkDetail>,
}
/// One row of the fact list endpoint: the stored fact plus the names of its
/// subject and (optional) object entities resolved by the handler.
#[derive(Serialize)]
pub struct FactSummary {
pub id: i32,
pub subject_entity_id: i32,
// `None` when the subject entity lookup did not yield an entity.
pub subject_entity_name: Option<String>,
pub predicate: String,
pub object_entity_id: Option<i32>,
// `None` when there is no object entity or its lookup did not yield one.
pub object_entity_name: Option<String>,
pub object_value: Option<String>,
pub confidence: f32,
pub status: String,
pub source_photo: Option<String>,
pub source_insight_id: Option<i32>,
pub created_at: i64,
}
/// Response body of the fact list endpoint: one page of facts plus the
/// paging parameters that were actually applied.
#[derive(Serialize)]
pub struct FactListResponse {
pub facts: Vec<FactSummary>,
// Count reported by the DAO alongside the page.
// NOTE(review): presumably the number of rows matching the filter before paging — confirm.
pub total: i64,
// Effective limit/offset after defaults and capping, echoed back to the client.
pub limit: i64,
pub offset: i64,
}
/// JSON body of `POST /knowledge/entities/merge`.
///
/// The handler rejects the request when both ids are equal or when either
/// entity cannot be found.
#[derive(Deserialize)]
pub struct MergeRequest {
// Entity that is merged away (reported back as `deleted_entity_id`).
pub source_id: i32,
// Entity that survives the merge (reported back as `merged_entity_id`).
pub target_id: i32,
}
/// Response body of a successful entity merge.
#[derive(Serialize)]
pub struct MergeResponse {
// The request's `target_id`.
pub merged_entity_id: i32,
// The request's `source_id`.
pub deleted_entity_id: i32,
// Counts returned by the DAO's merge operation.
pub facts_transferred: i64,
pub links_transferred: i64,
}
/// JSON body of `PATCH /knowledge/entities/{id}`; forwarded field-for-field
/// to the DAO as an `EntityPatch`.
///
/// Every field is optional.
// NOTE(review): omitted fields are presumably left unchanged by the DAO — confirm.
#[derive(Deserialize)]
pub struct EntityPatchRequest {
pub name: Option<String>,
pub description: Option<String>,
pub status: Option<String>,
pub confidence: Option<f32>,
}
/// JSON body of `PATCH /knowledge/facts/{id}`; forwarded field-for-field to
/// the DAO as a `FactPatch`.
///
/// Every field is optional.
// NOTE(review): omitted fields are presumably left unchanged by the DAO — confirm.
#[derive(Deserialize)]
pub struct FactPatchRequest {
pub predicate: Option<String>,
pub object_value: Option<String>,
pub status: Option<String>,
pub confidence: Option<f32>,
}
/// Query-string parameters accepted by the entity list endpoint.
#[derive(Deserialize)]
pub struct EntityListQuery {
// Exposed as `?type=...` because `type` is a Rust keyword.
#[serde(rename = "type")]
pub entity_type: Option<String>,
// Omitted or "active" -> only active entities; "all" -> no status filter;
// any other value is used as the status filter as-is.
pub status: Option<String>,
// Passed through to the DAO filter unchanged.
pub search: Option<String>,
// Defaults to 50 and is capped at 200 by the handler.
pub limit: Option<i64>,
// Defaults to 0.
pub offset: Option<i64>,
}
/// Query-string parameters accepted by the fact list endpoint.
#[derive(Deserialize)]
pub struct FactListQuery {
// Passed through to the DAO filter unchanged.
pub entity_id: Option<i32>,
// Omitted or "active" -> only active facts; "all" -> no status filter;
// any other value is used as the status filter as-is.
pub status: Option<String>,
// Passed through to the DAO filter unchanged.
pub predicate: Option<String>,
// Defaults to 50 and is capped at 200 by the handler.
pub limit: Option<i64>,
// Defaults to 0.
pub offset: Option<i64>,
}
/// Query-string parameters accepted by `GET /knowledge/recent`.
#[derive(Deserialize)]
pub struct RecentQuery {
// Lower time bound handed to the DAO; when omitted the handler uses
// "current Unix timestamp minus 86400" (i.e. the last 24 hours).
pub since: Option<i64>,
// Defaults to 20 and is capped at 100 by the handler.
pub limit: Option<i64>,
}
// ---------------------------------------------------------------------------
// Service registration
// ---------------------------------------------------------------------------
/// Mounts the knowledge audit API on `app` under the `/knowledge` scope.
///
/// `D` is the concrete [`KnowledgeDao`] the handlers expect to find in the
/// application data as `web::Data<Mutex<D>>`.
///
/// Registered routes:
/// - `GET    /knowledge/entities`
/// - `POST   /knowledge/entities/merge`
/// - `GET | PATCH | DELETE /knowledge/entities/{id}`
/// - `GET    /knowledge/facts`
/// - `PATCH | DELETE /knowledge/facts/{id}`
/// - `GET    /knowledge/recent`
pub fn add_knowledge_services<T, D: KnowledgeDao + 'static>(app: App<T>) -> App<T>
where
    T: ServiceFactory<ServiceRequest, Config = (), Error = actix_web::Error, InitError = ()>,
{
    let entity_collection = web::resource("/entities").route(web::get().to(list_entities::<D>));
    let entity_merge =
        web::resource("/entities/merge").route(web::post().to(merge_entities::<D>));
    let entity_item = web::resource("/entities/{id}")
        .route(web::get().to(get_entity::<D>))
        .route(web::patch().to(patch_entity::<D>))
        .route(web::delete().to(delete_entity::<D>));

    let fact_collection = web::resource("/facts").route(web::get().to(list_facts::<D>));
    let fact_item = web::resource("/facts/{id}")
        .route(web::patch().to(patch_fact::<D>))
        .route(web::delete().to(delete_fact::<D>));

    let recent = web::resource("/recent").route(web::get().to(get_recent::<D>));

    // Registration order is kept as-is: the literal `/entities/merge` resource
    // is added before the `/entities/{id}` pattern.
    let knowledge_scope = web::scope("/knowledge")
        .service(entity_collection)
        .service(entity_merge)
        .service(entity_item)
        .service(fact_collection)
        .service(fact_item)
        .service(recent);

    app.service(knowledge_scope)
}
// ---------------------------------------------------------------------------
// Handlers
// ---------------------------------------------------------------------------
async fn list_entities<D: KnowledgeDao + 'static>(
_claims: Claims,
query: web::Query<EntityListQuery>,
dao: web::Data<Mutex<D>>,
) -> impl Responder {
let limit = query.limit.unwrap_or(50).min(200);
let offset = query.offset.unwrap_or(0);
let status_filter = match query.status.as_deref() {
None | Some("active") => Some("active".to_string()),
Some("all") => None,
Some(s) => Some(s.to_string()),
};
let filter = EntityFilter {
entity_type: query.entity_type.clone(),
status: status_filter,
search: query.search.clone(),
limit,
offset,
};
let cx = opentelemetry::Context::current();
let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
match dao.list_entities(&cx, filter) {
Ok((entities, total)) => {
let summaries: Vec<EntitySummary> =
entities.into_iter().map(EntitySummary::from).collect();
HttpResponse::Ok().json(EntityListResponse {
entities: summaries,
total,
limit,
offset,
})
}
Err(e) => {
log::error!("list_entities error: {:?}", e);
HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
}
}
}
/// `GET /knowledge/entities/{id}` — returns an entity with its facts and
/// photo links.
///
/// Responds with:
/// - `200` and an [`EntityDetailResponse`] on success,
/// - `404` when no entity has the given id,
/// - `500` when loading the entity, its facts, or its photo links fails.
///
/// Resolving the *name* of a fact's object entity is best-effort: a failed
/// lookup is logged and the name is reported as `null` rather than failing
/// the whole request.
async fn get_entity<D: KnowledgeDao + 'static>(
    _claims: Claims,
    id: web::Path<i32>,
    dao: web::Data<Mutex<D>>,
) -> impl Responder {
    let cx = opentelemetry::Context::current();
    let entity_id = id.into_inner();
    let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");

    let entity = match dao.get_entity_by_id(&cx, entity_id) {
        Ok(Some(e)) => e,
        Ok(None) => {
            return HttpResponse::NotFound().json(serde_json::json!({"error": "Entity not found"}));
        }
        Err(e) => {
            log::error!("get_entity error: {:?}", e);
            return HttpResponse::InternalServerError()
                .json(serde_json::json!({"error": "Database error"}));
        }
    };

    // Fetch all facts (all statuses for audit)
    let raw_facts: Vec<EntityFact> = match dao.get_facts_for_entity(&cx, entity_id) {
        Ok(f) => f,
        Err(e) => {
            log::error!("get_facts_for_entity error: {:?}", e);
            return HttpResponse::InternalServerError()
                .json(serde_json::json!({"error": "Database error"}));
        }
    };

    // Resolve object entity names. Several facts frequently point at the same
    // object entity, so each id is looked up at most once per request. The
    // entity being displayed is already loaded, so seed the cache with it.
    let mut name_cache: std::collections::HashMap<i32, Option<String>> =
        std::collections::HashMap::new();
    name_cache.insert(entity_id, Some(entity.name.clone()));

    let mut facts = Vec::with_capacity(raw_facts.len());
    for f in raw_facts {
        let object_entity_name = match f.object_entity_id {
            None => None,
            Some(oid) => match name_cache.get(&oid).cloned() {
                Some(cached) => cached,
                None => {
                    let resolved = match dao.get_entity_by_id(&cx, oid) {
                        Ok(found) => found.map(|e| e.name),
                        Err(e) => {
                            // Best-effort: keep serving the entity, but do not
                            // hide the failure from the logs.
                            log::warn!(
                                "get_entity: could not resolve object entity {}: {:?}",
                                oid,
                                e
                            );
                            None
                        }
                    };
                    name_cache.insert(oid, resolved.clone());
                    resolved
                }
            },
        };
        facts.push(FactDetail {
            id: f.id,
            predicate: f.predicate,
            object_entity_id: f.object_entity_id,
            object_entity_name,
            object_value: f.object_value,
            confidence: f.confidence,
            status: f.status,
            source_photo: f.source_photo,
            source_insight_id: f.source_insight_id,
            created_at: f.created_at,
        });
    }

    // Fetch photo links
    let photo_links: Vec<PhotoLinkDetail> = match dao.get_links_for_entity(&cx, entity_id) {
        Ok(links) => links.into_iter().map(PhotoLinkDetail::from).collect(),
        Err(e) => {
            log::error!("get_links_for_entity error: {:?}", e);
            return HttpResponse::InternalServerError()
                .json(serde_json::json!({"error": "Database error"}));
        }
    };

    HttpResponse::Ok().json(EntityDetailResponse {
        id: entity.id,
        name: entity.name,
        entity_type: entity.entity_type,
        description: entity.description,
        confidence: entity.confidence,
        status: entity.status,
        created_at: entity.created_at,
        updated_at: entity.updated_at,
        facts,
        photo_links,
    })
}
async fn patch_entity<D: KnowledgeDao + 'static>(
_claims: Claims,
id: web::Path<i32>,
body: web::Json<EntityPatchRequest>,
dao: web::Data<Mutex<D>>,
) -> impl Responder {
let cx = opentelemetry::Context::current();
let entity_id = id.into_inner();
let patch = EntityPatch {
name: body.name.clone(),
description: body.description.clone(),
status: body.status.clone(),
confidence: body.confidence,
};
let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
match dao.update_entity(&cx, entity_id, patch) {
Ok(Some(entity)) => HttpResponse::Ok().json(EntitySummary::from(entity)),
Ok(None) => HttpResponse::NotFound().json(serde_json::json!({"error": "Entity not found"})),
Err(e) => {
log::error!("patch_entity error: {:?}", e);
HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
}
}
}
/// `DELETE /knowledge/entities/{id}` — deletes an entity.
///
/// The entity is looked up first so a missing id yields `404` instead of a
/// silent success. Responds with `204` on success and `500` when either the
/// lookup or the delete fails.
async fn delete_entity<D: KnowledgeDao + 'static>(
    _claims: Claims,
    id: web::Path<i32>,
    dao: web::Data<Mutex<D>>,
) -> impl Responder {
    let cx = opentelemetry::Context::current();
    let entity_id = id.into_inner();
    let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");

    // Existence check and delete both run under the same lock guard.
    let exists = match dao.get_entity_by_id(&cx, entity_id) {
        Ok(found) => found.is_some(),
        Err(e) => {
            log::error!("delete_entity lookup error: {:?}", e);
            return HttpResponse::InternalServerError()
                .json(serde_json::json!({"error": "Database error"}));
        }
    };
    if !exists {
        return HttpResponse::NotFound().json(serde_json::json!({"error": "Entity not found"}));
    }

    if let Err(e) = dao.delete_entity(&cx, entity_id) {
        log::error!("delete_entity error: {:?}", e);
        return HttpResponse::InternalServerError()
            .json(serde_json::json!({"error": "Database error"}));
    }
    HttpResponse::NoContent().finish()
}
async fn merge_entities<D: KnowledgeDao + 'static>(
_claims: Claims,
body: web::Json<MergeRequest>,
dao: web::Data<Mutex<D>>,
) -> impl Responder {
if body.source_id == body.target_id {
return HttpResponse::BadRequest()
.json(serde_json::json!({"error": "source_id and target_id must be different"}));
}
let cx = opentelemetry::Context::current();
let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
// Verify both entities exist
for id in [body.source_id, body.target_id] {
match dao.get_entity_by_id(&cx, id) {
Ok(None) => {
return HttpResponse::BadRequest()
.json(serde_json::json!({"error": format!("Entity {} not found", id)}));
}
Err(e) => {
log::error!("merge_entities lookup error: {:?}", e);
return HttpResponse::InternalServerError()
.json(serde_json::json!({"error": "Database error"}));
}
Ok(Some(_)) => {}
}
}
match dao.merge_entities(&cx, body.source_id, body.target_id) {
Ok((facts_transferred, links_transferred)) => HttpResponse::Ok().json(MergeResponse {
merged_entity_id: body.target_id,
deleted_entity_id: body.source_id,
facts_transferred,
links_transferred,
}),
Err(e) => {
log::error!("merge_entities error: {:?}", e);
HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
}
}
}
async fn list_facts<D: KnowledgeDao + 'static>(
_claims: Claims,
query: web::Query<FactListQuery>,
dao: web::Data<Mutex<D>>,
) -> impl Responder {
let limit = query.limit.unwrap_or(50).min(200);
let offset = query.offset.unwrap_or(0);
let status_filter = match query.status.as_deref() {
None | Some("active") => Some("active".to_string()),
Some("all") => None,
Some(s) => Some(s.to_string()),
};
let filter = FactFilter {
entity_id: query.entity_id,
status: status_filter,
predicate: query.predicate.clone(),
limit,
offset,
};
let cx = opentelemetry::Context::current();
let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
match dao.list_facts(&cx, filter) {
Ok((facts, total)) => {
let mut summaries = Vec::with_capacity(facts.len());
for f in facts {
let subject_entity_name = dao
.get_entity_by_id(&cx, f.subject_entity_id)
.ok()
.flatten()
.map(|e| e.name);
let object_entity_name = if let Some(oid) = f.object_entity_id {
dao.get_entity_by_id(&cx, oid)
.ok()
.flatten()
.map(|e| e.name)
} else {
None
};
summaries.push(FactSummary {
id: f.id,
subject_entity_id: f.subject_entity_id,
subject_entity_name,
predicate: f.predicate,
object_entity_id: f.object_entity_id,
object_entity_name,
object_value: f.object_value,
confidence: f.confidence,
status: f.status,
source_photo: f.source_photo,
source_insight_id: f.source_insight_id,
created_at: f.created_at,
});
}
HttpResponse::Ok().json(FactListResponse {
facts: summaries,
total,
limit,
offset,
})
}
Err(e) => {
log::error!("list_facts error: {:?}", e);
HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
}
}
}
async fn patch_fact<D: KnowledgeDao + 'static>(
_claims: Claims,
id: web::Path<i32>,
body: web::Json<FactPatchRequest>,
dao: web::Data<Mutex<D>>,
) -> impl Responder {
let cx = opentelemetry::Context::current();
let fact_id = id.into_inner();
let patch = FactPatch {
predicate: body.predicate.clone(),
object_value: body.object_value.clone(),
status: body.status.clone(),
confidence: body.confidence,
};
let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
match dao.update_fact(&cx, fact_id, patch) {
Ok(Some(fact)) => HttpResponse::Ok().json(fact),
Ok(None) => HttpResponse::NotFound().json(serde_json::json!({"error": "Fact not found"})),
Err(e) => {
log::error!("patch_fact error: {:?}", e);
HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
}
}
}
async fn delete_fact<D: KnowledgeDao + 'static>(
_claims: Claims,
id: web::Path<i32>,
dao: web::Data<Mutex<D>>,
) -> impl Responder {
let cx = opentelemetry::Context::current();
let fact_id = id.into_inner();
let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
match dao.delete_fact(&cx, fact_id) {
Ok(()) => HttpResponse::NoContent().finish(),
Err(e) => {
log::warn!("delete_fact({}) error: {:?}", fact_id, e);
HttpResponse::NotFound().json(serde_json::json!({"error": "Fact not found"}))
}
}
}
/// `GET /knowledge/recent` — returns recently recorded entities and facts.
///
/// Query parameters (see [`RecentQuery`]):
/// - `since`: lower time bound handed to the DAO; defaults to the current
///   Unix timestamp minus 86400 seconds (the last 24 hours).
/// - `limit`: defaults to 20, clamped to `0..=100`.
///
/// Responds with `200` and `{"entities": [...], "facts": [...]}`, or `500`
/// when the DAO call fails.
async fn get_recent<D: KnowledgeDao + 'static>(
    _claims: Claims,
    query: web::Query<RecentQuery>,
    dao: web::Data<Mutex<D>>,
) -> impl Responder {
    let since = query
        .since
        .unwrap_or_else(|| Utc::now().timestamp() - 86400);
    // Clamp from both sides: a negative LIMIT would otherwise reach the
    // database untouched (SQLite, for one, treats it as "no limit") and
    // bypass the cap.
    let limit = query.limit.unwrap_or(20).clamp(0, 100);

    let cx = opentelemetry::Context::current();
    let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
    match dao.get_recent_activity(&cx, since, limit) {
        Ok(RecentActivity { entities, facts }) => {
            let entity_summaries: Vec<EntitySummary> =
                entities.into_iter().map(EntitySummary::from).collect();
            HttpResponse::Ok().json(serde_json::json!({
                "entities": entity_summaries,
                "facts": facts
            }))
        }
        Err(e) => {
            log::error!("get_recent error: {:?}", e);
            HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
        }
    }
}