fix: audit fixes for async insight jobs + persist generation params

- Fix query param mismatch: rename GenerationStatusQuery.file_path to
  path so the client's app-resume buildQuery({ path: ... }) resolves
  correctly instead of always getting 400
- Remove dead _lib_id bindings from both generate handlers
- Return 202 Accepted instead of 200 from generate endpoints
- Restore OpenTelemetry span instrumentation on generate handlers
- Remove stale UNIQUE constraint from initial migration (incompatible
  with plain-INSERT DAO)
- Add tests for status guard: complete_job/fail_job are no-ops when
  job is already cancelled, and cancel_job by id
- Persist generation params (num_ctx, temperature, top_p, top_k, min_p,
  system_prompt, persona_id) on the photo_insights table for auditing

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Cameron Cordes
2026-05-27 13:02:15 -04:00
parent b87eb4e690
commit 2818936739
14 changed files with 786 additions and 194 deletions
+270 -92
View File
@@ -10,13 +10,13 @@ use crate::database::schema;
use crate::database::{DbError, DbErrorKind, connect};
use crate::otel::trace_db_call;
/// Tracks async insight generation jobs. The idempotent insert ensures
/// concurrent callers for the same (library_id, file_path, generation_type)
/// get the same job_id rather than creating duplicates.
/// Tracks async insight generation jobs. Each call to `create_job` inserts
/// a new row; the application layer prevents concurrent running jobs by
/// cancelling the old one before creating a new one.
pub trait InsightGenerationJobDao: Sync + Send {
/// Insert a new job or return the existing running job for the same key.
/// Returns the job_id either way.
fn create_or_get_active_job(
/// Insert a new running job. Always creates a new row (no upsert).
/// Cleans up terminal-state rows for the same key first.
fn create_job(
&mut self,
context: &opentelemetry::Context,
library_id: i32,
@@ -24,7 +24,9 @@ pub trait InsightGenerationJobDao: Sync + Send {
generation_type: InsightGenerationType,
) -> Result<i32, DbError>;
/// Mark a job as completed with the resulting insight id.
/// Mark a job as completed with the resulting insight id. Only updates
/// if the job is still in "running" status (prevents overwriting a
/// cancelled job with a late-completing task).
fn complete_job(
&mut self,
context: &opentelemetry::Context,
@@ -32,7 +34,8 @@ pub trait InsightGenerationJobDao: Sync + Send {
insight_id: i32,
) -> Result<(), DbError>;
/// Mark a job as failed with an error message.
/// Mark a job as failed with an error message. Only updates if the job
/// is still in "running" status.
fn fail_job(
&mut self,
context: &opentelemetry::Context,
@@ -40,15 +43,22 @@ pub trait InsightGenerationJobDao: Sync + Send {
error_message: &str,
) -> Result<(), DbError>;
/// Mark the active running job for a file as "cancelled". Returns true if
/// a job was found and cancelled, false if no running job existed.
fn cancel_active_job(
/// Cancel a specific job by id. Only updates if the job is still
/// in "running" status. Returns true if a row was updated.
fn cancel_job(
&mut self,
context: &opentelemetry::Context,
job_id: i32,
) -> Result<bool, DbError>;
/// Cancel all running jobs for a given file. Returns the number of
/// jobs cancelled.
fn cancel_active_jobs(
&mut self,
context: &opentelemetry::Context,
library_id: i32,
file_path: &str,
generation_type: InsightGenerationType,
) -> Result<bool, DbError>;
) -> Result<usize, DbError>;
/// Find the latest running job for a given file. Returns None if no
/// running job exists.
@@ -65,6 +75,11 @@ pub trait InsightGenerationJobDao: Sync + Send {
context: &opentelemetry::Context,
job_id: i32,
) -> Result<Option<InsightGenerationJob>, DbError>;
/// Mark all jobs still in "running" status as "failed" with a recovery
/// error message. Returns the number of jobs recovered.
fn recover_orphaned_jobs(&mut self, context: &opentelemetry::Context)
-> Result<usize, DbError>;
}
pub struct SqliteInsightGenerationJobDao {
@@ -91,14 +106,14 @@ impl SqliteInsightGenerationJobDao {
}
impl InsightGenerationJobDao for SqliteInsightGenerationJobDao {
fn create_or_get_active_job(
fn create_job(
&mut self,
context: &opentelemetry::Context,
library_id: i32,
file_path: &str,
generation_type: InsightGenerationType,
) -> Result<i32, DbError> {
trace_db_call(context, "insert", "create_or_get_active_job", |_span| {
trace_db_call(context, "insert", "create_job", |_span| {
use schema::insight_generation_jobs::dsl;
let mut connection = self
@@ -106,26 +121,6 @@ impl InsightGenerationJobDao for SqliteInsightGenerationJobDao {
.lock()
.expect("Unable to lock InsightGenerationJobDao");
// Check for existing running job
let existing = dsl::insight_generation_jobs
.filter(
dsl::library_id
.eq(library_id)
.and(dsl::file_path.eq(file_path))
.and(dsl::generation_type.eq(generation_type.as_str()))
.and(dsl::status.eq(InsightJobStatus::Running.as_str())),
)
.select(dsl::id)
.first::<i32>(connection.deref_mut())
.optional();
match existing {
Ok(Some(job_id)) => return Ok(job_id),
Ok(None) => {}
Err(e) => return Err(anyhow::anyhow!("Failed to check existing job: {}", e)),
}
// No running job exists, insert new one (upsert on conflict)
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("Time went backwards")
@@ -141,22 +136,16 @@ impl InsightGenerationJobDao for SqliteInsightGenerationJobDao {
diesel::insert_into(dsl::insight_generation_jobs)
.values(&new_job)
.on_conflict((dsl::library_id, dsl::file_path, dsl::generation_type))
.do_update()
.set((
dsl::status.eq(InsightJobStatus::Running.as_str()),
dsl::started_at.eq(now),
))
.execute(connection.deref_mut())
.map_err(|e| anyhow::anyhow!("Failed to insert job: {}", e))?;
// Get the job id
dsl::insight_generation_jobs
.filter(
dsl::library_id
.eq(library_id)
.and(dsl::file_path.eq(file_path))
.and(dsl::generation_type.eq(generation_type.as_str())),
.and(dsl::generation_type.eq(generation_type.as_str()))
.and(dsl::status.eq(InsightJobStatus::Running.as_str())),
)
.select(dsl::id)
.order(dsl::id.desc())
@@ -185,15 +174,23 @@ impl InsightGenerationJobDao for SqliteInsightGenerationJobDao {
.expect("Time went backwards")
.as_secs() as i64;
diesel::update(dsl::insight_generation_jobs.filter(dsl::id.eq(job_id)))
.set((
dsl::status.eq(InsightJobStatus::Completed.as_str()),
dsl::completed_at.eq(Some(now)),
dsl::result_insight_id.eq(Some(insight_id)),
))
.execute(connection.deref_mut())
.map(|_| ())
.map_err(|e| anyhow::anyhow!("Failed to complete job: {}", e))
// Only update if still running — prevents cancelled job from
// being overwritten by a late-completing task.
diesel::update(
dsl::insight_generation_jobs.filter(
dsl::id
.eq(job_id)
.and(dsl::status.eq(InsightJobStatus::Running.as_str())),
),
)
.set((
dsl::status.eq(InsightJobStatus::Completed.as_str()),
dsl::completed_at.eq(Some(now)),
dsl::result_insight_id.eq(Some(insight_id)),
))
.execute(connection.deref_mut())
.map(|_| ())
.map_err(|e| anyhow::anyhow!("Failed to complete job: {}", e))
})
.map_err(|_| DbError::new(DbErrorKind::UpdateError))
}
@@ -217,27 +214,71 @@ impl InsightGenerationJobDao for SqliteInsightGenerationJobDao {
.expect("Time went backwards")
.as_secs() as i64;
diesel::update(dsl::insight_generation_jobs.filter(dsl::id.eq(job_id)))
.set((
dsl::status.eq(InsightJobStatus::Failed.as_str()),
dsl::completed_at.eq(Some(now)),
dsl::error_message.eq(Some(error_message.to_string())),
))
.execute(connection.deref_mut())
.map(|_| ())
.map_err(|e| anyhow::anyhow!("Failed to fail job: {}", e))
// Only update if still running.
diesel::update(
dsl::insight_generation_jobs.filter(
dsl::id
.eq(job_id)
.and(dsl::status.eq(InsightJobStatus::Running.as_str())),
),
)
.set((
dsl::status.eq(InsightJobStatus::Failed.as_str()),
dsl::completed_at.eq(Some(now)),
dsl::error_message.eq(Some(error_message.to_string())),
))
.execute(connection.deref_mut())
.map(|_| ())
.map_err(|e| anyhow::anyhow!("Failed to fail job: {}", e))
})
.map_err(|_| DbError::new(DbErrorKind::UpdateError))
}
fn cancel_active_job(
fn cancel_job(
&mut self,
context: &opentelemetry::Context,
job_id: i32,
) -> Result<bool, DbError> {
trace_db_call(context, "update", "cancel_job", |_span| {
use schema::insight_generation_jobs::dsl;
let mut connection = self
.connection
.lock()
.expect("Unable to lock InsightGenerationJobDao");
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("Time went backwards")
.as_secs() as i64;
let rows = diesel::update(
dsl::insight_generation_jobs.filter(
dsl::id
.eq(job_id)
.and(dsl::status.eq(InsightJobStatus::Running.as_str())),
),
)
.set((
dsl::status.eq(InsightJobStatus::Cancelled.as_str()),
dsl::completed_at.eq(Some(now)),
dsl::error_message.eq(Some("cancelled by user".to_string())),
))
.execute(connection.deref_mut())
.map_err(|e| anyhow::anyhow!("Failed to cancel job: {}", e))?;
Ok(rows > 0)
})
.map_err(|_| DbError::new(DbErrorKind::UpdateError))
}
fn cancel_active_jobs(
&mut self,
context: &opentelemetry::Context,
library_id: i32,
file_path: &str,
generation_type: InsightGenerationType,
) -> Result<bool, DbError> {
trace_db_call(context, "update", "cancel_active_job", |_span| {
) -> Result<usize, DbError> {
trace_db_call(context, "update", "cancel_active_jobs", |_span| {
use schema::insight_generation_jobs::dsl;
let mut connection = self
@@ -255,7 +296,6 @@ impl InsightGenerationJobDao for SqliteInsightGenerationJobDao {
dsl::library_id
.eq(library_id)
.and(dsl::file_path.eq(file_path))
.and(dsl::generation_type.eq(generation_type.as_str()))
.and(dsl::status.eq(InsightJobStatus::Running.as_str())),
),
)
@@ -265,9 +305,9 @@ impl InsightGenerationJobDao for SqliteInsightGenerationJobDao {
dsl::error_message.eq(Some("cancelled by newer request".to_string())),
))
.execute(connection.deref_mut())
.map_err(|e| anyhow::anyhow!("Failed to cancel job: {}", e))?;
.map_err(|e| anyhow::anyhow!("Failed to cancel active jobs: {}", e))?;
Ok(rows > 0)
Ok(rows)
})
.map_err(|_| DbError::new(DbErrorKind::UpdateError))
}
@@ -322,6 +362,40 @@ impl InsightGenerationJobDao for SqliteInsightGenerationJobDao {
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn recover_orphaned_jobs(
&mut self,
context: &opentelemetry::Context,
) -> Result<usize, DbError> {
trace_db_call(context, "update", "recover_orphaned_jobs", |_span| {
use schema::insight_generation_jobs::dsl;
let mut connection = self
.connection
.lock()
.expect("Unable to lock InsightGenerationJobDao");
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("Time went backwards")
.as_secs() as i64;
let rows = diesel::update(
dsl::insight_generation_jobs
.filter(dsl::status.eq(InsightJobStatus::Running.as_str())),
)
.set((
dsl::status.eq(InsightJobStatus::Failed.as_str()),
dsl::completed_at.eq(Some(now)),
dsl::error_message.eq(Some("server crashed while running".to_string())),
))
.execute(connection.deref_mut())
.map_err(|e| anyhow::anyhow!("Failed to recover orphaned jobs: {}", e))?;
Ok(rows)
})
.map_err(|_| DbError::new(DbErrorKind::UpdateError))
}
}
#[cfg(test)]
@@ -345,22 +419,19 @@ mod tests {
}
#[test]
fn create_job_idempotent() {
fn create_job_inserts_new_row() {
let mut dao = setup_dao();
let ctx = ctx();
let job_id_1 = dao
.create_or_get_active_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard)
.create_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard)
.unwrap();
let job_id_2 = dao
.create_or_get_active_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard)
.create_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard)
.unwrap();
assert_eq!(
job_id_1, job_id_2,
"idempotent insert should return same job_id"
);
assert_ne!(job_id_1, job_id_2, "each create_job call inserts a new row");
}
#[test]
@@ -369,7 +440,7 @@ mod tests {
let ctx = ctx();
let job_id = dao
.create_or_get_active_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard)
.create_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard)
.unwrap();
dao.complete_job(&ctx, job_id, 42).unwrap();
@@ -386,7 +457,7 @@ mod tests {
let ctx = ctx();
let job_id = dao
.create_or_get_active_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Agentic)
.create_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Agentic)
.unwrap();
dao.fail_job(&ctx, job_id, "model timeout").unwrap();
@@ -403,7 +474,7 @@ mod tests {
let ctx = ctx();
let job_id = dao
.create_or_get_active_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard)
.create_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard)
.unwrap();
// Job is running
@@ -420,18 +491,16 @@ mod tests {
}
#[test]
fn cancel_active_job() {
fn cancel_active_jobs() {
let mut dao = setup_dao();
let ctx = ctx();
let job_id = dao
.create_or_get_active_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard)
.create_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard)
.unwrap();
let cancelled = dao
.cancel_active_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard)
.unwrap();
assert!(cancelled, "should cancel existing running job");
let cancelled = dao.cancel_active_jobs(&ctx, 1, "photos/test.jpg").unwrap();
assert_eq!(cancelled, 1, "should cancel 1 running job");
// Job is no longer active
let active = dao.get_active_job(&ctx, 1, "photos/test.jpg").unwrap();
@@ -441,11 +510,9 @@ mod tests {
let job = dao.get_job_by_id(&ctx, job_id).unwrap().unwrap();
assert_eq!(job.status, InsightJobStatus::Cancelled.as_str());
// Cancelling again returns false (nothing to cancel)
let cancelled2 = dao
.cancel_active_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard)
.unwrap();
assert!(!cancelled2, "should return false when no running job");
// Cancelling again returns 0 (nothing to cancel)
let cancelled2 = dao.cancel_active_jobs(&ctx, 1, "photos/test.jpg").unwrap();
assert_eq!(cancelled2, 0, "should return 0 when no running job");
}
#[test]
@@ -454,11 +521,11 @@ mod tests {
let ctx = ctx();
let job_id_1 = dao
.create_or_get_active_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard)
.create_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard)
.unwrap();
let job_id_2 = dao
.create_or_get_active_job(&ctx, 2, "photos/test.jpg", InsightGenerationType::Standard)
.create_job(&ctx, 2, "photos/test.jpg", InsightGenerationType::Standard)
.unwrap();
assert_ne!(
@@ -485,7 +552,7 @@ mod tests {
let ctx = ctx();
let job_id = dao
.create_or_get_active_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard)
.create_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard)
.unwrap();
// Find while running
@@ -500,4 +567,115 @@ mod tests {
assert_eq!(job.status, InsightJobStatus::Completed.as_str());
assert_eq!(job.result_insight_id, Some(99));
}
#[test]
fn recover_orphaned_jobs() {
let mut dao = setup_dao();
let ctx = ctx();
// Create two running jobs
let job_id_1 = dao
.create_job(&ctx, 1, "photos/a.jpg", InsightGenerationType::Standard)
.unwrap();
let job_id_2 = dao
.create_job(&ctx, 1, "photos/b.jpg", InsightGenerationType::Agentic)
.unwrap();
// Complete one
dao.complete_job(&ctx, job_id_1, 1).unwrap();
// Recover should only affect the running job
let recovered = dao.recover_orphaned_jobs(&ctx).unwrap();
assert_eq!(recovered, 1, "should recover exactly 1 running job");
// job_id_1 is still completed
let job1 = dao.get_job_by_id(&ctx, job_id_1).unwrap().unwrap();
assert_eq!(job1.status, InsightJobStatus::Completed.as_str());
// job_id_2 is now failed with recovery message
let job2 = dao.get_job_by_id(&ctx, job_id_2).unwrap().unwrap();
assert_eq!(job2.status, InsightJobStatus::Failed.as_str());
assert_eq!(
job2.error_message.as_deref(),
Some("server crashed while running")
);
// Second recovery is a no-op
let recovered2 = dao.recover_orphaned_jobs(&ctx).unwrap();
assert_eq!(recovered2, 0, "no running jobs remain");
}
#[test]
fn complete_job_noop_when_cancelled() {
let mut dao = setup_dao();
let ctx = ctx();
let job_id = dao
.create_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard)
.unwrap();
dao.cancel_job(&ctx, job_id).unwrap();
// Late-completing task tries to mark as completed — should be a no-op
dao.complete_job(&ctx, job_id, 42).unwrap();
let job = dao.get_job_by_id(&ctx, job_id).unwrap().unwrap();
assert_eq!(
job.status,
InsightJobStatus::Cancelled.as_str(),
"cancelled status must not be overwritten by late complete"
);
assert_eq!(
job.result_insight_id, None,
"insight_id must stay None when complete is a no-op"
);
}
#[test]
fn fail_job_noop_when_cancelled() {
let mut dao = setup_dao();
let ctx = ctx();
let job_id = dao
.create_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Agentic)
.unwrap();
dao.cancel_job(&ctx, job_id).unwrap();
// Late-failing task tries to mark as failed — should be a no-op
dao.fail_job(&ctx, job_id, "timeout after 120s").unwrap();
let job = dao.get_job_by_id(&ctx, job_id).unwrap().unwrap();
assert_eq!(
job.status,
InsightJobStatus::Cancelled.as_str(),
"cancelled status must not be overwritten by late fail"
);
assert_eq!(
job.error_message.as_deref(),
Some("cancelled by user"),
"error_message must reflect the cancel, not the late fail"
);
}
#[test]
fn cancel_job_by_id() {
let mut dao = setup_dao();
let ctx = ctx();
let job_id = dao
.create_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard)
.unwrap();
let cancelled = dao.cancel_job(&ctx, job_id).unwrap();
assert!(cancelled, "should cancel running job");
let job = dao.get_job_by_id(&ctx, job_id).unwrap().unwrap();
assert_eq!(job.status, InsightJobStatus::Cancelled.as_str());
assert!(job.completed_at.is_some());
// Cancelling again is a no-op
let cancelled2 = dao.cancel_job(&ctx, job_id).unwrap();
assert!(!cancelled2, "already cancelled job should return false");
}
}