use diesel::prelude::*; use diesel::sqlite::SqliteConnection; use std::ops::DerefMut; use std::sync::{Arc, Mutex}; use crate::database::models::{InsertPhotoInsight, PhotoInsight}; use crate::database::schema; use crate::database::{DbError, DbErrorKind, connect}; use crate::otel::trace_db_call; pub trait InsightDao: Sync + Send { fn store_insight( &mut self, context: &opentelemetry::Context, insight: InsertPhotoInsight, ) -> Result; fn get_insight( &mut self, context: &opentelemetry::Context, file_path: &str, ) -> Result, DbError>; /// Return the most recent current insight whose rel_path is one of /// `paths`. Used for content-hash sharing: the caller expands a /// single file into all rel_paths with the same content_hash, then /// asks here for any existing insight attached to any of them. fn get_insight_for_paths( &mut self, context: &opentelemetry::Context, paths: &[String], ) -> Result, DbError>; #[allow(dead_code)] fn get_insight_history( &mut self, context: &opentelemetry::Context, file_path: &str, ) -> Result, DbError>; fn delete_insight( &mut self, context: &opentelemetry::Context, file_path: &str, ) -> Result<(), DbError>; fn get_all_insights( &mut self, context: &opentelemetry::Context, ) -> Result, DbError>; fn rate_insight( &mut self, context: &opentelemetry::Context, file_path: &str, approved: bool, ) -> Result<(), DbError>; fn get_approved_insights( &mut self, context: &opentelemetry::Context, ) -> Result, DbError>; /// Replace the `training_messages` JSON blob on the current row for /// `(library_id, rel_path)`. Used by chat-turn append mode to persist /// the extended conversation without inserting a new insight version. fn update_training_messages( &mut self, context: &opentelemetry::Context, library_id: i32, file_path: &str, training_messages_json: &str, ) -> Result<(), DbError>; } pub struct SqliteInsightDao { connection: Arc>, } impl Default for SqliteInsightDao { fn default() -> Self { Self::new() } } impl SqliteInsightDao { pub fn new() -> Self { SqliteInsightDao { connection: Arc::new(Mutex::new(connect())), } } #[cfg(test)] #[allow(dead_code)] pub fn from_connection(conn: Arc>) -> Self { SqliteInsightDao { connection: conn } } } impl InsightDao for SqliteInsightDao { fn store_insight( &mut self, context: &opentelemetry::Context, insight: InsertPhotoInsight, ) -> Result { trace_db_call(context, "insert", "store_insight", |_span| { use schema::photo_insights::dsl::*; let mut connection = self.connection.lock().expect("Unable to get InsightDao"); // Mark all existing insights for this file as no longer current diesel::update( photo_insights .filter(library_id.eq(insight.library_id)) .filter(rel_path.eq(&insight.file_path)), ) .set(is_current.eq(false)) .execute(connection.deref_mut()) .map_err(|_| anyhow::anyhow!("Update is_current error"))?; // Insert the new insight as current diesel::insert_into(photo_insights) .values(&insight) .execute(connection.deref_mut()) .map_err(|_| anyhow::anyhow!("Insert error"))?; // Retrieve the inserted record (is_current = true) photo_insights .filter(library_id.eq(insight.library_id)) .filter(rel_path.eq(&insight.file_path)) .filter(is_current.eq(true)) .first::(connection.deref_mut()) .map_err(|_| anyhow::anyhow!("Query error")) }) .map_err(|_| DbError::new(DbErrorKind::InsertError)) } fn get_insight( &mut self, context: &opentelemetry::Context, path: &str, ) -> Result, DbError> { trace_db_call(context, "query", "get_insight", |_span| { use schema::photo_insights::dsl::*; let mut connection = self.connection.lock().expect("Unable to get InsightDao"); photo_insights .filter(rel_path.eq(path)) .filter(is_current.eq(true)) .first::(connection.deref_mut()) .optional() .map_err(|_| anyhow::anyhow!("Query error")) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn get_insight_for_paths( &mut self, context: &opentelemetry::Context, paths: &[String], ) -> Result, DbError> { if paths.is_empty() { return Ok(None); } trace_db_call(context, "query", "get_insight_for_paths", |_span| { use schema::photo_insights::dsl::*; let mut connection = self.connection.lock().expect("Unable to get InsightDao"); photo_insights .filter(rel_path.eq_any(paths)) .filter(is_current.eq(true)) .order(generated_at.desc()) .first::(connection.deref_mut()) .optional() .map_err(|_| anyhow::anyhow!("Query error")) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn get_insight_history( &mut self, context: &opentelemetry::Context, path: &str, ) -> Result, DbError> { trace_db_call(context, "query", "get_insight_history", |_span| { use schema::photo_insights::dsl::*; let mut connection = self.connection.lock().expect("Unable to get InsightDao"); photo_insights .filter(rel_path.eq(path)) .order(generated_at.desc()) .load::(connection.deref_mut()) .map_err(|_| anyhow::anyhow!("Query error")) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn delete_insight( &mut self, context: &opentelemetry::Context, path: &str, ) -> Result<(), DbError> { trace_db_call(context, "delete", "delete_insight", |_span| { use schema::photo_insights::dsl::*; let mut connection = self.connection.lock().expect("Unable to get InsightDao"); diesel::delete(photo_insights.filter(rel_path.eq(path))) .execute(connection.deref_mut()) .map(|_| ()) .map_err(|_| anyhow::anyhow!("Delete error")) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn get_all_insights( &mut self, context: &opentelemetry::Context, ) -> Result, DbError> { trace_db_call(context, "query", "get_all_insights", |_span| { use schema::photo_insights::dsl::*; let mut connection = self.connection.lock().expect("Unable to get InsightDao"); photo_insights .filter(is_current.eq(true)) .order(generated_at.desc()) .load::(connection.deref_mut()) .map_err(|_| anyhow::anyhow!("Query error")) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn rate_insight( &mut self, context: &opentelemetry::Context, path: &str, is_approved: bool, ) -> Result<(), DbError> { trace_db_call(context, "update", "rate_insight", |_span| { use schema::photo_insights::dsl::*; let mut connection = self.connection.lock().expect("Unable to get InsightDao"); diesel::update( photo_insights .filter(rel_path.eq(path)) .filter(is_current.eq(true)), ) .set(approved.eq(Some(is_approved))) .execute(connection.deref_mut()) .map(|_| ()) .map_err(|_| anyhow::anyhow!("Update error")) }) .map_err(|_| DbError::new(DbErrorKind::UpdateError)) } fn get_approved_insights( &mut self, context: &opentelemetry::Context, ) -> Result, DbError> { trace_db_call(context, "query", "get_approved_insights", |_span| { use schema::photo_insights::dsl::*; let mut connection = self.connection.lock().expect("Unable to get InsightDao"); photo_insights .filter(approved.eq(true)) .filter(training_messages.is_not_null()) .order(generated_at.desc()) .load::(connection.deref_mut()) .map_err(|_| anyhow::anyhow!("Query error")) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn update_training_messages( &mut self, context: &opentelemetry::Context, lib_id: i32, path: &str, training_messages_json: &str, ) -> Result<(), DbError> { trace_db_call(context, "update", "update_training_messages", |_span| { use schema::photo_insights::dsl::*; let mut connection = self.connection.lock().expect("Unable to get InsightDao"); diesel::update( photo_insights .filter(library_id.eq(lib_id)) .filter(rel_path.eq(path)) .filter(is_current.eq(true)), ) .set(training_messages.eq(Some(training_messages_json.to_string()))) .execute(connection.deref_mut()) .map(|_| ()) .map_err(|_| anyhow::anyhow!("Update error")) }) .map_err(|_| DbError::new(DbErrorKind::UpdateError)) } }