feature/insight-jobs #102

Merged
cameron merged 13 commits from feature/insight-jobs into master 2026-06-02 23:41:37 +00:00
13 changed files with 108 additions and 102 deletions
Showing only changes of commit 449ce1fda1 - Show all commits
+5 -6
View File
@@ -97,7 +97,7 @@ pub async fn generation_status_handler(
Ok(Some(job)) => {
return HttpResponse::Ok().json(GenerationStatusResponse {
job_id: job.id,
status: InsightJobStatus::from_str(&job.status),
status: InsightJobStatus::parse(&job.status),
started_at: job.started_at,
completed_at: job.completed_at,
result_insight_id: job.result_insight_id,
@@ -133,7 +133,7 @@ pub async fn generation_status_handler(
Ok(Some(job)) => {
return HttpResponse::Ok().json(GenerationStatusResponse {
job_id: job.id,
status: InsightJobStatus::from_str(&job.status),
status: InsightJobStatus::parse(&job.status),
started_at: job.started_at,
completed_at: job.completed_at,
result_insight_id: job.result_insight_id,
@@ -472,10 +472,9 @@ pub async fn generate_insight_handler(
if let Err(e) = dao.complete_job(&ctx, job_id, id) {
log::error!("Failed to mark job {} as completed: {:?}", job_id, e);
}
} else {
if let Err(e) = dao.fail_job(&ctx, job_id, "generation returned no insight") {
log::error!("Failed to mark job {} as failed: {:?}", job_id, e);
}
} else if let Err(e) = dao.fail_job(&ctx, job_id, "generation returned no insight")
{
log::error!("Failed to mark job {} as failed: {:?}", job_id, e);
}
}
Ok(Ok(Err(e))) => {
+4 -4
View File
@@ -1022,10 +1022,10 @@ impl InsightChatService {
);
let system_msg = ChatMessage::system(system_content);
let mut user_msg = ChatMessage::user(req.user_message.clone());
if backend.images_inline {
if let Some(ref img) = image_base64 {
user_msg.images = Some(vec![img.clone()]);
}
if backend.images_inline
&& let Some(ref img) = image_base64
{
user_msg.images = Some(vec![img.clone()]);
}
let mut messages = vec![system_msg, user_msg];
+4 -4
View File
@@ -4039,10 +4039,10 @@ Return ONLY the summary, nothing else."#,
// user message; describe-then-inline → text was already injected.
let system_msg = ChatMessage::system(system_content);
let mut user_msg = ChatMessage::user(user_content);
if backend.images_inline {
if let Some(ref img) = image_base64 {
user_msg.images = Some(vec![img.clone()]);
}
if backend.images_inline
&& let Some(ref img) = image_base64
{
user_msg.images = Some(vec![img.clone()]);
}
let mut messages = vec![system_msg, user_msg];
+1 -4
View File
@@ -424,10 +424,7 @@ impl OllamaClient {
self.generate_with_images(prompt, system, None).await
}
/// Variant of `generate` that sets Ollama's top-level `think: false`.
/// Used by latency-sensitive callers like the rerank pass, where the
/// task has nothing to reason about and chain-of-thought tokens are
/// wasted wall time. Server-side no-op on non-reasoning models.
#[allow(dead_code)]
pub async fn generate_no_think(&self, prompt: &str, system: Option<&str>) -> Result<String> {
self.generate_with_options(prompt, system, None, Some(false))
.await
+1 -1
View File
@@ -219,7 +219,7 @@ async fn main() -> anyhow::Result<()> {
}
let sim = dot(&vec, &query_vec);
scores.push((sim, rel_path.clone()));
if encoded % 10 == 0 {
if encoded.is_multiple_of(10) {
info!(
"progress: {} encoded, {:.1}s elapsed",
encoded,
+1 -1
View File
@@ -109,7 +109,7 @@ struct SearchError {
/// `None` on malformed bytes — those rows get skipped rather than
/// failing the whole query.
fn decode_embedding(bytes: &[u8]) -> Option<Vec<f32>> {
if bytes.is_empty() || bytes.len() % 4 != 0 {
if bytes.is_empty() || !bytes.len().is_multiple_of(4) {
return None;
}
let mut out = Vec::with_capacity(bytes.len() / 4);
+2
View File
@@ -2103,6 +2103,7 @@ impl ExifDao for SqliteExifDao {
.map_err(|_| DbError::new(DbErrorKind::UpdateError))
}
#[allow(clippy::type_complexity)]
fn list_duplicates_exact(
&mut self,
context: &opentelemetry::Context,
@@ -2198,6 +2199,7 @@ impl ExifDao for SqliteExifDao {
.map_err(|_| DbError::new(DbErrorKind::QueryError))
}
#[allow(clippy::type_complexity)]
fn list_perceptual_candidates(
&mut self,
context: &opentelemetry::Context,
+10 -22
View File
@@ -23,16 +23,8 @@ impl InsightJobStatus {
Self::Cancelled => "cancelled",
}
}
}
impl ToString for InsightJobStatus {
fn to_string(&self) -> String {
self.as_str().to_string()
}
}
impl InsightJobStatus {
pub fn from_str(s: &str) -> Self {
pub fn parse(s: &str) -> Self {
match s {
"running" => Self::Running,
"completed" => Self::Completed,
@@ -49,6 +41,12 @@ impl InsightJobStatus {
}
}
impl std::fmt::Display for InsightJobStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.as_str())
}
}
/// Type of insight generation (standard vs agentic).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
@@ -66,19 +64,9 @@ impl InsightGenerationType {
}
}
impl ToString for InsightGenerationType {
fn to_string(&self) -> String {
self.as_str().to_string()
}
}
impl InsightGenerationType {
pub fn from_str(s: &str) -> Self {
match s {
"standard" => Self::Standard,
"agentic" => Self::Agentic,
_ => Self::Standard,
}
impl std::fmt::Display for InsightGenerationType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.as_str())
}
}
+32 -12
View File
@@ -1024,9 +1024,14 @@ impl FaceDao for SqliteFaceDao {
if let Some(lib) = library_id {
q = q.filter(face_detections::library_id.eq(lib));
}
q.select(diesel::dsl::count_distinct(face_detections::content_hash))
.first(conn.deref_mut())
.with_context(|| "stats: scanned")?
q.select(
#[allow(deprecated)]
{
diesel::dsl::count_distinct(face_detections::content_hash)
},
)
.first(conn.deref_mut())
.with_context(|| "stats: scanned")?
};
let with_faces: i64 = {
let mut q = face_detections::table
@@ -1035,9 +1040,14 @@ impl FaceDao for SqliteFaceDao {
if let Some(lib) = library_id {
q = q.filter(face_detections::library_id.eq(lib));
}
q.select(diesel::dsl::count_distinct(face_detections::content_hash))
.first(conn.deref_mut())
.with_context(|| "stats: with_faces")?
q.select(
#[allow(deprecated)]
{
diesel::dsl::count_distinct(face_detections::content_hash)
},
)
.first(conn.deref_mut())
.with_context(|| "stats: with_faces")?
};
let no_faces: i64 = {
let mut q = face_detections::table
@@ -1046,9 +1056,14 @@ impl FaceDao for SqliteFaceDao {
if let Some(lib) = library_id {
q = q.filter(face_detections::library_id.eq(lib));
}
q.select(diesel::dsl::count_distinct(face_detections::content_hash))
.first(conn.deref_mut())
.with_context(|| "stats: no_faces")?
q.select(
#[allow(deprecated)]
{
diesel::dsl::count_distinct(face_detections::content_hash)
},
)
.first(conn.deref_mut())
.with_context(|| "stats: no_faces")?
};
let failed: i64 = {
let mut q = face_detections::table
@@ -1057,9 +1072,14 @@ impl FaceDao for SqliteFaceDao {
if let Some(lib) = library_id {
q = q.filter(face_detections::library_id.eq(lib));
}
q.select(diesel::dsl::count_distinct(face_detections::content_hash))
.first(conn.deref_mut())
.with_context(|| "stats: failed")?
q.select(
#[allow(deprecated)]
{
diesel::dsl::count_distinct(face_detections::content_hash)
},
)
.first(conn.deref_mut())
.with_context(|| "stats: failed")?
};
// Image-extension filter mirrors `list_unscanned_candidates` so
// SCANNED can actually reach 100%: videos sit in `image_exif` but
+1
View File
@@ -53,6 +53,7 @@ pub fn walk_library_files(base_path: &Path, excluded_dirs: &[String]) -> Vec<Dir
/// used by the watcher's quick-scan tick to skip the long tail. Files
/// whose metadata can't be read are kept; the caller's batch EXIF lookup
/// dedups against existing rows.
#[allow(dead_code)]
pub fn enumerate_indexable_files(
base_path: &Path,
excluded_dirs: &[String],
+18 -18
View File
@@ -133,15 +133,15 @@ pub async fn get_image(
}
});
if let Some(found) = existing {
if let Ok(file) = NamedFile::open(&found) {
span.set_status(Status::Ok);
return file
.use_etag(true)
.use_last_modified(true)
.prefer_utf8(true)
.into_response(&request);
}
if let Some(found) = existing
&& let Ok(file) = NamedFile::open(&found)
{
span.set_status(Status::Ok);
return file
.use_etag(true)
.use_last_modified(true)
.prefer_utf8(true)
.into_response(&request);
}
// Cache miss — generate. Resize + JPEG-encode can take 100500ms
@@ -231,15 +231,15 @@ pub async fn get_image(
}
});
if let Some(found) = existing {
if let Ok(file) = NamedFile::open(&found) {
span.set_status(Status::Ok);
return file
.use_etag(true)
.use_last_modified(true)
.prefer_utf8(true)
.into_response(&request);
}
if let Some(found) = existing
&& let Ok(file) = NamedFile::open(&found)
{
span.set_status(Status::Ok);
return file
.use_etag(true)
.use_last_modified(true)
.prefer_utf8(true)
.into_response(&request);
}
let dest = hash_xlarge_path
+28 -30
View File
@@ -803,38 +803,36 @@ async fn synthesize_merge<D: KnowledgeDao + 'static>(
.json(serde_json::json!({"error": "source_id and target_id must differ"}));
}
let cx = opentelemetry::Context::current();
let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
let (source, target) = {
let cx = opentelemetry::Context::current();
let mut dao = dao.lock().expect("Unable to lock KnowledgeDao");
let source = match dao.get_entity_by_id(&cx, body.source_id) {
Ok(Some(e)) => e,
Ok(None) => {
return HttpResponse::BadRequest()
.json(serde_json::json!({"error": "source entity not found"}));
}
Err(e) => {
log::error!("synthesize_merge source lookup: {:?}", e);
return HttpResponse::InternalServerError()
.json(serde_json::json!({"error": "Database error"}));
}
let source = match dao.get_entity_by_id(&cx, body.source_id) {
Ok(Some(e)) => e,
Ok(None) => {
return HttpResponse::BadRequest()
.json(serde_json::json!({"error": "source entity not found"}));
}
Err(e) => {
log::error!("synthesize_merge source lookup: {:?}", e);
return HttpResponse::InternalServerError()
.json(serde_json::json!({"error": "Database error"}));
}
};
let target = match dao.get_entity_by_id(&cx, body.target_id) {
Ok(Some(e)) => e,
Ok(None) => {
return HttpResponse::BadRequest()
.json(serde_json::json!({"error": "target entity not found"}));
}
Err(e) => {
log::error!("synthesize_merge target lookup: {:?}", e);
return HttpResponse::InternalServerError()
.json(serde_json::json!({"error": "Database error"}));
}
};
(source, target)
};
let target = match dao.get_entity_by_id(&cx, body.target_id) {
Ok(Some(e)) => e,
Ok(None) => {
return HttpResponse::BadRequest()
.json(serde_json::json!({"error": "target entity not found"}));
}
Err(e) => {
log::error!("synthesize_merge target lookup: {:?}", e);
return HttpResponse::InternalServerError()
.json(serde_json::json!({"error": "Database error"}));
}
};
// Drop the DAO lock before the LLM call — the generate request
// is the slow part (seconds) and we don't want to block other
// knowledge reads while it runs.
drop(dao);
let source_desc = if source.description.trim().is_empty() {
"(none)".to_string()
+1
View File
@@ -296,6 +296,7 @@ impl GcStats {
|| self.revived > 0
}
#[allow(dead_code)]
pub fn total_deleted(&self) -> usize {
self.deleted_face_detections + self.deleted_tagged_photo + self.deleted_photo_insights
}