Enhanced Insights with daily summary embeddings
Bump to 0.5.0. Added daily summary generation job
This commit is contained in:
289
src/ai/daily_summary_job.rs
Normal file
289
src/ai/daily_summary_job.rs
Normal file
@@ -0,0 +1,289 @@
|
||||
use anyhow::Result;
|
||||
use chrono::{NaiveDate, Utc};
|
||||
use opentelemetry::trace::{Span, Status, TraceContextExt, Tracer};
|
||||
use opentelemetry::KeyValue;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use tokio::time::sleep;
|
||||
|
||||
use crate::ai::{OllamaClient, SmsApiClient, SmsMessage};
|
||||
use crate::database::{DailySummaryDao, InsertDailySummary};
|
||||
use crate::otel::global_tracer;
|
||||
|
||||
/// Generate and embed daily conversation summaries for a date range
|
||||
/// Default: August 2024 ±30 days (July 1 - September 30, 2024)
|
||||
pub async fn generate_daily_summaries(
|
||||
contact: &str,
|
||||
start_date: Option<NaiveDate>,
|
||||
end_date: Option<NaiveDate>,
|
||||
ollama: &OllamaClient,
|
||||
sms_client: &SmsApiClient,
|
||||
summary_dao: Arc<Mutex<Box<dyn DailySummaryDao>>>,
|
||||
) -> Result<()> {
|
||||
let tracer = global_tracer();
|
||||
|
||||
// Get current context (empty in background task) and start span with it
|
||||
let current_cx = opentelemetry::Context::current();
|
||||
let mut span = tracer.start_with_context("ai.daily_summary.generate_batch", ¤t_cx);
|
||||
span.set_attribute(KeyValue::new("contact", contact.to_string()));
|
||||
|
||||
// Create context with this span for child operations
|
||||
let parent_cx = current_cx.with_span(span);
|
||||
|
||||
// Default to August 2024 ±30 days
|
||||
let start = start_date.unwrap_or_else(|| NaiveDate::from_ymd_opt(2024, 7, 1).unwrap());
|
||||
let end = end_date.unwrap_or_else(|| NaiveDate::from_ymd_opt(2024, 9, 30).unwrap());
|
||||
|
||||
parent_cx.span().set_attribute(KeyValue::new("start_date", start.to_string()));
|
||||
parent_cx.span().set_attribute(KeyValue::new("end_date", end.to_string()));
|
||||
parent_cx.span().set_attribute(KeyValue::new("date_range_days", (end - start).num_days() + 1));
|
||||
|
||||
log::info!(
|
||||
"========================================");
|
||||
log::info!("Starting daily summary generation for {}", contact);
|
||||
log::info!("Date range: {} to {} ({} days)",
|
||||
start, end, (end - start).num_days() + 1
|
||||
);
|
||||
log::info!("========================================");
|
||||
|
||||
// Fetch all messages for the contact in the date range
|
||||
log::info!("Fetching messages for date range...");
|
||||
let _start_timestamp = start
|
||||
.and_hms_opt(0, 0, 0)
|
||||
.unwrap()
|
||||
.and_utc()
|
||||
.timestamp();
|
||||
let _end_timestamp = end
|
||||
.and_hms_opt(23, 59, 59)
|
||||
.unwrap()
|
||||
.and_utc()
|
||||
.timestamp();
|
||||
|
||||
let all_messages = sms_client
|
||||
.fetch_all_messages_for_contact(contact)
|
||||
.await?;
|
||||
|
||||
// Filter to date range and group by date
|
||||
let mut messages_by_date: HashMap<NaiveDate, Vec<SmsMessage>> = HashMap::new();
|
||||
|
||||
for msg in all_messages {
|
||||
let msg_dt = chrono::DateTime::from_timestamp(msg.timestamp, 0);
|
||||
if let Some(dt) = msg_dt {
|
||||
let date = dt.date_naive();
|
||||
if date >= start && date <= end {
|
||||
messages_by_date
|
||||
.entry(date)
|
||||
.or_insert_with(Vec::new)
|
||||
.push(msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log::info!(
|
||||
"Grouped messages into {} days with activity",
|
||||
messages_by_date.len()
|
||||
);
|
||||
|
||||
if messages_by_date.is_empty() {
|
||||
log::warn!("No messages found in date range");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Sort dates for ordered processing
|
||||
let mut dates: Vec<NaiveDate> = messages_by_date.keys().cloned().collect();
|
||||
dates.sort();
|
||||
|
||||
let total_days = dates.len();
|
||||
let mut processed = 0;
|
||||
let mut skipped = 0;
|
||||
let mut failed = 0;
|
||||
|
||||
log::info!("Processing {} days with messages...", total_days);
|
||||
|
||||
for (idx, date) in dates.iter().enumerate() {
|
||||
let messages = messages_by_date.get(date).unwrap();
|
||||
let date_str = date.format("%Y-%m-%d").to_string();
|
||||
|
||||
// Check if summary already exists
|
||||
{
|
||||
let mut dao = summary_dao.lock().expect("Unable to lock DailySummaryDao");
|
||||
let otel_context = opentelemetry::Context::new();
|
||||
|
||||
if dao.summary_exists(&otel_context, &date_str, contact).unwrap_or(false) {
|
||||
skipped += 1;
|
||||
if idx % 10 == 0 {
|
||||
log::info!(
|
||||
"Progress: {}/{} ({} processed, {} skipped)",
|
||||
idx + 1,
|
||||
total_days,
|
||||
processed,
|
||||
skipped
|
||||
);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// Generate summary for this day
|
||||
match generate_and_store_daily_summary(
|
||||
&parent_cx,
|
||||
date,
|
||||
contact,
|
||||
messages,
|
||||
ollama,
|
||||
summary_dao.clone(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(_) => {
|
||||
processed += 1;
|
||||
log::info!(
|
||||
"✓ {}/{}: {} ({} messages)",
|
||||
idx + 1,
|
||||
total_days,
|
||||
date_str,
|
||||
messages.len()
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
failed += 1;
|
||||
log::error!("✗ Failed to process {}: {:?}", date_str, e);
|
||||
}
|
||||
}
|
||||
|
||||
// Rate limiting: sleep 500ms between summaries
|
||||
if idx < total_days - 1 {
|
||||
sleep(std::time::Duration::from_millis(500)).await;
|
||||
}
|
||||
|
||||
// Progress logging every 10 days
|
||||
if idx % 10 == 0 && idx > 0 {
|
||||
log::info!(
|
||||
"Progress: {}/{} ({} processed, {} skipped, {} failed)",
|
||||
idx + 1,
|
||||
total_days,
|
||||
processed,
|
||||
skipped,
|
||||
failed
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
log::info!("========================================");
|
||||
log::info!("Daily summary generation complete!");
|
||||
log::info!("Processed: {}, Skipped: {}, Failed: {}", processed, skipped, failed);
|
||||
log::info!("========================================");
|
||||
|
||||
// Record final metrics in span
|
||||
parent_cx.span().set_attribute(KeyValue::new("days_processed", processed as i64));
|
||||
parent_cx.span().set_attribute(KeyValue::new("days_skipped", skipped as i64));
|
||||
parent_cx.span().set_attribute(KeyValue::new("days_failed", failed as i64));
|
||||
parent_cx.span().set_attribute(KeyValue::new("total_days", total_days as i64));
|
||||
|
||||
if failed > 0 {
|
||||
parent_cx.span().set_status(Status::error(format!("{} days failed to process", failed)));
|
||||
} else {
|
||||
parent_cx.span().set_status(Status::Ok);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Generate and store a single day's summary
|
||||
async fn generate_and_store_daily_summary(
|
||||
parent_cx: &opentelemetry::Context,
|
||||
date: &NaiveDate,
|
||||
contact: &str,
|
||||
messages: &[SmsMessage],
|
||||
ollama: &OllamaClient,
|
||||
summary_dao: Arc<Mutex<Box<dyn DailySummaryDao>>>,
|
||||
) -> Result<()> {
|
||||
let tracer = global_tracer();
|
||||
let mut span = tracer.start_with_context("ai.daily_summary.generate_single", parent_cx);
|
||||
span.set_attribute(KeyValue::new("date", date.to_string()));
|
||||
span.set_attribute(KeyValue::new("contact", contact.to_string()));
|
||||
span.set_attribute(KeyValue::new("message_count", messages.len() as i64));
|
||||
|
||||
// Format messages for LLM
|
||||
let messages_text: String = messages
|
||||
.iter()
|
||||
.take(200) // Limit to 200 messages per day to avoid token overflow
|
||||
.map(|m| {
|
||||
if m.is_sent {
|
||||
format!("Me: {}", m.body)
|
||||
} else {
|
||||
format!("{}: {}", m.contact, m.body)
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
.join("\n");
|
||||
|
||||
let weekday = date.format("%A");
|
||||
|
||||
let prompt = format!(
|
||||
r#"Summarize this day's conversation in 3-5 sentences. Focus on:
|
||||
- Key topics, activities, and events discussed
|
||||
- Places, people, or organizations mentioned
|
||||
- Plans made or decisions discussed
|
||||
- Overall mood or themes of the day
|
||||
|
||||
IMPORTANT: Clearly distinguish between what "I" or "Me" did versus what {} did.
|
||||
Always explicitly attribute actions, plans, and activities to the correct person.
|
||||
Use "I" or "Me" for my actions and "{}" for their actions.
|
||||
|
||||
Date: {} ({})
|
||||
Messages:
|
||||
{}
|
||||
|
||||
Write a natural, informative summary with clear subject attribution.
|
||||
Summary:"#,
|
||||
contact,
|
||||
contact,
|
||||
date.format("%B %d, %Y"),
|
||||
weekday,
|
||||
messages_text
|
||||
);
|
||||
|
||||
// Generate summary with LLM
|
||||
let summary = ollama
|
||||
.generate(
|
||||
&prompt,
|
||||
Some("You are a conversation summarizer. Create clear, factual summaries that maintain precise subject attribution - clearly distinguishing who said or did what."),
|
||||
)
|
||||
.await?;
|
||||
|
||||
log::debug!("Generated summary for {}: {}", date, summary.chars().take(100).collect::<String>());
|
||||
|
||||
span.set_attribute(KeyValue::new("summary_length", summary.len() as i64));
|
||||
|
||||
// Embed the summary
|
||||
let embedding = ollama.generate_embedding(&summary).await?;
|
||||
|
||||
span.set_attribute(KeyValue::new("embedding_dimensions", embedding.len() as i64));
|
||||
|
||||
// Store in database
|
||||
let insert = InsertDailySummary {
|
||||
date: date.format("%Y-%m-%d").to_string(),
|
||||
contact: contact.to_string(),
|
||||
summary: summary.trim().to_string(),
|
||||
message_count: messages.len() as i32,
|
||||
embedding,
|
||||
created_at: Utc::now().timestamp(),
|
||||
model_version: "nomic-embed-text:v1.5".to_string(),
|
||||
};
|
||||
|
||||
// Create context from current span for DB operation
|
||||
let child_cx = opentelemetry::Context::current_with_span(span);
|
||||
|
||||
let mut dao = summary_dao.lock().expect("Unable to lock DailySummaryDao");
|
||||
let result = dao.store_summary(&child_cx, insert)
|
||||
.map_err(|e| anyhow::anyhow!("Failed to store summary: {:?}", e));
|
||||
|
||||
match &result {
|
||||
Ok(_) => child_cx.span().set_status(Status::Ok),
|
||||
Err(e) => child_cx.span().set_status(Status::error(e.to_string())),
|
||||
}
|
||||
|
||||
result?;
|
||||
Ok(())
|
||||
}
|
||||
Reference in New Issue
Block a user