From 191ccc0d77d1745a1884d9a5c904bd792ebc812d Mon Sep 17 00:00:00 2001 From: Cameron Date: Fri, 3 Apr 2026 17:27:49 -0400 Subject: [PATCH] feat: add entity-relationship knowledge memory to agentic insights Implements persistent cross-photo knowledge memory so the agentic insight loop can learn and recall facts about people, places, and events across the photo collection. Changes: - photo_insights: drop UNIQUE(file_path) + INSERT OR REPLACE, replace with append-only rows + is_current flag for insight history retention - New tables: entities, entity_facts, entity_photo_links with FK constraints and confidence scoring - KnowledgeDao trait + SqliteKnowledgeDao with upsert, merge, and corroboration (confidence +0.1 on duplicate fact detection) - Four new agent tools: recall_entities, recall_facts_for_photo, store_entity, store_fact (with object_entity_id FK support) - Cameron entity auto-seeded with stable ID injected into system prompt - Pre-run photo link clearing + post-loop source_insight_id backfill - Audit REST API: GET/PATCH/DELETE /knowledge/entities/{id}, POST /knowledge/entities/merge, GET/PATCH/DELETE /knowledge/facts/{id}, GET /knowledge/recent Co-Authored-By: Claude Sonnet 4.6 --- .../down.sql | 19 + .../up.sql | 25 + .../down.sql | 3 + .../up.sql | 55 ++ src/database/insights_dao.rs | 45 +- src/database/knowledge_dao.rs | 854 ++++++++++++++++++ src/database/mod.rs | 5 + src/database/models.rs | 79 +- src/database/schema.rs | 44 + src/knowledge.rs | 567 ++++++++++++ src/main.rs | 5 + src/state.rs | 12 +- 12 files changed, 1706 insertions(+), 7 deletions(-) create mode 100644 migrations/2026-04-02-000000_photo_insights_history/down.sql create mode 100644 migrations/2026-04-02-000000_photo_insights_history/up.sql create mode 100644 migrations/2026-04-02-000100_add_knowledge_memory/down.sql create mode 100644 migrations/2026-04-02-000100_add_knowledge_memory/up.sql create mode 100644 src/database/knowledge_dao.rs create mode 100644 src/knowledge.rs diff --git 
a/migrations/2026-04-02-000000_photo_insights_history/down.sql b/migrations/2026-04-02-000000_photo_insights_history/down.sql new file mode 100644 index 0000000..ca1ec7f --- /dev/null +++ b/migrations/2026-04-02-000000_photo_insights_history/down.sql @@ -0,0 +1,19 @@ +-- Restore original schema, retaining only the current insight per file. +CREATE TABLE photo_insights_old ( + id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, + file_path TEXT NOT NULL UNIQUE, + title TEXT NOT NULL, + summary TEXT NOT NULL, + generated_at BIGINT NOT NULL, + model_version TEXT NOT NULL +); + +INSERT INTO photo_insights_old (id, file_path, title, summary, generated_at, model_version) + SELECT id, file_path, title, summary, generated_at, model_version + FROM photo_insights + WHERE is_current = 1; + +DROP TABLE photo_insights; +ALTER TABLE photo_insights_old RENAME TO photo_insights; + +CREATE INDEX IF NOT EXISTS idx_photo_insights_path ON photo_insights(file_path); diff --git a/migrations/2026-04-02-000000_photo_insights_history/up.sql b/migrations/2026-04-02-000000_photo_insights_history/up.sql new file mode 100644 index 0000000..8995f2a --- /dev/null +++ b/migrations/2026-04-02-000000_photo_insights_history/up.sql @@ -0,0 +1,25 @@ +-- Convert photo_insights to an append-only history table. +-- SQLite cannot drop a UNIQUE constraint via ALTER TABLE, so we recreate the table. +-- This preserves existing insight IDs so that future entity_facts.source_insight_id +-- FK references remain valid. + +CREATE TABLE photo_insights_new ( + id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, + file_path TEXT NOT NULL, + title TEXT NOT NULL, + summary TEXT NOT NULL, + generated_at BIGINT NOT NULL, + model_version TEXT NOT NULL, + is_current BOOLEAN NOT NULL DEFAULT 0 +); + +-- Migrate existing rows; mark them all as current (one row per path currently). 
+INSERT INTO photo_insights_new (id, file_path, title, summary, generated_at, model_version, is_current) + SELECT id, file_path, title, summary, generated_at, model_version, 1 + FROM photo_insights; + +DROP TABLE photo_insights; +ALTER TABLE photo_insights_new RENAME TO photo_insights; + +CREATE INDEX idx_photo_insights_file_path ON photo_insights(file_path); +CREATE INDEX idx_photo_insights_current ON photo_insights(file_path, is_current); diff --git a/migrations/2026-04-02-000100_add_knowledge_memory/down.sql b/migrations/2026-04-02-000100_add_knowledge_memory/down.sql new file mode 100644 index 0000000..cc6fa21 --- /dev/null +++ b/migrations/2026-04-02-000100_add_knowledge_memory/down.sql @@ -0,0 +1,3 @@ +DROP TABLE IF EXISTS entity_photo_links; +DROP TABLE IF EXISTS entity_facts; +DROP TABLE IF EXISTS entities; diff --git a/migrations/2026-04-02-000100_add_knowledge_memory/up.sql b/migrations/2026-04-02-000100_add_knowledge_memory/up.sql new file mode 100644 index 0000000..c84cf2a --- /dev/null +++ b/migrations/2026-04-02-000100_add_knowledge_memory/up.sql @@ -0,0 +1,55 @@ +-- Entity-relationship knowledge memory tables. +-- Entities are the nodes (people, places, events, things). +-- entity_facts are typed claims about or between entities. +-- entity_photo_links connect entities to specific photos. 
+ +CREATE TABLE entities ( + id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, + name TEXT NOT NULL, + entity_type TEXT NOT NULL, -- 'person' | 'place' | 'event' | 'thing' + description TEXT NOT NULL DEFAULT '', + embedding BLOB, -- 768-dim f32 vector; nullable if embedding service was unavailable + confidence REAL NOT NULL DEFAULT 0.5, + status TEXT NOT NULL DEFAULT 'active', -- 'active' | 'reviewed' | 'rejected' + created_at BIGINT NOT NULL, + updated_at BIGINT NOT NULL, + UNIQUE(name, entity_type) +); + +CREATE INDEX idx_entities_type ON entities(entity_type); +CREATE INDEX idx_entities_status ON entities(status); +CREATE INDEX idx_entities_name ON entities(name); + +CREATE TABLE entity_facts ( + id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, + subject_entity_id INTEGER NOT NULL, + predicate TEXT NOT NULL, + object_entity_id INTEGER, -- nullable: entity-to-entity relationship target + object_value TEXT, -- nullable: free-text attribute value + source_photo TEXT, -- photo path that prompted extraction (injected server-side) + source_insight_id INTEGER, -- backfilled after insight is stored + confidence REAL NOT NULL DEFAULT 0.6, + status TEXT NOT NULL DEFAULT 'active', -- 'active' | 'reviewed' | 'rejected' + created_at BIGINT NOT NULL, + CONSTRAINT fk_ef_subject FOREIGN KEY (subject_entity_id) REFERENCES entities(id) ON DELETE CASCADE, + CONSTRAINT fk_ef_object FOREIGN KEY (object_entity_id) REFERENCES entities(id) ON DELETE SET NULL, + CONSTRAINT fk_ef_insight FOREIGN KEY (source_insight_id) REFERENCES photo_insights(id) ON DELETE SET NULL, + CHECK (object_entity_id IS NOT NULL OR object_value IS NOT NULL) +); + +CREATE INDEX idx_entity_facts_subject ON entity_facts(subject_entity_id); +CREATE INDEX idx_entity_facts_predicate ON entity_facts(predicate); +CREATE INDEX idx_entity_facts_status ON entity_facts(status); +CREATE INDEX idx_entity_facts_source_photo ON entity_facts(source_photo); + +CREATE TABLE entity_photo_links ( + id INTEGER PRIMARY KEY AUTOINCREMENT 
NOT NULL, + entity_id INTEGER NOT NULL, + file_path TEXT NOT NULL, + role TEXT NOT NULL, -- 'subject' | 'location' | 'event' | 'thing' + CONSTRAINT fk_epl_entity FOREIGN KEY (entity_id) REFERENCES entities(id) ON DELETE CASCADE, + UNIQUE(entity_id, file_path, role) +); + +CREATE INDEX idx_entity_photo_links_entity ON entity_photo_links(entity_id); +CREATE INDEX idx_entity_photo_links_photo ON entity_photo_links(file_path); diff --git a/src/database/insights_dao.rs b/src/database/insights_dao.rs index 1efa9f3..9dff438 100644 --- a/src/database/insights_dao.rs +++ b/src/database/insights_dao.rs @@ -21,6 +21,12 @@ pub trait InsightDao: Sync + Send { file_path: &str, ) -> Result, DbError>; + fn get_insight_history( + &mut self, + context: &opentelemetry::Context, + file_path: &str, + ) -> Result, DbError>; + fn delete_insight( &mut self, context: &opentelemetry::Context, @@ -49,6 +55,11 @@ impl SqliteInsightDao { connection: Arc::new(Mutex::new(connect())), } } + + #[cfg(test)] + pub fn from_connection(conn: Arc>) -> Self { + SqliteInsightDao { connection: conn } + } } impl InsightDao for SqliteInsightDao { @@ -62,15 +73,22 @@ impl InsightDao for SqliteInsightDao { let mut connection = self.connection.lock().expect("Unable to get InsightDao"); - // Insert or replace on conflict (UNIQUE constraint on file_path) - diesel::replace_into(photo_insights) + // Mark all existing insights for this file as no longer current + diesel::update(photo_insights.filter(file_path.eq(&insight.file_path))) + .set(is_current.eq(false)) + .execute(connection.deref_mut()) + .map_err(|_| anyhow::anyhow!("Update is_current error"))?; + + // Insert the new insight as current + diesel::insert_into(photo_insights) .values(&insight) .execute(connection.deref_mut()) .map_err(|_| anyhow::anyhow!("Insert error"))?; - // Retrieve the inserted record + // Retrieve the inserted record (is_current = true) photo_insights .filter(file_path.eq(&insight.file_path)) + .filter(is_current.eq(true)) 
.first::(connection.deref_mut()) .map_err(|_| anyhow::anyhow!("Query error")) }) @@ -89,6 +107,7 @@ impl InsightDao for SqliteInsightDao { photo_insights .filter(file_path.eq(path)) + .filter(is_current.eq(true)) .first::(connection.deref_mut()) .optional() .map_err(|_| anyhow::anyhow!("Query error")) @@ -96,6 +115,25 @@ impl InsightDao for SqliteInsightDao { .map_err(|_| DbError::new(DbErrorKind::QueryError)) } + fn get_insight_history( + &mut self, + context: &opentelemetry::Context, + path: &str, + ) -> Result, DbError> { + trace_db_call(context, "query", "get_insight_history", |_span| { + use schema::photo_insights::dsl::*; + + let mut connection = self.connection.lock().expect("Unable to get InsightDao"); + + photo_insights + .filter(file_path.eq(path)) + .order(generated_at.desc()) + .load::(connection.deref_mut()) + .map_err(|_| anyhow::anyhow!("Query error")) + }) + .map_err(|_| DbError::new(DbErrorKind::QueryError)) + } + fn delete_insight( &mut self, context: &opentelemetry::Context, @@ -124,6 +162,7 @@ impl InsightDao for SqliteInsightDao { let mut connection = self.connection.lock().expect("Unable to get InsightDao"); photo_insights + .filter(is_current.eq(true)) .order(generated_at.desc()) .load::(connection.deref_mut()) .map_err(|_| anyhow::anyhow!("Query error")) diff --git a/src/database/knowledge_dao.rs b/src/database/knowledge_dao.rs new file mode 100644 index 0000000..09ffddf --- /dev/null +++ b/src/database/knowledge_dao.rs @@ -0,0 +1,854 @@ +use diesel::prelude::*; +use diesel::sqlite::SqliteConnection; +use std::ops::DerefMut; +use std::sync::{Arc, Mutex}; + +use crate::database::models::{ + Entity, EntityFact, EntityPhotoLink, InsertEntity, InsertEntityFact, InsertEntityPhotoLink, +}; +use crate::database::schema; +use crate::database::{DbError, DbErrorKind, connect}; +use crate::otel::trace_db_call; + +// --------------------------------------------------------------------------- +// Filter / patch types +// 
--------------------------------------------------------------------------- + +pub struct EntityFilter { + pub entity_type: Option, + /// "active" | "reviewed" | "rejected" | "all" + pub status: Option, + /// LIKE match on name and description + pub search: Option, + pub limit: i64, + pub offset: i64, +} + +pub struct FactFilter { + pub entity_id: Option, + /// "active" | "reviewed" | "rejected" | "all" + pub status: Option, + pub predicate: Option, + pub limit: i64, + pub offset: i64, +} + +pub struct EntityPatch { + pub name: Option, + pub description: Option, + pub status: Option, + pub confidence: Option, +} + +pub struct FactPatch { + pub predicate: Option, + pub object_value: Option, + pub status: Option, + pub confidence: Option, +} + +pub struct RecentActivity { + pub entities: Vec, + pub facts: Vec, +} + +// --------------------------------------------------------------------------- +// Trait +// --------------------------------------------------------------------------- + +pub trait KnowledgeDao: Sync + Send { + // --- Entity --- + fn upsert_entity( + &mut self, + cx: &opentelemetry::Context, + entity: InsertEntity, + ) -> Result; + + fn get_entity_by_id( + &mut self, + cx: &opentelemetry::Context, + id: i32, + ) -> Result, DbError>; + + fn get_entity_by_name( + &mut self, + cx: &opentelemetry::Context, + name: &str, + entity_type: Option<&str>, + ) -> Result, DbError>; + + fn get_entities_with_embeddings( + &mut self, + cx: &opentelemetry::Context, + entity_type: Option<&str>, + ) -> Result, DbError>; + + fn list_entities( + &mut self, + cx: &opentelemetry::Context, + filter: EntityFilter, + ) -> Result<(Vec, i64), DbError>; + + fn update_entity_status( + &mut self, + cx: &opentelemetry::Context, + id: i32, + status: &str, + ) -> Result<(), DbError>; + + fn update_entity( + &mut self, + cx: &opentelemetry::Context, + id: i32, + patch: EntityPatch, + ) -> Result, DbError>; + + fn delete_entity(&mut self, cx: &opentelemetry::Context, id: i32) -> Result<(), 
DbError>; + + fn merge_entities( + &mut self, + cx: &opentelemetry::Context, + source_id: i32, + target_id: i32, + ) -> Result<(i64, i64), DbError>; + + // --- Facts --- + fn upsert_fact( + &mut self, + cx: &opentelemetry::Context, + fact: InsertEntityFact, + ) -> Result<(EntityFact, bool), DbError>; + + fn get_facts_for_entity( + &mut self, + cx: &opentelemetry::Context, + entity_id: i32, + ) -> Result, DbError>; + + fn list_facts( + &mut self, + cx: &opentelemetry::Context, + filter: FactFilter, + ) -> Result<(Vec, i64), DbError>; + + fn update_fact( + &mut self, + cx: &opentelemetry::Context, + id: i32, + patch: FactPatch, + ) -> Result, DbError>; + + fn update_facts_insight_id( + &mut self, + cx: &opentelemetry::Context, + source_photo: &str, + insight_id: i32, + ) -> Result<(), DbError>; + + fn delete_fact(&mut self, cx: &opentelemetry::Context, id: i32) -> Result<(), DbError>; + + // --- Photo links --- + fn upsert_photo_link( + &mut self, + cx: &opentelemetry::Context, + link: InsertEntityPhotoLink, + ) -> Result<(), DbError>; + + fn delete_photo_links_for_file( + &mut self, + cx: &opentelemetry::Context, + file_path: &str, + ) -> Result<(), DbError>; + + fn get_links_for_photo( + &mut self, + cx: &opentelemetry::Context, + file_path: &str, + ) -> Result, DbError>; + + fn get_links_for_entity( + &mut self, + cx: &opentelemetry::Context, + entity_id: i32, + ) -> Result, DbError>; + + // --- Audit --- + fn get_recent_activity( + &mut self, + cx: &opentelemetry::Context, + since: i64, + limit: i64, + ) -> Result; +} + +// --------------------------------------------------------------------------- +// SQLite implementation +// --------------------------------------------------------------------------- + +pub struct SqliteKnowledgeDao { + connection: Arc>, +} + +impl Default for SqliteKnowledgeDao { + fn default() -> Self { + Self::new() + } +} + +impl SqliteKnowledgeDao { + pub fn new() -> Self { + SqliteKnowledgeDao { + connection: 
Arc::new(Mutex::new(connect())), + } + } + + pub fn from_connection(conn: Arc>) -> Self { + SqliteKnowledgeDao { connection: conn } + } + + fn serialize_embedding(vec: &[f32]) -> Vec { + vec.iter().flat_map(|f| f.to_le_bytes()).collect() + } + + fn deserialize_embedding(bytes: &[u8]) -> Result, DbError> { + if bytes.len() % 4 != 0 { + return Err(DbError::new(DbErrorKind::QueryError)); + } + Ok(bytes + .chunks_exact(4) + .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]])) + .collect()) + } + + pub fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 { + if a.len() != b.len() || a.is_empty() { + return 0.0; + } + let dot: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum(); + let mag_a: f32 = a.iter().map(|x| x * x).sum::().sqrt(); + let mag_b: f32 = b.iter().map(|x| x * x).sum::().sqrt(); + if mag_a == 0.0 || mag_b == 0.0 { + 0.0 + } else { + dot / (mag_a * mag_b) + } + } +} + +impl KnowledgeDao for SqliteKnowledgeDao { + // ----------------------------------------------------------------------- + // Entity operations + // ----------------------------------------------------------------------- + + fn upsert_entity( + &mut self, + cx: &opentelemetry::Context, + entity: InsertEntity, + ) -> Result { + trace_db_call(cx, "insert", "upsert_entity", |_span| { + use schema::entities::dsl::*; + + let mut conn = self.connection.lock().expect("KnowledgeDao lock"); + + // Case-insensitive lookup by name + entity_type + let name_lower = entity.name.to_lowercase(); + let existing: Option = entities + .filter(diesel::dsl::sql::(&format!( + "lower(name) = '{}' AND entity_type = '{}'", + name_lower.replace('\'', "''"), + entity.entity_type.replace('\'', "''") + ))) + .first::(conn.deref_mut()) + .optional() + .map_err(|e| anyhow::anyhow!("Query error: {}", e))?; + + if let Some(existing_entity) = existing { + // Update description, embedding, updated_at + diesel::update(entities.filter(id.eq(existing_entity.id))) + .set(( + description.eq(&entity.description), + 
embedding.eq(&entity.embedding), + updated_at.eq(entity.updated_at), + )) + .execute(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Update error: {}", e))?; + + entities + .filter(id.eq(existing_entity.id)) + .first::(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Query error: {}", e)) + } else { + diesel::insert_into(entities) + .values(&entity) + .execute(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Insert error: {}", e))?; + + entities + .order(id.desc()) + .first::(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Query error: {}", e)) + } + }) + .map_err(|_| DbError::new(DbErrorKind::InsertError)) + } + + fn get_entity_by_id( + &mut self, + cx: &opentelemetry::Context, + entity_id: i32, + ) -> Result, DbError> { + trace_db_call(cx, "query", "get_entity_by_id", |_span| { + use schema::entities::dsl::*; + let mut conn = self.connection.lock().expect("KnowledgeDao lock"); + entities + .filter(id.eq(entity_id)) + .first::(conn.deref_mut()) + .optional() + .map_err(|e| anyhow::anyhow!("Query error: {}", e)) + }) + .map_err(|_| DbError::new(DbErrorKind::QueryError)) + } + + fn get_entity_by_name( + &mut self, + cx: &opentelemetry::Context, + entity_name: &str, + entity_type_filter: Option<&str>, + ) -> Result, DbError> { + trace_db_call(cx, "query", "get_entity_by_name", |_span| { + use schema::entities::dsl::*; + let mut conn = self.connection.lock().expect("KnowledgeDao lock"); + + let name_lower = entity_name.to_lowercase().replace('\'', "''"); + let mut sql = format!("lower(name) = '{}'", name_lower); + if let Some(et) = entity_type_filter { + sql.push_str(&format!(" AND entity_type = '{}'", et.replace('\'', "''"))); + } + sql.push_str(" AND status != 'rejected'"); + + entities + .filter(diesel::dsl::sql::(&sql)) + .load::(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Query error: {}", e)) + }) + .map_err(|_| DbError::new(DbErrorKind::QueryError)) + } + + fn get_entities_with_embeddings( + &mut self, + cx: &opentelemetry::Context, + 
entity_type_filter: Option<&str>, + ) -> Result, DbError> { + trace_db_call(cx, "query", "get_entities_with_embeddings", |_span| { + use schema::entities::dsl::*; + let mut conn = self.connection.lock().expect("KnowledgeDao lock"); + + let mut query = entities + .filter(embedding.is_not_null()) + .filter(status.ne("rejected")) + .into_boxed(); + + if let Some(et) = entity_type_filter { + query = query.filter(entity_type.eq(et)); + } + + query + .load::(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Query error: {}", e)) + }) + .map_err(|_| DbError::new(DbErrorKind::QueryError)) + } + + fn list_entities( + &mut self, + cx: &opentelemetry::Context, + filter: EntityFilter, + ) -> Result<(Vec, i64), DbError> { + trace_db_call(cx, "query", "list_entities", |_span| { + use diesel::dsl::count_star; + use schema::entities::dsl::*; + let mut conn = self.connection.lock().expect("KnowledgeDao lock"); + + let mut query = entities.into_boxed(); + + if let Some(ref et) = filter.entity_type { + query = query.filter(entity_type.eq(et)); + } + + let status_val = filter.status.as_deref().unwrap_or("active"); + if status_val != "all" { + query = query.filter(status.eq(status_val)); + } + + if let Some(ref search_term) = filter.search { + let pattern = format!("%{}%", search_term); + query = query.filter(name.like(pattern.clone()).or(description.like(pattern))); + } + + // Count with same filters applied (build separately since boxed query is consumed) + let mut count_query = entities.into_boxed(); + if let Some(ref et) = filter.entity_type { + count_query = count_query.filter(entity_type.eq(et)); + } + let status_val2 = filter.status.as_deref().unwrap_or("active"); + if status_val2 != "all" { + count_query = count_query.filter(status.eq(status_val2)); + } + if let Some(ref search_term) = filter.search { + let pattern = format!("%{}%", search_term); + count_query = + count_query.filter(name.like(pattern.clone()).or(description.like(pattern))); + } + let total: i64 = count_query + 
.select(count_star()) + .first(conn.deref_mut()) + .unwrap_or(0); + + let results = query + .order(updated_at.desc()) + .limit(filter.limit) + .offset(filter.offset) + .load::(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Query error: {}", e))?; + + Ok((results, total)) + }) + .map_err(|_| DbError::new(DbErrorKind::QueryError)) + } + + fn update_entity_status( + &mut self, + cx: &opentelemetry::Context, + entity_id: i32, + new_status: &str, + ) -> Result<(), DbError> { + trace_db_call(cx, "update", "update_entity_status", |_span| { + use schema::entities::dsl::*; + let mut conn = self.connection.lock().expect("KnowledgeDao lock"); + diesel::update(entities.filter(id.eq(entity_id))) + .set(status.eq(new_status)) + .execute(conn.deref_mut()) + .map(|_| ()) + .map_err(|e| anyhow::anyhow!("Update error: {}", e)) + }) + .map_err(|_| DbError::new(DbErrorKind::UpdateError)) + } + + fn update_entity( + &mut self, + cx: &opentelemetry::Context, + entity_id: i32, + patch: EntityPatch, + ) -> Result, DbError> { + trace_db_call(cx, "update", "update_entity", |_span| { + use schema::entities::dsl::*; + let mut conn = self.connection.lock().expect("KnowledgeDao lock"); + + let now = chrono::Utc::now().timestamp(); + + if let Some(ref new_name) = patch.name { + diesel::update(entities.filter(id.eq(entity_id))) + .set((name.eq(new_name), updated_at.eq(now))) + .execute(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Update name error: {}", e))?; + } + if let Some(ref new_desc) = patch.description { + diesel::update(entities.filter(id.eq(entity_id))) + .set((description.eq(new_desc), updated_at.eq(now))) + .execute(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Update description error: {}", e))?; + } + if let Some(ref new_status) = patch.status { + diesel::update(entities.filter(id.eq(entity_id))) + .set((status.eq(new_status), updated_at.eq(now))) + .execute(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Update status error: {}", e))?; + } + if let 
Some(new_confidence) = patch.confidence { + diesel::update(entities.filter(id.eq(entity_id))) + .set((confidence.eq(new_confidence), updated_at.eq(now))) + .execute(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Update confidence error: {}", e))?; + } + + entities + .filter(id.eq(entity_id)) + .first::(conn.deref_mut()) + .optional() + .map_err(|e| anyhow::anyhow!("Query error: {}", e)) + }) + .map_err(|_| DbError::new(DbErrorKind::UpdateError)) + } + + fn delete_entity( + &mut self, + cx: &opentelemetry::Context, + entity_id: i32, + ) -> Result<(), DbError> { + trace_db_call(cx, "delete", "delete_entity", |_span| { + use schema::entities::dsl::*; + let mut conn = self.connection.lock().expect("KnowledgeDao lock"); + diesel::delete(entities.filter(id.eq(entity_id))) + .execute(conn.deref_mut()) + .map(|_| ()) + .map_err(|e| anyhow::anyhow!("Delete error: {}", e)) + }) + .map_err(|_| DbError::new(DbErrorKind::QueryError)) + } + + fn merge_entities( + &mut self, + cx: &opentelemetry::Context, + source_id: i32, + target_id: i32, + ) -> Result<(i64, i64), DbError> { + trace_db_call(cx, "update", "merge_entities", |_span| { + let mut conn = self.connection.lock().expect("KnowledgeDao lock"); + + conn.transaction::<(i64, i64), diesel::result::Error, _>(|conn| { + use schema::entity_facts::dsl as ef; + use schema::entity_photo_links::dsl as epl; + + // 1. Re-point facts where source is subject + let facts_updated = + diesel::update(ef::entity_facts.filter(ef::subject_entity_id.eq(source_id))) + .set(ef::subject_entity_id.eq(target_id)) + .execute(conn)? as i64; + + // 2. Re-point facts where source is object + diesel::update(ef::entity_facts.filter(ef::object_entity_id.eq(source_id))) + .set(ef::object_entity_id.eq(Some(target_id))) + .execute(conn)?; + + // 3. 
Copy photo links to target (INSERT OR IGNORE to skip duplicates) + let links_updated = diesel::sql_query( + "INSERT OR IGNORE INTO entity_photo_links (entity_id, file_path, role) \ + SELECT ?, file_path, role FROM entity_photo_links WHERE entity_id = ?", + ) + .bind::(target_id) + .bind::(source_id) + .execute(conn)? as i64; + + // 4. Delete source entity (FK CASCADE removes remaining facts/links) + diesel::delete( + schema::entities::dsl::entities.filter(schema::entities::dsl::id.eq(source_id)), + ) + .execute(conn)?; + + Ok((facts_updated, links_updated)) + }) + .map_err(|e| anyhow::anyhow!("Merge transaction error: {}", e)) + }) + .map_err(|_| DbError::new(DbErrorKind::UpdateError)) + } + + // ----------------------------------------------------------------------- + // Fact operations + // ----------------------------------------------------------------------- + + fn upsert_fact( + &mut self, + cx: &opentelemetry::Context, + fact: InsertEntityFact, + ) -> Result<(EntityFact, bool), DbError> { + trace_db_call(cx, "insert", "upsert_fact", |_span| { + use schema::entity_facts::dsl::*; + let mut conn = self.connection.lock().expect("KnowledgeDao lock"); + + // Look for an identical active fact + let mut dup_query = entity_facts + .filter(subject_entity_id.eq(fact.subject_entity_id)) + .filter(predicate.eq(&fact.predicate)) + .filter(status.ne("rejected")) + .into_boxed(); + + match &fact.object_entity_id { + Some(oid) => dup_query = dup_query.filter(object_entity_id.eq(oid)), + None => dup_query = dup_query.filter(object_entity_id.is_null()), + } + match &fact.object_value { + Some(ov) => dup_query = dup_query.filter(object_value.eq(ov)), + None => dup_query = dup_query.filter(object_value.is_null()), + } + + let existing: Option = dup_query + .first::(conn.deref_mut()) + .optional() + .map_err(|e| anyhow::anyhow!("Query error: {}", e))?; + + if let Some(existing_fact) = existing { + // Corroborate: bump confidence by 0.1 capped at 0.95 + let new_confidence = 
(existing_fact.confidence + 0.1).min(0.95); + diesel::update(entity_facts.filter(id.eq(existing_fact.id))) + .set(confidence.eq(new_confidence)) + .execute(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Update confidence error: {}", e))?; + + let updated = entity_facts + .filter(id.eq(existing_fact.id)) + .first::(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Query error: {}", e))?; + + Ok((updated, false)) // false = corroborated, not newly created + } else { + diesel::insert_into(entity_facts) + .values(&fact) + .execute(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Insert error: {}", e))?; + + let inserted = entity_facts + .order(id.desc()) + .first::(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Query error: {}", e))?; + + Ok((inserted, true)) // true = newly created + } + }) + .map_err(|_| DbError::new(DbErrorKind::InsertError)) + } + + fn get_facts_for_entity( + &mut self, + cx: &opentelemetry::Context, + entity_id: i32, + ) -> Result, DbError> { + trace_db_call(cx, "query", "get_facts_for_entity", |_span| { + use schema::entity_facts::dsl::*; + let mut conn = self.connection.lock().expect("KnowledgeDao lock"); + entity_facts + .filter(subject_entity_id.eq(entity_id)) + .filter(status.ne("rejected")) + .load::(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Query error: {}", e)) + }) + .map_err(|_| DbError::new(DbErrorKind::QueryError)) + } + + fn list_facts( + &mut self, + cx: &opentelemetry::Context, + filter: FactFilter, + ) -> Result<(Vec, i64), DbError> { + trace_db_call(cx, "query", "list_facts", |_span| { + use diesel::dsl::count_star; + use schema::entity_facts::dsl::*; + let mut conn = self.connection.lock().expect("KnowledgeDao lock"); + + let mut query = entity_facts.into_boxed(); + + if let Some(eid) = filter.entity_id { + query = query.filter(subject_entity_id.eq(eid)); + } + let status_val = filter.status.as_deref().unwrap_or("active"); + if status_val != "all" { + query = query.filter(status.eq(status_val)); + } + if let 
Some(ref pred) = filter.predicate { + query = query.filter(predicate.eq(pred)); + } + + let mut count_query = entity_facts.into_boxed(); + if let Some(eid) = filter.entity_id { count_query = count_query.filter(subject_entity_id.eq(eid)); } + if status_val != "all" { count_query = count_query.filter(status.eq(status_val)); } if let Some(ref pred) = filter.predicate { count_query = count_query.filter(predicate.eq(pred)); } + let total: i64 = count_query.select(count_star()).first(conn.deref_mut()).unwrap_or(0); + + let results = query + .order(created_at.desc()) + .limit(filter.limit) + .offset(filter.offset) + .load::(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Query error: {}", e))?; + + Ok((results, total)) + }) + .map_err(|_| DbError::new(DbErrorKind::QueryError)) + } + + fn update_fact( + &mut self, + cx: &opentelemetry::Context, + fact_id: i32, + patch: FactPatch, + ) -> Result, DbError> { + trace_db_call(cx, "update", "update_fact", |_span| { + use schema::entity_facts::dsl::*; + let mut conn = self.connection.lock().expect("KnowledgeDao lock"); + + if let Some(ref new_predicate) = patch.predicate { + diesel::update(entity_facts.filter(id.eq(fact_id))) + .set(predicate.eq(new_predicate)) + .execute(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Update error: {}", e))?; + } + if let Some(ref new_value) = patch.object_value { + diesel::update(entity_facts.filter(id.eq(fact_id))) + .set(object_value.eq(new_value)) + .execute(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Update error: {}", e))?; + } + if let Some(ref new_status) = patch.status { + diesel::update(entity_facts.filter(id.eq(fact_id))) + .set(status.eq(new_status)) + .execute(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Update error: {}", e))?; + } + if let Some(new_confidence) = patch.confidence { + diesel::update(entity_facts.filter(id.eq(fact_id))) + .set(confidence.eq(new_confidence)) + .execute(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Update error: {}", e))?; + } + + entity_facts + .filter(id.eq(fact_id)) + .first::(conn.deref_mut()) + .optional() + .map_err(|e| anyhow::anyhow!("Query error: {}", e)) + }) + .map_err(|_| DbError::new(DbErrorKind::UpdateError)) + } + + fn update_facts_insight_id( + &mut self, + cx: &opentelemetry::Context, + photo_path: &str, + insight_id: i32, + ) ->
Result<(), DbError> { + trace_db_call(cx, "update", "update_facts_insight_id", |_span| { + use schema::entity_facts::dsl::*; + let mut conn = self.connection.lock().expect("KnowledgeDao lock"); + diesel::update( + entity_facts + .filter(source_photo.eq(photo_path)) + .filter(source_insight_id.is_null()), + ) + .set(source_insight_id.eq(insight_id)) + .execute(conn.deref_mut()) + .map(|_| ()) + .map_err(|e| anyhow::anyhow!("Update error: {}", e)) + }) + .map_err(|_| DbError::new(DbErrorKind::UpdateError)) + } + + fn delete_fact(&mut self, cx: &opentelemetry::Context, fact_id: i32) -> Result<(), DbError> { + trace_db_call(cx, "delete", "delete_fact", |_span| { + use schema::entity_facts::dsl::*; + let mut conn = self.connection.lock().expect("KnowledgeDao lock"); + diesel::delete(entity_facts.filter(id.eq(fact_id))) + .execute(conn.deref_mut()) + .map(|_| ()) + .map_err(|e| anyhow::anyhow!("Delete error: {}", e)) + }) + .map_err(|_| DbError::new(DbErrorKind::QueryError)) + } + + // ----------------------------------------------------------------------- + // Photo link operations + // ----------------------------------------------------------------------- + + fn upsert_photo_link( + &mut self, + cx: &opentelemetry::Context, + link: InsertEntityPhotoLink, + ) -> Result<(), DbError> { + trace_db_call(cx, "insert", "upsert_photo_link", |_span| { + let mut conn = self.connection.lock().expect("KnowledgeDao lock"); + // INSERT OR IGNORE respects the UNIQUE(entity_id, file_path, role) constraint + diesel::sql_query( + "INSERT OR IGNORE INTO entity_photo_links (entity_id, file_path, role) VALUES (?, ?, ?)" + ) + .bind::(link.entity_id) + .bind::(&link.file_path) + .bind::(&link.role) + .execute(conn.deref_mut()) + .map(|_| ()) + .map_err(|e| anyhow::anyhow!("Insert error: {}", e)) + }) + .map_err(|_| DbError::new(DbErrorKind::InsertError)) + } + + fn delete_photo_links_for_file( + &mut self, + cx: &opentelemetry::Context, + file_path_val: &str, + ) -> Result<(), DbError> { + 
trace_db_call(cx, "delete", "delete_photo_links_for_file", |_span| { + use schema::entity_photo_links::dsl::*; + let mut conn = self.connection.lock().expect("KnowledgeDao lock"); + diesel::delete(entity_photo_links.filter(file_path.eq(file_path_val))) + .execute(conn.deref_mut()) + .map(|_| ()) + .map_err(|e| anyhow::anyhow!("Delete error: {}", e)) + }) + .map_err(|_| DbError::new(DbErrorKind::QueryError)) + } + + fn get_links_for_photo( + &mut self, + cx: &opentelemetry::Context, + file_path_val: &str, + ) -> Result, DbError> { + trace_db_call(cx, "query", "get_links_for_photo", |_span| { + use schema::entity_photo_links::dsl::*; + let mut conn = self.connection.lock().expect("KnowledgeDao lock"); + entity_photo_links + .filter(file_path.eq(file_path_val)) + .load::(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Query error: {}", e)) + }) + .map_err(|_| DbError::new(DbErrorKind::QueryError)) + } + + fn get_links_for_entity( + &mut self, + cx: &opentelemetry::Context, + entity_id_val: i32, + ) -> Result, DbError> { + trace_db_call(cx, "query", "get_links_for_entity", |_span| { + use schema::entity_photo_links::dsl::*; + let mut conn = self.connection.lock().expect("KnowledgeDao lock"); + entity_photo_links + .filter(entity_id.eq(entity_id_val)) + .load::(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Query error: {}", e)) + }) + .map_err(|_| DbError::new(DbErrorKind::QueryError)) + } + + // ----------------------------------------------------------------------- + // Audit + // ----------------------------------------------------------------------- + + fn get_recent_activity( + &mut self, + cx: &opentelemetry::Context, + since: i64, + limit: i64, + ) -> Result { + trace_db_call(cx, "query", "get_recent_activity", |_span| { + use schema::entities::dsl as e; + use schema::entity_facts::dsl as ef; + let mut conn = self.connection.lock().expect("KnowledgeDao lock"); + + let recent_entities = e::entities + .filter(e::created_at.gt(since)) + 
.order(e::created_at.desc()) + .limit(limit) + .load::(conn.deref_mut()) + .map_err(|err| anyhow::anyhow!("Query error: {}", err))?; + + let recent_facts = ef::entity_facts + .filter(ef::created_at.gt(since)) + .order(ef::created_at.desc()) + .limit(limit) + .load::(conn.deref_mut()) + .map_err(|err| anyhow::anyhow!("Query error: {}", err))?; + + Ok(RecentActivity { + entities: recent_entities, + facts: recent_facts, + }) + }) + .map_err(|_| DbError::new(DbErrorKind::QueryError)) + } +} diff --git a/src/database/mod.rs b/src/database/mod.rs index c1d31cc..7139663 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -12,6 +12,7 @@ use crate::otel::trace_db_call; pub mod calendar_dao; pub mod daily_summary_dao; pub mod insights_dao; +pub mod knowledge_dao; pub mod location_dao; pub mod models; pub mod preview_dao; @@ -21,6 +22,10 @@ pub mod search_dao; pub use calendar_dao::{CalendarEventDao, SqliteCalendarEventDao}; pub use daily_summary_dao::{DailySummaryDao, InsertDailySummary, SqliteDailySummaryDao}; pub use insights_dao::{InsightDao, SqliteInsightDao}; +pub use knowledge_dao::{ + EntityFilter, EntityPatch, FactFilter, FactPatch, KnowledgeDao, RecentActivity, + SqliteKnowledgeDao, +}; pub use location_dao::{LocationHistoryDao, SqliteLocationHistoryDao}; pub use preview_dao::{PreviewDao, SqlitePreviewDao}; pub use search_dao::{SearchHistoryDao, SqliteSearchHistoryDao}; diff --git a/src/database/models.rs b/src/database/models.rs index f7bf031..93309fc 100644 --- a/src/database/models.rs +++ b/src/database/models.rs @@ -1,4 +1,7 @@ -use crate::database::schema::{favorites, image_exif, photo_insights, users, video_preview_clips}; +use crate::database::schema::{ + entities, entity_facts, entity_photo_links, favorites, image_exif, photo_insights, users, + video_preview_clips, +}; use serde::Serialize; #[derive(Insertable)] @@ -82,6 +85,7 @@ pub struct InsertPhotoInsight { pub summary: String, pub generated_at: i64, pub model_version: String, + pub is_current: 
bool, } #[derive(Serialize, Queryable, Clone, Debug)] @@ -92,6 +96,79 @@ pub struct PhotoInsight { pub summary: String, pub generated_at: i64, pub model_version: String, + pub is_current: bool, +} + +// --- Knowledge memory models --- + +#[derive(Insertable)] +#[diesel(table_name = entities)] +pub struct InsertEntity { + pub name: String, + pub entity_type: String, + pub description: String, + pub embedding: Option>, + pub confidence: f32, + pub status: String, + pub created_at: i64, + pub updated_at: i64, +} + +#[derive(Serialize, Queryable, Clone, Debug)] +pub struct Entity { + pub id: i32, + pub name: String, + pub entity_type: String, + pub description: String, + pub embedding: Option>, + pub confidence: f32, + pub status: String, + pub created_at: i64, + pub updated_at: i64, +} + +#[derive(Insertable)] +#[diesel(table_name = entity_facts)] +pub struct InsertEntityFact { + pub subject_entity_id: i32, + pub predicate: String, + pub object_entity_id: Option, + pub object_value: Option, + pub source_photo: Option, + pub source_insight_id: Option, + pub confidence: f32, + pub status: String, + pub created_at: i64, +} + +#[derive(Serialize, Queryable, Clone, Debug)] +pub struct EntityFact { + pub id: i32, + pub subject_entity_id: i32, + pub predicate: String, + pub object_entity_id: Option, + pub object_value: Option, + pub source_photo: Option, + pub source_insight_id: Option, + pub confidence: f32, + pub status: String, + pub created_at: i64, +} + +#[derive(Insertable)] +#[diesel(table_name = entity_photo_links)] +pub struct InsertEntityPhotoLink { + pub entity_id: i32, + pub file_path: String, + pub role: String, +} + +#[derive(Serialize, Queryable, Clone, Debug)] +pub struct EntityPhotoLink { + pub id: i32, + pub entity_id: i32, + pub file_path: String, + pub role: String, } #[derive(Insertable)] diff --git a/src/database/schema.rs b/src/database/schema.rs index ec798ce..cbcff68 100644 --- a/src/database/schema.rs +++ b/src/database/schema.rs @@ -31,6 +31,44 @@ 
diesel::table! { } } +diesel::table! { + entities (id) { + id -> Integer, + name -> Text, + entity_type -> Text, + description -> Text, + embedding -> Nullable, + confidence -> Float, + status -> Text, + created_at -> BigInt, + updated_at -> BigInt, + } +} + +diesel::table! { + entity_facts (id) { + id -> Integer, + subject_entity_id -> Integer, + predicate -> Text, + object_entity_id -> Nullable, + object_value -> Nullable, + source_photo -> Nullable, + source_insight_id -> Nullable, + confidence -> Float, + status -> Text, + created_at -> BigInt, + } +} + +diesel::table! { + entity_photo_links (id) { + id -> Integer, + entity_id -> Integer, + file_path -> Text, + role -> Text, + } +} + diesel::table! { favorites (id) { id -> Integer, @@ -112,6 +150,7 @@ diesel::table! { summary -> Text, generated_at -> BigInt, model_version -> Text, + is_current -> Bool, } } @@ -165,11 +204,16 @@ diesel::table! { } } +diesel::joinable!(entity_facts -> photo_insights (source_insight_id)); +diesel::joinable!(entity_photo_links -> entities (entity_id)); diesel::joinable!(tagged_photo -> tags (tag_id)); diesel::allow_tables_to_appear_in_same_query!( calendar_events, daily_conversation_summaries, + entities, + entity_facts, + entity_photo_links, favorites, image_exif, knowledge_embeddings, diff --git a/src/knowledge.rs b/src/knowledge.rs new file mode 100644 index 0000000..d4e3bc7 --- /dev/null +++ b/src/knowledge.rs @@ -0,0 +1,567 @@ +use actix_web::dev::{ServiceFactory, ServiceRequest}; +use actix_web::{App, HttpResponse, Responder, web}; +use chrono::Utc; +use serde::{Deserialize, Serialize}; +use std::sync::Mutex; + +use crate::data::Claims; +use crate::database::models::{Entity, EntityFact, EntityPhotoLink}; +use crate::database::{ + EntityFilter, EntityPatch, FactFilter, FactPatch, KnowledgeDao, RecentActivity, +}; + +// --------------------------------------------------------------------------- +// Request / Response types +// 
--------------------------------------------------------------------------- + +#[derive(Serialize)] +pub struct EntitySummary { + pub id: i32, + pub name: String, + pub entity_type: String, + pub description: String, + pub confidence: f32, + pub status: String, + pub created_at: i64, + pub updated_at: i64, +} + +impl From for EntitySummary { + fn from(e: Entity) -> Self { + EntitySummary { + id: e.id, + name: e.name, + entity_type: e.entity_type, + description: e.description, + confidence: e.confidence, + status: e.status, + created_at: e.created_at, + updated_at: e.updated_at, + } + } +} + +#[derive(Serialize)] +pub struct EntityListResponse { + pub entities: Vec, + pub total: i64, + pub limit: i64, + pub offset: i64, +} + +#[derive(Serialize)] +pub struct FactDetail { + pub id: i32, + pub predicate: String, + pub object_entity_id: Option, + pub object_entity_name: Option, + pub object_value: Option, + pub confidence: f32, + pub status: String, + pub source_photo: Option, + pub source_insight_id: Option, + pub created_at: i64, +} + +#[derive(Serialize)] +pub struct PhotoLinkDetail { + pub file_path: String, + pub role: String, +} + +impl From for PhotoLinkDetail { + fn from(l: EntityPhotoLink) -> Self { + PhotoLinkDetail { + file_path: l.file_path, + role: l.role, + } + } +} + +#[derive(Serialize)] +pub struct EntityDetailResponse { + pub id: i32, + pub name: String, + pub entity_type: String, + pub description: String, + pub confidence: f32, + pub status: String, + pub created_at: i64, + pub updated_at: i64, + pub facts: Vec, + pub photo_links: Vec, +} + +#[derive(Serialize)] +pub struct FactSummary { + pub id: i32, + pub subject_entity_id: i32, + pub subject_entity_name: Option, + pub predicate: String, + pub object_entity_id: Option, + pub object_entity_name: Option, + pub object_value: Option, + pub confidence: f32, + pub status: String, + pub source_photo: Option, + pub source_insight_id: Option, + pub created_at: i64, +} + +#[derive(Serialize)] +pub struct 
FactListResponse { + pub facts: Vec, + pub total: i64, + pub limit: i64, + pub offset: i64, +} + +#[derive(Deserialize)] +pub struct MergeRequest { + pub source_id: i32, + pub target_id: i32, +} + +#[derive(Serialize)] +pub struct MergeResponse { + pub merged_entity_id: i32, + pub deleted_entity_id: i32, + pub facts_transferred: i64, + pub links_transferred: i64, +} + +#[derive(Deserialize)] +pub struct EntityPatchRequest { + pub name: Option, + pub description: Option, + pub status: Option, + pub confidence: Option, +} + +#[derive(Deserialize)] +pub struct FactPatchRequest { + pub predicate: Option, + pub object_value: Option, + pub status: Option, + pub confidence: Option, +} + +#[derive(Deserialize)] +pub struct EntityListQuery { + #[serde(rename = "type")] + pub entity_type: Option, + pub status: Option, + pub search: Option, + pub limit: Option, + pub offset: Option, +} + +#[derive(Deserialize)] +pub struct FactListQuery { + pub entity_id: Option, + pub status: Option, + pub predicate: Option, + pub limit: Option, + pub offset: Option, +} + +#[derive(Deserialize)] +pub struct RecentQuery { + pub since: Option, + pub limit: Option, +} + +// --------------------------------------------------------------------------- +// Service registration +// --------------------------------------------------------------------------- + +pub fn add_knowledge_services(app: App) -> App +where + T: ServiceFactory, +{ + app.service( + web::scope("/knowledge") + .service(web::resource("/entities").route(web::get().to(list_entities::))) + .service(web::resource("/entities/merge").route(web::post().to(merge_entities::))) + .service( + web::resource("/entities/{id}") + .route(web::get().to(get_entity::)) + .route(web::patch().to(patch_entity::)) + .route(web::delete().to(delete_entity::)), + ) + .service(web::resource("/facts").route(web::get().to(list_facts::))) + .service( + web::resource("/facts/{id}") + .route(web::patch().to(patch_fact::)) + 
.route(web::delete().to(delete_fact::)), + ) + .service(web::resource("/recent").route(web::get().to(get_recent::))), + ) +} + +// --------------------------------------------------------------------------- +// Handlers +// --------------------------------------------------------------------------- + +async fn list_entities( + _claims: Claims, + query: web::Query, + dao: web::Data>, +) -> impl Responder { + let limit = query.limit.unwrap_or(50).min(200); + let offset = query.offset.unwrap_or(0); + + let status_filter = match query.status.as_deref() { + None | Some("active") => Some("active".to_string()), + Some("all") => None, + Some(s) => Some(s.to_string()), + }; + + let filter = EntityFilter { + entity_type: query.entity_type.clone(), + status: status_filter, + search: query.search.clone(), + limit, + offset, + }; + + let cx = opentelemetry::Context::current(); + let mut dao = dao.lock().expect("Unable to lock KnowledgeDao"); + match dao.list_entities(&cx, filter) { + Ok((entities, total)) => { + let summaries: Vec = + entities.into_iter().map(EntitySummary::from).collect(); + HttpResponse::Ok().json(EntityListResponse { + entities: summaries, + total, + limit, + offset, + }) + } + Err(e) => { + log::error!("list_entities error: {:?}", e); + HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"})) + } + } +} + +async fn get_entity( + _claims: Claims, + id: web::Path, + dao: web::Data>, +) -> impl Responder { + let cx = opentelemetry::Context::current(); + let entity_id = id.into_inner(); + let mut dao = dao.lock().expect("Unable to lock KnowledgeDao"); + + let entity = match dao.get_entity_by_id(&cx, entity_id) { + Ok(Some(e)) => e, + Ok(None) => { + return HttpResponse::NotFound().json(serde_json::json!({"error": "Entity not found"})); + } + Err(e) => { + log::error!("get_entity error: {:?}", e); + return HttpResponse::InternalServerError() + .json(serde_json::json!({"error": "Database error"})); + } + }; + + // Fetch all facts 
(all statuses for audit) + let raw_facts: Vec = match dao.get_facts_for_entity(&cx, entity_id) { + Ok(f) => f, + Err(e) => { + log::error!("get_facts_for_entity error: {:?}", e); + return HttpResponse::InternalServerError() + .json(serde_json::json!({"error": "Database error"})); + } + }; + + // Resolve object entity names + let mut facts = Vec::with_capacity(raw_facts.len()); + for f in raw_facts { + let object_entity_name = if let Some(oid) = f.object_entity_id { + dao.get_entity_by_id(&cx, oid) + .ok() + .flatten() + .map(|e| e.name) + } else { + None + }; + facts.push(FactDetail { + id: f.id, + predicate: f.predicate, + object_entity_id: f.object_entity_id, + object_entity_name, + object_value: f.object_value, + confidence: f.confidence, + status: f.status, + source_photo: f.source_photo, + source_insight_id: f.source_insight_id, + created_at: f.created_at, + }); + } + + // Fetch photo links + let photo_links: Vec = match dao.get_links_for_entity(&cx, entity_id) { + Ok(links) => links.into_iter().map(PhotoLinkDetail::from).collect(), + Err(e) => { + log::error!("get_links_for_entity error: {:?}", e); + return HttpResponse::InternalServerError() + .json(serde_json::json!({"error": "Database error"})); + } + }; + + HttpResponse::Ok().json(EntityDetailResponse { + id: entity.id, + name: entity.name, + entity_type: entity.entity_type, + description: entity.description, + confidence: entity.confidence, + status: entity.status, + created_at: entity.created_at, + updated_at: entity.updated_at, + facts, + photo_links, + }) +} + +async fn patch_entity( + _claims: Claims, + id: web::Path, + body: web::Json, + dao: web::Data>, +) -> impl Responder { + let cx = opentelemetry::Context::current(); + let entity_id = id.into_inner(); + let patch = EntityPatch { + name: body.name.clone(), + description: body.description.clone(), + status: body.status.clone(), + confidence: body.confidence, + }; + + let mut dao = dao.lock().expect("Unable to lock KnowledgeDao"); + match 
dao.update_entity(&cx, entity_id, patch) { + Ok(Some(entity)) => HttpResponse::Ok().json(EntitySummary::from(entity)), + Ok(None) => HttpResponse::NotFound().json(serde_json::json!({"error": "Entity not found"})), + Err(e) => { + log::error!("patch_entity error: {:?}", e); + HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"})) + } + } +} + +async fn delete_entity( + _claims: Claims, + id: web::Path, + dao: web::Data>, +) -> impl Responder { + let cx = opentelemetry::Context::current(); + let entity_id = id.into_inner(); + let mut dao = dao.lock().expect("Unable to lock KnowledgeDao"); + + // Verify entity exists before deleting + match dao.get_entity_by_id(&cx, entity_id) { + Ok(None) => { + return HttpResponse::NotFound().json(serde_json::json!({"error": "Entity not found"})); + } + Err(e) => { + log::error!("delete_entity lookup error: {:?}", e); + return HttpResponse::InternalServerError() + .json(serde_json::json!({"error": "Database error"})); + } + Ok(Some(_)) => {} + } + + match dao.delete_entity(&cx, entity_id) { + Ok(()) => HttpResponse::NoContent().finish(), + Err(e) => { + log::error!("delete_entity error: {:?}", e); + HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"})) + } + } +} + +async fn merge_entities( + _claims: Claims, + body: web::Json, + dao: web::Data>, +) -> impl Responder { + if body.source_id == body.target_id { + return HttpResponse::BadRequest() + .json(serde_json::json!({"error": "source_id and target_id must be different"})); + } + + let cx = opentelemetry::Context::current(); + let mut dao = dao.lock().expect("Unable to lock KnowledgeDao"); + + // Verify both entities exist + for id in [body.source_id, body.target_id] { + match dao.get_entity_by_id(&cx, id) { + Ok(None) => { + return HttpResponse::BadRequest() + .json(serde_json::json!({"error": format!("Entity {} not found", id)})); + } + Err(e) => { + log::error!("merge_entities lookup error: {:?}", e); + return 
HttpResponse::InternalServerError() + .json(serde_json::json!({"error": "Database error"})); + } + Ok(Some(_)) => {} + } + } + + match dao.merge_entities(&cx, body.source_id, body.target_id) { + Ok((facts_transferred, links_transferred)) => HttpResponse::Ok().json(MergeResponse { + merged_entity_id: body.target_id, + deleted_entity_id: body.source_id, + facts_transferred, + links_transferred, + }), + Err(e) => { + log::error!("merge_entities error: {:?}", e); + HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"})) + } + } +} + +async fn list_facts( + _claims: Claims, + query: web::Query, + dao: web::Data>, +) -> impl Responder { + let limit = query.limit.unwrap_or(50).min(200); + let offset = query.offset.unwrap_or(0); + + let status_filter = match query.status.as_deref() { + None | Some("active") => Some("active".to_string()), + Some("all") => None, + Some(s) => Some(s.to_string()), + }; + + let filter = FactFilter { + entity_id: query.entity_id, + status: status_filter, + predicate: query.predicate.clone(), + limit, + offset, + }; + + let cx = opentelemetry::Context::current(); + let mut dao = dao.lock().expect("Unable to lock KnowledgeDao"); + match dao.list_facts(&cx, filter) { + Ok((facts, total)) => { + let mut summaries = Vec::with_capacity(facts.len()); + for f in facts { + let subject_entity_name = dao + .get_entity_by_id(&cx, f.subject_entity_id) + .ok() + .flatten() + .map(|e| e.name); + let object_entity_name = if let Some(oid) = f.object_entity_id { + dao.get_entity_by_id(&cx, oid) + .ok() + .flatten() + .map(|e| e.name) + } else { + None + }; + summaries.push(FactSummary { + id: f.id, + subject_entity_id: f.subject_entity_id, + subject_entity_name, + predicate: f.predicate, + object_entity_id: f.object_entity_id, + object_entity_name, + object_value: f.object_value, + confidence: f.confidence, + status: f.status, + source_photo: f.source_photo, + source_insight_id: f.source_insight_id, + created_at: f.created_at, + 
}); + } + HttpResponse::Ok().json(FactListResponse { + facts: summaries, + total, + limit, + offset, + }) + } + Err(e) => { + log::error!("list_facts error: {:?}", e); + HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"})) + } + } +} + +async fn patch_fact( + _claims: Claims, + id: web::Path, + body: web::Json, + dao: web::Data>, +) -> impl Responder { + let cx = opentelemetry::Context::current(); + let fact_id = id.into_inner(); + let patch = FactPatch { + predicate: body.predicate.clone(), + object_value: body.object_value.clone(), + status: body.status.clone(), + confidence: body.confidence, + }; + + let mut dao = dao.lock().expect("Unable to lock KnowledgeDao"); + match dao.update_fact(&cx, fact_id, patch) { + Ok(Some(fact)) => HttpResponse::Ok().json(fact), + Ok(None) => HttpResponse::NotFound().json(serde_json::json!({"error": "Fact not found"})), + Err(e) => { + log::error!("patch_fact error: {:?}", e); + HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"})) + } + } +} + +async fn delete_fact( + _claims: Claims, + id: web::Path, + dao: web::Data>, +) -> impl Responder { + let cx = opentelemetry::Context::current(); + let fact_id = id.into_inner(); + let mut dao = dao.lock().expect("Unable to lock KnowledgeDao"); + match dao.delete_fact(&cx, fact_id) { + Ok(()) => HttpResponse::NoContent().finish(), + Err(e) => { + log::warn!("delete_fact({}) error: {:?}", fact_id, e); + HttpResponse::NotFound().json(serde_json::json!({"error": "Fact not found"})) + } + } +} + +async fn get_recent( + _claims: Claims, + query: web::Query, + dao: web::Data>, +) -> impl Responder { + let since = query + .since + .unwrap_or_else(|| Utc::now().timestamp() - 86400); + let limit = query.limit.unwrap_or(20).min(100); + + let cx = opentelemetry::Context::current(); + let mut dao = dao.lock().expect("Unable to lock KnowledgeDao"); + match dao.get_recent_activity(&cx, since, limit) { + Ok(RecentActivity { entities, 
facts }) => { + let entity_summaries: Vec = + entities.into_iter().map(EntitySummary::from).collect(); + HttpResponse::Ok().json(serde_json::json!({ + "entities": entity_summaries, + "facts": facts + })) + } + Err(e) => { + log::error!("get_recent error: {:?}", e); + HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"})) + } + } +} diff --git a/src/main.rs b/src/main.rs index b56d43c..ec2be62 100644 --- a/src/main.rs +++ b/src/main.rs @@ -67,6 +67,7 @@ mod tags; mod utils; mod video; +mod knowledge; mod memories; mod otel; mod service; @@ -1186,6 +1187,7 @@ fn main() -> std::io::Result<()> { .service(ai::get_all_insights_handler) .service(ai::get_available_models_handler) .add_feature(add_tag_services::<_, SqliteTagDao>) + .add_feature(knowledge::add_knowledge_services::<_, SqliteKnowledgeDao>) .app_data(app_data.clone()) .app_data::>(Data::new(RealFileSystem::new( app_data.base_path.clone(), @@ -1204,6 +1206,9 @@ fn main() -> std::io::Result<()> { .app_data::>>>(Data::new(Mutex::new(Box::new( preview_dao, )))) + .app_data::>>(Data::new(Mutex::new( + SqliteKnowledgeDao::new(), + ))) .app_data(web::JsonConfig::default().error_handler(|err, req| { let detail = err.to_string(); log::warn!( diff --git a/src/state.rs b/src/state.rs index 4000704..f85a2e6 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,8 +1,8 @@ use crate::ai::{InsightGenerator, OllamaClient, SmsApiClient}; use crate::database::{ - CalendarEventDao, DailySummaryDao, ExifDao, InsightDao, LocationHistoryDao, SearchHistoryDao, - SqliteCalendarEventDao, SqliteDailySummaryDao, SqliteExifDao, SqliteInsightDao, - SqliteLocationHistoryDao, SqliteSearchHistoryDao, + CalendarEventDao, DailySummaryDao, ExifDao, InsightDao, KnowledgeDao, LocationHistoryDao, + SearchHistoryDao, SqliteCalendarEventDao, SqliteDailySummaryDao, SqliteExifDao, + SqliteInsightDao, SqliteKnowledgeDao, SqliteLocationHistoryDao, SqliteSearchHistoryDao, }; use crate::database::{PreviewDao, 
SqlitePreviewDao}; use crate::tags::{SqliteTagDao, TagDao}; @@ -119,6 +119,8 @@ impl Default for AppState { Arc::new(Mutex::new(Box::new(SqliteSearchHistoryDao::new()))); let tag_dao: Arc>> = Arc::new(Mutex::new(Box::new(SqliteTagDao::default()))); + let knowledge_dao: Arc>> = + Arc::new(Mutex::new(Box::new(SqliteKnowledgeDao::new()))); // Load base path let base_path = env::var("BASE_PATH").expect("BASE_PATH was not set in the env"); @@ -134,6 +136,7 @@ impl Default for AppState { location_dao.clone(), search_dao.clone(), tag_dao.clone(), + knowledge_dao, base_path.clone(), ); @@ -200,6 +203,8 @@ impl AppState { Arc::new(Mutex::new(Box::new(SqliteSearchHistoryDao::new()))); let tag_dao: Arc>> = Arc::new(Mutex::new(Box::new(SqliteTagDao::default()))); + let knowledge_dao: Arc>> = + Arc::new(Mutex::new(Box::new(SqliteKnowledgeDao::new()))); // Initialize test InsightGenerator with all data sources let base_path_str = base_path.to_string_lossy().to_string(); @@ -213,6 +218,7 @@ impl AppState { location_dao.clone(), search_dao.clone(), tag_dao.clone(), + knowledge_dao, base_path_str.clone(), );