use diesel::prelude::*; use diesel::sqlite::SqliteConnection; use std::ops::DerefMut; use std::sync::{Arc, Mutex}; use crate::database::models::{ InsertInsightGenerationJob, InsightGenerationJob, InsightGenerationType, InsightJobStatus, }; use crate::database::schema; use crate::database::{DbError, DbErrorKind, connect}; use crate::otel::trace_db_call; /// Tracks async insight generation jobs. Each call to `create_job` inserts /// a new row; the application layer prevents concurrent running jobs by /// cancelling the old one before creating a new one. pub trait InsightGenerationJobDao: Sync + Send { /// Insert a new running job. Always creates a new row (no upsert). /// Cleans up terminal-state rows for the same key first. fn create_job( &mut self, context: &opentelemetry::Context, library_id: i32, file_path: &str, generation_type: InsightGenerationType, ) -> Result; /// Mark a job as completed with the resulting insight id. Only updates /// if the job is still in "running" status (prevents overwriting a /// cancelled job with a late-completing task). fn complete_job( &mut self, context: &opentelemetry::Context, job_id: i32, insight_id: i32, ) -> Result<(), DbError>; /// Mark a job as failed with an error message. Only updates if the job /// is still in "running" status. fn fail_job( &mut self, context: &opentelemetry::Context, job_id: i32, error_message: &str, ) -> Result<(), DbError>; /// Cancel a specific job by id. Only updates if the job is still /// in "running" status. Returns true if a row was updated. fn cancel_job( &mut self, context: &opentelemetry::Context, job_id: i32, ) -> Result; /// Cancel all running jobs for a given file. Returns the number of /// jobs cancelled. fn cancel_active_jobs( &mut self, context: &opentelemetry::Context, library_id: i32, file_path: &str, ) -> Result; /// Find the latest running job for a given file. Returns None if no /// running job exists. fn get_active_job( &mut self, context: &opentelemetry::Context, library_id: i32, file_path: &str, ) -> Result, DbError>; /// Find any job by id regardless of status. fn get_job_by_id( &mut self, context: &opentelemetry::Context, job_id: i32, ) -> Result, DbError>; /// Mark all jobs still in "running" status as "failed" with a recovery /// error message. Returns the number of jobs recovered. fn recover_orphaned_jobs(&mut self, context: &opentelemetry::Context) -> Result; } pub struct SqliteInsightGenerationJobDao { connection: Arc>, } impl Default for SqliteInsightGenerationJobDao { fn default() -> Self { Self::new() } } impl SqliteInsightGenerationJobDao { pub fn new() -> Self { Self { connection: Arc::new(Mutex::new(connect())), } } #[cfg(test)] pub fn from_connection(conn: Arc>) -> Self { Self { connection: conn } } } impl InsightGenerationJobDao for SqliteInsightGenerationJobDao { fn create_job( &mut self, context: &opentelemetry::Context, library_id: i32, file_path: &str, generation_type: InsightGenerationType, ) -> Result { trace_db_call(context, "insert", "create_job", |_span| { use schema::insight_generation_jobs::dsl; let mut connection = self .connection .lock() .expect("Unable to lock InsightGenerationJobDao"); let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .expect("Time went backwards") .as_secs() as i64; let new_job = InsertInsightGenerationJob { library_id, path: file_path.to_string(), gen_type: generation_type.to_string(), status: InsightJobStatus::Running.to_string(), started_at: now, }; diesel::insert_into(dsl::insight_generation_jobs) .values(&new_job) .execute(connection.deref_mut()) .map_err(|e| anyhow::anyhow!("Failed to insert job: {}", e))?; dsl::insight_generation_jobs .filter( dsl::library_id .eq(library_id) .and(dsl::file_path.eq(file_path)) .and(dsl::generation_type.eq(generation_type.as_str())) .and(dsl::status.eq(InsightJobStatus::Running.as_str())), ) .select(dsl::id) .order(dsl::id.desc()) .first::(connection.deref_mut()) .map_err(|e| anyhow::anyhow!("Failed to get job id: {}", e)) }) .map_err(|e| DbError::log(DbErrorKind::QueryError, e)) } fn complete_job( &mut self, context: &opentelemetry::Context, job_id: i32, insight_id: i32, ) -> Result<(), DbError> { trace_db_call(context, "update", "complete_job", |_span| { use schema::insight_generation_jobs::dsl; let mut connection = self .connection .lock() .expect("Unable to lock InsightGenerationJobDao"); let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .expect("Time went backwards") .as_secs() as i64; // Only update if still running — prevents cancelled job from // being overwritten by a late-completing task. diesel::update( dsl::insight_generation_jobs.filter( dsl::id .eq(job_id) .and(dsl::status.eq(InsightJobStatus::Running.as_str())), ), ) .set(( dsl::status.eq(InsightJobStatus::Completed.as_str()), dsl::completed_at.eq(Some(now)), dsl::result_insight_id.eq(Some(insight_id)), )) .execute(connection.deref_mut()) .map(|_| ()) .map_err(|e| anyhow::anyhow!("Failed to complete job: {}", e)) }) .map_err(|e| DbError::log(DbErrorKind::UpdateError, e)) } fn fail_job( &mut self, context: &opentelemetry::Context, job_id: i32, error_message: &str, ) -> Result<(), DbError> { trace_db_call(context, "update", "fail_job", |_span| { use schema::insight_generation_jobs::dsl; let mut connection = self .connection .lock() .expect("Unable to lock InsightGenerationJobDao"); let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .expect("Time went backwards") .as_secs() as i64; // Only update if still running. diesel::update( dsl::insight_generation_jobs.filter( dsl::id .eq(job_id) .and(dsl::status.eq(InsightJobStatus::Running.as_str())), ), ) .set(( dsl::status.eq(InsightJobStatus::Failed.as_str()), dsl::completed_at.eq(Some(now)), dsl::error_message.eq(Some(error_message.to_string())), )) .execute(connection.deref_mut()) .map(|_| ()) .map_err(|e| anyhow::anyhow!("Failed to fail job: {}", e)) }) .map_err(|e| DbError::log(DbErrorKind::UpdateError, e)) } fn cancel_job( &mut self, context: &opentelemetry::Context, job_id: i32, ) -> Result { trace_db_call(context, "update", "cancel_job", |_span| { use schema::insight_generation_jobs::dsl; let mut connection = self .connection .lock() .expect("Unable to lock InsightGenerationJobDao"); let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .expect("Time went backwards") .as_secs() as i64; let rows = diesel::update( dsl::insight_generation_jobs.filter( dsl::id .eq(job_id) .and(dsl::status.eq(InsightJobStatus::Running.as_str())), ), ) .set(( dsl::status.eq(InsightJobStatus::Cancelled.as_str()), dsl::completed_at.eq(Some(now)), dsl::error_message.eq(Some("cancelled by user".to_string())), )) .execute(connection.deref_mut()) .map_err(|e| anyhow::anyhow!("Failed to cancel job: {}", e))?; Ok(rows > 0) }) .map_err(|e| DbError::log(DbErrorKind::UpdateError, e)) } fn cancel_active_jobs( &mut self, context: &opentelemetry::Context, library_id: i32, file_path: &str, ) -> Result { trace_db_call(context, "update", "cancel_active_jobs", |_span| { use schema::insight_generation_jobs::dsl; let mut connection = self .connection .lock() .expect("Unable to lock InsightGenerationJobDao"); let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .expect("Time went backwards") .as_secs() as i64; let rows = diesel::update( dsl::insight_generation_jobs.filter( dsl::library_id .eq(library_id) .and(dsl::file_path.eq(file_path)) .and(dsl::status.eq(InsightJobStatus::Running.as_str())), ), ) .set(( dsl::status.eq(InsightJobStatus::Cancelled.as_str()), dsl::completed_at.eq(Some(now)), dsl::error_message.eq(Some("cancelled by newer request".to_string())), )) .execute(connection.deref_mut()) .map_err(|e| anyhow::anyhow!("Failed to cancel active jobs: {}", e))?; Ok(rows) }) .map_err(|e| DbError::log(DbErrorKind::UpdateError, e)) } fn get_active_job( &mut self, context: &opentelemetry::Context, library_id: i32, file_path: &str, ) -> Result, DbError> { trace_db_call(context, "query", "get_active_job", |_span| { use schema::insight_generation_jobs::dsl; let mut connection = self .connection .lock() .expect("Unable to lock InsightGenerationJobDao"); dsl::insight_generation_jobs .filter( dsl::library_id .eq(library_id) .and(dsl::file_path.eq(file_path)) .and(dsl::status.eq(InsightJobStatus::Running.as_str())), ) .order(dsl::id.desc()) .first::(connection.deref_mut()) .optional() .map_err(|e| anyhow::anyhow!("Failed to get active job: {}", e)) }) .map_err(|e| DbError::log(DbErrorKind::QueryError, e)) } fn get_job_by_id( &mut self, context: &opentelemetry::Context, job_id: i32, ) -> Result, DbError> { trace_db_call(context, "query", "get_job_by_id", |_span| { use schema::insight_generation_jobs::dsl; let mut connection = self .connection .lock() .expect("Unable to lock InsightGenerationJobDao"); dsl::insight_generation_jobs .filter(dsl::id.eq(job_id)) .first::(connection.deref_mut()) .optional() .map_err(|e| anyhow::anyhow!("Failed to get job: {}", e)) }) .map_err(|e| DbError::log(DbErrorKind::QueryError, e)) } fn recover_orphaned_jobs( &mut self, context: &opentelemetry::Context, ) -> Result { trace_db_call(context, "update", "recover_orphaned_jobs", |_span| { use schema::insight_generation_jobs::dsl; let mut connection = self .connection .lock() .expect("Unable to lock InsightGenerationJobDao"); let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .expect("Time went backwards") .as_secs() as i64; let rows = diesel::update( dsl::insight_generation_jobs .filter(dsl::status.eq(InsightJobStatus::Running.as_str())), ) .set(( dsl::status.eq(InsightJobStatus::Failed.as_str()), dsl::completed_at.eq(Some(now)), dsl::error_message.eq(Some("server crashed while running".to_string())), )) .execute(connection.deref_mut()) .map_err(|e| anyhow::anyhow!("Failed to recover orphaned jobs: {}", e))?; Ok(rows) }) .map_err(|e| DbError::log(DbErrorKind::UpdateError, e)) } } #[cfg(test)] mod tests { use super::*; use diesel::Connection; use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations}; const DB_MIGRATIONS: EmbeddedMigrations = embed_migrations!(); fn setup_dao() -> SqliteInsightGenerationJobDao { let mut conn = SqliteConnection::establish(":memory:") .expect("Unable to create in-memory db connection"); conn.run_pending_migrations(DB_MIGRATIONS) .expect("Failure running DB migrations"); SqliteInsightGenerationJobDao::from_connection(Arc::new(Mutex::new(conn))) } fn ctx() -> opentelemetry::Context { opentelemetry::Context::new() } #[test] fn create_job_inserts_new_row() { let mut dao = setup_dao(); let ctx = ctx(); let job_id_1 = dao .create_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard) .unwrap(); let job_id_2 = dao .create_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard) .unwrap(); assert_ne!(job_id_1, job_id_2, "each create_job call inserts a new row"); } #[test] fn complete_job_sets_result() { let mut dao = setup_dao(); let ctx = ctx(); let job_id = dao .create_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard) .unwrap(); dao.complete_job(&ctx, job_id, 42).unwrap(); let job = dao.get_job_by_id(&ctx, job_id).unwrap().unwrap(); assert_eq!(job.status, InsightJobStatus::Completed.as_str()); assert_eq!(job.result_insight_id, Some(42)); assert!(job.completed_at.is_some()); } #[test] fn fail_job_sets_error() { let mut dao = setup_dao(); let ctx = ctx(); let job_id = dao .create_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Agentic) .unwrap(); dao.fail_job(&ctx, job_id, "model timeout").unwrap(); let job = dao.get_job_by_id(&ctx, job_id).unwrap().unwrap(); assert_eq!(job.status, InsightJobStatus::Failed.as_str()); assert_eq!(job.error_message.as_deref(), Some("model timeout")); assert!(job.completed_at.is_some()); } #[test] fn get_active_job_returns_none_when_completed() { let mut dao = setup_dao(); let ctx = ctx(); let job_id = dao .create_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard) .unwrap(); // Job is running let active = dao.get_active_job(&ctx, 1, "photos/test.jpg").unwrap(); assert!(active.is_some()); assert_eq!(active.unwrap().id, job_id); // Complete it dao.complete_job(&ctx, job_id, 1).unwrap(); // No longer active let active = dao.get_active_job(&ctx, 1, "photos/test.jpg").unwrap(); assert!(active.is_none()); } #[test] fn cancel_active_jobs() { let mut dao = setup_dao(); let ctx = ctx(); let job_id = dao .create_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard) .unwrap(); let cancelled = dao.cancel_active_jobs(&ctx, 1, "photos/test.jpg").unwrap(); assert_eq!(cancelled, 1, "should cancel 1 running job"); // Job is no longer active let active = dao.get_active_job(&ctx, 1, "photos/test.jpg").unwrap(); assert!(active.is_none()); // Job exists with cancelled status let job = dao.get_job_by_id(&ctx, job_id).unwrap().unwrap(); assert_eq!(job.status, InsightJobStatus::Cancelled.as_str()); // Cancelling again returns 0 (nothing to cancel) let cancelled2 = dao.cancel_active_jobs(&ctx, 1, "photos/test.jpg").unwrap(); assert_eq!(cancelled2, 0, "should return 0 when no running job"); } #[test] fn get_active_job_scoped_by_library() { let mut dao = setup_dao(); let ctx = ctx(); let job_id_1 = dao .create_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard) .unwrap(); let job_id_2 = dao .create_job(&ctx, 2, "photos/test.jpg", InsightGenerationType::Standard) .unwrap(); assert_ne!( job_id_1, job_id_2, "different libraries should have separate jobs" ); // Complete lib1's job dao.complete_job(&ctx, job_id_1, 1).unwrap(); // lib1 has no active job let active1 = dao.get_active_job(&ctx, 1, "photos/test.jpg").unwrap(); assert!(active1.is_none()); // lib2 still has active job let active2 = dao.get_active_job(&ctx, 2, "photos/test.jpg").unwrap(); assert!(active2.is_some()); assert_eq!(active2.unwrap().id, job_id_2); } #[test] fn get_job_by_id_finds_any_status() { let mut dao = setup_dao(); let ctx = ctx(); let job_id = dao .create_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard) .unwrap(); // Find while running let job = dao.get_job_by_id(&ctx, job_id).unwrap().unwrap(); assert_eq!(job.status, InsightJobStatus::Running.as_str()); // Complete it dao.complete_job(&ctx, job_id, 99).unwrap(); // Still findable let job = dao.get_job_by_id(&ctx, job_id).unwrap().unwrap(); assert_eq!(job.status, InsightJobStatus::Completed.as_str()); assert_eq!(job.result_insight_id, Some(99)); } #[test] fn recover_orphaned_jobs() { let mut dao = setup_dao(); let ctx = ctx(); // Create two running jobs let job_id_1 = dao .create_job(&ctx, 1, "photos/a.jpg", InsightGenerationType::Standard) .unwrap(); let job_id_2 = dao .create_job(&ctx, 1, "photos/b.jpg", InsightGenerationType::Agentic) .unwrap(); // Complete one dao.complete_job(&ctx, job_id_1, 1).unwrap(); // Recover should only affect the running job let recovered = dao.recover_orphaned_jobs(&ctx).unwrap(); assert_eq!(recovered, 1, "should recover exactly 1 running job"); // job_id_1 is still completed let job1 = dao.get_job_by_id(&ctx, job_id_1).unwrap().unwrap(); assert_eq!(job1.status, InsightJobStatus::Completed.as_str()); // job_id_2 is now failed with recovery message let job2 = dao.get_job_by_id(&ctx, job_id_2).unwrap().unwrap(); assert_eq!(job2.status, InsightJobStatus::Failed.as_str()); assert_eq!( job2.error_message.as_deref(), Some("server crashed while running") ); // Second recovery is a no-op let recovered2 = dao.recover_orphaned_jobs(&ctx).unwrap(); assert_eq!(recovered2, 0, "no running jobs remain"); } #[test] fn complete_job_noop_when_cancelled() { let mut dao = setup_dao(); let ctx = ctx(); let job_id = dao .create_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard) .unwrap(); dao.cancel_job(&ctx, job_id).unwrap(); // Late-completing task tries to mark as completed — should be a no-op dao.complete_job(&ctx, job_id, 42).unwrap(); let job = dao.get_job_by_id(&ctx, job_id).unwrap().unwrap(); assert_eq!( job.status, InsightJobStatus::Cancelled.as_str(), "cancelled status must not be overwritten by late complete" ); assert_eq!( job.result_insight_id, None, "insight_id must stay None when complete is a no-op" ); } #[test] fn fail_job_noop_when_cancelled() { let mut dao = setup_dao(); let ctx = ctx(); let job_id = dao .create_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Agentic) .unwrap(); dao.cancel_job(&ctx, job_id).unwrap(); // Late-failing task tries to mark as failed — should be a no-op dao.fail_job(&ctx, job_id, "timeout after 120s").unwrap(); let job = dao.get_job_by_id(&ctx, job_id).unwrap().unwrap(); assert_eq!( job.status, InsightJobStatus::Cancelled.as_str(), "cancelled status must not be overwritten by late fail" ); assert_eq!( job.error_message.as_deref(), Some("cancelled by user"), "error_message must reflect the cancel, not the late fail" ); } #[test] fn cancel_job_by_id() { let mut dao = setup_dao(); let ctx = ctx(); let job_id = dao .create_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard) .unwrap(); let cancelled = dao.cancel_job(&ctx, job_id).unwrap(); assert!(cancelled, "should cancel running job"); let job = dao.get_job_by_id(&ctx, job_id).unwrap().unwrap(); assert_eq!(job.status, InsightJobStatus::Cancelled.as_str()); assert!(job.completed_at.is_some()); // Cancelling again is a no-op let cancelled2 = dao.cancel_job(&ctx, job_id).unwrap(); assert!(!cancelled2, "already cancelled job should return false"); } }