Fix RAG vector-space mismatch and search_rag retrieval quality
Queries embedded via llama-swap were searching corpora embedded via
Ollama (measured: spaces diverged). Introduce LocalLlm — the local
Ollama + llama-swap pair with LLM_BACKEND dispatch baked in — and route
all embedding writers through it; anything embedding via a concrete
client reintroduces the bug.
- search_rag: embed the model's query verbatim (no metadata boilerplate),
make date optional — no time-decay when omitted, so "when did X
happen?" queries rank purely by similarity across all time
- reembed_embeddings bin: re-embed summaries / calendar / search /
knowledge entities via the active backend, with old-new cosine report
per table and truncate-and-retry for inputs over the embed server's
physical batch size
- import_calendar, import_search_history: embed through LocalLlm
- search_messages / get_sms_messages: render sender → recipient so sent
messages are attributable to a conversation
- insight job failures: store the one-line anyhow context chain ({:#})
instead of the Debug dump the client was shown verbatim
- serialize env_dispatch tests behind a lock (parallel-runner flake)
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
+6
-2
@@ -510,7 +510,9 @@ pub async fn generate_insight_handler(
|
|||||||
}
|
}
|
||||||
Ok(Ok(Err(e))) => {
|
Ok(Ok(Err(e))) => {
|
||||||
log::error!("Insight generation failed for {}: {:?}", path, e);
|
log::error!("Insight generation failed for {}: {:?}", path, e);
|
||||||
if let Err(err) = dao.fail_job(&ctx, job_id, &format!("{:?}", e)) {
|
// `{:#}` = one-line context chain; the job's error_message is
|
||||||
|
// returned to the client verbatim, so no Debug/backtrace here.
|
||||||
|
if let Err(err) = dao.fail_job(&ctx, job_id, &format!("{:#}", e)) {
|
||||||
log::error!("Failed to mark job {} as failed: {:?}", job_id, err);
|
log::error!("Failed to mark job {} as failed: {:?}", job_id, err);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -884,7 +886,9 @@ pub async fn generate_agentic_insight_handler(
|
|||||||
}
|
}
|
||||||
Ok(Ok(Err(e))) => {
|
Ok(Ok(Err(e))) => {
|
||||||
log::error!("Agentic insight generation failed for {}: {:?}", path, e);
|
log::error!("Agentic insight generation failed for {}: {:?}", path, e);
|
||||||
if let Err(err) = dao.fail_job(&ctx, job_id, &format!("{:?}", e)) {
|
// `{:#}` = one-line context chain; the job's error_message is
|
||||||
|
// returned to the client verbatim, so no Debug/backtrace here.
|
||||||
|
if let Err(err) = dao.fail_job(&ctx, job_id, &format!("{:#}", e)) {
|
||||||
log::error!("Failed to mark job {} as failed: {:?}", job_id, err);
|
log::error!("Failed to mark job {} as failed: {:?}", job_id, err);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
+121
-35
@@ -575,6 +575,67 @@ impl InsightGenerator {
|
|||||||
Ok(formatted)
|
Ok(formatted)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Semantic search over daily summaries for the agentic `search_rag`
|
||||||
|
/// tool. Embeds the caller's query as-is (no metadata boilerplate) and
|
||||||
|
/// only applies time weighting when an anchor date is provided —
|
||||||
|
/// without one, results rank purely by similarity across all time.
|
||||||
|
async fn search_summaries_semantic(
|
||||||
|
&self,
|
||||||
|
query: &str,
|
||||||
|
date: Option<chrono::NaiveDate>,
|
||||||
|
limit: usize,
|
||||||
|
) -> Result<Vec<String>> {
|
||||||
|
let tracer = global_tracer();
|
||||||
|
let current_cx = opentelemetry::Context::current();
|
||||||
|
let mut span = tracer.start_with_context("ai.rag.search_daily_summaries", ¤t_cx);
|
||||||
|
span.set_attribute(KeyValue::new("query", query.to_string()));
|
||||||
|
span.set_attribute(KeyValue::new("limit", limit as i64));
|
||||||
|
span.set_attribute(KeyValue::new("time_weighted", date.is_some()));
|
||||||
|
if let Some(d) = date {
|
||||||
|
span.set_attribute(KeyValue::new("date", d.to_string()));
|
||||||
|
}
|
||||||
|
let search_cx = current_cx.with_span(span);
|
||||||
|
|
||||||
|
log::info!("RAG QUERY: {} (anchor date: {:?})", query, date);
|
||||||
|
|
||||||
|
// Must use the same backend that populated the daily-summary
|
||||||
|
// embeddings or similarity search is garbage (see embed_one docs).
|
||||||
|
let query_embedding =
|
||||||
|
crate::ai::embed_one(&self.ollama, self.llamacpp.as_deref(), query).await?;
|
||||||
|
|
||||||
|
let mut summary_dao = self
|
||||||
|
.daily_summary_dao
|
||||||
|
.lock()
|
||||||
|
.expect("Unable to lock DailySummaryDao");
|
||||||
|
|
||||||
|
let similar_summaries = match date {
|
||||||
|
Some(d) => summary_dao.find_similar_summaries_with_time_weight(
|
||||||
|
&search_cx,
|
||||||
|
&query_embedding,
|
||||||
|
&d.format("%Y-%m-%d").to_string(),
|
||||||
|
limit,
|
||||||
|
),
|
||||||
|
None => summary_dao.find_similar_summaries(&search_cx, &query_embedding, limit),
|
||||||
|
}
|
||||||
|
.map_err(|e| anyhow::anyhow!("Failed to find similar summaries: {:?}", e))?;
|
||||||
|
|
||||||
|
search_cx.span().set_attribute(KeyValue::new(
|
||||||
|
"results_count",
|
||||||
|
similar_summaries.len() as i64,
|
||||||
|
));
|
||||||
|
search_cx.span().set_status(Status::Ok);
|
||||||
|
|
||||||
|
Ok(similar_summaries
|
||||||
|
.into_iter()
|
||||||
|
.map(|s| {
|
||||||
|
format!(
|
||||||
|
"[{}] {} ({} messages):\n{}",
|
||||||
|
s.date, s.contact, s.message_count, s.summary
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.collect())
|
||||||
|
}
|
||||||
|
|
||||||
/// Build a metadata-based query (fallback when no topics available)
|
/// Build a metadata-based query (fallback when no topics available)
|
||||||
fn build_metadata_query(
|
fn build_metadata_query(
|
||||||
date: chrono::NaiveDate,
|
date: chrono::NaiveDate,
|
||||||
@@ -1737,13 +1798,12 @@ Return ONLY the summary, nothing else."#,
|
|||||||
Some(q) => q.to_string(),
|
Some(q) => q.to_string(),
|
||||||
None => return "Error: missing required parameter 'query'".to_string(),
|
None => return "Error: missing required parameter 'query'".to_string(),
|
||||||
};
|
};
|
||||||
let date_str = match args.get("date").and_then(|v| v.as_str()) {
|
let date = match args.get("date").and_then(|v| v.as_str()) {
|
||||||
Some(d) => d,
|
Some(d) => match NaiveDate::parse_from_str(d, "%Y-%m-%d") {
|
||||||
None => return "Error: missing required parameter 'date'".to_string(),
|
Ok(d) => Some(d),
|
||||||
};
|
Err(e) => return format!("Error: failed to parse date '{}': {}", d, e),
|
||||||
let date = match NaiveDate::parse_from_str(date_str, "%Y-%m-%d") {
|
},
|
||||||
Ok(d) => d,
|
None => None,
|
||||||
Err(e) => return format!("Error: failed to parse date '{}': {}", date_str, e),
|
|
||||||
};
|
};
|
||||||
let contact = args
|
let contact = args
|
||||||
.get("contact")
|
.get("contact")
|
||||||
@@ -1756,7 +1816,7 @@ Return ONLY the summary, nothing else."#,
|
|||||||
.clamp(1, 25) as usize;
|
.clamp(1, 25) as usize;
|
||||||
|
|
||||||
log::info!(
|
log::info!(
|
||||||
"tool_search_rag: query='{}', date={}, contact={:?}, limit={}",
|
"tool_search_rag: query='{}', date={:?}, contact={:?}, limit={}",
|
||||||
query,
|
query,
|
||||||
date,
|
date,
|
||||||
contact,
|
contact,
|
||||||
@@ -1777,15 +1837,17 @@ Return ONLY the summary, nothing else."#,
|
|||||||
limit
|
limit
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Embed the model's query verbatim — a soft contact bias is the
|
||||||
|
// only decoration. The metadata boilerplate ("On <date>, it was a
|
||||||
|
// <weekday>") that find_relevant_messages_rag prepends drowns the
|
||||||
|
// semantic signal, so the tool path deliberately bypasses it.
|
||||||
|
let search_query = match contact.as_deref() {
|
||||||
|
Some(c) => format!("{} (conversation with {})", query, c),
|
||||||
|
None => query.clone(),
|
||||||
|
};
|
||||||
|
|
||||||
let results = match self
|
let results = match self
|
||||||
.find_relevant_messages_rag(
|
.search_summaries_semantic(&search_query, date, candidate_limit)
|
||||||
date,
|
|
||||||
None,
|
|
||||||
contact.as_deref(),
|
|
||||||
None,
|
|
||||||
candidate_limit,
|
|
||||||
Some(&query),
|
|
||||||
)
|
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(results) if !results.is_empty() => results,
|
Ok(results) if !results.is_empty() => results,
|
||||||
@@ -2062,12 +2124,15 @@ Return ONLY the summary, nothing else."#,
|
|||||||
/// Render a list of [`SmsSearchHit`] for the LLM. Prefers the SMS-API
|
/// Render a list of [`SmsSearchHit`] for the LLM. Prefers the SMS-API
|
||||||
/// snippet (which already excerpts the matched span and is the only
|
/// snippet (which already excerpts the matched span and is the only
|
||||||
/// preview MMS-attachment-only matches have) over the full body, and
|
/// preview MMS-attachment-only matches have) over the full body, and
|
||||||
/// strips the `<mark>` tags the snippet ships with.
|
/// strips the `<mark>` tags the snippet ships with. Each line names
|
||||||
|
/// both parties (`sender → recipient`) — results can span multiple
|
||||||
|
/// conversations, and a sender-only label leaves sent messages
|
||||||
|
/// unattributable to a thread.
|
||||||
fn format_search_hits(hits: &[SmsSearchHit], mode: &str, date_filtered: bool) -> String {
|
fn format_search_hits(hits: &[SmsSearchHit], mode: &str, date_filtered: bool) -> String {
|
||||||
let user_name = user_display_name();
|
let user_name = user_display_name();
|
||||||
let mut out = String::new();
|
let mut out = String::new();
|
||||||
out.push_str(&format!(
|
out.push_str(&format!(
|
||||||
"Found {} messages (mode: {}{}):\n\n",
|
"Found {} messages (mode: {}{}, sender → recipient):\n\n",
|
||||||
hits.len(),
|
hits.len(),
|
||||||
mode,
|
mode,
|
||||||
if date_filtered { ", date-filtered" } else { "" }
|
if date_filtered { ", date-filtered" } else { "" }
|
||||||
@@ -2076,10 +2141,10 @@ Return ONLY the summary, nothing else."#,
|
|||||||
let date = chrono::DateTime::from_timestamp(h.date, 0)
|
let date = chrono::DateTime::from_timestamp(h.date, 0)
|
||||||
.map(|dt| dt.format("%Y-%m-%d").to_string())
|
.map(|dt| dt.format("%Y-%m-%d").to_string())
|
||||||
.unwrap_or_else(|| h.date.to_string());
|
.unwrap_or_else(|| h.date.to_string());
|
||||||
let direction: &str = if h.type_ == 2 {
|
let direction = if h.type_ == 2 {
|
||||||
&user_name
|
format!("{} → {}", user_name, h.contact_name)
|
||||||
} else {
|
} else {
|
||||||
&h.contact_name
|
format!("{} → {}", h.contact_name, user_name)
|
||||||
};
|
};
|
||||||
let score = h
|
let score = h
|
||||||
.similarity_score
|
.similarity_score
|
||||||
@@ -2150,11 +2215,18 @@ Return ONLY the summary, nothing else."#,
|
|||||||
{
|
{
|
||||||
Ok(messages) if !messages.is_empty() => {
|
Ok(messages) if !messages.is_empty() => {
|
||||||
let user_name = user_display_name();
|
let user_name = user_display_name();
|
||||||
|
// Name both parties — without a contact filter the window
|
||||||
|
// spans every conversation, and a sender-only label leaves
|
||||||
|
// sent messages unattributable to a thread.
|
||||||
let formatted: Vec<String> = messages
|
let formatted: Vec<String> = messages
|
||||||
.iter()
|
.iter()
|
||||||
.take(limit)
|
.take(limit)
|
||||||
.map(|m| {
|
.map(|m| {
|
||||||
let sender: &str = if m.is_sent { &user_name } else { &m.contact };
|
let direction = if m.is_sent {
|
||||||
|
format!("{} → {}", user_name, m.contact)
|
||||||
|
} else {
|
||||||
|
format!("{} → {}", m.contact, user_name)
|
||||||
|
};
|
||||||
let ts = DateTime::from_timestamp(m.timestamp, 0)
|
let ts = DateTime::from_timestamp(m.timestamp, 0)
|
||||||
.map(|dt| {
|
.map(|dt| {
|
||||||
dt.with_timezone(&Local)
|
dt.with_timezone(&Local)
|
||||||
@@ -2162,7 +2234,7 @@ Return ONLY the summary, nothing else."#,
|
|||||||
.to_string()
|
.to_string()
|
||||||
})
|
})
|
||||||
.unwrap_or_else(|| "unknown".to_string());
|
.unwrap_or_else(|| "unknown".to_string());
|
||||||
format!("[{}] {}: {}", ts, sender, m.body)
|
format!("[{}] {}: {}", ts, direction, m.body)
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
format!(
|
format!(
|
||||||
@@ -3206,21 +3278,25 @@ Return ONLY the summary, nothing else."#,
|
|||||||
if opts.daily_summaries_present {
|
if opts.daily_summaries_present {
|
||||||
tools.push(Tool::function(
|
tools.push(Tool::function(
|
||||||
"search_rag",
|
"search_rag",
|
||||||
"Date-anchored semantic search over the user's daily-summary corpus. \
|
"Semantic search over the user's daily-summary corpus. Returns up to \
|
||||||
Returns up to `limit` summaries most semantically similar to `query`, \
|
`limit` summaries most semantically similar to `query`. Pass `date` \
|
||||||
weighted toward summaries near `date`. For raw message text across all \
|
to anchor in time: summaries near that date rank higher and matches \
|
||||||
time, prefer `search_messages`. \
|
months away decay sharply. Omit `date` to rank purely by semantic \
|
||||||
Examples: `{query: \"family dinner\", date: \"2018-12-24\"}` — what \
|
similarity across all time — do this for \"when did X happen?\" \
|
||||||
|
questions where the date is unknown. For raw message text, prefer \
|
||||||
|
`search_messages`. \
|
||||||
|
Examples: `{query: \"family dinner\"}` — best matches across all \
|
||||||
|
time. `{query: \"family dinner\", date: \"2018-12-24\"}` — what \
|
||||||
daily summaries near Christmas Eve mention family / dinner / gathering. \
|
daily summaries near Christmas Eve mention family / dinner / gathering. \
|
||||||
`{query: \"work travel\", date: \"2019-06-15\", contact: \"Alice\"}` — \
|
`{query: \"work travel\", date: \"2019-06-15\", contact: \"Alice\"}` — \
|
||||||
narrowed to summaries that involve Alice.",
|
biased toward summaries that involve Alice.",
|
||||||
serde_json::json!({
|
serde_json::json!({
|
||||||
"type": "object",
|
"type": "object",
|
||||||
"required": ["query", "date"],
|
"required": ["query"],
|
||||||
"properties": {
|
"properties": {
|
||||||
"query": { "type": "string", "description": "Free-text query, semantically matched." },
|
"query": { "type": "string", "description": "Free-text query, semantically matched." },
|
||||||
"date": { "type": "string", "description": "Anchor date, YYYY-MM-DD. Summaries near this date rank higher." },
|
"date": { "type": "string", "description": "Optional anchor date, YYYY-MM-DD. When set, summaries near this date rank higher; omit to search all time evenly." },
|
||||||
"contact": { "type": "string", "description": "Optional contact name to bias toward conversations with that person." },
|
"contact": { "type": "string", "description": "Optional contact name to bias toward conversations with that person (soft semantic bias, not a hard filter)." },
|
||||||
"limit": { "type": "integer", "description": "Max summaries to return (default 10, max 25)." }
|
"limit": { "type": "integer", "description": "Max summaries to return (default 10, max 25)." }
|
||||||
}
|
}
|
||||||
}),
|
}),
|
||||||
@@ -4763,12 +4839,22 @@ mod tests {
|
|||||||
let hit = make_search_hit(1, "Sarah", "see you at the lake tomorrow", None, 1);
|
let hit = make_search_hit(1, "Sarah", "see you at the lake tomorrow", None, 1);
|
||||||
let out = InsightGenerator::format_search_hits(&[hit], "fts5", false);
|
let out = InsightGenerator::format_search_hits(&[hit], "fts5", false);
|
||||||
|
|
||||||
assert!(out.starts_with("Found 1 messages (mode: fts5):"));
|
assert!(out.starts_with("Found 1 messages (mode: fts5"));
|
||||||
assert!(out.contains("see you at the lake tomorrow"));
|
assert!(out.contains("see you at the lake tomorrow"));
|
||||||
assert!(out.contains("Sarah —"));
|
// Received message: contact is the sender.
|
||||||
|
assert!(out.contains("Sarah →"));
|
||||||
assert!(!out.contains("date-filtered"));
|
assert!(!out.contains("date-filtered"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn format_search_hits_labels_sent_direction() {
|
||||||
|
// Sent messages must name the recipient — results can span multiple
|
||||||
|
// conversations, and a sender-only label left them unattributable.
|
||||||
|
let hit = make_search_hit(5, "Sarah", "on my way", None, 2);
|
||||||
|
let out = InsightGenerator::format_search_hits(&[hit], "fts5", false);
|
||||||
|
assert!(out.contains("→ Sarah —"));
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn format_search_hits_prefers_snippet_over_body_and_strips_marks() {
|
fn format_search_hits_prefers_snippet_over_body_and_strips_marks() {
|
||||||
let hit = make_search_hit(
|
let hit = make_search_hit(
|
||||||
@@ -4799,7 +4885,7 @@ mod tests {
|
|||||||
|
|
||||||
assert!(out.contains("birthday_cake.jpg"));
|
assert!(out.contains("birthday_cake.jpg"));
|
||||||
assert!(!out.contains("<mark>"));
|
assert!(!out.contains("<mark>"));
|
||||||
assert!(out.contains("Mom —"));
|
assert!(out.contains("Mom →"));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|||||||
@@ -0,0 +1,86 @@
|
|||||||
|
//! Bundle of the local LLM pair (Ollama + optional llama-swap) with the
|
||||||
|
//! `LLM_BACKEND` dispatch baked in.
|
||||||
|
//!
|
||||||
|
//! Exists because passing the pair around as loose values invited the same
|
||||||
|
//! bug three times: import/backfill tooling embedded corpora via
|
||||||
|
//! `OllamaClient` directly while the query side dispatched through
|
||||||
|
//! `embed_one`, so flipping `LLM_BACKEND=llamacpp` silently split queries
|
||||||
|
//! and corpus into different vector spaces. Anything that writes or reads
|
||||||
|
//! embeddings should go through this type (or `embed_one`/`embed_many`),
|
||||||
|
//! never a concrete client.
|
||||||
|
//!
|
||||||
|
//! Deliberately knows nothing about chat policy — hybrid/OpenRouter routing
|
||||||
|
//! is request-scoped and stays in `ResolvedBackend`. This is only the
|
||||||
|
//! local stack: embeddings and offline single-shot generation.
|
||||||
|
|
||||||
|
// Constructed by binaries, not the server — dead code from main.rs's view.
|
||||||
|
#![allow(dead_code)]
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use anyhow::Result;
|
||||||
|
|
||||||
|
use super::llamacpp::LlamaCppClient;
|
||||||
|
use super::llm_client::LlmClient;
|
||||||
|
use super::ollama::{EMBEDDING_MODEL, OllamaClient};
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct LocalLlm {
|
||||||
|
ollama: OllamaClient,
|
||||||
|
llamacpp: Option<Arc<LlamaCppClient>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl LocalLlm {
|
||||||
|
pub fn new(ollama: OllamaClient, llamacpp: Option<Arc<LlamaCppClient>>) -> Self {
|
||||||
|
Self { ollama, llamacpp }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Construct from the canonical env wiring shared with `AppState`.
|
||||||
|
pub fn from_env() -> Self {
|
||||||
|
Self::new(
|
||||||
|
crate::state::build_ollama_from_env(),
|
||||||
|
crate::state::build_llamacpp_from_env(),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Embed one string via the `LLM_BACKEND`-selected client.
|
||||||
|
pub async fn embed(&self, text: &str) -> Result<Vec<f32>> {
|
||||||
|
super::embed_one(&self.ollama, self.llamacpp.as_deref(), text).await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Embed a batch via the `LLM_BACKEND`-selected client.
|
||||||
|
pub async fn embed_batch(&self, texts: &[&str]) -> Result<Vec<Vec<f32>>> {
|
||||||
|
super::embed_many(&self.ollama, self.llamacpp.as_deref(), texts).await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Single-shot local text generation via the `LLM_BACKEND`-selected
|
||||||
|
/// client (offline tooling; chat turns belong to `ResolvedBackend`).
|
||||||
|
pub async fn generate(&self, prompt: &str, system: Option<&str>) -> Result<String> {
|
||||||
|
if super::local_backend_is_llamacpp() {
|
||||||
|
if let Some(lc) = self.llamacpp.as_deref() {
|
||||||
|
return <LlamaCppClient as LlmClient>::generate(lc, prompt, system, None).await;
|
||||||
|
}
|
||||||
|
anyhow::bail!(
|
||||||
|
"LLM_BACKEND=llamacpp but LlamaCppClient is unconfigured — \
|
||||||
|
set LLAMA_SWAP_URL or switch to LLM_BACKEND=ollama"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
self.ollama.generate(prompt, system).await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Label identifying which backend + model produces embeddings right
|
||||||
|
/// now. Store it alongside vectors (`model_version` columns) so a
|
||||||
|
/// backend flip is detectable in the data, not just in env history.
|
||||||
|
pub fn embedding_model_version(&self) -> String {
|
||||||
|
if super::local_backend_is_llamacpp() {
|
||||||
|
let slot = self
|
||||||
|
.llamacpp
|
||||||
|
.as_deref()
|
||||||
|
.map(|c| c.embedding_model.as_str())
|
||||||
|
.unwrap_or("embed");
|
||||||
|
format!("llama-swap:{}", slot)
|
||||||
|
} else {
|
||||||
|
EMBEDDING_MODEL.to_string()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
+30
-12
@@ -9,6 +9,7 @@ pub mod insight_chat;
|
|||||||
pub mod insight_generator;
|
pub mod insight_generator;
|
||||||
pub mod llamacpp;
|
pub mod llamacpp;
|
||||||
pub mod llm_client;
|
pub mod llm_client;
|
||||||
|
pub mod local_llm;
|
||||||
pub mod ollama;
|
pub mod ollama;
|
||||||
pub mod openrouter;
|
pub mod openrouter;
|
||||||
pub mod sms_client;
|
pub mod sms_client;
|
||||||
@@ -35,6 +36,9 @@ pub use llamacpp::LlamaCppClient;
|
|||||||
pub use llm_client::{
|
pub use llm_client::{
|
||||||
ChatMessage, LlmClient, ModelCapabilities, Tool, ToolCall, ToolCallFunction, ToolFunction,
|
ChatMessage, LlmClient, ModelCapabilities, Tool, ToolCall, ToolCallFunction, ToolFunction,
|
||||||
};
|
};
|
||||||
|
// LocalLlm is constructed by binaries (reembed_embeddings, importers), not the server
|
||||||
|
#[allow(unused_imports)]
|
||||||
|
pub use local_llm::LocalLlm;
|
||||||
pub use ollama::{EMBEDDING_MODEL, OllamaClient};
|
pub use ollama::{EMBEDDING_MODEL, OllamaClient};
|
||||||
pub use sms_client::{SmsApiClient, SmsMessage};
|
pub use sms_client::{SmsApiClient, SmsMessage};
|
||||||
pub use tts::{
|
pub use tts::{
|
||||||
@@ -71,35 +75,49 @@ pub fn local_backend_is_llamacpp() -> bool {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Embed one string via the configured local backend. Routes through
|
/// Embed a batch of strings via the configured local backend. Routes
|
||||||
/// llama-swap when `LLM_BACKEND=llamacpp` (and a client is configured),
|
/// through llama-swap when `LLM_BACKEND=llamacpp` (and a client is
|
||||||
/// else Ollama. Returns the single embedding vector. See
|
/// configured), else Ollama. See [`local_backend_is_llamacpp`] for the
|
||||||
/// [`local_backend_is_llamacpp`] for the rationale on consistency.
|
/// rationale on consistency.
|
||||||
pub async fn embed_one(
|
pub async fn embed_many(
|
||||||
ollama: &OllamaClient,
|
ollama: &OllamaClient,
|
||||||
llamacpp: Option<&LlamaCppClient>,
|
llamacpp: Option<&LlamaCppClient>,
|
||||||
text: &str,
|
texts: &[&str],
|
||||||
) -> anyhow::Result<Vec<f32>> {
|
) -> anyhow::Result<Vec<Vec<f32>>> {
|
||||||
if local_backend_is_llamacpp() {
|
if local_backend_is_llamacpp() {
|
||||||
if let Some(lc) = llamacpp {
|
if let Some(lc) = llamacpp {
|
||||||
let mut vecs = <LlamaCppClient as LlmClient>::generate_embeddings(lc, &[text]).await?;
|
return <LlamaCppClient as LlmClient>::generate_embeddings(lc, texts).await;
|
||||||
return vecs
|
|
||||||
.pop()
|
|
||||||
.ok_or_else(|| anyhow::anyhow!("llama-swap returned no embeddings"));
|
|
||||||
}
|
}
|
||||||
anyhow::bail!(
|
anyhow::bail!(
|
||||||
"LLM_BACKEND=llamacpp but LlamaCppClient is unconfigured — \
|
"LLM_BACKEND=llamacpp but LlamaCppClient is unconfigured — \
|
||||||
set LLAMA_SWAP_URL or switch to LLM_BACKEND=ollama"
|
set LLAMA_SWAP_URL or switch to LLM_BACKEND=ollama"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
ollama.generate_embedding(text).await
|
ollama.generate_embeddings(texts).await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Embed one string via the configured local backend. Single-text
|
||||||
|
/// convenience over [`embed_many`].
|
||||||
|
pub async fn embed_one(
|
||||||
|
ollama: &OllamaClient,
|
||||||
|
llamacpp: Option<&LlamaCppClient>,
|
||||||
|
text: &str,
|
||||||
|
) -> anyhow::Result<Vec<f32>> {
|
||||||
|
let mut vecs = embed_many(ollama, llamacpp, &[text]).await?;
|
||||||
|
vecs.pop()
|
||||||
|
.ok_or_else(|| anyhow::anyhow!("embedding backend returned no embeddings"))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod env_dispatch_tests {
|
mod env_dispatch_tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
|
/// Env vars are process-global, and the test harness runs in parallel —
|
||||||
|
/// without this lock the `LLM_BACKEND` tests race each other and flake.
|
||||||
|
static ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
|
||||||
|
|
||||||
fn with_env<F: FnOnce()>(key: &str, val: Option<&str>, f: F) {
|
fn with_env<F: FnOnce()>(key: &str, val: Option<&str>, f: F) {
|
||||||
|
let _guard = ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
|
||||||
let prev = std::env::var(key).ok();
|
let prev = std::env::var(key).ok();
|
||||||
match val {
|
match val {
|
||||||
Some(v) => unsafe { std::env::set_var(key, v) },
|
Some(v) => unsafe { std::env::set_var(key, v) },
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
use anyhow::{Context, Result};
|
use anyhow::{Context, Result};
|
||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
use image_api::ai::ollama::OllamaClient;
|
use image_api::ai::LocalLlm;
|
||||||
use image_api::bin_progress;
|
use image_api::bin_progress;
|
||||||
use image_api::database::calendar_dao::{InsertCalendarEvent, SqliteCalendarEventDao};
|
use image_api::database::calendar_dao::{InsertCalendarEvent, SqliteCalendarEventDao};
|
||||||
use image_api::parsers::ical_parser::parse_ics_file;
|
use image_api::parsers::ical_parser::parse_ics_file;
|
||||||
@@ -44,22 +44,10 @@ async fn main() -> Result<()> {
|
|||||||
|
|
||||||
let context = opentelemetry::Context::current();
|
let context = opentelemetry::Context::current();
|
||||||
|
|
||||||
let ollama = if args.generate_embeddings {
|
// LocalLlm dispatches per LLM_BACKEND, so embeddings written here land
|
||||||
let primary_url = dotenv::var("OLLAMA_PRIMARY_URL")
|
// in the same vector space the query side searches.
|
||||||
.or_else(|_| dotenv::var("OLLAMA_URL"))
|
let llm = if args.generate_embeddings {
|
||||||
.unwrap_or_else(|_| "http://localhost:11434".to_string());
|
Some(LocalLlm::from_env())
|
||||||
let fallback_url = dotenv::var("OLLAMA_FALLBACK_URL").ok();
|
|
||||||
let primary_model = dotenv::var("OLLAMA_PRIMARY_MODEL")
|
|
||||||
.or_else(|_| dotenv::var("OLLAMA_MODEL"))
|
|
||||||
.unwrap_or_else(|_| "nomic-embed-text:v1.5".to_string());
|
|
||||||
let fallback_model = dotenv::var("OLLAMA_FALLBACK_MODEL").ok();
|
|
||||||
|
|
||||||
Some(OllamaClient::new(
|
|
||||||
primary_url,
|
|
||||||
fallback_url,
|
|
||||||
primary_model,
|
|
||||||
fallback_model,
|
|
||||||
))
|
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
};
|
};
|
||||||
@@ -90,7 +78,7 @@ async fn main() -> Result<()> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Generate embedding if requested (blocking call)
|
// Generate embedding if requested (blocking call)
|
||||||
let embedding = if let Some(ref ollama_client) = ollama {
|
let embedding = if let Some(ref llm) = llm {
|
||||||
let text = format!(
|
let text = format!(
|
||||||
"{} {} {}",
|
"{} {} {}",
|
||||||
event.summary,
|
event.summary,
|
||||||
@@ -99,8 +87,7 @@ async fn main() -> Result<()> {
|
|||||||
);
|
);
|
||||||
|
|
||||||
match tokio::task::block_in_place(|| {
|
match tokio::task::block_in_place(|| {
|
||||||
tokio::runtime::Handle::current()
|
tokio::runtime::Handle::current().block_on(async { llm.embed(&text).await })
|
||||||
.block_on(async { ollama_client.generate_embedding(&text).await })
|
|
||||||
}) {
|
}) {
|
||||||
Ok(emb) => Some(emb),
|
Ok(emb) => Some(emb),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
use anyhow::{Context, Result};
|
use anyhow::{Context, Result};
|
||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
use image_api::ai::ollama::OllamaClient;
|
use image_api::ai::LocalLlm;
|
||||||
use image_api::bin_progress;
|
use image_api::bin_progress;
|
||||||
use image_api::database::search_dao::{InsertSearchRecord, SqliteSearchHistoryDao};
|
use image_api::database::search_dao::{InsertSearchRecord, SqliteSearchHistoryDao};
|
||||||
use image_api::parsers::search_html_parser::parse_search_html;
|
use image_api::parsers::search_html_parser::parse_search_html;
|
||||||
@@ -38,16 +38,9 @@ async fn main() -> Result<()> {
|
|||||||
|
|
||||||
info!("Found {} search records", searches.len());
|
info!("Found {} search records", searches.len());
|
||||||
|
|
||||||
let primary_url = dotenv::var("OLLAMA_PRIMARY_URL")
|
// LocalLlm dispatches per LLM_BACKEND, so embeddings written here land
|
||||||
.or_else(|_| dotenv::var("OLLAMA_URL"))
|
// in the same vector space the query side searches.
|
||||||
.unwrap_or_else(|_| "http://localhost:11434".to_string());
|
let llm = LocalLlm::from_env();
|
||||||
let fallback_url = dotenv::var("OLLAMA_FALLBACK_URL").ok();
|
|
||||||
let primary_model = dotenv::var("OLLAMA_PRIMARY_MODEL")
|
|
||||||
.or_else(|_| dotenv::var("OLLAMA_MODEL"))
|
|
||||||
.unwrap_or_else(|_| "nomic-embed-text:v1.5".to_string());
|
|
||||||
let fallback_model = dotenv::var("OLLAMA_FALLBACK_MODEL").ok();
|
|
||||||
|
|
||||||
let ollama = OllamaClient::new(primary_url, fallback_url, primary_model, fallback_model);
|
|
||||||
let context = opentelemetry::Context::current();
|
let context = opentelemetry::Context::current();
|
||||||
|
|
||||||
let mut inserted_count = 0usize;
|
let mut inserted_count = 0usize;
|
||||||
@@ -67,12 +60,11 @@ async fn main() -> Result<()> {
|
|||||||
|
|
||||||
let pb_for_warn = pb.clone();
|
let pb_for_warn = pb.clone();
|
||||||
let embeddings_result = tokio::task::spawn({
|
let embeddings_result = tokio::task::spawn({
|
||||||
let ollama_client = ollama.clone();
|
let llm = llm.clone();
|
||||||
async move {
|
async move {
|
||||||
// Generate embeddings in parallel for the batch
|
|
||||||
let mut embeddings = Vec::new();
|
let mut embeddings = Vec::new();
|
||||||
for query in &queries {
|
for query in &queries {
|
||||||
match ollama_client.generate_embedding(query).await {
|
match llm.embed(query).await {
|
||||||
Ok(emb) => embeddings.push(Some(emb)),
|
Ok(emb) => embeddings.push(Some(emb)),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
pb_for_warn.println(format!("embedding failed for '{}': {}", query, e));
|
pb_for_warn.println(format!("embedding failed for '{}': {}", query, e));
|
||||||
|
|||||||
@@ -0,0 +1,464 @@
|
|||||||
|
//! Re-embed stored corpora through `LocalLlm`, i.e. the same
|
||||||
|
//! `LLM_BACKEND` dispatch the query side uses. The original import /
|
||||||
|
//! backfill tools always embedded via Ollama, so a deploy running
|
||||||
|
//! `LLM_BACKEND=llamacpp` queries vector spaces the corpora may not live
|
||||||
|
//! in. Three tables share the problem and are all covered here:
|
||||||
|
//!
|
||||||
|
//! - `daily_conversation_summaries` — re-embeds
|
||||||
|
//! `strip_summary_boilerplate(summary)` (what the original job fed the
|
||||||
|
//! embedder); also rewrites `model_version`.
|
||||||
|
//! - `calendar_events` — re-embeds "summary description location" exactly
|
||||||
|
//! as `import_calendar` does; rows without an embedding are skipped (the
|
||||||
|
//! import only embeds under `--generate-embeddings`).
|
||||||
|
//! - `search_history` — re-embeds the raw query text.
|
||||||
|
//! - `entities` (knowledge graph) — re-embeds "name description" exactly as
|
||||||
|
//! `tool_store_entity` does; embedding-less rows are skipped (embedding
|
||||||
|
//! is best-effort at store time).
|
||||||
|
//!
|
||||||
|
//! Source text is untouched — only vectors are rewritten. The old↔new
|
||||||
|
//! cosine report doubles as a diagnostic: ~1.0 means both backends already
|
||||||
|
//! shared a space (re-embedding was a no-op); low values confirm the
|
||||||
|
//! mismatch this tool exists to fix.
|
||||||
|
|
||||||
|
use anyhow::{Context, Result};
|
||||||
|
use clap::Parser;
|
||||||
|
use diesel::prelude::*;
|
||||||
|
use diesel::sql_query;
|
||||||
|
use diesel::sqlite::SqliteConnection;
|
||||||
|
use image_api::ai::{LocalLlm, strip_summary_boilerplate};
|
||||||
|
use image_api::bin_progress;
|
||||||
|
use std::env;
|
||||||
|
|
||||||
|
#[derive(Parser, Debug)]
|
||||||
|
#[command(author, version, about = "Re-embed stored corpora via the configured LLM_BACKEND", long_about = None)]
|
||||||
|
struct Args {
|
||||||
|
/// Comma-separated tables to process: summaries, calendar, search, entities
|
||||||
|
#[arg(long, default_value = "summaries,calendar,search,entities")]
|
||||||
|
tables: String,
|
||||||
|
|
||||||
|
/// Only process the first N rows per table (smoke test)
|
||||||
|
#[arg(long)]
|
||||||
|
limit: Option<usize>,
|
||||||
|
|
||||||
|
/// Compute embeddings and report old↔new similarity without writing
|
||||||
|
#[arg(long, default_value_t = false)]
|
||||||
|
dry_run: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(QueryableByName)]
|
||||||
|
struct SummaryRow {
|
||||||
|
#[diesel(sql_type = diesel::sql_types::Integer)]
|
||||||
|
id: i32,
|
||||||
|
#[diesel(sql_type = diesel::sql_types::Text)]
|
||||||
|
summary: String,
|
||||||
|
#[diesel(sql_type = diesel::sql_types::Binary)]
|
||||||
|
embedding: Vec<u8>,
|
||||||
|
#[diesel(sql_type = diesel::sql_types::Text)]
|
||||||
|
model_version: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(QueryableByName)]
|
||||||
|
struct CalendarRow {
|
||||||
|
#[diesel(sql_type = diesel::sql_types::Integer)]
|
||||||
|
id: i32,
|
||||||
|
#[diesel(sql_type = diesel::sql_types::Text)]
|
||||||
|
summary: String,
|
||||||
|
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
|
||||||
|
description: Option<String>,
|
||||||
|
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
|
||||||
|
location: Option<String>,
|
||||||
|
#[diesel(sql_type = diesel::sql_types::Binary)]
|
||||||
|
embedding: Vec<u8>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(QueryableByName)]
|
||||||
|
struct SearchRow {
|
||||||
|
#[diesel(sql_type = diesel::sql_types::BigInt)]
|
||||||
|
id: i64,
|
||||||
|
#[diesel(sql_type = diesel::sql_types::Text)]
|
||||||
|
query: String,
|
||||||
|
#[diesel(sql_type = diesel::sql_types::Binary)]
|
||||||
|
embedding: Vec<u8>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(QueryableByName)]
|
||||||
|
struct EntityRow {
|
||||||
|
#[diesel(sql_type = diesel::sql_types::Integer)]
|
||||||
|
id: i32,
|
||||||
|
#[diesel(sql_type = diesel::sql_types::Text)]
|
||||||
|
name: String,
|
||||||
|
#[diesel(sql_type = diesel::sql_types::Text)]
|
||||||
|
description: String,
|
||||||
|
#[diesel(sql_type = diesel::sql_types::Binary)]
|
||||||
|
embedding: Vec<u8>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// One unit of re-embed work, normalized across tables.
|
||||||
|
struct WorkItem {
|
||||||
|
/// Row key, as i64 so both i32 ids and rowids fit.
|
||||||
|
id: i64,
|
||||||
|
/// Text fed to the embedder — must match what the original writer used.
|
||||||
|
text: String,
|
||||||
|
/// Existing vector bytes, for the old↔new similarity report.
|
||||||
|
old_embedding: Vec<u8>,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn deserialize_vector(bytes: &[u8]) -> Option<Vec<f32>> {
|
||||||
|
if !bytes.len().is_multiple_of(4) {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
Some(
|
||||||
|
bytes
|
||||||
|
.chunks_exact(4)
|
||||||
|
.map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]]))
|
||||||
|
.collect(),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn serialize_vector(vec: &[f32]) -> Vec<u8> {
|
||||||
|
vec.iter().flat_map(|f| f.to_le_bytes()).collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
|
||||||
|
if a.len() != b.len() {
|
||||||
|
return 0.0;
|
||||||
|
}
|
||||||
|
let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
|
||||||
|
let mag_a: f32 = a.iter().map(|x| x * x).sum::<f32>().sqrt();
|
||||||
|
let mag_b: f32 = b.iter().map(|x| x * x).sum::<f32>().sqrt();
|
||||||
|
if mag_a == 0.0 || mag_b == 0.0 {
|
||||||
|
return 0.0;
|
||||||
|
}
|
||||||
|
dot / (mag_a * mag_b)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Embed `text`, halving it on "input too large" errors until it fits the
|
||||||
|
/// server's physical batch (`--ubatch-size`). Mirrors the silent truncation
|
||||||
|
/// Ollama applied when these corpora were first embedded — llama-server
|
||||||
|
/// returns a 500 instead — except here it's surfaced via the returned flag.
|
||||||
|
/// Returns `(embedding, truncated)`.
|
||||||
|
async fn embed_with_truncation(llm: &LocalLlm, text: &str) -> Result<(Vec<f32>, bool)> {
|
||||||
|
let mut text = text.to_string();
|
||||||
|
let mut truncated = false;
|
||||||
|
loop {
|
||||||
|
match llm.embed(&text).await {
|
||||||
|
Ok(emb) => return Ok((emb, truncated)),
|
||||||
|
Err(e)
|
||||||
|
if e.to_string().contains("too large to process") && text.chars().count() > 64 =>
|
||||||
|
{
|
||||||
|
let keep = text.chars().count() / 2;
|
||||||
|
text = text.chars().take(keep).collect();
|
||||||
|
truncated = true;
|
||||||
|
}
|
||||||
|
Err(e) => return Err(e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Re-embed `items`, writing each new vector via `update`. Returns the
|
||||||
|
/// old↔new cosines for the similarity report.
|
||||||
|
async fn reembed_table(
|
||||||
|
conn: &mut SqliteConnection,
|
||||||
|
llm: &LocalLlm,
|
||||||
|
label: &str,
|
||||||
|
items: Vec<WorkItem>,
|
||||||
|
dry_run: bool,
|
||||||
|
update: impl Fn(&mut SqliteConnection, i64, Vec<u8>) -> Result<()>,
|
||||||
|
) -> Result<Vec<f32>> {
|
||||||
|
println!("\n[{}] re-embedding {} rows...", label, items.len());
|
||||||
|
let pb = bin_progress::determinate(items.len() as u64, format!("re-embedding {}", label));
|
||||||
|
|
||||||
|
let mut sims: Vec<f32> = Vec::with_capacity(items.len());
|
||||||
|
let mut updated = 0usize;
|
||||||
|
let mut failed = 0usize;
|
||||||
|
let mut truncated_count = 0usize;
|
||||||
|
|
||||||
|
for item in &items {
|
||||||
|
let new_emb = match embed_with_truncation(llm, &item.text).await {
|
||||||
|
Ok((e, truncated)) => {
|
||||||
|
if truncated {
|
||||||
|
truncated_count += 1;
|
||||||
|
pb.println(format!(
|
||||||
|
"⚠ {} id={}: input exceeded the embed server's batch size, \
|
||||||
|
truncated before embedding",
|
||||||
|
label, item.id
|
||||||
|
));
|
||||||
|
}
|
||||||
|
e
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
pb.inc(1);
|
||||||
|
failed += 1;
|
||||||
|
eprintln!("✗ {} id={}: {}", label, item.id, e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// The whole pipeline (DAO checks, stored corpora) assumes 768 dims.
|
||||||
|
// A different dim means the active backend is not serving a
|
||||||
|
// nomic-compatible model — stop rather than corrupt the table.
|
||||||
|
anyhow::ensure!(
|
||||||
|
new_emb.len() == 768,
|
||||||
|
"backend returned {}-dim embedding (expected 768) — '{}' is not \
|
||||||
|
serving a nomic-embed-text-v1.5-compatible model",
|
||||||
|
new_emb.len(),
|
||||||
|
llm.embedding_model_version()
|
||||||
|
);
|
||||||
|
|
||||||
|
if let Some(old_emb) = deserialize_vector(&item.old_embedding) {
|
||||||
|
sims.push(cosine_similarity(&old_emb, &new_emb));
|
||||||
|
}
|
||||||
|
|
||||||
|
if !dry_run {
|
||||||
|
update(conn, item.id, serialize_vector(&new_emb))
|
||||||
|
.with_context(|| format!("updating {} id={}", label, item.id))?;
|
||||||
|
}
|
||||||
|
updated += 1;
|
||||||
|
pb.inc(1);
|
||||||
|
}
|
||||||
|
pb.finish_and_clear();
|
||||||
|
|
||||||
|
println!(
|
||||||
|
"[{}] {} re-embedded ({} truncated), {} failed",
|
||||||
|
label, updated, truncated_count, failed
|
||||||
|
);
|
||||||
|
Ok(sims)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn report_similarity(label: &str, mut sims: Vec<f32>) {
|
||||||
|
if sims.is_empty() {
|
||||||
|
println!("[{}] no old↔new pairs to compare", label);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
sims.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
|
||||||
|
let mean: f32 = sims.iter().sum::<f32>() / sims.len() as f32;
|
||||||
|
let median = sims[sims.len() / 2];
|
||||||
|
println!(
|
||||||
|
"[{}] old↔new cosine over identical text: min={:.3} median={:.3} mean={:.3} max={:.3}",
|
||||||
|
label,
|
||||||
|
sims.first().unwrap(),
|
||||||
|
median,
|
||||||
|
mean,
|
||||||
|
sims.last().unwrap()
|
||||||
|
);
|
||||||
|
if median > 0.98 {
|
||||||
|
println!(
|
||||||
|
"[{}] → old and new backends agree (~same vector space); poor search \
|
||||||
|
results are coming from something else (prefixes, thresholds, corpus).",
|
||||||
|
label
|
||||||
|
);
|
||||||
|
} else if median > 0.9 {
|
||||||
|
println!(
|
||||||
|
"[{}] → same model family but measurably different vectors \
|
||||||
|
(quantization / runtime drift); re-embedding was worthwhile.",
|
||||||
|
label
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
println!(
|
||||||
|
"[{}] → vector-space mismatch confirmed — queries were searching a \
|
||||||
|
different space than the corpus. This re-embed should fix it.",
|
||||||
|
label
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> Result<()> {
|
||||||
|
dotenv::dotenv().ok();
|
||||||
|
env_logger::init();
|
||||||
|
let args = Args::parse();
|
||||||
|
|
||||||
|
let tables: Vec<&str> = args.tables.split(',').map(|t| t.trim()).collect();
|
||||||
|
for t in &tables {
|
||||||
|
anyhow::ensure!(
|
||||||
|
matches!(*t, "summaries" | "calendar" | "search" | "entities"),
|
||||||
|
"unknown table '{}' — expected summaries, calendar, search, entities",
|
||||||
|
t
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
let database_url = env::var("DATABASE_URL").unwrap_or_else(|_| "auth.db".to_string());
|
||||||
|
println!("Database: {}", database_url);
|
||||||
|
|
||||||
|
let mut conn = SqliteConnection::establish(&database_url)
|
||||||
|
.with_context(|| format!("connecting to {}", database_url))?;
|
||||||
|
|
||||||
|
let llm = LocalLlm::from_env();
|
||||||
|
let model_version = llm.embedding_model_version();
|
||||||
|
println!("Embedding via '{}'", model_version);
|
||||||
|
if args.dry_run {
|
||||||
|
println!("DRY RUN — no rows will be written");
|
||||||
|
}
|
||||||
|
|
||||||
|
if tables.contains(&"summaries") {
|
||||||
|
let mut rows: Vec<SummaryRow> = sql_query(
|
||||||
|
"SELECT id, summary, embedding, model_version
|
||||||
|
FROM daily_conversation_summaries ORDER BY date",
|
||||||
|
)
|
||||||
|
.load(&mut conn)
|
||||||
|
.context("loading daily summaries")?;
|
||||||
|
if let Some(limit) = args.limit {
|
||||||
|
rows.truncate(limit);
|
||||||
|
}
|
||||||
|
if let Some(first) = rows.first() {
|
||||||
|
println!(
|
||||||
|
"\n[summaries] previous model_version '{}' → '{}'",
|
||||||
|
first.model_version, model_version
|
||||||
|
);
|
||||||
|
}
|
||||||
|
let items = rows
|
||||||
|
.into_iter()
|
||||||
|
.map(|r| WorkItem {
|
||||||
|
id: r.id as i64,
|
||||||
|
text: strip_summary_boilerplate(&r.summary),
|
||||||
|
old_embedding: r.embedding,
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
let mv = model_version.clone();
|
||||||
|
let sims = reembed_table(
|
||||||
|
&mut conn,
|
||||||
|
&llm,
|
||||||
|
"summaries",
|
||||||
|
items,
|
||||||
|
args.dry_run,
|
||||||
|
move |conn, id, emb| {
|
||||||
|
sql_query(
|
||||||
|
"UPDATE daily_conversation_summaries
|
||||||
|
SET embedding = ?1, model_version = ?2 WHERE id = ?3",
|
||||||
|
)
|
||||||
|
.bind::<diesel::sql_types::Binary, _>(emb)
|
||||||
|
.bind::<diesel::sql_types::Text, _>(&mv)
|
||||||
|
.bind::<diesel::sql_types::Integer, _>(id as i32)
|
||||||
|
.execute(conn)?;
|
||||||
|
Ok(())
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
report_similarity("summaries", sims);
|
||||||
|
}
|
||||||
|
|
||||||
|
if tables.contains(&"calendar") {
|
||||||
|
let mut rows: Vec<CalendarRow> = sql_query(
|
||||||
|
"SELECT id, summary, description, location, embedding
|
||||||
|
FROM calendar_events WHERE embedding IS NOT NULL ORDER BY id",
|
||||||
|
)
|
||||||
|
.load(&mut conn)
|
||||||
|
.context("loading calendar events")?;
|
||||||
|
if let Some(limit) = args.limit {
|
||||||
|
rows.truncate(limit);
|
||||||
|
}
|
||||||
|
let items = rows
|
||||||
|
.into_iter()
|
||||||
|
.map(|r| WorkItem {
|
||||||
|
id: r.id as i64,
|
||||||
|
// Same text construction as import_calendar.
|
||||||
|
text: format!(
|
||||||
|
"{} {} {}",
|
||||||
|
r.summary,
|
||||||
|
r.description.as_deref().unwrap_or(""),
|
||||||
|
r.location.as_deref().unwrap_or("")
|
||||||
|
),
|
||||||
|
old_embedding: r.embedding,
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
let sims = reembed_table(
|
||||||
|
&mut conn,
|
||||||
|
&llm,
|
||||||
|
"calendar",
|
||||||
|
items,
|
||||||
|
args.dry_run,
|
||||||
|
|conn, id, emb| {
|
||||||
|
sql_query("UPDATE calendar_events SET embedding = ?1 WHERE id = ?2")
|
||||||
|
.bind::<diesel::sql_types::Binary, _>(emb)
|
||||||
|
.bind::<diesel::sql_types::Integer, _>(id as i32)
|
||||||
|
.execute(conn)?;
|
||||||
|
Ok(())
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
report_similarity("calendar", sims);
|
||||||
|
}
|
||||||
|
|
||||||
|
if tables.contains(&"search") {
|
||||||
|
let mut rows: Vec<SearchRow> = sql_query(
|
||||||
|
"SELECT rowid AS id, query, embedding
|
||||||
|
FROM search_history ORDER BY rowid",
|
||||||
|
)
|
||||||
|
.load(&mut conn)
|
||||||
|
.context("loading search history")?;
|
||||||
|
if let Some(limit) = args.limit {
|
||||||
|
rows.truncate(limit);
|
||||||
|
}
|
||||||
|
let items = rows
|
||||||
|
.into_iter()
|
||||||
|
.map(|r| WorkItem {
|
||||||
|
id: r.id,
|
||||||
|
text: r.query,
|
||||||
|
old_embedding: r.embedding,
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
let sims = reembed_table(
|
||||||
|
&mut conn,
|
||||||
|
&llm,
|
||||||
|
"search",
|
||||||
|
items,
|
||||||
|
args.dry_run,
|
||||||
|
|conn, id, emb| {
|
||||||
|
sql_query("UPDATE search_history SET embedding = ?1 WHERE rowid = ?2")
|
||||||
|
.bind::<diesel::sql_types::Binary, _>(emb)
|
||||||
|
.bind::<diesel::sql_types::BigInt, _>(id)
|
||||||
|
.execute(conn)?;
|
||||||
|
Ok(())
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
report_similarity("search", sims);
|
||||||
|
}
|
||||||
|
|
||||||
|
if tables.contains(&"entities") {
|
||||||
|
let mut rows: Vec<EntityRow> = sql_query(
|
||||||
|
"SELECT id, name, description, embedding
|
||||||
|
FROM entities WHERE embedding IS NOT NULL ORDER BY id",
|
||||||
|
)
|
||||||
|
.load(&mut conn)
|
||||||
|
.context("loading knowledge entities")?;
|
||||||
|
if let Some(limit) = args.limit {
|
||||||
|
rows.truncate(limit);
|
||||||
|
}
|
||||||
|
let items = rows
|
||||||
|
.into_iter()
|
||||||
|
.map(|r| WorkItem {
|
||||||
|
id: r.id as i64,
|
||||||
|
// Same text construction as tool_store_entity.
|
||||||
|
text: format!("{} {}", r.name, r.description),
|
||||||
|
old_embedding: r.embedding,
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
let sims = reembed_table(
|
||||||
|
&mut conn,
|
||||||
|
&llm,
|
||||||
|
"entities",
|
||||||
|
items,
|
||||||
|
args.dry_run,
|
||||||
|
|conn, id, emb| {
|
||||||
|
sql_query("UPDATE entities SET embedding = ?1 WHERE id = ?2")
|
||||||
|
.bind::<diesel::sql_types::Binary, _>(emb)
|
||||||
|
.bind::<diesel::sql_types::Integer, _>(id as i32)
|
||||||
|
.execute(conn)?;
|
||||||
|
Ok(())
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
report_similarity("entities", sims);
|
||||||
|
}
|
||||||
|
|
||||||
|
println!(
|
||||||
|
"\n{}",
|
||||||
|
if args.dry_run {
|
||||||
|
"Dry run complete"
|
||||||
|
} else {
|
||||||
|
"Done"
|
||||||
|
}
|
||||||
|
);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
+18
-16
@@ -186,21 +186,7 @@ impl AppState {
|
|||||||
impl Default for AppState {
|
impl Default for AppState {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
// Initialize AI clients
|
// Initialize AI clients
|
||||||
let ollama_primary_url = env::var("OLLAMA_PRIMARY_URL").unwrap_or_else(|_| {
|
let ollama = build_ollama_from_env();
|
||||||
env::var("OLLAMA_URL").unwrap_or_else(|_| "http://localhost:11434".to_string())
|
|
||||||
});
|
|
||||||
let ollama_fallback_url = env::var("OLLAMA_FALLBACK_URL").ok();
|
|
||||||
let ollama_primary_model = env::var("OLLAMA_PRIMARY_MODEL")
|
|
||||||
.or_else(|_| env::var("OLLAMA_MODEL"))
|
|
||||||
.unwrap_or_else(|_| "nemotron-3-nano:30b".to_string());
|
|
||||||
let ollama_fallback_model = env::var("OLLAMA_FALLBACK_MODEL").ok();
|
|
||||||
|
|
||||||
let ollama = OllamaClient::new(
|
|
||||||
ollama_primary_url,
|
|
||||||
ollama_fallback_url,
|
|
||||||
ollama_primary_model,
|
|
||||||
ollama_fallback_model,
|
|
||||||
);
|
|
||||||
|
|
||||||
let openrouter = build_openrouter_from_env();
|
let openrouter = build_openrouter_from_env();
|
||||||
let openrouter_allowed_models = parse_openrouter_allowed_models();
|
let openrouter_allowed_models = parse_openrouter_allowed_models();
|
||||||
@@ -375,13 +361,29 @@ fn parse_openrouter_allowed_models() -> Vec<String> {
|
|||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Build the `OllamaClient` from environment variables — the canonical
|
||||||
|
/// `OLLAMA_*` wiring shared by the server (`AppState::default`) and the
|
||||||
|
/// standalone binaries (which predate this helper and used to copy it).
|
||||||
|
pub fn build_ollama_from_env() -> OllamaClient {
|
||||||
|
let primary_url = env::var("OLLAMA_PRIMARY_URL").unwrap_or_else(|_| {
|
||||||
|
env::var("OLLAMA_URL").unwrap_or_else(|_| "http://localhost:11434".to_string())
|
||||||
|
});
|
||||||
|
let fallback_url = env::var("OLLAMA_FALLBACK_URL").ok();
|
||||||
|
let primary_model = env::var("OLLAMA_PRIMARY_MODEL")
|
||||||
|
.or_else(|_| env::var("OLLAMA_MODEL"))
|
||||||
|
.unwrap_or_else(|_| "nemotron-3-nano:30b".to_string());
|
||||||
|
let fallback_model = env::var("OLLAMA_FALLBACK_MODEL").ok();
|
||||||
|
|
||||||
|
OllamaClient::new(primary_url, fallback_url, primary_model, fallback_model)
|
||||||
|
}
|
||||||
|
|
||||||
/// Build a `LlamaCppClient` from environment variables. Returns `None` when
|
/// Build a `LlamaCppClient` from environment variables. Returns `None` when
|
||||||
/// `LLAMA_SWAP_URL` is unset. The client is constructed unconditionally
|
/// `LLAMA_SWAP_URL` is unset. The client is constructed unconditionally
|
||||||
/// when the URL is set (so it's available even under `LLM_BACKEND=ollama`
|
/// when the URL is set (so it's available even under `LLM_BACKEND=ollama`
|
||||||
/// for ad-hoc tooling), but the agentic / chat paths only route through it
|
/// for ad-hoc tooling), but the agentic / chat paths only route through it
|
||||||
/// when `LLM_BACKEND=llamacpp`. Slot ids default to the names the bundled
|
/// when `LLM_BACKEND=llamacpp`. Slot ids default to the names the bundled
|
||||||
/// `llama-swap/config.yaml` uses — `chat` / `vision` / `embed`.
|
/// `llama-swap/config.yaml` uses — `chat` / `vision` / `embed`.
|
||||||
fn build_llamacpp_from_env() -> Option<Arc<LlamaCppClient>> {
|
pub fn build_llamacpp_from_env() -> Option<Arc<LlamaCppClient>> {
|
||||||
let base_url = env::var("LLAMA_SWAP_URL").ok()?;
|
let base_url = env::var("LLAMA_SWAP_URL").ok()?;
|
||||||
let primary_model = env::var("LLAMA_SWAP_PRIMARY_MODEL").ok();
|
let primary_model = env::var("LLAMA_SWAP_PRIMARY_MODEL").ok();
|
||||||
let mut client = LlamaCppClient::new(Some(base_url), primary_model);
|
let mut client = LlamaCppClient::new(Some(base_url), primary_model);
|
||||||
|
|||||||
Reference in New Issue
Block a user