Cargo fix
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
use anyhow::Result;
|
||||
use chrono::{NaiveDate, Utc};
|
||||
use opentelemetry::trace::{Span, Status, TraceContextExt, Tracer};
|
||||
use opentelemetry::KeyValue;
|
||||
use opentelemetry::trace::{Span, Status, TraceContextExt, Tracer};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use tokio::time::sleep;
|
||||
@@ -34,34 +34,33 @@ pub async fn generate_daily_summaries(
|
||||
let start = start_date.unwrap_or_else(|| NaiveDate::from_ymd_opt(2024, 7, 1).unwrap());
|
||||
let end = end_date.unwrap_or_else(|| NaiveDate::from_ymd_opt(2024, 9, 30).unwrap());
|
||||
|
||||
parent_cx.span().set_attribute(KeyValue::new("start_date", start.to_string()));
|
||||
parent_cx.span().set_attribute(KeyValue::new("end_date", end.to_string()));
|
||||
parent_cx.span().set_attribute(KeyValue::new("date_range_days", (end - start).num_days() + 1));
|
||||
parent_cx
|
||||
.span()
|
||||
.set_attribute(KeyValue::new("start_date", start.to_string()));
|
||||
parent_cx
|
||||
.span()
|
||||
.set_attribute(KeyValue::new("end_date", end.to_string()));
|
||||
parent_cx.span().set_attribute(KeyValue::new(
|
||||
"date_range_days",
|
||||
(end - start).num_days() + 1,
|
||||
));
|
||||
|
||||
log::info!(
|
||||
"========================================");
|
||||
log::info!("========================================");
|
||||
log::info!("Starting daily summary generation for {}", contact);
|
||||
log::info!("Date range: {} to {} ({} days)",
|
||||
start, end, (end - start).num_days() + 1
|
||||
log::info!(
|
||||
"Date range: {} to {} ({} days)",
|
||||
start,
|
||||
end,
|
||||
(end - start).num_days() + 1
|
||||
);
|
||||
log::info!("========================================");
|
||||
|
||||
// Fetch all messages for the contact in the date range
|
||||
log::info!("Fetching messages for date range...");
|
||||
let _start_timestamp = start
|
||||
.and_hms_opt(0, 0, 0)
|
||||
.unwrap()
|
||||
.and_utc()
|
||||
.timestamp();
|
||||
let _end_timestamp = end
|
||||
.and_hms_opt(23, 59, 59)
|
||||
.unwrap()
|
||||
.and_utc()
|
||||
.timestamp();
|
||||
let _start_timestamp = start.and_hms_opt(0, 0, 0).unwrap().and_utc().timestamp();
|
||||
let _end_timestamp = end.and_hms_opt(23, 59, 59).unwrap().and_utc().timestamp();
|
||||
|
||||
let all_messages = sms_client
|
||||
.fetch_all_messages_for_contact(contact)
|
||||
.await?;
|
||||
let all_messages = sms_client.fetch_all_messages_for_contact(contact).await?;
|
||||
|
||||
// Filter to date range and group by date
|
||||
let mut messages_by_date: HashMap<NaiveDate, Vec<SmsMessage>> = HashMap::new();
|
||||
@@ -109,7 +108,10 @@ pub async fn generate_daily_summaries(
|
||||
let mut dao = summary_dao.lock().expect("Unable to lock DailySummaryDao");
|
||||
let otel_context = opentelemetry::Context::new();
|
||||
|
||||
if dao.summary_exists(&otel_context, &date_str, contact).unwrap_or(false) {
|
||||
if dao
|
||||
.summary_exists(&otel_context, &date_str, contact)
|
||||
.unwrap_or(false)
|
||||
{
|
||||
skipped += 1;
|
||||
if idx % 10 == 0 {
|
||||
log::info!(
|
||||
@@ -171,17 +173,32 @@ pub async fn generate_daily_summaries(
|
||||
|
||||
log::info!("========================================");
|
||||
log::info!("Daily summary generation complete!");
|
||||
log::info!("Processed: {}, Skipped: {}, Failed: {}", processed, skipped, failed);
|
||||
log::info!(
|
||||
"Processed: {}, Skipped: {}, Failed: {}",
|
||||
processed,
|
||||
skipped,
|
||||
failed
|
||||
);
|
||||
log::info!("========================================");
|
||||
|
||||
// Record final metrics in span
|
||||
parent_cx.span().set_attribute(KeyValue::new("days_processed", processed as i64));
|
||||
parent_cx.span().set_attribute(KeyValue::new("days_skipped", skipped as i64));
|
||||
parent_cx.span().set_attribute(KeyValue::new("days_failed", failed as i64));
|
||||
parent_cx.span().set_attribute(KeyValue::new("total_days", total_days as i64));
|
||||
parent_cx
|
||||
.span()
|
||||
.set_attribute(KeyValue::new("days_processed", processed as i64));
|
||||
parent_cx
|
||||
.span()
|
||||
.set_attribute(KeyValue::new("days_skipped", skipped as i64));
|
||||
parent_cx
|
||||
.span()
|
||||
.set_attribute(KeyValue::new("days_failed", failed as i64));
|
||||
parent_cx
|
||||
.span()
|
||||
.set_attribute(KeyValue::new("total_days", total_days as i64));
|
||||
|
||||
if failed > 0 {
|
||||
parent_cx.span().set_status(Status::error(format!("{} days failed to process", failed)));
|
||||
parent_cx
|
||||
.span()
|
||||
.set_status(Status::error(format!("{} days failed to process", failed)));
|
||||
} else {
|
||||
parent_cx.span().set_status(Status::Ok);
|
||||
}
|
||||
@@ -252,14 +269,21 @@ Summary:"#,
|
||||
)
|
||||
.await?;
|
||||
|
||||
log::debug!("Generated summary for {}: {}", date, summary.chars().take(100).collect::<String>());
|
||||
log::debug!(
|
||||
"Generated summary for {}: {}",
|
||||
date,
|
||||
summary.chars().take(100).collect::<String>()
|
||||
);
|
||||
|
||||
span.set_attribute(KeyValue::new("summary_length", summary.len() as i64));
|
||||
|
||||
// Embed the summary
|
||||
let embedding = ollama.generate_embedding(&summary).await?;
|
||||
|
||||
span.set_attribute(KeyValue::new("embedding_dimensions", embedding.len() as i64));
|
||||
span.set_attribute(KeyValue::new(
|
||||
"embedding_dimensions",
|
||||
embedding.len() as i64,
|
||||
));
|
||||
|
||||
// Store in database
|
||||
let insert = InsertDailySummary {
|
||||
@@ -276,7 +300,8 @@ Summary:"#,
|
||||
let child_cx = opentelemetry::Context::current_with_span(span);
|
||||
|
||||
let mut dao = summary_dao.lock().expect("Unable to lock DailySummaryDao");
|
||||
let result = dao.store_summary(&child_cx, insert)
|
||||
let result = dao
|
||||
.store_summary(&child_cx, insert)
|
||||
.map_err(|e| anyhow::anyhow!("Failed to store summary: {:?}", e));
|
||||
|
||||
match &result {
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use anyhow::Result;
|
||||
use chrono::Utc;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use tokio::time::{sleep, Duration};
|
||||
use tokio::time::{Duration, sleep};
|
||||
|
||||
use crate::ai::{OllamaClient, SmsApiClient};
|
||||
use crate::database::{EmbeddingDao, InsertMessageEmbedding};
|
||||
@@ -30,8 +30,7 @@ pub async fn embed_contact_messages(
|
||||
// Check existing embeddings count
|
||||
let existing_count = {
|
||||
let mut dao = embedding_dao.lock().expect("Unable to lock EmbeddingDao");
|
||||
dao.get_message_count(&otel_context, contact)
|
||||
.unwrap_or(0)
|
||||
dao.get_message_count(&otel_context, contact).unwrap_or(0)
|
||||
};
|
||||
|
||||
if existing_count > 0 {
|
||||
@@ -45,15 +44,20 @@ pub async fn embed_contact_messages(
|
||||
log::info!("Fetching all messages for contact: {}", contact);
|
||||
|
||||
// Fetch all messages for the contact
|
||||
let messages = sms_client
|
||||
.fetch_all_messages_for_contact(contact)
|
||||
.await?;
|
||||
let messages = sms_client.fetch_all_messages_for_contact(contact).await?;
|
||||
|
||||
let total_messages = messages.len();
|
||||
log::info!("Fetched {} messages for contact '{}'", total_messages, contact);
|
||||
log::info!(
|
||||
"Fetched {} messages for contact '{}'",
|
||||
total_messages,
|
||||
contact
|
||||
);
|
||||
|
||||
if total_messages == 0 {
|
||||
log::warn!("No messages found for contact '{}', nothing to embed", contact);
|
||||
log::warn!(
|
||||
"No messages found for contact '{}', nothing to embed",
|
||||
contact
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
@@ -62,7 +66,8 @@ pub async fn embed_contact_messages(
|
||||
let min_message_length = 30; // Skip short messages like "Thanks!" or "Yeah, it was :)"
|
||||
let messages_to_embed: Vec<&crate::ai::SmsMessage> = {
|
||||
let mut dao = embedding_dao.lock().expect("Unable to lock EmbeddingDao");
|
||||
messages.iter()
|
||||
messages
|
||||
.iter()
|
||||
.filter(|msg| {
|
||||
// Filter out short messages
|
||||
if msg.body.len() < min_message_length {
|
||||
@@ -107,14 +112,7 @@ pub async fn embed_contact_messages(
|
||||
(batch_end as f64 / to_embed as f64) * 100.0
|
||||
);
|
||||
|
||||
match embed_message_batch(
|
||||
batch,
|
||||
contact,
|
||||
ollama,
|
||||
embedding_dao.clone(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
match embed_message_batch(batch, contact, ollama, embedding_dao.clone()).await {
|
||||
Ok(count) => {
|
||||
successful += count;
|
||||
log::debug!("Successfully embedded {} messages in batch", count);
|
||||
@@ -206,7 +204,8 @@ async fn embed_message_batch(
|
||||
|
||||
// Store all embeddings in a single transaction
|
||||
let mut dao = embedding_dao.lock().expect("Unable to lock EmbeddingDao");
|
||||
let stored_count = dao.store_message_embeddings_batch(&otel_context, inserts)
|
||||
let stored_count = dao
|
||||
.store_message_embeddings_batch(&otel_context, inserts)
|
||||
.map_err(|e| anyhow::anyhow!("Failed to store embeddings batch: {:?}", e))?;
|
||||
|
||||
Ok(stored_count)
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use actix_web::{HttpRequest, HttpResponse, Responder, delete, get, post, web};
|
||||
use opentelemetry::trace::{Span, Status, Tracer};
|
||||
use opentelemetry::KeyValue;
|
||||
use opentelemetry::trace::{Span, Status, Tracer};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::ai::{InsightGenerator, OllamaClient};
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use anyhow::Result;
|
||||
use chrono::{DateTime, Utc};
|
||||
use opentelemetry::trace::{Span, Status, TraceContextExt, Tracer};
|
||||
use opentelemetry::KeyValue;
|
||||
use opentelemetry::trace::{Span, Status, TraceContextExt, Tracer};
|
||||
use serde::Deserialize;
|
||||
use std::fs::File;
|
||||
use std::sync::{Arc, Mutex};
|
||||
@@ -89,19 +89,31 @@ impl InsightGenerator {
|
||||
limit: usize,
|
||||
) -> Result<Vec<String>> {
|
||||
let tracer = global_tracer();
|
||||
let mut span = tracer.start_with_context("ai.rag.filter_historical", parent_cx);
|
||||
let span = tracer.start_with_context("ai.rag.filter_historical", parent_cx);
|
||||
let filter_cx = parent_cx.with_span(span);
|
||||
|
||||
filter_cx.span().set_attribute(KeyValue::new("date", date.to_string()));
|
||||
filter_cx.span().set_attribute(KeyValue::new("limit", limit as i64));
|
||||
filter_cx.span().set_attribute(KeyValue::new("exclusion_window_days", 30));
|
||||
filter_cx
|
||||
.span()
|
||||
.set_attribute(KeyValue::new("date", date.to_string()));
|
||||
filter_cx
|
||||
.span()
|
||||
.set_attribute(KeyValue::new("limit", limit as i64));
|
||||
filter_cx
|
||||
.span()
|
||||
.set_attribute(KeyValue::new("exclusion_window_days", 30));
|
||||
|
||||
let query_results = self.find_relevant_messages_rag(date, location, contact, limit * 2).await?;
|
||||
let query_results = self
|
||||
.find_relevant_messages_rag(date, location, contact, limit * 2)
|
||||
.await?;
|
||||
|
||||
filter_cx.span().set_attribute(KeyValue::new("rag_results_count", query_results.len() as i64));
|
||||
filter_cx.span().set_attribute(KeyValue::new(
|
||||
"rag_results_count",
|
||||
query_results.len() as i64,
|
||||
));
|
||||
|
||||
// Filter out messages from within 30 days of the photo date
|
||||
let photo_timestamp = date.and_hms_opt(12, 0, 0)
|
||||
let photo_timestamp = date
|
||||
.and_hms_opt(12, 0, 0)
|
||||
.ok_or_else(|| anyhow::anyhow!("Invalid date"))?
|
||||
.and_utc()
|
||||
.timestamp();
|
||||
@@ -114,7 +126,9 @@ impl InsightGenerator {
|
||||
if let Some(bracket_end) = msg.find(']') {
|
||||
if let Some(date_str) = msg.get(1..bracket_end) {
|
||||
// Parse just the date (daily summaries don't have time)
|
||||
if let Ok(msg_date) = chrono::NaiveDate::parse_from_str(date_str, "%Y-%m-%d") {
|
||||
if let Ok(msg_date) =
|
||||
chrono::NaiveDate::parse_from_str(date_str, "%Y-%m-%d")
|
||||
{
|
||||
let msg_timestamp = msg_date
|
||||
.and_hms_opt(12, 0, 0)
|
||||
.unwrap()
|
||||
@@ -135,7 +149,10 @@ impl InsightGenerator {
|
||||
historical_only.len()
|
||||
);
|
||||
|
||||
filter_cx.span().set_attribute(KeyValue::new("historical_results_count", historical_only.len() as i64));
|
||||
filter_cx.span().set_attribute(KeyValue::new(
|
||||
"historical_results_count",
|
||||
historical_only.len() as i64,
|
||||
));
|
||||
filter_cx.span().set_status(Status::Ok);
|
||||
|
||||
Ok(historical_only)
|
||||
@@ -206,9 +223,15 @@ impl InsightGenerator {
|
||||
.find_similar_summaries(&search_cx, &query_embedding, limit)
|
||||
.map_err(|e| anyhow::anyhow!("Failed to find similar summaries: {:?}", e))?;
|
||||
|
||||
log::info!("Found {} relevant daily summaries via RAG", similar_summaries.len());
|
||||
log::info!(
|
||||
"Found {} relevant daily summaries via RAG",
|
||||
similar_summaries.len()
|
||||
);
|
||||
|
||||
search_cx.span().set_attribute(KeyValue::new("results_count", similar_summaries.len() as i64));
|
||||
search_cx.span().set_attribute(KeyValue::new(
|
||||
"results_count",
|
||||
similar_summaries.len() as i64,
|
||||
));
|
||||
|
||||
// Format daily summaries for LLM context
|
||||
let formatted = similar_summaries
|
||||
@@ -303,9 +326,13 @@ impl InsightGenerator {
|
||||
let contact = Self::extract_contact_from_path(&file_path);
|
||||
log::info!("Extracted contact from path: {:?}", contact);
|
||||
|
||||
insight_cx.span().set_attribute(KeyValue::new("date_taken", date_taken.to_string()));
|
||||
insight_cx
|
||||
.span()
|
||||
.set_attribute(KeyValue::new("date_taken", date_taken.to_string()));
|
||||
if let Some(ref c) = contact {
|
||||
insight_cx.span().set_attribute(KeyValue::new("contact", c.clone()));
|
||||
insight_cx
|
||||
.span()
|
||||
.set_attribute(KeyValue::new("contact", c.clone()));
|
||||
}
|
||||
|
||||
// 4. Get location name from GPS coordinates (needed for RAG query)
|
||||
@@ -314,7 +341,9 @@ impl InsightGenerator {
|
||||
if let (Some(lat), Some(lon)) = (exif.gps_latitude, exif.gps_longitude) {
|
||||
let loc = self.reverse_geocode(lat, lon).await;
|
||||
if let Some(ref l) = loc {
|
||||
insight_cx.span().set_attribute(KeyValue::new("location", l.clone()));
|
||||
insight_cx
|
||||
.span()
|
||||
.set_attribute(KeyValue::new("location", l.clone()));
|
||||
}
|
||||
loc
|
||||
} else {
|
||||
@@ -341,12 +370,7 @@ impl InsightGenerator {
|
||||
// Strategy A: Pure RAG (we have location for good semantic matching)
|
||||
log::info!("Using RAG with location-based query");
|
||||
match self
|
||||
.find_relevant_messages_rag(
|
||||
date_taken,
|
||||
location.as_deref(),
|
||||
contact.as_deref(),
|
||||
20,
|
||||
)
|
||||
.find_relevant_messages_rag(date_taken, location.as_deref(), contact.as_deref(), 20)
|
||||
.await
|
||||
{
|
||||
Ok(rag_messages) if !rag_messages.is_empty() => {
|
||||
@@ -377,7 +401,9 @@ impl InsightGenerator {
|
||||
|
||||
if !immediate_messages.is_empty() {
|
||||
// Step 2: Extract topics from immediate messages to enrich RAG query
|
||||
let topics = self.extract_topics_from_messages(&immediate_messages, &ollama_client).await;
|
||||
let topics = self
|
||||
.extract_topics_from_messages(&immediate_messages, &ollama_client)
|
||||
.await;
|
||||
|
||||
log::info!("Extracted topics for query enrichment: {:?}", topics);
|
||||
|
||||
@@ -420,11 +446,15 @@ impl InsightGenerator {
|
||||
Ok(_) => {
|
||||
// RAG found no historical matches, just use immediate context
|
||||
log::info!("No historical RAG matches, using immediate context only");
|
||||
sms_summary = self.summarize_context_from_messages(&immediate_messages, &ollama_client).await;
|
||||
sms_summary = self
|
||||
.summarize_context_from_messages(&immediate_messages, &ollama_client)
|
||||
.await;
|
||||
}
|
||||
Err(e) => {
|
||||
log::warn!("Historical RAG failed, using immediate context only: {}", e);
|
||||
sms_summary = self.summarize_context_from_messages(&immediate_messages, &ollama_client).await;
|
||||
sms_summary = self
|
||||
.summarize_context_from_messages(&immediate_messages, &ollama_client)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@@ -481,8 +511,12 @@ impl InsightGenerator {
|
||||
}
|
||||
|
||||
let retrieval_method = if used_rag { "RAG" } else { "time-based" };
|
||||
insight_cx.span().set_attribute(KeyValue::new("retrieval_method", retrieval_method));
|
||||
insight_cx.span().set_attribute(KeyValue::new("has_sms_context", sms_summary.is_some()));
|
||||
insight_cx
|
||||
.span()
|
||||
.set_attribute(KeyValue::new("retrieval_method", retrieval_method));
|
||||
insight_cx
|
||||
.span()
|
||||
.set_attribute(KeyValue::new("has_sms_context", sms_summary.is_some()));
|
||||
|
||||
log::info!(
|
||||
"Photo context: date={}, location={:?}, retrieval_method={}",
|
||||
@@ -503,8 +537,12 @@ impl InsightGenerator {
|
||||
log::info!("Generated title: {}", title);
|
||||
log::info!("Generated summary: {}", summary);
|
||||
|
||||
insight_cx.span().set_attribute(KeyValue::new("title_length", title.len() as i64));
|
||||
insight_cx.span().set_attribute(KeyValue::new("summary_length", summary.len() as i64));
|
||||
insight_cx
|
||||
.span()
|
||||
.set_attribute(KeyValue::new("title_length", title.len() as i64));
|
||||
insight_cx
|
||||
.span()
|
||||
.set_attribute(KeyValue::new("summary_length", summary.len() as i64));
|
||||
|
||||
// 8. Store in database
|
||||
let insight = InsertPhotoInsight {
|
||||
@@ -516,7 +554,8 @@ impl InsightGenerator {
|
||||
};
|
||||
|
||||
let mut dao = self.insight_dao.lock().expect("Unable to lock InsightDao");
|
||||
let result = dao.store_insight(&insight_cx, insight)
|
||||
let result = dao
|
||||
.store_insight(&insight_cx, insight)
|
||||
.map_err(|e| anyhow::anyhow!("Failed to store insight: {:?}", e));
|
||||
|
||||
match &result {
|
||||
|
||||
@@ -1,11 +1,10 @@
|
||||
pub mod embedding_job;
|
||||
pub mod daily_summary_job;
|
||||
pub mod embedding_job;
|
||||
pub mod handlers;
|
||||
pub mod insight_generator;
|
||||
pub mod ollama;
|
||||
pub mod sms_client;
|
||||
|
||||
pub use embedding_job::embed_contact_messages;
|
||||
pub use daily_summary_job::generate_daily_summaries;
|
||||
pub use handlers::{
|
||||
delete_insight_handler, generate_insight_handler, get_all_insights_handler,
|
||||
|
||||
@@ -247,7 +247,9 @@ Use only the specific details provided above. Mention people's names, places, or
|
||||
/// Returns a 768-dimensional vector as Vec<f32>
|
||||
pub async fn generate_embedding(&self, text: &str) -> Result<Vec<f32>> {
|
||||
let embeddings = self.generate_embeddings(&[text]).await?;
|
||||
embeddings.into_iter().next()
|
||||
embeddings
|
||||
.into_iter()
|
||||
.next()
|
||||
.ok_or_else(|| anyhow::anyhow!("No embedding returned"))
|
||||
}
|
||||
|
||||
@@ -275,7 +277,10 @@ Use only the specific details provided above. Mention people's names, places, or
|
||||
|
||||
let embeddings = match primary_result {
|
||||
Ok(embeddings) => {
|
||||
log::debug!("Successfully generated {} embeddings from primary server", embeddings.len());
|
||||
log::debug!(
|
||||
"Successfully generated {} embeddings from primary server",
|
||||
embeddings.len()
|
||||
);
|
||||
embeddings
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -294,11 +299,17 @@ Use only the specific details provided above. Mention people's names, places, or
|
||||
.await
|
||||
{
|
||||
Ok(embeddings) => {
|
||||
log::info!("Successfully generated {} embeddings from fallback server", embeddings.len());
|
||||
log::info!(
|
||||
"Successfully generated {} embeddings from fallback server",
|
||||
embeddings.len()
|
||||
);
|
||||
embeddings
|
||||
}
|
||||
Err(fallback_e) => {
|
||||
log::error!("Fallback server batch embedding also failed: {}", fallback_e);
|
||||
log::error!(
|
||||
"Fallback server batch embedding also failed: {}",
|
||||
fallback_e
|
||||
);
|
||||
return Err(anyhow::anyhow!(
|
||||
"Both primary and fallback servers failed. Primary: {}, Fallback: {}",
|
||||
e,
|
||||
@@ -328,14 +339,11 @@ Use only the specific details provided above. Mention people's names, places, or
|
||||
}
|
||||
|
||||
/// Internal helper to try generating an embedding from a specific server
|
||||
async fn try_generate_embedding(
|
||||
&self,
|
||||
url: &str,
|
||||
model: &str,
|
||||
text: &str,
|
||||
) -> Result<Vec<f32>> {
|
||||
async fn try_generate_embedding(&self, url: &str, model: &str, text: &str) -> Result<Vec<f32>> {
|
||||
let embeddings = self.try_generate_embeddings(url, model, &[text]).await?;
|
||||
embeddings.into_iter().next()
|
||||
embeddings
|
||||
.into_iter()
|
||||
.next()
|
||||
.ok_or_else(|| anyhow::anyhow!("No embedding returned from Ollama"))
|
||||
}
|
||||
|
||||
|
||||
@@ -100,31 +100,31 @@ impl SmsApiClient {
|
||||
.timestamp();
|
||||
let end_ts = chrono::Utc::now().timestamp();
|
||||
|
||||
log::info!(
|
||||
"Fetching all historical messages for contact: {}",
|
||||
contact
|
||||
);
|
||||
log::info!("Fetching all historical messages for contact: {}", contact);
|
||||
|
||||
let mut all_messages = Vec::new();
|
||||
let mut offset = 0;
|
||||
let limit = 1000; // Fetch in batches of 1000
|
||||
|
||||
loop {
|
||||
log::debug!("Fetching batch at offset {} for contact {}", offset, contact);
|
||||
log::debug!(
|
||||
"Fetching batch at offset {} for contact {}",
|
||||
offset,
|
||||
contact
|
||||
);
|
||||
|
||||
let batch = self.fetch_messages_paginated(
|
||||
start_ts,
|
||||
end_ts,
|
||||
Some(contact),
|
||||
None,
|
||||
limit,
|
||||
offset
|
||||
).await?;
|
||||
let batch = self
|
||||
.fetch_messages_paginated(start_ts, end_ts, Some(contact), None, limit, offset)
|
||||
.await?;
|
||||
|
||||
let batch_size = batch.len();
|
||||
all_messages.extend(batch);
|
||||
|
||||
log::debug!("Fetched {} messages (total so far: {})", batch_size, all_messages.len());
|
||||
log::debug!(
|
||||
"Fetched {} messages (total so far: {})",
|
||||
batch_size,
|
||||
all_messages.len()
|
||||
);
|
||||
|
||||
// If we got fewer messages than the limit, we've reached the end
|
||||
if batch_size < limit {
|
||||
|
||||
@@ -4,7 +4,7 @@ use serde::Serialize;
|
||||
use std::ops::DerefMut;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use crate::database::{connect, DbError, DbErrorKind};
|
||||
use crate::database::{DbError, DbErrorKind, connect};
|
||||
use crate::otel::trace_db_call;
|
||||
|
||||
/// Represents a daily conversation summary
|
||||
@@ -125,7 +125,10 @@ impl DailySummaryDao for SqliteDailySummaryDao {
|
||||
summary: InsertDailySummary,
|
||||
) -> Result<DailySummary, DbError> {
|
||||
trace_db_call(context, "insert", "store_summary", |_span| {
|
||||
let mut conn = self.connection.lock().expect("Unable to get DailySummaryDao");
|
||||
let mut conn = self
|
||||
.connection
|
||||
.lock()
|
||||
.expect("Unable to get DailySummaryDao");
|
||||
|
||||
// Validate embedding dimensions
|
||||
if summary.embedding.len() != 768 {
|
||||
@@ -141,7 +144,7 @@ impl DailySummaryDao for SqliteDailySummaryDao {
|
||||
diesel::sql_query(
|
||||
"INSERT OR REPLACE INTO daily_conversation_summaries
|
||||
(date, contact, summary, message_count, embedding, created_at, model_version)
|
||||
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)"
|
||||
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
|
||||
)
|
||||
.bind::<diesel::sql_types::Text, _>(&summary.date)
|
||||
.bind::<diesel::sql_types::Text, _>(&summary.contact)
|
||||
@@ -266,11 +269,14 @@ impl DailySummaryDao for SqliteDailySummaryDao {
|
||||
contact: &str,
|
||||
) -> Result<bool, DbError> {
|
||||
trace_db_call(context, "query", "summary_exists", |_span| {
|
||||
let mut conn = self.connection.lock().expect("Unable to get DailySummaryDao");
|
||||
let mut conn = self
|
||||
.connection
|
||||
.lock()
|
||||
.expect("Unable to get DailySummaryDao");
|
||||
|
||||
let count = diesel::sql_query(
|
||||
"SELECT COUNT(*) as count FROM daily_conversation_summaries
|
||||
WHERE date = ?1 AND contact = ?2"
|
||||
WHERE date = ?1 AND contact = ?2",
|
||||
)
|
||||
.bind::<diesel::sql_types::Text, _>(date)
|
||||
.bind::<diesel::sql_types::Text, _>(contact)
|
||||
@@ -289,10 +295,13 @@ impl DailySummaryDao for SqliteDailySummaryDao {
|
||||
contact: &str,
|
||||
) -> Result<i64, DbError> {
|
||||
trace_db_call(context, "query", "get_summary_count", |_span| {
|
||||
let mut conn = self.connection.lock().expect("Unable to get DailySummaryDao");
|
||||
let mut conn = self
|
||||
.connection
|
||||
.lock()
|
||||
.expect("Unable to get DailySummaryDao");
|
||||
|
||||
diesel::sql_query(
|
||||
"SELECT COUNT(*) as count FROM daily_conversation_summaries WHERE contact = ?1"
|
||||
"SELECT COUNT(*) as count FROM daily_conversation_summaries WHERE contact = ?1",
|
||||
)
|
||||
.bind::<diesel::sql_types::Text, _>(contact)
|
||||
.get_result::<CountResult>(conn.deref_mut())
|
||||
|
||||
@@ -468,7 +468,7 @@ impl EmbeddingDao for SqliteEmbeddingDao {
|
||||
let mut conn = self.connection.lock().expect("Unable to get EmbeddingDao");
|
||||
|
||||
let count = diesel::sql_query(
|
||||
"SELECT COUNT(*) as count FROM message_embeddings WHERE contact = ?1"
|
||||
"SELECT COUNT(*) as count FROM message_embeddings WHERE contact = ?1",
|
||||
)
|
||||
.bind::<diesel::sql_types::Text, _>(contact)
|
||||
.get_result::<CountResult>(conn.deref_mut())
|
||||
@@ -501,7 +501,7 @@ impl EmbeddingDao for SqliteEmbeddingDao {
|
||||
|
||||
let count = diesel::sql_query(
|
||||
"SELECT COUNT(*) as count FROM message_embeddings
|
||||
WHERE contact = ?1 AND body = ?2 AND timestamp = ?3"
|
||||
WHERE contact = ?1 AND body = ?2 AND timestamp = ?3",
|
||||
)
|
||||
.bind::<diesel::sql_types::Text, _>(contact)
|
||||
.bind::<diesel::sql_types::Text, _>(body)
|
||||
|
||||
@@ -9,15 +9,15 @@ use crate::database::models::{
|
||||
};
|
||||
use crate::otel::trace_db_call;
|
||||
|
||||
pub mod embeddings_dao;
|
||||
pub mod daily_summary_dao;
|
||||
pub mod embeddings_dao;
|
||||
pub mod insights_dao;
|
||||
pub mod models;
|
||||
pub mod schema;
|
||||
|
||||
pub use embeddings_dao::{EmbeddingDao, InsertMessageEmbedding, SqliteEmbeddingDao};
|
||||
pub use daily_summary_dao::{DailySummaryDao, InsertDailySummary, SqliteDailySummaryDao};
|
||||
pub use embeddings_dao::{EmbeddingDao, InsertMessageEmbedding};
|
||||
pub use insights_dao::{InsightDao, SqliteInsightDao};
|
||||
pub use daily_summary_dao::{DailySummaryDao, SqliteDailySummaryDao, DailySummary, InsertDailySummary};
|
||||
|
||||
pub trait UserDao {
|
||||
fn create_user(&mut self, user: &str, password: &str) -> Option<User>;
|
||||
|
||||
@@ -49,7 +49,6 @@ use crate::video::actors::{
|
||||
use log::{debug, error, info, trace, warn};
|
||||
use opentelemetry::trace::{Span, Status, TraceContextExt, Tracer};
|
||||
use opentelemetry::{KeyValue, global};
|
||||
use crate::video::generate_video_gifs;
|
||||
|
||||
mod ai;
|
||||
mod auth;
|
||||
@@ -776,8 +775,10 @@ fn main() -> std::io::Result<()> {
|
||||
end,
|
||||
&ollama_clone,
|
||||
&sms_client_clone,
|
||||
summary_dao
|
||||
).await {
|
||||
summary_dao,
|
||||
)
|
||||
.await
|
||||
{
|
||||
log::error!("Daily summary generation failed for {}: {:?}", contact, e);
|
||||
} else {
|
||||
log::info!("Daily summary generation completed for {}", contact);
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
use crate::ai::{InsightGenerator, OllamaClient, SmsApiClient};
|
||||
use crate::database::{DailySummaryDao, ExifDao, InsightDao, SqliteDailySummaryDao, SqliteExifDao, SqliteInsightDao};
|
||||
use crate::database::{
|
||||
DailySummaryDao, ExifDao, InsightDao, SqliteDailySummaryDao, SqliteExifDao, SqliteInsightDao,
|
||||
};
|
||||
use crate::video::actors::{PlaylistGenerator, StreamActor, VideoPlaylistManager};
|
||||
use actix::{Actor, Addr};
|
||||
use std::env;
|
||||
|
||||
Reference in New Issue
Block a user