feat(bins): multi-library populate_knowledge + progress UX

populate_knowledge now loads real libraries from the DB instead of
fabricating a single library_id=1 row from BASE_PATH. Adds --library
<id|name> to restrict the walk and validates --path against the selected
library roots. The full library set is passed to InsightGenerator either
way, so resolve_full_path can probe every root when an insight resolves
to a different library than the one being walked.
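
A minimal sketch of that probing behaviour (resolve_full_path itself is
not part of this diff, so the signature and the trimmed-down Library
shape here are assumptions for illustration only):

    use std::path::{Path, PathBuf};

    struct Library {
        root_path: String,
    }

    // Given a library-relative path, return the first absolute path that
    // exists under any configured root.
    fn resolve_full_path(libs: &[Library], rel_path: &str) -> Option<PathBuf> {
        libs.iter()
            .map(|lib| Path::new(&lib.root_path).join(rel_path))
            .find(|abs| abs.exists())
    }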

Adds indicatif progress bars across the long-running utility binaries via
a shared src/bin_progress.rs helper (determinate bar + open-ended spinner
with consistent styling). Per-batch info! noise is replaced by the bar's
throughput/ETA readout; warnings and errors route through pb.println so
they scroll above the bar instead of fighting with it. The shared call
pattern is sketched after the list below.

  populate_knowledge   spinner during scan, determinate bar over all libs
  backfill_hashes      spinner with running hashed/missing/errors counts
  import_calendar      determinate bar; embedding/store failures inline
  import_location_*    determinate bar advancing by chunk size
  import_search_*      determinate bar; pb cloned into the spawn task
  cleanup_files P1     determinate bar over DB paths
  cleanup_files P2     determinate bar; pb.suspend() around y/n/a/s prompt
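
As a sketch, the call pattern the binaries above share (run() and
process() are placeholders for this illustration, not code from the
commit; the bin_progress calls are the real API):

    use image_api::bin_progress;

    fn run(items: &[String]) {
        let pb = bin_progress::determinate(items.len() as u64, "importing");
        let mut errors = 0usize;
        for item in items {
            if let Err(e) = process(item) {
                // pb.println scrolls the message above the live bar.
                pb.println(format!("error for {}: {:?}", item, e));
                errors += 1;
            }
            pb.set_message(format!("errors={}", errors));
            pb.inc(1);
        }
        pb.finish_and_clear();
    }

    fn process(_item: &str) -> anyhow::Result<()> {
        Ok(())
    }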

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Author: Cameron
Date:   2026-04-26 23:55:33 -04:00
Parent: d5f944c7b6
Commit: b9d5578653

11 changed files with 362 additions and 149 deletions

Cargo.lock (generated, 49 additions)

@@ -808,6 +808,19 @@ version = "1.0.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75"

+[[package]]
+name = "console"
+version = "0.15.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "054ccb5b10f9f2cbf51eb355ca1d05c2d279ce1804688d0db74b4733a5aeafd8"
+dependencies = [
+ "encode_unicode",
+ "libc",
+ "once_cell",
+ "unicode-width",
+ "windows-sys 0.59.0",
+]
+
 [[package]]
 name = "constant_time_eq"
 version = "0.4.2"
@@ -1150,6 +1163,12 @@ version = "1.15.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719"

+[[package]]
+name = "encode_unicode"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "34aa73646ffb006b8f5147f3dc182bd4bcb190227ce861fc4a4844bf8e3cb2c0"
+
 [[package]]
 name = "encoding_rs"
 version = "0.8.35"
@@ -1920,6 +1939,7 @@ dependencies = [
  "futures",
  "ical",
  "image",
+ "indicatif",
  "infer",
  "jsonwebtoken",
  "kamadak-exif",
@@ -1980,6 +2000,19 @@ dependencies = [
  "hashbrown 0.15.5",
 ]

+[[package]]
+name = "indicatif"
+version = "0.17.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "183b3088984b400f4cfac3620d5e076c84da5364016b4f49473de574b2586235"
+dependencies = [
+ "console",
+ "number_prefix",
+ "portable-atomic",
+ "unicode-width",
+ "web-time",
+]
+
 [[package]]
 name = "infer"
 version = "0.16.0"
@@ -2451,6 +2484,12 @@ dependencies = [
  "autocfg",
 ]

+[[package]]
+name = "number_prefix"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3"
+
 [[package]]
 name = "object"
 version = "0.36.7"
@@ -4333,6 +4372,16 @@ dependencies = [
  "wasm-bindgen",
 ]

+[[package]]
+name = "web-time"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb"
+dependencies = [
+ "js-sys",
+ "wasm-bindgen",
+]
+
 [[package]]
 name = "weezl"
 version = "0.1.12"


@@ -60,3 +60,4 @@ scraper = "0.20"
 base64 = "0.22"
 blake3 = "1.5"
 async-trait = "0.1"
+indicatif = "0.17"


@@ -10,8 +10,10 @@ use std::sync::{Arc, Mutex};
 use std::time::Instant;

 use clap::Parser;
+use log::{error, warn};
 use rayon::prelude::*;

+use image_api::bin_progress;
 use image_api::content_hash;
 use image_api::database::{ExifDao, SqliteExifDao, connect};
 use image_api::libraries::{self, Library};
@@ -76,6 +78,8 @@ fn main() -> anyhow::Result<()> {
     let mut total_errors = 0u64;
     let start = Instant::now();

+    let pb = bin_progress::spinner("hashing");
+
     loop {
         let rows = {
             let mut guard = dao.lock().expect("Unable to lock ExifDao");
@@ -86,7 +90,11 @@
         if rows.is_empty() {
             break;
         }
-        println!("Processing batch of {} rows", rows.len());
+        let batch_size = rows.len();
+        pb.set_message(format!(
+            "batch of {} (hashed={} missing={} errors={})",
+            batch_size, total_hashed, total_missing, total_errors
+        ));

         // Compute hashes in parallel (I/O-bound; rayon helps on local disks,
         // throttled by network on SMB mounts — use --parallelism to tune).
@@ -100,13 +108,13 @@
                 Some(abs_path) if abs_path.exists() => match content_hash::compute(&abs_path) {
                     Ok(id) => (library_id, rel_path, Some(id)),
                     Err(e) => {
-                        eprintln!("hash error for {}: {:?}", abs_path.display(), e);
+                        error!("hash error for {}: {:?}", abs_path.display(), e);
                         (library_id, rel_path, None)
                     }
                 },
                 Some(_) => (library_id, rel_path, None), // file missing on disk
                 None => {
-                    eprintln!("Row refers to unknown library_id {}", library_id);
+                    warn!("Row refers to unknown library_id {}", library_id);
                     (library_id, rel_path, None)
                 }
             }
@@ -126,9 +134,12 @@
                     &id.content_hash,
                     id.size_bytes,
                 ) {
-                    Ok(_) => total_hashed += 1,
+                    Ok(_) => {
+                        total_hashed += 1;
+                        pb.inc(1);
+                    }
                     Err(e) => {
-                        eprintln!("persist error for {}: {:?}", rel_path, e);
+                        pb.println(format!("persist error for {}: {:?}", rel_path, e));
                         total_errors += 1;
                     }
                 }
@@ -142,34 +153,28 @@
             for (_, rel_path, ident) in &results {
                 match ident {
                     Some(id) => {
-                        println!(
+                        pb.println(format!(
                             "[dry-run] {} -> {} ({} bytes)",
                             rel_path, id.content_hash, id.size_bytes
-                        );
+                        ));
                         total_hashed += 1;
+                        pb.inc(1);
                     }
                     None => {
                         total_missing += 1;
                     }
                 }
             }
-            println!(
+            pb.println(format!(
                 "[dry-run] processed one batch of {}. Stopping — a real run would continue \
                  until no NULL content_hash rows remain.",
                 results.len()
-            );
+            ));
             break;
         }
-
-        let elapsed = start.elapsed().as_secs_f64().max(0.001);
-        let rate = total_hashed as f64 / elapsed;
-        println!(
-            " hashed={} missing={} errors={} ({:.1} files/sec)",
-            total_hashed, total_missing, total_errors, rate
-        );
     }

-    println!();
+    pb.finish_and_clear();
     println!(
         "Done. hashed={}, skipped (missing on disk)={}, errors={}, elapsed={:.1}s",
         total_hashed,


@@ -2,10 +2,10 @@ use anyhow::{Context, Result};
 use chrono::Utc;
 use clap::Parser;
 use image_api::ai::ollama::OllamaClient;
+use image_api::bin_progress;
 use image_api::database::calendar_dao::{InsertCalendarEvent, SqliteCalendarEventDao};
 use image_api::parsers::ical_parser::parse_ics_file;
 use log::{error, info};
-use std::sync::{Arc, Mutex};

 // Import the trait to use its methods
 use image_api::database::CalendarEventDao;
@@ -64,9 +64,11 @@
         None
     };

-    let inserted_count = Arc::new(Mutex::new(0));
-    let skipped_count = Arc::new(Mutex::new(0));
-    let error_count = Arc::new(Mutex::new(0));
+    let mut inserted_count = 0usize;
+    let mut skipped_count = 0usize;
+    let mut error_count = 0usize;
+
+    let pb = bin_progress::determinate(events.len() as u64, "importing");

     // Process events in batches
     // Can't use rayon with async, so process sequentially
@@ -82,7 +84,8 @@
             )
             && exists
         {
-            *skipped_count.lock().unwrap() += 1;
+            skipped_count += 1;
+            pb.inc(1);
             continue;
         }
@@ -101,10 +104,7 @@
             }) {
                 Ok(emb) => Some(emb),
                 Err(e) => {
-                    error!(
-                        "Failed to generate embedding for event '{}': {}",
-                        event.summary, e
-                    );
+                    pb.println(format!("embedding failed for '{}': {}", event.summary, e));
                     None
                 }
             }
@@ -133,28 +133,26 @@
         };

         match dao_instance.store_event(&context, insert_event) {
-            Ok(_) => {
-                *inserted_count.lock().unwrap() += 1;
-                if *inserted_count.lock().unwrap() % 100 == 0 {
-                    info!("Imported {} events...", *inserted_count.lock().unwrap());
-                }
-            }
+            Ok(_) => inserted_count += 1,
             Err(e) => {
-                error!("Failed to store event '{}': {:?}", event.summary, e);
-                *error_count.lock().unwrap() += 1;
+                pb.println(format!("store failed for '{}': {:?}", event.summary, e));
+                error_count += 1;
             }
         }
+
+        pb.set_message(format!(
+            "inserted={} skipped={} errors={}",
+            inserted_count, skipped_count, error_count
+        ));
+        pb.inc(1);
     }

-    let final_inserted = *inserted_count.lock().unwrap();
-    let final_skipped = *skipped_count.lock().unwrap();
-    let final_errors = *error_count.lock().unwrap();
+    pb.finish_and_clear();

-    info!("\n=== Import Summary ===");
+    info!("=== Import Summary ===");
     info!("Total events found: {}", events.len());
-    info!("Successfully inserted: {}", final_inserted);
-    info!("Skipped (already exist): {}", final_skipped);
-    info!("Errors: {}", final_errors);
+    info!("Successfully inserted: {}", inserted_count);
+    info!("Skipped (already exist): {}", skipped_count);
+    info!("Errors: {}", error_count);

     if args.generate_embeddings {
         info!("Embeddings were generated for semantic search");
@@ -162,5 +160,12 @@
         info!("No embeddings generated (use --generate-embeddings to enable semantic search)");
     }

+    if error_count > 0 {
+        error!(
+            "Completed with {} errors — review log output above",
+            error_count
+        );
+    }
+
     Ok(())
 }


@@ -1,6 +1,7 @@
 use anyhow::{Context, Result};
 use chrono::Utc;
 use clap::Parser;
+use image_api::bin_progress;
 use image_api::database::location_dao::{InsertLocationRecord, SqliteLocationHistoryDao};
 use image_api::parsers::location_json_parser::parse_location_json;
 use log::{error, info};
@@ -38,23 +39,20 @@
     let context = opentelemetry::Context::current();

-    let mut inserted_count = 0;
-    let mut skipped_count = 0;
-    let mut error_count = 0;
+    let mut inserted_count = 0usize;
+    let mut skipped_count = 0usize;
+    let mut error_count = 0usize;

     let mut dao_instance = SqliteLocationHistoryDao::new();
     let created_at = Utc::now().timestamp();

-    // Process in batches using batch insert for massive speedup
-    for (batch_idx, chunk) in locations.chunks(args.batch_size).enumerate() {
-        info!(
-            "Processing batch {} ({} records)...",
-            batch_idx + 1,
-            chunk.len()
-        );
+    let pb = bin_progress::determinate(locations.len() as u64, "importing");

+    // Process in batches using batch insert for massive speedup
+    for chunk in locations.chunks(args.batch_size) {
         // Convert to InsertLocationRecord
         let mut batch_inserts = Vec::with_capacity(chunk.len());
+        let mut chunk_skipped = 0usize;

         for location in chunk {
             // Skip existing check if requested (makes import much slower)
@@ -68,6 +66,7 @@
             && exists
             {
                 skipped_count += 1;
+                chunk_skipped += 1;
                 continue;
             }
@@ -89,26 +88,35 @@
         // Batch insert entire chunk in single transaction
         if !batch_inserts.is_empty() {
             match dao_instance.store_locations_batch(&context, batch_inserts) {
-                Ok(count) => {
-                    inserted_count += count;
-                    info!(
-                        "Imported {} locations (total: {})...",
-                        count, inserted_count
-                    );
-                }
+                Ok(count) => inserted_count += count,
                 Err(e) => {
-                    error!("Failed to store batch: {:?}", e);
-                    error_count += chunk.len();
+                    pb.println(format!("batch insert failed: {:?}", e));
+                    error_count += chunk.len() - chunk_skipped;
                 }
             }
         }
+
+        pb.set_message(format!(
+            "inserted={} skipped={} errors={}",
+            inserted_count, skipped_count, error_count
+        ));
+        pb.inc(chunk.len() as u64);
     }

-    info!("\n=== Import Summary ===");
+    pb.finish_and_clear();
+
+    info!("=== Import Summary ===");
     info!("Total locations found: {}", locations.len());
     info!("Successfully inserted: {}", inserted_count);
     info!("Skipped (already exist): {}", skipped_count);
     info!("Errors: {}", error_count);

+    if error_count > 0 {
+        error!(
+            "Completed with {} errors — review log output above",
+            error_count
+        );
+    }
+
     Ok(())
 }


@@ -2,9 +2,10 @@ use anyhow::{Context, Result};
 use chrono::Utc;
 use clap::Parser;
 use image_api::ai::ollama::OllamaClient;
+use image_api::bin_progress;
 use image_api::database::search_dao::{InsertSearchRecord, SqliteSearchHistoryDao};
 use image_api::parsers::search_html_parser::parse_search_html;
-use log::{error, info, warn};
+use log::{error, info};

 // Import the trait to use its methods
 use image_api::database::SearchHistoryDao;
@@ -49,24 +50,22 @@
     let ollama = OllamaClient::new(primary_url, fallback_url, primary_model, fallback_model);
     let context = opentelemetry::Context::current();

-    let mut inserted_count = 0;
-    let mut skipped_count = 0;
-    let mut error_count = 0;
+    let mut inserted_count = 0usize;
+    let mut skipped_count = 0usize;
+    let mut error_count = 0usize;

     let mut dao_instance = SqliteSearchHistoryDao::new();
     let created_at = Utc::now().timestamp();

+    let pb = bin_progress::determinate(searches.len() as u64, "importing");
+    let total_batches = searches.len().div_ceil(args.batch_size);
+
     // Process searches in batches (embeddings are REQUIRED for searches)
     for (batch_idx, chunk) in searches.chunks(args.batch_size).enumerate() {
-        info!(
-            "Processing batch {} ({} searches)...",
-            batch_idx + 1,
-            chunk.len()
-        );
-
         // Generate embeddings for this batch
         let queries: Vec<String> = chunk.iter().map(|s| s.query.clone()).collect();
+        let pb_for_warn = pb.clone();
         let embeddings_result = tokio::task::spawn({
             let ollama_client = ollama.clone();
             async move {
@@ -76,7 +75,7 @@
                     match ollama_client.generate_embedding(query).await {
                         Ok(emb) => embeddings.push(Some(emb)),
                         Err(e) => {
-                            warn!("Failed to generate embedding for query '{}': {}", query, e);
+                            pb_for_warn.println(format!("embedding failed for '{}': {}", query, e));
                             embeddings.push(None);
                         }
                     }
@@ -112,10 +111,7 @@
                     source_file: Some(args.path.clone()),
                 });
             } else {
-                error!(
-                    "Skipping search '{}' due to missing embedding",
-                    search.query
-                );
+                pb.println(format!("skipping '{}' — missing embedding", search.query));
                 error_count += 1;
             }
         }
@@ -123,30 +119,41 @@
         // Batch insert entire chunk in single transaction
         if !batch_inserts.is_empty() {
             match dao_instance.store_searches_batch(&context, batch_inserts) {
-                Ok(count) => {
-                    inserted_count += count;
-                    info!("Imported {} searches (total: {})...", count, inserted_count);
-                }
+                Ok(count) => inserted_count += count,
                 Err(e) => {
-                    error!("Failed to store batch: {:?}", e);
+                    pb.println(format!("batch insert failed: {:?}", e));
                     error_count += chunk.len();
                 }
             }
         }
+
+        pb.set_message(format!(
+            "inserted={} skipped={} errors={}",
+            inserted_count, skipped_count, error_count
+        ));
+        pb.inc(chunk.len() as u64);

         // Rate limiting between batches
-        if batch_idx < searches.len() / args.batch_size {
-            info!("Waiting 500ms before next batch...");
+        if batch_idx + 1 < total_batches {
             tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
         }
     }

-    info!("\n=== Import Summary ===");
+    pb.finish_and_clear();
+
+    info!("=== Import Summary ===");
     info!("Total searches found: {}", searches.len());
     info!("Successfully inserted: {}", inserted_count);
     info!("Skipped (already exist): {}", skipped_count);
     info!("Errors: {}", error_count);
     info!("All imported searches have embeddings for semantic search");

+    if error_count > 0 {
+        error!(
+            "Completed with {} errors — review log output above",
+            error_count
+        );
+    }
+
     Ok(())
 }


@@ -1,14 +1,17 @@
-use std::path::PathBuf;
+use std::path::{Path, PathBuf};
 use std::sync::{Arc, Mutex};

 use clap::Parser;
+use log::warn;
 use walkdir::WalkDir;

 use image_api::ai::{InsightGenerator, OllamaClient, SmsApiClient};
+use image_api::bin_progress;
 use image_api::database::{
     CalendarEventDao, DailySummaryDao, ExifDao, InsightDao, KnowledgeDao, LocationHistoryDao,
     SearchHistoryDao, SqliteCalendarEventDao, SqliteDailySummaryDao, SqliteExifDao,
     SqliteInsightDao, SqliteKnowledgeDao, SqliteLocationHistoryDao, SqliteSearchHistoryDao,
+    connect,
 };
 use image_api::file_types::{IMAGE_EXTENSIONS, VIDEO_EXTENSIONS};
 use image_api::libraries::{self, Library};
@@ -20,7 +23,13 @@ use image_api::tags::{SqliteTagDao, TagDao};
     about = "Batch populate the knowledge base by running the agentic insight loop over a folder"
 )]
 struct Args {
-    /// Directory to scan. Defaults to BASE_PATH from .env
+    /// Restrict to a single library by numeric id or name. Defaults to all
+    /// configured libraries.
+    #[arg(long)]
+    library: Option<String>,
+
+    /// Optional subdirectory to scan instead of full library roots. Must be
+    /// an absolute path under one of the selected libraries.
     #[arg(long)]
     path: Option<String>,
@@ -68,10 +77,57 @@
     let args = Args::parse();

-    let base_path = dotenv::var("BASE_PATH")?;
-    let scan_path = args.path.as_deref().unwrap_or(&base_path).to_string();
+    // Load libraries from the DB. Patch the placeholder row from BASE_PATH
+    // first when present so a fresh install still gets a valid root.
+    let env_base_path = dotenv::var("BASE_PATH").ok();
+    let mut seed_conn = connect();
+    if let Some(base) = env_base_path.as_deref() {
+        libraries::seed_or_patch_from_env(&mut seed_conn, base);
+    }
+    let all_libs = libraries::load_all(&mut seed_conn);
+    drop(seed_conn);
+    if all_libs.is_empty() {
+        anyhow::bail!("No libraries configured");
+    }
+
+    // Resolve --library to a concrete subset.
+    let selected_libs: Vec<Library> = match args.library.as_deref() {
+        None => all_libs.clone(),
+        Some(raw) => {
+            let raw = raw.trim();
+            let matched = if let Ok(id) = raw.parse::<i32>() {
+                all_libs.iter().find(|l| l.id == id).cloned()
+            } else {
+                all_libs.iter().find(|l| l.name == raw).cloned()
+            };
+            match matched {
+                Some(lib) => vec![lib],
+                None => anyhow::bail!("Unknown library: {}", raw),
+            }
+        }
+    };
+
+    // Resolve --path to (target_library, walk_root). When provided, the path
+    // must live under exactly one of the selected libraries.
+    let scan_targets: Vec<(Library, PathBuf)> = match args.path.as_deref() {
+        None => selected_libs
+            .iter()
+            .map(|lib| (lib.clone(), PathBuf::from(&lib.root_path)))
+            .collect(),
+        Some(raw) => {
+            let abs = PathBuf::from(raw);
+            let matched = selected_libs
+                .iter()
+                .find(|lib| abs.starts_with(&lib.root_path))
+                .cloned();
+            match matched {
+                Some(lib) => vec![(lib, abs)],
+                None => anyhow::bail!("--path {} is not under any selected library root", raw),
+            }
+        }
+    };

-    // Ollama config from env with CLI overrides
+    // Ollama config from env with CLI overrides.
     let primary_url = std::env::var("OLLAMA_PRIMARY_URL")
         .or_else(|_| std::env::var("OLLAMA_URL"))
         .unwrap_or_else(|_| "http://localhost:11434".to_string());
@@ -108,7 +164,6 @@
     let sms_api_token = std::env::var("SMS_API_TOKEN").ok();
     let sms_client = SmsApiClient::new(sms_api_url, sms_api_token);

-    // Wire up all DAOs
     let insight_dao: Arc<Mutex<Box<dyn InsightDao>>> =
         Arc::new(Mutex::new(Box::new(SqliteInsightDao::new())));
     let exif_dao: Arc<Mutex<Box<dyn ExifDao>>> =
@@ -126,12 +181,9 @@
     let knowledge_dao: Arc<Mutex<Box<dyn KnowledgeDao>>> =
         Arc::new(Mutex::new(Box::new(SqliteKnowledgeDao::new())));

-    let populate_lib = Library {
-        id: libraries::PRIMARY_LIBRARY_ID,
-        name: "main".to_string(),
-        root_path: base_path.clone(),
-    };
-
+    // Pass the full library set so `resolve_full_path` probes every root,
+    // even when --library restricts the walk. A rel_path shared across
+    // libraries will resolve against the first existing match.
     let generator = InsightGenerator::new(
         ollama,
         None,
@@ -144,12 +196,15 @@
         search_dao,
         tag_dao,
         knowledge_dao,
-        vec![populate_lib],
+        all_libs.clone(),
     );

     println!("Knowledge Base Population");
     println!("=========================");
-    println!("Scan path: {}", scan_path);
+    for (lib, root) in &scan_targets {
+        println!("Library: {} (id={})", lib.name, lib.id);
+        println!("Scan root: {}", root.display());
+    }
     println!("Model: {}", primary_model);
     println!("Max iterations: {}", args.max_iterations);
     println!("Timeout: {}s", args.timeout_secs);
@@ -178,30 +233,56 @@
     );
     println!();

-    // Collect all image and video files
     let all_extensions: Vec<&str> = IMAGE_EXTENSIONS
         .iter()
         .chain(VIDEO_EXTENSIONS.iter())
         .copied()
         .collect();

-    println!("Scanning {}...", scan_path);
-    let files: Vec<PathBuf> = WalkDir::new(&scan_path)
-        .into_iter()
-        .filter_map(|e| e.ok())
-        .filter(|e| e.file_type().is_file())
-        .filter(|e| {
-            e.path()
+    // Collect (library, abs_path, rel_path) for every media file across all
+    // scan targets so the progress counter spans the full job.
+    let mut files: Vec<(Library, PathBuf, String)> = Vec::new();
+    for (lib, walk_root) in &scan_targets {
+        let lib_root = Path::new(&lib.root_path);
+        let scan_pb = bin_progress::spinner(format!("scanning {}", walk_root.display()));
+        let count_before = files.len();
+        for entry in WalkDir::new(walk_root).into_iter().filter_map(|e| e.ok()) {
+            if !entry.file_type().is_file() {
+                continue;
+            }
+            let abs_path = entry.path().to_path_buf();
+            let ext_ok = abs_path
                 .extension()
                 .and_then(|ext| ext.to_str())
                 .map(|ext| all_extensions.contains(&ext.to_lowercase().as_str()))
-                .unwrap_or(false)
-        })
-        .map(|e| e.path().to_path_buf())
-        .collect();
+                .unwrap_or(false);
+            if !ext_ok {
+                continue;
+            }
+            let rel = match abs_path.strip_prefix(lib_root) {
+                Ok(p) => p.to_string_lossy().replace('\\', "/"),
+                Err(_) => {
+                    warn!(
+                        "{} is not under library root {}; skipping",
+                        abs_path.display(),
+                        lib_root.display()
+                    );
+                    continue;
+                }
+            };
+            files.push((lib.clone(), abs_path, rel));
+            scan_pb.inc(1);
+        }
+        let added = files.len() - count_before;
+        scan_pb.finish_with_message(format!(
+            "scanned {} ({} media files)",
+            walk_root.display(),
+            added
+        ));
+    }

     let total = files.len();
-    println!("Found {} files\n", total);
+    println!("\nTotal files to consider: {}\n", total);

     if total == 0 {
         println!("Nothing to process.");
@@ -213,35 +294,29 @@
     let mut skipped = 0usize;
     let mut errors = 0usize;

-    for (i, path) in files.iter().enumerate() {
-        let relative = match path.strip_prefix(&base_path) {
-            Ok(p) => p.to_string_lossy().replace('\\', "/"),
-            Err(_) => path.to_string_lossy().replace('\\', "/"),
-        };
-
-        let prefix = format!("[{}/{}]", i + 1, total);
+    let pb = bin_progress::determinate(total as u64, "");

-        // Check for existing insight unless --reprocess
+    for (lib, _abs_path, relative) in files.iter() {
+        pb.set_message(format!("{}: {}", lib.name, relative));
+
         if !args.reprocess {
             let has_insight = insight_dao
                 .lock()
                 .unwrap()
-                .get_insight(&cx, &relative)
+                .get_insight(&cx, relative)
                 .unwrap_or(None)
                 .is_some();
             if has_insight {
-                println!("{} skip {}", prefix, relative);
                 skipped += 1;
+                pb.inc(1);
                 continue;
             }
         }

-        println!("{} start {}", prefix, relative);
-
         match generator
             .generate_agentic_insight_for_photo(
-                &relative,
+                relative,
                 args.model.clone(),
                 None,
                 args.num_ctx,
@@ -256,17 +331,17 @@
             )
             .await
         {
-            Ok(_) => {
-                println!("{} done {}", prefix, relative);
-                processed += 1;
-            }
+            Ok(_) => processed += 1,
             Err(e) => {
-                eprintln!("{} error {}{:?}", prefix, relative, e);
+                pb.println(format!("error {}: {}{:?}", lib.name, relative, e));
                 errors += 1;
             }
         }
+
+        pb.inc(1);
     }

+    pb.finish_and_clear();
     println!();
     println!("=========================");
     println!("Complete");

src/bin_progress.rs (new file, 34 lines)

@@ -0,0 +1,34 @@
+//! Shared progress-bar styling for the utility binaries. Centralised so every
+//! `cargo run --bin ...` tool gets the same look and feel.
+
+use indicatif::{ProgressBar, ProgressStyle};
+
+const DETERMINATE_TEMPLATE: &str = "{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] \
+    {human_pos}/{human_len} ({percent}%) {per_sec} eta {eta} {msg}";
+
+const SPINNER_TEMPLATE: &str = "{spinner:.green} [{elapsed_precise}] {human_pos} {per_sec} {msg}";
+
+/// Determinate progress bar used when the total work is known up front.
+pub fn determinate(total: u64, message: impl Into<String>) -> ProgressBar {
+    let pb = ProgressBar::new(total);
+    pb.set_style(
+        ProgressStyle::with_template(DETERMINATE_TEMPLATE)
+            .expect("hard-coded template parses")
+            .progress_chars("=> "),
+    );
+    pb.set_message(message.into());
+    pb
+}
+
+/// Spinner used for open-ended work (e.g. paginated DB scans that loop until
+/// empty). Throughput is shown via `{per_sec}`; tick at a steady cadence so
+/// it animates even when work is bursty.
+pub fn spinner(message: impl Into<String>) -> ProgressBar {
+    let pb = ProgressBar::new_spinner();
+    pb.set_style(
+        ProgressStyle::with_template(SPINNER_TEMPLATE).expect("hard-coded template parses"),
+    );
+    pb.set_message(message.into());
+    pb.enable_steady_tick(std::time::Duration::from_millis(120));
+    pb
+}
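
Two usage notes, as a sketch (demo() and ask() are placeholders for this
illustration, not code from the commit): the spinner covers open-ended
work, and ProgressBar::suspend hands the terminal to an interactive
prompt without the bar redrawing over it, which is how cleanup_files
pass 2 wraps its y/n/a/s prompt:

    use image_api::bin_progress;
    use std::io::{self, BufRead, Write};

    fn demo(paths: &[String]) {
        // Total unknown up front: use the open-ended spinner.
        let pb = bin_progress::spinner("scanning");
        for p in paths {
            pb.set_message(format!("scanning {}", p));
            pb.inc(1);
        }
        // suspend() clears the bar, runs the closure, then redraws it.
        let answer = pb.suspend(|| ask("continue? [y/n] "));
        pb.finish_and_clear();
        println!("answered: {}", answer);
    }

    fn ask(prompt: &str) -> String {
        print!("{}", prompt);
        io::stdout().flush().ok();
        let mut line = String::new();
        io::stdin().lock().read_line(&mut line).ok();
        line.trim().to_string()
    }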


@@ -1,8 +1,9 @@
+use crate::bin_progress;
 use crate::cleanup::database_updater::DatabaseUpdater;
 use crate::cleanup::types::{CleanupConfig, CleanupStats};
 use crate::file_types::IMAGE_EXTENSIONS;
 use anyhow::Result;
-use log::{error, warn};
+use log::error;
 use std::path::PathBuf;

 // All supported image extensions to try
@@ -25,15 +26,17 @@ pub fn resolve_missing_files(
     stats.files_checked = all_paths.len();

-    println!("Checking file existence...");
-
     let mut missing_count = 0;
     let mut resolved_count = 0;

+    let pb = bin_progress::determinate(stats.files_checked as u64, "checking");
+
     for path_str in all_paths {
         let full_path = config.base_path.join(&path_str);

         // Check if file exists
         if full_path.exists() {
+            pb.inc(1);
             continue;
         }
@@ -43,16 +46,16 @@
         // Try to find the file with different extensions
         match find_file_with_alternative_extension(&config.base_path, &path_str) {
             Some(new_path_str) => {
-                println!(
-                    "{} → found as {} {}",
+                pb.println(format!(
+                    "{} → found as {}{}",
                     path_str,
                     new_path_str,
                     if config.dry_run {
-                        "(dry-run, not updated)"
+                        " (dry-run, not updated)"
                     } else {
                         ""
                     }
-                );
+                ));

                 if !config.dry_run {
                     // Update database
@@ -71,11 +74,18 @@
                 }
             }
             None => {
-                warn!("{} → not found with any extension", path_str);
+                pb.println(format!("{} not found with any extension", path_str));
             }
         }
+
+        pb.set_message(format!(
+            "missing={} resolved={}",
+            missing_count, resolved_count
+        ));
+        pb.inc(1);
     }

+    pb.finish_and_clear();
     println!("\nResults:");
     println!("- Files checked: {}", stats.files_checked);
     println!("- Missing files: {}", missing_count);


@@ -1,7 +1,9 @@
+use crate::bin_progress;
 use crate::cleanup::database_updater::DatabaseUpdater;
 use crate::cleanup::file_type_detector::{detect_file_type, should_rename};
 use crate::cleanup::types::{CleanupConfig, CleanupStats};
 use anyhow::Result;
+use indicatif::ProgressBar;
 use log::{error, warn};
 use std::fs;
 use std::path::{Path, PathBuf};
@@ -32,16 +34,20 @@ pub fn validate_file_types(
     println!("Files found: {}\n", files.len());
     stats.files_checked = files.len();

-    println!("Detecting file types...");
-
     let mut mismatches_found = 0;
     let mut files_renamed = 0;
     let mut user_skipped = 0;

+    let pb = bin_progress::determinate(files.len() as u64, "detecting");
+
     for file_path in files {
         // Get current extension
         let current_ext = match file_path.extension() {
             Some(ext) => ext.to_str().unwrap_or(""),
-            None => continue, // Skip files without extensions
+            None => {
+                pb.inc(1);
+                continue;
+            }
         };
@@ -57,14 +63,15 @@
             Ok(rel) => rel.to_str().unwrap_or(""),
             Err(_) => {
                 error!("Failed to get relative path for {:?}", file_path);
+                pb.inc(1);
                 continue;
             }
         };

-        println!("\nFile type mismatch:");
-        println!("  Path: {}", relative_path);
-        println!("  Current: .{}", current_ext);
-        println!("  Actual: .{}", detected_ext);
+        pb.println(format!(
+            "mismatch: {} .{} → .{}",
+            relative_path, current_ext, detected_ext
+        ));

         // Calculate new path
         let new_file_path = file_path.with_extension(&detected_ext);
@@ -72,6 +79,7 @@
             Ok(rel) => rel.to_str().unwrap_or(""),
             Err(_) => {
                 error!("Failed to get new relative path for {:?}", new_file_path);
+                pb.inc(1);
                 continue;
             }
         };
@@ -83,22 +91,26 @@
                 "Destination exists for {}: {}",
                 relative_path, new_relative_path
             ));
+            pb.inc(1);
             continue;
         }

         // Determine if we should proceed
         let should_proceed = if config.dry_run {
-            println!("  (dry-run mode - would rename to {})", new_relative_path);
+            pb.println(format!(
+                "  (dry-run — would rename to {})",
+                new_relative_path
+            ));
             false
         } else if skip_all {
-            println!("  Skipped (skip all)");
             user_skipped += 1;
             false
         } else if auto_fix_all {
             true
         } else {
-            // Interactive prompt
-            match prompt_for_rename(new_relative_path) {
+            // Interactive prompt — suspend the bar so the prompt is visible.
+            let decision = pb.suspend(|| prompt_for_rename(new_relative_path, &pb));
+            match decision {
                 RenameDecision::Yes => true,
                 RenameDecision::No => {
                     user_skipped += 1;
@@ -120,8 +132,6 @@
             // Rename the file
             match fs::rename(&file_path, &new_file_path) {
                 Ok(_) => {
-                    println!("✓ Renamed file");
-
                     // Update database
                     match db_updater.update_file_path(relative_path, new_relative_path)
                     {
@@ -160,8 +170,15 @@
                 warn!("Failed to detect type for {:?}: {:?}", file_path, e);
             }
         }
+
+        pb.set_message(format!(
+            "mismatches={} renamed={} skipped={}",
+            mismatches_found, files_renamed, user_skipped
+        ));
+        pb.inc(1);
     }

+    pb.finish_and_clear();
     println!("\nResults:");
     println!("- Files scanned: {}", stats.files_checked);
     println!("- Mismatches found: {}", mismatches_found);
@@ -195,8 +212,9 @@
     SkipAll,
 }

-/// Prompt the user for rename decision
-fn prompt_for_rename(new_path: &str) -> RenameDecision {
+/// Prompt the user for rename decision. Caller must `pb.suspend` so the
+/// progress bar isn't redrawing over the prompt.
+fn prompt_for_rename(new_path: &str, _pb: &ProgressBar) -> RenameDecision {
     println!("\nRename to {}?", new_path);
     println!("  [y] Yes");
     println!("  [n] No (default)");


@@ -5,6 +5,7 @@ extern crate diesel;

 pub mod ai;
 pub mod auth;
+pub mod bin_progress;
 pub mod cleanup;
 pub mod content_hash;
 pub mod data;