feature/hls-content-hash #95

Merged
cameron merged 10 commits from feature/hls-content-hash into master 2026-05-15 20:09:49 +00:00
5 changed files with 488 additions and 0 deletions
Showing only changes of commit 7cd1ea3cf8 - Show all commits

View File

@@ -424,6 +424,17 @@ pub trait ExifDao: Sync + Send {
context: &opentelemetry::Context,
) -> Result<Vec<String>, DbError>;
/// Every row in `image_exif` for `library_id`, as
/// `(rel_path, content_hash)` pairs.
///
/// The hash is an `Option` because rows that are mid-backfill carry
/// NULL. Used by HLS readiness stats; callers filter by file extension
/// client-side because the DB schema doesn't carry media type, so the
/// result includes non-video rows as well.
///
/// # Errors
///
/// Returns a `DbError` when the underlying query fails.
fn list_paths_and_hashes_for_library(
&mut self,
context: &opentelemetry::Context,
library_id: i32,
) -> Result<Vec<(String, Option<String>)>, DbError>;
/// Return image_exif rows that need their `date_taken` resolved by the
/// canonical-date waterfall (see `crate::date_resolver`): `date_taken
/// IS NULL`. Returns `(library_id, rel_path)`. The caller filters to
@@ -1261,6 +1272,30 @@ impl ExifDao for SqliteExifDao {
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
// SQLite implementation: selects `(rel_path, content_hash)` for every
// `image_exif` row whose `library_id` equals `lib_id`. No ordering is
// requested, so callers must not rely on row order.
//
// Any failure is reported as `DbErrorKind::QueryError`; the underlying
// error detail is discarded by the two `map_err` calls below.
fn list_paths_and_hashes_for_library(
&mut self,
context: &opentelemetry::Context,
// NOTE(review): presumably named `lib_id` rather than `library_id` so it
// does not clash with the `library_id` column pulled in by the `dsl::*`
// glob import inside the closure — confirm before renaming.
lib_id: i32,
) -> Result<Vec<(String, Option<String>)>, DbError> {
trace_db_call(
context,
"query",
"list_paths_and_hashes_for_library",
|_span| {
use schema::image_exif::dsl::*;
// The connection guard lives for the duration of the query only.
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
image_exif
.filter(library_id.eq(lib_id))
.select((rel_path, content_hash))
.load::<(String, Option<String>)>(connection.deref_mut())
.map_err(|_| anyhow::anyhow!("Query error"))
},
)
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn get_rows_needing_date_backfill(
&mut self,
context: &opentelemetry::Context,

View File

@@ -1696,6 +1696,14 @@ mod tests {
Ok(Vec::new())
}
// Test stub: ignores both arguments and always reports an empty row set.
fn list_paths_and_hashes_for_library(
&mut self,
_context: &opentelemetry::Context,
_library_id: i32,
) -> Result<Vec<(String, Option<String>)>, DbError> {
Ok(Vec::new())
}
fn get_rows_needing_date_backfill(
&mut self,
_context: &opentelemetry::Context,

410
src/hls_stats.rs Normal file
View File

@@ -0,0 +1,410 @@
//! Per-library HLS readiness: Prometheus gauges + `/hls/stats` endpoint.
//!
//! The new hash-keyed pipeline transcodes lazily — most of a freshly
//! mounted library is "pending" for the first hour, and operators want
//! a live read on "how much work is left, am I CPU-bound, do I need to
//! bump `HLS_CONCURRENCY`." This module supplies both surfaces against
//! the same compute path:
//!
//! - **Prometheus gauges** `imageserver_hls_videos_total{library}`,
//! `..._with_playlist{library}`, `..._pending{library}`,
//! `..._unsupported{library}`. Updated every watcher full-scan tick
//! and on every `/hls/stats` request, so the freshness matches
//! whichever surface the operator is watching.
//!
//! - **`GET /hls/stats`** returns a JSON snapshot of the same counts
//! plus a top-level cross-library aggregate. Claims-protected
//! (matches every other authenticated read in this crate).
//!
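//! Cost is one DB query returning every `image_exif` row for the
//! library, plus O(distinct video hashes per library) filesystem
//! checks: each hash needs an existence check on the playlist file and,
//! when the playlist is missing, a second one on the `.unsupported`
//! sentinel. On a 100k-video library that's noticeable; on a typical
//! home library (few thousand) it's noise.
//! We call from explicit triggers only — never per-request from
//! middleware — so the cost is bounded.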
use std::collections::HashSet;
use std::path::Path;
use std::sync::{Arc, Mutex};
use actix_web::{HttpResponse, Responder, get, web};
use lazy_static::lazy_static;
use log::{info, warn};
use prometheus::IntGaugeVec;
use serde::Serialize;
use crate::data::Claims;
use crate::database::ExifDao;
use crate::file_types;
use crate::libraries::Library;
use crate::state::AppState;
use crate::video::hls_paths;
// Process-wide HLS readiness gauges, one time series per `library`
// label value. Constructing a gauge can only fail on invalid metric
// options, hence the `expect` on each definition.
lazy_static! {
/// Number of distinct video content hashes per library.
pub static ref HLS_VIDEOS_TOTAL: IntGaugeVec = IntGaugeVec::new(
prometheus::Opts::new(
"imageserver_hls_videos_total",
"Distinct video content hashes per library known to image_exif",
),
&["library"],
)
.expect("HLS_VIDEOS_TOTAL");
/// Number of hashes whose playlist file already exists on disk.
pub static ref HLS_VIDEOS_WITH_PLAYLIST: IntGaugeVec = IntGaugeVec::new(
prometheus::Opts::new(
"imageserver_hls_videos_with_playlist",
"Videos whose hash-keyed HLS playlist is already on disk",
),
&["library"],
)
.expect("HLS_VIDEOS_WITH_PLAYLIST");
/// Number of hashes with neither a playlist nor an `.unsupported`
/// sentinel on disk.
pub static ref HLS_VIDEOS_PENDING: IntGaugeVec = IntGaugeVec::new(
prometheus::Opts::new(
"imageserver_hls_videos_pending",
"Videos whose hash-keyed HLS playlist is not yet on disk",
),
&["library"],
)
.expect("HLS_VIDEOS_PENDING");
/// Number of hashes that have an `.unsupported` sentinel and no playlist.
pub static ref HLS_VIDEOS_UNSUPPORTED: IntGaugeVec = IntGaugeVec::new(
prometheus::Opts::new(
"imageserver_hls_videos_unsupported",
"Videos with an `.unsupported` sentinel — ffmpeg refused; \
operator must delete to retry",
),
&["library"],
)
.expect("HLS_VIDEOS_UNSUPPORTED");
}
/// Per-library HLS readiness snapshot.
///
/// Invariant maintained by the code that builds it:
/// `total == with_playlist + unsupported + pending`. `hashless_videos`
/// is tracked separately and is not part of `total`.
#[derive(Serialize, Debug, Clone, PartialEq, Eq)]
pub struct HlsLibraryStats {
/// Database id of the library this snapshot describes.
pub library_id: i32,
/// Library name; also used as the `library` label on the gauges.
pub library: String,
/// Distinct video content hashes (dedupes intra-library bytes-at-N-paths).
pub total: usize,
/// Of `total`, hashes whose `playlist.m3u8` is on disk. A hash with
/// both a playlist and a sentinel is counted here, not as unsupported.
pub with_playlist: usize,
/// Of `total`, hashes whose ffmpeg attempt left a `.unsupported`
/// sentinel (and that have no playlist). Counted separately because
/// they won't progress without operator intervention (delete the
/// sentinel to retry).
pub unsupported: usize,
/// `total - (with_playlist + unsupported)` — videos awaiting transcode.
pub pending: usize,
/// Number of video rows under this library whose
/// `image_exif.content_hash` is still NULL (mid-backfill), counted
/// once per row. These don't yet count toward `total` because they're
/// invisible to the hash-keyed pipeline; surfaced so the operator can
/// see "hash backfill, then transcode" pipeline depth.
pub hashless_videos: usize,
}
/// JSON response body for `GET /hls/stats`.
///
/// The top-level counters are the sums of the same-named fields across
/// every entry in `libraries`.
#[derive(Serialize, Debug)]
pub struct HlsStatsResponse {
/// One snapshot per library.
pub libraries: Vec<HlsLibraryStats>,
/// Sum of `HlsLibraryStats::total` over all libraries.
pub total: usize,
/// Sum of `HlsLibraryStats::with_playlist` over all libraries.
pub with_playlist: usize,
/// Sum of `HlsLibraryStats::pending` over all libraries.
pub pending: usize,
/// Sum of `HlsLibraryStats::unsupported` over all libraries.
pub unsupported: usize,
/// Sum of `HlsLibraryStats::hashless_videos` over all libraries.
pub hashless_videos: usize,
}
/// Build a readiness snapshot for every library in `libraries` and push
/// each one to the Prometheus gauges as it is produced.
///
/// The snapshots are returned in the same order as `libraries` so the
/// caller can serialise or log them. Publishing simply overwrites the
/// previous gauge values for that library label.
pub fn compute_and_publish(
    libraries: &[Library],
    exif_dao: &Arc<Mutex<Box<dyn ExifDao>>>,
    video_dir: &Path,
) -> Vec<HlsLibraryStats> {
    let ctx = opentelemetry::Context::new();
    libraries
        .iter()
        .map(|library| {
            let snapshot = compute_for_library(&ctx, library, exif_dao, video_dir);
            publish_gauges(&snapshot);
            snapshot
        })
        .collect()
}
/// Write one library's counters to the four HLS gauges, using the
/// library name as the `library` label value.
///
/// `hashless_videos` has no gauge and is therefore not published here.
fn publish_gauges(s: &HlsLibraryStats) {
    let labels = [s.library.as_str()];
    // Same order as the gauge definitions: total, with_playlist, pending, unsupported.
    let series = [
        (&*HLS_VIDEOS_TOTAL, s.total),
        (&*HLS_VIDEOS_WITH_PLAYLIST, s.with_playlist),
        (&*HLS_VIDEOS_PENDING, s.pending),
        (&*HLS_VIDEOS_UNSUPPORTED, s.unsupported),
    ];
    for (gauge, value) in series.iter() {
        gauge.with_label_values(&labels).set(*value as i64);
    }
}
/// Fetch `(rel_path, content_hash)` rows for `lib` from the DAO and turn
/// them into a readiness snapshot.
///
/// A failed query is logged at warn level and treated as "no rows", so
/// the resulting snapshot has every counter at zero.
///
/// # Panics
///
/// Panics if the `ExifDao` mutex is poisoned.
fn compute_for_library(
    ctx: &opentelemetry::Context,
    lib: &Library,
    exif_dao: &Arc<Mutex<Box<dyn ExifDao>>>,
    video_dir: &Path,
) -> HlsLibraryStats {
    // The guard is a temporary of this statement, so the lock is released
    // before any filesystem checks run in `stats_from_rows`.
    let rows = exif_dao
        .lock()
        .expect("Unable to lock ExifDao")
        .list_paths_and_hashes_for_library(ctx, lib.id)
        .unwrap_or_else(|e| {
            warn!(
                "hls_stats: list_paths_and_hashes_for_library failed for lib {}: {:?}",
                lib.id, e
            );
            Vec::new()
        });
    stats_from_rows(lib, &rows, video_dir)
}
/// DAO-free core of [`compute_for_library`]: derives the snapshot from
/// caller-supplied `(rel_path, content_hash)` rows plus the on-disk state
/// under `video_dir`.
///
/// Kept separate so unit tests can feed rows directly instead of mocking
/// an `ExifDao`. Non-video rows are ignored; video rows without a hash
/// are tallied as `hashless_videos`; the remaining hashes are deduplicated
/// and classified by which marker file exists on disk, with the playlist
/// taking precedence over the `.unsupported` sentinel.
fn stats_from_rows(
    lib: &Library,
    rows: &[(String, Option<String>)],
    video_dir: &Path,
) -> HlsLibraryStats {
    let mut distinct: HashSet<String> = HashSet::new();
    let mut missing_hash = 0usize;

    let video_rows = rows
        .iter()
        .filter(|(rel_path, _)| file_types::is_video_file(Path::new(rel_path)));
    for (_, maybe_hash) in video_rows {
        if let Some(hash) = maybe_hash {
            distinct.insert(hash.clone());
        } else {
            missing_hash += 1;
        }
    }

    let mut ready = 0usize;
    let mut refused = 0usize;
    for hash in &distinct {
        if hls_paths::playlist_for_hash(video_dir, hash).exists() {
            ready += 1;
            continue;
        }
        // Only consult the sentinel when no playlist is present.
        if hls_paths::sentinel_for_hash(video_dir, hash).exists() {
            refused += 1;
        }
    }

    let total = distinct.len();
    HlsLibraryStats {
        library_id: lib.id,
        library: lib.name.clone(),
        total,
        with_playlist: ready,
        unsupported: refused,
        pending: total.saturating_sub(ready + refused),
        hashless_videos: missing_hash,
    }
}
/// Emit one info-level line with the cross-library totals followed by a
/// per-library breakdown.
///
/// Intended for the watcher's end-of-full-scan step, so that anyone
/// tailing the log gets the headline readiness numbers without having
/// to scrape Prometheus.
pub fn log_summary(stats: &[HlsLibraryStats]) {
    let mut all = 0usize;
    let mut ready = 0usize;
    let mut waiting = 0usize;
    let mut refused = 0usize;
    let mut no_hash = 0usize;
    let mut breakdown: Vec<String> = Vec::with_capacity(stats.len());

    // Single pass: accumulate the aggregate and render each library's entry.
    for entry in stats {
        all += entry.total;
        ready += entry.with_playlist;
        waiting += entry.pending;
        refused += entry.unsupported;
        no_hash += entry.hashless_videos;
        breakdown.push(format!(
            "{}={}/{} pending={} unsupported={} hashless={}",
            entry.library,
            entry.with_playlist,
            entry.total,
            entry.pending,
            entry.unsupported,
            entry.hashless_videos,
        ));
    }

    info!(
        "HLS readiness: {}/{} playlists on disk, {} pending, {} unsupported, {} hashless videos | per-library: [{}]",
        ready,
        all,
        waiting,
        refused,
        no_hash,
        breakdown.join(", "),
    );
}
/// `GET /hls/stats` — JSON snapshot of per-library HLS readiness plus a
/// cross-library aggregate.
///
/// Requires valid `Claims`. Every call recomputes the stats and, as a
/// side effect of `compute_and_publish`, refreshes the Prometheus gauges.
///
/// Responds with `200 OK` and an [`HlsStatsResponse`] body on success.
/// If the blocking computation itself fails (for example because the
/// worker panicked), responds with `500 Internal Server Error` rather
/// than a misleading all-zero snapshot.
#[get("/hls/stats")]
pub async fn hls_stats_handler(
    _claims: Claims,
    app_state: web::Data<AppState>,
    exif_dao: web::Data<Mutex<Box<dyn ExifDao>>>,
) -> impl Responder {
    let libraries = app_state.libraries.clone();
    let video_dir = std::path::PathBuf::from(&app_state.video_path);
    let exif_dao = exif_dao.into_inner();

    // Synchronous file IO + DB query — run on a blocking pool so the
    // actix worker thread stays free for other requests.
    let stats = match web::block(move || compute_and_publish(&libraries, &exif_dao, &video_dir))
        .await
    {
        Ok(s) => s,
        Err(e) => {
            // An empty 200 would be indistinguishable from "no videos at
            // all", so surface the failure to the client instead.
            warn!("/hls/stats: blocking task failed: {:?}", e);
            return HttpResponse::InternalServerError().finish();
        }
    };

    let total: usize = stats.iter().map(|s| s.total).sum();
    let with_playlist: usize = stats.iter().map(|s| s.with_playlist).sum();
    let pending: usize = stats.iter().map(|s| s.pending).sum();
    let unsupported: usize = stats.iter().map(|s| s.unsupported).sum();
    let hashless_videos: usize = stats.iter().map(|s| s.hashless_videos).sum();

    HttpResponse::Ok().json(HlsStatsResponse {
        libraries: stats,
        total,
        with_playlist,
        pending,
        unsupported,
        hashless_videos,
    })
}
// Unit tests for the pure `stats_from_rows` path and for gauge
// publication. Filesystem state is faked inside a per-test temp dir.
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
// Minimal `Library` fixture; only `id` and `name` are read by the code under test.
fn lib(id: i32, name: &str) -> Library {
Library {
id,
name: name.into(),
root_path: String::new(),
enabled: true,
excluded_dirs: Vec::new(),
}
}
// Converts borrowed `(path, hash)` literals into the owned row shape the DAO returns.
fn rows(vs: Vec<(&str, Option<&str>)>) -> Vec<(String, Option<String>)> {
vs.into_iter()
.map(|(p, h)| (p.to_string(), h.map(|s| s.to_string())))
.collect()
}
// Creates an empty file at `dir/rel`, creating parent directories as needed.
fn touch(dir: &Path, rel: &str) {
let p = dir.join(rel);
std::fs::create_dir_all(p.parent().unwrap()).unwrap();
std::fs::write(p, b"").unwrap();
}
// Non-video rows must not contribute to any counter.
#[test]
fn videos_only_count_in_total() {
let tmp = tempdir().unwrap();
let r = rows(vec![
("photos/IMG.jpg", Some(&"a".repeat(64))), // image: ignored
("clip.mp4", Some(&"b".repeat(64))),
("vid.mov", Some(&"c".repeat(64))),
]);
let stats = stats_from_rows(&lib(1, "main"), &r, tmp.path());
assert_eq!(stats.total, 2);
assert_eq!(stats.with_playlist, 0);
assert_eq!(stats.pending, 2);
assert_eq!(stats.unsupported, 0);
assert_eq!(stats.hashless_videos, 0);
}
// Two paths sharing one content hash count as a single video.
#[test]
fn hash_dedup_collapses_duplicate_rel_paths() {
let tmp = tempdir().unwrap();
let r = rows(vec![
("a/clip.mp4", Some(&"a".repeat(64))),
("b/clip.mp4", Some(&"a".repeat(64))), // same bytes, dup
("other.mp4", Some(&"b".repeat(64))),
]);
let stats = stats_from_rows(&lib(1, "main"), &r, tmp.path());
assert_eq!(stats.total, 2, "duplicate hashes collapse");
}
// NOTE(review): the `aa/<hash>/playlist.m3u8` layout presumably mirrors
// what `hls_paths::playlist_for_hash` produces — confirm if that changes.
#[test]
fn playlist_existence_promotes_to_with_playlist() {
let tmp = tempdir().unwrap();
let hash = "a".repeat(64);
touch(tmp.path(), &format!("aa/{}/playlist.m3u8", hash));
let r = rows(vec![("clip.mp4", Some(&hash))]);
let stats = stats_from_rows(&lib(1, "main"), &r, tmp.path());
assert_eq!(stats.total, 1);
assert_eq!(stats.with_playlist, 1);
assert_eq!(stats.pending, 0);
}
// NOTE(review): the `bb/<hash>/playlist.unsupported` layout presumably
// mirrors what `hls_paths::sentinel_for_hash` produces — confirm.
#[test]
fn sentinel_existence_promotes_to_unsupported() {
let tmp = tempdir().unwrap();
let hash = "b".repeat(64);
touch(tmp.path(), &format!("bb/{}/playlist.unsupported", hash));
let r = rows(vec![("clip.mov", Some(&hash))]);
let stats = stats_from_rows(&lib(1, "main"), &r, tmp.path());
assert_eq!(stats.total, 1);
assert_eq!(stats.unsupported, 1);
assert_eq!(stats.with_playlist, 0);
assert_eq!(stats.pending, 0);
}
// A video row with a NULL hash is reported separately and excluded from `total`.
#[test]
fn null_hash_videos_are_hashless_not_total() {
let tmp = tempdir().unwrap();
let r = rows(vec![
("clip.mp4", None),
("other.mp4", Some(&"a".repeat(64))),
]);
let stats = stats_from_rows(&lib(1, "main"), &r, tmp.path());
assert_eq!(stats.total, 1, "hashless row excluded from total");
assert_eq!(stats.hashless_videos, 1);
}
// The gauges are process-global statics, so this test uses a label value
// no other test publishes to keep its reads unaffected by other tests.
#[test]
fn publish_gauges_sets_per_library_value() {
let s = HlsLibraryStats {
library_id: 7,
library: "test_publish_a".into(),
total: 5,
with_playlist: 2,
pending: 3,
unsupported: 0,
hashless_videos: 0,
};
publish_gauges(&s);
assert_eq!(
HLS_VIDEOS_TOTAL
.with_label_values(&["test_publish_a"])
.get(),
5
);
assert_eq!(
HLS_VIDEOS_PENDING
.with_label_values(&["test_publish_a"])
.get(),
3
);
assert_eq!(
HLS_VIDEOS_WITH_PLAYLIST
.with_label_values(&["test_publish_a"])
.get(),
2
);
}
}

View File

@@ -45,6 +45,7 @@ mod file_types;
mod files;
mod geo;
mod handlers;
mod hls_stats;
mod libraries;
mod library_maintenance;
mod perceptual_hash;
@@ -126,6 +127,24 @@ fn main() -> std::io::Result<()> {
.registry
.register(Box::new(thumbnails::VIDEO_GAUGE.clone()))
.unwrap();
// HLS readiness gauges. Updated by the watcher every full-scan
// tick and on every `/hls/stats` request. See `hls_stats`.
prometheus
.registry
.register(Box::new(hls_stats::HLS_VIDEOS_TOTAL.clone()))
.unwrap();
prometheus
.registry
.register(Box::new(hls_stats::HLS_VIDEOS_WITH_PLAYLIST.clone()))
.unwrap();
prometheus
.registry
.register(Box::new(hls_stats::HLS_VIDEOS_PENDING.clone()))
.unwrap();
prometheus
.registry
.register(Box::new(hls_stats::HLS_VIDEOS_UNSUPPORTED.clone()))
.unwrap();
let app_state = app_data.clone();
@@ -270,6 +289,7 @@ fn main() -> std::io::Result<()> {
.service(handlers::video::get_video_preview)
.service(handlers::video::get_preview_status)
.service(handlers::video::get_video_part)
.service(hls_stats::hls_stats_handler)
.service(handlers::favorites::favorites)
.service(handlers::favorites::put_add_favorite)
.service(handlers::favorites::delete_favorite)

View File

@@ -32,6 +32,7 @@ use crate::exif;
use crate::face_watch;
use crate::faces;
use crate::file_types;
use crate::hls_stats;
use crate::libraries;
use crate::library_maintenance;
use crate::perceptual_hash;
@@ -580,6 +581,20 @@ pub fn watch_files(
}
if is_full_scan {
// End-of-full-scan HLS readiness summary: log a single
// info line + refresh the Prometheus gauges. Skipped on
// quick scans because the cost is non-trivial on big
// libraries and the data only meaningfully changes on
// full passes.
let video_dir_str =
dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set");
let stats = hls_stats::compute_and_publish(
&libs,
&exif_dao,
Path::new(&video_dir_str),
);
hls_stats::log_summary(&stats);
last_full_scan = now;
}
last_quick_scan = now;