From e67e00ef8a204f4cfd61544b457c08ff9c232c67 Mon Sep 17 00:00:00 2001 From: Cameron Cordes Date: Mon, 11 May 2026 21:50:26 -0400 Subject: [PATCH] knowledge: predicate-quality nudge + bulk-reject endpoint MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two coupled changes to fight the speech-act-predicate problem (facts like (Cameron, expressed, "I'm tempted to...")): 1. System prompt grows an explicit predicate-quality rule. The agent is told to use relationship-shaped verbs (lives_in, works_at, attended, is_friend_of, interested_in), and is given an explicit DON'T list (expressed, said, mentioned, stated, quoted, noted, discussed, thought, wondered). Plus a concrete Bad / Good example contrasting the noise pattern with the structured paraphrase the agent should be writing. Stops the bleed for new insights. 2. Cleanup tools for the legacy noise that's already in the table: - get_predicate_stats(persona, limit) returns [(predicate, count)] sorted desc — feeds the curation UI's PREDICATES tab. - bulk_reject_facts_by_predicate(persona, predicate, audit) flips every ACTIVE fact under that predicate to 'rejected' in one transaction, stamping last_modified_* so the action is attributable + reversible per-fact through the entity detail panel. REVIEWED facts under the same predicate are left alone — the curator may have hand-approved an exception ("interested_in" might be largely noise but a reviewed entry is intentional). New HTTP endpoints: GET /knowledge/predicate-stats?limit= POST /knowledge/predicates/{predicate}/bulk-reject Persona-scoped via the existing X-Persona-Id header. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/ai/insight_generator.rs | 1 + src/database/knowledge_dao.rs | 140 ++++++++++++++++++++++++++++++++++ src/knowledge.rs | 85 ++++++++++++++++++++- 3 files changed, 225 insertions(+), 1 deletion(-) diff --git a/src/ai/insight_generator.rs b/src/ai/insight_generator.rs index 2aac683..3617261 100644 --- a/src/ai/insight_generator.rs +++ b/src/ai/insight_generator.rs @@ -3525,6 +3525,7 @@ Return ONLY the summary, nothing else."#, - Use recall_facts_for_photo + recall_entities to load any prior knowledge about subjects in the photo.\n\ - When you identify people / places / events / things, use store_entity + store_fact to grow the persistent memory.\n\ - Before store_entity, call recall_entities to check whether a similar name already exists; reuse the existing entity_id rather than creating a near-duplicate (e.g. \"Sara\" vs \"Sarah J.\"). The DAO will collapse obvious cosine matches, but choosing the existing id keeps facts and photo links consolidated.\n\ + - Predicates should be relationship-shaped verbs that encode a queryable claim — `lives_in`, `works_at`, `attended`, `is_friend_of`, `is_parent_of`, `interested_in`, `married_to`, `owns`. DO NOT use vague speech-act predicates like `expressed`, `said`, `mentioned`, `stated`, `quoted`, `noted`, `discussed`, `thought`, `wondered`. DO NOT store quotations or sentence fragments as `object_value` — paraphrase into a structured claim. Bad: `(Cameron, expressed, \"I'm tempted to get a part-time job there\")`. Good: `(Cameron, considered_employment_at, )` or `(Cameron, interested_in, \"part-time work\")`.\n\ - A tool returning no results is informative; continue with the others.", ); diff --git a/src/database/knowledge_dao.rs b/src/database/knowledge_dao.rs index 028454b..069dd38 100644 --- a/src/database/knowledge_dao.rs +++ b/src/database/knowledge_dao.rs @@ -205,6 +205,30 @@ pub trait KnowledgeDao: Sync + Send { persona: &PersonaFilter, ) -> Result<(Vec<(Entity, i64)>, i64), DbError>; + /// Aggregate the user's active+reviewed facts by predicate so + /// the curation UI can flag noisy verbs ("expressed", "said") and + /// bulk-reject. Persona-scoped via the existing PersonaFilter + /// pattern. Sorted by count desc. + fn get_predicate_stats( + &mut self, + cx: &opentelemetry::Context, + persona: &PersonaFilter, + limit: usize, + ) -> Result, DbError>; + + /// Bulk reject every active fact under a given predicate + /// (persona-scoped). Returns the number of rows touched. Used by + /// the predicate-cleanup UI to nuke noise verbs in one click. + /// Stamps last_modified_* with the caller-supplied audit so the + /// action shows up in the recent-edits feed. + fn bulk_reject_facts_by_predicate( + &mut self, + cx: &opentelemetry::Context, + persona: &PersonaFilter, + predicate: &str, + audit: Option<(&str, &str)>, + ) -> Result; + /// Build a graph snapshot — entities as nodes (fact count from /// the active persona scope), relational facts as edges. Used /// by the curation UI's graph view. Filters: @@ -872,6 +896,122 @@ impl KnowledgeDao for SqliteKnowledgeDao { .map_err(|_| DbError::new(DbErrorKind::QueryError)) } + fn get_predicate_stats( + &mut self, + cx: &opentelemetry::Context, + persona: &PersonaFilter, + limit: usize, + ) -> Result, DbError> { + trace_db_call(cx, "query", "get_predicate_stats", |_span| { + use diesel::sql_query; + use diesel::sql_types::{BigInt, Integer, Text}; + + // Active + reviewed only — rejected / superseded are + // already off the agent's read path so they shouldn't + // count toward "what predicates are noisy in production". + let where_sql = match persona { + PersonaFilter::Single { .. } => { + "WHERE user_id = ? AND persona_id = ? \ + AND status IN ('active','reviewed')" + } + PersonaFilter::All { .. } => { + "WHERE user_id = ? AND status IN ('active','reviewed')" + } + }; + let sql = format!( + "SELECT predicate, COUNT(*) AS cnt FROM entity_facts \ + {where_sql} \ + GROUP BY predicate \ + ORDER BY cnt DESC \ + LIMIT ?", + ); + + #[derive(diesel::QueryableByName)] + struct Row { + #[diesel(sql_type = Text)] + predicate: String, + #[diesel(sql_type = BigInt)] + cnt: i64, + } + + let mut conn = self.connection.lock().expect("KnowledgeDao lock"); + let mut q = sql_query(sql).into_boxed(); + match persona { + PersonaFilter::Single { user_id, persona_id } => { + q = q + .bind::(*user_id) + .bind::(persona_id.clone()); + } + PersonaFilter::All { user_id } => { + q = q.bind::(*user_id); + } + } + q = q.bind::(limit as i64); + + let rows: Vec = q + .load(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Query error: {}", e))?; + Ok(rows.into_iter().map(|r| (r.predicate, r.cnt)).collect()) + }) + .map_err(|_| DbError::new(DbErrorKind::QueryError)) + } + + fn bulk_reject_facts_by_predicate( + &mut self, + cx: &opentelemetry::Context, + persona: &PersonaFilter, + target_predicate: &str, + audit: Option<(&str, &str)>, + ) -> Result { + trace_db_call(cx, "update", "bulk_reject_facts_by_predicate", |_span| { + use schema::entity_facts::dsl::*; + let mut conn = self.connection.lock().expect("KnowledgeDao lock"); + + let now = chrono::Utc::now().timestamp(); + let (audit_model, audit_backend) = match audit { + Some((m, b)) => (Some(m.to_string()), Some(b.to_string())), + None => (None, None), + }; + + // Persona scoping mirrors get_predicate_stats. Only ACTIVE + // rows flip — REVIEWED survives so the curator can preserve + // a hand-approved exception under the same predicate. + let touched = match persona { + PersonaFilter::Single { user_id: uid, persona_id: pid } => diesel::update( + entity_facts + .filter(predicate.eq(target_predicate)) + .filter(user_id.eq(*uid)) + .filter(persona_id.eq(pid)) + .filter(status.eq("active")), + ) + .set(( + status.eq("rejected"), + last_modified_by_model.eq(audit_model.clone()), + last_modified_by_backend.eq(audit_backend.clone()), + last_modified_at.eq(Some(now)), + )) + .execute(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Bulk reject error: {}", e))?, + PersonaFilter::All { user_id: uid } => diesel::update( + entity_facts + .filter(predicate.eq(target_predicate)) + .filter(user_id.eq(*uid)) + .filter(status.eq("active")), + ) + .set(( + status.eq("rejected"), + last_modified_by_model.eq(audit_model.clone()), + last_modified_by_backend.eq(audit_backend.clone()), + last_modified_at.eq(Some(now)), + )) + .execute(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Bulk reject error: {}", e))?, + }; + Ok(touched) + }) + .map_err(|_| DbError::new(DbErrorKind::UpdateError)) + } + fn build_entity_graph( &mut self, cx: &opentelemetry::Context, diff --git a/src/knowledge.rs b/src/knowledge.rs index fa1142b..4c3f5a8 100644 --- a/src/knowledge.rs +++ b/src/knowledge.rs @@ -359,6 +359,27 @@ pub struct GraphResponse { pub edges: Vec, } +#[derive(Deserialize)] +pub struct PredicateStatsQuery { + pub limit: Option, +} + +#[derive(Serialize)] +pub struct PredicateStat { + pub predicate: String, + pub count: i64, +} + +#[derive(Serialize)] +pub struct PredicateStatsResponse { + pub predicates: Vec, +} + +#[derive(Serialize)] +pub struct BulkRejectResponse { + pub rejected: usize, +} + #[derive(Deserialize)] pub struct ConsolidationQuery { /// Cosine threshold for clustering. Default 0.85 — looser than @@ -421,7 +442,15 @@ where web::resource("/consolidation-proposals") .route(web::get().to(get_consolidation_proposals::)), ) - .service(web::resource("/graph").route(web::get().to(get_graph::))), + .service(web::resource("/graph").route(web::get().to(get_graph::))) + .service( + web::resource("/predicate-stats") + .route(web::get().to(get_predicate_stats::)), + ) + .service( + web::resource("/predicates/{predicate}/bulk-reject") + .route(web::post().to(bulk_reject_predicate::)), + ), ) } @@ -1192,6 +1221,60 @@ async fn get_recent( } } +async fn get_predicate_stats( + req: HttpRequest, + claims: Claims, + query: web::Query, + dao: web::Data>, + persona_dao: PersonaDaoData, +) -> impl Responder { + let limit = query.limit.unwrap_or(100).clamp(1, 500) as usize; + let persona = resolve_persona_filter(&req, &claims, &persona_dao); + let cx = opentelemetry::Context::current(); + let mut dao = dao.lock().expect("Unable to lock KnowledgeDao"); + match dao.get_predicate_stats(&cx, &persona, limit) { + Ok(rows) => HttpResponse::Ok().json(PredicateStatsResponse { + predicates: rows + .into_iter() + .map(|(predicate, count)| PredicateStat { predicate, count }) + .collect(), + }), + Err(e) => { + log::error!("get_predicate_stats error: {:?}", e); + HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"})) + } + } +} + +async fn bulk_reject_predicate( + req: HttpRequest, + claims: Claims, + predicate: web::Path, + dao: web::Data>, + persona_dao: PersonaDaoData, +) -> impl Responder { + let predicate = predicate.into_inner(); + if predicate.trim().is_empty() { + return HttpResponse::BadRequest() + .json(serde_json::json!({"error": "predicate must not be empty"})); + } + let persona = resolve_persona_filter(&req, &claims, &persona_dao); + let cx = opentelemetry::Context::current(); + let mut dao = dao.lock().expect("Unable to lock KnowledgeDao"); + match dao.bulk_reject_facts_by_predicate( + &cx, + &persona, + &predicate, + Some(("manual", "manual")), + ) { + Ok(rejected) => HttpResponse::Ok().json(BulkRejectResponse { rejected }), + Err(e) => { + log::error!("bulk_reject_predicate error: {:?}", e); + HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"})) + } + } +} + async fn get_graph( req: HttpRequest, claims: Claims,