529 lines
20 KiB
Rust
529 lines
20 KiB
Rust
use diesel::prelude::*;
|
|
use diesel::sqlite::SqliteConnection;
|
|
use serde::Serialize;
|
|
use std::ops::DerefMut;
|
|
use std::sync::{Arc, Mutex};
|
|
|
|
use crate::database::{DbError, DbErrorKind, connect};
|
|
use crate::otel::trace_db_call;
|
|
|
|
/// Represents a location history record
///
/// Read-side view of a `location_history` row; the stored embedding is not
/// included. Derives `Serialize`, so the declaration order below is also the
/// field order of the serialized output.
#[derive(Serialize, Clone, Debug)]
pub struct LocationRecord {
    /// Row id assigned by the database.
    pub id: i32,
    /// Time of the location fix. NOTE(review): presumably Unix epoch seconds,
    /// matching the `*_seconds` window of `find_nearest_location` — confirm.
    pub timestamp: i64,
    /// Latitude in degrees. The SQLite DAO stores coordinates narrowed to
    /// `f32`, so values read back from the database carry only f32 precision.
    pub latitude: f64,
    /// Longitude in degrees (same f32 storage precision as `latitude`).
    pub longitude: f64,
    /// Reported accuracy of the fix, if any. NOTE(review): units not
    /// established in this file (likely metres) — confirm.
    pub accuracy: Option<i32>,
    /// Detected activity label, if any.
    pub activity: Option<String>,
    /// Confidence for `activity`, if any. NOTE(review): scale not established
    /// in this file — confirm.
    pub activity_confidence: Option<i32>,
    /// Human-readable place name, if known.
    pub place_name: Option<String>,
    /// Category of `place_name`, if known.
    pub place_category: Option<String>,
    /// Creation time supplied by the writer of the record (not generated by
    /// the DAO). NOTE(review): unit presumably matches `timestamp` — confirm.
    pub created_at: i64,
    /// File the record was imported from, if recorded.
    pub source_file: Option<String>,
}
|
|
|
|
/// Data for inserting a new location record
///
/// Same fields as `LocationRecord` minus the database-assigned `id`, plus an
/// optional embedding that is written to the table but never read back.
#[derive(Clone, Debug)]
pub struct InsertLocationRecord {
    /// Time of the location fix (same convention as `LocationRecord::timestamp`).
    pub timestamp: i64,
    /// Latitude in degrees; narrowed to `f32` when stored by the SQLite DAO.
    pub latitude: f64,
    /// Longitude in degrees; narrowed to `f32` when stored by the SQLite DAO.
    pub longitude: f64,
    /// Reported accuracy of the fix, if any.
    pub accuracy: Option<i32>,
    /// Detected activity label, if any.
    pub activity: Option<String>,
    /// Confidence for `activity`, if any.
    pub activity_confidence: Option<i32>,
    /// Human-readable place name, if known.
    pub place_name: Option<String>,
    /// Category of `place_name`, if known.
    pub place_category: Option<String>,
    /// Optional embedding. The SQLite DAO requires exactly 768 components:
    /// `store_location` rejects other lengths with an error, while
    /// `store_locations_batch` logs a warning and skips the record.
    pub embedding: Option<Vec<f32>>, // 768-dim, optional (rarely used)
    /// Creation time to store with the row.
    pub created_at: i64,
    /// File the record was imported from, if any.
    pub source_file: Option<String>,
}
|
|
|
|
/// Data-access interface for location history.
///
/// Every method receives an OpenTelemetry `Context` (the SQLite implementation
/// passes it to `trace_db_call` to trace the query) and reports failures as
/// [`DbError`].
pub trait LocationHistoryDao: Sync + Send {
    /// Store single location record
    ///
    /// Returns the record as it is represented in storage, including its id.
    fn store_location(
        &mut self,
        context: &opentelemetry::Context,
        location: InsertLocationRecord,
    ) -> Result<LocationRecord, DbError>;

    /// Batch insert locations (Google Takeout has millions of points)
    ///
    /// Returns the number of records actually inserted; records that were
    /// skipped (e.g. duplicates) are not counted.
    fn store_locations_batch(
        &mut self,
        context: &opentelemetry::Context,
        locations: Vec<InsertLocationRecord>,
    ) -> Result<usize, DbError>;

    /// Find nearest location to timestamp (PRIMARY query)
    /// "Where was I at photo timestamp ± `max_time_diff_seconds`?"
    ///
    /// Returns the record whose timestamp is closest to `timestamp` within the
    /// window, or `None` if the window contains no records.
    fn find_nearest_location(
        &mut self,
        context: &opentelemetry::Context,
        timestamp: i64,
        max_time_diff_seconds: i64,
    ) -> Result<Option<LocationRecord>, DbError>;

    /// Find locations in time range
    ///
    /// The SQLite implementation treats both bounds as inclusive and returns
    /// records in ascending timestamp order.
    fn find_locations_in_range(
        &mut self,
        context: &opentelemetry::Context,
        start_ts: i64,
        end_ts: i64,
    ) -> Result<Vec<LocationRecord>, DbError>;

    /// Find locations near GPS coordinates (for "photos near this place")
    /// Uses approximate bounding box for performance
    ///
    /// `latitude`/`longitude` are in degrees and `radius_km` in kilometres;
    /// results are refined to the exact great-circle radius.
    fn find_locations_near_point(
        &mut self,
        context: &opentelemetry::Context,
        latitude: f64,
        longitude: f64,
        radius_km: f64,
    ) -> Result<Vec<LocationRecord>, DbError>;

    /// Deduplicate: check if location exists
    ///
    /// Matches on exact (timestamp, latitude, longitude); the SQLite
    /// implementation compares coordinates at their stored f32 precision.
    fn location_exists(
        &mut self,
        context: &opentelemetry::Context,
        timestamp: i64,
        latitude: f64,
        longitude: f64,
    ) -> Result<bool, DbError>;

    /// Get count of location records
    fn get_location_count(&mut self, context: &opentelemetry::Context) -> Result<i64, DbError>;
}
|
|
|
|
/// SQLite-backed implementation of [`LocationHistoryDao`].
///
/// Owns a single connection behind `Arc<Mutex<_>>`; each DAO method locks it
/// for the duration of its query, so calls through one DAO are serialized.
pub struct SqliteLocationHistoryDao {
    connection: Arc<Mutex<SqliteConnection>>,
}
|
|
|
|
impl Default for SqliteLocationHistoryDao {
|
|
fn default() -> Self {
|
|
Self::new()
|
|
}
|
|
}
|
|
|
|
impl SqliteLocationHistoryDao {
|
|
pub fn new() -> Self {
|
|
SqliteLocationHistoryDao {
|
|
connection: Arc::new(Mutex::new(connect())),
|
|
}
|
|
}
|
|
|
|
fn serialize_vector(vec: &[f32]) -> Vec<u8> {
|
|
use zerocopy::IntoBytes;
|
|
vec.as_bytes().to_vec()
|
|
}
|
|
|
|
/// Haversine distance calculation (in kilometers)
|
|
/// Used for filtering locations by proximity to a point
|
|
fn haversine_distance(lat1: f64, lon1: f64, lat2: f64, lon2: f64) -> f64 {
|
|
const R: f64 = 6371.0; // Earth radius in km
|
|
|
|
let d_lat = (lat2 - lat1).to_radians();
|
|
let d_lon = (lon2 - lon1).to_radians();
|
|
|
|
let a = (d_lat / 2.0).sin().powi(2)
|
|
+ lat1.to_radians().cos() * lat2.to_radians().cos() * (d_lon / 2.0).sin().powi(2);
|
|
|
|
let c = 2.0 * a.sqrt().atan2((1.0 - a).sqrt());
|
|
|
|
R * c
|
|
}
|
|
|
|
/// Calculate approximate bounding box for spatial queries
|
|
/// Returns (min_lat, max_lat, min_lon, max_lon)
|
|
fn bounding_box(lat: f64, lon: f64, radius_km: f64) -> (f64, f64, f64, f64) {
|
|
const KM_PER_DEGREE_LAT: f64 = 111.0;
|
|
let km_per_degree_lon = 111.0 * lat.to_radians().cos();
|
|
|
|
let delta_lat = radius_km / KM_PER_DEGREE_LAT;
|
|
let delta_lon = radius_km / km_per_degree_lon;
|
|
|
|
(
|
|
lat - delta_lat, // min_lat
|
|
lat + delta_lat, // max_lat
|
|
lon - delta_lon, // min_lon
|
|
lon + delta_lon, // max_lon
|
|
)
|
|
}
|
|
}
|
|
|
|
/// Raw row shape for the hand-written `SELECT`s over `location_history`.
///
/// `QueryableByName` maps result columns to fields by name, so every query
/// loading this type must select all of these columns. Coordinates are `f32`
/// because they are bound and stored as `diesel::sql_types::Float`.
#[derive(QueryableByName)]
struct LocationRecordRow {
    #[diesel(sql_type = diesel::sql_types::Integer)]
    id: i32,
    #[diesel(sql_type = diesel::sql_types::BigInt)]
    timestamp: i64,
    #[diesel(sql_type = diesel::sql_types::Float)]
    latitude: f32,
    #[diesel(sql_type = diesel::sql_types::Float)]
    longitude: f32,
    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Integer>)]
    accuracy: Option<i32>,
    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
    activity: Option<String>,
    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Integer>)]
    activity_confidence: Option<i32>,
    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
    place_name: Option<String>,
    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
    place_category: Option<String>,
    #[diesel(sql_type = diesel::sql_types::BigInt)]
    created_at: i64,
    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
    source_file: Option<String>,
}
|
|
|
|
impl LocationRecordRow {
|
|
fn to_location_record(&self) -> LocationRecord {
|
|
LocationRecord {
|
|
id: self.id,
|
|
timestamp: self.timestamp,
|
|
latitude: self.latitude as f64,
|
|
longitude: self.longitude as f64,
|
|
accuracy: self.accuracy,
|
|
activity: self.activity.clone(),
|
|
activity_confidence: self.activity_confidence,
|
|
place_name: self.place_name.clone(),
|
|
place_category: self.place_category.clone(),
|
|
created_at: self.created_at,
|
|
source_file: self.source_file.clone(),
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Result row of `SELECT last_insert_rowid() as id`.
///
/// Read as `Integer`/`i32` to match `LocationRecord::id`.
/// NOTE(review): SQLite rowids are 64-bit; this assumes ids stay below `i32::MAX`.
#[derive(QueryableByName)]
struct LastInsertRowId {
    #[diesel(sql_type = diesel::sql_types::Integer)]
    id: i32,
}
|
|
|
|
impl LocationHistoryDao for SqliteLocationHistoryDao {
|
|
fn store_location(
|
|
&mut self,
|
|
context: &opentelemetry::Context,
|
|
location: InsertLocationRecord,
|
|
) -> Result<LocationRecord, DbError> {
|
|
trace_db_call(context, "insert", "store_location", |_span| {
|
|
let mut conn = self
|
|
.connection
|
|
.lock()
|
|
.expect("Unable to get LocationHistoryDao");
|
|
|
|
// Validate embedding dimensions if provided (rare for location data)
|
|
if let Some(ref emb) = location.embedding
|
|
&& emb.len() != 768
|
|
{
|
|
return Err(anyhow::anyhow!(
|
|
"Invalid embedding dimensions: {} (expected 768)",
|
|
emb.len()
|
|
));
|
|
}
|
|
|
|
let embedding_bytes = location
|
|
.embedding
|
|
.as_ref()
|
|
.map(|e| Self::serialize_vector(e));
|
|
|
|
// INSERT OR IGNORE to handle re-imports (UNIQUE constraint on timestamp+lat+lon)
|
|
diesel::sql_query(
|
|
"INSERT OR IGNORE INTO location_history
|
|
(timestamp, latitude, longitude, accuracy, activity, activity_confidence,
|
|
place_name, place_category, embedding, created_at, source_file)
|
|
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
|
|
)
|
|
.bind::<diesel::sql_types::BigInt, _>(location.timestamp)
|
|
.bind::<diesel::sql_types::Float, _>(location.latitude as f32)
|
|
.bind::<diesel::sql_types::Float, _>(location.longitude as f32)
|
|
.bind::<diesel::sql_types::Nullable<diesel::sql_types::Integer>, _>(&location.accuracy)
|
|
.bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(&location.activity)
|
|
.bind::<diesel::sql_types::Nullable<diesel::sql_types::Integer>, _>(
|
|
&location.activity_confidence,
|
|
)
|
|
.bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(&location.place_name)
|
|
.bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(
|
|
&location.place_category,
|
|
)
|
|
.bind::<diesel::sql_types::Nullable<diesel::sql_types::Binary>, _>(&embedding_bytes)
|
|
.bind::<diesel::sql_types::BigInt, _>(location.created_at)
|
|
.bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(&location.source_file)
|
|
.execute(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Insert error: {:?}", e))?;
|
|
|
|
let row_id: i32 = diesel::sql_query("SELECT last_insert_rowid() as id")
|
|
.get_result::<LastInsertRowId>(conn.deref_mut())
|
|
.map(|r| r.id)
|
|
.map_err(|e| anyhow::anyhow!("Failed to get last insert ID: {:?}", e))?;
|
|
|
|
Ok(LocationRecord {
|
|
id: row_id,
|
|
timestamp: location.timestamp,
|
|
latitude: location.latitude,
|
|
longitude: location.longitude,
|
|
accuracy: location.accuracy,
|
|
activity: location.activity,
|
|
activity_confidence: location.activity_confidence,
|
|
place_name: location.place_name,
|
|
place_category: location.place_category,
|
|
created_at: location.created_at,
|
|
source_file: location.source_file,
|
|
})
|
|
})
|
|
.map_err(|_| DbError::new(DbErrorKind::InsertError))
|
|
}
|
|
|
|
fn store_locations_batch(
|
|
&mut self,
|
|
context: &opentelemetry::Context,
|
|
locations: Vec<InsertLocationRecord>,
|
|
) -> Result<usize, DbError> {
|
|
trace_db_call(context, "insert", "store_locations_batch", |_span| {
|
|
let mut conn = self
|
|
.connection
|
|
.lock()
|
|
.expect("Unable to get LocationHistoryDao");
|
|
let mut inserted = 0;
|
|
|
|
conn.transaction::<_, anyhow::Error, _>(|conn| {
|
|
for location in locations {
|
|
// Validate embedding if provided (rare)
|
|
if let Some(ref emb) = location.embedding
|
|
&& emb.len() != 768
|
|
{
|
|
log::warn!(
|
|
"Skipping location with invalid embedding dimensions: {}",
|
|
emb.len()
|
|
);
|
|
continue;
|
|
}
|
|
|
|
let embedding_bytes = location
|
|
.embedding
|
|
.as_ref()
|
|
.map(|e| Self::serialize_vector(e));
|
|
|
|
let rows_affected = diesel::sql_query(
|
|
"INSERT OR IGNORE INTO location_history
|
|
(timestamp, latitude, longitude, accuracy, activity, activity_confidence,
|
|
place_name, place_category, embedding, created_at, source_file)
|
|
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
|
|
)
|
|
.bind::<diesel::sql_types::BigInt, _>(location.timestamp)
|
|
.bind::<diesel::sql_types::Float, _>(location.latitude as f32)
|
|
.bind::<diesel::sql_types::Float, _>(location.longitude as f32)
|
|
.bind::<diesel::sql_types::Nullable<diesel::sql_types::Integer>, _>(
|
|
&location.accuracy,
|
|
)
|
|
.bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(
|
|
&location.activity,
|
|
)
|
|
.bind::<diesel::sql_types::Nullable<diesel::sql_types::Integer>, _>(
|
|
&location.activity_confidence,
|
|
)
|
|
.bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(
|
|
&location.place_name,
|
|
)
|
|
.bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(
|
|
&location.place_category,
|
|
)
|
|
.bind::<diesel::sql_types::Nullable<diesel::sql_types::Binary>, _>(
|
|
&embedding_bytes,
|
|
)
|
|
.bind::<diesel::sql_types::BigInt, _>(location.created_at)
|
|
.bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(
|
|
&location.source_file,
|
|
)
|
|
.execute(conn)
|
|
.map_err(|e| anyhow::anyhow!("Batch insert error: {:?}", e))?;
|
|
|
|
if rows_affected > 0 {
|
|
inserted += 1;
|
|
}
|
|
}
|
|
Ok(())
|
|
})
|
|
.map_err(|e| anyhow::anyhow!("Transaction error: {:?}", e))?;
|
|
|
|
Ok(inserted)
|
|
})
|
|
.map_err(|_| DbError::new(DbErrorKind::InsertError))
|
|
}
|
|
|
|
fn find_nearest_location(
|
|
&mut self,
|
|
context: &opentelemetry::Context,
|
|
timestamp: i64,
|
|
max_time_diff_seconds: i64,
|
|
) -> Result<Option<LocationRecord>, DbError> {
|
|
trace_db_call(context, "query", "find_nearest_location", |_span| {
|
|
let mut conn = self
|
|
.connection
|
|
.lock()
|
|
.expect("Unable to get LocationHistoryDao");
|
|
|
|
let start_ts = timestamp - max_time_diff_seconds;
|
|
let end_ts = timestamp + max_time_diff_seconds;
|
|
|
|
// Find location closest to target timestamp within window
|
|
let results = diesel::sql_query(
|
|
"SELECT id, timestamp, latitude, longitude, accuracy, activity, activity_confidence,
|
|
place_name, place_category, created_at, source_file
|
|
FROM location_history
|
|
WHERE timestamp >= ?1 AND timestamp <= ?2
|
|
ORDER BY ABS(timestamp - ?3) ASC
|
|
LIMIT 1"
|
|
)
|
|
.bind::<diesel::sql_types::BigInt, _>(start_ts)
|
|
.bind::<diesel::sql_types::BigInt, _>(end_ts)
|
|
.bind::<diesel::sql_types::BigInt, _>(timestamp)
|
|
.load::<LocationRecordRow>(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Query error: {:?}", e))?;
|
|
|
|
Ok(results.into_iter().next().map(|r| r.to_location_record()))
|
|
})
|
|
.map_err(|_| DbError::new(DbErrorKind::QueryError))
|
|
}
|
|
|
|
fn find_locations_in_range(
|
|
&mut self,
|
|
context: &opentelemetry::Context,
|
|
start_ts: i64,
|
|
end_ts: i64,
|
|
) -> Result<Vec<LocationRecord>, DbError> {
|
|
trace_db_call(context, "query", "find_locations_in_range", |_span| {
|
|
let mut conn = self
|
|
.connection
|
|
.lock()
|
|
.expect("Unable to get LocationHistoryDao");
|
|
|
|
diesel::sql_query(
|
|
"SELECT id, timestamp, latitude, longitude, accuracy, activity, activity_confidence,
|
|
place_name, place_category, created_at, source_file
|
|
FROM location_history
|
|
WHERE timestamp >= ?1 AND timestamp <= ?2
|
|
ORDER BY timestamp ASC"
|
|
)
|
|
.bind::<diesel::sql_types::BigInt, _>(start_ts)
|
|
.bind::<diesel::sql_types::BigInt, _>(end_ts)
|
|
.load::<LocationRecordRow>(conn.deref_mut())
|
|
.map(|rows| rows.into_iter().map(|r| r.to_location_record()).collect())
|
|
.map_err(|e| anyhow::anyhow!("Query error: {:?}", e))
|
|
})
|
|
.map_err(|_| DbError::new(DbErrorKind::QueryError))
|
|
}
|
|
|
|
fn find_locations_near_point(
|
|
&mut self,
|
|
context: &opentelemetry::Context,
|
|
latitude: f64,
|
|
longitude: f64,
|
|
radius_km: f64,
|
|
) -> Result<Vec<LocationRecord>, DbError> {
|
|
trace_db_call(context, "query", "find_locations_near_point", |_span| {
|
|
let mut conn = self
|
|
.connection
|
|
.lock()
|
|
.expect("Unable to get LocationHistoryDao");
|
|
|
|
// Use bounding box for initial filter (fast, indexed)
|
|
let (min_lat, max_lat, min_lon, max_lon) =
|
|
Self::bounding_box(latitude, longitude, radius_km);
|
|
|
|
let results = diesel::sql_query(
|
|
"SELECT id, timestamp, latitude, longitude, accuracy, activity, activity_confidence,
|
|
place_name, place_category, created_at, source_file
|
|
FROM location_history
|
|
WHERE latitude >= ?1 AND latitude <= ?2
|
|
AND longitude >= ?3 AND longitude <= ?4"
|
|
)
|
|
.bind::<diesel::sql_types::Float, _>(min_lat as f32)
|
|
.bind::<diesel::sql_types::Float, _>(max_lat as f32)
|
|
.bind::<diesel::sql_types::Float, _>(min_lon as f32)
|
|
.bind::<diesel::sql_types::Float, _>(max_lon as f32)
|
|
.load::<LocationRecordRow>(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Query error: {:?}", e))?;
|
|
|
|
// Refine with Haversine distance (in-memory, post-filter)
|
|
let filtered: Vec<LocationRecord> = results
|
|
.into_iter()
|
|
.map(|r| r.to_location_record())
|
|
.filter(|loc| {
|
|
let distance =
|
|
Self::haversine_distance(latitude, longitude, loc.latitude, loc.longitude);
|
|
distance <= radius_km
|
|
})
|
|
.collect();
|
|
|
|
log::info!(
|
|
"Found {} locations within {} km of ({}, {})",
|
|
filtered.len(),
|
|
radius_km,
|
|
latitude,
|
|
longitude
|
|
);
|
|
|
|
Ok(filtered)
|
|
})
|
|
.map_err(|_| DbError::new(DbErrorKind::QueryError))
|
|
}
|
|
|
|
fn location_exists(
|
|
&mut self,
|
|
context: &opentelemetry::Context,
|
|
timestamp: i64,
|
|
latitude: f64,
|
|
longitude: f64,
|
|
) -> Result<bool, DbError> {
|
|
trace_db_call(context, "query", "location_exists", |_span| {
|
|
let mut conn = self
|
|
.connection
|
|
.lock()
|
|
.expect("Unable to get LocationHistoryDao");
|
|
|
|
#[derive(QueryableByName)]
|
|
struct CountResult {
|
|
#[diesel(sql_type = diesel::sql_types::Integer)]
|
|
count: i32,
|
|
}
|
|
|
|
let result: CountResult = diesel::sql_query(
|
|
"SELECT COUNT(*) as count FROM location_history
|
|
WHERE timestamp = ?1 AND latitude = ?2 AND longitude = ?3",
|
|
)
|
|
.bind::<diesel::sql_types::BigInt, _>(timestamp)
|
|
.bind::<diesel::sql_types::Float, _>(latitude as f32)
|
|
.bind::<diesel::sql_types::Float, _>(longitude as f32)
|
|
.get_result(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Query error: {:?}", e))?;
|
|
|
|
Ok(result.count > 0)
|
|
})
|
|
.map_err(|_| DbError::new(DbErrorKind::QueryError))
|
|
}
|
|
|
|
fn get_location_count(&mut self, context: &opentelemetry::Context) -> Result<i64, DbError> {
|
|
trace_db_call(context, "query", "get_location_count", |_span| {
|
|
let mut conn = self
|
|
.connection
|
|
.lock()
|
|
.expect("Unable to get LocationHistoryDao");
|
|
|
|
#[derive(QueryableByName)]
|
|
struct CountResult {
|
|
#[diesel(sql_type = diesel::sql_types::BigInt)]
|
|
count: i64,
|
|
}
|
|
|
|
let result: CountResult =
|
|
diesel::sql_query("SELECT COUNT(*) as count FROM location_history")
|
|
.get_result(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Query error: {:?}", e))?;
|
|
|
|
Ok(result.count)
|
|
})
|
|
.map_err(|_| DbError::new(DbErrorKind::QueryError))
|
|
}
|
|
}
|