Compare commits

6 Commits

Author SHA1 Message Date
Cameron Cordes
39d284dbbb Update dependencies
Some checks failed
Core Repos/ImageApi/pipeline/pr-master Something is wrong with the build of this commit
2021-10-11 21:48:44 -04:00
Cameron Cordes
125ba6192e Elevate insertion logs to info and fix error logs 2021-10-11 21:33:22 -04:00
Cameron Cordes
50d557001b Add created timestamps for tags 2021-10-11 21:33:22 -04:00
Cameron Cordes
cf9dd826c1 Improve add tag endpoint and add get tag endpoint
Flattened out the add tag logic to make it more functional.
2021-10-11 21:33:18 -04:00
Cameron Cordes
4834cacfc3 Create Tag tables and Add Tag endpoint 2021-10-10 22:05:36 -04:00
Cameron Cordes
51081d01c6 Update dependencies 2021-10-10 22:04:01 -04:00
24 changed files with 2142 additions and 6438 deletions

1
.gitignore vendored
View File

@@ -2,7 +2,6 @@
database/target
*.db
.env
/tmp
# Default ignored files
.idea/shelf/

1
.idea/image-api.iml generated
View File

@@ -3,7 +3,6 @@
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/src" isTestSource="false" />
<excludeFolder url="file://$MODULE_DIR$/.idea/dataSources" />
<excludeFolder url="file://$MODULE_DIR$/target" />
</content>
<orderEntry type="inheritedJdk" />

4300
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,8 +1,8 @@
[package]
name = "image-api"
version = "0.3.1"
version = "0.1.0"
authors = ["Cameron Cordes <cameronc.dev@gmail.com>"]
edition = "2024"
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -10,37 +10,29 @@ edition = "2024"
lto = true
[dependencies]
actix = "0.13.1"
actix-web = "4"
actix-rt = "2.6"
tokio = { version = "1.42.0", features = ["default", "process", "sync"] }
actix-files = "0.6"
actix-multipart = "0.7.2"
actix = "0.10"
actix-web = "3"
actix-rt = "1"
actix-files = "0.5"
actix-multipart = "0.3.0"
futures = "0.3.5"
jsonwebtoken = "9.3.0"
jsonwebtoken = "7.2.0"
serde = "1"
serde_json = "1"
diesel = { version = "2.2.10", features = ["sqlite"] }
diesel_migrations = "2.2.0"
diesel = { version = "1.4.8", features = ["sqlite"] }
hmac = "0.11"
sha2 = "0.9"
chrono = "0.4"
dotenv = "0.15"
bcrypt = "0.16.0"
image = { version = "0.25.5", default-features = false, features = ["jpeg", "png", "rayon"] }
walkdir = "2.4.0"
bcrypt = "0.9"
image = { version = "0.23", default-features = false, features = ["jpeg", "png", "jpeg_rayon"] }
walkdir = "2"
rayon = "1.5"
notify = "6.1.1"
path-absolutize = "3.1"
log = "0.4"
env_logger = "0.11.5"
actix-web-prom = "0.9.0"
prometheus = "0.13"
lazy_static = "1.5"
notify = "4.0"
path-absolutize = "3.0"
log="0.4"
env_logger="0.8"
actix-web-prom = "0.5.1"
prometheus = "0.11"
lazy_static = "1.1"
anyhow = "1.0"
rand = "0.8.5"
opentelemetry = { version = "0.30.0", features = ["default", "metrics", "tracing"] }
opentelemetry_sdk = { version = "0.30.0", features = ["default", "rt-tokio-current-thread", "metrics"] }
opentelemetry-otlp = { version = "0.30.0", features = ["default", "metrics", "tracing", "grpc-tonic"] }
opentelemetry-stdout = "0.30.0"
opentelemetry-appender-log = "0.30.0"
tempfile = "3.20.0"
regex = "1.11.1"

2
Jenkinsfile vendored
View File

@@ -1,7 +1,7 @@
pipeline {
agent {
docker {
image 'rust:1.59'
image 'rust:1.51'
args '-v "$PWD":/usr/src/image-api'
}
}

View File

@@ -5,13 +5,11 @@ Upon first run it will generate thumbnails for all images and videos at `BASE_PA
## Environment
There are a handful of required environment variables to have the API run.
They should be defined where the binary is located or above it in an `.env` file.
You must have `ffmpeg` installed for streaming video and generating video thumbnails.
- `DATABASE_URL` is a path or url to a database (currently only SQLite is tested)
- `BASE_PATH` is the root from which you want to serve images and videos
- `THUMBNAILS` is a path where generated thumbnails should be stored
- `VIDEO_PATH` is a path where HLS playlists and video parts should be stored
- `BIND_URL` is the url and port to bind to (typically your own IP address)
- `SECRET_KEY` is the *hopefully* random string to sign Tokens with
- `RUST_LOG` is one of `off, error, warn, info, debug, trace`, from least to most noisy [error is default]
- `EXCLUDED_DIRS` is a comma separated list of directories to exclude from the Memories API

View File

@@ -1,28 +1,23 @@
use actix_web::Responder;
use actix_web::{
HttpResponse,
web::{self, Json},
};
use actix_web::web::{self, HttpResponse, Json};
use actix_web::{post, Responder};
use chrono::{Duration, Utc};
use jsonwebtoken::{EncodingKey, Header, encode};
use log::{error, info};
use std::sync::Mutex;
use jsonwebtoken::{encode, EncodingKey, Header};
use log::{debug, error};
use crate::{
data::{Claims, CreateAccountRequest, LoginRequest, Token, secret_key},
data::{secret_key, Claims, CreateAccountRequest, LoginRequest, Token},
database::UserDao,
};
#[allow(dead_code)]
async fn register<D: UserDao>(
#[post("/register")]
async fn register(
user: Json<CreateAccountRequest>,
user_dao: web::Data<Mutex<D>>,
user_dao: web::Data<Box<dyn UserDao>>,
) -> impl Responder {
if !user.username.is_empty() && user.password.len() > 5 && user.password == user.confirmation {
let mut dao = user_dao.lock().expect("Unable to get UserDao");
if dao.user_exists(&user.username) {
if user_dao.user_exists(&user.username) {
HttpResponse::BadRequest()
} else if let Some(_user) = dao.create_user(&user.username, &user.password) {
} else if let Some(_user) = user_dao.create_user(&user.username, &user.password) {
HttpResponse::Ok()
} else {
HttpResponse::InternalServerError()
@@ -32,13 +27,11 @@ async fn register<D: UserDao>(
}
}
pub async fn login<D: UserDao>(
pub async fn login(
creds: Json<LoginRequest>,
user_dao: web::Data<Mutex<D>>,
user_dao: web::Data<Box<dyn UserDao>>,
) -> HttpResponse {
info!("Logging in: {}", creds.username);
let mut user_dao = user_dao.lock().expect("Unable to get UserDao");
debug!("Logging in: {}", creds.username);
if let Some(user) = user_dao.get_user(&creds.username, &creds.password) {
let claims = Claims {
@@ -64,13 +57,12 @@ pub async fn login<D: UserDao>(
#[cfg(test)]
mod tests {
use super::*;
use crate::testhelpers::{BodyReader, TestUserDao};
#[actix_rt::test]
async fn test_login_reports_200_when_user_exists() {
let mut dao = TestUserDao::new();
let dao = TestUserDao::new();
dao.create_user("user", "pass");
let j = Json(LoginRequest {
@@ -78,14 +70,14 @@ mod tests {
password: "pass".to_string(),
});
let response = login::<TestUserDao>(j, web::Data::new(Mutex::new(dao))).await;
let response = login(j, web::Data::new(Box::new(dao))).await;
assert_eq!(response.status(), 200);
}
#[actix_rt::test]
async fn test_login_returns_token_on_success() {
let mut dao = TestUserDao::new();
let dao = TestUserDao::new();
dao.create_user("user", "password");
let j = Json(LoginRequest {
@@ -93,17 +85,15 @@ mod tests {
password: "password".to_string(),
});
let response = login::<TestUserDao>(j, web::Data::new(Mutex::new(dao))).await;
let response = login(j, web::Data::new(Box::new(dao))).await;
assert_eq!(response.status(), 200);
let response_text: String = response.read_to_str();
assert!(response_text.contains("\"token\""));
assert!(response.body().read_to_str().contains("\"token\""));
}
#[actix_rt::test]
async fn test_login_reports_404_when_user_does_not_exist() {
let mut dao = TestUserDao::new();
let dao = TestUserDao::new();
dao.create_user("user", "password");
let j = Json(LoginRequest {
@@ -111,7 +101,7 @@ mod tests {
password: "password".to_string(),
});
let response = login::<TestUserDao>(j, web::Data::new(Mutex::new(dao))).await;
let response = login(j, web::Data::new(Box::new(dao))).await;
assert_eq!(response.status(), 404);
}

View File

@@ -1,14 +1,14 @@
use std::{fs, str::FromStr};
use anyhow::{Context, anyhow};
use anyhow::{anyhow, Context};
use chrono::{DateTime, Utc};
use log::error;
use actix_web::error::ErrorUnauthorized;
use actix_web::{Error, FromRequest, HttpRequest, dev, http::header};
use futures::future::{Ready, err, ok};
use jsonwebtoken::{Algorithm, DecodingKey, Validation, decode};
use actix_web::{dev, http::header, Error, FromRequest, HttpRequest};
use futures::future::{err, ok, Ready};
use jsonwebtoken::{decode, Algorithm, DecodingKey, Validation};
use serde::{Deserialize, Serialize};
#[derive(Serialize)]
@@ -16,27 +16,12 @@ pub struct Token<'a> {
pub token: &'a str,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
#[derive(Debug, Deserialize, Serialize)]
pub struct Claims {
pub sub: String,
pub exp: i64,
}
#[cfg(test)]
pub mod helper {
use super::Claims;
use chrono::{Duration, Utc};
impl Claims {
pub fn valid_user(user_id: String) -> Self {
Claims {
sub: user_id,
exp: (Utc::now() + Duration::minutes(1)).timestamp(),
}
}
}
}
pub fn secret_key() -> String {
if cfg!(test) {
String::from("test_key")
@@ -68,6 +53,7 @@ impl FromStr for Claims {
impl FromRequest for Claims {
type Error = Error;
type Future = Ready<Result<Self, Self::Error>>;
type Config = ();
fn from_request(req: &HttpRequest, _payload: &mut dev::Payload) -> Self::Future {
req.headers()
@@ -100,54 +86,10 @@ pub struct PhotosResponse {
pub dirs: Vec<String>,
}
#[derive(Copy, Clone, Deserialize, PartialEq, Debug)]
#[serde(rename_all = "lowercase")]
pub enum SortType {
Shuffle,
NameAsc,
NameDesc,
TagCountAsc,
TagCountDesc,
}
#[derive(Deserialize)]
pub struct FilesRequest {
pub path: String,
// comma separated numbers
pub tag_ids: Option<String>,
pub exclude_tag_ids: Option<String>,
pub tag_filter_mode: Option<FilterMode>,
pub recursive: Option<bool>,
pub sort: Option<SortType>,
}
#[derive(Copy, Clone, Deserialize, PartialEq, Debug)]
pub enum FilterMode {
Any,
All,
}
#[derive(Copy, Clone, Deserialize, PartialEq, Debug)]
#[serde(rename_all = "lowercase")]
pub enum PhotoSize {
Full,
Thumb,
}
#[derive(Debug, Deserialize)]
pub struct ThumbnailRequest {
pub(crate) path: String,
pub(crate) size: Option<PhotoSize>,
#[serde(default)]
pub(crate) format: Option<ThumbnailFormat>,
}
#[derive(Debug, Deserialize, PartialEq)]
pub enum ThumbnailFormat {
#[serde(rename = "gif")]
Gif,
#[serde(rename = "image")]
Image,
pub path: String,
pub size: Option<String>,
}
#[derive(Deserialize)]
@@ -197,11 +139,6 @@ pub struct AddTagRequest {
pub tag_name: String,
}
#[derive(Deserialize)]
pub struct GetTagsRequest {
pub path: Option<String>,
}
#[cfg(test)]
mod tests {
use super::Claims;
@@ -226,8 +163,7 @@ mod tests {
#[test]
fn test_expired_token() {
let err = Claims::from_str(
"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiI5IiwiZXhwIjoxNn0.eZnfaNfiD54VMbphIqeBICeG9SzAtwNXntLwtTBihjY",
);
"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiI5IiwiZXhwIjoxNn0.eZnfaNfiD54VMbphIqeBICeG9SzAtwNXntLwtTBihjY");
match err.unwrap_err().into_kind() {
ErrorKind::ExpiredSignature => assert!(true),

View File

@@ -1,8 +1,10 @@
use bcrypt::{DEFAULT_COST, hash, verify};
use bcrypt::{hash, verify, DEFAULT_COST};
use diesel::prelude::*;
use diesel::sqlite::SqliteConnection;
use std::ops::DerefMut;
use std::sync::{Arc, Mutex};
use std::{
ops::Deref,
sync::{Arc, Mutex},
};
use crate::database::models::{Favorite, InsertFavorite, InsertUser, User};
@@ -10,9 +12,9 @@ pub mod models;
pub mod schema;
pub trait UserDao {
fn create_user(&mut self, user: &str, password: &str) -> Option<User>;
fn get_user(&mut self, user: &str, password: &str) -> Option<User>;
fn user_exists(&mut self, user: &str) -> bool;
fn create_user(&self, user: &str, password: &str) -> Option<User>;
fn get_user(&self, user: &str, password: &str) -> Option<User>;
fn user_exists(&self, user: &str) -> bool;
}
pub struct SqliteUserDao {
@@ -27,27 +29,9 @@ impl SqliteUserDao {
}
}
#[cfg(test)]
pub mod test {
use diesel::{Connection, SqliteConnection};
use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations};
const DB_MIGRATIONS: EmbeddedMigrations = embed_migrations!();
pub fn in_memory_db_connection() -> SqliteConnection {
let mut connection = SqliteConnection::establish(":memory:")
.expect("Unable to create in-memory db connection");
connection
.run_pending_migrations(DB_MIGRATIONS)
.expect("Failure running DB migrations");
connection
}
}
impl UserDao for SqliteUserDao {
// TODO: Should probably use Result here
fn create_user(&mut self, user: &str, pass: &str) -> Option<User> {
fn create_user(&self, user: &str, pass: &str) -> std::option::Option<User> {
use schema::users::dsl::*;
let hashed = hash(pass, DEFAULT_COST);
@@ -57,12 +41,12 @@ impl UserDao for SqliteUserDao {
username: user,
password: &hash,
})
.execute(&mut self.connection)
.execute(&self.connection)
.unwrap();
users
.filter(username.eq(username))
.load::<User>(&mut self.connection)
.load::<User>(&self.connection)
.unwrap()
.first()
.cloned()
@@ -71,12 +55,12 @@ impl UserDao for SqliteUserDao {
}
}
fn get_user(&mut self, user: &str, pass: &str) -> Option<User> {
fn get_user(&self, user: &str, pass: &str) -> Option<User> {
use schema::users::dsl::*;
match users
.filter(username.eq(user))
.load::<User>(&mut self.connection)
.load::<User>(&self.connection)
.unwrap_or_default()
.first()
{
@@ -85,13 +69,15 @@ impl UserDao for SqliteUserDao {
}
}
fn user_exists(&mut self, user: &str) -> bool {
fn user_exists(&self, user: &str) -> bool {
use schema::users::dsl::*;
!users
users
.filter(username.eq(user))
.load::<User>(&mut self.connection)
.unwrap_or_default().is_empty()
.load::<User>(&self.connection)
.unwrap_or_default()
.first()
.is_some()
}
}
@@ -123,9 +109,9 @@ pub enum DbErrorKind {
}
pub trait FavoriteDao: Sync + Send {
fn add_favorite(&mut self, user_id: i32, favorite_path: &str) -> Result<usize, DbError>;
fn remove_favorite(&mut self, user_id: i32, favorite_path: String);
fn get_favorites(&mut self, user_id: i32) -> Result<Vec<Favorite>, DbError>;
fn add_favorite(&self, user_id: i32, favorite_path: &str) -> Result<usize, DbError>;
fn remove_favorite(&self, user_id: i32, favorite_path: String);
fn get_favorites(&self, user_id: i32) -> Result<Vec<Favorite>, DbError>;
}
pub struct SqliteFavoriteDao {
@@ -141,14 +127,15 @@ impl SqliteFavoriteDao {
}
impl FavoriteDao for SqliteFavoriteDao {
fn add_favorite(&mut self, user_id: i32, favorite_path: &str) -> Result<usize, DbError> {
fn add_favorite(&self, user_id: i32, favorite_path: &str) -> Result<usize, DbError> {
use schema::favorites::dsl::*;
let mut connection = self.connection.lock().expect("Unable to get FavoriteDao");
let connection = self.connection.lock().unwrap();
let connection = connection.deref();
if favorites
.filter(userid.eq(user_id).and(path.eq(&favorite_path)))
.first::<Favorite>(connection.deref_mut())
.first::<Favorite>(connection)
.is_err()
{
diesel::insert_into(favorites)
@@ -156,28 +143,28 @@ impl FavoriteDao for SqliteFavoriteDao {
userid: &user_id,
path: favorite_path,
})
.execute(connection.deref_mut())
.execute(connection)
.map_err(|_| DbError::new(DbErrorKind::InsertError))
} else {
Err(DbError::exists())
}
}
fn remove_favorite(&mut self, user_id: i32, favorite_path: String) {
fn remove_favorite(&self, user_id: i32, favorite_path: String) {
use schema::favorites::dsl::*;
diesel::delete(favorites)
.filter(userid.eq(user_id).and(path.eq(favorite_path)))
.execute(self.connection.lock().unwrap().deref_mut())
.execute(self.connection.lock().unwrap().deref())
.unwrap();
}
fn get_favorites(&mut self, user_id: i32) -> Result<Vec<Favorite>, DbError> {
fn get_favorites(&self, user_id: i32) -> Result<Vec<Favorite>, DbError> {
use schema::favorites::dsl::*;
favorites
.filter(userid.eq(user_id))
.load::<Favorite>(self.connection.lock().unwrap().deref_mut())
.load::<Favorite>(self.connection.lock().unwrap().deref())
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
}

View File

@@ -1,8 +1,8 @@
use crate::database::schema::{favorites, users};
use crate::database::schema::{favorites, tagged_photo, tags, users};
use serde::Serialize;
#[derive(Insertable)]
#[diesel(table_name = users)]
#[table_name = "users"]
pub struct InsertUser<'a> {
pub username: &'a str,
pub password: &'a str,
@@ -17,7 +17,7 @@ pub struct User {
}
#[derive(Insertable)]
#[diesel(table_name = favorites)]
#[table_name = "favorites"]
pub struct InsertFavorite<'a> {
pub userid: &'a i32,
pub path: &'a str,
@@ -29,3 +29,33 @@ pub struct Favorite {
pub userid: i32,
pub path: String,
}
#[derive(Serialize, Queryable, Clone, Debug)]
pub struct Tag {
pub id: i32,
pub name: String,
pub created_time: i64,
}
#[derive(Insertable, Clone, Debug)]
#[table_name = "tags"]
pub struct InsertTag {
pub name: String,
pub created_time: i64,
}
#[derive(Insertable, Clone, Debug)]
#[table_name = "tagged_photo"]
pub struct InsertTaggedPhoto {
pub tag_id: i32,
pub photo_name: String,
pub created_time: i64,
}
#[derive(Queryable, Clone, Debug)]
pub struct TaggedPhoto {
pub id: i32,
pub photo_name: String,
pub tag_id: i32,
pub created_time: i64,
}

View File

@@ -33,4 +33,9 @@ table! {
joinable!(tagged_photo -> tags (tag_id));
allow_tables_to_appear_in_same_query!(favorites, tagged_photo, tags, users,);
allow_tables_to_appear_in_same_query!(
favorites,
tagged_photo,
tags,
users,
);

View File

@@ -1,14 +0,0 @@
use actix_web::{error::InternalError, http::StatusCode};
pub trait IntoHttpError<T> {
fn into_http_internal_err(self) -> Result<T, actix_web::Error>;
}
impl<T> IntoHttpError<T> for Result<T, anyhow::Error> {
fn into_http_internal_err(self) -> Result<T, actix_web::Error> {
self.map_err(|e| {
log::error!("Map to err: {:?}", e);
InternalError::new(e, StatusCode::INTERNAL_SERVER_ERROR).into()
})
}
}

View File

@@ -1,292 +1,70 @@
use std::fmt::Debug;
use std::fs::read_dir;
use std::io;
use std::io::ErrorKind;
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use ::anyhow;
use actix::{Handler, Message};
use anyhow::{Context, anyhow};
use anyhow::{anyhow, Context};
use crate::data::{Claims, FilesRequest, FilterMode, PhotosResponse, SortType};
use crate::{AppState, create_thumbnails};
use actix_web::web::Data;
use actix_web::{
HttpRequest, HttpResponse,
web::{self, Query},
};
use log::{debug, error, info, trace};
use opentelemetry::KeyValue;
use opentelemetry::trace::{Span, Status, TraceContextExt, Tracer};
use actix_web::web::{HttpResponse, Query};
use log::{debug, error};
use crate::data::{Claims, PhotosResponse, ThumbnailRequest};
use crate::data::SortType::NameAsc;
use crate::error::IntoHttpError;
use crate::otel::{extract_context_from_request, global_tracer};
use crate::tags::{FileWithTagCount, TagDao};
use crate::video::actors::StreamActor;
use path_absolutize::*;
use rand::prelude::SliceRandom;
use rand::thread_rng;
use serde::Deserialize;
pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
_: Claims,
request: HttpRequest,
req: Query<FilesRequest>,
app_state: web::Data<AppState>,
file_system: web::Data<FS>,
tag_dao: web::Data<Mutex<TagD>>,
) -> HttpResponse {
let search_path = &req.path;
pub async fn list_photos(_: Claims, req: Query<ThumbnailRequest>) -> HttpResponse {
let path = &req.path;
if let Some(path) = is_valid_path(path) {
debug!("Valid path: {:?}", path);
let files = list_files(&path).unwrap_or_default();
let tracer = global_tracer();
let context = extract_context_from_request(&request);
let mut span = tracer.start_with_context("list_photos", &context);
span.set_attributes(vec![
KeyValue::new("path", search_path.to_string()),
KeyValue::new("recursive", req.recursive.unwrap_or(false).to_string()),
KeyValue::new(
"tag_ids",
req.tag_ids.clone().unwrap_or_default().to_string(),
),
KeyValue::new(
"tag_filter_mode",
format!("{:?}", req.tag_filter_mode.unwrap_or(FilterMode::Any)),
),
KeyValue::new(
"exclude_tag_ids",
req.exclude_tag_ids.clone().unwrap_or_default().to_string(),
),
KeyValue::new("sort", format!("{:?}", &req.sort.unwrap_or(NameAsc))),
]);
let span_context = opentelemetry::Context::current_with_span(span);
let search_recursively = req.recursive.unwrap_or(false);
if let Some(tag_ids) = &req.tag_ids
&& search_recursively {
let filter_mode = &req.tag_filter_mode.unwrap_or(FilterMode::Any);
info!(
"Searching for tags: {}. With path: '{}' and filter mode: {:?}",
tag_ids, search_path, filter_mode
);
let mut dao = tag_dao.lock().expect("Unable to get TagDao");
let tag_ids = tag_ids
.split(',')
.filter_map(|t| t.parse().ok())
.collect::<Vec<i32>>();
let exclude_tag_ids = req
.exclude_tag_ids
.clone()
.unwrap_or_default()
.split(',')
.filter_map(|t| t.parse().ok())
.collect::<Vec<i32>>();
return match filter_mode {
FilterMode::Any => {
dao.get_files_with_any_tag_ids(tag_ids.clone(), exclude_tag_ids, &span_context)
}
FilterMode::All => {
dao.get_files_with_all_tag_ids(tag_ids.clone(), exclude_tag_ids, &span_context)
}
}
.context(format!(
"Failed to get files with tag_ids: {:?} with filter_mode: {:?}",
tag_ids, filter_mode
))
.inspect(|files| {
info!(
"Found {:?} tagged files, filtering down by search path {:?}",
files.len(),
search_path
let photos = files
.iter()
.filter(|&f| {
f.metadata().map_or_else(
|e| {
error!("Failed getting file metadata: {:?}", e);
false
},
|md| md.is_file(),
)
})
.map(|tagged_files| {
tagged_files
.into_iter()
.filter(|f| {
// When searching at the root, everything matches recursively
if search_path.trim() == "" {
return true;
}
f.file_name.starts_with(&format!(
"{}/",
search_path.strip_suffix('/').unwrap_or_else(|| search_path)
))
})
.collect::<Vec<FileWithTagCount>>()
.map(|path: &PathBuf| {
let relative = path
.strip_prefix(dotenv::var("BASE_PATH").unwrap())
.unwrap();
relative.to_path_buf()
})
.map(|files| sort(files, req.sort.unwrap_or(NameAsc)))
.inspect(|files| debug!("Found {:?} files", files.len()))
.map(|tagged_files: Vec<String>| {
info!(
"Found {:?} tagged files: {:?}",
tagged_files.len(),
tagged_files
);
span_context
.span()
.set_attribute(KeyValue::new("file_count", tagged_files.len().to_string()));
span_context.span().set_status(Status::Ok);
.map(|f| f.to_str().unwrap().to_string())
.collect::<Vec<String>>();
HttpResponse::Ok().json(PhotosResponse {
photos: tagged_files,
dirs: vec![],
})
let dirs = files
.iter()
.filter(|&f| f.metadata().map_or(false, |md| md.is_dir()))
.map(|path: &PathBuf| {
let relative = path
.strip_prefix(dotenv::var("BASE_PATH").unwrap())
.unwrap();
relative.to_path_buf()
})
.into_http_internal_err()
.unwrap_or_else(|e| e.error_response());
}
.map(|f| f.to_str().unwrap().to_string())
.collect::<Vec<String>>();
match file_system.get_files_for_path(search_path) {
Ok(files) => {
info!("Found {:?} files in path: {:?}", files.len(), search_path);
let photos = files
.iter()
.filter(|&f| {
f.metadata().map_or_else(
|e| {
error!("Failed getting file metadata: {:?}", e);
f.extension().is_some()
},
|md| md.is_file(),
)
})
.map(|path: &PathBuf| {
let relative = path.strip_prefix(&app_state.base_path).unwrap();
relative.to_path_buf()
})
.map(|f| f.to_str().unwrap().to_string())
.map(|file_name| {
let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao");
let file_tags = tag_dao
.get_tags_for_path(&span_context, &file_name)
.unwrap_or_default();
(file_name, file_tags)
})
.filter(|(_, file_tags)| {
if let Some(tag_ids) = &req.tag_ids {
let tag_ids = tag_ids
.split(',')
.filter_map(|t| t.parse().ok())
.collect::<Vec<i32>>();
let excluded_tag_ids = &req
.exclude_tag_ids
.clone()
.unwrap_or_default()
.split(',')
.filter_map(|t| t.parse().ok())
.collect::<Vec<i32>>();
let filter_mode = &req.tag_filter_mode.unwrap_or(FilterMode::Any);
let excluded = file_tags.iter().any(|t| excluded_tag_ids.contains(&t.id));
return !excluded
&& match filter_mode {
FilterMode::Any => {
file_tags.iter().any(|t| tag_ids.contains(&t.id))
}
FilterMode::All => tag_ids
.iter()
.all(|id| file_tags.iter().any(|tag| &tag.id == id)),
};
}
true
})
.map(|(file_name, tags)| FileWithTagCount {
file_name,
tag_count: tags.len() as i64,
})
.collect::<Vec<FileWithTagCount>>();
let mut response_files = photos
.clone()
.into_iter()
.map(|f| f.file_name)
.collect::<Vec<String>>();
if let Some(sort_type) = req.sort {
debug!("Sorting files: {:?}", sort_type);
response_files = sort(photos, sort_type)
}
let dirs = files
.iter()
.filter(|&f| f.metadata().is_ok_and(|md| md.is_dir()))
.map(|path: &PathBuf| {
let relative = path.strip_prefix(&app_state.base_path).unwrap();
relative.to_path_buf()
})
.map(|f| f.to_str().unwrap().to_string())
.collect::<Vec<String>>();
span_context
.span()
.set_attribute(KeyValue::new("file_count", files.len().to_string()));
span_context.span().set_status(Status::Ok);
HttpResponse::Ok().json(PhotosResponse {
photos: response_files,
dirs,
})
}
_ => {
error!("Bad photos request: {}", req.path);
span_context
.span()
.set_status(Status::error("Invalid path"));
HttpResponse::BadRequest().finish()
}
HttpResponse::Ok().json(PhotosResponse { photos, dirs })
} else {
error!("Bad photos request: {}", req.path);
HttpResponse::BadRequest().finish()
}
}
fn sort(mut files: Vec<FileWithTagCount>, sort_type: SortType) -> Vec<String> {
match sort_type {
SortType::Shuffle => files.shuffle(&mut thread_rng()),
SortType::NameAsc => {
files.sort_by(|l, r| l.file_name.cmp(&r.file_name));
}
SortType::NameDesc => {
files.sort_by(|l, r| r.file_name.cmp(&l.file_name));
}
SortType::TagCountAsc => {
files.sort_by(|l, r| l.tag_count.cmp(&r.tag_count));
}
SortType::TagCountDesc => {
files.sort_by(|l, r| r.tag_count.cmp(&l.tag_count));
}
}
files
.iter()
.map(|f| f.file_name.clone())
.collect::<Vec<String>>()
}
pub fn list_files(dir: &Path) -> io::Result<Vec<PathBuf>> {
let tracer = global_tracer();
let mut span = tracer.start("list_files");
let dir_name_string = dir.to_str().unwrap_or_default().to_string();
span.set_attribute(KeyValue::new("dir", dir_name_string));
info!("Listing files in: {:?}", dir);
let files = read_dir(dir)?
.filter_map(|res| res.ok())
.filter(|entry| is_image_or_video(&entry.path()) || entry.file_type().unwrap().is_dir())
.map(|entry| entry.path())
.collect::<Vec<PathBuf>>();
span.set_attribute(KeyValue::new("file_count", files.len().to_string()));
span.set_status(Status::Ok);
info!("Found {:?} files in directory: {:?}", files.len(), dir);
Ok(files)
}
@@ -302,27 +80,27 @@ pub fn is_image_or_video(path: &Path) -> bool {
|| extension == "mp4"
|| extension == "mov"
|| extension == "nef"
|| extension == "webp"
}
pub fn is_valid_full_path<P: AsRef<Path> + Debug + AsRef<std::ffi::OsStr>>(
base: &P,
path: &P,
new_file: bool,
) -> Option<PathBuf> {
trace!("is_valid_full_path => Base: {:?}. Path: {:?}", base, path);
pub fn is_valid_path(path: &str) -> Option<PathBuf> {
let base = PathBuf::from(dotenv::var("BASE_PATH").unwrap());
let path = PathBuf::from(&path);
is_valid_full_path(&base, path)
}
fn is_valid_full_path(base: &Path, path: &str) -> Option<PathBuf> {
debug!("Base: {:?}. Path: {}", base, path);
let path = PathBuf::from(path);
let mut path = if path.is_relative() {
let mut full_path = PathBuf::new();
full_path.push(base);
let mut full_path = PathBuf::from(base);
full_path.push(&path);
full_path
} else {
path
};
match is_path_above_base_dir(base, &mut path, new_file) {
match is_path_above_base_dir(base, &mut path) {
Ok(path) => Some(path),
Err(e) => {
error!("{}", e);
@@ -331,18 +109,14 @@ pub fn is_valid_full_path<P: AsRef<Path> + Debug + AsRef<std::ffi::OsStr>>(
}
}
fn is_path_above_base_dir<P: AsRef<Path> + Debug>(
base: P,
full_path: &mut PathBuf,
new_file: bool,
) -> anyhow::Result<PathBuf> {
fn is_path_above_base_dir(base: &Path, full_path: &mut PathBuf) -> anyhow::Result<PathBuf> {
full_path
.absolutize()
.with_context(|| format!("Unable to resolve absolute path: {:?}", full_path))
.map_or_else(
|e| Err(anyhow!(e)),
|p| {
if p.starts_with(base) && (new_file || p.exists()) {
if p.starts_with(base) && p.exists() {
Ok(p.into_owned())
} else if !p.exists() {
Err(anyhow!("Path does not exist: {:?}", p))
@@ -353,174 +127,22 @@ fn is_path_above_base_dir<P: AsRef<Path> + Debug>(
)
}
pub async fn move_file<FS: FileSystemAccess>(
_: Claims,
file_system: web::Data<FS>,
app_state: Data<AppState>,
request: web::Json<MoveFileRequest>,
) -> HttpResponse {
info!("Moving file: {:?}", request);
match is_valid_full_path(&app_state.base_path, &request.source, false)
.ok_or(ErrorKind::InvalidData)
.and_then(|source| {
is_valid_full_path(&app_state.base_path, &request.destination, true)
.ok_or(ErrorKind::InvalidData)
.and_then(|dest| {
if dest.exists() {
error!("Destination already exists, not moving file: {:?}", source);
Err(ErrorKind::AlreadyExists)
} else {
Ok(dest)
}
})
.map(|dest| (source, dest))
})
.map(|(source, dest)| file_system.move_file(source, dest))
{
Ok(_) => {
info!("Moved file: {} -> {}", request.source, request.destination,);
app_state.stream_manager.do_send(RefreshThumbnailsMessage);
HttpResponse::Ok().finish()
}
Err(e) => {
error!(
"Error moving file: {} to: {}. {}",
request.source, request.destination, e
);
if e == ErrorKind::InvalidData {
HttpResponse::BadRequest().finish()
} else {
HttpResponse::InternalServerError().finish()
}
}
}
}
#[derive(Deserialize, Debug)]
pub struct MoveFileRequest {
source: String,
destination: String,
}
pub trait FileSystemAccess {
fn get_files_for_path(&self, path: &str) -> anyhow::Result<Vec<PathBuf>>;
fn move_file<P: AsRef<Path>>(&self, from: P, destination: P) -> anyhow::Result<()>;
}
pub struct RealFileSystem {
base_path: String,
}
impl RealFileSystem {
pub(crate) fn new(base_path: String) -> RealFileSystem {
RealFileSystem { base_path }
}
}
impl FileSystemAccess for RealFileSystem {
fn get_files_for_path(&self, path: &str) -> anyhow::Result<Vec<PathBuf>> {
is_valid_full_path(&PathBuf::from(&self.base_path), &PathBuf::from(path), false)
.map(|path| {
debug!("Valid path: {:?}", path);
list_files(&path).unwrap_or_default()
})
.context("Invalid path")
}
fn move_file<P: AsRef<Path>>(&self, from: P, destination: P) -> anyhow::Result<()> {
info!(
"Moving file: '{:?}' -> '{:?}'",
from.as_ref(),
destination.as_ref()
);
let name = from
.as_ref()
.file_name()
.map(|n| n.to_str().unwrap_or_default().to_string())
.unwrap_or_default();
std::fs::rename(from, destination)
.with_context(|| format!("Failed to move file: {:?}", name))
}
}
pub struct RefreshThumbnailsMessage;
impl Message for RefreshThumbnailsMessage {
type Result = ();
}
impl Handler<RefreshThumbnailsMessage> for StreamActor {
type Result = ();
fn handle(&mut self, _msg: RefreshThumbnailsMessage, _ctx: &mut Self::Context) -> Self::Result {
let tracer = global_tracer();
let _ = tracer.start("RefreshThumbnailsMessage");
info!("Refreshing thumbnails after upload");
create_thumbnails()
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::env;
use std::fs::File;
use super::*;
struct FakeFileSystem {
files: HashMap<String, Vec<String>>,
err: bool,
}
impl FakeFileSystem {
fn with_error() -> FakeFileSystem {
FakeFileSystem {
files: HashMap::new(),
err: true,
}
}
fn new(files: HashMap<String, Vec<String>>) -> FakeFileSystem {
FakeFileSystem { files, err: false }
}
}
impl FileSystemAccess for FakeFileSystem {
fn get_files_for_path(&self, path: &str) -> anyhow::Result<Vec<PathBuf>> {
if self.err {
Err(anyhow!("Error for test"))
} else if let Some(files) = self.files.get(path) {
Ok(files.iter().map(PathBuf::from).collect::<Vec<PathBuf>>())
} else {
Ok(Vec::new())
}
}
fn move_file<P: AsRef<Path>>(&self, from: P, destination: P) -> anyhow::Result<()> {
todo!()
}
}
mod api {
use super::*;
use actix_web::{HttpResponse, web::Query};
use actix_web::{web::Query, HttpResponse};
use super::list_photos;
use crate::{
AppState,
data::{Claims, PhotosResponse},
testhelpers::BodyReader,
data::{Claims, PhotosResponse, ThumbnailRequest},
testhelpers::TypedBodyReader,
};
use crate::database::test::in_memory_db_connection;
use crate::tags::SqliteTagDao;
use actix_web::test::TestRequest;
use actix_web::web::Data;
use std::fs;
fn setup() {
@@ -536,9 +158,10 @@ mod tests {
exp: 12345,
};
let request: Query<FilesRequest> = Query::from_query("path=").unwrap();
let request: Query<ThumbnailRequest> = Query::from_query("path=").unwrap();
let mut temp_photo = env::temp_dir();
std::env::set_var("BASE_PATH", "/tmp");
let mut temp_photo = std::env::temp_dir();
let mut tmp = temp_photo.clone();
tmp.push("test-dir");
@@ -546,34 +169,23 @@ mod tests {
temp_photo.push("photo.jpg");
File::create(temp_photo.clone()).unwrap();
fs::File::create(temp_photo).unwrap();
let response: HttpResponse = list_photos(
claims,
TestRequest::default().to_http_request(),
request,
Data::new(AppState::test_state()),
Data::new(RealFileSystem::new(String::from("/tmp"))),
Data::new(Mutex::new(SqliteTagDao::default())),
)
.await;
let status = response.status();
assert_eq!(status, 200);
let response: HttpResponse = list_photos(claims, request).await;
let body: PhotosResponse = serde_json::from_str(&response.read_to_str()).unwrap();
debug!("{:?}", body);
let body: PhotosResponse = response.body().read_body();
assert_eq!(response.status(), 200);
assert!(body.photos.contains(&String::from("photo.jpg")));
assert!(body.dirs.contains(&String::from("test-dir")));
assert!(
body.photos
.iter()
.filter(|filename| !filename.ends_with(".png")
&& !filename.ends_with(".jpg")
&& !filename.ends_with(".jpeg"))
.collect::<Vec<&String>>()
.is_empty()
);
assert!(body
.photos
.iter()
.filter(|filename| !filename.ends_with(".png")
&& !filename.ends_with(".jpg")
&& !filename.ends_with(".jpeg"))
.collect::<Vec<&String>>()
.is_empty());
}
#[actix_rt::test]
@@ -585,173 +197,22 @@ mod tests {
exp: 12345,
};
let request: Query<FilesRequest> = Query::from_query("path=..").unwrap();
let request: Query<ThumbnailRequest> = Query::from_query("path=..").unwrap();
let response = list_photos(
claims,
TestRequest::default().to_http_request(),
request,
Data::new(AppState::test_state()),
Data::new(RealFileSystem::new(String::from("./"))),
Data::new(Mutex::new(SqliteTagDao::default())),
)
.await;
let response = list_photos(claims, request).await;
assert_eq!(response.status(), 400);
}
#[actix_rt::test]
async fn get_files_with_tag_any_filter() {
setup();
let claims = Claims {
sub: String::from("1"),
exp: 12345,
};
let request: Query<FilesRequest> = Query::from_query("path=&tag_ids=1,3").unwrap();
let mut tag_dao = SqliteTagDao::new(in_memory_db_connection());
let tag1 = tag_dao
.create_tag(&opentelemetry::Context::current(), "tag1")
.unwrap();
let _tag2 = tag_dao
.create_tag(&opentelemetry::Context::current(), "tag2")
.unwrap();
let tag3 = tag_dao
.create_tag(&opentelemetry::Context::current(), "tag3")
.unwrap();
let _ = &tag_dao
.tag_file(&opentelemetry::Context::current(), "test.jpg", tag1.id)
.unwrap();
let _ = &tag_dao
.tag_file(&opentelemetry::Context::current(), "test.jpg", tag3.id)
.unwrap();
let mut files = HashMap::new();
files.insert(
String::from(""),
vec![
String::from("file1.txt"),
String::from("test.jpg"),
String::from("some-other.jpg"),
],
);
let response: HttpResponse = list_photos(
claims,
TestRequest::default().to_http_request(),
request,
Data::new(AppState::test_state()),
Data::new(FakeFileSystem::new(files)),
Data::new(Mutex::new(tag_dao)),
)
.await;
assert_eq!(200, response.status());
let body: PhotosResponse = serde_json::from_str(&response.read_to_str()).unwrap();
assert_eq!(1, body.photos.len());
assert!(body.photos.contains(&String::from("test.jpg")));
}
#[actix_rt::test]
async fn get_files_with_tag_all_filter() {
setup();
let claims = Claims {
sub: String::from("1"),
exp: 12345,
};
let mut tag_dao = SqliteTagDao::new(in_memory_db_connection());
let tag1 = tag_dao
.create_tag(&opentelemetry::Context::current(), "tag1")
.unwrap();
let _tag2 = tag_dao
.create_tag(&opentelemetry::Context::current(), "tag2")
.unwrap();
let tag3 = tag_dao
.create_tag(&opentelemetry::Context::current(), "tag3")
.unwrap();
let _ = &tag_dao
.tag_file(&opentelemetry::Context::current(), "test.jpg", tag1.id)
.unwrap();
let _ = &tag_dao
.tag_file(&opentelemetry::Context::current(), "test.jpg", tag3.id)
.unwrap();
// Should get filtered since it doesn't have tag3
tag_dao
.tag_file(
&opentelemetry::Context::current(),
"some-other.jpg",
tag1.id,
)
.unwrap();
let mut files = HashMap::new();
files.insert(
String::from(""),
vec![
String::from("file1.txt"),
String::from("test.jpg"),
String::from("some-other.jpg"),
],
);
let request: Query<FilesRequest> = Query::from_query(&format!(
"path=&tag_ids={},{}&tag_filter_mode=All",
tag1.id, tag3.id
))
.unwrap();
let response: HttpResponse = list_photos(
claims,
TestRequest::default().to_http_request(),
request,
Data::new(AppState::test_state()),
Data::new(FakeFileSystem::new(files)),
Data::new(Mutex::new(tag_dao)),
)
.await;
assert_eq!(200, response.status());
let body: PhotosResponse = serde_json::from_str(&response.read_to_str()).unwrap();
assert_eq!(1, body.photos.len());
assert!(body.photos.contains(&String::from("test.jpg")));
}
}
#[test]
fn directory_traversal_test() {
let base = env::temp_dir();
assert_eq!(
None,
is_valid_full_path(&base, &PathBuf::from("../"), false)
);
assert_eq!(None, is_valid_full_path(&base, &PathBuf::from(".."), false));
assert_eq!(
None,
is_valid_full_path(&base, &PathBuf::from("fake/../../../"), false)
);
assert_eq!(
None,
is_valid_full_path(&base, &PathBuf::from("../../../etc/passwd"), false)
);
assert_eq!(
None,
is_valid_full_path(&base, &PathBuf::from("..//etc/passwd"), false)
);
assert_eq!(
None,
is_valid_full_path(&base, &PathBuf::from("../../etc/passwd"), false)
);
assert_eq!(None, is_valid_path("../"));
assert_eq!(None, is_valid_path(".."));
assert_eq!(None, is_valid_path("fake/../../../"));
assert_eq!(None, is_valid_path("../../../etc/passwd"));
assert_eq!(None, is_valid_path("..//etc/passwd"));
assert_eq!(None, is_valid_path("../../etc/passwd"));
}
#[test]
@@ -761,7 +222,7 @@ mod tests {
test_file.push("test.png");
File::create(test_file).unwrap();
assert!(is_valid_full_path(&base, &PathBuf::from("test.png"), false).is_some());
assert!(is_valid_full_path(&base, "test.png").is_some());
}
#[test]
@@ -772,7 +233,7 @@ mod tests {
let mut test_file = PathBuf::from(&base);
test_file.push(path);
assert_eq!(None, is_valid_full_path(&base, &test_file, false));
assert_eq!(None, is_valid_full_path(&base, path));
}
#[test]
@@ -782,11 +243,11 @@ mod tests {
test_file.push("test.png");
File::create(&test_file).unwrap();
assert!(is_valid_full_path(&base, &test_file, false).is_some());
assert!(is_valid_full_path(&base, test_file.to_str().unwrap()).is_some());
assert_eq!(
Some(test_file.clone()),
is_valid_full_path(&base, &test_file, false)
Some(PathBuf::from("/tmp/test.png")),
is_valid_full_path(&base, "/tmp/test.png")
);
}

View File

@@ -2,15 +2,12 @@
extern crate diesel;
extern crate rayon;
use actix_web::web::Data;
use actix_web_prom::PrometheusMetricsBuilder;
use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations};
use actix_web_prom::PrometheusMetrics;
use chrono::Utc;
use futures::stream::StreamExt;
use lazy_static::lazy_static;
use prometheus::{self, IntGauge};
use std::error::Error;
use std::sync::Mutex;
use std::sync::mpsc::channel;
use std::sync::{mpsc::channel, Arc};
use std::{collections::HashMap, io::prelude::*};
use std::{env, fs::File};
use std::{
@@ -19,48 +16,35 @@ use std::{
};
use walkdir::{DirEntry, WalkDir};
use actix::prelude::*;
use actix_files::NamedFile;
use actix_multipart as mp;
use actix_web::{
App, HttpRequest, HttpResponse, HttpServer, Responder, delete, get, middleware, post, put,
web::{self, BufMut, BytesMut},
delete,
error::BlockingError,
get, middleware, post, put,
web::{self, BufMut, BytesMut, HttpRequest, HttpResponse},
App, HttpServer, Responder,
};
use anyhow::Context;
use chrono::Utc;
use diesel::sqlite::Sqlite;
use notify::{Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use diesel::prelude::*;
use notify::{watcher, DebouncedEvent, RecursiveMode, Watcher};
use rayon::prelude::*;
use log::{debug, error, info};
use crate::auth::login;
use crate::data::*;
use crate::database::*;
use crate::files::{
RealFileSystem, RefreshThumbnailsMessage, is_image_or_video, is_valid_full_path, move_file,
};
use crate::otel::{extract_context_from_request, global_tracer};
use crate::service::ServiceBuilder;
use crate::state::AppState;
use crate::tags::*;
use crate::video::actors::{
ProcessMessage, ScanDirectoryMessage, create_playlist, generate_video_thumbnail,
};
use crate::video::generate_video_gifs;
use log::{debug, error, info, trace, warn};
use opentelemetry::trace::{Span, Status, TraceContextExt, Tracer};
use opentelemetry::{KeyValue, global};
use crate::files::{is_image_or_video, is_valid_path};
use crate::models::{InsertTag, InsertTaggedPhoto, Tag, TaggedPhoto};
use crate::video::*;
mod auth;
mod data;
mod database;
mod error;
pub mod database;
mod files;
mod state;
mod tags;
mod video;
mod memories;
mod otel;
mod service;
#[cfg(test)]
mod testhelpers;
@@ -77,115 +61,57 @@ lazy_static! {
.unwrap();
}
pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!();
#[get("/image")]
async fn get_image(
_claims: Claims,
request: HttpRequest,
req: web::Query<ThumbnailRequest>,
app_state: Data<AppState>,
) -> impl Responder {
let tracer = global_tracer();
let context = extract_context_from_request(&request);
let mut span = tracer.start_with_context("get_image", &context);
if let Some(path) = is_valid_full_path(&app_state.base_path, &req.path, false) {
let image_size = req.size.unwrap_or(PhotoSize::Full);
if image_size == PhotoSize::Thumb {
if let Some(path) = is_valid_path(&req.path) {
if req.size.is_some() {
let thumbs = dotenv::var("THUMBNAILS").unwrap();
let relative_path = path
.strip_prefix(&app_state.base_path)
.expect("Error stripping base path prefix from thumbnail");
.strip_prefix(dotenv::var("BASE_PATH").unwrap())
.expect("Error stripping prefix");
let thumb_path = Path::new(&thumbs).join(relative_path);
let thumbs = &app_state.thumbnail_path;
let mut thumb_path = Path::new(&thumbs).join(relative_path);
// If it's a video and GIF format is requested, try to serve GIF thumbnail
if req.format == Some(ThumbnailFormat::Gif) && is_video_file(&path) {
thumb_path = Path::new(&app_state.gif_path).join(relative_path);
thumb_path.set_extension("gif");
}
trace!("Thumbnail path: {:?}", thumb_path);
debug!("{:?}", thumb_path);
if let Ok(file) = NamedFile::open(&thumb_path) {
span.set_status(Status::Ok);
// The NamedFile will automatically set the correct content-type
return file.into_response(&request);
file.into_response(&request).unwrap()
} else {
HttpResponse::NotFound().finish()
}
} else if let Ok(file) = NamedFile::open(path) {
file.into_response(&request).unwrap()
} else {
HttpResponse::NotFound().finish()
}
if let Ok(file) = NamedFile::open(&path) {
span.set_status(Status::Ok);
return file.into_response(&request);
}
span.set_status(Status::error("Not found"));
HttpResponse::NotFound().finish()
} else {
span.set_status(Status::error("Bad photos request"));
error!("Bad photos request: {}", req.path);
HttpResponse::BadRequest().finish()
}
}
fn is_video_file(path: &Path) -> bool {
if let Some(extension) = path.extension() {
matches!(
extension.to_str().unwrap_or("").to_lowercase().as_str(),
"mp4" | "mov" | "avi" | "mkv"
)
} else {
false
}
}
#[get("/image/metadata")]
async fn get_file_metadata(
_: Claims,
request: HttpRequest,
path: web::Query<ThumbnailRequest>,
app_state: Data<AppState>,
) -> impl Responder {
let tracer = global_tracer();
let context = extract_context_from_request(&request);
let mut span = tracer.start_with_context("get_file_metadata", &context);
match is_valid_full_path(&app_state.base_path, &path.path, false)
async fn get_file_metadata(_: Claims, path: web::Query<ThumbnailRequest>) -> impl Responder {
match is_valid_path(&path.path)
.ok_or_else(|| ErrorKind::InvalidData.into())
.and_then(File::open)
.and_then(|file| file.metadata())
{
Ok(metadata) => {
let response: MetadataResponse = metadata.into();
span.add_event(
"Metadata fetched",
vec![KeyValue::new("file", path.path.clone())],
);
span.set_status(Status::Ok);
HttpResponse::Ok().json(response)
}
Err(e) => {
let message = format!("Error getting metadata for file '{}': {:?}", path.path, e);
error!("{}", message);
span.set_status(Status::error(message));
error!("Error getting metadata for file '{}': {:?}", path.path, e);
HttpResponse::InternalServerError().finish()
}
}
}
#[post("/image")]
async fn upload_image(
_: Claims,
request: HttpRequest,
mut payload: mp::Multipart,
app_state: Data<AppState>,
) -> impl Responder {
let tracer = global_tracer();
let context = extract_context_from_request(&request);
let mut span = tracer.start_with_context("upload_image", &context);
async fn upload_image(_: Claims, mut payload: mp::Multipart) -> impl Responder {
let mut file_content: BytesMut = BytesMut::new();
let mut file_name: Option<String> = None;
let mut file_path: Option<String> = None;
@@ -200,7 +126,7 @@ async fn upload_image(
while let Some(Ok(data)) = part.next().await {
file_content.put(data);
}
} else if content_type.get_name() == Some("path") {
} else if content_type.get_name().map_or(false, |name| name == "path") {
while let Some(Ok(data)) = part.next().await {
if let Ok(path) = std::str::from_utf8(&data) {
file_path = Some(path.to_string())
@@ -210,102 +136,50 @@ async fn upload_image(
}
}
let path = file_path.unwrap_or_else(|| app_state.base_path.clone());
let path = file_path.unwrap_or_else(|| dotenv::var("BASE_PATH").unwrap());
if !file_content.is_empty() {
let full_path = PathBuf::from(&path).join(file_name.unwrap());
if let Some(full_path) = is_valid_full_path(
&app_state.base_path,
&full_path.to_str().unwrap().to_string(),
true,
) {
let context =
opentelemetry::Context::new().with_remote_span_context(span.span_context().clone());
tracer
.span_builder("file write")
.start_with_context(&tracer, &context);
if let Some(full_path) = is_valid_path(full_path.to_str().unwrap_or("")) {
if !full_path.is_file() && is_image_or_video(&full_path) {
let mut file = File::create(&full_path).unwrap();
let mut file = File::create(full_path).unwrap();
file.write_all(&file_content).unwrap();
info!("Uploaded: {:?}", full_path);
} else {
warn!("File already exists: {:?}", full_path);
let new_path = format!(
"{}/{}_{}.{}",
full_path.parent().unwrap().to_str().unwrap(),
full_path.file_stem().unwrap().to_str().unwrap(),
Utc::now().timestamp(),
full_path
.extension()
.expect("Uploaded file should have an extension")
.to_str()
.unwrap()
);
info!("Uploaded: {}", new_path);
let mut file = File::create(new_path).unwrap();
file.write_all(&file_content).unwrap();
error!("File already exists: {:?}", full_path);
return HttpResponse::BadRequest().body("File already exists");
}
} else {
error!("Invalid path for upload: {:?}", full_path);
span.set_status(Status::error("Invalid path for upload"));
return HttpResponse::BadRequest().body("Path was not valid");
}
} else {
span.set_status(Status::error("No file body read"));
return HttpResponse::BadRequest().body("No file body read");
}
app_state.stream_manager.do_send(RefreshThumbnailsMessage);
span.set_status(Status::Ok);
HttpResponse::Ok().finish()
}
#[post("/video/generate")]
async fn generate_video(
_claims: Claims,
request: HttpRequest,
app_state: Data<AppState>,
data: web::Data<AppState>,
body: web::Json<ThumbnailRequest>,
) -> impl Responder {
let tracer = global_tracer();
let context = extract_context_from_request(&request);
let mut span = tracer.start_with_context("generate_video", &context);
let filename = PathBuf::from(&body.path);
if let Some(name) = filename.file_name() {
if let Some(name) = filename.file_stem() {
let filename = name.to_str().expect("Filename should convert to string");
let playlist = format!("{}/{}.m3u8", app_state.video_path, filename);
if let Some(path) = is_valid_full_path(&app_state.base_path, &body.path, false) {
let playlist = format!("tmp/{}.m3u8", filename);
if let Some(path) = is_valid_path(&body.path) {
if let Ok(child) = create_playlist(path.to_str().unwrap(), &playlist).await {
span.add_event(
"playlist_created".to_string(),
vec![KeyValue::new("playlist-name", filename.to_string())],
);
span.set_status(Status::Ok);
app_state.stream_manager.do_send(ProcessMessage(
playlist.clone(),
child,
// opentelemetry::Context::new().with_span(span),
));
data.stream_manager
.do_send(ProcessMessage(playlist.clone(), child));
}
} else {
span.set_status(Status::error(format!("invalid path {:?}", &body.path)));
return HttpResponse::BadRequest().finish();
}
HttpResponse::Ok().json(playlist)
} else {
let message = format!("Unable to get file name: {:?}", filename);
error!("{}", message);
span.set_status(Status::error(message));
error!("Unable to get file name: {:?}", filename);
HttpResponse::BadRequest().finish()
}
}
@@ -315,33 +189,17 @@ async fn stream_video(
request: HttpRequest,
_: Claims,
path: web::Query<ThumbnailRequest>,
app_state: Data<AppState>,
) -> impl Responder {
let tracer = global::tracer("image-server");
let context = extract_context_from_request(&request);
let mut span = tracer.start_with_context("stream_video", &context);
let playlist = &path.path;
debug!("Playlist: {}", playlist);
// Extract video playlist dir to dotenv
if !playlist.starts_with(&app_state.video_path)
&& is_valid_full_path(&app_state.base_path, playlist, false).is_some()
{
span.set_status(Status::error(format!("playlist not valid {}", playlist)));
if !playlist.starts_with("tmp") && is_valid_path(playlist) != None {
HttpResponse::BadRequest().finish()
} else if let Ok(file) = NamedFile::open(playlist) {
file.into_response(&request).unwrap()
} else {
match NamedFile::open(playlist) {
Ok(file) => {
span.set_status(Status::Ok);
file.into_response(&request)
}
_ => {
span.set_status(Status::error(format!("playlist not found {}", playlist)));
HttpResponse::NotFound().finish()
}
}
HttpResponse::NotFound().finish()
}
}
@@ -350,106 +208,60 @@ async fn get_video_part(
request: HttpRequest,
_: Claims,
path: web::Path<ThumbnailRequest>,
app_state: Data<AppState>,
) -> impl Responder {
let tracer = global_tracer();
let context = extract_context_from_request(&request);
let mut span = tracer.start_with_context("get_video_part", &context);
let part = &path.path;
debug!("Video part: {}", part);
let mut file_part = PathBuf::new();
file_part.push(app_state.video_path.clone());
file_part.push(part);
// TODO: Do we need to guard against directory attacks here?
match NamedFile::open(&file_part) {
Ok(file) => {
span.set_status(Status::Ok);
file.into_response(&request)
}
_ => {
error!("Video part not found: {:?}", file_part);
span.set_status(Status::error(format!(
"Video part not found '{}'",
file_part.to_str().unwrap()
)));
HttpResponse::NotFound().finish()
}
if let Ok(file) = NamedFile::open(String::from("tmp/") + part) {
file.into_response(&request).unwrap()
} else {
error!("Video part not found: tmp/{}", part);
HttpResponse::NotFound().finish()
}
}
#[get("image/favorites")]
async fn favorites(
claims: Claims,
request: HttpRequest,
favorites_dao: Data<Mutex<Box<dyn FavoriteDao>>>,
favorites_dao: web::Data<Box<dyn FavoriteDao>>,
) -> impl Responder {
let tracer = global_tracer();
let context = extract_context_from_request(&request);
let mut span = tracer.start_with_context("get favorites", &context);
let favorites =
web::block(move || favorites_dao.get_favorites(claims.sub.parse::<i32>().unwrap()))
.await
.unwrap()
.into_iter()
.map(|favorite| favorite.path)
.collect::<Vec<String>>();
match web::block(move || {
favorites_dao
.lock()
.expect("Unable to get FavoritesDao")
.get_favorites(claims.sub.parse::<i32>().unwrap())
HttpResponse::Ok().json(PhotosResponse {
photos: favorites,
dirs: Vec::new(),
})
.await
{
Ok(Ok(favorites)) => {
let favorites = favorites
.into_iter()
.map(|favorite| favorite.path)
.collect::<Vec<String>>();
span.set_status(Status::Ok);
HttpResponse::Ok().json(PhotosResponse {
photos: favorites,
dirs: Vec::new(),
})
}
Ok(Err(e)) => {
span.set_status(Status::error(format!("Error getting favorites: {:?}", e)));
error!("Error getting favorites: {:?}", e);
HttpResponse::InternalServerError().finish()
}
Err(_) => HttpResponse::InternalServerError().finish(),
}
}
#[put("image/favorites")]
async fn put_add_favorite(
claims: Claims,
body: web::Json<AddFavoriteRequest>,
favorites_dao: Data<Mutex<Box<dyn FavoriteDao>>>,
favorites_dao: web::Data<Box<dyn FavoriteDao>>,
) -> impl Responder {
if let Ok(user_id) = claims.sub.parse::<i32>() {
let path = body.path.clone();
match web::block::<_, Result<usize, DbError>>(move || {
favorites_dao
.lock()
.expect("Unable to get FavoritesDao")
.add_favorite(user_id, &path)
})
.await
match web::block::<_, usize, DbError>(move || favorites_dao.add_favorite(user_id, &path))
.await
{
Ok(Err(e)) if e.kind == DbErrorKind::AlreadyExists => {
warn!("Favorite: {} exists for user: {}", &body.path, user_id);
Err(BlockingError::Error(e)) if e.kind == DbErrorKind::AlreadyExists => {
debug!("Favorite: {} exists for user: {}", &body.path, user_id);
HttpResponse::Ok()
}
Ok(Err(e)) => {
Err(e) => {
error!("{:?} {}. for user: {}", e, body.path, user_id);
HttpResponse::BadRequest()
}
Ok(Ok(_)) => {
Ok(_) => {
info!("Adding favorite \"{}\" for userid: {}", body.path, user_id);
HttpResponse::Created()
}
Err(e) => {
error!("Blocking error while inserting favorite: {:?}", e);
HttpResponse::InternalServerError()
}
}
} else {
error!("Unable to parse sub as i32: {}", claims.sub);
@@ -461,15 +273,13 @@ async fn put_add_favorite(
async fn delete_favorite(
claims: Claims,
body: web::Query<AddFavoriteRequest>,
favorites_dao: Data<Mutex<Box<dyn FavoriteDao>>>,
favorites_dao: web::Data<Box<dyn FavoriteDao>>,
) -> impl Responder {
if let Ok(user_id) = claims.sub.parse::<i32>() {
let path = body.path.clone();
web::block(move || {
favorites_dao
.lock()
.expect("Unable to get favorites dao")
.remove_favorite(user_id, path);
web::block::<_, _, String>(move || {
favorites_dao.remove_favorite(user_id, path);
Ok(())
})
.await
.unwrap();
@@ -485,48 +295,146 @@ async fn delete_favorite(
}
}
fn create_thumbnails() {
let tracer = global_tracer();
let span = tracer.start("creating thumbnails");
#[post("image/tags")]
async fn add_tag(_: Claims, body: web::Json<AddTagRequest>) -> impl Responder {
let tag = body.tag_name.clone();
use database::schema::tags;
let connection = &connect();
match tags::table
.filter(tags::name.eq(&tag))
.get_result::<Tag>(connection)
.optional()
.and_then(|t| {
if let Some(t) = t {
Ok(t.id)
} else {
match diesel::insert_into(tags::table)
.values(InsertTag {
name: tag.clone(),
created_time: Utc::now().timestamp(),
})
.execute(connection)
.and_then(|_| {
no_arg_sql_function!(
last_insert_rowid,
diesel::sql_types::Integer,
"Represents the SQL last_insert_row() function"
);
diesel::select(last_insert_rowid).get_result::<i32>(connection)
}) {
Err(e) => {
error!("Error inserting tag: '{}'. {:?}", tag, e);
Err(e)
}
Ok(id) => {
info!("Inserted tag: '{}' with id: {:?}", tag, id);
Ok(id)
}
}
}
})
.map(|tag_id| {
use database::schema::tagged_photo;
let file_name = body.file_name.clone();
match tagged_photo::table
.filter(tagged_photo::photo_name.eq(&file_name))
.filter(tagged_photo::tag_id.eq(tag_id))
.get_result::<TaggedPhoto>(connection)
.optional()
{
Ok(Some(_)) => HttpResponse::NoContent(),
Ok(None) => diesel::insert_into(tagged_photo::table)
.values(InsertTaggedPhoto {
tag_id,
photo_name: file_name.clone(),
created_time: Utc::now().timestamp(),
})
.execute(connection)
.map(|_| {
info!("Inserted tagged photo: {} -> '{}'", tag_id, file_name);
HttpResponse::Created()
})
.unwrap_or_else(|e| {
error!(
"Error inserting tagged photo: '{}' -> '{}'. {:?}",
tag_id, body.file_name, e
);
HttpResponse::InternalServerError()
}),
Err(e) => {
error!("Error querying tagged photo: {:?}", e);
HttpResponse::InternalServerError()
}
}
}) {
Ok(resp) => resp,
Err(e) => {
error!("{:?}", e);
HttpResponse::InternalServerError()
}
}
}
#[get("image/tags")]
async fn get_tags(_: Claims, request: web::Query<ThumbnailRequest>) -> impl Responder {
use schema::tagged_photo;
use schema::tags;
match tags::table
.left_join(tagged_photo::table)
.filter(tagged_photo::photo_name.eq(&request.path))
.select((tags::id, tags::name, tags::created_time))
.get_results::<Tag>(&connect())
{
Ok(tags) => HttpResponse::Ok().json(tags),
Err(e) => {
error!("Error getting tags for image: '{}'. {:?}", request.path, e);
HttpResponse::InternalServerError().finish()
}
}
}
fn create_thumbnails() {
let thumbs = &dotenv::var("THUMBNAILS").expect("THUMBNAILS not defined");
let thumbnail_directory: &Path = Path::new(thumbs);
let images = PathBuf::from(dotenv::var("BASE_PATH").unwrap());
WalkDir::new(&images)
walkdir::WalkDir::new(&images)
.into_iter()
.collect::<Vec<Result<_, _>>>()
.into_par_iter()
.filter_map(|entry| entry.ok())
.filter(|entry| entry.file_type().is_file())
.filter(|entry| {
if is_video(entry) {
let relative_path = &entry.path().strip_prefix(&images).unwrap();
let thumb_path = Path::new(thumbnail_directory).join(relative_path);
std::fs::create_dir_all(
thumb_path
.parent()
.unwrap_or_else(|| panic!("Thumbnail {:?} has no parent?", thumb_path)),
)
.expect("Error creating directory");
debug!("{:?}", entry.path());
if let Some(ext) = entry
.path()
.extension()
.and_then(|ext| ext.to_str().map(|ext| ext.to_lowercase()))
{
if ext == "mp4" || ext == "mov" {
let relative_path = &entry.path().strip_prefix(&images).unwrap();
let thumb_path = Path::new(thumbnail_directory).join(relative_path);
std::fs::create_dir_all(&thumb_path.parent().unwrap())
.expect("Error creating directory");
let mut video_span = tracer.start_with_context(
"generate_video_thumbnail",
&opentelemetry::Context::new()
.with_remote_span_context(span.span_context().clone()),
);
video_span.set_attributes(vec![
KeyValue::new("type", "video"),
KeyValue::new("file-name", thumb_path.display().to_string()),
]);
debug!("Generating video thumbnail: {:?}", thumb_path);
generate_video_thumbnail(entry.path(), &thumb_path);
video_span.end();
false
debug!("Generating video thumbnail: {:?}", thumb_path);
generate_video_thumbnail(entry.path(), &thumb_path);
false
} else {
is_image(entry)
}
} else {
is_image(entry)
error!("Unable to get extension for file: {:?}", entry.path());
false
}
})
.filter(|entry| {
@@ -547,9 +455,9 @@ fn create_thumbnails() {
.map(|(image, path)| {
let relative_path = &path.strip_prefix(&images).unwrap();
let thumb_path = Path::new(thumbnail_directory).join(relative_path);
std::fs::create_dir_all(thumb_path.parent().unwrap())
std::fs::create_dir_all(&thumb_path.parent().unwrap())
.expect("There was an issue creating directory");
info!("Saving thumbnail: {:?}", thumb_path);
debug!("Saving thumbnail: {:?}", thumb_path);
image.save(thumb_path).expect("Failure saving thumbnail");
})
.for_each(drop);
@@ -579,7 +487,6 @@ fn is_image(entry: &DirEntry) -> bool {
.path()
.extension()
.and_then(|ext| ext.to_str())
.map(|ext| ext.to_lowercase())
.map(|ext| ext == "jpg" || ext == "jpeg" || ext == "png" || ext == "nef")
.unwrap_or(false)
}
@@ -589,165 +496,107 @@ fn is_video(entry: &DirEntry) -> bool {
.path()
.extension()
.and_then(|ext| ext.to_str())
.map(|ext| ext.to_lowercase())
.map(|ext| ext == "mp4" || ext == "mov")
.unwrap_or(false)
}
fn main() -> std::io::Result<()> {
if let Err(err) = dotenv::dotenv() {
println!("Error parsing .env {:?}", err);
}
dotenv::dotenv().ok();
env_logger::init();
run_migrations(&mut connect()).expect("Failed to run migrations");
create_thumbnails();
watch_files();
let system = actix::System::new();
system.block_on(async {
// Just use basic logger when running a non-release build
#[cfg(debug_assertions)]
{
env_logger::init();
}
#[cfg(not(debug_assertions))]
{
otel::init_logs();
otel::init_tracing();
}
create_thumbnails();
generate_video_gifs().await;
let app_data = Data::new(AppState::default());
let labels = HashMap::new();
let prometheus = PrometheusMetricsBuilder::new("api")
.const_labels(labels)
.build()
.expect("Unable to build prometheus metrics middleware");
prometheus
.registry
.register(Box::new(IMAGE_GAUGE.clone()))
.unwrap();
prometheus
.registry
.register(Box::new(VIDEO_GAUGE.clone()))
.unwrap();
let app_state = app_data.clone();
app_state.playlist_manager.do_send(ScanDirectoryMessage {
directory: app_state.base_path.clone(),
});
HttpServer::new(move || {
let user_dao = SqliteUserDao::new();
let favorites_dao = SqliteFavoriteDao::new();
let tag_dao = SqliteTagDao::default();
App::new()
.wrap(middleware::Logger::default())
.service(web::resource("/login").route(web::post().to(login::<SqliteUserDao>)))
.service(
web::resource("/photos")
.route(web::get().to(files::list_photos::<SqliteTagDao, RealFileSystem>)),
)
.service(web::resource("/file/move").post(move_file::<RealFileSystem>))
.service(get_image)
.service(upload_image)
.service(generate_video)
.service(stream_video)
.service(get_video_part)
.service(favorites)
.service(put_add_favorite)
.service(delete_favorite)
.service(get_file_metadata)
.service(memories::list_memories)
.add_feature(add_tag_services::<_, SqliteTagDao>)
.app_data(app_data.clone())
.app_data::<Data<RealFileSystem>>(Data::new(RealFileSystem::new(
app_data.base_path.clone(),
)))
.app_data::<Data<Mutex<SqliteUserDao>>>(Data::new(Mutex::new(user_dao)))
.app_data::<Data<Mutex<Box<dyn FavoriteDao>>>>(Data::new(Mutex::new(Box::new(
favorites_dao,
))))
.app_data::<Data<Mutex<SqliteTagDao>>>(Data::new(Mutex::new(tag_dao)))
.wrap(prometheus.clone())
})
.bind(dotenv::var("BIND_URL").unwrap())?
.bind("localhost:8088")?
.run()
.await
})
}
fn run_migrations(
connection: &mut impl MigrationHarness<Sqlite>,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
connection.run_pending_migrations(MIGRATIONS)?;
Ok(())
}
fn watch_files() {
std::thread::spawn(|| {
let (wtx, wrx) = channel();
let mut watcher = RecommendedWatcher::new(wtx, Config::default()).unwrap();
let base_str = dotenv::var("BASE_PATH").unwrap();
let base_path = Path::new(&base_str);
let mut watcher = watcher(wtx, std::time::Duration::from_secs(10)).unwrap();
watcher
.watch(base_path, RecursiveMode::Recursive)
.context(format!("Unable to watch BASE_PATH: '{}'", base_str))
.watch(dotenv::var("BASE_PATH").unwrap(), RecursiveMode::Recursive)
.unwrap();
loop {
let ev = wrx.recv();
if let Ok(Ok(event)) = ev {
match event.kind {
EventKind::Create(create_kind) => {
info!(
"Creating thumbnails {:?} create event kind: {:?}",
event.paths, create_kind
);
create_thumbnails();
}
EventKind::Modify(kind) => {
debug!("All modified paths: {:?}", event.paths);
debug!("Modify kind: {:?}", kind);
if let Ok(event) = ev {
match event {
DebouncedEvent::Create(_) => create_thumbnails(),
DebouncedEvent::Rename(orig, _) | DebouncedEvent::Write(orig) => {
let image_base_path = PathBuf::from(env::var("BASE_PATH").unwrap());
let image_relative = orig.strip_prefix(&image_base_path).unwrap();
if let Ok(old_thumbnail) =
env::var("THUMBNAILS").map(PathBuf::from).map(|mut base| {
base.push(image_relative);
base
})
{
if let Err(e) = std::fs::remove_file(&old_thumbnail) {
error!(
"Error removing thumbnail: {}\n{}",
old_thumbnail.display(),
e
);
} else {
info!("Deleted moved thumbnail: {}", old_thumbnail.display());
if let Some(orig) = event.paths.first() {
let image_base_path = PathBuf::from(env::var("BASE_PATH").unwrap());
let image_relative = orig.strip_prefix(&image_base_path).unwrap();
if let Ok(old_thumbnail) =
env::var("THUMBNAILS").map(PathBuf::from).map(|mut base| {
base.push(image_relative);
base
})
{
if let Err(e) = std::fs::remove_file(&old_thumbnail) {
error!(
"Error removing thumbnail: {}\n{}",
old_thumbnail.display(),
e
);
} else {
info!("Deleted moved thumbnail: {}", old_thumbnail.display());
create_thumbnails();
}
create_thumbnails();
}
}
}
EventKind::Remove(_) => {
DebouncedEvent::Remove(_) => {
update_media_counts(&PathBuf::from(env::var("BASE_PATH").unwrap()))
}
_ => {}
}
};
_ => continue,
};
}
}
});
let system = actix::System::new("image-api");
let act = StreamActor {}.start();
let app_data = web::Data::new(AppState {
stream_manager: Arc::new(act),
});
let labels = HashMap::new();
let prometheus = PrometheusMetrics::new("", Some("/metrics"), Some(labels));
prometheus
.registry
.register(Box::new(IMAGE_GAUGE.clone()))
.unwrap();
prometheus
.registry
.register(Box::new(VIDEO_GAUGE.clone()))
.unwrap();
HttpServer::new(move || {
let user_dao = SqliteUserDao::new();
let favorites_dao = SqliteFavoriteDao::new();
App::new()
.wrap(middleware::Logger::default())
.service(web::resource("/login").route(web::post().to(login)))
.service(web::resource("/photos").route(web::get().to(files::list_photos)))
.service(get_image)
.service(upload_image)
.service(generate_video)
.service(stream_video)
.service(get_video_part)
.service(favorites)
.service(put_add_favorite)
.service(delete_favorite)
.service(get_file_metadata)
.service(add_tag)
.service(get_tags)
.app_data(app_data.clone())
.data::<Box<dyn UserDao>>(Box::new(user_dao))
.data::<Box<dyn FavoriteDao>>(Box::new(favorites_dao))
.wrap(prometheus.clone())
})
.bind(dotenv::var("BIND_URL").unwrap())?
.bind("localhost:8088")?
.run();
system.run()
}
struct AppState {
stream_manager: Arc<Addr<StreamActor>>,
}

View File

@@ -1,763 +0,0 @@
use actix_web::web::Data;
use actix_web::{HttpRequest, HttpResponse, Responder, get, web};
use chrono::LocalResult::{Ambiguous, Single};
use chrono::{DateTime, Datelike, FixedOffset, Local, LocalResult, NaiveDate, TimeZone, Utc};
use log::{debug, trace, warn};
use opentelemetry::KeyValue;
use opentelemetry::trace::{Span, Status, Tracer};
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use std::path::Path;
use std::path::PathBuf;
use walkdir::WalkDir;
use crate::data::Claims;
use crate::files::is_image_or_video;
use crate::otel::{extract_context_from_request, global_tracer};
use crate::state::AppState;
// Helper that encapsulates path-exclusion semantics
#[derive(Debug)]
struct PathExcluder {
excluded_dirs: Vec<PathBuf>,
excluded_patterns: Vec<String>,
}
impl PathExcluder {
/// Build from a `base` path and the raw exclusion entries.
///
/// Rules:
/// - Entries starting with '/' are interpreted as "absolute under base"
/// (e.g. "/photos/private" -> base/photos/private).
/// - Entries without '/' are treated as substring patterns that match
/// anywhere in the full path string (still scoped under base).
fn new(base: &Path, raw_excluded: &[String]) -> Self {
let mut excluded_dirs = Vec::new();
let mut excluded_patterns = Vec::new();
for dir in raw_excluded {
if let Some(rel) = dir.strip_prefix('/') {
// Absolute under base
if !rel.is_empty() {
excluded_dirs.push(base.join(rel));
}
} else {
// Pattern anywhere under base
excluded_patterns.push(dir.clone());
}
}
debug!(
"PathExcluder created. dirs={:?}, patterns={:?}",
excluded_dirs, excluded_patterns
);
Self {
excluded_dirs,
excluded_patterns,
}
}
/// Returns true if `path` should be excluded.
fn is_excluded(&self, path: &Path) -> bool {
// Directory-based exclusions
for excluded in &self.excluded_dirs {
if path.starts_with(excluded) {
debug!(
"PathExcluder: excluded by dir: {:?} (rule: {:?})",
path, excluded
);
return true;
}
}
// Pattern-based exclusions: match whole path components (dir or file name),
// not substrings.
if !self.excluded_patterns.is_empty() {
for component in path.components() {
if let Some(comp_str) = component.as_os_str().to_str()
&& self.excluded_patterns.iter().any(|pat| pat == comp_str) {
debug!(
"PathExcluder: excluded by component pattern: {:?} (component: {:?}, patterns: {:?})",
path, comp_str, self.excluded_patterns
);
return true;
}
}
}
false
}
}
#[derive(Copy, Clone, Deserialize, PartialEq, Debug)]
#[serde(rename_all = "lowercase")]
pub enum MemoriesSpan {
Day,
Week,
Month,
}
#[derive(Deserialize)]
pub struct MemoriesRequest {
pub span: Option<MemoriesSpan>,
/// Client timezone offset in minutes from UTC (e.g., -480 for PST, 60 for CET)
pub timezone_offset_minutes: Option<i32>,
}
#[derive(Debug, Serialize, Clone)]
pub struct MemoryItem {
pub path: String,
pub created: Option<i64>,
pub modified: Option<i64>,
}
#[derive(Debug, Serialize)]
pub struct MemoriesResponse {
pub items: Vec<MemoryItem>,
}
fn get_file_date_info(
path: &Path,
client_timezone: &Option<FixedOffset>,
) -> Option<(NaiveDate, Option<i64>, Option<i64>)> {
// Read file metadata once
let meta = std::fs::metadata(path).ok()?;
// Extract metadata timestamps
let metadata_created = meta.created().ok().map(|t| {
let utc: DateTime<Utc> = t.into();
if let Some(tz) = client_timezone {
utc.with_timezone(tz).timestamp()
} else {
utc.timestamp()
}
});
let metadata_modified = meta.modified().ok().map(|t| {
let utc: DateTime<Utc> = t.into();
if let Some(tz) = client_timezone {
utc.with_timezone(tz).timestamp()
} else {
utc.timestamp()
}
});
// Try to get date from filename
if let Some(date_time) = path
.file_name()
.and_then(|filename| filename.to_str())
.and_then(extract_date_from_filename)
{
// Convert to client timezone if specified
let date_in_timezone = if let Some(tz) = client_timezone {
date_time.with_timezone(tz)
} else {
date_time.with_timezone(&Local).fixed_offset()
};
// Use the timestamp from the filename date
let created_ts = date_in_timezone.timestamp();
debug!(
"File date from file {:?} > {:?} = {:?}",
path.file_name(),
date_time,
date_in_timezone
);
return Some((
date_in_timezone.date_naive(),
Some(created_ts),
metadata_modified,
));
}
// Fall back to metadata if no date in filename
let system_time = meta.created().ok().or_else(|| meta.modified().ok())?;
let dt_utc: DateTime<Utc> = system_time.into();
let date_in_timezone = if let Some(tz) = client_timezone {
dt_utc.with_timezone(tz).date_naive()
} else {
dt_utc.with_timezone(&Local).date_naive()
};
trace!("Fallback metadata create date = {:?}", date_in_timezone);
Some((date_in_timezone, metadata_created, metadata_modified))
}
fn extract_date_from_filename(filename: &str) -> Option<DateTime<FixedOffset>> {
let build_date_from_ymd_capture =
|captures: &regex::Captures| -> Option<DateTime<FixedOffset>> {
let year = captures.get(1)?.as_str().parse::<i32>().ok()?;
let month = captures.get(2)?.as_str().parse::<u32>().ok()?;
let day = captures.get(3)?.as_str().parse::<u32>().ok()?;
let hour = captures.get(4)?.as_str().parse::<u32>().ok()?;
let min = captures.get(5)?.as_str().parse::<u32>().ok()?;
let sec = captures.get(6)?.as_str().parse::<u32>().ok()?;
match Local.from_local_datetime(
&NaiveDate::from_ymd_opt(year, month, day)?.and_hms_opt(hour, min, sec)?,
) {
Single(dt) => Some(dt.fixed_offset()),
Ambiguous(early_dt, _) => Some(early_dt.fixed_offset()),
LocalResult::None => {
warn!("Weird local date: {:?}", filename);
None
}
}
};
// 1. Screenshot format: Screenshot_2014-06-01-20-44-50.png
if let Some(captures) = regex::Regex::new(r"(\d{4})-(\d{2})-(\d{2})-(\d{2})-(\d{2})-(\d{2})")
.ok()?
.captures(filename)
.and_then(|c| build_date_from_ymd_capture(&c))
{
return Some(captures);
}
// Screenshot format: Screenshot_20140601[_-]204450.png
if let Some(captures) = regex::Regex::new(r"(\d{4})(\d{2})(\d{2})[_-](\d{2})(\d{2})(\d{2})")
.ok()?
.captures(filename)
.and_then(|c| build_date_from_ymd_capture(&c))
{
return Some(captures);
}
// 2. Dash format: 2015-01-09_02-15-15.jpg
if let Some(captures) = regex::Regex::new(r"(\d{4})-(\d{2})-(\d{2})_(\d{2})-(\d{2})-(\d{2})")
.ok()?
.captures(filename)
.and_then(|c| build_date_from_ymd_capture(&c))
{
return Some(captures);
}
// Dash with compact time format: 2015-01-09-021515.jpg
if let Some(captures) = regex::Regex::new(r"(\d{4})-(\d{2})-(\d{2})-(\d{2})(\d{2})(\d{2})")
.ok()?
.captures(filename)
.and_then(|c| build_date_from_ymd_capture(&c))
{
return Some(captures);
}
// 3. Compact format: 20140927101712.jpg
if let Some(captures) = regex::Regex::new(r"(\d{4})(\d{2})(\d{2})(\d{2})(\d{2})(\d{2})")
.ok()?
.captures(filename)
.and_then(|c| build_date_from_ymd_capture(&c))
{
return Some(captures);
}
// 4. Timestamp format: 1401638400.jpeg
if let Some(captures) = regex::Regex::new(r"(\d{10}|\d{13})\.")
.ok()?
.captures(filename)
{
let timestamp_str = captures.get(1)?.as_str();
// Millisecond timestamp (13 digits)
if timestamp_str.len() >= 13
&& let Some(date_time) = timestamp_str[0..13]
.parse::<i64>()
.ok()
.and_then(DateTime::from_timestamp_millis)
.map(|naive_dt| naive_dt.fixed_offset())
{
return Some(date_time);
}
// Second timestamp (10 digits)
if timestamp_str.len() >= 10
&& let Some(date_time) = timestamp_str[0..10]
.parse::<i64>()
.ok()
.and_then(|timestamp_secs| DateTime::from_timestamp(timestamp_secs, 0))
.map(|naive_dt| naive_dt.fixed_offset())
{
return Some(date_time);
}
}
None
}
#[get("/memories")]
pub async fn list_memories(
_claims: Claims,
request: HttpRequest,
q: web::Query<MemoriesRequest>,
app_state: Data<AppState>,
) -> impl Responder {
let tracer = global_tracer();
let context = extract_context_from_request(&request);
let mut span = tracer.start_with_context("list_memories", &context);
let span_mode = q.span.unwrap_or(MemoriesSpan::Day);
let years_back: u32 = 15;
// Create timezone from client offset, default to local timezone if not provided
let client_timezone = match q.timezone_offset_minutes {
Some(offset_mins) => {
let offset_secs = offset_mins * 60;
Some(
FixedOffset::east_opt(offset_secs)
.unwrap_or_else(|| FixedOffset::east_opt(0).unwrap()),
)
}
None => None,
};
let now = if let Some(tz) = client_timezone {
debug!("Client timezone: {:?}", tz);
Utc::now().with_timezone(&tz).date_naive()
} else {
Local::now().date_naive()
};
debug!("Now: {:?}", now);
let base = Path::new(&app_state.base_path);
// Build the path excluder from base and env-configured exclusions
let path_excluder = PathExcluder::new(base, &app_state.excluded_dirs);
let entries: Vec<_> = WalkDir::new(base)
.into_iter()
.filter_map(|e| e.ok())
.filter(|e| {
let path = e.path();
// Skip paths that should be excluded
if path_excluder.is_excluded(path) {
return false;
}
true
})
.filter(|e| e.file_type().is_file() && is_image_or_video(e.path()))
.collect();
let mut memories_with_dates: Vec<(MemoryItem, NaiveDate)> = entries
.par_iter()
.filter_map(|entry| {
let path = entry.path();
// Get file date and timestamps in one operation
let (file_date, created, modified) = match get_file_date_info(path, &client_timezone) {
Some(info) => info,
None => {
warn!("No date info found for file: {:?}", path);
return None;
}
};
if is_memories_match(file_date, now, span_mode, years_back) {
return if let Ok(rel) = path.strip_prefix(base) {
Some((
MemoryItem {
path: rel.to_string_lossy().to_string(),
created,
modified,
},
file_date,
))
} else {
warn!("Failed to strip prefix from path: {:?}", path);
None
};
}
None
})
.collect();
match span_mode {
// Sort by absolute time for a more 'overview'
MemoriesSpan::Month => memories_with_dates.sort_by(|a, b| a.1.cmp(&b.1)),
_ => {
memories_with_dates.sort_by(|a, b| {
let day_comparison = a.1.day().cmp(&b.1.day());
if day_comparison == std::cmp::Ordering::Equal {
match (a.0.created, b.0.created) {
(Some(a_time), Some(b_time)) => a_time.cmp(&b_time),
(Some(_), None) => std::cmp::Ordering::Less,
(None, Some(_)) => std::cmp::Ordering::Greater,
(None, None) => std::cmp::Ordering::Equal,
}
} else {
day_comparison
}
});
}
}
// Sort by day of the month and time (using the created timestamp)
let items: Vec<MemoryItem> = memories_with_dates.into_iter().map(|(m, _)| m).collect();
span.add_event(
"memories_scanned",
vec![
KeyValue::new("span", format!("{:?}", span_mode)),
KeyValue::new("years_back", years_back.to_string()),
KeyValue::new("result_count", items.len().to_string()),
KeyValue::new(
"client_timezone",
format!(
"{:?}",
client_timezone.unwrap_or_else(|| FixedOffset::east_opt(0).unwrap())
),
),
KeyValue::new("excluded_dirs", format!("{:?}", app_state.excluded_dirs)),
],
);
span.set_status(Status::Ok);
HttpResponse::Ok().json(MemoriesResponse { items })
}
fn is_memories_match(
file_date: NaiveDate,
today: NaiveDate,
span: MemoriesSpan,
years_back: u32,
) -> bool {
if file_date > today {
return false;
}
let years_diff = (today.year() - file_date.year()).unsigned_abs();
if years_diff > years_back {
warn!(
"File date is too far in the past: {:?} vs {:?}",
file_date, today
);
return false;
}
match span {
MemoriesSpan::Day => same_month_day_any_year(file_date, today),
MemoriesSpan::Week => same_week_any_year(file_date, today),
MemoriesSpan::Month => same_month_any_year(file_date, today),
}
}
fn same_month_day_any_year(a: NaiveDate, b: NaiveDate) -> bool {
a.month() == b.month() && a.day() == b.day()
}
// Match same ISO week number and same weekday (ignoring year)
fn same_week_any_year(a: NaiveDate, b: NaiveDate) -> bool {
a.iso_week().week().eq(&b.iso_week().week())
}
// Match same month (ignoring day and year)
fn same_month_any_year(a: NaiveDate, b: NaiveDate) -> bool {
a.month() == b.month()
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::Timelike;
use std::fs::{self, File};
use tempfile::tempdir;
#[test]
fn test_extract_date_from_filename_screenshot_format() {
let filename = "Screenshot_2014-06-01-20-44-50.png";
let date_time = extract_date_from_filename(filename).unwrap();
assert_eq!(date_time.year(), 2014);
assert_eq!(date_time.month(), 6);
assert_eq!(date_time.day(), 1);
assert_eq!(date_time.hour(), 20);
assert_eq!(date_time.minute(), 44);
assert_eq!(date_time.second(), 50);
}
#[test]
fn test_extract_date_from_filename_screenshot_less_dashes_format() {
let filename = "Screenshot_20140601-204450.png";
let date_time = extract_date_from_filename(filename).unwrap();
assert_eq!(date_time.year(), 2014);
assert_eq!(date_time.month(), 6);
assert_eq!(date_time.day(), 1);
assert_eq!(date_time.hour(), 20);
assert_eq!(date_time.minute(), 44);
assert_eq!(date_time.second(), 50);
}
#[test]
fn test_extract_date_from_filename_screenshot_underscores_format() {
let filename = "20140601_204450.png";
let date_time = extract_date_from_filename(filename).unwrap();
assert_eq!(date_time.year(), 2014);
assert_eq!(date_time.month(), 6);
assert_eq!(date_time.day(), 1);
assert_eq!(date_time.hour(), 20);
assert_eq!(date_time.minute(), 44);
assert_eq!(date_time.second(), 50);
}
#[test]
fn test_extract_date_from_filename_dash_format() {
let filename = "2015-01-09_02-15-15.jpg";
let date_time = extract_date_from_filename(filename).unwrap();
assert_eq!(date_time.year(), 2015);
assert_eq!(date_time.month(), 1);
assert_eq!(date_time.day(), 9);
assert_eq!(date_time.hour(), 2);
assert_eq!(date_time.minute(), 15);
assert_eq!(date_time.second(), 15);
}
#[test]
fn test_extract_date_from_filename_dash_compact_time_format() {
let filename = "2015-01-09-021515.jpg";
let date_time = extract_date_from_filename(filename).unwrap();
assert_eq!(date_time.year(), 2015);
assert_eq!(date_time.month(), 1);
assert_eq!(date_time.day(), 9);
assert_eq!(date_time.hour(), 2);
assert_eq!(date_time.minute(), 15);
assert_eq!(date_time.second(), 15);
}
#[test]
fn test_extract_date_from_filename_compact_format() {
let filename = "20140927101712.jpg";
let date_time = extract_date_from_filename(filename).unwrap();
assert_eq!(date_time.year(), 2014);
assert_eq!(date_time.month(), 9);
assert_eq!(date_time.day(), 27);
assert_eq!(date_time.hour(), 10);
assert_eq!(date_time.minute(), 17);
assert_eq!(date_time.second(), 12);
}
#[test]
fn test_extract_date_from_filename_timestamp_format() {
let filename = "xyz_1401638400.jpeg"; // Unix timestamp for 2014-06-01 16:00:00 UTC
// Timestamps are already in UTC, so timezone doesn't matter for this test
let date_time = extract_date_from_filename(filename).unwrap();
assert_eq!(date_time.year(), 2014);
assert_eq!(date_time.month(), 6);
assert_eq!(date_time.day(), 1);
assert_eq!(date_time.hour(), 16);
assert_eq!(date_time.minute(), 0);
assert_eq!(date_time.second(), 0);
}
#[test]
fn test_extract_date_from_filename_timestamp_millis_format() {
let filename = "xyz_1401638400000.jpeg"; // Unix timestamp in milliseconds
let date_time = extract_date_from_filename(filename).unwrap();
assert_eq!(date_time.year(), 2014);
assert_eq!(date_time.month(), 6);
assert_eq!(date_time.day(), 1);
assert_eq!(date_time.hour(), 16);
assert_eq!(date_time.minute(), 0);
assert_eq!(date_time.second(), 0);
}
#[test]
fn test_get_file_date_info_from_filename() {
let temp_dir = tempdir().unwrap();
let temp_file = temp_dir.path().join("Screenshot_2014-06-01-20-44-50.png");
File::create(&temp_file).unwrap();
let (date, created, _) =
get_file_date_info(&temp_file, &Some(*Local::now().fixed_offset().offset())).unwrap();
// Check that date is from filename
assert_eq!(date.year(), 2014);
assert_eq!(date.month(), 6);
assert_eq!(date.day(), 1);
// Check that created timestamp matches the date from filename
assert!(created.is_some());
let ts = created.unwrap();
// The timestamp should be for 2014-06-01 20:44:50 in the LOCAL timezone
let dt_from_ts = Local.timestamp_opt(ts, 0).unwrap();
assert_eq!(dt_from_ts.year(), 2014);
assert_eq!(dt_from_ts.month(), 6);
assert_eq!(dt_from_ts.day(), 1);
assert_eq!(dt_from_ts.hour(), 20);
assert_eq!(dt_from_ts.minute(), 44);
assert_eq!(dt_from_ts.second(), 50);
}
#[test]
fn test_get_file_date_info_from_metadata() {
let temp_dir = tempdir().unwrap();
let temp_file = temp_dir.path().join("regular_image.jpg");
File::create(&temp_file).unwrap();
let (date, created, modified) = get_file_date_info(&temp_file, &None).unwrap();
// Both date and timestamps should be from metadata (recent)
let today = Local::now().date_naive();
assert_eq!(date.year(), today.year());
assert_eq!(date.month(), today.month());
// Both timestamps should be valid
assert!(created.is_some());
assert!(modified.is_some());
// Check that timestamps are recent
let dt_created = DateTime::<Utc>::from_timestamp(created.unwrap(), 0).unwrap();
assert_eq!(dt_created.year(), today.year());
let dt_modified = DateTime::<Utc>::from_timestamp(modified.unwrap(), 0).unwrap();
assert_eq!(dt_modified.year(), today.year());
}
#[test]
fn test_path_excluder_absolute_under_base() {
let tmp = tempdir().unwrap();
let base = tmp.path();
// Simulate structure:
// base/photos/private/secret.jpg
// base/photos/public/ok.jpg
// base/screenshots/img.png
let photos_private = base.join("photos/private");
let photos_public = base.join("photos/public");
let screenshots = base.join("screenshots");
fs::create_dir_all(&photos_private).unwrap();
fs::create_dir_all(&photos_public).unwrap();
fs::create_dir_all(&screenshots).unwrap();
let secret = photos_private.join("secret.jpg");
let ok = photos_public.join("ok.jpg");
let shot = screenshots.join("img.png");
File::create(&secret).unwrap();
File::create(&ok).unwrap();
File::create(&shot).unwrap();
// Exclude "/photos/private" and "/screenshots" under base
let excluded = vec![
String::from("/photos/private"),
String::from("/screenshots"),
];
let excluder = PathExcluder::new(base, &excluded);
assert!(excluder.is_excluded(&secret), "secret should be excluded");
assert!(
excluder.is_excluded(&shot),
"screenshots should be excluded"
);
assert!(
!excluder.is_excluded(&ok),
"public photo should NOT be excluded"
);
}
#[test]
fn test_path_excluder_pattern_anywhere_under_base() {
let tmp = tempdir().unwrap();
let base = tmp.path();
// Simulate:
// base/a/tmp_file.jpg
// base/b/normal.jpg
// base/c/sometmpdir/file.jpg
let a = base.join("a");
let b = base.join("b");
let c = base.join("c/tmp");
fs::create_dir_all(&a).unwrap();
fs::create_dir_all(&b).unwrap();
fs::create_dir_all(&c).unwrap();
let tmp_file = a.join("tmp_file.jpg");
let normal = b.join("normal.jpg");
let tmp_dir_file = c.join("file.jpg");
File::create(&tmp_file).unwrap();
File::create(&normal).unwrap();
File::create(&tmp_dir_file).unwrap();
// Exclude any path containing "tmp"
let excluded = vec![String::from("tmp")];
let excluder = PathExcluder::new(base, &excluded);
assert!(
!excluder.is_excluded(&tmp_file),
"file with 'tmp' in name should NOT be excluded"
);
assert!(
excluder.is_excluded(&tmp_dir_file),
"file in directory with 'tmp' in path should be excluded"
);
assert!(
!excluder.is_excluded(&normal),
"file without 'tmp' in its path should NOT be excluded"
);
}
#[test]
fn test_path_excluder_mixed_absolute_and_pattern() {
let tmp = tempdir().unwrap();
let base = tmp.path();
// Simulate:
// base/photos/private/secret_tmp.jpg -> excluded by absolute dir rule
// base/photos/private/secret.jpg -> excluded by absolute dir rule
// base/photos/tmp/public.jpg -> excluded by pattern "tmp" (dir name)
// base/photos/public/tmp_public.jpg -> NOT excluded (file name contains "tmp" but not equal)
// base/other/keep.jpg -> NOT excluded
let photos_private = base.join("photos/private");
let photos_tmp = base.join("photos/tmp");
let photos_public = base.join("photos/public");
let other = base.join("other");
fs::create_dir_all(&photos_private).unwrap();
fs::create_dir_all(&photos_tmp).unwrap();
fs::create_dir_all(&photos_public).unwrap();
fs::create_dir_all(&other).unwrap();
let secret_tmp = photos_private.join("secret_tmp.jpg");
let secret = photos_private.join("secret.jpg");
let tmp_dir_file = photos_tmp.join("public.jpg");
let tmp_in_name = photos_public.join("tmp_public.jpg");
let keep = other.join("keep.jpg");
File::create(&secret_tmp).unwrap();
File::create(&secret).unwrap();
File::create(&tmp_dir_file).unwrap();
File::create(&tmp_in_name).unwrap();
File::create(&keep).unwrap();
// Mixed: exclude "/photos/private" (dir) and any component equal to "tmp"
let excluded = vec![String::from("/photos/private"), String::from("tmp")];
let excluder = PathExcluder::new(base, &excluded);
// Entire private tree is excluded by dir rule
assert!(excluder.is_excluded(&secret_tmp));
assert!(excluder.is_excluded(&secret));
// Dir 'tmp' under photos excluded by pattern
assert!(excluder.is_excluded(&tmp_dir_file));
// File name containing 'tmp' but not equal should NOT be excluded
assert!(!excluder.is_excluded(&tmp_in_name));
// keep.jpg doesn't match any rule
assert!(!excluder.is_excluded(&keep));
}
}

View File

@@ -1,112 +0,0 @@
use actix_web::HttpRequest;
use actix_web::http::header::HeaderMap;
use opentelemetry::global::{BoxedSpan, BoxedTracer};
use opentelemetry::propagation::TextMapPropagator;
use opentelemetry::trace::{Span, Status, Tracer};
use opentelemetry::{Context, KeyValue, global};
use opentelemetry_appender_log::OpenTelemetryLogBridge;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::Resource;
use opentelemetry_sdk::logs::{BatchLogProcessor, SdkLoggerProvider};
use opentelemetry_sdk::propagation::TraceContextPropagator;
pub fn global_tracer() -> BoxedTracer {
global::tracer("image-server")
}
#[allow(dead_code)]
pub fn init_tracing() {
let resources = Resource::builder()
.with_attributes([
KeyValue::new("service.name", "image-server"),
KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
])
.build();
let span_exporter = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_endpoint(std::env::var("OTLP_OTLS_ENDPOINT").unwrap())
.build()
.unwrap();
let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
.with_batch_exporter(span_exporter)
.with_resource(resources)
.build();
global::set_tracer_provider(tracer_provider);
}
#[allow(dead_code)]
pub fn init_logs() {
let otlp_exporter = opentelemetry_otlp::LogExporter::builder()
.with_tonic()
.with_endpoint(std::env::var("OTLP_OTLS_ENDPOINT").unwrap())
.build()
.unwrap();
let exporter = opentelemetry_stdout::LogExporter::default();
let resources = Resource::builder()
.with_attributes([
KeyValue::new("service.name", "image-server"),
KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
])
.build();
let log_provider = SdkLoggerProvider::builder()
.with_log_processor(BatchLogProcessor::builder(exporter).build())
.with_log_processor(BatchLogProcessor::builder(otlp_exporter).build())
.with_resource(resources)
.build();
let otel_log_appender = OpenTelemetryLogBridge::new(&log_provider);
log::set_boxed_logger(Box::new(otel_log_appender)).expect("Unable to set boxed logger");
//TODO: Still set this with the env? Ideally we still have a clean/simple local logger for local dev
log::set_max_level(log::LevelFilter::Info);
}
struct HeaderExtractor<'a>(&'a HeaderMap);
impl<'a> opentelemetry::propagation::Extractor for HeaderExtractor<'a> {
fn get(&self, key: &str) -> Option<&str> {
self.0.get(key).and_then(|v| v.to_str().ok())
}
fn keys(&self) -> Vec<&str> {
self.0.keys().map(|k| k.as_str()).collect()
}
}
pub fn extract_context_from_request(req: &HttpRequest) -> Context {
let propagator = TraceContextPropagator::new();
propagator.extract(&HeaderExtractor(req.headers()))
}
pub fn trace_db_call<F, O>(
context: &Context,
query_type: &str,
operation: &str,
func: F,
) -> anyhow::Result<O>
where
F: FnOnce(&mut BoxedSpan) -> anyhow::Result<O>,
{
let tracer = global::tracer("db");
let mut span = tracer
.span_builder(format!("db.{}.{}", query_type, operation))
.with_attributes(vec![
KeyValue::new("db.query_type", query_type.to_string().clone()),
KeyValue::new("db.operation", operation.to_string().clone()),
])
.start_with_context(&tracer, context);
let result = func(&mut span);
match &result {
Ok(_) => {
span.set_status(Status::Ok);
}
Err(e) => span.set_status(Status::error(e.to_string())),
}
result
}

View File

@@ -1,16 +0,0 @@
use actix_web::App;
pub trait ServiceBuilder<T> {
fn add_feature<F>(self, f: F) -> App<T>
where
F: Fn(App<T>) -> App<T>;
}
impl<T> ServiceBuilder<T> for App<T> {
fn add_feature<F>(self, create_feature: F) -> App<T>
where
F: Fn(App<T>) -> App<T>,
{
create_feature(self)
}
}

View File

@@ -1,96 +0,0 @@
use crate::video::actors::{PlaylistGenerator, StreamActor, VideoPlaylistManager};
use actix::{Actor, Addr};
use std::{env, sync::Arc};
pub struct AppState {
pub stream_manager: Arc<Addr<StreamActor>>,
pub playlist_manager: Arc<Addr<VideoPlaylistManager>>,
pub base_path: String,
pub thumbnail_path: String,
pub video_path: String,
pub gif_path: String,
pub excluded_dirs: Vec<String>,
}
impl AppState {
pub fn new(
stream_manager: Arc<Addr<StreamActor>>,
base_path: String,
thumbnail_path: String,
video_path: String,
gif_path: String,
excluded_dirs: Vec<String>,
) -> Self {
let playlist_generator = PlaylistGenerator::new();
let video_playlist_manager =
VideoPlaylistManager::new(video_path.clone(), playlist_generator.start());
Self {
stream_manager,
playlist_manager: Arc::new(video_playlist_manager.start()),
base_path,
thumbnail_path,
video_path,
gif_path,
excluded_dirs,
}
}
/// Parse excluded directories from environment variable
fn parse_excluded_dirs() -> Vec<String> {
env::var("EXCLUDED_DIRS")
.unwrap_or_default()
.split(',')
.filter(|dir| !dir.trim().is_empty())
.map(|dir| dir.trim().to_string())
.collect()
}
}
impl Default for AppState {
fn default() -> Self {
Self::new(
Arc::new(StreamActor {}.start()),
env::var("BASE_PATH").expect("BASE_PATH was not set in the env"),
env::var("THUMBNAILS").expect("THUMBNAILS was not set in the env"),
env::var("VIDEO_PATH").expect("VIDEO_PATH was not set in the env"),
env::var("GIFS_DIRECTORY").expect("GIFS_DIRECTORY was not set in the env"),
Self::parse_excluded_dirs(),
)
}
}
#[cfg(test)]
impl AppState {
/// Creates an AppState instance for testing with temporary directories
pub fn test_state() -> Self {
use actix::Actor;
// Create a base temporary directory
let temp_dir = tempfile::tempdir().expect("Failed to create temp directory");
let base_path = temp_dir.path().to_path_buf();
// Create subdirectories for thumbnails, videos, and gifs
let thumbnail_path = create_test_subdir(&base_path, "thumbnails");
let video_path = create_test_subdir(&base_path, "videos");
let gif_path = create_test_subdir(&base_path, "gifs");
// Create the AppState with the temporary paths
AppState::new(
std::sync::Arc::new(crate::video::actors::StreamActor {}.start()),
base_path.to_string_lossy().to_string(),
thumbnail_path.to_string_lossy().to_string(),
video_path.to_string_lossy().to_string(),
gif_path.to_string_lossy().to_string(),
Vec::new(), // No excluded directories for test state
)
}
}
/// Helper function to create a subdirectory inside the base directory for testing
#[cfg(test)]
fn create_test_subdir(base_path: &std::path::Path, name: &str) -> std::path::PathBuf {
let dir_path = base_path.join(name);
std::fs::create_dir_all(&dir_path)
.unwrap_or_else(|_| panic!("Failed to create {} directory", name));
dir_path
}

View File

@@ -1,823 +0,0 @@
use crate::data::GetTagsRequest;
use crate::otel::{extract_context_from_request, global_tracer, trace_db_call};
use crate::{Claims, ThumbnailRequest, connect, data::AddTagRequest, error::IntoHttpError, schema};
use actix_web::dev::{ServiceFactory, ServiceRequest};
use actix_web::{App, HttpRequest, HttpResponse, Responder, web};
use anyhow::Context;
use chrono::Utc;
use diesel::dsl::count_star;
use diesel::prelude::*;
use diesel::sql_types::*;
use log::{debug, info};
use opentelemetry::KeyValue;
use opentelemetry::trace::{Span, Status, TraceContextExt, Tracer};
use schema::{tagged_photo, tags};
use serde::{Deserialize, Serialize};
use std::borrow::BorrowMut;
use std::sync::Mutex;
pub fn add_tag_services<T, TagD: TagDao + 'static>(app: App<T>) -> App<T>
where
T: ServiceFactory<ServiceRequest, Config = (), Error = actix_web::Error, InitError = ()>,
{
app.service(
web::resource("image/tags")
.route(web::post().to(add_tag::<TagD>))
.route(web::get().to(get_tags::<TagD>))
.route(web::delete().to(remove_tagged_photo::<TagD>)),
)
.service(web::resource("image/tags/all").route(web::get().to(get_all_tags::<TagD>)))
.service(web::resource("image/tags/batch").route(web::post().to(update_tags::<TagD>)))
}
async fn add_tag<D: TagDao>(
_: Claims,
request: HttpRequest,
body: web::Json<AddTagRequest>,
tag_dao: web::Data<Mutex<D>>,
) -> impl Responder {
let tracer = global_tracer();
let context = extract_context_from_request(&request);
let span = tracer.start_with_context("add_tag", &context);
let span_context = opentelemetry::Context::current_with_span(span);
let tag_name = body.tag_name.clone();
let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao");
tag_dao
.get_all_tags(&span_context, None)
.and_then(|tags| {
if let Some((_, tag)) = tags.iter().find(|t| t.1.name == tag_name) {
Ok(tag.clone())
} else {
info!(
"Creating missing tag: '{:?}' for file: '{}'",
tag_name, &body.file_name
);
tag_dao.create_tag(&span_context, tag_name.trim())
}
})
.and_then(|tag| tag_dao.tag_file(&span_context, &body.file_name, tag.id))
.map(|_| {
span_context.span().set_status(Status::Ok);
HttpResponse::Ok()
})
.into_http_internal_err()
}
async fn get_tags<D: TagDao>(
_: Claims,
http_request: HttpRequest,
request: web::Query<ThumbnailRequest>,
tag_dao: web::Data<Mutex<D>>,
) -> impl Responder {
let context = extract_context_from_request(&http_request);
let span = global_tracer().start_with_context("get_tags", &context);
let span_context = opentelemetry::Context::current_with_span(span);
let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao");
tag_dao
.get_tags_for_path(&span_context, &request.path)
.map(|tags| {
span_context.span().set_status(Status::Ok);
HttpResponse::Ok().json(tags)
})
.into_http_internal_err()
}
async fn get_all_tags<D: TagDao>(
_: Claims,
tag_dao: web::Data<Mutex<D>>,
request: HttpRequest,
query: web::Query<GetTagsRequest>,
) -> impl Responder {
let context = extract_context_from_request(&request);
let span = global_tracer().start_with_context("get_all_tags", &context);
let span_context = opentelemetry::Context::current_with_span(span);
let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao");
tag_dao
.get_all_tags(&span_context, query.path.clone())
.map(|tags| {
span_context.span().set_status(Status::Ok);
HttpResponse::Ok().json(
tags.iter()
.map(|(tag_count, tag)| TagWithTagCount {
tag: tag.clone(),
tag_count: *tag_count,
})
.collect::<Vec<TagWithTagCount>>(),
)
})
.into_http_internal_err()
}
async fn remove_tagged_photo<D: TagDao>(
_: Claims,
http_request: HttpRequest,
request: web::Json<AddTagRequest>,
tag_dao: web::Data<Mutex<D>>,
) -> impl Responder {
let context = extract_context_from_request(&http_request);
let span = global_tracer().start_with_context("remove_tagged_photo", &context);
let span_context = opentelemetry::Context::current_with_span(span);
let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao");
tag_dao
.remove_tag(&span_context, &request.tag_name, &request.file_name)
.map(|result| {
span_context.span().set_status(Status::Ok);
if result.is_some() {
HttpResponse::Ok()
} else {
HttpResponse::NotFound()
}
})
.into_http_internal_err()
}
async fn update_tags<D: TagDao>(
_: Claims,
tag_dao: web::Data<Mutex<D>>,
http_request: HttpRequest,
request: web::Json<AddTagsRequest>,
) -> impl Responder {
let mut dao = tag_dao.lock().expect("Unable to get TagDao");
let context = extract_context_from_request(&http_request);
let span = global_tracer().start_with_context("update_tags", &context);
let span_context = opentelemetry::Context::current_with_span(span);
dao.get_tags_for_path(&span_context, &request.file_name)
.and_then(|existing_tags| {
dao.get_all_tags(&span_context, None)
.map(|all| (existing_tags, all))
})
.map(|(existing_tags, all_tags)| {
let tags_to_remove = existing_tags
.iter()
.filter(|&t| !request.tag_ids.contains(&t.id))
.collect::<Vec<&Tag>>();
for tag in tags_to_remove {
info!(
"Removing tag {:?} from file: {:?}",
tag.name, request.file_name
);
dao.remove_tag(&span_context, &tag.name, &request.file_name)
.unwrap_or_else(|err| panic!("{:?} Unable to remove tag {:?}", err, &tag.name));
}
let new_tags = all_tags
.iter()
.filter(|(_, t)| !existing_tags.contains(t) && request.tag_ids.contains(&t.id))
.collect::<Vec<&(i64, Tag)>>();
for (_, new_tag) in new_tags {
info!(
"Adding tag {:?} to file: {:?}",
new_tag.name, request.file_name
);
dao.tag_file(&span_context, &request.file_name, new_tag.id)
.with_context(|| {
format!(
"Unable to tag file {:?} with tag: {:?}",
request.file_name, new_tag.name
)
})
.unwrap();
}
span_context.span().set_status(Status::Ok);
HttpResponse::Ok()
})
.into_http_internal_err()
}
#[derive(Serialize, Queryable, Clone, Debug, PartialEq)]
pub struct Tag {
pub id: i32,
pub name: String,
pub created_time: i64,
}
#[derive(Serialize, Debug)]
pub struct TagWithTagCount {
pub tag_count: i64,
pub tag: Tag,
}
#[derive(Insertable, Clone, Debug)]
#[diesel(table_name = tags)]
pub struct InsertTag {
pub name: String,
pub created_time: i64,
}
#[derive(Insertable, Clone, Debug)]
#[diesel(table_name = tagged_photo)]
pub struct InsertTaggedPhoto {
pub tag_id: i32,
pub photo_name: String,
pub created_time: i64,
}
#[derive(Queryable, Clone, Debug)]
pub struct TaggedPhoto {
pub id: i32,
pub photo_name: String,
pub tag_id: i32,
pub created_time: i64,
}
#[derive(Debug, Deserialize)]
pub struct AddTagsRequest {
pub file_name: String,
pub tag_ids: Vec<i32>,
}
pub trait TagDao {
fn get_all_tags(
&mut self,
context: &opentelemetry::Context,
path: Option<String>,
) -> anyhow::Result<Vec<(i64, Tag)>>;
fn get_tags_for_path(
&mut self,
context: &opentelemetry::Context,
path: &str,
) -> anyhow::Result<Vec<Tag>>;
fn create_tag(&mut self, context: &opentelemetry::Context, name: &str) -> anyhow::Result<Tag>;
fn remove_tag(
&mut self,
context: &opentelemetry::Context,
tag_name: &str,
path: &str,
) -> anyhow::Result<Option<()>>;
fn tag_file(
&mut self,
context: &opentelemetry::Context,
path: &str,
tag_id: i32,
) -> anyhow::Result<TaggedPhoto>;
fn get_files_with_all_tag_ids(
&mut self,
tag_ids: Vec<i32>,
exclude_tag_ids: Vec<i32>,
context: &opentelemetry::Context,
) -> anyhow::Result<Vec<FileWithTagCount>>;
fn get_files_with_any_tag_ids(
&mut self,
tag_ids: Vec<i32>,
exclude_tag_ids: Vec<i32>,
context: &opentelemetry::Context,
) -> anyhow::Result<Vec<FileWithTagCount>>;
}
pub struct SqliteTagDao {
connection: SqliteConnection,
}
impl SqliteTagDao {
pub(crate) fn new(connection: SqliteConnection) -> Self {
SqliteTagDao { connection }
}
}
impl Default for SqliteTagDao {
fn default() -> Self {
SqliteTagDao::new(connect())
}
}
impl TagDao for SqliteTagDao {
fn get_all_tags(
&mut self,
context: &opentelemetry::Context,
path: Option<String>,
) -> anyhow::Result<Vec<(i64, Tag)>> {
// select name, count(*) from tags join tagged_photo ON tags.id = tagged_photo.tag_id GROUP BY tags.name ORDER BY COUNT(*);
trace_db_call(context, "query", "get_all_tags", |span| {
span.set_attribute(KeyValue::new("path", path.clone().unwrap_or_default()));
let path = path.map(|p| p + "%").unwrap_or("%".to_string());
let (id, name, created_time) = tags::all_columns;
tags::table
.inner_join(tagged_photo::table)
.group_by(tags::id)
.select((count_star(), id, name, created_time))
.filter(tagged_photo::photo_name.like(path))
.get_results(&mut self.connection)
.map::<Vec<(i64, Tag)>, _>(|tags_with_count: Vec<(i64, i32, String, i64)>| {
tags_with_count
.iter()
.map(|tup| {
(
tup.0,
Tag {
id: tup.1,
name: tup.2.clone(),
created_time: tup.3,
},
)
})
.collect()
})
.with_context(|| "Unable to get all tags")
})
}
fn get_tags_for_path(
&mut self,
context: &opentelemetry::Context,
path: &str,
) -> anyhow::Result<Vec<Tag>> {
trace_db_call(context, "query", "get_tags_for_path", |span| {
span.set_attribute(KeyValue::new("path", path.to_string()));
debug!("Getting Tags for path: {:?}", path);
tags::table
.left_join(tagged_photo::table)
.filter(tagged_photo::photo_name.eq(&path))
.select((tags::id, tags::name, tags::created_time))
.get_results::<Tag>(self.connection.borrow_mut())
.with_context(|| "Unable to get tags from Sqlite")
})
}
fn create_tag(&mut self, context: &opentelemetry::Context, name: &str) -> anyhow::Result<Tag> {
trace_db_call(context, "insert", "create_tag", |span| {
span.set_attribute(KeyValue::new("name", name.to_string()));
diesel::insert_into(tags::table)
.values(InsertTag {
name: name.to_string(),
created_time: Utc::now().timestamp(),
})
.execute(&mut self.connection)
.with_context(|| format!("Unable to insert tag {:?} in Sqlite", name))
.and_then(|_| {
info!("Inserted tag: {:?}", name);
define_sql_function! {
fn last_insert_rowid() -> Integer;
}
diesel::select(last_insert_rowid())
.get_result::<i32>(&mut self.connection)
.with_context(|| "Unable to get last inserted tag from Sqlite")
})
.and_then(|id| {
debug!("Got id: {:?} for inserted tag: {:?}", id, name);
tags::table
.filter(tags::id.eq(id))
.select((tags::id, tags::name, tags::created_time))
.get_result::<Tag>(self.connection.borrow_mut())
.with_context(|| {
format!("Unable to get tagged photo with id: {:?} from Sqlite", id)
})
})
})
}
fn remove_tag(
&mut self,
context: &opentelemetry::Context,
tag_name: &str,
path: &str,
) -> anyhow::Result<Option<()>> {
trace_db_call(context, "delete", "remove_tag", |span| {
span.set_attributes(vec![
KeyValue::new("tag_name", tag_name.to_string()),
KeyValue::new("path", path.to_string()),
]);
tags::table
.filter(tags::name.eq(tag_name))
.get_result::<Tag>(self.connection.borrow_mut())
.optional()
.with_context(|| format!("Unable to get tag '{}'", tag_name))
.and_then(|tag| {
if let Some(tag) = tag {
diesel::delete(
tagged_photo::table
.filter(tagged_photo::tag_id.eq(tag.id))
.filter(tagged_photo::photo_name.eq(path)),
)
.execute(&mut self.connection)
.with_context(|| format!("Unable to delete tag: '{}'", &tag.name))
.map(|_| Some(()))
} else {
info!("No tag found with name '{}'", tag_name);
Ok(None)
}
})
})
}
fn tag_file(
&mut self,
context: &opentelemetry::Context,
path: &str,
tag_id: i32,
) -> anyhow::Result<TaggedPhoto> {
trace_db_call(context, "insert", "tag_file", |span| {
span.set_attributes(vec![
KeyValue::new("path", path.to_string()),
KeyValue::new("tag_id", tag_id.to_string()),
]);
diesel::insert_into(tagged_photo::table)
.values(InsertTaggedPhoto {
tag_id,
photo_name: path.to_string(),
created_time: Utc::now().timestamp(),
})
.execute(self.connection.borrow_mut())
.with_context(|| format!("Unable to tag file {:?} in sqlite", path))
.and_then(|_| {
info!("Inserted tagged photo: {:#} -> {:?}", tag_id, path);
define_sql_function! {
fn last_insert_rowid() -> diesel::sql_types::Integer;
}
diesel::select(last_insert_rowid())
.get_result::<i32>(&mut self.connection)
.with_context(|| "Unable to get last inserted tag from Sqlite")
})
.and_then(|tagged_id| {
tagged_photo::table
.find(tagged_id)
.first(self.connection.borrow_mut())
.with_context(|| {
format!(
"Error getting inserted tagged photo with id: {:?}",
tagged_id
)
})
})
})
}
fn get_files_with_all_tag_ids(
&mut self,
tag_ids: Vec<i32>,
exclude_tag_ids: Vec<i32>,
context: &opentelemetry::Context,
) -> anyhow::Result<Vec<FileWithTagCount>> {
trace_db_call(context, "query", "get_files_with_all_tags", |_| {
use diesel::dsl::*;
let exclude_subquery = tagged_photo::table
.filter(tagged_photo::tag_id.eq_any(exclude_tag_ids.clone()))
.select(tagged_photo::photo_name)
.into_boxed();
tagged_photo::table
.filter(tagged_photo::tag_id.eq_any(tag_ids.clone()))
.filter(tagged_photo::photo_name.ne_all(exclude_subquery))
.group_by(tagged_photo::photo_name)
.select((
tagged_photo::photo_name,
count_distinct(tagged_photo::tag_id),
))
.having(count_distinct(tagged_photo::tag_id).ge(tag_ids.len() as i64))
.get_results::<(String, i64)>(&mut self.connection)
.map(|results| {
results
.into_iter()
.map(|(file_name, tag_count)| FileWithTagCount {
file_name,
tag_count,
})
.collect()
})
.with_context(|| format!("Unable to get Tagged photos with ids: {:?}", tag_ids))
})
}
fn get_files_with_any_tag_ids(
&mut self,
tag_ids: Vec<i32>,
exclude_tag_ids: Vec<i32>,
context: &opentelemetry::Context,
) -> anyhow::Result<Vec<FileWithTagCount>> {
trace_db_call(context, "query", "get_files_with_any_tags", |_| {
use diesel::dsl::*;
// Create the placeholders for the IN clauses
let tag_placeholders = std::iter::repeat_n("?", tag_ids.len())
.collect::<Vec<_>>()
.join(",");
let exclude_placeholders = std::iter::repeat_n("?", exclude_tag_ids.len())
.collect::<Vec<_>>()
.join(",");
let query = sql_query(format!(
r#"
WITH filtered_photos AS (
SELECT DISTINCT photo_name
FROM tagged_photo tp
WHERE tp.tag_id IN ({})
AND tp.photo_name NOT IN (
SELECT photo_name
FROM tagged_photo
WHERE tag_id IN ({})
)
)
SELECT
fp.photo_name as file_name,
COUNT(DISTINCT tp2.tag_id) as tag_count
FROM filtered_photos fp
JOIN tagged_photo tp2 ON fp.photo_name = tp2.photo_name
GROUP BY fp.photo_name"#,
tag_placeholders, exclude_placeholders
))
.into_boxed();
// Bind all parameters
let query = tag_ids
.into_iter()
.fold(query, |q, id| q.bind::<Integer, _>(id));
let query = exclude_tag_ids
.into_iter()
.fold(query, |q, id| q.bind::<Integer, _>(id));
query
.load::<FileWithTagCount>(&mut self.connection)
.with_context(|| "Unable to get tagged photos")
})
}
}
#[cfg(test)]
mod tests {
use actix_web::test::TestRequest;
use actix_web::web::Data;
use std::{cell::RefCell, collections::HashMap};
use diesel::result::Error::NotFound;
use log::warn;
use super::*;
struct TestTagDao {
tags: RefCell<Vec<Tag>>,
tagged_photos: RefCell<HashMap<String, Vec<Tag>>>,
tag_count: i32,
}
impl TestTagDao {
fn new() -> Self {
Self {
tags: RefCell::new(vec![]),
tagged_photos: RefCell::new(HashMap::new()),
tag_count: 0,
}
}
}
impl TagDao for TestTagDao {
fn get_all_tags(
&mut self,
_context: &opentelemetry::Context,
_option: Option<String>,
) -> anyhow::Result<Vec<(i64, Tag)>> {
Ok(self
.tags
.borrow()
.iter()
.map(|t| (1, t.clone()))
.collect::<Vec<(i64, Tag)>>()
.clone())
}
fn get_tags_for_path(
&mut self,
_context: &opentelemetry::Context,
path: &str,
) -> anyhow::Result<Vec<Tag>> {
info!("Getting test tags for: {:?}", path);
warn!("Tags for path: {:?}", self.tagged_photos);
Ok(self
.tagged_photos
.borrow()
.get(path)
.unwrap_or(&vec![])
.clone())
}
fn create_tag(
&mut self,
_context: &opentelemetry::Context,
name: &str,
) -> anyhow::Result<Tag> {
self.tag_count += 1;
let tag_id = self.tag_count;
let tag = Tag {
id: tag_id,
name: name.to_string(),
created_time: Utc::now().timestamp(),
};
self.tags.borrow_mut().push(tag.clone());
debug!("Created tag: {:?}", tag);
Ok(tag)
}
fn remove_tag(
&mut self,
_context: &opentelemetry::Context,
tag_name: &str,
path: &str,
) -> anyhow::Result<Option<()>> {
let mut clone = {
let photo_tags = &self.tagged_photos.borrow()[path];
photo_tags.clone()
};
clone.retain(|t| t.name != tag_name);
self.tagged_photos
.borrow_mut()
.insert(path.to_string(), clone);
let index = self.tags.borrow().iter().position(|t| t.name == tag_name);
if let Some(index) = index {
self.tags.borrow_mut().remove(index);
Ok(Some(()))
} else {
Ok(None)
}
}
fn tag_file(
&mut self,
_context: &opentelemetry::Context,
path: &str,
tag_id: i32,
) -> anyhow::Result<TaggedPhoto> {
debug!("Tagging file: {:?} with tag_id: {:?}", path, tag_id);
if let Some(tag) = self.tags.borrow().iter().find(|t| t.id == tag_id) {
debug!("Found tag: {:?}", tag);
let tagged_photo = TaggedPhoto {
id: self.tagged_photos.borrow().len() as i32,
tag_id: tag.id,
created_time: Utc::now().timestamp(),
photo_name: path.to_string(),
};
if self.tagged_photos.borrow().contains_key(path) {
let mut photo_tags = self.tagged_photos.borrow()[path].clone();
photo_tags.push(tag.clone());
self.tagged_photos
.borrow_mut()
.insert(path.to_string(), photo_tags);
} else {
//TODO: Add to existing tags (? huh)
self.tagged_photos
.borrow_mut()
.insert(path.to_string(), vec![tag.clone()]);
}
Ok(tagged_photo)
} else {
Err(NotFound.into())
}
}
fn get_files_with_all_tag_ids(
&mut self,
tag_ids: Vec<i32>,
exclude_tag_ids: Vec<i32>,
_context: &opentelemetry::Context,
) -> anyhow::Result<Vec<FileWithTagCount>> {
todo!()
}
fn get_files_with_any_tag_ids(
&mut self,
tag_ids: Vec<i32>,
exclude_tag_ids: Vec<i32>,
_context: &opentelemetry::Context,
) -> anyhow::Result<Vec<FileWithTagCount>> {
todo!()
}
}
#[actix_rt::test]
async fn add_new_tag_test() {
let tag_dao = TestTagDao::new();
let claims = Claims::valid_user(String::from("1"));
let body = AddTagRequest {
file_name: String::from("test.png"),
tag_name: String::from("test-tag"),
};
let tag_data = Data::new(Mutex::new(tag_dao));
let request = TestRequest::default().to_http_request();
add_tag(claims, request, web::Json(body), tag_data.clone()).await;
let mut tag_dao = tag_data.lock().unwrap();
let tags = tag_dao
.get_all_tags(&opentelemetry::Context::current(), None)
.unwrap();
assert_eq!(tags.len(), 1);
assert_eq!(tags.first().unwrap().1.name, "test-tag");
let tagged_photos = tag_dao.tagged_photos.borrow();
assert_eq!(tagged_photos["test.png"].len(), 1)
}
#[actix_rt::test]
async fn remove_tag_test() {
let tag_dao = TestTagDao::new();
let claims = Claims::valid_user(String::from("1"));
let add_request = AddTagRequest {
file_name: String::from("test.png"),
tag_name: String::from("test-tag"),
};
let remove_request = AddTagRequest {
file_name: String::from("test.png"),
tag_name: String::from("test-tag"),
};
let tag_data = Data::new(Mutex::new(tag_dao));
let request = TestRequest::default().to_http_request();
add_tag(
claims.clone(),
request.clone(),
web::Json(add_request),
tag_data.clone(),
)
.await;
remove_tagged_photo(claims, request, web::Json(remove_request), tag_data.clone()).await;
let mut tag_dao = tag_data.lock().unwrap();
let tags = tag_dao
.get_all_tags(&opentelemetry::Context::current(), None)
.unwrap();
assert!(tags.is_empty());
let tagged_photos = tag_dao.tagged_photos.borrow();
let previously_added_tagged_photo = tagged_photos.get("test.png").unwrap();
assert_eq!(previously_added_tagged_photo.len(), 0)
}
#[actix_rt::test]
async fn replace_tags_keeps_existing_tags_removes_extras_adds_missing_test() {
let mut tag_dao = TestTagDao::new();
let new_tag = tag_dao
.create_tag(&opentelemetry::Context::current(), "Test")
.unwrap();
let new_tag2 = tag_dao
.create_tag(&opentelemetry::Context::current(), "Test2")
.unwrap();
let _ = tag_dao
.create_tag(&opentelemetry::Context::current(), "Test3")
.unwrap();
tag_dao
.tag_file(&opentelemetry::Context::current(), "test.jpg", new_tag.id)
.unwrap();
tag_dao
.tag_file(&opentelemetry::Context::current(), "test.jpg", new_tag2.id)
.unwrap();
let claims = Claims::valid_user(String::from("1"));
let tag_data = Data::new(Mutex::new(tag_dao));
let add_tags_request = AddTagsRequest {
tag_ids: vec![1, 3],
file_name: String::from("test.jpg"),
};
let request = TestRequest::default().to_http_request();
update_tags(
claims,
tag_data.clone(),
request,
web::Json(add_tags_request),
)
.await;
let tag_dao = tag_data.lock().unwrap();
let tags_for_test_photo = &tag_dao.tagged_photos.borrow()["test.jpg"];
assert_eq!(tags_for_test_photo.len(), 2);
// ID of 2 was removed and 3 was added
assert_eq!(
tags_for_test_photo.iter().find(|&t| t.name == "Test2"),
None
);
}
}
#[derive(QueryableByName, Debug, Clone)]
pub(crate) struct FileWithTagCount {
#[diesel(sql_type = Text)]
pub(crate) file_name: String,
#[diesel(sql_type = BigInt)]
pub(crate) tag_count: i64,
}

View File

@@ -1,9 +1,7 @@
use actix_web::{
HttpResponse,
body::{BoxBody, MessageBody},
};
use actix_web::dev::{Body, ResponseBody};
use serde::Deserialize;
use crate::database::{UserDao, models::User};
use crate::database::{models::User, UserDao};
use std::cell::RefCell;
use std::option::Option;
@@ -20,7 +18,7 @@ impl TestUserDao {
}
impl UserDao for TestUserDao {
fn create_user(&mut self, username: &str, password: &str) -> Option<User> {
fn create_user(&self, username: &str, password: &str) -> Option<User> {
let u = User {
id: (self.user_map.borrow().len() + 1) as i32,
username: username.to_string(),
@@ -32,7 +30,7 @@ impl UserDao for TestUserDao {
Some(u)
}
fn get_user(&mut self, user: &str, pass: &str) -> Option<User> {
fn get_user(&self, user: &str, pass: &str) -> Option<User> {
match self
.user_map
.borrow()
@@ -47,18 +45,42 @@ impl UserDao for TestUserDao {
}
}
fn user_exists(&mut self, user: &str) -> bool {
self.user_map.borrow().iter().any(|u| u.username == user)
fn user_exists(&self, user: &str) -> bool {
self.user_map
.borrow()
.iter()
.find(|&u| u.username == user)
.is_some()
}
}
pub trait BodyReader {
fn read_to_str(self) -> String;
fn read_to_str(&self) -> &str;
}
impl BodyReader for HttpResponse<BoxBody> {
fn read_to_str(self) -> String {
let body = self.into_body().try_into_bytes().unwrap();
std::str::from_utf8(&body).unwrap().to_string()
impl BodyReader for ResponseBody<Body> {
fn read_to_str(&self) -> &str {
match self {
ResponseBody::Body(Body::Bytes(ref b)) => std::str::from_utf8(b).unwrap(),
_ => panic!("Unknown response body"),
}
}
}
pub trait TypedBodyReader<'a, T>
where
T: Deserialize<'a>,
{
fn read_body(&'a self) -> T;
}
impl<'a, T: Deserialize<'a>> TypedBodyReader<'a, T> for ResponseBody<Body> {
fn read_body(&'a self) -> T {
match self {
ResponseBody::Body(Body::Bytes(ref b)) => {
serde_json::from_str(std::str::from_utf8(b).unwrap()).unwrap()
}
_ => panic!("Unknown response body"),
}
}
}

95
src/video.rs Normal file
View File

@@ -0,0 +1,95 @@
use std::io::Result;
use std::path::Path;
use std::process::{Child, Command, ExitStatus, Stdio};
use actix::prelude::*;
use log::{debug, trace};
// ffmpeg -i test.mp4 -c:v h264 -flags +cgop -g 30 -hls_time 3 out.m3u8
// ffmpeg -i "filename.mp4" -preset veryfast -c:v libx264 -f hls -hls_list_size 100 -hls_time 2 -crf 24 -vf scale=1080:-2,setsar=1:1 attempt/vid_out.m3u8
pub struct StreamActor;
impl Actor for StreamActor {
type Context = Context<Self>;
}
pub struct ProcessMessage(pub String, pub Child);
impl Message for ProcessMessage {
type Result = Result<ExitStatus>;
}
impl Handler<ProcessMessage> for StreamActor {
type Result = Result<ExitStatus>;
fn handle(&mut self, msg: ProcessMessage, _ctx: &mut Self::Context) -> Self::Result {
trace!("Message received");
let mut process = msg.1;
let result = process.wait();
debug!(
"Finished waiting for: {:?}. Code: {:?}",
msg.0,
result
.as_ref()
.map_or(-1, |status| status.code().unwrap_or(-1))
);
result
}
}
pub async fn create_playlist(video_path: &str, playlist_file: &str) -> Result<Child> {
if Path::new(playlist_file).exists() {
debug!("Playlist already exists: {}", playlist_file);
return Err(std::io::Error::from(std::io::ErrorKind::AlreadyExists));
}
let result = Command::new("ffmpeg")
.arg("-i")
.arg(video_path)
.arg("-c:v")
.arg("h264")
.arg("-crf")
.arg("21")
.arg("-preset")
.arg("veryfast")
.arg("-hls_time")
.arg("3")
.arg("-hls_list_size")
.arg("100")
.arg("-vf")
.arg("scale=1080:-2,setsar=1:1")
.arg(playlist_file)
.stdout(Stdio::null())
.stderr(Stdio::null())
.spawn();
let start_time = std::time::Instant::now();
loop {
actix::clock::delay_for(std::time::Duration::from_secs(1)).await;
if Path::new(playlist_file).exists()
|| std::time::Instant::now() - start_time > std::time::Duration::from_secs(5)
{
break;
}
}
result
}
pub fn generate_video_thumbnail(path: &Path, destination: &Path) {
Command::new("ffmpeg")
.arg("-ss")
.arg("3")
.arg("-i")
.arg(path.to_str().unwrap())
.arg("-vframes")
.arg("1")
.arg("-f")
.arg("image2")
.arg(destination)
.output()
.expect("Failure to create video frame");
}

View File

@@ -1,314 +0,0 @@
use crate::is_video;
use crate::otel::global_tracer;
use actix::prelude::*;
use futures::TryFutureExt;
use log::{debug, error, info, trace, warn};
use opentelemetry::KeyValue;
use opentelemetry::trace::{Span, Status, Tracer};
use std::io::Result;
use std::path::{Path, PathBuf};
use std::process::{Child, Command, ExitStatus, Stdio};
use std::sync::Arc;
use tokio::sync::Semaphore;
use walkdir::{DirEntry, WalkDir};
// ffmpeg -i test.mp4 -c:v h264 -flags +cgop -g 30 -hls_time 3 out.m3u8
// ffmpeg -i "filename.mp4" -preset veryfast -c:v libx264 -f hls -hls_list_size 100 -hls_time 2 -crf 24 -vf scale=1080:-2,setsar=1:1 attempt/vid_out.m3u8
pub struct StreamActor;
impl Actor for StreamActor {
type Context = Context<Self>;
}
pub struct ProcessMessage(pub String, pub Child);
impl Message for ProcessMessage {
type Result = Result<ExitStatus>;
}
impl Handler<ProcessMessage> for StreamActor {
type Result = Result<ExitStatus>;
fn handle(&mut self, msg: ProcessMessage, _ctx: &mut Self::Context) -> Self::Result {
trace!("Message received");
let mut process = msg.1;
let result = process.wait();
debug!(
"Finished waiting for: {:?}. Code: {:?}",
msg.0,
result
.as_ref()
.map_or(-1, |status| status.code().unwrap_or(-1))
);
result
}
}
pub async fn create_playlist(video_path: &str, playlist_file: &str) -> Result<Child> {
if Path::new(playlist_file).exists() {
debug!("Playlist already exists: {}", playlist_file);
return Err(std::io::Error::from(std::io::ErrorKind::AlreadyExists));
}
let result = Command::new("ffmpeg")
.arg("-i")
.arg(video_path)
.arg("-c:v")
.arg("h264")
.arg("-crf")
.arg("21")
.arg("-preset")
.arg("veryfast")
.arg("-hls_time")
.arg("3")
.arg("-hls_list_size")
.arg("100")
.arg("-vf")
.arg("scale=1080:-2,setsar=1:1")
.arg(playlist_file)
.stdout(Stdio::null())
.stderr(Stdio::null())
.spawn();
let start_time = std::time::Instant::now();
loop {
actix::clock::sleep(std::time::Duration::from_secs(1)).await;
if Path::new(playlist_file).exists()
|| std::time::Instant::now() - start_time > std::time::Duration::from_secs(5)
{
break;
}
}
result
}
pub fn generate_video_thumbnail(path: &Path, destination: &Path) {
Command::new("ffmpeg")
.arg("-ss")
.arg("3")
.arg("-i")
.arg(path.to_str().unwrap())
.arg("-vframes")
.arg("1")
.arg("-f")
.arg("image2")
.arg(destination)
.output()
.expect("Failure to create video frame");
}
pub struct VideoPlaylistManager {
playlist_dir: PathBuf,
playlist_generator: Addr<PlaylistGenerator>,
}
impl VideoPlaylistManager {
pub fn new<P: Into<PathBuf>>(
playlist_dir: P,
playlist_generator: Addr<PlaylistGenerator>,
) -> Self {
Self {
playlist_dir: playlist_dir.into(),
playlist_generator,
}
}
}
impl Actor for VideoPlaylistManager {
type Context = Context<Self>;
}
impl Handler<ScanDirectoryMessage> for VideoPlaylistManager {
type Result = ResponseFuture<()>;
fn handle(&mut self, msg: ScanDirectoryMessage, _ctx: &mut Self::Context) -> Self::Result {
let tracer = global_tracer();
let mut span = tracer.start("videoplaylistmanager.scan_directory");
let start = std::time::Instant::now();
info!(
"Starting scan directory for video playlist generation: {}",
msg.directory
);
let video_files = WalkDir::new(&msg.directory)
.into_iter()
.filter_map(|e| e.ok())
.filter(|e| e.file_type().is_file())
.filter(is_video)
.collect::<Vec<DirEntry>>();
let scan_dir_name = msg.directory.clone();
let playlist_output_dir = self.playlist_dir.clone();
let playlist_generator = self.playlist_generator.clone();
Box::pin(async move {
for e in video_files {
let path = e.path();
let path_as_str = path.to_str().unwrap();
debug!(
"Sending generate playlist message for path: {}",
path_as_str
);
match playlist_generator
.send(GeneratePlaylistMessage {
playlist_path: playlist_output_dir.to_str().unwrap().to_string(),
video_path: PathBuf::from(path),
})
.await
.expect("Failed to send generate playlist message")
{
Ok(_) => {
span.add_event(
"Playlist generated",
vec![KeyValue::new("video_path", path_as_str.to_string())],
);
debug!(
"Successfully generated playlist for file: '{}'",
path_as_str
);
}
Err(e) => {
warn!("Failed to generate playlist for path '{:?}'. {:?}", path, e);
}
}
}
span.add_event(
"Finished directory scan",
vec![KeyValue::new("directory", scan_dir_name.to_string())],
);
info!(
"Finished directory scan of '{}' in {:?}",
scan_dir_name,
start.elapsed()
);
})
}
}
#[derive(Message)]
#[rtype(result = "()")]
pub struct ScanDirectoryMessage {
pub(crate) directory: String,
}
#[derive(Message)]
#[rtype(result = "Result<()>")]
struct GeneratePlaylistMessage {
video_path: PathBuf,
playlist_path: String,
}
pub struct PlaylistGenerator {
semaphore: Arc<Semaphore>,
}
impl PlaylistGenerator {
pub(crate) fn new() -> Self {
PlaylistGenerator {
semaphore: Arc::new(Semaphore::new(2)),
}
}
}
impl Actor for PlaylistGenerator {
type Context = Context<Self>;
}
impl Handler<GeneratePlaylistMessage> for PlaylistGenerator {
type Result = ResponseFuture<Result<()>>;
fn handle(&mut self, msg: GeneratePlaylistMessage, _ctx: &mut Self::Context) -> Self::Result {
let video_file = msg.video_path.to_str().unwrap().to_owned();
let playlist_path = msg.playlist_path.as_str().to_owned();
let semaphore = self.semaphore.clone();
let playlist_file = format!(
"{}/{}.m3u8",
playlist_path,
msg.video_path.file_name().unwrap().to_str().unwrap()
);
let tracer = global_tracer();
let mut span = tracer
.span_builder("playlistgenerator.generate_playlist")
.with_attributes(vec![
KeyValue::new("video_file", video_file.clone()),
KeyValue::new("playlist_file", playlist_file.clone()),
])
.start(&tracer);
Box::pin(async move {
let wait_start = std::time::Instant::now();
let permit = semaphore
.acquire_owned()
.await
.expect("Unable to acquire semaphore");
debug!(
"Waited for {:?} before starting ffmpeg",
wait_start.elapsed()
);
span.add_event(
"Waited for FFMPEG semaphore",
vec![KeyValue::new(
"wait_time",
wait_start.elapsed().as_secs_f64(),
)],
);
if Path::new(&playlist_file).exists() {
debug!("Playlist already exists: {}", playlist_file);
span.set_status(Status::error(format!(
"Playlist already exists: {}",
playlist_file
)));
return Err(std::io::Error::from(std::io::ErrorKind::AlreadyExists));
}
tokio::spawn(async move {
let ffmpeg_result = tokio::process::Command::new("ffmpeg")
.arg("-i")
.arg(&video_file)
.arg("-c:v")
.arg("h264")
.arg("-crf")
.arg("21")
.arg("-preset")
.arg("veryfast")
.arg("-hls_time")
.arg("3")
.arg("-hls_list_size")
.arg("100")
.arg("-vf")
.arg("scale=1080:-2,setsar=1:1")
.arg(playlist_file)
.stdout(Stdio::null())
.stderr(Stdio::piped())
.output()
.inspect_err(|e| error!("Failed to run ffmpeg on child process: {}", e))
.map_err(|e| std::io::Error::other(e.to_string()))
.await;
// Hang on to the permit until we're done decoding and then explicitly drop
drop(permit);
if let Ok(ref res) = ffmpeg_result {
debug!("ffmpeg output: {:?}", res);
}
span.set_status(Status::Ok);
ffmpeg_result
});
Ok(())
})
}
}

View File

@@ -1,185 +0,0 @@
use futures::TryFutureExt;
use log::{debug, error, info, warn};
use std::io::Result;
use std::process::{Output, Stdio};
use std::time::Instant;
use tokio::process::Command;
pub struct Ffmpeg;
pub enum GifType {
Overview,
OverviewVideo { duration: u32 },
}
impl Ffmpeg {
async fn _generate_playlist(&self, input_file: &str, output_file: &str) -> Result<String> {
let ffmpeg_result: Result<Output> = Command::new("ffmpeg")
.arg("-i")
.arg(input_file)
.arg("-c:v")
.arg("h264")
.arg("-crf")
.arg("21")
.arg("-preset")
.arg("veryfast")
.arg("-hls_time")
.arg("3")
.arg("-hls_list_size")
.arg("100")
.arg("-vf")
.arg("scale=1080:-2,setsar=1:1")
.arg(output_file)
.stdout(Stdio::null())
.stderr(Stdio::piped())
.output()
.inspect_err(|e| error!("Failed to run ffmpeg on child process: {}", e))
.map_err(|e| std::io::Error::other(e.to_string()))
.await;
if let Ok(ref res) = ffmpeg_result {
debug!("ffmpeg output: {:?}", res);
}
ffmpeg_result.map(|_| output_file.to_string())
}
async fn get_video_duration(&self, input_file: &str) -> Result<u32> {
Command::new("ffprobe")
.args(["-i", input_file])
.args(["-show_entries", "format=duration"])
.args(["-v", "quiet"])
.args(["-of", "csv=p=0"])
.output()
.await
.map(|out| String::from_utf8_lossy(&out.stdout).trim().to_string())
.inspect(|duration| debug!("Found video duration: {:?}", duration))
.and_then(|duration| {
duration
.parse::<f32>()
.map(|duration| duration as u32)
.map_err(|e| std::io::Error::other(e.to_string()))
})
.inspect(|duration| debug!("Found video duration: {:?}", duration))
}
pub async fn generate_video_gif(
&self,
input_file: &str,
output_file: &str,
gif_type: GifType,
) -> Result<String> {
info!("Creating gif for: '{}'", input_file);
match gif_type {
GifType::Overview => {
let temp_dir = tempfile::tempdir()?;
let temp_path = temp_dir
.path()
.to_str()
.expect("Unable to make temp_dir a string");
match self
.get_video_duration(input_file)
.and_then(|duration| {
debug!("Creating gif frames for '{}'", input_file);
Command::new("ffmpeg")
.args(["-i", input_file])
.args(["-vf", &format!("fps=20/{}", duration)])
.args(["-q:v", "2"])
.stderr(Stdio::null())
.arg(format!("{}/frame_%03d.jpg", temp_path))
.status()
})
.and_then(|_| {
debug!("Generating palette");
Command::new("ffmpeg")
.args(["-i", &format!("{}/frame_%03d.jpg", temp_path)])
.args(["-vf", "palettegen"])
.arg(format!("{}/palette.png", temp_path))
.stderr(Stdio::null())
.status()
})
.and_then(|_| {
debug!("Creating gif for: '{}'", input_file);
self.create_gif_from_frames(temp_path, output_file)
})
.await
{
Ok(exit_code) => {
if exit_code == 0 {
info!("Created gif for '{}' -> '{}'", input_file, output_file);
} else {
warn!(
"Failed to create gif for '{}' with exit code: {}",
input_file, exit_code
);
}
}
Err(e) => {
error!("Error creating gif for '{}': {:?}", input_file, e);
}
}
}
GifType::OverviewVideo { duration } => {
let start = Instant::now();
match self
.get_video_duration(input_file)
.and_then(|input_duration| {
Command::new("ffmpeg")
.args(["-i", input_file])
.args([
"-vf",
// Grab 1 second of frames equally spaced to create a 'duration' second long video scaled to 720px on longest side
&format!(
"select='lt(mod(t,{}),1)',setpts=N/FRAME_RATE/TB,scale='if(gt(iw,ih),720,-2)':'if(gt(ih,iw),720,-2)",
input_duration / duration
),
])
.arg("-an")
.arg(output_file)
.status()
})
.await
{
Ok(out) => info!("Finished clip '{}' with code {:?} in {:?}", output_file, out.code(), start.elapsed()),
Err(e) => error!("Error creating video overview: {}", e),
}
}
}
Ok(output_file.to_string())
}
async fn create_gif_from_frames(&self, frame_base_dir: &str, output_file: &str) -> Result<i32> {
let output = Command::new("ffmpeg")
.arg("-y")
.args(["-framerate", "4"])
.args(["-i", &format!("{}/frame_%03d.jpg", frame_base_dir)])
.args(["-i", &format!("{}/palette.png", frame_base_dir)])
.args([
"-filter_complex",
// Scale to 480x480 with a center crop
"[0:v]scale=480:-1:flags=lanczos,crop='min(in_w,in_h)':'min(in_w,in_h)':(in_w-out_w)/2:(in_h-out_h)/2, paletteuse",
])
.args(["-loop", "0"]) // loop forever
.args(["-final_delay", "75"])
.arg(output_file)
.stderr(Stdio::piped()) // Change this to capture stderr
.stdout(Stdio::piped()) // Optionally capture stdout too
.output()
.await?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
error!("FFmpeg error: {}", stderr);
let stdout = String::from_utf8_lossy(&output.stdout);
debug!("FFmpeg stdout: {}", stdout);
} else {
debug!("FFmpeg successful with exit code: {}", output.status);
}
Ok(output.status.code().unwrap_or(-1))
}
}

View File

@@ -1,66 +0,0 @@
use crate::otel::global_tracer;
use crate::video::ffmpeg::{Ffmpeg, GifType};
use crate::{is_video, update_media_counts};
use log::info;
use opentelemetry::trace::Tracer;
use std::fs;
use std::path::{Path, PathBuf};
use walkdir::WalkDir;
pub mod actors;
pub mod ffmpeg;
pub async fn generate_video_gifs() {
tokio::spawn(async {
info!("Starting to make video gifs");
let start = std::time::Instant::now();
let tracer = global_tracer();
tracer.start("creating video gifs");
let gif_base_path = &dotenv::var("GIFS_DIRECTORY").unwrap_or(String::from("gifs"));
let gif_directory: &Path = Path::new(gif_base_path);
fs::create_dir_all(gif_base_path).expect("There was an issue creating directory");
let files = PathBuf::from(dotenv::var("BASE_PATH").unwrap());
let ffmpeg = Ffmpeg;
for file in WalkDir::new(&files)
.into_iter()
.filter_map(|entry| entry.ok())
.filter(|entry| entry.file_type().is_file())
.filter(is_video)
.filter(|entry| {
let path = entry.path();
let relative_path = &path.strip_prefix(&files).unwrap();
let thumb_path = Path::new(gif_directory).join(relative_path);
let gif_path = thumb_path.with_extension("gif");
!gif_path.exists()
})
{
let path = file.path();
let relative_path = &path.strip_prefix(&files).unwrap();
let gif_path = Path::new(gif_directory).join(relative_path);
let gif_path = gif_path.with_extension("gif");
if let Some(parent_dir) = gif_path.parent() {
fs::create_dir_all(parent_dir).unwrap_or_else(|_| {
panic!("There was an issue creating gif directory {:?}", gif_path)
});
}
info!("Generating gif for {:?}", path);
ffmpeg
.generate_video_gif(
path.to_str().unwrap(),
gif_path.to_str().unwrap(),
GifType::Overview,
)
.await
.expect("There was an issue generating the gif");
}
info!("Finished making video gifs in {:?}", start.elapsed());
update_media_counts(&files);
});
}