feature/hls-content-hash #95
12
src/main.rs
12
src/main.rs
@@ -26,7 +26,6 @@ use crate::files::{RealFileSystem, move_file};
|
|||||||
use crate::service::ServiceBuilder;
|
use crate::service::ServiceBuilder;
|
||||||
use crate::state::AppState;
|
use crate::state::AppState;
|
||||||
use crate::tags::*;
|
use crate::tags::*;
|
||||||
use crate::video::actors::ScanDirectoryMessage;
|
|
||||||
use log::{error, info};
|
use log::{error, info};
|
||||||
|
|
||||||
mod ai;
|
mod ai;
|
||||||
@@ -119,13 +118,12 @@ fn main() -> std::io::Result<()> {
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let app_state = app_data.clone();
|
let app_state = app_data.clone();
|
||||||
for lib in &app_state.libraries {
|
|
||||||
app_state.playlist_manager.do_send(ScanDirectoryMessage {
|
|
||||||
directory: lib.root_path.clone(),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start file watcher with playlist manager and preview generator
|
// Start file watcher with playlist manager and preview generator.
|
||||||
|
// The watcher's first tick is configured to be a full scan (see
|
||||||
|
// `watch_files`), so every library's missing HLS playlists are
|
||||||
|
// queued on that first iteration — no separate startup walk
|
||||||
|
// needed.
|
||||||
let playlist_mgr_for_watcher = app_state.playlist_manager.as_ref().clone();
|
let playlist_mgr_for_watcher = app_state.playlist_manager.as_ref().clone();
|
||||||
let preview_gen_for_watcher = app_state.preview_clip_generator.as_ref().clone();
|
let preview_gen_for_watcher = app_state.preview_clip_generator.as_ref().clone();
|
||||||
// Both background jobs read from the shared `live_libraries` lock
|
// Both background jobs read from the shared `live_libraries` lock
|
||||||
|
|||||||
@@ -111,7 +111,7 @@ impl AppState {
|
|||||||
"AppState::new requires at least one library"
|
"AppState::new requires at least one library"
|
||||||
);
|
);
|
||||||
let base_path = libraries_vec[0].root_path.clone();
|
let base_path = libraries_vec[0].root_path.clone();
|
||||||
let playlist_generator = PlaylistGenerator::new();
|
let playlist_generator = PlaylistGenerator::new(video_path.clone());
|
||||||
let video_playlist_manager =
|
let video_playlist_manager =
|
||||||
VideoPlaylistManager::new(video_path.clone(), playlist_generator.start());
|
VideoPlaylistManager::new(video_path.clone(), playlist_generator.start());
|
||||||
|
|
||||||
|
|||||||
@@ -1,8 +1,9 @@
|
|||||||
|
use crate::content_hash;
|
||||||
use crate::database::PreviewDao;
|
use crate::database::PreviewDao;
|
||||||
use crate::libraries::Library;
|
use crate::libraries::Library;
|
||||||
use crate::otel::global_tracer;
|
use crate::otel::global_tracer;
|
||||||
use crate::thumbnails::is_video;
|
|
||||||
use crate::video::ffmpeg::{generate_preview_clip, get_duration_seconds_blocking};
|
use crate::video::ffmpeg::{generate_preview_clip, get_duration_seconds_blocking};
|
||||||
|
use crate::video::hls_paths;
|
||||||
use actix::prelude::*;
|
use actix::prelude::*;
|
||||||
use log::{debug, error, info, trace, warn};
|
use log::{debug, error, info, trace, warn};
|
||||||
use opentelemetry::KeyValue;
|
use opentelemetry::KeyValue;
|
||||||
@@ -12,7 +13,6 @@ use std::path::{Path, PathBuf};
|
|||||||
use std::process::{Child, Command, ExitStatus, Stdio};
|
use std::process::{Child, Command, ExitStatus, Stdio};
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
use tokio::sync::Semaphore;
|
use tokio::sync::Semaphore;
|
||||||
use walkdir::{DirEntry, WalkDir};
|
|
||||||
// ffmpeg -i test.mp4 -c:v h264 -flags +cgop -g 30 -hls_time 3 out.m3u8
|
// ffmpeg -i test.mp4 -c:v h264 -flags +cgop -g 30 -hls_time 3 out.m3u8
|
||||||
// ffmpeg -i "filename.mp4" -preset veryfast -c:v libx264 -f hls -hls_list_size 100 -hls_time 2 -crf 24 -vf scale=1080:-2,setsar=1:1 attempt/vid_out.m3u8
|
// ffmpeg -i "filename.mp4" -preset veryfast -c:v libx264 -f hls -hls_list_size 100 -hls_time 2 -crf 24 -vf scale=1080:-2,setsar=1:1 attempt/vid_out.m3u8
|
||||||
|
|
||||||
@@ -57,9 +57,11 @@ pub struct VideoToQueue {
|
|||||||
pub content_hash: String,
|
pub content_hash: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Legacy basename-keyed playlist path. Used only by the one-shot startup
|
/// Legacy basename-keyed playlist path. Retained for the one-shot startup
|
||||||
/// migration that retires pre-content-hash output; once cleared, all new
|
/// migration that retires pre-content-hash output; new playlist writes go
|
||||||
/// playlist writes go through [`hls_paths::playlist_for_hash`].
|
/// through [`hls_paths::playlist_for_hash`]. Will be removed once the
|
||||||
|
/// migration ships and runs to completion in production.
|
||||||
|
#[allow(dead_code)]
|
||||||
pub fn playlist_file_for(playlist_dir: &str, video_path: &Path) -> PathBuf {
|
pub fn playlist_file_for(playlist_dir: &str, video_path: &Path) -> PathBuf {
|
||||||
let filename = video_path
|
let filename = video_path
|
||||||
.file_name()
|
.file_name()
|
||||||
@@ -70,6 +72,7 @@ pub fn playlist_file_for(playlist_dir: &str, video_path: &Path) -> PathBuf {
|
|||||||
|
|
||||||
/// Legacy basename-keyed sentinel path. Same migration-only contract as
|
/// Legacy basename-keyed sentinel path. Same migration-only contract as
|
||||||
/// [`playlist_file_for`].
|
/// [`playlist_file_for`].
|
||||||
|
#[allow(dead_code)]
|
||||||
pub fn playlist_unsupported_sentinel(playlist_file: &Path) -> PathBuf {
|
pub fn playlist_unsupported_sentinel(playlist_file: &Path) -> PathBuf {
|
||||||
let mut s = playlist_file.as_os_str().to_owned();
|
let mut s = playlist_file.as_os_str().to_owned();
|
||||||
s.push(".unsupported");
|
s.push(".unsupported");
|
||||||
@@ -342,17 +345,17 @@ async fn get_max_gop_seconds(video_path: &str) -> Option<f64> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub struct VideoPlaylistManager {
|
pub struct VideoPlaylistManager {
|
||||||
playlist_dir: PathBuf,
|
video_dir: PathBuf,
|
||||||
playlist_generator: Addr<PlaylistGenerator>,
|
playlist_generator: Addr<PlaylistGenerator>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl VideoPlaylistManager {
|
impl VideoPlaylistManager {
|
||||||
pub fn new<P: Into<PathBuf>>(
|
pub fn new<P: Into<PathBuf>>(
|
||||||
playlist_dir: P,
|
video_dir: P,
|
||||||
playlist_generator: Addr<PlaylistGenerator>,
|
playlist_generator: Addr<PlaylistGenerator>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
playlist_dir: playlist_dir.into(),
|
video_dir: video_dir.into(),
|
||||||
playlist_generator,
|
playlist_generator,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -362,144 +365,68 @@ impl Actor for VideoPlaylistManager {
|
|||||||
type Context = Context<Self>;
|
type Context = Context<Self>;
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Handler<ScanDirectoryMessage> for VideoPlaylistManager {
|
|
||||||
type Result = ResponseFuture<()>;
|
|
||||||
|
|
||||||
fn handle(&mut self, msg: ScanDirectoryMessage, _ctx: &mut Self::Context) -> Self::Result {
|
|
||||||
let tracer = global_tracer();
|
|
||||||
let mut span = tracer.start("videoplaylistmanager.scan_directory");
|
|
||||||
|
|
||||||
let start = std::time::Instant::now();
|
|
||||||
info!(
|
|
||||||
"Starting scan directory for video playlist generation: {}",
|
|
||||||
msg.directory
|
|
||||||
);
|
|
||||||
|
|
||||||
let playlist_output_dir = self.playlist_dir.clone();
|
|
||||||
let playlist_dir_str = playlist_output_dir.to_str().unwrap().to_string();
|
|
||||||
|
|
||||||
let video_files = WalkDir::new(&msg.directory)
|
|
||||||
.into_iter()
|
|
||||||
.filter_map(|e| e.ok())
|
|
||||||
.filter(|e| e.file_type().is_file())
|
|
||||||
.filter(is_video)
|
|
||||||
.filter(|e| {
|
|
||||||
let playlist = playlist_file_for(&playlist_dir_str, e.path());
|
|
||||||
!playlist.exists() && !playlist_unsupported_sentinel(&playlist).exists()
|
|
||||||
})
|
|
||||||
.collect::<Vec<DirEntry>>();
|
|
||||||
|
|
||||||
let scan_dir_name = msg.directory.clone();
|
|
||||||
let playlist_generator = self.playlist_generator.clone();
|
|
||||||
|
|
||||||
Box::pin(async move {
|
|
||||||
for e in video_files {
|
|
||||||
let path = e.path();
|
|
||||||
let path_as_str = path.to_str().unwrap();
|
|
||||||
debug!(
|
|
||||||
"Sending generate playlist message for path: {}",
|
|
||||||
path_as_str
|
|
||||||
);
|
|
||||||
|
|
||||||
match playlist_generator
|
|
||||||
.send(GeneratePlaylistMessage {
|
|
||||||
playlist_path: playlist_output_dir.to_str().unwrap().to_string(),
|
|
||||||
video_path: PathBuf::from(path),
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.expect("Failed to send generate playlist message")
|
|
||||||
{
|
|
||||||
Ok(_) => {
|
|
||||||
span.add_event(
|
|
||||||
"Playlist generated",
|
|
||||||
vec![KeyValue::new("video_path", path_as_str.to_string())],
|
|
||||||
);
|
|
||||||
|
|
||||||
debug!(
|
|
||||||
"Successfully generated playlist for file: '{}'",
|
|
||||||
path_as_str
|
|
||||||
);
|
|
||||||
}
|
|
||||||
Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
|
|
||||||
debug!("Playlist already exists for '{:?}', skipping", path);
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
warn!("Failed to generate playlist for path '{:?}'. {:?}", path, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
span.add_event(
|
|
||||||
"Finished directory scan",
|
|
||||||
vec![KeyValue::new("directory", scan_dir_name.to_string())],
|
|
||||||
);
|
|
||||||
info!(
|
|
||||||
"Finished directory scan of '{}' in {:?}",
|
|
||||||
scan_dir_name,
|
|
||||||
start.elapsed()
|
|
||||||
);
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Handler<QueueVideosMessage> for VideoPlaylistManager {
|
impl Handler<QueueVideosMessage> for VideoPlaylistManager {
|
||||||
type Result = ();
|
type Result = ();
|
||||||
|
|
||||||
fn handle(&mut self, msg: QueueVideosMessage, _ctx: &mut Self::Context) -> Self::Result {
|
fn handle(&mut self, msg: QueueVideosMessage, _ctx: &mut Self::Context) -> Self::Result {
|
||||||
if msg.video_paths.is_empty() {
|
if msg.videos.is_empty() {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
info!(
|
let video_dir = self.video_dir.clone();
|
||||||
"Queueing {} videos for HLS playlist generation",
|
|
||||||
msg.video_paths.len()
|
|
||||||
);
|
|
||||||
|
|
||||||
let playlist_output_dir = self.playlist_dir.clone();
|
|
||||||
let playlist_dir_str = playlist_output_dir.to_str().unwrap().to_string();
|
|
||||||
let playlist_generator = self.playlist_generator.clone();
|
let playlist_generator = self.playlist_generator.clone();
|
||||||
|
|
||||||
for video_path in msg.video_paths {
|
let mut queued = 0usize;
|
||||||
let playlist = playlist_file_for(&playlist_dir_str, &video_path);
|
let mut already_present = 0usize;
|
||||||
if playlist.exists() || playlist_unsupported_sentinel(&playlist).exists() {
|
for VideoToQueue {
|
||||||
|
video_path,
|
||||||
|
content_hash,
|
||||||
|
} in msg.videos
|
||||||
|
{
|
||||||
|
let playlist = hls_paths::playlist_for_hash(&video_dir, &content_hash);
|
||||||
|
let sentinel = hls_paths::sentinel_for_hash(&video_dir, &content_hash);
|
||||||
|
if playlist.exists() || sentinel.exists() {
|
||||||
|
already_present += 1;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
let path_str = video_path.to_string_lossy().to_string();
|
debug!(
|
||||||
debug!("Queueing playlist generation for: {}", path_str);
|
"Queueing playlist generation for {} (hash={})",
|
||||||
|
video_path.display(),
|
||||||
|
short_hash(&content_hash)
|
||||||
|
);
|
||||||
playlist_generator.do_send(GeneratePlaylistMessage {
|
playlist_generator.do_send(GeneratePlaylistMessage {
|
||||||
playlist_path: playlist_dir_str.clone(),
|
|
||||||
video_path,
|
video_path,
|
||||||
|
content_hash,
|
||||||
});
|
});
|
||||||
|
queued += 1;
|
||||||
}
|
}
|
||||||
|
info!(
|
||||||
|
"Queue tick: {} queued, {} skipped (playlist or sentinel already on disk)",
|
||||||
|
queued, already_present
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Message)]
|
|
||||||
#[rtype(result = "()")]
|
|
||||||
pub struct ScanDirectoryMessage {
|
|
||||||
pub(crate) directory: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Message)]
|
#[derive(Message)]
|
||||||
#[rtype(result = "()")]
|
#[rtype(result = "()")]
|
||||||
pub struct QueueVideosMessage {
|
pub struct QueueVideosMessage {
|
||||||
pub video_paths: Vec<PathBuf>,
|
pub videos: Vec<VideoToQueue>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Message)]
|
#[derive(Message)]
|
||||||
#[rtype(result = "Result<()>")]
|
#[rtype(result = "Result<()>")]
|
||||||
pub struct GeneratePlaylistMessage {
|
pub struct GeneratePlaylistMessage {
|
||||||
pub video_path: PathBuf,
|
pub video_path: PathBuf,
|
||||||
pub playlist_path: String,
|
pub content_hash: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct PlaylistGenerator {
|
pub struct PlaylistGenerator {
|
||||||
semaphore: Arc<Semaphore>,
|
semaphore: Arc<Semaphore>,
|
||||||
|
video_dir: PathBuf,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PlaylistGenerator {
|
impl PlaylistGenerator {
|
||||||
pub(crate) fn new() -> Self {
|
pub(crate) fn new<P: Into<PathBuf>>(video_dir: P) -> Self {
|
||||||
// Concurrency is tunable via HLS_CONCURRENCY so operators can dial
|
// Concurrency is tunable via HLS_CONCURRENCY so operators can dial
|
||||||
// it to their hardware: 1 on weak Synology boxes to avoid thermal
|
// it to their hardware: 1 on weak Synology boxes to avoid thermal
|
||||||
// throttling, higher on desktops with spare cores.
|
// throttling, higher on desktops with spare cores.
|
||||||
@@ -511,6 +438,7 @@ impl PlaylistGenerator {
|
|||||||
info!("PlaylistGenerator: concurrency={}", concurrency);
|
info!("PlaylistGenerator: concurrency={}", concurrency);
|
||||||
PlaylistGenerator {
|
PlaylistGenerator {
|
||||||
semaphore: Arc::new(Semaphore::new(concurrency)),
|
semaphore: Arc::new(Semaphore::new(concurrency)),
|
||||||
|
video_dir: video_dir.into(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -524,20 +452,23 @@ impl Handler<GeneratePlaylistMessage> for PlaylistGenerator {
|
|||||||
|
|
||||||
fn handle(&mut self, msg: GeneratePlaylistMessage, _ctx: &mut Self::Context) -> Self::Result {
|
fn handle(&mut self, msg: GeneratePlaylistMessage, _ctx: &mut Self::Context) -> Self::Result {
|
||||||
let video_file = msg.video_path.to_str().unwrap().to_owned();
|
let video_file = msg.video_path.to_str().unwrap().to_owned();
|
||||||
let playlist_path = msg.playlist_path.as_str().to_owned();
|
let content_hash_str = msg.content_hash.clone();
|
||||||
let semaphore = self.semaphore.clone();
|
let semaphore = self.semaphore.clone();
|
||||||
|
let video_dir = self.video_dir.clone();
|
||||||
|
|
||||||
let playlist_file = format!(
|
let hash_dir = content_hash::hls_dir(&video_dir, &content_hash_str);
|
||||||
"{}/{}.m3u8",
|
let playlist_path = hls_paths::playlist_for_hash(&video_dir, &content_hash_str);
|
||||||
playlist_path,
|
let sentinel_path = hls_paths::sentinel_for_hash(&video_dir, &content_hash_str);
|
||||||
msg.video_path.file_name().unwrap().to_str().unwrap()
|
let segment_template = hls_paths::segment_template_for_hash(&video_dir, &content_hash_str);
|
||||||
);
|
let playlist_file = playlist_path.to_string_lossy().to_string();
|
||||||
|
let segment_pattern = segment_template.to_string_lossy().to_string();
|
||||||
|
|
||||||
let tracer = global_tracer();
|
let tracer = global_tracer();
|
||||||
let mut span = tracer
|
let mut span = tracer
|
||||||
.span_builder("playlistgenerator.generate_playlist")
|
.span_builder("playlistgenerator.generate_playlist")
|
||||||
.with_attributes(vec![
|
.with_attributes(vec![
|
||||||
KeyValue::new("video_file", video_file.clone()),
|
KeyValue::new("video_file", video_file.clone()),
|
||||||
|
KeyValue::new("content_hash", content_hash_str.clone()),
|
||||||
KeyValue::new("playlist_file", playlist_file.clone()),
|
KeyValue::new("playlist_file", playlist_file.clone()),
|
||||||
])
|
])
|
||||||
.start(&tracer);
|
.start(&tracer);
|
||||||
@@ -561,7 +492,7 @@ impl Handler<GeneratePlaylistMessage> for PlaylistGenerator {
|
|||||||
)],
|
)],
|
||||||
);
|
);
|
||||||
|
|
||||||
if Path::new(&playlist_file).exists() {
|
if playlist_path.exists() {
|
||||||
debug!("Playlist already exists: {}", playlist_file);
|
debug!("Playlist already exists: {}", playlist_file);
|
||||||
span.set_status(Status::error(format!(
|
span.set_status(Status::error(format!(
|
||||||
"Playlist already exists: {}",
|
"Playlist already exists: {}",
|
||||||
@@ -570,6 +501,19 @@ impl Handler<GeneratePlaylistMessage> for PlaylistGenerator {
|
|||||||
return Err(std::io::Error::from(std::io::ErrorKind::AlreadyExists));
|
return Err(std::io::Error::from(std::io::ErrorKind::AlreadyExists));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Ensure the shard + hash directory exist. Idempotent — the
|
||||||
|
// dir may already be present from a prior attempt that wrote
|
||||||
|
// a sentinel before being cleared for retry.
|
||||||
|
if let Err(e) = tokio::fs::create_dir_all(&hash_dir).await {
|
||||||
|
error!(
|
||||||
|
"Failed to create HLS hash dir {}: {}",
|
||||||
|
hash_dir.display(),
|
||||||
|
e
|
||||||
|
);
|
||||||
|
span.set_status(Status::error(format!("mkdir failed: {}", e)));
|
||||||
|
return Err(e);
|
||||||
|
}
|
||||||
|
|
||||||
// One ffprobe call for codec + rotation metadata.
|
// One ffprobe call for codec + rotation metadata.
|
||||||
let stream_meta = probe_video_stream_meta(&video_file).await;
|
let stream_meta = probe_video_stream_meta(&video_file).await;
|
||||||
let is_h264 = stream_meta.is_h264;
|
let is_h264 = stream_meta.is_h264;
|
||||||
@@ -630,16 +574,11 @@ impl Handler<GeneratePlaylistMessage> for PlaylistGenerator {
|
|||||||
span.add_event("Transcoding to h264", vec![]);
|
span.add_event("Transcoding to h264", vec![]);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Encode to a .tmp playlist and explicit segment names so a failed
|
// Encode to a .tmp playlist alongside the final inside the
|
||||||
// encode leaves predictable artifacts we can clean up — and so a
|
// hash dir, so a concurrent scan never sees a half-written
|
||||||
// concurrent scan doesn't see a half-written .m3u8 as "done".
|
// .m3u8 as "done". Segments use the hash-keyed template;
|
||||||
|
// ffmpeg writes them next to the playlist (relative refs).
|
||||||
let playlist_tmp = format!("{}.tmp", playlist_file);
|
let playlist_tmp = format!("{}.tmp", playlist_file);
|
||||||
let video_stem = msg
|
|
||||||
.video_path
|
|
||||||
.file_name()
|
|
||||||
.and_then(|n| n.to_str())
|
|
||||||
.unwrap_or("video");
|
|
||||||
let segment_pattern = format!("{}/{}_%03d.ts", playlist_path, video_stem);
|
|
||||||
|
|
||||||
let mut cmd = tokio::process::Command::new("ffmpeg");
|
let mut cmd = tokio::process::Command::new("ffmpeg");
|
||||||
cmd.arg("-y").arg("-i").arg(&video_file);
|
cmd.arg("-y").arg("-i").arg(&video_file);
|
||||||
@@ -728,12 +667,12 @@ impl Handler<GeneratePlaylistMessage> for PlaylistGenerator {
|
|||||||
let success = matches!(&ffmpeg_result, Ok(out) if out.status.success());
|
let success = matches!(&ffmpeg_result, Ok(out) if out.status.success());
|
||||||
|
|
||||||
if success {
|
if success {
|
||||||
if let Err(e) = tokio::fs::rename(&playlist_tmp, &playlist_file).await {
|
if let Err(e) = tokio::fs::rename(&playlist_tmp, &playlist_path).await {
|
||||||
error!(
|
error!(
|
||||||
"ffmpeg succeeded but rename {} -> {} failed: {}",
|
"ffmpeg succeeded but rename {} -> {} failed: {}",
|
||||||
playlist_tmp, playlist_file, e
|
playlist_tmp, playlist_file, e
|
||||||
);
|
);
|
||||||
cleanup_partial_hls(&playlist_tmp, playlist_path.as_str(), video_stem).await;
|
cleanup_partial_hls(&hash_dir).await;
|
||||||
span.set_status(Status::error(format!("rename failed: {}", e)));
|
span.set_status(Status::error(format!("rename failed: {}", e)));
|
||||||
return Err(e);
|
return Err(e);
|
||||||
}
|
}
|
||||||
@@ -750,18 +689,17 @@ impl Handler<GeneratePlaylistMessage> for PlaylistGenerator {
|
|||||||
Err(e) => format!("ffmpeg failed: {}", e),
|
Err(e) => format!("ffmpeg failed: {}", e),
|
||||||
};
|
};
|
||||||
error!("ffmpeg failed for {}: {}", video_file, detail);
|
error!("ffmpeg failed for {}: {}", video_file, detail);
|
||||||
cleanup_partial_hls(&playlist_tmp, playlist_path.as_str(), video_stem).await;
|
cleanup_partial_hls(&hash_dir).await;
|
||||||
let sentinel = playlist_unsupported_sentinel(Path::new(&playlist_file));
|
if let Err(se) = tokio::fs::write(&sentinel_path, b"").await {
|
||||||
if let Err(se) = tokio::fs::write(&sentinel, b"").await {
|
|
||||||
warn!(
|
warn!(
|
||||||
"Failed to write playlist sentinel {}: {}",
|
"Failed to write playlist sentinel {}: {}",
|
||||||
sentinel.display(),
|
sentinel_path.display(),
|
||||||
se
|
se
|
||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
info!(
|
info!(
|
||||||
"Wrote playlist sentinel {} so future scans skip {}",
|
"Wrote playlist sentinel {} so future scans skip {}",
|
||||||
sentinel.display(),
|
sentinel_path.display(),
|
||||||
video_file
|
video_file
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@@ -772,29 +710,47 @@ impl Handler<GeneratePlaylistMessage> for PlaylistGenerator {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Delete the temp playlist and any segment files that ffmpeg may have written
|
/// Delete the partial playlist (.tmp) and any segment files left behind by
|
||||||
/// before failing. Called both on ffmpeg error and on rename failure so a
|
/// a failed ffmpeg run. Wipes every non-sentinel file in the hash dir;
|
||||||
/// retry on the next scan starts from a clean slate.
|
/// retains the sentinel if one has already been written by an earlier
|
||||||
async fn cleanup_partial_hls(playlist_tmp: &str, playlist_dir: &str, video_stem: &str) {
|
/// caller in the same path (today there is none, but kept defensively so
|
||||||
let _ = tokio::fs::remove_file(playlist_tmp).await;
|
/// the function is safe to call after sentinel write too).
|
||||||
|
async fn cleanup_partial_hls(hash_dir: &Path) {
|
||||||
let segment_prefix = format!("{}_", video_stem);
|
let Ok(mut entries) = tokio::fs::read_dir(hash_dir).await else {
|
||||||
let Ok(mut entries) = tokio::fs::read_dir(playlist_dir).await else {
|
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
while let Ok(Some(entry)) = entries.next_entry().await {
|
while let Ok(Some(entry)) = entries.next_entry().await {
|
||||||
let Some(name) = entry.file_name().to_str().map(str::to_owned) else {
|
let path = entry.path();
|
||||||
|
let is_sentinel = path
|
||||||
|
.file_name()
|
||||||
|
.and_then(|n| n.to_str())
|
||||||
|
.map(|n| n == hls_paths::UNSUPPORTED_SENTINEL_FILENAME)
|
||||||
|
.unwrap_or(false);
|
||||||
|
if is_sentinel {
|
||||||
continue;
|
continue;
|
||||||
};
|
}
|
||||||
if name.starts_with(&segment_prefix)
|
if let Err(e) = tokio::fs::remove_file(&path).await {
|
||||||
&& name.ends_with(".ts")
|
warn!(
|
||||||
&& let Err(e) = tokio::fs::remove_file(entry.path()).await
|
"Failed to remove partial HLS file {}: {}",
|
||||||
{
|
path.display(),
|
||||||
warn!("Failed to remove partial segment {}: {}", name, e);
|
e
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// First 16 chars of a content hash for log lines. Short enough to keep
|
||||||
|
/// log volume sane, long enough that distinct hashes don't collide in
|
||||||
|
/// practice.
|
||||||
|
fn short_hash(hash: &str) -> &str {
|
||||||
|
let end = hash
|
||||||
|
.char_indices()
|
||||||
|
.nth(16)
|
||||||
|
.map(|(i, _)| i)
|
||||||
|
.unwrap_or(hash.len());
|
||||||
|
&hash[..end]
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Message)]
|
#[derive(Message)]
|
||||||
#[rtype(result = "()")]
|
#[rtype(result = "()")]
|
||||||
pub struct GeneratePreviewClipMessage {
|
pub struct GeneratePreviewClipMessage {
|
||||||
|
|||||||
@@ -40,7 +40,10 @@ use crate::tags;
|
|||||||
use crate::tags::SqliteTagDao;
|
use crate::tags::SqliteTagDao;
|
||||||
use crate::thumbnails;
|
use crate::thumbnails;
|
||||||
use crate::video;
|
use crate::video;
|
||||||
use crate::video::actors::{GeneratePreviewClipMessage, QueueVideosMessage, VideoPlaylistManager};
|
use crate::video::actors::{
|
||||||
|
GeneratePreviewClipMessage, QueueVideosMessage, VideoPlaylistManager, VideoToQueue,
|
||||||
|
};
|
||||||
|
use crate::video::hls_paths;
|
||||||
|
|
||||||
/// Clean up orphaned HLS playlists and segments whose source videos no longer exist.
|
/// Clean up orphaned HLS playlists and segments whose source videos no longer exist.
|
||||||
///
|
///
|
||||||
@@ -288,7 +291,12 @@ pub fn watch_files(
|
|||||||
));
|
));
|
||||||
|
|
||||||
let mut last_quick_scan = SystemTime::now();
|
let mut last_quick_scan = SystemTime::now();
|
||||||
let mut last_full_scan = SystemTime::now();
|
// Initialize to UNIX_EPOCH so the *first* tick is treated as a
|
||||||
|
// full scan. That replaces the legacy startup ScanDirectoryMessage
|
||||||
|
// walk for HLS playlists: every library's existing media gets
|
||||||
|
// checked once at watcher boot, instead of waiting up to
|
||||||
|
// full_interval_secs (1h default) for the first natural full scan.
|
||||||
|
let mut last_full_scan = SystemTime::UNIX_EPOCH;
|
||||||
let mut scan_count = 0u64;
|
let mut scan_count = 0u64;
|
||||||
|
|
||||||
// Per-library cursor for the missing-file scan. Each tick reads
|
// Per-library cursor for the missing-file scan. Each tick reads
|
||||||
@@ -600,14 +608,18 @@ pub fn process_new_files(
|
|||||||
// Batch query: Get all EXIF data for these files in one query
|
// Batch query: Get all EXIF data for these files in one query
|
||||||
let file_paths: Vec<String> = files.iter().map(|(_, rel_path)| rel_path.clone()).collect();
|
let file_paths: Vec<String> = files.iter().map(|(_, rel_path)| rel_path.clone()).collect();
|
||||||
|
|
||||||
let existing_exif_paths: HashMap<String, bool> = {
|
// Map of rel_path -> Option<content_hash>. The presence of the key
|
||||||
|
// tells us "row exists"; the Option value carries the hash for the
|
||||||
|
// HLS pipeline so video files without a hash (mid-backfill) skip
|
||||||
|
// this tick rather than fall back to a basename-colliding playlist.
|
||||||
|
let existing_exif: HashMap<String, Option<String>> = {
|
||||||
let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
|
let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
|
||||||
// Walk is per-library, so scope the lookup so a same-named file
|
// Walk is per-library, so scope the lookup so a same-named file
|
||||||
// in another library doesn't make this one look already-indexed.
|
// in another library doesn't make this one look already-indexed.
|
||||||
match dao.get_exif_batch(&context, Some(library.id), &file_paths) {
|
match dao.get_exif_batch(&context, Some(library.id), &file_paths) {
|
||||||
Ok(exif_records) => exif_records
|
Ok(exif_records) => exif_records
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|record| (record.file_path, true))
|
.map(|record| (record.file_path, record.content_hash))
|
||||||
.collect(),
|
.collect(),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("Error batch querying EXIF data: {:?}", e);
|
error!("Error batch querying EXIF data: {:?}", e);
|
||||||
@@ -637,7 +649,7 @@ pub fn process_new_files(
|
|||||||
&& !bare_legacy_thumb_path.exists()
|
&& !bare_legacy_thumb_path.exists()
|
||||||
&& !thumbnails::unsupported_thumbnail_sentinel(&scoped_thumb_path).exists()
|
&& !thumbnails::unsupported_thumbnail_sentinel(&scoped_thumb_path).exists()
|
||||||
&& !thumbnails::unsupported_thumbnail_sentinel(&bare_legacy_thumb_path).exists();
|
&& !thumbnails::unsupported_thumbnail_sentinel(&bare_legacy_thumb_path).exists();
|
||||||
let needs_row = !existing_exif_paths.contains_key(relative_path);
|
let needs_row = !existing_exif.contains_key(relative_path);
|
||||||
|
|
||||||
if needs_thumbnail || needs_row {
|
if needs_thumbnail || needs_row {
|
||||||
new_files_found = true;
|
new_files_found = true;
|
||||||
@@ -796,28 +808,45 @@ pub fn process_new_files(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check for videos that need HLS playlists
|
// Check for videos that need HLS playlists. All output is keyed on
|
||||||
|
// `content_hash` (see `crate::video::hls_paths`), so files whose
|
||||||
|
// `image_exif.content_hash` is still NULL — typically mid-backfill —
|
||||||
|
// are skipped this tick and picked up after the unhashed backlog
|
||||||
|
// drain populates the hash on a subsequent tick. Skipping is the
|
||||||
|
// correct call: queuing without a hash would either fall back to
|
||||||
|
// basename keying (the bug this refactor fixes) or fabricate one.
|
||||||
let video_path_base = dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set");
|
let video_path_base = dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set");
|
||||||
let mut videos_needing_playlists = Vec::new();
|
let video_dir = Path::new(&video_path_base);
|
||||||
|
let mut videos_needing_playlists: Vec<VideoToQueue> = Vec::new();
|
||||||
|
let mut hashless_video_count = 0usize;
|
||||||
|
|
||||||
for (file_path, _relative_path) in &files {
|
for (file_path, relative_path) in &files {
|
||||||
if file_types::is_video_file(file_path) {
|
if !file_types::is_video_file(file_path) {
|
||||||
// Construct expected playlist path
|
continue;
|
||||||
let playlist_filename =
|
}
|
||||||
format!("{}.m3u8", file_path.file_name().unwrap().to_string_lossy());
|
let Some(hash) = existing_exif.get(relative_path).and_then(|h| h.clone()) else {
|
||||||
let playlist_path = Path::new(&video_path_base).join(&playlist_filename);
|
hashless_video_count += 1;
|
||||||
|
continue;
|
||||||
// Check if playlist needs (re)generation
|
};
|
||||||
|
let playlist_path = hls_paths::playlist_for_hash(video_dir, &hash);
|
||||||
if playlist_needs_generation(file_path, &playlist_path) {
|
if playlist_needs_generation(file_path, &playlist_path) {
|
||||||
videos_needing_playlists.push(file_path.clone());
|
videos_needing_playlists.push(VideoToQueue {
|
||||||
}
|
video_path: file_path.clone(),
|
||||||
|
content_hash: hash,
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send queue request to playlist manager
|
if hashless_video_count > 0 {
|
||||||
|
debug!(
|
||||||
|
"Watcher tick for '{}': skipped {} video(s) with NULL content_hash (will retry after backfill)",
|
||||||
|
library.name, hashless_video_count
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
if !videos_needing_playlists.is_empty() {
|
if !videos_needing_playlists.is_empty() {
|
||||||
playlist_manager.do_send(QueueVideosMessage {
|
playlist_manager.do_send(QueueVideosMessage {
|
||||||
video_paths: videos_needing_playlists,
|
videos: videos_needing_playlists,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user