diff --git a/src/data/mod.rs b/src/data/mod.rs index 922d083..92a1404 100644 --- a/src/data/mod.rs +++ b/src/data/mod.rs @@ -1,9 +1,9 @@ use std::str::FromStr; -use actix_web::{dev, Error, FromRequest, http::header, HttpRequest}; use actix_web::error::ErrorUnauthorized; +use actix_web::{dev, http::header, Error, FromRequest, HttpRequest}; use futures::future::{err, ok, Ready}; -use jsonwebtoken::{Algorithm, decode, DecodingKey, Validation}; +use jsonwebtoken::{decode, Algorithm, DecodingKey, Validation}; use serde::{Deserialize, Serialize}; #[derive(Serialize)] diff --git a/src/main.rs b/src/main.rs index e2e2b12..edec780 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,21 +2,24 @@ extern crate diesel; extern crate rayon; +use std::fs::File; +use std::io::prelude::*; +use std::path::{Path, PathBuf}; +use std::sync::mpsc::channel; +use std::sync::{Arc, Mutex}; + use actix_files::NamedFile; use actix_multipart as mp; use actix_web::web::{HttpRequest, HttpResponse, Json}; use actix_web::{get, post, web, App, HttpServer, Responder}; use chrono::{Duration, Utc}; -use data::{AddFavoriteRequest, LoginRequest, ThumbnailRequest}; use futures::stream::StreamExt; use jsonwebtoken::{encode, EncodingKey, Header}; use notify::{watcher, DebouncedEvent, RecursiveMode, Watcher}; use rayon::prelude::*; use serde::Serialize; -use std::fs::File; -use std::io::prelude::*; -use std::path::{Path, PathBuf}; -use std::sync::mpsc::channel; + +use data::{AddFavoriteRequest, LoginRequest, ThumbnailRequest}; use crate::data::{secret_key, Claims, CreateAccountRequest, Token}; use crate::database::{add_favorite, create_user, get_favorites, get_user, user_exists}; @@ -171,14 +174,24 @@ async fn upload_image(_: Claims, mut payload: mp::Multipart) -> impl Responder { } #[post("/video/generate")] -async fn generate_video(_claims: Claims, body: web::Json) -> impl Responder { +async fn generate_video( + _claims: Claims, + data: web::Data, + body: web::Json, +) -> impl Responder { let filename = PathBuf::from(&body.path); if let Some(name) = filename.file_stem() { let filename = name.to_str().expect("Filename should convert to string"); let playlist = format!("tmp/{}.m3u8", filename); if let Some(path) = is_valid_path(&body.path) { - create_playlist(&path.to_str().unwrap(), &playlist); + if let Ok(mut stream) = data.stream_manager.lock() { + if let Ok(child) = create_playlist(&path.to_str().unwrap(), &playlist) { + stream.track_stream(&playlist, child) + } + } else { + let _ = create_playlist(&path.to_str().unwrap(), &playlist); + } } else { return HttpResponse::BadRequest().finish(); } @@ -305,7 +318,9 @@ async fn create_thumbnails() { async fn main() -> std::io::Result<()> { create_thumbnails().await; - tokio::spawn(async { + let stream_manager = Arc::new(Mutex::new(VideoStreamManager::new())); + let stream_manager_copy = stream_manager.clone(); + tokio::spawn(async move { let (wtx, wrx) = channel(); let mut watcher = watcher(wtx, std::time::Duration::from_secs(10)).unwrap(); watcher @@ -313,24 +328,25 @@ async fn main() -> std::io::Result<()> { .unwrap(); loop { - let ev = wrx.recv_timeout(std::time::Duration::from_secs(5)); - match ev { - Ok(event) => { - match event { - DebouncedEvent::Create(_) => create_thumbnails().await, - DebouncedEvent::Rename(_, _) => create_thumbnails().await, - _ => continue, - }; - } - Err(e) => { - println!("Event: {:?}", e); - // break; - } + if let Ok(mut manager) = stream_manager_copy.lock() { + manager.check_for_finished_streams(); + } + + let ev = wrx.recv_timeout(std::time::Duration::from_secs(10)); + if let Ok(event) = ev { + match event { + DebouncedEvent::Create(_) => create_thumbnails().await, + DebouncedEvent::Rename(_, _) => create_thumbnails().await, + _ => continue, + }; } } }); - HttpServer::new(|| { + let app_data = web::Data::new(AppState { + stream_manager: stream_manager.clone(), + }); + HttpServer::new(move || { App::new() .service(login) .service(list_photos) @@ -341,9 +357,14 @@ async fn main() -> std::io::Result<()> { .service(get_video_part) .service(favorites) .service(post_add_favorite) + .app_data(app_data.clone()) }) .bind(dotenv::var("BIND_URL").unwrap())? .bind("localhost:8088")? .run() .await } + +struct AppState { + stream_manager: Arc>, +} diff --git a/src/video.rs b/src/video.rs index 98027b9..46f60ab 100644 --- a/src/video.rs +++ b/src/video.rs @@ -1,13 +1,43 @@ +use std::collections::HashMap; +use std::io::Result; use std::path::Path; -use std::process::Command; +use std::process::{Child, Command, Stdio}; // ffmpeg -i test.mp4 -c:v h264 -flags +cgop -g 30 -hls_time 3 out.m3u8 // ffmpeg -i "filename.mp4" -preset veryfast -c:v libx264 -f hls -hls_list_size 100 -hls_time 2 -crf 24 -vf scale=1080:-2,setsar=1:1 attempt/vid_out.m3u8 -pub fn create_playlist(video_path: &str, playlist_file: &str) { +pub(crate) struct VideoStreamManager { + streams: HashMap, +} + +impl VideoStreamManager { + pub(crate) fn new() -> VideoStreamManager { + VideoStreamManager { + streams: HashMap::new(), + } + } + + pub fn track_stream(&mut self, playlist_path: &dyn ToString, child_process: Child) { + println!("Tracking process for: {:?}", playlist_path.to_string()); + self.streams + .insert(playlist_path.to_string(), child_process); + } + + pub fn check_for_finished_streams(&mut self) { + self.streams.retain(|playlist_path, process| { + let is_done = process.try_wait().map_or(false, |status| status.is_some()); + if is_done { + println!("Removing process: {:?}", playlist_path) + } + !is_done + }); + } +} + +pub fn create_playlist(video_path: &str, playlist_file: &str) -> Result { if Path::new(playlist_file).exists() { println!("Playlist already exists: {}", playlist_file); - return; + return Err(std::io::Error::from(std::io::ErrorKind::AlreadyExists)); } let result = Command::new("ffmpeg") @@ -26,16 +56,22 @@ pub fn create_playlist(video_path: &str, playlist_file: &str) { .arg("-vf") .arg("scale=1080:-2,setsar=1:1") .arg(playlist_file) + .stdout(Stdio::null()) + .stderr(Stdio::null()) .spawn(); let start_time = std::time::Instant::now(); loop { std::thread::sleep(std::time::Duration::from_secs(1)); - if Path::new(playlist_file).exists() || std::time::Instant::now() - start_time > std::time::Duration::from_secs(5) { + if Path::new(playlist_file).exists() + || std::time::Instant::now() - start_time > std::time::Duration::from_secs(5) + { break; } } + + result } pub fn generate_video_thumbnail(path: &Path, destination: &Path) {