Merge branch 'master' into feature/video-gifs

# Conflicts:
#	Cargo.toml
#	src/files.rs
#	src/main.rs
This commit is contained in:
Cameron
2025-05-17 15:53:34 -04:00
7 changed files with 910 additions and 51 deletions

View File

@@ -106,6 +106,8 @@ pub enum SortType {
Shuffle,
NameAsc,
NameDesc,
TagCountAsc,
TagCountDesc,
}
#[derive(Deserialize)]

View File

@@ -21,7 +21,7 @@ use crate::{create_thumbnails, AppState};
use crate::data::SortType::NameAsc;
use crate::error::IntoHttpError;
use crate::tags::TagDao;
use crate::tags::{FileWithTagCount, TagDao};
use crate::video::actors::StreamActor;
use path_absolutize::*;
use rand::prelude::SliceRandom;
@@ -68,6 +68,13 @@ pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
"Failed to get files with tag_ids: {:?} with filter_mode: {:?}",
tag_ids, filter_mode
))
.inspect(|files| {
debug!(
"Found {:?} tagged files, filtering down by search path {:?}",
files.len(),
search_path
)
})
.map(|tagged_files| {
tagged_files
.into_iter()
@@ -77,12 +84,12 @@ pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
return true;
}
f.starts_with(&format!(
f.file_name.starts_with(&format!(
"{}/",
search_path.strip_suffix('/').unwrap_or_else(|| search_path)
))
})
.collect::<Vec<String>>()
.collect::<Vec<FileWithTagCount>>()
})
.map(|files| sort(files, req.sort.unwrap_or(NameAsc)))
.inspect(|files| debug!("Found {:?} files", files.len()))
@@ -106,7 +113,7 @@ pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
if let Ok(files) = file_system.get_files_for_path(search_path) {
debug!("Valid search path: {:?}", search_path);
let mut photos = files
let photos = files
.iter()
.filter(|&f| {
f.metadata().map_or_else(
@@ -122,8 +129,14 @@ pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
relative.to_path_buf()
})
.map(|f| f.to_str().unwrap().to_string())
.filter(|file_path| {
if let (Some(tag_ids), Ok(mut tag_dao)) = (&req.tag_ids, tag_dao.lock()) {
.map(|file_name| {
let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao");
let file_tags = tag_dao.get_tags_for_path(&file_name).unwrap_or_default();
(file_name, file_tags)
})
.filter(|(_, file_tags)| {
if let Some(tag_ids) = &req.tag_ids {
let tag_ids = tag_ids
.split(',')
.filter_map(|t| t.parse().ok())
@@ -138,7 +151,6 @@ pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
.collect::<Vec<i32>>();
let filter_mode = &req.tag_filter_mode.unwrap_or(FilterMode::Any);
let file_tags = tag_dao.get_tags_for_path(file_path).unwrap_or_default();
let excluded = file_tags.iter().any(|t| excluded_tag_ids.contains(&t.id));
return !excluded
@@ -152,11 +164,20 @@ pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
true
})
.collect::<Vec<String>>();
.map(|(file_name, tags)| FileWithTagCount {
file_name,
tag_count: tags.len() as i64,
})
.collect::<Vec<FileWithTagCount>>();
let mut response_files = photos
.clone()
.into_iter()
.map(|f| f.file_name)
.collect::<Vec<String>>();
if let Some(sort_type) = req.sort {
debug!("Sorting files: {:?}", sort_type);
photos = sort(photos, sort_type)
response_files = sort(photos, sort_type)
}
let dirs = files
@@ -169,25 +190,37 @@ pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
.map(|f| f.to_str().unwrap().to_string())
.collect::<Vec<String>>();
HttpResponse::Ok().json(PhotosResponse { photos, dirs })
HttpResponse::Ok().json(PhotosResponse {
photos: response_files,
dirs,
})
} else {
error!("Bad photos request: {}", req.path);
HttpResponse::BadRequest().finish()
}
}
fn sort(mut files: Vec<String>, sort_type: SortType) -> Vec<String> {
fn sort(mut files: Vec<FileWithTagCount>, sort_type: SortType) -> Vec<String> {
match sort_type {
SortType::Shuffle => files.shuffle(&mut thread_rng()),
SortType::NameAsc => {
files.sort();
files.sort_by(|l, r| l.file_name.cmp(&r.file_name));
}
SortType::NameDesc => {
files.sort_by(|l, r| r.cmp(l));
files.sort_by(|l, r| r.file_name.cmp(&l.file_name));
}
SortType::TagCountAsc => {
files.sort_by(|l, r| l.tag_count.cmp(&r.tag_count));
}
SortType::TagCountDesc => {
files.sort_by(|l, r| r.tag_count.cmp(&l.tag_count));
}
}
files
.iter()
.map(|f| f.file_name.clone())
.collect::<Vec<String>>()
}
pub fn list_files(dir: &Path) -> io::Result<Vec<PathBuf>> {

View File

@@ -32,20 +32,22 @@ use diesel::sqlite::Sqlite;
use notify::{Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use rayon::prelude::*;
use log::{debug, error, info, trace, warn};
use crate::auth::login;
use crate::data::*;
use crate::database::*;
use crate::files::{
is_image_or_video, is_valid_full_path, move_file, RealFileSystem, RefreshThumbnailsMessage,
};
use crate::otel::global_tracer;
use crate::service::ServiceBuilder;
use crate::state::AppState;
use crate::tags::*;
use crate::video::actors::{
create_playlist, generate_video_thumbnail, ProcessMessage, ScanDirectoryMessage,
};
use log::{debug, error, info, trace, warn};
use opentelemetry::trace::{Span, Status, TraceContextExt, Tracer};
use opentelemetry::{global, KeyValue};
mod auth;
mod data;
@@ -56,6 +58,7 @@ mod state;
mod tags;
mod video;
mod otel;
mod service;
#[cfg(test)]
mod testhelpers;
@@ -115,6 +118,8 @@ async fn get_file_metadata(
path: web::Query<ThumbnailRequest>,
app_state: Data<AppState>,
) -> impl Responder {
let tracer = global_tracer();
let mut span = tracer.start("get_file_metadata");
match is_valid_full_path(&app_state.base_path, &path.path, false)
.ok_or_else(|| ErrorKind::InvalidData.into())
.and_then(File::open)
@@ -122,10 +127,19 @@ async fn get_file_metadata(
{
Ok(metadata) => {
let response: MetadataResponse = metadata.into();
span.add_event(
"Metadata fetched",
vec![KeyValue::new("file", path.path.clone())],
);
span.set_status(Status::Ok);
HttpResponse::Ok().json(response)
}
Err(e) => {
error!("Error getting metadata for file '{}': {:?}", path.path, e);
let message = format!("Error getting metadata for file '{}': {:?}", path.path, e);
error!("{}", message);
span.set_status(Status::error(message));
HttpResponse::InternalServerError().finish()
}
}
@@ -137,6 +151,9 @@ async fn upload_image(
mut payload: mp::Multipart,
app_state: Data<AppState>,
) -> impl Responder {
let tracer = global_tracer();
let mut span = tracer.start("upload_image");
let mut file_content: BytesMut = BytesMut::new();
let mut file_name: Option<String> = None;
let mut file_path: Option<String> = None;
@@ -169,6 +186,12 @@ async fn upload_image(
&full_path.to_str().unwrap().to_string(),
true,
) {
let context =
opentelemetry::Context::new().with_remote_span_context(span.span_context().clone());
tracer
.span_builder("file write")
.start_with_context(&tracer, &context);
if !full_path.is_file() && is_image_or_video(&full_path) {
let mut file = File::create(&full_path).unwrap();
file.write_all(&file_content).unwrap();
@@ -195,13 +218,16 @@ async fn upload_image(
}
} else {
error!("Invalid path for upload: {:?}", full_path);
span.set_status(Status::error("Invalid path for upload"));
return HttpResponse::BadRequest().body("Path was not valid");
}
} else {
span.set_status(Status::error("No file body read"));
return HttpResponse::BadRequest().body("No file body read");
}
app_state.stream_manager.do_send(RefreshThumbnailsMessage);
span.set_status(Status::Ok);
HttpResponse::Ok().finish()
}
@@ -212,6 +238,9 @@ async fn generate_video(
app_state: Data<AppState>,
body: web::Json<ThumbnailRequest>,
) -> impl Responder {
let tracer = global_tracer();
let mut span = tracer.start("generate_video");
let filename = PathBuf::from(&body.path);
if let Some(name) = filename.file_name() {
@@ -219,17 +248,29 @@ async fn generate_video(
let playlist = format!("{}/{}.m3u8", app_state.video_path, filename);
if let Some(path) = is_valid_full_path(&app_state.base_path, &body.path, false) {
if let Ok(child) = create_playlist(path.to_str().unwrap(), &playlist).await {
app_state
.stream_manager
.do_send(ProcessMessage(playlist.clone(), child));
span.add_event(
"playlist_created".to_string(),
vec![KeyValue::new("playlist-name", filename.to_string())],
);
span.set_status(Status::Ok);
app_state.stream_manager.do_send(ProcessMessage(
playlist.clone(),
child,
// opentelemetry::Context::new().with_span(span),
));
}
} else {
span.set_status(Status::error(format!("invalid path {:?}", &body.path)));
return HttpResponse::BadRequest().finish();
}
HttpResponse::Ok().json(playlist)
} else {
error!("Unable to get file name: {:?}", filename);
let message = format!("Unable to get file name: {:?}", filename);
error!("{}", message);
span.set_status(Status::error(message));
HttpResponse::BadRequest().finish()
}
}
@@ -241,6 +282,9 @@ async fn stream_video(
path: web::Query<ThumbnailRequest>,
app_state: Data<AppState>,
) -> impl Responder {
let tracer = global::tracer("image-server");
let mut span = tracer.start("stream_video");
let playlist = &path.path;
debug!("Playlist: {}", playlist);
@@ -248,10 +292,14 @@ async fn stream_video(
if !playlist.starts_with(&app_state.video_path)
&& is_valid_full_path(&app_state.base_path, playlist, false).is_some()
{
span.set_status(Status::error(format!("playlist not valid {}", playlist)));
HttpResponse::BadRequest().finish()
} else if let Ok(file) = NamedFile::open(playlist) {
span.set_status(Status::Ok);
file.into_response(&request)
} else {
span.set_status(Status::error(format!("playlist not found {}", playlist)));
HttpResponse::NotFound().finish()
}
}
@@ -263,6 +311,9 @@ async fn get_video_part(
path: web::Path<ThumbnailRequest>,
app_state: Data<AppState>,
) -> impl Responder {
let tracer = global::tracer("image-server");
let mut span = tracer.start("get_video_part");
let part = &path.path;
debug!("Video part: {}", part);
@@ -271,9 +322,14 @@ async fn get_video_part(
file_part.push(part);
// TODO: Do we need to guard against directory attacks here?
if let Ok(file) = NamedFile::open(&file_part) {
span.set_status(Status::Ok);
file.into_response(&request)
} else {
error!("Video part not found: {:?}", file_part);
span.set_status(Status::error(format!(
"Video part not found '{}'",
file_part.to_str().unwrap()
)));
HttpResponse::NotFound().finish()
}
}
@@ -378,6 +434,9 @@ async fn delete_favorite(
}
fn create_thumbnails() {
let tracer = global_tracer();
let span = tracer.start("creating thumbnails");
let thumbs = &dotenv::var("THUMBNAILS").expect("THUMBNAILS not defined");
let thumbnail_directory: &Path = Path::new(thumbs);
@@ -400,8 +459,19 @@ fn create_thumbnails() {
)
.expect("Error creating directory");
let mut video_span = tracer.start_with_context(
"generate_video_thumbnail",
&opentelemetry::Context::new()
.with_remote_span_context(span.span_context().clone()),
);
video_span.set_attributes(vec![
KeyValue::new("type", "video"),
KeyValue::new("file-name", thumb_path.display().to_string()),
]);
debug!("Generating video thumbnail: {:?}", thumb_path);
generate_video_thumbnail(entry.path(), &thumb_path);
video_span.end();
false
} else {
is_image(entry)
@@ -476,15 +546,26 @@ fn main() -> std::io::Result<()> {
if let Err(err) = dotenv::dotenv() {
println!("Error parsing .env {:?}", err);
}
env_logger::init();
run_migrations(&mut connect()).expect("Failed to run migrations");
create_thumbnails();
watch_files();
let system = actix::System::new();
system.block_on(async {
// Just use basic logger when running a non-release build
#[cfg(debug_assertions)]
{
env_logger::init();
}
#[cfg(not(debug_assertions))]
{
otel::init_logs();
otel::init_tracing();
}
create_thumbnails();
let app_data = Data::new(AppState::default());
let labels = HashMap::new();
@@ -503,11 +584,9 @@ fn main() -> std::io::Result<()> {
.unwrap();
let app_state = app_data.clone();
app_state
.playlist_manager
.do_send(ScanDirectoryMessage {
directory: app_state.base_path.clone(),
});
app_state.playlist_manager.do_send(ScanDirectoryMessage {
directory: app_state.base_path.clone(),
});
HttpServer::new(move || {
let user_dao = SqliteUserDao::new();

61
src/otel.rs Normal file
View File

@@ -0,0 +1,61 @@
use opentelemetry::global::BoxedTracer;
use opentelemetry::{global, KeyValue};
use opentelemetry_appender_log::OpenTelemetryLogBridge;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::logs::{BatchLogProcessor, SdkLoggerProvider};
use opentelemetry_sdk::Resource;
pub fn global_tracer() -> BoxedTracer {
global::tracer("image-server")
}
pub fn init_tracing() {
let resources = Resource::builder()
.with_attributes([
KeyValue::new("service.name", "image-server"),
//TODO: Get this from somewhere
KeyValue::new("service.version", "1.0"),
])
.build();
let span_exporter = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_endpoint(std::env::var("OTLP_OTLS_ENDPOINT").unwrap())
.build()
.unwrap();
let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
.with_batch_exporter(span_exporter)
.with_resource(resources)
.build();
global::set_tracer_provider(tracer_provider);
}
pub fn init_logs() {
let otlp_exporter = opentelemetry_otlp::LogExporter::builder()
.with_tonic()
.with_endpoint(std::env::var("OTLP_OTLS_ENDPOINT").unwrap())
.build()
.unwrap();
let exporter = opentelemetry_stdout::LogExporter::default();
let resources = Resource::builder()
.with_attributes([
KeyValue::new("service.name", "image-server"),
KeyValue::new("service.version", "1.0"),
])
.build();
let log_provider = SdkLoggerProvider::builder()
.with_log_processor(BatchLogProcessor::builder(exporter).build())
.with_log_processor(BatchLogProcessor::builder(otlp_exporter).build())
.with_resource(resources)
.build();
let otel_log_appender = OpenTelemetryLogBridge::new(&log_provider);
log::set_boxed_logger(Box::new(otel_log_appender)).expect("Unable to set boxed logger");
//TODO: Still set this with the env? Ideally we still have a clean/simple local logger for local dev
log::set_max_level(log::LevelFilter::Info);
}

View File

@@ -202,12 +202,12 @@ pub trait TagDao {
&mut self,
tag_ids: Vec<i32>,
exclude_tag_ids: Vec<i32>,
) -> anyhow::Result<Vec<String>>;
) -> anyhow::Result<Vec<FileWithTagCount>>;
fn get_files_with_any_tag_ids(
&mut self,
tag_ids: Vec<i32>,
exclude_tag_ids: Vec<i32>,
) -> anyhow::Result<Vec<String>>;
) -> anyhow::Result<Vec<FileWithTagCount>>;
}
pub struct SqliteTagDao {
@@ -277,7 +277,7 @@ impl TagDao for SqliteTagDao {
.and_then(|_| {
info!("Inserted tag: {:?}", name);
define_sql_function! {
fn last_insert_rowid() -> diesel::sql_types::Integer;
fn last_insert_rowid() -> Integer;
}
diesel::select(last_insert_rowid())
.get_result::<i32>(&mut self.connection)
@@ -353,7 +353,7 @@ impl TagDao for SqliteTagDao {
&mut self,
tag_ids: Vec<i32>,
exclude_tag_ids: Vec<i32>,
) -> anyhow::Result<Vec<String>> {
) -> anyhow::Result<Vec<FileWithTagCount>> {
use diesel::dsl::*;
let exclude_subquery = tagged_photo::table
@@ -365,10 +365,21 @@ impl TagDao for SqliteTagDao {
.filter(tagged_photo::tag_id.eq_any(tag_ids.clone()))
.filter(tagged_photo::photo_name.ne_all(exclude_subquery))
.group_by(tagged_photo::photo_name)
.select((tagged_photo::photo_name, count(tagged_photo::tag_id)))
.select((
tagged_photo::photo_name,
count_distinct(tagged_photo::tag_id),
))
.having(count_distinct(tagged_photo::tag_id).ge(tag_ids.len() as i64))
.select(tagged_photo::photo_name)
.get_results::<String>(&mut self.connection)
.get_results::<(String, i64)>(&mut self.connection)
.map(|results| {
results
.into_iter()
.map(|(file_name, tag_count)| FileWithTagCount {
file_name,
tag_count,
})
.collect()
})
.with_context(|| format!("Unable to get Tagged photos with ids: {:?}", tag_ids))
}
@@ -376,7 +387,7 @@ impl TagDao for SqliteTagDao {
&mut self,
tag_ids: Vec<i32>,
exclude_tag_ids: Vec<i32>,
) -> anyhow::Result<Vec<String>> {
) -> anyhow::Result<Vec<FileWithTagCount>> {
use diesel::dsl::*;
let exclude_subquery = tagged_photo::table
@@ -388,9 +399,20 @@ impl TagDao for SqliteTagDao {
.filter(tagged_photo::tag_id.eq_any(tag_ids.clone()))
.filter(tagged_photo::photo_name.ne_all(exclude_subquery))
.group_by(tagged_photo::photo_name)
.select((tagged_photo::photo_name, count(tagged_photo::tag_id)))
.select(tagged_photo::photo_name)
.get_results::<String>(&mut self.connection)
.select((
tagged_photo::photo_name,
count_distinct(tagged_photo::tag_id),
))
.get_results::<(String, i64)>(&mut self.connection)
.map(|results| {
results
.into_iter()
.map(|(file_name, tag_count)| FileWithTagCount {
file_name,
tag_count,
})
.collect()
})
.with_context(|| format!("Unable to get Tagged photos with ids: {:?}", tag_ids))
}
}
@@ -517,7 +539,7 @@ mod tests {
&mut self,
tag_ids: Vec<i32>,
_exclude_tag_ids: Vec<i32>,
) -> anyhow::Result<Vec<String>> {
) -> anyhow::Result<Vec<FileWithTagCount>> {
todo!()
}
@@ -525,7 +547,7 @@ mod tests {
&mut self,
_tag_ids: Vec<i32>,
_exclude_tag_ids: Vec<i32>,
) -> anyhow::Result<Vec<String>> {
) -> anyhow::Result<Vec<FileWithTagCount>> {
todo!()
}
}
@@ -607,3 +629,9 @@ mod tests {
);
}
}
#[derive(Debug, Clone)]
pub struct FileWithTagCount {
pub file_name: String,
pub tag_count: i64,
}