#[macro_use] extern crate diesel; extern crate rayon; use actix_web::web::Data; use actix_web_prom::PrometheusMetricsBuilder; use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness}; use futures::stream::StreamExt; use lazy_static::lazy_static; use prometheus::{self, IntGauge}; use std::error::Error; use std::sync::mpsc::channel; use std::sync::Mutex; use std::{collections::HashMap, io::prelude::*}; use std::{env, fs::File}; use std::{ io::ErrorKind, path::{Path, PathBuf}, }; use walkdir::{DirEntry, WalkDir}; use actix_files::NamedFile; use actix_multipart as mp; use actix_web::{ delete, get, middleware, post, put, web::{self, BufMut, BytesMut}, App, HttpRequest, HttpResponse, HttpServer, Responder, }; use anyhow::Context; use chrono::Utc; use diesel::sqlite::Sqlite; use notify::{Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; use rayon::prelude::*; use log::{debug, error, info, trace, warn}; use crate::auth::login; use crate::data::*; use crate::database::*; use crate::files::{ is_image_or_video, is_valid_full_path, move_file, RealFileSystem, RefreshThumbnailsMessage, }; use crate::service::ServiceBuilder; use crate::state::AppState; use crate::tags::*; use crate::video::*; mod auth; mod data; mod database; mod error; mod files; mod state; mod tags; mod video; mod service; #[cfg(test)] mod testhelpers; lazy_static! { static ref IMAGE_GAUGE: IntGauge = IntGauge::new( "imageserver_image_total", "Count of the images on the server" ) .unwrap(); static ref VIDEO_GAUGE: IntGauge = IntGauge::new( "imageserver_video_total", "Count of the videos on the server" ) .unwrap(); } pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!(); #[get("/image")] async fn get_image( _claims: Claims, request: HttpRequest, req: web::Query, app_state: Data, ) -> impl Responder { if let Some(path) = is_valid_full_path(&app_state.base_path, &req.path, false) { let image_size = req.size.unwrap_or(PhotoSize::Full); if image_size == PhotoSize::Thumb { let relative_path = path .strip_prefix(&app_state.base_path) .expect("Error stripping base path prefix from thumbnail"); let thumbs = &app_state.thumbnail_path; let thumb_path = Path::new(&thumbs).join(relative_path); trace!("Thumbnail path: {:?}", thumb_path); if let Ok(file) = NamedFile::open(&thumb_path) { file.into_response(&request) } else { HttpResponse::NotFound().finish() } } else if let Ok(file) = NamedFile::open(path) { file.into_response(&request) } else { HttpResponse::NotFound().finish() } } else { error!("Bad photos request: {}", req.path); HttpResponse::BadRequest().finish() } } #[get("/image/metadata")] async fn get_file_metadata( _: Claims, path: web::Query, app_state: Data, ) -> impl Responder { match is_valid_full_path(&app_state.base_path, &path.path, false) .ok_or_else(|| ErrorKind::InvalidData.into()) .and_then(File::open) .and_then(|file| file.metadata()) { Ok(metadata) => { let response: MetadataResponse = metadata.into(); HttpResponse::Ok().json(response) } Err(e) => { error!("Error getting metadata for file '{}': {:?}", path.path, e); HttpResponse::InternalServerError().finish() } } } #[post("/image")] async fn upload_image( _: Claims, mut payload: mp::Multipart, app_state: Data, ) -> impl Responder { let mut file_content: BytesMut = BytesMut::new(); let mut file_name: Option = None; let mut file_path: Option = None; while let Some(Ok(mut part)) = payload.next().await { if let Some(content_type) = part.content_disposition() { debug!("{:?}", content_type); if let Some(filename) = content_type.get_filename() { debug!("Name: {:?}", filename); file_name = Some(filename.to_string()); while let Some(Ok(data)) = part.next().await { file_content.put(data); } } else if content_type.get_name().map_or(false, |name| name == "path") { while let Some(Ok(data)) = part.next().await { if let Ok(path) = std::str::from_utf8(&data) { file_path = Some(path.to_string()) } } } } } let path = file_path.unwrap_or_else(|| app_state.base_path.clone()); if !file_content.is_empty() { let full_path = PathBuf::from(&path).join(file_name.unwrap()); if let Some(full_path) = is_valid_full_path( &app_state.base_path, &full_path.to_str().unwrap().to_string(), true, ) { if !full_path.is_file() && is_image_or_video(&full_path) { let mut file = File::create(&full_path).unwrap(); file.write_all(&file_content).unwrap(); info!("Uploaded: {:?}", full_path); } else { warn!("File already exists: {:?}", full_path); let new_path = format!( "{}/{}_{}.{}", full_path.parent().unwrap().to_str().unwrap(), full_path.file_stem().unwrap().to_str().unwrap(), Utc::now().timestamp(), full_path .extension() .expect("Uploaded file should have an extension") .to_str() .unwrap() ); info!("Uploaded: {}", new_path); let mut file = File::create(new_path).unwrap(); file.write_all(&file_content).unwrap(); } } else { error!("Invalid path for upload: {:?}", full_path); return HttpResponse::BadRequest().body("Path was not valid"); } } else { return HttpResponse::BadRequest().body("No file body read"); } app_state.stream_manager.do_send(RefreshThumbnailsMessage); HttpResponse::Ok().finish() } #[post("/video/generate")] async fn generate_video( _claims: Claims, app_state: Data, body: web::Json, ) -> impl Responder { let filename = PathBuf::from(&body.path); if let Some(name) = filename.file_stem() { let filename = name.to_str().expect("Filename should convert to string"); let playlist = format!("tmp/{}.m3u8", filename); if let Some(path) = is_valid_full_path(&app_state.base_path, &body.path, false) { if let Ok(child) = create_playlist(path.to_str().unwrap(), &playlist).await { app_state .stream_manager .do_send(ProcessMessage(playlist.clone(), child)); } } else { return HttpResponse::BadRequest().finish(); } HttpResponse::Ok().json(playlist) } else { error!("Unable to get file name: {:?}", filename); HttpResponse::BadRequest().finish() } } #[get("/video/stream")] async fn stream_video( request: HttpRequest, _: Claims, path: web::Query, app_state: Data, ) -> impl Responder { let playlist = &path.path; debug!("Playlist: {}", playlist); // Extract video playlist dir to dotenv if !playlist.starts_with("tmp") && is_valid_full_path(&app_state.base_path, playlist, false).is_some() { HttpResponse::BadRequest().finish() } else if let Ok(file) = NamedFile::open(playlist) { file.into_response(&request) } else { HttpResponse::NotFound().finish() } } #[get("/video/{path}")] async fn get_video_part( request: HttpRequest, _: Claims, path: web::Path, ) -> impl Responder { let part = &path.path; debug!("Video part: {}", part); if let Ok(file) = NamedFile::open(String::from("tmp/") + part) { file.into_response(&request) } else { error!("Video part not found: tmp/{}", part); HttpResponse::NotFound().finish() } } #[get("image/favorites")] async fn favorites( claims: Claims, favorites_dao: Data>>, ) -> impl Responder { match web::block(move || { favorites_dao .lock() .expect("Unable to get FavoritesDao") .get_favorites(claims.sub.parse::().unwrap()) }) .await { Ok(Ok(favorites)) => { let favorites = favorites .into_iter() .map(|favorite| favorite.path) .collect::>(); HttpResponse::Ok().json(PhotosResponse { photos: favorites, dirs: Vec::new(), }) } Ok(Err(e)) => { error!("Error getting favorites: {:?}", e); HttpResponse::InternalServerError().finish() } Err(_) => HttpResponse::InternalServerError().finish(), } } #[put("image/favorites")] async fn put_add_favorite( claims: Claims, body: web::Json, favorites_dao: Data>>, ) -> impl Responder { if let Ok(user_id) = claims.sub.parse::() { let path = body.path.clone(); match web::block::<_, Result>(move || { favorites_dao .lock() .expect("Unable to get FavoritesDao") .add_favorite(user_id, &path) }) .await { Ok(Err(e)) if e.kind == DbErrorKind::AlreadyExists => { debug!("Favorite: {} exists for user: {}", &body.path, user_id); HttpResponse::Ok() } Ok(Err(e)) => { info!("{:?} {}. for user: {}", e, body.path, user_id); HttpResponse::BadRequest() } Ok(Ok(_)) => { debug!("Adding favorite \"{}\" for userid: {}", body.path, user_id); HttpResponse::Created() } Err(e) => { error!("Blocking error while inserting favorite: {:?}", e); HttpResponse::InternalServerError() } } } else { error!("Unable to parse sub as i32: {}", claims.sub); HttpResponse::BadRequest() } } #[delete("image/favorites")] async fn delete_favorite( claims: Claims, body: web::Query, favorites_dao: Data>>, ) -> impl Responder { if let Ok(user_id) = claims.sub.parse::() { let path = body.path.clone(); web::block(move || { favorites_dao .lock() .expect("Unable to get favorites dao") .remove_favorite(user_id, path); }) .await .unwrap(); info!( "Removing favorite \"{}\" for userid: {}", body.path, user_id ); HttpResponse::Ok() } else { error!("Unable to parse sub as i32: {}", claims.sub); HttpResponse::BadRequest() } } fn create_thumbnails() { let thumbs = &dotenv::var("THUMBNAILS").expect("THUMBNAILS not defined"); let thumbnail_directory: &Path = Path::new(thumbs); let images = PathBuf::from(dotenv::var("BASE_PATH").unwrap()); WalkDir::new(&images) .into_iter() .collect::>>() .into_par_iter() .filter_map(|entry| entry.ok()) .filter(|entry| entry.file_type().is_file()) .filter(|entry| { if is_video(entry) { let relative_path = &entry.path().strip_prefix(&images).unwrap(); let thumb_path = Path::new(thumbnail_directory).join(relative_path); std::fs::create_dir_all( thumb_path .parent() .unwrap_or_else(|| panic!("Thumbnail {:?} has no parent?", thumb_path)), ) .expect("Error creating directory"); debug!("Generating video thumbnail: {:?}", thumb_path); generate_video_thumbnail(entry.path(), &thumb_path); false } else { is_image(entry) } }) .filter(|entry| { let path = entry.path(); let relative_path = &path.strip_prefix(&images).unwrap(); let thumb_path = Path::new(thumbnail_directory).join(relative_path); !thumb_path.exists() }) .map(|entry| (image::open(entry.path()), entry.path().to_path_buf())) .filter(|(img, path)| { if let Err(e) = img { error!("Unable to open image: {:?}. {}", path, e); } img.is_ok() }) .map(|(img, path)| (img.unwrap(), path)) .map(|(image, path)| (image.thumbnail(200, u32::MAX), path)) .map(|(image, path)| { let relative_path = &path.strip_prefix(&images).unwrap(); let thumb_path = Path::new(thumbnail_directory).join(relative_path); std::fs::create_dir_all(thumb_path.parent().unwrap()) .expect("There was an issue creating directory"); info!("Saving thumbnail: {:?}", thumb_path); image.save(thumb_path).expect("Failure saving thumbnail"); }) .for_each(drop); debug!("Finished making thumbnails"); update_media_counts(&images); } fn update_media_counts(media_dir: &Path) { let mut image_count = 0; let mut video_count = 0; for ref entry in WalkDir::new(media_dir).into_iter().filter_map(|e| e.ok()) { if is_image(entry) { image_count += 1; } else if is_video(entry) { video_count += 1; } } IMAGE_GAUGE.set(image_count); VIDEO_GAUGE.set(video_count); } fn is_image(entry: &DirEntry) -> bool { entry .path() .extension() .and_then(|ext| ext.to_str()) .map(|ext| ext == "jpg" || ext == "jpeg" || ext == "png" || ext == "nef") .unwrap_or(false) } fn is_video(entry: &DirEntry) -> bool { entry .path() .extension() .and_then(|ext| ext.to_str()) .map(|ext| ext == "mp4" || ext == "mov") .unwrap_or(false) } fn main() -> std::io::Result<()> { if let Err(err) = dotenv::dotenv() { println!("Error parsing .env {:?}", err); } env_logger::init(); run_migrations(&mut connect()).expect("Failed to run migrations"); create_thumbnails(); watch_files(); let system = actix::System::new(); system.block_on(async { let app_data = Data::new(AppState::default()); let labels = HashMap::new(); let prometheus = PrometheusMetricsBuilder::new("api") .const_labels(labels) .build() .expect("Unable to build prometheus metrics middleware"); prometheus .registry .register(Box::new(IMAGE_GAUGE.clone())) .unwrap(); prometheus .registry .register(Box::new(VIDEO_GAUGE.clone())) .unwrap(); HttpServer::new(move || { let user_dao = SqliteUserDao::new(); let favorites_dao = SqliteFavoriteDao::new(); let tag_dao = SqliteTagDao::default(); App::new() .wrap(middleware::Logger::default()) .service(web::resource("/login").route(web::post().to(login::))) .service( web::resource("/photos") .route(web::get().to(files::list_photos::)), ) .service(web::resource("/file/move").post(move_file::)) .service(get_image) .service(upload_image) .service(generate_video) .service(stream_video) .service(get_video_part) .service(favorites) .service(put_add_favorite) .service(delete_favorite) .service(get_file_metadata) .add_feature(add_tag_services::<_, SqliteTagDao>) .app_data(app_data.clone()) .app_data::>(Data::new(RealFileSystem::new( app_data.base_path.clone(), ))) .app_data::>>(Data::new(Mutex::new(user_dao))) .app_data::>>>(Data::new(Mutex::new(Box::new( favorites_dao, )))) .app_data::>>(Data::new(Mutex::new(tag_dao))) .wrap(prometheus.clone()) }) .bind(dotenv::var("BIND_URL").unwrap())? .bind("localhost:8088")? .run() .await }) } fn run_migrations( connection: &mut impl MigrationHarness, ) -> Result<(), Box> { connection.run_pending_migrations(MIGRATIONS)?; Ok(()) } fn watch_files() { std::thread::spawn(|| { let (wtx, wrx) = channel(); let mut watcher = RecommendedWatcher::new(wtx, Config::default()).unwrap(); let base_str = dotenv::var("BASE_PATH").unwrap(); let base_path = Path::new(&base_str); watcher .watch(base_path, RecursiveMode::Recursive) .context(format!("Unable to watch BASE_PATH: '{}'", base_str)) .unwrap(); loop { let ev = wrx.recv(); if let Ok(Ok(event)) = ev { match event.kind { EventKind::Create(_) => create_thumbnails(), EventKind::Modify(kind) => { debug!("All modified paths: {:?}", event.paths); debug!("Modify kind: {:?}", kind); if let Some(orig) = event.paths.first() { let image_base_path = PathBuf::from(env::var("BASE_PATH").unwrap()); let image_relative = orig.strip_prefix(&image_base_path).unwrap(); if let Ok(old_thumbnail) = env::var("THUMBNAILS").map(PathBuf::from).map(|mut base| { base.push(image_relative); base }) { if let Err(e) = std::fs::remove_file(&old_thumbnail) { error!( "Error removing thumbnail: {}\n{}", old_thumbnail.display(), e ); } else { info!("Deleted moved thumbnail: {}", old_thumbnail.display()); create_thumbnails(); } } } } EventKind::Remove(_) => { update_media_counts(&PathBuf::from(env::var("BASE_PATH").unwrap())) } _ => {} } }; } }); }