use diesel::prelude::*; use diesel::sqlite::SqliteConnection; use std::ops::DerefMut; use std::sync::{Arc, Mutex}; use crate::database::models::{InsertPhotoInsight, PhotoInsight}; use crate::database::schema; use crate::database::{DbError, DbErrorKind, connect}; use crate::otel::trace_db_call; pub trait InsightDao: Sync + Send { fn store_insight( &mut self, context: &opentelemetry::Context, insight: InsertPhotoInsight, ) -> Result; fn get_insight( &mut self, context: &opentelemetry::Context, file_path: &str, ) -> Result, DbError>; fn delete_insight( &mut self, context: &opentelemetry::Context, file_path: &str, ) -> Result<(), DbError>; fn get_all_insights( &mut self, context: &opentelemetry::Context, ) -> Result, DbError>; } pub struct SqliteInsightDao { connection: Arc>, } impl Default for SqliteInsightDao { fn default() -> Self { Self::new() } } impl SqliteInsightDao { pub fn new() -> Self { SqliteInsightDao { connection: Arc::new(Mutex::new(connect())), } } } impl InsightDao for SqliteInsightDao { fn store_insight( &mut self, context: &opentelemetry::Context, insight: InsertPhotoInsight, ) -> Result { trace_db_call(context, "insert", "store_insight", |_span| { use schema::photo_insights::dsl::*; let mut connection = self.connection.lock().expect("Unable to get InsightDao"); // Insert or replace on conflict (UNIQUE constraint on file_path) diesel::replace_into(photo_insights) .values(&insight) .execute(connection.deref_mut()) .map_err(|_| anyhow::anyhow!("Insert error"))?; // Retrieve the inserted record photo_insights .filter(file_path.eq(&insight.file_path)) .first::(connection.deref_mut()) .map_err(|_| anyhow::anyhow!("Query error")) }) .map_err(|_| DbError::new(DbErrorKind::InsertError)) } fn get_insight( &mut self, context: &opentelemetry::Context, path: &str, ) -> Result, DbError> { trace_db_call(context, "query", "get_insight", |_span| { use schema::photo_insights::dsl::*; let mut connection = self.connection.lock().expect("Unable to get InsightDao"); photo_insights .filter(file_path.eq(path)) .first::(connection.deref_mut()) .optional() .map_err(|_| anyhow::anyhow!("Query error")) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn delete_insight( &mut self, context: &opentelemetry::Context, path: &str, ) -> Result<(), DbError> { trace_db_call(context, "delete", "delete_insight", |_span| { use schema::photo_insights::dsl::*; let mut connection = self.connection.lock().expect("Unable to get InsightDao"); diesel::delete(photo_insights.filter(file_path.eq(path))) .execute(connection.deref_mut()) .map(|_| ()) .map_err(|_| anyhow::anyhow!("Delete error")) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn get_all_insights( &mut self, context: &opentelemetry::Context, ) -> Result, DbError> { trace_db_call(context, "query", "get_all_insights", |_span| { use schema::photo_insights::dsl::*; let mut connection = self.connection.lock().expect("Unable to get InsightDao"); photo_insights .order(generated_at.desc()) .load::(connection.deref_mut()) .map_err(|_| anyhow::anyhow!("Query error")) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } }