Add Google Takeout data import infrastructure
Implements Phase 1 & 2 of Google Takeout RAG integration:
- Database migrations for calendar_events, location_history, search_history
- DAO implementations with hybrid time + semantic search
- Parsers for .ics, JSON, and HTML Google Takeout formats
- Import utilities with batch insert optimization

Features:
- CalendarEventDao: Hybrid time-range + semantic search for events
- LocationHistoryDao: GPS proximity with Haversine distance calculation
- SearchHistoryDao: Semantic-first search (queries are embedding-rich)
- Batch inserts for performance (1M+ records in minutes vs hours)
- OpenTelemetry tracing for all database operations

Import utilities:
- import_calendar: Parse .ics with optional embedding generation
- import_location_history: High-volume GPS data with batch inserts
- import_search_history: Always generates embeddings for semantic search

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
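Note: the Haversine-based GPS proximity mentioned for LocationHistoryDao is not itself part of this diff. A minimal sketch of the distance calculation it presumably relies on (function name and placement are illustrative, not taken from the codebase):

/// Great-circle distance in meters between two GPS points (Haversine formula).
/// Illustrative sketch only; the real DAO may evaluate this inside SQL instead.
fn haversine_m(lat1: f64, lon1: f64, lat2: f64, lon2: f64) -> f64 {
    const EARTH_RADIUS_M: f64 = 6_371_000.0;
    let (dlat, dlon) = ((lat2 - lat1).to_radians(), (lon2 - lon1).to_radians());
    let a = (dlat / 2.0).sin().powi(2)
        + lat1.to_radians().cos() * lat2.to_radians().cos() * (dlon / 2.0).sin().powi(2);
    EARTH_RADIUS_M * 2.0 * a.sqrt().atan2((1.0 - a).sqrt())
}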
src/bin/import_calendar.rs (new file, 167 lines)
@@ -0,0 +1,167 @@
use anyhow::{Context, Result};
use chrono::Utc;
use clap::Parser;
use image_api::ai::ollama::OllamaClient;
use image_api::database::calendar_dao::{InsertCalendarEvent, SqliteCalendarEventDao};
use image_api::parsers::ical_parser::parse_ics_file;
use log::{error, info};
use std::sync::{Arc, Mutex};

// Import the trait to use its methods
use image_api::database::CalendarEventDao;

#[derive(Parser, Debug)]
#[command(author, version, about = "Import Google Takeout Calendar data", long_about = None)]
struct Args {
    /// Path to the .ics calendar file
    #[arg(short, long)]
    path: String,

    /// Generate embeddings for calendar events (slower but enables semantic search)
    #[arg(long, default_value = "false")]
    generate_embeddings: bool,

    /// Skip events that already exist in the database
    #[arg(long, default_value = "true")]
    skip_existing: bool,

    /// Batch size for embedding generation
    #[arg(long, default_value = "128")]
    batch_size: usize,
}

#[tokio::main]
async fn main() -> Result<()> {
    dotenv::dotenv().ok();
    env_logger::init();

    let args = Args::parse();

    info!("Parsing calendar file: {}", args.path);
    let events = parse_ics_file(&args.path).context("Failed to parse .ics file")?;

    info!("Found {} calendar events", events.len());

    let context = opentelemetry::Context::current();

    let ollama = if args.generate_embeddings {
        let primary_url = dotenv::var("OLLAMA_PRIMARY_URL")
            .or_else(|_| dotenv::var("OLLAMA_URL"))
            .unwrap_or_else(|_| "http://localhost:11434".to_string());
        let fallback_url = dotenv::var("OLLAMA_FALLBACK_URL").ok();
        let primary_model = dotenv::var("OLLAMA_PRIMARY_MODEL")
            .or_else(|_| dotenv::var("OLLAMA_MODEL"))
            .unwrap_or_else(|_| "nomic-embed-text:v1.5".to_string());
        let fallback_model = dotenv::var("OLLAMA_FALLBACK_MODEL").ok();

        Some(OllamaClient::new(
            primary_url,
            fallback_url,
            primary_model,
            fallback_model,
        ))
    } else {
        None
    };

    let inserted_count = Arc::new(Mutex::new(0));
    let skipped_count = Arc::new(Mutex::new(0));
    let error_count = Arc::new(Mutex::new(0));

    // Process events in batches
    // Can't use rayon with async, so process sequentially
    for event in &events {
        let mut dao_instance = SqliteCalendarEventDao::new();

        // Check if event exists
        if args.skip_existing {
            if let Ok(exists) = dao_instance.event_exists(
                &context,
                event.event_uid.as_deref().unwrap_or(""),
                event.start_time,
            ) {
                if exists {
                    *skipped_count.lock().unwrap() += 1;
                    continue;
                }
            }
        }

        // Generate embedding if requested (blocking call)
        let embedding = if let Some(ref ollama_client) = ollama {
            let text = format!(
                "{} {} {}",
                event.summary,
                event.description.as_deref().unwrap_or(""),
                event.location.as_deref().unwrap_or("")
            );

            match tokio::task::block_in_place(|| {
                tokio::runtime::Handle::current()
                    .block_on(async { ollama_client.generate_embedding(&text).await })
            }) {
                Ok(emb) => Some(emb),
                Err(e) => {
                    error!(
                        "Failed to generate embedding for event '{}': {}",
                        event.summary, e
                    );
                    None
                }
            }
        } else {
            None
        };

        // Insert into database
        let insert_event = InsertCalendarEvent {
            event_uid: event.event_uid.clone(),
            summary: event.summary.clone(),
            description: event.description.clone(),
            location: event.location.clone(),
            start_time: event.start_time,
            end_time: event.end_time,
            all_day: event.all_day,
            organizer: event.organizer.clone(),
            attendees: if event.attendees.is_empty() {
                None
            } else {
                Some(serde_json::to_string(&event.attendees).unwrap_or_default())
            },
            embedding,
            created_at: Utc::now().timestamp(),
            source_file: Some(args.path.clone()),
        };

        match dao_instance.store_event(&context, insert_event) {
            Ok(_) => {
                *inserted_count.lock().unwrap() += 1;
                if *inserted_count.lock().unwrap() % 100 == 0 {
                    info!("Imported {} events...", *inserted_count.lock().unwrap());
                }
            }
            Err(e) => {
                error!("Failed to store event '{}': {:?}", event.summary, e);
                *error_count.lock().unwrap() += 1;
            }
        }
    }

    let final_inserted = *inserted_count.lock().unwrap();
    let final_skipped = *skipped_count.lock().unwrap();
    let final_errors = *error_count.lock().unwrap();

    info!("\n=== Import Summary ===");
    info!("Total events found: {}", events.len());
    info!("Successfully inserted: {}", final_inserted);
    info!("Skipped (already exist): {}", final_skipped);
    info!("Errors: {}", final_errors);

    if args.generate_embeddings {
        info!("Embeddings were generated for semantic search");
    } else {
        info!("No embeddings generated (use --generate-embeddings to enable semantic search)");
    }

    Ok(())
}
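Note: the hybrid time-range + semantic search that the commit message attributes to CalendarEventDao is not part of this diff. The sketch below shows one plausible shape of such a query under assumed table and column names (calendar_events, start_time, embedding packed as little-endian f32 bytes), filtering by time window in SQLite and re-ranking the survivors by cosine similarity in Rust; the rusqlite dependency is an assumption.

use rusqlite::{params, Connection, Result};

// Hybrid search sketch: a cheap time-range filter in SQL, then a semantic
// re-rank of surviving rows against the query embedding. Names and the
// embedding encoding are illustrative, not taken from the real DAO.
fn hybrid_event_search(
    conn: &Connection,
    from_ts: i64,
    to_ts: i64,
    query_emb: &[f32],
    limit: usize,
) -> Result<Vec<(String, f32)>> {
    let mut stmt = conn.prepare(
        "SELECT summary, embedding FROM calendar_events
         WHERE start_time BETWEEN ?1 AND ?2 AND embedding IS NOT NULL",
    )?;
    let mut scored: Vec<(String, f32)> = stmt
        .query_map(params![from_ts, to_ts], |row| {
            let summary: String = row.get(0)?;
            let blob: Vec<u8> = row.get(1)?;
            // Assumed encoding: little-endian f32s packed back to back.
            let emb: Vec<f32> = blob
                .chunks_exact(4)
                .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]]))
                .collect();
            Ok((summary, cosine(query_emb, &emb)))
        })?
        .collect::<Result<_>>()?;
    scored.sort_by(|a, b| b.1.total_cmp(&a.1));
    scored.truncate(limit);
    Ok(scored)
}

fn cosine(a: &[f32], b: &[f32]) -> f32 {
    let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
    let na = a.iter().map(|x| x * x).sum::<f32>().sqrt();
    let nb = b.iter().map(|x| x * x).sum::<f32>().sqrt();
    if na == 0.0 || nb == 0.0 { 0.0 } else { dot / (na * nb) }
}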
src/bin/import_location_history.rs (new file, 115 lines)
@@ -0,0 +1,115 @@
use anyhow::{Context, Result};
use chrono::Utc;
use clap::Parser;
use image_api::database::location_dao::{InsertLocationRecord, SqliteLocationHistoryDao};
use image_api::parsers::location_json_parser::parse_location_json;
use log::{error, info};
// Import the trait to use its methods
use image_api::database::LocationHistoryDao;

#[derive(Parser, Debug)]
#[command(author, version, about = "Import Google Takeout Location History data", long_about = None)]
struct Args {
    /// Path to the Location History JSON file
    #[arg(short, long)]
    path: String,

    /// Skip locations that already exist in the database
    #[arg(long, default_value = "true")]
    skip_existing: bool,

    /// Batch size for database inserts
    #[arg(long, default_value = "1000")]
    batch_size: usize,
}

#[tokio::main]
async fn main() -> Result<()> {
    dotenv::dotenv().ok();
    env_logger::init();

    let args = Args::parse();

    info!("Parsing location history file: {}", args.path);
    let locations =
        parse_location_json(&args.path).context("Failed to parse location history JSON")?;

    info!("Found {} location records", locations.len());

    let context = opentelemetry::Context::current();

    let mut inserted_count = 0;
    let mut skipped_count = 0;
    let mut error_count = 0;

    let mut dao_instance = SqliteLocationHistoryDao::new();
    let created_at = Utc::now().timestamp();

    // Process in batches using batch insert for massive speedup
    for (batch_idx, chunk) in locations.chunks(args.batch_size).enumerate() {
        info!(
            "Processing batch {} ({} records)...",
            batch_idx + 1,
            chunk.len()
        );

        // Convert to InsertLocationRecord
        let mut batch_inserts = Vec::with_capacity(chunk.len());

        for location in chunk {
            // Check for existing records if requested (the per-row lookup slows the import considerably)
            if args.skip_existing {
                if let Ok(exists) = dao_instance.location_exists(
                    &context,
                    location.timestamp,
                    location.latitude,
                    location.longitude,
                ) {
                    if exists {
                        skipped_count += 1;
                        continue;
                    }
                }
            }

            batch_inserts.push(InsertLocationRecord {
                timestamp: location.timestamp,
                latitude: location.latitude,
                longitude: location.longitude,
                accuracy: location.accuracy,
                activity: location.activity.clone(),
                activity_confidence: location.activity_confidence,
                place_name: None,
                place_category: None,
                embedding: None,
                created_at,
                source_file: Some(args.path.clone()),
            });
        }

        // Batch insert entire chunk in single transaction
        if !batch_inserts.is_empty() {
            match dao_instance.store_locations_batch(&context, batch_inserts) {
                Ok(count) => {
                    inserted_count += count;
                    info!(
                        "Imported {} locations (total: {})...",
                        count, inserted_count
                    );
                }
                Err(e) => {
                    error!("Failed to store batch: {:?}", e);
                    error_count += chunk.len();
                }
            }
        }
    }

    info!("\n=== Import Summary ===");
    info!("Total locations found: {}", locations.len());
    info!("Successfully inserted: {}", inserted_count);
    info!("Skipped (already exist): {}", skipped_count);
    info!("Errors: {}", error_count);

    Ok(())
}
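Note: store_locations_batch is the optimization the commit message credits for importing 1M+ records in minutes rather than hours. Its implementation is not in this diff; a minimal sketch of the single-transaction pattern it presumably uses, assuming rusqlite and illustrative table/column names:

use rusqlite::{params, Connection, Result};

// Prepare once, execute many times inside one transaction: this avoids a
// journal flush per row, which is what makes million-row imports fast.
// Illustrative sketch only; the real DAO may use a different driver or schema.
fn insert_batch(conn: &mut Connection, rows: &[(i64, f64, f64)]) -> Result<usize> {
    let tx = conn.transaction()?;
    let mut inserted = 0;
    {
        let mut stmt = tx.prepare(
            "INSERT INTO location_history (timestamp, latitude, longitude) VALUES (?1, ?2, ?3)",
        )?;
        for (ts, lat, lon) in rows {
            inserted += stmt.execute(params![ts, lat, lon])?;
        }
    }
    tx.commit()?;
    Ok(inserted)
}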
src/bin/import_search_history.rs (new file, 154 lines)
@@ -0,0 +1,154 @@
use anyhow::{Context, Result};
use chrono::Utc;
use clap::Parser;
use image_api::ai::ollama::OllamaClient;
use image_api::database::search_dao::{InsertSearchRecord, SqliteSearchHistoryDao};
use image_api::parsers::search_html_parser::parse_search_html;
use log::{error, info, warn};

// Import the trait to use its methods
use image_api::database::SearchHistoryDao;

#[derive(Parser, Debug)]
#[command(author, version, about = "Import Google Takeout Search History data", long_about = None)]
struct Args {
    /// Path to the search history HTML file
    #[arg(short, long)]
    path: String,

    /// Skip searches that already exist in the database
    #[arg(long, default_value = "true")]
    skip_existing: bool,

    /// Batch size for embedding generation (max 128 recommended)
    #[arg(long, default_value = "64")]
    batch_size: usize,
}

#[tokio::main]
async fn main() -> Result<()> {
    dotenv::dotenv().ok();
    env_logger::init();

    let args = Args::parse();

    info!("Parsing search history file: {}", args.path);
    let searches = parse_search_html(&args.path).context("Failed to parse search history HTML")?;

    info!("Found {} search records", searches.len());

    let primary_url = dotenv::var("OLLAMA_PRIMARY_URL")
        .or_else(|_| dotenv::var("OLLAMA_URL"))
        .unwrap_or_else(|_| "http://localhost:11434".to_string());
    let fallback_url = dotenv::var("OLLAMA_FALLBACK_URL").ok();
    let primary_model = dotenv::var("OLLAMA_PRIMARY_MODEL")
        .or_else(|_| dotenv::var("OLLAMA_MODEL"))
        .unwrap_or_else(|_| "nomic-embed-text:v1.5".to_string());
    let fallback_model = dotenv::var("OLLAMA_FALLBACK_MODEL").ok();

    let ollama = OllamaClient::new(primary_url, fallback_url, primary_model, fallback_model);
    let context = opentelemetry::Context::current();

    let mut inserted_count = 0;
    let mut skipped_count = 0;
    let mut error_count = 0;

    let mut dao_instance = SqliteSearchHistoryDao::new();
    let created_at = Utc::now().timestamp();

    // Process searches in batches (embeddings are REQUIRED for searches)
    for (batch_idx, chunk) in searches.chunks(args.batch_size).enumerate() {
        info!(
            "Processing batch {} ({} searches)...",
            batch_idx + 1,
            chunk.len()
        );

        // Generate embeddings for this batch
        let queries: Vec<String> = chunk.iter().map(|s| s.query.clone()).collect();

        let embeddings_result = tokio::task::spawn({
            let ollama_client = ollama.clone();
            async move {
                // Generate embeddings sequentially for the batch
                let mut embeddings = Vec::new();
                for query in &queries {
                    match ollama_client.generate_embedding(query).await {
                        Ok(emb) => embeddings.push(Some(emb)),
                        Err(e) => {
                            warn!("Failed to generate embedding for query '{}': {}", query, e);
                            embeddings.push(None);
                        }
                    }
                }
                embeddings
            }
        })
        .await
        .context("Failed to generate embeddings for batch")?;

        // Build batch of searches with embeddings
        let mut batch_inserts = Vec::new();

        for (search, embedding_opt) in chunk.iter().zip(embeddings_result.iter()) {
            // Check if search exists (optional, for speed)
            if args.skip_existing {
                if let Ok(exists) =
                    dao_instance.search_exists(&context, search.timestamp, &search.query)
                {
                    if exists {
                        skipped_count += 1;
                        continue;
                    }
                }
            }

            // Only insert if we have an embedding
            if let Some(embedding) = embedding_opt {
                batch_inserts.push(InsertSearchRecord {
                    timestamp: search.timestamp,
                    query: search.query.clone(),
                    search_engine: search.search_engine.clone(),
                    embedding: embedding.clone(),
                    created_at,
                    source_file: Some(args.path.clone()),
                });
            } else {
                error!(
                    "Skipping search '{}' due to missing embedding",
                    search.query
                );
                error_count += 1;
            }
        }

        // Batch insert entire chunk in single transaction
        if !batch_inserts.is_empty() {
            match dao_instance.store_searches_batch(&context, batch_inserts) {
                Ok(count) => {
                    inserted_count += count;
                    info!("Imported {} searches (total: {})...", count, inserted_count);
                }
                Err(e) => {
                    error!("Failed to store batch: {:?}", e);
                    error_count += chunk.len();
                }
            }
        }

        // Rate limiting between batches
        if batch_idx < searches.len() / args.batch_size {
            info!("Waiting 500ms before next batch...");
            tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
        }
    }

    info!("\n=== Import Summary ===");
    info!("Total searches found: {}", searches.len());
    info!("Successfully inserted: {}", inserted_count);
    info!("Skipped (already exist): {}", skipped_count);
    info!("Errors: {}", error_count);
    info!("All imported searches have embeddings for semantic search");

    Ok(())
}
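Note: OllamaClient::generate_embedding is defined outside this diff. As a rough sketch of what it presumably does against a local Ollama server (the /api/embeddings endpoint and field names match Ollama's documented legacy embeddings API, but treating them as this client's behavior is an assumption; the real client adds primary/fallback handling and tracing):

use serde::Deserialize;

#[derive(Deserialize)]
struct EmbeddingResponse {
    embedding: Vec<f32>,
}

// Minimal embedding request sketch using reqwest; illustrative only.
async fn generate_embedding(base_url: &str, model: &str, text: &str) -> anyhow::Result<Vec<f32>> {
    let resp: EmbeddingResponse = reqwest::Client::new()
        .post(format!("{}/api/embeddings", base_url))
        .json(&serde_json::json!({ "model": model, "prompt": text }))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    Ok(resp.embedding)
}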
@@ -102,11 +102,11 @@ fn main() -> anyhow::Result<()> {
         width: exif_data.width,
         height: exif_data.height,
         orientation: exif_data.orientation,
-        gps_latitude: exif_data.gps_latitude,
-        gps_longitude: exif_data.gps_longitude,
-        gps_altitude: exif_data.gps_altitude,
-        focal_length: exif_data.focal_length,
-        aperture: exif_data.aperture,
+        gps_latitude: exif_data.gps_latitude.map(|v| v as f32),
+        gps_longitude: exif_data.gps_longitude.map(|v| v as f32),
+        gps_altitude: exif_data.gps_altitude.map(|v| v as f32),
+        focal_length: exif_data.focal_length.map(|v| v as f32),
+        aperture: exif_data.aperture.map(|v| v as f32),
         shutter_speed: exif_data.shutter_speed,
         iso: exif_data.iso,
         date_taken: exif_data.date_taken,