Files
ImageApi/src/bin/import_calendar.rs
T
Cameron Cordes efd05db523 Make the embedding model swappable via env for A/B testing
Trialing Qwen3-Embedding-0.6B (1024-dim, instruct-prefixed queries)
against nomic required code changes at every hardcoded seam; now it's a
config flip plus a reembed_embeddings run.

- EMBEDDING_DIM env (default 768) replaces every hardcoded dim check:
  daily summary / calendar / search / location DAOs, Ollama batch
  validation, reembed_embeddings
- entities gains the dim guard it never had — a wrong-dim vector
  silently kills dedup/recall (cosine over mismatched lengths is 0),
  so store None and warn instead
- embed_query / embed_document split with EMBED_QUERY_PREFIX /
  EMBED_DOCUMENT_PREFIX (literal \n expanded): retrieval models treat
  the two sides differently — nomic wants search_query:/search_document:,
  Qwen3 wants Instruct:...\nQuery: on queries only. All query-side
  call sites and all corpus writers now declare their side.
- document the contract in CLAUDE.md: change the model or any of these
  vars → re-run reembed_embeddings or search is garbage

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 21:40:40 -04:00

160 lines
4.9 KiB
Rust

use anyhow::{Context, Result};
use chrono::Utc;
use clap::Parser;
use image_api::ai::LocalLlm;
use image_api::bin_progress;
use image_api::database::calendar_dao::{InsertCalendarEvent, SqliteCalendarEventDao};
use image_api::parsers::ical_parser::parse_ics_file;
use log::{error, info};
// Import the trait to use its methods
use image_api::database::CalendarEventDao;
// Command-line options for the calendar import binary, parsed by clap.
#[derive(Parser, Debug)]
#[command(author, version, about = "Import Google Takeout Calendar data", long_about = None)]
struct Args {
    /// Path to the .ics calendar file
    #[arg(short, long)]
    path: String,

    /// Generate embeddings for calendar events (slower but enables semantic search)
    #[arg(long, default_value = "false")]
    generate_embeddings: bool,

    /// Skip events that already exist in the database (use --skip-existing=false to disable)
    // A bare `bool` field gets `ArgAction::SetTrue`; combined with a default of
    // `true` the option could never be turned off. `ArgAction::Set` with an
    // optional `=value` keeps a bare `--skip-existing` meaning `true` while
    // making `--skip-existing=false` expressible.
    #[arg(
        long,
        action = clap::ArgAction::Set,
        default_value_t = true,
        default_missing_value = "true",
        num_args = 0..=1,
        require_equals = true
    )]
    skip_existing: bool,

    /// Batch size for embedding generation
    // NOTE(review): this value is not read anywhere in this file; `main`
    // embeds events one at a time. Kept so existing invocations still parse.
    #[arg(long, default_value = "128")]
    batch_size: usize,
}
/// Imports the events of an `.ics` file into the calendar event store.
///
/// Steps:
/// 1. Load `.env`, initialise logging and parse the CLI arguments.
/// 2. Parse the calendar file; a parse failure aborts the run with an error.
/// 3. For every event: optionally skip it when the DAO reports it already
///    exists, optionally compute a document-side embedding, then store it.
/// 4. Log a summary of inserted / skipped / failed events.
///
/// Per-event failures (existence check, embedding, store) are reported on the
/// progress bar and counted where applicable, but they do not abort the run
/// and do not change the return value.
#[tokio::main]
async fn main() -> Result<()> {
    dotenv::dotenv().ok();
    env_logger::init();

    let args = Args::parse();

    info!("Parsing calendar file: {}", args.path);
    let events = parse_ics_file(&args.path).context("Failed to parse .ics file")?;
    info!("Found {} calendar events", events.len());

    let context = opentelemetry::Context::current();

    // LocalLlm dispatches per LLM_BACKEND, so embeddings written here land
    // in the same vector space the query side searches.
    let llm = args.generate_embeddings.then(LocalLlm::from_env);

    // Constructing the DAO does not depend on the event, so build it once
    // instead of once per loop iteration.
    let mut dao = SqliteCalendarEventDao::new();

    let mut inserted_count = 0usize;
    let mut skipped_count = 0usize;
    let mut error_count = 0usize;

    let pb = bin_progress::determinate(events.len() as u64, "importing");

    // Events are handled one at a time: each embedding request is awaited
    // before the event is stored.
    for event in &events {
        if args.skip_existing {
            match dao.event_exists(
                &context,
                event.event_uid.as_deref().unwrap_or(""),
                event.start_time,
            ) {
                Ok(true) => {
                    skipped_count += 1;
                    pb.inc(1);
                    continue;
                }
                Ok(false) => {}
                // Best effort: a failed lookup still falls through to the
                // insert, but the failure is no longer silent.
                Err(e) => pb.println(format!(
                    "existence check failed for '{}': {:?}",
                    event.summary, e
                )),
            }
        }

        // Generate the embedding if requested. We are already inside the
        // async runtime, so the future is awaited directly.
        let embedding = match &llm {
            Some(llm) => {
                let text = format!(
                    "{} {} {}",
                    event.summary,
                    event.description.as_deref().unwrap_or(""),
                    event.location.as_deref().unwrap_or("")
                );
                match llm.embed_document(&text).await {
                    Ok(emb) => Some(emb),
                    Err(e) => {
                        // The event is still stored, just without a vector.
                        pb.println(format!("embedding failed for '{}': {}", event.summary, e));
                        None
                    }
                }
            }
            None => None,
        };

        let insert_event = InsertCalendarEvent {
            event_uid: event.event_uid.clone(),
            summary: event.summary.clone(),
            description: event.description.clone(),
            location: event.location.clone(),
            start_time: event.start_time,
            end_time: event.end_time,
            all_day: event.all_day,
            organizer: event.organizer.clone(),
            // Attendees are stored as a JSON string; an empty list becomes NULL.
            attendees: if event.attendees.is_empty() {
                None
            } else {
                Some(serde_json::to_string(&event.attendees).unwrap_or_default())
            },
            embedding,
            created_at: Utc::now().timestamp(),
            source_file: Some(args.path.clone()),
        };

        match dao.store_event(&context, insert_event) {
            Ok(_) => inserted_count += 1,
            Err(e) => {
                pb.println(format!("store failed for '{}': {:?}", event.summary, e));
                error_count += 1;
            }
        }

        pb.set_message(format!(
            "inserted={} skipped={} errors={}",
            inserted_count, skipped_count, error_count
        ));
        pb.inc(1);
    }

    pb.finish_and_clear();

    info!("=== Import Summary ===");
    info!("Total events found: {}", events.len());
    info!("Successfully inserted: {}", inserted_count);
    info!("Skipped (already exist): {}", skipped_count);
    info!("Errors: {}", error_count);
    if args.generate_embeddings {
        info!("Embeddings were generated for semantic search");
    } else {
        info!("No embeddings generated (use --generate-embeddings to enable semantic search)");
    }

    if error_count > 0 {
        error!(
            "Completed with {} errors — review log output above",
            error_count
        );
    }

    Ok(())
}