Recursive Sorting fix and many logging/tracing enhancements #33

Merged
cameron merged 11 commits from feature/fix-recursive-sort into master 2025-06-12 20:03:21 +00:00
3 changed files with 73 additions and 48 deletions
Showing only changes of commit 6aff7f456a - Show all commits

View File

@@ -13,7 +13,7 @@ use actix_web::web::Data;
use actix_web::{web::{self, Query}, HttpRequest, HttpResponse}; use actix_web::{web::{self, Query}, HttpRequest, HttpResponse};
use log::{debug, error, info, trace}; use log::{debug, error, info, trace};
use opentelemetry::KeyValue; use opentelemetry::KeyValue;
use opentelemetry::trace::{Span, Status, Tracer}; use opentelemetry::trace::{Span, Status, TraceContextExt, Tracer};
use crate::data::{Claims, FilesRequest, FilterMode, PhotosResponse, SortType}; use crate::data::{Claims, FilesRequest, FilterMode, PhotosResponse, SortType};
use crate::{create_thumbnails, AppState}; use crate::{create_thumbnails, AppState};
@@ -41,6 +41,7 @@ pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
let context = extract_context_from_request(&request); let context = extract_context_from_request(&request);
let mut span = tracer.start_with_context("list_photos", &context); let mut span = tracer.start_with_context("list_photos", &context);
span.set_attribute(KeyValue::new("path", search_path.to_string())); span.set_attribute(KeyValue::new("path", search_path.to_string()));
let span_context = opentelemetry::Context::current_with_span(span);
let search_recursively = req.recursive.unwrap_or(false); let search_recursively = req.recursive.unwrap_or(false);
if let Some(tag_ids) = &req.tag_ids { if let Some(tag_ids) = &req.tag_ids {
@@ -66,8 +67,8 @@ pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
.collect::<Vec<i32>>(); .collect::<Vec<i32>>();
return match filter_mode { return match filter_mode {
FilterMode::Any => dao.get_files_with_any_tag_ids(tag_ids.clone(), exclude_tag_ids), FilterMode::Any => dao.get_files_with_any_tag_ids(tag_ids.clone(), exclude_tag_ids, &span_context),
FilterMode::All => dao.get_files_with_all_tag_ids(tag_ids.clone(), exclude_tag_ids), FilterMode::All => dao.get_files_with_all_tag_ids(tag_ids.clone(), exclude_tag_ids, &span_context),
} }
.context(format!( .context(format!(
"Failed to get files with tag_ids: {:?} with filter_mode: {:?}", "Failed to get files with tag_ids: {:?} with filter_mode: {:?}",
@@ -104,8 +105,8 @@ pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
tagged_files.len(), tagged_files.len(),
tagged_files tagged_files
); );
span.set_attribute(KeyValue::new("file_count", tagged_files.len().to_string())); span_context.span().set_attribute(KeyValue::new("file_count", tagged_files.len().to_string()));
span.set_status(Status::Ok); span_context.span().set_status(Status::Ok);
HttpResponse::Ok().json(PhotosResponse { HttpResponse::Ok().json(PhotosResponse {
photos: tagged_files, photos: tagged_files,
@@ -138,7 +139,7 @@ pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
.map(|f| f.to_str().unwrap().to_string()) .map(|f| f.to_str().unwrap().to_string())
.map(|file_name| { .map(|file_name| {
let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao"); let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao");
let file_tags = tag_dao.get_tags_for_path(&file_name).unwrap_or_default(); let file_tags = tag_dao.get_tags_for_path(&span_context, &file_name).unwrap_or_default();
(file_name, file_tags) (file_name, file_tags)
}) })
@@ -197,8 +198,8 @@ pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
.map(|f| f.to_str().unwrap().to_string()) .map(|f| f.to_str().unwrap().to_string())
.collect::<Vec<String>>(); .collect::<Vec<String>>();
span.set_attribute(KeyValue::new("file_count", files.len().to_string())); span_context.span().set_attribute(KeyValue::new("file_count", files.len().to_string()));
span.set_status(Status::Ok); span_context.span().set_status(Status::Ok);
HttpResponse::Ok().json(PhotosResponse { HttpResponse::Ok().json(PhotosResponse {
photos: response_files, photos: response_files,
@@ -206,7 +207,7 @@ pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
}) })
} else { } else {
error!("Bad photos request: {}", req.path); error!("Bad photos request: {}", req.path);
span.set_status(Status::error("Invalid path")); span_context.span().set_status(Status::error("Invalid path"));
HttpResponse::BadRequest().finish() HttpResponse::BadRequest().finish()
} }
} }

View File

@@ -82,15 +82,16 @@ pub fn extract_context_from_request(req: &HttpRequest) -> Context {
propagator.extract(&HeaderExtractor(req.headers())) propagator.extract(&HeaderExtractor(req.headers()))
} }
pub fn trace_db_call<F, O>(query_type: &str, operation: &str, func: F) -> anyhow::Result<O> pub fn trace_db_call<F, O>(context: &Context, query_type: &str, operation: &str, func: F) -> anyhow::Result<O>
where F: FnOnce() -> anyhow::Result<O> { where F: FnOnce() -> anyhow::Result<O> {
let tracer = global::tracer("db"); let tracer = global::tracer("db");
let mut span = tracer let mut span = tracer
.span_builder(format!("db.{}.{}", query_type, operation)) .span_builder(format!("db.{}.{}", query_type, operation))
.with_attributes(vec![ .with_attributes(vec![
KeyValue::new("db.query_type", query_type.to_string().clone()), KeyValue::new("db.query_type", query_type.to_string().clone()),
KeyValue::new("db.operation", operation.to_string().clone()), KeyValue::new("db.operation", operation.to_string().clone()),
]).start_with_context(&tracer, &Context::current()); ]).start_with_context(&tracer, context);
let result = func(); let result = func();
match &result { match &result {

View File

@@ -1,14 +1,15 @@
use crate::data::GetTagsRequest; use crate::data::GetTagsRequest;
use crate::otel::trace_db_call; use crate::otel::{extract_context_from_request, global_tracer, trace_db_call};
use crate::{connect, data::AddTagRequest, error::IntoHttpError, schema, Claims, ThumbnailRequest}; use crate::{connect, data::AddTagRequest, error::IntoHttpError, schema, Claims, ThumbnailRequest};
use actix_web::dev::{ServiceFactory, ServiceRequest}; use actix_web::dev::{ServiceFactory, ServiceRequest};
use actix_web::{web, App, HttpResponse, Responder}; use actix_web::{web, App, HttpRequest, HttpResponse, Responder};
use anyhow::Context; use anyhow::Context;
use chrono::Utc; use chrono::Utc;
use diesel::dsl::count_star; use diesel::dsl::count_star;
use diesel::prelude::*; use diesel::prelude::*;
use diesel::sql_types::*; use diesel::sql_types::*;
use log::{debug, info, trace}; use log::{debug, info, trace};
use opentelemetry::trace::Tracer;
use schema::{tagged_photo, tags}; use schema::{tagged_photo, tags};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::borrow::BorrowMut; use std::borrow::BorrowMut;
@@ -30,35 +31,41 @@ where
async fn add_tag<D: TagDao>( async fn add_tag<D: TagDao>(
_: Claims, _: Claims,
request: HttpRequest,
body: web::Json<AddTagRequest>, body: web::Json<AddTagRequest>,
tag_dao: web::Data<Mutex<D>>, tag_dao: web::Data<Mutex<D>>,
) -> impl Responder { ) -> impl Responder {
let tracer = global_tracer();
let context = extract_context_from_request(&request);
let _ = tracer.start_with_context("add_tag", &context);
let tag_name = body.tag_name.clone(); let tag_name = body.tag_name.clone();
let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao"); let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao");
tag_dao tag_dao
.get_all_tags(None) .get_all_tags(&context, None)
.and_then(|tags| { .and_then(|tags| {
if let Some((_, tag)) = tags.iter().find(|t| t.1.name == tag_name) { if let Some((_, tag)) = tags.iter().find(|t| t.1.name == tag_name) {
Ok(tag.clone()) Ok(tag.clone())
} else { } else {
tag_dao.create_tag(tag_name.trim()) tag_dao.create_tag(&context, tag_name.trim())
} }
}) })
.and_then(|tag| tag_dao.tag_file(&body.file_name, tag.id)) .and_then(|tag| tag_dao.tag_file(&context, &body.file_name, tag.id))
.map(|_| HttpResponse::Ok()) .map(|_| HttpResponse::Ok())
.into_http_internal_err() .into_http_internal_err()
} }
async fn get_tags<D: TagDao>( async fn get_tags<D: TagDao>(
_: Claims, _: Claims,
http_request: HttpRequest,
request: web::Query<ThumbnailRequest>, request: web::Query<ThumbnailRequest>,
tag_dao: web::Data<Mutex<D>>, tag_dao: web::Data<Mutex<D>>,
) -> impl Responder { ) -> impl Responder {
let context = extract_context_from_request(&http_request);
let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao"); let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao");
tag_dao tag_dao
.get_tags_for_path(&request.path) .get_tags_for_path(&context, &request.path)
.map(|tags| HttpResponse::Ok().json(tags)) .map(|tags| HttpResponse::Ok().json(tags))
.into_http_internal_err() .into_http_internal_err()
} }
@@ -66,11 +73,13 @@ async fn get_tags<D: TagDao>(
async fn get_all_tags<D: TagDao>( async fn get_all_tags<D: TagDao>(
_: Claims, _: Claims,
tag_dao: web::Data<Mutex<D>>, tag_dao: web::Data<Mutex<D>>,
request: HttpRequest,
query: web::Query<GetTagsRequest>, query: web::Query<GetTagsRequest>,
) -> impl Responder { ) -> impl Responder {
let context = extract_context_from_request(&request);
let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao"); let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao");
tag_dao tag_dao
.get_all_tags(query.path.clone()) .get_all_tags(&context, query.path.clone())
.map(|tags| { .map(|tags| {
HttpResponse::Ok().json( HttpResponse::Ok().json(
tags.iter() tags.iter()
@@ -86,12 +95,14 @@ async fn get_all_tags<D: TagDao>(
async fn remove_tagged_photo<D: TagDao>( async fn remove_tagged_photo<D: TagDao>(
_: Claims, _: Claims,
http_request: HttpRequest,
request: web::Json<AddTagRequest>, request: web::Json<AddTagRequest>,
tag_dao: web::Data<Mutex<D>>, tag_dao: web::Data<Mutex<D>>,
) -> impl Responder { ) -> impl Responder {
let context = extract_context_from_request(&http_request);
let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao"); let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao");
tag_dao tag_dao
.remove_tag(&request.tag_name, &request.file_name) .remove_tag(&context, &request.tag_name, &request.file_name)
.map(|result| { .map(|result| {
if result.is_some() { if result.is_some() {
HttpResponse::Ok() HttpResponse::Ok()
@@ -105,12 +116,14 @@ async fn remove_tagged_photo<D: TagDao>(
async fn update_tags<D: TagDao>( async fn update_tags<D: TagDao>(
_: Claims, _: Claims,
tag_dao: web::Data<Mutex<D>>, tag_dao: web::Data<Mutex<D>>,
http_request: HttpRequest,
request: web::Json<AddTagsRequest>, request: web::Json<AddTagsRequest>,
) -> impl Responder { ) -> impl Responder {
let mut dao = tag_dao.lock().expect("Unable to get TagDao"); let mut dao = tag_dao.lock().expect("Unable to get TagDao");
let context = extract_context_from_request(&http_request);
dao.get_tags_for_path(&request.file_name) dao.get_tags_for_path(&context, &request.file_name)
.and_then(|existing_tags| dao.get_all_tags(None).map(|all| (existing_tags, all))) .and_then(|existing_tags| dao.get_all_tags(&context, None).map(|all| (existing_tags, all)))
.map(|(existing_tags, all_tags)| { .map(|(existing_tags, all_tags)| {
let tags_to_remove = existing_tags let tags_to_remove = existing_tags
.iter() .iter()
@@ -122,7 +135,7 @@ async fn update_tags<D: TagDao>(
"Removing tag {:?} from file: {:?}", "Removing tag {:?} from file: {:?}",
tag.name, request.file_name tag.name, request.file_name
); );
dao.remove_tag(&tag.name, &request.file_name) dao.remove_tag(&context, &tag.name, &request.file_name)
.unwrap_or_else(|err| panic!("{:?} Unable to remove tag {:?}", err, &tag.name)); .unwrap_or_else(|err| panic!("{:?} Unable to remove tag {:?}", err, &tag.name));
} }
@@ -137,7 +150,7 @@ async fn update_tags<D: TagDao>(
new_tag.name, request.file_name new_tag.name, request.file_name
); );
dao.tag_file(&request.file_name, new_tag.id) dao.tag_file(&context, &request.file_name, new_tag.id)
.with_context(|| { .with_context(|| {
format!( format!(
"Unable to tag file {:?} with tag: {:?}", "Unable to tag file {:?} with tag: {:?}",
@@ -195,20 +208,26 @@ pub struct AddTagsRequest {
} }
pub trait TagDao { pub trait TagDao {
fn get_all_tags(&mut self, path: Option<String>) -> anyhow::Result<Vec<(i64, Tag)>>; fn get_all_tags(
fn get_tags_for_path(&mut self, path: &str) -> anyhow::Result<Vec<Tag>>; &mut self,
fn create_tag(&mut self, name: &str) -> anyhow::Result<Tag>; context: &opentelemetry::Context,
fn remove_tag(&mut self, tag_name: &str, path: &str) -> anyhow::Result<Option<()>>; path: Option<String>,
fn tag_file(&mut self, path: &str, tag_id: i32) -> anyhow::Result<TaggedPhoto>; ) -> anyhow::Result<Vec<(i64, Tag)>>;
fn get_tags_for_path(&mut self, context: &opentelemetry::Context, path: &str) -> anyhow::Result<Vec<Tag>>;
fn create_tag(&mut self, context: &opentelemetry::Context, name: &str) -> anyhow::Result<Tag>;
fn remove_tag(&mut self, context: &opentelemetry::Context, tag_name: &str, path: &str) -> anyhow::Result<Option<()>>;
fn tag_file(&mut self, context: &opentelemetry::Context, path: &str, tag_id: i32) -> anyhow::Result<TaggedPhoto>;
fn get_files_with_all_tag_ids( fn get_files_with_all_tag_ids(
&mut self, &mut self,
tag_ids: Vec<i32>, tag_ids: Vec<i32>,
exclude_tag_ids: Vec<i32>, exclude_tag_ids: Vec<i32>,
context: &opentelemetry::Context,
) -> anyhow::Result<Vec<FileWithTagCount>>; ) -> anyhow::Result<Vec<FileWithTagCount>>;
fn get_files_with_any_tag_ids( fn get_files_with_any_tag_ids(
&mut self, &mut self,
tag_ids: Vec<i32>, tag_ids: Vec<i32>,
exclude_tag_ids: Vec<i32>, exclude_tag_ids: Vec<i32>,
context: &opentelemetry::Context,
) -> anyhow::Result<Vec<FileWithTagCount>>; ) -> anyhow::Result<Vec<FileWithTagCount>>;
} }
@@ -229,10 +248,10 @@ impl Default for SqliteTagDao {
} }
impl TagDao for SqliteTagDao { impl TagDao for SqliteTagDao {
fn get_all_tags(&mut self, path: Option<String>) -> anyhow::Result<Vec<(i64, Tag)>> { fn get_all_tags(&mut self, context: &opentelemetry::Context, path: Option<String>) -> anyhow::Result<Vec<(i64, Tag)>> {
// select name, count(*) from tags join tagged_photo ON tags.id = tagged_photo.tag_id GROUP BY tags.name ORDER BY COUNT(*); // select name, count(*) from tags join tagged_photo ON tags.id = tagged_photo.tag_id GROUP BY tags.name ORDER BY COUNT(*);
trace_db_call("query", "get_all_tags", || { trace_db_call(&context, "query", "get_all_tags", || {
let path = path.map(|p| p + "%").unwrap_or("%".to_string()); let path = path.map(|p| p + "%").unwrap_or("%".to_string());
let (id, name, created_time) = tags::all_columns; let (id, name, created_time) = tags::all_columns;
tags::table tags::table
@@ -260,8 +279,8 @@ impl TagDao for SqliteTagDao {
}) })
} }
fn get_tags_for_path(&mut self, path: &str) -> anyhow::Result<Vec<Tag>> { fn get_tags_for_path(&mut self, context: &opentelemetry::Context, path: &str) -> anyhow::Result<Vec<Tag>> {
trace_db_call("query", "get_tags_for_path", || { trace_db_call(&context, "query", "get_tags_for_path", || {
trace!("Getting Tags for path: {:?}", path); trace!("Getting Tags for path: {:?}", path);
tags::table tags::table
.left_join(tagged_photo::table) .left_join(tagged_photo::table)
@@ -272,8 +291,8 @@ impl TagDao for SqliteTagDao {
}) })
} }
fn create_tag(&mut self, name: &str) -> anyhow::Result<Tag> { fn create_tag(&mut self, context: &opentelemetry::Context, name: &str) -> anyhow::Result<Tag> {
trace_db_call("insert", "create_tag", || { trace_db_call(&context, "insert", "create_tag", || {
diesel::insert_into(tags::table) diesel::insert_into(tags::table)
.values(InsertTag { .values(InsertTag {
name: name.to_string(), name: name.to_string(),
@@ -303,8 +322,8 @@ impl TagDao for SqliteTagDao {
}) })
} }
fn remove_tag(&mut self, tag_name: &str, path: &str) -> anyhow::Result<Option<()>> { fn remove_tag(&mut self, context: &opentelemetry::Context, tag_name: &str, path: &str) -> anyhow::Result<Option<()>> {
trace_db_call("delete", "remove_tag", || { trace_db_call(&context, "delete", "remove_tag", || {
tags::table tags::table
.filter(tags::name.eq(tag_name)) .filter(tags::name.eq(tag_name))
.get_result::<Tag>(self.connection.borrow_mut()) .get_result::<Tag>(self.connection.borrow_mut())
@@ -328,8 +347,8 @@ impl TagDao for SqliteTagDao {
}) })
} }
fn tag_file(&mut self, path: &str, tag_id: i32) -> anyhow::Result<TaggedPhoto> { fn tag_file(&mut self, context: &opentelemetry::Context, path: &str, tag_id: i32) -> anyhow::Result<TaggedPhoto> {
trace_db_call("insert", "tag_file", || { trace_db_call(&context, "insert", "tag_file", || {
diesel::insert_into(tagged_photo::table) diesel::insert_into(tagged_photo::table)
.values(InsertTaggedPhoto { .values(InsertTaggedPhoto {
tag_id, tag_id,
@@ -365,8 +384,9 @@ impl TagDao for SqliteTagDao {
&mut self, &mut self,
tag_ids: Vec<i32>, tag_ids: Vec<i32>,
exclude_tag_ids: Vec<i32>, exclude_tag_ids: Vec<i32>,
context: &opentelemetry::Context,
) -> anyhow::Result<Vec<FileWithTagCount>> { ) -> anyhow::Result<Vec<FileWithTagCount>> {
trace_db_call("query", "get_files_with_all_tags", || { trace_db_call(&context, "query", "get_files_with_all_tags", || {
use diesel::dsl::*; use diesel::dsl::*;
let exclude_subquery = tagged_photo::table let exclude_subquery = tagged_photo::table
@@ -401,8 +421,9 @@ impl TagDao for SqliteTagDao {
&mut self, &mut self,
tag_ids: Vec<i32>, tag_ids: Vec<i32>,
exclude_tag_ids: Vec<i32>, exclude_tag_ids: Vec<i32>,
context: &opentelemetry::Context,
) -> anyhow::Result<Vec<FileWithTagCount>> { ) -> anyhow::Result<Vec<FileWithTagCount>> {
trace_db_call("query", "get_files_with_any_tags", || { trace_db_call(&context, "query", "get_files_with_any_tags", || {
use diesel::dsl::*; use diesel::dsl::*;
let tag_ids_str = tag_ids let tag_ids_str = tag_ids
@@ -472,7 +493,7 @@ mod tests {
} }
impl TagDao for TestTagDao { impl TagDao for TestTagDao {
fn get_all_tags(&mut self, _option: Option<String>) -> anyhow::Result<Vec<(i64, Tag)>> { fn get_all_tags(&mut self, context: &opentelemetry::Context, _option: Option<String>) -> anyhow::Result<Vec<(i64, Tag)>> {
Ok(self Ok(self
.tags .tags
.borrow() .borrow()
@@ -482,7 +503,7 @@ mod tests {
.clone()) .clone())
} }
fn get_tags_for_path(&mut self, path: &str) -> anyhow::Result<Vec<Tag>> { fn get_tags_for_path(&mut self, context: &opentelemetry::Context, path: &str) -> anyhow::Result<Vec<Tag>> {
info!("Getting test tags for: {:?}", path); info!("Getting test tags for: {:?}", path);
warn!("Tags for path: {:?}", self.tagged_photos); warn!("Tags for path: {:?}", self.tagged_photos);
@@ -494,7 +515,7 @@ mod tests {
.clone()) .clone())
} }
fn create_tag(&mut self, name: &str) -> anyhow::Result<Tag> { fn create_tag(&mut self, context: &opentelemetry::Context, name: &str) -> anyhow::Result<Tag> {
self.tag_count += 1; self.tag_count += 1;
let tag_id = self.tag_count; let tag_id = self.tag_count;
@@ -510,7 +531,7 @@ mod tests {
Ok(tag) Ok(tag)
} }
fn remove_tag(&mut self, tag_name: &str, path: &str) -> anyhow::Result<Option<()>> { fn remove_tag(&mut self, context: &opentelemetry::Context, tag_name: &str, path: &str) -> anyhow::Result<Option<()>> {
let mut clone = { let mut clone = {
let photo_tags = &self.tagged_photos.borrow()[path]; let photo_tags = &self.tagged_photos.borrow()[path];
photo_tags.clone() photo_tags.clone()
@@ -530,7 +551,7 @@ mod tests {
} }
} }
fn tag_file(&mut self, path: &str, tag_id: i32) -> anyhow::Result<TaggedPhoto> { fn tag_file(&mut self, context: &opentelemetry::Context, path: &str, tag_id: i32) -> anyhow::Result<TaggedPhoto> {
debug!("Tagging file: {:?} with tag_id: {:?}", path, tag_id); debug!("Tagging file: {:?} with tag_id: {:?}", path, tag_id);
if let Some(tag) = self.tags.borrow().iter().find(|t| t.id == tag_id) { if let Some(tag) = self.tags.borrow().iter().find(|t| t.id == tag_id) {
@@ -566,15 +587,17 @@ mod tests {
fn get_files_with_all_tag_ids( fn get_files_with_all_tag_ids(
&mut self, &mut self,
tag_ids: Vec<i32>, tag_ids: Vec<i32>,
_exclude_tag_ids: Vec<i32>, exclude_tag_ids: Vec<i32>,
context: &opentelemetry::Context,
) -> anyhow::Result<Vec<FileWithTagCount>> { ) -> anyhow::Result<Vec<FileWithTagCount>> {
todo!() todo!()
} }
fn get_files_with_any_tag_ids( fn get_files_with_any_tag_ids(
&mut self, &mut self,
_tag_ids: Vec<i32>, tag_ids: Vec<i32>,
_exclude_tag_ids: Vec<i32>, exclude_tag_ids: Vec<i32>,
context: &opentelemetry::Context,
) -> anyhow::Result<Vec<FileWithTagCount>> { ) -> anyhow::Result<Vec<FileWithTagCount>> {
todo!() todo!()
} }