Two persona-infrastructure correctness fixes that go together because
the second one (FK with CASCADE) requires the first (preventing the
persona row from being mutated out from under its facts).
1. update_persona handler refuses name/systemPrompt edits to built-ins
(409). includeAllMemories stays editable — that's a per-user
preference, not the persona's identity. Mirrors the existing
delete_persona guard. The DAO is intentionally permissive so the
guard sits at the HTTP layer; persona_dao test pins that contract.
2. Migration 2026-05-10 adds user_id to entity_facts and a composite
FK (user_id, persona_id) -> personas(user_id, persona_id) ON DELETE
CASCADE. This closes two issues at once:
- Persona orphans: deleting a custom persona used to leave its
facts dangling forever, readable only via PersonaFilter::All.
CASCADE now wipes them with the persona row.
- Multi-user fact leakage: PersonaFilter::Single("default") used
to surface every user's default-scoped facts. PersonaFilter is
now { user_id, persona_id } and all read paths
(get_facts_for_entity, list_facts, get_recent_activity) filter
on user_id first. upsert_fact's dedup key extends to user_id so
identical claims under shared persona names from different
users no longer corroborate-bump each other's confidence.
- user_id threads from Claims.sub.parse::<i32>().unwrap_or(1) at
the chat / insight handlers through ChatTurnRequest, the
streaming agentic loop, execute_tool, and into the leaf tools
(tool_store_fact, tool_recall_facts_for_photo). The ".unwrap_or(1)"
accommodates Apollo's service token whose sub is non-numeric on
legacy mints.
- Backfill picks the smallest user_id matching each legacy fact's
persona_id so the FK holds for already-stored rows.
Five new knowledge_dao tests with FK-on connection: persona scoping
isolation, All-variant union per-user, dedup not crossing users,
CASCADE delete, FK rejection of unknown personas. Plus
dao_update_does_not_block_built_ins documenting where the
HTTP-layer guard lives.
Apollo coordinates separately — the matching changes there add the
/api/personas proxy and start sending persona_id on photo-chat turns.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
422 lines
14 KiB
Rust
422 lines
14 KiB
Rust
#![allow(dead_code)]
|
|
|
|
use diesel::prelude::*;
|
|
use diesel::sqlite::SqliteConnection;
|
|
use std::ops::DerefMut;
|
|
use std::sync::{Arc, Mutex};
|
|
|
|
use crate::database::models::{InsertPersona, Persona};
|
|
use crate::database::schema;
|
|
use crate::database::{DbError, DbErrorKind, connect};
|
|
use crate::otel::trace_db_call;
|
|
|
|
/// Partial update applied by `update_persona`.
///
/// Every field is optional: `None` leaves the stored column untouched,
/// `Some(v)` overwrites it with `v`. `PersonaPatch::default()` is the empty
/// patch (all fields `None`), so callers can write
/// `PersonaPatch { name: Some(..), ..Default::default() }`.
///
/// Built-in personas may flip `include_all_memories`, but name/prompt edits
/// to built-ins are expected to be rejected at the handler layer (the
/// built-in copy lives in the migration). This type itself enforces nothing.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct PersonaPatch {
    /// New display name, or `None` to keep the current one.
    pub name: Option<String>,
    /// New system prompt, or `None` to keep the current one.
    pub system_prompt: Option<String>,
    /// New value for the `include_all_memories` flag, or `None` to keep it.
    pub include_all_memories: Option<bool>,
}
|
|
|
|
/// One row of a bulk migration upload. Fields are named to match the JSON
/// shape the mobile client uploads (`POST /personas/migrate`).
///
/// There is no `include_all_memories` field: `bulk_import` always stores
/// that flag as `0` (false) for imported rows.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ImportPersona {
    /// Client-chosen identifier; unique per user on the server side.
    pub persona_id: String,
    /// Display name of the persona.
    pub name: String,
    /// System prompt text stored for the persona.
    pub system_prompt: String,
    /// Stored verbatim in the `is_built_in` column.
    pub is_built_in: bool,
    /// Stored verbatim in the `created_at` column. The DAO's own inserts use
    /// Unix epoch milliseconds, so presumably the same unit is expected here
    /// (TODO confirm against the client).
    pub created_at: i64,
}
|
|
|
|
pub trait PersonaDao: Sync + Send {
|
|
fn list_personas(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
user_id: i32,
|
|
) -> Result<Vec<Persona>, DbError>;
|
|
|
|
fn get_persona(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
user_id: i32,
|
|
persona_id: &str,
|
|
) -> Result<Option<Persona>, DbError>;
|
|
|
|
fn create_persona(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
user_id: i32,
|
|
persona_id: &str,
|
|
name: &str,
|
|
system_prompt: &str,
|
|
is_built_in: bool,
|
|
include_all_memories: bool,
|
|
) -> Result<Persona, DbError>;
|
|
|
|
fn update_persona(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
user_id: i32,
|
|
persona_id: &str,
|
|
patch: PersonaPatch,
|
|
) -> Result<Option<Persona>, DbError>;
|
|
|
|
fn delete_persona(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
user_id: i32,
|
|
persona_id: &str,
|
|
) -> Result<bool, DbError>;
|
|
|
|
/// Idempotent bulk import. INSERT OR IGNORE on (user_id, persona_id)
|
|
/// — re-uploading the same set is a no-op. Returns the number of rows
|
|
/// actually inserted (skipped duplicates don't count).
|
|
fn bulk_import(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
user_id: i32,
|
|
personas: &[ImportPersona],
|
|
) -> Result<usize, DbError>;
|
|
}
|
|
|
|
pub struct SqlitePersonaDao {
|
|
connection: Arc<Mutex<SqliteConnection>>,
|
|
}
|
|
|
|
impl Default for SqlitePersonaDao {
|
|
fn default() -> Self {
|
|
Self::new()
|
|
}
|
|
}
|
|
|
|
impl SqlitePersonaDao {
|
|
pub fn new() -> Self {
|
|
Self {
|
|
connection: Arc::new(Mutex::new(connect())),
|
|
}
|
|
}
|
|
|
|
pub fn from_connection(conn: Arc<Mutex<SqliteConnection>>) -> Self {
|
|
Self { connection: conn }
|
|
}
|
|
}
|
|
|
|
impl PersonaDao for SqlitePersonaDao {
|
|
fn list_personas(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
uid: i32,
|
|
) -> Result<Vec<Persona>, DbError> {
|
|
trace_db_call(cx, "query", "list_personas", |_span| {
|
|
use schema::personas::dsl::*;
|
|
let mut conn = self.connection.lock().expect("PersonaDao lock");
|
|
personas
|
|
.filter(user_id.eq(uid))
|
|
.order(created_at.asc())
|
|
.load::<Persona>(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
|
|
})
|
|
.map_err(|_| DbError::new(DbErrorKind::QueryError))
|
|
}
|
|
|
|
fn get_persona(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
uid: i32,
|
|
pid: &str,
|
|
) -> Result<Option<Persona>, DbError> {
|
|
trace_db_call(cx, "query", "get_persona", |_span| {
|
|
use schema::personas::dsl::*;
|
|
let mut conn = self.connection.lock().expect("PersonaDao lock");
|
|
personas
|
|
.filter(user_id.eq(uid))
|
|
.filter(persona_id.eq(pid))
|
|
.first::<Persona>(conn.deref_mut())
|
|
.optional()
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
|
|
})
|
|
.map_err(|_| DbError::new(DbErrorKind::QueryError))
|
|
}
|
|
|
|
fn create_persona(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
uid: i32,
|
|
pid: &str,
|
|
nm: &str,
|
|
prompt: &str,
|
|
builtin: bool,
|
|
include_all: bool,
|
|
) -> Result<Persona, DbError> {
|
|
trace_db_call(cx, "insert", "create_persona", |_span| {
|
|
use schema::personas::dsl::*;
|
|
let mut conn = self.connection.lock().expect("PersonaDao lock");
|
|
let now = chrono::Utc::now().timestamp_millis();
|
|
|
|
diesel::insert_into(personas)
|
|
.values(InsertPersona {
|
|
user_id: uid,
|
|
persona_id: pid,
|
|
name: nm,
|
|
system_prompt: prompt,
|
|
is_built_in: builtin,
|
|
include_all_memories: include_all,
|
|
created_at: now,
|
|
updated_at: now,
|
|
})
|
|
.execute(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Insert error: {}", e))?;
|
|
|
|
personas
|
|
.filter(user_id.eq(uid))
|
|
.filter(persona_id.eq(pid))
|
|
.first::<Persona>(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
|
|
})
|
|
.map_err(|_| DbError::new(DbErrorKind::InsertError))
|
|
}
|
|
|
|
fn update_persona(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
uid: i32,
|
|
pid: &str,
|
|
patch: PersonaPatch,
|
|
) -> Result<Option<Persona>, DbError> {
|
|
trace_db_call(cx, "update", "update_persona", |_span| {
|
|
use schema::personas::dsl::*;
|
|
let mut conn = self.connection.lock().expect("PersonaDao lock");
|
|
let now = chrono::Utc::now().timestamp_millis();
|
|
|
|
// Apply each field as its own UPDATE — keeps types simple
|
|
// (Diesel's tuple updates don't compose cleanly across optional
|
|
// columns) and matches the pattern already in use for entities
|
|
// (knowledge_dao.rs::update_entity).
|
|
if let Some(ref new_name) = patch.name {
|
|
diesel::update(personas.filter(user_id.eq(uid)).filter(persona_id.eq(pid)))
|
|
.set((name.eq(new_name), updated_at.eq(now)))
|
|
.execute(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Update name error: {}", e))?;
|
|
}
|
|
if let Some(ref new_prompt) = patch.system_prompt {
|
|
diesel::update(personas.filter(user_id.eq(uid)).filter(persona_id.eq(pid)))
|
|
.set((system_prompt.eq(new_prompt), updated_at.eq(now)))
|
|
.execute(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Update prompt error: {}", e))?;
|
|
}
|
|
if let Some(new_include_all) = patch.include_all_memories {
|
|
diesel::update(personas.filter(user_id.eq(uid)).filter(persona_id.eq(pid)))
|
|
.set((include_all_memories.eq(new_include_all), updated_at.eq(now)))
|
|
.execute(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Update include_all error: {}", e))?;
|
|
}
|
|
|
|
personas
|
|
.filter(user_id.eq(uid))
|
|
.filter(persona_id.eq(pid))
|
|
.first::<Persona>(conn.deref_mut())
|
|
.optional()
|
|
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
|
|
})
|
|
.map_err(|_| DbError::new(DbErrorKind::UpdateError))
|
|
}
|
|
|
|
fn delete_persona(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
uid: i32,
|
|
pid: &str,
|
|
) -> Result<bool, DbError> {
|
|
trace_db_call(cx, "delete", "delete_persona", |_span| {
|
|
use schema::personas::dsl::*;
|
|
let mut conn = self.connection.lock().expect("PersonaDao lock");
|
|
let n = diesel::delete(personas.filter(user_id.eq(uid)).filter(persona_id.eq(pid)))
|
|
.execute(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Delete error: {}", e))?;
|
|
Ok(n > 0)
|
|
})
|
|
.map_err(|_| DbError::new(DbErrorKind::QueryError))
|
|
}
|
|
|
|
fn bulk_import(
|
|
&mut self,
|
|
cx: &opentelemetry::Context,
|
|
uid: i32,
|
|
rows: &[ImportPersona],
|
|
) -> Result<usize, DbError> {
|
|
trace_db_call(cx, "insert", "bulk_import_personas", |_span| {
|
|
let mut conn = self.connection.lock().expect("PersonaDao lock");
|
|
let now = chrono::Utc::now().timestamp_millis();
|
|
let mut inserted = 0usize;
|
|
|
|
// INSERT OR IGNORE on the (user_id, persona_id) UNIQUE so
|
|
// re-running migrate is a no-op for personas already on the
|
|
// server.
|
|
for p in rows {
|
|
let n = diesel::sql_query(
|
|
"INSERT OR IGNORE INTO personas (user_id, persona_id, name, system_prompt, \
|
|
is_built_in, include_all_memories, created_at, updated_at) \
|
|
VALUES (?, ?, ?, ?, ?, 0, ?, ?)",
|
|
)
|
|
.bind::<diesel::sql_types::Integer, _>(uid)
|
|
.bind::<diesel::sql_types::Text, _>(&p.persona_id)
|
|
.bind::<diesel::sql_types::Text, _>(&p.name)
|
|
.bind::<diesel::sql_types::Text, _>(&p.system_prompt)
|
|
.bind::<diesel::sql_types::Bool, _>(p.is_built_in)
|
|
.bind::<diesel::sql_types::BigInt, _>(p.created_at)
|
|
.bind::<diesel::sql_types::BigInt, _>(now)
|
|
.execute(conn.deref_mut())
|
|
.map_err(|e| anyhow::anyhow!("Insert error: {}", e))?;
|
|
inserted += n;
|
|
}
|
|
Ok(inserted)
|
|
})
|
|
.map_err(|_| DbError::new(DbErrorKind::InsertError))
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;
    use crate::database::test::in_memory_db_connection;

    /// Builds a DAO over a fresh in-memory database that contains a single
    /// user named `username`, and returns it together with that user's id.
    fn dao_with_user(username: &str) -> (SqlitePersonaDao, i32) {
        use crate::database::schema::users::dsl as u;

        let shared = Arc::new(Mutex::new(in_memory_db_connection()));
        let new_user_id: i32 = {
            let mut guard = shared.lock().unwrap();
            diesel::insert_into(u::users)
                .values((u::username.eq(username), u::password.eq("x")))
                .execute(guard.deref_mut())
                .unwrap();
            let found = u::users
                .filter(u::username.eq(username))
                .select(u::id)
                .first(guard.deref_mut())
                .unwrap();
            found
        };
        (SqlitePersonaDao::from_connection(shared), new_user_id)
    }

    /// Convenience constructor for a non-built-in import row.
    fn import_row(persona_id: &str, name: &str, system_prompt: &str, created_at: i64) -> ImportPersona {
        ImportPersona {
            persona_id: persona_id.into(),
            name: name.into(),
            system_prompt: system_prompt.into(),
            is_built_in: false,
            created_at,
        }
    }

    /// A created persona comes back from both `create_persona` and
    /// `list_personas`.
    #[test]
    fn create_and_list_round_trip() {
        let cx = opentelemetry::Context::new();
        let (mut dao, uid) = dao_with_user("alice");

        // The migration seeds 3 built-ins for any existing user; alice
        // was created post-migration so she starts empty.
        let created = dao
            .create_persona(&cx, uid, "custom-1", "Custom A", "prompt A", false, false)
            .unwrap();
        assert_eq!(created.persona_id, "custom-1");
        assert_eq!(created.user_id, uid);
        assert!(!created.is_built_in);

        let all = dao.list_personas(&cx, uid).unwrap();
        assert_eq!(all.len(), 1);
        assert_eq!(all[0].persona_id, "custom-1");
    }

    /// A second `create_persona` with the same `(user, persona_id)` fails.
    #[test]
    fn unique_constraint_blocks_duplicate_persona_id() {
        let cx = opentelemetry::Context::new();
        let (mut dao, uid) = dao_with_user("bob");

        dao.create_persona(&cx, uid, "x", "X", "p", false, false)
            .unwrap();
        let second = dao.create_persona(&cx, uid, "x", "X2", "p2", false, false);
        assert!(
            second.is_err(),
            "second insert with same persona_id should fail"
        );
    }

    /// Importing the same batch twice inserts rows only the first time.
    #[test]
    fn bulk_import_is_idempotent() {
        let cx = opentelemetry::Context::new();
        let (mut dao, uid) = dao_with_user("carol");

        let batch = vec![
            import_row("custom-a", "A", "p1", 1),
            import_row("custom-b", "B", "p2", 2),
        ];

        let first_pass = dao.bulk_import(&cx, uid, &batch).unwrap();
        assert_eq!(first_pass, 2);
        let second_pass = dao.bulk_import(&cx, uid, &batch).unwrap();
        assert_eq!(second_pass, 0, "re-import should insert nothing");

        let stored = dao.list_personas(&cx, uid).unwrap();
        assert_eq!(stored.len(), 2);
    }

    /// Pins the DAO's intentionally permissive contract: `update_persona`
    /// applies name/system_prompt edits to ANY row, including built-ins.
    ///
    /// The guard against editing a built-in's identity (name + systemPrompt)
    /// lives in the HTTP handler (src/personas.rs::update_persona), not here.
    /// Adding a DAO-level guard as defence in depth would change this
    /// contract and make this test fail, so revisit it deliberately in the
    /// same change rather than relying on the DAO to protect built-ins.
    #[test]
    fn dao_update_does_not_block_built_ins() {
        let cx = opentelemetry::Context::new();
        let (mut dao, uid) = dao_with_user("eve");

        dao.create_persona(&cx, uid, "default", "Default", "old", true, false)
            .unwrap();

        let identity_edit = PersonaPatch {
            name: Some("Renamed".into()),
            system_prompt: Some("new prompt".into()),
            include_all_memories: None,
        };
        let after = dao
            .update_persona(&cx, uid, "default", identity_edit)
            .unwrap()
            .unwrap();

        assert_eq!(after.name, "Renamed");
        assert_eq!(after.system_prompt, "new prompt");
        assert!(
            after.is_built_in,
            "is_built_in flag should be unchanged by patch"
        );
    }

    /// A patch carrying only `include_all_memories` flips that flag.
    #[test]
    fn update_toggles_include_all_memories() {
        let cx = opentelemetry::Context::new();
        let (mut dao, uid) = dao_with_user("dan");

        dao.create_persona(&cx, uid, "j", "Journal", "p", true, false)
            .unwrap();

        let flag_only = PersonaPatch {
            name: None,
            system_prompt: None,
            include_all_memories: Some(true),
        };
        let after = dao
            .update_persona(&cx, uid, "j", flag_only)
            .unwrap()
            .unwrap();
        assert!(after.include_all_memories);
    }
}
|