use bcrypt::{DEFAULT_COST, hash, verify}; use diesel::prelude::*; use diesel::sqlite::SqliteConnection; use std::ops::DerefMut; use std::sync::{Arc, Mutex}; use crate::database::models::{ Favorite, ImageExif, InsertFavorite, InsertImageExif, InsertUser, User, }; use crate::otel::trace_db_call; pub mod calendar_dao; pub mod daily_summary_dao; pub mod insights_dao; pub mod knowledge_dao; pub mod location_dao; pub mod models; pub mod preview_dao; pub mod schema; pub mod search_dao; pub use calendar_dao::{CalendarEventDao, SqliteCalendarEventDao}; pub use daily_summary_dao::{DailySummaryDao, InsertDailySummary, SqliteDailySummaryDao}; pub use insights_dao::{InsightDao, SqliteInsightDao}; pub use knowledge_dao::{ EntityFilter, EntityPatch, FactFilter, FactPatch, KnowledgeDao, RecentActivity, SqliteKnowledgeDao, }; pub use location_dao::{LocationHistoryDao, SqliteLocationHistoryDao}; pub use preview_dao::{PreviewDao, SqlitePreviewDao}; pub use search_dao::{SearchHistoryDao, SqliteSearchHistoryDao}; pub trait UserDao { fn create_user(&mut self, user: &str, password: &str) -> Option; fn get_user(&mut self, user: &str, password: &str) -> Option; fn user_exists(&mut self, user: &str) -> bool; } pub struct SqliteUserDao { connection: SqliteConnection, } impl Default for SqliteUserDao { fn default() -> Self { Self::new() } } impl SqliteUserDao { pub fn new() -> Self { Self { connection: connect(), } } } #[cfg(test)] pub mod test { use diesel::{Connection, SqliteConnection}; use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations}; const DB_MIGRATIONS: EmbeddedMigrations = embed_migrations!(); pub fn in_memory_db_connection() -> SqliteConnection { let mut connection = SqliteConnection::establish(":memory:") .expect("Unable to create in-memory db connection"); connection .run_pending_migrations(DB_MIGRATIONS) .expect("Failure running DB migrations"); connection } } impl UserDao for SqliteUserDao { // TODO: Should probably use Result here fn create_user(&mut self, user: &str, pass: &str) -> Option { use schema::users::dsl::*; let hashed = hash(pass, DEFAULT_COST); if let Ok(hash) = hashed { diesel::insert_into(users) .values(InsertUser { username: user, password: &hash, }) .execute(&mut self.connection) .unwrap(); users .filter(username.eq(username)) .load::(&mut self.connection) .unwrap() .first() .cloned() } else { None } } fn get_user(&mut self, user: &str, pass: &str) -> Option { use schema::users::dsl::*; match users .filter(username.eq(user)) .load::(&mut self.connection) .unwrap_or_default() .first() { Some(u) if verify(pass, &u.password).unwrap_or(false) => Some(u.clone()), _ => None, } } fn user_exists(&mut self, user: &str) -> bool { use schema::users::dsl::*; !users .filter(username.eq(user)) .load::(&mut self.connection) .unwrap_or_default() .is_empty() } } pub fn connect() -> SqliteConnection { let db_url = dotenv::var("DATABASE_URL").expect("DATABASE_URL must be set"); SqliteConnection::establish(&db_url).expect("Error connecting to DB") } #[derive(Debug)] pub struct DbError { pub kind: DbErrorKind, } impl DbError { fn new(kind: DbErrorKind) -> Self { DbError { kind } } fn exists() -> Self { DbError::new(DbErrorKind::AlreadyExists) } } #[derive(Debug, PartialEq)] pub enum DbErrorKind { AlreadyExists, InsertError, QueryError, UpdateError, } pub trait FavoriteDao: Sync + Send { fn add_favorite(&mut self, user_id: i32, favorite_path: &str) -> Result; fn remove_favorite(&mut self, user_id: i32, favorite_path: String); fn get_favorites(&mut self, user_id: i32) -> Result, DbError>; fn update_path(&mut self, old_path: &str, new_path: &str) -> Result<(), DbError>; fn get_all_paths(&mut self) -> Result, DbError>; } pub struct SqliteFavoriteDao { connection: Arc>, } impl Default for SqliteFavoriteDao { fn default() -> Self { Self::new() } } impl SqliteFavoriteDao { pub fn new() -> Self { SqliteFavoriteDao { connection: Arc::new(Mutex::new(connect())), } } } impl FavoriteDao for SqliteFavoriteDao { fn add_favorite(&mut self, user_id: i32, favorite_path: &str) -> Result { use schema::favorites::dsl::*; let mut connection = self.connection.lock().expect("Unable to get FavoriteDao"); if favorites .filter(userid.eq(user_id).and(rel_path.eq(&favorite_path))) .first::(connection.deref_mut()) .is_err() { diesel::insert_into(favorites) .values(InsertFavorite { userid: &user_id, path: favorite_path, }) .execute(connection.deref_mut()) .map_err(|_| DbError::new(DbErrorKind::InsertError)) } else { Err(DbError::exists()) } } fn remove_favorite(&mut self, user_id: i32, favorite_path: String) { use schema::favorites::dsl::*; diesel::delete(favorites) .filter(userid.eq(user_id).and(rel_path.eq(favorite_path))) .execute(self.connection.lock().unwrap().deref_mut()) .unwrap(); } fn get_favorites(&mut self, user_id: i32) -> Result, DbError> { use schema::favorites::dsl::*; favorites .filter(userid.eq(user_id)) .load::(self.connection.lock().unwrap().deref_mut()) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn update_path(&mut self, old_path: &str, new_path: &str) -> Result<(), DbError> { use schema::favorites::dsl::*; diesel::update(favorites.filter(rel_path.eq(old_path))) .set(rel_path.eq(new_path)) .execute(self.connection.lock().unwrap().deref_mut()) .map_err(|_| DbError::new(DbErrorKind::UpdateError))?; Ok(()) } fn get_all_paths(&mut self) -> Result, DbError> { use schema::favorites::dsl::*; favorites .select(rel_path) .distinct() .load(self.connection.lock().unwrap().deref_mut()) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } } pub trait ExifDao: Sync + Send { fn store_exif( &mut self, context: &opentelemetry::Context, exif_data: InsertImageExif, ) -> Result; fn get_exif( &mut self, context: &opentelemetry::Context, file_path: &str, ) -> Result, DbError>; fn update_exif( &mut self, context: &opentelemetry::Context, exif_data: InsertImageExif, ) -> Result; fn delete_exif( &mut self, context: &opentelemetry::Context, file_path: &str, ) -> Result<(), DbError>; fn get_all_with_date_taken( &mut self, context: &opentelemetry::Context, ) -> Result, DbError>; /// Batch load EXIF data for multiple file paths (single query) fn get_exif_batch( &mut self, context: &opentelemetry::Context, file_paths: &[String], ) -> Result, DbError>; /// Query files by EXIF criteria with optional filters fn query_by_exif( &mut self, context: &opentelemetry::Context, camera_make: Option<&str>, camera_model: Option<&str>, lens_model: Option<&str>, gps_bounds: Option<(f64, f64, f64, f64)>, // (min_lat, max_lat, min_lon, max_lon) date_from: Option, date_to: Option, ) -> Result, DbError>; /// Get distinct camera makes with counts fn get_camera_makes( &mut self, context: &opentelemetry::Context, ) -> Result, DbError>; /// Update file path in EXIF database fn update_file_path( &mut self, context: &opentelemetry::Context, old_path: &str, new_path: &str, ) -> Result<(), DbError>; /// Get all file paths from EXIF database fn get_all_file_paths( &mut self, context: &opentelemetry::Context, ) -> Result, DbError>; /// Get all photos with GPS coordinates /// Returns Vec<(file_path, latitude, longitude, date_taken)> fn get_all_with_gps( &mut self, context: &opentelemetry::Context, base_path: &str, recursive: bool, ) -> Result)>, DbError>; /// Return rows that still lack a `content_hash`, oldest first. Used by /// the `backfill_hashes` binary to batch through the historical /// backlog. Returns `(library_id, rel_path)` tuples so the caller can /// resolve each file on disk. fn get_rows_missing_hash( &mut self, context: &opentelemetry::Context, limit: i64, ) -> Result, DbError>; /// Persist the computed blake3 hash + file size for an existing row. fn backfill_content_hash( &mut self, context: &opentelemetry::Context, library_id: i32, rel_path: &str, hash: &str, size_bytes: i64, ) -> Result<(), DbError>; /// Return the first EXIF row with the given content hash (any library). /// Used by thumbnail/HLS generation to detect pre-existing derivatives /// from another library before regenerating. fn find_by_content_hash( &mut self, context: &opentelemetry::Context, hash: &str, ) -> Result, DbError>; } pub struct SqliteExifDao { connection: Arc>, } impl Default for SqliteExifDao { fn default() -> Self { Self::new() } } impl SqliteExifDao { pub fn new() -> Self { SqliteExifDao { connection: Arc::new(Mutex::new(connect())), } } } impl ExifDao for SqliteExifDao { fn store_exif( &mut self, context: &opentelemetry::Context, exif_data: InsertImageExif, ) -> Result { trace_db_call(context, "insert", "store_exif", |_span| { use schema::image_exif::dsl::*; let mut connection = self.connection.lock().expect("Unable to get ExifDao"); diesel::insert_into(image_exif) .values(&exif_data) .execute(connection.deref_mut()) .map_err(|e| { log::warn!( "image_exif insert failed (lib={}, rel_path={:?}): {}", exif_data.library_id, exif_data.file_path, e ); anyhow::anyhow!("Insert error: {}", e) })?; image_exif .filter(library_id.eq(exif_data.library_id)) .filter(rel_path.eq(&exif_data.file_path)) .first::(connection.deref_mut()) .map_err(|e| anyhow::anyhow!("Post-insert lookup failed: {}", e)) }) .map_err(|_| DbError::new(DbErrorKind::InsertError)) } fn get_exif( &mut self, context: &opentelemetry::Context, path: &str, ) -> Result, DbError> { trace_db_call(context, "query", "get_exif", |_span| { use schema::image_exif::dsl::*; let mut connection = self.connection.lock().expect("Unable to get ExifDao"); // Try both normalized (forward slash) and Windows (backslash) paths // since database may contain either format let normalized = path.replace('\\', "/"); let windows_path = path.replace('/', "\\"); match image_exif .filter(rel_path.eq(&normalized).or(rel_path.eq(&windows_path))) .first::(connection.deref_mut()) { Ok(exif) => Ok(Some(exif)), Err(diesel::result::Error::NotFound) => Ok(None), Err(_) => Err(anyhow::anyhow!("Query error")), } }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn update_exif( &mut self, context: &opentelemetry::Context, exif_data: InsertImageExif, ) -> Result { trace_db_call(context, "update", "update_exif", |_span| { use schema::image_exif::dsl::*; let mut connection = self.connection.lock().expect("Unable to get ExifDao"); diesel::update( image_exif .filter(library_id.eq(exif_data.library_id)) .filter(rel_path.eq(&exif_data.file_path)), ) .set(( camera_make.eq(&exif_data.camera_make), camera_model.eq(&exif_data.camera_model), lens_model.eq(&exif_data.lens_model), width.eq(&exif_data.width), height.eq(&exif_data.height), orientation.eq(&exif_data.orientation), gps_latitude.eq(&exif_data.gps_latitude), gps_longitude.eq(&exif_data.gps_longitude), gps_altitude.eq(&exif_data.gps_altitude), focal_length.eq(&exif_data.focal_length), aperture.eq(&exif_data.aperture), shutter_speed.eq(&exif_data.shutter_speed), iso.eq(&exif_data.iso), date_taken.eq(&exif_data.date_taken), last_modified.eq(&exif_data.last_modified), )) .execute(connection.deref_mut()) .map_err(|_| anyhow::anyhow!("Update error"))?; image_exif .filter(library_id.eq(exif_data.library_id)) .filter(rel_path.eq(&exif_data.file_path)) .first::(connection.deref_mut()) .map_err(|_| anyhow::anyhow!("Query error")) }) .map_err(|_| DbError::new(DbErrorKind::UpdateError)) } fn delete_exif(&mut self, context: &opentelemetry::Context, path: &str) -> Result<(), DbError> { trace_db_call(context, "delete", "delete_exif", |_span| { use schema::image_exif::dsl::*; diesel::delete(image_exif.filter(rel_path.eq(path))) .execute(self.connection.lock().unwrap().deref_mut()) .map(|_| ()) .map_err(|_| anyhow::anyhow!("Delete error")) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn get_all_with_date_taken( &mut self, context: &opentelemetry::Context, ) -> Result, DbError> { trace_db_call(context, "query", "get_all_with_date_taken", |_span| { use schema::image_exif::dsl::*; let mut connection = self.connection.lock().expect("Unable to get ExifDao"); image_exif .select((rel_path, date_taken)) .filter(date_taken.is_not_null()) .load::<(String, Option)>(connection.deref_mut()) .map(|records| { records .into_iter() .filter_map(|(path, dt)| dt.map(|ts| (path, ts))) .collect() }) .map_err(|_| anyhow::anyhow!("Query error")) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn get_exif_batch( &mut self, context: &opentelemetry::Context, file_paths: &[String], ) -> Result, DbError> { trace_db_call(context, "query", "get_exif_batch", |_span| { use schema::image_exif::dsl::*; if file_paths.is_empty() { return Ok(Vec::new()); } let mut connection = self.connection.lock().expect("Unable to get ExifDao"); image_exif .filter(rel_path.eq_any(file_paths)) .load::(connection.deref_mut()) .map_err(|_| anyhow::anyhow!("Query error")) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn query_by_exif( &mut self, context: &opentelemetry::Context, camera_make_filter: Option<&str>, camera_model_filter: Option<&str>, lens_model_filter: Option<&str>, gps_bounds: Option<(f64, f64, f64, f64)>, date_from: Option, date_to: Option, ) -> Result, DbError> { trace_db_call(context, "query", "query_by_exif", |_span| { use schema::image_exif::dsl::*; let mut connection = self.connection.lock().expect("Unable to get ExifDao"); let mut query = image_exif.into_boxed(); // Camera filters (case-insensitive partial match) if let Some(make) = camera_make_filter { query = query.filter(camera_make.like(format!("%{}%", make))); } if let Some(model) = camera_model_filter { query = query.filter(camera_model.like(format!("%{}%", model))); } if let Some(lens) = lens_model_filter { query = query.filter(lens_model.like(format!("%{}%", lens))); } // GPS bounding box if let Some((min_lat, max_lat, min_lon, max_lon)) = gps_bounds { query = query .filter(gps_latitude.between(min_lat as f32, max_lat as f32)) .filter(gps_longitude.between(min_lon as f32, max_lon as f32)) .filter(gps_latitude.is_not_null()) .filter(gps_longitude.is_not_null()); } // Date range if let Some(from) = date_from { query = query.filter(date_taken.ge(from)); } if let Some(to) = date_to { query = query.filter(date_taken.le(to)); } if date_from.is_some() || date_to.is_some() { query = query.filter(date_taken.is_not_null()); } query .load::(connection.deref_mut()) .map_err(|_| anyhow::anyhow!("Query error")) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn get_camera_makes( &mut self, context: &opentelemetry::Context, ) -> Result, DbError> { trace_db_call(context, "query", "get_camera_makes", |_span| { use diesel::dsl::count; use schema::image_exif::dsl::*; let mut connection = self.connection.lock().expect("Unable to get ExifDao"); image_exif .filter(camera_make.is_not_null()) .group_by(camera_make) .select((camera_make, count(id))) .order(count(id).desc()) .load::<(Option, i64)>(connection.deref_mut()) .map(|records| { records .into_iter() .filter_map(|(make, cnt)| make.map(|m| (m, cnt))) .collect() }) .map_err(|_| anyhow::anyhow!("Query error")) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn update_file_path( &mut self, context: &opentelemetry::Context, old_path: &str, new_path: &str, ) -> Result<(), DbError> { trace_db_call(context, "update", "update_file_path", |_span| { use schema::image_exif::dsl::*; let mut connection = self.connection.lock().expect("Unable to get ExifDao"); diesel::update(image_exif.filter(rel_path.eq(old_path))) .set(rel_path.eq(new_path)) .execute(connection.deref_mut()) .map_err(|_| anyhow::anyhow!("Update error"))?; Ok(()) }) .map_err(|_| DbError::new(DbErrorKind::UpdateError)) } fn get_all_file_paths( &mut self, context: &opentelemetry::Context, ) -> Result, DbError> { trace_db_call(context, "query", "get_all_file_paths", |_span| { use schema::image_exif::dsl::*; let mut connection = self.connection.lock().expect("Unable to get ExifDao"); image_exif .select(rel_path) .load(connection.deref_mut()) .map_err(|_| anyhow::anyhow!("Query error")) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn get_all_with_gps( &mut self, context: &opentelemetry::Context, base_path: &str, recursive: bool, ) -> Result)>, DbError> { trace_db_call(context, "query", "get_all_with_gps", |span| { use opentelemetry::KeyValue; use opentelemetry::trace::Span; use schema::image_exif::dsl::*; span.set_attributes(vec![ KeyValue::new("base_path", base_path.to_string()), KeyValue::new("recursive", recursive.to_string()), ]); let connection = &mut *self.connection.lock().unwrap(); // Query all photos with non-null GPS coordinates let mut query = image_exif .filter(gps_latitude.is_not_null().and(gps_longitude.is_not_null())) .into_boxed(); // Apply path filtering // If base_path is empty or "/", return all GPS photos (no filter) // Otherwise filter by path prefix if !base_path.is_empty() && base_path != "/" { // Match base path as prefix (with wildcard) query = query.filter(rel_path.like(format!("{}%", base_path))); span.set_attribute(KeyValue::new("path_filter_applied", true)); } else { span.set_attribute(KeyValue::new("path_filter_applied", false)); span.set_attribute(KeyValue::new("returning_all_gps_photos", true)); } // Load full ImageExif records let results: Vec = query .load::(connection) .map_err(|e| anyhow::anyhow!("GPS query error: {}", e))?; // Convert to tuple format (path, lat, lon, date_taken) // Filter out any rows where GPS is still None (shouldn't happen due to filter) // Cast f32 GPS values to f64 for API compatibility let filtered: Vec<(String, f64, f64, Option)> = results .into_iter() .filter_map(|exif| { if let (Some(lat_val), Some(lon_val)) = (exif.gps_latitude, exif.gps_longitude) { Some(( exif.file_path, lat_val as f64, lon_val as f64, exif.date_taken, )) } else { None } }) .collect(); span.set_attribute(KeyValue::new("result_count", filtered.len() as i64)); Ok(filtered) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn get_rows_missing_hash( &mut self, context: &opentelemetry::Context, limit: i64, ) -> Result, DbError> { trace_db_call(context, "query", "get_rows_missing_hash", |_span| { use schema::image_exif::dsl::*; let mut connection = self.connection.lock().expect("Unable to get ExifDao"); image_exif .filter(content_hash.is_null()) .select((library_id, rel_path)) .order(id.asc()) .limit(limit) .load::<(i32, String)>(connection.deref_mut()) .map_err(|_| anyhow::anyhow!("Query error")) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } fn backfill_content_hash( &mut self, context: &opentelemetry::Context, library_id_val: i32, rel_path_val: &str, hash: &str, size_val: i64, ) -> Result<(), DbError> { trace_db_call(context, "update", "backfill_content_hash", |_span| { use schema::image_exif::dsl::*; let mut connection = self.connection.lock().expect("Unable to get ExifDao"); diesel::update( image_exif .filter(library_id.eq(library_id_val)) .filter(rel_path.eq(rel_path_val)), ) .set((content_hash.eq(hash), size_bytes.eq(size_val))) .execute(connection.deref_mut()) .map(|_| ()) .map_err(|_| anyhow::anyhow!("Update error")) }) .map_err(|_| DbError::new(DbErrorKind::UpdateError)) } fn find_by_content_hash( &mut self, context: &opentelemetry::Context, hash: &str, ) -> Result, DbError> { trace_db_call(context, "query", "find_by_content_hash", |_span| { use schema::image_exif::dsl::*; let mut connection = self.connection.lock().expect("Unable to get ExifDao"); image_exif .filter(content_hash.eq(hash)) .first::(connection.deref_mut()) .optional() .map_err(|_| anyhow::anyhow!("Query error")) }) .map_err(|_| DbError::new(DbErrorKind::QueryError)) } }