Use an Actor for the Stream watching
All checks were successful
Core Repos/ImageApi/pipeline/pr-master This commit looks good

This commit is contained in:
Cameron Cordes
2021-02-11 20:39:07 -05:00
parent 45b4f0cd72
commit 11d1e9600a
4 changed files with 125 additions and 59 deletions

View File

@@ -5,23 +5,24 @@ extern crate rayon;
use std::fs::File;
use std::io::prelude::*;
use std::path::{Path, PathBuf};
use std::sync::{Arc};
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};
use actix::{Actor, Addr};
use actix_files::NamedFile;
use actix_multipart as mp;
use actix_web::{App, get, HttpServer, post, Responder, web};
use actix_web::web::{HttpRequest, HttpResponse, Json};
use actix_web::{get, post, web, App, HttpServer, Responder};
use chrono::{Duration, Utc};
use futures::stream::StreamExt;
use jsonwebtoken::{encode, EncodingKey, Header};
use notify::{watcher, DebouncedEvent, RecursiveMode, Watcher};
use notify::{DebouncedEvent, RecursiveMode, watcher, Watcher};
use rayon::prelude::*;
use serde::Serialize;
use data::{AddFavoriteRequest, LoginRequest, ThumbnailRequest};
use crate::data::{secret_key, Claims, CreateAccountRequest, Token};
use crate::data::{Claims, CreateAccountRequest, secret_key, Token};
use crate::database::{add_favorite, create_user, get_favorites, get_user, user_exists};
use crate::files::{is_valid_path, list_files};
use crate::video::*;
@@ -59,7 +60,7 @@ async fn login(creds: Json<LoginRequest>) -> impl Responder {
&claims,
&EncodingKey::from_secret(secret_key().as_bytes()),
)
.unwrap();
.unwrap();
HttpResponse::Ok().json(Token { token: &token })
} else {
HttpResponse::NotFound().finish()
@@ -185,12 +186,8 @@ async fn generate_video(
let filename = name.to_str().expect("Filename should convert to string");
let playlist = format!("tmp/{}.m3u8", filename);
if let Some(path) = is_valid_path(&body.path) {
if let Ok(mut stream) = data.stream_manager.lock() {
if let Ok(child) = create_playlist(&path.to_str().unwrap(), &playlist) {
stream.track_stream(&playlist, child)
}
} else {
let _ = create_playlist(&path.to_str().unwrap(), &playlist);
if let Ok(child) = create_playlist(&path.to_str().unwrap(), &playlist) {
data.stream_manager.do_send(ProcessMessage(playlist.clone(), child));
}
} else {
return HttpResponse::BadRequest().finish();
@@ -259,7 +256,7 @@ async fn post_add_favorite(claims: Claims, body: web::Json<AddFavoriteRequest>)
}
}
async fn create_thumbnails() {
fn create_thumbnails() {
let thumbs = &dotenv::var("THUMBNAILS").expect("THUMBNAILS not defined");
let thumbnail_directory: &Path = Path::new(thumbs);
@@ -314,13 +311,10 @@ async fn create_thumbnails() {
println!("Finished");
}
#[actix_rt::main]
async fn main() -> std::io::Result<()> {
create_thumbnails().await;
fn main() -> std::io::Result<()> {
create_thumbnails();
let stream_manager = Arc::new(Mutex::new(VideoStreamManager::new()));
let stream_manager_copy = stream_manager.clone();
tokio::spawn(async move {
std::thread::spawn(|| {
let (wtx, wrx) = channel();
let mut watcher = watcher(wtx, std::time::Duration::from_secs(10)).unwrap();
watcher
@@ -328,24 +322,24 @@ async fn main() -> std::io::Result<()> {
.unwrap();
loop {
if let Ok(mut manager) = stream_manager_copy.lock() {
manager.check_for_finished_streams();
}
let ev = wrx.recv_timeout(std::time::Duration::from_secs(10));
if let Ok(event) = ev {
match event {
DebouncedEvent::Create(_) => create_thumbnails().await,
DebouncedEvent::Rename(_, _) => create_thumbnails().await,
DebouncedEvent::Create(_) => create_thumbnails(),
DebouncedEvent::Rename(_, _) => create_thumbnails(),
_ => continue,
};
}
}
});
let system = actix::System::new("Fileserver");
let act = StreamActor {}.start();
let app_data = web::Data::new(AppState {
stream_manager: stream_manager.clone(),
stream_manager: Arc::new(act),
});
HttpServer::new(move || {
App::new()
.service(login)
@@ -359,12 +353,13 @@ async fn main() -> std::io::Result<()> {
.service(post_add_favorite)
.app_data(app_data.clone())
})
.bind(dotenv::var("BIND_URL").unwrap())?
.bind("localhost:8088")?
.run()
.await
.bind(dotenv::var("BIND_URL").unwrap())?
.bind("localhost:8088")?
.run();
system.run()
}
struct AppState {
stream_manager: Arc<Mutex<VideoStreamManager>>,
stream_manager: Arc<Addr<StreamActor>>,
}