diff --git a/Cargo.lock b/Cargo.lock index 180906e..b964197 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1689,7 +1689,7 @@ dependencies = [ [[package]] name = "image-api" -version = "0.4.1" +version = "0.5.0" dependencies = [ "actix", "actix-cors", @@ -1713,6 +1713,7 @@ dependencies = [ "jsonwebtoken", "kamadak-exif", "lazy_static", + "libsqlite3-sys", "log", "opentelemetry", "opentelemetry-appender-log", @@ -1731,6 +1732,7 @@ dependencies = [ "tokio", "urlencoding", "walkdir", + "zerocopy", ] [[package]] @@ -1943,6 +1945,7 @@ version = "0.35.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "133c182a6a2c87864fe97778797e46c7e999672690dc9fa3ee8e241aa4a9c13f" dependencies = [ + "cc", "pkg-config", "vcpkg", ] diff --git a/Cargo.toml b/Cargo.toml index c21ba51..27e6e78 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "image-api" -version = "0.4.1" +version = "0.5.0" authors = ["Cameron Cordes "] edition = "2024" @@ -23,6 +23,7 @@ jsonwebtoken = "9.3.0" serde = "1" serde_json = "1" diesel = { version = "2.2.10", features = ["sqlite"] } +libsqlite3-sys = { version = "0.35", features = ["bundled"] } diesel_migrations = "2.2.0" chrono = "0.4" clap = { version = "4.5", features = ["derive"] } @@ -50,3 +51,4 @@ regex = "1.11.1" exif = { package = "kamadak-exif", version = "0.6.1" } reqwest = { version = "0.12", features = ["json"] } urlencoding = "2.1" +zerocopy = "0.8" diff --git a/migrations/2026-01-04-000000_add_message_embeddings/down.sql b/migrations/2026-01-04-000000_add_message_embeddings/down.sql new file mode 100644 index 0000000..c8b6965 --- /dev/null +++ b/migrations/2026-01-04-000000_add_message_embeddings/down.sql @@ -0,0 +1,3 @@ +-- Drop tables in reverse order +DROP TABLE IF EXISTS vec_message_embeddings; +DROP TABLE IF EXISTS message_embeddings; diff --git a/migrations/2026-01-04-000000_add_message_embeddings/up.sql b/migrations/2026-01-04-000000_add_message_embeddings/up.sql new file mode 100644 index 
0000000..a2fff45 --- /dev/null +++ b/migrations/2026-01-04-000000_add_message_embeddings/up.sql @@ -0,0 +1,19 @@ +-- Table for storing message metadata and embeddings +-- Embeddings stored as BLOB for proof-of-concept +-- For production with many contacts, consider using sqlite-vec extension +CREATE TABLE message_embeddings ( + id INTEGER PRIMARY KEY NOT NULL, + contact TEXT NOT NULL, + body TEXT NOT NULL, + timestamp BIGINT NOT NULL, + is_sent BOOLEAN NOT NULL, + embedding BLOB NOT NULL, + created_at BIGINT NOT NULL, + model_version TEXT NOT NULL, + -- Prevent duplicate embeddings for the same message + UNIQUE(contact, body, timestamp) +); + +-- Indexes for efficient queries +CREATE INDEX idx_message_embeddings_contact ON message_embeddings(contact); +CREATE INDEX idx_message_embeddings_timestamp ON message_embeddings(timestamp); diff --git a/migrations/2026-01-04-060000_add_daily_summaries/down.sql b/migrations/2026-01-04-060000_add_daily_summaries/down.sql new file mode 100644 index 0000000..f142059 --- /dev/null +++ b/migrations/2026-01-04-060000_add_daily_summaries/down.sql @@ -0,0 +1 @@ +DROP TABLE daily_conversation_summaries; diff --git a/migrations/2026-01-04-060000_add_daily_summaries/up.sql b/migrations/2026-01-04-060000_add_daily_summaries/up.sql new file mode 100644 index 0000000..6c47122 --- /dev/null +++ b/migrations/2026-01-04-060000_add_daily_summaries/up.sql @@ -0,0 +1,19 @@ +-- Daily conversation summaries for improved RAG quality +-- Each row = one day's conversation with a contact, summarized by LLM and embedded + +CREATE TABLE daily_conversation_summaries ( + id INTEGER PRIMARY KEY NOT NULL, + date TEXT NOT NULL, -- ISO date "2024-08-15" + contact TEXT NOT NULL, -- Contact name + summary TEXT NOT NULL, -- LLM-generated 3-5 sentence summary + message_count INTEGER NOT NULL, -- Number of messages in this day + embedding BLOB NOT NULL, -- 768-dim vector of the summary + created_at BIGINT NOT NULL, -- When this summary was generated + 
model_version TEXT NOT NULL, -- "nomic-embed-text:v1.5" + UNIQUE(date, contact) +); + +-- Indexes for efficient querying +CREATE INDEX idx_daily_summaries_date ON daily_conversation_summaries(date); +CREATE INDEX idx_daily_summaries_contact ON daily_conversation_summaries(contact); +CREATE INDEX idx_daily_summaries_date_contact ON daily_conversation_summaries(date, contact); diff --git a/src/ai/daily_summary_job.rs b/src/ai/daily_summary_job.rs new file mode 100644 index 0000000..cd5053b --- /dev/null +++ b/src/ai/daily_summary_job.rs @@ -0,0 +1,289 @@ +use anyhow::Result; +use chrono::{NaiveDate, Utc}; +use opentelemetry::trace::{Span, Status, TraceContextExt, Tracer}; +use opentelemetry::KeyValue; +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; +use tokio::time::sleep; + +use crate::ai::{OllamaClient, SmsApiClient, SmsMessage}; +use crate::database::{DailySummaryDao, InsertDailySummary}; +use crate::otel::global_tracer; + +/// Generate and embed daily conversation summaries for a date range +/// Default: August 2024 ±30 days (July 1 - September 30, 2024) +pub async fn generate_daily_summaries( + contact: &str, + start_date: Option<NaiveDate>, + end_date: Option<NaiveDate>, + ollama: &OllamaClient, + sms_client: &SmsApiClient, + summary_dao: Arc<Mutex<DailySummaryDao>>, +) -> Result<()> { + let tracer = global_tracer(); + + // Get current context (empty in background task) and start span with it + let current_cx = opentelemetry::Context::current(); + let mut span = tracer.start_with_context("ai.daily_summary.generate_batch", &current_cx); + span.set_attribute(KeyValue::new("contact", contact.to_string())); + + // Create context with this span for child operations + let parent_cx = current_cx.with_span(span); + + // Default to August 2024 ±30 days + let start = start_date.unwrap_or_else(|| NaiveDate::from_ymd_opt(2024, 7, 1).unwrap()); + let end = end_date.unwrap_or_else(|| NaiveDate::from_ymd_opt(2024, 9, 30).unwrap()); + + parent_cx.span().set_attribute(KeyValue::new("start_date", 
start.to_string())); + parent_cx.span().set_attribute(KeyValue::new("end_date", end.to_string())); + parent_cx.span().set_attribute(KeyValue::new("date_range_days", (end - start).num_days() + 1)); + + log::info!( + "========================================"); + log::info!("Starting daily summary generation for {}", contact); + log::info!("Date range: {} to {} ({} days)", + start, end, (end - start).num_days() + 1 + ); + log::info!("========================================"); + + // Fetch all messages for the contact in the date range + log::info!("Fetching messages for date range..."); + let _start_timestamp = start + .and_hms_opt(0, 0, 0) + .unwrap() + .and_utc() + .timestamp(); + let _end_timestamp = end + .and_hms_opt(23, 59, 59) + .unwrap() + .and_utc() + .timestamp(); + + let all_messages = sms_client + .fetch_all_messages_for_contact(contact) + .await?; + + // Filter to date range and group by date + let mut messages_by_date: HashMap<NaiveDate, Vec<SmsMessage>> = HashMap::new(); + + for msg in all_messages { + let msg_dt = chrono::DateTime::from_timestamp(msg.timestamp, 0); + if let Some(dt) = msg_dt { + let date = dt.date_naive(); + if date >= start && date <= end { + messages_by_date + .entry(date) + .or_insert_with(Vec::new) + .push(msg); + } + } + } + + log::info!( + "Grouped messages into {} days with activity", + messages_by_date.len() + ); + + if messages_by_date.is_empty() { + log::warn!("No messages found in date range"); + return Ok(()); + } + + // Sort dates for ordered processing + let mut dates: Vec<NaiveDate> = messages_by_date.keys().cloned().collect(); + dates.sort(); + + let total_days = dates.len(); + let mut processed = 0; + let mut skipped = 0; + let mut failed = 0; + + log::info!("Processing {} days with messages...", total_days); + + for (idx, date) in dates.iter().enumerate() { + let messages = messages_by_date.get(date).unwrap(); + let date_str = date.format("%Y-%m-%d").to_string(); + + // Check if summary already exists + { + let mut dao = 
summary_dao.lock().expect("Unable to lock DailySummaryDao"); + let otel_context = opentelemetry::Context::new(); + + if dao.summary_exists(&otel_context, &date_str, contact).unwrap_or(false) { + skipped += 1; + if idx % 10 == 0 { + log::info!( + "Progress: {}/{} ({} processed, {} skipped)", + idx + 1, + total_days, + processed, + skipped + ); + } + continue; + } + } + + // Generate summary for this day + match generate_and_store_daily_summary( + &parent_cx, + date, + contact, + messages, + ollama, + summary_dao.clone(), + ) + .await + { + Ok(_) => { + processed += 1; + log::info!( + "✓ {}/{}: {} ({} messages)", + idx + 1, + total_days, + date_str, + messages.len() + ); + } + Err(e) => { + failed += 1; + log::error!("✗ Failed to process {}: {:?}", date_str, e); + } + } + + // Rate limiting: sleep 500ms between summaries + if idx < total_days - 1 { + sleep(std::time::Duration::from_millis(500)).await; + } + + // Progress logging every 10 days + if idx % 10 == 0 && idx > 0 { + log::info!( + "Progress: {}/{} ({} processed, {} skipped, {} failed)", + idx + 1, + total_days, + processed, + skipped, + failed + ); + } + } + + log::info!("========================================"); + log::info!("Daily summary generation complete!"); + log::info!("Processed: {}, Skipped: {}, Failed: {}", processed, skipped, failed); + log::info!("========================================"); + + // Record final metrics in span + parent_cx.span().set_attribute(KeyValue::new("days_processed", processed as i64)); + parent_cx.span().set_attribute(KeyValue::new("days_skipped", skipped as i64)); + parent_cx.span().set_attribute(KeyValue::new("days_failed", failed as i64)); + parent_cx.span().set_attribute(KeyValue::new("total_days", total_days as i64)); + + if failed > 0 { + parent_cx.span().set_status(Status::error(format!("{} days failed to process", failed))); + } else { + parent_cx.span().set_status(Status::Ok); + } + + Ok(()) +} + +/// Generate and store a single day's summary +async fn 
generate_and_store_daily_summary( + parent_cx: &opentelemetry::Context, + date: &NaiveDate, + contact: &str, + messages: &[SmsMessage], + ollama: &OllamaClient, + summary_dao: Arc<Mutex<DailySummaryDao>>, +) -> Result<()> { + let tracer = global_tracer(); + let mut span = tracer.start_with_context("ai.daily_summary.generate_single", parent_cx); + span.set_attribute(KeyValue::new("date", date.to_string())); + span.set_attribute(KeyValue::new("contact", contact.to_string())); + span.set_attribute(KeyValue::new("message_count", messages.len() as i64)); + + // Format messages for LLM + let messages_text: String = messages + .iter() + .take(200) // Limit to 200 messages per day to avoid token overflow + .map(|m| { + if m.is_sent { + format!("Me: {}", m.body) + } else { + format!("{}: {}", m.contact, m.body) + } + }) + .collect::<Vec<_>>() + .join("\n"); + + let weekday = date.format("%A"); + + let prompt = format!( + r#"Summarize this day's conversation in 3-5 sentences. Focus on: +- Key topics, activities, and events discussed +- Places, people, or organizations mentioned +- Plans made or decisions discussed +- Overall mood or themes of the day + +IMPORTANT: Clearly distinguish between what "I" or "Me" did versus what {} did. +Always explicitly attribute actions, plans, and activities to the correct person. +Use "I" or "Me" for my actions and "{}" for their actions. + +Date: {} ({}) +Messages: +{} + +Write a natural, informative summary with clear subject attribution. +Summary:"#, + contact, + contact, + date.format("%B %d, %Y"), + weekday, + messages_text + ); + + // Generate summary with LLM + let summary = ollama + .generate( + &prompt, + Some("You are a conversation summarizer. 
Create clear, factual summaries that maintain precise subject attribution - clearly distinguishing who said or did what."), + ) + .await?; + + log::debug!("Generated summary for {}: {}", date, summary.chars().take(100).collect::<String>()); + + span.set_attribute(KeyValue::new("summary_length", summary.len() as i64)); + + // Embed the summary + let embedding = ollama.generate_embedding(&summary).await?; + + span.set_attribute(KeyValue::new("embedding_dimensions", embedding.len() as i64)); + + // Store in database + let insert = InsertDailySummary { + date: date.format("%Y-%m-%d").to_string(), + contact: contact.to_string(), + summary: summary.trim().to_string(), + message_count: messages.len() as i32, + embedding, + created_at: Utc::now().timestamp(), + model_version: "nomic-embed-text:v1.5".to_string(), + }; + + // Create context from current span for DB operation + let child_cx = opentelemetry::Context::current_with_span(span); + + let mut dao = summary_dao.lock().expect("Unable to lock DailySummaryDao"); + let result = dao.store_summary(&child_cx, insert) + .map_err(|e| anyhow::anyhow!("Failed to store summary: {:?}", e)); + + match &result { + Ok(_) => child_cx.span().set_status(Status::Ok), + Err(e) => child_cx.span().set_status(Status::error(e.to_string())), + } + + result?; + Ok(()) +} diff --git a/src/ai/embedding_job.rs b/src/ai/embedding_job.rs new file mode 100644 index 0000000..af5b5fb --- /dev/null +++ b/src/ai/embedding_job.rs @@ -0,0 +1,213 @@ +use anyhow::Result; +use chrono::Utc; +use std::sync::{Arc, Mutex}; +use tokio::time::{sleep, Duration}; + +use crate::ai::{OllamaClient, SmsApiClient}; +use crate::database::{EmbeddingDao, InsertMessageEmbedding}; + +/// Background job to embed messages for a specific contact +/// This function is idempotent - it checks if embeddings already exist before processing +/// +/// # Arguments +/// * `contact` - The contact name to embed messages for (e.g., "Amanda") +/// * `ollama` - Ollama client for generating embeddings 
+/// * `sms_client` - SMS API client for fetching messages +/// * `embedding_dao` - DAO for storing embeddings in the database +/// +/// # Returns +/// Ok(()) on success, Err on failure +pub async fn embed_contact_messages( + contact: &str, + ollama: &OllamaClient, + sms_client: &SmsApiClient, + embedding_dao: Arc<Mutex<EmbeddingDao>>, +) -> Result<()> { + log::info!("Starting message embedding job for contact: {}", contact); + + let otel_context = opentelemetry::Context::new(); + + // Check existing embeddings count + let existing_count = { + let mut dao = embedding_dao.lock().expect("Unable to lock EmbeddingDao"); + dao.get_message_count(&otel_context, contact) + .unwrap_or(0) + }; + + if existing_count > 0 { + log::info!( + "Contact '{}' already has {} embeddings, will check for new messages to embed", + contact, + existing_count + ); + } + + log::info!("Fetching all messages for contact: {}", contact); + + // Fetch all messages for the contact + let messages = sms_client + .fetch_all_messages_for_contact(contact) + .await?; + + let total_messages = messages.len(); + log::info!("Fetched {} messages for contact '{}'", total_messages, contact); + + if total_messages == 0 { + log::warn!("No messages found for contact '{}', nothing to embed", contact); + return Ok(()); + } + + // Filter out messages that already have embeddings and short/generic messages + log::info!("Filtering out messages that already have embeddings and short messages..."); + let min_message_length = 30; // Skip short messages like "Thanks!" 
or "Yeah, it was :)" + let messages_to_embed: Vec<&crate::ai::SmsMessage> = { + let mut dao = embedding_dao.lock().expect("Unable to lock EmbeddingDao"); + messages.iter() + .filter(|msg| { + // Filter out short messages + if msg.body.len() < min_message_length { + return false; + } + // Filter out already embedded messages + !dao.message_exists(&otel_context, contact, &msg.body, msg.timestamp) + .unwrap_or(false) + }) + .collect() + }; + + let skipped = total_messages - messages_to_embed.len(); + let to_embed = messages_to_embed.len(); + + log::info!( + "Found {} messages to embed ({} already embedded)", + to_embed, + skipped + ); + + if to_embed == 0 { + log::info!("All messages already embedded for contact '{}'", contact); + return Ok(()); + } + + // Process messages in batches + let batch_size = 128; // Embed 128 messages per API call + let mut successful = 0; + let mut failed = 0; + + for (batch_idx, batch) in messages_to_embed.chunks(batch_size).enumerate() { + let batch_start = batch_idx * batch_size; + let batch_end = batch_start + batch.len(); + + log::info!( + "Processing batch {}/{}: messages {}-{} ({:.1}% complete)", + batch_idx + 1, + (to_embed + batch_size - 1) / batch_size, + batch_start + 1, + batch_end, + (batch_end as f64 / to_embed as f64) * 100.0 + ); + + match embed_message_batch( + batch, + contact, + ollama, + embedding_dao.clone(), + ) + .await + { + Ok(count) => { + successful += count; + log::debug!("Successfully embedded {} messages in batch", count); + } + Err(e) => { + failed += batch.len(); + log::error!("Failed to embed batch: {:?}", e); + // Continue processing despite failures + } + } + + // Small delay between batches to avoid overwhelming Ollama + if batch_end < to_embed { + sleep(Duration::from_millis(500)).await; + } + } + + log::info!( + "Message embedding job complete for '{}': {}/{} new embeddings created ({} already embedded, {} failed)", + contact, + successful, + total_messages, + skipped, + failed + ); + + if failed > 0 { 
+ log::warn!( + "{} messages failed to embed for contact '{}'", + failed, + contact + ); + } + + Ok(()) +} + +/// Embed a batch of messages using a single API call +/// Returns the number of successfully embedded messages +async fn embed_message_batch( + messages: &[&crate::ai::SmsMessage], + contact: &str, + ollama: &OllamaClient, + embedding_dao: Arc<Mutex<EmbeddingDao>>, +) -> Result<usize> { + if messages.is_empty() { + return Ok(0); + } + + // Collect message bodies for batch embedding + let bodies: Vec<&str> = messages.iter().map(|m| m.body.as_str()).collect(); + + // Generate embeddings for all messages in one API call + let embeddings = ollama.generate_embeddings(&bodies).await?; + + if embeddings.len() != messages.len() { + return Err(anyhow::anyhow!( + "Embedding count mismatch: got {} embeddings for {} messages", + embeddings.len(), + messages.len() + )); + } + + // Build batch of insert records + let otel_context = opentelemetry::Context::new(); + let created_at = Utc::now().timestamp(); + let mut inserts = Vec::with_capacity(messages.len()); + + for (message, embedding) in messages.iter().zip(embeddings.iter()) { + // Validate embedding dimensions + if embedding.len() != 768 { + log::warn!( + "Invalid embedding dimensions: {} (expected 768), skipping", + embedding.len() + ); + continue; + } + + inserts.push(InsertMessageEmbedding { + contact: contact.to_string(), + body: message.body.clone(), + timestamp: message.timestamp, + is_sent: message.is_sent, + embedding: embedding.clone(), + created_at, + model_version: "nomic-embed-text:v1.5".to_string(), + }); + } + + // Store all embeddings in a single transaction + let mut dao = embedding_dao.lock().expect("Unable to lock EmbeddingDao"); + let stored_count = dao.store_message_embeddings_batch(&otel_context, inserts) + .map_err(|e| anyhow::anyhow!("Failed to store embeddings batch: {:?}", e))?; + + Ok(stored_count) +} diff --git a/src/ai/handlers.rs b/src/ai/handlers.rs index a7cfcc2..0e4eab8 100644 --- a/src/ai/handlers.rs +++ 
b/src/ai/handlers.rs @@ -1,9 +1,12 @@ -use actix_web::{HttpResponse, Responder, delete, get, post, web}; +use actix_web::{HttpRequest, HttpResponse, Responder, delete, get, post, web}; +use opentelemetry::trace::{Span, Status, Tracer}; +use opentelemetry::KeyValue; use serde::{Deserialize, Serialize}; use crate::ai::{InsightGenerator, OllamaClient}; use crate::data::Claims; use crate::database::InsightDao; +use crate::otel::{extract_context_from_request, global_tracer}; use crate::utils::normalize_path; #[derive(Debug, Deserialize)] @@ -45,12 +48,22 @@ pub struct ServerModels { /// POST /insights/generate - Generate insight for a specific photo #[post("/insights/generate")] pub async fn generate_insight_handler( + http_request: HttpRequest, _claims: Claims, request: web::Json<GenerateInsightRequest>, insight_generator: web::Data<InsightGenerator>, ) -> impl Responder { + let parent_context = extract_context_from_request(&http_request); + let tracer = global_tracer(); + let mut span = tracer.start_with_context("http.insights.generate", &parent_context); + let normalized_path = normalize_path(&request.file_path); + span.set_attribute(KeyValue::new("file_path", normalized_path.clone())); + if let Some(ref model) = request.model { + span.set_attribute(KeyValue::new("model", model.clone())); + } + log::info!( "Manual insight generation triggered for photo: {} with model: {:?}", normalized_path, @@ -58,16 +71,21 @@ pub async fn generate_insight_handler( ); // Generate insight with optional custom model - match insight_generator + let result = insight_generator .generate_insight_for_photo_with_model(&normalized_path, request.model.clone()) - .await - { - Ok(()) => HttpResponse::Ok().json(serde_json::json!({ - "success": true, - "message": "Insight generated successfully" - })), + .await; + + match result { + Ok(()) => { + span.set_status(Status::Ok); + HttpResponse::Ok().json(serde_json::json!({ + "success": true, + "message": "Insight generated successfully" + })) + } Err(e) => { log::error!("Failed to generate 
insight: {:?}", e); + span.set_status(Status::error(e.to_string())); HttpResponse::InternalServerError().json(serde_json::json!({ "error": format!("Failed to generate insight: {:?}", e) })) diff --git a/src/ai/insight_generator.rs b/src/ai/insight_generator.rs index 5394e2d..4d2ce47 100644 --- a/src/ai/insight_generator.rs +++ b/src/ai/insight_generator.rs @@ -1,5 +1,7 @@ use anyhow::Result; use chrono::{DateTime, Utc}; +use opentelemetry::trace::{Span, Status, TraceContextExt, Tracer}; +use opentelemetry::KeyValue; use serde::Deserialize; use std::fs::File; use std::sync::{Arc, Mutex}; @@ -7,8 +9,9 @@ use crate::ai::ollama::OllamaClient; use crate::ai::sms_client::SmsApiClient; use crate::database::models::InsertPhotoInsight; -use crate::database::{ExifDao, InsightDao}; +use crate::database::{DailySummaryDao, ExifDao, InsightDao}; use crate::memories::extract_date_from_filename; +use crate::otel::global_tracer; use crate::utils::normalize_path; #[derive(Deserialize)] @@ -31,6 +34,7 @@ pub struct InsightGenerator { sms_client: SmsApiClient, insight_dao: Arc<Mutex<InsightDao>>, exif_dao: Arc<Mutex<ExifDao>>, + daily_summary_dao: Arc<Mutex<DailySummaryDao>>, base_path: String, } @@ -40,6 +44,7 @@ impl InsightGenerator { sms_client: SmsApiClient, insight_dao: Arc<Mutex<InsightDao>>, exif_dao: Arc<Mutex<ExifDao>>, + daily_summary_dao: Arc<Mutex<DailySummaryDao>>, base_path: String, ) -> Self { Self { @@ -47,6 +52,7 @@ sms_client, insight_dao, exif_dao, + daily_summary_dao, base_path, } } @@ -72,19 +78,174 @@ impl InsightGenerator { None } + /// Find relevant messages using RAG, excluding recent messages (>30 days ago) + /// This prevents RAG from returning messages already in the immediate time window + async fn find_relevant_messages_rag_historical( + &self, + parent_cx: &opentelemetry::Context, + date: chrono::NaiveDate, + location: Option<&str>, + contact: Option<&str>, + limit: usize, + ) -> Result<Vec<String>> { + let tracer = global_tracer(); + let mut span = tracer.start_with_context("ai.rag.filter_historical", parent_cx); + 
let filter_cx = parent_cx.with_span(span); + + filter_cx.span().set_attribute(KeyValue::new("date", date.to_string())); + filter_cx.span().set_attribute(KeyValue::new("limit", limit as i64)); + filter_cx.span().set_attribute(KeyValue::new("exclusion_window_days", 30)); + + let query_results = self.find_relevant_messages_rag(date, location, contact, limit * 2).await?; + + filter_cx.span().set_attribute(KeyValue::new("rag_results_count", query_results.len() as i64)); + + // Filter out messages from within 30 days of the photo date + let photo_timestamp = date.and_hms_opt(12, 0, 0) + .ok_or_else(|| anyhow::anyhow!("Invalid date"))? + .and_utc() + .timestamp(); + let exclusion_window = 30 * 86400; // 30 days in seconds + + let historical_only: Vec<String> = query_results + .into_iter() + .filter(|msg| { + // Extract date from formatted daily summary "[2024-08-15] Contact ..." + if let Some(bracket_end) = msg.find(']') { + if let Some(date_str) = msg.get(1..bracket_end) { + // Parse just the date (daily summaries don't have time) + if let Ok(msg_date) = chrono::NaiveDate::parse_from_str(date_str, "%Y-%m-%d") { + let msg_timestamp = msg_date + .and_hms_opt(12, 0, 0) + .unwrap() + .and_utc() + .timestamp(); + let time_diff = (photo_timestamp - msg_timestamp).abs(); + return time_diff > exclusion_window; + } + } + } + false + }) + .take(limit) + .collect(); + + log::info!( + "Found {} historical messages (>30 days from photo date)", + historical_only.len() + ); + + filter_cx.span().set_attribute(KeyValue::new("historical_results_count", historical_only.len() as i64)); + filter_cx.span().set_status(Status::Ok); + + Ok(historical_only) + } + + /// Find relevant daily summaries using RAG (semantic search) + /// Returns formatted daily summary strings for LLM context + async fn find_relevant_messages_rag( + &self, + date: chrono::NaiveDate, + location: Option<&str>, + contact: Option<&str>, + limit: usize, + ) -> Result<Vec<String>> { + let tracer = global_tracer(); + let current_cx = 
opentelemetry::Context::current(); + let mut span = tracer.start_with_context("ai.rag.search_daily_summaries", &current_cx); + span.set_attribute(KeyValue::new("date", date.to_string())); + span.set_attribute(KeyValue::new("limit", limit as i64)); + if let Some(loc) = location { + span.set_attribute(KeyValue::new("location", loc.to_string())); + } + if let Some(c) = contact { + span.set_attribute(KeyValue::new("contact", c.to_string())); + } + + // Build more detailed query string from photo context + let mut query_parts = Vec::new(); + + // Add temporal context + query_parts.push(format!("On {}", date.format("%B %d, %Y"))); + + // Add location if available + if let Some(loc) = location { + query_parts.push(format!("at {}", loc)); + } + + // Add contact context if available + if let Some(c) = contact { + query_parts.push(format!("conversation with {}", c)); + } + + // Add day of week for temporal context + let weekday = date.format("%A"); + query_parts.push(format!("it was a {}", weekday)); + + let query = query_parts.join(", "); + + span.set_attribute(KeyValue::new("query", query.clone())); + + // Create context with this span for child operations + let search_cx = current_cx.with_span(span); + + log::info!("========================================"); + log::info!("RAG QUERY: {}", query); + log::info!("========================================"); + + // Generate embedding for the query + let query_embedding = self.ollama.generate_embedding(&query).await?; + + // Search for similar daily summaries + let mut summary_dao = self + .daily_summary_dao + .lock() + .expect("Unable to lock DailySummaryDao"); + + let similar_summaries = summary_dao + .find_similar_summaries(&search_cx, &query_embedding, limit) + .map_err(|e| anyhow::anyhow!("Failed to find similar summaries: {:?}", e))?; + + log::info!("Found {} relevant daily summaries via RAG", similar_summaries.len()); + + search_cx.span().set_attribute(KeyValue::new("results_count", similar_summaries.len() as i64)); + + // 
Format daily summaries for LLM context + let formatted = similar_summaries + .into_iter() + .map(|s| { + format!( + "[{}] {} ({} messages):\n{}", + s.date, s.contact, s.message_count, s.summary + ) + }) + .collect(); + + search_cx.span().set_status(Status::Ok); + + Ok(formatted) + } + /// Generate AI insight for a single photo with optional custom model pub async fn generate_insight_for_photo_with_model( &self, file_path: &str, custom_model: Option<String>, ) -> Result<()> { + let tracer = global_tracer(); + let current_cx = opentelemetry::Context::current(); + let mut span = tracer.start_with_context("ai.insight.generate", &current_cx); + // Normalize path to ensure consistent forward slashes in database let file_path = normalize_path(file_path); log::info!("Generating insight for photo: {}", file_path); + span.set_attribute(KeyValue::new("file_path", file_path.clone())); + // Create custom Ollama client if model is specified let ollama_client = if let Some(model) = custom_model { log::info!("Using custom model: {}", model); + span.set_attribute(KeyValue::new("custom_model", model.clone())); OllamaClient::new( self.ollama.primary_url.clone(), self.ollama.fallback_url.clone(), @@ -92,15 +253,18 @@ Some(model), // Use the same custom model for fallback server ) } else { + span.set_attribute(KeyValue::new("model", self.ollama.primary_model.clone())); self.ollama.clone() }; + // Create context with this span for child operations + let insight_cx = current_cx.with_span(span); + // 1. Get EXIF data for the photo - let otel_context = opentelemetry::Context::new(); let exif = { let mut exif_dao = self.exif_dao.lock().expect("Unable to lock ExifDao"); exif_dao - .get_exif(&otel_context, &file_path) + .get_exif(&insight_cx, &file_path) .map_err(|e| anyhow::anyhow!("Failed to get EXIF: {:?}", e))? 
}; @@ -139,47 +303,20 @@ impl InsightGenerator { let contact = Self::extract_contact_from_path(&file_path); log::info!("Extracted contact from path: {:?}", contact); - // 4. Fetch SMS messages for the contact (±1 day) - // Pass the full timestamp for proximity sorting - let sms_messages = self - .sms_client - .fetch_messages_for_contact(contact.as_deref(), timestamp) - .await - .unwrap_or_else(|e| { - log::error!("Failed to fetch SMS messages: {}", e); - Vec::new() - }); + insight_cx.span().set_attribute(KeyValue::new("date_taken", date_taken.to_string())); + if let Some(ref c) = contact { + insight_cx.span().set_attribute(KeyValue::new("contact", c.clone())); + } - log::info!( - "Fetched {} SMS messages closest to {}", - sms_messages.len(), - chrono::DateTime::from_timestamp(timestamp, 0) - .map(|dt| dt.format("%Y-%m-%d %H:%M:%S").to_string()) - .unwrap_or_else(|| "unknown time".to_string()) - ); - - // 5. Summarize SMS context - let sms_summary = if !sms_messages.is_empty() { - match self - .sms_client - .summarize_context(&sms_messages, &ollama_client) - .await - { - Ok(summary) => Some(summary), - Err(e) => { - log::warn!("Failed to summarize SMS context: {}", e); - None - } - } - } else { - None - }; - - // 6. Get location name from GPS coordinates + // 4. Get location name from GPS coordinates (needed for RAG query) let location = match exif { - Some(exif) => { + Some(ref exif) => { if let (Some(lat), Some(lon)) = (exif.gps_latitude, exif.gps_longitude) { - self.reverse_geocode(lat, lon).await + let loc = self.reverse_geocode(lat, lon).await; + if let Some(ref l) = loc { + insight_cx.span().set_attribute(KeyValue::new("location", l.clone())); + } + loc } else { None } @@ -187,11 +324,171 @@ impl InsightGenerator { None => None, }; + // 5. 
Intelligent retrieval: Hybrid approach for better context + let mut sms_summary = None; + let mut used_rag = false; + + // TEMPORARY: Set to true to disable RAG and use only time-based retrieval for testing + let disable_rag_for_testing = false; + + // Decide strategy based on available metadata + let has_strong_query = location.is_some(); + + if disable_rag_for_testing { + log::warn!("RAG DISABLED FOR TESTING - Using only time-based retrieval (±1 day)"); + // Skip directly to fallback + } else if has_strong_query { + // Strategy A: Pure RAG (we have location for good semantic matching) + log::info!("Using RAG with location-based query"); + match self + .find_relevant_messages_rag( + date_taken, + location.as_deref(), + contact.as_deref(), + 20, + ) + .await + { + Ok(rag_messages) if !rag_messages.is_empty() => { + used_rag = true; + sms_summary = self.summarize_messages(&rag_messages, &ollama_client).await; + } + Ok(_) => log::info!("RAG returned no messages"), + Err(e) => log::warn!("RAG failed: {}", e), + } + } else { + // Strategy B: Expanded immediate context + historical RAG + log::info!("Using expanded immediate context + historical RAG approach"); + + // Step 1: Get FULL immediate temporal context (±1 day, ALL messages) + let immediate_messages = self + .sms_client + .fetch_messages_for_contact(contact.as_deref(), timestamp) + .await + .unwrap_or_else(|e| { + log::error!("Failed to fetch immediate messages: {}", e); + Vec::new() + }); + + log::info!( + "Fetched {} messages from ±1 day window (using ALL for immediate context)", + immediate_messages.len() + ); + + if !immediate_messages.is_empty() { + // Step 2: Extract topics from immediate messages to enrich RAG query + let topics = self.extract_topics_from_messages(&immediate_messages, &ollama_client).await; + + log::info!("Extracted topics for query enrichment: {:?}", topics); + + // Step 3: Try historical RAG (>30 days ago) + match self + .find_relevant_messages_rag_historical( + &insight_cx, + 
date_taken, + None, + contact.as_deref(), + 10, // Top 10 historical matches + ) + .await + { + Ok(historical_messages) if !historical_messages.is_empty() => { + log::info!( + "Two-context approach: {} immediate (full conversation) + {} historical (similar past moments)", + immediate_messages.len(), + historical_messages.len() + ); + used_rag = true; + + // Step 4: Summarize contexts separately, then combine + let immediate_summary = self + .summarize_context_from_messages(&immediate_messages, &ollama_client) + .await + .unwrap_or_else(|| String::from("No immediate context")); + + let historical_summary = self + .summarize_messages(&historical_messages, &ollama_client) + .await + .unwrap_or_else(|| String::from("No historical context")); + + // Combine summaries + sms_summary = Some(format!( + "Immediate context (±1 day): {}\n\nSimilar moments from the past: {}", + immediate_summary, historical_summary + )); + } + Ok(_) => { + // RAG found no historical matches, just use immediate context + log::info!("No historical RAG matches, using immediate context only"); + sms_summary = self.summarize_context_from_messages(&immediate_messages, &ollama_client).await; + } + Err(e) => { + log::warn!("Historical RAG failed, using immediate context only: {}", e); + sms_summary = self.summarize_context_from_messages(&immediate_messages, &ollama_client).await; + } + } + } else { + log::info!("No immediate messages found, trying basic RAG as fallback"); + // Fallback to basic RAG even without strong query + match self + .find_relevant_messages_rag(date_taken, None, contact.as_deref(), 20) + .await + { + Ok(rag_messages) if !rag_messages.is_empty() => { + used_rag = true; + sms_summary = self.summarize_messages(&rag_messages, &ollama_client).await; + } + _ => {} + } + } + } + + // 6. 
Fallback to traditional time-based message retrieval if RAG didn't work + if !used_rag { + log::info!("Using traditional time-based message retrieval (±1 day)"); + let sms_messages = self + .sms_client + .fetch_messages_for_contact(contact.as_deref(), timestamp) + .await + .unwrap_or_else(|e| { + log::error!("Failed to fetch SMS messages: {}", e); + Vec::new() + }); + + log::info!( + "Fetched {} SMS messages closest to {}", + sms_messages.len(), + chrono::DateTime::from_timestamp(timestamp, 0) + .map(|dt| dt.format("%Y-%m-%d %H:%M:%S").to_string()) + .unwrap_or_else(|| "unknown time".to_string()) + ); + + // Summarize time-based messages + if !sms_messages.is_empty() { + match self + .sms_client + .summarize_context(&sms_messages, &ollama_client) + .await + { + Ok(summary) => { + sms_summary = Some(summary); + } + Err(e) => { + log::warn!("Failed to summarize SMS context: {}", e); + } + } + } + } + + let retrieval_method = if used_rag { "RAG" } else { "time-based" }; + insight_cx.span().set_attribute(KeyValue::new("retrieval_method", retrieval_method)); + insight_cx.span().set_attribute(KeyValue::new("has_sms_context", sms_summary.is_some())); + log::info!( - "Photo context: date={}, location={:?}, sms_messages={}", + "Photo context: date={}, location={:?}, retrieval_method={}", date_taken, location, - sms_messages.len() + retrieval_method ); // 7. Generate title and summary with Ollama @@ -206,6 +503,9 @@ impl InsightGenerator { log::info!("Generated title: {}", title); log::info!("Generated summary: {}", summary); + insight_cx.span().set_attribute(KeyValue::new("title_length", title.len() as i64)); + insight_cx.span().set_attribute(KeyValue::new("summary_length", summary.len() as i64)); + // 8. 
Store in database let insight = InsertPhotoInsight { file_path: file_path.to_string(), @@ -216,13 +516,210 @@ impl InsightGenerator { }; let mut dao = self.insight_dao.lock().expect("Unable to lock InsightDao"); - dao.store_insight(&otel_context, insight) - .map_err(|e| anyhow::anyhow!("Failed to store insight: {:?}", e))?; + let result = dao.store_insight(&insight_cx, insight) + .map_err(|e| anyhow::anyhow!("Failed to store insight: {:?}", e)); - log::info!("Successfully stored insight for {}", file_path); + match &result { + Ok(_) => { + log::info!("Successfully stored insight for {}", file_path); + insight_cx.span().set_status(Status::Ok); + } + Err(e) => { + log::error!("Failed to store insight: {:?}", e); + insight_cx.span().set_status(Status::error(e.to_string())); + } + } + + result?; Ok(()) } + /// Extract key topics/entities from messages using LLM for query enrichment + async fn extract_topics_from_messages( + &self, + messages: &[crate::ai::SmsMessage], + ollama: &OllamaClient, + ) -> Vec { + if messages.is_empty() { + return Vec::new(); + } + + // Format a sample of messages for topic extraction + let sample_size = messages.len().min(20); + let sample_text: Vec = messages + .iter() + .take(sample_size) + .map(|m| format!("{}: {}", if m.is_sent { "Me" } else { &m.contact }, m.body)) + .collect(); + + let prompt = format!( + r#"Extract important entities from these messages that provide context about what was happening. Focus on: + +1. **People**: Names of specific people mentioned (first names, nicknames) +2. **Places**: Locations, cities, buildings, workplaces, parks, restaurants, venues +3. **Activities**: Specific events, hobbies, groups, organizations (e.g., "drum corps", "auditions") +4. **Unique terms**: Domain-specific words or phrases that might need explanation (e.g., "Hyland", "Vanguard", "DCI") + +Messages: +{} + +Return a comma-separated list of 3-7 specific entities (people, places, activities, unique terms). 
+Focus on proper nouns and specific terms that provide context. +Return ONLY the comma-separated list, nothing else."#, + sample_text.join("\n") + ); + + match ollama + .generate(&prompt, Some("You are an entity extraction assistant. Extract proper nouns, people, places, and domain-specific terms that provide context.")) + .await + { + Ok(response) => { + // Parse comma-separated topics + response + .split(',') + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty() && s.len() > 1) // Filter out single chars + .take(7) // Increased from 5 to 7 + .collect() + } + Err(e) => { + log::warn!("Failed to extract topics from messages: {}", e); + Vec::new() + } + } + } + + /// Find relevant messages using RAG with topic-enriched query + async fn find_relevant_messages_rag_enriched( + &self, + date: chrono::NaiveDate, + contact: Option<&str>, + topics: &[String], + limit: usize, + ) -> Result> { + // Build enriched query from date + topics + let mut query_parts = Vec::new(); + + query_parts.push(format!("On {}", date.format("%B %d, %Y"))); + + if !topics.is_empty() { + query_parts.push(format!("about {}", topics.join(", "))); + } + + if let Some(c) = contact { + query_parts.push(format!("conversation with {}", c)); + } + + // Add day of week + let weekday = date.format("%A"); + query_parts.push(format!("it was a {}", weekday)); + + let query = query_parts.join(", "); + + log::info!("========================================"); + log::info!("ENRICHED RAG QUERY: {}", query); + log::info!("Extracted topics: {:?}", topics); + log::info!("========================================"); + + // Use existing RAG method with enriched query + self.find_relevant_messages_rag(date, None, contact, limit) + .await + } + + /// Summarize pre-formatted message strings using LLM (concise version for historical context) + async fn summarize_messages( + &self, + messages: &[String], + ollama: &OllamaClient, + ) -> Option { + if messages.is_empty() { + return None; + } + + let messages_text = 
messages.join("\n"); + + let prompt = format!( + r#"Summarize the context from these messages in 2-3 sentences. Focus on activities, locations, events, and relationships mentioned. + +Messages: +{} + +Return ONLY the summary, nothing else."#, + messages_text + ); + + match ollama + .generate( + &prompt, + Some("You are a context summarization assistant. Be concise and factual."), + ) + .await + { + Ok(summary) => Some(summary), + Err(e) => { + log::warn!("Failed to summarize messages: {}", e); + None + } + } + } + + /// Convert SmsMessage objects to formatted strings and summarize with more detail + /// This is used for immediate context (±1 day) to preserve conversation details + async fn summarize_context_from_messages( + &self, + messages: &[crate::ai::SmsMessage], + ollama: &OllamaClient, + ) -> Option { + if messages.is_empty() { + return None; + } + + // Format messages + let formatted: Vec = messages + .iter() + .map(|m| { + let sender = if m.is_sent { "Me" } else { &m.contact }; + let timestamp = chrono::DateTime::from_timestamp(m.timestamp, 0) + .map(|dt| dt.format("%Y-%m-%d %H:%M").to_string()) + .unwrap_or_else(|| "unknown time".to_string()); + format!("[{}] {}: {}", timestamp, sender, m.body) + }) + .collect(); + + let messages_text = formatted.join("\n"); + + // Use a more detailed prompt for immediate context + let prompt = format!( + r#"Provide a detailed summary of the conversation context from these messages. Include: +- Key activities, events, and plans discussed +- Important locations or places mentioned +- Emotional tone and relationship dynamics +- Any significant details that provide context about what was happening + +Be thorough but organized. Use 1-2 paragraphs. + +Messages: +{} + +Return ONLY the summary, nothing else."#, + messages_text + ); + + match ollama + .generate( + &prompt, + Some("You are a context summarization assistant. 
Be detailed and factual, preserving important context."), + ) + .await + { + Ok(summary) => Some(summary), + Err(e) => { + log::warn!("Failed to summarize immediate context: {}", e); + None + } + } + } + /// Reverse geocode GPS coordinates to human-readable place names async fn reverse_geocode(&self, lat: f64, lon: f64) -> Option { let url = format!( diff --git a/src/ai/mod.rs b/src/ai/mod.rs index ef0d52b..1f7ddda 100644 --- a/src/ai/mod.rs +++ b/src/ai/mod.rs @@ -1,12 +1,16 @@ +pub mod embedding_job; +pub mod daily_summary_job; pub mod handlers; pub mod insight_generator; pub mod ollama; pub mod sms_client; +pub use embedding_job::embed_contact_messages; +pub use daily_summary_job::generate_daily_summaries; pub use handlers::{ delete_insight_handler, generate_insight_handler, get_all_insights_handler, get_available_models_handler, get_insight_handler, }; pub use insight_generator::InsightGenerator; pub use ollama::OllamaClient; -pub use sms_client::SmsApiClient; +pub use sms_client::{SmsApiClient, SmsMessage}; diff --git a/src/ai/ollama.rs b/src/ai/ollama.rs index bac4c4c..b7ad707 100644 --- a/src/ai/ollama.rs +++ b/src/ai/ollama.rs @@ -226,7 +226,7 @@ Return ONLY the title, nothing else."#, let sms_str = sms_summary.unwrap_or("No messages"); let prompt = format!( - r#"Write a brief 1-2 paragraph description of this moment based on the available information: + r#"Write a 1-3 paragraph description of this moment based on the available information: Date: {} Location: {} @@ -238,10 +238,139 @@ Use only the specific details provided above. Mention people's names, places, or sms_str ); - let system = "You are a memory refreshing assistant. Use only the information provided. Do not invent details. Help me remember this day."; + let system = "You are a memory refreshing assistant who is able to provide insights through analyzing past conversations. Use only the information provided. 
Do not invent details."; self.generate(&prompt, Some(system)).await } + + /// Generate an embedding vector for text using nomic-embed-text:v1.5 + /// Returns a 768-dimensional vector as Vec + pub async fn generate_embedding(&self, text: &str) -> Result> { + let embeddings = self.generate_embeddings(&[text]).await?; + embeddings.into_iter().next() + .ok_or_else(|| anyhow::anyhow!("No embedding returned")) + } + + /// Generate embeddings for multiple texts in a single API call (batch mode) + /// Returns a vector of 768-dimensional vectors + /// This is much more efficient than calling generate_embedding multiple times + pub async fn generate_embeddings(&self, texts: &[&str]) -> Result>> { + let embedding_model = "nomic-embed-text:v1.5"; + + log::debug!("=== Ollama Batch Embedding Request ==="); + log::debug!("Model: {}", embedding_model); + log::debug!("Batch size: {} texts", texts.len()); + log::debug!("======================================"); + + // Try primary server first + log::debug!( + "Attempting to generate {} embeddings with primary server: {} (model: {})", + texts.len(), + self.primary_url, + embedding_model + ); + let primary_result = self + .try_generate_embeddings(&self.primary_url, embedding_model, texts) + .await; + + let embeddings = match primary_result { + Ok(embeddings) => { + log::debug!("Successfully generated {} embeddings from primary server", embeddings.len()); + embeddings + } + Err(e) => { + log::warn!("Primary server batch embedding failed: {}", e); + + // Try fallback server if available + if let Some(fallback_url) = &self.fallback_url { + log::info!( + "Attempting to generate {} embeddings with fallback server: {} (model: {})", + texts.len(), + fallback_url, + embedding_model + ); + match self + .try_generate_embeddings(fallback_url, embedding_model, texts) + .await + { + Ok(embeddings) => { + log::info!("Successfully generated {} embeddings from fallback server", embeddings.len()); + embeddings + } + Err(fallback_e) => { + 
log::error!("Fallback server batch embedding also failed: {}", fallback_e); + return Err(anyhow::anyhow!( + "Both primary and fallback servers failed. Primary: {}, Fallback: {}", + e, + fallback_e + )); + } + } + } else { + log::error!("No fallback server configured"); + return Err(e); + } + } + }; + + // Validate embedding dimensions (should be 768 for nomic-embed-text:v1.5) + for (i, embedding) in embeddings.iter().enumerate() { + if embedding.len() != 768 { + log::warn!( + "Unexpected embedding dimensions for item {}: {} (expected 768)", + i, + embedding.len() + ); + } + } + + Ok(embeddings) + } + + /// Internal helper to try generating an embedding from a specific server + async fn try_generate_embedding( + &self, + url: &str, + model: &str, + text: &str, + ) -> Result> { + let embeddings = self.try_generate_embeddings(url, model, &[text]).await?; + embeddings.into_iter().next() + .ok_or_else(|| anyhow::anyhow!("No embedding returned from Ollama")) + } + + /// Internal helper to try generating embeddings for multiple texts from a specific server + async fn try_generate_embeddings( + &self, + url: &str, + model: &str, + texts: &[&str], + ) -> Result>> { + let request = OllamaBatchEmbedRequest { + model: model.to_string(), + input: texts.iter().map(|s| s.to_string()).collect(), + }; + + let response = self + .client + .post(&format!("{}/api/embed", url)) + .json(&request) + .send() + .await?; + + if !response.status().is_success() { + let status = response.status(); + let error_body = response.text().await.unwrap_or_default(); + return Err(anyhow::anyhow!( + "Ollama batch embedding request failed: {} - {}", + status, + error_body + )); + } + + let result: OllamaEmbedResponse = response.json().await?; + Ok(result.embeddings) + } } #[derive(Serialize)] @@ -267,3 +396,20 @@ struct OllamaTagsResponse { struct OllamaModel { name: String, } + +#[derive(Serialize)] +struct OllamaEmbedRequest { + model: String, + input: String, +} + +#[derive(Serialize)] +struct 
OllamaBatchEmbedRequest { + model: String, + input: Vec, +} + +#[derive(Deserialize)] +struct OllamaEmbedResponse { + embeddings: Vec>, +} diff --git a/src/ai/sms_client.rs b/src/ai/sms_client.rs index 154dabc..7919898 100644 --- a/src/ai/sms_client.rs +++ b/src/ai/sms_client.rs @@ -91,6 +91,118 @@ impl SmsApiClient { .await } + /// Fetch all messages for a specific contact across all time + /// Used for embedding generation - retrieves complete message history + /// Handles pagination automatically if the API returns a limited number of results + pub async fn fetch_all_messages_for_contact(&self, contact: &str) -> Result> { + let start_ts = chrono::DateTime::parse_from_rfc3339("2000-01-01T00:00:00Z") + .unwrap() + .timestamp(); + let end_ts = chrono::Utc::now().timestamp(); + + log::info!( + "Fetching all historical messages for contact: {}", + contact + ); + + let mut all_messages = Vec::new(); + let mut offset = 0; + let limit = 1000; // Fetch in batches of 1000 + + loop { + log::debug!("Fetching batch at offset {} for contact {}", offset, contact); + + let batch = self.fetch_messages_paginated( + start_ts, + end_ts, + Some(contact), + None, + limit, + offset + ).await?; + + let batch_size = batch.len(); + all_messages.extend(batch); + + log::debug!("Fetched {} messages (total so far: {})", batch_size, all_messages.len()); + + // If we got fewer messages than the limit, we've reached the end + if batch_size < limit { + break; + } + + offset += limit; + } + + log::info!( + "Fetched {} total messages for contact {}", + all_messages.len(), + contact + ); + + Ok(all_messages) + } + + /// Internal method to fetch messages with pagination support + async fn fetch_messages_paginated( + &self, + start_ts: i64, + end_ts: i64, + contact: Option<&str>, + center_timestamp: Option, + limit: usize, + offset: usize, + ) -> Result> { + let mut url = format!( + "{}/api/messages/by-date-range/?start_date={}&end_date={}&limit={}&offset={}", + self.base_url, start_ts, end_ts, 
limit, offset + ); + + if let Some(contact_name) = contact { + url.push_str(&format!("&contact={}", urlencoding::encode(contact_name))); + } + + if let Some(ts) = center_timestamp { + url.push_str(&format!("&timestamp={}", ts)); + } + + log::debug!("Fetching SMS messages from: {}", url); + + let mut request = self.client.get(&url); + + if let Some(token) = &self.token { + request = request.header("Authorization", format!("Bearer {}", token)); + } + + let response = request.send().await?; + + log::debug!("SMS API response status: {}", response.status()); + + if !response.status().is_success() { + let status = response.status(); + let error_body = response.text().await.unwrap_or_default(); + log::error!("SMS API request failed: {} - {}", status, error_body); + return Err(anyhow::anyhow!( + "SMS API request failed: {} - {}", + status, + error_body + )); + } + + let data: SmsApiResponse = response.json().await?; + + Ok(data + .messages + .into_iter() + .map(|m| SmsMessage { + contact: m.contact_name, + body: m.body, + timestamp: m.date, + is_sent: m.type_ == 2, + }) + .collect()) + } + /// Internal method to fetch messages with optional contact filter and timestamp sorting async fn fetch_messages( &self, diff --git a/src/database/daily_summary_dao.rs b/src/database/daily_summary_dao.rs new file mode 100644 index 0000000..6e399e4 --- /dev/null +++ b/src/database/daily_summary_dao.rs @@ -0,0 +1,338 @@ +use diesel::prelude::*; +use diesel::sqlite::SqliteConnection; +use serde::Serialize; +use std::ops::DerefMut; +use std::sync::{Arc, Mutex}; + +use crate::database::{connect, DbError, DbErrorKind}; +use crate::otel::trace_db_call; + +/// Represents a daily conversation summary +#[derive(Serialize, Clone, Debug)] +pub struct DailySummary { + pub id: i32, + pub date: String, + pub contact: String, + pub summary: String, + pub message_count: i32, + pub created_at: i64, + pub model_version: String, +} + +/// Data for inserting a new daily summary +#[derive(Clone, Debug)] +pub
struct InsertDailySummary { + pub date: String, + pub contact: String, + pub summary: String, + pub message_count: i32, + pub embedding: Vec, + pub created_at: i64, + pub model_version: String, +} + +pub trait DailySummaryDao: Sync + Send { + /// Store a daily summary with its embedding + fn store_summary( + &mut self, + context: &opentelemetry::Context, + summary: InsertDailySummary, + ) -> Result; + + /// Find semantically similar daily summaries using vector similarity + fn find_similar_summaries( + &mut self, + context: &opentelemetry::Context, + query_embedding: &[f32], + limit: usize, + ) -> Result, DbError>; + + /// Check if a summary exists for a given date and contact + fn summary_exists( + &mut self, + context: &opentelemetry::Context, + date: &str, + contact: &str, + ) -> Result; + + /// Get count of summaries for a contact + fn get_summary_count( + &mut self, + context: &opentelemetry::Context, + contact: &str, + ) -> Result; +} + +pub struct SqliteDailySummaryDao { + connection: Arc>, +} + +impl Default for SqliteDailySummaryDao { + fn default() -> Self { + Self::new() + } +} + +impl SqliteDailySummaryDao { + pub fn new() -> Self { + SqliteDailySummaryDao { + connection: Arc::new(Mutex::new(connect())), + } + } + + fn serialize_vector(vec: &[f32]) -> Vec { + use zerocopy::IntoBytes; + vec.as_bytes().to_vec() + } + + fn deserialize_vector(bytes: &[u8]) -> Result, DbError> { + if bytes.len() % 4 != 0 { + return Err(DbError::new(DbErrorKind::QueryError)); + } + + let count = bytes.len() / 4; + let mut vec = Vec::with_capacity(count); + + for chunk in bytes.chunks_exact(4) { + let float = f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]); + vec.push(float); + } + + Ok(vec) + } + + fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 { + if a.len() != b.len() { + return 0.0; + } + + let dot_product: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum(); + let magnitude_a: f32 = a.iter().map(|x| x * x).sum::().sqrt(); + let magnitude_b: f32 = 
b.iter().map(|x| x * x).sum::().sqrt(); + + if magnitude_a == 0.0 || magnitude_b == 0.0 { + return 0.0; + } + + dot_product / (magnitude_a * magnitude_b) + } +} + +impl DailySummaryDao for SqliteDailySummaryDao { + fn store_summary( + &mut self, + context: &opentelemetry::Context, + summary: InsertDailySummary, + ) -> Result { + trace_db_call(context, "insert", "store_summary", |_span| { + let mut conn = self.connection.lock().expect("Unable to get DailySummaryDao"); + + // Validate embedding dimensions + if summary.embedding.len() != 768 { + return Err(anyhow::anyhow!( + "Invalid embedding dimensions: {} (expected 768)", + summary.embedding.len() + )); + } + + let embedding_bytes = Self::serialize_vector(&summary.embedding); + + // INSERT OR REPLACE to handle updates if summary needs regeneration + diesel::sql_query( + "INSERT OR REPLACE INTO daily_conversation_summaries + (date, contact, summary, message_count, embedding, created_at, model_version) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)" + ) + .bind::(&summary.date) + .bind::(&summary.contact) + .bind::(&summary.summary) + .bind::(summary.message_count) + .bind::(&embedding_bytes) + .bind::(summary.created_at) + .bind::(&summary.model_version) + .execute(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Insert error: {:?}", e))?; + + let row_id: i32 = diesel::sql_query("SELECT last_insert_rowid() as id") + .get_result::(conn.deref_mut()) + .map(|r| r.id as i32) + .map_err(|e| anyhow::anyhow!("Failed to get last insert ID: {:?}", e))?; + + Ok(DailySummary { + id: row_id, + date: summary.date, + contact: summary.contact, + summary: summary.summary, + message_count: summary.message_count, + created_at: summary.created_at, + model_version: summary.model_version, + }) + }) + .map_err(|_| DbError::new(DbErrorKind::InsertError)) + } + + fn find_similar_summaries( + &mut self, + context: &opentelemetry::Context, + query_embedding: &[f32], + limit: usize, + ) -> Result, DbError> { + trace_db_call(context, "query", 
"find_similar_summaries", |_span| { + let mut conn = self.connection.lock().expect("Unable to get DailySummaryDao"); + + if query_embedding.len() != 768 { + return Err(anyhow::anyhow!( + "Invalid query embedding dimensions: {} (expected 768)", + query_embedding.len() + )); + } + + // Load all summaries with embeddings + let results = diesel::sql_query( + "SELECT id, date, contact, summary, message_count, embedding, created_at, model_version + FROM daily_conversation_summaries" + ) + .load::(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Query error: {:?}", e))?; + + log::info!("Loaded {} daily summaries for similarity comparison", results.len()); + + // Compute similarity for each summary + let mut scored_summaries: Vec<(f32, DailySummary)> = results + .into_iter() + .filter_map(|row| { + match Self::deserialize_vector(&row.embedding) { + Ok(embedding) => { + let similarity = Self::cosine_similarity(query_embedding, &embedding); + Some(( + similarity, + DailySummary { + id: row.id, + date: row.date, + contact: row.contact, + summary: row.summary, + message_count: row.message_count, + created_at: row.created_at, + model_version: row.model_version, + }, + )) + } + Err(e) => { + log::warn!("Failed to deserialize embedding for summary {}: {:?}", row.id, e); + None + } + } + }) + .collect(); + + // Sort by similarity (highest first) + scored_summaries.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal)); + + // Log similarity distribution + if !scored_summaries.is_empty() { + log::info!( + "Daily summary similarity - Top: {:.3}, Median: {:.3}, Count: {}", + scored_summaries.first().map(|(s, _)| *s).unwrap_or(0.0), + scored_summaries.get(scored_summaries.len() / 2).map(|(s, _)| *s).unwrap_or(0.0), + scored_summaries.len() + ); + } + + // Take top N and log matches + let top_results: Vec = scored_summaries + .into_iter() + .take(limit) + .map(|(similarity, summary)| { + log::info!( + "Summary match: similarity={:.3}, date={}, contact={}, 
summary=\"{}\"", + similarity, + summary.date, + summary.contact, + summary.summary.chars().take(100).collect::() + ); + summary + }) + .collect(); + + Ok(top_results) + }) + .map_err(|_| DbError::new(DbErrorKind::QueryError)) + } + + fn summary_exists( + &mut self, + context: &opentelemetry::Context, + date: &str, + contact: &str, + ) -> Result { + trace_db_call(context, "query", "summary_exists", |_span| { + let mut conn = self.connection.lock().expect("Unable to get DailySummaryDao"); + + let count = diesel::sql_query( + "SELECT COUNT(*) as count FROM daily_conversation_summaries + WHERE date = ?1 AND contact = ?2" + ) + .bind::(date) + .bind::(contact) + .get_result::(conn.deref_mut()) + .map(|r| r.count) + .map_err(|e| anyhow::anyhow!("Count query error: {:?}", e))?; + + Ok(count > 0) + }) + .map_err(|_| DbError::new(DbErrorKind::QueryError)) + } + + fn get_summary_count( + &mut self, + context: &opentelemetry::Context, + contact: &str, + ) -> Result { + trace_db_call(context, "query", "get_summary_count", |_span| { + let mut conn = self.connection.lock().expect("Unable to get DailySummaryDao"); + + diesel::sql_query( + "SELECT COUNT(*) as count FROM daily_conversation_summaries WHERE contact = ?1" + ) + .bind::(contact) + .get_result::(conn.deref_mut()) + .map(|r| r.count) + .map_err(|e| anyhow::anyhow!("Count query error: {:?}", e).into()) + }) + .map_err(|_| DbError::new(DbErrorKind::QueryError)) + } +} + +// Helper structs for raw SQL queries + +#[derive(QueryableByName)] +struct LastInsertRowId { + #[diesel(sql_type = diesel::sql_types::BigInt)] + id: i64, +} + +#[derive(QueryableByName)] +struct DailySummaryWithVectorRow { + #[diesel(sql_type = diesel::sql_types::Integer)] + id: i32, + #[diesel(sql_type = diesel::sql_types::Text)] + date: String, + #[diesel(sql_type = diesel::sql_types::Text)] + contact: String, + #[diesel(sql_type = diesel::sql_types::Text)] + summary: String, + #[diesel(sql_type = diesel::sql_types::Integer)] + message_count: i32, + 
#[diesel(sql_type = diesel::sql_types::Binary)] + embedding: Vec, + #[diesel(sql_type = diesel::sql_types::BigInt)] + created_at: i64, + #[diesel(sql_type = diesel::sql_types::Text)] + model_version: String, +} + +#[derive(QueryableByName)] +struct CountResult { + #[diesel(sql_type = diesel::sql_types::BigInt)] + count: i64, +} diff --git a/src/database/embeddings_dao.rs b/src/database/embeddings_dao.rs new file mode 100644 index 0000000..48fd458 --- /dev/null +++ b/src/database/embeddings_dao.rs @@ -0,0 +1,569 @@ +use diesel::prelude::*; +use diesel::sqlite::SqliteConnection; +use serde::Serialize; +use std::ops::DerefMut; +use std::sync::{Arc, Mutex}; + +use crate::database::{DbError, DbErrorKind, connect}; +use crate::otel::trace_db_call; + +/// Represents a stored message embedding +#[derive(Serialize, Clone, Debug)] +pub struct MessageEmbedding { + pub id: i32, + pub contact: String, + pub body: String, + pub timestamp: i64, + pub is_sent: bool, + pub created_at: i64, + pub model_version: String, +} + +/// Data for inserting a new message embedding +#[derive(Clone, Debug)] +pub struct InsertMessageEmbedding { + pub contact: String, + pub body: String, + pub timestamp: i64, + pub is_sent: bool, + pub embedding: Vec, + pub created_at: i64, + pub model_version: String, +} + +pub trait EmbeddingDao: Sync + Send { + /// Store a message with its embedding vector + fn store_message_embedding( + &mut self, + context: &opentelemetry::Context, + message: InsertMessageEmbedding, + ) -> Result; + + /// Store multiple messages with embeddings in a single transaction + /// Returns the number of successfully stored messages + fn store_message_embeddings_batch( + &mut self, + context: &opentelemetry::Context, + messages: Vec, + ) -> Result; + + /// Find semantically similar messages using vector similarity search + /// Returns the top `limit` most similar messages + /// If contact_filter is provided, only return messages from that contact + /// Otherwise, search across all 
contacts for cross-perspective context + fn find_similar_messages( + &mut self, + context: &opentelemetry::Context, + query_embedding: &[f32], + limit: usize, + contact_filter: Option<&str>, + ) -> Result, DbError>; + + /// Get the count of embedded messages for a specific contact + fn get_message_count( + &mut self, + context: &opentelemetry::Context, + contact: &str, + ) -> Result; + + /// Check if embeddings exist for a contact (idempotency check) + fn has_embeddings_for_contact( + &mut self, + context: &opentelemetry::Context, + contact: &str, + ) -> Result; + + /// Check if a specific message already has an embedding + fn message_exists( + &mut self, + context: &opentelemetry::Context, + contact: &str, + body: &str, + timestamp: i64, + ) -> Result; +} + +pub struct SqliteEmbeddingDao { + connection: Arc>, +} + +impl Default for SqliteEmbeddingDao { + fn default() -> Self { + Self::new() + } +} + +impl SqliteEmbeddingDao { + pub fn new() -> Self { + SqliteEmbeddingDao { + connection: Arc::new(Mutex::new(connect())), + } + } + + /// Serialize f32 vector to bytes for BLOB storage + fn serialize_vector(vec: &[f32]) -> Vec { + // Convert f32 slice to bytes using zerocopy + use zerocopy::IntoBytes; + vec.as_bytes().to_vec() + } + + /// Deserialize bytes from BLOB back to f32 vector + fn deserialize_vector(bytes: &[u8]) -> Result, DbError> { + if bytes.len() % 4 != 0 { + return Err(DbError::new(DbErrorKind::QueryError)); + } + + let count = bytes.len() / 4; + let mut vec = Vec::with_capacity(count); + + for chunk in bytes.chunks_exact(4) { + let float = f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]); + vec.push(float); + } + + Ok(vec) + } + + /// Compute cosine similarity between two vectors + fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 { + if a.len() != b.len() { + return 0.0; + } + + let dot_product: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum(); + let magnitude_a: f32 = a.iter().map(|x| x * x).sum::().sqrt(); + let magnitude_b: f32 = 
b.iter().map(|x| x * x).sum::().sqrt(); + + if magnitude_a == 0.0 || magnitude_b == 0.0 { + return 0.0; + } + + dot_product / (magnitude_a * magnitude_b) + } +} + +impl EmbeddingDao for SqliteEmbeddingDao { + fn store_message_embedding( + &mut self, + context: &opentelemetry::Context, + message: InsertMessageEmbedding, + ) -> Result { + trace_db_call(context, "insert", "store_message_embedding", |_span| { + let mut conn = self.connection.lock().expect("Unable to get EmbeddingDao"); + + // Validate embedding dimensions + if message.embedding.len() != 768 { + return Err(anyhow::anyhow!( + "Invalid embedding dimensions: {} (expected 768)", + message.embedding.len() + )); + } + + // Serialize embedding to bytes + let embedding_bytes = Self::serialize_vector(&message.embedding); + + // Insert into message_embeddings table with BLOB + // Use INSERT OR IGNORE to skip duplicates (based on UNIQUE constraint) + let insert_result = diesel::sql_query( + "INSERT OR IGNORE INTO message_embeddings (contact, body, timestamp, is_sent, embedding, created_at, model_version) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)" + ) + .bind::(&message.contact) + .bind::(&message.body) + .bind::(message.timestamp) + .bind::(message.is_sent) + .bind::(&embedding_bytes) + .bind::(message.created_at) + .bind::(&message.model_version) + .execute(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Insert error: {:?}", e))?; + + // If INSERT OR IGNORE skipped (duplicate), find the existing record + let row_id: i32 = if insert_result == 0 { + // Duplicate - find the existing record + diesel::sql_query( + "SELECT id FROM message_embeddings WHERE contact = ?1 AND body = ?2 AND timestamp = ?3" + ) + .bind::(&message.contact) + .bind::(&message.body) + .bind::(message.timestamp) + .get_result::(conn.deref_mut()) + .map(|r| r.id as i32) + .map_err(|e| anyhow::anyhow!("Failed to find existing record: {:?}", e))? 
+            } else {
+                // New insert - get the last inserted row ID
+                diesel::sql_query("SELECT last_insert_rowid() as id")
+                    .get_result::<LastInsertRowId>(conn.deref_mut())
+                    .map(|r| r.id as i32)
+                    .map_err(|e| anyhow::anyhow!("Failed to get last insert ID: {:?}", e))?
+            };
+
+            // Return the stored message
+            Ok(MessageEmbedding {
+                id: row_id,
+                contact: message.contact,
+                body: message.body,
+                timestamp: message.timestamp,
+                is_sent: message.is_sent,
+                created_at: message.created_at,
+                model_version: message.model_version,
+            })
+        })
+        .map_err(|_| DbError::new(DbErrorKind::InsertError))
+    }
+
+    fn store_message_embeddings_batch(
+        &mut self,
+        context: &opentelemetry::Context,
+        messages: Vec<InsertMessageEmbedding>,
+    ) -> Result<usize, DbError> {
+        trace_db_call(context, "insert", "store_message_embeddings_batch", |_span| {
+            let mut conn = self.connection.lock().expect("Unable to get EmbeddingDao");
+
+            // Start transaction
+            conn.transaction::<_, anyhow::Error, _>(|conn| {
+                let mut stored_count = 0;
+
+                for message in messages {
+                    // Validate embedding dimensions
+                    if message.embedding.len() != 768 {
+                        log::warn!(
+                            "Invalid embedding dimensions: {} (expected 768), skipping",
+                            message.embedding.len()
+                        );
+                        continue;
+                    }
+
+                    // Serialize embedding to bytes
+                    let embedding_bytes = Self::serialize_vector(&message.embedding);
+
+                    // Insert into message_embeddings table with BLOB
+                    // Use INSERT OR IGNORE to skip duplicates (based on UNIQUE constraint)
+                    match diesel::sql_query(
+                        "INSERT OR IGNORE INTO message_embeddings (contact, body, timestamp, is_sent, embedding, created_at, model_version)
+                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)"
+                    )
+                    .bind::<diesel::sql_types::Text, _>(&message.contact)
+                    .bind::<diesel::sql_types::Text, _>(&message.body)
+                    .bind::<diesel::sql_types::BigInt, _>(message.timestamp)
+                    .bind::<diesel::sql_types::Bool, _>(message.is_sent)
+                    .bind::<diesel::sql_types::Binary, _>(&embedding_bytes)
+                    .bind::<diesel::sql_types::BigInt, _>(message.created_at)
+                    .bind::<diesel::sql_types::Text, _>(&message.model_version)
+                    .execute(conn)
+                    {
+                        Ok(rows) if rows > 0 => stored_count += 1,
+                        Ok(_) => {
+                            // INSERT OR IGNORE skipped (duplicate)
+                            log::debug!("Skipped duplicate message: {:?}",
message.body.chars().take(50).collect::<String>());
+                        }
+                        Err(e) => {
+                            log::warn!("Failed to insert message in batch: {:?}", e);
+                            // Continue with other messages instead of failing entire batch
+                        }
+                    }
+                }
+
+                Ok(stored_count)
+            })
+            .map_err(|e| anyhow::anyhow!("Transaction error: {:?}", e))
+        })
+        .map_err(|_| DbError::new(DbErrorKind::InsertError))
+    }
+
+    fn find_similar_messages(
+        &mut self,
+        context: &opentelemetry::Context,
+        query_embedding: &[f32],
+        limit: usize,
+        contact_filter: Option<&str>,
+    ) -> Result<Vec<MessageEmbedding>, DbError> {
+        trace_db_call(context, "query", "find_similar_messages", |_span| {
+            let mut conn = self.connection.lock().expect("Unable to get EmbeddingDao");
+
+            // Validate embedding dimensions
+            if query_embedding.len() != 768 {
+                return Err(anyhow::anyhow!(
+                    "Invalid query embedding dimensions: {} (expected 768)",
+                    query_embedding.len()
+                ));
+            }
+
+            // Load messages with optional contact filter
+            let results = if let Some(contact) = contact_filter {
+                log::debug!("RAG search filtered to contact: {}", contact);
+                diesel::sql_query(
+                    "SELECT id, contact, body, timestamp, is_sent, embedding, created_at, model_version
+                     FROM message_embeddings WHERE contact = ?1"
+                )
+                .bind::<diesel::sql_types::Text, _>(contact)
+                .load::<MessageEmbeddingWithVectorRow>(conn.deref_mut())
+                .map_err(|e| anyhow::anyhow!("Query error: {:?}", e))?
+            } else {
+                log::debug!("RAG search across ALL contacts (cross-perspective)");
+                diesel::sql_query(
+                    "SELECT id, contact, body, timestamp, is_sent, embedding, created_at, model_version
+                     FROM message_embeddings"
+                )
+                .load::<MessageEmbeddingWithVectorRow>(conn.deref_mut())
+                .map_err(|e| anyhow::anyhow!("Query error: {:?}", e))?
+ }; + + log::debug!("Loaded {} messages for similarity comparison", results.len()); + + // Compute similarity for each message + let mut scored_messages: Vec<(f32, MessageEmbedding)> = results + .into_iter() + .filter_map(|row| { + // Deserialize the embedding BLOB + match Self::deserialize_vector(&row.embedding) { + Ok(embedding) => { + // Compute cosine similarity + let similarity = Self::cosine_similarity(query_embedding, &embedding); + Some(( + similarity, + MessageEmbedding { + id: row.id, + contact: row.contact, + body: row.body, + timestamp: row.timestamp, + is_sent: row.is_sent, + created_at: row.created_at, + model_version: row.model_version, + }, + )) + } + Err(e) => { + log::warn!("Failed to deserialize embedding for message {}: {:?}", row.id, e); + None + } + } + }) + .collect(); + + // Sort by similarity (highest first) + scored_messages.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal)); + + // Log similarity score distribution + if !scored_messages.is_empty() { + log::info!( + "Similarity score distribution - Top: {:.3}, Median: {:.3}, Bottom: {:.3}", + scored_messages.first().map(|(s, _)| *s).unwrap_or(0.0), + scored_messages.get(scored_messages.len() / 2).map(|(s, _)| *s).unwrap_or(0.0), + scored_messages.last().map(|(s, _)| *s).unwrap_or(0.0) + ); + } + + // Apply minimum similarity threshold + // With single-contact embeddings, scores tend to be higher due to writing style similarity + // Using 0.65 to get only truly semantically relevant messages + let min_similarity = 0.65; + let filtered_messages: Vec<(f32, MessageEmbedding)> = scored_messages + .into_iter() + .filter(|(similarity, _)| *similarity >= min_similarity) + .collect(); + + log::info!( + "After similarity filtering (min_similarity={}): {} messages passed threshold", + min_similarity, + filtered_messages.len() + ); + + // Filter out short/generic messages (under 30 characters) + // This removes conversational closings like "Thanks for talking" that dominate 
results + let min_message_length = 30; + + // Common closing phrases that should be excluded from RAG results + let stop_phrases = [ + "thanks for talking", + "thank you for talking", + "good talking", + "nice talking", + "good night", + "good morning", + "love you", + ]; + + let filtered_messages: Vec<(f32, MessageEmbedding)> = filtered_messages + .into_iter() + .filter(|(_, message)| { + // Filter by length + if message.body.len() < min_message_length { + return false; + } + + // Filter out messages that are primarily generic closings + let body_lower = message.body.to_lowercase(); + for phrase in &stop_phrases { + // If the message contains this phrase and is short, it's likely just a closing + if body_lower.contains(phrase) && message.body.len() < 100 { + return false; + } + } + + true + }) + .collect(); + + log::info!( + "After length filtering (min {} chars): {} messages remain", + min_message_length, + filtered_messages.len() + ); + + // Apply temporal diversity filter - don't return too many messages from the same day + // This prevents RAG from returning clusters of messages from one conversation + let mut filtered_with_diversity = Vec::new(); + let mut dates_seen: std::collections::HashMap = std::collections::HashMap::new(); + let max_per_day = 3; // Maximum 3 messages from any single day + + for (similarity, message) in filtered_messages.into_iter() { + let date = chrono::DateTime::from_timestamp(message.timestamp, 0) + .map(|dt| dt.date_naive()) + .unwrap_or_else(|| chrono::Utc::now().date_naive()); + + let count = dates_seen.entry(date).or_insert(0); + if *count < max_per_day { + *count += 1; + filtered_with_diversity.push((similarity, message)); + } + } + + log::info!( + "After temporal diversity filtering (max {} per day): {} messages remain", + max_per_day, + filtered_with_diversity.len() + ); + + // Take top N results from diversity-filtered messages + let top_results: Vec = filtered_with_diversity + .into_iter() + .take(limit) + .map(|(similarity, 
message)| {
+                let time = chrono::DateTime::from_timestamp(message.timestamp, 0)
+                    .map(|dt| dt.format("%Y-%m-%d").to_string())
+                    .unwrap_or_default();
+                log::info!(
+                    "RAG Match: similarity={:.3}, date={}, contact={}, body=\"{}\"",
+                    similarity,
+                    time,
+                    message.contact,
+                    &message.body.chars().take(80).collect::<String>()
+                );
+                message
+            })
+            .collect();
+
+            Ok(top_results)
+        })
+        .map_err(|_| DbError::new(DbErrorKind::QueryError))
+    }
+
+    fn get_message_count(
+        &mut self,
+        context: &opentelemetry::Context,
+        contact: &str,
+    ) -> Result<i64, DbError> {
+        trace_db_call(context, "query", "get_message_count", |_span| {
+            let mut conn = self.connection.lock().expect("Unable to get EmbeddingDao");
+
+            let count = diesel::sql_query(
+                "SELECT COUNT(*) as count FROM message_embeddings WHERE contact = ?1"
+            )
+            .bind::<diesel::sql_types::Text, _>(contact)
+            .get_result::<CountResult>(conn.deref_mut())
+            .map(|r| r.count)
+            .map_err(|e| anyhow::anyhow!("Count query error: {:?}", e))?;
+
+            Ok(count)
+        })
+        .map_err(|_| DbError::new(DbErrorKind::QueryError))
+    }
+
+    fn has_embeddings_for_contact(
+        &mut self,
+        context: &opentelemetry::Context,
+        contact: &str,
+    ) -> Result<bool, DbError> {
+        self.get_message_count(context, contact)
+            .map(|count| count > 0)
+    }
+
+    fn message_exists(
+        &mut self,
+        context: &opentelemetry::Context,
+        contact: &str,
+        body: &str,
+        timestamp: i64,
+    ) -> Result<bool, DbError> {
+        trace_db_call(context, "query", "message_exists", |_span| {
+            let mut conn = self.connection.lock().expect("Unable to get EmbeddingDao");
+
+            let count = diesel::sql_query(
+                "SELECT COUNT(*) as count FROM message_embeddings
+                 WHERE contact = ?1 AND body = ?2 AND timestamp = ?3"
+            )
+            .bind::<diesel::sql_types::Text, _>(contact)
+            .bind::<diesel::sql_types::Text, _>(body)
+            .bind::<diesel::sql_types::BigInt, _>(timestamp)
+            .get_result::<CountResult>(conn.deref_mut())
+            .map(|r| r.count)
+            .map_err(|e| anyhow::anyhow!("Count query error: {:?}", e))?;
+
+            Ok(count > 0)
+        })
+        .map_err(|_| DbError::new(DbErrorKind::QueryError))
+    }
+}
+
+// Helper structs for raw SQL queries
+
+#[derive(QueryableByName)]
+struct LastInsertRowId {
+    #[diesel(sql_type
= diesel::sql_types::BigInt)]
+    id: i64,
+}
+
+#[derive(QueryableByName)]
+struct MessageEmbeddingRow {
+    #[diesel(sql_type = diesel::sql_types::Integer)]
+    id: i32,
+    #[diesel(sql_type = diesel::sql_types::Text)]
+    contact: String,
+    #[diesel(sql_type = diesel::sql_types::Text)]
+    body: String,
+    #[diesel(sql_type = diesel::sql_types::BigInt)]
+    timestamp: i64,
+    #[diesel(sql_type = diesel::sql_types::Bool)]
+    is_sent: bool,
+    #[diesel(sql_type = diesel::sql_types::BigInt)]
+    created_at: i64,
+    #[diesel(sql_type = diesel::sql_types::Text)]
+    model_version: String,
+}
+
+#[derive(QueryableByName)]
+struct MessageEmbeddingWithVectorRow {
+    #[diesel(sql_type = diesel::sql_types::Integer)]
+    id: i32,
+    #[diesel(sql_type = diesel::sql_types::Text)]
+    contact: String,
+    #[diesel(sql_type = diesel::sql_types::Text)]
+    body: String,
+    #[diesel(sql_type = diesel::sql_types::BigInt)]
+    timestamp: i64,
+    #[diesel(sql_type = diesel::sql_types::Bool)]
+    is_sent: bool,
+    #[diesel(sql_type = diesel::sql_types::Binary)]
+    embedding: Vec<u8>,
+    #[diesel(sql_type = diesel::sql_types::BigInt)]
+    created_at: i64,
+    #[diesel(sql_type = diesel::sql_types::Text)]
+    model_version: String,
+}
+
+#[derive(QueryableByName)]
+struct CountResult {
+    #[diesel(sql_type = diesel::sql_types::BigInt)]
+    count: i64,
+}
diff --git a/src/database/mod.rs b/src/database/mod.rs
index 759d5f4..e27d1ed 100644
--- a/src/database/mod.rs
+++ b/src/database/mod.rs
@@ -9,11 +9,15 @@ use crate::database::models::{
 };
 use crate::otel::trace_db_call;
 
+pub mod embeddings_dao;
+pub mod daily_summary_dao;
 pub mod insights_dao;
 pub mod models;
 pub mod schema;
 
+pub use embeddings_dao::{EmbeddingDao, InsertMessageEmbedding, SqliteEmbeddingDao};
 pub use insights_dao::{InsightDao, SqliteInsightDao};
+pub use daily_summary_dao::{DailySummaryDao, SqliteDailySummaryDao, DailySummary, InsertDailySummary};
 
 pub trait UserDao {
     fn create_user(&mut self, user: &str, password: &str) -> Option;
diff --git a/src/main.rs b/src/main.rs
index f481107..956675a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -718,7 +718,7 @@ fn main() -> std::io::Result<()> { } create_thumbnails(); - generate_video_gifs().await; + // generate_video_gifs().await; let app_data = Data::new(AppState::default()); @@ -742,6 +742,50 @@ fn main() -> std::io::Result<()> { directory: app_state.base_path.clone(), }); + // Spawn background job to generate daily conversation summaries + { + use crate::ai::generate_daily_summaries; + use crate::database::{DailySummaryDao, SqliteDailySummaryDao}; + use chrono::NaiveDate; + + // Configure date range for summary generation + // Default: August 2024 ±30 days (July 1 - September 30, 2024) + // To expand: change start_date and end_date + let start_date = Some(NaiveDate::from_ymd_opt(2015, 10, 1).unwrap()); + let end_date = Some(NaiveDate::from_ymd_opt(2020, 1, 1).unwrap()); + + let contacts_to_summarize = vec!["Domenique", "Zach", "Paul"]; // Add more contacts as needed + + let ollama = app_state.ollama.clone(); + let sms_client = app_state.sms_client.clone(); + + for contact in contacts_to_summarize { + let ollama_clone = ollama.clone(); + let sms_client_clone = sms_client.clone(); + let summary_dao: Arc>> = + Arc::new(Mutex::new(Box::new(SqliteDailySummaryDao::new()))); + + let start = start_date; + let end = end_date; + + tokio::spawn(async move { + log::info!("Starting daily summary generation for {}", contact); + if let Err(e) = generate_daily_summaries( + contact, + start, + end, + &ollama_clone, + &sms_client_clone, + summary_dao + ).await { + log::error!("Daily summary generation failed for {}: {:?}", contact, e); + } else { + log::info!("Daily summary generation completed for {}", contact); + } + }); + } + } + HttpServer::new(move || { let user_dao = SqliteUserDao::new(); let favorites_dao = SqliteFavoriteDao::new(); diff --git a/src/state.rs b/src/state.rs index 40f33af..6dff518 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,5 +1,5 @@ use crate::ai::{InsightGenerator, 
OllamaClient, SmsApiClient}; -use crate::database::{ExifDao, InsightDao, SqliteExifDao, SqliteInsightDao}; +use crate::database::{DailySummaryDao, ExifDao, InsightDao, SqliteDailySummaryDao, SqliteExifDao, SqliteInsightDao}; use crate::video::actors::{PlaylistGenerator, StreamActor, VideoPlaylistManager}; use actix::{Actor, Addr}; use std::env; @@ -91,6 +91,8 @@ impl Default for AppState { Arc::new(Mutex::new(Box::new(SqliteInsightDao::new()))); let exif_dao: Arc>> = Arc::new(Mutex::new(Box::new(SqliteExifDao::new()))); + let daily_summary_dao: Arc>> = + Arc::new(Mutex::new(Box::new(SqliteDailySummaryDao::new()))); // Load base path let base_path = env::var("BASE_PATH").expect("BASE_PATH was not set in the env"); @@ -101,6 +103,7 @@ impl Default for AppState { sms_client.clone(), insight_dao.clone(), exif_dao.clone(), + daily_summary_dao.clone(), base_path.clone(), ); @@ -147,6 +150,8 @@ impl AppState { Arc::new(Mutex::new(Box::new(SqliteInsightDao::new()))); let exif_dao: Arc>> = Arc::new(Mutex::new(Box::new(SqliteExifDao::new()))); + let daily_summary_dao: Arc>> = + Arc::new(Mutex::new(Box::new(SqliteDailySummaryDao::new()))); // Initialize test InsightGenerator let base_path_str = base_path.to_string_lossy().to_string(); @@ -155,6 +160,7 @@ impl AppState { sms_client.clone(), insight_dao.clone(), exif_dao.clone(), + daily_summary_dao.clone(), base_path_str.clone(), );