use diesel::prelude::*; use diesel::sqlite::SqliteConnection; use serde::Serialize; use std::ops::DerefMut; use std::sync::{Arc, Mutex}; use crate::database::{DbError, DbErrorKind, connect}; use crate::otel::trace_db_call; /// Represents a location history record #[derive(Serialize, Clone, Debug)] pub struct LocationRecord { pub id: i32, pub timestamp: i64, pub latitude: f64, pub longitude: f64, pub accuracy: Option, pub activity: Option, pub activity_confidence: Option, pub place_name: Option, pub place_category: Option, pub created_at: i64, pub source_file: Option, } /// Data for inserting a new location record #[derive(Clone, Debug)] pub struct InsertLocationRecord { pub timestamp: i64, pub latitude: f64, pub longitude: f64, pub accuracy: Option, pub activity: Option, pub activity_confidence: Option, pub place_name: Option, pub place_category: Option, pub embedding: Option>, // 768-dim, optional (rarely used) pub created_at: i64, pub source_file: Option, } pub trait LocationHistoryDao: Sync + Send { /// Store single location record fn store_location( &mut self, context: &opentelemetry::Context, location: InsertLocationRecord, ) -> Result; /// Batch insert locations (Google Takeout has millions of points) fn store_locations_batch( &mut self, context: &opentelemetry::Context, locations: Vec, ) -> Result; /// Find nearest location to timestamp (PRIMARY query) /// "Where was I at photo timestamp ±N minutes?" fn find_nearest_location( &mut self, context: &opentelemetry::Context, timestamp: i64, max_time_diff_seconds: i64, ) -> Result, DbError>; /// Find locations in time range fn find_locations_in_range( &mut self, context: &opentelemetry::Context, start_ts: i64, end_ts: i64, ) -> Result, DbError>; /// Find locations near GPS coordinates (for "photos near this place") /// Uses approximate bounding box for performance fn find_locations_near_point( &mut self, context: &opentelemetry::Context, latitude: f64, longitude: f64, radius_km: f64, ) -> Result, DbError>; /// Deduplicate: check if location exists fn location_exists( &mut self, context: &opentelemetry::Context, timestamp: i64, latitude: f64, longitude: f64, ) -> Result; /// Get count of location records fn get_location_count(&mut self, context: &opentelemetry::Context) -> Result; } pub struct SqliteLocationHistoryDao { connection: Arc>, } impl Default for SqliteLocationHistoryDao { fn default() -> Self { Self::new() } } impl SqliteLocationHistoryDao { pub fn new() -> Self { SqliteLocationHistoryDao { connection: Arc::new(Mutex::new(connect())), } } fn serialize_vector(vec: &[f32]) -> Vec { use zerocopy::IntoBytes; vec.as_bytes().to_vec() } /// Haversine distance calculation (in kilometers) /// Used for filtering locations by proximity to a point fn haversine_distance(lat1: f64, lon1: f64, lat2: f64, lon2: f64) -> f64 { const R: f64 = 6371.0; // Earth radius in km let d_lat = (lat2 - lat1).to_radians(); let d_lon = (lon2 - lon1).to_radians(); let a = (d_lat / 2.0).sin().powi(2) + lat1.to_radians().cos() * lat2.to_radians().cos() * (d_lon / 2.0).sin().powi(2); let c = 2.0 * a.sqrt().atan2((1.0 - a).sqrt()); R * c } /// Calculate approximate bounding box for spatial queries /// Returns (min_lat, max_lat, min_lon, max_lon) fn bounding_box(lat: f64, lon: f64, radius_km: f64) -> (f64, f64, f64, f64) { const KM_PER_DEGREE_LAT: f64 = 111.0; let km_per_degree_lon = 111.0 * lat.to_radians().cos(); let delta_lat = radius_km / KM_PER_DEGREE_LAT; let delta_lon = radius_km / km_per_degree_lon; ( lat - delta_lat, // min_lat lat + delta_lat, // max_lat lon - delta_lon, // min_lon lon + delta_lon, // max_lon ) } } #[derive(QueryableByName)] struct LocationRecordRow { #[diesel(sql_type = diesel::sql_types::Integer)] id: i32, #[diesel(sql_type = diesel::sql_types::BigInt)] timestamp: i64, #[diesel(sql_type = diesel::sql_types::Float)] latitude: f32, #[diesel(sql_type = diesel::sql_types::Float)] longitude: f32, #[diesel(sql_type = diesel::sql_types::Nullable)] accuracy: Option, #[diesel(sql_type = diesel::sql_types::Nullable)] activity: Option, #[diesel(sql_type = diesel::sql_types::Nullable)] activity_confidence: Option, #[diesel(sql_type = diesel::sql_types::Nullable)] place_name: Option, #[diesel(sql_type = diesel::sql_types::Nullable)] place_category: Option, #[diesel(sql_type = diesel::sql_types::BigInt)] created_at: i64, #[diesel(sql_type = diesel::sql_types::Nullable)] source_file: Option, } impl LocationRecordRow { fn to_location_record(&self) -> LocationRecord { LocationRecord { id: self.id, timestamp: self.timestamp, latitude: self.latitude as f64, longitude: self.longitude as f64, accuracy: self.accuracy, activity: self.activity.clone(), activity_confidence: self.activity_confidence, place_name: self.place_name.clone(), place_category: self.place_category.clone(), created_at: self.created_at, source_file: self.source_file.clone(), } } } #[derive(QueryableByName)] struct LastInsertRowId { #[diesel(sql_type = diesel::sql_types::Integer)] id: i32, } impl LocationHistoryDao for SqliteLocationHistoryDao { fn store_location( &mut self, context: &opentelemetry::Context, location: InsertLocationRecord, ) -> Result { trace_db_call(context, "insert", "store_location", |_span| { let mut conn = self .connection .lock() .expect("Unable to get LocationHistoryDao"); // Validate embedding dimensions if provided (rare for location data) if let Some(ref emb) = location.embedding && emb.len() != 768 { return Err(anyhow::anyhow!( "Invalid embedding dimensions: {} (expected 768)", emb.len() )); } let embedding_bytes = location .embedding .as_ref() .map(|e| Self::serialize_vector(e)); // INSERT OR IGNORE to handle re-imports (UNIQUE constraint on timestamp+lat+lon) diesel::sql_query( "INSERT OR IGNORE INTO location_history (timestamp, latitude, longitude, accuracy, activity, activity_confidence, place_name, place_category, embedding, created_at, source_file) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)", ) .bind::(location.timestamp) .bind::(location.latitude as f32) .bind::(location.longitude as f32) .bind::, _>(&location.accuracy) .bind::, _>(&location.activity) .bind::, _>( &location.activity_confidence, ) .bind::, _>(&location.place_name) .bind::, _>( &location.place_category, ) .bind::, _>(&embedding_bytes) .bind::(location.created_at) .bind::, _>(&location.source_file) .execute(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Insert error: {:?}", e))?; let row_id: i32 = diesel::sql_query("SELECT last_insert_rowid() as id") .get_result::(conn.deref_mut()) .map(|r| r.id) .map_err(|e| anyhow::anyhow!("Failed to get last insert ID: {:?}", e))?; Ok(LocationRecord { id: row_id, timestamp: location.timestamp, latitude: location.latitude, longitude: location.longitude, accuracy: location.accuracy, activity: location.activity, activity_confidence: location.activity_confidence, place_name: location.place_name, place_category: location.place_category, created_at: location.created_at, source_file: location.source_file, }) }) .map_err(|_| DbError::new(DbErrorKind::InsertError)) } fn store_locations_batch( &mut self, context: &opentelemetry::Context, locations: Vec, ) -> Result { trace_db_call(context, "insert", "store_locations_batch", |_span| { let mut conn = self .connection .lock() .expect("Unable to get LocationHistoryDao"); let mut inserted = 0; conn.transaction::<_, anyhow::Error, _>(|conn| { for location in locations { // Validate embedding if provided (rare) if let Some(ref emb) = location.embedding && emb.len() != 768 { log::warn!( "Skipping location with invalid embedding dimensions: {}", emb.len() ); continue; } let embedding_bytes = location .embedding .as_ref() .map(|e| Self::serialize_vector(e)); let rows_affected = diesel::sql_query( "INSERT OR IGNORE INTO location_history (timestamp, latitude, longitude, accuracy, activity, activity_confidence, place_name, place_category, embedding, created_at, source_file) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)", ) .bind::(location.timestamp) .bind::(location.latitude as f32) .bind::(location.longitude as f32) .bind::, _>( &location.accuracy, ) .bind::, _>( &location.activity, ) .bind::, _>( &location.activity_confidence, ) .bind::, _>( &location.place_name, ) .bind::, _>( &location.place_category, ) .bind::, _>( &embedding_bytes, ) .bind::(location.created_at) .bind::, _>( &location.source_file, ) .execute(conn) .map_err(|e| anyhow::anyhow!("Batch insert error: {:?}", e))?; if rows_affected > 0 { inserted += 1; } } Ok(()) }) .map_err(|e| anyhow::anyhow!("Transaction error: {:?}", e))?; Ok(inserted) }) .map_err(|_| DbError::new(DbErrorKind::InsertError)) } fn find_nearest_location( &mut self, context: &opentelemetry::Context, timestamp: i64, max_time_diff_seconds: i64, ) -> Result, DbError> { trace_db_call(context, "query", "find_nearest_location", |_span| { let mut conn = self .connection .lock() .expect("Unable to get LocationHistoryDao"); let start_ts = timestamp - max_time_diff_seconds; let end_ts = timestamp + max_time_diff_seconds; // Find location closest to target timestamp within window let results = diesel::sql_query( "SELECT id, timestamp, latitude, longitude, accuracy, activity, activity_confidence, place_name, place_category, created_at, source_file FROM location_history WHERE timestamp >= ?1 AND timestamp <= ?2 ORDER BY ABS(timestamp - ?3) ASC LIMIT 1" ) .bind::(start_ts) .bind::(end_ts) .bind::(timestamp) .load::(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Query error: {:?}", e))?; Ok(results.into_iter().next().map(|r| r.to_location_record())) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn find_locations_in_range( &mut self, context: &opentelemetry::Context, start_ts: i64, end_ts: i64, ) -> Result, DbError> { trace_db_call(context, "query", "find_locations_in_range", |_span| { let mut conn = self .connection .lock() .expect("Unable to get LocationHistoryDao"); diesel::sql_query( "SELECT id, timestamp, latitude, longitude, accuracy, activity, activity_confidence, place_name, place_category, created_at, source_file FROM location_history WHERE timestamp >= ?1 AND timestamp <= ?2 ORDER BY timestamp ASC" ) .bind::(start_ts) .bind::(end_ts) .load::(conn.deref_mut()) .map(|rows| rows.into_iter().map(|r| r.to_location_record()).collect()) .map_err(|e| anyhow::anyhow!("Query error: {:?}", e)) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn find_locations_near_point( &mut self, context: &opentelemetry::Context, latitude: f64, longitude: f64, radius_km: f64, ) -> Result, DbError> { trace_db_call(context, "query", "find_locations_near_point", |_span| { let mut conn = self .connection .lock() .expect("Unable to get LocationHistoryDao"); // Use bounding box for initial filter (fast, indexed) let (min_lat, max_lat, min_lon, max_lon) = Self::bounding_box(latitude, longitude, radius_km); let results = diesel::sql_query( "SELECT id, timestamp, latitude, longitude, accuracy, activity, activity_confidence, place_name, place_category, created_at, source_file FROM location_history WHERE latitude >= ?1 AND latitude <= ?2 AND longitude >= ?3 AND longitude <= ?4" ) .bind::(min_lat as f32) .bind::(max_lat as f32) .bind::(min_lon as f32) .bind::(max_lon as f32) .load::(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Query error: {:?}", e))?; // Refine with Haversine distance (in-memory, post-filter) let filtered: Vec = results .into_iter() .map(|r| r.to_location_record()) .filter(|loc| { let distance = Self::haversine_distance(latitude, longitude, loc.latitude, loc.longitude); distance <= radius_km }) .collect(); log::info!( "Found {} locations within {} km of ({}, {})", filtered.len(), radius_km, latitude, longitude ); Ok(filtered) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn location_exists( &mut self, context: &opentelemetry::Context, timestamp: i64, latitude: f64, longitude: f64, ) -> Result { trace_db_call(context, "query", "location_exists", |_span| { let mut conn = self .connection .lock() .expect("Unable to get LocationHistoryDao"); #[derive(QueryableByName)] struct CountResult { #[diesel(sql_type = diesel::sql_types::Integer)] count: i32, } let result: CountResult = diesel::sql_query( "SELECT COUNT(*) as count FROM location_history WHERE timestamp = ?1 AND latitude = ?2 AND longitude = ?3", ) .bind::(timestamp) .bind::(latitude as f32) .bind::(longitude as f32) .get_result(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Query error: {:?}", e))?; Ok(result.count > 0) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn get_location_count(&mut self, context: &opentelemetry::Context) -> Result { trace_db_call(context, "query", "get_location_count", |_span| { let mut conn = self .connection .lock() .expect("Unable to get LocationHistoryDao"); #[derive(QueryableByName)] struct CountResult { #[diesel(sql_type = diesel::sql_types::BigInt)] count: i64, } let result: CountResult = diesel::sql_query("SELECT COUNT(*) as count FROM location_history") .get_result(conn.deref_mut()) .map_err(|e| anyhow::anyhow!("Query error: {:?}", e))?; Ok(result.count) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } }