From b595bdd6422bbc4c40092b607d03cd3a8259c53e Mon Sep 17 00:00:00 2001 From: Cameron Cordes Date: Tue, 9 Feb 2021 21:30:27 -0500 Subject: [PATCH 1/3] Add VideoStreamManager for keeping track of active streams The stream manager should help prevent zombie processes and can later be used for stopping video streams if the user exits the video before finishing for example. --- src/data/mod.rs | 4 +-- src/main.rs | 65 ++++++++++++++++++++++++++++++++----------------- src/video.rs | 44 ++++++++++++++++++++++++++++++--- 3 files changed, 85 insertions(+), 28 deletions(-) diff --git a/src/data/mod.rs b/src/data/mod.rs index 922d083..92a1404 100644 --- a/src/data/mod.rs +++ b/src/data/mod.rs @@ -1,9 +1,9 @@ use std::str::FromStr; -use actix_web::{dev, Error, FromRequest, http::header, HttpRequest}; use actix_web::error::ErrorUnauthorized; +use actix_web::{dev, http::header, Error, FromRequest, HttpRequest}; use futures::future::{err, ok, Ready}; -use jsonwebtoken::{Algorithm, decode, DecodingKey, Validation}; +use jsonwebtoken::{decode, Algorithm, DecodingKey, Validation}; use serde::{Deserialize, Serialize}; #[derive(Serialize)] diff --git a/src/main.rs b/src/main.rs index e2e2b12..edec780 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,21 +2,24 @@ extern crate diesel; extern crate rayon; +use std::fs::File; +use std::io::prelude::*; +use std::path::{Path, PathBuf}; +use std::sync::mpsc::channel; +use std::sync::{Arc, Mutex}; + use actix_files::NamedFile; use actix_multipart as mp; use actix_web::web::{HttpRequest, HttpResponse, Json}; use actix_web::{get, post, web, App, HttpServer, Responder}; use chrono::{Duration, Utc}; -use data::{AddFavoriteRequest, LoginRequest, ThumbnailRequest}; use futures::stream::StreamExt; use jsonwebtoken::{encode, EncodingKey, Header}; use notify::{watcher, DebouncedEvent, RecursiveMode, Watcher}; use rayon::prelude::*; use serde::Serialize; -use std::fs::File; -use std::io::prelude::*; -use std::path::{Path, PathBuf}; -use std::sync::mpsc::channel; + +use data::{AddFavoriteRequest, LoginRequest, ThumbnailRequest}; use crate::data::{secret_key, Claims, CreateAccountRequest, Token}; use crate::database::{add_favorite, create_user, get_favorites, get_user, user_exists}; @@ -171,14 +174,24 @@ async fn upload_image(_: Claims, mut payload: mp::Multipart) -> impl Responder { } #[post("/video/generate")] -async fn generate_video(_claims: Claims, body: web::Json) -> impl Responder { +async fn generate_video( + _claims: Claims, + data: web::Data, + body: web::Json, +) -> impl Responder { let filename = PathBuf::from(&body.path); if let Some(name) = filename.file_stem() { let filename = name.to_str().expect("Filename should convert to string"); let playlist = format!("tmp/{}.m3u8", filename); if let Some(path) = is_valid_path(&body.path) { - create_playlist(&path.to_str().unwrap(), &playlist); + if let Ok(mut stream) = data.stream_manager.lock() { + if let Ok(child) = create_playlist(&path.to_str().unwrap(), &playlist) { + stream.track_stream(&playlist, child) + } + } else { + let _ = create_playlist(&path.to_str().unwrap(), &playlist); + } } else { return HttpResponse::BadRequest().finish(); } @@ -305,7 +318,9 @@ async fn create_thumbnails() { async fn main() -> std::io::Result<()> { create_thumbnails().await; - tokio::spawn(async { + let stream_manager = Arc::new(Mutex::new(VideoStreamManager::new())); + let stream_manager_copy = stream_manager.clone(); + tokio::spawn(async move { let (wtx, wrx) = channel(); let mut watcher = watcher(wtx, std::time::Duration::from_secs(10)).unwrap(); watcher @@ -313,24 +328,25 @@ async fn main() -> std::io::Result<()> { .unwrap(); loop { - let ev = wrx.recv_timeout(std::time::Duration::from_secs(5)); - match ev { - Ok(event) => { - match event { - DebouncedEvent::Create(_) => create_thumbnails().await, - DebouncedEvent::Rename(_, _) => create_thumbnails().await, - _ => continue, - }; - } - Err(e) => { - println!("Event: {:?}", e); - // break; - } + if let Ok(mut manager) = stream_manager_copy.lock() { + manager.check_for_finished_streams(); + } + + let ev = wrx.recv_timeout(std::time::Duration::from_secs(10)); + if let Ok(event) = ev { + match event { + DebouncedEvent::Create(_) => create_thumbnails().await, + DebouncedEvent::Rename(_, _) => create_thumbnails().await, + _ => continue, + }; } } }); - HttpServer::new(|| { + let app_data = web::Data::new(AppState { + stream_manager: stream_manager.clone(), + }); + HttpServer::new(move || { App::new() .service(login) .service(list_photos) @@ -341,9 +357,14 @@ async fn main() -> std::io::Result<()> { .service(get_video_part) .service(favorites) .service(post_add_favorite) + .app_data(app_data.clone()) }) .bind(dotenv::var("BIND_URL").unwrap())? .bind("localhost:8088")? .run() .await } + +struct AppState { + stream_manager: Arc>, +} diff --git a/src/video.rs b/src/video.rs index 98027b9..46f60ab 100644 --- a/src/video.rs +++ b/src/video.rs @@ -1,13 +1,43 @@ +use std::collections::HashMap; +use std::io::Result; use std::path::Path; -use std::process::Command; +use std::process::{Child, Command, Stdio}; // ffmpeg -i test.mp4 -c:v h264 -flags +cgop -g 30 -hls_time 3 out.m3u8 // ffmpeg -i "filename.mp4" -preset veryfast -c:v libx264 -f hls -hls_list_size 100 -hls_time 2 -crf 24 -vf scale=1080:-2,setsar=1:1 attempt/vid_out.m3u8 -pub fn create_playlist(video_path: &str, playlist_file: &str) { +pub(crate) struct VideoStreamManager { + streams: HashMap, +} + +impl VideoStreamManager { + pub(crate) fn new() -> VideoStreamManager { + VideoStreamManager { + streams: HashMap::new(), + } + } + + pub fn track_stream(&mut self, playlist_path: &dyn ToString, child_process: Child) { + println!("Tracking process for: {:?}", playlist_path.to_string()); + self.streams + .insert(playlist_path.to_string(), child_process); + } + + pub fn check_for_finished_streams(&mut self) { + self.streams.retain(|playlist_path, process| { + let is_done = process.try_wait().map_or(false, |status| status.is_some()); + if is_done { + println!("Removing process: {:?}", playlist_path) + } + !is_done + }); + } +} + +pub fn create_playlist(video_path: &str, playlist_file: &str) -> Result { if Path::new(playlist_file).exists() { println!("Playlist already exists: {}", playlist_file); - return; + return Err(std::io::Error::from(std::io::ErrorKind::AlreadyExists)); } let result = Command::new("ffmpeg") @@ -26,16 +56,22 @@ pub fn create_playlist(video_path: &str, playlist_file: &str) { .arg("-vf") .arg("scale=1080:-2,setsar=1:1") .arg(playlist_file) + .stdout(Stdio::null()) + .stderr(Stdio::null()) .spawn(); let start_time = std::time::Instant::now(); loop { std::thread::sleep(std::time::Duration::from_secs(1)); - if Path::new(playlist_file).exists() || std::time::Instant::now() - start_time > std::time::Duration::from_secs(5) { + if Path::new(playlist_file).exists() + || std::time::Instant::now() - start_time > std::time::Duration::from_secs(5) + { break; } } + + result } pub fn generate_video_thumbnail(path: &Path, destination: &Path) { From 45b4f0cd72fcb9f04976bf1d6fbf67984d1ff952 Mon Sep 17 00:00:00 2001 From: Cameron Cordes Date: Tue, 9 Feb 2021 21:32:22 -0500 Subject: [PATCH 2/3] Track relevant .idea files --- .gitignore | 9 +++++++++ .idea/image-api.iml | 11 +++++++++++ .idea/misc.xml | 6 ++++++ .idea/modules.xml | 8 ++++++++ .idea/vcs.xml | 6 ++++++ 5 files changed, 40 insertions(+) create mode 100644 .idea/image-api.iml create mode 100644 .idea/misc.xml create mode 100644 .idea/modules.xml create mode 100644 .idea/vcs.xml diff --git a/.gitignore b/.gitignore index 6234c03..b4484c3 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,12 @@ database/target *.db .env + +# Default ignored files +.idea/shelf/ +.idea/workspace.xml +# Datasource local storage ignored files +.idea/dataSources* +.idea/dataSources.local.xml +# Editor-based HTTP Client requests +.idea/httpRequests/ diff --git a/.idea/image-api.iml b/.idea/image-api.iml new file mode 100644 index 0000000..c254557 --- /dev/null +++ b/.idea/image-api.iml @@ -0,0 +1,11 @@ + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..28a804d --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,6 @@ + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..dbc19f7 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file From 11d1e9600a7b2881187a3f9a10110c7a71086081 Mon Sep 17 00:00:00 2001 From: Cameron Cordes Date: Thu, 11 Feb 2021 20:39:07 -0500 Subject: [PATCH 3/3] Use an Actor for the Stream watching --- Cargo.lock | 76 ++++++++++++++++++++++++++++++++++++++++++++++++---- Cargo.toml | 3 ++- src/main.rs | 55 +++++++++++++++++-------------------- src/video.rs | 50 ++++++++++++++++++---------------- 4 files changed, 125 insertions(+), 59 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4580351..97ac18c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,5 +1,30 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +[[package]] +name = "actix" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1be241f88f3b1e7e9a3fbe3b5a8a0f6915b5a1d7ee0d9a248d3376d01068cc60" +dependencies = [ + "actix-rt", + "actix_derive", + "bitflags", + "bytes 0.5.6", + "crossbeam-channel 0.4.4", + "derive_more", + "futures-channel", + "futures-util", + "log", + "once_cell", + "parking_lot", + "pin-project 0.4.27", + "smallvec", + "tokio", + "tokio-util", + "trust-dns-proto", + "trust-dns-resolver", +] + [[package]] name = "actix-codec" version = "0.3.0" @@ -313,6 +338,17 @@ dependencies = [ "syn", ] +[[package]] +name = "actix_derive" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b95aceadaf327f18f0df5962fedc1bde2f870566a0b9f65c89508a3b1f79334c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "addr2line" version = "0.14.0" @@ -642,6 +678,16 @@ dependencies = [ "cfg-if 1.0.0", ] +[[package]] +name = "crossbeam-channel" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b153fe7cbef478c567df0f972e02e6d736db11affe43dfc9c56a9374d1adfb87" +dependencies = [ + "crossbeam-utils 0.7.2", + "maybe-uninit", +] + [[package]] name = "crossbeam-channel" version = "0.5.0" @@ -649,7 +695,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dca26ee1f8d361640700bde38b2c37d8c22b3ce2d360e1fc1c74ea4b0aa7d775" dependencies = [ "cfg-if 1.0.0", - "crossbeam-utils", + "crossbeam-utils 0.8.0", ] [[package]] @@ -660,7 +706,7 @@ checksum = "94af6efb46fef72616855b036a624cf27ba656ffc9be1b9a3c931cfc7749a9a9" dependencies = [ "cfg-if 1.0.0", "crossbeam-epoch", - "crossbeam-utils", + "crossbeam-utils 0.8.0", ] [[package]] @@ -671,12 +717,23 @@ checksum = "ec0f606a85340376eef0d6d8fec399e6d4a544d648386c6645eb6d0653b27d9f" dependencies = [ "cfg-if 1.0.0", "const_fn", - "crossbeam-utils", + "crossbeam-utils 0.8.0", "lazy_static", "memoffset", "scopeguard", ] +[[package]] +name = "crossbeam-utils" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8" +dependencies = [ + "autocfg", + "cfg-if 0.1.10", + "lazy_static", +] + [[package]] name = "crossbeam-utils" version = "0.8.0" @@ -1146,6 +1203,7 @@ dependencies = [ name = "image-api" version = "0.1.0" dependencies = [ + "actix", "actix-cors", "actix-files", "actix-multipart", @@ -1357,6 +1415,12 @@ version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08" +[[package]] +name = "maybe-uninit" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00" + [[package]] name = "memchr" version = "2.3.4" @@ -1824,9 +1888,9 @@ version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ab346ac5921dc62ffa9f89b7a773907511cdfa5490c572ae9be1be33e8afa4a" dependencies = [ - "crossbeam-channel", + "crossbeam-channel 0.5.0", "crossbeam-deque", - "crossbeam-utils", + "crossbeam-utils 0.8.0", "lazy_static", "num_cpus", ] @@ -2256,6 +2320,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6d7ad61edd59bfcc7e80dababf0f4aed2e6d5e0ba1659356ae889752dfc12ff" dependencies = [ "bytes 0.5.6", + "fnv", "futures-core", "iovec", "lazy_static", @@ -2277,6 +2342,7 @@ checksum = "be8242891f2b6cbef26a2d7e8605133c2c554cd35b3e4948ea892d6d68436499" dependencies = [ "bytes 0.5.6", "futures-core", + "futures-io", "futures-sink", "log", "pin-project-lite 0.1.11", diff --git a/Cargo.toml b/Cargo.toml index 3136c5b..a1565e5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,11 +7,12 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +actix = "0.10" actix-web = "3" actix-rt = "1" actix-files = "0.4" actix-multipart = "0.3.0" -actix-cors="0.5" +actix-cors = "0.5" futures = "0.3.5" jsonwebtoken = "7.2.0" serde = "1" diff --git a/src/main.rs b/src/main.rs index edec780..a7d9b09 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,23 +5,24 @@ extern crate rayon; use std::fs::File; use std::io::prelude::*; use std::path::{Path, PathBuf}; +use std::sync::{Arc}; use std::sync::mpsc::channel; -use std::sync::{Arc, Mutex}; +use actix::{Actor, Addr}; use actix_files::NamedFile; use actix_multipart as mp; +use actix_web::{App, get, HttpServer, post, Responder, web}; use actix_web::web::{HttpRequest, HttpResponse, Json}; -use actix_web::{get, post, web, App, HttpServer, Responder}; use chrono::{Duration, Utc}; use futures::stream::StreamExt; use jsonwebtoken::{encode, EncodingKey, Header}; -use notify::{watcher, DebouncedEvent, RecursiveMode, Watcher}; +use notify::{DebouncedEvent, RecursiveMode, watcher, Watcher}; use rayon::prelude::*; use serde::Serialize; use data::{AddFavoriteRequest, LoginRequest, ThumbnailRequest}; -use crate::data::{secret_key, Claims, CreateAccountRequest, Token}; +use crate::data::{Claims, CreateAccountRequest, secret_key, Token}; use crate::database::{add_favorite, create_user, get_favorites, get_user, user_exists}; use crate::files::{is_valid_path, list_files}; use crate::video::*; @@ -59,7 +60,7 @@ async fn login(creds: Json) -> impl Responder { &claims, &EncodingKey::from_secret(secret_key().as_bytes()), ) - .unwrap(); + .unwrap(); HttpResponse::Ok().json(Token { token: &token }) } else { HttpResponse::NotFound().finish() @@ -185,12 +186,8 @@ async fn generate_video( let filename = name.to_str().expect("Filename should convert to string"); let playlist = format!("tmp/{}.m3u8", filename); if let Some(path) = is_valid_path(&body.path) { - if let Ok(mut stream) = data.stream_manager.lock() { - if let Ok(child) = create_playlist(&path.to_str().unwrap(), &playlist) { - stream.track_stream(&playlist, child) - } - } else { - let _ = create_playlist(&path.to_str().unwrap(), &playlist); + if let Ok(child) = create_playlist(&path.to_str().unwrap(), &playlist) { + data.stream_manager.do_send(ProcessMessage(playlist.clone(), child)); } } else { return HttpResponse::BadRequest().finish(); @@ -259,7 +256,7 @@ async fn post_add_favorite(claims: Claims, body: web::Json) } } -async fn create_thumbnails() { +fn create_thumbnails() { let thumbs = &dotenv::var("THUMBNAILS").expect("THUMBNAILS not defined"); let thumbnail_directory: &Path = Path::new(thumbs); @@ -314,13 +311,10 @@ async fn create_thumbnails() { println!("Finished"); } -#[actix_rt::main] -async fn main() -> std::io::Result<()> { - create_thumbnails().await; +fn main() -> std::io::Result<()> { + create_thumbnails(); - let stream_manager = Arc::new(Mutex::new(VideoStreamManager::new())); - let stream_manager_copy = stream_manager.clone(); - tokio::spawn(async move { + std::thread::spawn(|| { let (wtx, wrx) = channel(); let mut watcher = watcher(wtx, std::time::Duration::from_secs(10)).unwrap(); watcher @@ -328,24 +322,24 @@ async fn main() -> std::io::Result<()> { .unwrap(); loop { - if let Ok(mut manager) = stream_manager_copy.lock() { - manager.check_for_finished_streams(); - } - let ev = wrx.recv_timeout(std::time::Duration::from_secs(10)); if let Ok(event) = ev { match event { - DebouncedEvent::Create(_) => create_thumbnails().await, - DebouncedEvent::Rename(_, _) => create_thumbnails().await, + DebouncedEvent::Create(_) => create_thumbnails(), + DebouncedEvent::Rename(_, _) => create_thumbnails(), _ => continue, }; } } }); + let system = actix::System::new("Fileserver"); + let act = StreamActor {}.start(); + let app_data = web::Data::new(AppState { - stream_manager: stream_manager.clone(), + stream_manager: Arc::new(act), }); + HttpServer::new(move || { App::new() .service(login) @@ -359,12 +353,13 @@ async fn main() -> std::io::Result<()> { .service(post_add_favorite) .app_data(app_data.clone()) }) - .bind(dotenv::var("BIND_URL").unwrap())? - .bind("localhost:8088")? - .run() - .await + .bind(dotenv::var("BIND_URL").unwrap())? + .bind("localhost:8088")? + .run(); + + system.run() } struct AppState { - stream_manager: Arc>, + stream_manager: Arc>, } diff --git a/src/video.rs b/src/video.rs index 46f60ab..5797103 100644 --- a/src/video.rs +++ b/src/video.rs @@ -1,36 +1,40 @@ -use std::collections::HashMap; use std::io::Result; use std::path::Path; -use std::process::{Child, Command, Stdio}; +use std::process::{Child, Command, ExitStatus, Stdio}; + +use actix::prelude::*; // ffmpeg -i test.mp4 -c:v h264 -flags +cgop -g 30 -hls_time 3 out.m3u8 // ffmpeg -i "filename.mp4" -preset veryfast -c:v libx264 -f hls -hls_list_size 100 -hls_time 2 -crf 24 -vf scale=1080:-2,setsar=1:1 attempt/vid_out.m3u8 -pub(crate) struct VideoStreamManager { - streams: HashMap, +pub struct StreamActor; + +impl Actor for StreamActor { + type Context = Context; } -impl VideoStreamManager { - pub(crate) fn new() -> VideoStreamManager { - VideoStreamManager { - streams: HashMap::new(), - } - } +pub struct ProcessMessage(pub String, pub Child); - pub fn track_stream(&mut self, playlist_path: &dyn ToString, child_process: Child) { - println!("Tracking process for: {:?}", playlist_path.to_string()); - self.streams - .insert(playlist_path.to_string(), child_process); - } +impl Message for ProcessMessage { + type Result = Result; +} - pub fn check_for_finished_streams(&mut self) { - self.streams.retain(|playlist_path, process| { - let is_done = process.try_wait().map_or(false, |status| status.is_some()); - if is_done { - println!("Removing process: {:?}", playlist_path) - } - !is_done - }); +impl Handler for StreamActor { + type Result = Result; + + fn handle(&mut self, msg: ProcessMessage, _ctx: &mut Self::Context) -> Self::Result { + println!("Message received"); + let mut process = msg.1; + let result = process.wait(); + + println!( + "Finished waiting for: {:?}. Code: {:?}", + msg.0, + result + .as_ref() + .map_or(-1, |status| status.code().unwrap_or(-1)) + ); + result } }