006-bin-cleanup-and-progress #57

Merged
cameron merged 2 commits from 006-bin-cleanup-and-progress into master 2026-04-27 20:28:33 +00:00
13 changed files with 362 additions and 350 deletions

View File

@@ -69,9 +69,6 @@ cargo fix
```bash
# Two-phase cleanup: resolve missing files and validate file types
cargo run --bin cleanup_files -- --base-path /path/to/media --database-url ./database.db
# Batch extract EXIF for existing files
cargo run --bin migrate_exif
```
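The cleanup phases honour a dry-run switch (`config.dry_run` in the diffs below); assuming it is exposed as a `--dry-run` flag (hypothetical, not shown in this hunk), a preview pass would be:

```bash
# Hypothetical: report planned fixes without touching disk or database
cargo run --bin cleanup_files -- --base-path /path/to/media --database-url ./database.db --dry-run
```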
## Architecture Overview

Cargo.lock generated
View File

@@ -808,6 +808,19 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75"
[[package]]
name = "console"
version = "0.15.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "054ccb5b10f9f2cbf51eb355ca1d05c2d279ce1804688d0db74b4733a5aeafd8"
dependencies = [
"encode_unicode",
"libc",
"once_cell",
"unicode-width",
"windows-sys 0.59.0",
]
[[package]]
name = "constant_time_eq"
version = "0.4.2"
@@ -1150,6 +1163,12 @@ version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719"
[[package]]
name = "encode_unicode"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34aa73646ffb006b8f5147f3dc182bd4bcb190227ce861fc4a4844bf8e3cb2c0"
[[package]]
name = "encoding_rs"
version = "0.8.35"
@@ -1920,6 +1939,7 @@ dependencies = [
"futures",
"ical",
"image",
"indicatif",
"infer",
"jsonwebtoken",
"kamadak-exif",
@@ -1980,6 +2000,19 @@ dependencies = [
"hashbrown 0.15.5",
]
[[package]]
name = "indicatif"
version = "0.17.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "183b3088984b400f4cfac3620d5e076c84da5364016b4f49473de574b2586235"
dependencies = [
"console",
"number_prefix",
"portable-atomic",
"unicode-width",
"web-time",
]
[[package]]
name = "infer"
version = "0.16.0"
@@ -2451,6 +2484,12 @@ dependencies = [
"autocfg",
]
[[package]]
name = "number_prefix"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3"
[[package]]
name = "object"
version = "0.36.7"
@@ -4333,6 +4372,16 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "web-time"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb"
dependencies = [
"js-sys",
"wasm-bindgen",
]
[[package]]
name = "weezl"
version = "0.1.12"

View File

@@ -60,3 +60,4 @@ scraper = "0.20"
base64 = "0.22"
blake3 = "1.5"
async-trait = "0.1"
indicatif = "0.17"

View File

@@ -10,8 +10,10 @@ use std::sync::{Arc, Mutex};
use std::time::Instant;
use clap::Parser;
use log::{error, warn};
use rayon::prelude::*;
use image_api::bin_progress;
use image_api::content_hash;
use image_api::database::{ExifDao, SqliteExifDao, connect};
use image_api::libraries::{self, Library};
@@ -76,6 +78,8 @@ fn main() -> anyhow::Result<()> {
let mut total_errors = 0u64;
let start = Instant::now();
let pb = bin_progress::spinner("hashing");
loop {
let rows = {
let mut guard = dao.lock().expect("Unable to lock ExifDao");
@@ -86,7 +90,11 @@ fn main() -> anyhow::Result<()> {
if rows.is_empty() {
break;
}
println!("Processing batch of {} rows", rows.len());
let batch_size = rows.len();
pb.set_message(format!(
"batch of {} (hashed={} missing={} errors={})",
batch_size, total_hashed, total_missing, total_errors
));
// Compute hashes in parallel (I/O-bound; rayon helps on local disks,
// throttled by network on SMB mounts — use --parallelism to tune).
@@ -100,13 +108,13 @@ fn main() -> anyhow::Result<()> {
Some(abs_path) if abs_path.exists() => match content_hash::compute(&abs_path) {
Ok(id) => (library_id, rel_path, Some(id)),
Err(e) => {
eprintln!("hash error for {}: {:?}", abs_path.display(), e);
error!("hash error for {}: {:?}", abs_path.display(), e);
(library_id, rel_path, None)
}
},
Some(_) => (library_id, rel_path, None), // file missing on disk
None => {
eprintln!("Row refers to unknown library_id {}", library_id);
warn!("Row refers to unknown library_id {}", library_id);
(library_id, rel_path, None)
}
}
@@ -126,9 +134,12 @@ fn main() -> anyhow::Result<()> {
&id.content_hash,
id.size_bytes,
) {
Ok(_) => total_hashed += 1,
Ok(_) => {
total_hashed += 1;
pb.inc(1);
}
Err(e) => {
eprintln!("persist error for {}: {:?}", rel_path, e);
pb.println(format!("persist error for {}: {:?}", rel_path, e));
total_errors += 1;
}
}
@@ -142,34 +153,28 @@ fn main() -> anyhow::Result<()> {
for (_, rel_path, ident) in &results {
match ident {
Some(id) => {
println!(
pb.println(format!(
"[dry-run] {} -> {} ({} bytes)",
rel_path, id.content_hash, id.size_bytes
);
));
total_hashed += 1;
pb.inc(1);
}
None => {
total_missing += 1;
}
}
}
println!(
pb.println(format!(
"[dry-run] processed one batch of {}. Stopping — a real run would continue \
until no NULL content_hash rows remain.",
results.len()
);
));
break;
}
let elapsed = start.elapsed().as_secs_f64().max(0.001);
let rate = total_hashed as f64 / elapsed;
println!(
" hashed={} missing={} errors={} ({:.1} files/sec)",
total_hashed, total_missing, total_errors, rate
);
}
println!();
pb.finish_and_clear();
println!(
"Done. hashed={}, skipped (missing on disk)={}, errors={}, elapsed={:.1}s",
total_hashed,

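The comments above name a `--parallelism` flag and the tail of the diff shows a dry-run mode; invocation sketches under those assumptions (exact flag spellings are not visible in these hunks):

```bash
# Hash one batch and stop, printing what would be written
cargo run --release --bin hash_files -- --dry-run

# Real run, throttled for an SMB mount where the network is the bottleneck anyway
cargo run --release --bin hash_files -- --parallelism 4
```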
View File

@@ -2,10 +2,10 @@ use anyhow::{Context, Result};
use chrono::Utc;
use clap::Parser;
use image_api::ai::ollama::OllamaClient;
use image_api::bin_progress;
use image_api::database::calendar_dao::{InsertCalendarEvent, SqliteCalendarEventDao};
use image_api::parsers::ical_parser::parse_ics_file;
use log::{error, info};
use std::sync::{Arc, Mutex};
// Import the trait to use its methods
use image_api::database::CalendarEventDao;
@@ -64,9 +64,11 @@ async fn main() -> Result<()> {
None
};
let inserted_count = Arc::new(Mutex::new(0));
let skipped_count = Arc::new(Mutex::new(0));
let error_count = Arc::new(Mutex::new(0));
let mut inserted_count = 0usize;
let mut skipped_count = 0usize;
let mut error_count = 0usize;
let pb = bin_progress::determinate(events.len() as u64, "importing");
// Process events in batches
// Can't use rayon with async, so process sequentially
@@ -82,7 +84,8 @@ async fn main() -> Result<()> {
)
&& exists
{
*skipped_count.lock().unwrap() += 1;
skipped_count += 1;
pb.inc(1);
continue;
}
@@ -101,10 +104,7 @@ async fn main() -> Result<()> {
}) {
Ok(emb) => Some(emb),
Err(e) => {
error!(
"Failed to generate embedding for event '{}': {}",
event.summary, e
);
pb.println(format!("embedding failed for '{}': {}", event.summary, e));
None
}
}
@@ -133,28 +133,26 @@ async fn main() -> Result<()> {
};
match dao_instance.store_event(&context, insert_event) {
Ok(_) => {
*inserted_count.lock().unwrap() += 1;
if *inserted_count.lock().unwrap() % 100 == 0 {
info!("Imported {} events...", *inserted_count.lock().unwrap());
}
}
Ok(_) => inserted_count += 1,
Err(e) => {
error!("Failed to store event '{}': {:?}", event.summary, e);
*error_count.lock().unwrap() += 1;
pb.println(format!("store failed for '{}': {:?}", event.summary, e));
error_count += 1;
}
}
pb.set_message(format!(
"inserted={} skipped={} errors={}",
inserted_count, skipped_count, error_count
));
pb.inc(1);
}
let final_inserted = *inserted_count.lock().unwrap();
let final_skipped = *skipped_count.lock().unwrap();
let final_errors = *error_count.lock().unwrap();
pb.finish_and_clear();
info!("\n=== Import Summary ===");
info!("=== Import Summary ===");
info!("Total events found: {}", events.len());
info!("Successfully inserted: {}", final_inserted);
info!("Skipped (already exist): {}", final_skipped);
info!("Errors: {}", final_errors);
info!("Successfully inserted: {}", inserted_count);
info!("Skipped (already exist): {}", skipped_count);
info!("Errors: {}", error_count);
if args.generate_embeddings {
info!("Embeddings were generated for semantic search");
@@ -162,5 +160,12 @@ async fn main() -> Result<()> {
info!("No embeddings generated (use --generate-embeddings to enable semantic search)");
}
if error_count > 0 {
error!(
"Completed with {} errors — review log output above",
error_count
);
}
Ok(())
}
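Dropping the `Arc<Mutex<_>>` counters is sound here because the loop is sequential (as the comment notes, rayon can't be mixed with async), so plain locals suffice; a reduced illustration with made-up names:

```rust
// Sequential async loop: plain local counters, no shared-state wrappers needed.
async fn import(events: Vec<String>) -> usize {
    let mut inserted = 0usize;
    for _event in events {
        // ... per-event awaits (dedup check, embedding, store) happen here ...
        inserted += 1;
    }
    inserted
}
```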

View File

@@ -1,6 +1,7 @@
use anyhow::{Context, Result};
use chrono::Utc;
use clap::Parser;
use image_api::bin_progress;
use image_api::database::location_dao::{InsertLocationRecord, SqliteLocationHistoryDao};
use image_api::parsers::location_json_parser::parse_location_json;
use log::{error, info};
@@ -38,23 +39,20 @@ async fn main() -> Result<()> {
let context = opentelemetry::Context::current();
let mut inserted_count = 0;
let mut skipped_count = 0;
let mut error_count = 0;
let mut inserted_count = 0usize;
let mut skipped_count = 0usize;
let mut error_count = 0usize;
let mut dao_instance = SqliteLocationHistoryDao::new();
let created_at = Utc::now().timestamp();
// Process in batches using batch insert for massive speedup
for (batch_idx, chunk) in locations.chunks(args.batch_size).enumerate() {
info!(
"Processing batch {} ({} records)...",
batch_idx + 1,
chunk.len()
);
let pb = bin_progress::determinate(locations.len() as u64, "importing");
// Process in batches using batch insert for massive speedup
for chunk in locations.chunks(args.batch_size) {
// Convert to InsertLocationRecord
let mut batch_inserts = Vec::with_capacity(chunk.len());
let mut chunk_skipped = 0usize;
for location in chunk {
// Skip existing check if requested (makes import much slower)
@@ -68,6 +66,7 @@ async fn main() -> Result<()> {
&& exists
{
skipped_count += 1;
chunk_skipped += 1;
continue;
}
@@ -89,26 +88,35 @@ async fn main() -> Result<()> {
// Batch insert entire chunk in single transaction
if !batch_inserts.is_empty() {
match dao_instance.store_locations_batch(&context, batch_inserts) {
Ok(count) => {
inserted_count += count;
info!(
"Imported {} locations (total: {})...",
count, inserted_count
);
}
Ok(count) => inserted_count += count,
Err(e) => {
error!("Failed to store batch: {:?}", e);
error_count += chunk.len();
}
pb.println(format!("batch insert failed: {:?}", e));
error_count += chunk.len() - chunk_skipped;
}
}
}
info!("\n=== Import Summary ===");
pb.set_message(format!(
"inserted={} skipped={} errors={}",
inserted_count, skipped_count, error_count
));
pb.inc(chunk.len() as u64);
}
pb.finish_and_clear();
info!("=== Import Summary ===");
info!("Total locations found: {}", locations.len());
info!("Successfully inserted: {}", inserted_count);
info!("Skipped (already exist): {}", skipped_count);
info!("Errors: {}", error_count);
if error_count > 0 {
error!(
"Completed with {} errors — review log output above",
error_count
);
}
Ok(())
}
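The corrected accounting above (`chunk.len() - chunk_skipped`) only charges rows that were actually attempted. Setting the skip pass aside, the chunked-transaction core looks roughly like this, with `store_batch` as a hypothetical stand-in for the DAO call:

```rust
use indicatif::ProgressBar;

/// One transaction per chunk; the bar advances per chunk, not per row.
fn import_chunked<T>(
    items: &[T],
    batch_size: usize,
    pb: &ProgressBar,
    mut store_batch: impl FnMut(&[T]) -> anyhow::Result<usize>, // hypothetical DAO shim
) -> (usize, usize) {
    let (mut inserted, mut errors) = (0usize, 0usize);
    for chunk in items.chunks(batch_size) {
        match store_batch(chunk) {
            Ok(n) => inserted += n,
            Err(e) => {
                // Route output through the bar so the redraw doesn't eat it.
                pb.println(format!("batch insert failed: {:?}", e));
                errors += chunk.len();
            }
        }
        pb.inc(chunk.len() as u64);
    }
    (inserted, errors)
}
```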

View File

@@ -2,9 +2,10 @@ use anyhow::{Context, Result};
use chrono::Utc;
use clap::Parser;
use image_api::ai::ollama::OllamaClient;
use image_api::bin_progress;
use image_api::database::search_dao::{InsertSearchRecord, SqliteSearchHistoryDao};
use image_api::parsers::search_html_parser::parse_search_html;
use log::{error, info, warn};
use log::{error, info};
// Import the trait to use its methods
use image_api::database::SearchHistoryDao;
@@ -49,24 +50,22 @@ async fn main() -> Result<()> {
let ollama = OllamaClient::new(primary_url, fallback_url, primary_model, fallback_model);
let context = opentelemetry::Context::current();
let mut inserted_count = 0;
let mut skipped_count = 0;
let mut error_count = 0;
let mut inserted_count = 0usize;
let mut skipped_count = 0usize;
let mut error_count = 0usize;
let mut dao_instance = SqliteSearchHistoryDao::new();
let created_at = Utc::now().timestamp();
let pb = bin_progress::determinate(searches.len() as u64, "importing");
let total_batches = searches.len().div_ceil(args.batch_size);
// Process searches in batches (embeddings are REQUIRED for searches)
for (batch_idx, chunk) in searches.chunks(args.batch_size).enumerate() {
info!(
"Processing batch {} ({} searches)...",
batch_idx + 1,
chunk.len()
);
// Generate embeddings for this batch
let queries: Vec<String> = chunk.iter().map(|s| s.query.clone()).collect();
let pb_for_warn = pb.clone();
let embeddings_result = tokio::task::spawn({
let ollama_client = ollama.clone();
async move {
@@ -76,7 +75,7 @@ async fn main() -> Result<()> {
match ollama_client.generate_embedding(query).await {
Ok(emb) => embeddings.push(Some(emb)),
Err(e) => {
warn!("Failed to generate embedding for query '{}': {}", query, e);
pb_for_warn.println(format!("embedding failed for '{}': {}", query, e));
embeddings.push(None);
}
}
@@ -112,10 +111,7 @@ async fn main() -> Result<()> {
source_file: Some(args.path.clone()),
});
} else {
error!(
"Skipping search '{}' due to missing embedding",
search.query
);
pb.println(format!("skipping '{}' — missing embedding", search.query));
error_count += 1;
}
}
@@ -123,30 +119,41 @@ async fn main() -> Result<()> {
// Batch insert entire chunk in single transaction
if !batch_inserts.is_empty() {
match dao_instance.store_searches_batch(&context, batch_inserts) {
Ok(count) => {
inserted_count += count;
info!("Imported {} searches (total: {})...", count, inserted_count);
}
Ok(count) => inserted_count += count,
Err(e) => {
error!("Failed to store batch: {:?}", e);
pb.println(format!("batch insert failed: {:?}", e));
error_count += chunk.len();
}
}
}
pb.set_message(format!(
"inserted={} skipped={} errors={}",
inserted_count, skipped_count, error_count
));
pb.inc(chunk.len() as u64);
// Rate limiting between batches
if batch_idx < searches.len() / args.batch_size {
info!("Waiting 500ms before next batch...");
if batch_idx + 1 < total_batches {
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
}
}
info!("\n=== Import Summary ===");
pb.finish_and_clear();
info!("=== Import Summary ===");
info!("Total searches found: {}", searches.len());
info!("Successfully inserted: {}", inserted_count);
info!("Skipped (already exist): {}", skipped_count);
info!("Errors: {}", error_count);
info!("All imported searches have embeddings for semantic search");
if error_count > 0 {
error!(
"Completed with {} errors — review log output above",
error_count
);
}
Ok(())
}
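The `div_ceil` bound also fixes an off-by-one in the old sleep guard (`batch_idx < searches.len() / args.batch_size`); a quick check with assumed numbers:

```rust
fn main() {
    let (total, batch_size) = (20usize, 10usize);
    let total_batches = total.div_ceil(batch_size); // 2 batches: idx 0 and 1
    assert_eq!(total_batches, 2);
    // Old guard: batch_idx < 20 / 10 == 2, so it slept after idx 1 too,
    // a stray 500 ms wait after the final batch.
    // New guard: batch_idx + 1 < total_batches sleeps only after idx 0.
}
```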

View File

@@ -1,198 +0,0 @@
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use chrono::Utc;
use clap::Parser;
use rayon::prelude::*;
use walkdir::WalkDir;
use image_api::database::models::InsertImageExif;
use image_api::database::{ExifDao, SqliteExifDao};
use image_api::exif;
#[derive(Parser, Debug)]
#[command(name = "migrate_exif")]
#[command(about = "Extract and store EXIF data from images", long_about = None)]
struct Args {
#[arg(long, help = "Skip files that already have EXIF data in database")]
skip_existing: bool,
}
fn main() -> anyhow::Result<()> {
env_logger::init();
dotenv::dotenv()?;
let args = Args::parse();
let base_path = dotenv::var("BASE_PATH")?;
let base = PathBuf::from(&base_path);
println!("EXIF Migration Tool");
println!("===================");
println!("Base path: {}", base.display());
if args.skip_existing {
println!("Mode: Skip existing (incremental)");
} else {
println!("Mode: Upsert (insert new, update existing)");
}
println!();
// Collect all image files that support EXIF
println!("Scanning for images...");
let image_files: Vec<PathBuf> = WalkDir::new(&base)
.into_iter()
.filter_map(|e| e.ok())
.filter(|e| e.file_type().is_file())
.filter(|e| exif::supports_exif(e.path()))
.map(|e| e.path().to_path_buf())
.collect();
println!("Found {} images to process", image_files.len());
if image_files.is_empty() {
println!("No EXIF-supporting images found. Exiting.");
return Ok(());
}
println!();
println!("Extracting EXIF data...");
// Create a thread-safe DAO
let dao = Arc::new(Mutex::new(SqliteExifDao::new()));
// Process in parallel using rayon
let results: Vec<_> = image_files
.par_iter()
.map(|path| {
// Create context for this processing iteration
let context = opentelemetry::Context::new();
let relative_path = match path.strip_prefix(&base) {
Ok(p) => p.to_str().unwrap().replace('\\', "/"),
Err(_) => {
eprintln!(
"Error: Could not create relative path for {}",
path.display()
);
return Err(anyhow::anyhow!("Path error"));
}
};
// Check if EXIF data already exists
let existing = if let Ok(mut dao_lock) = dao.lock() {
dao_lock.get_exif(&context, &relative_path).ok().flatten()
} else {
eprintln!("{} - Failed to acquire database lock", relative_path);
return Err(anyhow::anyhow!("Lock error"));
};
// Skip if exists and skip_existing flag is set
if args.skip_existing && existing.is_some() {
return Ok(("skip".to_string(), relative_path));
}
match exif::extract_exif_from_path(path) {
Ok(exif_data) => {
let timestamp = Utc::now().timestamp();
let insert_exif = InsertImageExif {
library_id: image_api::libraries::PRIMARY_LIBRARY_ID,
file_path: relative_path.clone(),
camera_make: exif_data.camera_make,
camera_model: exif_data.camera_model,
lens_model: exif_data.lens_model,
width: exif_data.width,
height: exif_data.height,
orientation: exif_data.orientation,
gps_latitude: exif_data.gps_latitude.map(|v| v as f32),
gps_longitude: exif_data.gps_longitude.map(|v| v as f32),
gps_altitude: exif_data.gps_altitude.map(|v| v as f32),
focal_length: exif_data.focal_length.map(|v| v as f32),
aperture: exif_data.aperture.map(|v| v as f32),
shutter_speed: exif_data.shutter_speed,
iso: exif_data.iso,
date_taken: exif_data.date_taken,
created_time: existing
.as_ref()
.map(|e| e.created_time)
.unwrap_or(timestamp),
last_modified: timestamp,
content_hash: None,
size_bytes: None,
};
// Store or update in database
if let Ok(mut dao_lock) = dao.lock() {
let result = if existing.is_some() {
// Update existing record
dao_lock
.update_exif(&context, insert_exif)
.map(|_| "update")
} else {
// Insert new record
dao_lock.store_exif(&context, insert_exif).map(|_| "insert")
};
match result {
Ok(action) => {
if action == "update" {
println!("{} (updated)", relative_path);
} else {
println!("{} (inserted)", relative_path);
}
Ok((action.to_string(), relative_path))
}
Err(e) => {
eprintln!("{} - Database error: {:?}", relative_path, e);
Err(anyhow::anyhow!("Database error"))
}
}
} else {
eprintln!("{} - Failed to acquire database lock", relative_path);
Err(anyhow::anyhow!("Lock error"))
}
}
Err(e) => {
eprintln!("{} - No EXIF data: {:?}", relative_path, e);
Err(e)
}
}
})
.collect();
// Count results
let mut success_count = 0;
let mut inserted_count = 0;
let mut updated_count = 0;
let mut skipped_count = 0;
for (action, _) in results.iter().flatten() {
success_count += 1;
match action.as_str() {
"insert" => inserted_count += 1,
"update" => updated_count += 1,
"skip" => skipped_count += 1,
_ => {}
}
}
let error_count = results.len() - success_count - skipped_count;
println!();
println!("===================");
println!("Migration complete!");
println!("Total images processed: {}", image_files.len());
if inserted_count > 0 {
println!(" New EXIF records inserted: {}", inserted_count);
}
if updated_count > 0 {
println!(" Existing records updated: {}", updated_count);
}
if skipped_count > 0 {
println!(" Skipped (already exists): {}", skipped_count);
}
if error_count > 0 {
println!(" Errors (no EXIF data or failures): {}", error_count);
}
Ok(())
}

View File

@@ -1,14 +1,17 @@
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use clap::Parser;
use log::warn;
use walkdir::WalkDir;
use image_api::ai::{InsightGenerator, OllamaClient, SmsApiClient};
use image_api::bin_progress;
use image_api::database::{
CalendarEventDao, DailySummaryDao, ExifDao, InsightDao, KnowledgeDao, LocationHistoryDao,
SearchHistoryDao, SqliteCalendarEventDao, SqliteDailySummaryDao, SqliteExifDao,
SqliteInsightDao, SqliteKnowledgeDao, SqliteLocationHistoryDao, SqliteSearchHistoryDao,
connect,
};
use image_api::file_types::{IMAGE_EXTENSIONS, VIDEO_EXTENSIONS};
use image_api::libraries::{self, Library};
@@ -20,7 +23,13 @@ use image_api::tags::{SqliteTagDao, TagDao};
about = "Batch populate the knowledge base by running the agentic insight loop over a folder"
)]
struct Args {
/// Directory to scan. Defaults to BASE_PATH from .env
/// Restrict to a single library by numeric id or name. Defaults to all
/// configured libraries.
#[arg(long)]
library: Option<String>,
/// Optional subdirectory to scan instead of full library roots. Must be
/// an absolute path under one of the selected libraries.
#[arg(long)]
path: Option<String>,
@@ -68,10 +77,57 @@ async fn main() -> anyhow::Result<()> {
let args = Args::parse();
let base_path = dotenv::var("BASE_PATH")?;
let scan_path = args.path.as_deref().unwrap_or(&base_path).to_string();
// Load libraries from the DB. Patch the placeholder row from BASE_PATH
// first when present so a fresh install still gets a valid root.
let env_base_path = dotenv::var("BASE_PATH").ok();
let mut seed_conn = connect();
if let Some(base) = env_base_path.as_deref() {
libraries::seed_or_patch_from_env(&mut seed_conn, base);
}
let all_libs = libraries::load_all(&mut seed_conn);
drop(seed_conn);
if all_libs.is_empty() {
anyhow::bail!("No libraries configured");
}
// Ollama config from env with CLI overrides
// Resolve --library to a concrete subset.
let selected_libs: Vec<Library> = match args.library.as_deref() {
None => all_libs.clone(),
Some(raw) => {
let raw = raw.trim();
let matched = if let Ok(id) = raw.parse::<i32>() {
all_libs.iter().find(|l| l.id == id).cloned()
} else {
all_libs.iter().find(|l| l.name == raw).cloned()
};
match matched {
Some(lib) => vec![lib],
None => anyhow::bail!("Unknown library: {}", raw),
}
}
};
// Resolve --path to (target_library, walk_root). When provided, the path
// must live under exactly one of the selected libraries.
let scan_targets: Vec<(Library, PathBuf)> = match args.path.as_deref() {
None => selected_libs
.iter()
.map(|lib| (lib.clone(), PathBuf::from(&lib.root_path)))
.collect(),
Some(raw) => {
let abs = PathBuf::from(raw);
let matched = selected_libs
.iter()
.find(|lib| abs.starts_with(&lib.root_path))
.cloned();
match matched {
Some(lib) => vec![(lib, abs)],
None => anyhow::bail!("--path {} is not under any selected library root", raw),
}
}
};
// Ollama config from env with CLI overrides.
let primary_url = std::env::var("OLLAMA_PRIMARY_URL")
.or_else(|_| std::env::var("OLLAMA_URL"))
.unwrap_or_else(|_| "http://localhost:11434".to_string());
@@ -108,7 +164,6 @@ async fn main() -> anyhow::Result<()> {
let sms_api_token = std::env::var("SMS_API_TOKEN").ok();
let sms_client = SmsApiClient::new(sms_api_url, sms_api_token);
// Wire up all DAOs
let insight_dao: Arc<Mutex<Box<dyn InsightDao>>> =
Arc::new(Mutex::new(Box::new(SqliteInsightDao::new())));
let exif_dao: Arc<Mutex<Box<dyn ExifDao>>> =
@@ -126,12 +181,9 @@ async fn main() -> anyhow::Result<()> {
let knowledge_dao: Arc<Mutex<Box<dyn KnowledgeDao>>> =
Arc::new(Mutex::new(Box::new(SqliteKnowledgeDao::new())));
let populate_lib = Library {
id: libraries::PRIMARY_LIBRARY_ID,
name: "main".to_string(),
root_path: base_path.clone(),
};
// Pass the full library set so `resolve_full_path` probes every root,
// even when --library restricts the walk. A rel_path shared across
// libraries will resolve against the first existing match.
let generator = InsightGenerator::new(
ollama,
None,
@@ -144,12 +196,15 @@ async fn main() -> anyhow::Result<()> {
search_dao,
tag_dao,
knowledge_dao,
vec![populate_lib],
all_libs.clone(),
);
println!("Knowledge Base Population");
println!("=========================");
println!("Scan path: {}", scan_path);
for (lib, root) in &scan_targets {
println!("Library: {} (id={})", lib.name, lib.id);
println!("Scan root: {}", root.display());
}
println!("Model: {}", primary_model);
println!("Max iterations: {}", args.max_iterations);
println!("Timeout: {}s", args.timeout_secs);
@@ -178,30 +233,56 @@ async fn main() -> anyhow::Result<()> {
);
println!();
// Collect all image and video files
let all_extensions: Vec<&str> = IMAGE_EXTENSIONS
.iter()
.chain(VIDEO_EXTENSIONS.iter())
.copied()
.collect();
println!("Scanning {}...", scan_path);
let files: Vec<PathBuf> = WalkDir::new(&scan_path)
.into_iter()
.filter_map(|e| e.ok())
.filter(|e| e.file_type().is_file())
.filter(|e| {
e.path()
// Collect (library, abs_path, rel_path) for every media file across all
// scan targets so the progress counter spans the full job.
let mut files: Vec<(Library, PathBuf, String)> = Vec::new();
for (lib, walk_root) in &scan_targets {
let lib_root = Path::new(&lib.root_path);
let scan_pb = bin_progress::spinner(format!("scanning {}", walk_root.display()));
let count_before = files.len();
for entry in WalkDir::new(walk_root).into_iter().filter_map(|e| e.ok()) {
if !entry.file_type().is_file() {
continue;
}
let abs_path = entry.path().to_path_buf();
let ext_ok = abs_path
.extension()
.and_then(|ext| ext.to_str())
.map(|ext| all_extensions.contains(&ext.to_lowercase().as_str()))
.unwrap_or(false)
})
.map(|e| e.path().to_path_buf())
.collect();
.unwrap_or(false);
if !ext_ok {
continue;
}
let rel = match abs_path.strip_prefix(lib_root) {
Ok(p) => p.to_string_lossy().replace('\\', "/"),
Err(_) => {
warn!(
"{} is not under library root {}; skipping",
abs_path.display(),
lib_root.display()
);
continue;
}
};
files.push((lib.clone(), abs_path, rel));
scan_pb.inc(1);
}
let added = files.len() - count_before;
scan_pb.finish_with_message(format!(
"scanned {} ({} media files)",
walk_root.display(),
added
));
}
let total = files.len();
println!("Found {} files\n", total);
println!("\nTotal files to consider: {}\n", total);
if total == 0 {
println!("Nothing to process.");
@@ -213,35 +294,29 @@ async fn main() -> anyhow::Result<()> {
let mut skipped = 0usize;
let mut errors = 0usize;
for (i, path) in files.iter().enumerate() {
let relative = match path.strip_prefix(&base_path) {
Ok(p) => p.to_string_lossy().replace('\\', "/"),
Err(_) => path.to_string_lossy().replace('\\', "/"),
};
let pb = bin_progress::determinate(total as u64, "");
let prefix = format!("[{}/{}]", i + 1, total);
for (lib, _abs_path, relative) in files.iter() {
pb.set_message(format!("{}: {}", lib.name, relative));
// Check for existing insight unless --reprocess
if !args.reprocess {
let has_insight = insight_dao
.lock()
.unwrap()
.get_insight(&cx, &relative)
.get_insight(&cx, relative)
.unwrap_or(None)
.is_some();
if has_insight {
println!("{} skip {}", prefix, relative);
skipped += 1;
pb.inc(1);
continue;
}
}
println!("{} start {}", prefix, relative);
match generator
.generate_agentic_insight_for_photo(
&relative,
relative,
args.model.clone(),
None,
args.num_ctx,
@@ -256,17 +331,17 @@ async fn main() -> anyhow::Result<()> {
)
.await
{
Ok(_) => {
println!("{} done {}", prefix, relative);
processed += 1;
}
Ok(_) => processed += 1,
Err(e) => {
eprintln!("{} error {}{:?}", prefix, relative, e);
pb.println(format!("error {}: {}{:?}", lib.name, relative, e));
errors += 1;
}
}
pb.inc(1);
}
pb.finish_and_clear();
println!();
println!("=========================");
println!("Complete");

src/bin_progress.rs Normal file
View File

@@ -0,0 +1,34 @@
//! Shared progress-bar styling for the utility binaries. Centralised so every
//! `cargo run --bin ...` tool gets the same look and feel.
use indicatif::{ProgressBar, ProgressStyle};
const DETERMINATE_TEMPLATE: &str = "{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] \
{human_pos}/{human_len} ({percent}%) {per_sec} eta {eta} {msg}";
const SPINNER_TEMPLATE: &str = "{spinner:.green} [{elapsed_precise}] {human_pos} {per_sec} {msg}";
/// Determinate progress bar used when the total work is known up front.
pub fn determinate(total: u64, message: impl Into<String>) -> ProgressBar {
let pb = ProgressBar::new(total);
pb.set_style(
ProgressStyle::with_template(DETERMINATE_TEMPLATE)
.expect("hard-coded template parses")
.progress_chars("=> "),
);
pb.set_message(message.into());
pb
}
/// Spinner used for open-ended work (e.g. paginated DB scans that loop until
/// empty). Throughput is shown via `{per_sec}`; tick at a steady cadence so
/// it animates even when work is bursty.
pub fn spinner(message: impl Into<String>) -> ProgressBar {
let pb = ProgressBar::new_spinner();
pb.set_style(
ProgressStyle::with_template(SPINNER_TEMPLATE).expect("hard-coded template parses"),
);
pb.set_message(message.into());
pb.enable_steady_tick(std::time::Duration::from_millis(120));
pb
}
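A minimal caller sketch for the new module, mirroring how the bins above use it (the work items are made up):

```rust
use image_api::bin_progress;

fn main() {
    let items = ["a.jpg", "b.jpg", "c.jpg"]; // illustrative
    let pb = bin_progress::determinate(items.len() as u64, "processing");
    for item in &items {
        // Print through the bar; a bare println! would be overdrawn on the next tick.
        pb.println(format!("handled {}", item));
        pb.inc(1);
    }
    pb.finish_and_clear();
}
```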

View File

@@ -1,8 +1,9 @@
use crate::bin_progress;
use crate::cleanup::database_updater::DatabaseUpdater;
use crate::cleanup::types::{CleanupConfig, CleanupStats};
use crate::file_types::IMAGE_EXTENSIONS;
use anyhow::Result;
use log::{error, warn};
use log::error;
use std::path::PathBuf;
// All supported image extensions to try
@@ -25,15 +26,17 @@ pub fn resolve_missing_files(
stats.files_checked = all_paths.len();
println!("Checking file existence...");
let mut missing_count = 0;
let mut resolved_count = 0;
let pb = bin_progress::determinate(stats.files_checked as u64, "checking");
for path_str in all_paths {
let full_path = config.base_path.join(&path_str);
// Check if file exists
if full_path.exists() {
pb.inc(1);
continue;
}
@@ -43,7 +46,7 @@ pub fn resolve_missing_files(
// Try to find the file with different extensions
match find_file_with_alternative_extension(&config.base_path, &path_str) {
Some(new_path_str) => {
println!(
pb.println(format!(
"{} → found as {}{}",
path_str,
new_path_str,
@@ -52,7 +55,7 @@ pub fn resolve_missing_files(
} else {
""
}
);
));
if !config.dry_run {
// Update database
@@ -71,11 +74,18 @@ pub fn resolve_missing_files(
}
}
None => {
warn!("{} → not found with any extension", path_str);
pb.println(format!("{} not found with any extension", path_str));
}
}
pb.set_message(format!(
"missing={} resolved={}",
missing_count, resolved_count
));
pb.inc(1);
}
pb.finish_and_clear();
println!("\nResults:");
println!("- Files checked: {}", stats.files_checked);
println!("- Missing files: {}", missing_count);

View File

@@ -1,7 +1,9 @@
use crate::bin_progress;
use crate::cleanup::database_updater::DatabaseUpdater;
use crate::cleanup::file_type_detector::{detect_file_type, should_rename};
use crate::cleanup::types::{CleanupConfig, CleanupStats};
use anyhow::Result;
use indicatif::ProgressBar;
use log::{error, warn};
use std::fs;
use std::path::{Path, PathBuf};
@@ -32,16 +34,20 @@ pub fn validate_file_types(
println!("Files found: {}\n", files.len());
stats.files_checked = files.len();
println!("Detecting file types...");
let mut mismatches_found = 0;
let mut files_renamed = 0;
let mut user_skipped = 0;
let pb = bin_progress::determinate(files.len() as u64, "detecting");
for file_path in files {
// Get current extension
let current_ext = match file_path.extension() {
Some(ext) => ext.to_str().unwrap_or(""),
None => continue, // Skip files without extensions
None => {
pb.inc(1);
continue;
}
};
// Detect actual file type
@@ -57,14 +63,15 @@ pub fn validate_file_types(
Ok(rel) => rel.to_str().unwrap_or(""),
Err(_) => {
error!("Failed to get relative path for {:?}", file_path);
pb.inc(1);
continue;
}
};
println!("\nFile type mismatch:");
println!(" Path: {}", relative_path);
println!(" Current: .{}", current_ext);
println!(" Actual: .{}", detected_ext);
pb.println(format!(
"mismatch: {} .{} → .{}",
relative_path, current_ext, detected_ext
));
// Calculate new path
let new_file_path = file_path.with_extension(&detected_ext);
@@ -72,6 +79,7 @@ pub fn validate_file_types(
Ok(rel) => rel.to_str().unwrap_or(""),
Err(_) => {
error!("Failed to get new relative path for {:?}", new_file_path);
pb.inc(1);
continue;
}
};
@@ -83,22 +91,26 @@ pub fn validate_file_types(
"Destination exists for {}: {}",
relative_path, new_relative_path
));
pb.inc(1);
continue;
}
// Determine if we should proceed
let should_proceed = if config.dry_run {
println!(" (dry-run mode - would rename to {})", new_relative_path);
pb.println(format!(
" (dry-run — would rename to {})",
new_relative_path
));
false
} else if skip_all {
println!(" Skipped (skip all)");
user_skipped += 1;
false
} else if auto_fix_all {
true
} else {
// Interactive prompt
match prompt_for_rename(new_relative_path) {
// Interactive prompt — suspend the bar so the prompt is visible.
let decision = pb.suspend(|| prompt_for_rename(new_relative_path, &pb));
match decision {
RenameDecision::Yes => true,
RenameDecision::No => {
user_skipped += 1;
@@ -120,8 +132,6 @@ pub fn validate_file_types(
// Rename the file
match fs::rename(&file_path, &new_file_path) {
Ok(_) => {
println!("✓ Renamed file");
// Update database
match db_updater.update_file_path(relative_path, new_relative_path)
{
@@ -160,8 +170,15 @@ pub fn validate_file_types(
warn!("Failed to detect type for {:?}: {:?}", file_path, e);
}
}
pb.set_message(format!(
"mismatches={} renamed={} skipped={}",
mismatches_found, files_renamed, user_skipped
));
pb.inc(1);
}
pb.finish_and_clear();
println!("\nResults:");
println!("- Files scanned: {}", stats.files_checked);
println!("- Mismatches found: {}", mismatches_found);
@@ -195,8 +212,9 @@ enum RenameDecision {
SkipAll,
}
/// Prompt the user for rename decision
fn prompt_for_rename(new_path: &str) -> RenameDecision {
/// Prompt the user for rename decision. Caller must `pb.suspend` so the
/// progress bar isn't redrawing over the prompt.
fn prompt_for_rename(new_path: &str, _pb: &ProgressBar) -> RenameDecision {
println!("\nRename to {}?", new_path);
println!(" [y] Yes");
println!(" [n] No (default)");

View File

@@ -5,6 +5,7 @@ extern crate diesel;
pub mod ai;
pub mod auth;
pub mod bin_progress;
pub mod cleanup;
pub mod content_hash;
pub mod data;