#[macro_use] extern crate diesel; extern crate rayon; use actix::Addr; use actix_web::web::Data; use actix_web_prom::PrometheusMetricsBuilder; use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations}; use futures::stream::StreamExt; use lazy_static::lazy_static; use prometheus::{self, IntGauge}; use std::error::Error; use std::sync::{Arc, Mutex}; use std::time::{Duration, SystemTime}; use std::{collections::HashMap, io::prelude::*}; use std::{env, fs::File}; use std::{ io::ErrorKind, path::{Path, PathBuf}, }; use walkdir::{DirEntry, WalkDir}; use actix_cors::Cors; use actix_files::NamedFile; use actix_governor::{Governor, GovernorConfigBuilder}; use actix_multipart as mp; use actix_web::{ App, HttpRequest, HttpResponse, HttpServer, Responder, delete, get, middleware, post, put, web::{self, BufMut, BytesMut}, }; use chrono::Utc; use diesel::sqlite::Sqlite; use rayon::prelude::*; use urlencoding::decode; use crate::ai::InsightGenerator; use crate::auth::login; use crate::data::*; use crate::database::models::InsertImageExif; use crate::database::*; use crate::files::{ RealFileSystem, RefreshThumbnailsMessage, is_image_or_video, is_valid_full_path, move_file, }; use crate::otel::{extract_context_from_request, global_tracer}; use crate::service::ServiceBuilder; use crate::state::AppState; use crate::tags::*; use crate::video::actors::{ GeneratePreviewClipMessage, ProcessMessage, QueueVideosMessage, ScanDirectoryMessage, VideoPlaylistManager, create_playlist, generate_video_thumbnail, }; use log::{debug, error, info, trace, warn}; use opentelemetry::trace::{Span, Status, TraceContextExt, Tracer}; use opentelemetry::{KeyValue, global}; mod ai; mod auth; mod data; mod database; mod error; mod exif; mod file_types; mod files; mod geo; mod state; mod tags; mod utils; mod video; mod memories; mod otel; mod service; #[cfg(test)] mod testhelpers; lazy_static! 
{
    // Gauge exported to Prometheus; set from the image count computed by `update_media_counts`.
    static ref IMAGE_GAUGE: IntGauge = IntGauge::new(
        "imageserver_image_total",
        "Count of the images on the server"
    )
    .unwrap();
    // Gauge exported to Prometheus; set from the video count computed by `update_media_counts`.
    static ref VIDEO_GAUGE: IntGauge = IntGauge::new(
        "imageserver_video_total",
        "Count of the videos on the server"
    )
    .unwrap();
}

/// Diesel migrations bundled through `embed_migrations!`; applied by `run_migrations`.
pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!();

/// `GET /image` — serves an image from the media library, or one of its thumbnails.
///
/// The requested path is checked with `is_valid_full_path` against the configured
/// base path; when that check fails the response is `400 Bad Request`.
///
/// When `size` is `Thumb` the handler tries, in order:
/// 1. a circular thumbnail built by `create_circular_thumbnail` (only if `shape` is `Circle`),
/// 2. the stored thumbnail (for videos with `format == Gif`, the `.gif` under `gif_path`),
/// 3. the original file.
///
/// If nothing can be opened the response is `404 Not Found`.
#[get("/image")]
async fn get_image(
    _claims: Claims,
    request: HttpRequest,
    req: web::Query,
    app_state: Data,
) -> impl Responder {
    let tracer = global_tracer();
    let context = extract_context_from_request(&request);
    let mut span = tracer.start_with_context("get_image", &context);

    if let Some(path) = is_valid_full_path(&app_state.base_path, &req.path, false) {
        // Full size is the default when the query does not specify one.
        let image_size = req.size.unwrap_or(PhotoSize::Full);
        if image_size == PhotoSize::Thumb {
            // Thumbnails mirror the library layout: <thumbnail_path>/<path relative to base_path>.
            let relative_path = path
                .strip_prefix(&app_state.base_path)
                .expect("Error stripping base path prefix from thumbnail");
            let thumbs = &app_state.thumbnail_path;
            let mut thumb_path = Path::new(&thumbs).join(relative_path);

            // If it's a video and GIF format is requested, try to serve GIF thumbnail
            if req.format == Some(ThumbnailFormat::Gif) && is_video_file(&path) {
                thumb_path = Path::new(&app_state.gif_path).join(relative_path);
                thumb_path.set_extension("gif");
            }

            // Handle circular thumbnail request
            if req.shape == Some(ThumbnailShape::Circle) {
                match create_circular_thumbnail(&thumb_path, thumbs).await {
                    Ok(circular_path) => {
                        // If the generated file cannot be opened we also fall through
                        // to the square thumbnail below.
                        if let Ok(file) = NamedFile::open(&circular_path) {
                            span.set_status(Status::Ok);
                            return file
                                .use_etag(true)
                                .use_last_modified(true)
                                .prefer_utf8(true)
                                .into_response(&request);
                        }
                    }
                    Err(e) => {
                        warn!("Failed to create circular thumbnail: {:?}", e);
                        // Fall through to serve square thumbnail
                    }
                }
            }

            trace!("Thumbnail path: {:?}", thumb_path);
            if let Ok(file) = NamedFile::open(&thumb_path) {
                span.set_status(Status::Ok);
                // Only the ETag / Last-Modified validators are enabled here.
                // NOTE(review): an earlier comment promised a "1 day cache", but no
                // Cache-Control header is set in this handler — confirm whether one is
                // added elsewhere (middleware) or is still missing.
                return file
                    .use_etag(true)
                    .use_last_modified(true)
                    .prefer_utf8(true)
                    .into_response(&request);
            }
            // No thumbnail could be opened: continue and serve the original file.
        }

        if let Ok(file) = NamedFile::open(&path) {
            span.set_status(Status::Ok);
            // Same validators as for thumbnails.
            // NOTE(review): the "1 hour cache" mentioned previously is not implemented here.
            return file
                .use_etag(true)
                .use_last_modified(true)
                .prefer_utf8(true)
                .into_response(&request);
        }

        span.set_status(Status::error("Not found"));
        HttpResponse::NotFound().finish()
    } else {
        span.set_status(Status::error("Bad photos request"));
        error!("Bad photos request: {}", req.path);
        HttpResponse::BadRequest().finish()
    }
}

/// Thin wrapper around `image_api::file_types::is_video_file` for plain paths.
fn is_video_file(path: &Path) -> bool {
    use image_api::file_types;
    file_types::is_video_file(path)
}

async fn create_circular_thumbnail(
    thumb_path: &Path,
    thumbs_dir: &str,
) -> Result> {
    use image::{GenericImageView, ImageBuffer, Rgba};

    // Create circular thumbnails directory
    let circular_dir = Path::new(thumbs_dir).join("_circular");

    // Get relative path from thumbs_dir to create same structure
    let relative_to_thumbs = thumb_path.strip_prefix(thumbs_dir)?;
    let circular_path = circular_dir.join(relative_to_thumbs).with_extension("png");

    // Check if circular thumbnail already exists
    if circular_path.exists() {
        return Ok(circular_path);
    }

    // Create parent directory if needed
    if let Some(parent) = circular_path.parent() {
        std::fs::create_dir_all(parent)?;
    }

    // Load the square thumbnail
    let img = image::open(thumb_path)?;
    let (width, height) = img.dimensions();

    // Fixed output size for consistency
    let output_size = 80u32;
    let radius = output_size as f32 / 2.0;

    // Calculate crop area to get square center of original image
    let crop_size = width.min(height);
    let crop_x = (width - crop_size) / 2;
    let crop_y = (height - crop_size) / 2;

    // Create a new RGBA image with transparency
    let output = ImageBuffer::from_fn(output_size, output_size, |x, y| {
        let dx = x as f32 - radius;
        let dy = y as f32 - radius;
        let distance = (dx * dx + dy * dy).sqrt();
        if distance <= radius {
            // Inside circle - map to cropped source area
            // Scale from output
coordinates to crop coordinates let scale = crop_size as f32 / output_size as f32; let src_x = crop_x + (x as f32 * scale) as u32; let src_y = crop_y + (y as f32 * scale) as u32; let pixel = img.get_pixel(src_x, src_y); Rgba([pixel[0], pixel[1], pixel[2], 255]) } else { // Outside circle - transparent Rgba([0, 0, 0, 0]) } }); // Save as PNG (supports transparency) output.save(&circular_path)?; Ok(circular_path) } #[get("/image/metadata")] async fn get_file_metadata( _: Claims, request: HttpRequest, path: web::Query, app_state: Data, exif_dao: Data>>, ) -> impl Responder { let tracer = global_tracer(); let context = extract_context_from_request(&request); let mut span = tracer.start_with_context("get_file_metadata", &context); let span_context = opentelemetry::Context::new().with_remote_span_context(span.span_context().clone()); let full_path = is_valid_full_path(&app_state.base_path, &path.path, false); match full_path .ok_or_else(|| ErrorKind::InvalidData.into()) .and_then(File::open) .and_then(|file| file.metadata()) { Ok(metadata) => { let mut response: MetadataResponse = metadata.into(); // Extract date from filename if possible response.filename_date = memories::extract_date_from_filename(&path.path).map(|dt| dt.timestamp()); // Query EXIF data if available if let Ok(mut dao) = exif_dao.lock() && let Ok(Some(exif)) = dao.get_exif(&span_context, &path.path) { response.exif = Some(exif.into()); } span.add_event( "Metadata fetched", vec![KeyValue::new("file", path.path.clone())], ); span.set_status(Status::Ok); HttpResponse::Ok().json(response) } Err(e) => { let message = format!("Error getting metadata for file '{}': {:?}", path.path, e); error!("{}", message); span.set_status(Status::error(message)); HttpResponse::InternalServerError().finish() } } } #[post("/image")] async fn upload_image( _: Claims, request: HttpRequest, mut payload: mp::Multipart, app_state: Data, exif_dao: Data>>, ) -> impl Responder { let tracer = global_tracer(); let context = 
extract_context_from_request(&request); let mut span = tracer.start_with_context("upload_image", &context); let span_context = opentelemetry::Context::new().with_remote_span_context(span.span_context().clone()); let mut file_content: BytesMut = BytesMut::new(); let mut file_name: Option = None; let mut file_path: Option = None; while let Some(Ok(mut part)) = payload.next().await { if let Some(content_type) = part.content_disposition() { debug!("{:?}", content_type); if let Some(filename) = content_type.get_filename() { debug!("Name (raw): {:?}", filename); // Decode URL-encoded filename (e.g., "file%20name.jpg" -> "file name.jpg") let decoded_filename = decode(filename) .map(|s| s.to_string()) .unwrap_or_else(|_| filename.to_string()); debug!("Name (decoded): {:?}", decoded_filename); file_name = Some(decoded_filename); while let Some(Ok(data)) = part.next().await { file_content.put(data); } } else if content_type.get_name() == Some("path") { while let Some(Ok(data)) = part.next().await { if let Ok(path) = std::str::from_utf8(&data) { file_path = Some(path.to_string()) } } } } } let path = file_path.unwrap_or_else(|| app_state.base_path.clone()); if !file_content.is_empty() { if file_name.is_none() { span.set_status(Status::error("No filename provided")); return HttpResponse::BadRequest().body("No filename provided"); } let full_path = PathBuf::from(&path).join(file_name.unwrap()); if let Some(full_path) = is_valid_full_path( &app_state.base_path, &full_path.to_str().unwrap().to_string(), true, ) { let context = opentelemetry::Context::new().with_remote_span_context(span.span_context().clone()); tracer .span_builder("file write") .start_with_context(&tracer, &context); let uploaded_path = if !full_path.is_file() && is_image_or_video(&full_path) { let mut file = File::create(&full_path).unwrap(); file.write_all(&file_content).unwrap(); info!("Uploaded: {:?}", full_path); full_path } else { warn!("File already exists: {:?}", full_path); let new_path = format!( 
"{}/{}_{}.{}", full_path.parent().unwrap().to_str().unwrap(), full_path.file_stem().unwrap().to_str().unwrap(), Utc::now().timestamp(), full_path .extension() .expect("Uploaded file should have an extension") .to_str() .unwrap() ); info!("Uploaded: {}", new_path); let new_path_buf = PathBuf::from(&new_path); let mut file = File::create(&new_path_buf).unwrap(); file.write_all(&file_content).unwrap(); new_path_buf }; // Extract and store EXIF data if file supports it if exif::supports_exif(&uploaded_path) { let relative_path = uploaded_path .strip_prefix(&app_state.base_path) .expect("Error stripping base path prefix") .to_str() .unwrap() .to_string(); match exif::extract_exif_from_path(&uploaded_path) { Ok(exif_data) => { let timestamp = Utc::now().timestamp(); let insert_exif = InsertImageExif { file_path: relative_path.clone(), camera_make: exif_data.camera_make, camera_model: exif_data.camera_model, lens_model: exif_data.lens_model, width: exif_data.width, height: exif_data.height, orientation: exif_data.orientation, gps_latitude: exif_data.gps_latitude.map(|v| v as f32), gps_longitude: exif_data.gps_longitude.map(|v| v as f32), gps_altitude: exif_data.gps_altitude.map(|v| v as f32), focal_length: exif_data.focal_length.map(|v| v as f32), aperture: exif_data.aperture.map(|v| v as f32), shutter_speed: exif_data.shutter_speed, iso: exif_data.iso, date_taken: exif_data.date_taken, created_time: timestamp, last_modified: timestamp, }; if let Ok(mut dao) = exif_dao.lock() { if let Err(e) = dao.store_exif(&span_context, insert_exif) { error!("Failed to store EXIF data for {}: {:?}", relative_path, e); } else { debug!("EXIF data stored for {}", relative_path); } } } Err(e) => { debug!( "No EXIF data or error extracting from {}: {:?}", uploaded_path.display(), e ); } } } } else { error!("Invalid path for upload: {:?}", full_path); span.set_status(Status::error("Invalid path for upload")); return HttpResponse::BadRequest().body("Path was not valid"); } } else { 
span.set_status(Status::error("No file body read"));
        return HttpResponse::BadRequest().body("No file body read");
    }

    app_state.stream_manager.do_send(RefreshThumbnailsMessage);
    span.set_status(Status::Ok);
    HttpResponse::Ok().finish()
}

/// `POST /video/generate` — starts HLS playlist generation for the video at `body.path`.
///
/// The playlist is written to `<video_path>/<file name of the video>.m3u8`; that
/// location is returned as the JSON body of the `200 OK` response.
///
/// Responds `400 Bad Request` when `body.path` has no file name component or is
/// rejected by `is_valid_full_path`.
#[post("/video/generate")]
async fn generate_video(
    _claims: Claims,
    request: HttpRequest,
    app_state: Data,
    body: web::Json,
) -> impl Responder {
    let tracer = global_tracer();
    let context = extract_context_from_request(&request);
    let mut span = tracer.start_with_context("generate_video", &context);

    let filename = PathBuf::from(&body.path);
    let Some(name) = filename.file_name() else {
        let message = format!("Unable to get file name: {:?}", filename);
        error!("{}", message);
        span.set_status(Status::error(message));
        return HttpResponse::BadRequest().finish();
    };
    let name = name.to_str().expect("Filename should convert to string");
    // The playlist name keeps the video's extension, e.g. `clip.mp4` -> `clip.mp4.m3u8`.
    let playlist = format!("{}/{}.m3u8", app_state.video_path, name);

    let Some(path) = is_valid_full_path(&app_state.base_path, &body.path, false) else {
        span.set_status(Status::error(format!("invalid path {:?}", &body.path)));
        return HttpResponse::BadRequest().finish();
    };

    // NOTE(review): an `Err` from `create_playlist` is ignored and the playlist path is
    // still returned with 200. Presumably this covers "playlist already exists" — confirm
    // against `create_playlist` before turning it into an error response.
    if let Ok(child) = create_playlist(path.to_str().unwrap(), &playlist).await {
        span.add_event(
            "playlist_created".to_string(),
            vec![KeyValue::new("playlist-name", name.to_string())],
        );
        span.set_status(Status::Ok);
        app_state
            .stream_manager
            .do_send(ProcessMessage(playlist.clone(), child));
    }

    HttpResponse::Ok().json(playlist)
}

/// `GET /video/stream` — serves the playlist file named by the `path` query parameter.
///
/// Only files that resolve (after symlink and `..` resolution) to a location inside
/// `app_state.video_path` are served:
/// * `404 Not Found` — the path does not exist or cannot be opened,
/// * `400 Bad Request` — the path resolves outside the playlist directory,
/// * `500 Internal Server Error` — the configured playlist directory itself cannot
///   be resolved.
#[get("/video/stream")]
async fn stream_video(
    request: HttpRequest,
    _: Claims,
    path: web::Query,
    app_state: Data,
) -> impl Responder {
    let tracer = global::tracer("image-server");
    let context = extract_context_from_request(&request);
    let mut span = tracer.start_with_context("stream_video", &context);

    let playlist = &path.path;
    debug!("Playlist: {}", playlist);

    // Extract video playlist dir to dotenv

    // Containment check on canonical paths. The previous check rejected a request only
    // when the path was outside `video_path` AND a valid library path, so a path outside
    // both was opened as-is; it also compared raw strings, which `..` segments defeat.
    let canonical_base = match std::fs::canonicalize(&app_state.video_path) {
        Ok(dir) => dir,
        Err(e) => {
            error!("Failed to canonicalize video path: {:?}", e);
            span.set_status(Status::error("Invalid video path configuration"));
            return HttpResponse::InternalServerError().finish();
        }
    };

    let canonical_playlist = match std::fs::canonicalize(playlist) {
        Ok(file) => file,
        Err(_) => {
            // canonicalize fails for non-existent paths; report it like a failed open.
            span.set_status(Status::error(format!("playlist not found {}", playlist)));
            return HttpResponse::NotFound().finish();
        }
    };

    if !canonical_playlist.starts_with(&canonical_base) {
        warn!("Playlist outside of video directory requested: {:?}", playlist);
        span.set_status(Status::error(format!("playlist not valid {}", playlist)));
        return HttpResponse::BadRequest().finish();
    }

    match NamedFile::open(&canonical_playlist) {
        Ok(file) => {
            span.set_status(Status::Ok);
            file.into_response(&request)
        }
        _ => {
            span.set_status(Status::error(format!("playlist not found {}", playlist)));
            HttpResponse::NotFound().finish()
        }
    }
}

#[get("/video/{path}")]
async fn get_video_part(
    request: HttpRequest,
    _: Claims,
    path: web::Path,
    app_state: Data,
) -> impl Responder {
    let tracer = global_tracer();
    let context = extract_context_from_request(&request);
    let mut span = tracer.start_with_context("get_video_part", &context);

    let part = &path.path;
    debug!("Video part: {}", part);

    let mut file_part = PathBuf::new();
    file_part.push(app_state.video_path.clone());
    file_part.push(part);

    // Guard against directory traversal attacks
    let canonical_base = match std::fs::canonicalize(&app_state.video_path) {
        Ok(path) => path,
        Err(e) => {
            error!("Failed to canonicalize video path: {:?}", e);
            span.set_status(Status::error("Invalid video path configuration"));
            return HttpResponse::InternalServerError().finish();
        }
    };

    let canonical_file = match std::fs::canonicalize(&file_part) {
        Ok(path) => path,
        Err(_) => {
            warn!("Video part not found or invalid: {:?}", file_part);
            span.set_status(Status::error(format!("Video part not found '{}'", part)));
            return HttpResponse::NotFound().finish();
        }
    };

    // Ensure the resolved path is still within the video directory
    if !canonical_file.starts_with(&canonical_base) {
        warn!("Directory traversal attempt detected: {:?}", part);
        span.set_status(Status::error("Invalid video path"));
        return HttpResponse::Forbidden().finish();
    }

    match NamedFile::open(&canonical_file) {
        Ok(file) => {
            span.set_status(Status::Ok);
            file.into_response(&request)
        }
        _ => {
            error!("Video part not found: {:?}", file_part);
            span.set_status(Status::error(format!(
                "Video part not found '{}'",
file_part.to_str().unwrap() ))); HttpResponse::NotFound().finish() } } } #[get("/video/preview")] async fn get_video_preview( _claims: Claims, request: HttpRequest, req: web::Query, app_state: Data, preview_dao: Data>>, ) -> impl Responder { let tracer = global_tracer(); let context = extract_context_from_request(&request); let mut span = tracer.start_with_context("get_video_preview", &context); // Validate path let full_path = match is_valid_full_path(&app_state.base_path, &req.path, true) { Some(path) => path, None => { span.set_status(Status::error("Invalid path")); return HttpResponse::BadRequest().json(serde_json::json!({"error": "Invalid path"})); } }; let full_path_str = full_path.to_string_lossy().to_string(); // Use relative path (from BASE_PATH) for DB storage, consistent with EXIF convention let relative_path = full_path_str .strip_prefix(&app_state.base_path) .unwrap_or(&full_path_str) .trim_start_matches(['/', '\\']) .to_string(); // Check preview status in DB let preview = { let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao"); dao.get_preview(&context, &relative_path) }; match preview { Ok(Some(clip)) => match clip.status.as_str() { "complete" => { let preview_path = PathBuf::from(&app_state.preview_clips_path) .join(&relative_path) .with_extension("mp4"); match NamedFile::open(&preview_path) { Ok(file) => { span.set_status(Status::Ok); file.into_response(&request) } Err(_) => { // File missing on disk but DB says complete - reset and regenerate let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao"); let _ = dao.update_status( &context, &relative_path, "pending", None, None, None, ); app_state .preview_clip_generator .do_send(GeneratePreviewClipMessage { video_path: full_path_str, }); span.set_status(Status::Ok); HttpResponse::Accepted().json(serde_json::json!({ "status": "processing", "path": req.path })) } } } "processing" => { span.set_status(Status::Ok); HttpResponse::Accepted().json(serde_json::json!({ "status": 
"processing", "path": req.path })) } "failed" => { let error_msg = clip .error_message .unwrap_or_else(|| "Unknown error".to_string()); span.set_status(Status::error(format!("Generation failed: {}", error_msg))); HttpResponse::InternalServerError().json(serde_json::json!({ "error": format!("Generation failed: {}", error_msg) })) } _ => { // pending or unknown status - trigger generation app_state .preview_clip_generator .do_send(GeneratePreviewClipMessage { video_path: full_path_str, }); span.set_status(Status::Ok); HttpResponse::Accepted().json(serde_json::json!({ "status": "processing", "path": req.path })) } }, Ok(None) => { // No record exists - insert as pending and trigger generation { let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao"); let _ = dao.insert_preview(&context, &relative_path, "pending"); } app_state .preview_clip_generator .do_send(GeneratePreviewClipMessage { video_path: full_path_str, }); span.set_status(Status::Ok); HttpResponse::Accepted().json(serde_json::json!({ "status": "processing", "path": req.path })) } Err(_) => { span.set_status(Status::error("Database error")); HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"})) } } } #[post("/video/preview/status")] async fn get_preview_status( _claims: Claims, request: HttpRequest, body: web::Json, app_state: Data, preview_dao: Data>>, ) -> impl Responder { let tracer = global_tracer(); let context = extract_context_from_request(&request); let mut span = tracer.start_with_context("get_preview_status", &context); // Limit to 200 paths per request if body.paths.len() > 200 { span.set_status(Status::error("Too many paths")); return HttpResponse::BadRequest() .json(serde_json::json!({"error": "Maximum 200 paths per request"})); } let previews = { let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao"); dao.get_previews_batch(&context, &body.paths) }; match previews { Ok(clips) => { // Build a map of file_path -> VideoPreviewClip for 
quick lookup let clip_map: HashMap = clips .into_iter() .map(|clip| (clip.file_path.clone(), clip)) .collect(); let mut items: Vec = Vec::with_capacity(body.paths.len()); for path in &body.paths { if let Some(clip) = clip_map.get(path) { // Re-queue generation for stale pending/failed records if clip.status == "pending" || clip.status == "failed" { let full_path = format!( "{}/{}", app_state.base_path.trim_end_matches(['/', '\\']), path.trim_start_matches(['/', '\\']) ); app_state .preview_clip_generator .do_send(GeneratePreviewClipMessage { video_path: full_path, }); } items.push(PreviewStatusItem { path: path.clone(), status: clip.status.clone(), preview_url: if clip.status == "complete" { Some(format!("/video/preview?path={}", urlencoding::encode(path))) } else { None }, }); } else { // No record exists — insert as pending and trigger generation { let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao"); let _ = dao.insert_preview(&context, path, "pending"); } // Build full path for ffmpeg (actor needs the absolute path for input) let full_path = format!( "{}/{}", app_state.base_path.trim_end_matches(['/', '\\']), path.trim_start_matches(['/', '\\']) ); info!("Triggering preview generation for '{}'", path); app_state .preview_clip_generator .do_send(GeneratePreviewClipMessage { video_path: full_path, }); items.push(PreviewStatusItem { path: path.clone(), status: "pending".to_string(), preview_url: None, }); } } span.set_status(Status::Ok); HttpResponse::Ok().json(PreviewStatusResponse { previews: items }) } Err(_) => { span.set_status(Status::error("Database error")); HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"})) } } } #[get("image/favorites")] async fn favorites( claims: Claims, request: HttpRequest, favorites_dao: Data>>, ) -> impl Responder { let tracer = global_tracer(); let context = extract_context_from_request(&request); let mut span = tracer.start_with_context("get favorites", &context); match 
web::block(move || {
        favorites_dao
            .lock()
            .expect("Unable to get FavoritesDao")
            .get_favorites(claims.sub.parse::().unwrap())
    })
    .await
    {
        Ok(Ok(favorites)) => {
            let favorites = favorites
                .into_iter()
                .map(|favorite| favorite.path)
                .collect::>();
            span.set_status(Status::Ok);
            HttpResponse::Ok().json(PhotosResponse {
                photos: favorites,
                dirs: Vec::new(),
                total_count: None,
                has_more: None,
                next_offset: None,
            })
        }
        Ok(Err(e)) => {
            span.set_status(Status::error(format!("Error getting favorites: {:?}", e)));
            error!("Error getting favorites: {:?}", e);
            HttpResponse::InternalServerError().finish()
        }
        Err(_) => HttpResponse::InternalServerError().finish(),
    }
}

/// `PUT image/favorites` — adds `body.path` to the favorites of the authenticated user.
///
/// * `201 Created` — the favorite was inserted,
/// * `200 OK` — the DAO reported `DbErrorKind::AlreadyExists`,
/// * `400 Bad Request` — any other DAO error, or the `sub` claim is not a numeric user id,
/// * `500 Internal Server Error` — the blocking task itself failed.
#[put("image/favorites")]
async fn put_add_favorite(
    claims: Claims,
    body: web::Json,
    favorites_dao: Data>>,
) -> impl Responder {
    if let Ok(user_id) = claims.sub.parse::() {
        let path = body.path.clone();
        // The DAO call is synchronous, so it runs on the blocking thread pool.
        match web::block::<_, Result>(move || {
            favorites_dao
                .lock()
                .expect("Unable to get FavoritesDao")
                .add_favorite(user_id, &path)
        })
        .await
        {
            Ok(Err(e)) if e.kind == DbErrorKind::AlreadyExists => {
                warn!("Favorite: {} exists for user: {}", &body.path, user_id);
                HttpResponse::Ok()
            }
            Ok(Err(e)) => {
                error!("{:?} {}. for user: {}", e, body.path, user_id);
                HttpResponse::BadRequest()
            }
            Ok(Ok(_)) => {
                info!("Adding favorite \"{}\" for userid: {}", body.path, user_id);
                HttpResponse::Created()
            }
            Err(e) => {
                error!("Blocking error while inserting favorite: {:?}", e);
                HttpResponse::InternalServerError()
            }
        }
    } else {
        error!("Unable to parse sub as i32: {}", claims.sub);
        HttpResponse::BadRequest()
    }
}

/// `DELETE image/favorites` — removes `body.path` from the favorites of the
/// authenticated user.
///
/// * `200 OK` — the removal task ran,
/// * `400 Bad Request` — the `sub` claim is not a numeric user id,
/// * `500 Internal Server Error` — the blocking task failed (previously this
///   panicked the handler through `unwrap`).
#[delete("image/favorites")]
async fn delete_favorite(
    claims: Claims,
    body: web::Query,
    favorites_dao: Data>>,
) -> impl Responder {
    let Ok(user_id) = claims.sub.parse::<i32>() else {
        error!("Unable to parse sub as i32: {}", claims.sub);
        return HttpResponse::BadRequest();
    };

    let path = body.path.clone();
    let removal = web::block(move || {
        // NOTE(review): whatever `remove_favorite` returns is discarded here, as before.
        favorites_dao
            .lock()
            .expect("Unable to get favorites dao")
            .remove_favorite(user_id, path);
    })
    .await;

    match removal {
        Ok(()) => {
            info!(
                "Removing favorite \"{}\" for userid: {}",
                body.path, user_id
            );
            HttpResponse::Ok()
        }
        Err(e) => {
            error!("Blocking error while removing favorite: {:?}", e);
            HttpResponse::InternalServerError()
        }
    }
}

/// Walks `BASE_PATH` and makes sure every media file has a thumbnail under `THUMBNAILS`.
///
/// Files are processed in parallel (rayon):
/// * videos are always handed to `generate_video_thumbnail`,
/// * images get a 200px-wide thumbnail unless a file already exists at the target path.
///
/// Failures on individual files (unreadable image, directory or save error) are logged
/// and skipped so that one bad file does not abort the scan. Finishes by refreshing the
/// media gauges through `update_media_counts`.
///
/// # Panics
/// Panics if `THUMBNAILS` or `BASE_PATH` is not defined.
fn create_thumbnails() {
    /// Creates the directory that will contain `path`, if `path` has a parent.
    fn ensure_parent_dir(path: &Path) -> std::io::Result<()> {
        match path.parent() {
            Some(parent) => std::fs::create_dir_all(parent),
            None => Ok(()),
        }
    }

    let tracer = global_tracer();
    let span = tracer.start("creating thumbnails");

    let thumbs = &dotenv::var("THUMBNAILS").expect("THUMBNAILS not defined");
    let thumbnail_directory: &Path = Path::new(thumbs);
    let images = PathBuf::from(dotenv::var("BASE_PATH").expect("BASE_PATH not defined"));

    WalkDir::new(&images)
        .into_iter()
        .collect::<Vec<Result<_, _>>>()
        .into_par_iter()
        .filter_map(|entry| entry.ok())
        .filter(|entry| entry.file_type().is_file())
        .for_each(|entry| {
            // Thumbnails mirror the library layout below the thumbnail directory.
            let relative_path = entry
                .path()
                .strip_prefix(&images)
                .expect("walked entries are located under BASE_PATH");
            let thumb_path = thumbnail_directory.join(relative_path);

            if is_video(&entry) {
                if let Err(e) = ensure_parent_dir(&thumb_path) {
                    error!("Error creating directory for {:?}: {}", thumb_path, e);
                    return;
                }

                let mut video_span = tracer.start_with_context(
                    "generate_video_thumbnail",
                    &opentelemetry::Context::new()
                        .with_remote_span_context(span.span_context().clone()),
                );
                video_span.set_attributes(vec![
                    KeyValue::new("type", "video"),
                    KeyValue::new("file-name", thumb_path.display().to_string()),
                ]);
                debug!("Generating video thumbnail: {:?}", thumb_path);
                generate_video_thumbnail(entry.path(), &thumb_path);
                video_span.end();
            } else if is_image(&entry) && !thumb_path.exists() {
                let image = match image::open(entry.path()) {
                    Ok(image) => image,
                    Err(e) => {
                        error!("Unable to open image: {:?}. {}", entry.path(), e);
                        return;
                    }
                };
                // Width is capped at 200px; the height follows from the aspect ratio.
                let thumbnail = image.thumbnail(200, u32::MAX);

                if let Err(e) = ensure_parent_dir(&thumb_path) {
                    error!(
                        "There was an issue creating directory for {:?}: {}",
                        thumb_path, e
                    );
                    return;
                }
                info!("Saving thumbnail: {:?}", thumb_path);
                if let Err(e) = thumbnail.save(&thumb_path) {
                    error!("Failure saving thumbnail {:?}: {}", thumb_path, e);
                }
            }
        });

    debug!("Finished making thumbnails");
    update_media_counts(&images);
}

/// Counts the images and videos below `media_dir` and publishes the totals through
/// `IMAGE_GAUGE` and `VIDEO_GAUGE`. Entries that cannot be read are ignored.
fn update_media_counts(media_dir: &Path) {
    let mut image_count = 0;
    let mut video_count = 0;

    for entry in WalkDir::new(media_dir).into_iter().filter_map(|e| e.ok()) {
        if is_image(&entry) {
            image_count += 1;
        } else if is_video(&entry) {
            video_count += 1;
        }
    }

    IMAGE_GAUGE.set(image_count);
    VIDEO_GAUGE.set(video_count);
}

/// Thin wrapper around `image_api::file_types::direntry_is_image`.
fn is_image(entry: &DirEntry) -> bool {
    use image_api::file_types;
    file_types::direntry_is_image(entry)
}

/// Thin wrapper around `image_api::file_types::direntry_is_video`.
fn is_video(entry: &DirEntry) -> bool {
    use image_api::file_types;
    file_types::direntry_is_video(entry)
}

fn main() -> std::io::Result<()> {
    if let Err(err) = dotenv::dotenv() {
        println!("Error parsing .env {:?}", err);
    }

    run_migrations(&mut connect()).expect("Failed to run migrations");

    let system = actix::System::new();
    system.block_on(async {
        // Just use basic logger when running a non-release build
        #[cfg(debug_assertions)]
        {
            env_logger::init();
        }
        #[cfg(not(debug_assertions))]
        {
otel::init_logs(); otel::init_tracing(); } create_thumbnails(); // generate_video_gifs().await; let app_data = Data::new(AppState::default()); let labels = HashMap::new(); let prometheus = PrometheusMetricsBuilder::new("api") .const_labels(labels) .build() .expect("Unable to build prometheus metrics middleware"); prometheus .registry .register(Box::new(IMAGE_GAUGE.clone())) .unwrap(); prometheus .registry .register(Box::new(VIDEO_GAUGE.clone())) .unwrap(); let app_state = app_data.clone(); app_state.playlist_manager.do_send(ScanDirectoryMessage { directory: app_state.base_path.clone(), }); // Start file watcher with playlist manager and preview generator let playlist_mgr_for_watcher = app_state.playlist_manager.as_ref().clone(); let preview_gen_for_watcher = app_state.preview_clip_generator.as_ref().clone(); watch_files(playlist_mgr_for_watcher, preview_gen_for_watcher); // Start orphaned playlist cleanup job cleanup_orphaned_playlists(); // Spawn background job to generate daily conversation summaries { use crate::ai::generate_daily_summaries; use crate::database::{DailySummaryDao, SqliteDailySummaryDao}; use chrono::NaiveDate; // Configure date range for summary generation // Default: August 2024 ±30 days (July 1 - September 30, 2024) // To expand: change start_date and end_date let start_date = Some(NaiveDate::from_ymd_opt(2015, 10, 1).unwrap()); let end_date = Some(NaiveDate::from_ymd_opt(2020, 1, 1).unwrap()); // let contacts_to_summarize = vec!["Domenique", "Zach", "Paul"]; // Add more contacts as needed let contacts_to_summarize = vec![]; // Add more contacts as needed let ollama = app_state.ollama.clone(); let sms_client = app_state.sms_client.clone(); for contact in contacts_to_summarize { let ollama_clone = ollama.clone(); let sms_client_clone = sms_client.clone(); let summary_dao: Arc>> = Arc::new(Mutex::new(Box::new(SqliteDailySummaryDao::new()))); let start = start_date; let end = end_date; tokio::spawn(async move { info!("Starting daily summary 
generation for {}", contact); if let Err(e) = generate_daily_summaries( contact, start, end, &ollama_clone, &sms_client_clone, summary_dao, ) .await { error!("Daily summary generation failed for {}: {:?}", contact, e); } else { info!("Daily summary generation completed for {}", contact); } }); } } HttpServer::new(move || { let user_dao = SqliteUserDao::new(); let favorites_dao = SqliteFavoriteDao::new(); let tag_dao = SqliteTagDao::default(); let exif_dao = SqliteExifDao::new(); let insight_dao = SqliteInsightDao::new(); let preview_dao = SqlitePreviewDao::new(); let cors = Cors::default() .allowed_origin_fn(|origin, _req_head| { // Allow all origins in development, or check against CORS_ALLOWED_ORIGINS env var if let Ok(allowed_origins) = env::var("CORS_ALLOWED_ORIGINS") { allowed_origins .split(',') .any(|allowed| origin.as_bytes() == allowed.trim().as_bytes()) } else { // Default: allow all origins if not configured true } }) .allowed_methods(vec!["GET", "POST", "PUT", "DELETE", "OPTIONS"]) .allowed_headers(vec![ actix_web::http::header::AUTHORIZATION, actix_web::http::header::ACCEPT, actix_web::http::header::CONTENT_TYPE, ]) .supports_credentials() .max_age(3600); // Configure rate limiting for login endpoint (2 requests/sec, burst of 5) let governor_conf = GovernorConfigBuilder::default() .per_second(2) .burst_size(5) .finish() .unwrap(); App::new() .wrap(middleware::Logger::default()) .wrap(cors) .service( web::resource("/login") .wrap(Governor::new(&governor_conf)) .route(web::post().to(login::)), ) .service( web::resource("/photos") .route(web::get().to(files::list_photos::)), ) .service( web::resource("/photos/gps-summary") .route(web::get().to(files::get_gps_summary)), ) .service(web::resource("/file/move").post(move_file::)) .service(get_image) .service(upload_image) .service(generate_video) .service(stream_video) .service(get_video_preview) .service(get_preview_status) .service(get_video_part) .service(favorites) .service(put_add_favorite) 
.service(delete_favorite) .service(get_file_metadata) .service(memories::list_memories) .service(ai::generate_insight_handler) .service(ai::generate_agentic_insight_handler) .service(ai::get_insight_handler) .service(ai::delete_insight_handler) .service(ai::get_all_insights_handler) .service(ai::get_available_models_handler) .add_feature(add_tag_services::<_, SqliteTagDao>) .app_data(app_data.clone()) .app_data::>(Data::new(RealFileSystem::new( app_data.base_path.clone(), ))) .app_data::>>(Data::new(Mutex::new(user_dao))) .app_data::>>>(Data::new(Mutex::new(Box::new( favorites_dao, )))) .app_data::>>(Data::new(Mutex::new(tag_dao))) .app_data::>>>(Data::new(Mutex::new(Box::new( exif_dao, )))) .app_data::>>>(Data::new(Mutex::new(Box::new( insight_dao, )))) .app_data::>>>(Data::new(Mutex::new(Box::new( preview_dao, )))) .app_data(web::JsonConfig::default().error_handler(|err, req| { let detail = err.to_string(); log::warn!( "JSON parse error on {} {}: {}", req.method(), req.uri(), detail ); let response = HttpResponse::BadRequest().json(serde_json::json!({"error": detail})); actix_web::error::InternalError::from_response(err, response).into() })) .app_data::>(Data::new(app_data.insight_generator.clone())) .wrap(prometheus.clone()) }) .bind(dotenv::var("BIND_URL").unwrap())? .bind("localhost:8088")? 
.run()
        .await
    })
}

/// Applies every pending embedded Diesel migration in `MIGRATIONS` to `connection`.
///
/// # Errors
/// Propagates the error returned by `run_pending_migrations` when a migration fails.
fn run_migrations(
    connection: &mut impl MigrationHarness<Sqlite>,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
    connection.run_pending_migrations(MIGRATIONS)?;
    Ok(())
}

/// Collects the names of all video files below `base_path`.
///
/// Each video contributes both its full file name (`clip.mp4`) and its stem (`clip`), so a
/// playlist can be matched whichever of the two it was named after.
fn collect_source_video_names(base_path: &str) -> std::collections::HashSet<std::ffi::OsString> {
    let mut names = std::collections::HashSet::new();
    for entry in WalkDir::new(base_path)
        .into_iter()
        .filter_map(|e| e.ok())
        .filter(|e| e.file_type().is_file())
    {
        let path = entry.path();
        if !is_video_file(path) {
            continue;
        }
        names.insert(entry.file_name().to_os_string());
        if let Some(stem) = path.file_stem() {
            names.insert(stem.to_os_string());
        }
    }
    names
}

/// Deletes the `.ts` files next to `playlist_path` whose stem starts with `segment_prefix`.
///
/// Failures are only logged at debug level; segment removal is best-effort.
// NOTE(review): prefix matching also hits segments of any other playlist whose name begins
// with `segment_prefix`; tighten once the exact segment naming scheme is confirmed.
fn remove_playlist_segments(playlist_path: &Path, segment_prefix: &str) {
    let Some(parent_dir) = playlist_path.parent() else {
        return;
    };

    for entry in WalkDir::new(parent_dir)
        .max_depth(1)
        .into_iter()
        .filter_map(|e| e.ok())
        .filter(|e| e.file_type().is_file())
    {
        let entry_path = entry.path();
        let is_segment = entry_path
            .extension()
            .is_some_and(|ext| ext.eq_ignore_ascii_case("ts"));
        if !is_segment {
            continue;
        }
        let Some(ts_stem) = entry_path.file_stem() else {
            continue;
        };
        if !ts_stem.to_string_lossy().starts_with(segment_prefix) {
            continue;
        }

        match std::fs::remove_file(entry_path) {
            Ok(()) => debug!("Deleted segment: {}", entry_path.display()),
            Err(e) => debug!(
                "Failed to delete segment {}: {}",
                entry_path.display(),
                e
            ),
        }
    }
}

/// Clean up orphaned HLS playlists and segments whose source videos no longer exist.
///
/// Spawns a background thread that, every `PLAYLIST_CLEANUP_INTERVAL_SECONDS` seconds
/// (default: 86400), scans `VIDEO_PATH` for `.m3u8` files and deletes each playlist for which
/// no video file of the same name exists anywhere below `BASE_PATH`, together with the
/// matching `.ts` segments in the playlist's directory.
///
/// # Panics
/// The spawned thread panics if `VIDEO_PATH` or `BASE_PATH` is not set.
fn cleanup_orphaned_playlists() {
    std::thread::spawn(|| {
        let video_path = dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set");
        let base_path = dotenv::var("BASE_PATH").expect("BASE_PATH must be set");

        // Get cleanup interval from environment (default: 24 hours)
        let cleanup_interval_secs = dotenv::var("PLAYLIST_CLEANUP_INTERVAL_SECONDS")
            .ok()
            .and_then(|s| s.parse::<u64>().ok())
            .unwrap_or(86400);

        info!("Starting orphaned playlist cleanup job");
        info!(" Cleanup interval: {} seconds", cleanup_interval_secs);
        info!(" Playlist directory: {}", video_path);

        loop {
            std::thread::sleep(Duration::from_secs(cleanup_interval_secs));

            // If the media root is unavailable, every playlist would look orphaned; skip the
            // pass instead of wiping all generated output.
            if !Path::new(&base_path).is_dir() {
                warn!(
                    "BASE_PATH {} is not an accessible directory, skipping orphaned playlist cleanup",
                    base_path
                );
                continue;
            }

            info!("Running orphaned playlist cleanup");
            let start = std::time::Instant::now();
            let mut deleted_count = 0;
            let mut error_count = 0;

            // Find all .m3u8 files in VIDEO_PATH
            let playlists: Vec<PathBuf> = WalkDir::new(&video_path)
                .into_iter()
                .filter_map(|e| e.ok())
                .filter(|e| e.file_type().is_file())
                .filter(|e| {
                    e.path()
                        .extension()
                        .and_then(|s| s.to_str())
                        .map(|ext| ext.eq_ignore_ascii_case("m3u8"))
                        .unwrap_or(false)
                })
                .map(|e| e.path().to_path_buf())
                .collect();

            info!("Found {} playlist files to check", playlists.len());

            // Index the source tree once per pass rather than once per playlist.
            let source_names = if playlists.is_empty() {
                std::collections::HashSet::new()
            } else {
                collect_source_video_names(&base_path)
            };

            for playlist_path in playlists {
                // Playlist format: {VIDEO_PATH}/{original_filename}.m3u8, so the playlist's
                // stem is the source video's file name *including* its extension.
                let Some(filename) = playlist_path.file_stem() else {
                    continue;
                };
                if source_names.contains(filename) {
                    continue;
                }

                debug!(
                    "Source video for playlist {} no longer exists, deleting",
                    playlist_path.display()
                );

                match std::fs::remove_file(&playlist_path) {
                    Err(e) => {
                        warn!(
                            "Failed to delete playlist {}: {}",
                            playlist_path.display(),
                            e
                        );
                        error_count += 1;
                    }
                    Ok(()) => {
                        deleted_count += 1;
                        // Segments are only removed once the playlist itself is gone.
                        remove_playlist_segments(&playlist_path, &filename.to_string_lossy());
                    }
                }
            }

            info!(
                "Orphaned playlist cleanup completed in {:?}: deleted {} playlists, {} errors",
                start.elapsed(),
                deleted_count,
                error_count
            );
        }
    });
}

/// Spawns the background thread that polls `BASE_PATH` for new or changed media.
///
/// The thread wakes every `WATCH_QUICK_INTERVAL_SECONDS` seconds (default: 60). On each wake
/// it runs either a quick scan, limited to files modified since the previous scan minus a
/// 10 second buffer, or, once `WATCH_FULL_INTERVAL_SECONDS` seconds (default: 3600) have
/// passed since the last full scan, a full scan of every file. Media counts are refreshed
/// after every scan.
///
/// # Panics
/// The spawned thread panics if `BASE_PATH` is not set.
fn watch_files(
    playlist_manager: Addr<VideoPlaylistManager>,
    // NOTE(review): the actor type name below is reconstructed, not visible in this part of
    // the file; confirm it against `crate::video::actors`.
    preview_generator: Addr<video::actors::PreviewClipGenerator>,
) {
    std::thread::spawn(move || {
        let base_str = dotenv::var("BASE_PATH").expect("BASE_PATH must be set");
        let base_path = PathBuf::from(&base_str);

        // Quick scan: check recently modified files (default: 60 seconds)
        let quick_interval_secs = dotenv::var("WATCH_QUICK_INTERVAL_SECONDS")
            .ok()
            .and_then(|s| s.parse::<u64>().ok())
            .unwrap_or(60);

        // Full scan: check all files regardless of modification time (default: 1 hour)
        let full_interval_secs = dotenv::var("WATCH_FULL_INTERVAL_SECONDS")
            .ok()
            .and_then(|s| s.parse::<u64>().ok())
            .unwrap_or(3600);

        info!("Starting optimized file watcher");
        info!(" Quick scan interval: {} seconds", quick_interval_secs);
        info!(" Full scan interval: {} seconds", full_interval_secs);
        info!(" Watching directory: {}", base_str);

        // DAOs used to track which files have already been processed
        let exif_dao = Arc::new(Mutex::new(
            Box::new(SqliteExifDao::new()) as Box<dyn ExifDao>
        ));
        let preview_dao = Arc::new(Mutex::new(
            Box::new(SqlitePreviewDao::new()) as Box<dyn PreviewDao>
        ));

        let mut last_quick_scan = SystemTime::now();
        let mut last_full_scan = SystemTime::now();
        let mut scan_count = 0u64;

        loop {
            std::thread::sleep(Duration::from_secs(quick_interval_secs));

            let now = SystemTime::now();
            // A clock that jumped backwards yields an error; treat it as "no time elapsed".
            let since_last_full = now
                .duration_since(last_full_scan)
                .unwrap_or(Duration::from_secs(0));
            let is_full_scan = since_last_full.as_secs() >= full_interval_secs;

            if is_full_scan {
                info!("Running full scan (scan #{})", scan_count);
                process_new_files(
                    &base_path,
                    Arc::clone(&exif_dao),
                    Arc::clone(&preview_dao),
                    None,
                    playlist_manager.clone(),
                    preview_generator.clone(),
                );
                last_full_scan = now;
            } else {
                debug!(
                    "Running quick scan (checking files modified in last {} seconds)",
                    quick_interval_secs + 10
                );
                // Check files modified since last quick scan, plus 10 second buffer
                let check_since = last_quick_scan
                    .checked_sub(Duration::from_secs(10))
                    .unwrap_or(last_quick_scan);
                process_new_files(
                    &base_path,
                    Arc::clone(&exif_dao),
                    Arc::clone(&preview_dao),
                    Some(check_since),
                    playlist_manager.clone(),
                    preview_generator.clone(),
                );
            }

            last_quick_scan = now;
            scan_count += 1;

            // Update media counts
            update_media_counts(&base_path);
        }
    });
}

/// Check if a playlist needs to be (re)generated.
///
/// Returns `true` if:
/// - the playlist doesn't exist, OR
/// - the source video is newer than the playlist, OR
/// - either modification time cannot be read.
fn playlist_needs_generation(video_path: &Path, playlist_path: &Path) -> bool {
    if !playlist_path.exists() {
        return true;
    }

    let modified_at = |path: &Path| std::fs::metadata(path).and_then(|meta| meta.modified());

    match (modified_at(video_path), modified_at(playlist_path)) {
        (Ok(video_modified), Ok(playlist_modified)) => video_modified > playlist_modified,
        // If we can't determine, assume it needs generation
        _ => true,
    }
}

/// Scans `base_path` for images and videos and brings their derived data up to date.
///
/// For the files found (optionally only those modified at or after `modified_since`) this:
/// 1. stores EXIF rows for EXIF-capable files that have none,
/// 2. queues HLS playlist generation on `playlist_manager` for videos whose playlist is
///    missing or older than the video,
/// 3. queues preview clip generation on `preview_generator` for videos with no preview
///    record, or with a record in status `"failed"` or `"pending"`,
/// 4. calls `create_thumbnails()` if any file lacked a thumbnail or EXIF row.
///
/// # Panics
/// Panics if `THUMBNAILS` or `VIDEO_PATH` is not set, or if a DAO mutex is poisoned.
fn process_new_files(
    base_path: &Path,
    exif_dao: Arc<Mutex<Box<dyn ExifDao>>>,
    preview_dao: Arc<Mutex<Box<dyn PreviewDao>>>,
    modified_since: Option<SystemTime>,
    playlist_manager: Addr<VideoPlaylistManager>,
    // NOTE(review): actor type name reconstructed; confirm against `crate::video::actors`.
    preview_generator: Addr<video::actors::PreviewClipGenerator>,
) {
    let context = opentelemetry::Context::new();
    let thumbs = dotenv::var("THUMBNAILS").expect("THUMBNAILS not defined");
    let thumbnail_directory = Path::new(&thumbs);

    // Collect all image and video files, optionally filtered by modification time.
    // The media-type check runs first so metadata is only read for candidate files.
    let files: Vec<(PathBuf, String)> = WalkDir::new(base_path)
        .into_iter()
        .filter_map(|entry| entry.ok())
        .filter(|entry| entry.file_type().is_file())
        .filter(|entry| is_image(entry) || is_video(entry))
        .filter(|entry| {
            let Some(since) = modified_since else {
                return true;
            };
            match entry.metadata().ok().and_then(|meta| meta.modified().ok()) {
                Some(modified) => modified >= since,
                // If we can't get metadata, include the file to be safe
                None => true,
            }
        })
        .filter_map(|entry| {
            let file_path = entry.path().to_path_buf();
            let relative_path = file_path
                .strip_prefix(base_path)
                .ok()?
                .to_str()?
                .to_string();
            Some((file_path, relative_path))
        })
        .collect();

    if files.is_empty() {
        debug!("No files to process");
        return;
    }

    debug!("Found {} files to check", files.len());

    // Batch query: get all EXIF data for these files in one query
    let file_paths: Vec<String> = files.iter().map(|(_, rel_path)| rel_path.clone()).collect();

    let existing_exif_paths: std::collections::HashSet<String> = {
        let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
        match dao.get_exif_batch(&context, &file_paths) {
            Ok(exif_records) => exif_records
                .into_iter()
                .map(|record| record.file_path)
                .collect(),
            Err(e) => {
                error!("Error batch querying EXIF data: {:?}", e);
                std::collections::HashSet::new()
            }
        }
    };

    let mut new_files_found = false;
    let mut files_needing_exif = Vec::new();

    // Check each file for missing thumbnail or EXIF data
    for (file_path, relative_path) in &files {
        let needs_thumbnail = !thumbnail_directory.join(relative_path).exists();
        let needs_exif =
            exif::supports_exif(file_path) && !existing_exif_paths.contains(relative_path);

        if !(needs_thumbnail || needs_exif) {
            continue;
        }

        new_files_found = true;
        if needs_thumbnail {
            info!("New file detected (missing thumbnail): {}", relative_path);
        }
        if needs_exif {
            files_needing_exif.push((file_path.clone(), relative_path.clone()));
        }
    }

    // Process EXIF data for files that need it
    if !files_needing_exif.is_empty() {
        info!(
            "Processing EXIF data for {} files",
            files_needing_exif.len()
        );

        for (file_path, relative_path) in files_needing_exif {
            match exif::extract_exif_from_path(&file_path) {
                Ok(exif_data) => {
                    let timestamp = Utc::now().timestamp();
                    let insert_exif = InsertImageExif {
                        file_path: relative_path.clone(),
                        camera_make: exif_data.camera_make,
                        camera_model: exif_data.camera_model,
                        lens_model: exif_data.lens_model,
                        width: exif_data.width,
                        height: exif_data.height,
                        orientation: exif_data.orientation,
                        gps_latitude: exif_data.gps_latitude.map(|v| v as f32),
                        gps_longitude: exif_data.gps_longitude.map(|v| v as f32),
                        gps_altitude: exif_data.gps_altitude.map(|v| v as f32),
                        focal_length: exif_data.focal_length.map(|v| v as f32),
                        aperture: exif_data.aperture.map(|v| v as f32),
                        shutter_speed: exif_data.shutter_speed,
                        iso: exif_data.iso,
                        date_taken: exif_data.date_taken,
                        created_time: timestamp,
                        last_modified: timestamp,
                    };

                    let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
                    if let Err(e) = dao.store_exif(&context, insert_exif) {
                        error!("Failed to store EXIF data for {}: {:?}", relative_path, e);
                    } else {
                        debug!("EXIF data stored for {}", relative_path);
                    }
                }
                // No row is stored on this path, so the file is reconsidered on later scans.
                Err(e) => {
                    debug!(
                        "No EXIF data or error extracting from {}: {:?}",
                        file_path.display(),
                        e
                    );
                }
            }
        }
    }

    // Check for videos that need HLS playlists
    let video_path_base = dotenv::var("VIDEO_PATH").expect("VIDEO_PATH must be set");
    let mut videos_needing_playlists = Vec::new();

    for (file_path, _relative_path) in &files {
        if !is_video_file(file_path) {
            continue;
        }
        let Some(video_name) = file_path.file_name() else {
            continue;
        };

        // Expected playlist path: {VIDEO_PATH}/{file name incl. extension}.m3u8
        let playlist_filename = format!("{}.m3u8", video_name.to_string_lossy());
        let playlist_path = Path::new(&video_path_base).join(&playlist_filename);

        if playlist_needs_generation(file_path, &playlist_path) {
            videos_needing_playlists.push(file_path.clone());
        }
    }

    // Send queue request to playlist manager
    if !videos_needing_playlists.is_empty() {
        playlist_manager.do_send(QueueVideosMessage {
            video_paths: videos_needing_playlists,
        });
    }

    // Check for videos that need preview clips
    // Collect (full_path, relative_path) for video files
    let video_files: Vec<(String, String)> = files
        .iter()
        .filter(|(file_path, _)| is_video_file(file_path))
        .map(|(file_path, rel_path)| (file_path.to_string_lossy().to_string(), rel_path.clone()))
        .collect();

    if !video_files.is_empty() {
        // Query DB using relative paths (consistent with how GET/POST handlers store them)
        let video_rel_paths: Vec<String> = video_files.iter().map(|(_, rel)| rel.clone()).collect();

        let existing_previews: HashMap<String, String> = {
            let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao");
            match dao.get_previews_batch(&context, &video_rel_paths) {
                Ok(clips) => clips
                    .into_iter()
                    .map(|clip| (clip.file_path, clip.status))
                    .collect(),
                Err(e) => {
                    error!("Error batch querying preview clips: {:?}", e);
                    HashMap::new()
                }
            }
        };

        for (full_path, relative_path) in &video_files {
            let status = existing_previews.get(relative_path).map(|s| s.as_str());

            // No record, a failed attempt, or a stale pending record all get (re)queued;
            // any other status (processing, complete) is left alone.
            let needs_preview = matches!(status, None | Some("failed") | Some("pending"));
            if !needs_preview {
                continue;
            }

            // Insert pending record using relative path
            if status.is_none() {
                let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao");
                if let Err(e) = dao.insert_preview(&context, relative_path, "pending") {
                    warn!(
                        "Failed to insert pending preview record for {}: {:?}",
                        relative_path, e
                    );
                }
            }

            // Send full path in the message — the actor will derive relative path from it
            preview_generator.do_send(GeneratePreviewClipMessage {
                video_path: full_path.clone(),
            });
        }
    }

    // Generate thumbnails for all files that need them
    if new_files_found {
        info!("Processing thumbnails for new files...");
        create_thumbnails();
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::data::Claims;
    use crate::database::PreviewDao;
    use crate::testhelpers::TestPreviewDao;
    use actix_web::web::Data;

    /// Builds a bearer token for user "1", signed with the test secret.
    fn make_token() -> String {
        let claims = Claims::valid_user("1".to_string());
        jsonwebtoken::encode(
            &jsonwebtoken::Header::default(),
            &claims,
            &jsonwebtoken::EncodingKey::from_secret(b"test_key"),
        )
        .unwrap()
    }

    /// Wraps a test DAO in the same app-data shape the handler extracts.
    fn make_preview_dao(dao: TestPreviewDao) -> Data<Mutex<Box<dyn PreviewDao>>> {
        Data::new(Mutex::new(Box::new(dao) as Box<dyn PreviewDao>))
    }

    /// An unknown path is reported as "pending" and a pending record is created for it.
    #[actix_rt::test]
    async fn test_get_preview_status_returns_pending_for_unknown() {
        let dao = TestPreviewDao::new();
        let preview_dao = make_preview_dao(dao);
        let app_state = Data::new(AppState::test_state());
        let token = make_token();

        let app = actix_web::test::init_service(
            App::new()
                .service(get_preview_status)
                .app_data(app_state)
                .app_data(preview_dao.clone()),
        )
        .await;

        let req = actix_web::test::TestRequest::post()
            .uri("/video/preview/status")
            .insert_header(("Authorization", format!("Bearer {}", token)))
            .set_json(serde_json::json!({"paths": ["photos/new_video.mp4"]}))
            .to_request();

        let resp = actix_web::test::call_service(&app, req).await;
        assert_eq!(resp.status(), 200);

        let body: serde_json::Value = actix_web::test::read_body_json(resp).await;
        let previews = body["previews"].as_array().unwrap();
        assert_eq!(previews.len(), 1);
        assert_eq!(previews[0]["status"], "pending");

        // Verify the DAO now has a pending record
        let mut dao_lock = preview_dao.lock().unwrap();
        let ctx = opentelemetry::Context::new();
        let clip = dao_lock.get_preview(&ctx, "photos/new_video.mp4").unwrap();
        assert!(clip.is_some());
        assert_eq!(clip.unwrap().status, "pending");
    }

    /// A completed preview is reported with a URL containing the URL-encoded path.
    #[actix_rt::test]
    async fn test_get_preview_status_returns_complete_with_url() {
        let mut dao = TestPreviewDao::new();
        let ctx = opentelemetry::Context::new();
        dao.insert_preview(&ctx, "photos/done.mp4", "pending")
            .unwrap();
        dao.update_status(
            &ctx,
            "photos/done.mp4",
            "complete",
            Some(9.5),
            Some(500000),
            None,
        )
        .unwrap();

        let preview_dao = make_preview_dao(dao);
        let app_state = Data::new(AppState::test_state());
        let token = make_token();

        let app = actix_web::test::init_service(
            App::new()
                .service(get_preview_status)
                .app_data(app_state)
                .app_data(preview_dao),
        )
        .await;

        let req = actix_web::test::TestRequest::post()
            .uri("/video/preview/status")
            .insert_header(("Authorization", format!("Bearer {}", token)))
            .set_json(serde_json::json!({"paths": ["photos/done.mp4"]}))
            .to_request();

        let resp = actix_web::test::call_service(&app, req).await;
        assert_eq!(resp.status(), 200);

        let body: serde_json::Value = actix_web::test::read_body_json(resp).await;
        let previews = body["previews"].as_array().unwrap();
        assert_eq!(previews.len(), 1);
        assert_eq!(previews[0]["status"], "complete");
        assert!(
            previews[0]["preview_url"]
                .as_str()
                .unwrap()
                .contains("photos%2Fdone.mp4")
        );
    }

    /// A request with 201 paths is rejected with 400.
    #[actix_rt::test]
    async fn test_get_preview_status_rejects_over_200_paths() {
        let dao = TestPreviewDao::new();
        let preview_dao = make_preview_dao(dao);
        let app_state = Data::new(AppState::test_state());
        let token = make_token();

        let app = actix_web::test::init_service(
            App::new()
                .service(get_preview_status)
                .app_data(app_state)
                .app_data(preview_dao),
        )
        .await;

        let paths: Vec<String> = (0..201).map(|i| format!("video_{}.mp4", i)).collect();

        let req = actix_web::test::TestRequest::post()
            .uri("/video/preview/status")
            .insert_header(("Authorization", format!("Bearer {}", token)))
            .set_json(serde_json::json!({"paths": paths}))
            .to_request();

        let resp = actix_web::test::call_service(&app, req).await;
        assert_eq!(resp.status(), 400);
    }

    /// Pending, complete and unknown paths in one request each get the right status.
    #[actix_rt::test]
    async fn test_get_preview_status_mixed_statuses() {
        let mut dao = TestPreviewDao::new();
        let ctx = opentelemetry::Context::new();
        dao.insert_preview(&ctx, "a.mp4", "pending").unwrap();
        dao.insert_preview(&ctx, "b.mp4", "pending").unwrap();
        dao.update_status(&ctx, "b.mp4", "complete", Some(10.0), Some(100000), None)
            .unwrap();

        let preview_dao = make_preview_dao(dao);
        let app_state = Data::new(AppState::test_state());
        let token = make_token();

        let app = actix_web::test::init_service(
            App::new()
                .service(get_preview_status)
                .app_data(app_state)
                .app_data(preview_dao),
        )
        .await;

        let req = actix_web::test::TestRequest::post()
            .uri("/video/preview/status")
            .insert_header(("Authorization", format!("Bearer {}", token)))
            .set_json(serde_json::json!({"paths": ["a.mp4", "b.mp4", "c.mp4"]}))
            .to_request();

        let resp = actix_web::test::call_service(&app, req).await;
        assert_eq!(resp.status(), 200);

        let body: serde_json::Value = actix_web::test::read_body_json(resp).await;
        let previews = body["previews"].as_array().unwrap();
        assert_eq!(previews.len(), 3);

        // a.mp4 is pending
        assert_eq!(previews[0]["path"], "a.mp4");
        assert_eq!(previews[0]["status"], "pending");

        // b.mp4 is complete with URL
        assert_eq!(previews[1]["path"], "b.mp4");
        assert_eq!(previews[1]["status"], "complete");
        assert!(previews[1]["preview_url"].is_string());

        // c.mp4 was not found — handler inserts pending
        assert_eq!(previews[2]["path"], "c.mp4");
        assert_eq!(previews[2]["status"], "pending");
    }

    /// Verifies that the status endpoint re-queues generation for stale
    /// "pending" and "failed" records (e.g., after a server restart or
    /// when clip files were deleted). The do_send to the actor exercises
    /// the re-queue code path; the actor runs against temp dirs so it
    /// won't panic.
    #[actix_rt::test]
    async fn test_get_preview_status_requeues_pending_and_failed() {
        let mut dao = TestPreviewDao::new();
        let ctx = opentelemetry::Context::new();

        // Simulate stale records left from a previous server run
        dao.insert_preview(&ctx, "stale/pending.mp4", "pending")
            .unwrap();
        dao.insert_preview(&ctx, "stale/failed.mp4", "pending")
            .unwrap();
        dao.update_status(
            &ctx,
            "stale/failed.mp4",
            "failed",
            None,
            None,
            Some("ffmpeg error"),
        )
        .unwrap();

        let preview_dao = make_preview_dao(dao);
        let app_state = Data::new(AppState::test_state());
        let token = make_token();

        let app = actix_web::test::init_service(
            App::new()
                .service(get_preview_status)
                .app_data(app_state)
                .app_data(preview_dao),
        )
        .await;

        let req = actix_web::test::TestRequest::post()
            .uri("/video/preview/status")
            .insert_header(("Authorization", format!("Bearer {}", token)))
            .set_json(serde_json::json!({
                "paths": ["stale/pending.mp4", "stale/failed.mp4"]
            }))
            .to_request();

        let resp = actix_web::test::call_service(&app, req).await;
        assert_eq!(resp.status(), 200);

        let body: serde_json::Value = actix_web::test::read_body_json(resp).await;
        let previews = body["previews"].as_array().unwrap();
        assert_eq!(previews.len(), 2);

        // Both records are returned with their current status
        assert_eq!(previews[0]["path"], "stale/pending.mp4");
        assert_eq!(previews[0]["status"], "pending");
        assert!(previews[0].get("preview_url").is_none());

        assert_eq!(previews[1]["path"], "stale/failed.mp4");
        assert_eq!(previews[1]["status"], "failed");
        assert!(previews[1].get("preview_url").is_none());
    }
}