feat: add entity-relationship knowledge memory to agentic insights
Implements persistent cross-photo knowledge memory so the agentic
insight loop can learn and recall facts about people, places, and
events across the photo collection.
Changes:
- photo_insights: drop UNIQUE(file_path) + INSERT OR REPLACE, replace
with append-only rows + is_current flag for insight history retention
- New tables: entities, entity_facts, entity_photo_links with FK
constraints and confidence scoring
- KnowledgeDao trait + SqliteKnowledgeDao with upsert, merge, and
corroboration (confidence +0.1 on duplicate fact detection)
- Four new agent tools: recall_entities, recall_facts_for_photo,
store_entity, store_fact (with object_entity_id FK support)
- Cameron entity auto-seeded with stable ID injected into system prompt
- Pre-run photo link clearing + post-loop source_insight_id backfill
- Audit REST API: GET/PATCH/DELETE /knowledge/entities/{id},
POST /knowledge/entities/merge, GET/PATCH/DELETE /knowledge/facts/{id},
GET /knowledge/recent
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
567
src/knowledge.rs
Normal file
567
src/knowledge.rs
Normal file
@@ -0,0 +1,567 @@
|
||||
use actix_web::dev::{ServiceFactory, ServiceRequest};
|
||||
use actix_web::{App, HttpResponse, Responder, web};
|
||||
use chrono::Utc;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::sync::Mutex;
|
||||
|
||||
use crate::data::Claims;
|
||||
use crate::database::models::{Entity, EntityFact, EntityPhotoLink};
|
||||
use crate::database::{
|
||||
EntityFilter, EntityPatch, FactFilter, FactPatch, KnowledgeDao, RecentActivity,
|
||||
};
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Request / Response types
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct EntitySummary {
|
||||
pub id: i32,
|
||||
pub name: String,
|
||||
pub entity_type: String,
|
||||
pub description: String,
|
||||
pub confidence: f32,
|
||||
pub status: String,
|
||||
pub created_at: i64,
|
||||
pub updated_at: i64,
|
||||
}
|
||||
|
||||
impl From<Entity> for EntitySummary {
|
||||
fn from(e: Entity) -> Self {
|
||||
EntitySummary {
|
||||
id: e.id,
|
||||
name: e.name,
|
||||
entity_type: e.entity_type,
|
||||
description: e.description,
|
||||
confidence: e.confidence,
|
||||
status: e.status,
|
||||
created_at: e.created_at,
|
||||
updated_at: e.updated_at,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct EntityListResponse {
|
||||
pub entities: Vec<EntitySummary>,
|
||||
pub total: i64,
|
||||
pub limit: i64,
|
||||
pub offset: i64,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct FactDetail {
|
||||
pub id: i32,
|
||||
pub predicate: String,
|
||||
pub object_entity_id: Option<i32>,
|
||||
pub object_entity_name: Option<String>,
|
||||
pub object_value: Option<String>,
|
||||
pub confidence: f32,
|
||||
pub status: String,
|
||||
pub source_photo: Option<String>,
|
||||
pub source_insight_id: Option<i32>,
|
||||
pub created_at: i64,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct PhotoLinkDetail {
|
||||
pub file_path: String,
|
||||
pub role: String,
|
||||
}
|
||||
|
||||
impl From<EntityPhotoLink> for PhotoLinkDetail {
|
||||
fn from(l: EntityPhotoLink) -> Self {
|
||||
PhotoLinkDetail {
|
||||
file_path: l.file_path,
|
||||
role: l.role,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct EntityDetailResponse {
|
||||
pub id: i32,
|
||||
pub name: String,
|
||||
pub entity_type: String,
|
||||
pub description: String,
|
||||
pub confidence: f32,
|
||||
pub status: String,
|
||||
pub created_at: i64,
|
||||
pub updated_at: i64,
|
||||
pub facts: Vec<FactDetail>,
|
||||
pub photo_links: Vec<PhotoLinkDetail>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct FactSummary {
|
||||
pub id: i32,
|
||||
pub subject_entity_id: i32,
|
||||
pub subject_entity_name: Option<String>,
|
||||
pub predicate: String,
|
||||
pub object_entity_id: Option<i32>,
|
||||
pub object_entity_name: Option<String>,
|
||||
pub object_value: Option<String>,
|
||||
pub confidence: f32,
|
||||
pub status: String,
|
||||
pub source_photo: Option<String>,
|
||||
pub source_insight_id: Option<i32>,
|
||||
pub created_at: i64,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct FactListResponse {
|
||||
pub facts: Vec<FactSummary>,
|
||||
pub total: i64,
|
||||
pub limit: i64,
|
||||
pub offset: i64,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct MergeRequest {
|
||||
pub source_id: i32,
|
||||
pub target_id: i32,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct MergeResponse {
|
||||
pub merged_entity_id: i32,
|
||||
pub deleted_entity_id: i32,
|
||||
pub facts_transferred: i64,
|
||||
pub links_transferred: i64,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct EntityPatchRequest {
|
||||
pub name: Option<String>,
|
||||
pub description: Option<String>,
|
||||
pub status: Option<String>,
|
||||
pub confidence: Option<f32>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct FactPatchRequest {
|
||||
pub predicate: Option<String>,
|
||||
pub object_value: Option<String>,
|
||||
pub status: Option<String>,
|
||||
pub confidence: Option<f32>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct EntityListQuery {
|
||||
#[serde(rename = "type")]
|
||||
pub entity_type: Option<String>,
|
||||
pub status: Option<String>,
|
||||
pub search: Option<String>,
|
||||
pub limit: Option<i64>,
|
||||
pub offset: Option<i64>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct FactListQuery {
|
||||
pub entity_id: Option<i32>,
|
||||
pub status: Option<String>,
|
||||
pub predicate: Option<String>,
|
||||
pub limit: Option<i64>,
|
||||
pub offset: Option<i64>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct RecentQuery {
|
||||
pub since: Option<i64>,
|
||||
pub limit: Option<i64>,
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Service registration
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
pub fn add_knowledge_services<T, D: KnowledgeDao + 'static>(app: App<T>) -> App<T>
|
||||
where
|
||||
T: ServiceFactory<ServiceRequest, Config = (), Error = actix_web::Error, InitError = ()>,
|
||||
{
|
||||
app.service(
|
||||
web::scope("/knowledge")
|
||||
.service(web::resource("/entities").route(web::get().to(list_entities::<D>)))
|
||||
.service(web::resource("/entities/merge").route(web::post().to(merge_entities::<D>)))
|
||||
.service(
|
||||
web::resource("/entities/{id}")
|
||||
.route(web::get().to(get_entity::<D>))
|
||||
.route(web::patch().to(patch_entity::<D>))
|
||||
.route(web::delete().to(delete_entity::<D>)),
|
||||
)
|
||||
.service(web::resource("/facts").route(web::get().to(list_facts::<D>)))
|
||||
.service(
|
||||
web::resource("/facts/{id}")
|
||||
.route(web::patch().to(patch_fact::<D>))
|
||||
.route(web::delete().to(delete_fact::<D>)),
|
||||
)
|
||||
.service(web::resource("/recent").route(web::get().to(get_recent::<D>))),
|
||||
)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Handlers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
async fn list_entities<D: KnowledgeDao + 'static>(
|
||||
_claims: Claims,
|
||||
query: web::Query<EntityListQuery>,
|
||||
dao: web::Data<Mutex<D>>,
|
||||
) -> impl Responder {
|
||||
let limit = query.limit.unwrap_or(50).min(200);
|
||||
let offset = query.offset.unwrap_or(0);
|
||||
|
||||
let status_filter = match query.status.as_deref() {
|
||||
None | Some("active") => Some("active".to_string()),
|
||||
Some("all") => None,
|
||||
Some(s) => Some(s.to_string()),
|
||||
};
|
||||
|
||||
let filter = EntityFilter {
|
||||
entity_type: query.entity_type.clone(),
|
||||
status: status_filter,
|
||||
search: query.search.clone(),
|
||||
limit,
|
||||
offset,
|
||||
};
|
||||
|
||||
let cx = opentelemetry::Context::current();
|
||||
let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
|
||||
match dao.list_entities(&cx, filter) {
|
||||
Ok((entities, total)) => {
|
||||
let summaries: Vec<EntitySummary> =
|
||||
entities.into_iter().map(EntitySummary::from).collect();
|
||||
HttpResponse::Ok().json(EntityListResponse {
|
||||
entities: summaries,
|
||||
total,
|
||||
limit,
|
||||
offset,
|
||||
})
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!("list_entities error: {:?}", e);
|
||||
HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_entity<D: KnowledgeDao + 'static>(
|
||||
_claims: Claims,
|
||||
id: web::Path<i32>,
|
||||
dao: web::Data<Mutex<D>>,
|
||||
) -> impl Responder {
|
||||
let cx = opentelemetry::Context::current();
|
||||
let entity_id = id.into_inner();
|
||||
let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
|
||||
|
||||
let entity = match dao.get_entity_by_id(&cx, entity_id) {
|
||||
Ok(Some(e)) => e,
|
||||
Ok(None) => {
|
||||
return HttpResponse::NotFound().json(serde_json::json!({"error": "Entity not found"}));
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!("get_entity error: {:?}", e);
|
||||
return HttpResponse::InternalServerError()
|
||||
.json(serde_json::json!({"error": "Database error"}));
|
||||
}
|
||||
};
|
||||
|
||||
// Fetch all facts (all statuses for audit)
|
||||
let raw_facts: Vec<EntityFact> = match dao.get_facts_for_entity(&cx, entity_id) {
|
||||
Ok(f) => f,
|
||||
Err(e) => {
|
||||
log::error!("get_facts_for_entity error: {:?}", e);
|
||||
return HttpResponse::InternalServerError()
|
||||
.json(serde_json::json!({"error": "Database error"}));
|
||||
}
|
||||
};
|
||||
|
||||
// Resolve object entity names
|
||||
let mut facts = Vec::with_capacity(raw_facts.len());
|
||||
for f in raw_facts {
|
||||
let object_entity_name = if let Some(oid) = f.object_entity_id {
|
||||
dao.get_entity_by_id(&cx, oid)
|
||||
.ok()
|
||||
.flatten()
|
||||
.map(|e| e.name)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
facts.push(FactDetail {
|
||||
id: f.id,
|
||||
predicate: f.predicate,
|
||||
object_entity_id: f.object_entity_id,
|
||||
object_entity_name,
|
||||
object_value: f.object_value,
|
||||
confidence: f.confidence,
|
||||
status: f.status,
|
||||
source_photo: f.source_photo,
|
||||
source_insight_id: f.source_insight_id,
|
||||
created_at: f.created_at,
|
||||
});
|
||||
}
|
||||
|
||||
// Fetch photo links
|
||||
let photo_links: Vec<PhotoLinkDetail> = match dao.get_links_for_entity(&cx, entity_id) {
|
||||
Ok(links) => links.into_iter().map(PhotoLinkDetail::from).collect(),
|
||||
Err(e) => {
|
||||
log::error!("get_links_for_entity error: {:?}", e);
|
||||
return HttpResponse::InternalServerError()
|
||||
.json(serde_json::json!({"error": "Database error"}));
|
||||
}
|
||||
};
|
||||
|
||||
HttpResponse::Ok().json(EntityDetailResponse {
|
||||
id: entity.id,
|
||||
name: entity.name,
|
||||
entity_type: entity.entity_type,
|
||||
description: entity.description,
|
||||
confidence: entity.confidence,
|
||||
status: entity.status,
|
||||
created_at: entity.created_at,
|
||||
updated_at: entity.updated_at,
|
||||
facts,
|
||||
photo_links,
|
||||
})
|
||||
}
|
||||
|
||||
async fn patch_entity<D: KnowledgeDao + 'static>(
|
||||
_claims: Claims,
|
||||
id: web::Path<i32>,
|
||||
body: web::Json<EntityPatchRequest>,
|
||||
dao: web::Data<Mutex<D>>,
|
||||
) -> impl Responder {
|
||||
let cx = opentelemetry::Context::current();
|
||||
let entity_id = id.into_inner();
|
||||
let patch = EntityPatch {
|
||||
name: body.name.clone(),
|
||||
description: body.description.clone(),
|
||||
status: body.status.clone(),
|
||||
confidence: body.confidence,
|
||||
};
|
||||
|
||||
let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
|
||||
match dao.update_entity(&cx, entity_id, patch) {
|
||||
Ok(Some(entity)) => HttpResponse::Ok().json(EntitySummary::from(entity)),
|
||||
Ok(None) => HttpResponse::NotFound().json(serde_json::json!({"error": "Entity not found"})),
|
||||
Err(e) => {
|
||||
log::error!("patch_entity error: {:?}", e);
|
||||
HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn delete_entity<D: KnowledgeDao + 'static>(
|
||||
_claims: Claims,
|
||||
id: web::Path<i32>,
|
||||
dao: web::Data<Mutex<D>>,
|
||||
) -> impl Responder {
|
||||
let cx = opentelemetry::Context::current();
|
||||
let entity_id = id.into_inner();
|
||||
let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
|
||||
|
||||
// Verify entity exists before deleting
|
||||
match dao.get_entity_by_id(&cx, entity_id) {
|
||||
Ok(None) => {
|
||||
return HttpResponse::NotFound().json(serde_json::json!({"error": "Entity not found"}));
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!("delete_entity lookup error: {:?}", e);
|
||||
return HttpResponse::InternalServerError()
|
||||
.json(serde_json::json!({"error": "Database error"}));
|
||||
}
|
||||
Ok(Some(_)) => {}
|
||||
}
|
||||
|
||||
match dao.delete_entity(&cx, entity_id) {
|
||||
Ok(()) => HttpResponse::NoContent().finish(),
|
||||
Err(e) => {
|
||||
log::error!("delete_entity error: {:?}", e);
|
||||
HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn merge_entities<D: KnowledgeDao + 'static>(
|
||||
_claims: Claims,
|
||||
body: web::Json<MergeRequest>,
|
||||
dao: web::Data<Mutex<D>>,
|
||||
) -> impl Responder {
|
||||
if body.source_id == body.target_id {
|
||||
return HttpResponse::BadRequest()
|
||||
.json(serde_json::json!({"error": "source_id and target_id must be different"}));
|
||||
}
|
||||
|
||||
let cx = opentelemetry::Context::current();
|
||||
let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
|
||||
|
||||
// Verify both entities exist
|
||||
for id in [body.source_id, body.target_id] {
|
||||
match dao.get_entity_by_id(&cx, id) {
|
||||
Ok(None) => {
|
||||
return HttpResponse::BadRequest()
|
||||
.json(serde_json::json!({"error": format!("Entity {} not found", id)}));
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!("merge_entities lookup error: {:?}", e);
|
||||
return HttpResponse::InternalServerError()
|
||||
.json(serde_json::json!({"error": "Database error"}));
|
||||
}
|
||||
Ok(Some(_)) => {}
|
||||
}
|
||||
}
|
||||
|
||||
match dao.merge_entities(&cx, body.source_id, body.target_id) {
|
||||
Ok((facts_transferred, links_transferred)) => HttpResponse::Ok().json(MergeResponse {
|
||||
merged_entity_id: body.target_id,
|
||||
deleted_entity_id: body.source_id,
|
||||
facts_transferred,
|
||||
links_transferred,
|
||||
}),
|
||||
Err(e) => {
|
||||
log::error!("merge_entities error: {:?}", e);
|
||||
HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn list_facts<D: KnowledgeDao + 'static>(
|
||||
_claims: Claims,
|
||||
query: web::Query<FactListQuery>,
|
||||
dao: web::Data<Mutex<D>>,
|
||||
) -> impl Responder {
|
||||
let limit = query.limit.unwrap_or(50).min(200);
|
||||
let offset = query.offset.unwrap_or(0);
|
||||
|
||||
let status_filter = match query.status.as_deref() {
|
||||
None | Some("active") => Some("active".to_string()),
|
||||
Some("all") => None,
|
||||
Some(s) => Some(s.to_string()),
|
||||
};
|
||||
|
||||
let filter = FactFilter {
|
||||
entity_id: query.entity_id,
|
||||
status: status_filter,
|
||||
predicate: query.predicate.clone(),
|
||||
limit,
|
||||
offset,
|
||||
};
|
||||
|
||||
let cx = opentelemetry::Context::current();
|
||||
let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
|
||||
match dao.list_facts(&cx, filter) {
|
||||
Ok((facts, total)) => {
|
||||
let mut summaries = Vec::with_capacity(facts.len());
|
||||
for f in facts {
|
||||
let subject_entity_name = dao
|
||||
.get_entity_by_id(&cx, f.subject_entity_id)
|
||||
.ok()
|
||||
.flatten()
|
||||
.map(|e| e.name);
|
||||
let object_entity_name = if let Some(oid) = f.object_entity_id {
|
||||
dao.get_entity_by_id(&cx, oid)
|
||||
.ok()
|
||||
.flatten()
|
||||
.map(|e| e.name)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
summaries.push(FactSummary {
|
||||
id: f.id,
|
||||
subject_entity_id: f.subject_entity_id,
|
||||
subject_entity_name,
|
||||
predicate: f.predicate,
|
||||
object_entity_id: f.object_entity_id,
|
||||
object_entity_name,
|
||||
object_value: f.object_value,
|
||||
confidence: f.confidence,
|
||||
status: f.status,
|
||||
source_photo: f.source_photo,
|
||||
source_insight_id: f.source_insight_id,
|
||||
created_at: f.created_at,
|
||||
});
|
||||
}
|
||||
HttpResponse::Ok().json(FactListResponse {
|
||||
facts: summaries,
|
||||
total,
|
||||
limit,
|
||||
offset,
|
||||
})
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!("list_facts error: {:?}", e);
|
||||
HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn patch_fact<D: KnowledgeDao + 'static>(
|
||||
_claims: Claims,
|
||||
id: web::Path<i32>,
|
||||
body: web::Json<FactPatchRequest>,
|
||||
dao: web::Data<Mutex<D>>,
|
||||
) -> impl Responder {
|
||||
let cx = opentelemetry::Context::current();
|
||||
let fact_id = id.into_inner();
|
||||
let patch = FactPatch {
|
||||
predicate: body.predicate.clone(),
|
||||
object_value: body.object_value.clone(),
|
||||
status: body.status.clone(),
|
||||
confidence: body.confidence,
|
||||
};
|
||||
|
||||
let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
|
||||
match dao.update_fact(&cx, fact_id, patch) {
|
||||
Ok(Some(fact)) => HttpResponse::Ok().json(fact),
|
||||
Ok(None) => HttpResponse::NotFound().json(serde_json::json!({"error": "Fact not found"})),
|
||||
Err(e) => {
|
||||
log::error!("patch_fact error: {:?}", e);
|
||||
HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn delete_fact<D: KnowledgeDao + 'static>(
|
||||
_claims: Claims,
|
||||
id: web::Path<i32>,
|
||||
dao: web::Data<Mutex<D>>,
|
||||
) -> impl Responder {
|
||||
let cx = opentelemetry::Context::current();
|
||||
let fact_id = id.into_inner();
|
||||
let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
|
||||
match dao.delete_fact(&cx, fact_id) {
|
||||
Ok(()) => HttpResponse::NoContent().finish(),
|
||||
Err(e) => {
|
||||
log::warn!("delete_fact({}) error: {:?}", fact_id, e);
|
||||
HttpResponse::NotFound().json(serde_json::json!({"error": "Fact not found"}))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_recent<D: KnowledgeDao + 'static>(
|
||||
_claims: Claims,
|
||||
query: web::Query<RecentQuery>,
|
||||
dao: web::Data<Mutex<D>>,
|
||||
) -> impl Responder {
|
||||
let since = query
|
||||
.since
|
||||
.unwrap_or_else(|| Utc::now().timestamp() - 86400);
|
||||
let limit = query.limit.unwrap_or(20).min(100);
|
||||
|
||||
let cx = opentelemetry::Context::current();
|
||||
let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
|
||||
match dao.get_recent_activity(&cx, since, limit) {
|
||||
Ok(RecentActivity { entities, facts }) => {
|
||||
let entity_summaries: Vec<EntitySummary> =
|
||||
entities.into_iter().map(EntitySummary::from).collect();
|
||||
HttpResponse::Ok().json(serde_json::json!({
|
||||
"entities": entity_summaries,
|
||||
"facts": facts
|
||||
}))
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!("get_recent error: {:?}", e);
|
||||
HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user