insight-chat: bootstrap insight on first Discuss message + regenerate flag
Tap-Discuss-on-no-insight previously failed silently: ImageApi's /insights/chat/stream required an existing agentic insight, errored when missing, and emitted the failure as `event: error` — which the frontend SSE consumer ignored (it listens for `error_message`). This commit closes both gaps with a server-side state machine: - /insights/chat/stream now branches on insight presence. Missing insight (or `regenerate: true` in the body) → bootstrap path: builds [System(req.system_prompt), User(req.user_message + image)], runs the agentic loop, generates a title, persists a new row via store_insight (which auto-flips priors). Existing insight → continuation path (unchanged behaviour). - New `regenerate: bool` request field forces bootstrap even when an insight exists. Takes precedence over `amend`. - `done` SSE payload field-name alignment with Apollo's frontend convention: prompt_eval_count → prompt_tokens, eval_count → eval_tokens, num_ctx echo added. - `amended_insight_id` semantics broaden — now populated whenever the turn produced a new row (bootstrap, regenerate, or amend). Existing amend clients keep working unchanged; new clients get the new row's id for free. - `event: error` → `event: error_message` so frontend errors stop silently dropping. Refactor: extracted run_streaming_agentic_loop, build_chat_clients, and generate_title as shared helpers between bootstrap and continuation. Continuation path's outer logic moves to run_continuation_streaming with no behaviour change. Mobile-ready: any client (Apollo backend, mobile, future) sends one request to /insights/chat/stream and gets the right path. Apollo's proxy stays a dumb pipe. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -641,11 +641,18 @@ pub struct ChatTurnHttpRequest {
|
||||
#[serde(default)]
|
||||
pub max_iterations: Option<usize>,
|
||||
/// Per-turn system-prompt override. Ephemeral in append mode,
|
||||
/// persisted in amend mode. See ChatTurnRequest for semantics.
|
||||
/// persisted in amend / regenerate mode. See ChatTurnRequest for
|
||||
/// semantics. Also seeds the bootstrap path when no insight exists.
|
||||
#[serde(default)]
|
||||
pub system_prompt: Option<String>,
|
||||
#[serde(default)]
|
||||
pub amend: bool,
|
||||
/// When true, force the bootstrap path even if an insight already
|
||||
/// exists: flip the existing row(s) to `is_current=false` and create
|
||||
/// a new insight row from this turn. Takes precedence over `amend`.
|
||||
/// Collapses to a normal bootstrap when no insight exists.
|
||||
#[serde(default)]
|
||||
pub regenerate: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
@@ -701,6 +708,7 @@ pub async fn chat_turn_handler(
|
||||
max_iterations: request.max_iterations,
|
||||
system_prompt: request.system_prompt.clone(),
|
||||
amend: request.amend,
|
||||
regenerate: request.regenerate,
|
||||
};
|
||||
|
||||
match app_state.insight_chat.chat_turn(chat_req).await {
|
||||
@@ -916,6 +924,7 @@ pub async fn chat_stream_handler(
|
||||
max_iterations: request.max_iterations,
|
||||
system_prompt: request.system_prompt.clone(),
|
||||
amend: request.amend,
|
||||
regenerate: request.regenerate,
|
||||
};
|
||||
|
||||
let service = app_state.insight_chat.clone();
|
||||
@@ -967,8 +976,9 @@ fn render_sse_frame(ev: &ChatStreamEvent) -> String {
|
||||
tool_calls_made,
|
||||
iterations_used,
|
||||
truncated,
|
||||
prompt_eval_count,
|
||||
eval_count,
|
||||
prompt_tokens,
|
||||
eval_tokens,
|
||||
num_ctx,
|
||||
amended_insight_id,
|
||||
backend_used,
|
||||
model_used,
|
||||
@@ -978,14 +988,20 @@ fn render_sse_frame(ev: &ChatStreamEvent) -> String {
|
||||
"tool_calls_made": tool_calls_made,
|
||||
"iterations_used": iterations_used,
|
||||
"truncated": truncated,
|
||||
"prompt_eval_count": prompt_eval_count,
|
||||
"eval_count": eval_count,
|
||||
"prompt_tokens": prompt_tokens,
|
||||
"eval_tokens": eval_tokens,
|
||||
"num_ctx": num_ctx,
|
||||
"amended_insight_id": amended_insight_id,
|
||||
"backend": backend_used,
|
||||
"model": model_used,
|
||||
}),
|
||||
),
|
||||
ChatStreamEvent::Error(msg) => ("error", serde_json::json!({ "message": msg })),
|
||||
// Apollo's frontend SSE consumer (and its free-chat backend, which
|
||||
// is the de-facto convention) listens for `error_message`. Emitting
|
||||
// `error` here meant any failure on the photo-chat path (e.g.
|
||||
// "no insight found for path") was silently dropped, leaving an
|
||||
// empty assistant bubble with no clue why the turn died.
|
||||
ChatStreamEvent::Error(msg) => ("error_message", serde_json::json!({ "message": msg })),
|
||||
};
|
||||
let data = serde_json::to_string(&payload).unwrap_or_else(|_| "{}".to_string());
|
||||
format!("event: {}\ndata: {}\n\n", event_name, data)
|
||||
|
||||
@@ -7,7 +7,7 @@ use std::sync::{Arc, Mutex};
|
||||
use tokio::sync::Mutex as TokioMutex;
|
||||
|
||||
use crate::ai::insight_generator::InsightGenerator;
|
||||
use crate::ai::llm_client::{ChatMessage, LlmClient, LlmStreamEvent};
|
||||
use crate::ai::llm_client::{ChatMessage, LlmClient, LlmStreamEvent, Tool};
|
||||
use crate::ai::ollama::OllamaClient;
|
||||
use crate::ai::openrouter::OpenRouterClient;
|
||||
use crate::database::InsightDao;
|
||||
@@ -53,6 +53,12 @@ pub struct ChatTurnRequest {
|
||||
/// When true, write a new insight row (regenerating title) instead of
|
||||
/// updating training_messages on the existing row.
|
||||
pub amend: bool,
|
||||
/// When true, force the bootstrap path even if an insight already exists:
|
||||
/// flip prior rows to `is_current=false` and create a new insight row
|
||||
/// from `system_prompt` + `user_message` + photo. Takes precedence over
|
||||
/// `amend`. With no existing insight, collapses to a normal bootstrap
|
||||
/// (the row-flip step is a no-op).
|
||||
pub regenerate: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -701,24 +707,44 @@ impl InsightChatService {
|
||||
};
|
||||
let _guard = entry_lock.lock().await;
|
||||
|
||||
let insight = {
|
||||
// Look up existing insight (None when missing). The branch below
|
||||
// decides bootstrap vs. continuation: `regenerate=true` forces
|
||||
// bootstrap regardless of the row's presence; missing rows always
|
||||
// bootstrap.
|
||||
let existing_insight = {
|
||||
let cx = opentelemetry::Context::new();
|
||||
let mut dao = self.insight_dao.lock().expect("Unable to lock InsightDao");
|
||||
dao.get_insight(&cx, &normalized)
|
||||
.map_err(|e| anyhow!("failed to load insight: {:?}", e))?
|
||||
.ok_or_else(|| anyhow!("no insight found for path"))?
|
||||
};
|
||||
let raw_history = insight
|
||||
.training_messages
|
||||
.as_ref()
|
||||
.ok_or_else(|| {
|
||||
anyhow!("insight has no chat history; regenerate this insight in agentic mode")
|
||||
})?
|
||||
.clone();
|
||||
let mut messages: Vec<ChatMessage> = serde_json::from_str(&raw_history)
|
||||
|
||||
if req.regenerate || existing_insight.is_none() {
|
||||
return self.run_bootstrap_streaming(req, normalized, tx).await;
|
||||
}
|
||||
let insight = existing_insight.expect("just checked Some above");
|
||||
self.run_continuation_streaming(req, normalized, insight, tx)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Continuation path: photo has an existing agentic insight with
|
||||
/// `training_messages` populated. Replay the transcript, append a new
|
||||
/// turn, run the agentic loop, persist (UPDATE for append; INSERT new
|
||||
/// row for amend).
|
||||
async fn run_continuation_streaming(
|
||||
&self,
|
||||
req: ChatTurnRequest,
|
||||
normalized: String,
|
||||
insight: crate::database::models::PhotoInsight,
|
||||
tx: tokio::sync::mpsc::Sender<ChatStreamEvent>,
|
||||
) -> Result<()> {
|
||||
let raw_history = insight.training_messages.as_ref().ok_or_else(|| {
|
||||
anyhow!("insight has no chat history; regenerate this insight in agentic mode")
|
||||
})?;
|
||||
let mut messages: Vec<ChatMessage> = serde_json::from_str(raw_history)
|
||||
.map_err(|e| anyhow!("failed to deserialize chat history: {}", e))?;
|
||||
|
||||
// Backend selection — same rules as non-streaming chat_turn.
|
||||
// Backend selection — defer to stored insight's backend unless the
|
||||
// request supplies an override.
|
||||
let stored_backend = insight.backend.clone();
|
||||
let effective_backend = req
|
||||
.backend
|
||||
@@ -752,59 +778,14 @@ impl InsightChatService {
|
||||
.or_else(|| Some(stored_model.clone()))
|
||||
.filter(|m| !m.is_empty());
|
||||
|
||||
let mut ollama_client = self.ollama.clone();
|
||||
let mut openrouter_client: Option<OpenRouterClient> = None;
|
||||
|
||||
if is_hybrid {
|
||||
let arc = self.openrouter.as_ref().ok_or_else(|| {
|
||||
anyhow!("hybrid backend unavailable: OPENROUTER_API_KEY not configured")
|
||||
})?;
|
||||
let mut c: OpenRouterClient = (**arc).clone();
|
||||
if let Some(ref m) = custom_model {
|
||||
c.primary_model = m.clone();
|
||||
}
|
||||
if req.temperature.is_some()
|
||||
|| req.top_p.is_some()
|
||||
|| req.top_k.is_some()
|
||||
|| req.min_p.is_some()
|
||||
{
|
||||
c.set_sampling_params(req.temperature, req.top_p, req.top_k, req.min_p);
|
||||
}
|
||||
if let Some(ctx) = req.num_ctx {
|
||||
c.set_num_ctx(Some(ctx));
|
||||
}
|
||||
openrouter_client = Some(c);
|
||||
} else {
|
||||
if let Some(ref m) = custom_model
|
||||
&& m != &self.ollama.primary_model
|
||||
{
|
||||
ollama_client = OllamaClient::new(
|
||||
self.ollama.primary_url.clone(),
|
||||
self.ollama.fallback_url.clone(),
|
||||
m.clone(),
|
||||
Some(m.clone()),
|
||||
);
|
||||
}
|
||||
if req.temperature.is_some()
|
||||
|| req.top_p.is_some()
|
||||
|| req.top_k.is_some()
|
||||
|| req.min_p.is_some()
|
||||
{
|
||||
ollama_client.set_sampling_params(req.temperature, req.top_p, req.top_k, req.min_p);
|
||||
}
|
||||
if let Some(ctx) = req.num_ctx {
|
||||
ollama_client.set_num_ctx(Some(ctx));
|
||||
}
|
||||
}
|
||||
|
||||
let chat_backend: &dyn LlmClient = if let Some(ref c) = openrouter_client {
|
||||
c
|
||||
} else {
|
||||
&ollama_client
|
||||
};
|
||||
let (chat_backend_holder, ollama_client) =
|
||||
self.build_chat_clients(is_hybrid, custom_model.as_deref(), &req)?;
|
||||
let chat_backend: &dyn LlmClient = chat_backend_holder.as_ref();
|
||||
let model_used = chat_backend.primary_model().to_string();
|
||||
|
||||
// Tool set.
|
||||
// Tool set — local mode + first user turn carries an image →
|
||||
// offer describe_photo. Hybrid: visual description was inlined
|
||||
// when the insight was bootstrapped, no describe tool needed.
|
||||
let local_first_user_has_image = messages
|
||||
.iter()
|
||||
.find(|m| m.role == "user")
|
||||
@@ -812,10 +793,6 @@ impl InsightChatService {
|
||||
.map(|imgs| !imgs.is_empty())
|
||||
.unwrap_or(false);
|
||||
let offer_describe_tool = !is_hybrid && local_first_user_has_image;
|
||||
// current_gate_opts(has_vision) sets gate_opts.has_vision = has_vision
|
||||
// and probes the per-table presence flags. Pass `offer_describe_tool`
|
||||
// directly — the `!is_hybrid && local_first_user_has_image` decision
|
||||
// is the chat-path's vision predicate.
|
||||
let gate_opts = self.generator.current_gate_opts(offer_describe_tool);
|
||||
let tools = InsightGenerator::build_tool_definitions(gate_opts);
|
||||
|
||||
@@ -839,9 +816,340 @@ impl InsightChatService {
|
||||
// Mirror chat_turn: per-turn override goes on first, budget note next.
|
||||
let override_stash =
|
||||
apply_system_prompt_override(&mut messages, req.system_prompt.as_deref());
|
||||
|
||||
let original_system_content = annotate_system_with_budget(&mut messages, max_iterations);
|
||||
|
||||
let outcome = self
|
||||
.run_streaming_agentic_loop(
|
||||
chat_backend,
|
||||
&ollama_client,
|
||||
&mut messages,
|
||||
tools,
|
||||
&image_base64,
|
||||
&normalized,
|
||||
max_iterations,
|
||||
&tx,
|
||||
)
|
||||
.await?;
|
||||
let AgenticLoopOutcome {
|
||||
tool_calls_made,
|
||||
iterations_used,
|
||||
last_prompt_eval_count,
|
||||
last_eval_count,
|
||||
final_content,
|
||||
} = outcome;
|
||||
|
||||
// Drop the per-turn iteration-budget note before persisting so it
|
||||
// doesn't snowball on subsequent turns.
|
||||
restore_system_content(&mut messages, original_system_content);
|
||||
|
||||
// Append mode: undo the per-turn system-prompt override.
|
||||
// Amend mode: keep it — it becomes the new row's system message.
|
||||
if !req.amend {
|
||||
restore_system_prompt_override(&mut messages, override_stash);
|
||||
}
|
||||
|
||||
let json = serde_json::to_string(&messages)
|
||||
.map_err(|e| anyhow!("failed to serialize chat history: {}", e))?;
|
||||
|
||||
let mut amended_insight_id: Option<i32> = None;
|
||||
if req.amend {
|
||||
let title = self.generate_title(chat_backend, &final_content).await?;
|
||||
|
||||
// Amended rows intentionally do not inherit the parent's
|
||||
// `fewshot_source_ids`. The parent's few-shot influence is still
|
||||
// present in this row's content; if you want strict lineage
|
||||
// tracking for training-set filtering, fetch the parent here and
|
||||
// copy its value forward.
|
||||
let new_row = InsertPhotoInsight {
|
||||
library_id: req.library_id,
|
||||
file_path: normalized.clone(),
|
||||
title,
|
||||
summary: final_content.clone(),
|
||||
generated_at: Utc::now().timestamp(),
|
||||
model_version: model_used.clone(),
|
||||
is_current: true,
|
||||
training_messages: Some(json),
|
||||
backend: effective_backend.clone(),
|
||||
fewshot_source_ids: None,
|
||||
content_hash: None,
|
||||
};
|
||||
let cx = opentelemetry::Context::new();
|
||||
let mut dao = self.insight_dao.lock().expect("Unable to lock InsightDao");
|
||||
let stored = dao
|
||||
.store_insight(&cx, new_row)
|
||||
.map_err(|e| anyhow!("failed to store amended insight: {:?}", e))?;
|
||||
amended_insight_id = Some(stored.id);
|
||||
} else {
|
||||
let cx = opentelemetry::Context::new();
|
||||
let mut dao = self.insight_dao.lock().expect("Unable to lock InsightDao");
|
||||
dao.update_training_messages(&cx, req.library_id, &normalized, &json)
|
||||
.map_err(|e| anyhow!("failed to persist chat history: {:?}", e))?;
|
||||
}
|
||||
|
||||
let _ = tx
|
||||
.send(ChatStreamEvent::Done {
|
||||
tool_calls_made,
|
||||
iterations_used,
|
||||
truncated,
|
||||
prompt_tokens: last_prompt_eval_count,
|
||||
eval_tokens: last_eval_count,
|
||||
num_ctx: req.num_ctx,
|
||||
amended_insight_id,
|
||||
backend_used: effective_backend,
|
||||
model_used,
|
||||
})
|
||||
.await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Bootstrap path: no insight row yet (or `regenerate=true`). Build a
|
||||
/// fresh transcript from `req.system_prompt` + `req.user_message` +
|
||||
/// the photo, run the agentic loop, generate a title, and INSERT a
|
||||
/// new insight row. `store_insight` flips any prior rows for the same
|
||||
/// `(library_id, file_path)` to `is_current=false` — that's how
|
||||
/// `regenerate` shadows the previous insight.
|
||||
async fn run_bootstrap_streaming(
|
||||
&self,
|
||||
req: ChatTurnRequest,
|
||||
normalized: String,
|
||||
tx: tokio::sync::mpsc::Sender<ChatStreamEvent>,
|
||||
) -> Result<()> {
|
||||
let effective_backend = req
|
||||
.backend
|
||||
.as_deref()
|
||||
.map(|s| s.trim().to_lowercase())
|
||||
.filter(|s| !s.is_empty())
|
||||
.unwrap_or_else(|| "local".to_string());
|
||||
if !matches!(effective_backend.as_str(), "local" | "hybrid") {
|
||||
bail!(
|
||||
"unknown backend '{}'; expected 'local' or 'hybrid'",
|
||||
effective_backend
|
||||
);
|
||||
}
|
||||
let is_hybrid = effective_backend == "hybrid";
|
||||
|
||||
let max_iterations = req
|
||||
.max_iterations
|
||||
.unwrap_or(DEFAULT_MAX_ITERATIONS)
|
||||
.clamp(1, env_max_iterations());
|
||||
|
||||
let custom_model = req.model.clone().filter(|m| !m.is_empty());
|
||||
let (chat_backend_holder, ollama_client) =
|
||||
self.build_chat_clients(is_hybrid, custom_model.as_deref(), &req)?;
|
||||
let chat_backend: &dyn LlmClient = chat_backend_holder.as_ref();
|
||||
let model_used = chat_backend.primary_model().to_string();
|
||||
|
||||
// Load image bytes once. RAW preview fallback is handled inside
|
||||
// load_image_as_base64. Errors degrade silently — a chat that
|
||||
// discusses metadata-only is still useful.
|
||||
let image_base64: Option<String> = self.generator.load_image_as_base64(&normalized).ok();
|
||||
|
||||
// Hybrid backend: pre-describe the image via local Ollama vision
|
||||
// and inline the description into the user turn. OpenRouter chat
|
||||
// models don't see images directly. Mirrors the same pre-describe
|
||||
// pass that `generate_agentic_insight_for_photo` does for hybrid.
|
||||
let user_content = if is_hybrid {
|
||||
let visual = match image_base64.as_deref() {
|
||||
Some(b64) => match self.ollama.describe_image(b64).await {
|
||||
Ok(desc) => format!(
|
||||
"Visual description (from local vision model):\n{}\n\n",
|
||||
desc
|
||||
),
|
||||
Err(e) => {
|
||||
log::warn!("hybrid bootstrap: local describe_image failed: {}", e);
|
||||
String::new()
|
||||
}
|
||||
},
|
||||
None => String::new(),
|
||||
};
|
||||
format!("{}{}", visual, req.user_message)
|
||||
} else {
|
||||
req.user_message.clone()
|
||||
};
|
||||
|
||||
// Tool gates. Local + image present → expose describe_photo so
|
||||
// the chat model can re-look at the photo on demand. Hybrid:
|
||||
// already inlined, no tool needed.
|
||||
let offer_describe_tool = !is_hybrid && image_base64.is_some();
|
||||
let gate_opts = self.generator.current_gate_opts(offer_describe_tool);
|
||||
let tools = InsightGenerator::build_tool_definitions(gate_opts);
|
||||
|
||||
// Build initial messages: persona system + user-turn-with-image.
|
||||
let system_content = req
|
||||
.system_prompt
|
||||
.as_deref()
|
||||
.map(str::trim)
|
||||
.filter(|s| !s.is_empty())
|
||||
.unwrap_or(BOOTSTRAP_DEFAULT_SYSTEM_PROMPT)
|
||||
.to_string();
|
||||
let system_msg = ChatMessage::system(system_content);
|
||||
let mut user_msg = ChatMessage::user(user_content);
|
||||
if !is_hybrid && let Some(ref img) = image_base64 {
|
||||
user_msg.images = Some(vec![img.clone()]);
|
||||
}
|
||||
let mut messages = vec![system_msg, user_msg];
|
||||
|
||||
let outcome = self
|
||||
.run_streaming_agentic_loop(
|
||||
chat_backend,
|
||||
&ollama_client,
|
||||
&mut messages,
|
||||
tools,
|
||||
&image_base64,
|
||||
&normalized,
|
||||
max_iterations,
|
||||
&tx,
|
||||
)
|
||||
.await?;
|
||||
let AgenticLoopOutcome {
|
||||
tool_calls_made,
|
||||
iterations_used,
|
||||
last_prompt_eval_count,
|
||||
last_eval_count,
|
||||
final_content,
|
||||
} = outcome;
|
||||
|
||||
let title = self.generate_title(chat_backend, &final_content).await?;
|
||||
|
||||
let json = serde_json::to_string(&messages)
|
||||
.map_err(|e| anyhow!("failed to serialize chat history: {}", e))?;
|
||||
let new_row = InsertPhotoInsight {
|
||||
library_id: req.library_id,
|
||||
file_path: normalized.clone(),
|
||||
title,
|
||||
summary: final_content,
|
||||
generated_at: Utc::now().timestamp(),
|
||||
model_version: model_used.clone(),
|
||||
is_current: true,
|
||||
training_messages: Some(json),
|
||||
backend: effective_backend.clone(),
|
||||
fewshot_source_ids: None,
|
||||
content_hash: None,
|
||||
};
|
||||
let stored = {
|
||||
let cx = opentelemetry::Context::new();
|
||||
let mut dao = self.insight_dao.lock().expect("Unable to lock InsightDao");
|
||||
dao.store_insight(&cx, new_row)
|
||||
.map_err(|e| anyhow!("failed to store bootstrap insight: {:?}", e))?
|
||||
};
|
||||
|
||||
// amended_insight_id semantics broaden on bootstrap/regenerate:
|
||||
// populated whenever this turn produced a new insight row, so
|
||||
// clients don't need a separate field to learn the new id.
|
||||
let _ = tx
|
||||
.send(ChatStreamEvent::Done {
|
||||
tool_calls_made,
|
||||
iterations_used,
|
||||
truncated: false,
|
||||
prompt_tokens: last_prompt_eval_count,
|
||||
eval_tokens: last_eval_count,
|
||||
num_ctx: req.num_ctx,
|
||||
amended_insight_id: Some(stored.id),
|
||||
backend_used: effective_backend,
|
||||
model_used,
|
||||
})
|
||||
.await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Set up chat clients (Ollama + optional OpenRouter) shared by
|
||||
/// bootstrap and continuation. Returns the chat-side backend client
|
||||
/// (boxed because hybrid and local return different concrete types)
|
||||
/// and the Ollama client used for describe-image / local tool calls.
|
||||
fn build_chat_clients(
|
||||
&self,
|
||||
is_hybrid: bool,
|
||||
custom_model: Option<&str>,
|
||||
req: &ChatTurnRequest,
|
||||
) -> Result<(Box<dyn LlmClient>, OllamaClient)> {
|
||||
let mut ollama_client = self.ollama.clone();
|
||||
|
||||
if is_hybrid {
|
||||
let arc = self.openrouter.as_ref().ok_or_else(|| {
|
||||
anyhow!("hybrid backend unavailable: OPENROUTER_API_KEY not configured")
|
||||
})?;
|
||||
let mut c: OpenRouterClient = (**arc).clone();
|
||||
if let Some(m) = custom_model {
|
||||
c.primary_model = m.to_string();
|
||||
}
|
||||
if req.temperature.is_some()
|
||||
|| req.top_p.is_some()
|
||||
|| req.top_k.is_some()
|
||||
|| req.min_p.is_some()
|
||||
{
|
||||
c.set_sampling_params(req.temperature, req.top_p, req.top_k, req.min_p);
|
||||
}
|
||||
if let Some(ctx) = req.num_ctx {
|
||||
c.set_num_ctx(Some(ctx));
|
||||
}
|
||||
return Ok((Box::new(c), ollama_client));
|
||||
}
|
||||
|
||||
if let Some(m) = custom_model
|
||||
&& m != self.ollama.primary_model
|
||||
{
|
||||
ollama_client = OllamaClient::new(
|
||||
self.ollama.primary_url.clone(),
|
||||
self.ollama.fallback_url.clone(),
|
||||
m.to_string(),
|
||||
Some(m.to_string()),
|
||||
);
|
||||
}
|
||||
if req.temperature.is_some()
|
||||
|| req.top_p.is_some()
|
||||
|| req.top_k.is_some()
|
||||
|| req.min_p.is_some()
|
||||
{
|
||||
ollama_client.set_sampling_params(req.temperature, req.top_p, req.top_k, req.min_p);
|
||||
}
|
||||
if let Some(ctx) = req.num_ctx {
|
||||
ollama_client.set_num_ctx(Some(ctx));
|
||||
}
|
||||
Ok((Box::new(ollama_client.clone()), ollama_client))
|
||||
}
|
||||
|
||||
/// Generate a short title via the same chat backend so voice stays
|
||||
/// consistent with the body. Mirrors generate_agentic_insight_for_photo's
|
||||
/// titling pass.
|
||||
async fn generate_title(
|
||||
&self,
|
||||
chat_backend: &dyn LlmClient,
|
||||
final_content: &str,
|
||||
) -> Result<String> {
|
||||
let title_prompt = format!(
|
||||
"Create a short title (maximum 8 words) for the following journal entry:\n\n{}\n\n\
|
||||
Capture the key moment or theme. Return ONLY the title, nothing else.",
|
||||
final_content
|
||||
);
|
||||
let title_raw = chat_backend
|
||||
.generate(
|
||||
&title_prompt,
|
||||
Some(
|
||||
"You are my long term memory assistant. Use only the information provided. Do not invent details.",
|
||||
),
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
Ok(title_raw.trim().trim_matches('"').to_string())
|
||||
}
|
||||
|
||||
/// Drive the agentic loop with streaming SSE events. Shared between
|
||||
/// bootstrap and continuation. Mutates `messages` in place (response
|
||||
/// turns + tool results are appended) and returns counters + the
|
||||
/// final assistant content.
|
||||
async fn run_streaming_agentic_loop(
|
||||
&self,
|
||||
chat_backend: &dyn LlmClient,
|
||||
ollama_client: &OllamaClient,
|
||||
messages: &mut Vec<ChatMessage>,
|
||||
tools: Vec<Tool>,
|
||||
image_base64: &Option<String>,
|
||||
normalized: &str,
|
||||
max_iterations: usize,
|
||||
tx: &tokio::sync::mpsc::Sender<ChatStreamEvent>,
|
||||
) -> Result<AgenticLoopOutcome> {
|
||||
let mut tool_calls_made = 0usize;
|
||||
let mut iterations_used = 0usize;
|
||||
let mut last_prompt_eval_count: Option<i32> = None;
|
||||
@@ -883,7 +1191,8 @@ impl InsightChatService {
|
||||
let mut response =
|
||||
final_message.ok_or_else(|| anyhow!("stream ended without a Done event"))?;
|
||||
|
||||
// Normalize non-object tool arguments (same as non-streaming path).
|
||||
// Normalize non-object tool arguments (some models occasionally
|
||||
// return null/string/bool which Ollama rejects on the next turn).
|
||||
if let Some(ref mut tcs) = response.tool_calls {
|
||||
for tc in tcs.iter_mut() {
|
||||
if !tc.function.arguments.is_object() {
|
||||
@@ -897,7 +1206,7 @@ impl InsightChatService {
|
||||
if let Some(ref tool_calls) = response.tool_calls
|
||||
&& !tool_calls.is_empty()
|
||||
{
|
||||
for (i, tool_call) in tool_calls.iter().enumerate() {
|
||||
for tool_call in tool_calls {
|
||||
tool_calls_made += 1;
|
||||
let call_index = tool_calls_made - 1;
|
||||
let _ = tx
|
||||
@@ -913,9 +1222,9 @@ impl InsightChatService {
|
||||
.execute_tool(
|
||||
&tool_call.function.name,
|
||||
&tool_call.function.arguments,
|
||||
&ollama_client,
|
||||
&image_base64,
|
||||
&normalized,
|
||||
ollama_client,
|
||||
image_base64,
|
||||
normalized,
|
||||
&cx,
|
||||
)
|
||||
.await;
|
||||
@@ -929,7 +1238,6 @@ impl InsightChatService {
|
||||
})
|
||||
.await;
|
||||
messages.push(ChatMessage::tool_result(result));
|
||||
let _ = i; // reserved for per-call ordering if needed
|
||||
}
|
||||
continue;
|
||||
}
|
||||
@@ -938,6 +1246,8 @@ impl InsightChatService {
|
||||
break;
|
||||
}
|
||||
|
||||
// No-tools fallback: loop exhausted iterations without producing a
|
||||
// tool-free reply. Ask once more, with no tools attached.
|
||||
if final_content.is_empty() {
|
||||
messages.push(ChatMessage::user(
|
||||
"Please write your final answer now without calling any more tools.",
|
||||
@@ -970,87 +1280,33 @@ impl InsightChatService {
|
||||
messages.push(final_response);
|
||||
}
|
||||
|
||||
// Drop the per-turn iteration-budget note from the system message
|
||||
// before we persist so it doesn't snowball on each subsequent turn.
|
||||
restore_system_content(&mut messages, original_system_content);
|
||||
|
||||
// Append mode: undo the per-turn system-prompt override (mirrors
|
||||
// chat_turn). Amend mode: keep the override — it becomes the new
|
||||
// insight row's system message.
|
||||
if !req.amend {
|
||||
restore_system_prompt_override(&mut messages, override_stash);
|
||||
}
|
||||
|
||||
// Persist.
|
||||
let json = serde_json::to_string(&messages)
|
||||
.map_err(|e| anyhow!("failed to serialize chat history: {}", e))?;
|
||||
|
||||
let mut amended_insight_id: Option<i32> = None;
|
||||
if req.amend {
|
||||
let title_prompt = format!(
|
||||
"Create a short title (maximum 8 words) for the following journal entry:\n\n{}\n\n\
|
||||
Capture the key moment or theme. Return ONLY the title, nothing else.",
|
||||
final_content
|
||||
);
|
||||
let title_raw = chat_backend
|
||||
.generate(
|
||||
&title_prompt,
|
||||
Some(
|
||||
"You are my long term memory assistant. Use only the information provided. Do not invent details.",
|
||||
),
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
let title = title_raw.trim().trim_matches('"').to_string();
|
||||
|
||||
// Amended rows intentionally do not inherit the parent's
|
||||
// `fewshot_source_ids`. The parent's few-shot influence is still
|
||||
// present in this row's content; if you want strict lineage
|
||||
// tracking for training-set filtering, fetch the parent here and
|
||||
// copy its value forward.
|
||||
let new_row = InsertPhotoInsight {
|
||||
library_id: req.library_id,
|
||||
file_path: normalized.clone(),
|
||||
title,
|
||||
summary: final_content.clone(),
|
||||
generated_at: Utc::now().timestamp(),
|
||||
model_version: model_used.clone(),
|
||||
is_current: true,
|
||||
training_messages: Some(json),
|
||||
backend: effective_backend.clone(),
|
||||
fewshot_source_ids: None,
|
||||
content_hash: None,
|
||||
};
|
||||
let cx = opentelemetry::Context::new();
|
||||
let mut dao = self.insight_dao.lock().expect("Unable to lock InsightDao");
|
||||
let stored = dao
|
||||
.store_insight(&cx, new_row)
|
||||
.map_err(|e| anyhow!("failed to store amended insight: {:?}", e))?;
|
||||
amended_insight_id = Some(stored.id);
|
||||
} else {
|
||||
let cx = opentelemetry::Context::new();
|
||||
let mut dao = self.insight_dao.lock().expect("Unable to lock InsightDao");
|
||||
dao.update_training_messages(&cx, req.library_id, &normalized, &json)
|
||||
.map_err(|e| anyhow!("failed to persist chat history: {:?}", e))?;
|
||||
}
|
||||
|
||||
let _ = tx
|
||||
.send(ChatStreamEvent::Done {
|
||||
tool_calls_made,
|
||||
iterations_used,
|
||||
truncated,
|
||||
prompt_eval_count: last_prompt_eval_count,
|
||||
eval_count: last_eval_count,
|
||||
amended_insight_id,
|
||||
backend_used: effective_backend,
|
||||
model_used,
|
||||
})
|
||||
.await;
|
||||
|
||||
Ok(())
|
||||
Ok(AgenticLoopOutcome {
|
||||
tool_calls_made,
|
||||
iterations_used,
|
||||
last_prompt_eval_count,
|
||||
last_eval_count,
|
||||
final_content,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Default system prompt for bootstrap when the client didn't supply one
|
||||
/// (or supplied an empty string). Apollo's frontend always sends a persona
|
||||
/// prompt today, so this is a fallback for clients that don't.
|
||||
const BOOTSTRAP_DEFAULT_SYSTEM_PROMPT: &str = "You are a helpful AI assistant analyzing the user's photo. \
|
||||
Use the available tools to gather context and answer their questions \
|
||||
in a conversational tone.";
|
||||
|
||||
/// Outcome of one streaming agentic loop pass. Shared between bootstrap
|
||||
/// and continuation.
|
||||
struct AgenticLoopOutcome {
|
||||
tool_calls_made: usize,
|
||||
iterations_used: usize,
|
||||
last_prompt_eval_count: Option<i32>,
|
||||
last_eval_count: Option<i32>,
|
||||
final_content: String,
|
||||
}
|
||||
|
||||
/// Events emitted by `chat_turn_stream`. One stream per turn; ends after
|
||||
/// `Done` or `Error`.
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -1083,8 +1339,23 @@ pub enum ChatStreamEvent {
|
||||
tool_calls_made: usize,
|
||||
iterations_used: usize,
|
||||
truncated: bool,
|
||||
prompt_eval_count: Option<i32>,
|
||||
eval_count: Option<i32>,
|
||||
/// Renamed from `prompt_eval_count` to match the wire-name Apollo's
|
||||
/// frontend (and the mobile client) consume on the `done` SSE
|
||||
/// payload.
|
||||
prompt_tokens: Option<i32>,
|
||||
/// Renamed from `eval_count`. Same rationale.
|
||||
eval_tokens: Option<i32>,
|
||||
/// The configured context-window ceiling that ran this turn — echoes
|
||||
/// the request's `num_ctx` (or the server default when none was
|
||||
/// supplied). Lets clients render `prompt_tokens / num_ctx` without
|
||||
/// remembering what they asked for.
|
||||
num_ctx: Option<i32>,
|
||||
/// Populated when this turn produced a NEW insight row — bootstrap
|
||||
/// (no prior insight), regenerate (old row flipped to is_current=
|
||||
/// false), or amend. Null on append. Clients should target this id
|
||||
/// for any subsequent operation that previously needed a known
|
||||
/// insight (e.g., a follow-up `/insights/chat` against the just-
|
||||
/// created row).
|
||||
amended_insight_id: Option<i32>,
|
||||
backend_used: String,
|
||||
model_used: String,
|
||||
|
||||
Reference in New Issue
Block a user