From b87eb4e690a389bfd5bad4c2a57dbf2553479f1f Mon Sep 17 00:00:00 2001 From: Cameron Cordes Date: Wed, 27 May 2026 10:01:17 -0400 Subject: [PATCH] feat: async insight generation with SQLite job tracking - Add insight_generation_jobs table migration and DAO - Implement job lifecycle: create_or_get_active, complete, fail, cancel - Refactor POST /insights/generate and /agentic to async spawn with timeout - Add GET /insights/generation/status endpoint with job_id and file_path lookup - Use String for enum fields in Diesel models to avoid private Bound type - Add from_str() helpers on InsightJobStatus and InsightGenerationType - Fix update_training_messages to return Result - 7/7 DAO unit tests passing --- .../down.sql | 3 + .../up.sql | 25 + src/ai/handlers.rs | 492 +++++++++++------ src/ai/insight_chat.rs | 38 +- src/ai/insight_generator.rs | 6 + src/ai/mod.rs | 5 +- src/database/insight_generation_job_dao.rs | 503 ++++++++++++++++++ src/database/insights_dao.rs | 7 +- src/database/mod.rs | 2 + src/database/models.rs | 103 +++- src/database/schema.rs | 16 + src/main.rs | 1 + src/state.rs | 19 +- 13 files changed, 1046 insertions(+), 174 deletions(-) create mode 100644 migrations/2026-05-27-000000_add_insight_generation_jobs/down.sql create mode 100644 migrations/2026-05-27-000000_add_insight_generation_jobs/up.sql create mode 100644 src/database/insight_generation_job_dao.rs diff --git a/migrations/2026-05-27-000000_add_insight_generation_jobs/down.sql b/migrations/2026-05-27-000000_add_insight_generation_jobs/down.sql new file mode 100644 index 0000000..2c9a2a7 --- /dev/null +++ b/migrations/2026-05-27-000000_add_insight_generation_jobs/down.sql @@ -0,0 +1,3 @@ +DROP INDEX IF EXISTS idx_insight_gen_jobs_status_cleanup; +DROP INDEX IF EXISTS idx_insight_gen_jobs_file; +DROP TABLE IF EXISTS insight_generation_jobs; diff --git a/migrations/2026-05-27-000000_add_insight_generation_jobs/up.sql b/migrations/2026-05-27-000000_add_insight_generation_jobs/up.sql new file mode 100644 index 0000000..a20c195 --- /dev/null +++ b/migrations/2026-05-27-000000_add_insight_generation_jobs/up.sql @@ -0,0 +1,25 @@ +-- Track async insight generation jobs so the client can poll for +-- completion after the server returns 202 Accepted. The UNIQUE +-- constraint on (library_id, file_path, generation_type) ensures +-- idempotent inserts: if a running job already exists, the caller +-- should return that job_id instead of creating a duplicate. +CREATE TABLE insight_generation_jobs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + library_id INTEGER NOT NULL DEFAULT 1, + file_path TEXT NOT NULL, + generation_type TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'running', + started_at INTEGER NOT NULL, + completed_at INTEGER, + result_insight_id INTEGER, + error_message TEXT, + UNIQUE(library_id, file_path, generation_type) +); + +-- For the status endpoint: fast lookup by (library_id, file_path) +CREATE INDEX idx_insight_gen_jobs_file + ON insight_generation_jobs(library_id, file_path); + +-- For startup cleanup (future): prune old completed/failed jobs +CREATE INDEX idx_insight_gen_jobs_status_cleanup + ON insight_generation_jobs(status, started_at); diff --git a/src/ai/handlers.rs b/src/ai/handlers.rs index a7a3720..5773483 100644 --- a/src/ai/handlers.rs +++ b/src/ai/handlers.rs @@ -5,8 +5,9 @@ use serde::{Deserialize, Serialize}; use crate::ai::insight_chat::{ChatStreamEvent, ChatTurnRequest}; use crate::ai::ollama::ChatMessage; -use crate::ai::{InsightGenerator, ModelCapabilities, OllamaClient}; +use crate::ai::{ModelCapabilities, OllamaClient}; use crate::data::Claims; +use crate::database::models::{InsightGenerationType, InsightJobStatus}; use crate::database::{ExifDao, InsightDao}; use crate::libraries; use crate::otel::{extract_context_from_request, global_tracer}; @@ -64,6 +65,101 @@ pub struct GetPhotoInsightQuery { pub library: Option, } +#[derive(Debug, Deserialize)] +pub struct GenerationStatusQuery { + /// If provided, look up the job by id. + #[serde(default)] + pub job_id: Option, + /// If provided with `library`, look up the latest running job for this + /// file. Used when the client doesn't have a persisted job_id. + #[serde(default)] + pub file_path: Option, + #[serde(default)] + pub library: Option, +} + +/// GET /insights/generation/status - Check status of a generation job. +/// Accepts either `?job_id=` or `?file_path=&library=`. +#[get("/insights/generation/status")] +pub async fn generation_status_handler( + _claims: Claims, + query: web::Query, + app_state: web::Data, +) -> impl Responder { + let ctx = opentelemetry::Context::new(); + + if let Some(jid) = query.job_id { + let mut dao = app_state + .insight_job_dao + .lock() + .expect("Unable to lock InsightJobDao"); + match dao.get_job_by_id(&ctx, jid) { + Ok(Some(job)) => { + return HttpResponse::Ok().json(GenerationStatusResponse { + job_id: job.id, + status: InsightJobStatus::from_str(&job.status), + started_at: job.started_at, + completed_at: job.completed_at, + result_insight_id: job.result_insight_id, + error_message: job.error_message, + }); + } + Ok(None) => { + return HttpResponse::NotFound().json(serde_json::json!({ + "error": format!("Job {} not found", jid) + })); + } + Err(e) => { + log::error!("Failed to look up job {}: {:?}", jid, e); + return HttpResponse::InternalServerError().json(serde_json::json!({ + "error": "Failed to look up job" + })); + } + } + } + + if let Some(ref fp) = query.file_path { + let library = libraries::resolve_library_param(&app_state, query.library.as_deref()) + .ok() + .flatten() + .unwrap_or_else(|| app_state.primary_library()); + let normalized = normalize_path(fp); + + let mut dao = app_state + .insight_job_dao + .lock() + .expect("Unable to lock InsightJobDao"); + match dao.get_active_job(&ctx, library.id, &normalized) { + Ok(Some(job)) => { + return HttpResponse::Ok().json(GenerationStatusResponse { + job_id: job.id, + status: InsightJobStatus::from_str(&job.status), + started_at: job.started_at, + completed_at: job.completed_at, + result_insight_id: job.result_insight_id, + error_message: job.error_message, + }); + } + Ok(None) => { + return HttpResponse::Ok().json(serde_json::json!({ + "status": "idle", + "message": "No running generation job for this file" + })); + } + Err(e) => { + log::error!("Failed to look up active job for {}: {:?}", normalized, e); + return HttpResponse::InternalServerError().json(serde_json::json!({ + "error": "Failed to look up active job" + })); + } + } + } + + HttpResponse::BadRequest().json(serde_json::json!({ + "error": "Provide either job_id or file_path query parameter" + })) +} + #[derive(Debug, Deserialize)] pub struct RateInsightRequest { pub file_path: String, @@ -76,6 +172,24 @@ pub struct ExportTrainingDataQuery { pub approved_only: Option, } +#[derive(Debug, Serialize)] +pub struct JobIdResponse { + pub job_id: i32, +} + +#[derive(Debug, Serialize)] +pub struct GenerationStatusResponse { + pub job_id: i32, + pub status: InsightJobStatus, + pub started_at: i64, + #[serde(skip_serializing_if = "Option::is_none")] + pub completed_at: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub result_insight_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub error_message: Option, +} + #[derive(Debug, Serialize)] pub struct PhotoInsightResponse { pub id: i32, @@ -110,70 +224,123 @@ pub struct ServerModels { pub default_model: String, } -/// POST /insights/generate - Generate insight for a specific photo +/// POST /insights/generate - Generate insight for a specific photo (async) #[post("/insights/generate")] pub async fn generate_insight_handler( - http_request: HttpRequest, + _http_request: HttpRequest, _claims: Claims, request: web::Json, - insight_generator: web::Data, + app_state: web::Data, ) -> impl Responder { - let parent_context = extract_context_from_request(&http_request); - let tracer = global_tracer(); - let mut span = tracer.start_with_context("http.insights.generate", &parent_context); - let normalized_path = normalize_path(&request.file_path); - - span.set_attribute(KeyValue::new("file_path", normalized_path.clone())); - if let Some(ref model) = request.model { - span.set_attribute(KeyValue::new("model", model.clone())); - } - if let Some(ref prompt) = request.system_prompt { - span.set_attribute(KeyValue::new("has_custom_prompt", true)); - span.set_attribute(KeyValue::new("prompt_length", prompt.len() as i64)); - } - if let Some(ctx) = request.num_ctx { - span.set_attribute(KeyValue::new("num_ctx", ctx as i64)); - } + let library = app_state.primary_library(); + let gen_type = InsightGenerationType::Standard; log::info!( - "Manual insight generation triggered for photo: {} with model: {:?}, custom_prompt: {}, num_ctx: {:?}", + "Manual insight generation triggered for photo: {} with model: {:?}", normalized_path, - request.model, - request.system_prompt.is_some(), - request.num_ctx + request.model ); - // Generate insight with optional custom model, system prompt, and context size - let result = insight_generator - .generate_insight_for_photo_with_config( + // Cancel any running job for this file, then create a fresh one + { + let mut dao = app_state + .insight_job_dao + .lock() + .expect("Unable to lock InsightJobDao"); + let _ = dao.cancel_active_job( + &opentelemetry::Context::new(), + library.id, &normalized_path, - request.model.clone(), - request.system_prompt.clone(), - request.num_ctx, - request.temperature, - request.top_p, - request.top_k, - request.min_p, + gen_type, + ); + } + + let job_id = { + let mut dao = app_state + .insight_job_dao + .lock() + .expect("Unable to lock InsightJobDao"); + match dao.create_or_get_active_job( + &opentelemetry::Context::new(), + library.id, + &normalized_path, + gen_type, + ) { + Ok(id) => id, + Err(e) => { + log::error!("Failed to create generation job: {:?}", e); + return HttpResponse::InternalServerError().json(serde_json::json!({ + "error": "Failed to create generation job" + })); + } + } + }; + + // Spawn background task with timeout + let generator = app_state.insight_generator.clone(); + let job_dao = app_state.insight_job_dao.clone(); + let lib_id = library.id; + let path = normalized_path.clone(); + + tokio::spawn(async move { + let timeout_secs: u64 = std::env::var("INSIGHT_GENERATION_TIMEOUT_SECS") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(120); + + let result = tokio::time::timeout( + std::time::Duration::from_secs(timeout_secs), + generator.generate_insight_for_photo_with_config( + &path, + request.model.clone(), + request.system_prompt.clone(), + request.num_ctx, + request.temperature, + request.top_p, + request.top_k, + request.min_p, + ), ) .await; - match result { - Ok(()) => { - span.set_status(Status::Ok); - HttpResponse::Ok().json(serde_json::json!({ - "success": true, - "message": "Insight generated successfully" - })) + let ctx = opentelemetry::Context::new(); + let mut dao = job_dao.lock().expect("Unable to lock InsightJobDao"); + + match result { + Ok(Ok(())) => { + // Look up the stored insight id to record on the job + let mut insight_dao = generator + .insight_dao() + .lock() + .expect("Unable to lock InsightDao"); + let insight_id = insight_dao + .get_insight(&ctx, &path) + .ok() + .flatten() + .map(|i| i.id); + if let Some(id) = insight_id { + let _ = dao.complete_job(&ctx, job_id, id); + } else { + let _ = dao.fail_job(&ctx, job_id, "generation returned no insight"); + } + } + Ok(Err(e)) => { + log::error!("Insight generation failed for {}: {:?}", path, e); + let _ = dao.fail_job(&ctx, job_id, &format!("{:?}", e)); + } + Err(_) => { + log::error!( + "Insight generation timed out for {} after {}s", + path, + timeout_secs + ); + let _ = dao.fail_job(&ctx, job_id, &format!("timeout after {}s", timeout_secs)); + } } - Err(e) => { - log::error!("Failed to generate insight: {:?}", e); - span.set_status(Status::error(e.to_string())); - HttpResponse::InternalServerError().json(serde_json::json!({ - "error": format!("Failed to generate insight: {:?}", e) - })) - } - } + }); + + HttpResponse::Ok().json(JobIdResponse { job_id }) } /// GET /insights?path=/path/to/photo.jpg - Fetch insight for specific photo @@ -301,56 +468,60 @@ pub async fn get_all_insights_handler( } } -/// POST /insights/generate/agentic - Generate insight using agentic tool-calling loop +/// POST /insights/generate/agentic - Generate insight using agentic tool-calling loop (async) #[post("/insights/generate/agentic")] pub async fn generate_agentic_insight_handler( - http_request: HttpRequest, + _http_request: HttpRequest, claims: Claims, request: web::Json, - insight_generator: web::Data, - insight_dao: web::Data>>, + app_state: web::Data, ) -> impl Responder { - // Service tokens (sub: "service:apollo") fall through to user_id=1 - // — the operator convention. Mobile/web clients have a numeric sub. - let user_id = claims.sub.parse::().unwrap_or(1); - let parent_context = extract_context_from_request(&http_request); - let tracer = global_tracer(); - let mut span = tracer.start_with_context("http.insights.generate_agentic", &parent_context); - let normalized_path = normalize_path(&request.file_path); - - span.set_attribute(KeyValue::new("file_path", normalized_path.clone())); - if let Some(ref model) = request.model { - span.set_attribute(KeyValue::new("model", model.clone())); - } - if let Some(ref prompt) = request.system_prompt { - span.set_attribute(KeyValue::new("has_custom_prompt", true)); - span.set_attribute(KeyValue::new("prompt_length", prompt.len() as i64)); - } - if let Some(ctx) = request.num_ctx { - span.set_attribute(KeyValue::new("num_ctx", ctx as i64)); - } - - let max_iterations: usize = std::env::var("AGENTIC_MAX_ITERATIONS") - .ok() - .and_then(|v| v.parse().ok()) - .unwrap_or(12); - - span.set_attribute(KeyValue::new("max_iterations", max_iterations as i64)); + let library = app_state.primary_library(); + let gen_type = InsightGenerationType::Agentic; log::info!( - "Agentic insight generation triggered for photo: {} with model: {:?}, max_iterations: {}", + "Agentic insight generation triggered for photo: {} with model: {:?}", normalized_path, - request.model, - max_iterations + request.model ); - if let Some(ref b) = request.backend { - span.set_attribute(KeyValue::new("backend", b.clone())); + // Cancel any running job for this file, then create a fresh one + { + let mut dao = app_state + .insight_job_dao + .lock() + .expect("Unable to lock InsightJobDao"); + let _ = dao.cancel_active_job( + &opentelemetry::Context::new(), + library.id, + &normalized_path, + gen_type, + ); } - // Resolve few-shot ids: request-provided ids take precedence when - // non-empty; otherwise fall back to the hardcoded defaults. + let job_id = { + let mut dao = app_state + .insight_job_dao + .lock() + .expect("Unable to lock InsightJobDao"); + match dao.create_or_get_active_job( + &opentelemetry::Context::new(), + library.id, + &normalized_path, + gen_type, + ) { + Ok(id) => id, + Err(e) => { + log::error!("Failed to create agentic generation job: {:?}", e); + return HttpResponse::InternalServerError().json(serde_json::json!({ + "error": "Failed to create generation job" + })); + } + } + }; + + // Resolve few-shot ids for the background task let fewshot_ids: Vec = match request.fewshot_insight_ids.as_deref() { Some(ids) if !ids.is_empty() => ids.iter().take(2).copied().collect(), _ => DEFAULT_FEWSHOT_INSIGHT_IDS @@ -359,11 +530,14 @@ pub async fn generate_agentic_insight_handler( .copied() .collect(), }; - span.set_attribute(KeyValue::new("fewshot_count", fewshot_ids.len() as i64)); let fewshot_examples: Vec> = { let otel_context = opentelemetry::Context::new(); - let mut dao = insight_dao.lock().expect("Unable to lock InsightDao"); + let mut dao = app_state + .insight_chat + .insight_dao() + .lock() + .expect("Unable to lock InsightDao"); fewshot_ids .iter() .filter_map(|id| { @@ -384,90 +558,88 @@ pub async fn generate_agentic_insight_handler( .collect() }; + let user_id = claims.sub.parse::().unwrap_or(1); let persona_id = request .persona_id .clone() .filter(|s| !s.trim().is_empty()) .unwrap_or_else(|| "default".to_string()); - span.set_attribute(KeyValue::new("persona_id", persona_id.clone())); - let result = insight_generator - .generate_agentic_insight_for_photo( - &normalized_path, - request.model.clone(), - request.system_prompt.clone(), - request.num_ctx, - request.temperature, - request.top_p, - request.top_k, - request.min_p, - max_iterations, - request.backend.clone(), - fewshot_examples, - fewshot_ids, - user_id, - persona_id, + let max_iterations: usize = std::env::var("AGENTIC_MAX_ITERATIONS") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(12); + + // Spawn background task with timeout + let generator = app_state.insight_generator.clone(); + let job_dao = app_state.insight_job_dao.clone(); + let lib_id = library.id; + let path = normalized_path.clone(); + + tokio::spawn(async move { + let timeout_secs: u64 = std::env::var("INSIGHT_GENERATION_TIMEOUT_SECS") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(180); + + let result = tokio::time::timeout( + std::time::Duration::from_secs(timeout_secs), + generator.generate_agentic_insight_for_photo( + &path, + request.model.clone(), + request.system_prompt.clone(), + request.num_ctx, + request.temperature, + request.top_p, + request.top_k, + request.min_p, + max_iterations, + request.backend.clone(), + fewshot_examples, + fewshot_ids, + user_id, + persona_id, + ), ) .await; - match result { - Ok((prompt_eval_count, eval_count)) => { - span.set_status(Status::Ok); - // Fetch the stored insight to return it - let otel_context = opentelemetry::Context::new(); - let mut dao = insight_dao.lock().expect("Unable to lock InsightDao"); - match dao.get_insight(&otel_context, &normalized_path) { - Ok(Some(insight)) => { - let response = PhotoInsightResponse { - id: insight.id, - file_path: insight.file_path, - title: insight.title, - summary: insight.summary, - generated_at: insight.generated_at, - model_version: insight.model_version, - prompt_eval_count, - eval_count, - approved: insight.approved, - has_training_messages: insight.training_messages.is_some(), - backend: insight.backend, - }; - HttpResponse::Ok().json(response) - } - Ok(None) => HttpResponse::Ok().json(serde_json::json!({ - "success": true, - "message": "Agentic insight generated successfully" - })), - Err(e) => { - log::warn!("Insight stored but failed to retrieve: {:?}", e); - HttpResponse::Ok().json(serde_json::json!({ - "success": true, - "message": "Agentic insight generated successfully" - })) - } - } - } - Err(e) => { - let error_msg = format!("{:?}", e); - log::error!("Failed to generate agentic insight: {}", error_msg); - span.set_status(Status::error(error_msg.clone())); + let ctx = opentelemetry::Context::new(); + let mut dao = job_dao.lock().expect("Unable to lock InsightJobDao"); - if error_msg.contains("tool calling not supported") - || error_msg.contains("model not available") - { - HttpResponse::BadRequest().json(serde_json::json!({ - "error": format!("Failed to generate agentic insight: {}", error_msg) - })) - } else if error_msg.contains("error parsing tool call") { - HttpResponse::BadRequest().json(serde_json::json!({ - "error": "Model is not compatible with Ollama's tool calling protocol. Try a model known to support native tool calling (e.g. llama3.1, llama3.2, qwen2.5, mistral-nemo)." - })) - } else { - HttpResponse::InternalServerError().json(serde_json::json!({ - "error": format!("Failed to generate agentic insight: {}", error_msg) - })) + match result { + Ok(Ok(_)) => { + // Fetch the stored insight id to record on the job + let mut insight_dao = generator + .insight_dao() + .lock() + .expect("Unable to lock InsightDao"); + let insight_id = insight_dao + .get_insight(&ctx, &path) + .ok() + .flatten() + .map(|i| i.id); + if let Some(id) = insight_id { + let _ = dao.complete_job(&ctx, job_id, id); + } else { + let _ = dao.fail_job(&ctx, job_id, "generation returned no insight"); + } + } + Ok(Err(e)) => { + log::error!("Agentic insight generation failed for {}: {:?}", path, e); + let _ = dao.fail_job(&ctx, job_id, &format!("{:?}", e)); + } + Err(_) => { + log::error!( + "Agentic insight generation timed out for {} after {}s", + path, + timeout_secs + ); + let _ = dao.fail_job(&ctx, job_id, &format!("timeout after {}s", timeout_secs)); } } - } + }); + + HttpResponse::Ok().json(JobIdResponse { job_id }) } /// GET /insights/models - Local-backend models with capabilities. Returns diff --git a/src/ai/insight_chat.rs b/src/ai/insight_chat.rs index 9837733..36d099e 100644 --- a/src/ai/insight_chat.rs +++ b/src/ai/insight_chat.rs @@ -107,6 +107,11 @@ impl InsightChatService { } } + /// Accessor for the insight DAO (used by async job completion). + pub fn insight_dao(&self) -> &Arc>> { + &self.insight_dao + } + /// Load the rendered transcript for chat-UI display. Filters internal /// scaffolding (system message, tool turns, tool-dispatch-only assistant /// messages) and drops base64 images from user turns to keep payloads @@ -522,8 +527,17 @@ impl InsightChatService { } else { let cx = opentelemetry::Context::new(); let mut dao = self.insight_dao.lock().expect("Unable to lock InsightDao"); - dao.update_training_messages(&cx, req.library_id, &normalized, &json) + let rows = dao + .update_training_messages(&cx, req.library_id, &normalized, &json) .map_err(|e| anyhow!("failed to persist chat history: {:?}", e))?; + if rows == 0 { + log::warn!( + "update_training_messages updated 0 rows for {} (lib {}), \ + concurrent regenerate likely flipped is_current", + normalized, + req.library_id + ); + } } Ok(ChatTurnResult { @@ -590,8 +604,17 @@ impl InsightChatService { let cx = opentelemetry::Context::new(); let mut dao = self.insight_dao.lock().expect("Unable to lock InsightDao"); - dao.update_training_messages(&cx, library_id, &normalized, &json) + let rows = dao + .update_training_messages(&cx, library_id, &normalized, &json) .map_err(|e| anyhow!("failed to persist truncated history: {:?}", e))?; + if rows == 0 { + log::warn!( + "update_training_messages (rewind) updated 0 rows for {} (lib {}), \ + concurrent regenerate likely flipped is_current", + normalized, + library_id + ); + } Ok(()) } @@ -851,8 +874,17 @@ impl InsightChatService { } else { let cx = opentelemetry::Context::new(); let mut dao = self.insight_dao.lock().expect("Unable to lock InsightDao"); - dao.update_training_messages(&cx, req.library_id, &normalized, &json) + let rows = dao + .update_training_messages(&cx, req.library_id, &normalized, &json) .map_err(|e| anyhow!("failed to persist chat history: {:?}", e))?; + if rows == 0 { + log::warn!( + "update_training_messages (stream) updated 0 rows for {} (lib {}), \ + concurrent regenerate likely flipped is_current", + normalized, + req.library_id + ); + } } let _ = tx diff --git a/src/ai/insight_generator.rs b/src/ai/insight_generator.rs index 8e39c59..f2620e7 100644 --- a/src/ai/insight_generator.rs +++ b/src/ai/insight_generator.rs @@ -196,6 +196,12 @@ impl InsightGenerator { } } + /// Accessor for the insight DAO (used by async job completion to + /// look up the stored insight id). + pub fn insight_dao(&self) -> &Arc>> { + &self.insight_dao + } + /// Whether the optional Apollo Places integration is wired up. Drives /// tool-definition gating (no point offering `get_personal_place_at` /// when Apollo is unreachable) — exposed publicly so `insight_chat` diff --git a/src/ai/mod.rs b/src/ai/mod.rs index c991c71..672c672 100644 --- a/src/ai/mod.rs +++ b/src/ai/mod.rs @@ -21,8 +21,9 @@ pub use daily_summary_job::{ pub use handlers::{ chat_history_handler, chat_rewind_handler, chat_stream_handler, chat_turn_handler, delete_insight_handler, export_training_data_handler, generate_agentic_insight_handler, - generate_insight_handler, get_all_insights_handler, get_available_models_handler, - get_insight_handler, get_openrouter_models_handler, rate_insight_handler, + generate_insight_handler, generation_status_handler, get_all_insights_handler, + get_available_models_handler, get_insight_handler, get_openrouter_models_handler, + rate_insight_handler, }; pub use insight_generator::InsightGenerator; pub use llamacpp::LlamaCppClient; diff --git a/src/database/insight_generation_job_dao.rs b/src/database/insight_generation_job_dao.rs new file mode 100644 index 0000000..9d848cc --- /dev/null +++ b/src/database/insight_generation_job_dao.rs @@ -0,0 +1,503 @@ +use diesel::prelude::*; +use diesel::sqlite::SqliteConnection; +use std::ops::DerefMut; +use std::sync::{Arc, Mutex}; + +use crate::database::models::{ + InsertInsightGenerationJob, InsightGenerationJob, InsightGenerationType, InsightJobStatus, +}; +use crate::database::schema; +use crate::database::{DbError, DbErrorKind, connect}; +use crate::otel::trace_db_call; + +/// Tracks async insight generation jobs. The idempotent insert ensures +/// concurrent callers for the same (library_id, file_path, generation_type) +/// get the same job_id rather than creating duplicates. +pub trait InsightGenerationJobDao: Sync + Send { + /// Insert a new job or return the existing running job for the same key. + /// Returns the job_id either way. + fn create_or_get_active_job( + &mut self, + context: &opentelemetry::Context, + library_id: i32, + file_path: &str, + generation_type: InsightGenerationType, + ) -> Result; + + /// Mark a job as completed with the resulting insight id. + fn complete_job( + &mut self, + context: &opentelemetry::Context, + job_id: i32, + insight_id: i32, + ) -> Result<(), DbError>; + + /// Mark a job as failed with an error message. + fn fail_job( + &mut self, + context: &opentelemetry::Context, + job_id: i32, + error_message: &str, + ) -> Result<(), DbError>; + + /// Mark the active running job for a file as "cancelled". Returns true if + /// a job was found and cancelled, false if no running job existed. + fn cancel_active_job( + &mut self, + context: &opentelemetry::Context, + library_id: i32, + file_path: &str, + generation_type: InsightGenerationType, + ) -> Result; + + /// Find the latest running job for a given file. Returns None if no + /// running job exists. + fn get_active_job( + &mut self, + context: &opentelemetry::Context, + library_id: i32, + file_path: &str, + ) -> Result, DbError>; + + /// Find any job by id regardless of status. + fn get_job_by_id( + &mut self, + context: &opentelemetry::Context, + job_id: i32, + ) -> Result, DbError>; +} + +pub struct SqliteInsightGenerationJobDao { + connection: Arc>, +} + +impl Default for SqliteInsightGenerationJobDao { + fn default() -> Self { + Self::new() + } +} + +impl SqliteInsightGenerationJobDao { + pub fn new() -> Self { + Self { + connection: Arc::new(Mutex::new(connect())), + } + } + + #[cfg(test)] + pub fn from_connection(conn: Arc>) -> Self { + Self { connection: conn } + } +} + +impl InsightGenerationJobDao for SqliteInsightGenerationJobDao { + fn create_or_get_active_job( + &mut self, + context: &opentelemetry::Context, + library_id: i32, + file_path: &str, + generation_type: InsightGenerationType, + ) -> Result { + trace_db_call(context, "insert", "create_or_get_active_job", |_span| { + use schema::insight_generation_jobs::dsl; + + let mut connection = self + .connection + .lock() + .expect("Unable to lock InsightGenerationJobDao"); + + // Check for existing running job + let existing = dsl::insight_generation_jobs + .filter( + dsl::library_id + .eq(library_id) + .and(dsl::file_path.eq(file_path)) + .and(dsl::generation_type.eq(generation_type.as_str())) + .and(dsl::status.eq(InsightJobStatus::Running.as_str())), + ) + .select(dsl::id) + .first::(connection.deref_mut()) + .optional(); + + match existing { + Ok(Some(job_id)) => return Ok(job_id), + Ok(None) => {} + Err(e) => return Err(anyhow::anyhow!("Failed to check existing job: {}", e)), + } + + // No running job exists, insert new one (upsert on conflict) + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("Time went backwards") + .as_secs() as i64; + + let new_job = InsertInsightGenerationJob { + library_id, + path: file_path.to_string(), + gen_type: generation_type.to_string(), + status: InsightJobStatus::Running.to_string(), + started_at: now, + }; + + diesel::insert_into(dsl::insight_generation_jobs) + .values(&new_job) + .on_conflict((dsl::library_id, dsl::file_path, dsl::generation_type)) + .do_update() + .set(( + dsl::status.eq(InsightJobStatus::Running.as_str()), + dsl::started_at.eq(now), + )) + .execute(connection.deref_mut()) + .map_err(|e| anyhow::anyhow!("Failed to insert job: {}", e))?; + + // Get the job id + dsl::insight_generation_jobs + .filter( + dsl::library_id + .eq(library_id) + .and(dsl::file_path.eq(file_path)) + .and(dsl::generation_type.eq(generation_type.as_str())), + ) + .select(dsl::id) + .order(dsl::id.desc()) + .first::(connection.deref_mut()) + .map_err(|e| anyhow::anyhow!("Failed to get job id: {}", e)) + }) + .map_err(|_| DbError::new(DbErrorKind::QueryError)) + } + + fn complete_job( + &mut self, + context: &opentelemetry::Context, + job_id: i32, + insight_id: i32, + ) -> Result<(), DbError> { + trace_db_call(context, "update", "complete_job", |_span| { + use schema::insight_generation_jobs::dsl; + + let mut connection = self + .connection + .lock() + .expect("Unable to lock InsightGenerationJobDao"); + + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("Time went backwards") + .as_secs() as i64; + + diesel::update(dsl::insight_generation_jobs.filter(dsl::id.eq(job_id))) + .set(( + dsl::status.eq(InsightJobStatus::Completed.as_str()), + dsl::completed_at.eq(Some(now)), + dsl::result_insight_id.eq(Some(insight_id)), + )) + .execute(connection.deref_mut()) + .map(|_| ()) + .map_err(|e| anyhow::anyhow!("Failed to complete job: {}", e)) + }) + .map_err(|_| DbError::new(DbErrorKind::UpdateError)) + } + + fn fail_job( + &mut self, + context: &opentelemetry::Context, + job_id: i32, + error_message: &str, + ) -> Result<(), DbError> { + trace_db_call(context, "update", "fail_job", |_span| { + use schema::insight_generation_jobs::dsl; + + let mut connection = self + .connection + .lock() + .expect("Unable to lock InsightGenerationJobDao"); + + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("Time went backwards") + .as_secs() as i64; + + diesel::update(dsl::insight_generation_jobs.filter(dsl::id.eq(job_id))) + .set(( + dsl::status.eq(InsightJobStatus::Failed.as_str()), + dsl::completed_at.eq(Some(now)), + dsl::error_message.eq(Some(error_message.to_string())), + )) + .execute(connection.deref_mut()) + .map(|_| ()) + .map_err(|e| anyhow::anyhow!("Failed to fail job: {}", e)) + }) + .map_err(|_| DbError::new(DbErrorKind::UpdateError)) + } + + fn cancel_active_job( + &mut self, + context: &opentelemetry::Context, + library_id: i32, + file_path: &str, + generation_type: InsightGenerationType, + ) -> Result { + trace_db_call(context, "update", "cancel_active_job", |_span| { + use schema::insight_generation_jobs::dsl; + + let mut connection = self + .connection + .lock() + .expect("Unable to lock InsightGenerationJobDao"); + + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("Time went backwards") + .as_secs() as i64; + + let rows = diesel::update( + dsl::insight_generation_jobs.filter( + dsl::library_id + .eq(library_id) + .and(dsl::file_path.eq(file_path)) + .and(dsl::generation_type.eq(generation_type.as_str())) + .and(dsl::status.eq(InsightJobStatus::Running.as_str())), + ), + ) + .set(( + dsl::status.eq(InsightJobStatus::Cancelled.as_str()), + dsl::completed_at.eq(Some(now)), + dsl::error_message.eq(Some("cancelled by newer request".to_string())), + )) + .execute(connection.deref_mut()) + .map_err(|e| anyhow::anyhow!("Failed to cancel job: {}", e))?; + + Ok(rows > 0) + }) + .map_err(|_| DbError::new(DbErrorKind::UpdateError)) + } + + fn get_active_job( + &mut self, + context: &opentelemetry::Context, + library_id: i32, + file_path: &str, + ) -> Result, DbError> { + trace_db_call(context, "query", "get_active_job", |_span| { + use schema::insight_generation_jobs::dsl; + + let mut connection = self + .connection + .lock() + .expect("Unable to lock InsightGenerationJobDao"); + + dsl::insight_generation_jobs + .filter( + dsl::library_id + .eq(library_id) + .and(dsl::file_path.eq(file_path)) + .and(dsl::status.eq(InsightJobStatus::Running.as_str())), + ) + .order(dsl::id.desc()) + .first::(connection.deref_mut()) + .optional() + .map_err(|e| anyhow::anyhow!("Failed to get active job: {}", e)) + }) + .map_err(|_| DbError::new(DbErrorKind::QueryError)) + } + + fn get_job_by_id( + &mut self, + context: &opentelemetry::Context, + job_id: i32, + ) -> Result, DbError> { + trace_db_call(context, "query", "get_job_by_id", |_span| { + use schema::insight_generation_jobs::dsl; + + let mut connection = self + .connection + .lock() + .expect("Unable to lock InsightGenerationJobDao"); + + dsl::insight_generation_jobs + .filter(dsl::id.eq(job_id)) + .first::(connection.deref_mut()) + .optional() + .map_err(|e| anyhow::anyhow!("Failed to get job: {}", e)) + }) + .map_err(|_| DbError::new(DbErrorKind::QueryError)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use diesel::Connection; + use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations}; + + const DB_MIGRATIONS: EmbeddedMigrations = embed_migrations!(); + + fn setup_dao() -> SqliteInsightGenerationJobDao { + let mut conn = SqliteConnection::establish(":memory:") + .expect("Unable to create in-memory db connection"); + conn.run_pending_migrations(DB_MIGRATIONS) + .expect("Failure running DB migrations"); + SqliteInsightGenerationJobDao::from_connection(Arc::new(Mutex::new(conn))) + } + + fn ctx() -> opentelemetry::Context { + opentelemetry::Context::new() + } + + #[test] + fn create_job_idempotent() { + let mut dao = setup_dao(); + let ctx = ctx(); + + let job_id_1 = dao + .create_or_get_active_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard) + .unwrap(); + + let job_id_2 = dao + .create_or_get_active_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard) + .unwrap(); + + assert_eq!( + job_id_1, job_id_2, + "idempotent insert should return same job_id" + ); + } + + #[test] + fn complete_job_sets_result() { + let mut dao = setup_dao(); + let ctx = ctx(); + + let job_id = dao + .create_or_get_active_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard) + .unwrap(); + + dao.complete_job(&ctx, job_id, 42).unwrap(); + + let job = dao.get_job_by_id(&ctx, job_id).unwrap().unwrap(); + assert_eq!(job.status, InsightJobStatus::Completed.as_str()); + assert_eq!(job.result_insight_id, Some(42)); + assert!(job.completed_at.is_some()); + } + + #[test] + fn fail_job_sets_error() { + let mut dao = setup_dao(); + let ctx = ctx(); + + let job_id = dao + .create_or_get_active_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Agentic) + .unwrap(); + + dao.fail_job(&ctx, job_id, "model timeout").unwrap(); + + let job = dao.get_job_by_id(&ctx, job_id).unwrap().unwrap(); + assert_eq!(job.status, InsightJobStatus::Failed.as_str()); + assert_eq!(job.error_message.as_deref(), Some("model timeout")); + assert!(job.completed_at.is_some()); + } + + #[test] + fn get_active_job_returns_none_when_completed() { + let mut dao = setup_dao(); + let ctx = ctx(); + + let job_id = dao + .create_or_get_active_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard) + .unwrap(); + + // Job is running + let active = dao.get_active_job(&ctx, 1, "photos/test.jpg").unwrap(); + assert!(active.is_some()); + assert_eq!(active.unwrap().id, job_id); + + // Complete it + dao.complete_job(&ctx, job_id, 1).unwrap(); + + // No longer active + let active = dao.get_active_job(&ctx, 1, "photos/test.jpg").unwrap(); + assert!(active.is_none()); + } + + #[test] + fn cancel_active_job() { + let mut dao = setup_dao(); + let ctx = ctx(); + + let job_id = dao + .create_or_get_active_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard) + .unwrap(); + + let cancelled = dao + .cancel_active_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard) + .unwrap(); + assert!(cancelled, "should cancel existing running job"); + + // Job is no longer active + let active = dao.get_active_job(&ctx, 1, "photos/test.jpg").unwrap(); + assert!(active.is_none()); + + // Job exists with cancelled status + let job = dao.get_job_by_id(&ctx, job_id).unwrap().unwrap(); + assert_eq!(job.status, InsightJobStatus::Cancelled.as_str()); + + // Cancelling again returns false (nothing to cancel) + let cancelled2 = dao + .cancel_active_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard) + .unwrap(); + assert!(!cancelled2, "should return false when no running job"); + } + + #[test] + fn get_active_job_scoped_by_library() { + let mut dao = setup_dao(); + let ctx = ctx(); + + let job_id_1 = dao + .create_or_get_active_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard) + .unwrap(); + + let job_id_2 = dao + .create_or_get_active_job(&ctx, 2, "photos/test.jpg", InsightGenerationType::Standard) + .unwrap(); + + assert_ne!( + job_id_1, job_id_2, + "different libraries should have separate jobs" + ); + + // Complete lib1's job + dao.complete_job(&ctx, job_id_1, 1).unwrap(); + + // lib1 has no active job + let active1 = dao.get_active_job(&ctx, 1, "photos/test.jpg").unwrap(); + assert!(active1.is_none()); + + // lib2 still has active job + let active2 = dao.get_active_job(&ctx, 2, "photos/test.jpg").unwrap(); + assert!(active2.is_some()); + assert_eq!(active2.unwrap().id, job_id_2); + } + + #[test] + fn get_job_by_id_finds_any_status() { + let mut dao = setup_dao(); + let ctx = ctx(); + + let job_id = dao + .create_or_get_active_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard) + .unwrap(); + + // Find while running + let job = dao.get_job_by_id(&ctx, job_id).unwrap().unwrap(); + assert_eq!(job.status, InsightJobStatus::Running.as_str()); + + // Complete it + dao.complete_job(&ctx, job_id, 99).unwrap(); + + // Still findable + let job = dao.get_job_by_id(&ctx, job_id).unwrap().unwrap(); + assert_eq!(job.status, InsightJobStatus::Completed.as_str()); + assert_eq!(job.result_insight_id, Some(99)); + } +} diff --git a/src/database/insights_dao.rs b/src/database/insights_dao.rs index 86c51aa..2ca214c 100644 --- a/src/database/insights_dao.rs +++ b/src/database/insights_dao.rs @@ -90,13 +90,15 @@ pub trait InsightDao: Sync + Send { /// Replace the `training_messages` JSON blob on the current row for /// `(library_id, rel_path)`. Used by chat-turn append mode to persist /// the extended conversation without inserting a new insight version. + /// Returns the number of rows affected (0 if no current row matched, + /// indicating a concurrent regenerate/reconcile flipped `is_current`). fn update_training_messages( &mut self, context: &opentelemetry::Context, library_id: i32, file_path: &str, training_messages_json: &str, - ) -> Result<(), DbError>; + ) -> Result; } pub struct SqliteInsightDao { @@ -372,7 +374,7 @@ impl InsightDao for SqliteInsightDao { lib_id: i32, path: &str, training_messages_json: &str, - ) -> Result<(), DbError> { + ) -> Result { trace_db_call(context, "update", "update_training_messages", |_span| { use schema::photo_insights::dsl::*; @@ -386,7 +388,6 @@ impl InsightDao for SqliteInsightDao { ) .set(training_messages.eq(Some(training_messages_json.to_string()))) .execute(connection.deref_mut()) - .map(|_| ()) .map_err(|_| anyhow::anyhow!("Update error")) }) .map_err(|_| DbError::new(DbErrorKind::UpdateError)) diff --git a/src/database/mod.rs b/src/database/mod.rs index 4488a00..5aa160d 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -45,6 +45,7 @@ pub struct DuplicateRow { pub mod calendar_dao; pub mod daily_summary_dao; +pub mod insight_generation_job_dao; pub mod insights_dao; pub mod knowledge_dao; pub mod location_dao; @@ -57,6 +58,7 @@ pub mod search_dao; pub use calendar_dao::{CalendarEventDao, SqliteCalendarEventDao}; pub use daily_summary_dao::{DailySummaryDao, InsertDailySummary, SqliteDailySummaryDao}; +pub use insight_generation_job_dao::{InsightGenerationJobDao, SqliteInsightGenerationJobDao}; pub use insights_dao::{InsightDao, SqliteInsightDao}; pub use knowledge_dao::{ ConsolidationGroup, EntityFilter, EntityGraph, EntityPatch, EntitySort, FactFilter, FactPatch, diff --git a/src/database/models.rs b/src/database/models.rs index 9d005f5..0c52781 100644 --- a/src/database/models.rs +++ b/src/database/models.rs @@ -1,9 +1,81 @@ use crate::database::schema::{ - entities, entity_facts, entity_photo_links, favorites, image_exif, libraries, personas, - photo_insights, users, video_preview_clips, + entities, entity_facts, entity_photo_links, favorites, image_exif, insight_generation_jobs, + libraries, personas, photo_insights, users, video_preview_clips, }; use serde::Serialize; +/// Possible statuses for an insight generation job. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, FromSqlRow)] +#[serde(rename_all = "snake_case")] +pub enum InsightJobStatus { + Running, + Completed, + Failed, + Cancelled, +} + +impl InsightJobStatus { + pub fn as_str(&self) -> &'static str { + match self { + Self::Running => "running", + Self::Completed => "completed", + Self::Failed => "failed", + Self::Cancelled => "cancelled", + } + } +} + +impl ToString for InsightJobStatus { + fn to_string(&self) -> String { + self.as_str().to_string() + } +} + +impl InsightJobStatus { + pub fn from_str(s: &str) -> Self { + match s { + "running" => Self::Running, + "completed" => Self::Completed, + "failed" => Self::Failed, + "cancelled" => Self::Cancelled, + _ => Self::Failed, + } + } +} + +/// Type of insight generation (standard vs agentic). +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum InsightGenerationType { + Standard, + Agentic, +} + +impl InsightGenerationType { + pub fn as_str(&self) -> &'static str { + match self { + Self::Standard => "standard", + Self::Agentic => "agentic", + } + } +} + +impl ToString for InsightGenerationType { + fn to_string(&self) -> String { + self.as_str().to_string() + } +} + +impl InsightGenerationType { + pub fn from_str(s: &str) -> Self { + match s { + "standard" => Self::Standard, + "agentic" => Self::Agentic, + _ => Self::Standard, + } + } +} + #[derive(Insertable)] #[diesel(table_name = users)] pub struct InsertUser<'a> { @@ -394,3 +466,30 @@ pub struct VideoPreviewClip { pub created_at: String, pub updated_at: String, } + +#[derive(Insertable)] +#[diesel(table_name = insight_generation_jobs)] +pub struct InsertInsightGenerationJob { + pub library_id: i32, + #[diesel(column_name = file_path)] + pub path: String, + #[diesel(column_name = generation_type)] + pub gen_type: String, + pub status: String, + pub started_at: i64, +} + +#[derive(Queryable, Serialize, Clone, Debug)] +pub struct InsightGenerationJob { + pub id: i32, + pub library_id: i32, + #[diesel(column_name = file_path)] + pub path: String, + #[diesel(column_name = generation_type)] + pub gen_type: String, + pub status: String, + pub started_at: i64, + pub completed_at: Option, + pub result_insight_id: Option, + pub error_message: Option, +} diff --git a/src/database/schema.rs b/src/database/schema.rs index 28b5d26..61f4bf6 100644 --- a/src/database/schema.rs +++ b/src/database/schema.rs @@ -271,12 +271,27 @@ diesel::table! { } } +diesel::table! { + insight_generation_jobs (id) { + id -> Integer, + library_id -> Integer, + file_path -> Text, + generation_type -> Text, + status -> Text, + started_at -> BigInt, + completed_at -> Nullable, + result_insight_id -> Nullable, + error_message -> Nullable, + } +} + diesel::joinable!(entity_facts -> photo_insights (source_insight_id)); diesel::joinable!(entity_photo_links -> entities (entity_id)); diesel::joinable!(entity_photo_links -> libraries (library_id)); diesel::joinable!(face_detections -> libraries (library_id)); diesel::joinable!(face_detections -> persons (person_id)); diesel::joinable!(image_exif -> libraries (library_id)); +diesel::joinable!(insight_generation_jobs -> libraries (library_id)); diesel::joinable!(personas -> users (user_id)); diesel::joinable!(persons -> entities (entity_id)); diesel::joinable!(photo_insights -> libraries (library_id)); @@ -292,6 +307,7 @@ diesel::allow_tables_to_appear_in_same_query!( face_detections, favorites, image_exif, + insight_generation_jobs, libraries, location_history, personas, diff --git a/src/main.rs b/src/main.rs index 63013ce..767e336 100644 --- a/src/main.rs +++ b/src/main.rs @@ -308,6 +308,7 @@ fn main() -> std::io::Result<()> { .service(memories::list_memories) .service(ai::generate_insight_handler) .service(ai::generate_agentic_insight_handler) + .service(ai::generation_status_handler) .service(ai::get_insight_handler) .service(ai::delete_insight_handler) .service(ai::get_all_insights_handler) diff --git a/src/state.rs b/src/state.rs index 8cfccbb..329d15f 100644 --- a/src/state.rs +++ b/src/state.rs @@ -6,10 +6,10 @@ use crate::ai::llamacpp::LlamaCppClient; use crate::ai::openrouter::OpenRouterClient; use crate::ai::{InsightGenerator, OllamaClient, SmsApiClient}; use crate::database::{ - CalendarEventDao, DailySummaryDao, ExifDao, InsightDao, KnowledgeDao, LocationHistoryDao, - SearchHistoryDao, SqliteCalendarEventDao, SqliteDailySummaryDao, SqliteExifDao, - SqliteInsightDao, SqliteKnowledgeDao, SqliteLocationHistoryDao, SqliteSearchHistoryDao, - connect, + CalendarEventDao, DailySummaryDao, ExifDao, InsightDao, InsightGenerationJobDao, KnowledgeDao, + LocationHistoryDao, SearchHistoryDao, SqliteCalendarEventDao, SqliteDailySummaryDao, + SqliteExifDao, SqliteInsightDao, SqliteInsightGenerationJobDao, SqliteKnowledgeDao, + SqliteLocationHistoryDao, SqliteSearchHistoryDao, connect, }; use crate::database::{PreviewDao, SqlitePreviewDao}; use crate::faces; @@ -86,6 +86,8 @@ pub struct AppState { /// Same disabled semantics as `face_client`: unset env → no-op /// backlog drain, /photos/search returns an empty result. pub clip_client: ClipClient, + /// Tracks async insight generation jobs (spawned by generate endpoints). + pub insight_job_dao: Arc>>, } impl AppState { @@ -124,6 +126,7 @@ impl AppState { preview_dao: Arc>>, face_client: FaceClient, clip_client: ClipClient, + insight_job_dao: Arc>>, ) -> Self { assert!( !libraries_vec.is_empty(), @@ -165,6 +168,7 @@ impl AppState { insight_chat, face_client, clip_client, + insight_job_dao, } } @@ -253,6 +257,10 @@ impl Default for AppState { let face_dao: Arc>> = Arc::new(Mutex::new(Box::new(faces::SqliteFaceDao::new()))); + // Initialize insight generation job DAO (async generation tracking) + let insight_job_dao: Arc>> = + Arc::new(Mutex::new(Box::new(SqliteInsightGenerationJobDao::new()))); + // Load base path and ensure the primary library row reflects it. let base_path = env::var("BASE_PATH").expect("BASE_PATH was not set in the env"); let mut seed_conn = connect(); @@ -319,6 +327,7 @@ impl Default for AppState { preview_dao, face_client, clip_client, + insight_job_dao, ) } } @@ -389,6 +398,7 @@ fn parse_llamacpp_allowed_models() -> Vec { impl AppState { /// Creates an AppState instance for testing with temporary directories pub fn test_state() -> Self { + use crate::database::insight_generation_job_dao::SqliteInsightGenerationJobDao; use actix::Actor; // Create a base temporary directory let temp_dir = tempfile::tempdir().expect("Failed to create temp directory"); @@ -502,6 +512,7 @@ impl AppState { preview_dao, FaceClient::new(None), // disabled in test ClipClient::new(None), // disabled in test + Arc::new(Mutex::new(Box::new(SqliteInsightGenerationJobDao::new()))), // placeholder for test ) } }