diff --git a/migrations/2026-05-27-000000_add_insight_generation_jobs/up.sql b/migrations/2026-05-27-000000_add_insight_generation_jobs/up.sql index a20c195..1ad6aab 100644 --- a/migrations/2026-05-27-000000_add_insight_generation_jobs/up.sql +++ b/migrations/2026-05-27-000000_add_insight_generation_jobs/up.sql @@ -1,8 +1,7 @@ -- Track async insight generation jobs so the client can poll for --- completion after the server returns 202 Accepted. The UNIQUE --- constraint on (library_id, file_path, generation_type) ensures --- idempotent inserts: if a running job already exists, the caller --- should return that job_id instead of creating a duplicate. +-- completion after the server returns 202 Accepted. Each generation +-- creates a new row; the application layer cancels prior running +-- jobs before inserting. CREATE TABLE insight_generation_jobs ( id INTEGER PRIMARY KEY AUTOINCREMENT, library_id INTEGER NOT NULL DEFAULT 1, @@ -12,8 +11,7 @@ CREATE TABLE insight_generation_jobs ( started_at INTEGER NOT NULL, completed_at INTEGER, result_insight_id INTEGER, - error_message TEXT, - UNIQUE(library_id, file_path, generation_type) + error_message TEXT ); -- For the status endpoint: fast lookup by (library_id, file_path) diff --git a/migrations/2026-05-27-000001_remove_insight_jobs_unique/down.sql b/migrations/2026-05-27-000001_remove_insight_jobs_unique/down.sql new file mode 100644 index 0000000..7a5cf40 --- /dev/null +++ b/migrations/2026-05-27-000001_remove_insight_jobs_unique/down.sql @@ -0,0 +1,28 @@ +-- Restore UNIQUE constraint + +CREATE TABLE insight_generation_jobs_new ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + library_id INTEGER NOT NULL DEFAULT 1, + file_path TEXT NOT NULL, + generation_type TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'running', + started_at INTEGER NOT NULL, + completed_at INTEGER, + result_insight_id INTEGER, + error_message TEXT, + UNIQUE(library_id, file_path, generation_type) +); + +INSERT INTO insight_generation_jobs_new + SELECT id, library_id, file_path, generation_type, status, started_at, completed_at, result_insight_id, error_message + FROM insight_generation_jobs; + +DROP TABLE insight_generation_jobs; + +ALTER TABLE insight_generation_jobs_new RENAME TO insight_generation_jobs; + +CREATE INDEX idx_insight_gen_jobs_file + ON insight_generation_jobs(library_id, file_path); + +CREATE INDEX idx_insight_gen_jobs_status_cleanup + ON insight_generation_jobs(status, started_at); diff --git a/migrations/2026-05-27-000001_remove_insight_jobs_unique/up.sql b/migrations/2026-05-27-000001_remove_insight_jobs_unique/up.sql new file mode 100644 index 0000000..939c592 --- /dev/null +++ b/migrations/2026-05-27-000001_remove_insight_jobs_unique/up.sql @@ -0,0 +1,30 @@ +-- Remove UNIQUE(library_id, file_path, generation_type) constraint to allow +-- multiple job rows per file. This enables proper cancel/regenerate semantics: +-- a new job is always inserted on regenerate, and the old job is cancelled +-- independently. The application layer prevents concurrent running jobs. + +CREATE TABLE insight_generation_jobs_new ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + library_id INTEGER NOT NULL DEFAULT 1, + file_path TEXT NOT NULL, + generation_type TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'running', + started_at INTEGER NOT NULL, + completed_at INTEGER, + result_insight_id INTEGER, + error_message TEXT +); + +INSERT INTO insight_generation_jobs_new + SELECT id, library_id, file_path, generation_type, status, started_at, completed_at, result_insight_id, error_message + FROM insight_generation_jobs; + +DROP TABLE insight_generation_jobs; + +ALTER TABLE insight_generation_jobs_new RENAME TO insight_generation_jobs; + +CREATE INDEX idx_insight_gen_jobs_file + ON insight_generation_jobs(library_id, file_path); + +CREATE INDEX idx_insight_gen_jobs_status_cleanup + ON insight_generation_jobs(status, started_at); diff --git a/migrations/2026-05-27-000002_add_insight_generation_params/down.sql b/migrations/2026-05-27-000002_add_insight_generation_params/down.sql new file mode 100644 index 0000000..217d330 --- /dev/null +++ b/migrations/2026-05-27-000002_add_insight_generation_params/down.sql @@ -0,0 +1,11 @@ +-- SQLite doesn't support DROP COLUMN before 3.35.0; recreate the table +-- without the new columns. This is only needed for rollback. +CREATE TABLE photo_insights_old AS + SELECT id, library_id, rel_path, title, summary, generated_at, + model_version, is_current, training_messages, approved, + backend, fewshot_source_ids, content_hash + FROM photo_insights; + +DROP TABLE photo_insights; + +ALTER TABLE photo_insights_old RENAME TO photo_insights; diff --git a/migrations/2026-05-27-000002_add_insight_generation_params/up.sql b/migrations/2026-05-27-000002_add_insight_generation_params/up.sql new file mode 100644 index 0000000..1313fde --- /dev/null +++ b/migrations/2026-05-27-000002_add_insight_generation_params/up.sql @@ -0,0 +1,8 @@ +-- Persist generation parameters on each insight row for auditing. +ALTER TABLE photo_insights ADD COLUMN num_ctx INTEGER; +ALTER TABLE photo_insights ADD COLUMN temperature REAL; +ALTER TABLE photo_insights ADD COLUMN top_p REAL; +ALTER TABLE photo_insights ADD COLUMN top_k INTEGER; +ALTER TABLE photo_insights ADD COLUMN min_p REAL; +ALTER TABLE photo_insights ADD COLUMN system_prompt TEXT; +ALTER TABLE photo_insights ADD COLUMN persona_id TEXT; diff --git a/src/ai/handlers.rs b/src/ai/handlers.rs index 5773483..57f9df6 100644 --- a/src/ai/handlers.rs +++ b/src/ai/handlers.rs @@ -73,13 +73,13 @@ pub struct GenerationStatusQuery { /// If provided with `library`, look up the latest running job for this /// file. Used when the client doesn't have a persisted job_id. #[serde(default)] - pub file_path: Option, + pub path: Option, #[serde(default)] pub library: Option, } /// GET /insights/generation/status - Check status of a generation job. -/// Accepts either `?job_id=` or `?file_path=&library=`. +/// Accepts either `?job_id=` or `?path=&library=`. #[get("/insights/generation/status")] pub async fn generation_status_handler( _claims: Claims, @@ -118,7 +118,7 @@ pub async fn generation_status_handler( } } - if let Some(ref fp) = query.file_path { + if let Some(ref fp) = query.path { let library = libraries::resolve_library_param(&app_state, query.library.as_deref()) .ok() .flatten() @@ -156,7 +156,115 @@ pub async fn generation_status_handler( } HttpResponse::BadRequest().json(serde_json::json!({ - "error": "Provide either job_id or file_path query parameter" + "error": "Provide either job_id or path query parameter" + })) +} + +#[derive(Debug, Deserialize)] +pub struct CancelGenerationRequest { + /// If provided, cancel the specific job by id. + #[serde(default)] + pub job_id: Option, + /// If provided with `library`, cancel all running jobs for this file. + #[serde(default)] + pub file_path: Option, + #[serde(default)] + pub library: Option, +} + +/// POST /insights/generation/cancel - Cancel a running generation job. +/// Accepts either `job_id` or `file_path` + optional `library` in the body. +#[post("/insights/generation/cancel")] +pub async fn cancel_generation_handler( + _claims: Claims, + request: web::Json, + app_state: web::Data, +) -> impl Responder { + let ctx = opentelemetry::Context::new(); + + if let Some(jid) = request.job_id { + let mut dao = app_state + .insight_job_dao + .lock() + .expect("Unable to lock InsightJobDao"); + match dao.cancel_job(&ctx, jid) { + Ok(true) => { + let mut handles = app_state + .insight_job_handles + .lock() + .expect("Unable to lock InsightJobHandles"); + if let Some(handle) = handles.remove(&jid) { + handle.abort(); + } + return HttpResponse::Ok().json(serde_json::json!({ + "success": true, + "message": format!("Job {} cancelled", jid) + })); + } + Ok(false) => { + return HttpResponse::Ok().json(serde_json::json!({ + "success": true, + "message": format!("Job {} was not running", jid) + })); + } + Err(e) => { + log::error!("Failed to cancel job {}: {:?}", jid, e); + return HttpResponse::InternalServerError().json(serde_json::json!({ + "error": "Failed to cancel job" + })); + } + } + } + + if let Some(ref fp) = request.file_path { + let library = libraries::resolve_library_param(&app_state, request.library.as_deref()) + .ok() + .flatten() + .unwrap_or_else(|| app_state.primary_library()); + let normalized = normalize_path(fp); + + // Get active job ids first, then cancel in DB, then abort tasks + let active_ids: Vec = { + let mut dao = app_state + .insight_job_dao + .lock() + .expect("Unable to lock InsightJobDao"); + let ids = dao + .get_active_job(&ctx, library.id, &normalized) + .ok() + .flatten() + .map(|j| vec![j.id]) + .unwrap_or_default(); + let _ = dao.cancel_active_jobs(&ctx, library.id, &normalized); + ids + }; + + if active_ids.is_empty() { + return HttpResponse::Ok().json(serde_json::json!({ + "success": true, + "message": "No running generation job for this file" + })); + } + + for jid in &active_ids { + if let Some(handle) = app_state + .insight_job_handles + .lock() + .expect("Unable to lock InsightJobHandles") + .remove(jid) + { + handle.abort(); + } + } + + return HttpResponse::Ok().json(serde_json::json!({ + "success": true, + "message": format!("Cancelled {} running job(s) for {}", active_ids.len(), normalized) + })); + } + + HttpResponse::BadRequest().json(serde_json::json!({ + "error": "Provide either job_id or file_path in the request body" })) } @@ -208,6 +316,20 @@ pub struct PhotoInsightResponse { /// True when the insight was generated agentically and a chat /// continuation can be started against it. Drives the mobile chat button. pub has_training_messages: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub num_ctx: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub temperature: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub top_p: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub top_k: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub min_p: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub system_prompt: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub persona_id: Option, } #[derive(Debug, Serialize)] @@ -227,33 +349,55 @@ pub struct ServerModels { /// POST /insights/generate - Generate insight for a specific photo (async) #[post("/insights/generate")] pub async fn generate_insight_handler( - _http_request: HttpRequest, + http_request: HttpRequest, _claims: Claims, request: web::Json, app_state: web::Data, ) -> impl Responder { + let parent_context = extract_context_from_request(&http_request); + let tracer = global_tracer(); + let mut span = tracer.start_with_context("http.insights.generate", &parent_context); + let normalized_path = normalize_path(&request.file_path); let library = app_state.primary_library(); let gen_type = InsightGenerationType::Standard; + span.set_attribute(KeyValue::new("file_path", normalized_path.clone())); + if let Some(ref model) = request.model { + span.set_attribute(KeyValue::new("model", model.clone())); + } + log::info!( "Manual insight generation triggered for photo: {} with model: {:?}", normalized_path, request.model ); - // Cancel any running job for this file, then create a fresh one - { + // Look up and abort any running job for this file, then cancel in DB + let old_job_ids: Vec = { let mut dao = app_state .insight_job_dao .lock() .expect("Unable to lock InsightJobDao"); - let _ = dao.cancel_active_job( - &opentelemetry::Context::new(), - library.id, - &normalized_path, - gen_type, - ); + let ctx = opentelemetry::Context::new(); + let ids = dao + .get_active_job(&ctx, library.id, &normalized_path) + .ok() + .flatten() + .map(|j| vec![j.id]) + .unwrap_or_default(); + let _ = dao.cancel_active_jobs(&ctx, library.id, &normalized_path); + ids + }; + for jid in &old_job_ids { + if let Some(handle) = app_state + .insight_job_handles + .lock() + .expect("Unable to lock InsightJobHandles") + .remove(jid) + { + handle.abort(); + } } let job_id = { @@ -261,7 +405,7 @@ pub async fn generate_insight_handler( .insight_job_dao .lock() .expect("Unable to lock InsightJobDao"); - match dao.create_or_get_active_job( + match dao.create_job( &opentelemetry::Context::new(), library.id, &normalized_path, @@ -270,6 +414,7 @@ pub async fn generate_insight_handler( Ok(id) => id, Err(e) => { log::error!("Failed to create generation job: {:?}", e); + span.set_status(Status::error("Failed to create generation job")); return HttpResponse::InternalServerError().json(serde_json::json!({ "error": "Failed to create generation job" })); @@ -280,36 +425,40 @@ pub async fn generate_insight_handler( // Spawn background task with timeout let generator = app_state.insight_generator.clone(); let job_dao = app_state.insight_job_dao.clone(); - let lib_id = library.id; + let job_handles = app_state.insight_job_handles.clone(); let path = normalized_path.clone(); - tokio::spawn(async move { + let handle = tokio::spawn(async move { let timeout_secs: u64 = std::env::var("INSIGHT_GENERATION_TIMEOUT_SECS") .ok() .and_then(|v| v.parse().ok()) .unwrap_or(120); - let result = tokio::time::timeout( - std::time::Duration::from_secs(timeout_secs), - generator.generate_insight_for_photo_with_config( - &path, - request.model.clone(), - request.system_prompt.clone(), - request.num_ctx, - request.temperature, - request.top_p, - request.top_k, - request.min_p, - ), - ) + let path_for_task = path.clone(); + let generator_for_task = generator.clone(); + let result = tokio::task::spawn(async move { + tokio::time::timeout( + std::time::Duration::from_secs(timeout_secs), + generator_for_task.generate_insight_for_photo_with_config( + &path_for_task, + request.model.clone(), + request.system_prompt.clone(), + request.num_ctx, + request.temperature, + request.top_p, + request.top_k, + request.min_p, + ), + ) + .await + }) .await; let ctx = opentelemetry::Context::new(); let mut dao = job_dao.lock().expect("Unable to lock InsightJobDao"); match result { - Ok(Ok(())) => { - // Look up the stored insight id to record on the job + Ok(Ok(Ok(()))) => { let mut insight_dao = generator .insight_dao() .lock() @@ -320,27 +469,60 @@ pub async fn generate_insight_handler( .flatten() .map(|i| i.id); if let Some(id) = insight_id { - let _ = dao.complete_job(&ctx, job_id, id); + if let Err(e) = dao.complete_job(&ctx, job_id, id) { + log::error!("Failed to mark job {} as completed: {:?}", job_id, e); + } } else { - let _ = dao.fail_job(&ctx, job_id, "generation returned no insight"); + if let Err(e) = dao.fail_job(&ctx, job_id, "generation returned no insight") { + log::error!("Failed to mark job {} as failed: {:?}", job_id, e); + } } } - Ok(Err(e)) => { + Ok(Ok(Err(e))) => { log::error!("Insight generation failed for {}: {:?}", path, e); - let _ = dao.fail_job(&ctx, job_id, &format!("{:?}", e)); + if let Err(err) = dao.fail_job(&ctx, job_id, &format!("{:?}", e)) { + log::error!("Failed to mark job {} as failed: {:?}", job_id, err); + } } - Err(_) => { + Ok(Err(_)) => { log::error!( "Insight generation timed out for {} after {}s", path, timeout_secs ); - let _ = dao.fail_job(&ctx, job_id, &format!("timeout after {}s", timeout_secs)); + if let Err(err) = + dao.fail_job(&ctx, job_id, &format!("timeout after {}s", timeout_secs)) + { + log::error!("Failed to mark job {} as failed: {:?}", job_id, err); + } + } + Err(_) => { + log::error!("Insight generation task panicked for {}", path); + if let Err(err) = dao.fail_job(&ctx, job_id, "generation task panicked") { + log::error!("Failed to mark job {} as failed: {:?}", job_id, err); + } } } + + // Remove handle from map on completion + let mut handles = job_handles + .lock() + .expect("Unable to lock InsightJobHandles"); + handles.remove(&job_id); }); - HttpResponse::Ok().json(JobIdResponse { job_id }) + // Store abort handle + { + let mut handles = app_state + .insight_job_handles + .lock() + .expect("Unable to lock InsightJobHandles"); + handles.insert(job_id, handle.abort_handle()); + } + + span.set_attribute(KeyValue::new("job_id", job_id as i64)); + span.set_status(Status::Ok); + HttpResponse::Accepted().json(JobIdResponse { job_id }) } /// GET /insights?path=/path/to/photo.jpg - Fetch insight for specific photo @@ -385,6 +567,13 @@ pub async fn get_insight_handler( approved: insight.approved, has_training_messages: insight.training_messages.is_some(), backend: insight.backend, + num_ctx: insight.num_ctx, + temperature: insight.temperature, + top_p: insight.top_p, + top_k: insight.top_k, + min_p: insight.min_p, + system_prompt: insight.system_prompt, + persona_id: insight.persona_id, }; HttpResponse::Ok().json(response) } @@ -454,6 +643,13 @@ pub async fn get_all_insights_handler( approved: insight.approved, has_training_messages: insight.training_messages.is_some(), backend: insight.backend, + num_ctx: insight.num_ctx, + temperature: insight.temperature, + top_p: insight.top_p, + top_k: insight.top_k, + min_p: insight.min_p, + system_prompt: insight.system_prompt, + persona_id: insight.persona_id, }) .collect(); @@ -471,33 +667,58 @@ pub async fn get_all_insights_handler( /// POST /insights/generate/agentic - Generate insight using agentic tool-calling loop (async) #[post("/insights/generate/agentic")] pub async fn generate_agentic_insight_handler( - _http_request: HttpRequest, + http_request: HttpRequest, claims: Claims, request: web::Json, app_state: web::Data, ) -> impl Responder { + let parent_context = extract_context_from_request(&http_request); + let tracer = global_tracer(); + let mut span = tracer.start_with_context("http.insights.generate_agentic", &parent_context); + let normalized_path = normalize_path(&request.file_path); let library = app_state.primary_library(); let gen_type = InsightGenerationType::Agentic; + span.set_attribute(KeyValue::new("file_path", normalized_path.clone())); + if let Some(ref model) = request.model { + span.set_attribute(KeyValue::new("model", model.clone())); + } + if let Some(ref backend) = request.backend { + span.set_attribute(KeyValue::new("backend", backend.clone())); + } + log::info!( "Agentic insight generation triggered for photo: {} with model: {:?}", normalized_path, request.model ); - // Cancel any running job for this file, then create a fresh one - { + // Look up and abort any running job for this file, then cancel in DB + let old_job_ids: Vec = { let mut dao = app_state .insight_job_dao .lock() .expect("Unable to lock InsightJobDao"); - let _ = dao.cancel_active_job( - &opentelemetry::Context::new(), - library.id, - &normalized_path, - gen_type, - ); + let ctx = opentelemetry::Context::new(); + let ids = dao + .get_active_job(&ctx, library.id, &normalized_path) + .ok() + .flatten() + .map(|j| vec![j.id]) + .unwrap_or_default(); + let _ = dao.cancel_active_jobs(&ctx, library.id, &normalized_path); + ids + }; + for jid in &old_job_ids { + if let Some(handle) = app_state + .insight_job_handles + .lock() + .expect("Unable to lock InsightJobHandles") + .remove(jid) + { + handle.abort(); + } } let job_id = { @@ -505,7 +726,7 @@ pub async fn generate_agentic_insight_handler( .insight_job_dao .lock() .expect("Unable to lock InsightJobDao"); - match dao.create_or_get_active_job( + match dao.create_job( &opentelemetry::Context::new(), library.id, &normalized_path, @@ -514,6 +735,7 @@ pub async fn generate_agentic_insight_handler( Ok(id) => id, Err(e) => { log::error!("Failed to create agentic generation job: {:?}", e); + span.set_status(Status::error("Failed to create generation job")); return HttpResponse::InternalServerError().json(serde_json::json!({ "error": "Failed to create generation job" })); @@ -573,73 +795,101 @@ pub async fn generate_agentic_insight_handler( // Spawn background task with timeout let generator = app_state.insight_generator.clone(); let job_dao = app_state.insight_job_dao.clone(); - let lib_id = library.id; + let job_handles = app_state.insight_job_handles.clone(); let path = normalized_path.clone(); - tokio::spawn(async move { + let handle = tokio::spawn(async move { let timeout_secs: u64 = std::env::var("INSIGHT_GENERATION_TIMEOUT_SECS") .ok() .and_then(|v| v.parse().ok()) .unwrap_or(180); - let result = tokio::time::timeout( - std::time::Duration::from_secs(timeout_secs), - generator.generate_agentic_insight_for_photo( - &path, - request.model.clone(), - request.system_prompt.clone(), - request.num_ctx, - request.temperature, - request.top_p, - request.top_k, - request.min_p, - max_iterations, - request.backend.clone(), - fewshot_examples, - fewshot_ids, - user_id, - persona_id, - ), - ) + let path_for_task = path.clone(); + let generator_for_task = generator.clone(); + let result = tokio::task::spawn(async move { + tokio::time::timeout( + std::time::Duration::from_secs(timeout_secs), + generator_for_task.generate_agentic_insight_for_photo( + &path_for_task, + request.model.clone(), + request.system_prompt.clone(), + request.num_ctx, + request.temperature, + request.top_p, + request.top_k, + request.min_p, + max_iterations, + request.backend.clone(), + fewshot_examples, + fewshot_ids, + user_id, + persona_id, + ), + ) + .await + }) .await; let ctx = opentelemetry::Context::new(); let mut dao = job_dao.lock().expect("Unable to lock InsightJobDao"); match result { - Ok(Ok(_)) => { - // Fetch the stored insight id to record on the job - let mut insight_dao = generator - .insight_dao() - .lock() - .expect("Unable to lock InsightDao"); - let insight_id = insight_dao - .get_insight(&ctx, &path) - .ok() - .flatten() - .map(|i| i.id); - if let Some(id) = insight_id { - let _ = dao.complete_job(&ctx, job_id, id); - } else { - let _ = dao.fail_job(&ctx, job_id, "generation returned no insight"); + Ok(Ok(Ok((Some(insight_id), _)))) => { + if let Err(e) = dao.complete_job(&ctx, job_id, insight_id) { + log::error!("Failed to mark job {} as completed: {:?}", job_id, e); } } - Ok(Err(e)) => { - log::error!("Agentic insight generation failed for {}: {:?}", path, e); - let _ = dao.fail_job(&ctx, job_id, &format!("{:?}", e)); + Ok(Ok(Ok((None, _)))) => { + if let Err(e) = dao.fail_job(&ctx, job_id, "agentic generation returned no insight") + { + log::error!("Failed to mark job {} as failed: {:?}", job_id, e); + } } - Err(_) => { + Ok(Ok(Err(e))) => { + log::error!("Agentic insight generation failed for {}: {:?}", path, e); + if let Err(err) = dao.fail_job(&ctx, job_id, &format!("{:?}", e)) { + log::error!("Failed to mark job {} as failed: {:?}", job_id, err); + } + } + Ok(Err(_)) => { log::error!( "Agentic insight generation timed out for {} after {}s", path, timeout_secs ); - let _ = dao.fail_job(&ctx, job_id, &format!("timeout after {}s", timeout_secs)); + if let Err(err) = + dao.fail_job(&ctx, job_id, &format!("timeout after {}s", timeout_secs)) + { + log::error!("Failed to mark job {} as failed: {:?}", job_id, err); + } + } + Err(_) => { + log::error!("Agentic insight generation task panicked for {}", path); + if let Err(err) = dao.fail_job(&ctx, job_id, "generation task panicked") { + log::error!("Failed to mark job {} as failed: {:?}", job_id, err); + } } } + + // Remove handle from map on completion + let mut handles = job_handles + .lock() + .expect("Unable to lock InsightJobHandles"); + handles.remove(&job_id); }); - HttpResponse::Ok().json(JobIdResponse { job_id }) + // Store abort handle + { + let mut handles = app_state + .insight_job_handles + .lock() + .expect("Unable to lock InsightJobHandles"); + handles.insert(job_id, handle.abort_handle()); + } + + span.set_attribute(KeyValue::new("job_id", job_id as i64)); + span.set_status(Status::Ok); + HttpResponse::Accepted().json(JobIdResponse { job_id }) } /// GET /insights/models - Local-backend models with capabilities. Returns diff --git a/src/ai/insight_chat.rs b/src/ai/insight_chat.rs index 36d099e..d5f2fd9 100644 --- a/src/ai/insight_chat.rs +++ b/src/ai/insight_chat.rs @@ -517,6 +517,13 @@ impl InsightChatService { backend: kind.as_str().to_string(), fewshot_source_ids: None, content_hash: None, + num_ctx: req.num_ctx, + temperature: req.temperature, + top_p: req.top_p, + top_k: req.top_k, + min_p: req.min_p, + system_prompt: req.system_prompt.clone(), + persona_id: req.persona_id.clone(), }; let cx = opentelemetry::Context::new(); let mut dao = self.insight_dao.lock().expect("Unable to lock InsightDao"); @@ -864,6 +871,13 @@ impl InsightChatService { backend: kind.as_str().to_string(), fewshot_source_ids: None, content_hash: None, + num_ctx: req.num_ctx, + temperature: req.temperature, + top_p: req.top_p, + top_k: req.top_k, + min_p: req.min_p, + system_prompt: req.system_prompt.clone(), + persona_id: req.persona_id.clone(), }; let cx = opentelemetry::Context::new(); let mut dao = self.insight_dao.lock().expect("Unable to lock InsightDao"); @@ -1052,6 +1066,13 @@ impl InsightChatService { backend: kind.as_str().to_string(), fewshot_source_ids: None, content_hash: None, + num_ctx: req.num_ctx, + temperature: req.temperature, + top_p: req.top_p, + top_k: req.top_k, + min_p: req.min_p, + system_prompt: req.system_prompt.clone(), + persona_id: req.persona_id.clone(), }; let stored = { let cx = opentelemetry::Context::new(); diff --git a/src/ai/insight_generator.rs b/src/ai/insight_generator.rs index f2620e7..59ccaad 100644 --- a/src/ai/insight_generator.rs +++ b/src/ai/insight_generator.rs @@ -1432,6 +1432,13 @@ impl InsightGenerator { backend: "local".to_string(), fewshot_source_ids: None, content_hash: None, + num_ctx, + temperature, + top_p, + top_k, + min_p, + system_prompt: custom_system_prompt.clone(), + persona_id: None, }; let mut dao = self.insight_dao.lock().expect("Unable to lock InsightDao"); @@ -4176,6 +4183,13 @@ Return ONLY the summary, nothing else."#, backend: kind.as_str().to_string(), fewshot_source_ids: fewshot_source_ids_json, content_hash: None, + num_ctx, + temperature, + top_p, + top_k, + min_p, + system_prompt: custom_system_prompt.clone(), + persona_id: Some(persona_id.clone()), }; let stored = { diff --git a/src/ai/mod.rs b/src/ai/mod.rs index 672c672..93a2edc 100644 --- a/src/ai/mod.rs +++ b/src/ai/mod.rs @@ -19,11 +19,11 @@ pub use daily_summary_job::{ generate_daily_summaries, strip_summary_boilerplate, }; pub use handlers::{ - chat_history_handler, chat_rewind_handler, chat_stream_handler, chat_turn_handler, - delete_insight_handler, export_training_data_handler, generate_agentic_insight_handler, - generate_insight_handler, generation_status_handler, get_all_insights_handler, - get_available_models_handler, get_insight_handler, get_openrouter_models_handler, - rate_insight_handler, + cancel_generation_handler, chat_history_handler, chat_rewind_handler, chat_stream_handler, + chat_turn_handler, delete_insight_handler, export_training_data_handler, + generate_agentic_insight_handler, generate_insight_handler, generation_status_handler, + get_all_insights_handler, get_available_models_handler, get_insight_handler, + get_openrouter_models_handler, rate_insight_handler, }; pub use insight_generator::InsightGenerator; pub use llamacpp::LlamaCppClient; diff --git a/src/database/insight_generation_job_dao.rs b/src/database/insight_generation_job_dao.rs index 9d848cc..f0670bb 100644 --- a/src/database/insight_generation_job_dao.rs +++ b/src/database/insight_generation_job_dao.rs @@ -10,13 +10,13 @@ use crate::database::schema; use crate::database::{DbError, DbErrorKind, connect}; use crate::otel::trace_db_call; -/// Tracks async insight generation jobs. The idempotent insert ensures -/// concurrent callers for the same (library_id, file_path, generation_type) -/// get the same job_id rather than creating duplicates. +/// Tracks async insight generation jobs. Each call to `create_job` inserts +/// a new row; the application layer prevents concurrent running jobs by +/// cancelling the old one before creating a new one. pub trait InsightGenerationJobDao: Sync + Send { - /// Insert a new job or return the existing running job for the same key. - /// Returns the job_id either way. - fn create_or_get_active_job( + /// Insert a new running job. Always creates a new row (no upsert). + /// Cleans up terminal-state rows for the same key first. + fn create_job( &mut self, context: &opentelemetry::Context, library_id: i32, @@ -24,7 +24,9 @@ pub trait InsightGenerationJobDao: Sync + Send { generation_type: InsightGenerationType, ) -> Result; - /// Mark a job as completed with the resulting insight id. + /// Mark a job as completed with the resulting insight id. Only updates + /// if the job is still in "running" status (prevents overwriting a + /// cancelled job with a late-completing task). fn complete_job( &mut self, context: &opentelemetry::Context, @@ -32,7 +34,8 @@ pub trait InsightGenerationJobDao: Sync + Send { insight_id: i32, ) -> Result<(), DbError>; - /// Mark a job as failed with an error message. + /// Mark a job as failed with an error message. Only updates if the job + /// is still in "running" status. fn fail_job( &mut self, context: &opentelemetry::Context, @@ -40,15 +43,22 @@ pub trait InsightGenerationJobDao: Sync + Send { error_message: &str, ) -> Result<(), DbError>; - /// Mark the active running job for a file as "cancelled". Returns true if - /// a job was found and cancelled, false if no running job existed. - fn cancel_active_job( + /// Cancel a specific job by id. Only updates if the job is still + /// in "running" status. Returns true if a row was updated. + fn cancel_job( + &mut self, + context: &opentelemetry::Context, + job_id: i32, + ) -> Result; + + /// Cancel all running jobs for a given file. Returns the number of + /// jobs cancelled. + fn cancel_active_jobs( &mut self, context: &opentelemetry::Context, library_id: i32, file_path: &str, - generation_type: InsightGenerationType, - ) -> Result; + ) -> Result; /// Find the latest running job for a given file. Returns None if no /// running job exists. @@ -65,6 +75,11 @@ pub trait InsightGenerationJobDao: Sync + Send { context: &opentelemetry::Context, job_id: i32, ) -> Result, DbError>; + + /// Mark all jobs still in "running" status as "failed" with a recovery + /// error message. Returns the number of jobs recovered. + fn recover_orphaned_jobs(&mut self, context: &opentelemetry::Context) + -> Result; } pub struct SqliteInsightGenerationJobDao { @@ -91,14 +106,14 @@ impl SqliteInsightGenerationJobDao { } impl InsightGenerationJobDao for SqliteInsightGenerationJobDao { - fn create_or_get_active_job( + fn create_job( &mut self, context: &opentelemetry::Context, library_id: i32, file_path: &str, generation_type: InsightGenerationType, ) -> Result { - trace_db_call(context, "insert", "create_or_get_active_job", |_span| { + trace_db_call(context, "insert", "create_job", |_span| { use schema::insight_generation_jobs::dsl; let mut connection = self @@ -106,26 +121,6 @@ impl InsightGenerationJobDao for SqliteInsightGenerationJobDao { .lock() .expect("Unable to lock InsightGenerationJobDao"); - // Check for existing running job - let existing = dsl::insight_generation_jobs - .filter( - dsl::library_id - .eq(library_id) - .and(dsl::file_path.eq(file_path)) - .and(dsl::generation_type.eq(generation_type.as_str())) - .and(dsl::status.eq(InsightJobStatus::Running.as_str())), - ) - .select(dsl::id) - .first::(connection.deref_mut()) - .optional(); - - match existing { - Ok(Some(job_id)) => return Ok(job_id), - Ok(None) => {} - Err(e) => return Err(anyhow::anyhow!("Failed to check existing job: {}", e)), - } - - // No running job exists, insert new one (upsert on conflict) let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .expect("Time went backwards") @@ -141,22 +136,16 @@ impl InsightGenerationJobDao for SqliteInsightGenerationJobDao { diesel::insert_into(dsl::insight_generation_jobs) .values(&new_job) - .on_conflict((dsl::library_id, dsl::file_path, dsl::generation_type)) - .do_update() - .set(( - dsl::status.eq(InsightJobStatus::Running.as_str()), - dsl::started_at.eq(now), - )) .execute(connection.deref_mut()) .map_err(|e| anyhow::anyhow!("Failed to insert job: {}", e))?; - // Get the job id dsl::insight_generation_jobs .filter( dsl::library_id .eq(library_id) .and(dsl::file_path.eq(file_path)) - .and(dsl::generation_type.eq(generation_type.as_str())), + .and(dsl::generation_type.eq(generation_type.as_str())) + .and(dsl::status.eq(InsightJobStatus::Running.as_str())), ) .select(dsl::id) .order(dsl::id.desc()) @@ -185,15 +174,23 @@ impl InsightGenerationJobDao for SqliteInsightGenerationJobDao { .expect("Time went backwards") .as_secs() as i64; - diesel::update(dsl::insight_generation_jobs.filter(dsl::id.eq(job_id))) - .set(( - dsl::status.eq(InsightJobStatus::Completed.as_str()), - dsl::completed_at.eq(Some(now)), - dsl::result_insight_id.eq(Some(insight_id)), - )) - .execute(connection.deref_mut()) - .map(|_| ()) - .map_err(|e| anyhow::anyhow!("Failed to complete job: {}", e)) + // Only update if still running — prevents cancelled job from + // being overwritten by a late-completing task. + diesel::update( + dsl::insight_generation_jobs.filter( + dsl::id + .eq(job_id) + .and(dsl::status.eq(InsightJobStatus::Running.as_str())), + ), + ) + .set(( + dsl::status.eq(InsightJobStatus::Completed.as_str()), + dsl::completed_at.eq(Some(now)), + dsl::result_insight_id.eq(Some(insight_id)), + )) + .execute(connection.deref_mut()) + .map(|_| ()) + .map_err(|e| anyhow::anyhow!("Failed to complete job: {}", e)) }) .map_err(|_| DbError::new(DbErrorKind::UpdateError)) } @@ -217,27 +214,71 @@ impl InsightGenerationJobDao for SqliteInsightGenerationJobDao { .expect("Time went backwards") .as_secs() as i64; - diesel::update(dsl::insight_generation_jobs.filter(dsl::id.eq(job_id))) - .set(( - dsl::status.eq(InsightJobStatus::Failed.as_str()), - dsl::completed_at.eq(Some(now)), - dsl::error_message.eq(Some(error_message.to_string())), - )) - .execute(connection.deref_mut()) - .map(|_| ()) - .map_err(|e| anyhow::anyhow!("Failed to fail job: {}", e)) + // Only update if still running. + diesel::update( + dsl::insight_generation_jobs.filter( + dsl::id + .eq(job_id) + .and(dsl::status.eq(InsightJobStatus::Running.as_str())), + ), + ) + .set(( + dsl::status.eq(InsightJobStatus::Failed.as_str()), + dsl::completed_at.eq(Some(now)), + dsl::error_message.eq(Some(error_message.to_string())), + )) + .execute(connection.deref_mut()) + .map(|_| ()) + .map_err(|e| anyhow::anyhow!("Failed to fail job: {}", e)) }) .map_err(|_| DbError::new(DbErrorKind::UpdateError)) } - fn cancel_active_job( + fn cancel_job( + &mut self, + context: &opentelemetry::Context, + job_id: i32, + ) -> Result { + trace_db_call(context, "update", "cancel_job", |_span| { + use schema::insight_generation_jobs::dsl; + + let mut connection = self + .connection + .lock() + .expect("Unable to lock InsightGenerationJobDao"); + + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("Time went backwards") + .as_secs() as i64; + + let rows = diesel::update( + dsl::insight_generation_jobs.filter( + dsl::id + .eq(job_id) + .and(dsl::status.eq(InsightJobStatus::Running.as_str())), + ), + ) + .set(( + dsl::status.eq(InsightJobStatus::Cancelled.as_str()), + dsl::completed_at.eq(Some(now)), + dsl::error_message.eq(Some("cancelled by user".to_string())), + )) + .execute(connection.deref_mut()) + .map_err(|e| anyhow::anyhow!("Failed to cancel job: {}", e))?; + + Ok(rows > 0) + }) + .map_err(|_| DbError::new(DbErrorKind::UpdateError)) + } + + fn cancel_active_jobs( &mut self, context: &opentelemetry::Context, library_id: i32, file_path: &str, - generation_type: InsightGenerationType, - ) -> Result { - trace_db_call(context, "update", "cancel_active_job", |_span| { + ) -> Result { + trace_db_call(context, "update", "cancel_active_jobs", |_span| { use schema::insight_generation_jobs::dsl; let mut connection = self @@ -255,7 +296,6 @@ impl InsightGenerationJobDao for SqliteInsightGenerationJobDao { dsl::library_id .eq(library_id) .and(dsl::file_path.eq(file_path)) - .and(dsl::generation_type.eq(generation_type.as_str())) .and(dsl::status.eq(InsightJobStatus::Running.as_str())), ), ) @@ -265,9 +305,9 @@ impl InsightGenerationJobDao for SqliteInsightGenerationJobDao { dsl::error_message.eq(Some("cancelled by newer request".to_string())), )) .execute(connection.deref_mut()) - .map_err(|e| anyhow::anyhow!("Failed to cancel job: {}", e))?; + .map_err(|e| anyhow::anyhow!("Failed to cancel active jobs: {}", e))?; - Ok(rows > 0) + Ok(rows) }) .map_err(|_| DbError::new(DbErrorKind::UpdateError)) } @@ -322,6 +362,40 @@ impl InsightGenerationJobDao for SqliteInsightGenerationJobDao { }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } + + fn recover_orphaned_jobs( + &mut self, + context: &opentelemetry::Context, + ) -> Result { + trace_db_call(context, "update", "recover_orphaned_jobs", |_span| { + use schema::insight_generation_jobs::dsl; + + let mut connection = self + .connection + .lock() + .expect("Unable to lock InsightGenerationJobDao"); + + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("Time went backwards") + .as_secs() as i64; + + let rows = diesel::update( + dsl::insight_generation_jobs + .filter(dsl::status.eq(InsightJobStatus::Running.as_str())), + ) + .set(( + dsl::status.eq(InsightJobStatus::Failed.as_str()), + dsl::completed_at.eq(Some(now)), + dsl::error_message.eq(Some("server crashed while running".to_string())), + )) + .execute(connection.deref_mut()) + .map_err(|e| anyhow::anyhow!("Failed to recover orphaned jobs: {}", e))?; + + Ok(rows) + }) + .map_err(|_| DbError::new(DbErrorKind::UpdateError)) + } } #[cfg(test)] @@ -345,22 +419,19 @@ mod tests { } #[test] - fn create_job_idempotent() { + fn create_job_inserts_new_row() { let mut dao = setup_dao(); let ctx = ctx(); let job_id_1 = dao - .create_or_get_active_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard) + .create_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard) .unwrap(); let job_id_2 = dao - .create_or_get_active_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard) + .create_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard) .unwrap(); - assert_eq!( - job_id_1, job_id_2, - "idempotent insert should return same job_id" - ); + assert_ne!(job_id_1, job_id_2, "each create_job call inserts a new row"); } #[test] @@ -369,7 +440,7 @@ mod tests { let ctx = ctx(); let job_id = dao - .create_or_get_active_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard) + .create_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard) .unwrap(); dao.complete_job(&ctx, job_id, 42).unwrap(); @@ -386,7 +457,7 @@ mod tests { let ctx = ctx(); let job_id = dao - .create_or_get_active_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Agentic) + .create_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Agentic) .unwrap(); dao.fail_job(&ctx, job_id, "model timeout").unwrap(); @@ -403,7 +474,7 @@ mod tests { let ctx = ctx(); let job_id = dao - .create_or_get_active_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard) + .create_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard) .unwrap(); // Job is running @@ -420,18 +491,16 @@ mod tests { } #[test] - fn cancel_active_job() { + fn cancel_active_jobs() { let mut dao = setup_dao(); let ctx = ctx(); let job_id = dao - .create_or_get_active_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard) + .create_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard) .unwrap(); - let cancelled = dao - .cancel_active_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard) - .unwrap(); - assert!(cancelled, "should cancel existing running job"); + let cancelled = dao.cancel_active_jobs(&ctx, 1, "photos/test.jpg").unwrap(); + assert_eq!(cancelled, 1, "should cancel 1 running job"); // Job is no longer active let active = dao.get_active_job(&ctx, 1, "photos/test.jpg").unwrap(); @@ -441,11 +510,9 @@ mod tests { let job = dao.get_job_by_id(&ctx, job_id).unwrap().unwrap(); assert_eq!(job.status, InsightJobStatus::Cancelled.as_str()); - // Cancelling again returns false (nothing to cancel) - let cancelled2 = dao - .cancel_active_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard) - .unwrap(); - assert!(!cancelled2, "should return false when no running job"); + // Cancelling again returns 0 (nothing to cancel) + let cancelled2 = dao.cancel_active_jobs(&ctx, 1, "photos/test.jpg").unwrap(); + assert_eq!(cancelled2, 0, "should return 0 when no running job"); } #[test] @@ -454,11 +521,11 @@ mod tests { let ctx = ctx(); let job_id_1 = dao - .create_or_get_active_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard) + .create_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard) .unwrap(); let job_id_2 = dao - .create_or_get_active_job(&ctx, 2, "photos/test.jpg", InsightGenerationType::Standard) + .create_job(&ctx, 2, "photos/test.jpg", InsightGenerationType::Standard) .unwrap(); assert_ne!( @@ -485,7 +552,7 @@ mod tests { let ctx = ctx(); let job_id = dao - .create_or_get_active_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard) + .create_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard) .unwrap(); // Find while running @@ -500,4 +567,115 @@ mod tests { assert_eq!(job.status, InsightJobStatus::Completed.as_str()); assert_eq!(job.result_insight_id, Some(99)); } + + #[test] + fn recover_orphaned_jobs() { + let mut dao = setup_dao(); + let ctx = ctx(); + + // Create two running jobs + let job_id_1 = dao + .create_job(&ctx, 1, "photos/a.jpg", InsightGenerationType::Standard) + .unwrap(); + let job_id_2 = dao + .create_job(&ctx, 1, "photos/b.jpg", InsightGenerationType::Agentic) + .unwrap(); + + // Complete one + dao.complete_job(&ctx, job_id_1, 1).unwrap(); + + // Recover should only affect the running job + let recovered = dao.recover_orphaned_jobs(&ctx).unwrap(); + assert_eq!(recovered, 1, "should recover exactly 1 running job"); + + // job_id_1 is still completed + let job1 = dao.get_job_by_id(&ctx, job_id_1).unwrap().unwrap(); + assert_eq!(job1.status, InsightJobStatus::Completed.as_str()); + + // job_id_2 is now failed with recovery message + let job2 = dao.get_job_by_id(&ctx, job_id_2).unwrap().unwrap(); + assert_eq!(job2.status, InsightJobStatus::Failed.as_str()); + assert_eq!( + job2.error_message.as_deref(), + Some("server crashed while running") + ); + + // Second recovery is a no-op + let recovered2 = dao.recover_orphaned_jobs(&ctx).unwrap(); + assert_eq!(recovered2, 0, "no running jobs remain"); + } + + #[test] + fn complete_job_noop_when_cancelled() { + let mut dao = setup_dao(); + let ctx = ctx(); + + let job_id = dao + .create_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard) + .unwrap(); + + dao.cancel_job(&ctx, job_id).unwrap(); + + // Late-completing task tries to mark as completed — should be a no-op + dao.complete_job(&ctx, job_id, 42).unwrap(); + + let job = dao.get_job_by_id(&ctx, job_id).unwrap().unwrap(); + assert_eq!( + job.status, + InsightJobStatus::Cancelled.as_str(), + "cancelled status must not be overwritten by late complete" + ); + assert_eq!( + job.result_insight_id, None, + "insight_id must stay None when complete is a no-op" + ); + } + + #[test] + fn fail_job_noop_when_cancelled() { + let mut dao = setup_dao(); + let ctx = ctx(); + + let job_id = dao + .create_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Agentic) + .unwrap(); + + dao.cancel_job(&ctx, job_id).unwrap(); + + // Late-failing task tries to mark as failed — should be a no-op + dao.fail_job(&ctx, job_id, "timeout after 120s").unwrap(); + + let job = dao.get_job_by_id(&ctx, job_id).unwrap().unwrap(); + assert_eq!( + job.status, + InsightJobStatus::Cancelled.as_str(), + "cancelled status must not be overwritten by late fail" + ); + assert_eq!( + job.error_message.as_deref(), + Some("cancelled by user"), + "error_message must reflect the cancel, not the late fail" + ); + } + + #[test] + fn cancel_job_by_id() { + let mut dao = setup_dao(); + let ctx = ctx(); + + let job_id = dao + .create_job(&ctx, 1, "photos/test.jpg", InsightGenerationType::Standard) + .unwrap(); + + let cancelled = dao.cancel_job(&ctx, job_id).unwrap(); + assert!(cancelled, "should cancel running job"); + + let job = dao.get_job_by_id(&ctx, job_id).unwrap().unwrap(); + assert_eq!(job.status, InsightJobStatus::Cancelled.as_str()); + assert!(job.completed_at.is_some()); + + // Cancelling again is a no-op + let cancelled2 = dao.cancel_job(&ctx, job_id).unwrap(); + assert!(!cancelled2, "already cancelled job should return false"); + } } diff --git a/src/database/models.rs b/src/database/models.rs index 0c52781..d670844 100644 --- a/src/database/models.rs +++ b/src/database/models.rs @@ -38,7 +38,13 @@ impl InsightJobStatus { "completed" => Self::Completed, "failed" => Self::Failed, "cancelled" => Self::Cancelled, - _ => Self::Failed, + other => { + log::warn!( + "Unknown InsightJobStatus value: {:?}, treating as failed", + other + ); + Self::Failed + } } } } @@ -224,6 +230,13 @@ pub struct InsertPhotoInsight { /// inserted before the hash is available stay null and the /// reconciliation pass backfills them. pub content_hash: Option, + pub num_ctx: Option, + pub temperature: Option, + pub top_p: Option, + pub top_k: Option, + pub min_p: Option, + pub system_prompt: Option, + pub persona_id: Option, } #[derive(Serialize, Queryable, Clone, Debug)] @@ -243,6 +256,13 @@ pub struct PhotoInsight { pub backend: String, pub fewshot_source_ids: Option, pub content_hash: Option, + pub num_ctx: Option, + pub temperature: Option, + pub top_p: Option, + pub top_k: Option, + pub min_p: Option, + pub system_prompt: Option, + pub persona_id: Option, } // --- Libraries --- diff --git a/src/database/schema.rs b/src/database/schema.rs index 61f4bf6..9f7efe5 100644 --- a/src/database/schema.rs +++ b/src/database/schema.rs @@ -216,6 +216,13 @@ diesel::table! { backend -> Text, fewshot_source_ids -> Nullable, content_hash -> Nullable, + num_ctx -> Nullable, + temperature -> Nullable, + top_p -> Nullable, + top_k -> Nullable, + min_p -> Nullable, + system_prompt -> Nullable, + persona_id -> Nullable, } } diff --git a/src/main.rs b/src/main.rs index 767e336..a3af554 100644 --- a/src/main.rs +++ b/src/main.rs @@ -75,6 +75,22 @@ fn main() -> std::io::Result<()> { run_migrations(&mut connect()).expect("Failed to run migrations"); + // Recover orphaned insight generation jobs from a previous crash. + { + use crate::database::{InsightGenerationJobDao, SqliteInsightGenerationJobDao}; + let mut dao = SqliteInsightGenerationJobDao::new(); + let ctx = opentelemetry::Context::new(); + match dao.recover_orphaned_jobs(&ctx) { + Ok(n) if n > 0 => { + info!("Recovered {} orphaned insight generation jobs", n); + } + Ok(_) => {} + Err(e) => { + log::warn!("Failed to recover orphaned insight jobs: {:?}", e); + } + } + } + // One-shot retirement of the pre-content-hash HLS layout. Idempotent // — a second boot finds nothing and reports zero deletions, so it's // safe to leave wired in until the module is removed in a later @@ -309,6 +325,7 @@ fn main() -> std::io::Result<()> { .service(ai::generate_insight_handler) .service(ai::generate_agentic_insight_handler) .service(ai::generation_status_handler) + .service(ai::cancel_generation_handler) .service(ai::get_insight_handler) .service(ai::delete_insight_handler) .service(ai::get_all_insights_handler) diff --git a/src/state.rs b/src/state.rs index 329d15f..4ba63bf 100644 --- a/src/state.rs +++ b/src/state.rs @@ -19,6 +19,7 @@ use crate::video::actors::{ PlaylistGenerator, PreviewClipGenerator, StreamActor, VideoPlaylistManager, }; use actix::{Actor, Addr}; +use std::collections::HashMap; use std::env; use std::sync::{Arc, Mutex, RwLock}; @@ -88,6 +89,9 @@ pub struct AppState { pub clip_client: ClipClient, /// Tracks async insight generation jobs (spawned by generate endpoints). pub insight_job_dao: Arc>>, + /// In-memory map from job_id → tokio AbortHandle for running tasks. + /// Used to abort server-side tasks on cancel or regenerate. + pub insight_job_handles: Arc>>, } impl AppState { @@ -127,6 +131,7 @@ impl AppState { face_client: FaceClient, clip_client: ClipClient, insight_job_dao: Arc>>, + insight_job_handles: Arc>>, ) -> Self { assert!( !libraries_vec.is_empty(), @@ -169,6 +174,7 @@ impl AppState { face_client, clip_client, insight_job_dao, + insight_job_handles, } } @@ -260,6 +266,8 @@ impl Default for AppState { // Initialize insight generation job DAO (async generation tracking) let insight_job_dao: Arc>> = Arc::new(Mutex::new(Box::new(SqliteInsightGenerationJobDao::new()))); + let insight_job_handles: Arc>> = + Arc::new(Mutex::new(HashMap::new())); // Load base path and ensure the primary library row reflects it. let base_path = env::var("BASE_PATH").expect("BASE_PATH was not set in the env"); @@ -328,6 +336,7 @@ impl Default for AppState { face_client, clip_client, insight_job_dao, + insight_job_handles, ) } } @@ -513,6 +522,7 @@ impl AppState { FaceClient::new(None), // disabled in test ClipClient::new(None), // disabled in test Arc::new(Mutex::new(Box::new(SqliteInsightGenerationJobDao::new()))), // placeholder for test + Arc::new(Mutex::new(HashMap::new())), // placeholder for test ) } }