Add VideoWall feature: server-side preview clip generation and mobile grid view

Backend (Rust/Actix-web):
- Add video_preview_clips table and PreviewDao for tracking preview generation
- Add ffmpeg preview clip generator: 10 equally-spaced 1s segments at 480p with CUDA NVENC auto-detection
- Add PreviewClipGenerator actor with semaphore-limited concurrent processing
- Add GET /video/preview and POST /video/preview/status endpoints
- Extend file watcher to detect and queue previews for new videos
- Use relative paths consistently for DB storage (matching EXIF convention)

Frontend (React Native/Expo):
- Add VideoWall grid view with 2-3 column layout of looping preview clips
- Add VideoWallItem component with ActiveVideoPlayer sub-component for lifecycle management
- Add useVideoWall hook for batch status polling with 5s refresh
- Add navigation button in grid header (visible when videos exist)
- Use TextureView surface type to fix Android z-ordering issues
- Optimize memory: players only mount while visible via FlatList windowSize
- Configure ExoPlayer buffer options and caching for short clips
- Tap to toggle audio focus, long press to open in full viewer

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Cameron
2026-02-25 19:40:17 -05:00
parent 7a0da1ab4a
commit 19c099360e
19 changed files with 1691 additions and 12 deletions

View File

@@ -371,6 +371,29 @@ pub struct GpsPhotosResponse {
pub total: usize,
}
#[derive(Deserialize)]
pub struct PreviewClipRequest {
pub path: String,
}
#[derive(Deserialize)]
pub struct PreviewStatusRequest {
pub paths: Vec<String>,
}
#[derive(Serialize)]
pub struct PreviewStatusResponse {
pub previews: Vec<PreviewStatusItem>,
}
#[derive(Serialize)]
pub struct PreviewStatusItem {
pub path: String,
pub status: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub preview_url: Option<String>,
}
#[cfg(test)]
mod tests {
use super::Claims;

View File

@@ -14,6 +14,7 @@ pub mod daily_summary_dao;
pub mod insights_dao;
pub mod location_dao;
pub mod models;
pub mod preview_dao;
pub mod schema;
pub mod search_dao;
@@ -21,6 +22,7 @@ pub use calendar_dao::{CalendarEventDao, SqliteCalendarEventDao};
pub use daily_summary_dao::{DailySummaryDao, InsertDailySummary, SqliteDailySummaryDao};
pub use insights_dao::{InsightDao, SqliteInsightDao};
pub use location_dao::{LocationHistoryDao, SqliteLocationHistoryDao};
pub use preview_dao::{PreviewDao, SqlitePreviewDao};
pub use search_dao::{SearchHistoryDao, SqliteSearchHistoryDao};
pub trait UserDao {

View File

@@ -1,4 +1,4 @@
use crate::database::schema::{favorites, image_exif, photo_insights, users};
use crate::database::schema::{favorites, image_exif, photo_insights, users, video_preview_clips};
use serde::Serialize;
#[derive(Insertable)]
@@ -93,3 +93,24 @@ pub struct PhotoInsight {
pub generated_at: i64,
pub model_version: String,
}
#[derive(Insertable)]
#[diesel(table_name = video_preview_clips)]
pub struct InsertVideoPreviewClip {
pub file_path: String,
pub status: String,
pub created_at: String,
pub updated_at: String,
}
#[derive(Serialize, Queryable, Clone, Debug)]
pub struct VideoPreviewClip {
pub id: i32,
pub file_path: String,
pub status: String,
pub duration_seconds: Option<f32>,
pub file_size_bytes: Option<i32>,
pub error_message: Option<String>,
pub created_at: String,
pub updated_at: String,
}

183
src/database/preview_dao.rs Normal file
View File

@@ -0,0 +1,183 @@
use diesel::prelude::*;
use diesel::sqlite::SqliteConnection;
use std::ops::DerefMut;
use std::sync::{Arc, Mutex};
use crate::database::models::{InsertVideoPreviewClip, VideoPreviewClip};
use crate::database::{connect, DbError, DbErrorKind};
use crate::otel::trace_db_call;
pub trait PreviewDao: Sync + Send {
fn insert_preview(
&mut self,
context: &opentelemetry::Context,
file_path_val: &str,
status_val: &str,
) -> Result<(), DbError>;
fn update_status(
&mut self,
context: &opentelemetry::Context,
file_path_val: &str,
status_val: &str,
duration: Option<f32>,
size: Option<i32>,
error: Option<&str>,
) -> Result<(), DbError>;
fn get_preview(
&mut self,
context: &opentelemetry::Context,
file_path_val: &str,
) -> Result<Option<VideoPreviewClip>, DbError>;
fn get_previews_batch(
&mut self,
context: &opentelemetry::Context,
file_paths: &[String],
) -> Result<Vec<VideoPreviewClip>, DbError>;
fn get_by_status(
&mut self,
context: &opentelemetry::Context,
status_val: &str,
) -> Result<Vec<VideoPreviewClip>, DbError>;
}
pub struct SqlitePreviewDao {
connection: Arc<Mutex<SqliteConnection>>,
}
impl Default for SqlitePreviewDao {
fn default() -> Self {
Self::new()
}
}
impl SqlitePreviewDao {
pub fn new() -> Self {
SqlitePreviewDao {
connection: Arc::new(Mutex::new(connect())),
}
}
}
impl PreviewDao for SqlitePreviewDao {
fn insert_preview(
&mut self,
context: &opentelemetry::Context,
file_path_val: &str,
status_val: &str,
) -> Result<(), DbError> {
trace_db_call(context, "insert", "insert_preview", |_span| {
use crate::database::schema::video_preview_clips::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get PreviewDao");
let now = chrono::Utc::now().to_rfc3339();
diesel::insert_or_ignore_into(video_preview_clips)
.values(InsertVideoPreviewClip {
file_path: file_path_val.to_string(),
status: status_val.to_string(),
created_at: now.clone(),
updated_at: now,
})
.execute(connection.deref_mut())
.map(|_| ())
.map_err(|e| anyhow::anyhow!("Insert error: {}", e))
})
.map_err(|_| DbError::new(DbErrorKind::InsertError))
}
fn update_status(
&mut self,
context: &opentelemetry::Context,
file_path_val: &str,
status_val: &str,
duration: Option<f32>,
size: Option<i32>,
error: Option<&str>,
) -> Result<(), DbError> {
trace_db_call(context, "update", "update_preview_status", |_span| {
use crate::database::schema::video_preview_clips::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get PreviewDao");
let now = chrono::Utc::now().to_rfc3339();
diesel::update(video_preview_clips.filter(file_path.eq(file_path_val)))
.set((
status.eq(status_val),
duration_seconds.eq(duration),
file_size_bytes.eq(size),
error_message.eq(error),
updated_at.eq(&now),
))
.execute(connection.deref_mut())
.map(|_| ())
.map_err(|e| anyhow::anyhow!("Update error: {}", e))
})
.map_err(|_| DbError::new(DbErrorKind::UpdateError))
}
fn get_preview(
&mut self,
context: &opentelemetry::Context,
file_path_val: &str,
) -> Result<Option<VideoPreviewClip>, DbError> {
trace_db_call(context, "query", "get_preview", |_span| {
use crate::database::schema::video_preview_clips::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get PreviewDao");
match video_preview_clips
.filter(file_path.eq(file_path_val))
.first::<VideoPreviewClip>(connection.deref_mut())
{
Ok(clip) => Ok(Some(clip)),
Err(diesel::result::Error::NotFound) => Ok(None),
Err(e) => Err(anyhow::anyhow!("Query error: {}", e)),
}
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn get_previews_batch(
&mut self,
context: &opentelemetry::Context,
file_paths: &[String],
) -> Result<Vec<VideoPreviewClip>, DbError> {
trace_db_call(context, "query", "get_previews_batch", |_span| {
use crate::database::schema::video_preview_clips::dsl::*;
if file_paths.is_empty() {
return Ok(Vec::new());
}
let mut connection = self.connection.lock().expect("Unable to get PreviewDao");
video_preview_clips
.filter(file_path.eq_any(file_paths))
.load::<VideoPreviewClip>(connection.deref_mut())
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn get_by_status(
&mut self,
context: &opentelemetry::Context,
status_val: &str,
) -> Result<Vec<VideoPreviewClip>, DbError> {
trace_db_call(context, "query", "get_previews_by_status", |_span| {
use crate::database::schema::video_preview_clips::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get PreviewDao");
video_preview_clips
.filter(status.eq(status_val))
.load::<VideoPreviewClip>(connection.deref_mut())
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
}

View File

@@ -152,6 +152,19 @@ diesel::table! {
}
}
diesel::table! {
video_preview_clips (id) {
id -> Integer,
file_path -> Text,
status -> Text,
duration_seconds -> Nullable<Float>,
file_size_bytes -> Nullable<Integer>,
error_message -> Nullable<Text>,
created_at -> Text,
updated_at -> Text,
}
}
diesel::joinable!(tagged_photo -> tags (tag_id));
diesel::allow_tables_to_appear_in_same_query!(
@@ -167,4 +180,5 @@ diesel::allow_tables_to_appear_in_same_query!(
tagged_photo,
tags,
users,
video_preview_clips,
);

View File

@@ -46,8 +46,8 @@ use crate::service::ServiceBuilder;
use crate::state::AppState;
use crate::tags::*;
use crate::video::actors::{
ProcessMessage, QueueVideosMessage, ScanDirectoryMessage, VideoPlaylistManager,
create_playlist, generate_video_thumbnail,
GeneratePreviewClipMessage, ProcessMessage, QueueVideosMessage, ScanDirectoryMessage,
VideoPlaylistManager, create_playlist, generate_video_thumbnail,
};
use log::{debug, error, info, trace, warn};
use opentelemetry::trace::{Span, Status, TraceContextExt, Tracer};
@@ -583,6 +583,225 @@ async fn get_video_part(
}
}
#[get("/video/preview")]
async fn get_video_preview(
_claims: Claims,
request: HttpRequest,
req: web::Query<PreviewClipRequest>,
app_state: Data<AppState>,
preview_dao: Data<Mutex<Box<dyn PreviewDao>>>,
) -> impl Responder {
let tracer = global_tracer();
let context = extract_context_from_request(&request);
let mut span = tracer.start_with_context("get_video_preview", &context);
// Validate path
let full_path = match is_valid_full_path(&app_state.base_path, &req.path, true) {
Some(path) => path,
None => {
span.set_status(Status::error("Invalid path"));
return HttpResponse::BadRequest()
.json(serde_json::json!({"error": "Invalid path"}));
}
};
let full_path_str = full_path.to_string_lossy().to_string();
// Use relative path (from BASE_PATH) for DB storage, consistent with EXIF convention
let relative_path = full_path_str
.strip_prefix(&app_state.base_path)
.unwrap_or(&full_path_str)
.trim_start_matches(['/', '\\'])
.to_string();
// Check preview status in DB
let preview = {
let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao");
dao.get_preview(&context, &relative_path)
};
match preview {
Ok(Some(clip)) => match clip.status.as_str() {
"complete" => {
let preview_path = PathBuf::from(&app_state.preview_clips_path)
.join(&relative_path)
.with_extension("mp4");
match NamedFile::open(&preview_path) {
Ok(file) => {
span.set_status(Status::Ok);
file.into_response(&request)
}
Err(_) => {
// File missing on disk but DB says complete - reset and regenerate
let mut dao =
preview_dao.lock().expect("Unable to lock PreviewDao");
let _ = dao.update_status(
&context,
&relative_path,
"pending",
None,
None,
None,
);
app_state
.preview_clip_generator
.do_send(GeneratePreviewClipMessage {
video_path: full_path_str,
});
span.set_status(Status::Ok);
HttpResponse::Accepted().json(serde_json::json!({
"status": "processing",
"path": req.path
}))
}
}
}
"processing" => {
span.set_status(Status::Ok);
HttpResponse::Accepted().json(serde_json::json!({
"status": "processing",
"path": req.path
}))
}
"failed" => {
let error_msg =
clip.error_message.unwrap_or_else(|| "Unknown error".to_string());
span.set_status(Status::error(format!(
"Generation failed: {}",
error_msg
)));
HttpResponse::InternalServerError().json(serde_json::json!({
"error": format!("Generation failed: {}", error_msg)
}))
}
_ => {
// pending or unknown status - trigger generation
app_state
.preview_clip_generator
.do_send(GeneratePreviewClipMessage {
video_path: full_path_str,
});
span.set_status(Status::Ok);
HttpResponse::Accepted().json(serde_json::json!({
"status": "processing",
"path": req.path
}))
}
},
Ok(None) => {
// No record exists - insert as pending and trigger generation
{
let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao");
let _ = dao.insert_preview(&context, &relative_path, "pending");
}
app_state
.preview_clip_generator
.do_send(GeneratePreviewClipMessage {
video_path: full_path_str,
});
span.set_status(Status::Ok);
HttpResponse::Accepted().json(serde_json::json!({
"status": "processing",
"path": req.path
}))
}
Err(_) => {
span.set_status(Status::error("Database error"));
HttpResponse::InternalServerError()
.json(serde_json::json!({"error": "Database error"}))
}
}
}
#[post("/video/preview/status")]
async fn get_preview_status(
_claims: Claims,
request: HttpRequest,
body: web::Json<PreviewStatusRequest>,
app_state: Data<AppState>,
preview_dao: Data<Mutex<Box<dyn PreviewDao>>>,
) -> impl Responder {
let tracer = global_tracer();
let context = extract_context_from_request(&request);
let mut span = tracer.start_with_context("get_preview_status", &context);
// Limit to 200 paths per request
if body.paths.len() > 200 {
span.set_status(Status::error("Too many paths"));
return HttpResponse::BadRequest()
.json(serde_json::json!({"error": "Maximum 200 paths per request"}));
}
let previews = {
let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao");
dao.get_previews_batch(&context, &body.paths)
};
match previews {
Ok(clips) => {
// Build a map of file_path -> VideoPreviewClip for quick lookup
let clip_map: HashMap<String, _> = clips
.into_iter()
.map(|clip| (clip.file_path.clone(), clip))
.collect();
let mut items: Vec<PreviewStatusItem> = Vec::with_capacity(body.paths.len());
for path in &body.paths {
if let Some(clip) = clip_map.get(path) {
items.push(PreviewStatusItem {
path: path.clone(),
status: clip.status.clone(),
preview_url: if clip.status == "complete" {
Some(format!(
"/video/preview?path={}",
urlencoding::encode(path)
))
} else {
None
},
});
} else {
// No record exists — insert as pending and trigger generation
{
let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao");
let _ = dao.insert_preview(&context, path, "pending");
}
// Build full path for ffmpeg (actor needs the absolute path for input)
let full_path = format!(
"{}/{}",
app_state.base_path.trim_end_matches(['/', '\\']),
path.trim_start_matches(['/', '\\'])
);
info!("Triggering preview generation for '{}'", path);
app_state
.preview_clip_generator
.do_send(GeneratePreviewClipMessage {
video_path: full_path,
});
items.push(PreviewStatusItem {
path: path.clone(),
status: "pending".to_string(),
preview_url: None,
});
}
}
span.set_status(Status::Ok);
HttpResponse::Ok().json(PreviewStatusResponse { previews: items })
}
Err(_) => {
span.set_status(Status::error("Database error"));
HttpResponse::InternalServerError()
.json(serde_json::json!({"error": "Database error"}))
}
}
}
#[get("image/favorites")]
async fn favorites(
claims: Claims,
@@ -836,9 +1055,10 @@ fn main() -> std::io::Result<()> {
directory: app_state.base_path.clone(),
});
// Start file watcher with playlist manager
// Start file watcher with playlist manager and preview generator
let playlist_mgr_for_watcher = app_state.playlist_manager.as_ref().clone();
watch_files(playlist_mgr_for_watcher);
let preview_gen_for_watcher = app_state.preview_clip_generator.as_ref().clone();
watch_files(playlist_mgr_for_watcher, preview_gen_for_watcher);
// Start orphaned playlist cleanup job
cleanup_orphaned_playlists();
@@ -855,7 +1075,8 @@ fn main() -> std::io::Result<()> {
let start_date = Some(NaiveDate::from_ymd_opt(2015, 10, 1).unwrap());
let end_date = Some(NaiveDate::from_ymd_opt(2020, 1, 1).unwrap());
let contacts_to_summarize = vec!["Domenique", "Zach", "Paul"]; // Add more contacts as needed
// let contacts_to_summarize = vec!["Domenique", "Zach", "Paul"]; // Add more contacts as needed
let contacts_to_summarize = vec![]; // Add more contacts as needed
let ollama = app_state.ollama.clone();
let sms_client = app_state.sms_client.clone();
@@ -895,6 +1116,7 @@ fn main() -> std::io::Result<()> {
let tag_dao = SqliteTagDao::default();
let exif_dao = SqliteExifDao::new();
let insight_dao = SqliteInsightDao::new();
let preview_dao = SqlitePreviewDao::new();
let cors = Cors::default()
.allowed_origin_fn(|origin, _req_head| {
// Allow all origins in development, or check against CORS_ALLOWED_ORIGINS env var
@@ -944,6 +1166,8 @@ fn main() -> std::io::Result<()> {
.service(upload_image)
.service(generate_video)
.service(stream_video)
.service(get_video_preview)
.service(get_preview_status)
.service(get_video_part)
.service(favorites)
.service(put_add_favorite)
@@ -971,6 +1195,9 @@ fn main() -> std::io::Result<()> {
.app_data::<Data<Mutex<Box<dyn InsightDao>>>>(Data::new(Mutex::new(Box::new(
insight_dao,
))))
.app_data::<Data<Mutex<Box<dyn PreviewDao>>>>(Data::new(Mutex::new(Box::new(
preview_dao,
))))
.app_data::<Data<InsightGenerator>>(Data::new(app_data.insight_generator.clone()))
.wrap(prometheus.clone())
})
@@ -1118,7 +1345,10 @@ fn cleanup_orphaned_playlists() {
});
}
fn watch_files(playlist_manager: Addr<VideoPlaylistManager>) {
fn watch_files(
playlist_manager: Addr<VideoPlaylistManager>,
preview_generator: actix::Addr<crate::video::actors::PreviewClipGenerator>,
) {
std::thread::spawn(move || {
let base_str = dotenv::var("BASE_PATH").unwrap();
let base_path = PathBuf::from(&base_str);
@@ -1141,10 +1371,13 @@ fn watch_files(playlist_manager: Addr<VideoPlaylistManager>) {
info!(" Full scan interval: {} seconds", full_interval_secs);
info!(" Watching directory: {}", base_str);
// Create EXIF DAO for tracking processed files
// Create DAOs for tracking processed files
let exif_dao = Arc::new(Mutex::new(
Box::new(SqliteExifDao::new()) as Box<dyn ExifDao>
));
let preview_dao = Arc::new(Mutex::new(
Box::new(SqlitePreviewDao::new()) as Box<dyn PreviewDao>
));
let mut last_quick_scan = SystemTime::now();
let mut last_full_scan = SystemTime::now();
@@ -1165,8 +1398,10 @@ fn watch_files(playlist_manager: Addr<VideoPlaylistManager>) {
process_new_files(
&base_path,
Arc::clone(&exif_dao),
Arc::clone(&preview_dao),
None,
playlist_manager.clone(),
preview_generator.clone(),
);
last_full_scan = now;
} else {
@@ -1181,8 +1416,10 @@ fn watch_files(playlist_manager: Addr<VideoPlaylistManager>) {
process_new_files(
&base_path,
Arc::clone(&exif_dao),
Arc::clone(&preview_dao),
Some(check_since),
playlist_manager.clone(),
preview_generator.clone(),
);
}
@@ -1221,8 +1458,10 @@ fn playlist_needs_generation(video_path: &Path, playlist_path: &Path) -> bool {
fn process_new_files(
base_path: &Path,
exif_dao: Arc<Mutex<Box<dyn ExifDao>>>,
preview_dao: Arc<Mutex<Box<dyn PreviewDao>>>,
modified_since: Option<SystemTime>,
playlist_manager: Addr<VideoPlaylistManager>,
preview_generator: actix::Addr<crate::video::actors::PreviewClipGenerator>,
) {
let context = opentelemetry::Context::new();
let thumbs = dotenv::var("THUMBNAILS").expect("THUMBNAILS not defined");
@@ -1385,6 +1624,55 @@ fn process_new_files(
});
}
// Check for videos that need preview clips
// Collect (full_path, relative_path) for video files
let video_files: Vec<(String, String)> = files
.iter()
.filter(|(file_path, _)| is_video_file(file_path))
.map(|(file_path, rel_path)| (file_path.to_string_lossy().to_string(), rel_path.clone()))
.collect();
if !video_files.is_empty() {
// Query DB using relative paths (consistent with how GET/POST handlers store them)
let video_rel_paths: Vec<String> = video_files.iter().map(|(_, rel)| rel.clone()).collect();
let existing_previews: HashMap<String, String> = {
let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao");
match dao.get_previews_batch(&context, &video_rel_paths) {
Ok(clips) => clips
.into_iter()
.map(|clip| (clip.file_path, clip.status))
.collect(),
Err(e) => {
error!("Error batch querying preview clips: {:?}", e);
HashMap::new()
}
}
};
for (full_path, relative_path) in &video_files {
let status = existing_previews.get(relative_path).map(|s| s.as_str());
let needs_preview = match status {
None => true, // No record at all
Some("failed") => true, // Retry failed
_ => false, // pending, processing, or complete
};
if needs_preview {
// Insert pending record using relative path
if status.is_none() {
let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao");
let _ = dao.insert_preview(&context, relative_path, "pending");
}
// Send full path in the message — the actor will derive relative path from it
preview_generator.do_send(GeneratePreviewClipMessage {
video_path: full_path.clone(),
});
}
}
}
// Generate thumbnails for all files that need them
if new_files_found {
info!("Processing thumbnails for new files...");

View File

@@ -4,7 +4,10 @@ use crate::database::{
SqliteCalendarEventDao, SqliteDailySummaryDao, SqliteExifDao, SqliteInsightDao,
SqliteLocationHistoryDao, SqliteSearchHistoryDao,
};
use crate::video::actors::{PlaylistGenerator, StreamActor, VideoPlaylistManager};
use crate::database::{PreviewDao, SqlitePreviewDao};
use crate::video::actors::{
PlaylistGenerator, PreviewClipGenerator, StreamActor, VideoPlaylistManager,
};
use actix::{Actor, Addr};
use std::env;
use std::sync::{Arc, Mutex};
@@ -12,10 +15,12 @@ use std::sync::{Arc, Mutex};
pub struct AppState {
pub stream_manager: Arc<Addr<StreamActor>>,
pub playlist_manager: Arc<Addr<VideoPlaylistManager>>,
pub preview_clip_generator: Arc<Addr<PreviewClipGenerator>>,
pub base_path: String,
pub thumbnail_path: String,
pub video_path: String,
pub gif_path: String,
pub preview_clips_path: String,
pub excluded_dirs: Vec<String>,
pub ollama: OllamaClient,
pub sms_client: SmsApiClient,
@@ -29,22 +34,32 @@ impl AppState {
thumbnail_path: String,
video_path: String,
gif_path: String,
preview_clips_path: String,
excluded_dirs: Vec<String>,
ollama: OllamaClient,
sms_client: SmsApiClient,
insight_generator: InsightGenerator,
preview_dao: Arc<Mutex<Box<dyn PreviewDao>>>,
) -> Self {
let playlist_generator = PlaylistGenerator::new();
let video_playlist_manager =
VideoPlaylistManager::new(video_path.clone(), playlist_generator.start());
let preview_clip_generator = PreviewClipGenerator::new(
preview_clips_path.clone(),
base_path.clone(),
preview_dao,
);
Self {
stream_manager,
playlist_manager: Arc::new(video_playlist_manager.start()),
preview_clip_generator: Arc::new(preview_clip_generator.start()),
base_path,
thumbnail_path,
video_path,
gif_path,
preview_clips_path,
excluded_dirs,
ollama,
sms_client,
@@ -94,6 +109,8 @@ impl Default for AppState {
Arc::new(Mutex::new(Box::new(SqliteExifDao::new())));
let daily_summary_dao: Arc<Mutex<Box<dyn DailySummaryDao>>> =
Arc::new(Mutex::new(Box::new(SqliteDailySummaryDao::new())));
let preview_dao: Arc<Mutex<Box<dyn PreviewDao>>> =
Arc::new(Mutex::new(Box::new(SqlitePreviewDao::new())));
// Initialize Google Takeout DAOs
let calendar_dao: Arc<Mutex<Box<dyn CalendarEventDao>>> =
@@ -119,16 +136,23 @@ impl Default for AppState {
base_path.clone(),
);
// Ensure preview clips directory exists
let preview_clips_path = env::var("PREVIEW_CLIPS_DIRECTORY")
.unwrap_or_else(|_| "preview_clips".to_string());
std::fs::create_dir_all(&preview_clips_path).expect("Failed to create PREVIEW_CLIPS_DIRECTORY");
Self::new(
Arc::new(StreamActor {}.start()),
base_path,
env::var("THUMBNAILS").expect("THUMBNAILS was not set in the env"),
env::var("VIDEO_PATH").expect("VIDEO_PATH was not set in the env"),
env::var("GIFS_DIRECTORY").expect("GIFS_DIRECTORY was not set in the env"),
preview_clips_path,
Self::parse_excluded_dirs(),
ollama,
sms_client,
insight_generator,
preview_dao,
)
}
}
@@ -142,10 +166,11 @@ impl AppState {
let temp_dir = tempfile::tempdir().expect("Failed to create temp directory");
let base_path = temp_dir.path().to_path_buf();
// Create subdirectories for thumbnails, videos, and gifs
// Create subdirectories for thumbnails, videos, gifs, and preview clips
let thumbnail_path = create_test_subdir(&base_path, "thumbnails");
let video_path = create_test_subdir(&base_path, "videos");
let gif_path = create_test_subdir(&base_path, "gifs");
let preview_clips_path = create_test_subdir(&base_path, "preview_clips");
// Initialize test AI clients
let ollama = OllamaClient::new(
@@ -186,6 +211,10 @@ impl AppState {
base_path_str.clone(),
);
// Initialize test preview DAO
let preview_dao: Arc<Mutex<Box<dyn PreviewDao>>> =
Arc::new(Mutex::new(Box::new(SqlitePreviewDao::new())));
// Create the AppState with the temporary paths
AppState::new(
Arc::new(StreamActor {}.start()),
@@ -193,10 +222,12 @@ impl AppState {
thumbnail_path.to_string_lossy().to_string(),
video_path.to_string_lossy().to_string(),
gif_path.to_string_lossy().to_string(),
preview_clips_path.to_string_lossy().to_string(),
Vec::new(), // No excluded directories for test state
ollama,
sms_client,
insight_generator,
preview_dao,
)
}
}

View File

@@ -1,5 +1,7 @@
use crate::database::PreviewDao;
use crate::is_video;
use crate::otel::global_tracer;
use crate::video::ffmpeg::generate_preview_clip;
use actix::prelude::*;
use futures::TryFutureExt;
use log::{debug, error, info, trace, warn};
@@ -8,7 +10,7 @@ use opentelemetry::trace::{Span, Status, Tracer};
use std::io::Result;
use std::path::{Path, PathBuf};
use std::process::{Child, Command, ExitStatus, Stdio};
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use tokio::sync::Semaphore;
use walkdir::{DirEntry, WalkDir};
// ffmpeg -i test.mp4 -c:v h264 -flags +cgop -g 30 -hls_time 3 out.m3u8
@@ -484,3 +486,118 @@ impl Handler<GeneratePlaylistMessage> for PlaylistGenerator {
})
}
}
#[derive(Message)]
#[rtype(result = "()")]
pub struct GeneratePreviewClipMessage {
pub video_path: String,
}
pub struct PreviewClipGenerator {
semaphore: Arc<Semaphore>,
preview_clips_dir: String,
base_path: String,
preview_dao: Arc<Mutex<Box<dyn PreviewDao>>>,
}
impl PreviewClipGenerator {
pub fn new(
preview_clips_dir: String,
base_path: String,
preview_dao: Arc<Mutex<Box<dyn PreviewDao>>>,
) -> Self {
PreviewClipGenerator {
semaphore: Arc::new(Semaphore::new(2)),
preview_clips_dir,
base_path,
preview_dao,
}
}
}
impl Actor for PreviewClipGenerator {
type Context = Context<Self>;
}
impl Handler<GeneratePreviewClipMessage> for PreviewClipGenerator {
type Result = ResponseFuture<()>;
fn handle(
&mut self,
msg: GeneratePreviewClipMessage,
_ctx: &mut Self::Context,
) -> Self::Result {
let semaphore = self.semaphore.clone();
let preview_clips_dir = self.preview_clips_dir.clone();
let base_path = self.base_path.clone();
let preview_dao = self.preview_dao.clone();
let video_path = msg.video_path;
Box::pin(async move {
let permit = semaphore
.acquire_owned()
.await
.expect("Unable to acquire preview semaphore");
// Compute relative path (from BASE_PATH) for DB operations, consistent with EXIF convention
let relative_path = video_path
.strip_prefix(&base_path)
.unwrap_or(&video_path)
.trim_start_matches(['/', '\\'])
.to_string();
// Update status to processing
{
let otel_ctx = opentelemetry::Context::current();
let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao");
let _ = dao.update_status(&otel_ctx, &relative_path, "processing", None, None, None);
}
// Compute output path: join preview_clips_dir with relative path, change ext to .mp4
let output_path = PathBuf::from(&preview_clips_dir)
.join(&relative_path)
.with_extension("mp4");
let output_str = output_path.to_string_lossy().to_string();
let video_path_owned = video_path.clone();
let relative_path_owned = relative_path.clone();
tokio::spawn(async move {
match generate_preview_clip(&video_path_owned, &output_str).await {
Ok((duration, size)) => {
info!(
"Preview clip complete for '{}' ({:.1}s, {} bytes)",
relative_path_owned, duration, size
);
let otel_ctx = opentelemetry::Context::current();
let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao");
let _ = dao.update_status(
&otel_ctx,
&relative_path_owned,
"complete",
Some(duration as f32),
Some(size as i32),
None,
);
}
Err(e) => {
error!(
"Failed to generate preview clip for '{}': {}",
relative_path_owned, e
);
let otel_ctx = opentelemetry::Context::current();
let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao");
let _ = dao.update_status(
&otel_ctx,
&relative_path_owned,
"failed",
None,
None,
Some(&e.to_string()),
);
}
}
drop(permit);
});
})
}
}

View File

@@ -2,9 +2,40 @@ use futures::TryFutureExt;
use log::{debug, error, info, warn};
use std::io::Result;
use std::process::{Output, Stdio};
use std::sync::OnceLock;
use std::time::Instant;
use tokio::process::Command;
static NVENC_AVAILABLE: OnceLock<bool> = OnceLock::new();
/// Check if NVIDIA NVENC hardware encoder is available via ffmpeg.
async fn check_nvenc_available() -> bool {
Command::new("ffmpeg")
.args(["-hide_banner", "-encoders"])
.output()
.await
.map(|out| {
let stdout = String::from_utf8_lossy(&out.stdout);
stdout.contains("h264_nvenc")
})
.unwrap_or(false)
}
/// Returns whether NVENC is available, caching the result after first check.
async fn is_nvenc_available() -> bool {
if let Some(&available) = NVENC_AVAILABLE.get() {
return available;
}
let available = check_nvenc_available().await;
let _ = NVENC_AVAILABLE.set(available);
if available {
info!("CUDA NVENC hardware acceleration detected and enabled for preview clips");
} else {
info!("NVENC not available, using CPU encoding for preview clips");
}
available
}
pub struct Ffmpeg;
pub enum GifType {
@@ -152,7 +183,7 @@ impl Ffmpeg {
Ok(output_file.to_string())
}
async fn create_gif_from_frames(&self, frame_base_dir: &str, output_file: &str) -> Result<i32> {
pub async fn create_gif_from_frames(&self, frame_base_dir: &str, output_file: &str) -> Result<i32> {
let output = Command::new("ffmpeg")
.arg("-y")
.args(["-framerate", "4"])
@@ -183,3 +214,114 @@ impl Ffmpeg {
Ok(output.status.code().unwrap_or(-1))
}
}
/// Get video duration in seconds as f64 for precise interval calculation.
async fn get_duration_seconds(input_file: &str) -> Result<f64> {
Command::new("ffprobe")
.args(["-i", input_file])
.args(["-show_entries", "format=duration"])
.args(["-v", "quiet"])
.args(["-of", "csv=p=0"])
.output()
.await
.map(|out| String::from_utf8_lossy(&out.stdout).trim().to_string())
.and_then(|duration_str| {
duration_str
.parse::<f64>()
.map_err(|e| std::io::Error::other(e.to_string()))
})
}
/// Generate a preview clip from a video file.
///
/// Creates a ~10 second MP4 by extracting up to 10 equally-spaced 1-second segments
/// at 480p with H.264 video and AAC audio. For short videos (<10s), uses fewer segments.
/// For very short videos (<1s), transcodes the entire video.
///
/// Returns (duration_seconds, file_size_bytes) on success.
pub async fn generate_preview_clip(input_file: &str, output_file: &str) -> Result<(f64, u64)> {
info!("Generating preview clip for: '{}'", input_file);
let start = Instant::now();
let duration = get_duration_seconds(input_file).await?;
let use_nvenc = is_nvenc_available().await;
// Create parent directories for output
if let Some(parent) = std::path::Path::new(output_file).parent() {
std::fs::create_dir_all(parent)?;
}
let mut cmd = Command::new("ffmpeg");
cmd.arg("-y");
// Use CUDA hardware-accelerated decoding when available
if use_nvenc {
cmd.args(["-hwaccel", "cuda"]);
}
cmd.arg("-i").arg(input_file);
if duration < 1.0 {
// Very short video (<1s): transcode the whole thing to 480p MP4
cmd.args(["-vf", "scale=-2:480"]);
} else {
let segment_count = if duration < 10.0 {
duration.floor() as u32
} else {
10
};
let interval = duration / segment_count as f64;
let vf = format!(
"select='lt(mod(t,{:.4}),1)',setpts=N/FRAME_RATE/TB,scale=-2:480",
interval
);
let af = format!(
"aselect='lt(mod(t,{:.4}),1)',asetpts=N/SR/TB",
interval
);
cmd.args(["-vf", &vf]);
cmd.args(["-af", &af]);
}
// Use NVENC for encoding when available, otherwise fall back to libx264
if use_nvenc {
cmd.args(["-c:v", "h264_nvenc", "-preset", "p4", "-cq:v", "28"]);
} else {
cmd.args(["-c:v", "libx264", "-crf", "28", "-preset", "veryfast"]);
}
cmd.args(["-c:a", "aac"]);
cmd.arg(output_file);
cmd.stdout(Stdio::null());
cmd.stderr(Stdio::piped());
let output = cmd.output().await?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(std::io::Error::other(format!(
"ffmpeg preview generation failed: {}",
stderr
)));
}
let metadata = std::fs::metadata(output_file)?;
let file_size = metadata.len();
let clip_duration = if duration < 1.0 {
duration
} else if duration < 10.0 {
duration.floor()
} else {
10.0
};
info!(
"Generated preview clip '{}' ({:.1}s, {} bytes) in {:?}",
output_file, clip_duration, file_size, start.elapsed()
);
Ok((clip_duration, file_size))
}