Compare commits
6 Commits
feature/in
...
002-agenti
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
54a49a8562 | ||
|
|
c1b6013412 | ||
|
|
5c9f5c7d0b | ||
|
|
091327e5d9 | ||
|
|
7615b9c99b | ||
|
|
5e5a2a3167 |
@@ -55,6 +55,11 @@ The following environment variables configure AI-powered photo insights and dail
|
|||||||
- Used to fetch conversation data for context in insights
|
- Used to fetch conversation data for context in insights
|
||||||
- `SMS_API_TOKEN` - Authentication token for SMS API (optional)
|
- `SMS_API_TOKEN` - Authentication token for SMS API (optional)
|
||||||
|
|
||||||
|
#### Agentic Insight Generation
|
||||||
|
- `AGENTIC_MAX_ITERATIONS` - Maximum tool-call iterations per agentic insight request [default: `10`]
|
||||||
|
- Controls how many times the model can invoke tools before being forced to produce a final answer
|
||||||
|
- Increase for more thorough context gathering; decrease to limit response time
|
||||||
|
|
||||||
#### Fallback Behavior
|
#### Fallback Behavior
|
||||||
- Primary server is tried first with 5-second connection timeout
|
- Primary server is tried first with 5-second connection timeout
|
||||||
- On failure, automatically falls back to secondary server (if configured)
|
- On failure, automatically falls back to secondary server (if configured)
|
||||||
|
|||||||
@@ -211,6 +211,112 @@ pub async fn get_all_insights_handler(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// POST /insights/generate/agentic - Generate insight using agentic tool-calling loop
|
||||||
|
#[post("/insights/generate/agentic")]
|
||||||
|
pub async fn generate_agentic_insight_handler(
|
||||||
|
http_request: HttpRequest,
|
||||||
|
_claims: Claims,
|
||||||
|
request: web::Json<GeneratePhotoInsightRequest>,
|
||||||
|
insight_generator: web::Data<InsightGenerator>,
|
||||||
|
insight_dao: web::Data<std::sync::Mutex<Box<dyn InsightDao>>>,
|
||||||
|
) -> impl Responder {
|
||||||
|
let parent_context = extract_context_from_request(&http_request);
|
||||||
|
let tracer = global_tracer();
|
||||||
|
let mut span = tracer.start_with_context("http.insights.generate_agentic", &parent_context);
|
||||||
|
|
||||||
|
let normalized_path = normalize_path(&request.file_path);
|
||||||
|
|
||||||
|
span.set_attribute(KeyValue::new("file_path", normalized_path.clone()));
|
||||||
|
if let Some(ref model) = request.model {
|
||||||
|
span.set_attribute(KeyValue::new("model", model.clone()));
|
||||||
|
}
|
||||||
|
if let Some(ref prompt) = request.system_prompt {
|
||||||
|
span.set_attribute(KeyValue::new("has_custom_prompt", true));
|
||||||
|
span.set_attribute(KeyValue::new("prompt_length", prompt.len() as i64));
|
||||||
|
}
|
||||||
|
if let Some(ctx) = request.num_ctx {
|
||||||
|
span.set_attribute(KeyValue::new("num_ctx", ctx as i64));
|
||||||
|
}
|
||||||
|
|
||||||
|
let max_iterations: usize = std::env::var("AGENTIC_MAX_ITERATIONS")
|
||||||
|
.ok()
|
||||||
|
.and_then(|v| v.parse().ok())
|
||||||
|
.unwrap_or(10);
|
||||||
|
|
||||||
|
span.set_attribute(KeyValue::new("max_iterations", max_iterations as i64));
|
||||||
|
|
||||||
|
log::info!(
|
||||||
|
"Agentic insight generation triggered for photo: {} with model: {:?}, max_iterations: {}",
|
||||||
|
normalized_path,
|
||||||
|
request.model,
|
||||||
|
max_iterations
|
||||||
|
);
|
||||||
|
|
||||||
|
let result = insight_generator
|
||||||
|
.generate_agentic_insight_for_photo(
|
||||||
|
&normalized_path,
|
||||||
|
request.model.clone(),
|
||||||
|
request.system_prompt.clone(),
|
||||||
|
request.num_ctx,
|
||||||
|
max_iterations,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
match result {
|
||||||
|
Ok(()) => {
|
||||||
|
span.set_status(Status::Ok);
|
||||||
|
// Fetch the stored insight to return it
|
||||||
|
let otel_context = opentelemetry::Context::new();
|
||||||
|
let mut dao = insight_dao.lock().expect("Unable to lock InsightDao");
|
||||||
|
match dao.get_insight(&otel_context, &normalized_path) {
|
||||||
|
Ok(Some(insight)) => {
|
||||||
|
let response = PhotoInsightResponse {
|
||||||
|
id: insight.id,
|
||||||
|
file_path: insight.file_path,
|
||||||
|
title: insight.title,
|
||||||
|
summary: insight.summary,
|
||||||
|
generated_at: insight.generated_at,
|
||||||
|
model_version: insight.model_version,
|
||||||
|
};
|
||||||
|
HttpResponse::Ok().json(response)
|
||||||
|
}
|
||||||
|
Ok(None) => HttpResponse::Ok().json(serde_json::json!({
|
||||||
|
"success": true,
|
||||||
|
"message": "Agentic insight generated successfully"
|
||||||
|
})),
|
||||||
|
Err(e) => {
|
||||||
|
log::warn!("Insight stored but failed to retrieve: {:?}", e);
|
||||||
|
HttpResponse::Ok().json(serde_json::json!({
|
||||||
|
"success": true,
|
||||||
|
"message": "Agentic insight generated successfully"
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
let error_msg = format!("{:?}", e);
|
||||||
|
log::error!("Failed to generate agentic insight: {}", error_msg);
|
||||||
|
span.set_status(Status::error(error_msg.clone()));
|
||||||
|
|
||||||
|
if error_msg.contains("tool calling not supported")
|
||||||
|
|| error_msg.contains("model not available")
|
||||||
|
{
|
||||||
|
HttpResponse::BadRequest().json(serde_json::json!({
|
||||||
|
"error": format!("Failed to generate agentic insight: {}", error_msg)
|
||||||
|
}))
|
||||||
|
} else if error_msg.contains("error parsing tool call") {
|
||||||
|
HttpResponse::BadRequest().json(serde_json::json!({
|
||||||
|
"error": "Model is not compatible with Ollama's tool calling protocol. Try a model known to support native tool calling (e.g. llama3.1, llama3.2, qwen2.5, mistral-nemo)."
|
||||||
|
}))
|
||||||
|
} else {
|
||||||
|
HttpResponse::InternalServerError().json(serde_json::json!({
|
||||||
|
"error": format!("Failed to generate agentic insight: {}", error_msg)
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// GET /insights/models - List available models from both servers with capabilities
|
/// GET /insights/models - List available models from both servers with capabilities
|
||||||
#[get("/insights/models")]
|
#[get("/insights/models")]
|
||||||
pub async fn get_available_models_handler(
|
pub async fn get_available_models_handler(
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use base64::Engine as _;
|
use base64::Engine as _;
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, NaiveDate, Utc};
|
||||||
use image::ImageFormat;
|
use image::ImageFormat;
|
||||||
use opentelemetry::KeyValue;
|
use opentelemetry::KeyValue;
|
||||||
use opentelemetry::trace::{Span, Status, TraceContextExt, Tracer};
|
use opentelemetry::trace::{Span, Status, TraceContextExt, Tracer};
|
||||||
@@ -9,7 +9,7 @@ use std::fs::File;
|
|||||||
use std::io::Cursor;
|
use std::io::Cursor;
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
use crate::ai::ollama::OllamaClient;
|
use crate::ai::ollama::{ChatMessage, OllamaClient, Tool};
|
||||||
use crate::ai::sms_client::SmsApiClient;
|
use crate::ai::sms_client::SmsApiClient;
|
||||||
use crate::database::models::InsertPhotoInsight;
|
use crate::database::models::InsertPhotoInsight;
|
||||||
use crate::database::{
|
use crate::database::{
|
||||||
@@ -738,7 +738,11 @@ impl InsightGenerator {
|
|||||||
.map(|t| t.name)
|
.map(|t| t.name)
|
||||||
.collect()
|
.collect()
|
||||||
};
|
};
|
||||||
log::info!("Fetched {} tags for photo: {:?}", tag_names.len(), tag_names);
|
log::info!(
|
||||||
|
"Fetched {} tags for photo: {:?}",
|
||||||
|
tag_names.len(),
|
||||||
|
tag_names
|
||||||
|
);
|
||||||
|
|
||||||
// 4. Get location name from GPS coordinates (needed for RAG query)
|
// 4. Get location name from GPS coordinates (needed for RAG query)
|
||||||
let location = match exif {
|
let location = match exif {
|
||||||
@@ -827,7 +831,10 @@ impl InsightGenerator {
|
|||||||
Some(desc)
|
Some(desc)
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
log::warn!("Failed to generate photo description for RAG enrichment: {}", e);
|
log::warn!(
|
||||||
|
"Failed to generate photo description for RAG enrichment: {}",
|
||||||
|
e
|
||||||
|
);
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -845,7 +852,11 @@ impl InsightGenerator {
|
|||||||
if !tag_names.is_empty() {
|
if !tag_names.is_empty() {
|
||||||
parts.push(format!("tags: {}", tag_names.join(", ")));
|
parts.push(format!("tags: {}", tag_names.join(", ")));
|
||||||
}
|
}
|
||||||
if parts.is_empty() { None } else { Some(parts.join(". ")) }
|
if parts.is_empty() {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
Some(parts.join(". "))
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut search_enrichment: Option<String> = enriched_query.clone();
|
let mut search_enrichment: Option<String> = enriched_query.clone();
|
||||||
@@ -900,7 +911,11 @@ impl InsightGenerator {
|
|||||||
if !tag_names.is_empty() {
|
if !tag_names.is_empty() {
|
||||||
parts.push(format!("tags: {}", tag_names.join(", ")));
|
parts.push(format!("tags: {}", tag_names.join(", ")));
|
||||||
}
|
}
|
||||||
if parts.is_empty() { None } else { Some(parts.join(". ")) }
|
if parts.is_empty() {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
Some(parts.join(". "))
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// Step 3: Try historical RAG (>30 days ago) using extracted topics
|
// Step 3: Try historical RAG (>30 days ago) using extracted topics
|
||||||
@@ -980,7 +995,14 @@ impl InsightGenerator {
|
|||||||
log::info!("No immediate messages found, trying basic RAG as fallback");
|
log::info!("No immediate messages found, trying basic RAG as fallback");
|
||||||
// Fallback to basic RAG even without strong query
|
// Fallback to basic RAG even without strong query
|
||||||
match self
|
match self
|
||||||
.find_relevant_messages_rag(date_taken, None, contact.as_deref(), None, 20, enriched_query.as_deref())
|
.find_relevant_messages_rag(
|
||||||
|
date_taken,
|
||||||
|
None,
|
||||||
|
contact.as_deref(),
|
||||||
|
None,
|
||||||
|
20,
|
||||||
|
enriched_query.as_deref(),
|
||||||
|
)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(rag_messages) if !rag_messages.is_empty() => {
|
Ok(rag_messages) if !rag_messages.is_empty() => {
|
||||||
@@ -1314,6 +1336,849 @@ Return ONLY the summary, nothing else."#,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── Tool executors for agentic loop ────────────────────────────────
|
||||||
|
|
||||||
|
/// Dispatch a tool call to the appropriate executor
|
||||||
|
async fn execute_tool(
|
||||||
|
&self,
|
||||||
|
tool_name: &str,
|
||||||
|
arguments: &serde_json::Value,
|
||||||
|
ollama: &OllamaClient,
|
||||||
|
image_base64: &Option<String>,
|
||||||
|
cx: &opentelemetry::Context,
|
||||||
|
) -> String {
|
||||||
|
let result = match tool_name {
|
||||||
|
"search_rag" => self.tool_search_rag(arguments, cx).await,
|
||||||
|
"get_sms_messages" => self.tool_get_sms_messages(arguments, cx).await,
|
||||||
|
"get_calendar_events" => self.tool_get_calendar_events(arguments, cx).await,
|
||||||
|
"get_location_history" => self.tool_get_location_history(arguments, cx).await,
|
||||||
|
"get_file_tags" => self.tool_get_file_tags(arguments, cx).await,
|
||||||
|
"describe_photo" => self.tool_describe_photo(ollama, image_base64).await,
|
||||||
|
"reverse_geocode" => self.tool_reverse_geocode(arguments).await,
|
||||||
|
unknown => format!("Unknown tool: {}", unknown),
|
||||||
|
};
|
||||||
|
if result.starts_with("Error") || result.starts_with("No ") {
|
||||||
|
log::warn!("Tool '{}' result: {}", tool_name, result);
|
||||||
|
} else {
|
||||||
|
log::info!(
|
||||||
|
"Tool '{}' result: {} chars",
|
||||||
|
tool_name,
|
||||||
|
result.len()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
result
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Tool: search_rag — semantic search over daily summaries
|
||||||
|
async fn tool_search_rag(
|
||||||
|
&self,
|
||||||
|
args: &serde_json::Value,
|
||||||
|
_cx: &opentelemetry::Context,
|
||||||
|
) -> String {
|
||||||
|
let query = match args.get("query").and_then(|v| v.as_str()) {
|
||||||
|
Some(q) => q.to_string(),
|
||||||
|
None => return "Error: missing required parameter 'query'".to_string(),
|
||||||
|
};
|
||||||
|
let date_str = match args.get("date").and_then(|v| v.as_str()) {
|
||||||
|
Some(d) => d,
|
||||||
|
None => return "Error: missing required parameter 'date'".to_string(),
|
||||||
|
};
|
||||||
|
let date = match NaiveDate::parse_from_str(date_str, "%Y-%m-%d") {
|
||||||
|
Ok(d) => d,
|
||||||
|
Err(e) => return format!("Error: failed to parse date '{}': {}", date_str, e),
|
||||||
|
};
|
||||||
|
let contact = args
|
||||||
|
.get("contact")
|
||||||
|
.and_then(|v| v.as_str())
|
||||||
|
.map(|s| s.to_string());
|
||||||
|
|
||||||
|
log::info!(
|
||||||
|
"tool_search_rag: query='{}', date={}, contact={:?}",
|
||||||
|
query,
|
||||||
|
date,
|
||||||
|
contact
|
||||||
|
);
|
||||||
|
|
||||||
|
match self
|
||||||
|
.find_relevant_messages_rag(date, None, contact.as_deref(), None, 5, Some(&query))
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(results) if !results.is_empty() => results.join("\n\n"),
|
||||||
|
Ok(_) => "No relevant messages found.".to_string(),
|
||||||
|
Err(e) => format!("Error searching RAG: {}", e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Tool: get_sms_messages — fetch SMS messages near a date for a contact
|
||||||
|
async fn tool_get_sms_messages(
|
||||||
|
&self,
|
||||||
|
args: &serde_json::Value,
|
||||||
|
_cx: &opentelemetry::Context,
|
||||||
|
) -> String {
|
||||||
|
let date_str = match args.get("date").and_then(|v| v.as_str()) {
|
||||||
|
Some(d) => d,
|
||||||
|
None => return "Error: missing required parameter 'date'".to_string(),
|
||||||
|
};
|
||||||
|
let contact = args
|
||||||
|
.get("contact")
|
||||||
|
.and_then(|v| v.as_str())
|
||||||
|
.map(|s| s.to_string());
|
||||||
|
let days_radius = args
|
||||||
|
.get("days_radius")
|
||||||
|
.and_then(|v| v.as_i64())
|
||||||
|
.unwrap_or(4);
|
||||||
|
|
||||||
|
let date = match NaiveDate::parse_from_str(date_str, "%Y-%m-%d") {
|
||||||
|
Ok(d) => d,
|
||||||
|
Err(e) => return format!("Error: failed to parse date '{}': {}", date_str, e),
|
||||||
|
};
|
||||||
|
let timestamp = date.and_hms_opt(12, 0, 0).unwrap().and_utc().timestamp();
|
||||||
|
|
||||||
|
log::info!(
|
||||||
|
"tool_get_sms_messages: date={}, contact={:?}, days_radius={}",
|
||||||
|
date,
|
||||||
|
contact,
|
||||||
|
days_radius
|
||||||
|
);
|
||||||
|
|
||||||
|
match self
|
||||||
|
.sms_client
|
||||||
|
.fetch_messages_for_contact(contact.as_deref(), timestamp)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(messages) if !messages.is_empty() => {
|
||||||
|
let formatted: Vec<String> = messages
|
||||||
|
.iter()
|
||||||
|
.take(30)
|
||||||
|
.map(|m| {
|
||||||
|
let sender = if m.is_sent { "Me" } else { &m.contact };
|
||||||
|
let ts = DateTime::from_timestamp(m.timestamp, 0)
|
||||||
|
.map(|dt| dt.format("%Y-%m-%d %H:%M").to_string())
|
||||||
|
.unwrap_or_else(|| "unknown".to_string());
|
||||||
|
format!("[{}] {}: {}", ts, sender, m.body)
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
format!(
|
||||||
|
"Found {} messages:\n{}",
|
||||||
|
messages.len(),
|
||||||
|
formatted.join("\n")
|
||||||
|
)
|
||||||
|
}
|
||||||
|
Ok(_) => "No messages found.".to_string(),
|
||||||
|
Err(e) => {
|
||||||
|
log::warn!("tool_get_sms_messages failed: {}", e);
|
||||||
|
format!("Error fetching SMS messages: {}", e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Tool: get_calendar_events — fetch calendar events near a date
|
||||||
|
async fn tool_get_calendar_events(
|
||||||
|
&self,
|
||||||
|
args: &serde_json::Value,
|
||||||
|
cx: &opentelemetry::Context,
|
||||||
|
) -> String {
|
||||||
|
let date_str = match args.get("date").and_then(|v| v.as_str()) {
|
||||||
|
Some(d) => d,
|
||||||
|
None => return "Error: missing required parameter 'date'".to_string(),
|
||||||
|
};
|
||||||
|
let days_radius = args
|
||||||
|
.get("days_radius")
|
||||||
|
.and_then(|v| v.as_i64())
|
||||||
|
.unwrap_or(7);
|
||||||
|
|
||||||
|
let date = match NaiveDate::parse_from_str(date_str, "%Y-%m-%d") {
|
||||||
|
Ok(d) => d,
|
||||||
|
Err(e) => return format!("Error: failed to parse date '{}': {}", date_str, e),
|
||||||
|
};
|
||||||
|
let timestamp = date.and_hms_opt(12, 0, 0).unwrap().and_utc().timestamp();
|
||||||
|
|
||||||
|
log::info!(
|
||||||
|
"tool_get_calendar_events: date={}, days_radius={}",
|
||||||
|
date,
|
||||||
|
days_radius
|
||||||
|
);
|
||||||
|
|
||||||
|
let events = {
|
||||||
|
let mut dao = self
|
||||||
|
.calendar_dao
|
||||||
|
.lock()
|
||||||
|
.expect("Unable to lock CalendarEventDao");
|
||||||
|
dao.find_relevant_events_hybrid(cx, timestamp, days_radius, None, 10)
|
||||||
|
.ok()
|
||||||
|
};
|
||||||
|
|
||||||
|
match events {
|
||||||
|
Some(evts) if !evts.is_empty() => {
|
||||||
|
let formatted: Vec<String> = evts
|
||||||
|
.iter()
|
||||||
|
.map(|e| {
|
||||||
|
let dt = DateTime::from_timestamp(e.start_time, 0)
|
||||||
|
.map(|dt| dt.format("%Y-%m-%d %H:%M").to_string())
|
||||||
|
.unwrap_or_else(|| "unknown".to_string());
|
||||||
|
let loc = e
|
||||||
|
.location
|
||||||
|
.as_ref()
|
||||||
|
.map(|l| format!(" at {}", l))
|
||||||
|
.unwrap_or_default();
|
||||||
|
let attendees = e
|
||||||
|
.attendees
|
||||||
|
.as_ref()
|
||||||
|
.and_then(|a| serde_json::from_str::<Vec<String>>(a).ok())
|
||||||
|
.map(|list| format!(" (with {})", list.join(", ")))
|
||||||
|
.unwrap_or_default();
|
||||||
|
format!("[{}] {}{}{}", dt, e.summary, loc, attendees)
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
format!(
|
||||||
|
"Found {} calendar events:\n{}",
|
||||||
|
evts.len(),
|
||||||
|
formatted.join("\n")
|
||||||
|
)
|
||||||
|
}
|
||||||
|
Some(_) => "No calendar events found.".to_string(),
|
||||||
|
None => "No calendar events found.".to_string(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Tool: get_location_history — fetch location records near a date
|
||||||
|
async fn tool_get_location_history(
|
||||||
|
&self,
|
||||||
|
args: &serde_json::Value,
|
||||||
|
cx: &opentelemetry::Context,
|
||||||
|
) -> String {
|
||||||
|
let date_str = match args.get("date").and_then(|v| v.as_str()) {
|
||||||
|
Some(d) => d,
|
||||||
|
None => return "Error: missing required parameter 'date'".to_string(),
|
||||||
|
};
|
||||||
|
let days_radius = args
|
||||||
|
.get("days_radius")
|
||||||
|
.and_then(|v| v.as_i64())
|
||||||
|
.unwrap_or(14);
|
||||||
|
|
||||||
|
let date = match NaiveDate::parse_from_str(date_str, "%Y-%m-%d") {
|
||||||
|
Ok(d) => d,
|
||||||
|
Err(e) => return format!("Error: failed to parse date '{}': {}", date_str, e),
|
||||||
|
};
|
||||||
|
let timestamp = date.and_hms_opt(12, 0, 0).unwrap().and_utc().timestamp();
|
||||||
|
|
||||||
|
log::info!(
|
||||||
|
"tool_get_location_history: date={}, days_radius={}",
|
||||||
|
date,
|
||||||
|
days_radius
|
||||||
|
);
|
||||||
|
|
||||||
|
let start_ts = timestamp - (days_radius * 86400);
|
||||||
|
let end_ts = timestamp + (days_radius * 86400);
|
||||||
|
|
||||||
|
let locations = {
|
||||||
|
let mut dao = self
|
||||||
|
.location_dao
|
||||||
|
.lock()
|
||||||
|
.expect("Unable to lock LocationHistoryDao");
|
||||||
|
dao.find_locations_in_range(cx, start_ts, end_ts).ok()
|
||||||
|
};
|
||||||
|
|
||||||
|
match locations {
|
||||||
|
Some(locs) if !locs.is_empty() => {
|
||||||
|
let formatted: Vec<String> = locs
|
||||||
|
.iter()
|
||||||
|
.take(20)
|
||||||
|
.map(|loc| {
|
||||||
|
let dt = DateTime::from_timestamp(loc.timestamp, 0)
|
||||||
|
.map(|dt| dt.format("%Y-%m-%d %H:%M").to_string())
|
||||||
|
.unwrap_or_else(|| "unknown".to_string());
|
||||||
|
let activity = loc
|
||||||
|
.activity
|
||||||
|
.as_ref()
|
||||||
|
.map(|a| format!(" ({})", a))
|
||||||
|
.unwrap_or_default();
|
||||||
|
let place = loc
|
||||||
|
.place_name
|
||||||
|
.as_ref()
|
||||||
|
.map(|p| format!(" at {}", p))
|
||||||
|
.unwrap_or_default();
|
||||||
|
format!(
|
||||||
|
"[{}] {:.4}, {:.4}{}{}",
|
||||||
|
dt, loc.latitude, loc.longitude, place, activity
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
format!(
|
||||||
|
"Found {} location records:\n{}",
|
||||||
|
locs.len(),
|
||||||
|
formatted.join("\n")
|
||||||
|
)
|
||||||
|
}
|
||||||
|
Some(_) => "No location history found.".to_string(),
|
||||||
|
None => "No location history found.".to_string(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Tool: get_file_tags — fetch tags for a file path
|
||||||
|
async fn tool_get_file_tags(
|
||||||
|
&self,
|
||||||
|
args: &serde_json::Value,
|
||||||
|
cx: &opentelemetry::Context,
|
||||||
|
) -> String {
|
||||||
|
let file_path = match args.get("file_path").and_then(|v| v.as_str()) {
|
||||||
|
Some(p) => p.to_string(),
|
||||||
|
None => return "Error: missing required parameter 'file_path'".to_string(),
|
||||||
|
};
|
||||||
|
|
||||||
|
log::info!("tool_get_file_tags: file_path='{}'", file_path);
|
||||||
|
|
||||||
|
let tags = {
|
||||||
|
let mut dao = self.tag_dao.lock().expect("Unable to lock TagDao");
|
||||||
|
dao.get_tags_for_path(cx, &file_path).ok()
|
||||||
|
};
|
||||||
|
|
||||||
|
match tags {
|
||||||
|
Some(t) if !t.is_empty() => {
|
||||||
|
let names: Vec<String> = t.into_iter().map(|tag| tag.name).collect();
|
||||||
|
names.join(", ")
|
||||||
|
}
|
||||||
|
Some(_) => "No tags found.".to_string(),
|
||||||
|
None => "No tags found.".to_string(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Tool: describe_photo — generate a visual description of the photo
|
||||||
|
async fn tool_describe_photo(
|
||||||
|
&self,
|
||||||
|
ollama: &OllamaClient,
|
||||||
|
image_base64: &Option<String>,
|
||||||
|
) -> String {
|
||||||
|
log::info!("tool_describe_photo: generating visual description");
|
||||||
|
|
||||||
|
match image_base64 {
|
||||||
|
Some(img) => match ollama.generate_photo_description(img).await {
|
||||||
|
Ok(desc) => desc,
|
||||||
|
Err(e) => format!("Error describing photo: {}", e),
|
||||||
|
},
|
||||||
|
None => "No image available for description.".to_string(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Tool: reverse_geocode — convert GPS coordinates to a human-readable place name
|
||||||
|
async fn tool_reverse_geocode(&self, args: &serde_json::Value) -> String {
|
||||||
|
let lat = match args.get("latitude").and_then(|v| v.as_f64()) {
|
||||||
|
Some(v) => v,
|
||||||
|
None => return "Error: missing required parameter 'latitude'".to_string(),
|
||||||
|
};
|
||||||
|
let lon = match args.get("longitude").and_then(|v| v.as_f64()) {
|
||||||
|
Some(v) => v,
|
||||||
|
None => return "Error: missing required parameter 'longitude'".to_string(),
|
||||||
|
};
|
||||||
|
|
||||||
|
log::info!("tool_reverse_geocode: lat={}, lon={}", lat, lon);
|
||||||
|
|
||||||
|
match self.reverse_geocode(lat, lon).await {
|
||||||
|
Some(place) => place,
|
||||||
|
None => "Could not resolve coordinates to a place name.".to_string(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Agentic insight generation ──────────────────────────────────────
|
||||||
|
|
||||||
|
/// Build the list of tool definitions for the agentic loop
|
||||||
|
fn build_tool_definitions(has_vision: bool) -> Vec<Tool> {
|
||||||
|
let mut tools = vec![
|
||||||
|
Tool::function(
|
||||||
|
"search_rag",
|
||||||
|
"Search conversation history using semantic search. Use this to find relevant past conversations about specific topics, people, or events.",
|
||||||
|
serde_json::json!({
|
||||||
|
"type": "object",
|
||||||
|
"required": ["query", "date"],
|
||||||
|
"properties": {
|
||||||
|
"query": {
|
||||||
|
"type": "string",
|
||||||
|
"description": "The search query to find relevant conversations"
|
||||||
|
},
|
||||||
|
"date": {
|
||||||
|
"type": "string",
|
||||||
|
"description": "The reference date in YYYY-MM-DD format"
|
||||||
|
},
|
||||||
|
"contact": {
|
||||||
|
"type": "string",
|
||||||
|
"description": "Optional contact name to filter results"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
),
|
||||||
|
Tool::function(
|
||||||
|
"get_sms_messages",
|
||||||
|
"Fetch SMS/text messages near a specific date. Returns the actual message conversation. Omit contact to search across all conversations.",
|
||||||
|
serde_json::json!({
|
||||||
|
"type": "object",
|
||||||
|
"required": ["date"],
|
||||||
|
"properties": {
|
||||||
|
"date": {
|
||||||
|
"type": "string",
|
||||||
|
"description": "The center date in YYYY-MM-DD format"
|
||||||
|
},
|
||||||
|
"contact": {
|
||||||
|
"type": "string",
|
||||||
|
"description": "Optional contact name to filter messages. If omitted, searches all conversations."
|
||||||
|
},
|
||||||
|
"days_radius": {
|
||||||
|
"type": "integer",
|
||||||
|
"description": "Number of days before and after the date to search (default: 4)"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
),
|
||||||
|
Tool::function(
|
||||||
|
"get_calendar_events",
|
||||||
|
"Fetch calendar events near a specific date. Shows scheduled events, meetings, and activities.",
|
||||||
|
serde_json::json!({
|
||||||
|
"type": "object",
|
||||||
|
"required": ["date"],
|
||||||
|
"properties": {
|
||||||
|
"date": {
|
||||||
|
"type": "string",
|
||||||
|
"description": "The center date in YYYY-MM-DD format"
|
||||||
|
},
|
||||||
|
"days_radius": {
|
||||||
|
"type": "integer",
|
||||||
|
"description": "Number of days before and after the date to search (default: 7)"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
),
|
||||||
|
Tool::function(
|
||||||
|
"get_location_history",
|
||||||
|
"Fetch location history records near a specific date. Shows places visited and activities.",
|
||||||
|
serde_json::json!({
|
||||||
|
"type": "object",
|
||||||
|
"required": ["date"],
|
||||||
|
"properties": {
|
||||||
|
"date": {
|
||||||
|
"type": "string",
|
||||||
|
"description": "The center date in YYYY-MM-DD format"
|
||||||
|
},
|
||||||
|
"days_radius": {
|
||||||
|
"type": "integer",
|
||||||
|
"description": "Number of days before and after the date to search (default: 14)"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
),
|
||||||
|
Tool::function(
|
||||||
|
"get_file_tags",
|
||||||
|
"Get tags/labels that have been applied to a specific photo file.",
|
||||||
|
serde_json::json!({
|
||||||
|
"type": "object",
|
||||||
|
"required": ["file_path"],
|
||||||
|
"properties": {
|
||||||
|
"file_path": {
|
||||||
|
"type": "string",
|
||||||
|
"description": "The file path of the photo to get tags for"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
),
|
||||||
|
];
|
||||||
|
|
||||||
|
tools.push(Tool::function(
|
||||||
|
"reverse_geocode",
|
||||||
|
"Convert GPS latitude/longitude coordinates to a human-readable place name (city, state). Use this when GPS coordinates are available in the photo metadata, or to resolve coordinates returned by get_location_history.",
|
||||||
|
serde_json::json!({
|
||||||
|
"type": "object",
|
||||||
|
"required": ["latitude", "longitude"],
|
||||||
|
"properties": {
|
||||||
|
"latitude": {
|
||||||
|
"type": "number",
|
||||||
|
"description": "GPS latitude in decimal degrees"
|
||||||
|
},
|
||||||
|
"longitude": {
|
||||||
|
"type": "number",
|
||||||
|
"description": "GPS longitude in decimal degrees"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
));
|
||||||
|
|
||||||
|
if has_vision {
|
||||||
|
tools.push(Tool::function(
|
||||||
|
"describe_photo",
|
||||||
|
"Generate a visual description of the photo. Describes people, location, and activity visible in the image.",
|
||||||
|
serde_json::json!({
|
||||||
|
"type": "object",
|
||||||
|
"properties": {}
|
||||||
|
}),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
tools
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Generate an AI insight for a photo using an agentic tool-calling loop.
|
||||||
|
/// The model decides which tools to call to gather context before writing the final insight.
|
||||||
|
pub async fn generate_agentic_insight_for_photo(
|
||||||
|
&self,
|
||||||
|
file_path: &str,
|
||||||
|
custom_model: Option<String>,
|
||||||
|
custom_system_prompt: Option<String>,
|
||||||
|
num_ctx: Option<i32>,
|
||||||
|
max_iterations: usize,
|
||||||
|
) -> Result<()> {
|
||||||
|
let tracer = global_tracer();
|
||||||
|
let current_cx = opentelemetry::Context::current();
|
||||||
|
let mut span = tracer.start_with_context("ai.insight.generate_agentic", ¤t_cx);
|
||||||
|
|
||||||
|
let file_path = normalize_path(file_path);
|
||||||
|
log::info!("Generating agentic insight for photo: {}", file_path);
|
||||||
|
|
||||||
|
span.set_attribute(KeyValue::new("file_path", file_path.clone()));
|
||||||
|
span.set_attribute(KeyValue::new("max_iterations", max_iterations as i64));
|
||||||
|
|
||||||
|
// 1. Create OllamaClient
|
||||||
|
let mut ollama_client = if let Some(ref model) = custom_model {
|
||||||
|
log::info!("Using custom model for agentic: {}", model);
|
||||||
|
span.set_attribute(KeyValue::new("custom_model", model.clone()));
|
||||||
|
OllamaClient::new(
|
||||||
|
self.ollama.primary_url.clone(),
|
||||||
|
self.ollama.fallback_url.clone(),
|
||||||
|
model.clone(),
|
||||||
|
Some(model.clone()),
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
span.set_attribute(KeyValue::new("model", self.ollama.primary_model.clone()));
|
||||||
|
self.ollama.clone()
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(ctx) = num_ctx {
|
||||||
|
log::info!("Using custom context size: {}", ctx);
|
||||||
|
span.set_attribute(KeyValue::new("num_ctx", ctx as i64));
|
||||||
|
ollama_client.set_num_ctx(Some(ctx));
|
||||||
|
}
|
||||||
|
|
||||||
|
let insight_cx = current_cx.with_span(span);
|
||||||
|
|
||||||
|
// 2a. Verify the model exists on at least one server before checking capabilities
|
||||||
|
if let Some(ref model_name) = custom_model {
|
||||||
|
let available_on_primary =
|
||||||
|
OllamaClient::is_model_available(&ollama_client.primary_url, model_name)
|
||||||
|
.await
|
||||||
|
.unwrap_or(false);
|
||||||
|
|
||||||
|
let available_on_fallback = if let Some(ref fallback_url) = ollama_client.fallback_url {
|
||||||
|
OllamaClient::is_model_available(fallback_url, model_name)
|
||||||
|
.await
|
||||||
|
.unwrap_or(false)
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
};
|
||||||
|
|
||||||
|
if !available_on_primary && !available_on_fallback {
|
||||||
|
anyhow::bail!(
|
||||||
|
"model not available: '{}' not found on any configured server",
|
||||||
|
model_name
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2b. Check tool calling capability — try primary, fall back to fallback URL
|
||||||
|
let model_name_for_caps = &ollama_client.primary_model;
|
||||||
|
let capabilities = match OllamaClient::check_model_capabilities(
|
||||||
|
&ollama_client.primary_url,
|
||||||
|
model_name_for_caps,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(caps) => caps,
|
||||||
|
Err(_) => {
|
||||||
|
// Model may only be on the fallback server
|
||||||
|
let fallback_url = ollama_client.fallback_url.as_deref().ok_or_else(|| {
|
||||||
|
anyhow::anyhow!(
|
||||||
|
"Failed to check model capabilities for '{}': model not found on primary server and no fallback configured",
|
||||||
|
model_name_for_caps
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
OllamaClient::check_model_capabilities(fallback_url, model_name_for_caps)
|
||||||
|
.await
|
||||||
|
.map_err(|e| {
|
||||||
|
anyhow::anyhow!(
|
||||||
|
"Failed to check model capabilities for '{}': {}",
|
||||||
|
model_name_for_caps,
|
||||||
|
e
|
||||||
|
)
|
||||||
|
})?
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if !capabilities.has_tool_calling {
|
||||||
|
return Err(anyhow::anyhow!(
|
||||||
|
"tool calling not supported by model '{}'",
|
||||||
|
ollama_client.primary_model
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
let has_vision = capabilities.has_vision;
|
||||||
|
insight_cx
|
||||||
|
.span()
|
||||||
|
.set_attribute(KeyValue::new("model_has_vision", has_vision));
|
||||||
|
insight_cx
|
||||||
|
.span()
|
||||||
|
.set_attribute(KeyValue::new("model_has_tool_calling", true));
|
||||||
|
|
||||||
|
// 3. Fetch EXIF
|
||||||
|
let exif = {
|
||||||
|
let mut exif_dao = self.exif_dao.lock().expect("Unable to lock ExifDao");
|
||||||
|
exif_dao
|
||||||
|
.get_exif(&insight_cx, &file_path)
|
||||||
|
.map_err(|e| anyhow::anyhow!("Failed to get EXIF: {:?}", e))?
|
||||||
|
};
|
||||||
|
|
||||||
|
// 4. Extract timestamp and contact
|
||||||
|
let timestamp = if let Some(ts) = exif.as_ref().and_then(|e| e.date_taken) {
|
||||||
|
ts
|
||||||
|
} else {
|
||||||
|
log::warn!("No date_taken in EXIF for {}, trying filename", file_path);
|
||||||
|
extract_date_from_filename(&file_path)
|
||||||
|
.map(|dt| dt.timestamp())
|
||||||
|
.or_else(|| {
|
||||||
|
let full_path = std::path::Path::new(&self.base_path).join(&file_path);
|
||||||
|
File::open(&full_path)
|
||||||
|
.and_then(|f| f.metadata())
|
||||||
|
.and_then(|m| m.created().or(m.modified()))
|
||||||
|
.map(|t| DateTime::<Utc>::from(t).timestamp())
|
||||||
|
.inspect_err(|e| {
|
||||||
|
log::warn!(
|
||||||
|
"Failed to get file timestamp for agentic insight {}: {}",
|
||||||
|
file_path,
|
||||||
|
e
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.ok()
|
||||||
|
})
|
||||||
|
.unwrap_or_else(|| Utc::now().timestamp())
|
||||||
|
};
|
||||||
|
|
||||||
|
let date_taken = DateTime::from_timestamp(timestamp, 0)
|
||||||
|
.map(|dt| dt.date_naive())
|
||||||
|
.unwrap_or_else(|| Utc::now().date_naive());
|
||||||
|
|
||||||
|
let contact = Self::extract_contact_from_path(&file_path);
|
||||||
|
log::info!("Agentic: date_taken={}, contact={:?}", date_taken, contact);
|
||||||
|
|
||||||
|
// 5. Fetch tags
|
||||||
|
let tag_names: Vec<String> = {
|
||||||
|
let mut dao = self.tag_dao.lock().expect("Unable to lock TagDao");
|
||||||
|
dao.get_tags_for_path(&insight_cx, &file_path)
|
||||||
|
.unwrap_or_else(|e| {
|
||||||
|
log::warn!("Failed to fetch tags for agentic {}: {}", file_path, e);
|
||||||
|
Vec::new()
|
||||||
|
})
|
||||||
|
.into_iter()
|
||||||
|
.map(|t| t.name)
|
||||||
|
.collect()
|
||||||
|
};
|
||||||
|
|
||||||
|
// 6. Load image if vision capable
|
||||||
|
let image_base64 = if has_vision {
|
||||||
|
match self.load_image_as_base64(&file_path) {
|
||||||
|
Ok(b64) => {
|
||||||
|
log::info!("Loaded image for vision-capable agentic model");
|
||||||
|
Some(b64)
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
log::warn!("Failed to load image for agentic vision: {}", e);
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
|
// 7. Build system message
|
||||||
|
let base_system = "You are a personal photo memory assistant helping to reconstruct a memory from a photo.\n\n\
|
||||||
|
IMPORTANT INSTRUCTIONS:\n\
|
||||||
|
1. You MUST call multiple tools to gather context BEFORE writing any final insight. Do not produce a final answer after only one or two tool calls.\n\
|
||||||
|
2. Always call ALL of the following tools that are relevant: search_rag (search conversation summaries), get_sms_messages (fetch nearby messages), get_calendar_events (check what was happening that day), get_location_history (find where this was taken), get_file_tags (retrieve tags).\n\
|
||||||
|
3. Only produce your final insight AFTER you have gathered context from at least 3-4 tools.\n\
|
||||||
|
4. If a tool returns no results, that is useful information — continue calling the remaining tools anyway.\n\
|
||||||
|
5. Your final insight must be written in first person as Cameron, in a journal/memoir style.";
|
||||||
|
let system_content = if let Some(ref custom) = custom_system_prompt {
|
||||||
|
format!("{}\n\n{}", custom, base_system)
|
||||||
|
} else {
|
||||||
|
base_system.to_string()
|
||||||
|
};
|
||||||
|
|
||||||
|
// 8. Build user message
|
||||||
|
let gps_info = exif
|
||||||
|
.as_ref()
|
||||||
|
.and_then(|e| {
|
||||||
|
if let (Some(lat), Some(lon)) = (e.gps_latitude, e.gps_longitude) {
|
||||||
|
Some(format!("GPS: {:.4}, {:.4}", lat, lon))
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.unwrap_or_else(|| "GPS: unknown".to_string());
|
||||||
|
|
||||||
|
let tags_info = if tag_names.is_empty() {
|
||||||
|
"Tags: none".to_string()
|
||||||
|
} else {
|
||||||
|
format!("Tags: {}", tag_names.join(", "))
|
||||||
|
};
|
||||||
|
|
||||||
|
let contact_info = contact
|
||||||
|
.as_ref()
|
||||||
|
.map(|c| format!("Contact/Person: {}", c))
|
||||||
|
.unwrap_or_else(|| "Contact/Person: unknown".to_string());
|
||||||
|
|
||||||
|
let user_content = format!(
|
||||||
|
"Please analyze this photo and gather context to write a personal journal-style insight.\n\n\
|
||||||
|
Photo file path: {}\n\
|
||||||
|
Date taken: {}\n\
|
||||||
|
{}\n\
|
||||||
|
{}\n\
|
||||||
|
{}\n\n\
|
||||||
|
Use the available tools to gather more context about this moment (messages, calendar events, location history, etc.), \
|
||||||
|
then write a detailed personal insight with a title and summary. Write in first person as Cameron.",
|
||||||
|
file_path,
|
||||||
|
date_taken.format("%B %d, %Y"),
|
||||||
|
contact_info,
|
||||||
|
gps_info,
|
||||||
|
tags_info,
|
||||||
|
);
|
||||||
|
|
||||||
|
// 9. Define tools
|
||||||
|
let tools = Self::build_tool_definitions(has_vision);
|
||||||
|
|
||||||
|
// 10. Build initial messages
|
||||||
|
let system_msg = ChatMessage::system(system_content);
|
||||||
|
let mut user_msg = ChatMessage::user(user_content);
|
||||||
|
if let Some(ref img) = image_base64 {
|
||||||
|
user_msg.images = Some(vec![img.clone()]);
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut messages = vec![system_msg, user_msg];
|
||||||
|
|
||||||
|
// 11. Agentic loop
|
||||||
|
let loop_span = tracer.start_with_context("ai.agentic.loop", &insight_cx);
|
||||||
|
let loop_cx = insight_cx.with_span(loop_span);
|
||||||
|
|
||||||
|
let mut final_content = String::new();
|
||||||
|
let mut iterations_used = 0usize;
|
||||||
|
|
||||||
|
for iteration in 0..max_iterations {
|
||||||
|
iterations_used = iteration + 1;
|
||||||
|
log::info!("Agentic iteration {}/{}", iteration + 1, max_iterations);
|
||||||
|
|
||||||
|
let response = ollama_client
|
||||||
|
.chat_with_tools(messages.clone(), tools.clone())
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// Sanitize tool call arguments before pushing back into history.
|
||||||
|
// Some models occasionally return non-object arguments (bool, string, null)
|
||||||
|
// which Ollama rejects when they are re-sent in a subsequent request.
|
||||||
|
let mut response = response;
|
||||||
|
if let Some(ref mut tool_calls) = response.tool_calls {
|
||||||
|
for tc in tool_calls.iter_mut() {
|
||||||
|
if !tc.function.arguments.is_object() {
|
||||||
|
log::warn!(
|
||||||
|
"Tool '{}' returned non-object arguments ({:?}), normalising to {{}}",
|
||||||
|
tc.function.name,
|
||||||
|
tc.function.arguments
|
||||||
|
);
|
||||||
|
tc.function.arguments = serde_json::Value::Object(Default::default());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
messages.push(response.clone());
|
||||||
|
|
||||||
|
if let Some(ref tool_calls) = response.tool_calls
|
||||||
|
&& !tool_calls.is_empty()
|
||||||
|
{
|
||||||
|
for tool_call in tool_calls {
|
||||||
|
log::info!(
|
||||||
|
"Agentic tool call [{}]: {} {:?}",
|
||||||
|
iteration,
|
||||||
|
tool_call.function.name,
|
||||||
|
tool_call.function.arguments
|
||||||
|
);
|
||||||
|
let result = self
|
||||||
|
.execute_tool(
|
||||||
|
&tool_call.function.name,
|
||||||
|
&tool_call.function.arguments,
|
||||||
|
&ollama_client,
|
||||||
|
&image_base64,
|
||||||
|
&loop_cx,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
messages.push(ChatMessage::tool_result(result));
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// No tool calls — this is the final answer
|
||||||
|
final_content = response.content;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// If loop exhausted without final answer, ask for one
|
||||||
|
if final_content.is_empty() {
|
||||||
|
log::info!(
|
||||||
|
"Agentic loop exhausted after {} iterations, requesting final answer",
|
||||||
|
iterations_used
|
||||||
|
);
|
||||||
|
messages.push(ChatMessage::user(
|
||||||
|
"Based on the context gathered, please write the final photo insight: a title and a detailed personal summary. Write in first person as Cameron.",
|
||||||
|
));
|
||||||
|
let final_response = ollama_client.chat_with_tools(messages, vec![]).await?;
|
||||||
|
final_content = final_response.content;
|
||||||
|
}
|
||||||
|
|
||||||
|
loop_cx
|
||||||
|
.span()
|
||||||
|
.set_attribute(KeyValue::new("iterations_used", iterations_used as i64));
|
||||||
|
loop_cx.span().set_status(Status::Ok);
|
||||||
|
|
||||||
|
// 12. Generate title
|
||||||
|
let title = ollama_client
|
||||||
|
.generate_photo_title(&final_content, custom_system_prompt.as_deref())
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
log::info!("Agentic generated title: {}", title);
|
||||||
|
log::info!(
|
||||||
|
"Agentic generated summary ({} chars): {}",
|
||||||
|
final_content.len(),
|
||||||
|
&final_content[..final_content.len().min(200)]
|
||||||
|
);
|
||||||
|
|
||||||
|
// 13. Store
|
||||||
|
let insight = InsertPhotoInsight {
|
||||||
|
file_path: file_path.to_string(),
|
||||||
|
title,
|
||||||
|
summary: final_content,
|
||||||
|
generated_at: Utc::now().timestamp(),
|
||||||
|
model_version: ollama_client.primary_model.clone(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut dao = self.insight_dao.lock().expect("Unable to lock InsightDao");
|
||||||
|
let result = dao
|
||||||
|
.store_insight(&insight_cx, insight)
|
||||||
|
.map_err(|e| anyhow::anyhow!("Failed to store agentic insight: {:?}", e));
|
||||||
|
|
||||||
|
match &result {
|
||||||
|
Ok(_) => {
|
||||||
|
log::info!("Successfully stored agentic insight for {}", file_path);
|
||||||
|
insight_cx.span().set_status(Status::Ok);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
log::error!("Failed to store agentic insight: {:?}", e);
|
||||||
|
insight_cx.span().set_status(Status::error(e.to_string()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
result?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Reverse geocode GPS coordinates to human-readable place names
|
/// Reverse geocode GPS coordinates to human-readable place names
|
||||||
async fn reverse_geocode(&self, lat: f64, lon: f64) -> Option<String> {
|
async fn reverse_geocode(&self, lat: f64, lon: f64) -> Option<String> {
|
||||||
let url = format!(
|
let url = format!(
|
||||||
@@ -1404,18 +2269,29 @@ mod tests {
|
|||||||
Some("vacation, hiking, mountains".to_string()),
|
Some("vacation, hiking, mountains".to_string()),
|
||||||
);
|
);
|
||||||
assert!(result.contains("## Tags"), "Should include Tags section");
|
assert!(result.contains("## Tags"), "Should include Tags section");
|
||||||
assert!(result.contains("vacation, hiking, mountains"), "Should include tag names");
|
assert!(
|
||||||
|
result.contains("vacation, hiking, mountains"),
|
||||||
|
"Should include tag names"
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn combine_contexts_omits_tags_section_when_no_tags() {
|
fn combine_contexts_omits_tags_section_when_no_tags() {
|
||||||
let result = InsightGenerator::combine_contexts(
|
let result = InsightGenerator::combine_contexts(
|
||||||
Some("some messages".to_string()),
|
Some("some messages".to_string()),
|
||||||
None, None, None,
|
None,
|
||||||
|
None,
|
||||||
|
None,
|
||||||
None, // no tags
|
None, // no tags
|
||||||
);
|
);
|
||||||
assert!(!result.contains("## Tags"), "Should not include Tags section when None");
|
assert!(
|
||||||
assert!(result.contains("## Messages"), "Should still include Messages");
|
!result.contains("## Tags"),
|
||||||
|
"Should not include Tags section when None"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
result.contains("## Messages"),
|
||||||
|
"Should still include Messages"
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|||||||
@@ -8,8 +8,8 @@ pub mod sms_client;
|
|||||||
#[allow(unused_imports)]
|
#[allow(unused_imports)]
|
||||||
pub use daily_summary_job::{generate_daily_summaries, strip_summary_boilerplate};
|
pub use daily_summary_job::{generate_daily_summaries, strip_summary_boilerplate};
|
||||||
pub use handlers::{
|
pub use handlers::{
|
||||||
delete_insight_handler, generate_insight_handler, get_all_insights_handler,
|
delete_insight_handler, generate_agentic_insight_handler, generate_insight_handler,
|
||||||
get_available_models_handler, get_insight_handler,
|
get_all_insights_handler, get_available_models_handler, get_insight_handler,
|
||||||
};
|
};
|
||||||
pub use insight_generator::InsightGenerator;
|
pub use insight_generator::InsightGenerator;
|
||||||
pub use ollama::{ModelCapabilities, OllamaClient};
|
pub use ollama::{ModelCapabilities, OllamaClient};
|
||||||
|
|||||||
244
src/ai/ollama.rs
244
src/ai/ollama.rs
@@ -1,4 +1,4 @@
|
|||||||
use anyhow::Result;
|
use anyhow::{Context, Result};
|
||||||
use chrono::NaiveDate;
|
use chrono::NaiveDate;
|
||||||
use reqwest::Client;
|
use reqwest::Client;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
@@ -176,10 +176,13 @@ impl OllamaClient {
|
|||||||
|
|
||||||
// Check if "vision" is in the capabilities array
|
// Check if "vision" is in the capabilities array
|
||||||
let has_vision = show_response.capabilities.iter().any(|cap| cap == "vision");
|
let has_vision = show_response.capabilities.iter().any(|cap| cap == "vision");
|
||||||
|
// Check if "tools" is in the capabilities array
|
||||||
|
let has_tool_calling = show_response.capabilities.iter().any(|cap| cap == "tools");
|
||||||
|
|
||||||
Ok(ModelCapabilities {
|
Ok(ModelCapabilities {
|
||||||
name: model_name.to_string(),
|
name: model_name.to_string(),
|
||||||
has_vision,
|
has_vision,
|
||||||
|
has_tool_calling,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -206,10 +209,11 @@ impl OllamaClient {
|
|||||||
Ok(cap) => capabilities.push(cap),
|
Ok(cap) => capabilities.push(cap),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
log::warn!("Failed to get capabilities for model {}: {}", model_name, e);
|
log::warn!("Failed to get capabilities for model {}: {}", model_name, e);
|
||||||
// Fallback: assume no vision if we can't check
|
// Fallback: assume no vision/tools if we can't check
|
||||||
capabilities.push(ModelCapabilities {
|
capabilities.push(ModelCapabilities {
|
||||||
name: model_name,
|
name: model_name,
|
||||||
has_vision: false,
|
has_vision: false,
|
||||||
|
has_tool_calling: false,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -254,7 +258,7 @@ impl OllamaClient {
|
|||||||
prompt: prompt.to_string(),
|
prompt: prompt.to_string(),
|
||||||
stream: false,
|
stream: false,
|
||||||
system: system.map(|s| s.to_string()),
|
system: system.map(|s| s.to_string()),
|
||||||
options: self.num_ctx.map(|ctx| OllamaOptions { num_ctx: ctx }),
|
options: self.num_ctx.map(|ctx| OllamaOptions { num_ctx: Some(ctx) }),
|
||||||
images,
|
images,
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -496,6 +500,132 @@ Analyze the image and use specific details from both the visual content and the
|
|||||||
Ok(description.trim().to_string())
|
Ok(description.trim().to_string())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Send a chat request with tool definitions to /api/chat.
|
||||||
|
/// Returns the assistant's response message (may contain tool_calls or final content).
|
||||||
|
/// Uses primary/fallback URL routing same as other generation methods.
|
||||||
|
pub async fn chat_with_tools(
|
||||||
|
&self,
|
||||||
|
messages: Vec<ChatMessage>,
|
||||||
|
tools: Vec<Tool>,
|
||||||
|
) -> Result<ChatMessage> {
|
||||||
|
// Try primary server first
|
||||||
|
log::info!(
|
||||||
|
"Attempting chat_with_tools with primary server: {} (model: {})",
|
||||||
|
self.primary_url,
|
||||||
|
self.primary_model
|
||||||
|
);
|
||||||
|
let primary_result = self
|
||||||
|
.try_chat_with_tools(&self.primary_url, messages.clone(), tools.clone())
|
||||||
|
.await;
|
||||||
|
|
||||||
|
match primary_result {
|
||||||
|
Ok(response) => {
|
||||||
|
log::info!("Successfully got chat_with_tools response from primary server");
|
||||||
|
Ok(response)
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
log::warn!("Primary server chat_with_tools failed: {}", e);
|
||||||
|
|
||||||
|
// Try fallback server if available
|
||||||
|
if let Some(fallback_url) = &self.fallback_url {
|
||||||
|
let fallback_model =
|
||||||
|
self.fallback_model.as_ref().unwrap_or(&self.primary_model);
|
||||||
|
|
||||||
|
log::info!(
|
||||||
|
"Attempting chat_with_tools with fallback server: {} (model: {})",
|
||||||
|
fallback_url,
|
||||||
|
fallback_model
|
||||||
|
);
|
||||||
|
match self
|
||||||
|
.try_chat_with_tools(fallback_url, messages, tools)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(response) => {
|
||||||
|
log::info!(
|
||||||
|
"Successfully got chat_with_tools response from fallback server"
|
||||||
|
);
|
||||||
|
Ok(response)
|
||||||
|
}
|
||||||
|
Err(fallback_e) => {
|
||||||
|
log::error!(
|
||||||
|
"Fallback server chat_with_tools also failed: {}",
|
||||||
|
fallback_e
|
||||||
|
);
|
||||||
|
Err(anyhow::anyhow!(
|
||||||
|
"Both primary and fallback servers failed. Primary: {}, Fallback: {}",
|
||||||
|
e,
|
||||||
|
fallback_e
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log::error!("No fallback server configured");
|
||||||
|
Err(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn try_chat_with_tools(
|
||||||
|
&self,
|
||||||
|
base_url: &str,
|
||||||
|
messages: Vec<ChatMessage>,
|
||||||
|
tools: Vec<Tool>,
|
||||||
|
) -> Result<ChatMessage> {
|
||||||
|
let url = format!("{}/api/chat", base_url);
|
||||||
|
let model = if base_url == self.primary_url {
|
||||||
|
&self.primary_model
|
||||||
|
} else {
|
||||||
|
self.fallback_model
|
||||||
|
.as_deref()
|
||||||
|
.unwrap_or(&self.primary_model)
|
||||||
|
};
|
||||||
|
|
||||||
|
let options = self.num_ctx.map(|ctx| OllamaOptions { num_ctx: Some(ctx) });
|
||||||
|
|
||||||
|
let request_body = OllamaChatRequest {
|
||||||
|
model,
|
||||||
|
messages: &messages,
|
||||||
|
stream: false,
|
||||||
|
tools,
|
||||||
|
options,
|
||||||
|
};
|
||||||
|
|
||||||
|
let request_json = serde_json::to_string(&request_body)
|
||||||
|
.unwrap_or_else(|e| format!("<serialization error: {}>", e));
|
||||||
|
log::debug!("chat_with_tools request body: {}", request_json);
|
||||||
|
|
||||||
|
let response = self
|
||||||
|
.client
|
||||||
|
.post(&url)
|
||||||
|
.json(&request_body)
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.with_context(|| format!("Failed to connect to Ollama at {}", url))?;
|
||||||
|
|
||||||
|
if !response.status().is_success() {
|
||||||
|
let status = response.status();
|
||||||
|
let body = response.text().await.unwrap_or_default();
|
||||||
|
log::error!(
|
||||||
|
"chat_with_tools request body that caused {}: {}",
|
||||||
|
status,
|
||||||
|
request_json
|
||||||
|
);
|
||||||
|
anyhow::bail!(
|
||||||
|
"Ollama chat request failed with status {}: {}",
|
||||||
|
status,
|
||||||
|
body
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
let chat_response: OllamaChatResponse = response
|
||||||
|
.json()
|
||||||
|
.await
|
||||||
|
.with_context(|| "Failed to parse Ollama chat response")?;
|
||||||
|
|
||||||
|
Ok(chat_response.message)
|
||||||
|
}
|
||||||
|
|
||||||
/// Generate an embedding vector for text using nomic-embed-text:v1.5
|
/// Generate an embedding vector for text using nomic-embed-text:v1.5
|
||||||
/// Returns a 768-dimensional vector as Vec<f32>
|
/// Returns a 768-dimensional vector as Vec<f32>
|
||||||
pub async fn generate_embedding(&self, text: &str) -> Result<Vec<f32>> {
|
pub async fn generate_embedding(&self, text: &str) -> Result<Vec<f32>> {
|
||||||
@@ -640,7 +770,112 @@ struct OllamaRequest {
|
|||||||
|
|
||||||
#[derive(Serialize)]
|
#[derive(Serialize)]
|
||||||
struct OllamaOptions {
|
struct OllamaOptions {
|
||||||
num_ctx: i32,
|
num_ctx: Option<i32>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Tool definition sent in /api/chat requests (OpenAI-compatible format)
|
||||||
|
#[derive(Serialize, Clone, Debug)]
|
||||||
|
pub struct Tool {
|
||||||
|
#[serde(rename = "type")]
|
||||||
|
pub tool_type: String, // always "function"
|
||||||
|
pub function: ToolFunction,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Clone, Debug)]
|
||||||
|
pub struct ToolFunction {
|
||||||
|
pub name: String,
|
||||||
|
pub description: String,
|
||||||
|
pub parameters: serde_json::Value,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Tool {
|
||||||
|
pub fn function(name: &str, description: &str, parameters: serde_json::Value) -> Self {
|
||||||
|
Self {
|
||||||
|
tool_type: "function".to_string(),
|
||||||
|
function: ToolFunction {
|
||||||
|
name: name.to_string(),
|
||||||
|
description: description.to_string(),
|
||||||
|
parameters,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A message in the chat conversation history
|
||||||
|
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||||
|
pub struct ChatMessage {
|
||||||
|
pub role: String, // "system" | "user" | "assistant" | "tool"
|
||||||
|
/// Empty string (not null) when tool_calls is present — Ollama quirk
|
||||||
|
#[serde(default)]
|
||||||
|
pub content: String,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub tool_calls: Option<Vec<ToolCall>>,
|
||||||
|
/// Base64 images — only on user messages to vision-capable models
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub images: Option<Vec<String>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ChatMessage {
|
||||||
|
pub fn system(content: impl Into<String>) -> Self {
|
||||||
|
Self {
|
||||||
|
role: "system".to_string(),
|
||||||
|
content: content.into(),
|
||||||
|
tool_calls: None,
|
||||||
|
images: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn user(content: impl Into<String>) -> Self {
|
||||||
|
Self {
|
||||||
|
role: "user".to_string(),
|
||||||
|
content: content.into(),
|
||||||
|
tool_calls: None,
|
||||||
|
images: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn tool_result(content: impl Into<String>) -> Self {
|
||||||
|
Self {
|
||||||
|
role: "tool".to_string(),
|
||||||
|
content: content.into(),
|
||||||
|
tool_calls: None,
|
||||||
|
images: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Tool call returned by the model in an assistant message
|
||||||
|
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||||
|
pub struct ToolCall {
|
||||||
|
pub function: ToolCallFunction,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub id: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||||
|
pub struct ToolCallFunction {
|
||||||
|
pub name: String,
|
||||||
|
/// Native JSON object (NOT a JSON-encoded string like OpenAI)
|
||||||
|
pub arguments: serde_json::Value,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize)]
|
||||||
|
struct OllamaChatRequest<'a> {
|
||||||
|
model: &'a str,
|
||||||
|
messages: &'a [ChatMessage],
|
||||||
|
stream: bool,
|
||||||
|
#[serde(skip_serializing_if = "Vec::is_empty")]
|
||||||
|
tools: Vec<Tool>,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
options: Option<OllamaOptions>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize, Debug)]
|
||||||
|
struct OllamaChatResponse {
|
||||||
|
message: ChatMessage,
|
||||||
|
#[allow(dead_code)]
|
||||||
|
done: bool,
|
||||||
|
#[serde(default)]
|
||||||
|
#[allow(dead_code)]
|
||||||
|
done_reason: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
#[derive(Deserialize)]
|
||||||
@@ -668,6 +903,7 @@ struct OllamaShowResponse {
|
|||||||
pub struct ModelCapabilities {
|
pub struct ModelCapabilities {
|
||||||
pub name: String,
|
pub name: String,
|
||||||
pub has_vision: bool,
|
pub has_vision: bool,
|
||||||
|
pub has_tool_calling: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize)]
|
#[derive(Serialize)]
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ use std::ops::DerefMut;
|
|||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
use crate::database::models::{InsertVideoPreviewClip, VideoPreviewClip};
|
use crate::database::models::{InsertVideoPreviewClip, VideoPreviewClip};
|
||||||
use crate::database::{connect, DbError, DbErrorKind};
|
use crate::database::{DbError, DbErrorKind, connect};
|
||||||
use crate::otel::trace_db_call;
|
use crate::otel::trace_db_call;
|
||||||
|
|
||||||
pub trait PreviewDao: Sync + Send {
|
pub trait PreviewDao: Sync + Send {
|
||||||
@@ -232,10 +232,7 @@ mod tests {
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
// Status should remain "pending" from the first insert
|
// Status should remain "pending" from the first insert
|
||||||
let clip = dao
|
let clip = dao.get_preview(&ctx, "photos/video.mp4").unwrap().unwrap();
|
||||||
.get_preview(&ctx, "photos/video.mp4")
|
|
||||||
.unwrap()
|
|
||||||
.unwrap();
|
|
||||||
assert_eq!(clip.status, "pending");
|
assert_eq!(clip.status, "pending");
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -256,10 +253,7 @@ mod tests {
|
|||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let clip = dao
|
let clip = dao.get_preview(&ctx, "photos/video.mp4").unwrap().unwrap();
|
||||||
.get_preview(&ctx, "photos/video.mp4")
|
|
||||||
.unwrap()
|
|
||||||
.unwrap();
|
|
||||||
assert_eq!(clip.status, "complete");
|
assert_eq!(clip.status, "complete");
|
||||||
assert_eq!(clip.duration_seconds, Some(9.5));
|
assert_eq!(clip.duration_seconds, Some(9.5));
|
||||||
assert_eq!(clip.file_size_bytes, Some(1024000));
|
assert_eq!(clip.file_size_bytes, Some(1024000));
|
||||||
@@ -283,10 +277,7 @@ mod tests {
|
|||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let clip = dao
|
let clip = dao.get_preview(&ctx, "photos/video.mp4").unwrap().unwrap();
|
||||||
.get_preview(&ctx, "photos/video.mp4")
|
|
||||||
.unwrap()
|
|
||||||
.unwrap();
|
|
||||||
assert_eq!(clip.status, "failed");
|
assert_eq!(clip.status, "failed");
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
clip.error_message.as_deref(),
|
clip.error_message.as_deref(),
|
||||||
|
|||||||
@@ -1490,7 +1490,8 @@ mod tests {
|
|||||||
let request: Query<FilesRequest> =
|
let request: Query<FilesRequest> =
|
||||||
Query::from_query("path=&tag_ids=1,3&recursive=true").unwrap();
|
Query::from_query("path=&tag_ids=1,3&recursive=true").unwrap();
|
||||||
|
|
||||||
let mut tag_dao = SqliteTagDao::new(std::sync::Arc::new(Mutex::new(in_memory_db_connection())));
|
let mut tag_dao =
|
||||||
|
SqliteTagDao::new(std::sync::Arc::new(Mutex::new(in_memory_db_connection())));
|
||||||
|
|
||||||
let tag1 = tag_dao
|
let tag1 = tag_dao
|
||||||
.create_tag(&opentelemetry::Context::current(), "tag1")
|
.create_tag(&opentelemetry::Context::current(), "tag1")
|
||||||
@@ -1536,7 +1537,8 @@ mod tests {
|
|||||||
exp: 12345,
|
exp: 12345,
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut tag_dao = SqliteTagDao::new(std::sync::Arc::new(Mutex::new(in_memory_db_connection())));
|
let mut tag_dao =
|
||||||
|
SqliteTagDao::new(std::sync::Arc::new(Mutex::new(in_memory_db_connection())));
|
||||||
|
|
||||||
let tag1 = tag_dao
|
let tag1 = tag_dao
|
||||||
.create_tag(&opentelemetry::Context::current(), "tag1")
|
.create_tag(&opentelemetry::Context::current(), "tag1")
|
||||||
|
|||||||
59
src/main.rs
59
src/main.rs
@@ -600,8 +600,7 @@ async fn get_video_preview(
|
|||||||
Some(path) => path,
|
Some(path) => path,
|
||||||
None => {
|
None => {
|
||||||
span.set_status(Status::error("Invalid path"));
|
span.set_status(Status::error("Invalid path"));
|
||||||
return HttpResponse::BadRequest()
|
return HttpResponse::BadRequest().json(serde_json::json!({"error": "Invalid path"}));
|
||||||
.json(serde_json::json!({"error": "Invalid path"}));
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -634,8 +633,7 @@ async fn get_video_preview(
|
|||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
// File missing on disk but DB says complete - reset and regenerate
|
// File missing on disk but DB says complete - reset and regenerate
|
||||||
let mut dao =
|
let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao");
|
||||||
preview_dao.lock().expect("Unable to lock PreviewDao");
|
|
||||||
let _ = dao.update_status(
|
let _ = dao.update_status(
|
||||||
&context,
|
&context,
|
||||||
&relative_path,
|
&relative_path,
|
||||||
@@ -665,12 +663,10 @@ async fn get_video_preview(
|
|||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
"failed" => {
|
"failed" => {
|
||||||
let error_msg =
|
let error_msg = clip
|
||||||
clip.error_message.unwrap_or_else(|| "Unknown error".to_string());
|
.error_message
|
||||||
span.set_status(Status::error(format!(
|
.unwrap_or_else(|| "Unknown error".to_string());
|
||||||
"Generation failed: {}",
|
span.set_status(Status::error(format!("Generation failed: {}", error_msg)));
|
||||||
error_msg
|
|
||||||
)));
|
|
||||||
HttpResponse::InternalServerError().json(serde_json::json!({
|
HttpResponse::InternalServerError().json(serde_json::json!({
|
||||||
"error": format!("Generation failed: {}", error_msg)
|
"error": format!("Generation failed: {}", error_msg)
|
||||||
}))
|
}))
|
||||||
@@ -708,8 +704,7 @@ async fn get_video_preview(
|
|||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
span.set_status(Status::error("Database error"));
|
span.set_status(Status::error("Database error"));
|
||||||
HttpResponse::InternalServerError()
|
HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
|
||||||
.json(serde_json::json!({"error": "Database error"}))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -768,10 +763,7 @@ async fn get_preview_status(
|
|||||||
path: path.clone(),
|
path: path.clone(),
|
||||||
status: clip.status.clone(),
|
status: clip.status.clone(),
|
||||||
preview_url: if clip.status == "complete" {
|
preview_url: if clip.status == "complete" {
|
||||||
Some(format!(
|
Some(format!("/video/preview?path={}", urlencoding::encode(path)))
|
||||||
"/video/preview?path={}",
|
|
||||||
urlencoding::encode(path)
|
|
||||||
))
|
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
},
|
},
|
||||||
@@ -810,8 +802,7 @@ async fn get_preview_status(
|
|||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
span.set_status(Status::error("Database error"));
|
span.set_status(Status::error("Database error"));
|
||||||
HttpResponse::InternalServerError()
|
HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
|
||||||
.json(serde_json::json!({"error": "Database error"}))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1189,6 +1180,7 @@ fn main() -> std::io::Result<()> {
|
|||||||
.service(get_file_metadata)
|
.service(get_file_metadata)
|
||||||
.service(memories::list_memories)
|
.service(memories::list_memories)
|
||||||
.service(ai::generate_insight_handler)
|
.service(ai::generate_insight_handler)
|
||||||
|
.service(ai::generate_agentic_insight_handler)
|
||||||
.service(ai::get_insight_handler)
|
.service(ai::get_insight_handler)
|
||||||
.service(ai::delete_insight_handler)
|
.service(ai::delete_insight_handler)
|
||||||
.service(ai::get_all_insights_handler)
|
.service(ai::get_all_insights_handler)
|
||||||
@@ -1212,9 +1204,7 @@ fn main() -> std::io::Result<()> {
|
|||||||
.app_data::<Data<Mutex<Box<dyn PreviewDao>>>>(Data::new(Mutex::new(Box::new(
|
.app_data::<Data<Mutex<Box<dyn PreviewDao>>>>(Data::new(Mutex::new(Box::new(
|
||||||
preview_dao,
|
preview_dao,
|
||||||
))))
|
))))
|
||||||
.app_data(
|
.app_data(web::JsonConfig::default().error_handler(|err, req| {
|
||||||
web::JsonConfig::default()
|
|
||||||
.error_handler(|err, req| {
|
|
||||||
let detail = err.to_string();
|
let detail = err.to_string();
|
||||||
log::warn!(
|
log::warn!(
|
||||||
"JSON parse error on {} {}: {}",
|
"JSON parse error on {} {}: {}",
|
||||||
@@ -1222,11 +1212,10 @@ fn main() -> std::io::Result<()> {
|
|||||||
req.uri(),
|
req.uri(),
|
||||||
detail
|
detail
|
||||||
);
|
);
|
||||||
let response = HttpResponse::BadRequest()
|
let response =
|
||||||
.json(serde_json::json!({"error": detail}));
|
HttpResponse::BadRequest().json(serde_json::json!({"error": detail}));
|
||||||
actix_web::error::InternalError::from_response(err, response).into()
|
actix_web::error::InternalError::from_response(err, response).into()
|
||||||
}),
|
}))
|
||||||
)
|
|
||||||
.app_data::<Data<InsightGenerator>>(Data::new(app_data.insight_generator.clone()))
|
.app_data::<Data<InsightGenerator>>(Data::new(app_data.insight_generator.clone()))
|
||||||
.wrap(prometheus.clone())
|
.wrap(prometheus.clone())
|
||||||
})
|
})
|
||||||
@@ -1764,9 +1753,7 @@ mod tests {
|
|||||||
// Verify the DAO now has a pending record
|
// Verify the DAO now has a pending record
|
||||||
let mut dao_lock = preview_dao.lock().unwrap();
|
let mut dao_lock = preview_dao.lock().unwrap();
|
||||||
let ctx = opentelemetry::Context::new();
|
let ctx = opentelemetry::Context::new();
|
||||||
let clip = dao_lock
|
let clip = dao_lock.get_preview(&ctx, "photos/new_video.mp4").unwrap();
|
||||||
.get_preview(&ctx, "photos/new_video.mp4")
|
|
||||||
.unwrap();
|
|
||||||
assert!(clip.is_some());
|
assert!(clip.is_some());
|
||||||
assert_eq!(clip.unwrap().status, "pending");
|
assert_eq!(clip.unwrap().status, "pending");
|
||||||
}
|
}
|
||||||
@@ -1777,7 +1764,14 @@ mod tests {
|
|||||||
let ctx = opentelemetry::Context::new();
|
let ctx = opentelemetry::Context::new();
|
||||||
dao.insert_preview(&ctx, "photos/done.mp4", "pending")
|
dao.insert_preview(&ctx, "photos/done.mp4", "pending")
|
||||||
.unwrap();
|
.unwrap();
|
||||||
dao.update_status(&ctx, "photos/done.mp4", "complete", Some(9.5), Some(500000), None)
|
dao.update_status(
|
||||||
|
&ctx,
|
||||||
|
"photos/done.mp4",
|
||||||
|
"complete",
|
||||||
|
Some(9.5),
|
||||||
|
Some(500000),
|
||||||
|
None,
|
||||||
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let preview_dao = make_preview_dao(dao);
|
let preview_dao = make_preview_dao(dao);
|
||||||
@@ -1805,7 +1799,12 @@ mod tests {
|
|||||||
let previews = body["previews"].as_array().unwrap();
|
let previews = body["previews"].as_array().unwrap();
|
||||||
assert_eq!(previews.len(), 1);
|
assert_eq!(previews.len(), 1);
|
||||||
assert_eq!(previews[0]["status"], "complete");
|
assert_eq!(previews[0]["status"], "complete");
|
||||||
assert!(previews[0]["preview_url"].as_str().unwrap().contains("photos%2Fdone.mp4"));
|
assert!(
|
||||||
|
previews[0]["preview_url"]
|
||||||
|
.as_str()
|
||||||
|
.unwrap()
|
||||||
|
.contains("photos%2Fdone.mp4")
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
|
|||||||
14
src/state.rs
14
src/state.rs
@@ -46,11 +46,8 @@ impl AppState {
|
|||||||
let video_playlist_manager =
|
let video_playlist_manager =
|
||||||
VideoPlaylistManager::new(video_path.clone(), playlist_generator.start());
|
VideoPlaylistManager::new(video_path.clone(), playlist_generator.start());
|
||||||
|
|
||||||
let preview_clip_generator = PreviewClipGenerator::new(
|
let preview_clip_generator =
|
||||||
preview_clips_path.clone(),
|
PreviewClipGenerator::new(preview_clips_path.clone(), base_path.clone(), preview_dao);
|
||||||
base_path.clone(),
|
|
||||||
preview_dao,
|
|
||||||
);
|
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
stream_manager,
|
stream_manager,
|
||||||
@@ -141,9 +138,10 @@ impl Default for AppState {
|
|||||||
);
|
);
|
||||||
|
|
||||||
// Ensure preview clips directory exists
|
// Ensure preview clips directory exists
|
||||||
let preview_clips_path = env::var("PREVIEW_CLIPS_DIRECTORY")
|
let preview_clips_path =
|
||||||
.unwrap_or_else(|_| "preview_clips".to_string());
|
env::var("PREVIEW_CLIPS_DIRECTORY").unwrap_or_else(|_| "preview_clips".to_string());
|
||||||
std::fs::create_dir_all(&preview_clips_path).expect("Failed to create PREVIEW_CLIPS_DIRECTORY");
|
std::fs::create_dir_all(&preview_clips_path)
|
||||||
|
.expect("Failed to create PREVIEW_CLIPS_DIRECTORY");
|
||||||
|
|
||||||
Self::new(
|
Self::new(
|
||||||
Arc::new(StreamActor {}.start()),
|
Arc::new(StreamActor {}.start()),
|
||||||
|
|||||||
@@ -159,12 +159,14 @@ async fn get_video_rotation(video_path: &str) -> i32 {
|
|||||||
.await;
|
.await;
|
||||||
|
|
||||||
if let Ok(output) = output
|
if let Ok(output) = output
|
||||||
&& output.status.success() {
|
&& output.status.success()
|
||||||
|
{
|
||||||
let rotation_str = String::from_utf8_lossy(&output.stdout);
|
let rotation_str = String::from_utf8_lossy(&output.stdout);
|
||||||
let rotation_str = rotation_str.trim();
|
let rotation_str = rotation_str.trim();
|
||||||
if !rotation_str.is_empty()
|
if !rotation_str.is_empty()
|
||||||
&& let Ok(rotation) = rotation_str.parse::<i32>()
|
&& let Ok(rotation) = rotation_str.parse::<i32>()
|
||||||
&& rotation != 0 {
|
&& rotation != 0
|
||||||
|
{
|
||||||
debug!(
|
debug!(
|
||||||
"Detected rotation {}° from stream tag for {}",
|
"Detected rotation {}° from stream tag for {}",
|
||||||
rotation, video_path
|
rotation, video_path
|
||||||
@@ -188,11 +190,13 @@ async fn get_video_rotation(video_path: &str) -> i32 {
|
|||||||
.await;
|
.await;
|
||||||
|
|
||||||
if let Ok(output) = output
|
if let Ok(output) = output
|
||||||
&& output.status.success() {
|
&& output.status.success()
|
||||||
|
{
|
||||||
let rotation_str = String::from_utf8_lossy(&output.stdout);
|
let rotation_str = String::from_utf8_lossy(&output.stdout);
|
||||||
let rotation_str = rotation_str.trim();
|
let rotation_str = rotation_str.trim();
|
||||||
if !rotation_str.is_empty()
|
if !rotation_str.is_empty()
|
||||||
&& let Ok(rotation) = rotation_str.parse::<f64>() {
|
&& let Ok(rotation) = rotation_str.parse::<f64>()
|
||||||
|
{
|
||||||
let rotation = rotation.abs() as i32;
|
let rotation = rotation.abs() as i32;
|
||||||
if rotation != 0 {
|
if rotation != 0 {
|
||||||
debug!(
|
debug!(
|
||||||
@@ -550,7 +554,8 @@ impl Handler<GeneratePreviewClipMessage> for PreviewClipGenerator {
|
|||||||
{
|
{
|
||||||
let otel_ctx = opentelemetry::Context::current();
|
let otel_ctx = opentelemetry::Context::current();
|
||||||
let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao");
|
let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao");
|
||||||
let _ = dao.update_status(&otel_ctx, &relative_path, "processing", None, None, None);
|
let _ =
|
||||||
|
dao.update_status(&otel_ctx, &relative_path, "processing", None, None, None);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Compute output path: join preview_clips_dir with relative path, change ext to .mp4
|
// Compute output path: join preview_clips_dir with relative path, change ext to .mp4
|
||||||
|
|||||||
@@ -183,7 +183,11 @@ impl Ffmpeg {
|
|||||||
Ok(output_file.to_string())
|
Ok(output_file.to_string())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn create_gif_from_frames(&self, frame_base_dir: &str, output_file: &str) -> Result<i32> {
|
pub async fn create_gif_from_frames(
|
||||||
|
&self,
|
||||||
|
frame_base_dir: &str,
|
||||||
|
output_file: &str,
|
||||||
|
) -> Result<i32> {
|
||||||
let output = Command::new("ffmpeg")
|
let output = Command::new("ffmpeg")
|
||||||
.arg("-y")
|
.arg("-y")
|
||||||
.args(["-framerate", "4"])
|
.args(["-framerate", "4"])
|
||||||
@@ -278,10 +282,7 @@ pub async fn generate_preview_clip(input_file: &str, output_file: &str) -> Resul
|
|||||||
"select='lt(mod(t,{:.4}),1)',setpts=N/FRAME_RATE/TB,fps=30,scale=-2:480,format=yuv420p",
|
"select='lt(mod(t,{:.4}),1)',setpts=N/FRAME_RATE/TB,fps=30,scale=-2:480,format=yuv420p",
|
||||||
interval
|
interval
|
||||||
);
|
);
|
||||||
let af = format!(
|
let af = format!("aselect='lt(mod(t,{:.4}),1)',asetpts=N/SR/TB", interval);
|
||||||
"aselect='lt(mod(t,{:.4}),1)',asetpts=N/SR/TB",
|
|
||||||
interval
|
|
||||||
);
|
|
||||||
|
|
||||||
cmd.args(["-vf", &vf]);
|
cmd.args(["-vf", &vf]);
|
||||||
cmd.args(["-af", &af]);
|
cmd.args(["-af", &af]);
|
||||||
@@ -326,7 +327,10 @@ pub async fn generate_preview_clip(input_file: &str, output_file: &str) -> Resul
|
|||||||
|
|
||||||
info!(
|
info!(
|
||||||
"Generated preview clip '{}' ({:.1}s, {} bytes) in {:?}",
|
"Generated preview clip '{}' ({:.1}s, {} bytes) in {:?}",
|
||||||
output_file, clip_duration, file_size, start.elapsed()
|
output_file,
|
||||||
|
clip_duration,
|
||||||
|
file_size,
|
||||||
|
start.elapsed()
|
||||||
);
|
);
|
||||||
|
|
||||||
Ok((clip_duration, file_size))
|
Ok((clip_duration, file_size))
|
||||||
|
|||||||
Reference in New Issue
Block a user