diff --git a/src/ai/insight_generator.rs b/src/ai/insight_generator.rs index 2aac683..3617261 100644 --- a/src/ai/insight_generator.rs +++ b/src/ai/insight_generator.rs @@ -3525,6 +3525,7 @@ Return ONLY the summary, nothing else."#, - Use recall_facts_for_photo + recall_entities to load any prior knowledge about subjects in the photo.\n\ - When you identify people / places / events / things, use store_entity + store_fact to grow the persistent memory.\n\ - Before store_entity, call recall_entities to check whether a similar name already exists; reuse the existing entity_id rather than creating a near-duplicate (e.g. \"Sara\" vs \"Sarah J.\"). The DAO will collapse obvious cosine matches, but choosing the existing id keeps facts and photo links consolidated.\n\ + - Predicates should be relationship-shaped verbs that encode a queryable claim — `lives_in`, `works_at`, `attended`, `is_friend_of`, `is_parent_of`, `interested_in`, `married_to`, `owns`. DO NOT use vague speech-act predicates like `expressed`, `said`, `mentioned`, `stated`, `quoted`, `noted`, `discussed`, `thought`, `wondered`. DO NOT store quotations or sentence fragments as `object_value` — paraphrase into a structured claim. Bad: `(Cameron, expressed, \"I'm tempted to get a part-time job there\")`. Good: `(Cameron, considered_employment_at, <entity_id>)` or `(Cameron, interested_in, \"part-time work\")`.\n\ - A tool returning no results is informative; continue with the others.", ); diff --git a/src/database/knowledge_dao.rs b/src/database/knowledge_dao.rs index 028454b..069dd38 100644 --- a/src/database/knowledge_dao.rs +++ b/src/database/knowledge_dao.rs @@ -205,6 +205,30 @@ pub trait KnowledgeDao: Sync + Send { persona: &PersonaFilter, ) -> Result<(Vec<(Entity, i64)>, i64), DbError>; + /// Aggregate the user's active+reviewed facts by predicate so + /// the curation UI can flag noisy verbs ("expressed", "said") and + /// bulk-reject. Persona-scoped via the existing PersonaFilter + /// pattern. 
Sorted by count desc. + fn get_predicate_stats( + &mut self, + cx: &opentelemetry::Context, + persona: &PersonaFilter, + limit: usize, + ) -> Result<Vec<(String, i64)>, DbError>; + + /// Bulk reject every active fact under a given predicate + /// (persona-scoped). Returns the number of rows touched. Used by + /// the predicate-cleanup UI to nuke noise verbs in one click. + /// Stamps last_modified_* with the caller-supplied audit so the + /// action shows up in the recent-edits feed. + fn bulk_reject_facts_by_predicate( + &mut self, + cx: &opentelemetry::Context, + persona: &PersonaFilter, + predicate: &str, + audit: Option<(&str, &str)>, + ) -> Result<usize, DbError>; + /// Build a graph snapshot — entities as nodes (fact count from /// the active persona scope), relational facts as edges. Used /// by the curation UI's graph view. Filters: @@ -872,6 +896,122 @@ impl KnowledgeDao for SqliteKnowledgeDao { .map_err(|_| DbError::new(DbErrorKind::QueryError)) } + fn get_predicate_stats( + &mut self, + cx: &opentelemetry::Context, + persona: &PersonaFilter, + limit: usize, + ) -> Result<Vec<(String, i64)>, DbError> { + trace_db_call(cx, "query", "get_predicate_stats", |_span| { + use diesel::sql_query; + use diesel::sql_types::{BigInt, Integer, Text}; + + // Active + reviewed only — rejected / superseded are + // already off the agent's read path so they shouldn't + // count toward "what predicates are noisy in production". + let where_sql = match persona { + PersonaFilter::Single { .. } => { + "WHERE user_id = ? AND persona_id = ? \ + AND status IN ('active','reviewed')" + } + PersonaFilter::All { .. } => { + "WHERE user_id = ? 
AND status IN ('active','reviewed')" + } + }; + let sql = format!( + "SELECT predicate, COUNT(*) AS cnt FROM entity_facts \ + {where_sql} \ + GROUP BY predicate \ + ORDER BY cnt DESC \ + LIMIT ?", + ); + + #[derive(diesel::QueryableByName)] + struct Row { + #[diesel(sql_type = Text)] + predicate: String, + #[diesel(sql_type = BigInt)] + cnt: i64, + } + + let mut conn = self.connection.lock().expect("KnowledgeDao lock"); + let mut q = sql_query(sql).into_boxed(); + match persona { + PersonaFilter::Single { user_id, persona_id } => { + q = q + .bind::<Integer, _>(*user_id) + .bind::<Text, _>(persona_id.clone()); + } + PersonaFilter::All { user_id } => { + q = q.bind::<Integer, _>(*user_id); + } + } + q = q.bind::<BigInt, _>(limit as i64); + + let rows: Vec<Row> = q + .load(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Query error: {}", e))?; + Ok(rows.into_iter().map(|r| (r.predicate, r.cnt)).collect()) + }) + .map_err(|_| DbError::new(DbErrorKind::QueryError)) + } + + fn bulk_reject_facts_by_predicate( + &mut self, + cx: &opentelemetry::Context, + persona: &PersonaFilter, + target_predicate: &str, + audit: Option<(&str, &str)>, + ) -> Result<usize, DbError> { + trace_db_call(cx, "update", "bulk_reject_facts_by_predicate", |_span| { + use schema::entity_facts::dsl::*; + let mut conn = self.connection.lock().expect("KnowledgeDao lock"); + + let now = chrono::Utc::now().timestamp(); + let (audit_model, audit_backend) = match audit { + Some((m, b)) => (Some(m.to_string()), Some(b.to_string())), + None => (None, None), + }; + + // Persona scoping mirrors get_predicate_stats. Only ACTIVE + // rows flip — REVIEWED survives so the curator can preserve + // a hand-approved exception under the same predicate. 
+ let touched = match persona { + PersonaFilter::Single { user_id: uid, persona_id: pid } => diesel::update( + entity_facts + .filter(predicate.eq(target_predicate)) + .filter(user_id.eq(*uid)) + .filter(persona_id.eq(pid)) + .filter(status.eq("active")), + ) + .set(( + status.eq("rejected"), + last_modified_by_model.eq(audit_model.clone()), + last_modified_by_backend.eq(audit_backend.clone()), + last_modified_at.eq(Some(now)), + )) + .execute(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Bulk reject error: {}", e))?, + PersonaFilter::All { user_id: uid } => diesel::update( + entity_facts + .filter(predicate.eq(target_predicate)) + .filter(user_id.eq(*uid)) + .filter(status.eq("active")), + ) + .set(( + status.eq("rejected"), + last_modified_by_model.eq(audit_model.clone()), + last_modified_by_backend.eq(audit_backend.clone()), + last_modified_at.eq(Some(now)), + )) + .execute(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Bulk reject error: {}", e))?, + }; + Ok(touched) + }) + .map_err(|_| DbError::new(DbErrorKind::UpdateError)) + } + fn build_entity_graph( + &mut self, + cx: &opentelemetry::Context, diff --git a/src/knowledge.rs index fa1142b..4c3f5a8 100644 --- a/src/knowledge.rs +++ b/src/knowledge.rs @@ -359,6 +359,27 @@ pub struct GraphResponse { pub nodes: Vec<GraphNode>, pub edges: Vec<GraphEdge>, } +#[derive(Deserialize)] +pub struct PredicateStatsQuery { + pub limit: Option<usize>, +} + +#[derive(Serialize)] +pub struct PredicateStat { + pub predicate: String, + pub count: i64, +} + +#[derive(Serialize)] +pub struct PredicateStatsResponse { + pub predicates: Vec<PredicateStat>, +} + +#[derive(Serialize)] +pub struct BulkRejectResponse { + pub rejected: usize, +} + #[derive(Deserialize)] pub struct ConsolidationQuery { /// Cosine threshold for clustering. 
Default 0.85 — looser than @@ -421,7 +442,15 @@ where web::resource("/consolidation-proposals") .route(web::get().to(get_consolidation_proposals::<D>)), ) - .service(web::resource("/graph").route(web::get().to(get_graph::<D>))), + .service(web::resource("/graph").route(web::get().to(get_graph::<D>))) + .service( + web::resource("/predicate-stats") + .route(web::get().to(get_predicate_stats::<D>)), + ) + .service( + web::resource("/predicates/{predicate}/bulk-reject") + .route(web::post().to(bulk_reject_predicate::<D>)), + ), ) } @@ -1192,6 +1221,60 @@ async fn get_recent( } } +async fn get_predicate_stats<D: KnowledgeDao>( + req: HttpRequest, + claims: Claims, + query: web::Query<PredicateStatsQuery>, + dao: web::Data<Mutex<D>>, + persona_dao: PersonaDaoData, +) -> impl Responder { + let limit = query.limit.unwrap_or(100).clamp(1, 500) as usize; + let persona = resolve_persona_filter(&req, &claims, &persona_dao); + let cx = opentelemetry::Context::current(); + let mut dao = dao.lock().expect("Unable to lock KnowledgeDao"); + match dao.get_predicate_stats(&cx, &persona, limit) { + Ok(rows) => HttpResponse::Ok().json(PredicateStatsResponse { + predicates: rows + .into_iter() + .map(|(predicate, count)| PredicateStat { predicate, count }) + .collect(), + }), + Err(e) => { + log::error!("get_predicate_stats error: {:?}", e); + HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"})) + } + } +} + +async fn bulk_reject_predicate<D: KnowledgeDao>( + req: HttpRequest, + claims: Claims, + predicate: web::Path<String>, + dao: web::Data<Mutex<D>>, + persona_dao: PersonaDaoData, +) -> impl Responder { + let predicate = predicate.into_inner(); + if predicate.trim().is_empty() { + return HttpResponse::BadRequest() + .json(serde_json::json!({"error": "predicate must not be empty"})); + } + let persona = resolve_persona_filter(&req, &claims, &persona_dao); + let cx = opentelemetry::Context::current(); + let mut dao = dao.lock().expect("Unable to lock KnowledgeDao"); + match dao.bulk_reject_facts_by_predicate( + &cx, + &persona, + &predicate, 
Some(("manual", "manual")), + ) { + Ok(rejected) => HttpResponse::Ok().json(BulkRejectResponse { rejected }), + Err(e) => { + log::error!("bulk_reject_predicate error: {:?}", e); + HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"})) + } + } +} + async fn get_graph<D: KnowledgeDao>( req: HttpRequest, claims: Claims,