Merge pull request 'Update to Actix 4' (#19) from feature/update-to-actix-4 into master
All checks were successful
Core Repos/ImageApi/pipeline/head This commit looks good

Reviewed-on: #19
This commit was merged in pull request #19.
This commit is contained in:
2022-03-02 02:32:29 +00:00
9 changed files with 708 additions and 1075 deletions

1533
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -10,11 +10,11 @@ edition = "2018"
lto = true lto = true
[dependencies] [dependencies]
actix = "0.10" actix = "0.12"
actix-web = "3" actix-web = "4"
actix-rt = "1" actix-rt = "2.6"
actix-files = "0.5" actix-files = "0.6"
actix-multipart = "0.3.0" actix-multipart = "0.4.0"
futures = "0.3.5" futures = "0.3.5"
jsonwebtoken = "7.2.0" jsonwebtoken = "7.2.0"
serde = "1" serde = "1"
@@ -32,7 +32,7 @@ notify = "4.0"
path-absolutize = "3.0.6" path-absolutize = "3.0.6"
log="0.4" log="0.4"
env_logger="0.8" env_logger="0.8"
actix-web-prom = "0.5.1" actix-web-prom = "0.6"
prometheus = "0.11" prometheus = "0.13"
lazy_static = "1.1" lazy_static = "1.1"
anyhow = "1.0" anyhow = "1.0"

2
Jenkinsfile vendored
View File

@@ -1,7 +1,7 @@
pipeline { pipeline {
agent { agent {
docker { docker {
image 'rust:1.55' image 'rust:1.59'
args '-v "$PWD":/usr/src/image-api' args '-v "$PWD":/usr/src/image-api'
} }
} }

View File

@@ -1,5 +1,8 @@
use actix_web::web::{self, HttpResponse, Json};
use actix_web::{post, Responder}; use actix_web::{post, Responder};
use actix_web::{
web::{self, Json},
HttpResponse,
};
use chrono::{Duration, Utc}; use chrono::{Duration, Utc};
use jsonwebtoken::{encode, EncodingKey, Header}; use jsonwebtoken::{encode, EncodingKey, Header};
use log::{debug, error}; use log::{debug, error};
@@ -57,6 +60,7 @@ pub async fn login(
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::testhelpers::{BodyReader, TestUserDao}; use crate::testhelpers::{BodyReader, TestUserDao};
@@ -88,7 +92,9 @@ mod tests {
let response = login(j, web::Data::new(Box::new(dao))).await; let response = login(j, web::Data::new(Box::new(dao))).await;
assert_eq!(response.status(), 200); assert_eq!(response.status(), 200);
assert!(response.body().read_to_str().contains("\"token\"")); let response_text: String = response.read_to_str();
assert!(response_text.contains("\"token\""));
} }
#[actix_rt::test] #[actix_rt::test]

View File

@@ -53,7 +53,6 @@ impl FromStr for Claims {
impl FromRequest for Claims { impl FromRequest for Claims {
type Error = Error; type Error = Error;
type Future = Ready<Result<Self, Self::Error>>; type Future = Ready<Result<Self, Self::Error>>;
type Config = ();
fn from_request(req: &HttpRequest, _payload: &mut dev::Payload) -> Self::Future { fn from_request(req: &HttpRequest, _payload: &mut dev::Payload) -> Self::Future {
req.headers() req.headers()

View File

@@ -5,7 +5,7 @@ use std::path::{Path, PathBuf};
use ::anyhow; use ::anyhow;
use anyhow::{anyhow, Context}; use anyhow::{anyhow, Context};
use actix_web::web::{HttpResponse, Query}; use actix_web::{web::Query, HttpResponse};
use log::{debug, error}; use log::{debug, error};
@@ -140,7 +140,7 @@ mod tests {
use super::list_photos; use super::list_photos;
use crate::{ use crate::{
data::{Claims, PhotosResponse, ThumbnailRequest}, data::{Claims, PhotosResponse, ThumbnailRequest},
testhelpers::TypedBodyReader, testhelpers::BodyReader,
}; };
use std::fs; use std::fs;
@@ -172,10 +172,11 @@ mod tests {
fs::File::create(temp_photo).unwrap(); fs::File::create(temp_photo).unwrap();
let response: HttpResponse = list_photos(claims, request).await; let response: HttpResponse = list_photos(claims, request).await;
let status = response.status();
let body: PhotosResponse = response.body().read_body(); let body: PhotosResponse = serde_json::from_str(&response.read_to_str()).unwrap();
assert_eq!(response.status(), 200); assert_eq!(status, 200);
assert!(body.photos.contains(&String::from("photo.jpg"))); assert!(body.photos.contains(&String::from("photo.jpg")));
assert!(body.dirs.contains(&String::from("test-dir"))); assert!(body.dirs.contains(&String::from("test-dir")));
assert!(body assert!(body

View File

@@ -2,7 +2,8 @@
extern crate diesel; extern crate diesel;
extern crate rayon; extern crate rayon;
use actix_web_prom::PrometheusMetrics; use actix_web::web::Data;
use actix_web_prom::PrometheusMetricsBuilder;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use prometheus::{self, IntGauge}; use prometheus::{self, IntGauge};
@@ -19,11 +20,9 @@ use actix::prelude::*;
use actix_files::NamedFile; use actix_files::NamedFile;
use actix_multipart as mp; use actix_multipart as mp;
use actix_web::{ use actix_web::{
delete, delete, get, middleware, post, put,
error::BlockingError, web::{self, BufMut, BytesMut},
get, middleware, post, put, App, HttpRequest, HttpResponse, HttpServer, Responder,
web::{self, BufMut, BytesMut, HttpRequest, HttpResponse},
App, HttpServer, Responder,
}; };
use notify::{watcher, DebouncedEvent, RecursiveMode, Watcher}; use notify::{watcher, DebouncedEvent, RecursiveMode, Watcher};
use rayon::prelude::*; use rayon::prelude::*;
@@ -74,12 +73,12 @@ async fn get_image(
debug!("{:?}", thumb_path); debug!("{:?}", thumb_path);
if let Ok(file) = NamedFile::open(&thumb_path) { if let Ok(file) = NamedFile::open(&thumb_path) {
file.into_response(&request).unwrap() file.into_response(&request)
} else { } else {
HttpResponse::NotFound().finish() HttpResponse::NotFound().finish()
} }
} else if let Ok(file) = NamedFile::open(path) { } else if let Ok(file) = NamedFile::open(path) {
file.into_response(&request).unwrap() file.into_response(&request)
} else { } else {
HttpResponse::NotFound().finish() HttpResponse::NotFound().finish()
} }
@@ -114,20 +113,19 @@ async fn upload_image(_: Claims, mut payload: mp::Multipart) -> impl Responder {
let mut file_path: Option<String> = None; let mut file_path: Option<String> = None;
while let Some(Ok(mut part)) = payload.next().await { while let Some(Ok(mut part)) = payload.next().await {
if let Some(content_type) = part.content_disposition() { let content_type = part.content_disposition();
debug!("{:?}", content_type); debug!("{:?}", content_type);
if let Some(filename) = content_type.get_filename() { if let Some(filename) = content_type.get_filename() {
debug!("Name: {:?}", filename); debug!("Name: {:?}", filename);
file_name = Some(filename.to_string()); file_name = Some(filename.to_string());
while let Some(Ok(data)) = part.next().await { while let Some(Ok(data)) = part.next().await {
file_content.put(data); file_content.put(data);
} }
} else if content_type.get_name().map_or(false, |name| name == "path") { } else if content_type.get_name().map_or(false, |name| name == "path") {
while let Some(Ok(data)) = part.next().await { while let Some(Ok(data)) = part.next().await {
if let Ok(path) = std::str::from_utf8(&data) { if let Ok(path) = std::str::from_utf8(&data) {
file_path = Some(path.to_string()) file_path = Some(path.to_string())
}
} }
} }
} }
@@ -194,7 +192,7 @@ async fn stream_video(
if !playlist.starts_with("tmp") && is_valid_path(playlist) != None { if !playlist.starts_with("tmp") && is_valid_path(playlist) != None {
HttpResponse::BadRequest().finish() HttpResponse::BadRequest().finish()
} else if let Ok(file) = NamedFile::open(playlist) { } else if let Ok(file) = NamedFile::open(playlist) {
file.into_response(&request).unwrap() file.into_response(&request)
} else { } else {
HttpResponse::NotFound().finish() HttpResponse::NotFound().finish()
} }
@@ -210,7 +208,7 @@ async fn get_video_part(
debug!("Video part: {}", part); debug!("Video part: {}", part);
if let Ok(file) = NamedFile::open(String::from("tmp/") + part) { if let Ok(file) = NamedFile::open(String::from("tmp/") + part) {
file.into_response(&request).unwrap() file.into_response(&request)
} else { } else {
error!("Video part not found: tmp/{}", part); error!("Video part not found: tmp/{}", part);
HttpResponse::NotFound().finish() HttpResponse::NotFound().finish()
@@ -222,18 +220,25 @@ async fn favorites(
claims: Claims, claims: Claims,
favorites_dao: web::Data<Box<dyn FavoriteDao>>, favorites_dao: web::Data<Box<dyn FavoriteDao>>,
) -> impl Responder { ) -> impl Responder {
let favorites = match web::block(move || favorites_dao.get_favorites(claims.sub.parse::<i32>().unwrap())).await
web::block(move || favorites_dao.get_favorites(claims.sub.parse::<i32>().unwrap())) {
.await Ok(Ok(favorites)) => {
.unwrap() let favorites = favorites
.into_iter() .into_iter()
.map(|favorite| favorite.path) .map(|favorite| favorite.path)
.collect::<Vec<String>>(); .collect::<Vec<String>>();
HttpResponse::Ok().json(PhotosResponse { HttpResponse::Ok().json(PhotosResponse {
photos: favorites, photos: favorites,
dirs: Vec::new(), dirs: Vec::new(),
}) })
}
Ok(Err(e)) => {
error!("Error getting favorites: {:?}", e);
HttpResponse::InternalServerError().finish()
}
Err(_) => HttpResponse::InternalServerError().finish(),
}
} }
#[put("image/favorites")] #[put("image/favorites")]
@@ -244,21 +249,27 @@ async fn put_add_favorite(
) -> impl Responder { ) -> impl Responder {
if let Ok(user_id) = claims.sub.parse::<i32>() { if let Ok(user_id) = claims.sub.parse::<i32>() {
let path = body.path.clone(); let path = body.path.clone();
match web::block::<_, usize, DbError>(move || favorites_dao.add_favorite(user_id, &path)) match web::block::<_, Result<usize, DbError>>(move || {
.await favorites_dao.add_favorite(user_id, &path)
})
.await
{ {
Err(BlockingError::Error(e)) if e.kind == DbErrorKind::AlreadyExists => { Ok(Err(e)) if e.kind == DbErrorKind::AlreadyExists => {
debug!("Favorite: {} exists for user: {}", &body.path, user_id); debug!("Favorite: {} exists for user: {}", &body.path, user_id);
HttpResponse::Ok() HttpResponse::Ok()
} }
Err(e) => { Ok(Err(e)) => {
info!("{:?} {}. for user: {}", e, body.path, user_id); info!("{:?} {}. for user: {}", e, body.path, user_id);
HttpResponse::BadRequest() HttpResponse::BadRequest()
} }
Ok(_) => { Ok(Ok(_)) => {
debug!("Adding favorite \"{}\" for userid: {}", body.path, user_id); debug!("Adding favorite \"{}\" for userid: {}", body.path, user_id);
HttpResponse::Created() HttpResponse::Created()
} }
Err(e) => {
error!("Blocking error while inserting favorite: {:?}", e);
HttpResponse::InternalServerError()
}
} }
} else { } else {
error!("Unable to parse sub as i32: {}", claims.sub); error!("Unable to parse sub as i32: {}", claims.sub);
@@ -274,9 +285,8 @@ async fn delete_favorite(
) -> impl Responder { ) -> impl Responder {
if let Ok(user_id) = claims.sub.parse::<i32>() { if let Ok(user_id) = claims.sub.parse::<i32>() {
let path = body.path.clone(); let path = body.path.clone();
web::block::<_, _, String>(move || { web::block(move || {
favorites_dao.remove_favorite(user_id, path); favorites_dao.remove_favorite(user_id, path);
Ok(())
}) })
.await .await
.unwrap(); .unwrap();
@@ -440,50 +450,55 @@ fn main() -> std::io::Result<()> {
} }
}); });
let system = actix::System::new("image-api"); let system = actix::System::new();
let act = StreamActor {}.start(); system.block_on(async {
let act = StreamActor {}.start();
let app_data = web::Data::new(AppState { let app_data = web::Data::new(AppState {
stream_manager: Arc::new(act), stream_manager: Arc::new(act),
}); });
let labels = HashMap::new(); let labels = HashMap::new();
let prometheus = PrometheusMetrics::new("", Some("/metrics"), Some(labels)); let prometheus = PrometheusMetricsBuilder::new("api")
prometheus .const_labels(labels)
.registry .build()
.register(Box::new(IMAGE_GAUGE.clone())) .expect("Unable to build prometheus metrics middleware");
.unwrap();
prometheus
.registry
.register(Box::new(VIDEO_GAUGE.clone()))
.unwrap();
HttpServer::new(move || { prometheus
let user_dao = SqliteUserDao::new(); .registry
let favorites_dao = SqliteFavoriteDao::new(); .register(Box::new(IMAGE_GAUGE.clone()))
App::new() .unwrap();
.wrap(middleware::Logger::default()) prometheus
.service(web::resource("/login").route(web::post().to(login))) .registry
.service(web::resource("/photos").route(web::get().to(files::list_photos))) .register(Box::new(VIDEO_GAUGE.clone()))
.service(get_image) .unwrap();
.service(upload_image)
.service(generate_video) HttpServer::new(move || {
.service(stream_video) let user_dao = SqliteUserDao::new();
.service(get_video_part) let favorites_dao = SqliteFavoriteDao::new();
.service(favorites) App::new()
.service(put_add_favorite) .wrap(middleware::Logger::default())
.service(delete_favorite) .service(web::resource("/login").route(web::post().to(login)))
.service(get_file_metadata) .service(web::resource("/photos").route(web::get().to(files::list_photos)))
.app_data(app_data.clone()) .service(get_image)
.data::<Box<dyn UserDao>>(Box::new(user_dao)) .service(upload_image)
.data::<Box<dyn FavoriteDao>>(Box::new(favorites_dao)) .service(generate_video)
.wrap(prometheus.clone()) .service(stream_video)
.service(get_video_part)
.service(favorites)
.service(put_add_favorite)
.service(delete_favorite)
.service(get_file_metadata)
.app_data(app_data.clone())
.app_data::<Data<Box<dyn UserDao>>>(Data::new(Box::new(user_dao)))
.app_data::<Data<Box<dyn FavoriteDao>>>(Data::new(Box::new(favorites_dao)))
.wrap(prometheus.clone())
})
.bind(dotenv::var("BIND_URL").unwrap())?
.bind("localhost:8088")?
.run()
.await
}) })
.bind(dotenv::var("BIND_URL").unwrap())?
.bind("localhost:8088")?
.run();
system.run()
} }
struct AppState { struct AppState {

View File

@@ -1,5 +1,6 @@
use actix_web::dev::{Body, ResponseBody}; use actix_web::body::MessageBody;
use serde::Deserialize; use actix_web::{body::BoxBody, HttpResponse};
use serde::de::DeserializeOwned;
use crate::database::{models::User, UserDao}; use crate::database::{models::User, UserDao};
use std::cell::RefCell; use std::cell::RefCell;
@@ -55,32 +56,26 @@ impl UserDao for TestUserDao {
} }
pub trait BodyReader { pub trait BodyReader {
fn read_to_str(&self) -> &str; fn read_to_str(self) -> String;
} }
impl BodyReader for ResponseBody<Body> { impl BodyReader for HttpResponse<BoxBody> {
fn read_to_str(&self) -> &str { fn read_to_str(self) -> String {
match self { let body = self.into_body().try_into_bytes().unwrap();
ResponseBody::Body(Body::Bytes(ref b)) => std::str::from_utf8(b).unwrap(), std::str::from_utf8(&body).unwrap().to_string()
_ => panic!("Unknown response body"),
}
} }
} }
pub trait TypedBodyReader<'a, T> pub trait TypedBodyReader<T>
where where
T: Deserialize<'a>, T: DeserializeOwned,
{ {
fn read_body(&'a self) -> T; fn read_body(self) -> T;
} }
impl<'a, T: Deserialize<'a>> TypedBodyReader<'a, T> for ResponseBody<Body> { impl<T: DeserializeOwned> TypedBodyReader<T> for HttpResponse<BoxBody> {
fn read_body(&'a self) -> T { fn read_body(self) -> T {
match self { let body = self.read_to_str();
ResponseBody::Body(Body::Bytes(ref b)) => { serde_json::from_value(serde_json::Value::String(body.clone())).unwrap()
serde_json::from_str(std::str::from_utf8(b).unwrap()).unwrap()
}
_ => panic!("Unknown response body"),
}
} }
} }

View File

@@ -67,7 +67,7 @@ pub async fn create_playlist(video_path: &str, playlist_file: &str) -> Result<Ch
let start_time = std::time::Instant::now(); let start_time = std::time::Instant::now();
loop { loop {
actix::clock::delay_for(std::time::Duration::from_secs(1)).await; actix::clock::sleep(std::time::Duration::from_secs(1)).await;
if Path::new(playlist_file).exists() if Path::new(playlist_file).exists()
|| std::time::Instant::now() - start_time > std::time::Duration::from_secs(5) || std::time::Instant::now() - start_time > std::time::Duration::from_secs(5)