knowledge: valid-time on facts + interval-aware conflict detection
Adds bitemporal support to entity_facts. Existing `created_at` is
transaction time (when we recorded the fact); the new
`valid_from` / `valid_until` BIGINT columns are valid time (when the
fact is/was true in the real world). NULL on either side = unbounded
on that side, both NULL = "always-true / unknown" — matches the
default state of every legacy row, no backfill needed.
The split matters for time-bounded predicates like
is_in_relationship_with / lives_in / works_at: recording the fact
once doesn't mean the relationship is still ongoing. Same predicate
across different windows ("lives_in NYC 2018-2020", "lives_in SF
2020-present") is no longer a conflict — the interval-aware check
in get_entity only flags pairs whose windows overlap. Facts with no
valid-time data are treated as unbounded on both sides, so they still
flag against any same-predicate fact with a different object (worst
case for legacy rows — user adds dates to suppress).
API surface:
- POST /knowledge/facts accepts optional valid_from / valid_until.
- PATCH /knowledge/facts/{id} accepts both with tri-state semantics:
field omitted = leave alone, JSON null = clear to NULL, number =
set. Implemented via a small serde helper around Option<Option<i64>>.
- GET /knowledge/entities/{id} surfaces both fields per fact and
uses them in conflict detection.
Agent path (insight_generator) writes NULL/NULL for now — deriving
valid_from from the source photo's date_taken is slated for a
follow-up agent tool alongside Phase 2's supersession.
Test pins set + clear semantics via update_fact: setting both
bounds, leaving them alone on a subsequent patch, then clearing
valid_until back to NULL.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,5 @@
|
|||||||
|
-- SQLite can drop columns since 3.35 (March 2021); embedded
|
||||||
|
-- libsqlite3-sys is well past that. Drop in the reverse of the order
|
||||||
|
-- up.sql added them (valid_until first, then valid_from) so a partial
-- down still leaves the schema valid.
|
||||||
|
ALTER TABLE entity_facts DROP COLUMN valid_until;
|
||||||
|
ALTER TABLE entity_facts DROP COLUMN valid_from;
|
||||||
25
migrations/2026-05-10-000100_entity_facts_valid_time/up.sql
Normal file
25
migrations/2026-05-10-000100_entity_facts_valid_time/up.sql
Normal file
@@ -0,0 +1,25 @@
|
|||||||
|
-- Add valid-time columns to entity_facts.
|
||||||
|
--
|
||||||
|
-- entity_facts already has created_at — *transaction time*, the
|
||||||
|
-- moment WE recorded the fact. That's not the same as the real-world
|
||||||
|
-- period the fact was true. "Cameron is_in_relationship_with X" was
|
||||||
|
-- only true during a window; recording it in 2026 doesn't make it
|
||||||
|
-- true today. Without the distinction, every former relationship,
|
||||||
|
-- former job, former address reads as currently-true.
|
||||||
|
--
|
||||||
|
-- Adding two BIGINT NULL columns: valid_from / valid_until (unix
|
||||||
|
-- seconds). NULL means "unbounded on that side" — `valid_from IS
|
||||||
|
-- NULL` reads as "always-true-back-to-the-beginning",
|
||||||
|
-- `valid_until IS NULL` as "still-true-now-or-unknown". Both NULL =
|
||||||
|
-- temporal validity unknown (current state of all legacy rows).
|
||||||
|
--
|
||||||
|
-- Conflict detection refines accordingly: same-predicate facts with
|
||||||
|
-- different objects stop flagging when their intervals are disjoint
|
||||||
|
-- ("lives_in NYC 2018-2020" and "lives_in SF 2020-present" are both
|
||||||
|
-- valid, just at different times).
|
||||||
|
|
||||||
|
-- NOTE(review): neither column carries a CHECK constraint, so nothing
-- at the schema level enforces valid_from <= valid_until; any ordering
-- validation has to happen in application code. The overlap check in
-- get_entity compares with strict `<`, i.e. valid_until is treated as
-- an exclusive upper bound.
ALTER TABLE entity_facts ADD COLUMN valid_from BIGINT;
|
||||||
|
ALTER TABLE entity_facts ADD COLUMN valid_until BIGINT;
|
||||||
|
|
||||||
|
-- Optional partial index for time-bounded scans. Skipped for now —
|
||||||
|
-- conflict detection runs per-entity (small N) and doesn't need it.
|
||||||
@@ -2684,6 +2684,11 @@ Return ONLY the summary, nothing else."#,
|
|||||||
created_at: chrono::Utc::now().timestamp(),
|
created_at: chrono::Utc::now().timestamp(),
|
||||||
persona_id: persona_id.to_string(),
|
persona_id: persona_id.to_string(),
|
||||||
user_id,
|
user_id,
|
||||||
|
// The agentic loop doesn't yet derive valid-time from the
|
||||||
|
// photo's date_taken. Left NULL for now; Phase 2's
|
||||||
|
// supersession + a future agent tool will populate these.
|
||||||
|
valid_from: None,
|
||||||
|
valid_until: None,
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut kdao = self
|
let mut kdao = self
|
||||||
|
|||||||
@@ -101,6 +101,15 @@ pub struct FactPatch {
|
|||||||
pub object_value: Option<String>,
|
pub object_value: Option<String>,
|
||||||
pub status: Option<String>,
|
pub status: Option<String>,
|
||||||
pub confidence: Option<f32>,
|
pub confidence: Option<f32>,
|
||||||
|
/// Real-world valid-time bounds. Outer Some = "patch this column";
|
||||||
|
/// inner Some(val) = set to that unix-seconds value; inner None =
|
||||||
|
/// clear back to NULL ("unbounded"). The double-Option lets the
|
||||||
|
/// HTTP layer distinguish "field omitted" (leave alone) from
|
||||||
|
/// "field sent as null" (clear) — needed for these specifically
|
||||||
|
/// because there's no sentinel string-empty equivalent like the
|
||||||
|
/// other fields have.
|
||||||
|
pub valid_from: Option<Option<i64>>,
|
||||||
|
pub valid_until: Option<Option<i64>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct RecentActivity {
|
pub struct RecentActivity {
|
||||||
@@ -1074,6 +1083,18 @@ impl KnowledgeDao for SqliteKnowledgeDao {
|
|||||||
.execute(conn.deref_mut())
|
.execute(conn.deref_mut())
|
||||||
.map_err(|e| anyhow::anyhow!("Update error: {}", e))?;
|
.map_err(|e| anyhow::anyhow!("Update error: {}", e))?;
|
||||||
}
|
}
|
||||||
|
if let Some(new_from) = patch.valid_from {
|
||||||
|
diesel::update(entity_facts.filter(id.eq(fact_id)))
|
||||||
|
.set(valid_from.eq(new_from))
|
||||||
|
.execute(conn.deref_mut())
|
||||||
|
.map_err(|e| anyhow::anyhow!("Update error: {}", e))?;
|
||||||
|
}
|
||||||
|
if let Some(new_until) = patch.valid_until {
|
||||||
|
diesel::update(entity_facts.filter(id.eq(fact_id)))
|
||||||
|
.set(valid_until.eq(new_until))
|
||||||
|
.execute(conn.deref_mut())
|
||||||
|
.map_err(|e| anyhow::anyhow!("Update error: {}", e))?;
|
||||||
|
}
|
||||||
|
|
||||||
entity_facts
|
entity_facts
|
||||||
.filter(id.eq(fact_id))
|
.filter(id.eq(fact_id))
|
||||||
@@ -1347,6 +1368,8 @@ mod tests {
|
|||||||
created_at: 0,
|
created_at: 0,
|
||||||
persona_id: persona_id.to_string(),
|
persona_id: persona_id.to_string(),
|
||||||
user_id,
|
user_id,
|
||||||
|
valid_from: None,
|
||||||
|
valid_until: None,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
@@ -1565,6 +1588,8 @@ mod tests {
|
|||||||
created_at: 0,
|
created_at: 0,
|
||||||
persona_id: "ghost".to_string(),
|
persona_id: "ghost".to_string(),
|
||||||
user_id: alice,
|
user_id: alice,
|
||||||
|
valid_from: None,
|
||||||
|
valid_until: None,
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
assert!(
|
assert!(
|
||||||
@@ -1573,6 +1598,87 @@ mod tests {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
fn update_fact_can_set_and_clear_valid_time() {
    // Pins the tri-state contract of FactPatch.valid_from / valid_until
    // (both Option<Option<i64>>):
    //   None          -> column is left untouched
    //   Some(Some(n)) -> column is set to n
    //   Some(None)    -> column is cleared back to NULL
    const JAN_1_2020: i64 = 1577836800;
    const JAN_1_2022: i64 = 1640995200;

    // Builds a patch that leaves every non-temporal column alone, so each
    // step below only spells out what it does to the two bounds.
    let valid_time_patch = |from: Option<Option<i64>>, until: Option<Option<i64>>| FactPatch {
        predicate: None,
        object_value: None,
        status: None,
        confidence: None,
        valid_from: from,
        valid_until: until,
    };

    let cx = opentelemetry::Context::new();
    let conn = connection_with_fks_on();
    let alice = create_user(&conn, "alice");
    create_persona_row(&conn, alice, "default");

    let mut dao = SqliteKnowledgeDao::from_connection(conn.clone());
    let cameron = make_entity(&mut dao, "Cameron");
    let fact = add_fact(
        &mut dao,
        cameron.id,
        "is_in_relationship_with",
        "Alex",
        alice,
        "default",
    );
    // A freshly added fact starts with no valid-time data at all.
    assert_eq!((fact.valid_from, fact.valid_until), (None, None));

    // Step 1: set both bounds.
    let updated = dao
        .update_fact(
            &cx,
            fact.id,
            valid_time_patch(Some(Some(JAN_1_2020)), Some(Some(JAN_1_2022))),
        )
        .unwrap()
        .unwrap();
    assert_eq!(
        (updated.valid_from, updated.valid_until),
        (Some(JAN_1_2020), Some(JAN_1_2022))
    );

    // Step 2: omit both fields — the stored values must persist.
    let still = dao
        .update_fact(&cx, fact.id, valid_time_patch(None, None))
        .unwrap()
        .unwrap();
    assert_eq!(
        (still.valid_from, still.valid_until),
        (Some(JAN_1_2020), Some(JAN_1_2022))
    );

    // Step 3: clear only valid_until back to NULL (relationship ongoing
    // again); valid_from is omitted and must survive.
    let cleared = dao
        .update_fact(&cx, fact.id, valid_time_patch(None, Some(None)))
        .unwrap()
        .unwrap();
    assert_eq!(
        (cleared.valid_from, cleared.valid_until),
        (Some(JAN_1_2020), None)
    );
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn delete_entity_clears_relational_facts_that_would_violate_check() {
|
fn delete_entity_clears_relational_facts_that_would_violate_check() {
|
||||||
// entity_facts has a CHECK that at least one of object_entity_id /
|
// entity_facts has a CHECK that at least one of object_entity_id /
|
||||||
@@ -1607,6 +1713,8 @@ mod tests {
|
|||||||
created_at: 0,
|
created_at: 0,
|
||||||
persona_id: "default".to_string(),
|
persona_id: "default".to_string(),
|
||||||
user_id: alice,
|
user_id: alice,
|
||||||
|
valid_from: None,
|
||||||
|
valid_until: None,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|||||||
@@ -249,6 +249,15 @@ pub struct InsertEntityFact {
|
|||||||
/// persona must not see each other's facts. Always paired with
|
/// persona must not see each other's facts. Always paired with
|
||||||
/// `persona_id` — they're a unit.
|
/// `persona_id` — they're a unit.
|
||||||
pub user_id: i32,
|
pub user_id: i32,
|
||||||
|
/// Real-world period the fact is/was true (unix seconds). NULL on
|
||||||
|
/// either side = unbounded — `valid_from IS NULL` reads as
|
||||||
|
/// "always-true-back-to-the-beginning", `valid_until IS NULL` as
|
||||||
|
/// "still-true-now-or-unknown". Distinguishes valid time from
|
||||||
|
/// transaction time (`created_at` is when we recorded the fact,
|
||||||
|
/// not when it was true in the world). See migration
|
||||||
|
/// 2026-05-10-000100.
|
||||||
|
pub valid_from: Option<i64>,
|
||||||
|
pub valid_until: Option<i64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Queryable, Clone, Debug)]
|
#[derive(Serialize, Queryable, Clone, Debug)]
|
||||||
@@ -265,6 +274,8 @@ pub struct EntityFact {
|
|||||||
pub created_at: i64,
|
pub created_at: i64,
|
||||||
pub persona_id: String,
|
pub persona_id: String,
|
||||||
pub user_id: i32,
|
pub user_id: i32,
|
||||||
|
pub valid_from: Option<i64>,
|
||||||
|
pub valid_until: Option<i64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Insertable)]
|
#[derive(Insertable)]
|
||||||
|
|||||||
@@ -59,6 +59,8 @@ diesel::table! {
|
|||||||
created_at -> BigInt,
|
created_at -> BigInt,
|
||||||
persona_id -> Text,
|
persona_id -> Text,
|
||||||
user_id -> Integer,
|
user_id -> Integer,
|
||||||
|
valid_from -> Nullable<BigInt>,
|
||||||
|
valid_until -> Nullable<BigInt>,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -109,13 +109,20 @@ pub struct FactDetail {
|
|||||||
pub source_photo: Option<String>,
|
pub source_photo: Option<String>,
|
||||||
pub source_insight_id: Option<i32>,
|
pub source_insight_id: Option<i32>,
|
||||||
pub created_at: i64,
|
pub created_at: i64,
|
||||||
/// Set when another active fact has the same subject+predicate but
|
/// Real-world valid-time interval. NULL on either side means
|
||||||
/// a different object. Detected at read time (no schema change) by
|
/// unbounded; both NULL = "always true" / validity unknown.
|
||||||
/// the get_entity handler grouping facts by predicate. Some
|
/// Distinct from `created_at` (transaction time — when we
|
||||||
/// predicates are legitimately multi-valued ("tagged_in",
|
/// recorded it). See migration 2026-05-10-000100.
|
||||||
/// "friend_of") so this is a *signal* for the curator, not a hard
|
pub valid_from: Option<i64>,
|
||||||
/// invariant. Stale-data correction is the common case (Alice
|
pub valid_until: Option<i64>,
|
||||||
/// lives_in NYC AND SF — one of these is wrong).
|
/// Set when another active fact has the same subject+predicate,
|
||||||
|
/// a different object, AND their valid-time intervals overlap.
|
||||||
|
/// Detected at read time by the get_entity handler grouping
|
||||||
|
/// facts by predicate. Some predicates are legitimately
|
||||||
|
/// multi-valued ("tagged_in", "friend_of") so this is a *signal*
|
||||||
|
/// for the curator, not a hard invariant. The interval check
|
||||||
|
/// keeps "lives_in NYC 2018-2020" + "lives_in SF 2020-present"
|
||||||
|
/// from false-positive flagging.
|
||||||
#[serde(skip_serializing_if = "std::ops::Not::not")]
|
#[serde(skip_serializing_if = "std::ops::Not::not")]
|
||||||
pub in_conflict: bool,
|
pub in_conflict: bool,
|
||||||
}
|
}
|
||||||
@@ -197,12 +204,28 @@ pub struct EntityPatchRequest {
|
|||||||
pub confidence: Option<f32>,
|
pub confidence: Option<f32>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Serde helper for the "tri-state" pattern: distinguish "field
|
||||||
|
/// omitted" from "field sent as null". Used for nullable columns
|
||||||
|
/// where we want PATCH to support both "leave alone" and "set NULL".
|
||||||
|
fn deserialize_optional_nullable_i64<'de, D>(d: D) -> Result<Option<Option<i64>>, D::Error>
|
||||||
|
where
|
||||||
|
D: serde::Deserializer<'de>,
|
||||||
|
{
|
||||||
|
Ok(Some(Option::<i64>::deserialize(d)?))
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
#[derive(Deserialize)]
|
||||||
pub struct FactPatchRequest {
|
pub struct FactPatchRequest {
|
||||||
pub predicate: Option<String>,
|
pub predicate: Option<String>,
|
||||||
pub object_value: Option<String>,
|
pub object_value: Option<String>,
|
||||||
pub status: Option<String>,
|
pub status: Option<String>,
|
||||||
pub confidence: Option<f32>,
|
pub confidence: Option<f32>,
|
||||||
|
/// Tri-state: missing = leave alone, null = clear to NULL, number
|
||||||
|
/// = set. See `deserialize_optional_nullable_i64`.
/// `#[serde(default)]` supplies the outer `None` when the key is
/// absent; the helper wraps any value that is present (including
/// JSON null) in an outer `Some`. The same applies to `valid_until`.
|
||||||
|
#[serde(default, deserialize_with = "deserialize_optional_nullable_i64")]
|
||||||
|
pub valid_from: Option<Option<i64>>,
|
||||||
|
#[serde(default, deserialize_with = "deserialize_optional_nullable_i64")]
|
||||||
|
pub valid_until: Option<Option<i64>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
#[derive(Deserialize)]
|
||||||
@@ -213,6 +236,8 @@ pub struct FactCreateRequest {
|
|||||||
pub object_value: Option<String>,
|
pub object_value: Option<String>,
|
||||||
pub source_photo: Option<String>,
|
pub source_photo: Option<String>,
|
||||||
pub confidence: Option<f32>,
|
pub confidence: Option<f32>,
|
||||||
|
pub valid_from: Option<i64>,
|
||||||
|
pub valid_until: Option<i64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
#[derive(Deserialize)]
|
||||||
@@ -390,17 +415,28 @@ async fn get_entity<D: KnowledgeDao + 'static>(
|
|||||||
source_photo: f.source_photo,
|
source_photo: f.source_photo,
|
||||||
source_insight_id: f.source_insight_id,
|
source_insight_id: f.source_insight_id,
|
||||||
created_at: f.created_at,
|
created_at: f.created_at,
|
||||||
|
valid_from: f.valid_from,
|
||||||
|
valid_until: f.valid_until,
|
||||||
in_conflict: false,
|
in_conflict: false,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
// Conflict detection: within the active set, group by predicate;
|
// Conflict detection: within the active set, group by predicate;
|
||||||
// any predicate group with more than one distinct object (entity
|
// for each pair within a group that disagrees on the object,
|
||||||
// id or value) flags all its members. Some predicates are
|
// flag both only if their valid-time intervals overlap. NULL on
|
||||||
// legitimately multi-valued (e.g. "tagged_in", "friend_of") so
|
// either bound treats that side as unbounded — a fact with no
|
||||||
// this is a curator hint, not a hard rule — `in_conflict` exists
|
// valid-time data still flags against any time period (worst case
|
||||||
// to surface stale-data candidates ("lives_in NYC" and
|
// for legacy data; user adds dates to suppress).
|
||||||
// "lives_in SF" can't both be current).
|
/// Reports whether two valid-time windows share any instant.
///
/// Each window is a `(valid_from, valid_until)` pair. A `None` bound is
/// unbounded on that side: it is widened to `i64::MIN` / `i64::MAX`
/// before comparing, so a window with no data at all spans everything.
///
/// The comparison is strict, so windows that merely touch — one ending
/// at the same value the other starts at — do NOT overlap. That is what
/// lets "2018-2020" and "2020-present" coexist without being flagged.
fn intervals_overlap(
    a: (Option<i64>, Option<i64>),
    b: (Option<i64>, Option<i64>),
) -> bool {
    // Replace missing bounds with the extremes of the i64 range.
    let widen = |(from, until): (Option<i64>, Option<i64>)| {
        (from.unwrap_or(i64::MIN), until.unwrap_or(i64::MAX))
    };
    let (a_start, a_end) = widen(a);
    let (b_start, b_end) = widen(b);

    // Disjoint means one window ends at or before the other begins;
    // overlap is the negation of that.
    let disjoint = a_end <= b_start || b_end <= a_start;
    !disjoint
}
|
||||||
{
|
{
|
||||||
use std::collections::{HashMap, HashSet};
|
use std::collections::{HashMap, HashSet};
|
||||||
let mut by_predicate: HashMap<String, Vec<usize>> = HashMap::new();
|
let mut by_predicate: HashMap<String, Vec<usize>> = HashMap::new();
|
||||||
@@ -417,15 +453,21 @@ async fn get_entity<D: KnowledgeDao + 'static>(
|
|||||||
if indices.len() < 2 {
|
if indices.len() < 2 {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// Distinct (object_entity_id, object_value) tuples across
|
for (a_pos, &i) in indices.iter().enumerate() {
|
||||||
// these active facts.
|
for &j in &indices[a_pos + 1..] {
|
||||||
let mut seen: HashSet<(Option<i32>, Option<String>)> = HashSet::new();
|
let same_object = facts[i].object_entity_id
|
||||||
for &i in indices {
|
== facts[j].object_entity_id
|
||||||
seen.insert((facts[i].object_entity_id, facts[i].object_value.clone()));
|
&& facts[i].object_value == facts[j].object_value;
|
||||||
}
|
if same_object {
|
||||||
if seen.len() > 1 {
|
continue;
|
||||||
for &i in indices {
|
}
|
||||||
to_flag.insert(i);
|
if intervals_overlap(
|
||||||
|
(facts[i].valid_from, facts[i].valid_until),
|
||||||
|
(facts[j].valid_from, facts[j].valid_until),
|
||||||
|
) {
|
||||||
|
to_flag.insert(i);
|
||||||
|
to_flag.insert(j);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -708,6 +750,8 @@ async fn create_fact<D: KnowledgeDao + 'static>(
|
|||||||
created_at: now,
|
created_at: now,
|
||||||
persona_id,
|
persona_id,
|
||||||
user_id,
|
user_id,
|
||||||
|
valid_from: body.valid_from,
|
||||||
|
valid_until: body.valid_until,
|
||||||
};
|
};
|
||||||
|
|
||||||
match dao.upsert_fact(&cx, insert) {
|
match dao.upsert_fact(&cx, insert) {
|
||||||
@@ -739,6 +783,8 @@ async fn patch_fact<D: KnowledgeDao + 'static>(
|
|||||||
object_value: body.object_value.clone(),
|
object_value: body.object_value.clone(),
|
||||||
status: body.status.clone(),
|
status: body.status.clone(),
|
||||||
confidence: body.confidence,
|
confidence: body.confidence,
|
||||||
|
valid_from: body.valid_from,
|
||||||
|
valid_until: body.valid_until,
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
|
let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
|
||||||
|
|||||||
Reference in New Issue
Block a user