#[macro_use] extern crate diesel; extern crate rayon; use crate::auth::login; use database::{DbError, DbErrorKind, FavoriteDao, SqliteFavoriteDao, SqliteUserDao, UserDao}; use futures::stream::StreamExt; use std::io::prelude::*; use std::path::{Path, PathBuf}; use std::sync::mpsc::channel; use std::sync::Arc; use std::{env, fs::File}; use actix::{Actor, Addr}; use actix_files::NamedFile; use actix_multipart as mp; use actix_web::{ delete, error::BlockingError, get, post, put, web::{self, BufMut, BytesMut}, App, HttpServer, Responder, }; use actix_web::{ middleware, web::{HttpRequest, HttpResponse, Json}, }; use notify::{watcher, DebouncedEvent, RecursiveMode, Watcher}; use rayon::prelude::*; use serde::Serialize; use data::{AddFavoriteRequest, ThumbnailRequest}; use log::{debug, error, info}; use crate::data::Claims; use crate::files::{is_image_or_video, is_valid_path, list_files}; use crate::video::*; mod auth; mod data; mod database; mod files; mod video; #[post("/photos")] async fn list_photos(_claims: Claims, req: Json) -> impl Responder { info!("{}", req.path); let path = &req.path; if let Some(path) = is_valid_path(path) { let files = list_files(path).unwrap_or_default(); let photos = &files .iter() .filter(|f| !f.extension().unwrap_or_default().is_empty()) .map(|f| f.to_str().unwrap().to_string()) .collect::>(); let dirs = &files .iter() .filter(|f| f.extension().unwrap_or_default().is_empty()) .map(|f| f.to_str().unwrap().to_string()) .collect::>(); HttpResponse::Ok().json(PhotosResponse { photos, dirs }) } else { error!("Bad photos request: {}", req.path); HttpResponse::BadRequest().finish() } } #[derive(Serialize)] struct PhotosResponse<'a> { photos: &'a [String], dirs: &'a [String], } #[get("/image")] async fn get_image( _claims: Claims, request: HttpRequest, req: web::Query, ) -> impl Responder { if let Some(path) = is_valid_path(&req.path) { if req.size.is_some() { let thumbs = dotenv::var("THUMBNAILS").unwrap(); let relative_path = path .strip_prefix(dotenv::var("BASE_PATH").unwrap()) .expect("Error stripping prefix"); let thumb_path = Path::new(&thumbs).join(relative_path); debug!("{:?}", thumb_path); if let Ok(file) = NamedFile::open(&thumb_path) { file.into_response(&request).unwrap() } else { HttpResponse::NotFound().finish() } } else if let Ok(file) = NamedFile::open(path) { file.into_response(&request).unwrap() } else { HttpResponse::NotFound().finish() } } else { error!("Bad photos request: {}", req.path); HttpResponse::BadRequest().finish() } } #[post("/image")] async fn upload_image(_: Claims, mut payload: mp::Multipart) -> impl Responder { let mut file_content: BytesMut = BytesMut::new(); let mut file_name: Option = None; let mut file_path: Option = None; while let Some(Ok(mut part)) = payload.next().await { if let Some(content_type) = part.content_disposition() { debug!("{:?}", content_type); if let Some(filename) = content_type.get_filename() { debug!("Name: {:?}", filename); file_name = Some(filename.to_string()); while let Some(Ok(data)) = part.next().await { file_content.put(data); } } else if content_type.get_name().map_or(false, |name| name == "path") { while let Some(Ok(data)) = part.next().await { if let Ok(path) = std::str::from_utf8(&data) { file_path = Some(path.to_string()) } } } } } let path = file_path.unwrap_or_else(|| dotenv::var("BASE_PATH").unwrap()); if !file_content.is_empty() { let full_path = PathBuf::from(&path).join(file_name.unwrap()); if let Some(full_path) = is_valid_path(full_path.to_str().unwrap_or("")) { if !full_path.is_file() && is_image_or_video(&full_path) { let mut file = File::create(full_path).unwrap(); file.write_all(&file_content).unwrap(); } else { error!("File already exists: {:?}", full_path); return HttpResponse::BadRequest().body("File already exists"); } } else { error!("Invalid path for upload: {:?}", full_path); return HttpResponse::BadRequest().body("Path was not valid"); } } else { return HttpResponse::BadRequest().body("No file body read"); } HttpResponse::Ok().finish() } #[post("/video/generate")] async fn generate_video( _claims: Claims, data: web::Data, body: web::Json, ) -> impl Responder { let filename = PathBuf::from(&body.path); if let Some(name) = filename.file_stem() { let filename = name.to_str().expect("Filename should convert to string"); let playlist = format!("tmp/{}.m3u8", filename); if let Some(path) = is_valid_path(&body.path) { if let Ok(child) = create_playlist(&path.to_str().unwrap(), &playlist).await { data.stream_manager .do_send(ProcessMessage(playlist.clone(), child)); } } else { return HttpResponse::BadRequest().finish(); } HttpResponse::Ok().json(playlist) } else { error!("Unable to get file name: {:?}", filename); HttpResponse::BadRequest().finish() } } #[get("/video/stream")] async fn stream_video( request: HttpRequest, _: Claims, path: web::Query, ) -> impl Responder { let playlist = &path.path; debug!("Playlist: {}", playlist); // Extract video playlist dir to dotenv if !playlist.starts_with("tmp") && is_valid_path(playlist) != None { HttpResponse::BadRequest().finish() } else if let Ok(file) = NamedFile::open(playlist) { file.into_response(&request).unwrap() } else { HttpResponse::NotFound().finish() } } #[get("/video/{path}")] async fn get_video_part( request: HttpRequest, _: Claims, path: web::Path, ) -> impl Responder { let part = &path.path; debug!("Video part: {}", part); if let Ok(file) = NamedFile::open(String::from("tmp/") + part) { file.into_response(&request).unwrap() } else { error!("Video part not found: tmp/{}", part); HttpResponse::NotFound().finish() } } #[get("image/favorites")] async fn favorites( claims: Claims, favorites_dao: web::Data>, ) -> impl Responder { let favorites = web::block(move || favorites_dao.get_favorites(claims.sub.parse::().unwrap())) .await .unwrap() .into_iter() .map(|favorite| favorite.path) .collect::>(); HttpResponse::Ok().json(PhotosResponse { photos: &favorites, dirs: &Vec::new(), }) } #[put("image/favorites")] async fn put_add_favorite( claims: Claims, body: web::Json, favorites_dao: web::Data>, ) -> impl Responder { if let Ok(user_id) = claims.sub.parse::() { let path = body.path.clone(); match web::block::<_, usize, DbError>(move || favorites_dao.add_favorite(user_id, &path)) .await { Err(BlockingError::Error(e)) if e.kind == DbErrorKind::AlreadyExists => { debug!("Favorite: {} exists for user: {}", &body.path, user_id); HttpResponse::Ok() } Err(e) => { info!("{:?} {}. for user: {}", e, body.path, user_id); HttpResponse::BadRequest() } Ok(_) => { debug!("Adding favorite \"{}\" for userid: {}", body.path, user_id); HttpResponse::Created() } } } else { error!("Unable to parse sub as i32: {}", claims.sub); HttpResponse::BadRequest() } } #[delete("image/favorites")] async fn delete_favorite( claims: Claims, body: web::Query, favorites_dao: web::Data>, ) -> impl Responder { if let Ok(user_id) = claims.sub.parse::() { let path = body.path.clone(); web::block::<_, _, String>(move || { favorites_dao.remove_favorite(user_id, path); Ok(()) }) .await .unwrap(); debug!( "Removing favorite \"{}\" for userid: {}", body.path, user_id ); HttpResponse::Ok() } else { error!("Unable to parse sub as i32: {}", claims.sub); HttpResponse::BadRequest() } } fn create_thumbnails() { let thumbs = &dotenv::var("THUMBNAILS").expect("THUMBNAILS not defined"); let thumbnail_directory: &Path = Path::new(thumbs); let images = PathBuf::from(dotenv::var("BASE_PATH").unwrap()); walkdir::WalkDir::new(&images) .into_iter() .collect::>>() .into_par_iter() .filter_map(|entry| entry.ok()) .filter(|entry| entry.file_type().is_file()) .filter(|entry| { debug!("{:?}", entry.path()); if let Some(ext) = entry .path() .extension() .and_then(|ext| ext.to_str().map(|ext| ext.to_lowercase())) { if ext == "mp4" || ext == "mov" { let relative_path = &entry.path().strip_prefix(&images).unwrap(); let thumb_path = Path::new(thumbnail_directory).join(relative_path); std::fs::create_dir_all(&thumb_path.parent().unwrap()) .expect("Error creating directory"); debug!("Generating video thumbnail: {:?}", thumb_path); generate_video_thumbnail(entry.path(), &thumb_path); false } else { ext == "jpg" || ext == "jpeg" || ext == "png" || ext == "nef" } } else { error!("Unable to get extension for file: {:?}", entry.path()); false } }) .filter(|entry| { let path = entry.path(); let relative_path = &path.strip_prefix(&images).unwrap(); let thumb_path = Path::new(thumbnail_directory).join(relative_path); !thumb_path.exists() }) .map(|entry| (image::open(entry.path()), entry.path().to_path_buf())) .filter(|(img, path)| { if let Err(e) = img { error!("Unable to open image: {:?}. {}", path, e); } img.is_ok() }) .map(|(img, path)| (img.unwrap(), path)) .map(|(image, path)| (image.thumbnail(200, u32::MAX), path)) .map(|(image, path)| { let relative_path = &path.strip_prefix(&images).unwrap(); let thumb_path = Path::new(thumbnail_directory).join(relative_path); std::fs::create_dir_all(&thumb_path.parent().unwrap()) .expect("There was an issue creating directory"); debug!("Saving thumbnail: {:?}", thumb_path); image.save(thumb_path).expect("Failure saving thumbnail"); }) .for_each(drop); debug!("Finished"); } fn main() -> std::io::Result<()> { dotenv::dotenv().ok(); env_logger::init(); create_thumbnails(); std::thread::spawn(|| { let (wtx, wrx) = channel(); let mut watcher = watcher(wtx, std::time::Duration::from_secs(10)).unwrap(); watcher .watch(dotenv::var("BASE_PATH").unwrap(), RecursiveMode::Recursive) .unwrap(); loop { let ev = wrx.recv(); if let Ok(event) = ev { match event { DebouncedEvent::Create(_) => create_thumbnails(), DebouncedEvent::Rename(orig, _) | DebouncedEvent::Write(orig) => { let image_base_path = PathBuf::from(env::var("BASE_PATH").unwrap()); let image_relative = orig.strip_prefix(&image_base_path).unwrap(); if let Ok(old_thumbnail) = env::var("THUMBNAILS").map(PathBuf::from).map(|mut base| { base.push(image_relative); base }) { if let Err(e) = std::fs::remove_file(&old_thumbnail) { error!( "Error removing thumbnail: {}\n{}", old_thumbnail.display(), e ); } else { info!("Deleted moved thumbnail: {}", old_thumbnail.display()); create_thumbnails(); } } } _ => continue, }; } } }); let system = actix::System::new(); let act = StreamActor {}.start(); let app_data = web::Data::new(AppState { stream_manager: Arc::new(act), }); HttpServer::new(move || { let user_dao = SqliteUserDao::new(); let favorites_dao = SqliteFavoriteDao::new(); App::new() .wrap(middleware::Logger::default()) .service(web::resource("/login").route(web::post().to(login))) .service(list_photos) .service(get_image) .service(upload_image) .service(generate_video) .service(stream_video) .service(get_video_part) .service(favorites) .service(put_add_favorite) .service(delete_favorite) .app_data(app_data.clone()) .data::>(Box::new(user_dao)) .data::>(Box::new(favorites_dao)) }) .bind(dotenv::var("BIND_URL").unwrap())? .bind("localhost:8088")? .run(); system.run() } struct AppState { stream_manager: Arc>, }