Merge branch 'master' into feature/tagging
This commit is contained in:
247
src/main.rs
247
src/main.rs
@@ -2,12 +2,13 @@
|
||||
extern crate diesel;
|
||||
extern crate rayon;
|
||||
|
||||
use actix_web_prom::PrometheusMetrics;
|
||||
use actix_web::web::Data;
|
||||
use actix_web_prom::PrometheusMetricsBuilder;
|
||||
use chrono::Utc;
|
||||
use futures::stream::StreamExt;
|
||||
use lazy_static::lazy_static;
|
||||
use prometheus::{self, IntGauge};
|
||||
use std::sync::{mpsc::channel, Arc};
|
||||
use std::sync::mpsc::channel;
|
||||
use std::{collections::HashMap, io::prelude::*};
|
||||
use std::{env, fs::File};
|
||||
use std::{
|
||||
@@ -16,15 +17,12 @@ use std::{
|
||||
};
|
||||
use walkdir::{DirEntry, WalkDir};
|
||||
|
||||
use actix::prelude::*;
|
||||
use actix_files::NamedFile;
|
||||
use actix_multipart as mp;
|
||||
use actix_web::{
|
||||
delete,
|
||||
error::BlockingError,
|
||||
get, middleware, post, put,
|
||||
web::{self, BufMut, BytesMut, HttpRequest, HttpResponse},
|
||||
App, HttpServer, Responder,
|
||||
delete, get, middleware, post, put,
|
||||
web::{self, BufMut, BytesMut},
|
||||
App, HttpRequest, HttpResponse, HttpServer, Responder,
|
||||
};
|
||||
use diesel::prelude::*;
|
||||
use notify::{watcher, DebouncedEvent, RecursiveMode, Watcher};
|
||||
@@ -35,14 +33,16 @@ use log::{debug, error, info};
|
||||
use crate::auth::login;
|
||||
use crate::data::*;
|
||||
use crate::database::*;
|
||||
use crate::files::{is_image_or_video, is_valid_path};
|
||||
use crate::files::{is_image_or_video, is_valid_full_path};
|
||||
use crate::models::{InsertTag, InsertTaggedPhoto, Tag, TaggedPhoto};
|
||||
use crate::state::AppState;
|
||||
use crate::video::*;
|
||||
|
||||
mod auth;
|
||||
mod data;
|
||||
pub mod database;
|
||||
mod files;
|
||||
mod state;
|
||||
mod video;
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -66,23 +66,25 @@ async fn get_image(
|
||||
_claims: Claims,
|
||||
request: HttpRequest,
|
||||
req: web::Query<ThumbnailRequest>,
|
||||
app_state: web::Data<AppState>,
|
||||
) -> impl Responder {
|
||||
if let Some(path) = is_valid_path(&req.path) {
|
||||
if let Some(path) = is_valid_full_path(&app_state.base_path, &req.path) {
|
||||
if req.size.is_some() {
|
||||
let thumbs = dotenv::var("THUMBNAILS").unwrap();
|
||||
let relative_path = path
|
||||
.strip_prefix(dotenv::var("BASE_PATH").unwrap())
|
||||
.expect("Error stripping prefix");
|
||||
.strip_prefix(&app_state.base_path)
|
||||
.expect("Error stripping base path prefix from thumbnail");
|
||||
|
||||
let thumbs = &app_state.thumbnail_path;
|
||||
let thumb_path = Path::new(&thumbs).join(relative_path);
|
||||
|
||||
debug!("{:?}", thumb_path);
|
||||
if let Ok(file) = NamedFile::open(&thumb_path) {
|
||||
file.into_response(&request).unwrap()
|
||||
file.into_response(&request)
|
||||
} else {
|
||||
HttpResponse::NotFound().finish()
|
||||
}
|
||||
} else if let Ok(file) = NamedFile::open(path) {
|
||||
file.into_response(&request).unwrap()
|
||||
file.into_response(&request)
|
||||
} else {
|
||||
HttpResponse::NotFound().finish()
|
||||
}
|
||||
@@ -93,8 +95,12 @@ async fn get_image(
|
||||
}
|
||||
|
||||
#[get("/image/metadata")]
|
||||
async fn get_file_metadata(_: Claims, path: web::Query<ThumbnailRequest>) -> impl Responder {
|
||||
match is_valid_path(&path.path)
|
||||
async fn get_file_metadata(
|
||||
_: Claims,
|
||||
path: web::Query<ThumbnailRequest>,
|
||||
app_state: web::Data<AppState>,
|
||||
) -> impl Responder {
|
||||
match is_valid_full_path(&app_state.base_path, &path.path)
|
||||
.ok_or_else(|| ErrorKind::InvalidData.into())
|
||||
.and_then(File::open)
|
||||
.and_then(|file| file.metadata())
|
||||
@@ -111,35 +117,40 @@ async fn get_file_metadata(_: Claims, path: web::Query<ThumbnailRequest>) -> imp
|
||||
}
|
||||
|
||||
#[post("/image")]
|
||||
async fn upload_image(_: Claims, mut payload: mp::Multipart) -> impl Responder {
|
||||
async fn upload_image(
|
||||
_: Claims,
|
||||
mut payload: mp::Multipart,
|
||||
app_state: web::Data<AppState>,
|
||||
) -> impl Responder {
|
||||
let mut file_content: BytesMut = BytesMut::new();
|
||||
let mut file_name: Option<String> = None;
|
||||
let mut file_path: Option<String> = None;
|
||||
|
||||
while let Some(Ok(mut part)) = payload.next().await {
|
||||
if let Some(content_type) = part.content_disposition() {
|
||||
debug!("{:?}", content_type);
|
||||
if let Some(filename) = content_type.get_filename() {
|
||||
debug!("Name: {:?}", filename);
|
||||
file_name = Some(filename.to_string());
|
||||
let content_type = part.content_disposition();
|
||||
debug!("{:?}", content_type);
|
||||
if let Some(filename) = content_type.get_filename() {
|
||||
debug!("Name: {:?}", filename);
|
||||
file_name = Some(filename.to_string());
|
||||
|
||||
while let Some(Ok(data)) = part.next().await {
|
||||
file_content.put(data);
|
||||
}
|
||||
} else if content_type.get_name().map_or(false, |name| name == "path") {
|
||||
while let Some(Ok(data)) = part.next().await {
|
||||
if let Ok(path) = std::str::from_utf8(&data) {
|
||||
file_path = Some(path.to_string())
|
||||
}
|
||||
while let Some(Ok(data)) = part.next().await {
|
||||
file_content.put(data);
|
||||
}
|
||||
} else if content_type.get_name().map_or(false, |name| name == "path") {
|
||||
while let Some(Ok(data)) = part.next().await {
|
||||
if let Ok(path) = std::str::from_utf8(&data) {
|
||||
file_path = Some(path.to_string())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let path = file_path.unwrap_or_else(|| dotenv::var("BASE_PATH").unwrap());
|
||||
let path = file_path.unwrap_or_else(|| app_state.base_path.clone());
|
||||
if !file_content.is_empty() {
|
||||
let full_path = PathBuf::from(&path).join(file_name.unwrap());
|
||||
if let Some(full_path) = is_valid_path(full_path.to_str().unwrap_or("")) {
|
||||
if let Some(full_path) =
|
||||
is_valid_full_path(&app_state.base_path, full_path.to_str().unwrap_or(""))
|
||||
{
|
||||
if !full_path.is_file() && is_image_or_video(&full_path) {
|
||||
let mut file = File::create(full_path).unwrap();
|
||||
file.write_all(&file_content).unwrap();
|
||||
@@ -160,7 +171,7 @@ async fn upload_image(_: Claims, mut payload: mp::Multipart) -> impl Responder {
|
||||
#[post("/video/generate")]
|
||||
async fn generate_video(
|
||||
_claims: Claims,
|
||||
data: web::Data<AppState>,
|
||||
app_state: web::Data<AppState>,
|
||||
body: web::Json<ThumbnailRequest>,
|
||||
) -> impl Responder {
|
||||
let filename = PathBuf::from(&body.path);
|
||||
@@ -168,9 +179,10 @@ async fn generate_video(
|
||||
if let Some(name) = filename.file_stem() {
|
||||
let filename = name.to_str().expect("Filename should convert to string");
|
||||
let playlist = format!("tmp/{}.m3u8", filename);
|
||||
if let Some(path) = is_valid_path(&body.path) {
|
||||
if let Some(path) = is_valid_full_path(&app_state.base_path, &body.path) {
|
||||
if let Ok(child) = create_playlist(path.to_str().unwrap(), &playlist).await {
|
||||
data.stream_manager
|
||||
app_state
|
||||
.stream_manager
|
||||
.do_send(ProcessMessage(playlist.clone(), child));
|
||||
}
|
||||
} else {
|
||||
@@ -189,15 +201,16 @@ async fn stream_video(
|
||||
request: HttpRequest,
|
||||
_: Claims,
|
||||
path: web::Query<ThumbnailRequest>,
|
||||
app_state: web::Data<AppState>,
|
||||
) -> impl Responder {
|
||||
let playlist = &path.path;
|
||||
debug!("Playlist: {}", playlist);
|
||||
|
||||
// Extract video playlist dir to dotenv
|
||||
if !playlist.starts_with("tmp") && is_valid_path(playlist) != None {
|
||||
if !playlist.starts_with("tmp") && is_valid_full_path(&app_state.base_path, playlist) != None {
|
||||
HttpResponse::BadRequest().finish()
|
||||
} else if let Ok(file) = NamedFile::open(playlist) {
|
||||
file.into_response(&request).unwrap()
|
||||
file.into_response(&request)
|
||||
} else {
|
||||
HttpResponse::NotFound().finish()
|
||||
}
|
||||
@@ -213,7 +226,7 @@ async fn get_video_part(
|
||||
debug!("Video part: {}", part);
|
||||
|
||||
if let Ok(file) = NamedFile::open(String::from("tmp/") + part) {
|
||||
file.into_response(&request).unwrap()
|
||||
file.into_response(&request)
|
||||
} else {
|
||||
error!("Video part not found: tmp/{}", part);
|
||||
HttpResponse::NotFound().finish()
|
||||
@@ -225,18 +238,25 @@ async fn favorites(
|
||||
claims: Claims,
|
||||
favorites_dao: web::Data<Box<dyn FavoriteDao>>,
|
||||
) -> impl Responder {
|
||||
let favorites =
|
||||
web::block(move || favorites_dao.get_favorites(claims.sub.parse::<i32>().unwrap()))
|
||||
.await
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.map(|favorite| favorite.path)
|
||||
.collect::<Vec<String>>();
|
||||
match web::block(move || favorites_dao.get_favorites(claims.sub.parse::<i32>().unwrap())).await
|
||||
{
|
||||
Ok(Ok(favorites)) => {
|
||||
let favorites = favorites
|
||||
.into_iter()
|
||||
.map(|favorite| favorite.path)
|
||||
.collect::<Vec<String>>();
|
||||
|
||||
HttpResponse::Ok().json(PhotosResponse {
|
||||
photos: favorites,
|
||||
dirs: Vec::new(),
|
||||
})
|
||||
HttpResponse::Ok().json(PhotosResponse {
|
||||
photos: favorites,
|
||||
dirs: Vec::new(),
|
||||
})
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
error!("Error getting favorites: {:?}", e);
|
||||
HttpResponse::InternalServerError().finish()
|
||||
}
|
||||
Err(_) => HttpResponse::InternalServerError().finish(),
|
||||
}
|
||||
}
|
||||
|
||||
#[put("image/favorites")]
|
||||
@@ -247,21 +267,27 @@ async fn put_add_favorite(
|
||||
) -> impl Responder {
|
||||
if let Ok(user_id) = claims.sub.parse::<i32>() {
|
||||
let path = body.path.clone();
|
||||
match web::block::<_, usize, DbError>(move || favorites_dao.add_favorite(user_id, &path))
|
||||
.await
|
||||
match web::block::<_, Result<usize, DbError>>(move || {
|
||||
favorites_dao.add_favorite(user_id, &path)
|
||||
})
|
||||
.await
|
||||
{
|
||||
Err(BlockingError::Error(e)) if e.kind == DbErrorKind::AlreadyExists => {
|
||||
Ok(Err(e)) if e.kind == DbErrorKind::AlreadyExists => {
|
||||
debug!("Favorite: {} exists for user: {}", &body.path, user_id);
|
||||
HttpResponse::Ok()
|
||||
}
|
||||
Err(e) => {
|
||||
error!("{:?} {}. for user: {}", e, body.path, user_id);
|
||||
Ok(Err(e)) => {
|
||||
info!("{:?} {}. for user: {}", e, body.path, user_id);
|
||||
HttpResponse::BadRequest()
|
||||
}
|
||||
Ok(_) => {
|
||||
info!("Adding favorite \"{}\" for userid: {}", body.path, user_id);
|
||||
Ok(Ok(_)) => {
|
||||
debug!("Adding favorite \"{}\" for userid: {}", body.path, user_id);
|
||||
HttpResponse::Created()
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Blocking error while inserting favorite: {:?}", e);
|
||||
HttpResponse::InternalServerError()
|
||||
}
|
||||
}
|
||||
} else {
|
||||
error!("Unable to parse sub as i32: {}", claims.sub);
|
||||
@@ -277,9 +303,8 @@ async fn delete_favorite(
|
||||
) -> impl Responder {
|
||||
if let Ok(user_id) = claims.sub.parse::<i32>() {
|
||||
let path = body.path.clone();
|
||||
web::block::<_, _, String>(move || {
|
||||
web::block(move || {
|
||||
favorites_dao.remove_favorite(user_id, path);
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -538,7 +563,59 @@ fn main() -> std::io::Result<()> {
|
||||
env_logger::init();
|
||||
|
||||
create_thumbnails();
|
||||
watch_files();
|
||||
|
||||
let system = actix::System::new();
|
||||
system.block_on(async {
|
||||
let app_data = web::Data::new(AppState::default());
|
||||
|
||||
let labels = HashMap::new();
|
||||
let prometheus = PrometheusMetricsBuilder::new("api")
|
||||
.const_labels(labels)
|
||||
.build()
|
||||
.expect("Unable to build prometheus metrics middleware");
|
||||
|
||||
prometheus
|
||||
.registry
|
||||
.register(Box::new(IMAGE_GAUGE.clone()))
|
||||
.unwrap();
|
||||
prometheus
|
||||
.registry
|
||||
.register(Box::new(VIDEO_GAUGE.clone()))
|
||||
.unwrap();
|
||||
|
||||
HttpServer::new(move || {
|
||||
let user_dao = SqliteUserDao::new();
|
||||
let favorites_dao = SqliteFavoriteDao::new();
|
||||
App::new()
|
||||
.wrap(middleware::Logger::default())
|
||||
.service(web::resource("/login").route(web::post().to(login::<SqliteUserDao>)))
|
||||
.service(web::resource("/photos").route(web::get().to(files::list_photos)))
|
||||
.service(get_image)
|
||||
.service(upload_image)
|
||||
.service(generate_video)
|
||||
.service(stream_video)
|
||||
.service(get_video_part)
|
||||
.service(favorites)
|
||||
.service(put_add_favorite)
|
||||
.service(delete_favorite)
|
||||
.service(get_file_metadata)
|
||||
.service(add_tag)
|
||||
.service(get_tags)
|
||||
.service(remove_tagged_photo)
|
||||
.app_data(app_data.clone())
|
||||
.app_data::<Data<SqliteUserDao>>(Data::new(user_dao))
|
||||
.app_data::<Data<Box<dyn FavoriteDao>>>(Data::new(Box::new(favorites_dao)))
|
||||
.wrap(prometheus.clone())
|
||||
})
|
||||
.bind(dotenv::var("BIND_URL").unwrap())?
|
||||
.bind("localhost:8088")?
|
||||
.run()
|
||||
.await
|
||||
})
|
||||
}
|
||||
|
||||
fn watch_files() {
|
||||
std::thread::spawn(|| {
|
||||
let (wtx, wrx) = channel();
|
||||
let mut watcher = watcher(wtx, std::time::Duration::from_secs(10)).unwrap();
|
||||
@@ -581,56 +658,4 @@ fn main() -> std::io::Result<()> {
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let system = actix::System::new("image-api");
|
||||
let act = StreamActor {}.start();
|
||||
|
||||
let app_data = web::Data::new(AppState {
|
||||
stream_manager: Arc::new(act),
|
||||
});
|
||||
|
||||
let labels = HashMap::new();
|
||||
let prometheus = PrometheusMetrics::new("", Some("/metrics"), Some(labels));
|
||||
prometheus
|
||||
.registry
|
||||
.register(Box::new(IMAGE_GAUGE.clone()))
|
||||
.unwrap();
|
||||
prometheus
|
||||
.registry
|
||||
.register(Box::new(VIDEO_GAUGE.clone()))
|
||||
.unwrap();
|
||||
|
||||
HttpServer::new(move || {
|
||||
let user_dao = SqliteUserDao::new();
|
||||
let favorites_dao = SqliteFavoriteDao::new();
|
||||
App::new()
|
||||
.wrap(middleware::Logger::default())
|
||||
.service(web::resource("/login").route(web::post().to(login)))
|
||||
.service(web::resource("/photos").route(web::get().to(files::list_photos)))
|
||||
.service(get_image)
|
||||
.service(upload_image)
|
||||
.service(generate_video)
|
||||
.service(stream_video)
|
||||
.service(get_video_part)
|
||||
.service(favorites)
|
||||
.service(put_add_favorite)
|
||||
.service(delete_favorite)
|
||||
.service(get_file_metadata)
|
||||
.service(add_tag)
|
||||
.service(get_tags)
|
||||
.service(remove_tagged_photo)
|
||||
.app_data(app_data.clone())
|
||||
.data::<Box<dyn UserDao>>(Box::new(user_dao))
|
||||
.data::<Box<dyn FavoriteDao>>(Box::new(favorites_dao))
|
||||
.wrap(prometheus.clone())
|
||||
})
|
||||
.bind(dotenv::var("BIND_URL").unwrap())?
|
||||
.bind("localhost:8088")?
|
||||
.run();
|
||||
|
||||
system.run()
|
||||
}
|
||||
|
||||
struct AppState {
|
||||
stream_manager: Arc<Addr<StreamActor>>,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user