use diesel::prelude::*; use diesel::sqlite::SqliteConnection; use std::ops::DerefMut; use std::sync::{Arc, Mutex}; use crate::database::models::{InsertPhotoInsight, PhotoInsight}; use crate::database::schema; use crate::database::{DbError, DbErrorKind, connect}; use crate::otel::trace_db_call; pub trait InsightDao: Sync + Send { fn store_insight( &mut self, context: &opentelemetry::Context, insight: InsertPhotoInsight, ) -> Result; fn get_insight( &mut self, context: &opentelemetry::Context, file_path: &str, ) -> Result, DbError>; /// Library-scoped variant of `get_insight`. The default `get_insight` /// finds any `is_current=true` row matching `file_path` across /// libraries — fine for the photo-grid metadata fetch (cross-library /// merge), wrong for the chat path: a regenerate on lib1 flips lib1's /// row to `is_current=false` and inserts a new lib1 row, but /// lib2's untouched `is_current=true` row for the same rel_path /// would still satisfy the path-only query and shadow the regen on /// the next history fetch. Always pass a library_id when you have /// one (chat / insight write paths always do). fn get_current_insight_for_library( &mut self, context: &opentelemetry::Context, library_id: i32, file_path: &str, ) -> Result, DbError>; /// Return the most recent current insight whose rel_path is one of /// `paths`. Used for content-hash sharing: the caller expands a /// single file into all rel_paths with the same content_hash, then /// asks here for any existing insight attached to any of them. fn get_insight_for_paths( &mut self, context: &opentelemetry::Context, paths: &[String], ) -> Result, DbError>; fn get_insight_history( &mut self, context: &opentelemetry::Context, file_path: &str, ) -> Result, DbError>; /// Fetch a single insight by primary key, regardless of `is_current`. /// Used by the few-shot injection flow where the caller picks specific /// historical insights (which may have been superseded) as training /// exemplars for a fresh generation. fn get_insight_by_id( &mut self, context: &opentelemetry::Context, insight_id: i32, ) -> Result, DbError>; fn delete_insight( &mut self, context: &opentelemetry::Context, file_path: &str, ) -> Result<(), DbError>; fn get_all_insights( &mut self, context: &opentelemetry::Context, ) -> Result, DbError>; fn rate_insight( &mut self, context: &opentelemetry::Context, file_path: &str, approved: bool, ) -> Result<(), DbError>; /// Rate a specific insight version by primary key, regardless of /// `is_current`. Used by the per-file history view to approve/reject /// previously generated (superseded) versions, which the path-based /// `rate_insight` (current row only) cannot reach. fn rate_insight_by_id( &mut self, context: &opentelemetry::Context, insight_id: i32, approved: bool, ) -> Result<(), DbError>; fn get_approved_insights( &mut self, context: &opentelemetry::Context, ) -> Result, DbError>; /// Replace the `training_messages` JSON blob on the current row for /// `(library_id, rel_path)`. Used by chat-turn append mode to persist /// the extended conversation without inserting a new insight version. /// Returns the number of rows affected (0 if no current row matched, /// indicating a concurrent regenerate/reconcile flipped `is_current`). fn update_training_messages( &mut self, context: &opentelemetry::Context, library_id: i32, file_path: &str, training_messages_json: &str, ) -> Result; } pub struct SqliteInsightDao { connection: Arc>, } impl Default for SqliteInsightDao { fn default() -> Self { Self::new() } } impl SqliteInsightDao { pub fn new() -> Self { SqliteInsightDao { connection: Arc::new(Mutex::new(connect())), } } #[cfg(test)] #[allow(dead_code)] pub fn from_connection(conn: Arc>) -> Self { SqliteInsightDao { connection: conn } } } impl InsightDao for SqliteInsightDao { fn store_insight( &mut self, context: &opentelemetry::Context, mut insight: InsertPhotoInsight, ) -> Result { trace_db_call(context, "insert", "store_insight", |_span| { use schema::photo_insights::dsl::*; let mut connection = self.connection.lock().expect("Unable to get InsightDao"); // Eagerly populate content_hash so this insight follows the // bytes (CLAUDE.md "Multi-library data model"). Caller- // supplied hash wins; otherwise look it up from image_exif // for the (library_id, rel_path) tuple. None is acceptable — // reconciliation backfills it once the hash lands. if insight.content_hash.is_none() { use schema::image_exif as ie; insight.content_hash = ie::table .filter(ie::library_id.eq(insight.library_id)) .filter(ie::rel_path.eq(&insight.file_path)) .filter(ie::content_hash.is_not_null()) .select(ie::content_hash) .first::>(connection.deref_mut()) .ok() .flatten(); } // Mark all existing insights for this file as no longer current diesel::update( photo_insights .filter(library_id.eq(insight.library_id)) .filter(rel_path.eq(&insight.file_path)), ) .set(is_current.eq(false)) .execute(connection.deref_mut()) .map_err(|e| anyhow::anyhow!("Failed to flip is_current: {}", e))?; // Insert the new insight as current diesel::insert_into(photo_insights) .values(&insight) .execute(connection.deref_mut()) .map_err(|e| anyhow::anyhow!("Failed to insert insight: {}", e))?; // Retrieve the inserted record (is_current = true) photo_insights .filter(library_id.eq(insight.library_id)) .filter(rel_path.eq(&insight.file_path)) .filter(is_current.eq(true)) .first::(connection.deref_mut()) .map_err(|e| anyhow::anyhow!("Failed to retrieve inserted insight: {}", e)) }) .map_err(|e| { log::error!("store_insight failed: {}", e); DbError::new(DbErrorKind::InsertError) }) } fn get_insight( &mut self, context: &opentelemetry::Context, path: &str, ) -> Result, DbError> { trace_db_call(context, "query", "get_insight", |_span| { use schema::photo_insights::dsl::*; let mut connection = self.connection.lock().expect("Unable to get InsightDao"); photo_insights .filter(rel_path.eq(path)) .filter(is_current.eq(true)) .first::(connection.deref_mut()) .optional() .map_err(|e| anyhow::anyhow!("Query error: {}", e)) }) .map_err(|e| DbError::log(DbErrorKind::QueryError, e)) } fn get_current_insight_for_library( &mut self, context: &opentelemetry::Context, lib_id: i32, path: &str, ) -> Result, DbError> { trace_db_call( context, "query", "get_current_insight_for_library", |_span| { use schema::photo_insights::dsl::*; let mut connection = self.connection.lock().expect("Unable to get InsightDao"); photo_insights .filter(library_id.eq(lib_id)) .filter(rel_path.eq(path)) .filter(is_current.eq(true)) .first::(connection.deref_mut()) .optional() .map_err(|e| anyhow::anyhow!("Query error: {}", e)) }, ) .map_err(|e| DbError::log(DbErrorKind::QueryError, e)) } fn get_insight_for_paths( &mut self, context: &opentelemetry::Context, paths: &[String], ) -> Result, DbError> { if paths.is_empty() { return Ok(None); } trace_db_call(context, "query", "get_insight_for_paths", |_span| { use schema::photo_insights::dsl::*; let mut connection = self.connection.lock().expect("Unable to get InsightDao"); photo_insights .filter(rel_path.eq_any(paths)) .filter(is_current.eq(true)) .order(generated_at.desc()) .first::(connection.deref_mut()) .optional() .map_err(|e| anyhow::anyhow!("Query error: {}", e)) }) .map_err(|e| DbError::log(DbErrorKind::QueryError, e)) } fn get_insight_history( &mut self, context: &opentelemetry::Context, path: &str, ) -> Result, DbError> { trace_db_call(context, "query", "get_insight_history", |_span| { use schema::photo_insights::dsl::*; let mut connection = self.connection.lock().expect("Unable to get InsightDao"); photo_insights .filter(rel_path.eq(path)) .order(generated_at.desc()) .load::(connection.deref_mut()) .map_err(|e| anyhow::anyhow!("Query error: {}", e)) }) .map_err(|e| DbError::log(DbErrorKind::QueryError, e)) } fn get_insight_by_id( &mut self, context: &opentelemetry::Context, insight_id: i32, ) -> Result, DbError> { trace_db_call(context, "query", "get_insight_by_id", |_span| { use schema::photo_insights::dsl::*; let mut connection = self.connection.lock().expect("Unable to get InsightDao"); photo_insights .find(insight_id) .first::(connection.deref_mut()) .optional() .map_err(|e| anyhow::anyhow!("Query error: {}", e)) }) .map_err(|e| DbError::log(DbErrorKind::QueryError, e)) } fn delete_insight( &mut self, context: &opentelemetry::Context, path: &str, ) -> Result<(), DbError> { trace_db_call(context, "delete", "delete_insight", |_span| { use schema::photo_insights::dsl::*; let mut connection = self.connection.lock().expect("Unable to get InsightDao"); diesel::delete(photo_insights.filter(rel_path.eq(path))) .execute(connection.deref_mut()) .map(|_| ()) .map_err(|e| anyhow::anyhow!("Delete error: {}", e)) }) .map_err(|e| DbError::log(DbErrorKind::QueryError, e)) } fn get_all_insights( &mut self, context: &opentelemetry::Context, ) -> Result, DbError> { trace_db_call(context, "query", "get_all_insights", |_span| { use schema::photo_insights::dsl::*; let mut connection = self.connection.lock().expect("Unable to get InsightDao"); photo_insights .filter(is_current.eq(true)) .order(generated_at.desc()) .load::(connection.deref_mut()) .map_err(|e| anyhow::anyhow!("Query error: {}", e)) }) .map_err(|e| DbError::log(DbErrorKind::QueryError, e)) } fn rate_insight( &mut self, context: &opentelemetry::Context, path: &str, is_approved: bool, ) -> Result<(), DbError> { trace_db_call(context, "update", "rate_insight", |_span| { use schema::photo_insights::dsl::*; let mut connection = self.connection.lock().expect("Unable to get InsightDao"); diesel::update( photo_insights .filter(rel_path.eq(path)) .filter(is_current.eq(true)), ) .set(approved.eq(Some(is_approved))) .execute(connection.deref_mut()) .map(|_| ()) .map_err(|e| anyhow::anyhow!("Update error: {}", e)) }) .map_err(|e| DbError::log(DbErrorKind::UpdateError, e)) } fn rate_insight_by_id( &mut self, context: &opentelemetry::Context, target_id: i32, is_approved: bool, ) -> Result<(), DbError> { trace_db_call(context, "update", "rate_insight_by_id", |_span| { use schema::photo_insights::dsl::*; let mut connection = self.connection.lock().expect("Unable to get InsightDao"); diesel::update(photo_insights.find(target_id)) .set(approved.eq(Some(is_approved))) .execute(connection.deref_mut()) .map(|_| ()) .map_err(|e| anyhow::anyhow!("Update error: {}", e)) }) .map_err(|e| DbError::log(DbErrorKind::UpdateError, e)) } fn get_approved_insights( &mut self, context: &opentelemetry::Context, ) -> Result, DbError> { trace_db_call(context, "query", "get_approved_insights", |_span| { use schema::photo_insights::dsl::*; let mut connection = self.connection.lock().expect("Unable to get InsightDao"); photo_insights .filter(approved.eq(true)) .filter(training_messages.is_not_null()) .order(generated_at.desc()) .load::(connection.deref_mut()) .map_err(|e| anyhow::anyhow!("Query error: {}", e)) }) .map_err(|e| DbError::log(DbErrorKind::QueryError, e)) } fn update_training_messages( &mut self, context: &opentelemetry::Context, lib_id: i32, path: &str, training_messages_json: &str, ) -> Result { trace_db_call(context, "update", "update_training_messages", |_span| { use schema::photo_insights::dsl::*; let mut connection = self.connection.lock().expect("Unable to get InsightDao"); diesel::update( photo_insights .filter(library_id.eq(lib_id)) .filter(rel_path.eq(path)) .filter(is_current.eq(true)), ) .set(training_messages.eq(Some(training_messages_json.to_string()))) .execute(connection.deref_mut()) .map_err(|e| anyhow::anyhow!("Update error: {}", e)) }) .map_err(|e| DbError::log(DbErrorKind::UpdateError, e)) } } #[cfg(test)] mod tests { use super::*; use crate::database::test::in_memory_db_connection; fn dao() -> SqliteInsightDao { let conn = Arc::new(Mutex::new(in_memory_db_connection())); SqliteInsightDao::from_connection(conn) } /// Build an insight insert with sensible defaults; tests override the /// fields they care about (path, generated_at, model). fn insert(path: &str, generated_at: i64, model: &str) -> InsertPhotoInsight { InsertPhotoInsight { library_id: 1, file_path: path.to_string(), title: format!("title for {model}"), summary: "summary".to_string(), generated_at, model_version: model.to_string(), is_current: true, training_messages: None, backend: "local".to_string(), fewshot_source_ids: None, content_hash: None, num_ctx: None, temperature: None, top_p: None, top_k: None, min_p: None, system_prompt: None, persona_id: None, prompt_eval_count: None, eval_count: None, } } #[test] fn get_insight_history_returns_all_versions_newest_first() { let cx = opentelemetry::Context::new(); let mut dao = dao(); // store_insight flips prior rows to is_current=false, so three // generations for the same path leave a 3-row history. dao.store_insight(&cx, insert("a.jpg", 100, "m1")).unwrap(); dao.store_insight(&cx, insert("a.jpg", 200, "m2")).unwrap(); dao.store_insight(&cx, insert("a.jpg", 300, "m3")).unwrap(); // A different path must not leak into the history. dao.store_insight(&cx, insert("b.jpg", 250, "other")) .unwrap(); let history = dao.get_insight_history(&cx, "a.jpg").unwrap(); assert_eq!(history.len(), 3); assert_eq!( history.iter().map(|i| i.generated_at).collect::>(), vec![300, 200, 100], "history should be newest-first" ); // Exactly one version is current (the latest generation). let current: Vec<_> = history.iter().filter(|i| i.is_current).collect(); assert_eq!(current.len(), 1); assert_eq!(current[0].generated_at, 300); } #[test] fn rate_insight_by_id_rates_only_the_targeted_version() { let cx = opentelemetry::Context::new(); let mut dao = dao(); dao.store_insight(&cx, insert("a.jpg", 100, "m1")).unwrap(); dao.store_insight(&cx, insert("a.jpg", 200, "m2")).unwrap(); // History is newest-first: [200 (current), 100 (superseded)]. let history = dao.get_insight_history(&cx, "a.jpg").unwrap(); let old_version = history.iter().find(|i| i.generated_at == 100).unwrap(); assert!(!old_version.is_current); dao.rate_insight_by_id(&cx, old_version.id, true).unwrap(); let history = dao.get_insight_history(&cx, "a.jpg").unwrap(); let old = history.iter().find(|i| i.generated_at == 100).unwrap(); let current = history.iter().find(|i| i.generated_at == 200).unwrap(); assert_eq!(old.approved, Some(true), "targeted version is rated"); assert_eq!(current.approved, None, "current version is untouched"); } }