feature/insights #46
@@ -72,11 +72,12 @@ pub fn strip_summary_boilerplate(summary: &str) -> String {
|
|||||||
|
|
||||||
// Remove any remaining leading markdown bold markers
|
// Remove any remaining leading markdown bold markers
|
||||||
if text.starts_with("**")
|
if text.starts_with("**")
|
||||||
&& let Some(end) = text[2..].find("**") {
|
&& let Some(end) = text[2..].find("**")
|
||||||
// Keep the content between ** but remove the markers
|
{
|
||||||
let bold_content = &text[2..2 + end];
|
// Keep the content between ** but remove the markers
|
||||||
text = format!("{}{}", bold_content, &text[4 + end..]);
|
let bold_content = &text[2..2 + end];
|
||||||
}
|
text = format!("{}{}", bold_content, &text[4 + end..]);
|
||||||
|
}
|
||||||
|
|
||||||
text.trim().to_string()
|
text.trim().to_string()
|
||||||
}
|
}
|
||||||
@@ -141,10 +142,7 @@ pub async fn generate_daily_summaries(
|
|||||||
if let Some(dt) = msg_dt {
|
if let Some(dt) = msg_dt {
|
||||||
let date = dt.date_naive();
|
let date = dt.date_naive();
|
||||||
if date >= start && date <= end {
|
if date >= start && date <= end {
|
||||||
messages_by_date
|
messages_by_date.entry(date).or_default().push(msg);
|
||||||
.entry(date)
|
|
||||||
.or_default()
|
|
||||||
.push(msg);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,212 +0,0 @@
|
|||||||
use anyhow::Result;
|
|
||||||
use chrono::Utc;
|
|
||||||
use std::sync::{Arc, Mutex};
|
|
||||||
use tokio::time::{Duration, sleep};
|
|
||||||
|
|
||||||
use crate::ai::{OllamaClient, SmsApiClient};
|
|
||||||
use crate::database::{EmbeddingDao, InsertMessageEmbedding};
|
|
||||||
|
|
||||||
/// Background job to embed messages for a specific contact
|
|
||||||
/// This function is idempotent - it checks if embeddings already exist before processing
|
|
||||||
///
|
|
||||||
/// # Arguments
|
|
||||||
/// * `contact` - The contact name to embed messages for (e.g., "Amanda")
|
|
||||||
/// * `ollama` - Ollama client for generating embeddings
|
|
||||||
/// * `sms_client` - SMS API client for fetching messages
|
|
||||||
/// * `embedding_dao` - DAO for storing embeddings in the database
|
|
||||||
///
|
|
||||||
/// # Returns
|
|
||||||
/// Ok(()) on success, Err on failure
|
|
||||||
pub async fn embed_contact_messages(
|
|
||||||
contact: &str,
|
|
||||||
ollama: &OllamaClient,
|
|
||||||
sms_client: &SmsApiClient,
|
|
||||||
embedding_dao: Arc<Mutex<Box<dyn EmbeddingDao>>>,
|
|
||||||
) -> Result<()> {
|
|
||||||
log::info!("Starting message embedding job for contact: {}", contact);
|
|
||||||
|
|
||||||
let otel_context = opentelemetry::Context::new();
|
|
||||||
|
|
||||||
// Check existing embeddings count
|
|
||||||
let existing_count = {
|
|
||||||
let mut dao = embedding_dao.lock().expect("Unable to lock EmbeddingDao");
|
|
||||||
dao.get_message_count(&otel_context, contact).unwrap_or(0)
|
|
||||||
};
|
|
||||||
|
|
||||||
if existing_count > 0 {
|
|
||||||
log::info!(
|
|
||||||
"Contact '{}' already has {} embeddings, will check for new messages to embed",
|
|
||||||
contact,
|
|
||||||
existing_count
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
log::info!("Fetching all messages for contact: {}", contact);
|
|
||||||
|
|
||||||
// Fetch all messages for the contact
|
|
||||||
let messages = sms_client.fetch_all_messages_for_contact(contact).await?;
|
|
||||||
|
|
||||||
let total_messages = messages.len();
|
|
||||||
log::info!(
|
|
||||||
"Fetched {} messages for contact '{}'",
|
|
||||||
total_messages,
|
|
||||||
contact
|
|
||||||
);
|
|
||||||
|
|
||||||
if total_messages == 0 {
|
|
||||||
log::warn!(
|
|
||||||
"No messages found for contact '{}', nothing to embed",
|
|
||||||
contact
|
|
||||||
);
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
// Filter out messages that already have embeddings and short/generic messages
|
|
||||||
log::info!("Filtering out messages that already have embeddings and short messages...");
|
|
||||||
let min_message_length = 30; // Skip short messages like "Thanks!" or "Yeah, it was :)"
|
|
||||||
let messages_to_embed: Vec<&crate::ai::SmsMessage> = {
|
|
||||||
let mut dao = embedding_dao.lock().expect("Unable to lock EmbeddingDao");
|
|
||||||
messages
|
|
||||||
.iter()
|
|
||||||
.filter(|msg| {
|
|
||||||
// Filter out short messages
|
|
||||||
if msg.body.len() < min_message_length {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
// Filter out already embedded messages
|
|
||||||
!dao.message_exists(&otel_context, contact, &msg.body, msg.timestamp)
|
|
||||||
.unwrap_or(false)
|
|
||||||
})
|
|
||||||
.collect()
|
|
||||||
};
|
|
||||||
|
|
||||||
let skipped = total_messages - messages_to_embed.len();
|
|
||||||
let to_embed = messages_to_embed.len();
|
|
||||||
|
|
||||||
log::info!(
|
|
||||||
"Found {} messages to embed ({} already embedded)",
|
|
||||||
to_embed,
|
|
||||||
skipped
|
|
||||||
);
|
|
||||||
|
|
||||||
if to_embed == 0 {
|
|
||||||
log::info!("All messages already embedded for contact '{}'", contact);
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
// Process messages in batches
|
|
||||||
let batch_size = 128; // Embed 128 messages per API call
|
|
||||||
let mut successful = 0;
|
|
||||||
let mut failed = 0;
|
|
||||||
|
|
||||||
for (batch_idx, batch) in messages_to_embed.chunks(batch_size).enumerate() {
|
|
||||||
let batch_start = batch_idx * batch_size;
|
|
||||||
let batch_end = batch_start + batch.len();
|
|
||||||
|
|
||||||
log::info!(
|
|
||||||
"Processing batch {}/{}: messages {}-{} ({:.1}% complete)",
|
|
||||||
batch_idx + 1,
|
|
||||||
to_embed.div_ceil(batch_size),
|
|
||||||
batch_start + 1,
|
|
||||||
batch_end,
|
|
||||||
(batch_end as f64 / to_embed as f64) * 100.0
|
|
||||||
);
|
|
||||||
|
|
||||||
match embed_message_batch(batch, contact, ollama, embedding_dao.clone()).await {
|
|
||||||
Ok(count) => {
|
|
||||||
successful += count;
|
|
||||||
log::debug!("Successfully embedded {} messages in batch", count);
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
failed += batch.len();
|
|
||||||
log::error!("Failed to embed batch: {:?}", e);
|
|
||||||
// Continue processing despite failures
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Small delay between batches to avoid overwhelming Ollama
|
|
||||||
if batch_end < to_embed {
|
|
||||||
sleep(Duration::from_millis(500)).await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
log::info!(
|
|
||||||
"Message embedding job complete for '{}': {}/{} new embeddings created ({} already embedded, {} failed)",
|
|
||||||
contact,
|
|
||||||
successful,
|
|
||||||
total_messages,
|
|
||||||
skipped,
|
|
||||||
failed
|
|
||||||
);
|
|
||||||
|
|
||||||
if failed > 0 {
|
|
||||||
log::warn!(
|
|
||||||
"{} messages failed to embed for contact '{}'",
|
|
||||||
failed,
|
|
||||||
contact
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Embed a batch of messages using a single API call
|
|
||||||
/// Returns the number of successfully embedded messages
|
|
||||||
async fn embed_message_batch(
|
|
||||||
messages: &[&crate::ai::SmsMessage],
|
|
||||||
contact: &str,
|
|
||||||
ollama: &OllamaClient,
|
|
||||||
embedding_dao: Arc<Mutex<Box<dyn EmbeddingDao>>>,
|
|
||||||
) -> Result<usize> {
|
|
||||||
if messages.is_empty() {
|
|
||||||
return Ok(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Collect message bodies for batch embedding
|
|
||||||
let bodies: Vec<&str> = messages.iter().map(|m| m.body.as_str()).collect();
|
|
||||||
|
|
||||||
// Generate embeddings for all messages in one API call
|
|
||||||
let embeddings = ollama.generate_embeddings(&bodies).await?;
|
|
||||||
|
|
||||||
if embeddings.len() != messages.len() {
|
|
||||||
return Err(anyhow::anyhow!(
|
|
||||||
"Embedding count mismatch: got {} embeddings for {} messages",
|
|
||||||
embeddings.len(),
|
|
||||||
messages.len()
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Build batch of insert records
|
|
||||||
let otel_context = opentelemetry::Context::new();
|
|
||||||
let created_at = Utc::now().timestamp();
|
|
||||||
let mut inserts = Vec::with_capacity(messages.len());
|
|
||||||
|
|
||||||
for (message, embedding) in messages.iter().zip(embeddings.iter()) {
|
|
||||||
// Validate embedding dimensions
|
|
||||||
if embedding.len() != 768 {
|
|
||||||
log::warn!(
|
|
||||||
"Invalid embedding dimensions: {} (expected 768), skipping",
|
|
||||||
embedding.len()
|
|
||||||
);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
inserts.push(InsertMessageEmbedding {
|
|
||||||
contact: contact.to_string(),
|
|
||||||
body: message.body.clone(),
|
|
||||||
timestamp: message.timestamp,
|
|
||||||
is_sent: message.is_sent,
|
|
||||||
embedding: embedding.clone(),
|
|
||||||
created_at,
|
|
||||||
model_version: "nomic-embed-text:v1.5".to_string(),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// Store all embeddings in a single transaction
|
|
||||||
let mut dao = embedding_dao.lock().expect("Unable to lock EmbeddingDao");
|
|
||||||
let stored_count = dao
|
|
||||||
.store_message_embeddings_batch(&otel_context, inserts)
|
|
||||||
.map_err(|e| anyhow::anyhow!("Failed to store embeddings batch: {:?}", e))?;
|
|
||||||
|
|
||||||
Ok(stored_count)
|
|
||||||
}
|
|
||||||
@@ -86,9 +86,10 @@ impl InsightGenerator {
|
|||||||
// If path has at least 2 components (directory + file), extract first directory
|
// If path has at least 2 components (directory + file), extract first directory
|
||||||
if components.len() >= 2
|
if components.len() >= 2
|
||||||
&& let Some(component) = components.first()
|
&& let Some(component) = components.first()
|
||||||
&& let Some(os_str) = component.as_os_str().to_str() {
|
&& let Some(os_str) = component.as_os_str().to_str()
|
||||||
return Some(os_str.to_string());
|
{
|
||||||
}
|
return Some(os_str.to_string());
|
||||||
|
}
|
||||||
|
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
@@ -190,20 +191,19 @@ impl InsightGenerator {
|
|||||||
.filter(|msg| {
|
.filter(|msg| {
|
||||||
// Extract date from formatted daily summary "[2024-08-15] Contact ..."
|
// Extract date from formatted daily summary "[2024-08-15] Contact ..."
|
||||||
if let Some(bracket_end) = msg.find(']')
|
if let Some(bracket_end) = msg.find(']')
|
||||||
&& let Some(date_str) = msg.get(1..bracket_end) {
|
&& let Some(date_str) = msg.get(1..bracket_end)
|
||||||
// Parse just the date (daily summaries don't have time)
|
{
|
||||||
if let Ok(msg_date) =
|
// Parse just the date (daily summaries don't have time)
|
||||||
chrono::NaiveDate::parse_from_str(date_str, "%Y-%m-%d")
|
if let Ok(msg_date) = chrono::NaiveDate::parse_from_str(date_str, "%Y-%m-%d") {
|
||||||
{
|
let msg_timestamp = msg_date
|
||||||
let msg_timestamp = msg_date
|
.and_hms_opt(12, 0, 0)
|
||||||
.and_hms_opt(12, 0, 0)
|
.unwrap()
|
||||||
.unwrap()
|
.and_utc()
|
||||||
.and_utc()
|
.timestamp();
|
||||||
.timestamp();
|
let time_diff = (photo_timestamp - msg_timestamp).abs();
|
||||||
let time_diff = (photo_timestamp - msg_timestamp).abs();
|
return time_diff > exclusion_window;
|
||||||
return time_diff > exclusion_window;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
false
|
false
|
||||||
})
|
})
|
||||||
.take(limit)
|
.take(limit)
|
||||||
|
|||||||
@@ -1,5 +1,4 @@
|
|||||||
pub mod daily_summary_job;
|
pub mod daily_summary_job;
|
||||||
pub mod embedding_job;
|
|
||||||
pub mod handlers;
|
pub mod handlers;
|
||||||
pub mod insight_generator;
|
pub mod insight_generator;
|
||||||
pub mod ollama;
|
pub mod ollama;
|
||||||
|
|||||||
@@ -79,10 +79,11 @@ impl OllamaClient {
|
|||||||
{
|
{
|
||||||
let cache = MODEL_LIST_CACHE.lock().unwrap();
|
let cache = MODEL_LIST_CACHE.lock().unwrap();
|
||||||
if let Some(entry) = cache.get(url)
|
if let Some(entry) = cache.get(url)
|
||||||
&& !entry.is_expired() {
|
&& !entry.is_expired()
|
||||||
log::debug!("Returning cached model list for {}", url);
|
{
|
||||||
return Ok(entry.data.clone());
|
log::debug!("Returning cached model list for {}", url);
|
||||||
}
|
return Ok(entry.data.clone());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log::debug!("Fetching fresh model list from {}", url);
|
log::debug!("Fetching fresh model list from {}", url);
|
||||||
@@ -188,10 +189,11 @@ impl OllamaClient {
|
|||||||
{
|
{
|
||||||
let cache = MODEL_CAPABILITIES_CACHE.lock().unwrap();
|
let cache = MODEL_CAPABILITIES_CACHE.lock().unwrap();
|
||||||
if let Some(entry) = cache.get(url)
|
if let Some(entry) = cache.get(url)
|
||||||
&& !entry.is_expired() {
|
&& !entry.is_expired()
|
||||||
log::debug!("Returning cached model capabilities for {}", url);
|
{
|
||||||
return Ok(entry.data.clone());
|
log::debug!("Returning cached model capabilities for {}", url);
|
||||||
}
|
return Ok(entry.data.clone());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log::debug!("Fetching fresh model capabilities from {}", url);
|
log::debug!("Fetching fresh model capabilities from {}", url);
|
||||||
@@ -420,8 +422,8 @@ Return ONLY the title, nothing else."#,
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
} else if let Some(contact_name) = contact {
|
} else if let Some(contact_name) = contact {
|
||||||
format!(
|
format!(
|
||||||
r#"Create a short title (maximum 8 words) about this moment:
|
r#"Create a short title (maximum 8 words) about this moment:
|
||||||
|
|
||||||
Date: {}
|
Date: {}
|
||||||
Location: {}
|
Location: {}
|
||||||
@@ -431,15 +433,15 @@ Return ONLY the title, nothing else."#,
|
|||||||
Use specific details from the context above. The photo is from a folder for {}, so they are likely related to this moment. If no specific details are available, use a simple descriptive title.
|
Use specific details from the context above. The photo is from a folder for {}, so they are likely related to this moment. If no specific details are available, use a simple descriptive title.
|
||||||
|
|
||||||
Return ONLY the title, nothing else."#,
|
Return ONLY the title, nothing else."#,
|
||||||
date.format("%B %d, %Y"),
|
date.format("%B %d, %Y"),
|
||||||
location_str,
|
location_str,
|
||||||
contact_name,
|
contact_name,
|
||||||
sms_str,
|
sms_str,
|
||||||
contact_name
|
contact_name
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
format!(
|
format!(
|
||||||
r#"Create a short title (maximum 8 words) about this moment:
|
r#"Create a short title (maximum 8 words) about this moment:
|
||||||
|
|
||||||
Date: {}
|
Date: {}
|
||||||
Location: {}
|
Location: {}
|
||||||
@@ -448,11 +450,11 @@ Return ONLY the title, nothing else."#,
|
|||||||
Use specific details from the context above. If no specific details are available, use a simple descriptive title.
|
Use specific details from the context above. If no specific details are available, use a simple descriptive title.
|
||||||
|
|
||||||
Return ONLY the title, nothing else."#,
|
Return ONLY the title, nothing else."#,
|
||||||
date.format("%B %d, %Y"),
|
date.format("%B %d, %Y"),
|
||||||
location_str,
|
location_str,
|
||||||
sms_str
|
sms_str
|
||||||
)
|
)
|
||||||
};
|
};
|
||||||
|
|
||||||
let system = custom_system.unwrap_or("You are my long term memory assistant. Use only the information provided. Do not invent details.");
|
let system = custom_system.unwrap_or("You are my long term memory assistant. Use only the information provided. Do not invent details.");
|
||||||
|
|
||||||
@@ -509,8 +511,8 @@ Analyze the image and use specific details from both the visual content and the
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
} else if let Some(contact_name) = contact {
|
} else if let Some(contact_name) = contact {
|
||||||
format!(
|
format!(
|
||||||
r#"Write a 1-3 paragraph description of this moment based on the available information:
|
r#"Write a 1-3 paragraph description of this moment based on the available information:
|
||||||
|
|
||||||
Date: {}
|
Date: {}
|
||||||
Location: {}
|
Location: {}
|
||||||
@@ -518,27 +520,27 @@ Analyze the image and use specific details from both the visual content and the
|
|||||||
Messages: {}
|
Messages: {}
|
||||||
|
|
||||||
Use only the specific details provided above. The photo is from a folder for {}, so they are likely related to this moment. Mention people's names (especially {}), places, or activities if they appear in the context. Write in first person as Cameron with the tone of a journal entry. If limited information is available, keep it simple and factual. If the location is unknown omit it"#,
|
Use only the specific details provided above. The photo is from a folder for {}, so they are likely related to this moment. Mention people's names (especially {}), places, or activities if they appear in the context. Write in first person as Cameron with the tone of a journal entry. If limited information is available, keep it simple and factual. If the location is unknown omit it"#,
|
||||||
date.format("%B %d, %Y"),
|
date.format("%B %d, %Y"),
|
||||||
location_str,
|
location_str,
|
||||||
contact_name,
|
contact_name,
|
||||||
sms_str,
|
sms_str,
|
||||||
contact_name,
|
contact_name,
|
||||||
contact_name
|
contact_name
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
format!(
|
format!(
|
||||||
r#"Write a 1-3 paragraph description of this moment based on the available information:
|
r#"Write a 1-3 paragraph description of this moment based on the available information:
|
||||||
|
|
||||||
Date: {}
|
Date: {}
|
||||||
Location: {}
|
Location: {}
|
||||||
Messages: {}
|
Messages: {}
|
||||||
|
|
||||||
Use only the specific details provided above. Mention people's names, places, or activities if they appear in the context. Write in first person as Cameron with the tone of a journal entry. If limited information is available, keep it simple and factual. If the location is unknown omit it"#,
|
Use only the specific details provided above. Mention people's names, places, or activities if they appear in the context. Write in first person as Cameron with the tone of a journal entry. If limited information is available, keep it simple and factual. If the location is unknown omit it"#,
|
||||||
date.format("%B %d, %Y"),
|
date.format("%B %d, %Y"),
|
||||||
location_str,
|
location_str,
|
||||||
sms_str
|
sms_str
|
||||||
)
|
)
|
||||||
};
|
};
|
||||||
|
|
||||||
let system = custom_system.unwrap_or("You are a memory refreshing assistant who is able to provide insights through analyzing past conversations. Use only the information provided. Do not invent details.");
|
let system = custom_system.unwrap_or("You are a memory refreshing assistant who is able to provide insights through analyzing past conversations. Use only the information provided. Do not invent details.");
|
||||||
|
|
||||||
@@ -642,15 +644,6 @@ Analyze the image and use specific details from both the visual content and the
|
|||||||
Ok(embeddings)
|
Ok(embeddings)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Internal helper to try generating an embedding from a specific server
|
|
||||||
async fn try_generate_embedding(&self, url: &str, model: &str, text: &str) -> Result<Vec<f32>> {
|
|
||||||
let embeddings = self.try_generate_embeddings(url, model, &[text]).await?;
|
|
||||||
embeddings
|
|
||||||
.into_iter()
|
|
||||||
.next()
|
|
||||||
.ok_or_else(|| anyhow::anyhow!("No embedding returned from Ollama"))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Internal helper to try generating embeddings for multiple texts from a specific server
|
/// Internal helper to try generating embeddings for multiple texts from a specific server
|
||||||
async fn try_generate_embeddings(
|
async fn try_generate_embeddings(
|
||||||
&self,
|
&self,
|
||||||
@@ -730,12 +723,6 @@ pub struct ModelCapabilities {
|
|||||||
pub has_vision: bool,
|
pub has_vision: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize)]
|
|
||||||
struct OllamaEmbedRequest {
|
|
||||||
model: String,
|
|
||||||
input: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize)]
|
#[derive(Serialize)]
|
||||||
struct OllamaBatchEmbedRequest {
|
struct OllamaBatchEmbedRequest {
|
||||||
model: String,
|
model: String,
|
||||||
|
|||||||
@@ -80,10 +80,11 @@ async fn main() -> Result<()> {
|
|||||||
event.event_uid.as_deref().unwrap_or(""),
|
event.event_uid.as_deref().unwrap_or(""),
|
||||||
event.start_time,
|
event.start_time,
|
||||||
)
|
)
|
||||||
&& exists {
|
&& exists
|
||||||
*skipped_count.lock().unwrap() += 1;
|
{
|
||||||
continue;
|
*skipped_count.lock().unwrap() += 1;
|
||||||
}
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
// Generate embedding if requested (blocking call)
|
// Generate embedding if requested (blocking call)
|
||||||
let embedding = if let Some(ref ollama_client) = ollama {
|
let embedding = if let Some(ref ollama_client) = ollama {
|
||||||
|
|||||||
@@ -65,10 +65,11 @@ async fn main() -> Result<()> {
|
|||||||
location.latitude,
|
location.latitude,
|
||||||
location.longitude,
|
location.longitude,
|
||||||
)
|
)
|
||||||
&& exists {
|
&& exists
|
||||||
skipped_count += 1;
|
{
|
||||||
continue;
|
skipped_count += 1;
|
||||||
}
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
batch_inserts.push(InsertLocationRecord {
|
batch_inserts.push(InsertLocationRecord {
|
||||||
timestamp: location.timestamp,
|
timestamp: location.timestamp,
|
||||||
|
|||||||
@@ -95,10 +95,11 @@ async fn main() -> Result<()> {
|
|||||||
if args.skip_existing
|
if args.skip_existing
|
||||||
&& let Ok(exists) =
|
&& let Ok(exists) =
|
||||||
dao_instance.search_exists(&context, search.timestamp, &search.query)
|
dao_instance.search_exists(&context, search.timestamp, &search.query)
|
||||||
&& exists {
|
&& exists
|
||||||
skipped_count += 1;
|
{
|
||||||
continue;
|
skipped_count += 1;
|
||||||
}
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
// Only insert if we have an embedding
|
// Only insert if we have an embedding
|
||||||
if let Some(embedding) = embedding_opt {
|
if let Some(embedding) = embedding_opt {
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use chrono::NaiveDate;
|
use chrono::NaiveDate;
|
||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
use image_api::ai::{strip_summary_boilerplate, OllamaClient, SmsApiClient};
|
use image_api::ai::{OllamaClient, SmsApiClient, strip_summary_boilerplate};
|
||||||
use image_api::database::{DailySummaryDao, InsertDailySummary, SqliteDailySummaryDao};
|
use image_api::database::{DailySummaryDao, InsertDailySummary, SqliteDailySummaryDao};
|
||||||
use std::env;
|
use std::env;
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
|
|||||||
@@ -167,8 +167,10 @@ pub enum PhotoSize {
|
|||||||
#[derive(Debug, Deserialize)]
|
#[derive(Debug, Deserialize)]
|
||||||
pub struct ThumbnailRequest {
|
pub struct ThumbnailRequest {
|
||||||
pub(crate) path: String,
|
pub(crate) path: String,
|
||||||
|
#[allow(dead_code)] // Part of API contract, may be used in future
|
||||||
pub(crate) size: Option<PhotoSize>,
|
pub(crate) size: Option<PhotoSize>,
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
|
#[allow(dead_code)] // Part of API contract, may be used in future
|
||||||
pub(crate) format: Option<ThumbnailFormat>,
|
pub(crate) format: Option<ThumbnailFormat>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -26,6 +26,7 @@ pub struct CalendarEvent {
|
|||||||
|
|
||||||
/// Data for inserting a new calendar event
|
/// Data for inserting a new calendar event
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
|
#[allow(dead_code)]
|
||||||
pub struct InsertCalendarEvent {
|
pub struct InsertCalendarEvent {
|
||||||
pub event_uid: Option<String>,
|
pub event_uid: Option<String>,
|
||||||
pub summary: String,
|
pub summary: String,
|
||||||
@@ -219,12 +220,13 @@ impl CalendarEventDao for SqliteCalendarEventDao {
|
|||||||
|
|
||||||
// Validate embedding dimensions if provided
|
// Validate embedding dimensions if provided
|
||||||
if let Some(ref emb) = event.embedding
|
if let Some(ref emb) = event.embedding
|
||||||
&& emb.len() != 768 {
|
&& emb.len() != 768
|
||||||
return Err(anyhow::anyhow!(
|
{
|
||||||
"Invalid embedding dimensions: {} (expected 768)",
|
return Err(anyhow::anyhow!(
|
||||||
emb.len()
|
"Invalid embedding dimensions: {} (expected 768)",
|
||||||
));
|
emb.len()
|
||||||
}
|
));
|
||||||
|
}
|
||||||
|
|
||||||
let embedding_bytes = event.embedding.as_ref().map(|e| Self::serialize_vector(e));
|
let embedding_bytes = event.embedding.as_ref().map(|e| Self::serialize_vector(e));
|
||||||
|
|
||||||
@@ -289,13 +291,14 @@ impl CalendarEventDao for SqliteCalendarEventDao {
|
|||||||
for event in events {
|
for event in events {
|
||||||
// Validate embedding if provided
|
// Validate embedding if provided
|
||||||
if let Some(ref emb) = event.embedding
|
if let Some(ref emb) = event.embedding
|
||||||
&& emb.len() != 768 {
|
&& emb.len() != 768
|
||||||
log::warn!(
|
{
|
||||||
"Skipping event with invalid embedding dimensions: {}",
|
log::warn!(
|
||||||
emb.len()
|
"Skipping event with invalid embedding dimensions: {}",
|
||||||
);
|
emb.len()
|
||||||
continue;
|
);
|
||||||
}
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
let embedding_bytes =
|
let embedding_bytes =
|
||||||
event.embedding.as_ref().map(|e| Self::serialize_vector(e));
|
event.embedding.as_ref().map(|e| Self::serialize_vector(e));
|
||||||
|
|||||||
@@ -1,569 +0,0 @@
|
|||||||
use diesel::prelude::*;
|
|
||||||
use diesel::sqlite::SqliteConnection;
|
|
||||||
use serde::Serialize;
|
|
||||||
use std::ops::DerefMut;
|
|
||||||
use std::sync::{Arc, Mutex};
|
|
||||||
|
|
||||||
use crate::database::{DbError, DbErrorKind, connect};
|
|
||||||
use crate::otel::trace_db_call;
|
|
||||||
|
|
||||||
/// Represents a stored message embedding
|
|
||||||
#[derive(Serialize, Clone, Debug)]
|
|
||||||
pub struct MessageEmbedding {
|
|
||||||
pub id: i32,
|
|
||||||
pub contact: String,
|
|
||||||
pub body: String,
|
|
||||||
pub timestamp: i64,
|
|
||||||
pub is_sent: bool,
|
|
||||||
pub created_at: i64,
|
|
||||||
pub model_version: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Data for inserting a new message embedding
|
|
||||||
#[derive(Clone, Debug)]
|
|
||||||
pub struct InsertMessageEmbedding {
|
|
||||||
pub contact: String,
|
|
||||||
pub body: String,
|
|
||||||
pub timestamp: i64,
|
|
||||||
pub is_sent: bool,
|
|
||||||
pub embedding: Vec<f32>,
|
|
||||||
pub created_at: i64,
|
|
||||||
pub model_version: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub trait EmbeddingDao: Sync + Send {
|
|
||||||
/// Store a message with its embedding vector
|
|
||||||
fn store_message_embedding(
|
|
||||||
&mut self,
|
|
||||||
context: &opentelemetry::Context,
|
|
||||||
message: InsertMessageEmbedding,
|
|
||||||
) -> Result<MessageEmbedding, DbError>;
|
|
||||||
|
|
||||||
/// Store multiple messages with embeddings in a single transaction
|
|
||||||
/// Returns the number of successfully stored messages
|
|
||||||
fn store_message_embeddings_batch(
|
|
||||||
&mut self,
|
|
||||||
context: &opentelemetry::Context,
|
|
||||||
messages: Vec<InsertMessageEmbedding>,
|
|
||||||
) -> Result<usize, DbError>;
|
|
||||||
|
|
||||||
/// Find semantically similar messages using vector similarity search
|
|
||||||
/// Returns the top `limit` most similar messages
|
|
||||||
/// If contact_filter is provided, only return messages from that contact
|
|
||||||
/// Otherwise, search across all contacts for cross-perspective context
|
|
||||||
fn find_similar_messages(
|
|
||||||
&mut self,
|
|
||||||
context: &opentelemetry::Context,
|
|
||||||
query_embedding: &[f32],
|
|
||||||
limit: usize,
|
|
||||||
contact_filter: Option<&str>,
|
|
||||||
) -> Result<Vec<MessageEmbedding>, DbError>;
|
|
||||||
|
|
||||||
/// Get the count of embedded messages for a specific contact
|
|
||||||
fn get_message_count(
|
|
||||||
&mut self,
|
|
||||||
context: &opentelemetry::Context,
|
|
||||||
contact: &str,
|
|
||||||
) -> Result<i64, DbError>;
|
|
||||||
|
|
||||||
/// Check if embeddings exist for a contact (idempotency check)
|
|
||||||
fn has_embeddings_for_contact(
|
|
||||||
&mut self,
|
|
||||||
context: &opentelemetry::Context,
|
|
||||||
contact: &str,
|
|
||||||
) -> Result<bool, DbError>;
|
|
||||||
|
|
||||||
/// Check if a specific message already has an embedding
|
|
||||||
fn message_exists(
|
|
||||||
&mut self,
|
|
||||||
context: &opentelemetry::Context,
|
|
||||||
contact: &str,
|
|
||||||
body: &str,
|
|
||||||
timestamp: i64,
|
|
||||||
) -> Result<bool, DbError>;
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct SqliteEmbeddingDao {
|
|
||||||
connection: Arc<Mutex<SqliteConnection>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for SqliteEmbeddingDao {
|
|
||||||
fn default() -> Self {
|
|
||||||
Self::new()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl SqliteEmbeddingDao {
|
|
||||||
pub fn new() -> Self {
|
|
||||||
SqliteEmbeddingDao {
|
|
||||||
connection: Arc::new(Mutex::new(connect())),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Serialize f32 vector to bytes for BLOB storage
|
|
||||||
fn serialize_vector(vec: &[f32]) -> Vec<u8> {
|
|
||||||
// Convert f32 slice to bytes using zerocopy
|
|
||||||
use zerocopy::IntoBytes;
|
|
||||||
vec.as_bytes().to_vec()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Deserialize bytes from BLOB back to f32 vector
|
|
||||||
fn deserialize_vector(bytes: &[u8]) -> Result<Vec<f32>, DbError> {
|
|
||||||
if !bytes.len().is_multiple_of(4) {
|
|
||||||
return Err(DbError::new(DbErrorKind::QueryError));
|
|
||||||
}
|
|
||||||
|
|
||||||
let count = bytes.len() / 4;
|
|
||||||
let mut vec = Vec::with_capacity(count);
|
|
||||||
|
|
||||||
for chunk in bytes.chunks_exact(4) {
|
|
||||||
let float = f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]);
|
|
||||||
vec.push(float);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(vec)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Compute cosine similarity between two vectors
|
|
||||||
fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
|
|
||||||
if a.len() != b.len() {
|
|
||||||
return 0.0;
|
|
||||||
}
|
|
||||||
|
|
||||||
let dot_product: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum();
|
|
||||||
let magnitude_a: f32 = a.iter().map(|x| x * x).sum::<f32>().sqrt();
|
|
||||||
let magnitude_b: f32 = b.iter().map(|x| x * x).sum::<f32>().sqrt();
|
|
||||||
|
|
||||||
if magnitude_a == 0.0 || magnitude_b == 0.0 {
|
|
||||||
return 0.0;
|
|
||||||
}
|
|
||||||
|
|
||||||
dot_product / (magnitude_a * magnitude_b)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// SQLite-backed implementation of [`EmbeddingDao`].
///
/// Stores 768-dimensional message embeddings as BLOBs in the
/// `message_embeddings` table and performs brute-force cosine-similarity
/// search in memory (every candidate row is loaded and scored).
/// All database work runs under `trace_db_call` for OpenTelemetry tracing,
/// and the underlying diesel connection is shared behind a `Mutex`.
impl EmbeddingDao for SqliteEmbeddingDao {
    /// Insert a single message embedding, deduplicating on the table's
    /// UNIQUE constraint via `INSERT OR IGNORE`.
    ///
    /// Returns the stored (or pre-existing) row as a [`MessageEmbedding`].
    /// Fails with `DbErrorKind::InsertError` on any validation or SQL error.
    fn store_message_embedding(
        &mut self,
        context: &opentelemetry::Context,
        message: InsertMessageEmbedding,
    ) -> Result<MessageEmbedding, DbError> {
        trace_db_call(context, "insert", "store_message_embedding", |_span| {
            let mut conn = self.connection.lock().expect("Unable to get EmbeddingDao");

            // Validate embedding dimensions
            // (the table stores fixed-size 768-float vectors; anything else
            // would corrupt similarity search)
            if message.embedding.len() != 768 {
                return Err(anyhow::anyhow!(
                    "Invalid embedding dimensions: {} (expected 768)",
                    message.embedding.len()
                ));
            }

            // Serialize embedding to bytes
            let embedding_bytes = Self::serialize_vector(&message.embedding);

            // Insert into message_embeddings table with BLOB
            // Use INSERT OR IGNORE to skip duplicates (based on UNIQUE constraint)
            let insert_result = diesel::sql_query(
                "INSERT OR IGNORE INTO message_embeddings (contact, body, timestamp, is_sent, embedding, created_at, model_version)
                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)"
            )
            .bind::<diesel::sql_types::Text, _>(&message.contact)
            .bind::<diesel::sql_types::Text, _>(&message.body)
            .bind::<diesel::sql_types::BigInt, _>(message.timestamp)
            .bind::<diesel::sql_types::Bool, _>(message.is_sent)
            .bind::<diesel::sql_types::Binary, _>(&embedding_bytes)
            .bind::<diesel::sql_types::BigInt, _>(message.created_at)
            .bind::<diesel::sql_types::Text, _>(&message.model_version)
            .execute(conn.deref_mut())
            .map_err(|e| anyhow::anyhow!("Insert error: {:?}", e))?;

            // If INSERT OR IGNORE skipped (duplicate), find the existing record
            // NOTE(review): `last_insert_rowid()` is per-connection and the
            // connection is held under the mutex for this whole closure, so the
            // two-step lookup should be race-free on this connection — confirm
            // the connection is never shared without the lock.
            // NOTE(review): `r.id as i32` truncates silently if the rowid ever
            // exceeds i32::MAX — TODO confirm ids fit in i32 or switch to i64.
            let row_id: i32 = if insert_result == 0 {
                // Duplicate - find the existing record
                diesel::sql_query(
                    "SELECT id FROM message_embeddings WHERE contact = ?1 AND body = ?2 AND timestamp = ?3"
                )
                .bind::<diesel::sql_types::Text, _>(&message.contact)
                .bind::<diesel::sql_types::Text, _>(&message.body)
                .bind::<diesel::sql_types::BigInt, _>(message.timestamp)
                .get_result::<LastInsertRowId>(conn.deref_mut())
                .map(|r| r.id as i32)
                .map_err(|e| anyhow::anyhow!("Failed to find existing record: {:?}", e))?
            } else {
                // New insert - get the last inserted row ID
                diesel::sql_query("SELECT last_insert_rowid() as id")
                    .get_result::<LastInsertRowId>(conn.deref_mut())
                    .map(|r| r.id as i32)
                    .map_err(|e| anyhow::anyhow!("Failed to get last insert ID: {:?}", e))?
            };

            // Return the stored message
            Ok(MessageEmbedding {
                id: row_id,
                contact: message.contact,
                body: message.body,
                timestamp: message.timestamp,
                is_sent: message.is_sent,
                created_at: message.created_at,
                model_version: message.model_version,
            })
        })
        .map_err(|_| DbError::new(DbErrorKind::InsertError))
    }

    /// Insert many embeddings inside a single transaction.
    ///
    /// Best-effort per item: rows with wrong dimensions, duplicates, and
    /// individual SQL failures are logged and skipped rather than aborting
    /// the batch. Returns the number of rows actually inserted.
    fn store_message_embeddings_batch(
        &mut self,
        context: &opentelemetry::Context,
        messages: Vec<InsertMessageEmbedding>,
    ) -> Result<usize, DbError> {
        trace_db_call(context, "insert", "store_message_embeddings_batch", |_span| {
            let mut conn = self.connection.lock().expect("Unable to get EmbeddingDao");

            // Start transaction
            // (one transaction for the whole batch — either all surviving
            // inserts commit together or, on a transaction-level error, none do)
            conn.transaction::<_, anyhow::Error, _>(|conn| {
                let mut stored_count = 0;

                for message in messages {
                    // Validate embedding dimensions
                    if message.embedding.len() != 768 {
                        log::warn!(
                            "Invalid embedding dimensions: {} (expected 768), skipping",
                            message.embedding.len()
                        );
                        continue;
                    }

                    // Serialize embedding to bytes
                    let embedding_bytes = Self::serialize_vector(&message.embedding);

                    // Insert into message_embeddings table with BLOB
                    // Use INSERT OR IGNORE to skip duplicates (based on UNIQUE constraint)
                    match diesel::sql_query(
                        "INSERT OR IGNORE INTO message_embeddings (contact, body, timestamp, is_sent, embedding, created_at, model_version)
                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)"
                    )
                    .bind::<diesel::sql_types::Text, _>(&message.contact)
                    .bind::<diesel::sql_types::Text, _>(&message.body)
                    .bind::<diesel::sql_types::BigInt, _>(message.timestamp)
                    .bind::<diesel::sql_types::Bool, _>(message.is_sent)
                    .bind::<diesel::sql_types::Binary, _>(&embedding_bytes)
                    .bind::<diesel::sql_types::BigInt, _>(message.created_at)
                    .bind::<diesel::sql_types::Text, _>(&message.model_version)
                    .execute(conn)
                    {
                        // `rows > 0` means a real insert; 0 means the UNIQUE
                        // constraint suppressed it (duplicate).
                        Ok(rows) if rows > 0 => stored_count += 1,
                        Ok(_) => {
                            // INSERT OR IGNORE skipped (duplicate)
                            log::debug!("Skipped duplicate message: {:?}", message.body.chars().take(50).collect::<String>());
                        }
                        Err(e) => {
                            log::warn!("Failed to insert message in batch: {:?}", e);
                            // Continue with other messages instead of failing entire batch
                        }
                    }
                }

                Ok(stored_count)
            })
            .map_err(|e| anyhow::anyhow!("Transaction error: {:?}", e))
        })
        .map_err(|_| DbError::new(DbErrorKind::InsertError))
    }

    /// RAG retrieval: return up to `limit` stored messages most similar to
    /// `query_embedding` (cosine similarity), optionally restricted to one
    /// contact.
    ///
    /// Pipeline, in order:
    ///   1. load candidates (all rows, or one contact's rows),
    ///   2. deserialize + score by cosine similarity, sort descending,
    ///   3. drop scores below 0.65,
    ///   4. drop short (<30 chars) and short generic-closing messages,
    ///   5. temporal diversity: at most 3 messages per calendar day,
    ///   6. take the top `limit`, logging each match.
    ///
    /// NOTE(review): this is a full table scan scored in memory — fine for
    /// modest tables, worth revisiting if the corpus grows large.
    fn find_similar_messages(
        &mut self,
        context: &opentelemetry::Context,
        query_embedding: &[f32],
        limit: usize,
        contact_filter: Option<&str>,
    ) -> Result<Vec<MessageEmbedding>, DbError> {
        trace_db_call(context, "query", "find_similar_messages", |_span| {
            let mut conn = self.connection.lock().expect("Unable to get EmbeddingDao");

            // Validate embedding dimensions
            if query_embedding.len() != 768 {
                return Err(anyhow::anyhow!(
                    "Invalid query embedding dimensions: {} (expected 768)",
                    query_embedding.len()
                ));
            }

            // Load messages with optional contact filter
            let results = if let Some(contact) = contact_filter {
                log::debug!("RAG search filtered to contact: {}", contact);
                diesel::sql_query(
                    "SELECT id, contact, body, timestamp, is_sent, embedding, created_at, model_version
                     FROM message_embeddings WHERE contact = ?1"
                )
                .bind::<diesel::sql_types::Text, _>(contact)
                .load::<MessageEmbeddingWithVectorRow>(conn.deref_mut())
                .map_err(|e| anyhow::anyhow!("Query error: {:?}", e))?
            } else {
                log::debug!("RAG search across ALL contacts (cross-perspective)");
                diesel::sql_query(
                    "SELECT id, contact, body, timestamp, is_sent, embedding, created_at, model_version
                     FROM message_embeddings"
                )
                .load::<MessageEmbeddingWithVectorRow>(conn.deref_mut())
                .map_err(|e| anyhow::anyhow!("Query error: {:?}", e))?
            };

            log::debug!("Loaded {} messages for similarity comparison", results.len());

            // Compute similarity for each message
            // Rows whose BLOB fails to deserialize are logged and dropped
            // rather than failing the whole query.
            let mut scored_messages: Vec<(f32, MessageEmbedding)> = results
                .into_iter()
                .filter_map(|row| {
                    // Deserialize the embedding BLOB
                    match Self::deserialize_vector(&row.embedding) {
                        Ok(embedding) => {
                            // Compute cosine similarity
                            let similarity = Self::cosine_similarity(query_embedding, &embedding);
                            Some((
                                similarity,
                                MessageEmbedding {
                                    id: row.id,
                                    contact: row.contact,
                                    body: row.body,
                                    timestamp: row.timestamp,
                                    is_sent: row.is_sent,
                                    created_at: row.created_at,
                                    model_version: row.model_version,
                                },
                            ))
                        }
                        Err(e) => {
                            log::warn!("Failed to deserialize embedding for message {}: {:?}", row.id, e);
                            None
                        }
                    }
                })
                .collect();

            // Sort by similarity (highest first)
            // NaN comparisons fall back to Equal so the sort cannot panic.
            scored_messages.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));

            // Log similarity score distribution
            if !scored_messages.is_empty() {
                log::info!(
                    "Similarity score distribution - Top: {:.3}, Median: {:.3}, Bottom: {:.3}",
                    scored_messages.first().map(|(s, _)| *s).unwrap_or(0.0),
                    scored_messages.get(scored_messages.len() / 2).map(|(s, _)| *s).unwrap_or(0.0),
                    scored_messages.last().map(|(s, _)| *s).unwrap_or(0.0)
                );
            }

            // Apply minimum similarity threshold
            // With single-contact embeddings, scores tend to be higher due to writing style similarity
            // Using 0.65 to get only truly semantically relevant messages
            let min_similarity = 0.65;
            let filtered_messages: Vec<(f32, MessageEmbedding)> = scored_messages
                .into_iter()
                .filter(|(similarity, _)| *similarity >= min_similarity)
                .collect();

            log::info!(
                "After similarity filtering (min_similarity={}): {} messages passed threshold",
                min_similarity,
                filtered_messages.len()
            );

            // Filter out short/generic messages (under 30 characters)
            // This removes conversational closings like "Thanks for talking" that dominate results
            // NOTE(review): `body.len()` counts bytes, not characters — for
            // non-ASCII text the effective threshold is lower; confirm intent.
            let min_message_length = 30;

            // Common closing phrases that should be excluded from RAG results
            let stop_phrases = [
                "thanks for talking",
                "thank you for talking",
                "good talking",
                "nice talking",
                "good night",
                "good morning",
                "love you",
            ];

            let filtered_messages: Vec<(f32, MessageEmbedding)> = filtered_messages
                .into_iter()
                .filter(|(_, message)| {
                    // Filter by length
                    if message.body.len() < min_message_length {
                        return false;
                    }

                    // Filter out messages that are primarily generic closings
                    let body_lower = message.body.to_lowercase();
                    for phrase in &stop_phrases {
                        // If the message contains this phrase and is short, it's likely just a closing
                        if body_lower.contains(phrase) && message.body.len() < 100 {
                            return false;
                        }
                    }

                    true
                })
                .collect();

            log::info!(
                "After length filtering (min {} chars): {} messages remain",
                min_message_length,
                filtered_messages.len()
            );

            // Apply temporal diversity filter - don't return too many messages from the same day
            // This prevents RAG from returning clusters of messages from one conversation
            let mut filtered_with_diversity = Vec::new();
            let mut dates_seen: std::collections::HashMap<chrono::NaiveDate, usize> = std::collections::HashMap::new();
            let max_per_day = 3; // Maximum 3 messages from any single day
            // Input is sorted by similarity, so each day keeps its 3 BEST matches.

            for (similarity, message) in filtered_messages.into_iter() {
                // Unparseable timestamps fall back to "today" rather than
                // being dropped (they still count against today's quota).
                let date = chrono::DateTime::from_timestamp(message.timestamp, 0)
                    .map(|dt| dt.date_naive())
                    .unwrap_or_else(|| chrono::Utc::now().date_naive());

                let count = dates_seen.entry(date).or_insert(0);
                if *count < max_per_day {
                    *count += 1;
                    filtered_with_diversity.push((similarity, message));
                }
            }

            log::info!(
                "After temporal diversity filtering (max {} per day): {} messages remain",
                max_per_day,
                filtered_with_diversity.len()
            );

            // Take top N results from diversity-filtered messages
            let top_results: Vec<MessageEmbedding> = filtered_with_diversity
                .into_iter()
                .take(limit)
                .map(|(similarity, message)| {
                    let time = chrono::DateTime::from_timestamp(message.timestamp, 0)
                        .map(|dt| dt.format("%Y-%m-%d").to_string())
                        .unwrap_or_default();
                    log::info!(
                        "RAG Match: similarity={:.3}, date={}, contact={}, body=\"{}\"",
                        similarity,
                        time,
                        message.contact,
                        &message.body.chars().take(80).collect::<String>()
                    );
                    message
                })
                .collect();

            Ok(top_results)
        })
        .map_err(|_| DbError::new(DbErrorKind::QueryError))
    }

    /// Count stored embeddings for one contact.
    fn get_message_count(
        &mut self,
        context: &opentelemetry::Context,
        contact: &str,
    ) -> Result<i64, DbError> {
        trace_db_call(context, "query", "get_message_count", |_span| {
            let mut conn = self.connection.lock().expect("Unable to get EmbeddingDao");

            let count = diesel::sql_query(
                "SELECT COUNT(*) as count FROM message_embeddings WHERE contact = ?1",
            )
            .bind::<diesel::sql_types::Text, _>(contact)
            .get_result::<CountResult>(conn.deref_mut())
            .map(|r| r.count)
            .map_err(|e| anyhow::anyhow!("Count query error: {:?}", e))?;

            Ok(count)
        })
        .map_err(|_| DbError::new(DbErrorKind::QueryError))
    }

    /// True if at least one embedding exists for `contact`.
    /// Thin wrapper over [`Self::get_message_count`].
    fn has_embeddings_for_contact(
        &mut self,
        context: &opentelemetry::Context,
        contact: &str,
    ) -> Result<bool, DbError> {
        self.get_message_count(context, contact)
            .map(|count| count > 0)
    }

    /// True if a row with this exact (contact, body, timestamp) triple exists —
    /// the same key `INSERT OR IGNORE` deduplicates on.
    fn message_exists(
        &mut self,
        context: &opentelemetry::Context,
        contact: &str,
        body: &str,
        timestamp: i64,
    ) -> Result<bool, DbError> {
        trace_db_call(context, "query", "message_exists", |_span| {
            let mut conn = self.connection.lock().expect("Unable to get EmbeddingDao");

            let count = diesel::sql_query(
                "SELECT COUNT(*) as count FROM message_embeddings
                 WHERE contact = ?1 AND body = ?2 AND timestamp = ?3",
            )
            .bind::<diesel::sql_types::Text, _>(contact)
            .bind::<diesel::sql_types::Text, _>(body)
            .bind::<diesel::sql_types::BigInt, _>(timestamp)
            .get_result::<CountResult>(conn.deref_mut())
            .map(|r| r.count)
            .map_err(|e| anyhow::anyhow!("Count query error: {:?}", e))?;

            Ok(count > 0)
        })
        .map_err(|_| DbError::new(DbErrorKind::QueryError))
    }
}
|
|
||||||
|
|
||||||
// Helper structs for raw SQL queries
|
|
||||||
|
|
||||||
/// Row shape for `SELECT last_insert_rowid() as id` (and the fallback
/// `SELECT id FROM message_embeddings …` duplicate lookup).
/// Column mapping is by name (`id`), per `QueryableByName`.
#[derive(QueryableByName)]
struct LastInsertRowId {
    #[diesel(sql_type = diesel::sql_types::BigInt)]
    // SQLite rowids are 64-bit; callers downcast to i32 with `as` —
    // see NOTE in store_message_embedding.
    id: i64,
}
|
|
||||||
|
|
||||||
/// Raw-SQL row for `message_embeddings` WITHOUT the embedding BLOB —
/// for queries that only need message metadata.
/// NOTE(review): no query in this chunk loads this type (similarity search
/// uses `MessageEmbeddingWithVectorRow`); confirm it is still referenced
/// elsewhere before assuming it is dead code.
#[derive(QueryableByName)]
struct MessageEmbeddingRow {
    #[diesel(sql_type = diesel::sql_types::Integer)]
    id: i32,
    #[diesel(sql_type = diesel::sql_types::Text)]
    contact: String,
    #[diesel(sql_type = diesel::sql_types::Text)]
    body: String,
    #[diesel(sql_type = diesel::sql_types::BigInt)]
    // Unix timestamp of the message (seconds, per the chrono::from_timestamp
    // usage in find_similar_messages).
    timestamp: i64,
    #[diesel(sql_type = diesel::sql_types::Bool)]
    // true = outgoing message, false = received — presumed from the name;
    // TODO confirm against the writer.
    is_sent: bool,
    #[diesel(sql_type = diesel::sql_types::BigInt)]
    created_at: i64,
    #[diesel(sql_type = diesel::sql_types::Text)]
    // Identifier of the embedding model that produced the vector.
    model_version: String,
}
|
|
||||||
|
|
||||||
/// Raw-SQL row for `message_embeddings` INCLUDING the embedding BLOB.
/// Loaded by `find_similar_messages`; the `embedding` bytes are decoded back
/// to `Vec<f32>` via `Self::deserialize_vector` before scoring.
#[derive(QueryableByName)]
struct MessageEmbeddingWithVectorRow {
    #[diesel(sql_type = diesel::sql_types::Integer)]
    id: i32,
    #[diesel(sql_type = diesel::sql_types::Text)]
    contact: String,
    #[diesel(sql_type = diesel::sql_types::Text)]
    body: String,
    #[diesel(sql_type = diesel::sql_types::BigInt)]
    // Unix timestamp (seconds) of the original message.
    timestamp: i64,
    #[diesel(sql_type = diesel::sql_types::Bool)]
    is_sent: bool,
    #[diesel(sql_type = diesel::sql_types::Binary)]
    // Serialized 768-float vector (format defined by serialize_vector /
    // deserialize_vector).
    embedding: Vec<u8>,
    #[diesel(sql_type = diesel::sql_types::BigInt)]
    created_at: i64,
    #[diesel(sql_type = diesel::sql_types::Text)]
    model_version: String,
}
|
|
||||||
|
|
||||||
/// Row shape for `SELECT COUNT(*) as count …` queries
/// (get_message_count, message_exists). Column mapped by name (`count`).
#[derive(QueryableByName)]
struct CountResult {
    #[diesel(sql_type = diesel::sql_types::BigInt)]
    count: i64,
}
|
|
||||||
@@ -214,12 +214,13 @@ impl LocationHistoryDao for SqliteLocationHistoryDao {
|
|||||||
|
|
||||||
// Validate embedding dimensions if provided (rare for location data)
|
// Validate embedding dimensions if provided (rare for location data)
|
||||||
if let Some(ref emb) = location.embedding
|
if let Some(ref emb) = location.embedding
|
||||||
&& emb.len() != 768 {
|
&& emb.len() != 768
|
||||||
return Err(anyhow::anyhow!(
|
{
|
||||||
"Invalid embedding dimensions: {} (expected 768)",
|
return Err(anyhow::anyhow!(
|
||||||
emb.len()
|
"Invalid embedding dimensions: {} (expected 768)",
|
||||||
));
|
emb.len()
|
||||||
}
|
));
|
||||||
|
}
|
||||||
|
|
||||||
let embedding_bytes = location
|
let embedding_bytes = location
|
||||||
.embedding
|
.embedding
|
||||||
@@ -289,13 +290,14 @@ impl LocationHistoryDao for SqliteLocationHistoryDao {
|
|||||||
for location in locations {
|
for location in locations {
|
||||||
// Validate embedding if provided (rare)
|
// Validate embedding if provided (rare)
|
||||||
if let Some(ref emb) = location.embedding
|
if let Some(ref emb) = location.embedding
|
||||||
&& emb.len() != 768 {
|
&& emb.len() != 768
|
||||||
log::warn!(
|
{
|
||||||
"Skipping location with invalid embedding dimensions: {}",
|
log::warn!(
|
||||||
emb.len()
|
"Skipping location with invalid embedding dimensions: {}",
|
||||||
);
|
emb.len()
|
||||||
continue;
|
);
|
||||||
}
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
let embedding_bytes = location
|
let embedding_bytes = location
|
||||||
.embedding
|
.embedding
|
||||||
|
|||||||
@@ -11,7 +11,6 @@ use crate::otel::trace_db_call;
|
|||||||
|
|
||||||
pub mod calendar_dao;
|
pub mod calendar_dao;
|
||||||
pub mod daily_summary_dao;
|
pub mod daily_summary_dao;
|
||||||
pub mod embeddings_dao;
|
|
||||||
pub mod insights_dao;
|
pub mod insights_dao;
|
||||||
pub mod location_dao;
|
pub mod location_dao;
|
||||||
pub mod models;
|
pub mod models;
|
||||||
@@ -20,7 +19,6 @@ pub mod search_dao;
|
|||||||
|
|
||||||
pub use calendar_dao::{CalendarEventDao, SqliteCalendarEventDao};
|
pub use calendar_dao::{CalendarEventDao, SqliteCalendarEventDao};
|
||||||
pub use daily_summary_dao::{DailySummaryDao, InsertDailySummary, SqliteDailySummaryDao};
|
pub use daily_summary_dao::{DailySummaryDao, InsertDailySummary, SqliteDailySummaryDao};
|
||||||
pub use embeddings_dao::{EmbeddingDao, InsertMessageEmbedding};
|
|
||||||
pub use insights_dao::{InsightDao, SqliteInsightDao};
|
pub use insights_dao::{InsightDao, SqliteInsightDao};
|
||||||
pub use location_dao::{LocationHistoryDao, SqliteLocationHistoryDao};
|
pub use location_dao::{LocationHistoryDao, SqliteLocationHistoryDao};
|
||||||
pub use search_dao::{SearchHistoryDao, SqliteSearchHistoryDao};
|
pub use search_dao::{SearchHistoryDao, SqliteSearchHistoryDao};
|
||||||
|
|||||||
17
src/files.rs
17
src/files.rs
@@ -233,12 +233,8 @@ pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
|
|||||||
if let (Some(photo_lat), Some(photo_lon)) =
|
if let (Some(photo_lat), Some(photo_lon)) =
|
||||||
(exif.gps_latitude, exif.gps_longitude)
|
(exif.gps_latitude, exif.gps_longitude)
|
||||||
{
|
{
|
||||||
let distance = haversine_distance(
|
let distance =
|
||||||
lat,
|
haversine_distance(lat, lon, photo_lat as f64, photo_lon as f64);
|
||||||
lon,
|
|
||||||
photo_lat as f64,
|
|
||||||
photo_lon as f64,
|
|
||||||
);
|
|
||||||
distance <= radius_km
|
distance <= radius_km
|
||||||
} else {
|
} else {
|
||||||
false
|
false
|
||||||
@@ -410,9 +406,13 @@ pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
|
|||||||
)
|
)
|
||||||
})
|
})
|
||||||
.map(|path: &PathBuf| {
|
.map(|path: &PathBuf| {
|
||||||
let relative = path.strip_prefix(&app_state.base_path).unwrap_or_else(|_| panic!("Unable to strip base path {} from file path {}",
|
let relative = path.strip_prefix(&app_state.base_path).unwrap_or_else(|_| {
|
||||||
|
panic!(
|
||||||
|
"Unable to strip base path {} from file path {}",
|
||||||
&app_state.base_path.path(),
|
&app_state.base_path.path(),
|
||||||
path.display()));
|
path.display()
|
||||||
|
)
|
||||||
|
});
|
||||||
relative.to_path_buf()
|
relative.to_path_buf()
|
||||||
})
|
})
|
||||||
.map(|f| f.to_str().unwrap().to_string())
|
.map(|f| f.to_str().unwrap().to_string())
|
||||||
@@ -791,6 +791,7 @@ pub struct RealFileSystem {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl RealFileSystem {
|
impl RealFileSystem {
|
||||||
|
#[allow(dead_code)] // Used in main.rs binary and tests
|
||||||
pub(crate) fn new(base_path: String) -> RealFileSystem {
|
pub(crate) fn new(base_path: String) -> RealFileSystem {
|
||||||
RealFileSystem { base_path }
|
RealFileSystem { base_path }
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -34,6 +34,7 @@ struct LocationPoint {
|
|||||||
#[derive(Debug, Deserialize)]
|
#[derive(Debug, Deserialize)]
|
||||||
struct ActivityRecord {
|
struct ActivityRecord {
|
||||||
activity: Vec<ActivityType>,
|
activity: Vec<ActivityType>,
|
||||||
|
#[allow(dead_code)] // Part of JSON structure, may be used in future
|
||||||
timestamp_ms: Option<String>,
|
timestamp_ms: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -30,34 +30,37 @@ pub fn parse_search_html(path: &str) -> Result<Vec<ParsedSearchRecord>> {
|
|||||||
|
|
||||||
// Strategy 2: Look for outer-cell structure (older format)
|
// Strategy 2: Look for outer-cell structure (older format)
|
||||||
if records.is_empty()
|
if records.is_empty()
|
||||||
&& let Ok(outer_selector) = Selector::parse("div.outer-cell") {
|
&& let Ok(outer_selector) = Selector::parse("div.outer-cell")
|
||||||
for cell in document.select(&outer_selector) {
|
{
|
||||||
if let Some(record) = parse_outer_cell(&cell) {
|
for cell in document.select(&outer_selector) {
|
||||||
records.push(record);
|
if let Some(record) = parse_outer_cell(&cell) {
|
||||||
}
|
records.push(record);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Strategy 3: Generic approach - look for links and timestamps
|
// Strategy 3: Generic approach - look for links and timestamps
|
||||||
if records.is_empty()
|
if records.is_empty()
|
||||||
&& let Ok(link_selector) = Selector::parse("a") {
|
&& let Ok(link_selector) = Selector::parse("a")
|
||||||
for link in document.select(&link_selector) {
|
{
|
||||||
if let Some(href) = link.value().attr("href") {
|
for link in document.select(&link_selector) {
|
||||||
// Check if it's a search URL
|
if let Some(href) = link.value().attr("href") {
|
||||||
if (href.contains("google.com/search?q=") || href.contains("search?q="))
|
// Check if it's a search URL
|
||||||
&& let Some(query) = extract_query_from_url(href) {
|
if (href.contains("google.com/search?q=") || href.contains("search?q="))
|
||||||
// Try to find nearby timestamp
|
&& let Some(query) = extract_query_from_url(href)
|
||||||
let timestamp = find_nearby_timestamp(&link);
|
{
|
||||||
|
// Try to find nearby timestamp
|
||||||
|
let timestamp = find_nearby_timestamp(&link);
|
||||||
|
|
||||||
records.push(ParsedSearchRecord {
|
records.push(ParsedSearchRecord {
|
||||||
timestamp: timestamp.unwrap_or_else(|| Utc::now().timestamp()),
|
timestamp: timestamp.unwrap_or_else(|| Utc::now().timestamp()),
|
||||||
query,
|
query,
|
||||||
search_engine: Some("Google".to_string()),
|
search_engine: Some("Google".to_string()),
|
||||||
});
|
});
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Ok(records)
|
Ok(records)
|
||||||
}
|
}
|
||||||
@@ -118,11 +121,12 @@ fn extract_query_from_url(url: &str) -> Option<String> {
|
|||||||
fn find_nearby_timestamp(element: &scraper::ElementRef) -> Option<i64> {
|
fn find_nearby_timestamp(element: &scraper::ElementRef) -> Option<i64> {
|
||||||
// Look for timestamp in parent or sibling elements
|
// Look for timestamp in parent or sibling elements
|
||||||
if let Some(parent) = element.parent()
|
if let Some(parent) = element.parent()
|
||||||
&& parent.value().as_element().is_some() {
|
&& parent.value().as_element().is_some()
|
||||||
let parent_ref = scraper::ElementRef::wrap(parent)?;
|
{
|
||||||
let text = parent_ref.text().collect::<Vec<_>>().join(" ");
|
let parent_ref = scraper::ElementRef::wrap(parent)?;
|
||||||
return parse_timestamp_from_text(&text);
|
let text = parent_ref.text().collect::<Vec<_>>().join(" ");
|
||||||
}
|
return parse_timestamp_from_text(&text);
|
||||||
|
}
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -135,9 +139,10 @@ fn parse_timestamp_from_text(text: &str) -> Option<i64> {
|
|||||||
if let Some(iso_match) = text
|
if let Some(iso_match) = text
|
||||||
.split_whitespace()
|
.split_whitespace()
|
||||||
.find(|s| s.contains('T') && s.contains('-'))
|
.find(|s| s.contains('T') && s.contains('-'))
|
||||||
&& let Ok(dt) = DateTime::parse_from_rfc3339(iso_match) {
|
&& let Ok(dt) = DateTime::parse_from_rfc3339(iso_match)
|
||||||
return Some(dt.timestamp());
|
{
|
||||||
}
|
return Some(dt.timestamp());
|
||||||
|
}
|
||||||
|
|
||||||
// Try common date patterns
|
// Try common date patterns
|
||||||
let patterns = [
|
let patterns = [
|
||||||
@@ -149,9 +154,10 @@ fn parse_timestamp_from_text(text: &str) -> Option<i64> {
|
|||||||
for pattern in patterns {
|
for pattern in patterns {
|
||||||
// Extract potential date string
|
// Extract potential date string
|
||||||
if let Some(date_part) = extract_date_substring(text)
|
if let Some(date_part) = extract_date_substring(text)
|
||||||
&& let Ok(dt) = NaiveDateTime::parse_from_str(&date_part, pattern) {
|
&& let Ok(dt) = NaiveDateTime::parse_from_str(&date_part, pattern)
|
||||||
return Some(dt.and_utc().timestamp());
|
{
|
||||||
}
|
return Some(dt.and_utc().timestamp());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
None
|
None
|
||||||
|
|||||||
Reference in New Issue
Block a user