use diesel::prelude::*; use diesel::sqlite::SqliteConnection; use std::ops::DerefMut; use std::sync::{Arc, Mutex}; use crate::database::models::{InsertVideoPreviewClip, VideoPreviewClip}; use crate::database::{DbError, DbErrorKind, connect}; use crate::otel::trace_db_call; pub trait PreviewDao: Sync + Send { fn insert_preview( &mut self, context: &opentelemetry::Context, file_path_val: &str, status_val: &str, ) -> Result<(), DbError>; fn update_status( &mut self, context: &opentelemetry::Context, file_path_val: &str, status_val: &str, duration: Option, size: Option, error: Option<&str>, ) -> Result<(), DbError>; fn get_preview( &mut self, context: &opentelemetry::Context, file_path_val: &str, ) -> Result, DbError>; fn get_previews_batch( &mut self, context: &opentelemetry::Context, file_paths: &[String], ) -> Result, DbError>; fn get_by_status( &mut self, context: &opentelemetry::Context, status_val: &str, ) -> Result, DbError>; } pub struct SqlitePreviewDao { connection: Arc>, } impl Default for SqlitePreviewDao { fn default() -> Self { Self::new() } } impl SqlitePreviewDao { pub fn new() -> Self { SqlitePreviewDao { connection: Arc::new(Mutex::new(connect())), } } #[cfg(test)] pub fn from_connection(conn: SqliteConnection) -> Self { SqlitePreviewDao { connection: Arc::new(Mutex::new(conn)), } } } impl PreviewDao for SqlitePreviewDao { fn insert_preview( &mut self, context: &opentelemetry::Context, file_path_val: &str, status_val: &str, ) -> Result<(), DbError> { trace_db_call(context, "insert", "insert_preview", |_span| { use crate::database::schema::video_preview_clips::dsl::*; let mut connection = self.connection.lock().expect("Unable to get PreviewDao"); let now = chrono::Utc::now().to_rfc3339(); diesel::insert_or_ignore_into(video_preview_clips) .values(InsertVideoPreviewClip { library_id: 1, file_path: file_path_val.to_string(), status: status_val.to_string(), created_at: now.clone(), updated_at: now, }) .execute(connection.deref_mut()) .map(|_| ()) .map_err(|e| anyhow::anyhow!("Insert error: {}", e)) }) .map_err(|_| DbError::new(DbErrorKind::InsertError)) } fn update_status( &mut self, context: &opentelemetry::Context, file_path_val: &str, status_val: &str, duration: Option, size: Option, error: Option<&str>, ) -> Result<(), DbError> { trace_db_call(context, "update", "update_preview_status", |_span| { use crate::database::schema::video_preview_clips::dsl::*; let mut connection = self.connection.lock().expect("Unable to get PreviewDao"); let now = chrono::Utc::now().to_rfc3339(); diesel::update(video_preview_clips.filter(rel_path.eq(file_path_val))) .set(( status.eq(status_val), duration_seconds.eq(duration), file_size_bytes.eq(size), error_message.eq(error), updated_at.eq(&now), )) .execute(connection.deref_mut()) .map(|_| ()) .map_err(|e| anyhow::anyhow!("Update error: {}", e)) }) .map_err(|_| DbError::new(DbErrorKind::UpdateError)) } fn get_preview( &mut self, context: &opentelemetry::Context, file_path_val: &str, ) -> Result, DbError> { trace_db_call(context, "query", "get_preview", |_span| { use crate::database::schema::video_preview_clips::dsl::*; let mut connection = self.connection.lock().expect("Unable to get PreviewDao"); match video_preview_clips .filter(rel_path.eq(file_path_val)) .first::(connection.deref_mut()) { Ok(clip) => Ok(Some(clip)), Err(diesel::result::Error::NotFound) => Ok(None), Err(e) => Err(anyhow::anyhow!("Query error: {}", e)), } }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn get_previews_batch( &mut self, context: &opentelemetry::Context, file_paths: &[String], ) -> Result, DbError> { trace_db_call(context, "query", "get_previews_batch", |_span| { use crate::database::schema::video_preview_clips::dsl::*; if file_paths.is_empty() { return Ok(Vec::new()); } let mut connection = self.connection.lock().expect("Unable to get PreviewDao"); video_preview_clips .filter(rel_path.eq_any(file_paths)) .load::(connection.deref_mut()) .map_err(|e| anyhow::anyhow!("Query error: {}", e)) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn get_by_status( &mut self, context: &opentelemetry::Context, status_val: &str, ) -> Result, DbError> { trace_db_call(context, "query", "get_previews_by_status", |_span| { use crate::database::schema::video_preview_clips::dsl::*; let mut connection = self.connection.lock().expect("Unable to get PreviewDao"); video_preview_clips .filter(status.eq(status_val)) .load::(connection.deref_mut()) .map_err(|e| anyhow::anyhow!("Query error: {}", e)) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } } #[cfg(test)] mod tests { use super::*; use crate::database::test::in_memory_db_connection; fn setup_dao() -> SqlitePreviewDao { SqlitePreviewDao::from_connection(in_memory_db_connection()) } fn ctx() -> opentelemetry::Context { opentelemetry::Context::new() } #[test] fn test_insert_and_get_preview() { let mut dao = setup_dao(); let ctx = ctx(); dao.insert_preview(&ctx, "photos/video.mp4", "pending") .unwrap(); let result = dao.get_preview(&ctx, "photos/video.mp4").unwrap(); assert!(result.is_some()); let clip = result.unwrap(); assert_eq!(clip.file_path, "photos/video.mp4"); assert_eq!(clip.status, "pending"); assert!(clip.duration_seconds.is_none()); assert!(clip.file_size_bytes.is_none()); assert!(clip.error_message.is_none()); } #[test] fn test_insert_duplicate_ignored() { let mut dao = setup_dao(); let ctx = ctx(); dao.insert_preview(&ctx, "photos/video.mp4", "pending") .unwrap(); // Second insert with same path should not error (INSERT OR IGNORE) dao.insert_preview(&ctx, "photos/video.mp4", "processing") .unwrap(); // Status should remain "pending" from the first insert let clip = dao.get_preview(&ctx, "photos/video.mp4").unwrap().unwrap(); assert_eq!(clip.status, "pending"); } #[test] fn test_update_status_to_complete() { let mut dao = setup_dao(); let ctx = ctx(); dao.insert_preview(&ctx, "photos/video.mp4", "pending") .unwrap(); dao.update_status( &ctx, "photos/video.mp4", "complete", Some(9.5), Some(1024000), None, ) .unwrap(); let clip = dao.get_preview(&ctx, "photos/video.mp4").unwrap().unwrap(); assert_eq!(clip.status, "complete"); assert_eq!(clip.duration_seconds, Some(9.5)); assert_eq!(clip.file_size_bytes, Some(1024000)); assert!(clip.error_message.is_none()); } #[test] fn test_update_status_to_failed() { let mut dao = setup_dao(); let ctx = ctx(); dao.insert_preview(&ctx, "photos/video.mp4", "pending") .unwrap(); dao.update_status( &ctx, "photos/video.mp4", "failed", None, None, Some("ffmpeg exited with code 1"), ) .unwrap(); let clip = dao.get_preview(&ctx, "photos/video.mp4").unwrap().unwrap(); assert_eq!(clip.status, "failed"); assert_eq!( clip.error_message.as_deref(), Some("ffmpeg exited with code 1") ); } #[test] fn test_get_preview_not_found() { let mut dao = setup_dao(); let ctx = ctx(); let result = dao.get_preview(&ctx, "nonexistent/path.mp4").unwrap(); assert!(result.is_none()); } #[test] fn test_get_previews_batch() { let mut dao = setup_dao(); let ctx = ctx(); dao.insert_preview(&ctx, "a/one.mp4", "complete").unwrap(); dao.insert_preview(&ctx, "b/two.mp4", "pending").unwrap(); dao.insert_preview(&ctx, "c/three.mp4", "failed").unwrap(); // Query only two of the three let paths = vec!["a/one.mp4".to_string(), "c/three.mp4".to_string()]; let results = dao.get_previews_batch(&ctx, &paths).unwrap(); assert_eq!(results.len(), 2); let statuses: Vec<&str> = results.iter().map(|c| c.status.as_str()).collect(); assert!(statuses.contains(&"complete")); assert!(statuses.contains(&"failed")); } #[test] fn test_get_previews_batch_empty_input() { let mut dao = setup_dao(); let ctx = ctx(); let results = dao.get_previews_batch(&ctx, &[]).unwrap(); assert!(results.is_empty()); } #[test] fn test_get_by_status() { let mut dao = setup_dao(); let ctx = ctx(); dao.insert_preview(&ctx, "a.mp4", "pending").unwrap(); dao.insert_preview(&ctx, "b.mp4", "complete").unwrap(); dao.insert_preview(&ctx, "c.mp4", "pending").unwrap(); dao.insert_preview(&ctx, "d.mp4", "failed").unwrap(); let pending = dao.get_by_status(&ctx, "pending").unwrap(); assert_eq!(pending.len(), 2); let complete = dao.get_by_status(&ctx, "complete").unwrap(); assert_eq!(complete.len(), 1); assert_eq!(complete[0].file_path, "b.mp4"); let processing = dao.get_by_status(&ctx, "processing").unwrap(); assert!(processing.is_empty()); } }