Merge pull request 'personas: elevate to server with per-persona fact scoping' (#88) from feature/persona-knowledge-segmentation into master
Reviewed-on: #88
This commit was merged in pull request #88.
This commit is contained in:
@@ -48,6 +48,11 @@ pub struct GeneratePhotoInsightRequest {
|
||||
/// falls back to `DEFAULT_FEWSHOT_INSIGHT_IDS`.
|
||||
#[serde(default)]
|
||||
pub fewshot_insight_ids: Option<Vec<i32>>,
|
||||
/// Active persona id for this generation. New facts are tagged with
|
||||
/// it (`entity_facts.persona_id`); recall during the agentic loop is
|
||||
/// scoped to it. Defaults to `"default"` when absent.
|
||||
#[serde(default)]
|
||||
pub persona_id: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
@@ -376,6 +381,13 @@ pub async fn generate_agentic_insight_handler(
|
||||
.collect()
|
||||
};
|
||||
|
||||
let persona_id = request
|
||||
.persona_id
|
||||
.clone()
|
||||
.filter(|s| !s.trim().is_empty())
|
||||
.unwrap_or_else(|| "default".to_string());
|
||||
span.set_attribute(KeyValue::new("persona_id", persona_id.clone()));
|
||||
|
||||
let result = insight_generator
|
||||
.generate_agentic_insight_for_photo(
|
||||
&normalized_path,
|
||||
@@ -390,6 +402,7 @@ pub async fn generate_agentic_insight_handler(
|
||||
request.backend.clone(),
|
||||
fewshot_examples,
|
||||
fewshot_ids,
|
||||
persona_id,
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -645,6 +658,10 @@ pub struct ChatTurnHttpRequest {
|
||||
/// semantics. Also seeds the bootstrap path when no insight exists.
|
||||
#[serde(default)]
|
||||
pub system_prompt: Option<String>,
|
||||
/// Active persona id for this turn. New facts/recalls scope to it.
|
||||
/// Defaults to `"default"` when missing.
|
||||
#[serde(default)]
|
||||
pub persona_id: Option<String>,
|
||||
#[serde(default)]
|
||||
pub amend: bool,
|
||||
/// When true, force the bootstrap path even if an insight already
|
||||
@@ -707,6 +724,7 @@ pub async fn chat_turn_handler(
|
||||
min_p: request.min_p,
|
||||
max_iterations: request.max_iterations,
|
||||
system_prompt: request.system_prompt.clone(),
|
||||
persona_id: request.persona_id.clone(),
|
||||
amend: request.amend,
|
||||
regenerate: request.regenerate,
|
||||
};
|
||||
@@ -923,6 +941,7 @@ pub async fn chat_stream_handler(
|
||||
min_p: request.min_p,
|
||||
max_iterations: request.max_iterations,
|
||||
system_prompt: request.system_prompt.clone(),
|
||||
persona_id: request.persona_id.clone(),
|
||||
amend: request.amend,
|
||||
regenerate: request.regenerate,
|
||||
};
|
||||
|
||||
@@ -50,6 +50,10 @@ pub struct ChatTurnRequest {
|
||||
/// In amend mode, persisted into the new insight row's system message.
|
||||
/// None / empty = no change.
|
||||
pub system_prompt: Option<String>,
|
||||
/// Active persona id for this turn. Tools that write to
|
||||
/// `entity_facts` tag the new rows with it; `recall_facts_for_photo`
|
||||
/// scopes its read to it. None defaults to `"default"`.
|
||||
pub persona_id: Option<String>,
|
||||
/// When true, write a new insight row (regenerating title) instead of
|
||||
/// updating training_messages on the existing row.
|
||||
pub amend: bool,
|
||||
@@ -231,6 +235,13 @@ impl InsightChatService {
|
||||
bail!("user_message exceeds 8192 chars");
|
||||
}
|
||||
|
||||
let active_persona = req
|
||||
.persona_id
|
||||
.clone()
|
||||
.filter(|s| !s.trim().is_empty())
|
||||
.unwrap_or_else(|| "default".to_string());
|
||||
span.set_attribute(KeyValue::new("persona_id", active_persona.clone()));
|
||||
|
||||
let normalized = normalize_path(&req.file_path);
|
||||
|
||||
// 1. Acquire the per-(library, file) async mutex. Two concurrent
|
||||
@@ -464,6 +475,7 @@ impl InsightChatService {
|
||||
&ollama_client,
|
||||
&image_base64,
|
||||
&normalized,
|
||||
&active_persona,
|
||||
&loop_cx,
|
||||
)
|
||||
.await;
|
||||
@@ -737,6 +749,11 @@ impl InsightChatService {
|
||||
insight: crate::database::models::PhotoInsight,
|
||||
tx: tokio::sync::mpsc::Sender<ChatStreamEvent>,
|
||||
) -> Result<()> {
|
||||
let active_persona = req
|
||||
.persona_id
|
||||
.clone()
|
||||
.filter(|s| !s.trim().is_empty())
|
||||
.unwrap_or_else(|| "default".to_string());
|
||||
let raw_history = insight.training_messages.as_ref().ok_or_else(|| {
|
||||
anyhow!("insight has no chat history; regenerate this insight in agentic mode")
|
||||
})?;
|
||||
@@ -826,6 +843,7 @@ impl InsightChatService {
|
||||
tools,
|
||||
&image_base64,
|
||||
&normalized,
|
||||
&active_persona,
|
||||
max_iterations,
|
||||
&tx,
|
||||
)
|
||||
@@ -915,6 +933,11 @@ impl InsightChatService {
|
||||
normalized: String,
|
||||
tx: tokio::sync::mpsc::Sender<ChatStreamEvent>,
|
||||
) -> Result<()> {
|
||||
let active_persona = req
|
||||
.persona_id
|
||||
.clone()
|
||||
.filter(|s| !s.trim().is_empty())
|
||||
.unwrap_or_else(|| "default".to_string());
|
||||
let effective_backend = resolve_bootstrap_backend(req.backend.as_deref())?;
|
||||
let is_hybrid = effective_backend == "hybrid";
|
||||
|
||||
@@ -1008,6 +1031,7 @@ impl InsightChatService {
|
||||
tools,
|
||||
&image_base64,
|
||||
&normalized,
|
||||
&active_persona,
|
||||
max_iterations,
|
||||
&tx,
|
||||
)
|
||||
@@ -1157,6 +1181,7 @@ impl InsightChatService {
|
||||
tools: Vec<Tool>,
|
||||
image_base64: &Option<String>,
|
||||
normalized: &str,
|
||||
active_persona: &str,
|
||||
max_iterations: usize,
|
||||
tx: &tokio::sync::mpsc::Sender<ChatStreamEvent>,
|
||||
) -> Result<AgenticLoopOutcome> {
|
||||
@@ -1235,6 +1260,7 @@ impl InsightChatService {
|
||||
ollama_client,
|
||||
image_base64,
|
||||
normalized,
|
||||
active_persona,
|
||||
&cx,
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -1534,7 +1534,15 @@ Return ONLY the summary, nothing else."#,
|
||||
|
||||
// ── Tool executors for agentic loop ────────────────────────────────
|
||||
|
||||
/// Dispatch a tool call to the appropriate executor
|
||||
/// Dispatch a tool call to the appropriate executor.
|
||||
///
|
||||
/// `persona_id` identifies the persona this loop is generating for —
|
||||
/// `store_fact` tags new facts with it, `recall_facts_for_photo`
|
||||
/// filters reads to it (always Single in the agentic loop, even when
|
||||
/// the persona has `include_all_memories=true`; the hive-mind toggle
|
||||
/// is for human browsing of `/knowledge/*`, where mixing voices is
|
||||
/// the explicit goal — during generation the persona's own voice
|
||||
/// must stay clean).
|
||||
pub(crate) async fn execute_tool(
|
||||
&self,
|
||||
tool_name: &str,
|
||||
@@ -1542,6 +1550,7 @@ Return ONLY the summary, nothing else."#,
|
||||
ollama: &OllamaClient,
|
||||
image_base64: &Option<String>,
|
||||
file_path: &str,
|
||||
persona_id: &str,
|
||||
cx: &opentelemetry::Context,
|
||||
) -> String {
|
||||
let result = match tool_name {
|
||||
@@ -1556,9 +1565,15 @@ Return ONLY the summary, nothing else."#,
|
||||
"reverse_geocode" => self.tool_reverse_geocode(arguments).await,
|
||||
"get_personal_place_at" => self.tool_get_personal_place_at(arguments).await,
|
||||
"recall_entities" => self.tool_recall_entities(arguments, cx).await,
|
||||
"recall_facts_for_photo" => self.tool_recall_facts_for_photo(arguments, cx).await,
|
||||
"recall_facts_for_photo" => {
|
||||
self.tool_recall_facts_for_photo(arguments, persona_id, cx)
|
||||
.await
|
||||
}
|
||||
"store_entity" => self.tool_store_entity(arguments, ollama, cx).await,
|
||||
"store_fact" => self.tool_store_fact(arguments, file_path, cx).await,
|
||||
"store_fact" => {
|
||||
self.tool_store_fact(arguments, file_path, persona_id, cx)
|
||||
.await
|
||||
}
|
||||
"get_current_datetime" => Self::tool_get_current_datetime(),
|
||||
unknown => format!("Unknown tool: {}", unknown),
|
||||
};
|
||||
@@ -2391,8 +2406,11 @@ Return ONLY the summary, nothing else."#,
|
||||
async fn tool_recall_facts_for_photo(
|
||||
&self,
|
||||
args: &serde_json::Value,
|
||||
persona_id: &str,
|
||||
cx: &opentelemetry::Context,
|
||||
) -> String {
|
||||
use crate::database::PersonaFilter;
|
||||
let persona_filter = PersonaFilter::Single(persona_id.to_string());
|
||||
let file_path = match args.get("file_path").and_then(|v| v.as_str()) {
|
||||
Some(p) => p.to_string(),
|
||||
None => return "Error: missing required parameter 'file_path'".to_string(),
|
||||
@@ -2432,7 +2450,7 @@ Return ONLY the summary, nothing else."#,
|
||||
"Entity: {} ({}, role: {})",
|
||||
e.name, e.entity_type, role
|
||||
));
|
||||
if let Ok(facts) = kdao.get_facts_for_entity(cx, entity_id) {
|
||||
if let Ok(facts) = kdao.get_facts_for_entity(cx, entity_id, &persona_filter) {
|
||||
for f in facts.iter().filter(|f| f.status == "active") {
|
||||
let obj = if let Some(ref v) = f.object_value {
|
||||
v.clone()
|
||||
@@ -2577,6 +2595,7 @@ Return ONLY the summary, nothing else."#,
|
||||
&self,
|
||||
args: &serde_json::Value,
|
||||
file_path: &str,
|
||||
persona_id: &str,
|
||||
cx: &opentelemetry::Context,
|
||||
) -> String {
|
||||
use crate::database::models::{InsertEntityFact, InsertEntityPhotoLink};
|
||||
@@ -2627,6 +2646,7 @@ Return ONLY the summary, nothing else."#,
|
||||
confidence: 0.6,
|
||||
status: "active".to_string(),
|
||||
created_at: chrono::Utc::now().timestamp(),
|
||||
persona_id: persona_id.to_string(),
|
||||
};
|
||||
|
||||
let mut kdao = self
|
||||
@@ -3176,6 +3196,7 @@ Return ONLY the summary, nothing else."#,
|
||||
backend: Option<String>,
|
||||
fewshot_examples: Vec<Vec<ChatMessage>>,
|
||||
fewshot_source_ids: Vec<i32>,
|
||||
persona_id: String,
|
||||
) -> Result<(Option<i32>, Option<i32>)> {
|
||||
let tracer = global_tracer();
|
||||
let current_cx = opentelemetry::Context::current();
|
||||
@@ -3652,6 +3673,7 @@ Return ONLY the summary, nothing else."#,
|
||||
&ollama_client,
|
||||
&image_base64,
|
||||
&file_path,
|
||||
&persona_id,
|
||||
&loop_cx,
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -335,6 +335,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
None,
|
||||
Vec::new(),
|
||||
Vec::new(),
|
||||
"default".to_string(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
|
||||
@@ -50,10 +50,21 @@ pub struct FactFilter {
|
||||
/// "active" | "reviewed" | "rejected" | "all"
|
||||
pub status: Option<String>,
|
||||
pub predicate: Option<String>,
|
||||
pub persona: PersonaFilter,
|
||||
pub limit: i64,
|
||||
pub offset: i64,
|
||||
}
|
||||
|
||||
/// Persona scoping for fact reads. `Single` filters to one persona's
|
||||
/// view; `All` is the hive-mind read used when a persona has
|
||||
/// `include_all_memories=true` in the personas table. Entities and
|
||||
/// photo-links are always shared and don't take a persona filter.
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum PersonaFilter {
|
||||
Single(String),
|
||||
All,
|
||||
}
|
||||
|
||||
pub struct EntityPatch {
|
||||
pub name: Option<String>,
|
||||
pub description: Option<String>,
|
||||
@@ -144,6 +155,7 @@ pub trait KnowledgeDao: Sync + Send {
|
||||
&mut self,
|
||||
cx: &opentelemetry::Context,
|
||||
entity_id: i32,
|
||||
persona: &PersonaFilter,
|
||||
) -> Result<Vec<EntityFact>, DbError>;
|
||||
|
||||
fn list_facts(
|
||||
@@ -199,6 +211,7 @@ pub trait KnowledgeDao: Sync + Send {
|
||||
cx: &opentelemetry::Context,
|
||||
since: i64,
|
||||
limit: i64,
|
||||
persona: &PersonaFilter,
|
||||
) -> Result<RecentActivity, DbError>;
|
||||
}
|
||||
|
||||
@@ -584,10 +597,14 @@ impl KnowledgeDao for SqliteKnowledgeDao {
|
||||
use schema::entity_facts::dsl::*;
|
||||
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
||||
|
||||
// Look for an identical active fact
|
||||
// Look for an identical active fact AUTHORED BY THE SAME
|
||||
// PERSONA. The same claim from a different persona is a
|
||||
// separate fact (each persona's voice/confidence is its own),
|
||||
// not a confidence bump on someone else's row.
|
||||
let mut dup_query = entity_facts
|
||||
.filter(subject_entity_id.eq(fact.subject_entity_id))
|
||||
.filter(predicate.eq(&fact.predicate))
|
||||
.filter(persona_id.eq(&fact.persona_id))
|
||||
.filter(status.ne("rejected"))
|
||||
.into_boxed();
|
||||
|
||||
@@ -640,14 +657,19 @@ impl KnowledgeDao for SqliteKnowledgeDao {
|
||||
&mut self,
|
||||
cx: &opentelemetry::Context,
|
||||
entity_id: i32,
|
||||
persona: &PersonaFilter,
|
||||
) -> Result<Vec<EntityFact>, DbError> {
|
||||
trace_db_call(cx, "query", "get_facts_for_entity", |_span| {
|
||||
use schema::entity_facts::dsl::*;
|
||||
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
||||
entity_facts
|
||||
let mut q = entity_facts
|
||||
.filter(subject_entity_id.eq(entity_id))
|
||||
.filter(status.ne("rejected"))
|
||||
.load::<EntityFact>(conn.deref_mut())
|
||||
.into_boxed();
|
||||
if let PersonaFilter::Single(pid) = persona {
|
||||
q = q.filter(persona_id.eq(pid.clone()));
|
||||
}
|
||||
q.load::<EntityFact>(conn.deref_mut())
|
||||
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
|
||||
})
|
||||
.map_err(|_| DbError::new(DbErrorKind::QueryError))
|
||||
@@ -664,19 +686,27 @@ impl KnowledgeDao for SqliteKnowledgeDao {
|
||||
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
||||
|
||||
let mut query = entity_facts.into_boxed();
|
||||
let mut count_query = entity_facts.into_boxed();
|
||||
|
||||
if let Some(eid) = filter.entity_id {
|
||||
query = query.filter(subject_entity_id.eq(eid));
|
||||
count_query = count_query.filter(subject_entity_id.eq(eid));
|
||||
}
|
||||
let status_val = filter.status.as_deref().unwrap_or("active");
|
||||
if status_val != "all" {
|
||||
query = query.filter(status.eq(status_val));
|
||||
count_query = count_query.filter(status.eq(status_val));
|
||||
}
|
||||
if let Some(ref pred) = filter.predicate {
|
||||
query = query.filter(predicate.eq(pred));
|
||||
count_query = count_query.filter(predicate.eq(pred));
|
||||
}
|
||||
if let PersonaFilter::Single(ref pid) = filter.persona {
|
||||
query = query.filter(persona_id.eq(pid.clone()));
|
||||
count_query = count_query.filter(persona_id.eq(pid.clone()));
|
||||
}
|
||||
|
||||
let total: i64 = entity_facts
|
||||
let total: i64 = count_query
|
||||
.select(count_star())
|
||||
.first(conn.deref_mut())
|
||||
.unwrap_or(0);
|
||||
@@ -854,12 +884,14 @@ impl KnowledgeDao for SqliteKnowledgeDao {
|
||||
cx: &opentelemetry::Context,
|
||||
since: i64,
|
||||
limit: i64,
|
||||
persona: &PersonaFilter,
|
||||
) -> Result<RecentActivity, DbError> {
|
||||
trace_db_call(cx, "query", "get_recent_activity", |_span| {
|
||||
use schema::entities::dsl as e;
|
||||
use schema::entity_facts::dsl as ef;
|
||||
let mut conn = self.connection.lock().expect("KnowledgeDao lock");
|
||||
|
||||
// Entities are shared — recency is global.
|
||||
let recent_entities = e::entities
|
||||
.filter(e::created_at.gt(since))
|
||||
.order(e::created_at.desc())
|
||||
@@ -867,8 +899,13 @@ impl KnowledgeDao for SqliteKnowledgeDao {
|
||||
.load::<Entity>(conn.deref_mut())
|
||||
.map_err(|err| anyhow::anyhow!("Query error: {}", err))?;
|
||||
|
||||
let recent_facts = ef::entity_facts
|
||||
let mut facts_q = ef::entity_facts
|
||||
.filter(ef::created_at.gt(since))
|
||||
.into_boxed();
|
||||
if let PersonaFilter::Single(pid) = persona {
|
||||
facts_q = facts_q.filter(ef::persona_id.eq(pid.clone()));
|
||||
}
|
||||
let recent_facts = facts_q
|
||||
.order(ef::created_at.desc())
|
||||
.limit(limit)
|
||||
.load::<EntityFact>(conn.deref_mut())
|
||||
|
||||
@@ -49,6 +49,7 @@ pub mod insights_dao;
|
||||
pub mod knowledge_dao;
|
||||
pub mod location_dao;
|
||||
pub mod models;
|
||||
pub mod persona_dao;
|
||||
pub mod preview_dao;
|
||||
pub mod reconcile;
|
||||
pub mod schema;
|
||||
@@ -58,10 +59,11 @@ pub use calendar_dao::{CalendarEventDao, SqliteCalendarEventDao};
|
||||
pub use daily_summary_dao::{DailySummaryDao, InsertDailySummary, SqliteDailySummaryDao};
|
||||
pub use insights_dao::{InsightDao, SqliteInsightDao};
|
||||
pub use knowledge_dao::{
|
||||
EntityFilter, EntityPatch, FactFilter, FactPatch, KnowledgeDao, RecentActivity,
|
||||
EntityFilter, EntityPatch, FactFilter, FactPatch, KnowledgeDao, PersonaFilter, RecentActivity,
|
||||
SqliteKnowledgeDao,
|
||||
};
|
||||
pub use location_dao::{LocationHistoryDao, SqliteLocationHistoryDao};
|
||||
pub use persona_dao::{ImportPersona, PersonaDao, PersonaPatch, SqlitePersonaDao};
|
||||
pub use preview_dao::{PreviewDao, SqlitePreviewDao};
|
||||
pub use search_dao::{SearchHistoryDao, SqliteSearchHistoryDao};
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use crate::database::schema::{
|
||||
entities, entity_facts, entity_photo_links, favorites, image_exif, libraries, photo_insights,
|
||||
users, video_preview_clips,
|
||||
entities, entity_facts, entity_photo_links, favorites, image_exif, libraries, personas,
|
||||
photo_insights, users, video_preview_clips,
|
||||
};
|
||||
use serde::Serialize;
|
||||
|
||||
@@ -238,6 +238,11 @@ pub struct InsertEntityFact {
|
||||
pub confidence: f32,
|
||||
pub status: String,
|
||||
pub created_at: i64,
|
||||
/// Which persona authored this fact. Shared entities, persona-tagged
|
||||
/// facts: each persona accumulates its own voice over the same
|
||||
/// real-world referents. Defaults to `'default'` for legacy rows
|
||||
/// (see migration 2026-05-09-000000).
|
||||
pub persona_id: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Queryable, Clone, Debug)]
|
||||
@@ -252,6 +257,7 @@ pub struct EntityFact {
|
||||
pub confidence: f32,
|
||||
pub status: String,
|
||||
pub created_at: i64,
|
||||
pub persona_id: String,
|
||||
}
|
||||
|
||||
#[derive(Insertable)]
|
||||
@@ -274,6 +280,34 @@ pub struct EntityPhotoLink {
|
||||
pub role: String,
|
||||
}
|
||||
|
||||
// --- Personas ---
|
||||
|
||||
#[derive(Insertable)]
|
||||
#[diesel(table_name = personas)]
|
||||
pub struct InsertPersona<'a> {
|
||||
pub user_id: i32,
|
||||
pub persona_id: &'a str,
|
||||
pub name: &'a str,
|
||||
pub system_prompt: &'a str,
|
||||
pub is_built_in: bool,
|
||||
pub include_all_memories: bool,
|
||||
pub created_at: i64,
|
||||
pub updated_at: i64,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Queryable, Clone, Debug)]
|
||||
pub struct Persona {
|
||||
pub id: i32,
|
||||
pub user_id: i32,
|
||||
pub persona_id: String,
|
||||
pub name: String,
|
||||
pub system_prompt: String,
|
||||
pub is_built_in: bool,
|
||||
pub include_all_memories: bool,
|
||||
pub created_at: i64,
|
||||
pub updated_at: i64,
|
||||
}
|
||||
|
||||
#[derive(Insertable)]
|
||||
#[diesel(table_name = video_preview_clips)]
|
||||
pub struct InsertVideoPreviewClip {
|
||||
|
||||
384
src/database/persona_dao.rs
Normal file
384
src/database/persona_dao.rs
Normal file
@@ -0,0 +1,384 @@
|
||||
#![allow(dead_code)]
|
||||
|
||||
use diesel::prelude::*;
|
||||
use diesel::sqlite::SqliteConnection;
|
||||
use std::ops::DerefMut;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use crate::database::models::{InsertPersona, Persona};
|
||||
use crate::database::schema;
|
||||
use crate::database::{DbError, DbErrorKind, connect};
|
||||
use crate::otel::trace_db_call;
|
||||
|
||||
/// Patch shape for update_persona. None = leave field alone. Built-ins are
|
||||
/// allowed to flip `include_all_memories` but should reject name/prompt
|
||||
/// edits at the handler layer (built-in copy lives in the migration).
|
||||
pub struct PersonaPatch {
|
||||
pub name: Option<String>,
|
||||
pub system_prompt: Option<String>,
|
||||
pub include_all_memories: Option<bool>,
|
||||
}
|
||||
|
||||
/// One row of a bulk migration upload. Fields named to match the JSON
|
||||
/// shape the mobile client uploads (`POST /personas/migrate`).
|
||||
pub struct ImportPersona {
|
||||
pub persona_id: String,
|
||||
pub name: String,
|
||||
pub system_prompt: String,
|
||||
pub is_built_in: bool,
|
||||
pub created_at: i64,
|
||||
}
|
||||
|
||||
pub trait PersonaDao: Sync + Send {
|
||||
fn list_personas(
|
||||
&mut self,
|
||||
cx: &opentelemetry::Context,
|
||||
user_id: i32,
|
||||
) -> Result<Vec<Persona>, DbError>;
|
||||
|
||||
fn get_persona(
|
||||
&mut self,
|
||||
cx: &opentelemetry::Context,
|
||||
user_id: i32,
|
||||
persona_id: &str,
|
||||
) -> Result<Option<Persona>, DbError>;
|
||||
|
||||
fn create_persona(
|
||||
&mut self,
|
||||
cx: &opentelemetry::Context,
|
||||
user_id: i32,
|
||||
persona_id: &str,
|
||||
name: &str,
|
||||
system_prompt: &str,
|
||||
is_built_in: bool,
|
||||
include_all_memories: bool,
|
||||
) -> Result<Persona, DbError>;
|
||||
|
||||
fn update_persona(
|
||||
&mut self,
|
||||
cx: &opentelemetry::Context,
|
||||
user_id: i32,
|
||||
persona_id: &str,
|
||||
patch: PersonaPatch,
|
||||
) -> Result<Option<Persona>, DbError>;
|
||||
|
||||
fn delete_persona(
|
||||
&mut self,
|
||||
cx: &opentelemetry::Context,
|
||||
user_id: i32,
|
||||
persona_id: &str,
|
||||
) -> Result<bool, DbError>;
|
||||
|
||||
/// Idempotent bulk import. INSERT OR IGNORE on (user_id, persona_id)
|
||||
/// — re-uploading the same set is a no-op. Returns the number of rows
|
||||
/// actually inserted (skipped duplicates don't count).
|
||||
fn bulk_import(
|
||||
&mut self,
|
||||
cx: &opentelemetry::Context,
|
||||
user_id: i32,
|
||||
personas: &[ImportPersona],
|
||||
) -> Result<usize, DbError>;
|
||||
}
|
||||
|
||||
pub struct SqlitePersonaDao {
|
||||
connection: Arc<Mutex<SqliteConnection>>,
|
||||
}
|
||||
|
||||
impl Default for SqlitePersonaDao {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl SqlitePersonaDao {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
connection: Arc::new(Mutex::new(connect())),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_connection(conn: Arc<Mutex<SqliteConnection>>) -> Self {
|
||||
Self { connection: conn }
|
||||
}
|
||||
}
|
||||
|
||||
impl PersonaDao for SqlitePersonaDao {
|
||||
fn list_personas(
|
||||
&mut self,
|
||||
cx: &opentelemetry::Context,
|
||||
uid: i32,
|
||||
) -> Result<Vec<Persona>, DbError> {
|
||||
trace_db_call(cx, "query", "list_personas", |_span| {
|
||||
use schema::personas::dsl::*;
|
||||
let mut conn = self.connection.lock().expect("PersonaDao lock");
|
||||
personas
|
||||
.filter(user_id.eq(uid))
|
||||
.order(created_at.asc())
|
||||
.load::<Persona>(conn.deref_mut())
|
||||
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
|
||||
})
|
||||
.map_err(|_| DbError::new(DbErrorKind::QueryError))
|
||||
}
|
||||
|
||||
fn get_persona(
|
||||
&mut self,
|
||||
cx: &opentelemetry::Context,
|
||||
uid: i32,
|
||||
pid: &str,
|
||||
) -> Result<Option<Persona>, DbError> {
|
||||
trace_db_call(cx, "query", "get_persona", |_span| {
|
||||
use schema::personas::dsl::*;
|
||||
let mut conn = self.connection.lock().expect("PersonaDao lock");
|
||||
personas
|
||||
.filter(user_id.eq(uid))
|
||||
.filter(persona_id.eq(pid))
|
||||
.first::<Persona>(conn.deref_mut())
|
||||
.optional()
|
||||
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
|
||||
})
|
||||
.map_err(|_| DbError::new(DbErrorKind::QueryError))
|
||||
}
|
||||
|
||||
fn create_persona(
|
||||
&mut self,
|
||||
cx: &opentelemetry::Context,
|
||||
uid: i32,
|
||||
pid: &str,
|
||||
nm: &str,
|
||||
prompt: &str,
|
||||
builtin: bool,
|
||||
include_all: bool,
|
||||
) -> Result<Persona, DbError> {
|
||||
trace_db_call(cx, "insert", "create_persona", |_span| {
|
||||
use schema::personas::dsl::*;
|
||||
let mut conn = self.connection.lock().expect("PersonaDao lock");
|
||||
let now = chrono::Utc::now().timestamp_millis();
|
||||
|
||||
diesel::insert_into(personas)
|
||||
.values(InsertPersona {
|
||||
user_id: uid,
|
||||
persona_id: pid,
|
||||
name: nm,
|
||||
system_prompt: prompt,
|
||||
is_built_in: builtin,
|
||||
include_all_memories: include_all,
|
||||
created_at: now,
|
||||
updated_at: now,
|
||||
})
|
||||
.execute(conn.deref_mut())
|
||||
.map_err(|e| anyhow::anyhow!("Insert error: {}", e))?;
|
||||
|
||||
personas
|
||||
.filter(user_id.eq(uid))
|
||||
.filter(persona_id.eq(pid))
|
||||
.first::<Persona>(conn.deref_mut())
|
||||
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
|
||||
})
|
||||
.map_err(|_| DbError::new(DbErrorKind::InsertError))
|
||||
}
|
||||
|
||||
fn update_persona(
|
||||
&mut self,
|
||||
cx: &opentelemetry::Context,
|
||||
uid: i32,
|
||||
pid: &str,
|
||||
patch: PersonaPatch,
|
||||
) -> Result<Option<Persona>, DbError> {
|
||||
trace_db_call(cx, "update", "update_persona", |_span| {
|
||||
use schema::personas::dsl::*;
|
||||
let mut conn = self.connection.lock().expect("PersonaDao lock");
|
||||
let now = chrono::Utc::now().timestamp_millis();
|
||||
|
||||
// Apply each field as its own UPDATE — keeps types simple
|
||||
// (Diesel's tuple updates don't compose cleanly across optional
|
||||
// columns) and matches the pattern already in use for entities
|
||||
// (knowledge_dao.rs::update_entity).
|
||||
if let Some(ref new_name) = patch.name {
|
||||
diesel::update(personas.filter(user_id.eq(uid)).filter(persona_id.eq(pid)))
|
||||
.set((name.eq(new_name), updated_at.eq(now)))
|
||||
.execute(conn.deref_mut())
|
||||
.map_err(|e| anyhow::anyhow!("Update name error: {}", e))?;
|
||||
}
|
||||
if let Some(ref new_prompt) = patch.system_prompt {
|
||||
diesel::update(personas.filter(user_id.eq(uid)).filter(persona_id.eq(pid)))
|
||||
.set((system_prompt.eq(new_prompt), updated_at.eq(now)))
|
||||
.execute(conn.deref_mut())
|
||||
.map_err(|e| anyhow::anyhow!("Update prompt error: {}", e))?;
|
||||
}
|
||||
if let Some(new_include_all) = patch.include_all_memories {
|
||||
diesel::update(personas.filter(user_id.eq(uid)).filter(persona_id.eq(pid)))
|
||||
.set((include_all_memories.eq(new_include_all), updated_at.eq(now)))
|
||||
.execute(conn.deref_mut())
|
||||
.map_err(|e| anyhow::anyhow!("Update include_all error: {}", e))?;
|
||||
}
|
||||
|
||||
personas
|
||||
.filter(user_id.eq(uid))
|
||||
.filter(persona_id.eq(pid))
|
||||
.first::<Persona>(conn.deref_mut())
|
||||
.optional()
|
||||
.map_err(|e| anyhow::anyhow!("Query error: {}", e))
|
||||
})
|
||||
.map_err(|_| DbError::new(DbErrorKind::UpdateError))
|
||||
}
|
||||
|
||||
fn delete_persona(
|
||||
&mut self,
|
||||
cx: &opentelemetry::Context,
|
||||
uid: i32,
|
||||
pid: &str,
|
||||
) -> Result<bool, DbError> {
|
||||
trace_db_call(cx, "delete", "delete_persona", |_span| {
|
||||
use schema::personas::dsl::*;
|
||||
let mut conn = self.connection.lock().expect("PersonaDao lock");
|
||||
let n = diesel::delete(personas.filter(user_id.eq(uid)).filter(persona_id.eq(pid)))
|
||||
.execute(conn.deref_mut())
|
||||
.map_err(|e| anyhow::anyhow!("Delete error: {}", e))?;
|
||||
Ok(n > 0)
|
||||
})
|
||||
.map_err(|_| DbError::new(DbErrorKind::QueryError))
|
||||
}
|
||||
|
||||
fn bulk_import(
|
||||
&mut self,
|
||||
cx: &opentelemetry::Context,
|
||||
uid: i32,
|
||||
rows: &[ImportPersona],
|
||||
) -> Result<usize, DbError> {
|
||||
trace_db_call(cx, "insert", "bulk_import_personas", |_span| {
|
||||
let mut conn = self.connection.lock().expect("PersonaDao lock");
|
||||
let now = chrono::Utc::now().timestamp_millis();
|
||||
let mut inserted = 0usize;
|
||||
|
||||
// INSERT OR IGNORE on the (user_id, persona_id) UNIQUE so
|
||||
// re-running migrate is a no-op for personas already on the
|
||||
// server.
|
||||
for p in rows {
|
||||
let n = diesel::sql_query(
|
||||
"INSERT OR IGNORE INTO personas (user_id, persona_id, name, system_prompt, \
|
||||
is_built_in, include_all_memories, created_at, updated_at) \
|
||||
VALUES (?, ?, ?, ?, ?, 0, ?, ?)",
|
||||
)
|
||||
.bind::<diesel::sql_types::Integer, _>(uid)
|
||||
.bind::<diesel::sql_types::Text, _>(&p.persona_id)
|
||||
.bind::<diesel::sql_types::Text, _>(&p.name)
|
||||
.bind::<diesel::sql_types::Text, _>(&p.system_prompt)
|
||||
.bind::<diesel::sql_types::Bool, _>(p.is_built_in)
|
||||
.bind::<diesel::sql_types::BigInt, _>(p.created_at)
|
||||
.bind::<diesel::sql_types::BigInt, _>(now)
|
||||
.execute(conn.deref_mut())
|
||||
.map_err(|e| anyhow::anyhow!("Insert error: {}", e))?;
|
||||
inserted += n;
|
||||
}
|
||||
Ok(inserted)
|
||||
})
|
||||
.map_err(|_| DbError::new(DbErrorKind::InsertError))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::database::test::in_memory_db_connection;
|
||||
|
||||
fn dao_with_user(username: &str) -> (SqlitePersonaDao, i32) {
|
||||
use crate::database::schema::users::dsl as u;
|
||||
let conn = Arc::new(Mutex::new(in_memory_db_connection()));
|
||||
diesel::insert_into(u::users)
|
||||
.values((u::username.eq(username), u::password.eq("x")))
|
||||
.execute(conn.lock().unwrap().deref_mut())
|
||||
.unwrap();
|
||||
let user_id: i32 = u::users
|
||||
.filter(u::username.eq(username))
|
||||
.select(u::id)
|
||||
.first(conn.lock().unwrap().deref_mut())
|
||||
.unwrap();
|
||||
(SqlitePersonaDao::from_connection(conn), user_id)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn create_and_list_round_trip() {
|
||||
let cx = opentelemetry::Context::new();
|
||||
let (mut dao, uid) = dao_with_user("alice");
|
||||
|
||||
// The migration seeds 3 built-ins for any existing user; alice
|
||||
// was created post-migration so she starts empty.
|
||||
let p = dao
|
||||
.create_persona(&cx, uid, "custom-1", "Custom A", "prompt A", false, false)
|
||||
.unwrap();
|
||||
assert_eq!(p.persona_id, "custom-1");
|
||||
assert_eq!(p.user_id, uid);
|
||||
assert!(!p.is_built_in);
|
||||
|
||||
let list = dao.list_personas(&cx, uid).unwrap();
|
||||
assert_eq!(list.len(), 1);
|
||||
assert_eq!(list[0].persona_id, "custom-1");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn unique_constraint_blocks_duplicate_persona_id() {
|
||||
let cx = opentelemetry::Context::new();
|
||||
let (mut dao, uid) = dao_with_user("bob");
|
||||
|
||||
dao.create_persona(&cx, uid, "x", "X", "p", false, false)
|
||||
.unwrap();
|
||||
let err = dao.create_persona(&cx, uid, "x", "X2", "p2", false, false);
|
||||
assert!(
|
||||
err.is_err(),
|
||||
"second insert with same persona_id should fail"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bulk_import_is_idempotent() {
|
||||
let cx = opentelemetry::Context::new();
|
||||
let (mut dao, uid) = dao_with_user("carol");
|
||||
|
||||
let rows = vec![
|
||||
ImportPersona {
|
||||
persona_id: "custom-a".into(),
|
||||
name: "A".into(),
|
||||
system_prompt: "p1".into(),
|
||||
is_built_in: false,
|
||||
created_at: 1,
|
||||
},
|
||||
ImportPersona {
|
||||
persona_id: "custom-b".into(),
|
||||
name: "B".into(),
|
||||
system_prompt: "p2".into(),
|
||||
is_built_in: false,
|
||||
created_at: 2,
|
||||
},
|
||||
];
|
||||
|
||||
let first = dao.bulk_import(&cx, uid, &rows).unwrap();
|
||||
assert_eq!(first, 2);
|
||||
let second = dao.bulk_import(&cx, uid, &rows).unwrap();
|
||||
assert_eq!(second, 0, "re-import should insert nothing");
|
||||
|
||||
assert_eq!(dao.list_personas(&cx, uid).unwrap().len(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn update_toggles_include_all_memories() {
|
||||
let cx = opentelemetry::Context::new();
|
||||
let (mut dao, uid) = dao_with_user("dan");
|
||||
|
||||
dao.create_persona(&cx, uid, "j", "Journal", "p", true, false)
|
||||
.unwrap();
|
||||
let updated = dao
|
||||
.update_persona(
|
||||
&cx,
|
||||
uid,
|
||||
"j",
|
||||
PersonaPatch {
|
||||
name: None,
|
||||
system_prompt: None,
|
||||
include_all_memories: Some(true),
|
||||
},
|
||||
)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert!(updated.include_all_memories);
|
||||
}
|
||||
}
|
||||
@@ -57,6 +57,7 @@ diesel::table! {
|
||||
confidence -> Float,
|
||||
status -> Text,
|
||||
created_at -> BigInt,
|
||||
persona_id -> Text,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -159,6 +160,20 @@ diesel::table! {
|
||||
}
|
||||
}
|
||||
|
||||
diesel::table! {
|
||||
personas (id) {
|
||||
id -> Integer,
|
||||
user_id -> Integer,
|
||||
persona_id -> Text,
|
||||
name -> Text,
|
||||
system_prompt -> Text,
|
||||
is_built_in -> Bool,
|
||||
include_all_memories -> Bool,
|
||||
created_at -> BigInt,
|
||||
updated_at -> BigInt,
|
||||
}
|
||||
}
|
||||
|
||||
diesel::table! {
|
||||
persons (id) {
|
||||
id -> Integer,
|
||||
@@ -249,6 +264,7 @@ diesel::joinable!(entity_photo_links -> libraries (library_id));
|
||||
diesel::joinable!(face_detections -> libraries (library_id));
|
||||
diesel::joinable!(face_detections -> persons (person_id));
|
||||
diesel::joinable!(image_exif -> libraries (library_id));
|
||||
diesel::joinable!(personas -> users (user_id));
|
||||
diesel::joinable!(persons -> entities (entity_id));
|
||||
diesel::joinable!(photo_insights -> libraries (library_id));
|
||||
diesel::joinable!(tagged_photo -> tags (tag_id));
|
||||
@@ -265,6 +281,7 @@ diesel::allow_tables_to_appear_in_same_query!(
|
||||
image_exif,
|
||||
libraries,
|
||||
location_history,
|
||||
personas,
|
||||
persons,
|
||||
photo_insights,
|
||||
search_history,
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use actix_web::dev::{ServiceFactory, ServiceRequest};
|
||||
use actix_web::{App, HttpResponse, Responder, web};
|
||||
use actix_web::{App, HttpRequest, HttpResponse, Responder, web};
|
||||
use chrono::Utc;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::sync::Mutex;
|
||||
@@ -7,8 +7,38 @@ use std::sync::Mutex;
|
||||
use crate::data::Claims;
|
||||
use crate::database::models::{Entity, EntityFact, EntityPhotoLink};
|
||||
use crate::database::{
|
||||
EntityFilter, EntityPatch, FactFilter, FactPatch, KnowledgeDao, RecentActivity,
|
||||
EntityFilter, EntityPatch, FactFilter, FactPatch, KnowledgeDao, PersonaFilter, RecentActivity,
|
||||
};
|
||||
use crate::personas::PersonaDaoData;
|
||||
|
||||
/// Resolve the `X-Persona-Id` header into a `PersonaFilter`. Missing
|
||||
/// header → `'default'`. If the persona has `include_all_memories=true`,
|
||||
/// returns `PersonaFilter::All` so reads see the full hive-mind pool.
|
||||
/// On lookup failure (e.g. malformed JWT) returns `Single("default")` —
|
||||
/// safer than `All` because it preserves the historical baseline view.
|
||||
fn resolve_persona_filter(
|
||||
req: &HttpRequest,
|
||||
claims: &Claims,
|
||||
persona_dao: &PersonaDaoData,
|
||||
) -> PersonaFilter {
|
||||
let pid = req
|
||||
.headers()
|
||||
.get("X-Persona-Id")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.map(|s| s.to_string())
|
||||
.unwrap_or_else(|| "default".to_string());
|
||||
|
||||
let Ok(uid) = claims.sub.parse::<i32>() else {
|
||||
return PersonaFilter::Single(pid);
|
||||
};
|
||||
|
||||
let cx = opentelemetry::Context::current();
|
||||
let mut dao = persona_dao.lock().expect("Unable to lock PersonaDao");
|
||||
match dao.get_persona(&cx, uid, &pid) {
|
||||
Ok(Some(p)) if p.include_all_memories => PersonaFilter::All,
|
||||
_ => PersonaFilter::Single(pid),
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Request / Response types
|
||||
@@ -246,10 +276,13 @@ async fn list_entities<D: KnowledgeDao + 'static>(
|
||||
}
|
||||
|
||||
async fn get_entity<D: KnowledgeDao + 'static>(
|
||||
_claims: Claims,
|
||||
req: HttpRequest,
|
||||
claims: Claims,
|
||||
id: web::Path<i32>,
|
||||
dao: web::Data<Mutex<D>>,
|
||||
persona_dao: PersonaDaoData,
|
||||
) -> impl Responder {
|
||||
let persona = resolve_persona_filter(&req, &claims, &persona_dao);
|
||||
let cx = opentelemetry::Context::current();
|
||||
let entity_id = id.into_inner();
|
||||
let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
|
||||
@@ -266,8 +299,8 @@ async fn get_entity<D: KnowledgeDao + 'static>(
|
||||
}
|
||||
};
|
||||
|
||||
// Fetch all facts (all statuses for audit)
|
||||
let raw_facts: Vec<EntityFact> = match dao.get_facts_for_entity(&cx, entity_id) {
|
||||
// Fetch all facts (all statuses for audit), scoped to the active persona.
|
||||
let raw_facts: Vec<EntityFact> = match dao.get_facts_for_entity(&cx, entity_id, &persona) {
|
||||
Ok(f) => f,
|
||||
Err(e) => {
|
||||
log::error!("get_facts_for_entity error: {:?}", e);
|
||||
@@ -426,9 +459,11 @@ async fn merge_entities<D: KnowledgeDao + 'static>(
|
||||
}
|
||||
|
||||
async fn list_facts<D: KnowledgeDao + 'static>(
|
||||
_claims: Claims,
|
||||
req: HttpRequest,
|
||||
claims: Claims,
|
||||
query: web::Query<FactListQuery>,
|
||||
dao: web::Data<Mutex<D>>,
|
||||
persona_dao: PersonaDaoData,
|
||||
) -> impl Responder {
|
||||
let limit = query.limit.unwrap_or(50).min(200);
|
||||
let offset = query.offset.unwrap_or(0);
|
||||
@@ -438,11 +473,13 @@ async fn list_facts<D: KnowledgeDao + 'static>(
|
||||
Some("all") => None,
|
||||
Some(s) => Some(s.to_string()),
|
||||
};
|
||||
let persona = resolve_persona_filter(&req, &claims, &persona_dao);
|
||||
|
||||
let filter = FactFilter {
|
||||
entity_id: query.entity_id,
|
||||
status: status_filter,
|
||||
predicate: query.predicate.clone(),
|
||||
persona,
|
||||
limit,
|
||||
offset,
|
||||
};
|
||||
@@ -539,18 +576,21 @@ async fn delete_fact<D: KnowledgeDao + 'static>(
|
||||
}
|
||||
|
||||
async fn get_recent<D: KnowledgeDao + 'static>(
|
||||
_claims: Claims,
|
||||
req: HttpRequest,
|
||||
claims: Claims,
|
||||
query: web::Query<RecentQuery>,
|
||||
dao: web::Data<Mutex<D>>,
|
||||
persona_dao: PersonaDaoData,
|
||||
) -> impl Responder {
|
||||
let since = query
|
||||
.since
|
||||
.unwrap_or_else(|| Utc::now().timestamp() - 86400);
|
||||
let limit = query.limit.unwrap_or(20).min(100);
|
||||
|
||||
let persona = resolve_persona_filter(&req, &claims, &persona_dao);
|
||||
let cx = opentelemetry::Context::current();
|
||||
let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
|
||||
match dao.get_recent_activity(&cx, since, limit) {
|
||||
match dao.get_recent_activity(&cx, since, limit, &persona) {
|
||||
Ok(RecentActivity { entities, facts }) => {
|
||||
let entity_summaries: Vec<EntitySummary> =
|
||||
entities.into_iter().map(EntitySummary::from).collect();
|
||||
|
||||
@@ -26,6 +26,7 @@ pub mod memories;
|
||||
pub mod otel;
|
||||
pub mod parsers;
|
||||
pub mod perceptual_hash;
|
||||
pub mod personas;
|
||||
pub mod service;
|
||||
pub mod state;
|
||||
pub mod tags;
|
||||
|
||||
@@ -84,6 +84,7 @@ mod video;
|
||||
mod knowledge;
|
||||
mod memories;
|
||||
mod otel;
|
||||
mod personas;
|
||||
mod service;
|
||||
#[cfg(test)]
|
||||
mod testhelpers;
|
||||
@@ -1974,6 +1975,7 @@ fn main() -> std::io::Result<()> {
|
||||
.service(libraries::list_libraries)
|
||||
.add_feature(add_tag_services::<_, SqliteTagDao>)
|
||||
.add_feature(knowledge::add_knowledge_services::<_, SqliteKnowledgeDao>)
|
||||
.add_feature(personas::add_persona_services)
|
||||
.add_feature(faces::add_face_services::<_, faces::SqliteFaceDao>)
|
||||
.add_feature(duplicates::add_duplicate_services)
|
||||
.app_data(app_data.clone())
|
||||
@@ -1997,6 +1999,9 @@ fn main() -> std::io::Result<()> {
|
||||
.app_data::<Data<Mutex<SqliteKnowledgeDao>>>(Data::new(Mutex::new(
|
||||
SqliteKnowledgeDao::new(),
|
||||
)))
|
||||
.app_data::<Data<Mutex<Box<dyn database::PersonaDao>>>>(Data::new(Mutex::new(
|
||||
Box::new(database::SqlitePersonaDao::new()),
|
||||
)))
|
||||
.app_data::<Data<Mutex<faces::SqliteFaceDao>>>(Data::new(Mutex::new(face_dao)))
|
||||
.app_data::<Data<crate::ai::face_client::FaceClient>>(Data::new(
|
||||
app_data.face_client.clone(),
|
||||
|
||||
309
src/personas.rs
Normal file
309
src/personas.rs
Normal file
@@ -0,0 +1,309 @@
|
||||
//! HTTP handlers for the server-side persona store.
|
||||
//!
|
||||
//! Personas previously lived only in mobile AsyncStorage; this module
|
||||
//! elevates them so they can sync across devices and so the
|
||||
//! `entity_facts.persona_id` column has something to reference.
|
||||
//!
|
||||
//! Built-in personas (default / journal / factual) are seeded by the
|
||||
//! migration. Customs are created here and may be migrated up from a
|
||||
//! device's local store via `POST /personas/migrate`.
|
||||
|
||||
use actix_web::dev::{ServiceFactory, ServiceRequest};
|
||||
use actix_web::{App, HttpResponse, Responder, web};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::sync::Mutex;
|
||||
|
||||
use crate::data::Claims;
|
||||
use crate::database::models::Persona;
|
||||
use crate::database::{ImportPersona, PersonaDao, PersonaPatch};
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Wire shapes — camelCase out the door, snake_case from the DB.
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct PersonaView {
|
||||
pub id: String,
|
||||
pub name: String,
|
||||
#[serde(rename = "systemPrompt")]
|
||||
pub system_prompt: String,
|
||||
#[serde(rename = "isBuiltIn")]
|
||||
pub is_built_in: bool,
|
||||
#[serde(rename = "includeAllMemories")]
|
||||
pub include_all_memories: bool,
|
||||
#[serde(rename = "createdAt")]
|
||||
pub created_at: i64,
|
||||
#[serde(rename = "updatedAt")]
|
||||
pub updated_at: i64,
|
||||
}
|
||||
|
||||
impl From<Persona> for PersonaView {
|
||||
fn from(p: Persona) -> Self {
|
||||
Self {
|
||||
id: p.persona_id,
|
||||
name: p.name,
|
||||
system_prompt: p.system_prompt,
|
||||
is_built_in: p.is_built_in,
|
||||
include_all_memories: p.include_all_memories,
|
||||
created_at: p.created_at,
|
||||
updated_at: p.updated_at,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct CreatePersonaRequest {
|
||||
pub name: String,
|
||||
#[serde(rename = "systemPrompt")]
|
||||
pub system_prompt: String,
|
||||
/// Optional caller-provided id. When present (e.g. a client that
|
||||
/// already minted `"custom-1735124234"` locally and is upgrading from
|
||||
/// the AsyncStorage-only era), the server uses it; collisions return
|
||||
/// 409. When absent the server mints `"custom-<ms>"`.
|
||||
#[serde(default, rename = "personaId")]
|
||||
pub persona_id: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct UpdatePersonaRequest {
|
||||
#[serde(default)]
|
||||
pub name: Option<String>,
|
||||
#[serde(default, rename = "systemPrompt")]
|
||||
pub system_prompt: Option<String>,
|
||||
#[serde(default, rename = "includeAllMemories")]
|
||||
pub include_all_memories: Option<bool>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct MigrateRequest {
|
||||
pub personas: Vec<MigratePersona>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct MigratePersona {
|
||||
pub id: String,
|
||||
pub name: String,
|
||||
#[serde(rename = "systemPrompt")]
|
||||
pub system_prompt: String,
|
||||
#[serde(default, rename = "isBuiltIn")]
|
||||
pub is_built_in: bool,
|
||||
#[serde(default, rename = "createdAt")]
|
||||
pub created_at: Option<i64>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct MigrateResponse {
|
||||
pub inserted: usize,
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Service registration
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
pub type PersonaDaoData = web::Data<Mutex<Box<dyn PersonaDao>>>;
|
||||
|
||||
pub fn add_persona_services<T>(app: App<T>) -> App<T>
|
||||
where
|
||||
T: ServiceFactory<ServiceRequest, Config = (), Error = actix_web::Error, InitError = ()>,
|
||||
{
|
||||
app.service(
|
||||
web::scope("/personas")
|
||||
.service(web::resource("/migrate").route(web::post().to(migrate_personas)))
|
||||
.service(
|
||||
web::resource("")
|
||||
.route(web::get().to(list_personas))
|
||||
.route(web::post().to(create_persona)),
|
||||
)
|
||||
.service(
|
||||
web::resource("/{persona_id}")
|
||||
.route(web::put().to(update_persona))
|
||||
.route(web::delete().to(delete_persona)),
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Handlers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
fn user_id_from_claims(claims: &Claims) -> Option<i32> {
|
||||
claims.sub.parse::<i32>().ok()
|
||||
}
|
||||
|
||||
async fn list_personas(claims: Claims, dao: PersonaDaoData) -> impl Responder {
|
||||
let Some(uid) = user_id_from_claims(&claims) else {
|
||||
return HttpResponse::Unauthorized().json(serde_json::json!({"error": "Invalid claims"}));
|
||||
};
|
||||
let cx = opentelemetry::Context::current();
|
||||
let mut dao = dao.lock().expect("Unable to lock PersonaDao");
|
||||
match dao.list_personas(&cx, uid) {
|
||||
Ok(rows) => {
|
||||
let views: Vec<PersonaView> = rows.into_iter().map(PersonaView::from).collect();
|
||||
HttpResponse::Ok().json(views)
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!("list_personas error: {:?}", e);
|
||||
HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn create_persona(
|
||||
claims: Claims,
|
||||
body: web::Json<CreatePersonaRequest>,
|
||||
dao: PersonaDaoData,
|
||||
) -> impl Responder {
|
||||
let Some(uid) = user_id_from_claims(&claims) else {
|
||||
return HttpResponse::Unauthorized().json(serde_json::json!({"error": "Invalid claims"}));
|
||||
};
|
||||
|
||||
if body.name.trim().is_empty() {
|
||||
return HttpResponse::BadRequest().json(serde_json::json!({"error": "name is required"}));
|
||||
}
|
||||
if body.system_prompt.trim().is_empty() {
|
||||
return HttpResponse::BadRequest()
|
||||
.json(serde_json::json!({"error": "systemPrompt is required"}));
|
||||
}
|
||||
|
||||
let cx = opentelemetry::Context::current();
|
||||
let mut dao = dao.lock().expect("Unable to lock PersonaDao");
|
||||
|
||||
let pid = match body.persona_id.as_deref() {
|
||||
Some(s) if !s.trim().is_empty() => s.to_string(),
|
||||
_ => format!("custom-{}", chrono::Utc::now().timestamp_millis()),
|
||||
};
|
||||
|
||||
if matches!(pid.as_str(), "default" | "journal" | "factual") {
|
||||
return HttpResponse::Conflict()
|
||||
.json(serde_json::json!({"error": "persona id collides with a built-in"}));
|
||||
}
|
||||
|
||||
// Pre-check existence so we can return 409 cleanly. The DB UNIQUE
|
||||
// would also catch it, but parsing Diesel's "constraint violation"
|
||||
// out of a generic DbError is uglier than a quick lookup.
|
||||
if let Ok(Some(_)) = dao.get_persona(&cx, uid, &pid) {
|
||||
return HttpResponse::Conflict()
|
||||
.json(serde_json::json!({"error": "persona already exists"}));
|
||||
}
|
||||
|
||||
match dao.create_persona(
|
||||
&cx,
|
||||
uid,
|
||||
&pid,
|
||||
&body.name,
|
||||
&body.system_prompt,
|
||||
false,
|
||||
false,
|
||||
) {
|
||||
Ok(p) => HttpResponse::Created().json(PersonaView::from(p)),
|
||||
Err(e) => {
|
||||
log::error!("create_persona error: {:?}", e);
|
||||
HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn update_persona(
|
||||
claims: Claims,
|
||||
path: web::Path<String>,
|
||||
body: web::Json<UpdatePersonaRequest>,
|
||||
dao: PersonaDaoData,
|
||||
) -> impl Responder {
|
||||
let Some(uid) = user_id_from_claims(&claims) else {
|
||||
return HttpResponse::Unauthorized().json(serde_json::json!({"error": "Invalid claims"}));
|
||||
};
|
||||
let pid = path.into_inner();
|
||||
let cx = opentelemetry::Context::current();
|
||||
let mut dao = dao.lock().expect("Unable to lock PersonaDao");
|
||||
|
||||
let patch = PersonaPatch {
|
||||
name: body.name.clone(),
|
||||
system_prompt: body.system_prompt.clone(),
|
||||
include_all_memories: body.include_all_memories,
|
||||
};
|
||||
match dao.update_persona(&cx, uid, &pid, patch) {
|
||||
Ok(Some(p)) => HttpResponse::Ok().json(PersonaView::from(p)),
|
||||
Ok(None) => {
|
||||
HttpResponse::NotFound().json(serde_json::json!({"error": "Persona not found"}))
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!("update_persona error: {:?}", e);
|
||||
HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn delete_persona(
|
||||
claims: Claims,
|
||||
path: web::Path<String>,
|
||||
dao: PersonaDaoData,
|
||||
) -> impl Responder {
|
||||
let Some(uid) = user_id_from_claims(&claims) else {
|
||||
return HttpResponse::Unauthorized().json(serde_json::json!({"error": "Invalid claims"}));
|
||||
};
|
||||
let pid = path.into_inner();
|
||||
let cx = opentelemetry::Context::current();
|
||||
let mut dao = dao.lock().expect("Unable to lock PersonaDao");
|
||||
|
||||
match dao.get_persona(&cx, uid, &pid) {
|
||||
Ok(Some(p)) if p.is_built_in => {
|
||||
return HttpResponse::Conflict()
|
||||
.json(serde_json::json!({"error": "Cannot delete built-in persona"}));
|
||||
}
|
||||
Ok(None) => {
|
||||
return HttpResponse::NotFound()
|
||||
.json(serde_json::json!({"error": "Persona not found"}));
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!("delete_persona lookup error: {:?}", e);
|
||||
return HttpResponse::InternalServerError()
|
||||
.json(serde_json::json!({"error": "Database error"}));
|
||||
}
|
||||
Ok(Some(_)) => {}
|
||||
}
|
||||
|
||||
match dao.delete_persona(&cx, uid, &pid) {
|
||||
Ok(_) => HttpResponse::NoContent().finish(),
|
||||
Err(e) => {
|
||||
log::error!("delete_persona error: {:?}", e);
|
||||
HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn migrate_personas(
|
||||
claims: Claims,
|
||||
body: web::Json<MigrateRequest>,
|
||||
dao: PersonaDaoData,
|
||||
) -> impl Responder {
|
||||
let Some(uid) = user_id_from_claims(&claims) else {
|
||||
return HttpResponse::Unauthorized().json(serde_json::json!({"error": "Invalid claims"}));
|
||||
};
|
||||
let cx = opentelemetry::Context::current();
|
||||
let mut dao = dao.lock().expect("Unable to lock PersonaDao");
|
||||
|
||||
// Filter out built-in ids — those are already seeded by the
|
||||
// migration and re-importing them would be a no-op anyway thanks to
|
||||
// INSERT OR IGNORE, but skipping early avoids the UNIQUE round-trip.
|
||||
let now = chrono::Utc::now().timestamp_millis();
|
||||
let rows: Vec<ImportPersona> = body
|
||||
.personas
|
||||
.iter()
|
||||
.filter(|p| !matches!(p.id.as_str(), "default" | "journal" | "factual"))
|
||||
.map(|p| ImportPersona {
|
||||
persona_id: p.id.clone(),
|
||||
name: p.name.clone(),
|
||||
system_prompt: p.system_prompt.clone(),
|
||||
is_built_in: p.is_built_in,
|
||||
created_at: p.created_at.unwrap_or(now),
|
||||
})
|
||||
.collect();
|
||||
|
||||
match dao.bulk_import(&cx, uid, &rows) {
|
||||
Ok(inserted) => HttpResponse::Ok().json(MigrateResponse { inserted }),
|
||||
Err(e) => {
|
||||
log::error!("migrate_personas error: {:?}", e);
|
||||
HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user