diff --git a/src/main.rs b/src/main.rs index 30be9dc..ec5b3d0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -26,7 +26,6 @@ use crate::files::{RealFileSystem, move_file}; use crate::service::ServiceBuilder; use crate::state::AppState; use crate::tags::*; -use crate::video::actors::ScanDirectoryMessage; use log::{error, info}; mod ai; @@ -119,13 +118,12 @@ fn main() -> std::io::Result<()> { .unwrap(); let app_state = app_data.clone(); - for lib in &app_state.libraries { - app_state.playlist_manager.do_send(ScanDirectoryMessage { - directory: lib.root_path.clone(), - }); - } - // Start file watcher with playlist manager and preview generator + // Start file watcher with playlist manager and preview generator. + // The watcher's first tick is configured to be a full scan (see + // `watch_files`), so every library's missing HLS playlists are + // queued on that first iteration — no separate startup walk + // needed. let playlist_mgr_for_watcher = app_state.playlist_manager.as_ref().clone(); let preview_gen_for_watcher = app_state.preview_clip_generator.as_ref().clone(); // Both background jobs read from the shared `live_libraries` lock diff --git a/src/state.rs b/src/state.rs index b147c77..fd39cba 100644 --- a/src/state.rs +++ b/src/state.rs @@ -111,7 +111,7 @@ impl AppState { "AppState::new requires at least one library" ); let base_path = libraries_vec[0].root_path.clone(); - let playlist_generator = PlaylistGenerator::new(); + let playlist_generator = PlaylistGenerator::new(video_path.clone()); let video_playlist_manager = VideoPlaylistManager::new(video_path.clone(), playlist_generator.start()); diff --git a/src/video/actors.rs b/src/video/actors.rs index 4d7b90a..d0ae04f 100644 --- a/src/video/actors.rs +++ b/src/video/actors.rs @@ -1,8 +1,9 @@ +use crate::content_hash; use crate::database::PreviewDao; use crate::libraries::Library; use crate::otel::global_tracer; -use crate::thumbnails::is_video; use crate::video::ffmpeg::{generate_preview_clip, get_duration_seconds_blocking}; +use crate::video::hls_paths; use actix::prelude::*; use log::{debug, error, info, trace, warn}; use opentelemetry::KeyValue; @@ -12,7 +13,6 @@ use std::path::{Path, PathBuf}; use std::process::{Child, Command, ExitStatus, Stdio}; use std::sync::{Arc, Mutex}; use tokio::sync::Semaphore; -use walkdir::{DirEntry, WalkDir}; // ffmpeg -i test.mp4 -c:v h264 -flags +cgop -g 30 -hls_time 3 out.m3u8 // ffmpeg -i "filename.mp4" -preset veryfast -c:v libx264 -f hls -hls_list_size 100 -hls_time 2 -crf 24 -vf scale=1080:-2,setsar=1:1 attempt/vid_out.m3u8 @@ -57,9 +57,11 @@ pub struct VideoToQueue { pub content_hash: String, } -/// Legacy basename-keyed playlist path. Used only by the one-shot startup -/// migration that retires pre-content-hash output; once cleared, all new -/// playlist writes go through [`hls_paths::playlist_for_hash`]. +/// Legacy basename-keyed playlist path. Retained for the one-shot startup +/// migration that retires pre-content-hash output; new playlist writes go +/// through [`hls_paths::playlist_for_hash`]. Will be removed once the +/// migration ships and runs to completion in production. +#[allow(dead_code)] pub fn playlist_file_for(playlist_dir: &str, video_path: &Path) -> PathBuf { let filename = video_path .file_name() @@ -70,6 +72,7 @@ pub fn playlist_file_for(playlist_dir: &str, video_path: &Path) -> PathBuf { /// Legacy basename-keyed sentinel path. Same migration-only contract as /// [`playlist_file_for`]. +#[allow(dead_code)] pub fn playlist_unsupported_sentinel(playlist_file: &Path) -> PathBuf { let mut s = playlist_file.as_os_str().to_owned(); s.push(".unsupported"); @@ -342,17 +345,17 @@ async fn get_max_gop_seconds(video_path: &str) -> Option { } pub struct VideoPlaylistManager { - playlist_dir: PathBuf, + video_dir: PathBuf, playlist_generator: Addr, } impl VideoPlaylistManager { pub fn new>( - playlist_dir: P, + video_dir: P, playlist_generator: Addr, ) -> Self { Self { - playlist_dir: playlist_dir.into(), + video_dir: video_dir.into(), playlist_generator, } } @@ -362,144 +365,68 @@ impl Actor for VideoPlaylistManager { type Context = Context; } -impl Handler for VideoPlaylistManager { - type Result = ResponseFuture<()>; - - fn handle(&mut self, msg: ScanDirectoryMessage, _ctx: &mut Self::Context) -> Self::Result { - let tracer = global_tracer(); - let mut span = tracer.start("videoplaylistmanager.scan_directory"); - - let start = std::time::Instant::now(); - info!( - "Starting scan directory for video playlist generation: {}", - msg.directory - ); - - let playlist_output_dir = self.playlist_dir.clone(); - let playlist_dir_str = playlist_output_dir.to_str().unwrap().to_string(); - - let video_files = WalkDir::new(&msg.directory) - .into_iter() - .filter_map(|e| e.ok()) - .filter(|e| e.file_type().is_file()) - .filter(is_video) - .filter(|e| { - let playlist = playlist_file_for(&playlist_dir_str, e.path()); - !playlist.exists() && !playlist_unsupported_sentinel(&playlist).exists() - }) - .collect::>(); - - let scan_dir_name = msg.directory.clone(); - let playlist_generator = self.playlist_generator.clone(); - - Box::pin(async move { - for e in video_files { - let path = e.path(); - let path_as_str = path.to_str().unwrap(); - debug!( - "Sending generate playlist message for path: {}", - path_as_str - ); - - match playlist_generator - .send(GeneratePlaylistMessage { - playlist_path: playlist_output_dir.to_str().unwrap().to_string(), - video_path: PathBuf::from(path), - }) - .await - .expect("Failed to send generate playlist message") - { - Ok(_) => { - span.add_event( - "Playlist generated", - vec![KeyValue::new("video_path", path_as_str.to_string())], - ); - - debug!( - "Successfully generated playlist for file: '{}'", - path_as_str - ); - } - Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => { - debug!("Playlist already exists for '{:?}', skipping", path); - } - Err(e) => { - warn!("Failed to generate playlist for path '{:?}'. {:?}", path, e); - } - } - } - - span.add_event( - "Finished directory scan", - vec![KeyValue::new("directory", scan_dir_name.to_string())], - ); - info!( - "Finished directory scan of '{}' in {:?}", - scan_dir_name, - start.elapsed() - ); - }) - } -} - impl Handler for VideoPlaylistManager { type Result = (); fn handle(&mut self, msg: QueueVideosMessage, _ctx: &mut Self::Context) -> Self::Result { - if msg.video_paths.is_empty() { + if msg.videos.is_empty() { return; } - info!( - "Queueing {} videos for HLS playlist generation", - msg.video_paths.len() - ); - - let playlist_output_dir = self.playlist_dir.clone(); - let playlist_dir_str = playlist_output_dir.to_str().unwrap().to_string(); + let video_dir = self.video_dir.clone(); let playlist_generator = self.playlist_generator.clone(); - for video_path in msg.video_paths { - let playlist = playlist_file_for(&playlist_dir_str, &video_path); - if playlist.exists() || playlist_unsupported_sentinel(&playlist).exists() { + let mut queued = 0usize; + let mut already_present = 0usize; + for VideoToQueue { + video_path, + content_hash, + } in msg.videos + { + let playlist = hls_paths::playlist_for_hash(&video_dir, &content_hash); + let sentinel = hls_paths::sentinel_for_hash(&video_dir, &content_hash); + if playlist.exists() || sentinel.exists() { + already_present += 1; continue; } - let path_str = video_path.to_string_lossy().to_string(); - debug!("Queueing playlist generation for: {}", path_str); - + debug!( + "Queueing playlist generation for {} (hash={})", + video_path.display(), + short_hash(&content_hash) + ); playlist_generator.do_send(GeneratePlaylistMessage { - playlist_path: playlist_dir_str.clone(), video_path, + content_hash, }); + queued += 1; } + info!( + "Queue tick: {} queued, {} skipped (playlist or sentinel already on disk)", + queued, already_present + ); } } -#[derive(Message)] -#[rtype(result = "()")] -pub struct ScanDirectoryMessage { - pub(crate) directory: String, -} - #[derive(Message)] #[rtype(result = "()")] pub struct QueueVideosMessage { - pub video_paths: Vec, + pub videos: Vec, } #[derive(Message)] #[rtype(result = "Result<()>")] pub struct GeneratePlaylistMessage { pub video_path: PathBuf, - pub playlist_path: String, + pub content_hash: String, } pub struct PlaylistGenerator { semaphore: Arc, + video_dir: PathBuf, } impl PlaylistGenerator { - pub(crate) fn new() -> Self { + pub(crate) fn new>(video_dir: P) -> Self { // Concurrency is tunable via HLS_CONCURRENCY so operators can dial // it to their hardware: 1 on weak Synology boxes to avoid thermal // throttling, higher on desktops with spare cores. @@ -511,6 +438,7 @@ impl PlaylistGenerator { info!("PlaylistGenerator: concurrency={}", concurrency); PlaylistGenerator { semaphore: Arc::new(Semaphore::new(concurrency)), + video_dir: video_dir.into(), } } } @@ -524,20 +452,23 @@ impl Handler for PlaylistGenerator { fn handle(&mut self, msg: GeneratePlaylistMessage, _ctx: &mut Self::Context) -> Self::Result { let video_file = msg.video_path.to_str().unwrap().to_owned(); - let playlist_path = msg.playlist_path.as_str().to_owned(); + let content_hash_str = msg.content_hash.clone(); let semaphore = self.semaphore.clone(); + let video_dir = self.video_dir.clone(); - let playlist_file = format!( - "{}/{}.m3u8", - playlist_path, - msg.video_path.file_name().unwrap().to_str().unwrap() - ); + let hash_dir = content_hash::hls_dir(&video_dir, &content_hash_str); + let playlist_path = hls_paths::playlist_for_hash(&video_dir, &content_hash_str); + let sentinel_path = hls_paths::sentinel_for_hash(&video_dir, &content_hash_str); + let segment_template = hls_paths::segment_template_for_hash(&video_dir, &content_hash_str); + let playlist_file = playlist_path.to_string_lossy().to_string(); + let segment_pattern = segment_template.to_string_lossy().to_string(); let tracer = global_tracer(); let mut span = tracer .span_builder("playlistgenerator.generate_playlist") .with_attributes(vec![ KeyValue::new("video_file", video_file.clone()), + KeyValue::new("content_hash", content_hash_str.clone()), KeyValue::new("playlist_file", playlist_file.clone()), ]) .start(&tracer); @@ -561,7 +492,7 @@ impl Handler for PlaylistGenerator { )], ); - if Path::new(&playlist_file).exists() { + if playlist_path.exists() { debug!("Playlist already exists: {}", playlist_file); span.set_status(Status::error(format!( "Playlist already exists: {}", @@ -570,6 +501,19 @@ impl Handler for PlaylistGenerator { return Err(std::io::Error::from(std::io::ErrorKind::AlreadyExists)); } + // Ensure the shard + hash directory exist. Idempotent — the + // dir may already be present from a prior attempt that wrote + // a sentinel before being cleared for retry. + if let Err(e) = tokio::fs::create_dir_all(&hash_dir).await { + error!( + "Failed to create HLS hash dir {}: {}", + hash_dir.display(), + e + ); + span.set_status(Status::error(format!("mkdir failed: {}", e))); + return Err(e); + } + // One ffprobe call for codec + rotation metadata. let stream_meta = probe_video_stream_meta(&video_file).await; let is_h264 = stream_meta.is_h264; @@ -630,16 +574,11 @@ impl Handler for PlaylistGenerator { span.add_event("Transcoding to h264", vec![]); } - // Encode to a .tmp playlist and explicit segment names so a failed - // encode leaves predictable artifacts we can clean up — and so a - // concurrent scan doesn't see a half-written .m3u8 as "done". + // Encode to a .tmp playlist alongside the final inside the + // hash dir, so a concurrent scan never sees a half-written + // .m3u8 as "done". Segments use the hash-keyed template; + // ffmpeg writes them next to the playlist (relative refs). let playlist_tmp = format!("{}.tmp", playlist_file); - let video_stem = msg - .video_path - .file_name() - .and_then(|n| n.to_str()) - .unwrap_or("video"); - let segment_pattern = format!("{}/{}_%03d.ts", playlist_path, video_stem); let mut cmd = tokio::process::Command::new("ffmpeg"); cmd.arg("-y").arg("-i").arg(&video_file); @@ -728,12 +667,12 @@ impl Handler for PlaylistGenerator { let success = matches!(&ffmpeg_result, Ok(out) if out.status.success()); if success { - if let Err(e) = tokio::fs::rename(&playlist_tmp, &playlist_file).await { + if let Err(e) = tokio::fs::rename(&playlist_tmp, &playlist_path).await { error!( "ffmpeg succeeded but rename {} -> {} failed: {}", playlist_tmp, playlist_file, e ); - cleanup_partial_hls(&playlist_tmp, playlist_path.as_str(), video_stem).await; + cleanup_partial_hls(&hash_dir).await; span.set_status(Status::error(format!("rename failed: {}", e))); return Err(e); } @@ -750,18 +689,17 @@ impl Handler for PlaylistGenerator { Err(e) => format!("ffmpeg failed: {}", e), }; error!("ffmpeg failed for {}: {}", video_file, detail); - cleanup_partial_hls(&playlist_tmp, playlist_path.as_str(), video_stem).await; - let sentinel = playlist_unsupported_sentinel(Path::new(&playlist_file)); - if let Err(se) = tokio::fs::write(&sentinel, b"").await { + cleanup_partial_hls(&hash_dir).await; + if let Err(se) = tokio::fs::write(&sentinel_path, b"").await { warn!( "Failed to write playlist sentinel {}: {}", - sentinel.display(), + sentinel_path.display(), se ); } else { info!( "Wrote playlist sentinel {} so future scans skip {}", - sentinel.display(), + sentinel_path.display(), video_file ); } @@ -772,29 +710,47 @@ impl Handler for PlaylistGenerator { } } -/// Delete the temp playlist and any segment files that ffmpeg may have written -/// before failing. Called both on ffmpeg error and on rename failure so a -/// retry on the next scan starts from a clean slate. -async fn cleanup_partial_hls(playlist_tmp: &str, playlist_dir: &str, video_stem: &str) { - let _ = tokio::fs::remove_file(playlist_tmp).await; - - let segment_prefix = format!("{}_", video_stem); - let Ok(mut entries) = tokio::fs::read_dir(playlist_dir).await else { +/// Delete the partial playlist (.tmp) and any segment files left behind by +/// a failed ffmpeg run. Wipes every non-sentinel file in the hash dir; +/// retains the sentinel if one has already been written by an earlier +/// caller in the same path (today there is none, but kept defensively so +/// the function is safe to call after sentinel write too). +async fn cleanup_partial_hls(hash_dir: &Path) { + let Ok(mut entries) = tokio::fs::read_dir(hash_dir).await else { return; }; while let Ok(Some(entry)) = entries.next_entry().await { - let Some(name) = entry.file_name().to_str().map(str::to_owned) else { + let path = entry.path(); + let is_sentinel = path + .file_name() + .and_then(|n| n.to_str()) + .map(|n| n == hls_paths::UNSUPPORTED_SENTINEL_FILENAME) + .unwrap_or(false); + if is_sentinel { continue; - }; - if name.starts_with(&segment_prefix) - && name.ends_with(".ts") - && let Err(e) = tokio::fs::remove_file(entry.path()).await - { - warn!("Failed to remove partial segment {}: {}", name, e); + } + if let Err(e) = tokio::fs::remove_file(&path).await { + warn!( + "Failed to remove partial HLS file {}: {}", + path.display(), + e + ); } } } +/// First 16 chars of a content hash for log lines. Short enough to keep +/// log volume sane, long enough that distinct hashes don't collide in +/// practice. +fn short_hash(hash: &str) -> &str { + let end = hash + .char_indices() + .nth(16) + .map(|(i, _)| i) + .unwrap_or(hash.len()); + &hash[..end] +} + #[derive(Message)] #[rtype(result = "()")] pub struct GeneratePreviewClipMessage { diff --git a/src/watcher.rs b/src/watcher.rs index ca56ea6..c8de1ed 100644 --- a/src/watcher.rs +++ b/src/watcher.rs @@ -40,7 +40,10 @@ use crate::tags; use crate::tags::SqliteTagDao; use crate::thumbnails; use crate::video; -use crate::video::actors::{GeneratePreviewClipMessage, QueueVideosMessage, VideoPlaylistManager}; +use crate::video::actors::{ + GeneratePreviewClipMessage, QueueVideosMessage, VideoPlaylistManager, VideoToQueue, +}; +use crate::video::hls_paths; /// Clean up orphaned HLS playlists and segments whose source videos no longer exist. /// @@ -288,7 +291,12 @@ pub fn watch_files( )); let mut last_quick_scan = SystemTime::now(); - let mut last_full_scan = SystemTime::now(); + // Initialize to UNIX_EPOCH so the *first* tick is treated as a + // full scan. That replaces the legacy startup ScanDirectoryMessage + // walk for HLS playlists: every library's existing media gets + // checked once at watcher boot, instead of waiting up to + // full_interval_secs (1h default) for the first natural full scan. + let mut last_full_scan = SystemTime::UNIX_EPOCH; let mut scan_count = 0u64; // Per-library cursor for the missing-file scan. Each tick reads @@ -600,14 +608,18 @@ pub fn process_new_files( // Batch query: Get all EXIF data for these files in one query let file_paths: Vec = files.iter().map(|(_, rel_path)| rel_path.clone()).collect(); - let existing_exif_paths: HashMap = { + // Map of rel_path -> Option. The presence of the key + // tells us "row exists"; the Option value carries the hash for the + // HLS pipeline so video files without a hash (mid-backfill) skip + // this tick rather than fall back to a basename-colliding playlist. + let existing_exif: HashMap> = { let mut dao = exif_dao.lock().expect("Unable to lock ExifDao"); // Walk is per-library, so scope the lookup so a same-named file // in another library doesn't make this one look already-indexed. match dao.get_exif_batch(&context, Some(library.id), &file_paths) { Ok(exif_records) => exif_records .into_iter() - .map(|record| (record.file_path, true)) + .map(|record| (record.file_path, record.content_hash)) .collect(), Err(e) => { error!("Error batch querying EXIF data: {:?}", e); @@ -637,7 +649,7 @@ pub fn process_new_files( && !bare_legacy_thumb_path.exists() && !thumbnails::unsupported_thumbnail_sentinel(&scoped_thumb_path).exists() && !thumbnails::unsupported_thumbnail_sentinel(&bare_legacy_thumb_path).exists(); - let needs_row = !existing_exif_paths.contains_key(relative_path); + let needs_row = !existing_exif.contains_key(relative_path); if needs_thumbnail || needs_row { new_files_found = true; @@ -796,28 +808,45 @@ pub fn process_new_files( } } - // Check for videos that need HLS playlists + // Check for videos that need HLS playlists. All output is keyed on + // `content_hash` (see `crate::video::hls_paths`), so files whose + // `image_exif.content_hash` is still NULL — typically mid-backfill — + // are skipped this tick and picked up after the unhashed backlog + // drain populates the hash on a subsequent tick. Skipping is the + // correct call: queuing without a hash would either fall back to + // basename keying (the bug this refactor fixes) or fabricate one. let video_path_base = dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set"); - let mut videos_needing_playlists = Vec::new(); + let video_dir = Path::new(&video_path_base); + let mut videos_needing_playlists: Vec = Vec::new(); + let mut hashless_video_count = 0usize; - for (file_path, _relative_path) in &files { - if file_types::is_video_file(file_path) { - // Construct expected playlist path - let playlist_filename = - format!("{}.m3u8", file_path.file_name().unwrap().to_string_lossy()); - let playlist_path = Path::new(&video_path_base).join(&playlist_filename); - - // Check if playlist needs (re)generation - if playlist_needs_generation(file_path, &playlist_path) { - videos_needing_playlists.push(file_path.clone()); - } + for (file_path, relative_path) in &files { + if !file_types::is_video_file(file_path) { + continue; + } + let Some(hash) = existing_exif.get(relative_path).and_then(|h| h.clone()) else { + hashless_video_count += 1; + continue; + }; + let playlist_path = hls_paths::playlist_for_hash(video_dir, &hash); + if playlist_needs_generation(file_path, &playlist_path) { + videos_needing_playlists.push(VideoToQueue { + video_path: file_path.clone(), + content_hash: hash, + }); } } - // Send queue request to playlist manager + if hashless_video_count > 0 { + debug!( + "Watcher tick for '{}': skipped {} video(s) with NULL content_hash (will retry after backfill)", + library.name, hashless_video_count + ); + } + if !videos_needing_playlists.is_empty() { playlist_manager.do_send(QueueVideosMessage { - video_paths: videos_needing_playlists, + videos: videos_needing_playlists, }); }