Additional logging and tracing enhancements
This commit is contained in:
65
src/files.rs
65
src/files.rs
@@ -9,23 +9,26 @@ use ::anyhow;
|
|||||||
use actix::{Handler, Message};
|
use actix::{Handler, Message};
|
||||||
use anyhow::{anyhow, Context};
|
use anyhow::{anyhow, Context};
|
||||||
|
|
||||||
use actix_web::web::Data;
|
|
||||||
use actix_web::{web::{self, Query}, HttpRequest, HttpResponse};
|
|
||||||
use log::{debug, error, info, trace};
|
|
||||||
use opentelemetry::KeyValue;
|
|
||||||
use opentelemetry::trace::{Span, Status, TraceContextExt, Tracer};
|
|
||||||
use crate::data::{Claims, FilesRequest, FilterMode, PhotosResponse, SortType};
|
use crate::data::{Claims, FilesRequest, FilterMode, PhotosResponse, SortType};
|
||||||
use crate::{create_thumbnails, AppState};
|
use crate::{create_thumbnails, AppState};
|
||||||
|
use actix_web::web::Data;
|
||||||
|
use actix_web::{
|
||||||
|
web::{self, Query},
|
||||||
|
HttpRequest, HttpResponse,
|
||||||
|
};
|
||||||
|
use log::{debug, error, info, trace};
|
||||||
|
use opentelemetry::trace::{Span, Status, TraceContextExt, Tracer};
|
||||||
|
use opentelemetry::KeyValue;
|
||||||
|
|
||||||
use crate::data::SortType::NameAsc;
|
use crate::data::SortType::NameAsc;
|
||||||
use crate::error::IntoHttpError;
|
use crate::error::IntoHttpError;
|
||||||
|
use crate::otel::{extract_context_from_request, global_tracer};
|
||||||
use crate::tags::{FileWithTagCount, TagDao};
|
use crate::tags::{FileWithTagCount, TagDao};
|
||||||
use crate::video::StreamActor;
|
use crate::video::StreamActor;
|
||||||
use path_absolutize::*;
|
use path_absolutize::*;
|
||||||
use rand::prelude::SliceRandom;
|
use rand::prelude::SliceRandom;
|
||||||
use rand::thread_rng;
|
use rand::thread_rng;
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use crate::otel::{extract_context_from_request, global_tracer};
|
|
||||||
|
|
||||||
pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
|
pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
|
||||||
_: Claims,
|
_: Claims,
|
||||||
@@ -40,7 +43,24 @@ pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
|
|||||||
let tracer = global_tracer();
|
let tracer = global_tracer();
|
||||||
let context = extract_context_from_request(&request);
|
let context = extract_context_from_request(&request);
|
||||||
let mut span = tracer.start_with_context("list_photos", &context);
|
let mut span = tracer.start_with_context("list_photos", &context);
|
||||||
span.set_attribute(KeyValue::new("path", search_path.to_string()));
|
span.set_attributes(vec![
|
||||||
|
KeyValue::new("path", search_path.to_string()),
|
||||||
|
KeyValue::new("recursive", req.recursive.unwrap_or(false).to_string()),
|
||||||
|
KeyValue::new(
|
||||||
|
"tag_ids",
|
||||||
|
req.tag_ids.clone().unwrap_or_default().to_string(),
|
||||||
|
),
|
||||||
|
KeyValue::new(
|
||||||
|
"tag_filter_mode",
|
||||||
|
format!("{:?}", req.tag_filter_mode.unwrap_or(FilterMode::Any)),
|
||||||
|
),
|
||||||
|
KeyValue::new(
|
||||||
|
"exclude_tag_ids",
|
||||||
|
req.exclude_tag_ids.clone().unwrap_or_default().to_string(),
|
||||||
|
),
|
||||||
|
KeyValue::new("sort", format!("{:?}", &req.sort.unwrap_or(NameAsc))),
|
||||||
|
]);
|
||||||
|
|
||||||
let span_context = opentelemetry::Context::current_with_span(span);
|
let span_context = opentelemetry::Context::current_with_span(span);
|
||||||
|
|
||||||
let search_recursively = req.recursive.unwrap_or(false);
|
let search_recursively = req.recursive.unwrap_or(false);
|
||||||
@@ -67,8 +87,12 @@ pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
|
|||||||
.collect::<Vec<i32>>();
|
.collect::<Vec<i32>>();
|
||||||
|
|
||||||
return match filter_mode {
|
return match filter_mode {
|
||||||
FilterMode::Any => dao.get_files_with_any_tag_ids(tag_ids.clone(), exclude_tag_ids, &span_context),
|
FilterMode::Any => {
|
||||||
FilterMode::All => dao.get_files_with_all_tag_ids(tag_ids.clone(), exclude_tag_ids, &span_context),
|
dao.get_files_with_any_tag_ids(tag_ids.clone(), exclude_tag_ids, &span_context)
|
||||||
|
}
|
||||||
|
FilterMode::All => {
|
||||||
|
dao.get_files_with_all_tag_ids(tag_ids.clone(), exclude_tag_ids, &span_context)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
.context(format!(
|
.context(format!(
|
||||||
"Failed to get files with tag_ids: {:?} with filter_mode: {:?}",
|
"Failed to get files with tag_ids: {:?} with filter_mode: {:?}",
|
||||||
@@ -105,7 +129,9 @@ pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
|
|||||||
tagged_files.len(),
|
tagged_files.len(),
|
||||||
tagged_files
|
tagged_files
|
||||||
);
|
);
|
||||||
span_context.span().set_attribute(KeyValue::new("file_count", tagged_files.len().to_string()));
|
span_context
|
||||||
|
.span()
|
||||||
|
.set_attribute(KeyValue::new("file_count", tagged_files.len().to_string()));
|
||||||
span_context.span().set_status(Status::Ok);
|
span_context.span().set_status(Status::Ok);
|
||||||
|
|
||||||
HttpResponse::Ok().json(PhotosResponse {
|
HttpResponse::Ok().json(PhotosResponse {
|
||||||
@@ -139,7 +165,9 @@ pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
|
|||||||
.map(|f| f.to_str().unwrap().to_string())
|
.map(|f| f.to_str().unwrap().to_string())
|
||||||
.map(|file_name| {
|
.map(|file_name| {
|
||||||
let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao");
|
let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao");
|
||||||
let file_tags = tag_dao.get_tags_for_path(&span_context, &file_name).unwrap_or_default();
|
let file_tags = tag_dao
|
||||||
|
.get_tags_for_path(&span_context, &file_name)
|
||||||
|
.unwrap_or_default();
|
||||||
|
|
||||||
(file_name, file_tags)
|
(file_name, file_tags)
|
||||||
})
|
})
|
||||||
@@ -198,7 +226,9 @@ pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
|
|||||||
.map(|f| f.to_str().unwrap().to_string())
|
.map(|f| f.to_str().unwrap().to_string())
|
||||||
.collect::<Vec<String>>();
|
.collect::<Vec<String>>();
|
||||||
|
|
||||||
span_context.span().set_attribute(KeyValue::new("file_count", files.len().to_string()));
|
span_context
|
||||||
|
.span()
|
||||||
|
.set_attribute(KeyValue::new("file_count", files.len().to_string()));
|
||||||
span_context.span().set_status(Status::Ok);
|
span_context.span().set_status(Status::Ok);
|
||||||
|
|
||||||
HttpResponse::Ok().json(PhotosResponse {
|
HttpResponse::Ok().json(PhotosResponse {
|
||||||
@@ -207,7 +237,9 @@ pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
|
|||||||
})
|
})
|
||||||
} else {
|
} else {
|
||||||
error!("Bad photos request: {}", req.path);
|
error!("Bad photos request: {}", req.path);
|
||||||
span_context.span().set_status(Status::error("Invalid path"));
|
span_context
|
||||||
|
.span()
|
||||||
|
.set_status(Status::error("Invalid path"));
|
||||||
HttpResponse::BadRequest().finish()
|
HttpResponse::BadRequest().finish()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -249,6 +281,7 @@ pub fn list_files(dir: &Path) -> io::Result<Vec<PathBuf>> {
|
|||||||
.collect::<Vec<PathBuf>>();
|
.collect::<Vec<PathBuf>>();
|
||||||
|
|
||||||
span.set_attribute(KeyValue::new("file_count", files.len().to_string()));
|
span.set_attribute(KeyValue::new("file_count", files.len().to_string()));
|
||||||
|
span.set_status(Status::Ok);
|
||||||
info!("Found {:?} files in directory: {:?}", files.len(), dir);
|
info!("Found {:?} files in directory: {:?}", files.len(), dir);
|
||||||
Ok(files)
|
Ok(files)
|
||||||
}
|
}
|
||||||
@@ -395,7 +428,11 @@ impl FileSystemAccess for RealFileSystem {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn move_file<P: AsRef<Path>>(&self, from: P, destination: P) -> anyhow::Result<()> {
|
fn move_file<P: AsRef<Path>>(&self, from: P, destination: P) -> anyhow::Result<()> {
|
||||||
info!("Moving file: '{:?}' -> '{:?}'", from.as_ref(), destination.as_ref());
|
info!(
|
||||||
|
"Moving file: '{:?}' -> '{:?}'",
|
||||||
|
from.as_ref(),
|
||||||
|
destination.as_ref()
|
||||||
|
);
|
||||||
let name = from
|
let name = from
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.file_name()
|
.file_name()
|
||||||
|
|||||||
@@ -676,7 +676,13 @@ fn watch_files() {
|
|||||||
let ev = wrx.recv();
|
let ev = wrx.recv();
|
||||||
if let Ok(Ok(event)) = ev {
|
if let Ok(Ok(event)) = ev {
|
||||||
match event.kind {
|
match event.kind {
|
||||||
EventKind::Create(_) => create_thumbnails(),
|
EventKind::Create(create_kind) => {
|
||||||
|
info!(
|
||||||
|
"Creating thumbnails {:?} create event kind: {:?}",
|
||||||
|
event.paths, create_kind
|
||||||
|
);
|
||||||
|
create_thumbnails();
|
||||||
|
}
|
||||||
EventKind::Modify(kind) => {
|
EventKind::Modify(kind) => {
|
||||||
debug!("All modified paths: {:?}", event.paths);
|
debug!("All modified paths: {:?}", event.paths);
|
||||||
debug!("Modify kind: {:?}", kind);
|
debug!("Modify kind: {:?}", kind);
|
||||||
|
|||||||
23
src/otel.rs
23
src/otel.rs
@@ -1,6 +1,6 @@
|
|||||||
use actix_web::http::header::HeaderMap;
|
use actix_web::http::header::HeaderMap;
|
||||||
use actix_web::HttpRequest;
|
use actix_web::HttpRequest;
|
||||||
use opentelemetry::global::BoxedTracer;
|
use opentelemetry::global::{BoxedSpan, BoxedTracer};
|
||||||
use opentelemetry::propagation::TextMapPropagator;
|
use opentelemetry::propagation::TextMapPropagator;
|
||||||
use opentelemetry::trace::{Span, Status, Tracer};
|
use opentelemetry::trace::{Span, Status, Tracer};
|
||||||
use opentelemetry::{global, Context, KeyValue};
|
use opentelemetry::{global, Context, KeyValue};
|
||||||
@@ -82,25 +82,30 @@ pub fn extract_context_from_request(req: &HttpRequest) -> Context {
|
|||||||
propagator.extract(&HeaderExtractor(req.headers()))
|
propagator.extract(&HeaderExtractor(req.headers()))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn trace_db_call<F, O>(context: &Context, query_type: &str, operation: &str, func: F) -> anyhow::Result<O>
|
pub fn trace_db_call<F, O>(
|
||||||
where F: FnOnce() -> anyhow::Result<O> {
|
context: &Context,
|
||||||
|
query_type: &str,
|
||||||
|
operation: &str,
|
||||||
|
func: F,
|
||||||
|
) -> anyhow::Result<O>
|
||||||
|
where
|
||||||
|
F: FnOnce(&mut BoxedSpan) -> anyhow::Result<O>,
|
||||||
|
{
|
||||||
let tracer = global::tracer("db");
|
let tracer = global::tracer("db");
|
||||||
let mut span = tracer
|
let mut span = tracer
|
||||||
.span_builder(format!("db.{}.{}", query_type, operation))
|
.span_builder(format!("db.{}.{}", query_type, operation))
|
||||||
.with_attributes(vec![
|
.with_attributes(vec![
|
||||||
KeyValue::new("db.query_type", query_type.to_string().clone()),
|
KeyValue::new("db.query_type", query_type.to_string().clone()),
|
||||||
KeyValue::new("db.operation", operation.to_string().clone()),
|
KeyValue::new("db.operation", operation.to_string().clone()),
|
||||||
]).start_with_context(&tracer, context);
|
])
|
||||||
|
.start_with_context(&tracer, context);
|
||||||
|
|
||||||
let result = func();
|
let result = func(&mut span);
|
||||||
match &result {
|
match &result {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
span.set_status(Status::Ok);
|
span.set_status(Status::Ok);
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => span.set_status(Status::error(e.to_string())),
|
||||||
span.set_status(Status::error(e.to_string()))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
result
|
result
|
||||||
|
|||||||
170
src/tags.rs
170
src/tags.rs
@@ -8,8 +8,9 @@ use chrono::Utc;
|
|||||||
use diesel::dsl::count_star;
|
use diesel::dsl::count_star;
|
||||||
use diesel::prelude::*;
|
use diesel::prelude::*;
|
||||||
use diesel::sql_types::*;
|
use diesel::sql_types::*;
|
||||||
use log::{debug, info, trace};
|
use log::{debug, info};
|
||||||
use opentelemetry::trace::Tracer;
|
use opentelemetry::trace::{Span, Status, TraceContextExt, Tracer};
|
||||||
|
use opentelemetry::KeyValue;
|
||||||
use schema::{tagged_photo, tags};
|
use schema::{tagged_photo, tags};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::borrow::BorrowMut;
|
use std::borrow::BorrowMut;
|
||||||
@@ -37,22 +38,30 @@ async fn add_tag<D: TagDao>(
|
|||||||
) -> impl Responder {
|
) -> impl Responder {
|
||||||
let tracer = global_tracer();
|
let tracer = global_tracer();
|
||||||
let context = extract_context_from_request(&request);
|
let context = extract_context_from_request(&request);
|
||||||
let _ = tracer.start_with_context("add_tag", &context);
|
let span = tracer.start_with_context("add_tag", &context);
|
||||||
|
let span_context = opentelemetry::Context::current_with_span(span);
|
||||||
let tag_name = body.tag_name.clone();
|
let tag_name = body.tag_name.clone();
|
||||||
|
|
||||||
let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao");
|
let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao");
|
||||||
|
|
||||||
tag_dao
|
tag_dao
|
||||||
.get_all_tags(&context, None)
|
.get_all_tags(&span_context, None)
|
||||||
.and_then(|tags| {
|
.and_then(|tags| {
|
||||||
if let Some((_, tag)) = tags.iter().find(|t| t.1.name == tag_name) {
|
if let Some((_, tag)) = tags.iter().find(|t| t.1.name == tag_name) {
|
||||||
Ok(tag.clone())
|
Ok(tag.clone())
|
||||||
} else {
|
} else {
|
||||||
tag_dao.create_tag(&context, tag_name.trim())
|
info!(
|
||||||
|
"Creating missing tag: '{:?}' for file: '{}'",
|
||||||
|
tag_name, &body.file_name
|
||||||
|
);
|
||||||
|
tag_dao.create_tag(&span_context, tag_name.trim())
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.and_then(|tag| tag_dao.tag_file(&context, &body.file_name, tag.id))
|
.and_then(|tag| tag_dao.tag_file(&span_context, &body.file_name, tag.id))
|
||||||
.map(|_| HttpResponse::Ok())
|
.map(|_| {
|
||||||
|
span_context.span().set_status(Status::Ok);
|
||||||
|
HttpResponse::Ok()
|
||||||
|
})
|
||||||
.into_http_internal_err()
|
.into_http_internal_err()
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -63,10 +72,15 @@ async fn get_tags<D: TagDao>(
|
|||||||
tag_dao: web::Data<Mutex<D>>,
|
tag_dao: web::Data<Mutex<D>>,
|
||||||
) -> impl Responder {
|
) -> impl Responder {
|
||||||
let context = extract_context_from_request(&http_request);
|
let context = extract_context_from_request(&http_request);
|
||||||
|
let span = global_tracer().start_with_context("get_tags", &context);
|
||||||
|
let span_context = opentelemetry::Context::current_with_span(span);
|
||||||
let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao");
|
let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao");
|
||||||
tag_dao
|
tag_dao
|
||||||
.get_tags_for_path(&context, &request.path)
|
.get_tags_for_path(&span_context, &request.path)
|
||||||
.map(|tags| HttpResponse::Ok().json(tags))
|
.map(|tags| {
|
||||||
|
span_context.span().set_status(Status::Ok);
|
||||||
|
HttpResponse::Ok().json(tags)
|
||||||
|
})
|
||||||
.into_http_internal_err()
|
.into_http_internal_err()
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -77,10 +91,15 @@ async fn get_all_tags<D: TagDao>(
|
|||||||
query: web::Query<GetTagsRequest>,
|
query: web::Query<GetTagsRequest>,
|
||||||
) -> impl Responder {
|
) -> impl Responder {
|
||||||
let context = extract_context_from_request(&request);
|
let context = extract_context_from_request(&request);
|
||||||
|
let span = global_tracer().start_with_context("get_all_tags", &context);
|
||||||
|
let span_context = opentelemetry::Context::current_with_span(span);
|
||||||
|
|
||||||
let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao");
|
let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao");
|
||||||
tag_dao
|
tag_dao
|
||||||
.get_all_tags(&context, query.path.clone())
|
.get_all_tags(&span_context, query.path.clone())
|
||||||
.map(|tags| {
|
.map(|tags| {
|
||||||
|
span_context.span().set_status(Status::Ok);
|
||||||
|
|
||||||
HttpResponse::Ok().json(
|
HttpResponse::Ok().json(
|
||||||
tags.iter()
|
tags.iter()
|
||||||
.map(|(tag_count, tag)| TagWithTagCount {
|
.map(|(tag_count, tag)| TagWithTagCount {
|
||||||
@@ -100,10 +119,15 @@ async fn remove_tagged_photo<D: TagDao>(
|
|||||||
tag_dao: web::Data<Mutex<D>>,
|
tag_dao: web::Data<Mutex<D>>,
|
||||||
) -> impl Responder {
|
) -> impl Responder {
|
||||||
let context = extract_context_from_request(&http_request);
|
let context = extract_context_from_request(&http_request);
|
||||||
|
let span = global_tracer().start_with_context("remove_tagged_photo", &context);
|
||||||
|
let span_context = opentelemetry::Context::current_with_span(span);
|
||||||
|
|
||||||
let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao");
|
let mut tag_dao = tag_dao.lock().expect("Unable to get TagDao");
|
||||||
tag_dao
|
tag_dao
|
||||||
.remove_tag(&context, &request.tag_name, &request.file_name)
|
.remove_tag(&span_context, &request.tag_name, &request.file_name)
|
||||||
.map(|result| {
|
.map(|result| {
|
||||||
|
span_context.span().set_status(Status::Ok);
|
||||||
|
|
||||||
if result.is_some() {
|
if result.is_some() {
|
||||||
HttpResponse::Ok()
|
HttpResponse::Ok()
|
||||||
} else {
|
} else {
|
||||||
@@ -121,9 +145,14 @@ async fn update_tags<D: TagDao>(
|
|||||||
) -> impl Responder {
|
) -> impl Responder {
|
||||||
let mut dao = tag_dao.lock().expect("Unable to get TagDao");
|
let mut dao = tag_dao.lock().expect("Unable to get TagDao");
|
||||||
let context = extract_context_from_request(&http_request);
|
let context = extract_context_from_request(&http_request);
|
||||||
|
let span = global_tracer().start_with_context("update_tags", &context);
|
||||||
|
let span_context = opentelemetry::Context::current_with_span(span);
|
||||||
|
|
||||||
dao.get_tags_for_path(&context, &request.file_name)
|
dao.get_tags_for_path(&span_context, &request.file_name)
|
||||||
.and_then(|existing_tags| dao.get_all_tags(&context, None).map(|all| (existing_tags, all)))
|
.and_then(|existing_tags| {
|
||||||
|
dao.get_all_tags(&span_context, None)
|
||||||
|
.map(|all| (existing_tags, all))
|
||||||
|
})
|
||||||
.map(|(existing_tags, all_tags)| {
|
.map(|(existing_tags, all_tags)| {
|
||||||
let tags_to_remove = existing_tags
|
let tags_to_remove = existing_tags
|
||||||
.iter()
|
.iter()
|
||||||
@@ -135,7 +164,7 @@ async fn update_tags<D: TagDao>(
|
|||||||
"Removing tag {:?} from file: {:?}",
|
"Removing tag {:?} from file: {:?}",
|
||||||
tag.name, request.file_name
|
tag.name, request.file_name
|
||||||
);
|
);
|
||||||
dao.remove_tag(&context, &tag.name, &request.file_name)
|
dao.remove_tag(&span_context, &tag.name, &request.file_name)
|
||||||
.unwrap_or_else(|err| panic!("{:?} Unable to remove tag {:?}", err, &tag.name));
|
.unwrap_or_else(|err| panic!("{:?} Unable to remove tag {:?}", err, &tag.name));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -150,7 +179,7 @@ async fn update_tags<D: TagDao>(
|
|||||||
new_tag.name, request.file_name
|
new_tag.name, request.file_name
|
||||||
);
|
);
|
||||||
|
|
||||||
dao.tag_file(&context, &request.file_name, new_tag.id)
|
dao.tag_file(&span_context, &request.file_name, new_tag.id)
|
||||||
.with_context(|| {
|
.with_context(|| {
|
||||||
format!(
|
format!(
|
||||||
"Unable to tag file {:?} with tag: {:?}",
|
"Unable to tag file {:?} with tag: {:?}",
|
||||||
@@ -160,6 +189,7 @@ async fn update_tags<D: TagDao>(
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
span_context.span().set_status(Status::Ok);
|
||||||
HttpResponse::Ok()
|
HttpResponse::Ok()
|
||||||
})
|
})
|
||||||
.into_http_internal_err()
|
.into_http_internal_err()
|
||||||
@@ -213,10 +243,24 @@ pub trait TagDao {
|
|||||||
context: &opentelemetry::Context,
|
context: &opentelemetry::Context,
|
||||||
path: Option<String>,
|
path: Option<String>,
|
||||||
) -> anyhow::Result<Vec<(i64, Tag)>>;
|
) -> anyhow::Result<Vec<(i64, Tag)>>;
|
||||||
fn get_tags_for_path(&mut self, context: &opentelemetry::Context, path: &str) -> anyhow::Result<Vec<Tag>>;
|
fn get_tags_for_path(
|
||||||
|
&mut self,
|
||||||
|
context: &opentelemetry::Context,
|
||||||
|
path: &str,
|
||||||
|
) -> anyhow::Result<Vec<Tag>>;
|
||||||
fn create_tag(&mut self, context: &opentelemetry::Context, name: &str) -> anyhow::Result<Tag>;
|
fn create_tag(&mut self, context: &opentelemetry::Context, name: &str) -> anyhow::Result<Tag>;
|
||||||
fn remove_tag(&mut self, context: &opentelemetry::Context, tag_name: &str, path: &str) -> anyhow::Result<Option<()>>;
|
fn remove_tag(
|
||||||
fn tag_file(&mut self, context: &opentelemetry::Context, path: &str, tag_id: i32) -> anyhow::Result<TaggedPhoto>;
|
&mut self,
|
||||||
|
context: &opentelemetry::Context,
|
||||||
|
tag_name: &str,
|
||||||
|
path: &str,
|
||||||
|
) -> anyhow::Result<Option<()>>;
|
||||||
|
fn tag_file(
|
||||||
|
&mut self,
|
||||||
|
context: &opentelemetry::Context,
|
||||||
|
path: &str,
|
||||||
|
tag_id: i32,
|
||||||
|
) -> anyhow::Result<TaggedPhoto>;
|
||||||
fn get_files_with_all_tag_ids(
|
fn get_files_with_all_tag_ids(
|
||||||
&mut self,
|
&mut self,
|
||||||
tag_ids: Vec<i32>,
|
tag_ids: Vec<i32>,
|
||||||
@@ -248,10 +292,16 @@ impl Default for SqliteTagDao {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl TagDao for SqliteTagDao {
|
impl TagDao for SqliteTagDao {
|
||||||
fn get_all_tags(&mut self, context: &opentelemetry::Context, path: Option<String>) -> anyhow::Result<Vec<(i64, Tag)>> {
|
fn get_all_tags(
|
||||||
|
&mut self,
|
||||||
|
context: &opentelemetry::Context,
|
||||||
|
path: Option<String>,
|
||||||
|
) -> anyhow::Result<Vec<(i64, Tag)>> {
|
||||||
// select name, count(*) from tags join tagged_photo ON tags.id = tagged_photo.tag_id GROUP BY tags.name ORDER BY COUNT(*);
|
// select name, count(*) from tags join tagged_photo ON tags.id = tagged_photo.tag_id GROUP BY tags.name ORDER BY COUNT(*);
|
||||||
|
|
||||||
trace_db_call(&context, "query", "get_all_tags", || {
|
trace_db_call(&context, "query", "get_all_tags", |span| {
|
||||||
|
span.set_attribute(KeyValue::new("path", path.clone().unwrap_or_default()));
|
||||||
|
|
||||||
let path = path.map(|p| p + "%").unwrap_or("%".to_string());
|
let path = path.map(|p| p + "%").unwrap_or("%".to_string());
|
||||||
let (id, name, created_time) = tags::all_columns;
|
let (id, name, created_time) = tags::all_columns;
|
||||||
tags::table
|
tags::table
|
||||||
@@ -279,9 +329,15 @@ impl TagDao for SqliteTagDao {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_tags_for_path(&mut self, context: &opentelemetry::Context, path: &str) -> anyhow::Result<Vec<Tag>> {
|
fn get_tags_for_path(
|
||||||
trace_db_call(&context, "query", "get_tags_for_path", || {
|
&mut self,
|
||||||
trace!("Getting Tags for path: {:?}", path);
|
context: &opentelemetry::Context,
|
||||||
|
path: &str,
|
||||||
|
) -> anyhow::Result<Vec<Tag>> {
|
||||||
|
trace_db_call(&context, "query", "get_tags_for_path", |span| {
|
||||||
|
span.set_attribute(KeyValue::new("path", path.to_string()));
|
||||||
|
|
||||||
|
debug!("Getting Tags for path: {:?}", path);
|
||||||
tags::table
|
tags::table
|
||||||
.left_join(tagged_photo::table)
|
.left_join(tagged_photo::table)
|
||||||
.filter(tagged_photo::photo_name.eq(&path))
|
.filter(tagged_photo::photo_name.eq(&path))
|
||||||
@@ -292,7 +348,9 @@ impl TagDao for SqliteTagDao {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn create_tag(&mut self, context: &opentelemetry::Context, name: &str) -> anyhow::Result<Tag> {
|
fn create_tag(&mut self, context: &opentelemetry::Context, name: &str) -> anyhow::Result<Tag> {
|
||||||
trace_db_call(&context, "insert", "create_tag", || {
|
trace_db_call(&context, "insert", "create_tag", |span| {
|
||||||
|
span.set_attribute(KeyValue::new("name", name.to_string()));
|
||||||
|
|
||||||
diesel::insert_into(tags::table)
|
diesel::insert_into(tags::table)
|
||||||
.values(InsertTag {
|
.values(InsertTag {
|
||||||
name: name.to_string(),
|
name: name.to_string(),
|
||||||
@@ -322,8 +380,18 @@ impl TagDao for SqliteTagDao {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn remove_tag(&mut self, context: &opentelemetry::Context, tag_name: &str, path: &str) -> anyhow::Result<Option<()>> {
|
fn remove_tag(
|
||||||
trace_db_call(&context, "delete", "remove_tag", || {
|
&mut self,
|
||||||
|
context: &opentelemetry::Context,
|
||||||
|
tag_name: &str,
|
||||||
|
path: &str,
|
||||||
|
) -> anyhow::Result<Option<()>> {
|
||||||
|
trace_db_call(&context, "delete", "remove_tag", |span| {
|
||||||
|
span.set_attributes(vec![
|
||||||
|
KeyValue::new("tag_name", tag_name.to_string()),
|
||||||
|
KeyValue::new("path", path.to_string()),
|
||||||
|
]);
|
||||||
|
|
||||||
tags::table
|
tags::table
|
||||||
.filter(tags::name.eq(tag_name))
|
.filter(tags::name.eq(tag_name))
|
||||||
.get_result::<Tag>(self.connection.borrow_mut())
|
.get_result::<Tag>(self.connection.borrow_mut())
|
||||||
@@ -347,8 +415,18 @@ impl TagDao for SqliteTagDao {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn tag_file(&mut self, context: &opentelemetry::Context, path: &str, tag_id: i32) -> anyhow::Result<TaggedPhoto> {
|
fn tag_file(
|
||||||
trace_db_call(&context, "insert", "tag_file", || {
|
&mut self,
|
||||||
|
context: &opentelemetry::Context,
|
||||||
|
path: &str,
|
||||||
|
tag_id: i32,
|
||||||
|
) -> anyhow::Result<TaggedPhoto> {
|
||||||
|
trace_db_call(&context, "insert", "tag_file", |span| {
|
||||||
|
span.set_attributes(vec![
|
||||||
|
KeyValue::new("path", path.to_string()),
|
||||||
|
KeyValue::new("tag_id", tag_id.to_string()),
|
||||||
|
]);
|
||||||
|
|
||||||
diesel::insert_into(tagged_photo::table)
|
diesel::insert_into(tagged_photo::table)
|
||||||
.values(InsertTaggedPhoto {
|
.values(InsertTaggedPhoto {
|
||||||
tag_id,
|
tag_id,
|
||||||
@@ -386,7 +464,7 @@ impl TagDao for SqliteTagDao {
|
|||||||
exclude_tag_ids: Vec<i32>,
|
exclude_tag_ids: Vec<i32>,
|
||||||
context: &opentelemetry::Context,
|
context: &opentelemetry::Context,
|
||||||
) -> anyhow::Result<Vec<FileWithTagCount>> {
|
) -> anyhow::Result<Vec<FileWithTagCount>> {
|
||||||
trace_db_call(&context, "query", "get_files_with_all_tags", || {
|
trace_db_call(&context, "query", "get_files_with_all_tags", |_| {
|
||||||
use diesel::dsl::*;
|
use diesel::dsl::*;
|
||||||
|
|
||||||
let exclude_subquery = tagged_photo::table
|
let exclude_subquery = tagged_photo::table
|
||||||
@@ -423,7 +501,7 @@ impl TagDao for SqliteTagDao {
|
|||||||
exclude_tag_ids: Vec<i32>,
|
exclude_tag_ids: Vec<i32>,
|
||||||
context: &opentelemetry::Context,
|
context: &opentelemetry::Context,
|
||||||
) -> anyhow::Result<Vec<FileWithTagCount>> {
|
) -> anyhow::Result<Vec<FileWithTagCount>> {
|
||||||
trace_db_call(&context, "query", "get_files_with_any_tags", || {
|
trace_db_call(&context, "query", "get_files_with_any_tags", |_| {
|
||||||
use diesel::dsl::*;
|
use diesel::dsl::*;
|
||||||
|
|
||||||
let tag_ids_str = tag_ids
|
let tag_ids_str = tag_ids
|
||||||
@@ -493,7 +571,11 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl TagDao for TestTagDao {
|
impl TagDao for TestTagDao {
|
||||||
fn get_all_tags(&mut self, context: &opentelemetry::Context, _option: Option<String>) -> anyhow::Result<Vec<(i64, Tag)>> {
|
fn get_all_tags(
|
||||||
|
&mut self,
|
||||||
|
context: &opentelemetry::Context,
|
||||||
|
_option: Option<String>,
|
||||||
|
) -> anyhow::Result<Vec<(i64, Tag)>> {
|
||||||
Ok(self
|
Ok(self
|
||||||
.tags
|
.tags
|
||||||
.borrow()
|
.borrow()
|
||||||
@@ -503,7 +585,11 @@ mod tests {
|
|||||||
.clone())
|
.clone())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_tags_for_path(&mut self, context: &opentelemetry::Context, path: &str) -> anyhow::Result<Vec<Tag>> {
|
fn get_tags_for_path(
|
||||||
|
&mut self,
|
||||||
|
context: &opentelemetry::Context,
|
||||||
|
path: &str,
|
||||||
|
) -> anyhow::Result<Vec<Tag>> {
|
||||||
info!("Getting test tags for: {:?}", path);
|
info!("Getting test tags for: {:?}", path);
|
||||||
warn!("Tags for path: {:?}", self.tagged_photos);
|
warn!("Tags for path: {:?}", self.tagged_photos);
|
||||||
|
|
||||||
@@ -515,7 +601,11 @@ mod tests {
|
|||||||
.clone())
|
.clone())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn create_tag(&mut self, context: &opentelemetry::Context, name: &str) -> anyhow::Result<Tag> {
|
fn create_tag(
|
||||||
|
&mut self,
|
||||||
|
context: &opentelemetry::Context,
|
||||||
|
name: &str,
|
||||||
|
) -> anyhow::Result<Tag> {
|
||||||
self.tag_count += 1;
|
self.tag_count += 1;
|
||||||
let tag_id = self.tag_count;
|
let tag_id = self.tag_count;
|
||||||
|
|
||||||
@@ -531,7 +621,12 @@ mod tests {
|
|||||||
Ok(tag)
|
Ok(tag)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn remove_tag(&mut self, context: &opentelemetry::Context, tag_name: &str, path: &str) -> anyhow::Result<Option<()>> {
|
fn remove_tag(
|
||||||
|
&mut self,
|
||||||
|
context: &opentelemetry::Context,
|
||||||
|
tag_name: &str,
|
||||||
|
path: &str,
|
||||||
|
) -> anyhow::Result<Option<()>> {
|
||||||
let mut clone = {
|
let mut clone = {
|
||||||
let photo_tags = &self.tagged_photos.borrow()[path];
|
let photo_tags = &self.tagged_photos.borrow()[path];
|
||||||
photo_tags.clone()
|
photo_tags.clone()
|
||||||
@@ -551,7 +646,12 @@ mod tests {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn tag_file(&mut self, context: &opentelemetry::Context, path: &str, tag_id: i32) -> anyhow::Result<TaggedPhoto> {
|
fn tag_file(
|
||||||
|
&mut self,
|
||||||
|
context: &opentelemetry::Context,
|
||||||
|
path: &str,
|
||||||
|
tag_id: i32,
|
||||||
|
) -> anyhow::Result<TaggedPhoto> {
|
||||||
debug!("Tagging file: {:?} with tag_id: {:?}", path, tag_id);
|
debug!("Tagging file: {:?} with tag_id: {:?}", path, tag_id);
|
||||||
|
|
||||||
if let Some(tag) = self.tags.borrow().iter().find(|t| t.id == tag_id) {
|
if let Some(tag) = self.tags.borrow().iter().find(|t| t.id == tag_id) {
|
||||||
|
|||||||
15
src/video.rs
15
src/video.rs
@@ -1,7 +1,10 @@
|
|||||||
use crate::is_video;
|
use crate::is_video;
|
||||||
|
use crate::otel::global_tracer;
|
||||||
use actix::prelude::*;
|
use actix::prelude::*;
|
||||||
use futures::TryFutureExt;
|
use futures::TryFutureExt;
|
||||||
use log::{debug, error, info, trace, warn};
|
use log::{debug, error, info, trace, warn};
|
||||||
|
use opentelemetry::trace::{Span, Tracer};
|
||||||
|
use opentelemetry::KeyValue;
|
||||||
use std::io::Result;
|
use std::io::Result;
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::process::{Child, Command, ExitStatus, Stdio};
|
use std::process::{Child, Command, ExitStatus, Stdio};
|
||||||
@@ -122,6 +125,9 @@ impl Handler<ScanDirectoryMessage> for VideoPlaylistManager {
|
|||||||
type Result = ResponseFuture<()>;
|
type Result = ResponseFuture<()>;
|
||||||
|
|
||||||
fn handle(&mut self, msg: ScanDirectoryMessage, _ctx: &mut Self::Context) -> Self::Result {
|
fn handle(&mut self, msg: ScanDirectoryMessage, _ctx: &mut Self::Context) -> Self::Result {
|
||||||
|
let tracer = global_tracer();
|
||||||
|
let mut span = tracer.start("videoplaylistmanager.scan_directory");
|
||||||
|
|
||||||
let start = std::time::Instant::now();
|
let start = std::time::Instant::now();
|
||||||
info!(
|
info!(
|
||||||
"Starting scan directory for video playlist generation: {}",
|
"Starting scan directory for video playlist generation: {}",
|
||||||
@@ -157,6 +163,11 @@ impl Handler<ScanDirectoryMessage> for VideoPlaylistManager {
|
|||||||
.expect("Failed to send generate playlist message")
|
.expect("Failed to send generate playlist message")
|
||||||
{
|
{
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
|
span.add_event(
|
||||||
|
"Playlist generated",
|
||||||
|
vec![KeyValue::new("video_path", path_as_str.to_string())],
|
||||||
|
);
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
"Successfully generated playlist for file: '{}'",
|
"Successfully generated playlist for file: '{}'",
|
||||||
path_as_str
|
path_as_str
|
||||||
@@ -168,6 +179,10 @@ impl Handler<ScanDirectoryMessage> for VideoPlaylistManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
span.add_event(
|
||||||
|
"Finished directory scan",
|
||||||
|
vec![KeyValue::new("directory", scan_dir_name.to_string())],
|
||||||
|
);
|
||||||
info!(
|
info!(
|
||||||
"Finished directory scan of '{}' in {:?}",
|
"Finished directory scan of '{}' in {:?}",
|
||||||
scan_dir_name,
|
scan_dir_name,
|
||||||
|
|||||||
Reference in New Issue
Block a user