use anyhow::{Context, Result}; use chrono::Utc; use clap::Parser; use image_api::ai::ollama::OllamaClient; use image_api::database::search_dao::{InsertSearchRecord, SqliteSearchHistoryDao}; use image_api::parsers::search_html_parser::parse_search_html; use log::{error, info, warn}; // Import the trait to use its methods use image_api::database::SearchHistoryDao; #[derive(Parser, Debug)] #[command(author, version, about = "Import Google Takeout Search History data", long_about = None)] struct Args { /// Path to the search history HTML file #[arg(short, long)] path: String, /// Skip searches that already exist in the database #[arg(long, default_value = "true")] skip_existing: bool, /// Batch size for embedding generation (max 128 recommended) #[arg(long, default_value = "64")] batch_size: usize, } #[tokio::main] async fn main() -> Result<()> { dotenv::dotenv().ok(); env_logger::init(); let args = Args::parse(); info!("Parsing search history file: {}", args.path); let searches = parse_search_html(&args.path).context("Failed to parse search history HTML")?; info!("Found {} search records", searches.len()); let primary_url = dotenv::var("OLLAMA_PRIMARY_URL") .or_else(|_| dotenv::var("OLLAMA_URL")) .unwrap_or_else(|_| "http://localhost:11434".to_string()); let fallback_url = dotenv::var("OLLAMA_FALLBACK_URL").ok(); let primary_model = dotenv::var("OLLAMA_PRIMARY_MODEL") .or_else(|_| dotenv::var("OLLAMA_MODEL")) .unwrap_or_else(|_| "nomic-embed-text:v1.5".to_string()); let fallback_model = dotenv::var("OLLAMA_FALLBACK_MODEL").ok(); let ollama = OllamaClient::new(primary_url, fallback_url, primary_model, fallback_model); let context = opentelemetry::Context::current(); let mut inserted_count = 0; let mut skipped_count = 0; let mut error_count = 0; let mut dao_instance = SqliteSearchHistoryDao::new(); let created_at = Utc::now().timestamp(); // Process searches in batches (embeddings are REQUIRED for searches) for (batch_idx, chunk) in searches.chunks(args.batch_size).enumerate() { info!( "Processing batch {} ({} searches)...", batch_idx + 1, chunk.len() ); // Generate embeddings for this batch let queries: Vec = chunk.iter().map(|s| s.query.clone()).collect(); let embeddings_result = tokio::task::spawn({ let ollama_client = ollama.clone(); async move { // Generate embeddings in parallel for the batch let mut embeddings = Vec::new(); for query in &queries { match ollama_client.generate_embedding(query).await { Ok(emb) => embeddings.push(Some(emb)), Err(e) => { warn!("Failed to generate embedding for query '{}': {}", query, e); embeddings.push(None); } } } embeddings } }) .await .context("Failed to generate embeddings for batch")?; // Build batch of searches with embeddings let mut batch_inserts = Vec::new(); for (search, embedding_opt) in chunk.iter().zip(embeddings_result.iter()) { // Check if search exists (optional for speed) if args.skip_existing && let Ok(exists) = dao_instance.search_exists(&context, search.timestamp, &search.query) && exists { skipped_count += 1; continue; } // Only insert if we have an embedding if let Some(embedding) = embedding_opt { batch_inserts.push(InsertSearchRecord { timestamp: search.timestamp, query: search.query.clone(), search_engine: search.search_engine.clone(), embedding: embedding.clone(), created_at, source_file: Some(args.path.clone()), }); } else { error!( "Skipping search '{}' due to missing embedding", search.query ); error_count += 1; } } // Batch insert entire chunk in single transaction if !batch_inserts.is_empty() { match dao_instance.store_searches_batch(&context, batch_inserts) { Ok(count) => { inserted_count += count; info!("Imported {} searches (total: {})...", count, inserted_count); } Err(e) => { error!("Failed to store batch: {:?}", e); error_count += chunk.len(); } } } // Rate limiting between batches if batch_idx < searches.len() / args.batch_size { info!("Waiting 500ms before next batch..."); tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; } } info!("\n=== Import Summary ==="); info!("Total searches found: {}", searches.len()); info!("Successfully inserted: {}", inserted_count); info!("Skipped (already exist): {}", skipped_count); info!("Errors: {}", error_count); info!("All imported searches have embeddings for semantic search"); Ok(()) }