Files
ImageApi/src/ai/insight_chat.rs
Cameron Cordes a0ec1a5080 insight-chat: photo context belongs in system msg, not user turn
After refresh, the rendered transcript was showing two unwanted
artifacts in the initial user bubble:

  Photo file path: pics/DSC_5171.jpg
  please tell me about this photo and what was going on around it

  Please write your final answer now without calling any more tools.

Two distinct bugs:

1. Bootstrap was prepending `Photo file path: <path>` (and, in
   hybrid mode, the visual description block) into the user-turn
   content. The model needed it to call file_path-keyed tools, but
   the user could see it in their own bubble on replay.

2. The no-tools fallback ("Please write your final answer now…")
   was a synthetic user message we never stripped from history,
   so it persisted into training_messages, rendered as a second
   user bubble, AND wiped the prior tool-call accumulator inside
   load_history (user-turn handler clears pending_tools), which
   is why the tool invocations disappeared from the assistant
   bubble after refresh.

Fixes:

- New `build_bootstrap_system_message` helper composes the persona
  with a `--- PHOTO CONTEXT ---` block (path + optional visual
  description). Lives in the system message, not the user turn.
  The user's bubble shows only what they typed.
- Streaming agentic loop's no-tools fallback now records its
  insertion index and removes the synthetic user prompt from
  `messages` after the model responds. Final assistant content
  stays — it reads coherently on replay without the synthetic
  prompt above it. Applies to both bootstrap and continuation.

3 new tests cover the system-message composer (path-only, with
visual block, persona-trim). Total insight_chat unit tests: 27.
2026-05-08 11:07:03 -04:00

1992 lines
79 KiB
Rust

use anyhow::{Result, anyhow, bail};
use chrono::Utc;
use opentelemetry::KeyValue;
use opentelemetry::trace::{Span, Status, TraceContextExt, Tracer};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::sync::Mutex as TokioMutex;
use crate::ai::insight_generator::InsightGenerator;
use crate::ai::llm_client::{ChatMessage, LlmClient, LlmStreamEvent, Tool};
use crate::ai::ollama::OllamaClient;
use crate::ai::openrouter::OpenRouterClient;
use crate::database::InsightDao;
use crate::database::models::InsertPhotoInsight;
use crate::otel::global_tracer;
use crate::utils::normalize_path;
use futures::stream::{BoxStream, StreamExt};
const DEFAULT_MAX_ITERATIONS: usize = 6;
const DEFAULT_NUM_CTX: i32 = 8192;
/// Headroom reserved for the model's response, deducted from the context
/// budget when deciding whether to truncate the replayed history.
const RESPONSE_HEADROOM_TOKENS: usize = 2048;
/// Cheap byte-to-token approximation used by the truncation pass. The real
/// tokenization is model-specific; this avoids carrying tiktoken just for a
/// soft bound.
const BYTES_PER_TOKEN: usize = 4;
pub type ChatLockMap = Arc<TokioMutex<HashMap<(i32, String), Arc<TokioMutex<()>>>>>;
#[derive(Debug)]
pub struct ChatTurnRequest {
pub library_id: i32,
pub file_path: String,
pub user_message: String,
/// Override the model id. Local mode: an Ollama model name. Hybrid:
/// an OpenRouter id. None defers to the stored insight's `model_version`.
pub model: Option<String>,
/// Override the backend used for this turn. None defers to the stored
/// insight's `backend`. Switching `local -> hybrid` is rejected in v1.
pub backend: Option<String>,
pub num_ctx: Option<i32>,
pub temperature: Option<f32>,
pub top_p: Option<f32>,
pub top_k: Option<i32>,
pub min_p: Option<f32>,
pub max_iterations: Option<usize>,
/// Per-turn system-prompt override. In append mode (default), applied
/// ephemerally — original system message restored before persistence.
/// In amend mode, persisted into the new insight row's system message.
/// None / empty = no change.
pub system_prompt: Option<String>,
/// When true, write a new insight row (regenerating title) instead of
/// updating training_messages on the existing row.
pub amend: bool,
/// When true, force the bootstrap path even if an insight already exists:
/// flip prior rows to `is_current=false` and create a new insight row
/// from `system_prompt` + `user_message` + photo. Takes precedence over
/// `amend`. With no existing insight, collapses to a normal bootstrap
/// (the row-flip step is a no-op).
pub regenerate: bool,
}
#[derive(Debug)]
pub struct ChatTurnResult {
pub assistant_message: String,
pub tool_calls_made: usize,
pub iterations_used: usize,
pub truncated: bool,
pub prompt_eval_count: Option<i32>,
pub eval_count: Option<i32>,
/// Set when `amend=true` and the new insight row was inserted.
pub amended_insight_id: Option<i32>,
/// Backend used for this turn — useful when the client overrode the
/// stored value.
pub backend_used: String,
/// Model identifier the chat backend ran with.
pub model_used: String,
}
#[derive(Clone)]
pub struct InsightChatService {
generator: Arc<InsightGenerator>,
ollama: OllamaClient,
openrouter: Option<Arc<OpenRouterClient>>,
insight_dao: Arc<Mutex<Box<dyn InsightDao>>>,
chat_locks: ChatLockMap,
}
impl InsightChatService {
pub fn new(
generator: Arc<InsightGenerator>,
ollama: OllamaClient,
openrouter: Option<Arc<OpenRouterClient>>,
insight_dao: Arc<Mutex<Box<dyn InsightDao>>>,
chat_locks: ChatLockMap,
) -> Self {
Self {
generator,
ollama,
openrouter,
insight_dao,
chat_locks,
}
}
/// Load the rendered transcript for chat-UI display. Filters internal
/// scaffolding (system message, tool turns, tool-dispatch-only assistant
/// messages) and drops base64 images from user turns to keep payloads
/// small. The first remaining user message is flagged `is_initial`.
pub fn load_history(&self, file_path: &str) -> Result<HistoryView> {
let normalized = normalize_path(file_path);
let cx = opentelemetry::Context::new();
let mut dao = self.insight_dao.lock().expect("Unable to lock InsightDao");
let insight = dao
.get_insight(&cx, &normalized)
.map_err(|e| anyhow!("failed to load insight: {:?}", e))?
.ok_or_else(|| anyhow!("no insight found for path"))?;
let raw = insight
.training_messages
.as_ref()
.ok_or_else(|| anyhow!("insight has no chat history (pre-agentic insight)"))?;
let messages: Vec<ChatMessage> = serde_json::from_str(raw)
.map_err(|e| anyhow!("failed to deserialize chat history: {}", e))?;
let mut rendered = Vec::new();
let mut user_turns_seen = 0usize;
let mut assistant_turns_seen = 0usize;
// Accumulate tool invocations seen since the last user turn. An
// invocation is: one assistant tool_call message (which may hold
// multiple calls) + the N following tool-role messages (one per call,
// in order). They attach to the next assistant-with-content, which
// is the "final" reply for the current turn.
//
// Wire shape from the model:
// assistant { tool_calls: [A, B], content: "" }
// tool { content: "result of A" }
// tool { content: "result of B" }
// assistant { content: "here's the answer" } ← rendered as final
let mut pending_tools: Vec<ToolInvocation> = Vec::new();
// Queue of (name, arguments) awaiting a tool_result to pair with.
let mut pending_calls: std::collections::VecDeque<(String, serde_json::Value)> =
std::collections::VecDeque::new();
for msg in &messages {
match msg.role.as_str() {
"system" => continue,
"tool" => {
if let Some((name, arguments)) = pending_calls.pop_front() {
let (result, result_truncated) = truncate_tool_result(&msg.content);
pending_tools.push(ToolInvocation {
name,
arguments,
result,
result_truncated,
});
}
// If there's no pending call, the tool message is an
// orphan (shouldn't happen in practice) — skip silently.
}
"assistant" => {
let has_tool_calls = msg
.tool_calls
.as_ref()
.map(|c| !c.is_empty())
.unwrap_or(false);
if has_tool_calls && msg.content.trim().is_empty() {
// Tool-dispatch turn: enqueue calls, wait for tool
// results on subsequent messages.
if let Some(ref tcs) = msg.tool_calls {
for tc in tcs {
pending_calls.push_back((
tc.function.name.clone(),
tc.function.arguments.clone(),
));
}
}
continue;
}
// Final assistant reply for this turn — drain accumulated
// tools into it.
assistant_turns_seen += 1;
let tools = std::mem::take(&mut pending_tools);
pending_calls.clear(); // any leftover unpaired calls are dropped
rendered.push(RenderedMessage {
role: "assistant".to_string(),
content: msg.content.clone(),
is_initial: false,
tools,
});
}
"user" => {
let is_initial = user_turns_seen == 0;
user_turns_seen += 1;
// New user turn resets any in-flight tool state.
pending_tools.clear();
pending_calls.clear();
rendered.push(RenderedMessage {
role: "user".to_string(),
content: msg.content.clone(),
is_initial,
tools: Vec::new(),
});
}
_ => continue,
}
}
Ok(HistoryView {
messages: rendered,
turn_count: assistant_turns_seen,
model_version: insight.model_version,
backend: insight.backend,
})
}
pub async fn chat_turn(&self, req: ChatTurnRequest) -> Result<ChatTurnResult> {
let tracer = global_tracer();
let parent_cx = opentelemetry::Context::new();
let mut span = tracer.start_with_context("ai.insight.chat_turn", &parent_cx);
span.set_attribute(KeyValue::new("file_path", req.file_path.clone()));
span.set_attribute(KeyValue::new("library_id", req.library_id as i64));
span.set_attribute(KeyValue::new("amend", req.amend));
if req.user_message.trim().is_empty() {
bail!("user_message must not be empty");
}
if req.user_message.len() > 8192 {
bail!("user_message exceeds 8192 chars");
}
let normalized = normalize_path(&req.file_path);
// 1. Acquire the per-(library, file) async mutex. Two concurrent
// chat turns on the same insight would race on the JSON blob —
// the lock serialises them.
let lock_key = (req.library_id, normalized.clone());
let entry_lock = {
let mut locks = self.chat_locks.lock().await;
locks
.entry(lock_key.clone())
.or_insert_with(|| Arc::new(TokioMutex::new(())))
.clone()
};
let _guard = entry_lock.lock().await;
// 2. Load the current insight + history.
let insight = {
let cx = opentelemetry::Context::new();
let mut dao = self.insight_dao.lock().expect("Unable to lock InsightDao");
dao.get_insight(&cx, &normalized)
.map_err(|e| anyhow!("failed to load insight: {:?}", e))?
.ok_or_else(|| anyhow!("no insight found for path"))?
};
let raw_history = insight
.training_messages
.as_ref()
.ok_or_else(|| {
anyhow!("insight has no chat history; regenerate this insight in agentic mode")
})?
.clone();
let mut messages: Vec<ChatMessage> = serde_json::from_str(&raw_history)
.map_err(|e| anyhow!("failed to deserialize chat history: {}", e))?;
// 3. Resolve effective backend. Reject the unsupported switch.
let stored_backend = insight.backend.clone();
let effective_backend = req
.backend
.as_deref()
.map(|s| s.trim().to_lowercase())
.filter(|s| !s.is_empty())
.unwrap_or_else(|| stored_backend.clone());
if !matches!(effective_backend.as_str(), "local" | "hybrid") {
bail!(
"unknown backend '{}'; expected 'local' or 'hybrid'",
effective_backend
);
}
if stored_backend == "local" && effective_backend == "hybrid" {
bail!(
"switching from local to hybrid mid-chat isn't supported yet; \
regenerate the insight in hybrid mode if you want OpenRouter chat"
);
}
let is_hybrid = effective_backend == "hybrid";
span.set_attribute(KeyValue::new("backend", effective_backend.clone()));
// 4. Build the chat backend client. Ollama in local mode, a freshly
// cloned OpenRouter client in hybrid mode (clone so per-request
// sampling/model overrides don't leak into shared state).
let max_iterations = req
.max_iterations
.unwrap_or(DEFAULT_MAX_ITERATIONS)
.clamp(1, env_max_iterations());
span.set_attribute(KeyValue::new("max_iterations", max_iterations as i64));
let stored_model = insight.model_version.clone();
let custom_model = req
.model
.clone()
.or_else(|| Some(stored_model.clone()))
.filter(|m| !m.is_empty());
let mut ollama_client = self.ollama.clone();
let mut openrouter_client: Option<OpenRouterClient> = None;
if is_hybrid {
let arc = self.openrouter.as_ref().ok_or_else(|| {
anyhow!("hybrid backend unavailable: OPENROUTER_API_KEY not configured")
})?;
let mut c: OpenRouterClient = (**arc).clone();
if let Some(ref m) = custom_model {
c.primary_model = m.clone();
}
if req.temperature.is_some()
|| req.top_p.is_some()
|| req.top_k.is_some()
|| req.min_p.is_some()
{
c.set_sampling_params(req.temperature, req.top_p, req.top_k, req.min_p);
}
if let Some(ctx) = req.num_ctx {
c.set_num_ctx(Some(ctx));
}
openrouter_client = Some(c);
} else {
// Local-mode model swap. Build a new client when the chat model
// differs from the configured one (mirrors the agentic pattern).
if let Some(ref m) = custom_model
&& m != &self.ollama.primary_model
{
ollama_client = OllamaClient::new(
self.ollama.primary_url.clone(),
self.ollama.fallback_url.clone(),
m.clone(),
Some(m.clone()),
);
}
if req.temperature.is_some()
|| req.top_p.is_some()
|| req.top_k.is_some()
|| req.min_p.is_some()
{
ollama_client.set_sampling_params(req.temperature, req.top_p, req.top_k, req.min_p);
}
if let Some(ctx) = req.num_ctx {
ollama_client.set_num_ctx(Some(ctx));
}
}
let chat_backend: &dyn LlmClient = if let Some(ref c) = openrouter_client {
c
} else {
&ollama_client
};
let model_used = chat_backend.primary_model().to_string();
span.set_attribute(KeyValue::new("model", model_used.clone()));
// 5. Decide vision + tool set. In hybrid we always omit
// `describe_photo` (matches the original generation flow). In
// local we trust the stored history's first-user shape: if it
// carries `images`, the original model was vision-capable, and
// we keep `describe_photo` available.
let local_first_user_has_image = messages
.iter()
.find(|m| m.role == "user")
.and_then(|m| m.images.as_ref())
.map(|imgs| !imgs.is_empty())
.unwrap_or(false);
let offer_describe_tool = !is_hybrid && local_first_user_has_image;
// current_gate_opts(has_vision) sets gate_opts.has_vision = has_vision
// and probes the per-table presence flags. Pass `offer_describe_tool`
// directly — the `!is_hybrid && local_first_user_has_image` decision
// is the chat-path's vision predicate.
let gate_opts = self.generator.current_gate_opts(offer_describe_tool);
let tools = InsightGenerator::build_tool_definitions(gate_opts);
// Image base64 only needed when describe_photo is on the menu. Load
// lazily to avoid disk IO when the loop never invokes it.
let image_base64: Option<String> = if offer_describe_tool {
self.generator.load_image_as_base64(&normalized).ok()
} else {
None
};
// 6. Apply truncation budget. Drops oldest tool_call+tool pairs
// (preserves system + first user including any images).
let budget_tokens = (req.num_ctx.unwrap_or(DEFAULT_NUM_CTX) as usize)
.saturating_sub(RESPONSE_HEADROOM_TOKENS);
let budget_bytes = budget_tokens.saturating_mul(BYTES_PER_TOKEN);
let truncated = apply_context_budget(&mut messages, budget_bytes);
if truncated {
span.set_attribute(KeyValue::new("history_truncated", true));
}
// 7. Append the new user turn.
messages.push(ChatMessage::user(req.user_message.clone()));
// Apply per-turn system-prompt override BEFORE the budget annotation
// so the budget note attaches to the override, not the original.
// The stash is consumed below before persistence (append mode) or
// dropped (amend mode, where the override stays in place).
let override_stash =
apply_system_prompt_override(&mut messages, req.system_prompt.as_deref());
// Temporarily annotate the system message with this turn's iteration
// budget so the model knows how many tool-calling rounds it has. We
// restore the original content before persistence so the note doesn't
// accumulate across turns.
let original_system_content = annotate_system_with_budget(&mut messages, max_iterations);
let insight_cx = parent_cx.with_span(span);
// 8. Agentic loop — same shape as insight_generator's, but capped
// tighter and dispatching tools through the shared executor.
let loop_span = tracer.start_with_context("ai.chat.loop", &insight_cx);
let loop_cx = insight_cx.with_span(loop_span);
let mut tool_calls_made = 0usize;
let mut iterations_used = 0usize;
let mut last_prompt_eval_count: Option<i32> = None;
let mut last_eval_count: Option<i32> = None;
let mut final_content = String::new();
for iteration in 0..max_iterations {
iterations_used = iteration + 1;
log::info!("Chat iteration {}/{}", iterations_used, max_iterations);
let (response, prompt_tokens, eval_tokens) = chat_backend
.chat_with_tools(messages.clone(), tools.clone())
.await?;
last_prompt_eval_count = prompt_tokens;
last_eval_count = eval_tokens;
// Ollama rejects non-object tool-call arguments on replay.
let mut response = response;
if let Some(ref mut tcs) = response.tool_calls {
for tc in tcs.iter_mut() {
if !tc.function.arguments.is_object() {
tc.function.arguments = serde_json::Value::Object(Default::default());
}
}
}
messages.push(response.clone());
if let Some(ref tool_calls) = response.tool_calls
&& !tool_calls.is_empty()
{
for tool_call in tool_calls {
tool_calls_made += 1;
log::info!(
"Chat tool call [{}]: {} {:?}",
iteration,
tool_call.function.name,
tool_call.function.arguments
);
let result = self
.generator
.execute_tool(
&tool_call.function.name,
&tool_call.function.arguments,
&ollama_client,
&image_base64,
&normalized,
&loop_cx,
)
.await;
messages.push(ChatMessage::tool_result(result));
}
continue;
}
final_content = response.content;
break;
}
if final_content.is_empty() {
// The model never produced a final answer; ask once more without
// tools to force a textual reply.
log::info!(
"Chat loop exhausted after {} iterations, requesting final answer",
iterations_used
);
messages.push(ChatMessage::user(
"Please write your final answer now without calling any more tools.",
));
let (final_response, prompt_tokens, eval_tokens) = chat_backend
.chat_with_tools(messages.clone(), vec![])
.await?;
last_prompt_eval_count = prompt_tokens;
last_eval_count = eval_tokens;
final_content = final_response.content.clone();
messages.push(final_response);
}
loop_cx.span().set_status(Status::Ok);
// Drop the per-turn iteration-budget note from the system message
// before we persist so it doesn't snowball on each subsequent turn.
restore_system_content(&mut messages, original_system_content);
// Append mode: undo the per-turn system-prompt override so the
// stored transcript keeps the original baked persona. Amend mode:
// keep the override in place — it becomes the new insight row's
// system message.
if !req.amend {
restore_system_prompt_override(&mut messages, override_stash);
}
// 9. Persist. Append mode rewrites the JSON blob in place; amend
// mode regenerates the title and inserts a new insight row,
// relying on store_insight to flip prior rows' is_current=false.
let json = serde_json::to_string(&messages)
.map_err(|e| anyhow!("failed to serialize chat history: {}", e))?;
let mut amended_insight_id: Option<i32> = None;
if req.amend {
let title_prompt = format!(
"Create a short title (maximum 8 words) for the following journal entry:\n\n{}\n\n\
Capture the key moment or theme. Return ONLY the title, nothing else.",
final_content
);
let title_raw = chat_backend
.generate(
&title_prompt,
Some(
"You are my long term memory assistant. Use only the information provided. Do not invent details.",
),
None,
)
.await?;
let title = title_raw.trim().trim_matches('"').to_string();
// Amended rows intentionally do not inherit the parent's
// `fewshot_source_ids`. The parent's few-shot influence is still
// present in this row's content; if you want strict lineage
// tracking for training-set filtering, fetch the parent here and
// copy its value forward.
let new_row = InsertPhotoInsight {
library_id: req.library_id,
file_path: normalized.clone(),
title,
summary: final_content.clone(),
generated_at: Utc::now().timestamp(),
model_version: model_used.clone(),
is_current: true,
training_messages: Some(json),
backend: effective_backend.clone(),
fewshot_source_ids: None,
content_hash: None,
};
let cx = opentelemetry::Context::new();
let mut dao = self.insight_dao.lock().expect("Unable to lock InsightDao");
let stored = dao
.store_insight(&cx, new_row)
.map_err(|e| anyhow!("failed to store amended insight: {:?}", e))?;
amended_insight_id = Some(stored.id);
} else {
let cx = opentelemetry::Context::new();
let mut dao = self.insight_dao.lock().expect("Unable to lock InsightDao");
dao.update_training_messages(&cx, req.library_id, &normalized, &json)
.map_err(|e| anyhow!("failed to persist chat history: {:?}", e))?;
}
Ok(ChatTurnResult {
assistant_message: final_content,
tool_calls_made,
iterations_used,
truncated,
prompt_eval_count: last_prompt_eval_count,
eval_count: last_eval_count,
amended_insight_id,
backend_used: effective_backend,
model_used,
})
}
/// Truncate the stored conversation so the rendered message at
/// `discard_from_rendered_index` (and everything after it — including
/// the tool-call scaffolding that produced a discarded assistant reply)
/// is removed. The initial user turn cannot be discarded; attempting to
/// do so returns an error.
///
/// Holds the per-file chat mutex so it serialises with `chat_turn`.
pub async fn rewind_history(
&self,
library_id: i32,
file_path: &str,
discard_from_rendered_index: usize,
) -> Result<()> {
if discard_from_rendered_index == 0 {
bail!("cannot discard the initial user message");
}
let normalized = normalize_path(file_path);
let lock_key = (library_id, normalized.clone());
let entry_lock = {
let mut locks = self.chat_locks.lock().await;
locks
.entry(lock_key.clone())
.or_insert_with(|| Arc::new(TokioMutex::new(())))
.clone()
};
let _guard = entry_lock.lock().await;
let insight = {
let cx = opentelemetry::Context::new();
let mut dao = self.insight_dao.lock().expect("Unable to lock InsightDao");
dao.get_insight(&cx, &normalized)
.map_err(|e| anyhow!("failed to load insight: {:?}", e))?
.ok_or_else(|| anyhow!("no insight found for path"))?
};
let raw_history = insight
.training_messages
.as_ref()
.ok_or_else(|| anyhow!("insight has no chat history"))?;
let messages: Vec<ChatMessage> = serde_json::from_str(raw_history)
.map_err(|e| anyhow!("failed to deserialize chat history: {}", e))?;
let cut_at = find_raw_cut(&messages, discard_from_rendered_index)
.ok_or_else(|| anyhow!("discard_from_rendered_index out of range"))?;
let truncated = &messages[..cut_at];
let json = serde_json::to_string(truncated)
.map_err(|e| anyhow!("failed to serialize truncated history: {}", e))?;
let cx = opentelemetry::Context::new();
let mut dao = self.insight_dao.lock().expect("Unable to lock InsightDao");
dao.update_training_messages(&cx, library_id, &normalized, &json)
.map_err(|e| anyhow!("failed to persist truncated history: {:?}", e))?;
Ok(())
}
/// Streaming variant of `chat_turn`. Emits user-facing events as the
/// conversation progresses: iteration starts, tool dispatch + result,
/// text deltas from the final assistant reply, and a terminal `Done`
/// frame. Persistence happens inside the stream after the loop ends.
///
/// The stream takes ownership of the service via `Arc<Self>` (passed by
/// the caller) so it can live past the handler's await boundary.
pub fn chat_turn_stream(
self: Arc<Self>,
req: ChatTurnRequest,
) -> BoxStream<'static, ChatStreamEvent> {
let svc = self;
let s = async_stream::stream! {
match svc.chat_turn_stream_inner(req, Ok).await {
Ok(mut rx) => {
while let Some(ev) = rx.recv().await {
yield ev;
}
}
Err(e) => {
yield ChatStreamEvent::Error(format!("{}", e));
}
}
};
Box::pin(s)
}
/// Internal: drives the streaming loop on a background task, returning
/// a receiver the caller drains. Keeping the work on a spawned task
/// decouples the HTTP request lifetime from the chat execution, which
/// matters because the chat may run longer than any single network hop
/// and we want clean cancellation semantics via the channel close.
async fn chat_turn_stream_inner<F>(
self: Arc<Self>,
req: ChatTurnRequest,
_ev_mapper: F,
) -> Result<tokio::sync::mpsc::Receiver<ChatStreamEvent>>
where
F: Fn(ChatStreamEvent) -> Result<ChatStreamEvent> + Send + 'static,
{
let (tx, rx) = tokio::sync::mpsc::channel::<ChatStreamEvent>(64);
let svc = self.clone();
tokio::spawn(async move {
let result = svc.run_streaming_turn(req, tx.clone()).await;
if let Err(e) = result {
let _ = tx.send(ChatStreamEvent::Error(format!("{}", e))).await;
}
});
Ok(rx)
}
async fn run_streaming_turn(
self: Arc<Self>,
req: ChatTurnRequest,
tx: tokio::sync::mpsc::Sender<ChatStreamEvent>,
) -> Result<()> {
if req.user_message.trim().is_empty() {
bail!("user_message must not be empty");
}
if req.user_message.len() > 8192 {
bail!("user_message exceeds 8192 chars");
}
let normalized = normalize_path(&req.file_path);
let lock_key = (req.library_id, normalized.clone());
let entry_lock = {
let mut locks = self.chat_locks.lock().await;
locks
.entry(lock_key.clone())
.or_insert_with(|| Arc::new(TokioMutex::new(())))
.clone()
};
let _guard = entry_lock.lock().await;
// Look up existing insight (None when missing). The branch below
// decides bootstrap vs. continuation: `regenerate=true` forces
// bootstrap regardless of the row's presence; missing rows always
// bootstrap.
let existing_insight = {
let cx = opentelemetry::Context::new();
let mut dao = self.insight_dao.lock().expect("Unable to lock InsightDao");
dao.get_insight(&cx, &normalized)
.map_err(|e| anyhow!("failed to load insight: {:?}", e))?
};
if req.regenerate || existing_insight.is_none() {
return self.run_bootstrap_streaming(req, normalized, tx).await;
}
let insight = existing_insight.expect("just checked Some above");
self.run_continuation_streaming(req, normalized, insight, tx)
.await
}
/// Continuation path: photo has an existing agentic insight with
/// `training_messages` populated. Replay the transcript, append a new
/// turn, run the agentic loop, persist (UPDATE for append; INSERT new
/// row for amend).
async fn run_continuation_streaming(
&self,
req: ChatTurnRequest,
normalized: String,
insight: crate::database::models::PhotoInsight,
tx: tokio::sync::mpsc::Sender<ChatStreamEvent>,
) -> Result<()> {
let raw_history = insight.training_messages.as_ref().ok_or_else(|| {
anyhow!("insight has no chat history; regenerate this insight in agentic mode")
})?;
let mut messages: Vec<ChatMessage> = serde_json::from_str(raw_history)
.map_err(|e| anyhow!("failed to deserialize chat history: {}", e))?;
// Backend selection — defer to stored insight's backend unless the
// request supplies an override.
let stored_backend = insight.backend.clone();
let effective_backend = req
.backend
.as_deref()
.map(|s| s.trim().to_lowercase())
.filter(|s| !s.is_empty())
.unwrap_or_else(|| stored_backend.clone());
if !matches!(effective_backend.as_str(), "local" | "hybrid") {
bail!(
"unknown backend '{}'; expected 'local' or 'hybrid'",
effective_backend
);
}
if stored_backend == "local" && effective_backend == "hybrid" {
bail!(
"switching from local to hybrid mid-chat isn't supported yet; \
regenerate the insight in hybrid mode if you want OpenRouter chat"
);
}
let is_hybrid = effective_backend == "hybrid";
let max_iterations = req
.max_iterations
.unwrap_or(DEFAULT_MAX_ITERATIONS)
.clamp(1, env_max_iterations());
let stored_model = insight.model_version.clone();
let custom_model = req
.model
.clone()
.or_else(|| Some(stored_model.clone()))
.filter(|m| !m.is_empty());
let (chat_backend_holder, ollama_client) =
self.build_chat_clients(is_hybrid, custom_model.as_deref(), &req)?;
let chat_backend: &dyn LlmClient = chat_backend_holder.as_ref();
let model_used = chat_backend.primary_model().to_string();
// Tool set — local mode + first user turn carries an image →
// offer describe_photo. Hybrid: visual description was inlined
// when the insight was bootstrapped, no describe tool needed.
let local_first_user_has_image = messages
.iter()
.find(|m| m.role == "user")
.and_then(|m| m.images.as_ref())
.map(|imgs| !imgs.is_empty())
.unwrap_or(false);
let offer_describe_tool = !is_hybrid && local_first_user_has_image;
let gate_opts = self.generator.current_gate_opts(offer_describe_tool);
let tools = InsightGenerator::build_tool_definitions(gate_opts);
let image_base64: Option<String> = if offer_describe_tool {
self.generator.load_image_as_base64(&normalized).ok()
} else {
None
};
// Truncate before appending the new user turn.
let budget_tokens = (req.num_ctx.unwrap_or(DEFAULT_NUM_CTX) as usize)
.saturating_sub(RESPONSE_HEADROOM_TOKENS);
let budget_bytes = budget_tokens.saturating_mul(BYTES_PER_TOKEN);
let truncated = apply_context_budget(&mut messages, budget_bytes);
if truncated {
let _ = tx.send(ChatStreamEvent::Truncated).await;
}
messages.push(ChatMessage::user(req.user_message.clone()));
// Mirror chat_turn: per-turn override goes on first, budget note next.
let override_stash =
apply_system_prompt_override(&mut messages, req.system_prompt.as_deref());
let original_system_content = annotate_system_with_budget(&mut messages, max_iterations);
let outcome = self
.run_streaming_agentic_loop(
chat_backend,
&ollama_client,
&mut messages,
tools,
&image_base64,
&normalized,
max_iterations,
&tx,
)
.await?;
let AgenticLoopOutcome {
tool_calls_made,
iterations_used,
last_prompt_eval_count,
last_eval_count,
final_content,
} = outcome;
// Drop the per-turn iteration-budget note before persisting so it
// doesn't snowball on subsequent turns.
restore_system_content(&mut messages, original_system_content);
// Append mode: undo the per-turn system-prompt override.
// Amend mode: keep it — it becomes the new row's system message.
if !req.amend {
restore_system_prompt_override(&mut messages, override_stash);
}
let json = serde_json::to_string(&messages)
.map_err(|e| anyhow!("failed to serialize chat history: {}", e))?;
let mut amended_insight_id: Option<i32> = None;
if req.amend {
let title = self.generate_title(chat_backend, &final_content).await?;
// Amended rows intentionally do not inherit the parent's
// `fewshot_source_ids`. The parent's few-shot influence is still
// present in this row's content; if you want strict lineage
// tracking for training-set filtering, fetch the parent here and
// copy its value forward.
let new_row = InsertPhotoInsight {
library_id: req.library_id,
file_path: normalized.clone(),
title,
summary: final_content.clone(),
generated_at: Utc::now().timestamp(),
model_version: model_used.clone(),
is_current: true,
training_messages: Some(json),
backend: effective_backend.clone(),
fewshot_source_ids: None,
content_hash: None,
};
let cx = opentelemetry::Context::new();
let mut dao = self.insight_dao.lock().expect("Unable to lock InsightDao");
let stored = dao
.store_insight(&cx, new_row)
.map_err(|e| anyhow!("failed to store amended insight: {:?}", e))?;
amended_insight_id = Some(stored.id);
} else {
let cx = opentelemetry::Context::new();
let mut dao = self.insight_dao.lock().expect("Unable to lock InsightDao");
dao.update_training_messages(&cx, req.library_id, &normalized, &json)
.map_err(|e| anyhow!("failed to persist chat history: {:?}", e))?;
}
let _ = tx
.send(ChatStreamEvent::Done {
tool_calls_made,
iterations_used,
truncated,
prompt_tokens: last_prompt_eval_count,
eval_tokens: last_eval_count,
num_ctx: req.num_ctx,
amended_insight_id,
backend_used: effective_backend,
model_used,
})
.await;
Ok(())
}
/// Bootstrap path: no insight row yet (or `regenerate=true`). Build a
/// fresh transcript from `req.system_prompt` + `req.user_message` +
/// the photo, run the agentic loop, generate a title, and INSERT a
/// new insight row. `store_insight` flips any prior rows for the same
/// `(library_id, file_path)` to `is_current=false` — that's how
/// `regenerate` shadows the previous insight.
async fn run_bootstrap_streaming(
&self,
req: ChatTurnRequest,
normalized: String,
tx: tokio::sync::mpsc::Sender<ChatStreamEvent>,
) -> Result<()> {
let effective_backend = resolve_bootstrap_backend(req.backend.as_deref())?;
let is_hybrid = effective_backend == "hybrid";
let max_iterations = req
.max_iterations
.unwrap_or(DEFAULT_MAX_ITERATIONS)
.clamp(1, env_max_iterations());
let custom_model = req.model.clone().filter(|m| !m.is_empty());
let (chat_backend_holder, ollama_client) =
self.build_chat_clients(is_hybrid, custom_model.as_deref(), &req)?;
let chat_backend: &dyn LlmClient = chat_backend_holder.as_ref();
let model_used = chat_backend.primary_model().to_string();
// Load image bytes once. RAW preview fallback is handled inside
// load_image_as_base64. Errors degrade silently — a chat that
// discusses metadata-only is still useful.
let image_base64: Option<String> = self.generator.load_image_as_base64(&normalized).ok();
// Hybrid backend: pre-describe the image via local Ollama vision
// so OpenRouter chat models (which can't see images directly) get
// the visual description as text. Mirrors the same pre-describe
// pass that `generate_agentic_insight_for_photo` does for hybrid.
let visual_block = if is_hybrid {
match image_base64.as_deref() {
Some(b64) => match self.ollama.describe_image(b64).await {
Ok(desc) => {
format!("Visual description (from local vision model):\n{}\n", desc)
}
Err(e) => {
log::warn!("hybrid bootstrap: local describe_image failed: {}", e);
String::new()
}
},
None => String::new(),
}
} else {
String::new()
};
// Tool gates. Local + image present → expose describe_photo so
// the chat model can re-look at the photo on demand. Hybrid:
// already inlined, no tool needed.
let offer_describe_tool = !is_hybrid && image_base64.is_some();
let gate_opts = self.generator.current_gate_opts(offer_describe_tool);
let tools = InsightGenerator::build_tool_definitions(gate_opts);
// System message = persona + photo context block. Photo context
// is in the system message — not the user turn — so the user's
// bubble in the rendered transcript shows only what they typed.
// Several agentic tools (recall_facts_for_photo, get_file_tags,
// get_faces_in_photo, etc.) take a `file_path` arg the model
// can't know without being told; in hybrid mode the visual
// description belongs here for the same reason.
let persona = resolve_bootstrap_system_prompt(req.system_prompt.as_deref());
let system_content = build_bootstrap_system_message(&persona, &normalized, &visual_block);
let system_msg = ChatMessage::system(system_content);
let mut user_msg = ChatMessage::user(req.user_message.clone());
if !is_hybrid && let Some(ref img) = image_base64 {
user_msg.images = Some(vec![img.clone()]);
}
let mut messages = vec![system_msg, user_msg];
let outcome = self
.run_streaming_agentic_loop(
chat_backend,
&ollama_client,
&mut messages,
tools,
&image_base64,
&normalized,
max_iterations,
&tx,
)
.await?;
let AgenticLoopOutcome {
tool_calls_made,
iterations_used,
last_prompt_eval_count,
last_eval_count,
final_content,
} = outcome;
let title = self.generate_title(chat_backend, &final_content).await?;
let json = serde_json::to_string(&messages)
.map_err(|e| anyhow!("failed to serialize chat history: {}", e))?;
let new_row = InsertPhotoInsight {
library_id: req.library_id,
file_path: normalized.clone(),
title,
summary: final_content,
generated_at: Utc::now().timestamp(),
model_version: model_used.clone(),
is_current: true,
training_messages: Some(json),
backend: effective_backend.clone(),
fewshot_source_ids: None,
content_hash: None,
};
let stored = {
let cx = opentelemetry::Context::new();
let mut dao = self.insight_dao.lock().expect("Unable to lock InsightDao");
dao.store_insight(&cx, new_row)
.map_err(|e| anyhow!("failed to store bootstrap insight: {:?}", e))?
};
// amended_insight_id semantics broaden on bootstrap/regenerate:
// populated whenever this turn produced a new insight row, so
// clients don't need a separate field to learn the new id.
let _ = tx
.send(ChatStreamEvent::Done {
tool_calls_made,
iterations_used,
truncated: false,
prompt_tokens: last_prompt_eval_count,
eval_tokens: last_eval_count,
num_ctx: req.num_ctx,
amended_insight_id: Some(stored.id),
backend_used: effective_backend,
model_used,
})
.await;
Ok(())
}
/// Set up chat clients (Ollama + optional OpenRouter) shared by
/// bootstrap and continuation. Returns the chat-side backend client
/// (boxed because hybrid and local return different concrete types)
/// and the Ollama client used for describe-image / local tool calls.
fn build_chat_clients(
&self,
is_hybrid: bool,
custom_model: Option<&str>,
req: &ChatTurnRequest,
) -> Result<(Box<dyn LlmClient>, OllamaClient)> {
let mut ollama_client = self.ollama.clone();
if is_hybrid {
let arc = self.openrouter.as_ref().ok_or_else(|| {
anyhow!("hybrid backend unavailable: OPENROUTER_API_KEY not configured")
})?;
let mut c: OpenRouterClient = (**arc).clone();
if let Some(m) = custom_model {
c.primary_model = m.to_string();
}
if req.temperature.is_some()
|| req.top_p.is_some()
|| req.top_k.is_some()
|| req.min_p.is_some()
{
c.set_sampling_params(req.temperature, req.top_p, req.top_k, req.min_p);
}
if let Some(ctx) = req.num_ctx {
c.set_num_ctx(Some(ctx));
}
return Ok((Box::new(c), ollama_client));
}
if let Some(m) = custom_model
&& m != self.ollama.primary_model
{
ollama_client = OllamaClient::new(
self.ollama.primary_url.clone(),
self.ollama.fallback_url.clone(),
m.to_string(),
Some(m.to_string()),
);
}
if req.temperature.is_some()
|| req.top_p.is_some()
|| req.top_k.is_some()
|| req.min_p.is_some()
{
ollama_client.set_sampling_params(req.temperature, req.top_p, req.top_k, req.min_p);
}
if let Some(ctx) = req.num_ctx {
ollama_client.set_num_ctx(Some(ctx));
}
Ok((Box::new(ollama_client.clone()), ollama_client))
}
/// Generate a short title via the same chat backend so voice stays
/// consistent with the body. Mirrors generate_agentic_insight_for_photo's
/// titling pass.
async fn generate_title(
&self,
chat_backend: &dyn LlmClient,
final_content: &str,
) -> Result<String> {
let title_prompt = format!(
"Create a short title (maximum 8 words) for the following journal entry:\n\n{}\n\n\
Capture the key moment or theme. Return ONLY the title, nothing else.",
final_content
);
let title_raw = chat_backend
.generate(
&title_prompt,
Some(
"You are my long term memory assistant. Use only the information provided. Do not invent details.",
),
None,
)
.await?;
Ok(title_raw.trim().trim_matches('"').to_string())
}
/// Drive the agentic loop with streaming SSE events. Shared between
/// bootstrap and continuation. Mutates `messages` in place (response
/// turns + tool results are appended) and returns counters + the
/// final assistant content.
async fn run_streaming_agentic_loop(
&self,
chat_backend: &dyn LlmClient,
ollama_client: &OllamaClient,
messages: &mut Vec<ChatMessage>,
tools: Vec<Tool>,
image_base64: &Option<String>,
normalized: &str,
max_iterations: usize,
tx: &tokio::sync::mpsc::Sender<ChatStreamEvent>,
) -> Result<AgenticLoopOutcome> {
let mut tool_calls_made = 0usize;
let mut iterations_used = 0usize;
let mut last_prompt_eval_count: Option<i32> = None;
let mut last_eval_count: Option<i32> = None;
let mut final_content = String::new();
for iteration in 0..max_iterations {
iterations_used = iteration + 1;
let _ = tx
.send(ChatStreamEvent::IterationStart {
n: iterations_used,
max: max_iterations,
})
.await;
let mut stream = chat_backend
.chat_with_tools_stream(messages.clone(), tools.clone())
.await?;
let mut final_message: Option<ChatMessage> = None;
while let Some(ev) = stream.next().await {
let ev = ev?;
match ev {
LlmStreamEvent::TextDelta(delta) => {
let _ = tx.send(ChatStreamEvent::TextDelta(delta)).await;
}
LlmStreamEvent::Done {
message,
prompt_eval_count,
eval_count,
} => {
last_prompt_eval_count = prompt_eval_count;
last_eval_count = eval_count;
final_message = Some(message);
break;
}
}
}
let mut response =
final_message.ok_or_else(|| anyhow!("stream ended without a Done event"))?;
// Normalize non-object tool arguments (some models occasionally
// return null/string/bool which Ollama rejects on the next turn).
if let Some(ref mut tcs) = response.tool_calls {
for tc in tcs.iter_mut() {
if !tc.function.arguments.is_object() {
tc.function.arguments = serde_json::Value::Object(Default::default());
}
}
}
messages.push(response.clone());
if let Some(ref tool_calls) = response.tool_calls
&& !tool_calls.is_empty()
{
for tool_call in tool_calls {
tool_calls_made += 1;
let call_index = tool_calls_made - 1;
let _ = tx
.send(ChatStreamEvent::ToolCall {
index: call_index,
name: tool_call.function.name.clone(),
arguments: tool_call.function.arguments.clone(),
})
.await;
let cx = opentelemetry::Context::new();
let result = self
.generator
.execute_tool(
&tool_call.function.name,
&tool_call.function.arguments,
ollama_client,
image_base64,
normalized,
&cx,
)
.await;
let (result_preview, result_truncated) = truncate_tool_result(&result);
let _ = tx
.send(ChatStreamEvent::ToolResult {
index: call_index,
name: tool_call.function.name.clone(),
result: result_preview,
result_truncated,
})
.await;
messages.push(ChatMessage::tool_result(result));
}
continue;
}
final_content = response.content;
break;
}
// No-tools fallback: loop exhausted iterations without producing a
// tool-free reply. Ask once more, with no tools attached.
if final_content.is_empty() {
// Index of the synthetic "please write your final answer"
// user message — we strip it from history after the model
// responds, so it never appears in the rendered transcript
// and load_history's user-turn handler doesn't reset
// pending_tools at this position (wiping the prior tool
// calls from the final assistant render).
let synthetic_idx = messages.len();
messages.push(ChatMessage::user(
"Please write your final answer now without calling any more tools.",
));
let mut stream = chat_backend
.chat_with_tools_stream(messages.clone(), vec![])
.await?;
let mut final_message: Option<ChatMessage> = None;
while let Some(ev) = stream.next().await {
let ev = ev?;
match ev {
LlmStreamEvent::TextDelta(delta) => {
let _ = tx.send(ChatStreamEvent::TextDelta(delta)).await;
}
LlmStreamEvent::Done {
message,
prompt_eval_count,
eval_count,
} => {
last_prompt_eval_count = prompt_eval_count;
last_eval_count = eval_count;
final_message = Some(message);
break;
}
}
}
let final_response =
final_message.ok_or_else(|| anyhow!("final stream ended without a Done event"))?;
final_content = final_response.content.clone();
messages.push(final_response);
// Drop the synthetic prompt — internal scaffolding only. The
// model's final_response (now at the end) was generated with
// it in context and reads coherently without it on replay.
messages.remove(synthetic_idx);
}
Ok(AgenticLoopOutcome {
tool_calls_made,
iterations_used,
last_prompt_eval_count,
last_eval_count,
final_content,
})
}
}
/// Default system prompt for bootstrap when the client didn't supply one
/// (or supplied an empty string). Apollo's frontend always sends a persona
/// prompt today, so this is a fallback for clients that don't.
const BOOTSTRAP_DEFAULT_SYSTEM_PROMPT: &str = "You are a helpful AI assistant analyzing the user's photo. \
Use the available tools to gather context and answer their questions \
in a conversational tone.";
/// Pick the system prompt for bootstrap. Trimmed-non-empty supplied wins;
/// otherwise fall back to [`BOOTSTRAP_DEFAULT_SYSTEM_PROMPT`]. Returns an
/// owned `String` because the bootstrap caller persists it on the new
/// insight row.
fn resolve_bootstrap_system_prompt(supplied: Option<&str>) -> String {
supplied
.map(str::trim)
.filter(|s| !s.is_empty())
.map(str::to_string)
.unwrap_or_else(|| BOOTSTRAP_DEFAULT_SYSTEM_PROMPT.to_string())
}
/// Compose the bootstrap system message: the persona on top, followed
/// by a photo-context block carrying the file path (and, in hybrid
/// mode, the local-vision visual description). Lives in the system
/// message — not the user turn — so the rendered transcript shows
/// only what the user typed.
fn build_bootstrap_system_message(
persona: &str,
normalized_path: &str,
visual_block: &str,
) -> String {
let mut out = persona.trim_end().to_string();
out.push_str("\n\n--- PHOTO CONTEXT ---\n");
out.push_str(&format!("Photo file path: {}\n", normalized_path));
if !visual_block.is_empty() {
// visual_block already ends with a newline; no extra separator
// needed.
out.push_str(visual_block);
}
out
}
/// Pick the backend label for bootstrap. Bootstrap has no stored insight
/// to defer to (that's continuation's behaviour), so the default is
/// `"local"`. Returns an error if the supplied label is non-empty but
/// not one of the recognised values — same surface as continuation's
/// validation.
fn resolve_bootstrap_backend(supplied: Option<&str>) -> Result<String> {
let lower = supplied
.map(|s| s.trim().to_lowercase())
.filter(|s| !s.is_empty())
.unwrap_or_else(|| "local".to_string());
if !matches!(lower.as_str(), "local" | "hybrid") {
bail!("unknown backend '{}'; expected 'local' or 'hybrid'", lower);
}
Ok(lower)
}
/// Outcome of one streaming agentic loop pass. Shared between bootstrap
/// and continuation.
struct AgenticLoopOutcome {
tool_calls_made: usize,
iterations_used: usize,
last_prompt_eval_count: Option<i32>,
last_eval_count: Option<i32>,
final_content: String,
}
/// Events emitted by `chat_turn_stream`. One stream per turn; ends after
/// `Done` or `Error`.
#[derive(Debug, Clone)]
pub enum ChatStreamEvent {
/// Starting iteration `n` of up to `max` (1-based).
IterationStart { n: usize, max: usize },
/// History was trimmed to fit the context budget before the turn ran.
/// Emitted at most once, before any tool or text events.
Truncated,
/// Incremental content from the final assistant reply. Concatenate to
/// reconstruct the reply body. Tool-dispatch turns don't produce these.
TextDelta(String),
/// The model requested this tool call. Emitted just before execution.
/// `index` is a monotonically-increasing counter across the turn so the
/// client can pair `ToolCall` with its matching `ToolResult`.
ToolCall {
index: usize,
name: String,
arguments: serde_json::Value,
},
/// The tool finished; `result` is the (possibly truncated) output.
ToolResult {
index: usize,
name: String,
result: String,
result_truncated: bool,
},
/// Terminal success event with counters + persistence result.
Done {
tool_calls_made: usize,
iterations_used: usize,
truncated: bool,
/// Renamed from `prompt_eval_count` to match the wire-name Apollo's
/// frontend (and the mobile client) consume on the `done` SSE
/// payload.
prompt_tokens: Option<i32>,
/// Renamed from `eval_count`. Same rationale.
eval_tokens: Option<i32>,
/// The configured context-window ceiling that ran this turn — echoes
/// the request's `num_ctx` (or the server default when none was
/// supplied). Lets clients render `prompt_tokens / num_ctx` without
/// remembering what they asked for.
num_ctx: Option<i32>,
/// Populated when this turn produced a NEW insight row — bootstrap
/// (no prior insight), regenerate (old row flipped to is_current=
/// false), or amend. Null on append. Clients should target this id
/// for any subsequent operation that previously needed a known
/// insight (e.g., a follow-up `/insights/chat` against the just-
/// created row).
amended_insight_id: Option<i32>,
backend_used: String,
model_used: String,
},
/// Terminal failure event. No further events follow.
Error(String),
}
/// Is this raw message visible in the rendered transcript? Must match
/// `load_history`'s filter exactly — `find_raw_cut` depends on it to map
/// rendered indices back to raw positions.
fn is_rendered(m: &ChatMessage) -> bool {
match m.role.as_str() {
"user" => true,
"assistant" => {
let has_tool_calls = m
.tool_calls
.as_ref()
.map(|c| !c.is_empty())
.unwrap_or(false);
!(has_tool_calls && m.content.trim().is_empty())
}
_ => false,
}
}
/// Given a rendered index to start discarding from, find the raw index at
/// which to truncate. The cut position is the raw length after all prior
/// rendered messages — which also strips any tool-call scaffolding that
/// immediately precedes the discarded rendered message.
///
/// Discarding *at* the end (`discard == rendered_count`) is a no-op success:
/// returns `Some(messages.len())`. The mobile client hits this when
/// regenerating after a failed turn — its optimistic user bubble lives at
/// the index just past the server's persisted history. Strictly past the end
/// (`discard > rendered_count`) returns `None`.
pub(crate) fn find_raw_cut(
messages: &[ChatMessage],
discard_from_rendered_index: usize,
) -> Option<usize> {
let mut rendered_count = 0usize;
let mut last_kept_raw_end = 0usize;
for (i, m) in messages.iter().enumerate() {
if !is_rendered(m) {
continue;
}
if rendered_count == discard_from_rendered_index {
return Some(last_kept_raw_end);
}
rendered_count += 1;
last_kept_raw_end = i + 1;
}
if discard_from_rendered_index == rendered_count {
return Some(messages.len());
}
None
}
/// Read AGENTIC_CHAT_MAX_ITERATIONS once per call. Cheap; keeps the code
/// free of static globals and lets the operator change the cap by env without
/// a restart in test harnesses (the running server still caches via Default).
fn env_max_iterations() -> usize {
std::env::var("AGENTIC_CHAT_MAX_ITERATIONS")
.ok()
.and_then(|s| s.parse::<usize>().ok())
.unwrap_or(DEFAULT_MAX_ITERATIONS)
.max(1)
}
/// Append a per-turn iteration-budget reminder to the replayed system
/// message so the model knows how many tool-calling rounds this turn gets.
/// Returns the original `content` so the caller can restore it before
/// persistence — otherwise the note would accumulate across turns.
///
/// No-op (returns `None`) when `messages` has no leading system message.
fn annotate_system_with_budget(
messages: &mut [ChatMessage],
max_iterations: usize,
) -> Option<String> {
let first = messages.first_mut()?;
if first.role != "system" {
return None;
}
let original = first.content.clone();
first.content = format!(
"{}\n\n(Budget for this chat turn: up to {} tool-calling iterations. Produce your final reply before the budget is exhausted.)",
first.content, max_iterations
);
Some(original)
}
/// Restore a system-message content previously captured by
/// [`annotate_system_with_budget`]. No-op when `original` is `None` or the
/// first message isn't a system message.
fn restore_system_content(messages: &mut [ChatMessage], original: Option<String>) {
let Some(original) = original else { return };
if let Some(first) = messages.first_mut()
&& first.role == "system"
{
first.content = original;
}
}
/// Receipt produced by [`apply_system_prompt_override`] so the caller can
/// undo the override before persistence. Two variants because we either
/// replaced an existing system message (need its original content) or
/// prepended a synthetic one (need to pop it).
#[derive(Debug)]
pub(crate) enum SystemPromptStash {
Replaced { original: String },
Prepended,
}
/// Apply a per-turn `system_prompt` override to `messages` so the model
/// sees the requested persona for this turn. Returns a stash the caller
/// must pass to [`restore_system_prompt_override`] before persisting the
/// transcript — without that step, append-mode chat would silently
/// rewrite the stored persona.
///
/// No-op (returns `None`) when `override_prompt` is `None` or empty.
pub(crate) fn apply_system_prompt_override(
messages: &mut Vec<ChatMessage>,
override_prompt: Option<&str>,
) -> Option<SystemPromptStash> {
let prompt = override_prompt
.map(str::trim)
.filter(|s| !s.is_empty())?
.to_string();
if let Some(first) = messages.first_mut()
&& first.role == "system"
{
let original = std::mem::replace(&mut first.content, prompt);
return Some(SystemPromptStash::Replaced { original });
}
messages.insert(0, ChatMessage::system(prompt));
Some(SystemPromptStash::Prepended)
}
/// Undo an override previously applied by [`apply_system_prompt_override`].
/// No-op when `stash` is `None`.
pub(crate) fn restore_system_prompt_override(
messages: &mut Vec<ChatMessage>,
stash: Option<SystemPromptStash>,
) {
let Some(stash) = stash else { return };
match stash {
SystemPromptStash::Replaced { original } => {
if let Some(first) = messages.first_mut()
&& first.role == "system"
{
first.content = original;
}
}
SystemPromptStash::Prepended => {
if matches!(messages.first(), Some(m) if m.role == "system") {
messages.remove(0);
}
}
}
}
/// View returned to clients for chat-UI rendering.
#[derive(Debug)]
pub struct HistoryView {
pub messages: Vec<RenderedMessage>,
pub turn_count: usize,
pub model_version: String,
pub backend: String,
}
#[derive(Debug)]
pub struct RenderedMessage {
pub role: String,
pub content: String,
pub is_initial: bool,
/// Tools invoked during this turn (only populated for assistant replies).
/// Empty for user messages and for assistant replies that didn't involve
/// tool calls.
pub tools: Vec<ToolInvocation>,
}
#[derive(Debug, Clone)]
pub struct ToolInvocation {
pub name: String,
pub arguments: serde_json::Value,
pub result: String,
/// True when `result` was trimmed for payload size. Full value remains
/// available in the raw training_messages blob.
pub result_truncated: bool,
}
/// Soft cap for tool-result bodies returned via the history API. Keeps
/// payloads small for the mobile client — verbose SMS / geocoding responses
/// don't need to ship in full for inspection.
const TOOL_RESULT_PREVIEW_MAX: usize = 2000;
fn truncate_tool_result(s: &str) -> (String, bool) {
if s.len() <= TOOL_RESULT_PREVIEW_MAX {
(s.to_string(), false)
} else {
// Cut on a char boundary.
let mut cut = TOOL_RESULT_PREVIEW_MAX;
while !s.is_char_boundary(cut) && cut > 0 {
cut -= 1;
}
(s[..cut].to_string(), true)
}
}
/// Trim history to fit within `budget_bytes` of serialized JSON. Preserves
/// the system message and the first user message (with its base64 images
/// intact, since dropping those would invalidate the model's prior visual
/// reasoning). Drops the oldest assistant-tool_call + corresponding
/// tool-result pair on each pass until the budget is met or only the
/// preserved prefix remains.
///
/// Returns true when at least one message was dropped.
pub(crate) fn apply_context_budget(messages: &mut Vec<ChatMessage>, budget_bytes: usize) -> bool {
if budget_bytes == 0 {
return false;
}
if estimate_bytes(messages) <= budget_bytes {
return false;
}
// Find the index past the protected prefix: system messages + the first
// user message. Everything after is droppable in pairs.
let first_user_idx = messages.iter().position(|m| m.role == "user");
let preserve_through = match first_user_idx {
Some(i) => i, // keep [0..=i]
None => return false,
};
let mut dropped_any = false;
loop {
if estimate_bytes(messages) <= budget_bytes {
break;
}
// Find the oldest assistant-with-tool_calls strictly after the
// preserved prefix. Drop it together with the following tool turn(s)
// until we hit the next assistant or user turn.
let drop_start = (preserve_through + 1..messages.len()).find(|&i| {
let m = &messages[i];
m.role == "assistant"
&& m.tool_calls
.as_ref()
.map(|c| !c.is_empty())
.unwrap_or(false)
});
let Some(start) = drop_start else { break };
// Determine end: drop the assistant turn plus any contiguous tool
// result turns that follow.
let mut end = start + 1;
while end < messages.len() && messages[end].role == "tool" {
end += 1;
}
// Stop if dropping these would leave the just-appended user turn at
// the end alone with no preceding context — we still want it kept.
if end > messages.len() {
break;
}
messages.drain(start..end);
dropped_any = true;
}
dropped_any
}
fn estimate_bytes(messages: &[ChatMessage]) -> usize {
serde_json::to_string(messages)
.map(|s| s.len())
.unwrap_or(0)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::ai::llm_client::{ToolCall, ToolCallFunction};
fn assistant_with_tool_call(name: &str) -> ChatMessage {
ChatMessage {
role: "assistant".to_string(),
content: String::new(),
tool_calls: Some(vec![ToolCall {
id: None,
function: ToolCallFunction {
name: name.to_string(),
arguments: serde_json::Value::Object(Default::default()),
},
}]),
images: None,
}
}
fn assistant_text(text: &str) -> ChatMessage {
ChatMessage {
role: "assistant".to_string(),
content: text.to_string(),
tool_calls: None,
images: None,
}
}
#[test]
fn truncation_preserves_system_and_first_user() {
let mut msgs = vec![
ChatMessage::system("sys"),
ChatMessage::user("first user with lots of context".repeat(50)),
assistant_with_tool_call("get_x"),
ChatMessage::tool_result("x result ".repeat(200)),
assistant_with_tool_call("get_y"),
ChatMessage::tool_result("y result ".repeat(200)),
assistant_text("final answer"),
];
let original_len = msgs.len();
let dropped = apply_context_budget(&mut msgs, 500);
assert!(dropped, "should drop something at this small budget");
assert!(msgs.len() < original_len);
// First two messages preserved.
assert_eq!(msgs[0].role, "system");
assert_eq!(msgs[1].role, "user");
}
#[test]
fn truncation_no_op_when_under_budget() {
let mut msgs = vec![ChatMessage::system("s"), ChatMessage::user("u")];
let dropped = apply_context_budget(&mut msgs, 1_000_000);
assert!(!dropped);
assert_eq!(msgs.len(), 2);
}
#[test]
fn truncation_returns_false_with_no_droppable_pairs() {
// Only system + user, no tool-call turns to drop.
let mut msgs = vec![ChatMessage::system("s"), ChatMessage::user("u")];
let dropped = apply_context_budget(&mut msgs, 1);
assert!(!dropped);
}
#[test]
fn rewind_strips_assistant_and_tool_scaffolding() {
// Rendered: [user1, asst1, user2, asst2] → cut at rendered index 3
// (the final asst2) should drop the tool-call scaffolding + asst2,
// leaving raw up through user2.
let msgs = vec![
ChatMessage::system("sys"),
ChatMessage::user("q1"),
assistant_text("a1"),
ChatMessage::user("q2"),
assistant_with_tool_call("lookup"),
ChatMessage::tool_result("data"),
assistant_text("a2 final"),
];
let cut = find_raw_cut(&msgs, 3).expect("cut found");
// raw[0..cut] should end at user("q2") — indices 0..=3.
assert_eq!(cut, 4);
assert_eq!(msgs[cut - 1].role, "user");
assert_eq!(msgs[cut - 1].content, "q2");
}
#[test]
fn rewind_at_second_rendered_cuts_after_first_user() {
// Rendered index 1 = the first assistant reply → dropping it should
// leave just the initial user message.
let msgs = vec![
ChatMessage::system("s"),
ChatMessage::user("q1"),
assistant_with_tool_call("tool"),
ChatMessage::tool_result("r"),
assistant_text("a1"),
];
let cut = find_raw_cut(&msgs, 1).expect("cut found");
assert_eq!(cut, 2); // sys + user("q1")
}
#[test]
fn rewind_beyond_range_returns_none() {
let msgs = vec![ChatMessage::user("q1"), assistant_text("a1")];
assert!(find_raw_cut(&msgs, 5).is_none());
}
#[test]
fn rewind_at_end_is_noop_success() {
// Mobile client retries after a failed turn that never persisted —
// its optimistic user bubble's index equals the server's rendered
// count. Should resolve to "no cut" rather than an out-of-range error.
let msgs = vec![
ChatMessage::system("s"),
ChatMessage::user("q1"),
assistant_text("a1"),
];
let cut = find_raw_cut(&msgs, 2).expect("boundary cut should succeed");
assert_eq!(cut, msgs.len());
}
#[test]
fn apply_override_replaces_existing_system_message() {
let mut msgs = vec![
ChatMessage::system("original persona"),
ChatMessage::user("hi"),
];
let stash = apply_system_prompt_override(&mut msgs, Some("new persona"));
assert_eq!(msgs[0].content, "new persona");
match stash {
Some(SystemPromptStash::Replaced { original }) => {
assert_eq!(original, "original persona");
}
other => panic!("expected Replaced, got {:?}", other),
}
}
#[test]
fn apply_override_prepends_synthetic_when_missing() {
let mut msgs = vec![ChatMessage::user("hi")];
let stash = apply_system_prompt_override(&mut msgs, Some("new persona"));
assert_eq!(msgs.len(), 2);
assert_eq!(msgs[0].role, "system");
assert_eq!(msgs[0].content, "new persona");
assert!(matches!(stash, Some(SystemPromptStash::Prepended)));
}
#[test]
fn apply_override_no_op_when_none() {
let mut msgs = vec![ChatMessage::system("sys"), ChatMessage::user("hi")];
let stash = apply_system_prompt_override(&mut msgs, None);
assert!(stash.is_none());
assert_eq!(msgs[0].content, "sys");
}
#[test]
fn apply_override_no_op_for_empty_string() {
let mut msgs = vec![ChatMessage::system("sys")];
let stash = apply_system_prompt_override(&mut msgs, Some(""));
assert!(stash.is_none());
assert_eq!(msgs[0].content, "sys");
}
#[test]
fn restore_override_replaces_back() {
let mut msgs = vec![ChatMessage::system("new"), ChatMessage::user("hi")];
restore_system_prompt_override(
&mut msgs,
Some(SystemPromptStash::Replaced {
original: "original".to_string(),
}),
);
assert_eq!(msgs[0].content, "original");
assert_eq!(msgs.len(), 2);
}
#[test]
fn restore_override_pops_synthetic() {
let mut msgs = vec![ChatMessage::system("new"), ChatMessage::user("hi")];
restore_system_prompt_override(&mut msgs, Some(SystemPromptStash::Prepended));
assert_eq!(msgs.len(), 1);
assert_eq!(msgs[0].role, "user");
}
#[test]
fn override_round_trip_preserves_original_system_message() {
let mut msgs = vec![
ChatMessage::system("original persona"),
ChatMessage::user("first user"),
assistant_text("first reply"),
];
let stash = apply_system_prompt_override(&mut msgs, Some("ephemeral persona"));
assert_eq!(msgs[0].content, "ephemeral persona");
restore_system_prompt_override(&mut msgs, stash);
assert_eq!(msgs[0].content, "original persona");
assert_eq!(msgs.len(), 3);
assert_eq!(msgs[1].role, "user");
assert_eq!(msgs[2].role, "assistant");
}
#[test]
fn override_with_synthetic_round_trip_drops_extra_message() {
let mut msgs = vec![ChatMessage::user("first user")];
let stash = apply_system_prompt_override(&mut msgs, Some("ephemeral"));
assert_eq!(msgs.len(), 2);
assert_eq!(msgs[0].role, "system");
restore_system_prompt_override(&mut msgs, stash);
assert_eq!(msgs.len(), 1);
assert_eq!(msgs[0].role, "user");
}
// ── Bootstrap prompt / backend resolution ─────────────────────────
// Resolution helpers run on every bootstrap turn — they pick the
// persisted system prompt and the chosen backend label. Bugs here
// would silently swap the persona or miscategorise a backend.
#[test]
fn bootstrap_system_prompt_falls_back_to_default_for_none() {
let out = resolve_bootstrap_system_prompt(None);
assert_eq!(out, BOOTSTRAP_DEFAULT_SYSTEM_PROMPT);
}
#[test]
fn bootstrap_system_prompt_falls_back_to_default_for_empty_string() {
// Apollo currently sends `''` when no persona is selected.
let out = resolve_bootstrap_system_prompt(Some(""));
assert_eq!(out, BOOTSTRAP_DEFAULT_SYSTEM_PROMPT);
}
#[test]
fn bootstrap_system_prompt_falls_back_to_default_for_whitespace() {
let out = resolve_bootstrap_system_prompt(Some(" \n\t "));
assert_eq!(out, BOOTSTRAP_DEFAULT_SYSTEM_PROMPT);
}
#[test]
fn bootstrap_system_prompt_uses_supplied_when_non_empty() {
let out = resolve_bootstrap_system_prompt(Some("you are a journal"));
assert_eq!(out, "you are a journal");
}
#[test]
fn bootstrap_system_prompt_does_not_strip_inner_whitespace() {
// Trim only happens at the edges — interior newlines and spacing
// (which Apollo's persona uses for tool listings) must survive.
let prompt = "line one\nline two\n bullet";
let out = resolve_bootstrap_system_prompt(Some(prompt));
assert_eq!(out, prompt);
}
#[test]
fn bootstrap_backend_defaults_to_local_when_none() {
let out = resolve_bootstrap_backend(None).unwrap();
assert_eq!(out, "local");
}
#[test]
fn bootstrap_backend_defaults_to_local_when_empty() {
let out = resolve_bootstrap_backend(Some("")).unwrap();
assert_eq!(out, "local");
}
#[test]
fn bootstrap_backend_accepts_local_and_hybrid_case_insensitively() {
assert_eq!(resolve_bootstrap_backend(Some("LOCAL")).unwrap(), "local");
assert_eq!(resolve_bootstrap_backend(Some("Hybrid")).unwrap(), "hybrid");
assert_eq!(
resolve_bootstrap_backend(Some(" local ")).unwrap(),
"local"
);
}
#[test]
fn bootstrap_backend_rejects_unknown_label() {
let err = resolve_bootstrap_backend(Some("openrouter")).unwrap_err();
let msg = format!("{}", err);
assert!(msg.contains("unknown backend"));
assert!(msg.contains("openrouter"));
}
#[test]
fn bootstrap_system_message_includes_path_and_persona() {
let out = build_bootstrap_system_message("you are helpful", "pics/IMG.jpg", "");
assert!(out.starts_with("you are helpful"));
assert!(out.contains("--- PHOTO CONTEXT ---"));
assert!(out.contains("Photo file path: pics/IMG.jpg"));
// No visual block — should not introduce a stray "Visual" line.
assert!(!out.contains("Visual description"));
}
#[test]
fn bootstrap_system_message_includes_visual_block_when_supplied() {
let visual = "Visual description (from local vision model):\nA dog in a park.\n";
let out = build_bootstrap_system_message("voice", "p.jpg", visual);
assert!(out.contains("Photo file path: p.jpg"));
assert!(out.contains("A dog in a park"));
// Path appears before visual.
let path_pos = out.find("Photo file path:").unwrap();
let visual_pos = out.find("A dog in a park").unwrap();
assert!(path_pos < visual_pos);
}
#[test]
fn bootstrap_system_message_trims_persona_trailing_whitespace() {
// Two consecutive newlines before the photo-context divider —
// any trailing whitespace from the persona must be collapsed
// so we don't end up with `\n\n\n\n--- PHOTO CONTEXT ---`.
let out = build_bootstrap_system_message("voice \n\n\n", "p.jpg", "");
assert!(out.contains("voice\n\n--- PHOTO CONTEXT ---"));
}
}