diff --git a/.gitignore b/.gitignore index 6234c03..b4484c3 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,12 @@ database/target *.db .env + +# Default ignored files +.idea/shelf/ +.idea/workspace.xml +# Datasource local storage ignored files +.idea/dataSources* +.idea/dataSources.local.xml +# Editor-based HTTP Client requests +.idea/httpRequests/ diff --git a/.idea/image-api.iml b/.idea/image-api.iml new file mode 100644 index 0000000..c254557 --- /dev/null +++ b/.idea/image-api.iml @@ -0,0 +1,11 @@ + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..28a804d --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,6 @@ + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..dbc19f7 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 4580351..97ac18c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,5 +1,30 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +[[package]] +name = "actix" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1be241f88f3b1e7e9a3fbe3b5a8a0f6915b5a1d7ee0d9a248d3376d01068cc60" +dependencies = [ + "actix-rt", + "actix_derive", + "bitflags", + "bytes 0.5.6", + "crossbeam-channel 0.4.4", + "derive_more", + "futures-channel", + "futures-util", + "log", + "once_cell", + "parking_lot", + "pin-project 0.4.27", + "smallvec", + "tokio", + "tokio-util", + "trust-dns-proto", + "trust-dns-resolver", +] + [[package]] name = "actix-codec" version = "0.3.0" @@ -313,6 +338,17 @@ dependencies = [ "syn", ] +[[package]] +name = "actix_derive" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b95aceadaf327f18f0df5962fedc1bde2f870566a0b9f65c89508a3b1f79334c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "addr2line" version = "0.14.0" @@ -642,6 +678,16 @@ dependencies = [ "cfg-if 1.0.0", ] +[[package]] +name = "crossbeam-channel" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b153fe7cbef478c567df0f972e02e6d736db11affe43dfc9c56a9374d1adfb87" +dependencies = [ + "crossbeam-utils 0.7.2", + "maybe-uninit", +] + [[package]] name = "crossbeam-channel" version = "0.5.0" @@ -649,7 +695,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dca26ee1f8d361640700bde38b2c37d8c22b3ce2d360e1fc1c74ea4b0aa7d775" dependencies = [ "cfg-if 1.0.0", - "crossbeam-utils", + "crossbeam-utils 0.8.0", ] [[package]] @@ -660,7 +706,7 @@ checksum = "94af6efb46fef72616855b036a624cf27ba656ffc9be1b9a3c931cfc7749a9a9" dependencies = [ "cfg-if 1.0.0", "crossbeam-epoch", - "crossbeam-utils", + "crossbeam-utils 0.8.0", ] [[package]] @@ -671,12 +717,23 @@ checksum = "ec0f606a85340376eef0d6d8fec399e6d4a544d648386c6645eb6d0653b27d9f" dependencies = [ "cfg-if 1.0.0", "const_fn", - "crossbeam-utils", + "crossbeam-utils 0.8.0", "lazy_static", "memoffset", "scopeguard", ] +[[package]] +name = "crossbeam-utils" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8" +dependencies = [ + "autocfg", + "cfg-if 0.1.10", + "lazy_static", +] + [[package]] name = "crossbeam-utils" version = "0.8.0" @@ -1146,6 +1203,7 @@ dependencies = [ name = "image-api" version = "0.1.0" dependencies = [ + "actix", "actix-cors", "actix-files", "actix-multipart", @@ -1357,6 +1415,12 @@ version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08" +[[package]] +name = "maybe-uninit" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00" + [[package]] name = "memchr" version = "2.3.4" @@ -1824,9 +1888,9 @@ version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ab346ac5921dc62ffa9f89b7a773907511cdfa5490c572ae9be1be33e8afa4a" dependencies = [ - "crossbeam-channel", + "crossbeam-channel 0.5.0", "crossbeam-deque", - "crossbeam-utils", + "crossbeam-utils 0.8.0", "lazy_static", "num_cpus", ] @@ -2256,6 +2320,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6d7ad61edd59bfcc7e80dababf0f4aed2e6d5e0ba1659356ae889752dfc12ff" dependencies = [ "bytes 0.5.6", + "fnv", "futures-core", "iovec", "lazy_static", @@ -2277,6 +2342,7 @@ checksum = "be8242891f2b6cbef26a2d7e8605133c2c554cd35b3e4948ea892d6d68436499" dependencies = [ "bytes 0.5.6", "futures-core", + "futures-io", "futures-sink", "log", "pin-project-lite 0.1.11", diff --git a/Cargo.toml b/Cargo.toml index 3136c5b..a1565e5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,11 +7,12 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +actix = "0.10" actix-web = "3" actix-rt = "1" actix-files = "0.4" actix-multipart = "0.3.0" -actix-cors="0.5" +actix-cors = "0.5" futures = "0.3.5" jsonwebtoken = "7.2.0" serde = "1" diff --git a/src/data/mod.rs b/src/data/mod.rs index 922d083..92a1404 100644 --- a/src/data/mod.rs +++ b/src/data/mod.rs @@ -1,9 +1,9 @@ use std::str::FromStr; -use actix_web::{dev, Error, FromRequest, http::header, HttpRequest}; use actix_web::error::ErrorUnauthorized; +use actix_web::{dev, http::header, Error, FromRequest, HttpRequest}; use futures::future::{err, ok, Ready}; -use jsonwebtoken::{Algorithm, decode, DecodingKey, Validation}; +use jsonwebtoken::{decode, Algorithm, DecodingKey, Validation}; use serde::{Deserialize, Serialize}; #[derive(Serialize)] diff --git a/src/main.rs b/src/main.rs index e2e2b12..a7d9b09 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,23 +2,27 @@ extern crate diesel; extern crate rayon; -use actix_files::NamedFile; -use actix_multipart as mp; -use actix_web::web::{HttpRequest, HttpResponse, Json}; -use actix_web::{get, post, web, App, HttpServer, Responder}; -use chrono::{Duration, Utc}; -use data::{AddFavoriteRequest, LoginRequest, ThumbnailRequest}; -use futures::stream::StreamExt; -use jsonwebtoken::{encode, EncodingKey, Header}; -use notify::{watcher, DebouncedEvent, RecursiveMode, Watcher}; -use rayon::prelude::*; -use serde::Serialize; use std::fs::File; use std::io::prelude::*; use std::path::{Path, PathBuf}; +use std::sync::{Arc}; use std::sync::mpsc::channel; -use crate::data::{secret_key, Claims, CreateAccountRequest, Token}; +use actix::{Actor, Addr}; +use actix_files::NamedFile; +use actix_multipart as mp; +use actix_web::{App, get, HttpServer, post, Responder, web}; +use actix_web::web::{HttpRequest, HttpResponse, Json}; +use chrono::{Duration, Utc}; +use futures::stream::StreamExt; +use jsonwebtoken::{encode, EncodingKey, Header}; +use notify::{DebouncedEvent, RecursiveMode, watcher, Watcher}; +use rayon::prelude::*; +use serde::Serialize; + +use data::{AddFavoriteRequest, LoginRequest, ThumbnailRequest}; + +use crate::data::{Claims, CreateAccountRequest, secret_key, Token}; use crate::database::{add_favorite, create_user, get_favorites, get_user, user_exists}; use crate::files::{is_valid_path, list_files}; use crate::video::*; @@ -56,7 +60,7 @@ async fn login(creds: Json) -> impl Responder { &claims, &EncodingKey::from_secret(secret_key().as_bytes()), ) - .unwrap(); + .unwrap(); HttpResponse::Ok().json(Token { token: &token }) } else { HttpResponse::NotFound().finish() @@ -171,14 +175,20 @@ async fn upload_image(_: Claims, mut payload: mp::Multipart) -> impl Responder { } #[post("/video/generate")] -async fn generate_video(_claims: Claims, body: web::Json) -> impl Responder { +async fn generate_video( + _claims: Claims, + data: web::Data, + body: web::Json, +) -> impl Responder { let filename = PathBuf::from(&body.path); if let Some(name) = filename.file_stem() { let filename = name.to_str().expect("Filename should convert to string"); let playlist = format!("tmp/{}.m3u8", filename); if let Some(path) = is_valid_path(&body.path) { - create_playlist(&path.to_str().unwrap(), &playlist); + if let Ok(child) = create_playlist(&path.to_str().unwrap(), &playlist) { + data.stream_manager.do_send(ProcessMessage(playlist.clone(), child)); + } } else { return HttpResponse::BadRequest().finish(); } @@ -246,7 +256,7 @@ async fn post_add_favorite(claims: Claims, body: web::Json) } } -async fn create_thumbnails() { +fn create_thumbnails() { let thumbs = &dotenv::var("THUMBNAILS").expect("THUMBNAILS not defined"); let thumbnail_directory: &Path = Path::new(thumbs); @@ -301,11 +311,10 @@ async fn create_thumbnails() { println!("Finished"); } -#[actix_rt::main] -async fn main() -> std::io::Result<()> { - create_thumbnails().await; +fn main() -> std::io::Result<()> { + create_thumbnails(); - tokio::spawn(async { + std::thread::spawn(|| { let (wtx, wrx) = channel(); let mut watcher = watcher(wtx, std::time::Duration::from_secs(10)).unwrap(); watcher @@ -313,24 +322,25 @@ async fn main() -> std::io::Result<()> { .unwrap(); loop { - let ev = wrx.recv_timeout(std::time::Duration::from_secs(5)); - match ev { - Ok(event) => { - match event { - DebouncedEvent::Create(_) => create_thumbnails().await, - DebouncedEvent::Rename(_, _) => create_thumbnails().await, - _ => continue, - }; - } - Err(e) => { - println!("Event: {:?}", e); - // break; - } + let ev = wrx.recv_timeout(std::time::Duration::from_secs(10)); + if let Ok(event) = ev { + match event { + DebouncedEvent::Create(_) => create_thumbnails(), + DebouncedEvent::Rename(_, _) => create_thumbnails(), + _ => continue, + }; } } }); - HttpServer::new(|| { + let system = actix::System::new("Fileserver"); + let act = StreamActor {}.start(); + + let app_data = web::Data::new(AppState { + stream_manager: Arc::new(act), + }); + + HttpServer::new(move || { App::new() .service(login) .service(list_photos) @@ -341,9 +351,15 @@ async fn main() -> std::io::Result<()> { .service(get_video_part) .service(favorites) .service(post_add_favorite) + .app_data(app_data.clone()) }) - .bind(dotenv::var("BIND_URL").unwrap())? - .bind("localhost:8088")? - .run() - .await + .bind(dotenv::var("BIND_URL").unwrap())? + .bind("localhost:8088")? + .run(); + + system.run() +} + +struct AppState { + stream_manager: Arc>, } diff --git a/src/video.rs b/src/video.rs index 98027b9..5797103 100644 --- a/src/video.rs +++ b/src/video.rs @@ -1,13 +1,47 @@ +use std::io::Result; use std::path::Path; -use std::process::Command; +use std::process::{Child, Command, ExitStatus, Stdio}; + +use actix::prelude::*; // ffmpeg -i test.mp4 -c:v h264 -flags +cgop -g 30 -hls_time 3 out.m3u8 // ffmpeg -i "filename.mp4" -preset veryfast -c:v libx264 -f hls -hls_list_size 100 -hls_time 2 -crf 24 -vf scale=1080:-2,setsar=1:1 attempt/vid_out.m3u8 -pub fn create_playlist(video_path: &str, playlist_file: &str) { +pub struct StreamActor; + +impl Actor for StreamActor { + type Context = Context; +} + +pub struct ProcessMessage(pub String, pub Child); + +impl Message for ProcessMessage { + type Result = Result; +} + +impl Handler for StreamActor { + type Result = Result; + + fn handle(&mut self, msg: ProcessMessage, _ctx: &mut Self::Context) -> Self::Result { + println!("Message received"); + let mut process = msg.1; + let result = process.wait(); + + println!( + "Finished waiting for: {:?}. Code: {:?}", + msg.0, + result + .as_ref() + .map_or(-1, |status| status.code().unwrap_or(-1)) + ); + result + } +} + +pub fn create_playlist(video_path: &str, playlist_file: &str) -> Result { if Path::new(playlist_file).exists() { println!("Playlist already exists: {}", playlist_file); - return; + return Err(std::io::Error::from(std::io::ErrorKind::AlreadyExists)); } let result = Command::new("ffmpeg") @@ -26,16 +60,22 @@ pub fn create_playlist(video_path: &str, playlist_file: &str) { .arg("-vf") .arg("scale=1080:-2,setsar=1:1") .arg(playlist_file) + .stdout(Stdio::null()) + .stderr(Stdio::null()) .spawn(); let start_time = std::time::Instant::now(); loop { std::thread::sleep(std::time::Duration::from_secs(1)); - if Path::new(playlist_file).exists() || std::time::Instant::now() - start_time > std::time::Duration::from_secs(5) { + if Path::new(playlist_file).exists() + || std::time::Instant::now() - start_time > std::time::Duration::from_secs(5) + { break; } } + + result } pub fn generate_video_thumbnail(path: &Path, destination: &Path) {