Files
ImageApi/src/main.rs
2025-12-01 13:04:55 -05:00

754 lines
25 KiB
Rust

#[macro_use]
extern crate diesel;
extern crate rayon;
use actix_web::web::Data;
use actix_web_prom::PrometheusMetricsBuilder;
use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations};
use futures::stream::StreamExt;
use lazy_static::lazy_static;
use prometheus::{self, IntGauge};
use std::error::Error;
use std::sync::Mutex;
use std::sync::mpsc::channel;
use std::{collections::HashMap, io::prelude::*};
use std::{env, fs::File};
use std::{
io::ErrorKind,
path::{Path, PathBuf},
};
use walkdir::{DirEntry, WalkDir};
use actix_files::NamedFile;
use actix_multipart as mp;
use actix_web::{
App, HttpRequest, HttpResponse, HttpServer, Responder, delete, get, middleware, post, put,
web::{self, BufMut, BytesMut},
};
use anyhow::Context;
use chrono::Utc;
use diesel::sqlite::Sqlite;
use notify::{Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use rayon::prelude::*;
use crate::auth::login;
use crate::data::*;
use crate::database::*;
use crate::files::{
RealFileSystem, RefreshThumbnailsMessage, is_image_or_video, is_valid_full_path, move_file,
};
use crate::otel::{extract_context_from_request, global_tracer};
use crate::service::ServiceBuilder;
use crate::state::AppState;
use crate::tags::*;
use crate::video::actors::{
ProcessMessage, ScanDirectoryMessage, create_playlist, generate_video_thumbnail,
};
use crate::video::generate_video_gifs;
use log::{debug, error, info, trace, warn};
use opentelemetry::trace::{Span, Status, TraceContextExt, Tracer};
use opentelemetry::{KeyValue, global};
mod auth;
mod data;
mod database;
mod error;
mod files;
mod state;
mod tags;
mod video;
mod memories;
mod otel;
mod service;
#[cfg(test)]
mod testhelpers;
// Process-wide Prometheus gauges. They are registered with the metrics
// registry in `main` and assigned in `update_media_counts`.
lazy_static! {
// Gauge exported as `imageserver_image_total`.
// The `unwrap` panics on first access if the gauge cannot be constructed.
static ref IMAGE_GAUGE: IntGauge = IntGauge::new(
"imageserver_image_total",
"Count of the images on the server"
)
.unwrap();
// Gauge exported as `imageserver_video_total`.
// The `unwrap` panics on first access if the gauge cannot be constructed.
static ref VIDEO_GAUGE: IntGauge = IntGauge::new(
"imageserver_video_total",
"Count of the videos on the server"
)
.unwrap();
}
/// Diesel migrations produced by `embed_migrations!()`; `run_migrations`
/// passes this constant to `run_pending_migrations`.
pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!();
/// `GET /image` — serves the requested file, or its thumbnail when
/// `size` is `Thumb`.
///
/// For thumbnail requests the thumbnail directory is tried first (or the GIF
/// directory when a GIF is requested for a video); if that file cannot be
/// opened the handler falls back to the original file. Responds `400` when the
/// path fails `is_valid_full_path` and `404` when nothing could be opened.
#[get("/image")]
async fn get_image(
    _claims: Claims,
    request: HttpRequest,
    req: web::Query<ThumbnailRequest>,
    app_state: Data<AppState>,
) -> impl Responder {
    let tracer = global_tracer();
    let context = extract_context_from_request(&request);
    let mut span = tracer.start_with_context("get_image", &context);

    // Guard clause: reject anything that does not validate against the base path.
    let path = match is_valid_full_path(&app_state.base_path, &req.path, false) {
        Some(valid) => valid,
        None => {
            span.set_status(Status::error("Bad photos request"));
            error!("Bad photos request: {}", req.path);
            return HttpResponse::BadRequest().finish();
        }
    };

    // A missing size means "full", so only an explicit `Thumb` takes this branch.
    if matches!(req.size, Some(PhotoSize::Thumb)) {
        let relative_path = path
            .strip_prefix(&app_state.base_path)
            .expect("Error stripping base path prefix from thumbnail");

        // If it's a video and GIF format is requested, look in the GIF directory instead.
        let wants_gif = req.format == Some(ThumbnailFormat::Gif) && is_video_file(&path);
        let thumb_path = if wants_gif {
            Path::new(&app_state.gif_path)
                .join(relative_path)
                .with_extension("gif")
        } else {
            Path::new(&app_state.thumbnail_path).join(relative_path)
        };

        trace!("Thumbnail path: {:?}", thumb_path);
        if let Ok(file) = NamedFile::open(&thumb_path) {
            span.set_status(Status::Ok);
            // The NamedFile will automatically set the correct content-type
            return file.into_response(&request);
        }
    }

    // Full-size request, or the thumbnail was not available: serve the original.
    match NamedFile::open(&path) {
        Ok(file) => {
            span.set_status(Status::Ok);
            file.into_response(&request)
        }
        Err(_) => {
            span.set_status(Status::error("Not found"));
            HttpResponse::NotFound().finish()
        }
    }
}
/// Returns `true` when `path` has one of the video extensions
/// `mp4`, `mov`, `avi` or `mkv`, compared case-insensitively.
///
/// Paths without an extension, or whose extension is not valid UTF-8,
/// yield `false`.
fn is_video_file(path: &Path) -> bool {
    const VIDEO_EXTENSIONS: [&str; 4] = ["mp4", "mov", "avi", "mkv"];

    path.extension()
        .map(|raw| raw.to_str().unwrap_or("").to_lowercase())
        .map(|lowered| VIDEO_EXTENSIONS.contains(&lowered.as_str()))
        .unwrap_or(false)
}
/// `GET /image/metadata` — returns filesystem metadata for the requested file
/// as JSON.
///
/// Any failure (path rejected by `is_valid_full_path`, file cannot be opened,
/// metadata cannot be read) is logged and answered with `500`.
#[get("/image/metadata")]
async fn get_file_metadata(
    _: Claims,
    request: HttpRequest,
    path: web::Query<ThumbnailRequest>,
    app_state: Data<AppState>,
) -> impl Responder {
    let tracer = global_tracer();
    let context = extract_context_from_request(&request);
    let mut span = tracer.start_with_context("get_file_metadata", &context);

    // An invalid path is reported as an `InvalidData` I/O error so that it
    // flows through the same error arm as open/metadata failures.
    let lookup = match is_valid_full_path(&app_state.base_path, &path.path, false) {
        Some(full_path) => File::open(full_path).and_then(|file| file.metadata()),
        None => Err(std::io::Error::from(ErrorKind::InvalidData)),
    };

    match lookup {
        Err(e) => {
            let message = format!("Error getting metadata for file '{}': {:?}", path.path, e);
            error!("{}", message);
            span.set_status(Status::error(message));
            HttpResponse::InternalServerError().finish()
        }
        Ok(metadata) => {
            let response: MetadataResponse = metadata.into();
            span.add_event(
                "Metadata fetched",
                vec![KeyValue::new("file", path.path.clone())],
            );
            span.set_status(Status::Ok);
            HttpResponse::Ok().json(response)
        }
    }
}
#[post("/image")]
async fn upload_image(
_: Claims,
request: HttpRequest,
mut payload: mp::Multipart,
app_state: Data<AppState>,
) -> impl Responder {
let tracer = global_tracer();
let context = extract_context_from_request(&request);
let mut span = tracer.start_with_context("upload_image", &context);
let mut file_content: BytesMut = BytesMut::new();
let mut file_name: Option<String> = None;
let mut file_path: Option<String> = None;
while let Some(Ok(mut part)) = payload.next().await {
if let Some(content_type) = part.content_disposition() {
debug!("{:?}", content_type);
if let Some(filename) = content_type.get_filename() {
debug!("Name: {:?}", filename);
file_name = Some(filename.to_string());
while let Some(Ok(data)) = part.next().await {
file_content.put(data);
}
} else if content_type.get_name() == Some("path") {
while let Some(Ok(data)) = part.next().await {
if let Ok(path) = std::str::from_utf8(&data) {
file_path = Some(path.to_string())
}
}
}
}
}
let path = file_path.unwrap_or_else(|| app_state.base_path.clone());
if !file_content.is_empty() {
let full_path = PathBuf::from(&path).join(file_name.unwrap());
if let Some(full_path) = is_valid_full_path(
&app_state.base_path,
&full_path.to_str().unwrap().to_string(),
true,
) {
let context =
opentelemetry::Context::new().with_remote_span_context(span.span_context().clone());
tracer
.span_builder("file write")
.start_with_context(&tracer, &context);
if !full_path.is_file() && is_image_or_video(&full_path) {
let mut file = File::create(&full_path).unwrap();
file.write_all(&file_content).unwrap();
info!("Uploaded: {:?}", full_path);
} else {
warn!("File already exists: {:?}", full_path);
let new_path = format!(
"{}/{}_{}.{}",
full_path.parent().unwrap().to_str().unwrap(),
full_path.file_stem().unwrap().to_str().unwrap(),
Utc::now().timestamp(),
full_path
.extension()
.expect("Uploaded file should have an extension")
.to_str()
.unwrap()
);
info!("Uploaded: {}", new_path);
let mut file = File::create(new_path).unwrap();
file.write_all(&file_content).unwrap();
}
} else {
error!("Invalid path for upload: {:?}", full_path);
span.set_status(Status::error("Invalid path for upload"));
return HttpResponse::BadRequest().body("Path was not valid");
}
} else {
span.set_status(Status::error("No file body read"));
return HttpResponse::BadRequest().body("No file body read");
}
app_state.stream_manager.do_send(RefreshThumbnailsMessage);
span.set_status(Status::Ok);
HttpResponse::Ok().finish()
}
/// `POST /video/generate` — starts playlist generation for the video named in
/// the request body and returns the playlist location as JSON.
///
/// The playlist is named `<video_path>/<file name>.m3u8`. When
/// `create_playlist` succeeds its result is handed to the stream manager via
/// a `ProcessMessage`; when it fails the handler still answers `200` with the
/// playlist name. Responds `400` if the body path has no file name or fails
/// `is_valid_full_path`.
#[post("/video/generate")]
async fn generate_video(
    _claims: Claims,
    request: HttpRequest,
    app_state: Data<AppState>,
    body: web::Json<ThumbnailRequest>,
) -> impl Responder {
    let tracer = global_tracer();
    let context = extract_context_from_request(&request);
    let mut span = tracer.start_with_context("generate_video", &context);

    let source = PathBuf::from(&body.path);
    let filename = match source.file_name() {
        Some(name) => name.to_str().expect("Filename should convert to string"),
        None => {
            let message = format!("Unable to get file name: {:?}", source);
            error!("{}", message);
            span.set_status(Status::error(message));
            return HttpResponse::BadRequest().finish();
        }
    };
    let playlist = format!("{}/{}.m3u8", app_state.video_path, filename);

    let path = match is_valid_full_path(&app_state.base_path, &body.path, false) {
        Some(valid) => valid,
        None => {
            span.set_status(Status::error(format!("invalid path {:?}", &body.path)));
            return HttpResponse::BadRequest().finish();
        }
    };

    // A failed `create_playlist` is deliberately not treated as an error here.
    if let Ok(child) = create_playlist(path.to_str().unwrap(), &playlist).await {
        span.add_event(
            "playlist_created".to_string(),
            vec![KeyValue::new("playlist-name", filename.to_string())],
        );
        span.set_status(Status::Ok);
        app_state
            .stream_manager
            .do_send(ProcessMessage(playlist.clone(), child));
    }

    HttpResponse::Ok().json(playlist)
}
/// `GET /video/stream` — serves the playlist file named by the `path` query
/// parameter.
///
/// A playlist is accepted only when it either
/// * lies inside the configured video directory (component-wise prefix match,
///   with no `..` components), or
/// * validates against the base path via `is_valid_full_path`, in which case
///   the validated path is the one that is opened.
///
/// Anything else is answered with `400`; an accepted path that cannot be
/// opened is answered with `404`.
#[get("/video/stream")]
async fn stream_video(
    request: HttpRequest,
    _: Claims,
    path: web::Query<ThumbnailRequest>,
    app_state: Data<AppState>,
) -> impl Responder {
    let tracer = global::tracer("image-server");
    let context = extract_context_from_request(&request);
    let mut span = tracer.start_with_context("stream_video", &context);

    let playlist = &path.path;
    debug!("Playlist: {}", playlist);

    // TODO: Extract video playlist dir to dotenv
    let playlist_path = Path::new(playlist);
    // A plain string prefix check would accept `<video_path>/../../secret`,
    // so refuse parent-directory components and compare whole components.
    let escapes_upward = playlist_path
        .components()
        .any(|component| matches!(component, std::path::Component::ParentDir));
    let in_video_dir = !escapes_upward && playlist_path.starts_with(&app_state.video_path);

    // The previous condition rejected a request only when the path was outside
    // the video directory AND valid under the base path, which let arbitrary
    // paths through. A path must now satisfy one of the two checks.
    let resolved = if in_video_dir {
        Some(playlist_path.to_path_buf())
    } else {
        is_valid_full_path(&app_state.base_path, playlist, false)
    };

    match resolved {
        None => {
            span.set_status(Status::error(format!("playlist not valid {}", playlist)));
            HttpResponse::BadRequest().finish()
        }
        Some(file_path) => match NamedFile::open(&file_path) {
            Ok(file) => {
                span.set_status(Status::Ok);
                file.into_response(&request)
            }
            _ => {
                span.set_status(Status::error(format!("playlist not found {}", playlist)));
                HttpResponse::NotFound().finish()
            }
        },
    }
}
#[get("/video/{path}")]
async fn get_video_part(
request: HttpRequest,
_: Claims,
path: web::Path<ThumbnailRequest>,
app_state: Data<AppState>,
) -> impl Responder {
let tracer = global_tracer();
let context = extract_context_from_request(&request);
let mut span = tracer.start_with_context("get_video_part", &context);
let part = &path.path;
debug!("Video part: {}", part);
let mut file_part = PathBuf::new();
file_part.push(app_state.video_path.clone());
file_part.push(part);
// TODO: Do we need to guard against directory attacks here?
match NamedFile::open(&file_part) {
Ok(file) => {
span.set_status(Status::Ok);
file.into_response(&request)
}
_ => {
error!("Video part not found: {:?}", file_part);
span.set_status(Status::error(format!(
"Video part not found '{}'",
file_part.to_str().unwrap()
)));
HttpResponse::NotFound().finish()
}
}
}
#[get("image/favorites")]
async fn favorites(
claims: Claims,
request: HttpRequest,
favorites_dao: Data<Mutex<Box<dyn FavoriteDao>>>,
) -> impl Responder {
let tracer = global_tracer();
let context = extract_context_from_request(&request);
let mut span = tracer.start_with_context("get favorites", &context);
match web::block(move || {
favorites_dao
.lock()
.expect("Unable to get FavoritesDao")
.get_favorites(claims.sub.parse::<i32>().unwrap())
})
.await
{
Ok(Ok(favorites)) => {
let favorites = favorites
.into_iter()
.map(|favorite| favorite.path)
.collect::<Vec<String>>();
span.set_status(Status::Ok);
HttpResponse::Ok().json(PhotosResponse {
photos: favorites,
dirs: Vec::new(),
})
}
Ok(Err(e)) => {
span.set_status(Status::error(format!("Error getting favorites: {:?}", e)));
error!("Error getting favorites: {:?}", e);
HttpResponse::InternalServerError().finish()
}
Err(_) => HttpResponse::InternalServerError().finish(),
}
}
/// `PUT image/favorites` — records `body.path` as a favorite of the
/// authenticated user.
///
/// Status codes: `201` when inserted, `200` when the favorite already exists,
/// `400` for any other DAO error or an unparsable user id, `500` when the
/// blocking task itself fails.
#[put("image/favorites")]
async fn put_add_favorite(
    claims: Claims,
    body: web::Json<AddFavoriteRequest>,
    favorites_dao: Data<Mutex<Box<dyn FavoriteDao>>>,
) -> impl Responder {
    let user_id = match claims.sub.parse::<i32>() {
        Ok(id) => id,
        Err(_) => {
            error!("Unable to parse sub as i32: {}", claims.sub);
            return HttpResponse::BadRequest();
        }
    };

    // The closure needs its own copy because `body` is still used for logging.
    let path = body.path.clone();
    let outcome = web::block::<_, Result<usize, DbError>>(move || {
        favorites_dao
            .lock()
            .expect("Unable to get FavoritesDao")
            .add_favorite(user_id, &path)
    })
    .await;

    match outcome {
        Ok(Ok(_)) => {
            info!("Adding favorite \"{}\" for userid: {}", body.path, user_id);
            HttpResponse::Created()
        }
        Ok(Err(e)) if e.kind == DbErrorKind::AlreadyExists => {
            warn!("Favorite: {} exists for user: {}", &body.path, user_id);
            HttpResponse::Ok()
        }
        Ok(Err(e)) => {
            error!("{:?} {}. for user: {}", e, body.path, user_id);
            HttpResponse::BadRequest()
        }
        Err(e) => {
            error!("Blocking error while inserting favorite: {:?}", e);
            HttpResponse::InternalServerError()
        }
    }
}
#[delete("image/favorites")]
async fn delete_favorite(
claims: Claims,
body: web::Query<AddFavoriteRequest>,
favorites_dao: Data<Mutex<Box<dyn FavoriteDao>>>,
) -> impl Responder {
if let Ok(user_id) = claims.sub.parse::<i32>() {
let path = body.path.clone();
web::block(move || {
favorites_dao
.lock()
.expect("Unable to get favorites dao")
.remove_favorite(user_id, path);
})
.await
.unwrap();
info!(
"Removing favorite \"{}\" for userid: {}",
body.path, user_id
);
HttpResponse::Ok()
} else {
error!("Unable to parse sub as i32: {}", claims.sub);
HttpResponse::BadRequest()
}
}
/// Walks `BASE_PATH` and produces thumbnails under `THUMBNAILS`, mirroring the
/// relative directory layout, then refreshes the media gauges.
///
/// Video entries are handed to `generate_video_thumbnail`; image entries that
/// do not yet have a thumbnail file are opened, resized and saved.
///
/// # Panics
/// Panics if `THUMBNAILS` or `BASE_PATH` is not set, if a thumbnail directory
/// cannot be created, or if saving a thumbnail fails.
fn create_thumbnails() {
let tracer = global_tracer();
let span = tracer.start("creating thumbnails");
let thumbs = &dotenv::var("THUMBNAILS").expect("THUMBNAILS not defined");
let thumbnail_directory: &Path = Path::new(thumbs);
let images = PathBuf::from(dotenv::var("BASE_PATH").unwrap());
// The directory walk is collected into a Vec first so that the entries can
// be processed with rayon's parallel iterator.
WalkDir::new(&images)
.into_iter()
.collect::<Vec<Result<_, _>>>()
.into_par_iter()
// Entries that could not be read are dropped silently.
.filter_map(|entry| entry.ok())
.filter(|entry| entry.file_type().is_file())
// This filter has side effects: video entries get their thumbnail generated
// here and are then excluded (`false`) from the image steps below.
.filter(|entry| {
if is_video(entry) {
let relative_path = &entry.path().strip_prefix(&images).unwrap();
let thumb_path = Path::new(thumbnail_directory).join(relative_path);
std::fs::create_dir_all(
thumb_path
.parent()
.unwrap_or_else(|| panic!("Thumbnail {:?} has no parent?", thumb_path)),
)
.expect("Error creating directory");
let mut video_span = tracer.start_with_context(
"generate_video_thumbnail",
&opentelemetry::Context::new()
.with_remote_span_context(span.span_context().clone()),
);
video_span.set_attributes(vec![
KeyValue::new("type", "video"),
KeyValue::new("file-name", thumb_path.display().to_string()),
]);
debug!("Generating video thumbnail: {:?}", thumb_path);
// NOTE(review): no existence check is done here for videos; whether
// `generate_video_thumbnail` skips existing output is not visible — confirm.
generate_video_thumbnail(entry.path(), &thumb_path);
video_span.end();
false
} else {
is_image(entry)
}
})
// Only images whose thumbnail file does not exist yet continue.
.filter(|entry| {
let path = entry.path();
let relative_path = &path.strip_prefix(&images).unwrap();
let thumb_path = Path::new(thumbnail_directory).join(relative_path);
!thumb_path.exists()
})
.map(|entry| (image::open(entry.path()), entry.path().to_path_buf()))
// Images that fail to open are logged and skipped, which makes the
// `unwrap` in the following step safe.
.filter(|(img, path)| {
if let Err(e) = img {
error!("Unable to open image: {:?}. {}", path, e);
}
img.is_ok()
})
.map(|(img, path)| (img.unwrap(), path))
// NOTE(review): presumably limits the width to 200 while leaving the height
// effectively unbounded — confirm against the `image` crate documentation.
.map(|(image, path)| (image.thumbnail(200, u32::MAX), path))
.map(|(image, path)| {
let relative_path = &path.strip_prefix(&images).unwrap();
let thumb_path = Path::new(thumbnail_directory).join(relative_path);
std::fs::create_dir_all(thumb_path.parent().unwrap())
.expect("There was an issue creating directory");
info!("Saving thumbnail: {:?}", thumb_path);
image.save(thumb_path).expect("Failure saving thumbnail");
})
// Drives the parallel iterator to completion; the mapped values are `()`.
.for_each(drop);
debug!("Finished making thumbnails");
update_media_counts(&images);
}
/// Recounts the entries under `media_dir` and publishes the totals through
/// `IMAGE_GAUGE` and `VIDEO_GAUGE`.
///
/// Classification is by extension only (`is_image` is checked before
/// `is_video`); entries that cannot be read during the walk are skipped.
fn update_media_counts(media_dir: &Path) {
    let (image_count, video_count) = WalkDir::new(media_dir)
        .into_iter()
        .filter_map(Result::ok)
        .fold((0_i64, 0_i64), |(images, videos), entry| {
            if is_image(&entry) {
                (images + 1, videos)
            } else if is_video(&entry) {
                (images, videos + 1)
            } else {
                (images, videos)
            }
        });

    IMAGE_GAUGE.set(image_count);
    VIDEO_GAUGE.set(video_count);
}
/// Returns `true` when the entry's extension is `jpg`, `jpeg`, `png` or `nef`
/// (case-insensitive). Entries without a UTF-8 extension are not images.
fn is_image(entry: &DirEntry) -> bool {
    match entry.path().extension().and_then(|ext| ext.to_str()) {
        Some(ext) => matches!(
            ext.to_lowercase().as_str(),
            "jpg" | "jpeg" | "png" | "nef"
        ),
        None => false,
    }
}
/// Returns `true` when the entry's extension is `mp4` or `mov`
/// (case-insensitive). Entries without a UTF-8 extension are not videos.
fn is_video(entry: &DirEntry) -> bool {
    match entry.path().extension().and_then(|ext| ext.to_str()) {
        Some(ext) => matches!(ext.to_lowercase().as_str(), "mp4" | "mov"),
        None => false,
    }
}
/// Process entry point: loads `.env`, runs pending migrations, starts the
/// file watcher thread, then builds and runs the HTTP server inside an actix
/// system.
///
/// # Panics
/// Panics if migrations fail, if the Prometheus middleware or gauges cannot
/// be set up, or if `BIND_URL` is not set.
fn main() -> std::io::Result<()> {
// A missing or unparsable .env is reported but is not fatal.
if let Err(err) = dotenv::dotenv() {
println!("Error parsing .env {:?}", err);
}
run_migrations(&mut connect()).expect("Failed to run migrations");
// Spawns the background watcher thread; note this happens before any logger
// is initialised below.
watch_files();
let system = actix::System::new();
system.block_on(async {
// Just use basic logger when running a non-release build
#[cfg(debug_assertions)]
{
env_logger::init();
}
#[cfg(not(debug_assertions))]
{
otel::init_logs();
otel::init_tracing();
}
// Thumbnails and video GIFs are generated before the server starts
// accepting requests.
create_thumbnails();
generate_video_gifs().await;
let app_data = Data::new(AppState::default());
let labels = HashMap::new();
let prometheus = PrometheusMetricsBuilder::new("api")
.const_labels(labels)
.build()
.expect("Unable to build prometheus metrics middleware");
// Expose the media gauges through the same registry as the middleware.
prometheus
.registry
.register(Box::new(IMAGE_GAUGE.clone()))
.unwrap();
prometheus
.registry
.register(Box::new(VIDEO_GAUGE.clone()))
.unwrap();
let app_state = app_data.clone();
// Ask the playlist manager to scan the base path.
app_state.playlist_manager.do_send(ScanDirectoryMessage {
directory: app_state.base_path.clone(),
});
// The factory closure builds a fresh set of DAOs each time it is invoked.
HttpServer::new(move || {
let user_dao = SqliteUserDao::new();
let favorites_dao = SqliteFavoriteDao::new();
let tag_dao = SqliteTagDao::default();
App::new()
.wrap(middleware::Logger::default())
.service(web::resource("/login").route(web::post().to(login::<SqliteUserDao>)))
.service(
web::resource("/photos")
.route(web::get().to(files::list_photos::<SqliteTagDao, RealFileSystem>)),
)
.service(web::resource("/file/move").post(move_file::<RealFileSystem>))
.service(get_image)
.service(upload_image)
.service(generate_video)
.service(stream_video)
.service(get_video_part)
.service(favorites)
.service(put_add_favorite)
.service(delete_favorite)
.service(get_file_metadata)
.service(memories::list_memories)
.add_feature(add_tag_services::<_, SqliteTagDao>)
.app_data(app_data.clone())
.app_data::<Data<RealFileSystem>>(Data::new(RealFileSystem::new(
app_data.base_path.clone(),
)))
.app_data::<Data<Mutex<SqliteUserDao>>>(Data::new(Mutex::new(user_dao)))
.app_data::<Data<Mutex<Box<dyn FavoriteDao>>>>(Data::new(Mutex::new(Box::new(
favorites_dao,
))))
.app_data::<Data<Mutex<SqliteTagDao>>>(Data::new(Mutex::new(tag_dao)))
.wrap(prometheus.clone())
})
// The server listens on the configured BIND_URL and additionally on
// localhost:8088.
.bind(dotenv::var("BIND_URL").unwrap())?
.bind("localhost:8088")?
.run()
.await
})
}
/// Applies every pending migration from `MIGRATIONS` on `connection`.
///
/// # Errors
/// Returns the error reported by `run_pending_migrations`; the list of
/// applied versions is discarded.
fn run_migrations(
    connection: &mut impl MigrationHarness<Sqlite>,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
    connection
        .run_pending_migrations(MIGRATIONS)
        .map(|_applied| ())
}
fn watch_files() {
std::thread::spawn(|| {
let (wtx, wrx) = channel();
let mut watcher = RecommendedWatcher::new(wtx, Config::default()).unwrap();
let base_str = dotenv::var("BASE_PATH").unwrap();
let base_path = Path::new(&base_str);
watcher
.watch(base_path, RecursiveMode::Recursive)
.context(format!("Unable to watch BASE_PATH: '{}'", base_str))
.unwrap();
loop {
let ev = wrx.recv();
if let Ok(Ok(event)) = ev {
match event.kind {
EventKind::Create(create_kind) => {
info!(
"Creating thumbnails {:?} create event kind: {:?}",
event.paths, create_kind
);
create_thumbnails();
}
EventKind::Modify(kind) => {
debug!("All modified paths: {:?}", event.paths);
debug!("Modify kind: {:?}", kind);
if let Some(orig) = event.paths.first() {
let image_base_path = PathBuf::from(env::var("BASE_PATH").unwrap());
let image_relative = orig.strip_prefix(&image_base_path).unwrap();
if let Ok(old_thumbnail) =
env::var("THUMBNAILS").map(PathBuf::from).map(|mut base| {
base.push(image_relative);
base
})
{
if let Err(e) = std::fs::remove_file(&old_thumbnail) {
error!(
"Error removing thumbnail: {}\n{}",
old_thumbnail.display(),
e
);
} else {
info!("Deleted moved thumbnail: {}", old_thumbnail.display());
create_thumbnails();
}
}
}
}
EventKind::Remove(_) => {
update_media_counts(&PathBuf::from(env::var("BASE_PATH").unwrap()))
}
_ => {}
}
};
}
});
}