feature/hls-content-hash #95

Merged
cameron merged 10 commits from feature/hls-content-hash into master 2026-05-15 20:09:49 +00:00
3 changed files with 220 additions and 115 deletions
Showing only changes of commit b8e17e05b7 - Show all commits

View File

@@ -414,6 +414,16 @@ pub trait ExifDao: Sync + Send {
size_bytes: i64, size_bytes: i64,
) -> Result<(), DbError>; ) -> Result<(), DbError>;
/// Every distinct non-NULL `content_hash` across all libraries. Used
/// by HLS orphan cleanup to identify hash dirs under `$VIDEO_PATH`
/// whose source video no longer exists. Cheap query (single column,
/// indexed) but unbounded in size — the result is a HashSet membership
/// check, so a 100k-photo library produces ~100k strings.
fn list_distinct_content_hashes(
&mut self,
context: &opentelemetry::Context,
) -> Result<Vec<String>, DbError>;
/// Return image_exif rows that need their `date_taken` resolved by the /// Return image_exif rows that need their `date_taken` resolved by the
/// canonical-date waterfall (see `crate::date_resolver`): `date_taken /// canonical-date waterfall (see `crate::date_resolver`): `date_taken
/// IS NULL`. Returns `(library_id, rel_path)`. The caller filters to /// IS NULL`. Returns `(library_id, rel_path)`. The caller filters to
@@ -1231,6 +1241,26 @@ impl ExifDao for SqliteExifDao {
.map_err(|_| DbError::new(DbErrorKind::UpdateError)) .map_err(|_| DbError::new(DbErrorKind::UpdateError))
} }
fn list_distinct_content_hashes(
&mut self,
context: &opentelemetry::Context,
) -> Result<Vec<String>, DbError> {
trace_db_call(context, "query", "list_distinct_content_hashes", |_span| {
use schema::image_exif::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get ExifDao");
image_exif
.filter(content_hash.is_not_null())
.select(content_hash)
.distinct()
.load::<Option<String>>(connection.deref_mut())
.map(|rows| rows.into_iter().flatten().collect())
.map_err(|_| anyhow::anyhow!("Query error"))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn get_rows_needing_date_backfill( fn get_rows_needing_date_backfill(
&mut self, &mut self,
context: &opentelemetry::Context, context: &opentelemetry::Context,

View File

@@ -1689,6 +1689,13 @@ mod tests {
Ok(()) Ok(())
} }
fn list_distinct_content_hashes(
&mut self,
_context: &opentelemetry::Context,
) -> Result<Vec<String>, DbError> {
Ok(Vec::new())
}
fn get_rows_needing_date_backfill( fn get_rows_needing_date_backfill(
&mut self, &mut self,
_context: &opentelemetry::Context, _context: &opentelemetry::Context,

View File

@@ -22,7 +22,6 @@ use std::time::{Duration, SystemTime};
use actix::Addr; use actix::Addr;
use chrono::Utc; use chrono::Utc;
use log::{debug, error, info, warn}; use log::{debug, error, info, warn};
use walkdir::WalkDir;
use crate::backfill; use crate::backfill;
use crate::content_hash; use crate::content_hash;
@@ -45,18 +44,29 @@ use crate::video::actors::{
}; };
use crate::video::hls_paths; use crate::video::hls_paths;
/// Clean up orphaned HLS playlists and segments whose source videos no longer exist. /// Clean up orphaned HLS hash directories under `$VIDEO_PATH` whose
/// content_hash no longer appears in `image_exif`.
///
/// Walks `<video_path>/<shard>/<hash>/` — the layout written by the
/// hash-keyed `PlaylistGenerator` — and deletes any hash directory whose
/// hash isn't in the current DISTINCT set of `image_exif.content_hash`
/// values. Empty shard parents are reaped on the same pass.
///
/// Legacy basename-keyed files at `$VIDEO_PATH` root (from the
/// pre-content-hash layout) are left alone here; the one-shot startup
/// migration is responsible for retiring those.
/// ///
/// `libs_lock` is the shared live view of the libraries table — read at the /// `libs_lock` is the shared live view of the libraries table — read at the
/// top of each cleanup pass so a PATCH /libraries/{id} that disables or /// top of each cleanup pass so a PATCH /libraries/{id} that disables or
/// re-mounts a library is picked up without a restart. /// re-mounts a library is picked up without a restart.
pub fn cleanup_orphaned_playlists( pub fn cleanup_orphaned_playlists(
libs_lock: Arc<RwLock<Vec<libraries::Library>>>, libs_lock: Arc<RwLock<Vec<libraries::Library>>>,
excluded_dirs: Vec<String>, _excluded_dirs: Vec<String>,
library_health: libraries::LibraryHealthMap, library_health: libraries::LibraryHealthMap,
) { ) {
std::thread::spawn(move || { std::thread::spawn(move || {
let video_path = dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set"); let video_path_str = dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set");
let video_path = PathBuf::from(&video_path_str);
// Get cleanup interval from environment (default: 24 hours) // Get cleanup interval from environment (default: 24 hours)
let cleanup_interval_secs = dotenv::var("PLAYLIST_CLEANUP_INTERVAL_SECONDS") let cleanup_interval_secs = dotenv::var("PLAYLIST_CLEANUP_INTERVAL_SECONDS")
@@ -64,18 +74,13 @@ pub fn cleanup_orphaned_playlists(
.and_then(|s| s.parse::<u64>().ok()) .and_then(|s| s.parse::<u64>().ok())
.unwrap_or(86400); // 24 hours .unwrap_or(86400); // 24 hours
info!("Starting orphaned playlist cleanup job"); info!("Starting orphaned HLS cleanup job");
info!(" Cleanup interval: {} seconds", cleanup_interval_secs); info!(" Cleanup interval: {} seconds", cleanup_interval_secs);
info!(" Playlist directory: {}", video_path); info!(" HLS directory: {}", video_path.display());
{
let libs = libs_lock.read().unwrap_or_else(|e| e.into_inner()); let exif_dao: Arc<Mutex<Box<dyn ExifDao>>> = Arc::new(Mutex::new(
for lib in libs.iter() { Box::new(SqliteExifDao::new()) as Box<dyn ExifDao>
info!( ));
" Checking sources under '{}' at {}",
lib.name, lib.root_path
);
}
}
loop { loop {
std::thread::sleep(Duration::from_secs(cleanup_interval_secs)); std::thread::sleep(Duration::from_secs(cleanup_interval_secs));
@@ -86,22 +91,27 @@ pub fn cleanup_orphaned_playlists(
let libs: Vec<libraries::Library> = let libs: Vec<libraries::Library> =
libs_lock.read().unwrap_or_else(|e| e.into_inner()).clone(); libs_lock.read().unwrap_or_else(|e| e.into_inner()).clone();
// Safety gate: skip the cleanup cycle if any library is // Safety gate: skip the cleanup cycle if any (enabled)
// stale. A missing source video on a stale library is // library is stale. With hash-keyed layout the orphan
// indistinguishable from a transient unmount, and the // decision is a pure DB query, but the upstream
// cleanup is destructive — we'd rather leak a few playlist // missing-file scan that *removes* image_exif rows already
// files for a tick than delete one whose source is briefly // pauses for stale libraries — so a stale tick can hold
// unreachable. The cycle re-runs on the next interval. // hashes alive that would otherwise have been GC'd. The
// safety is then mostly belt-and-suspenders: a hash that
// should have been retired is just kept one tick longer.
// We'd rather leak a few hash dirs for 24h than wipe a
// hash dir whose source was briefly unreachable.
{ {
let guard = library_health.read().unwrap_or_else(|e| e.into_inner()); let guard = library_health.read().unwrap_or_else(|e| e.into_inner());
let stale: Vec<String> = libs let stale: Vec<String> = libs
.iter() .iter()
.filter(|lib| lib.enabled)
.filter(|lib| guard.get(&lib.id).map(|h| !h.is_online()).unwrap_or(false)) .filter(|lib| guard.get(&lib.id).map(|h| !h.is_online()).unwrap_or(false))
.map(|lib| lib.name.clone()) .map(|lib| lib.name.clone())
.collect(); .collect();
if !stale.is_empty() { if !stale.is_empty() {
warn!( warn!(
"Skipping orphaned-playlist cleanup: {} library(ies) stale: [{}]", "Skipping orphaned-HLS cleanup: {} library(ies) stale: [{}]",
stale.len(), stale.len(),
stale.join(", ") stale.join(", ")
); );
@@ -109,116 +119,135 @@ pub fn cleanup_orphaned_playlists(
} }
} }
info!("Running orphaned playlist cleanup"); info!("Running orphaned HLS cleanup");
let start = std::time::Instant::now(); let start = std::time::Instant::now();
let mut deleted_count = 0;
let mut error_count = 0;
// Find all .m3u8 files in VIDEO_PATH // Snapshot every live content_hash currently in image_exif.
let playlists: Vec<PathBuf> = WalkDir::new(&video_path) // We intentionally don't filter by library here — a hash that
.into_iter() // lives in any library is alive, even if the library a given
.filter_map(|e| e.ok()) // download attributed it to has since been disabled.
.filter(|e| e.file_type().is_file()) let alive_hashes: HashSet<String> = {
.filter(|e| { let context = opentelemetry::Context::new();
e.path() let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
.extension() match dao.list_distinct_content_hashes(&context) {
.and_then(|s| s.to_str()) Ok(hashes) => hashes.into_iter().collect(),
.map(|ext| ext.eq_ignore_ascii_case("m3u8")) Err(e) => {
error!(
"Failed to load distinct content hashes; skipping HLS cleanup: {:?}",
e
);
continue;
}
}
};
let mut deleted_count = 0usize;
let mut error_count = 0usize;
let mut inspected = 0usize;
// Walk top-level entries of VIDEO_PATH. Each is either a
// legacy basename-keyed `.m3u8` / `.ts` (skip — migration
// owns those) or a 2-char shard directory.
let read_root = match std::fs::read_dir(&video_path) {
Ok(r) => r,
Err(e) => {
error!(
"HLS cleanup: failed to read VIDEO_PATH {}: {}",
video_path.display(),
e
);
continue;
}
};
for shard_entry in read_root.flatten() {
let shard_path = shard_entry.path();
if !shard_entry
.file_type()
.map(|t| t.is_dir())
.unwrap_or(false)
{
continue;
}
let shard_name = match shard_path.file_name().and_then(|n| n.to_str()) {
Some(n) => n.to_owned(),
None => continue,
};
if !is_hash_shard(&shard_name) {
continue;
}
// Hash dirs inside this shard.
let read_shard = match std::fs::read_dir(&shard_path) {
Ok(r) => r,
Err(e) => {
warn!(
"HLS cleanup: failed to read shard {}: {}",
shard_path.display(),
e
);
continue;
}
};
let mut shard_emptied = true;
for hash_entry in read_shard.flatten() {
let hash_path = hash_entry.path();
if !hash_entry
.file_type()
.map(|t| t.is_dir())
.unwrap_or(false) .unwrap_or(false)
}) {
.map(|e| e.path().to_path_buf()) shard_emptied = false;
.collect(); continue;
}
let Some(hash_name) =
hash_path.file_name().and_then(|n| n.to_str()).map(|n| n.to_owned())
else {
shard_emptied = false;
continue;
};
if !is_full_hash(&hash_name) {
shard_emptied = false;
continue;
}
inspected += 1;
info!("Found {} playlist files to check", playlists.len()); if alive_hashes.contains(&hash_name) {
shard_emptied = false;
for playlist_path in playlists { continue;
// Extract the original video filename from playlist name
// Playlist format: {VIDEO_PATH}/{original_filename}.m3u8
if let Some(filename) = playlist_path.file_stem() {
let video_filename = filename.to_string_lossy();
// Search for this video file across every configured
// library, respecting EXCLUDED_DIRS so we don't
// false-resurrect playlists for videos that only
// exist inside an excluded subtree. As soon as one
// library has a matching source, we're done — the
// playlist isn't orphaned.
let mut video_exists = false;
'libs: for lib in &libs {
let effective = lib.effective_excluded_dirs(&excluded_dirs);
for entry in image_api::file_scan::walk_library_files(
Path::new(&lib.root_path),
&effective,
) {
if let Some(entry_stem) = entry.path().file_stem()
&& entry_stem == filename
&& file_types::is_video_file(entry.path())
{
video_exists = true;
break 'libs;
}
}
} }
if !video_exists { debug!(
debug!( "HLS cleanup: removing orphan hash dir {}",
"Source video for playlist {} no longer exists, deleting", hash_path.display()
playlist_path.display() );
); match std::fs::remove_dir_all(&hash_path) {
Ok(()) => deleted_count += 1,
// Delete the playlist file Err(e) => {
if let Err(e) = std::fs::remove_file(&playlist_path) {
warn!( warn!(
"Failed to delete playlist {}: {}", "Failed to delete orphan hash dir {}: {}",
playlist_path.display(), hash_path.display(),
e e
); );
error_count += 1; error_count += 1;
} else { shard_emptied = false;
deleted_count += 1;
// Also try to delete associated .ts segment files
// They are typically named {filename}N.ts in the same directory
if let Some(parent_dir) = playlist_path.parent() {
for entry in WalkDir::new(parent_dir)
.max_depth(1)
.into_iter()
.filter_map(|e| e.ok())
.filter(|e| e.file_type().is_file())
{
let entry_path = entry.path();
if let Some(ext) = entry_path.extension()
&& ext.eq_ignore_ascii_case("ts")
{
// Check if this .ts file belongs to our playlist
if let Some(ts_stem) = entry_path.file_stem() {
let ts_name = ts_stem.to_string_lossy();
if ts_name.starts_with(&*video_filename) {
if let Err(e) = std::fs::remove_file(entry_path) {
debug!(
"Failed to delete segment {}: {}",
entry_path.display(),
e
);
} else {
debug!(
"Deleted segment: {}",
entry_path.display()
);
}
}
}
}
}
}
} }
} }
} }
// If this shard now has no surviving hash dirs, reap
// the (empty) shard dir too. remove_dir fails if non-
// empty, which is the guard.
if shard_emptied {
let _ = std::fs::remove_dir(&shard_path);
}
} }
info!( info!(
"Orphaned playlist cleanup completed in {:?}: deleted {} playlists, {} errors", "Orphaned HLS cleanup completed in {:?}: inspected {} hash dirs, deleted {} orphans, {} errors",
start.elapsed(), start.elapsed(),
inspected,
deleted_count, deleted_count,
error_count error_count
); );
@@ -226,6 +255,18 @@ pub fn cleanup_orphaned_playlists(
}); });
} }
/// True iff `s` is a two-character lowercase-hex shard prefix.
fn is_hash_shard(s: &str) -> bool {
s.len() == 2 && s.bytes().all(|b| b.is_ascii_hexdigit())
}
/// True iff `s` looks like a full blake3 hex digest (64 hex chars).
/// Be strict so we don't accidentally rm a non-HLS directory operators
/// have stashed under VIDEO_PATH.
fn is_full_hash(s: &str) -> bool {
s.len() == 64 && s.bytes().all(|b| b.is_ascii_hexdigit())
}
pub fn watch_files( pub fn watch_files(
libs_lock: Arc<RwLock<Vec<libraries::Library>>>, libs_lock: Arc<RwLock<Vec<libraries::Library>>>,
playlist_manager: Addr<VideoPlaylistManager>, playlist_manager: Addr<VideoPlaylistManager>,
@@ -991,6 +1032,33 @@ mod tests {
assert!(playlist_needs_generation(&video, &playlist)); assert!(playlist_needs_generation(&video, &playlist));
} }
#[test]
fn is_hash_shard_accepts_only_two_hex_chars() {
assert!(is_hash_shard("ab"));
assert!(is_hash_shard("00"));
assert!(is_hash_shard("FF")); // ASCII hexdigit covers upper-case too
assert!(!is_hash_shard("a"));
assert!(!is_hash_shard("abc"));
assert!(!is_hash_shard("zz"));
assert!(!is_hash_shard(""));
assert!(!is_hash_shard("a/"));
}
#[test]
fn is_full_hash_accepts_only_64_hex_chars() {
let h64 = "a".repeat(64);
assert!(is_full_hash(&h64));
let mixed = format!("ab{}", "0".repeat(62));
assert!(is_full_hash(&mixed));
assert!(!is_full_hash(&"a".repeat(63)));
assert!(!is_full_hash(&"a".repeat(65)));
assert!(!is_full_hash(&format!("z{}", "a".repeat(63))));
// Defends against operator stashing e.g. ".tmp" or "Plex" under
// VIDEO_PATH — neither passes the full-hash gate.
assert!(!is_full_hash(".tmp"));
assert!(!is_full_hash("Plex"));
}
#[test] #[test]
fn playlist_needs_generation_true_when_video_missing_metadata() { fn playlist_needs_generation_true_when_video_missing_metadata() {
// Video doesn't exist; metadata fails for it. Falls through to the // Video doesn't exist; metadata fails for it. Falls through to the