Files
ImageApi/src/ai/tts.rs
T
Cameron Cordes 1017fe73af Include start offset in voice-name window tag
Clones that don't start at 0:00 are tagged with where the reference
window begins (grandma-at1m32s-30s), so voices cloned from different
sections of the same source are distinguishable in the voice list.
Zero-start names keep the existing -30s form.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-12 16:21:41 -04:00

1279 lines
49 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// TTS endpoints: proxy text-to-speech + voice-library management to the
// Chatterbox server that sits behind llama-swap (via LlamaCppClient). Speech
// synthesis returns audio as base64-in-JSON so the mobile app can play it as a
// `data:` URI without a binary-fetch path. Voice cloning registers a named
// voice from either an uploaded clip (device) or an existing library file
// (audio read directly; video has its audio track extracted via ffmpeg).
use actix_multipart::Multipart;
use actix_web::{HttpRequest, HttpResponse, Responder, delete, get, post, web};
use anyhow::Context;
use base64::Engine;
use bytes::{BufMut, BytesMut};
use futures::StreamExt;
use opentelemetry::KeyValue;
use opentelemetry::trace::{Span, Status, Tracer};
use regex::Regex;
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use std::collections::HashMap;
use std::path::Path;
use std::sync::{LazyLock, Mutex as StdMutex};
use std::time::{Duration, Instant};
use tokio::sync::Semaphore;
use uuid::Uuid;
use crate::data::Claims;
use crate::file_types::{is_audio_file, is_video_file};
use crate::files::is_valid_full_path;
use crate::libraries;
use crate::otel::{extract_context_from_request, global_tracer};
use crate::state::AppState;
/// Largest voice-reference upload we are willing to buffer: 25 MiB.
/// Chatterbox enforces its own (much smaller, roughly 60s-of-audio) limit
/// downstream; this ceiling only exists so an oversized or hostile upload is
/// turned away before it can inflate ImageApi's memory.
const MAX_VOICE_UPLOAD_BYTES: usize = 25 << 20;
/// Number of syntheses allowed to run against Chatterbox at the same time.
const TTS_MAX_CONCURRENT: usize = 1;
/// Gate that serializes speech synthesis. Chatterbox has no lock or queue of
/// its own, so parallel requests fight over the single GPU and snowball into
/// timeouts. The synchronous endpoint only `try_acquire`s and answers 429 when
/// the permit is taken (the app shows "busy" right away; typical jobs finish
/// well within a minute), while async jobs wait on it. An upstream job whose
/// client gave up can still hold the GPU until it completes — a limitation of
/// the wrapper that the chunked-queue plan is meant to remove.
static TTS_PERMIT: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(TTS_MAX_CONCURRENT));
// --- Voice-list cache --------------------------------------------------------
/// Most recent voice-library listing fetched from upstream, kept as raw JSON.
/// Asking llama-swap's `/upstream/<model>/voices` passthrough for a listing
/// loads the TTS model (possibly evicting the resident LLM), so listings are
/// served from here. Upstream is only consulted when the cache is cold, when
/// the caller passes `?refresh=1`, or after a voice create/delete has cleared
/// it (at which point the TTS model is loaded anyway).
static VOICES_CACHE: LazyLock<StdMutex<Option<Value>>> = LazyLock::new(|| StdMutex::new(None));

/// Copy of the cached listing, or `None` while the cache is cold.
fn cached_voices() -> Option<Value> {
    let slot = VOICES_CACHE.lock().unwrap();
    slot.as_ref().cloned()
}

/// Overwrite the cached listing with a copy of `v`.
fn store_voices_cache(v: &Value) {
    let mut slot = VOICES_CACHE.lock().unwrap();
    *slot = Some(v.clone());
}

/// Empty the cache so the next listing goes to upstream.
fn invalidate_voices_cache() {
    let mut slot = VOICES_CACHE.lock().unwrap();
    *slot = None;
}
// --- Async speech jobs -------------------------------------------------------
//
// A long insight can take minutes to synthesize — too long to hold one HTTP
// request open from a phone that may background the app or lose signal. The
// durable flow: POST /tts/speech/jobs answers at once with a job id, a spawned
// task performs the synthesis (waiting its turn on TTS_PERMIT rather than
// fast-failing with 429), and the client polls GET /tts/speech/jobs/{id}
// until it can collect the audio. Jobs live only in memory — intentionally
// lighter than the chat TurnRegistry — so a restart loses them; the client
// reports that and retries.

/// Lifecycle state of an async speech job (serialized in snake_case).
#[derive(Clone, Copy, PartialEq, Eq, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum TtsJobStatus {
    /// Waiting for the synthesis permit and GPU lease.
    Queued,
    /// Synthesis request in flight.
    Running,
    /// Audio is ready to collect.
    Done,
    /// Synthesis failed; the job carries an error message.
    Error,
    /// Cancelled by the client before finishing.
    Cancelled,
}

impl TtsJobStatus {
    /// Whether the job has reached a final state and can no longer change.
    fn is_terminal(self) -> bool {
        match self {
            Self::Queued | Self::Running => false,
            Self::Done | Self::Error | Self::Cancelled => true,
        }
    }
}
/// In-memory record of one async speech job.
struct TtsJob {
    /// Current lifecycle state.
    status: TtsJobStatus,
    /// Audio container requested by the client; echoed back on polls.
    format: String,
    /// Base64 audio, filled in only when the job finishes successfully.
    audio_base64: Option<String>,
    /// Failure message, filled in only when the job errors.
    error: Option<String>,
    /// Dispatch time; bounds the job's total lifetime (TTS_JOB_MAX_AGE).
    created_at: Instant,
    /// Time the job became terminal; starts the result TTL.
    finished_at: Option<Instant>,
    /// Handle to abort the spawned task; cleared once the job is terminal.
    abort: Option<tokio::task::AbortHandle>,
}

/// How long a finished job's result stays collectable, so a client that lost
/// connectivity can still pick it up on a later poll (10 minutes).
const TTS_JOB_RESULT_TTL: Duration = Duration::from_secs(600);
/// Absolute lifetime of any job, measured from dispatch (30 minutes). Older
/// jobs are dropped outright, aborting the task first if it is still running.
const TTS_JOB_MAX_AGE: Duration = Duration::from_secs(1800);
/// Registry of speech jobs keyed by job id. Expired entries are removed
/// lazily by `sweep_stale_jobs`.
static TTS_JOBS: LazyLock<StdMutex<HashMap<Uuid, TtsJob>>> =
    LazyLock::new(|| StdMutex::new(HashMap::new()));
/// Remove every job whose finished result is older than TTS_JOB_RESULT_TTL,
/// or whose total age (since dispatch) has reached TTS_JOB_MAX_AGE. A job
/// removed for age has its task aborted first, in case it is still running.
fn sweep_stale_jobs(jobs: &mut HashMap<Uuid, TtsJob>, now: Instant) {
    jobs.retain(|_, job| {
        let aged_out = now.duration_since(job.created_at) >= TTS_JOB_MAX_AGE;
        let result_stale = match job.finished_at {
            Some(done_at) => now.duration_since(done_at) >= TTS_JOB_RESULT_TTL,
            None => false,
        };
        let leftover_task = if aged_out { job.abort.take() } else { None };
        if let Some(handle) = leftover_task {
            handle.abort();
        }
        !result_stale && !aged_out
    });
}
/// Apply `f` to the job registered under `id`, holding the registry lock for
/// the duration of the call. Yields `None` when no such job exists (never
/// created, or already swept).
fn with_job<R>(id: Uuid, f: impl FnOnce(&mut TtsJob) -> R) -> Option<R> {
    let mut jobs = TTS_JOBS.lock().unwrap();
    let job = jobs.get_mut(&id)?;
    Some(f(job))
}
/// Record a job's final outcome. Only the first terminal transition sticks:
/// if the job is already terminal (for example, a cancel that beat the
/// completion), this is a no-op. A missing job is ignored.
fn finish_job(id: Uuid, status: TtsJobStatus, audio_base64: Option<String>, error: Option<String>) {
    with_job(id, |job| {
        if !job.status.is_terminal() {
            job.finished_at = Some(Instant::now());
            job.abort = None;
            job.status = status;
            job.audio_base64 = audio_base64;
            job.error = error;
        }
    });
}
/// Sanitize a user-supplied voice name. The name is forwarded to Chatterbox,
/// where it becomes a filename in the voice-library directory, so it is
/// restricted to a safe charset: ASCII alphanumerics, `-` and `_`. Every
/// other character (path separators, dots, whitespace, non-ASCII) becomes
/// `-`. Leading/trailing dashes are trimmed and the result is capped at 64
/// chars. The cap is applied *before* the final trim, so a cut that lands on
/// a separator doesn't leave a dangling `-`.
///
/// Returns `None` when nothing usable remains.
fn sanitize_voice_name(raw: &str) -> Option<String> {
    const MAX_LEN: usize = 64;
    let mapped: String = raw
        .trim()
        .chars()
        .map(|c| {
            if c.is_ascii_alphanumeric() || c == '-' || c == '_' {
                c
            } else {
                '-'
            }
        })
        .collect();
    let trimmed = mapped.trim_matches('-');
    // Every character in `mapped` is ASCII, so a byte-length cut is char-safe.
    let bounded = &trimmed[..trimmed.len().min(MAX_LEN)];
    // Truncation can expose a separator at the cut point; trim it again.
    let cleaned = bounded.trim_end_matches('-');
    (!cleaned.is_empty()).then(|| cleaned.to_string())
}
/// Upper bound, in seconds, on the reference clip used for voice cloning.
/// Chatterbox is zero-shot: a clean sample of roughly 10 to 20 seconds is the
/// sweet spot, and longer clips rarely help. Override with
/// `LLAMA_SWAP_TTS_REF_SECONDS`. Missing, unparsable or zero values fall back
/// to 30.
fn tts_ref_seconds() -> u32 {
    const DEFAULT_REF_SECONDS: u32 = 30;
    match std::env::var("LLAMA_SWAP_TTS_REF_SECONDS") {
        Ok(raw) => match raw.trim().parse::<u32>() {
            Ok(secs) if secs > 0 => secs,
            _ => DEFAULT_REF_SECONDS,
        },
        Err(_) => DEFAULT_REF_SECONDS,
    }
}
/// Tag a (sanitized) voice name with the reference window used to create it.
/// `grandma` becomes `grandma-30s` for a window taken from the start, or
/// `grandma-at1m32s-30s` for a 30s window starting at 1:32. The tag makes the
/// window visible in the voice list, so clones of the same source taken from
/// different sections can be compared.
///
/// `start` is rounded to whole seconds for the tag, and negative values count
/// as 0. The append is skipped when `name` already ends in exactly this tag.
/// A zero-start tag (`-30s`) is NOT considered present when it is only the
/// tail of a start-offset tag (`...-at1m32s-30s`), because that name describes
/// a different window. The result stays within 64 chars by truncating the base
/// name, never the tag.
fn append_ref_window(name: &str, start: f64, secs: u32) -> String {
    const MAX_NAME_LEN: usize = 64;
    let start_whole = start.round().max(0.0) as u64;
    let suffix = if start_whole > 0 {
        // ':' isn't in the safe voice-name charset, so 1:32 becomes 1m32s.
        let at = if start_whole >= 60 {
            format!("at{}m{:02}s", start_whole / 60, start_whole % 60)
        } else {
            format!("at{start_whole}s")
        };
        format!("-{at}-{secs}s")
    } else {
        format!("-{secs}s")
    };
    if let Some(stem) = name.strip_suffix(suffix.as_str()) {
        // For a zero-start suffix, "-30s" also matches the end of
        // "x-at1m32s-30s"; only treat the tag as present when no start-offset
        // tag immediately precedes it.
        let stem_has_start_tag = stem.rsplit('-').next().is_some_and(is_start_offset_tag);
        if start_whole > 0 || !stem_has_start_tag {
            return name.to_string();
        }
    }
    let max_base = MAX_NAME_LEN.saturating_sub(suffix.len());
    let base: String = name.chars().take(max_base).collect();
    let base = base.trim_end_matches('-');
    format!("{base}{suffix}")
}

/// True for one dash-separated segment shaped like the start-offset tag that
/// `append_ref_window` emits: `at{S}s` or `at{M}m{SS}s` (ASCII digits only).
fn is_start_offset_tag(segment: &str) -> bool {
    fn all_digits(s: &str) -> bool {
        !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit())
    }
    let Some(body) = segment.strip_prefix("at").and_then(|s| s.strip_suffix('s')) else {
        return false;
    };
    match body.split_once('m') {
        Some((mins, secs)) => all_digits(mins) && secs.len() == 2 && all_digits(secs),
        None => all_digits(body),
    }
}
/// Turn the caller's optional reference window into concrete
/// `(start, duration)` seconds for ffmpeg.
///
/// * `start_seconds` defaults to 0 and must be finite and non-negative.
/// * `duration_seconds` defaults to the `tts_ref_seconds()` cap, must be
///   finite and positive, and is quietly clamped to that cap. The cap is the
///   most reference audio the backend benefits from, so longer requests are
///   bounded rather than rejected.
///
/// Returns `Err` with a client-facing message for invalid input. The start is
/// validated before the duration.
fn resolve_ref_window(
    start_seconds: Option<f64>,
    duration_seconds: Option<f64>,
) -> Result<(f64, f64), String> {
    let cap = f64::from(tts_ref_seconds());

    let start = start_seconds.unwrap_or_default();
    let start_ok = start.is_finite() && start >= 0.0;
    if !start_ok {
        return Err(String::from("start_seconds must be a non-negative number"));
    }

    let requested = duration_seconds.unwrap_or(cap);
    let duration_ok = requested.is_finite() && requested > 0.0;
    if !duration_ok {
        return Err(String::from("duration_seconds must be a positive number"));
    }

    Ok((start, requested.min(cap)))
}
/// Voice used for synthesis when a request doesn't name one, taken from
/// `LLAMA_SWAP_TTS_VOICE` (e.g. set it to a cloned voice's name to read
/// insights in that voice by default). Unset or blank means no default.
fn default_voice() -> Option<String> {
    let raw = std::env::var("LLAMA_SWAP_TTS_VOICE").ok()?;
    let trimmed = raw.trim();
    (!trimmed.is_empty()).then(|| trimmed.to_string())
}
// Markdown / formatting strippers, each compiled once on first use. Insight
// text arrives as markdown, which TTS would otherwise read literally ("star
// star bold star star").

/// Compile one of the fixed patterns below. They are literals, so a failure
/// here is a programming error and panicking on first use is acceptable.
fn compile_static_regex(pattern: &str) -> Regex {
    Regex::new(pattern).unwrap()
}

/// `![alt](url)`; capture 1 is the alt text.
static MD_IMAGE: LazyLock<Regex> =
    LazyLock::new(|| compile_static_regex(r"!\[([^\]]*)\]\([^)]*\)"));
/// `[text](url)`; capture 1 is the link text.
static MD_LINK: LazyLock<Regex> =
    LazyLock::new(|| compile_static_regex(r"\[([^\]]+)\]\([^)]*\)"));
/// Heading marker at line start: up to 3 whitespace chars, 1 to 6 `#`, then
/// any whitespace.
static MD_HEADING: LazyLock<Regex> =
    LazyLock::new(|| compile_static_regex(r"(?m)^\s{0,3}#{1,6}\s*"));
/// Blockquote marker `>` at line start, plus one optional following space.
static MD_BLOCKQUOTE: LazyLock<Regex> =
    LazyLock::new(|| compile_static_regex(r"(?m)^\s{0,3}>\s?"));
/// List marker at line start (`-`, `*`, `+`, or `1.`) followed by whitespace.
static MD_LIST: LazyLock<Regex> =
    LazyLock::new(|| compile_static_regex(r"(?m)^\s{0,3}([-*+]|\d+\.)\s+"));
/// Runs of emphasis/code/strikethrough characters: `*`, `_`, backtick, `~`.
static MD_EMPHASIS: LazyLock<Regex> = LazyLock::new(|| compile_static_regex(r"[*_`~]+"));
/// Bare http(s) URLs, up to the next whitespace.
static URL_RE: LazyLock<Regex> = LazyLock::new(|| compile_static_regex(r"https?://\S+"));
/// Two or more consecutive spaces/tabs.
static MULTISPACE: LazyLock<Regex> = LazyLock::new(|| compile_static_regex(r"[ \t]{2,}"));
// Any run of two or more newlines (including whitespace-only blank lines)
// collapses to ONE newline. Chatterbox inserts a long pause (sometimes ~20s
// of silence) for every blank line, so paragraph breaks must reach it as a
// single line break at most.
static MULTINEWLINE: LazyLock<Regex> =
    LazyLock::new(|| compile_static_regex(r"\n(?:[ \t]*\n)+"));
/// True for emoji / pictographic symbols, which most TTS models either skip or
/// mispronounce. Covers the main emoji blocks plus dingbats, misc-technical
/// and misc-arrows symbols. It also covers the invisible glue of emoji
/// sequences:
/// * variation selectors and the zero-width joiner;
/// * the combining enclosing keycap U+20E3 (keycap emoji are a digit or `#`
///   followed by U+FE0F and U+20E3, so stripping it leaves the plain digit);
/// * Unicode tag characters U+E0020..=U+E007F, which spell out subdivision
///   flags (England, Scotland, Wales) after U+1F3F4.
///
/// We do NOT strip `[bracketed]` tags: non-turbo Chatterbox ignores them, and
/// a future Turbo switch uses them as paralinguistic cues.
fn is_emoji_like(c: char) -> bool {
    matches!(u32::from(c),
        0x1F000..=0x1FAFF    // emoji, pictographs, supplemental symbols, flags
        | 0x2300..=0x23FF    // misc technical (⌚ ⏰ ⏳ …)
        | 0x2600..=0x27BF    // misc symbols + dingbats
        | 0x2B00..=0x2BFF    // misc symbols & arrows (★ ⬆ …)
        | 0xFE00..=0xFE0F    // variation selectors
        | 0x200D             // zero-width joiner
        | 0x20E3             // combining enclosing keycap
        | 0xE0020..=0xE007F  // tag characters (subdivision-flag sequences)
    )
}
/// Normalize insight text for speech. It unwraps markdown links/images to
/// their visible text and drops heading, list, blockquote and emphasis
/// markers as well as URLs. It also strips emoji and collapses whitespace
/// (runs of blank lines become a single newline). Centralized here so every
/// caller (app, WebUI, curl) gets clean audio.
///
/// Line endings are normalized to `\n` first. The line-anchored patterns and
/// the blank-line collapse only recognize `\n`, so CRLF input would otherwise
/// keep its blank lines, which Chatterbox turns into long pauses.
fn clean_for_tts(input: &str) -> String {
    let input = input.replace("\r\n", "\n").replace('\r', "\n");
    let s = MD_IMAGE.replace_all(&input, "$1");
    let s = MD_LINK.replace_all(&s, "$1");
    let s = MD_HEADING.replace_all(&s, "");
    let s = MD_BLOCKQUOTE.replace_all(&s, "");
    let s = MD_LIST.replace_all(&s, "");
    let s = MD_EMPHASIS.replace_all(&s, "");
    let s = URL_RE.replace_all(&s, " ");
    let s: String = s.chars().filter(|c| !is_emoji_like(*c)).collect();
    let s = MULTISPACE.replace_all(&s, " ");
    let s = MULTINEWLINE.replace_all(&s, "\n");
    s.trim().to_string()
}
/// Turn raw insight text into what gets sent for synthesis. First strip
/// markdown and emoji via `clean_for_tts`, then apply the user's
/// pronunciation overrides (see [`crate::ai::pronunciation`]). Overrides run
/// on the already-plain text so markup like `**WSL**` can't hide word
/// boundaries from them.
fn prepare_for_tts(input: &str) -> String {
    let plain = clean_for_tts(input);
    crate::ai::pronunciation::apply_pronunciations(&plain)
}
/// Decode an audio/video file to mono 24 kHz WAV via ffmpeg and return the
/// WAV bytes. Chatterbox validates the reference clip by file *extension* and
/// rejects several formats (e.g. `.aac`, `.opus`), so we always normalize to
/// WAV regardless of the source container. Extracts `duration` seconds
/// starting at `start` (see resolve_ref_window). References only need a few
/// seconds of clean speech, which may sit anywhere in a long recording.
///
/// # Errors
/// Fails if the temp output can't be created or read back, if ffmpeg can't
/// be spawned, or if ffmpeg exits unsuccessfully. In the last case the error
/// carries the exit status and ffmpeg's error-level stderr.
async fn run_ffmpeg_to_wav(input_path: &str, start: f64, duration: f64) -> anyhow::Result<Vec<u8>> {
    let out = tempfile::Builder::new()
        .suffix(".wav")
        .tempfile()
        .context("creating temp wav")?;
    let out_s = out.path().to_string_lossy().to_string();
    let start_s = format!("{start}");
    let secs = format!("{duration}");
    // Limit stderr to real errors. By default ffmpeg prints its version banner
    // and full build configuration first, which would bury the actual failure
    // reason in the error we return and log.
    let mut args: Vec<&str> = vec!["-hide_banner", "-loglevel", "error", "-y"];
    // -ss before -i is input seeking: fast, and frame accuracy doesn't matter
    // for picking a speech window.
    if start > 0.0 {
        args.extend(["-ss", &start_s]);
    }
    args.extend([
        "-i", input_path, "-vn", "-ac", "1", "-ar", "24000", "-t", &secs, "-f", "wav", &out_s,
    ]);
    let output = tokio::process::Command::new("ffmpeg")
        .args(&args)
        .output()
        .await
        .context("spawning ffmpeg")?;
    if !output.status.success() {
        let stderr = String::from_utf8_lossy(&output.stderr);
        anyhow::bail!("ffmpeg failed ({}): {}", output.status, stderr.trim());
    }
    // `out` is still alive here, so the temp file hasn't been deleted yet.
    std::fs::read(&out_s).context("reading transcoded audio")
}
/// Normalize an in-memory upload to WAV. The bytes are staged in a temp file,
/// which keeps the source extension (or `.bin` when there is none) as a probe
/// hint for ffmpeg, and the `[start, start + duration)` window is then
/// transcoded with `run_ffmpeg_to_wav`. The temp file lives until this
/// function returns.
async fn transcode_bytes_to_wav(
    input: &[u8],
    src_ext: Option<&str>,
    start: f64,
    duration: f64,
) -> anyhow::Result<Vec<u8>> {
    let suffix = match src_ext {
        Some(ext) if !ext.is_empty() => format!(".{ext}"),
        _ => String::from(".bin"),
    };
    let staged = tempfile::Builder::new()
        .suffix(&suffix)
        .tempfile()
        .context("creating temp input")?;
    std::fs::write(staged.path(), input).context("writing temp input")?;
    let staged_path = staged.path().to_string_lossy();
    run_ffmpeg_to_wav(&staged_path, start, duration).await
}
/// JSON body shared by `POST /tts/speech` and `POST /tts/speech/jobs`.
#[derive(Debug, Deserialize)]
pub struct TtsSpeechRequest {
    /// Text to speak. Markdown and emoji are cleaned server-side first.
    pub text: String,
    /// Named library voice. Absent or empty falls back to the configured
    /// default voice, if any.
    #[serde(default)]
    pub voice: Option<String>,
    /// Audio container, e.g. `"mp3"` (default) or `"wav"`.
    #[serde(default)]
    pub format: Option<String>,
    /// Chatterbox emotion intensity, clamped server-side to 0.25 to 2.0.
    #[serde(default)]
    pub exaggeration: Option<f32>,
    /// Chatterbox pacing, clamped to 0.0 to 1.0. Around 0.3 suits fast
    /// speakers; 0 neutralizes a reference accent.
    #[serde(default)]
    pub cfg_weight: Option<f32>,
    /// Chatterbox sampling randomness, clamped to 0.05 to 5.0.
    #[serde(default)]
    pub temperature: Option<f32>,
}

/// Result of a synchronous synthesis: base64 audio plus the container it is
/// encoded in (the requested format, or the `mp3` default).
#[derive(Debug, Serialize)]
pub struct TtsSpeechResponse {
    pub audio_base64: String,
    pub format: String,
}
/// POST /tts/speech — synthesize `text` (optionally in a named `voice`) and
/// return base64-encoded audio for `data:` URI playback on the client.
#[post("/tts/speech")]
pub async fn tts_speech_handler(
http_request: HttpRequest,
_claims: Claims,
req: web::Json<TtsSpeechRequest>,
app_state: web::Data<AppState>,
) -> impl Responder {
let parent_context = extract_context_from_request(&http_request);
let mut span = global_tracer().start_with_context("http.tts.speech", &parent_context);
let text = prepare_for_tts(&req.text);
if text.is_empty() {
span.set_status(Status::error("text is required"));
return HttpResponse::BadRequest().json(json!({ "error": "text is required" }));
}
let Some(client) = app_state.llamacpp.as_ref() else {
span.set_status(Status::error("tts backend not configured"));
return HttpResponse::ServiceUnavailable()
.json(json!({ "error": "TTS backend not configured (set LLAMA_SWAP_URL)" }));
};
let format = req
.format
.as_deref()
.filter(|s| !s.is_empty())
.unwrap_or("mp3");
let dv = default_voice();
let voice = req
.voice
.as_deref()
.filter(|s| !s.is_empty())
.or(dv.as_deref());
span.set_attribute(KeyValue::new("tts.model", client.tts_model.clone()));
span.set_attribute(KeyValue::new("tts.format", format.to_string()));
span.set_attribute(KeyValue::new("tts.has_voice", voice.is_some()));
span.set_attribute(KeyValue::new("tts.text_len", text.len() as i64));
// Clamp generation knobs to Chatterbox's documented ranges before forwarding.
let exaggeration = req.exaggeration.map(|x| x.clamp(0.25, 2.0));
let cfg_weight = req.cfg_weight.map(|x| x.clamp(0.0, 1.0));
let temperature = req.temperature.map(|x| x.clamp(0.05, 5.0));
// One synthesis at a time (see TTS_PERMIT) — fast-fail when busy.
let Ok(_permit) = TTS_PERMIT.try_acquire() else {
span.set_status(Status::error("tts busy"));
return HttpResponse::TooManyRequests().json(json!({
"error": "TTS is busy with another request — try again shortly"
}));
};
// Wait for the LLM side to release the GPU before sending — the synthesis
// timeout starts at send, not here (see ai::gpu).
let _gpu = crate::ai::gpu::tts_lease().await;
match client
.text_to_speech(&text, voice, format, exaggeration, cfg_weight, temperature)
.await
{
Ok(bytes) => {
span.set_attribute(KeyValue::new("tts.audio_bytes", bytes.len() as i64));
span.set_status(Status::Ok);
let audio_base64 = base64::engine::general_purpose::STANDARD.encode(&bytes);
HttpResponse::Ok().json(TtsSpeechResponse {
audio_base64,
format: format.to_string(),
})
}
Err(e) => {
span.set_status(Status::error("tts synthesis failed"));
log::error!("TTS synth failed: {:?}", e);
HttpResponse::BadGateway().json(json!({ "error": format!("TTS failed: {e}") }))
}
}
}
/// 202 body returned when a speech job is dispatched.
#[derive(Debug, Serialize)]
pub struct TtsJobCreatedResponse {
    /// Job id (UUID string) to poll or cancel.
    pub job_id: String,
    /// State at creation; the handler always reports `queued`.
    pub status: TtsJobStatus,
}

/// Body returned when polling a speech job. `audio_base64` and `error` are
/// left out of the JSON entirely unless they are set.
#[derive(Debug, Serialize)]
pub struct TtsJobStatusResponse {
    /// Job id (UUID string).
    pub job_id: String,
    /// Current lifecycle state.
    pub status: TtsJobStatus,
    /// Container the audio is (or will be) encoded in.
    pub format: String,
    /// Base64 audio; present once the job is done.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub audio_base64: Option<String>,
    /// Failure message; present when the job errored.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
}
/// POST /tts/speech/jobs — durable variant of /tts/speech for long syntheses.
/// Returns 202 + a job id immediately; the synth queues on the single GPU
/// permit (instead of fast-failing 429) and the client polls the job until
/// the audio is ready.
///
/// Request validation (empty text gives 400, no backend gives 503) happens
/// here, so those failures come back synchronously instead of as an errored
/// job. The request span is not moved into the spawned task. It therefore
/// covers dispatch only; the synthesis outcome is recorded on the job itself
/// and logged on failure.
#[post("/tts/speech/jobs")]
pub async fn create_speech_job_handler(
    http_request: HttpRequest,
    _claims: Claims,
    req: web::Json<TtsSpeechRequest>,
    app_state: web::Data<AppState>,
) -> impl Responder {
    let parent_context = extract_context_from_request(&http_request);
    let mut span =
        global_tracer().start_with_context("http.tts.speech_job.create", &parent_context);
    let text = prepare_for_tts(&req.text);
    if text.is_empty() {
        span.set_status(Status::error("text is required"));
        return HttpResponse::BadRequest().json(json!({ "error": "text is required" }));
    }
    // Checked up front for a synchronous 503. The task re-resolves the client
    // from its own clone of the app state below.
    if app_state.llamacpp.is_none() {
        span.set_status(Status::error("tts backend not configured"));
        return HttpResponse::ServiceUnavailable()
            .json(json!({ "error": "TTS backend not configured (set LLAMA_SWAP_URL)" }));
    }
    // Owned values (String / Option<String>) because they move into the
    // spawned task, which outlives this request.
    let format = req
        .format
        .as_deref()
        .filter(|s| !s.is_empty())
        .unwrap_or("mp3")
        .to_string();
    let voice = req
        .voice
        .clone()
        .filter(|s| !s.is_empty())
        .or_else(default_voice);
    // Clamp generation knobs to Chatterbox's documented ranges before forwarding.
    let exaggeration = req.exaggeration.map(|x| x.clamp(0.25, 2.0));
    let cfg_weight = req.cfg_weight.map(|x| x.clamp(0.0, 1.0));
    let temperature = req.temperature.map(|x| x.clamp(0.05, 5.0));
    span.set_attribute(KeyValue::new("tts.format", format.clone()));
    span.set_attribute(KeyValue::new("tts.has_voice", voice.is_some()));
    span.set_attribute(KeyValue::new("tts.text_len", text.len() as i64));
    let job_id = Uuid::new_v4();
    // Register the job before spawning (sweeping expired ones while the lock
    // is held), so the task and any immediate poll/cancel always find it.
    {
        let mut jobs = TTS_JOBS.lock().unwrap();
        sweep_stale_jobs(&mut jobs, Instant::now());
        jobs.insert(
            job_id,
            TtsJob {
                status: TtsJobStatus::Queued,
                format: format.clone(),
                audio_base64: None,
                error: None,
                created_at: Instant::now(),
                finished_at: None,
                abort: None,
            },
        );
    }
    let state = app_state.clone();
    let handle = tokio::spawn(async move {
        // Queue rather than fast-fail: jobs wait their turn for the GPU.
        let _permit = match TTS_PERMIT.acquire().await {
            Ok(p) => p,
            Err(_) => {
                finish_job(
                    job_id,
                    TtsJobStatus::Error,
                    None,
                    Some("TTS queue closed".to_string()),
                );
                return;
            }
        };
        // Wait for the LLM side to release the GPU too (see ai::gpu) — only
        // then does the job count as running. The synthesis timeout starts at
        // the HTTP send below, so neither wait burns it, and the client can
        // anchor its own deadline to the queued→running transition.
        let _gpu = crate::ai::gpu::tts_lease().await;
        // Cancelled while queued — release the permits without synthesizing.
        // The Queued→Running flip happens under the registry lock (via
        // with_job), so a cancel either lands before it (seen here) or after
        // it (the job is already Running). A job that no longer exists
        // (swept) is treated the same as a cancelled one.
        let cancelled = with_job(job_id, |job| {
            if job.status == TtsJobStatus::Queued {
                job.status = TtsJobStatus::Running;
                false
            } else {
                true
            }
        })
        .unwrap_or(true);
        if cancelled {
            return;
        }
        let Some(client) = state.llamacpp.as_ref() else {
            finish_job(
                job_id,
                TtsJobStatus::Error,
                None,
                Some("TTS backend not configured".to_string()),
            );
            return;
        };
        // finish_job is first-terminal-write-wins, so a cancel that raced
        // this synthesis keeps the Cancelled state and the result is dropped.
        match client
            .text_to_speech(
                &text,
                voice.as_deref(),
                &format,
                exaggeration,
                cfg_weight,
                temperature,
            )
            .await
        {
            Ok(bytes) => {
                let audio = base64::engine::general_purpose::STANDARD.encode(&bytes);
                finish_job(job_id, TtsJobStatus::Done, Some(audio), None);
            }
            Err(e) => {
                log::error!("TTS job {job_id} failed: {:?}", e);
                finish_job(
                    job_id,
                    TtsJobStatus::Error,
                    None,
                    Some(format!("TTS failed: {e}")),
                );
            }
        }
    });
    // Aborting an already-finished task is a no-op, so this late install is
    // safe even if the job raced to completion.
    with_job(job_id, |job| {
        if !job.status.is_terminal() {
            job.abort = Some(handle.abort_handle());
        }
    });
    span.set_status(Status::Ok);
    HttpResponse::Accepted().json(TtsJobCreatedResponse {
        job_id: job_id.to_string(),
        status: TtsJobStatus::Queued,
    })
}
/// GET /tts/speech/jobs/{id} — poll a speech job; returns the audio once done.
/// 404s after the job expires (results are kept ~10 min past completion).
#[get("/tts/speech/jobs/{id}")]
pub async fn speech_job_status_handler(
http_request: HttpRequest,
_claims: Claims,
path: web::Path<String>,
) -> impl Responder {
let parent_context = extract_context_from_request(&http_request);
let mut span =
global_tracer().start_with_context("http.tts.speech_job.status", &parent_context);
let Ok(id) = Uuid::parse_str(&path.into_inner()) else {
span.set_status(Status::error("invalid job id"));
return HttpResponse::BadRequest().json(json!({ "error": "invalid job id" }));
};
let resp = {
let jobs = TTS_JOBS.lock().unwrap();
jobs.get(&id).map(|job| TtsJobStatusResponse {
job_id: id.to_string(),
status: job.status,
format: job.format.clone(),
audio_base64: job.audio_base64.clone(),
error: job.error.clone(),
})
};
match resp {
Some(r) => {
span.set_status(Status::Ok);
HttpResponse::Ok().json(r)
}
None => {
span.set_status(Status::error("job not found"));
HttpResponse::NotFound()
.json(json!({ "error": "TTS job not found (it may have expired)" }))
}
}
}
/// DELETE /tts/speech/jobs/{id} — cancel a queued/running speech job. Once the
/// upstream GPU job has started it can't be interrupted (same wrapper
/// limitation as the sync path); cancelling stops the wait and discards the
/// result. Cancelling an already-finished job leaves it terminal.
#[delete("/tts/speech/jobs/{id}")]
pub async fn cancel_speech_job_handler(
http_request: HttpRequest,
_claims: Claims,
path: web::Path<String>,
) -> impl Responder {
let parent_context = extract_context_from_request(&http_request);
let mut span =
global_tracer().start_with_context("http.tts.speech_job.cancel", &parent_context);
let Ok(id) = Uuid::parse_str(&path.into_inner()) else {
span.set_status(Status::error("invalid job id"));
return HttpResponse::BadRequest().json(json!({ "error": "invalid job id" }));
};
let status = with_job(id, |job| {
if !job.status.is_terminal() {
if let Some(h) = job.abort.take() {
h.abort();
}
job.status = TtsJobStatus::Cancelled;
job.finished_at = Some(Instant::now());
}
job.status
});
match status {
Some(s) => {
span.set_status(Status::Ok);
HttpResponse::Ok().json(json!({ "job_id": id.to_string(), "status": s }))
}
None => {
span.set_status(Status::error("job not found"));
HttpResponse::NotFound()
.json(json!({ "error": "TTS job not found (it may have expired)" }))
}
}
}
/// Query string accepted by `GET /tts/voices`.
#[derive(Debug, Deserialize)]
pub struct ListVoicesQuery {
    /// Set to `1`, `true` or `yes` to skip the voice-list cache and query
    /// upstream again (which may spin up the TTS model).
    #[serde(default)]
    pub refresh: Option<String>,
}
/// GET /tts/voices — list the Chatterbox voice library.
///
/// Answered from VOICES_CACHE whenever possible, so browsing settings doesn't
/// make llama-swap load the TTS model and evict the resident LLM. Upstream is
/// only queried on a cold cache or with `?refresh=1|true|yes`, and a
/// successful upstream listing refreshes the cache. Note that a listing
/// fetched concurrently with a voice create/delete can re-store the
/// pre-change list; `?refresh=1` recovers from that.
#[get("/tts/voices")]
pub async fn list_voices_handler(
    http_request: HttpRequest,
    _claims: Claims,
    query: web::Query<ListVoicesQuery>,
    app_state: web::Data<AppState>,
) -> impl Responder {
    let parent_context = extract_context_from_request(&http_request);
    let mut span = global_tracer().start_with_context("http.tts.voices.list", &parent_context);

    let bypass_cache = matches!(query.refresh.as_deref(), Some("1" | "true" | "yes"));
    let cached = if bypass_cache { None } else { cached_voices() };
    if let Some(voices) = cached {
        span.set_attribute(KeyValue::new("tts.voices_cache_hit", true));
        span.set_status(Status::Ok);
        return HttpResponse::Ok().json(voices);
    }

    let client = match app_state.llamacpp.as_ref() {
        Some(client) => client,
        None => {
            span.set_status(Status::error("tts backend not configured"));
            return HttpResponse::ServiceUnavailable()
                .json(json!({ "error": "TTS backend not configured" }));
        }
    };
    let fetched = client.list_voices().await;
    match fetched {
        Ok(voices) => {
            store_voices_cache(&voices);
            span.set_attribute(KeyValue::new("tts.voices_cache_hit", false));
            span.set_status(Status::Ok);
            HttpResponse::Ok().json(voices)
        }
        Err(e) => {
            span.set_status(Status::error("list_voices failed"));
            log::error!("list_voices failed: {:?}", e);
            HttpResponse::BadGateway().json(json!({ "error": format!("{e}") }))
        }
    }
}
/// DELETE /tts/voices/{name} — remove a cloned voice from the library.
///
/// The path name must already be in sanitized form (exactly what
/// `sanitize_voice_name` returns for it), otherwise the request is rejected
/// with 400 before anything goes upstream. A successful delete invalidates
/// the cached voice listing; an upstream failure yields 502.
#[delete("/tts/voices/{name}")]
pub async fn delete_voice_handler(
    http_request: HttpRequest,
    _claims: Claims,
    path: web::Path<String>,
    app_state: web::Data<AppState>,
) -> impl Responder {
    let parent_context = extract_context_from_request(&http_request);
    let mut span = global_tracer().start_with_context("http.tts.voices.delete", &parent_context);
    let client = match app_state.llamacpp.as_ref() {
        Some(client) => client,
        None => {
            span.set_status(Status::error("tts backend not configured"));
            return HttpResponse::ServiceUnavailable()
                .json(json!({ "error": "TTS backend not configured" }));
        }
    };
    // Apply the same charset rule used at creation. A name that changes under
    // sanitization was never one we created, and must not be spliced into the
    // upstream URL.
    let raw = path.into_inner();
    let Some(name) = sanitize_voice_name(&raw).filter(|clean| *clean == raw) else {
        span.set_status(Status::error("invalid voice name"));
        return HttpResponse::BadRequest().json(json!({ "error": "invalid voice name" }));
    };
    span.set_attribute(KeyValue::new("tts.voice_name", name.clone()));
    let outcome = client.delete_voice(&name).await;
    match outcome {
        Ok(body) => {
            invalidate_voices_cache();
            span.set_status(Status::Ok);
            HttpResponse::Ok().json(body)
        }
        Err(e) => {
            span.set_status(Status::error("delete_voice failed"));
            log::error!("delete_voice failed: {:?}", e);
            HttpResponse::BadGateway().json(json!({ "error": format!("{e}") }))
        }
    }
}
/// POST /tts/voices/upload — register a cloned voice from an uploaded audio
/// clip. Multipart fields: `voice_name` (text) + a file part (`voice_file`),
/// plus optional `start_seconds` / `duration_seconds` (text) selecting which
/// window of a longer recording becomes the reference clip.
#[post("/tts/voices/upload")]
pub async fn create_voice_upload_handler(
http_request: HttpRequest,
_claims: Claims,
mut payload: Multipart,
app_state: web::Data<AppState>,
) -> impl Responder {
let parent_context = extract_context_from_request(&http_request);
let mut span = global_tracer().start_with_context("http.tts.voices.upload", &parent_context);
let Some(client) = app_state.llamacpp.as_ref() else {
span.set_status(Status::error("tts backend not configured"));
return HttpResponse::ServiceUnavailable()
.json(json!({ "error": "TTS backend not configured" }));
};
let mut voice_name: Option<String> = None;
let mut start_field: Option<String> = None;
let mut duration_field: Option<String> = None;
let mut file_bytes = BytesMut::new();
let mut filename = "voice.wav".to_string();
while let Some(Ok(mut part)) = payload.next().await {
// Capture disposition fields up front so the immutable borrow ends
// before we mutably stream the part body (mirrors handlers/image.rs).
let (fname_opt, name_opt) = {
let cd = part.content_disposition();
(
cd.and_then(|c| c.get_filename()).map(|s| s.to_string()),
cd.and_then(|c| c.get_name()).map(|s| s.to_string()),
)
};
if let Some(fname) = fname_opt {
filename = fname;
while let Some(Ok(data)) = part.next().await {
if file_bytes.len() + data.len() > MAX_VOICE_UPLOAD_BYTES {
span.set_status(Status::error("voice clip exceeds limit"));
return HttpResponse::PayloadTooLarge()
.json(json!({ "error": "voice clip exceeds 25 MB" }));
}
file_bytes.put(data);
}
} else if matches!(
name_opt.as_deref(),
Some("voice_name" | "start_seconds" | "duration_seconds")
) {
let field = name_opt.as_deref().unwrap().to_string();
let mut buf = BytesMut::new();
while let Some(Ok(data)) = part.next().await {
buf.put(data);
}
let text = String::from_utf8_lossy(&buf).trim().to_string();
match field.as_str() {
"voice_name" => voice_name = Some(text),
"start_seconds" => start_field = Some(text),
_ => duration_field = Some(text),
}
} else {
while let Some(Ok(_)) = part.next().await {}
}
}
// Empty text parts are treated as absent; anything else must parse, so a
// client bug ("abc") fails loudly instead of silently cloning from 0s.
let parse_secs = |field: Option<&String>, name: &str| -> Result<Option<f64>, String> {
match field.map(|s| s.as_str()).filter(|s| !s.is_empty()) {
None => Ok(None),
Some(s) => s
.parse::<f64>()
.map(Some)
.map_err(|_| format!("{name} must be a number of seconds")),
}
};
let window = parse_secs(start_field.as_ref(), "start_seconds").and_then(|start| {
parse_secs(duration_field.as_ref(), "duration_seconds")
.and_then(|dur| resolve_ref_window(start, dur))
});
let (ref_start, ref_duration) = match window {
Ok(w) => w,
Err(msg) => {
span.set_status(Status::error("invalid reference window"));
return HttpResponse::BadRequest().json(json!({ "error": msg }));
}
};
let Some(name) = voice_name.as_deref().and_then(sanitize_voice_name) else {
span.set_status(Status::error("voice_name is required"));
return HttpResponse::BadRequest()
.json(json!({ "error": "voice_name is required (alphanumerics, - and _ only)" }));
};
// Tag the name with the ref-clip length (e.g. `grandma-30s`) so the
// library shows which reference length produced each clone.
let name = append_ref_window(&name, ref_start, ref_duration.round().max(1.0) as u32);
if file_bytes.is_empty() {
span.set_status(Status::error("voice_file is required"));
return HttpResponse::BadRequest().json(json!({ "error": "voice_file is required" }));
}
span.set_attribute(KeyValue::new("tts.voice_name", name.clone()));
span.set_attribute(KeyValue::new("tts.upload_bytes", file_bytes.len() as i64));
// Normalize to WAV so any device format (e.g. .aac / .opus, which Chatterbox
// rejects by extension) is accepted.
let src_ext = Path::new(&filename).extension().and_then(|e| e.to_str());
let wav =
match transcode_bytes_to_wav(file_bytes.as_ref(), src_ext, ref_start, ref_duration).await {
Ok(w) => w,
Err(e) => {
span.set_status(Status::error("audio decode failed"));
log::error!("voice upload transcode failed: {:?}", e);
return HttpResponse::BadRequest()
.json(json!({ "error": "couldn't decode that audio file" }));
}
};
match client
.create_voice(&name, wav, "reference.wav", "audio/wav")
.await
{
Ok(v) => {
invalidate_voices_cache();
span.set_status(Status::Ok);
HttpResponse::Ok().json(v)
}
Err(e) => {
span.set_status(Status::error("create_voice failed"));
log::error!("create_voice (upload) failed: {:?}", e);
HttpResponse::BadGateway().json(json!({ "error": format!("{e}") }))
}
}
}
/// JSON body for `POST /tts/voices/from-library`.
#[derive(Debug, Deserialize)]
pub struct CreateVoiceFromLibraryRequest {
    /// Requested voice name. The handler sanitizes it (alphanumerics, `-` and
    /// `_` only) and tags it with the reference window before registering.
    pub voice_name: String,
    /// Library-relative path to an audio or video file.
    pub path: String,
    /// Optional library selector, resolved via `libraries::resolve_library_param`;
    /// when it resolves to no library (e.g. omitted) the primary library is used.
    #[serde(default)]
    pub library: Option<String>,
    /// Offset into the source where the reference window begins (default 0) —
    /// lets the client pick the clean-speech section of a long recording.
    /// Must be finite and non-negative.
    #[serde(default)]
    pub start_seconds: Option<f64>,
    /// Reference window length; clamped to LLAMA_SWAP_TTS_REF_SECONDS and
    /// defaulting to that cap when omitted. Must be finite and positive.
    #[serde(default)]
    pub duration_seconds: Option<f64>,
}
/// POST /tts/voices/from-library — register a cloned voice from a file already
/// in a library. Audio and video alike are ffmpeg-normalized to a mono 24 kHz
/// WAV reference clip (window selected by start/duration_seconds, length
/// capped by LLAMA_SWAP_TTS_REF_SECONDS).
#[post("/tts/voices/from-library")]
pub async fn create_voice_from_library_handler(
http_request: HttpRequest,
_claims: Claims,
req: web::Json<CreateVoiceFromLibraryRequest>,
app_state: web::Data<AppState>,
) -> impl Responder {
let parent_context = extract_context_from_request(&http_request);
let mut span =
global_tracer().start_with_context("http.tts.voices.from_library", &parent_context);
let Some(client) = app_state.llamacpp.as_ref() else {
span.set_status(Status::error("tts backend not configured"));
return HttpResponse::ServiceUnavailable()
.json(json!({ "error": "TTS backend not configured" }));
};
let Some(voice_name) = sanitize_voice_name(&req.voice_name) else {
span.set_status(Status::error("voice_name is required"));
return HttpResponse::BadRequest()
.json(json!({ "error": "voice_name is required (alphanumerics, - and _ only)" }));
};
let (ref_start, ref_duration) =
match resolve_ref_window(req.start_seconds, req.duration_seconds) {
Ok(w) => w,
Err(msg) => {
span.set_status(Status::error("invalid reference window"));
return HttpResponse::BadRequest().json(json!({ "error": msg }));
}
};
// Tag the name with the ref-clip length (e.g. `grandma-30s`) so the
// library shows which reference length produced each clone.
let voice_name =
append_ref_window(&voice_name, ref_start, ref_duration.round().max(1.0) as u32);
let library = match libraries::resolve_library_param(&app_state, req.library.as_deref()) {
Ok(Some(l)) => l,
Ok(None) => app_state.primary_library(),
Err(msg) => {
span.set_status(Status::error("invalid library"));
return HttpResponse::BadRequest().json(json!({ "error": msg }));
}
};
// is_valid_full_path confines the path to the library root (no traversal).
let abs = match is_valid_full_path(&library.root_path, &req.path, false) {
Some(p) if p.exists() => p,
_ => {
span.set_status(Status::error("file not found"));
return HttpResponse::NotFound().json(json!({ "error": "file not found in library" }));
}
};
// Only real audio/video sources are valid voice references — refuse to
// slurp arbitrary library files into memory / ffmpeg.
if !is_audio_file(&abs) && !is_video_file(&abs) {
span.set_status(Status::error("not an audio/video file"));
return HttpResponse::BadRequest()
.json(json!({ "error": "file is not an audio or video file" }));
}
span.set_attribute(KeyValue::new("tts.voice_name", voice_name.clone()));
let wav = match prepare_reference_audio(&abs, ref_start, ref_duration).await {
Ok(b) => b,
Err(e) => {
span.set_status(Status::error("audio decode failed"));
log::error!("voice reference prep failed for {:?}: {:?}", abs, e);
return HttpResponse::BadRequest()
.json(json!({ "error": "couldn't decode that file's audio" }));
}
};
match client
.create_voice(&voice_name, wav, "reference.wav", "audio/wav")
.await
{
Ok(v) => {
invalidate_voices_cache();
span.set_status(Status::Ok);
HttpResponse::Ok().json(v)
}
Err(e) => {
span.set_status(Status::error("create_voice failed"));
log::error!("create_voice (from-library) failed: {:?}", e);
HttpResponse::BadGateway().json(json!({ "error": format!("{e}") }))
}
}
}
/// Read a library file (audio or video) as a Chatterbox-ready reference: ffmpeg
/// decodes/extracts its audio to mono 24 kHz WAV. Reading straight from the
/// library path avoids slurping a (possibly large) video into memory.
async fn prepare_reference_audio(abs: &Path, start: f64, duration: f64) -> anyhow::Result<Vec<u8>> {
run_ffmpeg_to_wav(&abs.to_string_lossy(), start, duration).await
}
#[cfg(test)]
mod tests {
    use super::*;
    /// A synthetic "now" far enough in the future that backdating it by
    /// `TTS_JOB_MAX_AGE` / `TTS_JOB_RESULT_TTL` can't land before the
    /// platform's `Instant` origin. `Instant - Duration` panics on underflow on
    /// platforms whose monotonic clock starts near boot (e.g. Windows), so
    /// subtracting from `Instant::now()` directly can flake on freshly booted
    /// CI hosts. `sweep_stale_jobs` takes `now` explicitly, so only the
    /// relative offsets below matter.
    fn sweep_now() -> Instant {
        Instant::now() + TTS_JOB_MAX_AGE + TTS_JOB_RESULT_TTL + Duration::from_secs(60)
    }
    #[test]
    fn sanitize_voice_name_keeps_safe_chars() {
        assert_eq!(sanitize_voice_name("m").as_deref(), Some("m"));
        assert_eq!(
            sanitize_voice_name("  Cameron  ").as_deref(),
            Some("Cameron")
        );
        assert_eq!(
            sanitize_voice_name("voice_01-a").as_deref(),
            Some("voice_01-a")
        );
    }
    #[test]
    fn sanitize_voice_name_strips_unsafe_chars() {
        // Path separators / dots / spaces become '-' and are trimmed at edges.
        assert_eq!(sanitize_voice_name("a b.c").as_deref(), Some("a-b-c"));
        assert_eq!(
            sanitize_voice_name("../etc/passwd").as_deref(),
            Some("etc-passwd")
        );
    }
    #[test]
    fn sanitize_voice_name_rejects_empty_or_all_unsafe() {
        assert_eq!(sanitize_voice_name(""), None);
        assert_eq!(sanitize_voice_name("   "), None);
        assert_eq!(sanitize_voice_name("../../"), None);
        assert_eq!(sanitize_voice_name("...."), None);
    }
    #[test]
    fn sanitize_voice_name_bounds_length() {
        let long = "a".repeat(200);
        assert_eq!(sanitize_voice_name(&long).unwrap().len(), 64);
    }
    #[test]
    fn append_ref_window_tags_name() {
        assert_eq!(append_ref_window("grandma", 0.0, 30), "grandma-30s");
        assert_eq!(append_ref_window("voice_01", 0.0, 15), "voice_01-15s");
    }
    #[test]
    fn append_ref_window_includes_nonzero_start() {
        // Sub-minute starts stay in seconds; longer ones read as XmYYs since
        // ':' isn't allowed in voice names.
        assert_eq!(append_ref_window("grandma", 45.0, 30), "grandma-at45s-30s");
        assert_eq!(
            append_ref_window("grandma", 92.4, 30),
            "grandma-at1m32s-30s"
        );
        assert_eq!(
            append_ref_window("grandma", 600.0, 12),
            "grandma-at10m00s-12s"
        );
        // A start that rounds to zero is "from the start".
        assert_eq!(append_ref_window("grandma", 0.3, 30), "grandma-30s");
    }
    #[test]
    fn append_ref_window_is_idempotent_for_same_window() {
        assert_eq!(append_ref_window("grandma-30s", 0.0, 30), "grandma-30s");
        assert_eq!(
            append_ref_window("grandma-at45s-30s", 45.0, 30),
            "grandma-at45s-30s"
        );
        // A different window still appends — that's the comparison use-case.
        assert_eq!(append_ref_window("grandma-15s", 0.0, 30), "grandma-15s-30s");
        assert_eq!(
            append_ref_window("grandma-30s", 45.0, 30),
            "grandma-30s-at45s-30s"
        );
    }
    #[test]
    fn append_ref_window_keeps_64_char_bound() {
        let long = "a".repeat(64);
        let tagged = append_ref_window(&long, 0.0, 30);
        assert_eq!(tagged.len(), 64);
        assert!(tagged.ends_with("-30s"));
        let tagged = append_ref_window(&long, 92.0, 30);
        assert_eq!(tagged.len(), 64);
        assert!(tagged.ends_with("-at1m32s-30s"));
    }
    #[test]
    fn resolve_ref_window_defaults_to_start_of_clip_at_cap_length() {
        // Reads the live cap rather than mutating LLAMA_SWAP_TTS_REF_SECONDS:
        // env mutation flakes under the parallel suite (see env_dispatch).
        let cap = f64::from(tts_ref_seconds());
        assert_eq!(resolve_ref_window(None, None), Ok((0.0, cap)));
    }
    #[test]
    fn resolve_ref_window_accepts_offset_and_clamps_duration() {
        let cap = f64::from(tts_ref_seconds());
        assert_eq!(resolve_ref_window(Some(92.5), None), Ok((92.5, cap)));
        assert_eq!(resolve_ref_window(Some(10.0), Some(12.0)), Ok((10.0, 12.0)));
        // Longer-than-cap windows are bounded, not rejected.
        assert_eq!(resolve_ref_window(None, Some(cap + 100.0)), Ok((0.0, cap)));
    }
    #[test]
    fn resolve_ref_window_rejects_garbage() {
        assert!(resolve_ref_window(Some(-1.0), None).is_err());
        assert!(resolve_ref_window(Some(f64::NAN), None).is_err());
        assert!(resolve_ref_window(Some(f64::INFINITY), None).is_err());
        assert!(resolve_ref_window(None, Some(0.0)).is_err());
        assert!(resolve_ref_window(None, Some(-5.0)).is_err());
        assert!(resolve_ref_window(None, Some(f64::NAN)).is_err());
    }
    #[test]
    fn sweep_drops_expired_results_and_keeps_live_jobs() {
        let now = sweep_now();
        let mk = |status: TtsJobStatus, created: Instant, finished: Option<Instant>| TtsJob {
            status,
            format: "mp3".into(),
            audio_base64: None,
            error: None,
            created_at: created,
            finished_at: finished,
            abort: None,
        };
        let mut jobs = HashMap::new();
        let live = Uuid::new_v4();
        let fresh_done = Uuid::new_v4();
        let stale_done = Uuid::new_v4();
        jobs.insert(live, mk(TtsJobStatus::Running, now, None));
        jobs.insert(
            fresh_done,
            mk(TtsJobStatus::Done, now, Some(now - Duration::from_secs(60))),
        );
        jobs.insert(
            stale_done,
            mk(
                TtsJobStatus::Done,
                now - TTS_JOB_MAX_AGE / 2,
                Some(now - TTS_JOB_RESULT_TTL),
            ),
        );
        sweep_stale_jobs(&mut jobs, now);
        assert!(jobs.contains_key(&live));
        assert!(jobs.contains_key(&fresh_done));
        assert!(!jobs.contains_key(&stale_done));
    }
    #[test]
    fn sweep_drops_jobs_past_max_age_even_if_unfinished() {
        let now = sweep_now();
        let mut jobs = HashMap::new();
        let ancient = Uuid::new_v4();
        jobs.insert(
            ancient,
            TtsJob {
                status: TtsJobStatus::Running,
                format: "mp3".into(),
                audio_base64: None,
                error: None,
                created_at: now - TTS_JOB_MAX_AGE,
                finished_at: None,
                abort: None,
            },
        );
        sweep_stale_jobs(&mut jobs, now);
        assert!(jobs.is_empty());
    }
    #[test]
    fn voices_cache_roundtrip_and_invalidation() {
        invalidate_voices_cache();
        assert!(cached_voices().is_none());
        let v = json!({ "voices": [{ "name": "m-30s" }], "count": 1 });
        store_voices_cache(&v);
        assert_eq!(cached_voices(), Some(v));
        invalidate_voices_cache();
        assert!(cached_voices().is_none());
    }
    #[test]
    fn clean_for_tts_strips_markdown() {
        assert_eq!(
            clean_for_tts("**Bold** and _italic_ and `code`"),
            "Bold and italic and code"
        );
        assert_eq!(clean_for_tts("# Title\n\nbody"), "Title\nbody");
        assert_eq!(
            clean_for_tts("See [docs](http://x.com) now"),
            "See docs now"
        );
        assert_eq!(clean_for_tts("- one\n- two"), "one\ntwo");
    }
    #[test]
    fn clean_for_tts_strips_emoji_and_urls() {
        assert_eq!(clean_for_tts("Hello 😀 world 🎉"), "Hello world");
        assert_eq!(
            clean_for_tts("visit https://example.com today"),
            "visit today"
        );
        // ZWJ-glued emoji sequence is fully removed.
        assert_eq!(clean_for_tts("family 👨‍👩‍👧 photo"), "family photo");
    }
    #[test]
    fn clean_for_tts_collapses_blank_lines_to_single_break() {
        // Chatterbox pauses (sometimes ~20s) per blank line, so paragraph
        // breaks must collapse to a single newline.
        assert_eq!(clean_for_tts("para one\n\npara two"), "para one\npara two");
        assert_eq!(clean_for_tts("a\n\n\n\nb"), "a\nb");
        // Whitespace-only "blank" lines collapse too.
        assert_eq!(clean_for_tts("a\n  \t \nb"), "a\nb");
        // A single newline is left alone.
        assert_eq!(clean_for_tts("a\nb"), "a\nb");
    }
    #[test]
    fn clean_for_tts_preserves_bracket_tags() {
        // Non-turbo Chatterbox ignores these; a future Turbo uses them as
        // paralinguistic cues — so we must not strip them.
        assert_eq!(clean_for_tts("hello [laugh] there"), "hello [laugh] there");
    }
}