Files
ImageApi/src/video/actors.rs
Cameron 08f402d4d1 Add h264 codec detection and orphaned playlist cleanup job
Implement `is_h264_encoded` to detect existing h264 videos and optimize processing by using stream copy when possible. Introduce a background job for cleaning up orphaned playlists and segments based on missing source videos. Improve checks for playlist generation necessity.
2026-01-19 22:37:50 -05:00

405 lines
12 KiB
Rust

use crate::is_video;
use crate::otel::global_tracer;
use actix::prelude::*;
use futures::TryFutureExt;
use log::{debug, error, info, trace, warn};
use opentelemetry::KeyValue;
use opentelemetry::trace::{Span, Status, Tracer};
use std::io::Result;
use std::path::{Path, PathBuf};
use std::process::{Child, Command, ExitStatus, Stdio};
use std::sync::Arc;
use tokio::sync::Semaphore;
use walkdir::{DirEntry, WalkDir};
// ffmpeg -i test.mp4 -c:v h264 -flags +cgop -g 30 -hls_time 3 out.m3u8
// ffmpeg -i "filename.mp4" -preset veryfast -c:v libx264 -f hls -hls_list_size 100 -hls_time 2 -crf 24 -vf scale=1080:-2,setsar=1:1 attempt/vid_out.m3u8
pub struct StreamActor;
impl Actor for StreamActor {
type Context = Context<Self>;
}
pub struct ProcessMessage(pub String, pub Child);
impl Message for ProcessMessage {
type Result = Result<ExitStatus>;
}
impl Handler<ProcessMessage> for StreamActor {
type Result = Result<ExitStatus>;
fn handle(&mut self, msg: ProcessMessage, _ctx: &mut Self::Context) -> Self::Result {
trace!("Message received");
let mut process = msg.1;
let result = process.wait();
debug!(
"Finished waiting for: {:?}. Code: {:?}",
msg.0,
result
.as_ref()
.map_or(-1, |status| status.code().unwrap_or(-1))
);
result
}
}
pub async fn create_playlist(video_path: &str, playlist_file: &str) -> Result<Child> {
if Path::new(playlist_file).exists() {
debug!("Playlist already exists: {}", playlist_file);
return Err(std::io::Error::from(std::io::ErrorKind::AlreadyExists));
}
let result = Command::new("ffmpeg")
.arg("-i")
.arg(video_path)
.arg("-c:v")
.arg("h264")
.arg("-crf")
.arg("21")
.arg("-preset")
.arg("veryfast")
.arg("-hls_time")
.arg("3")
.arg("-hls_list_size")
.arg("100")
.arg("-vf")
.arg("scale=1080:-2,setsar=1:1")
.arg(playlist_file)
.stdout(Stdio::null())
.stderr(Stdio::null())
.spawn();
let start_time = std::time::Instant::now();
loop {
actix::clock::sleep(std::time::Duration::from_secs(1)).await;
if Path::new(playlist_file).exists()
|| std::time::Instant::now() - start_time > std::time::Duration::from_secs(5)
{
break;
}
}
result
}
pub fn generate_video_thumbnail(path: &Path, destination: &Path) {
Command::new("ffmpeg")
.arg("-ss")
.arg("3")
.arg("-i")
.arg(path.to_str().unwrap())
.arg("-vframes")
.arg("1")
.arg("-f")
.arg("image2")
.arg(destination)
.output()
.expect("Failure to create video frame");
}
/// Check if a video is already encoded with h264 codec
/// Returns true if the video uses h264, false otherwise or if detection fails
async fn is_h264_encoded(video_path: &str) -> bool {
let output = tokio::process::Command::new("ffprobe")
.arg("-v")
.arg("error")
.arg("-select_streams")
.arg("v:0")
.arg("-show_entries")
.arg("stream=codec_name")
.arg("-of")
.arg("default=noprint_wrappers=1:nokey=1")
.arg(video_path)
.output()
.await;
match output {
Ok(output) if output.status.success() => {
let codec = String::from_utf8_lossy(&output.stdout);
let codec = codec.trim();
debug!("Detected codec for {}: {}", video_path, codec);
codec == "h264"
}
Ok(output) => {
warn!(
"ffprobe failed for {}: {}",
video_path,
String::from_utf8_lossy(&output.stderr)
);
false
}
Err(e) => {
warn!("Failed to run ffprobe for {}: {}", video_path, e);
false
}
}
}
pub struct VideoPlaylistManager {
playlist_dir: PathBuf,
playlist_generator: Addr<PlaylistGenerator>,
}
impl VideoPlaylistManager {
pub fn new<P: Into<PathBuf>>(
playlist_dir: P,
playlist_generator: Addr<PlaylistGenerator>,
) -> Self {
Self {
playlist_dir: playlist_dir.into(),
playlist_generator,
}
}
}
impl Actor for VideoPlaylistManager {
type Context = Context<Self>;
}
impl Handler<ScanDirectoryMessage> for VideoPlaylistManager {
type Result = ResponseFuture<()>;
fn handle(&mut self, msg: ScanDirectoryMessage, _ctx: &mut Self::Context) -> Self::Result {
let tracer = global_tracer();
let mut span = tracer.start("videoplaylistmanager.scan_directory");
let start = std::time::Instant::now();
info!(
"Starting scan directory for video playlist generation: {}",
msg.directory
);
let video_files = WalkDir::new(&msg.directory)
.into_iter()
.filter_map(|e| e.ok())
.filter(|e| e.file_type().is_file())
.filter(is_video)
.collect::<Vec<DirEntry>>();
let scan_dir_name = msg.directory.clone();
let playlist_output_dir = self.playlist_dir.clone();
let playlist_generator = self.playlist_generator.clone();
Box::pin(async move {
for e in video_files {
let path = e.path();
let path_as_str = path.to_str().unwrap();
debug!(
"Sending generate playlist message for path: {}",
path_as_str
);
match playlist_generator
.send(GeneratePlaylistMessage {
playlist_path: playlist_output_dir.to_str().unwrap().to_string(),
video_path: PathBuf::from(path),
})
.await
.expect("Failed to send generate playlist message")
{
Ok(_) => {
span.add_event(
"Playlist generated",
vec![KeyValue::new("video_path", path_as_str.to_string())],
);
debug!(
"Successfully generated playlist for file: '{}'",
path_as_str
);
}
Err(e) => {
warn!("Failed to generate playlist for path '{:?}'. {:?}", path, e);
}
}
}
span.add_event(
"Finished directory scan",
vec![KeyValue::new("directory", scan_dir_name.to_string())],
);
info!(
"Finished directory scan of '{}' in {:?}",
scan_dir_name,
start.elapsed()
);
})
}
}
impl Handler<QueueVideosMessage> for VideoPlaylistManager {
type Result = ();
fn handle(&mut self, msg: QueueVideosMessage, _ctx: &mut Self::Context) -> Self::Result {
if msg.video_paths.is_empty() {
return;
}
info!(
"Queueing {} videos for HLS playlist generation",
msg.video_paths.len()
);
let playlist_output_dir = self.playlist_dir.clone();
let playlist_generator = self.playlist_generator.clone();
for video_path in msg.video_paths {
let path_str = video_path.to_string_lossy().to_string();
debug!("Queueing playlist generation for: {}", path_str);
playlist_generator.do_send(GeneratePlaylistMessage {
playlist_path: playlist_output_dir.to_str().unwrap().to_string(),
video_path,
});
}
}
}
#[derive(Message)]
#[rtype(result = "()")]
pub struct ScanDirectoryMessage {
pub(crate) directory: String,
}
#[derive(Message)]
#[rtype(result = "()")]
pub struct QueueVideosMessage {
pub video_paths: Vec<PathBuf>,
}
#[derive(Message)]
#[rtype(result = "Result<()>")]
pub struct GeneratePlaylistMessage {
pub video_path: PathBuf,
pub playlist_path: String,
}
pub struct PlaylistGenerator {
semaphore: Arc<Semaphore>,
}
impl PlaylistGenerator {
pub(crate) fn new() -> Self {
PlaylistGenerator {
semaphore: Arc::new(Semaphore::new(2)),
}
}
}
impl Actor for PlaylistGenerator {
type Context = Context<Self>;
}
impl Handler<GeneratePlaylistMessage> for PlaylistGenerator {
type Result = ResponseFuture<Result<()>>;
fn handle(&mut self, msg: GeneratePlaylistMessage, _ctx: &mut Self::Context) -> Self::Result {
let video_file = msg.video_path.to_str().unwrap().to_owned();
let playlist_path = msg.playlist_path.as_str().to_owned();
let semaphore = self.semaphore.clone();
let playlist_file = format!(
"{}/{}.m3u8",
playlist_path,
msg.video_path.file_name().unwrap().to_str().unwrap()
);
let tracer = global_tracer();
let mut span = tracer
.span_builder("playlistgenerator.generate_playlist")
.with_attributes(vec![
KeyValue::new("video_file", video_file.clone()),
KeyValue::new("playlist_file", playlist_file.clone()),
])
.start(&tracer);
Box::pin(async move {
let wait_start = std::time::Instant::now();
let permit = semaphore
.acquire_owned()
.await
.expect("Unable to acquire semaphore");
debug!(
"Waited for {:?} before starting ffmpeg",
wait_start.elapsed()
);
span.add_event(
"Waited for FFMPEG semaphore",
vec![KeyValue::new(
"wait_time",
wait_start.elapsed().as_secs_f64(),
)],
);
if Path::new(&playlist_file).exists() {
debug!("Playlist already exists: {}", playlist_file);
span.set_status(Status::error(format!(
"Playlist already exists: {}",
playlist_file
)));
return Err(std::io::Error::from(std::io::ErrorKind::AlreadyExists));
}
// Check if video is already h264 encoded
let is_h264 = is_h264_encoded(&video_file).await;
let use_copy = is_h264;
if use_copy {
info!("Video {} is already h264, using stream copy", video_file);
span.add_event("Using stream copy (h264 detected)", vec![]);
} else {
info!("Video {} needs transcoding to h264", video_file);
span.add_event("Transcoding to h264", vec![]);
}
tokio::spawn(async move {
let mut cmd = tokio::process::Command::new("ffmpeg");
cmd.arg("-i").arg(&video_file);
if use_copy {
// Video is already h264, just copy the stream
cmd.arg("-c:v").arg("copy");
cmd.arg("-c:a").arg("aac"); // Still need to ensure audio is compatible
} else {
// Need to transcode
cmd.arg("-c:v").arg("h264");
cmd.arg("-crf").arg("21");
cmd.arg("-preset").arg("veryfast");
cmd.arg("-vf").arg("scale=1080:-2,setsar=1:1");
cmd.arg("-c:a").arg("aac");
}
// Common HLS settings
cmd.arg("-hls_time").arg("3");
cmd.arg("-hls_list_size").arg("100");
cmd.arg(&playlist_file);
cmd.stdout(Stdio::null());
cmd.stderr(Stdio::piped());
let ffmpeg_result = cmd
.output()
.inspect_err(|e| error!("Failed to run ffmpeg on child process: {}", e))
.map_err(|e| std::io::Error::other(e.to_string()))
.await;
// Hang on to the permit until we're done decoding and then explicitly drop
drop(permit);
if let Ok(ref res) = ffmpeg_result {
debug!("ffmpeg output: {:?}", res);
}
span.set_status(Status::Ok);
ffmpeg_result
});
Ok(())
})
}
}