ImageApi/src/bin/import_location_history.rs

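//! Imports a Google Takeout Location History JSON export into the local
//! SQLite location-history store, in configurable batches.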
use anyhow::{Context, Result};
use chrono::Utc;
use clap::Parser;
use image_api::database::location_dao::{InsertLocationRecord, SqliteLocationHistoryDao};
use image_api::parsers::location_json_parser::parse_location_json;
use log::{error, info};
// Bring the trait into scope so its methods can be called on the DAO
use image_api::database::LocationHistoryDao;
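// Example invocation (flag names follow the clap definitions below; the
// input filename is illustrative):
//   cargo run --bin import_location_history -- --path Records.json --batch-size 5000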
#[derive(Parser, Debug)]
#[command(author, version, about = "Import Google Takeout Location History data", long_about = None)]
struct Args {
/// Path to the Location History JSON file
#[arg(short, long)]
path: String,
    /// Skip locations that already exist in the database
    /// (pass `--skip-existing false` to disable)
    #[arg(long, action = clap::ArgAction::Set, default_value_t = true)]
    skip_existing: bool,
/// Batch size for database inserts
#[arg(long, default_value = "1000")]
batch_size: usize,
}
#[tokio::main]
async fn main() -> Result<()> {
dotenv::dotenv().ok();
env_logger::init();
let args = Args::parse();
info!("Parsing location history file: {}", args.path);
let locations =
parse_location_json(&args.path).context("Failed to parse location history JSON")?;
info!("Found {} location records", locations.len());
let context = opentelemetry::Context::current();
let mut inserted_count = 0;
let mut skipped_count = 0;
let mut error_count = 0;
let mut dao_instance = SqliteLocationHistoryDao::new();
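    // One shared created_at timestamp marks every record from this import run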
let created_at = Utc::now().timestamp();
    // Process in batches: one multi-row insert per chunk is far faster than
    // inserting records one at a time
for (batch_idx, chunk) in locations.chunks(args.batch_size).enumerate() {
info!(
"Processing batch {} ({} records)...",
batch_idx + 1,
chunk.len()
);
// Convert to InsertLocationRecord
let mut batch_inserts = Vec::with_capacity(chunk.len());
for location in chunk {
            // If requested, skip records that are already stored; the per-record
            // existence check makes the import much slower. A failed existence
            // check is treated as "not present" and the record is inserted anyway.
if args.skip_existing
&& let Ok(exists) = dao_instance.location_exists(
&context,
location.timestamp,
location.latitude,
location.longitude,
)
&& exists
{
skipped_count += 1;
continue;
}
batch_inserts.push(InsertLocationRecord {
timestamp: location.timestamp,
latitude: location.latitude,
longitude: location.longitude,
accuracy: location.accuracy,
activity: location.activity.clone(),
activity_confidence: location.activity_confidence,
place_name: None,
place_category: None,
embedding: None,
created_at,
source_file: Some(args.path.clone()),
});
}
        // Batch insert the entire chunk in a single transaction
        if !batch_inserts.is_empty() {
            // Capture the batch size before the Vec is moved into the DAO call
            let attempted = batch_inserts.len();
            match dao_instance.store_locations_batch(&context, batch_inserts) {
Ok(count) => {
inserted_count += count;
info!(
"Imported {} locations (total: {})...",
count, inserted_count
);
}
                Err(e) => {
                    error!("Failed to store batch: {:?}", e);
                    // Count only the rows actually attempted; records skipped
                    // earlier in this chunk were never part of the batch
                    error_count += attempted;
                }
}
}
}
info!("\n=== Import Summary ===");
info!("Total locations found: {}", locations.len());
info!("Successfully inserted: {}", inserted_count);
info!("Skipped (already exist): {}", skipped_count);
info!("Errors: {}", error_count);
Ok(())
}
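
// For reference, a hypothetical sketch of the DAO surface this binary relies
// on, reconstructed from the call sites above; the real trait in
// image_api::database may differ in receivers, argument types, and error types.
pub trait LocationHistoryDao {
    /// Returns whether a record with this timestamp and coordinates is stored.
    fn location_exists(
        &mut self,
        ctx: &opentelemetry::Context,
        timestamp: i64,
        latitude: f64,
        longitude: f64,
    ) -> anyhow::Result<bool>;

    /// Inserts all records in one transaction, returning the number stored.
    fn store_locations_batch(
        &mut self,
        ctx: &opentelemetry::Context,
        records: Vec<InsertLocationRecord>,
    ) -> anyhow::Result<usize>;
}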