diff --git a/src/files.rs b/src/files.rs index af2a47a..1f3f22f 100644 --- a/src/files.rs +++ b/src/files.rs @@ -9,23 +9,26 @@ use ::anyhow; use actix::{Handler, Message}; use anyhow::{anyhow, Context}; -use actix_web::web::Data; -use actix_web::{web::{self, Query}, HttpRequest, HttpResponse}; -use log::{debug, error, info, trace}; -use opentelemetry::KeyValue; -use opentelemetry::trace::{Span, Status, TraceContextExt, Tracer}; use crate::data::{Claims, FilesRequest, FilterMode, PhotosResponse, SortType}; use crate::{create_thumbnails, AppState}; +use actix_web::web::Data; +use actix_web::{ + web::{self, Query}, + HttpRequest, HttpResponse, +}; +use log::{debug, error, info, trace}; +use opentelemetry::trace::{Span, Status, TraceContextExt, Tracer}; +use opentelemetry::KeyValue; use crate::data::SortType::NameAsc; use crate::error::IntoHttpError; +use crate::otel::{extract_context_from_request, global_tracer}; use crate::tags::{FileWithTagCount, TagDao}; use crate::video::StreamActor; use path_absolutize::*; use rand::prelude::SliceRandom; use rand::thread_rng; use serde::Deserialize; -use crate::otel::{extract_context_from_request, global_tracer}; pub async fn list_photos( _: Claims, @@ -36,11 +39,28 @@ pub async fn list_photos( tag_dao: web::Data>, ) -> HttpResponse { let search_path = &req.path; - + let tracer = global_tracer(); let context = extract_context_from_request(&request); let mut span = tracer.start_with_context("list_photos", &context); - span.set_attribute(KeyValue::new("path", search_path.to_string())); + span.set_attributes(vec![ + KeyValue::new("path", search_path.to_string()), + KeyValue::new("recursive", req.recursive.unwrap_or(false).to_string()), + KeyValue::new( + "tag_ids", + req.tag_ids.clone().unwrap_or_default().to_string(), + ), + KeyValue::new( + "tag_filter_mode", + format!("{:?}", req.tag_filter_mode.unwrap_or(FilterMode::Any)), + ), + KeyValue::new( + "exclude_tag_ids", + req.exclude_tag_ids.clone().unwrap_or_default().to_string(), + ), + KeyValue::new("sort", format!("{:?}", &req.sort.unwrap_or(NameAsc))), + ]); + let span_context = opentelemetry::Context::current_with_span(span); let search_recursively = req.recursive.unwrap_or(false); @@ -67,8 +87,12 @@ pub async fn list_photos( .collect::>(); return match filter_mode { - FilterMode::Any => dao.get_files_with_any_tag_ids(tag_ids.clone(), exclude_tag_ids, &span_context), - FilterMode::All => dao.get_files_with_all_tag_ids(tag_ids.clone(), exclude_tag_ids, &span_context), + FilterMode::Any => { + dao.get_files_with_any_tag_ids(tag_ids.clone(), exclude_tag_ids, &span_context) + } + FilterMode::All => { + dao.get_files_with_all_tag_ids(tag_ids.clone(), exclude_tag_ids, &span_context) + } } .context(format!( "Failed to get files with tag_ids: {:?} with filter_mode: {:?}", @@ -105,7 +129,9 @@ pub async fn list_photos( tagged_files.len(), tagged_files ); - span_context.span().set_attribute(KeyValue::new("file_count", tagged_files.len().to_string())); + span_context + .span() + .set_attribute(KeyValue::new("file_count", tagged_files.len().to_string())); span_context.span().set_status(Status::Ok); HttpResponse::Ok().json(PhotosResponse { @@ -139,7 +165,9 @@ pub async fn list_photos( .map(|f| f.to_str().unwrap().to_string()) .map(|file_name| { let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao"); - let file_tags = tag_dao.get_tags_for_path(&span_context, &file_name).unwrap_or_default(); + let file_tags = tag_dao + .get_tags_for_path(&span_context, &file_name) + .unwrap_or_default(); (file_name, file_tags) }) @@ -198,16 +226,20 @@ pub async fn list_photos( .map(|f| f.to_str().unwrap().to_string()) .collect::>(); - span_context.span().set_attribute(KeyValue::new("file_count", files.len().to_string())); + span_context + .span() + .set_attribute(KeyValue::new("file_count", files.len().to_string())); span_context.span().set_status(Status::Ok); - + HttpResponse::Ok().json(PhotosResponse { photos: response_files, dirs, }) } else { error!("Bad photos request: {}", req.path); - span_context.span().set_status(Status::error("Invalid path")); + span_context + .span() + .set_status(Status::error("Invalid path")); HttpResponse::BadRequest().finish() } } @@ -241,7 +273,7 @@ pub fn list_files(dir: &Path) -> io::Result> { let dir_name_string = dir.to_str().unwrap_or_default().to_string(); span.set_attribute(KeyValue::new("dir", dir_name_string)); info!("Listing files in: {:?}", dir); - + let files = read_dir(dir)? .filter_map(|res| res.ok()) .filter(|entry| is_image_or_video(&entry.path()) || entry.file_type().unwrap().is_dir()) @@ -249,6 +281,7 @@ pub fn list_files(dir: &Path) -> io::Result> { .collect::>(); span.set_attribute(KeyValue::new("file_count", files.len().to_string())); + span.set_status(Status::Ok); info!("Found {:?} files in directory: {:?}", files.len(), dir); Ok(files) } @@ -395,7 +428,11 @@ impl FileSystemAccess for RealFileSystem { } fn move_file>(&self, from: P, destination: P) -> anyhow::Result<()> { - info!("Moving file: '{:?}' -> '{:?}'", from.as_ref(), destination.as_ref()); + info!( + "Moving file: '{:?}' -> '{:?}'", + from.as_ref(), + destination.as_ref() + ); let name = from .as_ref() .file_name() diff --git a/src/main.rs b/src/main.rs index e7f7a1e..50cba73 100644 --- a/src/main.rs +++ b/src/main.rs @@ -676,7 +676,13 @@ fn watch_files() { let ev = wrx.recv(); if let Ok(Ok(event)) = ev { match event.kind { - EventKind::Create(_) => create_thumbnails(), + EventKind::Create(create_kind) => { + info!( + "Creating thumbnails {:?} create event kind: {:?}", + event.paths, create_kind + ); + create_thumbnails(); + } EventKind::Modify(kind) => { debug!("All modified paths: {:?}", event.paths); debug!("Modify kind: {:?}", kind); diff --git a/src/otel.rs b/src/otel.rs index 01c8cbe..b93fa31 100644 --- a/src/otel.rs +++ b/src/otel.rs @@ -1,6 +1,6 @@ use actix_web::http::header::HeaderMap; use actix_web::HttpRequest; -use opentelemetry::global::BoxedTracer; +use opentelemetry::global::{BoxedSpan, BoxedTracer}; use opentelemetry::propagation::TextMapPropagator; use opentelemetry::trace::{Span, Status, Tracer}; use opentelemetry::{global, Context, KeyValue}; @@ -82,26 +82,31 @@ pub fn extract_context_from_request(req: &HttpRequest) -> Context { propagator.extract(&HeaderExtractor(req.headers())) } -pub fn trace_db_call(context: &Context, query_type: &str, operation: &str, func: F) -> anyhow::Result -where F: FnOnce() -> anyhow::Result { - +pub fn trace_db_call( + context: &Context, + query_type: &str, + operation: &str, + func: F, +) -> anyhow::Result +where + F: FnOnce(&mut BoxedSpan) -> anyhow::Result, +{ let tracer = global::tracer("db"); let mut span = tracer .span_builder(format!("db.{}.{}", query_type, operation)) .with_attributes(vec![ KeyValue::new("db.query_type", query_type.to_string().clone()), KeyValue::new("db.operation", operation.to_string().clone()), - ]).start_with_context(&tracer, context); - - let result = func(); + ]) + .start_with_context(&tracer, context); + + let result = func(&mut span); match &result { Ok(_) => { span.set_status(Status::Ok); } - Err(e) => { - span.set_status(Status::error(e.to_string())) - } + Err(e) => span.set_status(Status::error(e.to_string())), } - + result -} \ No newline at end of file +} diff --git a/src/tags.rs b/src/tags.rs index 96f146e..4c6dac6 100644 --- a/src/tags.rs +++ b/src/tags.rs @@ -8,8 +8,9 @@ use chrono::Utc; use diesel::dsl::count_star; use diesel::prelude::*; use diesel::sql_types::*; -use log::{debug, info, trace}; -use opentelemetry::trace::Tracer; +use log::{debug, info}; +use opentelemetry::trace::{Span, Status, TraceContextExt, Tracer}; +use opentelemetry::KeyValue; use schema::{tagged_photo, tags}; use serde::{Deserialize, Serialize}; use std::borrow::BorrowMut; @@ -37,22 +38,30 @@ async fn add_tag( ) -> impl Responder { let tracer = global_tracer(); let context = extract_context_from_request(&request); - let _ = tracer.start_with_context("add_tag", &context); + let span = tracer.start_with_context("add_tag", &context); + let span_context = opentelemetry::Context::current_with_span(span); let tag_name = body.tag_name.clone(); let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao"); tag_dao - .get_all_tags(&context, None) + .get_all_tags(&span_context, None) .and_then(|tags| { if let Some((_, tag)) = tags.iter().find(|t| t.1.name == tag_name) { Ok(tag.clone()) } else { - tag_dao.create_tag(&context, tag_name.trim()) + info!( + "Creating missing tag: '{:?}' for file: '{}'", + tag_name, &body.file_name + ); + tag_dao.create_tag(&span_context, tag_name.trim()) } }) - .and_then(|tag| tag_dao.tag_file(&context, &body.file_name, tag.id)) - .map(|_| HttpResponse::Ok()) + .and_then(|tag| tag_dao.tag_file(&span_context, &body.file_name, tag.id)) + .map(|_| { + span_context.span().set_status(Status::Ok); + HttpResponse::Ok() + }) .into_http_internal_err() } @@ -63,10 +72,15 @@ async fn get_tags( tag_dao: web::Data>, ) -> impl Responder { let context = extract_context_from_request(&http_request); + let span = global_tracer().start_with_context("get_tags", &context); + let span_context = opentelemetry::Context::current_with_span(span); let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao"); tag_dao - .get_tags_for_path(&context, &request.path) - .map(|tags| HttpResponse::Ok().json(tags)) + .get_tags_for_path(&span_context, &request.path) + .map(|tags| { + span_context.span().set_status(Status::Ok); + HttpResponse::Ok().json(tags) + }) .into_http_internal_err() } @@ -77,10 +91,15 @@ async fn get_all_tags( query: web::Query, ) -> impl Responder { let context = extract_context_from_request(&request); + let span = global_tracer().start_with_context("get_all_tags", &context); + let span_context = opentelemetry::Context::current_with_span(span); + let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao"); tag_dao - .get_all_tags(&context, query.path.clone()) + .get_all_tags(&span_context, query.path.clone()) .map(|tags| { + span_context.span().set_status(Status::Ok); + HttpResponse::Ok().json( tags.iter() .map(|(tag_count, tag)| TagWithTagCount { @@ -100,10 +119,15 @@ async fn remove_tagged_photo( tag_dao: web::Data>, ) -> impl Responder { let context = extract_context_from_request(&http_request); + let span = global_tracer().start_with_context("remove_tagged_photo", &context); + let span_context = opentelemetry::Context::current_with_span(span); + let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao"); tag_dao - .remove_tag(&context, &request.tag_name, &request.file_name) + .remove_tag(&span_context, &request.tag_name, &request.file_name) .map(|result| { + span_context.span().set_status(Status::Ok); + if result.is_some() { HttpResponse::Ok() } else { @@ -121,9 +145,14 @@ async fn update_tags( ) -> impl Responder { let mut dao = tag_dao.lock().expect("Unable to get TagDao"); let context = extract_context_from_request(&http_request); + let span = global_tracer().start_with_context("update_tags", &context); + let span_context = opentelemetry::Context::current_with_span(span); - dao.get_tags_for_path(&context, &request.file_name) - .and_then(|existing_tags| dao.get_all_tags(&context, None).map(|all| (existing_tags, all))) + dao.get_tags_for_path(&span_context, &request.file_name) + .and_then(|existing_tags| { + dao.get_all_tags(&span_context, None) + .map(|all| (existing_tags, all)) + }) .map(|(existing_tags, all_tags)| { let tags_to_remove = existing_tags .iter() @@ -135,7 +164,7 @@ async fn update_tags( "Removing tag {:?} from file: {:?}", tag.name, request.file_name ); - dao.remove_tag(&context, &tag.name, &request.file_name) + dao.remove_tag(&span_context, &tag.name, &request.file_name) .unwrap_or_else(|err| panic!("{:?} Unable to remove tag {:?}", err, &tag.name)); } @@ -150,7 +179,7 @@ async fn update_tags( new_tag.name, request.file_name ); - dao.tag_file(&context, &request.file_name, new_tag.id) + dao.tag_file(&span_context, &request.file_name, new_tag.id) .with_context(|| { format!( "Unable to tag file {:?} with tag: {:?}", @@ -160,6 +189,7 @@ async fn update_tags( .unwrap(); } + span_context.span().set_status(Status::Ok); HttpResponse::Ok() }) .into_http_internal_err() @@ -213,10 +243,24 @@ pub trait TagDao { context: &opentelemetry::Context, path: Option, ) -> anyhow::Result>; - fn get_tags_for_path(&mut self, context: &opentelemetry::Context, path: &str) -> anyhow::Result>; + fn get_tags_for_path( + &mut self, + context: &opentelemetry::Context, + path: &str, + ) -> anyhow::Result>; fn create_tag(&mut self, context: &opentelemetry::Context, name: &str) -> anyhow::Result; - fn remove_tag(&mut self, context: &opentelemetry::Context, tag_name: &str, path: &str) -> anyhow::Result>; - fn tag_file(&mut self, context: &opentelemetry::Context, path: &str, tag_id: i32) -> anyhow::Result; + fn remove_tag( + &mut self, + context: &opentelemetry::Context, + tag_name: &str, + path: &str, + ) -> anyhow::Result>; + fn tag_file( + &mut self, + context: &opentelemetry::Context, + path: &str, + tag_id: i32, + ) -> anyhow::Result; fn get_files_with_all_tag_ids( &mut self, tag_ids: Vec, @@ -248,10 +292,16 @@ impl Default for SqliteTagDao { } impl TagDao for SqliteTagDao { - fn get_all_tags(&mut self, context: &opentelemetry::Context, path: Option) -> anyhow::Result> { + fn get_all_tags( + &mut self, + context: &opentelemetry::Context, + path: Option, + ) -> anyhow::Result> { // select name, count(*) from tags join tagged_photo ON tags.id = tagged_photo.tag_id GROUP BY tags.name ORDER BY COUNT(*); - trace_db_call(&context, "query", "get_all_tags", || { + trace_db_call(&context, "query", "get_all_tags", |span| { + span.set_attribute(KeyValue::new("path", path.clone().unwrap_or_default())); + let path = path.map(|p| p + "%").unwrap_or("%".to_string()); let (id, name, created_time) = tags::all_columns; tags::table @@ -279,9 +329,15 @@ impl TagDao for SqliteTagDao { }) } - fn get_tags_for_path(&mut self, context: &opentelemetry::Context, path: &str) -> anyhow::Result> { - trace_db_call(&context, "query", "get_tags_for_path", || { - trace!("Getting Tags for path: {:?}", path); + fn get_tags_for_path( + &mut self, + context: &opentelemetry::Context, + path: &str, + ) -> anyhow::Result> { + trace_db_call(&context, "query", "get_tags_for_path", |span| { + span.set_attribute(KeyValue::new("path", path.to_string())); + + debug!("Getting Tags for path: {:?}", path); tags::table .left_join(tagged_photo::table) .filter(tagged_photo::photo_name.eq(&path)) @@ -292,7 +348,9 @@ impl TagDao for SqliteTagDao { } fn create_tag(&mut self, context: &opentelemetry::Context, name: &str) -> anyhow::Result { - trace_db_call(&context, "insert", "create_tag", || { + trace_db_call(&context, "insert", "create_tag", |span| { + span.set_attribute(KeyValue::new("name", name.to_string())); + diesel::insert_into(tags::table) .values(InsertTag { name: name.to_string(), @@ -322,8 +380,18 @@ impl TagDao for SqliteTagDao { }) } - fn remove_tag(&mut self, context: &opentelemetry::Context, tag_name: &str, path: &str) -> anyhow::Result> { - trace_db_call(&context, "delete", "remove_tag", || { + fn remove_tag( + &mut self, + context: &opentelemetry::Context, + tag_name: &str, + path: &str, + ) -> anyhow::Result> { + trace_db_call(&context, "delete", "remove_tag", |span| { + span.set_attributes(vec![ + KeyValue::new("tag_name", tag_name.to_string()), + KeyValue::new("path", path.to_string()), + ]); + tags::table .filter(tags::name.eq(tag_name)) .get_result::(self.connection.borrow_mut()) @@ -347,8 +415,18 @@ impl TagDao for SqliteTagDao { }) } - fn tag_file(&mut self, context: &opentelemetry::Context, path: &str, tag_id: i32) -> anyhow::Result { - trace_db_call(&context, "insert", "tag_file", || { + fn tag_file( + &mut self, + context: &opentelemetry::Context, + path: &str, + tag_id: i32, + ) -> anyhow::Result { + trace_db_call(&context, "insert", "tag_file", |span| { + span.set_attributes(vec![ + KeyValue::new("path", path.to_string()), + KeyValue::new("tag_id", tag_id.to_string()), + ]); + diesel::insert_into(tagged_photo::table) .values(InsertTaggedPhoto { tag_id, @@ -386,7 +464,7 @@ impl TagDao for SqliteTagDao { exclude_tag_ids: Vec, context: &opentelemetry::Context, ) -> anyhow::Result> { - trace_db_call(&context, "query", "get_files_with_all_tags", || { + trace_db_call(&context, "query", "get_files_with_all_tags", |_| { use diesel::dsl::*; let exclude_subquery = tagged_photo::table @@ -423,7 +501,7 @@ impl TagDao for SqliteTagDao { exclude_tag_ids: Vec, context: &opentelemetry::Context, ) -> anyhow::Result> { - trace_db_call(&context, "query", "get_files_with_any_tags", || { + trace_db_call(&context, "query", "get_files_with_any_tags", |_| { use diesel::dsl::*; let tag_ids_str = tag_ids @@ -445,12 +523,12 @@ WITH filtered_photos AS ( FROM tagged_photo tp WHERE tp.tag_id IN ({}) AND tp.photo_name NOT IN ( - SELECT photo_name - FROM tagged_photo + SELECT photo_name + FROM tagged_photo WHERE tag_id IN ({}) ) ) - SELECT + SELECT fp.photo_name as file_name, COUNT(DISTINCT tp2.tag_id) as tag_count FROM filtered_photos fp @@ -493,7 +571,11 @@ mod tests { } impl TagDao for TestTagDao { - fn get_all_tags(&mut self, context: &opentelemetry::Context, _option: Option) -> anyhow::Result> { + fn get_all_tags( + &mut self, + context: &opentelemetry::Context, + _option: Option, + ) -> anyhow::Result> { Ok(self .tags .borrow() @@ -503,7 +585,11 @@ mod tests { .clone()) } - fn get_tags_for_path(&mut self, context: &opentelemetry::Context, path: &str) -> anyhow::Result> { + fn get_tags_for_path( + &mut self, + context: &opentelemetry::Context, + path: &str, + ) -> anyhow::Result> { info!("Getting test tags for: {:?}", path); warn!("Tags for path: {:?}", self.tagged_photos); @@ -515,7 +601,11 @@ mod tests { .clone()) } - fn create_tag(&mut self, context: &opentelemetry::Context, name: &str) -> anyhow::Result { + fn create_tag( + &mut self, + context: &opentelemetry::Context, + name: &str, + ) -> anyhow::Result { self.tag_count += 1; let tag_id = self.tag_count; @@ -531,7 +621,12 @@ mod tests { Ok(tag) } - fn remove_tag(&mut self, context: &opentelemetry::Context, tag_name: &str, path: &str) -> anyhow::Result> { + fn remove_tag( + &mut self, + context: &opentelemetry::Context, + tag_name: &str, + path: &str, + ) -> anyhow::Result> { let mut clone = { let photo_tags = &self.tagged_photos.borrow()[path]; photo_tags.clone() @@ -551,7 +646,12 @@ mod tests { } } - fn tag_file(&mut self, context: &opentelemetry::Context, path: &str, tag_id: i32) -> anyhow::Result { + fn tag_file( + &mut self, + context: &opentelemetry::Context, + path: &str, + tag_id: i32, + ) -> anyhow::Result { debug!("Tagging file: {:?} with tag_id: {:?}", path, tag_id); if let Some(tag) = self.tags.borrow().iter().find(|t| t.id == tag_id) { diff --git a/src/video.rs b/src/video.rs index b9112da..87ca6ce 100644 --- a/src/video.rs +++ b/src/video.rs @@ -1,7 +1,10 @@ use crate::is_video; +use crate::otel::global_tracer; use actix::prelude::*; use futures::TryFutureExt; use log::{debug, error, info, trace, warn}; +use opentelemetry::trace::{Span, Tracer}; +use opentelemetry::KeyValue; use std::io::Result; use std::path::{Path, PathBuf}; use std::process::{Child, Command, ExitStatus, Stdio}; @@ -122,6 +125,9 @@ impl Handler for VideoPlaylistManager { type Result = ResponseFuture<()>; fn handle(&mut self, msg: ScanDirectoryMessage, _ctx: &mut Self::Context) -> Self::Result { + let tracer = global_tracer(); + let mut span = tracer.start("videoplaylistmanager.scan_directory"); + let start = std::time::Instant::now(); info!( "Starting scan directory for video playlist generation: {}", @@ -157,6 +163,11 @@ impl Handler for VideoPlaylistManager { .expect("Failed to send generate playlist message") { Ok(_) => { + span.add_event( + "Playlist generated", + vec![KeyValue::new("video_path", path_as_str.to_string())], + ); + debug!( "Successfully generated playlist for file: '{}'", path_as_str @@ -168,6 +179,10 @@ impl Handler for VideoPlaylistManager { } } + span.add_event( + "Finished directory scan", + vec![KeyValue::new("directory", scan_dir_name.to_string())], + ); info!( "Finished directory scan of '{}' in {:?}", scan_dir_name,