From 8e4f91561b9b797de02fadcb78a9c9a24eb2d767 Mon Sep 17 00:00:00 2001 From: Cameron Cordes Date: Sun, 7 Jun 2026 18:28:22 -0400 Subject: [PATCH 1/6] Add per-file insight history endpoint and rate-by-id Expose GET /insights/history?path=... returning every generated version of a photo's insight (current plus superseded), newest-first, backing the mobile per-file insight history view. - New get_insight_history_handler; reuses the existing get_insight_history DAO method (removed its dead_code allow). - impl From for PhotoInsightResponse, collapsing the mapping that was duplicated across the single-get and all-insights handlers. - rate_insight_by_id DAO method + optional insight_id on RateInsightRequest so previously generated versions can be approved/rejected (the path-based rate only touches the current row). - DAO tests for history ordering/scoping and id-targeted rating. - cargo fmt normalized a multi-line assert in insight_chat.rs tests. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/ai/handlers.rs | 122 ++++++++++++++++++++++------------- src/ai/insight_chat.rs | 5 +- src/ai/mod.rs | 3 +- src/database/insights_dao.rs | 119 +++++++++++++++++++++++++++++++++- src/main.rs | 1 + 5 files changed, 201 insertions(+), 49 deletions(-) diff --git a/src/ai/handlers.rs b/src/ai/handlers.rs index 9fbe6b7..9bcb048 100644 --- a/src/ai/handlers.rs +++ b/src/ai/handlers.rs @@ -8,7 +8,7 @@ use crate::ai::insight_chat::{ChatStreamEvent, ChatTurnRequest}; use crate::ai::ollama::ChatMessage; use crate::ai::{ModelCapabilities, OllamaClient}; use crate::data::Claims; -use crate::database::models::{InsightGenerationType, InsightJobStatus}; +use crate::database::models::{InsightGenerationType, InsightJobStatus, PhotoInsight}; use crate::database::{ExifDao, InsightDao}; use crate::libraries; use crate::otel::{extract_context_from_request, global_tracer}; @@ -273,6 +273,11 @@ pub async fn cancel_generation_handler( pub struct RateInsightRequest { pub file_path: String, pub approved: bool, + /// When set, rate this specific insight version by primary key + /// (used by the per-file history view to rate superseded versions). + /// When omitted, the current insight for `file_path` is rated. + #[serde(default)] + pub insight_id: Option, } #[derive(Debug, Deserialize)] @@ -333,6 +338,31 @@ pub struct PhotoInsightResponse { pub persona_id: Option, } +impl From for PhotoInsightResponse { + fn from(insight: PhotoInsight) -> Self { + PhotoInsightResponse { + id: insight.id, + file_path: insight.file_path, + title: insight.title, + summary: insight.summary, + generated_at: insight.generated_at, + model_version: insight.model_version, + prompt_eval_count: insight.prompt_eval_count, + eval_count: insight.eval_count, + approved: insight.approved, + has_training_messages: insight.training_messages.is_some(), + backend: insight.backend, + num_ctx: insight.num_ctx, + temperature: insight.temperature, + top_p: insight.top_p, + top_k: insight.top_k, + min_p: insight.min_p, + system_prompt: insight.system_prompt, + persona_id: insight.persona_id, + } + } +} + #[derive(Debug, Serialize)] pub struct AvailableModelsResponse { pub primary: ServerModels, @@ -554,29 +584,7 @@ pub async fn get_insight_handler( let mut dao = insight_dao.lock().expect("Unable to lock InsightDao"); match dao.get_insight_for_paths(&otel_context, &sibling_paths) { - Ok(Some(insight)) => { - let response = PhotoInsightResponse { - id: insight.id, - file_path: insight.file_path, - title: insight.title, - summary: insight.summary, - generated_at: insight.generated_at, - model_version: insight.model_version, - prompt_eval_count: insight.prompt_eval_count, - eval_count: insight.eval_count, - approved: insight.approved, - has_training_messages: insight.training_messages.is_some(), - backend: insight.backend, - num_ctx: insight.num_ctx, - temperature: insight.temperature, - top_p: insight.top_p, - top_k: insight.top_k, - min_p: insight.min_p, - system_prompt: insight.system_prompt, - persona_id: insight.persona_id, - }; - HttpResponse::Ok().json(response) - } + Ok(Some(insight)) => HttpResponse::Ok().json(PhotoInsightResponse::from(insight)), Ok(None) => HttpResponse::NotFound().json(serde_json::json!({ "error": "Insight not found" })), @@ -631,26 +639,7 @@ pub async fn get_all_insights_handler( Ok(insights) => { let responses: Vec = insights .into_iter() - .map(|insight| PhotoInsightResponse { - id: insight.id, - file_path: insight.file_path, - title: insight.title, - summary: insight.summary, - generated_at: insight.generated_at, - model_version: insight.model_version, - prompt_eval_count: insight.prompt_eval_count, - eval_count: insight.eval_count, - approved: insight.approved, - has_training_messages: insight.training_messages.is_some(), - backend: insight.backend, - num_ctx: insight.num_ctx, - temperature: insight.temperature, - top_p: insight.top_p, - top_k: insight.top_k, - min_p: insight.min_p, - system_prompt: insight.system_prompt, - persona_id: insight.persona_id, - }) + .map(PhotoInsightResponse::from) .collect(); HttpResponse::Ok().json(responses) @@ -664,6 +653,39 @@ pub async fn get_all_insights_handler( } } +/// GET /insights/history?path=/path/to/photo.jpg - Get all insight versions +/// for a single photo (current plus previously generated/superseded ones), +/// newest first. Backs the per-file insight history view. +#[get("/insights/history")] +pub async fn get_insight_history_handler( + _claims: Claims, + query: web::Query, + insight_dao: web::Data>>, +) -> impl Responder { + let normalized_path = normalize_path(&query.path); + log::debug!("Fetching insight history for {}", normalized_path); + + let otel_context = opentelemetry::Context::new(); + let mut dao = insight_dao.lock().expect("Unable to lock InsightDao"); + + match dao.get_insight_history(&otel_context, &normalized_path) { + Ok(insights) => { + let responses: Vec = insights + .into_iter() + .map(PhotoInsightResponse::from) + .collect(); + + HttpResponse::Ok().json(responses) + } + Err(e) => { + log::error!("Failed to fetch insight history ({}): {:?}", &query.path, e); + HttpResponse::InternalServerError().json(serde_json::json!({ + "error": format!("Failed to fetch insight history: {:?}", e) + })) + } + } +} + /// POST /insights/generate/agentic - Generate insight using agentic tool-calling loop (async) #[post("/insights/generate/agentic")] pub async fn generate_agentic_insight_handler( @@ -1012,15 +1034,23 @@ pub async fn rate_insight_handler( ) -> impl Responder { let normalized_path = normalize_path(&request.file_path); log::info!( - "Rating insight for {}: approved={}", + "Rating insight for {} (id={:?}): approved={}", normalized_path, + request.insight_id, request.approved ); let otel_context = opentelemetry::Context::new(); let mut dao = insight_dao.lock().expect("Unable to lock InsightDao"); - match dao.rate_insight(&otel_context, &normalized_path, request.approved) { + // Rate a specific version by id when provided (history view), otherwise + // rate the current insight for the path. + let result = match request.insight_id { + Some(id) => dao.rate_insight_by_id(&otel_context, id, request.approved), + None => dao.rate_insight(&otel_context, &normalized_path, request.approved), + }; + + match result { Ok(()) => HttpResponse::Ok().json(serde_json::json!({ "success": true, "message": "Insight rated successfully" diff --git a/src/ai/insight_chat.rs b/src/ai/insight_chat.rs index f4d478b..3c5eb26 100644 --- a/src/ai/insight_chat.rs +++ b/src/ai/insight_chat.rs @@ -2473,7 +2473,10 @@ mod tests { let original_len = msgs.len(); let dropped = apply_context_budget(&mut msgs, budget_bytes); - assert!(!dropped, "short conversation with one image must not truncate"); + assert!( + !dropped, + "short conversation with one image must not truncate" + ); assert_eq!(msgs.len(), original_len, "no messages should be dropped"); // Sanity: the flat image charge is accounted for but stays well under budget. assert!(estimate_bytes(&msgs) <= budget_bytes); diff --git a/src/ai/mod.rs b/src/ai/mod.rs index e61eace..40a3f21 100644 --- a/src/ai/mod.rs +++ b/src/ai/mod.rs @@ -25,7 +25,8 @@ pub use handlers::{ chat_stream_handler, chat_turn_handler, delete_insight_handler, export_training_data_handler, generate_agentic_insight_handler, generate_insight_handler, generation_status_handler, get_all_insights_handler, get_available_models_handler, get_insight_handler, - get_openrouter_models_handler, rate_insight_handler, turn_async_handler, turn_replay_handler, + get_insight_history_handler, get_openrouter_models_handler, rate_insight_handler, + turn_async_handler, turn_replay_handler, }; pub use insight_generator::InsightGenerator; pub use llamacpp::LlamaCppClient; diff --git a/src/database/insights_dao.rs b/src/database/insights_dao.rs index 6b467ea..3880550 100644 --- a/src/database/insights_dao.rs +++ b/src/database/insights_dao.rs @@ -47,7 +47,6 @@ pub trait InsightDao: Sync + Send { paths: &[String], ) -> Result, DbError>; - #[allow(dead_code)] fn get_insight_history( &mut self, context: &opentelemetry::Context, @@ -82,6 +81,17 @@ pub trait InsightDao: Sync + Send { approved: bool, ) -> Result<(), DbError>; + /// Rate a specific insight version by primary key, regardless of + /// `is_current`. Used by the per-file history view to approve/reject + /// previously generated (superseded) versions, which the path-based + /// `rate_insight` (current row only) cannot reach. + fn rate_insight_by_id( + &mut self, + context: &opentelemetry::Context, + insight_id: i32, + approved: bool, + ) -> Result<(), DbError>; + fn get_approved_insights( &mut self, context: &opentelemetry::Context, @@ -352,6 +362,26 @@ impl InsightDao for SqliteInsightDao { .map_err(|e| DbError::log(DbErrorKind::UpdateError, e)) } + fn rate_insight_by_id( + &mut self, + context: &opentelemetry::Context, + target_id: i32, + is_approved: bool, + ) -> Result<(), DbError> { + trace_db_call(context, "update", "rate_insight_by_id", |_span| { + use schema::photo_insights::dsl::*; + + let mut connection = self.connection.lock().expect("Unable to get InsightDao"); + + diesel::update(photo_insights.find(target_id)) + .set(approved.eq(Some(is_approved))) + .execute(connection.deref_mut()) + .map(|_| ()) + .map_err(|e| anyhow::anyhow!("Update error: {}", e)) + }) + .map_err(|e| DbError::log(DbErrorKind::UpdateError, e)) + } + fn get_approved_insights( &mut self, context: &opentelemetry::Context, @@ -396,3 +426,90 @@ impl InsightDao for SqliteInsightDao { .map_err(|e| DbError::log(DbErrorKind::UpdateError, e)) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::database::test::in_memory_db_connection; + + fn dao() -> SqliteInsightDao { + let conn = Arc::new(Mutex::new(in_memory_db_connection())); + SqliteInsightDao::from_connection(conn) + } + + /// Build an insight insert with sensible defaults; tests override the + /// fields they care about (path, generated_at, model). + fn insert(path: &str, generated_at: i64, model: &str) -> InsertPhotoInsight { + InsertPhotoInsight { + library_id: 1, + file_path: path.to_string(), + title: format!("title for {model}"), + summary: "summary".to_string(), + generated_at, + model_version: model.to_string(), + is_current: true, + training_messages: None, + backend: "local".to_string(), + fewshot_source_ids: None, + content_hash: None, + num_ctx: None, + temperature: None, + top_p: None, + top_k: None, + min_p: None, + system_prompt: None, + persona_id: None, + prompt_eval_count: None, + eval_count: None, + } + } + + #[test] + fn get_insight_history_returns_all_versions_newest_first() { + let cx = opentelemetry::Context::new(); + let mut dao = dao(); + + // store_insight flips prior rows to is_current=false, so three + // generations for the same path leave a 3-row history. + dao.store_insight(&cx, insert("a.jpg", 100, "m1")).unwrap(); + dao.store_insight(&cx, insert("a.jpg", 200, "m2")).unwrap(); + dao.store_insight(&cx, insert("a.jpg", 300, "m3")).unwrap(); + // A different path must not leak into the history. + dao.store_insight(&cx, insert("b.jpg", 250, "other")) + .unwrap(); + + let history = dao.get_insight_history(&cx, "a.jpg").unwrap(); + assert_eq!(history.len(), 3); + assert_eq!( + history.iter().map(|i| i.generated_at).collect::>(), + vec![300, 200, 100], + "history should be newest-first" + ); + // Exactly one version is current (the latest generation). + let current: Vec<_> = history.iter().filter(|i| i.is_current).collect(); + assert_eq!(current.len(), 1); + assert_eq!(current[0].generated_at, 300); + } + + #[test] + fn rate_insight_by_id_rates_only_the_targeted_version() { + let cx = opentelemetry::Context::new(); + let mut dao = dao(); + + dao.store_insight(&cx, insert("a.jpg", 100, "m1")).unwrap(); + dao.store_insight(&cx, insert("a.jpg", 200, "m2")).unwrap(); + + // History is newest-first: [200 (current), 100 (superseded)]. + let history = dao.get_insight_history(&cx, "a.jpg").unwrap(); + let old_version = history.iter().find(|i| i.generated_at == 100).unwrap(); + assert!(!old_version.is_current); + + dao.rate_insight_by_id(&cx, old_version.id, true).unwrap(); + + let history = dao.get_insight_history(&cx, "a.jpg").unwrap(); + let old = history.iter().find(|i| i.generated_at == 100).unwrap(); + let current = history.iter().find(|i| i.generated_at == 200).unwrap(); + assert_eq!(old.approved, Some(true), "targeted version is rated"); + assert_eq!(current.approved, None, "current version is untouched"); + } +} diff --git a/src/main.rs b/src/main.rs index 8b06228..f27cf8f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -351,6 +351,7 @@ fn main() -> std::io::Result<()> { .service(ai::get_insight_handler) .service(ai::delete_insight_handler) .service(ai::get_all_insights_handler) + .service(ai::get_insight_history_handler) .service(ai::get_available_models_handler) .service(ai::get_openrouter_models_handler) .service(ai::chat_turn_handler) From 592dfcb42cb542a73750babdba8e952aea8ba780 Mon Sep 17 00:00:00 2001 From: Cameron Cordes Date: Tue, 9 Jun 2026 18:29:06 -0400 Subject: [PATCH 2/6] Accumulate streamed tool calls across chunks in Ollama streaming Ollama >=0.8 can stream tool_calls incrementally across NDJSON chunks; chat_with_tools_stream did `tool_calls = Some(tcs)` per chunk, so only the last chunk's calls survived assembly and earlier calls were silently dropped. Append into the accumulator instead. - ollama: append_streamed_tool_calls helper + tests covering two calls arriving in separate chunks and the single-chunk batch case. - llamacpp: the SSE delta assembly was already correct (per-index BTreeMap, same-index argument fragments concatenate, distinct indexes accumulate); extracted it into apply_tool_call_deltas / finalize_tool_calls and added tests pinning that behavior. - llm_client: new shared strip_think_blocks (moved from ollama's private extract_final_answer, which now delegates) so the tool-calling final content paths can reuse it; unit tests for tagged/plain/unclosed/empty cases. Co-Authored-By: Claude Fable 5 --- src/ai/llamacpp.rs | 171 ++++++++++++++++++++++++++++++------------- src/ai/llm_client.rs | 52 +++++++++++++ src/ai/ollama.rs | 66 +++++++++++++---- 3 files changed, 223 insertions(+), 66 deletions(-) diff --git a/src/ai/llamacpp.rs b/src/ai/llamacpp.rs index d56b645..6227e2f 100644 --- a/src/ai/llamacpp.rs +++ b/src/ai/llamacpp.rs @@ -590,10 +590,7 @@ impl LlmClient for LlamaCppClient { let mut byte_stream = byte_stream; let mut buf: Vec = Vec::new(); let mut accumulated_content = String::new(); - let mut tool_state: std::collections::BTreeMap< - usize, - (Option, Option, String), - > = std::collections::BTreeMap::new(); + let mut tool_state = ToolCallAssembly::new(); let mut role = "assistant".to_string(); let mut prompt_tokens: Option = None; let mut completion_tokens: Option = None; @@ -670,32 +667,7 @@ impl LlmClient for LlamaCppClient { yield Ok(LlmStreamEvent::TextDelta(content.to_string())); } if let Some(tcs) = delta.get("tool_calls").and_then(|v| v.as_array()) { - for tc_delta in tcs { - let idx = tc_delta - .get("index") - .and_then(|n| n.as_u64()) - .unwrap_or(0) as usize; - let entry = tool_state - .entry(idx) - .or_insert((None, None, String::new())); - if let Some(id) = - tc_delta.get("id").and_then(|v| v.as_str()) - { - entry.0 = Some(id.to_string()); - } - if let Some(func) = tc_delta.get("function") { - if let Some(name) = - func.get("name").and_then(|v| v.as_str()) - { - entry.1 = Some(name.to_string()); - } - if let Some(args) = - func.get("arguments").and_then(|v| v.as_str()) - { - entry.2.push_str(args); - } - } - } + apply_tool_call_deltas(&mut tool_state, tcs); } } if done_seen { @@ -707,28 +679,7 @@ impl LlmClient for LlamaCppClient { } } - let tool_calls: Option> = if tool_state.is_empty() { - None - } else { - let mut v = Vec::with_capacity(tool_state.len()); - for (_idx, (id, name, args)) in tool_state { - let arguments: Value = if args.trim().is_empty() { - Value::Object(Default::default()) - } else { - serde_json::from_str(&args).unwrap_or_else(|_| { - Value::Object(Default::default()) - }) - }; - v.push(ToolCall { - id, - function: ToolCallFunction { - name: name.unwrap_or_default(), - arguments, - }, - }); - } - Some(v) - }; + let tool_calls = finalize_tool_calls(tool_state); if let Some(ref frame) = last_frame { log_timings(frame, prompt_tokens, completion_tokens); @@ -937,6 +888,58 @@ fn extract_error_detail(parsed: &Value) -> String { raw.chars().take(300).collect() } +/// Per-index assembly state for streamed OpenAI-style tool-call deltas: +/// `index → (id, name, concatenated argument fragments)`. BTreeMap so the +/// finalized calls come out in index order. +type ToolCallAssembly = std::collections::BTreeMap, Option, String)>; + +/// Fold one SSE frame's `delta.tool_calls` array into the assembly state. +/// Deltas carrying the same `index` merge into one call (llama.cpp streams a +/// call's argument JSON in fragments — they concatenate); distinct indexes +/// accumulate as separate calls. +fn apply_tool_call_deltas(state: &mut ToolCallAssembly, tcs: &[Value]) { + for tc_delta in tcs { + let idx = tc_delta.get("index").and_then(|n| n.as_u64()).unwrap_or(0) as usize; + let entry = state.entry(idx).or_insert((None, None, String::new())); + if let Some(id) = tc_delta.get("id").and_then(|v| v.as_str()) { + entry.0 = Some(id.to_string()); + } + if let Some(func) = tc_delta.get("function") { + if let Some(name) = func.get("name").and_then(|v| v.as_str()) { + entry.1 = Some(name.to_string()); + } + if let Some(args) = func.get("arguments").and_then(|v| v.as_str()) { + entry.2.push_str(args); + } + } + } +} + +/// Convert assembled tool-call state into canonical `ToolCall`s, parsing each +/// call's concatenated argument JSON (empty / malformed → `{}`). `None` when +/// no tool-call deltas arrived. +fn finalize_tool_calls(state: ToolCallAssembly) -> Option> { + if state.is_empty() { + return None; + } + let mut v = Vec::with_capacity(state.len()); + for (_idx, (id, name, args)) in state { + let arguments: Value = if args.trim().is_empty() { + Value::Object(Default::default()) + } else { + serde_json::from_str(&args).unwrap_or_else(|_| Value::Object(Default::default())) + }; + v.push(ToolCall { + id, + function: ToolCallFunction { + name: name.unwrap_or_default(), + arguments, + }, + }); + } + Some(v) +} + fn find_double_newline(buf: &[u8]) -> Option { for i in 0..buf.len().saturating_sub(1) { if buf[i] == b'\n' && buf[i + 1] == b'\n' { @@ -1302,4 +1305,68 @@ mod tests { let c = LlamaCppClient::new(None, None); assert_eq!(c.tts_model, "chatterbox"); } + + #[test] + fn stream_assembly_keeps_two_tool_calls_from_separate_chunks() { + // llama.cpp emits one delta per SSE frame; two calls with distinct + // `index` values arriving in separate frames must BOTH survive. + let mut state = ToolCallAssembly::new(); + apply_tool_call_deltas( + &mut state, + &[json!({ + "index": 0, + "id": "call_a", + "function": { "name": "get_sms_messages", "arguments": "{\"date\":\"2019-01-01\"}" } + })], + ); + apply_tool_call_deltas( + &mut state, + &[json!({ + "index": 1, + "id": "call_b", + "function": { "name": "reverse_geocode", "arguments": "{\"latitude\":1.0,\"longitude\":2.0}" } + })], + ); + + let calls = finalize_tool_calls(state).expect("two calls assembled"); + assert_eq!(calls.len(), 2); + assert_eq!(calls[0].id.as_deref(), Some("call_a")); + assert_eq!(calls[0].function.name, "get_sms_messages"); + assert_eq!(calls[0].function.arguments["date"], "2019-01-01"); + assert_eq!(calls[1].id.as_deref(), Some("call_b")); + assert_eq!(calls[1].function.name, "reverse_geocode"); + assert_eq!(calls[1].function.arguments["latitude"], 1.0); + } + + #[test] + fn stream_assembly_concatenates_argument_fragments_for_same_index() { + // A single call's argument JSON streamed across frames concatenates + // into one parseable document. + let mut state = ToolCallAssembly::new(); + apply_tool_call_deltas( + &mut state, + &[json!({ + "index": 0, + "id": "call_x", + "function": { "name": "search_messages", "arguments": "{\"query\":" } + })], + ); + apply_tool_call_deltas( + &mut state, + &[json!({ + "index": 0, + "function": { "arguments": "\"dinner\"}" } + })], + ); + + let calls = finalize_tool_calls(state).expect("one call assembled"); + assert_eq!(calls.len(), 1); + assert_eq!(calls[0].function.name, "search_messages"); + assert_eq!(calls[0].function.arguments["query"], "dinner"); + } + + #[test] + fn stream_assembly_empty_state_finalizes_to_none() { + assert!(finalize_tool_calls(ToolCallAssembly::new()).is_none()); + } } diff --git a/src/ai/llm_client.rs b/src/ai/llm_client.rs index 8d68978..a50a6d8 100644 --- a/src/ai/llm_client.rs +++ b/src/ai/llm_client.rs @@ -170,3 +170,55 @@ pub struct ModelCapabilities { pub has_vision: bool, pub has_tool_calling: bool, } + +/// Strip a leading `` reasoning block from model output. +/// +/// Thinking models sometimes emit chain-of-thought inside think tags before +/// the real answer. Everything after the first `` is the answer; +/// when no tag is present — or the text after it is empty — the trimmed +/// input is returned unchanged. Mirrors the behavior Ollama's +/// `extract_final_answer` has applied to single-shot generation; shared here +/// so the tool-calling final-content paths (agentic generation + chat) can +/// apply the identical cleanup before parsing / persisting. +pub fn strip_think_blocks(response: &str) -> String { + let response = response.trim(); + + if let Some(pos) = response.find("") { + let answer = response[pos + "".len()..].trim(); + if !answer.is_empty() { + return answer.to_string(); + } + } + + response.to_string() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn strip_think_blocks_removes_leading_think_block() { + let raw = "\nLet me reason about this.\n\n\nTitle: A Day Out\n\nThe body."; + assert_eq!(strip_think_blocks(raw), "Title: A Day Out\n\nThe body."); + } + + #[test] + fn strip_think_blocks_passes_through_plain_content() { + assert_eq!(strip_think_blocks(" just an answer "), "just an answer"); + } + + #[test] + fn strip_think_blocks_keeps_content_when_answer_after_tag_is_empty() { + // A think block with nothing after it: better to return the trimmed + // original than an empty string (matches Ollama's fallback). + let raw = "only thoughts"; + assert_eq!(strip_think_blocks(raw), raw); + } + + #[test] + fn strip_think_blocks_handles_unclosed_tag() { + let raw = "thinking forever"; + assert_eq!(strip_think_blocks(raw), raw); + } +} diff --git a/src/ai/ollama.rs b/src/ai/ollama.rs index 680668f..75c8a02 100644 --- a/src/ai/ollama.rs +++ b/src/ai/ollama.rs @@ -360,18 +360,7 @@ impl OllamaClient { /// Extract final answer from thinking model output /// Handles ... tags and takes everything after fn extract_final_answer(&self, response: &str) -> String { - let response = response.trim(); - - // Look for tag and take everything after it - if let Some(pos) = response.find("") { - let answer = response[pos + 8..].trim(); - if !answer.is_empty() { - return answer.to_string(); - } - } - - // Fallback: return the whole response trimmed - response.to_string() + crate::ai::llm_client::strip_think_blocks(response) } async fn try_generate( @@ -846,11 +835,14 @@ Analyze the image and use specific details from both the visual content and the if !chunk.message.role.is_empty() { role = chunk.message.role; } - // Ollama only attaches tool_calls on the final chunk. + // Ollama ≥0.8 can stream tool_calls incrementally + // across chunks (older servers attach them all to + // one chunk) — append rather than overwrite so + // calls from earlier chunks survive. if let Some(tcs) = chunk.message.tool_calls && !tcs.is_empty() { - tool_calls = Some(tcs); + append_streamed_tool_calls(&mut tool_calls, tcs); } if chunk.done { prompt_eval_count = chunk.prompt_eval_count; @@ -1329,8 +1321,20 @@ struct OllamaEmbedResponse { embeddings: Vec>, } +/// Accumulate tool calls streamed across NDJSON chunks. Ollama ≥0.8 may +/// emit each tool call on its own chunk; replacing the accumulator on every +/// chunk would keep only the last call, so extend instead. +fn append_streamed_tool_calls( + acc: &mut Option>, + new: Vec, +) { + acc.get_or_insert_with(Vec::new).extend(new); +} + #[cfg(test)] mod tests { + use super::append_streamed_tool_calls; + use crate::ai::llm_client::{ToolCall, ToolCallFunction}; #[test] fn generate_photo_description_prompt_is_concise() { @@ -1341,4 +1345,38 @@ mod tests { Focus on the people, location, and activity."; assert!(prompt.len() < 200, "Prompt should be concise"); } + + fn call(name: &str) -> ToolCall { + ToolCall { + id: None, + function: ToolCallFunction { + name: name.to_string(), + arguments: serde_json::json!({}), + }, + } + } + + #[test] + fn streamed_tool_calls_across_chunks_accumulate() { + // Two tool calls arriving in two separate stream chunks must BOTH + // survive assembly — the old `tool_calls = Some(tcs)` kept only the + // last chunk's calls. + let mut acc: Option> = None; + append_streamed_tool_calls(&mut acc, vec![call("get_sms_messages")]); + append_streamed_tool_calls(&mut acc, vec![call("reverse_geocode")]); + + let calls = acc.expect("tool calls accumulated"); + assert_eq!(calls.len(), 2); + assert_eq!(calls[0].function.name, "get_sms_messages"); + assert_eq!(calls[1].function.name, "reverse_geocode"); + } + + #[test] + fn streamed_tool_calls_single_chunk_batch_kept_intact() { + // Older Ollama servers attach all calls to one chunk — unchanged. + let mut acc: Option> = None; + append_streamed_tool_calls(&mut acc, vec![call("a"), call("b")]); + let calls = acc.expect("tool calls accumulated"); + assert_eq!(calls.len(), 2); + } } From 091982bdfc70c56aa70fd02194d0be6b3bde7f71 Mon Sep 17 00:00:00 2001 From: Cameron Cordes Date: Tue, 9 Jun 2026 18:29:20 -0400 Subject: [PATCH 3/6] Add recall_facts_for_entity tool; fix generation gates and tool output Agentic-loop fixes in the generator: - New recall_facts_for_entity tool (always-on, like recall_entities): fetches facts for one entity by id so the model can follow up on entities surfaced by recall_entities that aren't photo-linked (recall_facts_for_photo only covers linked entities). Mirrors that tool's persona scoping (PersonaFilter::Single) and the persona's reviewed_only_facts filter exactly, and renders in the same "Entity: ... / - predicate object" style. Wired through execute_tool and the trajectory summarizer. - Generation now resolves gates persona-aware: current_gate_opts_for_persona(images_inline, Some((user_id, persona_id))) instead of the None-defaulting wrapper, so a persona's allow_agent_corrections opens propose_correction during generation the same way chat turns already did. The now-unused current_gate_opts wrapper is removed. - Strip leaked blocks from the final assistant content before parse_title_body / store_insight (raw training transcript keeps them). - Honest truncation labels: get_sms_messages and get_location_history said "Found N ..." while listing only the first K; found_header now emits "Found N ... (showing first K):" when truncated, and the summarizer still parses the count. - Clamp days_radius in get_calendar_events and get_location_history to 1..=30, matching get_sms_messages. - persona_system_prompt helper (persona store lookup, blank-prompt -> None) for server-side persona resolution; callers land in the next commit. Co-Authored-By: Claude Fable 5 --- src/ai/insight_generator.rs | 237 ++++++++++++++++++++++++++++++++---- 1 file changed, 211 insertions(+), 26 deletions(-) diff --git a/src/ai/insight_generator.rs b/src/ai/insight_generator.rs index 4f15ef4..a6a50f1 100644 --- a/src/ai/insight_generator.rs +++ b/src/ai/insight_generator.rs @@ -28,6 +28,11 @@ use crate::otel::global_tracer; use crate::tags::TagDao; use crate::utils::{earliest_fs_time, normalize_path}; +/// Max location records rendered by `tool_get_location_history`. The DAO +/// query is range-bounded, not limited, so the tool caps the rendered list +/// and labels the truncation via `found_header`. +const LOCATION_HISTORY_DISPLAY_LIMIT: usize = 20; + /// Parse a "Title: ...\n\n" response into (title, body). /// Falls back to the first sentence as the title if the model didn't /// follow the format. @@ -218,15 +223,11 @@ impl InsightGenerator { /// be called once per chat turn / generation. `has_vision` is /// supplied by the caller because it depends on the model selected /// for this turn, not on persistent state. - pub fn current_gate_opts(&self, has_vision: bool) -> ToolGateOpts { - self.current_gate_opts_for_persona(has_vision, None) - } - - /// Same as `current_gate_opts` but resolves the per-persona - /// `allow_agent_corrections` flag too. Pass `Some((user_id, - /// persona_id))` when generating in a persona context (every chat - /// turn does); pass `None` for callers that don't have one yet - /// (cold paths, populate_knowledge bin), which defaults the gate + /// + /// Also resolves the per-persona `allow_agent_corrections` flag. + /// Pass `Some((user_id, persona_id))` when generating in a persona + /// context (every chat turn and agentic generation does); pass + /// `None` for callers that don't have one, which defaults the gate /// to closed — the conservative posture. pub fn current_gate_opts_for_persona( &self, @@ -277,6 +278,22 @@ impl InsightGenerator { } } + /// Resolve the stored system prompt for `(user_id, persona_id)` from + /// the persona store. Returns `None` when the persona row doesn't + /// exist or its prompt is blank — callers fall back to their own + /// default. Used for server-side persona resolution: a request that + /// carries `persona_id` but no explicit `system_prompt` should speak + /// in the persona's stored voice, not the neutral default. + pub(crate) fn persona_system_prompt(&self, user_id: i32, persona_id: &str) -> Option { + let cx = opentelemetry::Context::new(); + let mut pdao = self.persona_dao.lock().expect("Unable to lock PersonaDao"); + pdao.get_persona(&cx, user_id, persona_id) + .ok() + .flatten() + .map(|p| p.system_prompt) + .filter(|s| !s.trim().is_empty()) + } + /// Resolve `rel_path` against the configured libraries, returning the /// first root under which the file exists. Insights may be generated /// for any library — the generator itself doesn't know which — so we @@ -1673,6 +1690,10 @@ Return ONLY the summary, nothing else."#, self.tool_recall_facts_for_photo(arguments, user_id, persona_id, cx) .await } + "recall_facts_for_entity" => { + self.tool_recall_facts_for_entity(arguments, user_id, persona_id, cx) + .await + } "store_entity" => self.tool_store_entity(arguments, cx).await, "store_fact" => { self.tool_store_fact( @@ -2145,8 +2166,8 @@ Return ONLY the summary, nothing else."#, }) .collect(); format!( - "Found {} messages:\n{}", - messages.len(), + "{}\n{}", + Self::found_header(messages.len(), formatted.len(), "messages"), formatted.join("\n") ) } @@ -2171,7 +2192,8 @@ Return ONLY the summary, nothing else."#, let days_radius = args .get("days_radius") .and_then(|v| v.as_i64()) - .unwrap_or(7); + .unwrap_or(7) + .clamp(1, 30); let limit = args .get("limit") .and_then(|v| v.as_i64()) @@ -2250,7 +2272,8 @@ Return ONLY the summary, nothing else."#, let days_radius = args .get("days_radius") .and_then(|v| v.as_i64()) - .unwrap_or(14); + .unwrap_or(14) + .clamp(1, 30); let date = match NaiveDate::parse_from_str(date_str, "%Y-%m-%d") { Ok(d) => d, @@ -2279,7 +2302,7 @@ Return ONLY the summary, nothing else."#, Some(locs) if !locs.is_empty() => { let formatted: Vec = locs .iter() - .take(20) + .take(LOCATION_HISTORY_DISPLAY_LIMIT) .map(|loc| { let dt = DateTime::from_timestamp(loc.timestamp, 0) .map(|dt| { @@ -2305,8 +2328,8 @@ Return ONLY the summary, nothing else."#, }) .collect(); format!( - "Found {} location records:\n{}", - locs.len(), + "{}\n{}", + Self::found_header(locs.len(), formatted.len(), "location records"), formatted.join("\n") ) } @@ -2315,6 +2338,20 @@ Return ONLY the summary, nothing else."#, } } + /// Render a `Found N :` tool-result header, annotating when only + /// the first `shown` of `total` items are listed below it. Without the + /// annotation the model believes it saw everything ("Found 312 + /// messages:" followed by 60 lines) and reasons from a silently + /// truncated list. `summarize_tool_result` keeps parsing the leading + /// count either way. + fn found_header(total: usize, shown: usize, noun: &str) -> String { + if shown < total { + format!("Found {} {} (showing first {}):", total, noun, shown) + } else { + format!("Found {} {}:", total, noun) + } + } + /// Tool: get_file_tags — fetch tags for a file path async fn tool_get_file_tags( &self, @@ -2669,6 +2706,102 @@ Return ONLY the summary, nothing else."#, } } + /// Tool: recall_facts_for_entity — retrieve facts for one entity by id. + /// Persona scoping and the reviewed-only-facts filter mirror + /// `tool_recall_facts_for_photo` exactly: reads are always Single + /// (user_id, persona_id), and strict-mode personas see only + /// human-reviewed facts. + async fn tool_recall_facts_for_entity( + &self, + args: &serde_json::Value, + user_id: i32, + persona_id: &str, + cx: &opentelemetry::Context, + ) -> String { + use crate::database::PersonaFilter; + let persona_filter = PersonaFilter::Single { + user_id, + persona_id: persona_id.to_string(), + }; + let entity_id = match args.get("entity_id").and_then(|v| v.as_i64()) { + Some(id) => id as i32, + None => return "Error: missing required parameter 'entity_id'".to_string(), + }; + let limit = args + .get("limit") + .and_then(|v| v.as_i64()) + .unwrap_or(50) + .clamp(1, 100) as usize; + + log::info!( + "tool_recall_facts_for_entity: entity_id={}, limit={}", + entity_id, + limit + ); + + // Resolve the persona's reviewed-only-mode flag once — identical + // fallback semantics to recall_facts_for_photo (missing persona + // row → permissive active+reviewed default). + let reviewed_only = { + let mut pdao = self.persona_dao.lock().expect("Unable to lock PersonaDao"); + pdao.get_persona(cx, user_id, persona_id) + .ok() + .flatten() + .map(|p| p.reviewed_only_facts) + .unwrap_or(false) + }; + + let mut kdao = self + .knowledge_dao + .lock() + .expect("Unable to lock KnowledgeDao"); + + let entity = match kdao.get_entity_by_id(cx, entity_id) { + Ok(Some(e)) => e, + Ok(None) => return format!("No entity found with ID {}.", entity_id), + Err(e) => return format!("Error fetching entity: {:?}", e), + }; + + let mut output_lines = vec![format!("Entity: {} ({})", entity.name, entity.entity_type)]; + match kdao.get_facts_for_entity(cx, entity_id, &persona_filter) { + Ok(facts) => { + // Default scope: active + reviewed. Strict mode trims to + // reviewed only — same allow rule as recall_facts_for_photo. + let allow = |s: &str| -> bool { + if reviewed_only { + s == "reviewed" + } else { + s == "active" || s == "reviewed" + } + }; + for f in facts.iter().filter(|f| allow(&f.status)).take(limit) { + let obj = if let Some(ref v) = f.object_value { + v.clone() + } else if let Some(oid) = f.object_entity_id { + kdao.get_entity_by_id(cx, oid) + .ok() + .flatten() + .map(|e| format!("{} (entity ID: {})", e.name, e.id)) + .unwrap_or_else(|| format!("entity:{}", oid)) + } else { + "(unknown)".to_string() + }; + output_lines.push(format!(" - {} {}", f.predicate, obj)); + } + } + Err(e) => return format!("Error fetching facts: {:?}", e), + } + + if output_lines.len() == 1 { + format!( + "No active knowledge facts found for entity {} (ID: {}).", + entity.name, entity_id + ) + } else { + format!("Knowledge for this entity:\n{}", output_lines.join("\n")) + } + } + /// Tool: store_entity — upsert an entity into the knowledge memory. /// Embeddings go through the configured local backend (`LLM_BACKEND`), /// independent of the per-request chat backend in the caller. @@ -3063,7 +3196,7 @@ Return ONLY the summary, nothing else."#, /// Build the list of tool definitions for the agentic loop, gated by /// `opts`. Always-on tools: `search_messages`, `get_sms_messages`, /// `get_file_tags`, `reverse_geocode`, `get_current_datetime`, the - /// four knowledge-memory tools. Conditional: `describe_photo` (vision + /// five knowledge-memory tools. Conditional: `describe_photo` (vision /// model), `get_personal_place_at` (Apollo configured), `search_rag` /// (daily_summaries populated), `get_calendar_events` (calendar /// populated), `get_location_history` (location history populated). @@ -3280,6 +3413,22 @@ Return ONLY the summary, nothing else."#, }), )); + tools.push(Tool::function( + "recall_facts_for_entity", + "Retrieve all stored facts about one specific entity by its ID. Use to follow up on an entity \ + surfaced by `recall_entities` (or referenced by another fact's object) that is NOT linked to the \ + current photo — `recall_facts_for_photo` only covers photo-linked entities. \ + Example: `{entity_id: 7}` — everything known about entity 7.", + serde_json::json!({ + "type": "object", + "required": ["entity_id"], + "properties": { + "entity_id": { "type": "integer", "description": "ID of the entity to fetch facts for (from recall_entities, store_entity, or a fact's object reference)." }, + "limit": { "type": "integer", "description": "Max facts to return (default 50, max 100)." } + } + }), + )); + tools.push(Tool::function( "store_entity", "Upsert a person / place / event / thing into the knowledge memory. Returns the entity id (use it as \ @@ -3543,7 +3692,7 @@ Return ONLY the summary, nothing else."#, format!("{} personal place(s)", n) } } - "recall_entities" | "recall_facts_for_photo" => { + "recall_entities" | "recall_facts_for_photo" | "recall_facts_for_entity" => { let n = raw.lines().skip(1).filter(|l| !l.trim().is_empty()).count(); let kind = if tool_name == "recall_entities" { "entities" @@ -4033,8 +4182,11 @@ Return ONLY the summary, nothing else."#, // 10. Define tools. describe_photo offered only when the chat model // sees images directly (images_inline); in hybrid mode the visual - // description is already inlined as text. - let gate_opts = self.current_gate_opts(backend.images_inline); + // description is already inlined as text. Persona-aware so the + // persona's allow_agent_corrections gate opens here exactly like + // it does for chat turns (insight_chat does the same). + let gate_opts = + self.current_gate_opts_for_persona(backend.images_inline, Some((user_id, &persona_id))); let tools = Self::build_tool_definitions(gate_opts); // 11. Build initial messages. images_inline → attach base64 to the @@ -4145,7 +4297,10 @@ Return ONLY the summary, nothing else."#, .set_attribute(KeyValue::new("iterations_used", iterations_used as i64)); loop_cx.span().set_status(Status::Ok); - // 13. Parse title from the model's inline response. + // 13. Strip any leaked reasoning block (thinking + // models emit it ahead of the answer; the raw transcript in + // training_messages keeps it), then parse the title. + final_content = crate::ai::llm_client::strip_think_blocks(&final_content); let (title, body) = parse_title_body(&final_content); final_content = body; @@ -4341,6 +4496,7 @@ mod tests { assert!(names.contains(&"get_current_datetime")); assert!(names.contains(&"recall_entities")); assert!(names.contains(&"recall_facts_for_photo")); + assert!(names.contains(&"recall_facts_for_entity")); assert!(names.contains(&"store_entity")); assert!(names.contains(&"store_fact")); @@ -4550,6 +4706,37 @@ mod tests { ); } + #[test] + fn found_header_labels_truncation_honestly() { + // Truncated: total exceeds what's listed below the header. + assert_eq!( + InsightGenerator::found_header(312, 60, "messages"), + "Found 312 messages (showing first 60):" + ); + // Not truncated: plain header, no annotation noise. + assert_eq!( + InsightGenerator::found_header(7, 7, "messages"), + "Found 7 messages:" + ); + assert_eq!( + InsightGenerator::found_header(40, 20, "location records"), + "Found 40 location records (showing first 20):" + ); + } + + #[test] + fn summarize_parses_truncated_found_header() { + // The "(showing first K)" annotation must not break the few-shot + // trajectory summarizer's "Found N" count parsing. + assert_eq!( + InsightGenerator::summarize_tool_result( + "get_sms_messages", + "Found 312 messages (showing first 60):\n[2023-08-15 10:00] Sarah: hi" + ), + "312 messages" + ); + } + fn make_search_hit( id: i64, contact: &str, @@ -4869,11 +5056,9 @@ mod tests { // Replicate the resolve_backend local-client construction // (lines ~3686-3695 of this file). let mut lc = base.clone(); - if let Some(ref m) = overrides_model { - if !is_hybrid { - lc.primary_model = m.clone(); - lc.set_vision_model(m.clone()); - } + if !is_hybrid && let Some(ref m) = overrides_model { + lc.primary_model = m.clone(); + lc.set_vision_model(m.clone()); } // In hybrid mode the local client must keep its configured slots. From b711252c23e625a863d4c81edcfc43707ab1f874 Mon Sep 17 00:00:00 2001 From: Cameron Cordes Date: Tue, 9 Jun 2026 18:29:35 -0400 Subject: [PATCH 4/6] Resolve persona prompts server-side; drop synthetic prompt in chat_turn MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A request carrying persona_id but no system_prompt used to fall back to the neutral default voice. Both agentic generation (generate_agentic_insight_handler) and chat bootstrap now resolve the persona's stored prompt from the persona store, with precedence: explicit non-blank client system_prompt > persona store lookup > existing default ("default" persona id behaves the same — used if the store has a row, neutral default otherwise). Resolution happens at the handler / bootstrap entry where the DAO is reachable; internals are unchanged. resolve_bootstrap_system_prompt takes the resolved persona prompt as a second argument, with precedence tests. Also in insight_chat: - Sync chat_turn no longer persists the synthetic "Please write your final answer now without calling any more tools." user message pushed on iteration exhaustion — extracted both streaming variants' synthetic_idx pattern into push/remove_synthetic_final_prompt (the remove is a defensive no-op on index drift) and applied it to all three loops; round-trip test included. - Strip leaked blocks from the final content persisted as the reply in chat_turn and both streaming AgenticLoopOutcomes (mid-stream TextDeltas are untouched; the raw transcript keeps the block). Co-Authored-By: Claude Fable 5 --- src/ai/handlers.rs | 17 ++++- src/ai/insight_chat.rs | 169 ++++++++++++++++++++++++++++++++++------- 2 files changed, 158 insertions(+), 28 deletions(-) diff --git a/src/ai/handlers.rs b/src/ai/handlers.rs index 9bcb048..5e46418 100644 --- a/src/ai/handlers.rs +++ b/src/ai/handlers.rs @@ -809,6 +809,21 @@ pub async fn generate_agentic_insight_handler( .filter(|s| !s.trim().is_empty()) .unwrap_or_else(|| "default".to_string()); + // Server-side persona resolution: an explicit client `system_prompt` + // wins; otherwise the persona's stored prompt from the persona store; + // otherwise None and `build_system_content` applies its neutral + // default. Without the lookup, a request carrying only `persona_id` + // silently generated in the default voice. + let system_prompt = request + .system_prompt + .clone() + .filter(|s| !s.trim().is_empty()) + .or_else(|| { + app_state + .insight_generator + .persona_system_prompt(user_id, &persona_id) + }); + let max_iterations: usize = std::env::var("AGENTIC_MAX_ITERATIONS") .ok() .and_then(|v| v.parse().ok()) @@ -834,7 +849,7 @@ pub async fn generate_agentic_insight_handler( generator_for_task.generate_agentic_insight_for_photo( &path_for_task, request.model.clone(), - request.system_prompt.clone(), + system_prompt, request.num_ctx, request.temperature, request.top_p, diff --git a/src/ai/insight_chat.rs b/src/ai/insight_chat.rs index 3c5eb26..7298296 100644 --- a/src/ai/insight_chat.rs +++ b/src/ai/insight_chat.rs @@ -33,6 +33,12 @@ const BYTES_PER_TOKEN: usize = 4; /// characters) must NOT be counted as text bytes — doing so dwarfs the entire /// text budget and forces spurious truncation on every turn. const IMAGE_TOKENS_EACH: usize = 1300; +/// User prompt injected when the agentic loop exhausts its iteration budget +/// without producing a tool-free reply. Internal scaffolding only — it is +/// stripped from the transcript before persistence (see +/// [`push_synthetic_final_prompt`] / [`remove_synthetic_final_prompt`]). +const SYNTHETIC_FINAL_ANSWER_PROMPT: &str = + "Please write your final answer now without calling any more tools."; pub type ChatLockMap = Arc>>>>; @@ -457,9 +463,7 @@ impl InsightChatService { "Chat loop exhausted after {} iterations, requesting final answer", iterations_used ); - messages.push(ChatMessage::user( - "Please write your final answer now without calling any more tools.", - )); + let synthetic_idx = push_synthetic_final_prompt(&mut messages); let (final_response, prompt_tokens, eval_tokens) = backend .chat() .chat_with_tools(messages.clone(), vec![]) @@ -468,8 +472,15 @@ impl InsightChatService { last_eval_count = eval_tokens; final_content = final_response.content.clone(); messages.push(final_response); + // Drop the synthetic prompt before persistence — internal + // scaffolding only (mirrors both streaming variants). + remove_synthetic_final_prompt(&mut messages, synthetic_idx); } + // Strip any leaked reasoning block from the content we + // return / persist as the reply (the raw transcript keeps it). + let final_content = crate::ai::llm_client::strip_think_blocks(&final_content); + loop_cx.span().set_status(Status::Ok); // Drop the per-turn iteration-budget note from the system message @@ -1039,7 +1050,12 @@ impl InsightChatService { ); let tools = InsightGenerator::build_tool_definitions(gate_opts); - let persona = resolve_bootstrap_system_prompt(req.system_prompt.as_deref()); + // Server-side persona resolution: explicit client system_prompt wins; + // else the active persona's stored prompt; else the neutral default. + let persona_prompt = self + .generator + .persona_system_prompt(req.user_id, &active_persona); + let persona = resolve_bootstrap_system_prompt(req.system_prompt.as_deref(), persona_prompt); let system_content = build_bootstrap_system_message( &persona, &normalized, @@ -1263,10 +1279,7 @@ impl InsightChatService { // No-tools fallback if final_content.is_empty() { - let synthetic_idx = messages.len(); - messages.push(ChatMessage::user( - "Please write your final answer now without calling any more tools.", - )); + let synthetic_idx = push_synthetic_final_prompt(messages); let mut stream = backend .chat() .chat_with_tools_stream(messages.clone(), vec![]) @@ -1294,7 +1307,7 @@ impl InsightChatService { final_message.ok_or_else(|| anyhow!("final stream ended without a Done event"))?; final_content = final_response.content.clone(); messages.push(final_response); - messages.remove(synthetic_idx); + remove_synthetic_final_prompt(messages, synthetic_idx); } Ok(AgenticLoopOutcome { @@ -1302,7 +1315,9 @@ impl InsightChatService { iterations_used, last_prompt_eval_count, last_eval_count, - final_content, + // Strip any leaked reasoning block from the content the + // caller persists as title/summary (the raw transcript keeps it). + final_content: crate::ai::llm_client::strip_think_blocks(&final_content), cancelled: false, }) } @@ -1648,7 +1663,12 @@ impl InsightChatService { // get_sms_messages / reverse_geocode / get_personal_place_at // the args they need. In hybrid mode the visual description // belongs here for the same reason. - let persona = resolve_bootstrap_system_prompt(req.system_prompt.as_deref()); + // Server-side persona resolution: explicit client system_prompt wins; + // else the active persona's stored prompt; else the neutral default. + let persona_prompt = self + .generator + .persona_system_prompt(req.user_id, &active_persona); + let persona = resolve_bootstrap_system_prompt(req.system_prompt.as_deref(), persona_prompt); let system_content = build_bootstrap_system_message( &persona, &normalized, @@ -1866,10 +1886,7 @@ impl InsightChatService { // and load_history's user-turn handler doesn't reset // pending_tools at this position (wiping the prior tool // calls from the final assistant render). - let synthetic_idx = messages.len(); - messages.push(ChatMessage::user( - "Please write your final answer now without calling any more tools.", - )); + let synthetic_idx = push_synthetic_final_prompt(messages); let mut stream = backend .chat() .chat_with_tools_stream(messages.clone(), vec![]) @@ -1900,7 +1917,7 @@ impl InsightChatService { // Drop the synthetic prompt — internal scaffolding only. The // model's final_response (now at the end) was generated with // it in context and reads coherently without it on replay. - messages.remove(synthetic_idx); + remove_synthetic_final_prompt(messages, synthetic_idx); } Ok(AgenticLoopOutcome { @@ -1908,7 +1925,9 @@ impl InsightChatService { iterations_used, last_prompt_eval_count, last_eval_count, - final_content, + // Strip any leaked reasoning block from the content the + // caller persists as title/summary (the raw transcript keeps it). + final_content: crate::ai::llm_client::strip_think_blocks(&final_content), cancelled: false, }) } @@ -1921,15 +1940,21 @@ const BOOTSTRAP_DEFAULT_SYSTEM_PROMPT: &str = "You are a helpful AI assistant an Use the available tools to gather context and answer their questions \ in a conversational tone."; -/// Pick the system prompt for bootstrap. Trimmed-non-empty supplied wins; -/// otherwise fall back to [`BOOTSTRAP_DEFAULT_SYSTEM_PROMPT`]. Returns an -/// owned `String` because the bootstrap caller persists it on the new -/// insight row. -fn resolve_bootstrap_system_prompt(supplied: Option<&str>) -> String { +/// Pick the system prompt for bootstrap. Precedence: trimmed-non-empty +/// `supplied` (the client's explicit `system_prompt`) wins; else +/// `persona_prompt` (the active persona's stored prompt, resolved +/// server-side from the persona store); else +/// [`BOOTSTRAP_DEFAULT_SYSTEM_PROMPT`]. Returns an owned `String` because +/// the bootstrap caller persists it on the new insight row. +fn resolve_bootstrap_system_prompt( + supplied: Option<&str>, + persona_prompt: Option, +) -> String { supplied .map(str::trim) .filter(|s| !s.is_empty()) .map(str::to_string) + .or_else(|| persona_prompt.filter(|s| !s.trim().is_empty())) .unwrap_or_else(|| BOOTSTRAP_DEFAULT_SYSTEM_PROMPT.to_string()) } @@ -2200,6 +2225,30 @@ fn restore_system_content(messages: &mut [ChatMessage], original: Option } } +/// Append the synthetic "write your final answer" user prompt, returning the +/// index the caller must later hand to [`remove_synthetic_final_prompt`]. +/// Used when the agentic loop exhausts its budget: the model gets one more +/// (tool-free) request, but the nudge itself must never persist — it would +/// render as a user bubble in the transcript and reset `load_history`'s +/// pending-tools tracking at that position. +fn push_synthetic_final_prompt(messages: &mut Vec) -> usize { + let idx = messages.len(); + messages.push(ChatMessage::user(SYNTHETIC_FINAL_ANSWER_PROMPT)); + idx +} + +/// Remove the synthetic prompt inserted by [`push_synthetic_final_prompt`]. +/// Defensive no-op when the message at `idx` isn't the synthetic prompt — +/// guards against index drift if the surrounding code is reordered. +fn remove_synthetic_final_prompt(messages: &mut Vec, idx: usize) { + if messages + .get(idx) + .is_some_and(|m| m.role == "user" && m.content == SYNTHETIC_FINAL_ANSWER_PROMPT) + { + messages.remove(idx); + } +} + /// Receipt produced by [`apply_system_prompt_override`] so the caller can /// undo the override before persistence. Two variants because we either /// replaced an existing system message (need its original content) or @@ -2643,26 +2692,26 @@ mod tests { #[test] fn bootstrap_system_prompt_falls_back_to_default_for_none() { - let out = resolve_bootstrap_system_prompt(None); + let out = resolve_bootstrap_system_prompt(None, None); assert_eq!(out, BOOTSTRAP_DEFAULT_SYSTEM_PROMPT); } #[test] fn bootstrap_system_prompt_falls_back_to_default_for_empty_string() { // Apollo currently sends `''` when no persona is selected. - let out = resolve_bootstrap_system_prompt(Some("")); + let out = resolve_bootstrap_system_prompt(Some(""), None); assert_eq!(out, BOOTSTRAP_DEFAULT_SYSTEM_PROMPT); } #[test] fn bootstrap_system_prompt_falls_back_to_default_for_whitespace() { - let out = resolve_bootstrap_system_prompt(Some(" \n\t ")); + let out = resolve_bootstrap_system_prompt(Some(" \n\t "), None); assert_eq!(out, BOOTSTRAP_DEFAULT_SYSTEM_PROMPT); } #[test] fn bootstrap_system_prompt_uses_supplied_when_non_empty() { - let out = resolve_bootstrap_system_prompt(Some("you are a journal")); + let out = resolve_bootstrap_system_prompt(Some("you are a journal"), None); assert_eq!(out, "you are a journal"); } @@ -2671,10 +2720,76 @@ mod tests { // Trim only happens at the edges — interior newlines and spacing // (which Apollo's persona uses for tool listings) must survive. let prompt = "line one\nline two\n bullet"; - let out = resolve_bootstrap_system_prompt(Some(prompt)); + let out = resolve_bootstrap_system_prompt(Some(prompt), None); assert_eq!(out, prompt); } + #[test] + fn bootstrap_system_prompt_explicit_wins_over_persona_store() { + let out = resolve_bootstrap_system_prompt( + Some("explicit prompt"), + Some("stored persona prompt".to_string()), + ); + assert_eq!(out, "explicit prompt"); + } + + #[test] + fn bootstrap_system_prompt_uses_persona_store_when_no_explicit() { + // Request carried persona_id but no system_prompt — the persona's + // stored prompt must be used, not the neutral default. + let out = resolve_bootstrap_system_prompt(None, Some("stored persona prompt".to_string())); + assert_eq!(out, "stored persona prompt"); + + // Empty explicit prompt behaves like None. + let out = + resolve_bootstrap_system_prompt(Some(""), Some("stored persona prompt".to_string())); + assert_eq!(out, "stored persona prompt"); + } + + #[test] + fn bootstrap_system_prompt_blank_persona_prompt_falls_to_default() { + let out = resolve_bootstrap_system_prompt(None, Some(" ".to_string())); + assert_eq!(out, BOOTSTRAP_DEFAULT_SYSTEM_PROMPT); + } + + // ── Synthetic final-answer prompt scaffolding ────────────────────── + + #[test] + fn synthetic_final_prompt_round_trip_leaves_no_scaffolding() { + // Exhausted-loop fallback: nudge pushed, model reply appended, nudge + // removed — the persisted transcript must contain the reply but not + // the synthetic user prompt (all three loop variants rely on this). + let mut msgs = vec![ + ChatMessage::system("sys"), + ChatMessage::user("q"), + assistant_with_tool_call("lookup"), + ChatMessage::tool_result("data"), + ]; + let idx = push_synthetic_final_prompt(&mut msgs); + assert_eq!(msgs[idx].content, SYNTHETIC_FINAL_ANSWER_PROMPT); + + msgs.push(assistant_text("final answer")); + remove_synthetic_final_prompt(&mut msgs, idx); + + assert_eq!(msgs.len(), 5); + assert!( + msgs.iter() + .all(|m| m.content != SYNTHETIC_FINAL_ANSWER_PROMPT), + "synthetic prompt must not persist" + ); + assert_eq!(msgs.last().unwrap().content, "final answer"); + } + + #[test] + fn remove_synthetic_final_prompt_is_noop_on_index_mismatch() { + // Defensive guard: if the message at idx isn't the synthetic prompt + // (index drift), nothing is removed. + let mut msgs = vec![ChatMessage::user("q"), assistant_text("a")]; + remove_synthetic_final_prompt(&mut msgs, 0); + remove_synthetic_final_prompt(&mut msgs, 5); + assert_eq!(msgs.len(), 2); + } + #[test] fn bootstrap_backend_defaults_to_local_when_none() { let out = resolve_bootstrap_backend(None).unwrap(); From 13f3635db2da1581b5063e2ce04cb48fdf4b7ea6 Mon Sep 17 00:00:00 2001 From: Cameron Cordes Date: Tue, 9 Jun 2026 18:29:44 -0400 Subject: [PATCH 5/6] Fix clippy lints in backfill and libraries tests Keep `cargo clippy --tests` clean alongside the agentic-loop changes: alias backfill's five-element setup() tuple as SetupFixture (type_complexity) and build the single-library health map via std::slice::from_ref instead of cloning (unnecessary clone-to-slice). No behavior change. Co-Authored-By: Claude Fable 5 --- src/backfill.rs | 15 ++++++++++----- src/libraries.rs | 2 +- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/src/backfill.rs b/src/backfill.rs index 34baa41..ec25fbb 100644 --- a/src/backfill.rs +++ b/src/backfill.rs @@ -529,16 +529,21 @@ mod tests { opentelemetry::Context::new() } - /// Build a tempdir-backed library + DAOs sharing a single in-memory - /// SQLite connection (so cross-table joins like - /// `list_unscanned_candidates` see consistent state). - fn setup() -> ( + /// Everything `setup` hands back to a test: tempdir, library, shared + /// connection, and the two DAOs. Aliased to keep clippy's + /// type-complexity lint satisfied. + type SetupFixture = ( TempDir, Library, Arc>, Arc>>, Arc>>, - ) { + ); + + /// Build a tempdir-backed library + DAOs sharing a single in-memory + /// SQLite connection (so cross-table joins like + /// `list_unscanned_candidates` see consistent state). + fn setup() -> SetupFixture { let tmp = TempDir::new().expect("tempdir"); let mut conn = in_memory_db_connection(); // Migration seeds library id=1 with a placeholder root; rewrite it diff --git a/src/libraries.rs b/src/libraries.rs index 6248cfa..55bf5c1 100644 --- a/src/libraries.rs +++ b/src/libraries.rs @@ -1052,7 +1052,7 @@ mod tests { enabled: true, excluded_dirs: Vec::new(), }; - let map = new_health_map(&[lib.clone()]); + let map = new_health_map(std::slice::from_ref(&lib)); // First probe: empty dir, no prior data — Online. let s1 = refresh_health(&map, &lib, false); From 31904fef803e74e81eea94387aa693317d3fc568 Mon Sep 17 00:00:00 2001 From: Cameron Cordes Date: Tue, 9 Jun 2026 19:14:02 -0400 Subject: [PATCH 6/6] Raise chat truncation default num_ctx to 32k, env-overridable MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The history-truncation budget assumed an 8192-token context whenever a chat request omitted num_ctx, while the llama-swap chat slots serve 20k-131k. Replayed transcripts past ~6k tokens were silently gutted every turn — losing conversation history and destroying llama.cpp KV-cache prefix reuse (full SWA re-prefill per turn). Default is now 32768 (real conversations top out around 16k), with AGENTIC_CHAT_DEFAULT_NUM_CTX to override per deploy, floored at headroom + 1024. Explicit per-request num_ctx still wins. Co-Authored-By: Claude Fable 5 --- CLAUDE.md | 12 ++++++++++-- src/ai/insight_chat.rs | 29 +++++++++++++++++++++++------ 2 files changed, 33 insertions(+), 8 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index fba33e0..b63ed4c 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -671,6 +671,11 @@ LLAMA_SWAP_TTS_REQUEST_TIMEOUT_SECONDS=600 # Per-request synth timeout (long # Insight Chat Continuation AGENTIC_CHAT_MAX_ITERATIONS=6 # Cap on tool-calling iterations per chat turn (default 6) +AGENTIC_CHAT_DEFAULT_NUM_CTX=32768 # Assumed context window for the history-truncation budget + # when a chat request omits num_ctx (default 32768). Size to + # the smallest context among the chat models actually served; + # too small silently guts replayed history every turn (and + # destroys llama.cpp KV-cache prefix reuse). ``` **AI Insights Fallback Behavior:** @@ -794,14 +799,17 @@ Per-`(library_id, file_path)` async mutex (`AppState.insight_chat.chat_locks`) serialises concurrent turns on the same insight so the JSON blob doesn't race. Context management is a soft bound: if the serialized history exceeds -`num_ctx - 2048` tokens (cheap 4-byte/token heuristic), the oldest -assistant-tool_call + tool_result pairs are dropped until under budget. The +`num_ctx - 2048` tokens (cheap 4-byte/token heuristic; `num_ctx` defaults +to `AGENTIC_CHAT_DEFAULT_NUM_CTX`, 32768, when the request omits it), the +oldest assistant-tool_call + tool_result pairs are dropped until under budget. The initial user message (with any images) and system prompt are always preserved. The `truncated` event / flag is surfaced to the client when a drop occurred. Configurable env: - `AGENTIC_CHAT_MAX_ITERATIONS` — cap on tool-calling iterations per turn (default 6). Per-request `max_iterations` is clamped to this cap. +- `AGENTIC_CHAT_DEFAULT_NUM_CTX` — assumed context window for the truncation + budget when the request omits `num_ctx` (default 32768). **Apollo Places integration (optional):** diff --git a/src/ai/insight_chat.rs b/src/ai/insight_chat.rs index 7298296..84f2b32 100644 --- a/src/ai/insight_chat.rs +++ b/src/ai/insight_chat.rs @@ -19,7 +19,13 @@ use futures::stream::{BoxStream, StreamExt}; use uuid::Uuid; const DEFAULT_MAX_ITERATIONS: usize = 6; -const DEFAULT_NUM_CTX: i32 = 8192; +/// Assumed context window when the request doesn't specify `num_ctx`. +/// The llama-swap chat slots serve 20k-131k contexts and real conversations +/// rarely pass ~16k tokens, so 32k keeps the truncation pass from gutting +/// history that the server could comfortably hold (which also destroys the +/// server's KV-cache prefix reuse). Override per-deploy with +/// AGENTIC_CHAT_DEFAULT_NUM_CTX if the serving models change shape. +const DEFAULT_NUM_CTX: i32 = 32768; /// Headroom reserved for the model's response, deducted from the context /// budget when deciding whether to truncate the replayed history. const RESPONSE_HEADROOM_TOKENS: usize = 2048; @@ -367,7 +373,7 @@ impl InsightChatService { // 6. Apply truncation budget. Drops oldest tool_call+tool pairs // (preserves system + first user including any images). - let budget_tokens = (req.num_ctx.unwrap_or(DEFAULT_NUM_CTX) as usize) + let budget_tokens = (req.num_ctx.unwrap_or_else(env_default_num_ctx) as usize) .saturating_sub(RESPONSE_HEADROOM_TOKENS); let budget_bytes = budget_tokens.saturating_mul(BYTES_PER_TOKEN); let truncated = apply_context_budget(&mut messages, budget_bytes); @@ -864,7 +870,7 @@ impl InsightChatService { None }; - let budget_tokens = (req.num_ctx.unwrap_or(DEFAULT_NUM_CTX) as usize) + let budget_tokens = (req.num_ctx.unwrap_or_else(env_default_num_ctx) as usize) .saturating_sub(RESPONSE_HEADROOM_TOKENS); let budget_bytes = budget_tokens.saturating_mul(BYTES_PER_TOKEN); let truncated = apply_context_budget(&mut messages, budget_bytes); @@ -1446,7 +1452,7 @@ impl InsightChatService { }; // Truncate before appending the new user turn. - let budget_tokens = (req.num_ctx.unwrap_or(DEFAULT_NUM_CTX) as usize) + let budget_tokens = (req.num_ctx.unwrap_or_else(env_default_num_ctx) as usize) .saturating_sub(RESPONSE_HEADROOM_TOKENS); let budget_bytes = budget_tokens.saturating_mul(BYTES_PER_TOKEN); let truncated = apply_context_budget(&mut messages, budget_bytes); @@ -2191,6 +2197,17 @@ fn env_max_iterations() -> usize { .max(1) } +/// Read AGENTIC_CHAT_DEFAULT_NUM_CTX once per call — the assumed context +/// window for the truncation budget when the request omits `num_ctx`. Same +/// no-static-global rationale as `env_max_iterations` above. +fn env_default_num_ctx() -> i32 { + std::env::var("AGENTIC_CHAT_DEFAULT_NUM_CTX") + .ok() + .and_then(|s| s.parse::().ok()) + .unwrap_or(DEFAULT_NUM_CTX) + .max(RESPONSE_HEADROOM_TOKENS as i32 + 1024) +} + /// Append a per-turn iteration-budget reminder to the replayed system /// message so the model knows how many tool-calling rounds this turn gets. /// Returns the original `content` so the caller can restore it before @@ -2516,8 +2533,8 @@ mod tests { assistant_text("here is the answer"), ]; - // Default budget: (8192 - 2048) * 4 bytes ≈ 24KB. The text easily fits; - // only the (excluded) image bytes could blow it. + // Default budget: (32768 - 2048) * 4 bytes ≈ 120KB. The text easily + // fits; only the (excluded) image bytes could blow it. let budget_bytes = (DEFAULT_NUM_CTX as usize - RESPONSE_HEADROOM_TOKENS) * BYTES_PER_TOKEN; let original_len = msgs.len(); let dropped = apply_context_budget(&mut msgs, budget_bytes);