6 Commits

Author SHA1 Message Date
Cameron Cordes 31904fef80 Raise chat truncation default num_ctx to 32k, env-overridable
The history-truncation budget assumed an 8192-token context whenever a
chat request omitted num_ctx, while the llama-swap chat slots serve
20k-131k. Replayed transcripts past ~6k tokens were silently gutted
every turn — losing conversation history and destroying llama.cpp
KV-cache prefix reuse (full SWA re-prefill per turn).

Default is now 32768 (real conversations top out around 16k), with
AGENTIC_CHAT_DEFAULT_NUM_CTX to override per deploy, floored at
headroom + 1024. Explicit per-request num_ctx still wins.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-09 19:14:02 -04:00
Cameron Cordes 13f3635db2 Fix clippy lints in backfill and libraries tests
Keep `cargo clippy --tests` clean alongside the agentic-loop changes:
alias backfill's five-element setup() tuple as SetupFixture
(type_complexity) and build the single-library health map via
std::slice::from_ref instead of cloning (unnecessary clone-to-slice).
No behavior change.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-09 18:29:44 -04:00
Cameron Cordes b711252c23 Resolve persona prompts server-side; drop synthetic prompt in chat_turn
A request carrying persona_id but no system_prompt used to fall back to
the neutral default voice. Both agentic generation
(generate_agentic_insight_handler) and chat bootstrap now resolve the
persona's stored prompt from the persona store, with precedence:
explicit non-blank client system_prompt > persona store lookup >
existing default ("default" persona id behaves the same — used if the
store has a row, neutral default otherwise). Resolution happens at the
handler / bootstrap entry where the DAO is reachable; internals are
unchanged. resolve_bootstrap_system_prompt takes the resolved persona
prompt as a second argument, with precedence tests.

Also in insight_chat:

- Sync chat_turn no longer persists the synthetic "Please write your
  final answer now without calling any more tools." user message pushed
  on iteration exhaustion — extracted both streaming variants'
  synthetic_idx pattern into push/remove_synthetic_final_prompt (the
  remove is a defensive no-op on index drift) and applied it to all
  three loops; round-trip test included.
- Strip leaked <think> blocks from the final content persisted as the
  reply in chat_turn and both streaming AgenticLoopOutcomes (mid-stream
  TextDeltas are untouched; the raw transcript keeps the block).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-09 18:29:35 -04:00
Cameron Cordes 091982bdfc Add recall_facts_for_entity tool; fix generation gates and tool output
Agentic-loop fixes in the generator:

- New recall_facts_for_entity tool (always-on, like recall_entities):
  fetches facts for one entity by id so the model can follow up on
  entities surfaced by recall_entities that aren't photo-linked
  (recall_facts_for_photo only covers linked entities). Mirrors that
  tool's persona scoping (PersonaFilter::Single) and the persona's
  reviewed_only_facts filter exactly, and renders in the same
  "Entity: ... / - predicate object" style. Wired through execute_tool
  and the trajectory summarizer.
- Generation now resolves gates persona-aware:
  current_gate_opts_for_persona(images_inline, Some((user_id,
  persona_id))) instead of the None-defaulting wrapper, so a persona's
  allow_agent_corrections opens propose_correction during generation the
  same way chat turns already did. The now-unused current_gate_opts
  wrapper is removed.
- Strip leaked <think> blocks from the final assistant content before
  parse_title_body / store_insight (raw training transcript keeps them).
- Honest truncation labels: get_sms_messages and get_location_history
  said "Found N ..." while listing only the first K; found_header now
  emits "Found N ... (showing first K):" when truncated, and the
  summarizer still parses the count.
- Clamp days_radius in get_calendar_events and get_location_history to
  1..=30, matching get_sms_messages.
- persona_system_prompt helper (persona store lookup, blank-prompt ->
  None) for server-side persona resolution; callers land in the next
  commit.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-09 18:29:20 -04:00
Cameron Cordes 592dfcb42c Accumulate streamed tool calls across chunks in Ollama streaming
Ollama >=0.8 can stream tool_calls incrementally across NDJSON chunks;
chat_with_tools_stream did `tool_calls = Some(tcs)` per chunk, so only
the last chunk's calls survived assembly and earlier calls were silently
dropped. Append into the accumulator instead.

- ollama: append_streamed_tool_calls helper + tests covering two calls
  arriving in separate chunks and the single-chunk batch case.
- llamacpp: the SSE delta assembly was already correct (per-index
  BTreeMap, same-index argument fragments concatenate, distinct indexes
  accumulate); extracted it into apply_tool_call_deltas /
  finalize_tool_calls and added tests pinning that behavior.
- llm_client: new shared strip_think_blocks (moved from ollama's private
  extract_final_answer, which now delegates) so the tool-calling final
  content paths can reuse it; unit tests for tagged/plain/unclosed/empty
  cases.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-09 18:29:06 -04:00
Cameron Cordes 8e4f91561b Add per-file insight history endpoint and rate-by-id
Expose GET /insights/history?path=... returning every generated version
of a photo's insight (current plus superseded), newest-first, backing the
mobile per-file insight history view.

- New get_insight_history_handler; reuses the existing get_insight_history
  DAO method (removed its dead_code allow).
- impl From<PhotoInsight> for PhotoInsightResponse, collapsing the mapping
  that was duplicated across the single-get and all-insights handlers.
- rate_insight_by_id DAO method + optional insight_id on RateInsightRequest
  so previously generated versions can be approved/rejected (the path-based
  rate only touches the current row).
- DAO tests for history ordering/scoping and id-targeted rating.
- cargo fmt normalized a multi-line assert in insight_chat.rs tests.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 18:28:22 -04:00
12 changed files with 837 additions and 183 deletions
+10 -2
View File
@@ -671,6 +671,11 @@ LLAMA_SWAP_TTS_REQUEST_TIMEOUT_SECONDS=600 # Per-request synth timeout (long
# Insight Chat Continuation
AGENTIC_CHAT_MAX_ITERATIONS=6 # Cap on tool-calling iterations per chat turn (default 6)
AGENTIC_CHAT_DEFAULT_NUM_CTX=32768 # Assumed context window for the history-truncation budget
# when a chat request omits num_ctx (default 32768). Size to
# the smallest context among the chat models actually served;
# too small silently guts replayed history every turn (and
# destroys llama.cpp KV-cache prefix reuse).
```
**AI Insights Fallback Behavior:**
@@ -794,14 +799,17 @@ Per-`(library_id, file_path)` async mutex (`AppState.insight_chat.chat_locks`)
serialises concurrent turns on the same insight so the JSON blob doesn't race.
Context management is a soft bound: if the serialized history exceeds
`num_ctx - 2048` tokens (cheap 4-byte/token heuristic), the oldest
assistant-tool_call + tool_result pairs are dropped until under budget. The
`num_ctx - 2048` tokens (cheap 4-byte/token heuristic; `num_ctx` defaults
to `AGENTIC_CHAT_DEFAULT_NUM_CTX`, 32768, when the request omits it), the
oldest assistant-tool_call + tool_result pairs are dropped until under budget. The
initial user message (with any images) and system prompt are always preserved.
The `truncated` event / flag is surfaced to the client when a drop occurred.
Configurable env:
- `AGENTIC_CHAT_MAX_ITERATIONS` — cap on tool-calling iterations per turn
(default 6). Per-request `max_iterations` is clamped to this cap.
- `AGENTIC_CHAT_DEFAULT_NUM_CTX` — assumed context window for the truncation
budget when the request omits `num_ctx` (default 32768).
**Apollo Places integration (optional):**
+92 -47
View File
@@ -8,7 +8,7 @@ use crate::ai::insight_chat::{ChatStreamEvent, ChatTurnRequest};
use crate::ai::ollama::ChatMessage;
use crate::ai::{ModelCapabilities, OllamaClient};
use crate::data::Claims;
use crate::database::models::{InsightGenerationType, InsightJobStatus};
use crate::database::models::{InsightGenerationType, InsightJobStatus, PhotoInsight};
use crate::database::{ExifDao, InsightDao};
use crate::libraries;
use crate::otel::{extract_context_from_request, global_tracer};
@@ -273,6 +273,11 @@ pub async fn cancel_generation_handler(
pub struct RateInsightRequest {
pub file_path: String,
pub approved: bool,
/// When set, rate this specific insight version by primary key
/// (used by the per-file history view to rate superseded versions).
/// When omitted, the current insight for `file_path` is rated.
#[serde(default)]
pub insight_id: Option<i32>,
}
#[derive(Debug, Deserialize)]
@@ -333,6 +338,31 @@ pub struct PhotoInsightResponse {
pub persona_id: Option<String>,
}
impl From<PhotoInsight> for PhotoInsightResponse {
fn from(insight: PhotoInsight) -> Self {
PhotoInsightResponse {
id: insight.id,
file_path: insight.file_path,
title: insight.title,
summary: insight.summary,
generated_at: insight.generated_at,
model_version: insight.model_version,
prompt_eval_count: insight.prompt_eval_count,
eval_count: insight.eval_count,
approved: insight.approved,
has_training_messages: insight.training_messages.is_some(),
backend: insight.backend,
num_ctx: insight.num_ctx,
temperature: insight.temperature,
top_p: insight.top_p,
top_k: insight.top_k,
min_p: insight.min_p,
system_prompt: insight.system_prompt,
persona_id: insight.persona_id,
}
}
}
#[derive(Debug, Serialize)]
pub struct AvailableModelsResponse {
pub primary: ServerModels,
@@ -554,29 +584,7 @@ pub async fn get_insight_handler(
let mut dao = insight_dao.lock().expect("Unable to lock InsightDao");
match dao.get_insight_for_paths(&otel_context, &sibling_paths) {
Ok(Some(insight)) => {
let response = PhotoInsightResponse {
id: insight.id,
file_path: insight.file_path,
title: insight.title,
summary: insight.summary,
generated_at: insight.generated_at,
model_version: insight.model_version,
prompt_eval_count: insight.prompt_eval_count,
eval_count: insight.eval_count,
approved: insight.approved,
has_training_messages: insight.training_messages.is_some(),
backend: insight.backend,
num_ctx: insight.num_ctx,
temperature: insight.temperature,
top_p: insight.top_p,
top_k: insight.top_k,
min_p: insight.min_p,
system_prompt: insight.system_prompt,
persona_id: insight.persona_id,
};
HttpResponse::Ok().json(response)
}
Ok(Some(insight)) => HttpResponse::Ok().json(PhotoInsightResponse::from(insight)),
Ok(None) => HttpResponse::NotFound().json(serde_json::json!({
"error": "Insight not found"
})),
@@ -631,26 +639,7 @@ pub async fn get_all_insights_handler(
Ok(insights) => {
let responses: Vec<PhotoInsightResponse> = insights
.into_iter()
.map(|insight| PhotoInsightResponse {
id: insight.id,
file_path: insight.file_path,
title: insight.title,
summary: insight.summary,
generated_at: insight.generated_at,
model_version: insight.model_version,
prompt_eval_count: insight.prompt_eval_count,
eval_count: insight.eval_count,
approved: insight.approved,
has_training_messages: insight.training_messages.is_some(),
backend: insight.backend,
num_ctx: insight.num_ctx,
temperature: insight.temperature,
top_p: insight.top_p,
top_k: insight.top_k,
min_p: insight.min_p,
system_prompt: insight.system_prompt,
persona_id: insight.persona_id,
})
.map(PhotoInsightResponse::from)
.collect();
HttpResponse::Ok().json(responses)
@@ -664,6 +653,39 @@ pub async fn get_all_insights_handler(
}
}
/// GET /insights/history?path=/path/to/photo.jpg - Get all insight versions
/// for a single photo (current plus previously generated/superseded ones),
/// newest first. Backs the per-file insight history view.
#[get("/insights/history")]
pub async fn get_insight_history_handler(
_claims: Claims,
query: web::Query<GetPhotoInsightQuery>,
insight_dao: web::Data<std::sync::Mutex<Box<dyn InsightDao>>>,
) -> impl Responder {
let normalized_path = normalize_path(&query.path);
log::debug!("Fetching insight history for {}", normalized_path);
let otel_context = opentelemetry::Context::new();
let mut dao = insight_dao.lock().expect("Unable to lock InsightDao");
match dao.get_insight_history(&otel_context, &normalized_path) {
Ok(insights) => {
let responses: Vec<PhotoInsightResponse> = insights
.into_iter()
.map(PhotoInsightResponse::from)
.collect();
HttpResponse::Ok().json(responses)
}
Err(e) => {
log::error!("Failed to fetch insight history ({}): {:?}", &query.path, e);
HttpResponse::InternalServerError().json(serde_json::json!({
"error": format!("Failed to fetch insight history: {:?}", e)
}))
}
}
}
/// POST /insights/generate/agentic - Generate insight using agentic tool-calling loop (async)
#[post("/insights/generate/agentic")]
pub async fn generate_agentic_insight_handler(
@@ -787,6 +809,21 @@ pub async fn generate_agentic_insight_handler(
.filter(|s| !s.trim().is_empty())
.unwrap_or_else(|| "default".to_string());
// Server-side persona resolution: an explicit client `system_prompt`
// wins; otherwise the persona's stored prompt from the persona store;
// otherwise None and `build_system_content` applies its neutral
// default. Without the lookup, a request carrying only `persona_id`
// silently generated in the default voice.
let system_prompt = request
.system_prompt
.clone()
.filter(|s| !s.trim().is_empty())
.or_else(|| {
app_state
.insight_generator
.persona_system_prompt(user_id, &persona_id)
});
let max_iterations: usize = std::env::var("AGENTIC_MAX_ITERATIONS")
.ok()
.and_then(|v| v.parse().ok())
@@ -812,7 +849,7 @@ pub async fn generate_agentic_insight_handler(
generator_for_task.generate_agentic_insight_for_photo(
&path_for_task,
request.model.clone(),
request.system_prompt.clone(),
system_prompt,
request.num_ctx,
request.temperature,
request.top_p,
@@ -1012,15 +1049,23 @@ pub async fn rate_insight_handler(
) -> impl Responder {
let normalized_path = normalize_path(&request.file_path);
log::info!(
"Rating insight for {}: approved={}",
"Rating insight for {} (id={:?}): approved={}",
normalized_path,
request.insight_id,
request.approved
);
let otel_context = opentelemetry::Context::new();
let mut dao = insight_dao.lock().expect("Unable to lock InsightDao");
match dao.rate_insight(&otel_context, &normalized_path, request.approved) {
// Rate a specific version by id when provided (history view), otherwise
// rate the current insight for the path.
let result = match request.insight_id {
Some(id) => dao.rate_insight_by_id(&otel_context, id, request.approved),
None => dao.rate_insight(&otel_context, &normalized_path, request.approved),
};
match result {
Ok(()) => HttpResponse::Ok().json(serde_json::json!({
"success": true,
"message": "Insight rated successfully"
+169 -34
View File
@@ -19,7 +19,13 @@ use futures::stream::{BoxStream, StreamExt};
use uuid::Uuid;
const DEFAULT_MAX_ITERATIONS: usize = 6;
const DEFAULT_NUM_CTX: i32 = 8192;
/// Assumed context window when the request doesn't specify `num_ctx`.
/// The llama-swap chat slots serve 20k-131k contexts and real conversations
/// rarely pass ~16k tokens, so 32k keeps the truncation pass from gutting
/// history that the server could comfortably hold (which also destroys the
/// server's KV-cache prefix reuse). Override per-deploy with
/// AGENTIC_CHAT_DEFAULT_NUM_CTX if the serving models change shape.
const DEFAULT_NUM_CTX: i32 = 32768;
/// Headroom reserved for the model's response, deducted from the context
/// budget when deciding whether to truncate the replayed history.
const RESPONSE_HEADROOM_TOKENS: usize = 2048;
@@ -33,6 +39,12 @@ const BYTES_PER_TOKEN: usize = 4;
/// characters) must NOT be counted as text bytes — doing so dwarfs the entire
/// text budget and forces spurious truncation on every turn.
const IMAGE_TOKENS_EACH: usize = 1300;
/// User prompt injected when the agentic loop exhausts its iteration budget
/// without producing a tool-free reply. Internal scaffolding only — it is
/// stripped from the transcript before persistence (see
/// [`push_synthetic_final_prompt`] / [`remove_synthetic_final_prompt`]).
const SYNTHETIC_FINAL_ANSWER_PROMPT: &str =
"Please write your final answer now without calling any more tools.";
pub type ChatLockMap = Arc<TokioMutex<HashMap<(i32, String), Arc<TokioMutex<()>>>>>;
@@ -361,7 +373,7 @@ impl InsightChatService {
// 6. Apply truncation budget. Drops oldest tool_call+tool pairs
// (preserves system + first user including any images).
let budget_tokens = (req.num_ctx.unwrap_or(DEFAULT_NUM_CTX) as usize)
let budget_tokens = (req.num_ctx.unwrap_or_else(env_default_num_ctx) as usize)
.saturating_sub(RESPONSE_HEADROOM_TOKENS);
let budget_bytes = budget_tokens.saturating_mul(BYTES_PER_TOKEN);
let truncated = apply_context_budget(&mut messages, budget_bytes);
@@ -457,9 +469,7 @@ impl InsightChatService {
"Chat loop exhausted after {} iterations, requesting final answer",
iterations_used
);
messages.push(ChatMessage::user(
"Please write your final answer now without calling any more tools.",
));
let synthetic_idx = push_synthetic_final_prompt(&mut messages);
let (final_response, prompt_tokens, eval_tokens) = backend
.chat()
.chat_with_tools(messages.clone(), vec![])
@@ -468,8 +478,15 @@ impl InsightChatService {
last_eval_count = eval_tokens;
final_content = final_response.content.clone();
messages.push(final_response);
// Drop the synthetic prompt before persistence — internal
// scaffolding only (mirrors both streaming variants).
remove_synthetic_final_prompt(&mut messages, synthetic_idx);
}
// Strip any leaked <think> reasoning block from the content we
// return / persist as the reply (the raw transcript keeps it).
let final_content = crate::ai::llm_client::strip_think_blocks(&final_content);
loop_cx.span().set_status(Status::Ok);
// Drop the per-turn iteration-budget note from the system message
@@ -853,7 +870,7 @@ impl InsightChatService {
None
};
let budget_tokens = (req.num_ctx.unwrap_or(DEFAULT_NUM_CTX) as usize)
let budget_tokens = (req.num_ctx.unwrap_or_else(env_default_num_ctx) as usize)
.saturating_sub(RESPONSE_HEADROOM_TOKENS);
let budget_bytes = budget_tokens.saturating_mul(BYTES_PER_TOKEN);
let truncated = apply_context_budget(&mut messages, budget_bytes);
@@ -1039,7 +1056,12 @@ impl InsightChatService {
);
let tools = InsightGenerator::build_tool_definitions(gate_opts);
let persona = resolve_bootstrap_system_prompt(req.system_prompt.as_deref());
// Server-side persona resolution: explicit client system_prompt wins;
// else the active persona's stored prompt; else the neutral default.
let persona_prompt = self
.generator
.persona_system_prompt(req.user_id, &active_persona);
let persona = resolve_bootstrap_system_prompt(req.system_prompt.as_deref(), persona_prompt);
let system_content = build_bootstrap_system_message(
&persona,
&normalized,
@@ -1263,10 +1285,7 @@ impl InsightChatService {
// No-tools fallback
if final_content.is_empty() {
let synthetic_idx = messages.len();
messages.push(ChatMessage::user(
"Please write your final answer now without calling any more tools.",
));
let synthetic_idx = push_synthetic_final_prompt(messages);
let mut stream = backend
.chat()
.chat_with_tools_stream(messages.clone(), vec![])
@@ -1294,7 +1313,7 @@ impl InsightChatService {
final_message.ok_or_else(|| anyhow!("final stream ended without a Done event"))?;
final_content = final_response.content.clone();
messages.push(final_response);
messages.remove(synthetic_idx);
remove_synthetic_final_prompt(messages, synthetic_idx);
}
Ok(AgenticLoopOutcome {
@@ -1302,7 +1321,9 @@ impl InsightChatService {
iterations_used,
last_prompt_eval_count,
last_eval_count,
final_content,
// Strip any leaked <think> reasoning block from the content the
// caller persists as title/summary (the raw transcript keeps it).
final_content: crate::ai::llm_client::strip_think_blocks(&final_content),
cancelled: false,
})
}
@@ -1431,7 +1452,7 @@ impl InsightChatService {
};
// Truncate before appending the new user turn.
let budget_tokens = (req.num_ctx.unwrap_or(DEFAULT_NUM_CTX) as usize)
let budget_tokens = (req.num_ctx.unwrap_or_else(env_default_num_ctx) as usize)
.saturating_sub(RESPONSE_HEADROOM_TOKENS);
let budget_bytes = budget_tokens.saturating_mul(BYTES_PER_TOKEN);
let truncated = apply_context_budget(&mut messages, budget_bytes);
@@ -1648,7 +1669,12 @@ impl InsightChatService {
// get_sms_messages / reverse_geocode / get_personal_place_at
// the args they need. In hybrid mode the visual description
// belongs here for the same reason.
let persona = resolve_bootstrap_system_prompt(req.system_prompt.as_deref());
// Server-side persona resolution: explicit client system_prompt wins;
// else the active persona's stored prompt; else the neutral default.
let persona_prompt = self
.generator
.persona_system_prompt(req.user_id, &active_persona);
let persona = resolve_bootstrap_system_prompt(req.system_prompt.as_deref(), persona_prompt);
let system_content = build_bootstrap_system_message(
&persona,
&normalized,
@@ -1866,10 +1892,7 @@ impl InsightChatService {
// and load_history's user-turn handler doesn't reset
// pending_tools at this position (wiping the prior tool
// calls from the final assistant render).
let synthetic_idx = messages.len();
messages.push(ChatMessage::user(
"Please write your final answer now without calling any more tools.",
));
let synthetic_idx = push_synthetic_final_prompt(messages);
let mut stream = backend
.chat()
.chat_with_tools_stream(messages.clone(), vec![])
@@ -1900,7 +1923,7 @@ impl InsightChatService {
// Drop the synthetic prompt — internal scaffolding only. The
// model's final_response (now at the end) was generated with
// it in context and reads coherently without it on replay.
messages.remove(synthetic_idx);
remove_synthetic_final_prompt(messages, synthetic_idx);
}
Ok(AgenticLoopOutcome {
@@ -1908,7 +1931,9 @@ impl InsightChatService {
iterations_used,
last_prompt_eval_count,
last_eval_count,
final_content,
// Strip any leaked <think> reasoning block from the content the
// caller persists as title/summary (the raw transcript keeps it).
final_content: crate::ai::llm_client::strip_think_blocks(&final_content),
cancelled: false,
})
}
@@ -1921,15 +1946,21 @@ const BOOTSTRAP_DEFAULT_SYSTEM_PROMPT: &str = "You are a helpful AI assistant an
Use the available tools to gather context and answer their questions \
in a conversational tone.";
/// Pick the system prompt for bootstrap. Trimmed-non-empty supplied wins;
/// otherwise fall back to [`BOOTSTRAP_DEFAULT_SYSTEM_PROMPT`]. Returns an
/// owned `String` because the bootstrap caller persists it on the new
/// insight row.
fn resolve_bootstrap_system_prompt(supplied: Option<&str>) -> String {
/// Pick the system prompt for bootstrap. Precedence: trimmed-non-empty
/// `supplied` (the client's explicit `system_prompt`) wins; else
/// `persona_prompt` (the active persona's stored prompt, resolved
/// server-side from the persona store); else
/// [`BOOTSTRAP_DEFAULT_SYSTEM_PROMPT`]. Returns an owned `String` because
/// the bootstrap caller persists it on the new insight row.
fn resolve_bootstrap_system_prompt(
supplied: Option<&str>,
persona_prompt: Option<String>,
) -> String {
supplied
.map(str::trim)
.filter(|s| !s.is_empty())
.map(str::to_string)
.or_else(|| persona_prompt.filter(|s| !s.trim().is_empty()))
.unwrap_or_else(|| BOOTSTRAP_DEFAULT_SYSTEM_PROMPT.to_string())
}
@@ -2166,6 +2197,17 @@ fn env_max_iterations() -> usize {
.max(1)
}
/// Read AGENTIC_CHAT_DEFAULT_NUM_CTX once per call — the assumed context
/// window for the truncation budget when the request omits `num_ctx`. Same
/// no-static-global rationale as `env_max_iterations` above.
fn env_default_num_ctx() -> i32 {
std::env::var("AGENTIC_CHAT_DEFAULT_NUM_CTX")
.ok()
.and_then(|s| s.parse::<i32>().ok())
.unwrap_or(DEFAULT_NUM_CTX)
.max(RESPONSE_HEADROOM_TOKENS as i32 + 1024)
}
/// Append a per-turn iteration-budget reminder to the replayed system
/// message so the model knows how many tool-calling rounds this turn gets.
/// Returns the original `content` so the caller can restore it before
@@ -2200,6 +2242,30 @@ fn restore_system_content(messages: &mut [ChatMessage], original: Option<String>
}
}
/// Append the synthetic "write your final answer" user prompt, returning the
/// index the caller must later hand to [`remove_synthetic_final_prompt`].
/// Used when the agentic loop exhausts its budget: the model gets one more
/// (tool-free) request, but the nudge itself must never persist — it would
/// render as a user bubble in the transcript and reset `load_history`'s
/// pending-tools tracking at that position.
fn push_synthetic_final_prompt(messages: &mut Vec<ChatMessage>) -> usize {
let idx = messages.len();
messages.push(ChatMessage::user(SYNTHETIC_FINAL_ANSWER_PROMPT));
idx
}
/// Remove the synthetic prompt inserted by [`push_synthetic_final_prompt`].
/// Defensive no-op when the message at `idx` isn't the synthetic prompt —
/// guards against index drift if the surrounding code is reordered.
fn remove_synthetic_final_prompt(messages: &mut Vec<ChatMessage>, idx: usize) {
if messages
.get(idx)
.is_some_and(|m| m.role == "user" && m.content == SYNTHETIC_FINAL_ANSWER_PROMPT)
{
messages.remove(idx);
}
}
/// Receipt produced by [`apply_system_prompt_override`] so the caller can
/// undo the override before persistence. Two variants because we either
/// replaced an existing system message (need its original content) or
@@ -2467,13 +2533,16 @@ mod tests {
assistant_text("here is the answer"),
];
// Default budget: (8192 - 2048) * 4 bytes ≈ 24KB. The text easily fits;
// only the (excluded) image bytes could blow it.
// Default budget: (32768 - 2048) * 4 bytes ≈ 120KB. The text easily
// fits; only the (excluded) image bytes could blow it.
let budget_bytes = (DEFAULT_NUM_CTX as usize - RESPONSE_HEADROOM_TOKENS) * BYTES_PER_TOKEN;
let original_len = msgs.len();
let dropped = apply_context_budget(&mut msgs, budget_bytes);
assert!(!dropped, "short conversation with one image must not truncate");
assert!(
!dropped,
"short conversation with one image must not truncate"
);
assert_eq!(msgs.len(), original_len, "no messages should be dropped");
// Sanity: the flat image charge is accounted for but stays well under budget.
assert!(estimate_bytes(&msgs) <= budget_bytes);
@@ -2640,26 +2709,26 @@ mod tests {
#[test]
fn bootstrap_system_prompt_falls_back_to_default_for_none() {
let out = resolve_bootstrap_system_prompt(None);
let out = resolve_bootstrap_system_prompt(None, None);
assert_eq!(out, BOOTSTRAP_DEFAULT_SYSTEM_PROMPT);
}
#[test]
fn bootstrap_system_prompt_falls_back_to_default_for_empty_string() {
// Apollo currently sends `''` when no persona is selected.
let out = resolve_bootstrap_system_prompt(Some(""));
let out = resolve_bootstrap_system_prompt(Some(""), None);
assert_eq!(out, BOOTSTRAP_DEFAULT_SYSTEM_PROMPT);
}
#[test]
fn bootstrap_system_prompt_falls_back_to_default_for_whitespace() {
let out = resolve_bootstrap_system_prompt(Some(" \n\t "));
let out = resolve_bootstrap_system_prompt(Some(" \n\t "), None);
assert_eq!(out, BOOTSTRAP_DEFAULT_SYSTEM_PROMPT);
}
#[test]
fn bootstrap_system_prompt_uses_supplied_when_non_empty() {
let out = resolve_bootstrap_system_prompt(Some("you are a journal"));
let out = resolve_bootstrap_system_prompt(Some("you are a journal"), None);
assert_eq!(out, "you are a journal");
}
@@ -2668,10 +2737,76 @@ mod tests {
// Trim only happens at the edges — interior newlines and spacing
// (which Apollo's persona uses for tool listings) must survive.
let prompt = "line one\nline two\n bullet";
let out = resolve_bootstrap_system_prompt(Some(prompt));
let out = resolve_bootstrap_system_prompt(Some(prompt), None);
assert_eq!(out, prompt);
}
#[test]
fn bootstrap_system_prompt_explicit_wins_over_persona_store() {
let out = resolve_bootstrap_system_prompt(
Some("explicit prompt"),
Some("stored persona prompt".to_string()),
);
assert_eq!(out, "explicit prompt");
}
#[test]
fn bootstrap_system_prompt_uses_persona_store_when_no_explicit() {
// Request carried persona_id but no system_prompt — the persona's
// stored prompt must be used, not the neutral default.
let out = resolve_bootstrap_system_prompt(None, Some("stored persona prompt".to_string()));
assert_eq!(out, "stored persona prompt");
// Empty explicit prompt behaves like None.
let out =
resolve_bootstrap_system_prompt(Some(""), Some("stored persona prompt".to_string()));
assert_eq!(out, "stored persona prompt");
}
#[test]
fn bootstrap_system_prompt_blank_persona_prompt_falls_to_default() {
let out = resolve_bootstrap_system_prompt(None, Some(" ".to_string()));
assert_eq!(out, BOOTSTRAP_DEFAULT_SYSTEM_PROMPT);
}
// ── Synthetic final-answer prompt scaffolding ──────────────────────
#[test]
fn synthetic_final_prompt_round_trip_leaves_no_scaffolding() {
// Exhausted-loop fallback: nudge pushed, model reply appended, nudge
// removed — the persisted transcript must contain the reply but not
// the synthetic user prompt (all three loop variants rely on this).
let mut msgs = vec![
ChatMessage::system("sys"),
ChatMessage::user("q"),
assistant_with_tool_call("lookup"),
ChatMessage::tool_result("data"),
];
let idx = push_synthetic_final_prompt(&mut msgs);
assert_eq!(msgs[idx].content, SYNTHETIC_FINAL_ANSWER_PROMPT);
msgs.push(assistant_text("final answer"));
remove_synthetic_final_prompt(&mut msgs, idx);
assert_eq!(msgs.len(), 5);
assert!(
msgs.iter()
.all(|m| m.content != SYNTHETIC_FINAL_ANSWER_PROMPT),
"synthetic prompt must not persist"
);
assert_eq!(msgs.last().unwrap().content, "final answer");
}
#[test]
fn remove_synthetic_final_prompt_is_noop_on_index_mismatch() {
// Defensive guard: if the message at idx isn't the synthetic prompt
// (index drift), nothing is removed.
let mut msgs = vec![ChatMessage::user("q"), assistant_text("a")];
remove_synthetic_final_prompt(&mut msgs, 0);
remove_synthetic_final_prompt(&mut msgs, 5);
assert_eq!(msgs.len(), 2);
}
#[test]
fn bootstrap_backend_defaults_to_local_when_none() {
let out = resolve_bootstrap_backend(None).unwrap();
+209 -24
View File
@@ -28,6 +28,11 @@ use crate::otel::global_tracer;
use crate::tags::TagDao;
use crate::utils::{earliest_fs_time, normalize_path};
/// Max location records rendered by `tool_get_location_history`. The DAO
/// query is range-bounded, not limited, so the tool caps the rendered list
/// and labels the truncation via `found_header`.
const LOCATION_HISTORY_DISPLAY_LIMIT: usize = 20;
/// Parse a "Title: ...\n\n<body>" response into (title, body).
/// Falls back to the first sentence as the title if the model didn't
/// follow the format.
@@ -218,15 +223,11 @@ impl InsightGenerator {
/// be called once per chat turn / generation. `has_vision` is
/// supplied by the caller because it depends on the model selected
/// for this turn, not on persistent state.
pub fn current_gate_opts(&self, has_vision: bool) -> ToolGateOpts {
self.current_gate_opts_for_persona(has_vision, None)
}
/// Same as `current_gate_opts` but resolves the per-persona
/// `allow_agent_corrections` flag too. Pass `Some((user_id,
/// persona_id))` when generating in a persona context (every chat
/// turn does); pass `None` for callers that don't have one yet
/// (cold paths, populate_knowledge bin), which defaults the gate
///
/// Also resolves the per-persona `allow_agent_corrections` flag.
/// Pass `Some((user_id, persona_id))` when generating in a persona
/// context (every chat turn and agentic generation does); pass
/// `None` for callers that don't have one, which defaults the gate
/// to closed — the conservative posture.
pub fn current_gate_opts_for_persona(
&self,
@@ -277,6 +278,22 @@ impl InsightGenerator {
}
}
/// Resolve the stored system prompt for `(user_id, persona_id)` from
/// the persona store. Returns `None` when the persona row doesn't
/// exist or its prompt is blank — callers fall back to their own
/// default. Used for server-side persona resolution: a request that
/// carries `persona_id` but no explicit `system_prompt` should speak
/// in the persona's stored voice, not the neutral default.
pub(crate) fn persona_system_prompt(&self, user_id: i32, persona_id: &str) -> Option<String> {
let cx = opentelemetry::Context::new();
let mut pdao = self.persona_dao.lock().expect("Unable to lock PersonaDao");
pdao.get_persona(&cx, user_id, persona_id)
.ok()
.flatten()
.map(|p| p.system_prompt)
.filter(|s| !s.trim().is_empty())
}
/// Resolve `rel_path` against the configured libraries, returning the
/// first root under which the file exists. Insights may be generated
/// for any library — the generator itself doesn't know which — so we
@@ -1673,6 +1690,10 @@ Return ONLY the summary, nothing else."#,
self.tool_recall_facts_for_photo(arguments, user_id, persona_id, cx)
.await
}
"recall_facts_for_entity" => {
self.tool_recall_facts_for_entity(arguments, user_id, persona_id, cx)
.await
}
"store_entity" => self.tool_store_entity(arguments, cx).await,
"store_fact" => {
self.tool_store_fact(
@@ -2145,8 +2166,8 @@ Return ONLY the summary, nothing else."#,
})
.collect();
format!(
"Found {} messages:\n{}",
messages.len(),
"{}\n{}",
Self::found_header(messages.len(), formatted.len(), "messages"),
formatted.join("\n")
)
}
@@ -2171,7 +2192,8 @@ Return ONLY the summary, nothing else."#,
let days_radius = args
.get("days_radius")
.and_then(|v| v.as_i64())
.unwrap_or(7);
.unwrap_or(7)
.clamp(1, 30);
let limit = args
.get("limit")
.and_then(|v| v.as_i64())
@@ -2250,7 +2272,8 @@ Return ONLY the summary, nothing else."#,
let days_radius = args
.get("days_radius")
.and_then(|v| v.as_i64())
.unwrap_or(14);
.unwrap_or(14)
.clamp(1, 30);
let date = match NaiveDate::parse_from_str(date_str, "%Y-%m-%d") {
Ok(d) => d,
@@ -2279,7 +2302,7 @@ Return ONLY the summary, nothing else."#,
Some(locs) if !locs.is_empty() => {
let formatted: Vec<String> = locs
.iter()
.take(20)
.take(LOCATION_HISTORY_DISPLAY_LIMIT)
.map(|loc| {
let dt = DateTime::from_timestamp(loc.timestamp, 0)
.map(|dt| {
@@ -2305,8 +2328,8 @@ Return ONLY the summary, nothing else."#,
})
.collect();
format!(
"Found {} location records:\n{}",
locs.len(),
"{}\n{}",
Self::found_header(locs.len(), formatted.len(), "location records"),
formatted.join("\n")
)
}
@@ -2315,6 +2338,20 @@ Return ONLY the summary, nothing else."#,
}
}
/// Render a `Found N <noun>:` tool-result header, annotating when only
/// the first `shown` of `total` items are listed below it. Without the
/// annotation the model believes it saw everything ("Found 312
/// messages:" followed by 60 lines) and reasons from a silently
/// truncated list. `summarize_tool_result` keeps parsing the leading
/// count either way.
fn found_header(total: usize, shown: usize, noun: &str) -> String {
if shown < total {
format!("Found {} {} (showing first {}):", total, noun, shown)
} else {
format!("Found {} {}:", total, noun)
}
}
/// Tool: get_file_tags — fetch tags for a file path
async fn tool_get_file_tags(
&self,
@@ -2669,6 +2706,102 @@ Return ONLY the summary, nothing else."#,
}
}
/// Tool: recall_facts_for_entity — retrieve facts for one entity by id.
/// Persona scoping and the reviewed-only-facts filter mirror
/// `tool_recall_facts_for_photo` exactly: reads are always Single
/// (user_id, persona_id), and strict-mode personas see only
/// human-reviewed facts.
async fn tool_recall_facts_for_entity(
&self,
args: &serde_json::Value,
user_id: i32,
persona_id: &str,
cx: &opentelemetry::Context,
) -> String {
use crate::database::PersonaFilter;
let persona_filter = PersonaFilter::Single {
user_id,
persona_id: persona_id.to_string(),
};
let entity_id = match args.get("entity_id").and_then(|v| v.as_i64()) {
Some(id) => id as i32,
None => return "Error: missing required parameter 'entity_id'".to_string(),
};
let limit = args
.get("limit")
.and_then(|v| v.as_i64())
.unwrap_or(50)
.clamp(1, 100) as usize;
log::info!(
"tool_recall_facts_for_entity: entity_id={}, limit={}",
entity_id,
limit
);
// Resolve the persona's reviewed-only-mode flag once — identical
// fallback semantics to recall_facts_for_photo (missing persona
// row → permissive active+reviewed default).
let reviewed_only = {
let mut pdao = self.persona_dao.lock().expect("Unable to lock PersonaDao");
pdao.get_persona(cx, user_id, persona_id)
.ok()
.flatten()
.map(|p| p.reviewed_only_facts)
.unwrap_or(false)
};
let mut kdao = self
.knowledge_dao
.lock()
.expect("Unable to lock KnowledgeDao");
let entity = match kdao.get_entity_by_id(cx, entity_id) {
Ok(Some(e)) => e,
Ok(None) => return format!("No entity found with ID {}.", entity_id),
Err(e) => return format!("Error fetching entity: {:?}", e),
};
let mut output_lines = vec![format!("Entity: {} ({})", entity.name, entity.entity_type)];
match kdao.get_facts_for_entity(cx, entity_id, &persona_filter) {
Ok(facts) => {
// Default scope: active + reviewed. Strict mode trims to
// reviewed only — same allow rule as recall_facts_for_photo.
let allow = |s: &str| -> bool {
if reviewed_only {
s == "reviewed"
} else {
s == "active" || s == "reviewed"
}
};
for f in facts.iter().filter(|f| allow(&f.status)).take(limit) {
let obj = if let Some(ref v) = f.object_value {
v.clone()
} else if let Some(oid) = f.object_entity_id {
kdao.get_entity_by_id(cx, oid)
.ok()
.flatten()
.map(|e| format!("{} (entity ID: {})", e.name, e.id))
.unwrap_or_else(|| format!("entity:{}", oid))
} else {
"(unknown)".to_string()
};
output_lines.push(format!(" - {} {}", f.predicate, obj));
}
}
Err(e) => return format!("Error fetching facts: {:?}", e),
}
if output_lines.len() == 1 {
format!(
"No active knowledge facts found for entity {} (ID: {}).",
entity.name, entity_id
)
} else {
format!("Knowledge for this entity:\n{}", output_lines.join("\n"))
}
}
/// Tool: store_entity — upsert an entity into the knowledge memory.
/// Embeddings go through the configured local backend (`LLM_BACKEND`),
/// independent of the per-request chat backend in the caller.
@@ -3063,7 +3196,7 @@ Return ONLY the summary, nothing else."#,
/// Build the list of tool definitions for the agentic loop, gated by
/// `opts`. Always-on tools: `search_messages`, `get_sms_messages`,
/// `get_file_tags`, `reverse_geocode`, `get_current_datetime`, the
/// four knowledge-memory tools. Conditional: `describe_photo` (vision
/// five knowledge-memory tools. Conditional: `describe_photo` (vision
/// model), `get_personal_place_at` (Apollo configured), `search_rag`
/// (daily_summaries populated), `get_calendar_events` (calendar
/// populated), `get_location_history` (location history populated).
@@ -3280,6 +3413,22 @@ Return ONLY the summary, nothing else."#,
}),
));
tools.push(Tool::function(
"recall_facts_for_entity",
"Retrieve all stored facts about one specific entity by its ID. Use to follow up on an entity \
surfaced by `recall_entities` (or referenced by another fact's object) that is NOT linked to the \
current photo `recall_facts_for_photo` only covers photo-linked entities. \
Example: `{entity_id: 7}` everything known about entity 7.",
serde_json::json!({
"type": "object",
"required": ["entity_id"],
"properties": {
"entity_id": { "type": "integer", "description": "ID of the entity to fetch facts for (from recall_entities, store_entity, or a fact's object reference)." },
"limit": { "type": "integer", "description": "Max facts to return (default 50, max 100)." }
}
}),
));
tools.push(Tool::function(
"store_entity",
"Upsert a person / place / event / thing into the knowledge memory. Returns the entity id (use it as \
@@ -3543,7 +3692,7 @@ Return ONLY the summary, nothing else."#,
format!("{} personal place(s)", n)
}
}
"recall_entities" | "recall_facts_for_photo" => {
"recall_entities" | "recall_facts_for_photo" | "recall_facts_for_entity" => {
let n = raw.lines().skip(1).filter(|l| !l.trim().is_empty()).count();
let kind = if tool_name == "recall_entities" {
"entities"
@@ -4033,8 +4182,11 @@ Return ONLY the summary, nothing else."#,
// 10. Define tools. describe_photo offered only when the chat model
// sees images directly (images_inline); in hybrid mode the visual
// description is already inlined as text.
let gate_opts = self.current_gate_opts(backend.images_inline);
// description is already inlined as text. Persona-aware so the
// persona's allow_agent_corrections gate opens here exactly like
// it does for chat turns (insight_chat does the same).
let gate_opts =
self.current_gate_opts_for_persona(backend.images_inline, Some((user_id, &persona_id)));
let tools = Self::build_tool_definitions(gate_opts);
// 11. Build initial messages. images_inline → attach base64 to the
@@ -4145,7 +4297,10 @@ Return ONLY the summary, nothing else."#,
.set_attribute(KeyValue::new("iterations_used", iterations_used as i64));
loop_cx.span().set_status(Status::Ok);
// 13. Parse title from the model's inline response.
// 13. Strip any leaked <think>…</think> reasoning block (thinking
// models emit it ahead of the answer; the raw transcript in
// training_messages keeps it), then parse the title.
final_content = crate::ai::llm_client::strip_think_blocks(&final_content);
let (title, body) = parse_title_body(&final_content);
final_content = body;
@@ -4341,6 +4496,7 @@ mod tests {
assert!(names.contains(&"get_current_datetime"));
assert!(names.contains(&"recall_entities"));
assert!(names.contains(&"recall_facts_for_photo"));
assert!(names.contains(&"recall_facts_for_entity"));
assert!(names.contains(&"store_entity"));
assert!(names.contains(&"store_fact"));
@@ -4550,6 +4706,37 @@ mod tests {
);
}
#[test]
fn found_header_labels_truncation_honestly() {
// Truncated: total exceeds what's listed below the header.
assert_eq!(
InsightGenerator::found_header(312, 60, "messages"),
"Found 312 messages (showing first 60):"
);
// Not truncated: plain header, no annotation noise.
assert_eq!(
InsightGenerator::found_header(7, 7, "messages"),
"Found 7 messages:"
);
assert_eq!(
InsightGenerator::found_header(40, 20, "location records"),
"Found 40 location records (showing first 20):"
);
}
#[test]
fn summarize_parses_truncated_found_header() {
// The "(showing first K)" annotation must not break the few-shot
// trajectory summarizer's "Found N" count parsing.
assert_eq!(
InsightGenerator::summarize_tool_result(
"get_sms_messages",
"Found 312 messages (showing first 60):\n[2023-08-15 10:00] Sarah: hi"
),
"312 messages"
);
}
fn make_search_hit(
id: i64,
contact: &str,
@@ -4869,12 +5056,10 @@ mod tests {
// Replicate the resolve_backend local-client construction
// (lines ~3686-3695 of this file).
let mut lc = base.clone();
if let Some(ref m) = overrides_model {
if !is_hybrid {
if !is_hybrid && let Some(ref m) = overrides_model {
lc.primary_model = m.clone();
lc.set_vision_model(m.clone());
}
}
// In hybrid mode the local client must keep its configured slots.
assert_eq!(
+119 -52
View File
@@ -590,10 +590,7 @@ impl LlmClient for LlamaCppClient {
let mut byte_stream = byte_stream;
let mut buf: Vec<u8> = Vec::new();
let mut accumulated_content = String::new();
let mut tool_state: std::collections::BTreeMap<
usize,
(Option<String>, Option<String>, String),
> = std::collections::BTreeMap::new();
let mut tool_state = ToolCallAssembly::new();
let mut role = "assistant".to_string();
let mut prompt_tokens: Option<i32> = None;
let mut completion_tokens: Option<i32> = None;
@@ -670,32 +667,7 @@ impl LlmClient for LlamaCppClient {
yield Ok(LlmStreamEvent::TextDelta(content.to_string()));
}
if let Some(tcs) = delta.get("tool_calls").and_then(|v| v.as_array()) {
for tc_delta in tcs {
let idx = tc_delta
.get("index")
.and_then(|n| n.as_u64())
.unwrap_or(0) as usize;
let entry = tool_state
.entry(idx)
.or_insert((None, None, String::new()));
if let Some(id) =
tc_delta.get("id").and_then(|v| v.as_str())
{
entry.0 = Some(id.to_string());
}
if let Some(func) = tc_delta.get("function") {
if let Some(name) =
func.get("name").and_then(|v| v.as_str())
{
entry.1 = Some(name.to_string());
}
if let Some(args) =
func.get("arguments").and_then(|v| v.as_str())
{
entry.2.push_str(args);
}
}
}
apply_tool_call_deltas(&mut tool_state, tcs);
}
}
if done_seen {
@@ -707,28 +679,7 @@ impl LlmClient for LlamaCppClient {
}
}
let tool_calls: Option<Vec<ToolCall>> = if tool_state.is_empty() {
None
} else {
let mut v = Vec::with_capacity(tool_state.len());
for (_idx, (id, name, args)) in tool_state {
let arguments: Value = if args.trim().is_empty() {
Value::Object(Default::default())
} else {
serde_json::from_str(&args).unwrap_or_else(|_| {
Value::Object(Default::default())
})
};
v.push(ToolCall {
id,
function: ToolCallFunction {
name: name.unwrap_or_default(),
arguments,
},
});
}
Some(v)
};
let tool_calls = finalize_tool_calls(tool_state);
if let Some(ref frame) = last_frame {
log_timings(frame, prompt_tokens, completion_tokens);
@@ -937,6 +888,58 @@ fn extract_error_detail(parsed: &Value) -> String {
raw.chars().take(300).collect()
}
/// Per-index assembly state for streamed OpenAI-style tool-call deltas:
/// `index → (id, name, concatenated argument fragments)`. BTreeMap so the
/// finalized calls come out in index order.
type ToolCallAssembly = std::collections::BTreeMap<usize, (Option<String>, Option<String>, String)>;
/// Fold one SSE frame's `delta.tool_calls` array into the assembly state.
/// Deltas carrying the same `index` merge into one call (llama.cpp streams a
/// call's argument JSON in fragments — they concatenate); distinct indexes
/// accumulate as separate calls.
fn apply_tool_call_deltas(state: &mut ToolCallAssembly, tcs: &[Value]) {
for tc_delta in tcs {
let idx = tc_delta.get("index").and_then(|n| n.as_u64()).unwrap_or(0) as usize;
let entry = state.entry(idx).or_insert((None, None, String::new()));
if let Some(id) = tc_delta.get("id").and_then(|v| v.as_str()) {
entry.0 = Some(id.to_string());
}
if let Some(func) = tc_delta.get("function") {
if let Some(name) = func.get("name").and_then(|v| v.as_str()) {
entry.1 = Some(name.to_string());
}
if let Some(args) = func.get("arguments").and_then(|v| v.as_str()) {
entry.2.push_str(args);
}
}
}
}
/// Convert assembled tool-call state into canonical `ToolCall`s, parsing each
/// call's concatenated argument JSON (empty / malformed → `{}`). `None` when
/// no tool-call deltas arrived.
fn finalize_tool_calls(state: ToolCallAssembly) -> Option<Vec<ToolCall>> {
if state.is_empty() {
return None;
}
let mut v = Vec::with_capacity(state.len());
for (_idx, (id, name, args)) in state {
let arguments: Value = if args.trim().is_empty() {
Value::Object(Default::default())
} else {
serde_json::from_str(&args).unwrap_or_else(|_| Value::Object(Default::default()))
};
v.push(ToolCall {
id,
function: ToolCallFunction {
name: name.unwrap_or_default(),
arguments,
},
});
}
Some(v)
}
fn find_double_newline(buf: &[u8]) -> Option<usize> {
for i in 0..buf.len().saturating_sub(1) {
if buf[i] == b'\n' && buf[i + 1] == b'\n' {
@@ -1302,4 +1305,68 @@ mod tests {
let c = LlamaCppClient::new(None, None);
assert_eq!(c.tts_model, "chatterbox");
}
#[test]
fn stream_assembly_keeps_two_tool_calls_from_separate_chunks() {
// llama.cpp emits one delta per SSE frame; two calls with distinct
// `index` values arriving in separate frames must BOTH survive.
let mut state = ToolCallAssembly::new();
apply_tool_call_deltas(
&mut state,
&[json!({
"index": 0,
"id": "call_a",
"function": { "name": "get_sms_messages", "arguments": "{\"date\":\"2019-01-01\"}" }
})],
);
apply_tool_call_deltas(
&mut state,
&[json!({
"index": 1,
"id": "call_b",
"function": { "name": "reverse_geocode", "arguments": "{\"latitude\":1.0,\"longitude\":2.0}" }
})],
);
let calls = finalize_tool_calls(state).expect("two calls assembled");
assert_eq!(calls.len(), 2);
assert_eq!(calls[0].id.as_deref(), Some("call_a"));
assert_eq!(calls[0].function.name, "get_sms_messages");
assert_eq!(calls[0].function.arguments["date"], "2019-01-01");
assert_eq!(calls[1].id.as_deref(), Some("call_b"));
assert_eq!(calls[1].function.name, "reverse_geocode");
assert_eq!(calls[1].function.arguments["latitude"], 1.0);
}
#[test]
fn stream_assembly_concatenates_argument_fragments_for_same_index() {
// A single call's argument JSON streamed across frames concatenates
// into one parseable document.
let mut state = ToolCallAssembly::new();
apply_tool_call_deltas(
&mut state,
&[json!({
"index": 0,
"id": "call_x",
"function": { "name": "search_messages", "arguments": "{\"query\":" }
})],
);
apply_tool_call_deltas(
&mut state,
&[json!({
"index": 0,
"function": { "arguments": "\"dinner\"}" }
})],
);
let calls = finalize_tool_calls(state).expect("one call assembled");
assert_eq!(calls.len(), 1);
assert_eq!(calls[0].function.name, "search_messages");
assert_eq!(calls[0].function.arguments["query"], "dinner");
}
#[test]
fn stream_assembly_empty_state_finalizes_to_none() {
assert!(finalize_tool_calls(ToolCallAssembly::new()).is_none());
}
}
+52
View File
@@ -170,3 +170,55 @@ pub struct ModelCapabilities {
pub has_vision: bool,
pub has_tool_calling: bool,
}
/// Strip a leading `<think>…</think>` reasoning block from model output.
///
/// Thinking models sometimes emit chain-of-thought inside think tags before
/// the real answer. Everything after the first `</think>` is the answer;
/// when no tag is present — or the text after it is empty — the trimmed
/// input is returned unchanged. Mirrors the behavior Ollama's
/// `extract_final_answer` has applied to single-shot generation; shared here
/// so the tool-calling final-content paths (agentic generation + chat) can
/// apply the identical cleanup before parsing / persisting.
pub fn strip_think_blocks(response: &str) -> String {
let response = response.trim();
if let Some(pos) = response.find("</think>") {
let answer = response[pos + "</think>".len()..].trim();
if !answer.is_empty() {
return answer.to_string();
}
}
response.to_string()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn strip_think_blocks_removes_leading_think_block() {
let raw = "<think>\nLet me reason about this.\n</think>\n\nTitle: A Day Out\n\nThe body.";
assert_eq!(strip_think_blocks(raw), "Title: A Day Out\n\nThe body.");
}
#[test]
fn strip_think_blocks_passes_through_plain_content() {
assert_eq!(strip_think_blocks(" just an answer "), "just an answer");
}
#[test]
fn strip_think_blocks_keeps_content_when_answer_after_tag_is_empty() {
// A think block with nothing after it: better to return the trimmed
// original than an empty string (matches Ollama's fallback).
let raw = "<think>only thoughts</think>";
assert_eq!(strip_think_blocks(raw), raw);
}
#[test]
fn strip_think_blocks_handles_unclosed_tag() {
let raw = "<think>thinking forever";
assert_eq!(strip_think_blocks(raw), raw);
}
}
+2 -1
View File
@@ -25,7 +25,8 @@ pub use handlers::{
chat_stream_handler, chat_turn_handler, delete_insight_handler, export_training_data_handler,
generate_agentic_insight_handler, generate_insight_handler, generation_status_handler,
get_all_insights_handler, get_available_models_handler, get_insight_handler,
get_openrouter_models_handler, rate_insight_handler, turn_async_handler, turn_replay_handler,
get_insight_history_handler, get_openrouter_models_handler, rate_insight_handler,
turn_async_handler, turn_replay_handler,
};
pub use insight_generator::InsightGenerator;
pub use llamacpp::LlamaCppClient;
+52 -14
View File
@@ -360,18 +360,7 @@ impl OllamaClient {
/// Extract final answer from thinking model output
/// Handles <think>...</think> tags and takes everything after
fn extract_final_answer(&self, response: &str) -> String {
let response = response.trim();
// Look for </think> tag and take everything after it
if let Some(pos) = response.find("</think>") {
let answer = response[pos + 8..].trim();
if !answer.is_empty() {
return answer.to_string();
}
}
// Fallback: return the whole response trimmed
response.to_string()
crate::ai::llm_client::strip_think_blocks(response)
}
async fn try_generate(
@@ -846,11 +835,14 @@ Analyze the image and use specific details from both the visual content and the
if !chunk.message.role.is_empty() {
role = chunk.message.role;
}
// Ollama only attaches tool_calls on the final chunk.
// Ollama ≥0.8 can stream tool_calls incrementally
// across chunks (older servers attach them all to
// one chunk) — append rather than overwrite so
// calls from earlier chunks survive.
if let Some(tcs) = chunk.message.tool_calls
&& !tcs.is_empty()
{
tool_calls = Some(tcs);
append_streamed_tool_calls(&mut tool_calls, tcs);
}
if chunk.done {
prompt_eval_count = chunk.prompt_eval_count;
@@ -1329,8 +1321,20 @@ struct OllamaEmbedResponse {
embeddings: Vec<Vec<f32>>,
}
/// Accumulate tool calls streamed across NDJSON chunks. Ollama ≥0.8 may
/// emit each tool call on its own chunk; replacing the accumulator on every
/// chunk would keep only the last call, so extend instead.
fn append_streamed_tool_calls(
acc: &mut Option<Vec<crate::ai::llm_client::ToolCall>>,
new: Vec<crate::ai::llm_client::ToolCall>,
) {
acc.get_or_insert_with(Vec::new).extend(new);
}
#[cfg(test)]
mod tests {
use super::append_streamed_tool_calls;
use crate::ai::llm_client::{ToolCall, ToolCallFunction};
#[test]
fn generate_photo_description_prompt_is_concise() {
@@ -1341,4 +1345,38 @@ mod tests {
Focus on the people, location, and activity.";
assert!(prompt.len() < 200, "Prompt should be concise");
}
fn call(name: &str) -> ToolCall {
ToolCall {
id: None,
function: ToolCallFunction {
name: name.to_string(),
arguments: serde_json::json!({}),
},
}
}
#[test]
fn streamed_tool_calls_across_chunks_accumulate() {
// Two tool calls arriving in two separate stream chunks must BOTH
// survive assembly — the old `tool_calls = Some(tcs)` kept only the
// last chunk's calls.
let mut acc: Option<Vec<ToolCall>> = None;
append_streamed_tool_calls(&mut acc, vec![call("get_sms_messages")]);
append_streamed_tool_calls(&mut acc, vec![call("reverse_geocode")]);
let calls = acc.expect("tool calls accumulated");
assert_eq!(calls.len(), 2);
assert_eq!(calls[0].function.name, "get_sms_messages");
assert_eq!(calls[1].function.name, "reverse_geocode");
}
#[test]
fn streamed_tool_calls_single_chunk_batch_kept_intact() {
// Older Ollama servers attach all calls to one chunk — unchanged.
let mut acc: Option<Vec<ToolCall>> = None;
append_streamed_tool_calls(&mut acc, vec![call("a"), call("b")]);
let calls = acc.expect("tool calls accumulated");
assert_eq!(calls.len(), 2);
}
}
+10 -5
View File
@@ -529,16 +529,21 @@ mod tests {
opentelemetry::Context::new()
}
/// Build a tempdir-backed library + DAOs sharing a single in-memory
/// SQLite connection (so cross-table joins like
/// `list_unscanned_candidates` see consistent state).
fn setup() -> (
/// Everything `setup` hands back to a test: tempdir, library, shared
/// connection, and the two DAOs. Aliased to keep clippy's
/// type-complexity lint satisfied.
type SetupFixture = (
TempDir,
Library,
Arc<Mutex<diesel::SqliteConnection>>,
Arc<Mutex<Box<dyn ExifDao>>>,
Arc<Mutex<Box<dyn FaceDao>>>,
) {
);
/// Build a tempdir-backed library + DAOs sharing a single in-memory
/// SQLite connection (so cross-table joins like
/// `list_unscanned_candidates` see consistent state).
fn setup() -> SetupFixture {
let tmp = TempDir::new().expect("tempdir");
let mut conn = in_memory_db_connection();
// Migration seeds library id=1 with a placeholder root; rewrite it
+118 -1
View File
@@ -47,7 +47,6 @@ pub trait InsightDao: Sync + Send {
paths: &[String],
) -> Result<Option<PhotoInsight>, DbError>;
#[allow(dead_code)]
fn get_insight_history(
&mut self,
context: &opentelemetry::Context,
@@ -82,6 +81,17 @@ pub trait InsightDao: Sync + Send {
approved: bool,
) -> Result<(), DbError>;
/// Rate a specific insight version by primary key, regardless of
/// `is_current`. Used by the per-file history view to approve/reject
/// previously generated (superseded) versions, which the path-based
/// `rate_insight` (current row only) cannot reach.
fn rate_insight_by_id(
&mut self,
context: &opentelemetry::Context,
insight_id: i32,
approved: bool,
) -> Result<(), DbError>;
fn get_approved_insights(
&mut self,
context: &opentelemetry::Context,
@@ -352,6 +362,26 @@ impl InsightDao for SqliteInsightDao {
.map_err(|e| DbError::log(DbErrorKind::UpdateError, e))
}
fn rate_insight_by_id(
&mut self,
context: &opentelemetry::Context,
target_id: i32,
is_approved: bool,
) -> Result<(), DbError> {
trace_db_call(context, "update", "rate_insight_by_id", |_span| {
use schema::photo_insights::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get InsightDao");
diesel::update(photo_insights.find(target_id))
.set(approved.eq(Some(is_approved)))
.execute(connection.deref_mut())
.map(|_| ())
.map_err(|e| anyhow::anyhow!("Update error: {}", e))
})
.map_err(|e| DbError::log(DbErrorKind::UpdateError, e))
}
fn get_approved_insights(
&mut self,
context: &opentelemetry::Context,
@@ -396,3 +426,90 @@ impl InsightDao for SqliteInsightDao {
.map_err(|e| DbError::log(DbErrorKind::UpdateError, e))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::database::test::in_memory_db_connection;
fn dao() -> SqliteInsightDao {
let conn = Arc::new(Mutex::new(in_memory_db_connection()));
SqliteInsightDao::from_connection(conn)
}
/// Build an insight insert with sensible defaults; tests override the
/// fields they care about (path, generated_at, model).
fn insert(path: &str, generated_at: i64, model: &str) -> InsertPhotoInsight {
InsertPhotoInsight {
library_id: 1,
file_path: path.to_string(),
title: format!("title for {model}"),
summary: "summary".to_string(),
generated_at,
model_version: model.to_string(),
is_current: true,
training_messages: None,
backend: "local".to_string(),
fewshot_source_ids: None,
content_hash: None,
num_ctx: None,
temperature: None,
top_p: None,
top_k: None,
min_p: None,
system_prompt: None,
persona_id: None,
prompt_eval_count: None,
eval_count: None,
}
}
#[test]
fn get_insight_history_returns_all_versions_newest_first() {
let cx = opentelemetry::Context::new();
let mut dao = dao();
// store_insight flips prior rows to is_current=false, so three
// generations for the same path leave a 3-row history.
dao.store_insight(&cx, insert("a.jpg", 100, "m1")).unwrap();
dao.store_insight(&cx, insert("a.jpg", 200, "m2")).unwrap();
dao.store_insight(&cx, insert("a.jpg", 300, "m3")).unwrap();
// A different path must not leak into the history.
dao.store_insight(&cx, insert("b.jpg", 250, "other"))
.unwrap();
let history = dao.get_insight_history(&cx, "a.jpg").unwrap();
assert_eq!(history.len(), 3);
assert_eq!(
history.iter().map(|i| i.generated_at).collect::<Vec<_>>(),
vec![300, 200, 100],
"history should be newest-first"
);
// Exactly one version is current (the latest generation).
let current: Vec<_> = history.iter().filter(|i| i.is_current).collect();
assert_eq!(current.len(), 1);
assert_eq!(current[0].generated_at, 300);
}
#[test]
fn rate_insight_by_id_rates_only_the_targeted_version() {
let cx = opentelemetry::Context::new();
let mut dao = dao();
dao.store_insight(&cx, insert("a.jpg", 100, "m1")).unwrap();
dao.store_insight(&cx, insert("a.jpg", 200, "m2")).unwrap();
// History is newest-first: [200 (current), 100 (superseded)].
let history = dao.get_insight_history(&cx, "a.jpg").unwrap();
let old_version = history.iter().find(|i| i.generated_at == 100).unwrap();
assert!(!old_version.is_current);
dao.rate_insight_by_id(&cx, old_version.id, true).unwrap();
let history = dao.get_insight_history(&cx, "a.jpg").unwrap();
let old = history.iter().find(|i| i.generated_at == 100).unwrap();
let current = history.iter().find(|i| i.generated_at == 200).unwrap();
assert_eq!(old.approved, Some(true), "targeted version is rated");
assert_eq!(current.approved, None, "current version is untouched");
}
}
+1 -1
View File
@@ -1052,7 +1052,7 @@ mod tests {
enabled: true,
excluded_dirs: Vec::new(),
};
let map = new_health_map(&[lib.clone()]);
let map = new_health_map(std::slice::from_ref(&lib));
// First probe: empty dir, no prior data — Online.
let s1 = refresh_health(&map, &lib, false);
+1
View File
@@ -351,6 +351,7 @@ fn main() -> std::io::Result<()> {
.service(ai::get_insight_handler)
.service(ai::delete_insight_handler)
.service(ai::get_all_insights_handler)
.service(ai::get_insight_history_handler)
.service(ai::get_available_models_handler)
.service(ai::get_openrouter_models_handler)
.service(ai::chat_turn_handler)