From 785ce157e6f32a37f48259ba3256880a44c0a15f Mon Sep 17 00:00:00 2001 From: Cameron Date: Fri, 23 May 2025 18:24:54 -0400 Subject: [PATCH] Get Otel span from the request --- src/files.rs | 13 +++++++------ src/main.rs | 32 +++++++++++++++++++++++++------- src/otel.rs | 22 +++++++++++++++++++++- 3 files changed, 53 insertions(+), 14 deletions(-) diff --git a/src/files.rs b/src/files.rs index 79fda03..b91e8ba 100644 --- a/src/files.rs +++ b/src/files.rs @@ -10,10 +10,7 @@ use actix::{Handler, Message}; use anyhow::{anyhow, Context}; use actix_web::web::Data; -use actix_web::{ - web::{self, Query}, - HttpResponse, -}; +use actix_web::{web::{self, Query}, HttpRequest, HttpResponse}; use log::{debug, error, info, trace}; use opentelemetry::KeyValue; use opentelemetry::trace::{Span, Status, Tracer}; @@ -28,10 +25,11 @@ use path_absolutize::*; use rand::prelude::SliceRandom; use rand::thread_rng; use serde::Deserialize; -use crate::otel::global_tracer; +use crate::otel::{extract_context_from_request, global_tracer}; pub async fn list_photos( _: Claims, + request: HttpRequest, req: Query, app_state: web::Data, file_system: web::Data, @@ -40,7 +38,8 @@ pub async fn list_photos( let search_path = &req.path; let tracer = global_tracer(); - let mut span = tracer.start("list_photos"); + let context = extract_context_from_request(&request); + let mut span = tracer.start_with_context("list_photos", &context); span.set_attribute(KeyValue::new("path", search_path.to_string())); let search_recursively = req.recursive.unwrap_or(false); @@ -547,6 +546,7 @@ mod tests { let response = list_photos( claims, + HttpRequest::default(), request, Data::new(AppState::new( Arc::new(StreamActor {}.start()), @@ -594,6 +594,7 @@ mod tests { let response: HttpResponse = list_photos( claims, + HttpRequest::default(), request, Data::new(AppState::new( Arc::new(StreamActor {}.start()), diff --git a/src/main.rs b/src/main.rs index 5dbf5c2..e7f7a1e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -38,7 +38,7 @@ use crate::database::*; use crate::files::{ is_image_or_video, is_valid_full_path, move_file, RealFileSystem, RefreshThumbnailsMessage, }; -use crate::otel::global_tracer; +use crate::otel::{extract_context_from_request, global_tracer}; use crate::service::ServiceBuilder; use crate::state::AppState; use crate::tags::*; @@ -84,7 +84,9 @@ async fn get_image( app_state: Data, ) -> impl Responder { let tracer = global_tracer(); - let mut span = tracer.start("get_image"); + let context = extract_context_from_request(&request); + + let mut span = tracer.start_with_context("get_image", &context); if let Some(path) = is_valid_full_path(&app_state.base_path, &req.path, false) { let image_size = req.size.unwrap_or(PhotoSize::Full); @@ -121,11 +123,13 @@ async fn get_image( #[get("/image/metadata")] async fn get_file_metadata( _: Claims, + request: HttpRequest, path: web::Query, app_state: Data, ) -> impl Responder { let tracer = global_tracer(); - let mut span = tracer.start("get_file_metadata"); + let context = extract_context_from_request(&request); + let mut span = tracer.start_with_context("get_file_metadata", &context); match is_valid_full_path(&app_state.base_path, &path.path, false) .ok_or_else(|| ErrorKind::InvalidData.into()) .and_then(File::open) @@ -154,11 +158,13 @@ async fn get_file_metadata( #[post("/image")] async fn upload_image( _: Claims, + request: HttpRequest, mut payload: mp::Multipart, app_state: Data, ) -> impl Responder { let tracer = global_tracer(); - let mut span = tracer.start("upload_image"); + let context = extract_context_from_request(&request); + let mut span = tracer.start_with_context("upload_image", &context); let mut file_content: BytesMut = BytesMut::new(); let mut file_name: Option = None; @@ -241,11 +247,14 @@ async fn upload_image( #[post("/video/generate")] async fn generate_video( _claims: Claims, + request: HttpRequest, app_state: Data, body: web::Json, ) -> impl Responder { let tracer = global_tracer(); - let mut span = tracer.start("generate_video"); + + let context = extract_context_from_request(&request); + let mut span = tracer.start_with_context("generate_video", &context); let filename = PathBuf::from(&body.path); @@ -289,7 +298,8 @@ async fn stream_video( app_state: Data, ) -> impl Responder { let tracer = global::tracer("image-server"); - let mut span = tracer.start("stream_video"); + let context = extract_context_from_request(&request); + let mut span = tracer.start_with_context("stream_video", &context); let playlist = &path.path; debug!("Playlist: {}", playlist); @@ -318,7 +328,8 @@ async fn get_video_part( app_state: Data, ) -> impl Responder { let tracer = global_tracer(); - let mut span = tracer.start("get_video_part"); + let context = extract_context_from_request(&request); + let mut span = tracer.start_with_context("get_video_part", &context); let part = &path.path; debug!("Video part: {}", part); @@ -343,8 +354,13 @@ async fn get_video_part( #[get("image/favorites")] async fn favorites( claims: Claims, + request: HttpRequest, favorites_dao: Data>>, ) -> impl Responder { + let tracer = global_tracer(); + let context = extract_context_from_request(&request); + let mut span = tracer.start_with_context("get favorites", &context); + match web::block(move || { favorites_dao .lock() @@ -359,12 +375,14 @@ async fn favorites( .map(|favorite| favorite.path) .collect::>(); + span.set_status(Status::Ok); HttpResponse::Ok().json(PhotosResponse { photos: favorites, dirs: Vec::new(), }) } Ok(Err(e)) => { + span.set_status(Status::error(format!("Error getting favorites: {:?}", e))); error!("Error getting favorites: {:?}", e); HttpResponse::InternalServerError().finish() } diff --git a/src/otel.rs b/src/otel.rs index 5c9e121..595c834 100644 --- a/src/otel.rs +++ b/src/otel.rs @@ -1,8 +1,12 @@ +use actix_web::http::header::HeaderMap; +use actix_web::HttpRequest; use opentelemetry::global::BoxedTracer; -use opentelemetry::{global, KeyValue}; +use opentelemetry::{global, Context, KeyValue}; +use opentelemetry::propagation::TextMapPropagator; use opentelemetry_appender_log::OpenTelemetryLogBridge; use opentelemetry_otlp::WithExportConfig; use opentelemetry_sdk::logs::{BatchLogProcessor, SdkLoggerProvider}; +use opentelemetry_sdk::propagation::TraceContextPropagator; use opentelemetry_sdk::Resource; pub fn global_tracer() -> BoxedTracer { @@ -58,3 +62,19 @@ pub fn init_logs() { //TODO: Still set this with the env? Ideally we still have a clean/simple local logger for local dev log::set_max_level(log::LevelFilter::Info); } + +struct HeaderExtractor<'a>(&'a HeaderMap); + +impl<'a> opentelemetry::propagation::Extractor for HeaderExtractor<'a> { + fn get(&self, key: &str) -> Option<&str> { + self.0.get(key).and_then(|v| v.to_str().ok()) + } + + fn keys(&self) -> Vec<&str> { + self.0.keys().map(|k| k.as_str()).collect() + } +} +pub fn extract_context_from_request(req: &HttpRequest) -> Context { + let propagator = TraceContextPropagator::new(); + propagator.extract(&HeaderExtractor(req.headers())) +}