Update to Actix 4
Some checks failed
Core Repos/ImageApi/pipeline/pr-master There was a failure building this commit

This commit is contained in:
Cameron Cordes
2022-03-01 20:38:41 -05:00
parent 1e3f33c2d3
commit 69fe307516
8 changed files with 709 additions and 1074 deletions

View File

@@ -2,7 +2,8 @@
extern crate diesel;
extern crate rayon;
use actix_web_prom::PrometheusMetrics;
use actix_web::web::Data;
use actix_web_prom::PrometheusMetricsBuilder;
use futures::stream::StreamExt;
use lazy_static::lazy_static;
use prometheus::{self, IntGauge};
@@ -19,11 +20,9 @@ use actix::prelude::*;
use actix_files::NamedFile;
use actix_multipart as mp;
use actix_web::{
delete,
error::BlockingError,
get, middleware, post, put,
web::{self, BufMut, BytesMut, HttpRequest, HttpResponse},
App, HttpServer, Responder,
delete, get, middleware, post, put,
web::{self, BufMut, BytesMut},
App, HttpRequest, HttpResponse, HttpServer, Responder,
};
use notify::{watcher, DebouncedEvent, RecursiveMode, Watcher};
use rayon::prelude::*;
@@ -74,12 +73,12 @@ async fn get_image(
debug!("{:?}", thumb_path);
if let Ok(file) = NamedFile::open(&thumb_path) {
file.into_response(&request).unwrap()
file.into_response(&request)
} else {
HttpResponse::NotFound().finish()
}
} else if let Ok(file) = NamedFile::open(path) {
file.into_response(&request).unwrap()
file.into_response(&request)
} else {
HttpResponse::NotFound().finish()
}
@@ -114,20 +113,19 @@ async fn upload_image(_: Claims, mut payload: mp::Multipart) -> impl Responder {
let mut file_path: Option<String> = None;
while let Some(Ok(mut part)) = payload.next().await {
if let Some(content_type) = part.content_disposition() {
debug!("{:?}", content_type);
if let Some(filename) = content_type.get_filename() {
debug!("Name: {:?}", filename);
file_name = Some(filename.to_string());
let content_type = part.content_disposition();
debug!("{:?}", content_type);
if let Some(filename) = content_type.get_filename() {
debug!("Name: {:?}", filename);
file_name = Some(filename.to_string());
while let Some(Ok(data)) = part.next().await {
file_content.put(data);
}
} else if content_type.get_name().map_or(false, |name| name == "path") {
while let Some(Ok(data)) = part.next().await {
if let Ok(path) = std::str::from_utf8(&data) {
file_path = Some(path.to_string())
}
while let Some(Ok(data)) = part.next().await {
file_content.put(data);
}
} else if content_type.get_name().map_or(false, |name| name == "path") {
while let Some(Ok(data)) = part.next().await {
if let Ok(path) = std::str::from_utf8(&data) {
file_path = Some(path.to_string())
}
}
}
@@ -194,7 +192,7 @@ async fn stream_video(
if !playlist.starts_with("tmp") && is_valid_path(playlist) != None {
HttpResponse::BadRequest().finish()
} else if let Ok(file) = NamedFile::open(playlist) {
file.into_response(&request).unwrap()
file.into_response(&request)
} else {
HttpResponse::NotFound().finish()
}
@@ -210,7 +208,7 @@ async fn get_video_part(
debug!("Video part: {}", part);
if let Ok(file) = NamedFile::open(String::from("tmp/") + part) {
file.into_response(&request).unwrap()
file.into_response(&request)
} else {
error!("Video part not found: tmp/{}", part);
HttpResponse::NotFound().finish()
@@ -222,18 +220,25 @@ async fn favorites(
claims: Claims,
favorites_dao: web::Data<Box<dyn FavoriteDao>>,
) -> impl Responder {
let favorites =
web::block(move || favorites_dao.get_favorites(claims.sub.parse::<i32>().unwrap()))
.await
.unwrap()
.into_iter()
.map(|favorite| favorite.path)
.collect::<Vec<String>>();
match web::block(move || favorites_dao.get_favorites(claims.sub.parse::<i32>().unwrap())).await
{
Ok(Ok(favorites)) => {
let favorites = favorites
.into_iter()
.map(|favorite| favorite.path)
.collect::<Vec<String>>();
HttpResponse::Ok().json(PhotosResponse {
photos: favorites,
dirs: Vec::new(),
})
HttpResponse::Ok().json(PhotosResponse {
photos: favorites,
dirs: Vec::new(),
})
}
Ok(Err(e)) => {
error!("Error getting favorites: {:?}", e);
HttpResponse::InternalServerError().finish()
}
Err(_) => HttpResponse::InternalServerError().finish(),
}
}
#[put("image/favorites")]
@@ -244,21 +249,27 @@ async fn put_add_favorite(
) -> impl Responder {
if let Ok(user_id) = claims.sub.parse::<i32>() {
let path = body.path.clone();
match web::block::<_, usize, DbError>(move || favorites_dao.add_favorite(user_id, &path))
.await
match web::block::<_, Result<usize, DbError>>(move || {
favorites_dao.add_favorite(user_id, &path)
})
.await
{
Err(BlockingError::Error(e)) if e.kind == DbErrorKind::AlreadyExists => {
Ok(Err(e)) if e.kind == DbErrorKind::AlreadyExists => {
debug!("Favorite: {} exists for user: {}", &body.path, user_id);
HttpResponse::Ok()
}
Err(e) => {
Ok(Err(e)) => {
info!("{:?} {}. for user: {}", e, body.path, user_id);
HttpResponse::BadRequest()
}
Ok(_) => {
Ok(Ok(_)) => {
debug!("Adding favorite \"{}\" for userid: {}", body.path, user_id);
HttpResponse::Created()
}
Err(e) => {
error!("Blocking error while inserting favorite: {:?}", e);
HttpResponse::InternalServerError()
}
}
} else {
error!("Unable to parse sub as i32: {}", claims.sub);
@@ -274,9 +285,8 @@ async fn delete_favorite(
) -> impl Responder {
if let Ok(user_id) = claims.sub.parse::<i32>() {
let path = body.path.clone();
web::block::<_, _, String>(move || {
web::block(move || {
favorites_dao.remove_favorite(user_id, path);
Ok(())
})
.await
.unwrap();
@@ -440,50 +450,55 @@ fn main() -> std::io::Result<()> {
}
});
let system = actix::System::new("image-api");
let act = StreamActor {}.start();
let system = actix::System::new();
system.block_on(async {
let act = StreamActor {}.start();
let app_data = web::Data::new(AppState {
stream_manager: Arc::new(act),
});
let app_data = web::Data::new(AppState {
stream_manager: Arc::new(act),
});
let labels = HashMap::new();
let prometheus = PrometheusMetrics::new("", Some("/metrics"), Some(labels));
prometheus
.registry
.register(Box::new(IMAGE_GAUGE.clone()))
.unwrap();
prometheus
.registry
.register(Box::new(VIDEO_GAUGE.clone()))
.unwrap();
let labels = HashMap::new();
let prometheus = PrometheusMetricsBuilder::new("api")
.const_labels(labels)
.build()
.expect("Unable to build prometheus metrics middleware");
HttpServer::new(move || {
let user_dao = SqliteUserDao::new();
let favorites_dao = SqliteFavoriteDao::new();
App::new()
.wrap(middleware::Logger::default())
.service(web::resource("/login").route(web::post().to(login)))
.service(web::resource("/photos").route(web::get().to(files::list_photos)))
.service(get_image)
.service(upload_image)
.service(generate_video)
.service(stream_video)
.service(get_video_part)
.service(favorites)
.service(put_add_favorite)
.service(delete_favorite)
.service(get_file_metadata)
.app_data(app_data.clone())
.data::<Box<dyn UserDao>>(Box::new(user_dao))
.data::<Box<dyn FavoriteDao>>(Box::new(favorites_dao))
.wrap(prometheus.clone())
prometheus
.registry
.register(Box::new(IMAGE_GAUGE.clone()))
.unwrap();
prometheus
.registry
.register(Box::new(VIDEO_GAUGE.clone()))
.unwrap();
HttpServer::new(move || {
let user_dao = SqliteUserDao::new();
let favorites_dao = SqliteFavoriteDao::new();
App::new()
.wrap(middleware::Logger::default())
.service(web::resource("/login").route(web::post().to(login)))
.service(web::resource("/photos").route(web::get().to(files::list_photos)))
.service(get_image)
.service(upload_image)
.service(generate_video)
.service(stream_video)
.service(get_video_part)
.service(favorites)
.service(put_add_favorite)
.service(delete_favorite)
.service(get_file_metadata)
.app_data(app_data.clone())
.app_data::<Data<Box<dyn UserDao>>>(Data::new(Box::new(user_dao)))
.app_data::<Data<Box<dyn FavoriteDao>>>(Data::new(Box::new(favorites_dao)))
.wrap(prometheus.clone())
})
.bind(dotenv::var("BIND_URL").unwrap())?
.bind("localhost:8088")?
.run()
.await
})
.bind(dotenv::var("BIND_URL").unwrap())?
.bind("localhost:8088")?
.run();
system.run()
}
struct AppState {