From 29f32b9d22909b09294577b4538a9a92a1847ecd Mon Sep 17 00:00:00 2001 From: Cameron Date: Fri, 24 Apr 2026 10:08:03 -0400 Subject: [PATCH] FFMPEG playlist improvements Better playlist management, .tmp renaming, HLS playlist parameter and concurrency tweaking. --- src/video/actors.rs | 420 +++++++++++++++++++++++++++++++------------- src/video/ffmpeg.rs | 6 +- 2 files changed, 300 insertions(+), 126 deletions(-) diff --git a/src/video/actors.rs b/src/video/actors.rs index e85feb4..a461efe 100644 --- a/src/video/actors.rs +++ b/src/video/actors.rs @@ -4,7 +4,6 @@ use crate::libraries::Library; use crate::otel::global_tracer; use crate::video::ffmpeg::generate_preview_clip; use actix::prelude::*; -use futures::TryFutureExt; use log::{debug, error, info, trace, warn}; use opentelemetry::KeyValue; use opentelemetry::trace::{Span, Status, Tracer}; @@ -74,9 +73,11 @@ pub async fn create_playlist(video_path: &str, playlist_file: &str) -> Result std:: Ok(()) } -/// Check if a video is already encoded with h264 codec -/// Returns true if the video uses h264, false otherwise or if detection fails -async fn is_h264_encoded(video_path: &str) -> bool { - let output = tokio::process::Command::new("ffprobe") - .arg("-v") - .arg("error") - .arg("-select_streams") - .arg("v:0") - .arg("-show_entries") - .arg("stream=codec_name") - .arg("-of") - .arg("default=noprint_wrappers=1:nokey=1") - .arg(video_path) - .output() - .await; - - match output { - Ok(output) if output.status.success() => { - let codec = String::from_utf8_lossy(&output.stdout); - let codec = codec.trim(); - debug!("Detected codec for {}: {}", video_path, codec); - codec == "h264" - } - Ok(output) => { - warn!( - "ffprobe failed for {}: {}", - video_path, - String::from_utf8_lossy(&output.stderr) - ); - false - } - Err(e) => { - warn!("Failed to run ffprobe for {}: {}", video_path, e); - false - } - } +/// Video stream metadata needed to pick HLS encode settings. 
Populated by +/// a single ffprobe call to avoid spawning multiple subprocesses per video. +#[derive(Debug, Default)] +struct VideoStreamMeta { + is_h264: bool, + /// Rotation in degrees (0/90/180/270). Checks both the legacy `rotate` + /// stream tag and the modern display-matrix side data. + rotation: i32, } -/// Check if a video has rotation metadata -/// Returns the rotation angle in degrees (0, 90, 180, 270) or 0 if none detected -/// Checks both legacy stream tags and modern display matrix side data -async fn get_video_rotation(video_path: &str) -> i32 { - // Check legacy rotate stream tag (older videos) +/// Probe video stream metadata in one ffprobe call. Returns default (codec +/// unknown, rotation 0) on any failure — callers fall back to transcoding. +async fn probe_video_stream_meta(video_path: &str) -> VideoStreamMeta { let output = tokio::process::Command::new("ffprobe") .arg("-v") .arg("error") .arg("-select_streams") .arg("v:0") + .arg("-print_format") + .arg("json") .arg("-show_entries") - .arg("stream_tags=rotate") - .arg("-of") - .arg("default=noprint_wrappers=1:nokey=1") + .arg("stream=codec_name:stream_tags=rotate:side_data_list") .arg(video_path) .output() .await; - if let Ok(output) = output - && output.status.success() - { - let rotation_str = String::from_utf8_lossy(&output.stdout); - let rotation_str = rotation_str.trim(); - if !rotation_str.is_empty() - && let Ok(rotation) = rotation_str.parse::<i32>() - && rotation != 0 - { - debug!( - "Detected rotation {}° from stream tag for {}", - rotation, video_path - ); - return rotation; - } + let Ok(output) = output else { + warn!("Failed to run ffprobe for {}", video_path); + return VideoStreamMeta::default(); + }; + if !output.status.success() { + warn!( + "ffprobe failed for {}: {}", + video_path, + String::from_utf8_lossy(&output.stderr).trim() + ); + return VideoStreamMeta::default(); } - // Check display matrix side data (modern videos, e.g.
iPhone) + let Ok(json) = serde_json::from_slice::<serde_json::Value>(&output.stdout) else { + warn!("ffprobe returned non-JSON for {}", video_path); + return VideoStreamMeta::default(); + }; + + let stream = &json["streams"][0]; + let is_h264 = stream + .get("codec_name") + .and_then(|v| v.as_str()) + .map(|s| s == "h264") + .unwrap_or(false); + + // Prefer legacy `tags.rotate` (older containers); fall back to the + // display-matrix side data (iPhone and other modern recorders). + let rotation = stream + .get("tags") + .and_then(|t| t.get("rotate")) + .and_then(|r| r.as_str()) + .and_then(|s| s.parse::<i32>().ok()) + .filter(|r| *r != 0) + .or_else(|| { + stream + .get("side_data_list") + .and_then(|l| l.as_array()) + .and_then(|arr| { + arr.iter() + .find_map(|sd| sd.get("rotation").and_then(|r| r.as_f64())) + }) + .map(|f| f.abs() as i32) + .filter(|r| *r != 0) + }) + .unwrap_or(0); + + debug!( + "Probed {}: codec_h264={}, rotation={}°", + video_path, is_h264, rotation + ); + + VideoStreamMeta { is_h264, rotation } +} + +/// Probe the max keyframe interval (GOP) in the first ~30s of a video. +/// Returns `None` on probe failure or if we couldn't see at least two keyframes. +/// +/// Used to decide between stream-copy and transcode: HLS needs segments to +/// start on keyframes, so if the source GOP exceeds `hls_time`, copying +/// produces oversized/glitchy segments and we need to re-encode.
+async fn get_max_gop_seconds(video_path: &str) -> Option<f64> { let output = tokio::process::Command::new("ffprobe") .arg("-v") .arg("error") .arg("-select_streams") .arg("v:0") + .arg("-skip_frame") + .arg("nokey") .arg("-show_entries") - .arg("side_data=rotation") + .arg("frame=pts_time") .arg("-of") - .arg("default=noprint_wrappers=1:nokey=1") + .arg("csv=p=0") + .arg("-read_intervals") + .arg("%+30") .arg(video_path) .output() - .await; + .await + .ok()?; - if let Ok(output) = output - && output.status.success() - { - let rotation_str = String::from_utf8_lossy(&output.stdout); - let rotation_str = rotation_str.trim(); - if !rotation_str.is_empty() - && let Ok(rotation) = rotation_str.parse::<f64>() - { - let rotation = rotation.abs() as i32; - if rotation != 0 { - debug!( - "Detected rotation {}° from display matrix for {}", - rotation, video_path - ); - return rotation; - } - } + if !output.status.success() { + warn!( + "ffprobe GOP check failed for {}: {}", + video_path, + String::from_utf8_lossy(&output.stderr).trim() + ); + return None; } - 0 + let times: Vec<f64> = String::from_utf8_lossy(&output.stdout) + .lines() + .filter_map(|l| l.trim().parse::<f64>().ok()) + .collect(); + + if times.len() < 2 { + return None; + } + + let max_gop = times + .windows(2) + .map(|w| w[1] - w[0]) + .fold(0.0_f64, f64::max); + debug!( + "Max GOP in first {} keyframes of {}: {:.2}s", + times.len(), + video_path, + max_gop + ); + Some(max_gop) } pub struct VideoPlaylistManager { @@ -404,8 +432,17 @@ pub struct PlaylistGenerator { impl PlaylistGenerator { pub(crate) fn new() -> Self { + // Concurrency is tunable via HLS_CONCURRENCY so operators can dial + // it to their hardware: 1 on weak Synology boxes to avoid thermal + // throttling, higher on desktops with spare cores.
+ let concurrency = std::env::var("HLS_CONCURRENCY") + .ok() + .and_then(|v| v.parse::<usize>().ok()) + .filter(|&n| n > 0) + .unwrap_or(2); + info!("PlaylistGenerator: concurrency={}", concurrency); PlaylistGenerator { - semaphore: Arc::new(Semaphore::new(2)), + semaphore: Arc::new(Semaphore::new(concurrency)), } } } @@ -465,14 +502,42 @@ impl Handler for PlaylistGenerator { return Err(std::io::Error::from(std::io::ErrorKind::AlreadyExists)); } - // Check if video is already h264 encoded - let is_h264 = is_h264_encoded(&video_file).await; - - // Check for rotation metadata - let rotation = get_video_rotation(&video_file).await; + // One ffprobe call for codec + rotation metadata. + let stream_meta = probe_video_stream_meta(&video_file).await; + let is_h264 = stream_meta.is_h264; + let rotation = stream_meta.rotation; let has_rotation = rotation != 0; - let use_copy = is_h264 && !has_rotation; + // Stream-copy is only safe when the source GOP fits inside a + // single HLS segment. Otherwise ffmpeg has to extend segments + // past hls_time to land on a keyframe, producing uneven + // segments and seeking glitches. + const HLS_SEGMENT_SECONDS: f64 = 3.0; + let gop_ok = if is_h264 && !has_rotation { + match get_max_gop_seconds(&video_file).await { + Some(g) if g > HLS_SEGMENT_SECONDS => { + info!( + "Video {} has long GOP ({:.1}s > {}s), transcoding for segment alignment", + video_file, g, HLS_SEGMENT_SECONDS + ); + false + } + Some(_) => true, + None => { + // Probe failed — be conservative and transcode rather + // than risk broken segments from a mystery source.
+ debug!( + "GOP probe failed for {}, transcoding to be safe", + video_file + ); + false + } + } + } else { + false + }; + + let use_copy = is_h264 && !has_rotation && gop_ok; if has_rotation { info!( @@ -486,59 +551,168 @@ impl Handler for PlaylistGenerator { } else if use_copy { info!("Video {} is already h264, using stream copy", video_file); span.add_event("Using stream copy (h264 detected)", vec![]); + } else if is_h264 { + info!( + "Video {} is h264 but needs transcoding for GOP alignment", + video_file + ); + span.add_event("Transcoding for GOP alignment", vec![]); } else { info!("Video {} needs transcoding to h264", video_file); span.add_event("Transcoding to h264", vec![]); } - tokio::spawn(async move { - let mut cmd = tokio::process::Command::new("ffmpeg"); - cmd.arg("-i").arg(&video_file); + // Encode to a .tmp playlist and explicit segment names so a failed + // encode leaves predictable artifacts we can clean up — and so a + // concurrent scan doesn't see a half-written .m3u8 as "done". + let playlist_tmp = format!("{}.tmp", playlist_file); + let video_stem = msg + .video_path + .file_name() + .and_then(|n| n.to_str()) + .unwrap_or("video"); + let segment_pattern = format!("{}/{}_%03d.ts", playlist_path, video_stem); - if use_copy { - // Video is already h264, just copy the stream - // Note: rotation metadata will be preserved in the stream - cmd.arg("-c:v").arg("copy"); - cmd.arg("-c:a").arg("aac"); // Still need to ensure audio is compatible + let mut cmd = tokio::process::Command::new("ffmpeg"); + cmd.arg("-y").arg("-i").arg(&video_file); + + if use_copy { + cmd.arg("-c:v").arg("copy"); + cmd.arg("-c:a").arg("aac"); + } else { + let nvenc = crate::video::ffmpeg::is_nvenc_available().await; + if nvenc { + // NVENC: no CRF, use VBR + target CQ. p1 = fastest + // preset — prioritizes encoder throughput over bitrate + // efficiency. CQ 23 roughly matches libx264 crf 21 + // visually; NVENC has slightly lower compression + // efficiency per quality. 
+ cmd.arg("-c:v").arg("h264_nvenc"); + cmd.arg("-preset").arg("p1"); + cmd.arg("-rc").arg("vbr"); + cmd.arg("-cq").arg("23"); + cmd.arg("-pix_fmt").arg("yuv420p"); } else { - // Need to transcode - autorotate is enabled by default and will apply rotation cmd.arg("-c:v").arg("h264"); cmd.arg("-crf").arg("21"); cmd.arg("-preset").arg("veryfast"); - cmd.arg("-vf").arg("scale=1080:-2,setsar=1:1"); - cmd.arg("-c:a").arg("aac"); } + cmd.arg("-vf").arg("scale='min(1080,iw)':-2,setsar=1:1"); + cmd.arg("-c:a").arg("aac"); + // Force an IDR frame every hls_time seconds so each HLS + // segment starts on a keyframe — accurate seeking without + // players having to decode from a prior segment. + cmd.arg("-force_key_frames").arg("expr:gte(t,n_forced*3)"); + } - // Common HLS settings - cmd.arg("-hls_time").arg("3"); - cmd.arg("-hls_list_size").arg("100"); - cmd.arg(&playlist_file); - cmd.stdout(Stdio::null()); - cmd.stderr(Stdio::piped()); + // -f hls is required because the playlist is written to a .tmp + // path during encoding — ffmpeg normally infers the muxer from + // the output extension and doesn't recognize ".m3u8.tmp". + cmd.arg("-f").arg("hls"); + cmd.arg("-hls_time").arg("3"); + cmd.arg("-hls_list_size").arg("0"); + cmd.arg("-hls_playlist_type").arg("vod"); + // independent_segments advertises that each segment can be + // decoded without reference to any other — the matching guarantee + // for the forced keyframes above. + cmd.arg("-hls_flags").arg("independent_segments"); + cmd.arg("-hls_segment_filename").arg(&segment_pattern); + cmd.arg(&playlist_tmp); + cmd.stdout(Stdio::null()); + cmd.stderr(Stdio::piped()); + cmd.kill_on_drop(true); - let ffmpeg_result = cmd - .output() - .inspect_err(|e| error!("Failed to run ffmpeg on child process: {}", e)) - .map_err(|e| std::io::Error::other(e.to_string())) - .await; + // Spawn + wait under a timeout so a hung ffmpeg (corrupt source, + // NFS stall, etc.) doesn't permanently hold a semaphore slot. 
+ // Default is generous — a long 4K transcode on CPU can take hours. + let timeout_secs = std::env::var("HLS_TIMEOUT_SECONDS") + .ok() + .and_then(|v| v.parse::<u64>().ok()) + .unwrap_or(7200); - // Hang on to the permit until we're done decoding and then explicitly drop - drop(permit); - - if let Ok(ref res) = ffmpeg_result { - debug!("ffmpeg output: {:?}", res); + let ffmpeg_result = match cmd.spawn() { + Ok(child) => { + match tokio::time::timeout( + std::time::Duration::from_secs(timeout_secs), + child.wait_with_output(), + ) + .await + { + Ok(res) => res + .inspect_err(|e| { + error!("Failed to wait on ffmpeg child process: {}", e) + }) + .map_err(|e| std::io::Error::other(e.to_string())), + Err(_) => Err(std::io::Error::other(format!( + "ffmpeg exceeded {}s timeout", + timeout_secs + ))), + } } + Err(e) => { + error!("Failed to spawn ffmpeg: {}", e); + Err(std::io::Error::other(e.to_string())) + } + }; + drop(permit); + + let success = matches!(&ffmpeg_result, Ok(out) if out.status.success()); + + if success { + if let Err(e) = tokio::fs::rename(&playlist_tmp, &playlist_file).await { + error!( + "ffmpeg succeeded but rename {} -> {} failed: {}", + playlist_tmp, playlist_file, e + ); + cleanup_partial_hls(&playlist_tmp, playlist_path.as_str(), video_stem).await; + span.set_status(Status::error(format!("rename failed: {}", e))); + return Err(e); + } + debug!("Playlist complete: {}", playlist_file); span.set_status(Status::Ok); - - ffmpeg_result - }); - - Ok(()) + Ok(()) + } else { + let detail = match &ffmpeg_result { + Ok(out) => format!( + "exit {}: {}", + out.status, + String::from_utf8_lossy(&out.stderr).trim() + ), + Err(e) => format!("ffmpeg failed: {}", e), + }; + error!("ffmpeg failed for {}: {}", video_file, detail); + cleanup_partial_hls(&playlist_tmp, playlist_path.as_str(), video_stem).await; + span.set_status(Status::error(detail.clone())); + Err(std::io::Error::other(detail)) + } }) } } +/// Delete the temp playlist and any segment files that ffmpeg
may have written +/// before failing. Called both on ffmpeg error and on rename failure so a +/// retry on the next scan starts from a clean slate. +async fn cleanup_partial_hls(playlist_tmp: &str, playlist_dir: &str, video_stem: &str) { + let _ = tokio::fs::remove_file(playlist_tmp).await; + + let segment_prefix = format!("{}_", video_stem); + let Ok(mut entries) = tokio::fs::read_dir(playlist_dir).await else { + return; + }; + while let Ok(Some(entry)) = entries.next_entry().await { + let Some(name) = entry.file_name().to_str().map(str::to_owned) else { + continue; + }; + if name.starts_with(&segment_prefix) + && name.ends_with(".ts") + && let Err(e) = tokio::fs::remove_file(entry.path()).await + { + warn!("Failed to remove partial segment {}: {}", name, e); + } + } +} + #[derive(Message)] #[rtype(result = "()")] pub struct GeneratePreviewClipMessage { diff --git a/src/video/ffmpeg.rs b/src/video/ffmpeg.rs index 5ed9308..a31fd0c 100644 --- a/src/video/ffmpeg.rs +++ b/src/video/ffmpeg.rs @@ -22,16 +22,16 @@ async fn check_nvenc_available() -> bool { } /// Returns whether NVENC is available, caching the result after first check. -async fn is_nvenc_available() -> bool { +pub async fn is_nvenc_available() -> bool { if let Some(&available) = NVENC_AVAILABLE.get() { return available; } let available = check_nvenc_available().await; let _ = NVENC_AVAILABLE.set(available); if available { - info!("CUDA NVENC hardware acceleration detected and enabled for preview clips"); + info!("CUDA NVENC hardware acceleration detected and enabled"); } else { - info!("NVENC not available, using CPU encoding for preview clips"); + info!("NVENC not available, using CPU encoding"); } available }