Merge pull request 'feature/streaming-insights' (#85) from feature/streaming-insights into master

Reviewed-on: #85
This commit was merged in pull request #85.
This commit is contained in:
2026-05-09 20:57:16 +00:00
3 changed files with 745 additions and 160 deletions

View File

@@ -641,11 +641,18 @@ pub struct ChatTurnHttpRequest {
#[serde(default)]
pub max_iterations: Option<usize>,
/// Per-turn system-prompt override. Ephemeral in append mode,
/// persisted in amend mode. See ChatTurnRequest for semantics.
/// persisted in amend / regenerate mode. See ChatTurnRequest for
/// semantics. Also seeds the bootstrap path when no insight exists.
#[serde(default)]
pub system_prompt: Option<String>,
#[serde(default)]
pub amend: bool,
/// When true, force the bootstrap path even if an insight already
/// exists: flip the existing row(s) to `is_current=false` and create
/// a new insight row from this turn. Takes precedence over `amend`.
/// Collapses to a normal bootstrap when no insight exists.
#[serde(default)]
pub regenerate: bool,
}
#[derive(Debug, Serialize)]
@@ -701,6 +708,7 @@ pub async fn chat_turn_handler(
max_iterations: request.max_iterations,
system_prompt: request.system_prompt.clone(),
amend: request.amend,
regenerate: request.regenerate,
};
match app_state.insight_chat.chat_turn(chat_req).await {
@@ -916,6 +924,7 @@ pub async fn chat_stream_handler(
max_iterations: request.max_iterations,
system_prompt: request.system_prompt.clone(),
amend: request.amend,
regenerate: request.regenerate,
};
let service = app_state.insight_chat.clone();
@@ -967,8 +976,9 @@ fn render_sse_frame(ev: &ChatStreamEvent) -> String {
tool_calls_made,
iterations_used,
truncated,
prompt_eval_count,
eval_count,
prompt_tokens,
eval_tokens,
num_ctx,
amended_insight_id,
backend_used,
model_used,
@@ -978,14 +988,20 @@ fn render_sse_frame(ev: &ChatStreamEvent) -> String {
"tool_calls_made": tool_calls_made,
"iterations_used": iterations_used,
"truncated": truncated,
"prompt_eval_count": prompt_eval_count,
"eval_count": eval_count,
"prompt_tokens": prompt_tokens,
"eval_tokens": eval_tokens,
"num_ctx": num_ctx,
"amended_insight_id": amended_insight_id,
"backend": backend_used,
"model": model_used,
}),
),
ChatStreamEvent::Error(msg) => ("error", serde_json::json!({ "message": msg })),
// Apollo's frontend SSE consumer (and its free-chat backend, which
// is the de-facto convention) listens for `error_message`. Emitting
// `error` here meant any failure on the photo-chat path (e.g.
// "no insight found for path") was silently dropped, leaving an
// empty assistant bubble with no clue why the turn died.
ChatStreamEvent::Error(msg) => ("error_message", serde_json::json!({ "message": msg })),
};
let data = serde_json::to_string(&payload).unwrap_or_else(|_| "{}".to_string());
format!("event: {}\ndata: {}\n\n", event_name, data)

View File

@@ -7,7 +7,7 @@ use std::sync::{Arc, Mutex};
use tokio::sync::Mutex as TokioMutex;
use crate::ai::insight_generator::InsightGenerator;
use crate::ai::llm_client::{ChatMessage, LlmClient, LlmStreamEvent};
use crate::ai::llm_client::{ChatMessage, LlmClient, LlmStreamEvent, Tool};
use crate::ai::ollama::OllamaClient;
use crate::ai::openrouter::OpenRouterClient;
use crate::database::InsightDao;
@@ -53,6 +53,12 @@ pub struct ChatTurnRequest {
/// When true, write a new insight row (regenerating title) instead of
/// updating training_messages on the existing row.
pub amend: bool,
/// When true, force the bootstrap path even if an insight already exists:
/// flip prior rows to `is_current=false` and create a new insight row
/// from `system_prompt` + `user_message` + photo. Takes precedence over
/// `amend`. With no existing insight, collapses to a normal bootstrap
/// (the row-flip step is a no-op).
pub regenerate: bool,
}
#[derive(Debug)]
@@ -701,24 +707,44 @@ impl InsightChatService {
};
let _guard = entry_lock.lock().await;
let insight = {
// Look up existing insight (None when missing). The branch below
// decides bootstrap vs. continuation: `regenerate=true` forces
// bootstrap regardless of the row's presence; missing rows always
// bootstrap.
let existing_insight = {
let cx = opentelemetry::Context::new();
let mut dao = self.insight_dao.lock().expect("Unable to lock InsightDao");
dao.get_insight(&cx, &normalized)
.map_err(|e| anyhow!("failed to load insight: {:?}", e))?
.ok_or_else(|| anyhow!("no insight found for path"))?
};
let raw_history = insight
.training_messages
.as_ref()
.ok_or_else(|| {
anyhow!("insight has no chat history; regenerate this insight in agentic mode")
})?
.clone();
let mut messages: Vec<ChatMessage> = serde_json::from_str(&raw_history)
if req.regenerate || existing_insight.is_none() {
return self.run_bootstrap_streaming(req, normalized, tx).await;
}
let insight = existing_insight.expect("just checked Some above");
self.run_continuation_streaming(req, normalized, insight, tx)
.await
}
/// Continuation path: photo has an existing agentic insight with
/// `training_messages` populated. Replay the transcript, append a new
/// turn, run the agentic loop, persist (UPDATE for append; INSERT new
/// row for amend).
async fn run_continuation_streaming(
&self,
req: ChatTurnRequest,
normalized: String,
insight: crate::database::models::PhotoInsight,
tx: tokio::sync::mpsc::Sender<ChatStreamEvent>,
) -> Result<()> {
let raw_history = insight.training_messages.as_ref().ok_or_else(|| {
anyhow!("insight has no chat history; regenerate this insight in agentic mode")
})?;
let mut messages: Vec<ChatMessage> = serde_json::from_str(raw_history)
.map_err(|e| anyhow!("failed to deserialize chat history: {}", e))?;
// Backend selection — same rules as non-streaming chat_turn.
// Backend selection — defer to stored insight's backend unless the
// request supplies an override.
let stored_backend = insight.backend.clone();
let effective_backend = req
.backend
@@ -752,59 +778,14 @@ impl InsightChatService {
.or_else(|| Some(stored_model.clone()))
.filter(|m| !m.is_empty());
let mut ollama_client = self.ollama.clone();
let mut openrouter_client: Option<OpenRouterClient> = None;
if is_hybrid {
let arc = self.openrouter.as_ref().ok_or_else(|| {
anyhow!("hybrid backend unavailable: OPENROUTER_API_KEY not configured")
})?;
let mut c: OpenRouterClient = (**arc).clone();
if let Some(ref m) = custom_model {
c.primary_model = m.clone();
}
if req.temperature.is_some()
|| req.top_p.is_some()
|| req.top_k.is_some()
|| req.min_p.is_some()
{
c.set_sampling_params(req.temperature, req.top_p, req.top_k, req.min_p);
}
if let Some(ctx) = req.num_ctx {
c.set_num_ctx(Some(ctx));
}
openrouter_client = Some(c);
} else {
if let Some(ref m) = custom_model
&& m != &self.ollama.primary_model
{
ollama_client = OllamaClient::new(
self.ollama.primary_url.clone(),
self.ollama.fallback_url.clone(),
m.clone(),
Some(m.clone()),
);
}
if req.temperature.is_some()
|| req.top_p.is_some()
|| req.top_k.is_some()
|| req.min_p.is_some()
{
ollama_client.set_sampling_params(req.temperature, req.top_p, req.top_k, req.min_p);
}
if let Some(ctx) = req.num_ctx {
ollama_client.set_num_ctx(Some(ctx));
}
}
let chat_backend: &dyn LlmClient = if let Some(ref c) = openrouter_client {
c
} else {
&ollama_client
};
let (chat_backend_holder, ollama_client) =
self.build_chat_clients(is_hybrid, custom_model.as_deref(), &req)?;
let chat_backend: &dyn LlmClient = chat_backend_holder.as_ref();
let model_used = chat_backend.primary_model().to_string();
// Tool set.
// Tool set — local mode + first user turn carries an image →
// offer describe_photo. Hybrid: visual description was inlined
// when the insight was bootstrapped, no describe tool needed.
let local_first_user_has_image = messages
.iter()
.find(|m| m.role == "user")
@@ -812,10 +793,6 @@ impl InsightChatService {
.map(|imgs| !imgs.is_empty())
.unwrap_or(false);
let offer_describe_tool = !is_hybrid && local_first_user_has_image;
// current_gate_opts(has_vision) sets gate_opts.has_vision = has_vision
// and probes the per-table presence flags. Pass `offer_describe_tool`
// directly — the `!is_hybrid && local_first_user_has_image` decision
// is the chat-path's vision predicate.
let gate_opts = self.generator.current_gate_opts(offer_describe_tool);
let tools = InsightGenerator::build_tool_definitions(gate_opts);
@@ -839,9 +816,350 @@ impl InsightChatService {
// Mirror chat_turn: per-turn override goes on first, budget note next.
let override_stash =
apply_system_prompt_override(&mut messages, req.system_prompt.as_deref());
let original_system_content = annotate_system_with_budget(&mut messages, max_iterations);
let outcome = self
.run_streaming_agentic_loop(
chat_backend,
&ollama_client,
&mut messages,
tools,
&image_base64,
&normalized,
max_iterations,
&tx,
)
.await?;
let AgenticLoopOutcome {
tool_calls_made,
iterations_used,
last_prompt_eval_count,
last_eval_count,
final_content,
} = outcome;
// Drop the per-turn iteration-budget note before persisting so it
// doesn't snowball on subsequent turns.
restore_system_content(&mut messages, original_system_content);
// Append mode: undo the per-turn system-prompt override.
// Amend mode: keep it — it becomes the new row's system message.
if !req.amend {
restore_system_prompt_override(&mut messages, override_stash);
}
let json = serde_json::to_string(&messages)
.map_err(|e| anyhow!("failed to serialize chat history: {}", e))?;
let mut amended_insight_id: Option<i32> = None;
if req.amend {
let title = self.generate_title(chat_backend, &final_content).await?;
// Amended rows intentionally do not inherit the parent's
// `fewshot_source_ids`. The parent's few-shot influence is still
// present in this row's content; if you want strict lineage
// tracking for training-set filtering, fetch the parent here and
// copy its value forward.
let new_row = InsertPhotoInsight {
library_id: req.library_id,
file_path: normalized.clone(),
title,
summary: final_content.clone(),
generated_at: Utc::now().timestamp(),
model_version: model_used.clone(),
is_current: true,
training_messages: Some(json),
backend: effective_backend.clone(),
fewshot_source_ids: None,
content_hash: None,
};
let cx = opentelemetry::Context::new();
let mut dao = self.insight_dao.lock().expect("Unable to lock InsightDao");
let stored = dao
.store_insight(&cx, new_row)
.map_err(|e| anyhow!("failed to store amended insight: {:?}", e))?;
amended_insight_id = Some(stored.id);
} else {
let cx = opentelemetry::Context::new();
let mut dao = self.insight_dao.lock().expect("Unable to lock InsightDao");
dao.update_training_messages(&cx, req.library_id, &normalized, &json)
.map_err(|e| anyhow!("failed to persist chat history: {:?}", e))?;
}
let _ = tx
.send(ChatStreamEvent::Done {
tool_calls_made,
iterations_used,
truncated,
prompt_tokens: last_prompt_eval_count,
eval_tokens: last_eval_count,
num_ctx: req.num_ctx,
amended_insight_id,
backend_used: effective_backend,
model_used,
})
.await;
Ok(())
}
/// Bootstrap path: no insight row yet (or `regenerate=true`). Build a
/// fresh transcript from `req.system_prompt` + `req.user_message` +
/// the photo, run the agentic loop, generate a title, and INSERT a
/// new insight row. `store_insight` flips any prior rows for the same
/// `(library_id, file_path)` to `is_current=false` — that's how
/// `regenerate` shadows the previous insight.
///
/// Emits streaming events on `tx` and finishes with a `Done` frame
/// whose `amended_insight_id` carries the freshly inserted row id.
/// Errors from backend resolution, client setup, the agentic loop,
/// titling, serialization, or the INSERT propagate to the caller.
async fn run_bootstrap_streaming(
    &self,
    req: ChatTurnRequest,
    normalized: String,
    tx: tokio::sync::mpsc::Sender<ChatStreamEvent>,
) -> Result<()> {
    // Bootstrap has no stored row to defer to, so the backend label
    // comes straight from the request (default "local").
    let effective_backend = resolve_bootstrap_backend(req.backend.as_deref())?;
    let is_hybrid = effective_backend == "hybrid";
    // Iteration budget: request value clamped into [1, env ceiling].
    let max_iterations = req
        .max_iterations
        .unwrap_or(DEFAULT_MAX_ITERATIONS)
        .clamp(1, env_max_iterations());
    let custom_model = req.model.clone().filter(|m| !m.is_empty());
    let (chat_backend_holder, ollama_client) =
        self.build_chat_clients(is_hybrid, custom_model.as_deref(), &req)?;
    let chat_backend: &dyn LlmClient = chat_backend_holder.as_ref();
    let model_used = chat_backend.primary_model().to_string();
    // Load image bytes once. RAW preview fallback is handled inside
    // load_image_as_base64. Errors degrade silently — a chat that
    // discusses metadata-only is still useful.
    let image_base64: Option<String> = self.generator.load_image_as_base64(&normalized).ok();
    // EXIF lookup once — date_taken and GPS go into the photo
    // context block in the system message. Without these the model
    // hallucinates dates / GPS-keyed tool args (`get_sms_messages`
    // would otherwise default to today's date and miss every
    // historical photo).
    let exif = self.generator.fetch_exif(&normalized);
    let date_taken_str = resolve_date_taken_for_context(&exif, &normalized);
    let gps = exif
        .as_ref()
        .and_then(|e| match (e.gps_latitude, e.gps_longitude) {
            (Some(lat), Some(lon)) => Some((lat as f64, lon as f64)),
            _ => None,
        });
    // Hybrid backend: pre-describe the image via local Ollama vision
    // so OpenRouter chat models (which can't see images directly) get
    // the visual description as text. Mirrors the same pre-describe
    // pass that `generate_agentic_insight_for_photo` does for hybrid.
    // A describe failure is logged and degrades to no visual block.
    let visual_block = if is_hybrid {
        match image_base64.as_deref() {
            Some(b64) => match self.ollama.describe_image(b64).await {
                Ok(desc) => {
                    format!("Visual description (from local vision model):\n{}\n", desc)
                }
                Err(e) => {
                    log::warn!("hybrid bootstrap: local describe_image failed: {}", e);
                    String::new()
                }
            },
            None => String::new(),
        }
    } else {
        String::new()
    };
    // Tool gates. Local + image present → expose describe_photo so
    // the chat model can re-look at the photo on demand. Hybrid:
    // already inlined, no tool needed.
    let offer_describe_tool = !is_hybrid && image_base64.is_some();
    let gate_opts = self.generator.current_gate_opts(offer_describe_tool);
    let tools = InsightGenerator::build_tool_definitions(gate_opts);
    // System message = persona + photo context block. Photo context
    // is in the system message — not the user turn — so the user's
    // bubble in the rendered transcript shows only what they typed.
    // Several agentic tools (recall_facts_for_photo, get_file_tags,
    // get_faces_in_photo, etc.) take a `file_path` arg the model
    // can't know without being told. `Date taken:` and `GPS:` give
    // get_sms_messages / reverse_geocode / get_personal_place_at
    // the args they need. In hybrid mode the visual description
    // belongs here for the same reason.
    let persona = resolve_bootstrap_system_prompt(req.system_prompt.as_deref());
    let system_content = build_bootstrap_system_message(
        &persona,
        &normalized,
        date_taken_str.as_deref(),
        gps,
        &visual_block,
    );
    let system_msg = ChatMessage::system(system_content);
    let mut user_msg = ChatMessage::user(req.user_message.clone());
    // Local mode: attach the image to the first user turn so the
    // vision model can see it directly.
    if !is_hybrid && let Some(ref img) = image_base64 {
        user_msg.images = Some(vec![img.clone()]);
    }
    let mut messages = vec![system_msg, user_msg];
    let outcome = self
        .run_streaming_agentic_loop(
            chat_backend,
            &ollama_client,
            &mut messages,
            tools,
            &image_base64,
            &normalized,
            max_iterations,
            &tx,
        )
        .await?;
    let AgenticLoopOutcome {
        tool_calls_made,
        iterations_used,
        last_prompt_eval_count,
        last_eval_count,
        final_content,
    } = outcome;
    let title = self.generate_title(chat_backend, &final_content).await?;
    let json = serde_json::to_string(&messages)
        .map_err(|e| anyhow!("failed to serialize chat history: {}", e))?;
    let new_row = InsertPhotoInsight {
        library_id: req.library_id,
        file_path: normalized.clone(),
        title,
        summary: final_content,
        generated_at: Utc::now().timestamp(),
        model_version: model_used.clone(),
        is_current: true,
        training_messages: Some(json),
        backend: effective_backend.clone(),
        fewshot_source_ids: None,
        content_hash: None,
    };
    let stored = {
        let cx = opentelemetry::Context::new();
        let mut dao = self.insight_dao.lock().expect("Unable to lock InsightDao");
        dao.store_insight(&cx, new_row)
            .map_err(|e| anyhow!("failed to store bootstrap insight: {:?}", e))?
    };
    // amended_insight_id semantics broaden on bootstrap/regenerate:
    // populated whenever this turn produced a new insight row, so
    // clients don't need a separate field to learn the new id.
    //
    // NOTE(review): `truncated` is hard-coded false here, while the
    // continuation path reports a real value. If the agentic loop can
    // exhaust its budget during bootstrap too, consider deriving this
    // from the loop outcome — confirm against run_streaming_agentic_loop.
    let _ = tx
        .send(ChatStreamEvent::Done {
            tool_calls_made,
            iterations_used,
            truncated: false,
            prompt_tokens: last_prompt_eval_count,
            eval_tokens: last_eval_count,
            num_ctx: req.num_ctx,
            amended_insight_id: Some(stored.id),
            backend_used: effective_backend,
            model_used,
        })
        .await;
    Ok(())
}
/// Set up the chat clients (Ollama + optional OpenRouter) shared by
/// the bootstrap and continuation paths.
///
/// Returns the chat-side backend (boxed, because hybrid and local use
/// different concrete client types) together with the Ollama client
/// used for describe-image and local tool calls.
///
/// Errors when `is_hybrid` is requested but no OpenRouter client was
/// configured (missing OPENROUTER_API_KEY).
fn build_chat_clients(
    &self,
    is_hybrid: bool,
    custom_model: Option<&str>,
    req: &ChatTurnRequest,
) -> Result<(Box<dyn LlmClient>, OllamaClient)> {
    // Did the request override any sampling knob? Computed once and
    // reused by both branches.
    let sampling_overridden = req.temperature.is_some()
        || req.top_p.is_some()
        || req.top_k.is_some()
        || req.min_p.is_some();

    if is_hybrid {
        // Hybrid: chat flows through OpenRouter. The plain Ollama
        // clone is still handed back for describe-image / local tool
        // calls and deliberately receives none of the overrides.
        let openrouter_arc = self.openrouter.as_ref().ok_or_else(|| {
            anyhow!("hybrid backend unavailable: OPENROUTER_API_KEY not configured")
        })?;
        let mut router: OpenRouterClient = (**openrouter_arc).clone();
        if let Some(model) = custom_model {
            router.primary_model = model.to_string();
        }
        if sampling_overridden {
            router.set_sampling_params(req.temperature, req.top_p, req.top_k, req.min_p);
        }
        if let Some(ctx) = req.num_ctx {
            router.set_num_ctx(Some(ctx));
        }
        return Ok((Box::new(router), self.ollama.clone()));
    }

    // Local: possibly rebuild the Ollama client around a custom model,
    // then apply the per-request overrides to it.
    let mut local = self.ollama.clone();
    if let Some(model) = custom_model
        && model != self.ollama.primary_model
    {
        local = OllamaClient::new(
            self.ollama.primary_url.clone(),
            self.ollama.fallback_url.clone(),
            model.to_string(),
            Some(model.to_string()),
        );
    }
    if sampling_overridden {
        local.set_sampling_params(req.temperature, req.top_p, req.top_k, req.min_p);
    }
    if let Some(ctx) = req.num_ctx {
        local.set_num_ctx(Some(ctx));
    }
    // Same configured client on both sides: chat backend and tool-call
    // client are one and the same in local mode.
    Ok((Box::new(local.clone()), local))
}
/// Produce a short title for a finished entry using the same chat
/// backend that wrote the body, so the title's voice matches.
/// Mirrors the titling pass in generate_agentic_insight_for_photo.
///
/// The backend's reply is trimmed and stripped of surrounding quotes
/// before being returned. Backend errors propagate to the caller.
async fn generate_title(
    &self,
    chat_backend: &dyn LlmClient,
    final_content: &str,
) -> Result<String> {
    let prompt = format!(
        "Create a short title (maximum 8 words) for the following journal entry:\n\n{}\n\n\
         Capture the key moment or theme. Return ONLY the title, nothing else.",
        final_content
    );
    let system =
        "You are my long term memory assistant. Use only the information provided. Do not invent details.";
    let reply = chat_backend.generate(&prompt, Some(system), None).await?;
    // Models often wrap titles in quotes — drop them along with edge whitespace.
    Ok(reply.trim().trim_matches('"').to_string())
}
/// Drive the agentic loop with streaming SSE events. Shared between
/// bootstrap and continuation. Mutates `messages` in place (response
/// turns + tool results are appended) and returns counters + the
/// final assistant content.
async fn run_streaming_agentic_loop(
&self,
chat_backend: &dyn LlmClient,
ollama_client: &OllamaClient,
messages: &mut Vec<ChatMessage>,
tools: Vec<Tool>,
image_base64: &Option<String>,
normalized: &str,
max_iterations: usize,
tx: &tokio::sync::mpsc::Sender<ChatStreamEvent>,
) -> Result<AgenticLoopOutcome> {
let mut tool_calls_made = 0usize;
let mut iterations_used = 0usize;
let mut last_prompt_eval_count: Option<i32> = None;
@@ -883,7 +1201,8 @@ impl InsightChatService {
let mut response =
final_message.ok_or_else(|| anyhow!("stream ended without a Done event"))?;
// Normalize non-object tool arguments (same as non-streaming path).
// Normalize non-object tool arguments (some models occasionally
// return null/string/bool which Ollama rejects on the next turn).
if let Some(ref mut tcs) = response.tool_calls {
for tc in tcs.iter_mut() {
if !tc.function.arguments.is_object() {
@@ -897,7 +1216,7 @@ impl InsightChatService {
if let Some(ref tool_calls) = response.tool_calls
&& !tool_calls.is_empty()
{
for (i, tool_call) in tool_calls.iter().enumerate() {
for tool_call in tool_calls {
tool_calls_made += 1;
let call_index = tool_calls_made - 1;
let _ = tx
@@ -913,9 +1232,9 @@ impl InsightChatService {
.execute_tool(
&tool_call.function.name,
&tool_call.function.arguments,
&ollama_client,
&image_base64,
&normalized,
ollama_client,
image_base64,
normalized,
&cx,
)
.await;
@@ -929,7 +1248,6 @@ impl InsightChatService {
})
.await;
messages.push(ChatMessage::tool_result(result));
let _ = i; // reserved for per-call ordering if needed
}
continue;
}
@@ -938,7 +1256,16 @@ impl InsightChatService {
break;
}
// No-tools fallback: loop exhausted iterations without producing a
// tool-free reply. Ask once more, with no tools attached.
if final_content.is_empty() {
// Index of the synthetic "please write your final answer"
// user message — we strip it from history after the model
// responds, so it never appears in the rendered transcript
// and load_history's user-turn handler doesn't reset
// pending_tools at this position (wiping the prior tool
// calls from the final assistant render).
let synthetic_idx = messages.len();
messages.push(ChatMessage::user(
"Please write your final answer now without calling any more tools.",
));
@@ -968,89 +1295,120 @@ impl InsightChatService {
final_message.ok_or_else(|| anyhow!("final stream ended without a Done event"))?;
final_content = final_response.content.clone();
messages.push(final_response);
// Drop the synthetic prompt — internal scaffolding only. The
// model's final_response (now at the end) was generated with
// it in context and reads coherently without it on replay.
messages.remove(synthetic_idx);
}
// Drop the per-turn iteration-budget note from the system message
// before we persist so it doesn't snowball on each subsequent turn.
restore_system_content(&mut messages, original_system_content);
// Append mode: undo the per-turn system-prompt override (mirrors
// chat_turn). Amend mode: keep the override — it becomes the new
// insight row's system message.
if !req.amend {
restore_system_prompt_override(&mut messages, override_stash);
}
// Persist.
let json = serde_json::to_string(&messages)
.map_err(|e| anyhow!("failed to serialize chat history: {}", e))?;
let mut amended_insight_id: Option<i32> = None;
if req.amend {
let title_prompt = format!(
"Create a short title (maximum 8 words) for the following journal entry:\n\n{}\n\n\
Capture the key moment or theme. Return ONLY the title, nothing else.",
final_content
);
let title_raw = chat_backend
.generate(
&title_prompt,
Some(
"You are my long term memory assistant. Use only the information provided. Do not invent details.",
),
None,
)
.await?;
let title = title_raw.trim().trim_matches('"').to_string();
// Amended rows intentionally do not inherit the parent's
// `fewshot_source_ids`. The parent's few-shot influence is still
// present in this row's content; if you want strict lineage
// tracking for training-set filtering, fetch the parent here and
// copy its value forward.
let new_row = InsertPhotoInsight {
library_id: req.library_id,
file_path: normalized.clone(),
title,
summary: final_content.clone(),
generated_at: Utc::now().timestamp(),
model_version: model_used.clone(),
is_current: true,
training_messages: Some(json),
backend: effective_backend.clone(),
fewshot_source_ids: None,
content_hash: None,
};
let cx = opentelemetry::Context::new();
let mut dao = self.insight_dao.lock().expect("Unable to lock InsightDao");
let stored = dao
.store_insight(&cx, new_row)
.map_err(|e| anyhow!("failed to store amended insight: {:?}", e))?;
amended_insight_id = Some(stored.id);
} else {
let cx = opentelemetry::Context::new();
let mut dao = self.insight_dao.lock().expect("Unable to lock InsightDao");
dao.update_training_messages(&cx, req.library_id, &normalized, &json)
.map_err(|e| anyhow!("failed to persist chat history: {:?}", e))?;
}
let _ = tx
.send(ChatStreamEvent::Done {
tool_calls_made,
iterations_used,
truncated,
prompt_eval_count: last_prompt_eval_count,
eval_count: last_eval_count,
amended_insight_id,
backend_used: effective_backend,
model_used,
})
.await;
Ok(())
Ok(AgenticLoopOutcome {
tool_calls_made,
iterations_used,
last_prompt_eval_count,
last_eval_count,
final_content,
})
}
}
/// Default system prompt for bootstrap when the client didn't supply one
/// (or supplied an empty string). Apollo's frontend always sends a persona
/// prompt today, so this is a fallback for clients that don't.
const BOOTSTRAP_DEFAULT_SYSTEM_PROMPT: &str = "You are a helpful AI assistant analyzing the user's photo. \
Use the available tools to gather context and answer their questions \
in a conversational tone.";

/// Choose the bootstrap system prompt: a supplied value that is
/// non-empty after trimming wins; anything else falls back to
/// [`BOOTSTRAP_DEFAULT_SYSTEM_PROMPT`]. Returns an owned `String`
/// because the bootstrap caller persists it on the new insight row.
fn resolve_bootstrap_system_prompt(supplied: Option<&str>) -> String {
    match supplied.map(str::trim) {
        Some(trimmed) if !trimmed.is_empty() => trimmed.to_string(),
        _ => BOOTSTRAP_DEFAULT_SYSTEM_PROMPT.to_string(),
    }
}
/// Compose the bootstrap system message: the persona on top, followed
/// by a photo-context block carrying the file path, date taken (when
/// known, otherwise "unknown"), GPS (only when present), and — in
/// hybrid mode — the local-vision visual description. Lives in the
/// system message rather than the user turn so the rendered transcript
/// shows only what the user typed.
fn build_bootstrap_system_message(
    persona: &str,
    normalized_path: &str,
    date_taken: Option<&str>,
    gps: Option<(f64, f64)>,
    visual_block: &str,
) -> String {
    // Persona (trailing whitespace stripped) + fixed header + the two
    // always-present context lines in a single formatting pass.
    let mut message = format!(
        "{}\n\n--- PHOTO CONTEXT ---\nPhoto file path: {}\nDate taken: {}\n",
        persona.trim_end(),
        normalized_path,
        date_taken.unwrap_or("unknown"),
    );
    if let Some((lat, lon)) = gps {
        // Four decimal places ≈ 11 m of precision — plenty for any
        // place-lookup tool, and keeps the prompt short.
        message.push_str(&format!("GPS: {:.4}, {:.4}\n", lat, lon));
    }
    // visual_block already ends with a newline when non-empty, and
    // appending the empty string is a no-op — no separator needed.
    message.push_str(visual_block);
    message
}
/// Resolve a human-readable `YYYY-MM-DD` date string for the photo
/// context block. Waterfall: EXIF `date_taken` → filename pattern →
/// `None`. The fs-time fallback used by
/// `generate_agentic_insight_for_photo` is deliberately NOT applied
/// here — for chat we'd rather show "unknown" than present a
/// misleading inode mtime as the photo's date.
fn resolve_date_taken_for_context(
    exif: &Option<crate::database::models::ImageExif>,
    file_path: &str,
) -> Option<String> {
    // EXIF first; only when it yields nothing do we try the filename.
    exif.as_ref()
        .and_then(|e| e.date_taken)
        .and_then(|ts| chrono::DateTime::from_timestamp(ts, 0))
        .map(|dt| dt.format("%Y-%m-%d").to_string())
        .or_else(|| {
            crate::memories::extract_date_from_filename(file_path)
                .map(|dt| dt.format("%Y-%m-%d").to_string())
        })
}
/// Pick the backend label for bootstrap. Bootstrap has no stored
/// insight to defer to (that's continuation's behaviour), so the
/// default is `"local"`. Input is trimmed and lowercased; an empty or
/// absent value takes the default. A non-empty label that isn't one of
/// the recognised values is an error — same surface as continuation's
/// validation.
fn resolve_bootstrap_backend(supplied: Option<&str>) -> Result<String> {
    let label = supplied
        .map(|s| s.trim().to_lowercase())
        .filter(|s| !s.is_empty())
        .unwrap_or_else(|| "local".to_string());
    match label.as_str() {
        "local" | "hybrid" => Ok(label),
        _ => bail!("unknown backend '{}'; expected 'local' or 'hybrid'", label),
    }
}
/// Outcome of one streaming agentic loop pass. Shared between bootstrap
/// and continuation.
struct AgenticLoopOutcome {
    /// Total tool invocations made across all loop iterations.
    tool_calls_made: usize,
    /// Loop iterations actually consumed.
    iterations_used: usize,
    /// Prompt-token counter as last reported by the stream; `None`
    /// when the backend didn't supply one.
    last_prompt_eval_count: Option<i32>,
    /// Completion-token counter as last reported by the stream; `None`
    /// when the backend didn't supply one.
    last_eval_count: Option<i32>,
    /// The final assistant reply text for this turn.
    final_content: String,
}
/// Events emitted by `chat_turn_stream`. One stream per turn; ends after
/// `Done` or `Error`.
#[derive(Debug, Clone)]
@@ -1083,8 +1441,23 @@ pub enum ChatStreamEvent {
tool_calls_made: usize,
iterations_used: usize,
truncated: bool,
prompt_eval_count: Option<i32>,
eval_count: Option<i32>,
/// Renamed from `prompt_eval_count` to match the wire-name Apollo's
/// frontend (and the mobile client) consume on the `done` SSE
/// payload.
prompt_tokens: Option<i32>,
/// Renamed from `eval_count`. Same rationale.
eval_tokens: Option<i32>,
/// The configured context-window ceiling that ran this turn — echoes
/// the request's `num_ctx` (or the server default when none was
/// supplied). Lets clients render `prompt_tokens / num_ctx` without
/// remembering what they asked for.
num_ctx: Option<i32>,
/// Populated when this turn produced a NEW insight row — bootstrap
/// (no prior insight), regenerate (old row flipped to is_current=
/// false), or amend. Null on append. Clients should target this id
/// for any subsequent operation that previously needed a known
/// insight (e.g., a follow-up `/insights/chat` against the just-
/// created row).
amended_insight_id: Option<i32>,
backend_used: String,
model_used: String,
@@ -1569,4 +1942,189 @@ mod tests {
assert_eq!(msgs.len(), 1);
assert_eq!(msgs[0].role, "user");
}
// ── Bootstrap prompt / backend resolution ─────────────────────────
// Resolution helpers run on every bootstrap turn — they pick the
// persisted system prompt and the chosen backend label. Bugs here
// would silently swap the persona or miscategorise a backend.
#[test]
fn bootstrap_system_prompt_falls_back_to_default_for_none() {
    assert_eq!(
        resolve_bootstrap_system_prompt(None),
        BOOTSTRAP_DEFAULT_SYSTEM_PROMPT
    );
}

#[test]
fn bootstrap_system_prompt_falls_back_to_default_for_empty_string() {
    // Apollo currently sends `''` when no persona is selected.
    assert_eq!(
        resolve_bootstrap_system_prompt(Some("")),
        BOOTSTRAP_DEFAULT_SYSTEM_PROMPT
    );
}

#[test]
fn bootstrap_system_prompt_falls_back_to_default_for_whitespace() {
    assert_eq!(
        resolve_bootstrap_system_prompt(Some(" \n\t ")),
        BOOTSTRAP_DEFAULT_SYSTEM_PROMPT
    );
}

#[test]
fn bootstrap_system_prompt_uses_supplied_when_non_empty() {
    assert_eq!(
        resolve_bootstrap_system_prompt(Some("you are a journal")),
        "you are a journal"
    );
}

#[test]
fn bootstrap_system_prompt_does_not_strip_inner_whitespace() {
    // Only edge-trim happens — interior newlines and spacing (which
    // Apollo's persona uses for tool listings) must survive intact.
    let persona = "line one\nline two\n  bullet";
    assert_eq!(resolve_bootstrap_system_prompt(Some(persona)), persona);
}
#[test]
fn bootstrap_backend_defaults_to_local_when_none() {
    assert_eq!(resolve_bootstrap_backend(None).unwrap(), "local");
}

#[test]
fn bootstrap_backend_defaults_to_local_when_empty() {
    assert_eq!(resolve_bootstrap_backend(Some("")).unwrap(), "local");
}

#[test]
fn bootstrap_backend_accepts_local_and_hybrid_case_insensitively() {
    // Labels are trimmed and lowercased before matching.
    let cases = [("LOCAL", "local"), ("Hybrid", "hybrid"), (" local ", "local")];
    for (input, expected) in cases {
        assert_eq!(resolve_bootstrap_backend(Some(input)).unwrap(), expected);
    }
}

#[test]
fn bootstrap_backend_rejects_unknown_label() {
    let msg = resolve_bootstrap_backend(Some("openrouter"))
        .unwrap_err()
        .to_string();
    assert!(msg.contains("unknown backend"));
    assert!(msg.contains("openrouter"));
}
#[test]
fn bootstrap_system_message_includes_path_and_persona() {
    let msg = build_bootstrap_system_message("you are helpful", "pics/IMG.jpg", None, None, "");
    assert!(msg.starts_with("you are helpful"));
    assert!(msg.contains("--- PHOTO CONTEXT ---"));
    assert!(msg.contains("Photo file path: pics/IMG.jpg"));
    // No date supplied → "unknown" so the model doesn't guess.
    assert!(msg.contains("Date taken: unknown"));
    assert!(!msg.contains("GPS:"));
    assert!(!msg.contains("Visual description"));
}

#[test]
fn bootstrap_system_message_includes_date_when_supplied() {
    let msg =
        build_bootstrap_system_message("voice", "pics/IMG.jpg", Some("2014-11-08"), None, "");
    assert!(msg.contains("Date taken: 2014-11-08"));
    assert!(!msg.contains("Date taken: unknown"));
}

#[test]
fn bootstrap_system_message_includes_gps_when_present() {
    let msg = build_bootstrap_system_message(
        "voice",
        "p.jpg",
        Some("2020-01-01"),
        Some((42.36123, -71.05789)),
        "",
    );
    // Four decimals — enough for place lookup, short enough to not
    // bloat the system prompt.
    assert!(msg.contains("GPS: 42.3612, -71.0579"));
}

#[test]
fn bootstrap_system_message_omits_gps_when_none() {
    let msg = build_bootstrap_system_message("voice", "p.jpg", Some("2020-01-01"), None, "");
    assert!(!msg.contains("GPS:"));
}

#[test]
fn bootstrap_system_message_includes_visual_block_when_supplied() {
    let visual = "Visual description (from local vision model):\nA dog in a park.\n";
    let msg = build_bootstrap_system_message("voice", "p.jpg", Some("2020-01-01"), None, visual);
    assert!(msg.contains("Photo file path: p.jpg"));
    assert!(msg.contains("A dog in a park"));
    // Ordering invariant: path, then date, then the visual block.
    let path_at = msg.find("Photo file path:").unwrap();
    let date_at = msg.find("Date taken:").unwrap();
    let visual_at = msg.find("A dog in a park").unwrap();
    assert!(path_at < date_at);
    assert!(date_at < visual_at);
}

#[test]
fn bootstrap_system_message_trims_persona_trailing_whitespace() {
    let msg = build_bootstrap_system_message("voice \n\n\n", "p.jpg", None, None, "");
    assert!(msg.contains("voice\n\n--- PHOTO CONTEXT ---"));
}
#[test]
fn date_taken_for_context_prefers_exif_over_filename() {
    // EXIF wins when both are present (matches the canonical
    // date_resolver waterfall — EXIF is more reliable than
    // import-named filenames). The filename below encodes 2014-06-01,
    // but the EXIF timestamp (2021-08-15) must take precedence.
    let exif = Some(crate::database::models::ImageExif {
        id: 0,
        library_id: 1,
        file_path: "Screenshot_2014-06-01.png".to_string(),
        camera_make: None,
        camera_model: None,
        lens_model: None,
        width: None,
        height: None,
        orientation: None,
        gps_latitude: None,
        gps_longitude: None,
        gps_altitude: None,
        focal_length: None,
        aperture: None,
        shutter_speed: None,
        iso: None,
        // 2021-08-15 12:00:00 UTC
        date_taken: Some(1_629_028_800),
        created_time: 0,
        last_modified: 0,
        content_hash: None,
        size_bytes: None,
        phash_64: None,
        dhash_64: None,
        duplicate_of_hash: None,
        duplicate_decided_at: None,
        date_taken_source: None,
        original_date_taken: None,
        original_date_taken_source: None,
    });
    let out = resolve_date_taken_for_context(&exif, "Screenshot_2014-06-01.png");
    assert_eq!(out.as_deref(), Some("2021-08-15"));
}
#[test]
fn date_taken_for_context_falls_back_to_filename_when_no_exif() {
    // memories::extract_date_from_filename requires date+time in
    // the filename — date-only patterns aren't matched. Use the
    // canonical screenshot pattern for the regression case.
    let out = resolve_date_taken_for_context(&None, "Screenshot_2014-06-01-20-44-50.png");
    assert_eq!(out.as_deref(), Some("2014-06-01"));
}
#[test]
fn date_taken_for_context_returns_none_when_neither_source() {
    // No EXIF row and no parseable filename date → None, which the
    // context builder renders as "Date taken: unknown".
    let out = resolve_date_taken_for_context(&None, "DSC_5171.JPG");
    assert!(out.is_none());
}
}

View File

@@ -229,6 +229,17 @@ impl InsightGenerator {
None
}
/// Look up the EXIF row for a photo. Returns `None` when no row
/// exists yet (file watcher hasn't reached it) or when the DAO call
/// fails. Used by callers — including the chat-bootstrap path — that
/// need a few specific fields (date_taken, GPS) without duplicating
/// DAO plumbing.
pub(crate) fn fetch_exif(&self, file_path: &str) -> Option<crate::database::models::ImageExif> {
    let cx = opentelemetry::Context::current();
    // DAO errors collapse into None alongside the no-row case —
    // callers treat both as "no EXIF available".
    self.exif_dao
        .lock()
        .expect("Unable to lock ExifDao")
        .get_exif(&cx, file_path)
        .ok()
        .flatten()
}
/// Load image file, resize it, and encode as base64 for vision models
/// Resizes to max 1024px on longest edge to reduce context usage
pub(crate) fn load_image_as_base64(&self, file_path: &str) -> Result<String> {