Files
ImageApi/src/database/knowledge_dao.rs
Cameron ffcddbb843 feat: multi-library foundation (schema + libraries module)
Adds a `libraries` registry table and threads library_id through
per-instance metadata tables (image_exif, photo_insights,
entity_photo_links, video_preview_clips). File-path columns renamed to
rel_path to make the relative-to-root semantics explicit. Adds
content_hash + size_bytes on image_exif to support future hash-keyed
thumbnail/HLS dedup. Tags and favorites stay library-agnostic so they
share across libraries by rel_path.

Behavior is unchanged: a single primary library (id=1) is seeded from
BASE_PATH on first boot; all handlers and DAOs route through it as a
transitional shim until the API gains a library query param.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-21 01:55:07 +00:00

884 lines
32 KiB
Rust

use diesel::prelude::*;
use diesel::sqlite::SqliteConnection;
use std::ops::DerefMut;
use std::sync::{Arc, Mutex};
use crate::database::models::{
Entity, EntityFact, EntityPhotoLink, InsertEntity, InsertEntityFact, InsertEntityPhotoLink,
};
use crate::database::schema;
use crate::database::{DbError, DbErrorKind, connect};
use crate::otel::trace_db_call;
// ---------------------------------------------------------------------------
// Entity type normalisation
// ---------------------------------------------------------------------------
/// Canonicalise a model-supplied `entity_type` to a consistent lowercase form.
///
/// Weak models frequently vary capitalisation ("Person" vs "person"), pad the
/// value with stray whitespace ("person "), or use synonym types ("location"
/// vs "place"). Normalising here prevents duplicate entities that differ only
/// by type spelling.
///
/// The input is trimmed and lowercased, then known synonyms collapse onto one
/// of `person`, `place`, `event` or `thing`. Any other value is returned
/// trimmed and lowercased, unchanged otherwise.
pub(crate) fn normalize_entity_type(raw: &str) -> String {
    // Trim first so padded values such as " Person\n" still hit the synonym table.
    let lowered = raw.trim().to_lowercase();
    let canonical = match lowered.as_str() {
        "person" | "people" | "human" | "individual" | "contact" => "person",
        "place" | "location" | "venue" | "site" | "area" | "landmark" => "place",
        "event" | "occasion" | "activity" | "celebration" => "event",
        "thing" | "object" | "item" | "product" => "thing",
        other => other,
    };
    canonical.to_string()
}
// ---------------------------------------------------------------------------
// Filter / patch types
// ---------------------------------------------------------------------------
/// Filter and pagination options accepted by `KnowledgeDao::list_entities`.
#[derive(Debug, Clone, PartialEq)]
pub struct EntityFilter {
    /// Restrict results to this entity type (compared exactly); `None` means any type.
    pub entity_type: Option<String>,
    /// "active" | "reviewed" | "rejected" | "all"; `None` is treated as "active".
    pub status: Option<String>,
    /// LIKE match on name and description
    pub search: Option<String>,
    /// Maximum number of rows to return.
    pub limit: i64,
    /// Number of matching rows to skip before the first returned row.
    pub offset: i64,
}
/// Filter and pagination options accepted by `KnowledgeDao::list_facts`.
#[derive(Debug, Clone, PartialEq)]
pub struct FactFilter {
    /// Restrict results to facts whose subject is this entity; `None` means any subject.
    pub entity_id: Option<i32>,
    /// "active" | "reviewed" | "rejected" | "all"; `None` is treated as "active".
    pub status: Option<String>,
    /// Restrict results to this predicate (compared exactly); `None` means any predicate.
    pub predicate: Option<String>,
    /// Maximum number of rows to return.
    pub limit: i64,
    /// Number of matching rows to skip before the first returned row.
    pub offset: i64,
}
/// Partial update for an entity: only the fields that are `Some` are written.
///
/// `Default` yields a patch with every field `None`, so a single-field patch
/// can be spelled `EntityPatch { name: Some(..), ..Default::default() }`.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct EntityPatch {
    /// New name, if it should change.
    pub name: Option<String>,
    /// New description, if it should change.
    pub description: Option<String>,
    /// New status, if it should change.
    pub status: Option<String>,
    /// New confidence, if it should change.
    pub confidence: Option<f32>,
}
/// Partial update for a fact: only the fields that are `Some` are written.
///
/// `Default` yields a patch with every field `None`, so a single-field patch
/// can be spelled `FactPatch { status: Some(..), ..Default::default() }`.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct FactPatch {
    /// New predicate, if it should change.
    pub predicate: Option<String>,
    /// New literal object value, if it should change.
    pub object_value: Option<String>,
    /// New status, if it should change.
    pub status: Option<String>,
    /// New confidence, if it should change.
    pub confidence: Option<f32>,
}
/// Result of `KnowledgeDao::get_recent_activity`: the entities and facts
/// created after the requested cut-off.
pub struct RecentActivity {
/// Entities whose `created_at` is later than the cut-off.
pub entities: Vec<Entity>,
/// Facts whose `created_at` is later than the cut-off.
pub facts: Vec<EntityFact>,
}
// ---------------------------------------------------------------------------
// Trait
// ---------------------------------------------------------------------------
/// Storage interface for the knowledge graph: entities, facts about them, and
/// links between entities and photos.
///
/// Every method takes an OpenTelemetry context, which the SQLite implementation
/// in this file passes to `trace_db_call`. Behaviour notes below describe that
/// implementation.
pub trait KnowledgeDao: Sync + Send {
// --- Entity ---
/// Inserts `entity`, or updates the existing entity with the same name and
/// type, and returns the stored row.
fn upsert_entity(
&mut self,
cx: &opentelemetry::Context,
entity: InsertEntity,
) -> Result<Entity, DbError>;
/// Fetches the entity with the given id; `Ok(None)` when no row matches.
fn get_entity_by_id(
&mut self,
cx: &opentelemetry::Context,
id: i32,
) -> Result<Option<Entity>, DbError>;
/// Returns entities whose name matches `name` case-insensitively, optionally
/// restricted to `entity_type`. The SQLite implementation skips entities
/// whose status is "rejected".
fn get_entity_by_name(
&mut self,
cx: &opentelemetry::Context,
name: &str,
entity_type: Option<&str>,
) -> Result<Vec<Entity>, DbError>;
/// Returns entities that have a stored embedding, optionally restricted to
/// `entity_type`. The SQLite implementation skips entities whose status is
/// "rejected".
fn get_entities_with_embeddings(
&mut self,
cx: &opentelemetry::Context,
entity_type: Option<&str>,
) -> Result<Vec<Entity>, DbError>;
/// Returns one page of entities selected by `filter`, together with a total
/// count for pagination.
fn list_entities(
&mut self,
cx: &opentelemetry::Context,
filter: EntityFilter,
) -> Result<(Vec<Entity>, i64), DbError>;
/// Sets the status of the entity with the given id.
fn update_entity_status(
&mut self,
cx: &opentelemetry::Context,
id: i32,
status: &str,
) -> Result<(), DbError>;
/// Applies the `Some` fields of `patch` to the entity with the given id and
/// returns the row afterwards; `Ok(None)` when no entity has that id.
fn update_entity(
&mut self,
cx: &opentelemetry::Context,
id: i32,
patch: EntityPatch,
) -> Result<Option<Entity>, DbError>;
/// Deletes the entity with the given id.
fn delete_entity(&mut self, cx: &opentelemetry::Context, id: i32) -> Result<(), DbError>;
/// Folds entity `source_id` into `target_id` and deletes the source.
/// The SQLite implementation returns `(facts re-pointed as subject, photo
/// links copied to the target)`.
fn merge_entities(
&mut self,
cx: &opentelemetry::Context,
source_id: i32,
target_id: i32,
) -> Result<(i64, i64), DbError>;
// --- Facts ---
/// Stores `fact`, or corroborates an equivalent existing fact.
/// The boolean is `true` when a new row was created and `false` when an
/// existing fact was corroborated instead.
fn upsert_fact(
&mut self,
cx: &opentelemetry::Context,
fact: InsertEntityFact,
) -> Result<(EntityFact, bool), DbError>;
/// Returns the facts whose subject is `entity_id`. The SQLite implementation
/// skips facts whose status is "rejected".
fn get_facts_for_entity(
&mut self,
cx: &opentelemetry::Context,
entity_id: i32,
) -> Result<Vec<EntityFact>, DbError>;
/// Returns one page of facts selected by `filter`, together with a total
/// count for pagination.
fn list_facts(
&mut self,
cx: &opentelemetry::Context,
filter: FactFilter,
) -> Result<(Vec<EntityFact>, i64), DbError>;
/// Applies the `Some` fields of `patch` to the fact with the given id and
/// returns the row afterwards; `Ok(None)` when no fact has that id.
fn update_fact(
&mut self,
cx: &opentelemetry::Context,
id: i32,
patch: FactPatch,
) -> Result<Option<EntityFact>, DbError>;
/// Records `insight_id` on the facts sourced from `source_photo` that do not
/// have an insight id yet.
fn update_facts_insight_id(
&mut self,
cx: &opentelemetry::Context,
source_photo: &str,
insight_id: i32,
) -> Result<(), DbError>;
/// Deletes the fact with the given id.
fn delete_fact(&mut self, cx: &opentelemetry::Context, id: i32) -> Result<(), DbError>;
// --- Photo links ---
/// Records a link between an entity and a photo. The SQLite implementation
/// uses `INSERT OR IGNORE`, so an already-existing link is left untouched.
fn upsert_photo_link(
&mut self,
cx: &opentelemetry::Context,
link: InsertEntityPhotoLink,
) -> Result<(), DbError>;
/// Deletes every photo link recorded for `file_path`.
fn delete_photo_links_for_file(
&mut self,
cx: &opentelemetry::Context,
file_path: &str,
) -> Result<(), DbError>;
/// Returns every photo link recorded for `file_path`.
fn get_links_for_photo(
&mut self,
cx: &opentelemetry::Context,
file_path: &str,
) -> Result<Vec<EntityPhotoLink>, DbError>;
/// Returns every photo link recorded for `entity_id`.
fn get_links_for_entity(
&mut self,
cx: &opentelemetry::Context,
entity_id: i32,
) -> Result<Vec<EntityPhotoLink>, DbError>;
// --- Audit ---
/// Returns the entities and facts created after `since`, newest first, with
/// at most `limit` rows of each kind.
/// NOTE(review): `since` is compared against `created_at`; presumably a Unix
/// timestamp in seconds — confirm against the schema.
fn get_recent_activity(
&mut self,
cx: &opentelemetry::Context,
since: i64,
limit: i64,
) -> Result<RecentActivity, DbError>;
}
// ---------------------------------------------------------------------------
// SQLite implementation
// ---------------------------------------------------------------------------
/// `KnowledgeDao` implementation backed by a single SQLite connection.
pub struct SqliteKnowledgeDao {
/// Connection behind `Arc<Mutex<..>>`; every DAO method locks it before
/// running its queries.
connection: Arc<Mutex<SqliteConnection>>,
}
/// `Default` simply delegates to `SqliteKnowledgeDao::new`.
impl Default for SqliteKnowledgeDao {
/// Builds a DAO the same way `new()` does.
fn default() -> Self {
Self::new()
}
}
impl SqliteKnowledgeDao {
/// Creates a DAO that owns a fresh connection obtained from `connect()`.
pub fn new() -> Self {
    let shared = Arc::new(Mutex::new(connect()));
    SqliteKnowledgeDao { connection: shared }
}
pub fn from_connection(conn: Arc<Mutex<SqliteConnection>>) -> Self {
SqliteKnowledgeDao { connection: conn }
}
/// Packs an embedding into bytes: every `f32` contributes its four
/// little-endian bytes, in input order.
fn serialize_embedding(vec: &[f32]) -> Vec<u8> {
    let mut packed = Vec::with_capacity(vec.len() * 4);
    for value in vec {
        packed.extend_from_slice(&value.to_le_bytes());
    }
    packed
}
/// Rebuilds an embedding from bytes, reading each consecutive 4-byte group as
/// a little-endian `f32`.
///
/// Returns a `DbErrorKind::QueryError` when the length is not a multiple of four.
fn deserialize_embedding(bytes: &[u8]) -> Result<Vec<f32>, DbError> {
    let groups = bytes.chunks_exact(4);
    // Leftover bytes mean the blob cannot be a whole number of f32 values.
    if !groups.remainder().is_empty() {
        return Err(DbError::new(DbErrorKind::QueryError));
    }
    let floats: Vec<f32> = groups
        .map(|quad| f32::from_le_bytes([quad[0], quad[1], quad[2], quad[3]]))
        .collect();
    Ok(floats)
}
/// Cosine of the angle between `a` and `b`.
///
/// Returns `0.0` when the slices are empty, differ in length, or either one
/// has zero magnitude.
pub fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
    if a.is_empty() || a.len() != b.len() {
        return 0.0;
    }
    // Euclidean length of a vector.
    let norm = |v: &[f32]| v.iter().map(|x| x * x).sum::<f32>().sqrt();
    let (norm_a, norm_b) = (norm(a), norm(b));
    if norm_a == 0.0 || norm_b == 0.0 {
        return 0.0;
    }
    let dot_product: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum();
    dot_product / (norm_a * norm_b)
}
}
impl KnowledgeDao for SqliteKnowledgeDao {
// -----------------------------------------------------------------------
// Entity operations
// -----------------------------------------------------------------------
fn upsert_entity(
&mut self,
cx: &opentelemetry::Context,
entity: InsertEntity,
) -> Result<Entity, DbError> {
trace_db_call(cx, "insert", "upsert_entity", |_span| {
use schema::entities::dsl::*;
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
// Normalise type before lookup and insert so that model variations
// ("Person" / "person", "location" / "place") collapse to one row.
let entity = InsertEntity {
entity_type: normalize_entity_type(&entity.entity_type),
..entity
};
// Case-insensitive lookup by name + entity_type.
// Use lower() on both sides so existing dirty rows ("Person") still match.
let name_lower = entity.name.to_lowercase();
let type_lower = entity.entity_type.to_lowercase();
let existing: Option<Entity> = entities
.filter(diesel::dsl::sql::<diesel::sql_types::Bool>(&format!(
"lower(name) = '{}' AND lower(entity_type) = '{}'",
name_lower.replace('\'', "''"),
type_lower.replace('\'', "''")
)))
.first::<Entity>(conn.deref_mut())
.optional()
.map_err(|e| anyhow::anyhow!("Query error: {}", e))?;
if let Some(existing_entity) = existing {
// Update description, embedding, updated_at
diesel::update(entities.filter(id.eq(existing_entity.id)))
.set((
description.eq(&entity.description),
embedding.eq(&entity.embedding),
updated_at.eq(entity.updated_at),
))
.execute(conn.deref_mut())
.map_err(|e| anyhow::anyhow!("Update error: {}", e))?;
entities
.filter(id.eq(existing_entity.id))
.first::<Entity>(conn.deref_mut())
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
} else {
diesel::insert_into(entities)
.values(&entity)
.execute(conn.deref_mut())
.map_err(|e| anyhow::anyhow!("Insert error: {}", e))?;
entities
.order(id.desc())
.first::<Entity>(conn.deref_mut())
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
}
})
.map_err(|_| DbError::new(DbErrorKind::InsertError))
}
fn get_entity_by_id(
&mut self,
cx: &opentelemetry::Context,
entity_id: i32,
) -> Result<Option<Entity>, DbError> {
trace_db_call(cx, "query", "get_entity_by_id", |_span| {
use schema::entities::dsl::*;
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
entities
.filter(id.eq(entity_id))
.first::<Entity>(conn.deref_mut())
.optional()
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn get_entity_by_name(
&mut self,
cx: &opentelemetry::Context,
entity_name: &str,
entity_type_filter: Option<&str>,
) -> Result<Vec<Entity>, DbError> {
trace_db_call(cx, "query", "get_entity_by_name", |_span| {
use schema::entities::dsl::*;
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
let name_lower = entity_name.to_lowercase().replace('\'', "''");
let mut sql = format!("lower(name) = '{}'", name_lower);
if let Some(et) = entity_type_filter {
sql.push_str(&format!(" AND entity_type = '{}'", et.replace('\'', "''")));
}
sql.push_str(" AND status != 'rejected'");
entities
.filter(diesel::dsl::sql::<diesel::sql_types::Bool>(&sql))
.load::<Entity>(conn.deref_mut())
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn get_entities_with_embeddings(
&mut self,
cx: &opentelemetry::Context,
entity_type_filter: Option<&str>,
) -> Result<Vec<Entity>, DbError> {
trace_db_call(cx, "query", "get_entities_with_embeddings", |_span| {
use schema::entities::dsl::*;
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
let mut query = entities
.filter(embedding.is_not_null())
.filter(status.ne("rejected"))
.into_boxed();
if let Some(et) = entity_type_filter {
query = query.filter(entity_type.eq(et));
}
query
.load::<Entity>(conn.deref_mut())
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn list_entities(
&mut self,
cx: &opentelemetry::Context,
filter: EntityFilter,
) -> Result<(Vec<Entity>, i64), DbError> {
trace_db_call(cx, "query", "list_entities", |_span| {
use diesel::dsl::count_star;
use schema::entities::dsl::*;
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
let mut query = entities.into_boxed();
if let Some(ref et) = filter.entity_type {
query = query.filter(entity_type.eq(et));
}
let status_val = filter.status.as_deref().unwrap_or("active");
if status_val != "all" {
query = query.filter(status.eq(status_val));
}
if let Some(ref search_term) = filter.search {
let pattern = format!("%{}%", search_term);
query = query.filter(name.like(pattern.clone()).or(description.like(pattern)));
}
// Count with same filters applied (build separately since boxed query is consumed)
let mut count_query = entities.into_boxed();
if let Some(ref et) = filter.entity_type {
count_query = count_query.filter(entity_type.eq(et));
}
let status_val2 = filter.status.as_deref().unwrap_or("active");
if status_val2 != "all" {
count_query = count_query.filter(status.eq(status_val2));
}
if let Some(ref search_term) = filter.search {
let pattern = format!("%{}%", search_term);
count_query =
count_query.filter(name.like(pattern.clone()).or(description.like(pattern)));
}
let total: i64 = count_query
.select(count_star())
.first(conn.deref_mut())
.unwrap_or(0);
let results = query
.order(updated_at.desc())
.limit(filter.limit)
.offset(filter.offset)
.load::<Entity>(conn.deref_mut())
.map_err(|e| anyhow::anyhow!("Query error: {}", e))?;
Ok((results, total))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn update_entity_status(
&mut self,
cx: &opentelemetry::Context,
entity_id: i32,
new_status: &str,
) -> Result<(), DbError> {
trace_db_call(cx, "update", "update_entity_status", |_span| {
use schema::entities::dsl::*;
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
diesel::update(entities.filter(id.eq(entity_id)))
.set(status.eq(new_status))
.execute(conn.deref_mut())
.map(|_| ())
.map_err(|e| anyhow::anyhow!("Update error: {}", e))
})
.map_err(|_| DbError::new(DbErrorKind::UpdateError))
}
/// Applies the `Some` fields of `patch` to the entity with the given id and
/// returns the row afterwards (`Ok(None)` when no entity has that id).
///
/// Each patched field also bumps `updated_at` to the current UTC timestamp.
/// All writes and the final read run inside one transaction, so a failure
/// part-way through leaves the row untouched instead of half-patched.
///
/// # Errors
/// Every database failure is reported as `DbErrorKind::UpdateError`.
fn update_entity(
    &mut self,
    cx: &opentelemetry::Context,
    entity_id: i32,
    patch: EntityPatch,
) -> Result<Option<Entity>, DbError> {
    trace_db_call(cx, "update", "update_entity", |_span| {
        use schema::entities::dsl::*;
        let mut conn = self.connection.lock().expect("KnowledgeDao lock");
        let now = chrono::Utc::now().timestamp();
        conn.transaction::<Option<Entity>, anyhow::Error, _>(|conn| {
            if let Some(ref new_name) = patch.name {
                diesel::update(entities.filter(id.eq(entity_id)))
                    .set((name.eq(new_name), updated_at.eq(now)))
                    .execute(conn)
                    .map_err(|e| anyhow::anyhow!("Update name error: {}", e))?;
            }
            if let Some(ref new_desc) = patch.description {
                diesel::update(entities.filter(id.eq(entity_id)))
                    .set((description.eq(new_desc), updated_at.eq(now)))
                    .execute(conn)
                    .map_err(|e| anyhow::anyhow!("Update description error: {}", e))?;
            }
            if let Some(ref new_status) = patch.status {
                diesel::update(entities.filter(id.eq(entity_id)))
                    .set((status.eq(new_status), updated_at.eq(now)))
                    .execute(conn)
                    .map_err(|e| anyhow::anyhow!("Update status error: {}", e))?;
            }
            if let Some(new_confidence) = patch.confidence {
                diesel::update(entities.filter(id.eq(entity_id)))
                    .set((confidence.eq(new_confidence), updated_at.eq(now)))
                    .execute(conn)
                    .map_err(|e| anyhow::anyhow!("Update confidence error: {}", e))?;
            }
            entities
                .filter(id.eq(entity_id))
                .first::<Entity>(conn)
                .optional()
                .map_err(|e| anyhow::anyhow!("Query error: {}", e))
        })
    })
    .map_err(|_| DbError::new(DbErrorKind::UpdateError))
}
fn delete_entity(
&mut self,
cx: &opentelemetry::Context,
entity_id: i32,
) -> Result<(), DbError> {
trace_db_call(cx, "delete", "delete_entity", |_span| {
use schema::entities::dsl::*;
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
diesel::delete(entities.filter(id.eq(entity_id)))
.execute(conn.deref_mut())
.map(|_| ())
.map_err(|e| anyhow::anyhow!("Delete error: {}", e))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
/// Folds entity `source_id` into `target_id` inside one transaction:
/// facts that reference the source (as subject or object) are re-pointed to
/// the target, the source's photo links are copied to the target, and the
/// source entity is deleted.
///
/// Returns `(facts re-pointed as subject, photo links newly inserted for the
/// target)`.
///
/// Merging an entity into itself is a no-op returning `(0, 0)`. Without that
/// guard the final delete step would remove the entity itself.
///
/// # Errors
/// Every database failure is reported as `DbErrorKind::UpdateError`.
fn merge_entities(
    &mut self,
    cx: &opentelemetry::Context,
    source_id: i32,
    target_id: i32,
) -> Result<(i64, i64), DbError> {
    // Nothing to merge, and step 4 below must never delete the target.
    if source_id == target_id {
        return Ok((0, 0));
    }
    trace_db_call(cx, "update", "merge_entities", |_span| {
        let mut conn = self.connection.lock().expect("KnowledgeDao lock");
        conn.transaction::<(i64, i64), diesel::result::Error, _>(|conn| {
            use schema::entity_facts::dsl as ef;
            use schema::entity_photo_links::dsl as epl;
            // 1. Re-point facts where source is subject
            let facts_updated =
                diesel::update(ef::entity_facts.filter(ef::subject_entity_id.eq(source_id)))
                    .set(ef::subject_entity_id.eq(target_id))
                    .execute(conn)? as i64;
            // 2. Re-point facts where source is object
            diesel::update(ef::entity_facts.filter(ef::object_entity_id.eq(source_id)))
                .set(ef::object_entity_id.eq(Some(target_id)))
                .execute(conn)?;
            // 3. Copy photo links to target (INSERT OR IGNORE to skip duplicates)
            let links_updated = diesel::sql_query(
                "INSERT OR IGNORE INTO entity_photo_links (entity_id, library_id, rel_path, role) \
                 SELECT ?, library_id, rel_path, role FROM entity_photo_links WHERE entity_id = ?",
            )
            .bind::<diesel::sql_types::Integer, _>(target_id)
            .bind::<diesel::sql_types::Integer, _>(source_id)
            .execute(conn)? as i64;
            // 4. Delete source entity (FK CASCADE removes remaining facts/links)
            diesel::delete(
                schema::entities::dsl::entities.filter(schema::entities::dsl::id.eq(source_id)),
            )
            .execute(conn)?;
            Ok((facts_updated, links_updated))
        })
        .map_err(|e| anyhow::anyhow!("Merge transaction error: {}", e))
    })
    .map_err(|_| DbError::new(DbErrorKind::UpdateError))
}
// -----------------------------------------------------------------------
// Fact operations
// -----------------------------------------------------------------------
/// Stores `fact`, or corroborates an equivalent fact that already exists.
///
/// Two facts are equivalent when subject, predicate, object entity and object
/// value all match (with `None` matching only `NULL`) and the stored fact is
/// not "rejected". Corroboration raises the stored confidence by 0.1, capped
/// at 0.95. A confidence already above that cap is left as it is and is never
/// lowered.
///
/// Returns the stored row and a flag: `true` when a new row was inserted,
/// `false` when an existing fact was corroborated.
///
/// # Errors
/// Every database failure is reported as `DbErrorKind::InsertError`.
fn upsert_fact(
    &mut self,
    cx: &opentelemetry::Context,
    fact: InsertEntityFact,
) -> Result<(EntityFact, bool), DbError> {
    trace_db_call(cx, "insert", "upsert_fact", |_span| {
        use schema::entity_facts::dsl::*;
        let mut conn = self.connection.lock().expect("KnowledgeDao lock");
        // Look for an identical fact that has not been rejected
        let mut dup_query = entity_facts
            .filter(subject_entity_id.eq(fact.subject_entity_id))
            .filter(predicate.eq(&fact.predicate))
            .filter(status.ne("rejected"))
            .into_boxed();
        match &fact.object_entity_id {
            Some(oid) => dup_query = dup_query.filter(object_entity_id.eq(oid)),
            None => dup_query = dup_query.filter(object_entity_id.is_null()),
        }
        match &fact.object_value {
            Some(ov) => dup_query = dup_query.filter(object_value.eq(ov)),
            None => dup_query = dup_query.filter(object_value.is_null()),
        }
        let existing: Option<EntityFact> = dup_query
            .first::<EntityFact>(conn.deref_mut())
            .optional()
            .map_err(|e| anyhow::anyhow!("Query error: {}", e))?;
        if let Some(existing_fact) = existing {
            // Corroborate: bump confidence by 0.1 capped at 0.95, but never
            // pull down a confidence that is already above the cap.
            let bumped = (existing_fact.confidence + 0.1).min(0.95);
            let new_confidence = bumped.max(existing_fact.confidence);
            diesel::update(entity_facts.filter(id.eq(existing_fact.id)))
                .set(confidence.eq(new_confidence))
                .execute(conn.deref_mut())
                .map_err(|e| anyhow::anyhow!("Update confidence error: {}", e))?;
            let updated = entity_facts
                .filter(id.eq(existing_fact.id))
                .first::<EntityFact>(conn.deref_mut())
                .map_err(|e| anyhow::anyhow!("Query error: {}", e))?;
            Ok((updated, false)) // false = corroborated, not newly created
        } else {
            diesel::insert_into(entity_facts)
                .values(&fact)
                .execute(conn.deref_mut())
                .map_err(|e| anyhow::anyhow!("Insert error: {}", e))?;
            // Read the new row back as the one with the highest id.
            let inserted = entity_facts
                .order(id.desc())
                .first::<EntityFact>(conn.deref_mut())
                .map_err(|e| anyhow::anyhow!("Query error: {}", e))?;
            Ok((inserted, true)) // true = newly created
        }
    })
    .map_err(|_| DbError::new(DbErrorKind::InsertError))
}
fn get_facts_for_entity(
&mut self,
cx: &opentelemetry::Context,
entity_id: i32,
) -> Result<Vec<EntityFact>, DbError> {
trace_db_call(cx, "query", "get_facts_for_entity", |_span| {
use schema::entity_facts::dsl::*;
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
entity_facts
.filter(subject_entity_id.eq(entity_id))
.filter(status.ne("rejected"))
.load::<EntityFact>(conn.deref_mut())
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn list_facts(
&mut self,
cx: &opentelemetry::Context,
filter: FactFilter,
) -> Result<(Vec<EntityFact>, i64), DbError> {
trace_db_call(cx, "query", "list_facts", |_span| {
use diesel::dsl::count_star;
use schema::entity_facts::dsl::*;
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
let mut query = entity_facts.into_boxed();
if let Some(eid) = filter.entity_id {
query = query.filter(subject_entity_id.eq(eid));
}
let status_val = filter.status.as_deref().unwrap_or("active");
if status_val != "all" {
query = query.filter(status.eq(status_val));
}
if let Some(ref pred) = filter.predicate {
query = query.filter(predicate.eq(pred));
}
let total: i64 = entity_facts
.select(count_star())
.first(conn.deref_mut())
.unwrap_or(0);
let results = query
.order(created_at.desc())
.limit(filter.limit)
.offset(filter.offset)
.load::<EntityFact>(conn.deref_mut())
.map_err(|e| anyhow::anyhow!("Query error: {}", e))?;
Ok((results, total))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn update_fact(
&mut self,
cx: &opentelemetry::Context,
fact_id: i32,
patch: FactPatch,
) -> Result<Option<EntityFact>, DbError> {
trace_db_call(cx, "update", "update_fact", |_span| {
use schema::entity_facts::dsl::*;
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
if let Some(ref new_predicate) = patch.predicate {
diesel::update(entity_facts.filter(id.eq(fact_id)))
.set(predicate.eq(new_predicate))
.execute(conn.deref_mut())
.map_err(|e| anyhow::anyhow!("Update error: {}", e))?;
}
if let Some(ref new_value) = patch.object_value {
diesel::update(entity_facts.filter(id.eq(fact_id)))
.set(object_value.eq(new_value))
.execute(conn.deref_mut())
.map_err(|e| anyhow::anyhow!("Update error: {}", e))?;
}
if let Some(ref new_status) = patch.status {
diesel::update(entity_facts.filter(id.eq(fact_id)))
.set(status.eq(new_status))
.execute(conn.deref_mut())
.map_err(|e| anyhow::anyhow!("Update error: {}", e))?;
}
if let Some(new_confidence) = patch.confidence {
diesel::update(entity_facts.filter(id.eq(fact_id)))
.set(confidence.eq(new_confidence))
.execute(conn.deref_mut())
.map_err(|e| anyhow::anyhow!("Update error: {}", e))?;
}
entity_facts
.filter(id.eq(fact_id))
.first::<EntityFact>(conn.deref_mut())
.optional()
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
})
.map_err(|_| DbError::new(DbErrorKind::UpdateError))
}
fn update_facts_insight_id(
&mut self,
cx: &opentelemetry::Context,
photo_path: &str,
insight_id: i32,
) -> Result<(), DbError> {
trace_db_call(cx, "update", "update_facts_insight_id", |_span| {
use schema::entity_facts::dsl::*;
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
diesel::update(
entity_facts
.filter(source_photo.eq(photo_path))
.filter(source_insight_id.is_null()),
)
.set(source_insight_id.eq(insight_id))
.execute(conn.deref_mut())
.map(|_| ())
.map_err(|e| anyhow::anyhow!("Update error: {}", e))
})
.map_err(|_| DbError::new(DbErrorKind::UpdateError))
}
fn delete_fact(&mut self, cx: &opentelemetry::Context, fact_id: i32) -> Result<(), DbError> {
trace_db_call(cx, "delete", "delete_fact", |_span| {
use schema::entity_facts::dsl::*;
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
diesel::delete(entity_facts.filter(id.eq(fact_id)))
.execute(conn.deref_mut())
.map(|_| ())
.map_err(|e| anyhow::anyhow!("Delete error: {}", e))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
// -----------------------------------------------------------------------
// Photo link operations
// -----------------------------------------------------------------------
fn upsert_photo_link(
&mut self,
cx: &opentelemetry::Context,
link: InsertEntityPhotoLink,
) -> Result<(), DbError> {
trace_db_call(cx, "insert", "upsert_photo_link", |_span| {
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
// INSERT OR IGNORE respects the UNIQUE(entity_id, library_id, rel_path, role) constraint
diesel::sql_query(
"INSERT OR IGNORE INTO entity_photo_links (entity_id, library_id, rel_path, role) VALUES (?, ?, ?, ?)"
)
.bind::<diesel::sql_types::Integer, _>(link.entity_id)
.bind::<diesel::sql_types::Integer, _>(link.library_id)
.bind::<diesel::sql_types::Text, _>(&link.file_path)
.bind::<diesel::sql_types::Text, _>(&link.role)
.execute(conn.deref_mut())
.map(|_| ())
.map_err(|e| anyhow::anyhow!("Insert error: {}", e))
})
.map_err(|_| DbError::new(DbErrorKind::InsertError))
}
fn delete_photo_links_for_file(
&mut self,
cx: &opentelemetry::Context,
file_path_val: &str,
) -> Result<(), DbError> {
trace_db_call(cx, "delete", "delete_photo_links_for_file", |_span| {
use schema::entity_photo_links::dsl::*;
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
diesel::delete(entity_photo_links.filter(rel_path.eq(file_path_val)))
.execute(conn.deref_mut())
.map(|_| ())
.map_err(|e| anyhow::anyhow!("Delete error: {}", e))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn get_links_for_photo(
&mut self,
cx: &opentelemetry::Context,
file_path_val: &str,
) -> Result<Vec<EntityPhotoLink>, DbError> {
trace_db_call(cx, "query", "get_links_for_photo", |_span| {
use schema::entity_photo_links::dsl::*;
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
entity_photo_links
.filter(rel_path.eq(file_path_val))
.load::<EntityPhotoLink>(conn.deref_mut())
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn get_links_for_entity(
&mut self,
cx: &opentelemetry::Context,
entity_id_val: i32,
) -> Result<Vec<EntityPhotoLink>, DbError> {
trace_db_call(cx, "query", "get_links_for_entity", |_span| {
use schema::entity_photo_links::dsl::*;
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
entity_photo_links
.filter(entity_id.eq(entity_id_val))
.load::<EntityPhotoLink>(conn.deref_mut())
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
// -----------------------------------------------------------------------
// Audit
// -----------------------------------------------------------------------
/// Collects the entities and the facts whose `created_at` is strictly greater
/// than `since`, newest first, with at most `limit` rows of each kind.
///
/// Database failures surface as `DbErrorKind::QueryError`.
fn get_recent_activity(
    &mut self,
    cx: &opentelemetry::Context,
    since: i64,
    limit: i64,
) -> Result<RecentActivity, DbError> {
    let activity = trace_db_call(cx, "query", "get_recent_activity", |_span| {
        use schema::entities::dsl as e;
        use schema::entity_facts::dsl as ef;
        let mut guard = self.connection.lock().expect("KnowledgeDao lock");

        let newest_entities = e::entities
            .filter(e::created_at.gt(since))
            .order(e::created_at.desc())
            .limit(limit);
        let entities = newest_entities
            .load::<Entity>(guard.deref_mut())
            .map_err(|err| anyhow::anyhow!("Query error: {}", err))?;

        let newest_facts = ef::entity_facts
            .filter(ef::created_at.gt(since))
            .order(ef::created_at.desc())
            .limit(limit);
        let facts = newest_facts
            .load::<EntityFact>(guard.deref_mut())
            .map_err(|err| anyhow::anyhow!("Query error: {}", err))?;

        Ok(RecentActivity { entities, facts })
    });
    activity.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
}