Recursive Sorting fix and many logging/tracing enhancements #33

Merged
cameron merged 11 commits from feature/fix-recursive-sort into master 2025-06-12 20:03:21 +00:00
2 changed files with 180 additions and 149 deletions
Showing only changes of commit 1e63e0c08c - Show all commits

View File

@@ -3,6 +3,7 @@ use actix_web::HttpRequest;
use opentelemetry::global::BoxedTracer; use opentelemetry::global::BoxedTracer;
use opentelemetry::{global, Context, KeyValue}; use opentelemetry::{global, Context, KeyValue};
use opentelemetry::propagation::TextMapPropagator; use opentelemetry::propagation::TextMapPropagator;
use opentelemetry::trace::Tracer;
use opentelemetry_appender_log::OpenTelemetryLogBridge; use opentelemetry_appender_log::OpenTelemetryLogBridge;
use opentelemetry_otlp::WithExportConfig; use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::logs::{BatchLogProcessor, SdkLoggerProvider}; use opentelemetry_sdk::logs::{BatchLogProcessor, SdkLoggerProvider};
@@ -13,6 +14,7 @@ pub fn global_tracer() -> BoxedTracer {
global::tracer("image-server") global::tracer("image-server")
} }
#[allow(dead_code)]
pub fn init_tracing() { pub fn init_tracing() {
let resources = Resource::builder() let resources = Resource::builder()
.with_attributes([ .with_attributes([
@@ -35,6 +37,7 @@ pub fn init_tracing() {
global::set_tracer_provider(tracer_provider); global::set_tracer_provider(tracer_provider);
} }
#[allow(dead_code)]
pub fn init_logs() { pub fn init_logs() {
let otlp_exporter = opentelemetry_otlp::LogExporter::builder() let otlp_exporter = opentelemetry_otlp::LogExporter::builder()
.with_tonic() .with_tonic()
@@ -47,7 +50,7 @@ pub fn init_logs() {
let resources = Resource::builder() let resources = Resource::builder()
.with_attributes([ .with_attributes([
KeyValue::new("service.name", "image-server"), KeyValue::new("service.name", "image-server"),
KeyValue::new("service.version", "1.0"), KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
]) ])
.build(); .build();
@@ -78,3 +81,16 @@ pub fn extract_context_from_request(req: &HttpRequest) -> Context {
let propagator = TraceContextPropagator::new(); let propagator = TraceContextPropagator::new();
propagator.extract(&HeaderExtractor(req.headers())) propagator.extract(&HeaderExtractor(req.headers()))
} }
pub fn trace_db_call<F, O>(operation: &str, query_type: &str, func: F) -> anyhow::Result<O>
where F: FnOnce() -> anyhow::Result<O> {
let tracer = global::tracer("db");
let _span = tracer
.span_builder(format!("db.{}.{}", operation, query_type))
.with_attributes(vec![
KeyValue::new("db.operation", operation.to_string().clone()),
KeyValue::new("db.query_type", query_type.to_string().clone()),
]).start(&tracer);
func()
}

View File

@@ -1,4 +1,5 @@
use crate::data::GetTagsRequest; use crate::data::GetTagsRequest;
use crate::otel::trace_db_call;
use crate::{connect, data::AddTagRequest, error::IntoHttpError, schema, Claims, ThumbnailRequest}; use crate::{connect, data::AddTagRequest, error::IntoHttpError, schema, Claims, ThumbnailRequest};
use actix_web::dev::{ServiceFactory, ServiceRequest}; use actix_web::dev::{ServiceFactory, ServiceRequest};
use actix_web::{web, App, HttpResponse, Responder}; use actix_web::{web, App, HttpResponse, Responder};
@@ -231,123 +232,133 @@ impl TagDao for SqliteTagDao {
fn get_all_tags(&mut self, path: Option<String>) -> anyhow::Result<Vec<(i64, Tag)>> { fn get_all_tags(&mut self, path: Option<String>) -> anyhow::Result<Vec<(i64, Tag)>> {
// select name, count(*) from tags join tagged_photo ON tags.id = tagged_photo.tag_id GROUP BY tags.name ORDER BY COUNT(*); // select name, count(*) from tags join tagged_photo ON tags.id = tagged_photo.tag_id GROUP BY tags.name ORDER BY COUNT(*);
let path = path.map(|p| p + "%").unwrap_or("%".to_string()); trace_db_call("query", "get_all_tags", || {
let (id, name, created_time) = tags::all_columns; let path = path.map(|p| p + "%").unwrap_or("%".to_string());
tags::table let (id, name, created_time) = tags::all_columns;
.inner_join(tagged_photo::table) tags::table
.group_by(tags::id) .inner_join(tagged_photo::table)
.select((count_star(), id, name, created_time)) .group_by(tags::id)
.filter(tagged_photo::photo_name.like(path)) .select((count_star(), id, name, created_time))
.get_results(&mut self.connection) .filter(tagged_photo::photo_name.like(path))
.map::<Vec<(i64, Tag)>, _>(|tags_with_count: Vec<(i64, i32, String, i64)>| { .get_results(&mut self.connection)
tags_with_count .map::<Vec<(i64, Tag)>, _>(|tags_with_count: Vec<(i64, i32, String, i64)>| {
.iter() tags_with_count
.map(|tup| { .iter()
( .map(|tup| {
tup.0, (
Tag { tup.0,
id: tup.1, Tag {
name: tup.2.clone(), id: tup.1,
created_time: tup.3, name: tup.2.clone(),
}, created_time: tup.3,
) },
}) )
.collect() })
}) .collect()
.with_context(|| "Unable to get all tags") })
.with_context(|| "Unable to get all tags")
})
} }
fn get_tags_for_path(&mut self, path: &str) -> anyhow::Result<Vec<Tag>> { fn get_tags_for_path(&mut self, path: &str) -> anyhow::Result<Vec<Tag>> {
trace!("Getting Tags for path: {:?}", path); trace_db_call("query", "get_tags_for_path", || {
tags::table trace!("Getting Tags for path: {:?}", path);
.left_join(tagged_photo::table) tags::table
.filter(tagged_photo::photo_name.eq(&path)) .left_join(tagged_photo::table)
.select((tags::id, tags::name, tags::created_time)) .filter(tagged_photo::photo_name.eq(&path))
.get_results::<Tag>(self.connection.borrow_mut()) .select((tags::id, tags::name, tags::created_time))
.with_context(|| "Unable to get tags from Sqlite") .get_results::<Tag>(self.connection.borrow_mut())
.with_context(|| "Unable to get tags from Sqlite")
})
} }
fn create_tag(&mut self, name: &str) -> anyhow::Result<Tag> { fn create_tag(&mut self, name: &str) -> anyhow::Result<Tag> {
diesel::insert_into(tags::table) trace_db_call("insert", "create_tag", || {
.values(InsertTag { diesel::insert_into(tags::table)
name: name.to_string(), .values(InsertTag {
created_time: Utc::now().timestamp(), name: name.to_string(),
}) created_time: Utc::now().timestamp(),
.execute(&mut self.connection) })
.with_context(|| format!("Unable to insert tag {:?} in Sqlite", name)) .execute(&mut self.connection)
.and_then(|_| { .with_context(|| format!("Unable to insert tag {:?} in Sqlite", name))
info!("Inserted tag: {:?}", name); .and_then(|_| {
define_sql_function! { info!("Inserted tag: {:?}", name);
fn last_insert_rowid() -> Integer; define_sql_function! {
} fn last_insert_rowid() -> Integer;
diesel::select(last_insert_rowid()) }
.get_result::<i32>(&mut self.connection) diesel::select(last_insert_rowid())
.with_context(|| "Unable to get last inserted tag from Sqlite") .get_result::<i32>(&mut self.connection)
}) .with_context(|| "Unable to get last inserted tag from Sqlite")
.and_then(|id| { })
debug!("Got id: {:?} for inserted tag: {:?}", id, name); .and_then(|id| {
tags::table debug!("Got id: {:?} for inserted tag: {:?}", id, name);
.filter(tags::id.eq(id)) tags::table
.select((tags::id, tags::name, tags::created_time)) .filter(tags::id.eq(id))
.get_result::<Tag>(self.connection.borrow_mut()) .select((tags::id, tags::name, tags::created_time))
.with_context(|| { .get_result::<Tag>(self.connection.borrow_mut())
format!("Unable to get tagged photo with id: {:?} from Sqlite", id) .with_context(|| {
}) format!("Unable to get tagged photo with id: {:?} from Sqlite", id)
}) })
})
})
} }
fn remove_tag(&mut self, tag_name: &str, path: &str) -> anyhow::Result<Option<()>> { fn remove_tag(&mut self, tag_name: &str, path: &str) -> anyhow::Result<Option<()>> {
tags::table trace_db_call("delete", "remove_tag", || {
.filter(tags::name.eq(tag_name)) tags::table
.get_result::<Tag>(self.connection.borrow_mut()) .filter(tags::name.eq(tag_name))
.optional() .get_result::<Tag>(self.connection.borrow_mut())
.with_context(|| format!("Unable to get tag '{}'", tag_name)) .optional()
.and_then(|tag| { .with_context(|| format!("Unable to get tag '{}'", tag_name))
if let Some(tag) = tag { .and_then(|tag| {
diesel::delete( if let Some(tag) = tag {
tagged_photo::table diesel::delete(
.filter(tagged_photo::tag_id.eq(tag.id)) tagged_photo::table
.filter(tagged_photo::photo_name.eq(path)), .filter(tagged_photo::tag_id.eq(tag.id))
) .filter(tagged_photo::photo_name.eq(path)),
.execute(&mut self.connection) )
.with_context(|| format!("Unable to delete tag: '{}'", &tag.name)) .execute(&mut self.connection)
.map(|_| Some(())) .with_context(|| format!("Unable to delete tag: '{}'", &tag.name))
} else { .map(|_| Some(()))
info!("No tag found with name '{}'", tag_name); } else {
Ok(None) info!("No tag found with name '{}'", tag_name);
} Ok(None)
}) }
})
})
} }
fn tag_file(&mut self, path: &str, tag_id: i32) -> anyhow::Result<TaggedPhoto> { fn tag_file(&mut self, path: &str, tag_id: i32) -> anyhow::Result<TaggedPhoto> {
diesel::insert_into(tagged_photo::table) trace_db_call("insert", "tag_file", || {
.values(InsertTaggedPhoto { diesel::insert_into(tagged_photo::table)
tag_id, .values(InsertTaggedPhoto {
photo_name: path.to_string(), tag_id,
created_time: Utc::now().timestamp(), photo_name: path.to_string(),
}) created_time: Utc::now().timestamp(),
.execute(self.connection.borrow_mut()) })
.with_context(|| format!("Unable to tag file {:?} in sqlite", path)) .execute(self.connection.borrow_mut())
.and_then(|_| { .with_context(|| format!("Unable to tag file {:?} in sqlite", path))
info!("Inserted tagged photo: {:#} -> {:?}", tag_id, path); .and_then(|_| {
define_sql_function! { info!("Inserted tagged photo: {:#} -> {:?}", tag_id, path);
fn last_insert_rowid() -> diesel::sql_types::Integer; define_sql_function! {
} fn last_insert_rowid() -> diesel::sql_types::Integer;
diesel::select(last_insert_rowid()) }
.get_result::<i32>(&mut self.connection) diesel::select(last_insert_rowid())
.with_context(|| "Unable to get last inserted tag from Sqlite") .get_result::<i32>(&mut self.connection)
}) .with_context(|| "Unable to get last inserted tag from Sqlite")
.and_then(|tagged_id| { })
tagged_photo::table .and_then(|tagged_id| {
.find(tagged_id) tagged_photo::table
.first(self.connection.borrow_mut()) .find(tagged_id)
.with_context(|| { .first(self.connection.borrow_mut())
format!( .with_context(|| {
"Error getting inserted tagged photo with id: {:?}", format!(
tagged_id "Error getting inserted tagged photo with id: {:?}",
) tagged_id
}) )
}) })
})
})
} }
fn get_files_with_all_tag_ids( fn get_files_with_all_tag_ids(
@@ -355,33 +366,35 @@ impl TagDao for SqliteTagDao {
tag_ids: Vec<i32>, tag_ids: Vec<i32>,
exclude_tag_ids: Vec<i32>, exclude_tag_ids: Vec<i32>,
) -> anyhow::Result<Vec<FileWithTagCount>> { ) -> anyhow::Result<Vec<FileWithTagCount>> {
use diesel::dsl::*; trace_db_call("query", "get_files_with_all_tags", || {
use diesel::dsl::*;
let exclude_subquery = tagged_photo::table let exclude_subquery = tagged_photo::table
.filter(tagged_photo::tag_id.eq_any(exclude_tag_ids.clone())) .filter(tagged_photo::tag_id.eq_any(exclude_tag_ids.clone()))
.select(tagged_photo::photo_name) .select(tagged_photo::photo_name)
.into_boxed(); .into_boxed();
tagged_photo::table tagged_photo::table
.filter(tagged_photo::tag_id.eq_any(tag_ids.clone())) .filter(tagged_photo::tag_id.eq_any(tag_ids.clone()))
.filter(tagged_photo::photo_name.ne_all(exclude_subquery)) .filter(tagged_photo::photo_name.ne_all(exclude_subquery))
.group_by(tagged_photo::photo_name) .group_by(tagged_photo::photo_name)
.select(( .select((
tagged_photo::photo_name, tagged_photo::photo_name,
count_distinct(tagged_photo::tag_id), count_distinct(tagged_photo::tag_id),
)) ))
.having(count_distinct(tagged_photo::tag_id).ge(tag_ids.len() as i64)) .having(count_distinct(tagged_photo::tag_id).ge(tag_ids.len() as i64))
.get_results::<(String, i64)>(&mut self.connection) .get_results::<(String, i64)>(&mut self.connection)
.map(|results| { .map(|results| {
results results
.into_iter() .into_iter()
.map(|(file_name, tag_count)| FileWithTagCount { .map(|(file_name, tag_count)| FileWithTagCount {
file_name, file_name,
tag_count, tag_count,
}) })
.collect() .collect()
}) })
.with_context(|| format!("Unable to get Tagged photos with ids: {:?}", tag_ids)) .with_context(|| format!("Unable to get Tagged photos with ids: {:?}", tag_ids))
})
} }
fn get_files_with_any_tag_ids( fn get_files_with_any_tag_ids(
@@ -389,22 +402,23 @@ impl TagDao for SqliteTagDao {
tag_ids: Vec<i32>, tag_ids: Vec<i32>,
exclude_tag_ids: Vec<i32>, exclude_tag_ids: Vec<i32>,
) -> anyhow::Result<Vec<FileWithTagCount>> { ) -> anyhow::Result<Vec<FileWithTagCount>> {
use diesel::dsl::*; trace_db_call("query", "get_files_with_any_tags", || {
use diesel::dsl::*;
let tag_ids_str = tag_ids let tag_ids_str = tag_ids
.iter() .iter()
.map(|id| id.to_string()) .map(|id| id.to_string())
.collect::<Vec<_>>() .collect::<Vec<_>>()
.join(","); .join(",");
let exclude_tag_ids_str = exclude_tag_ids let exclude_tag_ids_str = exclude_tag_ids
.iter() .iter()
.map(|id| id.to_string()) .map(|id| id.to_string())
.collect::<Vec<_>>() .collect::<Vec<_>>()
.join(","); .join(",");
let query = sql_query(format!( let query = sql_query(format!(
r#" r#"
WITH filtered_photos AS ( WITH filtered_photos AS (
SELECT DISTINCT photo_name SELECT DISTINCT photo_name
FROM tagged_photo tp FROM tagged_photo tp
@@ -421,12 +435,13 @@ WITH filtered_photos AS (
FROM filtered_photos fp FROM filtered_photos fp
JOIN tagged_photo tp2 ON fp.photo_name = tp2.photo_name JOIN tagged_photo tp2 ON fp.photo_name = tp2.photo_name
GROUP BY fp.photo_name"#, GROUP BY fp.photo_name"#,
tag_ids_str, exclude_tag_ids_str tag_ids_str, exclude_tag_ids_str
)); ));
// Execute the query: // Execute the query:
let results = query.load::<FileWithTagCount>(&mut self.connection)?; let results = query.load::<FileWithTagCount>(&mut self.connection)?;
Ok(results) Ok(results)
})
} }
} }