feat: async insight generation with SQLite job tracking

- Add insight_generation_jobs table migration and DAO
- Implement job lifecycle: create_or_get_active, complete, fail, cancel
- Refactor POST /insights/generate and /agentic to async spawn with timeout
- Add GET /insights/generation/status endpoint with job_id and file_path lookup
- Use String for enum fields in Diesel models to avoid private Bound type
- Add from_str() helpers on InsightJobStatus and InsightGenerationType
- Fix update_training_messages to return Result<usize, DbError>
- 7/7 DAO unit tests passing
This commit is contained in:
Cameron Cordes
2026-05-27 10:01:17 -04:00
parent 5a75d1a28c
commit b87eb4e690
13 changed files with 1046 additions and 174 deletions
+503
View File
@@ -0,0 +1,503 @@
use diesel::prelude::*;
use diesel::sqlite::SqliteConnection;
use std::ops::DerefMut;
use std::sync::{Arc, Mutex};
use crate::database::models::{
InsertInsightGenerationJob, InsightGenerationJob, InsightGenerationType, InsightJobStatus,
};
use crate::database::schema;
use crate::database::{DbError, DbErrorKind, connect};
use crate::otel::trace_db_call;
/// Tracks async insight generation jobs. The idempotent insert ensures
/// concurrent callers for the same (library_id, file_path, generation_type)
/// get the same job_id rather than creating duplicates.
pub trait InsightGenerationJobDao: Sync + Send {
/// Insert a new job or return the existing running job for the same key.
/// Returns the job_id either way.
fn create_or_get_active_job(
&mut self,
context: &opentelemetry::Context,
library_id: i32,
file_path: &str,
generation_type: InsightGenerationType,
) -> Result<i32, DbError>;
/// Mark a job as completed with the resulting insight id.
fn complete_job(
&mut self,
context: &opentelemetry::Context,
job_id: i32,
insight_id: i32,
) -> Result<(), DbError>;
/// Mark a job as failed with an error message.
fn fail_job(
&mut self,
context: &opentelemetry::Context,
job_id: i32,
error_message: &str,
) -> Result<(), DbError>;
/// Mark the active running job for a file as "cancelled". Returns true if
/// a job was found and cancelled, false if no running job existed.
fn cancel_active_job(
&mut self,
context: &opentelemetry::Context,
library_id: i32,
file_path: &str,
generation_type: InsightGenerationType,
) -> Result<bool, DbError>;
/// Find the latest running job for a given file. Returns None if no
/// running job exists.
fn get_active_job(
&mut self,
context: &opentelemetry::Context,
library_id: i32,
file_path: &str,
) -> Result<Option<InsightGenerationJob>, DbError>;
/// Find any job by id regardless of status.
fn get_job_by_id(
&mut self,
context: &opentelemetry::Context,
job_id: i32,
) -> Result<Option<InsightGenerationJob>, DbError>;
}
pub struct SqliteInsightGenerationJobDao {
connection: Arc<Mutex<SqliteConnection>>,
}
impl Default for SqliteInsightGenerationJobDao {
fn default() -> Self {
Self::new()
}
}
impl SqliteInsightGenerationJobDao {
pub fn new() -> Self {
Self {
connection: Arc::new(Mutex::new(connect())),
}
}
#[cfg(test)]
pub fn from_connection(conn: Arc<Mutex<SqliteConnection>>) -> Self {
Self { connection: conn }
}
}
impl InsightGenerationJobDao for SqliteInsightGenerationJobDao {
fn create_or_get_active_job(
&mut self,
context: &opentelemetry::Context,
library_id: i32,
file_path: &str,
generation_type: InsightGenerationType,
) -> Result<i32, DbError> {
trace_db_call(context, "insert", "create_or_get_active_job", |_span| {
use schema::insight_generation_jobs::dsl;
let mut connection = self
.connection
.lock()
.expect("Unable to lock InsightGenerationJobDao");
// Check for existing running job
let existing = dsl::insight_generation_jobs
.filter(
dsl::library_id
.eq(library_id)
.and(dsl::file_path.eq(file_path))
.and(dsl::generation_type.eq(generation_type.as_str()))
.and(dsl::status.eq(InsightJobStatus::Running.as_str())),
)
.select(dsl::id)
.first::<i32>(connection.deref_mut())
.optional();
match existing {
Ok(Some(job_id)) => return Ok(job_id),
Ok(None) => {}
Err(e) => return Err(anyhow::anyhow!("Failed to check existing job: {}", e)),
}
// No running job exists, insert new one (upsert on conflict)
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("Time went backwards")
.as_secs() as i64;
let new_job = InsertInsightGenerationJob {
library_id,
path: file_path.to_string(),
gen_type: generation_type.to_string(),
status: InsightJobStatus::Running.to_string(),
started_at: now,
};
diesel::insert_into(dsl::insight_generation_jobs)
.values(&new_job)
.on_conflict((dsl::library_id, dsl::file_path, dsl::generation_type))
.do_update()
.set((
dsl::status.eq(InsightJobStatus::Running.as_str()),
dsl::started_at.eq(now),
))
.execute(connection.deref_mut())
.map_err(|e| anyhow::anyhow!("Failed to insert job: {}", e))?;
// Get the job id
dsl::insight_generation_jobs
.filter(
dsl::library_id
.eq(library_id)
.and(dsl::file_path.eq(file_path))
.and(dsl::generation_type.eq(generation_type.as_str())),
)
.select(dsl::id)
.order(dsl::id.desc())
.first::<i32>(connection.deref_mut())
.map_err(|e| anyhow::anyhow!("Failed to get job id: {}", e))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn complete_job(
&mut self,
context: &opentelemetry::Context,
job_id: i32,
insight_id: i32,
) -> Result<(), DbError> {
trace_db_call(context, "update", "complete_job", |_span| {
use schema::insight_generation_jobs::dsl;
let mut connection = self
.connection
.lock()
.expect("Unable to lock InsightGenerationJobDao");
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("Time went backwards")
.as_secs() as i64;
diesel::update(dsl::insight_generation_jobs.filter(dsl::id.eq(job_id)))
.set((
dsl::status.eq(InsightJobStatus::Completed.as_str()),
dsl::completed_at.eq(Some(now)),
dsl::result_insight_id.eq(Some(insight_id)),
))
.execute(connection.deref_mut())
.map(|_| ())
.map_err(|e| anyhow::anyhow!("Failed to complete job: {}", e))
})
.map_err(|_| DbError::new(DbErrorKind::UpdateError))
}
fn fail_job(
&mut self,
context: &opentelemetry::Context,
job_id: i32,
error_message: &str,
) -> Result<(), DbError> {
trace_db_call(context, "update", "fail_job", |_span| {
use schema::insight_generation_jobs::dsl;
let mut connection = self
.connection
.lock()
.expect("Unable to lock InsightGenerationJobDao");
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("Time went backwards")
.as_secs() as i64;
diesel::update(dsl::insight_generation_jobs.filter(dsl::id.eq(job_id)))
.set((
dsl::status.eq(InsightJobStatus::Failed.as_str()),
dsl::completed_at.eq(Some(now)),
dsl::error_message.eq(Some(error_message.to_string())),
))
.execute(connection.deref_mut())
.map(|_| ())
.map_err(|e| anyhow::anyhow!("Failed to fail job: {}", e))
})
.map_err(|_| DbError::new(DbErrorKind::UpdateError))
}
fn cancel_active_job(
&mut self,
context: &opentelemetry::Context,
library_id: i32,
file_path: &str,
generation_type: InsightGenerationType,
) -> Result<bool, DbError> {
trace_db_call(context, "update", "cancel_active_job", |_span| {
use schema::insight_generation_jobs::dsl;
let mut connection = self
.connection
.lock()
.expect("Unable to lock InsightGenerationJobDao");
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("Time went backwards")
.as_secs() as i64;
let rows = diesel::update(
dsl::insight_generation_jobs.filter(
dsl::library_id
.eq(library_id)
.and(dsl::file_path.eq(file_path))
.and(dsl::generation_type.eq(generation_type.as_str()))
.and(dsl::status.eq(InsightJobStatus::Running.as_str())),
),
)
.set((
dsl::status.eq(InsightJobStatus::Cancelled.as_str()),
dsl::completed_at.eq(Some(now)),
dsl::error_message.eq(Some("cancelled by newer request".to_string())),
))
.execute(connection.deref_mut())
.map_err(|e| anyhow::anyhow!("Failed to cancel job: {}", e))?;
Ok(rows > 0)
})
.map_err(|_| DbError::new(DbErrorKind::UpdateError))
}
fn get_active_job(
&mut self,
context: &opentelemetry::Context,
library_id: i32,
file_path: &str,
) -> Result<Option<InsightGenerationJob>, DbError> {
trace_db_call(context, "query", "get_active_job", |_span| {
use schema::insight_generation_jobs::dsl;
let mut connection = self
.connection
.lock()
.expect("Unable to lock InsightGenerationJobDao");
dsl::insight_generation_jobs
.filter(
dsl::library_id
.eq(library_id)
.and(dsl::file_path.eq(file_path))
.and(dsl::status.eq(InsightJobStatus::Running.as_str())),
)
.order(dsl::id.desc())
.first::<InsightGenerationJob>(connection.deref_mut())
.optional()
.map_err(|e| anyhow::anyhow!("Failed to get active job: {}", e))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn get_job_by_id(
&mut self,
context: &opentelemetry::Context,
job_id: i32,
) -> Result<Option<InsightGenerationJob>, DbError> {
trace_db_call(context, "query", "get_job_by_id", |_span| {
use schema::insight_generation_jobs::dsl;
let mut connection = self
.connection
.lock()
.expect("Unable to lock InsightGenerationJobDao");
dsl::insight_generation_jobs
.filter(dsl::id.eq(job_id))
.first::<InsightGenerationJob>(connection.deref_mut())
.optional()
.map_err(|e| anyhow::anyhow!("Failed to get job: {}", e))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
}
#[cfg(test)]
mod tests {
use super::*;
use diesel::Connection;
use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations};
const DB_MIGRATIONS: EmbeddedMigrations = embed_migrations!();
fn setup_dao() -> SqliteInsightGenerationJobDao {
let mut conn = SqliteConnection::establish(":memory:")
.expect("Unable to create in-memory db connection");
conn.run_pending_migrations(DB_MIGRATIONS)
.expect("Failure running DB migrations");
SqliteInsightGenerationJobDao::from_connection(Arc::new(Mutex::new(conn)))
}
fn ctx() -> opentelemetry::Context {
opentelemetry::Context::new()
}
#[test]
fn create_job_idempotent() {
let mut dao = setup_dao();
let ctx = ctx();
let job_id_1 = dao
.create_or_get_active_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard)
.unwrap();
let job_id_2 = dao
.create_or_get_active_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard)
.unwrap();
assert_eq!(
job_id_1, job_id_2,
"idempotent insert should return same job_id"
);
}
#[test]
fn complete_job_sets_result() {
let mut dao = setup_dao();
let ctx = ctx();
let job_id = dao
.create_or_get_active_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard)
.unwrap();
dao.complete_job(&ctx, job_id, 42).unwrap();
let job = dao.get_job_by_id(&ctx, job_id).unwrap().unwrap();
assert_eq!(job.status, InsightJobStatus::Completed.as_str());
assert_eq!(job.result_insight_id, Some(42));
assert!(job.completed_at.is_some());
}
#[test]
fn fail_job_sets_error() {
let mut dao = setup_dao();
let ctx = ctx();
let job_id = dao
.create_or_get_active_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Agentic)
.unwrap();
dao.fail_job(&ctx, job_id, "model timeout").unwrap();
let job = dao.get_job_by_id(&ctx, job_id).unwrap().unwrap();
assert_eq!(job.status, InsightJobStatus::Failed.as_str());
assert_eq!(job.error_message.as_deref(), Some("model timeout"));
assert!(job.completed_at.is_some());
}
#[test]
fn get_active_job_returns_none_when_completed() {
let mut dao = setup_dao();
let ctx = ctx();
let job_id = dao
.create_or_get_active_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard)
.unwrap();
// Job is running
let active = dao.get_active_job(&ctx, 1, "photos/test.jpg").unwrap();
assert!(active.is_some());
assert_eq!(active.unwrap().id, job_id);
// Complete it
dao.complete_job(&ctx, job_id, 1).unwrap();
// No longer active
let active = dao.get_active_job(&ctx, 1, "photos/test.jpg").unwrap();
assert!(active.is_none());
}
#[test]
fn cancel_active_job() {
let mut dao = setup_dao();
let ctx = ctx();
let job_id = dao
.create_or_get_active_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard)
.unwrap();
let cancelled = dao
.cancel_active_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard)
.unwrap();
assert!(cancelled, "should cancel existing running job");
// Job is no longer active
let active = dao.get_active_job(&ctx, 1, "photos/test.jpg").unwrap();
assert!(active.is_none());
// Job exists with cancelled status
let job = dao.get_job_by_id(&ctx, job_id).unwrap().unwrap();
assert_eq!(job.status, InsightJobStatus::Cancelled.as_str());
// Cancelling again returns false (nothing to cancel)
let cancelled2 = dao
.cancel_active_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard)
.unwrap();
assert!(!cancelled2, "should return false when no running job");
}
#[test]
fn get_active_job_scoped_by_library() {
let mut dao = setup_dao();
let ctx = ctx();
let job_id_1 = dao
.create_or_get_active_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard)
.unwrap();
let job_id_2 = dao
.create_or_get_active_job(&ctx, 2, "photos/test.jpg", InsightGenerationType::Standard)
.unwrap();
assert_ne!(
job_id_1, job_id_2,
"different libraries should have separate jobs"
);
// Complete lib1's job
dao.complete_job(&ctx, job_id_1, 1).unwrap();
// lib1 has no active job
let active1 = dao.get_active_job(&ctx, 1, "photos/test.jpg").unwrap();
assert!(active1.is_none());
// lib2 still has active job
let active2 = dao.get_active_job(&ctx, 2, "photos/test.jpg").unwrap();
assert!(active2.is_some());
assert_eq!(active2.unwrap().id, job_id_2);
}
#[test]
fn get_job_by_id_finds_any_status() {
let mut dao = setup_dao();
let ctx = ctx();
let job_id = dao
.create_or_get_active_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard)
.unwrap();
// Find while running
let job = dao.get_job_by_id(&ctx, job_id).unwrap().unwrap();
assert_eq!(job.status, InsightJobStatus::Running.as_str());
// Complete it
dao.complete_job(&ctx, job_id, 99).unwrap();
// Still findable
let job = dao.get_job_by_id(&ctx, job_id).unwrap().unwrap();
assert_eq!(job.status, InsightJobStatus::Completed.as_str());
assert_eq!(job.result_insight_id, Some(99));
}
}
+4 -3
View File
@@ -90,13 +90,15 @@ pub trait InsightDao: Sync + Send {
/// Replace the `training_messages` JSON blob on the current row for
/// `(library_id, rel_path)`. Used by chat-turn append mode to persist
/// the extended conversation without inserting a new insight version.
/// Returns the number of rows affected (0 if no current row matched,
/// indicating a concurrent regenerate/reconcile flipped `is_current`).
fn update_training_messages(
&mut self,
context: &opentelemetry::Context,
library_id: i32,
file_path: &str,
training_messages_json: &str,
) -> Result<(), DbError>;
) -> Result<usize, DbError>;
}
pub struct SqliteInsightDao {
@@ -372,7 +374,7 @@ impl InsightDao for SqliteInsightDao {
lib_id: i32,
path: &str,
training_messages_json: &str,
) -> Result<(), DbError> {
) -> Result<usize, DbError> {
trace_db_call(context, "update", "update_training_messages", |_span| {
use schema::photo_insights::dsl::*;
@@ -386,7 +388,6 @@ impl InsightDao for SqliteInsightDao {
)
.set(training_messages.eq(Some(training_messages_json.to_string())))
.execute(connection.deref_mut())
.map(|_| ())
.map_err(|_| anyhow::anyhow!("Update error"))
})
.map_err(|_| DbError::new(DbErrorKind::UpdateError))
+2
View File
@@ -45,6 +45,7 @@ pub struct DuplicateRow {
pub mod calendar_dao;
pub mod daily_summary_dao;
pub mod insight_generation_job_dao;
pub mod insights_dao;
pub mod knowledge_dao;
pub mod location_dao;
@@ -57,6 +58,7 @@ pub mod search_dao;
pub use calendar_dao::{CalendarEventDao, SqliteCalendarEventDao};
pub use daily_summary_dao::{DailySummaryDao, InsertDailySummary, SqliteDailySummaryDao};
pub use insight_generation_job_dao::{InsightGenerationJobDao, SqliteInsightGenerationJobDao};
pub use insights_dao::{InsightDao, SqliteInsightDao};
pub use knowledge_dao::{
ConsolidationGroup, EntityFilter, EntityGraph, EntityPatch, EntitySort, FactFilter, FactPatch,
+101 -2
View File
@@ -1,9 +1,81 @@
use crate::database::schema::{
entities, entity_facts, entity_photo_links, favorites, image_exif, libraries, personas,
photo_insights, users, video_preview_clips,
entities, entity_facts, entity_photo_links, favorites, image_exif, insight_generation_jobs,
libraries, personas, photo_insights, users, video_preview_clips,
};
use serde::Serialize;
/// Possible statuses for an insight generation job.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, FromSqlRow)]
#[serde(rename_all = "snake_case")]
pub enum InsightJobStatus {
Running,
Completed,
Failed,
Cancelled,
}
impl InsightJobStatus {
pub fn as_str(&self) -> &'static str {
match self {
Self::Running => "running",
Self::Completed => "completed",
Self::Failed => "failed",
Self::Cancelled => "cancelled",
}
}
}
impl ToString for InsightJobStatus {
fn to_string(&self) -> String {
self.as_str().to_string()
}
}
impl InsightJobStatus {
pub fn from_str(s: &str) -> Self {
match s {
"running" => Self::Running,
"completed" => Self::Completed,
"failed" => Self::Failed,
"cancelled" => Self::Cancelled,
_ => Self::Failed,
}
}
}
/// Type of insight generation (standard vs agentic).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum InsightGenerationType {
Standard,
Agentic,
}
impl InsightGenerationType {
pub fn as_str(&self) -> &'static str {
match self {
Self::Standard => "standard",
Self::Agentic => "agentic",
}
}
}
impl ToString for InsightGenerationType {
fn to_string(&self) -> String {
self.as_str().to_string()
}
}
impl InsightGenerationType {
pub fn from_str(s: &str) -> Self {
match s {
"standard" => Self::Standard,
"agentic" => Self::Agentic,
_ => Self::Standard,
}
}
}
#[derive(Insertable)]
#[diesel(table_name = users)]
pub struct InsertUser<'a> {
@@ -394,3 +466,30 @@ pub struct VideoPreviewClip {
pub created_at: String,
pub updated_at: String,
}
#[derive(Insertable)]
#[diesel(table_name = insight_generation_jobs)]
pub struct InsertInsightGenerationJob {
pub library_id: i32,
#[diesel(column_name = file_path)]
pub path: String,
#[diesel(column_name = generation_type)]
pub gen_type: String,
pub status: String,
pub started_at: i64,
}
#[derive(Queryable, Serialize, Clone, Debug)]
pub struct InsightGenerationJob {
pub id: i32,
pub library_id: i32,
#[diesel(column_name = file_path)]
pub path: String,
#[diesel(column_name = generation_type)]
pub gen_type: String,
pub status: String,
pub started_at: i64,
pub completed_at: Option<i64>,
pub result_insight_id: Option<i32>,
pub error_message: Option<String>,
}
+16
View File
@@ -271,12 +271,27 @@ diesel::table! {
}
}
diesel::table! {
insight_generation_jobs (id) {
id -> Integer,
library_id -> Integer,
file_path -> Text,
generation_type -> Text,
status -> Text,
started_at -> BigInt,
completed_at -> Nullable<BigInt>,
result_insight_id -> Nullable<Integer>,
error_message -> Nullable<Text>,
}
}
diesel::joinable!(entity_facts -> photo_insights (source_insight_id));
diesel::joinable!(entity_photo_links -> entities (entity_id));
diesel::joinable!(entity_photo_links -> libraries (library_id));
diesel::joinable!(face_detections -> libraries (library_id));
diesel::joinable!(face_detections -> persons (person_id));
diesel::joinable!(image_exif -> libraries (library_id));
diesel::joinable!(insight_generation_jobs -> libraries (library_id));
diesel::joinable!(personas -> users (user_id));
diesel::joinable!(persons -> entities (entity_id));
diesel::joinable!(photo_insights -> libraries (library_id));
@@ -292,6 +307,7 @@ diesel::allow_tables_to_appear_in_same_query!(
face_detections,
favorites,
image_exif,
insight_generation_jobs,
libraries,
location_history,
personas,