Add polling-based file watching

Remove notify and update otel creates
This commit is contained in:
Cameron
2025-12-22 22:54:19 -05:00
parent df94010d21
commit 47d3ad7222
5 changed files with 267 additions and 272 deletions

View File

@@ -14,9 +14,9 @@ pub mod otel;
pub mod service;
pub mod state;
pub mod tags;
pub mod video;
#[cfg(test)]
pub mod testhelpers;
pub mod video;
// Re-export commonly used types
pub use data::{Claims, ThumbnailRequest};

View File

@@ -9,8 +9,8 @@ use futures::stream::StreamExt;
use lazy_static::lazy_static;
use prometheus::{self, IntGauge};
use std::error::Error;
use std::sync::Mutex;
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};
use std::{collections::HashMap, io::prelude::*};
use std::{env, fs::File};
use std::{
@@ -26,10 +26,8 @@ use actix_web::{
App, HttpRequest, HttpResponse, HttpServer, Responder, delete, get, middleware, post, put,
web::{self, BufMut, BytesMut},
};
use anyhow::Context;
use chrono::Utc;
use diesel::sqlite::Sqlite;
use notify::{Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use rayon::prelude::*;
use crate::auth::login;
@@ -824,62 +822,214 @@ fn run_migrations(
fn watch_files() {
std::thread::spawn(|| {
let (wtx, wrx) = channel();
let mut watcher = RecommendedWatcher::new(wtx, Config::default()).unwrap();
let base_str = dotenv::var("BASE_PATH").unwrap();
let base_path = Path::new(&base_str);
let base_path = PathBuf::from(&base_str);
watcher
.watch(base_path, RecursiveMode::Recursive)
.context(format!("Unable to watch BASE_PATH: '{}'", base_str))
.unwrap();
// Get polling intervals from environment variables
// Quick scan: Check recently modified files (default: 60 seconds)
let quick_interval_secs = dotenv::var("WATCH_QUICK_INTERVAL_SECONDS")
.ok()
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(60);
// Full scan: Check all files regardless of modification time (default: 3600 seconds = 1 hour)
let full_interval_secs = dotenv::var("WATCH_FULL_INTERVAL_SECONDS")
.ok()
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(3600);
info!("Starting optimized file watcher");
info!(" Quick scan interval: {} seconds", quick_interval_secs);
info!(" Full scan interval: {} seconds", full_interval_secs);
info!(" Watching directory: {}", base_str);
// Create EXIF DAO for tracking processed files
let exif_dao = Arc::new(Mutex::new(
Box::new(SqliteExifDao::new()) as Box<dyn ExifDao>
));
let mut last_quick_scan = SystemTime::now();
let mut last_full_scan = SystemTime::now();
let mut scan_count = 0u64;
loop {
let ev = wrx.recv();
if let Ok(Ok(event)) = ev {
match event.kind {
EventKind::Create(create_kind) => {
info!(
"Creating thumbnails {:?} create event kind: {:?}",
event.paths, create_kind
);
create_thumbnails();
}
EventKind::Modify(kind) => {
debug!("All modified paths: {:?}", event.paths);
debug!("Modify kind: {:?}", kind);
std::thread::sleep(Duration::from_secs(quick_interval_secs));
if let Some(orig) = event.paths.first() {
let image_base_path = PathBuf::from(env::var("BASE_PATH").unwrap());
let image_relative = orig.strip_prefix(&image_base_path).unwrap();
if let Ok(old_thumbnail) =
env::var("THUMBNAILS").map(PathBuf::from).map(|mut base| {
base.push(image_relative);
base
})
{
if let Err(e) = std::fs::remove_file(&old_thumbnail) {
error!(
"Error removing thumbnail: {}\n{}",
old_thumbnail.display(),
e
);
} else {
info!("Deleted moved thumbnail: {}", old_thumbnail.display());
let now = SystemTime::now();
let since_last_full = now
.duration_since(last_full_scan)
.unwrap_or(Duration::from_secs(0));
create_thumbnails();
}
}
}
}
let is_full_scan = since_last_full.as_secs() >= full_interval_secs;
EventKind::Remove(_) => {
update_media_counts(&PathBuf::from(env::var("BASE_PATH").unwrap()))
}
if is_full_scan {
info!("Running full scan (scan #{})", scan_count);
process_new_files(&base_path, Arc::clone(&exif_dao), None);
last_full_scan = now;
} else {
debug!(
"Running quick scan (checking files modified in last {} seconds)",
quick_interval_secs + 10
);
// Check files modified since last quick scan, plus 10 second buffer
let check_since = last_quick_scan
.checked_sub(Duration::from_secs(10))
.unwrap_or(last_quick_scan);
process_new_files(&base_path, Arc::clone(&exif_dao), Some(check_since));
}
_ => {}
}
};
last_quick_scan = now;
scan_count += 1;
// Update media counts
update_media_counts(&base_path);
}
});
}
fn process_new_files(
base_path: &Path,
exif_dao: Arc<Mutex<Box<dyn ExifDao>>>,
modified_since: Option<SystemTime>,
) {
let thumbs = dotenv::var("THUMBNAILS").expect("THUMBNAILS not defined");
let thumbnail_directory = Path::new(&thumbs);
// Collect all image and video files, optionally filtered by modification time
let files: Vec<(PathBuf, String)> = WalkDir::new(base_path)
.into_iter()
.filter_map(|entry| entry.ok())
.filter(|entry| entry.file_type().is_file())
.filter(|entry| {
// Filter by modification time if specified
if let Some(since) = modified_since {
if let Ok(metadata) = entry.metadata() {
if let Ok(modified) = metadata.modified() {
return modified >= since;
}
}
// If we can't get metadata, include the file to be safe
return true;
}
true
})
.filter(|entry| is_image(entry) || is_video(entry))
.filter_map(|entry| {
let file_path = entry.path().to_path_buf();
let relative_path = file_path
.strip_prefix(base_path)
.ok()?
.to_str()?
.to_string();
Some((file_path, relative_path))
})
.collect();
if files.is_empty() {
debug!("No files to process");
return;
}
debug!("Found {} files to check", files.len());
// Batch query: Get all EXIF data for these files in one query
let file_paths: Vec<String> = files.iter().map(|(_, rel_path)| rel_path.clone()).collect();
let existing_exif_paths: HashMap<String, bool> = {
let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
match dao.get_exif_batch(&file_paths) {
Ok(exif_records) => exif_records
.into_iter()
.map(|record| (record.file_path, true))
.collect(),
Err(e) => {
error!("Error batch querying EXIF data: {:?}", e);
HashMap::new()
}
}
};
let mut new_files_found = false;
let mut files_needing_exif = Vec::new();
// Check each file for missing thumbnail or EXIF data
for (file_path, relative_path) in files {
// Check if thumbnail exists
let thumb_path = thumbnail_directory.join(&relative_path);
let needs_thumbnail = !thumb_path.exists();
// Check if EXIF data exists (for supported files)
let needs_exif = if exif::supports_exif(&file_path) {
!existing_exif_paths.contains_key(&relative_path)
} else {
false
};
if needs_thumbnail || needs_exif {
new_files_found = true;
if needs_thumbnail {
info!("New file detected (missing thumbnail): {}", relative_path);
}
if needs_exif {
files_needing_exif.push((file_path, relative_path));
}
}
}
// Process EXIF data for files that need it
if !files_needing_exif.is_empty() {
info!(
"Processing EXIF data for {} files",
files_needing_exif.len()
);
for (file_path, relative_path) in files_needing_exif {
match exif::extract_exif_from_path(&file_path) {
Ok(exif_data) => {
let timestamp = Utc::now().timestamp();
let insert_exif = InsertImageExif {
file_path: relative_path.clone(),
camera_make: exif_data.camera_make,
camera_model: exif_data.camera_model,
lens_model: exif_data.lens_model,
width: exif_data.width,
height: exif_data.height,
orientation: exif_data.orientation,
gps_latitude: exif_data.gps_latitude,
gps_longitude: exif_data.gps_longitude,
gps_altitude: exif_data.gps_altitude,
focal_length: exif_data.focal_length,
aperture: exif_data.aperture,
shutter_speed: exif_data.shutter_speed,
iso: exif_data.iso,
date_taken: exif_data.date_taken,
created_time: timestamp,
last_modified: timestamp,
};
let mut dao = exif_dao.lock().expect("Unable to lock ExifDao");
if let Err(e) = dao.store_exif(insert_exif) {
error!("Failed to store EXIF data for {}: {:?}", relative_path, e);
} else {
debug!("EXIF data stored for {}", relative_path);
}
}
Err(e) => {
debug!(
"No EXIF data or error extracting from {}: {:?}",
file_path.display(),
e
);
}
}
}
}
// Generate thumbnails for all files that need them
if new_files_found {
info!("Processing thumbnails for new files...");
create_thumbnails();
}
}