diff --git a/src/database/mod.rs b/src/database/mod.rs index d5a4d4a..873b662 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -424,6 +424,17 @@ pub trait ExifDao: Sync + Send { context: &opentelemetry::Context, ) -> Result, DbError>; + /// Every row in `image_exif` for `library_id`, as + /// `(rel_path, content_hash)`. The hash is Option because rows + /// mid-backfill carry NULL. Used by HLS readiness stats; callers + /// filter by extension client-side because the DB schema doesn't + /// carry media type. + fn list_paths_and_hashes_for_library( + &mut self, + context: &opentelemetry::Context, + library_id: i32, + ) -> Result)>, DbError>; + /// Return image_exif rows that need their `date_taken` resolved by the /// canonical-date waterfall (see `crate::date_resolver`): `date_taken /// IS NULL`. Returns `(library_id, rel_path)`. The caller filters to @@ -1261,6 +1272,30 @@ impl ExifDao for SqliteExifDao { .map_err(|_| DbError::new(DbErrorKind::QueryError)) } + fn list_paths_and_hashes_for_library( + &mut self, + context: &opentelemetry::Context, + lib_id: i32, + ) -> Result)>, DbError> { + trace_db_call( + context, + "query", + "list_paths_and_hashes_for_library", + |_span| { + use schema::image_exif::dsl::*; + + let mut connection = self.connection.lock().expect("Unable to get ExifDao"); + + image_exif + .filter(library_id.eq(lib_id)) + .select((rel_path, content_hash)) + .load::<(String, Option)>(connection.deref_mut()) + .map_err(|_| anyhow::anyhow!("Query error")) + }, + ) + .map_err(|_| DbError::new(DbErrorKind::QueryError)) + } + fn get_rows_needing_date_backfill( &mut self, context: &opentelemetry::Context, diff --git a/src/files.rs b/src/files.rs index 93d4345..91904e3 100644 --- a/src/files.rs +++ b/src/files.rs @@ -1696,6 +1696,14 @@ mod tests { Ok(Vec::new()) } + fn list_paths_and_hashes_for_library( + &mut self, + _context: &opentelemetry::Context, + _library_id: i32, + ) -> Result)>, DbError> { + Ok(Vec::new()) + } + fn get_rows_needing_date_backfill( &mut self, _context: &opentelemetry::Context, diff --git a/src/hls_stats.rs b/src/hls_stats.rs new file mode 100644 index 0000000..3e7fb03 --- /dev/null +++ b/src/hls_stats.rs @@ -0,0 +1,410 @@ +//! Per-library HLS readiness: Prometheus gauges + `/hls/stats` endpoint. +//! +//! The new hash-keyed pipeline transcodes lazily — most of a freshly +//! mounted library is "pending" for the first hour, and operators want +//! a live read on "how much work is left, am I CPU-bound, do I need to +//! bump `HLS_CONCURRENCY`." This module supplies both surfaces against +//! the same compute path: +//! +//! - **Prometheus gauges** `imageserver_hls_videos_total{library}`, +//! `..._with_playlist{library}`, `..._pending{library}`, +//! `..._unsupported{library}`. Updated every watcher full-scan tick +//! and on every `/hls/stats` request, so the freshness matches +//! whichever surface the operator is watching. +//! +//! - **`GET /hls/stats`** returns a JSON snapshot of the same counts +//! plus a top-level cross-library aggregate. Claims-protected +//! (matches every other authenticated read in this crate). +//! +//! Cost is O(distinct video hashes per library), each row needing a +//! single `stat()` on the playlist file. On a 100k-video library that's +//! noticeable; on a typical home library (few thousand) it's noise. +//! We call from explicit triggers only — never per-request from +//! middleware — so the cost is bounded. + +use std::collections::HashSet; +use std::path::Path; +use std::sync::{Arc, Mutex}; + +use actix_web::{HttpResponse, Responder, get, web}; +use lazy_static::lazy_static; +use log::{info, warn}; +use prometheus::IntGaugeVec; +use serde::Serialize; + +use crate::data::Claims; +use crate::database::ExifDao; +use crate::file_types; +use crate::libraries::Library; +use crate::state::AppState; +use crate::video::hls_paths; + +lazy_static! { + pub static ref HLS_VIDEOS_TOTAL: IntGaugeVec = IntGaugeVec::new( + prometheus::Opts::new( + "imageserver_hls_videos_total", + "Distinct video content hashes per library known to image_exif", + ), + &["library"], + ) + .expect("HLS_VIDEOS_TOTAL"); + pub static ref HLS_VIDEOS_WITH_PLAYLIST: IntGaugeVec = IntGaugeVec::new( + prometheus::Opts::new( + "imageserver_hls_videos_with_playlist", + "Videos whose hash-keyed HLS playlist is already on disk", + ), + &["library"], + ) + .expect("HLS_VIDEOS_WITH_PLAYLIST"); + pub static ref HLS_VIDEOS_PENDING: IntGaugeVec = IntGaugeVec::new( + prometheus::Opts::new( + "imageserver_hls_videos_pending", + "Videos whose hash-keyed HLS playlist is not yet on disk", + ), + &["library"], + ) + .expect("HLS_VIDEOS_PENDING"); + pub static ref HLS_VIDEOS_UNSUPPORTED: IntGaugeVec = IntGaugeVec::new( + prometheus::Opts::new( + "imageserver_hls_videos_unsupported", + "Videos with an `.unsupported` sentinel — ffmpeg refused; \ + operator must delete to retry", + ), + &["library"], + ) + .expect("HLS_VIDEOS_UNSUPPORTED"); +} + +/// Per-library HLS readiness snapshot. +#[derive(Serialize, Debug, Clone, PartialEq, Eq)] +pub struct HlsLibraryStats { + pub library_id: i32, + pub library: String, + /// Distinct video content hashes (dedupes intra-library bytes-at-N-paths). + pub total: usize, + /// Of `total`, hashes whose `playlist.m3u8` is on disk. + pub with_playlist: usize, + /// Of `total`, hashes whose ffmpeg attempt left a `.unsupported` + /// sentinel. Counted separately because they won't progress without + /// operator intervention (delete the sentinel to retry). + pub unsupported: usize, + /// `total - (with_playlist + unsupported)` — videos awaiting transcode. + pub pending: usize, + /// Distinct rel_paths under this library that are video files but + /// whose `image_exif.content_hash` is still NULL (mid-backfill). + /// These don't yet count toward `total` because they're invisible + /// to the hash-keyed pipeline; surfaced so the operator can see + /// "hash backfill, then transcode" pipeline depth. + pub hashless_videos: usize, +} + +/// JSON response body for `GET /hls/stats`. +#[derive(Serialize, Debug)] +pub struct HlsStatsResponse { + pub libraries: Vec, + pub total: usize, + pub with_playlist: usize, + pub pending: usize, + pub unsupported: usize, + pub hashless_videos: usize, +} + +/// Compute current readiness per library and publish to Prometheus. +/// Returns the same data so callers can serialise it. The publish step +/// is idempotent on the gauge — old values get overwritten. +pub fn compute_and_publish( + libraries: &[Library], + exif_dao: &Arc>>, + video_dir: &Path, +) -> Vec { + let ctx = opentelemetry::Context::new(); + let mut out = Vec::with_capacity(libraries.len()); + for lib in libraries { + let stats = compute_for_library(&ctx, lib, exif_dao, video_dir); + publish_gauges(&stats); + out.push(stats); + } + out +} + +fn publish_gauges(s: &HlsLibraryStats) { + HLS_VIDEOS_TOTAL + .with_label_values(&[s.library.as_str()]) + .set(s.total as i64); + HLS_VIDEOS_WITH_PLAYLIST + .with_label_values(&[s.library.as_str()]) + .set(s.with_playlist as i64); + HLS_VIDEOS_PENDING + .with_label_values(&[s.library.as_str()]) + .set(s.pending as i64); + HLS_VIDEOS_UNSUPPORTED + .with_label_values(&[s.library.as_str()]) + .set(s.unsupported as i64); +} + +fn compute_for_library( + ctx: &opentelemetry::Context, + lib: &Library, + exif_dao: &Arc>>, + video_dir: &Path, +) -> HlsLibraryStats { + let rows = { + let mut dao = exif_dao.lock().expect("Unable to lock ExifDao"); + match dao.list_paths_and_hashes_for_library(ctx, lib.id) { + Ok(r) => r, + Err(e) => { + warn!( + "hls_stats: list_paths_and_hashes_for_library failed for lib {}: {:?}", + lib.id, e + ); + Vec::new() + } + } + }; + stats_from_rows(lib, &rows, video_dir) +} + +/// Pure function — same compute as [`compute_for_library`] but works +/// on caller-supplied rows. Split out so tests don't need a full +/// `ExifDao` mock; the integration path is exercised through +/// `compute_and_publish` against the real SQLite DAO at runtime. +fn stats_from_rows( + lib: &Library, + rows: &[(String, Option)], + video_dir: &Path, +) -> HlsLibraryStats { + let mut hashes: HashSet = HashSet::new(); + let mut hashless_videos = 0usize; + for (rel_path, hash_opt) in rows { + if !file_types::is_video_file(Path::new(rel_path)) { + continue; + } + match hash_opt { + Some(h) => { + hashes.insert(h.clone()); + } + None => { + hashless_videos += 1; + } + } + } + + let mut with_playlist = 0usize; + let mut unsupported = 0usize; + for h in &hashes { + if hls_paths::playlist_for_hash(video_dir, h).exists() { + with_playlist += 1; + } else if hls_paths::sentinel_for_hash(video_dir, h).exists() { + unsupported += 1; + } + } + let total = hashes.len(); + let pending = total.saturating_sub(with_playlist + unsupported); + + HlsLibraryStats { + library_id: lib.id, + library: lib.name.clone(), + total, + with_playlist, + unsupported, + pending, + hashless_videos, + } +} + +/// Log a single info line summarising readiness across all libraries. +/// Called by the watcher at the end of a full-scan tick so operators +/// who tail the log see the headline number without scraping +/// Prometheus. +pub fn log_summary(stats: &[HlsLibraryStats]) { + let total: usize = stats.iter().map(|s| s.total).sum(); + let with_playlist: usize = stats.iter().map(|s| s.with_playlist).sum(); + let pending: usize = stats.iter().map(|s| s.pending).sum(); + let unsupported: usize = stats.iter().map(|s| s.unsupported).sum(); + let hashless: usize = stats.iter().map(|s| s.hashless_videos).sum(); + + let per_lib: Vec = stats + .iter() + .map(|s| { + format!( + "{}={}/{} pending={} unsupported={} hashless={}", + s.library, s.with_playlist, s.total, s.pending, s.unsupported, s.hashless_videos, + ) + }) + .collect(); + + info!( + "HLS readiness: {}/{} playlists on disk, {} pending, {} unsupported, {} hashless videos | per-library: [{}]", + with_playlist, + total, + pending, + unsupported, + hashless, + per_lib.join(", "), + ); +} + +#[get("/hls/stats")] +pub async fn hls_stats_handler( + _claims: Claims, + app_state: web::Data, + exif_dao: web::Data>>, +) -> impl Responder { + let libraries = app_state.libraries.clone(); + let video_dir = std::path::PathBuf::from(&app_state.video_path); + let exif_dao = exif_dao.into_inner(); + + // Synchronous file IO + DB query — run on a blocking pool so the + // actix worker thread stays free for other requests. + let stats = match web::block(move || compute_and_publish(&libraries, &exif_dao, &video_dir)) + .await + { + Ok(s) => s, + Err(e) => { + warn!("/hls/stats: blocking task failed: {:?}", e); + Vec::new() + } + }; + + let total: usize = stats.iter().map(|s| s.total).sum(); + let with_playlist: usize = stats.iter().map(|s| s.with_playlist).sum(); + let pending: usize = stats.iter().map(|s| s.pending).sum(); + let unsupported: usize = stats.iter().map(|s| s.unsupported).sum(); + let hashless_videos: usize = stats.iter().map(|s| s.hashless_videos).sum(); + + HttpResponse::Ok().json(HlsStatsResponse { + libraries: stats, + total, + with_playlist, + pending, + unsupported, + hashless_videos, + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::tempdir; + + fn lib(id: i32, name: &str) -> Library { + Library { + id, + name: name.into(), + root_path: String::new(), + enabled: true, + excluded_dirs: Vec::new(), + } + } + + fn rows(vs: Vec<(&str, Option<&str>)>) -> Vec<(String, Option)> { + vs.into_iter() + .map(|(p, h)| (p.to_string(), h.map(|s| s.to_string()))) + .collect() + } + + fn touch(dir: &Path, rel: &str) { + let p = dir.join(rel); + std::fs::create_dir_all(p.parent().unwrap()).unwrap(); + std::fs::write(p, b"").unwrap(); + } + + #[test] + fn videos_only_count_in_total() { + let tmp = tempdir().unwrap(); + let r = rows(vec![ + ("photos/IMG.jpg", Some(&"a".repeat(64))), // image: ignored + ("clip.mp4", Some(&"b".repeat(64))), + ("vid.mov", Some(&"c".repeat(64))), + ]); + let stats = stats_from_rows(&lib(1, "main"), &r, tmp.path()); + assert_eq!(stats.total, 2); + assert_eq!(stats.with_playlist, 0); + assert_eq!(stats.pending, 2); + assert_eq!(stats.unsupported, 0); + assert_eq!(stats.hashless_videos, 0); + } + + #[test] + fn hash_dedup_collapses_duplicate_rel_paths() { + let tmp = tempdir().unwrap(); + let r = rows(vec![ + ("a/clip.mp4", Some(&"a".repeat(64))), + ("b/clip.mp4", Some(&"a".repeat(64))), // same bytes, dup + ("other.mp4", Some(&"b".repeat(64))), + ]); + let stats = stats_from_rows(&lib(1, "main"), &r, tmp.path()); + assert_eq!(stats.total, 2, "duplicate hashes collapse"); + } + + #[test] + fn playlist_existence_promotes_to_with_playlist() { + let tmp = tempdir().unwrap(); + let hash = "a".repeat(64); + touch(tmp.path(), &format!("aa/{}/playlist.m3u8", hash)); + + let r = rows(vec![("clip.mp4", Some(&hash))]); + let stats = stats_from_rows(&lib(1, "main"), &r, tmp.path()); + assert_eq!(stats.total, 1); + assert_eq!(stats.with_playlist, 1); + assert_eq!(stats.pending, 0); + } + + #[test] + fn sentinel_existence_promotes_to_unsupported() { + let tmp = tempdir().unwrap(); + let hash = "b".repeat(64); + touch(tmp.path(), &format!("bb/{}/playlist.unsupported", hash)); + + let r = rows(vec![("clip.mov", Some(&hash))]); + let stats = stats_from_rows(&lib(1, "main"), &r, tmp.path()); + assert_eq!(stats.total, 1); + assert_eq!(stats.unsupported, 1); + assert_eq!(stats.with_playlist, 0); + assert_eq!(stats.pending, 0); + } + + #[test] + fn null_hash_videos_are_hashless_not_total() { + let tmp = tempdir().unwrap(); + let r = rows(vec![ + ("clip.mp4", None), + ("other.mp4", Some(&"a".repeat(64))), + ]); + let stats = stats_from_rows(&lib(1, "main"), &r, tmp.path()); + assert_eq!(stats.total, 1, "hashless row excluded from total"); + assert_eq!(stats.hashless_videos, 1); + } + + #[test] + fn publish_gauges_sets_per_library_value() { + let s = HlsLibraryStats { + library_id: 7, + library: "test_publish_a".into(), + total: 5, + with_playlist: 2, + pending: 3, + unsupported: 0, + hashless_videos: 0, + }; + publish_gauges(&s); + assert_eq!( + HLS_VIDEOS_TOTAL + .with_label_values(&["test_publish_a"]) + .get(), + 5 + ); + assert_eq!( + HLS_VIDEOS_PENDING + .with_label_values(&["test_publish_a"]) + .get(), + 3 + ); + assert_eq!( + HLS_VIDEOS_WITH_PLAYLIST + .with_label_values(&["test_publish_a"]) + .get(), + 2 + ); + } +} diff --git a/src/main.rs b/src/main.rs index cf493b7..620235e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -45,6 +45,7 @@ mod file_types; mod files; mod geo; mod handlers; +mod hls_stats; mod libraries; mod library_maintenance; mod perceptual_hash; @@ -126,6 +127,24 @@ fn main() -> std::io::Result<()> { .registry .register(Box::new(thumbnails::VIDEO_GAUGE.clone())) .unwrap(); + // HLS readiness gauges. Updated by the watcher every full-scan + // tick and on every `/hls/stats` request. See `hls_stats`. + prometheus + .registry + .register(Box::new(hls_stats::HLS_VIDEOS_TOTAL.clone())) + .unwrap(); + prometheus + .registry + .register(Box::new(hls_stats::HLS_VIDEOS_WITH_PLAYLIST.clone())) + .unwrap(); + prometheus + .registry + .register(Box::new(hls_stats::HLS_VIDEOS_PENDING.clone())) + .unwrap(); + prometheus + .registry + .register(Box::new(hls_stats::HLS_VIDEOS_UNSUPPORTED.clone())) + .unwrap(); let app_state = app_data.clone(); @@ -270,6 +289,7 @@ fn main() -> std::io::Result<()> { .service(handlers::video::get_video_preview) .service(handlers::video::get_preview_status) .service(handlers::video::get_video_part) + .service(hls_stats::hls_stats_handler) .service(handlers::favorites::favorites) .service(handlers::favorites::put_add_favorite) .service(handlers::favorites::delete_favorite) diff --git a/src/watcher.rs b/src/watcher.rs index 6182ddc..6f04855 100644 --- a/src/watcher.rs +++ b/src/watcher.rs @@ -32,6 +32,7 @@ use crate::exif; use crate::face_watch; use crate::faces; use crate::file_types; +use crate::hls_stats; use crate::libraries; use crate::library_maintenance; use crate::perceptual_hash; @@ -580,6 +581,20 @@ pub fn watch_files( } if is_full_scan { + // End-of-full-scan HLS readiness summary: log a single + // info line + refresh the Prometheus gauges. Skipped on + // quick scans because the cost is non-trivial on big + // libraries and the data only meaningfully changes on + // full passes. + let video_dir_str = + dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set"); + let stats = hls_stats::compute_and_publish( + &libs, + &exif_dao, + Path::new(&video_dir_str), + ); + hls_stats::log_summary(&stats); + last_full_scan = now; } last_quick_scan = now;