#[macro_use] extern crate diesel; extern crate rayon; use actix_web_prom::PrometheusMetrics; use futures::stream::StreamExt; use lazy_static::lazy_static; use prometheus::{self, IntGauge}; use std::sync::{mpsc::channel, Arc}; use std::{collections::HashMap, io::prelude::*}; use std::{env, fs::File}; use std::{ io::ErrorKind, path::{Path, PathBuf}, }; use walkdir::{DirEntry, WalkDir}; use actix::prelude::*; use actix_files::NamedFile; use actix_multipart as mp; use actix_web::{ delete, error::BlockingError, get, middleware, post, put, web::{self, BufMut, BytesMut, HttpRequest, HttpResponse}, App, HttpServer, Responder, }; use notify::{watcher, DebouncedEvent, RecursiveMode, Watcher}; use rayon::prelude::*; use log::{debug, error, info}; use crate::auth::login; use crate::data::*; use crate::database::*; use crate::files::{is_image_or_video, is_valid_path}; use crate::video::*; mod auth; mod data; mod database; mod files; mod video; #[cfg(test)] mod testhelpers; lazy_static! { static ref IMAGE_GAUGE: IntGauge = IntGauge::new( "imageserver_image_total", "Count of the images on the server" ) .unwrap(); static ref VIDEO_GAUGE: IntGauge = IntGauge::new( "imageserver_video_total", "Count of the videos on the server" ) .unwrap(); } #[get("/image")] async fn get_image( _claims: Claims, request: HttpRequest, req: web::Query, ) -> impl Responder { if let Some(path) = is_valid_path(&req.path) { if req.size.is_some() { let thumbs = dotenv::var("THUMBNAILS").unwrap(); let relative_path = path .strip_prefix(dotenv::var("BASE_PATH").unwrap()) .expect("Error stripping prefix"); let thumb_path = Path::new(&thumbs).join(relative_path); debug!("{:?}", thumb_path); if let Ok(file) = NamedFile::open(&thumb_path) { file.into_response(&request).unwrap() } else { HttpResponse::NotFound().finish() } } else if let Ok(file) = NamedFile::open(path) { file.into_response(&request).unwrap() } else { HttpResponse::NotFound().finish() } } else { error!("Bad photos request: {}", req.path); HttpResponse::BadRequest().finish() } } #[get("/image/metadata")] async fn get_file_metadata(_: Claims, path: web::Query) -> impl Responder { match is_valid_path(&path.path) .ok_or_else(|| ErrorKind::InvalidData.into()) .and_then(File::open) .and_then(|file| file.metadata()) { Ok(metadata) => { let response: MetadataResponse = metadata.into(); HttpResponse::Ok().json(response) } Err(e) => { error!("Error getting metadata for file '{}': {:?}", path.path, e); HttpResponse::InternalServerError().finish() } } } #[post("/image")] async fn upload_image(_: Claims, mut payload: mp::Multipart) -> impl Responder { let mut file_content: BytesMut = BytesMut::new(); let mut file_name: Option = None; let mut file_path: Option = None; while let Some(Ok(mut part)) = payload.next().await { if let Some(content_type) = part.content_disposition() { debug!("{:?}", content_type); if let Some(filename) = content_type.get_filename() { debug!("Name: {:?}", filename); file_name = Some(filename.to_string()); while let Some(Ok(data)) = part.next().await { file_content.put(data); } } else if content_type.get_name().map_or(false, |name| name == "path") { while let Some(Ok(data)) = part.next().await { if let Ok(path) = std::str::from_utf8(&data) { file_path = Some(path.to_string()) } } } } } let path = file_path.unwrap_or_else(|| dotenv::var("BASE_PATH").unwrap()); if !file_content.is_empty() { let full_path = PathBuf::from(&path).join(file_name.unwrap()); if let Some(full_path) = is_valid_path(full_path.to_str().unwrap_or("")) { if !full_path.is_file() && is_image_or_video(&full_path) { let mut file = File::create(full_path).unwrap(); file.write_all(&file_content).unwrap(); } else { error!("File already exists: {:?}", full_path); return HttpResponse::BadRequest().body("File already exists"); } } else { error!("Invalid path for upload: {:?}", full_path); return HttpResponse::BadRequest().body("Path was not valid"); } } else { return HttpResponse::BadRequest().body("No file body read"); } HttpResponse::Ok().finish() } #[post("/video/generate")] async fn generate_video( _claims: Claims, data: web::Data, body: web::Json, ) -> impl Responder { let filename = PathBuf::from(&body.path); if let Some(name) = filename.file_stem() { let filename = name.to_str().expect("Filename should convert to string"); let playlist = format!("tmp/{}.m3u8", filename); if let Some(path) = is_valid_path(&body.path) { if let Ok(child) = create_playlist(path.to_str().unwrap(), &playlist).await { data.stream_manager .do_send(ProcessMessage(playlist.clone(), child)); } } else { return HttpResponse::BadRequest().finish(); } HttpResponse::Ok().json(playlist) } else { error!("Unable to get file name: {:?}", filename); HttpResponse::BadRequest().finish() } } #[get("/video/stream")] async fn stream_video( request: HttpRequest, _: Claims, path: web::Query, ) -> impl Responder { let playlist = &path.path; debug!("Playlist: {}", playlist); // Extract video playlist dir to dotenv if !playlist.starts_with("tmp") && is_valid_path(playlist) != None { HttpResponse::BadRequest().finish() } else if let Ok(file) = NamedFile::open(playlist) { file.into_response(&request).unwrap() } else { HttpResponse::NotFound().finish() } } #[get("/video/{path}")] async fn get_video_part( request: HttpRequest, _: Claims, path: web::Path, ) -> impl Responder { let part = &path.path; debug!("Video part: {}", part); if let Ok(file) = NamedFile::open(String::from("tmp/") + part) { file.into_response(&request).unwrap() } else { error!("Video part not found: tmp/{}", part); HttpResponse::NotFound().finish() } } #[get("image/favorites")] async fn favorites( claims: Claims, favorites_dao: web::Data>, ) -> impl Responder { let favorites = web::block(move || favorites_dao.get_favorites(claims.sub.parse::().unwrap())) .await .unwrap() .into_iter() .map(|favorite| favorite.path) .collect::>(); HttpResponse::Ok().json(PhotosResponse { photos: favorites, dirs: Vec::new(), }) } #[put("image/favorites")] async fn put_add_favorite( claims: Claims, body: web::Json, favorites_dao: web::Data>, ) -> impl Responder { if let Ok(user_id) = claims.sub.parse::() { let path = body.path.clone(); match web::block::<_, usize, DbError>(move || favorites_dao.add_favorite(user_id, &path)) .await { Err(BlockingError::Error(e)) if e.kind == DbErrorKind::AlreadyExists => { debug!("Favorite: {} exists for user: {}", &body.path, user_id); HttpResponse::Ok() } Err(e) => { info!("{:?} {}. for user: {}", e, body.path, user_id); HttpResponse::BadRequest() } Ok(_) => { debug!("Adding favorite \"{}\" for userid: {}", body.path, user_id); HttpResponse::Created() } } } else { error!("Unable to parse sub as i32: {}", claims.sub); HttpResponse::BadRequest() } } #[delete("image/favorites")] async fn delete_favorite( claims: Claims, body: web::Query, favorites_dao: web::Data>, ) -> impl Responder { if let Ok(user_id) = claims.sub.parse::() { let path = body.path.clone(); web::block::<_, _, String>(move || { favorites_dao.remove_favorite(user_id, path); Ok(()) }) .await .unwrap(); debug!( "Removing favorite \"{}\" for userid: {}", body.path, user_id ); HttpResponse::Ok() } else { error!("Unable to parse sub as i32: {}", claims.sub); HttpResponse::BadRequest() } } fn create_thumbnails() { let thumbs = &dotenv::var("THUMBNAILS").expect("THUMBNAILS not defined"); let thumbnail_directory: &Path = Path::new(thumbs); let images = PathBuf::from(dotenv::var("BASE_PATH").unwrap()); walkdir::WalkDir::new(&images) .into_iter() .collect::>>() .into_par_iter() .filter_map(|entry| entry.ok()) .filter(|entry| entry.file_type().is_file()) .filter(|entry| { debug!("{:?}", entry.path()); if let Some(ext) = entry .path() .extension() .and_then(|ext| ext.to_str().map(|ext| ext.to_lowercase())) { if ext == "mp4" || ext == "mov" { let relative_path = &entry.path().strip_prefix(&images).unwrap(); let thumb_path = Path::new(thumbnail_directory).join(relative_path); std::fs::create_dir_all(&thumb_path.parent().unwrap()) .expect("Error creating directory"); debug!("Generating video thumbnail: {:?}", thumb_path); generate_video_thumbnail(entry.path(), &thumb_path); false } else { is_image(entry) } } else { error!("Unable to get extension for file: {:?}", entry.path()); false } }) .filter(|entry| { let path = entry.path(); let relative_path = &path.strip_prefix(&images).unwrap(); let thumb_path = Path::new(thumbnail_directory).join(relative_path); !thumb_path.exists() }) .map(|entry| (image::open(entry.path()), entry.path().to_path_buf())) .filter(|(img, path)| { if let Err(e) = img { error!("Unable to open image: {:?}. {}", path, e); } img.is_ok() }) .map(|(img, path)| (img.unwrap(), path)) .map(|(image, path)| (image.thumbnail(200, u32::MAX), path)) .map(|(image, path)| { let relative_path = &path.strip_prefix(&images).unwrap(); let thumb_path = Path::new(thumbnail_directory).join(relative_path); std::fs::create_dir_all(&thumb_path.parent().unwrap()) .expect("There was an issue creating directory"); debug!("Saving thumbnail: {:?}", thumb_path); image.save(thumb_path).expect("Failure saving thumbnail"); }) .for_each(drop); debug!("Finished making thumbnails"); update_media_counts(&images); } fn update_media_counts(media_dir: &Path) { let mut image_count = 0; let mut video_count = 0; for ref entry in WalkDir::new(media_dir).into_iter().filter_map(|e| e.ok()) { if is_image(entry) { image_count += 1; } else if is_video(entry) { video_count += 1; } } IMAGE_GAUGE.set(image_count); VIDEO_GAUGE.set(video_count); } fn is_image(entry: &DirEntry) -> bool { entry .path() .extension() .and_then(|ext| ext.to_str()) .map(|ext| ext == "jpg" || ext == "jpeg" || ext == "png" || ext == "nef") .unwrap_or(false) } fn is_video(entry: &DirEntry) -> bool { entry .path() .extension() .and_then(|ext| ext.to_str()) .map(|ext| ext == "mp4" || ext == "mov") .unwrap_or(false) } fn main() -> std::io::Result<()> { dotenv::dotenv().ok(); env_logger::init(); create_thumbnails(); std::thread::spawn(|| { let (wtx, wrx) = channel(); let mut watcher = watcher(wtx, std::time::Duration::from_secs(10)).unwrap(); watcher .watch(dotenv::var("BASE_PATH").unwrap(), RecursiveMode::Recursive) .unwrap(); loop { let ev = wrx.recv(); if let Ok(event) = ev { match event { DebouncedEvent::Create(_) => create_thumbnails(), DebouncedEvent::Rename(orig, _) | DebouncedEvent::Write(orig) => { let image_base_path = PathBuf::from(env::var("BASE_PATH").unwrap()); let image_relative = orig.strip_prefix(&image_base_path).unwrap(); if let Ok(old_thumbnail) = env::var("THUMBNAILS").map(PathBuf::from).map(|mut base| { base.push(image_relative); base }) { if let Err(e) = std::fs::remove_file(&old_thumbnail) { error!( "Error removing thumbnail: {}\n{}", old_thumbnail.display(), e ); } else { info!("Deleted moved thumbnail: {}", old_thumbnail.display()); create_thumbnails(); } } } DebouncedEvent::Remove(_) => { update_media_counts(&PathBuf::from(env::var("BASE_PATH").unwrap())) } _ => continue, }; } } }); let system = actix::System::new("image-api"); let act = StreamActor {}.start(); let app_data = web::Data::new(AppState { stream_manager: Arc::new(act), }); let labels = HashMap::new(); let prometheus = PrometheusMetrics::new("", Some("/metrics"), Some(labels)); prometheus .registry .register(Box::new(IMAGE_GAUGE.clone())) .unwrap(); prometheus .registry .register(Box::new(VIDEO_GAUGE.clone())) .unwrap(); HttpServer::new(move || { let user_dao = SqliteUserDao::new(); let favorites_dao = SqliteFavoriteDao::new(); App::new() .wrap(middleware::Logger::default()) .service(web::resource("/login").route(web::post().to(login))) .service(web::resource("/photos").route(web::get().to(files::list_photos))) .service(get_image) .service(upload_image) .service(generate_video) .service(stream_video) .service(get_video_part) .service(favorites) .service(put_add_favorite) .service(delete_favorite) .service(get_file_metadata) .app_data(app_data.clone()) .data::>(Box::new(user_dao)) .data::>(Box::new(favorites_dao)) .wrap(prometheus.clone()) }) .bind(dotenv::var("BIND_URL").unwrap())? .bind("localhost:8088")? .run(); system.run() } struct AppState { stream_manager: Arc>, }