//! Video-related endpoints: HLS playlist generation, segment streaming, //! and the short-clip preview pipeline. use std::collections::HashMap; use std::path::PathBuf; use std::sync::Mutex; use actix_files::NamedFile; use actix_web::{ HttpRequest, HttpResponse, Responder, get, post, web::{self, Data}, }; use log::{debug, error, info, warn}; use opentelemetry::KeyValue; use opentelemetry::trace::{Span, Status, Tracer}; use serde::Serialize; use crate::content_hash; use crate::data::{ Claims, PreviewClipRequest, PreviewStatusItem, PreviewStatusRequest, PreviewStatusResponse, ThumbnailRequest, }; use crate::database::{ExifDao, PreviewDao}; use crate::files::is_valid_full_path; use crate::libraries; use crate::otel::{extract_context_from_request, global_tracer}; use crate::state::AppState; use crate::video::actors::{GeneratePreviewClipMessage, QueueVideosMessage, VideoToQueue}; use crate::video::hls_paths; /// Response body for `POST /video/generate`. Clients consume /// `playlist_url` (hash-keyed, stable across libraries and renames) /// and poll for readiness via the URL itself. #[derive(Serialize, Debug)] struct GenerateVideoResponse { /// Hash-keyed URL to the HLS playlist. Resolves to /// `$VIDEO_PATH///playlist.m3u8` server-side. Relative /// segment refs inside the playlist resolve correctly because the /// browser appends to this URL's path. playlist_url: String, /// blake3 content hash of the source video. Stable per byte content, /// so duplicate uploads / archive ingests share one set of HLS /// output. content_hash: String, /// `true` iff the playlist file is already on disk. `false` means a /// transcode was queued; clients should retry the URL after a short /// delay (or rely on HLS.js's own retry policy). ready: bool, } #[post("/video/generate")] pub async fn generate_video( _claims: Claims, request: HttpRequest, app_state: Data, exif_dao: Data>>, body: web::Json, ) -> impl Responder { let tracer = global_tracer(); let context = extract_context_from_request(&request); let mut span = tracer.start_with_context("generate_video", &context); let preferred_library = libraries::resolve_library_param(&app_state, body.library.as_deref()) .ok() .flatten() .unwrap_or_else(|| app_state.primary_library()); // Try the resolved library first, then fall back to any other library // that actually contains the file — handles union-mode requests where // the mobile client passes no library but the file lives in a // non-primary library. Track which library won so the DB lookup is // scoped correctly. let resolved = is_valid_full_path(&preferred_library.root_path, &body.path, false) .filter(|p| p.exists()) .map(|p| (preferred_library.id, preferred_library.root_path.clone(), p)) .or_else(|| { app_state.libraries.iter().find_map(|lib| { if lib.id == preferred_library.id { return None; } is_valid_full_path(&lib.root_path, &body.path, false) .filter(|p| p.exists()) .map(|p| (lib.id, lib.root_path.clone(), p)) }) }); let Some((resolved_library_id, resolved_root, full_path)) = resolved else { span.set_status(Status::error(format!("invalid path {:?}", &body.path))); return HttpResponse::BadRequest().finish(); }; // Build the rel_path used to look up the row. let full_path_str = full_path.to_string_lossy().to_string(); let rel_path = full_path_str .strip_prefix(&resolved_root) .unwrap_or(full_path_str.as_str()) .trim_start_matches(['/', '\\']) .to_string(); // DB lookup first. Cheap and avoids re-reading the file off disk for // already-ingested videos. let hash_from_db: Option = { let mut dao = exif_dao.lock().expect("Unable to lock ExifDao"); match dao.get_exif_batch( &context, Some(resolved_library_id), std::slice::from_ref(&rel_path), ) { Ok(rows) => rows.into_iter().next().and_then(|r| r.content_hash), Err(e) => { warn!( "exif_dao.get_exif_batch failed for {} (lib {}): {:?}", rel_path, resolved_library_id, e ); None } } }; // Best-effort fallback: compute on-the-fly when the DB row hasn't // been written or is mid-backfill. Read-only — no library mutation. let content_hash_str = match hash_from_db { Some(h) => h, None => match content_hash::compute(&full_path) { Ok(id) => id.content_hash, Err(e) => { error!( "Failed to compute content_hash for {}: {}", full_path.display(), e ); span.set_status(Status::error(format!("hash compute failed: {}", e))); return HttpResponse::InternalServerError().finish(); } }, }; let video_dir = std::path::Path::new(&app_state.video_path); let playlist_path = hls_paths::playlist_for_hash(video_dir, &content_hash_str); let sentinel_path = hls_paths::sentinel_for_hash(video_dir, &content_hash_str); let ready = playlist_path.exists(); if !ready && !sentinel_path.exists() { // Kick off generation via the existing actor pipeline. Fire-and- // forget — the playlist appears at `playlist_path` once ffmpeg // + rename complete. The client polls the URL. info!( "/video/generate: queueing playlist for {} (hash={})", full_path.display(), &content_hash_str[..content_hash_str.len().min(16)] ); app_state.playlist_manager.do_send(QueueVideosMessage { videos: vec![VideoToQueue { video_path: full_path.clone(), content_hash: content_hash_str.clone(), }], }); span.add_event( "playlist_queued", vec![KeyValue::new("content_hash", content_hash_str.clone())], ); } else if ready { span.add_event( "playlist_already_present", vec![KeyValue::new("content_hash", content_hash_str.clone())], ); } else { // Sentinel present — past transcode attempt failed. Return the // URL anyway (it'll 404 / 5xx at fetch time) so the client gets // a deterministic answer. Operator must delete the sentinel to // force a retry. warn!( "/video/generate: unsupported sentinel present for {} (hash={}); not re-queueing", full_path.display(), &content_hash_str[..content_hash_str.len().min(16)] ); } let playlist_url = format!( "/video/hls/{}/{}", content_hash_str, hls_paths::PLAYLIST_FILENAME ); span.set_status(Status::Ok); HttpResponse::Ok().json(GenerateVideoResponse { playlist_url, content_hash: content_hash_str, ready, }) } /// Serve HLS playlist or segment files under the hash-keyed layout /// `$VIDEO_PATH///`. The matched `{file}` must be /// either `playlist.m3u8` or a `segment_NNN.ts` style segment; any other /// shape is 400'd to defend against operators stashing other content in /// the hash dir. #[get("/video/hls/{hash}/{file}")] pub async fn stream_hls_file( request: HttpRequest, _: Claims, path: web::Path<(String, String)>, app_state: Data, ) -> impl Responder { let tracer = global_tracer(); let context = extract_context_from_request(&request); let mut span = tracer.start_with_context("stream_hls_file", &context); let (hash, file) = path.into_inner(); if !is_valid_hash(&hash) { span.set_status(Status::error("invalid hash")); return HttpResponse::BadRequest().body("invalid hash"); } if !is_allowed_hls_filename(&file) { span.set_status(Status::error("invalid file")); return HttpResponse::BadRequest().body("invalid file"); } let shard = &hash[..2]; let file_path = PathBuf::from(&app_state.video_path) .join(shard) .join(&hash) .join(&file); // Path-traversal guard: canonicalize both sides and require the file // to live under `app_state.video_path`. `is_valid_hash` / // `is_allowed_hls_filename` already block dangerous strings, but // belt-and-suspenders here is cheap. let canonical_base = match std::fs::canonicalize(&app_state.video_path) { Ok(p) => p, Err(e) => { error!("Failed to canonicalize VIDEO_PATH: {:?}", e); span.set_status(Status::error("VIDEO_PATH not canonicalisable")); return HttpResponse::InternalServerError().finish(); } }; let canonical_file = match std::fs::canonicalize(&file_path) { Ok(p) => p, Err(_) => { debug!("HLS file not found: {}", file_path.display()); span.set_status(Status::error("not found")); return HttpResponse::NotFound().finish(); } }; if !canonical_file.starts_with(&canonical_base) { warn!( "Path traversal attempt: {} resolved outside VIDEO_PATH", file_path.display() ); span.set_status(Status::error("traversal")); return HttpResponse::Forbidden().finish(); } match NamedFile::open(&canonical_file) { Ok(f) => { span.set_status(Status::Ok); f.into_response(&request) } Err(_) => { span.set_status(Status::error("not found")); HttpResponse::NotFound().finish() } } } /// 64 lowercase-or-upper hex chars. Strict so we don't accept arbitrary /// strings that might canonicalize into trouble. fn is_valid_hash(s: &str) -> bool { s.len() == 64 && s.bytes().all(|b| b.is_ascii_hexdigit()) } /// Allowed file names inside a hash dir. `playlist.m3u8` plus segment /// files matching the `segment_NNN.ts` template that `PlaylistGenerator` /// writes via `hls_paths::SEGMENT_TEMPLATE`. Anything else (including /// `.tmp`, `.unsupported`, dotfiles) returns 400 — these are internal /// artifacts the client should never request. fn is_allowed_hls_filename(name: &str) -> bool { if name == hls_paths::PLAYLIST_FILENAME { return true; } if let Some(rest) = name.strip_prefix("segment_") && let Some(num) = rest.strip_suffix(".ts") && !num.is_empty() && num.bytes().all(|b| b.is_ascii_digit()) { return true; } false } #[get("/video/preview")] pub async fn get_video_preview( _claims: Claims, request: HttpRequest, req: web::Query, app_state: Data, preview_dao: Data>>, ) -> impl Responder { let tracer = global_tracer(); let context = extract_context_from_request(&request); let mut span = tracer.start_with_context("get_video_preview", &context); // Validate path let full_path = match is_valid_full_path(&app_state.base_path, &req.path, true) { Some(path) => path, None => { span.set_status(Status::error("Invalid path")); return HttpResponse::BadRequest().json(serde_json::json!({"error": "Invalid path"})); } }; let full_path_str = full_path.to_string_lossy().to_string(); // Use relative path (from BASE_PATH) for DB storage, consistent with EXIF convention let relative_path = full_path_str .strip_prefix(&app_state.base_path) .unwrap_or(&full_path_str) .trim_start_matches(['/', '\\']) .to_string(); // Check preview status in DB let preview = { let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao"); dao.get_preview(&context, &relative_path) }; match preview { Ok(Some(clip)) => match clip.status.as_str() { "complete" => { let preview_path = PathBuf::from(&app_state.preview_clips_path) .join(&relative_path) .with_extension("mp4"); match NamedFile::open(&preview_path) { Ok(file) => { span.set_status(Status::Ok); file.into_response(&request) } Err(_) => { // File missing on disk but DB says complete - reset and regenerate let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao"); let _ = dao.update_status( &context, &relative_path, "pending", None, None, None, ); app_state .preview_clip_generator .do_send(GeneratePreviewClipMessage { video_path: full_path_str, }); span.set_status(Status::Ok); HttpResponse::Accepted().json(serde_json::json!({ "status": "processing", "path": req.path })) } } } "processing" => { span.set_status(Status::Ok); HttpResponse::Accepted().json(serde_json::json!({ "status": "processing", "path": req.path })) } "failed" => { let error_msg = clip .error_message .unwrap_or_else(|| "Unknown error".to_string()); span.set_status(Status::error(format!("Generation failed: {}", error_msg))); HttpResponse::InternalServerError().json(serde_json::json!({ "error": format!("Generation failed: {}", error_msg) })) } _ => { // pending or unknown status - trigger generation app_state .preview_clip_generator .do_send(GeneratePreviewClipMessage { video_path: full_path_str, }); span.set_status(Status::Ok); HttpResponse::Accepted().json(serde_json::json!({ "status": "processing", "path": req.path })) } }, Ok(None) => { // No record exists - insert as pending and trigger generation { let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao"); let _ = dao.insert_preview(&context, &relative_path, "pending"); } app_state .preview_clip_generator .do_send(GeneratePreviewClipMessage { video_path: full_path_str, }); span.set_status(Status::Ok); HttpResponse::Accepted().json(serde_json::json!({ "status": "processing", "path": req.path })) } Err(_) => { span.set_status(Status::error("Database error")); HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"})) } } } #[post("/video/preview/status")] pub async fn get_preview_status( _claims: Claims, request: HttpRequest, body: web::Json, app_state: Data, preview_dao: Data>>, ) -> impl Responder { let tracer = global_tracer(); let context = extract_context_from_request(&request); let mut span = tracer.start_with_context("get_preview_status", &context); // Limit to 200 paths per request if body.paths.len() > 200 { span.set_status(Status::error("Too many paths")); return HttpResponse::BadRequest() .json(serde_json::json!({"error": "Maximum 200 paths per request"})); } let previews = { let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao"); dao.get_previews_batch(&context, &body.paths) }; match previews { Ok(clips) => { // Build a map of file_path -> VideoPreviewClip for quick lookup let clip_map: HashMap = clips .into_iter() .map(|clip| (clip.file_path.clone(), clip)) .collect(); let mut items: Vec = Vec::with_capacity(body.paths.len()); for path in &body.paths { if let Some(clip) = clip_map.get(path) { // Re-queue generation for stale pending/failed records if clip.status == "pending" || clip.status == "failed" { let full_path = format!( "{}/{}", app_state.base_path.trim_end_matches(['/', '\\']), path.trim_start_matches(['/', '\\']) ); app_state .preview_clip_generator .do_send(GeneratePreviewClipMessage { video_path: full_path, }); } items.push(PreviewStatusItem { path: path.clone(), status: clip.status.clone(), preview_url: if clip.status == "complete" { Some(format!("/video/preview?path={}", urlencoding::encode(path))) } else { None }, }); } else { // No record exists — insert as pending and trigger generation { let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao"); let _ = dao.insert_preview(&context, path, "pending"); } // Build full path for ffmpeg (actor needs the absolute path for input) let full_path = format!( "{}/{}", app_state.base_path.trim_end_matches(['/', '\\']), path.trim_start_matches(['/', '\\']) ); info!("Triggering preview generation for '{}'", path); app_state .preview_clip_generator .do_send(GeneratePreviewClipMessage { video_path: full_path, }); items.push(PreviewStatusItem { path: path.clone(), status: "pending".to_string(), preview_url: None, }); } } span.set_status(Status::Ok); HttpResponse::Ok().json(PreviewStatusResponse { previews: items }) } Err(_) => { span.set_status(Status::error("Database error")); HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"})) } } } #[cfg(test)] mod tests { use super::*; use crate::data::Claims; use crate::database::PreviewDao; use crate::testhelpers::TestPreviewDao; use actix_web::App; #[test] fn is_valid_hash_requires_64_ascii_hex() { assert!(is_valid_hash(&"a".repeat(64))); assert!(is_valid_hash(&"F".repeat(64))); assert!(is_valid_hash(&format!("ab{}", "0".repeat(62)))); assert!(!is_valid_hash(&"a".repeat(63))); assert!(!is_valid_hash(&"a".repeat(65))); // Anything outside the hex alphabet — including '/', '.', '..' — // is rejected up front so the path-traversal canonicalisation // never has to defend the boundary alone. assert!(!is_valid_hash(&format!("/{}", "a".repeat(63)))); assert!(!is_valid_hash(&format!("..{}", "a".repeat(62)))); assert!(!is_valid_hash(&"g".repeat(64))); } #[test] fn is_allowed_hls_filename_accepts_only_playlist_and_segments() { assert!(is_allowed_hls_filename("playlist.m3u8")); assert!(is_allowed_hls_filename("segment_000.ts")); assert!(is_allowed_hls_filename("segment_999.ts")); assert!(is_allowed_hls_filename("segment_0.ts")); // Internal artifacts the client should never request. assert!(!is_allowed_hls_filename("playlist.m3u8.tmp")); assert!(!is_allowed_hls_filename("playlist.unsupported")); // Traversal / path components — defence in depth alongside // the actix path matcher itself. assert!(!is_allowed_hls_filename("..")); assert!(!is_allowed_hls_filename("../etc/passwd")); assert!(!is_allowed_hls_filename("segment_abc.ts")); assert!(!is_allowed_hls_filename("segment_.ts")); assert!(!is_allowed_hls_filename("")); } fn make_token() -> String { let claims = Claims::valid_user("1".to_string()); jsonwebtoken::encode( &jsonwebtoken::Header::default(), &claims, &jsonwebtoken::EncodingKey::from_secret(b"test_key"), ) .unwrap() } fn make_preview_dao(dao: TestPreviewDao) -> Data>> { Data::new(Mutex::new(Box::new(dao) as Box)) } #[actix_rt::test] async fn test_get_preview_status_returns_pending_for_unknown() { let dao = TestPreviewDao::new(); let preview_dao = make_preview_dao(dao); let app_state = Data::new(AppState::test_state()); let token = make_token(); let app = actix_web::test::init_service( App::new() .service(get_preview_status) .app_data(app_state) .app_data(preview_dao.clone()), ) .await; let req = actix_web::test::TestRequest::post() .uri("/video/preview/status") .insert_header(("Authorization", format!("Bearer {}", token))) .set_json(serde_json::json!({"paths": ["photos/new_video.mp4"]})) .to_request(); let resp = actix_web::test::call_service(&app, req).await; assert_eq!(resp.status(), 200); let body: serde_json::Value = actix_web::test::read_body_json(resp).await; let previews = body["previews"].as_array().unwrap(); assert_eq!(previews.len(), 1); assert_eq!(previews[0]["status"], "pending"); // Verify the DAO now has a pending record let mut dao_lock = preview_dao.lock().unwrap(); let ctx = opentelemetry::Context::new(); let clip = dao_lock.get_preview(&ctx, "photos/new_video.mp4").unwrap(); assert!(clip.is_some()); assert_eq!(clip.unwrap().status, "pending"); } #[actix_rt::test] async fn test_get_preview_status_returns_complete_with_url() { let mut dao = TestPreviewDao::new(); let ctx = opentelemetry::Context::new(); dao.insert_preview(&ctx, "photos/done.mp4", "pending") .unwrap(); dao.update_status( &ctx, "photos/done.mp4", "complete", Some(9.5), Some(500000), None, ) .unwrap(); let preview_dao = make_preview_dao(dao); let app_state = Data::new(AppState::test_state()); let token = make_token(); let app = actix_web::test::init_service( App::new() .service(get_preview_status) .app_data(app_state) .app_data(preview_dao), ) .await; let req = actix_web::test::TestRequest::post() .uri("/video/preview/status") .insert_header(("Authorization", format!("Bearer {}", token))) .set_json(serde_json::json!({"paths": ["photos/done.mp4"]})) .to_request(); let resp = actix_web::test::call_service(&app, req).await; assert_eq!(resp.status(), 200); let body: serde_json::Value = actix_web::test::read_body_json(resp).await; let previews = body["previews"].as_array().unwrap(); assert_eq!(previews.len(), 1); assert_eq!(previews[0]["status"], "complete"); assert!( previews[0]["preview_url"] .as_str() .unwrap() .contains("photos%2Fdone.mp4") ); } #[actix_rt::test] async fn test_get_preview_status_rejects_over_200_paths() { let dao = TestPreviewDao::new(); let preview_dao = make_preview_dao(dao); let app_state = Data::new(AppState::test_state()); let token = make_token(); let app = actix_web::test::init_service( App::new() .service(get_preview_status) .app_data(app_state) .app_data(preview_dao), ) .await; let paths: Vec = (0..201).map(|i| format!("video_{}.mp4", i)).collect(); let req = actix_web::test::TestRequest::post() .uri("/video/preview/status") .insert_header(("Authorization", format!("Bearer {}", token))) .set_json(serde_json::json!({"paths": paths})) .to_request(); let resp = actix_web::test::call_service(&app, req).await; assert_eq!(resp.status(), 400); } #[actix_rt::test] async fn test_get_preview_status_mixed_statuses() { let mut dao = TestPreviewDao::new(); let ctx = opentelemetry::Context::new(); dao.insert_preview(&ctx, "a.mp4", "pending").unwrap(); dao.insert_preview(&ctx, "b.mp4", "pending").unwrap(); dao.update_status(&ctx, "b.mp4", "complete", Some(10.0), Some(100000), None) .unwrap(); let preview_dao = make_preview_dao(dao); let app_state = Data::new(AppState::test_state()); let token = make_token(); let app = actix_web::test::init_service( App::new() .service(get_preview_status) .app_data(app_state) .app_data(preview_dao), ) .await; let req = actix_web::test::TestRequest::post() .uri("/video/preview/status") .insert_header(("Authorization", format!("Bearer {}", token))) .set_json(serde_json::json!({"paths": ["a.mp4", "b.mp4", "c.mp4"]})) .to_request(); let resp = actix_web::test::call_service(&app, req).await; assert_eq!(resp.status(), 200); let body: serde_json::Value = actix_web::test::read_body_json(resp).await; let previews = body["previews"].as_array().unwrap(); assert_eq!(previews.len(), 3); // a.mp4 is pending assert_eq!(previews[0]["path"], "a.mp4"); assert_eq!(previews[0]["status"], "pending"); // b.mp4 is complete with URL assert_eq!(previews[1]["path"], "b.mp4"); assert_eq!(previews[1]["status"], "complete"); assert!(previews[1]["preview_url"].is_string()); // c.mp4 was not found — handler inserts pending assert_eq!(previews[2]["path"], "c.mp4"); assert_eq!(previews[2]["status"], "pending"); } /// Verifies that the status endpoint re-queues generation for stale /// "pending" and "failed" records (e.g., after a server restart or /// when clip files were deleted). The do_send to the actor exercises /// the re-queue code path; the actor runs against temp dirs so it /// won't panic. #[actix_rt::test] async fn test_get_preview_status_requeues_pending_and_failed() { let mut dao = TestPreviewDao::new(); let ctx = opentelemetry::Context::new(); // Simulate stale records left from a previous server run dao.insert_preview(&ctx, "stale/pending.mp4", "pending") .unwrap(); dao.insert_preview(&ctx, "stale/failed.mp4", "pending") .unwrap(); dao.update_status( &ctx, "stale/failed.mp4", "failed", None, None, Some("ffmpeg error"), ) .unwrap(); let preview_dao = make_preview_dao(dao); let app_state = Data::new(AppState::test_state()); let token = make_token(); let app = actix_web::test::init_service( App::new() .service(get_preview_status) .app_data(app_state) .app_data(preview_dao), ) .await; let req = actix_web::test::TestRequest::post() .uri("/video/preview/status") .insert_header(("Authorization", format!("Bearer {}", token))) .set_json(serde_json::json!({ "paths": ["stale/pending.mp4", "stale/failed.mp4"] })) .to_request(); let resp = actix_web::test::call_service(&app, req).await; assert_eq!(resp.status(), 200); let body: serde_json::Value = actix_web::test::read_body_json(resp).await; let previews = body["previews"].as_array().unwrap(); assert_eq!(previews.len(), 2); // Both records are returned with their current status assert_eq!(previews[0]["path"], "stale/pending.mp4"); assert_eq!(previews[0]["status"], "pending"); assert!(previews[0].get("preview_url").is_none()); assert_eq!(previews[1]["path"], "stale/failed.mp4"); assert_eq!(previews[1]["status"], "failed"); assert!(previews[1].get("preview_url").is_none()); } }