Branch B of the multi-library data-model rollout. tagged_photo and
photo_insights now follow the bytes (content_hash), not the path,
matching the policy pinned in CLAUDE.md "Multi-library data model".
Branch A's availability probe and EXIF scoping land first; this
branch builds on top.
Migration (2026-05-01-000000_hash_keyed_derived_data)
Adds nullable content_hash columns to tagged_photo and photo_insights,
with partial indexes on the non-null subset to keep the index small
during the transitional window. The migration backfills from
image_exif:
* tagged_photo joins on rel_path alone (no library_id available);
* photo_insights joins on (library_id, rel_path), unambiguous.
Rows whose image_exif hash isn't known yet stay null and the runtime
reconciliation pass populates them as the hash backlog drains.
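As a rough illustration of the two backfill join rules above, here is a
minimal in-memory sketch in plain Rust. It is not the migration's SQL. The
names ExifRow, TaggedPhoto, InsightRow, backfill_tagged and
backfill_insights are hypothetical stand-ins, and picking the first
image_exif row with a known hash when a rel_path exists in several
libraries is an assumption of the sketch.

```rust
/// Hypothetical in-memory stand-in for an image_exif row.
struct ExifRow {
    library_id: i32,
    rel_path: String,
    content_hash: Option<String>,
}

/// Hypothetical stand-in for tagged_photo: no library_id to join on.
struct TaggedPhoto {
    rel_path: String,
    content_hash: Option<String>,
}

/// Hypothetical stand-in for photo_insights: carries its library_id.
struct InsightRow {
    library_id: i32,
    rel_path: String,
    content_hash: Option<String>,
}

/// Fill NULL hashes on tagged_photo by rel_path alone; returns rows filled.
/// Assumption: the first exif row with a known hash for that path is used.
fn backfill_tagged(tags: &mut [TaggedPhoto], exif: &[ExifRow]) -> usize {
    let mut filled = 0;
    for tag in tags.iter_mut().filter(|t| t.content_hash.is_none()) {
        let found = exif
            .iter()
            .find(|e| e.rel_path == tag.rel_path && e.content_hash.is_some())
            .and_then(|e| e.content_hash.clone());
        if let Some(hash) = found {
            tag.content_hash = Some(hash);
            filled += 1;
        }
    }
    filled
}

/// Fill NULL hashes on photo_insights by (library_id, rel_path); returns
/// rows filled. Rows whose hash is not known yet stay None.
fn backfill_insights(insights: &mut [InsightRow], exif: &[ExifRow]) -> usize {
    let mut filled = 0;
    for row in insights.iter_mut().filter(|r| r.content_hash.is_none()) {
        let found = exif
            .iter()
            .find(|e| {
                e.library_id == row.library_id
                    && e.rel_path == row.rel_path
                    && e.content_hash.is_some()
            })
            .and_then(|e| e.content_hash.clone());
        if let Some(hash) = found {
            row.content_hash = Some(hash);
            filled += 1;
        }
    }
    filled
}
```

Both functions only touch rows whose hash is still None, so running them
again after the hash backlog drains fills the remainder without rewriting
anything already populated.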
Insert-time population
TagDao::tag_file looks up image_exif.content_hash by rel_path before
inserting; the hash is written into the new column.
InsightDao::store_insight does the same scoped to (library_id,
rel_path). Caller-supplied hash on InsertPhotoInsight wins; otherwise
the DAO does the lookup. Both paths fall back to None if the hash
isn't known yet — reconciliation backfills.
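The precedence described above (caller-supplied hash, then lookup, then
None) can be sketched without a database. ExifIndex and
resolve_content_hash are hypothetical names for illustration only; the
real DAO queries image_exif through Diesel.

```rust
use std::collections::HashMap;

/// Hypothetical index over image_exif: (library_id, rel_path) -> hash.
/// A value of None models a row whose hash has not been computed yet.
type ExifIndex = HashMap<(i32, String), Option<String>>;

/// Caller-supplied hash wins; otherwise look it up scoped to
/// (library_id, rel_path); otherwise None, left for reconciliation.
fn resolve_content_hash(
    caller_supplied: Option<String>,
    exif: &ExifIndex,
    library_id: i32,
    rel_path: &str,
) -> Option<String> {
    caller_supplied.or_else(|| {
        exif.get(&(library_id, rel_path.to_string()))
            .cloned()
            .flatten()
    })
}
```

Because the lookup sits inside or_else, it only runs when the caller did
not supply a hash.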
Reconciliation (database/reconcile.rs)
Three idempotent passes the watcher runs once per tick after the
per-library backfill loop:
  1. tagged_photo NULL hashes → populate from image_exif by rel_path.
  2. photo_insights NULL hashes → populate by (library_id, rel_path).
  3. photo_insights scalar merge: when multiple is_current rows
     share a content_hash, keep the earliest generated_at as
     current; demote the rest. Demoted rows keep their data so
     /insights/history is unaffected; only the "current" pointer
     narrows to one per hash.
Reconcile has no filesystem dependency, so it doesn't need the
availability gate and runs on every tick. It logs once when a pass
changed something and at debug level otherwise.
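The earliest-wins scalar merge (pass 3) can be illustrated on in-memory
rows. This is a sketch under assumptions, not the code in
database/reconcile.rs: the Insight struct and collapse_current_by_hash are
hypothetical, generated_at is modelled as an integer timestamp, and ties
on generated_at are broken by the lower id.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for a photo_insights row.
#[derive(Debug, Clone, PartialEq)]
struct Insight {
    id: i32,
    content_hash: Option<String>,
    generated_at: i64,
    is_current: bool,
}

/// Among is_current rows sharing a content_hash, keep the earliest
/// generated_at as current and demote the rest. Returns rows demoted.
/// Rows with no hash, and rows that are already demoted, are untouched.
fn collapse_current_by_hash(rows: &mut [Insight]) -> usize {
    // Pass 1: pick the winner per hash as the smallest (generated_at, id).
    let mut winners: HashMap<String, (i64, i32)> = HashMap::new();
    for row in rows.iter().filter(|r| r.is_current) {
        if let Some(hash) = &row.content_hash {
            let candidate = (row.generated_at, row.id);
            let best = winners.entry(hash.clone()).or_insert(candidate);
            if candidate < *best {
                *best = candidate;
            }
        }
    }

    // Pass 2: flip is_current on every current row that is not the winner.
    let mut demoted = 0;
    for row in rows.iter_mut().filter(|r| r.is_current) {
        let key = (row.generated_at, row.id);
        let is_loser = match &row.content_hash {
            Some(hash) => winners.get(hash) != Some(&key),
            None => false,
        };
        if is_loser {
            row.is_current = false;
            demoted += 1;
        }
    }
    demoted
}
```

Only the is_current flag changes, so history stays intact, and a second
run finds one current row per hash and demotes nothing, which is the
idempotency the passes rely on.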
Tags are set-valued under the policy (union on read, already
DISTINCT in queries), so there is no analogous tag-collapse pass —
duplicate (tag_id, content_hash) rows across libraries are
harmless.
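To see why duplicate (tag_id, content_hash) rows are harmless, consider
a set-valued read. The sketch below is illustrative only: TaggedPhoto and
tags_for_hash are hypothetical names, and a BTreeSet stands in for the
DISTINCT already applied by the real queries.

```rust
use std::collections::BTreeSet;

/// Hypothetical stand-in for a tagged_photo row with a known hash.
struct TaggedPhoto {
    tag_id: i32,
    content_hash: String,
}

/// Union of tag ids attached to a hash; duplicates collapse in the set.
fn tags_for_hash(rows: &[TaggedPhoto], hash: &str) -> BTreeSet<i32> {
    rows.iter()
        .filter(|r| r.content_hash == hash)
        .map(|r| r.tag_id)
        .collect()
}
```

The same tag attached to the same bytes from two libraries shows up once
in the result, so no collapse pass is needed on the write side.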
Read paths are unchanged in this branch — lookup_tags_batch's
existing rel_path-via-hash-sibling expansion still produces the
correct merge. A follow-up can simplify reads to use the new column
directly for performance.
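For readers unfamiliar with the hash-sibling expansion, the idea can be
sketched as follows. This is not lookup_tags_batch itself: ExifRow and
sibling_paths are hypothetical, and falling back to the input path when
no hash is known is an assumption of the sketch.

```rust
/// Hypothetical in-memory stand-in for an image_exif row.
struct ExifRow {
    rel_path: String,
    content_hash: Option<String>,
}

/// Expand one rel_path into every rel_path sharing its content_hash.
/// If the hash is unknown, the path stands alone.
fn sibling_paths(exif: &[ExifRow], rel_path: &str) -> Vec<String> {
    let hash = exif
        .iter()
        .find(|e| e.rel_path == rel_path && e.content_hash.is_some())
        .and_then(|e| e.content_hash.as_deref());
    match hash {
        Some(h) => {
            let mut paths: Vec<String> = exif
                .iter()
                .filter(|e| e.content_hash.as_deref() == Some(h))
                .map(|e| e.rel_path.clone())
                .collect();
            paths.sort();
            paths.dedup();
            paths
        }
        None => vec![rel_path.to_string()],
    }
}
```

A read keyed directly on the new content_hash column would skip this
expansion step, which is the simplification the follow-up refers to.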
Tests: 217 pass (212 pre-existing + 5 new in reconcile covering
NULL-fill, hash-not-yet-known no-op, library scoping on insights,
earliest-wins collapse, idempotency).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
352 lines
12 KiB
Rust
use diesel::prelude::*;
use diesel::sqlite::SqliteConnection;
use std::ops::DerefMut;
use std::sync::{Arc, Mutex};

use crate::database::models::{InsertPhotoInsight, PhotoInsight};
use crate::database::schema;
use crate::database::{DbError, DbErrorKind, connect};
use crate::otel::trace_db_call;

pub trait InsightDao: Sync + Send {
    fn store_insight(
        &mut self,
        context: &opentelemetry::Context,
        insight: InsertPhotoInsight,
    ) -> Result<PhotoInsight, DbError>;

    fn get_insight(
        &mut self,
        context: &opentelemetry::Context,
        file_path: &str,
    ) -> Result<Option<PhotoInsight>, DbError>;

    /// Return the most recent current insight whose rel_path is one of
    /// `paths`. Used for content-hash sharing: the caller expands a
    /// single file into all rel_paths with the same content_hash, then
    /// asks here for any existing insight attached to any of them.
    fn get_insight_for_paths(
        &mut self,
        context: &opentelemetry::Context,
        paths: &[String],
    ) -> Result<Option<PhotoInsight>, DbError>;

    #[allow(dead_code)]
    fn get_insight_history(
        &mut self,
        context: &opentelemetry::Context,
        file_path: &str,
    ) -> Result<Vec<PhotoInsight>, DbError>;

    /// Fetch a single insight by primary key, regardless of `is_current`.
    /// Used by the few-shot injection flow where the caller picks specific
    /// historical insights (which may have been superseded) as training
    /// exemplars for a fresh generation.
    fn get_insight_by_id(
        &mut self,
        context: &opentelemetry::Context,
        insight_id: i32,
    ) -> Result<Option<PhotoInsight>, DbError>;

    fn delete_insight(
        &mut self,
        context: &opentelemetry::Context,
        file_path: &str,
    ) -> Result<(), DbError>;

    fn get_all_insights(
        &mut self,
        context: &opentelemetry::Context,
    ) -> Result<Vec<PhotoInsight>, DbError>;

    fn rate_insight(
        &mut self,
        context: &opentelemetry::Context,
        file_path: &str,
        approved: bool,
    ) -> Result<(), DbError>;

    fn get_approved_insights(
        &mut self,
        context: &opentelemetry::Context,
    ) -> Result<Vec<PhotoInsight>, DbError>;

    /// Replace the `training_messages` JSON blob on the current row for
    /// `(library_id, rel_path)`. Used by chat-turn append mode to persist
    /// the extended conversation without inserting a new insight version.
    fn update_training_messages(
        &mut self,
        context: &opentelemetry::Context,
        library_id: i32,
        file_path: &str,
        training_messages_json: &str,
    ) -> Result<(), DbError>;
}

pub struct SqliteInsightDao {
    connection: Arc<Mutex<SqliteConnection>>,
}

impl Default for SqliteInsightDao {
    fn default() -> Self {
        Self::new()
    }
}

impl SqliteInsightDao {
    pub fn new() -> Self {
        SqliteInsightDao {
            connection: Arc::new(Mutex::new(connect())),
        }
    }

    #[cfg(test)]
    #[allow(dead_code)]
    pub fn from_connection(conn: Arc<Mutex<SqliteConnection>>) -> Self {
        SqliteInsightDao { connection: conn }
    }
}

impl InsightDao for SqliteInsightDao {
    fn store_insight(
        &mut self,
        context: &opentelemetry::Context,
        mut insight: InsertPhotoInsight,
    ) -> Result<PhotoInsight, DbError> {
        trace_db_call(context, "insert", "store_insight", |_span| {
            use schema::photo_insights::dsl::*;

            let mut connection = self.connection.lock().expect("Unable to get InsightDao");

            // Eagerly populate content_hash so this insight follows the
            // bytes (CLAUDE.md "Multi-library data model"). Caller-
            // supplied hash wins; otherwise look it up from image_exif
            // for the (library_id, rel_path) tuple. None is acceptable;
            // reconciliation backfills it once the hash lands.
            if insight.content_hash.is_none() {
                use schema::image_exif as ie;
                insight.content_hash = ie::table
                    .filter(ie::library_id.eq(insight.library_id))
                    .filter(ie::rel_path.eq(&insight.file_path))
                    .filter(ie::content_hash.is_not_null())
                    .select(ie::content_hash)
                    .first::<Option<String>>(connection.deref_mut())
                    .ok()
                    .flatten();
            }

            // Mark all existing insights for this file as no longer current
            diesel::update(
                photo_insights
                    .filter(library_id.eq(insight.library_id))
                    .filter(rel_path.eq(&insight.file_path)),
            )
            .set(is_current.eq(false))
            .execute(connection.deref_mut())
            .map_err(|_| anyhow::anyhow!("Update is_current error"))?;

            // Insert the new insight as current
            diesel::insert_into(photo_insights)
                .values(&insight)
                .execute(connection.deref_mut())
                .map_err(|_| anyhow::anyhow!("Insert error"))?;

            // Retrieve the inserted record (is_current = true)
            photo_insights
                .filter(library_id.eq(insight.library_id))
                .filter(rel_path.eq(&insight.file_path))
                .filter(is_current.eq(true))
                .first::<PhotoInsight>(connection.deref_mut())
                .map_err(|_| anyhow::anyhow!("Query error"))
        })
        .map_err(|_| DbError::new(DbErrorKind::InsertError))
    }

    fn get_insight(
        &mut self,
        context: &opentelemetry::Context,
        path: &str,
    ) -> Result<Option<PhotoInsight>, DbError> {
        trace_db_call(context, "query", "get_insight", |_span| {
            use schema::photo_insights::dsl::*;

            let mut connection = self.connection.lock().expect("Unable to get InsightDao");

            photo_insights
                .filter(rel_path.eq(path))
                .filter(is_current.eq(true))
                .first::<PhotoInsight>(connection.deref_mut())
                .optional()
                .map_err(|_| anyhow::anyhow!("Query error"))
        })
        .map_err(|_| DbError::new(DbErrorKind::QueryError))
    }

    fn get_insight_for_paths(
        &mut self,
        context: &opentelemetry::Context,
        paths: &[String],
    ) -> Result<Option<PhotoInsight>, DbError> {
        if paths.is_empty() {
            return Ok(None);
        }
        trace_db_call(context, "query", "get_insight_for_paths", |_span| {
            use schema::photo_insights::dsl::*;

            let mut connection = self.connection.lock().expect("Unable to get InsightDao");

            photo_insights
                .filter(rel_path.eq_any(paths))
                .filter(is_current.eq(true))
                .order(generated_at.desc())
                .first::<PhotoInsight>(connection.deref_mut())
                .optional()
                .map_err(|_| anyhow::anyhow!("Query error"))
        })
        .map_err(|_| DbError::new(DbErrorKind::QueryError))
    }

    fn get_insight_history(
        &mut self,
        context: &opentelemetry::Context,
        path: &str,
    ) -> Result<Vec<PhotoInsight>, DbError> {
        trace_db_call(context, "query", "get_insight_history", |_span| {
            use schema::photo_insights::dsl::*;

            let mut connection = self.connection.lock().expect("Unable to get InsightDao");

            photo_insights
                .filter(rel_path.eq(path))
                .order(generated_at.desc())
                .load::<PhotoInsight>(connection.deref_mut())
                .map_err(|_| anyhow::anyhow!("Query error"))
        })
        .map_err(|_| DbError::new(DbErrorKind::QueryError))
    }

    fn get_insight_by_id(
        &mut self,
        context: &opentelemetry::Context,
        insight_id: i32,
    ) -> Result<Option<PhotoInsight>, DbError> {
        trace_db_call(context, "query", "get_insight_by_id", |_span| {
            use schema::photo_insights::dsl::*;

            let mut connection = self.connection.lock().expect("Unable to get InsightDao");

            photo_insights
                .find(insight_id)
                .first::<PhotoInsight>(connection.deref_mut())
                .optional()
                .map_err(|_| anyhow::anyhow!("Query error"))
        })
        .map_err(|_| DbError::new(DbErrorKind::QueryError))
    }

    fn delete_insight(
        &mut self,
        context: &opentelemetry::Context,
        path: &str,
    ) -> Result<(), DbError> {
        trace_db_call(context, "delete", "delete_insight", |_span| {
            use schema::photo_insights::dsl::*;

            let mut connection = self.connection.lock().expect("Unable to get InsightDao");

            diesel::delete(photo_insights.filter(rel_path.eq(path)))
                .execute(connection.deref_mut())
                .map(|_| ())
                .map_err(|_| anyhow::anyhow!("Delete error"))
        })
        .map_err(|_| DbError::new(DbErrorKind::QueryError))
    }

    fn get_all_insights(
        &mut self,
        context: &opentelemetry::Context,
    ) -> Result<Vec<PhotoInsight>, DbError> {
        trace_db_call(context, "query", "get_all_insights", |_span| {
            use schema::photo_insights::dsl::*;

            let mut connection = self.connection.lock().expect("Unable to get InsightDao");

            photo_insights
                .filter(is_current.eq(true))
                .order(generated_at.desc())
                .load::<PhotoInsight>(connection.deref_mut())
                .map_err(|_| anyhow::anyhow!("Query error"))
        })
        .map_err(|_| DbError::new(DbErrorKind::QueryError))
    }

    fn rate_insight(
        &mut self,
        context: &opentelemetry::Context,
        path: &str,
        is_approved: bool,
    ) -> Result<(), DbError> {
        trace_db_call(context, "update", "rate_insight", |_span| {
            use schema::photo_insights::dsl::*;

            let mut connection = self.connection.lock().expect("Unable to get InsightDao");

            diesel::update(
                photo_insights
                    .filter(rel_path.eq(path))
                    .filter(is_current.eq(true)),
            )
            .set(approved.eq(Some(is_approved)))
            .execute(connection.deref_mut())
            .map(|_| ())
            .map_err(|_| anyhow::anyhow!("Update error"))
        })
        .map_err(|_| DbError::new(DbErrorKind::UpdateError))
    }

    fn get_approved_insights(
        &mut self,
        context: &opentelemetry::Context,
    ) -> Result<Vec<PhotoInsight>, DbError> {
        trace_db_call(context, "query", "get_approved_insights", |_span| {
            use schema::photo_insights::dsl::*;

            let mut connection = self.connection.lock().expect("Unable to get InsightDao");

            photo_insights
                .filter(approved.eq(true))
                .filter(training_messages.is_not_null())
                .order(generated_at.desc())
                .load::<PhotoInsight>(connection.deref_mut())
                .map_err(|_| anyhow::anyhow!("Query error"))
        })
        .map_err(|_| DbError::new(DbErrorKind::QueryError))
    }

    fn update_training_messages(
        &mut self,
        context: &opentelemetry::Context,
        lib_id: i32,
        path: &str,
        training_messages_json: &str,
    ) -> Result<(), DbError> {
        trace_db_call(context, "update", "update_training_messages", |_span| {
            use schema::photo_insights::dsl::*;

            let mut connection = self.connection.lock().expect("Unable to get InsightDao");

            diesel::update(
                photo_insights
                    .filter(library_id.eq(lib_id))
                    .filter(rel_path.eq(path))
                    .filter(is_current.eq(true)),
            )
            .set(training_messages.eq(Some(training_messages_json.to_string())))
            .execute(connection.deref_mut())
            .map(|_| ())
            .map_err(|_| anyhow::anyhow!("Update error"))
        })
        .map_err(|_| DbError::new(DbErrorKind::UpdateError))
    }
}