diff --git a/src/main.rs b/src/main.rs index b7bc844..75296b0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,7 @@ extern crate diesel; extern crate rayon; +use actix::Addr; use actix_web::web::Data; use actix_web_prom::PrometheusMetricsBuilder; use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations}; @@ -45,7 +46,8 @@ use crate::service::ServiceBuilder; use crate::state::AppState; use crate::tags::*; use crate::video::actors::{ - ProcessMessage, ScanDirectoryMessage, create_playlist, generate_video_thumbnail, + ProcessMessage, QueueVideosMessage, ScanDirectoryMessage, VideoPlaylistManager, + create_playlist, generate_video_thumbnail, }; use log::{debug, error, info, trace, warn}; use opentelemetry::trace::{Span, Status, TraceContextExt, Tracer}; @@ -714,8 +716,6 @@ fn main() -> std::io::Result<()> { run_migrations(&mut connect()).expect("Failed to run migrations"); - watch_files(); - let system = actix::System::new(); system.block_on(async { // Just use basic logger when running a non-release build @@ -754,6 +754,10 @@ fn main() -> std::io::Result<()> { directory: app_state.base_path.clone(), }); + // Start file watcher with playlist manager + let playlist_mgr_for_watcher = app_state.playlist_manager.as_ref().clone(); + watch_files(playlist_mgr_for_watcher); + // Spawn background job to generate daily conversation summaries { use crate::ai::generate_daily_summaries; @@ -896,8 +900,8 @@ fn run_migrations( Ok(()) } -fn watch_files() { - std::thread::spawn(|| { +fn watch_files(playlist_manager: Addr) { + std::thread::spawn(move || { let base_str = dotenv::var("BASE_PATH").unwrap(); let base_path = PathBuf::from(&base_str); @@ -940,7 +944,7 @@ fn watch_files() { if is_full_scan { info!("Running full scan (scan #{})", scan_count); - process_new_files(&base_path, Arc::clone(&exif_dao), None); + process_new_files(&base_path, Arc::clone(&exif_dao), None, playlist_manager.clone()); last_full_scan = now; } else { debug!( @@ -951,7 +955,7 @@ fn watch_files() { let check_since = last_quick_scan .checked_sub(Duration::from_secs(10)) .unwrap_or(last_quick_scan); - process_new_files(&base_path, Arc::clone(&exif_dao), Some(check_since)); + process_new_files(&base_path, Arc::clone(&exif_dao), Some(check_since), playlist_manager.clone()); } last_quick_scan = now; @@ -967,6 +971,7 @@ fn process_new_files( base_path: &Path, exif_dao: Arc>>, modified_since: Option, + playlist_manager: Addr, ) { let context = opentelemetry::Context::new(); let thumbs = dotenv::var("THUMBNAILS").expect("THUMBNAILS not defined"); @@ -1030,14 +1035,14 @@ fn process_new_files( let mut files_needing_exif = Vec::new(); // Check each file for missing thumbnail or EXIF data - for (file_path, relative_path) in files { + for (file_path, relative_path) in &files { // Check if thumbnail exists - let thumb_path = thumbnail_directory.join(&relative_path); + let thumb_path = thumbnail_directory.join(relative_path); let needs_thumbnail = !thumb_path.exists(); // Check if EXIF data exists (for supported files) let needs_exif = if exif::supports_exif(&file_path) { - !existing_exif_paths.contains_key(&relative_path) + !existing_exif_paths.contains_key(relative_path) } else { false }; @@ -1050,7 +1055,7 @@ fn process_new_files( } if needs_exif { - files_needing_exif.push((file_path, relative_path)); + files_needing_exif.push((file_path.clone(), relative_path.clone())); } } } @@ -1104,6 +1109,33 @@ fn process_new_files( } } + // Check for videos that need HLS playlists + let video_path_base = dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set"); + let mut videos_needing_playlists = Vec::new(); + + for (file_path, _relative_path) in &files { + if is_video_file(&file_path) { + // Construct expected playlist path + let playlist_filename = format!( + "{}.m3u8", + file_path.file_name().unwrap().to_string_lossy() + ); + let playlist_path = Path::new(&video_path_base).join(&playlist_filename); + + // Check if playlist already exists + if !playlist_path.exists() { + videos_needing_playlists.push(file_path.clone()); + } + } + } + + // Send queue request to playlist manager + if !videos_needing_playlists.is_empty() { + playlist_manager.do_send(QueueVideosMessage { + video_paths: videos_needing_playlists, + }); + } + // Generate thumbnails for all files that need them if new_files_found { info!("Processing thumbnails for new files..."); diff --git a/src/video/actors.rs b/src/video/actors.rs index 7902158..7f38b05 100644 --- a/src/video/actors.rs +++ b/src/video/actors.rs @@ -192,17 +192,51 @@ impl Handler for VideoPlaylistManager { } } +impl Handler for VideoPlaylistManager { + type Result = (); + + fn handle(&mut self, msg: QueueVideosMessage, _ctx: &mut Self::Context) -> Self::Result { + if msg.video_paths.is_empty() { + return; + } + + info!( + "Queueing {} videos for HLS playlist generation", + msg.video_paths.len() + ); + + let playlist_output_dir = self.playlist_dir.clone(); + let playlist_generator = self.playlist_generator.clone(); + + for video_path in msg.video_paths { + let path_str = video_path.to_string_lossy().to_string(); + debug!("Queueing playlist generation for: {}", path_str); + + playlist_generator.do_send(GeneratePlaylistMessage { + playlist_path: playlist_output_dir.to_str().unwrap().to_string(), + video_path, + }); + } + } +} + #[derive(Message)] #[rtype(result = "()")] pub struct ScanDirectoryMessage { pub(crate) directory: String, } +#[derive(Message)] +#[rtype(result = "()")] +pub struct QueueVideosMessage { + pub video_paths: Vec, +} + #[derive(Message)] #[rtype(result = "Result<()>")] -struct GeneratePlaylistMessage { - video_path: PathBuf, - playlist_path: String, +pub struct GeneratePlaylistMessage { + pub video_path: PathBuf, + pub playlist_path: String, } pub struct PlaylistGenerator {