FFMPEG playlist improvements

Better playlist management, .tmp renaming, HLS playlist parameter and concurrency tweaking.
This commit is contained in:
Cameron
2026-04-24 10:08:03 -04:00
parent 13b9d54861
commit 29f32b9d22
2 changed files with 300 additions and 126 deletions

View File

@@ -4,7 +4,6 @@ use crate::libraries::Library;
use crate::otel::global_tracer;
use crate::video::ffmpeg::generate_preview_clip;
use actix::prelude::*;
use futures::TryFutureExt;
use log::{debug, error, info, trace, warn};
use opentelemetry::KeyValue;
use opentelemetry::trace::{Span, Status, Tracer};
@@ -74,9 +73,11 @@ pub async fn create_playlist(video_path: &str, playlist_file: &str) -> Result<Ch
.arg("-hls_time")
.arg("3")
.arg("-hls_list_size")
.arg("100")
.arg("0")
.arg("-hls_playlist_type")
.arg("vod")
.arg("-vf")
.arg("scale=1080:-2,setsar=1:1")
.arg("scale='min(1080,iw)':-2,setsar=1:1")
.arg(playlist_file)
.stdout(Stdio::null())
.stderr(Stdio::null())
@@ -140,113 +141,140 @@ pub fn generate_image_thumbnail_ffmpeg(path: &Path, destination: &Path) -> std::
Ok(())
}
/// Check if a video is already encoded with h264 codec
/// Returns true if the video uses h264, false otherwise or if detection fails
async fn is_h264_encoded(video_path: &str) -> bool {
/// Video stream metadata needed to pick HLS encode settings. Populated by
/// a single ffprobe call to avoid spawning multiple subprocesses per video.
#[derive(Debug, Default)]
struct VideoStreamMeta {
is_h264: bool,
/// Rotation in degrees (0/90/180/270). Checks both the legacy `rotate`
/// stream tag and the modern display-matrix side data.
rotation: i32,
}
/// Probe video stream metadata in one ffprobe call. Returns default (codec
/// unknown, rotation 0) on any failure — callers fall back to transcoding.
async fn probe_video_stream_meta(video_path: &str) -> VideoStreamMeta {
let output = tokio::process::Command::new("ffprobe")
.arg("-v")
.arg("error")
.arg("-select_streams")
.arg("v:0")
.arg("-print_format")
.arg("json")
.arg("-show_entries")
.arg("stream=codec_name")
.arg("-of")
.arg("default=noprint_wrappers=1:nokey=1")
.arg("stream=codec_name:stream_tags=rotate:side_data_list")
.arg(video_path)
.output()
.await;
match output {
Ok(output) if output.status.success() => {
let codec = String::from_utf8_lossy(&output.stdout);
let codec = codec.trim();
debug!("Detected codec for {}: {}", video_path, codec);
codec == "h264"
}
Ok(output) => {
let Ok(output) = output else {
warn!("Failed to run ffprobe for {}", video_path);
return VideoStreamMeta::default();
};
if !output.status.success() {
warn!(
"ffprobe failed for {}: {}",
video_path,
String::from_utf8_lossy(&output.stderr)
String::from_utf8_lossy(&output.stderr).trim()
);
false
}
Err(e) => {
warn!("Failed to run ffprobe for {}: {}", video_path, e);
false
}
}
return VideoStreamMeta::default();
}
/// Check if a video has rotation metadata
/// Returns the rotation angle in degrees (0, 90, 180, 270) or 0 if none detected
/// Checks both legacy stream tags and modern display matrix side data
async fn get_video_rotation(video_path: &str) -> i32 {
// Check legacy rotate stream tag (older videos)
let Ok(json) = serde_json::from_slice::<serde_json::Value>(&output.stdout) else {
warn!("ffprobe returned non-JSON for {}", video_path);
return VideoStreamMeta::default();
};
let stream = &json["streams"][0];
let is_h264 = stream
.get("codec_name")
.and_then(|v| v.as_str())
.map(|s| s == "h264")
.unwrap_or(false);
// Prefer legacy `tags.rotate` (older containers); fall back to the
// display-matrix side data (iPhone and other modern recorders).
let rotation = stream
.get("tags")
.and_then(|t| t.get("rotate"))
.and_then(|r| r.as_str())
.and_then(|s| s.parse::<i32>().ok())
.filter(|r| *r != 0)
.or_else(|| {
stream
.get("side_data_list")
.and_then(|l| l.as_array())
.and_then(|arr| {
arr.iter()
.find_map(|sd| sd.get("rotation").and_then(|r| r.as_f64()))
})
.map(|f| f.abs() as i32)
.filter(|r| *r != 0)
})
.unwrap_or(0);
debug!(
"Probed {}: codec_h264={}, rotation={}°",
video_path, is_h264, rotation
);
VideoStreamMeta { is_h264, rotation }
}
/// Probe the max keyframe interval (GOP) in the first ~30s of a video.
/// Returns `None` on probe failure or if we couldn't see at least two keyframes.
///
/// Used to decide between stream-copy and transcode: HLS needs segments to
/// start on keyframes, so if the source GOP exceeds `hls_time`, copying
/// produces oversized/glitchy segments and we need to re-encode.
async fn get_max_gop_seconds(video_path: &str) -> Option<f64> {
let output = tokio::process::Command::new("ffprobe")
.arg("-v")
.arg("error")
.arg("-select_streams")
.arg("v:0")
.arg("-skip_frame")
.arg("nokey")
.arg("-show_entries")
.arg("stream_tags=rotate")
.arg("frame=pts_time")
.arg("-of")
.arg("default=noprint_wrappers=1:nokey=1")
.arg("csv=p=0")
.arg("-read_intervals")
.arg("%+30")
.arg(video_path)
.output()
.await;
.await
.ok()?;
if let Ok(output) = output
&& output.status.success()
{
let rotation_str = String::from_utf8_lossy(&output.stdout);
let rotation_str = rotation_str.trim();
if !rotation_str.is_empty()
&& let Ok(rotation) = rotation_str.parse::<i32>()
&& rotation != 0
{
debug!(
"Detected rotation {}° from stream tag for {}",
rotation, video_path
if !output.status.success() {
warn!(
"ffprobe GOP check failed for {}: {}",
video_path,
String::from_utf8_lossy(&output.stderr).trim()
);
return rotation;
}
return None;
}
// Check display matrix side data (modern videos, e.g. iPhone)
let output = tokio::process::Command::new("ffprobe")
.arg("-v")
.arg("error")
.arg("-select_streams")
.arg("v:0")
.arg("-show_entries")
.arg("side_data=rotation")
.arg("-of")
.arg("default=noprint_wrappers=1:nokey=1")
.arg(video_path)
.output()
.await;
let times: Vec<f64> = String::from_utf8_lossy(&output.stdout)
.lines()
.filter_map(|l| l.trim().parse::<f64>().ok())
.collect();
if let Ok(output) = output
&& output.status.success()
{
let rotation_str = String::from_utf8_lossy(&output.stdout);
let rotation_str = rotation_str.trim();
if !rotation_str.is_empty()
&& let Ok(rotation) = rotation_str.parse::<f64>()
{
let rotation = rotation.abs() as i32;
if rotation != 0 {
if times.len() < 2 {
return None;
}
let max_gop = times
.windows(2)
.map(|w| w[1] - w[0])
.fold(0.0_f64, f64::max);
debug!(
"Detected rotation {}° from display matrix for {}",
rotation, video_path
"Max GOP in first {} keyframes of {}: {:.2}s",
times.len(),
video_path,
max_gop
);
return rotation;
}
}
}
0
Some(max_gop)
}
pub struct VideoPlaylistManager {
@@ -404,8 +432,17 @@ pub struct PlaylistGenerator {
impl PlaylistGenerator {
pub(crate) fn new() -> Self {
// Concurrency is tunable via HLS_CONCURRENCY so operators can dial
// it to their hardware: 1 on weak Synology boxes to avoid thermal
// throttling, higher on desktops with spare cores.
let concurrency = std::env::var("HLS_CONCURRENCY")
.ok()
.and_then(|v| v.parse::<usize>().ok())
.filter(|&n| n > 0)
.unwrap_or(2);
info!("PlaylistGenerator: concurrency={}", concurrency);
PlaylistGenerator {
semaphore: Arc::new(Semaphore::new(2)),
semaphore: Arc::new(Semaphore::new(concurrency)),
}
}
}
@@ -465,14 +502,42 @@ impl Handler<GeneratePlaylistMessage> for PlaylistGenerator {
return Err(std::io::Error::from(std::io::ErrorKind::AlreadyExists));
}
// Check if video is already h264 encoded
let is_h264 = is_h264_encoded(&video_file).await;
// Check for rotation metadata
let rotation = get_video_rotation(&video_file).await;
// One ffprobe call for codec + rotation metadata.
let stream_meta = probe_video_stream_meta(&video_file).await;
let is_h264 = stream_meta.is_h264;
let rotation = stream_meta.rotation;
let has_rotation = rotation != 0;
let use_copy = is_h264 && !has_rotation;
// Stream-copy is only safe when the source GOP fits inside a
// single HLS segment. Otherwise ffmpeg has to extend segments
// past hls_time to land on a keyframe, producing uneven
// segments and seeking glitches.
const HLS_SEGMENT_SECONDS: f64 = 3.0;
let gop_ok = if is_h264 && !has_rotation {
match get_max_gop_seconds(&video_file).await {
Some(g) if g > HLS_SEGMENT_SECONDS => {
info!(
"Video {} has long GOP ({:.1}s > {}s), transcoding for segment alignment",
video_file, g, HLS_SEGMENT_SECONDS
);
false
}
Some(_) => true,
None => {
// Probe failed — be conservative and transcode rather
// than risk broken segments from a mystery source.
debug!(
"GOP probe failed for {}, transcoding to be safe",
video_file
);
false
}
}
} else {
false
};
let use_copy = is_h264 && !has_rotation && gop_ok;
if has_rotation {
info!(
@@ -486,56 +551,165 @@ impl Handler<GeneratePlaylistMessage> for PlaylistGenerator {
} else if use_copy {
info!("Video {} is already h264, using stream copy", video_file);
span.add_event("Using stream copy (h264 detected)", vec![]);
} else if is_h264 {
info!(
"Video {} is h264 but needs transcoding for GOP alignment",
video_file
);
span.add_event("Transcoding for GOP alignment", vec![]);
} else {
info!("Video {} needs transcoding to h264", video_file);
span.add_event("Transcoding to h264", vec![]);
}
tokio::spawn(async move {
// Encode to a .tmp playlist and explicit segment names so a failed
// encode leaves predictable artifacts we can clean up — and so a
// concurrent scan doesn't see a half-written .m3u8 as "done".
let playlist_tmp = format!("{}.tmp", playlist_file);
let video_stem = msg
.video_path
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("video");
let segment_pattern = format!("{}/{}_%03d.ts", playlist_path, video_stem);
let mut cmd = tokio::process::Command::new("ffmpeg");
cmd.arg("-i").arg(&video_file);
cmd.arg("-y").arg("-i").arg(&video_file);
if use_copy {
// Video is already h264, just copy the stream
// Note: rotation metadata will be preserved in the stream
cmd.arg("-c:v").arg("copy");
cmd.arg("-c:a").arg("aac"); // Still need to ensure audio is compatible
cmd.arg("-c:a").arg("aac");
} else {
let nvenc = crate::video::ffmpeg::is_nvenc_available().await;
if nvenc {
// NVENC: no CRF, use VBR + target CQ. p1 = fastest
// preset — prioritizes encoder throughput over bitrate
// efficiency. CQ 23 roughly matches libx264 crf 21
// visually; NVENC has slightly lower compression
// efficiency per quality.
cmd.arg("-c:v").arg("h264_nvenc");
cmd.arg("-preset").arg("p1");
cmd.arg("-rc").arg("vbr");
cmd.arg("-cq").arg("23");
cmd.arg("-pix_fmt").arg("yuv420p");
} else {
// Need to transcode - autorotate is enabled by default and will apply rotation
cmd.arg("-c:v").arg("h264");
cmd.arg("-crf").arg("21");
cmd.arg("-preset").arg("veryfast");
cmd.arg("-vf").arg("scale=1080:-2,setsar=1:1");
}
cmd.arg("-vf").arg("scale='min(1080,iw)':-2,setsar=1:1");
cmd.arg("-c:a").arg("aac");
// Force an IDR frame every hls_time seconds so each HLS
// segment starts on a keyframe — accurate seeking without
// players having to decode from a prior segment.
cmd.arg("-force_key_frames").arg("expr:gte(t,n_forced*3)");
}
// Common HLS settings
// -f hls is required because the playlist is written to a .tmp
// path during encoding — ffmpeg normally infers the muxer from
// the output extension and doesn't recognize ".m3u8.tmp".
cmd.arg("-f").arg("hls");
cmd.arg("-hls_time").arg("3");
cmd.arg("-hls_list_size").arg("100");
cmd.arg(&playlist_file);
cmd.arg("-hls_list_size").arg("0");
cmd.arg("-hls_playlist_type").arg("vod");
// independent_segments advertises that each segment can be
// decoded without reference to any other — the matching guarantee
// for the forced keyframes above.
cmd.arg("-hls_flags").arg("independent_segments");
cmd.arg("-hls_segment_filename").arg(&segment_pattern);
cmd.arg(&playlist_tmp);
cmd.stdout(Stdio::null());
cmd.stderr(Stdio::piped());
cmd.kill_on_drop(true);
let ffmpeg_result = cmd
.output()
.inspect_err(|e| error!("Failed to run ffmpeg on child process: {}", e))
.map_err(|e| std::io::Error::other(e.to_string()))
.await;
// Spawn + wait under a timeout so a hung ffmpeg (corrupt source,
// NFS stall, etc.) doesn't permanently hold a semaphore slot.
// Default is generous — a long 4K transcode on CPU can take hours.
let timeout_secs = std::env::var("HLS_TIMEOUT_SECONDS")
.ok()
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(7200);
let ffmpeg_result = match cmd.spawn() {
Ok(child) => {
match tokio::time::timeout(
std::time::Duration::from_secs(timeout_secs),
child.wait_with_output(),
)
.await
{
Ok(res) => res
.inspect_err(|e| {
error!("Failed to wait on ffmpeg child process: {}", e)
})
.map_err(|e| std::io::Error::other(e.to_string())),
Err(_) => Err(std::io::Error::other(format!(
"ffmpeg exceeded {}s timeout",
timeout_secs
))),
}
}
Err(e) => {
error!("Failed to spawn ffmpeg: {}", e);
Err(std::io::Error::other(e.to_string()))
}
};
// Hang on to the permit until we're done decoding and then explicitly drop
drop(permit);
if let Ok(ref res) = ffmpeg_result {
debug!("ffmpeg output: {:?}", res);
let success = matches!(&ffmpeg_result, Ok(out) if out.status.success());
if success {
if let Err(e) = tokio::fs::rename(&playlist_tmp, &playlist_file).await {
error!(
"ffmpeg succeeded but rename {} -> {} failed: {}",
playlist_tmp, playlist_file, e
);
cleanup_partial_hls(&playlist_tmp, playlist_path.as_str(), video_stem).await;
span.set_status(Status::error(format!("rename failed: {}", e)));
return Err(e);
}
debug!("Playlist complete: {}", playlist_file);
span.set_status(Status::Ok);
Ok(())
} else {
let detail = match &ffmpeg_result {
Ok(out) => format!(
"exit {}: {}",
out.status,
String::from_utf8_lossy(&out.stderr).trim()
),
Err(e) => format!("ffmpeg failed: {}", e),
};
error!("ffmpeg failed for {}: {}", video_file, detail);
cleanup_partial_hls(&playlist_tmp, playlist_path.as_str(), video_stem).await;
span.set_status(Status::error(detail.clone()));
Err(std::io::Error::other(detail))
}
})
}
}
span.set_status(Status::Ok);
/// Delete the temp playlist and any segment files that ffmpeg may have written
/// before failing. Called both on ffmpeg error and on rename failure so a
/// retry on the next scan starts from a clean slate.
async fn cleanup_partial_hls(playlist_tmp: &str, playlist_dir: &str, video_stem: &str) {
let _ = tokio::fs::remove_file(playlist_tmp).await;
ffmpeg_result
});
Ok(())
})
let segment_prefix = format!("{}_", video_stem);
let Ok(mut entries) = tokio::fs::read_dir(playlist_dir).await else {
return;
};
while let Ok(Some(entry)) = entries.next_entry().await {
let Some(name) = entry.file_name().to_str().map(str::to_owned) else {
continue;
};
if name.starts_with(&segment_prefix)
&& name.ends_with(".ts")
&& let Err(e) = tokio::fs::remove_file(entry.path()).await
{
warn!("Failed to remove partial segment {}: {}", name, e);
}
}
}

View File

@@ -22,16 +22,16 @@ async fn check_nvenc_available() -> bool {
}
/// Returns whether NVENC is available, caching the result after first check.
async fn is_nvenc_available() -> bool {
pub async fn is_nvenc_available() -> bool {
if let Some(&available) = NVENC_AVAILABLE.get() {
return available;
}
let available = check_nvenc_available().await;
let _ = NVENC_AVAILABLE.set(available);
if available {
info!("CUDA NVENC hardware acceleration detected and enabled for preview clips");
info!("CUDA NVENC hardware acceleration detected and enabled");
} else {
info!("NVENC not available, using CPU encoding for preview clips");
info!("NVENC not available, using CPU encoding");
}
available
}