Add reconnectable async chat-turn flow with in-memory TurnRegistry

Replace the one-shot SSE chat stream with an async dispatch + reconnectable
replay flow so the mobile client survives backgrounding, network blips, and
OS-killed sockets without losing an in-flight agentic turn.

- TurnRegistry/TurnEntry: in-memory per-turn event buffer (cap 500, front
  eviction) shared by the agentic loop (writer) and SSE replay readers.
  ReplayOutcome + replay_from/next_batch distinguish Events/CaughtUp/Gone;
  next_batch registers the Notify before reading state (no lost wakeup) and
  drains every buffered event before signaling terminal, so the final
  Done/Error is never dropped and the stream closes cleanly.
- Endpoints: POST /insights/chat/turn (202 + turn_id), GET
  /insights/chat/turn/{id} (SSE replay, ?skip_before= resume, per-event seq,
  410 on eviction), DELETE /insights/chat/turn/{id} (real task abort +
  cooperative is_running() check at each loop boundary).
- Cancellation actually stops the task (AbortHandle stored on the entry) and
  emits a Done{cancelled:true}; callers skip persistence on cancel.
- Background sweeper drops stale turns; interval clamped to <=300s.
- OpenTelemetry spans: ai.chat.turn.execute/replay/cancel.
- Legacy POST /insights/chat/stream path preserved unchanged.

Tests: registry coverage for terminal delivery (race guard), waiting, Gone,
abort, eviction; handler integration tests for 404/410, skip_before, seq
stamping, completed replay, and cancel.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Cameron Cordes
2026-05-29 19:50:25 -04:00
parent 0c1c1c6792
commit 962f7bf05c
8 changed files with 1946 additions and 17 deletions
+510 -4
View File
@@ -1,4 +1,5 @@
use actix_web::{HttpRequest, HttpResponse, Responder, delete, get, post, web};
use futures::StreamExt;
use opentelemetry::KeyValue;
use opentelemetry::trace::{Span, Status, Tracer};
use serde::{Deserialize, Serialize};
@@ -1433,7 +1434,26 @@ pub async fn chat_stream_handler(
}
fn render_sse_frame(ev: &ChatStreamEvent) -> String {
let (event_name, payload) = match ev {
let (event_name, payload) = sse_event_payload(ev);
let data = serde_json::to_string(&payload).unwrap_or_else(|_| "{}".to_string());
format!("event: {}\ndata: {}\n\n", event_name, data)
}
/// Like `render_sse_frame`, but stamps the event's absolute sequence number
/// (`seq`) into the payload so reconnecting replay clients can compute
/// `skip_before` precisely. `seq` is distinct from the tool-pairing `index`
/// already carried by `tool_call`/`tool_result`.
fn render_indexed_frame(ev: &ChatStreamEvent, seq: u32) -> String {
let (event_name, mut payload) = sse_event_payload(ev);
if let serde_json::Value::Object(map) = &mut payload {
map.insert("seq".to_string(), serde_json::json!(seq));
}
let data = serde_json::to_string(&payload).unwrap_or_else(|_| "{}".to_string());
format!("event: {}\ndata: {}\n\n", event_name, data)
}
fn sse_event_payload(ev: &ChatStreamEvent) -> (&'static str, serde_json::Value) {
match ev {
ChatStreamEvent::IterationStart { n, max } => {
("iteration_start", serde_json::json!({ "n": n, "max": max }))
}
@@ -1471,6 +1491,7 @@ fn render_sse_frame(ev: &ChatStreamEvent) -> String {
amended_insight_id,
backend_used,
model_used,
cancelled,
} => (
"done",
serde_json::json!({
@@ -1483,6 +1504,7 @@ fn render_sse_frame(ev: &ChatStreamEvent) -> String {
"amended_insight_id": amended_insight_id,
"backend": backend_used,
"model": model_used,
"cancelled": cancelled,
}),
),
// Apollo's frontend SSE consumer (and its free-chat backend, which
@@ -1491,7 +1513,491 @@ fn render_sse_frame(ev: &ChatStreamEvent) -> String {
// "no insight found for path") was silently dropped, leaving an
// empty assistant bubble with no clue why the turn died.
ChatStreamEvent::Error(msg) => ("error_message", serde_json::json!({ "message": msg })),
};
let data = serde_json::to_string(&payload).unwrap_or_else(|_| "{}".to_string());
format!("event: {}\ndata: {}\n\n", event_name, data)
}
}
/// POST /insights/chat/turn — async turn dispatch. Returns turn_id immediately,
/// client then polls GET /insights/chat/turn/{turn_id} for SSE replay.
#[post("/insights/chat/turn")]
pub async fn turn_async_handler(
http_request: HttpRequest,
claims: Claims,
request: web::Json<ChatTurnHttpRequest>,
app_state: web::Data<AppState>,
) -> impl Responder {
let parent_context = extract_context_from_request(&http_request);
let tracer = global_tracer();
let mut span = tracer.start_with_context("http.insights.chat_turn_async", &parent_context);
span.set_attribute(KeyValue::new("file_path", request.file_path.clone()));
let library = match libraries::resolve_library_param(&app_state, request.library.as_deref()) {
Ok(Some(lib)) => lib,
Ok(None) => app_state.primary_library(),
Err(e) => {
return HttpResponse::BadRequest().json(serde_json::json!({
"error": format!("invalid library: {}", e)
}));
}
};
let user_id = claims.sub.parse::<i32>().unwrap_or(1);
let chat_req = ChatTurnRequest {
library_id: library.id,
user_id,
file_path: request.file_path.clone(),
user_message: request.user_message.clone(),
model: request.model.clone(),
backend: request.backend.clone(),
num_ctx: request.num_ctx,
temperature: request.temperature,
top_p: request.top_p,
top_k: request.top_k,
min_p: request.min_p,
max_iterations: request.max_iterations,
system_prompt: request.system_prompt.clone(),
persona_id: request.persona_id.clone(),
amend: request.amend,
regenerate: request.regenerate,
};
let service = app_state.insight_chat.clone();
let registry = app_state.turn_registry.clone();
let turn_id = service.chat_turn_async(registry, chat_req).await;
span.set_attribute(KeyValue::new("turn_id", turn_id.clone()));
span.set_status(Status::Ok);
HttpResponse::Accepted().json(serde_json::json!({
"turn_id": turn_id,
"status": "running"
}))
}
/// Query params for the SSE replay stream.
#[derive(Debug, Deserialize)]
pub struct ReplayQuery {
/// Replay events from this absolute sequence number (`seq`) onward.
/// Absent or 0 replays from the beginning. On reconnect the client sends
/// the `seq` of the last event it applied, plus one.
pub skip_before: Option<u32>,
}
/// GET /insights/chat/turn/{turn_id} — SSE replay stream.
#[get("/insights/chat/turn/{turn_id}")]
pub async fn turn_replay_handler(
http_request: HttpRequest,
path: web::Path<String>,
query: web::Query<ReplayQuery>,
app_state: web::Data<AppState>,
) -> HttpResponse {
use crate::ai::turn_registry::ReplayOutcome;
let turn_id = path.into_inner();
let skip_before = query.skip_before.unwrap_or(0);
let parent_context = extract_context_from_request(&http_request);
let tracer = global_tracer();
let mut span = tracer.start_with_context("ai.chat.turn.replay", &parent_context);
span.set_attribute(KeyValue::new("turn_id", turn_id.clone()));
span.set_attribute(KeyValue::new("skip_before", skip_before as i64));
let registry = app_state.turn_registry.clone();
let entry = match registry.get(&turn_id).await {
Some(e) => e,
None => {
span.set_status(Status::error("turn not found"));
return HttpResponse::NotFound().json(serde_json::json!({
"error": format!("turn {} not found", turn_id)
}));
}
};
let info = entry.info().await;
span.set_attribute(KeyValue::new("status", info.status.as_str()));
span.set_attribute(KeyValue::new(
"event_count",
info.total_events_pushed as i64,
));
let turn_info_frame = render_turn_info_frame(&info);
// Initial buffered batch: events produced before this connection attached.
// Stamp each frame with its absolute `seq` so the client can track
// `skip_before` precisely across reconnects.
let (initial_frames, start_skip) = match entry.replay_from(skip_before).await {
ReplayOutcome::Gone => {
span.set_status(Status::error("buffer evicted"));
return HttpResponse::Gone().json(serde_json::json!({
"error": "turn history has expired (buffer evicted)"
}));
}
ReplayOutcome::CaughtUp { next_skip } => (Vec::new(), next_skip),
ReplayOutcome::Events { events, next_skip } => {
let frames: Vec<actix_web::web::Bytes> = events
.into_iter()
.enumerate()
.map(|(i, ev)| {
actix_web::web::Bytes::from(render_indexed_frame(&ev, skip_before + i as u32))
})
.collect();
(frames, next_skip)
}
};
span.set_status(Status::Ok);
let running = entry.is_running();
// Head: the `turn_info` event followed by any already-buffered events.
let head = futures::stream::once(async move {
Ok::<_, actix_web::Error>(actix_web::web::Bytes::from(turn_info_frame))
})
.chain(futures::stream::iter(
initial_frames.into_iter().map(Ok::<_, actix_web::Error>),
));
if !running {
// Completed turn: every event — including the terminal Done/Error — is
// already in the buffered batch above. Emit it and close.
return HttpResponse::Ok()
.content_type("text/event-stream")
.insert_header(("Cache-Control", "no-cache"))
.insert_header(("X-Accel-Buffering", "no"))
.streaming(head);
}
// In-progress turn: after the head, wait for new events. `next_batch`
// drains every buffered event (including the terminal one) before it
// reports the turn finished, so the final Done/Error is never dropped;
// CaughtUp then closes the stream by returning None.
let tail = futures::stream::unfold(
(
entry,
start_skip,
Vec::<actix_web::web::Bytes>::new(),
false,
),
|(entry, skip, pending, finished)| async move {
// Flush queued frames from a previous multi-event batch first.
if let Some((first, rest)) = pending.split_first() {
return Some((Ok(first.clone()), (entry, skip, rest.to_vec(), finished)));
}
if finished {
return None;
}
match entry.next_batch(skip).await {
ReplayOutcome::Events { events, next_skip } => {
let frames: Vec<actix_web::web::Bytes> = events
.into_iter()
.enumerate()
.map(|(i, ev)| {
actix_web::web::Bytes::from(render_indexed_frame(&ev, skip + i as u32))
})
.collect();
// next_batch only returns Events for a non-empty batch.
let (first, rest) = frames.split_first().expect("non-empty batch");
Some((Ok(first.clone()), (entry, next_skip, rest.to_vec(), false)))
}
// Terminal reached and fully drained — close the connection.
ReplayOutcome::CaughtUp { .. } => None,
ReplayOutcome::Gone => {
// Evicted mid-stream: emit one error frame, then close.
let gone =
actix_web::web::Bytes::from(render_sse_frame(&ChatStreamEvent::Error(
"turn history has expired (buffer evicted)".to_string(),
)));
Some((Ok(gone), (entry, skip, Vec::new(), true)))
}
}
},
);
HttpResponse::Ok()
.content_type("text/event-stream")
.insert_header(("Cache-Control", "no-cache"))
.insert_header(("X-Accel-Buffering", "no"))
.streaming(head.chain(tail))
}
fn render_turn_info_frame(info: &crate::ai::turn_registry::TurnInfo) -> String {
let payload = serde_json::json!({
"turn_id": info.turn_id,
"file_path": info.file_path,
"library_id": info.library_id,
"status": info.status.as_str(),
"total_events_pushed": info.total_events_pushed,
"buffered_count": info.buffered_count,
});
let data = serde_json::to_string(&payload).unwrap_or_else(|_| "{}".to_string());
format!("event: turn_info\ndata: {}\n\n", data)
}
/// DELETE /insights/chat/turn/{turn_id} — cancel a running turn.
#[delete("/insights/chat/turn/{turn_id}")]
pub async fn cancel_turn_handler(
http_request: HttpRequest,
path: web::Path<String>,
app_state: web::Data<AppState>,
) -> impl Responder {
let turn_id = path.into_inner();
let parent_context = extract_context_from_request(&http_request);
let tracer = global_tracer();
let mut span = tracer.start_with_context("ai.chat.turn.cancel", &parent_context);
span.set_attribute(KeyValue::new("turn_id", turn_id.clone()));
let registry = app_state.turn_registry.clone();
let entry = match registry.get(&turn_id).await {
Some(e) => e,
None => {
span.set_status(Status::error("turn not found"));
return HttpResponse::NotFound().json(serde_json::json!({
"error": format!("turn {} not found", turn_id)
}));
}
};
// Abort the spawned task so it stops producing events promptly. The loop
// also checks `is_running()` at each iteration boundary as a graceful
// backstop in case the abort lands between await points.
let aborted = entry.abort();
span.set_attribute(KeyValue::new("aborted", aborted));
// Push the terminal event BEFORE flipping status: a replay reader treats a
// terminal status with no buffered tail as "closed", so the Done must be
// buffered first for in-progress connections to receive it.
let _ = entry
.push_event(ChatStreamEvent::Done {
tool_calls_made: 0,
iterations_used: 0,
truncated: false,
prompt_tokens: None,
eval_tokens: None,
num_ctx: None,
amended_insight_id: None,
backend_used: "cancelled".to_string(),
model_used: "cancelled".to_string(),
cancelled: true,
})
.await;
entry.set_terminal_status(crate::ai::turn_registry::TurnStatus::Cancelled);
span.set_status(Status::Ok);
HttpResponse::Ok().json(serde_json::json!({
"cancelled": true
}))
}
#[cfg(test)]
mod turn_replay_tests {
use super::{cancel_turn_handler, render_indexed_frame, turn_replay_handler};
use crate::ai::insight_chat::ChatStreamEvent;
use crate::ai::turn_registry::{TurnEntry, TurnStatus};
use crate::state::AppState;
use actix_web::test as actix_test;
use actix_web::{App, web::Data};
use std::sync::Arc;
/// Serialize `AppState::test_state()` construction across the parallel
/// tests in this module: each build opens ~10 DAO connections to the one
/// shared `DATABASE_URL` file, and doing several at once races the WAL
/// `journal_mode` switch into a spurious "database is locked". The test
/// bodies themselves still run in parallel; only the open is gated.
static DB_INIT: std::sync::Mutex<()> = std::sync::Mutex::new(());
fn build_state() -> Data<AppState> {
let _guard = DB_INIT.lock().unwrap_or_else(|p| p.into_inner());
Data::new(AppState::test_state())
}
fn done(cancelled: bool) -> ChatStreamEvent {
ChatStreamEvent::Done {
tool_calls_made: 0,
iterations_used: 1,
truncated: false,
prompt_tokens: Some(10),
eval_tokens: Some(20),
num_ctx: None,
amended_insight_id: None,
backend_used: "local".into(),
model_used: "m".into(),
cancelled,
}
}
/// Seed a completed turn (events + terminal Done) directly in the registry.
async fn seed_completed(state: &AppState, id: &str, text_events: usize) {
let entry = Arc::new(TurnEntry::new(id.into(), "/p.jpg".into(), 1));
for i in 0..text_events {
entry
.push_event(ChatStreamEvent::TextDelta(format!("d{i}")))
.await;
}
entry.push_event(done(false)).await;
entry.set_terminal_status(TurnStatus::Done);
state.turn_registry.insert(entry).await;
}
#[test]
fn indexed_frame_stamps_seq_without_clobbering_tool_index() {
// tool_call carries its own pairing `index`; `seq` must be additive.
let frame = render_indexed_frame(
&ChatStreamEvent::ToolCall {
index: 3,
name: "geo".into(),
arguments: serde_json::json!({}),
},
42,
);
assert!(frame.contains("event: tool_call"));
assert!(frame.contains("\"index\":3"));
assert!(frame.contains("\"seq\":42"));
}
#[actix_rt::test]
async fn replay_unknown_turn_is_404() {
let state = build_state();
let app = actix_test::init_service(
App::new()
.service(turn_replay_handler)
.app_data(state.clone()),
)
.await;
let req = actix_test::TestRequest::get()
.uri("/insights/chat/turn/nope")
.to_request();
let resp = actix_test::call_service(&app, req).await;
assert_eq!(resp.status(), 404);
}
#[actix_rt::test]
async fn replay_completed_turn_emits_turn_info_and_done_with_seq() {
let state = build_state();
seed_completed(&state, "t1", 2).await;
let app = actix_test::init_service(
App::new()
.service(turn_replay_handler)
.app_data(state.clone()),
)
.await;
let req = actix_test::TestRequest::get()
.uri("/insights/chat/turn/t1")
.to_request();
let resp = actix_test::call_service(&app, req).await;
assert_eq!(resp.status(), 200);
let body = String::from_utf8(actix_test::read_body(resp).await.to_vec()).unwrap();
assert!(body.contains("event: turn_info"));
assert!(body.contains("event: text"));
assert!(body.contains("event: done"));
// Events are seq-stamped 0,1 (text) and 2 (done).
assert!(body.contains("\"seq\":0"));
assert!(body.contains("\"seq\":2"));
// Done payload carries the renamed token fields the client reads.
assert!(body.contains("\"prompt_tokens\":10"));
}
#[actix_rt::test]
async fn replay_skip_before_query_skips_applied_events() {
let state = build_state();
seed_completed(&state, "t2", 3).await; // seqs 0,1,2 text; 3 done
let app = actix_test::init_service(
App::new()
.service(turn_replay_handler)
.app_data(state.clone()),
)
.await;
let req = actix_test::TestRequest::get()
.uri("/insights/chat/turn/t2?skip_before=2")
.to_request();
let resp = actix_test::call_service(&app, req).await;
assert_eq!(resp.status(), 200);
let body = String::from_utf8(actix_test::read_body(resp).await.to_vec()).unwrap();
// Only seq 2 (last text) and seq 3 (done) should be present.
assert!(body.contains("\"seq\":2"));
assert!(body.contains("\"seq\":3"));
assert!(!body.contains("\"seq\":0"));
assert!(!body.contains("\"seq\":1"));
}
#[actix_rt::test]
async fn replay_evicted_index_is_410() {
let state = build_state();
let entry = Arc::new(TurnEntry::new("t3".into(), "/p.jpg".into(), 1));
// Push past the cap so the front is evicted and base advances.
for i in 0..600 {
entry
.push_event(ChatStreamEvent::TextDelta(format!("d{i}")))
.await;
}
entry.set_terminal_status(TurnStatus::Done);
state.turn_registry.insert(entry).await;
let app = actix_test::init_service(
App::new()
.service(turn_replay_handler)
.app_data(state.clone()),
)
.await;
let req = actix_test::TestRequest::get()
.uri("/insights/chat/turn/t3?skip_before=0")
.to_request();
let resp = actix_test::call_service(&app, req).await;
assert_eq!(resp.status(), 410);
}
#[actix_rt::test]
async fn cancel_unknown_turn_is_404() {
let state = build_state();
let app = actix_test::init_service(
App::new()
.service(cancel_turn_handler)
.app_data(state.clone()),
)
.await;
let req = actix_test::TestRequest::delete()
.uri("/insights/chat/turn/nope")
.to_request();
let resp = actix_test::call_service(&app, req).await;
assert_eq!(resp.status(), 404);
}
#[actix_rt::test]
async fn cancel_running_turn_marks_cancelled_and_buffers_terminal() {
let state = build_state();
let entry = Arc::new(TurnEntry::new("t4".into(), "/p.jpg".into(), 1));
entry
.push_event(ChatStreamEvent::TextDelta("partial".into()))
.await;
state.turn_registry.insert(entry.clone()).await;
let app = actix_test::init_service(
App::new()
.service(cancel_turn_handler)
.app_data(state.clone()),
)
.await;
let req = actix_test::TestRequest::delete()
.uri("/insights/chat/turn/t4")
.to_request();
let resp = actix_test::call_service(&app, req).await;
assert_eq!(resp.status(), 200);
// Status flipped to Cancelled and a terminal Done(cancelled) buffered
// after the existing event, so a late replay reader still completes.
assert_eq!(
TurnStatus::from(entry.status.load(std::sync::atomic::Ordering::Relaxed)),
TurnStatus::Cancelled
);
let info = entry.info().await;
assert_eq!(info.total_events_pushed, 2);
}
}