Merge pull request 'Feature/insight history' (#104) from feature/insight-history into master

Reviewed-on: #104
This commit was merged in pull request #104.
This commit is contained in:
2026-06-10 19:01:14 +00:00
12 changed files with 837 additions and 183 deletions
+10 -2
View File
@@ -671,6 +671,11 @@ LLAMA_SWAP_TTS_REQUEST_TIMEOUT_SECONDS=600 # Per-request synth timeout (long
# Insight Chat Continuation # Insight Chat Continuation
AGENTIC_CHAT_MAX_ITERATIONS=6 # Cap on tool-calling iterations per chat turn (default 6) AGENTIC_CHAT_MAX_ITERATIONS=6 # Cap on tool-calling iterations per chat turn (default 6)
AGENTIC_CHAT_DEFAULT_NUM_CTX=32768 # Assumed context window for the history-truncation budget
# when a chat request omits num_ctx (default 32768). Size to
# the smallest context among the chat models actually served;
# too small silently guts replayed history every turn (and
# destroys llama.cpp KV-cache prefix reuse).
``` ```
**AI Insights Fallback Behavior:** **AI Insights Fallback Behavior:**
@@ -794,14 +799,17 @@ Per-`(library_id, file_path)` async mutex (`AppState.insight_chat.chat_locks`)
serialises concurrent turns on the same insight so the JSON blob doesn't race. serialises concurrent turns on the same insight so the JSON blob doesn't race.
Context management is a soft bound: if the serialized history exceeds Context management is a soft bound: if the serialized history exceeds
`num_ctx - 2048` tokens (cheap 4-byte/token heuristic), the oldest `num_ctx - 2048` tokens (cheap 4-byte/token heuristic; `num_ctx` defaults
assistant-tool_call + tool_result pairs are dropped until under budget. The to `AGENTIC_CHAT_DEFAULT_NUM_CTX`, 32768, when the request omits it), the
oldest assistant-tool_call + tool_result pairs are dropped until under budget. The
initial user message (with any images) and system prompt are always preserved. initial user message (with any images) and system prompt are always preserved.
The `truncated` event / flag is surfaced to the client when a drop occurred. The `truncated` event / flag is surfaced to the client when a drop occurred.
Configurable env: Configurable env:
- `AGENTIC_CHAT_MAX_ITERATIONS` — cap on tool-calling iterations per turn - `AGENTIC_CHAT_MAX_ITERATIONS` — cap on tool-calling iterations per turn
(default 6). Per-request `max_iterations` is clamped to this cap. (default 6). Per-request `max_iterations` is clamped to this cap.
- `AGENTIC_CHAT_DEFAULT_NUM_CTX` — assumed context window for the truncation
budget when the request omits `num_ctx` (default 32768).
**Apollo Places integration (optional):** **Apollo Places integration (optional):**
+92 -47
View File
@@ -8,7 +8,7 @@ use crate::ai::insight_chat::{ChatStreamEvent, ChatTurnRequest};
use crate::ai::ollama::ChatMessage; use crate::ai::ollama::ChatMessage;
use crate::ai::{ModelCapabilities, OllamaClient}; use crate::ai::{ModelCapabilities, OllamaClient};
use crate::data::Claims; use crate::data::Claims;
use crate::database::models::{InsightGenerationType, InsightJobStatus}; use crate::database::models::{InsightGenerationType, InsightJobStatus, PhotoInsight};
use crate::database::{ExifDao, InsightDao}; use crate::database::{ExifDao, InsightDao};
use crate::libraries; use crate::libraries;
use crate::otel::{extract_context_from_request, global_tracer}; use crate::otel::{extract_context_from_request, global_tracer};
@@ -273,6 +273,11 @@ pub async fn cancel_generation_handler(
pub struct RateInsightRequest { pub struct RateInsightRequest {
pub file_path: String, pub file_path: String,
pub approved: bool, pub approved: bool,
/// When set, rate this specific insight version by primary key
/// (used by the per-file history view to rate superseded versions).
/// When omitted, the current insight for `file_path` is rated.
#[serde(default)]
pub insight_id: Option<i32>,
} }
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]
@@ -333,6 +338,31 @@ pub struct PhotoInsightResponse {
pub persona_id: Option<String>, pub persona_id: Option<String>,
} }
impl From<PhotoInsight> for PhotoInsightResponse {
fn from(insight: PhotoInsight) -> Self {
PhotoInsightResponse {
id: insight.id,
file_path: insight.file_path,
title: insight.title,
summary: insight.summary,
generated_at: insight.generated_at,
model_version: insight.model_version,
prompt_eval_count: insight.prompt_eval_count,
eval_count: insight.eval_count,
approved: insight.approved,
has_training_messages: insight.training_messages.is_some(),
backend: insight.backend,
num_ctx: insight.num_ctx,
temperature: insight.temperature,
top_p: insight.top_p,
top_k: insight.top_k,
min_p: insight.min_p,
system_prompt: insight.system_prompt,
persona_id: insight.persona_id,
}
}
}
#[derive(Debug, Serialize)] #[derive(Debug, Serialize)]
pub struct AvailableModelsResponse { pub struct AvailableModelsResponse {
pub primary: ServerModels, pub primary: ServerModels,
@@ -554,29 +584,7 @@ pub async fn get_insight_handler(
let mut dao = insight_dao.lock().expect("Unable to lock InsightDao"); let mut dao = insight_dao.lock().expect("Unable to lock InsightDao");
match dao.get_insight_for_paths(&otel_context, &sibling_paths) { match dao.get_insight_for_paths(&otel_context, &sibling_paths) {
Ok(Some(insight)) => { Ok(Some(insight)) => HttpResponse::Ok().json(PhotoInsightResponse::from(insight)),
let response = PhotoInsightResponse {
id: insight.id,
file_path: insight.file_path,
title: insight.title,
summary: insight.summary,
generated_at: insight.generated_at,
model_version: insight.model_version,
prompt_eval_count: insight.prompt_eval_count,
eval_count: insight.eval_count,
approved: insight.approved,
has_training_messages: insight.training_messages.is_some(),
backend: insight.backend,
num_ctx: insight.num_ctx,
temperature: insight.temperature,
top_p: insight.top_p,
top_k: insight.top_k,
min_p: insight.min_p,
system_prompt: insight.system_prompt,
persona_id: insight.persona_id,
};
HttpResponse::Ok().json(response)
}
Ok(None) => HttpResponse::NotFound().json(serde_json::json!({ Ok(None) => HttpResponse::NotFound().json(serde_json::json!({
"error": "Insight not found" "error": "Insight not found"
})), })),
@@ -631,26 +639,7 @@ pub async fn get_all_insights_handler(
Ok(insights) => { Ok(insights) => {
let responses: Vec<PhotoInsightResponse> = insights let responses: Vec<PhotoInsightResponse> = insights
.into_iter() .into_iter()
.map(|insight| PhotoInsightResponse { .map(PhotoInsightResponse::from)
id: insight.id,
file_path: insight.file_path,
title: insight.title,
summary: insight.summary,
generated_at: insight.generated_at,
model_version: insight.model_version,
prompt_eval_count: insight.prompt_eval_count,
eval_count: insight.eval_count,
approved: insight.approved,
has_training_messages: insight.training_messages.is_some(),
backend: insight.backend,
num_ctx: insight.num_ctx,
temperature: insight.temperature,
top_p: insight.top_p,
top_k: insight.top_k,
min_p: insight.min_p,
system_prompt: insight.system_prompt,
persona_id: insight.persona_id,
})
.collect(); .collect();
HttpResponse::Ok().json(responses) HttpResponse::Ok().json(responses)
@@ -664,6 +653,39 @@ pub async fn get_all_insights_handler(
} }
} }
/// GET /insights/history?path=/path/to/photo.jpg - Get all insight versions
/// for a single photo (current plus previously generated/superseded ones),
/// newest first. Backs the per-file insight history view.
#[get("/insights/history")]
pub async fn get_insight_history_handler(
_claims: Claims,
query: web::Query<GetPhotoInsightQuery>,
insight_dao: web::Data<std::sync::Mutex<Box<dyn InsightDao>>>,
) -> impl Responder {
let normalized_path = normalize_path(&query.path);
log::debug!("Fetching insight history for {}", normalized_path);
let otel_context = opentelemetry::Context::new();
let mut dao = insight_dao.lock().expect("Unable to lock InsightDao");
match dao.get_insight_history(&otel_context, &normalized_path) {
Ok(insights) => {
let responses: Vec<PhotoInsightResponse> = insights
.into_iter()
.map(PhotoInsightResponse::from)
.collect();
HttpResponse::Ok().json(responses)
}
Err(e) => {
log::error!("Failed to fetch insight history ({}): {:?}", &query.path, e);
HttpResponse::InternalServerError().json(serde_json::json!({
"error": format!("Failed to fetch insight history: {:?}", e)
}))
}
}
}
/// POST /insights/generate/agentic - Generate insight using agentic tool-calling loop (async) /// POST /insights/generate/agentic - Generate insight using agentic tool-calling loop (async)
#[post("/insights/generate/agentic")] #[post("/insights/generate/agentic")]
pub async fn generate_agentic_insight_handler( pub async fn generate_agentic_insight_handler(
@@ -787,6 +809,21 @@ pub async fn generate_agentic_insight_handler(
.filter(|s| !s.trim().is_empty()) .filter(|s| !s.trim().is_empty())
.unwrap_or_else(|| "default".to_string()); .unwrap_or_else(|| "default".to_string());
// Server-side persona resolution: an explicit client `system_prompt`
// wins; otherwise the persona's stored prompt from the persona store;
// otherwise None and `build_system_content` applies its neutral
// default. Without the lookup, a request carrying only `persona_id`
// silently generated in the default voice.
let system_prompt = request
.system_prompt
.clone()
.filter(|s| !s.trim().is_empty())
.or_else(|| {
app_state
.insight_generator
.persona_system_prompt(user_id, &persona_id)
});
let max_iterations: usize = std::env::var("AGENTIC_MAX_ITERATIONS") let max_iterations: usize = std::env::var("AGENTIC_MAX_ITERATIONS")
.ok() .ok()
.and_then(|v| v.parse().ok()) .and_then(|v| v.parse().ok())
@@ -812,7 +849,7 @@ pub async fn generate_agentic_insight_handler(
generator_for_task.generate_agentic_insight_for_photo( generator_for_task.generate_agentic_insight_for_photo(
&path_for_task, &path_for_task,
request.model.clone(), request.model.clone(),
request.system_prompt.clone(), system_prompt,
request.num_ctx, request.num_ctx,
request.temperature, request.temperature,
request.top_p, request.top_p,
@@ -1012,15 +1049,23 @@ pub async fn rate_insight_handler(
) -> impl Responder { ) -> impl Responder {
let normalized_path = normalize_path(&request.file_path); let normalized_path = normalize_path(&request.file_path);
log::info!( log::info!(
"Rating insight for {}: approved={}", "Rating insight for {} (id={:?}): approved={}",
normalized_path, normalized_path,
request.insight_id,
request.approved request.approved
); );
let otel_context = opentelemetry::Context::new(); let otel_context = opentelemetry::Context::new();
let mut dao = insight_dao.lock().expect("Unable to lock InsightDao"); let mut dao = insight_dao.lock().expect("Unable to lock InsightDao");
match dao.rate_insight(&otel_context, &normalized_path, request.approved) { // Rate a specific version by id when provided (history view), otherwise
// rate the current insight for the path.
let result = match request.insight_id {
Some(id) => dao.rate_insight_by_id(&otel_context, id, request.approved),
None => dao.rate_insight(&otel_context, &normalized_path, request.approved),
};
match result {
Ok(()) => HttpResponse::Ok().json(serde_json::json!({ Ok(()) => HttpResponse::Ok().json(serde_json::json!({
"success": true, "success": true,
"message": "Insight rated successfully" "message": "Insight rated successfully"
+169 -34
View File
@@ -19,7 +19,13 @@ use futures::stream::{BoxStream, StreamExt};
use uuid::Uuid; use uuid::Uuid;
const DEFAULT_MAX_ITERATIONS: usize = 6; const DEFAULT_MAX_ITERATIONS: usize = 6;
const DEFAULT_NUM_CTX: i32 = 8192; /// Assumed context window when the request doesn't specify `num_ctx`.
/// The llama-swap chat slots serve 20k-131k contexts and real conversations
/// rarely pass ~16k tokens, so 32k keeps the truncation pass from gutting
/// history that the server could comfortably hold (which also destroys the
/// server's KV-cache prefix reuse). Override per-deploy with
/// AGENTIC_CHAT_DEFAULT_NUM_CTX if the serving models change shape.
const DEFAULT_NUM_CTX: i32 = 32768;
/// Headroom reserved for the model's response, deducted from the context /// Headroom reserved for the model's response, deducted from the context
/// budget when deciding whether to truncate the replayed history. /// budget when deciding whether to truncate the replayed history.
const RESPONSE_HEADROOM_TOKENS: usize = 2048; const RESPONSE_HEADROOM_TOKENS: usize = 2048;
@@ -33,6 +39,12 @@ const BYTES_PER_TOKEN: usize = 4;
/// characters) must NOT be counted as text bytes — doing so dwarfs the entire /// characters) must NOT be counted as text bytes — doing so dwarfs the entire
/// text budget and forces spurious truncation on every turn. /// text budget and forces spurious truncation on every turn.
const IMAGE_TOKENS_EACH: usize = 1300; const IMAGE_TOKENS_EACH: usize = 1300;
/// User prompt injected when the agentic loop exhausts its iteration budget
/// without producing a tool-free reply. Internal scaffolding only — it is
/// stripped from the transcript before persistence (see
/// [`push_synthetic_final_prompt`] / [`remove_synthetic_final_prompt`]).
const SYNTHETIC_FINAL_ANSWER_PROMPT: &str =
"Please write your final answer now without calling any more tools.";
pub type ChatLockMap = Arc<TokioMutex<HashMap<(i32, String), Arc<TokioMutex<()>>>>>; pub type ChatLockMap = Arc<TokioMutex<HashMap<(i32, String), Arc<TokioMutex<()>>>>>;
@@ -361,7 +373,7 @@ impl InsightChatService {
// 6. Apply truncation budget. Drops oldest tool_call+tool pairs // 6. Apply truncation budget. Drops oldest tool_call+tool pairs
// (preserves system + first user including any images). // (preserves system + first user including any images).
let budget_tokens = (req.num_ctx.unwrap_or(DEFAULT_NUM_CTX) as usize) let budget_tokens = (req.num_ctx.unwrap_or_else(env_default_num_ctx) as usize)
.saturating_sub(RESPONSE_HEADROOM_TOKENS); .saturating_sub(RESPONSE_HEADROOM_TOKENS);
let budget_bytes = budget_tokens.saturating_mul(BYTES_PER_TOKEN); let budget_bytes = budget_tokens.saturating_mul(BYTES_PER_TOKEN);
let truncated = apply_context_budget(&mut messages, budget_bytes); let truncated = apply_context_budget(&mut messages, budget_bytes);
@@ -457,9 +469,7 @@ impl InsightChatService {
"Chat loop exhausted after {} iterations, requesting final answer", "Chat loop exhausted after {} iterations, requesting final answer",
iterations_used iterations_used
); );
messages.push(ChatMessage::user( let synthetic_idx = push_synthetic_final_prompt(&mut messages);
"Please write your final answer now without calling any more tools.",
));
let (final_response, prompt_tokens, eval_tokens) = backend let (final_response, prompt_tokens, eval_tokens) = backend
.chat() .chat()
.chat_with_tools(messages.clone(), vec![]) .chat_with_tools(messages.clone(), vec![])
@@ -468,8 +478,15 @@ impl InsightChatService {
last_eval_count = eval_tokens; last_eval_count = eval_tokens;
final_content = final_response.content.clone(); final_content = final_response.content.clone();
messages.push(final_response); messages.push(final_response);
// Drop the synthetic prompt before persistence — internal
// scaffolding only (mirrors both streaming variants).
remove_synthetic_final_prompt(&mut messages, synthetic_idx);
} }
// Strip any leaked <think> reasoning block from the content we
// return / persist as the reply (the raw transcript keeps it).
let final_content = crate::ai::llm_client::strip_think_blocks(&final_content);
loop_cx.span().set_status(Status::Ok); loop_cx.span().set_status(Status::Ok);
// Drop the per-turn iteration-budget note from the system message // Drop the per-turn iteration-budget note from the system message
@@ -853,7 +870,7 @@ impl InsightChatService {
None None
}; };
let budget_tokens = (req.num_ctx.unwrap_or(DEFAULT_NUM_CTX) as usize) let budget_tokens = (req.num_ctx.unwrap_or_else(env_default_num_ctx) as usize)
.saturating_sub(RESPONSE_HEADROOM_TOKENS); .saturating_sub(RESPONSE_HEADROOM_TOKENS);
let budget_bytes = budget_tokens.saturating_mul(BYTES_PER_TOKEN); let budget_bytes = budget_tokens.saturating_mul(BYTES_PER_TOKEN);
let truncated = apply_context_budget(&mut messages, budget_bytes); let truncated = apply_context_budget(&mut messages, budget_bytes);
@@ -1039,7 +1056,12 @@ impl InsightChatService {
); );
let tools = InsightGenerator::build_tool_definitions(gate_opts); let tools = InsightGenerator::build_tool_definitions(gate_opts);
let persona = resolve_bootstrap_system_prompt(req.system_prompt.as_deref()); // Server-side persona resolution: explicit client system_prompt wins;
// else the active persona's stored prompt; else the neutral default.
let persona_prompt = self
.generator
.persona_system_prompt(req.user_id, &active_persona);
let persona = resolve_bootstrap_system_prompt(req.system_prompt.as_deref(), persona_prompt);
let system_content = build_bootstrap_system_message( let system_content = build_bootstrap_system_message(
&persona, &persona,
&normalized, &normalized,
@@ -1263,10 +1285,7 @@ impl InsightChatService {
// No-tools fallback // No-tools fallback
if final_content.is_empty() { if final_content.is_empty() {
let synthetic_idx = messages.len(); let synthetic_idx = push_synthetic_final_prompt(messages);
messages.push(ChatMessage::user(
"Please write your final answer now without calling any more tools.",
));
let mut stream = backend let mut stream = backend
.chat() .chat()
.chat_with_tools_stream(messages.clone(), vec![]) .chat_with_tools_stream(messages.clone(), vec![])
@@ -1294,7 +1313,7 @@ impl InsightChatService {
final_message.ok_or_else(|| anyhow!("final stream ended without a Done event"))?; final_message.ok_or_else(|| anyhow!("final stream ended without a Done event"))?;
final_content = final_response.content.clone(); final_content = final_response.content.clone();
messages.push(final_response); messages.push(final_response);
messages.remove(synthetic_idx); remove_synthetic_final_prompt(messages, synthetic_idx);
} }
Ok(AgenticLoopOutcome { Ok(AgenticLoopOutcome {
@@ -1302,7 +1321,9 @@ impl InsightChatService {
iterations_used, iterations_used,
last_prompt_eval_count, last_prompt_eval_count,
last_eval_count, last_eval_count,
final_content, // Strip any leaked <think> reasoning block from the content the
// caller persists as title/summary (the raw transcript keeps it).
final_content: crate::ai::llm_client::strip_think_blocks(&final_content),
cancelled: false, cancelled: false,
}) })
} }
@@ -1431,7 +1452,7 @@ impl InsightChatService {
}; };
// Truncate before appending the new user turn. // Truncate before appending the new user turn.
let budget_tokens = (req.num_ctx.unwrap_or(DEFAULT_NUM_CTX) as usize) let budget_tokens = (req.num_ctx.unwrap_or_else(env_default_num_ctx) as usize)
.saturating_sub(RESPONSE_HEADROOM_TOKENS); .saturating_sub(RESPONSE_HEADROOM_TOKENS);
let budget_bytes = budget_tokens.saturating_mul(BYTES_PER_TOKEN); let budget_bytes = budget_tokens.saturating_mul(BYTES_PER_TOKEN);
let truncated = apply_context_budget(&mut messages, budget_bytes); let truncated = apply_context_budget(&mut messages, budget_bytes);
@@ -1648,7 +1669,12 @@ impl InsightChatService {
// get_sms_messages / reverse_geocode / get_personal_place_at // get_sms_messages / reverse_geocode / get_personal_place_at
// the args they need. In hybrid mode the visual description // the args they need. In hybrid mode the visual description
// belongs here for the same reason. // belongs here for the same reason.
let persona = resolve_bootstrap_system_prompt(req.system_prompt.as_deref()); // Server-side persona resolution: explicit client system_prompt wins;
// else the active persona's stored prompt; else the neutral default.
let persona_prompt = self
.generator
.persona_system_prompt(req.user_id, &active_persona);
let persona = resolve_bootstrap_system_prompt(req.system_prompt.as_deref(), persona_prompt);
let system_content = build_bootstrap_system_message( let system_content = build_bootstrap_system_message(
&persona, &persona,
&normalized, &normalized,
@@ -1866,10 +1892,7 @@ impl InsightChatService {
// and load_history's user-turn handler doesn't reset // and load_history's user-turn handler doesn't reset
// pending_tools at this position (wiping the prior tool // pending_tools at this position (wiping the prior tool
// calls from the final assistant render). // calls from the final assistant render).
let synthetic_idx = messages.len(); let synthetic_idx = push_synthetic_final_prompt(messages);
messages.push(ChatMessage::user(
"Please write your final answer now without calling any more tools.",
));
let mut stream = backend let mut stream = backend
.chat() .chat()
.chat_with_tools_stream(messages.clone(), vec![]) .chat_with_tools_stream(messages.clone(), vec![])
@@ -1900,7 +1923,7 @@ impl InsightChatService {
// Drop the synthetic prompt — internal scaffolding only. The // Drop the synthetic prompt — internal scaffolding only. The
// model's final_response (now at the end) was generated with // model's final_response (now at the end) was generated with
// it in context and reads coherently without it on replay. // it in context and reads coherently without it on replay.
messages.remove(synthetic_idx); remove_synthetic_final_prompt(messages, synthetic_idx);
} }
Ok(AgenticLoopOutcome { Ok(AgenticLoopOutcome {
@@ -1908,7 +1931,9 @@ impl InsightChatService {
iterations_used, iterations_used,
last_prompt_eval_count, last_prompt_eval_count,
last_eval_count, last_eval_count,
final_content, // Strip any leaked <think> reasoning block from the content the
// caller persists as title/summary (the raw transcript keeps it).
final_content: crate::ai::llm_client::strip_think_blocks(&final_content),
cancelled: false, cancelled: false,
}) })
} }
@@ -1921,15 +1946,21 @@ const BOOTSTRAP_DEFAULT_SYSTEM_PROMPT: &str = "You are a helpful AI assistant an
Use the available tools to gather context and answer their questions \ Use the available tools to gather context and answer their questions \
in a conversational tone."; in a conversational tone.";
/// Pick the system prompt for bootstrap. Trimmed-non-empty supplied wins; /// Pick the system prompt for bootstrap. Precedence: trimmed-non-empty
/// otherwise fall back to [`BOOTSTRAP_DEFAULT_SYSTEM_PROMPT`]. Returns an /// `supplied` (the client's explicit `system_prompt`) wins; else
/// owned `String` because the bootstrap caller persists it on the new /// `persona_prompt` (the active persona's stored prompt, resolved
/// insight row. /// server-side from the persona store); else
fn resolve_bootstrap_system_prompt(supplied: Option<&str>) -> String { /// [`BOOTSTRAP_DEFAULT_SYSTEM_PROMPT`]. Returns an owned `String` because
/// the bootstrap caller persists it on the new insight row.
fn resolve_bootstrap_system_prompt(
supplied: Option<&str>,
persona_prompt: Option<String>,
) -> String {
supplied supplied
.map(str::trim) .map(str::trim)
.filter(|s| !s.is_empty()) .filter(|s| !s.is_empty())
.map(str::to_string) .map(str::to_string)
.or_else(|| persona_prompt.filter(|s| !s.trim().is_empty()))
.unwrap_or_else(|| BOOTSTRAP_DEFAULT_SYSTEM_PROMPT.to_string()) .unwrap_or_else(|| BOOTSTRAP_DEFAULT_SYSTEM_PROMPT.to_string())
} }
@@ -2166,6 +2197,17 @@ fn env_max_iterations() -> usize {
.max(1) .max(1)
} }
/// Read AGENTIC_CHAT_DEFAULT_NUM_CTX once per call — the assumed context
/// window for the truncation budget when the request omits `num_ctx`. Same
/// no-static-global rationale as `env_max_iterations` above.
fn env_default_num_ctx() -> i32 {
std::env::var("AGENTIC_CHAT_DEFAULT_NUM_CTX")
.ok()
.and_then(|s| s.parse::<i32>().ok())
.unwrap_or(DEFAULT_NUM_CTX)
.max(RESPONSE_HEADROOM_TOKENS as i32 + 1024)
}
/// Append a per-turn iteration-budget reminder to the replayed system /// Append a per-turn iteration-budget reminder to the replayed system
/// message so the model knows how many tool-calling rounds this turn gets. /// message so the model knows how many tool-calling rounds this turn gets.
/// Returns the original `content` so the caller can restore it before /// Returns the original `content` so the caller can restore it before
@@ -2200,6 +2242,30 @@ fn restore_system_content(messages: &mut [ChatMessage], original: Option<String>
} }
} }
/// Append the synthetic "write your final answer" user prompt, returning the
/// index the caller must later hand to [`remove_synthetic_final_prompt`].
/// Used when the agentic loop exhausts its budget: the model gets one more
/// (tool-free) request, but the nudge itself must never persist — it would
/// render as a user bubble in the transcript and reset `load_history`'s
/// pending-tools tracking at that position.
fn push_synthetic_final_prompt(messages: &mut Vec<ChatMessage>) -> usize {
let idx = messages.len();
messages.push(ChatMessage::user(SYNTHETIC_FINAL_ANSWER_PROMPT));
idx
}
/// Remove the synthetic prompt inserted by [`push_synthetic_final_prompt`].
/// Defensive no-op when the message at `idx` isn't the synthetic prompt —
/// guards against index drift if the surrounding code is reordered.
fn remove_synthetic_final_prompt(messages: &mut Vec<ChatMessage>, idx: usize) {
if messages
.get(idx)
.is_some_and(|m| m.role == "user" && m.content == SYNTHETIC_FINAL_ANSWER_PROMPT)
{
messages.remove(idx);
}
}
/// Receipt produced by [`apply_system_prompt_override`] so the caller can /// Receipt produced by [`apply_system_prompt_override`] so the caller can
/// undo the override before persistence. Two variants because we either /// undo the override before persistence. Two variants because we either
/// replaced an existing system message (need its original content) or /// replaced an existing system message (need its original content) or
@@ -2467,13 +2533,16 @@ mod tests {
assistant_text("here is the answer"), assistant_text("here is the answer"),
]; ];
// Default budget: (8192 - 2048) * 4 bytes ≈ 24KB. The text easily fits; // Default budget: (32768 - 2048) * 4 bytes ≈ 120KB. The text easily
// only the (excluded) image bytes could blow it. // fits; only the (excluded) image bytes could blow it.
let budget_bytes = (DEFAULT_NUM_CTX as usize - RESPONSE_HEADROOM_TOKENS) * BYTES_PER_TOKEN; let budget_bytes = (DEFAULT_NUM_CTX as usize - RESPONSE_HEADROOM_TOKENS) * BYTES_PER_TOKEN;
let original_len = msgs.len(); let original_len = msgs.len();
let dropped = apply_context_budget(&mut msgs, budget_bytes); let dropped = apply_context_budget(&mut msgs, budget_bytes);
assert!(!dropped, "short conversation with one image must not truncate"); assert!(
!dropped,
"short conversation with one image must not truncate"
);
assert_eq!(msgs.len(), original_len, "no messages should be dropped"); assert_eq!(msgs.len(), original_len, "no messages should be dropped");
// Sanity: the flat image charge is accounted for but stays well under budget. // Sanity: the flat image charge is accounted for but stays well under budget.
assert!(estimate_bytes(&msgs) <= budget_bytes); assert!(estimate_bytes(&msgs) <= budget_bytes);
@@ -2640,26 +2709,26 @@ mod tests {
#[test] #[test]
fn bootstrap_system_prompt_falls_back_to_default_for_none() { fn bootstrap_system_prompt_falls_back_to_default_for_none() {
let out = resolve_bootstrap_system_prompt(None); let out = resolve_bootstrap_system_prompt(None, None);
assert_eq!(out, BOOTSTRAP_DEFAULT_SYSTEM_PROMPT); assert_eq!(out, BOOTSTRAP_DEFAULT_SYSTEM_PROMPT);
} }
#[test] #[test]
fn bootstrap_system_prompt_falls_back_to_default_for_empty_string() { fn bootstrap_system_prompt_falls_back_to_default_for_empty_string() {
// Apollo currently sends `''` when no persona is selected. // Apollo currently sends `''` when no persona is selected.
let out = resolve_bootstrap_system_prompt(Some("")); let out = resolve_bootstrap_system_prompt(Some(""), None);
assert_eq!(out, BOOTSTRAP_DEFAULT_SYSTEM_PROMPT); assert_eq!(out, BOOTSTRAP_DEFAULT_SYSTEM_PROMPT);
} }
#[test] #[test]
fn bootstrap_system_prompt_falls_back_to_default_for_whitespace() { fn bootstrap_system_prompt_falls_back_to_default_for_whitespace() {
let out = resolve_bootstrap_system_prompt(Some(" \n\t ")); let out = resolve_bootstrap_system_prompt(Some(" \n\t "), None);
assert_eq!(out, BOOTSTRAP_DEFAULT_SYSTEM_PROMPT); assert_eq!(out, BOOTSTRAP_DEFAULT_SYSTEM_PROMPT);
} }
#[test] #[test]
fn bootstrap_system_prompt_uses_supplied_when_non_empty() { fn bootstrap_system_prompt_uses_supplied_when_non_empty() {
let out = resolve_bootstrap_system_prompt(Some("you are a journal")); let out = resolve_bootstrap_system_prompt(Some("you are a journal"), None);
assert_eq!(out, "you are a journal"); assert_eq!(out, "you are a journal");
} }
@@ -2668,10 +2737,76 @@ mod tests {
// Trim only happens at the edges — interior newlines and spacing // Trim only happens at the edges — interior newlines and spacing
// (which Apollo's persona uses for tool listings) must survive. // (which Apollo's persona uses for tool listings) must survive.
let prompt = "line one\nline two\n bullet"; let prompt = "line one\nline two\n bullet";
let out = resolve_bootstrap_system_prompt(Some(prompt)); let out = resolve_bootstrap_system_prompt(Some(prompt), None);
assert_eq!(out, prompt); assert_eq!(out, prompt);
} }
#[test]
fn bootstrap_system_prompt_explicit_wins_over_persona_store() {
let out = resolve_bootstrap_system_prompt(
Some("explicit prompt"),
Some("stored persona prompt".to_string()),
);
assert_eq!(out, "explicit prompt");
}
#[test]
fn bootstrap_system_prompt_uses_persona_store_when_no_explicit() {
// Request carried persona_id but no system_prompt — the persona's
// stored prompt must be used, not the neutral default.
let out = resolve_bootstrap_system_prompt(None, Some("stored persona prompt".to_string()));
assert_eq!(out, "stored persona prompt");
// Empty explicit prompt behaves like None.
let out =
resolve_bootstrap_system_prompt(Some(""), Some("stored persona prompt".to_string()));
assert_eq!(out, "stored persona prompt");
}
#[test]
fn bootstrap_system_prompt_blank_persona_prompt_falls_to_default() {
let out = resolve_bootstrap_system_prompt(None, Some(" ".to_string()));
assert_eq!(out, BOOTSTRAP_DEFAULT_SYSTEM_PROMPT);
}
// ── Synthetic final-answer prompt scaffolding ──────────────────────
#[test]
fn synthetic_final_prompt_round_trip_leaves_no_scaffolding() {
// Exhausted-loop fallback: nudge pushed, model reply appended, nudge
// removed — the persisted transcript must contain the reply but not
// the synthetic user prompt (all three loop variants rely on this).
let mut msgs = vec![
ChatMessage::system("sys"),
ChatMessage::user("q"),
assistant_with_tool_call("lookup"),
ChatMessage::tool_result("data"),
];
let idx = push_synthetic_final_prompt(&mut msgs);
assert_eq!(msgs[idx].content, SYNTHETIC_FINAL_ANSWER_PROMPT);
msgs.push(assistant_text("final answer"));
remove_synthetic_final_prompt(&mut msgs, idx);
assert_eq!(msgs.len(), 5);
assert!(
msgs.iter()
.all(|m| m.content != SYNTHETIC_FINAL_ANSWER_PROMPT),
"synthetic prompt must not persist"
);
assert_eq!(msgs.last().unwrap().content, "final answer");
}
#[test]
fn remove_synthetic_final_prompt_is_noop_on_index_mismatch() {
// Defensive guard: if the message at idx isn't the synthetic prompt
// (index drift), nothing is removed.
let mut msgs = vec![ChatMessage::user("q"), assistant_text("a")];
remove_synthetic_final_prompt(&mut msgs, 0);
remove_synthetic_final_prompt(&mut msgs, 5);
assert_eq!(msgs.len(), 2);
}
#[test] #[test]
fn bootstrap_backend_defaults_to_local_when_none() { fn bootstrap_backend_defaults_to_local_when_none() {
let out = resolve_bootstrap_backend(None).unwrap(); let out = resolve_bootstrap_backend(None).unwrap();
+211 -26
View File
@@ -28,6 +28,11 @@ use crate::otel::global_tracer;
use crate::tags::TagDao; use crate::tags::TagDao;
use crate::utils::{earliest_fs_time, normalize_path}; use crate::utils::{earliest_fs_time, normalize_path};
/// Max location records rendered by `tool_get_location_history`. The DAO
/// query is range-bounded, not limited, so the tool caps the rendered list
/// and labels the truncation via `found_header`.
const LOCATION_HISTORY_DISPLAY_LIMIT: usize = 20;
/// Parse a "Title: ...\n\n<body>" response into (title, body). /// Parse a "Title: ...\n\n<body>" response into (title, body).
/// Falls back to the first sentence as the title if the model didn't /// Falls back to the first sentence as the title if the model didn't
/// follow the format. /// follow the format.
@@ -218,15 +223,11 @@ impl InsightGenerator {
/// be called once per chat turn / generation. `has_vision` is /// be called once per chat turn / generation. `has_vision` is
/// supplied by the caller because it depends on the model selected /// supplied by the caller because it depends on the model selected
/// for this turn, not on persistent state. /// for this turn, not on persistent state.
pub fn current_gate_opts(&self, has_vision: bool) -> ToolGateOpts { ///
self.current_gate_opts_for_persona(has_vision, None) /// Also resolves the per-persona `allow_agent_corrections` flag.
} /// Pass `Some((user_id, persona_id))` when generating in a persona
/// context (every chat turn and agentic generation does); pass
/// Same as `current_gate_opts` but resolves the per-persona /// `None` for callers that don't have one, which defaults the gate
/// `allow_agent_corrections` flag too. Pass `Some((user_id,
/// persona_id))` when generating in a persona context (every chat
/// turn does); pass `None` for callers that don't have one yet
/// (cold paths, populate_knowledge bin), which defaults the gate
/// to closed — the conservative posture. /// to closed — the conservative posture.
pub fn current_gate_opts_for_persona( pub fn current_gate_opts_for_persona(
&self, &self,
@@ -277,6 +278,22 @@ impl InsightGenerator {
} }
} }
/// Resolve the stored system prompt for `(user_id, persona_id)` from
/// the persona store. Returns `None` when the persona row doesn't
/// exist or its prompt is blank — callers fall back to their own
/// default. Used for server-side persona resolution: a request that
/// carries `persona_id` but no explicit `system_prompt` should speak
/// in the persona's stored voice, not the neutral default.
pub(crate) fn persona_system_prompt(&self, user_id: i32, persona_id: &str) -> Option<String> {
let cx = opentelemetry::Context::new();
let mut pdao = self.persona_dao.lock().expect("Unable to lock PersonaDao");
pdao.get_persona(&cx, user_id, persona_id)
.ok()
.flatten()
.map(|p| p.system_prompt)
.filter(|s| !s.trim().is_empty())
}
/// Resolve `rel_path` against the configured libraries, returning the /// Resolve `rel_path` against the configured libraries, returning the
/// first root under which the file exists. Insights may be generated /// first root under which the file exists. Insights may be generated
/// for any library — the generator itself doesn't know which — so we /// for any library — the generator itself doesn't know which — so we
@@ -1673,6 +1690,10 @@ Return ONLY the summary, nothing else."#,
self.tool_recall_facts_for_photo(arguments, user_id, persona_id, cx) self.tool_recall_facts_for_photo(arguments, user_id, persona_id, cx)
.await .await
} }
"recall_facts_for_entity" => {
self.tool_recall_facts_for_entity(arguments, user_id, persona_id, cx)
.await
}
"store_entity" => self.tool_store_entity(arguments, cx).await, "store_entity" => self.tool_store_entity(arguments, cx).await,
"store_fact" => { "store_fact" => {
self.tool_store_fact( self.tool_store_fact(
@@ -2145,8 +2166,8 @@ Return ONLY the summary, nothing else."#,
}) })
.collect(); .collect();
format!( format!(
"Found {} messages:\n{}", "{}\n{}",
messages.len(), Self::found_header(messages.len(), formatted.len(), "messages"),
formatted.join("\n") formatted.join("\n")
) )
} }
@@ -2171,7 +2192,8 @@ Return ONLY the summary, nothing else."#,
let days_radius = args let days_radius = args
.get("days_radius") .get("days_radius")
.and_then(|v| v.as_i64()) .and_then(|v| v.as_i64())
.unwrap_or(7); .unwrap_or(7)
.clamp(1, 30);
let limit = args let limit = args
.get("limit") .get("limit")
.and_then(|v| v.as_i64()) .and_then(|v| v.as_i64())
@@ -2250,7 +2272,8 @@ Return ONLY the summary, nothing else."#,
let days_radius = args let days_radius = args
.get("days_radius") .get("days_radius")
.and_then(|v| v.as_i64()) .and_then(|v| v.as_i64())
.unwrap_or(14); .unwrap_or(14)
.clamp(1, 30);
let date = match NaiveDate::parse_from_str(date_str, "%Y-%m-%d") { let date = match NaiveDate::parse_from_str(date_str, "%Y-%m-%d") {
Ok(d) => d, Ok(d) => d,
@@ -2279,7 +2302,7 @@ Return ONLY the summary, nothing else."#,
Some(locs) if !locs.is_empty() => { Some(locs) if !locs.is_empty() => {
let formatted: Vec<String> = locs let formatted: Vec<String> = locs
.iter() .iter()
.take(20) .take(LOCATION_HISTORY_DISPLAY_LIMIT)
.map(|loc| { .map(|loc| {
let dt = DateTime::from_timestamp(loc.timestamp, 0) let dt = DateTime::from_timestamp(loc.timestamp, 0)
.map(|dt| { .map(|dt| {
@@ -2305,8 +2328,8 @@ Return ONLY the summary, nothing else."#,
}) })
.collect(); .collect();
format!( format!(
"Found {} location records:\n{}", "{}\n{}",
locs.len(), Self::found_header(locs.len(), formatted.len(), "location records"),
formatted.join("\n") formatted.join("\n")
) )
} }
@@ -2315,6 +2338,20 @@ Return ONLY the summary, nothing else."#,
} }
} }
/// Render a `Found N <noun>:` tool-result header, annotating when only
/// the first `shown` of `total` items are listed below it. Without the
/// annotation the model believes it saw everything ("Found 312
/// messages:" followed by 60 lines) and reasons from a silently
/// truncated list. `summarize_tool_result` keeps parsing the leading
/// count either way.
fn found_header(total: usize, shown: usize, noun: &str) -> String {
if shown < total {
format!("Found {} {} (showing first {}):", total, noun, shown)
} else {
format!("Found {} {}:", total, noun)
}
}
/// Tool: get_file_tags — fetch tags for a file path /// Tool: get_file_tags — fetch tags for a file path
async fn tool_get_file_tags( async fn tool_get_file_tags(
&self, &self,
@@ -2669,6 +2706,102 @@ Return ONLY the summary, nothing else."#,
} }
} }
/// Tool: recall_facts_for_entity — retrieve facts for one entity by id.
/// Persona scoping and the reviewed-only-facts filter mirror
/// `tool_recall_facts_for_photo` exactly: reads are always Single
/// (user_id, persona_id), and strict-mode personas see only
/// human-reviewed facts.
async fn tool_recall_facts_for_entity(
&self,
args: &serde_json::Value,
user_id: i32,
persona_id: &str,
cx: &opentelemetry::Context,
) -> String {
use crate::database::PersonaFilter;
let persona_filter = PersonaFilter::Single {
user_id,
persona_id: persona_id.to_string(),
};
let entity_id = match args.get("entity_id").and_then(|v| v.as_i64()) {
Some(id) => id as i32,
None => return "Error: missing required parameter 'entity_id'".to_string(),
};
let limit = args
.get("limit")
.and_then(|v| v.as_i64())
.unwrap_or(50)
.clamp(1, 100) as usize;
log::info!(
"tool_recall_facts_for_entity: entity_id={}, limit={}",
entity_id,
limit
);
// Resolve the persona's reviewed-only-mode flag once — identical
// fallback semantics to recall_facts_for_photo (missing persona
// row → permissive active+reviewed default).
let reviewed_only = {
let mut pdao = self.persona_dao.lock().expect("Unable to lock PersonaDao");
pdao.get_persona(cx, user_id, persona_id)
.ok()
.flatten()
.map(|p| p.reviewed_only_facts)
.unwrap_or(false)
};
let mut kdao = self
.knowledge_dao
.lock()
.expect("Unable to lock KnowledgeDao");
let entity = match kdao.get_entity_by_id(cx, entity_id) {
Ok(Some(e)) => e,
Ok(None) => return format!("No entity found with ID {}.", entity_id),
Err(e) => return format!("Error fetching entity: {:?}", e),
};
let mut output_lines = vec![format!("Entity: {} ({})", entity.name, entity.entity_type)];
match kdao.get_facts_for_entity(cx, entity_id, &persona_filter) {
Ok(facts) => {
// Default scope: active + reviewed. Strict mode trims to
// reviewed only — same allow rule as recall_facts_for_photo.
let allow = |s: &str| -> bool {
if reviewed_only {
s == "reviewed"
} else {
s == "active" || s == "reviewed"
}
};
for f in facts.iter().filter(|f| allow(&f.status)).take(limit) {
let obj = if let Some(ref v) = f.object_value {
v.clone()
} else if let Some(oid) = f.object_entity_id {
kdao.get_entity_by_id(cx, oid)
.ok()
.flatten()
.map(|e| format!("{} (entity ID: {})", e.name, e.id))
.unwrap_or_else(|| format!("entity:{}", oid))
} else {
"(unknown)".to_string()
};
output_lines.push(format!(" - {} {}", f.predicate, obj));
}
}
Err(e) => return format!("Error fetching facts: {:?}", e),
}
if output_lines.len() == 1 {
format!(
"No active knowledge facts found for entity {} (ID: {}).",
entity.name, entity_id
)
} else {
format!("Knowledge for this entity:\n{}", output_lines.join("\n"))
}
}
/// Tool: store_entity — upsert an entity into the knowledge memory. /// Tool: store_entity — upsert an entity into the knowledge memory.
/// Embeddings go through the configured local backend (`LLM_BACKEND`), /// Embeddings go through the configured local backend (`LLM_BACKEND`),
/// independent of the per-request chat backend in the caller. /// independent of the per-request chat backend in the caller.
@@ -3063,7 +3196,7 @@ Return ONLY the summary, nothing else."#,
/// Build the list of tool definitions for the agentic loop, gated by /// Build the list of tool definitions for the agentic loop, gated by
/// `opts`. Always-on tools: `search_messages`, `get_sms_messages`, /// `opts`. Always-on tools: `search_messages`, `get_sms_messages`,
/// `get_file_tags`, `reverse_geocode`, `get_current_datetime`, the /// `get_file_tags`, `reverse_geocode`, `get_current_datetime`, the
/// four knowledge-memory tools. Conditional: `describe_photo` (vision /// five knowledge-memory tools. Conditional: `describe_photo` (vision
/// model), `get_personal_place_at` (Apollo configured), `search_rag` /// model), `get_personal_place_at` (Apollo configured), `search_rag`
/// (daily_summaries populated), `get_calendar_events` (calendar /// (daily_summaries populated), `get_calendar_events` (calendar
/// populated), `get_location_history` (location history populated). /// populated), `get_location_history` (location history populated).
@@ -3280,6 +3413,22 @@ Return ONLY the summary, nothing else."#,
}), }),
)); ));
tools.push(Tool::function(
"recall_facts_for_entity",
"Retrieve all stored facts about one specific entity by its ID. Use to follow up on an entity \
surfaced by `recall_entities` (or referenced by another fact's object) that is NOT linked to the \
current photo `recall_facts_for_photo` only covers photo-linked entities. \
Example: `{entity_id: 7}` everything known about entity 7.",
serde_json::json!({
"type": "object",
"required": ["entity_id"],
"properties": {
"entity_id": { "type": "integer", "description": "ID of the entity to fetch facts for (from recall_entities, store_entity, or a fact's object reference)." },
"limit": { "type": "integer", "description": "Max facts to return (default 50, max 100)." }
}
}),
));
tools.push(Tool::function( tools.push(Tool::function(
"store_entity", "store_entity",
"Upsert a person / place / event / thing into the knowledge memory. Returns the entity id (use it as \ "Upsert a person / place / event / thing into the knowledge memory. Returns the entity id (use it as \
@@ -3543,7 +3692,7 @@ Return ONLY the summary, nothing else."#,
format!("{} personal place(s)", n) format!("{} personal place(s)", n)
} }
} }
"recall_entities" | "recall_facts_for_photo" => { "recall_entities" | "recall_facts_for_photo" | "recall_facts_for_entity" => {
let n = raw.lines().skip(1).filter(|l| !l.trim().is_empty()).count(); let n = raw.lines().skip(1).filter(|l| !l.trim().is_empty()).count();
let kind = if tool_name == "recall_entities" { let kind = if tool_name == "recall_entities" {
"entities" "entities"
@@ -4033,8 +4182,11 @@ Return ONLY the summary, nothing else."#,
// 10. Define tools. describe_photo offered only when the chat model // 10. Define tools. describe_photo offered only when the chat model
// sees images directly (images_inline); in hybrid mode the visual // sees images directly (images_inline); in hybrid mode the visual
// description is already inlined as text. // description is already inlined as text. Persona-aware so the
let gate_opts = self.current_gate_opts(backend.images_inline); // persona's allow_agent_corrections gate opens here exactly like
// it does for chat turns (insight_chat does the same).
let gate_opts =
self.current_gate_opts_for_persona(backend.images_inline, Some((user_id, &persona_id)));
let tools = Self::build_tool_definitions(gate_opts); let tools = Self::build_tool_definitions(gate_opts);
// 11. Build initial messages. images_inline → attach base64 to the // 11. Build initial messages. images_inline → attach base64 to the
@@ -4145,7 +4297,10 @@ Return ONLY the summary, nothing else."#,
.set_attribute(KeyValue::new("iterations_used", iterations_used as i64)); .set_attribute(KeyValue::new("iterations_used", iterations_used as i64));
loop_cx.span().set_status(Status::Ok); loop_cx.span().set_status(Status::Ok);
// 13. Parse title from the model's inline response. // 13. Strip any leaked <think>…</think> reasoning block (thinking
// models emit it ahead of the answer; the raw transcript in
// training_messages keeps it), then parse the title.
final_content = crate::ai::llm_client::strip_think_blocks(&final_content);
let (title, body) = parse_title_body(&final_content); let (title, body) = parse_title_body(&final_content);
final_content = body; final_content = body;
@@ -4341,6 +4496,7 @@ mod tests {
assert!(names.contains(&"get_current_datetime")); assert!(names.contains(&"get_current_datetime"));
assert!(names.contains(&"recall_entities")); assert!(names.contains(&"recall_entities"));
assert!(names.contains(&"recall_facts_for_photo")); assert!(names.contains(&"recall_facts_for_photo"));
assert!(names.contains(&"recall_facts_for_entity"));
assert!(names.contains(&"store_entity")); assert!(names.contains(&"store_entity"));
assert!(names.contains(&"store_fact")); assert!(names.contains(&"store_fact"));
@@ -4550,6 +4706,37 @@ mod tests {
); );
} }
#[test]
fn found_header_labels_truncation_honestly() {
// Truncated: total exceeds what's listed below the header.
assert_eq!(
InsightGenerator::found_header(312, 60, "messages"),
"Found 312 messages (showing first 60):"
);
// Not truncated: plain header, no annotation noise.
assert_eq!(
InsightGenerator::found_header(7, 7, "messages"),
"Found 7 messages:"
);
assert_eq!(
InsightGenerator::found_header(40, 20, "location records"),
"Found 40 location records (showing first 20):"
);
}
#[test]
fn summarize_parses_truncated_found_header() {
// The "(showing first K)" annotation must not break the few-shot
// trajectory summarizer's "Found N" count parsing.
assert_eq!(
InsightGenerator::summarize_tool_result(
"get_sms_messages",
"Found 312 messages (showing first 60):\n[2023-08-15 10:00] Sarah: hi"
),
"312 messages"
);
}
fn make_search_hit( fn make_search_hit(
id: i64, id: i64,
contact: &str, contact: &str,
@@ -4869,11 +5056,9 @@ mod tests {
// Replicate the resolve_backend local-client construction // Replicate the resolve_backend local-client construction
// (lines ~3686-3695 of this file). // (lines ~3686-3695 of this file).
let mut lc = base.clone(); let mut lc = base.clone();
if let Some(ref m) = overrides_model { if !is_hybrid && let Some(ref m) = overrides_model {
if !is_hybrid { lc.primary_model = m.clone();
lc.primary_model = m.clone(); lc.set_vision_model(m.clone());
lc.set_vision_model(m.clone());
}
} }
// In hybrid mode the local client must keep its configured slots. // In hybrid mode the local client must keep its configured slots.
+119 -52
View File
@@ -590,10 +590,7 @@ impl LlmClient for LlamaCppClient {
let mut byte_stream = byte_stream; let mut byte_stream = byte_stream;
let mut buf: Vec<u8> = Vec::new(); let mut buf: Vec<u8> = Vec::new();
let mut accumulated_content = String::new(); let mut accumulated_content = String::new();
let mut tool_state: std::collections::BTreeMap< let mut tool_state = ToolCallAssembly::new();
usize,
(Option<String>, Option<String>, String),
> = std::collections::BTreeMap::new();
let mut role = "assistant".to_string(); let mut role = "assistant".to_string();
let mut prompt_tokens: Option<i32> = None; let mut prompt_tokens: Option<i32> = None;
let mut completion_tokens: Option<i32> = None; let mut completion_tokens: Option<i32> = None;
@@ -670,32 +667,7 @@ impl LlmClient for LlamaCppClient {
yield Ok(LlmStreamEvent::TextDelta(content.to_string())); yield Ok(LlmStreamEvent::TextDelta(content.to_string()));
} }
if let Some(tcs) = delta.get("tool_calls").and_then(|v| v.as_array()) { if let Some(tcs) = delta.get("tool_calls").and_then(|v| v.as_array()) {
for tc_delta in tcs { apply_tool_call_deltas(&mut tool_state, tcs);
let idx = tc_delta
.get("index")
.and_then(|n| n.as_u64())
.unwrap_or(0) as usize;
let entry = tool_state
.entry(idx)
.or_insert((None, None, String::new()));
if let Some(id) =
tc_delta.get("id").and_then(|v| v.as_str())
{
entry.0 = Some(id.to_string());
}
if let Some(func) = tc_delta.get("function") {
if let Some(name) =
func.get("name").and_then(|v| v.as_str())
{
entry.1 = Some(name.to_string());
}
if let Some(args) =
func.get("arguments").and_then(|v| v.as_str())
{
entry.2.push_str(args);
}
}
}
} }
} }
if done_seen { if done_seen {
@@ -707,28 +679,7 @@ impl LlmClient for LlamaCppClient {
} }
} }
let tool_calls: Option<Vec<ToolCall>> = if tool_state.is_empty() { let tool_calls = finalize_tool_calls(tool_state);
None
} else {
let mut v = Vec::with_capacity(tool_state.len());
for (_idx, (id, name, args)) in tool_state {
let arguments: Value = if args.trim().is_empty() {
Value::Object(Default::default())
} else {
serde_json::from_str(&args).unwrap_or_else(|_| {
Value::Object(Default::default())
})
};
v.push(ToolCall {
id,
function: ToolCallFunction {
name: name.unwrap_or_default(),
arguments,
},
});
}
Some(v)
};
if let Some(ref frame) = last_frame { if let Some(ref frame) = last_frame {
log_timings(frame, prompt_tokens, completion_tokens); log_timings(frame, prompt_tokens, completion_tokens);
@@ -937,6 +888,58 @@ fn extract_error_detail(parsed: &Value) -> String {
raw.chars().take(300).collect() raw.chars().take(300).collect()
} }
/// Per-index assembly state for streamed OpenAI-style tool-call deltas:
/// `index → (id, name, concatenated argument fragments)`. BTreeMap so the
/// finalized calls come out in index order.
type ToolCallAssembly = std::collections::BTreeMap<usize, (Option<String>, Option<String>, String)>;
/// Fold one SSE frame's `delta.tool_calls` array into the assembly state.
/// Deltas carrying the same `index` merge into one call (llama.cpp streams a
/// call's argument JSON in fragments — they concatenate); distinct indexes
/// accumulate as separate calls.
fn apply_tool_call_deltas(state: &mut ToolCallAssembly, tcs: &[Value]) {
for tc_delta in tcs {
let idx = tc_delta.get("index").and_then(|n| n.as_u64()).unwrap_or(0) as usize;
let entry = state.entry(idx).or_insert((None, None, String::new()));
if let Some(id) = tc_delta.get("id").and_then(|v| v.as_str()) {
entry.0 = Some(id.to_string());
}
if let Some(func) = tc_delta.get("function") {
if let Some(name) = func.get("name").and_then(|v| v.as_str()) {
entry.1 = Some(name.to_string());
}
if let Some(args) = func.get("arguments").and_then(|v| v.as_str()) {
entry.2.push_str(args);
}
}
}
}
/// Convert assembled tool-call state into canonical `ToolCall`s, parsing each
/// call's concatenated argument JSON (empty / malformed → `{}`). `None` when
/// no tool-call deltas arrived.
fn finalize_tool_calls(state: ToolCallAssembly) -> Option<Vec<ToolCall>> {
if state.is_empty() {
return None;
}
let mut v = Vec::with_capacity(state.len());
for (_idx, (id, name, args)) in state {
let arguments: Value = if args.trim().is_empty() {
Value::Object(Default::default())
} else {
serde_json::from_str(&args).unwrap_or_else(|_| Value::Object(Default::default()))
};
v.push(ToolCall {
id,
function: ToolCallFunction {
name: name.unwrap_or_default(),
arguments,
},
});
}
Some(v)
}
fn find_double_newline(buf: &[u8]) -> Option<usize> { fn find_double_newline(buf: &[u8]) -> Option<usize> {
for i in 0..buf.len().saturating_sub(1) { for i in 0..buf.len().saturating_sub(1) {
if buf[i] == b'\n' && buf[i + 1] == b'\n' { if buf[i] == b'\n' && buf[i + 1] == b'\n' {
@@ -1302,4 +1305,68 @@ mod tests {
let c = LlamaCppClient::new(None, None); let c = LlamaCppClient::new(None, None);
assert_eq!(c.tts_model, "chatterbox"); assert_eq!(c.tts_model, "chatterbox");
} }
#[test]
fn stream_assembly_keeps_two_tool_calls_from_separate_chunks() {
// llama.cpp emits one delta per SSE frame; two calls with distinct
// `index` values arriving in separate frames must BOTH survive.
let mut state = ToolCallAssembly::new();
apply_tool_call_deltas(
&mut state,
&[json!({
"index": 0,
"id": "call_a",
"function": { "name": "get_sms_messages", "arguments": "{\"date\":\"2019-01-01\"}" }
})],
);
apply_tool_call_deltas(
&mut state,
&[json!({
"index": 1,
"id": "call_b",
"function": { "name": "reverse_geocode", "arguments": "{\"latitude\":1.0,\"longitude\":2.0}" }
})],
);
let calls = finalize_tool_calls(state).expect("two calls assembled");
assert_eq!(calls.len(), 2);
assert_eq!(calls[0].id.as_deref(), Some("call_a"));
assert_eq!(calls[0].function.name, "get_sms_messages");
assert_eq!(calls[0].function.arguments["date"], "2019-01-01");
assert_eq!(calls[1].id.as_deref(), Some("call_b"));
assert_eq!(calls[1].function.name, "reverse_geocode");
assert_eq!(calls[1].function.arguments["latitude"], 1.0);
}
#[test]
fn stream_assembly_concatenates_argument_fragments_for_same_index() {
// A single call's argument JSON streamed across frames concatenates
// into one parseable document.
let mut state = ToolCallAssembly::new();
apply_tool_call_deltas(
&mut state,
&[json!({
"index": 0,
"id": "call_x",
"function": { "name": "search_messages", "arguments": "{\"query\":" }
})],
);
apply_tool_call_deltas(
&mut state,
&[json!({
"index": 0,
"function": { "arguments": "\"dinner\"}" }
})],
);
let calls = finalize_tool_calls(state).expect("one call assembled");
assert_eq!(calls.len(), 1);
assert_eq!(calls[0].function.name, "search_messages");
assert_eq!(calls[0].function.arguments["query"], "dinner");
}
#[test]
fn stream_assembly_empty_state_finalizes_to_none() {
assert!(finalize_tool_calls(ToolCallAssembly::new()).is_none());
}
} }
+52
View File
@@ -170,3 +170,55 @@ pub struct ModelCapabilities {
pub has_vision: bool, pub has_vision: bool,
pub has_tool_calling: bool, pub has_tool_calling: bool,
} }
/// Strip a leading `<think>…</think>` reasoning block from model output.
///
/// Thinking models sometimes emit chain-of-thought inside think tags before
/// the real answer. Everything after the first `</think>` is the answer;
/// when no tag is present — or the text after it is empty — the trimmed
/// input is returned unchanged. Mirrors the behavior Ollama's
/// `extract_final_answer` has applied to single-shot generation; shared here
/// so the tool-calling final-content paths (agentic generation + chat) can
/// apply the identical cleanup before parsing / persisting.
pub fn strip_think_blocks(response: &str) -> String {
let response = response.trim();
if let Some(pos) = response.find("</think>") {
let answer = response[pos + "</think>".len()..].trim();
if !answer.is_empty() {
return answer.to_string();
}
}
response.to_string()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn strip_think_blocks_removes_leading_think_block() {
let raw = "<think>\nLet me reason about this.\n</think>\n\nTitle: A Day Out\n\nThe body.";
assert_eq!(strip_think_blocks(raw), "Title: A Day Out\n\nThe body.");
}
#[test]
fn strip_think_blocks_passes_through_plain_content() {
assert_eq!(strip_think_blocks(" just an answer "), "just an answer");
}
#[test]
fn strip_think_blocks_keeps_content_when_answer_after_tag_is_empty() {
// A think block with nothing after it: better to return the trimmed
// original than an empty string (matches Ollama's fallback).
let raw = "<think>only thoughts</think>";
assert_eq!(strip_think_blocks(raw), raw);
}
#[test]
fn strip_think_blocks_handles_unclosed_tag() {
let raw = "<think>thinking forever";
assert_eq!(strip_think_blocks(raw), raw);
}
}
+2 -1
View File
@@ -25,7 +25,8 @@ pub use handlers::{
chat_stream_handler, chat_turn_handler, delete_insight_handler, export_training_data_handler, chat_stream_handler, chat_turn_handler, delete_insight_handler, export_training_data_handler,
generate_agentic_insight_handler, generate_insight_handler, generation_status_handler, generate_agentic_insight_handler, generate_insight_handler, generation_status_handler,
get_all_insights_handler, get_available_models_handler, get_insight_handler, get_all_insights_handler, get_available_models_handler, get_insight_handler,
get_openrouter_models_handler, rate_insight_handler, turn_async_handler, turn_replay_handler, get_insight_history_handler, get_openrouter_models_handler, rate_insight_handler,
turn_async_handler, turn_replay_handler,
}; };
pub use insight_generator::InsightGenerator; pub use insight_generator::InsightGenerator;
pub use llamacpp::LlamaCppClient; pub use llamacpp::LlamaCppClient;
+52 -14
View File
@@ -360,18 +360,7 @@ impl OllamaClient {
/// Extract final answer from thinking model output /// Extract final answer from thinking model output
/// Handles <think>...</think> tags and takes everything after /// Handles <think>...</think> tags and takes everything after
fn extract_final_answer(&self, response: &str) -> String { fn extract_final_answer(&self, response: &str) -> String {
let response = response.trim(); crate::ai::llm_client::strip_think_blocks(response)
// Look for </think> tag and take everything after it
if let Some(pos) = response.find("</think>") {
let answer = response[pos + 8..].trim();
if !answer.is_empty() {
return answer.to_string();
}
}
// Fallback: return the whole response trimmed
response.to_string()
} }
async fn try_generate( async fn try_generate(
@@ -846,11 +835,14 @@ Analyze the image and use specific details from both the visual content and the
if !chunk.message.role.is_empty() { if !chunk.message.role.is_empty() {
role = chunk.message.role; role = chunk.message.role;
} }
// Ollama only attaches tool_calls on the final chunk. // Ollama ≥0.8 can stream tool_calls incrementally
// across chunks (older servers attach them all to
// one chunk) — append rather than overwrite so
// calls from earlier chunks survive.
if let Some(tcs) = chunk.message.tool_calls if let Some(tcs) = chunk.message.tool_calls
&& !tcs.is_empty() && !tcs.is_empty()
{ {
tool_calls = Some(tcs); append_streamed_tool_calls(&mut tool_calls, tcs);
} }
if chunk.done { if chunk.done {
prompt_eval_count = chunk.prompt_eval_count; prompt_eval_count = chunk.prompt_eval_count;
@@ -1329,8 +1321,20 @@ struct OllamaEmbedResponse {
embeddings: Vec<Vec<f32>>, embeddings: Vec<Vec<f32>>,
} }
/// Accumulate tool calls streamed across NDJSON chunks. Ollama ≥0.8 may
/// emit each tool call on its own chunk; replacing the accumulator on every
/// chunk would keep only the last call, so extend instead.
fn append_streamed_tool_calls(
acc: &mut Option<Vec<crate::ai::llm_client::ToolCall>>,
new: Vec<crate::ai::llm_client::ToolCall>,
) {
acc.get_or_insert_with(Vec::new).extend(new);
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::append_streamed_tool_calls;
use crate::ai::llm_client::{ToolCall, ToolCallFunction};
#[test] #[test]
fn generate_photo_description_prompt_is_concise() { fn generate_photo_description_prompt_is_concise() {
@@ -1341,4 +1345,38 @@ mod tests {
Focus on the people, location, and activity."; Focus on the people, location, and activity.";
assert!(prompt.len() < 200, "Prompt should be concise"); assert!(prompt.len() < 200, "Prompt should be concise");
} }
fn call(name: &str) -> ToolCall {
ToolCall {
id: None,
function: ToolCallFunction {
name: name.to_string(),
arguments: serde_json::json!({}),
},
}
}
#[test]
fn streamed_tool_calls_across_chunks_accumulate() {
// Two tool calls arriving in two separate stream chunks must BOTH
// survive assembly — the old `tool_calls = Some(tcs)` kept only the
// last chunk's calls.
let mut acc: Option<Vec<ToolCall>> = None;
append_streamed_tool_calls(&mut acc, vec![call("get_sms_messages")]);
append_streamed_tool_calls(&mut acc, vec![call("reverse_geocode")]);
let calls = acc.expect("tool calls accumulated");
assert_eq!(calls.len(), 2);
assert_eq!(calls[0].function.name, "get_sms_messages");
assert_eq!(calls[1].function.name, "reverse_geocode");
}
#[test]
fn streamed_tool_calls_single_chunk_batch_kept_intact() {
// Older Ollama servers attach all calls to one chunk — unchanged.
let mut acc: Option<Vec<ToolCall>> = None;
append_streamed_tool_calls(&mut acc, vec![call("a"), call("b")]);
let calls = acc.expect("tool calls accumulated");
assert_eq!(calls.len(), 2);
}
} }
+10 -5
View File
@@ -529,16 +529,21 @@ mod tests {
opentelemetry::Context::new() opentelemetry::Context::new()
} }
/// Build a tempdir-backed library + DAOs sharing a single in-memory /// Everything `setup` hands back to a test: tempdir, library, shared
/// SQLite connection (so cross-table joins like /// connection, and the two DAOs. Aliased to keep clippy's
/// `list_unscanned_candidates` see consistent state). /// type-complexity lint satisfied.
fn setup() -> ( type SetupFixture = (
TempDir, TempDir,
Library, Library,
Arc<Mutex<diesel::SqliteConnection>>, Arc<Mutex<diesel::SqliteConnection>>,
Arc<Mutex<Box<dyn ExifDao>>>, Arc<Mutex<Box<dyn ExifDao>>>,
Arc<Mutex<Box<dyn FaceDao>>>, Arc<Mutex<Box<dyn FaceDao>>>,
) { );
/// Build a tempdir-backed library + DAOs sharing a single in-memory
/// SQLite connection (so cross-table joins like
/// `list_unscanned_candidates` see consistent state).
fn setup() -> SetupFixture {
let tmp = TempDir::new().expect("tempdir"); let tmp = TempDir::new().expect("tempdir");
let mut conn = in_memory_db_connection(); let mut conn = in_memory_db_connection();
// Migration seeds library id=1 with a placeholder root; rewrite it // Migration seeds library id=1 with a placeholder root; rewrite it
+118 -1
View File
@@ -47,7 +47,6 @@ pub trait InsightDao: Sync + Send {
paths: &[String], paths: &[String],
) -> Result<Option<PhotoInsight>, DbError>; ) -> Result<Option<PhotoInsight>, DbError>;
#[allow(dead_code)]
fn get_insight_history( fn get_insight_history(
&mut self, &mut self,
context: &opentelemetry::Context, context: &opentelemetry::Context,
@@ -82,6 +81,17 @@ pub trait InsightDao: Sync + Send {
approved: bool, approved: bool,
) -> Result<(), DbError>; ) -> Result<(), DbError>;
/// Rate a specific insight version by primary key, regardless of
/// `is_current`. Used by the per-file history view to approve/reject
/// previously generated (superseded) versions, which the path-based
/// `rate_insight` (current row only) cannot reach.
fn rate_insight_by_id(
&mut self,
context: &opentelemetry::Context,
insight_id: i32,
approved: bool,
) -> Result<(), DbError>;
fn get_approved_insights( fn get_approved_insights(
&mut self, &mut self,
context: &opentelemetry::Context, context: &opentelemetry::Context,
@@ -352,6 +362,26 @@ impl InsightDao for SqliteInsightDao {
.map_err(|e| DbError::log(DbErrorKind::UpdateError, e)) .map_err(|e| DbError::log(DbErrorKind::UpdateError, e))
} }
fn rate_insight_by_id(
&mut self,
context: &opentelemetry::Context,
target_id: i32,
is_approved: bool,
) -> Result<(), DbError> {
trace_db_call(context, "update", "rate_insight_by_id", |_span| {
use schema::photo_insights::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get InsightDao");
diesel::update(photo_insights.find(target_id))
.set(approved.eq(Some(is_approved)))
.execute(connection.deref_mut())
.map(|_| ())
.map_err(|e| anyhow::anyhow!("Update error: {}", e))
})
.map_err(|e| DbError::log(DbErrorKind::UpdateError, e))
}
fn get_approved_insights( fn get_approved_insights(
&mut self, &mut self,
context: &opentelemetry::Context, context: &opentelemetry::Context,
@@ -396,3 +426,90 @@ impl InsightDao for SqliteInsightDao {
.map_err(|e| DbError::log(DbErrorKind::UpdateError, e)) .map_err(|e| DbError::log(DbErrorKind::UpdateError, e))
} }
} }
#[cfg(test)]
mod tests {
use super::*;
use crate::database::test::in_memory_db_connection;
fn dao() -> SqliteInsightDao {
let conn = Arc::new(Mutex::new(in_memory_db_connection()));
SqliteInsightDao::from_connection(conn)
}
/// Build an insight insert with sensible defaults; tests override the
/// fields they care about (path, generated_at, model).
fn insert(path: &str, generated_at: i64, model: &str) -> InsertPhotoInsight {
InsertPhotoInsight {
library_id: 1,
file_path: path.to_string(),
title: format!("title for {model}"),
summary: "summary".to_string(),
generated_at,
model_version: model.to_string(),
is_current: true,
training_messages: None,
backend: "local".to_string(),
fewshot_source_ids: None,
content_hash: None,
num_ctx: None,
temperature: None,
top_p: None,
top_k: None,
min_p: None,
system_prompt: None,
persona_id: None,
prompt_eval_count: None,
eval_count: None,
}
}
#[test]
fn get_insight_history_returns_all_versions_newest_first() {
let cx = opentelemetry::Context::new();
let mut dao = dao();
// store_insight flips prior rows to is_current=false, so three
// generations for the same path leave a 3-row history.
dao.store_insight(&cx, insert("a.jpg", 100, "m1")).unwrap();
dao.store_insight(&cx, insert("a.jpg", 200, "m2")).unwrap();
dao.store_insight(&cx, insert("a.jpg", 300, "m3")).unwrap();
// A different path must not leak into the history.
dao.store_insight(&cx, insert("b.jpg", 250, "other"))
.unwrap();
let history = dao.get_insight_history(&cx, "a.jpg").unwrap();
assert_eq!(history.len(), 3);
assert_eq!(
history.iter().map(|i| i.generated_at).collect::<Vec<_>>(),
vec![300, 200, 100],
"history should be newest-first"
);
// Exactly one version is current (the latest generation).
let current: Vec<_> = history.iter().filter(|i| i.is_current).collect();
assert_eq!(current.len(), 1);
assert_eq!(current[0].generated_at, 300);
}
#[test]
fn rate_insight_by_id_rates_only_the_targeted_version() {
let cx = opentelemetry::Context::new();
let mut dao = dao();
dao.store_insight(&cx, insert("a.jpg", 100, "m1")).unwrap();
dao.store_insight(&cx, insert("a.jpg", 200, "m2")).unwrap();
// History is newest-first: [200 (current), 100 (superseded)].
let history = dao.get_insight_history(&cx, "a.jpg").unwrap();
let old_version = history.iter().find(|i| i.generated_at == 100).unwrap();
assert!(!old_version.is_current);
dao.rate_insight_by_id(&cx, old_version.id, true).unwrap();
let history = dao.get_insight_history(&cx, "a.jpg").unwrap();
let old = history.iter().find(|i| i.generated_at == 100).unwrap();
let current = history.iter().find(|i| i.generated_at == 200).unwrap();
assert_eq!(old.approved, Some(true), "targeted version is rated");
assert_eq!(current.approved, None, "current version is untouched");
}
}
+1 -1
View File
@@ -1052,7 +1052,7 @@ mod tests {
enabled: true, enabled: true,
excluded_dirs: Vec::new(), excluded_dirs: Vec::new(),
}; };
let map = new_health_map(&[lib.clone()]); let map = new_health_map(std::slice::from_ref(&lib));
// First probe: empty dir, no prior data — Online. // First probe: empty dir, no prior data — Online.
let s1 = refresh_health(&map, &lib, false); let s1 = refresh_health(&map, &lib, false);
+1
View File
@@ -351,6 +351,7 @@ fn main() -> std::io::Result<()> {
.service(ai::get_insight_handler) .service(ai::get_insight_handler)
.service(ai::delete_insight_handler) .service(ai::delete_insight_handler)
.service(ai::get_all_insights_handler) .service(ai::get_all_insights_handler)
.service(ai::get_insight_history_handler)
.service(ai::get_available_models_handler) .service(ai::get_available_models_handler)
.service(ai::get_openrouter_models_handler) .service(ai::get_openrouter_models_handler)
.service(ai::chat_turn_handler) .service(ai::chat_turn_handler)