From af35a996a3e64f3c27e23f8deab06e91aa9b99e4 Mon Sep 17 00:00:00 2001 From: Cameron Date: Wed, 14 Jan 2026 13:31:15 -0500 Subject: [PATCH] Cleanup unused message embedding code Fixup some warnings --- src/ai/daily_summary_job.rs | 16 +- src/ai/embedding_job.rs | 212 ----------- src/ai/insight_generator.rs | 32 +- src/ai/mod.rs | 1 - src/ai/ollama.rs | 99 +++-- src/bin/import_calendar.rs | 9 +- src/bin/import_location_history.rs | 9 +- src/bin/import_search_history.rs | 9 +- src/bin/test_daily_summary.rs | 2 +- src/data/mod.rs | 2 + src/database/calendar_dao.rs | 29 +- src/database/embeddings_dao.rs | 569 ---------------------------- src/database/location_dao.rs | 28 +- src/database/mod.rs | 2 - src/files.rs | 17 +- src/parsers/location_json_parser.rs | 1 + src/parsers/search_html_parser.rs | 66 ++-- 17 files changed, 161 insertions(+), 942 deletions(-) delete mode 100644 src/ai/embedding_job.rs delete mode 100644 src/database/embeddings_dao.rs diff --git a/src/ai/daily_summary_job.rs b/src/ai/daily_summary_job.rs index 988c046..9d9c9e0 100644 --- a/src/ai/daily_summary_job.rs +++ b/src/ai/daily_summary_job.rs @@ -72,11 +72,12 @@ pub fn strip_summary_boilerplate(summary: &str) -> String { // Remove any remaining leading markdown bold markers if text.starts_with("**") - && let Some(end) = text[2..].find("**") { - // Keep the content between ** but remove the markers - let bold_content = &text[2..2 + end]; - text = format!("{}{}", bold_content, &text[4 + end..]); - } + && let Some(end) = text[2..].find("**") + { + // Keep the content between ** but remove the markers + let bold_content = &text[2..2 + end]; + text = format!("{}{}", bold_content, &text[4 + end..]); + } text.trim().to_string() } @@ -141,10 +142,7 @@ pub async fn generate_daily_summaries( if let Some(dt) = msg_dt { let date = dt.date_naive(); if date >= start && date <= end { - messages_by_date - .entry(date) - .or_default() - .push(msg); + messages_by_date.entry(date).or_default().push(msg); } } } diff --git a/src/ai/embedding_job.rs b/src/ai/embedding_job.rs deleted file mode 100644 index 6926262..0000000 --- a/src/ai/embedding_job.rs +++ /dev/null @@ -1,212 +0,0 @@ -use anyhow::Result; -use chrono::Utc; -use std::sync::{Arc, Mutex}; -use tokio::time::{Duration, sleep}; - -use crate::ai::{OllamaClient, SmsApiClient}; -use crate::database::{EmbeddingDao, InsertMessageEmbedding}; - -/// Background job to embed messages for a specific contact -/// This function is idempotent - it checks if embeddings already exist before processing -/// -/// # Arguments -/// * `contact` - The contact name to embed messages for (e.g., "Amanda") -/// * `ollama` - Ollama client for generating embeddings -/// * `sms_client` - SMS API client for fetching messages -/// * `embedding_dao` - DAO for storing embeddings in the database -/// -/// # Returns -/// Ok(()) on success, Err on failure -pub async fn embed_contact_messages( - contact: &str, - ollama: &OllamaClient, - sms_client: &SmsApiClient, - embedding_dao: Arc>>, -) -> Result<()> { - log::info!("Starting message embedding job for contact: {}", contact); - - let otel_context = opentelemetry::Context::new(); - - // Check existing embeddings count - let existing_count = { - let mut dao = embedding_dao.lock().expect("Unable to lock EmbeddingDao"); - dao.get_message_count(&otel_context, contact).unwrap_or(0) - }; - - if existing_count > 0 { - log::info!( - "Contact '{}' already has {} embeddings, will check for new messages to embed", - contact, - existing_count - ); - } - - log::info!("Fetching all messages for contact: {}", contact); - - // Fetch all messages for the contact - let messages = sms_client.fetch_all_messages_for_contact(contact).await?; - - let total_messages = messages.len(); - log::info!( - "Fetched {} messages for contact '{}'", - total_messages, - contact - ); - - if total_messages == 0 { - log::warn!( - "No messages found for contact '{}', nothing to embed", - contact - ); - return Ok(()); - } - - // Filter out messages that already have embeddings and short/generic messages - log::info!("Filtering out messages that already have embeddings and short messages..."); - let min_message_length = 30; // Skip short messages like "Thanks!" or "Yeah, it was :)" - let messages_to_embed: Vec<&crate::ai::SmsMessage> = { - let mut dao = embedding_dao.lock().expect("Unable to lock EmbeddingDao"); - messages - .iter() - .filter(|msg| { - // Filter out short messages - if msg.body.len() < min_message_length { - return false; - } - // Filter out already embedded messages - !dao.message_exists(&otel_context, contact, &msg.body, msg.timestamp) - .unwrap_or(false) - }) - .collect() - }; - - let skipped = total_messages - messages_to_embed.len(); - let to_embed = messages_to_embed.len(); - - log::info!( - "Found {} messages to embed ({} already embedded)", - to_embed, - skipped - ); - - if to_embed == 0 { - log::info!("All messages already embedded for contact '{}'", contact); - return Ok(()); - } - - // Process messages in batches - let batch_size = 128; // Embed 128 messages per API call - let mut successful = 0; - let mut failed = 0; - - for (batch_idx, batch) in messages_to_embed.chunks(batch_size).enumerate() { - let batch_start = batch_idx * batch_size; - let batch_end = batch_start + batch.len(); - - log::info!( - "Processing batch {}/{}: messages {}-{} ({:.1}% complete)", - batch_idx + 1, - to_embed.div_ceil(batch_size), - batch_start + 1, - batch_end, - (batch_end as f64 / to_embed as f64) * 100.0 - ); - - match embed_message_batch(batch, contact, ollama, embedding_dao.clone()).await { - Ok(count) => { - successful += count; - log::debug!("Successfully embedded {} messages in batch", count); - } - Err(e) => { - failed += batch.len(); - log::error!("Failed to embed batch: {:?}", e); - // Continue processing despite failures - } - } - - // Small delay between batches to avoid overwhelming Ollama - if batch_end < to_embed { - sleep(Duration::from_millis(500)).await; - } - } - - log::info!( - "Message embedding job complete for '{}': {}/{} new embeddings created ({} already embedded, {} failed)", - contact, - successful, - total_messages, - skipped, - failed - ); - - if failed > 0 { - log::warn!( - "{} messages failed to embed for contact '{}'", - failed, - contact - ); - } - - Ok(()) -} - -/// Embed a batch of messages using a single API call -/// Returns the number of successfully embedded messages -async fn embed_message_batch( - messages: &[&crate::ai::SmsMessage], - contact: &str, - ollama: &OllamaClient, - embedding_dao: Arc>>, -) -> Result { - if messages.is_empty() { - return Ok(0); - } - - // Collect message bodies for batch embedding - let bodies: Vec<&str> = messages.iter().map(|m| m.body.as_str()).collect(); - - // Generate embeddings for all messages in one API call - let embeddings = ollama.generate_embeddings(&bodies).await?; - - if embeddings.len() != messages.len() { - return Err(anyhow::anyhow!( - "Embedding count mismatch: got {} embeddings for {} messages", - embeddings.len(), - messages.len() - )); - } - - // Build batch of insert records - let otel_context = opentelemetry::Context::new(); - let created_at = Utc::now().timestamp(); - let mut inserts = Vec::with_capacity(messages.len()); - - for (message, embedding) in messages.iter().zip(embeddings.iter()) { - // Validate embedding dimensions - if embedding.len() != 768 { - log::warn!( - "Invalid embedding dimensions: {} (expected 768), skipping", - embedding.len() - ); - continue; - } - - inserts.push(InsertMessageEmbedding { - contact: contact.to_string(), - body: message.body.clone(), - timestamp: message.timestamp, - is_sent: message.is_sent, - embedding: embedding.clone(), - created_at, - model_version: "nomic-embed-text:v1.5".to_string(), - }); - } - - // Store all embeddings in a single transaction - let mut dao = embedding_dao.lock().expect("Unable to lock EmbeddingDao"); - let stored_count = dao - .store_message_embeddings_batch(&otel_context, inserts) - .map_err(|e| anyhow::anyhow!("Failed to store embeddings batch: {:?}", e))?; - - Ok(stored_count) -} diff --git a/src/ai/insight_generator.rs b/src/ai/insight_generator.rs index b6aa432..844a38e 100644 --- a/src/ai/insight_generator.rs +++ b/src/ai/insight_generator.rs @@ -86,9 +86,10 @@ impl InsightGenerator { // If path has at least 2 components (directory + file), extract first directory if components.len() >= 2 && let Some(component) = components.first() - && let Some(os_str) = component.as_os_str().to_str() { - return Some(os_str.to_string()); - } + && let Some(os_str) = component.as_os_str().to_str() + { + return Some(os_str.to_string()); + } None } @@ -190,20 +191,19 @@ impl InsightGenerator { .filter(|msg| { // Extract date from formatted daily summary "[2024-08-15] Contact ..." if let Some(bracket_end) = msg.find(']') - && let Some(date_str) = msg.get(1..bracket_end) { - // Parse just the date (daily summaries don't have time) - if let Ok(msg_date) = - chrono::NaiveDate::parse_from_str(date_str, "%Y-%m-%d") - { - let msg_timestamp = msg_date - .and_hms_opt(12, 0, 0) - .unwrap() - .and_utc() - .timestamp(); - let time_diff = (photo_timestamp - msg_timestamp).abs(); - return time_diff > exclusion_window; - } + && let Some(date_str) = msg.get(1..bracket_end) + { + // Parse just the date (daily summaries don't have time) + if let Ok(msg_date) = chrono::NaiveDate::parse_from_str(date_str, "%Y-%m-%d") { + let msg_timestamp = msg_date + .and_hms_opt(12, 0, 0) + .unwrap() + .and_utc() + .timestamp(); + let time_diff = (photo_timestamp - msg_timestamp).abs(); + return time_diff > exclusion_window; } + } false }) .take(limit) diff --git a/src/ai/mod.rs b/src/ai/mod.rs index fb566a4..57425e1 100644 --- a/src/ai/mod.rs +++ b/src/ai/mod.rs @@ -1,5 +1,4 @@ pub mod daily_summary_job; -pub mod embedding_job; pub mod handlers; pub mod insight_generator; pub mod ollama; diff --git a/src/ai/ollama.rs b/src/ai/ollama.rs index 7d27fb8..c7da48f 100644 --- a/src/ai/ollama.rs +++ b/src/ai/ollama.rs @@ -79,10 +79,11 @@ impl OllamaClient { { let cache = MODEL_LIST_CACHE.lock().unwrap(); if let Some(entry) = cache.get(url) - && !entry.is_expired() { - log::debug!("Returning cached model list for {}", url); - return Ok(entry.data.clone()); - } + && !entry.is_expired() + { + log::debug!("Returning cached model list for {}", url); + return Ok(entry.data.clone()); + } } log::debug!("Fetching fresh model list from {}", url); @@ -188,10 +189,11 @@ impl OllamaClient { { let cache = MODEL_CAPABILITIES_CACHE.lock().unwrap(); if let Some(entry) = cache.get(url) - && !entry.is_expired() { - log::debug!("Returning cached model capabilities for {}", url); - return Ok(entry.data.clone()); - } + && !entry.is_expired() + { + log::debug!("Returning cached model capabilities for {}", url); + return Ok(entry.data.clone()); + } } log::debug!("Fetching fresh model capabilities from {}", url); @@ -420,8 +422,8 @@ Return ONLY the title, nothing else."#, ) } } else if let Some(contact_name) = contact { - format!( - r#"Create a short title (maximum 8 words) about this moment: + format!( + r#"Create a short title (maximum 8 words) about this moment: Date: {} Location: {} @@ -431,15 +433,15 @@ Return ONLY the title, nothing else."#, Use specific details from the context above. The photo is from a folder for {}, so they are likely related to this moment. If no specific details are available, use a simple descriptive title. Return ONLY the title, nothing else."#, - date.format("%B %d, %Y"), - location_str, - contact_name, - sms_str, - contact_name - ) - } else { - format!( - r#"Create a short title (maximum 8 words) about this moment: + date.format("%B %d, %Y"), + location_str, + contact_name, + sms_str, + contact_name + ) + } else { + format!( + r#"Create a short title (maximum 8 words) about this moment: Date: {} Location: {} @@ -448,11 +450,11 @@ Return ONLY the title, nothing else."#, Use specific details from the context above. If no specific details are available, use a simple descriptive title. Return ONLY the title, nothing else."#, - date.format("%B %d, %Y"), - location_str, - sms_str - ) - }; + date.format("%B %d, %Y"), + location_str, + sms_str + ) + }; let system = custom_system.unwrap_or("You are my long term memory assistant. Use only the information provided. Do not invent details."); @@ -509,8 +511,8 @@ Analyze the image and use specific details from both the visual content and the ) } } else if let Some(contact_name) = contact { - format!( - r#"Write a 1-3 paragraph description of this moment based on the available information: + format!( + r#"Write a 1-3 paragraph description of this moment based on the available information: Date: {} Location: {} @@ -518,27 +520,27 @@ Analyze the image and use specific details from both the visual content and the Messages: {} Use only the specific details provided above. The photo is from a folder for {}, so they are likely related to this moment. Mention people's names (especially {}), places, or activities if they appear in the context. Write in first person as Cameron with the tone of a journal entry. If limited information is available, keep it simple and factual. If the location is unknown omit it"#, - date.format("%B %d, %Y"), - location_str, - contact_name, - sms_str, - contact_name, - contact_name - ) - } else { - format!( - r#"Write a 1-3 paragraph description of this moment based on the available information: + date.format("%B %d, %Y"), + location_str, + contact_name, + sms_str, + contact_name, + contact_name + ) + } else { + format!( + r#"Write a 1-3 paragraph description of this moment based on the available information: Date: {} Location: {} Messages: {} Use only the specific details provided above. Mention people's names, places, or activities if they appear in the context. Write in first person as Cameron with the tone of a journal entry. If limited information is available, keep it simple and factual. If the location is unknown omit it"#, - date.format("%B %d, %Y"), - location_str, - sms_str - ) - }; + date.format("%B %d, %Y"), + location_str, + sms_str + ) + }; let system = custom_system.unwrap_or("You are a memory refreshing assistant who is able to provide insights through analyzing past conversations. Use only the information provided. Do not invent details."); @@ -642,15 +644,6 @@ Analyze the image and use specific details from both the visual content and the Ok(embeddings) } - /// Internal helper to try generating an embedding from a specific server - async fn try_generate_embedding(&self, url: &str, model: &str, text: &str) -> Result> { - let embeddings = self.try_generate_embeddings(url, model, &[text]).await?; - embeddings - .into_iter() - .next() - .ok_or_else(|| anyhow::anyhow!("No embedding returned from Ollama")) - } - /// Internal helper to try generating embeddings for multiple texts from a specific server async fn try_generate_embeddings( &self, @@ -730,12 +723,6 @@ pub struct ModelCapabilities { pub has_vision: bool, } -#[derive(Serialize)] -struct OllamaEmbedRequest { - model: String, - input: String, -} - #[derive(Serialize)] struct OllamaBatchEmbedRequest { model: String, diff --git a/src/bin/import_calendar.rs b/src/bin/import_calendar.rs index 277614c..8dba110 100644 --- a/src/bin/import_calendar.rs +++ b/src/bin/import_calendar.rs @@ -80,10 +80,11 @@ async fn main() -> Result<()> { event.event_uid.as_deref().unwrap_or(""), event.start_time, ) - && exists { - *skipped_count.lock().unwrap() += 1; - continue; - } + && exists + { + *skipped_count.lock().unwrap() += 1; + continue; + } // Generate embedding if requested (blocking call) let embedding = if let Some(ref ollama_client) = ollama { diff --git a/src/bin/import_location_history.rs b/src/bin/import_location_history.rs index 792cb55..baa0d54 100644 --- a/src/bin/import_location_history.rs +++ b/src/bin/import_location_history.rs @@ -65,10 +65,11 @@ async fn main() -> Result<()> { location.latitude, location.longitude, ) - && exists { - skipped_count += 1; - continue; - } + && exists + { + skipped_count += 1; + continue; + } batch_inserts.push(InsertLocationRecord { timestamp: location.timestamp, diff --git a/src/bin/import_search_history.rs b/src/bin/import_search_history.rs index 0b1df28..f278ca1 100644 --- a/src/bin/import_search_history.rs +++ b/src/bin/import_search_history.rs @@ -95,10 +95,11 @@ async fn main() -> Result<()> { if args.skip_existing && let Ok(exists) = dao_instance.search_exists(&context, search.timestamp, &search.query) - && exists { - skipped_count += 1; - continue; - } + && exists + { + skipped_count += 1; + continue; + } // Only insert if we have an embedding if let Some(embedding) = embedding_opt { diff --git a/src/bin/test_daily_summary.rs b/src/bin/test_daily_summary.rs index d1f5c42..fbbb621 100644 --- a/src/bin/test_daily_summary.rs +++ b/src/bin/test_daily_summary.rs @@ -1,7 +1,7 @@ use anyhow::Result; use chrono::NaiveDate; use clap::Parser; -use image_api::ai::{strip_summary_boilerplate, OllamaClient, SmsApiClient}; +use image_api::ai::{OllamaClient, SmsApiClient, strip_summary_boilerplate}; use image_api::database::{DailySummaryDao, InsertDailySummary, SqliteDailySummaryDao}; use std::env; use std::sync::{Arc, Mutex}; diff --git a/src/data/mod.rs b/src/data/mod.rs index fa402b5..4ef8f39 100644 --- a/src/data/mod.rs +++ b/src/data/mod.rs @@ -167,8 +167,10 @@ pub enum PhotoSize { #[derive(Debug, Deserialize)] pub struct ThumbnailRequest { pub(crate) path: String, + #[allow(dead_code)] // Part of API contract, may be used in future pub(crate) size: Option, #[serde(default)] + #[allow(dead_code)] // Part of API contract, may be used in future pub(crate) format: Option, } diff --git a/src/database/calendar_dao.rs b/src/database/calendar_dao.rs index 63aded5..82eea20 100644 --- a/src/database/calendar_dao.rs +++ b/src/database/calendar_dao.rs @@ -26,6 +26,7 @@ pub struct CalendarEvent { /// Data for inserting a new calendar event #[derive(Clone, Debug)] +#[allow(dead_code)] pub struct InsertCalendarEvent { pub event_uid: Option, pub summary: String, @@ -219,12 +220,13 @@ impl CalendarEventDao for SqliteCalendarEventDao { // Validate embedding dimensions if provided if let Some(ref emb) = event.embedding - && emb.len() != 768 { - return Err(anyhow::anyhow!( - "Invalid embedding dimensions: {} (expected 768)", - emb.len() - )); - } + && emb.len() != 768 + { + return Err(anyhow::anyhow!( + "Invalid embedding dimensions: {} (expected 768)", + emb.len() + )); + } let embedding_bytes = event.embedding.as_ref().map(|e| Self::serialize_vector(e)); @@ -289,13 +291,14 @@ impl CalendarEventDao for SqliteCalendarEventDao { for event in events { // Validate embedding if provided if let Some(ref emb) = event.embedding - && emb.len() != 768 { - log::warn!( - "Skipping event with invalid embedding dimensions: {}", - emb.len() - ); - continue; - } + && emb.len() != 768 + { + log::warn!( + "Skipping event with invalid embedding dimensions: {}", + emb.len() + ); + continue; + } let embedding_bytes = event.embedding.as_ref().map(|e| Self::serialize_vector(e)); diff --git a/src/database/embeddings_dao.rs b/src/database/embeddings_dao.rs deleted file mode 100644 index 5a9df5a..0000000 --- a/src/database/embeddings_dao.rs +++ /dev/null @@ -1,569 +0,0 @@ -use diesel::prelude::*; -use diesel::sqlite::SqliteConnection; -use serde::Serialize; -use std::ops::DerefMut; -use std::sync::{Arc, Mutex}; - -use crate::database::{DbError, DbErrorKind, connect}; -use crate::otel::trace_db_call; - -/// Represents a stored message embedding -#[derive(Serialize, Clone, Debug)] -pub struct MessageEmbedding { - pub id: i32, - pub contact: String, - pub body: String, - pub timestamp: i64, - pub is_sent: bool, - pub created_at: i64, - pub model_version: String, -} - -/// Data for inserting a new message embedding -#[derive(Clone, Debug)] -pub struct InsertMessageEmbedding { - pub contact: String, - pub body: String, - pub timestamp: i64, - pub is_sent: bool, - pub embedding: Vec, - pub created_at: i64, - pub model_version: String, -} - -pub trait EmbeddingDao: Sync + Send { - /// Store a message with its embedding vector - fn store_message_embedding( - &mut self, - context: &opentelemetry::Context, - message: InsertMessageEmbedding, - ) -> Result; - - /// Store multiple messages with embeddings in a single transaction - /// Returns the number of successfully stored messages - fn store_message_embeddings_batch( - &mut self, - context: &opentelemetry::Context, - messages: Vec, - ) -> Result; - - /// Find semantically similar messages using vector similarity search - /// Returns the top `limit` most similar messages - /// If contact_filter is provided, only return messages from that contact - /// Otherwise, search across all contacts for cross-perspective context - fn find_similar_messages( - &mut self, - context: &opentelemetry::Context, - query_embedding: &[f32], - limit: usize, - contact_filter: Option<&str>, - ) -> Result, DbError>; - - /// Get the count of embedded messages for a specific contact - fn get_message_count( - &mut self, - context: &opentelemetry::Context, - contact: &str, - ) -> Result; - - /// Check if embeddings exist for a contact (idempotency check) - fn has_embeddings_for_contact( - &mut self, - context: &opentelemetry::Context, - contact: &str, - ) -> Result; - - /// Check if a specific message already has an embedding - fn message_exists( - &mut self, - context: &opentelemetry::Context, - contact: &str, - body: &str, - timestamp: i64, - ) -> Result; -} - -pub struct SqliteEmbeddingDao { - connection: Arc>, -} - -impl Default for SqliteEmbeddingDao { - fn default() -> Self { - Self::new() - } -} - -impl SqliteEmbeddingDao { - pub fn new() -> Self { - SqliteEmbeddingDao { - connection: Arc::new(Mutex::new(connect())), - } - } - - /// Serialize f32 vector to bytes for BLOB storage - fn serialize_vector(vec: &[f32]) -> Vec { - // Convert f32 slice to bytes using zerocopy - use zerocopy::IntoBytes; - vec.as_bytes().to_vec() - } - - /// Deserialize bytes from BLOB back to f32 vector - fn deserialize_vector(bytes: &[u8]) -> Result, DbError> { - if !bytes.len().is_multiple_of(4) { - return Err(DbError::new(DbErrorKind::QueryError)); - } - - let count = bytes.len() / 4; - let mut vec = Vec::with_capacity(count); - - for chunk in bytes.chunks_exact(4) { - let float = f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]); - vec.push(float); - } - - Ok(vec) - } - - /// Compute cosine similarity between two vectors - fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 { - if a.len() != b.len() { - return 0.0; - } - - let dot_product: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum(); - let magnitude_a: f32 = a.iter().map(|x| x * x).sum::().sqrt(); - let magnitude_b: f32 = b.iter().map(|x| x * x).sum::().sqrt(); - - if magnitude_a == 0.0 || magnitude_b == 0.0 { - return 0.0; - } - - dot_product / (magnitude_a * magnitude_b) - } -} - -impl EmbeddingDao for SqliteEmbeddingDao { - fn store_message_embedding( - &mut self, - context: &opentelemetry::Context, - message: InsertMessageEmbedding, - ) -> Result { - trace_db_call(context, "insert", "store_message_embedding", |_span| { - let mut conn = self.connection.lock().expect("Unable to get EmbeddingDao"); - - // Validate embedding dimensions - if message.embedding.len() != 768 { - return Err(anyhow::anyhow!( - "Invalid embedding dimensions: {} (expected 768)", - message.embedding.len() - )); - } - - // Serialize embedding to bytes - let embedding_bytes = Self::serialize_vector(&message.embedding); - - // Insert into message_embeddings table with BLOB - // Use INSERT OR IGNORE to skip duplicates (based on UNIQUE constraint) - let insert_result = diesel::sql_query( - "INSERT OR IGNORE INTO message_embeddings (contact, body, timestamp, is_sent, embedding, created_at, model_version) - VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)" - ) - .bind::(&message.contact) - .bind::(&message.body) - .bind::(message.timestamp) - .bind::(message.is_sent) - .bind::(&embedding_bytes) - .bind::(message.created_at) - .bind::(&message.model_version) - .execute(conn.deref_mut()) - .map_err(|e| anyhow::anyhow!("Insert error: {:?}", e))?; - - // If INSERT OR IGNORE skipped (duplicate), find the existing record - let row_id: i32 = if insert_result == 0 { - // Duplicate - find the existing record - diesel::sql_query( - "SELECT id FROM message_embeddings WHERE contact = ?1 AND body = ?2 AND timestamp = ?3" - ) - .bind::(&message.contact) - .bind::(&message.body) - .bind::(message.timestamp) - .get_result::(conn.deref_mut()) - .map(|r| r.id as i32) - .map_err(|e| anyhow::anyhow!("Failed to find existing record: {:?}", e))? - } else { - // New insert - get the last inserted row ID - diesel::sql_query("SELECT last_insert_rowid() as id") - .get_result::(conn.deref_mut()) - .map(|r| r.id as i32) - .map_err(|e| anyhow::anyhow!("Failed to get last insert ID: {:?}", e))? - }; - - // Return the stored message - Ok(MessageEmbedding { - id: row_id, - contact: message.contact, - body: message.body, - timestamp: message.timestamp, - is_sent: message.is_sent, - created_at: message.created_at, - model_version: message.model_version, - }) - }) - .map_err(|_| DbError::new(DbErrorKind::InsertError)) - } - - fn store_message_embeddings_batch( - &mut self, - context: &opentelemetry::Context, - messages: Vec, - ) -> Result { - trace_db_call(context, "insert", "store_message_embeddings_batch", |_span| { - let mut conn = self.connection.lock().expect("Unable to get EmbeddingDao"); - - // Start transaction - conn.transaction::<_, anyhow::Error, _>(|conn| { - let mut stored_count = 0; - - for message in messages { - // Validate embedding dimensions - if message.embedding.len() != 768 { - log::warn!( - "Invalid embedding dimensions: {} (expected 768), skipping", - message.embedding.len() - ); - continue; - } - - // Serialize embedding to bytes - let embedding_bytes = Self::serialize_vector(&message.embedding); - - // Insert into message_embeddings table with BLOB - // Use INSERT OR IGNORE to skip duplicates (based on UNIQUE constraint) - match diesel::sql_query( - "INSERT OR IGNORE INTO message_embeddings (contact, body, timestamp, is_sent, embedding, created_at, model_version) - VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)" - ) - .bind::(&message.contact) - .bind::(&message.body) - .bind::(message.timestamp) - .bind::(message.is_sent) - .bind::(&embedding_bytes) - .bind::(message.created_at) - .bind::(&message.model_version) - .execute(conn) - { - Ok(rows) if rows > 0 => stored_count += 1, - Ok(_) => { - // INSERT OR IGNORE skipped (duplicate) - log::debug!("Skipped duplicate message: {:?}", message.body.chars().take(50).collect::()); - } - Err(e) => { - log::warn!("Failed to insert message in batch: {:?}", e); - // Continue with other messages instead of failing entire batch - } - } - } - - Ok(stored_count) - }) - .map_err(|e| anyhow::anyhow!("Transaction error: {:?}", e)) - }) - .map_err(|_| DbError::new(DbErrorKind::InsertError)) - } - - fn find_similar_messages( - &mut self, - context: &opentelemetry::Context, - query_embedding: &[f32], - limit: usize, - contact_filter: Option<&str>, - ) -> Result, DbError> { - trace_db_call(context, "query", "find_similar_messages", |_span| { - let mut conn = self.connection.lock().expect("Unable to get EmbeddingDao"); - - // Validate embedding dimensions - if query_embedding.len() != 768 { - return Err(anyhow::anyhow!( - "Invalid query embedding dimensions: {} (expected 768)", - query_embedding.len() - )); - } - - // Load messages with optional contact filter - let results = if let Some(contact) = contact_filter { - log::debug!("RAG search filtered to contact: {}", contact); - diesel::sql_query( - "SELECT id, contact, body, timestamp, is_sent, embedding, created_at, model_version - FROM message_embeddings WHERE contact = ?1" - ) - .bind::(contact) - .load::(conn.deref_mut()) - .map_err(|e| anyhow::anyhow!("Query error: {:?}", e))? - } else { - log::debug!("RAG search across ALL contacts (cross-perspective)"); - diesel::sql_query( - "SELECT id, contact, body, timestamp, is_sent, embedding, created_at, model_version - FROM message_embeddings" - ) - .load::(conn.deref_mut()) - .map_err(|e| anyhow::anyhow!("Query error: {:?}", e))? - }; - - log::debug!("Loaded {} messages for similarity comparison", results.len()); - - // Compute similarity for each message - let mut scored_messages: Vec<(f32, MessageEmbedding)> = results - .into_iter() - .filter_map(|row| { - // Deserialize the embedding BLOB - match Self::deserialize_vector(&row.embedding) { - Ok(embedding) => { - // Compute cosine similarity - let similarity = Self::cosine_similarity(query_embedding, &embedding); - Some(( - similarity, - MessageEmbedding { - id: row.id, - contact: row.contact, - body: row.body, - timestamp: row.timestamp, - is_sent: row.is_sent, - created_at: row.created_at, - model_version: row.model_version, - }, - )) - } - Err(e) => { - log::warn!("Failed to deserialize embedding for message {}: {:?}", row.id, e); - None - } - } - }) - .collect(); - - // Sort by similarity (highest first) - scored_messages.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal)); - - // Log similarity score distribution - if !scored_messages.is_empty() { - log::info!( - "Similarity score distribution - Top: {:.3}, Median: {:.3}, Bottom: {:.3}", - scored_messages.first().map(|(s, _)| *s).unwrap_or(0.0), - scored_messages.get(scored_messages.len() / 2).map(|(s, _)| *s).unwrap_or(0.0), - scored_messages.last().map(|(s, _)| *s).unwrap_or(0.0) - ); - } - - // Apply minimum similarity threshold - // With single-contact embeddings, scores tend to be higher due to writing style similarity - // Using 0.65 to get only truly semantically relevant messages - let min_similarity = 0.65; - let filtered_messages: Vec<(f32, MessageEmbedding)> = scored_messages - .into_iter() - .filter(|(similarity, _)| *similarity >= min_similarity) - .collect(); - - log::info!( - "After similarity filtering (min_similarity={}): {} messages passed threshold", - min_similarity, - filtered_messages.len() - ); - - // Filter out short/generic messages (under 30 characters) - // This removes conversational closings like "Thanks for talking" that dominate results - let min_message_length = 30; - - // Common closing phrases that should be excluded from RAG results - let stop_phrases = [ - "thanks for talking", - "thank you for talking", - "good talking", - "nice talking", - "good night", - "good morning", - "love you", - ]; - - let filtered_messages: Vec<(f32, MessageEmbedding)> = filtered_messages - .into_iter() - .filter(|(_, message)| { - // Filter by length - if message.body.len() < min_message_length { - return false; - } - - // Filter out messages that are primarily generic closings - let body_lower = message.body.to_lowercase(); - for phrase in &stop_phrases { - // If the message contains this phrase and is short, it's likely just a closing - if body_lower.contains(phrase) && message.body.len() < 100 { - return false; - } - } - - true - }) - .collect(); - - log::info!( - "After length filtering (min {} chars): {} messages remain", - min_message_length, - filtered_messages.len() - ); - - // Apply temporal diversity filter - don't return too many messages from the same day - // This prevents RAG from returning clusters of messages from one conversation - let mut filtered_with_diversity = Vec::new(); - let mut dates_seen: std::collections::HashMap = std::collections::HashMap::new(); - let max_per_day = 3; // Maximum 3 messages from any single day - - for (similarity, message) in filtered_messages.into_iter() { - let date = chrono::DateTime::from_timestamp(message.timestamp, 0) - .map(|dt| dt.date_naive()) - .unwrap_or_else(|| chrono::Utc::now().date_naive()); - - let count = dates_seen.entry(date).or_insert(0); - if *count < max_per_day { - *count += 1; - filtered_with_diversity.push((similarity, message)); - } - } - - log::info!( - "After temporal diversity filtering (max {} per day): {} messages remain", - max_per_day, - filtered_with_diversity.len() - ); - - // Take top N results from diversity-filtered messages - let top_results: Vec = filtered_with_diversity - .into_iter() - .take(limit) - .map(|(similarity, message)| { - let time = chrono::DateTime::from_timestamp(message.timestamp, 0) - .map(|dt| dt.format("%Y-%m-%d").to_string()) - .unwrap_or_default(); - log::info!( - "RAG Match: similarity={:.3}, date={}, contact={}, body=\"{}\"", - similarity, - time, - message.contact, - &message.body.chars().take(80).collect::() - ); - message - }) - .collect(); - - Ok(top_results) - }) - .map_err(|_| DbError::new(DbErrorKind::QueryError)) - } - - fn get_message_count( - &mut self, - context: &opentelemetry::Context, - contact: &str, - ) -> Result { - trace_db_call(context, "query", "get_message_count", |_span| { - let mut conn = self.connection.lock().expect("Unable to get EmbeddingDao"); - - let count = diesel::sql_query( - "SELECT COUNT(*) as count FROM message_embeddings WHERE contact = ?1", - ) - .bind::(contact) - .get_result::(conn.deref_mut()) - .map(|r| r.count) - .map_err(|e| anyhow::anyhow!("Count query error: {:?}", e))?; - - Ok(count) - }) - .map_err(|_| DbError::new(DbErrorKind::QueryError)) - } - - fn has_embeddings_for_contact( - &mut self, - context: &opentelemetry::Context, - contact: &str, - ) -> Result { - self.get_message_count(context, contact) - .map(|count| count > 0) - } - - fn message_exists( - &mut self, - context: &opentelemetry::Context, - contact: &str, - body: &str, - timestamp: i64, - ) -> Result { - trace_db_call(context, "query", "message_exists", |_span| { - let mut conn = self.connection.lock().expect("Unable to get EmbeddingDao"); - - let count = diesel::sql_query( - "SELECT COUNT(*) as count FROM message_embeddings - WHERE contact = ?1 AND body = ?2 AND timestamp = ?3", - ) - .bind::(contact) - .bind::(body) - .bind::(timestamp) - .get_result::(conn.deref_mut()) - .map(|r| r.count) - .map_err(|e| anyhow::anyhow!("Count query error: {:?}", e))?; - - Ok(count > 0) - }) - .map_err(|_| DbError::new(DbErrorKind::QueryError)) - } -} - -// Helper structs for raw SQL queries - -#[derive(QueryableByName)] -struct LastInsertRowId { - #[diesel(sql_type = diesel::sql_types::BigInt)] - id: i64, -} - -#[derive(QueryableByName)] -struct MessageEmbeddingRow { - #[diesel(sql_type = diesel::sql_types::Integer)] - id: i32, - #[diesel(sql_type = diesel::sql_types::Text)] - contact: String, - #[diesel(sql_type = diesel::sql_types::Text)] - body: String, - #[diesel(sql_type = diesel::sql_types::BigInt)] - timestamp: i64, - #[diesel(sql_type = diesel::sql_types::Bool)] - is_sent: bool, - #[diesel(sql_type = diesel::sql_types::BigInt)] - created_at: i64, - #[diesel(sql_type = diesel::sql_types::Text)] - model_version: String, -} - -#[derive(QueryableByName)] -struct MessageEmbeddingWithVectorRow { - #[diesel(sql_type = diesel::sql_types::Integer)] - id: i32, - #[diesel(sql_type = diesel::sql_types::Text)] - contact: String, - #[diesel(sql_type = diesel::sql_types::Text)] - body: String, - #[diesel(sql_type = diesel::sql_types::BigInt)] - timestamp: i64, - #[diesel(sql_type = diesel::sql_types::Bool)] - is_sent: bool, - #[diesel(sql_type = diesel::sql_types::Binary)] - embedding: Vec, - #[diesel(sql_type = diesel::sql_types::BigInt)] - created_at: i64, - #[diesel(sql_type = diesel::sql_types::Text)] - model_version: String, -} - -#[derive(QueryableByName)] -struct CountResult { - #[diesel(sql_type = diesel::sql_types::BigInt)] - count: i64, -} diff --git a/src/database/location_dao.rs b/src/database/location_dao.rs index c4b3989..73e1c10 100644 --- a/src/database/location_dao.rs +++ b/src/database/location_dao.rs @@ -214,12 +214,13 @@ impl LocationHistoryDao for SqliteLocationHistoryDao { // Validate embedding dimensions if provided (rare for location data) if let Some(ref emb) = location.embedding - && emb.len() != 768 { - return Err(anyhow::anyhow!( - "Invalid embedding dimensions: {} (expected 768)", - emb.len() - )); - } + && emb.len() != 768 + { + return Err(anyhow::anyhow!( + "Invalid embedding dimensions: {} (expected 768)", + emb.len() + )); + } let embedding_bytes = location .embedding @@ -289,13 +290,14 @@ impl LocationHistoryDao for SqliteLocationHistoryDao { for location in locations { // Validate embedding if provided (rare) if let Some(ref emb) = location.embedding - && emb.len() != 768 { - log::warn!( - "Skipping location with invalid embedding dimensions: {}", - emb.len() - ); - continue; - } + && emb.len() != 768 + { + log::warn!( + "Skipping location with invalid embedding dimensions: {}", + emb.len() + ); + continue; + } let embedding_bytes = location .embedding diff --git a/src/database/mod.rs b/src/database/mod.rs index bbdf33f..0bfd3f1 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -11,7 +11,6 @@ use crate::otel::trace_db_call; pub mod calendar_dao; pub mod daily_summary_dao; -pub mod embeddings_dao; pub mod insights_dao; pub mod location_dao; pub mod models; @@ -20,7 +19,6 @@ pub mod search_dao; pub use calendar_dao::{CalendarEventDao, SqliteCalendarEventDao}; pub use daily_summary_dao::{DailySummaryDao, InsertDailySummary, SqliteDailySummaryDao}; -pub use embeddings_dao::{EmbeddingDao, InsertMessageEmbedding}; pub use insights_dao::{InsightDao, SqliteInsightDao}; pub use location_dao::{LocationHistoryDao, SqliteLocationHistoryDao}; pub use search_dao::{SearchHistoryDao, SqliteSearchHistoryDao}; diff --git a/src/files.rs b/src/files.rs index 729e73e..8721568 100644 --- a/src/files.rs +++ b/src/files.rs @@ -233,12 +233,8 @@ pub async fn list_photos( if let (Some(photo_lat), Some(photo_lon)) = (exif.gps_latitude, exif.gps_longitude) { - let distance = haversine_distance( - lat, - lon, - photo_lat as f64, - photo_lon as f64, - ); + let distance = + haversine_distance(lat, lon, photo_lat as f64, photo_lon as f64); distance <= radius_km } else { false @@ -410,9 +406,13 @@ pub async fn list_photos( ) }) .map(|path: &PathBuf| { - let relative = path.strip_prefix(&app_state.base_path).unwrap_or_else(|_| panic!("Unable to strip base path {} from file path {}", + let relative = path.strip_prefix(&app_state.base_path).unwrap_or_else(|_| { + panic!( + "Unable to strip base path {} from file path {}", &app_state.base_path.path(), - path.display())); + path.display() + ) + }); relative.to_path_buf() }) .map(|f| f.to_str().unwrap().to_string()) @@ -791,6 +791,7 @@ pub struct RealFileSystem { } impl RealFileSystem { + #[allow(dead_code)] // Used in main.rs binary and tests pub(crate) fn new(base_path: String) -> RealFileSystem { RealFileSystem { base_path } } diff --git a/src/parsers/location_json_parser.rs b/src/parsers/location_json_parser.rs index 7ca6b87..9e8aa0e 100644 --- a/src/parsers/location_json_parser.rs +++ b/src/parsers/location_json_parser.rs @@ -34,6 +34,7 @@ struct LocationPoint { #[derive(Debug, Deserialize)] struct ActivityRecord { activity: Vec, + #[allow(dead_code)] // Part of JSON structure, may be used in future timestamp_ms: Option, } diff --git a/src/parsers/search_html_parser.rs b/src/parsers/search_html_parser.rs index 91440be..7688185 100644 --- a/src/parsers/search_html_parser.rs +++ b/src/parsers/search_html_parser.rs @@ -30,34 +30,37 @@ pub fn parse_search_html(path: &str) -> Result> { // Strategy 2: Look for outer-cell structure (older format) if records.is_empty() - && let Ok(outer_selector) = Selector::parse("div.outer-cell") { - for cell in document.select(&outer_selector) { - if let Some(record) = parse_outer_cell(&cell) { - records.push(record); - } + && let Ok(outer_selector) = Selector::parse("div.outer-cell") + { + for cell in document.select(&outer_selector) { + if let Some(record) = parse_outer_cell(&cell) { + records.push(record); } } + } // Strategy 3: Generic approach - look for links and timestamps if records.is_empty() - && let Ok(link_selector) = Selector::parse("a") { - for link in document.select(&link_selector) { - if let Some(href) = link.value().attr("href") { - // Check if it's a search URL - if (href.contains("google.com/search?q=") || href.contains("search?q=")) - && let Some(query) = extract_query_from_url(href) { - // Try to find nearby timestamp - let timestamp = find_nearby_timestamp(&link); + && let Ok(link_selector) = Selector::parse("a") + { + for link in document.select(&link_selector) { + if let Some(href) = link.value().attr("href") { + // Check if it's a search URL + if (href.contains("google.com/search?q=") || href.contains("search?q=")) + && let Some(query) = extract_query_from_url(href) + { + // Try to find nearby timestamp + let timestamp = find_nearby_timestamp(&link); - records.push(ParsedSearchRecord { - timestamp: timestamp.unwrap_or_else(|| Utc::now().timestamp()), - query, - search_engine: Some("Google".to_string()), - }); - } + records.push(ParsedSearchRecord { + timestamp: timestamp.unwrap_or_else(|| Utc::now().timestamp()), + query, + search_engine: Some("Google".to_string()), + }); } } } + } Ok(records) } @@ -118,11 +121,12 @@ fn extract_query_from_url(url: &str) -> Option { fn find_nearby_timestamp(element: &scraper::ElementRef) -> Option { // Look for timestamp in parent or sibling elements if let Some(parent) = element.parent() - && parent.value().as_element().is_some() { - let parent_ref = scraper::ElementRef::wrap(parent)?; - let text = parent_ref.text().collect::>().join(" "); - return parse_timestamp_from_text(&text); - } + && parent.value().as_element().is_some() + { + let parent_ref = scraper::ElementRef::wrap(parent)?; + let text = parent_ref.text().collect::>().join(" "); + return parse_timestamp_from_text(&text); + } None } @@ -135,9 +139,10 @@ fn parse_timestamp_from_text(text: &str) -> Option { if let Some(iso_match) = text .split_whitespace() .find(|s| s.contains('T') && s.contains('-')) - && let Ok(dt) = DateTime::parse_from_rfc3339(iso_match) { - return Some(dt.timestamp()); - } + && let Ok(dt) = DateTime::parse_from_rfc3339(iso_match) + { + return Some(dt.timestamp()); + } // Try common date patterns let patterns = [ @@ -149,9 +154,10 @@ fn parse_timestamp_from_text(text: &str) -> Option { for pattern in patterns { // Extract potential date string if let Some(date_part) = extract_date_substring(text) - && let Ok(dt) = NaiveDateTime::parse_from_str(&date_part, pattern) { - return Some(dt.and_utc().timestamp()); - } + && let Ok(dt) = NaiveDateTime::parse_from_str(&date_part, pattern) + { + return Some(dt.and_utc().timestamp()); + } } None