From c71e1cdce079043312cd094807cea6d2ec540aa4 Mon Sep 17 00:00:00 2001 From: Cameron Cordes Date: Thu, 14 May 2026 15:23:31 -0400 Subject: [PATCH 01/10] hls: add hash-keyed path helpers + VideoToQueue type MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Foundation for migrating HLS playlist output from basename-keyed (`$VIDEO_PATH/{basename}.m3u8`) to content-hash-keyed (`$VIDEO_PATH/{hash[..2]}/{hash}/playlist.m3u8`). The basename layout collides whenever two source videos share a filename — common with iPhone-style sequential naming (`IMG_NNNN.MOV`) across libraries — so the loser's playlist gets overwritten and the file keeps getting re-queued for ffmpeg on every scan. This commit adds the path layout and type plumbing without touching the actor pipeline, watcher, or HTTP handlers yet: - `src/video/hls_paths.rs`: `playlist_for_hash`, `sentinel_for_hash`, `segment_template_for_hash` built on top of `content_hash::hls_dir`, with constants for the filenames inside the hash dir. Unit tests cover the sharded layout and the playlist/sentinel/segment paths all landing in the same directory (so HLS relative refs resolve). - `content_hash::hls_dir` (`src/content_hash.rs`): drops its `#[allow(dead_code)]` — it was only waiting for this branch to give it a caller. - `VideoToQueue` struct in `actors.rs`: pairs a source path with its content hash so callers that lack a hash (rows mid-backfill) skip the video rather than fabricate one. - `playlist_file_for` / `playlist_unsupported_sentinel` retained as migration-only helpers — they're only needed by the one-shot startup pass that retires pre-content-hash output. Follow-ups (separate commits on this branch): wire `hls_paths` through the queue handler + `PlaylistGenerator`, update the watcher's `process_new_files` to build `VideoToQueue`, switch `/video/generate` and `/video/stream` to resolve path→hash and return stable URLs, add the legacy-layout migration, rewrite `cleanup_orphaned_playlists` for the new dir shape, and surface progress via Prometheus + `/hls/stats`. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- src/content_hash.rs | 9 ++--- src/video/actors.rs | 19 ++++++++-- src/video/hls_paths.rs | 84 ++++++++++++++++++++++++++++++++++++++++++ src/video/mod.rs | 1 + 4 files changed, 103 insertions(+), 10 deletions(-) create mode 100644 src/video/hls_paths.rs diff --git a/src/content_hash.rs b/src/content_hash.rs index 5d334ff..a2a9e9e 100644 --- a/src/content_hash.rs +++ b/src/content_hash.rs @@ -52,12 +52,9 @@ pub fn thumbnail_path(thumbs_dir: &Path, hash: &str) -> PathBuf { /// Hash-keyed HLS output directory: `///`. /// The playlist lives at `playlist.m3u8` inside this directory and its -/// segments are co-located so HLS relative references Just Work. -/// -/// Allow-dead until Branch B/C rewires the HLS pipeline to use it; the -/// helper lives here today so Branch A's path layout decisions stay -/// adjacent to thumbnail/legacy ones. -#[allow(dead_code)] +/// segments are co-located so HLS relative references Just Work. See +/// [`crate::video::hls_paths`] for the filename constants and the +/// per-file helpers built on this dir. pub fn hls_dir(video_dir: &Path, hash: &str) -> PathBuf { let shard = shard_prefix(hash); video_dir.join(shard).join(hash) diff --git a/src/video/actors.rs b/src/video/actors.rs index eb539ef..4d7b90a 100644 --- a/src/video/actors.rs +++ b/src/video/actors.rs @@ -47,6 +47,19 @@ impl Handler for StreamActor { } } +/// A video paired with its content hash, ready to be queued for HLS +/// playlist generation. Hash is required because all output paths are +/// keyed on it; callers that lack a hash (rows mid-backfill) must skip +/// the video rather than fabricate one. +#[derive(Debug, Clone)] +pub struct VideoToQueue { + pub video_path: PathBuf, + pub content_hash: String, +} + +/// Legacy basename-keyed playlist path. Used only by the one-shot startup +/// migration that retires pre-content-hash output; once cleared, all new +/// playlist writes go through [`hls_paths::playlist_for_hash`]. 
pub fn playlist_file_for(playlist_dir: &str, video_path: &Path) -> PathBuf { let filename = video_path .file_name() @@ -55,10 +68,8 @@ pub fn playlist_file_for(playlist_dir: &str, video_path: &Path) -> PathBuf { PathBuf::from(format!("{}/{}.m3u8", playlist_dir, filename)) } -/// Sentinel path written next to a would-be playlist when ffmpeg cannot -/// transcode the source (e.g. truncated mp4 with no moov atom). Its presence -/// causes future scans to skip the file instead of re-running ffmpeg every -/// pass. Delete the `.unsupported` file to force a retry. +/// Legacy basename-keyed sentinel path. Same migration-only contract as +/// [`playlist_file_for`]. pub fn playlist_unsupported_sentinel(playlist_file: &Path) -> PathBuf { let mut s = playlist_file.as_os_str().to_owned(); s.push(".unsupported"); diff --git a/src/video/hls_paths.rs b/src/video/hls_paths.rs new file mode 100644 index 0000000..cf7fa96 --- /dev/null +++ b/src/video/hls_paths.rs @@ -0,0 +1,84 @@ +//! Path layout for hash-keyed HLS output. +//! +//! Source-of-truth is [`crate::content_hash::hls_dir`], which gives +//! `///`. The playlist, the per-segment files, +//! and the "ffmpeg refused" sentinel all live inside that directory so a +//! `.m3u8` written with relative segment references resolves correctly +//! at serve time without any URL rewriting. + +use std::path::{Path, PathBuf}; + +use crate::content_hash; + +/// Standard filename for the HLS playlist inside a hash dir. Fixed so +/// the URL contract is `playlist.m3u8` regardless of the source video's +/// original basename. +pub const PLAYLIST_FILENAME: &str = "playlist.m3u8"; + +/// Sentinel filename written when ffmpeg refused to transcode the +/// source. Presence in the hash dir tells future scans to skip the file +/// instead of re-running ffmpeg every tick. Delete to force a retry. +pub const UNSUPPORTED_SENTINEL_FILENAME: &str = "playlist.unsupported"; + +/// Segment-name template passed to ffmpeg via `-hls_segment_filename`. 
+/// Segments live inside the hash dir; the playlist's relative refs +/// resolve to siblings automatically. +pub const SEGMENT_TEMPLATE: &str = "segment_%03d.ts"; + +/// Path to the HLS playlist for a video identified by content hash. +pub fn playlist_for_hash(video_dir: &Path, hash: &str) -> PathBuf { + content_hash::hls_dir(video_dir, hash).join(PLAYLIST_FILENAME) +} + +/// Path to the unsupported-source sentinel for a hash. +pub fn sentinel_for_hash(video_dir: &Path, hash: &str) -> PathBuf { + content_hash::hls_dir(video_dir, hash).join(UNSUPPORTED_SENTINEL_FILENAME) +} + +/// Absolute path used as ffmpeg's `-hls_segment_filename` value. +pub fn segment_template_for_hash(video_dir: &Path, hash: &str) -> PathBuf { + content_hash::hls_dir(video_dir, hash).join(SEGMENT_TEMPLATE) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn playlist_path_lives_inside_sharded_hash_dir() { + let video = Path::new("/var/video"); + let p = playlist_for_hash(video, "abcdef0123456789"); + assert_eq!( + p, + PathBuf::from("/var/video/ab/abcdef0123456789/playlist.m3u8") + ); + } + + #[test] + fn sentinel_path_lives_alongside_playlist() { + let video = Path::new("/var/video"); + let s = sentinel_for_hash(video, "abcdef0123456789"); + assert_eq!( + s, + PathBuf::from("/var/video/ab/abcdef0123456789/playlist.unsupported") + ); + } + + #[test] + fn segment_template_lives_alongside_playlist() { + let video = Path::new("/var/video"); + let t = segment_template_for_hash(video, "abcdef0123456789"); + assert_eq!( + t, + PathBuf::from("/var/video/ab/abcdef0123456789/segment_%03d.ts") + ); + } + + #[test] + fn distinct_hashes_yield_distinct_dirs() { + let video = Path::new("/var/video"); + let a = playlist_for_hash(video, "1111aaaa"); + let b = playlist_for_hash(video, "2222bbbb"); + assert_ne!(a.parent(), b.parent()); + } +} diff --git a/src/video/mod.rs b/src/video/mod.rs index 8078a1f..e7cfa67 100644 --- a/src/video/mod.rs +++ b/src/video/mod.rs @@ -9,6 +9,7 @@ use 
walkdir::WalkDir; pub mod actors; pub mod ffmpeg; +pub mod hls_paths; #[allow(dead_code)] pub async fn generate_video_gifs() { -- 2.49.1 From d1667099c357650cd83652c23e4a7e02275e129a Mon Sep 17 00:00:00 2001 From: Cameron Cordes Date: Thu, 14 May 2026 15:36:01 -0400 Subject: [PATCH 02/10] hls: rewire queue + generator to write hash-keyed playlists MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Switches the watcher → VideoPlaylistManager → PlaylistGenerator path from the basename-keyed layout (`$VIDEO_PATH/{basename}.m3u8`) to the hash-keyed layout (`$VIDEO_PATH/{hash[..2]}/{hash}/playlist.m3u8`) introduced in the prior commit. Source videos that share a basename across libraries (or across subdirs of one library) no longer overwrite each other's playlists. The legacy HTTP endpoints `/video/generate` / `/video/stream` still use the basename layout — those move in a follow-up commit alongside the stable streaming URL. actors.rs: - `QueueVideosMessage.video_paths: Vec<PathBuf>` → `videos: Vec<VideoToQueue>`. The queue handler dedups against the hash-keyed playlist + sentinel and forwards `GeneratePlaylistMessage` carrying the hash. - `GeneratePlaylistMessage` now carries `content_hash: String`; the legacy `playlist_path: String` field is gone. - `PlaylistGenerator` takes a `video_dir: PathBuf` at construction, computes the hash dir + playlist + sentinel + segment template via `hls_paths`, `mkdir -p`s the shard/hash dir before ffmpeg runs, and cleans up partial output on failure by walking the hash dir. - `ScanDirectoryMessage` and its handler are retired entirely; their startup-walk role is taken over by the watcher's first tick (see `watcher.rs` below). Dropping it avoids threading an `ExifDao` into `VideoPlaylistManager` just so the actor can resolve hashes. - Legacy `playlist_file_for` / `playlist_unsupported_sentinel` are retained behind `#[allow(dead_code)]` for the upcoming migration pass that retires pre-content-hash output. 
watcher.rs: - `process_new_files` keeps `content_hash` in the EXIF-batch result (previously it was discarded). Videos with a NULL `image_exif.content_hash` — mid-backfill rows — are skipped this tick rather than falling back to a basename-colliding playlist; they get picked up after `backfill_unhashed_backlog` populates the hash on a subsequent tick. The skipped count is logged at debug level. - The video staleness check now uses `hls_paths::playlist_for_hash` instead of `$VIDEO_PATH/{basename}.m3u8`. - `last_full_scan` initialises to `UNIX_EPOCH` so the watcher's first tick is treated as a full scan. That covers the catch-up gap left by removing `ScanDirectoryMessage` — every library's existing media is checked once at watcher boot (≈60s after startup) instead of waiting up to `WATCH_FULL_INTERVAL_SECONDS` (1h default). main.rs: removes the `ScanDirectoryMessage` import and the per-library `do_send` loop, with a comment pointing at the watcher's first-tick behavior. state.rs: `PlaylistGenerator::new` now takes the video dir. Tests: existing `video::hls_paths` (4) and `watcher::tests` (4) pass. The basename-keyed `/video/generate` endpoint still compiles and serves; behavior change there is deferred to the follow-up commit. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- src/main.rs | 12 +-- src/state.rs | 2 +- src/video/actors.rs | 256 ++++++++++++++++++-------------------------- src/watcher.rs | 69 ++++++++---- 4 files changed, 161 insertions(+), 178 deletions(-) diff --git a/src/main.rs b/src/main.rs index 30be9dc..ec5b3d0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -26,7 +26,6 @@ use crate::files::{RealFileSystem, move_file}; use crate::service::ServiceBuilder; use crate::state::AppState; use crate::tags::*; -use crate::video::actors::ScanDirectoryMessage; use log::{error, info}; mod ai; @@ -119,13 +118,12 @@ fn main() -> std::io::Result<()> { .unwrap(); let app_state = app_data.clone(); - for lib in &app_state.libraries { - app_state.playlist_manager.do_send(ScanDirectoryMessage { - directory: lib.root_path.clone(), - }); - } - // Start file watcher with playlist manager and preview generator + // Start file watcher with playlist manager and preview generator. + // The watcher's first tick is configured to be a full scan (see + // `watch_files`), so every library's missing HLS playlists are + // queued on that first iteration — no separate startup walk + // needed. 
let playlist_mgr_for_watcher = app_state.playlist_manager.as_ref().clone(); let preview_gen_for_watcher = app_state.preview_clip_generator.as_ref().clone(); // Both background jobs read from the shared `live_libraries` lock diff --git a/src/state.rs b/src/state.rs index b147c77..fd39cba 100644 --- a/src/state.rs +++ b/src/state.rs @@ -111,7 +111,7 @@ impl AppState { "AppState::new requires at least one library" ); let base_path = libraries_vec[0].root_path.clone(); - let playlist_generator = PlaylistGenerator::new(); + let playlist_generator = PlaylistGenerator::new(video_path.clone()); let video_playlist_manager = VideoPlaylistManager::new(video_path.clone(), playlist_generator.start()); diff --git a/src/video/actors.rs b/src/video/actors.rs index 4d7b90a..d0ae04f 100644 --- a/src/video/actors.rs +++ b/src/video/actors.rs @@ -1,8 +1,9 @@ +use crate::content_hash; use crate::database::PreviewDao; use crate::libraries::Library; use crate::otel::global_tracer; -use crate::thumbnails::is_video; use crate::video::ffmpeg::{generate_preview_clip, get_duration_seconds_blocking}; +use crate::video::hls_paths; use actix::prelude::*; use log::{debug, error, info, trace, warn}; use opentelemetry::KeyValue; @@ -12,7 +13,6 @@ use std::path::{Path, PathBuf}; use std::process::{Child, Command, ExitStatus, Stdio}; use std::sync::{Arc, Mutex}; use tokio::sync::Semaphore; -use walkdir::{DirEntry, WalkDir}; // ffmpeg -i test.mp4 -c:v h264 -flags +cgop -g 30 -hls_time 3 out.m3u8 // ffmpeg -i "filename.mp4" -preset veryfast -c:v libx264 -f hls -hls_list_size 100 -hls_time 2 -crf 24 -vf scale=1080:-2,setsar=1:1 attempt/vid_out.m3u8 @@ -57,9 +57,11 @@ pub struct VideoToQueue { pub content_hash: String, } -/// Legacy basename-keyed playlist path. Used only by the one-shot startup -/// migration that retires pre-content-hash output; once cleared, all new -/// playlist writes go through [`hls_paths::playlist_for_hash`]. +/// Legacy basename-keyed playlist path. 
Retained for the one-shot startup +/// migration that retires pre-content-hash output; new playlist writes go +/// through [`hls_paths::playlist_for_hash`]. Will be removed once the +/// migration ships and runs to completion in production. +#[allow(dead_code)] pub fn playlist_file_for(playlist_dir: &str, video_path: &Path) -> PathBuf { let filename = video_path .file_name() @@ -70,6 +72,7 @@ pub fn playlist_file_for(playlist_dir: &str, video_path: &Path) -> PathBuf { /// Legacy basename-keyed sentinel path. Same migration-only contract as /// [`playlist_file_for`]. +#[allow(dead_code)] pub fn playlist_unsupported_sentinel(playlist_file: &Path) -> PathBuf { let mut s = playlist_file.as_os_str().to_owned(); s.push(".unsupported"); @@ -342,17 +345,17 @@ async fn get_max_gop_seconds(video_path: &str) -> Option { } pub struct VideoPlaylistManager { - playlist_dir: PathBuf, + video_dir: PathBuf, playlist_generator: Addr, } impl VideoPlaylistManager { pub fn new>( - playlist_dir: P, + video_dir: P, playlist_generator: Addr, ) -> Self { Self { - playlist_dir: playlist_dir.into(), + video_dir: video_dir.into(), playlist_generator, } } @@ -362,144 +365,68 @@ impl Actor for VideoPlaylistManager { type Context = Context; } -impl Handler for VideoPlaylistManager { - type Result = ResponseFuture<()>; - - fn handle(&mut self, msg: ScanDirectoryMessage, _ctx: &mut Self::Context) -> Self::Result { - let tracer = global_tracer(); - let mut span = tracer.start("videoplaylistmanager.scan_directory"); - - let start = std::time::Instant::now(); - info!( - "Starting scan directory for video playlist generation: {}", - msg.directory - ); - - let playlist_output_dir = self.playlist_dir.clone(); - let playlist_dir_str = playlist_output_dir.to_str().unwrap().to_string(); - - let video_files = WalkDir::new(&msg.directory) - .into_iter() - .filter_map(|e| e.ok()) - .filter(|e| e.file_type().is_file()) - .filter(is_video) - .filter(|e| { - let playlist = playlist_file_for(&playlist_dir_str, 
e.path()); - !playlist.exists() && !playlist_unsupported_sentinel(&playlist).exists() - }) - .collect::>(); - - let scan_dir_name = msg.directory.clone(); - let playlist_generator = self.playlist_generator.clone(); - - Box::pin(async move { - for e in video_files { - let path = e.path(); - let path_as_str = path.to_str().unwrap(); - debug!( - "Sending generate playlist message for path: {}", - path_as_str - ); - - match playlist_generator - .send(GeneratePlaylistMessage { - playlist_path: playlist_output_dir.to_str().unwrap().to_string(), - video_path: PathBuf::from(path), - }) - .await - .expect("Failed to send generate playlist message") - { - Ok(_) => { - span.add_event( - "Playlist generated", - vec![KeyValue::new("video_path", path_as_str.to_string())], - ); - - debug!( - "Successfully generated playlist for file: '{}'", - path_as_str - ); - } - Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => { - debug!("Playlist already exists for '{:?}', skipping", path); - } - Err(e) => { - warn!("Failed to generate playlist for path '{:?}'. 
{:?}", path, e); - } - } - } - - span.add_event( - "Finished directory scan", - vec![KeyValue::new("directory", scan_dir_name.to_string())], - ); - info!( - "Finished directory scan of '{}' in {:?}", - scan_dir_name, - start.elapsed() - ); - }) - } -} - impl Handler for VideoPlaylistManager { type Result = (); fn handle(&mut self, msg: QueueVideosMessage, _ctx: &mut Self::Context) -> Self::Result { - if msg.video_paths.is_empty() { + if msg.videos.is_empty() { return; } - info!( - "Queueing {} videos for HLS playlist generation", - msg.video_paths.len() - ); - - let playlist_output_dir = self.playlist_dir.clone(); - let playlist_dir_str = playlist_output_dir.to_str().unwrap().to_string(); + let video_dir = self.video_dir.clone(); let playlist_generator = self.playlist_generator.clone(); - for video_path in msg.video_paths { - let playlist = playlist_file_for(&playlist_dir_str, &video_path); - if playlist.exists() || playlist_unsupported_sentinel(&playlist).exists() { + let mut queued = 0usize; + let mut already_present = 0usize; + for VideoToQueue { + video_path, + content_hash, + } in msg.videos + { + let playlist = hls_paths::playlist_for_hash(&video_dir, &content_hash); + let sentinel = hls_paths::sentinel_for_hash(&video_dir, &content_hash); + if playlist.exists() || sentinel.exists() { + already_present += 1; continue; } - let path_str = video_path.to_string_lossy().to_string(); - debug!("Queueing playlist generation for: {}", path_str); - + debug!( + "Queueing playlist generation for {} (hash={})", + video_path.display(), + short_hash(&content_hash) + ); playlist_generator.do_send(GeneratePlaylistMessage { - playlist_path: playlist_dir_str.clone(), video_path, + content_hash, }); + queued += 1; } + info!( + "Queue tick: {} queued, {} skipped (playlist or sentinel already on disk)", + queued, already_present + ); } } -#[derive(Message)] -#[rtype(result = "()")] -pub struct ScanDirectoryMessage { - pub(crate) directory: String, -} - #[derive(Message)] 
#[rtype(result = "()")] pub struct QueueVideosMessage { - pub video_paths: Vec, + pub videos: Vec, } #[derive(Message)] #[rtype(result = "Result<()>")] pub struct GeneratePlaylistMessage { pub video_path: PathBuf, - pub playlist_path: String, + pub content_hash: String, } pub struct PlaylistGenerator { semaphore: Arc, + video_dir: PathBuf, } impl PlaylistGenerator { - pub(crate) fn new() -> Self { + pub(crate) fn new>(video_dir: P) -> Self { // Concurrency is tunable via HLS_CONCURRENCY so operators can dial // it to their hardware: 1 on weak Synology boxes to avoid thermal // throttling, higher on desktops with spare cores. @@ -511,6 +438,7 @@ impl PlaylistGenerator { info!("PlaylistGenerator: concurrency={}", concurrency); PlaylistGenerator { semaphore: Arc::new(Semaphore::new(concurrency)), + video_dir: video_dir.into(), } } } @@ -524,20 +452,23 @@ impl Handler for PlaylistGenerator { fn handle(&mut self, msg: GeneratePlaylistMessage, _ctx: &mut Self::Context) -> Self::Result { let video_file = msg.video_path.to_str().unwrap().to_owned(); - let playlist_path = msg.playlist_path.as_str().to_owned(); + let content_hash_str = msg.content_hash.clone(); let semaphore = self.semaphore.clone(); + let video_dir = self.video_dir.clone(); - let playlist_file = format!( - "{}/{}.m3u8", - playlist_path, - msg.video_path.file_name().unwrap().to_str().unwrap() - ); + let hash_dir = content_hash::hls_dir(&video_dir, &content_hash_str); + let playlist_path = hls_paths::playlist_for_hash(&video_dir, &content_hash_str); + let sentinel_path = hls_paths::sentinel_for_hash(&video_dir, &content_hash_str); + let segment_template = hls_paths::segment_template_for_hash(&video_dir, &content_hash_str); + let playlist_file = playlist_path.to_string_lossy().to_string(); + let segment_pattern = segment_template.to_string_lossy().to_string(); let tracer = global_tracer(); let mut span = tracer .span_builder("playlistgenerator.generate_playlist") .with_attributes(vec![ 
KeyValue::new("video_file", video_file.clone()), + KeyValue::new("content_hash", content_hash_str.clone()), KeyValue::new("playlist_file", playlist_file.clone()), ]) .start(&tracer); @@ -561,7 +492,7 @@ impl Handler for PlaylistGenerator { )], ); - if Path::new(&playlist_file).exists() { + if playlist_path.exists() { debug!("Playlist already exists: {}", playlist_file); span.set_status(Status::error(format!( "Playlist already exists: {}", @@ -570,6 +501,19 @@ impl Handler for PlaylistGenerator { return Err(std::io::Error::from(std::io::ErrorKind::AlreadyExists)); } + // Ensure the shard + hash directory exist. Idempotent — the + // dir may already be present from a prior attempt that wrote + // a sentinel before being cleared for retry. + if let Err(e) = tokio::fs::create_dir_all(&hash_dir).await { + error!( + "Failed to create HLS hash dir {}: {}", + hash_dir.display(), + e + ); + span.set_status(Status::error(format!("mkdir failed: {}", e))); + return Err(e); + } + // One ffprobe call for codec + rotation metadata. let stream_meta = probe_video_stream_meta(&video_file).await; let is_h264 = stream_meta.is_h264; @@ -630,16 +574,11 @@ impl Handler for PlaylistGenerator { span.add_event("Transcoding to h264", vec![]); } - // Encode to a .tmp playlist and explicit segment names so a failed - // encode leaves predictable artifacts we can clean up — and so a - // concurrent scan doesn't see a half-written .m3u8 as "done". + // Encode to a .tmp playlist alongside the final inside the + // hash dir, so a concurrent scan never sees a half-written + // .m3u8 as "done". Segments use the hash-keyed template; + // ffmpeg writes them next to the playlist (relative refs). 
let playlist_tmp = format!("{}.tmp", playlist_file); - let video_stem = msg - .video_path - .file_name() - .and_then(|n| n.to_str()) - .unwrap_or("video"); - let segment_pattern = format!("{}/{}_%03d.ts", playlist_path, video_stem); let mut cmd = tokio::process::Command::new("ffmpeg"); cmd.arg("-y").arg("-i").arg(&video_file); @@ -728,12 +667,12 @@ impl Handler for PlaylistGenerator { let success = matches!(&ffmpeg_result, Ok(out) if out.status.success()); if success { - if let Err(e) = tokio::fs::rename(&playlist_tmp, &playlist_file).await { + if let Err(e) = tokio::fs::rename(&playlist_tmp, &playlist_path).await { error!( "ffmpeg succeeded but rename {} -> {} failed: {}", playlist_tmp, playlist_file, e ); - cleanup_partial_hls(&playlist_tmp, playlist_path.as_str(), video_stem).await; + cleanup_partial_hls(&hash_dir).await; span.set_status(Status::error(format!("rename failed: {}", e))); return Err(e); } @@ -750,18 +689,17 @@ impl Handler for PlaylistGenerator { Err(e) => format!("ffmpeg failed: {}", e), }; error!("ffmpeg failed for {}: {}", video_file, detail); - cleanup_partial_hls(&playlist_tmp, playlist_path.as_str(), video_stem).await; - let sentinel = playlist_unsupported_sentinel(Path::new(&playlist_file)); - if let Err(se) = tokio::fs::write(&sentinel, b"").await { + cleanup_partial_hls(&hash_dir).await; + if let Err(se) = tokio::fs::write(&sentinel_path, b"").await { warn!( "Failed to write playlist sentinel {}: {}", - sentinel.display(), + sentinel_path.display(), se ); } else { info!( "Wrote playlist sentinel {} so future scans skip {}", - sentinel.display(), + sentinel_path.display(), video_file ); } @@ -772,29 +710,47 @@ impl Handler for PlaylistGenerator { } } -/// Delete the temp playlist and any segment files that ffmpeg may have written -/// before failing. Called both on ffmpeg error and on rename failure so a -/// retry on the next scan starts from a clean slate. 
-async fn cleanup_partial_hls(playlist_tmp: &str, playlist_dir: &str, video_stem: &str) { - let _ = tokio::fs::remove_file(playlist_tmp).await; - - let segment_prefix = format!("{}_", video_stem); - let Ok(mut entries) = tokio::fs::read_dir(playlist_dir).await else { +/// Delete the partial playlist (.tmp) and any segment files left behind by +/// a failed ffmpeg run. Wipes every non-sentinel file in the hash dir; +/// retains the sentinel if one has already been written by an earlier +/// caller in the same path (today there is none, but kept defensively so +/// the function is safe to call after sentinel write too). +async fn cleanup_partial_hls(hash_dir: &Path) { + let Ok(mut entries) = tokio::fs::read_dir(hash_dir).await else { return; }; while let Ok(Some(entry)) = entries.next_entry().await { - let Some(name) = entry.file_name().to_str().map(str::to_owned) else { + let path = entry.path(); + let is_sentinel = path + .file_name() + .and_then(|n| n.to_str()) + .map(|n| n == hls_paths::UNSUPPORTED_SENTINEL_FILENAME) + .unwrap_or(false); + if is_sentinel { continue; - }; - if name.starts_with(&segment_prefix) - && name.ends_with(".ts") - && let Err(e) = tokio::fs::remove_file(entry.path()).await - { - warn!("Failed to remove partial segment {}: {}", name, e); + } + if let Err(e) = tokio::fs::remove_file(&path).await { + warn!( + "Failed to remove partial HLS file {}: {}", + path.display(), + e + ); } } } +/// First 16 chars of a content hash for log lines. Short enough to keep +/// log volume sane, long enough that distinct hashes don't collide in +/// practice. 
+fn short_hash(hash: &str) -> &str { + let end = hash + .char_indices() + .nth(16) + .map(|(i, _)| i) + .unwrap_or(hash.len()); + &hash[..end] +} + #[derive(Message)] #[rtype(result = "()")] pub struct GeneratePreviewClipMessage { diff --git a/src/watcher.rs b/src/watcher.rs index ca56ea6..c8de1ed 100644 --- a/src/watcher.rs +++ b/src/watcher.rs @@ -40,7 +40,10 @@ use crate::tags; use crate::tags::SqliteTagDao; use crate::thumbnails; use crate::video; -use crate::video::actors::{GeneratePreviewClipMessage, QueueVideosMessage, VideoPlaylistManager}; +use crate::video::actors::{ + GeneratePreviewClipMessage, QueueVideosMessage, VideoPlaylistManager, VideoToQueue, +}; +use crate::video::hls_paths; /// Clean up orphaned HLS playlists and segments whose source videos no longer exist. /// @@ -288,7 +291,12 @@ pub fn watch_files( )); let mut last_quick_scan = SystemTime::now(); - let mut last_full_scan = SystemTime::now(); + // Initialize to UNIX_EPOCH so the *first* tick is treated as a + // full scan. That replaces the legacy startup ScanDirectoryMessage + // walk for HLS playlists: every library's existing media gets + // checked once at watcher boot, instead of waiting up to + // full_interval_secs (1h default) for the first natural full scan. + let mut last_full_scan = SystemTime::UNIX_EPOCH; let mut scan_count = 0u64; // Per-library cursor for the missing-file scan. Each tick reads @@ -600,14 +608,18 @@ pub fn process_new_files( // Batch query: Get all EXIF data for these files in one query let file_paths: Vec = files.iter().map(|(_, rel_path)| rel_path.clone()).collect(); - let existing_exif_paths: HashMap = { + // Map of rel_path -> Option. The presence of the key + // tells us "row exists"; the Option value carries the hash for the + // HLS pipeline so video files without a hash (mid-backfill) skip + // this tick rather than fall back to a basename-colliding playlist. 
+ let existing_exif: HashMap> = { let mut dao = exif_dao.lock().expect("Unable to lock ExifDao"); // Walk is per-library, so scope the lookup so a same-named file // in another library doesn't make this one look already-indexed. match dao.get_exif_batch(&context, Some(library.id), &file_paths) { Ok(exif_records) => exif_records .into_iter() - .map(|record| (record.file_path, true)) + .map(|record| (record.file_path, record.content_hash)) .collect(), Err(e) => { error!("Error batch querying EXIF data: {:?}", e); @@ -637,7 +649,7 @@ pub fn process_new_files( && !bare_legacy_thumb_path.exists() && !thumbnails::unsupported_thumbnail_sentinel(&scoped_thumb_path).exists() && !thumbnails::unsupported_thumbnail_sentinel(&bare_legacy_thumb_path).exists(); - let needs_row = !existing_exif_paths.contains_key(relative_path); + let needs_row = !existing_exif.contains_key(relative_path); if needs_thumbnail || needs_row { new_files_found = true; @@ -796,28 +808,45 @@ pub fn process_new_files( } } - // Check for videos that need HLS playlists + // Check for videos that need HLS playlists. All output is keyed on + // `content_hash` (see `crate::video::hls_paths`), so files whose + // `image_exif.content_hash` is still NULL — typically mid-backfill — + // are skipped this tick and picked up after the unhashed backlog + // drain populates the hash on a subsequent tick. Skipping is the + // correct call: queuing without a hash would either fall back to + // basename keying (the bug this refactor fixes) or fabricate one. 
let video_path_base = dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set"); - let mut videos_needing_playlists = Vec::new(); + let video_dir = Path::new(&video_path_base); + let mut videos_needing_playlists: Vec = Vec::new(); + let mut hashless_video_count = 0usize; - for (file_path, _relative_path) in &files { - if file_types::is_video_file(file_path) { - // Construct expected playlist path - let playlist_filename = - format!("{}.m3u8", file_path.file_name().unwrap().to_string_lossy()); - let playlist_path = Path::new(&video_path_base).join(&playlist_filename); - - // Check if playlist needs (re)generation - if playlist_needs_generation(file_path, &playlist_path) { - videos_needing_playlists.push(file_path.clone()); - } + for (file_path, relative_path) in &files { + if !file_types::is_video_file(file_path) { + continue; + } + let Some(hash) = existing_exif.get(relative_path).and_then(|h| h.clone()) else { + hashless_video_count += 1; + continue; + }; + let playlist_path = hls_paths::playlist_for_hash(video_dir, &hash); + if playlist_needs_generation(file_path, &playlist_path) { + videos_needing_playlists.push(VideoToQueue { + video_path: file_path.clone(), + content_hash: hash, + }); } } - // Send queue request to playlist manager + if hashless_video_count > 0 { + debug!( + "Watcher tick for '{}': skipped {} video(s) with NULL content_hash (will retry after backfill)", + library.name, hashless_video_count + ); + } + if !videos_needing_playlists.is_empty() { playlist_manager.do_send(QueueVideosMessage { - video_paths: videos_needing_playlists, + videos: videos_needing_playlists, }); } -- 2.49.1 From b8e17e05b7af273a883629081068f9906814a11f Mon Sep 17 00:00:00 2001 From: Cameron Cordes Date: Thu, 14 May 2026 15:41:04 -0400 Subject: [PATCH 03/10] hls: rewrite orphan cleanup for hash-keyed layout MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The cleanup walk previously looked for `$VIDEO_PATH/.m3u8` and matched each 
file's stem against a recursive walk of every library. With the hash-keyed layout now in place, every playlist's file_stem is the literal string "playlist" — the old logic would treat every hash-keyed playlist as orphaned on its next run and wipe them all in one tick (default cleanup interval is 24h, so this is a 24-hour bomb on top of the prior commit). New approach: orphan-ness is decided in the database, not on the filesystem. The cleanup loop: - Snapshots every distinct non-NULL `image_exif.content_hash` into a HashSet (new `ExifDao::list_distinct_content_hashes` method — `SELECT DISTINCT content_hash WHERE content_hash IS NOT NULL`). - Walks `$VIDEO_PATH` two levels deep: top-level entries are filtered to 2-char lowercase hex shard dirs, each shard's children to 64-char hex hash dirs. Anything else (legacy `.m3u8` at root from the pre-content-hash era, operator-stashed dirs, partial writes) is left alone. - Hash dirs whose hash isn't in the alive set are `remove_dir_all`'d. Shard dirs that emptied as a result are reaped on the same pass via `remove_dir` (no-op if non-empty). - The library-stale safety gate is preserved: a stale library skips the cycle even though the orphan decision is DB-only, because the upstream missing-file scan that retires `image_exif` rows itself pauses for stale libraries. Belt-and-suspenders — keeping a hash dir for one extra 24h cycle is cheaper than wiping one whose source was briefly unreachable. The gate now also filters disabled libraries out of the stale set (they're intentionally absent from the health map). - The legacy `excluded_dirs` parameter is preserved on the function signature but unused (the walk no longer crosses library trees); flagged with a leading underscore. Callers in `main.rs` stay unchanged. 
`MockExifDao` in `files.rs` grows the new method (returns empty); unit tests for the new `is_hash_shard` / `is_full_hash` validators guard against an operator's stashed directory under VIDEO_PATH ever matching the orphan-rm path. Both pass. A follow-up commit handles the one-shot startup migration that retires the legacy basename-keyed `.m3u8` / `.ts` files at `$VIDEO_PATH` root. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/database/mod.rs | 30 +++++ src/files.rs | 7 ++ src/watcher.rs | 298 +++++++++++++++++++++++++++----------------- 3 files changed, 220 insertions(+), 115 deletions(-) diff --git a/src/database/mod.rs b/src/database/mod.rs index cf20ee9..d5a4d4a 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -414,6 +414,16 @@ pub trait ExifDao: Sync + Send { size_bytes: i64, ) -> Result<(), DbError>; + /// Every distinct non-NULL `content_hash` across all libraries. Used + /// by HLS orphan cleanup to identify hash dirs under `$VIDEO_PATH` + /// whose source video no longer exists. Cheap query (single column, + /// indexed) but unbounded in size — the result is a HashSet membership + /// check, so a 100k-photo library produces ~100k strings. + fn list_distinct_content_hashes( + &mut self, + context: &opentelemetry::Context, + ) -> Result, DbError>; + /// Return image_exif rows that need their `date_taken` resolved by the /// canonical-date waterfall (see `crate::date_resolver`): `date_taken /// IS NULL`. Returns `(library_id, rel_path)`. 
The caller filters to @@ -1231,6 +1241,26 @@ impl ExifDao for SqliteExifDao { .map_err(|_| DbError::new(DbErrorKind::UpdateError)) } + fn list_distinct_content_hashes( + &mut self, + context: &opentelemetry::Context, + ) -> Result, DbError> { + trace_db_call(context, "query", "list_distinct_content_hashes", |_span| { + use schema::image_exif::dsl::*; + + let mut connection = self.connection.lock().expect("Unable to get ExifDao"); + + image_exif + .filter(content_hash.is_not_null()) + .select(content_hash) + .distinct() + .load::>(connection.deref_mut()) + .map(|rows| rows.into_iter().flatten().collect()) + .map_err(|_| anyhow::anyhow!("Query error")) + }) + .map_err(|_| DbError::new(DbErrorKind::QueryError)) + } + fn get_rows_needing_date_backfill( &mut self, context: &opentelemetry::Context, diff --git a/src/files.rs b/src/files.rs index b7b035c..93d4345 100644 --- a/src/files.rs +++ b/src/files.rs @@ -1689,6 +1689,13 @@ mod tests { Ok(()) } + fn list_distinct_content_hashes( + &mut self, + _context: &opentelemetry::Context, + ) -> Result, DbError> { + Ok(Vec::new()) + } + fn get_rows_needing_date_backfill( &mut self, _context: &opentelemetry::Context, diff --git a/src/watcher.rs b/src/watcher.rs index c8de1ed..6182ddc 100644 --- a/src/watcher.rs +++ b/src/watcher.rs @@ -22,7 +22,6 @@ use std::time::{Duration, SystemTime}; use actix::Addr; use chrono::Utc; use log::{debug, error, info, warn}; -use walkdir::WalkDir; use crate::backfill; use crate::content_hash; @@ -45,18 +44,29 @@ use crate::video::actors::{ }; use crate::video::hls_paths; -/// Clean up orphaned HLS playlists and segments whose source videos no longer exist. +/// Clean up orphaned HLS hash directories under `$VIDEO_PATH` whose +/// content_hash no longer appears in `image_exif`. +/// +/// Walks `///` — the layout written by the +/// hash-keyed `PlaylistGenerator` — and deletes any hash directory whose +/// hash isn't in the current DISTINCT set of `image_exif.content_hash` +/// values. 
Empty shard parents are reaped on the same pass. +/// +/// Legacy basename-keyed files at `$VIDEO_PATH` root (from the +/// pre-content-hash layout) are left alone here; the one-shot startup +/// migration is responsible for retiring those. /// /// `libs_lock` is the shared live view of the libraries table — read at the /// top of each cleanup pass so a PATCH /libraries/{id} that disables or /// re-mounts a library is picked up without a restart. pub fn cleanup_orphaned_playlists( libs_lock: Arc>>, - excluded_dirs: Vec, + _excluded_dirs: Vec, library_health: libraries::LibraryHealthMap, ) { std::thread::spawn(move || { - let video_path = dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set"); + let video_path_str = dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set"); + let video_path = PathBuf::from(&video_path_str); // Get cleanup interval from environment (default: 24 hours) let cleanup_interval_secs = dotenv::var("PLAYLIST_CLEANUP_INTERVAL_SECONDS") @@ -64,18 +74,13 @@ pub fn cleanup_orphaned_playlists( .and_then(|s| s.parse::().ok()) .unwrap_or(86400); // 24 hours - info!("Starting orphaned playlist cleanup job"); + info!("Starting orphaned HLS cleanup job"); info!(" Cleanup interval: {} seconds", cleanup_interval_secs); - info!(" Playlist directory: {}", video_path); - { - let libs = libs_lock.read().unwrap_or_else(|e| e.into_inner()); - for lib in libs.iter() { - info!( - " Checking sources under '{}' at {}", - lib.name, lib.root_path - ); - } - } + info!(" HLS directory: {}", video_path.display()); + + let exif_dao: Arc>> = Arc::new(Mutex::new( + Box::new(SqliteExifDao::new()) as Box + )); loop { std::thread::sleep(Duration::from_secs(cleanup_interval_secs)); @@ -86,22 +91,27 @@ pub fn cleanup_orphaned_playlists( let libs: Vec = libs_lock.read().unwrap_or_else(|e| e.into_inner()).clone(); - // Safety gate: skip the cleanup cycle if any library is - // stale. 
A missing source video on a stale library is - // indistinguishable from a transient unmount, and the - // cleanup is destructive — we'd rather leak a few playlist - // files for a tick than delete one whose source is briefly - // unreachable. The cycle re-runs on the next interval. + // Safety gate: skip the cleanup cycle if any (enabled) + // library is stale. With hash-keyed layout the orphan + // decision is a pure DB query, but the upstream + // missing-file scan that *removes* image_exif rows already + // pauses for stale libraries — so a stale tick can hold + // hashes alive that would otherwise have been GC'd. The + // safety is then mostly belt-and-suspenders: a hash that + // should have been retired is just kept one tick longer. + // We'd rather leak a few hash dirs for 24h than wipe a + // hash dir whose source was briefly unreachable. { let guard = library_health.read().unwrap_or_else(|e| e.into_inner()); let stale: Vec = libs .iter() + .filter(|lib| lib.enabled) .filter(|lib| guard.get(&lib.id).map(|h| !h.is_online()).unwrap_or(false)) .map(|lib| lib.name.clone()) .collect(); if !stale.is_empty() { warn!( - "Skipping orphaned-playlist cleanup: {} library(ies) stale: [{}]", + "Skipping orphaned-HLS cleanup: {} library(ies) stale: [{}]", stale.len(), stale.join(", ") ); @@ -109,116 +119,135 @@ pub fn cleanup_orphaned_playlists( } } - info!("Running orphaned playlist cleanup"); + info!("Running orphaned HLS cleanup"); let start = std::time::Instant::now(); - let mut deleted_count = 0; - let mut error_count = 0; - // Find all .m3u8 files in VIDEO_PATH - let playlists: Vec = WalkDir::new(&video_path) - .into_iter() - .filter_map(|e| e.ok()) - .filter(|e| e.file_type().is_file()) - .filter(|e| { - e.path() - .extension() - .and_then(|s| s.to_str()) - .map(|ext| ext.eq_ignore_ascii_case("m3u8")) + // Snapshot every live content_hash currently in image_exif. 
+ // We intentionally don't filter by library here — a hash that + // lives in any library is alive, even if the library a given + // download attributed it to has since been disabled. + let alive_hashes: HashSet = { + let context = opentelemetry::Context::new(); + let mut dao = exif_dao.lock().expect("Unable to lock ExifDao"); + match dao.list_distinct_content_hashes(&context) { + Ok(hashes) => hashes.into_iter().collect(), + Err(e) => { + error!( + "Failed to load distinct content hashes; skipping HLS cleanup: {:?}", + e + ); + continue; + } + } + }; + + let mut deleted_count = 0usize; + let mut error_count = 0usize; + let mut inspected = 0usize; + + // Walk top-level entries of VIDEO_PATH. Each is either a + // legacy basename-keyed `.m3u8` / `.ts` (skip — migration + // owns those) or a 2-char shard directory. + let read_root = match std::fs::read_dir(&video_path) { + Ok(r) => r, + Err(e) => { + error!( + "HLS cleanup: failed to read VIDEO_PATH {}: {}", + video_path.display(), + e + ); + continue; + } + }; + + for shard_entry in read_root.flatten() { + let shard_path = shard_entry.path(); + if !shard_entry + .file_type() + .map(|t| t.is_dir()) + .unwrap_or(false) + { + continue; + } + let shard_name = match shard_path.file_name().and_then(|n| n.to_str()) { + Some(n) => n.to_owned(), + None => continue, + }; + if !is_hash_shard(&shard_name) { + continue; + } + + // Hash dirs inside this shard. 
+ let read_shard = match std::fs::read_dir(&shard_path) { + Ok(r) => r, + Err(e) => { + warn!( + "HLS cleanup: failed to read shard {}: {}", + shard_path.display(), + e + ); + continue; + } + }; + + let mut shard_emptied = true; + for hash_entry in read_shard.flatten() { + let hash_path = hash_entry.path(); + if !hash_entry + .file_type() + .map(|t| t.is_dir()) .unwrap_or(false) - }) - .map(|e| e.path().to_path_buf()) - .collect(); + { + shard_emptied = false; + continue; + } + let Some(hash_name) = + hash_path.file_name().and_then(|n| n.to_str()).map(|n| n.to_owned()) + else { + shard_emptied = false; + continue; + }; + if !is_full_hash(&hash_name) { + shard_emptied = false; + continue; + } + inspected += 1; - info!("Found {} playlist files to check", playlists.len()); - - for playlist_path in playlists { - // Extract the original video filename from playlist name - // Playlist format: {VIDEO_PATH}/{original_filename}.m3u8 - if let Some(filename) = playlist_path.file_stem() { - let video_filename = filename.to_string_lossy(); - - // Search for this video file across every configured - // library, respecting EXCLUDED_DIRS so we don't - // false-resurrect playlists for videos that only - // exist inside an excluded subtree. As soon as one - // library has a matching source, we're done — the - // playlist isn't orphaned. 
- let mut video_exists = false; - 'libs: for lib in &libs { - let effective = lib.effective_excluded_dirs(&excluded_dirs); - for entry in image_api::file_scan::walk_library_files( - Path::new(&lib.root_path), - &effective, - ) { - if let Some(entry_stem) = entry.path().file_stem() - && entry_stem == filename - && file_types::is_video_file(entry.path()) - { - video_exists = true; - break 'libs; - } - } + if alive_hashes.contains(&hash_name) { + shard_emptied = false; + continue; } - if !video_exists { - debug!( - "Source video for playlist {} no longer exists, deleting", - playlist_path.display() - ); - - // Delete the playlist file - if let Err(e) = std::fs::remove_file(&playlist_path) { + debug!( + "HLS cleanup: removing orphan hash dir {}", + hash_path.display() + ); + match std::fs::remove_dir_all(&hash_path) { + Ok(()) => deleted_count += 1, + Err(e) => { warn!( - "Failed to delete playlist {}: {}", - playlist_path.display(), + "Failed to delete orphan hash dir {}: {}", + hash_path.display(), e ); error_count += 1; - } else { - deleted_count += 1; - - // Also try to delete associated .ts segment files - // They are typically named {filename}N.ts in the same directory - if let Some(parent_dir) = playlist_path.parent() { - for entry in WalkDir::new(parent_dir) - .max_depth(1) - .into_iter() - .filter_map(|e| e.ok()) - .filter(|e| e.file_type().is_file()) - { - let entry_path = entry.path(); - if let Some(ext) = entry_path.extension() - && ext.eq_ignore_ascii_case("ts") - { - // Check if this .ts file belongs to our playlist - if let Some(ts_stem) = entry_path.file_stem() { - let ts_name = ts_stem.to_string_lossy(); - if ts_name.starts_with(&*video_filename) { - if let Err(e) = std::fs::remove_file(entry_path) { - debug!( - "Failed to delete segment {}: {}", - entry_path.display(), - e - ); - } else { - debug!( - "Deleted segment: {}", - entry_path.display() - ); - } - } - } - } - } - } + shard_emptied = false; } } } + + // If this shard now has no surviving hash 
dirs, reap + // the (empty) shard dir too. remove_dir fails if non- + // empty, which is the guard. + if shard_emptied { + let _ = std::fs::remove_dir(&shard_path); + } } info!( - "Orphaned playlist cleanup completed in {:?}: deleted {} playlists, {} errors", + "Orphaned HLS cleanup completed in {:?}: inspected {} hash dirs, deleted {} orphans, {} errors", start.elapsed(), + inspected, deleted_count, error_count ); @@ -226,6 +255,18 @@ pub fn cleanup_orphaned_playlists( }); } +/// True iff `s` is a two-character ASCII-hex shard prefix (either case is accepted). +fn is_hash_shard(s: &str) -> bool { + s.len() == 2 && s.bytes().all(|b| b.is_ascii_hexdigit()) +} + +/// True iff `s` looks like a full blake3 hex digest (64 hex chars). +/// Be strict so we don't accidentally rm a non-HLS directory operators +/// have stashed under VIDEO_PATH. +fn is_full_hash(s: &str) -> bool { + s.len() == 64 && s.bytes().all(|b| b.is_ascii_hexdigit()) +} + pub fn watch_files( libs_lock: Arc>>, playlist_manager: Addr, @@ -991,6 +1032,33 @@ mod tests { assert!(playlist_needs_generation(&video, &playlist)); } + #[test] + fn is_hash_shard_accepts_only_two_hex_chars() { + assert!(is_hash_shard("ab")); + assert!(is_hash_shard("00")); + assert!(is_hash_shard("FF")); // ASCII hexdigit covers upper-case too + assert!(!is_hash_shard("a")); + assert!(!is_hash_shard("abc")); + assert!(!is_hash_shard("zz")); + assert!(!is_hash_shard("")); + assert!(!is_hash_shard("a/")); + } + + #[test] + fn is_full_hash_accepts_only_64_hex_chars() { + let h64 = "a".repeat(64); + assert!(is_full_hash(&h64)); + let mixed = format!("ab{}", "0".repeat(62)); + assert!(is_full_hash(&mixed)); + assert!(!is_full_hash(&"a".repeat(63))); + assert!(!is_full_hash(&"a".repeat(65))); + assert!(!is_full_hash(&format!("z{}", "a".repeat(63)))); + // Defends against operator stashing e.g. ".tmp" or "Plex" under + // VIDEO_PATH — neither passes the full-hash gate. 
+ assert!(!is_full_hash(".tmp")); + assert!(!is_full_hash("Plex")); + } + #[test] fn playlist_needs_generation_true_when_video_missing_metadata() { // Video doesn't exist; metadata fails for it. Falls through to the -- 2.49.1 From 78fabc2b3249b62139afc98f78abe51dfb2c425d Mon Sep 17 00:00:00 2001 From: Cameron Cordes Date: Thu, 14 May 2026 15:43:13 -0400 Subject: [PATCH 04/10] hls: retire legacy basename-keyed HLS files on startup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds `video::legacy_migration::retire_legacy_hls_output`, called once from `main` right after the diesel migrations run and before the actor pipeline starts. Walks `$VIDEO_PATH` at depth 1, deletes every `.m3u8` / `.m3u8.tmp` / `.m3u8.unsupported` / `.ts` file at root, and logs a single info line with per-class counts. Skips directories (the new layout's `{shard}/{hash}/` lives there) and unknown extensions, so an operator's stashed README or `.tmp` from a different tool is safe. Why this needs its own one-shot pass rather than letting the rewritten `cleanup_orphaned_playlists` handle it: the cleanup walk deliberately only looks at `{shard}/{hash}/` dirs (so it can't accidentally `rm` operator-stashed content), so without this migration the legacy files would sit at root forever, never served, never refreshed. Operator complaint count from the previous IMG_NNNN.MOV collision: ~10 duplicate-basename hits on one library alone; total .m3u8 count was 699 vs a much larger video count — i.e. the loser of every collision was a permanent orphan. This pass collects all of them, then the running watcher writes hash-keyed playlists going forward. Idempotent — a second boot finds nothing and reports zero deletions, so the call site can stay in `main` across releases until the module is removed in a later cleanup commit. Tests cover the happy path (legacy artifacts gone, hash dir untouched, unrelated files left alone), idempotency, and the missing-directory case. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- src/main.rs | 10 ++ src/video/legacy_migration.rs | 246 ++++++++++++++++++++++++++++++++++ src/video/mod.rs | 1 + 3 files changed, 257 insertions(+) create mode 100644 src/video/legacy_migration.rs diff --git a/src/main.rs b/src/main.rs index ec5b3d0..023b57b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -72,6 +72,16 @@ fn main() -> std::io::Result<()> { run_migrations(&mut connect()).expect("Failed to run migrations"); + // One-shot retirement of the pre-content-hash HLS layout. Idempotent + // — a second boot finds nothing and reports zero deletions, so it's + // safe to leave wired in until the module is removed in a later + // release. Runs before the actor pipeline starts so we never race a + // PlaylistGenerator write against this rm. + { + let video_path = env::var("VIDEO_PATH").expect("VIDEO_PATH was not set in the env"); + video::legacy_migration::retire_legacy_hls_output(std::path::Path::new(&video_path)); + } + let system = actix::System::new(); system.block_on(async { // Just use basic logger when running a non-release build diff --git a/src/video/legacy_migration.rs b/src/video/legacy_migration.rs new file mode 100644 index 0000000..0db8e25 --- /dev/null +++ b/src/video/legacy_migration.rs @@ -0,0 +1,246 @@ +//! One-shot retirement of the pre-content-hash HLS output layout. +//! +//! Before the hash-keyed layout landed, the actor pipeline wrote every +//! playlist as `$VIDEO_PATH/{basename}.m3u8` with sibling +//! `{basename}_NNN.ts` segments and a `{basename}.m3u8.unsupported` +//! sentinel on ffmpeg failure. The new pipeline (see +//! [`crate::video::hls_paths`]) puts everything inside a hash-keyed +//! subdirectory, so the legacy flat files are orphaned the moment the +//! upgraded binary boots — they're not served, not refreshed, and not +//! GC'd by the new orphan cleanup (which deliberately ignores anything +//! that doesn't sit inside a `{shard}/{hash}/` dir). +//! +//! This migration runs once on startup. 
It walks `$VIDEO_PATH` at depth +//! 1, deletes every `.m3u8` / `.m3u8.tmp` / `.m3u8.unsupported` / `.ts` +//! file, and reports a single info line. It is idempotent — a second +//! run finds nothing and reports zero deletions, so it's safe to leave +//! wired in across releases until the codebase finally drops the +//! module. +//! +//! Sub-directories under `$VIDEO_PATH` are intentionally left alone: +//! every legitimate child of `$VIDEO_PATH` in the new layout is a +//! 2-char shard directory holding hash subdirs, and those are managed +//! by `cleanup_orphaned_playlists`. + +use std::path::Path; + +use log::{info, warn}; + +/// Counters for what the migration did this run. +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] +pub struct RetireStats { + pub deleted_playlists: usize, + pub deleted_segments: usize, + pub deleted_sentinels: usize, + pub deleted_tmp: usize, + pub errors: usize, +} + +impl RetireStats { + pub fn total_deleted(&self) -> usize { + self.deleted_playlists + + self.deleted_segments + + self.deleted_sentinels + + self.deleted_tmp + } +} + +/// Delete every legacy basename-keyed HLS artifact at the root of +/// `video_dir`. Hash dirs (children that are directories) are skipped. +/// Returns counts so the caller can log a single line summary. +pub fn retire_legacy_hls_output(video_dir: &Path) -> RetireStats { + let mut stats = RetireStats::default(); + + let read = match std::fs::read_dir(video_dir) { + Ok(r) => r, + Err(e) => { + warn!( + "Legacy HLS migration: cannot read {} ({}); skipping", + video_dir.display(), + e + ); + return stats; + } + }; + + for entry in read.flatten() { + let file_type = match entry.file_type() { + Ok(t) => t, + Err(_) => continue, + }; + if !file_type.is_file() { + // Hash shard directories live here in the new layout. 
+ continue; + } + let path = entry.path(); + let Some(name) = path.file_name().and_then(|n| n.to_str()) else { + continue; + }; + + let bucket = classify(name); + let Some(bucket) = bucket else { + continue; + }; + + match std::fs::remove_file(&path) { + Ok(()) => match bucket { + LegacyKind::Playlist => stats.deleted_playlists += 1, + LegacyKind::Segment => stats.deleted_segments += 1, + LegacyKind::Sentinel => stats.deleted_sentinels += 1, + LegacyKind::Tmp => stats.deleted_tmp += 1, + }, + Err(e) => { + warn!( + "Legacy HLS migration: failed to remove {}: {}", + path.display(), + e + ); + stats.errors += 1; + } + } + } + + if stats.total_deleted() > 0 || stats.errors > 0 { + info!( + "Legacy HLS migration: deleted {} playlist(s), {} segment(s), {} sentinel(s), {} tmp; {} error(s)", + stats.deleted_playlists, + stats.deleted_segments, + stats.deleted_sentinels, + stats.deleted_tmp, + stats.errors, + ); + } else { + info!( + "Legacy HLS migration: nothing to do under {}", + video_dir.display() + ); + } + + stats +} + +#[derive(Debug, Clone, Copy)] +enum LegacyKind { + Playlist, + Segment, + Sentinel, + Tmp, +} + +/// Decide whether a flat file at `$VIDEO_PATH` root is legacy HLS +/// output. Returns `None` for anything else — operator-stashed files, +/// new-layout files (which don't live here), etc. — so we don't rm them. +fn classify(name: &str) -> Option { + // Order matters: sentinel and tmp are more specific suffixes that + // sit on top of the .m3u8 / .ts extensions, so check them first. 
+ if name.ends_with(".m3u8.unsupported") { + return Some(LegacyKind::Sentinel); + } + if name.ends_with(".m3u8.tmp") { + return Some(LegacyKind::Tmp); + } + if name.ends_with(".m3u8") { + return Some(LegacyKind::Playlist); + } + if name.ends_with(".ts") { + return Some(LegacyKind::Segment); + } + None +} + +#[cfg(test)] +mod tests { + use super::*; + use std::fs; + use tempfile::tempdir; + + #[test] + fn classify_recognises_each_legacy_artifact() { + assert!(matches!( + classify("IMG_0341.MOV.m3u8"), + Some(LegacyKind::Playlist) + )); + assert!(matches!( + classify("IMG_0341.MOV_000.ts"), + Some(LegacyKind::Segment) + )); + assert!(matches!( + classify("IMG_0341.MOV.m3u8.unsupported"), + Some(LegacyKind::Sentinel) + )); + assert!(matches!( + classify("IMG_0341.MOV.m3u8.tmp"), + Some(LegacyKind::Tmp) + )); + + assert!(classify("README.md").is_none()); + assert!(classify("ab").is_none()); // shard dir name + assert!(classify(".keep").is_none()); + } + + #[test] + fn retire_deletes_legacy_and_leaves_hash_dirs() { + let tmp = tempdir().unwrap(); + let root = tmp.path(); + + // Legacy artifacts at root. + fs::write(root.join("IMG_0341.MOV.m3u8"), b"#EXTM3U").unwrap(); + fs::write(root.join("IMG_0341.MOV_000.ts"), b"\x00").unwrap(); + fs::write(root.join("IMG_0341.MOV_001.ts"), b"\x00").unwrap(); + fs::write(root.join("clip.MP4.m3u8.unsupported"), b"").unwrap(); + fs::write(root.join("partial.m3u8.tmp"), b"").unwrap(); + + // New-layout hash dir we must NOT touch. + let hash_dir = root.join("ab").join("a".repeat(64)); + fs::create_dir_all(&hash_dir).unwrap(); + fs::write(hash_dir.join("playlist.m3u8"), b"#EXTM3U").unwrap(); + fs::write(hash_dir.join("segment_000.ts"), b"\x00").unwrap(); + + // Unrelated file we must NOT touch. 
+ fs::write(root.join("README.md"), b"don't touch me").unwrap(); + + let stats = retire_legacy_hls_output(root); + assert_eq!(stats.deleted_playlists, 1); + assert_eq!(stats.deleted_segments, 2); + assert_eq!(stats.deleted_sentinels, 1); + assert_eq!(stats.deleted_tmp, 1); + assert_eq!(stats.errors, 0); + + // Legacy artifacts gone. + assert!(!root.join("IMG_0341.MOV.m3u8").exists()); + assert!(!root.join("IMG_0341.MOV_000.ts").exists()); + assert!(!root.join("clip.MP4.m3u8.unsupported").exists()); + assert!(!root.join("partial.m3u8.tmp").exists()); + // Hash dir untouched. + assert!(hash_dir.join("playlist.m3u8").exists()); + assert!(hash_dir.join("segment_000.ts").exists()); + // Unrelated file untouched. + assert!(root.join("README.md").exists()); + } + + #[test] + fn retire_is_idempotent() { + let tmp = tempdir().unwrap(); + let root = tmp.path(); + + fs::write(root.join("video.mp4.m3u8"), b"#EXTM3U").unwrap(); + fs::write(root.join("video.mp4_000.ts"), b"\x00").unwrap(); + + let first = retire_legacy_hls_output(root); + assert_eq!(first.deleted_playlists + first.deleted_segments, 2); + + let second = retire_legacy_hls_output(root); + assert_eq!(second.total_deleted(), 0); + assert_eq!(second.errors, 0); + } + + #[test] + fn retire_handles_missing_dir() { + // No panic, no error count blowing up — just a warn + zero stats. 
+ let tmp = tempdir().unwrap(); + let missing = tmp.path().join("does_not_exist"); + let stats = retire_legacy_hls_output(&missing); + assert_eq!(stats.total_deleted(), 0); + assert_eq!(stats.errors, 0); + } +} diff --git a/src/video/mod.rs b/src/video/mod.rs index e7cfa67..f28d302 100644 --- a/src/video/mod.rs +++ b/src/video/mod.rs @@ -10,6 +10,7 @@ use walkdir::WalkDir; pub mod actors; pub mod ffmpeg; pub mod hls_paths; +pub mod legacy_migration; #[allow(dead_code)] pub async fn generate_video_gifs() { -- 2.49.1 From 7c153596fe18b12790d9c0e86adf076e579e67a6 Mon Sep 17 00:00:00 2001 From: Cameron Cordes Date: Thu, 14 May 2026 15:51:01 -0400 Subject: [PATCH 05/10] hls: hash-keyed HTTP routes for /video/generate and serving MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `POST /video/generate` is reshaped to return a JSON object instead of a bare string. New fields: - `playlist_url`: stable hash-keyed URL of the form `/video/hls/{hash}/playlist.m3u8`. Use this with hls.js / native players — relative segment refs inside the playlist resolve to `/video/hls/{hash}/segment_NNN.ts` because the URL is path-based. - `content_hash`: the blake3 hex digest that identifies the bytes. Stable across libraries, archive ingests, renames; clients can cache the URL by hash. - `ready`: true iff the playlist file is already on disk. False means a transcode was just queued; the client should retry the URL after a short delay (or rely on hls.js's built-in retry). - `playlist` (legacy): basename-keyed path string, echoed under the old field name so clients that destructure `response.playlist` keep working during the rollout. The startup migration deletes the underlying file, so this URL will 404; clients should migrate to `playlist_url`. Field is slated for removal once Apollo / File Viewer ship the update. 
The handler: - resolves the source path across libraries (same logic as before), - looks up `image_exif.content_hash` for that (library_id, rel_path), - falls back to inline `content_hash::compute` when the row is mid- backfill — pure read, no library mutation, - sends a single-element `QueueVideosMessage` to `VideoPlaylistManager` if the playlist isn't already on disk and there's no `playlist.unsupported` sentinel, - returns the URL immediately. The actor pipeline owns transcoding. New route `GET /video/hls/{hash}/{file}`: - strict validation: hash must be 64 ascii-hex chars; file must be `playlist.m3u8` or `segment_NNN.ts` (digits only). Anything else returns 400 so we never have to rely on path canonicalisation alone to defend against traversal, - belt-and-suspenders canonicalize() guard verifies the resolved file lives under `$VIDEO_PATH`, - serves with the standard `NamedFile::into_response` machinery. Cleanup in `actors.rs`: - `ProcessMessage` + its `StreamActor` handler had no senders after the rewire — removed. `StreamActor` itself stays (still handles `RefreshThumbnailsMessage` from `files.rs`). - `create_playlist`, `playlist_file_for`, `playlist_unsupported_sentinel` are gone — the legacy on-demand transcode helper and the migration-only path helpers had no remaining users (the migration uses its own classify() function). - Imports tightened: dropped `Child`, `ExitStatus`, `trace`. Tests cover both new validators (`is_valid_hash`, `is_allowed_hls_filename`) including the strings that motivated the defence-in-depth (traversal attempts, internal `.tmp`/`.unsupported` artifacts, malformed segment names). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- src/handlers/video.rs | 347 +++++++++++++++++++++++++++++++++++------- src/main.rs | 1 + src/video/actors.rs | 93 +---------- 3 files changed, 297 insertions(+), 144 deletions(-) diff --git a/src/handlers/video.rs b/src/handlers/video.rs index d00346c..21338b0 100644 --- a/src/handlers/video.rs +++ b/src/handlers/video.rs @@ -13,92 +13,298 @@ use actix_web::{ use log::{debug, error, info, warn}; use opentelemetry::trace::{Span, Status, Tracer}; use opentelemetry::{KeyValue, global}; +use serde::Serialize; +use crate::content_hash; use crate::data::{ Claims, PreviewClipRequest, PreviewStatusItem, PreviewStatusRequest, PreviewStatusResponse, ThumbnailRequest, }; -use crate::database::PreviewDao; +use crate::database::{ExifDao, PreviewDao}; use crate::files::is_valid_full_path; use crate::libraries; use crate::otel::{extract_context_from_request, global_tracer}; use crate::state::AppState; -use crate::video::actors::{GeneratePreviewClipMessage, ProcessMessage, create_playlist}; +use crate::video::actors::{GeneratePreviewClipMessage, QueueVideosMessage, VideoToQueue}; +use crate::video::hls_paths; + +/// Response body for `POST /video/generate`. New clients should consume +/// `playlist_url` (hash-keyed, stable across libraries and renames) and +/// poll for readiness via the URL itself. Legacy clients reading the +/// raw `playlist` string will be served the legacy basename-keyed path +/// for as long as the field exists — that field will be dropped once +/// every shipped client has migrated. +#[derive(Serialize, Debug)] +struct GenerateVideoResponse { + /// Hash-keyed URL to the HLS playlist. Resolves to + /// `$VIDEO_PATH///playlist.m3u8` server-side. Relative + /// segment refs inside the playlist resolve correctly because the + /// browser appends to this URL's path. + playlist_url: String, + /// blake3 content hash of the source video. 
Stable per byte content, + /// so duplicate uploads / archive ingests share one set of HLS + /// output. + content_hash: String, + /// `true` iff the playlist file is already on disk. `false` means a + /// transcode was queued; clients should retry the URL after a short + /// delay (or rely on HLS.js's own retry policy). + ready: bool, + /// Legacy basename-keyed playlist *path string*. Returned for older + /// clients that read the response body as a single string under the + /// pre-2026-05 wire format. New clients should ignore this field. + #[serde(rename = "playlist")] + legacy_path: String, +} #[post("/video/generate")] pub async fn generate_video( _claims: Claims, request: HttpRequest, app_state: Data, + exif_dao: Data>>, body: web::Json, ) -> impl Responder { let tracer = global_tracer(); - let context = extract_context_from_request(&request); let mut span = tracer.start_with_context("generate_video", &context); - let filename = PathBuf::from(&body.path); + let filename_pb = PathBuf::from(&body.path); + let Some(filename) = filename_pb + .file_name() + .and_then(|n| n.to_str()) + .map(str::to_string) + else { + let message = format!("Unable to get file name: {:?}", &body.path); + error!("{}", message); + span.set_status(Status::error(message)); + return HttpResponse::BadRequest().finish(); + }; - if let Some(name) = filename.file_name() { - let filename = name.to_str().expect("Filename should convert to string"); - // KNOWN ISSUE (multi-library): playlist filename is the basename - // alone, so two source files with the same basename — whether in - // different libraries or different subdirs of one library — - // overwrite each other's playlists while ffmpeg runs. The - // hash-keyed `content_hash::hls_dir` is the long-term answer - // (see CLAUDE.md "Multi-library data model"); rewiring the - // actor pipeline to use it is out of scope for this branch. 
- // The orphan-cleanup job above already walks every library so - // it doesn't false-delete archive playlists. - let playlist = format!("{}/{}.m3u8", app_state.video_path, filename); - - let library = libraries::resolve_library_param(&app_state, body.library.as_deref()) + let preferred_library = + libraries::resolve_library_param(&app_state, body.library.as_deref()) .ok() .flatten() .unwrap_or_else(|| app_state.primary_library()); - // Try the resolved library first, then fall back to any other library - // that actually contains the file — handles union-mode requests where - // the mobile client passes no library but the file lives in a - // non-primary library. - let resolved = is_valid_full_path(&library.root_path, &body.path, false) - .filter(|p| p.exists()) - .or_else(|| { - app_state.libraries.iter().find_map(|lib| { - if lib.id == library.id { - return None; - } - is_valid_full_path(&lib.root_path, &body.path, false).filter(|p| p.exists()) - }) - }); + // Try the resolved library first, then fall back to any other library + // that actually contains the file — handles union-mode requests where + // the mobile client passes no library but the file lives in a + // non-primary library. Track which library won so the DB lookup is + // scoped correctly. 
+ let resolved = is_valid_full_path(&preferred_library.root_path, &body.path, false) + .filter(|p| p.exists()) + .map(|p| (preferred_library.id, preferred_library.root_path.clone(), p)) + .or_else(|| { + app_state.libraries.iter().find_map(|lib| { + if lib.id == preferred_library.id { + return None; + } + is_valid_full_path(&lib.root_path, &body.path, false) + .filter(|p| p.exists()) + .map(|p| (lib.id, lib.root_path.clone(), p)) + }) + }); - if let Some(path) = resolved { - if let Ok(child) = create_playlist(path.to_str().unwrap(), &playlist).await { - span.add_event( - "playlist_created".to_string(), - vec![KeyValue::new("playlist-name", filename.to_string())], + let Some((resolved_library_id, resolved_root, full_path)) = resolved else { + span.set_status(Status::error(format!("invalid path {:?}", &body.path))); + return HttpResponse::BadRequest().finish(); + }; + + // Build the rel_path used to look up the row. + let full_path_str = full_path.to_string_lossy().to_string(); + let rel_path = full_path_str + .strip_prefix(&resolved_root) + .unwrap_or(full_path_str.as_str()) + .trim_start_matches(['/', '\\']) + .to_string(); + + // DB lookup first. Cheap and avoids re-reading the file off disk for + // already-ingested videos. 
+ let hash_from_db: Option = { + let mut dao = exif_dao.lock().expect("Unable to lock ExifDao"); + match dao.get_exif_batch(&context, Some(resolved_library_id), &[rel_path.clone()]) { + Ok(rows) => rows.into_iter().next().and_then(|r| r.content_hash), + Err(e) => { + warn!( + "exif_dao.get_exif_batch failed for {} (lib {}): {:?}", + rel_path, resolved_library_id, e ); - - span.set_status(Status::Ok); - app_state.stream_manager.do_send(ProcessMessage( - playlist.clone(), - child, - // opentelemetry::Context::new().with_span(span), - )); + None } - } else { - span.set_status(Status::error(format!("invalid path {:?}", &body.path))); - return HttpResponse::BadRequest().finish(); } + }; - HttpResponse::Ok().json(playlist) + // Best-effort fallback: compute on-the-fly when the DB row hasn't + // been written or is mid-backfill. Read-only — no library mutation. + let content_hash_str = match hash_from_db { + Some(h) => h, + None => match content_hash::compute(&full_path) { + Ok(id) => id.content_hash, + Err(e) => { + error!( + "Failed to compute content_hash for {}: {}", + full_path.display(), + e + ); + span.set_status(Status::error(format!("hash compute failed: {}", e))); + return HttpResponse::InternalServerError().finish(); + } + }, + }; + + let video_dir = std::path::Path::new(&app_state.video_path); + let playlist_path = hls_paths::playlist_for_hash(video_dir, &content_hash_str); + let sentinel_path = hls_paths::sentinel_for_hash(video_dir, &content_hash_str); + let ready = playlist_path.exists(); + + if !ready && !sentinel_path.exists() { + // Kick off generation via the existing actor pipeline. Fire-and- + // forget — the playlist appears at `playlist_path` once ffmpeg + // + rename complete. The client polls the URL. 
+ info!( + "/video/generate: queueing playlist for {} (hash={})", + full_path.display(), + &content_hash_str[..content_hash_str.len().min(16)] + ); + app_state.playlist_manager.do_send(QueueVideosMessage { + videos: vec![VideoToQueue { + video_path: full_path.clone(), + content_hash: content_hash_str.clone(), + }], + }); + span.add_event( + "playlist_queued", + vec![KeyValue::new("content_hash", content_hash_str.clone())], + ); + } else if ready { + span.add_event( + "playlist_already_present", + vec![KeyValue::new("content_hash", content_hash_str.clone())], + ); } else { - let message = format!("Unable to get file name: {:?}", filename); - error!("{}", message); - span.set_status(Status::error(message)); - - HttpResponse::BadRequest().finish() + // Sentinel present — past transcode attempt failed. Return the + // URL anyway (it'll 404 / 5xx at fetch time) so the client gets + // a deterministic answer. Operator must delete the sentinel to + // force a retry. + warn!( + "/video/generate: unsupported sentinel present for {} (hash={}); not re-queueing", + full_path.display(), + &content_hash_str[..content_hash_str.len().min(16)] + ); } + + let playlist_url = format!( + "/video/hls/{}/{}", + content_hash_str, + hls_paths::PLAYLIST_FILENAME + ); + let legacy_path = format!("{}/{}.m3u8", app_state.video_path, filename); + + span.set_status(Status::Ok); + HttpResponse::Ok().json(GenerateVideoResponse { + playlist_url, + content_hash: content_hash_str, + ready, + legacy_path, + }) +} + +/// Serve HLS playlist or segment files under the hash-keyed layout +/// `$VIDEO_PATH///`. The matched `{file}` must be +/// either `playlist.m3u8` or a `segment_NNN.ts` style segment; any other +/// shape is 400'd to defend against operators stashing other content in +/// the hash dir. 
+#[get("/video/hls/{hash}/{file}")] +pub async fn stream_hls_file( + request: HttpRequest, + _: Claims, + path: web::Path<(String, String)>, + app_state: Data, +) -> impl Responder { + let tracer = global_tracer(); + let context = extract_context_from_request(&request); + let mut span = tracer.start_with_context("stream_hls_file", &context); + + let (hash, file) = path.into_inner(); + if !is_valid_hash(&hash) { + span.set_status(Status::error("invalid hash")); + return HttpResponse::BadRequest().body("invalid hash"); + } + if !is_allowed_hls_filename(&file) { + span.set_status(Status::error("invalid file")); + return HttpResponse::BadRequest().body("invalid file"); + } + + let shard = &hash[..2]; + let file_path = PathBuf::from(&app_state.video_path) + .join(shard) + .join(&hash) + .join(&file); + + // Path-traversal guard: canonicalize both sides and require the file + // to live under `app_state.video_path`. `is_valid_hash` / + // `is_allowed_hls_filename` already block dangerous strings, but + // belt-and-suspenders here is cheap. 
+ let canonical_base = match std::fs::canonicalize(&app_state.video_path) { + Ok(p) => p, + Err(e) => { + error!("Failed to canonicalize VIDEO_PATH: {:?}", e); + span.set_status(Status::error("VIDEO_PATH not canonicalisable")); + return HttpResponse::InternalServerError().finish(); + } + }; + let canonical_file = match std::fs::canonicalize(&file_path) { + Ok(p) => p, + Err(_) => { + debug!("HLS file not found: {}", file_path.display()); + span.set_status(Status::error("not found")); + return HttpResponse::NotFound().finish(); + } + }; + if !canonical_file.starts_with(&canonical_base) { + warn!( + "Path traversal attempt: {} resolved outside VIDEO_PATH", + file_path.display() + ); + span.set_status(Status::error("traversal")); + return HttpResponse::Forbidden().finish(); + } + + match NamedFile::open(&canonical_file) { + Ok(f) => { + span.set_status(Status::Ok); + f.into_response(&request) + } + Err(_) => { + span.set_status(Status::error("not found")); + HttpResponse::NotFound().finish() + } + } +} + +/// 64 lowercase-or-upper hex chars. Strict so we don't accept arbitrary +/// strings that might canonicalize into trouble. +fn is_valid_hash(s: &str) -> bool { + s.len() == 64 && s.bytes().all(|b| b.is_ascii_hexdigit()) +} + +/// Allowed file names inside a hash dir. `playlist.m3u8` plus segment +/// files matching the `segment_NNN.ts` template that `PlaylistGenerator` +/// writes via `hls_paths::SEGMENT_TEMPLATE`. Anything else (including +/// `.tmp`, `.unsupported`, dotfiles) returns 400 — these are internal +/// artifacts the client should never request. 
+fn is_allowed_hls_filename(name: &str) -> bool { + if name == hls_paths::PLAYLIST_FILENAME { + return true; + } + if let Some(rest) = name.strip_prefix("segment_") + && let Some(num) = rest.strip_suffix(".ts") + && !num.is_empty() + && num.bytes().all(|b| b.is_ascii_digit()) + { + return true; + } + false } #[get("/video/stream")] @@ -427,6 +633,41 @@ mod tests { use crate::testhelpers::TestPreviewDao; use actix_web::App; + #[test] + fn is_valid_hash_requires_64_ascii_hex() { + assert!(is_valid_hash(&"a".repeat(64))); + assert!(is_valid_hash(&"F".repeat(64))); + assert!(is_valid_hash(&format!("ab{}", "0".repeat(62)))); + + assert!(!is_valid_hash(&"a".repeat(63))); + assert!(!is_valid_hash(&"a".repeat(65))); + // Anything outside the hex alphabet — including '/', '.', '..' — + // is rejected up front so the path-traversal canonicalisation + // never has to defend the boundary alone. + assert!(!is_valid_hash(&format!("/{}", "a".repeat(63)))); + assert!(!is_valid_hash(&format!("..{}", "a".repeat(62)))); + assert!(!is_valid_hash(&"g".repeat(64))); + } + + #[test] + fn is_allowed_hls_filename_accepts_only_playlist_and_segments() { + assert!(is_allowed_hls_filename("playlist.m3u8")); + assert!(is_allowed_hls_filename("segment_000.ts")); + assert!(is_allowed_hls_filename("segment_999.ts")); + assert!(is_allowed_hls_filename("segment_0.ts")); + + // Internal artifacts the client should never request. + assert!(!is_allowed_hls_filename("playlist.m3u8.tmp")); + assert!(!is_allowed_hls_filename("playlist.unsupported")); + // Traversal / path components — defence in depth alongside + // the actix path matcher itself. 
+ assert!(!is_allowed_hls_filename("..")); + assert!(!is_allowed_hls_filename("../etc/passwd")); + assert!(!is_allowed_hls_filename("segment_abc.ts")); + assert!(!is_allowed_hls_filename("segment_.ts")); + assert!(!is_allowed_hls_filename("")); + } + fn make_token() -> String { let claims = Claims::valid_user("1".to_string()); jsonwebtoken::encode( diff --git a/src/main.rs b/src/main.rs index 023b57b..cf493b7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -266,6 +266,7 @@ fn main() -> std::io::Result<()> { .service(handlers::image::upload_image) .service(handlers::video::generate_video) .service(handlers::video::stream_video) + .service(handlers::video::stream_hls_file) .service(handlers::video::get_video_preview) .service(handlers::video::get_preview_status) .service(handlers::video::get_video_part) diff --git a/src/video/actors.rs b/src/video/actors.rs index d0ae04f..9f2df1b 100644 --- a/src/video/actors.rs +++ b/src/video/actors.rs @@ -5,12 +5,12 @@ use crate::otel::global_tracer; use crate::video::ffmpeg::{generate_preview_clip, get_duration_seconds_blocking}; use crate::video::hls_paths; use actix::prelude::*; -use log::{debug, error, info, trace, warn}; +use log::{debug, error, info, warn}; use opentelemetry::KeyValue; use opentelemetry::trace::{Span, Status, Tracer}; use std::io::Result; use std::path::{Path, PathBuf}; -use std::process::{Child, Command, ExitStatus, Stdio}; +use std::process::{Command, Stdio}; use std::sync::{Arc, Mutex}; use tokio::sync::Semaphore; // ffmpeg -i test.mp4 -c:v h264 -flags +cgop -g 30 -hls_time 3 out.m3u8 @@ -22,31 +22,6 @@ impl Actor for StreamActor { type Context = Context; } -pub struct ProcessMessage(pub String, pub Child); - -impl Message for ProcessMessage { - type Result = Result; -} - -impl Handler for StreamActor { - type Result = Result; - - fn handle(&mut self, msg: ProcessMessage, _ctx: &mut Self::Context) -> Self::Result { - trace!("Message received"); - let mut process = msg.1; - let result = process.wait(); - - 
debug!( - "Finished waiting for: {:?}. Code: {:?}", - msg.0, - result - .as_ref() - .map_or(-1, |status| status.code().unwrap_or(-1)) - ); - result - } -} - /// A video paired with its content hash, ready to be queued for HLS /// playlist generation. Hash is required because all output paths are /// keyed on it; callers that lack a hash (rows mid-backfill) must skip @@ -57,70 +32,6 @@ pub struct VideoToQueue { pub content_hash: String, } -/// Legacy basename-keyed playlist path. Retained for the one-shot startup -/// migration that retires pre-content-hash output; new playlist writes go -/// through [`hls_paths::playlist_for_hash`]. Will be removed once the -/// migration ships and runs to completion in production. -#[allow(dead_code)] -pub fn playlist_file_for(playlist_dir: &str, video_path: &Path) -> PathBuf { - let filename = video_path - .file_name() - .and_then(|n| n.to_str()) - .unwrap_or("unknown"); - PathBuf::from(format!("{}/{}.m3u8", playlist_dir, filename)) -} - -/// Legacy basename-keyed sentinel path. Same migration-only contract as -/// [`playlist_file_for`]. 
-#[allow(dead_code)] -pub fn playlist_unsupported_sentinel(playlist_file: &Path) -> PathBuf { - let mut s = playlist_file.as_os_str().to_owned(); - s.push(".unsupported"); - PathBuf::from(s) -} - -pub async fn create_playlist(video_path: &str, playlist_file: &str) -> Result { - if Path::new(playlist_file).exists() { - debug!("Playlist already exists: {}", playlist_file); - return Err(std::io::Error::from(std::io::ErrorKind::AlreadyExists)); - } - - let result = Command::new("ffmpeg") - .arg("-i") - .arg(video_path) - .arg("-c:v") - .arg("h264") - .arg("-crf") - .arg("21") - .arg("-preset") - .arg("veryfast") - .arg("-hls_time") - .arg("3") - .arg("-hls_list_size") - .arg("0") - .arg("-hls_playlist_type") - .arg("vod") - .arg("-vf") - .arg("scale='min(1080,iw)':-2,setsar=1:1") - .arg(playlist_file) - .stdout(Stdio::null()) - .stderr(Stdio::null()) - .spawn(); - - let start_time = std::time::Instant::now(); - loop { - actix::clock::sleep(std::time::Duration::from_secs(1)).await; - - if Path::new(playlist_file).exists() - || std::time::Instant::now() - start_time > std::time::Duration::from_secs(5) - { - break; - } - } - - result -} - pub fn generate_video_thumbnail(path: &Path, destination: &Path) -> std::io::Result<()> { // Probe duration up front and seek to ~50% — gives a more // representative frame than a fixed offset (skipping title cards on -- 2.49.1 From 7cd1ea3cf87ba22b300a3fd6862ccd11bcdbc80f Mon Sep 17 00:00:00 2001 From: Cameron Cordes Date: Thu, 14 May 2026 15:58:46 -0400 Subject: [PATCH 06/10] hls: per-library readiness gauges + GET /hls/stats endpoint MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The hash-keyed pipeline transcodes lazily, so a freshly mounted (or freshly upgraded) library is "mostly pending" for the first hour while the watcher works through the backlog. The operator wants a live read on remaining work so they can tune `HLS_CONCURRENCY` and know when to stop waiting. 
Adds: - `src/hls_stats.rs` — pure compute path (`stats_from_rows`) and an `Arc<Mutex<ExifDao>>` wrapper (`compute_and_publish`). Per library: `total`, `with_playlist`, `pending`, `unsupported`, `hashless_videos`. Dedup is by content_hash so duplicate-bytes-at-N-paths counts once (same domain rule as `faces::stats`). `hashless_videos` is a separate counter so the operator can see the "hash backfill, then transcode" pipeline depth instead of having NULL-hash rows just hide. - Prometheus gauges labeled by library name: `imageserver_hls_videos_total`, `..._with_playlist`, `..._pending`, `..._unsupported`. Updated by the watcher at the end of every full-scan tick *and* on every `/hls/stats` hit, so whichever surface the operator is watching stays fresh. Registered in `main` alongside the existing image/video gauges. - `GET /hls/stats` — Claims-protected JSON snapshot of the same data plus a top-level cross-library aggregate. Runs on a blocking pool so it doesn't pin the actix worker; per-call cost is one `list_paths_and_hashes_for_library` SQL query per library plus a `stat()` per distinct video hash. Bounded — never invoked from middleware, only from the explicit endpoint and the full-scan tick. The watcher's end-of-tick `info!` summary line mirrors the endpoint output for operators tailing the log. - New `ExifDao::list_paths_and_hashes_for_library` method: `SELECT rel_path, content_hash FROM image_exif WHERE library_id = ?`. Single round-trip; callers filter to video extensions client-side because the schema doesn't carry media-type. Mock impl in `files.rs` returns an empty vec. Tests in `hls_stats::tests` exercise stats_from_rows directly (videos-only filter, hash dedup, playlist vs sentinel decision, NULL-hash hashless counting) plus a publish_gauges round-trip that reads the gauge value back. Full suite (347 lib + 360 bin = 707) passes.
Co-Authored-By: Claude Opus 4.7 (1M context) --- src/database/mod.rs | 35 ++++ src/files.rs | 8 + src/hls_stats.rs | 410 ++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 20 +++ src/watcher.rs | 15 ++ 5 files changed, 488 insertions(+) create mode 100644 src/hls_stats.rs diff --git a/src/database/mod.rs b/src/database/mod.rs index d5a4d4a..873b662 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -424,6 +424,17 @@ pub trait ExifDao: Sync + Send { context: &opentelemetry::Context, ) -> Result, DbError>; + /// Every row in `image_exif` for `library_id`, as + /// `(rel_path, content_hash)`. The hash is Option because rows + /// mid-backfill carry NULL. Used by HLS readiness stats; callers + /// filter by extension client-side because the DB schema doesn't + /// carry media type. + fn list_paths_and_hashes_for_library( + &mut self, + context: &opentelemetry::Context, + library_id: i32, + ) -> Result)>, DbError>; + /// Return image_exif rows that need their `date_taken` resolved by the /// canonical-date waterfall (see `crate::date_resolver`): `date_taken /// IS NULL`. Returns `(library_id, rel_path)`. 
The caller filters to @@ -1261,6 +1272,30 @@ impl ExifDao for SqliteExifDao { .map_err(|_| DbError::new(DbErrorKind::QueryError)) } + fn list_paths_and_hashes_for_library( + &mut self, + context: &opentelemetry::Context, + lib_id: i32, + ) -> Result)>, DbError> { + trace_db_call( + context, + "query", + "list_paths_and_hashes_for_library", + |_span| { + use schema::image_exif::dsl::*; + + let mut connection = self.connection.lock().expect("Unable to get ExifDao"); + + image_exif + .filter(library_id.eq(lib_id)) + .select((rel_path, content_hash)) + .load::<(String, Option)>(connection.deref_mut()) + .map_err(|_| anyhow::anyhow!("Query error")) + }, + ) + .map_err(|_| DbError::new(DbErrorKind::QueryError)) + } + fn get_rows_needing_date_backfill( &mut self, context: &opentelemetry::Context, diff --git a/src/files.rs b/src/files.rs index 93d4345..91904e3 100644 --- a/src/files.rs +++ b/src/files.rs @@ -1696,6 +1696,14 @@ mod tests { Ok(Vec::new()) } + fn list_paths_and_hashes_for_library( + &mut self, + _context: &opentelemetry::Context, + _library_id: i32, + ) -> Result)>, DbError> { + Ok(Vec::new()) + } + fn get_rows_needing_date_backfill( &mut self, _context: &opentelemetry::Context, diff --git a/src/hls_stats.rs b/src/hls_stats.rs new file mode 100644 index 0000000..3e7fb03 --- /dev/null +++ b/src/hls_stats.rs @@ -0,0 +1,410 @@ +//! Per-library HLS readiness: Prometheus gauges + `/hls/stats` endpoint. +//! +//! The new hash-keyed pipeline transcodes lazily — most of a freshly +//! mounted library is "pending" for the first hour, and operators want +//! a live read on "how much work is left, am I CPU-bound, do I need to +//! bump `HLS_CONCURRENCY`." This module supplies both surfaces against +//! the same compute path: +//! +//! - **Prometheus gauges** `imageserver_hls_videos_total{library}`, +//! `..._with_playlist{library}`, `..._pending{library}`, +//! `..._unsupported{library}`. Updated every watcher full-scan tick +//! 
and on every `/hls/stats` request, so the freshness matches +//! whichever surface the operator is watching. +//! +//! - **`GET /hls/stats`** returns a JSON snapshot of the same counts +//! plus a top-level cross-library aggregate. Claims-protected +//! (matches every other authenticated read in this crate). +//! +//! Cost is O(distinct video hashes per library), each row needing a +//! single `stat()` on the playlist file. On a 100k-video library that's +//! noticeable; on a typical home library (few thousand) it's noise. +//! We call from explicit triggers only — never per-request from +//! middleware — so the cost is bounded. + +use std::collections::HashSet; +use std::path::Path; +use std::sync::{Arc, Mutex}; + +use actix_web::{HttpResponse, Responder, get, web}; +use lazy_static::lazy_static; +use log::{info, warn}; +use prometheus::IntGaugeVec; +use serde::Serialize; + +use crate::data::Claims; +use crate::database::ExifDao; +use crate::file_types; +use crate::libraries::Library; +use crate::state::AppState; +use crate::video::hls_paths; + +lazy_static! 
{ + pub static ref HLS_VIDEOS_TOTAL: IntGaugeVec = IntGaugeVec::new( + prometheus::Opts::new( + "imageserver_hls_videos_total", + "Distinct video content hashes per library known to image_exif", + ), + &["library"], + ) + .expect("HLS_VIDEOS_TOTAL"); + pub static ref HLS_VIDEOS_WITH_PLAYLIST: IntGaugeVec = IntGaugeVec::new( + prometheus::Opts::new( + "imageserver_hls_videos_with_playlist", + "Videos whose hash-keyed HLS playlist is already on disk", + ), + &["library"], + ) + .expect("HLS_VIDEOS_WITH_PLAYLIST"); + pub static ref HLS_VIDEOS_PENDING: IntGaugeVec = IntGaugeVec::new( + prometheus::Opts::new( + "imageserver_hls_videos_pending", + "Videos whose hash-keyed HLS playlist is not yet on disk", + ), + &["library"], + ) + .expect("HLS_VIDEOS_PENDING"); + pub static ref HLS_VIDEOS_UNSUPPORTED: IntGaugeVec = IntGaugeVec::new( + prometheus::Opts::new( + "imageserver_hls_videos_unsupported", + "Videos with an `.unsupported` sentinel — ffmpeg refused; \ + operator must delete to retry", + ), + &["library"], + ) + .expect("HLS_VIDEOS_UNSUPPORTED"); +} + +/// Per-library HLS readiness snapshot. +#[derive(Serialize, Debug, Clone, PartialEq, Eq)] +pub struct HlsLibraryStats { + pub library_id: i32, + pub library: String, + /// Distinct video content hashes (dedupes intra-library bytes-at-N-paths). + pub total: usize, + /// Of `total`, hashes whose `playlist.m3u8` is on disk. + pub with_playlist: usize, + /// Of `total`, hashes whose ffmpeg attempt left a `.unsupported` + /// sentinel. Counted separately because they won't progress without + /// operator intervention (delete the sentinel to retry). + pub unsupported: usize, + /// `total - (with_playlist + unsupported)` — videos awaiting transcode. + pub pending: usize, + /// Distinct rel_paths under this library that are video files but + /// whose `image_exif.content_hash` is still NULL (mid-backfill). 
+ /// These don't yet count toward `total` because they're invisible + /// to the hash-keyed pipeline; surfaced so the operator can see + /// "hash backfill, then transcode" pipeline depth. + pub hashless_videos: usize, +} + +/// JSON response body for `GET /hls/stats`. +#[derive(Serialize, Debug)] +pub struct HlsStatsResponse { + pub libraries: Vec, + pub total: usize, + pub with_playlist: usize, + pub pending: usize, + pub unsupported: usize, + pub hashless_videos: usize, +} + +/// Compute current readiness per library and publish to Prometheus. +/// Returns the same data so callers can serialise it. The publish step +/// is idempotent on the gauge — old values get overwritten. +pub fn compute_and_publish( + libraries: &[Library], + exif_dao: &Arc>>, + video_dir: &Path, +) -> Vec { + let ctx = opentelemetry::Context::new(); + let mut out = Vec::with_capacity(libraries.len()); + for lib in libraries { + let stats = compute_for_library(&ctx, lib, exif_dao, video_dir); + publish_gauges(&stats); + out.push(stats); + } + out +} + +fn publish_gauges(s: &HlsLibraryStats) { + HLS_VIDEOS_TOTAL + .with_label_values(&[s.library.as_str()]) + .set(s.total as i64); + HLS_VIDEOS_WITH_PLAYLIST + .with_label_values(&[s.library.as_str()]) + .set(s.with_playlist as i64); + HLS_VIDEOS_PENDING + .with_label_values(&[s.library.as_str()]) + .set(s.pending as i64); + HLS_VIDEOS_UNSUPPORTED + .with_label_values(&[s.library.as_str()]) + .set(s.unsupported as i64); +} + +fn compute_for_library( + ctx: &opentelemetry::Context, + lib: &Library, + exif_dao: &Arc>>, + video_dir: &Path, +) -> HlsLibraryStats { + let rows = { + let mut dao = exif_dao.lock().expect("Unable to lock ExifDao"); + match dao.list_paths_and_hashes_for_library(ctx, lib.id) { + Ok(r) => r, + Err(e) => { + warn!( + "hls_stats: list_paths_and_hashes_for_library failed for lib {}: {:?}", + lib.id, e + ); + Vec::new() + } + } + }; + stats_from_rows(lib, &rows, video_dir) +} + +/// Pure function — same compute as 
[`compute_for_library`] but works +/// on caller-supplied rows. Split out so tests don't need a full +/// `ExifDao` mock; the integration path is exercised through +/// `compute_and_publish` against the real SQLite DAO at runtime. +fn stats_from_rows( + lib: &Library, + rows: &[(String, Option)], + video_dir: &Path, +) -> HlsLibraryStats { + let mut hashes: HashSet = HashSet::new(); + let mut hashless_videos = 0usize; + for (rel_path, hash_opt) in rows { + if !file_types::is_video_file(Path::new(rel_path)) { + continue; + } + match hash_opt { + Some(h) => { + hashes.insert(h.clone()); + } + None => { + hashless_videos += 1; + } + } + } + + let mut with_playlist = 0usize; + let mut unsupported = 0usize; + for h in &hashes { + if hls_paths::playlist_for_hash(video_dir, h).exists() { + with_playlist += 1; + } else if hls_paths::sentinel_for_hash(video_dir, h).exists() { + unsupported += 1; + } + } + let total = hashes.len(); + let pending = total.saturating_sub(with_playlist + unsupported); + + HlsLibraryStats { + library_id: lib.id, + library: lib.name.clone(), + total, + with_playlist, + unsupported, + pending, + hashless_videos, + } +} + +/// Log a single info line summarising readiness across all libraries. +/// Called by the watcher at the end of a full-scan tick so operators +/// who tail the log see the headline number without scraping +/// Prometheus. 
+pub fn log_summary(stats: &[HlsLibraryStats]) { + let total: usize = stats.iter().map(|s| s.total).sum(); + let with_playlist: usize = stats.iter().map(|s| s.with_playlist).sum(); + let pending: usize = stats.iter().map(|s| s.pending).sum(); + let unsupported: usize = stats.iter().map(|s| s.unsupported).sum(); + let hashless: usize = stats.iter().map(|s| s.hashless_videos).sum(); + + let per_lib: Vec = stats + .iter() + .map(|s| { + format!( + "{}={}/{} pending={} unsupported={} hashless={}", + s.library, s.with_playlist, s.total, s.pending, s.unsupported, s.hashless_videos, + ) + }) + .collect(); + + info!( + "HLS readiness: {}/{} playlists on disk, {} pending, {} unsupported, {} hashless videos | per-library: [{}]", + with_playlist, + total, + pending, + unsupported, + hashless, + per_lib.join(", "), + ); +} + +#[get("/hls/stats")] +pub async fn hls_stats_handler( + _claims: Claims, + app_state: web::Data, + exif_dao: web::Data>>, +) -> impl Responder { + let libraries = app_state.libraries.clone(); + let video_dir = std::path::PathBuf::from(&app_state.video_path); + let exif_dao = exif_dao.into_inner(); + + // Synchronous file IO + DB query — run on a blocking pool so the + // actix worker thread stays free for other requests. 
+ let stats = match web::block(move || compute_and_publish(&libraries, &exif_dao, &video_dir)) + .await + { + Ok(s) => s, + Err(e) => { + warn!("/hls/stats: blocking task failed: {:?}", e); + Vec::new() + } + }; + + let total: usize = stats.iter().map(|s| s.total).sum(); + let with_playlist: usize = stats.iter().map(|s| s.with_playlist).sum(); + let pending: usize = stats.iter().map(|s| s.pending).sum(); + let unsupported: usize = stats.iter().map(|s| s.unsupported).sum(); + let hashless_videos: usize = stats.iter().map(|s| s.hashless_videos).sum(); + + HttpResponse::Ok().json(HlsStatsResponse { + libraries: stats, + total, + with_playlist, + pending, + unsupported, + hashless_videos, + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::tempdir; + + fn lib(id: i32, name: &str) -> Library { + Library { + id, + name: name.into(), + root_path: String::new(), + enabled: true, + excluded_dirs: Vec::new(), + } + } + + fn rows(vs: Vec<(&str, Option<&str>)>) -> Vec<(String, Option)> { + vs.into_iter() + .map(|(p, h)| (p.to_string(), h.map(|s| s.to_string()))) + .collect() + } + + fn touch(dir: &Path, rel: &str) { + let p = dir.join(rel); + std::fs::create_dir_all(p.parent().unwrap()).unwrap(); + std::fs::write(p, b"").unwrap(); + } + + #[test] + fn videos_only_count_in_total() { + let tmp = tempdir().unwrap(); + let r = rows(vec![ + ("photos/IMG.jpg", Some(&"a".repeat(64))), // image: ignored + ("clip.mp4", Some(&"b".repeat(64))), + ("vid.mov", Some(&"c".repeat(64))), + ]); + let stats = stats_from_rows(&lib(1, "main"), &r, tmp.path()); + assert_eq!(stats.total, 2); + assert_eq!(stats.with_playlist, 0); + assert_eq!(stats.pending, 2); + assert_eq!(stats.unsupported, 0); + assert_eq!(stats.hashless_videos, 0); + } + + #[test] + fn hash_dedup_collapses_duplicate_rel_paths() { + let tmp = tempdir().unwrap(); + let r = rows(vec![ + ("a/clip.mp4", Some(&"a".repeat(64))), + ("b/clip.mp4", Some(&"a".repeat(64))), // same bytes, dup + ("other.mp4", 
Some(&"b".repeat(64))), + ]); + let stats = stats_from_rows(&lib(1, "main"), &r, tmp.path()); + assert_eq!(stats.total, 2, "duplicate hashes collapse"); + } + + #[test] + fn playlist_existence_promotes_to_with_playlist() { + let tmp = tempdir().unwrap(); + let hash = "a".repeat(64); + touch(tmp.path(), &format!("aa/{}/playlist.m3u8", hash)); + + let r = rows(vec![("clip.mp4", Some(&hash))]); + let stats = stats_from_rows(&lib(1, "main"), &r, tmp.path()); + assert_eq!(stats.total, 1); + assert_eq!(stats.with_playlist, 1); + assert_eq!(stats.pending, 0); + } + + #[test] + fn sentinel_existence_promotes_to_unsupported() { + let tmp = tempdir().unwrap(); + let hash = "b".repeat(64); + touch(tmp.path(), &format!("bb/{}/playlist.unsupported", hash)); + + let r = rows(vec![("clip.mov", Some(&hash))]); + let stats = stats_from_rows(&lib(1, "main"), &r, tmp.path()); + assert_eq!(stats.total, 1); + assert_eq!(stats.unsupported, 1); + assert_eq!(stats.with_playlist, 0); + assert_eq!(stats.pending, 0); + } + + #[test] + fn null_hash_videos_are_hashless_not_total() { + let tmp = tempdir().unwrap(); + let r = rows(vec![ + ("clip.mp4", None), + ("other.mp4", Some(&"a".repeat(64))), + ]); + let stats = stats_from_rows(&lib(1, "main"), &r, tmp.path()); + assert_eq!(stats.total, 1, "hashless row excluded from total"); + assert_eq!(stats.hashless_videos, 1); + } + + #[test] + fn publish_gauges_sets_per_library_value() { + let s = HlsLibraryStats { + library_id: 7, + library: "test_publish_a".into(), + total: 5, + with_playlist: 2, + pending: 3, + unsupported: 0, + hashless_videos: 0, + }; + publish_gauges(&s); + assert_eq!( + HLS_VIDEOS_TOTAL + .with_label_values(&["test_publish_a"]) + .get(), + 5 + ); + assert_eq!( + HLS_VIDEOS_PENDING + .with_label_values(&["test_publish_a"]) + .get(), + 3 + ); + assert_eq!( + HLS_VIDEOS_WITH_PLAYLIST + .with_label_values(&["test_publish_a"]) + .get(), + 2 + ); + } +} diff --git a/src/main.rs b/src/main.rs index cf493b7..620235e 100644 --- 
a/src/main.rs +++ b/src/main.rs @@ -45,6 +45,7 @@ mod file_types; mod files; mod geo; mod handlers; +mod hls_stats; mod libraries; mod library_maintenance; mod perceptual_hash; @@ -126,6 +127,24 @@ fn main() -> std::io::Result<()> { .registry .register(Box::new(thumbnails::VIDEO_GAUGE.clone())) .unwrap(); + // HLS readiness gauges. Updated by the watcher every full-scan + // tick and on every `/hls/stats` request. See `hls_stats`. + prometheus + .registry + .register(Box::new(hls_stats::HLS_VIDEOS_TOTAL.clone())) + .unwrap(); + prometheus + .registry + .register(Box::new(hls_stats::HLS_VIDEOS_WITH_PLAYLIST.clone())) + .unwrap(); + prometheus + .registry + .register(Box::new(hls_stats::HLS_VIDEOS_PENDING.clone())) + .unwrap(); + prometheus + .registry + .register(Box::new(hls_stats::HLS_VIDEOS_UNSUPPORTED.clone())) + .unwrap(); let app_state = app_data.clone(); @@ -270,6 +289,7 @@ fn main() -> std::io::Result<()> { .service(handlers::video::get_video_preview) .service(handlers::video::get_preview_status) .service(handlers::video::get_video_part) + .service(hls_stats::hls_stats_handler) .service(handlers::favorites::favorites) .service(handlers::favorites::put_add_favorite) .service(handlers::favorites::delete_favorite) diff --git a/src/watcher.rs b/src/watcher.rs index 6182ddc..6f04855 100644 --- a/src/watcher.rs +++ b/src/watcher.rs @@ -32,6 +32,7 @@ use crate::exif; use crate::face_watch; use crate::faces; use crate::file_types; +use crate::hls_stats; use crate::libraries; use crate::library_maintenance; use crate::perceptual_hash; @@ -580,6 +581,20 @@ pub fn watch_files( } if is_full_scan { + // End-of-full-scan HLS readiness summary: log a single + // info line + refresh the Prometheus gauges. Skipped on + // quick scans because the cost is non-trivial on big + // libraries and the data only meaningfully changes on + // full passes. 
+ let video_dir_str = + dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set"); + let stats = hls_stats::compute_and_publish( + &libs, + &exif_dao, + Path::new(&video_dir_str), + ); + hls_stats::log_summary(&stats); + last_full_scan = now; } last_quick_scan = now; -- 2.49.1 From 8c91bf554b29788f7912f490184c5bc01e6f3bdd Mon Sep 17 00:00:00 2001 From: Cameron Cordes Date: Thu, 14 May 2026 16:01:16 -0400 Subject: [PATCH 07/10] hls: cargo fmt + clippy::cloned_ref_to_slice_refs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pure mechanical pass on the files this branch added/modified: rustfmt reflow of a few long lines / chains, and the one non-pre-existing clippy warning — replacing `&[rel_path.clone()]` with `std::slice::from_ref(&rel_path)` in `handlers::video::generate_video` to avoid the alloc + clone for a single-element slice. All 707 tests still pass. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/handlers/video.rs | 15 +++++++++------ src/hls_stats.rs | 17 ++++++++--------- src/video/legacy_migration.rs | 5 +---- src/watcher.rs | 35 +++++++++++++---------------------- 4 files changed, 31 insertions(+), 41 deletions(-) diff --git a/src/handlers/video.rs b/src/handlers/video.rs index 21338b0..c535fac 100644 --- a/src/handlers/video.rs +++ b/src/handlers/video.rs @@ -80,11 +80,10 @@ pub async fn generate_video( return HttpResponse::BadRequest().finish(); }; - let preferred_library = - libraries::resolve_library_param(&app_state, body.library.as_deref()) - .ok() - .flatten() - .unwrap_or_else(|| app_state.primary_library()); + let preferred_library = libraries::resolve_library_param(&app_state, body.library.as_deref()) + .ok() + .flatten() + .unwrap_or_else(|| app_state.primary_library()); // Try the resolved library first, then fall back to any other library // that actually contains the file — handles union-mode requests where @@ -122,7 +121,11 @@ pub async fn generate_video( // already-ingested videos. 
let hash_from_db: Option = { let mut dao = exif_dao.lock().expect("Unable to lock ExifDao"); - match dao.get_exif_batch(&context, Some(resolved_library_id), &[rel_path.clone()]) { + match dao.get_exif_batch( + &context, + Some(resolved_library_id), + std::slice::from_ref(&rel_path), + ) { Ok(rows) => rows.into_iter().next().and_then(|r| r.content_hash), Err(e) => { warn!( diff --git a/src/hls_stats.rs b/src/hls_stats.rs index 3e7fb03..c587aaf 100644 --- a/src/hls_stats.rs +++ b/src/hls_stats.rs @@ -256,15 +256,14 @@ pub async fn hls_stats_handler( // Synchronous file IO + DB query — run on a blocking pool so the // actix worker thread stays free for other requests. - let stats = match web::block(move || compute_and_publish(&libraries, &exif_dao, &video_dir)) - .await - { - Ok(s) => s, - Err(e) => { - warn!("/hls/stats: blocking task failed: {:?}", e); - Vec::new() - } - }; + let stats = + match web::block(move || compute_and_publish(&libraries, &exif_dao, &video_dir)).await { + Ok(s) => s, + Err(e) => { + warn!("/hls/stats: blocking task failed: {:?}", e); + Vec::new() + } + }; let total: usize = stats.iter().map(|s| s.total).sum(); let with_playlist: usize = stats.iter().map(|s| s.with_playlist).sum(); diff --git a/src/video/legacy_migration.rs b/src/video/legacy_migration.rs index 0db8e25..ed6863c 100644 --- a/src/video/legacy_migration.rs +++ b/src/video/legacy_migration.rs @@ -38,10 +38,7 @@ pub struct RetireStats { impl RetireStats { pub fn total_deleted(&self) -> usize { - self.deleted_playlists - + self.deleted_segments - + self.deleted_sentinels - + self.deleted_tmp + self.deleted_playlists + self.deleted_segments + self.deleted_sentinels + self.deleted_tmp } } diff --git a/src/watcher.rs b/src/watcher.rs index 6f04855..13ac1cd 100644 --- a/src/watcher.rs +++ b/src/watcher.rs @@ -79,9 +79,10 @@ pub fn cleanup_orphaned_playlists( info!(" Cleanup interval: {} seconds", cleanup_interval_secs); info!(" HLS directory: {}", video_path.display()); - let exif_dao: 
Arc>> = Arc::new(Mutex::new( - Box::new(SqliteExifDao::new()) as Box - )); + let exif_dao: Arc>> = Arc::new(Mutex::new(Box::new( + SqliteExifDao::new(), + ) + as Box)); loop { std::thread::sleep(Duration::from_secs(cleanup_interval_secs)); @@ -163,11 +164,7 @@ pub fn cleanup_orphaned_playlists( for shard_entry in read_root.flatten() { let shard_path = shard_entry.path(); - if !shard_entry - .file_type() - .map(|t| t.is_dir()) - .unwrap_or(false) - { + if !shard_entry.file_type().map(|t| t.is_dir()).unwrap_or(false) { continue; } let shard_name = match shard_path.file_name().and_then(|n| n.to_str()) { @@ -194,16 +191,14 @@ pub fn cleanup_orphaned_playlists( let mut shard_emptied = true; for hash_entry in read_shard.flatten() { let hash_path = hash_entry.path(); - if !hash_entry - .file_type() - .map(|t| t.is_dir()) - .unwrap_or(false) - { + if !hash_entry.file_type().map(|t| t.is_dir()).unwrap_or(false) { shard_emptied = false; continue; } - let Some(hash_name) = - hash_path.file_name().and_then(|n| n.to_str()).map(|n| n.to_owned()) + let Some(hash_name) = hash_path + .file_name() + .and_then(|n| n.to_str()) + .map(|n| n.to_owned()) else { shard_emptied = false; continue; @@ -586,13 +581,9 @@ pub fn watch_files( // quick scans because the cost is non-trivial on big // libraries and the data only meaningfully changes on // full passes. 
- let video_dir_str = - dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set"); - let stats = hls_stats::compute_and_publish( - &libs, - &exif_dao, - Path::new(&video_dir_str), - ); + let video_dir_str = dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set"); + let stats = + hls_stats::compute_and_publish(&libs, &exif_dao, Path::new(&video_dir_str)); hls_stats::log_summary(&stats); last_full_scan = now; -- 2.49.1 From 8503ef78840b4527a44398af32f0d29da97e7e99 Mon Sep 17 00:00:00 2001 From: Cameron Cordes Date: Thu, 14 May 2026 16:25:05 -0400 Subject: [PATCH 08/10] chore: cargo fmt + clippy --fix sweep across the crate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pure mechanical cleanup of accumulated drift in files outside the HLS-content-hash branch's main change set. No behavior change. - `cargo fmt` on every previously-misformatted file (`ai/insight_generator.rs`, `database/knowledge_dao.rs`, `faces.rs`, `knowledge.rs`, `libraries.rs`). - `cargo clippy --fix`: - `needless_borrow`: `&library` → `library` in `handlers/image.rs` (two sites in the photo-listing path). - Manual clippy pass for warnings clippy emits but can't auto-apply: - `field_reassign_with_default` in `database/reconcile.rs::run` — consolidated into a struct-literal initializer. - `needless_range_loop` in `database/knowledge_dao.rs::union_perceptual_tags` — inner `for b in (a+1)..indices.len() { let ib = indices[b]; ... }` becomes `for &ib in &indices[a + 1..] { ... }`. - Doc-list indentation: continuation lines under nested bullets in `database/mod.rs::get_memories_in_window` and `database/knowledge_dao.rs::build_entity_graph` realigned to the list-item content column. 
Deliberately not touched (each deserves its own focused commit, with testing, rather than getting bundled into a sweep): - 4× `deprecated count_distinct` in `faces.rs` — diesel API migration to `AggregateExpressionMethods::aggregate_distinct` may shift result types; needs verification against the existing stats queries. - `await_holding_lock` in `knowledge.rs:807` — `std::sync::Mutex` held across `ollama.generate(...).await`. Genuine concurrency bug; fix requires understanding the surrounding flow before just dropping the guard. - 2× `type_complexity` in `database/mod.rs` — cosmetic, would need a `type` alias and corresponding callers updated. - Dead `total_deleted` on `library_maintenance::GcStats` and `file_scan::enumerate_indexable_files` — both are public surface retained for future use; deletion is a separate decision. All 707 tests still pass. Release build clean. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/ai/insight_generator.rs | 5 +++- src/database/knowledge_dao.rs | 14 ++++++++--- src/database/mod.rs | 6 ++--- src/database/reconcile.rs | 46 +++++++++++++++++------------------ src/faces.rs | 5 +++- src/handlers/image.rs | 21 ++++++++-------- src/knowledge.rs | 11 +++------ src/libraries.rs | 21 ++++++++-------- 8 files changed, 67 insertions(+), 62 deletions(-) diff --git a/src/ai/insight_generator.rs b/src/ai/insight_generator.rs index 3617261..db3fb20 100644 --- a/src/ai/insight_generator.rs +++ b/src/ai/insight_generator.rs @@ -4548,7 +4548,10 @@ mod tests { #[test] fn strip_mark_tags_handles_common_patterns() { - assert_eq!(InsightGenerator::strip_mark_tags("plain text"), "plain text"); + assert_eq!( + InsightGenerator::strip_mark_tags("plain text"), + "plain text" + ); assert_eq!( InsightGenerator::strip_mark_tags("…the lake…"), "…the lake…" diff --git a/src/database/knowledge_dao.rs b/src/database/knowledge_dao.rs index 069dd38..06b2b2d 100644 --- a/src/database/knowledge_dao.rs +++ b/src/database/knowledge_dao.rs @@ -235,6 +235,7 @@ pub 
trait KnowledgeDao: Sync + Send { /// - entity_type: optional, restricts nodes to one type /// - node_limit: caps the number of nodes; lower-fact-count /// entities drop first + /// /// Edges between dropped entities are pruned. Persona scoping /// affects fact_count + edge inclusion (rejected / superseded /// excluded; All vs Single mirrors the existing pattern). @@ -937,7 +938,10 @@ impl KnowledgeDao for SqliteKnowledgeDao { let mut conn = self.connection.lock().expect("KnowledgeDao lock"); let mut q = sql_query(sql).into_boxed(); match persona { - PersonaFilter::Single { user_id, persona_id } => { + PersonaFilter::Single { + user_id, + persona_id, + } => { q = q .bind::(*user_id) .bind::(persona_id.clone()); @@ -977,7 +981,10 @@ impl KnowledgeDao for SqliteKnowledgeDao { // rows flip — REVIEWED survives so the curator can preserve // a hand-approved exception under the same predicate. let touched = match persona { - PersonaFilter::Single { user_id: uid, persona_id: pid } => diesel::update( + PersonaFilter::Single { + user_id: uid, + persona_id: pid, + } => diesel::update( entity_facts .filter(predicate.eq(target_predicate)) .filter(user_id.eq(*uid)) @@ -1282,8 +1289,7 @@ impl KnowledgeDao for SqliteKnowledgeDao { Some(v) => v, None => continue, }; - for b in (a + 1)..indices.len() { - let ib = indices[b]; + for &ib in &indices[a + 1..] 
{ let vb = match &decoded[ib] { Some(v) => v, None => continue, diff --git a/src/database/mod.rs b/src/database/mod.rs index 873b662..4a20702 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -502,9 +502,9 @@ pub trait ExifDao: Sync + Send { /// whose calendar position matches the request's span: /// - `"day"` — same month + day-of-month (any year) /// - `"week"` — same week-of-year (SQLite `%W`, Monday-anchored — - /// close to but not exactly ISO week 8601; the - /// boundary cases at year-start/end can shift by ±1 - /// vs the prior request-time `iso_week()` filter) + /// close to but not exactly ISO week 8601; the boundary cases + /// at year-start/end can shift by ±1 vs the prior request-time + /// `iso_week()` filter) /// - `"month"` — same month (any year) /// /// `tz_offset_minutes` is applied to both sides of the strftime diff --git a/src/database/reconcile.rs b/src/database/reconcile.rs index 57f69f3..2fade4e 100644 --- a/src/database/reconcile.rs +++ b/src/database/reconcile.rs @@ -57,30 +57,28 @@ impl ReconcileStats { /// watcher tick. Errors are logged but never propagated; reconciliation /// is best-effort and a transient DB hiccup must not stall the watcher. 
pub fn run(conn: &mut SqliteConnection) -> ReconcileStats { - let mut stats = ReconcileStats::default(); - - stats.tagged_photo_hashes_filled = match backfill_tagged_photo_hashes(conn) { - Ok(n) => n, - Err(e) => { - warn!("reconcile: tagged_photo hash backfill failed: {:?}", e); - 0 - } - }; - - stats.photo_insights_hashes_filled = match backfill_photo_insights_hashes(conn) { - Ok(n) => n, - Err(e) => { - warn!("reconcile: photo_insights hash backfill failed: {:?}", e); - 0 - } - }; - - stats.photo_insights_demoted = match collapse_insight_currents(conn) { - Ok(n) => n, - Err(e) => { - warn!("reconcile: photo_insights scalar merge failed: {:?}", e); - 0 - } + let stats = ReconcileStats { + tagged_photo_hashes_filled: match backfill_tagged_photo_hashes(conn) { + Ok(n) => n, + Err(e) => { + warn!("reconcile: tagged_photo hash backfill failed: {:?}", e); + 0 + } + }, + photo_insights_hashes_filled: match backfill_photo_insights_hashes(conn) { + Ok(n) => n, + Err(e) => { + warn!("reconcile: photo_insights hash backfill failed: {:?}", e); + 0 + } + }, + photo_insights_demoted: match collapse_insight_currents(conn) { + Ok(n) => n, + Err(e) => { + warn!("reconcile: photo_insights scalar merge failed: {:?}", e); + 0 + } + }, }; if stats.changed() { diff --git a/src/faces.rs b/src/faces.rs index f4bd5cc..ba47508 100644 --- a/src/faces.rs +++ b/src/faces.rs @@ -2118,7 +2118,10 @@ async fn update_face_handler( // the short context string we surface in the response body — // SQLITE_BUSY here usually means another DAO's writer held the // lock past `busy_timeout` (5s), which is invisible in `{}`. 
- warn!("PATCH /image/faces/{}: 500 — update_face failed: {:#}", id, e); + warn!( + "PATCH /image/faces/{}: 500 — update_face failed: {:#}", + id, e + ); return HttpResponse::InternalServerError().body(e.to_string()); } }; diff --git a/src/handlers/image.rs b/src/handlers/image.rs index 7266e34..07e977d 100644 --- a/src/handlers/image.rs +++ b/src/handlers/image.rs @@ -183,14 +183,15 @@ pub async fn get_image( // review JPEG, ~1–2 MP). Falls through to NamedFile if no preview is // available, which preserves the historical behavior for callers // that genuinely want the original bytes. - if image_size == PhotoSize::Full && exif::is_tiff_raw(&path) { - if let Some(preview) = exif::extract_embedded_jpeg_preview(&path) { - span.set_status(Status::Ok); - return HttpResponse::Ok() - .content_type("image/jpeg") - .insert_header(("Cache-Control", "public, max-age=3600")) - .body(preview); - } + if image_size == PhotoSize::Full + && exif::is_tiff_raw(&path) + && let Some(preview) = exif::extract_embedded_jpeg_preview(&path) + { + span.set_status(Status::Ok); + return HttpResponse::Ok() + .content_type("image/jpeg") + .insert_header(("Cache-Control", "public, max-age=3600")) + .body(preview); } if let Ok(file) = NamedFile::open(&path) { @@ -706,7 +707,7 @@ pub async fn set_image_date( Ok(row) => { span.set_status(Status::Ok); HttpResponse::Ok().json(build_metadata_response_for_date_mutation( - &library, + library, &normalized_path, row, )) @@ -757,7 +758,7 @@ pub async fn clear_image_date( Ok(row) => { span.set_status(Status::Ok); HttpResponse::Ok().json(build_metadata_response_for_date_mutation( - &library, + library, &normalized_path, row, )) diff --git a/src/knowledge.rs b/src/knowledge.rs index 4c3f5a8..66815b2 100644 --- a/src/knowledge.rs +++ b/src/knowledge.rs @@ -444,8 +444,7 @@ where ) .service(web::resource("/graph").route(web::get().to(get_graph::))) .service( - web::resource("/predicate-stats") - .route(web::get().to(get_predicate_stats::)), + 
web::resource("/predicate-stats").route(web::get().to(get_predicate_stats::)), ) .service( web::resource("/predicates/{predicate}/bulk-reject") @@ -1261,12 +1260,8 @@ async fn bulk_reject_predicate( let persona = resolve_persona_filter(&req, &claims, &persona_dao); let cx = opentelemetry::Context::current(); let mut dao = dao.lock().expect("Unable to lock KnowledgeDao"); - match dao.bulk_reject_facts_by_predicate( - &cx, - &persona, - &predicate, - Some(("manual", "manual")), - ) { + match dao.bulk_reject_facts_by_predicate(&cx, &persona, &predicate, Some(("manual", "manual"))) + { Ok(rejected) => HttpResponse::Ok().json(BulkRejectResponse { rejected }), Err(e) => { log::error!("bulk_reject_predicate error: {:?}", e); diff --git a/src/libraries.rs b/src/libraries.rs index 59b614a..6248cfa 100644 --- a/src/libraries.rs +++ b/src/libraries.rs @@ -94,7 +94,7 @@ pub fn parse_excluded_dirs_column(raw: Option<&str>) -> Vec { match raw { None => Vec::new(), Some(s) => s - .split(|c: char| matches!(c, ',' | '\n' | '\r')) + .split([',', '\n', '\r']) .map(str::trim) .filter(|s| !s.is_empty()) .map(String::from) @@ -148,10 +148,7 @@ pub fn validate_excluded_dirs_entry(entry: &str) -> Result { if let Some(rel) = trimmed.strip_prefix('/') { // Path form. Reject `..` traversal — `base.join(\"../x\")` doesn't // canonicalise, so `path.starts_with(...)` never matches. - if rel - .split('/') - .any(|seg| seg == "..") - { + if rel.split('/').any(|seg| seg == "..") { return Err(format!( "'{}': '..' 
segments don't normalise — the prefix-match never fires", trimmed @@ -542,7 +539,10 @@ pub async fn patch_library( { Ok(n) => affected = affected.max(n), Err(e) => { - warn!("PATCH /libraries/{}: enabled update failed: {:?}", lib_id, e); + warn!( + "PATCH /libraries/{}: enabled update failed: {:?}", + lib_id, e + ); return HttpResponse::InternalServerError().body(format!("{}", e)); } } @@ -600,7 +600,9 @@ pub async fn patch_library( ); HttpResponse::Ok().json(lib) } - None => HttpResponse::NotFound().body(format!("library id {} not found after update", lib_id)), + None => { + HttpResponse::NotFound().body(format!("library id {} not found after update", lib_id)) + } } } @@ -930,10 +932,7 @@ mod tests { #[test] fn validate_strips_trailing_slash_on_path_entries() { - assert_eq!( - validate_excluded_dirs_entry("/photos/").unwrap(), - "/photos" - ); + assert_eq!(validate_excluded_dirs_entry("/photos/").unwrap(), "/photos"); assert_eq!( validate_excluded_dirs_entry("/photos//").unwrap(), "/photos" -- 2.49.1 From c30cadde02c6f096fc1fb63873bd857d359b0539 Mon Sep 17 00:00:00 2001 From: Cameron Date: Fri, 15 May 2026 15:10:02 -0400 Subject: [PATCH 09/10] ai: fix UTF-8 byte-slice panics in insight_generator log/truncation paths Switch four `&s[..N]` / `&s[..s.len().min(N)]` sites to `chars().take(N).collect::()` so truncation lands on character boundaries instead of mid-codepoint. The agentic summary preview log was panicking when generated content hit an em-dash at byte 200; the few-shot passage cap, brief_json_args debug formatter, and a test assertion message had the same latent bug. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- src/ai/insight_generator.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/ai/insight_generator.rs b/src/ai/insight_generator.rs index db3fb20..2e2da33 100644 --- a/src/ai/insight_generator.rs +++ b/src/ai/insight_generator.rs @@ -1749,8 +1749,8 @@ Return ONLY the summary, nothing else."#, .iter() .enumerate() .map(|(i, c)| { - let trimmed = if c.len() > 1000 { - format!("{}…", &c[..1000]) + let trimmed = if c.chars().count() > 1000 { + format!("{}…", c.chars().take(1000).collect::()) } else { c.clone() }; @@ -3406,8 +3406,8 @@ Return ONLY the summary, nothing else."#, obj.iter() .map(|(k, v)| { let rendered = match v { - serde_json::Value::String(s) if s.len() > 40 => { - format!("\"{}...\"", &s[..40]) + serde_json::Value::String(s) if s.chars().count() > 40 => { + format!("\"{}...\"", s.chars().take(40).collect::()) } _ => v.to_string(), }; @@ -4088,10 +4088,11 @@ Return ONLY the summary, nothing else."#, let title = title_raw.trim().trim_matches('"').to_string(); log::info!("Agentic generated title: {}", title); + let summary_preview: String = final_content.chars().take(200).collect(); log::info!( "Agentic generated summary ({} chars): {}", final_content.len(), - &final_content[..final_content.len().min(200)] + summary_preview ); // 14. 
Serialize the full message history for training data @@ -4671,7 +4672,7 @@ mod tests { assert!( out.starts_with("You are a journal writer in first person, warm and reflective."), "custom prompt must lead the system content; got: {}", - &out[..out.len().min(200)], + out.chars().take(200).collect::(), ); assert!( !out.contains("personal photo memory assistant"), -- 2.49.1 From 0168a4b574d70fad1501ed507d9c9143ab047c48 Mon Sep 17 00:00:00 2001 From: Cameron Cordes Date: Fri, 15 May 2026 16:00:19 -0400 Subject: [PATCH 10/10] hls: remove legacy /video/stream + /video/{path} routes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The hash-keyed `/video/hls/{hash}/{file}` route fully covers HLS playback now and both clients (Apollo, FileViewer-React) have shipped updates that use it directly. Keeping the basename-keyed fallback only encouraged stale URLs to keep flowing — every legacy file was deleted by the startup migration, so the routes were guaranteed 404 machines. Dropped: - `stream_video` handler (`GET /video/stream?path=…`) — the original basename-keyed playlist serve. - `get_video_part` handler (`GET /video/{path}`) — bare-filename segment serve. The new layout's segments live in `{hash[..2]}/{hash}/segment_NNN.ts` and reach the client via `stream_hls_file`. - `legacy_path` field on `GenerateVideoResponse` (serialised as `playlist`). The field always pointed at a file the migration had deleted; current clients ignore it entirely. - Their service registrations in `main.rs`. - The body-side `filename` extraction in `generate_video` (existed only to construct `legacy_path`) and the now-unused `global` opentelemetry import in `handlers/video.rs`. All 707 tests still pass. Same hand-rolled validators (`is_valid_hash` / `is_allowed_hls_filename`) keep the new route's defense-in-depth intact. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- src/handlers/video.rs | 124 ++---------------------------------------- src/main.rs | 2 - 2 files changed, 4 insertions(+), 122 deletions(-) diff --git a/src/handlers/video.rs b/src/handlers/video.rs index c535fac..1be529f 100644 --- a/src/handlers/video.rs +++ b/src/handlers/video.rs @@ -11,8 +11,8 @@ use actix_web::{ web::{self, Data}, }; use log::{debug, error, info, warn}; +use opentelemetry::KeyValue; use opentelemetry::trace::{Span, Status, Tracer}; -use opentelemetry::{KeyValue, global}; use serde::Serialize; use crate::content_hash; @@ -28,12 +28,9 @@ use crate::state::AppState; use crate::video::actors::{GeneratePreviewClipMessage, QueueVideosMessage, VideoToQueue}; use crate::video::hls_paths; -/// Response body for `POST /video/generate`. New clients should consume -/// `playlist_url` (hash-keyed, stable across libraries and renames) and -/// poll for readiness via the URL itself. Legacy clients reading the -/// raw `playlist` string will be served the legacy basename-keyed path -/// for as long as the field exists — that field will be dropped once -/// every shipped client has migrated. +/// Response body for `POST /video/generate`. Clients consume +/// `playlist_url` (hash-keyed, stable across libraries and renames) +/// and poll for readiness via the URL itself. #[derive(Serialize, Debug)] struct GenerateVideoResponse { /// Hash-keyed URL to the HLS playlist. Resolves to @@ -49,11 +46,6 @@ struct GenerateVideoResponse { /// transcode was queued; clients should retry the URL after a short /// delay (or rely on HLS.js's own retry policy). ready: bool, - /// Legacy basename-keyed playlist *path string*. Returned for older - /// clients that read the response body as a single string under the - /// pre-2026-05 wire format. New clients should ignore this field. 
- #[serde(rename = "playlist")] - legacy_path: String, } #[post("/video/generate")] @@ -68,18 +60,6 @@ pub async fn generate_video( let context = extract_context_from_request(&request); let mut span = tracer.start_with_context("generate_video", &context); - let filename_pb = PathBuf::from(&body.path); - let Some(filename) = filename_pb - .file_name() - .and_then(|n| n.to_str()) - .map(str::to_string) - else { - let message = format!("Unable to get file name: {:?}", &body.path); - error!("{}", message); - span.set_status(Status::error(message)); - return HttpResponse::BadRequest().finish(); - }; - let preferred_library = libraries::resolve_library_param(&app_state, body.library.as_deref()) .ok() .flatten() @@ -201,14 +181,12 @@ pub async fn generate_video( content_hash_str, hls_paths::PLAYLIST_FILENAME ); - let legacy_path = format!("{}/{}.m3u8", app_state.video_path, filename); span.set_status(Status::Ok); HttpResponse::Ok().json(GenerateVideoResponse { playlist_url, content_hash: content_hash_str, ready, - legacy_path, }) } @@ -310,100 +288,6 @@ fn is_allowed_hls_filename(name: &str) -> bool { false } -#[get("/video/stream")] -pub async fn stream_video( - request: HttpRequest, - _: Claims, - path: web::Query, - app_state: Data, -) -> impl Responder { - let tracer = global::tracer("image-server"); - let context = extract_context_from_request(&request); - let mut span = tracer.start_with_context("stream_video", &context); - - let playlist = &path.path; - debug!("Playlist: {}", playlist); - - // Only serve files under video_path (HLS playlists) or base_path (source videos) - if playlist.starts_with(&app_state.video_path) - || is_valid_full_path(&app_state.base_path, playlist, false).is_some() - { - match NamedFile::open(playlist) { - Ok(file) => { - span.set_status(Status::Ok); - file.into_response(&request) - } - _ => { - span.set_status(Status::error(format!("playlist not found {}", playlist))); - HttpResponse::NotFound().finish() - } - } - } else { - 
span.set_status(Status::error(format!("playlist not valid {}", playlist))); - HttpResponse::BadRequest().finish() - } -} - -#[get("/video/{path}")] -pub async fn get_video_part( - request: HttpRequest, - _: Claims, - path: web::Path, - app_state: Data, -) -> impl Responder { - let tracer = global_tracer(); - let context = extract_context_from_request(&request); - let mut span = tracer.start_with_context("get_video_part", &context); - - let part = &path.path; - debug!("Video part: {}", part); - - let mut file_part = PathBuf::new(); - file_part.push(app_state.video_path.clone()); - file_part.push(part); - - // Guard against directory traversal attacks - let canonical_base = match std::fs::canonicalize(&app_state.video_path) { - Ok(path) => path, - Err(e) => { - error!("Failed to canonicalize video path: {:?}", e); - span.set_status(Status::error("Invalid video path configuration")); - return HttpResponse::InternalServerError().finish(); - } - }; - - let canonical_file = match std::fs::canonicalize(&file_part) { - Ok(path) => path, - Err(_) => { - warn!("Video part not found or invalid: {:?}", file_part); - span.set_status(Status::error(format!("Video part not found '{}'", part))); - return HttpResponse::NotFound().finish(); - } - }; - - // Ensure the resolved path is still within the video directory - if !canonical_file.starts_with(&canonical_base) { - warn!("Directory traversal attempt detected: {:?}", part); - span.set_status(Status::error("Invalid video path")); - return HttpResponse::Forbidden().finish(); - } - - match NamedFile::open(&canonical_file) { - Ok(file) => { - span.set_status(Status::Ok); - file.into_response(&request) - } - _ => { - error!("Video part not found: {:?}", file_part); - span.set_status(Status::error(format!( - "Video part not found '{}'", - file_part.to_str().unwrap() - ))); - HttpResponse::NotFound().finish() - } - } -} - #[get("/video/preview")] pub async fn get_video_preview( _claims: Claims, diff --git a/src/main.rs b/src/main.rs 
index 620235e..51583c5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -284,11 +284,9 @@ fn main() -> std::io::Result<()> { .service(handlers::image::get_image) .service(handlers::image::upload_image) .service(handlers::video::generate_video) - .service(handlers::video::stream_video) .service(handlers::video::stream_hls_file) .service(handlers::video::get_video_preview) .service(handlers::video::get_preview_status) - .service(handlers::video::get_video_part) .service(hls_stats::hls_stats_handler) .service(handlers::favorites::favorites) .service(handlers::favorites::put_add_favorite) -- 2.49.1