diff --git a/src/ai/insight_generator.rs b/src/ai/insight_generator.rs index 3617261..2e2da33 100644 --- a/src/ai/insight_generator.rs +++ b/src/ai/insight_generator.rs @@ -1749,8 +1749,8 @@ Return ONLY the summary, nothing else."#, .iter() .enumerate() .map(|(i, c)| { - let trimmed = if c.len() > 1000 { - format!("{}…", &c[..1000]) + let trimmed = if c.chars().count() > 1000 { + format!("{}…", c.chars().take(1000).collect::<String>()) } else { c.clone() }; @@ -3406,8 +3406,8 @@ Return ONLY the summary, nothing else."#, obj.iter() .map(|(k, v)| { let rendered = match v { - serde_json::Value::String(s) if s.len() > 40 => { - format!("\"{}...\"", &s[..40]) + serde_json::Value::String(s) if s.chars().count() > 40 => { + format!("\"{}...\"", s.chars().take(40).collect::<String>()) } _ => v.to_string(), }; @@ -4088,10 +4088,11 @@ Return ONLY the summary, nothing else."#, let title = title_raw.trim().trim_matches('"').to_string(); log::info!("Agentic generated title: {}", title); + let summary_preview: String = final_content.chars().take(200).collect(); log::info!( "Agentic generated summary ({} chars): {}", final_content.len(), - &final_content[..final_content.len().min(200)] + summary_preview ); // 14. 
Serialize the full message history for training data @@ -4548,7 +4549,10 @@ mod tests { #[test] fn strip_mark_tags_handles_common_patterns() { - assert_eq!(InsightGenerator::strip_mark_tags("plain text"), "plain text"); + assert_eq!( + InsightGenerator::strip_mark_tags("plain text"), + "plain text" + ); assert_eq!( InsightGenerator::strip_mark_tags("…the lake…"), "…the lake…" @@ -4668,7 +4672,7 @@ mod tests { assert!( out.starts_with("You are a journal writer in first person, warm and reflective."), "custom prompt must lead the system content; got: {}", - &out[..out.len().min(200)], + out.chars().take(200).collect::<String>(), ); assert!( !out.contains("personal photo memory assistant"), diff --git a/src/content_hash.rs b/src/content_hash.rs index 5d334ff..a2a9e9e 100644 --- a/src/content_hash.rs +++ b/src/content_hash.rs @@ -52,12 +52,9 @@ pub fn thumbnail_path(thumbs_dir: &Path, hash: &str) -> PathBuf { /// Hash-keyed HLS output directory: `<video_dir>/<shard>/<hash>/`. /// The playlist lives at `playlist.m3u8` inside this directory and its -/// segments are co-located so HLS relative references Just Work. -/// -/// Allow-dead until Branch B/C rewires the HLS pipeline to use it; the -/// helper lives here today so Branch A's path layout decisions stay -/// adjacent to thumbnail/legacy ones. -#[allow(dead_code)] +/// segments are co-located so HLS relative references Just Work. See +/// [`crate::video::hls_paths`] for the filename constants and the +/// per-file helpers built on this dir. 
pub fn hls_dir(video_dir: &Path, hash: &str) -> PathBuf { let shard = shard_prefix(hash); video_dir.join(shard).join(hash) diff --git a/src/database/knowledge_dao.rs b/src/database/knowledge_dao.rs index 069dd38..06b2b2d 100644 --- a/src/database/knowledge_dao.rs +++ b/src/database/knowledge_dao.rs @@ -235,6 +235,7 @@ pub trait KnowledgeDao: Sync + Send { /// - entity_type: optional, restricts nodes to one type /// - node_limit: caps the number of nodes; lower-fact-count /// entities drop first + /// /// Edges between dropped entities are pruned. Persona scoping /// affects fact_count + edge inclusion (rejected / superseded /// excluded; All vs Single mirrors the existing pattern). @@ -937,7 +938,10 @@ impl KnowledgeDao for SqliteKnowledgeDao { let mut conn = self.connection.lock().expect("KnowledgeDao lock"); let mut q = sql_query(sql).into_boxed(); match persona { - PersonaFilter::Single { user_id, persona_id } => { + PersonaFilter::Single { + user_id, + persona_id, + } => { q = q .bind::(*user_id) .bind::(persona_id.clone()); @@ -977,7 +981,10 @@ impl KnowledgeDao for SqliteKnowledgeDao { // rows flip — REVIEWED survives so the curator can preserve // a hand-approved exception under the same predicate. let touched = match persona { - PersonaFilter::Single { user_id: uid, persona_id: pid } => diesel::update( + PersonaFilter::Single { + user_id: uid, + persona_id: pid, + } => diesel::update( entity_facts .filter(predicate.eq(target_predicate)) .filter(user_id.eq(*uid)) @@ -1282,8 +1289,7 @@ impl KnowledgeDao for SqliteKnowledgeDao { Some(v) => v, None => continue, }; - for b in (a + 1)..indices.len() { - let ib = indices[b]; + for &ib in &indices[a + 1..] 
{ let vb = match &decoded[ib] { Some(v) => v, None => continue, diff --git a/src/database/mod.rs b/src/database/mod.rs index cf20ee9..4a20702 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -414,6 +414,27 @@ pub trait ExifDao: Sync + Send { size_bytes: i64, ) -> Result<(), DbError>; + /// Every distinct non-NULL `content_hash` across all libraries. Used + /// by HLS orphan cleanup to identify hash dirs under `$VIDEO_PATH` + /// whose source video no longer exists. Cheap query (single column, + /// indexed) but unbounded in size — the result is a HashSet membership + /// check, so a 100k-photo library produces ~100k strings. + fn list_distinct_content_hashes( + &mut self, + context: &opentelemetry::Context, + ) -> Result<Vec<String>, DbError>; + + /// Every row in `image_exif` for `library_id`, as + /// `(rel_path, content_hash)`. The hash is Option because rows + /// mid-backfill carry NULL. Used by HLS readiness stats; callers + /// filter by extension client-side because the DB schema doesn't + /// carry media type. + fn list_paths_and_hashes_for_library( + &mut self, + context: &opentelemetry::Context, + library_id: i32, + ) -> Result<Vec<(String, Option<String>)>, DbError>; + /// Return image_exif rows that need their `date_taken` resolved by the /// canonical-date waterfall (see `crate::date_resolver`): `date_taken /// IS NULL`. Returns `(library_id, rel_path)`. 
The caller filters to @@ -481,9 +502,9 @@ pub trait ExifDao: Sync + Send { /// whose calendar position matches the request's span: /// - `"day"` — same month + day-of-month (any year) /// - `"week"` — same week-of-year (SQLite `%W`, Monday-anchored — - /// close to but not exactly ISO week 8601; the - /// boundary cases at year-start/end can shift by ±1 - /// vs the prior request-time `iso_week()` filter) + /// close to but not exactly ISO week 8601; the boundary cases + /// at year-start/end can shift by ±1 vs the prior request-time + /// `iso_week()` filter) /// - `"month"` — same month (any year) /// /// `tz_offset_minutes` is applied to both sides of the strftime @@ -1231,6 +1252,50 @@ impl ExifDao for SqliteExifDao { .map_err(|_| DbError::new(DbErrorKind::UpdateError)) } + fn list_distinct_content_hashes( + &mut self, + context: &opentelemetry::Context, + ) -> Result<Vec<String>, DbError> { + trace_db_call(context, "query", "list_distinct_content_hashes", |_span| { + use schema::image_exif::dsl::*; + + let mut connection = self.connection.lock().expect("Unable to get ExifDao"); + + image_exif + .filter(content_hash.is_not_null()) + .select(content_hash) + .distinct() + .load::<Option<String>>(connection.deref_mut()) + .map(|rows| rows.into_iter().flatten().collect()) + .map_err(|_| anyhow::anyhow!("Query error")) + }) + .map_err(|_| DbError::new(DbErrorKind::QueryError)) + } + + fn list_paths_and_hashes_for_library( + &mut self, + context: &opentelemetry::Context, + lib_id: i32, + ) -> Result<Vec<(String, Option<String>)>, DbError> { + trace_db_call( + context, + "query", + "list_paths_and_hashes_for_library", + |_span| { + use schema::image_exif::dsl::*; + + let mut connection = self.connection.lock().expect("Unable to get ExifDao"); + + image_exif + .filter(library_id.eq(lib_id)) + .select((rel_path, content_hash)) + .load::<(String, Option<String>)>(connection.deref_mut()) + .map_err(|_| anyhow::anyhow!("Query error")) + }, + ) + .map_err(|_| DbError::new(DbErrorKind::QueryError)) + } + fn 
get_rows_needing_date_backfill( &mut self, context: &opentelemetry::Context, diff --git a/src/database/reconcile.rs b/src/database/reconcile.rs index 57f69f3..2fade4e 100644 --- a/src/database/reconcile.rs +++ b/src/database/reconcile.rs @@ -57,30 +57,28 @@ impl ReconcileStats { /// watcher tick. Errors are logged but never propagated; reconciliation /// is best-effort and a transient DB hiccup must not stall the watcher. pub fn run(conn: &mut SqliteConnection) -> ReconcileStats { - let mut stats = ReconcileStats::default(); - - stats.tagged_photo_hashes_filled = match backfill_tagged_photo_hashes(conn) { - Ok(n) => n, - Err(e) => { - warn!("reconcile: tagged_photo hash backfill failed: {:?}", e); - 0 - } - }; - - stats.photo_insights_hashes_filled = match backfill_photo_insights_hashes(conn) { - Ok(n) => n, - Err(e) => { - warn!("reconcile: photo_insights hash backfill failed: {:?}", e); - 0 - } - }; - - stats.photo_insights_demoted = match collapse_insight_currents(conn) { - Ok(n) => n, - Err(e) => { - warn!("reconcile: photo_insights scalar merge failed: {:?}", e); - 0 - } + let stats = ReconcileStats { + tagged_photo_hashes_filled: match backfill_tagged_photo_hashes(conn) { + Ok(n) => n, + Err(e) => { + warn!("reconcile: tagged_photo hash backfill failed: {:?}", e); + 0 + } + }, + photo_insights_hashes_filled: match backfill_photo_insights_hashes(conn) { + Ok(n) => n, + Err(e) => { + warn!("reconcile: photo_insights hash backfill failed: {:?}", e); + 0 + } + }, + photo_insights_demoted: match collapse_insight_currents(conn) { + Ok(n) => n, + Err(e) => { + warn!("reconcile: photo_insights scalar merge failed: {:?}", e); + 0 + } + }, }; if stats.changed() { diff --git a/src/faces.rs b/src/faces.rs index f4bd5cc..ba47508 100644 --- a/src/faces.rs +++ b/src/faces.rs @@ -2118,7 +2118,10 @@ async fn update_face_handler( // the short context string we surface in the response body — // SQLITE_BUSY here usually means another DAO's writer held the // lock past 
`busy_timeout` (5s), which is invisible in `{}`. - warn!("PATCH /image/faces/{}: 500 — update_face failed: {:#}", id, e); + warn!( + "PATCH /image/faces/{}: 500 — update_face failed: {:#}", + id, e + ); return HttpResponse::InternalServerError().body(e.to_string()); } }; diff --git a/src/files.rs b/src/files.rs index b7b035c..91904e3 100644 --- a/src/files.rs +++ b/src/files.rs @@ -1689,6 +1689,21 @@ mod tests { Ok(()) } + fn list_distinct_content_hashes( + &mut self, + _context: &opentelemetry::Context, + ) -> Result, DbError> { + Ok(Vec::new()) + } + + fn list_paths_and_hashes_for_library( + &mut self, + _context: &opentelemetry::Context, + _library_id: i32, + ) -> Result)>, DbError> { + Ok(Vec::new()) + } + fn get_rows_needing_date_backfill( &mut self, _context: &opentelemetry::Context, diff --git a/src/handlers/image.rs b/src/handlers/image.rs index 7266e34..07e977d 100644 --- a/src/handlers/image.rs +++ b/src/handlers/image.rs @@ -183,14 +183,15 @@ pub async fn get_image( // review JPEG, ~1–2 MP). Falls through to NamedFile if no preview is // available, which preserves the historical behavior for callers // that genuinely want the original bytes. 
- if image_size == PhotoSize::Full && exif::is_tiff_raw(&path) { - if let Some(preview) = exif::extract_embedded_jpeg_preview(&path) { - span.set_status(Status::Ok); - return HttpResponse::Ok() - .content_type("image/jpeg") - .insert_header(("Cache-Control", "public, max-age=3600")) - .body(preview); - } + if image_size == PhotoSize::Full + && exif::is_tiff_raw(&path) + && let Some(preview) = exif::extract_embedded_jpeg_preview(&path) + { + span.set_status(Status::Ok); + return HttpResponse::Ok() + .content_type("image/jpeg") + .insert_header(("Cache-Control", "public, max-age=3600")) + .body(preview); } if let Ok(file) = NamedFile::open(&path) { @@ -706,7 +707,7 @@ pub async fn set_image_date( Ok(row) => { span.set_status(Status::Ok); HttpResponse::Ok().json(build_metadata_response_for_date_mutation( - &library, + library, &normalized_path, row, )) @@ -757,7 +758,7 @@ pub async fn clear_image_date( Ok(row) => { span.set_status(Status::Ok); HttpResponse::Ok().json(build_metadata_response_for_date_mutation( - &library, + library, &normalized_path, row, )) diff --git a/src/handlers/video.rs b/src/handlers/video.rs index d00346c..1be529f 100644 --- a/src/handlers/video.rs +++ b/src/handlers/video.rs @@ -11,190 +11,283 @@ use actix_web::{ web::{self, Data}, }; use log::{debug, error, info, warn}; +use opentelemetry::KeyValue; use opentelemetry::trace::{Span, Status, Tracer}; -use opentelemetry::{KeyValue, global}; +use serde::Serialize; +use crate::content_hash; use crate::data::{ Claims, PreviewClipRequest, PreviewStatusItem, PreviewStatusRequest, PreviewStatusResponse, ThumbnailRequest, }; -use crate::database::PreviewDao; +use crate::database::{ExifDao, PreviewDao}; use crate::files::is_valid_full_path; use crate::libraries; use crate::otel::{extract_context_from_request, global_tracer}; use crate::state::AppState; -use crate::video::actors::{GeneratePreviewClipMessage, ProcessMessage, create_playlist}; +use crate::video::actors::{GeneratePreviewClipMessage, 
QueueVideosMessage, VideoToQueue}; +use crate::video::hls_paths; + +/// Response body for `POST /video/generate`. Clients consume +/// `playlist_url` (hash-keyed, stable across libraries and renames) +/// and poll for readiness via the URL itself. +#[derive(Serialize, Debug)] +struct GenerateVideoResponse { + /// Hash-keyed URL to the HLS playlist. Resolves to + /// `$VIDEO_PATH///playlist.m3u8` server-side. Relative + /// segment refs inside the playlist resolve correctly because the + /// browser appends to this URL's path. + playlist_url: String, + /// blake3 content hash of the source video. Stable per byte content, + /// so duplicate uploads / archive ingests share one set of HLS + /// output. + content_hash: String, + /// `true` iff the playlist file is already on disk. `false` means a + /// transcode was queued; clients should retry the URL after a short + /// delay (or rely on HLS.js's own retry policy). + ready: bool, +} #[post("/video/generate")] pub async fn generate_video( _claims: Claims, request: HttpRequest, app_state: Data, + exif_dao: Data>>, body: web::Json, ) -> impl Responder { let tracer = global_tracer(); - let context = extract_context_from_request(&request); let mut span = tracer.start_with_context("generate_video", &context); - let filename = PathBuf::from(&body.path); + let preferred_library = libraries::resolve_library_param(&app_state, body.library.as_deref()) + .ok() + .flatten() + .unwrap_or_else(|| app_state.primary_library()); - if let Some(name) = filename.file_name() { - let filename = name.to_str().expect("Filename should convert to string"); - // KNOWN ISSUE (multi-library): playlist filename is the basename - // alone, so two source files with the same basename — whether in - // different libraries or different subdirs of one library — - // overwrite each other's playlists while ffmpeg runs. 
The - // hash-keyed `content_hash::hls_dir` is the long-term answer - // (see CLAUDE.md "Multi-library data model"); rewiring the - // actor pipeline to use it is out of scope for this branch. - // The orphan-cleanup job above already walks every library so - // it doesn't false-delete archive playlists. - let playlist = format!("{}/{}.m3u8", app_state.video_path, filename); + // Try the resolved library first, then fall back to any other library + // that actually contains the file — handles union-mode requests where + // the mobile client passes no library but the file lives in a + // non-primary library. Track which library won so the DB lookup is + // scoped correctly. + let resolved = is_valid_full_path(&preferred_library.root_path, &body.path, false) + .filter(|p| p.exists()) + .map(|p| (preferred_library.id, preferred_library.root_path.clone(), p)) + .or_else(|| { + app_state.libraries.iter().find_map(|lib| { + if lib.id == preferred_library.id { + return None; + } + is_valid_full_path(&lib.root_path, &body.path, false) + .filter(|p| p.exists()) + .map(|p| (lib.id, lib.root_path.clone(), p)) + }) + }); - let library = libraries::resolve_library_param(&app_state, body.library.as_deref()) - .ok() - .flatten() - .unwrap_or_else(|| app_state.primary_library()); + let Some((resolved_library_id, resolved_root, full_path)) = resolved else { + span.set_status(Status::error(format!("invalid path {:?}", &body.path))); + return HttpResponse::BadRequest().finish(); + }; - // Try the resolved library first, then fall back to any other library - // that actually contains the file — handles union-mode requests where - // the mobile client passes no library but the file lives in a - // non-primary library. 
- let resolved = is_valid_full_path(&library.root_path, &body.path, false) - .filter(|p| p.exists()) - .or_else(|| { - app_state.libraries.iter().find_map(|lib| { - if lib.id == library.id { - return None; - } - is_valid_full_path(&lib.root_path, &body.path, false).filter(|p| p.exists()) - }) - }); + // Build the rel_path used to look up the row. + let full_path_str = full_path.to_string_lossy().to_string(); + let rel_path = full_path_str + .strip_prefix(&resolved_root) + .unwrap_or(full_path_str.as_str()) + .trim_start_matches(['/', '\\']) + .to_string(); - if let Some(path) = resolved { - if let Ok(child) = create_playlist(path.to_str().unwrap(), &playlist).await { - span.add_event( - "playlist_created".to_string(), - vec![KeyValue::new("playlist-name", filename.to_string())], + // DB lookup first. Cheap and avoids re-reading the file off disk for + // already-ingested videos. + let hash_from_db: Option = { + let mut dao = exif_dao.lock().expect("Unable to lock ExifDao"); + match dao.get_exif_batch( + &context, + Some(resolved_library_id), + std::slice::from_ref(&rel_path), + ) { + Ok(rows) => rows.into_iter().next().and_then(|r| r.content_hash), + Err(e) => { + warn!( + "exif_dao.get_exif_batch failed for {} (lib {}): {:?}", + rel_path, resolved_library_id, e ); - - span.set_status(Status::Ok); - app_state.stream_manager.do_send(ProcessMessage( - playlist.clone(), - child, - // opentelemetry::Context::new().with_span(span), - )); - } - } else { - span.set_status(Status::error(format!("invalid path {:?}", &body.path))); - return HttpResponse::BadRequest().finish(); - } - - HttpResponse::Ok().json(playlist) - } else { - let message = format!("Unable to get file name: {:?}", filename); - error!("{}", message); - span.set_status(Status::error(message)); - - HttpResponse::BadRequest().finish() - } -} - -#[get("/video/stream")] -pub async fn stream_video( - request: HttpRequest, - _: Claims, - path: web::Query, - app_state: Data, -) -> impl Responder { - let tracer = 
global::tracer("image-server"); - let context = extract_context_from_request(&request); - let mut span = tracer.start_with_context("stream_video", &context); - - let playlist = &path.path; - debug!("Playlist: {}", playlist); - - // Only serve files under video_path (HLS playlists) or base_path (source videos) - if playlist.starts_with(&app_state.video_path) - || is_valid_full_path(&app_state.base_path, playlist, false).is_some() - { - match NamedFile::open(playlist) { - Ok(file) => { - span.set_status(Status::Ok); - file.into_response(&request) - } - _ => { - span.set_status(Status::error(format!("playlist not found {}", playlist))); - HttpResponse::NotFound().finish() + None } } + }; + + // Best-effort fallback: compute on-the-fly when the DB row hasn't + // been written or is mid-backfill. Read-only — no library mutation. + let content_hash_str = match hash_from_db { + Some(h) => h, + None => match content_hash::compute(&full_path) { + Ok(id) => id.content_hash, + Err(e) => { + error!( + "Failed to compute content_hash for {}: {}", + full_path.display(), + e + ); + span.set_status(Status::error(format!("hash compute failed: {}", e))); + return HttpResponse::InternalServerError().finish(); + } + }, + }; + + let video_dir = std::path::Path::new(&app_state.video_path); + let playlist_path = hls_paths::playlist_for_hash(video_dir, &content_hash_str); + let sentinel_path = hls_paths::sentinel_for_hash(video_dir, &content_hash_str); + let ready = playlist_path.exists(); + + if !ready && !sentinel_path.exists() { + // Kick off generation via the existing actor pipeline. Fire-and- + // forget — the playlist appears at `playlist_path` once ffmpeg + // + rename complete. The client polls the URL. 
+ info!( + "/video/generate: queueing playlist for {} (hash={})", + full_path.display(), + &content_hash_str[..content_hash_str.len().min(16)] + ); + app_state.playlist_manager.do_send(QueueVideosMessage { + videos: vec![VideoToQueue { + video_path: full_path.clone(), + content_hash: content_hash_str.clone(), + }], + }); + span.add_event( + "playlist_queued", + vec![KeyValue::new("content_hash", content_hash_str.clone())], + ); + } else if ready { + span.add_event( + "playlist_already_present", + vec![KeyValue::new("content_hash", content_hash_str.clone())], + ); } else { - span.set_status(Status::error(format!("playlist not valid {}", playlist))); - HttpResponse::BadRequest().finish() + // Sentinel present — past transcode attempt failed. Return the + // URL anyway (it'll 404 / 5xx at fetch time) so the client gets + // a deterministic answer. Operator must delete the sentinel to + // force a retry. + warn!( + "/video/generate: unsupported sentinel present for {} (hash={}); not re-queueing", + full_path.display(), + &content_hash_str[..content_hash_str.len().min(16)] + ); } + + let playlist_url = format!( + "/video/hls/{}/{}", + content_hash_str, + hls_paths::PLAYLIST_FILENAME + ); + + span.set_status(Status::Ok); + HttpResponse::Ok().json(GenerateVideoResponse { + playlist_url, + content_hash: content_hash_str, + ready, + }) } -#[get("/video/{path}")] -pub async fn get_video_part( +/// Serve HLS playlist or segment files under the hash-keyed layout +/// `$VIDEO_PATH///`. The matched `{file}` must be +/// either `playlist.m3u8` or a `segment_NNN.ts` style segment; any other +/// shape is 400'd to defend against operators stashing other content in +/// the hash dir. 
+#[get("/video/hls/{hash}/{file}")] +pub async fn stream_hls_file( request: HttpRequest, _: Claims, - path: web::Path, + path: web::Path<(String, String)>, app_state: Data, ) -> impl Responder { let tracer = global_tracer(); let context = extract_context_from_request(&request); - let mut span = tracer.start_with_context("get_video_part", &context); + let mut span = tracer.start_with_context("stream_hls_file", &context); - let part = &path.path; - debug!("Video part: {}", part); + let (hash, file) = path.into_inner(); + if !is_valid_hash(&hash) { + span.set_status(Status::error("invalid hash")); + return HttpResponse::BadRequest().body("invalid hash"); + } + if !is_allowed_hls_filename(&file) { + span.set_status(Status::error("invalid file")); + return HttpResponse::BadRequest().body("invalid file"); + } - let mut file_part = PathBuf::new(); - file_part.push(app_state.video_path.clone()); - file_part.push(part); + let shard = &hash[..2]; + let file_path = PathBuf::from(&app_state.video_path) + .join(shard) + .join(&hash) + .join(&file); - // Guard against directory traversal attacks + // Path-traversal guard: canonicalize both sides and require the file + // to live under `app_state.video_path`. `is_valid_hash` / + // `is_allowed_hls_filename` already block dangerous strings, but + // belt-and-suspenders here is cheap. 
let canonical_base = match std::fs::canonicalize(&app_state.video_path) { - Ok(path) => path, + Ok(p) => p, Err(e) => { - error!("Failed to canonicalize video path: {:?}", e); - span.set_status(Status::error("Invalid video path configuration")); + error!("Failed to canonicalize VIDEO_PATH: {:?}", e); + span.set_status(Status::error("VIDEO_PATH not canonicalisable")); return HttpResponse::InternalServerError().finish(); } }; - - let canonical_file = match std::fs::canonicalize(&file_part) { - Ok(path) => path, + let canonical_file = match std::fs::canonicalize(&file_path) { + Ok(p) => p, Err(_) => { - warn!("Video part not found or invalid: {:?}", file_part); - span.set_status(Status::error(format!("Video part not found '{}'", part))); + debug!("HLS file not found: {}", file_path.display()); + span.set_status(Status::error("not found")); return HttpResponse::NotFound().finish(); } }; - - // Ensure the resolved path is still within the video directory if !canonical_file.starts_with(&canonical_base) { - warn!("Directory traversal attempt detected: {:?}", part); - span.set_status(Status::error("Invalid video path")); + warn!( + "Path traversal attempt: {} resolved outside VIDEO_PATH", + file_path.display() + ); + span.set_status(Status::error("traversal")); return HttpResponse::Forbidden().finish(); } match NamedFile::open(&canonical_file) { - Ok(file) => { + Ok(f) => { span.set_status(Status::Ok); - file.into_response(&request) + f.into_response(&request) } - _ => { - error!("Video part not found: {:?}", file_part); - span.set_status(Status::error(format!( - "Video part not found '{}'", - file_part.to_str().unwrap() - ))); + Err(_) => { + span.set_status(Status::error("not found")); HttpResponse::NotFound().finish() } } } +/// 64 lowercase-or-upper hex chars. Strict so we don't accept arbitrary +/// strings that might canonicalize into trouble. 
+fn is_valid_hash(s: &str) -> bool { + s.len() == 64 && s.bytes().all(|b| b.is_ascii_hexdigit()) +} + +/// Allowed file names inside a hash dir. `playlist.m3u8` plus segment +/// files matching the `segment_NNN.ts` template that `PlaylistGenerator` +/// writes via `hls_paths::SEGMENT_TEMPLATE`. Anything else (including +/// `.tmp`, `.unsupported`, dotfiles) returns 400 — these are internal +/// artifacts the client should never request. +fn is_allowed_hls_filename(name: &str) -> bool { + if name == hls_paths::PLAYLIST_FILENAME { + return true; + } + if let Some(rest) = name.strip_prefix("segment_") + && let Some(num) = rest.strip_suffix(".ts") + && !num.is_empty() + && num.bytes().all(|b| b.is_ascii_digit()) + { + return true; + } + false +} + #[get("/video/preview")] pub async fn get_video_preview( _claims: Claims, @@ -427,6 +520,41 @@ mod tests { use crate::testhelpers::TestPreviewDao; use actix_web::App; + #[test] + fn is_valid_hash_requires_64_ascii_hex() { + assert!(is_valid_hash(&"a".repeat(64))); + assert!(is_valid_hash(&"F".repeat(64))); + assert!(is_valid_hash(&format!("ab{}", "0".repeat(62)))); + + assert!(!is_valid_hash(&"a".repeat(63))); + assert!(!is_valid_hash(&"a".repeat(65))); + // Anything outside the hex alphabet — including '/', '.', '..' — + // is rejected up front so the path-traversal canonicalisation + // never has to defend the boundary alone. + assert!(!is_valid_hash(&format!("/{}", "a".repeat(63)))); + assert!(!is_valid_hash(&format!("..{}", "a".repeat(62)))); + assert!(!is_valid_hash(&"g".repeat(64))); + } + + #[test] + fn is_allowed_hls_filename_accepts_only_playlist_and_segments() { + assert!(is_allowed_hls_filename("playlist.m3u8")); + assert!(is_allowed_hls_filename("segment_000.ts")); + assert!(is_allowed_hls_filename("segment_999.ts")); + assert!(is_allowed_hls_filename("segment_0.ts")); + + // Internal artifacts the client should never request. 
+ assert!(!is_allowed_hls_filename("playlist.m3u8.tmp")); + assert!(!is_allowed_hls_filename("playlist.unsupported")); + // Traversal / path components — defence in depth alongside + // the actix path matcher itself. + assert!(!is_allowed_hls_filename("..")); + assert!(!is_allowed_hls_filename("../etc/passwd")); + assert!(!is_allowed_hls_filename("segment_abc.ts")); + assert!(!is_allowed_hls_filename("segment_.ts")); + assert!(!is_allowed_hls_filename("")); + } + fn make_token() -> String { let claims = Claims::valid_user("1".to_string()); jsonwebtoken::encode( diff --git a/src/hls_stats.rs b/src/hls_stats.rs new file mode 100644 index 0000000..c587aaf --- /dev/null +++ b/src/hls_stats.rs @@ -0,0 +1,409 @@ +//! Per-library HLS readiness: Prometheus gauges + `/hls/stats` endpoint. +//! +//! The new hash-keyed pipeline transcodes lazily — most of a freshly +//! mounted library is "pending" for the first hour, and operators want +//! a live read on "how much work is left, am I CPU-bound, do I need to +//! bump `HLS_CONCURRENCY`." This module supplies both surfaces against +//! the same compute path: +//! +//! - **Prometheus gauges** `imageserver_hls_videos_total{library}`, +//! `..._with_playlist{library}`, `..._pending{library}`, +//! `..._unsupported{library}`. Updated every watcher full-scan tick +//! and on every `/hls/stats` request, so the freshness matches +//! whichever surface the operator is watching. +//! +//! - **`GET /hls/stats`** returns a JSON snapshot of the same counts +//! plus a top-level cross-library aggregate. Claims-protected +//! (matches every other authenticated read in this crate). +//! +//! Cost is O(distinct video hashes per library), each row needing a +//! single `stat()` on the playlist file. On a 100k-video library that's +//! noticeable; on a typical home library (few thousand) it's noise. +//! We call from explicit triggers only — never per-request from +//! middleware — so the cost is bounded. 
+ +use std::collections::HashSet; +use std::path::Path; +use std::sync::{Arc, Mutex}; + +use actix_web::{HttpResponse, Responder, get, web}; +use lazy_static::lazy_static; +use log::{info, warn}; +use prometheus::IntGaugeVec; +use serde::Serialize; + +use crate::data::Claims; +use crate::database::ExifDao; +use crate::file_types; +use crate::libraries::Library; +use crate::state::AppState; +use crate::video::hls_paths; + +lazy_static! { + pub static ref HLS_VIDEOS_TOTAL: IntGaugeVec = IntGaugeVec::new( + prometheus::Opts::new( + "imageserver_hls_videos_total", + "Distinct video content hashes per library known to image_exif", + ), + &["library"], + ) + .expect("HLS_VIDEOS_TOTAL"); + pub static ref HLS_VIDEOS_WITH_PLAYLIST: IntGaugeVec = IntGaugeVec::new( + prometheus::Opts::new( + "imageserver_hls_videos_with_playlist", + "Videos whose hash-keyed HLS playlist is already on disk", + ), + &["library"], + ) + .expect("HLS_VIDEOS_WITH_PLAYLIST"); + pub static ref HLS_VIDEOS_PENDING: IntGaugeVec = IntGaugeVec::new( + prometheus::Opts::new( + "imageserver_hls_videos_pending", + "Videos whose hash-keyed HLS playlist is not yet on disk", + ), + &["library"], + ) + .expect("HLS_VIDEOS_PENDING"); + pub static ref HLS_VIDEOS_UNSUPPORTED: IntGaugeVec = IntGaugeVec::new( + prometheus::Opts::new( + "imageserver_hls_videos_unsupported", + "Videos with an `.unsupported` sentinel — ffmpeg refused; \ + operator must delete to retry", + ), + &["library"], + ) + .expect("HLS_VIDEOS_UNSUPPORTED"); +} + +/// Per-library HLS readiness snapshot. +#[derive(Serialize, Debug, Clone, PartialEq, Eq)] +pub struct HlsLibraryStats { + pub library_id: i32, + pub library: String, + /// Distinct video content hashes (dedupes intra-library bytes-at-N-paths). + pub total: usize, + /// Of `total`, hashes whose `playlist.m3u8` is on disk. + pub with_playlist: usize, + /// Of `total`, hashes whose ffmpeg attempt left a `.unsupported` + /// sentinel. 
Counted separately because they won't progress without + /// operator intervention (delete the sentinel to retry). + pub unsupported: usize, + /// `total - (with_playlist + unsupported)` — videos awaiting transcode. + pub pending: usize, + /// Distinct rel_paths under this library that are video files but + /// whose `image_exif.content_hash` is still NULL (mid-backfill). + /// These don't yet count toward `total` because they're invisible + /// to the hash-keyed pipeline; surfaced so the operator can see + /// "hash backfill, then transcode" pipeline depth. + pub hashless_videos: usize, +} + +/// JSON response body for `GET /hls/stats`. +#[derive(Serialize, Debug)] +pub struct HlsStatsResponse { + pub libraries: Vec, + pub total: usize, + pub with_playlist: usize, + pub pending: usize, + pub unsupported: usize, + pub hashless_videos: usize, +} + +/// Compute current readiness per library and publish to Prometheus. +/// Returns the same data so callers can serialise it. The publish step +/// is idempotent on the gauge — old values get overwritten. 
+pub fn compute_and_publish( + libraries: &[Library], + exif_dao: &Arc>>, + video_dir: &Path, +) -> Vec { + let ctx = opentelemetry::Context::new(); + let mut out = Vec::with_capacity(libraries.len()); + for lib in libraries { + let stats = compute_for_library(&ctx, lib, exif_dao, video_dir); + publish_gauges(&stats); + out.push(stats); + } + out +} + +fn publish_gauges(s: &HlsLibraryStats) { + HLS_VIDEOS_TOTAL + .with_label_values(&[s.library.as_str()]) + .set(s.total as i64); + HLS_VIDEOS_WITH_PLAYLIST + .with_label_values(&[s.library.as_str()]) + .set(s.with_playlist as i64); + HLS_VIDEOS_PENDING + .with_label_values(&[s.library.as_str()]) + .set(s.pending as i64); + HLS_VIDEOS_UNSUPPORTED + .with_label_values(&[s.library.as_str()]) + .set(s.unsupported as i64); +} + +fn compute_for_library( + ctx: &opentelemetry::Context, + lib: &Library, + exif_dao: &Arc>>, + video_dir: &Path, +) -> HlsLibraryStats { + let rows = { + let mut dao = exif_dao.lock().expect("Unable to lock ExifDao"); + match dao.list_paths_and_hashes_for_library(ctx, lib.id) { + Ok(r) => r, + Err(e) => { + warn!( + "hls_stats: list_paths_and_hashes_for_library failed for lib {}: {:?}", + lib.id, e + ); + Vec::new() + } + } + }; + stats_from_rows(lib, &rows, video_dir) +} + +/// Pure function — same compute as [`compute_for_library`] but works +/// on caller-supplied rows. Split out so tests don't need a full +/// `ExifDao` mock; the integration path is exercised through +/// `compute_and_publish` against the real SQLite DAO at runtime. 
+fn stats_from_rows( + lib: &Library, + rows: &[(String, Option)], + video_dir: &Path, +) -> HlsLibraryStats { + let mut hashes: HashSet = HashSet::new(); + let mut hashless_videos = 0usize; + for (rel_path, hash_opt) in rows { + if !file_types::is_video_file(Path::new(rel_path)) { + continue; + } + match hash_opt { + Some(h) => { + hashes.insert(h.clone()); + } + None => { + hashless_videos += 1; + } + } + } + + let mut with_playlist = 0usize; + let mut unsupported = 0usize; + for h in &hashes { + if hls_paths::playlist_for_hash(video_dir, h).exists() { + with_playlist += 1; + } else if hls_paths::sentinel_for_hash(video_dir, h).exists() { + unsupported += 1; + } + } + let total = hashes.len(); + let pending = total.saturating_sub(with_playlist + unsupported); + + HlsLibraryStats { + library_id: lib.id, + library: lib.name.clone(), + total, + with_playlist, + unsupported, + pending, + hashless_videos, + } +} + +/// Log a single info line summarising readiness across all libraries. +/// Called by the watcher at the end of a full-scan tick so operators +/// who tail the log see the headline number without scraping +/// Prometheus. 
+pub fn log_summary(stats: &[HlsLibraryStats]) { + let total: usize = stats.iter().map(|s| s.total).sum(); + let with_playlist: usize = stats.iter().map(|s| s.with_playlist).sum(); + let pending: usize = stats.iter().map(|s| s.pending).sum(); + let unsupported: usize = stats.iter().map(|s| s.unsupported).sum(); + let hashless: usize = stats.iter().map(|s| s.hashless_videos).sum(); + + let per_lib: Vec = stats + .iter() + .map(|s| { + format!( + "{}={}/{} pending={} unsupported={} hashless={}", + s.library, s.with_playlist, s.total, s.pending, s.unsupported, s.hashless_videos, + ) + }) + .collect(); + + info!( + "HLS readiness: {}/{} playlists on disk, {} pending, {} unsupported, {} hashless videos | per-library: [{}]", + with_playlist, + total, + pending, + unsupported, + hashless, + per_lib.join(", "), + ); +} + +#[get("/hls/stats")] +pub async fn hls_stats_handler( + _claims: Claims, + app_state: web::Data, + exif_dao: web::Data>>, +) -> impl Responder { + let libraries = app_state.libraries.clone(); + let video_dir = std::path::PathBuf::from(&app_state.video_path); + let exif_dao = exif_dao.into_inner(); + + // Synchronous file IO + DB query — run on a blocking pool so the + // actix worker thread stays free for other requests. 
+ let stats = + match web::block(move || compute_and_publish(&libraries, &exif_dao, &video_dir)).await { + Ok(s) => s, + Err(e) => { + warn!("/hls/stats: blocking task failed: {:?}", e); + Vec::new() + } + }; + + let total: usize = stats.iter().map(|s| s.total).sum(); + let with_playlist: usize = stats.iter().map(|s| s.with_playlist).sum(); + let pending: usize = stats.iter().map(|s| s.pending).sum(); + let unsupported: usize = stats.iter().map(|s| s.unsupported).sum(); + let hashless_videos: usize = stats.iter().map(|s| s.hashless_videos).sum(); + + HttpResponse::Ok().json(HlsStatsResponse { + libraries: stats, + total, + with_playlist, + pending, + unsupported, + hashless_videos, + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::tempdir; + + fn lib(id: i32, name: &str) -> Library { + Library { + id, + name: name.into(), + root_path: String::new(), + enabled: true, + excluded_dirs: Vec::new(), + } + } + + fn rows(vs: Vec<(&str, Option<&str>)>) -> Vec<(String, Option)> { + vs.into_iter() + .map(|(p, h)| (p.to_string(), h.map(|s| s.to_string()))) + .collect() + } + + fn touch(dir: &Path, rel: &str) { + let p = dir.join(rel); + std::fs::create_dir_all(p.parent().unwrap()).unwrap(); + std::fs::write(p, b"").unwrap(); + } + + #[test] + fn videos_only_count_in_total() { + let tmp = tempdir().unwrap(); + let r = rows(vec![ + ("photos/IMG.jpg", Some(&"a".repeat(64))), // image: ignored + ("clip.mp4", Some(&"b".repeat(64))), + ("vid.mov", Some(&"c".repeat(64))), + ]); + let stats = stats_from_rows(&lib(1, "main"), &r, tmp.path()); + assert_eq!(stats.total, 2); + assert_eq!(stats.with_playlist, 0); + assert_eq!(stats.pending, 2); + assert_eq!(stats.unsupported, 0); + assert_eq!(stats.hashless_videos, 0); + } + + #[test] + fn hash_dedup_collapses_duplicate_rel_paths() { + let tmp = tempdir().unwrap(); + let r = rows(vec![ + ("a/clip.mp4", Some(&"a".repeat(64))), + ("b/clip.mp4", Some(&"a".repeat(64))), // same bytes, dup + ("other.mp4", 
Some(&"b".repeat(64))), + ]); + let stats = stats_from_rows(&lib(1, "main"), &r, tmp.path()); + assert_eq!(stats.total, 2, "duplicate hashes collapse"); + } + + #[test] + fn playlist_existence_promotes_to_with_playlist() { + let tmp = tempdir().unwrap(); + let hash = "a".repeat(64); + touch(tmp.path(), &format!("aa/{}/playlist.m3u8", hash)); + + let r = rows(vec![("clip.mp4", Some(&hash))]); + let stats = stats_from_rows(&lib(1, "main"), &r, tmp.path()); + assert_eq!(stats.total, 1); + assert_eq!(stats.with_playlist, 1); + assert_eq!(stats.pending, 0); + } + + #[test] + fn sentinel_existence_promotes_to_unsupported() { + let tmp = tempdir().unwrap(); + let hash = "b".repeat(64); + touch(tmp.path(), &format!("bb/{}/playlist.unsupported", hash)); + + let r = rows(vec![("clip.mov", Some(&hash))]); + let stats = stats_from_rows(&lib(1, "main"), &r, tmp.path()); + assert_eq!(stats.total, 1); + assert_eq!(stats.unsupported, 1); + assert_eq!(stats.with_playlist, 0); + assert_eq!(stats.pending, 0); + } + + #[test] + fn null_hash_videos_are_hashless_not_total() { + let tmp = tempdir().unwrap(); + let r = rows(vec![ + ("clip.mp4", None), + ("other.mp4", Some(&"a".repeat(64))), + ]); + let stats = stats_from_rows(&lib(1, "main"), &r, tmp.path()); + assert_eq!(stats.total, 1, "hashless row excluded from total"); + assert_eq!(stats.hashless_videos, 1); + } + + #[test] + fn publish_gauges_sets_per_library_value() { + let s = HlsLibraryStats { + library_id: 7, + library: "test_publish_a".into(), + total: 5, + with_playlist: 2, + pending: 3, + unsupported: 0, + hashless_videos: 0, + }; + publish_gauges(&s); + assert_eq!( + HLS_VIDEOS_TOTAL + .with_label_values(&["test_publish_a"]) + .get(), + 5 + ); + assert_eq!( + HLS_VIDEOS_PENDING + .with_label_values(&["test_publish_a"]) + .get(), + 3 + ); + assert_eq!( + HLS_VIDEOS_WITH_PLAYLIST + .with_label_values(&["test_publish_a"]) + .get(), + 2 + ); + } +} diff --git a/src/knowledge.rs b/src/knowledge.rs index 4c3f5a8..66815b2 100644 
--- a/src/knowledge.rs +++ b/src/knowledge.rs @@ -444,8 +444,7 @@ where ) .service(web::resource("/graph").route(web::get().to(get_graph::))) .service( - web::resource("/predicate-stats") - .route(web::get().to(get_predicate_stats::)), + web::resource("/predicate-stats").route(web::get().to(get_predicate_stats::)), ) .service( web::resource("/predicates/{predicate}/bulk-reject") @@ -1261,12 +1260,8 @@ async fn bulk_reject_predicate( let persona = resolve_persona_filter(&req, &claims, &persona_dao); let cx = opentelemetry::Context::current(); let mut dao = dao.lock().expect("Unable to lock KnowledgeDao"); - match dao.bulk_reject_facts_by_predicate( - &cx, - &persona, - &predicate, - Some(("manual", "manual")), - ) { + match dao.bulk_reject_facts_by_predicate(&cx, &persona, &predicate, Some(("manual", "manual"))) + { Ok(rejected) => HttpResponse::Ok().json(BulkRejectResponse { rejected }), Err(e) => { log::error!("bulk_reject_predicate error: {:?}", e); diff --git a/src/libraries.rs b/src/libraries.rs index 59b614a..6248cfa 100644 --- a/src/libraries.rs +++ b/src/libraries.rs @@ -94,7 +94,7 @@ pub fn parse_excluded_dirs_column(raw: Option<&str>) -> Vec { match raw { None => Vec::new(), Some(s) => s - .split(|c: char| matches!(c, ',' | '\n' | '\r')) + .split([',', '\n', '\r']) .map(str::trim) .filter(|s| !s.is_empty()) .map(String::from) @@ -148,10 +148,7 @@ pub fn validate_excluded_dirs_entry(entry: &str) -> Result { if let Some(rel) = trimmed.strip_prefix('/') { // Path form. Reject `..` traversal — `base.join(\"../x\")` doesn't // canonicalise, so `path.starts_with(...)` never matches. - if rel - .split('/') - .any(|seg| seg == "..") - { + if rel.split('/').any(|seg| seg == "..") { return Err(format!( "'{}': '..' 
segments don't normalise — the prefix-match never fires", trimmed @@ -542,7 +539,10 @@ pub async fn patch_library( { Ok(n) => affected = affected.max(n), Err(e) => { - warn!("PATCH /libraries/{}: enabled update failed: {:?}", lib_id, e); + warn!( + "PATCH /libraries/{}: enabled update failed: {:?}", + lib_id, e + ); return HttpResponse::InternalServerError().body(format!("{}", e)); } } @@ -600,7 +600,9 @@ pub async fn patch_library( ); HttpResponse::Ok().json(lib) } - None => HttpResponse::NotFound().body(format!("library id {} not found after update", lib_id)), + None => { + HttpResponse::NotFound().body(format!("library id {} not found after update", lib_id)) + } } } @@ -930,10 +932,7 @@ mod tests { #[test] fn validate_strips_trailing_slash_on_path_entries() { - assert_eq!( - validate_excluded_dirs_entry("/photos/").unwrap(), - "/photos" - ); + assert_eq!(validate_excluded_dirs_entry("/photos/").unwrap(), "/photos"); assert_eq!( validate_excluded_dirs_entry("/photos//").unwrap(), "/photos" diff --git a/src/main.rs b/src/main.rs index 30be9dc..51583c5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -26,7 +26,6 @@ use crate::files::{RealFileSystem, move_file}; use crate::service::ServiceBuilder; use crate::state::AppState; use crate::tags::*; -use crate::video::actors::ScanDirectoryMessage; use log::{error, info}; mod ai; @@ -46,6 +45,7 @@ mod file_types; mod files; mod geo; mod handlers; +mod hls_stats; mod libraries; mod library_maintenance; mod perceptual_hash; @@ -73,6 +73,16 @@ fn main() -> std::io::Result<()> { run_migrations(&mut connect()).expect("Failed to run migrations"); + // One-shot retirement of the pre-content-hash HLS layout. Idempotent + // — a second boot finds nothing and reports zero deletions, so it's + // safe to leave wired in until the module is removed in a later + // release. Runs before the actor pipeline starts so we never race a + // PlaylistGenerator write against this rm. 
+ { + let video_path = env::var("VIDEO_PATH").expect("VIDEO_PATH was not set in the env"); + video::legacy_migration::retire_legacy_hls_output(std::path::Path::new(&video_path)); + } + let system = actix::System::new(); system.block_on(async { // Just use basic logger when running a non-release build @@ -117,15 +127,32 @@ fn main() -> std::io::Result<()> { .registry .register(Box::new(thumbnails::VIDEO_GAUGE.clone())) .unwrap(); + // HLS readiness gauges. Updated by the watcher every full-scan + // tick and on every `/hls/stats` request. See `hls_stats`. + prometheus + .registry + .register(Box::new(hls_stats::HLS_VIDEOS_TOTAL.clone())) + .unwrap(); + prometheus + .registry + .register(Box::new(hls_stats::HLS_VIDEOS_WITH_PLAYLIST.clone())) + .unwrap(); + prometheus + .registry + .register(Box::new(hls_stats::HLS_VIDEOS_PENDING.clone())) + .unwrap(); + prometheus + .registry + .register(Box::new(hls_stats::HLS_VIDEOS_UNSUPPORTED.clone())) + .unwrap(); let app_state = app_data.clone(); - for lib in &app_state.libraries { - app_state.playlist_manager.do_send(ScanDirectoryMessage { - directory: lib.root_path.clone(), - }); - } - // Start file watcher with playlist manager and preview generator + // Start file watcher with playlist manager and preview generator. + // The watcher's first tick is configured to be a full scan (see + // `watch_files`), so every library's missing HLS playlists are + // queued on that first iteration — no separate startup walk + // needed. 
let playlist_mgr_for_watcher = app_state.playlist_manager.as_ref().clone(); let preview_gen_for_watcher = app_state.preview_clip_generator.as_ref().clone(); // Both background jobs read from the shared `live_libraries` lock @@ -257,10 +284,10 @@ fn main() -> std::io::Result<()> { .service(handlers::image::get_image) .service(handlers::image::upload_image) .service(handlers::video::generate_video) - .service(handlers::video::stream_video) + .service(handlers::video::stream_hls_file) .service(handlers::video::get_video_preview) .service(handlers::video::get_preview_status) - .service(handlers::video::get_video_part) + .service(hls_stats::hls_stats_handler) .service(handlers::favorites::favorites) .service(handlers::favorites::put_add_favorite) .service(handlers::favorites::delete_favorite) diff --git a/src/state.rs b/src/state.rs index b147c77..fd39cba 100644 --- a/src/state.rs +++ b/src/state.rs @@ -111,7 +111,7 @@ impl AppState { "AppState::new requires at least one library" ); let base_path = libraries_vec[0].root_path.clone(); - let playlist_generator = PlaylistGenerator::new(); + let playlist_generator = PlaylistGenerator::new(video_path.clone()); let video_playlist_manager = VideoPlaylistManager::new(video_path.clone(), playlist_generator.start()); diff --git a/src/video/actors.rs b/src/video/actors.rs index eb539ef..9f2df1b 100644 --- a/src/video/actors.rs +++ b/src/video/actors.rs @@ -1,18 +1,18 @@ +use crate::content_hash; use crate::database::PreviewDao; use crate::libraries::Library; use crate::otel::global_tracer; -use crate::thumbnails::is_video; use crate::video::ffmpeg::{generate_preview_clip, get_duration_seconds_blocking}; +use crate::video::hls_paths; use actix::prelude::*; -use log::{debug, error, info, trace, warn}; +use log::{debug, error, info, warn}; use opentelemetry::KeyValue; use opentelemetry::trace::{Span, Status, Tracer}; use std::io::Result; use std::path::{Path, PathBuf}; -use std::process::{Child, Command, ExitStatus, Stdio}; +use 
std::process::{Command, Stdio}; use std::sync::{Arc, Mutex}; use tokio::sync::Semaphore; -use walkdir::{DirEntry, WalkDir}; // ffmpeg -i test.mp4 -c:v h264 -flags +cgop -g 30 -hls_time 3 out.m3u8 // ffmpeg -i "filename.mp4" -preset veryfast -c:v libx264 -f hls -hls_list_size 100 -hls_time 2 -crf 24 -vf scale=1080:-2,setsar=1:1 attempt/vid_out.m3u8 @@ -22,89 +22,14 @@ impl Actor for StreamActor { type Context = Context; } -pub struct ProcessMessage(pub String, pub Child); - -impl Message for ProcessMessage { - type Result = Result; -} - -impl Handler for StreamActor { - type Result = Result; - - fn handle(&mut self, msg: ProcessMessage, _ctx: &mut Self::Context) -> Self::Result { - trace!("Message received"); - let mut process = msg.1; - let result = process.wait(); - - debug!( - "Finished waiting for: {:?}. Code: {:?}", - msg.0, - result - .as_ref() - .map_or(-1, |status| status.code().unwrap_or(-1)) - ); - result - } -} - -pub fn playlist_file_for(playlist_dir: &str, video_path: &Path) -> PathBuf { - let filename = video_path - .file_name() - .and_then(|n| n.to_str()) - .unwrap_or("unknown"); - PathBuf::from(format!("{}/{}.m3u8", playlist_dir, filename)) -} - -/// Sentinel path written next to a would-be playlist when ffmpeg cannot -/// transcode the source (e.g. truncated mp4 with no moov atom). Its presence -/// causes future scans to skip the file instead of re-running ffmpeg every -/// pass. Delete the `.unsupported` file to force a retry. 
-pub fn playlist_unsupported_sentinel(playlist_file: &Path) -> PathBuf { - let mut s = playlist_file.as_os_str().to_owned(); - s.push(".unsupported"); - PathBuf::from(s) -} - -pub async fn create_playlist(video_path: &str, playlist_file: &str) -> Result { - if Path::new(playlist_file).exists() { - debug!("Playlist already exists: {}", playlist_file); - return Err(std::io::Error::from(std::io::ErrorKind::AlreadyExists)); - } - - let result = Command::new("ffmpeg") - .arg("-i") - .arg(video_path) - .arg("-c:v") - .arg("h264") - .arg("-crf") - .arg("21") - .arg("-preset") - .arg("veryfast") - .arg("-hls_time") - .arg("3") - .arg("-hls_list_size") - .arg("0") - .arg("-hls_playlist_type") - .arg("vod") - .arg("-vf") - .arg("scale='min(1080,iw)':-2,setsar=1:1") - .arg(playlist_file) - .stdout(Stdio::null()) - .stderr(Stdio::null()) - .spawn(); - - let start_time = std::time::Instant::now(); - loop { - actix::clock::sleep(std::time::Duration::from_secs(1)).await; - - if Path::new(playlist_file).exists() - || std::time::Instant::now() - start_time > std::time::Duration::from_secs(5) - { - break; - } - } - - result +/// A video paired with its content hash, ready to be queued for HLS +/// playlist generation. Hash is required because all output paths are +/// keyed on it; callers that lack a hash (rows mid-backfill) must skip +/// the video rather than fabricate one. 
+#[derive(Debug, Clone)] +pub struct VideoToQueue { + pub video_path: PathBuf, + pub content_hash: String, } pub fn generate_video_thumbnail(path: &Path, destination: &Path) -> std::io::Result<()> { @@ -331,17 +256,17 @@ async fn get_max_gop_seconds(video_path: &str) -> Option { } pub struct VideoPlaylistManager { - playlist_dir: PathBuf, + video_dir: PathBuf, playlist_generator: Addr, } impl VideoPlaylistManager { pub fn new>( - playlist_dir: P, + video_dir: P, playlist_generator: Addr, ) -> Self { Self { - playlist_dir: playlist_dir.into(), + video_dir: video_dir.into(), playlist_generator, } } @@ -351,144 +276,68 @@ impl Actor for VideoPlaylistManager { type Context = Context; } -impl Handler for VideoPlaylistManager { - type Result = ResponseFuture<()>; - - fn handle(&mut self, msg: ScanDirectoryMessage, _ctx: &mut Self::Context) -> Self::Result { - let tracer = global_tracer(); - let mut span = tracer.start("videoplaylistmanager.scan_directory"); - - let start = std::time::Instant::now(); - info!( - "Starting scan directory for video playlist generation: {}", - msg.directory - ); - - let playlist_output_dir = self.playlist_dir.clone(); - let playlist_dir_str = playlist_output_dir.to_str().unwrap().to_string(); - - let video_files = WalkDir::new(&msg.directory) - .into_iter() - .filter_map(|e| e.ok()) - .filter(|e| e.file_type().is_file()) - .filter(is_video) - .filter(|e| { - let playlist = playlist_file_for(&playlist_dir_str, e.path()); - !playlist.exists() && !playlist_unsupported_sentinel(&playlist).exists() - }) - .collect::>(); - - let scan_dir_name = msg.directory.clone(); - let playlist_generator = self.playlist_generator.clone(); - - Box::pin(async move { - for e in video_files { - let path = e.path(); - let path_as_str = path.to_str().unwrap(); - debug!( - "Sending generate playlist message for path: {}", - path_as_str - ); - - match playlist_generator - .send(GeneratePlaylistMessage { - playlist_path: 
playlist_output_dir.to_str().unwrap().to_string(), - video_path: PathBuf::from(path), - }) - .await - .expect("Failed to send generate playlist message") - { - Ok(_) => { - span.add_event( - "Playlist generated", - vec![KeyValue::new("video_path", path_as_str.to_string())], - ); - - debug!( - "Successfully generated playlist for file: '{}'", - path_as_str - ); - } - Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => { - debug!("Playlist already exists for '{:?}', skipping", path); - } - Err(e) => { - warn!("Failed to generate playlist for path '{:?}'. {:?}", path, e); - } - } - } - - span.add_event( - "Finished directory scan", - vec![KeyValue::new("directory", scan_dir_name.to_string())], - ); - info!( - "Finished directory scan of '{}' in {:?}", - scan_dir_name, - start.elapsed() - ); - }) - } -} - impl Handler for VideoPlaylistManager { type Result = (); fn handle(&mut self, msg: QueueVideosMessage, _ctx: &mut Self::Context) -> Self::Result { - if msg.video_paths.is_empty() { + if msg.videos.is_empty() { return; } - info!( - "Queueing {} videos for HLS playlist generation", - msg.video_paths.len() - ); - - let playlist_output_dir = self.playlist_dir.clone(); - let playlist_dir_str = playlist_output_dir.to_str().unwrap().to_string(); + let video_dir = self.video_dir.clone(); let playlist_generator = self.playlist_generator.clone(); - for video_path in msg.video_paths { - let playlist = playlist_file_for(&playlist_dir_str, &video_path); - if playlist.exists() || playlist_unsupported_sentinel(&playlist).exists() { + let mut queued = 0usize; + let mut already_present = 0usize; + for VideoToQueue { + video_path, + content_hash, + } in msg.videos + { + let playlist = hls_paths::playlist_for_hash(&video_dir, &content_hash); + let sentinel = hls_paths::sentinel_for_hash(&video_dir, &content_hash); + if playlist.exists() || sentinel.exists() { + already_present += 1; continue; } - let path_str = video_path.to_string_lossy().to_string(); - debug!("Queueing 
playlist generation for: {}", path_str); - + debug!( + "Queueing playlist generation for {} (hash={})", + video_path.display(), + short_hash(&content_hash) + ); playlist_generator.do_send(GeneratePlaylistMessage { - playlist_path: playlist_dir_str.clone(), video_path, + content_hash, }); + queued += 1; } + info!( + "Queue tick: {} queued, {} skipped (playlist or sentinel already on disk)", + queued, already_present + ); } } -#[derive(Message)] -#[rtype(result = "()")] -pub struct ScanDirectoryMessage { - pub(crate) directory: String, -} - #[derive(Message)] #[rtype(result = "()")] pub struct QueueVideosMessage { - pub video_paths: Vec, + pub videos: Vec, } #[derive(Message)] #[rtype(result = "Result<()>")] pub struct GeneratePlaylistMessage { pub video_path: PathBuf, - pub playlist_path: String, + pub content_hash: String, } pub struct PlaylistGenerator { semaphore: Arc, + video_dir: PathBuf, } impl PlaylistGenerator { - pub(crate) fn new() -> Self { + pub(crate) fn new>(video_dir: P) -> Self { // Concurrency is tunable via HLS_CONCURRENCY so operators can dial // it to their hardware: 1 on weak Synology boxes to avoid thermal // throttling, higher on desktops with spare cores. 
@@ -500,6 +349,7 @@ impl PlaylistGenerator { info!("PlaylistGenerator: concurrency={}", concurrency); PlaylistGenerator { semaphore: Arc::new(Semaphore::new(concurrency)), + video_dir: video_dir.into(), } } } @@ -513,20 +363,23 @@ impl Handler for PlaylistGenerator { fn handle(&mut self, msg: GeneratePlaylistMessage, _ctx: &mut Self::Context) -> Self::Result { let video_file = msg.video_path.to_str().unwrap().to_owned(); - let playlist_path = msg.playlist_path.as_str().to_owned(); + let content_hash_str = msg.content_hash.clone(); let semaphore = self.semaphore.clone(); + let video_dir = self.video_dir.clone(); - let playlist_file = format!( - "{}/{}.m3u8", - playlist_path, - msg.video_path.file_name().unwrap().to_str().unwrap() - ); + let hash_dir = content_hash::hls_dir(&video_dir, &content_hash_str); + let playlist_path = hls_paths::playlist_for_hash(&video_dir, &content_hash_str); + let sentinel_path = hls_paths::sentinel_for_hash(&video_dir, &content_hash_str); + let segment_template = hls_paths::segment_template_for_hash(&video_dir, &content_hash_str); + let playlist_file = playlist_path.to_string_lossy().to_string(); + let segment_pattern = segment_template.to_string_lossy().to_string(); let tracer = global_tracer(); let mut span = tracer .span_builder("playlistgenerator.generate_playlist") .with_attributes(vec![ KeyValue::new("video_file", video_file.clone()), + KeyValue::new("content_hash", content_hash_str.clone()), KeyValue::new("playlist_file", playlist_file.clone()), ]) .start(&tracer); @@ -550,7 +403,7 @@ impl Handler for PlaylistGenerator { )], ); - if Path::new(&playlist_file).exists() { + if playlist_path.exists() { debug!("Playlist already exists: {}", playlist_file); span.set_status(Status::error(format!( "Playlist already exists: {}", @@ -559,6 +412,19 @@ impl Handler for PlaylistGenerator { return Err(std::io::Error::from(std::io::ErrorKind::AlreadyExists)); } + // Ensure the shard + hash directory exist. 
Idempotent — the + // dir may already be present from a prior attempt that wrote + // a sentinel before being cleared for retry. + if let Err(e) = tokio::fs::create_dir_all(&hash_dir).await { + error!( + "Failed to create HLS hash dir {}: {}", + hash_dir.display(), + e + ); + span.set_status(Status::error(format!("mkdir failed: {}", e))); + return Err(e); + } + // One ffprobe call for codec + rotation metadata. let stream_meta = probe_video_stream_meta(&video_file).await; let is_h264 = stream_meta.is_h264; @@ -619,16 +485,11 @@ impl Handler for PlaylistGenerator { span.add_event("Transcoding to h264", vec![]); } - // Encode to a .tmp playlist and explicit segment names so a failed - // encode leaves predictable artifacts we can clean up — and so a - // concurrent scan doesn't see a half-written .m3u8 as "done". + // Encode to a .tmp playlist alongside the final inside the + // hash dir, so a concurrent scan never sees a half-written + // .m3u8 as "done". Segments use the hash-keyed template; + // ffmpeg writes them next to the playlist (relative refs). 
let playlist_tmp = format!("{}.tmp", playlist_file); - let video_stem = msg - .video_path - .file_name() - .and_then(|n| n.to_str()) - .unwrap_or("video"); - let segment_pattern = format!("{}/{}_%03d.ts", playlist_path, video_stem); let mut cmd = tokio::process::Command::new("ffmpeg"); cmd.arg("-y").arg("-i").arg(&video_file); @@ -717,12 +578,12 @@ impl Handler for PlaylistGenerator { let success = matches!(&ffmpeg_result, Ok(out) if out.status.success()); if success { - if let Err(e) = tokio::fs::rename(&playlist_tmp, &playlist_file).await { + if let Err(e) = tokio::fs::rename(&playlist_tmp, &playlist_path).await { error!( "ffmpeg succeeded but rename {} -> {} failed: {}", playlist_tmp, playlist_file, e ); - cleanup_partial_hls(&playlist_tmp, playlist_path.as_str(), video_stem).await; + cleanup_partial_hls(&hash_dir).await; span.set_status(Status::error(format!("rename failed: {}", e))); return Err(e); } @@ -739,18 +600,17 @@ impl Handler for PlaylistGenerator { Err(e) => format!("ffmpeg failed: {}", e), }; error!("ffmpeg failed for {}: {}", video_file, detail); - cleanup_partial_hls(&playlist_tmp, playlist_path.as_str(), video_stem).await; - let sentinel = playlist_unsupported_sentinel(Path::new(&playlist_file)); - if let Err(se) = tokio::fs::write(&sentinel, b"").await { + cleanup_partial_hls(&hash_dir).await; + if let Err(se) = tokio::fs::write(&sentinel_path, b"").await { warn!( "Failed to write playlist sentinel {}: {}", - sentinel.display(), + sentinel_path.display(), se ); } else { info!( "Wrote playlist sentinel {} so future scans skip {}", - sentinel.display(), + sentinel_path.display(), video_file ); } @@ -761,29 +621,47 @@ impl Handler for PlaylistGenerator { } } -/// Delete the temp playlist and any segment files that ffmpeg may have written -/// before failing. Called both on ffmpeg error and on rename failure so a -/// retry on the next scan starts from a clean slate. 
-async fn cleanup_partial_hls(playlist_tmp: &str, playlist_dir: &str, video_stem: &str) { - let _ = tokio::fs::remove_file(playlist_tmp).await; - - let segment_prefix = format!("{}_", video_stem); - let Ok(mut entries) = tokio::fs::read_dir(playlist_dir).await else { +/// Delete the partial playlist (.tmp) and any segment files left behind by +/// a failed ffmpeg run. Wipes every non-sentinel file in the hash dir; +/// retains the sentinel if one has already been written by an earlier +/// caller in the same path (today there is none, but kept defensively so +/// the function is safe to call after sentinel write too). +async fn cleanup_partial_hls(hash_dir: &Path) { + let Ok(mut entries) = tokio::fs::read_dir(hash_dir).await else { return; }; while let Ok(Some(entry)) = entries.next_entry().await { - let Some(name) = entry.file_name().to_str().map(str::to_owned) else { + let path = entry.path(); + let is_sentinel = path + .file_name() + .and_then(|n| n.to_str()) + .map(|n| n == hls_paths::UNSUPPORTED_SENTINEL_FILENAME) + .unwrap_or(false); + if is_sentinel { continue; - }; - if name.starts_with(&segment_prefix) - && name.ends_with(".ts") - && let Err(e) = tokio::fs::remove_file(entry.path()).await - { - warn!("Failed to remove partial segment {}: {}", name, e); + } + if let Err(e) = tokio::fs::remove_file(&path).await { + warn!( + "Failed to remove partial HLS file {}: {}", + path.display(), + e + ); } } } +/// First 16 chars of a content hash for log lines. Short enough to keep +/// log volume sane, long enough that distinct hashes don't collide in +/// practice. +fn short_hash(hash: &str) -> &str { + let end = hash + .char_indices() + .nth(16) + .map(|(i, _)| i) + .unwrap_or(hash.len()); + &hash[..end] +} + #[derive(Message)] #[rtype(result = "()")] pub struct GeneratePreviewClipMessage { diff --git a/src/video/hls_paths.rs b/src/video/hls_paths.rs new file mode 100644 index 0000000..cf7fa96 --- /dev/null +++ b/src/video/hls_paths.rs @@ -0,0 +1,84 @@ +//! 
Path layout for hash-keyed HLS output. +//! +//! Source-of-truth is [`crate::content_hash::hls_dir`], which gives +//! `///`. The playlist, the per-segment files, +//! and the "ffmpeg refused" sentinel all live inside that directory so a +//! `.m3u8` written with relative segment references resolves correctly +//! at serve time without any URL rewriting. + +use std::path::{Path, PathBuf}; + +use crate::content_hash; + +/// Standard filename for the HLS playlist inside a hash dir. Fixed so +/// the URL contract is `playlist.m3u8` regardless of the source video's +/// original basename. +pub const PLAYLIST_FILENAME: &str = "playlist.m3u8"; + +/// Sentinel filename written when ffmpeg refused to transcode the +/// source. Presence in the hash dir tells future scans to skip the file +/// instead of re-running ffmpeg every tick. Delete to force a retry. +pub const UNSUPPORTED_SENTINEL_FILENAME: &str = "playlist.unsupported"; + +/// Segment-name template passed to ffmpeg via `-hls_segment_filename`. +/// Segments live inside the hash dir; the playlist's relative refs +/// resolve to siblings automatically. +pub const SEGMENT_TEMPLATE: &str = "segment_%03d.ts"; + +/// Path to the HLS playlist for a video identified by content hash. +pub fn playlist_for_hash(video_dir: &Path, hash: &str) -> PathBuf { + content_hash::hls_dir(video_dir, hash).join(PLAYLIST_FILENAME) +} + +/// Path to the unsupported-source sentinel for a hash. +pub fn sentinel_for_hash(video_dir: &Path, hash: &str) -> PathBuf { + content_hash::hls_dir(video_dir, hash).join(UNSUPPORTED_SENTINEL_FILENAME) +} + +/// Absolute path used as ffmpeg's `-hls_segment_filename` value. 
+pub fn segment_template_for_hash(video_dir: &Path, hash: &str) -> PathBuf { + content_hash::hls_dir(video_dir, hash).join(SEGMENT_TEMPLATE) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn playlist_path_lives_inside_sharded_hash_dir() { + let video = Path::new("/var/video"); + let p = playlist_for_hash(video, "abcdef0123456789"); + assert_eq!( + p, + PathBuf::from("/var/video/ab/abcdef0123456789/playlist.m3u8") + ); + } + + #[test] + fn sentinel_path_lives_alongside_playlist() { + let video = Path::new("/var/video"); + let s = sentinel_for_hash(video, "abcdef0123456789"); + assert_eq!( + s, + PathBuf::from("/var/video/ab/abcdef0123456789/playlist.unsupported") + ); + } + + #[test] + fn segment_template_lives_alongside_playlist() { + let video = Path::new("/var/video"); + let t = segment_template_for_hash(video, "abcdef0123456789"); + assert_eq!( + t, + PathBuf::from("/var/video/ab/abcdef0123456789/segment_%03d.ts") + ); + } + + #[test] + fn distinct_hashes_yield_distinct_dirs() { + let video = Path::new("/var/video"); + let a = playlist_for_hash(video, "1111aaaa"); + let b = playlist_for_hash(video, "2222bbbb"); + assert_ne!(a.parent(), b.parent()); + } +} diff --git a/src/video/legacy_migration.rs b/src/video/legacy_migration.rs new file mode 100644 index 0000000..ed6863c --- /dev/null +++ b/src/video/legacy_migration.rs @@ -0,0 +1,243 @@ +//! One-shot retirement of the pre-content-hash HLS output layout. +//! +//! Before the hash-keyed layout landed, the actor pipeline wrote every +//! playlist as `$VIDEO_PATH/.m3u8` with sibling +//! `_NNN.ts` segments and a `.m3u8.unsupported` +//! sentinel on ffmpeg failure. The new pipeline (see +//! [`crate::video::hls_paths`]) puts everything inside a hash-keyed +//! subdirectory, so the legacy flat files are orphaned the moment the +//! upgraded binary boots — they're not served, not refreshed, and not +//! GC'd by the new orphan cleanup (which deliberately ignores anything +//! 
that doesn't sit inside a `<shard>/<hash>/` dir).
+//!
+//! This migration runs once on startup. It walks `$VIDEO_PATH` at depth
+//! 1, deletes every `.m3u8` / `.m3u8.tmp` / `.m3u8.unsupported` / `.ts`
+//! file, and reports a single info line. It is idempotent — a second
+//! run finds nothing and reports zero deletions, so it's safe to leave
+//! wired in across releases until the codebase finally drops the
+//! module.
+//!
+//! Sub-directories under `$VIDEO_PATH` are intentionally left alone:
+//! every legitimate child of `$VIDEO_PATH` in the new layout is a
+//! 2-char shard directory holding hash subdirs, and those are managed
+//! by `cleanup_orphaned_playlists`.
+
+use std::path::Path;
+
+use log::{info, warn};
+
+/// Counters for what the migration did this run.
+#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
+pub struct RetireStats {
+    pub deleted_playlists: usize,
+    pub deleted_segments: usize,
+    pub deleted_sentinels: usize,
+    pub deleted_tmp: usize,
+    pub errors: usize,
+}
+
+impl RetireStats {
+    pub fn total_deleted(&self) -> usize {
+        self.deleted_playlists + self.deleted_segments + self.deleted_sentinels + self.deleted_tmp
+    }
+}
+
+/// Delete every legacy basename-keyed HLS artifact at the root of
+/// `video_dir`. Entries that are not regular files (shard dirs) are skipped.
+/// Logs a one-line summary and returns the counts; an unreadable dir yields zeros.
+pub fn retire_legacy_hls_output(video_dir: &Path) -> RetireStats {
+    let mut stats = RetireStats::default();
+
+    let read = match std::fs::read_dir(video_dir) {
+        Ok(r) => r,
+        Err(e) => {
+            warn!(
+                "Legacy HLS migration: cannot read {} ({}); skipping",
+                video_dir.display(),
+                e
+            );
+            return stats;
+        }
+    };
+
+    for entry in read.flatten() {
+        let file_type = match entry.file_type() {
+            Ok(t) => t,
+            Err(_) => continue,
+        };
+        if !file_type.is_file() {
+            // Hash shard directories live here in the new layout.
+            continue;
+        }
+        let path = entry.path();
+        let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
+            continue;
+        };
+
+        let bucket = classify(name);
+        let Some(bucket) = bucket else {
+            continue;
+        };
+
+        match std::fs::remove_file(&path) {
+            Ok(()) => match bucket {
+                LegacyKind::Playlist => stats.deleted_playlists += 1,
+                LegacyKind::Segment => stats.deleted_segments += 1,
+                LegacyKind::Sentinel => stats.deleted_sentinels += 1,
+                LegacyKind::Tmp => stats.deleted_tmp += 1,
+            },
+            Err(e) => {
+                warn!(
+                    "Legacy HLS migration: failed to remove {}: {}",
+                    path.display(),
+                    e
+                );
+                stats.errors += 1;
+            }
+        }
+    }
+
+    if stats.total_deleted() > 0 || stats.errors > 0 {
+        info!(
+            "Legacy HLS migration: deleted {} playlist(s), {} segment(s), {} sentinel(s), {} tmp; {} error(s)",
+            stats.deleted_playlists,
+            stats.deleted_segments,
+            stats.deleted_sentinels,
+            stats.deleted_tmp,
+            stats.errors,
+        );
+    } else {
+        info!(
+            "Legacy HLS migration: nothing to do under {}",
+            video_dir.display()
+        );
+    }
+
+    stats
+}
+
+#[derive(Debug, Clone, Copy)]
+enum LegacyKind {
+    Playlist,
+    Segment,
+    Sentinel,
+    Tmp,
+}
+
+/// Decide whether a flat file at `$VIDEO_PATH` root is legacy HLS
+/// output. Returns `None` for anything else — operator-stashed files,
+/// new-layout files (which don't live here), etc. — so we don't rm them.
+fn classify(name: &str) -> Option<LegacyKind> {
+    // Order matters: sentinel and tmp are more specific suffixes that
+    // sit on top of the .m3u8 / .ts extensions, so check them first.
+    if name.ends_with(".m3u8.unsupported") {
+        return Some(LegacyKind::Sentinel);
+    }
+    if name.ends_with(".m3u8.tmp") {
+        return Some(LegacyKind::Tmp);
+    }
+    if name.ends_with(".m3u8") {
+        return Some(LegacyKind::Playlist);
+    }
+    if name.ends_with(".ts") {
+        return Some(LegacyKind::Segment);
+    }
+    None
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::fs;
+    use tempfile::tempdir;
+
+    #[test]
+    fn classify_recognises_each_legacy_artifact() {
+        assert!(matches!(
+            classify("IMG_0341.MOV.m3u8"),
+            Some(LegacyKind::Playlist)
+        ));
+        assert!(matches!(
+            classify("IMG_0341.MOV_000.ts"),
+            Some(LegacyKind::Segment)
+        ));
+        assert!(matches!(
+            classify("IMG_0341.MOV.m3u8.unsupported"),
+            Some(LegacyKind::Sentinel)
+        ));
+        assert!(matches!(
+            classify("IMG_0341.MOV.m3u8.tmp"),
+            Some(LegacyKind::Tmp)
+        ));
+
+        assert!(classify("README.md").is_none());
+        assert!(classify("ab").is_none()); // shard dir name
+        assert!(classify(".keep").is_none());
+    }
+
+    #[test]
+    fn retire_deletes_legacy_and_leaves_hash_dirs() {
+        let tmp = tempdir().unwrap();
+        let root = tmp.path();
+
+        // Legacy artifacts at root.
+        fs::write(root.join("IMG_0341.MOV.m3u8"), b"#EXTM3U").unwrap();
+        fs::write(root.join("IMG_0341.MOV_000.ts"), b"\x00").unwrap();
+        fs::write(root.join("IMG_0341.MOV_001.ts"), b"\x00").unwrap();
+        fs::write(root.join("clip.MP4.m3u8.unsupported"), b"").unwrap();
+        fs::write(root.join("partial.m3u8.tmp"), b"").unwrap();
+
+        // New-layout hash dir we must NOT touch.
+        let hash_dir = root.join("ab").join("a".repeat(64));
+        fs::create_dir_all(&hash_dir).unwrap();
+        fs::write(hash_dir.join("playlist.m3u8"), b"#EXTM3U").unwrap();
+        fs::write(hash_dir.join("segment_000.ts"), b"\x00").unwrap();
+
+        // Unrelated file we must NOT touch.
+        fs::write(root.join("README.md"), b"don't touch me").unwrap();
+
+        let stats = retire_legacy_hls_output(root);
+        assert_eq!(stats.deleted_playlists, 1);
+        assert_eq!(stats.deleted_segments, 2);
+        assert_eq!(stats.deleted_sentinels, 1);
+        assert_eq!(stats.deleted_tmp, 1);
+        assert_eq!(stats.errors, 0);
+
+        // Legacy artifacts gone.
+        assert!(!root.join("IMG_0341.MOV.m3u8").exists());
+        assert!(!root.join("IMG_0341.MOV_000.ts").exists());
+        assert!(!root.join("clip.MP4.m3u8.unsupported").exists());
+        assert!(!root.join("partial.m3u8.tmp").exists());
+        // Hash dir untouched.
+        assert!(hash_dir.join("playlist.m3u8").exists());
+        assert!(hash_dir.join("segment_000.ts").exists());
+        // Unrelated file untouched.
+        assert!(root.join("README.md").exists());
+    }
+
+    #[test]
+    fn retire_is_idempotent() {
+        let tmp = tempdir().unwrap();
+        let root = tmp.path();
+
+        fs::write(root.join("video.mp4.m3u8"), b"#EXTM3U").unwrap();
+        fs::write(root.join("video.mp4_000.ts"), b"\x00").unwrap();
+
+        let first = retire_legacy_hls_output(root);
+        assert_eq!(first.deleted_playlists + first.deleted_segments, 2);
+
+        let second = retire_legacy_hls_output(root);
+        assert_eq!(second.total_deleted(), 0);
+        assert_eq!(second.errors, 0);
+    }
+
+    #[test]
+    fn retire_handles_missing_dir() {
+        // No panic, no error count blowing up — just a warn + zero stats.
+        let tmp = tempdir().unwrap();
+        let missing = tmp.path().join("does_not_exist");
+        let stats = retire_legacy_hls_output(&missing);
+        assert_eq!(stats.total_deleted(), 0);
+        assert_eq!(stats.errors, 0);
+    }
+}
diff --git a/src/video/mod.rs b/src/video/mod.rs
index 8078a1f..f28d302 100644
--- a/src/video/mod.rs
+++ b/src/video/mod.rs
@@ -9,6 +9,8 @@ use walkdir::WalkDir;
 
 pub mod actors;
 pub mod ffmpeg;
+pub mod hls_paths;
+pub mod legacy_migration;
 
 #[allow(dead_code)]
 pub async fn generate_video_gifs() {
diff --git a/src/watcher.rs b/src/watcher.rs
index ca56ea6..13ac1cd 100644
--- a/src/watcher.rs
+++ b/src/watcher.rs
@@ -22,7 +22,6 @@ use std::time::{Duration, SystemTime};
 use actix::Addr;
 use chrono::Utc;
 use log::{debug, error, info, warn};
-use walkdir::WalkDir;
 
 use crate::backfill;
 use crate::content_hash;
@@ -33,6 +32,7 @@ use crate::exif;
 use crate::face_watch;
 use crate::faces;
 use crate::file_types;
+use crate::hls_stats;
 use crate::libraries;
 use crate::library_maintenance;
 use crate::perceptual_hash;
@@ -40,20 +40,34 @@ use crate::tags;
 use crate::tags::SqliteTagDao;
 use crate::thumbnails;
 use crate::video;
-use crate::video::actors::{GeneratePreviewClipMessage, QueueVideosMessage, VideoPlaylistManager};
+use crate::video::actors::{
+    GeneratePreviewClipMessage, QueueVideosMessage, VideoPlaylistManager, VideoToQueue,
+};
+use crate::video::hls_paths;
 
-/// Clean up orphaned HLS playlists and segments whose source videos no longer exist.
+/// Clean up orphaned HLS hash directories under `$VIDEO_PATH` whose
+/// content_hash no longer appears in `image_exif`.
+///
+/// Walks `<VIDEO_PATH>/<shard>/<hash>/` — the layout written by the
+/// hash-keyed `PlaylistGenerator` — and deletes any hash directory whose
+/// hash isn't in the current DISTINCT set of `image_exif.content_hash`
+/// values. Empty shard parents are reaped on the same pass.
+/// +/// Legacy basename-keyed files at `$VIDEO_PATH` root (from the +/// pre-content-hash layout) are left alone here; the one-shot startup +/// migration is responsible for retiring those. /// /// `libs_lock` is the shared live view of the libraries table — read at the /// top of each cleanup pass so a PATCH /libraries/{id} that disables or /// re-mounts a library is picked up without a restart. pub fn cleanup_orphaned_playlists( libs_lock: Arc>>, - excluded_dirs: Vec, + _excluded_dirs: Vec, library_health: libraries::LibraryHealthMap, ) { std::thread::spawn(move || { - let video_path = dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set"); + let video_path_str = dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set"); + let video_path = PathBuf::from(&video_path_str); // Get cleanup interval from environment (default: 24 hours) let cleanup_interval_secs = dotenv::var("PLAYLIST_CLEANUP_INTERVAL_SECONDS") @@ -61,18 +75,14 @@ pub fn cleanup_orphaned_playlists( .and_then(|s| s.parse::().ok()) .unwrap_or(86400); // 24 hours - info!("Starting orphaned playlist cleanup job"); + info!("Starting orphaned HLS cleanup job"); info!(" Cleanup interval: {} seconds", cleanup_interval_secs); - info!(" Playlist directory: {}", video_path); - { - let libs = libs_lock.read().unwrap_or_else(|e| e.into_inner()); - for lib in libs.iter() { - info!( - " Checking sources under '{}' at {}", - lib.name, lib.root_path - ); - } - } + info!(" HLS directory: {}", video_path.display()); + + let exif_dao: Arc>> = Arc::new(Mutex::new(Box::new( + SqliteExifDao::new(), + ) + as Box)); loop { std::thread::sleep(Duration::from_secs(cleanup_interval_secs)); @@ -83,22 +93,27 @@ pub fn cleanup_orphaned_playlists( let libs: Vec = libs_lock.read().unwrap_or_else(|e| e.into_inner()).clone(); - // Safety gate: skip the cleanup cycle if any library is - // stale. 
A missing source video on a stale library is - // indistinguishable from a transient unmount, and the - // cleanup is destructive — we'd rather leak a few playlist - // files for a tick than delete one whose source is briefly - // unreachable. The cycle re-runs on the next interval. + // Safety gate: skip the cleanup cycle if any (enabled) + // library is stale. With hash-keyed layout the orphan + // decision is a pure DB query, but the upstream + // missing-file scan that *removes* image_exif rows already + // pauses for stale libraries — so a stale tick can hold + // hashes alive that would otherwise have been GC'd. The + // safety is then mostly belt-and-suspenders: a hash that + // should have been retired is just kept one tick longer. + // We'd rather leak a few hash dirs for 24h than wipe a + // hash dir whose source was briefly unreachable. { let guard = library_health.read().unwrap_or_else(|e| e.into_inner()); let stale: Vec = libs .iter() + .filter(|lib| lib.enabled) .filter(|lib| guard.get(&lib.id).map(|h| !h.is_online()).unwrap_or(false)) .map(|lib| lib.name.clone()) .collect(); if !stale.is_empty() { warn!( - "Skipping orphaned-playlist cleanup: {} library(ies) stale: [{}]", + "Skipping orphaned-HLS cleanup: {} library(ies) stale: [{}]", stale.len(), stale.join(", ") ); @@ -106,116 +121,129 @@ pub fn cleanup_orphaned_playlists( } } - info!("Running orphaned playlist cleanup"); + info!("Running orphaned HLS cleanup"); let start = std::time::Instant::now(); - let mut deleted_count = 0; - let mut error_count = 0; - // Find all .m3u8 files in VIDEO_PATH - let playlists: Vec = WalkDir::new(&video_path) - .into_iter() - .filter_map(|e| e.ok()) - .filter(|e| e.file_type().is_file()) - .filter(|e| { - e.path() - .extension() - .and_then(|s| s.to_str()) - .map(|ext| ext.eq_ignore_ascii_case("m3u8")) - .unwrap_or(false) - }) - .map(|e| e.path().to_path_buf()) - .collect(); + // Snapshot every live content_hash currently in image_exif. 
+ // We intentionally don't filter by library here — a hash that + // lives in any library is alive, even if the library a given + // download attributed it to has since been disabled. + let alive_hashes: HashSet = { + let context = opentelemetry::Context::new(); + let mut dao = exif_dao.lock().expect("Unable to lock ExifDao"); + match dao.list_distinct_content_hashes(&context) { + Ok(hashes) => hashes.into_iter().collect(), + Err(e) => { + error!( + "Failed to load distinct content hashes; skipping HLS cleanup: {:?}", + e + ); + continue; + } + } + }; - info!("Found {} playlist files to check", playlists.len()); + let mut deleted_count = 0usize; + let mut error_count = 0usize; + let mut inspected = 0usize; - for playlist_path in playlists { - // Extract the original video filename from playlist name - // Playlist format: {VIDEO_PATH}/{original_filename}.m3u8 - if let Some(filename) = playlist_path.file_stem() { - let video_filename = filename.to_string_lossy(); + // Walk top-level entries of VIDEO_PATH. Each is either a + // legacy basename-keyed `.m3u8` / `.ts` (skip — migration + // owns those) or a 2-char shard directory. + let read_root = match std::fs::read_dir(&video_path) { + Ok(r) => r, + Err(e) => { + error!( + "HLS cleanup: failed to read VIDEO_PATH {}: {}", + video_path.display(), + e + ); + continue; + } + }; - // Search for this video file across every configured - // library, respecting EXCLUDED_DIRS so we don't - // false-resurrect playlists for videos that only - // exist inside an excluded subtree. As soon as one - // library has a matching source, we're done — the - // playlist isn't orphaned. 
- let mut video_exists = false; - 'libs: for lib in &libs { - let effective = lib.effective_excluded_dirs(&excluded_dirs); - for entry in image_api::file_scan::walk_library_files( - Path::new(&lib.root_path), - &effective, - ) { - if let Some(entry_stem) = entry.path().file_stem() - && entry_stem == filename - && file_types::is_video_file(entry.path()) - { - video_exists = true; - break 'libs; - } - } + for shard_entry in read_root.flatten() { + let shard_path = shard_entry.path(); + if !shard_entry.file_type().map(|t| t.is_dir()).unwrap_or(false) { + continue; + } + let shard_name = match shard_path.file_name().and_then(|n| n.to_str()) { + Some(n) => n.to_owned(), + None => continue, + }; + if !is_hash_shard(&shard_name) { + continue; + } + + // Hash dirs inside this shard. + let read_shard = match std::fs::read_dir(&shard_path) { + Ok(r) => r, + Err(e) => { + warn!( + "HLS cleanup: failed to read shard {}: {}", + shard_path.display(), + e + ); + continue; + } + }; + + let mut shard_emptied = true; + for hash_entry in read_shard.flatten() { + let hash_path = hash_entry.path(); + if !hash_entry.file_type().map(|t| t.is_dir()).unwrap_or(false) { + shard_emptied = false; + continue; + } + let Some(hash_name) = hash_path + .file_name() + .and_then(|n| n.to_str()) + .map(|n| n.to_owned()) + else { + shard_emptied = false; + continue; + }; + if !is_full_hash(&hash_name) { + shard_emptied = false; + continue; + } + inspected += 1; + + if alive_hashes.contains(&hash_name) { + shard_emptied = false; + continue; } - if !video_exists { - debug!( - "Source video for playlist {} no longer exists, deleting", - playlist_path.display() - ); - - // Delete the playlist file - if let Err(e) = std::fs::remove_file(&playlist_path) { + debug!( + "HLS cleanup: removing orphan hash dir {}", + hash_path.display() + ); + match std::fs::remove_dir_all(&hash_path) { + Ok(()) => deleted_count += 1, + Err(e) => { warn!( - "Failed to delete playlist {}: {}", - playlist_path.display(), + "Failed 
to delete orphan hash dir {}: {}", + hash_path.display(), e ); error_count += 1; - } else { - deleted_count += 1; - - // Also try to delete associated .ts segment files - // They are typically named {filename}N.ts in the same directory - if let Some(parent_dir) = playlist_path.parent() { - for entry in WalkDir::new(parent_dir) - .max_depth(1) - .into_iter() - .filter_map(|e| e.ok()) - .filter(|e| e.file_type().is_file()) - { - let entry_path = entry.path(); - if let Some(ext) = entry_path.extension() - && ext.eq_ignore_ascii_case("ts") - { - // Check if this .ts file belongs to our playlist - if let Some(ts_stem) = entry_path.file_stem() { - let ts_name = ts_stem.to_string_lossy(); - if ts_name.starts_with(&*video_filename) { - if let Err(e) = std::fs::remove_file(entry_path) { - debug!( - "Failed to delete segment {}: {}", - entry_path.display(), - e - ); - } else { - debug!( - "Deleted segment: {}", - entry_path.display() - ); - } - } - } - } - } - } + shard_emptied = false; } } } + + // If this shard now has no surviving hash dirs, reap + // the (empty) shard dir too. remove_dir fails if non- + // empty, which is the guard. + if shard_emptied { + let _ = std::fs::remove_dir(&shard_path); + } } info!( - "Orphaned playlist cleanup completed in {:?}: deleted {} playlists, {} errors", + "Orphaned HLS cleanup completed in {:?}: inspected {} hash dirs, deleted {} orphans, {} errors", start.elapsed(), + inspected, deleted_count, error_count ); @@ -223,6 +251,18 @@ pub fn cleanup_orphaned_playlists( }); } +/// True iff `s` is a two-character ASCII-hex shard prefix (either case). +fn is_hash_shard(s: &str) -> bool { + s.len() == 2 && s.bytes().all(|b| b.is_ascii_hexdigit()) +} + +/// True iff `s` looks like a full blake3 hex digest (64 hex chars). +/// Be strict so we don't accidentally rm a non-HLS directory operators +/// have stashed under VIDEO_PATH.
+fn is_full_hash(s: &str) -> bool { + s.len() == 64 && s.bytes().all(|b| b.is_ascii_hexdigit()) +} + pub fn watch_files( libs_lock: Arc>>, playlist_manager: Addr, @@ -288,7 +328,12 @@ pub fn watch_files( )); let mut last_quick_scan = SystemTime::now(); - let mut last_full_scan = SystemTime::now(); + // Initialize to UNIX_EPOCH so the *first* tick is treated as a + // full scan. That replaces the legacy startup ScanDirectoryMessage + // walk for HLS playlists: every library's existing media gets + // checked once at watcher boot, instead of waiting up to + // full_interval_secs (1h default) for the first natural full scan. + let mut last_full_scan = SystemTime::UNIX_EPOCH; let mut scan_count = 0u64; // Per-library cursor for the missing-file scan. Each tick reads @@ -531,6 +576,16 @@ pub fn watch_files( } if is_full_scan { + // End-of-full-scan HLS readiness summary: log a single + // info line + refresh the Prometheus gauges. Skipped on + // quick scans because the cost is non-trivial on big + // libraries and the data only meaningfully changes on + // full passes. + let video_dir_str = dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set"); + let stats = + hls_stats::compute_and_publish(&libs, &exif_dao, Path::new(&video_dir_str)); + hls_stats::log_summary(&stats); + last_full_scan = now; } last_quick_scan = now; @@ -600,14 +655,18 @@ pub fn process_new_files( // Batch query: Get all EXIF data for these files in one query let file_paths: Vec = files.iter().map(|(_, rel_path)| rel_path.clone()).collect(); - let existing_exif_paths: HashMap = { + // Map of rel_path -> Option. The presence of the key + // tells us "row exists"; the Option value carries the hash for the + // HLS pipeline so video files without a hash (mid-backfill) skip + // this tick rather than fall back to a basename-colliding playlist. 
+ let existing_exif: HashMap> = { let mut dao = exif_dao.lock().expect("Unable to lock ExifDao"); // Walk is per-library, so scope the lookup so a same-named file // in another library doesn't make this one look already-indexed. match dao.get_exif_batch(&context, Some(library.id), &file_paths) { Ok(exif_records) => exif_records .into_iter() - .map(|record| (record.file_path, true)) + .map(|record| (record.file_path, record.content_hash)) .collect(), Err(e) => { error!("Error batch querying EXIF data: {:?}", e); @@ -637,7 +696,7 @@ pub fn process_new_files( && !bare_legacy_thumb_path.exists() && !thumbnails::unsupported_thumbnail_sentinel(&scoped_thumb_path).exists() && !thumbnails::unsupported_thumbnail_sentinel(&bare_legacy_thumb_path).exists(); - let needs_row = !existing_exif_paths.contains_key(relative_path); + let needs_row = !existing_exif.contains_key(relative_path); if needs_thumbnail || needs_row { new_files_found = true; @@ -796,28 +855,45 @@ pub fn process_new_files( } } - // Check for videos that need HLS playlists + // Check for videos that need HLS playlists. All output is keyed on + // `content_hash` (see `crate::video::hls_paths`), so files whose + // `image_exif.content_hash` is still NULL — typically mid-backfill — + // are skipped this tick and picked up after the unhashed backlog + // drain populates the hash on a subsequent tick. Skipping is the + // correct call: queuing without a hash would either fall back to + // basename keying (the bug this refactor fixes) or fabricate one. 
let video_path_base = dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set"); - let mut videos_needing_playlists = Vec::new(); + let video_dir = Path::new(&video_path_base); + let mut videos_needing_playlists: Vec = Vec::new(); + let mut hashless_video_count = 0usize; - for (file_path, _relative_path) in &files { - if file_types::is_video_file(file_path) { - // Construct expected playlist path - let playlist_filename = - format!("{}.m3u8", file_path.file_name().unwrap().to_string_lossy()); - let playlist_path = Path::new(&video_path_base).join(&playlist_filename); - - // Check if playlist needs (re)generation - if playlist_needs_generation(file_path, &playlist_path) { - videos_needing_playlists.push(file_path.clone()); - } + for (file_path, relative_path) in &files { + if !file_types::is_video_file(file_path) { + continue; + } + let Some(hash) = existing_exif.get(relative_path).and_then(|h| h.clone()) else { + hashless_video_count += 1; + continue; + }; + let playlist_path = hls_paths::playlist_for_hash(video_dir, &hash); + if playlist_needs_generation(file_path, &playlist_path) { + videos_needing_playlists.push(VideoToQueue { + video_path: file_path.clone(), + content_hash: hash, + }); } } - // Send queue request to playlist manager + if hashless_video_count > 0 { + debug!( + "Watcher tick for '{}': skipped {} video(s) with NULL content_hash (will retry after backfill)", + library.name, hashless_video_count + ); + } + if !videos_needing_playlists.is_empty() { playlist_manager.do_send(QueueVideosMessage { - video_paths: videos_needing_playlists, + videos: videos_needing_playlists, }); } @@ -962,6 +1038,33 @@ mod tests { assert!(playlist_needs_generation(&video, &playlist)); } + #[test] + fn is_hash_shard_accepts_only_two_hex_chars() { + assert!(is_hash_shard("ab")); + assert!(is_hash_shard("00")); + assert!(is_hash_shard("FF")); // ASCII hexdigit covers upper-case too + assert!(!is_hash_shard("a")); + assert!(!is_hash_shard("abc")); + 
assert!(!is_hash_shard("zz")); + assert!(!is_hash_shard("")); + assert!(!is_hash_shard("a/")); + } + + #[test] + fn is_full_hash_accepts_only_64_hex_chars() { + let h64 = "a".repeat(64); + assert!(is_full_hash(&h64)); + let mixed = format!("ab{}", "0".repeat(62)); + assert!(is_full_hash(&mixed)); + assert!(!is_full_hash(&"a".repeat(63))); + assert!(!is_full_hash(&"a".repeat(65))); + assert!(!is_full_hash(&format!("z{}", "a".repeat(63)))); + // Defends against operator stashing e.g. ".tmp" or "Plex" under + // VIDEO_PATH — neither passes the full-hash gate. + assert!(!is_full_hash(".tmp")); + assert!(!is_full_hash("Plex")); + } + #[test] fn playlist_needs_generation_true_when_video_missing_metadata() { // Video doesn't exist; metadata fails for it. Falls through to the