Get Otel span from the request
This commit is contained in:
13
src/files.rs
13
src/files.rs
@@ -10,10 +10,7 @@ use actix::{Handler, Message};
|
|||||||
use anyhow::{anyhow, Context};
|
use anyhow::{anyhow, Context};
|
||||||
|
|
||||||
use actix_web::web::Data;
|
use actix_web::web::Data;
|
||||||
use actix_web::{
|
use actix_web::{web::{self, Query}, HttpRequest, HttpResponse};
|
||||||
web::{self, Query},
|
|
||||||
HttpResponse,
|
|
||||||
};
|
|
||||||
use log::{debug, error, info, trace};
|
use log::{debug, error, info, trace};
|
||||||
use opentelemetry::KeyValue;
|
use opentelemetry::KeyValue;
|
||||||
use opentelemetry::trace::{Span, Status, Tracer};
|
use opentelemetry::trace::{Span, Status, Tracer};
|
||||||
@@ -28,10 +25,11 @@ use path_absolutize::*;
|
|||||||
use rand::prelude::SliceRandom;
|
use rand::prelude::SliceRandom;
|
||||||
use rand::thread_rng;
|
use rand::thread_rng;
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use crate::otel::global_tracer;
|
use crate::otel::{extract_context_from_request, global_tracer};
|
||||||
|
|
||||||
pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
|
pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
|
||||||
_: Claims,
|
_: Claims,
|
||||||
|
request: HttpRequest,
|
||||||
req: Query<FilesRequest>,
|
req: Query<FilesRequest>,
|
||||||
app_state: web::Data<AppState>,
|
app_state: web::Data<AppState>,
|
||||||
file_system: web::Data<FS>,
|
file_system: web::Data<FS>,
|
||||||
@@ -40,7 +38,8 @@ pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
|
|||||||
let search_path = &req.path;
|
let search_path = &req.path;
|
||||||
|
|
||||||
let tracer = global_tracer();
|
let tracer = global_tracer();
|
||||||
let mut span = tracer.start("list_photos");
|
let context = extract_context_from_request(&request);
|
||||||
|
let mut span = tracer.start_with_context("list_photos", &context);
|
||||||
span.set_attribute(KeyValue::new("path", search_path.to_string()));
|
span.set_attribute(KeyValue::new("path", search_path.to_string()));
|
||||||
|
|
||||||
let search_recursively = req.recursive.unwrap_or(false);
|
let search_recursively = req.recursive.unwrap_or(false);
|
||||||
@@ -547,6 +546,7 @@ mod tests {
|
|||||||
|
|
||||||
let response = list_photos(
|
let response = list_photos(
|
||||||
claims,
|
claims,
|
||||||
|
HttpRequest::default(),
|
||||||
request,
|
request,
|
||||||
Data::new(AppState::new(
|
Data::new(AppState::new(
|
||||||
Arc::new(StreamActor {}.start()),
|
Arc::new(StreamActor {}.start()),
|
||||||
@@ -594,6 +594,7 @@ mod tests {
|
|||||||
|
|
||||||
let response: HttpResponse = list_photos(
|
let response: HttpResponse = list_photos(
|
||||||
claims,
|
claims,
|
||||||
|
HttpRequest::default(),
|
||||||
request,
|
request,
|
||||||
Data::new(AppState::new(
|
Data::new(AppState::new(
|
||||||
Arc::new(StreamActor {}.start()),
|
Arc::new(StreamActor {}.start()),
|
||||||
|
|||||||
32
src/main.rs
32
src/main.rs
@@ -38,7 +38,7 @@ use crate::database::*;
|
|||||||
use crate::files::{
|
use crate::files::{
|
||||||
is_image_or_video, is_valid_full_path, move_file, RealFileSystem, RefreshThumbnailsMessage,
|
is_image_or_video, is_valid_full_path, move_file, RealFileSystem, RefreshThumbnailsMessage,
|
||||||
};
|
};
|
||||||
use crate::otel::global_tracer;
|
use crate::otel::{extract_context_from_request, global_tracer};
|
||||||
use crate::service::ServiceBuilder;
|
use crate::service::ServiceBuilder;
|
||||||
use crate::state::AppState;
|
use crate::state::AppState;
|
||||||
use crate::tags::*;
|
use crate::tags::*;
|
||||||
@@ -84,7 +84,9 @@ async fn get_image(
|
|||||||
app_state: Data<AppState>,
|
app_state: Data<AppState>,
|
||||||
) -> impl Responder {
|
) -> impl Responder {
|
||||||
let tracer = global_tracer();
|
let tracer = global_tracer();
|
||||||
let mut span = tracer.start("get_image");
|
let context = extract_context_from_request(&request);
|
||||||
|
|
||||||
|
let mut span = tracer.start_with_context("get_image", &context);
|
||||||
|
|
||||||
if let Some(path) = is_valid_full_path(&app_state.base_path, &req.path, false) {
|
if let Some(path) = is_valid_full_path(&app_state.base_path, &req.path, false) {
|
||||||
let image_size = req.size.unwrap_or(PhotoSize::Full);
|
let image_size = req.size.unwrap_or(PhotoSize::Full);
|
||||||
@@ -121,11 +123,13 @@ async fn get_image(
|
|||||||
#[get("/image/metadata")]
|
#[get("/image/metadata")]
|
||||||
async fn get_file_metadata(
|
async fn get_file_metadata(
|
||||||
_: Claims,
|
_: Claims,
|
||||||
|
request: HttpRequest,
|
||||||
path: web::Query<ThumbnailRequest>,
|
path: web::Query<ThumbnailRequest>,
|
||||||
app_state: Data<AppState>,
|
app_state: Data<AppState>,
|
||||||
) -> impl Responder {
|
) -> impl Responder {
|
||||||
let tracer = global_tracer();
|
let tracer = global_tracer();
|
||||||
let mut span = tracer.start("get_file_metadata");
|
let context = extract_context_from_request(&request);
|
||||||
|
let mut span = tracer.start_with_context("get_file_metadata", &context);
|
||||||
match is_valid_full_path(&app_state.base_path, &path.path, false)
|
match is_valid_full_path(&app_state.base_path, &path.path, false)
|
||||||
.ok_or_else(|| ErrorKind::InvalidData.into())
|
.ok_or_else(|| ErrorKind::InvalidData.into())
|
||||||
.and_then(File::open)
|
.and_then(File::open)
|
||||||
@@ -154,11 +158,13 @@ async fn get_file_metadata(
|
|||||||
#[post("/image")]
|
#[post("/image")]
|
||||||
async fn upload_image(
|
async fn upload_image(
|
||||||
_: Claims,
|
_: Claims,
|
||||||
|
request: HttpRequest,
|
||||||
mut payload: mp::Multipart,
|
mut payload: mp::Multipart,
|
||||||
app_state: Data<AppState>,
|
app_state: Data<AppState>,
|
||||||
) -> impl Responder {
|
) -> impl Responder {
|
||||||
let tracer = global_tracer();
|
let tracer = global_tracer();
|
||||||
let mut span = tracer.start("upload_image");
|
let context = extract_context_from_request(&request);
|
||||||
|
let mut span = tracer.start_with_context("upload_image", &context);
|
||||||
|
|
||||||
let mut file_content: BytesMut = BytesMut::new();
|
let mut file_content: BytesMut = BytesMut::new();
|
||||||
let mut file_name: Option<String> = None;
|
let mut file_name: Option<String> = None;
|
||||||
@@ -241,11 +247,14 @@ async fn upload_image(
|
|||||||
#[post("/video/generate")]
|
#[post("/video/generate")]
|
||||||
async fn generate_video(
|
async fn generate_video(
|
||||||
_claims: Claims,
|
_claims: Claims,
|
||||||
|
request: HttpRequest,
|
||||||
app_state: Data<AppState>,
|
app_state: Data<AppState>,
|
||||||
body: web::Json<ThumbnailRequest>,
|
body: web::Json<ThumbnailRequest>,
|
||||||
) -> impl Responder {
|
) -> impl Responder {
|
||||||
let tracer = global_tracer();
|
let tracer = global_tracer();
|
||||||
let mut span = tracer.start("generate_video");
|
|
||||||
|
let context = extract_context_from_request(&request);
|
||||||
|
let mut span = tracer.start_with_context("generate_video", &context);
|
||||||
|
|
||||||
let filename = PathBuf::from(&body.path);
|
let filename = PathBuf::from(&body.path);
|
||||||
|
|
||||||
@@ -289,7 +298,8 @@ async fn stream_video(
|
|||||||
app_state: Data<AppState>,
|
app_state: Data<AppState>,
|
||||||
) -> impl Responder {
|
) -> impl Responder {
|
||||||
let tracer = global::tracer("image-server");
|
let tracer = global::tracer("image-server");
|
||||||
let mut span = tracer.start("stream_video");
|
let context = extract_context_from_request(&request);
|
||||||
|
let mut span = tracer.start_with_context("stream_video", &context);
|
||||||
|
|
||||||
let playlist = &path.path;
|
let playlist = &path.path;
|
||||||
debug!("Playlist: {}", playlist);
|
debug!("Playlist: {}", playlist);
|
||||||
@@ -318,7 +328,8 @@ async fn get_video_part(
|
|||||||
app_state: Data<AppState>,
|
app_state: Data<AppState>,
|
||||||
) -> impl Responder {
|
) -> impl Responder {
|
||||||
let tracer = global_tracer();
|
let tracer = global_tracer();
|
||||||
let mut span = tracer.start("get_video_part");
|
let context = extract_context_from_request(&request);
|
||||||
|
let mut span = tracer.start_with_context("get_video_part", &context);
|
||||||
|
|
||||||
let part = &path.path;
|
let part = &path.path;
|
||||||
debug!("Video part: {}", part);
|
debug!("Video part: {}", part);
|
||||||
@@ -343,8 +354,13 @@ async fn get_video_part(
|
|||||||
#[get("image/favorites")]
|
#[get("image/favorites")]
|
||||||
async fn favorites(
|
async fn favorites(
|
||||||
claims: Claims,
|
claims: Claims,
|
||||||
|
request: HttpRequest,
|
||||||
favorites_dao: Data<Mutex<Box<dyn FavoriteDao>>>,
|
favorites_dao: Data<Mutex<Box<dyn FavoriteDao>>>,
|
||||||
) -> impl Responder {
|
) -> impl Responder {
|
||||||
|
let tracer = global_tracer();
|
||||||
|
let context = extract_context_from_request(&request);
|
||||||
|
let mut span = tracer.start_with_context("get favorites", &context);
|
||||||
|
|
||||||
match web::block(move || {
|
match web::block(move || {
|
||||||
favorites_dao
|
favorites_dao
|
||||||
.lock()
|
.lock()
|
||||||
@@ -359,12 +375,14 @@ async fn favorites(
|
|||||||
.map(|favorite| favorite.path)
|
.map(|favorite| favorite.path)
|
||||||
.collect::<Vec<String>>();
|
.collect::<Vec<String>>();
|
||||||
|
|
||||||
|
span.set_status(Status::Ok);
|
||||||
HttpResponse::Ok().json(PhotosResponse {
|
HttpResponse::Ok().json(PhotosResponse {
|
||||||
photos: favorites,
|
photos: favorites,
|
||||||
dirs: Vec::new(),
|
dirs: Vec::new(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
Ok(Err(e)) => {
|
Ok(Err(e)) => {
|
||||||
|
span.set_status(Status::error(format!("Error getting favorites: {:?}", e)));
|
||||||
error!("Error getting favorites: {:?}", e);
|
error!("Error getting favorites: {:?}", e);
|
||||||
HttpResponse::InternalServerError().finish()
|
HttpResponse::InternalServerError().finish()
|
||||||
}
|
}
|
||||||
|
|||||||
22
src/otel.rs
22
src/otel.rs
@@ -1,8 +1,12 @@
|
|||||||
|
use actix_web::http::header::HeaderMap;
|
||||||
|
use actix_web::HttpRequest;
|
||||||
use opentelemetry::global::BoxedTracer;
|
use opentelemetry::global::BoxedTracer;
|
||||||
use opentelemetry::{global, KeyValue};
|
use opentelemetry::{global, Context, KeyValue};
|
||||||
|
use opentelemetry::propagation::TextMapPropagator;
|
||||||
use opentelemetry_appender_log::OpenTelemetryLogBridge;
|
use opentelemetry_appender_log::OpenTelemetryLogBridge;
|
||||||
use opentelemetry_otlp::WithExportConfig;
|
use opentelemetry_otlp::WithExportConfig;
|
||||||
use opentelemetry_sdk::logs::{BatchLogProcessor, SdkLoggerProvider};
|
use opentelemetry_sdk::logs::{BatchLogProcessor, SdkLoggerProvider};
|
||||||
|
use opentelemetry_sdk::propagation::TraceContextPropagator;
|
||||||
use opentelemetry_sdk::Resource;
|
use opentelemetry_sdk::Resource;
|
||||||
|
|
||||||
pub fn global_tracer() -> BoxedTracer {
|
pub fn global_tracer() -> BoxedTracer {
|
||||||
@@ -58,3 +62,19 @@ pub fn init_logs() {
|
|||||||
//TODO: Still set this with the env? Ideally we still have a clean/simple local logger for local dev
|
//TODO: Still set this with the env? Ideally we still have a clean/simple local logger for local dev
|
||||||
log::set_max_level(log::LevelFilter::Info);
|
log::set_max_level(log::LevelFilter::Info);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct HeaderExtractor<'a>(&'a HeaderMap);
|
||||||
|
|
||||||
|
impl<'a> opentelemetry::propagation::Extractor for HeaderExtractor<'a> {
|
||||||
|
fn get(&self, key: &str) -> Option<&str> {
|
||||||
|
self.0.get(key).and_then(|v| v.to_str().ok())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn keys(&self) -> Vec<&str> {
|
||||||
|
self.0.keys().map(|k| k.as_str()).collect()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn extract_context_from_request(req: &HttpRequest) -> Context {
|
||||||
|
let propagator = TraceContextPropagator::new();
|
||||||
|
propagator.extract(&HeaderExtractor(req.headers()))
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user