Merge branch 'master' into feature/video-gifs

This commit is contained in:
Cameron
2025-06-16 21:06:38 -04:00
7 changed files with 569 additions and 246 deletions

23
Cargo.lock generated
View File

@@ -908,9 +908,9 @@ dependencies = [
[[package]] [[package]]
name = "diesel" name = "diesel"
version = "2.2.5" version = "2.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cbf9649c05e0a9dbd6d0b0b8301db5182b972d0fd02f0a7c6736cf632d7c0fd5" checksum = "ff3e1edb1f37b4953dd5176916347289ed43d7119cc2e6c7c3f7849ff44ea506"
dependencies = [ dependencies = [
"diesel_derives", "diesel_derives",
"libsqlite3-sys", "libsqlite3-sys",
@@ -1624,7 +1624,6 @@ dependencies = [
"opentelemetry", "opentelemetry",
"opentelemetry-appender-log", "opentelemetry-appender-log",
"opentelemetry-otlp", "opentelemetry-otlp",
"opentelemetry-resource-detectors",
"opentelemetry-stdout", "opentelemetry-stdout",
"opentelemetry_sdk", "opentelemetry_sdk",
"path-absolutize", "path-absolutize",
@@ -1633,7 +1632,6 @@ dependencies = [
"rayon", "rayon",
"serde", "serde",
"serde_json", "serde_json",
"tempfile",
"tokio", "tokio",
"walkdir", "walkdir",
] ]
@@ -2176,23 +2174,6 @@ dependencies = [
"tonic", "tonic",
] ]
[[package]]
name = "opentelemetry-resource-detectors"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b0cd3cf373f6f7f3a8f25a189acf1300c8b87e85f7959b45ba83c01e305f5cc3"
dependencies = [
"opentelemetry",
"opentelemetry-semantic-conventions",
"opentelemetry_sdk",
]
[[package]]
name = "opentelemetry-semantic-conventions"
version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2fb3a2f78c2d55362cd6c313b8abedfbc0142ab3c2676822068fd2ab7d51f9b7"
[[package]] [[package]]
name = "opentelemetry-stdout" name = "opentelemetry-stdout"
version = "0.28.0" version = "0.28.0"

View File

@@ -20,7 +20,7 @@ futures = "0.3.5"
jsonwebtoken = "9.3.0" jsonwebtoken = "9.3.0"
serde = "1" serde = "1"
serde_json = "1" serde_json = "1"
diesel = { version = "2.2.5", features = ["sqlite"] } diesel = { version = "2.2.10", features = ["sqlite"] }
diesel_migrations = "2.2.0" diesel_migrations = "2.2.0"
chrono = "0.4" chrono = "0.4"
dotenv = "0.15" dotenv = "0.15"
@@ -37,10 +37,8 @@ prometheus = "0.13"
lazy_static = "1.5" lazy_static = "1.5"
anyhow = "1.0" anyhow = "1.0"
rand = "0.8.5" rand = "0.8.5"
tempfile = "3.14.0"
opentelemetry = { version = "0.28.0", features = ["default", "metrics", "tracing"] } opentelemetry = { version = "0.28.0", features = ["default", "metrics", "tracing"] }
opentelemetry_sdk = { version = "0.28.0", features = ["default", "rt-tokio-current-thread", "tracing", "metrics"] } opentelemetry_sdk = { version = "0.28.0", features = ["default", "rt-tokio-current-thread", "tracing", "metrics"] }
opentelemetry-otlp = { version = "0.28.0", features = ["default", "metrics", "tracing", "grpc-tonic"] } opentelemetry-otlp = { version = "0.28.0", features = ["default", "metrics", "tracing", "grpc-tonic"] }
opentelemetry-stdout = "0.28.0" opentelemetry-stdout = "0.28.0"
opentelemetry-appender-log = "0.28.0" opentelemetry-appender-log = "0.28.0"
opentelemetry-resource-detectors = "0.7.0"

View File

@@ -9,18 +9,20 @@ use ::anyhow;
use actix::{Handler, Message}; use actix::{Handler, Message};
use anyhow::{anyhow, Context}; use anyhow::{anyhow, Context};
use crate::data::{Claims, FilesRequest, FilterMode, PhotosResponse, SortType};
use crate::{create_thumbnails, AppState};
use actix_web::web::Data; use actix_web::web::Data;
use actix_web::{ use actix_web::{
web::{self, Query}, web::{self, Query},
HttpResponse, HttpRequest, HttpResponse,
}; };
use log::{debug, error, info, trace}; use log::{debug, error, info, trace};
use opentelemetry::trace::{Span, Status, TraceContextExt, Tracer};
use crate::data::{Claims, FilesRequest, FilterMode, PhotosResponse, SortType}; use opentelemetry::KeyValue;
use crate::{create_thumbnails, AppState};
use crate::data::SortType::NameAsc; use crate::data::SortType::NameAsc;
use crate::error::IntoHttpError; use crate::error::IntoHttpError;
use crate::otel::{extract_context_from_request, global_tracer};
use crate::tags::{FileWithTagCount, TagDao}; use crate::tags::{FileWithTagCount, TagDao};
use crate::video::actors::StreamActor; use crate::video::actors::StreamActor;
use path_absolutize::*; use path_absolutize::*;
@@ -30,6 +32,7 @@ use serde::Deserialize;
pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>( pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
_: Claims, _: Claims,
request: HttpRequest,
req: Query<FilesRequest>, req: Query<FilesRequest>,
app_state: web::Data<AppState>, app_state: web::Data<AppState>,
file_system: web::Data<FS>, file_system: web::Data<FS>,
@@ -37,11 +40,34 @@ pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
) -> HttpResponse { ) -> HttpResponse {
let search_path = &req.path; let search_path = &req.path;
let tracer = global_tracer();
let context = extract_context_from_request(&request);
let mut span = tracer.start_with_context("list_photos", &context);
span.set_attributes(vec![
KeyValue::new("path", search_path.to_string()),
KeyValue::new("recursive", req.recursive.unwrap_or(false).to_string()),
KeyValue::new(
"tag_ids",
req.tag_ids.clone().unwrap_or_default().to_string(),
),
KeyValue::new(
"tag_filter_mode",
format!("{:?}", req.tag_filter_mode.unwrap_or(FilterMode::Any)),
),
KeyValue::new(
"exclude_tag_ids",
req.exclude_tag_ids.clone().unwrap_or_default().to_string(),
),
KeyValue::new("sort", format!("{:?}", &req.sort.unwrap_or(NameAsc))),
]);
let span_context = opentelemetry::Context::current_with_span(span);
let search_recursively = req.recursive.unwrap_or(false); let search_recursively = req.recursive.unwrap_or(false);
if let Some(tag_ids) = &req.tag_ids { if let Some(tag_ids) = &req.tag_ids {
if search_recursively { if search_recursively {
let filter_mode = &req.tag_filter_mode.unwrap_or(FilterMode::Any); let filter_mode = &req.tag_filter_mode.unwrap_or(FilterMode::Any);
debug!( info!(
"Searching for tags: {}. With path: '{}' and filter mode: {:?}", "Searching for tags: {}. With path: '{}' and filter mode: {:?}",
tag_ids, search_path, filter_mode tag_ids, search_path, filter_mode
); );
@@ -61,15 +87,19 @@ pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
.collect::<Vec<i32>>(); .collect::<Vec<i32>>();
return match filter_mode { return match filter_mode {
FilterMode::Any => dao.get_files_with_any_tag_ids(tag_ids.clone(), exclude_tag_ids), FilterMode::Any => {
FilterMode::All => dao.get_files_with_all_tag_ids(tag_ids.clone(), exclude_tag_ids), dao.get_files_with_any_tag_ids(tag_ids.clone(), exclude_tag_ids, &span_context)
}
FilterMode::All => {
dao.get_files_with_all_tag_ids(tag_ids.clone(), exclude_tag_ids, &span_context)
}
} }
.context(format!( .context(format!(
"Failed to get files with tag_ids: {:?} with filter_mode: {:?}", "Failed to get files with tag_ids: {:?} with filter_mode: {:?}",
tag_ids, filter_mode tag_ids, filter_mode
)) ))
.inspect(|files| { .inspect(|files| {
debug!( info!(
"Found {:?} tagged files, filtering down by search path {:?}", "Found {:?} tagged files, filtering down by search path {:?}",
files.len(), files.len(),
search_path search_path
@@ -94,11 +124,15 @@ pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
.map(|files| sort(files, req.sort.unwrap_or(NameAsc))) .map(|files| sort(files, req.sort.unwrap_or(NameAsc)))
.inspect(|files| debug!("Found {:?} files", files.len())) .inspect(|files| debug!("Found {:?} files", files.len()))
.map(|tagged_files: Vec<String>| { .map(|tagged_files: Vec<String>| {
trace!( info!(
"Found {:?} tagged files: {:?}", "Found {:?} tagged files: {:?}",
tagged_files.len(), tagged_files.len(),
tagged_files tagged_files
); );
span_context
.span()
.set_attribute(KeyValue::new("file_count", tagged_files.len().to_string()));
span_context.span().set_status(Status::Ok);
HttpResponse::Ok().json(PhotosResponse { HttpResponse::Ok().json(PhotosResponse {
photos: tagged_files, photos: tagged_files,
@@ -111,7 +145,7 @@ pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
} }
if let Ok(files) = file_system.get_files_for_path(search_path) { if let Ok(files) = file_system.get_files_for_path(search_path) {
debug!("Valid search path: {:?}", search_path); info!("Found {:?} files in path: {:?}", files.len(), search_path);
let photos = files let photos = files
.iter() .iter()
@@ -131,7 +165,9 @@ pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
.map(|f| f.to_str().unwrap().to_string()) .map(|f| f.to_str().unwrap().to_string())
.map(|file_name| { .map(|file_name| {
let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao"); let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao");
let file_tags = tag_dao.get_tags_for_path(&file_name).unwrap_or_default(); let file_tags = tag_dao
.get_tags_for_path(&span_context, &file_name)
.unwrap_or_default();
(file_name, file_tags) (file_name, file_tags)
}) })
@@ -190,12 +226,20 @@ pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
.map(|f| f.to_str().unwrap().to_string()) .map(|f| f.to_str().unwrap().to_string())
.collect::<Vec<String>>(); .collect::<Vec<String>>();
span_context
.span()
.set_attribute(KeyValue::new("file_count", files.len().to_string()));
span_context.span().set_status(Status::Ok);
HttpResponse::Ok().json(PhotosResponse { HttpResponse::Ok().json(PhotosResponse {
photos: response_files, photos: response_files,
dirs, dirs,
}) })
} else { } else {
error!("Bad photos request: {}", req.path); error!("Bad photos request: {}", req.path);
span_context
.span()
.set_status(Status::error("Invalid path"));
HttpResponse::BadRequest().finish() HttpResponse::BadRequest().finish()
} }
} }
@@ -224,12 +268,21 @@ fn sort(mut files: Vec<FileWithTagCount>, sort_type: SortType) -> Vec<String> {
} }
pub fn list_files(dir: &Path) -> io::Result<Vec<PathBuf>> { pub fn list_files(dir: &Path) -> io::Result<Vec<PathBuf>> {
let tracer = global_tracer();
let mut span = tracer.start("list_files");
let dir_name_string = dir.to_str().unwrap_or_default().to_string();
span.set_attribute(KeyValue::new("dir", dir_name_string));
info!("Listing files in: {:?}", dir);
let files = read_dir(dir)? let files = read_dir(dir)?
.filter_map(|res| res.ok()) .filter_map(|res| res.ok())
.filter(|entry| is_image_or_video(&entry.path()) || entry.file_type().unwrap().is_dir()) .filter(|entry| is_image_or_video(&entry.path()) || entry.file_type().unwrap().is_dir())
.map(|entry| entry.path()) .map(|entry| entry.path())
.collect::<Vec<PathBuf>>(); .collect::<Vec<PathBuf>>();
span.set_attribute(KeyValue::new("file_count", files.len().to_string()));
span.set_status(Status::Ok);
info!("Found {:?} files in directory: {:?}", files.len(), dir);
Ok(files) Ok(files)
} }
@@ -245,6 +298,7 @@ pub fn is_image_or_video(path: &Path) -> bool {
|| extension == "mp4" || extension == "mp4"
|| extension == "mov" || extension == "mov"
|| extension == "nef" || extension == "nef"
|| extension == "webp"
} }
pub fn is_valid_full_path<P: AsRef<Path> + Debug + AsRef<std::ffi::OsStr>>( pub fn is_valid_full_path<P: AsRef<Path> + Debug + AsRef<std::ffi::OsStr>>(
@@ -301,6 +355,8 @@ pub async fn move_file<FS: FileSystemAccess>(
app_state: Data<AppState>, app_state: Data<AppState>,
request: web::Json<MoveFileRequest>, request: web::Json<MoveFileRequest>,
) -> HttpResponse { ) -> HttpResponse {
info!("Moving file: {:?}", request);
match is_valid_full_path(&app_state.base_path, &request.source, false) match is_valid_full_path(&app_state.base_path, &request.source, false)
.ok_or(ErrorKind::InvalidData) .ok_or(ErrorKind::InvalidData)
.and_then(|source| { .and_then(|source| {
@@ -340,7 +396,7 @@ pub async fn move_file<FS: FileSystemAccess>(
} }
} }
#[derive(Deserialize)] #[derive(Deserialize, Debug)]
pub struct MoveFileRequest { pub struct MoveFileRequest {
source: String, source: String,
destination: String, destination: String,
@@ -372,6 +428,11 @@ impl FileSystemAccess for RealFileSystem {
} }
fn move_file<P: AsRef<Path>>(&self, from: P, destination: P) -> anyhow::Result<()> { fn move_file<P: AsRef<Path>>(&self, from: P, destination: P) -> anyhow::Result<()> {
info!(
"Moving file: '{:?}' -> '{:?}'",
from.as_ref(),
destination.as_ref()
);
let name = from let name = from
.as_ref() .as_ref()
.file_name() .file_name()
@@ -393,6 +454,8 @@ impl Handler<RefreshThumbnailsMessage> for StreamActor {
type Result = (); type Result = ();
fn handle(&mut self, _msg: RefreshThumbnailsMessage, _ctx: &mut Self::Context) -> Self::Result { fn handle(&mut self, _msg: RefreshThumbnailsMessage, _ctx: &mut Self::Context) -> Self::Result {
let tracer = global_tracer();
let _ = tracer.start("RefreshThumbnailsMessage");
info!("Refreshing thumbnails after upload"); info!("Refreshing thumbnails after upload");
create_thumbnails() create_thumbnails()
} }
@@ -525,6 +588,7 @@ mod tests {
let response = list_photos( let response = list_photos(
claims, claims,
HttpRequest::default(),
request, request,
Data::new(AppState::new( Data::new(AppState::new(
Arc::new(StreamActor {}.start()), Arc::new(StreamActor {}.start()),
@@ -572,6 +636,7 @@ mod tests {
let response: HttpResponse = list_photos( let response: HttpResponse = list_photos(
claims, claims,
HttpRequest::default(),
request, request,
Data::new(AppState::new( Data::new(AppState::new(
Arc::new(StreamActor {}.start()), Arc::new(StreamActor {}.start()),

View File

@@ -38,7 +38,7 @@ use crate::database::*;
use crate::files::{ use crate::files::{
is_image_or_video, is_valid_full_path, move_file, RealFileSystem, RefreshThumbnailsMessage, is_image_or_video, is_valid_full_path, move_file, RealFileSystem, RefreshThumbnailsMessage,
}; };
use crate::otel::global_tracer; use crate::otel::{extract_context_from_request, global_tracer};
use crate::service::ServiceBuilder; use crate::service::ServiceBuilder;
use crate::state::AppState; use crate::state::AppState;
use crate::tags::*; use crate::tags::*;
@@ -85,6 +85,11 @@ async fn get_image(
req: web::Query<ThumbnailRequest>, req: web::Query<ThumbnailRequest>,
app_state: Data<AppState>, app_state: Data<AppState>,
) -> impl Responder { ) -> impl Responder {
let tracer = global_tracer();
let context = extract_context_from_request(&request);
let mut span = tracer.start_with_context("get_image", &context);
if let Some(path) = is_valid_full_path(&app_state.base_path, &req.path, false) { if let Some(path) = is_valid_full_path(&app_state.base_path, &req.path, false) {
let image_size = req.size.unwrap_or(PhotoSize::Full); let image_size = req.size.unwrap_or(PhotoSize::Full);
if image_size == PhotoSize::Thumb { if image_size == PhotoSize::Thumb {
@@ -97,16 +102,21 @@ async fn get_image(
trace!("Thumbnail path: {:?}", thumb_path); trace!("Thumbnail path: {:?}", thumb_path);
if let Ok(file) = NamedFile::open(&thumb_path) { if let Ok(file) = NamedFile::open(&thumb_path) {
span.set_status(Status::Ok);
file.into_response(&request) file.into_response(&request)
} else { } else {
span.set_status(Status::error("Not found"));
HttpResponse::NotFound().finish() HttpResponse::NotFound().finish()
} }
} else if let Ok(file) = NamedFile::open(path) { } else if let Ok(file) = NamedFile::open(path) {
span.set_status(Status::Ok);
file.into_response(&request) file.into_response(&request)
} else { } else {
span.set_status(Status::error("Not found"));
HttpResponse::NotFound().finish() HttpResponse::NotFound().finish()
} }
} else { } else {
span.set_status(Status::error("Bad photos request"));
error!("Bad photos request: {}", req.path); error!("Bad photos request: {}", req.path);
HttpResponse::BadRequest().finish() HttpResponse::BadRequest().finish()
} }
@@ -115,11 +125,13 @@ async fn get_image(
#[get("/image/metadata")] #[get("/image/metadata")]
async fn get_file_metadata( async fn get_file_metadata(
_: Claims, _: Claims,
request: HttpRequest,
path: web::Query<ThumbnailRequest>, path: web::Query<ThumbnailRequest>,
app_state: Data<AppState>, app_state: Data<AppState>,
) -> impl Responder { ) -> impl Responder {
let tracer = global_tracer(); let tracer = global_tracer();
let mut span = tracer.start("get_file_metadata"); let context = extract_context_from_request(&request);
let mut span = tracer.start_with_context("get_file_metadata", &context);
match is_valid_full_path(&app_state.base_path, &path.path, false) match is_valid_full_path(&app_state.base_path, &path.path, false)
.ok_or_else(|| ErrorKind::InvalidData.into()) .ok_or_else(|| ErrorKind::InvalidData.into())
.and_then(File::open) .and_then(File::open)
@@ -148,11 +160,13 @@ async fn get_file_metadata(
#[post("/image")] #[post("/image")]
async fn upload_image( async fn upload_image(
_: Claims, _: Claims,
request: HttpRequest,
mut payload: mp::Multipart, mut payload: mp::Multipart,
app_state: Data<AppState>, app_state: Data<AppState>,
) -> impl Responder { ) -> impl Responder {
let tracer = global_tracer(); let tracer = global_tracer();
let mut span = tracer.start("upload_image"); let context = extract_context_from_request(&request);
let mut span = tracer.start_with_context("upload_image", &context);
let mut file_content: BytesMut = BytesMut::new(); let mut file_content: BytesMut = BytesMut::new();
let mut file_name: Option<String> = None; let mut file_name: Option<String> = None;
@@ -235,11 +249,14 @@ async fn upload_image(
#[post("/video/generate")] #[post("/video/generate")]
async fn generate_video( async fn generate_video(
_claims: Claims, _claims: Claims,
request: HttpRequest,
app_state: Data<AppState>, app_state: Data<AppState>,
body: web::Json<ThumbnailRequest>, body: web::Json<ThumbnailRequest>,
) -> impl Responder { ) -> impl Responder {
let tracer = global_tracer(); let tracer = global_tracer();
let mut span = tracer.start("generate_video");
let context = extract_context_from_request(&request);
let mut span = tracer.start_with_context("generate_video", &context);
let filename = PathBuf::from(&body.path); let filename = PathBuf::from(&body.path);
@@ -283,7 +300,8 @@ async fn stream_video(
app_state: Data<AppState>, app_state: Data<AppState>,
) -> impl Responder { ) -> impl Responder {
let tracer = global::tracer("image-server"); let tracer = global::tracer("image-server");
let mut span = tracer.start("stream_video"); let context = extract_context_from_request(&request);
let mut span = tracer.start_with_context("stream_video", &context);
let playlist = &path.path; let playlist = &path.path;
debug!("Playlist: {}", playlist); debug!("Playlist: {}", playlist);
@@ -311,8 +329,9 @@ async fn get_video_part(
path: web::Path<ThumbnailRequest>, path: web::Path<ThumbnailRequest>,
app_state: Data<AppState>, app_state: Data<AppState>,
) -> impl Responder { ) -> impl Responder {
let tracer = global::tracer("image-server"); let tracer = global_tracer();
let mut span = tracer.start("get_video_part"); let context = extract_context_from_request(&request);
let mut span = tracer.start_with_context("get_video_part", &context);
let part = &path.path; let part = &path.path;
debug!("Video part: {}", part); debug!("Video part: {}", part);
@@ -337,8 +356,13 @@ async fn get_video_part(
#[get("image/favorites")] #[get("image/favorites")]
async fn favorites( async fn favorites(
claims: Claims, claims: Claims,
request: HttpRequest,
favorites_dao: Data<Mutex<Box<dyn FavoriteDao>>>, favorites_dao: Data<Mutex<Box<dyn FavoriteDao>>>,
) -> impl Responder { ) -> impl Responder {
let tracer = global_tracer();
let context = extract_context_from_request(&request);
let mut span = tracer.start_with_context("get favorites", &context);
match web::block(move || { match web::block(move || {
favorites_dao favorites_dao
.lock() .lock()
@@ -353,12 +377,14 @@ async fn favorites(
.map(|favorite| favorite.path) .map(|favorite| favorite.path)
.collect::<Vec<String>>(); .collect::<Vec<String>>();
span.set_status(Status::Ok);
HttpResponse::Ok().json(PhotosResponse { HttpResponse::Ok().json(PhotosResponse {
photos: favorites, photos: favorites,
dirs: Vec::new(), dirs: Vec::new(),
}) })
} }
Ok(Err(e)) => { Ok(Err(e)) => {
span.set_status(Status::error(format!("Error getting favorites: {:?}", e)));
error!("Error getting favorites: {:?}", e); error!("Error getting favorites: {:?}", e);
HttpResponse::InternalServerError().finish() HttpResponse::InternalServerError().finish()
} }
@@ -383,15 +409,15 @@ async fn put_add_favorite(
.await .await
{ {
Ok(Err(e)) if e.kind == DbErrorKind::AlreadyExists => { Ok(Err(e)) if e.kind == DbErrorKind::AlreadyExists => {
debug!("Favorite: {} exists for user: {}", &body.path, user_id); warn!("Favorite: {} exists for user: {}", &body.path, user_id);
HttpResponse::Ok() HttpResponse::Ok()
} }
Ok(Err(e)) => { Ok(Err(e)) => {
info!("{:?} {}. for user: {}", e, body.path, user_id); error!("{:?} {}. for user: {}", e, body.path, user_id);
HttpResponse::BadRequest() HttpResponse::BadRequest()
} }
Ok(Ok(_)) => { Ok(Ok(_)) => {
debug!("Adding favorite \"{}\" for userid: {}", body.path, user_id); info!("Adding favorite \"{}\" for userid: {}", body.path, user_id);
HttpResponse::Created() HttpResponse::Created()
} }
Err(e) => { Err(e) => {
@@ -652,7 +678,13 @@ fn watch_files() {
let ev = wrx.recv(); let ev = wrx.recv();
if let Ok(Ok(event)) = ev { if let Ok(Ok(event)) = ev {
match event.kind { match event.kind {
EventKind::Create(_) => create_thumbnails(), EventKind::Create(create_kind) => {
info!(
"Creating thumbnails {:?} create event kind: {:?}",
event.paths, create_kind
);
create_thumbnails();
}
EventKind::Modify(kind) => { EventKind::Modify(kind) => {
debug!("All modified paths: {:?}", event.paths); debug!("All modified paths: {:?}", event.paths);
debug!("Modify kind: {:?}", kind); debug!("Modify kind: {:?}", kind);

View File

@@ -1,20 +1,25 @@
use opentelemetry::global::BoxedTracer; use actix_web::http::header::HeaderMap;
use opentelemetry::{global, KeyValue}; use actix_web::HttpRequest;
use opentelemetry::global::{BoxedSpan, BoxedTracer};
use opentelemetry::propagation::TextMapPropagator;
use opentelemetry::trace::{Span, Status, Tracer};
use opentelemetry::{global, Context, KeyValue};
use opentelemetry_appender_log::OpenTelemetryLogBridge; use opentelemetry_appender_log::OpenTelemetryLogBridge;
use opentelemetry_otlp::WithExportConfig; use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::logs::{BatchLogProcessor, SdkLoggerProvider}; use opentelemetry_sdk::logs::{BatchLogProcessor, SdkLoggerProvider};
use opentelemetry_sdk::propagation::TraceContextPropagator;
use opentelemetry_sdk::Resource; use opentelemetry_sdk::Resource;
pub fn global_tracer() -> BoxedTracer { pub fn global_tracer() -> BoxedTracer {
global::tracer("image-server") global::tracer("image-server")
} }
#[allow(dead_code)]
pub fn init_tracing() { pub fn init_tracing() {
let resources = Resource::builder() let resources = Resource::builder()
.with_attributes([ .with_attributes([
KeyValue::new("service.name", "image-server"), KeyValue::new("service.name", "image-server"),
//TODO: Get this from somewhere KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
KeyValue::new("service.version", "1.0"),
]) ])
.build(); .build();
@@ -32,6 +37,7 @@ pub fn init_tracing() {
global::set_tracer_provider(tracer_provider); global::set_tracer_provider(tracer_provider);
} }
#[allow(dead_code)]
pub fn init_logs() { pub fn init_logs() {
let otlp_exporter = opentelemetry_otlp::LogExporter::builder() let otlp_exporter = opentelemetry_otlp::LogExporter::builder()
.with_tonic() .with_tonic()
@@ -44,7 +50,7 @@ pub fn init_logs() {
let resources = Resource::builder() let resources = Resource::builder()
.with_attributes([ .with_attributes([
KeyValue::new("service.name", "image-server"), KeyValue::new("service.name", "image-server"),
KeyValue::new("service.version", "1.0"), KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
]) ])
.build(); .build();
@@ -59,3 +65,48 @@ pub fn init_logs() {
//TODO: Still set this with the env? Ideally we still have a clean/simple local logger for local dev //TODO: Still set this with the env? Ideally we still have a clean/simple local logger for local dev
log::set_max_level(log::LevelFilter::Info); log::set_max_level(log::LevelFilter::Info);
} }
struct HeaderExtractor<'a>(&'a HeaderMap);
impl<'a> opentelemetry::propagation::Extractor for HeaderExtractor<'a> {
fn get(&self, key: &str) -> Option<&str> {
self.0.get(key).and_then(|v| v.to_str().ok())
}
fn keys(&self) -> Vec<&str> {
self.0.keys().map(|k| k.as_str()).collect()
}
}
pub fn extract_context_from_request(req: &HttpRequest) -> Context {
let propagator = TraceContextPropagator::new();
propagator.extract(&HeaderExtractor(req.headers()))
}
pub fn trace_db_call<F, O>(
context: &Context,
query_type: &str,
operation: &str,
func: F,
) -> anyhow::Result<O>
where
F: FnOnce(&mut BoxedSpan) -> anyhow::Result<O>,
{
let tracer = global::tracer("db");
let mut span = tracer
.span_builder(format!("db.{}.{}", query_type, operation))
.with_attributes(vec![
KeyValue::new("db.query_type", query_type.to_string().clone()),
KeyValue::new("db.operation", operation.to_string().clone()),
])
.start_with_context(&tracer, context);
let result = func(&mut span);
match &result {
Ok(_) => {
span.set_status(Status::Ok);
}
Err(e) => span.set_status(Status::error(e.to_string())),
}
result
}

View File

@@ -1,12 +1,16 @@
use crate::data::GetTagsRequest; use crate::data::GetTagsRequest;
use crate::otel::{extract_context_from_request, global_tracer, trace_db_call};
use crate::{connect, data::AddTagRequest, error::IntoHttpError, schema, Claims, ThumbnailRequest}; use crate::{connect, data::AddTagRequest, error::IntoHttpError, schema, Claims, ThumbnailRequest};
use actix_web::dev::{ServiceFactory, ServiceRequest}; use actix_web::dev::{ServiceFactory, ServiceRequest};
use actix_web::{web, App, HttpResponse, Responder}; use actix_web::{web, App, HttpRequest, HttpResponse, Responder};
use anyhow::Context; use anyhow::Context;
use chrono::Utc; use chrono::Utc;
use diesel::dsl::count_star; use diesel::dsl::count_star;
use diesel::prelude::*; use diesel::prelude::*;
use log::{debug, info, trace}; use diesel::sql_types::*;
use log::{debug, info};
use opentelemetry::trace::{Span, Status, TraceContextExt, Tracer};
use opentelemetry::KeyValue;
use schema::{tagged_photo, tags}; use schema::{tagged_photo, tags};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::borrow::BorrowMut; use std::borrow::BorrowMut;
@@ -28,48 +32,74 @@ where
async fn add_tag<D: TagDao>( async fn add_tag<D: TagDao>(
_: Claims, _: Claims,
request: HttpRequest,
body: web::Json<AddTagRequest>, body: web::Json<AddTagRequest>,
tag_dao: web::Data<Mutex<D>>, tag_dao: web::Data<Mutex<D>>,
) -> impl Responder { ) -> impl Responder {
let tracer = global_tracer();
let context = extract_context_from_request(&request);
let span = tracer.start_with_context("add_tag", &context);
let span_context = opentelemetry::Context::current_with_span(span);
let tag_name = body.tag_name.clone(); let tag_name = body.tag_name.clone();
let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao"); let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao");
tag_dao tag_dao
.get_all_tags(None) .get_all_tags(&span_context, None)
.and_then(|tags| { .and_then(|tags| {
if let Some((_, tag)) = tags.iter().find(|t| t.1.name == tag_name) { if let Some((_, tag)) = tags.iter().find(|t| t.1.name == tag_name) {
Ok(tag.clone()) Ok(tag.clone())
} else { } else {
tag_dao.create_tag(tag_name.trim()) info!(
"Creating missing tag: '{:?}' for file: '{}'",
tag_name, &body.file_name
);
tag_dao.create_tag(&span_context, tag_name.trim())
} }
}) })
.and_then(|tag| tag_dao.tag_file(&body.file_name, tag.id)) .and_then(|tag| tag_dao.tag_file(&span_context, &body.file_name, tag.id))
.map(|_| HttpResponse::Ok()) .map(|_| {
span_context.span().set_status(Status::Ok);
HttpResponse::Ok()
})
.into_http_internal_err() .into_http_internal_err()
} }
async fn get_tags<D: TagDao>( async fn get_tags<D: TagDao>(
_: Claims, _: Claims,
http_request: HttpRequest,
request: web::Query<ThumbnailRequest>, request: web::Query<ThumbnailRequest>,
tag_dao: web::Data<Mutex<D>>, tag_dao: web::Data<Mutex<D>>,
) -> impl Responder { ) -> impl Responder {
let context = extract_context_from_request(&http_request);
let span = global_tracer().start_with_context("get_tags", &context);
let span_context = opentelemetry::Context::current_with_span(span);
let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao"); let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao");
tag_dao tag_dao
.get_tags_for_path(&request.path) .get_tags_for_path(&span_context, &request.path)
.map(|tags| HttpResponse::Ok().json(tags)) .map(|tags| {
span_context.span().set_status(Status::Ok);
HttpResponse::Ok().json(tags)
})
.into_http_internal_err() .into_http_internal_err()
} }
async fn get_all_tags<D: TagDao>( async fn get_all_tags<D: TagDao>(
_: Claims, _: Claims,
tag_dao: web::Data<Mutex<D>>, tag_dao: web::Data<Mutex<D>>,
request: HttpRequest,
query: web::Query<GetTagsRequest>, query: web::Query<GetTagsRequest>,
) -> impl Responder { ) -> impl Responder {
let context = extract_context_from_request(&request);
let span = global_tracer().start_with_context("get_all_tags", &context);
let span_context = opentelemetry::Context::current_with_span(span);
let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao"); let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao");
tag_dao tag_dao
.get_all_tags(query.path.clone()) .get_all_tags(&span_context, query.path.clone())
.map(|tags| { .map(|tags| {
span_context.span().set_status(Status::Ok);
HttpResponse::Ok().json( HttpResponse::Ok().json(
tags.iter() tags.iter()
.map(|(tag_count, tag)| TagWithTagCount { .map(|(tag_count, tag)| TagWithTagCount {
@@ -84,13 +114,20 @@ async fn get_all_tags<D: TagDao>(
async fn remove_tagged_photo<D: TagDao>( async fn remove_tagged_photo<D: TagDao>(
_: Claims, _: Claims,
http_request: HttpRequest,
request: web::Json<AddTagRequest>, request: web::Json<AddTagRequest>,
tag_dao: web::Data<Mutex<D>>, tag_dao: web::Data<Mutex<D>>,
) -> impl Responder { ) -> impl Responder {
let context = extract_context_from_request(&http_request);
let span = global_tracer().start_with_context("remove_tagged_photo", &context);
let span_context = opentelemetry::Context::current_with_span(span);
let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao"); let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao");
tag_dao tag_dao
.remove_tag(&request.tag_name, &request.file_name) .remove_tag(&span_context, &request.tag_name, &request.file_name)
.map(|result| { .map(|result| {
span_context.span().set_status(Status::Ok);
if result.is_some() { if result.is_some() {
HttpResponse::Ok() HttpResponse::Ok()
} else { } else {
@@ -103,12 +140,19 @@ async fn remove_tagged_photo<D: TagDao>(
async fn update_tags<D: TagDao>( async fn update_tags<D: TagDao>(
_: Claims, _: Claims,
tag_dao: web::Data<Mutex<D>>, tag_dao: web::Data<Mutex<D>>,
http_request: HttpRequest,
request: web::Json<AddTagsRequest>, request: web::Json<AddTagsRequest>,
) -> impl Responder { ) -> impl Responder {
let mut dao = tag_dao.lock().expect("Unable to get TagDao"); let mut dao = tag_dao.lock().expect("Unable to get TagDao");
let context = extract_context_from_request(&http_request);
let span = global_tracer().start_with_context("update_tags", &context);
let span_context = opentelemetry::Context::current_with_span(span);
dao.get_tags_for_path(&request.file_name) dao.get_tags_for_path(&span_context, &request.file_name)
.and_then(|existing_tags| dao.get_all_tags(None).map(|all| (existing_tags, all))) .and_then(|existing_tags| {
dao.get_all_tags(&span_context, None)
.map(|all| (existing_tags, all))
})
.map(|(existing_tags, all_tags)| { .map(|(existing_tags, all_tags)| {
let tags_to_remove = existing_tags let tags_to_remove = existing_tags
.iter() .iter()
@@ -120,7 +164,7 @@ async fn update_tags<D: TagDao>(
"Removing tag {:?} from file: {:?}", "Removing tag {:?} from file: {:?}",
tag.name, request.file_name tag.name, request.file_name
); );
dao.remove_tag(&tag.name, &request.file_name) dao.remove_tag(&span_context, &tag.name, &request.file_name)
.unwrap_or_else(|err| panic!("{:?} Unable to remove tag {:?}", err, &tag.name)); .unwrap_or_else(|err| panic!("{:?} Unable to remove tag {:?}", err, &tag.name));
} }
@@ -135,7 +179,7 @@ async fn update_tags<D: TagDao>(
new_tag.name, request.file_name new_tag.name, request.file_name
); );
dao.tag_file(&request.file_name, new_tag.id) dao.tag_file(&span_context, &request.file_name, new_tag.id)
.with_context(|| { .with_context(|| {
format!( format!(
"Unable to tag file {:?} with tag: {:?}", "Unable to tag file {:?} with tag: {:?}",
@@ -145,6 +189,7 @@ async fn update_tags<D: TagDao>(
.unwrap(); .unwrap();
} }
span_context.span().set_status(Status::Ok);
HttpResponse::Ok() HttpResponse::Ok()
}) })
.into_http_internal_err() .into_http_internal_err()
@@ -193,20 +238,40 @@ pub struct AddTagsRequest {
} }
pub trait TagDao { pub trait TagDao {
fn get_all_tags(&mut self, path: Option<String>) -> anyhow::Result<Vec<(i64, Tag)>>; fn get_all_tags(
fn get_tags_for_path(&mut self, path: &str) -> anyhow::Result<Vec<Tag>>; &mut self,
fn create_tag(&mut self, name: &str) -> anyhow::Result<Tag>; context: &opentelemetry::Context,
fn remove_tag(&mut self, tag_name: &str, path: &str) -> anyhow::Result<Option<()>>; path: Option<String>,
fn tag_file(&mut self, path: &str, tag_id: i32) -> anyhow::Result<TaggedPhoto>; ) -> anyhow::Result<Vec<(i64, Tag)>>;
fn get_tags_for_path(
&mut self,
context: &opentelemetry::Context,
path: &str,
) -> anyhow::Result<Vec<Tag>>;
fn create_tag(&mut self, context: &opentelemetry::Context, name: &str) -> anyhow::Result<Tag>;
fn remove_tag(
&mut self,
context: &opentelemetry::Context,
tag_name: &str,
path: &str,
) -> anyhow::Result<Option<()>>;
fn tag_file(
&mut self,
context: &opentelemetry::Context,
path: &str,
tag_id: i32,
) -> anyhow::Result<TaggedPhoto>;
fn get_files_with_all_tag_ids( fn get_files_with_all_tag_ids(
&mut self, &mut self,
tag_ids: Vec<i32>, tag_ids: Vec<i32>,
exclude_tag_ids: Vec<i32>, exclude_tag_ids: Vec<i32>,
context: &opentelemetry::Context,
) -> anyhow::Result<Vec<FileWithTagCount>>; ) -> anyhow::Result<Vec<FileWithTagCount>>;
fn get_files_with_any_tag_ids( fn get_files_with_any_tag_ids(
&mut self, &mut self,
tag_ids: Vec<i32>, tag_ids: Vec<i32>,
exclude_tag_ids: Vec<i32>, exclude_tag_ids: Vec<i32>,
context: &opentelemetry::Context,
) -> anyhow::Result<Vec<FileWithTagCount>>; ) -> anyhow::Result<Vec<FileWithTagCount>>;
} }
@@ -227,193 +292,261 @@ impl Default for SqliteTagDao {
} }
impl TagDao for SqliteTagDao { impl TagDao for SqliteTagDao {
fn get_all_tags(&mut self, path: Option<String>) -> anyhow::Result<Vec<(i64, Tag)>> { fn get_all_tags(
&mut self,
context: &opentelemetry::Context,
path: Option<String>,
) -> anyhow::Result<Vec<(i64, Tag)>> {
// select name, count(*) from tags join tagged_photo ON tags.id = tagged_photo.tag_id GROUP BY tags.name ORDER BY COUNT(*); // select name, count(*) from tags join tagged_photo ON tags.id = tagged_photo.tag_id GROUP BY tags.name ORDER BY COUNT(*);
let path = path.map(|p| p + "%").unwrap_or("%".to_string()); trace_db_call(&context, "query", "get_all_tags", |span| {
let (id, name, created_time) = tags::all_columns; span.set_attribute(KeyValue::new("path", path.clone().unwrap_or_default()));
tags::table
.inner_join(tagged_photo::table) let path = path.map(|p| p + "%").unwrap_or("%".to_string());
.group_by(tags::id) let (id, name, created_time) = tags::all_columns;
.select((count_star(), id, name, created_time)) tags::table
.filter(tagged_photo::photo_name.like(path)) .inner_join(tagged_photo::table)
.get_results(&mut self.connection) .group_by(tags::id)
.map::<Vec<(i64, Tag)>, _>(|tags_with_count: Vec<(i64, i32, String, i64)>| { .select((count_star(), id, name, created_time))
tags_with_count .filter(tagged_photo::photo_name.like(path))
.iter() .get_results(&mut self.connection)
.map(|tup| { .map::<Vec<(i64, Tag)>, _>(|tags_with_count: Vec<(i64, i32, String, i64)>| {
( tags_with_count
tup.0, .iter()
Tag { .map(|tup| {
id: tup.1, (
name: tup.2.clone(), tup.0,
created_time: tup.3, Tag {
}, id: tup.1,
name: tup.2.clone(),
created_time: tup.3,
},
)
})
.collect()
})
.with_context(|| "Unable to get all tags")
})
}
fn get_tags_for_path(
&mut self,
context: &opentelemetry::Context,
path: &str,
) -> anyhow::Result<Vec<Tag>> {
trace_db_call(&context, "query", "get_tags_for_path", |span| {
span.set_attribute(KeyValue::new("path", path.to_string()));
debug!("Getting Tags for path: {:?}", path);
tags::table
.left_join(tagged_photo::table)
.filter(tagged_photo::photo_name.eq(&path))
.select((tags::id, tags::name, tags::created_time))
.get_results::<Tag>(self.connection.borrow_mut())
.with_context(|| "Unable to get tags from Sqlite")
})
}
fn create_tag(&mut self, context: &opentelemetry::Context, name: &str) -> anyhow::Result<Tag> {
trace_db_call(&context, "insert", "create_tag", |span| {
span.set_attribute(KeyValue::new("name", name.to_string()));
diesel::insert_into(tags::table)
.values(InsertTag {
name: name.to_string(),
created_time: Utc::now().timestamp(),
})
.execute(&mut self.connection)
.with_context(|| format!("Unable to insert tag {:?} in Sqlite", name))
.and_then(|_| {
info!("Inserted tag: {:?}", name);
define_sql_function! {
fn last_insert_rowid() -> Integer;
}
diesel::select(last_insert_rowid())
.get_result::<i32>(&mut self.connection)
.with_context(|| "Unable to get last inserted tag from Sqlite")
})
.and_then(|id| {
debug!("Got id: {:?} for inserted tag: {:?}", id, name);
tags::table
.filter(tags::id.eq(id))
.select((tags::id, tags::name, tags::created_time))
.get_result::<Tag>(self.connection.borrow_mut())
.with_context(|| {
format!("Unable to get tagged photo with id: {:?} from Sqlite", id)
})
})
})
}
fn remove_tag(
&mut self,
context: &opentelemetry::Context,
tag_name: &str,
path: &str,
) -> anyhow::Result<Option<()>> {
trace_db_call(&context, "delete", "remove_tag", |span| {
span.set_attributes(vec![
KeyValue::new("tag_name", tag_name.to_string()),
KeyValue::new("path", path.to_string()),
]);
tags::table
.filter(tags::name.eq(tag_name))
.get_result::<Tag>(self.connection.borrow_mut())
.optional()
.with_context(|| format!("Unable to get tag '{}'", tag_name))
.and_then(|tag| {
if let Some(tag) = tag {
diesel::delete(
tagged_photo::table
.filter(tagged_photo::tag_id.eq(tag.id))
.filter(tagged_photo::photo_name.eq(path)),
) )
}) .execute(&mut self.connection)
.collect() .with_context(|| format!("Unable to delete tag: '{}'", &tag.name))
}) .map(|_| Some(()))
.with_context(|| "Unable to get all tags") } else {
info!("No tag found with name '{}'", tag_name);
Ok(None)
}
})
})
} }
fn get_tags_for_path(&mut self, path: &str) -> anyhow::Result<Vec<Tag>> { fn tag_file(
trace!("Getting Tags for path: {:?}", path); &mut self,
tags::table context: &opentelemetry::Context,
.left_join(tagged_photo::table) path: &str,
.filter(tagged_photo::photo_name.eq(&path)) tag_id: i32,
.select((tags::id, tags::name, tags::created_time)) ) -> anyhow::Result<TaggedPhoto> {
.get_results::<Tag>(self.connection.borrow_mut()) trace_db_call(&context, "insert", "tag_file", |span| {
.with_context(|| "Unable to get tags from Sqlite") span.set_attributes(vec![
} KeyValue::new("path", path.to_string()),
KeyValue::new("tag_id", tag_id.to_string()),
]);
fn create_tag(&mut self, name: &str) -> anyhow::Result<Tag> { diesel::insert_into(tagged_photo::table)
diesel::insert_into(tags::table) .values(InsertTaggedPhoto {
.values(InsertTag { tag_id,
name: name.to_string(), photo_name: path.to_string(),
created_time: Utc::now().timestamp(), created_time: Utc::now().timestamp(),
}) })
.execute(&mut self.connection) .execute(self.connection.borrow_mut())
.with_context(|| format!("Unable to insert tag {:?} in Sqlite", name)) .with_context(|| format!("Unable to tag file {:?} in sqlite", path))
.and_then(|_| { .and_then(|_| {
info!("Inserted tag: {:?}", name); info!("Inserted tagged photo: {:#} -> {:?}", tag_id, path);
define_sql_function! { define_sql_function! {
fn last_insert_rowid() -> Integer; fn last_insert_rowid() -> diesel::sql_types::Integer;
} }
diesel::select(last_insert_rowid()) diesel::select(last_insert_rowid())
.get_result::<i32>(&mut self.connection) .get_result::<i32>(&mut self.connection)
.with_context(|| "Unable to get last inserted tag from Sqlite") .with_context(|| "Unable to get last inserted tag from Sqlite")
}) })
.and_then(|id| { .and_then(|tagged_id| {
debug!("Got id: {:?} for inserted tag: {:?}", id, name); tagged_photo::table
tags::table .find(tagged_id)
.filter(tags::id.eq(id)) .first(self.connection.borrow_mut())
.select((tags::id, tags::name, tags::created_time)) .with_context(|| {
.get_result::<Tag>(self.connection.borrow_mut()) format!(
.with_context(|| { "Error getting inserted tagged photo with id: {:?}",
format!("Unable to get tagged photo with id: {:?} from Sqlite", id) tagged_id
}) )
}) })
} })
})
fn remove_tag(&mut self, tag_name: &str, path: &str) -> anyhow::Result<Option<()>> {
tags::table
.filter(tags::name.eq(tag_name))
.get_result::<Tag>(self.connection.borrow_mut())
.optional()
.with_context(|| format!("Unable to get tag '{}'", tag_name))
.and_then(|tag| {
if let Some(tag) = tag {
diesel::delete(
tagged_photo::table
.filter(tagged_photo::tag_id.eq(tag.id))
.filter(tagged_photo::photo_name.eq(path)),
)
.execute(&mut self.connection)
.with_context(|| format!("Unable to delete tag: '{}'", &tag.name))
.map(|_| Some(()))
} else {
info!("No tag found with name '{}'", tag_name);
Ok(None)
}
})
}
fn tag_file(&mut self, path: &str, tag_id: i32) -> anyhow::Result<TaggedPhoto> {
diesel::insert_into(tagged_photo::table)
.values(InsertTaggedPhoto {
tag_id,
photo_name: path.to_string(),
created_time: Utc::now().timestamp(),
})
.execute(self.connection.borrow_mut())
.with_context(|| format!("Unable to tag file {:?} in sqlite", path))
.and_then(|_| {
info!("Inserted tagged photo: {:#} -> {:?}", tag_id, path);
define_sql_function! {
fn last_insert_rowid() -> diesel::sql_types::Integer;
}
diesel::select(last_insert_rowid())
.get_result::<i32>(&mut self.connection)
.with_context(|| "Unable to get last inserted tag from Sqlite")
})
.and_then(|tagged_id| {
tagged_photo::table
.find(tagged_id)
.first(self.connection.borrow_mut())
.with_context(|| {
format!(
"Error getting inserted tagged photo with id: {:?}",
tagged_id
)
})
})
} }
fn get_files_with_all_tag_ids( fn get_files_with_all_tag_ids(
&mut self, &mut self,
tag_ids: Vec<i32>, tag_ids: Vec<i32>,
exclude_tag_ids: Vec<i32>, exclude_tag_ids: Vec<i32>,
context: &opentelemetry::Context,
) -> anyhow::Result<Vec<FileWithTagCount>> { ) -> anyhow::Result<Vec<FileWithTagCount>> {
use diesel::dsl::*; trace_db_call(&context, "query", "get_files_with_all_tags", |_| {
use diesel::dsl::*;
let exclude_subquery = tagged_photo::table let exclude_subquery = tagged_photo::table
.filter(tagged_photo::tag_id.eq_any(exclude_tag_ids.clone())) .filter(tagged_photo::tag_id.eq_any(exclude_tag_ids.clone()))
.select(tagged_photo::photo_name) .select(tagged_photo::photo_name)
.into_boxed(); .into_boxed();
tagged_photo::table tagged_photo::table
.filter(tagged_photo::tag_id.eq_any(tag_ids.clone())) .filter(tagged_photo::tag_id.eq_any(tag_ids.clone()))
.filter(tagged_photo::photo_name.ne_all(exclude_subquery)) .filter(tagged_photo::photo_name.ne_all(exclude_subquery))
.group_by(tagged_photo::photo_name) .group_by(tagged_photo::photo_name)
.select(( .select((
tagged_photo::photo_name, tagged_photo::photo_name,
count_distinct(tagged_photo::tag_id), count_distinct(tagged_photo::tag_id),
)) ))
.having(count_distinct(tagged_photo::tag_id).ge(tag_ids.len() as i64)) .having(count_distinct(tagged_photo::tag_id).ge(tag_ids.len() as i64))
.get_results::<(String, i64)>(&mut self.connection) .get_results::<(String, i64)>(&mut self.connection)
.map(|results| { .map(|results| {
results results
.into_iter() .into_iter()
.map(|(file_name, tag_count)| FileWithTagCount { .map(|(file_name, tag_count)| FileWithTagCount {
file_name, file_name,
tag_count, tag_count,
}) })
.collect() .collect()
}) })
.with_context(|| format!("Unable to get Tagged photos with ids: {:?}", tag_ids)) .with_context(|| format!("Unable to get Tagged photos with ids: {:?}", tag_ids))
})
} }
fn get_files_with_any_tag_ids( fn get_files_with_any_tag_ids(
&mut self, &mut self,
tag_ids: Vec<i32>, tag_ids: Vec<i32>,
exclude_tag_ids: Vec<i32>, exclude_tag_ids: Vec<i32>,
context: &opentelemetry::Context,
) -> anyhow::Result<Vec<FileWithTagCount>> { ) -> anyhow::Result<Vec<FileWithTagCount>> {
use diesel::dsl::*; trace_db_call(&context, "query", "get_files_with_any_tags", |_| {
use diesel::dsl::*;
// Create the placeholders for the IN clauses
let tag_placeholders = std::iter::repeat("?")
.take(tag_ids.len())
.collect::<Vec<_>>()
.join(",");
let exclude_placeholders = std::iter::repeat("?")
.take(exclude_tag_ids.len())
.collect::<Vec<_>>()
.join(",");
let exclude_subquery = tagged_photo::table let query = sql_query(format!(
.filter(tagged_photo::tag_id.eq_any(exclude_tag_ids.clone())) r#"
.select(tagged_photo::photo_name) WITH filtered_photos AS (
SELECT DISTINCT photo_name
FROM tagged_photo tp
WHERE tp.tag_id IN ({})
AND tp.photo_name NOT IN (
SELECT photo_name
FROM tagged_photo
WHERE tag_id IN ({})
)
)
SELECT
fp.photo_name as file_name,
COUNT(DISTINCT tp2.tag_id) as tag_count
FROM filtered_photos fp
JOIN tagged_photo tp2 ON fp.photo_name = tp2.photo_name
GROUP BY fp.photo_name"#,
tag_placeholders, exclude_placeholders
))
.into_boxed(); .into_boxed();
tagged_photo::table // Bind all parameters
.filter(tagged_photo::tag_id.eq_any(tag_ids.clone())) let query = tag_ids
.filter(tagged_photo::photo_name.ne_all(exclude_subquery)) .into_iter()
.group_by(tagged_photo::photo_name) .fold(query, |q, id| q.bind::<Integer, _>(id));
.select(( let query = exclude_tag_ids
tagged_photo::photo_name, .into_iter()
count_distinct(tagged_photo::tag_id), .fold(query, |q, id| q.bind::<Integer, _>(id));
))
.get_results::<(String, i64)>(&mut self.connection) query
.map(|results| { .load::<FileWithTagCount>(&mut self.connection)
results .with_context(|| "Unable to get tagged photos")
.into_iter() })
.map(|(file_name, tag_count)| FileWithTagCount {
file_name,
tag_count,
})
.collect()
})
.with_context(|| format!("Unable to get Tagged photos with ids: {:?}", tag_ids))
} }
} }
@@ -444,7 +577,11 @@ mod tests {
} }
impl TagDao for TestTagDao { impl TagDao for TestTagDao {
fn get_all_tags(&mut self, _option: Option<String>) -> anyhow::Result<Vec<(i64, Tag)>> { fn get_all_tags(
&mut self,
context: &opentelemetry::Context,
_option: Option<String>,
) -> anyhow::Result<Vec<(i64, Tag)>> {
Ok(self Ok(self
.tags .tags
.borrow() .borrow()
@@ -454,7 +591,11 @@ mod tests {
.clone()) .clone())
} }
fn get_tags_for_path(&mut self, path: &str) -> anyhow::Result<Vec<Tag>> { fn get_tags_for_path(
&mut self,
context: &opentelemetry::Context,
path: &str,
) -> anyhow::Result<Vec<Tag>> {
info!("Getting test tags for: {:?}", path); info!("Getting test tags for: {:?}", path);
warn!("Tags for path: {:?}", self.tagged_photos); warn!("Tags for path: {:?}", self.tagged_photos);
@@ -466,7 +607,11 @@ mod tests {
.clone()) .clone())
} }
fn create_tag(&mut self, name: &str) -> anyhow::Result<Tag> { fn create_tag(
&mut self,
context: &opentelemetry::Context,
name: &str,
) -> anyhow::Result<Tag> {
self.tag_count += 1; self.tag_count += 1;
let tag_id = self.tag_count; let tag_id = self.tag_count;
@@ -482,7 +627,12 @@ mod tests {
Ok(tag) Ok(tag)
} }
fn remove_tag(&mut self, tag_name: &str, path: &str) -> anyhow::Result<Option<()>> { fn remove_tag(
&mut self,
context: &opentelemetry::Context,
tag_name: &str,
path: &str,
) -> anyhow::Result<Option<()>> {
let mut clone = { let mut clone = {
let photo_tags = &self.tagged_photos.borrow()[path]; let photo_tags = &self.tagged_photos.borrow()[path];
photo_tags.clone() photo_tags.clone()
@@ -502,7 +652,12 @@ mod tests {
} }
} }
fn tag_file(&mut self, path: &str, tag_id: i32) -> anyhow::Result<TaggedPhoto> { fn tag_file(
&mut self,
context: &opentelemetry::Context,
path: &str,
tag_id: i32,
) -> anyhow::Result<TaggedPhoto> {
debug!("Tagging file: {:?} with tag_id: {:?}", path, tag_id); debug!("Tagging file: {:?} with tag_id: {:?}", path, tag_id);
if let Some(tag) = self.tags.borrow().iter().find(|t| t.id == tag_id) { if let Some(tag) = self.tags.borrow().iter().find(|t| t.id == tag_id) {
@@ -538,15 +693,17 @@ mod tests {
fn get_files_with_all_tag_ids( fn get_files_with_all_tag_ids(
&mut self, &mut self,
tag_ids: Vec<i32>, tag_ids: Vec<i32>,
_exclude_tag_ids: Vec<i32>, exclude_tag_ids: Vec<i32>,
context: &opentelemetry::Context,
) -> anyhow::Result<Vec<FileWithTagCount>> { ) -> anyhow::Result<Vec<FileWithTagCount>> {
todo!() todo!()
} }
fn get_files_with_any_tag_ids( fn get_files_with_any_tag_ids(
&mut self, &mut self,
_tag_ids: Vec<i32>, tag_ids: Vec<i32>,
_exclude_tag_ids: Vec<i32>, exclude_tag_ids: Vec<i32>,
context: &opentelemetry::Context,
) -> anyhow::Result<Vec<FileWithTagCount>> { ) -> anyhow::Result<Vec<FileWithTagCount>> {
todo!() todo!()
} }
@@ -629,9 +786,10 @@ mod tests {
); );
} }
} }
#[derive(QueryableByName, Debug, Clone)]
#[derive(Debug, Clone)] pub(crate) struct FileWithTagCount {
pub struct FileWithTagCount { #[diesel(sql_type = Text)]
pub file_name: String, pub(crate) file_name: String,
pub tag_count: i64, #[diesel(sql_type = BigInt)]
pub(crate) tag_count: i64,
} }

View File

@@ -1,7 +1,10 @@
use crate::is_video; use crate::is_video;
use crate::otel::global_tracer;
use actix::prelude::*; use actix::prelude::*;
use futures::TryFutureExt; use futures::TryFutureExt;
use log::{debug, error, info, trace, warn}; use log::{debug, error, info, trace, warn};
use opentelemetry::trace::{Span, Status, Tracer};
use opentelemetry::KeyValue;
use std::io::Result; use std::io::Result;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::process::{Child, Command, ExitStatus, Stdio}; use std::process::{Child, Command, ExitStatus, Stdio};
@@ -122,6 +125,9 @@ impl Handler<ScanDirectoryMessage> for VideoPlaylistManager {
type Result = ResponseFuture<()>; type Result = ResponseFuture<()>;
fn handle(&mut self, msg: ScanDirectoryMessage, _ctx: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: ScanDirectoryMessage, _ctx: &mut Self::Context) -> Self::Result {
let tracer = global_tracer();
let mut span = tracer.start("videoplaylistmanager.scan_directory");
let start = std::time::Instant::now(); let start = std::time::Instant::now();
info!( info!(
"Starting scan directory for video playlist generation: {}", "Starting scan directory for video playlist generation: {}",
@@ -157,6 +163,11 @@ impl Handler<ScanDirectoryMessage> for VideoPlaylistManager {
.expect("Failed to send generate playlist message") .expect("Failed to send generate playlist message")
{ {
Ok(_) => { Ok(_) => {
span.add_event(
"Playlist generated",
vec![KeyValue::new("video_path", path_as_str.to_string())],
);
debug!( debug!(
"Successfully generated playlist for file: '{}'", "Successfully generated playlist for file: '{}'",
path_as_str path_as_str
@@ -168,6 +179,10 @@ impl Handler<ScanDirectoryMessage> for VideoPlaylistManager {
} }
} }
span.add_event(
"Finished directory scan",
vec![KeyValue::new("directory", scan_dir_name.to_string())],
);
info!( info!(
"Finished directory scan of '{}' in {:?}", "Finished directory scan of '{}' in {:?}",
scan_dir_name, scan_dir_name,
@@ -219,6 +234,16 @@ impl Handler<GeneratePlaylistMessage> for PlaylistGenerator {
playlist_path, playlist_path,
msg.video_path.file_name().unwrap().to_str().unwrap() msg.video_path.file_name().unwrap().to_str().unwrap()
); );
let tracer = global_tracer();
let mut span = tracer
.span_builder("playlistgenerator.generate_playlist")
.with_attributes(vec![
KeyValue::new("video_file", video_file.clone()),
KeyValue::new("playlist_file", playlist_file.clone()),
])
.start(&tracer);
Box::pin(async move { Box::pin(async move {
let wait_start = std::time::Instant::now(); let wait_start = std::time::Instant::now();
let permit = semaphore let permit = semaphore
@@ -230,9 +255,20 @@ impl Handler<GeneratePlaylistMessage> for PlaylistGenerator {
"Waited for {:?} before starting ffmpeg", "Waited for {:?} before starting ffmpeg",
wait_start.elapsed() wait_start.elapsed()
); );
span.add_event(
"Waited for FFMPEG semaphore",
vec![KeyValue::new(
"wait_time",
wait_start.elapsed().as_secs_f64(),
)],
);
if Path::new(&playlist_file).exists() { if Path::new(&playlist_file).exists() {
debug!("Playlist already exists: {}", playlist_file); debug!("Playlist already exists: {}", playlist_file);
span.set_status(Status::error(format!(
"Playlist already exists: {}",
playlist_file
)));
return Err(std::io::Error::from(std::io::ErrorKind::AlreadyExists)); return Err(std::io::Error::from(std::io::ErrorKind::AlreadyExists));
} }
@@ -267,6 +303,8 @@ impl Handler<GeneratePlaylistMessage> for PlaylistGenerator {
debug!("ffmpeg output: {:?}", res); debug!("ffmpeg output: {:?}", res);
} }
span.set_status(Status::Ok);
ffmpeg_result ffmpeg_result
}); });