diff --git a/migrations/2026-04-24-000000_add_fewshot_source_to_insights/down.sql b/migrations/2026-04-24-000000_add_fewshot_source_to_insights/down.sql new file mode 100644 index 0000000..2702414 --- /dev/null +++ b/migrations/2026-04-24-000000_add_fewshot_source_to_insights/down.sql @@ -0,0 +1,24 @@ +-- SQLite can't DROP COLUMN cleanly on older versions; rebuild the table. +CREATE TABLE photo_insights_backup AS + SELECT id, library_id, rel_path, title, summary, generated_at, model_version, + is_current, training_messages, approved, backend + FROM photo_insights; +DROP TABLE photo_insights; +CREATE TABLE photo_insights ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + library_id INTEGER NOT NULL REFERENCES libraries(id), + rel_path TEXT NOT NULL, + title TEXT NOT NULL, + summary TEXT NOT NULL, + generated_at BIGINT NOT NULL, + model_version TEXT NOT NULL, + is_current BOOLEAN NOT NULL DEFAULT TRUE, + training_messages TEXT, + approved BOOLEAN, + backend TEXT NOT NULL DEFAULT 'local' +); +INSERT INTO photo_insights + SELECT id, library_id, rel_path, title, summary, generated_at, model_version, + is_current, training_messages, approved, backend + FROM photo_insights_backup; +DROP TABLE photo_insights_backup; diff --git a/migrations/2026-04-24-000000_add_fewshot_source_to_insights/up.sql b/migrations/2026-04-24-000000_add_fewshot_source_to_insights/up.sql new file mode 100644 index 0000000..f39340c --- /dev/null +++ b/migrations/2026-04-24-000000_add_fewshot_source_to_insights/up.sql @@ -0,0 +1 @@ +ALTER TABLE photo_insights ADD COLUMN fewshot_source_ids TEXT; diff --git a/src/ai/handlers.rs b/src/ai/handlers.rs index d24927f..2121a81 100644 --- a/src/ai/handlers.rs +++ b/src/ai/handlers.rs @@ -4,6 +4,7 @@ use opentelemetry::trace::{Span, Status, Tracer}; use serde::{Deserialize, Serialize}; use crate::ai::insight_chat::{ChatStreamEvent, ChatTurnRequest}; +use crate::ai::ollama::ChatMessage; use crate::ai::{InsightGenerator, ModelCapabilities, OllamaClient}; use 
crate::data::Claims; use crate::database::{ExifDao, InsightDao}; @@ -12,6 +13,13 @@ use crate::otel::{extract_context_from_request, global_tracer}; use crate::state::AppState; use crate::utils::normalize_path; +/// Hardcoded few-shot exemplars for the agentic endpoint. Populate with the +/// ids of approved insights whose `training_messages` should be compressed +/// into trajectory form and injected into the system prompt. Empty = no +/// change in behavior. Request-level `fewshot_insight_ids` overrides this +/// when non-empty. +const DEFAULT_FEWSHOT_INSIGHT_IDS: &[i32] = &[2918, 2908]; + #[derive(Debug, Deserialize)] pub struct GeneratePhotoInsightRequest { pub file_path: String, @@ -33,6 +41,12 @@ pub struct GeneratePhotoInsightRequest { /// OpenRouter chat). Only respected by the agentic endpoint. #[serde(default)] pub backend: Option, + /// Insight ids whose stored `training_messages` should be compressed + /// into few-shot trajectories and injected into the system prompt. + /// Silently truncated to the first 2. When absent/empty, the handler + /// falls back to `DEFAULT_FEWSHOT_INSIGHT_IDS`. + #[serde(default)] + pub fewshot_insight_ids: Option>, } #[derive(Debug, Deserialize)] @@ -326,6 +340,41 @@ pub async fn generate_agentic_insight_handler( span.set_attribute(KeyValue::new("backend", b.clone())); } + // Resolve few-shot ids: request-provided ids take precedence when + // non-empty; otherwise fall back to the hardcoded defaults. 
+ let fewshot_ids: Vec = match request.fewshot_insight_ids.as_deref() { + Some(ids) if !ids.is_empty() => ids.iter().take(2).copied().collect(), + _ => DEFAULT_FEWSHOT_INSIGHT_IDS + .iter() + .take(2) + .copied() + .collect(), + }; + span.set_attribute(KeyValue::new("fewshot_count", fewshot_ids.len() as i64)); + + let fewshot_examples: Vec> = { + let otel_context = opentelemetry::Context::new(); + let mut dao = insight_dao.lock().expect("Unable to lock InsightDao"); + fewshot_ids + .iter() + .filter_map(|id| { + let insight = dao.get_insight_by_id(&otel_context, *id).ok().flatten()?; + let json = insight.training_messages?; + match serde_json::from_str::>(&json) { + Ok(msgs) => Some(msgs), + Err(e) => { + log::warn!( + "Few-shot insight {} has malformed training_messages: {}", + id, + e + ); + None + } + } + }) + .collect() + }; + let result = insight_generator .generate_agentic_insight_for_photo( &normalized_path, @@ -338,6 +387,8 @@ pub async fn generate_agentic_insight_handler( request.min_p, max_iterations, request.backend.clone(), + fewshot_examples, + fewshot_ids, ) .await; diff --git a/src/ai/insight_chat.rs b/src/ai/insight_chat.rs index 01dd4de..c51befd 100644 --- a/src/ai/insight_chat.rs +++ b/src/ai/insight_chat.rs @@ -502,6 +502,11 @@ impl InsightChatService { .await?; let title = title_raw.trim().trim_matches('"').to_string(); + // Amended rows intentionally do not inherit the parent's + // `fewshot_source_ids`. The parent's few-shot influence is still + // present in this row's content; if you want strict lineage + // tracking for training-set filtering, fetch the parent here and + // copy its value forward. 
let new_row = InsertPhotoInsight { library_id: req.library_id, file_path: normalized.clone(), @@ -512,6 +517,7 @@ impl InsightChatService { is_current: true, training_messages: Some(json), backend: effective_backend.clone(), + fewshot_source_ids: None, }; let cx = opentelemetry::Context::new(); let mut dao = self.insight_dao.lock().expect("Unable to lock InsightDao"); @@ -608,7 +614,7 @@ impl InsightChatService { ) -> BoxStream<'static, ChatStreamEvent> { let svc = self; let s = async_stream::stream! { - match svc.chat_turn_stream_inner(req, |ev| Ok(ev)).await { + match svc.chat_turn_stream_inner(req, Ok).await { Ok(mut rx) => { while let Some(ev) = rx.recv().await { yield ev; @@ -955,6 +961,11 @@ impl InsightChatService { .await?; let title = title_raw.trim().trim_matches('"').to_string(); + // Amended rows intentionally do not inherit the parent's + // `fewshot_source_ids`. The parent's few-shot influence is still + // present in this row's content; if you want strict lineage + // tracking for training-set filtering, fetch the parent here and + // copy its value forward. 
let new_row = InsertPhotoInsight { library_id: req.library_id, file_path: normalized.clone(), @@ -965,6 +976,7 @@ impl InsightChatService { is_current: true, training_messages: Some(json), backend: effective_backend.clone(), + fewshot_source_ids: None, }; let cx = opentelemetry::Context::new(); let mut dao = self.insight_dao.lock().expect("Unable to lock InsightDao"); diff --git a/src/ai/insight_generator.rs b/src/ai/insight_generator.rs index dad7040..65ba3a1 100644 --- a/src/ai/insight_generator.rs +++ b/src/ai/insight_generator.rs @@ -1227,6 +1227,7 @@ impl InsightGenerator { is_current: true, training_messages: None, backend: "local".to_string(), + fewshot_source_ids: None, }; let mut dao = self.insight_dao.lock().expect("Unable to lock InsightDao"); @@ -2634,6 +2635,159 @@ Return ONLY the summary, nothing else."#, /// text, and runs the loop through OpenRouter (chat only — embeddings /// and describe calls stay local in either mode). #[allow(clippy::too_many_arguments)] + /// Render a set of prior-conversation transcripts into a compact + /// trajectory block for inclusion in the system prompt. Tool results + /// are summarised to one line each so the prompt stays small. + fn render_fewshot_examples(examples: &[Vec]) -> String { + if examples.is_empty() { + return String::new(); + } + + let mut out = String::from("## Examples of strong context-gathering\n\n"); + out.push_str( + "The following are compressed trajectories from prior high-quality insights. 
\ + They show the *pattern* of tool use, not answers to copy.\n\n", + ); + + for (i, msgs) in examples.iter().enumerate() { + out.push_str(&format!("### Example {}\n\n", i + 1)); + out.push_str(&Self::render_single_trajectory(msgs)); + out.push('\n'); + } + + out.push_str("---\n\n"); + out + } + + fn render_single_trajectory(msgs: &[ChatMessage]) -> String { + let mut out = String::new(); + + if let Some(first_user) = msgs.iter().find(|m| m.role == "user") { + let trimmed = first_user + .content + .lines() + .filter(|l| !l.trim().is_empty()) + .take(8) + .collect::>() + .join("\n"); + out.push_str(&format!("Input:\n{}\n\n", trimmed)); + } + + out.push_str("Trajectory:\n"); + let mut step = 1; + let mut final_content: Option = None; + + for (i, m) in msgs.iter().enumerate() { + if m.role != "assistant" { + continue; + } + if let Some(ref calls) = m.tool_calls { + for call in calls { + let args_brief = Self::brief_json_args(&call.function.arguments); + let result_summary = msgs + .get(i + 1) + .filter(|r| r.role == "tool") + .map(|r| Self::summarize_tool_result(&call.function.name, &r.content)) + .unwrap_or_else(|| "(no result)".to_string()); + out.push_str(&format!( + "{}. 
{}({}) -> {}\n", + step, call.function.name, args_brief, result_summary + )); + step += 1; + } + } else if !m.content.is_empty() { + final_content = Some(m.content.clone()); + } + } + + if let Some(content) = final_content { + let short: String = content.chars().take(240).collect(); + out.push_str(&format!("\nFinal insight: {}...\n", short)); + } + + out + } + + fn brief_json_args(v: &serde_json::Value) -> String { + let Some(obj) = v.as_object() else { + return v.to_string(); + }; + obj.iter() + .map(|(k, v)| { + let rendered = match v { + // Truncate by chars, not bytes: byte-slicing (`&s[..40]`) + // panics when byte 40 falls inside a multi-byte UTF-8 char. + serde_json::Value::String(s) if s.chars().count() > 40 => { + let head: String = s.chars().take(40).collect(); + format!("\"{}...\"", head) + } + _ => v.to_string(), + }; + format!("{}={}", k, rendered) + }) + .collect::<Vec<_>>() + .join(", ") + } + + /// Collapse a raw tool-result string (the text the model saw) into a + /// short phrase suitable for a few-shot trajectory. Detects the + /// "Found N ...", "No ...", and "Error ..." idioms used by the tool + /// implementations in this file. Unknown shapes fall back to a char + /// count, which is deliberately visible so drift shows up in output. 
+ fn summarize_tool_result(tool_name: &str, raw: &str) -> String { + if raw.starts_with("Error ") { + return "error".to_string(); + } + if raw.starts_with("No ") || raw.starts_with("Could not ") { + return "empty (pivoted)".to_string(); + } + + if let Some(rest) = raw.strip_prefix("Found ") + && let Some(n_str) = rest.split_whitespace().next() + && let Ok(n) = n_str.parse::() + { + let kind = match tool_name { + "search_messages" | "get_sms_messages" => "messages", + "get_calendar_events" => "events", + "get_location_history" => "location records", + _ => "results", + }; + return format!("{} {}", n, kind); + } + + match tool_name { + "search_rag" => { + let n = raw.split("\n\n").filter(|s| !s.trim().is_empty()).count(); + format!("{} rag hits", n) + } + "get_file_tags" => { + let n = raw.split(',').filter(|s| !s.trim().is_empty()).count(); + format!("{} tags", n) + } + "describe_photo" => { + let short: String = raw.chars().take(80).collect(); + format!("described: \"{}...\"", short) + } + "reverse_geocode" => { + let short: String = raw.chars().take(60).collect(); + format!("place: {}", short) + } + "recall_entities" | "recall_facts_for_photo" => { + let n = raw.lines().skip(1).filter(|l| !l.trim().is_empty()).count(); + let kind = if tool_name == "recall_entities" { + "entities" + } else { + "facts" + }; + format!("{} {}", n, kind) + } + "store_entity" | "store_fact" => raw + .split_whitespace() + .find_map(|tok| tok.strip_prefix("ID:")) + .map(|id| format!("stored id={}", id.trim_end_matches(','))) + .unwrap_or_else(|| "stored".to_string()), + "get_current_datetime" => "time noted".to_string(), + _ => format!("{} chars", raw.len()), + } + } + pub async fn generate_agentic_insight_for_photo( &self, file_path: &str, @@ -2646,6 +2800,8 @@ Return ONLY the summary, nothing else."#, min_p: Option, max_iterations: usize, backend: Option, + fewshot_examples: Vec>, + fewshot_source_ids: Vec, ) -> Result<(Option, Option)> { let tracer = global_tracer(); let current_cx = 
opentelemetry::Context::current(); @@ -2990,8 +3146,10 @@ Return ONLY the summary, nothing else."#, ), None => String::new(), }; + let fewshot_block = Self::render_fewshot_examples(&fewshot_examples); let base_system = format!( "You are a personal photo memory assistant helping to reconstruct a memory from a photo.{owner_id_note}\n\n\ + {fewshot_block}\ IMPORTANT INSTRUCTIONS:\n\ 1. You MUST call multiple tools to gather context BEFORE writing any final insight. Do not produce a final answer after only one or two tool calls.\n\ 2. When calling get_sms_messages and search_rag, always make at least one call WITHOUT a contact filter to capture what else was happening in {owner_name}'s life around this date — other conversations, events, and activities provide important wider context even when a specific contact is known.\n\ @@ -3002,6 +3160,7 @@ Return ONLY the summary, nothing else."#, 7. If a tool returns no results, that is useful information — continue calling the remaining tools anyway.\n\ 8. You have a hard budget of {max_iterations} tool-calling iterations before the loop ends. Plan your context gathering so you can write a complete final insight within that budget.", owner_id_note = owner_id_note, + fewshot_block = fewshot_block, owner_name = owner_name, max_iterations = max_iterations ); @@ -3153,12 +3312,10 @@ Return ONLY the summary, nothing else."#, "Agentic loop exhausted after {} iterations, requesting final answer", iterations_used ); - messages.push(ChatMessage::user( - &format!( - "Based on the context gathered, please write the final photo insight: a title and a detailed personal summary. Write in first person as {}.", - user_display_name() - ), - )); + messages.push(ChatMessage::user(format!( + "Based on the context gathered, please write the final photo insight: a title and a detailed personal summary. 
Write in first person as {}.", + user_display_name() + ))); let (final_response, prompt_tokens, eval_tokens) = chat_backend .chat_with_tools(messages.clone(), vec![]) .await?; @@ -3204,6 +3361,11 @@ Return ONLY the summary, nothing else."#, // 15. Store insight (returns the persisted row including its new id) let model_version = chat_backend.primary_model().to_string(); + let fewshot_source_ids_json = if fewshot_source_ids.is_empty() { + None + } else { + Some(serde_json::to_string(&fewshot_source_ids).unwrap_or_else(|_| "[]".to_string())) + }; let insight = InsertPhotoInsight { library_id: crate::libraries::PRIMARY_LIBRARY_ID, file_path: file_path.to_string(), @@ -3214,6 +3376,7 @@ Return ONLY the summary, nothing else."#, is_current: true, training_messages, backend: backend_label.clone(), + fewshot_source_ids: fewshot_source_ids_json, }; let stored = { @@ -3333,6 +3496,7 @@ Return ONLY the summary, nothing else."#, #[cfg(test)] mod tests { use super::*; + use crate::ai::ollama::{ToolCall, ToolCallFunction}; #[test] fn combine_contexts_includes_tags_section_when_tags_present() { @@ -3374,4 +3538,219 @@ mod tests { let result = InsightGenerator::combine_contexts(None, None, None, None, None); assert_eq!(result, "No additional context available"); } + + // These tests assert the shape of the strings returned by the tool + // implementations above. If a tool's output format changes, update the + // tool AND the corresponding arm of `summarize_tool_result` — these + // tests exist to make that coupling loud. 
+ + #[test] + fn summarize_errors_uniformly() { + assert_eq!( + InsightGenerator::summarize_tool_result("search_rag", "Error searching RAG: boom"), + "error" + ); + assert_eq!( + InsightGenerator::summarize_tool_result( + "get_sms_messages", + "Error fetching SMS messages: timeout" + ), + "error" + ); + } + + #[test] + fn summarize_empty_results_uniformly() { + assert_eq!( + InsightGenerator::summarize_tool_result("search_rag", "No relevant messages found."), + "empty (pivoted)" + ); + assert_eq!( + InsightGenerator::summarize_tool_result("get_sms_messages", "No messages found."), + "empty (pivoted)" + ); + assert_eq!( + InsightGenerator::summarize_tool_result( + "reverse_geocode", + "Could not resolve coordinates to a place name." + ), + "empty (pivoted)" + ); + assert_eq!( + InsightGenerator::summarize_tool_result( + "recall_facts_for_photo", + "No knowledge facts found for this photo." + ), + "empty (pivoted)" + ); + } + + #[test] + fn summarize_found_count_per_tool() { + assert_eq!( + InsightGenerator::summarize_tool_result( + "get_sms_messages", + "Found 7 messages:\n[2023-08-15 10:00] Sarah: hi" + ), + "7 messages" + ); + assert_eq!( + InsightGenerator::summarize_tool_result( + "search_messages", + "Found 3 messages (mode: hybrid):\n\n[2023-08-15] Sarah — hi" + ), + "3 messages" + ); + assert_eq!( + InsightGenerator::summarize_tool_result( + "get_calendar_events", + "Found 2 calendar events:\n[2023-08-15 10:00] Wedding" + ), + "2 events" + ); + assert_eq!( + InsightGenerator::summarize_tool_result( + "get_location_history", + "Found 5 location records:\n[2023-08-15 10:00] 39.0, -120.0" + ), + "5 location records" + ); + } + + #[test] + fn summarize_search_rag_counts_hits() { + let raw = "[2023-08-15] Sarah: venue confirmed\n\n[2023-08-14] Mom: travel plans\n\n[2023-08-13] Dad: weather"; + assert_eq!( + InsightGenerator::summarize_tool_result("search_rag", raw), + "3 rag hits" + ); + } + + #[test] + fn summarize_get_file_tags() { + assert_eq!( + 
InsightGenerator::summarize_tool_result("get_file_tags", "wedding, tahoe, 2023"), + "3 tags" + ); + } + + #[test] + fn summarize_describe_photo_truncates() { + let raw = "A wedding ceremony at Lake Tahoe with about 40 guests seated in rows facing a lakeside arch decorated with white flowers."; + let out = InsightGenerator::summarize_tool_result("describe_photo", raw); + assert!(out.starts_with("described: \"")); + assert!(out.contains("A wedding ceremony at Lake Tahoe")); + assert!(out.ends_with("...\"")); + } + + #[test] + fn summarize_reverse_geocode_returns_place() { + let out = + InsightGenerator::summarize_tool_result("reverse_geocode", "South Lake Tahoe, CA, USA"); + assert_eq!(out, "place: South Lake Tahoe, CA, USA"); + } + + #[test] + fn summarize_recall_entities_counts_lines() { + let raw = "Known entities:\n- Sarah (person)\n- Tahoe (place)\n- Wedding 2023 (event)"; + assert_eq!( + InsightGenerator::summarize_tool_result("recall_entities", raw), + "3 entities" + ); + } + + #[test] + fn summarize_recall_facts_counts_lines() { + let raw = "Knowledge for this photo:\n- Sarah: college friend\n- Tahoe: vacation spot"; + assert_eq!( + InsightGenerator::summarize_tool_result("recall_facts_for_photo", raw), + "2 facts" + ); + } + + #[test] + fn summarize_store_entity_extracts_id() { + assert_eq!( + InsightGenerator::summarize_tool_result( + "store_entity", + "Entity stored: ID:42 | person | Sarah | confidence:0.80" + ), + "stored id=42" + ); + } + + #[test] + fn summarize_store_fact_extracts_id() { + assert_eq!( + InsightGenerator::summarize_tool_result( + "store_fact", + "Stored new fact: ID:17 | confidence:0.60" + ), + "stored id=17" + ); + assert_eq!( + InsightGenerator::summarize_tool_result( + "store_fact", + "Corroborated existing fact: ID:17 | confidence:0.85" + ), + "stored id=17" + ); + } + + #[test] + fn summarize_current_datetime() { + assert_eq!( + InsightGenerator::summarize_tool_result( + "get_current_datetime", + "Current date/time: 2024-01-15 
12:00:00 PST (Monday)" + ), + "time noted" + ); + } + + #[test] + fn summarize_unknown_tool_falls_back_to_char_count() { + let out = InsightGenerator::summarize_tool_result("never_heard_of_it", "some output"); + assert_eq!(out, "11 chars"); + } + + #[test] + fn render_fewshot_empty_returns_empty_string() { + assert!(InsightGenerator::render_fewshot_examples(&[]).is_empty()); + } + + #[test] + fn render_single_trajectory_walks_tool_calls_in_order() { + let arguments = serde_json::json!({ "query": "wedding", "date": "2023-08-15" }); + let msgs = vec![ + ChatMessage::system("ignored"), + ChatMessage::user("Photo file path: /photos/img.jpg\nDate taken: August 15, 2023"), + ChatMessage { + role: "assistant".to_string(), + content: String::new(), + tool_calls: Some(vec![ToolCall { + function: ToolCallFunction { + name: "search_rag".to_string(), + arguments, + }, + id: None, + }]), + images: None, + }, + ChatMessage::tool_result("No relevant messages found."), + ChatMessage { + role: "assistant".to_string(), + content: "Final title\n\nFinal body.".to_string(), + tool_calls: None, + images: None, + }, + ]; + let out = InsightGenerator::render_single_trajectory(&msgs); + assert!(out.contains("Input:")); + assert!(out.contains("/photos/img.jpg")); + assert!(out.contains("1. 
search_rag(")); + assert!(out.contains("query=\"wedding\"")); + assert!(out.contains("-> empty (pivoted)")); + assert!(out.contains("Final insight: Final title")); + } } diff --git a/src/ai/ollama.rs b/src/ai/ollama.rs index 81185f0..eb810d2 100644 --- a/src/ai/ollama.rs +++ b/src/ai/ollama.rs @@ -4,6 +4,7 @@ use chrono::NaiveDate; use reqwest::Client; use serde::{Deserialize, Serialize}; use std::collections::HashMap; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; @@ -19,6 +20,19 @@ pub use crate::ai::llm_client::{ToolCall, ToolCallFunction, ToolFunction}; // Cache duration: 15 minutes const CACHE_DURATION_SECS: u64 = 15 * 60; +/// Default total request timeout for generation calls, in seconds. +/// Overridable via `OLLAMA_REQUEST_TIMEOUT_SECONDS` env var for slow +/// CPU-offloaded models where inference can take several minutes. +const DEFAULT_REQUEST_TIMEOUT_SECS: u64 = 120; + +fn configured_request_timeout_secs() -> u64 { + std::env::var("OLLAMA_REQUEST_TIMEOUT_SECONDS") + .ok() + .and_then(|v| v.parse::().ok()) + .filter(|&s| s > 0) + .unwrap_or(DEFAULT_REQUEST_TIMEOUT_SECS) +} + /// Embedding model used across the app. Callers that persist a /// `model_version` alongside an embedding should read this constant so the /// stored label always matches what `generate_embeddings` actually ran. @@ -65,6 +79,12 @@ pub struct OllamaClient { top_p: Option, top_k: Option, min_p: Option, + /// Sticky preference shared across clones: when the fallback server + /// succeeded most recently, try it first on the next call. Avoids + /// re-probing the primary with a model it doesn't have loaded across + /// every iteration of the agent loop. `Arc` so cloning + /// `OllamaClient` shares the flag rather than resetting it. 
+ prefer_fallback: Arc, } impl OllamaClient { @@ -77,7 +97,7 @@ impl OllamaClient { Self { client: Client::builder() .connect_timeout(Duration::from_secs(5)) // Quick connection timeout - .timeout(Duration::from_secs(120)) // Total request timeout for generation + .timeout(Duration::from_secs(configured_request_timeout_secs())) .build() .unwrap_or_else(|_| Client::new()), primary_url, @@ -89,9 +109,44 @@ impl OllamaClient { top_p: None, top_k: None, min_p: None, + prefer_fallback: Arc::new(AtomicBool::new(false)), } } + /// Return the server attempt order as `(label, url, model)` tuples. + /// Respects the sticky `prefer_fallback` flag so the most recently + /// successful server is tried first. + fn attempt_order(&self) -> Vec<(&'static str, String, String)> { + let primary = ( + "primary", + self.primary_url.clone(), + self.primary_model.clone(), + ); + let fallback = self.fallback_url.as_ref().map(|url| { + let model = self + .fallback_model + .clone() + .unwrap_or_else(|| self.primary_model.clone()); + ("fallback", url.clone(), model) + }); + + let prefer_fallback = fallback.is_some() && self.prefer_fallback.load(Ordering::Relaxed); + + let mut order = Vec::with_capacity(2); + if prefer_fallback { + if let Some(fb) = fallback.clone() { + order.push(fb); + } + order.push(primary); + } else { + order.push(primary); + if let Some(fb) = fallback { + order.push(fb); + } + } + order + } + pub fn set_num_ctx(&mut self, num_ctx: Option) { self.num_ctx = num_ctx; } @@ -587,68 +642,57 @@ Analyze the image and use specific details from both the visual content and the /// Send a chat request with tool definitions to /api/chat. /// Returns the assistant's response message (may contain tool_calls or final content). - /// Uses primary/fallback URL routing same as other generation methods. + /// Tries servers in preference order — most recently successful first — + /// so a fallback-only model doesn't re-404 against the primary on every + /// iteration of the agent loop. 
pub async fn chat_with_tools( &self, messages: Vec, tools: Vec, ) -> Result<(ChatMessage, Option, Option)> { - // Try primary server first - log::info!( - "Attempting chat_with_tools with primary server: {} (model: {})", - self.primary_url, - self.primary_model - ); - let primary_result = self - .try_chat_with_tools(&self.primary_url, messages.clone(), tools.clone()) - .await; - - match primary_result { - Ok(result) => { - log::info!("Successfully got chat_with_tools response from primary server"); - Ok(result) - } - Err(e) => { - log::warn!("Primary server chat_with_tools failed: {}", e); - - // Try fallback server if available - if let Some(fallback_url) = &self.fallback_url { - let fallback_model = - self.fallback_model.as_ref().unwrap_or(&self.primary_model); + let order = self.attempt_order(); + let mut errors: Vec = Vec::new(); + for (label, url, model) in &order { + log::info!( + "Attempting chat_with_tools with {} server: {} (model: {})", + label, + url, + model + ); + match self + .try_chat_with_tools(url, messages.clone(), tools.clone()) + .await + { + Ok(result) => { log::info!( - "Attempting chat_with_tools with fallback server: {} (model: {})", - fallback_url, - fallback_model + "Successfully got chat_with_tools response from {} server", + label ); - match self - .try_chat_with_tools(fallback_url, messages, tools) - .await - { - Ok(result) => { - log::info!( - "Successfully got chat_with_tools response from fallback server" - ); - Ok(result) - } - Err(fallback_e) => { - log::error!( - "Fallback server chat_with_tools also failed: {}", - fallback_e - ); - Err(anyhow::anyhow!( - "Both primary and fallback servers failed. 
Primary: {}, Fallback: {}", - e, - fallback_e - )) - } - } - } else { - log::error!("No fallback server configured"); - Err(e) + self.prefer_fallback + .store(*label == "fallback", Ordering::Relaxed); + return Ok(result); + } + Err(e) => { + log::warn!("{} server chat_with_tools failed: {}", label, e); + errors.push(format!("{}: {}", label, e)); } } } + + if order.len() <= 1 { + log::error!("No fallback server configured; chat_with_tools exhausted"); + } else { + log::error!( + "All {} servers failed for chat_with_tools ({})", + order.len(), + errors.join(" / ") + ); + } + Err(anyhow::anyhow!( + "chat_with_tools failed on all servers: {}", + errors.join(" / ") + )) } /// Streaming variant of `chat_with_tools`. Tries primary, then falls @@ -662,26 +706,30 @@ Analyze the image and use specific details from both the visual content and the messages: Vec, tools: Vec, ) -> Result>> { - // Attempt primary. If it can't be opened at all, try fallback. - match self - .try_chat_with_tools_stream(&self.primary_url, messages.clone(), tools.clone()) - .await - { - Ok(s) => Ok(s), - Err(e) => { - if let Some(fallback_url) = self.fallback_url.clone() { - log::warn!( - "Streaming chat primary failed ({}); trying fallback {}", - e, - fallback_url - ); - self.try_chat_with_tools_stream(&fallback_url, messages, tools) - .await - } else { - Err(e) + // Same preference logic as `chat_with_tools`. Only the initial + // connection is retried across servers — once the stream begins, + // mid-stream errors propagate to the caller. 
+ let order = self.attempt_order(); + let mut last_err: Option = None; + + for (label, url, _model) in &order { + match self + .try_chat_with_tools_stream(url, messages.clone(), tools.clone()) + .await + { + Ok(s) => { + self.prefer_fallback + .store(*label == "fallback", Ordering::Relaxed); + return Ok(s); + } + Err(e) => { + log::warn!("Streaming chat on {} server failed: {}", label, e); + last_err = Some(e); } } } + + Err(last_err.unwrap_or_else(|| anyhow::anyhow!("No Ollama server configured"))) } async fn try_chat_with_tools_stream( @@ -859,8 +907,12 @@ Analyze the image and use specific details from both the visual content and the if !response.status().is_success() { let status = response.status(); let body = response.text().await.unwrap_or_default(); - log::error!( - "chat_with_tools request body that caused {}: {}", + // warn, not error — the outer `chat_with_tools` may recover via + // the fallback server. When both fail, the outer layer emits the + // actual error log. + log::warn!( + "chat_with_tools request to {} got {}: {}", + base_url, status, request_json ); diff --git a/src/ai/openrouter.rs b/src/ai/openrouter.rs index 1559479..82195cf 100644 --- a/src/ai/openrouter.rs +++ b/src/ai/openrouter.rs @@ -452,8 +452,7 @@ impl LlmClient for OpenRouterClient { // SSE frames are delimited by a blank line. Walk the buffer // for "\n\n" markers; anything before them is a complete // frame (possibly multi-line). 
- loop { - let Some(sep) = find_double_newline(&buf) else { break }; + while let Some(sep) = find_double_newline(&buf) { let frame = buf.drain(..sep + 2).collect::>(); let frame_str = match std::str::from_utf8(&frame) { Ok(s) => s, diff --git a/src/bin/populate_knowledge.rs b/src/bin/populate_knowledge.rs index f70c5a2..b3239ca 100644 --- a/src/bin/populate_knowledge.rs +++ b/src/bin/populate_knowledge.rs @@ -251,6 +251,8 @@ async fn main() -> anyhow::Result<()> { args.min_p, args.max_iterations, None, + Vec::new(), + Vec::new(), ) .await { diff --git a/src/database/daily_summary_dao.rs b/src/database/daily_summary_dao.rs index 6ea560a..276a5a2 100644 --- a/src/database/daily_summary_dao.rs +++ b/src/database/daily_summary_dao.rs @@ -268,7 +268,7 @@ impl DailySummaryDao for SqliteDailySummaryDao { .into_iter() .take(limit) .map(|(similarity, summary)| { - log::info!( + log::debug!( "Summary match: similarity={:.3}, date={}, contact={}, summary=\"{}\"", similarity, summary.date, @@ -388,7 +388,7 @@ impl DailySummaryDao for SqliteDailySummaryDao { .into_iter() .take(limit) .map(|(combined, similarity, days, summary)| { - log::info!( + log::debug!( "Summary match: combined={:.3} (sim={:.3}, days={}), date={}, contact={}, summary=\"{}\"", combined, similarity, diff --git a/src/database/insights_dao.rs b/src/database/insights_dao.rs index 34c1d12..2821b5b 100644 --- a/src/database/insights_dao.rs +++ b/src/database/insights_dao.rs @@ -38,6 +38,16 @@ pub trait InsightDao: Sync + Send { file_path: &str, ) -> Result, DbError>; + /// Fetch a single insight by primary key, regardless of `is_current`. + /// Used by the few-shot injection flow where the caller picks specific + /// historical insights (which may have been superseded) as training + /// exemplars for a fresh generation. 
+ fn get_insight_by_id( + &mut self, + context: &opentelemetry::Context, + insight_id: i32, + ) -> Result, DbError>; + fn delete_insight( &mut self, context: &opentelemetry::Context, @@ -198,6 +208,25 @@ impl InsightDao for SqliteInsightDao { .map_err(|_| DbError::new(DbErrorKind::QueryError)) } + fn get_insight_by_id( + &mut self, + context: &opentelemetry::Context, + insight_id: i32, + ) -> Result, DbError> { + trace_db_call(context, "query", "get_insight_by_id", |_span| { + use schema::photo_insights::dsl::*; + + let mut connection = self.connection.lock().expect("Unable to get InsightDao"); + + photo_insights + .find(insight_id) + .first::(connection.deref_mut()) + .optional() + .map_err(|_| anyhow::anyhow!("Query error")) + }) + .map_err(|_| DbError::new(DbErrorKind::QueryError)) + } + fn delete_insight( &mut self, context: &opentelemetry::Context, diff --git a/src/database/models.rs b/src/database/models.rs index 3d63f1a..96d9c53 100644 --- a/src/database/models.rs +++ b/src/database/models.rs @@ -102,6 +102,12 @@ pub struct InsertPhotoInsight { pub training_messages: Option, /// `"local"` (Ollama with images) | `"hybrid"` (local vision + OpenRouter chat). pub backend: String, + /// JSON array of insight ids whose `training_messages` were compressed + /// and injected into the system prompt as few-shot exemplars when this + /// row was generated. `None` means no few-shot was used (pristine + /// generation). Used downstream to filter out contaminated rows when + /// assembling an unbiased training / evaluation set. + pub fewshot_source_ids: Option, } #[derive(Serialize, Queryable, Clone, Debug)] @@ -119,6 +125,7 @@ pub struct PhotoInsight { pub approved: Option, /// `"local"` (Ollama with images) | `"hybrid"` (local vision + OpenRouter chat). 
pub backend: String, + pub fewshot_source_ids: Option<String>, } // --- Libraries --- diff --git a/src/database/schema.rs b/src/database/schema.rs index 200cb15..e49f21f 100644 --- a/src/database/schema.rs +++ b/src/database/schema.rs @@ -143,6 +143,7 @@ diesel::table! { training_messages -> Nullable<Text>, approved -> Nullable<Bool>, backend -> Text, + fewshot_source_ids -> Nullable<Text>, } }