Add GPU lease coordinating LLM and TTS requests through llama-swap

llama-swap runs chat/vision/Chatterbox as a mutually-exclusive set on
one GPU and HOLDS a request for a non-resident model until the resident
model drains, then swaps. That hold burned the holder's reqwest timeout
(measured: a queued TTS lost 77s behind one LLM turn; an LLM request
behind a synthesis waited the entire remaining synth), so concurrent
insight + read-aloud timed out instead of queueing.

ai::gpu adds a fair RwLock lease acquired before each request is sent,
so cross-model waits happen before the HTTP timeout starts: chat/vision
share the read lease, TTS synthesis and voice-library ops (which spin
Chatterbox up) take the write lease, and embeddings take none (the
embed slot is in llama-swap's always-resident group). Speech jobs now
flip queued->running only after acquiring the GPU, letting the client
anchor its poll deadline to that transition.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
Cameron Cordes
2026-06-11 18:20:06 -04:00
parent 03699f7413
commit 0accc4ef2f
4 changed files with 125 additions and 1 deletions
+88
View File
@@ -0,0 +1,88 @@
// GPU lease — in-process coordination for llama-swap model contention.
//
// llama-swap runs the heavyweight models (chat / vision / Chatterbox TTS) as
// a mutually-exclusive set on one GPU (matrix DSL `(q27 | … | tts) & e`): a
// request for a non-resident model is HELD by llama-swap until the resident
// model's in-flight requests drain, then the models swap. That hold counts
// against the *holder's* reqwest timeout — measured live: a queued TTS burned
// 77s of its budget behind a single LLM turn, and an LLM request behind a
// running synthesis waited the entire remaining synth. Uncoordinated
// cross-model traffic therefore times out instead of queueing.
//
// The lease moves that wait into this process, BEFORE the HTTP request is
// sent and before its timeout starts:
// - chat/vision requests (the LLM-side slots) share the READ lease;
// - TTS synthesis and voice-library ops (anything that spins Chatterbox up
// and evicts the LLM) take the WRITE lease;
// - embeddings take NO lease: the `embed` slot is in llama-swap's
// always-resident group (the `& e` term) and never participates in a swap,
// so leasing it would only stall searches behind a queued synthesis.
//
// tokio's RwLock is fair (FIFO, write-preferring): a queued TTS gets the GPU
// right after the current LLM request drains, and later LLM requests queue
// behind it — bounded waits in both directions, no starvation, no timeout
// budget burned while waiting.
//
// RULES: hold a lease for exactly one HTTP request (for streaming, the
// stream's lifetime) and NEVER acquire one while already holding one — once a
// writer is queued, new read acquisitions block, so nested acquisition can
// deadlock.
use std::sync::LazyLock;
use std::time::Instant;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
static GPU_LEASE: LazyLock<RwLock<()>> = LazyLock::new(|| RwLock::new(()));
/// Waits longer than this are logged — they mean a cross-model swap was
/// avoided and quantify what the request *would* have burned of its timeout.
const SLOW_WAIT_LOG_SECS: f64 = 2.0;
/// Shared lease for LLM-side requests (chat / vision slots).
pub async fn llm_lease() -> RwLockReadGuard<'static, ()> {
let started = Instant::now();
let guard = GPU_LEASE.read().await;
log_slow_wait("llm", started);
guard
}
/// Exclusive lease for TTS-side requests (speech synthesis + voice-library
/// ops that spin up Chatterbox).
pub async fn tts_lease() -> RwLockWriteGuard<'static, ()> {
let started = Instant::now();
let guard = GPU_LEASE.write().await;
log_slow_wait("tts", started);
guard
}
fn log_slow_wait(kind: &str, started: Instant) {
let waited = started.elapsed().as_secs_f64();
if waited > SLOW_WAIT_LOG_SECS {
log::info!("GPU lease ({kind}): waited {waited:.1}s for the other model class to drain");
}
}
#[cfg(test)]
mod tests {
use super::*;
// One sequential test, not several: the lease is a single global, so
// parallel tests interleaving reads and writes on it can hit the very
// nested-acquisition deadlock the module comment warns about.
#[tokio::test]
async fn write_lease_excludes_readers_then_reads_share() {
let w = tts_lease().await;
// A reader must not acquire while the writer is held.
let pending = tokio::spawn(async { drop(llm_lease().await) });
tokio::task::yield_now().await;
assert!(!pending.is_finished());
drop(w);
pending.await.expect("reader acquires after writer drops");
// With no writer queued, read leases are shared.
let a = llm_lease().await;
let b = llm_lease().await;
drop(a);
drop(b);
}
}
+25
View File
@@ -142,6 +142,11 @@ impl LlamaCppClient {
/// Chatterbox generation knobs are forwarded when set (caller is expected /// Chatterbox generation knobs are forwarded when set (caller is expected
/// to have range-clamped them): `exaggeration` (0.252.0, emotion), /// to have range-clamped them): `exaggeration` (0.252.0, emotion),
/// `cfg_weight` (0.01.0, pace), `temperature` (0.055.0, randomness). /// `cfg_weight` (0.01.0, pace), `temperature` (0.055.0, randomness).
///
/// Callers must hold the GPU write lease (`ai::gpu::tts_lease`) across
/// this call. It is taken at the call sites in `ai::tts` rather than here
/// so the speech-job path can flip its job to `running` between acquiring
/// the GPU and sending the request.
pub async fn text_to_speech( pub async fn text_to_speech(
&self, &self,
input: &str, input: &str,
@@ -204,6 +209,9 @@ impl LlamaCppClient {
/// List voices in the Chatterbox voice library (raw JSON passthrough). /// List voices in the Chatterbox voice library (raw JSON passthrough).
pub async fn list_voices(&self) -> Result<Value> { pub async fn list_voices(&self) -> Result<Value> {
let url = format!("{}/upstream/{}/voices", self.swap_root(), self.tts_model); let url = format!("{}/upstream/{}/voices", self.swap_root(), self.tts_model);
// The /upstream passthrough spins Chatterbox up (evicting the LLM),
// so it takes the exclusive GPU lease like synthesis does.
let _gpu = crate::ai::gpu::tts_lease().await;
let resp = self let resp = self
.client .client
.get(&url) .get(&url)
@@ -237,6 +245,9 @@ impl LlamaCppClient {
.text("voice_name", voice_name.to_string()) .text("voice_name", voice_name.to_string())
.part("voice_file", part); .part("voice_file", part);
// The /upstream passthrough spins Chatterbox up (evicting the LLM),
// so it takes the exclusive GPU lease like synthesis does.
let _gpu = crate::ai::gpu::tts_lease().await;
let resp = self let resp = self
.client .client
.post(&url) .post(&url)
@@ -262,6 +273,9 @@ impl LlamaCppClient {
self.tts_model, self.tts_model,
voice_name voice_name
); );
// The /upstream passthrough spins Chatterbox up (evicting the LLM),
// so it takes the exclusive GPU lease like synthesis does.
let _gpu = crate::ai::gpu::tts_lease().await;
let resp = self let resp = self
.client .client
.delete(&url) .delete(&url)
@@ -481,6 +495,9 @@ impl LlamaCppClient {
body.insert(k.into(), v); body.insert(k.into(), v);
} }
// Wait for any TTS synthesis to release the GPU before the request
// timeout starts (see ai::gpu).
let _gpu = crate::ai::gpu::llm_lease().await;
let resp = self let resp = self
.client .client
.post(&url) .post(&url)
@@ -599,6 +616,10 @@ impl LlmClient for LlamaCppClient {
body.insert(k.into(), v); body.insert(k.into(), v);
} }
// Wait for any TTS synthesis to release the GPU before the request
// timeout starts (see ai::gpu). The guard is moved into the stream
// below so the lease spans the whole generation, not just the send.
let gpu = crate::ai::gpu::llm_lease().await;
let resp = self let resp = self
.client .client
.post(&url) .post(&url)
@@ -615,6 +636,7 @@ impl LlmClient for LlamaCppClient {
let byte_stream = resp.bytes_stream(); let byte_stream = resp.bytes_stream();
let stream = async_stream::stream! { let stream = async_stream::stream! {
let _gpu = gpu;
let mut byte_stream = byte_stream; let mut byte_stream = byte_stream;
let mut buf: Vec<u8> = Vec::new(); let mut buf: Vec<u8> = Vec::new();
let mut accumulated_content = String::new(); let mut accumulated_content = String::new();
@@ -730,6 +752,9 @@ impl LlmClient for LlamaCppClient {
} }
async fn generate_embeddings(&self, texts: &[&str]) -> Result<Vec<Vec<f32>>> { async fn generate_embeddings(&self, texts: &[&str]) -> Result<Vec<Vec<f32>>> {
// Deliberately NO GPU lease: the embed slot sits in llama-swap's
// always-resident group and never participates in a model swap, so
// leasing here would only stall searches behind a queued synthesis.
let url = format!("{}/embeddings", self.base_url); let url = format!("{}/embeddings", self.base_url);
let body = json!({ let body = json!({
"model": self.embedding_model, "model": self.embedding_model,
+1
View File
@@ -3,6 +3,7 @@ pub mod backend;
pub mod clip_client; pub mod clip_client;
pub mod daily_summary_job; pub mod daily_summary_job;
pub mod face_client; pub mod face_client;
pub mod gpu;
pub mod handlers; pub mod handlers;
pub mod insight_chat; pub mod insight_chat;
pub mod insight_generator; pub mod insight_generator;
+11 -1
View File
@@ -378,6 +378,10 @@ pub async fn tts_speech_handler(
})); }));
}; };
// Wait for the LLM side to release the GPU before sending — the synthesis
// timeout starts at send, not here (see ai::gpu).
let _gpu = crate::ai::gpu::tts_lease().await;
match client match client
.text_to_speech(&text, voice, format, exaggeration, cfg_weight, temperature) .text_to_speech(&text, voice, format, exaggeration, cfg_weight, temperature)
.await .await
@@ -495,7 +499,13 @@ pub async fn create_speech_job_handler(
return; return;
} }
}; };
// Cancelled while queued — release the permit without synthesizing. // Wait for the LLM side to release the GPU too (see ai::gpu) — only
// then does the job count as running. The synthesis timeout starts at
// the HTTP send below, so neither wait burns it, and the client can
// anchor its own deadline to the queued→running transition.
let _gpu = crate::ai::gpu::tts_lease().await;
// Cancelled while queued — release the permits without synthesizing.
let cancelled = with_job(job_id, |job| { let cancelled = with_job(job_id, |job| {
if job.status == TtsJobStatus::Queued { if job.status == TtsJobStatus::Queued {
job.status = TtsJobStatus::Running; job.status = TtsJobStatus::Running;