use crate::database::PreviewDao; use crate::libraries::Library; use crate::otel::global_tracer; use crate::thumbnails::is_video; use crate::video::ffmpeg::{generate_preview_clip, get_duration_seconds_blocking}; use actix::prelude::*; use log::{debug, error, info, trace, warn}; use opentelemetry::KeyValue; use opentelemetry::trace::{Span, Status, Tracer}; use std::io::Result; use std::path::{Path, PathBuf}; use std::process::{Child, Command, ExitStatus, Stdio}; use std::sync::{Arc, Mutex}; use tokio::sync::Semaphore; use walkdir::{DirEntry, WalkDir}; // ffmpeg -i test.mp4 -c:v h264 -flags +cgop -g 30 -hls_time 3 out.m3u8 // ffmpeg -i "filename.mp4" -preset veryfast -c:v libx264 -f hls -hls_list_size 100 -hls_time 2 -crf 24 -vf scale=1080:-2,setsar=1:1 attempt/vid_out.m3u8 pub struct StreamActor; impl Actor for StreamActor { type Context = Context; } pub struct ProcessMessage(pub String, pub Child); impl Message for ProcessMessage { type Result = Result; } impl Handler for StreamActor { type Result = Result; fn handle(&mut self, msg: ProcessMessage, _ctx: &mut Self::Context) -> Self::Result { trace!("Message received"); let mut process = msg.1; let result = process.wait(); debug!( "Finished waiting for: {:?}. Code: {:?}", msg.0, result .as_ref() .map_or(-1, |status| status.code().unwrap_or(-1)) ); result } } pub fn playlist_file_for(playlist_dir: &str, video_path: &Path) -> PathBuf { let filename = video_path .file_name() .and_then(|n| n.to_str()) .unwrap_or("unknown"); PathBuf::from(format!("{}/{}.m3u8", playlist_dir, filename)) } /// Sentinel path written next to a would-be playlist when ffmpeg cannot /// transcode the source (e.g. truncated mp4 with no moov atom). Its presence /// causes future scans to skip the file instead of re-running ffmpeg every /// pass. Delete the `.unsupported` file to force a retry. pub fn playlist_unsupported_sentinel(playlist_file: &Path) -> PathBuf { let mut s = playlist_file.as_os_str().to_owned(); s.push(".unsupported"); PathBuf::from(s) } pub async fn create_playlist(video_path: &str, playlist_file: &str) -> Result { if Path::new(playlist_file).exists() { debug!("Playlist already exists: {}", playlist_file); return Err(std::io::Error::from(std::io::ErrorKind::AlreadyExists)); } let result = Command::new("ffmpeg") .arg("-i") .arg(video_path) .arg("-c:v") .arg("h264") .arg("-crf") .arg("21") .arg("-preset") .arg("veryfast") .arg("-hls_time") .arg("3") .arg("-hls_list_size") .arg("0") .arg("-hls_playlist_type") .arg("vod") .arg("-vf") .arg("scale='min(1080,iw)':-2,setsar=1:1") .arg(playlist_file) .stdout(Stdio::null()) .stderr(Stdio::null()) .spawn(); let start_time = std::time::Instant::now(); loop { actix::clock::sleep(std::time::Duration::from_secs(1)).await; if Path::new(playlist_file).exists() || std::time::Instant::now() - start_time > std::time::Duration::from_secs(5) { break; } } result } pub fn generate_video_thumbnail(path: &Path, destination: &Path) -> std::io::Result<()> { // Probe duration up front and seek to ~50% — gives a more // representative frame than a fixed offset (skipping title cards on // long videos, landing inside the clip on 1–2s Snapchat MP4s) and // sidesteps the seek-past-EOF class of bug entirely. When duration // probing fails (LRV files, fragmented MP4s, ffprobe missing) fall // back to the first frame: ugly but reliable. // // -vf scale + -c:v mjpeg mirrors `generate_image_thumbnail_ffmpeg`. The // filter chain matters as much as the scale does: without it, ffmpeg // hands the decoded frame straight to the mjpeg encoder, which rejects // any non-yuvj420p source ("Non full-range YUV is non-standard"). The // filter chain lets ffmpeg auto-insert the pix_fmt converter the // encoder needs, which is how the image-thumbnail path already handles // the same class of source. let seek = get_duration_seconds_blocking(path).map(|d| format!("{:.3}", d / 2.0)); let mut cmd = Command::new("ffmpeg"); cmd.arg("-y"); if let Some(s) = &seek { cmd.arg("-ss").arg(s); } let output = cmd .arg("-i") .arg(path) .arg("-vframes") .arg("1") .arg("-vf") .arg("scale=200:-1") .arg("-f") .arg("image2") .arg("-c:v") .arg("mjpeg") .arg(destination) .output()?; if !output.status.success() { return Err(std::io::Error::other(format!( "ffmpeg failed ({}): {}", output.status, String::from_utf8_lossy(&output.stderr).trim() ))); } // ffmpeg can exit 0 without writing a frame for malformed files where // the probe duration lies. Confirm a non-empty file actually landed — // returning Err makes the caller write the `.unsupported` sentinel so // we stop re-detecting on every scan. let wrote = std::fs::metadata(destination) .map(|m| m.len() > 0) .unwrap_or(false); if !wrote { return Err(std::io::Error::other( "ffmpeg exited successfully but produced no thumbnail output", )); } Ok(()) } /// Use ffmpeg to extract a 200px-wide thumbnail from formats the `image` crate /// can't decode (RAW: NEF/ARW, HEIC/HEIF). Writes JPEG bytes to `destination` /// regardless of its extension. pub fn generate_image_thumbnail_ffmpeg(path: &Path, destination: &Path) -> std::io::Result<()> { let output = Command::new("ffmpeg") .arg("-y") .arg("-i") .arg(path) .arg("-vframes") .arg("1") .arg("-vf") .arg("scale=200:-1") .arg("-f") .arg("image2") .arg("-c:v") .arg("mjpeg") .arg(destination) .output()?; if !output.status.success() { return Err(std::io::Error::other(format!( "ffmpeg failed ({}): {}", output.status, String::from_utf8_lossy(&output.stderr).trim() ))); } Ok(()) } /// Video stream metadata needed to pick HLS encode settings. Populated by /// a single ffprobe call to avoid spawning multiple subprocesses per video. #[derive(Debug, Default)] struct VideoStreamMeta { is_h264: bool, /// Rotation in degrees (0/90/180/270). Checks both the legacy `rotate` /// stream tag and the modern display-matrix side data. rotation: i32, } /// Probe video stream metadata in one ffprobe call. Returns default (codec /// unknown, rotation 0) on any failure — callers fall back to transcoding. async fn probe_video_stream_meta(video_path: &str) -> VideoStreamMeta { let output = tokio::process::Command::new("ffprobe") .arg("-v") .arg("error") .arg("-select_streams") .arg("v:0") .arg("-print_format") .arg("json") .arg("-show_entries") .arg("stream=codec_name:stream_tags=rotate:side_data_list") .arg(video_path) .output() .await; let Ok(output) = output else { warn!("Failed to run ffprobe for {}", video_path); return VideoStreamMeta::default(); }; if !output.status.success() { warn!( "ffprobe failed for {}: {}", video_path, String::from_utf8_lossy(&output.stderr).trim() ); return VideoStreamMeta::default(); } let Ok(json) = serde_json::from_slice::(&output.stdout) else { warn!("ffprobe returned non-JSON for {}", video_path); return VideoStreamMeta::default(); }; let stream = &json["streams"][0]; let is_h264 = stream .get("codec_name") .and_then(|v| v.as_str()) .map(|s| s == "h264") .unwrap_or(false); // Prefer legacy `tags.rotate` (older containers); fall back to the // display-matrix side data (iPhone and other modern recorders). let rotation = stream .get("tags") .and_then(|t| t.get("rotate")) .and_then(|r| r.as_str()) .and_then(|s| s.parse::().ok()) .filter(|r| *r != 0) .or_else(|| { stream .get("side_data_list") .and_then(|l| l.as_array()) .and_then(|arr| { arr.iter() .find_map(|sd| sd.get("rotation").and_then(|r| r.as_f64())) }) .map(|f| f.abs() as i32) .filter(|r| *r != 0) }) .unwrap_or(0); debug!( "Probed {}: codec_h264={}, rotation={}°", video_path, is_h264, rotation ); VideoStreamMeta { is_h264, rotation } } /// Probe the max keyframe interval (GOP) in the first ~30s of a video. /// Returns `None` on probe failure or if we couldn't see at least two keyframes. /// /// Used to decide between stream-copy and transcode: HLS needs segments to /// start on keyframes, so if the source GOP exceeds `hls_time`, copying /// produces oversized/glitchy segments and we need to re-encode. async fn get_max_gop_seconds(video_path: &str) -> Option { let output = tokio::process::Command::new("ffprobe") .arg("-v") .arg("error") .arg("-select_streams") .arg("v:0") .arg("-skip_frame") .arg("nokey") .arg("-show_entries") .arg("frame=pts_time") .arg("-of") .arg("csv=p=0") .arg("-read_intervals") .arg("%+30") .arg(video_path) .output() .await .ok()?; if !output.status.success() { warn!( "ffprobe GOP check failed for {}: {}", video_path, String::from_utf8_lossy(&output.stderr).trim() ); return None; } let times: Vec = String::from_utf8_lossy(&output.stdout) .lines() .filter_map(|l| l.trim().parse::().ok()) .collect(); if times.len() < 2 { return None; } let max_gop = times .windows(2) .map(|w| w[1] - w[0]) .fold(0.0_f64, f64::max); debug!( "Max GOP in first {} keyframes of {}: {:.2}s", times.len(), video_path, max_gop ); Some(max_gop) } pub struct VideoPlaylistManager { playlist_dir: PathBuf, playlist_generator: Addr, } impl VideoPlaylistManager { pub fn new>( playlist_dir: P, playlist_generator: Addr, ) -> Self { Self { playlist_dir: playlist_dir.into(), playlist_generator, } } } impl Actor for VideoPlaylistManager { type Context = Context; } impl Handler for VideoPlaylistManager { type Result = ResponseFuture<()>; fn handle(&mut self, msg: ScanDirectoryMessage, _ctx: &mut Self::Context) -> Self::Result { let tracer = global_tracer(); let mut span = tracer.start("videoplaylistmanager.scan_directory"); let start = std::time::Instant::now(); info!( "Starting scan directory for video playlist generation: {}", msg.directory ); let playlist_output_dir = self.playlist_dir.clone(); let playlist_dir_str = playlist_output_dir.to_str().unwrap().to_string(); let video_files = WalkDir::new(&msg.directory) .into_iter() .filter_map(|e| e.ok()) .filter(|e| e.file_type().is_file()) .filter(is_video) .filter(|e| { let playlist = playlist_file_for(&playlist_dir_str, e.path()); !playlist.exists() && !playlist_unsupported_sentinel(&playlist).exists() }) .collect::>(); let scan_dir_name = msg.directory.clone(); let playlist_generator = self.playlist_generator.clone(); Box::pin(async move { for e in video_files { let path = e.path(); let path_as_str = path.to_str().unwrap(); debug!( "Sending generate playlist message for path: {}", path_as_str ); match playlist_generator .send(GeneratePlaylistMessage { playlist_path: playlist_output_dir.to_str().unwrap().to_string(), video_path: PathBuf::from(path), }) .await .expect("Failed to send generate playlist message") { Ok(_) => { span.add_event( "Playlist generated", vec![KeyValue::new("video_path", path_as_str.to_string())], ); debug!( "Successfully generated playlist for file: '{}'", path_as_str ); } Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => { debug!("Playlist already exists for '{:?}', skipping", path); } Err(e) => { warn!("Failed to generate playlist for path '{:?}'. {:?}", path, e); } } } span.add_event( "Finished directory scan", vec![KeyValue::new("directory", scan_dir_name.to_string())], ); info!( "Finished directory scan of '{}' in {:?}", scan_dir_name, start.elapsed() ); }) } } impl Handler for VideoPlaylistManager { type Result = (); fn handle(&mut self, msg: QueueVideosMessage, _ctx: &mut Self::Context) -> Self::Result { if msg.video_paths.is_empty() { return; } info!( "Queueing {} videos for HLS playlist generation", msg.video_paths.len() ); let playlist_output_dir = self.playlist_dir.clone(); let playlist_dir_str = playlist_output_dir.to_str().unwrap().to_string(); let playlist_generator = self.playlist_generator.clone(); for video_path in msg.video_paths { let playlist = playlist_file_for(&playlist_dir_str, &video_path); if playlist.exists() || playlist_unsupported_sentinel(&playlist).exists() { continue; } let path_str = video_path.to_string_lossy().to_string(); debug!("Queueing playlist generation for: {}", path_str); playlist_generator.do_send(GeneratePlaylistMessage { playlist_path: playlist_dir_str.clone(), video_path, }); } } } #[derive(Message)] #[rtype(result = "()")] pub struct ScanDirectoryMessage { pub(crate) directory: String, } #[derive(Message)] #[rtype(result = "()")] pub struct QueueVideosMessage { pub video_paths: Vec, } #[derive(Message)] #[rtype(result = "Result<()>")] pub struct GeneratePlaylistMessage { pub video_path: PathBuf, pub playlist_path: String, } pub struct PlaylistGenerator { semaphore: Arc, } impl PlaylistGenerator { pub(crate) fn new() -> Self { // Concurrency is tunable via HLS_CONCURRENCY so operators can dial // it to their hardware: 1 on weak Synology boxes to avoid thermal // throttling, higher on desktops with spare cores. let concurrency = std::env::var("HLS_CONCURRENCY") .ok() .and_then(|v| v.parse::().ok()) .filter(|&n| n > 0) .unwrap_or(2); info!("PlaylistGenerator: concurrency={}", concurrency); PlaylistGenerator { semaphore: Arc::new(Semaphore::new(concurrency)), } } } impl Actor for PlaylistGenerator { type Context = Context; } impl Handler for PlaylistGenerator { type Result = ResponseFuture>; fn handle(&mut self, msg: GeneratePlaylistMessage, _ctx: &mut Self::Context) -> Self::Result { let video_file = msg.video_path.to_str().unwrap().to_owned(); let playlist_path = msg.playlist_path.as_str().to_owned(); let semaphore = self.semaphore.clone(); let playlist_file = format!( "{}/{}.m3u8", playlist_path, msg.video_path.file_name().unwrap().to_str().unwrap() ); let tracer = global_tracer(); let mut span = tracer .span_builder("playlistgenerator.generate_playlist") .with_attributes(vec![ KeyValue::new("video_file", video_file.clone()), KeyValue::new("playlist_file", playlist_file.clone()), ]) .start(&tracer); Box::pin(async move { let wait_start = std::time::Instant::now(); let permit = semaphore .acquire_owned() .await .expect("Unable to acquire semaphore"); debug!( "Waited for {:?} before starting ffmpeg", wait_start.elapsed() ); span.add_event( "Waited for FFMPEG semaphore", vec![KeyValue::new( "wait_time", wait_start.elapsed().as_secs_f64(), )], ); if Path::new(&playlist_file).exists() { debug!("Playlist already exists: {}", playlist_file); span.set_status(Status::error(format!( "Playlist already exists: {}", playlist_file ))); return Err(std::io::Error::from(std::io::ErrorKind::AlreadyExists)); } // One ffprobe call for codec + rotation metadata. let stream_meta = probe_video_stream_meta(&video_file).await; let is_h264 = stream_meta.is_h264; let rotation = stream_meta.rotation; let has_rotation = rotation != 0; // Stream-copy is only safe when the source GOP fits inside a // single HLS segment. Otherwise ffmpeg has to extend segments // past hls_time to land on a keyframe, producing uneven // segments and seeking glitches. const HLS_SEGMENT_SECONDS: f64 = 3.0; let gop_ok = if is_h264 && !has_rotation { match get_max_gop_seconds(&video_file).await { Some(g) if g > HLS_SEGMENT_SECONDS => { info!( "Video {} has long GOP ({:.1}s > {}s), transcoding for segment alignment", video_file, g, HLS_SEGMENT_SECONDS ); false } Some(_) => true, None => { // Probe failed — be conservative and transcode rather // than risk broken segments from a mystery source. debug!( "GOP probe failed for {}, transcoding to be safe", video_file ); false } } } else { false }; let use_copy = is_h264 && !has_rotation && gop_ok; if has_rotation { info!( "Video {} has rotation metadata ({}°), transcoding to apply rotation", video_file, rotation ); span.add_event( "Transcoding due to rotation", vec![KeyValue::new("rotation_degrees", rotation as i64)], ); } else if use_copy { info!("Video {} is already h264, using stream copy", video_file); span.add_event("Using stream copy (h264 detected)", vec![]); } else if is_h264 { info!( "Video {} is h264 but needs transcoding for GOP alignment", video_file ); span.add_event("Transcoding for GOP alignment", vec![]); } else { info!("Video {} needs transcoding to h264", video_file); span.add_event("Transcoding to h264", vec![]); } // Encode to a .tmp playlist and explicit segment names so a failed // encode leaves predictable artifacts we can clean up — and so a // concurrent scan doesn't see a half-written .m3u8 as "done". let playlist_tmp = format!("{}.tmp", playlist_file); let video_stem = msg .video_path .file_name() .and_then(|n| n.to_str()) .unwrap_or("video"); let segment_pattern = format!("{}/{}_%03d.ts", playlist_path, video_stem); let mut cmd = tokio::process::Command::new("ffmpeg"); cmd.arg("-y").arg("-i").arg(&video_file); if use_copy { cmd.arg("-c:v").arg("copy"); cmd.arg("-c:a").arg("aac"); } else { let nvenc = crate::video::ffmpeg::is_nvenc_available().await; if nvenc { // NVENC: no CRF, use VBR + target CQ. p1 = fastest // preset — prioritizes encoder throughput over bitrate // efficiency. CQ 23 roughly matches libx264 crf 21 // visually; NVENC has slightly lower compression // efficiency per quality. cmd.arg("-c:v").arg("h264_nvenc"); cmd.arg("-preset").arg("p1"); cmd.arg("-rc").arg("vbr"); cmd.arg("-cq").arg("23"); cmd.arg("-pix_fmt").arg("yuv420p"); } else { cmd.arg("-c:v").arg("h264"); cmd.arg("-crf").arg("21"); cmd.arg("-preset").arg("veryfast"); } cmd.arg("-vf").arg("scale='min(1080,iw)':-2,setsar=1:1"); cmd.arg("-c:a").arg("aac"); // Force an IDR frame every hls_time seconds so each HLS // segment starts on a keyframe — accurate seeking without // players having to decode from a prior segment. cmd.arg("-force_key_frames").arg("expr:gte(t,n_forced*3)"); } // -f hls is required because the playlist is written to a .tmp // path during encoding — ffmpeg normally infers the muxer from // the output extension and doesn't recognize ".m3u8.tmp". cmd.arg("-f").arg("hls"); cmd.arg("-hls_time").arg("3"); cmd.arg("-hls_list_size").arg("0"); cmd.arg("-hls_playlist_type").arg("vod"); // independent_segments advertises that each segment can be // decoded without reference to any other — the matching guarantee // for the forced keyframes above. cmd.arg("-hls_flags").arg("independent_segments"); cmd.arg("-hls_segment_filename").arg(&segment_pattern); cmd.arg(&playlist_tmp); cmd.stdout(Stdio::null()); cmd.stderr(Stdio::piped()); cmd.kill_on_drop(true); // Spawn + wait under a timeout so a hung ffmpeg (corrupt source, // NFS stall, etc.) doesn't permanently hold a semaphore slot. // Default is generous — a long 4K transcode on CPU can take hours. let timeout_secs = std::env::var("HLS_TIMEOUT_SECONDS") .ok() .and_then(|v| v.parse::().ok()) .unwrap_or(7200); let ffmpeg_result = match cmd.spawn() { Ok(child) => { match tokio::time::timeout( std::time::Duration::from_secs(timeout_secs), child.wait_with_output(), ) .await { Ok(res) => res .inspect_err(|e| { error!("Failed to wait on ffmpeg child process: {}", e) }) .map_err(|e| std::io::Error::other(e.to_string())), Err(_) => Err(std::io::Error::other(format!( "ffmpeg exceeded {}s timeout", timeout_secs ))), } } Err(e) => { error!("Failed to spawn ffmpeg: {}", e); Err(std::io::Error::other(e.to_string())) } }; drop(permit); let success = matches!(&ffmpeg_result, Ok(out) if out.status.success()); if success { if let Err(e) = tokio::fs::rename(&playlist_tmp, &playlist_file).await { error!( "ffmpeg succeeded but rename {} -> {} failed: {}", playlist_tmp, playlist_file, e ); cleanup_partial_hls(&playlist_tmp, playlist_path.as_str(), video_stem).await; span.set_status(Status::error(format!("rename failed: {}", e))); return Err(e); } debug!("Playlist complete: {}", playlist_file); span.set_status(Status::Ok); Ok(()) } else { let detail = match &ffmpeg_result { Ok(out) => format!( "exit {}: {}", out.status, String::from_utf8_lossy(&out.stderr).trim() ), Err(e) => format!("ffmpeg failed: {}", e), }; error!("ffmpeg failed for {}: {}", video_file, detail); cleanup_partial_hls(&playlist_tmp, playlist_path.as_str(), video_stem).await; let sentinel = playlist_unsupported_sentinel(Path::new(&playlist_file)); if let Err(se) = tokio::fs::write(&sentinel, b"").await { warn!( "Failed to write playlist sentinel {}: {}", sentinel.display(), se ); } else { info!( "Wrote playlist sentinel {} so future scans skip {}", sentinel.display(), video_file ); } span.set_status(Status::error(detail.clone())); Err(std::io::Error::other(detail)) } }) } } /// Delete the temp playlist and any segment files that ffmpeg may have written /// before failing. Called both on ffmpeg error and on rename failure so a /// retry on the next scan starts from a clean slate. async fn cleanup_partial_hls(playlist_tmp: &str, playlist_dir: &str, video_stem: &str) { let _ = tokio::fs::remove_file(playlist_tmp).await; let segment_prefix = format!("{}_", video_stem); let Ok(mut entries) = tokio::fs::read_dir(playlist_dir).await else { return; }; while let Ok(Some(entry)) = entries.next_entry().await { let Some(name) = entry.file_name().to_str().map(str::to_owned) else { continue; }; if name.starts_with(&segment_prefix) && name.ends_with(".ts") && let Err(e) = tokio::fs::remove_file(entry.path()).await { warn!("Failed to remove partial segment {}: {}", name, e); } } } #[derive(Message)] #[rtype(result = "()")] pub struct GeneratePreviewClipMessage { pub video_path: String, } pub struct PreviewClipGenerator { semaphore: Arc, preview_clips_dir: String, libraries: Vec, preview_dao: Arc>>, } impl PreviewClipGenerator { pub fn new( preview_clips_dir: String, libraries: Vec, preview_dao: Arc>>, ) -> Self { PreviewClipGenerator { semaphore: Arc::new(Semaphore::new(2)), preview_clips_dir, libraries, preview_dao, } } /// Strip whichever library root actually contains `video_path`. /// Falls back to the first library if none match, so we never /// accidentally emit the absolute input path as the output path /// (which ffmpeg rejects as "cannot edit existing files in place"). fn relativize(&self, video_path: &str) -> String { for lib in &self.libraries { if let Some(stripped) = video_path.strip_prefix(&lib.root_path) { return stripped.trim_start_matches(['/', '\\']).replace('\\', "/"); } } video_path .trim_start_matches(['/', '\\']) .replace('\\', "/") } } impl Actor for PreviewClipGenerator { type Context = Context; } impl Handler for PreviewClipGenerator { type Result = ResponseFuture<()>; fn handle( &mut self, msg: GeneratePreviewClipMessage, _ctx: &mut Self::Context, ) -> Self::Result { let semaphore = self.semaphore.clone(); let preview_clips_dir = self.preview_clips_dir.clone(); let preview_dao = self.preview_dao.clone(); let video_path = msg.video_path; // Resolve against whichever library actually owns this video. let relative_path = self.relativize(&video_path); Box::pin(async move { let permit = semaphore .acquire_owned() .await .expect("Unable to acquire preview semaphore"); // Update status to processing { let otel_ctx = opentelemetry::Context::current(); let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao"); let _ = dao.update_status(&otel_ctx, &relative_path, "processing", None, None, None); } // Compute output path: join preview_clips_dir with relative path, change ext to .mp4 let output_path = PathBuf::from(&preview_clips_dir) .join(&relative_path) .with_extension("mp4"); let output_str = output_path.to_string_lossy().to_string(); let video_path_owned = video_path.clone(); let relative_path_owned = relative_path.clone(); tokio::spawn(async move { match generate_preview_clip(&video_path_owned, &output_str).await { Ok((duration, size)) => { info!( "Preview clip complete for '{}' ({:.1}s, {} bytes)", relative_path_owned, duration, size ); let otel_ctx = opentelemetry::Context::current(); let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao"); let _ = dao.update_status( &otel_ctx, &relative_path_owned, "complete", Some(duration as f32), Some(size as i32), None, ); } Err(e) => { error!( "Failed to generate preview clip for '{}': {}", relative_path_owned, e ); let otel_ctx = opentelemetry::Context::current(); let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao"); let _ = dao.update_status( &otel_ctx, &relative_path_owned, "failed", None, None, Some(&e.to_string()), ); } } drop(permit); }); }) } }