Files
ImageApi/src/video/actors.rs
Cameron Cordes 5476ed8ac4 video: handle unknown/short durations in thumb + preview gen
`get_duration_seconds` now returns `Option<f64>` and falls back from
`format=duration` to `stream=duration`. Empty stdout no longer
parse-panics with "cannot parse float from empty string", which was
poisoning the preview-clip row with status=failed and re-queueing every
full scan (notably for GoPro LRV files). `generate_preview_clip` handles
the unknown-duration case by transcoding the whole file (capped at 10s).

`generate_video_thumbnail` seeks to ~50% of the probed duration instead
of a hardcoded `-ss 3`, with a first-frame fallback when the probe
returns nothing. Fixes the loop where short Snapchat clips (<3s) got
"missing thumbnail" logged on every scan because ffmpeg exited 0
without writing a frame, and never wrote the .unsupported sentinel
either.

Adds unit tests for `parse_ffprobe_duration` covering the empty-output,
N/A, multi-line, non-positive, and non-finite cases.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-09 23:08:16 -04:00

911 lines
32 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use crate::database::PreviewDao;
use crate::is_video;
use crate::libraries::Library;
use crate::otel::global_tracer;
use crate::video::ffmpeg::{generate_preview_clip, get_duration_seconds_blocking};
use actix::prelude::*;
use log::{debug, error, info, trace, warn};
use opentelemetry::KeyValue;
use opentelemetry::trace::{Span, Status, Tracer};
use std::io::Result;
use std::path::{Path, PathBuf};
use std::process::{Child, Command, ExitStatus, Stdio};
use std::sync::{Arc, Mutex};
use tokio::sync::Semaphore;
use walkdir::{DirEntry, WalkDir};
// ffmpeg -i test.mp4 -c:v h264 -flags +cgop -g 30 -hls_time 3 out.m3u8
// ffmpeg -i "filename.mp4" -preset veryfast -c:v libx264 -f hls -hls_list_size 100 -hls_time 2 -crf 24 -vf scale=1080:-2,setsar=1:1 attempt/vid_out.m3u8
/// Stateless actor whose only job is to reap spawned child processes
/// (blocking `Child::wait`) off the caller's context via `ProcessMessage`.
pub struct StreamActor;
impl Actor for StreamActor {
    type Context = Context<Self>;
}
/// Ask `StreamActor` to wait on a spawned child. Field 0 is a human-readable
/// label used only for logging; field 1 is the process to reap.
pub struct ProcessMessage(pub String, pub Child);
impl Message for ProcessMessage {
    type Result = Result<ExitStatus>;
}
impl Handler<ProcessMessage> for StreamActor {
type Result = Result<ExitStatus>;
fn handle(&mut self, msg: ProcessMessage, _ctx: &mut Self::Context) -> Self::Result {
trace!("Message received");
let mut process = msg.1;
let result = process.wait();
debug!(
"Finished waiting for: {:?}. Code: {:?}",
msg.0,
result
.as_ref()
.map_or(-1, |status| status.code().unwrap_or(-1))
);
result
}
}
/// Map a source video path to its playlist path: `<playlist_dir>/<file_name>.m3u8`.
/// A path with no UTF-8 file name (e.g. `..`) maps to `unknown.m3u8`.
pub fn playlist_file_for(playlist_dir: &str, video_path: &Path) -> PathBuf {
    let name = match video_path.file_name().and_then(|n| n.to_str()) {
        Some(n) => n,
        None => "unknown",
    };
    PathBuf::from(format!("{playlist_dir}/{name}.m3u8"))
}
/// Path of the sentinel file dropped next to a would-be playlist when ffmpeg
/// cannot transcode the source (e.g. truncated mp4 with no moov atom). While
/// it exists, scans skip the video instead of re-running ffmpeg every pass;
/// delete the `.unsupported` file to force a retry.
pub fn playlist_unsupported_sentinel(playlist_file: &Path) -> PathBuf {
    // Append to the raw OsStr so non-UTF-8 playlist paths survive unchanged.
    let mut sentinel = playlist_file.as_os_str().to_os_string();
    sentinel.push(".unsupported");
    sentinel.into()
}
/// Spawn ffmpeg to build an HLS playlist for `video_path`, then poll until the
/// playlist file appears (or ~5s elapse) so the caller can start serving it.
///
/// Returns the still-running `Child` on success so the caller can reap it
/// (see `StreamActor`/`ProcessMessage`). Errors with `AlreadyExists` when the
/// playlist is already on disk, or with the spawn error when ffmpeg could not
/// be started.
pub async fn create_playlist(video_path: &str, playlist_file: &str) -> Result<Child> {
    if Path::new(playlist_file).exists() {
        debug!("Playlist already exists: {}", playlist_file);
        return Err(std::io::Error::from(std::io::ErrorKind::AlreadyExists));
    }
    let result = Command::new("ffmpeg")
        .arg("-i")
        .arg(video_path)
        .arg("-c:v")
        .arg("h264")
        .arg("-crf")
        .arg("21")
        .arg("-preset")
        .arg("veryfast")
        .arg("-hls_time")
        .arg("3")
        .arg("-hls_list_size")
        .arg("0")
        .arg("-hls_playlist_type")
        .arg("vod")
        .arg("-vf")
        .arg("scale='min(1080,iw)':-2,setsar=1:1")
        .arg(playlist_file)
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        .spawn();
    // If ffmpeg never started there is nothing to wait for — return the spawn
    // error immediately instead of polling 5s for a file that cannot appear.
    if result.is_err() {
        return result;
    }
    // Give ffmpeg a head start: the playlist file is created early (before
    // all segments finish), so poll for it with a ~5s cap.
    let start_time = std::time::Instant::now();
    loop {
        actix::clock::sleep(std::time::Duration::from_secs(1)).await;
        if Path::new(playlist_file).exists()
            || start_time.elapsed() > std::time::Duration::from_secs(5)
        {
            break;
        }
    }
    result
}
/// Write a 200px-wide JPEG thumbnail for the video at `path` into `destination`.
///
/// The duration is probed up front and the seek lands at ~50% — a more
/// representative frame than a fixed offset (skips title cards on long
/// videos, lands inside short clips) and avoids seeking past EOF. When the
/// probe fails (LRV files, fragmented MP4s, ffprobe missing) the first frame
/// is used instead: ugly but reliable.
///
/// The `-vf scale` + `-c:v mjpeg` combination mirrors
/// `generate_image_thumbnail_ffmpeg`; the filter chain is what lets ffmpeg
/// auto-insert the pix_fmt conversion the mjpeg encoder requires for
/// non-yuvj420p sources ("Non full-range YUV is non-standard" otherwise).
///
/// Errors when ffmpeg exits non-zero, or exits zero without writing a
/// non-empty file (seen with malformed sources whose probed duration lies) —
/// the caller turns that Err into an `.unsupported` sentinel so the file is
/// not re-detected on every scan.
pub fn generate_video_thumbnail(path: &Path, destination: &Path) -> std::io::Result<()> {
    let midpoint_seek =
        get_duration_seconds_blocking(path).map(|total| format!("{:.3}", total / 2.0));
    let mut ffmpeg = Command::new("ffmpeg");
    ffmpeg.arg("-y");
    // -ss before -i: fast input seeking; omitted entirely when the probe failed.
    if let Some(offset) = midpoint_seek.as_ref() {
        ffmpeg.arg("-ss").arg(offset);
    }
    ffmpeg
        .arg("-i")
        .arg(path)
        .arg("-vframes")
        .arg("1")
        .arg("-vf")
        .arg("scale=200:-1")
        .arg("-f")
        .arg("image2")
        .arg("-c:v")
        .arg("mjpeg")
        .arg(destination);
    let output = ffmpeg.output()?;
    if !output.status.success() {
        return Err(std::io::Error::other(format!(
            "ffmpeg failed ({}): {}",
            output.status,
            String::from_utf8_lossy(&output.stderr).trim()
        )));
    }
    // Exit code 0 does not guarantee a frame landed on disk; verify.
    let produced_output = std::fs::metadata(destination).is_ok_and(|m| m.len() > 0);
    if produced_output {
        Ok(())
    } else {
        Err(std::io::Error::other(
            "ffmpeg exited successfully but produced no thumbnail output",
        ))
    }
}
/// Extract a 200px-wide thumbnail via ffmpeg for formats the `image` crate
/// can't decode (RAW: NEF/ARW, HEIC/HEIF). JPEG bytes are written to
/// `destination` regardless of its extension.
pub fn generate_image_thumbnail_ffmpeg(path: &Path, destination: &Path) -> std::io::Result<()> {
    let mut cmd = Command::new("ffmpeg");
    cmd.arg("-y").arg("-i").arg(path);
    for pair in [
        ["-vframes", "1"],
        ["-vf", "scale=200:-1"],
        ["-f", "image2"],
        ["-c:v", "mjpeg"],
    ] {
        cmd.args(pair);
    }
    let output = cmd.arg(destination).output()?;
    match output.status.success() {
        true => Ok(()),
        false => Err(std::io::Error::other(format!(
            "ffmpeg failed ({}): {}",
            output.status,
            String::from_utf8_lossy(&output.stderr).trim()
        ))),
    }
}
/// Video stream metadata needed to pick HLS encode settings. Populated by
/// a single ffprobe call to avoid spawning multiple subprocesses per video.
#[derive(Debug, Default)]
struct VideoStreamMeta {
    // True when the first video stream's codec_name is exactly "h264";
    // gates the stream-copy fast path in the playlist generator.
    is_h264: bool,
    /// Rotation in degrees (0/90/180/270). Checks both the legacy `rotate`
    /// stream tag and the modern display-matrix side data.
    rotation: i32,
}
/// Probe video stream metadata in one ffprobe call. Returns default (codec
/// unknown, rotation 0) on any failure — callers fall back to transcoding.
async fn probe_video_stream_meta(video_path: &str) -> VideoStreamMeta {
    let output = tokio::process::Command::new("ffprobe")
        .arg("-v")
        .arg("error")
        .arg("-select_streams")
        .arg("v:0")
        .arg("-print_format")
        .arg("json")
        .arg("-show_entries")
        .arg("stream=codec_name:stream_tags=rotate:side_data_list")
        .arg(video_path)
        .output()
        .await;
    let Ok(output) = output else {
        warn!("Failed to run ffprobe for {}", video_path);
        return VideoStreamMeta::default();
    };
    if !output.status.success() {
        warn!(
            "ffprobe failed for {}: {}",
            video_path,
            String::from_utf8_lossy(&output.stderr).trim()
        );
        return VideoStreamMeta::default();
    }
    let Ok(json) = serde_json::from_slice::<serde_json::Value>(&output.stdout) else {
        warn!("ffprobe returned non-JSON for {}", video_path);
        return VideoStreamMeta::default();
    };
    // Only v:0 was selected above, so the first (and only) stream is ours.
    // Missing index yields JSON null, which all the .get() chains tolerate.
    let stream = &json["streams"][0];
    let is_h264 = stream
        .get("codec_name")
        .and_then(|v| v.as_str())
        .map(|s| s == "h264")
        .unwrap_or(false);
    // Prefer legacy `tags.rotate` (older containers); fall back to the
    // display-matrix side data (iPhone and other modern recorders).
    // Display-matrix rotation can be negative (e.g. -90); abs() folds it to
    // the positive form the transcode decision expects. Zero is treated as
    // "no rotation" in both branches via the filter.
    let rotation = stream
        .get("tags")
        .and_then(|t| t.get("rotate"))
        .and_then(|r| r.as_str())
        .and_then(|s| s.parse::<i32>().ok())
        .filter(|r| *r != 0)
        .or_else(|| {
            stream
                .get("side_data_list")
                .and_then(|l| l.as_array())
                .and_then(|arr| {
                    arr.iter()
                        .find_map(|sd| sd.get("rotation").and_then(|r| r.as_f64()))
                })
                .map(|f| f.abs() as i32)
                .filter(|r| *r != 0)
        })
        .unwrap_or(0);
    debug!(
        "Probed {}: codec_h264={}, rotation={}°",
        video_path, is_h264, rotation
    );
    VideoStreamMeta { is_h264, rotation }
}
/// Probe the max keyframe interval (GOP) in the first ~30s of a video.
/// Returns `None` on probe failure or if fewer than two keyframes were seen.
///
/// Used to decide between stream-copy and transcode: HLS needs segments to
/// start on keyframes, so if the source GOP exceeds `hls_time`, copying
/// produces oversized/glitchy segments and we need to re-encode.
async fn get_max_gop_seconds(video_path: &str) -> Option<f64> {
    let probe = tokio::process::Command::new("ffprobe")
        .args([
            "-v",
            "error",
            "-select_streams",
            "v:0",
            "-skip_frame",
            "nokey",
            "-show_entries",
            "frame=pts_time",
            "-of",
            "csv=p=0",
            "-read_intervals",
            "%+30",
        ])
        .arg(video_path)
        .output()
        .await
        .ok()?;
    if !probe.status.success() {
        warn!(
            "ffprobe GOP check failed for {}: {}",
            video_path,
            String::from_utf8_lossy(&probe.stderr).trim()
        );
        return None;
    }
    // One keyframe timestamp per line; unparsable lines are skipped.
    let stdout = String::from_utf8_lossy(&probe.stdout);
    let keyframe_times: Vec<f64> = stdout
        .lines()
        .filter_map(|line| line.trim().parse::<f64>().ok())
        .collect();
    if keyframe_times.len() < 2 {
        return None;
    }
    // Widest gap between consecutive keyframes.
    let mut max_gop = 0.0_f64;
    for pair in keyframe_times.windows(2) {
        max_gop = max_gop.max(pair[1] - pair[0]);
    }
    debug!(
        "Max GOP in first {} keyframes of {}: {:.2}s",
        keyframe_times.len(),
        video_path,
        max_gop
    );
    Some(max_gop)
}
/// Routes playlist-generation work: knows the output directory and forwards
/// per-file jobs to the `PlaylistGenerator` actor.
pub struct VideoPlaylistManager {
    // Directory all `.m3u8` playlists (plus sentinels and segments) live in.
    playlist_dir: PathBuf,
    // Worker actor that runs the actual ffmpeg encodes.
    playlist_generator: Addr<PlaylistGenerator>,
}
impl VideoPlaylistManager {
    /// Build a manager that writes playlists under `playlist_dir` and
    /// delegates encoding to `playlist_generator`.
    pub fn new<P: Into<PathBuf>>(
        playlist_dir: P,
        playlist_generator: Addr<PlaylistGenerator>,
    ) -> Self {
        Self {
            playlist_dir: playlist_dir.into(),
            playlist_generator,
        }
    }
}
impl Actor for VideoPlaylistManager {
    type Context = Context<Self>;
}
impl Handler<ScanDirectoryMessage> for VideoPlaylistManager {
    type Result = ResponseFuture<()>;

    /// Walk `msg.directory`, collect video files that have neither a playlist
    /// nor an `.unsupported` sentinel, then generate playlists one at a time.
    /// Each send is awaited before the next starts; actual encode concurrency
    /// is enforced inside `PlaylistGenerator` via its semaphore.
    fn handle(&mut self, msg: ScanDirectoryMessage, _ctx: &mut Self::Context) -> Self::Result {
        let tracer = global_tracer();
        let mut span = tracer.start("videoplaylistmanager.scan_directory");
        let start = std::time::Instant::now();
        info!(
            "Starting scan directory for video playlist generation: {}",
            msg.directory
        );
        let playlist_output_dir = self.playlist_dir.clone();
        // NOTE(review): to_str().unwrap() panics on a non-UTF-8 playlist dir —
        // presumably it comes from validated config; confirm upstream.
        let playlist_dir_str = playlist_output_dir.to_str().unwrap().to_string();
        // The walk runs synchronously here, before the returned future is
        // polled; only the files that still need work are moved into it.
        let video_files = WalkDir::new(&msg.directory)
            .into_iter()
            .filter_map(|e| e.ok())
            .filter(|e| e.file_type().is_file())
            .filter(is_video)
            .filter(|e| {
                let playlist = playlist_file_for(&playlist_dir_str, e.path());
                !playlist.exists() && !playlist_unsupported_sentinel(&playlist).exists()
            })
            .collect::<Vec<DirEntry>>();
        let scan_dir_name = msg.directory.clone();
        let playlist_generator = self.playlist_generator.clone();
        Box::pin(async move {
            for e in video_files {
                let path = e.path();
                // NOTE(review): another non-UTF-8 panic point; same assumption
                // as the playlist dir above.
                let path_as_str = path.to_str().unwrap();
                debug!(
                    "Sending generate playlist message for path: {}",
                    path_as_str
                );
                match playlist_generator
                    .send(GeneratePlaylistMessage {
                        playlist_path: playlist_output_dir.to_str().unwrap().to_string(),
                        video_path: PathBuf::from(path),
                    })
                    .await
                    .expect("Failed to send generate playlist message")
                {
                    Ok(_) => {
                        span.add_event(
                            "Playlist generated",
                            vec![KeyValue::new("video_path", path_as_str.to_string())],
                        );
                        debug!(
                            "Successfully generated playlist for file: '{}'",
                            path_as_str
                        );
                    }
                    // AlreadyExists is the generator's "someone finished it
                    // first" signal — expected, not a failure.
                    Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
                        debug!("Playlist already exists for '{:?}', skipping", path);
                    }
                    Err(e) => {
                        warn!("Failed to generate playlist for path '{:?}'. {:?}", path, e);
                    }
                }
            }
            span.add_event(
                "Finished directory scan",
                vec![KeyValue::new("directory", scan_dir_name.to_string())],
            );
            info!(
                "Finished directory scan of '{}' in {:?}",
                scan_dir_name,
                start.elapsed()
            );
        })
    }
}
impl Handler<QueueVideosMessage> for VideoPlaylistManager {
    type Result = ();

    /// Fire-and-forget enqueue of HLS generation for an explicit list of
    /// videos, skipping any that already have a playlist or an
    /// `.unsupported` sentinel on disk.
    fn handle(&mut self, msg: QueueVideosMessage, _ctx: &mut Self::Context) -> Self::Result {
        if msg.video_paths.is_empty() {
            return;
        }
        info!(
            "Queueing {} videos for HLS playlist generation",
            msg.video_paths.len()
        );
        let playlist_dir_str = self.playlist_dir.to_str().unwrap().to_string();
        let generator = self.playlist_generator.clone();
        for video_path in msg.video_paths {
            let playlist = playlist_file_for(&playlist_dir_str, &video_path);
            let already_handled =
                playlist.exists() || playlist_unsupported_sentinel(&playlist).exists();
            if already_handled {
                continue;
            }
            debug!(
                "Queueing playlist generation for: {}",
                video_path.to_string_lossy()
            );
            generator.do_send(GeneratePlaylistMessage {
                playlist_path: playlist_dir_str.clone(),
                video_path,
            });
        }
    }
}
/// Ask `VideoPlaylistManager` to walk a directory tree for videos that still
/// need HLS playlists.
#[derive(Message)]
#[rtype(result = "()")]
pub struct ScanDirectoryMessage {
    pub(crate) directory: String,
}
/// Ask `VideoPlaylistManager` to queue an explicit list of video paths,
/// bypassing the directory walk.
#[derive(Message)]
#[rtype(result = "()")]
pub struct QueueVideosMessage {
    pub video_paths: Vec<PathBuf>,
}
/// Ask `PlaylistGenerator` to produce `<playlist_path>/<file_name>.m3u8`
/// for `video_path`.
#[derive(Message)]
#[rtype(result = "Result<()>")]
pub struct GeneratePlaylistMessage {
    pub video_path: PathBuf,
    pub playlist_path: String,
}
/// Worker actor that runs ffmpeg HLS encodes, bounded by a semaphore.
pub struct PlaylistGenerator {
    // Caps how many ffmpeg encodes run at once.
    semaphore: Arc<Semaphore>,
}
impl PlaylistGenerator {
    /// Build the generator. Concurrency is tunable via the HLS_CONCURRENCY
    /// env var (positive integer) so operators can match their hardware —
    /// 1 on weak Synology boxes to avoid thermal throttling, higher on
    /// desktops with spare cores. Defaults to 2 when unset or invalid.
    pub(crate) fn new() -> Self {
        let concurrency = match std::env::var("HLS_CONCURRENCY") {
            Ok(raw) => raw.parse::<usize>().ok().filter(|&n| n > 0).unwrap_or(2),
            Err(_) => 2,
        };
        info!("PlaylistGenerator: concurrency={}", concurrency);
        Self {
            semaphore: Arc::new(Semaphore::new(concurrency)),
        }
    }
}
impl Actor for PlaylistGenerator {
    type Context = Context<Self>;
}
impl Handler<GeneratePlaylistMessage> for PlaylistGenerator {
    type Result = ResponseFuture<Result<()>>;

    /// Produce `<playlist_path>/<file_name>.m3u8` for the video. Chooses
    /// between stream-copy and transcode from one ffprobe metadata call plus
    /// a GOP probe; encodes to a `.tmp` playlist that is renamed into place
    /// on success. On failure, partial output is cleaned up and an
    /// `.unsupported` sentinel is written so future scans skip the file.
    fn handle(&mut self, msg: GeneratePlaylistMessage, _ctx: &mut Self::Context) -> Self::Result {
        // NOTE(review): to_str()/file_name() unwraps panic on non-UTF-8 or
        // rootless paths — presumably guaranteed by the scanner; confirm.
        let video_file = msg.video_path.to_str().unwrap().to_owned();
        let playlist_path = msg.playlist_path.as_str().to_owned();
        let semaphore = self.semaphore.clone();
        let playlist_file = format!(
            "{}/{}.m3u8",
            playlist_path,
            msg.video_path.file_name().unwrap().to_str().unwrap()
        );
        let tracer = global_tracer();
        let mut span = tracer
            .span_builder("playlistgenerator.generate_playlist")
            .with_attributes(vec![
                KeyValue::new("video_file", video_file.clone()),
                KeyValue::new("playlist_file", playlist_file.clone()),
            ])
            .start(&tracer);
        Box::pin(async move {
            let wait_start = std::time::Instant::now();
            let permit = semaphore
                .acquire_owned()
                .await
                .expect("Unable to acquire semaphore");
            debug!(
                "Waited for {:?} before starting ffmpeg",
                wait_start.elapsed()
            );
            span.add_event(
                "Waited for FFMPEG semaphore",
                vec![KeyValue::new(
                    "wait_time",
                    wait_start.elapsed().as_secs_f64(),
                )],
            );
            // Re-check existence after acquiring the permit: another job may
            // have finished this playlist while we were queued.
            if Path::new(&playlist_file).exists() {
                debug!("Playlist already exists: {}", playlist_file);
                span.set_status(Status::error(format!(
                    "Playlist already exists: {}",
                    playlist_file
                )));
                return Err(std::io::Error::from(std::io::ErrorKind::AlreadyExists));
            }
            // One ffprobe call for codec + rotation metadata.
            let stream_meta = probe_video_stream_meta(&video_file).await;
            let is_h264 = stream_meta.is_h264;
            let rotation = stream_meta.rotation;
            let has_rotation = rotation != 0;
            // Stream-copy is only safe when the source GOP fits inside a
            // single HLS segment. Otherwise ffmpeg has to extend segments
            // past hls_time to land on a keyframe, producing uneven
            // segments and seeking glitches.
            const HLS_SEGMENT_SECONDS: f64 = 3.0;
            // GOP probe is only worth running when copy is otherwise viable.
            let gop_ok = if is_h264 && !has_rotation {
                match get_max_gop_seconds(&video_file).await {
                    Some(g) if g > HLS_SEGMENT_SECONDS => {
                        info!(
                            "Video {} has long GOP ({:.1}s > {}s), transcoding for segment alignment",
                            video_file, g, HLS_SEGMENT_SECONDS
                        );
                        false
                    }
                    Some(_) => true,
                    None => {
                        // Probe failed — be conservative and transcode rather
                        // than risk broken segments from a mystery source.
                        debug!(
                            "GOP probe failed for {}, transcoding to be safe",
                            video_file
                        );
                        false
                    }
                }
            } else {
                false
            };
            let use_copy = is_h264 && !has_rotation && gop_ok;
            if has_rotation {
                info!(
                    "Video {} has rotation metadata ({}°), transcoding to apply rotation",
                    video_file, rotation
                );
                span.add_event(
                    "Transcoding due to rotation",
                    vec![KeyValue::new("rotation_degrees", rotation as i64)],
                );
            } else if use_copy {
                info!("Video {} is already h264, using stream copy", video_file);
                span.add_event("Using stream copy (h264 detected)", vec![]);
            } else if is_h264 {
                info!(
                    "Video {} is h264 but needs transcoding for GOP alignment",
                    video_file
                );
                span.add_event("Transcoding for GOP alignment", vec![]);
            } else {
                info!("Video {} needs transcoding to h264", video_file);
                span.add_event("Transcoding to h264", vec![]);
            }
            // Encode to a .tmp playlist and explicit segment names so a failed
            // encode leaves predictable artifacts we can clean up — and so a
            // concurrent scan doesn't see a half-written .m3u8 as "done".
            let playlist_tmp = format!("{}.tmp", playlist_file);
            let video_stem = msg
                .video_path
                .file_name()
                .and_then(|n| n.to_str())
                .unwrap_or("video");
            let segment_pattern = format!("{}/{}_%03d.ts", playlist_path, video_stem);
            let mut cmd = tokio::process::Command::new("ffmpeg");
            cmd.arg("-y").arg("-i").arg(&video_file);
            if use_copy {
                // Copy video bits untouched; audio is still normalized to AAC.
                cmd.arg("-c:v").arg("copy");
                cmd.arg("-c:a").arg("aac");
            } else {
                let nvenc = crate::video::ffmpeg::is_nvenc_available().await;
                if nvenc {
                    // NVENC: no CRF, use VBR + target CQ. p1 = fastest
                    // preset — prioritizes encoder throughput over bitrate
                    // efficiency. CQ 23 roughly matches libx264 crf 21
                    // visually; NVENC has slightly lower compression
                    // efficiency per quality.
                    cmd.arg("-c:v").arg("h264_nvenc");
                    cmd.arg("-preset").arg("p1");
                    cmd.arg("-rc").arg("vbr");
                    cmd.arg("-cq").arg("23");
                    cmd.arg("-pix_fmt").arg("yuv420p");
                } else {
                    cmd.arg("-c:v").arg("h264");
                    cmd.arg("-crf").arg("21");
                    cmd.arg("-preset").arg("veryfast");
                }
                cmd.arg("-vf").arg("scale='min(1080,iw)':-2,setsar=1:1");
                cmd.arg("-c:a").arg("aac");
                // Force an IDR frame every hls_time seconds so each HLS
                // segment starts on a keyframe — accurate seeking without
                // players having to decode from a prior segment.
                cmd.arg("-force_key_frames").arg("expr:gte(t,n_forced*3)");
            }
            // -f hls is required because the playlist is written to a .tmp
            // path during encoding — ffmpeg normally infers the muxer from
            // the output extension and doesn't recognize ".m3u8.tmp".
            cmd.arg("-f").arg("hls");
            cmd.arg("-hls_time").arg("3");
            cmd.arg("-hls_list_size").arg("0");
            cmd.arg("-hls_playlist_type").arg("vod");
            // independent_segments advertises that each segment can be
            // decoded without reference to any other — the matching guarantee
            // for the forced keyframes above.
            cmd.arg("-hls_flags").arg("independent_segments");
            cmd.arg("-hls_segment_filename").arg(&segment_pattern);
            cmd.arg(&playlist_tmp);
            cmd.stdout(Stdio::null());
            cmd.stderr(Stdio::piped());
            // kill_on_drop: if the timeout below fires, dropping the child's
            // future kills the ffmpeg process instead of leaking it.
            cmd.kill_on_drop(true);
            // Spawn + wait under a timeout so a hung ffmpeg (corrupt source,
            // NFS stall, etc.) doesn't permanently hold a semaphore slot.
            // Default is generous — a long 4K transcode on CPU can take hours.
            let timeout_secs = std::env::var("HLS_TIMEOUT_SECONDS")
                .ok()
                .and_then(|v| v.parse::<u64>().ok())
                .unwrap_or(7200);
            let ffmpeg_result = match cmd.spawn() {
                Ok(child) => {
                    match tokio::time::timeout(
                        std::time::Duration::from_secs(timeout_secs),
                        child.wait_with_output(),
                    )
                    .await
                    {
                        Ok(res) => res
                            .inspect_err(|e| {
                                error!("Failed to wait on ffmpeg child process: {}", e)
                            })
                            .map_err(|e| std::io::Error::other(e.to_string())),
                        Err(_) => Err(std::io::Error::other(format!(
                            "ffmpeg exceeded {}s timeout",
                            timeout_secs
                        ))),
                    }
                }
                Err(e) => {
                    error!("Failed to spawn ffmpeg: {}", e);
                    Err(std::io::Error::other(e.to_string()))
                }
            };
            // Release the encode slot before doing rename/cleanup I/O.
            drop(permit);
            let success = matches!(&ffmpeg_result, Ok(out) if out.status.success());
            if success {
                if let Err(e) = tokio::fs::rename(&playlist_tmp, &playlist_file).await {
                    error!(
                        "ffmpeg succeeded but rename {} -> {} failed: {}",
                        playlist_tmp, playlist_file, e
                    );
                    cleanup_partial_hls(&playlist_tmp, playlist_path.as_str(), video_stem).await;
                    span.set_status(Status::error(format!("rename failed: {}", e)));
                    return Err(e);
                }
                debug!("Playlist complete: {}", playlist_file);
                span.set_status(Status::Ok);
                Ok(())
            } else {
                let detail = match &ffmpeg_result {
                    Ok(out) => format!(
                        "exit {}: {}",
                        out.status,
                        String::from_utf8_lossy(&out.stderr).trim()
                    ),
                    Err(e) => format!("ffmpeg failed: {}", e),
                };
                error!("ffmpeg failed for {}: {}", video_file, detail);
                cleanup_partial_hls(&playlist_tmp, playlist_path.as_str(), video_stem).await;
                // Sentinel keeps future scans from re-running ffmpeg on a
                // source we already know fails; delete it to force a retry.
                let sentinel = playlist_unsupported_sentinel(Path::new(&playlist_file));
                if let Err(se) = tokio::fs::write(&sentinel, b"").await {
                    warn!(
                        "Failed to write playlist sentinel {}: {}",
                        sentinel.display(),
                        se
                    );
                } else {
                    info!(
                        "Wrote playlist sentinel {} so future scans skip {}",
                        sentinel.display(),
                        video_file
                    );
                }
                span.set_status(Status::error(detail.clone()));
                Err(std::io::Error::other(detail))
            }
        })
    }
}
/// Delete the temp playlist and any segment files that ffmpeg may have written
/// before failing. Called both on ffmpeg error and on rename failure so a
/// retry on the next scan starts from a clean slate.
async fn cleanup_partial_hls(playlist_tmp: &str, playlist_dir: &str, video_stem: &str) {
let _ = tokio::fs::remove_file(playlist_tmp).await;
let segment_prefix = format!("{}_", video_stem);
let Ok(mut entries) = tokio::fs::read_dir(playlist_dir).await else {
return;
};
while let Ok(Some(entry)) = entries.next_entry().await {
let Some(name) = entry.file_name().to_str().map(str::to_owned) else {
continue;
};
if name.starts_with(&segment_prefix)
&& name.ends_with(".ts")
&& let Err(e) = tokio::fs::remove_file(entry.path()).await
{
warn!("Failed to remove partial segment {}: {}", name, e);
}
}
}
/// Ask `PreviewClipGenerator` to build a preview mp4 for one video.
#[derive(Message)]
#[rtype(result = "()")]
pub struct GeneratePreviewClipMessage {
    pub video_path: String,
}
/// Actor that transcodes preview clips and records progress through the
/// preview DAO.
pub struct PreviewClipGenerator {
    // Caps concurrent preview transcodes (set to 2 in `new`).
    semaphore: Arc<Semaphore>,
    // Root directory preview clips are written under, mirroring the
    // library-relative path of the source video.
    preview_clips_dir: String,
    // Library roots used to relativize incoming absolute video paths.
    libraries: Vec<Library>,
    // Status persistence; locked briefly around each update call.
    preview_dao: Arc<Mutex<Box<dyn PreviewDao>>>,
}
impl PreviewClipGenerator {
    /// Build a generator writing clips under `preview_clips_dir`, with a
    /// fixed concurrency cap of two transcodes at a time.
    pub fn new(
        preview_clips_dir: String,
        libraries: Vec<Library>,
        preview_dao: Arc<Mutex<Box<dyn PreviewDao>>>,
    ) -> Self {
        Self {
            semaphore: Arc::new(Semaphore::new(2)),
            preview_clips_dir,
            libraries,
            preview_dao,
        }
    }

    /// Strip whichever library root contains `video_path` so the remainder
    /// can be joined under the preview-clips directory. When no library root
    /// matches, the input path itself is normalized instead (leading
    /// separators trimmed, backslashes flattened to `/`), so we never emit
    /// the absolute input path as the output path — which ffmpeg rejects as
    /// "cannot edit existing files in place".
    fn relativize(&self, video_path: &str) -> String {
        let stripped = self
            .libraries
            .iter()
            .find_map(|lib| video_path.strip_prefix(&lib.root_path));
        stripped
            .unwrap_or(video_path)
            .trim_start_matches(['/', '\\'])
            .replace('\\', "/")
    }
}
impl Actor for PreviewClipGenerator {
    type Context = Context<Self>;
}
impl Handler<GeneratePreviewClipMessage> for PreviewClipGenerator {
    type Result = ResponseFuture<()>;

    /// Generate a preview clip for one video: mark its row "processing",
    /// then run the transcode in a detached task that records "complete"
    /// (with duration and size) or "failed" (with the error message).
    fn handle(
        &mut self,
        msg: GeneratePreviewClipMessage,
        _ctx: &mut Self::Context,
    ) -> Self::Result {
        let semaphore = self.semaphore.clone();
        let preview_clips_dir = self.preview_clips_dir.clone();
        let preview_dao = self.preview_dao.clone();
        let video_path = msg.video_path;
        // Resolve against whichever library actually owns this video.
        let relative_path = self.relativize(&video_path);
        Box::pin(async move {
            let permit = semaphore
                .acquire_owned()
                .await
                .expect("Unable to acquire preview semaphore");
            // Update status to processing. The std Mutex guard lives only in
            // this block, so it is released before any later await point.
            {
                let otel_ctx = opentelemetry::Context::current();
                let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao");
                let _ =
                    dao.update_status(&otel_ctx, &relative_path, "processing", None, None, None);
            }
            // Compute output path: join preview_clips_dir with relative path, change ext to .mp4
            let output_path = PathBuf::from(&preview_clips_dir)
                .join(&relative_path)
                .with_extension("mp4");
            let output_str = output_path.to_string_lossy().to_string();
            let video_path_owned = video_path.clone();
            let relative_path_owned = relative_path.clone();
            // The transcode runs in a detached task; the semaphore permit is
            // moved into it and dropped when the transcode finishes, so the
            // concurrency cap covers the ffmpeg work, not just this handler.
            tokio::spawn(async move {
                match generate_preview_clip(&video_path_owned, &output_str).await {
                    Ok((duration, size)) => {
                        info!(
                            "Preview clip complete for '{}' ({:.1}s, {} bytes)",
                            relative_path_owned, duration, size
                        );
                        let otel_ctx = opentelemetry::Context::current();
                        let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao");
                        let _ = dao.update_status(
                            &otel_ctx,
                            &relative_path_owned,
                            "complete",
                            Some(duration as f32),
                            Some(size as i32),
                            None,
                        );
                    }
                    Err(e) => {
                        error!(
                            "Failed to generate preview clip for '{}': {}",
                            relative_path_owned, e
                        );
                        let otel_ctx = opentelemetry::Context::current();
                        let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao");
                        let _ = dao.update_status(
                            &otel_ctx,
                            &relative_path_owned,
                            "failed",
                            None,
                            None,
                            Some(&e.to_string()),
                        );
                    }
                }
                drop(permit);
            });
        })
    }
}