Scan and generate Video HLS playlists on startup

Refactored and improved video path state. Bumped versions of some dependencies.
This commit is contained in:
Cameron
2024-12-05 20:19:03 -05:00
parent 2b2a811cae
commit 0419aa2323
6 changed files with 593 additions and 62 deletions

View File

@@ -1,10 +1,13 @@
use std::io::Result;
use std::path::Path;
use std::process::{Child, Command, ExitStatus, Stdio};
use crate::is_video;
use actix::prelude::*;
use log::{debug, trace};
use futures::TryFutureExt;
use log::{debug, info, trace, warn};
use std::io::Result;
use std::path::{Path, PathBuf};
use std::process::{Child, Command, ExitStatus, Stdio};
use std::sync::Arc;
use tokio::sync::Semaphore;
use walkdir::{DirEntry, WalkDir};
// ffmpeg -i test.mp4 -c:v h264 -flags +cgop -g 30 -hls_time 3 out.m3u8
// ffmpeg -i "filename.mp4" -preset veryfast -c:v libx264 -f hls -hls_list_size 100 -hls_time 2 -crf 24 -vf scale=1080:-2,setsar=1:1 attempt/vid_out.m3u8
@@ -93,3 +96,215 @@ pub fn generate_video_thumbnail(path: &Path, destination: &Path) {
.output()
.expect("Failure to create video frame");
}
pub struct VideoPlaylistManager {
playlist_dir: PathBuf,
playlist_generator: Addr<PlaylistGenerator>,
}
impl VideoPlaylistManager {
pub fn new<P: Into<PathBuf>>(
playlist_dir: P,
playlist_generator: Addr<PlaylistGenerator>,
) -> Self {
Self {
playlist_dir: playlist_dir.into(),
playlist_generator,
}
}
}
impl Actor for VideoPlaylistManager {
type Context = Context<Self>;
}
impl Handler<ScanDirectoryMessage> for VideoPlaylistManager {
type Result = ResponseFuture<()>;
fn handle(&mut self, msg: ScanDirectoryMessage, _ctx: &mut Self::Context) -> Self::Result {
let start = std::time::Instant::now();
info!(
"Starting scan directory for video playlist generation: {}",
msg.directory
);
let video_files = WalkDir::new(&msg.directory)
.into_iter()
.filter_map(|e| e.ok())
.filter(|e| e.file_type().is_file())
.filter(|e| is_video(e))
.collect::<Vec<DirEntry>>();
let scan_dir_name = msg.directory.clone();
let playlist_output_dir = self.playlist_dir.clone();
let playlist_generator = self.playlist_generator.clone();
Box::pin(async move {
for e in video_files {
let path = e.path();
let path_as_str = path.to_str().unwrap();
debug!(
"Sending generate playlist message for path: {}",
path_as_str
);
match playlist_generator
.send(GeneratePlaylistMessage {
playlist_path: playlist_output_dir.to_str().unwrap().to_string(),
video_path: PathBuf::from(path),
})
.await
.expect("Failed to send generate playlist message")
{
Ok(_) => {}
Err(e) => {
warn!("Failed to generate playlist for path '{:?}'. {:?}", path, e);
}
}
// .expect("Failed to generate video playlist");
}
info!(
"Finished directory scan of '{}' in {:?}",
scan_dir_name,
start.elapsed()
);
})
}
}
#[derive(Message)]
#[rtype(result = "()")]
pub struct ScanDirectoryMessage {
pub(crate) directory: String,
}
#[derive(Message)]
#[rtype(result = "Result<String>")]
struct GeneratePlaylistMessage {
video_path: PathBuf,
playlist_path: String,
}
pub struct PlaylistGenerator {
semaphore: Arc<Semaphore>,
}
impl PlaylistGenerator {
pub(crate) fn new() -> Self {
PlaylistGenerator {
semaphore: Arc::new(Semaphore::new(2)),
}
}
}
impl Actor for PlaylistGenerator {
type Context = Context<Self>;
}
impl Handler<GeneratePlaylistMessage> for PlaylistGenerator {
type Result = ResponseFuture<Result<String>>;
fn handle(&mut self, msg: GeneratePlaylistMessage, _ctx: &mut Self::Context) -> Self::Result {
let video_file = msg.video_path.to_str().unwrap().to_owned();
let playlist_path = msg.playlist_path.as_str().to_owned();
let semaphore = self.semaphore.clone();
let playlist_file = format!(
"{}/{}.m3u8",
playlist_path,
msg.video_path.file_name().unwrap().to_str().unwrap()
);
Box::pin(async move {
let wait_start = std::time::Instant::now();
let permit = semaphore
.acquire_owned()
.await
.expect("Unable to acquire semaphore");
debug!(
"Waited for {:?} before starting ffmpeg",
wait_start.elapsed()
);
if Path::new(&playlist_file).exists() {
debug!("Playlist already exists: {}", playlist_file);
return Err(std::io::Error::from(std::io::ErrorKind::AlreadyExists));
}
tokio::spawn(async move {
let ffmpeg_result = tokio::process::Command::new("ffmpeg")
.arg("-i")
.arg(&video_file)
.arg("-c:v")
.arg("h264")
.arg("-crf")
.arg("21")
.arg("-preset")
.arg("veryfast")
.arg("-hls_time")
.arg("3")
.arg("-hls_list_size")
.arg("100")
.arg("-vf")
.arg("scale=1080:-2,setsar=1:1")
.arg(playlist_file)
.stdout(Stdio::null())
.stderr(Stdio::piped())
.status()
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))
.await;
// Hang on to the permit until we're done decoding and then explicitly drop
drop(permit);
ffmpeg_result
});
Ok("meeee".to_string())
// .spawn()
// .expect("Failed to spawn ffmpeg process");
// .expect("Failed to spawn child process")
// .wait()
// .await
// .inspect_err(|e| error!("Failed to wait on child process: {}", e));
/* .map(|exit_status| {
debug!(
"Finished waiting for playlist generate process for file '{}' with code: {}",
video_file,
exit_status
);
exit_status.to_string()
})
*/
/* if let Some(stderr) = ffmpeg.stderr {
tokio::spawn(async move {
let mut reader = BufReader::new(stderr).lines();
while let Ok(Some(line)) = reader.next_line().await {
println!("ffmpeg line: {:?}", line);
}
});
}*/
/* ffmpeg.wait().await.map(|exit_status| {
debug!(
"Finished waiting for playlist generate process for file '{}' with code: {}",
video_file, exit_status
);
exit_status.to_string()
})
*/
// ffmpeg
// .wait_with_output()
// .await
// .expect("TODO: panic message")
//
// Ok(video_file)
})
}
}