feat(ai): few-shot exemplars + sticky Ollama preference
- Few-shot injection on /insights/generate/agentic: compresses prior training_messages into trajectory blocks (tool calls + result summaries) and injects into the system prompt. Hardcoded default ids with optional request override. - New fewshot_source_ids column on photo_insights (+ migration) to track which exemplars influenced a given row, for downstream training-set filtering. Chat amend rows stamp None with a lineage note. - Ollama client now remembers which server (primary/fallback) most recently succeeded and tries it first on the next call, via a shared Arc<AtomicBool>. Avoids re-404ing the primary on every agent iteration when the chosen model only lives on the fallback. - Demote noisy logs: daily_summary "Summary match" lines to debug; inner chat_with_tools non-2xx body log from error to warn (outer layer owns the terminal-error signal). - Drift-guard tests for summarize_tool_result covering the success / empty / error / unknown shape for every tool. - Tidy: three pre-existing clippy warnings cleaned up. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,24 @@
|
||||
-- SQLite can't DROP COLUMN cleanly on older versions; rebuild the table.
|
||||
CREATE TABLE photo_insights_backup AS
|
||||
SELECT id, library_id, rel_path, title, summary, generated_at, model_version,
|
||||
is_current, training_messages, approved, backend
|
||||
FROM photo_insights;
|
||||
DROP TABLE photo_insights;
|
||||
CREATE TABLE photo_insights (
|
||||
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
|
||||
library_id INTEGER NOT NULL REFERENCES libraries(id),
|
||||
rel_path TEXT NOT NULL,
|
||||
title TEXT NOT NULL,
|
||||
summary TEXT NOT NULL,
|
||||
generated_at BIGINT NOT NULL,
|
||||
model_version TEXT NOT NULL,
|
||||
is_current BOOLEAN NOT NULL DEFAULT TRUE,
|
||||
training_messages TEXT,
|
||||
approved BOOLEAN,
|
||||
backend TEXT NOT NULL DEFAULT 'local'
|
||||
);
|
||||
INSERT INTO photo_insights
|
||||
SELECT id, library_id, rel_path, title, summary, generated_at, model_version,
|
||||
is_current, training_messages, approved, backend
|
||||
FROM photo_insights_backup;
|
||||
DROP TABLE photo_insights_backup;
|
||||
@@ -0,0 +1 @@
|
||||
ALTER TABLE photo_insights ADD COLUMN fewshot_source_ids TEXT;
|
||||
@@ -4,6 +4,7 @@ use opentelemetry::trace::{Span, Status, Tracer};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::ai::insight_chat::{ChatStreamEvent, ChatTurnRequest};
|
||||
use crate::ai::ollama::ChatMessage;
|
||||
use crate::ai::{InsightGenerator, ModelCapabilities, OllamaClient};
|
||||
use crate::data::Claims;
|
||||
use crate::database::{ExifDao, InsightDao};
|
||||
@@ -12,6 +13,13 @@ use crate::otel::{extract_context_from_request, global_tracer};
|
||||
use crate::state::AppState;
|
||||
use crate::utils::normalize_path;
|
||||
|
||||
/// Hardcoded few-shot exemplars for the agentic endpoint. Populate with the
|
||||
/// ids of approved insights whose `training_messages` should be compressed
|
||||
/// into trajectory form and injected into the system prompt. Empty = no
|
||||
/// change in behavior. Request-level `fewshot_insight_ids` overrides this
|
||||
/// when non-empty.
|
||||
const DEFAULT_FEWSHOT_INSIGHT_IDS: &[i32] = &[2918, 2908];
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct GeneratePhotoInsightRequest {
|
||||
pub file_path: String,
|
||||
@@ -33,6 +41,12 @@ pub struct GeneratePhotoInsightRequest {
|
||||
/// OpenRouter chat). Only respected by the agentic endpoint.
|
||||
#[serde(default)]
|
||||
pub backend: Option<String>,
|
||||
/// Insight ids whose stored `training_messages` should be compressed
|
||||
/// into few-shot trajectories and injected into the system prompt.
|
||||
/// Silently truncated to the first 2. When absent/empty, the handler
|
||||
/// falls back to `DEFAULT_FEWSHOT_INSIGHT_IDS`.
|
||||
#[serde(default)]
|
||||
pub fewshot_insight_ids: Option<Vec<i32>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
@@ -326,6 +340,41 @@ pub async fn generate_agentic_insight_handler(
|
||||
span.set_attribute(KeyValue::new("backend", b.clone()));
|
||||
}
|
||||
|
||||
// Resolve few-shot ids: request-provided ids take precedence when
|
||||
// non-empty; otherwise fall back to the hardcoded defaults.
|
||||
let fewshot_ids: Vec<i32> = match request.fewshot_insight_ids.as_deref() {
|
||||
Some(ids) if !ids.is_empty() => ids.iter().take(2).copied().collect(),
|
||||
_ => DEFAULT_FEWSHOT_INSIGHT_IDS
|
||||
.iter()
|
||||
.take(2)
|
||||
.copied()
|
||||
.collect(),
|
||||
};
|
||||
span.set_attribute(KeyValue::new("fewshot_count", fewshot_ids.len() as i64));
|
||||
|
||||
let fewshot_examples: Vec<Vec<ChatMessage>> = {
|
||||
let otel_context = opentelemetry::Context::new();
|
||||
let mut dao = insight_dao.lock().expect("Unable to lock InsightDao");
|
||||
fewshot_ids
|
||||
.iter()
|
||||
.filter_map(|id| {
|
||||
let insight = dao.get_insight_by_id(&otel_context, *id).ok().flatten()?;
|
||||
let json = insight.training_messages?;
|
||||
match serde_json::from_str::<Vec<ChatMessage>>(&json) {
|
||||
Ok(msgs) => Some(msgs),
|
||||
Err(e) => {
|
||||
log::warn!(
|
||||
"Few-shot insight {} has malformed training_messages: {}",
|
||||
id,
|
||||
e
|
||||
);
|
||||
None
|
||||
}
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
};
|
||||
|
||||
let result = insight_generator
|
||||
.generate_agentic_insight_for_photo(
|
||||
&normalized_path,
|
||||
@@ -338,6 +387,8 @@ pub async fn generate_agentic_insight_handler(
|
||||
request.min_p,
|
||||
max_iterations,
|
||||
request.backend.clone(),
|
||||
fewshot_examples,
|
||||
fewshot_ids,
|
||||
)
|
||||
.await;
|
||||
|
||||
|
||||
@@ -502,6 +502,11 @@ impl InsightChatService {
|
||||
.await?;
|
||||
let title = title_raw.trim().trim_matches('"').to_string();
|
||||
|
||||
// Amended rows intentionally do not inherit the parent's
|
||||
// `fewshot_source_ids`. The parent's few-shot influence is still
|
||||
// present in this row's content; if you want strict lineage
|
||||
// tracking for training-set filtering, fetch the parent here and
|
||||
// copy its value forward.
|
||||
let new_row = InsertPhotoInsight {
|
||||
library_id: req.library_id,
|
||||
file_path: normalized.clone(),
|
||||
@@ -512,6 +517,7 @@ impl InsightChatService {
|
||||
is_current: true,
|
||||
training_messages: Some(json),
|
||||
backend: effective_backend.clone(),
|
||||
fewshot_source_ids: None,
|
||||
};
|
||||
let cx = opentelemetry::Context::new();
|
||||
let mut dao = self.insight_dao.lock().expect("Unable to lock InsightDao");
|
||||
@@ -608,7 +614,7 @@ impl InsightChatService {
|
||||
) -> BoxStream<'static, ChatStreamEvent> {
|
||||
let svc = self;
|
||||
let s = async_stream::stream! {
|
||||
match svc.chat_turn_stream_inner(req, |ev| Ok(ev)).await {
|
||||
match svc.chat_turn_stream_inner(req, Ok).await {
|
||||
Ok(mut rx) => {
|
||||
while let Some(ev) = rx.recv().await {
|
||||
yield ev;
|
||||
@@ -955,6 +961,11 @@ impl InsightChatService {
|
||||
.await?;
|
||||
let title = title_raw.trim().trim_matches('"').to_string();
|
||||
|
||||
// Amended rows intentionally do not inherit the parent's
|
||||
// `fewshot_source_ids`. The parent's few-shot influence is still
|
||||
// present in this row's content; if you want strict lineage
|
||||
// tracking for training-set filtering, fetch the parent here and
|
||||
// copy its value forward.
|
||||
let new_row = InsertPhotoInsight {
|
||||
library_id: req.library_id,
|
||||
file_path: normalized.clone(),
|
||||
@@ -965,6 +976,7 @@ impl InsightChatService {
|
||||
is_current: true,
|
||||
training_messages: Some(json),
|
||||
backend: effective_backend.clone(),
|
||||
fewshot_source_ids: None,
|
||||
};
|
||||
let cx = opentelemetry::Context::new();
|
||||
let mut dao = self.insight_dao.lock().expect("Unable to lock InsightDao");
|
||||
|
||||
@@ -1227,6 +1227,7 @@ impl InsightGenerator {
|
||||
is_current: true,
|
||||
training_messages: None,
|
||||
backend: "local".to_string(),
|
||||
fewshot_source_ids: None,
|
||||
};
|
||||
|
||||
let mut dao = self.insight_dao.lock().expect("Unable to lock InsightDao");
|
||||
@@ -2634,6 +2635,159 @@ Return ONLY the summary, nothing else."#,
|
||||
/// text, and runs the loop through OpenRouter (chat only — embeddings
|
||||
/// and describe calls stay local in either mode).
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
/// Render a set of prior-conversation transcripts into a compact
|
||||
/// trajectory block for inclusion in the system prompt. Tool results
|
||||
/// are summarised to one line each so the prompt stays small.
|
||||
fn render_fewshot_examples(examples: &[Vec<ChatMessage>]) -> String {
|
||||
if examples.is_empty() {
|
||||
return String::new();
|
||||
}
|
||||
|
||||
let mut out = String::from("## Examples of strong context-gathering\n\n");
|
||||
out.push_str(
|
||||
"The following are compressed trajectories from prior high-quality insights. \
|
||||
They show the *pattern* of tool use, not answers to copy.\n\n",
|
||||
);
|
||||
|
||||
for (i, msgs) in examples.iter().enumerate() {
|
||||
out.push_str(&format!("### Example {}\n\n", i + 1));
|
||||
out.push_str(&Self::render_single_trajectory(msgs));
|
||||
out.push('\n');
|
||||
}
|
||||
|
||||
out.push_str("---\n\n");
|
||||
out
|
||||
}
|
||||
|
||||
fn render_single_trajectory(msgs: &[ChatMessage]) -> String {
|
||||
let mut out = String::new();
|
||||
|
||||
if let Some(first_user) = msgs.iter().find(|m| m.role == "user") {
|
||||
let trimmed = first_user
|
||||
.content
|
||||
.lines()
|
||||
.filter(|l| !l.trim().is_empty())
|
||||
.take(8)
|
||||
.collect::<Vec<_>>()
|
||||
.join("\n");
|
||||
out.push_str(&format!("Input:\n{}\n\n", trimmed));
|
||||
}
|
||||
|
||||
out.push_str("Trajectory:\n");
|
||||
let mut step = 1;
|
||||
let mut final_content: Option<String> = None;
|
||||
|
||||
for (i, m) in msgs.iter().enumerate() {
|
||||
if m.role != "assistant" {
|
||||
continue;
|
||||
}
|
||||
if let Some(ref calls) = m.tool_calls {
|
||||
for call in calls {
|
||||
let args_brief = Self::brief_json_args(&call.function.arguments);
|
||||
let result_summary = msgs
|
||||
.get(i + 1)
|
||||
.filter(|r| r.role == "tool")
|
||||
.map(|r| Self::summarize_tool_result(&call.function.name, &r.content))
|
||||
.unwrap_or_else(|| "(no result)".to_string());
|
||||
out.push_str(&format!(
|
||||
"{}. {}({}) -> {}\n",
|
||||
step, call.function.name, args_brief, result_summary
|
||||
));
|
||||
step += 1;
|
||||
}
|
||||
} else if !m.content.is_empty() {
|
||||
final_content = Some(m.content.clone());
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(content) = final_content {
|
||||
let short: String = content.chars().take(240).collect();
|
||||
out.push_str(&format!("\nFinal insight: {}...\n", short));
|
||||
}
|
||||
|
||||
out
|
||||
}
|
||||
|
||||
fn brief_json_args(v: &serde_json::Value) -> String {
|
||||
let Some(obj) = v.as_object() else {
|
||||
return v.to_string();
|
||||
};
|
||||
obj.iter()
|
||||
.map(|(k, v)| {
|
||||
let rendered = match v {
|
||||
serde_json::Value::String(s) if s.len() > 40 => {
|
||||
format!("\"{}...\"", &s[..40])
|
||||
}
|
||||
_ => v.to_string(),
|
||||
};
|
||||
format!("{}={}", k, rendered)
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ")
|
||||
}
|
||||
|
||||
/// Collapse a raw tool-result string (the text the model saw) into a
|
||||
/// short phrase suitable for a few-shot trajectory. Detects the
|
||||
/// "Found N ...", "No ...", and "Error ..." idioms used by the tool
|
||||
/// implementations in this file. Unknown shapes fall back to a char
|
||||
/// count, which is deliberately visible so drift shows up in output.
|
||||
fn summarize_tool_result(tool_name: &str, raw: &str) -> String {
|
||||
if raw.starts_with("Error ") {
|
||||
return "error".to_string();
|
||||
}
|
||||
if raw.starts_with("No ") || raw.starts_with("Could not ") {
|
||||
return "empty (pivoted)".to_string();
|
||||
}
|
||||
|
||||
if let Some(rest) = raw.strip_prefix("Found ")
|
||||
&& let Some(n_str) = rest.split_whitespace().next()
|
||||
&& let Ok(n) = n_str.parse::<usize>()
|
||||
{
|
||||
let kind = match tool_name {
|
||||
"search_messages" | "get_sms_messages" => "messages",
|
||||
"get_calendar_events" => "events",
|
||||
"get_location_history" => "location records",
|
||||
_ => "results",
|
||||
};
|
||||
return format!("{} {}", n, kind);
|
||||
}
|
||||
|
||||
match tool_name {
|
||||
"search_rag" => {
|
||||
let n = raw.split("\n\n").filter(|s| !s.trim().is_empty()).count();
|
||||
format!("{} rag hits", n)
|
||||
}
|
||||
"get_file_tags" => {
|
||||
let n = raw.split(',').filter(|s| !s.trim().is_empty()).count();
|
||||
format!("{} tags", n)
|
||||
}
|
||||
"describe_photo" => {
|
||||
let short: String = raw.chars().take(80).collect();
|
||||
format!("described: \"{}...\"", short)
|
||||
}
|
||||
"reverse_geocode" => {
|
||||
let short: String = raw.chars().take(60).collect();
|
||||
format!("place: {}", short)
|
||||
}
|
||||
"recall_entities" | "recall_facts_for_photo" => {
|
||||
let n = raw.lines().skip(1).filter(|l| !l.trim().is_empty()).count();
|
||||
let kind = if tool_name == "recall_entities" {
|
||||
"entities"
|
||||
} else {
|
||||
"facts"
|
||||
};
|
||||
format!("{} {}", n, kind)
|
||||
}
|
||||
"store_entity" | "store_fact" => raw
|
||||
.split_whitespace()
|
||||
.find_map(|tok| tok.strip_prefix("ID:"))
|
||||
.map(|id| format!("stored id={}", id.trim_end_matches(',')))
|
||||
.unwrap_or_else(|| "stored".to_string()),
|
||||
"get_current_datetime" => "time noted".to_string(),
|
||||
_ => format!("{} chars", raw.len()),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn generate_agentic_insight_for_photo(
|
||||
&self,
|
||||
file_path: &str,
|
||||
@@ -2646,6 +2800,8 @@ Return ONLY the summary, nothing else."#,
|
||||
min_p: Option<f32>,
|
||||
max_iterations: usize,
|
||||
backend: Option<String>,
|
||||
fewshot_examples: Vec<Vec<ChatMessage>>,
|
||||
fewshot_source_ids: Vec<i32>,
|
||||
) -> Result<(Option<i32>, Option<i32>)> {
|
||||
let tracer = global_tracer();
|
||||
let current_cx = opentelemetry::Context::current();
|
||||
@@ -2990,8 +3146,10 @@ Return ONLY the summary, nothing else."#,
|
||||
),
|
||||
None => String::new(),
|
||||
};
|
||||
let fewshot_block = Self::render_fewshot_examples(&fewshot_examples);
|
||||
let base_system = format!(
|
||||
"You are a personal photo memory assistant helping to reconstruct a memory from a photo.{owner_id_note}\n\n\
|
||||
{fewshot_block}\
|
||||
IMPORTANT INSTRUCTIONS:\n\
|
||||
1. You MUST call multiple tools to gather context BEFORE writing any final insight. Do not produce a final answer after only one or two tool calls.\n\
|
||||
2. When calling get_sms_messages and search_rag, always make at least one call WITHOUT a contact filter to capture what else was happening in {owner_name}'s life around this date — other conversations, events, and activities provide important wider context even when a specific contact is known.\n\
|
||||
@@ -3002,6 +3160,7 @@ Return ONLY the summary, nothing else."#,
|
||||
7. If a tool returns no results, that is useful information — continue calling the remaining tools anyway.\n\
|
||||
8. You have a hard budget of {max_iterations} tool-calling iterations before the loop ends. Plan your context gathering so you can write a complete final insight within that budget.",
|
||||
owner_id_note = owner_id_note,
|
||||
fewshot_block = fewshot_block,
|
||||
owner_name = owner_name,
|
||||
max_iterations = max_iterations
|
||||
);
|
||||
@@ -3153,12 +3312,10 @@ Return ONLY the summary, nothing else."#,
|
||||
"Agentic loop exhausted after {} iterations, requesting final answer",
|
||||
iterations_used
|
||||
);
|
||||
messages.push(ChatMessage::user(
|
||||
&format!(
|
||||
"Based on the context gathered, please write the final photo insight: a title and a detailed personal summary. Write in first person as {}.",
|
||||
user_display_name()
|
||||
),
|
||||
));
|
||||
messages.push(ChatMessage::user(format!(
|
||||
"Based on the context gathered, please write the final photo insight: a title and a detailed personal summary. Write in first person as {}.",
|
||||
user_display_name()
|
||||
)));
|
||||
let (final_response, prompt_tokens, eval_tokens) = chat_backend
|
||||
.chat_with_tools(messages.clone(), vec![])
|
||||
.await?;
|
||||
@@ -3204,6 +3361,11 @@ Return ONLY the summary, nothing else."#,
|
||||
|
||||
// 15. Store insight (returns the persisted row including its new id)
|
||||
let model_version = chat_backend.primary_model().to_string();
|
||||
let fewshot_source_ids_json = if fewshot_source_ids.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(serde_json::to_string(&fewshot_source_ids).unwrap_or_else(|_| "[]".to_string()))
|
||||
};
|
||||
let insight = InsertPhotoInsight {
|
||||
library_id: crate::libraries::PRIMARY_LIBRARY_ID,
|
||||
file_path: file_path.to_string(),
|
||||
@@ -3214,6 +3376,7 @@ Return ONLY the summary, nothing else."#,
|
||||
is_current: true,
|
||||
training_messages,
|
||||
backend: backend_label.clone(),
|
||||
fewshot_source_ids: fewshot_source_ids_json,
|
||||
};
|
||||
|
||||
let stored = {
|
||||
@@ -3333,6 +3496,7 @@ Return ONLY the summary, nothing else."#,
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::ai::ollama::{ToolCall, ToolCallFunction};
|
||||
|
||||
#[test]
|
||||
fn combine_contexts_includes_tags_section_when_tags_present() {
|
||||
@@ -3374,4 +3538,219 @@ mod tests {
|
||||
let result = InsightGenerator::combine_contexts(None, None, None, None, None);
|
||||
assert_eq!(result, "No additional context available");
|
||||
}
|
||||
|
||||
// These tests assert the shape of the strings returned by the tool
|
||||
// implementations above. If a tool's output format changes, update the
|
||||
// tool AND the corresponding arm of `summarize_tool_result` — these
|
||||
// tests exist to make that coupling loud.
|
||||
|
||||
#[test]
|
||||
fn summarize_errors_uniformly() {
|
||||
assert_eq!(
|
||||
InsightGenerator::summarize_tool_result("search_rag", "Error searching RAG: boom"),
|
||||
"error"
|
||||
);
|
||||
assert_eq!(
|
||||
InsightGenerator::summarize_tool_result(
|
||||
"get_sms_messages",
|
||||
"Error fetching SMS messages: timeout"
|
||||
),
|
||||
"error"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn summarize_empty_results_uniformly() {
|
||||
assert_eq!(
|
||||
InsightGenerator::summarize_tool_result("search_rag", "No relevant messages found."),
|
||||
"empty (pivoted)"
|
||||
);
|
||||
assert_eq!(
|
||||
InsightGenerator::summarize_tool_result("get_sms_messages", "No messages found."),
|
||||
"empty (pivoted)"
|
||||
);
|
||||
assert_eq!(
|
||||
InsightGenerator::summarize_tool_result(
|
||||
"reverse_geocode",
|
||||
"Could not resolve coordinates to a place name."
|
||||
),
|
||||
"empty (pivoted)"
|
||||
);
|
||||
assert_eq!(
|
||||
InsightGenerator::summarize_tool_result(
|
||||
"recall_facts_for_photo",
|
||||
"No knowledge facts found for this photo."
|
||||
),
|
||||
"empty (pivoted)"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn summarize_found_count_per_tool() {
|
||||
assert_eq!(
|
||||
InsightGenerator::summarize_tool_result(
|
||||
"get_sms_messages",
|
||||
"Found 7 messages:\n[2023-08-15 10:00] Sarah: hi"
|
||||
),
|
||||
"7 messages"
|
||||
);
|
||||
assert_eq!(
|
||||
InsightGenerator::summarize_tool_result(
|
||||
"search_messages",
|
||||
"Found 3 messages (mode: hybrid):\n\n[2023-08-15] Sarah — hi"
|
||||
),
|
||||
"3 messages"
|
||||
);
|
||||
assert_eq!(
|
||||
InsightGenerator::summarize_tool_result(
|
||||
"get_calendar_events",
|
||||
"Found 2 calendar events:\n[2023-08-15 10:00] Wedding"
|
||||
),
|
||||
"2 events"
|
||||
);
|
||||
assert_eq!(
|
||||
InsightGenerator::summarize_tool_result(
|
||||
"get_location_history",
|
||||
"Found 5 location records:\n[2023-08-15 10:00] 39.0, -120.0"
|
||||
),
|
||||
"5 location records"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn summarize_search_rag_counts_hits() {
|
||||
let raw = "[2023-08-15] Sarah: venue confirmed\n\n[2023-08-14] Mom: travel plans\n\n[2023-08-13] Dad: weather";
|
||||
assert_eq!(
|
||||
InsightGenerator::summarize_tool_result("search_rag", raw),
|
||||
"3 rag hits"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn summarize_get_file_tags() {
|
||||
assert_eq!(
|
||||
InsightGenerator::summarize_tool_result("get_file_tags", "wedding, tahoe, 2023"),
|
||||
"3 tags"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn summarize_describe_photo_truncates() {
|
||||
let raw = "A wedding ceremony at Lake Tahoe with about 40 guests seated in rows facing a lakeside arch decorated with white flowers.";
|
||||
let out = InsightGenerator::summarize_tool_result("describe_photo", raw);
|
||||
assert!(out.starts_with("described: \""));
|
||||
assert!(out.contains("A wedding ceremony at Lake Tahoe"));
|
||||
assert!(out.ends_with("...\""));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn summarize_reverse_geocode_returns_place() {
|
||||
let out =
|
||||
InsightGenerator::summarize_tool_result("reverse_geocode", "South Lake Tahoe, CA, USA");
|
||||
assert_eq!(out, "place: South Lake Tahoe, CA, USA");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn summarize_recall_entities_counts_lines() {
|
||||
let raw = "Known entities:\n- Sarah (person)\n- Tahoe (place)\n- Wedding 2023 (event)";
|
||||
assert_eq!(
|
||||
InsightGenerator::summarize_tool_result("recall_entities", raw),
|
||||
"3 entities"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn summarize_recall_facts_counts_lines() {
|
||||
let raw = "Knowledge for this photo:\n- Sarah: college friend\n- Tahoe: vacation spot";
|
||||
assert_eq!(
|
||||
InsightGenerator::summarize_tool_result("recall_facts_for_photo", raw),
|
||||
"2 facts"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn summarize_store_entity_extracts_id() {
|
||||
assert_eq!(
|
||||
InsightGenerator::summarize_tool_result(
|
||||
"store_entity",
|
||||
"Entity stored: ID:42 | person | Sarah | confidence:0.80"
|
||||
),
|
||||
"stored id=42"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn summarize_store_fact_extracts_id() {
|
||||
assert_eq!(
|
||||
InsightGenerator::summarize_tool_result(
|
||||
"store_fact",
|
||||
"Stored new fact: ID:17 | confidence:0.60"
|
||||
),
|
||||
"stored id=17"
|
||||
);
|
||||
assert_eq!(
|
||||
InsightGenerator::summarize_tool_result(
|
||||
"store_fact",
|
||||
"Corroborated existing fact: ID:17 | confidence:0.85"
|
||||
),
|
||||
"stored id=17"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn summarize_current_datetime() {
|
||||
assert_eq!(
|
||||
InsightGenerator::summarize_tool_result(
|
||||
"get_current_datetime",
|
||||
"Current date/time: 2024-01-15 12:00:00 PST (Monday)"
|
||||
),
|
||||
"time noted"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn summarize_unknown_tool_falls_back_to_char_count() {
|
||||
let out = InsightGenerator::summarize_tool_result("never_heard_of_it", "some output");
|
||||
assert_eq!(out, "11 chars");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn render_fewshot_empty_returns_empty_string() {
|
||||
assert!(InsightGenerator::render_fewshot_examples(&[]).is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn render_single_trajectory_walks_tool_calls_in_order() {
|
||||
let arguments = serde_json::json!({ "query": "wedding", "date": "2023-08-15" });
|
||||
let msgs = vec![
|
||||
ChatMessage::system("ignored"),
|
||||
ChatMessage::user("Photo file path: /photos/img.jpg\nDate taken: August 15, 2023"),
|
||||
ChatMessage {
|
||||
role: "assistant".to_string(),
|
||||
content: String::new(),
|
||||
tool_calls: Some(vec![ToolCall {
|
||||
function: ToolCallFunction {
|
||||
name: "search_rag".to_string(),
|
||||
arguments,
|
||||
},
|
||||
id: None,
|
||||
}]),
|
||||
images: None,
|
||||
},
|
||||
ChatMessage::tool_result("No relevant messages found."),
|
||||
ChatMessage {
|
||||
role: "assistant".to_string(),
|
||||
content: "Final title\n\nFinal body.".to_string(),
|
||||
tool_calls: None,
|
||||
images: None,
|
||||
},
|
||||
];
|
||||
let out = InsightGenerator::render_single_trajectory(&msgs);
|
||||
assert!(out.contains("Input:"));
|
||||
assert!(out.contains("/photos/img.jpg"));
|
||||
assert!(out.contains("1. search_rag("));
|
||||
assert!(out.contains("query=\"wedding\""));
|
||||
assert!(out.contains("-> empty (pivoted)"));
|
||||
assert!(out.contains("Final insight: Final title"));
|
||||
}
|
||||
}
|
||||
|
||||
194
src/ai/ollama.rs
194
src/ai/ollama.rs
@@ -4,6 +4,7 @@ use chrono::NaiveDate;
|
||||
use reqwest::Client;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
@@ -19,6 +20,19 @@ pub use crate::ai::llm_client::{ToolCall, ToolCallFunction, ToolFunction};
|
||||
// Cache duration: 15 minutes
|
||||
const CACHE_DURATION_SECS: u64 = 15 * 60;
|
||||
|
||||
/// Default total request timeout for generation calls, in seconds.
|
||||
/// Overridable via `OLLAMA_REQUEST_TIMEOUT_SECONDS` env var for slow
|
||||
/// CPU-offloaded models where inference can take several minutes.
|
||||
const DEFAULT_REQUEST_TIMEOUT_SECS: u64 = 120;
|
||||
|
||||
fn configured_request_timeout_secs() -> u64 {
|
||||
std::env::var("OLLAMA_REQUEST_TIMEOUT_SECONDS")
|
||||
.ok()
|
||||
.and_then(|v| v.parse::<u64>().ok())
|
||||
.filter(|&s| s > 0)
|
||||
.unwrap_or(DEFAULT_REQUEST_TIMEOUT_SECS)
|
||||
}
|
||||
|
||||
/// Embedding model used across the app. Callers that persist a
|
||||
/// `model_version` alongside an embedding should read this constant so the
|
||||
/// stored label always matches what `generate_embeddings` actually ran.
|
||||
@@ -65,6 +79,12 @@ pub struct OllamaClient {
|
||||
top_p: Option<f32>,
|
||||
top_k: Option<i32>,
|
||||
min_p: Option<f32>,
|
||||
/// Sticky preference shared across clones: when the fallback server
|
||||
/// succeeded most recently, try it first on the next call. Avoids
|
||||
/// re-probing the primary with a model it doesn't have loaded across
|
||||
/// every iteration of the agent loop. `Arc<AtomicBool>` so cloning
|
||||
/// `OllamaClient` shares the flag rather than resetting it.
|
||||
prefer_fallback: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl OllamaClient {
|
||||
@@ -77,7 +97,7 @@ impl OllamaClient {
|
||||
Self {
|
||||
client: Client::builder()
|
||||
.connect_timeout(Duration::from_secs(5)) // Quick connection timeout
|
||||
.timeout(Duration::from_secs(120)) // Total request timeout for generation
|
||||
.timeout(Duration::from_secs(configured_request_timeout_secs()))
|
||||
.build()
|
||||
.unwrap_or_else(|_| Client::new()),
|
||||
primary_url,
|
||||
@@ -89,9 +109,44 @@ impl OllamaClient {
|
||||
top_p: None,
|
||||
top_k: None,
|
||||
min_p: None,
|
||||
prefer_fallback: Arc::new(AtomicBool::new(false)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Return the server attempt order as `(label, url, model)` tuples.
|
||||
/// Respects the sticky `prefer_fallback` flag so the most recently
|
||||
/// successful server is tried first.
|
||||
fn attempt_order(&self) -> Vec<(&'static str, String, String)> {
|
||||
let primary = (
|
||||
"primary",
|
||||
self.primary_url.clone(),
|
||||
self.primary_model.clone(),
|
||||
);
|
||||
let fallback = self.fallback_url.as_ref().map(|url| {
|
||||
let model = self
|
||||
.fallback_model
|
||||
.clone()
|
||||
.unwrap_or_else(|| self.primary_model.clone());
|
||||
("fallback", url.clone(), model)
|
||||
});
|
||||
|
||||
let prefer_fallback = fallback.is_some() && self.prefer_fallback.load(Ordering::Relaxed);
|
||||
|
||||
let mut order = Vec::with_capacity(2);
|
||||
if prefer_fallback {
|
||||
if let Some(fb) = fallback.clone() {
|
||||
order.push(fb);
|
||||
}
|
||||
order.push(primary);
|
||||
} else {
|
||||
order.push(primary);
|
||||
if let Some(fb) = fallback {
|
||||
order.push(fb);
|
||||
}
|
||||
}
|
||||
order
|
||||
}
|
||||
|
||||
pub fn set_num_ctx(&mut self, num_ctx: Option<i32>) {
|
||||
self.num_ctx = num_ctx;
|
||||
}
|
||||
@@ -587,68 +642,57 @@ Analyze the image and use specific details from both the visual content and the
|
||||
|
||||
/// Send a chat request with tool definitions to /api/chat.
|
||||
/// Returns the assistant's response message (may contain tool_calls or final content).
|
||||
/// Uses primary/fallback URL routing same as other generation methods.
|
||||
/// Tries servers in preference order — most recently successful first —
|
||||
/// so a fallback-only model doesn't re-404 against the primary on every
|
||||
/// iteration of the agent loop.
|
||||
pub async fn chat_with_tools(
|
||||
&self,
|
||||
messages: Vec<ChatMessage>,
|
||||
tools: Vec<Tool>,
|
||||
) -> Result<(ChatMessage, Option<i32>, Option<i32>)> {
|
||||
// Try primary server first
|
||||
log::info!(
|
||||
"Attempting chat_with_tools with primary server: {} (model: {})",
|
||||
self.primary_url,
|
||||
self.primary_model
|
||||
);
|
||||
let primary_result = self
|
||||
.try_chat_with_tools(&self.primary_url, messages.clone(), tools.clone())
|
||||
.await;
|
||||
|
||||
match primary_result {
|
||||
Ok(result) => {
|
||||
log::info!("Successfully got chat_with_tools response from primary server");
|
||||
Ok(result)
|
||||
}
|
||||
Err(e) => {
|
||||
log::warn!("Primary server chat_with_tools failed: {}", e);
|
||||
|
||||
// Try fallback server if available
|
||||
if let Some(fallback_url) = &self.fallback_url {
|
||||
let fallback_model =
|
||||
self.fallback_model.as_ref().unwrap_or(&self.primary_model);
|
||||
let order = self.attempt_order();
|
||||
let mut errors: Vec<String> = Vec::new();
|
||||
|
||||
for (label, url, model) in &order {
|
||||
log::info!(
|
||||
"Attempting chat_with_tools with {} server: {} (model: {})",
|
||||
label,
|
||||
url,
|
||||
model
|
||||
);
|
||||
match self
|
||||
.try_chat_with_tools(url, messages.clone(), tools.clone())
|
||||
.await
|
||||
{
|
||||
Ok(result) => {
|
||||
log::info!(
|
||||
"Attempting chat_with_tools with fallback server: {} (model: {})",
|
||||
fallback_url,
|
||||
fallback_model
|
||||
"Successfully got chat_with_tools response from {} server",
|
||||
label
|
||||
);
|
||||
match self
|
||||
.try_chat_with_tools(fallback_url, messages, tools)
|
||||
.await
|
||||
{
|
||||
Ok(result) => {
|
||||
log::info!(
|
||||
"Successfully got chat_with_tools response from fallback server"
|
||||
);
|
||||
Ok(result)
|
||||
}
|
||||
Err(fallback_e) => {
|
||||
log::error!(
|
||||
"Fallback server chat_with_tools also failed: {}",
|
||||
fallback_e
|
||||
);
|
||||
Err(anyhow::anyhow!(
|
||||
"Both primary and fallback servers failed. Primary: {}, Fallback: {}",
|
||||
e,
|
||||
fallback_e
|
||||
))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
log::error!("No fallback server configured");
|
||||
Err(e)
|
||||
self.prefer_fallback
|
||||
.store(*label == "fallback", Ordering::Relaxed);
|
||||
return Ok(result);
|
||||
}
|
||||
Err(e) => {
|
||||
log::warn!("{} server chat_with_tools failed: {}", label, e);
|
||||
errors.push(format!("{}: {}", label, e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if order.len() <= 1 {
|
||||
log::error!("No fallback server configured; chat_with_tools exhausted");
|
||||
} else {
|
||||
log::error!(
|
||||
"All {} servers failed for chat_with_tools ({})",
|
||||
order.len(),
|
||||
errors.join(" / ")
|
||||
);
|
||||
}
|
||||
Err(anyhow::anyhow!(
|
||||
"chat_with_tools failed on all servers: {}",
|
||||
errors.join(" / ")
|
||||
))
|
||||
}
|
||||
|
||||
/// Streaming variant of `chat_with_tools`. Tries primary, then falls
|
||||
@@ -662,26 +706,30 @@ Analyze the image and use specific details from both the visual content and the
|
||||
messages: Vec<ChatMessage>,
|
||||
tools: Vec<Tool>,
|
||||
) -> Result<BoxStream<'static, Result<LlmStreamEvent>>> {
|
||||
// Attempt primary. If it can't be opened at all, try fallback.
|
||||
match self
|
||||
.try_chat_with_tools_stream(&self.primary_url, messages.clone(), tools.clone())
|
||||
.await
|
||||
{
|
||||
Ok(s) => Ok(s),
|
||||
Err(e) => {
|
||||
if let Some(fallback_url) = self.fallback_url.clone() {
|
||||
log::warn!(
|
||||
"Streaming chat primary failed ({}); trying fallback {}",
|
||||
e,
|
||||
fallback_url
|
||||
);
|
||||
self.try_chat_with_tools_stream(&fallback_url, messages, tools)
|
||||
.await
|
||||
} else {
|
||||
Err(e)
|
||||
// Same preference logic as `chat_with_tools`. Only the initial
|
||||
// connection is retried across servers — once the stream begins,
|
||||
// mid-stream errors propagate to the caller.
|
||||
let order = self.attempt_order();
|
||||
let mut last_err: Option<anyhow::Error> = None;
|
||||
|
||||
for (label, url, _model) in &order {
|
||||
match self
|
||||
.try_chat_with_tools_stream(url, messages.clone(), tools.clone())
|
||||
.await
|
||||
{
|
||||
Ok(s) => {
|
||||
self.prefer_fallback
|
||||
.store(*label == "fallback", Ordering::Relaxed);
|
||||
return Ok(s);
|
||||
}
|
||||
Err(e) => {
|
||||
log::warn!("Streaming chat on {} server failed: {}", label, e);
|
||||
last_err = Some(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Err(last_err.unwrap_or_else(|| anyhow::anyhow!("No Ollama server configured")))
|
||||
}
|
||||
|
||||
async fn try_chat_with_tools_stream(
|
||||
@@ -859,8 +907,12 @@ Analyze the image and use specific details from both the visual content and the
|
||||
if !response.status().is_success() {
|
||||
let status = response.status();
|
||||
let body = response.text().await.unwrap_or_default();
|
||||
log::error!(
|
||||
"chat_with_tools request body that caused {}: {}",
|
||||
// warn, not error — the outer `chat_with_tools` may recover via
|
||||
// the fallback server. When both fail, the outer layer emits the
|
||||
// actual error log.
|
||||
log::warn!(
|
||||
"chat_with_tools request to {} got {}: {}",
|
||||
base_url,
|
||||
status,
|
||||
request_json
|
||||
);
|
||||
|
||||
@@ -452,8 +452,7 @@ impl LlmClient for OpenRouterClient {
|
||||
// SSE frames are delimited by a blank line. Walk the buffer
|
||||
// for "\n\n" markers; anything before them is a complete
|
||||
// frame (possibly multi-line).
|
||||
loop {
|
||||
let Some(sep) = find_double_newline(&buf) else { break };
|
||||
while let Some(sep) = find_double_newline(&buf) {
|
||||
let frame = buf.drain(..sep + 2).collect::<Vec<_>>();
|
||||
let frame_str = match std::str::from_utf8(&frame) {
|
||||
Ok(s) => s,
|
||||
|
||||
@@ -251,6 +251,8 @@ async fn main() -> anyhow::Result<()> {
|
||||
args.min_p,
|
||||
args.max_iterations,
|
||||
None,
|
||||
Vec::new(),
|
||||
Vec::new(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
|
||||
@@ -268,7 +268,7 @@ impl DailySummaryDao for SqliteDailySummaryDao {
|
||||
.into_iter()
|
||||
.take(limit)
|
||||
.map(|(similarity, summary)| {
|
||||
log::info!(
|
||||
log::debug!(
|
||||
"Summary match: similarity={:.3}, date={}, contact={}, summary=\"{}\"",
|
||||
similarity,
|
||||
summary.date,
|
||||
@@ -388,7 +388,7 @@ impl DailySummaryDao for SqliteDailySummaryDao {
|
||||
.into_iter()
|
||||
.take(limit)
|
||||
.map(|(combined, similarity, days, summary)| {
|
||||
log::info!(
|
||||
log::debug!(
|
||||
"Summary match: combined={:.3} (sim={:.3}, days={}), date={}, contact={}, summary=\"{}\"",
|
||||
combined,
|
||||
similarity,
|
||||
|
||||
@@ -38,6 +38,16 @@ pub trait InsightDao: Sync + Send {
|
||||
file_path: &str,
|
||||
) -> Result<Vec<PhotoInsight>, DbError>;
|
||||
|
||||
/// Fetch a single insight by primary key, regardless of `is_current`.
|
||||
/// Used by the few-shot injection flow where the caller picks specific
|
||||
/// historical insights (which may have been superseded) as training
|
||||
/// exemplars for a fresh generation.
|
||||
fn get_insight_by_id(
|
||||
&mut self,
|
||||
context: &opentelemetry::Context,
|
||||
insight_id: i32,
|
||||
) -> Result<Option<PhotoInsight>, DbError>;
|
||||
|
||||
fn delete_insight(
|
||||
&mut self,
|
||||
context: &opentelemetry::Context,
|
||||
@@ -198,6 +208,25 @@ impl InsightDao for SqliteInsightDao {
|
||||
.map_err(|_| DbError::new(DbErrorKind::QueryError))
|
||||
}
|
||||
|
||||
fn get_insight_by_id(
|
||||
&mut self,
|
||||
context: &opentelemetry::Context,
|
||||
insight_id: i32,
|
||||
) -> Result<Option<PhotoInsight>, DbError> {
|
||||
trace_db_call(context, "query", "get_insight_by_id", |_span| {
|
||||
use schema::photo_insights::dsl::*;
|
||||
|
||||
let mut connection = self.connection.lock().expect("Unable to get InsightDao");
|
||||
|
||||
photo_insights
|
||||
.find(insight_id)
|
||||
.first::<PhotoInsight>(connection.deref_mut())
|
||||
.optional()
|
||||
.map_err(|_| anyhow::anyhow!("Query error"))
|
||||
})
|
||||
.map_err(|_| DbError::new(DbErrorKind::QueryError))
|
||||
}
|
||||
|
||||
fn delete_insight(
|
||||
&mut self,
|
||||
context: &opentelemetry::Context,
|
||||
|
||||
@@ -102,6 +102,12 @@ pub struct InsertPhotoInsight {
|
||||
pub training_messages: Option<String>,
|
||||
/// `"local"` (Ollama with images) | `"hybrid"` (local vision + OpenRouter chat).
|
||||
pub backend: String,
|
||||
/// JSON array of insight ids whose `training_messages` were compressed
|
||||
/// and injected into the system prompt as few-shot exemplars when this
|
||||
/// row was generated. `None` means no few-shot was used (pristine
|
||||
/// generation). Used downstream to filter out contaminated rows when
|
||||
/// assembling an unbiased training / evaluation set.
|
||||
pub fewshot_source_ids: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Queryable, Clone, Debug)]
|
||||
@@ -119,6 +125,7 @@ pub struct PhotoInsight {
|
||||
pub approved: Option<bool>,
|
||||
/// `"local"` (Ollama with images) | `"hybrid"` (local vision + OpenRouter chat).
|
||||
pub backend: String,
|
||||
pub fewshot_source_ids: Option<String>,
|
||||
}
|
||||
|
||||
// --- Libraries ---
|
||||
|
||||
@@ -143,6 +143,7 @@ diesel::table! {
|
||||
training_messages -> Nullable<Text>,
|
||||
approved -> Nullable<Bool>,
|
||||
backend -> Text,
|
||||
fewshot_source_ids -> Nullable<Text>,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user