diff --git a/src/files.rs b/src/files.rs index b91e8ba..9afef7e 100644 --- a/src/files.rs +++ b/src/files.rs @@ -13,7 +13,7 @@ use actix_web::web::Data; use actix_web::{web::{self, Query}, HttpRequest, HttpResponse}; use log::{debug, error, info, trace}; use opentelemetry::KeyValue; -use opentelemetry::trace::{Span, Status, Tracer}; +use opentelemetry::trace::{Span, Status, TraceContextExt, Tracer}; use crate::data::{Claims, FilesRequest, FilterMode, PhotosResponse, SortType}; use crate::{create_thumbnails, AppState}; @@ -41,6 +41,7 @@ pub async fn list_photos( let context = extract_context_from_request(&request); let mut span = tracer.start_with_context("list_photos", &context); span.set_attribute(KeyValue::new("path", search_path.to_string())); + let span_context = opentelemetry::Context::current_with_span(span); let search_recursively = req.recursive.unwrap_or(false); if let Some(tag_ids) = &req.tag_ids { @@ -66,8 +67,8 @@ pub async fn list_photos( .collect::>(); return match filter_mode { - FilterMode::Any => dao.get_files_with_any_tag_ids(tag_ids.clone(), exclude_tag_ids), - FilterMode::All => dao.get_files_with_all_tag_ids(tag_ids.clone(), exclude_tag_ids), + FilterMode::Any => dao.get_files_with_any_tag_ids(tag_ids.clone(), exclude_tag_ids, &span_context), + FilterMode::All => dao.get_files_with_all_tag_ids(tag_ids.clone(), exclude_tag_ids, &span_context), } .context(format!( "Failed to get files with tag_ids: {:?} with filter_mode: {:?}", @@ -104,8 +105,8 @@ pub async fn list_photos( tagged_files.len(), tagged_files ); - span.set_attribute(KeyValue::new("file_count", tagged_files.len().to_string())); - span.set_status(Status::Ok); + span_context.span().set_attribute(KeyValue::new("file_count", tagged_files.len().to_string())); + span_context.span().set_status(Status::Ok); HttpResponse::Ok().json(PhotosResponse { photos: tagged_files, @@ -138,7 +139,7 @@ pub async fn list_photos( .map(|f| f.to_str().unwrap().to_string()) .map(|file_name| { let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao"); - let file_tags = tag_dao.get_tags_for_path(&file_name).unwrap_or_default(); + let file_tags = tag_dao.get_tags_for_path(&span_context, &file_name).unwrap_or_default(); (file_name, file_tags) }) @@ -197,8 +198,8 @@ pub async fn list_photos( .map(|f| f.to_str().unwrap().to_string()) .collect::>(); - span.set_attribute(KeyValue::new("file_count", files.len().to_string())); - span.set_status(Status::Ok); + span_context.span().set_attribute(KeyValue::new("file_count", files.len().to_string())); + span_context.span().set_status(Status::Ok); HttpResponse::Ok().json(PhotosResponse { photos: response_files, @@ -206,7 +207,7 @@ pub async fn list_photos( }) } else { error!("Bad photos request: {}", req.path); - span.set_status(Status::error("Invalid path")); + span_context.span().set_status(Status::error("Invalid path")); HttpResponse::BadRequest().finish() } } diff --git a/src/otel.rs b/src/otel.rs index 0c727f6..01c8cbe 100644 --- a/src/otel.rs +++ b/src/otel.rs @@ -82,15 +82,16 @@ pub fn extract_context_from_request(req: &HttpRequest) -> Context { propagator.extract(&HeaderExtractor(req.headers())) } -pub fn trace_db_call(query_type: &str, operation: &str, func: F) -> anyhow::Result +pub fn trace_db_call(context: &Context, query_type: &str, operation: &str, func: F) -> anyhow::Result where F: FnOnce() -> anyhow::Result { + let tracer = global::tracer("db"); let mut span = tracer .span_builder(format!("db.{}.{}", query_type, operation)) .with_attributes(vec![ KeyValue::new("db.query_type", query_type.to_string().clone()), KeyValue::new("db.operation", operation.to_string().clone()), - ]).start_with_context(&tracer, &Context::current()); + ]).start_with_context(&tracer, context); let result = func(); match &result { diff --git a/src/tags.rs b/src/tags.rs index a2cdeae..96f146e 100644 --- a/src/tags.rs +++ b/src/tags.rs @@ -1,14 +1,15 @@ use crate::data::GetTagsRequest; -use crate::otel::trace_db_call; +use crate::otel::{extract_context_from_request, global_tracer, trace_db_call}; use crate::{connect, data::AddTagRequest, error::IntoHttpError, schema, Claims, ThumbnailRequest}; use actix_web::dev::{ServiceFactory, ServiceRequest}; -use actix_web::{web, App, HttpResponse, Responder}; +use actix_web::{web, App, HttpRequest, HttpResponse, Responder}; use anyhow::Context; use chrono::Utc; use diesel::dsl::count_star; use diesel::prelude::*; use diesel::sql_types::*; use log::{debug, info, trace}; +use opentelemetry::trace::Tracer; use schema::{tagged_photo, tags}; use serde::{Deserialize, Serialize}; use std::borrow::BorrowMut; @@ -30,35 +31,41 @@ where async fn add_tag( _: Claims, + request: HttpRequest, body: web::Json, tag_dao: web::Data>, ) -> impl Responder { + let tracer = global_tracer(); + let context = extract_context_from_request(&request); + let _ = tracer.start_with_context("add_tag", &context); let tag_name = body.tag_name.clone(); let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao"); tag_dao - .get_all_tags(None) + .get_all_tags(&context, None) .and_then(|tags| { if let Some((_, tag)) = tags.iter().find(|t| t.1.name == tag_name) { Ok(tag.clone()) } else { - tag_dao.create_tag(tag_name.trim()) + tag_dao.create_tag(&context, tag_name.trim()) } }) - .and_then(|tag| tag_dao.tag_file(&body.file_name, tag.id)) + .and_then(|tag| tag_dao.tag_file(&context, &body.file_name, tag.id)) .map(|_| HttpResponse::Ok()) .into_http_internal_err() } async fn get_tags( _: Claims, + http_request: HttpRequest, request: web::Query, tag_dao: web::Data>, ) -> impl Responder { + let context = extract_context_from_request(&http_request); let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao"); tag_dao - .get_tags_for_path(&request.path) + .get_tags_for_path(&context, &request.path) .map(|tags| HttpResponse::Ok().json(tags)) .into_http_internal_err() } @@ -66,11 +73,13 @@ async fn get_tags( async fn get_all_tags( _: Claims, tag_dao: web::Data>, + request: HttpRequest, query: web::Query, ) -> impl Responder { + let context = extract_context_from_request(&request); let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao"); tag_dao - .get_all_tags(query.path.clone()) + .get_all_tags(&context, query.path.clone()) .map(|tags| { HttpResponse::Ok().json( tags.iter() @@ -86,12 +95,14 @@ async fn get_all_tags( async fn remove_tagged_photo( _: Claims, + http_request: HttpRequest, request: web::Json, tag_dao: web::Data>, ) -> impl Responder { + let context = extract_context_from_request(&http_request); let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao"); tag_dao - .remove_tag(&request.tag_name, &request.file_name) + .remove_tag(&context, &request.tag_name, &request.file_name) .map(|result| { if result.is_some() { HttpResponse::Ok() @@ -105,12 +116,14 @@ async fn remove_tagged_photo( async fn update_tags( _: Claims, tag_dao: web::Data>, + http_request: HttpRequest, request: web::Json, ) -> impl Responder { let mut dao = tag_dao.lock().expect("Unable to get TagDao"); + let context = extract_context_from_request(&http_request); - dao.get_tags_for_path(&request.file_name) - .and_then(|existing_tags| dao.get_all_tags(None).map(|all| (existing_tags, all))) + dao.get_tags_for_path(&context, &request.file_name) + .and_then(|existing_tags| dao.get_all_tags(&context, None).map(|all| (existing_tags, all))) .map(|(existing_tags, all_tags)| { let tags_to_remove = existing_tags .iter() @@ -122,7 +135,7 @@ async fn update_tags( "Removing tag {:?} from file: {:?}", tag.name, request.file_name ); - dao.remove_tag(&tag.name, &request.file_name) + dao.remove_tag(&context, &tag.name, &request.file_name) .unwrap_or_else(|err| panic!("{:?} Unable to remove tag {:?}", err, &tag.name)); } @@ -137,7 +150,7 @@ async fn update_tags( new_tag.name, request.file_name ); - dao.tag_file(&request.file_name, new_tag.id) + dao.tag_file(&context, &request.file_name, new_tag.id) .with_context(|| { format!( "Unable to tag file {:?} with tag: {:?}", @@ -195,20 +208,26 @@ pub struct AddTagsRequest { } pub trait TagDao { - fn get_all_tags(&mut self, path: Option) -> anyhow::Result>; - fn get_tags_for_path(&mut self, path: &str) -> anyhow::Result>; - fn create_tag(&mut self, name: &str) -> anyhow::Result; - fn remove_tag(&mut self, tag_name: &str, path: &str) -> anyhow::Result>; - fn tag_file(&mut self, path: &str, tag_id: i32) -> anyhow::Result; + fn get_all_tags( + &mut self, + context: &opentelemetry::Context, + path: Option, + ) -> anyhow::Result>; + fn get_tags_for_path(&mut self, context: &opentelemetry::Context, path: &str) -> anyhow::Result>; + fn create_tag(&mut self, context: &opentelemetry::Context, name: &str) -> anyhow::Result; + fn remove_tag(&mut self, context: &opentelemetry::Context, tag_name: &str, path: &str) -> anyhow::Result>; + fn tag_file(&mut self, context: &opentelemetry::Context, path: &str, tag_id: i32) -> anyhow::Result; fn get_files_with_all_tag_ids( &mut self, tag_ids: Vec, exclude_tag_ids: Vec, + context: &opentelemetry::Context, ) -> anyhow::Result>; fn get_files_with_any_tag_ids( &mut self, tag_ids: Vec, exclude_tag_ids: Vec, + context: &opentelemetry::Context, ) -> anyhow::Result>; } @@ -229,10 +248,10 @@ impl Default for SqliteTagDao { } impl TagDao for SqliteTagDao { - fn get_all_tags(&mut self, path: Option) -> anyhow::Result> { + fn get_all_tags(&mut self, context: &opentelemetry::Context, path: Option) -> anyhow::Result> { // select name, count(*) from tags join tagged_photo ON tags.id = tagged_photo.tag_id GROUP BY tags.name ORDER BY COUNT(*); - trace_db_call("query", "get_all_tags", || { + trace_db_call(&context, "query", "get_all_tags", || { let path = path.map(|p| p + "%").unwrap_or("%".to_string()); let (id, name, created_time) = tags::all_columns; tags::table @@ -260,8 +279,8 @@ impl TagDao for SqliteTagDao { }) } - fn get_tags_for_path(&mut self, path: &str) -> anyhow::Result> { - trace_db_call("query", "get_tags_for_path", || { + fn get_tags_for_path(&mut self, context: &opentelemetry::Context, path: &str) -> anyhow::Result> { + trace_db_call(&context, "query", "get_tags_for_path", || { trace!("Getting Tags for path: {:?}", path); tags::table .left_join(tagged_photo::table) @@ -272,8 +291,8 @@ impl TagDao for SqliteTagDao { }) } - fn create_tag(&mut self, name: &str) -> anyhow::Result { - trace_db_call("insert", "create_tag", || { + fn create_tag(&mut self, context: &opentelemetry::Context, name: &str) -> anyhow::Result { + trace_db_call(&context, "insert", "create_tag", || { diesel::insert_into(tags::table) .values(InsertTag { name: name.to_string(), @@ -303,8 +322,8 @@ impl TagDao for SqliteTagDao { }) } - fn remove_tag(&mut self, tag_name: &str, path: &str) -> anyhow::Result> { - trace_db_call("delete", "remove_tag", || { + fn remove_tag(&mut self, context: &opentelemetry::Context, tag_name: &str, path: &str) -> anyhow::Result> { + trace_db_call(&context, "delete", "remove_tag", || { tags::table .filter(tags::name.eq(tag_name)) .get_result::(self.connection.borrow_mut()) @@ -328,8 +347,8 @@ impl TagDao for SqliteTagDao { }) } - fn tag_file(&mut self, path: &str, tag_id: i32) -> anyhow::Result { - trace_db_call("insert", "tag_file", || { + fn tag_file(&mut self, context: &opentelemetry::Context, path: &str, tag_id: i32) -> anyhow::Result { + trace_db_call(&context, "insert", "tag_file", || { diesel::insert_into(tagged_photo::table) .values(InsertTaggedPhoto { tag_id, @@ -365,8 +384,9 @@ impl TagDao for SqliteTagDao { &mut self, tag_ids: Vec, exclude_tag_ids: Vec, + context: &opentelemetry::Context, ) -> anyhow::Result> { - trace_db_call("query", "get_files_with_all_tags", || { + trace_db_call(&context, "query", "get_files_with_all_tags", || { use diesel::dsl::*; let exclude_subquery = tagged_photo::table @@ -401,8 +421,9 @@ impl TagDao for SqliteTagDao { &mut self, tag_ids: Vec, exclude_tag_ids: Vec, + context: &opentelemetry::Context, ) -> anyhow::Result> { - trace_db_call("query", "get_files_with_any_tags", || { + trace_db_call(&context, "query", "get_files_with_any_tags", || { use diesel::dsl::*; let tag_ids_str = tag_ids @@ -472,7 +493,7 @@ mod tests { } impl TagDao for TestTagDao { - fn get_all_tags(&mut self, _option: Option) -> anyhow::Result> { + fn get_all_tags(&mut self, context: &opentelemetry::Context, _option: Option) -> anyhow::Result> { Ok(self .tags .borrow() @@ -482,7 +503,7 @@ mod tests { .clone()) } - fn get_tags_for_path(&mut self, path: &str) -> anyhow::Result> { + fn get_tags_for_path(&mut self, context: &opentelemetry::Context, path: &str) -> anyhow::Result> { info!("Getting test tags for: {:?}", path); warn!("Tags for path: {:?}", self.tagged_photos); @@ -494,7 +515,7 @@ mod tests { .clone()) } - fn create_tag(&mut self, name: &str) -> anyhow::Result { + fn create_tag(&mut self, context: &opentelemetry::Context, name: &str) -> anyhow::Result { self.tag_count += 1; let tag_id = self.tag_count; @@ -510,7 +531,7 @@ mod tests { Ok(tag) } - fn remove_tag(&mut self, tag_name: &str, path: &str) -> anyhow::Result> { + fn remove_tag(&mut self, context: &opentelemetry::Context, tag_name: &str, path: &str) -> anyhow::Result> { let mut clone = { let photo_tags = &self.tagged_photos.borrow()[path]; photo_tags.clone() @@ -530,7 +551,7 @@ mod tests { } } - fn tag_file(&mut self, path: &str, tag_id: i32) -> anyhow::Result { + fn tag_file(&mut self, context: &opentelemetry::Context, path: &str, tag_id: i32) -> anyhow::Result { debug!("Tagging file: {:?} with tag_id: {:?}", path, tag_id); if let Some(tag) = self.tags.borrow().iter().find(|t| t.id == tag_id) { @@ -566,15 +587,17 @@ mod tests { fn get_files_with_all_tag_ids( &mut self, tag_ids: Vec, - _exclude_tag_ids: Vec, + exclude_tag_ids: Vec, + context: &opentelemetry::Context, ) -> anyhow::Result> { todo!() } fn get_files_with_any_tag_ids( &mut self, - _tag_ids: Vec, - _exclude_tag_ids: Vec, + tag_ids: Vec, + exclude_tag_ids: Vec, + context: &opentelemetry::Context, ) -> anyhow::Result> { todo!() }