From 24d2123fc2f1d360b491e209400282a347c016da Mon Sep 17 00:00:00 2001 From: Cameron Date: Sun, 18 May 2025 19:57:16 -0400 Subject: [PATCH 01/11] Fix recursive-any tag counting This is bad security wise so it'll need another pass. --- Cargo.lock | 23 ++---------------- Cargo.toml | 4 +-- src/tags.rs | 70 ++++++++++++++++++++++++++++++++--------------------- 3 files changed, 45 insertions(+), 52 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9f8bb85..71d6d3e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -908,9 +908,9 @@ dependencies = [ [[package]] name = "diesel" -version = "2.2.5" +version = "2.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbf9649c05e0a9dbd6d0b0b8301db5182b972d0fd02f0a7c6736cf632d7c0fd5" +checksum = "ff3e1edb1f37b4953dd5176916347289ed43d7119cc2e6c7c3f7849ff44ea506" dependencies = [ "diesel_derives", "libsqlite3-sys", @@ -1624,7 +1624,6 @@ dependencies = [ "opentelemetry", "opentelemetry-appender-log", "opentelemetry-otlp", - "opentelemetry-resource-detectors", "opentelemetry-stdout", "opentelemetry_sdk", "path-absolutize", @@ -1633,7 +1632,6 @@ dependencies = [ "rayon", "serde", "serde_json", - "tempfile", "tokio", "walkdir", ] @@ -2176,23 +2174,6 @@ dependencies = [ "tonic", ] -[[package]] -name = "opentelemetry-resource-detectors" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b0cd3cf373f6f7f3a8f25a189acf1300c8b87e85f7959b45ba83c01e305f5cc3" -dependencies = [ - "opentelemetry", - "opentelemetry-semantic-conventions", - "opentelemetry_sdk", -] - -[[package]] -name = "opentelemetry-semantic-conventions" -version = "0.28.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2fb3a2f78c2d55362cd6c313b8abedfbc0142ab3c2676822068fd2ab7d51f9b7" - [[package]] name = "opentelemetry-stdout" version = "0.28.0" diff --git a/Cargo.toml b/Cargo.toml index f4a0ecd..7c98e1d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ futures = "0.3.5" jsonwebtoken = "9.3.0" serde = "1" serde_json = "1" -diesel = { version = "2.2.5", features = ["sqlite"] } +diesel = { version = "2.2.10", features = ["sqlite"] } diesel_migrations = "2.2.0" chrono = "0.4" dotenv = "0.15" @@ -37,10 +37,8 @@ prometheus = "0.13" lazy_static = "1.5" anyhow = "1.0" rand = "0.8.5" -tempfile = "3.14.0" opentelemetry = { version = "0.28.0", features = ["default", "metrics", "tracing"] } opentelemetry_sdk = { version = "0.28.0", features = ["default", "rt-tokio-current-thread", "tracing", "metrics"] } opentelemetry-otlp = { version = "0.28.0", features = ["default", "metrics", "tracing", "grpc-tonic"] } opentelemetry-stdout = "0.28.0" opentelemetry-appender-log = "0.28.0" -opentelemetry-resource-detectors = "0.7.0" diff --git a/src/tags.rs b/src/tags.rs index 0339dfd..79632f4 100644 --- a/src/tags.rs +++ b/src/tags.rs @@ -6,6 +6,7 @@ use anyhow::Context; use chrono::Utc; use diesel::dsl::count_star; use diesel::prelude::*; +use diesel::sql_types::*; use log::{debug, info, trace}; use schema::{tagged_photo, tags}; use serde::{Deserialize, Serialize}; @@ -390,30 +391,42 @@ impl TagDao for SqliteTagDao { ) -> anyhow::Result> { use diesel::dsl::*; - let exclude_subquery = tagged_photo::table - .filter(tagged_photo::tag_id.eq_any(exclude_tag_ids.clone())) - .select(tagged_photo::photo_name) - .into_boxed(); + let tag_ids_str = tag_ids + .iter() + .map(|id| id.to_string()) + .collect::>() + .join(","); - tagged_photo::table - .filter(tagged_photo::tag_id.eq_any(tag_ids.clone())) - .filter(tagged_photo::photo_name.ne_all(exclude_subquery)) - .group_by(tagged_photo::photo_name) - .select(( - tagged_photo::photo_name, - count_distinct(tagged_photo::tag_id), - )) - .get_results::<(String, i64)>(&mut self.connection) - .map(|results| { - results - .into_iter() - .map(|(file_name, tag_count)| FileWithTagCount { - file_name, - tag_count, - }) - .collect() - }) - .with_context(|| format!("Unable to get Tagged photos with ids: {:?}", tag_ids)) + let exclude_tag_ids_str = exclude_tag_ids + .iter() + .map(|id| id.to_string()) + .collect::>() + .join(","); + + let query = sql_query(format!( + r#" +WITH filtered_photos AS ( + SELECT DISTINCT photo_name + FROM tagged_photo tp + WHERE tp.tag_id IN ({}) + AND tp.photo_name NOT IN ( + SELECT photo_name + FROM tagged_photo + WHERE tag_id IN ({}) + ) + ) + SELECT + fp.photo_name as file_name, + COUNT(DISTINCT tp2.tag_id) as tag_count + FROM filtered_photos fp + JOIN tagged_photo tp2 ON fp.photo_name = tp2.photo_name + GROUP BY fp.photo_name"#, + tag_ids_str, exclude_tag_ids_str + )); + + // Execute the query: + let results = query.load::(&mut self.connection)?; + Ok(results) } } @@ -629,9 +642,10 @@ mod tests { ); } } - -#[derive(Debug, Clone)] -pub struct FileWithTagCount { - pub file_name: String, - pub tag_count: i64, +#[derive(QueryableByName, Debug, Clone)] +pub(crate) struct FileWithTagCount { + #[diesel(sql_type = Text)] + pub(crate) file_name: String, + #[diesel(sql_type = BigInt)] + pub(crate) tag_count: i64, } -- 2.49.1 From d37deb36fe4c55e7924bb098e173f190e7aa5e06 Mon Sep 17 00:00:00 2001 From: Cameron Date: Fri, 23 May 2025 14:51:54 -0400 Subject: [PATCH 02/11] Additional Otel logging and spans --- src/files.rs | 23 ++++++++++++++++++++++- src/main.rs | 16 ++++++++++++---- src/otel.rs | 3 +-- 3 files changed, 35 insertions(+), 7 deletions(-) diff --git a/src/files.rs b/src/files.rs index bc9cff6..79fda03 100644 --- a/src/files.rs +++ b/src/files.rs @@ -15,7 +15,8 @@ use actix_web::{ HttpResponse, }; use log::{debug, error, info, trace}; - +use opentelemetry::KeyValue; +use opentelemetry::trace::{Span, Status, Tracer}; use crate::data::{Claims, FilesRequest, FilterMode, PhotosResponse, SortType}; use crate::{create_thumbnails, AppState}; @@ -27,6 +28,7 @@ use path_absolutize::*; use rand::prelude::SliceRandom; use rand::thread_rng; use serde::Deserialize; +use crate::otel::global_tracer; pub async fn list_photos( _: Claims, @@ -36,6 +38,10 @@ pub async fn list_photos( tag_dao: web::Data>, ) -> HttpResponse { let search_path = &req.path; + + let tracer = global_tracer(); + let mut span = tracer.start("list_photos"); + span.set_attribute(KeyValue::new("path", search_path.to_string())); let search_recursively = req.recursive.unwrap_or(false); if let Some(tag_ids) = &req.tag_ids { @@ -99,6 +105,8 @@ pub async fn list_photos( tagged_files.len(), tagged_files ); + span.set_attribute(KeyValue::new("file_count", tagged_files.len().to_string())); + span.set_status(Status::Ok); HttpResponse::Ok().json(PhotosResponse { photos: tagged_files, @@ -190,12 +198,16 @@ pub async fn list_photos( .map(|f| f.to_str().unwrap().to_string()) .collect::>(); + span.set_attribute(KeyValue::new("file_count", files.len().to_string())); + span.set_status(Status::Ok); + HttpResponse::Ok().json(PhotosResponse { photos: response_files, dirs, }) } else { error!("Bad photos request: {}", req.path); + span.set_status(Status::error("Invalid path")); HttpResponse::BadRequest().finish() } } @@ -224,12 +236,18 @@ fn sort(mut files: Vec, sort_type: SortType) -> Vec { } pub fn list_files(dir: &Path) -> io::Result> { + let tracer = global_tracer(); + let mut span = tracer.start("list_files"); + let dir_name_string = dir.to_str().unwrap_or_default().to_string(); + span.set_attribute(KeyValue::new("dir", dir_name_string)); + let files = read_dir(dir)? .filter_map(|res| res.ok()) .filter(|entry| is_image_or_video(&entry.path()) || entry.file_type().unwrap().is_dir()) .map(|entry| entry.path()) .collect::>(); + span.set_attribute(KeyValue::new("file_count", files.len().to_string())); Ok(files) } @@ -372,6 +390,7 @@ impl FileSystemAccess for RealFileSystem { } fn move_file>(&self, from: P, destination: P) -> anyhow::Result<()> { + info!("Moving file: '{:?}' -> '{:?}'", from.as_ref(), destination.as_ref()); let name = from .as_ref() .file_name() @@ -393,6 +412,8 @@ impl Handler for StreamActor { type Result = (); fn handle(&mut self, _msg: RefreshThumbnailsMessage, _ctx: &mut Self::Context) -> Self::Result { + let tracer = global_tracer(); + let _ = tracer.start("RefreshThumbnailsMessage"); info!("Refreshing thumbnails after upload"); create_thumbnails() } diff --git a/src/main.rs b/src/main.rs index 5aeaa1f..5dbf5c2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -83,6 +83,9 @@ async fn get_image( req: web::Query, app_state: Data, ) -> impl Responder { + let tracer = global_tracer(); + let mut span = tracer.start("get_image"); + if let Some(path) = is_valid_full_path(&app_state.base_path, &req.path, false) { let image_size = req.size.unwrap_or(PhotoSize::Full); if image_size == PhotoSize::Thumb { @@ -95,16 +98,21 @@ async fn get_image( trace!("Thumbnail path: {:?}", thumb_path); if let Ok(file) = NamedFile::open(&thumb_path) { + span.set_status(Status::Ok); file.into_response(&request) } else { + span.set_status(Status::error("Not found")); HttpResponse::NotFound().finish() } } else if let Ok(file) = NamedFile::open(path) { + span.set_status(Status::Ok); file.into_response(&request) } else { + span.set_status(Status::error("Not found")); HttpResponse::NotFound().finish() } } else { + span.set_status(Status::error("Bad photos request")); error!("Bad photos request: {}", req.path); HttpResponse::BadRequest().finish() } @@ -309,7 +317,7 @@ async fn get_video_part( path: web::Path, app_state: Data, ) -> impl Responder { - let tracer = global::tracer("image-server"); + let tracer = global_tracer(); let mut span = tracer.start("get_video_part"); let part = &path.path; @@ -381,15 +389,15 @@ async fn put_add_favorite( .await { Ok(Err(e)) if e.kind == DbErrorKind::AlreadyExists => { - debug!("Favorite: {} exists for user: {}", &body.path, user_id); + warn!("Favorite: {} exists for user: {}", &body.path, user_id); HttpResponse::Ok() } Ok(Err(e)) => { - info!("{:?} {}. for user: {}", e, body.path, user_id); + error!("{:?} {}. for user: {}", e, body.path, user_id); HttpResponse::BadRequest() } Ok(Ok(_)) => { - debug!("Adding favorite \"{}\" for userid: {}", body.path, user_id); + info!("Adding favorite \"{}\" for userid: {}", body.path, user_id); HttpResponse::Created() } Err(e) => { diff --git a/src/otel.rs b/src/otel.rs index cc6af16..5c9e121 100644 --- a/src/otel.rs +++ b/src/otel.rs @@ -13,8 +13,7 @@ pub fn init_tracing() { let resources = Resource::builder() .with_attributes([ KeyValue::new("service.name", "image-server"), - //TODO: Get this from somewhere - KeyValue::new("service.version", "1.0"), + KeyValue::new("service.version", env!("CARGO_PKG_VERSION")), ]) .build(); -- 2.49.1 From 785ce157e6f32a37f48259ba3256880a44c0a15f Mon Sep 17 00:00:00 2001 From: Cameron Date: Fri, 23 May 2025 18:24:54 -0400 Subject: [PATCH 03/11] Get Otel span from the request --- src/files.rs | 13 +++++++------ src/main.rs | 32 +++++++++++++++++++++++++------- src/otel.rs | 22 +++++++++++++++++++++- 3 files changed, 53 insertions(+), 14 deletions(-) diff --git a/src/files.rs b/src/files.rs index 79fda03..b91e8ba 100644 --- a/src/files.rs +++ b/src/files.rs @@ -10,10 +10,7 @@ use actix::{Handler, Message}; use anyhow::{anyhow, Context}; use actix_web::web::Data; -use actix_web::{ - web::{self, Query}, - HttpResponse, -}; +use actix_web::{web::{self, Query}, HttpRequest, HttpResponse}; use log::{debug, error, info, trace}; use opentelemetry::KeyValue; use opentelemetry::trace::{Span, Status, Tracer}; @@ -28,10 +25,11 @@ use path_absolutize::*; use rand::prelude::SliceRandom; use rand::thread_rng; use serde::Deserialize; -use crate::otel::global_tracer; +use crate::otel::{extract_context_from_request, global_tracer}; pub async fn list_photos( _: Claims, + request: HttpRequest, req: Query, app_state: web::Data, file_system: web::Data, @@ -40,7 +38,8 @@ pub async fn list_photos( let search_path = &req.path; let tracer = global_tracer(); - let mut span = tracer.start("list_photos"); + let context = extract_context_from_request(&request); + let mut span = tracer.start_with_context("list_photos", &context); span.set_attribute(KeyValue::new("path", search_path.to_string())); let search_recursively = req.recursive.unwrap_or(false); @@ -547,6 +546,7 @@ mod tests { let response = list_photos( claims, + HttpRequest::default(), request, Data::new(AppState::new( Arc::new(StreamActor {}.start()), @@ -594,6 +594,7 @@ mod tests { let response: HttpResponse = list_photos( claims, + HttpRequest::default(), request, Data::new(AppState::new( Arc::new(StreamActor {}.start()), diff --git a/src/main.rs b/src/main.rs index 5dbf5c2..e7f7a1e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -38,7 +38,7 @@ use crate::database::*; use crate::files::{ is_image_or_video, is_valid_full_path, move_file, RealFileSystem, RefreshThumbnailsMessage, }; -use crate::otel::global_tracer; +use crate::otel::{extract_context_from_request, global_tracer}; use crate::service::ServiceBuilder; use crate::state::AppState; use crate::tags::*; @@ -84,7 +84,9 @@ async fn get_image( app_state: Data, ) -> impl Responder { let tracer = global_tracer(); - let mut span = tracer.start("get_image"); + let context = extract_context_from_request(&request); + + let mut span = tracer.start_with_context("get_image", &context); if let Some(path) = is_valid_full_path(&app_state.base_path, &req.path, false) { let image_size = req.size.unwrap_or(PhotoSize::Full); @@ -121,11 +123,13 @@ async fn get_image( #[get("/image/metadata")] async fn get_file_metadata( _: Claims, + request: HttpRequest, path: web::Query, app_state: Data, ) -> impl Responder { let tracer = global_tracer(); - let mut span = tracer.start("get_file_metadata"); + let context = extract_context_from_request(&request); + let mut span = tracer.start_with_context("get_file_metadata", &context); match is_valid_full_path(&app_state.base_path, &path.path, false) .ok_or_else(|| ErrorKind::InvalidData.into()) .and_then(File::open) @@ -154,11 +158,13 @@ async fn get_file_metadata( #[post("/image")] async fn upload_image( _: Claims, + request: HttpRequest, mut payload: mp::Multipart, app_state: Data, ) -> impl Responder { let tracer = global_tracer(); - let mut span = tracer.start("upload_image"); + let context = extract_context_from_request(&request); + let mut span = tracer.start_with_context("upload_image", &context); let mut file_content: BytesMut = BytesMut::new(); let mut file_name: Option = None; @@ -241,11 +247,14 @@ async fn upload_image( #[post("/video/generate")] async fn generate_video( _claims: Claims, + request: HttpRequest, app_state: Data, body: web::Json, ) -> impl Responder { let tracer = global_tracer(); - let mut span = tracer.start("generate_video"); + + let context = extract_context_from_request(&request); + let mut span = tracer.start_with_context("generate_video", &context); let filename = PathBuf::from(&body.path); @@ -289,7 +298,8 @@ async fn stream_video( app_state: Data, ) -> impl Responder { let tracer = global::tracer("image-server"); - let mut span = tracer.start("stream_video"); + let context = extract_context_from_request(&request); + let mut span = tracer.start_with_context("stream_video", &context); let playlist = &path.path; debug!("Playlist: {}", playlist); @@ -318,7 +328,8 @@ async fn get_video_part( app_state: Data, ) -> impl Responder { let tracer = global_tracer(); - let mut span = tracer.start("get_video_part"); + let context = extract_context_from_request(&request); + let mut span = tracer.start_with_context("get_video_part", &context); let part = &path.path; debug!("Video part: {}", part); @@ -343,8 +354,13 @@ async fn get_video_part( #[get("image/favorites")] async fn favorites( claims: Claims, + request: HttpRequest, favorites_dao: Data>>, ) -> impl Responder { + let tracer = global_tracer(); + let context = extract_context_from_request(&request); + let mut span = tracer.start_with_context("get favorites", &context); + match web::block(move || { favorites_dao .lock() @@ -359,12 +375,14 @@ async fn favorites( .map(|favorite| favorite.path) .collect::>(); + span.set_status(Status::Ok); HttpResponse::Ok().json(PhotosResponse { photos: favorites, dirs: Vec::new(), }) } Ok(Err(e)) => { + span.set_status(Status::error(format!("Error getting favorites: {:?}", e))); error!("Error getting favorites: {:?}", e); HttpResponse::InternalServerError().finish() } diff --git a/src/otel.rs b/src/otel.rs index 5c9e121..595c834 100644 --- a/src/otel.rs +++ b/src/otel.rs @@ -1,8 +1,12 @@ +use actix_web::http::header::HeaderMap; +use actix_web::HttpRequest; use opentelemetry::global::BoxedTracer; -use opentelemetry::{global, KeyValue}; +use opentelemetry::{global, Context, KeyValue}; +use opentelemetry::propagation::TextMapPropagator; use opentelemetry_appender_log::OpenTelemetryLogBridge; use opentelemetry_otlp::WithExportConfig; use opentelemetry_sdk::logs::{BatchLogProcessor, SdkLoggerProvider}; +use opentelemetry_sdk::propagation::TraceContextPropagator; use opentelemetry_sdk::Resource; pub fn global_tracer() -> BoxedTracer { @@ -58,3 +62,19 @@ pub fn init_logs() { //TODO: Still set this with the env? Ideally we still have a clean/simple local logger for local dev log::set_max_level(log::LevelFilter::Info); } + +struct HeaderExtractor<'a>(&'a HeaderMap); + +impl<'a> opentelemetry::propagation::Extractor for HeaderExtractor<'a> { + fn get(&self, key: &str) -> Option<&str> { + self.0.get(key).and_then(|v| v.to_str().ok()) + } + + fn keys(&self) -> Vec<&str> { + self.0.keys().map(|k| k.as_str()).collect() + } +} +pub fn extract_context_from_request(req: &HttpRequest) -> Context { + let propagator = TraceContextPropagator::new(); + propagator.extract(&HeaderExtractor(req.headers())) +} -- 2.49.1 From 1e63e0c08cc5dab93cd03c0ec32cd20a7253ed3f Mon Sep 17 00:00:00 2001 From: Cameron Date: Tue, 3 Jun 2025 12:07:26 -0400 Subject: [PATCH 04/11] Add DB spans to the various queries --- src/otel.rs | 18 ++- src/tags.rs | 311 +++++++++++++++++++++++++++------------------------- 2 files changed, 180 insertions(+), 149 deletions(-) diff --git a/src/otel.rs b/src/otel.rs index 595c834..e3e70e9 100644 --- a/src/otel.rs +++ b/src/otel.rs @@ -3,6 +3,7 @@ use actix_web::HttpRequest; use opentelemetry::global::BoxedTracer; use opentelemetry::{global, Context, KeyValue}; use opentelemetry::propagation::TextMapPropagator; +use opentelemetry::trace::Tracer; use opentelemetry_appender_log::OpenTelemetryLogBridge; use opentelemetry_otlp::WithExportConfig; use opentelemetry_sdk::logs::{BatchLogProcessor, SdkLoggerProvider}; @@ -13,6 +14,7 @@ pub fn global_tracer() -> BoxedTracer { global::tracer("image-server") } +#[allow(dead_code)] pub fn init_tracing() { let resources = Resource::builder() .with_attributes([ @@ -35,6 +37,7 @@ pub fn init_tracing() { global::set_tracer_provider(tracer_provider); } +#[allow(dead_code)] pub fn init_logs() { let otlp_exporter = opentelemetry_otlp::LogExporter::builder() .with_tonic() @@ -47,7 +50,7 @@ pub fn init_logs() { let resources = Resource::builder() .with_attributes([ KeyValue::new("service.name", "image-server"), - KeyValue::new("service.version", "1.0"), + KeyValue::new("service.version", env!("CARGO_PKG_VERSION")), ]) .build(); @@ -78,3 +81,16 @@ pub fn extract_context_from_request(req: &HttpRequest) -> Context { let propagator = TraceContextPropagator::new(); propagator.extract(&HeaderExtractor(req.headers())) } + +pub fn trace_db_call(operation: &str, query_type: &str, func: F) -> anyhow::Result +where F: FnOnce() -> anyhow::Result { + let tracer = global::tracer("db"); + let _span = tracer + .span_builder(format!("db.{}.{}", operation, query_type)) + .with_attributes(vec![ + KeyValue::new("db.operation", operation.to_string().clone()), + KeyValue::new("db.query_type", query_type.to_string().clone()), + ]).start(&tracer); + + func() +} \ No newline at end of file diff --git a/src/tags.rs b/src/tags.rs index 79632f4..a2cdeae 100644 --- a/src/tags.rs +++ b/src/tags.rs @@ -1,4 +1,5 @@ use crate::data::GetTagsRequest; +use crate::otel::trace_db_call; use crate::{connect, data::AddTagRequest, error::IntoHttpError, schema, Claims, ThumbnailRequest}; use actix_web::dev::{ServiceFactory, ServiceRequest}; use actix_web::{web, App, HttpResponse, Responder}; @@ -231,123 +232,133 @@ impl TagDao for SqliteTagDao { fn get_all_tags(&mut self, path: Option) -> anyhow::Result> { // select name, count(*) from tags join tagged_photo ON tags.id = tagged_photo.tag_id GROUP BY tags.name ORDER BY COUNT(*); - let path = path.map(|p| p + "%").unwrap_or("%".to_string()); - let (id, name, created_time) = tags::all_columns; - tags::table - .inner_join(tagged_photo::table) - .group_by(tags::id) - .select((count_star(), id, name, created_time)) - .filter(tagged_photo::photo_name.like(path)) - .get_results(&mut self.connection) - .map::, _>(|tags_with_count: Vec<(i64, i32, String, i64)>| { - tags_with_count - .iter() - .map(|tup| { - ( - tup.0, - Tag { - id: tup.1, - name: tup.2.clone(), - created_time: tup.3, - }, - ) - }) - .collect() - }) - .with_context(|| "Unable to get all tags") + trace_db_call("query", "get_all_tags", || { + let path = path.map(|p| p + "%").unwrap_or("%".to_string()); + let (id, name, created_time) = tags::all_columns; + tags::table + .inner_join(tagged_photo::table) + .group_by(tags::id) + .select((count_star(), id, name, created_time)) + .filter(tagged_photo::photo_name.like(path)) + .get_results(&mut self.connection) + .map::, _>(|tags_with_count: Vec<(i64, i32, String, i64)>| { + tags_with_count + .iter() + .map(|tup| { + ( + tup.0, + Tag { + id: tup.1, + name: tup.2.clone(), + created_time: tup.3, + }, + ) + }) + .collect() + }) + .with_context(|| "Unable to get all tags") + }) } fn get_tags_for_path(&mut self, path: &str) -> anyhow::Result> { - trace!("Getting Tags for path: {:?}", path); - tags::table - .left_join(tagged_photo::table) - .filter(tagged_photo::photo_name.eq(&path)) - .select((tags::id, tags::name, tags::created_time)) - .get_results::(self.connection.borrow_mut()) - .with_context(|| "Unable to get tags from Sqlite") + trace_db_call("query", "get_tags_for_path", || { + trace!("Getting Tags for path: {:?}", path); + tags::table + .left_join(tagged_photo::table) + .filter(tagged_photo::photo_name.eq(&path)) + .select((tags::id, tags::name, tags::created_time)) + .get_results::(self.connection.borrow_mut()) + .with_context(|| "Unable to get tags from Sqlite") + }) } fn create_tag(&mut self, name: &str) -> anyhow::Result { - diesel::insert_into(tags::table) - .values(InsertTag { - name: name.to_string(), - created_time: Utc::now().timestamp(), - }) - .execute(&mut self.connection) - .with_context(|| format!("Unable to insert tag {:?} in Sqlite", name)) - .and_then(|_| { - info!("Inserted tag: {:?}", name); - define_sql_function! { - fn last_insert_rowid() -> Integer; - } - diesel::select(last_insert_rowid()) - .get_result::(&mut self.connection) - .with_context(|| "Unable to get last inserted tag from Sqlite") - }) - .and_then(|id| { - debug!("Got id: {:?} for inserted tag: {:?}", id, name); - tags::table - .filter(tags::id.eq(id)) - .select((tags::id, tags::name, tags::created_time)) - .get_result::(self.connection.borrow_mut()) - .with_context(|| { - format!("Unable to get tagged photo with id: {:?} from Sqlite", id) - }) - }) + trace_db_call("insert", "create_tag", || { + diesel::insert_into(tags::table) + .values(InsertTag { + name: name.to_string(), + created_time: Utc::now().timestamp(), + }) + .execute(&mut self.connection) + .with_context(|| format!("Unable to insert tag {:?} in Sqlite", name)) + .and_then(|_| { + info!("Inserted tag: {:?}", name); + define_sql_function! { + fn last_insert_rowid() -> Integer; + } + diesel::select(last_insert_rowid()) + .get_result::(&mut self.connection) + .with_context(|| "Unable to get last inserted tag from Sqlite") + }) + .and_then(|id| { + debug!("Got id: {:?} for inserted tag: {:?}", id, name); + tags::table + .filter(tags::id.eq(id)) + .select((tags::id, tags::name, tags::created_time)) + .get_result::(self.connection.borrow_mut()) + .with_context(|| { + format!("Unable to get tagged photo with id: {:?} from Sqlite", id) + }) + }) + }) } fn remove_tag(&mut self, tag_name: &str, path: &str) -> anyhow::Result> { - tags::table - .filter(tags::name.eq(tag_name)) - .get_result::(self.connection.borrow_mut()) - .optional() - .with_context(|| format!("Unable to get tag '{}'", tag_name)) - .and_then(|tag| { - if let Some(tag) = tag { - diesel::delete( - tagged_photo::table - .filter(tagged_photo::tag_id.eq(tag.id)) - .filter(tagged_photo::photo_name.eq(path)), - ) - .execute(&mut self.connection) - .with_context(|| format!("Unable to delete tag: '{}'", &tag.name)) - .map(|_| Some(())) - } else { - info!("No tag found with name '{}'", tag_name); - Ok(None) - } - }) + trace_db_call("delete", "remove_tag", || { + tags::table + .filter(tags::name.eq(tag_name)) + .get_result::(self.connection.borrow_mut()) + .optional() + .with_context(|| format!("Unable to get tag '{}'", tag_name)) + .and_then(|tag| { + if let Some(tag) = tag { + diesel::delete( + tagged_photo::table + .filter(tagged_photo::tag_id.eq(tag.id)) + .filter(tagged_photo::photo_name.eq(path)), + ) + .execute(&mut self.connection) + .with_context(|| format!("Unable to delete tag: '{}'", &tag.name)) + .map(|_| Some(())) + } else { + info!("No tag found with name '{}'", tag_name); + Ok(None) + } + }) + }) } fn tag_file(&mut self, path: &str, tag_id: i32) -> anyhow::Result { - diesel::insert_into(tagged_photo::table) - .values(InsertTaggedPhoto { - tag_id, - photo_name: path.to_string(), - created_time: Utc::now().timestamp(), - }) - .execute(self.connection.borrow_mut()) - .with_context(|| format!("Unable to tag file {:?} in sqlite", path)) - .and_then(|_| { - info!("Inserted tagged photo: {:#} -> {:?}", tag_id, path); - define_sql_function! { - fn last_insert_rowid() -> diesel::sql_types::Integer; - } - diesel::select(last_insert_rowid()) - .get_result::(&mut self.connection) - .with_context(|| "Unable to get last inserted tag from Sqlite") - }) - .and_then(|tagged_id| { - tagged_photo::table - .find(tagged_id) - .first(self.connection.borrow_mut()) - .with_context(|| { - format!( - "Error getting inserted tagged photo with id: {:?}", - tagged_id - ) - }) - }) + trace_db_call("insert", "tag_file", || { + diesel::insert_into(tagged_photo::table) + .values(InsertTaggedPhoto { + tag_id, + photo_name: path.to_string(), + created_time: Utc::now().timestamp(), + }) + .execute(self.connection.borrow_mut()) + .with_context(|| format!("Unable to tag file {:?} in sqlite", path)) + .and_then(|_| { + info!("Inserted tagged photo: {:#} -> {:?}", tag_id, path); + define_sql_function! { + fn last_insert_rowid() -> diesel::sql_types::Integer; + } + diesel::select(last_insert_rowid()) + .get_result::(&mut self.connection) + .with_context(|| "Unable to get last inserted tag from Sqlite") + }) + .and_then(|tagged_id| { + tagged_photo::table + .find(tagged_id) + .first(self.connection.borrow_mut()) + .with_context(|| { + format!( + "Error getting inserted tagged photo with id: {:?}", + tagged_id + ) + }) + }) + }) } fn get_files_with_all_tag_ids( @@ -355,33 +366,35 @@ impl TagDao for SqliteTagDao { tag_ids: Vec, exclude_tag_ids: Vec, ) -> anyhow::Result> { - use diesel::dsl::*; + trace_db_call("query", "get_files_with_all_tags", || { + use diesel::dsl::*; - let exclude_subquery = tagged_photo::table - .filter(tagged_photo::tag_id.eq_any(exclude_tag_ids.clone())) - .select(tagged_photo::photo_name) - .into_boxed(); + let exclude_subquery = tagged_photo::table + .filter(tagged_photo::tag_id.eq_any(exclude_tag_ids.clone())) + .select(tagged_photo::photo_name) + .into_boxed(); - tagged_photo::table - .filter(tagged_photo::tag_id.eq_any(tag_ids.clone())) - .filter(tagged_photo::photo_name.ne_all(exclude_subquery)) - .group_by(tagged_photo::photo_name) - .select(( - tagged_photo::photo_name, - count_distinct(tagged_photo::tag_id), - )) - .having(count_distinct(tagged_photo::tag_id).ge(tag_ids.len() as i64)) - .get_results::<(String, i64)>(&mut self.connection) - .map(|results| { - results - .into_iter() - .map(|(file_name, tag_count)| FileWithTagCount { - file_name, - tag_count, - }) - .collect() - }) - .with_context(|| format!("Unable to get Tagged photos with ids: {:?}", tag_ids)) + tagged_photo::table + .filter(tagged_photo::tag_id.eq_any(tag_ids.clone())) + .filter(tagged_photo::photo_name.ne_all(exclude_subquery)) + .group_by(tagged_photo::photo_name) + .select(( + tagged_photo::photo_name, + count_distinct(tagged_photo::tag_id), + )) + .having(count_distinct(tagged_photo::tag_id).ge(tag_ids.len() as i64)) + .get_results::<(String, i64)>(&mut self.connection) + .map(|results| { + results + .into_iter() + .map(|(file_name, tag_count)| FileWithTagCount { + file_name, + tag_count, + }) + .collect() + }) + .with_context(|| format!("Unable to get Tagged photos with ids: {:?}", tag_ids)) + }) } fn get_files_with_any_tag_ids( @@ -389,22 +402,23 @@ impl TagDao for SqliteTagDao { tag_ids: Vec, exclude_tag_ids: Vec, ) -> anyhow::Result> { - use diesel::dsl::*; + trace_db_call("query", "get_files_with_any_tags", || { + use diesel::dsl::*; - let tag_ids_str = tag_ids - .iter() - .map(|id| id.to_string()) - .collect::>() - .join(","); + let tag_ids_str = tag_ids + .iter() + .map(|id| id.to_string()) + .collect::>() + .join(","); - let exclude_tag_ids_str = exclude_tag_ids - .iter() - .map(|id| id.to_string()) - .collect::>() - .join(","); + let exclude_tag_ids_str = exclude_tag_ids + .iter() + .map(|id| id.to_string()) + .collect::>() + .join(","); - let query = sql_query(format!( - r#" + let query = sql_query(format!( + r#" WITH filtered_photos AS ( SELECT DISTINCT photo_name FROM tagged_photo tp @@ -421,12 +435,13 @@ WITH filtered_photos AS ( FROM filtered_photos fp JOIN tagged_photo tp2 ON fp.photo_name = tp2.photo_name GROUP BY fp.photo_name"#, - tag_ids_str, exclude_tag_ids_str - )); + tag_ids_str, exclude_tag_ids_str + )); - // Execute the query: - let results = query.load::(&mut self.connection)?; - Ok(results) + // Execute the query: + let results = query.load::(&mut self.connection)?; + Ok(results) + }) } } -- 2.49.1 From b11d647ffa178c759af7257ac1cf856a740b798b Mon Sep 17 00:00:00 2001 From: Cameron Date: Tue, 3 Jun 2025 12:51:37 -0400 Subject: [PATCH 05/11] Try fixing span context for db call tracing --- src/otel.rs | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/src/otel.rs b/src/otel.rs index e3e70e9..b464e9c 100644 --- a/src/otel.rs +++ b/src/otel.rs @@ -3,7 +3,7 @@ use actix_web::HttpRequest; use opentelemetry::global::BoxedTracer; use opentelemetry::{global, Context, KeyValue}; use opentelemetry::propagation::TextMapPropagator; -use opentelemetry::trace::Tracer; +use opentelemetry::trace::{Status, TraceContextExt, Tracer}; use opentelemetry_appender_log::OpenTelemetryLogBridge; use opentelemetry_otlp::WithExportConfig; use opentelemetry_sdk::logs::{BatchLogProcessor, SdkLoggerProvider}; @@ -82,15 +82,26 @@ pub fn extract_context_from_request(req: &HttpRequest) -> Context { propagator.extract(&HeaderExtractor(req.headers())) } -pub fn trace_db_call(operation: &str, query_type: &str, func: F) -> anyhow::Result +pub fn trace_db_call(query_type: &str, operation: &str, func: F) -> anyhow::Result where F: FnOnce() -> anyhow::Result { let tracer = global::tracer("db"); - let _span = tracer - .span_builder(format!("db.{}.{}", operation, query_type)) + let span = tracer + .span_builder(format!("db.{}.{}", query_type, operation)) .with_attributes(vec![ - KeyValue::new("db.operation", operation.to_string().clone()), KeyValue::new("db.query_type", query_type.to_string().clone()), + KeyValue::new("db.operation", operation.to_string().clone()), ]).start(&tracer); - func() + let context = Context::current_with_span(span); + let result = func(); + match &result { + Ok(_) => { + context.span().set_status(Status::Ok); + } + Err(e) => { + context.span().set_status(Status::error(e.to_string())) + } + } + + result } \ No newline at end of file -- 2.49.1 From c4153b404cba3c4014a12fc774c1e4673fa5db76 Mon Sep 17 00:00:00 2001 From: Cameron Date: Tue, 3 Jun 2025 13:02:25 -0400 Subject: [PATCH 06/11] Try using current context --- src/otel.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/otel.rs b/src/otel.rs index b464e9c..0c727f6 100644 --- a/src/otel.rs +++ b/src/otel.rs @@ -1,9 +1,9 @@ use actix_web::http::header::HeaderMap; use actix_web::HttpRequest; use opentelemetry::global::BoxedTracer; -use opentelemetry::{global, Context, KeyValue}; use opentelemetry::propagation::TextMapPropagator; -use opentelemetry::trace::{Status, TraceContextExt, Tracer}; +use opentelemetry::trace::{Span, Status, Tracer}; +use opentelemetry::{global, Context, KeyValue}; use opentelemetry_appender_log::OpenTelemetryLogBridge; use opentelemetry_otlp::WithExportConfig; use opentelemetry_sdk::logs::{BatchLogProcessor, SdkLoggerProvider}; @@ -85,21 +85,20 @@ pub fn extract_context_from_request(req: &HttpRequest) -> Context { pub fn trace_db_call(query_type: &str, operation: &str, func: F) -> anyhow::Result where F: FnOnce() -> anyhow::Result { let tracer = global::tracer("db"); - let span = tracer + let mut span = tracer .span_builder(format!("db.{}.{}", query_type, operation)) .with_attributes(vec![ KeyValue::new("db.query_type", query_type.to_string().clone()), KeyValue::new("db.operation", operation.to_string().clone()), - ]).start(&tracer); + ]).start_with_context(&tracer, &Context::current()); - let context = Context::current_with_span(span); let result = func(); match &result { Ok(_) => { - context.span().set_status(Status::Ok); + span.set_status(Status::Ok); } Err(e) => { - context.span().set_status(Status::error(e.to_string())) + span.set_status(Status::error(e.to_string())) } } -- 2.49.1 From 6aff7f456abd89b2e49eefac60694377443c543b Mon Sep 17 00:00:00 2001 From: Cameron Date: Tue, 3 Jun 2025 14:06:19 -0400 Subject: [PATCH 07/11] Manually pass the current context around --- src/files.rs | 19 +++++----- src/otel.rs | 5 +-- src/tags.rs | 97 ++++++++++++++++++++++++++++++++-------------------- 3 files changed, 73 insertions(+), 48 deletions(-) diff --git a/src/files.rs b/src/files.rs index b91e8ba..9afef7e 100644 --- a/src/files.rs +++ b/src/files.rs @@ -13,7 +13,7 @@ use actix_web::web::Data; use actix_web::{web::{self, Query}, HttpRequest, HttpResponse}; use log::{debug, error, info, trace}; use opentelemetry::KeyValue; -use opentelemetry::trace::{Span, Status, Tracer}; +use opentelemetry::trace::{Span, Status, TraceContextExt, Tracer}; use crate::data::{Claims, FilesRequest, FilterMode, PhotosResponse, SortType}; use crate::{create_thumbnails, AppState}; @@ -41,6 +41,7 @@ pub async fn list_photos( let context = extract_context_from_request(&request); let mut span = tracer.start_with_context("list_photos", &context); span.set_attribute(KeyValue::new("path", search_path.to_string())); + let span_context = opentelemetry::Context::current_with_span(span); let search_recursively = req.recursive.unwrap_or(false); if let Some(tag_ids) = &req.tag_ids { @@ -66,8 +67,8 @@ pub async fn list_photos( .collect::>(); return match filter_mode { - FilterMode::Any => dao.get_files_with_any_tag_ids(tag_ids.clone(), exclude_tag_ids), - FilterMode::All => dao.get_files_with_all_tag_ids(tag_ids.clone(), exclude_tag_ids), + FilterMode::Any => dao.get_files_with_any_tag_ids(tag_ids.clone(), exclude_tag_ids, &span_context), + FilterMode::All => dao.get_files_with_all_tag_ids(tag_ids.clone(), exclude_tag_ids, &span_context), } .context(format!( "Failed to get files with tag_ids: {:?} with filter_mode: {:?}", @@ -104,8 +105,8 @@ pub async fn list_photos( tagged_files.len(), tagged_files ); - span.set_attribute(KeyValue::new("file_count", tagged_files.len().to_string())); - span.set_status(Status::Ok); + span_context.span().set_attribute(KeyValue::new("file_count", tagged_files.len().to_string())); + span_context.span().set_status(Status::Ok); HttpResponse::Ok().json(PhotosResponse { photos: tagged_files, @@ -138,7 +139,7 @@ pub async fn list_photos( .map(|f| f.to_str().unwrap().to_string()) .map(|file_name| { let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao"); - let file_tags = tag_dao.get_tags_for_path(&file_name).unwrap_or_default(); + let file_tags = tag_dao.get_tags_for_path(&span_context, &file_name).unwrap_or_default(); (file_name, file_tags) }) @@ -197,8 +198,8 @@ pub async fn list_photos( .map(|f| f.to_str().unwrap().to_string()) .collect::>(); - span.set_attribute(KeyValue::new("file_count", files.len().to_string())); - span.set_status(Status::Ok); + span_context.span().set_attribute(KeyValue::new("file_count", files.len().to_string())); + span_context.span().set_status(Status::Ok); HttpResponse::Ok().json(PhotosResponse { photos: response_files, @@ -206,7 +207,7 @@ pub async fn list_photos( }) } else { error!("Bad photos request: {}", req.path); - span.set_status(Status::error("Invalid path")); + span_context.span().set_status(Status::error("Invalid path")); HttpResponse::BadRequest().finish() } } diff --git a/src/otel.rs b/src/otel.rs index 0c727f6..01c8cbe 100644 --- a/src/otel.rs +++ b/src/otel.rs @@ -82,15 +82,16 @@ pub fn extract_context_from_request(req: &HttpRequest) -> Context { propagator.extract(&HeaderExtractor(req.headers())) } -pub fn trace_db_call(query_type: &str, operation: &str, func: F) -> anyhow::Result +pub fn trace_db_call(context: &Context, query_type: &str, operation: &str, func: F) -> anyhow::Result where F: FnOnce() -> anyhow::Result { + let tracer = global::tracer("db"); let mut span = tracer .span_builder(format!("db.{}.{}", query_type, operation)) .with_attributes(vec![ KeyValue::new("db.query_type", query_type.to_string().clone()), KeyValue::new("db.operation", operation.to_string().clone()), - ]).start_with_context(&tracer, &Context::current()); + ]).start_with_context(&tracer, context); let result = func(); match &result { diff --git a/src/tags.rs b/src/tags.rs index a2cdeae..96f146e 100644 --- a/src/tags.rs +++ b/src/tags.rs @@ -1,14 +1,15 @@ use crate::data::GetTagsRequest; -use crate::otel::trace_db_call; +use crate::otel::{extract_context_from_request, global_tracer, trace_db_call}; use crate::{connect, data::AddTagRequest, error::IntoHttpError, schema, Claims, ThumbnailRequest}; use actix_web::dev::{ServiceFactory, ServiceRequest}; -use actix_web::{web, App, HttpResponse, Responder}; +use actix_web::{web, App, HttpRequest, HttpResponse, Responder}; use anyhow::Context; use chrono::Utc; use diesel::dsl::count_star; use diesel::prelude::*; use diesel::sql_types::*; use log::{debug, info, trace}; +use opentelemetry::trace::Tracer; use schema::{tagged_photo, tags}; use serde::{Deserialize, Serialize}; use std::borrow::BorrowMut; @@ -30,35 +31,41 @@ where async fn add_tag( _: Claims, + request: HttpRequest, body: web::Json, tag_dao: web::Data>, ) -> impl Responder { + let tracer = global_tracer(); + let context = extract_context_from_request(&request); + let _ = tracer.start_with_context("add_tag", &context); let tag_name = body.tag_name.clone(); let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao"); tag_dao - .get_all_tags(None) + .get_all_tags(&context, None) .and_then(|tags| { if let Some((_, tag)) = tags.iter().find(|t| t.1.name == tag_name) { Ok(tag.clone()) } else { - tag_dao.create_tag(tag_name.trim()) + tag_dao.create_tag(&context, tag_name.trim()) } }) - .and_then(|tag| tag_dao.tag_file(&body.file_name, tag.id)) + .and_then(|tag| tag_dao.tag_file(&context, &body.file_name, tag.id)) .map(|_| HttpResponse::Ok()) .into_http_internal_err() } async fn get_tags( _: Claims, + http_request: HttpRequest, request: web::Query, tag_dao: web::Data>, ) -> impl Responder { + let context = extract_context_from_request(&http_request); let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao"); tag_dao - .get_tags_for_path(&request.path) + .get_tags_for_path(&context, &request.path) .map(|tags| HttpResponse::Ok().json(tags)) .into_http_internal_err() } @@ -66,11 +73,13 @@ async fn get_tags( async fn get_all_tags( _: Claims, tag_dao: web::Data>, + request: HttpRequest, query: web::Query, ) -> impl Responder { + let context = extract_context_from_request(&request); let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao"); tag_dao - .get_all_tags(query.path.clone()) + .get_all_tags(&context, query.path.clone()) .map(|tags| { HttpResponse::Ok().json( tags.iter() @@ -86,12 +95,14 @@ async fn get_all_tags( async fn remove_tagged_photo( _: Claims, + http_request: HttpRequest, request: web::Json, tag_dao: web::Data>, ) -> impl Responder { + let context = extract_context_from_request(&http_request); let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao"); tag_dao - .remove_tag(&request.tag_name, &request.file_name) + .remove_tag(&context, &request.tag_name, &request.file_name) .map(|result| { if result.is_some() { HttpResponse::Ok() @@ -105,12 +116,14 @@ async fn remove_tagged_photo( async fn update_tags( _: Claims, tag_dao: web::Data>, + http_request: HttpRequest, request: web::Json, ) -> impl Responder { let mut dao = tag_dao.lock().expect("Unable to get TagDao"); + let context = extract_context_from_request(&http_request); - dao.get_tags_for_path(&request.file_name) - .and_then(|existing_tags| dao.get_all_tags(None).map(|all| (existing_tags, all))) + dao.get_tags_for_path(&context, &request.file_name) + .and_then(|existing_tags| dao.get_all_tags(&context, None).map(|all| (existing_tags, all))) .map(|(existing_tags, all_tags)| { let tags_to_remove = existing_tags .iter() @@ -122,7 +135,7 @@ async fn update_tags( "Removing tag {:?} from file: {:?}", tag.name, request.file_name ); - dao.remove_tag(&tag.name, &request.file_name) + dao.remove_tag(&context, &tag.name, &request.file_name) .unwrap_or_else(|err| panic!("{:?} Unable to remove tag {:?}", err, &tag.name)); } @@ -137,7 +150,7 @@ async fn update_tags( new_tag.name, request.file_name ); - dao.tag_file(&request.file_name, new_tag.id) + dao.tag_file(&context, &request.file_name, new_tag.id) .with_context(|| { format!( "Unable to tag file {:?} with tag: {:?}", @@ -195,20 +208,26 @@ pub struct AddTagsRequest { } pub trait TagDao { - fn get_all_tags(&mut self, path: Option) -> anyhow::Result>; - fn get_tags_for_path(&mut self, path: &str) -> anyhow::Result>; - fn create_tag(&mut self, name: &str) -> anyhow::Result; - fn remove_tag(&mut self, tag_name: &str, path: &str) -> anyhow::Result>; - fn tag_file(&mut self, path: &str, tag_id: i32) -> anyhow::Result; + fn get_all_tags( + &mut self, + context: &opentelemetry::Context, + path: Option, + ) -> anyhow::Result>; + fn get_tags_for_path(&mut self, context: &opentelemetry::Context, path: &str) -> anyhow::Result>; + fn create_tag(&mut self, context: &opentelemetry::Context, name: &str) -> anyhow::Result; + fn remove_tag(&mut self, context: &opentelemetry::Context, tag_name: &str, path: &str) -> anyhow::Result>; + fn tag_file(&mut self, context: &opentelemetry::Context, path: &str, tag_id: i32) -> anyhow::Result; fn get_files_with_all_tag_ids( &mut self, tag_ids: Vec, exclude_tag_ids: Vec, + context: &opentelemetry::Context, ) -> anyhow::Result>; fn get_files_with_any_tag_ids( &mut self, tag_ids: Vec, exclude_tag_ids: Vec, + context: &opentelemetry::Context, ) -> anyhow::Result>; } @@ -229,10 +248,10 @@ impl Default for SqliteTagDao { } impl TagDao for SqliteTagDao { - fn get_all_tags(&mut self, path: Option) -> anyhow::Result> { + fn get_all_tags(&mut self, context: &opentelemetry::Context, path: Option) -> anyhow::Result> { // select name, count(*) from tags join tagged_photo ON tags.id = tagged_photo.tag_id GROUP BY tags.name ORDER BY COUNT(*); - trace_db_call("query", "get_all_tags", || { + trace_db_call(&context, "query", "get_all_tags", || { let path = path.map(|p| p + "%").unwrap_or("%".to_string()); let (id, name, created_time) = tags::all_columns; tags::table @@ -260,8 +279,8 @@ impl TagDao for SqliteTagDao { }) } - fn get_tags_for_path(&mut self, path: &str) -> anyhow::Result> { - trace_db_call("query", "get_tags_for_path", || { + fn get_tags_for_path(&mut self, context: &opentelemetry::Context, path: &str) -> anyhow::Result> { + trace_db_call(&context, "query", "get_tags_for_path", || { trace!("Getting Tags for path: {:?}", path); tags::table .left_join(tagged_photo::table) @@ -272,8 +291,8 @@ impl TagDao for SqliteTagDao { }) } - fn create_tag(&mut self, name: &str) -> anyhow::Result { - trace_db_call("insert", "create_tag", || { + fn create_tag(&mut self, context: &opentelemetry::Context, name: &str) -> anyhow::Result { + trace_db_call(&context, "insert", "create_tag", || { diesel::insert_into(tags::table) .values(InsertTag { name: name.to_string(), @@ -303,8 +322,8 @@ impl TagDao for SqliteTagDao { }) } - fn remove_tag(&mut self, tag_name: &str, path: &str) -> anyhow::Result> { - trace_db_call("delete", "remove_tag", || { + fn remove_tag(&mut self, context: &opentelemetry::Context, tag_name: &str, path: &str) -> anyhow::Result> { + trace_db_call(&context, "delete", "remove_tag", || { tags::table .filter(tags::name.eq(tag_name)) .get_result::(self.connection.borrow_mut()) @@ -328,8 +347,8 @@ impl TagDao for SqliteTagDao { }) } - fn tag_file(&mut self, path: &str, tag_id: i32) -> anyhow::Result { - trace_db_call("insert", "tag_file", || { + fn tag_file(&mut self, context: &opentelemetry::Context, path: &str, tag_id: i32) -> anyhow::Result { + trace_db_call(&context, "insert", "tag_file", || { diesel::insert_into(tagged_photo::table) .values(InsertTaggedPhoto { tag_id, @@ -365,8 +384,9 @@ impl TagDao for SqliteTagDao { &mut self, tag_ids: Vec, exclude_tag_ids: Vec, + context: &opentelemetry::Context, ) -> anyhow::Result> { - trace_db_call("query", "get_files_with_all_tags", || { + trace_db_call(&context, "query", "get_files_with_all_tags", || { use diesel::dsl::*; let exclude_subquery = tagged_photo::table @@ -401,8 +421,9 @@ impl TagDao for SqliteTagDao { &mut self, tag_ids: Vec, exclude_tag_ids: Vec, + context: &opentelemetry::Context, ) -> anyhow::Result> { - trace_db_call("query", "get_files_with_any_tags", || { + trace_db_call(&context, "query", "get_files_with_any_tags", || { use diesel::dsl::*; let tag_ids_str = tag_ids @@ -472,7 +493,7 @@ mod tests { } impl TagDao for TestTagDao { - fn get_all_tags(&mut self, _option: Option) -> anyhow::Result> { + fn get_all_tags(&mut self, context: &opentelemetry::Context, _option: Option) -> anyhow::Result> { Ok(self .tags .borrow() @@ -482,7 +503,7 @@ mod tests { .clone()) } - fn get_tags_for_path(&mut self, path: &str) -> anyhow::Result> { + fn get_tags_for_path(&mut self, context: &opentelemetry::Context, path: &str) -> anyhow::Result> { info!("Getting test tags for: {:?}", path); warn!("Tags for path: {:?}", self.tagged_photos); @@ -494,7 +515,7 @@ mod tests { .clone()) } - fn create_tag(&mut self, name: &str) -> anyhow::Result { + fn create_tag(&mut self, context: &opentelemetry::Context, name: &str) -> anyhow::Result { self.tag_count += 1; let tag_id = self.tag_count; @@ -510,7 +531,7 @@ mod tests { Ok(tag) } - fn remove_tag(&mut self, tag_name: &str, path: &str) -> anyhow::Result> { + fn remove_tag(&mut self, context: &opentelemetry::Context, tag_name: &str, path: &str) -> anyhow::Result> { let mut clone = { let photo_tags = &self.tagged_photos.borrow()[path]; photo_tags.clone() @@ -530,7 +551,7 @@ mod tests { } } - fn tag_file(&mut self, path: &str, tag_id: i32) -> anyhow::Result { + fn tag_file(&mut self, context: &opentelemetry::Context, path: &str, tag_id: i32) -> anyhow::Result { debug!("Tagging file: {:?} with tag_id: {:?}", path, tag_id); if let Some(tag) = self.tags.borrow().iter().find(|t| t.id == tag_id) { @@ -566,15 +587,17 @@ mod tests { fn get_files_with_all_tag_ids( &mut self, tag_ids: Vec, - _exclude_tag_ids: Vec, + exclude_tag_ids: Vec, + context: &opentelemetry::Context, ) -> anyhow::Result> { todo!() } fn get_files_with_any_tag_ids( &mut self, - _tag_ids: Vec, - _exclude_tag_ids: Vec, + tag_ids: Vec, + exclude_tag_ids: Vec, + context: &opentelemetry::Context, ) -> anyhow::Result> { todo!() } -- 2.49.1 From 7c882fd31c32ce52d99d168be16bd12c70da78c9 Mon Sep 17 00:00:00 2001 From: Cameron Date: Tue, 3 Jun 2025 15:23:39 -0400 Subject: [PATCH 08/11] Add webp files and improve logging --- src/files.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/files.rs b/src/files.rs index 9afef7e..af2a47a 100644 --- a/src/files.rs +++ b/src/files.rs @@ -47,7 +47,7 @@ pub async fn list_photos( if let Some(tag_ids) = &req.tag_ids { if search_recursively { let filter_mode = &req.tag_filter_mode.unwrap_or(FilterMode::Any); - debug!( + info!( "Searching for tags: {}. With path: '{}' and filter mode: {:?}", tag_ids, search_path, filter_mode ); @@ -75,7 +75,7 @@ pub async fn list_photos( tag_ids, filter_mode )) .inspect(|files| { - debug!( + info!( "Found {:?} tagged files, filtering down by search path {:?}", files.len(), search_path @@ -100,7 +100,7 @@ pub async fn list_photos( .map(|files| sort(files, req.sort.unwrap_or(NameAsc))) .inspect(|files| debug!("Found {:?} files", files.len())) .map(|tagged_files: Vec| { - trace!( + info!( "Found {:?} tagged files: {:?}", tagged_files.len(), tagged_files @@ -119,7 +119,7 @@ pub async fn list_photos( } if let Ok(files) = file_system.get_files_for_path(search_path) { - debug!("Valid search path: {:?}", search_path); + info!("Found {:?} files in path: {:?}", files.len(), search_path); let photos = files .iter() @@ -240,6 +240,7 @@ pub fn list_files(dir: &Path) -> io::Result> { let mut span = tracer.start("list_files"); let dir_name_string = dir.to_str().unwrap_or_default().to_string(); span.set_attribute(KeyValue::new("dir", dir_name_string)); + info!("Listing files in: {:?}", dir); let files = read_dir(dir)? .filter_map(|res| res.ok()) @@ -248,6 +249,7 @@ pub fn list_files(dir: &Path) -> io::Result> { .collect::>(); span.set_attribute(KeyValue::new("file_count", files.len().to_string())); + info!("Found {:?} files in directory: {:?}", files.len(), dir); Ok(files) } @@ -263,6 +265,7 @@ pub fn is_image_or_video(path: &Path) -> bool { || extension == "mp4" || extension == "mov" || extension == "nef" + || extension == "webp" } pub fn is_valid_full_path + Debug + AsRef>( @@ -319,6 +322,8 @@ pub async fn move_file( app_state: Data, request: web::Json, ) -> HttpResponse { + info!("Moving file: {:?}", request); + match is_valid_full_path(&app_state.base_path, &request.source, false) .ok_or(ErrorKind::InvalidData) .and_then(|source| { @@ -358,7 +363,7 @@ pub async fn move_file( } } -#[derive(Deserialize)] +#[derive(Deserialize, Debug)] pub struct MoveFileRequest { source: String, destination: String, -- 2.49.1 From 2c553a8016ccec50d035f6fa06f0dc93a9430682 Mon Sep 17 00:00:00 2001 From: Cameron Date: Wed, 4 Jun 2025 15:05:23 -0400 Subject: [PATCH 09/11] Additional logging and tracing enhancements --- src/files.rs | 71 ++++++++++++++++----- src/main.rs | 8 ++- src/otel.rs | 29 +++++---- src/tags.rs | 176 ++++++++++++++++++++++++++++++++++++++++----------- src/video.rs | 15 +++++ 5 files changed, 231 insertions(+), 68 deletions(-) diff --git a/src/files.rs b/src/files.rs index af2a47a..1f3f22f 100644 --- a/src/files.rs +++ b/src/files.rs @@ -9,23 +9,26 @@ use ::anyhow; use actix::{Handler, Message}; use anyhow::{anyhow, Context}; -use actix_web::web::Data; -use actix_web::{web::{self, Query}, HttpRequest, HttpResponse}; -use log::{debug, error, info, trace}; -use opentelemetry::KeyValue; -use opentelemetry::trace::{Span, Status, TraceContextExt, Tracer}; use crate::data::{Claims, FilesRequest, FilterMode, PhotosResponse, SortType}; use crate::{create_thumbnails, AppState}; +use actix_web::web::Data; +use actix_web::{ + web::{self, Query}, + HttpRequest, HttpResponse, +}; +use log::{debug, error, info, trace}; +use opentelemetry::trace::{Span, Status, TraceContextExt, Tracer}; +use opentelemetry::KeyValue; use crate::data::SortType::NameAsc; use crate::error::IntoHttpError; +use crate::otel::{extract_context_from_request, global_tracer}; use crate::tags::{FileWithTagCount, TagDao}; use crate::video::StreamActor; use path_absolutize::*; use rand::prelude::SliceRandom; use rand::thread_rng; use serde::Deserialize; -use crate::otel::{extract_context_from_request, global_tracer}; pub async fn list_photos( _: Claims, @@ -36,11 +39,28 @@ pub async fn list_photos( tag_dao: web::Data>, ) -> HttpResponse { let search_path = &req.path; - + let tracer = global_tracer(); let context = extract_context_from_request(&request); let mut span = tracer.start_with_context("list_photos", &context); - span.set_attribute(KeyValue::new("path", search_path.to_string())); + span.set_attributes(vec![ + KeyValue::new("path", search_path.to_string()), + KeyValue::new("recursive", req.recursive.unwrap_or(false).to_string()), + KeyValue::new( + "tag_ids", + req.tag_ids.clone().unwrap_or_default().to_string(), + ), + KeyValue::new( + "tag_filter_mode", + format!("{:?}", req.tag_filter_mode.unwrap_or(FilterMode::Any)), + ), + KeyValue::new( + "exclude_tag_ids", + req.exclude_tag_ids.clone().unwrap_or_default().to_string(), + ), + KeyValue::new("sort", format!("{:?}", &req.sort.unwrap_or(NameAsc))), + ]); + let span_context = opentelemetry::Context::current_with_span(span); let search_recursively = req.recursive.unwrap_or(false); @@ -67,8 +87,12 @@ pub async fn list_photos( .collect::>(); return match filter_mode { - FilterMode::Any => dao.get_files_with_any_tag_ids(tag_ids.clone(), exclude_tag_ids, &span_context), - FilterMode::All => dao.get_files_with_all_tag_ids(tag_ids.clone(), exclude_tag_ids, &span_context), + FilterMode::Any => { + dao.get_files_with_any_tag_ids(tag_ids.clone(), exclude_tag_ids, &span_context) + } + FilterMode::All => { + dao.get_files_with_all_tag_ids(tag_ids.clone(), exclude_tag_ids, &span_context) + } } .context(format!( "Failed to get files with tag_ids: {:?} with filter_mode: {:?}", @@ -105,7 +129,9 @@ pub async fn list_photos( tagged_files.len(), tagged_files ); - span_context.span().set_attribute(KeyValue::new("file_count", tagged_files.len().to_string())); + span_context + .span() + .set_attribute(KeyValue::new("file_count", tagged_files.len().to_string())); span_context.span().set_status(Status::Ok); HttpResponse::Ok().json(PhotosResponse { @@ -139,7 +165,9 @@ pub async fn list_photos( .map(|f| f.to_str().unwrap().to_string()) .map(|file_name| { let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao"); - let file_tags = tag_dao.get_tags_for_path(&span_context, &file_name).unwrap_or_default(); + let file_tags = tag_dao + .get_tags_for_path(&span_context, &file_name) + .unwrap_or_default(); (file_name, file_tags) }) @@ -198,16 +226,20 @@ pub async fn list_photos( .map(|f| f.to_str().unwrap().to_string()) .collect::>(); - span_context.span().set_attribute(KeyValue::new("file_count", files.len().to_string())); + span_context + .span() + .set_attribute(KeyValue::new("file_count", files.len().to_string())); span_context.span().set_status(Status::Ok); - + HttpResponse::Ok().json(PhotosResponse { photos: response_files, dirs, }) } else { error!("Bad photos request: {}", req.path); - span_context.span().set_status(Status::error("Invalid path")); + span_context + .span() + .set_status(Status::error("Invalid path")); HttpResponse::BadRequest().finish() } } @@ -241,7 +273,7 @@ pub fn list_files(dir: &Path) -> io::Result> { let dir_name_string = dir.to_str().unwrap_or_default().to_string(); span.set_attribute(KeyValue::new("dir", dir_name_string)); info!("Listing files in: {:?}", dir); - + let files = read_dir(dir)? .filter_map(|res| res.ok()) .filter(|entry| is_image_or_video(&entry.path()) || entry.file_type().unwrap().is_dir()) @@ -249,6 +281,7 @@ pub fn list_files(dir: &Path) -> io::Result> { .collect::>(); span.set_attribute(KeyValue::new("file_count", files.len().to_string())); + span.set_status(Status::Ok); info!("Found {:?} files in directory: {:?}", files.len(), dir); Ok(files) } @@ -395,7 +428,11 @@ impl FileSystemAccess for RealFileSystem { } fn move_file>(&self, from: P, destination: P) -> anyhow::Result<()> { - info!("Moving file: '{:?}' -> '{:?}'", from.as_ref(), destination.as_ref()); + info!( + "Moving file: '{:?}' -> '{:?}'", + from.as_ref(), + destination.as_ref() + ); let name = from .as_ref() .file_name() diff --git a/src/main.rs b/src/main.rs index e7f7a1e..50cba73 100644 --- a/src/main.rs +++ b/src/main.rs @@ -676,7 +676,13 @@ fn watch_files() { let ev = wrx.recv(); if let Ok(Ok(event)) = ev { match event.kind { - EventKind::Create(_) => create_thumbnails(), + EventKind::Create(create_kind) => { + info!( + "Creating thumbnails {:?} create event kind: {:?}", + event.paths, create_kind + ); + create_thumbnails(); + } EventKind::Modify(kind) => { debug!("All modified paths: {:?}", event.paths); debug!("Modify kind: {:?}", kind); diff --git a/src/otel.rs b/src/otel.rs index 01c8cbe..b93fa31 100644 --- a/src/otel.rs +++ b/src/otel.rs @@ -1,6 +1,6 @@ use actix_web::http::header::HeaderMap; use actix_web::HttpRequest; -use opentelemetry::global::BoxedTracer; +use opentelemetry::global::{BoxedSpan, BoxedTracer}; use opentelemetry::propagation::TextMapPropagator; use opentelemetry::trace::{Span, Status, Tracer}; use opentelemetry::{global, Context, KeyValue}; @@ -82,26 +82,31 @@ pub fn extract_context_from_request(req: &HttpRequest) -> Context { propagator.extract(&HeaderExtractor(req.headers())) } -pub fn trace_db_call(context: &Context, query_type: &str, operation: &str, func: F) -> anyhow::Result -where F: FnOnce() -> anyhow::Result { - +pub fn trace_db_call( + context: &Context, + query_type: &str, + operation: &str, + func: F, +) -> anyhow::Result +where + F: FnOnce(&mut BoxedSpan) -> anyhow::Result, +{ let tracer = global::tracer("db"); let mut span = tracer .span_builder(format!("db.{}.{}", query_type, operation)) .with_attributes(vec![ KeyValue::new("db.query_type", query_type.to_string().clone()), KeyValue::new("db.operation", operation.to_string().clone()), - ]).start_with_context(&tracer, context); - - let result = func(); + ]) + .start_with_context(&tracer, context); + + let result = func(&mut span); match &result { Ok(_) => { span.set_status(Status::Ok); } - Err(e) => { - span.set_status(Status::error(e.to_string())) - } + Err(e) => span.set_status(Status::error(e.to_string())), } - + result -} \ No newline at end of file +} diff --git a/src/tags.rs b/src/tags.rs index 96f146e..4c6dac6 100644 --- a/src/tags.rs +++ b/src/tags.rs @@ -8,8 +8,9 @@ use chrono::Utc; use diesel::dsl::count_star; use diesel::prelude::*; use diesel::sql_types::*; -use log::{debug, info, trace}; -use opentelemetry::trace::Tracer; +use log::{debug, info}; +use opentelemetry::trace::{Span, Status, TraceContextExt, Tracer}; +use opentelemetry::KeyValue; use schema::{tagged_photo, tags}; use serde::{Deserialize, Serialize}; use std::borrow::BorrowMut; @@ -37,22 +38,30 @@ async fn add_tag( ) -> impl Responder { let tracer = global_tracer(); let context = extract_context_from_request(&request); - let _ = tracer.start_with_context("add_tag", &context); + let span = tracer.start_with_context("add_tag", &context); + let span_context = opentelemetry::Context::current_with_span(span); let tag_name = body.tag_name.clone(); let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao"); tag_dao - .get_all_tags(&context, None) + .get_all_tags(&span_context, None) .and_then(|tags| { if let Some((_, tag)) = tags.iter().find(|t| t.1.name == tag_name) { Ok(tag.clone()) } else { - tag_dao.create_tag(&context, tag_name.trim()) + info!( + "Creating missing tag: '{:?}' for file: '{}'", + tag_name, &body.file_name + ); + tag_dao.create_tag(&span_context, tag_name.trim()) } }) - .and_then(|tag| tag_dao.tag_file(&context, &body.file_name, tag.id)) - .map(|_| HttpResponse::Ok()) + .and_then(|tag| tag_dao.tag_file(&span_context, &body.file_name, tag.id)) + .map(|_| { + span_context.span().set_status(Status::Ok); + HttpResponse::Ok() + }) .into_http_internal_err() } @@ -63,10 +72,15 @@ async fn get_tags( tag_dao: web::Data>, ) -> impl Responder { let context = extract_context_from_request(&http_request); + let span = global_tracer().start_with_context("get_tags", &context); + let span_context = opentelemetry::Context::current_with_span(span); let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao"); tag_dao - .get_tags_for_path(&context, &request.path) - .map(|tags| HttpResponse::Ok().json(tags)) + .get_tags_for_path(&span_context, &request.path) + .map(|tags| { + span_context.span().set_status(Status::Ok); + HttpResponse::Ok().json(tags) + }) .into_http_internal_err() } @@ -77,10 +91,15 @@ async fn get_all_tags( query: web::Query, ) -> impl Responder { let context = extract_context_from_request(&request); + let span = global_tracer().start_with_context("get_all_tags", &context); + let span_context = opentelemetry::Context::current_with_span(span); + let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao"); tag_dao - .get_all_tags(&context, query.path.clone()) + .get_all_tags(&span_context, query.path.clone()) .map(|tags| { + span_context.span().set_status(Status::Ok); + HttpResponse::Ok().json( tags.iter() .map(|(tag_count, tag)| TagWithTagCount { @@ -100,10 +119,15 @@ async fn remove_tagged_photo( tag_dao: web::Data>, ) -> impl Responder { let context = extract_context_from_request(&http_request); + let span = global_tracer().start_with_context("remove_tagged_photo", &context); + let span_context = opentelemetry::Context::current_with_span(span); + let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao"); tag_dao - .remove_tag(&context, &request.tag_name, &request.file_name) + .remove_tag(&span_context, &request.tag_name, &request.file_name) .map(|result| { + span_context.span().set_status(Status::Ok); + if result.is_some() { HttpResponse::Ok() } else { @@ -121,9 +145,14 @@ async fn update_tags( ) -> impl Responder { let mut dao = tag_dao.lock().expect("Unable to get TagDao"); let context = extract_context_from_request(&http_request); + let span = global_tracer().start_with_context("update_tags", &context); + let span_context = opentelemetry::Context::current_with_span(span); - dao.get_tags_for_path(&context, &request.file_name) - .and_then(|existing_tags| dao.get_all_tags(&context, None).map(|all| (existing_tags, all))) + dao.get_tags_for_path(&span_context, &request.file_name) + .and_then(|existing_tags| { + dao.get_all_tags(&span_context, None) + .map(|all| (existing_tags, all)) + }) .map(|(existing_tags, all_tags)| { let tags_to_remove = existing_tags .iter() @@ -135,7 +164,7 @@ async fn update_tags( "Removing tag {:?} from file: {:?}", tag.name, request.file_name ); - dao.remove_tag(&context, &tag.name, &request.file_name) + dao.remove_tag(&span_context, &tag.name, &request.file_name) .unwrap_or_else(|err| panic!("{:?} Unable to remove tag {:?}", err, &tag.name)); } @@ -150,7 +179,7 @@ async fn update_tags( new_tag.name, request.file_name ); - dao.tag_file(&context, &request.file_name, new_tag.id) + dao.tag_file(&span_context, &request.file_name, new_tag.id) .with_context(|| { format!( "Unable to tag file {:?} with tag: {:?}", @@ -160,6 +189,7 @@ async fn update_tags( .unwrap(); } + span_context.span().set_status(Status::Ok); HttpResponse::Ok() }) .into_http_internal_err() @@ -213,10 +243,24 @@ pub trait TagDao { context: &opentelemetry::Context, path: Option, ) -> anyhow::Result>; - fn get_tags_for_path(&mut self, context: &opentelemetry::Context, path: &str) -> anyhow::Result>; + fn get_tags_for_path( + &mut self, + context: &opentelemetry::Context, + path: &str, + ) -> anyhow::Result>; fn create_tag(&mut self, context: &opentelemetry::Context, name: &str) -> anyhow::Result; - fn remove_tag(&mut self, context: &opentelemetry::Context, tag_name: &str, path: &str) -> anyhow::Result>; - fn tag_file(&mut self, context: &opentelemetry::Context, path: &str, tag_id: i32) -> anyhow::Result; + fn remove_tag( + &mut self, + context: &opentelemetry::Context, + tag_name: &str, + path: &str, + ) -> anyhow::Result>; + fn tag_file( + &mut self, + context: &opentelemetry::Context, + path: &str, + tag_id: i32, + ) -> anyhow::Result; fn get_files_with_all_tag_ids( &mut self, tag_ids: Vec, @@ -248,10 +292,16 @@ impl Default for SqliteTagDao { } impl TagDao for SqliteTagDao { - fn get_all_tags(&mut self, context: &opentelemetry::Context, path: Option) -> anyhow::Result> { + fn get_all_tags( + &mut self, + context: &opentelemetry::Context, + path: Option, + ) -> anyhow::Result> { // select name, count(*) from tags join tagged_photo ON tags.id = tagged_photo.tag_id GROUP BY tags.name ORDER BY COUNT(*); - trace_db_call(&context, "query", "get_all_tags", || { + trace_db_call(&context, "query", "get_all_tags", |span| { + span.set_attribute(KeyValue::new("path", path.clone().unwrap_or_default())); + let path = path.map(|p| p + "%").unwrap_or("%".to_string()); let (id, name, created_time) = tags::all_columns; tags::table @@ -279,9 +329,15 @@ impl TagDao for SqliteTagDao { }) } - fn get_tags_for_path(&mut self, context: &opentelemetry::Context, path: &str) -> anyhow::Result> { - trace_db_call(&context, "query", "get_tags_for_path", || { - trace!("Getting Tags for path: {:?}", path); + fn get_tags_for_path( + &mut self, + context: &opentelemetry::Context, + path: &str, + ) -> anyhow::Result> { + trace_db_call(&context, "query", "get_tags_for_path", |span| { + span.set_attribute(KeyValue::new("path", path.to_string())); + + debug!("Getting Tags for path: {:?}", path); tags::table .left_join(tagged_photo::table) .filter(tagged_photo::photo_name.eq(&path)) @@ -292,7 +348,9 @@ impl TagDao for SqliteTagDao { } fn create_tag(&mut self, context: &opentelemetry::Context, name: &str) -> anyhow::Result { - trace_db_call(&context, "insert", "create_tag", || { + trace_db_call(&context, "insert", "create_tag", |span| { + span.set_attribute(KeyValue::new("name", name.to_string())); + diesel::insert_into(tags::table) .values(InsertTag { name: name.to_string(), @@ -322,8 +380,18 @@ impl TagDao for SqliteTagDao { }) } - fn remove_tag(&mut self, context: &opentelemetry::Context, tag_name: &str, path: &str) -> anyhow::Result> { - trace_db_call(&context, "delete", "remove_tag", || { + fn remove_tag( + &mut self, + context: &opentelemetry::Context, + tag_name: &str, + path: &str, + ) -> anyhow::Result> { + trace_db_call(&context, "delete", "remove_tag", |span| { + span.set_attributes(vec![ + KeyValue::new("tag_name", tag_name.to_string()), + KeyValue::new("path", path.to_string()), + ]); + tags::table .filter(tags::name.eq(tag_name)) .get_result::(self.connection.borrow_mut()) @@ -347,8 +415,18 @@ impl TagDao for SqliteTagDao { }) } - fn tag_file(&mut self, context: &opentelemetry::Context, path: &str, tag_id: i32) -> anyhow::Result { - trace_db_call(&context, "insert", "tag_file", || { + fn tag_file( + &mut self, + context: &opentelemetry::Context, + path: &str, + tag_id: i32, + ) -> anyhow::Result { + trace_db_call(&context, "insert", "tag_file", |span| { + span.set_attributes(vec![ + KeyValue::new("path", path.to_string()), + KeyValue::new("tag_id", tag_id.to_string()), + ]); + diesel::insert_into(tagged_photo::table) .values(InsertTaggedPhoto { tag_id, @@ -386,7 +464,7 @@ impl TagDao for SqliteTagDao { exclude_tag_ids: Vec, context: &opentelemetry::Context, ) -> anyhow::Result> { - trace_db_call(&context, "query", "get_files_with_all_tags", || { + trace_db_call(&context, "query", "get_files_with_all_tags", |_| { use diesel::dsl::*; let exclude_subquery = tagged_photo::table @@ -423,7 +501,7 @@ impl TagDao for SqliteTagDao { exclude_tag_ids: Vec, context: &opentelemetry::Context, ) -> anyhow::Result> { - trace_db_call(&context, "query", "get_files_with_any_tags", || { + trace_db_call(&context, "query", "get_files_with_any_tags", |_| { use diesel::dsl::*; let tag_ids_str = tag_ids @@ -445,12 +523,12 @@ WITH filtered_photos AS ( FROM tagged_photo tp WHERE tp.tag_id IN ({}) AND tp.photo_name NOT IN ( - SELECT photo_name - FROM tagged_photo + SELECT photo_name + FROM tagged_photo WHERE tag_id IN ({}) ) ) - SELECT + SELECT fp.photo_name as file_name, COUNT(DISTINCT tp2.tag_id) as tag_count FROM filtered_photos fp @@ -493,7 +571,11 @@ mod tests { } impl TagDao for TestTagDao { - fn get_all_tags(&mut self, context: &opentelemetry::Context, _option: Option) -> anyhow::Result> { + fn get_all_tags( + &mut self, + context: &opentelemetry::Context, + _option: Option, + ) -> anyhow::Result> { Ok(self .tags .borrow() @@ -503,7 +585,11 @@ mod tests { .clone()) } - fn get_tags_for_path(&mut self, context: &opentelemetry::Context, path: &str) -> anyhow::Result> { + fn get_tags_for_path( + &mut self, + context: &opentelemetry::Context, + path: &str, + ) -> anyhow::Result> { info!("Getting test tags for: {:?}", path); warn!("Tags for path: {:?}", self.tagged_photos); @@ -515,7 +601,11 @@ mod tests { .clone()) } - fn create_tag(&mut self, context: &opentelemetry::Context, name: &str) -> anyhow::Result { + fn create_tag( + &mut self, + context: &opentelemetry::Context, + name: &str, + ) -> anyhow::Result { self.tag_count += 1; let tag_id = self.tag_count; @@ -531,7 +621,12 @@ mod tests { Ok(tag) } - fn remove_tag(&mut self, context: &opentelemetry::Context, tag_name: &str, path: &str) -> anyhow::Result> { + fn remove_tag( + &mut self, + context: &opentelemetry::Context, + tag_name: &str, + path: &str, + ) -> anyhow::Result> { let mut clone = { let photo_tags = &self.tagged_photos.borrow()[path]; photo_tags.clone() @@ -551,7 +646,12 @@ mod tests { } } - fn tag_file(&mut self, context: &opentelemetry::Context, path: &str, tag_id: i32) -> anyhow::Result { + fn tag_file( + &mut self, + context: &opentelemetry::Context, + path: &str, + tag_id: i32, + ) -> anyhow::Result { debug!("Tagging file: {:?} with tag_id: {:?}", path, tag_id); if let Some(tag) = self.tags.borrow().iter().find(|t| t.id == tag_id) { diff --git a/src/video.rs b/src/video.rs index b9112da..87ca6ce 100644 --- a/src/video.rs +++ b/src/video.rs @@ -1,7 +1,10 @@ use crate::is_video; +use crate::otel::global_tracer; use actix::prelude::*; use futures::TryFutureExt; use log::{debug, error, info, trace, warn}; +use opentelemetry::trace::{Span, Tracer}; +use opentelemetry::KeyValue; use std::io::Result; use std::path::{Path, PathBuf}; use std::process::{Child, Command, ExitStatus, Stdio}; @@ -122,6 +125,9 @@ impl Handler for VideoPlaylistManager { type Result = ResponseFuture<()>; fn handle(&mut self, msg: ScanDirectoryMessage, _ctx: &mut Self::Context) -> Self::Result { + let tracer = global_tracer(); + let mut span = tracer.start("videoplaylistmanager.scan_directory"); + let start = std::time::Instant::now(); info!( "Starting scan directory for video playlist generation: {}", @@ -157,6 +163,11 @@ impl Handler for VideoPlaylistManager { .expect("Failed to send generate playlist message") { Ok(_) => { + span.add_event( + "Playlist generated", + vec![KeyValue::new("video_path", path_as_str.to_string())], + ); + debug!( "Successfully generated playlist for file: '{}'", path_as_str @@ -168,6 +179,10 @@ impl Handler for VideoPlaylistManager { } } + span.add_event( + "Finished directory scan", + vec![KeyValue::new("directory", scan_dir_name.to_string())], + ); info!( "Finished directory scan of '{}' in {:?}", scan_dir_name, -- 2.49.1 From a25ffcc35103a2638b840aa6a8891e2905c0065e Mon Sep 17 00:00:00 2001 From: Cameron Date: Thu, 12 Jun 2025 13:12:01 -0400 Subject: [PATCH 10/11] Add enhanced logging and tracing for playlist generation --- src/video.rs | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/src/video.rs b/src/video.rs index 87ca6ce..79483bc 100644 --- a/src/video.rs +++ b/src/video.rs @@ -3,7 +3,7 @@ use crate::otel::global_tracer; use actix::prelude::*; use futures::TryFutureExt; use log::{debug, error, info, trace, warn}; -use opentelemetry::trace::{Span, Tracer}; +use opentelemetry::trace::{Span, Status, Tracer}; use opentelemetry::KeyValue; use std::io::Result; use std::path::{Path, PathBuf}; @@ -234,6 +234,16 @@ impl Handler for PlaylistGenerator { playlist_path, msg.video_path.file_name().unwrap().to_str().unwrap() ); + + let tracer = global_tracer(); + let mut span = tracer + .span_builder("playlistgenerator.generate_playlist") + .with_attributes(vec![ + KeyValue::new("video_file", video_file.clone()), + KeyValue::new("playlist_file", playlist_file.clone()), + ]) + .start(&tracer); + Box::pin(async move { let wait_start = std::time::Instant::now(); let permit = semaphore @@ -245,9 +255,16 @@ impl Handler for PlaylistGenerator { "Waited for {:?} before starting ffmpeg", wait_start.elapsed() ); + span.add_event("Waited for FFMPEG semaphore", vec![ + KeyValue::new("wait_time", wait_start.elapsed().as_secs_f64()) + ]); if Path::new(&playlist_file).exists() { debug!("Playlist already exists: {}", playlist_file); + span.set_status(Status::error(format!( + "Playlist already exists: {}", + playlist_file + ))); return Err(std::io::Error::from(std::io::ErrorKind::AlreadyExists)); } @@ -281,6 +298,8 @@ impl Handler for PlaylistGenerator { if let Ok(ref res) = ffmpeg_result { debug!("ffmpeg output: {:?}", res); } + + span.set_status(Status::Ok); ffmpeg_result }); -- 2.49.1 From 97a07e11caecb4129a5fe5d4a1a76a4badd7d42c Mon Sep 17 00:00:00 2001 From: Cameron Date: Thu, 12 Jun 2025 15:01:07 -0400 Subject: [PATCH 11/11] Fix SQL injection vulnerability in a tag query --- src/tags.rs | 64 ++++++++++++++++++++++++++++------------------------ src/video.rs | 16 ++++++++----- 2 files changed, 45 insertions(+), 35 deletions(-) diff --git a/src/tags.rs b/src/tags.rs index 4c6dac6..9c2ea59 100644 --- a/src/tags.rs +++ b/src/tags.rs @@ -503,43 +503,49 @@ impl TagDao for SqliteTagDao { ) -> anyhow::Result> { trace_db_call(&context, "query", "get_files_with_any_tags", |_| { use diesel::dsl::*; - - let tag_ids_str = tag_ids - .iter() - .map(|id| id.to_string()) + // Create the placeholders for the IN clauses + let tag_placeholders = std::iter::repeat("?") + .take(tag_ids.len()) .collect::>() .join(","); - - let exclude_tag_ids_str = exclude_tag_ids - .iter() - .map(|id| id.to_string()) + let exclude_placeholders = std::iter::repeat("?") + .take(exclude_tag_ids.len()) .collect::>() .join(","); let query = sql_query(format!( r#" -WITH filtered_photos AS ( - SELECT DISTINCT photo_name - FROM tagged_photo tp - WHERE tp.tag_id IN ({}) - AND tp.photo_name NOT IN ( - SELECT photo_name - FROM tagged_photo - WHERE tag_id IN ({}) - ) - ) - SELECT - fp.photo_name as file_name, - COUNT(DISTINCT tp2.tag_id) as tag_count - FROM filtered_photos fp - JOIN tagged_photo tp2 ON fp.photo_name = tp2.photo_name - GROUP BY fp.photo_name"#, - tag_ids_str, exclude_tag_ids_str - )); + WITH filtered_photos AS ( + SELECT DISTINCT photo_name + FROM tagged_photo tp + WHERE tp.tag_id IN ({}) + AND tp.photo_name NOT IN ( + SELECT photo_name + FROM tagged_photo + WHERE tag_id IN ({}) + ) + ) + SELECT + fp.photo_name as file_name, + COUNT(DISTINCT tp2.tag_id) as tag_count + FROM filtered_photos fp + JOIN tagged_photo tp2 ON fp.photo_name = tp2.photo_name + GROUP BY fp.photo_name"#, + tag_placeholders, exclude_placeholders + )) + .into_boxed(); - // Execute the query: - let results = query.load::(&mut self.connection)?; - Ok(results) + // Bind all parameters + let query = tag_ids + .into_iter() + .fold(query, |q, id| q.bind::(id)); + let query = exclude_tag_ids + .into_iter() + .fold(query, |q, id| q.bind::(id)); + + query + .load::(&mut self.connection) + .with_context(|| "Unable to get tagged photos") }) } } diff --git a/src/video.rs b/src/video.rs index 79483bc..1f3d230 100644 --- a/src/video.rs +++ b/src/video.rs @@ -234,7 +234,7 @@ impl Handler for PlaylistGenerator { playlist_path, msg.video_path.file_name().unwrap().to_str().unwrap() ); - + let tracer = global_tracer(); let mut span = tracer .span_builder("playlistgenerator.generate_playlist") @@ -243,7 +243,7 @@ impl Handler for PlaylistGenerator { KeyValue::new("playlist_file", playlist_file.clone()), ]) .start(&tracer); - + Box::pin(async move { let wait_start = std::time::Instant::now(); let permit = semaphore @@ -255,9 +255,13 @@ impl Handler for PlaylistGenerator { "Waited for {:?} before starting ffmpeg", wait_start.elapsed() ); - span.add_event("Waited for FFMPEG semaphore", vec![ - KeyValue::new("wait_time", wait_start.elapsed().as_secs_f64()) - ]); + span.add_event( + "Waited for FFMPEG semaphore", + vec![KeyValue::new( + "wait_time", + wait_start.elapsed().as_secs_f64(), + )], + ); if Path::new(&playlist_file).exists() { debug!("Playlist already exists: {}", playlist_file); @@ -298,7 +302,7 @@ impl Handler for PlaylistGenerator { if let Ok(ref res) = ffmpeg_result { debug!("ffmpeg output: {:?}", res); } - + span.set_status(Status::Ok); ffmpeg_result -- 2.49.1