use anyhow::Result; use chrono::{NaiveDate, Utc}; use opentelemetry::KeyValue; use opentelemetry::trace::{Span, Status, TraceContextExt, Tracer}; use std::collections::HashMap; use std::sync::{Arc, Mutex}; use tokio::time::sleep; use crate::ai::{OllamaClient, SmsApiClient, SmsMessage}; use crate::database::{DailySummaryDao, InsertDailySummary}; use crate::otel::global_tracer; /// Strip boilerplate prefixes and common phrases from summaries before embedding. /// This improves embedding diversity by removing structural similarity. pub fn strip_summary_boilerplate(summary: &str) -> String { let mut text = summary.trim().to_string(); // Remove markdown headers while text.starts_with('#') { if let Some(pos) = text.find('\n') { text = text[pos..].trim_start().to_string(); } else { // Single line with just headers, try to extract content after #s text = text.trim_start_matches('#').trim().to_string(); break; } } // Remove "Summary:" prefix variations (with optional markdown bold) let prefixes = [ "**Summary:**", "**Summary**:", "*Summary:*", "Summary:", "**summary:**", "summary:", ]; for prefix in prefixes { if text.to_lowercase().starts_with(&prefix.to_lowercase()) { text = text[prefix.len()..].trim_start().to_string(); break; } } // Remove common opening phrases that add no semantic value let opening_phrases = [ "Today, Melissa and I discussed", "Today, Amanda and I discussed", "Today Melissa and I discussed", "Today Amanda and I discussed", "Melissa and I discussed", "Amanda and I discussed", "Today, I discussed", "Today I discussed", "The conversation covered", "This conversation covered", "In this conversation,", "During this conversation,", ]; for phrase in opening_phrases { if text.to_lowercase().starts_with(&phrase.to_lowercase()) { text = text[phrase.len()..].trim_start().to_string(); // Remove leading punctuation/articles after stripping phrase text = text .trim_start_matches([',', ':', '-']) .trim_start() .to_string(); break; } } // Remove any remaining leading markdown bold markers if text.starts_with("**") && let Some(end) = text[2..].find("**") { // Keep the content between ** but remove the markers let bold_content = &text[2..2 + end]; text = format!("{}{}", bold_content, &text[4 + end..]); } text.trim().to_string() } /// Generate and embed daily conversation summaries for a date range /// Default: August 2024 ±30 days (July 1 - September 30, 2024) pub async fn generate_daily_summaries( contact: &str, start_date: Option, end_date: Option, ollama: &OllamaClient, sms_client: &SmsApiClient, summary_dao: Arc>>, ) -> Result<()> { let tracer = global_tracer(); // Get current context (empty in background task) and start span with it let current_cx = opentelemetry::Context::current(); let mut span = tracer.start_with_context("ai.daily_summary.generate_batch", ¤t_cx); span.set_attribute(KeyValue::new("contact", contact.to_string())); // Create context with this span for child operations let parent_cx = current_cx.with_span(span); // Default to August 2024 ±30 days let start = start_date.unwrap_or_else(|| NaiveDate::from_ymd_opt(2024, 7, 1).unwrap()); let end = end_date.unwrap_or_else(|| NaiveDate::from_ymd_opt(2024, 9, 30).unwrap()); parent_cx .span() .set_attribute(KeyValue::new("start_date", start.to_string())); parent_cx .span() .set_attribute(KeyValue::new("end_date", end.to_string())); parent_cx.span().set_attribute(KeyValue::new( "date_range_days", (end - start).num_days() + 1, )); log::info!("========================================"); log::info!("Starting daily summary generation for {}", contact); log::info!( "Date range: {} to {} ({} days)", start, end, (end - start).num_days() + 1 ); log::info!("========================================"); // Fetch all messages for the contact in the date range log::info!("Fetching messages for date range..."); let _start_timestamp = start.and_hms_opt(0, 0, 0).unwrap().and_utc().timestamp(); let _end_timestamp = end.and_hms_opt(23, 59, 59).unwrap().and_utc().timestamp(); let all_messages = sms_client.fetch_all_messages_for_contact(contact).await?; // Filter to date range and group by date let mut messages_by_date: HashMap> = HashMap::new(); for msg in all_messages { let msg_dt = chrono::DateTime::from_timestamp(msg.timestamp, 0); if let Some(dt) = msg_dt { let date = dt.date_naive(); if date >= start && date <= end { messages_by_date.entry(date).or_default().push(msg); } } } log::info!( "Grouped messages into {} days with activity", messages_by_date.len() ); if messages_by_date.is_empty() { log::warn!("No messages found in date range"); return Ok(()); } // Sort dates for ordered processing let mut dates: Vec = messages_by_date.keys().cloned().collect(); dates.sort(); let total_days = dates.len(); let mut processed = 0; let mut skipped = 0; let mut failed = 0; log::info!("Processing {} days with messages...", total_days); for (idx, date) in dates.iter().enumerate() { let messages = messages_by_date.get(date).unwrap(); let date_str = date.format("%Y-%m-%d").to_string(); // Check if summary already exists { let mut dao = summary_dao.lock().expect("Unable to lock DailySummaryDao"); let otel_context = opentelemetry::Context::new(); if dao .summary_exists(&otel_context, &date_str, contact) .unwrap_or(false) { skipped += 1; if idx % 10 == 0 { log::info!( "Progress: {}/{} ({} processed, {} skipped)", idx + 1, total_days, processed, skipped ); } continue; } } // Generate summary for this day match generate_and_store_daily_summary( &parent_cx, date, contact, messages, ollama, summary_dao.clone(), ) .await { Ok(_) => { processed += 1; log::info!( "✓ {}/{}: {} ({} messages)", idx + 1, total_days, date_str, messages.len() ); } Err(e) => { failed += 1; log::error!("✗ Failed to process {}: {:?}", date_str, e); } } // Rate limiting: sleep 500ms between summaries if idx < total_days - 1 { sleep(std::time::Duration::from_millis(500)).await; } // Progress logging every 10 days if idx % 10 == 0 && idx > 0 { log::info!( "Progress: {}/{} ({} processed, {} skipped, {} failed)", idx + 1, total_days, processed, skipped, failed ); } } log::info!("========================================"); log::info!("Daily summary generation complete!"); log::info!( "Processed: {}, Skipped: {}, Failed: {}", processed, skipped, failed ); log::info!("========================================"); // Record final metrics in span parent_cx .span() .set_attribute(KeyValue::new("days_processed", processed as i64)); parent_cx .span() .set_attribute(KeyValue::new("days_skipped", skipped as i64)); parent_cx .span() .set_attribute(KeyValue::new("days_failed", failed as i64)); parent_cx .span() .set_attribute(KeyValue::new("total_days", total_days as i64)); if failed > 0 { parent_cx .span() .set_status(Status::error(format!("{} days failed to process", failed))); } else { parent_cx.span().set_status(Status::Ok); } Ok(()) } /// Generate and store a single day's summary async fn generate_and_store_daily_summary( parent_cx: &opentelemetry::Context, date: &NaiveDate, contact: &str, messages: &[SmsMessage], ollama: &OllamaClient, summary_dao: Arc>>, ) -> Result<()> { let tracer = global_tracer(); let mut span = tracer.start_with_context("ai.daily_summary.generate_single", parent_cx); span.set_attribute(KeyValue::new("date", date.to_string())); span.set_attribute(KeyValue::new("contact", contact.to_string())); span.set_attribute(KeyValue::new("message_count", messages.len() as i64)); // Format messages for LLM let messages_text: String = messages .iter() .take(200) // Limit to 200 messages per day to avoid token overflow .map(|m| { if m.is_sent { format!("Me: {}", m.body) } else { format!("{}: {}", m.contact, m.body) } }) .collect::>() .join("\n"); let weekday = date.format("%A"); let prompt = format!( r#"Summarize this day's conversation between me and {}. CRITICAL FORMAT RULES: - Do NOT start with "Based on the conversation..." or "Here is a summary..." or similar preambles - Do NOT repeat the date at the beginning - Start DIRECTLY with the content - begin with a person's name or action - Write in past tense, as if recording what happened NARRATIVE (3-5 sentences): - What specific topics, activities, or events were discussed? - What places, people, or organizations were mentioned? - What plans were made or decisions discussed? - Clearly distinguish between what "I" did versus what {} did KEYWORDS (comma-separated): 5-10 specific keywords that capture this conversation's unique content: - Proper nouns (people, places, brands) - Specific activities ("drum corps audition" not just "music") - Distinctive terms that make this day unique Date: {} ({}) Messages: {} YOUR RESPONSE (follow this format EXACTLY): Summary: [Start directly with content, NO preamble] Keywords: [specific, unique terms]"#, contact, contact, date.format("%B %d, %Y"), weekday, messages_text ); // Generate summary with LLM let summary = ollama .generate( &prompt, Some("You are a conversation summarizer. Create clear, factual summaries with precise subject attribution AND extract distinctive keywords. Focus on specific, unique terms that differentiate this conversation from others."), ) .await?; log::debug!( "Generated summary for {}: {}", date, summary.chars().take(100).collect::() ); span.set_attribute(KeyValue::new("summary_length", summary.len() as i64)); // Strip boilerplate before embedding to improve vector diversity let stripped_summary = strip_summary_boilerplate(&summary); log::debug!( "Stripped summary for embedding: {}", stripped_summary.chars().take(100).collect::() ); // Embed the stripped summary (store original summary in DB) let embedding = ollama.generate_embedding(&stripped_summary).await?; span.set_attribute(KeyValue::new( "embedding_dimensions", embedding.len() as i64, )); // Store in database let insert = InsertDailySummary { date: date.format("%Y-%m-%d").to_string(), contact: contact.to_string(), summary: summary.trim().to_string(), message_count: messages.len() as i32, embedding, created_at: Utc::now().timestamp(), // model_version: "nomic-embed-text:v1.5".to_string(), model_version: "mxbai-embed-large:335m".to_string(), }; // Create context from current span for DB operation let child_cx = opentelemetry::Context::current_with_span(span); let mut dao = summary_dao.lock().expect("Unable to lock DailySummaryDao"); let result = dao .store_summary(&child_cx, insert) .map_err(|e| anyhow::anyhow!("Failed to store summary: {:?}", e)); match &result { Ok(_) => child_cx.span().set_status(Status::Ok), Err(e) => child_cx.span().set_status(Status::error(e.to_string())), } result?; Ok(()) }