#![allow(dead_code)] use diesel::prelude::*; use diesel::sqlite::SqliteConnection; use std::ops::DerefMut; use std::sync::{Arc, Mutex}; use crate::database::models::{ Entity, EntityFact, EntityPhotoLink, InsertEntity, InsertEntityFact, InsertEntityPhotoLink, }; use crate::database::schema; use crate::database::{DbError, DbErrorKind, connect}; use crate::otel::trace_db_call; // --------------------------------------------------------------------------- // Entity type normalisation // --------------------------------------------------------------------------- /// Canonicalise a model-supplied entity_type to a consistent lowercase form. /// Weak models frequently vary capitalisation ("Person" vs "person") or use /// synonym types ("location" vs "place"). Normalising here prevents duplicate /// entities that differ only by type spelling. pub(crate) fn normalize_entity_type(raw: &str) -> String { match raw.to_lowercase().as_str() { "person" | "people" | "human" | "individual" | "contact" => "person", "place" | "location" | "venue" | "site" | "area" | "landmark" => "place", "event" | "occasion" | "activity" | "celebration" => "event", "thing" | "object" | "item" | "product" => "thing", other => other, } .to_string() } // --------------------------------------------------------------------------- // Filter / patch types // --------------------------------------------------------------------------- pub struct EntityFilter { pub entity_type: Option, /// "active" | "reviewed" | "rejected" | "all" pub status: Option, /// LIKE match on name and description pub search: Option, pub limit: i64, pub offset: i64, } pub struct FactFilter { pub entity_id: Option, /// "active" | "reviewed" | "rejected" | "all" pub status: Option, pub predicate: Option, pub limit: i64, pub offset: i64, } pub struct EntityPatch { pub name: Option, pub description: Option, pub status: Option, pub confidence: Option, } pub struct FactPatch { pub predicate: Option, pub object_value: Option, pub status: Option, pub confidence: Option, } pub struct RecentActivity { pub entities: Vec, pub facts: Vec, } // --------------------------------------------------------------------------- // Trait // --------------------------------------------------------------------------- pub trait KnowledgeDao: Sync + Send { // --- Entity --- fn upsert_entity( &mut self, cx: &opentelemetry::Context, entity: InsertEntity, ) -> Result; fn get_entity_by_id( &mut self, cx: &opentelemetry::Context, id: i32, ) -> Result, DbError>; fn get_entity_by_name( &mut self, cx: &opentelemetry::Context, name: &str, entity_type: Option<&str>, ) -> Result, DbError>; fn get_entities_with_embeddings( &mut self, cx: &opentelemetry::Context, entity_type: Option<&str>, ) -> Result, DbError>; fn list_entities( &mut self, cx: &opentelemetry::Context, filter: EntityFilter, ) -> Result<(Vec, i64), DbError>; fn update_entity_status( &mut self, cx: &opentelemetry::Context, id: i32, status: &str, ) -> Result<(), DbError>; fn update_entity( &mut self, cx: &opentelemetry::Context, id: i32, patch: EntityPatch, ) -> Result, DbError>; fn delete_entity(&mut self, cx: &opentelemetry::Context, id: i32) -> Result<(), DbError>; fn merge_entities( &mut self, cx: &opentelemetry::Context, source_id: i32, target_id: i32, ) -> Result<(i64, i64), DbError>; // --- Facts --- fn upsert_fact( &mut self, cx: &opentelemetry::Context, fact: InsertEntityFact, ) -> Result<(EntityFact, bool), DbError>; fn get_facts_for_entity( &mut self, cx: &opentelemetry::Context, entity_id: i32, ) -> Result, DbError>; fn list_facts( &mut self, cx: &opentelemetry::Context, filter: FactFilter, ) -> Result<(Vec, i64), DbError>; fn update_fact( &mut self, cx: &opentelemetry::Context, id: i32, patch: FactPatch, ) -> Result, DbError>; fn update_facts_insight_id( &mut self, cx: &opentelemetry::Context, source_photo: &str, insight_id: i32, ) -> Result<(), DbError>; fn delete_fact(&mut self, cx: &opentelemetry::Context, id: i32) -> Result<(), DbError>; // --- Photo links --- fn upsert_photo_link( &mut self, cx: &opentelemetry::Context, link: InsertEntityPhotoLink, ) -> Result<(), DbError>; fn delete_photo_links_for_file( &mut self, cx: &opentelemetry::Context, file_path: &str, ) -> Result<(), DbError>; fn get_links_for_photo( &mut self, cx: &opentelemetry::Context, file_path: &str, ) -> Result, DbError>; fn get_links_for_entity( &mut self, cx: &opentelemetry::Context, entity_id: i32, ) -> Result, DbError>; // --- Audit --- fn get_recent_activity( &mut self, cx: &opentelemetry::Context, since: i64, limit: i64, ) -> Result; } // --------------------------------------------------------------------------- // SQLite implementation // --------------------------------------------------------------------------- pub struct SqliteKnowledgeDao { connection: Arc>, } impl Default for SqliteKnowledgeDao { fn default() -> Self { Self::new() } } impl SqliteKnowledgeDao { pub fn new() -> Self { SqliteKnowledgeDao { connection: Arc::new(Mutex::new(connect())), } } pub fn from_connection(conn: Arc>) -> Self { SqliteKnowledgeDao { connection: conn } } fn serialize_embedding(vec: &[f32]) -> Vec { vec.iter().flat_map(|f| f.to_le_bytes()).collect() } fn deserialize_embedding(bytes: &[u8]) -> Result, DbError> { if !bytes.len().is_multiple_of(4) { return Err(DbError::new(DbErrorKind::QueryError)); } Ok(bytes .chunks_exact(4) .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]])) .collect()) } pub fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 { if a.len() != b.len() || a.is_empty() { return 0.0; } let dot: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum(); let mag_a: f32 = a.iter().map(|x| x * x).sum::().sqrt(); let mag_b: f32 = b.iter().map(|x| x * x).sum::().sqrt(); if mag_a == 0.0 || mag_b == 0.0 { 0.0 } else { dot / (mag_a * mag_b) } } } impl KnowledgeDao for SqliteKnowledgeDao { // ----------------------------------------------------------------------- // Entity operations // ----------------------------------------------------------------------- fn upsert_entity( &mut self, cx: &opentelemetry::Context, entity: InsertEntity, ) -> Result { trace_db_call(cx, "insert", "upsert_entity", |_span| { use schema::entities::dsl::*; let mut conn = self.connection.lock().expect("KnowledgeDao lock"); // Normalise type before lookup and insert so that model variations // ("Person" / "person", "location" / "place") collapse to one row. let entity = InsertEntity { entity_type: normalize_entity_type(&entity.entity_type), ..entity }; // Case-insensitive lookup by name + entity_type. // Use lower() on both sides so existing dirty rows ("Person") still match. let name_lower = entity.name.to_lowercase(); let type_lower = entity.entity_type.to_lowercase(); let existing: Option = entities .filter(diesel::dsl::sql::(&format!( "lower(name) = '{}' AND lower(entity_type) = '{}'", name_lower.replace('\'', "''"), type_lower.replace('\'', "''") ))) .first::(conn.deref_mut()) .optional() .map_err(|e| anyhow::anyhow!("Query error: {}", e))?; if let Some(existing_entity) = existing { // Update description, embedding, updated_at diesel::update(entities.filter(id.eq(existing_entity.id))) .set(( description.eq(&entity.description), embedding.eq(&entity.embedding), updated_at.eq(entity.updated_at), )) .execute(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Update error: {}", e))?; entities .filter(id.eq(existing_entity.id)) .first::(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Query error: {}", e)) } else { diesel::insert_into(entities) .values(&entity) .execute(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Insert error: {}", e))?; entities .order(id.desc()) .first::(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Query error: {}", e)) } }) .map_err(|_| DbError::new(DbErrorKind::InsertError)) } fn get_entity_by_id( &mut self, cx: &opentelemetry::Context, entity_id: i32, ) -> Result, DbError> { trace_db_call(cx, "query", "get_entity_by_id", |_span| { use schema::entities::dsl::*; let mut conn = self.connection.lock().expect("KnowledgeDao lock"); entities .filter(id.eq(entity_id)) .first::(conn.deref_mut()) .optional() .map_err(|e| anyhow::anyhow!("Query error: {}", e)) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn get_entity_by_name( &mut self, cx: &opentelemetry::Context, entity_name: &str, entity_type_filter: Option<&str>, ) -> Result, DbError> { trace_db_call(cx, "query", "get_entity_by_name", |_span| { use schema::entities::dsl::*; let mut conn = self.connection.lock().expect("KnowledgeDao lock"); let name_lower = entity_name.to_lowercase().replace('\'', "''"); let mut sql = format!("lower(name) = '{}'", name_lower); if let Some(et) = entity_type_filter { sql.push_str(&format!(" AND entity_type = '{}'", et.replace('\'', "''"))); } sql.push_str(" AND status != 'rejected'"); entities .filter(diesel::dsl::sql::(&sql)) .load::(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Query error: {}", e)) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn get_entities_with_embeddings( &mut self, cx: &opentelemetry::Context, entity_type_filter: Option<&str>, ) -> Result, DbError> { trace_db_call(cx, "query", "get_entities_with_embeddings", |_span| { use schema::entities::dsl::*; let mut conn = self.connection.lock().expect("KnowledgeDao lock"); let mut query = entities .filter(embedding.is_not_null()) .filter(status.ne("rejected")) .into_boxed(); if let Some(et) = entity_type_filter { query = query.filter(entity_type.eq(et)); } query .load::(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Query error: {}", e)) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn list_entities( &mut self, cx: &opentelemetry::Context, filter: EntityFilter, ) -> Result<(Vec, i64), DbError> { trace_db_call(cx, "query", "list_entities", |_span| { use diesel::dsl::count_star; use schema::entities::dsl::*; let mut conn = self.connection.lock().expect("KnowledgeDao lock"); let mut query = entities.into_boxed(); if let Some(ref et) = filter.entity_type { query = query.filter(entity_type.eq(et)); } let status_val = filter.status.as_deref().unwrap_or("active"); if status_val != "all" { query = query.filter(status.eq(status_val)); } if let Some(ref search_term) = filter.search { let pattern = format!("%{}%", search_term); query = query.filter(name.like(pattern.clone()).or(description.like(pattern))); } // Count with same filters applied (build separately since boxed query is consumed) let mut count_query = entities.into_boxed(); if let Some(ref et) = filter.entity_type { count_query = count_query.filter(entity_type.eq(et)); } let status_val2 = filter.status.as_deref().unwrap_or("active"); if status_val2 != "all" { count_query = count_query.filter(status.eq(status_val2)); } if let Some(ref search_term) = filter.search { let pattern = format!("%{}%", search_term); count_query = count_query.filter(name.like(pattern.clone()).or(description.like(pattern))); } let total: i64 = count_query .select(count_star()) .first(conn.deref_mut()) .unwrap_or(0); let results = query .order(updated_at.desc()) .limit(filter.limit) .offset(filter.offset) .load::(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Query error: {}", e))?; Ok((results, total)) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn update_entity_status( &mut self, cx: &opentelemetry::Context, entity_id: i32, new_status: &str, ) -> Result<(), DbError> { trace_db_call(cx, "update", "update_entity_status", |_span| { use schema::entities::dsl::*; let mut conn = self.connection.lock().expect("KnowledgeDao lock"); diesel::update(entities.filter(id.eq(entity_id))) .set(status.eq(new_status)) .execute(conn.deref_mut()) .map(|_| ()) .map_err(|e| anyhow::anyhow!("Update error: {}", e)) }) .map_err(|_| DbError::new(DbErrorKind::UpdateError)) } fn update_entity( &mut self, cx: &opentelemetry::Context, entity_id: i32, patch: EntityPatch, ) -> Result, DbError> { trace_db_call(cx, "update", "update_entity", |_span| { use schema::entities::dsl::*; let mut conn = self.connection.lock().expect("KnowledgeDao lock"); let now = chrono::Utc::now().timestamp(); if let Some(ref new_name) = patch.name { diesel::update(entities.filter(id.eq(entity_id))) .set((name.eq(new_name), updated_at.eq(now))) .execute(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Update name error: {}", e))?; } if let Some(ref new_desc) = patch.description { diesel::update(entities.filter(id.eq(entity_id))) .set((description.eq(new_desc), updated_at.eq(now))) .execute(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Update description error: {}", e))?; } if let Some(ref new_status) = patch.status { diesel::update(entities.filter(id.eq(entity_id))) .set((status.eq(new_status), updated_at.eq(now))) .execute(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Update status error: {}", e))?; } if let Some(new_confidence) = patch.confidence { diesel::update(entities.filter(id.eq(entity_id))) .set((confidence.eq(new_confidence), updated_at.eq(now))) .execute(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Update confidence error: {}", e))?; } entities .filter(id.eq(entity_id)) .first::(conn.deref_mut()) .optional() .map_err(|e| anyhow::anyhow!("Query error: {}", e)) }) .map_err(|_| DbError::new(DbErrorKind::UpdateError)) } fn delete_entity( &mut self, cx: &opentelemetry::Context, entity_id: i32, ) -> Result<(), DbError> { trace_db_call(cx, "delete", "delete_entity", |_span| { use schema::entities::dsl::*; let mut conn = self.connection.lock().expect("KnowledgeDao lock"); diesel::delete(entities.filter(id.eq(entity_id))) .execute(conn.deref_mut()) .map(|_| ()) .map_err(|e| anyhow::anyhow!("Delete error: {}", e)) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn merge_entities( &mut self, cx: &opentelemetry::Context, source_id: i32, target_id: i32, ) -> Result<(i64, i64), DbError> { trace_db_call(cx, "update", "merge_entities", |_span| { let mut conn = self.connection.lock().expect("KnowledgeDao lock"); conn.transaction::<(i64, i64), diesel::result::Error, _>(|conn| { use schema::entity_facts::dsl as ef; // 1. Re-point facts where source is subject let facts_updated = diesel::update(ef::entity_facts.filter(ef::subject_entity_id.eq(source_id))) .set(ef::subject_entity_id.eq(target_id)) .execute(conn)? as i64; // 2. Re-point facts where source is object diesel::update(ef::entity_facts.filter(ef::object_entity_id.eq(source_id))) .set(ef::object_entity_id.eq(Some(target_id))) .execute(conn)?; // 3. Copy photo links to target (INSERT OR IGNORE to skip duplicates) let links_updated = diesel::sql_query( "INSERT OR IGNORE INTO entity_photo_links (entity_id, library_id, rel_path, role) \ SELECT ?, library_id, rel_path, role FROM entity_photo_links WHERE entity_id = ?", ) .bind::(target_id) .bind::(source_id) .execute(conn)? as i64; // 4. Delete source entity (FK CASCADE removes remaining facts/links) diesel::delete( schema::entities::dsl::entities.filter(schema::entities::dsl::id.eq(source_id)), ) .execute(conn)?; Ok((facts_updated, links_updated)) }) .map_err(|e| anyhow::anyhow!("Merge transaction error: {}", e)) }) .map_err(|_| DbError::new(DbErrorKind::UpdateError)) } // ----------------------------------------------------------------------- // Fact operations // ----------------------------------------------------------------------- fn upsert_fact( &mut self, cx: &opentelemetry::Context, fact: InsertEntityFact, ) -> Result<(EntityFact, bool), DbError> { trace_db_call(cx, "insert", "upsert_fact", |_span| { use schema::entity_facts::dsl::*; let mut conn = self.connection.lock().expect("KnowledgeDao lock"); // Look for an identical active fact let mut dup_query = entity_facts .filter(subject_entity_id.eq(fact.subject_entity_id)) .filter(predicate.eq(&fact.predicate)) .filter(status.ne("rejected")) .into_boxed(); match &fact.object_entity_id { Some(oid) => dup_query = dup_query.filter(object_entity_id.eq(oid)), None => dup_query = dup_query.filter(object_entity_id.is_null()), } match &fact.object_value { Some(ov) => dup_query = dup_query.filter(object_value.eq(ov)), None => dup_query = dup_query.filter(object_value.is_null()), } let existing: Option = dup_query .first::(conn.deref_mut()) .optional() .map_err(|e| anyhow::anyhow!("Query error: {}", e))?; if let Some(existing_fact) = existing { // Corroborate: bump confidence by 0.1 capped at 0.95 let new_confidence = (existing_fact.confidence + 0.1).min(0.95); diesel::update(entity_facts.filter(id.eq(existing_fact.id))) .set(confidence.eq(new_confidence)) .execute(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Update confidence error: {}", e))?; let updated = entity_facts .filter(id.eq(existing_fact.id)) .first::(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Query error: {}", e))?; Ok((updated, false)) // false = corroborated, not newly created } else { diesel::insert_into(entity_facts) .values(&fact) .execute(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Insert error: {}", e))?; let inserted = entity_facts .order(id.desc()) .first::(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Query error: {}", e))?; Ok((inserted, true)) // true = newly created } }) .map_err(|_| DbError::new(DbErrorKind::InsertError)) } fn get_facts_for_entity( &mut self, cx: &opentelemetry::Context, entity_id: i32, ) -> Result, DbError> { trace_db_call(cx, "query", "get_facts_for_entity", |_span| { use schema::entity_facts::dsl::*; let mut conn = self.connection.lock().expect("KnowledgeDao lock"); entity_facts .filter(subject_entity_id.eq(entity_id)) .filter(status.ne("rejected")) .load::(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Query error: {}", e)) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn list_facts( &mut self, cx: &opentelemetry::Context, filter: FactFilter, ) -> Result<(Vec, i64), DbError> { trace_db_call(cx, "query", "list_facts", |_span| { use diesel::dsl::count_star; use schema::entity_facts::dsl::*; let mut conn = self.connection.lock().expect("KnowledgeDao lock"); let mut query = entity_facts.into_boxed(); if let Some(eid) = filter.entity_id { query = query.filter(subject_entity_id.eq(eid)); } let status_val = filter.status.as_deref().unwrap_or("active"); if status_val != "all" { query = query.filter(status.eq(status_val)); } if let Some(ref pred) = filter.predicate { query = query.filter(predicate.eq(pred)); } let total: i64 = entity_facts .select(count_star()) .first(conn.deref_mut()) .unwrap_or(0); let results = query .order(created_at.desc()) .limit(filter.limit) .offset(filter.offset) .load::(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Query error: {}", e))?; Ok((results, total)) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn update_fact( &mut self, cx: &opentelemetry::Context, fact_id: i32, patch: FactPatch, ) -> Result, DbError> { trace_db_call(cx, "update", "update_fact", |_span| { use schema::entity_facts::dsl::*; let mut conn = self.connection.lock().expect("KnowledgeDao lock"); if let Some(ref new_predicate) = patch.predicate { diesel::update(entity_facts.filter(id.eq(fact_id))) .set(predicate.eq(new_predicate)) .execute(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Update error: {}", e))?; } if let Some(ref new_value) = patch.object_value { diesel::update(entity_facts.filter(id.eq(fact_id))) .set(object_value.eq(new_value)) .execute(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Update error: {}", e))?; } if let Some(ref new_status) = patch.status { diesel::update(entity_facts.filter(id.eq(fact_id))) .set(status.eq(new_status)) .execute(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Update error: {}", e))?; } if let Some(new_confidence) = patch.confidence { diesel::update(entity_facts.filter(id.eq(fact_id))) .set(confidence.eq(new_confidence)) .execute(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Update error: {}", e))?; } entity_facts .filter(id.eq(fact_id)) .first::(conn.deref_mut()) .optional() .map_err(|e| anyhow::anyhow!("Query error: {}", e)) }) .map_err(|_| DbError::new(DbErrorKind::UpdateError)) } fn update_facts_insight_id( &mut self, cx: &opentelemetry::Context, photo_path: &str, insight_id: i32, ) -> Result<(), DbError> { trace_db_call(cx, "update", "update_facts_insight_id", |_span| { use schema::entity_facts::dsl::*; let mut conn = self.connection.lock().expect("KnowledgeDao lock"); diesel::update( entity_facts .filter(source_photo.eq(photo_path)) .filter(source_insight_id.is_null()), ) .set(source_insight_id.eq(insight_id)) .execute(conn.deref_mut()) .map(|_| ()) .map_err(|e| anyhow::anyhow!("Update error: {}", e)) }) .map_err(|_| DbError::new(DbErrorKind::UpdateError)) } fn delete_fact(&mut self, cx: &opentelemetry::Context, fact_id: i32) -> Result<(), DbError> { trace_db_call(cx, "delete", "delete_fact", |_span| { use schema::entity_facts::dsl::*; let mut conn = self.connection.lock().expect("KnowledgeDao lock"); diesel::delete(entity_facts.filter(id.eq(fact_id))) .execute(conn.deref_mut()) .map(|_| ()) .map_err(|e| anyhow::anyhow!("Delete error: {}", e)) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } // ----------------------------------------------------------------------- // Photo link operations // ----------------------------------------------------------------------- fn upsert_photo_link( &mut self, cx: &opentelemetry::Context, link: InsertEntityPhotoLink, ) -> Result<(), DbError> { trace_db_call(cx, "insert", "upsert_photo_link", |_span| { let mut conn = self.connection.lock().expect("KnowledgeDao lock"); // INSERT OR IGNORE respects the UNIQUE(entity_id, library_id, rel_path, role) constraint diesel::sql_query( "INSERT OR IGNORE INTO entity_photo_links (entity_id, library_id, rel_path, role) VALUES (?, ?, ?, ?)" ) .bind::(link.entity_id) .bind::(link.library_id) .bind::(&link.file_path) .bind::(&link.role) .execute(conn.deref_mut()) .map(|_| ()) .map_err(|e| anyhow::anyhow!("Insert error: {}", e)) }) .map_err(|_| DbError::new(DbErrorKind::InsertError)) } fn delete_photo_links_for_file( &mut self, cx: &opentelemetry::Context, file_path_val: &str, ) -> Result<(), DbError> { trace_db_call(cx, "delete", "delete_photo_links_for_file", |_span| { use schema::entity_photo_links::dsl::*; let mut conn = self.connection.lock().expect("KnowledgeDao lock"); diesel::delete(entity_photo_links.filter(rel_path.eq(file_path_val))) .execute(conn.deref_mut()) .map(|_| ()) .map_err(|e| anyhow::anyhow!("Delete error: {}", e)) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn get_links_for_photo( &mut self, cx: &opentelemetry::Context, file_path_val: &str, ) -> Result, DbError> { trace_db_call(cx, "query", "get_links_for_photo", |_span| { use schema::entity_photo_links::dsl::*; let mut conn = self.connection.lock().expect("KnowledgeDao lock"); entity_photo_links .filter(rel_path.eq(file_path_val)) .load::(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Query error: {}", e)) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn get_links_for_entity( &mut self, cx: &opentelemetry::Context, entity_id_val: i32, ) -> Result, DbError> { trace_db_call(cx, "query", "get_links_for_entity", |_span| { use schema::entity_photo_links::dsl::*; let mut conn = self.connection.lock().expect("KnowledgeDao lock"); entity_photo_links .filter(entity_id.eq(entity_id_val)) .load::(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Query error: {}", e)) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } // ----------------------------------------------------------------------- // Audit // ----------------------------------------------------------------------- fn get_recent_activity( &mut self, cx: &opentelemetry::Context, since: i64, limit: i64, ) -> Result { trace_db_call(cx, "query", "get_recent_activity", |_span| { use schema::entities::dsl as e; use schema::entity_facts::dsl as ef; let mut conn = self.connection.lock().expect("KnowledgeDao lock"); let recent_entities = e::entities .filter(e::created_at.gt(since)) .order(e::created_at.desc()) .limit(limit) .load::(conn.deref_mut()) .map_err(|err| anyhow::anyhow!("Query error: {}", err))?; let recent_facts = ef::entity_facts .filter(ef::created_at.gt(since)) .order(ef::created_at.desc()) .limit(limit) .load::(conn.deref_mut()) .map_err(|err| anyhow::anyhow!("Query error: {}", err))?; Ok(RecentActivity { entities: recent_entities, facts: recent_facts, }) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } }