Add Simple OpenTelemetry setup
This commit is contained in:
94
src/main.rs
94
src/main.rs
@@ -32,18 +32,20 @@ use diesel::sqlite::Sqlite;
|
||||
use notify::{Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
|
||||
use rayon::prelude::*;
|
||||
|
||||
use log::{debug, error, info, trace, warn};
|
||||
|
||||
use crate::auth::login;
|
||||
use crate::data::*;
|
||||
use crate::database::*;
|
||||
use crate::files::{
|
||||
is_image_or_video, is_valid_full_path, move_file, RealFileSystem, RefreshThumbnailsMessage,
|
||||
};
|
||||
use crate::otel::global_tracer;
|
||||
use crate::service::ServiceBuilder;
|
||||
use crate::state::AppState;
|
||||
use crate::tags::*;
|
||||
use crate::video::*;
|
||||
use log::{debug, error, info, trace, warn};
|
||||
use opentelemetry::trace::{Span, Status, TraceContextExt, Tracer};
|
||||
use opentelemetry::{global, KeyValue};
|
||||
|
||||
mod auth;
|
||||
mod data;
|
||||
@@ -54,6 +56,7 @@ mod state;
|
||||
mod tags;
|
||||
mod video;
|
||||
|
||||
mod otel;
|
||||
mod service;
|
||||
#[cfg(test)]
|
||||
mod testhelpers;
|
||||
@@ -113,6 +116,8 @@ async fn get_file_metadata(
|
||||
path: web::Query<ThumbnailRequest>,
|
||||
app_state: Data<AppState>,
|
||||
) -> impl Responder {
|
||||
let tracer = global_tracer();
|
||||
let mut span = tracer.start("get_file_metadata");
|
||||
match is_valid_full_path(&app_state.base_path, &path.path, false)
|
||||
.ok_or_else(|| ErrorKind::InvalidData.into())
|
||||
.and_then(File::open)
|
||||
@@ -120,10 +125,19 @@ async fn get_file_metadata(
|
||||
{
|
||||
Ok(metadata) => {
|
||||
let response: MetadataResponse = metadata.into();
|
||||
span.add_event(
|
||||
"Metadata fetched",
|
||||
vec![KeyValue::new("file", path.path.clone())],
|
||||
);
|
||||
span.set_status(Status::Ok);
|
||||
|
||||
HttpResponse::Ok().json(response)
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Error getting metadata for file '{}': {:?}", path.path, e);
|
||||
let message = format!("Error getting metadata for file '{}': {:?}", path.path, e);
|
||||
error!("{}", message);
|
||||
span.set_status(Status::error(message));
|
||||
|
||||
HttpResponse::InternalServerError().finish()
|
||||
}
|
||||
}
|
||||
@@ -135,6 +149,9 @@ async fn upload_image(
|
||||
mut payload: mp::Multipart,
|
||||
app_state: Data<AppState>,
|
||||
) -> impl Responder {
|
||||
let tracer = global_tracer();
|
||||
let mut span = tracer.start("upload_image");
|
||||
|
||||
let mut file_content: BytesMut = BytesMut::new();
|
||||
let mut file_name: Option<String> = None;
|
||||
let mut file_path: Option<String> = None;
|
||||
@@ -167,6 +184,10 @@ async fn upload_image(
|
||||
&full_path.to_str().unwrap().to_string(),
|
||||
true,
|
||||
) {
|
||||
let context = opentelemetry::Context::new().with_remote_span_context(span.span_context().clone());
|
||||
tracer.span_builder("file write")
|
||||
.start_with_context(&tracer, &context);
|
||||
|
||||
if !full_path.is_file() && is_image_or_video(&full_path) {
|
||||
let mut file = File::create(&full_path).unwrap();
|
||||
file.write_all(&file_content).unwrap();
|
||||
@@ -193,13 +214,16 @@ async fn upload_image(
|
||||
}
|
||||
} else {
|
||||
error!("Invalid path for upload: {:?}", full_path);
|
||||
span.set_status(Status::error("Invalid path for upload"));
|
||||
return HttpResponse::BadRequest().body("Path was not valid");
|
||||
}
|
||||
} else {
|
||||
span.set_status(Status::error("No file body read"));
|
||||
return HttpResponse::BadRequest().body("No file body read");
|
||||
}
|
||||
|
||||
app_state.stream_manager.do_send(RefreshThumbnailsMessage);
|
||||
span.set_status(Status::Ok);
|
||||
|
||||
HttpResponse::Ok().finish()
|
||||
}
|
||||
@@ -210,6 +234,9 @@ async fn generate_video(
|
||||
app_state: Data<AppState>,
|
||||
body: web::Json<ThumbnailRequest>,
|
||||
) -> impl Responder {
|
||||
let tracer = global_tracer();
|
||||
let mut span = tracer.start("generate_video");
|
||||
|
||||
let filename = PathBuf::from(&body.path);
|
||||
|
||||
if let Some(name) = filename.file_name() {
|
||||
@@ -217,17 +244,29 @@ async fn generate_video(
|
||||
let playlist = format!("{}/{}.m3u8", app_state.video_path, filename);
|
||||
if let Some(path) = is_valid_full_path(&app_state.base_path, &body.path, false) {
|
||||
if let Ok(child) = create_playlist(path.to_str().unwrap(), &playlist).await {
|
||||
app_state
|
||||
.stream_manager
|
||||
.do_send(ProcessMessage(playlist.clone(), child));
|
||||
span.add_event(
|
||||
"playlist_created".to_string(),
|
||||
vec![KeyValue::new("playlist-name", filename.to_string())],
|
||||
);
|
||||
|
||||
span.set_status(Status::Ok);
|
||||
app_state.stream_manager.do_send(ProcessMessage(
|
||||
playlist.clone(),
|
||||
child,
|
||||
// opentelemetry::Context::new().with_span(span),
|
||||
));
|
||||
}
|
||||
} else {
|
||||
span.set_status(Status::error(format!("invalid path {:?}", &body.path)));
|
||||
return HttpResponse::BadRequest().finish();
|
||||
}
|
||||
|
||||
HttpResponse::Ok().json(playlist)
|
||||
} else {
|
||||
error!("Unable to get file name: {:?}", filename);
|
||||
let message = format!("Unable to get file name: {:?}", filename);
|
||||
error!("{}", message);
|
||||
span.set_status(Status::error(message));
|
||||
|
||||
HttpResponse::BadRequest().finish()
|
||||
}
|
||||
}
|
||||
@@ -239,6 +278,9 @@ async fn stream_video(
|
||||
path: web::Query<ThumbnailRequest>,
|
||||
app_state: Data<AppState>,
|
||||
) -> impl Responder {
|
||||
let tracer = global::tracer("image-server");
|
||||
let mut span = tracer.start("stream_video");
|
||||
|
||||
let playlist = &path.path;
|
||||
debug!("Playlist: {}", playlist);
|
||||
|
||||
@@ -246,10 +288,14 @@ async fn stream_video(
|
||||
if !playlist.starts_with(&app_state.video_path)
|
||||
&& is_valid_full_path(&app_state.base_path, playlist, false).is_some()
|
||||
{
|
||||
span.set_status(Status::error(format!("playlist not valid {}", playlist)));
|
||||
|
||||
HttpResponse::BadRequest().finish()
|
||||
} else if let Ok(file) = NamedFile::open(playlist) {
|
||||
span.set_status(Status::Ok);
|
||||
file.into_response(&request)
|
||||
} else {
|
||||
span.set_status(Status::error(format!("playlist not found {}", playlist)));
|
||||
HttpResponse::NotFound().finish()
|
||||
}
|
||||
}
|
||||
@@ -261,6 +307,9 @@ async fn get_video_part(
|
||||
path: web::Path<ThumbnailRequest>,
|
||||
app_state: Data<AppState>,
|
||||
) -> impl Responder {
|
||||
let tracer = global::tracer("image-server");
|
||||
let mut span = tracer.start("get_video_part");
|
||||
|
||||
let part = &path.path;
|
||||
debug!("Video part: {}", part);
|
||||
|
||||
@@ -269,9 +318,11 @@ async fn get_video_part(
|
||||
file_part.push(part);
|
||||
// TODO: Do we need to guard against directory attacks here?
|
||||
if let Ok(file) = NamedFile::open(&file_part) {
|
||||
span.set_status(Status::Ok);
|
||||
file.into_response(&request)
|
||||
} else {
|
||||
error!("Video part not found: {:?}", file_part);
|
||||
span.set_status(Status::error(format!("Video part not found '{}'", file_part.to_str().unwrap())));
|
||||
HttpResponse::NotFound().finish()
|
||||
}
|
||||
}
|
||||
@@ -376,6 +427,9 @@ async fn delete_favorite(
|
||||
}
|
||||
|
||||
fn create_thumbnails() {
|
||||
let tracer = global_tracer();
|
||||
let span = tracer.start("creating thumbnails");
|
||||
|
||||
let thumbs = &dotenv::var("THUMBNAILS").expect("THUMBNAILS not defined");
|
||||
let thumbnail_directory: &Path = Path::new(thumbs);
|
||||
|
||||
@@ -398,8 +452,18 @@ fn create_thumbnails() {
|
||||
)
|
||||
.expect("Error creating directory");
|
||||
|
||||
let mut video_span = tracer.start_with_context(
|
||||
"generate_video_thumbnail",
|
||||
&opentelemetry::Context::new().with_remote_span_context(span.span_context().clone()),
|
||||
);
|
||||
video_span.set_attributes(vec![
|
||||
KeyValue::new("type", "video"),
|
||||
KeyValue::new("file-name", thumb_path.display().to_string()),
|
||||
]);
|
||||
|
||||
debug!("Generating video thumbnail: {:?}", thumb_path);
|
||||
generate_video_thumbnail(entry.path(), &thumb_path);
|
||||
video_span.end();
|
||||
false
|
||||
} else {
|
||||
is_image(entry)
|
||||
@@ -474,15 +538,19 @@ fn main() -> std::io::Result<()> {
|
||||
if let Err(err) = dotenv::dotenv() {
|
||||
println!("Error parsing .env {:?}", err);
|
||||
}
|
||||
env_logger::init();
|
||||
// env_logger::init();
|
||||
|
||||
run_migrations(&mut connect()).expect("Failed to run migrations");
|
||||
|
||||
create_thumbnails();
|
||||
watch_files();
|
||||
|
||||
let system = actix::System::new();
|
||||
system.block_on(async {
|
||||
|
||||
otel::init_logs();
|
||||
otel::init_tracing();
|
||||
create_thumbnails();
|
||||
|
||||
let app_data = Data::new(AppState::default());
|
||||
|
||||
let labels = HashMap::new();
|
||||
@@ -501,11 +569,9 @@ fn main() -> std::io::Result<()> {
|
||||
.unwrap();
|
||||
|
||||
let app_state = app_data.clone();
|
||||
app_state
|
||||
.playlist_manager
|
||||
.do_send(ScanDirectoryMessage {
|
||||
directory: app_state.base_path.clone(),
|
||||
});
|
||||
app_state.playlist_manager.do_send(ScanDirectoryMessage {
|
||||
directory: app_state.base_path.clone(),
|
||||
});
|
||||
|
||||
HttpServer::new(move || {
|
||||
let user_dao = SqliteUserDao::new();
|
||||
|
||||
Reference in New Issue
Block a user