diff --git a/src/ai/handlers.rs b/src/ai/handlers.rs index 5e46418..b3beeda 100644 --- a/src/ai/handlers.rs +++ b/src/ai/handlers.rs @@ -510,7 +510,9 @@ pub async fn generate_insight_handler( } Ok(Ok(Err(e))) => { log::error!("Insight generation failed for {}: {:?}", path, e); - if let Err(err) = dao.fail_job(&ctx, job_id, &format!("{:?}", e)) { + // `{:#}` = one-line context chain; the job's error_message is + // returned to the client verbatim, so no Debug/backtrace here. + if let Err(err) = dao.fail_job(&ctx, job_id, &format!("{:#}", e)) { log::error!("Failed to mark job {} as failed: {:?}", job_id, err); } } @@ -884,7 +886,9 @@ pub async fn generate_agentic_insight_handler( } Ok(Ok(Err(e))) => { log::error!("Agentic insight generation failed for {}: {:?}", path, e); - if let Err(err) = dao.fail_job(&ctx, job_id, &format!("{:?}", e)) { + // `{:#}` = one-line context chain; the job's error_message is + // returned to the client verbatim, so no Debug/backtrace here. + if let Err(err) = dao.fail_job(&ctx, job_id, &format!("{:#}", e)) { log::error!("Failed to mark job {} as failed: {:?}", job_id, err); } } diff --git a/src/ai/insight_generator.rs b/src/ai/insight_generator.rs index a6a50f1..1ac4db3 100644 --- a/src/ai/insight_generator.rs +++ b/src/ai/insight_generator.rs @@ -575,6 +575,67 @@ impl InsightGenerator { Ok(formatted) } + /// Semantic search over daily summaries for the agentic `search_rag` + /// tool. Embeds the caller's query as-is (no metadata boilerplate) and + /// only applies time weighting when an anchor date is provided — + /// without one, results rank purely by similarity across all time. + async fn search_summaries_semantic( + &self, + query: &str, + date: Option, + limit: usize, + ) -> Result> { + let tracer = global_tracer(); + let current_cx = opentelemetry::Context::current(); + let mut span = tracer.start_with_context("ai.rag.search_daily_summaries", ¤t_cx); + span.set_attribute(KeyValue::new("query", query.to_string())); + span.set_attribute(KeyValue::new("limit", limit as i64)); + span.set_attribute(KeyValue::new("time_weighted", date.is_some())); + if let Some(d) = date { + span.set_attribute(KeyValue::new("date", d.to_string())); + } + let search_cx = current_cx.with_span(span); + + log::info!("RAG QUERY: {} (anchor date: {:?})", query, date); + + // Must use the same backend that populated the daily-summary + // embeddings or similarity search is garbage (see embed_one docs). + let query_embedding = + crate::ai::embed_one(&self.ollama, self.llamacpp.as_deref(), query).await?; + + let mut summary_dao = self + .daily_summary_dao + .lock() + .expect("Unable to lock DailySummaryDao"); + + let similar_summaries = match date { + Some(d) => summary_dao.find_similar_summaries_with_time_weight( + &search_cx, + &query_embedding, + &d.format("%Y-%m-%d").to_string(), + limit, + ), + None => summary_dao.find_similar_summaries(&search_cx, &query_embedding, limit), + } + .map_err(|e| anyhow::anyhow!("Failed to find similar summaries: {:?}", e))?; + + search_cx.span().set_attribute(KeyValue::new( + "results_count", + similar_summaries.len() as i64, + )); + search_cx.span().set_status(Status::Ok); + + Ok(similar_summaries + .into_iter() + .map(|s| { + format!( + "[{}] {} ({} messages):\n{}", + s.date, s.contact, s.message_count, s.summary + ) + }) + .collect()) + } + /// Build a metadata-based query (fallback when no topics available) fn build_metadata_query( date: chrono::NaiveDate, @@ -1737,13 +1798,12 @@ Return ONLY the summary, nothing else."#, Some(q) => q.to_string(), None => return "Error: missing required parameter 'query'".to_string(), }; - let date_str = match args.get("date").and_then(|v| v.as_str()) { - Some(d) => d, - None => return "Error: missing required parameter 'date'".to_string(), - }; - let date = match NaiveDate::parse_from_str(date_str, "%Y-%m-%d") { - Ok(d) => d, - Err(e) => return format!("Error: failed to parse date '{}': {}", date_str, e), + let date = match args.get("date").and_then(|v| v.as_str()) { + Some(d) => match NaiveDate::parse_from_str(d, "%Y-%m-%d") { + Ok(d) => Some(d), + Err(e) => return format!("Error: failed to parse date '{}': {}", d, e), + }, + None => None, }; let contact = args .get("contact") @@ -1756,7 +1816,7 @@ Return ONLY the summary, nothing else."#, .clamp(1, 25) as usize; log::info!( - "tool_search_rag: query='{}', date={}, contact={:?}, limit={}", + "tool_search_rag: query='{}', date={:?}, contact={:?}, limit={}", query, date, contact, @@ -1777,15 +1837,17 @@ Return ONLY the summary, nothing else."#, limit }; + // Embed the model's query verbatim — a soft contact bias is the + // only decoration. The metadata boilerplate ("On , it was a + // ") that find_relevant_messages_rag prepends drowns the + // semantic signal, so the tool path deliberately bypasses it. + let search_query = match contact.as_deref() { + Some(c) => format!("{} (conversation with {})", query, c), + None => query.clone(), + }; + let results = match self - .find_relevant_messages_rag( - date, - None, - contact.as_deref(), - None, - candidate_limit, - Some(&query), - ) + .search_summaries_semantic(&search_query, date, candidate_limit) .await { Ok(results) if !results.is_empty() => results, @@ -2062,12 +2124,15 @@ Return ONLY the summary, nothing else."#, /// Render a list of [`SmsSearchHit`] for the LLM. Prefers the SMS-API /// snippet (which already excerpts the matched span and is the only /// preview MMS-attachment-only matches have) over the full body, and - /// strips the `` tags the snippet ships with. + /// strips the `` tags the snippet ships with. Each line names + /// both parties (`sender → recipient`) — results can span multiple + /// conversations, and a sender-only label leaves sent messages + /// unattributable to a thread. fn format_search_hits(hits: &[SmsSearchHit], mode: &str, date_filtered: bool) -> String { let user_name = user_display_name(); let mut out = String::new(); out.push_str(&format!( - "Found {} messages (mode: {}{}):\n\n", + "Found {} messages (mode: {}{}, sender → recipient):\n\n", hits.len(), mode, if date_filtered { ", date-filtered" } else { "" } @@ -2076,10 +2141,10 @@ Return ONLY the summary, nothing else."#, let date = chrono::DateTime::from_timestamp(h.date, 0) .map(|dt| dt.format("%Y-%m-%d").to_string()) .unwrap_or_else(|| h.date.to_string()); - let direction: &str = if h.type_ == 2 { - &user_name + let direction = if h.type_ == 2 { + format!("{} → {}", user_name, h.contact_name) } else { - &h.contact_name + format!("{} → {}", h.contact_name, user_name) }; let score = h .similarity_score @@ -2150,11 +2215,18 @@ Return ONLY the summary, nothing else."#, { Ok(messages) if !messages.is_empty() => { let user_name = user_display_name(); + // Name both parties — without a contact filter the window + // spans every conversation, and a sender-only label leaves + // sent messages unattributable to a thread. let formatted: Vec = messages .iter() .take(limit) .map(|m| { - let sender: &str = if m.is_sent { &user_name } else { &m.contact }; + let direction = if m.is_sent { + format!("{} → {}", user_name, m.contact) + } else { + format!("{} → {}", m.contact, user_name) + }; let ts = DateTime::from_timestamp(m.timestamp, 0) .map(|dt| { dt.with_timezone(&Local) @@ -2162,7 +2234,7 @@ Return ONLY the summary, nothing else."#, .to_string() }) .unwrap_or_else(|| "unknown".to_string()); - format!("[{}] {}: {}", ts, sender, m.body) + format!("[{}] {}: {}", ts, direction, m.body) }) .collect(); format!( @@ -3206,21 +3278,25 @@ Return ONLY the summary, nothing else."#, if opts.daily_summaries_present { tools.push(Tool::function( "search_rag", - "Date-anchored semantic search over the user's daily-summary corpus. \ - Returns up to `limit` summaries most semantically similar to `query`, \ - weighted toward summaries near `date`. For raw message text across all \ - time, prefer `search_messages`. \ - Examples: `{query: \"family dinner\", date: \"2018-12-24\"}` — what \ + "Semantic search over the user's daily-summary corpus. Returns up to \ + `limit` summaries most semantically similar to `query`. Pass `date` \ + to anchor in time: summaries near that date rank higher and matches \ + months away decay sharply. Omit `date` to rank purely by semantic \ + similarity across all time — do this for \"when did X happen?\" \ + questions where the date is unknown. For raw message text, prefer \ + `search_messages`. \ + Examples: `{query: \"family dinner\"}` — best matches across all \ + time. `{query: \"family dinner\", date: \"2018-12-24\"}` — what \ daily summaries near Christmas Eve mention family / dinner / gathering. \ `{query: \"work travel\", date: \"2019-06-15\", contact: \"Alice\"}` — \ - narrowed to summaries that involve Alice.", + biased toward summaries that involve Alice.", serde_json::json!({ "type": "object", - "required": ["query", "date"], + "required": ["query"], "properties": { "query": { "type": "string", "description": "Free-text query, semantically matched." }, - "date": { "type": "string", "description": "Anchor date, YYYY-MM-DD. Summaries near this date rank higher." }, - "contact": { "type": "string", "description": "Optional contact name to bias toward conversations with that person." }, + "date": { "type": "string", "description": "Optional anchor date, YYYY-MM-DD. When set, summaries near this date rank higher; omit to search all time evenly." }, + "contact": { "type": "string", "description": "Optional contact name to bias toward conversations with that person (soft semantic bias, not a hard filter)." }, "limit": { "type": "integer", "description": "Max summaries to return (default 10, max 25)." } } }), @@ -4763,12 +4839,22 @@ mod tests { let hit = make_search_hit(1, "Sarah", "see you at the lake tomorrow", None, 1); let out = InsightGenerator::format_search_hits(&[hit], "fts5", false); - assert!(out.starts_with("Found 1 messages (mode: fts5):")); + assert!(out.starts_with("Found 1 messages (mode: fts5")); assert!(out.contains("see you at the lake tomorrow")); - assert!(out.contains("Sarah —")); + // Received message: contact is the sender. + assert!(out.contains("Sarah →")); assert!(!out.contains("date-filtered")); } + #[test] + fn format_search_hits_labels_sent_direction() { + // Sent messages must name the recipient — results can span multiple + // conversations, and a sender-only label left them unattributable. + let hit = make_search_hit(5, "Sarah", "on my way", None, 2); + let out = InsightGenerator::format_search_hits(&[hit], "fts5", false); + assert!(out.contains("→ Sarah —")); + } + #[test] fn format_search_hits_prefers_snippet_over_body_and_strips_marks() { let hit = make_search_hit( @@ -4799,7 +4885,7 @@ mod tests { assert!(out.contains("birthday_cake.jpg")); assert!(!out.contains("")); - assert!(out.contains("Mom —")); + assert!(out.contains("Mom →")); } #[test] diff --git a/src/ai/local_llm.rs b/src/ai/local_llm.rs new file mode 100644 index 0000000..8344a87 --- /dev/null +++ b/src/ai/local_llm.rs @@ -0,0 +1,86 @@ +//! Bundle of the local LLM pair (Ollama + optional llama-swap) with the +//! `LLM_BACKEND` dispatch baked in. +//! +//! Exists because passing the pair around as loose values invited the same +//! bug three times: import/backfill tooling embedded corpora via +//! `OllamaClient` directly while the query side dispatched through +//! `embed_one`, so flipping `LLM_BACKEND=llamacpp` silently split queries +//! and corpus into different vector spaces. Anything that writes or reads +//! embeddings should go through this type (or `embed_one`/`embed_many`), +//! never a concrete client. +//! +//! Deliberately knows nothing about chat policy — hybrid/OpenRouter routing +//! is request-scoped and stays in `ResolvedBackend`. This is only the +//! local stack: embeddings and offline single-shot generation. + +// Constructed by binaries, not the server — dead code from main.rs's view. +#![allow(dead_code)] + +use std::sync::Arc; + +use anyhow::Result; + +use super::llamacpp::LlamaCppClient; +use super::llm_client::LlmClient; +use super::ollama::{EMBEDDING_MODEL, OllamaClient}; + +#[derive(Clone)] +pub struct LocalLlm { + ollama: OllamaClient, + llamacpp: Option>, +} + +impl LocalLlm { + pub fn new(ollama: OllamaClient, llamacpp: Option>) -> Self { + Self { ollama, llamacpp } + } + + /// Construct from the canonical env wiring shared with `AppState`. + pub fn from_env() -> Self { + Self::new( + crate::state::build_ollama_from_env(), + crate::state::build_llamacpp_from_env(), + ) + } + + /// Embed one string via the `LLM_BACKEND`-selected client. + pub async fn embed(&self, text: &str) -> Result> { + super::embed_one(&self.ollama, self.llamacpp.as_deref(), text).await + } + + /// Embed a batch via the `LLM_BACKEND`-selected client. + pub async fn embed_batch(&self, texts: &[&str]) -> Result>> { + super::embed_many(&self.ollama, self.llamacpp.as_deref(), texts).await + } + + /// Single-shot local text generation via the `LLM_BACKEND`-selected + /// client (offline tooling; chat turns belong to `ResolvedBackend`). + pub async fn generate(&self, prompt: &str, system: Option<&str>) -> Result { + if super::local_backend_is_llamacpp() { + if let Some(lc) = self.llamacpp.as_deref() { + return ::generate(lc, prompt, system, None).await; + } + anyhow::bail!( + "LLM_BACKEND=llamacpp but LlamaCppClient is unconfigured — \ + set LLAMA_SWAP_URL or switch to LLM_BACKEND=ollama" + ); + } + self.ollama.generate(prompt, system).await + } + + /// Label identifying which backend + model produces embeddings right + /// now. Store it alongside vectors (`model_version` columns) so a + /// backend flip is detectable in the data, not just in env history. + pub fn embedding_model_version(&self) -> String { + if super::local_backend_is_llamacpp() { + let slot = self + .llamacpp + .as_deref() + .map(|c| c.embedding_model.as_str()) + .unwrap_or("embed"); + format!("llama-swap:{}", slot) + } else { + EMBEDDING_MODEL.to_string() + } + } +} diff --git a/src/ai/mod.rs b/src/ai/mod.rs index a4f5e14..5125a96 100644 --- a/src/ai/mod.rs +++ b/src/ai/mod.rs @@ -9,6 +9,7 @@ pub mod insight_chat; pub mod insight_generator; pub mod llamacpp; pub mod llm_client; +pub mod local_llm; pub mod ollama; pub mod openrouter; pub mod sms_client; @@ -35,6 +36,9 @@ pub use llamacpp::LlamaCppClient; pub use llm_client::{ ChatMessage, LlmClient, ModelCapabilities, Tool, ToolCall, ToolCallFunction, ToolFunction, }; +// LocalLlm is constructed by binaries (reembed_embeddings, importers), not the server +#[allow(unused_imports)] +pub use local_llm::LocalLlm; pub use ollama::{EMBEDDING_MODEL, OllamaClient}; pub use sms_client::{SmsApiClient, SmsMessage}; pub use tts::{ @@ -71,35 +75,49 @@ pub fn local_backend_is_llamacpp() -> bool { ) } -/// Embed one string via the configured local backend. Routes through -/// llama-swap when `LLM_BACKEND=llamacpp` (and a client is configured), -/// else Ollama. Returns the single embedding vector. See -/// [`local_backend_is_llamacpp`] for the rationale on consistency. -pub async fn embed_one( +/// Embed a batch of strings via the configured local backend. Routes +/// through llama-swap when `LLM_BACKEND=llamacpp` (and a client is +/// configured), else Ollama. See [`local_backend_is_llamacpp`] for the +/// rationale on consistency. +pub async fn embed_many( ollama: &OllamaClient, llamacpp: Option<&LlamaCppClient>, - text: &str, -) -> anyhow::Result> { + texts: &[&str], +) -> anyhow::Result>> { if local_backend_is_llamacpp() { if let Some(lc) = llamacpp { - let mut vecs = ::generate_embeddings(lc, &[text]).await?; - return vecs - .pop() - .ok_or_else(|| anyhow::anyhow!("llama-swap returned no embeddings")); + return ::generate_embeddings(lc, texts).await; } anyhow::bail!( "LLM_BACKEND=llamacpp but LlamaCppClient is unconfigured — \ set LLAMA_SWAP_URL or switch to LLM_BACKEND=ollama" ); } - ollama.generate_embedding(text).await + ollama.generate_embeddings(texts).await +} + +/// Embed one string via the configured local backend. Single-text +/// convenience over [`embed_many`]. +pub async fn embed_one( + ollama: &OllamaClient, + llamacpp: Option<&LlamaCppClient>, + text: &str, +) -> anyhow::Result> { + let mut vecs = embed_many(ollama, llamacpp, &[text]).await?; + vecs.pop() + .ok_or_else(|| anyhow::anyhow!("embedding backend returned no embeddings")) } #[cfg(test)] mod env_dispatch_tests { use super::*; + /// Env vars are process-global, and the test harness runs in parallel — + /// without this lock the `LLM_BACKEND` tests race each other and flake. + static ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(()); + fn with_env(key: &str, val: Option<&str>, f: F) { + let _guard = ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner()); let prev = std::env::var(key).ok(); match val { Some(v) => unsafe { std::env::set_var(key, v) }, diff --git a/src/bin/import_calendar.rs b/src/bin/import_calendar.rs index c8f941b..629a794 100644 --- a/src/bin/import_calendar.rs +++ b/src/bin/import_calendar.rs @@ -1,7 +1,7 @@ use anyhow::{Context, Result}; use chrono::Utc; use clap::Parser; -use image_api::ai::ollama::OllamaClient; +use image_api::ai::LocalLlm; use image_api::bin_progress; use image_api::database::calendar_dao::{InsertCalendarEvent, SqliteCalendarEventDao}; use image_api::parsers::ical_parser::parse_ics_file; @@ -44,22 +44,10 @@ async fn main() -> Result<()> { let context = opentelemetry::Context::current(); - let ollama = if args.generate_embeddings { - let primary_url = dotenv::var("OLLAMA_PRIMARY_URL") - .or_else(|_| dotenv::var("OLLAMA_URL")) - .unwrap_or_else(|_| "http://localhost:11434".to_string()); - let fallback_url = dotenv::var("OLLAMA_FALLBACK_URL").ok(); - let primary_model = dotenv::var("OLLAMA_PRIMARY_MODEL") - .or_else(|_| dotenv::var("OLLAMA_MODEL")) - .unwrap_or_else(|_| "nomic-embed-text:v1.5".to_string()); - let fallback_model = dotenv::var("OLLAMA_FALLBACK_MODEL").ok(); - - Some(OllamaClient::new( - primary_url, - fallback_url, - primary_model, - fallback_model, - )) + // LocalLlm dispatches per LLM_BACKEND, so embeddings written here land + // in the same vector space the query side searches. + let llm = if args.generate_embeddings { + Some(LocalLlm::from_env()) } else { None }; @@ -90,7 +78,7 @@ async fn main() -> Result<()> { } // Generate embedding if requested (blocking call) - let embedding = if let Some(ref ollama_client) = ollama { + let embedding = if let Some(ref llm) = llm { let text = format!( "{} {} {}", event.summary, @@ -99,8 +87,7 @@ async fn main() -> Result<()> { ); match tokio::task::block_in_place(|| { - tokio::runtime::Handle::current() - .block_on(async { ollama_client.generate_embedding(&text).await }) + tokio::runtime::Handle::current().block_on(async { llm.embed(&text).await }) }) { Ok(emb) => Some(emb), Err(e) => { diff --git a/src/bin/import_search_history.rs b/src/bin/import_search_history.rs index 21af659..7494392 100644 --- a/src/bin/import_search_history.rs +++ b/src/bin/import_search_history.rs @@ -1,7 +1,7 @@ use anyhow::{Context, Result}; use chrono::Utc; use clap::Parser; -use image_api::ai::ollama::OllamaClient; +use image_api::ai::LocalLlm; use image_api::bin_progress; use image_api::database::search_dao::{InsertSearchRecord, SqliteSearchHistoryDao}; use image_api::parsers::search_html_parser::parse_search_html; @@ -38,16 +38,9 @@ async fn main() -> Result<()> { info!("Found {} search records", searches.len()); - let primary_url = dotenv::var("OLLAMA_PRIMARY_URL") - .or_else(|_| dotenv::var("OLLAMA_URL")) - .unwrap_or_else(|_| "http://localhost:11434".to_string()); - let fallback_url = dotenv::var("OLLAMA_FALLBACK_URL").ok(); - let primary_model = dotenv::var("OLLAMA_PRIMARY_MODEL") - .or_else(|_| dotenv::var("OLLAMA_MODEL")) - .unwrap_or_else(|_| "nomic-embed-text:v1.5".to_string()); - let fallback_model = dotenv::var("OLLAMA_FALLBACK_MODEL").ok(); - - let ollama = OllamaClient::new(primary_url, fallback_url, primary_model, fallback_model); + // LocalLlm dispatches per LLM_BACKEND, so embeddings written here land + // in the same vector space the query side searches. + let llm = LocalLlm::from_env(); let context = opentelemetry::Context::current(); let mut inserted_count = 0usize; @@ -67,12 +60,11 @@ async fn main() -> Result<()> { let pb_for_warn = pb.clone(); let embeddings_result = tokio::task::spawn({ - let ollama_client = ollama.clone(); + let llm = llm.clone(); async move { - // Generate embeddings in parallel for the batch let mut embeddings = Vec::new(); for query in &queries { - match ollama_client.generate_embedding(query).await { + match llm.embed(query).await { Ok(emb) => embeddings.push(Some(emb)), Err(e) => { pb_for_warn.println(format!("embedding failed for '{}': {}", query, e)); diff --git a/src/bin/reembed_embeddings.rs b/src/bin/reembed_embeddings.rs new file mode 100644 index 0000000..26b2fde --- /dev/null +++ b/src/bin/reembed_embeddings.rs @@ -0,0 +1,464 @@ +//! Re-embed stored corpora through `LocalLlm`, i.e. the same +//! `LLM_BACKEND` dispatch the query side uses. The original import / +//! backfill tools always embedded via Ollama, so a deploy running +//! `LLM_BACKEND=llamacpp` queries vector spaces the corpora may not live +//! in. Three tables share the problem and are all covered here: +//! +//! - `daily_conversation_summaries` — re-embeds +//! `strip_summary_boilerplate(summary)` (what the original job fed the +//! embedder); also rewrites `model_version`. +//! - `calendar_events` — re-embeds "summary description location" exactly +//! as `import_calendar` does; rows without an embedding are skipped (the +//! import only embeds under `--generate-embeddings`). +//! - `search_history` — re-embeds the raw query text. +//! - `entities` (knowledge graph) — re-embeds "name description" exactly as +//! `tool_store_entity` does; embedding-less rows are skipped (embedding +//! is best-effort at store time). +//! +//! Source text is untouched — only vectors are rewritten. The old↔new +//! cosine report doubles as a diagnostic: ~1.0 means both backends already +//! shared a space (re-embedding was a no-op); low values confirm the +//! mismatch this tool exists to fix. + +use anyhow::{Context, Result}; +use clap::Parser; +use diesel::prelude::*; +use diesel::sql_query; +use diesel::sqlite::SqliteConnection; +use image_api::ai::{LocalLlm, strip_summary_boilerplate}; +use image_api::bin_progress; +use std::env; + +#[derive(Parser, Debug)] +#[command(author, version, about = "Re-embed stored corpora via the configured LLM_BACKEND", long_about = None)] +struct Args { + /// Comma-separated tables to process: summaries, calendar, search, entities + #[arg(long, default_value = "summaries,calendar,search,entities")] + tables: String, + + /// Only process the first N rows per table (smoke test) + #[arg(long)] + limit: Option, + + /// Compute embeddings and report old↔new similarity without writing + #[arg(long, default_value_t = false)] + dry_run: bool, +} + +#[derive(QueryableByName)] +struct SummaryRow { + #[diesel(sql_type = diesel::sql_types::Integer)] + id: i32, + #[diesel(sql_type = diesel::sql_types::Text)] + summary: String, + #[diesel(sql_type = diesel::sql_types::Binary)] + embedding: Vec, + #[diesel(sql_type = diesel::sql_types::Text)] + model_version: String, +} + +#[derive(QueryableByName)] +struct CalendarRow { + #[diesel(sql_type = diesel::sql_types::Integer)] + id: i32, + #[diesel(sql_type = diesel::sql_types::Text)] + summary: String, + #[diesel(sql_type = diesel::sql_types::Nullable)] + description: Option, + #[diesel(sql_type = diesel::sql_types::Nullable)] + location: Option, + #[diesel(sql_type = diesel::sql_types::Binary)] + embedding: Vec, +} + +#[derive(QueryableByName)] +struct SearchRow { + #[diesel(sql_type = diesel::sql_types::BigInt)] + id: i64, + #[diesel(sql_type = diesel::sql_types::Text)] + query: String, + #[diesel(sql_type = diesel::sql_types::Binary)] + embedding: Vec, +} + +#[derive(QueryableByName)] +struct EntityRow { + #[diesel(sql_type = diesel::sql_types::Integer)] + id: i32, + #[diesel(sql_type = diesel::sql_types::Text)] + name: String, + #[diesel(sql_type = diesel::sql_types::Text)] + description: String, + #[diesel(sql_type = diesel::sql_types::Binary)] + embedding: Vec, +} + +/// One unit of re-embed work, normalized across tables. +struct WorkItem { + /// Row key, as i64 so both i32 ids and rowids fit. + id: i64, + /// Text fed to the embedder — must match what the original writer used. + text: String, + /// Existing vector bytes, for the old↔new similarity report. + old_embedding: Vec, +} + +fn deserialize_vector(bytes: &[u8]) -> Option> { + if !bytes.len().is_multiple_of(4) { + return None; + } + Some( + bytes + .chunks_exact(4) + .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]])) + .collect(), + ) +} + +fn serialize_vector(vec: &[f32]) -> Vec { + vec.iter().flat_map(|f| f.to_le_bytes()).collect() +} + +fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 { + if a.len() != b.len() { + return 0.0; + } + let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum(); + let mag_a: f32 = a.iter().map(|x| x * x).sum::().sqrt(); + let mag_b: f32 = b.iter().map(|x| x * x).sum::().sqrt(); + if mag_a == 0.0 || mag_b == 0.0 { + return 0.0; + } + dot / (mag_a * mag_b) +} + +/// Embed `text`, halving it on "input too large" errors until it fits the +/// server's physical batch (`--ubatch-size`). Mirrors the silent truncation +/// Ollama applied when these corpora were first embedded — llama-server +/// returns a 500 instead — except here it's surfaced via the returned flag. +/// Returns `(embedding, truncated)`. +async fn embed_with_truncation(llm: &LocalLlm, text: &str) -> Result<(Vec, bool)> { + let mut text = text.to_string(); + let mut truncated = false; + loop { + match llm.embed(&text).await { + Ok(emb) => return Ok((emb, truncated)), + Err(e) + if e.to_string().contains("too large to process") && text.chars().count() > 64 => + { + let keep = text.chars().count() / 2; + text = text.chars().take(keep).collect(); + truncated = true; + } + Err(e) => return Err(e), + } + } +} + +/// Re-embed `items`, writing each new vector via `update`. Returns the +/// old↔new cosines for the similarity report. +async fn reembed_table( + conn: &mut SqliteConnection, + llm: &LocalLlm, + label: &str, + items: Vec, + dry_run: bool, + update: impl Fn(&mut SqliteConnection, i64, Vec) -> Result<()>, +) -> Result> { + println!("\n[{}] re-embedding {} rows...", label, items.len()); + let pb = bin_progress::determinate(items.len() as u64, format!("re-embedding {}", label)); + + let mut sims: Vec = Vec::with_capacity(items.len()); + let mut updated = 0usize; + let mut failed = 0usize; + let mut truncated_count = 0usize; + + for item in &items { + let new_emb = match embed_with_truncation(llm, &item.text).await { + Ok((e, truncated)) => { + if truncated { + truncated_count += 1; + pb.println(format!( + "⚠ {} id={}: input exceeded the embed server's batch size, \ + truncated before embedding", + label, item.id + )); + } + e + } + Err(e) => { + pb.inc(1); + failed += 1; + eprintln!("✗ {} id={}: {}", label, item.id, e); + continue; + } + }; + + // The whole pipeline (DAO checks, stored corpora) assumes 768 dims. + // A different dim means the active backend is not serving a + // nomic-compatible model — stop rather than corrupt the table. + anyhow::ensure!( + new_emb.len() == 768, + "backend returned {}-dim embedding (expected 768) — '{}' is not \ + serving a nomic-embed-text-v1.5-compatible model", + new_emb.len(), + llm.embedding_model_version() + ); + + if let Some(old_emb) = deserialize_vector(&item.old_embedding) { + sims.push(cosine_similarity(&old_emb, &new_emb)); + } + + if !dry_run { + update(conn, item.id, serialize_vector(&new_emb)) + .with_context(|| format!("updating {} id={}", label, item.id))?; + } + updated += 1; + pb.inc(1); + } + pb.finish_and_clear(); + + println!( + "[{}] {} re-embedded ({} truncated), {} failed", + label, updated, truncated_count, failed + ); + Ok(sims) +} + +fn report_similarity(label: &str, mut sims: Vec) { + if sims.is_empty() { + println!("[{}] no old↔new pairs to compare", label); + return; + } + sims.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)); + let mean: f32 = sims.iter().sum::() / sims.len() as f32; + let median = sims[sims.len() / 2]; + println!( + "[{}] old↔new cosine over identical text: min={:.3} median={:.3} mean={:.3} max={:.3}", + label, + sims.first().unwrap(), + median, + mean, + sims.last().unwrap() + ); + if median > 0.98 { + println!( + "[{}] → old and new backends agree (~same vector space); poor search \ + results are coming from something else (prefixes, thresholds, corpus).", + label + ); + } else if median > 0.9 { + println!( + "[{}] → same model family but measurably different vectors \ + (quantization / runtime drift); re-embedding was worthwhile.", + label + ); + } else { + println!( + "[{}] → vector-space mismatch confirmed — queries were searching a \ + different space than the corpus. This re-embed should fix it.", + label + ); + } +} + +#[tokio::main] +async fn main() -> Result<()> { + dotenv::dotenv().ok(); + env_logger::init(); + let args = Args::parse(); + + let tables: Vec<&str> = args.tables.split(',').map(|t| t.trim()).collect(); + for t in &tables { + anyhow::ensure!( + matches!(*t, "summaries" | "calendar" | "search" | "entities"), + "unknown table '{}' — expected summaries, calendar, search, entities", + t + ); + } + + let database_url = env::var("DATABASE_URL").unwrap_or_else(|_| "auth.db".to_string()); + println!("Database: {}", database_url); + + let mut conn = SqliteConnection::establish(&database_url) + .with_context(|| format!("connecting to {}", database_url))?; + + let llm = LocalLlm::from_env(); + let model_version = llm.embedding_model_version(); + println!("Embedding via '{}'", model_version); + if args.dry_run { + println!("DRY RUN — no rows will be written"); + } + + if tables.contains(&"summaries") { + let mut rows: Vec = sql_query( + "SELECT id, summary, embedding, model_version + FROM daily_conversation_summaries ORDER BY date", + ) + .load(&mut conn) + .context("loading daily summaries")?; + if let Some(limit) = args.limit { + rows.truncate(limit); + } + if let Some(first) = rows.first() { + println!( + "\n[summaries] previous model_version '{}' → '{}'", + first.model_version, model_version + ); + } + let items = rows + .into_iter() + .map(|r| WorkItem { + id: r.id as i64, + text: strip_summary_boilerplate(&r.summary), + old_embedding: r.embedding, + }) + .collect(); + let mv = model_version.clone(); + let sims = reembed_table( + &mut conn, + &llm, + "summaries", + items, + args.dry_run, + move |conn, id, emb| { + sql_query( + "UPDATE daily_conversation_summaries + SET embedding = ?1, model_version = ?2 WHERE id = ?3", + ) + .bind::(emb) + .bind::(&mv) + .bind::(id as i32) + .execute(conn)?; + Ok(()) + }, + ) + .await?; + report_similarity("summaries", sims); + } + + if tables.contains(&"calendar") { + let mut rows: Vec = sql_query( + "SELECT id, summary, description, location, embedding + FROM calendar_events WHERE embedding IS NOT NULL ORDER BY id", + ) + .load(&mut conn) + .context("loading calendar events")?; + if let Some(limit) = args.limit { + rows.truncate(limit); + } + let items = rows + .into_iter() + .map(|r| WorkItem { + id: r.id as i64, + // Same text construction as import_calendar. + text: format!( + "{} {} {}", + r.summary, + r.description.as_deref().unwrap_or(""), + r.location.as_deref().unwrap_or("") + ), + old_embedding: r.embedding, + }) + .collect(); + let sims = reembed_table( + &mut conn, + &llm, + "calendar", + items, + args.dry_run, + |conn, id, emb| { + sql_query("UPDATE calendar_events SET embedding = ?1 WHERE id = ?2") + .bind::(emb) + .bind::(id as i32) + .execute(conn)?; + Ok(()) + }, + ) + .await?; + report_similarity("calendar", sims); + } + + if tables.contains(&"search") { + let mut rows: Vec = sql_query( + "SELECT rowid AS id, query, embedding + FROM search_history ORDER BY rowid", + ) + .load(&mut conn) + .context("loading search history")?; + if let Some(limit) = args.limit { + rows.truncate(limit); + } + let items = rows + .into_iter() + .map(|r| WorkItem { + id: r.id, + text: r.query, + old_embedding: r.embedding, + }) + .collect(); + let sims = reembed_table( + &mut conn, + &llm, + "search", + items, + args.dry_run, + |conn, id, emb| { + sql_query("UPDATE search_history SET embedding = ?1 WHERE rowid = ?2") + .bind::(emb) + .bind::(id) + .execute(conn)?; + Ok(()) + }, + ) + .await?; + report_similarity("search", sims); + } + + if tables.contains(&"entities") { + let mut rows: Vec = sql_query( + "SELECT id, name, description, embedding + FROM entities WHERE embedding IS NOT NULL ORDER BY id", + ) + .load(&mut conn) + .context("loading knowledge entities")?; + if let Some(limit) = args.limit { + rows.truncate(limit); + } + let items = rows + .into_iter() + .map(|r| WorkItem { + id: r.id as i64, + // Same text construction as tool_store_entity. + text: format!("{} {}", r.name, r.description), + old_embedding: r.embedding, + }) + .collect(); + let sims = reembed_table( + &mut conn, + &llm, + "entities", + items, + args.dry_run, + |conn, id, emb| { + sql_query("UPDATE entities SET embedding = ?1 WHERE id = ?2") + .bind::(emb) + .bind::(id as i32) + .execute(conn)?; + Ok(()) + }, + ) + .await?; + report_similarity("entities", sims); + } + + println!( + "\n{}", + if args.dry_run { + "Dry run complete" + } else { + "Done" + } + ); + Ok(()) +} diff --git a/src/state.rs b/src/state.rs index ef071a8..e678ad1 100644 --- a/src/state.rs +++ b/src/state.rs @@ -186,21 +186,7 @@ impl AppState { impl Default for AppState { fn default() -> Self { // Initialize AI clients - let ollama_primary_url = env::var("OLLAMA_PRIMARY_URL").unwrap_or_else(|_| { - env::var("OLLAMA_URL").unwrap_or_else(|_| "http://localhost:11434".to_string()) - }); - let ollama_fallback_url = env::var("OLLAMA_FALLBACK_URL").ok(); - let ollama_primary_model = env::var("OLLAMA_PRIMARY_MODEL") - .or_else(|_| env::var("OLLAMA_MODEL")) - .unwrap_or_else(|_| "nemotron-3-nano:30b".to_string()); - let ollama_fallback_model = env::var("OLLAMA_FALLBACK_MODEL").ok(); - - let ollama = OllamaClient::new( - ollama_primary_url, - ollama_fallback_url, - ollama_primary_model, - ollama_fallback_model, - ); + let ollama = build_ollama_from_env(); let openrouter = build_openrouter_from_env(); let openrouter_allowed_models = parse_openrouter_allowed_models(); @@ -375,13 +361,29 @@ fn parse_openrouter_allowed_models() -> Vec { .collect() } +/// Build the `OllamaClient` from environment variables — the canonical +/// `OLLAMA_*` wiring shared by the server (`AppState::default`) and the +/// standalone binaries (which predate this helper and used to copy it). +pub fn build_ollama_from_env() -> OllamaClient { + let primary_url = env::var("OLLAMA_PRIMARY_URL").unwrap_or_else(|_| { + env::var("OLLAMA_URL").unwrap_or_else(|_| "http://localhost:11434".to_string()) + }); + let fallback_url = env::var("OLLAMA_FALLBACK_URL").ok(); + let primary_model = env::var("OLLAMA_PRIMARY_MODEL") + .or_else(|_| env::var("OLLAMA_MODEL")) + .unwrap_or_else(|_| "nemotron-3-nano:30b".to_string()); + let fallback_model = env::var("OLLAMA_FALLBACK_MODEL").ok(); + + OllamaClient::new(primary_url, fallback_url, primary_model, fallback_model) +} + /// Build a `LlamaCppClient` from environment variables. Returns `None` when /// `LLAMA_SWAP_URL` is unset. The client is constructed unconditionally /// when the URL is set (so it's available even under `LLM_BACKEND=ollama` /// for ad-hoc tooling), but the agentic / chat paths only route through it /// when `LLM_BACKEND=llamacpp`. Slot ids default to the names the bundled /// `llama-swap/config.yaml` uses — `chat` / `vision` / `embed`. -fn build_llamacpp_from_env() -> Option> { +pub fn build_llamacpp_from_env() -> Option> { let base_url = env::var("LLAMA_SWAP_URL").ok()?; let primary_model = env::var("LLAMA_SWAP_PRIMARY_MODEL").ok(); let mut client = LlamaCppClient::new(Some(base_url), primary_model);