//! Backfill `image_exif.content_hash` + `size_bytes` for rows that were
//! ingested before hash computation was wired into the watcher.
//!
//! The watcher computes hashes for new files as they're ingested, so this
//! binary is a one-shot tool for the historical backlog. Safe to re-run;
//! only rows with NULL content_hash are processed.

use std::path::Path;
use std::sync::{Arc, Mutex};
use std::time::Instant;

use clap::Parser;
use log::{error, warn};
use rayon::prelude::*;

use image_api::bin_progress;
use image_api::content_hash;
use image_api::database::{ExifDao, SqliteExifDao, connect};
use image_api::libraries::{self, Library};

#[derive(Parser, Debug)]
#[command(name = "backfill_hashes")]
#[command(about = "Compute content_hash for image_exif rows missing one")]
struct Args {
    /// Max rows to hash per batch. The process loops until no rows remain.
    #[arg(long, default_value_t = 500)]
    batch_size: i64,

    /// Rayon parallelism override. 0 uses the default thread pool size.
    #[arg(long, default_value_t = 0)]
    parallelism: usize,

    /// Dry run: log what would be hashed without writing to the DB.
    #[arg(long)]
    dry_run: bool,
}

fn main() -> anyhow::Result<()> {
    env_logger::init();
    dotenv::dotenv().ok();
    let args = Args::parse();

    if args.parallelism > 0 {
        rayon::ThreadPoolBuilder::new()
            .num_threads(args.parallelism)
            .build_global()
            .expect("Unable to configure rayon thread pool");
    }

    // Resolve libraries (patch placeholder if still unset) so we can map
    // library_id back to a root_path on disk.
    let base_path = dotenv::var("BASE_PATH").ok();
    let mut seed_conn = connect();
    if let Some(base) = base_path.as_deref() {
        libraries::seed_or_patch_from_env(&mut seed_conn, base);
    }
    let libs = libraries::load_all(&mut seed_conn);
    drop(seed_conn);

    if libs.is_empty() {
        anyhow::bail!("No libraries configured; cannot backfill hashes");
    }
    let libs_by_id: std::collections::HashMap<i32, Library> =
        libs.into_iter().map(|lib| (lib.id, lib)).collect();
    println!(
        "Configured libraries: {}",
        libs_by_id
            .values()
            .map(|l| format!("{} -> {}", l.name, l.root_path))
            .collect::<Vec<_>>()
            .join(", ")
    );

    let dao: Arc<Mutex<Box<dyn ExifDao>>> =
        Arc::new(Mutex::new(Box::new(SqliteExifDao::new())));
    let ctx = opentelemetry::Context::new();

    let mut total_hashed = 0u64;
    let mut total_missing = 0u64;
    let mut total_errors = 0u64;
    let start = Instant::now();
    let pb = bin_progress::spinner("hashing");

    loop {
        let rows = {
            let mut guard = dao.lock().expect("Unable to lock ExifDao");
            guard
                .get_rows_missing_hash(&ctx, args.batch_size)
                .map_err(|e| anyhow::anyhow!("DB error: {:?}", e))?
        };
        if rows.is_empty() {
            break;
        }
        let batch_size = rows.len();
        pb.set_message(format!(
            "batch of {} (hashed={} missing={} errors={})",
            batch_size, total_hashed, total_missing, total_errors
        ));

        // Compute hashes in parallel (I/O-bound; rayon helps on local disks,
        // throttled by network on SMB mounts — use --parallelism to tune).
        let results: Vec<(i32, String, Option<_>)> = rows
            .into_par_iter()
            .map(|(library_id, rel_path)| {
                let abs = libs_by_id
                    .get(&library_id)
                    .map(|lib| Path::new(&lib.root_path).join(&rel_path));
                match abs {
                    Some(abs_path) if abs_path.exists() => {
                        match content_hash::compute(&abs_path) {
                            Ok(id) => (library_id, rel_path, Some(id)),
                            Err(e) => {
                                error!("hash error for {}: {:?}", abs_path.display(), e);
                                (library_id, rel_path, None)
                            }
                        }
                    }
                    Some(_) => (library_id, rel_path, None), // file missing on disk
                    None => {
                        warn!("Row refers to unknown library_id {}", library_id);
                        (library_id, rel_path, None)
                    }
                }
            })
            .collect();

        // Persist sequentially — SQLite writes serialize anyway.
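        // Design note: all writes funnel through the single Mutex-guarded DAO
        // handle rather than per-thread connections. SQLite allows only one
        // writer at a time, so extra write connections would just contend on
        // the database lock; one guarded handle is the simpler equivalent.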
        if !args.dry_run {
            let mut guard = dao.lock().expect("Unable to lock ExifDao");
            for (library_id, rel_path, ident) in &results {
                match ident {
                    Some(id) => {
                        match guard.backfill_content_hash(
                            &ctx,
                            *library_id,
                            rel_path,
                            &id.content_hash,
                            id.size_bytes,
                        ) {
                            Ok(_) => {
                                total_hashed += 1;
                                pb.inc(1);
                            }
                            Err(e) => {
                                pb.println(format!("persist error for {}: {:?}", rel_path, e));
                                total_errors += 1;
                            }
                        }
                    }
                    None => {
                        total_missing += 1;
                    }
                }
            }
        } else {
            for (_, rel_path, ident) in &results {
                match ident {
                    Some(id) => {
                        pb.println(format!(
                            "[dry-run] {} -> {} ({} bytes)",
                            rel_path, id.content_hash, id.size_bytes
                        ));
                        total_hashed += 1;
                        pb.inc(1);
                    }
                    None => {
                        total_missing += 1;
                    }
                }
            }
            pb.println(format!(
                "[dry-run] processed one batch of {}. Stopping — a real run would continue \
                 until no NULL content_hash rows remain.",
                results.len()
            ));
            break;
        }
    }

    pb.finish_and_clear();
    println!(
        "Done. hashed={}, skipped (missing on disk)={}, errors={}, elapsed={:.1}s",
        total_hashed, total_missing, total_errors, start.elapsed().as_secs_f64()
    );
    Ok(())
}
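
// A minimal sanity check for the CLI surface, assuming this bin target's unit
// tests run under the usual `cargo test`. `Parser::parse_from` is clap's real
// API; the flag names follow clap's default kebab-case renaming of the struct
// fields above. This is a sketch of how the defaults could be pinned down, not
// part of the original tool.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn args_parse_defaults_and_overrides() {
        // First element is the program name, which clap skips.
        let args = Args::parse_from(["backfill_hashes"]);
        assert_eq!(args.batch_size, 500);
        assert_eq!(args.parallelism, 0);
        assert!(!args.dry_run);

        let args = Args::parse_from([
            "backfill_hashes",
            "--batch-size",
            "100",
            "--parallelism",
            "4",
            "--dry-run",
        ]);
        assert_eq!(args.batch_size, 100);
        assert_eq!(args.parallelism, 4);
        assert!(args.dry_run);
    }
}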