Files
ImageApi/src/main.rs
T
Cameron Cordes 03699f7413 Add TTS voice deletion, async speech jobs, voice-list cache, ref-seconds name tags
- DELETE /tts/voices/{name}: remove a cloned voice via the llama-swap
  passthrough (upstream chatterbox-tts-api exposes DELETE /voices/{name}).
- POST/GET/DELETE /tts/speech/jobs: durable job flow for long syntheses —
  dispatch returns 202 + job id, the synth queues on the GPU permit instead
  of fast-failing 429, and clients poll for the result (kept ~10 min).
- GET /tts/voices now serves an in-memory cache so listing voices doesn't
  make llama-swap spin up the TTS model (evicting the resident LLM);
  invalidated on create/delete, ?refresh=1 forces an upstream re-query.
- Created voice names are tagged with LLAMA_SWAP_TTS_REF_SECONDS (e.g.
  grandma-30s) so the library shows which ref length produced each clone.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-10 17:36:15 -04:00

439 lines
18 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#![allow(clippy::too_many_arguments)]
#[macro_use]
extern crate diesel;
extern crate rayon;
use actix_web::web::Data;
use actix_web_prom::PrometheusMetricsBuilder;
use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations};
use std::collections::HashMap;
use std::env;
use std::sync::{Arc, Mutex};
use actix_cors::Cors;
use actix_governor::{Governor, GovernorConfigBuilder};
use actix_multipart as mp;
use actix_web::{App, HttpResponse, HttpServer, middleware, web};
use diesel::sqlite::Sqlite;
use std::error::Error;
use crate::ai::InsightGenerator;
use crate::auth::login;
use crate::data::*;
use crate::database::*;
use crate::files::{RealFileSystem, move_file};
use crate::service::ServiceBuilder;
use crate::state::AppState;
use crate::tags::*;
use log::{error, info};
mod ai;
mod auth;
mod backfill;
mod clip_search;
mod clip_watch;
mod content_hash;
mod data;
mod database;
mod date_resolver;
mod duplicates;
mod error;
mod exif;
mod face_watch;
mod faces;
mod file_scan;
mod file_types;
mod files;
mod geo;
mod handlers;
mod hls_stats;
mod libraries;
mod library_maintenance;
mod perceptual_hash;
mod state;
mod tags;
mod thumbnails;
mod utils;
mod video;
mod watcher;
mod knowledge;
mod memories;
mod otel;
mod personas;
mod service;
#[cfg(test)]
mod testhelpers;
/// Diesel migrations compiled into the binary by `embed_migrations!()`.
///
/// No explicit path is passed, so diesel's default migrations-directory lookup
/// applies. Applied by `run_migrations` as the first step of `main`, before any
/// other startup work touches the database.
pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!();
fn main() -> std::io::Result<()> {
if let Err(err) = dotenv::dotenv() {
println!("Error parsing .env {:?}", err);
}
run_migrations(&mut connect()).expect("Failed to run migrations");
// Recover orphaned insight generation jobs from a previous crash.
{
use crate::database::{InsightGenerationJobDao, SqliteInsightGenerationJobDao};
let mut dao = SqliteInsightGenerationJobDao::new();
let ctx = opentelemetry::Context::new();
match dao.recover_orphaned_jobs(&ctx) {
Ok(n) if n > 0 => {
info!("Recovered {} orphaned insight generation jobs", n);
}
Ok(_) => {}
Err(e) => {
log::warn!("Failed to recover orphaned insight jobs: {:?}", e);
}
}
}
// One-shot retirement of the pre-content-hash HLS layout. Idempotent
// — a second boot finds nothing and reports zero deletions, so it's
// safe to leave wired in until the module is removed in a later
// release. Runs before the actor pipeline starts so we never race a
// PlaylistGenerator write against this rm.
{
let video_path = env::var("VIDEO_PATH").expect("VIDEO_PATH was not set in the env");
video::legacy_migration::retire_legacy_hls_output(std::path::Path::new(&video_path));
}
let system = actix::System::new();
system.block_on(async {
// Just use basic logger when running a non-release build
#[cfg(debug_assertions)]
{
env_logger::init();
}
#[cfg(not(debug_assertions))]
{
otel::init_logs();
otel::init_tracing();
}
// AppState construction loads (and seeds if needed) the libraries
// table; we use that list to drive the initial thumbnail sweep.
let app_data = Data::new(AppState::default());
// Kick thumbnail generation onto a background thread so the HTTP
// server can accept traffic while large libraries are backfilling.
// Existing thumbs are re-used (exists() check inside the walk),
// so missed files are filled in over successive scans.
{
let libs = app_data.libraries.clone();
let excluded = app_data.excluded_dirs.clone();
std::thread::spawn(move || {
thumbnails::create_thumbnails(&libs, &excluded);
});
}
// generate_video_gifs().await;
let labels = HashMap::new();
let prometheus = PrometheusMetricsBuilder::new("api")
.const_labels(labels)
.build()
.expect("Unable to build prometheus metrics middleware");
prometheus
.registry
.register(Box::new(thumbnails::IMAGE_GAUGE.clone()))
.unwrap();
prometheus
.registry
.register(Box::new(thumbnails::VIDEO_GAUGE.clone()))
.unwrap();
// HLS readiness gauges. Updated by the watcher every full-scan
// tick and on every `/hls/stats` request. See `hls_stats`.
prometheus
.registry
.register(Box::new(hls_stats::HLS_VIDEOS_TOTAL.clone()))
.unwrap();
prometheus
.registry
.register(Box::new(hls_stats::HLS_VIDEOS_WITH_PLAYLIST.clone()))
.unwrap();
prometheus
.registry
.register(Box::new(hls_stats::HLS_VIDEOS_PENDING.clone()))
.unwrap();
prometheus
.registry
.register(Box::new(hls_stats::HLS_VIDEOS_UNSUPPORTED.clone()))
.unwrap();
let app_state = app_data.clone();
// Start file watcher with playlist manager and preview generator.
// The watcher's first tick is configured to be a full scan (see
// `watch_files`), so every library's missing HLS playlists are
// queued on that first iteration — no separate startup walk
// needed.
let playlist_mgr_for_watcher = app_state.playlist_manager.as_ref().clone();
let preview_gen_for_watcher = app_state.preview_clip_generator.as_ref().clone();
// Both background jobs read from the shared `live_libraries` lock
// so a PATCH /libraries/{id} that flips `enabled` or edits
// `excluded_dirs` takes effect on the next watcher tick / cleanup
// tick without an ImageApi restart.
watcher::watch_files(
app_state.live_libraries.clone(),
playlist_mgr_for_watcher,
preview_gen_for_watcher,
app_state.face_client.clone(),
app_state.clip_client.clone(),
app_state.excluded_dirs.clone(),
app_state.library_health.clone(),
);
// Start orphaned playlist cleanup job. Multi-library aware: walks
// every configured library when looking for the source video, and
// skips the whole cycle while any library is stale (a missing
// source is indistinguishable from a transiently-unmounted share).
watcher::cleanup_orphaned_playlists(
app_state.live_libraries.clone(),
app_state.excluded_dirs.clone(),
app_state.library_health.clone(),
);
// Periodically clean up stale turn entries from the in-memory
// registry. Runs at the same interval as the configured timeout,
// drops entries older than that timeout.
{
let registry = app_state.turn_registry.clone();
let timeout_secs = registry.timeout_secs();
tokio::spawn(async move {
// Sweep at most every 5 minutes, and never less often than the
// timeout itself — otherwise entries could linger up to ~2× the
// configured timeout before being reclaimed.
let interval_secs = timeout_secs.clamp(1, 300);
let interval = tokio::time::Duration::from_secs(interval_secs);
loop {
tokio::time::sleep(interval).await;
let cleaned = registry.cleanup_stale().await;
if cleaned > 0 {
log::info!("TurnRegistry: cleaned up {cleaned} stale entries");
}
}
});
}
// Spawn background job to generate daily conversation summaries
{
use crate::ai::generate_daily_summaries;
use crate::database::{DailySummaryDao, SqliteDailySummaryDao};
use chrono::NaiveDate;
// Configure date range for summary generation
// Default: August 2024 ±30 days (July 1 - September 30, 2024)
// To expand: change start_date and end_date
let start_date = Some(NaiveDate::from_ymd_opt(2015, 10, 1).unwrap());
let end_date = Some(NaiveDate::from_ymd_opt(2020, 1, 1).unwrap());
// let contacts_to_summarize = vec!["Domenique", "Zach", "Paul"]; // Add more contacts as needed
let contacts_to_summarize = vec![]; // Add more contacts as needed
let ollama = app_state.ollama.clone();
let sms_client = app_state.sms_client.clone();
for contact in contacts_to_summarize {
let ollama_clone = ollama.clone();
let sms_client_clone = sms_client.clone();
let summary_dao: Arc<Mutex<Box<dyn DailySummaryDao>>> =
Arc::new(Mutex::new(Box::new(SqliteDailySummaryDao::new())));
let start = start_date;
let end = end_date;
tokio::spawn(async move {
info!("Starting daily summary generation for {}", contact);
if let Err(e) = generate_daily_summaries(
contact,
start,
end,
&ollama_clone,
&sms_client_clone,
summary_dao,
)
.await
{
error!("Daily summary generation failed for {}: {:?}", contact, e);
} else {
info!("Daily summary generation completed for {}", contact);
}
});
}
}
HttpServer::new(move || {
let user_dao = SqliteUserDao::new();
let favorites_dao = SqliteFavoriteDao::new();
let tag_dao = SqliteTagDao::default();
let exif_dao = SqliteExifDao::new();
let insight_dao = SqliteInsightDao::new();
let preview_dao = SqlitePreviewDao::new();
let face_dao = faces::SqliteFaceDao::new();
let cors = Cors::default()
.allowed_origin_fn(|origin, _req_head| {
// Allow all origins in development, or check against CORS_ALLOWED_ORIGINS env var
if let Ok(allowed_origins) = env::var("CORS_ALLOWED_ORIGINS") {
allowed_origins
.split(',')
.any(|allowed| origin.as_bytes() == allowed.trim().as_bytes())
} else {
// Default: allow all origins if not configured
true
}
})
.allowed_methods(vec!["GET", "POST", "PUT", "DELETE", "OPTIONS"])
.allowed_headers(vec![
actix_web::http::header::AUTHORIZATION,
actix_web::http::header::ACCEPT,
actix_web::http::header::CONTENT_TYPE,
])
.supports_credentials()
.max_age(3600);
// Configure rate limiting for login endpoint (2 requests/sec, burst of 5)
let governor_conf = GovernorConfigBuilder::default()
.per_second(2)
.burst_size(5)
.finish()
.unwrap();
App::new()
.wrap(middleware::Logger::default())
.wrap(cors)
.service(
web::resource("/login")
.wrap(Governor::new(&governor_conf))
.route(web::post().to(login::<SqliteUserDao>)),
)
.service(
web::resource("/photos")
.route(web::get().to(files::list_photos::<SqliteTagDao, RealFileSystem>)),
)
.service(
web::resource("/photos/gps-summary")
.route(web::get().to(files::get_gps_summary)),
)
.service(
web::resource("/photos/exif").route(web::get().to(files::list_exif_summary)),
)
.service(
// Semantic search via CLIP embeddings. See
// src/clip_search.rs for the request/response shape.
web::resource("/photos/search")
.route(web::get().to(clip_search::search_photos)),
)
.service(web::resource("/file/move").post(move_file::<RealFileSystem>))
.service(handlers::image::get_image)
.service(handlers::image::upload_image)
.service(handlers::video::generate_video)
.service(handlers::video::stream_hls_file)
.service(handlers::video::get_video_preview)
.service(handlers::video::get_preview_status)
.service(hls_stats::hls_stats_handler)
.service(handlers::favorites::favorites)
.service(handlers::favorites::put_add_favorite)
.service(handlers::favorites::delete_favorite)
.service(handlers::image::get_file_metadata)
.service(handlers::image::set_image_gps)
.service(handlers::image::set_image_date)
.service(handlers::image::clear_image_date)
.service(handlers::image::get_full_exif)
.service(memories::list_memories)
.service(ai::generate_insight_handler)
.service(ai::generate_agentic_insight_handler)
.service(ai::generation_status_handler)
.service(ai::cancel_generation_handler)
.service(ai::get_insight_handler)
.service(ai::delete_insight_handler)
.service(ai::get_all_insights_handler)
.service(ai::get_insight_history_handler)
.service(ai::get_available_models_handler)
.service(ai::get_openrouter_models_handler)
.service(ai::chat_turn_handler)
.service(ai::chat_stream_handler)
.service(ai::chat_history_handler)
.service(ai::chat_rewind_handler)
.service(ai::turn_async_handler)
.service(ai::turn_replay_handler)
.service(ai::cancel_turn_handler)
.service(ai::rate_insight_handler)
.service(ai::export_training_data_handler)
.service(ai::tts_speech_handler)
.service(ai::create_speech_job_handler)
.service(ai::speech_job_status_handler)
.service(ai::cancel_speech_job_handler)
.service(ai::list_voices_handler)
.service(ai::create_voice_upload_handler)
.service(ai::create_voice_from_library_handler)
.service(ai::delete_voice_handler)
.service(libraries::list_libraries)
.service(libraries::patch_library)
.add_feature(add_tag_services::<_, SqliteTagDao>)
.add_feature(knowledge::add_knowledge_services::<_, SqliteKnowledgeDao>)
.add_feature(personas::add_persona_services)
.add_feature(faces::add_face_services::<_, faces::SqliteFaceDao>)
.add_feature(duplicates::add_duplicate_services)
.app_data(app_data.clone())
.app_data::<Data<RealFileSystem>>(Data::new(RealFileSystem::new(
app_data.base_path.clone(),
)))
.app_data::<Data<Mutex<SqliteUserDao>>>(Data::new(Mutex::new(user_dao)))
.app_data::<Data<Mutex<Box<dyn FavoriteDao>>>>(Data::new(Mutex::new(Box::new(
favorites_dao,
))))
.app_data::<Data<Mutex<SqliteTagDao>>>(Data::new(Mutex::new(tag_dao)))
.app_data::<Data<Mutex<Box<dyn ExifDao>>>>(Data::new(Mutex::new(Box::new(
exif_dao,
))))
.app_data::<Data<Mutex<Box<dyn InsightDao>>>>(Data::new(Mutex::new(Box::new(
insight_dao,
))))
.app_data::<Data<Mutex<Box<dyn PreviewDao>>>>(Data::new(Mutex::new(Box::new(
preview_dao,
))))
.app_data::<Data<Mutex<SqliteKnowledgeDao>>>(Data::new(Mutex::new(
SqliteKnowledgeDao::new(),
)))
.app_data::<Data<Mutex<Box<dyn database::PersonaDao>>>>(Data::new(Mutex::new(
Box::new(database::SqlitePersonaDao::new()),
)))
.app_data::<Data<Mutex<faces::SqliteFaceDao>>>(Data::new(Mutex::new(face_dao)))
.app_data::<Data<crate::ai::face_client::FaceClient>>(Data::new(
app_data.face_client.clone(),
))
.app_data(mp::form::MultipartFormConfig::default().total_limit(1024 * 1024 * 1024)) // 1GB upload limit
.app_data(web::JsonConfig::default().error_handler(|err, req| {
let detail = err.to_string();
log::warn!(
"JSON parse error on {} {}: {}",
req.method(),
req.uri(),
detail
);
let response =
HttpResponse::BadRequest().json(serde_json::json!({"error": detail}));
actix_web::error::InternalError::from_response(err, response).into()
}))
.app_data::<Data<InsightGenerator>>(Data::new(app_data.insight_generator.clone()))
.wrap(prometheus.clone())
})
.bind(dotenv::var("BIND_URL").unwrap())?
.bind("localhost:8088")?
.run()
.await
})
}
/// Applies every embedded migration in `MIGRATIONS` that has not yet been run
/// against `connection`.
///
/// The list of newly applied versions is discarded.
///
/// # Errors
/// Returns diesel's migration error unchanged if any migration fails.
fn run_migrations(
    connection: &mut impl MigrationHarness<Sqlite>,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
    connection
        .run_pending_migrations(MIGRATIONS)
        .map(|_applied_versions| ())
}