From be483c9c1acd6385311ff7c62f1fc2828a3eadd2 Mon Sep 17 00:00:00 2001
From: Cameron
Date: Sun, 18 Jan 2026 19:17:10 -0500
Subject: [PATCH 1/6] Add database optimizations for photo search and pagination

Implement database-level sorting with composite indexes for efficient date
and tag queries. Add pagination metadata support and optimize tag count
queries using batch processing.
---
 .../down.sql        |   4 +
 .../up.sql          |  15 +
 src/data/mod.rs     |  14 +
 src/database/mod.rs |  72 +++
 src/files.rs        | 428 ++++++++++++++----
 src/main.rs         |   3 +
 src/tags.rs         |  79 ++++
 7 files changed, 519 insertions(+), 96 deletions(-)
 create mode 100644 migrations/2026-01-18-000000_optimize_photo_search/down.sql
 create mode 100644 migrations/2026-01-18-000000_optimize_photo_search/up.sql

diff --git a/migrations/2026-01-18-000000_optimize_photo_search/down.sql b/migrations/2026-01-18-000000_optimize_photo_search/down.sql
new file mode 100644
index 0000000..562c67b
--- /dev/null
+++ b/migrations/2026-01-18-000000_optimize_photo_search/down.sql
@@ -0,0 +1,4 @@
+-- Revert search performance optimization indexes
+
+DROP INDEX IF EXISTS idx_image_exif_date_path;
+DROP INDEX IF EXISTS idx_tagged_photo_count;
diff --git a/migrations/2026-01-18-000000_optimize_photo_search/up.sql b/migrations/2026-01-18-000000_optimize_photo_search/up.sql
new file mode 100644
index 0000000..61f327e
--- /dev/null
+++ b/migrations/2026-01-18-000000_optimize_photo_search/up.sql
@@ -0,0 +1,15 @@
+-- Add composite indexes for search performance optimization
+-- This migration addresses N+1 query issues and enables database-level sorting
+
+-- Covering index for date-sorted queries (supports ORDER BY + pagination)
+-- Enables efficient date-based sorting without loading all files into memory
+CREATE INDEX IF NOT EXISTS idx_image_exif_date_path
+    ON image_exif(date_taken DESC, file_path);
+
+-- Optimize batch tag count queries with GROUP BY
+-- Reduces N individual queries to a single batch query
+CREATE INDEX IF NOT EXISTS idx_tagged_photo_count
+    ON tagged_photo(photo_name, tag_id);
+
+-- Update query planner statistics to optimize query execution
+ANALYZE;
diff --git a/src/data/mod.rs b/src/data/mod.rs
index 4ef8f39..6c7460c 100644
--- a/src/data/mod.rs
+++ b/src/data/mod.rs
@@ -101,6 +101,16 @@ impl FromRequest for Claims {
 pub struct PhotosResponse {
     pub photos: Vec<String>,
     pub dirs: Vec<String>,
+
+    // Pagination metadata (only present when limit is set)
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub total_count: Option<i64>,
+
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub has_more: Option<bool>,
+
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub next_offset: Option<i64>,
 }
 
 #[derive(Copy, Clone, Deserialize, PartialEq, Debug)]
@@ -141,6 +151,10 @@ pub struct FilesRequest {
 
     // Media type filtering
     pub media_type: Option,
+
+    // Pagination parameters (optional - backward compatible)
+    pub limit: Option<i64>,
+    pub offset: Option<i64>,
 }
 
 #[derive(Copy, Clone, Deserialize, PartialEq, Debug)]
diff --git a/src/database/mod.rs b/src/database/mod.rs
index 0bfd3f1..b93e396 100644
--- a/src/database/mod.rs
+++ b/src/database/mod.rs
@@ -296,6 +296,17 @@ pub trait ExifDao: Sync + Send {
         &mut self,
         context: &opentelemetry::Context,
     ) -> Result, DbError>;
+
+    /// Get files sorted by date with optional pagination
+    /// Returns (sorted_file_paths, total_count)
+    fn get_files_sorted_by_date(
+        &mut self,
+        context: &opentelemetry::Context,
+        file_paths: &[String],
+        ascending: bool,
+        limit: Option<i64>,
+        offset: i64,
+    ) -> Result<(Vec<String>, i64), DbError>;
 }
 
 pub struct SqliteExifDao {
@@ -581,4 +592,65 @@ impl ExifDao for SqliteExifDao {
         })
         .map_err(|_| DbError::new(DbErrorKind::QueryError))
     }
+
+    fn get_files_sorted_by_date(
+        &mut self,
+        context: &opentelemetry::Context,
+        file_paths: &[String],
+        ascending: bool,
+        limit: Option<i64>,
+        offset: i64,
+    ) -> Result<(Vec<String>, i64), DbError> {
+        trace_db_call(context, "query", "get_files_sorted_by_date", |span| {
+            use diesel::dsl::count_star;
+            use opentelemetry::KeyValue;
+            use opentelemetry::trace::Span;
+            use schema::image_exif::dsl::*;
+
+            span.set_attributes(vec![
+                KeyValue::new("file_count", file_paths.len() as i64),
+                KeyValue::new("ascending", ascending.to_string()),
+                KeyValue::new("limit", limit.map(|l| l.to_string()).unwrap_or_default()),
+                KeyValue::new("offset", offset.to_string()),
+            ]);
+
+            if file_paths.is_empty() {
+                return Ok((Vec::new(), 0));
+            }
+
+            let connection = &mut *self.connection.lock().unwrap();
+
+            // Get total count of files that have EXIF data
+            let total_count: i64 = image_exif
+                .filter(file_path.eq_any(file_paths))
+                .select(count_star())
+                .first(connection)
+                .map_err(|_| anyhow::anyhow!("Count query error"))?;
+
+            // Build sorted query
+            let mut query = image_exif.filter(file_path.eq_any(file_paths)).into_boxed();
+
+            // Apply sorting
+            // Note: SQLite NULL handling varies - NULLs appear first for ASC, last for DESC by default
+            if ascending {
+                query = query.order(date_taken.asc());
+            } else {
+                query = query.order(date_taken.desc());
+            }
+
+            // Apply pagination if requested
+            if let Some(limit_val) = limit {
+                query = query.limit(limit_val).offset(offset);
+            }
+
+            // Execute and extract file paths
+            let results: Vec<String> = query
+                .select(file_path)
+                .load::<String>(connection)
+                .map_err(|_| anyhow::anyhow!("Query error"))?;
+
+            Ok((results, total_count))
+        })
+        .map_err(|_| DbError::new(DbErrorKind::QueryError))
+    }
 }
diff --git a/src/files.rs b/src/files.rs
index 8721568..bfcc754 100644
--- a/src/files.rs
+++ b/src/files.rs
@@ -47,69 +47,142 @@ use serde::Deserialize;
 
 /// Apply sorting to files with EXIF data support for date-based sorting
 /// Handles both date sorting (with EXIF/filename fallback) and regular sorting
+/// Returns (sorted_file_paths, total_count)
 fn apply_sorting_with_exif(
     files: Vec<FileWithTagCount>,
     sort_type: SortType,
     exif_dao: &mut Box<dyn ExifDao>,
     span_context: &opentelemetry::Context,
     base_path: &Path,
-) -> Vec<String> {
+    limit: Option<i64>,
+    offset: i64,
+) -> (Vec<String>, i64) {
+    let total_count = files.len() as i64;
+
     match sort_type {
         SortType::DateTakenAsc | SortType::DateTakenDesc => {
-            info!("Date sorting requested, fetching EXIF data");
+            info!("Date sorting requested, using database-level sorting");
 
             // Collect file paths for batch EXIF query
            let file_paths: Vec<String> = files.iter().map(|f| f.file_name.clone()).collect();
 
-            // Batch fetch EXIF data
-            let exif_map: std::collections::HashMap<String, i64> = exif_dao
-                .get_exif_batch(span_context, &file_paths)
-                .unwrap_or_default()
-                .into_iter()
-                .filter_map(|exif| exif.date_taken.map(|dt| (exif.file_path, dt)))
-                .collect();
-
-            // Convert to FileWithMetadata with date fallback logic
-            let files_with_metadata: Vec<FileWithMetadata> = files
-                .into_iter()
-                .map(|f| {
-                    // Try EXIF date first
-                    let date_taken = exif_map
-                        .get(&f.file_name)
-                        .copied()
-                        .or_else(|| {
-                            // Fallback to filename extraction
-                            extract_date_from_filename(&f.file_name).map(|dt| dt.timestamp())
-                        })
-                        .or_else(|| {
-                            // Fallback to filesystem metadata creation date
-                            let full_path = base_path.join(&f.file_name);
-                            std::fs::metadata(full_path)
-                                .and_then(|md| md.created().or(md.modified()))
-                                .ok()
-                                .map(|system_time| {
-                                    <SystemTime as Into<DateTime<Utc>>>::into(system_time)
-                                        .timestamp()
-                                })
-                        });
-
-                    FileWithMetadata {
-                        file_name: f.file_name,
-                        tag_count: f.tag_count,
-                        date_taken,
-                    }
-                })
-                .collect();
-
-            sort_with_metadata(files_with_metadata, sort_type)
+            // Try database-level sorting first (most efficient)
+            let ascending = sort_type == SortType::DateTakenAsc;
+            match exif_dao.get_files_sorted_by_date(
+                span_context,
+                &file_paths,
+                ascending,
+                limit,
+                offset,
+            ) {
+                Ok((sorted_files, db_total)) => {
+                    info!(
+                        "Database-level date sorting succeeded, returned {} files",
+                        sorted_files.len()
+                    );
+                    (sorted_files, db_total)
+                }
+                Err(e) => {
+                    warn!(
+                        "Database-level sorting failed: {:?}, falling back to in-memory sort",
+                        e
+                    );
+                    // Fallback to in-memory sorting with date extraction
+                    let (sorted, _) = in_memory_date_sort(
+                        files,
+                        sort_type,
+                        exif_dao,
+                        span_context,
+                        base_path,
+                        limit,
+                        offset,
+                    );
+                    (sorted, total_count)
+                }
+            }
         }
         _ => {
             // Use regular sort for non-date sorting
-            sort(files, sort_type)
+            let sorted = sort(files, sort_type);
+            let result = if let Some(limit_val) = limit {
+                sorted
+                    .into_iter()
+                    .skip(offset as usize)
+                    .take(limit_val as usize)
+                    .collect()
+            } else {
+                sorted
+            };
+            (result, total_count)
         }
     }
 }
 
+/// Fallback in-memory date sorting with EXIF/filename extraction
+fn in_memory_date_sort(
+    files: Vec<FileWithTagCount>,
+    sort_type: SortType,
+    exif_dao: &mut Box<dyn ExifDao>,
+    span_context: &opentelemetry::Context,
+    base_path: &Path,
+    limit: Option<i64>,
+    offset: i64,
+) -> (Vec<String>, i64) {
+    let total_count = files.len() as i64;
+    let file_paths: Vec<String> = files.iter().map(|f| f.file_name.clone()).collect();
+
+    // Batch fetch EXIF data
+    let exif_map: std::collections::HashMap<String, i64> = exif_dao
+        .get_exif_batch(span_context, &file_paths)
+        .unwrap_or_default()
+        .into_iter()
+        .filter_map(|exif| exif.date_taken.map(|dt| (exif.file_path, dt)))
+        .collect();
+
+    // Convert to FileWithMetadata with date fallback logic
+    let files_with_metadata: Vec<FileWithMetadata> = files
+        .into_iter()
+        .map(|f| {
+            // Try EXIF date first
+            let date_taken = exif_map
+                .get(&f.file_name)
+                .copied()
+                .or_else(|| {
+                    // Fallback to filename extraction
+                    extract_date_from_filename(&f.file_name).map(|dt| dt.timestamp())
+                })
+                .or_else(|| {
+                    // Fallback to filesystem metadata creation date
+                    let full_path = base_path.join(&f.file_name);
+                    std::fs::metadata(full_path)
+                        .and_then(|md| md.created().or(md.modified()))
+                        .ok()
+                        .map(|system_time| {
+                            <SystemTime as Into<DateTime<Utc>>>::into(system_time).timestamp()
+                        })
+                });
+
+            FileWithMetadata {
+                file_name: f.file_name,
+                tag_count: f.tag_count,
+                date_taken,
+            }
+        })
+        .collect();
+
+    let sorted = sort_with_metadata(files_with_metadata, sort_type);
+    let result = if let Some(limit_val) = limit {
+        sorted
+            .into_iter()
+            .skip(offset as usize)
+            .take(limit_val as usize)
+            .collect()
+    } else {
+        sorted
+    };
+    (result, total_count)
+}
+
 pub async fn list_photos(
     _: Claims,
     request: HttpRequest,
@@ -171,6 +244,23 @@
             .map(|mt| format!("{:?}", mt))
             .unwrap_or_default(),
         ),
+        // Pagination parameters
+        KeyValue::new("pagination.enabled", req.limit.is_some().to_string()),
+        KeyValue::new(
+            "pagination.limit",
+            req.limit.map(|l| l.to_string()).unwrap_or_default(),
+        ),
+        KeyValue::new("pagination.offset", req.offset.unwrap_or(0).to_string()),
+        // Optimization flags
+        KeyValue::new("optimization.batch_tags", "true"),
+        KeyValue::new(
+            "optimization.db_sort",
+            matches!(
+                req.sort,
+                Some(SortType::DateTakenAsc | SortType::DateTakenDesc)
+            )
+            .to_string(),
+        ),
     ]);
 
     let span_context =
opentelemetry::Context::current_with_span(span); @@ -332,8 +422,10 @@ pub async fn list_photos( .collect::>() }) .map(|files| { - // Handle sorting - use helper function that supports EXIF date sorting + // Handle sorting - use helper function that supports EXIF date sorting and pagination let sort_type = req.sort.unwrap_or(NameAsc); + let limit = req.limit; + let offset = req.offset.unwrap_or(0); let mut exif_dao_guard = exif_dao.lock().expect("Unable to get ExifDao"); let result = apply_sorting_with_exif( files, @@ -341,25 +433,50 @@ pub async fn list_photos( &mut exif_dao_guard, &span_context, app_state.base_path.as_ref(), + limit, + offset, ); drop(exif_dao_guard); result }) - .inspect(|files| debug!("Found {:?} files", files.len())) - .map(|tagged_files: Vec| { + .inspect(|(files, total)| debug!("Found {:?} files (total: {})", files.len(), total)) + .map(|(tagged_files, total_count)| { info!( "Found {:?} tagged files: {:?}", tagged_files.len(), tagged_files ); + + let returned_count = tagged_files.len() as i64; + let offset = req.offset.unwrap_or(0); + let pagination_metadata = if req.limit.is_some() { + ( + Some(total_count), + Some(offset + returned_count < total_count), + if offset + returned_count < total_count { + Some(offset + returned_count) + } else { + None + }, + ) + } else { + (None, None, None) + }; + span_context .span() .set_attribute(KeyValue::new("file_count", tagged_files.len().to_string())); + span_context + .span() + .set_attribute(KeyValue::new("total_count", total_count.to_string())); span_context.span().set_status(Status::Ok); HttpResponse::Ok().json(PhotosResponse { photos: tagged_files, dirs: vec![], + total_count: pagination_metadata.0, + has_more: pagination_metadata.1, + next_offset: pagination_metadata.2, }) }) .into_http_internal_err() @@ -393,38 +510,99 @@ pub async fn list_photos( ); info!("Starting to filter {} files from filesystem", files.len()); + let start_filter = std::time::Instant::now(); - let photos = files - .iter() - .filter(|&f| { - f.metadata().map_or_else( - |e| { - error!("Failed getting file metadata: {:?}", e); - f.extension().is_some() - }, - |md| md.is_file(), - ) - }) - .map(|path: &PathBuf| { - let relative = path.strip_prefix(&app_state.base_path).unwrap_or_else(|_| { - panic!( - "Unable to strip base path {} from file path {}", - &app_state.base_path.path(), - path.display() - ) + // Separate files and directories in a single pass to avoid redundant metadata calls + let (file_names, dirs): (Vec, Vec) = + files + .iter() + .fold((Vec::new(), Vec::new()), |(mut files, mut dirs), path| { + match path.metadata() { + Ok(md) => { + let relative = + path.strip_prefix(&app_state.base_path).unwrap_or_else(|_| { + panic!( + "Unable to strip base path {} from file path {}", + &app_state.base_path.path(), + path.display() + ) + }); + let relative_str = relative.to_str().unwrap().to_string(); + + if md.is_file() { + files.push(relative_str); + } else if md.is_dir() { + dirs.push(relative_str); + } + } + Err(e) => { + error!("Failed getting file metadata: {:?}", e); + // Include files without metadata if they have extensions + if path.extension().is_some() { + let relative = path + .strip_prefix(&app_state.base_path) + .unwrap_or_else(|_| { + panic!( + "Unable to strip base path {} from file path {}", + &app_state.base_path.path(), + path.display() + ) + }); + files.push(relative.to_str().unwrap().to_string()); + } + } + } + (files, dirs) }); - relative.to_path_buf() - }) - .map(|f| f.to_str().unwrap().to_string()) - .map(|file_name| { - let 
mut tag_dao = tag_dao.lock().expect("Unable to get TagDao"); - let file_tags = tag_dao - .get_tags_for_path(&span_context, &file_name) - .unwrap_or_default(); + info!( + "File filtering took {:?}, now fetching tag counts for {} files", + start_filter.elapsed(), + file_names.len() + ); + let start_tags = std::time::Instant::now(); + + // Batch query for tag counts to avoid N+1 queries + let tag_counts = { + let mut tag_dao_guard = tag_dao.lock().expect("Unable to get TagDao"); + tag_dao_guard + .get_tag_counts_batch(&span_context, &file_names) + .unwrap_or_default() + }; + info!("Batch tag count query took {:?}", start_tags.elapsed()); + + // Also get full tag lists for files that need tag filtering + let start_tag_filter = std::time::Instant::now(); + let file_tags_map: std::collections::HashMap> = + if req.tag_ids.is_some() || req.exclude_tag_ids.is_some() { + info!( + "Tag filtering requested, fetching full tag lists for {} files", + file_names.len() + ); + let mut tag_dao_guard = tag_dao.lock().expect("Unable to get TagDao"); + file_names + .iter() + .filter_map(|file_name| { + tag_dao_guard + .get_tags_for_path(&span_context, file_name) + .ok() + .map(|tags| (file_name.clone(), tags)) + }) + .collect() + } else { + std::collections::HashMap::new() + }; + if req.tag_ids.is_some() || req.exclude_tag_ids.is_some() { + info!("Full tag list fetch took {:?}", start_tag_filter.elapsed()); + } + + let photos = file_names + .into_iter() + .map(|file_name| { + let file_tags = file_tags_map.get(&file_name).cloned().unwrap_or_default(); (file_name, file_tags) }) - .filter(|(_, file_tags)| { + .filter(|(_, file_tags): &(String, Vec)| { if let Some(tag_ids) = &req.tag_ids { let tag_ids = tag_ids .split(',') @@ -472,16 +650,28 @@ pub async fn list_photos( true } }) - .map(|(file_name, tags)| FileWithTagCount { - file_name, - tag_count: tags.len() as i64, - }) + .map( + |(file_name, _tags): (String, Vec)| FileWithTagCount { + file_name: file_name.clone(), + tag_count: *tag_counts.get(&file_name).unwrap_or(&0), + }, + ) .collect::>(); - info!("After all filters, {} files remain", photos.len()); + info!( + "After all filters, {} files remain (filtering took {:?})", + photos.len(), + start_filter.elapsed() + ); - // Handle sorting - use helper function that supports EXIF date sorting - let response_files = if let Some(sort_type) = req.sort { + // Extract pagination parameters + let limit = req.limit; + let offset = req.offset.unwrap_or(0); + let start_sort = std::time::Instant::now(); + + // Handle sorting - use helper function that supports EXIF date sorting and pagination + let (response_files, total_count) = if let Some(sort_type) = req.sort { + info!("Sorting {} files by {:?}", photos.len(), sort_type); let mut exif_dao_guard = exif_dao.lock().expect("Unable to get ExifDao"); let result = apply_sorting_with_exif( photos, @@ -489,35 +679,68 @@ pub async fn list_photos( &mut exif_dao_guard, &span_context, app_state.base_path.as_ref(), + limit, + offset, ); drop(exif_dao_guard); result } else { - // No sorting requested - photos - .into_iter() - .map(|f| f.file_name) - .collect::>() + // No sorting requested - apply pagination if requested + let total = photos.len() as i64; + let files: Vec = if let Some(limit_val) = limit { + photos + .into_iter() + .skip(offset as usize) + .take(limit_val as usize) + .map(|f| f.file_name) + .collect() + } else { + photos.into_iter().map(|f| f.file_name).collect() + }; + (files, total) }; + info!( + "Sorting took {:?}, returned {} files (total: {})", + 
start_sort.elapsed(), + response_files.len(), + total_count + ); - let dirs = files - .iter() - .filter(|&f| f.metadata().is_ok_and(|md| md.is_dir())) - .map(|path: &PathBuf| { - let relative = path.strip_prefix(&app_state.base_path).unwrap(); - relative.to_path_buf() - }) - .map(|f| f.to_str().unwrap().to_string()) - .collect::>(); + // Note: dirs were already collected during file filtering to avoid redundant metadata calls + + // Calculate pagination metadata + let returned_count = response_files.len() as i64; + let pagination_metadata = if limit.is_some() { + ( + Some(total_count), + Some(offset + returned_count < total_count), + if offset + returned_count < total_count { + Some(offset + returned_count) + } else { + None + }, + ) + } else { + (None, None, None) + }; span_context .span() .set_attribute(KeyValue::new("file_count", files.len().to_string())); + span_context + .span() + .set_attribute(KeyValue::new("returned_count", returned_count.to_string())); + span_context + .span() + .set_attribute(KeyValue::new("total_count", total_count.to_string())); span_context.span().set_status(Status::Ok); HttpResponse::Ok().json(PhotosResponse { photos: response_files, dirs, + total_count: pagination_metadata.0, + has_more: pagination_metadata.1, + next_offset: pagination_metadata.2, }) } _ => { @@ -902,7 +1125,7 @@ mod tests { &mut self, _context: &opentelemetry::Context, data: crate::database::models::InsertImageExif, - ) -> Result { + ) -> Result { // Return a dummy ImageExif for tests Ok(crate::database::models::ImageExif { id: 1, @@ -1020,6 +1243,19 @@ mod tests { ) -> Result, DbError> { Ok(Vec::new()) } + + fn get_files_sorted_by_date( + &mut self, + _context: &opentelemetry::Context, + file_paths: &[String], + _ascending: bool, + _limit: Option, + _offset: i64, + ) -> Result<(Vec, i64), DbError> { + // For tests, just return all files unsorted + let count = file_paths.len() as i64; + Ok((file_paths.to_vec(), count)) + } } mod api { diff --git a/src/main.rs b/src/main.rs index b37f5fd..b7bc844 100644 --- a/src/main.rs +++ b/src/main.rs @@ -527,6 +527,9 @@ async fn favorites( HttpResponse::Ok().json(PhotosResponse { photos: favorites, dirs: Vec::new(), + total_count: None, + has_more: None, + next_offset: None, }) } Ok(Err(e)) => { diff --git a/src/tags.rs b/src/tags.rs index ef23ec6..3a16c2f 100644 --- a/src/tags.rs +++ b/src/tags.rs @@ -322,6 +322,11 @@ pub trait TagDao { &mut self, context: &opentelemetry::Context, ) -> anyhow::Result>; + fn get_tag_counts_batch( + &mut self, + context: &opentelemetry::Context, + file_paths: &[String], + ) -> anyhow::Result>; } pub struct SqliteTagDao { @@ -642,6 +647,65 @@ impl TagDao for SqliteTagDao { .load(&mut self.connection) .with_context(|| "Unable to get photo names") } + + fn get_tag_counts_batch( + &mut self, + context: &opentelemetry::Context, + file_paths: &[String], + ) -> anyhow::Result> { + use std::collections::HashMap; + + trace_db_call(context, "query", "get_tag_counts_batch", |span| { + span.set_attribute(KeyValue::new("file_count", file_paths.len() as i64)); + + if file_paths.is_empty() { + return Ok(HashMap::new()); + } + + use diesel::dsl::*; + use diesel::sql_types::*; + + // Build dynamic query with placeholders + let placeholders = std::iter::repeat_n("?", file_paths.len()) + .collect::>() + .join(", "); + + let query_str = format!( + r#" + SELECT photo_name, COUNT(DISTINCT tag_id) as tag_count + FROM tagged_photo + WHERE photo_name IN ({}) + GROUP BY photo_name + "#, + placeholders + ); + + let mut query = 
sql_query(query_str).into_boxed(); + + // Bind all file paths + for path in file_paths { + query = query.bind::(path); + } + + #[derive(QueryableByName, Debug)] + struct TagCountRow { + #[diesel(sql_type = Text)] + photo_name: String, + #[diesel(sql_type = BigInt)] + tag_count: i64, + } + + // Execute query and convert to HashMap + query + .load::(&mut self.connection) + .with_context(|| "Unable to get batch tag counts") + .map(|rows| { + rows.into_iter() + .map(|row| (row.photo_name, row.tag_count)) + .collect::>() + }) + }) + } } #[cfg(test)] @@ -818,6 +882,21 @@ mod tests { ) -> anyhow::Result> { todo!() } + + fn get_tag_counts_batch( + &mut self, + _context: &opentelemetry::Context, + file_paths: &[String], + ) -> anyhow::Result> { + use std::collections::HashMap; + let mut counts = HashMap::new(); + for path in file_paths { + if let Some(tags) = self.tagged_photos.borrow().get(path) { + counts.insert(path.clone(), tags.len() as i64); + } + } + Ok(counts) + } } #[actix_rt::test] From 85e656767492e8591396578115436f33e9903fc9 Mon Sep 17 00:00:00 2001 From: Cameron Cordes Date: Sun, 18 Jan 2026 19:17:53 -0500 Subject: [PATCH 2/6] Bump to 0.5.1 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index acf3598..2c237b9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "image-api" -version = "0.5.0" +version = "0.5.1" authors = ["Cameron Cordes "] edition = "2024" From ea53932b4b8c584fe78b357a10a1b6a10711a781 Mon Sep 17 00:00:00 2001 From: Cameron Date: Sun, 18 Jan 2026 19:26:23 -0500 Subject: [PATCH 3/6] Update Cargo.lock for version bump --- Cargo.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 7ef5729..0a17c92 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1797,7 +1797,7 @@ dependencies = [ [[package]] name = "image-api" -version = "0.5.0" +version = "0.5.1" dependencies = [ "actix", "actix-cors", From 924577839154b86b27403b273724415f9390b941 Mon Sep 17 00:00:00 2001 From: Cameron Date: Mon, 19 Jan 2026 22:32:00 -0500 Subject: [PATCH 4/6] Generate video playlists on period directory scans --- src/main.rs | 54 ++++++++++++++++++++++++++++++++++++--------- src/video/actors.rs | 40 ++++++++++++++++++++++++++++++--- 2 files changed, 80 insertions(+), 14 deletions(-) diff --git a/src/main.rs b/src/main.rs index b7bc844..75296b0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,7 @@ extern crate diesel; extern crate rayon; +use actix::Addr; use actix_web::web::Data; use actix_web_prom::PrometheusMetricsBuilder; use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations}; @@ -45,7 +46,8 @@ use crate::service::ServiceBuilder; use crate::state::AppState; use crate::tags::*; use crate::video::actors::{ - ProcessMessage, ScanDirectoryMessage, create_playlist, generate_video_thumbnail, + ProcessMessage, QueueVideosMessage, ScanDirectoryMessage, VideoPlaylistManager, + create_playlist, generate_video_thumbnail, }; use log::{debug, error, info, trace, warn}; use opentelemetry::trace::{Span, Status, TraceContextExt, Tracer}; @@ -714,8 +716,6 @@ fn main() -> std::io::Result<()> { run_migrations(&mut connect()).expect("Failed to run migrations"); - watch_files(); - let system = actix::System::new(); system.block_on(async { // Just use basic logger when running a non-release build @@ -754,6 +754,10 @@ fn main() -> std::io::Result<()> { directory: app_state.base_path.clone(), }); + // Start file watcher with playlist manager + let playlist_mgr_for_watcher = 
app_state.playlist_manager.as_ref().clone(); + watch_files(playlist_mgr_for_watcher); + // Spawn background job to generate daily conversation summaries { use crate::ai::generate_daily_summaries; @@ -896,8 +900,8 @@ fn run_migrations( Ok(()) } -fn watch_files() { - std::thread::spawn(|| { +fn watch_files(playlist_manager: Addr) { + std::thread::spawn(move || { let base_str = dotenv::var("BASE_PATH").unwrap(); let base_path = PathBuf::from(&base_str); @@ -940,7 +944,7 @@ fn watch_files() { if is_full_scan { info!("Running full scan (scan #{})", scan_count); - process_new_files(&base_path, Arc::clone(&exif_dao), None); + process_new_files(&base_path, Arc::clone(&exif_dao), None, playlist_manager.clone()); last_full_scan = now; } else { debug!( @@ -951,7 +955,7 @@ fn watch_files() { let check_since = last_quick_scan .checked_sub(Duration::from_secs(10)) .unwrap_or(last_quick_scan); - process_new_files(&base_path, Arc::clone(&exif_dao), Some(check_since)); + process_new_files(&base_path, Arc::clone(&exif_dao), Some(check_since), playlist_manager.clone()); } last_quick_scan = now; @@ -967,6 +971,7 @@ fn process_new_files( base_path: &Path, exif_dao: Arc>>, modified_since: Option, + playlist_manager: Addr, ) { let context = opentelemetry::Context::new(); let thumbs = dotenv::var("THUMBNAILS").expect("THUMBNAILS not defined"); @@ -1030,14 +1035,14 @@ fn process_new_files( let mut files_needing_exif = Vec::new(); // Check each file for missing thumbnail or EXIF data - for (file_path, relative_path) in files { + for (file_path, relative_path) in &files { // Check if thumbnail exists - let thumb_path = thumbnail_directory.join(&relative_path); + let thumb_path = thumbnail_directory.join(relative_path); let needs_thumbnail = !thumb_path.exists(); // Check if EXIF data exists (for supported files) let needs_exif = if exif::supports_exif(&file_path) { - !existing_exif_paths.contains_key(&relative_path) + !existing_exif_paths.contains_key(relative_path) } else { false }; @@ -1050,7 +1055,7 @@ fn process_new_files( } if needs_exif { - files_needing_exif.push((file_path, relative_path)); + files_needing_exif.push((file_path.clone(), relative_path.clone())); } } } @@ -1104,6 +1109,33 @@ fn process_new_files( } } + // Check for videos that need HLS playlists + let video_path_base = dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set"); + let mut videos_needing_playlists = Vec::new(); + + for (file_path, _relative_path) in &files { + if is_video_file(&file_path) { + // Construct expected playlist path + let playlist_filename = format!( + "{}.m3u8", + file_path.file_name().unwrap().to_string_lossy() + ); + let playlist_path = Path::new(&video_path_base).join(&playlist_filename); + + // Check if playlist already exists + if !playlist_path.exists() { + videos_needing_playlists.push(file_path.clone()); + } + } + } + + // Send queue request to playlist manager + if !videos_needing_playlists.is_empty() { + playlist_manager.do_send(QueueVideosMessage { + video_paths: videos_needing_playlists, + }); + } + // Generate thumbnails for all files that need them if new_files_found { info!("Processing thumbnails for new files..."); diff --git a/src/video/actors.rs b/src/video/actors.rs index 7902158..7f38b05 100644 --- a/src/video/actors.rs +++ b/src/video/actors.rs @@ -192,17 +192,51 @@ impl Handler for VideoPlaylistManager { } } +impl Handler for VideoPlaylistManager { + type Result = (); + + fn handle(&mut self, msg: QueueVideosMessage, _ctx: &mut Self::Context) -> Self::Result { + if msg.video_paths.is_empty() { + 
return; + } + + info!( + "Queueing {} videos for HLS playlist generation", + msg.video_paths.len() + ); + + let playlist_output_dir = self.playlist_dir.clone(); + let playlist_generator = self.playlist_generator.clone(); + + for video_path in msg.video_paths { + let path_str = video_path.to_string_lossy().to_string(); + debug!("Queueing playlist generation for: {}", path_str); + + playlist_generator.do_send(GeneratePlaylistMessage { + playlist_path: playlist_output_dir.to_str().unwrap().to_string(), + video_path, + }); + } + } +} + #[derive(Message)] #[rtype(result = "()")] pub struct ScanDirectoryMessage { pub(crate) directory: String, } +#[derive(Message)] +#[rtype(result = "()")] +pub struct QueueVideosMessage { + pub video_paths: Vec, +} + #[derive(Message)] #[rtype(result = "Result<()>")] -struct GeneratePlaylistMessage { - video_path: PathBuf, - playlist_path: String, +pub struct GeneratePlaylistMessage { + pub video_path: PathBuf, + pub playlist_path: String, } pub struct PlaylistGenerator { From 08f402d4d1ce3fb170f30e5d2946b77d1ffd6a01 Mon Sep 17 00:00:00 2001 From: Cameron Date: Mon, 19 Jan 2026 22:37:50 -0500 Subject: [PATCH 5/6] Add h264 codec detection and orphaned playlist cleanup job Implement `is_h264_encoded` to detect existing h264 videos and optimize processing by using stream copy when possible. Introduce a background job for cleaning up orphaned playlists and segments based on missing source videos. Improve checks for playlist generation necessity. --- src/main.rs | 156 +++++++++++++++++++++++++++++++++++++++++++- src/video/actors.rs | 92 +++++++++++++++++++++----- 2 files changed, 228 insertions(+), 20 deletions(-) diff --git a/src/main.rs b/src/main.rs index 75296b0..c2b3161 100644 --- a/src/main.rs +++ b/src/main.rs @@ -758,6 +758,9 @@ fn main() -> std::io::Result<()> { let playlist_mgr_for_watcher = app_state.playlist_manager.as_ref().clone(); watch_files(playlist_mgr_for_watcher); + // Start orphaned playlist cleanup job + cleanup_orphaned_playlists(); + // Spawn background job to generate daily conversation summaries { use crate::ai::generate_daily_summaries; @@ -900,6 +903,130 @@ fn run_migrations( Ok(()) } +/// Clean up orphaned HLS playlists and segments whose source videos no longer exist +fn cleanup_orphaned_playlists() { + std::thread::spawn(|| { + let video_path = dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set"); + let base_path = dotenv::var("BASE_PATH").expect("BASE_PATH must be set"); + + // Get cleanup interval from environment (default: 24 hours) + let cleanup_interval_secs = dotenv::var("PLAYLIST_CLEANUP_INTERVAL_SECONDS") + .ok() + .and_then(|s| s.parse::().ok()) + .unwrap_or(86400); // 24 hours + + info!("Starting orphaned playlist cleanup job"); + info!(" Cleanup interval: {} seconds", cleanup_interval_secs); + info!(" Playlist directory: {}", video_path); + + loop { + std::thread::sleep(Duration::from_secs(cleanup_interval_secs)); + + info!("Running orphaned playlist cleanup"); + let start = std::time::Instant::now(); + let mut deleted_count = 0; + let mut error_count = 0; + + // Find all .m3u8 files in VIDEO_PATH + let playlists: Vec = WalkDir::new(&video_path) + .into_iter() + .filter_map(|e| e.ok()) + .filter(|e| e.file_type().is_file()) + .filter(|e| { + e.path() + .extension() + .and_then(|s| s.to_str()) + .map(|ext| ext.eq_ignore_ascii_case("m3u8")) + .unwrap_or(false) + }) + .map(|e| e.path().to_path_buf()) + .collect(); + + info!("Found {} playlist files to check", playlists.len()); + + for playlist_path in playlists { + // 
Extract the original video filename from playlist name + // Playlist format: {VIDEO_PATH}/{original_filename}.m3u8 + if let Some(filename) = playlist_path.file_stem() { + let video_filename = filename.to_string_lossy(); + + // Search for this video file in BASE_PATH + let mut video_exists = false; + for entry in WalkDir::new(&base_path) + .into_iter() + .filter_map(|e| e.ok()) + .filter(|e| e.file_type().is_file()) + { + if let Some(entry_stem) = entry.path().file_stem() { + if entry_stem == filename && is_video_file(entry.path()) { + video_exists = true; + break; + } + } + } + + if !video_exists { + debug!( + "Source video for playlist {} no longer exists, deleting", + playlist_path.display() + ); + + // Delete the playlist file + if let Err(e) = std::fs::remove_file(&playlist_path) { + warn!("Failed to delete playlist {}: {}", playlist_path.display(), e); + error_count += 1; + } else { + deleted_count += 1; + + // Also try to delete associated .ts segment files + // They are typically named {filename}N.ts in the same directory + if let Some(parent_dir) = playlist_path.parent() { + for entry in WalkDir::new(parent_dir) + .max_depth(1) + .into_iter() + .filter_map(|e| e.ok()) + .filter(|e| e.file_type().is_file()) + { + let entry_path = entry.path(); + if let Some(ext) = entry_path.extension() { + if ext.eq_ignore_ascii_case("ts") { + // Check if this .ts file belongs to our playlist + if let Some(ts_stem) = entry_path.file_stem() { + let ts_name = ts_stem.to_string_lossy(); + if ts_name.starts_with(&*video_filename) { + if let Err(e) = std::fs::remove_file(entry_path) { + debug!( + "Failed to delete segment {}: {}", + entry_path.display(), + e + ); + } else { + debug!( + "Deleted segment: {}", + entry_path.display() + ); + } + } + } + } + } + } + } + } + } + } + } + + info!( + "Orphaned playlist cleanup completed in {:?}: deleted {} playlists, {} errors", + start.elapsed(), + deleted_count, + error_count + ); + } + }); +} + fn watch_files(playlist_manager: Addr) { std::thread::spawn(move || { let base_str = dotenv::var("BASE_PATH").unwrap(); @@ -967,6 +1094,31 @@ fn watch_files(playlist_manager: Addr) { }); } +/// Check if a playlist needs to be (re)generated +/// Returns true if: +/// - Playlist doesn't exist, OR +/// - Source video is newer than the playlist +fn playlist_needs_generation(video_path: &Path, playlist_path: &Path) -> bool { + if !playlist_path.exists() { + return true; + } + + // Check if source video is newer than playlist + if let (Ok(video_meta), Ok(playlist_meta)) = ( + std::fs::metadata(video_path), + std::fs::metadata(playlist_path), + ) { + if let (Ok(video_modified), Ok(playlist_modified)) = + (video_meta.modified(), playlist_meta.modified()) + { + return video_modified > playlist_modified; + } + } + + // If we can't determine, assume it needs generation + true +} + fn process_new_files( base_path: &Path, exif_dao: Arc>>, @@ -1122,8 +1274,8 @@ fn process_new_files( ); let playlist_path = Path::new(&video_path_base).join(&playlist_filename); - // Check if playlist already exists - if !playlist_path.exists() { + // Check if playlist needs (re)generation + if playlist_needs_generation(&file_path, &playlist_path) { videos_needing_playlists.push(file_path.clone()); } } diff --git a/src/video/actors.rs b/src/video/actors.rs index 7f38b05..51721a9 100644 --- a/src/video/actors.rs +++ b/src/video/actors.rs @@ -100,6 +100,44 @@ pub fn generate_video_thumbnail(path: &Path, destination: &Path) { .expect("Failure to create video frame"); } +/// Check if a video is already 
encoded with h264 codec +/// Returns true if the video uses h264, false otherwise or if detection fails +async fn is_h264_encoded(video_path: &str) -> bool { + let output = tokio::process::Command::new("ffprobe") + .arg("-v") + .arg("error") + .arg("-select_streams") + .arg("v:0") + .arg("-show_entries") + .arg("stream=codec_name") + .arg("-of") + .arg("default=noprint_wrappers=1:nokey=1") + .arg(video_path) + .output() + .await; + + match output { + Ok(output) if output.status.success() => { + let codec = String::from_utf8_lossy(&output.stdout); + let codec = codec.trim(); + debug!("Detected codec for {}: {}", video_path, codec); + codec == "h264" + } + Ok(output) => { + warn!( + "ffprobe failed for {}: {}", + video_path, + String::from_utf8_lossy(&output.stderr) + ); + false + } + Err(e) => { + warn!("Failed to run ffprobe for {}: {}", video_path, e); + false + } + } +} + pub struct VideoPlaylistManager { playlist_dir: PathBuf, playlist_generator: Addr, @@ -306,25 +344,43 @@ impl Handler for PlaylistGenerator { return Err(std::io::Error::from(std::io::ErrorKind::AlreadyExists)); } + // Check if video is already h264 encoded + let is_h264 = is_h264_encoded(&video_file).await; + let use_copy = is_h264; + + if use_copy { + info!("Video {} is already h264, using stream copy", video_file); + span.add_event("Using stream copy (h264 detected)", vec![]); + } else { + info!("Video {} needs transcoding to h264", video_file); + span.add_event("Transcoding to h264", vec![]); + } + tokio::spawn(async move { - let ffmpeg_result = tokio::process::Command::new("ffmpeg") - .arg("-i") - .arg(&video_file) - .arg("-c:v") - .arg("h264") - .arg("-crf") - .arg("21") - .arg("-preset") - .arg("veryfast") - .arg("-hls_time") - .arg("3") - .arg("-hls_list_size") - .arg("100") - .arg("-vf") - .arg("scale=1080:-2,setsar=1:1") - .arg(playlist_file) - .stdout(Stdio::null()) - .stderr(Stdio::piped()) + let mut cmd = tokio::process::Command::new("ffmpeg"); + cmd.arg("-i").arg(&video_file); + + if use_copy { + // Video is already h264, just copy the stream + cmd.arg("-c:v").arg("copy"); + cmd.arg("-c:a").arg("aac"); // Still need to ensure audio is compatible + } else { + // Need to transcode + cmd.arg("-c:v").arg("h264"); + cmd.arg("-crf").arg("21"); + cmd.arg("-preset").arg("veryfast"); + cmd.arg("-vf").arg("scale=1080:-2,setsar=1:1"); + cmd.arg("-c:a").arg("aac"); + } + + // Common HLS settings + cmd.arg("-hls_time").arg("3"); + cmd.arg("-hls_list_size").arg("100"); + cmd.arg(&playlist_file); + cmd.stdout(Stdio::null()); + cmd.stderr(Stdio::piped()); + + let ffmpeg_result = cmd .output() .inspect_err(|e| error!("Failed to run ffmpeg on child process: {}", e)) .map_err(|e| std::io::Error::other(e.to_string())) From c9410c9c91452a9ac4fe31bf62e75208249ef2bd Mon Sep 17 00:00:00 2001 From: Cameron Date: Mon, 26 Jan 2026 10:55:15 -0500 Subject: [PATCH 6/6] Add rotation check to video transcoding logic Seems like vertical videos aren't preserving rotation logic on copy. 
--- src/video/actors.rs | 47 ++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 44 insertions(+), 3 deletions(-) diff --git a/src/video/actors.rs b/src/video/actors.rs index 51721a9..a6c59e9 100644 --- a/src/video/actors.rs +++ b/src/video/actors.rs @@ -138,6 +138,36 @@ async fn is_h264_encoded(video_path: &str) -> bool { } } +/// Check if a video has rotation metadata +/// Returns the rotation angle in degrees (0, 90, 180, 270) or 0 if none detected +async fn get_video_rotation(video_path: &str) -> i32 { + let output = tokio::process::Command::new("ffprobe") + .arg("-v") + .arg("error") + .arg("-select_streams") + .arg("v:0") + .arg("-show_entries") + .arg("stream_tags=rotate") + .arg("-of") + .arg("default=noprint_wrappers=1:nokey=1") + .arg(video_path) + .output() + .await; + + match output { + Ok(output) if output.status.success() => { + let rotation_str = String::from_utf8_lossy(&output.stdout); + let rotation_str = rotation_str.trim(); + if rotation_str.is_empty() { + 0 + } else { + rotation_str.parse::().unwrap_or(0) + } + } + _ => 0, + } +} + pub struct VideoPlaylistManager { playlist_dir: PathBuf, playlist_generator: Addr, @@ -346,9 +376,19 @@ impl Handler for PlaylistGenerator { // Check if video is already h264 encoded let is_h264 = is_h264_encoded(&video_file).await; - let use_copy = is_h264; - if use_copy { + // Check for rotation metadata + let rotation = get_video_rotation(&video_file).await; + let has_rotation = rotation != 0; + + let use_copy = is_h264 && !has_rotation; + + if has_rotation { + info!("Video {} has rotation metadata ({}°), transcoding to apply rotation", video_file, rotation); + span.add_event("Transcoding due to rotation", vec![ + KeyValue::new("rotation_degrees", rotation as i64) + ]); + } else if use_copy { info!("Video {} is already h264, using stream copy", video_file); span.add_event("Using stream copy (h264 detected)", vec![]); } else { @@ -362,10 +402,11 @@ impl Handler for PlaylistGenerator { if use_copy { // Video is already h264, just copy the stream + // Note: rotation metadata will be preserved in the stream cmd.arg("-c:v").arg("copy"); cmd.arg("-c:a").arg("aac"); // Still need to ensure audio is compatible } else { - // Need to transcode + // Need to transcode - autorotate is enabled by default and will apply rotation cmd.arg("-c:v").arg("h264"); cmd.arg("-crf").arg("21"); cmd.arg("-preset").arg("veryfast");