personas: elevate to server with per-persona fact scoping

Move personas off the mobile client into ImageApi as first-class
records, and scope entity_facts by persona so each one builds its own
voice over a shared entity graph. The new include_all_memories flag
lets a persona opt back into the full hive-mind pool for human
browsing of /knowledge/*; agentic generation always stays in-voice.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Cameron Cordes
2026-05-09 17:59:20 -04:00
parent 55a986c249
commit 3e2f36a748
15 changed files with 1024 additions and 20 deletions

View File

@@ -0,0 +1,43 @@
-- Drop the persona-scoping column on entity_facts via the table-rebuild
-- dance for SQLite-version portability (matches the pattern in
-- 2026-04-20-000000_add_backend_to_insights/down.sql).

-- The persona index goes first: it would dangle once the table is dropped.
DROP INDEX IF EXISTS idx_entity_facts_persona;

-- 1. Snapshot every column EXCEPT persona_id. The per-persona tagging is
--    irreversibly discarded here — this down migration is lossy by design.
CREATE TABLE entity_facts_backup AS
SELECT id, subject_entity_id, predicate, object_entity_id, object_value,
       source_photo, source_insight_id, confidence, status, created_at
FROM entity_facts;

-- 2. Rebuild the table without the persona column, restoring the original
--    constraints.
--    NOTE(review): assumes no other table holds an FK into entity_facts,
--    and that foreign_keys enforcement tolerates the DROP inside the
--    migration transaction — confirm against the runner's PRAGMA settings.
DROP TABLE entity_facts;
CREATE TABLE entity_facts (
    id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
    subject_entity_id INTEGER NOT NULL,
    predicate TEXT NOT NULL,
    object_entity_id INTEGER,
    object_value TEXT,
    source_photo TEXT,
    source_insight_id INTEGER,
    confidence REAL NOT NULL DEFAULT 0.6,
    status TEXT NOT NULL DEFAULT 'active',
    created_at BIGINT NOT NULL,
    CONSTRAINT fk_ef_subject FOREIGN KEY (subject_entity_id) REFERENCES entities(id) ON DELETE CASCADE,
    CONSTRAINT fk_ef_object FOREIGN KEY (object_entity_id) REFERENCES entities(id) ON DELETE SET NULL,
    CONSTRAINT fk_ef_insight FOREIGN KEY (source_insight_id) REFERENCES photo_insights(id) ON DELETE SET NULL,
    -- A fact must point at either an entity object or a literal value.
    CHECK (object_entity_id IS NOT NULL OR object_value IS NOT NULL)
);

-- 3. Copy the snapshot back. The SELECT's column order matches the new
--    table's declaration order, so the bare INSERT lines up positionally.
INSERT INTO entity_facts
SELECT id, subject_entity_id, predicate, object_entity_id, object_value,
       source_photo, source_insight_id, confidence, status, created_at
FROM entity_facts_backup;
DROP TABLE entity_facts_backup;

-- 4. Recreate the pre-persona indexes (dropped along with the old table).
CREATE INDEX idx_entity_facts_subject ON entity_facts(subject_entity_id);
CREATE INDEX idx_entity_facts_predicate ON entity_facts(predicate);
CREATE INDEX idx_entity_facts_status ON entity_facts(status);
CREATE INDEX idx_entity_facts_source_photo ON entity_facts(source_photo);

-- Finally remove the personas table itself (and its index).
DROP INDEX IF EXISTS idx_personas_user;
DROP TABLE IF EXISTS personas;

View File

@@ -0,0 +1,64 @@
-- Personas live server-side now (mobile previously stored them in
-- AsyncStorage only). Each user gets the three built-ins seeded; custom
-- personas land here too via POST /personas or POST /personas/migrate.
--
-- `entity_facts` gains a persona_id so each persona accumulates its own
-- voice over a shared entity graph (entities themselves stay unscoped).
-- Existing rows backfill to 'default' via the column DEFAULT — that
-- becomes the historical baseline. The `include_all_memories` flag on
-- personas lets any persona opt back into reading the full pool.
CREATE TABLE personas (
    id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
    user_id INTEGER NOT NULL,
    -- Stable string id ('default', 'journal', 'factual', or a custom id);
    -- unique per user, used as the scoping key on entity_facts.
    persona_id TEXT NOT NULL,
    name TEXT NOT NULL,
    system_prompt TEXT NOT NULL,
    is_built_in BOOLEAN NOT NULL DEFAULT FALSE,
    include_all_memories BOOLEAN NOT NULL DEFAULT FALSE,
    -- Milliseconds since epoch (hence the strftime * 1000 in the seeds).
    created_at BIGINT NOT NULL,
    updated_at BIGINT NOT NULL,
    UNIQUE(user_id, persona_id),
    CONSTRAINT fk_personas_user FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
);
CREATE INDEX idx_personas_user ON personas(user_id);

-- Seed built-ins for every existing user. System prompts copied verbatim
-- from FileViewer-React/hooks/usePersonas.tsx so server and client agree
-- on the canonical voice for each built-in.
-- NOTE(review): the sentence counts previously read "36", "48", and "24"
-- — restored below as the ranges "3-6", "4-8", "2-4" (a dash was
-- evidently dropped in transit); confirm against usePersonas.tsx.
-- NOTE(review): only users existing at migration time are seeded —
-- confirm the signup path inserts the built-ins for new users.
INSERT INTO personas (user_id, persona_id, name, system_prompt, is_built_in, created_at, updated_at)
SELECT
    u.id,
    'default',
    'Default Assistant',
    'You are my long-term memory assistant. Use only the information provided. Do not invent details. Respond in 3-6 sentences in third person, leading with the most concrete moment from the photo and the surrounding context. Plain prose, no headings.',
    TRUE,
    strftime('%s', 'now') * 1000,
    strftime('%s', 'now') * 1000
FROM users u
UNION ALL
SELECT
    u.id,
    'journal',
    'Personal Journal',
    'You are a personal journal writer. Write in first person, present tense, with warmth and reflection — focusing on emotions and meaningful moments. Use only the information provided; do not invent details. Aim for 4-8 sentences in a single flowing paragraph, no headings.',
    TRUE,
    strftime('%s', 'now') * 1000,
    strftime('%s', 'now') * 1000
FROM users u
UNION ALL
SELECT
    u.id,
    'factual',
    'Factual Reporter',
    'You are a factual memory recorder. Be precise, objective, and concise. Lead with the date and place, then list what / when / who in 2-4 short sentences. Use only the information provided; if a detail is unknown, say so rather than guessing.',
    TRUE,
    strftime('%s', 'now') * 1000,
    strftime('%s', 'now') * 1000
FROM users u;

-- Persona scoping on facts only. Entities and entity_photo_links stay
-- shared (real-world referents and shared photo ↔ entity associations).
ALTER TABLE entity_facts ADD COLUMN persona_id TEXT NOT NULL DEFAULT 'default';
CREATE INDEX idx_entity_facts_persona ON entity_facts(persona_id);

View File

@@ -48,6 +48,11 @@ pub struct GeneratePhotoInsightRequest {
/// falls back to `DEFAULT_FEWSHOT_INSIGHT_IDS`. /// falls back to `DEFAULT_FEWSHOT_INSIGHT_IDS`.
#[serde(default)] #[serde(default)]
pub fewshot_insight_ids: Option<Vec<i32>>, pub fewshot_insight_ids: Option<Vec<i32>>,
/// Active persona id for this generation. New facts are tagged with
/// it (`entity_facts.persona_id`); recall during the agentic loop is
/// scoped to it. Defaults to `"default"` when absent.
#[serde(default)]
pub persona_id: Option<String>,
} }
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]
@@ -376,6 +381,13 @@ pub async fn generate_agentic_insight_handler(
.collect() .collect()
}; };
let persona_id = request
.persona_id
.clone()
.filter(|s| !s.trim().is_empty())
.unwrap_or_else(|| "default".to_string());
span.set_attribute(KeyValue::new("persona_id", persona_id.clone()));
let result = insight_generator let result = insight_generator
.generate_agentic_insight_for_photo( .generate_agentic_insight_for_photo(
&normalized_path, &normalized_path,
@@ -390,6 +402,7 @@ pub async fn generate_agentic_insight_handler(
request.backend.clone(), request.backend.clone(),
fewshot_examples, fewshot_examples,
fewshot_ids, fewshot_ids,
persona_id,
) )
.await; .await;
@@ -645,6 +658,10 @@ pub struct ChatTurnHttpRequest {
/// semantics. Also seeds the bootstrap path when no insight exists. /// semantics. Also seeds the bootstrap path when no insight exists.
#[serde(default)] #[serde(default)]
pub system_prompt: Option<String>, pub system_prompt: Option<String>,
/// Active persona id for this turn. New facts/recalls scope to it.
/// Defaults to `"default"` when missing.
#[serde(default)]
pub persona_id: Option<String>,
#[serde(default)] #[serde(default)]
pub amend: bool, pub amend: bool,
/// When true, force the bootstrap path even if an insight already /// When true, force the bootstrap path even if an insight already
@@ -707,6 +724,7 @@ pub async fn chat_turn_handler(
min_p: request.min_p, min_p: request.min_p,
max_iterations: request.max_iterations, max_iterations: request.max_iterations,
system_prompt: request.system_prompt.clone(), system_prompt: request.system_prompt.clone(),
persona_id: request.persona_id.clone(),
amend: request.amend, amend: request.amend,
regenerate: request.regenerate, regenerate: request.regenerate,
}; };
@@ -923,6 +941,7 @@ pub async fn chat_stream_handler(
min_p: request.min_p, min_p: request.min_p,
max_iterations: request.max_iterations, max_iterations: request.max_iterations,
system_prompt: request.system_prompt.clone(), system_prompt: request.system_prompt.clone(),
persona_id: request.persona_id.clone(),
amend: request.amend, amend: request.amend,
regenerate: request.regenerate, regenerate: request.regenerate,
}; };

View File

@@ -50,6 +50,10 @@ pub struct ChatTurnRequest {
/// In amend mode, persisted into the new insight row's system message. /// In amend mode, persisted into the new insight row's system message.
/// None / empty = no change. /// None / empty = no change.
pub system_prompt: Option<String>, pub system_prompt: Option<String>,
/// Active persona id for this turn. Tools that write to
/// `entity_facts` tag the new rows with it; `recall_facts_for_photo`
/// scopes its read to it. None defaults to `"default"`.
pub persona_id: Option<String>,
/// When true, write a new insight row (regenerating title) instead of /// When true, write a new insight row (regenerating title) instead of
/// updating training_messages on the existing row. /// updating training_messages on the existing row.
pub amend: bool, pub amend: bool,
@@ -231,6 +235,13 @@ impl InsightChatService {
bail!("user_message exceeds 8192 chars"); bail!("user_message exceeds 8192 chars");
} }
let active_persona = req
.persona_id
.clone()
.filter(|s| !s.trim().is_empty())
.unwrap_or_else(|| "default".to_string());
span.set_attribute(KeyValue::new("persona_id", active_persona.clone()));
let normalized = normalize_path(&req.file_path); let normalized = normalize_path(&req.file_path);
// 1. Acquire the per-(library, file) async mutex. Two concurrent // 1. Acquire the per-(library, file) async mutex. Two concurrent
@@ -464,6 +475,7 @@ impl InsightChatService {
&ollama_client, &ollama_client,
&image_base64, &image_base64,
&normalized, &normalized,
&active_persona,
&loop_cx, &loop_cx,
) )
.await; .await;
@@ -737,6 +749,11 @@ impl InsightChatService {
insight: crate::database::models::PhotoInsight, insight: crate::database::models::PhotoInsight,
tx: tokio::sync::mpsc::Sender<ChatStreamEvent>, tx: tokio::sync::mpsc::Sender<ChatStreamEvent>,
) -> Result<()> { ) -> Result<()> {
let active_persona = req
.persona_id
.clone()
.filter(|s| !s.trim().is_empty())
.unwrap_or_else(|| "default".to_string());
let raw_history = insight.training_messages.as_ref().ok_or_else(|| { let raw_history = insight.training_messages.as_ref().ok_or_else(|| {
anyhow!("insight has no chat history; regenerate this insight in agentic mode") anyhow!("insight has no chat history; regenerate this insight in agentic mode")
})?; })?;
@@ -826,6 +843,7 @@ impl InsightChatService {
tools, tools,
&image_base64, &image_base64,
&normalized, &normalized,
&active_persona,
max_iterations, max_iterations,
&tx, &tx,
) )
@@ -915,6 +933,11 @@ impl InsightChatService {
normalized: String, normalized: String,
tx: tokio::sync::mpsc::Sender<ChatStreamEvent>, tx: tokio::sync::mpsc::Sender<ChatStreamEvent>,
) -> Result<()> { ) -> Result<()> {
let active_persona = req
.persona_id
.clone()
.filter(|s| !s.trim().is_empty())
.unwrap_or_else(|| "default".to_string());
let effective_backend = resolve_bootstrap_backend(req.backend.as_deref())?; let effective_backend = resolve_bootstrap_backend(req.backend.as_deref())?;
let is_hybrid = effective_backend == "hybrid"; let is_hybrid = effective_backend == "hybrid";
@@ -1008,6 +1031,7 @@ impl InsightChatService {
tools, tools,
&image_base64, &image_base64,
&normalized, &normalized,
&active_persona,
max_iterations, max_iterations,
&tx, &tx,
) )
@@ -1157,6 +1181,7 @@ impl InsightChatService {
tools: Vec<Tool>, tools: Vec<Tool>,
image_base64: &Option<String>, image_base64: &Option<String>,
normalized: &str, normalized: &str,
active_persona: &str,
max_iterations: usize, max_iterations: usize,
tx: &tokio::sync::mpsc::Sender<ChatStreamEvent>, tx: &tokio::sync::mpsc::Sender<ChatStreamEvent>,
) -> Result<AgenticLoopOutcome> { ) -> Result<AgenticLoopOutcome> {
@@ -1235,6 +1260,7 @@ impl InsightChatService {
ollama_client, ollama_client,
image_base64, image_base64,
normalized, normalized,
active_persona,
&cx, &cx,
) )
.await; .await;

View File

@@ -1534,7 +1534,15 @@ Return ONLY the summary, nothing else."#,
// ── Tool executors for agentic loop ──────────────────────────────── // ── Tool executors for agentic loop ────────────────────────────────
/// Dispatch a tool call to the appropriate executor /// Dispatch a tool call to the appropriate executor.
///
/// `persona_id` identifies the persona this loop is generating for —
/// `store_fact` tags new facts with it, `recall_facts_for_photo`
/// filters reads to it (always Single in the agentic loop, even when
/// the persona has `include_all_memories=true`; the hive-mind toggle
/// is for human browsing of `/knowledge/*`, where mixing voices is
/// the explicit goal — during generation the persona's own voice
/// must stay clean).
pub(crate) async fn execute_tool( pub(crate) async fn execute_tool(
&self, &self,
tool_name: &str, tool_name: &str,
@@ -1542,6 +1550,7 @@ Return ONLY the summary, nothing else."#,
ollama: &OllamaClient, ollama: &OllamaClient,
image_base64: &Option<String>, image_base64: &Option<String>,
file_path: &str, file_path: &str,
persona_id: &str,
cx: &opentelemetry::Context, cx: &opentelemetry::Context,
) -> String { ) -> String {
let result = match tool_name { let result = match tool_name {
@@ -1556,9 +1565,15 @@ Return ONLY the summary, nothing else."#,
"reverse_geocode" => self.tool_reverse_geocode(arguments).await, "reverse_geocode" => self.tool_reverse_geocode(arguments).await,
"get_personal_place_at" => self.tool_get_personal_place_at(arguments).await, "get_personal_place_at" => self.tool_get_personal_place_at(arguments).await,
"recall_entities" => self.tool_recall_entities(arguments, cx).await, "recall_entities" => self.tool_recall_entities(arguments, cx).await,
"recall_facts_for_photo" => self.tool_recall_facts_for_photo(arguments, cx).await, "recall_facts_for_photo" => {
self.tool_recall_facts_for_photo(arguments, persona_id, cx)
.await
}
"store_entity" => self.tool_store_entity(arguments, ollama, cx).await, "store_entity" => self.tool_store_entity(arguments, ollama, cx).await,
"store_fact" => self.tool_store_fact(arguments, file_path, cx).await, "store_fact" => {
self.tool_store_fact(arguments, file_path, persona_id, cx)
.await
}
"get_current_datetime" => Self::tool_get_current_datetime(), "get_current_datetime" => Self::tool_get_current_datetime(),
unknown => format!("Unknown tool: {}", unknown), unknown => format!("Unknown tool: {}", unknown),
}; };
@@ -2391,8 +2406,11 @@ Return ONLY the summary, nothing else."#,
async fn tool_recall_facts_for_photo( async fn tool_recall_facts_for_photo(
&self, &self,
args: &serde_json::Value, args: &serde_json::Value,
persona_id: &str,
cx: &opentelemetry::Context, cx: &opentelemetry::Context,
) -> String { ) -> String {
use crate::database::PersonaFilter;
let persona_filter = PersonaFilter::Single(persona_id.to_string());
let file_path = match args.get("file_path").and_then(|v| v.as_str()) { let file_path = match args.get("file_path").and_then(|v| v.as_str()) {
Some(p) => p.to_string(), Some(p) => p.to_string(),
None => return "Error: missing required parameter 'file_path'".to_string(), None => return "Error: missing required parameter 'file_path'".to_string(),
@@ -2432,7 +2450,7 @@ Return ONLY the summary, nothing else."#,
"Entity: {} ({}, role: {})", "Entity: {} ({}, role: {})",
e.name, e.entity_type, role e.name, e.entity_type, role
)); ));
if let Ok(facts) = kdao.get_facts_for_entity(cx, entity_id) { if let Ok(facts) = kdao.get_facts_for_entity(cx, entity_id, &persona_filter) {
for f in facts.iter().filter(|f| f.status == "active") { for f in facts.iter().filter(|f| f.status == "active") {
let obj = if let Some(ref v) = f.object_value { let obj = if let Some(ref v) = f.object_value {
v.clone() v.clone()
@@ -2577,6 +2595,7 @@ Return ONLY the summary, nothing else."#,
&self, &self,
args: &serde_json::Value, args: &serde_json::Value,
file_path: &str, file_path: &str,
persona_id: &str,
cx: &opentelemetry::Context, cx: &opentelemetry::Context,
) -> String { ) -> String {
use crate::database::models::{InsertEntityFact, InsertEntityPhotoLink}; use crate::database::models::{InsertEntityFact, InsertEntityPhotoLink};
@@ -2627,6 +2646,7 @@ Return ONLY the summary, nothing else."#,
confidence: 0.6, confidence: 0.6,
status: "active".to_string(), status: "active".to_string(),
created_at: chrono::Utc::now().timestamp(), created_at: chrono::Utc::now().timestamp(),
persona_id: persona_id.to_string(),
}; };
let mut kdao = self let mut kdao = self
@@ -3176,6 +3196,7 @@ Return ONLY the summary, nothing else."#,
backend: Option<String>, backend: Option<String>,
fewshot_examples: Vec<Vec<ChatMessage>>, fewshot_examples: Vec<Vec<ChatMessage>>,
fewshot_source_ids: Vec<i32>, fewshot_source_ids: Vec<i32>,
persona_id: String,
) -> Result<(Option<i32>, Option<i32>)> { ) -> Result<(Option<i32>, Option<i32>)> {
let tracer = global_tracer(); let tracer = global_tracer();
let current_cx = opentelemetry::Context::current(); let current_cx = opentelemetry::Context::current();
@@ -3652,6 +3673,7 @@ Return ONLY the summary, nothing else."#,
&ollama_client, &ollama_client,
&image_base64, &image_base64,
&file_path, &file_path,
&persona_id,
&loop_cx, &loop_cx,
) )
.await; .await;

View File

@@ -335,6 +335,7 @@ async fn main() -> anyhow::Result<()> {
None, None,
Vec::new(), Vec::new(),
Vec::new(), Vec::new(),
"default".to_string(),
) )
.await .await
{ {

View File

@@ -50,10 +50,21 @@ pub struct FactFilter {
/// "active" | "reviewed" | "rejected" | "all" /// "active" | "reviewed" | "rejected" | "all"
pub status: Option<String>, pub status: Option<String>,
pub predicate: Option<String>, pub predicate: Option<String>,
pub persona: PersonaFilter,
pub limit: i64, pub limit: i64,
pub offset: i64, pub offset: i64,
} }
/// Persona scoping for fact reads. `Single` filters to one persona's
/// view; `All` is the hive-mind read used when a persona has
/// `include_all_memories=true` in the personas table. Entities and
/// photo-links are always shared and don't take a persona filter.
#[derive(Debug, Clone)]
pub enum PersonaFilter {
    /// Restrict fact reads to rows authored by exactly this persona_id.
    Single(String),
    /// No persona restriction — the shared, cross-persona pool.
    All,
}
pub struct EntityPatch { pub struct EntityPatch {
pub name: Option<String>, pub name: Option<String>,
pub description: Option<String>, pub description: Option<String>,
@@ -144,6 +155,7 @@ pub trait KnowledgeDao: Sync + Send {
&mut self, &mut self,
cx: &opentelemetry::Context, cx: &opentelemetry::Context,
entity_id: i32, entity_id: i32,
persona: &PersonaFilter,
) -> Result<Vec<EntityFact>, DbError>; ) -> Result<Vec<EntityFact>, DbError>;
fn list_facts( fn list_facts(
@@ -199,6 +211,7 @@ pub trait KnowledgeDao: Sync + Send {
cx: &opentelemetry::Context, cx: &opentelemetry::Context,
since: i64, since: i64,
limit: i64, limit: i64,
persona: &PersonaFilter,
) -> Result<RecentActivity, DbError>; ) -> Result<RecentActivity, DbError>;
} }
@@ -584,10 +597,14 @@ impl KnowledgeDao for SqliteKnowledgeDao {
use schema::entity_facts::dsl::*; use schema::entity_facts::dsl::*;
let mut conn = self.connection.lock().expect("KnowledgeDao lock"); let mut conn = self.connection.lock().expect("KnowledgeDao lock");
// Look for an identical active fact // Look for an identical active fact AUTHORED BY THE SAME
// PERSONA. The same claim from a different persona is a
// separate fact (each persona's voice/confidence is its own),
// not a confidence bump on someone else's row.
let mut dup_query = entity_facts let mut dup_query = entity_facts
.filter(subject_entity_id.eq(fact.subject_entity_id)) .filter(subject_entity_id.eq(fact.subject_entity_id))
.filter(predicate.eq(&fact.predicate)) .filter(predicate.eq(&fact.predicate))
.filter(persona_id.eq(&fact.persona_id))
.filter(status.ne("rejected")) .filter(status.ne("rejected"))
.into_boxed(); .into_boxed();
@@ -640,14 +657,19 @@ impl KnowledgeDao for SqliteKnowledgeDao {
&mut self, &mut self,
cx: &opentelemetry::Context, cx: &opentelemetry::Context,
entity_id: i32, entity_id: i32,
persona: &PersonaFilter,
) -> Result<Vec<EntityFact>, DbError> { ) -> Result<Vec<EntityFact>, DbError> {
trace_db_call(cx, "query", "get_facts_for_entity", |_span| { trace_db_call(cx, "query", "get_facts_for_entity", |_span| {
use schema::entity_facts::dsl::*; use schema::entity_facts::dsl::*;
let mut conn = self.connection.lock().expect("KnowledgeDao lock"); let mut conn = self.connection.lock().expect("KnowledgeDao lock");
entity_facts let mut q = entity_facts
.filter(subject_entity_id.eq(entity_id)) .filter(subject_entity_id.eq(entity_id))
.filter(status.ne("rejected")) .filter(status.ne("rejected"))
.load::<EntityFact>(conn.deref_mut()) .into_boxed();
if let PersonaFilter::Single(pid) = persona {
q = q.filter(persona_id.eq(pid.clone()));
}
q.load::<EntityFact>(conn.deref_mut())
.map_err(|e| anyhow::anyhow!("Query error: {}", e)) .map_err(|e| anyhow::anyhow!("Query error: {}", e))
}) })
.map_err(|_| DbError::new(DbErrorKind::QueryError)) .map_err(|_| DbError::new(DbErrorKind::QueryError))
@@ -664,19 +686,27 @@ impl KnowledgeDao for SqliteKnowledgeDao {
let mut conn = self.connection.lock().expect("KnowledgeDao lock"); let mut conn = self.connection.lock().expect("KnowledgeDao lock");
let mut query = entity_facts.into_boxed(); let mut query = entity_facts.into_boxed();
let mut count_query = entity_facts.into_boxed();
if let Some(eid) = filter.entity_id { if let Some(eid) = filter.entity_id {
query = query.filter(subject_entity_id.eq(eid)); query = query.filter(subject_entity_id.eq(eid));
count_query = count_query.filter(subject_entity_id.eq(eid));
} }
let status_val = filter.status.as_deref().unwrap_or("active"); let status_val = filter.status.as_deref().unwrap_or("active");
if status_val != "all" { if status_val != "all" {
query = query.filter(status.eq(status_val)); query = query.filter(status.eq(status_val));
count_query = count_query.filter(status.eq(status_val));
} }
if let Some(ref pred) = filter.predicate { if let Some(ref pred) = filter.predicate {
query = query.filter(predicate.eq(pred)); query = query.filter(predicate.eq(pred));
count_query = count_query.filter(predicate.eq(pred));
}
if let PersonaFilter::Single(ref pid) = filter.persona {
query = query.filter(persona_id.eq(pid.clone()));
count_query = count_query.filter(persona_id.eq(pid.clone()));
} }
let total: i64 = entity_facts let total: i64 = count_query
.select(count_star()) .select(count_star())
.first(conn.deref_mut()) .first(conn.deref_mut())
.unwrap_or(0); .unwrap_or(0);
@@ -854,12 +884,14 @@ impl KnowledgeDao for SqliteKnowledgeDao {
cx: &opentelemetry::Context, cx: &opentelemetry::Context,
since: i64, since: i64,
limit: i64, limit: i64,
persona: &PersonaFilter,
) -> Result<RecentActivity, DbError> { ) -> Result<RecentActivity, DbError> {
trace_db_call(cx, "query", "get_recent_activity", |_span| { trace_db_call(cx, "query", "get_recent_activity", |_span| {
use schema::entities::dsl as e; use schema::entities::dsl as e;
use schema::entity_facts::dsl as ef; use schema::entity_facts::dsl as ef;
let mut conn = self.connection.lock().expect("KnowledgeDao lock"); let mut conn = self.connection.lock().expect("KnowledgeDao lock");
// Entities are shared — recency is global.
let recent_entities = e::entities let recent_entities = e::entities
.filter(e::created_at.gt(since)) .filter(e::created_at.gt(since))
.order(e::created_at.desc()) .order(e::created_at.desc())
@@ -867,8 +899,13 @@ impl KnowledgeDao for SqliteKnowledgeDao {
.load::<Entity>(conn.deref_mut()) .load::<Entity>(conn.deref_mut())
.map_err(|err| anyhow::anyhow!("Query error: {}", err))?; .map_err(|err| anyhow::anyhow!("Query error: {}", err))?;
let recent_facts = ef::entity_facts let mut facts_q = ef::entity_facts
.filter(ef::created_at.gt(since)) .filter(ef::created_at.gt(since))
.into_boxed();
if let PersonaFilter::Single(pid) = persona {
facts_q = facts_q.filter(ef::persona_id.eq(pid.clone()));
}
let recent_facts = facts_q
.order(ef::created_at.desc()) .order(ef::created_at.desc())
.limit(limit) .limit(limit)
.load::<EntityFact>(conn.deref_mut()) .load::<EntityFact>(conn.deref_mut())

View File

@@ -49,6 +49,7 @@ pub mod insights_dao;
pub mod knowledge_dao; pub mod knowledge_dao;
pub mod location_dao; pub mod location_dao;
pub mod models; pub mod models;
pub mod persona_dao;
pub mod preview_dao; pub mod preview_dao;
pub mod reconcile; pub mod reconcile;
pub mod schema; pub mod schema;
@@ -58,10 +59,11 @@ pub use calendar_dao::{CalendarEventDao, SqliteCalendarEventDao};
pub use daily_summary_dao::{DailySummaryDao, InsertDailySummary, SqliteDailySummaryDao}; pub use daily_summary_dao::{DailySummaryDao, InsertDailySummary, SqliteDailySummaryDao};
pub use insights_dao::{InsightDao, SqliteInsightDao}; pub use insights_dao::{InsightDao, SqliteInsightDao};
pub use knowledge_dao::{ pub use knowledge_dao::{
EntityFilter, EntityPatch, FactFilter, FactPatch, KnowledgeDao, RecentActivity, EntityFilter, EntityPatch, FactFilter, FactPatch, KnowledgeDao, PersonaFilter, RecentActivity,
SqliteKnowledgeDao, SqliteKnowledgeDao,
}; };
pub use location_dao::{LocationHistoryDao, SqliteLocationHistoryDao}; pub use location_dao::{LocationHistoryDao, SqliteLocationHistoryDao};
pub use persona_dao::{ImportPersona, PersonaDao, PersonaPatch, SqlitePersonaDao};
pub use preview_dao::{PreviewDao, SqlitePreviewDao}; pub use preview_dao::{PreviewDao, SqlitePreviewDao};
pub use search_dao::{SearchHistoryDao, SqliteSearchHistoryDao}; pub use search_dao::{SearchHistoryDao, SqliteSearchHistoryDao};

View File

@@ -1,6 +1,6 @@
use crate::database::schema::{ use crate::database::schema::{
entities, entity_facts, entity_photo_links, favorites, image_exif, libraries, photo_insights, entities, entity_facts, entity_photo_links, favorites, image_exif, libraries, personas,
users, video_preview_clips, photo_insights, users, video_preview_clips,
}; };
use serde::Serialize; use serde::Serialize;
@@ -238,6 +238,11 @@ pub struct InsertEntityFact {
pub confidence: f32, pub confidence: f32,
pub status: String, pub status: String,
pub created_at: i64, pub created_at: i64,
/// Which persona authored this fact. Shared entities, persona-tagged
/// facts: each persona accumulates its own voice over the same
/// real-world referents. Defaults to `'default'` for legacy rows
/// (see migration 2026-05-09-000000).
pub persona_id: String,
} }
#[derive(Serialize, Queryable, Clone, Debug)] #[derive(Serialize, Queryable, Clone, Debug)]
@@ -252,6 +257,7 @@ pub struct EntityFact {
pub confidence: f32, pub confidence: f32,
pub status: String, pub status: String,
pub created_at: i64, pub created_at: i64,
pub persona_id: String,
} }
#[derive(Insertable)] #[derive(Insertable)]
@@ -274,6 +280,34 @@ pub struct EntityPhotoLink {
pub role: String, pub role: String,
} }
// --- Personas ---

/// Insertable row for the `personas` table (borrowed strings; see the
/// owned, queryable `Persona` for the read-side shape).
#[derive(Insertable)]
#[diesel(table_name = personas)]
pub struct InsertPersona<'a> {
    /// FK to users.id (ON DELETE CASCADE per the migration).
    pub user_id: i32,
    /// Stable string id, unique per user ('default', 'journal', ...).
    pub persona_id: &'a str,
    pub name: &'a str,
    pub system_prompt: &'a str,
    pub is_built_in: bool,
    /// When true the persona may opt into the full cross-persona fact
    /// pool (see PersonaFilter) — per the migration's comments.
    pub include_all_memories: bool,
    /// Milliseconds since epoch (matches the migration's strftime * 1000).
    pub created_at: i64,
    pub updated_at: i64,
}
/// Queryable persona row. `Queryable` maps columns positionally, so the
/// field order here must match the `personas` table declaration.
#[derive(Serialize, Queryable, Clone, Debug)]
pub struct Persona {
    /// Auto-increment primary key.
    pub id: i32,
    pub user_id: i32,
    /// Stable string id; (user_id, persona_id) is UNIQUE in the table.
    pub persona_id: String,
    pub name: String,
    pub system_prompt: String,
    pub is_built_in: bool,
    pub include_all_memories: bool,
    /// Milliseconds since epoch.
    pub created_at: i64,
    pub updated_at: i64,
}
#[derive(Insertable)] #[derive(Insertable)]
#[diesel(table_name = video_preview_clips)] #[diesel(table_name = video_preview_clips)]
pub struct InsertVideoPreviewClip { pub struct InsertVideoPreviewClip {

384
src/database/persona_dao.rs Normal file
View File

@@ -0,0 +1,384 @@
#![allow(dead_code)]
use diesel::prelude::*;
use diesel::sqlite::SqliteConnection;
use std::ops::DerefMut;
use std::sync::{Arc, Mutex};
use crate::database::models::{InsertPersona, Persona};
use crate::database::schema;
use crate::database::{DbError, DbErrorKind, connect};
use crate::otel::trace_db_call;
/// Patch shape for update_persona. None = leave field alone. Built-ins are
/// allowed to flip `include_all_memories` but should reject name/prompt
/// edits at the handler layer (built-in copy lives in the migration).
pub struct PersonaPatch {
    /// New display name, or `None` to keep the current one.
    pub name: Option<String>,
    /// New system prompt, or `None` to keep the current one.
    pub system_prompt: Option<String>,
    /// Toggle for the hive-mind read flag, or `None` to leave it as-is.
    pub include_all_memories: Option<bool>,
}
/// One row of a bulk migration upload. Fields named to match the JSON
/// shape the mobile client uploads (`POST /personas/migrate`).
pub struct ImportPersona {
    /// Client-side stable id (e.g. 'default' or a custom id).
    pub persona_id: String,
    pub name: String,
    pub system_prompt: String,
    /// Whether the client considered this one of its built-ins.
    pub is_built_in: bool,
    /// Client-side creation timestamp — presumably carried into the
    /// inserted row's created_at; confirm in bulk_import.
    pub created_at: i64,
    // NOTE(review): no include_all_memories here, so imported personas
    // presumably fall back to the column DEFAULT (false) — confirm that's
    // intended for migrated custom personas.
}
/// CRUD + bulk-import surface for per-user persona records. Every method
/// takes an OpenTelemetry context (for `trace_db_call` spans) and returns
/// `DbError` on failure. `persona_id` is the stable string id
/// ('default', 'journal', ...), not the integer primary key.
pub trait PersonaDao: Sync + Send {
    /// All personas belonging to `user_id`, oldest first.
    fn list_personas(
        &mut self,
        cx: &opentelemetry::Context,
        user_id: i32,
    ) -> Result<Vec<Persona>, DbError>;

    /// Single persona by (user, string id); `Ok(None)` when absent.
    fn get_persona(
        &mut self,
        cx: &opentelemetry::Context,
        user_id: i32,
        persona_id: &str,
    ) -> Result<Option<Persona>, DbError>;

    /// Insert a new persona and return the stored row.
    fn create_persona(
        &mut self,
        cx: &opentelemetry::Context,
        user_id: i32,
        persona_id: &str,
        name: &str,
        system_prompt: &str,
        is_built_in: bool,
        include_all_memories: bool,
    ) -> Result<Persona, DbError>;

    /// Apply `patch` (None fields untouched) and return the row as it
    /// stands afterwards; `Ok(None)` when the persona doesn't exist.
    fn update_persona(
        &mut self,
        cx: &opentelemetry::Context,
        user_id: i32,
        persona_id: &str,
        patch: PersonaPatch,
    ) -> Result<Option<Persona>, DbError>;

    /// Delete the persona; returns whether a row was actually removed.
    fn delete_persona(
        &mut self,
        cx: &opentelemetry::Context,
        user_id: i32,
        persona_id: &str,
    ) -> Result<bool, DbError>;

    /// Idempotent bulk import. INSERT OR IGNORE on (user_id, persona_id)
    /// — re-uploading the same set is a no-op. Returns the number of rows
    /// actually inserted (skipped duplicates don't count).
    fn bulk_import(
        &mut self,
        cx: &opentelemetry::Context,
        user_id: i32,
        personas: &[ImportPersona],
    ) -> Result<usize, DbError>;
}
/// SQLite-backed implementation of `PersonaDao`, following the same
/// mutex-guarded shared-connection pattern as the other DAOs here.
pub struct SqlitePersonaDao {
    // Every method locks this for the duration of its query.
    connection: Arc<Mutex<SqliteConnection>>,
}
// `Default` delegates to `new()`: a DAO owning its own fresh connection.
impl Default for SqlitePersonaDao {
    fn default() -> Self {
        Self::new()
    }
}
impl SqlitePersonaDao {
    /// Open a fresh SQLite connection (via `database::connect`) owned by
    /// this DAO alone.
    pub fn new() -> Self {
        Self {
            connection: Arc::new(Mutex::new(connect())),
        }
    }

    /// Share an existing connection so multiple DAOs serialize on the
    /// same mutex.
    pub fn from_connection(conn: Arc<Mutex<SqliteConnection>>) -> Self {
        Self { connection: conn }
    }
}
impl PersonaDao for SqlitePersonaDao {
fn list_personas(
&mut self,
cx: &opentelemetry::Context,
uid: i32,
) -> Result<Vec<Persona>, DbError> {
trace_db_call(cx, "query", "list_personas", |_span| {
use schema::personas::dsl::*;
let mut conn = self.connection.lock().expect("PersonaDao lock");
personas
.filter(user_id.eq(uid))
.order(created_at.asc())
.load::<Persona>(conn.deref_mut())
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
fn get_persona(
&mut self,
cx: &opentelemetry::Context,
uid: i32,
pid: &str,
) -> Result<Option<Persona>, DbError> {
trace_db_call(cx, "query", "get_persona", |_span| {
use schema::personas::dsl::*;
let mut conn = self.connection.lock().expect("PersonaDao lock");
personas
.filter(user_id.eq(uid))
.filter(persona_id.eq(pid))
.first::<Persona>(conn.deref_mut())
.optional()
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
/// Insert a new persona row for `uid` and return the persisted record.
/// Both timestamps are stamped with the current time in milliseconds
/// (matching the migration's strftime * 1000 seeds).
fn create_persona(
    &mut self,
    cx: &opentelemetry::Context,
    uid: i32,
    pid: &str,
    nm: &str,
    prompt: &str,
    builtin: bool,
    include_all: bool,
) -> Result<Persona, DbError> {
    trace_db_call(cx, "insert", "create_persona", |_span| {
        use schema::personas::dsl::*;
        let mut guard = self.connection.lock().expect("PersonaDao lock");
        let stamp = chrono::Utc::now().timestamp_millis();
        let row = InsertPersona {
            user_id: uid,
            persona_id: pid,
            name: nm,
            system_prompt: prompt,
            is_built_in: builtin,
            include_all_memories: include_all,
            created_at: stamp,
            updated_at: stamp,
        };
        diesel::insert_into(personas)
            .values(row)
            .execute(guard.deref_mut())
            .map_err(|e| anyhow::anyhow!("Insert error: {}", e))?;
        // Read the row back by its unique (user_id, persona_id) key.
        personas
            .filter(user_id.eq(uid))
            .filter(persona_id.eq(pid))
            .first::<Persona>(guard.deref_mut())
            .map_err(|e| anyhow::anyhow!("Query error: {}", e))
    })
    .map_err(|_| DbError::new(DbErrorKind::InsertError))
}
/// Apply a partial update (`PersonaPatch`) to one persona and return
/// the row as stored afterwards; `Ok(None)` when no such persona
/// exists for this user.
///
/// A patch with every field `None` performs no UPDATE at all and
/// simply reads the current row back (so `updated_at` is untouched in
/// that case).
fn update_persona(
    &mut self,
    cx: &opentelemetry::Context,
    uid: i32,
    pid: &str,
    patch: PersonaPatch,
) -> Result<Option<Persona>, DbError> {
    trace_db_call(cx, "update", "update_persona", |_span| {
        use schema::personas::dsl::*;
        let mut conn = self.connection.lock().expect("PersonaDao lock");
        let now = chrono::Utc::now().timestamp_millis();
        // Apply each field as its own UPDATE — keeps types simple
        // (Diesel's tuple updates don't compose cleanly across optional
        // columns) and matches the pattern already in use for entities
        // (knowledge_dao.rs::update_entity).
        if let Some(ref new_name) = patch.name {
            diesel::update(personas.filter(user_id.eq(uid)).filter(persona_id.eq(pid)))
                .set((name.eq(new_name), updated_at.eq(now)))
                .execute(conn.deref_mut())
                .map_err(|e| anyhow::anyhow!("Update name error: {}", e))?;
        }
        if let Some(ref new_prompt) = patch.system_prompt {
            diesel::update(personas.filter(user_id.eq(uid)).filter(persona_id.eq(pid)))
                .set((system_prompt.eq(new_prompt), updated_at.eq(now)))
                .execute(conn.deref_mut())
                .map_err(|e| anyhow::anyhow!("Update prompt error: {}", e))?;
        }
        if let Some(new_include_all) = patch.include_all_memories {
            diesel::update(personas.filter(user_id.eq(uid)).filter(persona_id.eq(pid)))
                .set((include_all_memories.eq(new_include_all), updated_at.eq(now)))
                .execute(conn.deref_mut())
                .map_err(|e| anyhow::anyhow!("Update include_all error: {}", e))?;
        }
        // Final read-back doubles as the existence check: a pid that
        // matched nothing above yields Ok(None) here.
        personas
            .filter(user_id.eq(uid))
            .filter(persona_id.eq(pid))
            .first::<Persona>(conn.deref_mut())
            .optional()
            .map_err(|e| anyhow::anyhow!("Query error: {}", e))
    })
    .map_err(|_| DbError::new(DbErrorKind::UpdateError))
}
fn delete_persona(
&mut self,
cx: &opentelemetry::Context,
uid: i32,
pid: &str,
) -> Result<bool, DbError> {
trace_db_call(cx, "delete", "delete_persona", |_span| {
use schema::personas::dsl::*;
let mut conn = self.connection.lock().expect("PersonaDao lock");
let n = diesel::delete(personas.filter(user_id.eq(uid)).filter(persona_id.eq(pid)))
.execute(conn.deref_mut())
.map_err(|e| anyhow::anyhow!("Delete error: {}", e))?;
Ok(n > 0)
})
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
/// Insert a batch of device-exported personas for `uid`, returning how
/// many rows were actually written (ignored duplicates don't count).
///
/// Raw SQL is used to get INSERT OR IGNORE semantics against the
/// (user_id, persona_id) UNIQUE. Note the column/bind pairing:
/// `include_all_memories` is hard-coded to 0 for imports, while
/// `is_built_in` comes from the payload; `created_at` is preserved
/// from the export and `updated_at` is stamped now. The bind order
/// must match the VALUES placeholders exactly.
fn bulk_import(
    &mut self,
    cx: &opentelemetry::Context,
    uid: i32,
    rows: &[ImportPersona],
) -> Result<usize, DbError> {
    trace_db_call(cx, "insert", "bulk_import_personas", |_span| {
        let mut conn = self.connection.lock().expect("PersonaDao lock");
        let now = chrono::Utc::now().timestamp_millis();
        let mut inserted = 0usize;
        // INSERT OR IGNORE on the (user_id, persona_id) UNIQUE so
        // re-running migrate is a no-op for personas already on the
        // server.
        for p in rows {
            let n = diesel::sql_query(
                "INSERT OR IGNORE INTO personas (user_id, persona_id, name, system_prompt, \
                 is_built_in, include_all_memories, created_at, updated_at) \
                 VALUES (?, ?, ?, ?, ?, 0, ?, ?)",
            )
            .bind::<diesel::sql_types::Integer, _>(uid)
            .bind::<diesel::sql_types::Text, _>(&p.persona_id)
            .bind::<diesel::sql_types::Text, _>(&p.name)
            .bind::<diesel::sql_types::Text, _>(&p.system_prompt)
            .bind::<diesel::sql_types::Bool, _>(p.is_built_in)
            .bind::<diesel::sql_types::BigInt, _>(p.created_at)
            .bind::<diesel::sql_types::BigInt, _>(now)
            .execute(conn.deref_mut())
            .map_err(|e| anyhow::anyhow!("Insert error: {}", e))?;
            // `execute` returns rows affected: 1 for a real insert,
            // 0 when OR IGNORE swallowed a duplicate.
            inserted += n;
        }
        Ok(inserted)
    })
    .map_err(|_| DbError::new(DbErrorKind::InsertError))
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::database::test::in_memory_db_connection;

    /// Spin up an in-memory DB, insert one user row, and return a DAO
    /// bound to that connection plus the new user's generated id.
    fn dao_with_user(username: &str) -> (SqlitePersonaDao, i32) {
        use crate::database::schema::users::dsl as u;
        let conn = Arc::new(Mutex::new(in_memory_db_connection()));
        diesel::insert_into(u::users)
            .values((u::username.eq(username), u::password.eq("x")))
            .execute(conn.lock().unwrap().deref_mut())
            .unwrap();
        let user_id: i32 = u::users
            .filter(u::username.eq(username))
            .select(u::id)
            .first(conn.lock().unwrap().deref_mut())
            .unwrap();
        (SqlitePersonaDao::from_connection(conn), user_id)
    }

    /// create_persona returns the stored row and list_personas sees it.
    #[test]
    fn create_and_list_round_trip() {
        let cx = opentelemetry::Context::new();
        let (mut dao, uid) = dao_with_user("alice");
        // The migration seeds 3 built-ins for any existing user; alice
        // was created post-migration so she starts empty.
        let p = dao
            .create_persona(&cx, uid, "custom-1", "Custom A", "prompt A", false, false)
            .unwrap();
        assert_eq!(p.persona_id, "custom-1");
        assert_eq!(p.user_id, uid);
        assert!(!p.is_built_in);
        let list = dao.list_personas(&cx, uid).unwrap();
        assert_eq!(list.len(), 1);
        assert_eq!(list[0].persona_id, "custom-1");
    }

    /// The (user_id, persona_id) UNIQUE rejects a second insert with
    /// the same string id for the same user.
    #[test]
    fn unique_constraint_blocks_duplicate_persona_id() {
        let cx = opentelemetry::Context::new();
        let (mut dao, uid) = dao_with_user("bob");
        dao.create_persona(&cx, uid, "x", "X", "p", false, false)
            .unwrap();
        let err = dao.create_persona(&cx, uid, "x", "X2", "p2", false, false);
        assert!(
            err.is_err(),
            "second insert with same persona_id should fail"
        );
    }

    /// bulk_import inserts new rows once; a re-run reports 0 inserted
    /// thanks to INSERT OR IGNORE.
    #[test]
    fn bulk_import_is_idempotent() {
        let cx = opentelemetry::Context::new();
        let (mut dao, uid) = dao_with_user("carol");
        let rows = vec![
            ImportPersona {
                persona_id: "custom-a".into(),
                name: "A".into(),
                system_prompt: "p1".into(),
                is_built_in: false,
                created_at: 1,
            },
            ImportPersona {
                persona_id: "custom-b".into(),
                name: "B".into(),
                system_prompt: "p2".into(),
                is_built_in: false,
                created_at: 2,
            },
        ];
        let first = dao.bulk_import(&cx, uid, &rows).unwrap();
        assert_eq!(first, 2);
        let second = dao.bulk_import(&cx, uid, &rows).unwrap();
        assert_eq!(second, 0, "re-import should insert nothing");
        assert_eq!(dao.list_personas(&cx, uid).unwrap().len(), 2);
    }

    /// A patch carrying only include_all_memories flips just that flag.
    #[test]
    fn update_toggles_include_all_memories() {
        let cx = opentelemetry::Context::new();
        let (mut dao, uid) = dao_with_user("dan");
        dao.create_persona(&cx, uid, "j", "Journal", "p", true, false)
            .unwrap();
        let updated = dao
            .update_persona(
                &cx,
                uid,
                "j",
                PersonaPatch {
                    name: None,
                    system_prompt: None,
                    include_all_memories: Some(true),
                },
            )
            .unwrap()
            .unwrap();
        assert!(updated.include_all_memories);
    }
}

View File

@@ -57,6 +57,7 @@ diesel::table! {
confidence -> Float, confidence -> Float,
status -> Text, status -> Text,
created_at -> BigInt, created_at -> BigInt,
persona_id -> Text,
} }
} }
@@ -159,6 +160,20 @@ diesel::table! {
} }
} }
diesel::table! {
personas (id) {
id -> Integer,
user_id -> Integer,
persona_id -> Text,
name -> Text,
system_prompt -> Text,
is_built_in -> Bool,
include_all_memories -> Bool,
created_at -> BigInt,
updated_at -> BigInt,
}
}
diesel::table! { diesel::table! {
persons (id) { persons (id) {
id -> Integer, id -> Integer,
@@ -249,6 +264,7 @@ diesel::joinable!(entity_photo_links -> libraries (library_id));
diesel::joinable!(face_detections -> libraries (library_id)); diesel::joinable!(face_detections -> libraries (library_id));
diesel::joinable!(face_detections -> persons (person_id)); diesel::joinable!(face_detections -> persons (person_id));
diesel::joinable!(image_exif -> libraries (library_id)); diesel::joinable!(image_exif -> libraries (library_id));
diesel::joinable!(personas -> users (user_id));
diesel::joinable!(persons -> entities (entity_id)); diesel::joinable!(persons -> entities (entity_id));
diesel::joinable!(photo_insights -> libraries (library_id)); diesel::joinable!(photo_insights -> libraries (library_id));
diesel::joinable!(tagged_photo -> tags (tag_id)); diesel::joinable!(tagged_photo -> tags (tag_id));
@@ -265,6 +281,7 @@ diesel::allow_tables_to_appear_in_same_query!(
image_exif, image_exif,
libraries, libraries,
location_history, location_history,
personas,
persons, persons,
photo_insights, photo_insights,
search_history, search_history,

View File

@@ -1,5 +1,5 @@
use actix_web::dev::{ServiceFactory, ServiceRequest}; use actix_web::dev::{ServiceFactory, ServiceRequest};
use actix_web::{App, HttpResponse, Responder, web}; use actix_web::{App, HttpRequest, HttpResponse, Responder, web};
use chrono::Utc; use chrono::Utc;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::sync::Mutex; use std::sync::Mutex;
@@ -7,8 +7,38 @@ use std::sync::Mutex;
use crate::data::Claims; use crate::data::Claims;
use crate::database::models::{Entity, EntityFact, EntityPhotoLink}; use crate::database::models::{Entity, EntityFact, EntityPhotoLink};
use crate::database::{ use crate::database::{
EntityFilter, EntityPatch, FactFilter, FactPatch, KnowledgeDao, RecentActivity, EntityFilter, EntityPatch, FactFilter, FactPatch, KnowledgeDao, PersonaFilter, RecentActivity,
}; };
use crate::personas::PersonaDaoData;
/// Turn the request's `X-Persona-Id` header into a `PersonaFilter`.
///
/// No header means the `'default'` persona. A persona flagged
/// `include_all_memories` widens reads to `PersonaFilter::All` (the
/// full hive-mind pool). Any lookup problem — including a `sub` claim
/// that isn't an integer — falls back to `Single`, which preserves the
/// historical baseline view and is therefore the safe default.
fn resolve_persona_filter(
    req: &HttpRequest,
    claims: &Claims,
    persona_dao: &PersonaDaoData,
) -> PersonaFilter {
    let pid = match req.headers().get("X-Persona-Id").and_then(|v| v.to_str().ok()) {
        Some(s) => s.to_string(),
        None => "default".to_string(),
    };
    let uid = match claims.sub.parse::<i32>() {
        Ok(u) => u,
        Err(_) => return PersonaFilter::Single(pid),
    };
    let cx = opentelemetry::Context::current();
    let mut dao = persona_dao.lock().expect("Unable to lock PersonaDao");
    if let Ok(Some(p)) = dao.get_persona(&cx, uid, &pid) {
        if p.include_all_memories {
            return PersonaFilter::All;
        }
    }
    PersonaFilter::Single(pid)
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Request / Response types // Request / Response types
@@ -246,10 +276,13 @@ async fn list_entities<D: KnowledgeDao + 'static>(
} }
async fn get_entity<D: KnowledgeDao + 'static>( async fn get_entity<D: KnowledgeDao + 'static>(
_claims: Claims, req: HttpRequest,
claims: Claims,
id: web::Path<i32>, id: web::Path<i32>,
dao: web::Data<Mutex<D>>, dao: web::Data<Mutex<D>>,
persona_dao: PersonaDaoData,
) -> impl Responder { ) -> impl Responder {
let persona = resolve_persona_filter(&req, &claims, &persona_dao);
let cx = opentelemetry::Context::current(); let cx = opentelemetry::Context::current();
let entity_id = id.into_inner(); let entity_id = id.into_inner();
let mut dao = dao.lock().expect("Unable to lock KnowledgeDao"); let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
@@ -266,8 +299,8 @@ async fn get_entity<D: KnowledgeDao + 'static>(
} }
}; };
// Fetch all facts (all statuses for audit) // Fetch all facts (all statuses for audit), scoped to the active persona.
let raw_facts: Vec<EntityFact> = match dao.get_facts_for_entity(&cx, entity_id) { let raw_facts: Vec<EntityFact> = match dao.get_facts_for_entity(&cx, entity_id, &persona) {
Ok(f) => f, Ok(f) => f,
Err(e) => { Err(e) => {
log::error!("get_facts_for_entity error: {:?}", e); log::error!("get_facts_for_entity error: {:?}", e);
@@ -426,9 +459,11 @@ async fn merge_entities<D: KnowledgeDao + 'static>(
} }
async fn list_facts<D: KnowledgeDao + 'static>( async fn list_facts<D: KnowledgeDao + 'static>(
_claims: Claims, req: HttpRequest,
claims: Claims,
query: web::Query<FactListQuery>, query: web::Query<FactListQuery>,
dao: web::Data<Mutex<D>>, dao: web::Data<Mutex<D>>,
persona_dao: PersonaDaoData,
) -> impl Responder { ) -> impl Responder {
let limit = query.limit.unwrap_or(50).min(200); let limit = query.limit.unwrap_or(50).min(200);
let offset = query.offset.unwrap_or(0); let offset = query.offset.unwrap_or(0);
@@ -438,11 +473,13 @@ async fn list_facts<D: KnowledgeDao + 'static>(
Some("all") => None, Some("all") => None,
Some(s) => Some(s.to_string()), Some(s) => Some(s.to_string()),
}; };
let persona = resolve_persona_filter(&req, &claims, &persona_dao);
let filter = FactFilter { let filter = FactFilter {
entity_id: query.entity_id, entity_id: query.entity_id,
status: status_filter, status: status_filter,
predicate: query.predicate.clone(), predicate: query.predicate.clone(),
persona,
limit, limit,
offset, offset,
}; };
@@ -539,18 +576,21 @@ async fn delete_fact<D: KnowledgeDao + 'static>(
} }
async fn get_recent<D: KnowledgeDao + 'static>( async fn get_recent<D: KnowledgeDao + 'static>(
_claims: Claims, req: HttpRequest,
claims: Claims,
query: web::Query<RecentQuery>, query: web::Query<RecentQuery>,
dao: web::Data<Mutex<D>>, dao: web::Data<Mutex<D>>,
persona_dao: PersonaDaoData,
) -> impl Responder { ) -> impl Responder {
let since = query let since = query
.since .since
.unwrap_or_else(|| Utc::now().timestamp() - 86400); .unwrap_or_else(|| Utc::now().timestamp() - 86400);
let limit = query.limit.unwrap_or(20).min(100); let limit = query.limit.unwrap_or(20).min(100);
let persona = resolve_persona_filter(&req, &claims, &persona_dao);
let cx = opentelemetry::Context::current(); let cx = opentelemetry::Context::current();
let mut dao = dao.lock().expect("Unable to lock KnowledgeDao"); let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
match dao.get_recent_activity(&cx, since, limit) { match dao.get_recent_activity(&cx, since, limit, &persona) {
Ok(RecentActivity { entities, facts }) => { Ok(RecentActivity { entities, facts }) => {
let entity_summaries: Vec<EntitySummary> = let entity_summaries: Vec<EntitySummary> =
entities.into_iter().map(EntitySummary::from).collect(); entities.into_iter().map(EntitySummary::from).collect();

View File

@@ -26,6 +26,7 @@ pub mod memories;
pub mod otel; pub mod otel;
pub mod parsers; pub mod parsers;
pub mod perceptual_hash; pub mod perceptual_hash;
pub mod personas;
pub mod service; pub mod service;
pub mod state; pub mod state;
pub mod tags; pub mod tags;

View File

@@ -84,6 +84,7 @@ mod video;
mod knowledge; mod knowledge;
mod memories; mod memories;
mod otel; mod otel;
mod personas;
mod service; mod service;
#[cfg(test)] #[cfg(test)]
mod testhelpers; mod testhelpers;
@@ -1962,6 +1963,7 @@ fn main() -> std::io::Result<()> {
.service(libraries::list_libraries) .service(libraries::list_libraries)
.add_feature(add_tag_services::<_, SqliteTagDao>) .add_feature(add_tag_services::<_, SqliteTagDao>)
.add_feature(knowledge::add_knowledge_services::<_, SqliteKnowledgeDao>) .add_feature(knowledge::add_knowledge_services::<_, SqliteKnowledgeDao>)
.add_feature(personas::add_persona_services)
.add_feature(faces::add_face_services::<_, faces::SqliteFaceDao>) .add_feature(faces::add_face_services::<_, faces::SqliteFaceDao>)
.add_feature(duplicates::add_duplicate_services) .add_feature(duplicates::add_duplicate_services)
.app_data(app_data.clone()) .app_data(app_data.clone())
@@ -1985,6 +1987,9 @@ fn main() -> std::io::Result<()> {
.app_data::<Data<Mutex<SqliteKnowledgeDao>>>(Data::new(Mutex::new( .app_data::<Data<Mutex<SqliteKnowledgeDao>>>(Data::new(Mutex::new(
SqliteKnowledgeDao::new(), SqliteKnowledgeDao::new(),
))) )))
.app_data::<Data<Mutex<Box<dyn database::PersonaDao>>>>(Data::new(Mutex::new(
Box::new(database::SqlitePersonaDao::new()),
)))
.app_data::<Data<Mutex<faces::SqliteFaceDao>>>(Data::new(Mutex::new(face_dao))) .app_data::<Data<Mutex<faces::SqliteFaceDao>>>(Data::new(Mutex::new(face_dao)))
.app_data::<Data<crate::ai::face_client::FaceClient>>(Data::new( .app_data::<Data<crate::ai::face_client::FaceClient>>(Data::new(
app_data.face_client.clone(), app_data.face_client.clone(),

309
src/personas.rs Normal file
View File

@@ -0,0 +1,309 @@
//! HTTP handlers for the server-side persona store.
//!
//! Personas previously lived only in mobile AsyncStorage; this module
//! elevates them so they can sync across devices and so the
//! `entity_facts.persona_id` column has something to reference.
//!
//! Built-in personas (default / journal / factual) are seeded by the
//! migration. Customs are created here and may be migrated up from a
//! device's local store via `POST /personas/migrate`.
use actix_web::dev::{ServiceFactory, ServiceRequest};
use actix_web::{App, HttpResponse, Responder, web};
use serde::{Deserialize, Serialize};
use std::sync::Mutex;
use crate::data::Claims;
use crate::database::models::Persona;
use crate::database::{ImportPersona, PersonaDao, PersonaPatch};
// ---------------------------------------------------------------------------
// Wire shapes — camelCase out the door, snake_case from the DB.
// ---------------------------------------------------------------------------
/// Wire shape for a persona as returned to clients.
///
/// Container-level `rename_all = "camelCase"` replaces the per-field
/// renames: the serialized names (`systemPrompt`, `isBuiltIn`,
/// `includeAllMemories`, `createdAt`, `updatedAt`) are unchanged, but
/// a future field cannot silently miss its rename.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct PersonaView {
    // Client-facing id is the string `persona_id`; the DB surrogate
    // integer id stays server-internal (see `From<Persona>` below).
    pub id: String,
    pub name: String,
    pub system_prompt: String,
    pub is_built_in: bool,
    pub include_all_memories: bool,
    pub created_at: i64,
    pub updated_at: i64,
}
impl From<Persona> for PersonaView {
    /// DB row → wire view: the integer surrogate `id` column is
    /// dropped and the string `persona_id` becomes the client-facing
    /// `id`; `user_id` is likewise server-internal and omitted.
    fn from(p: Persona) -> Self {
        Self {
            id: p.persona_id,
            name: p.name,
            system_prompt: p.system_prompt,
            is_built_in: p.is_built_in,
            include_all_memories: p.include_all_memories,
            created_at: p.created_at,
            updated_at: p.updated_at,
        }
    }
}
/// Body of `POST /personas`.
///
/// Container-level `rename_all = "camelCase"` replaces the per-field
/// renames; accepted JSON keys (`name`, `systemPrompt`, `personaId`)
/// are unchanged.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CreatePersonaRequest {
    pub name: String,
    pub system_prompt: String,
    /// Optional caller-provided id. When present (e.g. a client that
    /// already minted `"custom-1735124234"` locally and is upgrading from
    /// the AsyncStorage-only era), the server uses it; collisions return
    /// 409. When absent the server mints `"custom-<ms>"`.
    #[serde(default)]
    pub persona_id: Option<String>,
}
/// Body of `PUT /personas/{persona_id}` — a partial patch; every field
/// is optional and absent fields leave the stored value untouched.
///
/// Container-level `rename_all = "camelCase"` replaces the per-field
/// renames; accepted JSON keys are unchanged.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct UpdatePersonaRequest {
    #[serde(default)]
    pub name: Option<String>,
    #[serde(default)]
    pub system_prompt: Option<String>,
    #[serde(default)]
    pub include_all_memories: Option<bool>,
}
/// Body of `POST /personas/migrate`: the device's full local persona
/// list, uploaded in one shot.
#[derive(Deserialize)]
pub struct MigrateRequest {
    pub personas: Vec<MigratePersona>,
}
/// One persona as exported by the mobile client's local store.
///
/// Container-level `rename_all = "camelCase"` replaces the per-field
/// renames; accepted JSON keys (`id`, `name`, `systemPrompt`,
/// `isBuiltIn`, `createdAt`) are unchanged.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct MigratePersona {
    pub id: String,
    pub name: String,
    pub system_prompt: String,
    #[serde(default)]
    pub is_built_in: bool,
    /// Millisecond timestamp from the device; the server substitutes
    /// "now" when absent.
    #[serde(default)]
    pub created_at: Option<i64>,
}
/// Response of `POST /personas/migrate`: how many personas were newly
/// written (duplicates ignored by the DAO don't count).
#[derive(Serialize)]
pub struct MigrateResponse {
    pub inserted: usize,
}
// ---------------------------------------------------------------------------
// Service registration
// ---------------------------------------------------------------------------
pub type PersonaDaoData = web::Data<Mutex<Box<dyn PersonaDao>>>;
/// Mount the `/personas` routes onto the app.
///
/// Routes: GET/POST `/personas`, POST `/personas/migrate`, and
/// PUT/DELETE `/personas/{persona_id}`.
pub fn add_persona_services<T>(app: App<T>) -> App<T>
where
    T: ServiceFactory<ServiceRequest, Config = (), Error = actix_web::Error, InitError = ()>,
{
    app.service(
        web::scope("/personas")
            // Registered before the `{persona_id}` resource so the
            // literal `/migrate` path is matched first rather than
            // being captured as a persona id.
            .service(web::resource("/migrate").route(web::post().to(migrate_personas)))
            .service(
                web::resource("")
                    .route(web::get().to(list_personas))
                    .route(web::post().to(create_persona)),
            )
            .service(
                web::resource("/{persona_id}")
                    .route(web::put().to(update_persona))
                    .route(web::delete().to(delete_persona)),
            ),
    )
}
// ---------------------------------------------------------------------------
// Handlers
// ---------------------------------------------------------------------------
/// Parse the JWT `sub` claim into the numeric user id; `None` when the
/// claim is not a well-formed i32.
fn user_id_from_claims(claims: &Claims) -> Option<i32> {
    claims.sub.parse().ok()
}
async fn list_personas(claims: Claims, dao: PersonaDaoData) -> impl Responder {
let Some(uid) = user_id_from_claims(&claims) else {
return HttpResponse::Unauthorized().json(serde_json::json!({"error": "Invalid claims"}));
};
let cx = opentelemetry::Context::current();
let mut dao = dao.lock().expect("Unable to lock PersonaDao");
match dao.list_personas(&cx, uid) {
Ok(rows) => {
let views: Vec<PersonaView> = rows.into_iter().map(PersonaView::from).collect();
HttpResponse::Ok().json(views)
}
Err(e) => {
log::error!("list_personas error: {:?}", e);
HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
}
}
}
/// `POST /personas` — create a custom persona for the authenticated
/// user.
///
/// 400 on blank name/systemPrompt; 409 when the id collides with a
/// built-in or an existing persona; 201 with the stored record on
/// success. A caller-minted id is honoured (AsyncStorage upgrade
/// path); otherwise the server mints `custom-<ms>`.
async fn create_persona(
    claims: Claims,
    body: web::Json<CreatePersonaRequest>,
    dao: PersonaDaoData,
) -> impl Responder {
    let Some(uid) = user_id_from_claims(&claims) else {
        return HttpResponse::Unauthorized().json(serde_json::json!({"error": "Invalid claims"}));
    };
    if body.name.trim().is_empty() {
        return HttpResponse::BadRequest().json(serde_json::json!({"error": "name is required"}));
    }
    if body.system_prompt.trim().is_empty() {
        return HttpResponse::BadRequest()
            .json(serde_json::json!({"error": "systemPrompt is required"}));
    }
    let cx = opentelemetry::Context::current();
    let mut dao = dao.lock().expect("Unable to lock PersonaDao");
    // NOTE(review): a caller-supplied id is used as-is (trimmed only
    // for the emptiness test), so `" x"` and `"x"` would be distinct
    // ids — confirm clients never send padded ids.
    let pid = match body.persona_id.as_deref() {
        Some(s) if !s.trim().is_empty() => s.to_string(),
        _ => format!("custom-{}", chrono::Utc::now().timestamp_millis()),
    };
    if matches!(pid.as_str(), "default" | "journal" | "factual") {
        return HttpResponse::Conflict()
            .json(serde_json::json!({"error": "persona id collides with a built-in"}));
    }
    // Pre-check existence so we can return 409 cleanly. The DB UNIQUE
    // would also catch it, but parsing Diesel's "constraint violation"
    // out of a generic DbError is uglier than a quick lookup. The DAO
    // lock is held from here through the insert, so the check cannot
    // race another request on this handle.
    if let Ok(Some(_)) = dao.get_persona(&cx, uid, &pid) {
        return HttpResponse::Conflict()
            .json(serde_json::json!({"error": "persona already exists"}));
    }
    match dao.create_persona(
        &cx,
        uid,
        &pid,
        &body.name,
        &body.system_prompt,
        false,
        false,
    ) {
        Ok(p) => HttpResponse::Created().json(PersonaView::from(p)),
        Err(e) => {
            log::error!("create_persona error: {:?}", e);
            HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
        }
    }
}
/// `PUT /personas/{persona_id}` — apply a partial patch and return the
/// updated record; 404 when the persona doesn't exist for this user.
///
/// NOTE(review): unlike delete_persona below, built-in personas are
/// NOT excluded here, so their name/prompt/includeAllMemories can be
/// edited — confirm that is intentional. Blank strings are also
/// accepted (no trim-emptiness check as in create_persona).
async fn update_persona(
    claims: Claims,
    path: web::Path<String>,
    body: web::Json<UpdatePersonaRequest>,
    dao: PersonaDaoData,
) -> impl Responder {
    let Some(uid) = user_id_from_claims(&claims) else {
        return HttpResponse::Unauthorized().json(serde_json::json!({"error": "Invalid claims"}));
    };
    let pid = path.into_inner();
    let cx = opentelemetry::Context::current();
    let mut dao = dao.lock().expect("Unable to lock PersonaDao");
    let patch = PersonaPatch {
        name: body.name.clone(),
        system_prompt: body.system_prompt.clone(),
        include_all_memories: body.include_all_memories,
    };
    match dao.update_persona(&cx, uid, &pid, patch) {
        Ok(Some(p)) => HttpResponse::Ok().json(PersonaView::from(p)),
        Ok(None) => {
            HttpResponse::NotFound().json(serde_json::json!({"error": "Persona not found"}))
        }
        Err(e) => {
            log::error!("update_persona error: {:?}", e);
            HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
        }
    }
}
/// `DELETE /personas/{persona_id}` — remove a custom persona.
///
/// Built-ins are protected (409); an unknown id is 404; success is a
/// bodyless 204.
async fn delete_persona(
    claims: Claims,
    path: web::Path<String>,
    dao: PersonaDaoData,
) -> impl Responder {
    let Some(uid) = user_id_from_claims(&claims) else {
        return HttpResponse::Unauthorized().json(serde_json::json!({"error": "Invalid claims"}));
    };
    let pid = path.into_inner();
    let cx = opentelemetry::Context::current();
    let mut dao = dao.lock().expect("Unable to lock PersonaDao");
    // Look the persona up first: built-ins are refused, absent ids 404.
    match dao.get_persona(&cx, uid, &pid) {
        Ok(Some(p)) => {
            if p.is_built_in {
                return HttpResponse::Conflict()
                    .json(serde_json::json!({"error": "Cannot delete built-in persona"}));
            }
        }
        Ok(None) => {
            return HttpResponse::NotFound()
                .json(serde_json::json!({"error": "Persona not found"}));
        }
        Err(e) => {
            log::error!("delete_persona lookup error: {:?}", e);
            return HttpResponse::InternalServerError()
                .json(serde_json::json!({"error": "Database error"}));
        }
    }
    match dao.delete_persona(&cx, uid, &pid) {
        Ok(_) => HttpResponse::NoContent().finish(),
        Err(e) => {
            log::error!("delete_persona error: {:?}", e);
            HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
        }
    }
}
/// `POST /personas/migrate` — one-shot upload of a device's local
/// (AsyncStorage-era) personas. Idempotent: the DAO inserts with
/// INSERT OR IGNORE, so a re-run reports 0 inserted.
async fn migrate_personas(
    claims: Claims,
    body: web::Json<MigrateRequest>,
    dao: PersonaDaoData,
) -> impl Responder {
    let Some(uid) = user_id_from_claims(&claims) else {
        return HttpResponse::Unauthorized().json(serde_json::json!({"error": "Invalid claims"}));
    };
    let cx = opentelemetry::Context::current();
    let mut dao = dao.lock().expect("Unable to lock PersonaDao");
    // Filter out built-in ids — those are already seeded by the
    // migration and re-importing them would be a no-op anyway thanks to
    // INSERT OR IGNORE, but skipping early avoids the UNIQUE round-trip.
    let now = chrono::Utc::now().timestamp_millis();
    let rows: Vec<ImportPersona> = body
        .personas
        .iter()
        .filter(|p| !matches!(p.id.as_str(), "default" | "journal" | "factual"))
        .map(|p| ImportPersona {
            persona_id: p.id.clone(),
            name: p.name.clone(),
            system_prompt: p.system_prompt.clone(),
            // Everything surviving the filter above is a custom
            // persona. Never trust the client's `isBuiltIn` flag here:
            // passing a spoofed `true` through would make the row
            // undeletable (delete_persona refuses built-ins) even
            // though it was never seeded server-side.
            is_built_in: false,
            created_at: p.created_at.unwrap_or(now),
        })
        .collect();
    match dao.bulk_import(&cx, uid, &rows) {
        Ok(inserted) => HttpResponse::Ok().json(MigrateResponse { inserted }),
        Err(e) => {
            log::error!("migrate_personas error: {:?}", e);
            HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
        }
    }
}