Files
ImageApi/src/ai/handlers.rs
T
Cameron Cordes b711252c23 Resolve persona prompts server-side; drop synthetic prompt in chat_turn
A request carrying persona_id but no system_prompt used to fall back to
the neutral default voice. Both agentic generation
(generate_agentic_insight_handler) and chat bootstrap now resolve the
persona's stored prompt from the persona store, with precedence:
explicit non-blank client system_prompt > persona store lookup >
existing default ("default" persona id behaves the same — used if the
store has a row, neutral default otherwise). Resolution happens at the
handler / bootstrap entry where the DAO is reachable; internals are
unchanged. resolve_bootstrap_system_prompt takes the resolved persona
prompt as a second argument, with precedence tests.

Also in insight_chat:

- Sync chat_turn no longer persists the synthetic "Please write your
  final answer now without calling any more tools." user message pushed
  on iteration exhaustion — extracted both streaming variants'
  synthetic_idx pattern into push/remove_synthetic_final_prompt (the
  remove is a defensive no-op on index drift) and applied it to all
  three loops; round-trip test included.
- Strip leaked <think> blocks from the final content persisted as the
  reply in chat_turn and both streaming AgenticLoopOutcomes (mid-stream
  TextDeltas are untouched; the raw transcript keeps the block).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-09 18:29:35 -04:00

2049 lines
74 KiB
Rust

use actix_web::{HttpRequest, HttpResponse, Responder, delete, get, post, web};
use futures::StreamExt;
use opentelemetry::KeyValue;
use opentelemetry::trace::{Span, Status, Tracer};
use serde::{Deserialize, Serialize};
use crate::ai::insight_chat::{ChatStreamEvent, ChatTurnRequest};
use crate::ai::ollama::ChatMessage;
use crate::ai::{ModelCapabilities, OllamaClient};
use crate::data::Claims;
use crate::database::models::{InsightGenerationType, InsightJobStatus, PhotoInsight};
use crate::database::{ExifDao, InsightDao};
use crate::libraries;
use crate::otel::{extract_context_from_request, global_tracer};
use crate::state::AppState;
use crate::utils::normalize_path;
/// Hardcoded few-shot exemplars for the agentic endpoint. Populate with the
/// ids of approved insights whose `training_messages` should be compressed
/// into trajectory form and injected into the system prompt. Empty = no
/// change in behavior. Request-level `fewshot_insight_ids` overrides this
/// when non-empty. The agentic handler only ever reads the first two
/// entries of whichever list wins.
// Example of a populated list (previously used): `&[2918, 2908]`.
const DEFAULT_FEWSHOT_INSIGHT_IDS: &[i32] = &[];
/// JSON body accepted by both `POST /insights/generate` and
/// `POST /insights/generate/agentic`.
///
/// Only `file_path` is required; every other field is optional and
/// deserializes to `None` when absent from the body.
#[derive(Debug, Deserialize)]
pub struct GeneratePhotoInsightRequest {
/// Path of the photo to generate an insight for. Handlers run it through
/// `normalize_path` before using it as the job / insight key.
pub file_path: String,
/// Optional model override, forwarded to the generator as-is.
#[serde(default)]
pub model: Option<String>,
/// Optional system prompt. On the agentic endpoint a non-blank value
/// takes precedence over the prompt stored for `persona_id`.
#[serde(default)]
pub system_prompt: Option<String>,
/// Optional `num_ctx` override, forwarded to the generator as-is.
#[serde(default)]
pub num_ctx: Option<i32>,
/// Optional sampling temperature, forwarded to the generator as-is.
#[serde(default)]
pub temperature: Option<f32>,
/// Optional `top_p` sampling parameter, forwarded to the generator as-is.
#[serde(default)]
pub top_p: Option<f32>,
/// Optional `top_k` sampling parameter, forwarded to the generator as-is.
#[serde(default)]
pub top_k: Option<i32>,
/// Optional `min_p` sampling parameter, forwarded to the generator as-is.
#[serde(default)]
pub min_p: Option<f32>,
/// `"local"` (default, Ollama with images) | `"hybrid"` (local vision +
/// OpenRouter chat). Only respected by the agentic endpoint.
#[serde(default)]
pub backend: Option<String>,
/// Insight ids whose stored `training_messages` should be compressed
/// into few-shot trajectories and injected into the system prompt.
/// Silently truncated to the first 2. When absent/empty, the handler
/// falls back to `DEFAULT_FEWSHOT_INSIGHT_IDS`.
#[serde(default)]
pub fewshot_insight_ids: Option<Vec<i32>>,
/// Active persona id for this generation. New facts are tagged with
/// it (`entity_facts.persona_id`); recall during the agentic loop is
/// scoped to it. Defaults to `"default"` when absent or blank.
#[serde(default)]
pub persona_id: Option<String>,
}
/// Query string shared by the per-photo insight endpoints
/// (`GET /insights`, `DELETE /insights`, `GET /insights/history`).
#[derive(Debug, Deserialize)]
pub struct GetPhotoInsightQuery {
/// Path of the photo; handlers normalize it with `normalize_path`.
pub path: String,
/// Library context for this lookup. Used to pick the right content
/// hash when the same rel_path exists under multiple roots.
/// Only `GET /insights` reads it; the primary library is used when it
/// is absent or cannot be resolved.
#[serde(default)]
pub library: Option<String>,
}
/// Query string for `GET /insights/generation/status`.
///
/// Either `job_id` or `path` must be present; `job_id` is checked first
/// and wins when both are supplied.
#[derive(Debug, Deserialize)]
pub struct GenerationStatusQuery {
/// If provided, look up the job by id.
#[serde(default)]
pub job_id: Option<i32>,
/// If provided with `library`, look up the latest running job for this
/// file. Used when the client doesn't have a persisted job_id.
#[serde(default)]
pub path: Option<String>,
/// Library the `path` belongs to. The primary library is used when this
/// is absent or cannot be resolved.
#[serde(default)]
pub library: Option<String>,
}
/// GET /insights/generation/status - Check status of a generation job.
/// Accepts either `?job_id=<id>` or `?path=<path>&library=<name>`.
#[get("/insights/generation/status")]
pub async fn generation_status_handler(
_claims: Claims,
query: web::Query<GenerationStatusQuery>,
app_state: web::Data<AppState>,
) -> impl Responder {
let ctx = opentelemetry::Context::new();
if let Some(jid) = query.job_id {
let mut dao = app_state
.insight_job_dao
.lock()
.expect("Unable to lock InsightJobDao");
match dao.get_job_by_id(&ctx, jid) {
Ok(Some(job)) => {
return HttpResponse::Ok().json(GenerationStatusResponse {
job_id: job.id,
status: InsightJobStatus::parse(&job.status),
started_at: job.started_at,
completed_at: job.completed_at,
result_insight_id: job.result_insight_id,
error_message: job.error_message,
});
}
Ok(None) => {
return HttpResponse::NotFound().json(serde_json::json!({
"error": format!("Job {} not found", jid)
}));
}
Err(e) => {
log::error!("Failed to look up job {}: {:?}", jid, e);
return HttpResponse::InternalServerError().json(serde_json::json!({
"error": "Failed to look up job"
}));
}
}
}
if let Some(ref fp) = query.path {
let library = libraries::resolve_library_param(&app_state, query.library.as_deref())
.ok()
.flatten()
.unwrap_or_else(|| app_state.primary_library());
let normalized = normalize_path(fp);
let mut dao = app_state
.insight_job_dao
.lock()
.expect("Unable to lock InsightJobDao");
match dao.get_active_job(&ctx, library.id, &normalized) {
Ok(Some(job)) => {
return HttpResponse::Ok().json(GenerationStatusResponse {
job_id: job.id,
status: InsightJobStatus::parse(&job.status),
started_at: job.started_at,
completed_at: job.completed_at,
result_insight_id: job.result_insight_id,
error_message: job.error_message,
});
}
Ok(None) => {
return HttpResponse::Ok().json(serde_json::json!({
"status": "idle",
"message": "No running generation job for this file"
}));
}
Err(e) => {
log::error!("Failed to look up active job for {}: {:?}", normalized, e);
return HttpResponse::InternalServerError().json(serde_json::json!({
"error": "Failed to look up active job"
}));
}
}
}
HttpResponse::BadRequest().json(serde_json::json!({
"error": "Provide either job_id or path query parameter"
}))
}
/// JSON body for `POST /insights/generation/cancel`.
///
/// Either `job_id` or `file_path` must be present; `job_id` is handled
/// first and wins when both are supplied.
#[derive(Debug, Deserialize)]
pub struct CancelGenerationRequest {
/// If provided, cancel the specific job by id.
#[serde(default)]
pub job_id: Option<i32>,
/// If provided with `library`, cancel all running jobs for this file.
#[serde(default)]
pub file_path: Option<String>,
/// Library the `file_path` belongs to. The primary library is used when
/// this is absent or cannot be resolved.
#[serde(default)]
pub library: Option<String>,
}
/// POST /insights/generation/cancel - Cancel a running generation job.
/// Accepts either `job_id` or `file_path` + optional `library` in the body.
#[post("/insights/generation/cancel")]
pub async fn cancel_generation_handler(
_claims: Claims,
request: web::Json<CancelGenerationRequest>,
app_state: web::Data<AppState>,
) -> impl Responder {
let ctx = opentelemetry::Context::new();
if let Some(jid) = request.job_id {
let mut dao = app_state
.insight_job_dao
.lock()
.expect("Unable to lock InsightJobDao");
match dao.cancel_job(&ctx, jid) {
Ok(true) => {
let mut handles = app_state
.insight_job_handles
.lock()
.expect("Unable to lock InsightJobHandles");
if let Some(handle) = handles.remove(&jid) {
handle.abort();
}
return HttpResponse::Ok().json(serde_json::json!({
"success": true,
"message": format!("Job {} cancelled", jid)
}));
}
Ok(false) => {
return HttpResponse::Ok().json(serde_json::json!({
"success": true,
"message": format!("Job {} was not running", jid)
}));
}
Err(e) => {
log::error!("Failed to cancel job {}: {:?}", jid, e);
return HttpResponse::InternalServerError().json(serde_json::json!({
"error": "Failed to cancel job"
}));
}
}
}
if let Some(ref fp) = request.file_path {
let library = libraries::resolve_library_param(&app_state, request.library.as_deref())
.ok()
.flatten()
.unwrap_or_else(|| app_state.primary_library());
let normalized = normalize_path(fp);
// Get active job ids first, then cancel in DB, then abort tasks
let active_ids: Vec<i32> = {
let mut dao = app_state
.insight_job_dao
.lock()
.expect("Unable to lock InsightJobDao");
let ids = dao
.get_active_job(&ctx, library.id, &normalized)
.ok()
.flatten()
.map(|j| vec![j.id])
.unwrap_or_default();
let _ = dao.cancel_active_jobs(&ctx, library.id, &normalized);
ids
};
if active_ids.is_empty() {
return HttpResponse::Ok().json(serde_json::json!({
"success": true,
"message": "No running generation job for this file"
}));
}
for jid in &active_ids {
if let Some(handle) = app_state
.insight_job_handles
.lock()
.expect("Unable to lock InsightJobHandles")
.remove(jid)
{
handle.abort();
}
}
return HttpResponse::Ok().json(serde_json::json!({
"success": true,
"message": format!("Cancelled {} running job(s) for {}", active_ids.len(), normalized)
}));
}
HttpResponse::BadRequest().json(serde_json::json!({
"error": "Provide either job_id or file_path in the request body"
}))
}
/// JSON body for `POST /insights/rate`.
#[derive(Debug, Deserialize)]
pub struct RateInsightRequest {
/// Path of the rated photo; normalized with `normalize_path`. Used as
/// the lookup key only when `insight_id` is omitted.
pub file_path: String,
/// The rating to store: `true` for thumbs up, `false` for thumbs down.
pub approved: bool,
/// When set, rate this specific insight version by primary key
/// (used by the per-file history view to rate superseded versions).
/// When omitted, the current insight for `file_path` is rated.
#[serde(default)]
pub insight_id: Option<i32>,
}
/// Query string for `GET /insights/training-data`.
#[derive(Debug, Deserialize)]
pub struct ExportTrainingDataQuery {
/// Restrict the export to approved insights. The handler treats an
/// absent value as `true`.
#[serde(default)]
pub approved_only: Option<bool>,
}
/// Body of the `202 Accepted` reply sent by the generate endpoints once a
/// background generation job has been created.
#[derive(Debug, Serialize)]
pub struct JobIdResponse {
/// Id of the newly created job; usable with the status and cancel
/// endpoints.
pub job_id: i32,
}
/// Body returned by `GET /insights/generation/status` when a job row was
/// found. Optional fields are left out of the JSON entirely when `None`.
#[derive(Debug, Serialize)]
pub struct GenerationStatusResponse {
/// Id of the job being reported.
pub job_id: i32,
/// Job state, parsed from the stored status via `InsightJobStatus::parse`.
pub status: InsightJobStatus,
/// Start timestamp copied from the job row.
pub started_at: i64,
/// Completion timestamp copied from the job row, if any.
#[serde(skip_serializing_if = "Option::is_none")]
pub completed_at: Option<i64>,
/// Id of the insight the job produced, if any.
#[serde(skip_serializing_if = "Option::is_none")]
pub result_insight_id: Option<i32>,
/// Failure description recorded on the job row, if any.
#[serde(skip_serializing_if = "Option::is_none")]
pub error_message: Option<String>,
}
/// Client-facing view of a stored `PhotoInsight`, built through the
/// `From<PhotoInsight>` impl. Optional fields are left out of the JSON
/// entirely when `None`.
#[derive(Debug, Serialize)]
pub struct PhotoInsightResponse {
/// Primary key of the insight row.
pub id: i32,
/// Path of the photo the insight belongs to.
pub file_path: String,
/// Generated title.
pub title: String,
/// Generated summary text.
pub summary: String,
/// Generation timestamp copied from the insight row.
pub generated_at: i64,
/// Model version string recorded for the generation.
pub model_version: String,
/// Prompt-evaluation count recorded for the generation, if any.
#[serde(skip_serializing_if = "Option::is_none")]
pub prompt_eval_count: Option<i32>,
/// Evaluation count recorded for the generation, if any.
#[serde(skip_serializing_if = "Option::is_none")]
pub eval_count: Option<i32>,
/// Rating stored for the insight; absent when it has not been rated.
#[serde(skip_serializing_if = "Option::is_none")]
pub approved: Option<bool>,
/// Backend recorded for the generation.
pub backend: String,
/// True when the insight was generated agentically and a chat
/// continuation can be started against it. Drives the mobile chat button.
pub has_training_messages: bool,
/// `num_ctx` recorded for the generation, if any.
#[serde(skip_serializing_if = "Option::is_none")]
pub num_ctx: Option<i32>,
/// Sampling temperature recorded for the generation, if any.
#[serde(skip_serializing_if = "Option::is_none")]
pub temperature: Option<f32>,
/// `top_p` recorded for the generation, if any.
#[serde(skip_serializing_if = "Option::is_none")]
pub top_p: Option<f32>,
/// `top_k` recorded for the generation, if any.
#[serde(skip_serializing_if = "Option::is_none")]
pub top_k: Option<i32>,
/// `min_p` recorded for the generation, if any.
#[serde(skip_serializing_if = "Option::is_none")]
pub min_p: Option<f32>,
/// System prompt recorded for the generation, if any.
#[serde(skip_serializing_if = "Option::is_none")]
pub system_prompt: Option<String>,
/// Persona id recorded for the generation, if any.
#[serde(skip_serializing_if = "Option::is_none")]
pub persona_id: Option<String>,
}
impl From<PhotoInsight> for PhotoInsightResponse {
fn from(insight: PhotoInsight) -> Self {
PhotoInsightResponse {
id: insight.id,
file_path: insight.file_path,
title: insight.title,
summary: insight.summary,
generated_at: insight.generated_at,
model_version: insight.model_version,
prompt_eval_count: insight.prompt_eval_count,
eval_count: insight.eval_count,
approved: insight.approved,
has_training_messages: insight.training_messages.is_some(),
backend: insight.backend,
num_ctx: insight.num_ctx,
temperature: insight.temperature,
top_p: insight.top_p,
top_k: insight.top_k,
min_p: insight.min_p,
system_prompt: insight.system_prompt,
persona_id: insight.persona_id,
}
}
}
/// Envelope returned by `GET /insights/models`.
#[derive(Debug, Serialize)]
pub struct AvailableModelsResponse {
/// Models offered by the primary local server.
pub primary: ServerModels,
/// Models offered by the fallback server. Left out of the JSON when
/// `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub fallback: Option<ServerModels>,
}
/// Models available on one local inference server, as reported inside
/// `AvailableModelsResponse`.
#[derive(Debug, Serialize)]
pub struct ServerModels {
/// Base URL of the server.
pub url: String,
/// Models offered by the server together with their capabilities.
pub models: Vec<ModelCapabilities>,
/// Model the server uses when a request does not name one.
pub default_model: String,
}
/// POST /insights/generate - Generate insight for a specific photo (async)
///
/// Flow:
/// 1. Any job already active for the file (in the primary library) is
///    cancelled in the DB and its background task aborted.
/// 2. A new `Standard` job row is created; failure to do so answers 500.
/// 3. Generation is spawned in the background, bounded by
///    `INSIGHT_GENERATION_TIMEOUT_SECS` (default 120), and the job row is
///    marked completed or failed according to the outcome.
/// 4. The handler immediately answers `202 Accepted` with the job id.
///
/// Aborting the stored handle (via the cancel endpoint or a later
/// regenerate) stops the generation itself, not just the bookkeeping task.
#[post("/insights/generate")]
pub async fn generate_insight_handler(
    http_request: HttpRequest,
    _claims: Claims,
    request: web::Json<GeneratePhotoInsightRequest>,
    app_state: web::Data<AppState>,
) -> impl Responder {
    let parent_context = extract_context_from_request(&http_request);
    let tracer = global_tracer();
    let mut span = tracer.start_with_context("http.insights.generate", &parent_context);

    let normalized_path = normalize_path(&request.file_path);
    let library = app_state.primary_library();
    let gen_type = InsightGenerationType::Standard;

    span.set_attribute(KeyValue::new("file_path", normalized_path.clone()));
    if let Some(ref model) = request.model {
        span.set_attribute(KeyValue::new("model", model.clone()));
    }

    log::info!(
        "Manual insight generation triggered for photo: {} with model: {:?}",
        normalized_path,
        request.model
    );

    // Look up and abort any running job for this file, then cancel in DB
    let old_job_ids: Vec<i32> = {
        let mut dao = app_state
            .insight_job_dao
            .lock()
            .expect("Unable to lock InsightJobDao");
        let ctx = opentelemetry::Context::new();
        let ids = dao
            .get_active_job(&ctx, library.id, &normalized_path)
            .ok()
            .flatten()
            .map(|j| vec![j.id])
            .unwrap_or_default();
        let _ = dao.cancel_active_jobs(&ctx, library.id, &normalized_path);
        ids
    };
    for jid in &old_job_ids {
        if let Some(handle) = app_state
            .insight_job_handles
            .lock()
            .expect("Unable to lock InsightJobHandles")
            .remove(jid)
        {
            handle.abort();
        }
    }

    let job_id = {
        let mut dao = app_state
            .insight_job_dao
            .lock()
            .expect("Unable to lock InsightJobDao");
        match dao.create_job(
            &opentelemetry::Context::new(),
            library.id,
            &normalized_path,
            gen_type,
        ) {
            Ok(id) => id,
            Err(e) => {
                log::error!("Failed to create generation job: {:?}", e);
                span.set_status(Status::error("Failed to create generation job"));
                return HttpResponse::InternalServerError().json(serde_json::json!({
                    "error": "Failed to create generation job"
                }));
            }
        }
    };

    // Spawn background task with timeout
    let generator = app_state.insight_generator.clone();
    let job_dao = app_state.insight_job_dao.clone();
    let job_handles = app_state.insight_job_handles.clone();
    let path = normalized_path.clone();

    let handle = tokio::spawn(async move {
        let timeout_secs: u64 = std::env::var("INSIGHT_GENERATION_TIMEOUT_SECS")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(120);
        let path_for_task = path.clone();
        let generator_for_task = generator.clone();

        // The generation runs in its own task so that a panic surfaces as a
        // `JoinError` here instead of skipping the job bookkeeping below.
        // The task is owned by a `JoinSet` rather than a bare `JoinHandle`:
        // dropping a `JoinHandle` merely detaches its task, so aborting this
        // outer task would leave the generation running (and still able to
        // write an insight). Dropping a `JoinSet` aborts the tasks it owns.
        let mut generation = tokio::task::JoinSet::new();
        generation.spawn(async move {
            tokio::time::timeout(
                std::time::Duration::from_secs(timeout_secs),
                generator_for_task.generate_insight_for_photo_with_config(
                    &path_for_task,
                    request.model.clone(),
                    request.system_prompt.clone(),
                    request.num_ctx,
                    request.temperature,
                    request.top_p,
                    request.top_k,
                    request.min_p,
                ),
            )
            .await
        });
        let result = generation
            .join_next()
            .await
            .expect("JoinSet holds the generation task spawned just above");

        let ctx = opentelemetry::Context::new();
        let mut dao = job_dao.lock().expect("Unable to lock InsightJobDao");

        // Nesting, outermost first: task join / timeout / generation result.
        match result {
            Ok(Ok(Ok(()))) => {
                let mut insight_dao = generator
                    .insight_dao()
                    .lock()
                    .expect("Unable to lock InsightDao");
                let insight_id = insight_dao
                    .get_insight(&ctx, &path)
                    .ok()
                    .flatten()
                    .map(|i| i.id);
                if let Some(id) = insight_id {
                    if let Err(e) = dao.complete_job(&ctx, job_id, id) {
                        log::error!("Failed to mark job {} as completed: {:?}", job_id, e);
                    }
                } else if let Err(e) = dao.fail_job(&ctx, job_id, "generation returned no insight")
                {
                    log::error!("Failed to mark job {} as failed: {:?}", job_id, e);
                }
            }
            Ok(Ok(Err(e))) => {
                log::error!("Insight generation failed for {}: {:?}", path, e);
                if let Err(err) = dao.fail_job(&ctx, job_id, &format!("{:?}", e)) {
                    log::error!("Failed to mark job {} as failed: {:?}", job_id, err);
                }
            }
            Ok(Err(_)) => {
                log::error!(
                    "Insight generation timed out for {} after {}s",
                    path,
                    timeout_secs
                );
                if let Err(err) =
                    dao.fail_job(&ctx, job_id, &format!("timeout after {}s", timeout_secs))
                {
                    log::error!("Failed to mark job {} as failed: {:?}", job_id, err);
                }
            }
            Err(_) => {
                log::error!("Insight generation task panicked for {}", path);
                if let Err(err) = dao.fail_job(&ctx, job_id, "generation task panicked") {
                    log::error!("Failed to mark job {} as failed: {:?}", job_id, err);
                }
            }
        }

        // Remove handle from map on completion
        let mut handles = job_handles
            .lock()
            .expect("Unable to lock InsightJobHandles");
        handles.remove(&job_id);
    });

    // Store abort handle
    {
        let mut handles = app_state
            .insight_job_handles
            .lock()
            .expect("Unable to lock InsightJobHandles");
        handles.insert(job_id, handle.abort_handle());
    }

    span.set_attribute(KeyValue::new("job_id", job_id as i64));
    span.set_status(Status::Ok);

    HttpResponse::Accepted().json(JobIdResponse { job_id })
}
/// GET /insights?path=/path/to/photo.jpg - Fetch insight for specific photo
#[get("/insights")]
pub async fn get_insight_handler(
_claims: Claims,
query: web::Query<GetPhotoInsightQuery>,
app_state: web::Data<AppState>,
insight_dao: web::Data<std::sync::Mutex<Box<dyn InsightDao>>>,
exif_dao: web::Data<std::sync::Mutex<Box<dyn ExifDao>>>,
) -> impl Responder {
let normalized_path = normalize_path(&query.path);
log::debug!("Fetching insight for {}", normalized_path);
let otel_context = opentelemetry::Context::new();
// Expand to rel_paths sharing content so an insight generated under
// library 1 still shows when the same photo is viewed from library 2.
let library = libraries::resolve_library_param(&app_state, query.library.as_deref())
.ok()
.flatten()
.unwrap_or_else(|| app_state.primary_library());
let sibling_paths = {
let mut exif = exif_dao.lock().expect("Unable to lock ExifDao");
exif.get_rel_paths_sharing_content(&otel_context, library.id, &normalized_path)
.unwrap_or_else(|_| vec![normalized_path.clone()])
};
let mut dao = insight_dao.lock().expect("Unable to lock InsightDao");
match dao.get_insight_for_paths(&otel_context, &sibling_paths) {
Ok(Some(insight)) => HttpResponse::Ok().json(PhotoInsightResponse::from(insight)),
Ok(None) => HttpResponse::NotFound().json(serde_json::json!({
"error": "Insight not found"
})),
Err(e) => {
log::error!("Failed to fetch insight ({}): {:?}", &query.path, e);
HttpResponse::InternalServerError().json(serde_json::json!({
"error": format!("Failed to fetch insight: {:?}", e)
}))
}
}
}
/// DELETE /insights?path=/path/to/photo.jpg - Remove insight (will regenerate on next request)
#[delete("/insights")]
pub async fn delete_insight_handler(
_claims: Claims,
query: web::Query<GetPhotoInsightQuery>,
insight_dao: web::Data<std::sync::Mutex<Box<dyn InsightDao>>>,
) -> impl Responder {
let normalized_path = normalize_path(&query.path);
log::info!("Deleting insight for {}", normalized_path);
let otel_context = opentelemetry::Context::new();
let mut dao = insight_dao.lock().expect("Unable to lock InsightDao");
match dao.delete_insight(&otel_context, &normalized_path) {
Ok(()) => HttpResponse::Ok().json(serde_json::json!({
"success": true,
"message": "Insight deleted successfully"
})),
Err(e) => {
log::error!("Failed to delete insight: {:?}", e);
HttpResponse::InternalServerError().json(serde_json::json!({
"error": format!("Failed to delete insight: {:?}", e)
}))
}
}
}
/// GET /insights/all - Get all insights
#[get("/insights/all")]
pub async fn get_all_insights_handler(
_claims: Claims,
insight_dao: web::Data<std::sync::Mutex<Box<dyn InsightDao>>>,
) -> impl Responder {
log::debug!("Fetching all insights");
let otel_context = opentelemetry::Context::new();
let mut dao = insight_dao.lock().expect("Unable to lock InsightDao");
match dao.get_all_insights(&otel_context) {
Ok(insights) => {
let responses: Vec<PhotoInsightResponse> = insights
.into_iter()
.map(PhotoInsightResponse::from)
.collect();
HttpResponse::Ok().json(responses)
}
Err(e) => {
log::error!("Failed to fetch all insights: {:?}", e);
HttpResponse::InternalServerError().json(serde_json::json!({
"error": format!("Failed to fetch insights: {:?}", e)
}))
}
}
}
/// GET /insights/history?path=/path/to/photo.jpg - Get all insight versions
/// for a single photo (current plus previously generated/superseded ones),
/// newest first. Backs the per-file insight history view.
#[get("/insights/history")]
pub async fn get_insight_history_handler(
_claims: Claims,
query: web::Query<GetPhotoInsightQuery>,
insight_dao: web::Data<std::sync::Mutex<Box<dyn InsightDao>>>,
) -> impl Responder {
let normalized_path = normalize_path(&query.path);
log::debug!("Fetching insight history for {}", normalized_path);
let otel_context = opentelemetry::Context::new();
let mut dao = insight_dao.lock().expect("Unable to lock InsightDao");
match dao.get_insight_history(&otel_context, &normalized_path) {
Ok(insights) => {
let responses: Vec<PhotoInsightResponse> = insights
.into_iter()
.map(PhotoInsightResponse::from)
.collect();
HttpResponse::Ok().json(responses)
}
Err(e) => {
log::error!("Failed to fetch insight history ({}): {:?}", &query.path, e);
HttpResponse::InternalServerError().json(serde_json::json!({
"error": format!("Failed to fetch insight history: {:?}", e)
}))
}
}
}
/// POST /insights/generate/agentic - Generate insight using agentic tool-calling loop (async)
///
/// Flow:
/// 1. Any job already active for the file (in the primary library) is
///    cancelled in the DB and its background task aborted.
/// 2. A new `Agentic` job row is created; failure to do so answers 500.
/// 3. Few-shot exemplars are resolved: the request's `fewshot_insight_ids`
///    when non-empty, otherwise `DEFAULT_FEWSHOT_INSIGHT_IDS`; only the
///    first two ids are used, and ids without parseable
///    `training_messages` are skipped.
/// 4. The persona id defaults to `"default"` when absent or blank, and the
///    system prompt is resolved as: non-blank request `system_prompt`,
///    else the persona's stored prompt, else `None`.
/// 5. Generation is spawned in the background, bounded by
///    `INSIGHT_GENERATION_TIMEOUT_SECS` (default 180) and
///    `AGENTIC_MAX_ITERATIONS` (default 12); the job row is marked
///    completed or failed according to the outcome.
/// 6. The handler immediately answers `202 Accepted` with the job id.
///
/// Aborting the stored handle (via the cancel endpoint or a later
/// regenerate) stops the generation itself, not just the bookkeeping task.
#[post("/insights/generate/agentic")]
pub async fn generate_agentic_insight_handler(
    http_request: HttpRequest,
    claims: Claims,
    request: web::Json<GeneratePhotoInsightRequest>,
    app_state: web::Data<AppState>,
) -> impl Responder {
    let parent_context = extract_context_from_request(&http_request);
    let tracer = global_tracer();
    let mut span = tracer.start_with_context("http.insights.generate_agentic", &parent_context);

    let normalized_path = normalize_path(&request.file_path);
    let library = app_state.primary_library();
    let gen_type = InsightGenerationType::Agentic;

    span.set_attribute(KeyValue::new("file_path", normalized_path.clone()));
    if let Some(ref model) = request.model {
        span.set_attribute(KeyValue::new("model", model.clone()));
    }
    if let Some(ref backend) = request.backend {
        span.set_attribute(KeyValue::new("backend", backend.clone()));
    }

    log::info!(
        "Agentic insight generation triggered for photo: {} with model: {:?}",
        normalized_path,
        request.model
    );

    // Look up and abort any running job for this file, then cancel in DB
    let old_job_ids: Vec<i32> = {
        let mut dao = app_state
            .insight_job_dao
            .lock()
            .expect("Unable to lock InsightJobDao");
        let ctx = opentelemetry::Context::new();
        let ids = dao
            .get_active_job(&ctx, library.id, &normalized_path)
            .ok()
            .flatten()
            .map(|j| vec![j.id])
            .unwrap_or_default();
        let _ = dao.cancel_active_jobs(&ctx, library.id, &normalized_path);
        ids
    };
    for jid in &old_job_ids {
        if let Some(handle) = app_state
            .insight_job_handles
            .lock()
            .expect("Unable to lock InsightJobHandles")
            .remove(jid)
        {
            handle.abort();
        }
    }

    let job_id = {
        let mut dao = app_state
            .insight_job_dao
            .lock()
            .expect("Unable to lock InsightJobDao");
        match dao.create_job(
            &opentelemetry::Context::new(),
            library.id,
            &normalized_path,
            gen_type,
        ) {
            Ok(id) => id,
            Err(e) => {
                log::error!("Failed to create agentic generation job: {:?}", e);
                span.set_status(Status::error("Failed to create generation job"));
                return HttpResponse::InternalServerError().json(serde_json::json!({
                    "error": "Failed to create generation job"
                }));
            }
        }
    };

    // Resolve few-shot ids for the background task
    let fewshot_ids: Vec<i32> = match request.fewshot_insight_ids.as_deref() {
        Some(ids) if !ids.is_empty() => ids.iter().take(2).copied().collect(),
        _ => DEFAULT_FEWSHOT_INSIGHT_IDS
            .iter()
            .take(2)
            .copied()
            .collect(),
    };

    // Load the exemplar transcripts; ids that are missing, have no
    // training_messages, or hold malformed JSON are skipped.
    let fewshot_examples: Vec<Vec<ChatMessage>> = {
        let otel_context = opentelemetry::Context::new();
        let mut dao = app_state
            .insight_chat
            .insight_dao()
            .lock()
            .expect("Unable to lock InsightDao");
        fewshot_ids
            .iter()
            .filter_map(|id| {
                let insight = dao.get_insight_by_id(&otel_context, *id).ok().flatten()?;
                let json = insight.training_messages?;
                match serde_json::from_str::<Vec<ChatMessage>>(&json) {
                    Ok(msgs) => Some(msgs),
                    Err(e) => {
                        log::warn!(
                            "Few-shot insight {} has malformed training_messages: {}",
                            id,
                            e
                        );
                        None
                    }
                }
            })
            .collect()
    };

    // NOTE(review): a non-numeric `claims.sub` silently maps to user id 1 —
    // confirm this fallback is intended.
    let user_id = claims.sub.parse::<i32>().unwrap_or(1);
    let persona_id = request
        .persona_id
        .clone()
        .filter(|s| !s.trim().is_empty())
        .unwrap_or_else(|| "default".to_string());

    // Server-side persona resolution: an explicit client `system_prompt`
    // wins; otherwise the persona's stored prompt from the persona store;
    // otherwise None and `build_system_content` applies its neutral
    // default. Without the lookup, a request carrying only `persona_id`
    // silently generated in the default voice.
    let system_prompt = request
        .system_prompt
        .clone()
        .filter(|s| !s.trim().is_empty())
        .or_else(|| {
            app_state
                .insight_generator
                .persona_system_prompt(user_id, &persona_id)
        });

    let max_iterations: usize = std::env::var("AGENTIC_MAX_ITERATIONS")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(12);

    // Spawn background task with timeout
    let generator = app_state.insight_generator.clone();
    let job_dao = app_state.insight_job_dao.clone();
    let job_handles = app_state.insight_job_handles.clone();
    let path = normalized_path.clone();

    let handle = tokio::spawn(async move {
        let timeout_secs: u64 = std::env::var("INSIGHT_GENERATION_TIMEOUT_SECS")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(180);
        let path_for_task = path.clone();
        let generator_for_task = generator.clone();

        // The generation runs in its own task so that a panic surfaces as a
        // `JoinError` here instead of skipping the job bookkeeping below.
        // The task is owned by a `JoinSet` rather than a bare `JoinHandle`:
        // dropping a `JoinHandle` merely detaches its task, so aborting this
        // outer task would leave the agentic loop running (and still able
        // to write an insight). Dropping a `JoinSet` aborts the tasks it owns.
        let mut generation = tokio::task::JoinSet::new();
        generation.spawn(async move {
            tokio::time::timeout(
                std::time::Duration::from_secs(timeout_secs),
                generator_for_task.generate_agentic_insight_for_photo(
                    &path_for_task,
                    request.model.clone(),
                    system_prompt,
                    request.num_ctx,
                    request.temperature,
                    request.top_p,
                    request.top_k,
                    request.min_p,
                    max_iterations,
                    request.backend.clone(),
                    fewshot_examples,
                    fewshot_ids,
                    user_id,
                    persona_id,
                ),
            )
            .await
        });
        let result = generation
            .join_next()
            .await
            .expect("JoinSet holds the generation task spawned just above");

        let ctx = opentelemetry::Context::new();
        let mut dao = job_dao.lock().expect("Unable to lock InsightJobDao");

        // Nesting, outermost first: task join / timeout / generation result.
        match result {
            Ok(Ok(Ok((Some(insight_id), _, _)))) => {
                if let Err(e) = dao.complete_job(&ctx, job_id, insight_id) {
                    log::error!("Failed to mark job {} as completed: {:?}", job_id, e);
                }
            }
            Ok(Ok(Ok((None, _, _)))) => {
                if let Err(e) = dao.fail_job(&ctx, job_id, "agentic generation returned no insight")
                {
                    log::error!("Failed to mark job {} as failed: {:?}", job_id, e);
                }
            }
            Ok(Ok(Err(e))) => {
                log::error!("Agentic insight generation failed for {}: {:?}", path, e);
                if let Err(err) = dao.fail_job(&ctx, job_id, &format!("{:?}", e)) {
                    log::error!("Failed to mark job {} as failed: {:?}", job_id, err);
                }
            }
            Ok(Err(_)) => {
                log::error!(
                    "Agentic insight generation timed out for {} after {}s",
                    path,
                    timeout_secs
                );
                if let Err(err) =
                    dao.fail_job(&ctx, job_id, &format!("timeout after {}s", timeout_secs))
                {
                    log::error!("Failed to mark job {} as failed: {:?}", job_id, err);
                }
            }
            Err(_) => {
                log::error!("Agentic insight generation task panicked for {}", path);
                if let Err(err) = dao.fail_job(&ctx, job_id, "generation task panicked") {
                    log::error!("Failed to mark job {} as failed: {:?}", job_id, err);
                }
            }
        }

        // Remove handle from map on completion
        let mut handles = job_handles
            .lock()
            .expect("Unable to lock InsightJobHandles");
        handles.remove(&job_id);
    });

    // Store abort handle
    {
        let mut handles = app_state
            .insight_job_handles
            .lock()
            .expect("Unable to lock InsightJobHandles");
        handles.insert(job_id, handle.abort_handle());
    }

    span.set_attribute(KeyValue::new("job_id", job_id as i64));
    span.set_status(Status::Ok);

    HttpResponse::Accepted().json(JobIdResponse { job_id })
}
/// GET /insights/models - Local-backend models with capabilities. Returns
/// Ollama servers when `LLM_BACKEND=ollama` (default), or llama-swap slots
/// when `LLM_BACKEND=llamacpp`. Same envelope shape either way so the
/// client picker doesn't have to branch on backend kind.
///
/// For llama-swap: `models` comes verbatim from `LLAMA_SWAP_ALLOWED_MODELS`
/// (no live `/v1/models` probe), `has_vision` is true only for the
/// configured `LLAMA_SWAP_VISION_MODEL` slot id, and `has_tool_calling` is
/// reported as true for every slot (llama-server is launched with `--jinja`
/// by convention — a misconfigured slot surfaces as a chat-call error).
///
/// For Ollama: a server whose model listing fails is logged and degraded
/// rather than failing the request — the primary is reported with an empty
/// model list, the fallback is omitted.
#[get("/insights/models")]
pub async fn get_available_models_handler(
    _claims: Claims,
    app_state: web::Data<crate::state::AppState>,
) -> impl Responder {
    log::debug!("Fetching available models with capabilities");

    if crate::ai::local_backend_is_llamacpp()
        && let Some(lc) = app_state.llamacpp.as_ref()
    {
        // Only the configured vision slot may advertise vision support;
        // previously every slot was reported as vision-capable.
        // NOTE(review): read straight from the environment because the
        // parsed value is not visible from this handler — if AppState
        // already carries the vision slot id, prefer that.
        let vision_slot = std::env::var("LLAMA_SWAP_VISION_MODEL")
            .ok()
            .map(|slot| slot.trim().to_owned())
            .filter(|slot| !slot.is_empty());

        let models: Vec<ModelCapabilities> = app_state
            .llamacpp_allowed_models
            .iter()
            .map(|name| ModelCapabilities {
                name: name.clone(),
                has_vision: vision_slot.as_deref() == Some(name.as_str()),
                has_tool_calling: true,
            })
            .collect();
        let primary = ServerModels {
            url: lc.base_url.clone(),
            models,
            default_model: lc.primary_model.clone(),
        };
        return HttpResponse::Ok().json(AvailableModelsResponse {
            primary,
            fallback: None,
        });
    }

    let ollama_client = &app_state.ollama;

    // Fetch models with capabilities from primary server
    let primary_models =
        match OllamaClient::list_models_with_capabilities(&ollama_client.primary_url).await {
            Ok(models) => models,
            Err(e) => {
                log::warn!("Failed to fetch models from primary server: {:?}", e);
                vec![]
            }
        };
    let primary = ServerModels {
        url: ollama_client.primary_url.clone(),
        models: primary_models,
        default_model: ollama_client.primary_model.clone(),
    };

    // Fetch models with capabilities from fallback server if configured
    let fallback = if let Some(fallback_url) = &ollama_client.fallback_url {
        match OllamaClient::list_models_with_capabilities(fallback_url).await {
            Ok(models) => Some(ServerModels {
                url: fallback_url.clone(),
                models,
                // Without a dedicated fallback model the primary one is used.
                default_model: ollama_client
                    .fallback_model
                    .clone()
                    .unwrap_or_else(|| ollama_client.primary_model.clone()),
            }),
            Err(e) => {
                log::warn!("Failed to fetch models from fallback server: {:?}", e);
                None
            }
        }
    } else {
        None
    };

    let response = AvailableModelsResponse { primary, fallback };
    HttpResponse::Ok().json(response)
}
/// Body returned by `GET /insights/openrouter/models`.
#[derive(Debug, Serialize)]
pub struct OpenRouterModelsResponse {
/// Model ids clients may pick from, copied from the server's allow-list.
pub models: Vec<String>,
/// Primary model of the OpenRouter client; `None` when OpenRouter is not
/// configured.
pub default_model: Option<String>,
/// Whether an OpenRouter client is configured on this server.
pub configured: bool,
}
/// GET /insights/openrouter/models - Curated OpenRouter model ids exposed
/// to clients for the hybrid backend. Returned verbatim from
/// `OPENROUTER_ALLOWED_MODELS`; no live call to OpenRouter.
///
/// Always answers 200; `configured` tells the client whether an
/// OpenRouter client exists on this server.
#[get("/insights/openrouter/models")]
pub async fn get_openrouter_models_handler(
    _claims: Claims,
    app_state: web::Data<crate::state::AppState>,
) -> impl Responder {
    let client = app_state.openrouter.as_ref();

    HttpResponse::Ok().json(OpenRouterModelsResponse {
        models: app_state.openrouter_allowed_models.clone(),
        default_model: client.map(|c| c.primary_model.clone()),
        configured: client.is_some(),
    })
}
/// POST /insights/rate - Rate an insight (thumbs up/down for training data)
#[post("/insights/rate")]
pub async fn rate_insight_handler(
_claims: Claims,
request: web::Json<RateInsightRequest>,
insight_dao: web::Data<std::sync::Mutex<Box<dyn InsightDao>>>,
) -> impl Responder {
let normalized_path = normalize_path(&request.file_path);
log::info!(
"Rating insight for {} (id={:?}): approved={}",
normalized_path,
request.insight_id,
request.approved
);
let otel_context = opentelemetry::Context::new();
let mut dao = insight_dao.lock().expect("Unable to lock InsightDao");
// Rate a specific version by id when provided (history view), otherwise
// rate the current insight for the path.
let result = match request.insight_id {
Some(id) => dao.rate_insight_by_id(&otel_context, id, request.approved),
None => dao.rate_insight(&otel_context, &normalized_path, request.approved),
};
match result {
Ok(()) => HttpResponse::Ok().json(serde_json::json!({
"success": true,
"message": "Insight rated successfully"
})),
Err(e) => {
log::error!("Failed to rate insight: {:?}", e);
HttpResponse::InternalServerError().json(serde_json::json!({
"error": format!("Failed to rate insight: {:?}", e)
}))
}
}
}
/// GET /insights/training-data - Export training data as JSONL.
///
/// The `approved_only` query flag (default `true`) selects between approved
/// insights and all insights. One JSON object is written per line for every
/// selected insight that has stored `training_messages`; insights without
/// them are skipped.
///
/// A row whose `training_messages` is not valid JSON is still exported, with
/// `"messages": null`, and a warning is logged so the corrupt row can be
/// located instead of silently polluting the export.
///
/// Responds 200 with an `application/jsonl` attachment, or 500 with the
/// debug-formatted DAO error.
///
/// # Panics
/// Panics if the `InsightDao` mutex is poisoned.
#[get("/insights/training-data")]
pub async fn export_training_data_handler(
    _claims: Claims,
    query: web::Query<ExportTrainingDataQuery>,
    insight_dao: web::Data<std::sync::Mutex<Box<dyn InsightDao>>>,
) -> impl Responder {
    let approved_only = query.approved_only.unwrap_or(true);
    log::info!("Exporting training data (approved_only={})", approved_only);
    let otel_context = opentelemetry::Context::new();
    let mut dao = insight_dao.lock().expect("Unable to lock InsightDao");
    let insights = if approved_only {
        dao.get_approved_insights(&otel_context)
    } else {
        dao.get_all_insights(&otel_context)
    };
    match insights {
        Ok(insights) => {
            let mut jsonl = String::new();
            for insight in &insights {
                // Only insights with a stored transcript are exportable.
                let Some(raw_messages) = &insight.training_messages else {
                    continue;
                };
                // Keep the row (with null messages) on a parse failure, but
                // make the failure visible in the logs.
                let messages = serde_json::from_str::<serde_json::Value>(raw_messages)
                    .unwrap_or_else(|e| {
                        log::warn!(
                            "Malformed training_messages for {:?}: {}; exporting null messages",
                            insight.file_path,
                            e
                        );
                        serde_json::Value::Null
                    });
                let entry = serde_json::json!({
                    "file_path": insight.file_path,
                    "model_version": insight.model_version,
                    "generated_at": insight.generated_at,
                    "title": insight.title,
                    "summary": insight.summary,
                    "messages": messages,
                });
                jsonl.push_str(&entry.to_string());
                jsonl.push('\n');
            }
            HttpResponse::Ok()
                .content_type("application/jsonl")
                .insert_header((
                    "Content-Disposition",
                    "attachment; filename=\"training_data.jsonl\"",
                ))
                .body(jsonl)
        }
        Err(e) => {
            log::error!("Failed to export training data: {:?}", e);
            HttpResponse::InternalServerError().json(serde_json::json!({
                "error": format!("Failed to export training data: {:?}", e)
            }))
        }
    }
}
/// JSON request body shared by `POST /insights/chat`,
/// `POST /insights/chat/stream` and `POST /insights/chat/turn`. Those
/// handlers copy every field except `library` into a `ChatTurnRequest`.
#[derive(Debug, Deserialize)]
pub struct ChatTurnHttpRequest {
/// File path identifying the photo whose conversation this turn targets.
pub file_path: String,
/// Optional library selector, resolved by the handlers through
/// `libraries::resolve_library_param`; the primary library is used when it
/// resolves to none.
#[serde(default)]
pub library: Option<String>,
/// The user's message for this turn.
pub user_message: String,
// The optional settings below (model, backend and generation parameters)
// are forwarded unchanged to the chat service.
#[serde(default)]
pub model: Option<String>,
#[serde(default)]
pub backend: Option<String>,
#[serde(default)]
pub num_ctx: Option<i32>,
#[serde(default)]
pub temperature: Option<f32>,
#[serde(default)]
pub top_p: Option<f32>,
#[serde(default)]
pub top_k: Option<i32>,
#[serde(default)]
pub min_p: Option<f32>,
#[serde(default)]
pub max_iterations: Option<usize>,
/// Per-turn system-prompt override. Ephemeral in append mode,
/// persisted in amend / regenerate mode. See ChatTurnRequest for
/// semantics. Also seeds the bootstrap path when no insight exists.
#[serde(default)]
pub system_prompt: Option<String>,
/// Active persona id for this turn. New facts/recalls scope to it.
/// Defaults to `"default"` when missing.
#[serde(default)]
pub persona_id: Option<String>,
/// Amend-mode flag, forwarded unchanged; `false` when absent from the
/// body. See ChatTurnRequest for semantics.
#[serde(default)]
pub amend: bool,
/// When true, force the bootstrap path even if an insight already
/// exists: flip the existing row(s) to `is_current=false` and create
/// a new insight row from this turn. Takes precedence over `amend`.
/// Collapses to a normal bootstrap when no insight exists.
#[serde(default)]
pub regenerate: bool,
}
/// JSON body returned by `POST /insights/chat`, populated field-for-field
/// from the chat service's turn result. The three `Option` fields are left
/// out of the JSON entirely when `None`.
#[derive(Debug, Serialize)]
pub struct ChatTurnHttpResponse {
/// The assistant's reply text for this turn.
pub assistant_message: String,
/// Tool-call count reported by the turn result.
pub tool_calls_made: usize,
/// Iteration count reported by the turn result.
pub iterations_used: usize,
/// Truncation flag reported by the turn result.
pub truncated: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub prompt_eval_count: Option<i32>,
#[serde(skip_serializing_if = "Option::is_none")]
pub eval_count: Option<i32>,
#[serde(skip_serializing_if = "Option::is_none")]
pub amended_insight_id: Option<i32>,
/// Copied from the result's `backend_used`.
pub backend: String,
/// Copied from the result's `model_used`.
pub model: String,
}
/// POST /insights/chat — submit a follow-up turn against an existing insight.
#[post("/insights/chat")]
pub async fn chat_turn_handler(
http_request: HttpRequest,
claims: Claims,
request: web::Json<ChatTurnHttpRequest>,
app_state: web::Data<AppState>,
) -> impl Responder {
let parent_context = extract_context_from_request(&http_request);
let tracer = global_tracer();
let mut span = tracer.start_with_context("http.insights.chat", &parent_context);
span.set_attribute(KeyValue::new("file_path", request.file_path.clone()));
let library = match libraries::resolve_library_param(&app_state, request.library.as_deref()) {
Ok(Some(lib)) => lib,
Ok(None) => app_state.primary_library(),
Err(e) => {
return HttpResponse::BadRequest().json(serde_json::json!({
"error": format!("invalid library: {}", e)
}));
}
};
// Service-token claims (sub: "service:apollo") fall through to
// user_id=1 — the operator convention. Mobile/web clients have a
// numeric sub. Required for the entity_facts composite FK.
let user_id = claims.sub.parse::<i32>().unwrap_or(1);
let chat_req = ChatTurnRequest {
library_id: library.id,
user_id,
file_path: request.file_path.clone(),
user_message: request.user_message.clone(),
model: request.model.clone(),
backend: request.backend.clone(),
num_ctx: request.num_ctx,
temperature: request.temperature,
top_p: request.top_p,
top_k: request.top_k,
min_p: request.min_p,
max_iterations: request.max_iterations,
system_prompt: request.system_prompt.clone(),
persona_id: request.persona_id.clone(),
amend: request.amend,
regenerate: request.regenerate,
};
match app_state.insight_chat.chat_turn(chat_req).await {
Ok(result) => {
span.set_status(Status::Ok);
HttpResponse::Ok().json(ChatTurnHttpResponse {
assistant_message: result.assistant_message,
tool_calls_made: result.tool_calls_made,
iterations_used: result.iterations_used,
truncated: result.truncated,
prompt_eval_count: result.prompt_eval_count,
eval_count: result.eval_count,
amended_insight_id: result.amended_insight_id,
backend: result.backend_used,
model: result.model_used,
})
}
Err(e) => {
let msg = format!("{}", e);
log::error!("Chat turn failed: {}", msg);
span.set_status(Status::error(msg.clone()));
// Map well-known errors to client-facing 4xx codes.
if msg.contains("no insight found") {
HttpResponse::NotFound().json(serde_json::json!({ "error": msg }))
} else if msg.contains("no chat history") {
HttpResponse::Conflict().json(serde_json::json!({ "error": msg }))
} else if msg.contains("user_message")
|| msg.contains("unknown backend")
|| msg.contains("switching from local to hybrid")
|| msg.contains("hybrid backend unavailable")
{
HttpResponse::BadRequest().json(serde_json::json!({ "error": msg }))
} else {
HttpResponse::InternalServerError().json(serde_json::json!({ "error": msg }))
}
}
}
}
/// Query parameters for `GET /insights/chat/history`.
#[derive(Debug, Deserialize)]
pub struct ChatHistoryQuery {
/// File path of the photo whose transcript is requested.
pub path: String,
/// Optional library selector; the handler uses the primary library when
/// this is absent or cannot be resolved.
#[serde(default)]
pub library: Option<String>,
}
/// JSON body returned by `GET /insights/chat/history`, built from the view
/// that the chat service's `load_history` returns.
#[derive(Debug, Serialize)]
pub struct ChatHistoryHttpResponse {
/// Rendered transcript messages, in the order the view supplies them.
pub messages: Vec<RenderedHistoryMessage>,
/// Copied from the view's `turn_count`.
pub turn_count: usize,
/// Copied from the view's `model_version`.
pub model_version: String,
/// Copied from the view's `backend`.
pub backend: String,
}
/// One message of the rendered chat transcript as sent to clients.
#[derive(Debug, Serialize)]
pub struct RenderedHistoryMessage {
/// Message role string, passed through from the history view.
pub role: String,
/// Message text.
pub content: String,
/// Passed through from the history view's `is_initial` flag.
pub is_initial: bool,
/// Tool invocations attached to this message; omitted from the JSON when
/// empty.
#[serde(skip_serializing_if = "Vec::is_empty")]
pub tools: Vec<HistoryToolInvocation>,
}
/// A single tool call and its result, as shown inside a rendered history
/// message.
#[derive(Debug, Serialize)]
pub struct HistoryToolInvocation {
/// Tool name.
pub name: String,
/// Arguments of the call as arbitrary JSON.
pub arguments: serde_json::Value,
/// Result text of the call.
pub result: String,
/// Truncation flag for `result`. `Not::not` is the skip predicate, so the
/// field is serialized only when it is `true`.
#[serde(skip_serializing_if = "std::ops::Not::not")]
pub result_truncated: bool,
}
/// JSON request body for `POST /insights/chat/rewind`.
#[derive(Debug, Deserialize)]
pub struct ChatRewindHttpRequest {
/// File path of the photo whose stored conversation is rewound.
pub file_path: String,
/// Optional library selector; the primary library is used when it
/// resolves to none.
#[serde(default)]
pub library: Option<String>,
/// 0-based index into the rendered transcript. The message at this
/// index, and everything after it, is discarded. Must be > 0 — the
/// initial user message is protected.
pub discard_from_rendered_index: usize,
}
/// POST /insights/chat/rewind — truncate the stored conversation so the
/// rendered message at `discard_from_rendered_index` (and everything after)
/// is removed. Use when a user wants to retry a turn with a different
/// prompt without prior replies poisoning context.
#[post("/insights/chat/rewind")]
pub async fn chat_rewind_handler(
_claims: Claims,
request: web::Json<ChatRewindHttpRequest>,
app_state: web::Data<AppState>,
) -> impl Responder {
let library = match libraries::resolve_library_param(&app_state, request.library.as_deref()) {
Ok(Some(lib)) => lib,
Ok(None) => app_state.primary_library(),
Err(e) => {
return HttpResponse::BadRequest().json(serde_json::json!({
"error": format!("invalid library: {}", e)
}));
}
};
match app_state
.insight_chat
.rewind_history(
library.id,
&request.file_path,
request.discard_from_rendered_index,
)
.await
{
Ok(()) => HttpResponse::Ok().json(serde_json::json!({ "success": true })),
Err(e) => {
let msg = format!("{}", e);
log::error!("Chat rewind failed: {}", msg);
if msg.contains("no insight found") {
HttpResponse::NotFound().json(serde_json::json!({ "error": msg }))
} else if msg.contains("no chat history") {
HttpResponse::Conflict().json(serde_json::json!({ "error": msg }))
} else if msg.contains("cannot discard the initial") || msg.contains("out of range") {
HttpResponse::BadRequest().json(serde_json::json!({ "error": msg }))
} else {
HttpResponse::InternalServerError().json(serde_json::json!({ "error": msg }))
}
}
}
}
/// GET /insights/chat/history — return the rendered transcript for a photo.
#[get("/insights/chat/history")]
pub async fn chat_history_handler(
_claims: Claims,
query: web::Query<ChatHistoryQuery>,
app_state: web::Data<AppState>,
) -> impl Responder {
// library_id scopes the lookup so a regenerate on this library
// isn't shadowed by an untouched is_current=true row in another
// library for the same rel_path. load_history falls back to the
// cross-library lookup when the scoped one misses, so a photo
// with no insight in this library but one in another still
// surfaces (the "show this photo's primary insight" merge case).
let library = libraries::resolve_library_param(&app_state, query.library.as_deref())
.ok()
.flatten()
.unwrap_or_else(|| app_state.primary_library());
match app_state.insight_chat.load_history(library.id, &query.path) {
Ok(view) => HttpResponse::Ok().json(ChatHistoryHttpResponse {
messages: view
.messages
.into_iter()
.map(|m| RenderedHistoryMessage {
role: m.role,
content: m.content,
is_initial: m.is_initial,
tools: m
.tools
.into_iter()
.map(|t| HistoryToolInvocation {
name: t.name,
arguments: t.arguments,
result: t.result,
result_truncated: t.result_truncated,
})
.collect(),
})
.collect(),
turn_count: view.turn_count,
model_version: view.model_version,
backend: view.backend,
}),
Err(e) => {
let msg = format!("{}", e);
if msg.contains("no insight found") {
HttpResponse::NotFound().json(serde_json::json!({ "error": msg }))
} else if msg.contains("no chat history") {
HttpResponse::Conflict().json(serde_json::json!({ "error": msg }))
} else {
HttpResponse::InternalServerError().json(serde_json::json!({ "error": msg }))
}
}
}
}
/// POST /insights/chat/stream — streaming variant of /insights/chat.
/// Returns `text/event-stream` with one event per chat stream event.
#[post("/insights/chat/stream")]
pub async fn chat_stream_handler(
claims: Claims,
request: web::Json<ChatTurnHttpRequest>,
app_state: web::Data<AppState>,
) -> HttpResponse {
let library = match libraries::resolve_library_param(&app_state, request.library.as_deref()) {
Ok(Some(lib)) => lib,
Ok(None) => app_state.primary_library(),
Err(e) => {
return HttpResponse::BadRequest().json(serde_json::json!({
"error": format!("invalid library: {}", e)
}));
}
};
// Service-token sub falls through to user_id=1 (see chat_turn_handler).
let user_id = claims.sub.parse::<i32>().unwrap_or(1);
let chat_req = ChatTurnRequest {
library_id: library.id,
user_id,
file_path: request.file_path.clone(),
user_message: request.user_message.clone(),
model: request.model.clone(),
backend: request.backend.clone(),
num_ctx: request.num_ctx,
temperature: request.temperature,
top_p: request.top_p,
top_k: request.top_k,
min_p: request.min_p,
max_iterations: request.max_iterations,
system_prompt: request.system_prompt.clone(),
persona_id: request.persona_id.clone(),
amend: request.amend,
regenerate: request.regenerate,
};
let service = app_state.insight_chat.clone();
let events = service.chat_turn_stream(chat_req);
// Map ChatStreamEvent → SSE frame bytes.
let sse_stream = futures::stream::StreamExt::map(events, |ev| {
let frame = render_sse_frame(&ev);
Ok::<_, actix_web::Error>(actix_web::web::Bytes::from(frame))
});
HttpResponse::Ok()
.content_type("text/event-stream")
.insert_header(("Cache-Control", "no-cache"))
.insert_header(("X-Accel-Buffering", "no")) // nginx: disable response buffering
.streaming(sse_stream)
}
/// Render one chat stream event as a complete SSE frame
/// (`event: <name>\ndata: <json>\n\n`). If the payload cannot be serialized,
/// the data line falls back to `{}`.
fn render_sse_frame(ev: &ChatStreamEvent) -> String {
    let (event_name, payload) = sse_event_payload(ev);
    let data = match serde_json::to_string(&payload) {
        Ok(json) => json,
        Err(_) => "{}".to_string(),
    };
    format!("event: {}\ndata: {}\n\n", event_name, data)
}
/// Variant of `render_sse_frame` that also writes the event's absolute
/// sequence number into the payload under `seq`, so a reconnecting replay
/// client can compute `skip_before` precisely. `seq` is separate from the
/// tool-pairing `index` that `tool_call`/`tool_result` payloads already
/// carry; it is only added when the payload is a JSON object.
fn render_indexed_frame(ev: &ChatStreamEvent, seq: u32) -> String {
    let (event_name, mut payload) = sse_event_payload(ev);
    if let Some(fields) = payload.as_object_mut() {
        fields.insert("seq".to_string(), serde_json::json!(seq));
    }
    let data = match serde_json::to_string(&payload) {
        Ok(json) => json,
        Err(_) => "{}".to_string(),
    };
    format!("event: {}\ndata: {}\n\n", event_name, data)
}
fn sse_event_payload(ev: &ChatStreamEvent) -> (&'static str, serde_json::Value) {
match ev {
ChatStreamEvent::IterationStart { n, max } => {
("iteration_start", serde_json::json!({ "n": n, "max": max }))
}
ChatStreamEvent::Truncated => ("truncated", serde_json::json!({})),
ChatStreamEvent::TextDelta(delta) => ("text", serde_json::json!({ "delta": delta })),
ChatStreamEvent::ToolCall {
index,
name,
arguments,
} => (
"tool_call",
serde_json::json!({ "index": index, "name": name, "arguments": arguments }),
),
ChatStreamEvent::ToolResult {
index,
name,
result,
result_truncated,
} => (
"tool_result",
serde_json::json!({
"index": index,
"name": name,
"result": result,
"result_truncated": result_truncated,
}),
),
ChatStreamEvent::Done {
tool_calls_made,
iterations_used,
truncated,
prompt_tokens,
eval_tokens,
num_ctx,
amended_insight_id,
backend_used,
model_used,
cancelled,
} => (
"done",
serde_json::json!({
"tool_calls_made": tool_calls_made,
"iterations_used": iterations_used,
"truncated": truncated,
"prompt_tokens": prompt_tokens,
"eval_tokens": eval_tokens,
"num_ctx": num_ctx,
"amended_insight_id": amended_insight_id,
"backend": backend_used,
"model": model_used,
"cancelled": cancelled,
}),
),
// Apollo's frontend SSE consumer (and its free-chat backend, which
// is the de-facto convention) listens for `error_message`. Emitting
// `error` here meant any failure on the photo-chat path (e.g.
// "no insight found for path") was silently dropped, leaving an
// empty assistant bubble with no clue why the turn died.
ChatStreamEvent::Error(msg) => ("error_message", serde_json::json!({ "message": msg })),
}
}
/// POST /insights/chat/turn — async turn dispatch. Returns turn_id immediately,
/// client then polls GET /insights/chat/turn/{turn_id} for SSE replay.
#[post("/insights/chat/turn")]
pub async fn turn_async_handler(
http_request: HttpRequest,
claims: Claims,
request: web::Json<ChatTurnHttpRequest>,
app_state: web::Data<AppState>,
) -> impl Responder {
let parent_context = extract_context_from_request(&http_request);
let tracer = global_tracer();
let mut span = tracer.start_with_context("http.insights.chat_turn_async", &parent_context);
span.set_attribute(KeyValue::new("file_path", request.file_path.clone()));
let library = match libraries::resolve_library_param(&app_state, request.library.as_deref()) {
Ok(Some(lib)) => lib,
Ok(None) => app_state.primary_library(),
Err(e) => {
return HttpResponse::BadRequest().json(serde_json::json!({
"error": format!("invalid library: {}", e)
}));
}
};
let user_id = claims.sub.parse::<i32>().unwrap_or(1);
let chat_req = ChatTurnRequest {
library_id: library.id,
user_id,
file_path: request.file_path.clone(),
user_message: request.user_message.clone(),
model: request.model.clone(),
backend: request.backend.clone(),
num_ctx: request.num_ctx,
temperature: request.temperature,
top_p: request.top_p,
top_k: request.top_k,
min_p: request.min_p,
max_iterations: request.max_iterations,
system_prompt: request.system_prompt.clone(),
persona_id: request.persona_id.clone(),
amend: request.amend,
regenerate: request.regenerate,
};
let service = app_state.insight_chat.clone();
let registry = app_state.turn_registry.clone();
let turn_id = service.chat_turn_async(registry, chat_req).await;
span.set_attribute(KeyValue::new("turn_id", turn_id.clone()));
span.set_status(Status::Ok);
HttpResponse::Accepted().json(serde_json::json!({
"turn_id": turn_id,
"status": "running"
}))
}
/// Query params for the SSE replay stream
/// (`GET /insights/chat/turn/{turn_id}`).
#[derive(Debug, Deserialize)]
pub struct ReplayQuery {
/// Replay events from this absolute sequence number (`seq`) onward.
/// Absent or 0 replays from the beginning. On reconnect the client sends
/// the `seq` of the last event it applied, plus one.
pub skip_before: Option<u32>,
}
/// GET /insights/chat/turn/{turn_id} — SSE replay stream.
#[get("/insights/chat/turn/{turn_id}")]
pub async fn turn_replay_handler(
http_request: HttpRequest,
path: web::Path<String>,
query: web::Query<ReplayQuery>,
app_state: web::Data<AppState>,
) -> HttpResponse {
use crate::ai::turn_registry::ReplayOutcome;
let turn_id = path.into_inner();
let skip_before = query.skip_before.unwrap_or(0);
let parent_context = extract_context_from_request(&http_request);
let tracer = global_tracer();
let mut span = tracer.start_with_context("ai.chat.turn.replay", &parent_context);
span.set_attribute(KeyValue::new("turn_id", turn_id.clone()));
span.set_attribute(KeyValue::new("skip_before", skip_before as i64));
let registry = app_state.turn_registry.clone();
let entry = match registry.get(&turn_id).await {
Some(e) => e,
None => {
span.set_status(Status::error("turn not found"));
return HttpResponse::NotFound().json(serde_json::json!({
"error": format!("turn {} not found", turn_id)
}));
}
};
let info = entry.info().await;
span.set_attribute(KeyValue::new("status", info.status.as_str()));
span.set_attribute(KeyValue::new(
"event_count",
info.total_events_pushed as i64,
));
let turn_info_frame = render_turn_info_frame(&info);
// Initial buffered batch: events produced before this connection attached.
// Stamp each frame with its absolute `seq` so the client can track
// `skip_before` precisely across reconnects.
let (initial_frames, start_skip) = match entry.replay_from(skip_before).await {
ReplayOutcome::Gone => {
span.set_status(Status::error("buffer evicted"));
return HttpResponse::Gone().json(serde_json::json!({
"error": "turn history has expired (buffer evicted)"
}));
}
ReplayOutcome::CaughtUp { next_skip } => (Vec::new(), next_skip),
ReplayOutcome::Events { events, next_skip } => {
let frames: Vec<actix_web::web::Bytes> = events
.into_iter()
.enumerate()
.map(|(i, ev)| {
actix_web::web::Bytes::from(render_indexed_frame(&ev, skip_before + i as u32))
})
.collect();
(frames, next_skip)
}
};
span.set_status(Status::Ok);
let running = entry.is_running();
// Head: the `turn_info` event followed by any already-buffered events.
let head = futures::stream::once(async move {
Ok::<_, actix_web::Error>(actix_web::web::Bytes::from(turn_info_frame))
})
.chain(futures::stream::iter(
initial_frames.into_iter().map(Ok::<_, actix_web::Error>),
));
if !running {
// Completed turn: every event — including the terminal Done/Error — is
// already in the buffered batch above. Emit it and close.
return HttpResponse::Ok()
.content_type("text/event-stream")
.insert_header(("Cache-Control", "no-cache"))
.insert_header(("X-Accel-Buffering", "no"))
.streaming(head);
}
// In-progress turn: after the head, wait for new events. `next_batch`
// drains every buffered event (including the terminal one) before it
// reports the turn finished, so the final Done/Error is never dropped;
// CaughtUp then closes the stream by returning None.
let tail = futures::stream::unfold(
(
entry,
start_skip,
Vec::<actix_web::web::Bytes>::new(),
false,
),
|(entry, skip, pending, finished)| async move {
// Flush queued frames from a previous multi-event batch first.
if let Some((first, rest)) = pending.split_first() {
return Some((Ok(first.clone()), (entry, skip, rest.to_vec(), finished)));
}
if finished {
return None;
}
match entry.next_batch(skip).await {
ReplayOutcome::Events { events, next_skip } => {
let frames: Vec<actix_web::web::Bytes> = events
.into_iter()
.enumerate()
.map(|(i, ev)| {
actix_web::web::Bytes::from(render_indexed_frame(&ev, skip + i as u32))
})
.collect();
// next_batch only returns Events for a non-empty batch.
let (first, rest) = frames.split_first().expect("non-empty batch");
Some((Ok(first.clone()), (entry, next_skip, rest.to_vec(), false)))
}
// Terminal reached and fully drained — close the connection.
ReplayOutcome::CaughtUp { .. } => None,
ReplayOutcome::Gone => {
// Evicted mid-stream: emit one error frame, then close.
let gone =
actix_web::web::Bytes::from(render_sse_frame(&ChatStreamEvent::Error(
"turn history has expired (buffer evicted)".to_string(),
)));
Some((Ok(gone), (entry, skip, Vec::new(), true)))
}
}
},
);
HttpResponse::Ok()
.content_type("text/event-stream")
.insert_header(("Cache-Control", "no-cache"))
.insert_header(("X-Accel-Buffering", "no"))
.streaming(head.chain(tail))
}
/// Render the `turn_info` SSE frame that opens every replay stream: a JSON
/// snapshot of the turn's id, file path, library id, status string and
/// event counters. If serialization fails, the data line falls back to `{}`.
fn render_turn_info_frame(info: &crate::ai::turn_registry::TurnInfo) -> String {
    let snapshot = serde_json::json!({
        "turn_id": info.turn_id,
        "file_path": info.file_path,
        "library_id": info.library_id,
        "status": info.status.as_str(),
        "total_events_pushed": info.total_events_pushed,
        "buffered_count": info.buffered_count,
    });
    let data = match serde_json::to_string(&snapshot) {
        Ok(json) => json,
        Err(_) => "{}".to_string(),
    };
    format!("event: turn_info\ndata: {}\n\n", data)
}
/// DELETE /insights/chat/turn/{turn_id} — cancel a running turn.
///
/// Aborts the turn's task, buffers a terminal `Done { cancelled: true }`
/// event, then marks the turn `Cancelled`. Responds 200
/// `{ "cancelled": true }`, or 404 when the turn id is unknown.
///
/// NOTE(review): the same sequence runs when the turn has already finished;
/// confirm `push_event` / `set_terminal_status` behave acceptably in that
/// case.
#[delete("/insights/chat/turn/{turn_id}")]
pub async fn cancel_turn_handler(
    http_request: HttpRequest,
    path: web::Path<String>,
    app_state: web::Data<AppState>,
) -> impl Responder {
    let turn_id = path.into_inner();
    let parent_context = extract_context_from_request(&http_request);
    let tracer = global_tracer();
    let mut span = tracer.start_with_context("ai.chat.turn.cancel", &parent_context);
    span.set_attribute(KeyValue::new("turn_id", turn_id.clone()));

    let registry = app_state.turn_registry.clone();
    let Some(entry) = registry.get(&turn_id).await else {
        span.set_status(Status::error("turn not found"));
        return HttpResponse::NotFound().json(serde_json::json!({
            "error": format!("turn {} not found", turn_id)
        }));
    };

    // Abort the spawned task so it stops producing events promptly. The loop
    // also checks `is_running()` at each iteration boundary as a graceful
    // backstop in case the abort lands between await points.
    let aborted = entry.abort();
    span.set_attribute(KeyValue::new("aborted", aborted));

    // Push the terminal event BEFORE flipping status: a replay reader treats a
    // terminal status with no buffered tail as "closed", so the Done must be
    // buffered first for in-progress connections to receive it.
    let cancelled_done = ChatStreamEvent::Done {
        tool_calls_made: 0,
        iterations_used: 0,
        truncated: false,
        prompt_tokens: None,
        eval_tokens: None,
        num_ctx: None,
        amended_insight_id: None,
        backend_used: "cancelled".to_string(),
        model_used: "cancelled".to_string(),
        cancelled: true,
    };
    let _ = entry.push_event(cancelled_done).await;
    entry.set_terminal_status(crate::ai::turn_registry::TurnStatus::Cancelled);

    span.set_status(Status::Ok);
    HttpResponse::Ok().json(serde_json::json!({
        "cancelled": true
    }))
}
/// Tests for the turn replay (`GET`) and cancel (`DELETE`) endpoints, plus
/// the `seq` stamping done by `render_indexed_frame`. Turns are seeded
/// directly into the registry rather than produced by a real chat run.
#[cfg(test)]
mod turn_replay_tests {
use super::{cancel_turn_handler, render_indexed_frame, turn_replay_handler};
use crate::ai::insight_chat::ChatStreamEvent;
use crate::ai::turn_registry::{TurnEntry, TurnStatus};
use crate::state::AppState;
use actix_web::test as actix_test;
use actix_web::{App, web::Data};
use std::sync::Arc;
/// Serialize `AppState::test_state()` construction across the parallel
/// tests in this module: each build opens ~10 DAO connections to the one
/// shared `DATABASE_URL` file, and doing several at once races the WAL
/// `journal_mode` switch into a spurious "database is locked". The test
/// bodies themselves still run in parallel; only the open is gated.
static DB_INIT: std::sync::Mutex<()> = std::sync::Mutex::new(());
/// Build a fresh test `AppState` while holding `DB_INIT`. A poisoned lock
/// (an earlier test panicked while holding it) is recovered, not propagated.
fn build_state() -> Data<AppState> {
let _guard = DB_INIT.lock().unwrap_or_else(|p| p.into_inner());
Data::new(AppState::test_state())
}
/// Terminal `Done` event fixture with fixed token counts (10 prompt, 20 eval)
/// and the given `cancelled` flag.
fn done(cancelled: bool) -> ChatStreamEvent {
ChatStreamEvent::Done {
tool_calls_made: 0,
iterations_used: 1,
truncated: false,
prompt_tokens: Some(10),
eval_tokens: Some(20),
num_ctx: None,
amended_insight_id: None,
backend_used: "local".into(),
model_used: "m".into(),
cancelled,
}
}
/// Seed a completed turn (events + terminal Done) directly in the registry.
async fn seed_completed(state: &AppState, id: &str, text_events: usize) {
let entry = Arc::new(TurnEntry::new(id.into(), "/p.jpg".into(), 1));
for i in 0..text_events {
entry
.push_event(ChatStreamEvent::TextDelta(format!("d{i}")))
.await;
}
// Terminal event first, then the status flip — the order producers use.
entry.push_event(done(false)).await;
entry.set_terminal_status(TurnStatus::Done);
state.turn_registry.insert(entry).await;
}
/// `seq` is added next to a tool event's own `index`, not in place of it.
#[test]
fn indexed_frame_stamps_seq_without_clobbering_tool_index() {
// tool_call carries its own pairing `index`; `seq` must be additive.
let frame = render_indexed_frame(
&ChatStreamEvent::ToolCall {
index: 3,
name: "geo".into(),
arguments: serde_json::json!({}),
},
42,
);
assert!(frame.contains("event: tool_call"));
assert!(frame.contains("\"index\":3"));
assert!(frame.contains("\"seq\":42"));
}
/// Replaying a turn id that was never registered yields 404.
#[actix_rt::test]
async fn replay_unknown_turn_is_404() {
let state = build_state();
let app = actix_test::init_service(
App::new()
.service(turn_replay_handler)
.app_data(state.clone()),
)
.await;
let req = actix_test::TestRequest::get()
.uri("/insights/chat/turn/nope")
.to_request();
let resp = actix_test::call_service(&app, req).await;
assert_eq!(resp.status(), 404);
}
/// A completed turn replays `turn_info`, its text events and the terminal
/// `done`, each buffered event carrying its `seq`.
#[actix_rt::test]
async fn replay_completed_turn_emits_turn_info_and_done_with_seq() {
let state = build_state();
seed_completed(&state, "t1", 2).await;
let app = actix_test::init_service(
App::new()
.service(turn_replay_handler)
.app_data(state.clone()),
)
.await;
let req = actix_test::TestRequest::get()
.uri("/insights/chat/turn/t1")
.to_request();
let resp = actix_test::call_service(&app, req).await;
assert_eq!(resp.status(), 200);
let body = String::from_utf8(actix_test::read_body(resp).await.to_vec()).unwrap();
assert!(body.contains("event: turn_info"));
assert!(body.contains("event: text"));
assert!(body.contains("event: done"));
// Events are seq-stamped 0,1 (text) and 2 (done).
assert!(body.contains("\"seq\":0"));
assert!(body.contains("\"seq\":2"));
// Done payload carries the renamed token fields the client reads.
assert!(body.contains("\"prompt_tokens\":10"));
}
/// `skip_before=N` omits every event whose `seq` is below N.
#[actix_rt::test]
async fn replay_skip_before_query_skips_applied_events() {
let state = build_state();
seed_completed(&state, "t2", 3).await; // seqs 0,1,2 text; 3 done
let app = actix_test::init_service(
App::new()
.service(turn_replay_handler)
.app_data(state.clone()),
)
.await;
let req = actix_test::TestRequest::get()
.uri("/insights/chat/turn/t2?skip_before=2")
.to_request();
let resp = actix_test::call_service(&app, req).await;
assert_eq!(resp.status(), 200);
let body = String::from_utf8(actix_test::read_body(resp).await.to_vec()).unwrap();
// Only seq 2 (last text) and seq 3 (done) should be present.
assert!(body.contains("\"seq\":2"));
assert!(body.contains("\"seq\":3"));
assert!(!body.contains("\"seq\":0"));
assert!(!body.contains("\"seq\":1"));
}
/// Asking for a position that has been evicted from the buffer yields 410.
#[actix_rt::test]
async fn replay_evicted_index_is_410() {
let state = build_state();
let entry = Arc::new(TurnEntry::new("t3".into(), "/p.jpg".into(), 1));
// Push past the cap so the front is evicted and base advances.
// NOTE(review): 600 is assumed to exceed the registry's buffer cap — confirm.
for i in 0..600 {
entry
.push_event(ChatStreamEvent::TextDelta(format!("d{i}")))
.await;
}
entry.set_terminal_status(TurnStatus::Done);
state.turn_registry.insert(entry).await;
let app = actix_test::init_service(
App::new()
.service(turn_replay_handler)
.app_data(state.clone()),
)
.await;
let req = actix_test::TestRequest::get()
.uri("/insights/chat/turn/t3?skip_before=0")
.to_request();
let resp = actix_test::call_service(&app, req).await;
assert_eq!(resp.status(), 410);
}
/// Cancelling a turn id that was never registered yields 404.
#[actix_rt::test]
async fn cancel_unknown_turn_is_404() {
let state = build_state();
let app = actix_test::init_service(
App::new()
.service(cancel_turn_handler)
.app_data(state.clone()),
)
.await;
let req = actix_test::TestRequest::delete()
.uri("/insights/chat/turn/nope")
.to_request();
let resp = actix_test::call_service(&app, req).await;
assert_eq!(resp.status(), 404);
}
/// Cancelling a running turn flips its status to `Cancelled` and appends
/// exactly one terminal event after what was already buffered.
#[actix_rt::test]
async fn cancel_running_turn_marks_cancelled_and_buffers_terminal() {
let state = build_state();
let entry = Arc::new(TurnEntry::new("t4".into(), "/p.jpg".into(), 1));
entry
.push_event(ChatStreamEvent::TextDelta("partial".into()))
.await;
state.turn_registry.insert(entry.clone()).await;
let app = actix_test::init_service(
App::new()
.service(cancel_turn_handler)
.app_data(state.clone()),
)
.await;
let req = actix_test::TestRequest::delete()
.uri("/insights/chat/turn/t4")
.to_request();
let resp = actix_test::call_service(&app, req).await;
assert_eq!(resp.status(), 200);
// Status flipped to Cancelled and a terminal Done(cancelled) buffered
// after the existing event, so a late replay reader still completes.
assert_eq!(
TurnStatus::from(entry.status.load(std::sync::atomic::Ordering::Relaxed)),
TurnStatus::Cancelled
);
let info = entry.info().await;
// One seeded text event + the Done pushed by the cancel handler.
assert_eq!(info.total_events_pushed, 2);
}
}