Merge pull request 'feature/search-performance' (#47) from feature/search-performance into master
Reviewed-on: #47
This commit was merged in pull request #47.
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -1797,7 +1797,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "image-api"
|
||||
version = "0.5.0"
|
||||
version = "0.5.1"
|
||||
dependencies = [
|
||||
"actix",
|
||||
"actix-cors",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "image-api"
|
||||
version = "0.5.0"
|
||||
version = "0.5.1"
|
||||
authors = ["Cameron Cordes <cameronc.dev@gmail.com>"]
|
||||
edition = "2024"
|
||||
|
||||
|
||||
@@ -0,0 +1,4 @@
|
||||
-- Revert search performance optimization indexes
|
||||
|
||||
DROP INDEX IF EXISTS idx_image_exif_date_path;
|
||||
DROP INDEX IF EXISTS idx_tagged_photo_count;
|
||||
15
migrations/2026-01-18-000000_optimize_photo_search/up.sql
Normal file
15
migrations/2026-01-18-000000_optimize_photo_search/up.sql
Normal file
@@ -0,0 +1,15 @@
|
||||
-- Add composite indexes for search performance optimization
|
||||
-- This migration addresses N+1 query issues and enables database-level sorting
|
||||
|
||||
-- Covering index for date-sorted queries (supports ORDER BY + pagination)
|
||||
-- Enables efficient date-based sorting without loading all files into memory
|
||||
CREATE INDEX IF NOT EXISTS idx_image_exif_date_path
|
||||
ON image_exif(date_taken DESC, file_path);
|
||||
|
||||
-- Optimize batch tag count queries with GROUP BY
|
||||
-- Reduces N individual queries to a single batch query
|
||||
CREATE INDEX IF NOT EXISTS idx_tagged_photo_count
|
||||
ON tagged_photo(photo_name, tag_id);
|
||||
|
||||
-- Update query planner statistics to optimize query execution
|
||||
ANALYZE;
|
||||
@@ -101,6 +101,16 @@ impl FromRequest for Claims {
|
||||
pub struct PhotosResponse {
|
||||
pub photos: Vec<String>,
|
||||
pub dirs: Vec<String>,
|
||||
|
||||
// Pagination metadata (only present when limit is set)
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub total_count: Option<i64>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub has_more: Option<bool>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub next_offset: Option<i64>,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Deserialize, PartialEq, Debug)]
|
||||
@@ -141,6 +151,10 @@ pub struct FilesRequest {
|
||||
|
||||
// Media type filtering
|
||||
pub media_type: Option<MediaType>,
|
||||
|
||||
// Pagination parameters (optional - backward compatible)
|
||||
pub limit: Option<i64>,
|
||||
pub offset: Option<i64>,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Deserialize, PartialEq, Debug)]
|
||||
|
||||
@@ -296,6 +296,17 @@ pub trait ExifDao: Sync + Send {
|
||||
&mut self,
|
||||
context: &opentelemetry::Context,
|
||||
) -> Result<Vec<String>, DbError>;
|
||||
|
||||
/// Get files sorted by date with optional pagination
|
||||
/// Returns (sorted_file_paths, total_count)
|
||||
fn get_files_sorted_by_date(
|
||||
&mut self,
|
||||
context: &opentelemetry::Context,
|
||||
file_paths: &[String],
|
||||
ascending: bool,
|
||||
limit: Option<i64>,
|
||||
offset: i64,
|
||||
) -> Result<(Vec<String>, i64), DbError>;
|
||||
}
|
||||
|
||||
pub struct SqliteExifDao {
|
||||
@@ -581,4 +592,65 @@ impl ExifDao for SqliteExifDao {
|
||||
})
|
||||
.map_err(|_| DbError::new(DbErrorKind::QueryError))
|
||||
}
|
||||
|
||||
fn get_files_sorted_by_date(
|
||||
&mut self,
|
||||
context: &opentelemetry::Context,
|
||||
file_paths: &[String],
|
||||
ascending: bool,
|
||||
limit: Option<i64>,
|
||||
offset: i64,
|
||||
) -> Result<(Vec<String>, i64), DbError> {
|
||||
trace_db_call(context, "query", "get_files_sorted_by_date", |span| {
|
||||
use diesel::dsl::count_star;
|
||||
use opentelemetry::KeyValue;
|
||||
use opentelemetry::trace::Span;
|
||||
use schema::image_exif::dsl::*;
|
||||
|
||||
span.set_attributes(vec![
|
||||
KeyValue::new("file_count", file_paths.len() as i64),
|
||||
KeyValue::new("ascending", ascending.to_string()),
|
||||
KeyValue::new("limit", limit.map(|l| l.to_string()).unwrap_or_default()),
|
||||
KeyValue::new("offset", offset.to_string()),
|
||||
]);
|
||||
|
||||
if file_paths.is_empty() {
|
||||
return Ok((Vec::new(), 0));
|
||||
}
|
||||
|
||||
let connection = &mut *self.connection.lock().unwrap();
|
||||
|
||||
// Get total count of files that have EXIF data
|
||||
let total_count: i64 = image_exif
|
||||
.filter(file_path.eq_any(file_paths))
|
||||
.select(count_star())
|
||||
.first(connection)
|
||||
.map_err(|_| anyhow::anyhow!("Count query error"))?;
|
||||
|
||||
// Build sorted query
|
||||
let mut query = image_exif.filter(file_path.eq_any(file_paths)).into_boxed();
|
||||
|
||||
// Apply sorting
|
||||
// Note: SQLite NULL handling varies - NULLs appear first for ASC, last for DESC by default
|
||||
if ascending {
|
||||
query = query.order(date_taken.asc());
|
||||
} else {
|
||||
query = query.order(date_taken.desc());
|
||||
}
|
||||
|
||||
// Apply pagination if requested
|
||||
if let Some(limit_val) = limit {
|
||||
query = query.limit(limit_val).offset(offset);
|
||||
}
|
||||
|
||||
// Execute and extract file paths
|
||||
let results: Vec<String> = query
|
||||
.select(file_path)
|
||||
.load::<String>(connection)
|
||||
.map_err(|_| anyhow::anyhow!("Query error"))?;
|
||||
|
||||
Ok((results, total_count))
|
||||
})
|
||||
.map_err(|_| DbError::new(DbErrorKind::QueryError))
|
||||
}
|
||||
}
|
||||
|
||||
428
src/files.rs
428
src/files.rs
@@ -47,69 +47,142 @@ use serde::Deserialize;
|
||||
|
||||
/// Apply sorting to files with EXIF data support for date-based sorting
|
||||
/// Handles both date sorting (with EXIF/filename fallback) and regular sorting
|
||||
/// Returns (sorted_file_paths, total_count)
|
||||
fn apply_sorting_with_exif(
|
||||
files: Vec<FileWithTagCount>,
|
||||
sort_type: SortType,
|
||||
exif_dao: &mut Box<dyn ExifDao>,
|
||||
span_context: &opentelemetry::Context,
|
||||
base_path: &Path,
|
||||
) -> Vec<String> {
|
||||
limit: Option<i64>,
|
||||
offset: i64,
|
||||
) -> (Vec<String>, i64) {
|
||||
let total_count = files.len() as i64;
|
||||
|
||||
match sort_type {
|
||||
SortType::DateTakenAsc | SortType::DateTakenDesc => {
|
||||
info!("Date sorting requested, fetching EXIF data");
|
||||
info!("Date sorting requested, using database-level sorting");
|
||||
|
||||
// Collect file paths for batch EXIF query
|
||||
let file_paths: Vec<String> = files.iter().map(|f| f.file_name.clone()).collect();
|
||||
|
||||
// Batch fetch EXIF data
|
||||
let exif_map: std::collections::HashMap<String, i64> = exif_dao
|
||||
.get_exif_batch(span_context, &file_paths)
|
||||
.unwrap_or_default()
|
||||
.into_iter()
|
||||
.filter_map(|exif| exif.date_taken.map(|dt| (exif.file_path, dt)))
|
||||
.collect();
|
||||
|
||||
// Convert to FileWithMetadata with date fallback logic
|
||||
let files_with_metadata: Vec<FileWithMetadata> = files
|
||||
.into_iter()
|
||||
.map(|f| {
|
||||
// Try EXIF date first
|
||||
let date_taken = exif_map
|
||||
.get(&f.file_name)
|
||||
.copied()
|
||||
.or_else(|| {
|
||||
// Fallback to filename extraction
|
||||
extract_date_from_filename(&f.file_name).map(|dt| dt.timestamp())
|
||||
})
|
||||
.or_else(|| {
|
||||
// Fallback to filesystem metadata creation date
|
||||
let full_path = base_path.join(&f.file_name);
|
||||
std::fs::metadata(full_path)
|
||||
.and_then(|md| md.created().or(md.modified()))
|
||||
.ok()
|
||||
.map(|system_time| {
|
||||
<SystemTime as Into<DateTime<Utc>>>::into(system_time)
|
||||
.timestamp()
|
||||
})
|
||||
});
|
||||
|
||||
FileWithMetadata {
|
||||
file_name: f.file_name,
|
||||
tag_count: f.tag_count,
|
||||
date_taken,
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
sort_with_metadata(files_with_metadata, sort_type)
|
||||
// Try database-level sorting first (most efficient)
|
||||
let ascending = sort_type == SortType::DateTakenAsc;
|
||||
match exif_dao.get_files_sorted_by_date(
|
||||
span_context,
|
||||
&file_paths,
|
||||
ascending,
|
||||
limit,
|
||||
offset,
|
||||
) {
|
||||
Ok((sorted_files, db_total)) => {
|
||||
info!(
|
||||
"Database-level date sorting succeeded, returned {} files",
|
||||
sorted_files.len()
|
||||
);
|
||||
(sorted_files, db_total)
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"Database-level sorting failed: {:?}, falling back to in-memory sort",
|
||||
e
|
||||
);
|
||||
// Fallback to in-memory sorting with date extraction
|
||||
let (sorted, _) = in_memory_date_sort(
|
||||
files,
|
||||
sort_type,
|
||||
exif_dao,
|
||||
span_context,
|
||||
base_path,
|
||||
limit,
|
||||
offset,
|
||||
);
|
||||
(sorted, total_count)
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
// Use regular sort for non-date sorting
|
||||
sort(files, sort_type)
|
||||
let sorted = sort(files, sort_type);
|
||||
let result = if let Some(limit_val) = limit {
|
||||
sorted
|
||||
.into_iter()
|
||||
.skip(offset as usize)
|
||||
.take(limit_val as usize)
|
||||
.collect()
|
||||
} else {
|
||||
sorted
|
||||
};
|
||||
(result, total_count)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Fallback in-memory date sorting with EXIF/filename extraction
|
||||
fn in_memory_date_sort(
|
||||
files: Vec<FileWithTagCount>,
|
||||
sort_type: SortType,
|
||||
exif_dao: &mut Box<dyn ExifDao>,
|
||||
span_context: &opentelemetry::Context,
|
||||
base_path: &Path,
|
||||
limit: Option<i64>,
|
||||
offset: i64,
|
||||
) -> (Vec<String>, i64) {
|
||||
let total_count = files.len() as i64;
|
||||
let file_paths: Vec<String> = files.iter().map(|f| f.file_name.clone()).collect();
|
||||
|
||||
// Batch fetch EXIF data
|
||||
let exif_map: std::collections::HashMap<String, i64> = exif_dao
|
||||
.get_exif_batch(span_context, &file_paths)
|
||||
.unwrap_or_default()
|
||||
.into_iter()
|
||||
.filter_map(|exif| exif.date_taken.map(|dt| (exif.file_path, dt)))
|
||||
.collect();
|
||||
|
||||
// Convert to FileWithMetadata with date fallback logic
|
||||
let files_with_metadata: Vec<FileWithMetadata> = files
|
||||
.into_iter()
|
||||
.map(|f| {
|
||||
// Try EXIF date first
|
||||
let date_taken = exif_map
|
||||
.get(&f.file_name)
|
||||
.copied()
|
||||
.or_else(|| {
|
||||
// Fallback to filename extraction
|
||||
extract_date_from_filename(&f.file_name).map(|dt| dt.timestamp())
|
||||
})
|
||||
.or_else(|| {
|
||||
// Fallback to filesystem metadata creation date
|
||||
let full_path = base_path.join(&f.file_name);
|
||||
std::fs::metadata(full_path)
|
||||
.and_then(|md| md.created().or(md.modified()))
|
||||
.ok()
|
||||
.map(|system_time| {
|
||||
<SystemTime as Into<DateTime<Utc>>>::into(system_time).timestamp()
|
||||
})
|
||||
});
|
||||
|
||||
FileWithMetadata {
|
||||
file_name: f.file_name,
|
||||
tag_count: f.tag_count,
|
||||
date_taken,
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
let sorted = sort_with_metadata(files_with_metadata, sort_type);
|
||||
let result = if let Some(limit_val) = limit {
|
||||
sorted
|
||||
.into_iter()
|
||||
.skip(offset as usize)
|
||||
.take(limit_val as usize)
|
||||
.collect()
|
||||
} else {
|
||||
sorted
|
||||
};
|
||||
(result, total_count)
|
||||
}
|
||||
|
||||
pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
|
||||
_: Claims,
|
||||
request: HttpRequest,
|
||||
@@ -171,6 +244,23 @@ pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
|
||||
.map(|mt| format!("{:?}", mt))
|
||||
.unwrap_or_default(),
|
||||
),
|
||||
// Pagination parameters
|
||||
KeyValue::new("pagination.enabled", req.limit.is_some().to_string()),
|
||||
KeyValue::new(
|
||||
"pagination.limit",
|
||||
req.limit.map(|l| l.to_string()).unwrap_or_default(),
|
||||
),
|
||||
KeyValue::new("pagination.offset", req.offset.unwrap_or(0).to_string()),
|
||||
// Optimization flags
|
||||
KeyValue::new("optimization.batch_tags", "true"),
|
||||
KeyValue::new(
|
||||
"optimization.db_sort",
|
||||
matches!(
|
||||
req.sort,
|
||||
Some(SortType::DateTakenAsc | SortType::DateTakenDesc)
|
||||
)
|
||||
.to_string(),
|
||||
),
|
||||
]);
|
||||
|
||||
let span_context = opentelemetry::Context::current_with_span(span);
|
||||
@@ -332,8 +422,10 @@ pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
|
||||
.collect::<Vec<FileWithTagCount>>()
|
||||
})
|
||||
.map(|files| {
|
||||
// Handle sorting - use helper function that supports EXIF date sorting
|
||||
// Handle sorting - use helper function that supports EXIF date sorting and pagination
|
||||
let sort_type = req.sort.unwrap_or(NameAsc);
|
||||
let limit = req.limit;
|
||||
let offset = req.offset.unwrap_or(0);
|
||||
let mut exif_dao_guard = exif_dao.lock().expect("Unable to get ExifDao");
|
||||
let result = apply_sorting_with_exif(
|
||||
files,
|
||||
@@ -341,25 +433,50 @@ pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
|
||||
&mut exif_dao_guard,
|
||||
&span_context,
|
||||
app_state.base_path.as_ref(),
|
||||
limit,
|
||||
offset,
|
||||
);
|
||||
drop(exif_dao_guard);
|
||||
result
|
||||
})
|
||||
.inspect(|files| debug!("Found {:?} files", files.len()))
|
||||
.map(|tagged_files: Vec<String>| {
|
||||
.inspect(|(files, total)| debug!("Found {:?} files (total: {})", files.len(), total))
|
||||
.map(|(tagged_files, total_count)| {
|
||||
info!(
|
||||
"Found {:?} tagged files: {:?}",
|
||||
tagged_files.len(),
|
||||
tagged_files
|
||||
);
|
||||
|
||||
let returned_count = tagged_files.len() as i64;
|
||||
let offset = req.offset.unwrap_or(0);
|
||||
let pagination_metadata = if req.limit.is_some() {
|
||||
(
|
||||
Some(total_count),
|
||||
Some(offset + returned_count < total_count),
|
||||
if offset + returned_count < total_count {
|
||||
Some(offset + returned_count)
|
||||
} else {
|
||||
None
|
||||
},
|
||||
)
|
||||
} else {
|
||||
(None, None, None)
|
||||
};
|
||||
|
||||
span_context
|
||||
.span()
|
||||
.set_attribute(KeyValue::new("file_count", tagged_files.len().to_string()));
|
||||
span_context
|
||||
.span()
|
||||
.set_attribute(KeyValue::new("total_count", total_count.to_string()));
|
||||
span_context.span().set_status(Status::Ok);
|
||||
|
||||
HttpResponse::Ok().json(PhotosResponse {
|
||||
photos: tagged_files,
|
||||
dirs: vec![],
|
||||
total_count: pagination_metadata.0,
|
||||
has_more: pagination_metadata.1,
|
||||
next_offset: pagination_metadata.2,
|
||||
})
|
||||
})
|
||||
.into_http_internal_err()
|
||||
@@ -393,38 +510,99 @@ pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
|
||||
);
|
||||
|
||||
info!("Starting to filter {} files from filesystem", files.len());
|
||||
let start_filter = std::time::Instant::now();
|
||||
|
||||
let photos = files
|
||||
.iter()
|
||||
.filter(|&f| {
|
||||
f.metadata().map_or_else(
|
||||
|e| {
|
||||
error!("Failed getting file metadata: {:?}", e);
|
||||
f.extension().is_some()
|
||||
},
|
||||
|md| md.is_file(),
|
||||
)
|
||||
})
|
||||
.map(|path: &PathBuf| {
|
||||
let relative = path.strip_prefix(&app_state.base_path).unwrap_or_else(|_| {
|
||||
panic!(
|
||||
"Unable to strip base path {} from file path {}",
|
||||
&app_state.base_path.path(),
|
||||
path.display()
|
||||
)
|
||||
// Separate files and directories in a single pass to avoid redundant metadata calls
|
||||
let (file_names, dirs): (Vec<String>, Vec<String>) =
|
||||
files
|
||||
.iter()
|
||||
.fold((Vec::new(), Vec::new()), |(mut files, mut dirs), path| {
|
||||
match path.metadata() {
|
||||
Ok(md) => {
|
||||
let relative =
|
||||
path.strip_prefix(&app_state.base_path).unwrap_or_else(|_| {
|
||||
panic!(
|
||||
"Unable to strip base path {} from file path {}",
|
||||
&app_state.base_path.path(),
|
||||
path.display()
|
||||
)
|
||||
});
|
||||
let relative_str = relative.to_str().unwrap().to_string();
|
||||
|
||||
if md.is_file() {
|
||||
files.push(relative_str);
|
||||
} else if md.is_dir() {
|
||||
dirs.push(relative_str);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed getting file metadata: {:?}", e);
|
||||
// Include files without metadata if they have extensions
|
||||
if path.extension().is_some() {
|
||||
let relative = path
|
||||
.strip_prefix(&app_state.base_path)
|
||||
.unwrap_or_else(|_| {
|
||||
panic!(
|
||||
"Unable to strip base path {} from file path {}",
|
||||
&app_state.base_path.path(),
|
||||
path.display()
|
||||
)
|
||||
});
|
||||
files.push(relative.to_str().unwrap().to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
(files, dirs)
|
||||
});
|
||||
relative.to_path_buf()
|
||||
})
|
||||
.map(|f| f.to_str().unwrap().to_string())
|
||||
.map(|file_name| {
|
||||
let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao");
|
||||
let file_tags = tag_dao
|
||||
.get_tags_for_path(&span_context, &file_name)
|
||||
.unwrap_or_default();
|
||||
|
||||
info!(
|
||||
"File filtering took {:?}, now fetching tag counts for {} files",
|
||||
start_filter.elapsed(),
|
||||
file_names.len()
|
||||
);
|
||||
let start_tags = std::time::Instant::now();
|
||||
|
||||
// Batch query for tag counts to avoid N+1 queries
|
||||
let tag_counts = {
|
||||
let mut tag_dao_guard = tag_dao.lock().expect("Unable to get TagDao");
|
||||
tag_dao_guard
|
||||
.get_tag_counts_batch(&span_context, &file_names)
|
||||
.unwrap_or_default()
|
||||
};
|
||||
info!("Batch tag count query took {:?}", start_tags.elapsed());
|
||||
|
||||
// Also get full tag lists for files that need tag filtering
|
||||
let start_tag_filter = std::time::Instant::now();
|
||||
let file_tags_map: std::collections::HashMap<String, Vec<crate::tags::Tag>> =
|
||||
if req.tag_ids.is_some() || req.exclude_tag_ids.is_some() {
|
||||
info!(
|
||||
"Tag filtering requested, fetching full tag lists for {} files",
|
||||
file_names.len()
|
||||
);
|
||||
let mut tag_dao_guard = tag_dao.lock().expect("Unable to get TagDao");
|
||||
file_names
|
||||
.iter()
|
||||
.filter_map(|file_name| {
|
||||
tag_dao_guard
|
||||
.get_tags_for_path(&span_context, file_name)
|
||||
.ok()
|
||||
.map(|tags| (file_name.clone(), tags))
|
||||
})
|
||||
.collect()
|
||||
} else {
|
||||
std::collections::HashMap::new()
|
||||
};
|
||||
if req.tag_ids.is_some() || req.exclude_tag_ids.is_some() {
|
||||
info!("Full tag list fetch took {:?}", start_tag_filter.elapsed());
|
||||
}
|
||||
|
||||
let photos = file_names
|
||||
.into_iter()
|
||||
.map(|file_name| {
|
||||
let file_tags = file_tags_map.get(&file_name).cloned().unwrap_or_default();
|
||||
(file_name, file_tags)
|
||||
})
|
||||
.filter(|(_, file_tags)| {
|
||||
.filter(|(_, file_tags): &(String, Vec<crate::tags::Tag>)| {
|
||||
if let Some(tag_ids) = &req.tag_ids {
|
||||
let tag_ids = tag_ids
|
||||
.split(',')
|
||||
@@ -472,16 +650,28 @@ pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
|
||||
true
|
||||
}
|
||||
})
|
||||
.map(|(file_name, tags)| FileWithTagCount {
|
||||
file_name,
|
||||
tag_count: tags.len() as i64,
|
||||
})
|
||||
.map(
|
||||
|(file_name, _tags): (String, Vec<crate::tags::Tag>)| FileWithTagCount {
|
||||
file_name: file_name.clone(),
|
||||
tag_count: *tag_counts.get(&file_name).unwrap_or(&0),
|
||||
},
|
||||
)
|
||||
.collect::<Vec<FileWithTagCount>>();
|
||||
|
||||
info!("After all filters, {} files remain", photos.len());
|
||||
info!(
|
||||
"After all filters, {} files remain (filtering took {:?})",
|
||||
photos.len(),
|
||||
start_filter.elapsed()
|
||||
);
|
||||
|
||||
// Handle sorting - use helper function that supports EXIF date sorting
|
||||
let response_files = if let Some(sort_type) = req.sort {
|
||||
// Extract pagination parameters
|
||||
let limit = req.limit;
|
||||
let offset = req.offset.unwrap_or(0);
|
||||
let start_sort = std::time::Instant::now();
|
||||
|
||||
// Handle sorting - use helper function that supports EXIF date sorting and pagination
|
||||
let (response_files, total_count) = if let Some(sort_type) = req.sort {
|
||||
info!("Sorting {} files by {:?}", photos.len(), sort_type);
|
||||
let mut exif_dao_guard = exif_dao.lock().expect("Unable to get ExifDao");
|
||||
let result = apply_sorting_with_exif(
|
||||
photos,
|
||||
@@ -489,35 +679,68 @@ pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
|
||||
&mut exif_dao_guard,
|
||||
&span_context,
|
||||
app_state.base_path.as_ref(),
|
||||
limit,
|
||||
offset,
|
||||
);
|
||||
drop(exif_dao_guard);
|
||||
result
|
||||
} else {
|
||||
// No sorting requested
|
||||
photos
|
||||
.into_iter()
|
||||
.map(|f| f.file_name)
|
||||
.collect::<Vec<String>>()
|
||||
// No sorting requested - apply pagination if requested
|
||||
let total = photos.len() as i64;
|
||||
let files: Vec<String> = if let Some(limit_val) = limit {
|
||||
photos
|
||||
.into_iter()
|
||||
.skip(offset as usize)
|
||||
.take(limit_val as usize)
|
||||
.map(|f| f.file_name)
|
||||
.collect()
|
||||
} else {
|
||||
photos.into_iter().map(|f| f.file_name).collect()
|
||||
};
|
||||
(files, total)
|
||||
};
|
||||
info!(
|
||||
"Sorting took {:?}, returned {} files (total: {})",
|
||||
start_sort.elapsed(),
|
||||
response_files.len(),
|
||||
total_count
|
||||
);
|
||||
|
||||
let dirs = files
|
||||
.iter()
|
||||
.filter(|&f| f.metadata().is_ok_and(|md| md.is_dir()))
|
||||
.map(|path: &PathBuf| {
|
||||
let relative = path.strip_prefix(&app_state.base_path).unwrap();
|
||||
relative.to_path_buf()
|
||||
})
|
||||
.map(|f| f.to_str().unwrap().to_string())
|
||||
.collect::<Vec<String>>();
|
||||
// Note: dirs were already collected during file filtering to avoid redundant metadata calls
|
||||
|
||||
// Calculate pagination metadata
|
||||
let returned_count = response_files.len() as i64;
|
||||
let pagination_metadata = if limit.is_some() {
|
||||
(
|
||||
Some(total_count),
|
||||
Some(offset + returned_count < total_count),
|
||||
if offset + returned_count < total_count {
|
||||
Some(offset + returned_count)
|
||||
} else {
|
||||
None
|
||||
},
|
||||
)
|
||||
} else {
|
||||
(None, None, None)
|
||||
};
|
||||
|
||||
span_context
|
||||
.span()
|
||||
.set_attribute(KeyValue::new("file_count", files.len().to_string()));
|
||||
span_context
|
||||
.span()
|
||||
.set_attribute(KeyValue::new("returned_count", returned_count.to_string()));
|
||||
span_context
|
||||
.span()
|
||||
.set_attribute(KeyValue::new("total_count", total_count.to_string()));
|
||||
span_context.span().set_status(Status::Ok);
|
||||
|
||||
HttpResponse::Ok().json(PhotosResponse {
|
||||
photos: response_files,
|
||||
dirs,
|
||||
total_count: pagination_metadata.0,
|
||||
has_more: pagination_metadata.1,
|
||||
next_offset: pagination_metadata.2,
|
||||
})
|
||||
}
|
||||
_ => {
|
||||
@@ -902,7 +1125,7 @@ mod tests {
|
||||
&mut self,
|
||||
_context: &opentelemetry::Context,
|
||||
data: crate::database::models::InsertImageExif,
|
||||
) -> Result<crate::database::models::ImageExif, crate::database::DbError> {
|
||||
) -> Result<crate::database::models::ImageExif, DbError> {
|
||||
// Return a dummy ImageExif for tests
|
||||
Ok(crate::database::models::ImageExif {
|
||||
id: 1,
|
||||
@@ -1020,6 +1243,19 @@ mod tests {
|
||||
) -> Result<Vec<String>, DbError> {
|
||||
Ok(Vec::new())
|
||||
}
|
||||
|
||||
fn get_files_sorted_by_date(
|
||||
&mut self,
|
||||
_context: &opentelemetry::Context,
|
||||
file_paths: &[String],
|
||||
_ascending: bool,
|
||||
_limit: Option<i64>,
|
||||
_offset: i64,
|
||||
) -> Result<(Vec<String>, i64), DbError> {
|
||||
// For tests, just return all files unsorted
|
||||
let count = file_paths.len() as i64;
|
||||
Ok((file_paths.to_vec(), count))
|
||||
}
|
||||
}
|
||||
|
||||
mod api {
|
||||
|
||||
207
src/main.rs
207
src/main.rs
@@ -2,6 +2,7 @@
|
||||
extern crate diesel;
|
||||
extern crate rayon;
|
||||
|
||||
use actix::Addr;
|
||||
use actix_web::web::Data;
|
||||
use actix_web_prom::PrometheusMetricsBuilder;
|
||||
use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations};
|
||||
@@ -45,7 +46,8 @@ use crate::service::ServiceBuilder;
|
||||
use crate::state::AppState;
|
||||
use crate::tags::*;
|
||||
use crate::video::actors::{
|
||||
ProcessMessage, ScanDirectoryMessage, create_playlist, generate_video_thumbnail,
|
||||
ProcessMessage, QueueVideosMessage, ScanDirectoryMessage, VideoPlaylistManager,
|
||||
create_playlist, generate_video_thumbnail,
|
||||
};
|
||||
use log::{debug, error, info, trace, warn};
|
||||
use opentelemetry::trace::{Span, Status, TraceContextExt, Tracer};
|
||||
@@ -527,6 +529,9 @@ async fn favorites(
|
||||
HttpResponse::Ok().json(PhotosResponse {
|
||||
photos: favorites,
|
||||
dirs: Vec::new(),
|
||||
total_count: None,
|
||||
has_more: None,
|
||||
next_offset: None,
|
||||
})
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
@@ -711,8 +716,6 @@ fn main() -> std::io::Result<()> {
|
||||
|
||||
run_migrations(&mut connect()).expect("Failed to run migrations");
|
||||
|
||||
watch_files();
|
||||
|
||||
let system = actix::System::new();
|
||||
system.block_on(async {
|
||||
// Just use basic logger when running a non-release build
|
||||
@@ -751,6 +754,13 @@ fn main() -> std::io::Result<()> {
|
||||
directory: app_state.base_path.clone(),
|
||||
});
|
||||
|
||||
// Start file watcher with playlist manager
|
||||
let playlist_mgr_for_watcher = app_state.playlist_manager.as_ref().clone();
|
||||
watch_files(playlist_mgr_for_watcher);
|
||||
|
||||
// Start orphaned playlist cleanup job
|
||||
cleanup_orphaned_playlists();
|
||||
|
||||
// Spawn background job to generate daily conversation summaries
|
||||
{
|
||||
use crate::ai::generate_daily_summaries;
|
||||
@@ -893,8 +903,132 @@ fn run_migrations(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn watch_files() {
|
||||
/// Clean up orphaned HLS playlists and segments whose source videos no longer exist
|
||||
fn cleanup_orphaned_playlists() {
|
||||
std::thread::spawn(|| {
|
||||
let video_path = dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set");
|
||||
let base_path = dotenv::var("BASE_PATH").expect("BASE_PATH must be set");
|
||||
|
||||
// Get cleanup interval from environment (default: 24 hours)
|
||||
let cleanup_interval_secs = dotenv::var("PLAYLIST_CLEANUP_INTERVAL_SECONDS")
|
||||
.ok()
|
||||
.and_then(|s| s.parse::<u64>().ok())
|
||||
.unwrap_or(86400); // 24 hours
|
||||
|
||||
info!("Starting orphaned playlist cleanup job");
|
||||
info!(" Cleanup interval: {} seconds", cleanup_interval_secs);
|
||||
info!(" Playlist directory: {}", video_path);
|
||||
|
||||
loop {
|
||||
std::thread::sleep(Duration::from_secs(cleanup_interval_secs));
|
||||
|
||||
info!("Running orphaned playlist cleanup");
|
||||
let start = std::time::Instant::now();
|
||||
let mut deleted_count = 0;
|
||||
let mut error_count = 0;
|
||||
|
||||
// Find all .m3u8 files in VIDEO_PATH
|
||||
let playlists: Vec<PathBuf> = WalkDir::new(&video_path)
|
||||
.into_iter()
|
||||
.filter_map(|e| e.ok())
|
||||
.filter(|e| e.file_type().is_file())
|
||||
.filter(|e| {
|
||||
e.path()
|
||||
.extension()
|
||||
.and_then(|s| s.to_str())
|
||||
.map(|ext| ext.eq_ignore_ascii_case("m3u8"))
|
||||
.unwrap_or(false)
|
||||
})
|
||||
.map(|e| e.path().to_path_buf())
|
||||
.collect();
|
||||
|
||||
info!("Found {} playlist files to check", playlists.len());
|
||||
|
||||
for playlist_path in playlists {
|
||||
// Extract the original video filename from playlist name
|
||||
// Playlist format: {VIDEO_PATH}/{original_filename}.m3u8
|
||||
if let Some(filename) = playlist_path.file_stem() {
|
||||
let video_filename = filename.to_string_lossy();
|
||||
|
||||
// Search for this video file in BASE_PATH
|
||||
let mut video_exists = false;
|
||||
for entry in WalkDir::new(&base_path)
|
||||
.into_iter()
|
||||
.filter_map(|e| e.ok())
|
||||
.filter(|e| e.file_type().is_file())
|
||||
{
|
||||
if let Some(entry_stem) = entry.path().file_stem() {
|
||||
if entry_stem == filename && is_video_file(entry.path()) {
|
||||
video_exists = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !video_exists {
|
||||
debug!(
|
||||
"Source video for playlist {} no longer exists, deleting",
|
||||
playlist_path.display()
|
||||
);
|
||||
|
||||
// Delete the playlist file
|
||||
if let Err(e) = std::fs::remove_file(&playlist_path) {
|
||||
warn!("Failed to delete playlist {}: {}", playlist_path.display(), e);
|
||||
error_count += 1;
|
||||
} else {
|
||||
deleted_count += 1;
|
||||
|
||||
// Also try to delete associated .ts segment files
|
||||
// They are typically named {filename}N.ts in the same directory
|
||||
if let Some(parent_dir) = playlist_path.parent() {
|
||||
for entry in WalkDir::new(parent_dir)
|
||||
.max_depth(1)
|
||||
.into_iter()
|
||||
.filter_map(|e| e.ok())
|
||||
.filter(|e| e.file_type().is_file())
|
||||
{
|
||||
let entry_path = entry.path();
|
||||
if let Some(ext) = entry_path.extension() {
|
||||
if ext.eq_ignore_ascii_case("ts") {
|
||||
// Check if this .ts file belongs to our playlist
|
||||
if let Some(ts_stem) = entry_path.file_stem() {
|
||||
let ts_name = ts_stem.to_string_lossy();
|
||||
if ts_name.starts_with(&*video_filename) {
|
||||
if let Err(e) = std::fs::remove_file(entry_path) {
|
||||
debug!(
|
||||
"Failed to delete segment {}: {}",
|
||||
entry_path.display(),
|
||||
e
|
||||
);
|
||||
} else {
|
||||
debug!(
|
||||
"Deleted segment: {}",
|
||||
entry_path.display()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!(
|
||||
"Orphaned playlist cleanup completed in {:?}: deleted {} playlists, {} errors",
|
||||
start.elapsed(),
|
||||
deleted_count,
|
||||
error_count
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
fn watch_files(playlist_manager: Addr<VideoPlaylistManager>) {
|
||||
std::thread::spawn(move || {
|
||||
let base_str = dotenv::var("BASE_PATH").unwrap();
|
||||
let base_path = PathBuf::from(&base_str);
|
||||
|
||||
@@ -937,7 +1071,7 @@ fn watch_files() {
|
||||
|
||||
if is_full_scan {
|
||||
info!("Running full scan (scan #{})", scan_count);
|
||||
process_new_files(&base_path, Arc::clone(&exif_dao), None);
|
||||
process_new_files(&base_path, Arc::clone(&exif_dao), None, playlist_manager.clone());
|
||||
last_full_scan = now;
|
||||
} else {
|
||||
debug!(
|
||||
@@ -948,7 +1082,7 @@ fn watch_files() {
|
||||
let check_since = last_quick_scan
|
||||
.checked_sub(Duration::from_secs(10))
|
||||
.unwrap_or(last_quick_scan);
|
||||
process_new_files(&base_path, Arc::clone(&exif_dao), Some(check_since));
|
||||
process_new_files(&base_path, Arc::clone(&exif_dao), Some(check_since), playlist_manager.clone());
|
||||
}
|
||||
|
||||
last_quick_scan = now;
|
||||
@@ -960,10 +1094,36 @@ fn watch_files() {
|
||||
});
|
||||
}
|
||||
|
||||
/// Check if a playlist needs to be (re)generated
|
||||
/// Returns true if:
|
||||
/// - Playlist doesn't exist, OR
|
||||
/// - Source video is newer than the playlist
|
||||
fn playlist_needs_generation(video_path: &Path, playlist_path: &Path) -> bool {
|
||||
if !playlist_path.exists() {
|
||||
return true;
|
||||
}
|
||||
|
||||
// Check if source video is newer than playlist
|
||||
if let (Ok(video_meta), Ok(playlist_meta)) = (
|
||||
std::fs::metadata(video_path),
|
||||
std::fs::metadata(playlist_path),
|
||||
) {
|
||||
if let (Ok(video_modified), Ok(playlist_modified)) =
|
||||
(video_meta.modified(), playlist_meta.modified())
|
||||
{
|
||||
return video_modified > playlist_modified;
|
||||
}
|
||||
}
|
||||
|
||||
// If we can't determine, assume it needs generation
|
||||
true
|
||||
}
|
||||
|
||||
fn process_new_files(
|
||||
base_path: &Path,
|
||||
exif_dao: Arc<Mutex<Box<dyn ExifDao>>>,
|
||||
modified_since: Option<SystemTime>,
|
||||
playlist_manager: Addr<VideoPlaylistManager>,
|
||||
) {
|
||||
let context = opentelemetry::Context::new();
|
||||
let thumbs = dotenv::var("THUMBNAILS").expect("THUMBNAILS not defined");
|
||||
@@ -1027,14 +1187,14 @@ fn process_new_files(
|
||||
let mut files_needing_exif = Vec::new();
|
||||
|
||||
// Check each file for missing thumbnail or EXIF data
|
||||
for (file_path, relative_path) in files {
|
||||
for (file_path, relative_path) in &files {
|
||||
// Check if thumbnail exists
|
||||
let thumb_path = thumbnail_directory.join(&relative_path);
|
||||
let thumb_path = thumbnail_directory.join(relative_path);
|
||||
let needs_thumbnail = !thumb_path.exists();
|
||||
|
||||
// Check if EXIF data exists (for supported files)
|
||||
let needs_exif = if exif::supports_exif(&file_path) {
|
||||
!existing_exif_paths.contains_key(&relative_path)
|
||||
!existing_exif_paths.contains_key(relative_path)
|
||||
} else {
|
||||
false
|
||||
};
|
||||
@@ -1047,7 +1207,7 @@ fn process_new_files(
|
||||
}
|
||||
|
||||
if needs_exif {
|
||||
files_needing_exif.push((file_path, relative_path));
|
||||
files_needing_exif.push((file_path.clone(), relative_path.clone()));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1101,6 +1261,33 @@ fn process_new_files(
|
||||
}
|
||||
}
|
||||
|
||||
// Check for videos that need HLS playlists
|
||||
let video_path_base = dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set");
|
||||
let mut videos_needing_playlists = Vec::new();
|
||||
|
||||
for (file_path, _relative_path) in &files {
|
||||
if is_video_file(&file_path) {
|
||||
// Construct expected playlist path
|
||||
let playlist_filename = format!(
|
||||
"{}.m3u8",
|
||||
file_path.file_name().unwrap().to_string_lossy()
|
||||
);
|
||||
let playlist_path = Path::new(&video_path_base).join(&playlist_filename);
|
||||
|
||||
// Check if playlist needs (re)generation
|
||||
if playlist_needs_generation(&file_path, &playlist_path) {
|
||||
videos_needing_playlists.push(file_path.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Send queue request to playlist manager
|
||||
if !videos_needing_playlists.is_empty() {
|
||||
playlist_manager.do_send(QueueVideosMessage {
|
||||
video_paths: videos_needing_playlists,
|
||||
});
|
||||
}
|
||||
|
||||
// Generate thumbnails for all files that need them
|
||||
if new_files_found {
|
||||
info!("Processing thumbnails for new files...");
|
||||
|
||||
79
src/tags.rs
79
src/tags.rs
@@ -322,6 +322,11 @@ pub trait TagDao {
|
||||
&mut self,
|
||||
context: &opentelemetry::Context,
|
||||
) -> anyhow::Result<Vec<String>>;
|
||||
fn get_tag_counts_batch(
|
||||
&mut self,
|
||||
context: &opentelemetry::Context,
|
||||
file_paths: &[String],
|
||||
) -> anyhow::Result<std::collections::HashMap<String, i64>>;
|
||||
}
|
||||
|
||||
pub struct SqliteTagDao {
|
||||
@@ -642,6 +647,65 @@ impl TagDao for SqliteTagDao {
|
||||
.load(&mut self.connection)
|
||||
.with_context(|| "Unable to get photo names")
|
||||
}
|
||||
|
||||
fn get_tag_counts_batch(
|
||||
&mut self,
|
||||
context: &opentelemetry::Context,
|
||||
file_paths: &[String],
|
||||
) -> anyhow::Result<std::collections::HashMap<String, i64>> {
|
||||
use std::collections::HashMap;
|
||||
|
||||
trace_db_call(context, "query", "get_tag_counts_batch", |span| {
|
||||
span.set_attribute(KeyValue::new("file_count", file_paths.len() as i64));
|
||||
|
||||
if file_paths.is_empty() {
|
||||
return Ok(HashMap::new());
|
||||
}
|
||||
|
||||
use diesel::dsl::*;
|
||||
use diesel::sql_types::*;
|
||||
|
||||
// Build dynamic query with placeholders
|
||||
let placeholders = std::iter::repeat_n("?", file_paths.len())
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ");
|
||||
|
||||
let query_str = format!(
|
||||
r#"
|
||||
SELECT photo_name, COUNT(DISTINCT tag_id) as tag_count
|
||||
FROM tagged_photo
|
||||
WHERE photo_name IN ({})
|
||||
GROUP BY photo_name
|
||||
"#,
|
||||
placeholders
|
||||
);
|
||||
|
||||
let mut query = sql_query(query_str).into_boxed();
|
||||
|
||||
// Bind all file paths
|
||||
for path in file_paths {
|
||||
query = query.bind::<Text, _>(path);
|
||||
}
|
||||
|
||||
#[derive(QueryableByName, Debug)]
|
||||
struct TagCountRow {
|
||||
#[diesel(sql_type = Text)]
|
||||
photo_name: String,
|
||||
#[diesel(sql_type = BigInt)]
|
||||
tag_count: i64,
|
||||
}
|
||||
|
||||
// Execute query and convert to HashMap
|
||||
query
|
||||
.load::<TagCountRow>(&mut self.connection)
|
||||
.with_context(|| "Unable to get batch tag counts")
|
||||
.map(|rows| {
|
||||
rows.into_iter()
|
||||
.map(|row| (row.photo_name, row.tag_count))
|
||||
.collect::<HashMap<String, i64>>()
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -818,6 +882,21 @@ mod tests {
|
||||
) -> anyhow::Result<Vec<String>> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_tag_counts_batch(
|
||||
&mut self,
|
||||
_context: &opentelemetry::Context,
|
||||
file_paths: &[String],
|
||||
) -> anyhow::Result<std::collections::HashMap<String, i64>> {
|
||||
use std::collections::HashMap;
|
||||
let mut counts = HashMap::new();
|
||||
for path in file_paths {
|
||||
if let Some(tags) = self.tagged_photos.borrow().get(path) {
|
||||
counts.insert(path.clone(), tags.len() as i64);
|
||||
}
|
||||
}
|
||||
Ok(counts)
|
||||
}
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
|
||||
@@ -100,6 +100,74 @@ pub fn generate_video_thumbnail(path: &Path, destination: &Path) {
|
||||
.expect("Failure to create video frame");
|
||||
}
|
||||
|
||||
/// Check if a video is already encoded with h264 codec
|
||||
/// Returns true if the video uses h264, false otherwise or if detection fails
|
||||
async fn is_h264_encoded(video_path: &str) -> bool {
|
||||
let output = tokio::process::Command::new("ffprobe")
|
||||
.arg("-v")
|
||||
.arg("error")
|
||||
.arg("-select_streams")
|
||||
.arg("v:0")
|
||||
.arg("-show_entries")
|
||||
.arg("stream=codec_name")
|
||||
.arg("-of")
|
||||
.arg("default=noprint_wrappers=1:nokey=1")
|
||||
.arg(video_path)
|
||||
.output()
|
||||
.await;
|
||||
|
||||
match output {
|
||||
Ok(output) if output.status.success() => {
|
||||
let codec = String::from_utf8_lossy(&output.stdout);
|
||||
let codec = codec.trim();
|
||||
debug!("Detected codec for {}: {}", video_path, codec);
|
||||
codec == "h264"
|
||||
}
|
||||
Ok(output) => {
|
||||
warn!(
|
||||
"ffprobe failed for {}: {}",
|
||||
video_path,
|
||||
String::from_utf8_lossy(&output.stderr)
|
||||
);
|
||||
false
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Failed to run ffprobe for {}: {}", video_path, e);
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if a video has rotation metadata
|
||||
/// Returns the rotation angle in degrees (0, 90, 180, 270) or 0 if none detected
|
||||
async fn get_video_rotation(video_path: &str) -> i32 {
|
||||
let output = tokio::process::Command::new("ffprobe")
|
||||
.arg("-v")
|
||||
.arg("error")
|
||||
.arg("-select_streams")
|
||||
.arg("v:0")
|
||||
.arg("-show_entries")
|
||||
.arg("stream_tags=rotate")
|
||||
.arg("-of")
|
||||
.arg("default=noprint_wrappers=1:nokey=1")
|
||||
.arg(video_path)
|
||||
.output()
|
||||
.await;
|
||||
|
||||
match output {
|
||||
Ok(output) if output.status.success() => {
|
||||
let rotation_str = String::from_utf8_lossy(&output.stdout);
|
||||
let rotation_str = rotation_str.trim();
|
||||
if rotation_str.is_empty() {
|
||||
0
|
||||
} else {
|
||||
rotation_str.parse::<i32>().unwrap_or(0)
|
||||
}
|
||||
}
|
||||
_ => 0,
|
||||
}
|
||||
}
|
||||
|
||||
pub struct VideoPlaylistManager {
|
||||
playlist_dir: PathBuf,
|
||||
playlist_generator: Addr<PlaylistGenerator>,
|
||||
@@ -192,17 +260,51 @@ impl Handler<ScanDirectoryMessage> for VideoPlaylistManager {
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<QueueVideosMessage> for VideoPlaylistManager {
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, msg: QueueVideosMessage, _ctx: &mut Self::Context) -> Self::Result {
|
||||
if msg.video_paths.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
info!(
|
||||
"Queueing {} videos for HLS playlist generation",
|
||||
msg.video_paths.len()
|
||||
);
|
||||
|
||||
let playlist_output_dir = self.playlist_dir.clone();
|
||||
let playlist_generator = self.playlist_generator.clone();
|
||||
|
||||
for video_path in msg.video_paths {
|
||||
let path_str = video_path.to_string_lossy().to_string();
|
||||
debug!("Queueing playlist generation for: {}", path_str);
|
||||
|
||||
playlist_generator.do_send(GeneratePlaylistMessage {
|
||||
playlist_path: playlist_output_dir.to_str().unwrap().to_string(),
|
||||
video_path,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "()")]
|
||||
pub struct ScanDirectoryMessage {
|
||||
pub(crate) directory: String,
|
||||
}
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "()")]
|
||||
pub struct QueueVideosMessage {
|
||||
pub video_paths: Vec<PathBuf>,
|
||||
}
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "Result<()>")]
|
||||
struct GeneratePlaylistMessage {
|
||||
video_path: PathBuf,
|
||||
playlist_path: String,
|
||||
pub struct GeneratePlaylistMessage {
|
||||
pub video_path: PathBuf,
|
||||
pub playlist_path: String,
|
||||
}
|
||||
|
||||
pub struct PlaylistGenerator {
|
||||
@@ -272,25 +374,54 @@ impl Handler<GeneratePlaylistMessage> for PlaylistGenerator {
|
||||
return Err(std::io::Error::from(std::io::ErrorKind::AlreadyExists));
|
||||
}
|
||||
|
||||
// Check if video is already h264 encoded
|
||||
let is_h264 = is_h264_encoded(&video_file).await;
|
||||
|
||||
// Check for rotation metadata
|
||||
let rotation = get_video_rotation(&video_file).await;
|
||||
let has_rotation = rotation != 0;
|
||||
|
||||
let use_copy = is_h264 && !has_rotation;
|
||||
|
||||
if has_rotation {
|
||||
info!("Video {} has rotation metadata ({}°), transcoding to apply rotation", video_file, rotation);
|
||||
span.add_event("Transcoding due to rotation", vec![
|
||||
KeyValue::new("rotation_degrees", rotation as i64)
|
||||
]);
|
||||
} else if use_copy {
|
||||
info!("Video {} is already h264, using stream copy", video_file);
|
||||
span.add_event("Using stream copy (h264 detected)", vec![]);
|
||||
} else {
|
||||
info!("Video {} needs transcoding to h264", video_file);
|
||||
span.add_event("Transcoding to h264", vec![]);
|
||||
}
|
||||
|
||||
tokio::spawn(async move {
|
||||
let ffmpeg_result = tokio::process::Command::new("ffmpeg")
|
||||
.arg("-i")
|
||||
.arg(&video_file)
|
||||
.arg("-c:v")
|
||||
.arg("h264")
|
||||
.arg("-crf")
|
||||
.arg("21")
|
||||
.arg("-preset")
|
||||
.arg("veryfast")
|
||||
.arg("-hls_time")
|
||||
.arg("3")
|
||||
.arg("-hls_list_size")
|
||||
.arg("100")
|
||||
.arg("-vf")
|
||||
.arg("scale=1080:-2,setsar=1:1")
|
||||
.arg(playlist_file)
|
||||
.stdout(Stdio::null())
|
||||
.stderr(Stdio::piped())
|
||||
let mut cmd = tokio::process::Command::new("ffmpeg");
|
||||
cmd.arg("-i").arg(&video_file);
|
||||
|
||||
if use_copy {
|
||||
// Video is already h264, just copy the stream
|
||||
// Note: rotation metadata will be preserved in the stream
|
||||
cmd.arg("-c:v").arg("copy");
|
||||
cmd.arg("-c:a").arg("aac"); // Still need to ensure audio is compatible
|
||||
} else {
|
||||
// Need to transcode - autorotate is enabled by default and will apply rotation
|
||||
cmd.arg("-c:v").arg("h264");
|
||||
cmd.arg("-crf").arg("21");
|
||||
cmd.arg("-preset").arg("veryfast");
|
||||
cmd.arg("-vf").arg("scale=1080:-2,setsar=1:1");
|
||||
cmd.arg("-c:a").arg("aac");
|
||||
}
|
||||
|
||||
// Common HLS settings
|
||||
cmd.arg("-hls_time").arg("3");
|
||||
cmd.arg("-hls_list_size").arg("100");
|
||||
cmd.arg(&playlist_file);
|
||||
cmd.stdout(Stdio::null());
|
||||
cmd.stderr(Stdio::piped());
|
||||
|
||||
let ffmpeg_result = cmd
|
||||
.output()
|
||||
.inspect_err(|e| error!("Failed to run ffmpeg on child process: {}", e))
|
||||
.map_err(|e| std::io::Error::other(e.to_string()))
|
||||
|
||||
Reference in New Issue
Block a user