Files
ImageApi/src/main.rs
Cameron 0419aa2323 Scan and generate Video HLS playlists on startup
Refactored and improved video path state. Bumped versions of some dependencies.
2024-12-05 20:19:03 -05:00

613 lines
20 KiB
Rust

#[macro_use]
extern crate diesel;
extern crate rayon;
use actix_web::web::Data;
use actix_web_prom::PrometheusMetricsBuilder;
use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};
use futures::stream::StreamExt;
use lazy_static::lazy_static;
use prometheus::{self, IntGauge};
use std::error::Error;
use std::sync::mpsc::channel;
use std::sync::Mutex;
use std::{collections::HashMap, io::prelude::*};
use std::{env, fs::File};
use std::{
io::ErrorKind,
path::{Path, PathBuf},
};
use walkdir::{DirEntry, WalkDir};
use actix_files::NamedFile;
use actix_multipart as mp;
use actix_web::{
delete, get, middleware, post, put,
web::{self, BufMut, BytesMut},
App, HttpRequest, HttpResponse, HttpServer, Responder,
};
use anyhow::Context;
use chrono::Utc;
use diesel::sqlite::Sqlite;
use notify::{Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use rayon::prelude::*;
use log::{debug, error, info, trace, warn};
use crate::auth::login;
use crate::data::*;
use crate::database::*;
use crate::files::{
is_image_or_video, is_valid_full_path, move_file, RealFileSystem, RefreshThumbnailsMessage,
};
use crate::service::ServiceBuilder;
use crate::state::AppState;
use crate::tags::*;
use crate::video::*;
mod auth;
mod data;
mod database;
mod error;
mod files;
mod state;
mod tags;
mod video;
mod service;
#[cfg(test)]
mod testhelpers;
// Process-wide Prometheus gauges. They are registered with the metrics
// registry in `main` and refreshed by `update_media_counts`.
lazy_static! {
// Gauge exported under the name `imageserver_image_total`.
// Construction is unwrapped: a failure here panics on first access.
static ref IMAGE_GAUGE: IntGauge = IntGauge::new(
"imageserver_image_total",
"Count of the images on the server"
)
.unwrap();
// Gauge exported under the name `imageserver_video_total`.
static ref VIDEO_GAUGE: IntGauge = IntGauge::new(
"imageserver_video_total",
"Count of the videos on the server"
)
.unwrap();
}
// Database migrations embedded through `embed_migrations!`; `run_migrations`
// passes this constant to `run_pending_migrations`.
// NOTE(review): with no argument the macro presumably picks up the crate's
// default migrations directory - confirm against the diesel_migrations docs.
pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!();
/// `GET /image` - serves either the full-size file or its thumbnail.
///
/// The requested path is validated against the configured base path first;
/// an invalid path yields `400 Bad Request`. When the request asks for
/// `PhotoSize::Thumb`, the path relative to the base directory is re-rooted
/// under the thumbnail directory. A file that cannot be opened yields
/// `404 Not Found`.
#[get("/image")]
async fn get_image(
    _claims: Claims,
    request: HttpRequest,
    req: web::Query<ThumbnailRequest>,
    app_state: Data<AppState>,
) -> impl Responder {
    // Guard clause: reject anything that does not validate against the base path.
    let full_path = match is_valid_full_path(&app_state.base_path, &req.path, false) {
        Some(valid) => valid,
        None => {
            error!("Bad photos request: {}", req.path);
            return HttpResponse::BadRequest().finish();
        }
    };

    // Decide which file on disk backs this request.
    let wants_thumbnail = req.size.unwrap_or(PhotoSize::Full) == PhotoSize::Thumb;
    let target = if wants_thumbnail {
        let relative = full_path
            .strip_prefix(&app_state.base_path)
            .expect("Error stripping base path prefix from thumbnail");
        let thumb_path = Path::new(&app_state.thumbnail_path).join(relative);
        trace!("Thumbnail path: {:?}", thumb_path);
        thumb_path
    } else {
        full_path
    };

    match NamedFile::open(target) {
        Ok(file) => file.into_response(&request),
        Err(_) => HttpResponse::NotFound().finish(),
    }
}
/// `GET /image/metadata` - returns filesystem metadata for a file as JSON.
///
/// A path that fails validation is reported as an `InvalidData` I/O error, so
/// validation failures and real I/O failures share one error branch: the
/// error is logged and `500 Internal Server Error` is returned.
#[get("/image/metadata")]
async fn get_file_metadata(
    _: Claims,
    path: web::Query<ThumbnailRequest>,
    app_state: Data<AppState>,
) -> impl Responder {
    let lookup = match is_valid_full_path(&app_state.base_path, &path.path, false) {
        Some(valid) => File::open(valid).and_then(|file| file.metadata()),
        None => Err(std::io::Error::from(ErrorKind::InvalidData)),
    };

    match lookup {
        Err(e) => {
            error!("Error getting metadata for file '{}': {:?}", path.path, e);
            HttpResponse::InternalServerError().finish()
        }
        Ok(metadata) => {
            let response: MetadataResponse = metadata.into();
            HttpResponse::Ok().json(response)
        }
    }
}
#[post("/image")]
async fn upload_image(
_: Claims,
mut payload: mp::Multipart,
app_state: Data<AppState>,
) -> impl Responder {
let mut file_content: BytesMut = BytesMut::new();
let mut file_name: Option<String> = None;
let mut file_path: Option<String> = None;
while let Some(Ok(mut part)) = payload.next().await {
if let Some(content_type) = part.content_disposition() {
debug!("{:?}", content_type);
if let Some(filename) = content_type.get_filename() {
debug!("Name: {:?}", filename);
file_name = Some(filename.to_string());
while let Some(Ok(data)) = part.next().await {
file_content.put(data);
}
} else if content_type.get_name().map_or(false, |name| name == "path") {
while let Some(Ok(data)) = part.next().await {
if let Ok(path) = std::str::from_utf8(&data) {
file_path = Some(path.to_string())
}
}
}
}
}
let path = file_path.unwrap_or_else(|| app_state.base_path.clone());
if !file_content.is_empty() {
let full_path = PathBuf::from(&path).join(file_name.unwrap());
if let Some(full_path) = is_valid_full_path(
&app_state.base_path,
&full_path.to_str().unwrap().to_string(),
true,
) {
if !full_path.is_file() && is_image_or_video(&full_path) {
let mut file = File::create(&full_path).unwrap();
file.write_all(&file_content).unwrap();
info!("Uploaded: {:?}", full_path);
} else {
warn!("File already exists: {:?}", full_path);
let new_path = format!(
"{}/{}_{}.{}",
full_path.parent().unwrap().to_str().unwrap(),
full_path.file_stem().unwrap().to_str().unwrap(),
Utc::now().timestamp(),
full_path
.extension()
.expect("Uploaded file should have an extension")
.to_str()
.unwrap()
);
info!("Uploaded: {}", new_path);
let mut file = File::create(new_path).unwrap();
file.write_all(&file_content).unwrap();
}
} else {
error!("Invalid path for upload: {:?}", full_path);
return HttpResponse::BadRequest().body("Path was not valid");
}
} else {
return HttpResponse::BadRequest().body("No file body read");
}
app_state.stream_manager.do_send(RefreshThumbnailsMessage);
HttpResponse::Ok().finish()
}
#[post("/video/generate")]
async fn generate_video(
_claims: Claims,
app_state: Data<AppState>,
body: web::Json<ThumbnailRequest>,
) -> impl Responder {
let filename = PathBuf::from(&body.path);
if let Some(name) = filename.file_name() {
let filename = name.to_str().expect("Filename should convert to string");
let playlist = format!("{}/{}.m3u8", app_state.video_path, filename);
if let Some(path) = is_valid_full_path(&app_state.base_path, &body.path, false) {
if let Ok(child) = create_playlist(path.to_str().unwrap(), &playlist).await {
app_state
.stream_manager
.do_send(ProcessMessage(playlist.clone(), child));
}
} else {
return HttpResponse::BadRequest().finish();
}
HttpResponse::Ok().json(playlist)
} else {
error!("Unable to get file name: {:?}", filename);
HttpResponse::BadRequest().finish()
}
}
#[get("/video/stream")]
async fn stream_video(
request: HttpRequest,
_: Claims,
path: web::Query<ThumbnailRequest>,
app_state: Data<AppState>,
) -> impl Responder {
let playlist = &path.path;
debug!("Playlist: {}", playlist);
// Extract video playlist dir to dotenv
if !playlist.starts_with(&app_state.video_path)
&& is_valid_full_path(&app_state.base_path, playlist, false).is_some()
{
HttpResponse::BadRequest().finish()
} else if let Ok(file) = NamedFile::open(playlist) {
file.into_response(&request)
} else {
HttpResponse::NotFound().finish()
}
}
#[get("/video/{path}")]
async fn get_video_part(
request: HttpRequest,
_: Claims,
path: web::Path<ThumbnailRequest>,
app_state: Data<AppState>,
) -> impl Responder {
let part = &path.path;
debug!("Video part: {}", part);
let mut file_part = PathBuf::new();
file_part.push(app_state.video_path.clone());
file_part.push(part);
// TODO: Do we need to guard against directory attacks here?
if let Ok(file) = NamedFile::open(&file_part) {
file.into_response(&request)
} else {
error!("Video part not found: {:?}", file_part);
HttpResponse::NotFound().finish()
}
}
#[get("image/favorites")]
async fn favorites(
claims: Claims,
favorites_dao: Data<Mutex<Box<dyn FavoriteDao>>>,
) -> impl Responder {
match web::block(move || {
favorites_dao
.lock()
.expect("Unable to get FavoritesDao")
.get_favorites(claims.sub.parse::<i32>().unwrap())
})
.await
{
Ok(Ok(favorites)) => {
let favorites = favorites
.into_iter()
.map(|favorite| favorite.path)
.collect::<Vec<String>>();
HttpResponse::Ok().json(PhotosResponse {
photos: favorites,
dirs: Vec::new(),
})
}
Ok(Err(e)) => {
error!("Error getting favorites: {:?}", e);
HttpResponse::InternalServerError().finish()
}
Err(_) => HttpResponse::InternalServerError().finish(),
}
}
/// `PUT image/favorites` - adds a favorite for the authenticated user.
///
/// Responses:
/// * `201 Created` when the favorite was inserted;
/// * `200 OK` when the favorite already exists;
/// * `400 Bad Request` for any other DAO error or an unparsable `claims.sub`;
/// * `500 Internal Server Error` when the blocking task itself fails.
#[put("image/favorites")]
async fn put_add_favorite(
    claims: Claims,
    body: web::Json<AddFavoriteRequest>,
    favorites_dao: Data<Mutex<Box<dyn FavoriteDao>>>,
) -> impl Responder {
    let user_id = match claims.sub.parse::<i32>() {
        Ok(id) => id,
        Err(_) => {
            error!("Unable to parse sub as i32: {}", claims.sub);
            return HttpResponse::BadRequest();
        }
    };

    let favorite_path = body.path.clone();
    let outcome = web::block::<_, Result<usize, DbError>>(move || {
        let mut dao = favorites_dao.lock().expect("Unable to get FavoritesDao");
        dao.add_favorite(user_id, &favorite_path)
    })
    .await;

    match outcome {
        Err(e) => {
            error!("Blocking error while inserting favorite: {:?}", e);
            HttpResponse::InternalServerError()
        }
        Ok(Ok(_)) => {
            debug!("Adding favorite \"{}\" for userid: {}", body.path, user_id);
            HttpResponse::Created()
        }
        Ok(Err(e)) if e.kind == DbErrorKind::AlreadyExists => {
            debug!("Favorite: {} exists for user: {}", &body.path, user_id);
            HttpResponse::Ok()
        }
        Ok(Err(e)) => {
            info!("{:?} {}. for user: {}", e, body.path, user_id);
            HttpResponse::BadRequest()
        }
    }
}
#[delete("image/favorites")]
async fn delete_favorite(
claims: Claims,
body: web::Query<AddFavoriteRequest>,
favorites_dao: Data<Mutex<Box<dyn FavoriteDao>>>,
) -> impl Responder {
if let Ok(user_id) = claims.sub.parse::<i32>() {
let path = body.path.clone();
web::block(move || {
favorites_dao
.lock()
.expect("Unable to get favorites dao")
.remove_favorite(user_id, path);
})
.await
.unwrap();
info!(
"Removing favorite \"{}\" for userid: {}",
body.path, user_id
);
HttpResponse::Ok()
} else {
error!("Unable to parse sub as i32: {}", claims.sub);
HttpResponse::BadRequest()
}
}
fn create_thumbnails() {
let thumbs = &dotenv::var("THUMBNAILS").expect("THUMBNAILS not defined");
let thumbnail_directory: &Path = Path::new(thumbs);
let images = PathBuf::from(dotenv::var("BASE_PATH").unwrap());
WalkDir::new(&images)
.into_iter()
.collect::<Vec<Result<_, _>>>()
.into_par_iter()
.filter_map(|entry| entry.ok())
.filter(|entry| entry.file_type().is_file())
.filter(|entry| {
if is_video(entry) {
let relative_path = &entry.path().strip_prefix(&images).unwrap();
let thumb_path = Path::new(thumbnail_directory).join(relative_path);
std::fs::create_dir_all(
thumb_path
.parent()
.unwrap_or_else(|| panic!("Thumbnail {:?} has no parent?", thumb_path)),
)
.expect("Error creating directory");
debug!("Generating video thumbnail: {:?}", thumb_path);
generate_video_thumbnail(entry.path(), &thumb_path);
false
} else {
is_image(entry)
}
})
.filter(|entry| {
let path = entry.path();
let relative_path = &path.strip_prefix(&images).unwrap();
let thumb_path = Path::new(thumbnail_directory).join(relative_path);
!thumb_path.exists()
})
.map(|entry| (image::open(entry.path()), entry.path().to_path_buf()))
.filter(|(img, path)| {
if let Err(e) = img {
error!("Unable to open image: {:?}. {}", path, e);
}
img.is_ok()
})
.map(|(img, path)| (img.unwrap(), path))
.map(|(image, path)| (image.thumbnail(200, u32::MAX), path))
.map(|(image, path)| {
let relative_path = &path.strip_prefix(&images).unwrap();
let thumb_path = Path::new(thumbnail_directory).join(relative_path);
std::fs::create_dir_all(thumb_path.parent().unwrap())
.expect("There was an issue creating directory");
info!("Saving thumbnail: {:?}", thumb_path);
image.save(thumb_path).expect("Failure saving thumbnail");
})
.for_each(drop);
debug!("Finished making thumbnails");
update_media_counts(&images);
}
/// Recounts images and videos under `media_dir` and publishes the totals to
/// `IMAGE_GAUGE` and `VIDEO_GAUGE`.
///
/// Entries that cannot be read during the walk are skipped. Classification is
/// by file extension via `is_image` / `is_video`.
fn update_media_counts(media_dir: &Path) {
    let (images, videos) = WalkDir::new(media_dir)
        .into_iter()
        .filter_map(Result::ok)
        .fold((0i64, 0i64), |(images, videos), entry| {
            if is_image(&entry) {
                (images + 1, videos)
            } else if is_video(&entry) {
                (images, videos + 1)
            } else {
                (images, videos)
            }
        });

    IMAGE_GAUGE.set(images);
    VIDEO_GAUGE.set(videos);
}
/// Returns `true` when the entry's extension, compared case-insensitively,
/// is one of `jpg`, `jpeg`, `png` or `nef`.
///
/// Entries without an extension, or with a non-UTF-8 extension, are not images.
fn is_image(entry: &DirEntry) -> bool {
    match entry.path().extension().and_then(|ext| ext.to_str()) {
        Some(ext) => matches!(
            ext.to_lowercase().as_str(),
            "jpg" | "jpeg" | "png" | "nef"
        ),
        None => false,
    }
}
/// Returns `true` when the entry's extension, compared case-insensitively,
/// is `mp4` or `mov`.
///
/// Entries without an extension, or with a non-UTF-8 extension, are not videos.
fn is_video(entry: &DirEntry) -> bool {
    match entry.path().extension().and_then(|ext| ext.to_str()) {
        Some(ext) => matches!(ext.to_lowercase().as_str(), "mp4" | "mov"),
        None => false,
    }
}
/// Application entry point.
///
/// Startup order: load `.env`, initialise logging, run pending database
/// migrations, generate thumbnails, start the file watcher thread, then build
/// and run the actix HTTP server.
///
/// Returns an I/O error when binding a listen address fails or when the
/// server itself returns one. Panics when migrations fail, when the metrics
/// middleware cannot be built or a gauge cannot be registered, or when
/// `BIND_URL` is not set.
fn main() -> std::io::Result<()> {
// A missing or malformed .env is reported on stdout but is not fatal.
if let Err(err) = dotenv::dotenv() {
println!("Error parsing .env {:?}", err);
}
env_logger::init();
run_migrations(&mut connect()).expect("Failed to run migrations");
// Thumbnails are generated synchronously here, before the server starts.
create_thumbnails();
watch_files();
let system = actix::System::new();
system.block_on(async {
let app_data = Data::new(AppState::default());
let labels = HashMap::new();
// NOTE(review): "api" is presumably the metrics namespace - confirm
// against the actix-web-prom documentation.
let prometheus = PrometheusMetricsBuilder::new("api")
.const_labels(labels)
.build()
.expect("Unable to build prometheus metrics middleware");
// Register the media gauges with the middleware's registry.
prometheus
.registry
.register(Box::new(IMAGE_GAUGE.clone()))
.unwrap();
prometheus
.registry
.register(Box::new(VIDEO_GAUGE.clone()))
.unwrap();
// Send the playlist manager a scan request for the base media directory.
let app_state = app_data.clone();
app_state
.playlist_manager
.do_send(ScanDirectoryMessage {
directory: app_state.base_path.clone(),
});
// The factory closure below constructs fresh DAO instances each time it
// is invoked.
HttpServer::new(move || {
let user_dao = SqliteUserDao::new();
let favorites_dao = SqliteFavoriteDao::new();
let tag_dao = SqliteTagDao::default();
App::new()
.wrap(middleware::Logger::default())
.service(web::resource("/login").route(web::post().to(login::<SqliteUserDao>)))
.service(
web::resource("/photos")
.route(web::get().to(files::list_photos::<SqliteTagDao, RealFileSystem>)),
)
.service(web::resource("/file/move").post(move_file::<RealFileSystem>))
.service(get_image)
.service(upload_image)
.service(generate_video)
.service(stream_video)
.service(get_video_part)
.service(favorites)
.service(put_add_favorite)
.service(delete_favorite)
.service(get_file_metadata)
.add_feature(add_tag_services::<_, SqliteTagDao>)
.app_data(app_data.clone())
.app_data::<Data<RealFileSystem>>(Data::new(RealFileSystem::new(
app_data.base_path.clone(),
)))
.app_data::<Data<Mutex<SqliteUserDao>>>(Data::new(Mutex::new(user_dao)))
.app_data::<Data<Mutex<Box<dyn FavoriteDao>>>>(Data::new(Mutex::new(Box::new(
favorites_dao,
))))
.app_data::<Data<Mutex<SqliteTagDao>>>(Data::new(Mutex::new(tag_dao)))
.wrap(prometheus.clone())
})
// The server listens on the configured BIND_URL and also on localhost:8088.
.bind(dotenv::var("BIND_URL").unwrap())?
.bind("localhost:8088")?
.run()
.await
})
}
/// Applies every pending migration from `MIGRATIONS` on `connection`.
///
/// The list of applied migration versions is discarded; only success or the
/// boxed migration error is reported to the caller.
fn run_migrations(
    connection: &mut impl MigrationHarness<Sqlite>,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
    connection
        .run_pending_migrations(MIGRATIONS)
        .map(|_applied| ())
}
fn watch_files() {
std::thread::spawn(|| {
let (wtx, wrx) = channel();
let mut watcher = RecommendedWatcher::new(wtx, Config::default()).unwrap();
let base_str = dotenv::var("BASE_PATH").unwrap();
let base_path = Path::new(&base_str);
watcher
.watch(base_path, RecursiveMode::Recursive)
.context(format!("Unable to watch BASE_PATH: '{}'", base_str))
.unwrap();
loop {
let ev = wrx.recv();
if let Ok(Ok(event)) = ev {
match event.kind {
EventKind::Create(_) => create_thumbnails(),
EventKind::Modify(kind) => {
debug!("All modified paths: {:?}", event.paths);
debug!("Modify kind: {:?}", kind);
if let Some(orig) = event.paths.first() {
let image_base_path = PathBuf::from(env::var("BASE_PATH").unwrap());
let image_relative = orig.strip_prefix(&image_base_path).unwrap();
if let Ok(old_thumbnail) =
env::var("THUMBNAILS").map(PathBuf::from).map(|mut base| {
base.push(image_relative);
base
})
{
if let Err(e) = std::fs::remove_file(&old_thumbnail) {
error!(
"Error removing thumbnail: {}\n{}",
old_thumbnail.display(),
e
);
} else {
info!("Deleted moved thumbnail: {}", old_thumbnail.display());
create_thumbnails();
}
}
}
}
EventKind::Remove(_) => {
update_media_counts(&PathBuf::from(env::var("BASE_PATH").unwrap()))
}
_ => {}
}
};
}
});
}