feature/video-stream-manager #4

Merged
cameron merged 3 commits from feature/video-stream-manager into master 2021-02-13 18:18:15 +00:00
3 changed files with 85 additions and 28 deletions
Showing only changes of commit b595bdd642 - Show all commits

View File

@@ -1,9 +1,9 @@
use std::str::FromStr; use std::str::FromStr;
use actix_web::{dev, Error, FromRequest, http::header, HttpRequest};
use actix_web::error::ErrorUnauthorized; use actix_web::error::ErrorUnauthorized;
use actix_web::{dev, http::header, Error, FromRequest, HttpRequest};
use futures::future::{err, ok, Ready}; use futures::future::{err, ok, Ready};
use jsonwebtoken::{Algorithm, decode, DecodingKey, Validation}; use jsonwebtoken::{decode, Algorithm, DecodingKey, Validation};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[derive(Serialize)] #[derive(Serialize)]

View File

@@ -2,21 +2,24 @@
extern crate diesel; extern crate diesel;
extern crate rayon; extern crate rayon;
use std::fs::File;
use std::io::prelude::*;
use std::path::{Path, PathBuf};
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};
use actix_files::NamedFile; use actix_files::NamedFile;
use actix_multipart as mp; use actix_multipart as mp;
use actix_web::web::{HttpRequest, HttpResponse, Json}; use actix_web::web::{HttpRequest, HttpResponse, Json};
use actix_web::{get, post, web, App, HttpServer, Responder}; use actix_web::{get, post, web, App, HttpServer, Responder};
use chrono::{Duration, Utc}; use chrono::{Duration, Utc};
use data::{AddFavoriteRequest, LoginRequest, ThumbnailRequest};
use futures::stream::StreamExt; use futures::stream::StreamExt;
use jsonwebtoken::{encode, EncodingKey, Header}; use jsonwebtoken::{encode, EncodingKey, Header};
use notify::{watcher, DebouncedEvent, RecursiveMode, Watcher}; use notify::{watcher, DebouncedEvent, RecursiveMode, Watcher};
use rayon::prelude::*; use rayon::prelude::*;
use serde::Serialize; use serde::Serialize;
use std::fs::File;
use std::io::prelude::*; use data::{AddFavoriteRequest, LoginRequest, ThumbnailRequest};
use std::path::{Path, PathBuf};
use std::sync::mpsc::channel;
use crate::data::{secret_key, Claims, CreateAccountRequest, Token}; use crate::data::{secret_key, Claims, CreateAccountRequest, Token};
use crate::database::{add_favorite, create_user, get_favorites, get_user, user_exists}; use crate::database::{add_favorite, create_user, get_favorites, get_user, user_exists};
@@ -171,14 +174,24 @@ async fn upload_image(_: Claims, mut payload: mp::Multipart) -> impl Responder {
} }
#[post("/video/generate")] #[post("/video/generate")]
async fn generate_video(_claims: Claims, body: web::Json<ThumbnailRequest>) -> impl Responder { async fn generate_video(
_claims: Claims,
data: web::Data<AppState>,
body: web::Json<ThumbnailRequest>,
) -> impl Responder {
let filename = PathBuf::from(&body.path); let filename = PathBuf::from(&body.path);
if let Some(name) = filename.file_stem() { if let Some(name) = filename.file_stem() {
let filename = name.to_str().expect("Filename should convert to string"); let filename = name.to_str().expect("Filename should convert to string");
let playlist = format!("tmp/{}.m3u8", filename); let playlist = format!("tmp/{}.m3u8", filename);
if let Some(path) = is_valid_path(&body.path) { if let Some(path) = is_valid_path(&body.path) {
create_playlist(&path.to_str().unwrap(), &playlist); if let Ok(mut stream) = data.stream_manager.lock() {
if let Ok(child) = create_playlist(&path.to_str().unwrap(), &playlist) {
stream.track_stream(&playlist, child)
}
} else {
let _ = create_playlist(&path.to_str().unwrap(), &playlist);
}
} else { } else {
return HttpResponse::BadRequest().finish(); return HttpResponse::BadRequest().finish();
} }
@@ -305,7 +318,9 @@ async fn create_thumbnails() {
async fn main() -> std::io::Result<()> { async fn main() -> std::io::Result<()> {
create_thumbnails().await; create_thumbnails().await;
tokio::spawn(async { let stream_manager = Arc::new(Mutex::new(VideoStreamManager::new()));
let stream_manager_copy = stream_manager.clone();
tokio::spawn(async move {
let (wtx, wrx) = channel(); let (wtx, wrx) = channel();
let mut watcher = watcher(wtx, std::time::Duration::from_secs(10)).unwrap(); let mut watcher = watcher(wtx, std::time::Duration::from_secs(10)).unwrap();
watcher watcher
@@ -313,24 +328,25 @@ async fn main() -> std::io::Result<()> {
.unwrap(); .unwrap();
loop { loop {
let ev = wrx.recv_timeout(std::time::Duration::from_secs(5)); if let Ok(mut manager) = stream_manager_copy.lock() {
match ev { manager.check_for_finished_streams();
Ok(event) => { }
let ev = wrx.recv_timeout(std::time::Duration::from_secs(10));
if let Ok(event) = ev {
match event { match event {
DebouncedEvent::Create(_) => create_thumbnails().await, DebouncedEvent::Create(_) => create_thumbnails().await,
DebouncedEvent::Rename(_, _) => create_thumbnails().await, DebouncedEvent::Rename(_, _) => create_thumbnails().await,
_ => continue, _ => continue,
}; };
} }
Err(e) => {
println!("Event: {:?}", e);
// break;
}
}
} }
}); });
HttpServer::new(|| { let app_data = web::Data::new(AppState {
stream_manager: stream_manager.clone(),
});
HttpServer::new(move || {
App::new() App::new()
.service(login) .service(login)
.service(list_photos) .service(list_photos)
@@ -341,9 +357,14 @@ async fn main() -> std::io::Result<()> {
.service(get_video_part) .service(get_video_part)
.service(favorites) .service(favorites)
.service(post_add_favorite) .service(post_add_favorite)
.app_data(app_data.clone())
}) })
.bind(dotenv::var("BIND_URL").unwrap())? .bind(dotenv::var("BIND_URL").unwrap())?
.bind("localhost:8088")? .bind("localhost:8088")?
.run() .run()
.await .await
} }
struct AppState {
stream_manager: Arc<Mutex<VideoStreamManager>>,
}

View File

@@ -1,13 +1,43 @@
use std::collections::HashMap;
use std::io::Result;
use std::path::Path; use std::path::Path;
use std::process::Command; use std::process::{Child, Command, Stdio};
// ffmpeg -i test.mp4 -c:v h264 -flags +cgop -g 30 -hls_time 3 out.m3u8 // ffmpeg -i test.mp4 -c:v h264 -flags +cgop -g 30 -hls_time 3 out.m3u8
// ffmpeg -i "filename.mp4" -preset veryfast -c:v libx264 -f hls -hls_list_size 100 -hls_time 2 -crf 24 -vf scale=1080:-2,setsar=1:1 attempt/vid_out.m3u8 // ffmpeg -i "filename.mp4" -preset veryfast -c:v libx264 -f hls -hls_list_size 100 -hls_time 2 -crf 24 -vf scale=1080:-2,setsar=1:1 attempt/vid_out.m3u8
pub fn create_playlist(video_path: &str, playlist_file: &str) { pub(crate) struct VideoStreamManager {
streams: HashMap<String, Child>,
}
impl VideoStreamManager {
pub(crate) fn new() -> VideoStreamManager {
VideoStreamManager {
streams: HashMap::new(),
}
}
pub fn track_stream(&mut self, playlist_path: &dyn ToString, child_process: Child) {
println!("Tracking process for: {:?}", playlist_path.to_string());
self.streams
.insert(playlist_path.to_string(), child_process);
}
pub fn check_for_finished_streams(&mut self) {
self.streams.retain(|playlist_path, process| {
let is_done = process.try_wait().map_or(false, |status| status.is_some());
if is_done {
println!("Removing process: {:?}", playlist_path)
}
!is_done
});
}
}
pub fn create_playlist(video_path: &str, playlist_file: &str) -> Result<Child> {
if Path::new(playlist_file).exists() { if Path::new(playlist_file).exists() {
println!("Playlist already exists: {}", playlist_file); println!("Playlist already exists: {}", playlist_file);
return; return Err(std::io::Error::from(std::io::ErrorKind::AlreadyExists));
} }
let result = Command::new("ffmpeg") let result = Command::new("ffmpeg")
@@ -26,16 +56,22 @@ pub fn create_playlist(video_path: &str, playlist_file: &str) {
.arg("-vf") .arg("-vf")
.arg("scale=1080:-2,setsar=1:1") .arg("scale=1080:-2,setsar=1:1")
.arg(playlist_file) .arg(playlist_file)
.stdout(Stdio::null())
.stderr(Stdio::null())
.spawn(); .spawn();
let start_time = std::time::Instant::now(); let start_time = std::time::Instant::now();
loop { loop {
std::thread::sleep(std::time::Duration::from_secs(1)); std::thread::sleep(std::time::Duration::from_secs(1));
if Path::new(playlist_file).exists() || std::time::Instant::now() - start_time > std::time::Duration::from_secs(5) { if Path::new(playlist_file).exists()
|| std::time::Instant::now() - start_time > std::time::Duration::from_secs(5)
{
break; break;
} }
} }
result
} }
pub fn generate_video_thumbnail(path: &Path, destination: &Path) { pub fn generate_video_thumbnail(path: &Path, destination: &Path) {