diff --git a/src/ai/insight_generator.rs b/src/ai/insight_generator.rs index 2027fd2..67a7358 100644 --- a/src/ai/insight_generator.rs +++ b/src/ai/insight_generator.rs @@ -738,7 +738,11 @@ impl InsightGenerator { .map(|t| t.name) .collect() }; - log::info!("Fetched {} tags for photo: {:?}", tag_names.len(), tag_names); + log::info!( + "Fetched {} tags for photo: {:?}", + tag_names.len(), + tag_names + ); // 4. Get location name from GPS coordinates (needed for RAG query) let location = match exif { @@ -827,7 +831,10 @@ impl InsightGenerator { Some(desc) } Err(e) => { - log::warn!("Failed to generate photo description for RAG enrichment: {}", e); + log::warn!( + "Failed to generate photo description for RAG enrichment: {}", + e + ); None } } @@ -845,7 +852,11 @@ impl InsightGenerator { if !tag_names.is_empty() { parts.push(format!("tags: {}", tag_names.join(", "))); } - if parts.is_empty() { None } else { Some(parts.join(". ")) } + if parts.is_empty() { + None + } else { + Some(parts.join(". ")) + } }; let mut search_enrichment: Option = enriched_query.clone(); @@ -900,7 +911,11 @@ impl InsightGenerator { if !tag_names.is_empty() { parts.push(format!("tags: {}", tag_names.join(", "))); } - if parts.is_empty() { None } else { Some(parts.join(". ")) } + if parts.is_empty() { + None + } else { + Some(parts.join(". ")) + } }; // Step 3: Try historical RAG (>30 days ago) using extracted topics @@ -980,7 +995,14 @@ impl InsightGenerator { log::info!("No immediate messages found, trying basic RAG as fallback"); // Fallback to basic RAG even without strong query match self - .find_relevant_messages_rag(date_taken, None, contact.as_deref(), None, 20, enriched_query.as_deref()) + .find_relevant_messages_rag( + date_taken, + None, + contact.as_deref(), + None, + 20, + enriched_query.as_deref(), + ) .await { Ok(rag_messages) if !rag_messages.is_empty() => { @@ -1399,11 +1421,7 @@ Return ONLY the summary, nothing else."#, Ok(d) => d, Err(e) => return format!("Error: failed to parse date '{}': {}", date_str, e), }; - let timestamp = date - .and_hms_opt(12, 0, 0) - .unwrap() - .and_utc() - .timestamp(); + let timestamp = date.and_hms_opt(12, 0, 0).unwrap().and_utc().timestamp(); log::info!( "tool_get_sms_messages: date={}, contact='{}', days_radius={}", @@ -1442,7 +1460,11 @@ Return ONLY the summary, nothing else."#, format!("[{}] {}: {}", ts, sender, m.body) }) .collect(); - format!("Found {} messages:\n{}", messages.len(), formatted.join("\n")) + format!( + "Found {} messages:\n{}", + messages.len(), + formatted.join("\n") + ) } Ok(_) => "No messages found.".to_string(), Err(e) => format!("Error fetching SMS messages: {}", e), @@ -1468,11 +1490,7 @@ Return ONLY the summary, nothing else."#, Ok(d) => d, Err(e) => return format!("Error: failed to parse date '{}': {}", date_str, e), }; - let timestamp = date - .and_hms_opt(12, 0, 0) - .unwrap() - .and_utc() - .timestamp(); + let timestamp = date.and_hms_opt(12, 0, 0).unwrap().and_utc().timestamp(); log::info!( "tool_get_calendar_events: date={}, days_radius={}", @@ -1541,11 +1559,7 @@ Return ONLY the summary, nothing else."#, Ok(d) => d, Err(e) => return format!("Error: failed to parse date '{}': {}", date_str, e), }; - let timestamp = date - .and_hms_opt(12, 0, 0) - .unwrap() - .and_utc() - .timestamp(); + let timestamp = date.and_hms_opt(12, 0, 0).unwrap().and_utc().timestamp(); log::info!( "tool_get_location_history: date={}, days_radius={}", @@ -1805,12 +1819,10 @@ Return ONLY the summary, nothing else."#, // 2a. Verify the model exists on at least one server before checking capabilities if let Some(ref model_name) = custom_model { - let available_on_primary = OllamaClient::is_model_available( - &ollama_client.primary_url, - model_name, - ) - .await - .unwrap_or(false); + let available_on_primary = + OllamaClient::is_model_available(&ollama_client.primary_url, model_name) + .await + .unwrap_or(false); let available_on_fallback = if let Some(ref fallback_url) = ollama_client.fallback_url { OllamaClient::is_model_available(fallback_url, model_name) @@ -2002,28 +2014,28 @@ Return ONLY the summary, nothing else."#, messages.push(response.clone()); - if let Some(ref tool_calls) = response.tool_calls { - if !tool_calls.is_empty() { - for tool_call in tool_calls { - log::info!( - "Agentic tool call [{}]: {} {:?}", - iteration, - tool_call.function.name, - tool_call.function.arguments - ); - let result = self - .execute_tool( - &tool_call.function.name, - &tool_call.function.arguments, - &ollama_client, - &image_base64, - &loop_cx, - ) - .await; - messages.push(ChatMessage::tool_result(result)); - } - continue; + if let Some(ref tool_calls) = response.tool_calls + && !tool_calls.is_empty() + { + for tool_call in tool_calls { + log::info!( + "Agentic tool call [{}]: {} {:?}", + iteration, + tool_call.function.name, + tool_call.function.arguments + ); + let result = self + .execute_tool( + &tool_call.function.name, + &tool_call.function.arguments, + &ollama_client, + &image_base64, + &loop_cx, + ) + .await; + messages.push(ChatMessage::tool_result(result)); } + continue; } // No tool calls — this is the final answer @@ -2033,13 +2045,14 @@ Return ONLY the summary, nothing else."#, // If loop exhausted without final answer, ask for one if final_content.is_empty() { - log::info!("Agentic loop exhausted after {} iterations, requesting final answer", iterations_used); + log::info!( + "Agentic loop exhausted after {} iterations, requesting final answer", + iterations_used + ); messages.push(ChatMessage::user( "Based on the context gathered, please write the final photo insight: a title and a detailed personal summary. Write in first person as Cameron.", )); - let final_response = ollama_client - .chat_with_tools(messages, vec![]) - .await?; + let final_response = ollama_client.chat_with_tools(messages, vec![]).await?; final_content = final_response.content; } @@ -2179,18 +2192,29 @@ mod tests { Some("vacation, hiking, mountains".to_string()), ); assert!(result.contains("## Tags"), "Should include Tags section"); - assert!(result.contains("vacation, hiking, mountains"), "Should include tag names"); + assert!( + result.contains("vacation, hiking, mountains"), + "Should include tag names" + ); } #[test] fn combine_contexts_omits_tags_section_when_no_tags() { let result = InsightGenerator::combine_contexts( Some("some messages".to_string()), - None, None, None, + None, + None, + None, None, // no tags ); - assert!(!result.contains("## Tags"), "Should not include Tags section when None"); - assert!(result.contains("## Messages"), "Should still include Messages"); + assert!( + !result.contains("## Tags"), + "Should not include Tags section when None" + ); + assert!( + result.contains("## Messages"), + "Should still include Messages" + ); } #[test] diff --git a/src/ai/ollama.rs b/src/ai/ollama.rs index 481b716..38a0c6c 100644 --- a/src/ai/ollama.rs +++ b/src/ai/ollama.rs @@ -528,10 +528,8 @@ Analyze the image and use specific details from both the visual content and the // Try fallback server if available if let Some(fallback_url) = &self.fallback_url { - let fallback_model = self - .fallback_model - .as_ref() - .unwrap_or(&self.primary_model); + let fallback_model = + self.fallback_model.as_ref().unwrap_or(&self.primary_model); log::info!( "Attempting chat_with_tools with fallback server: {} (model: {})", @@ -578,7 +576,9 @@ Analyze the image and use specific details from both the visual content and the let model = if base_url == self.primary_url { &self.primary_model } else { - self.fallback_model.as_deref().unwrap_or(&self.primary_model) + self.fallback_model + .as_deref() + .unwrap_or(&self.primary_model) }; let options = self.num_ctx.map(|ctx| OllamaOptions { num_ctx: Some(ctx) }); @@ -602,7 +602,11 @@ Analyze the image and use specific details from both the visual content and the if !response.status().is_success() { let status = response.status(); let body = response.text().await.unwrap_or_default(); - anyhow::bail!("Ollama chat request failed with status {}: {}", status, body); + anyhow::bail!( + "Ollama chat request failed with status {}: {}", + status, + body + ); } let chat_response: OllamaChatResponse = response @@ -804,13 +808,28 @@ pub struct ChatMessage { impl ChatMessage { pub fn system(content: impl Into) -> Self { - Self { role: "system".to_string(), content: content.into(), tool_calls: None, images: None } + Self { + role: "system".to_string(), + content: content.into(), + tool_calls: None, + images: None, + } } pub fn user(content: impl Into) -> Self { - Self { role: "user".to_string(), content: content.into(), tool_calls: None, images: None } + Self { + role: "user".to_string(), + content: content.into(), + tool_calls: None, + images: None, + } } pub fn tool_result(content: impl Into) -> Self { - Self { role: "tool".to_string(), content: content.into(), tool_calls: None, images: None } + Self { + role: "tool".to_string(), + content: content.into(), + tool_calls: None, + images: None, + } } } diff --git a/src/database/preview_dao.rs b/src/database/preview_dao.rs index 7ff7618..fe90f4d 100644 --- a/src/database/preview_dao.rs +++ b/src/database/preview_dao.rs @@ -4,7 +4,7 @@ use std::ops::DerefMut; use std::sync::{Arc, Mutex}; use crate::database::models::{InsertVideoPreviewClip, VideoPreviewClip}; -use crate::database::{connect, DbError, DbErrorKind}; +use crate::database::{DbError, DbErrorKind, connect}; use crate::otel::trace_db_call; pub trait PreviewDao: Sync + Send { @@ -232,10 +232,7 @@ mod tests { .unwrap(); // Status should remain "pending" from the first insert - let clip = dao - .get_preview(&ctx, "photos/video.mp4") - .unwrap() - .unwrap(); + let clip = dao.get_preview(&ctx, "photos/video.mp4").unwrap().unwrap(); assert_eq!(clip.status, "pending"); } @@ -256,10 +253,7 @@ mod tests { ) .unwrap(); - let clip = dao - .get_preview(&ctx, "photos/video.mp4") - .unwrap() - .unwrap(); + let clip = dao.get_preview(&ctx, "photos/video.mp4").unwrap().unwrap(); assert_eq!(clip.status, "complete"); assert_eq!(clip.duration_seconds, Some(9.5)); assert_eq!(clip.file_size_bytes, Some(1024000)); @@ -283,10 +277,7 @@ mod tests { ) .unwrap(); - let clip = dao - .get_preview(&ctx, "photos/video.mp4") - .unwrap() - .unwrap(); + let clip = dao.get_preview(&ctx, "photos/video.mp4").unwrap().unwrap(); assert_eq!(clip.status, "failed"); assert_eq!( clip.error_message.as_deref(), diff --git a/src/files.rs b/src/files.rs index a7355fb..6f3adeb 100644 --- a/src/files.rs +++ b/src/files.rs @@ -1490,7 +1490,8 @@ mod tests { let request: Query = Query::from_query("path=&tag_ids=1,3&recursive=true").unwrap(); - let mut tag_dao = SqliteTagDao::new(std::sync::Arc::new(Mutex::new(in_memory_db_connection()))); + let mut tag_dao = + SqliteTagDao::new(std::sync::Arc::new(Mutex::new(in_memory_db_connection()))); let tag1 = tag_dao .create_tag(&opentelemetry::Context::current(), "tag1") @@ -1536,7 +1537,8 @@ mod tests { exp: 12345, }; - let mut tag_dao = SqliteTagDao::new(std::sync::Arc::new(Mutex::new(in_memory_db_connection()))); + let mut tag_dao = + SqliteTagDao::new(std::sync::Arc::new(Mutex::new(in_memory_db_connection()))); let tag1 = tag_dao .create_tag(&opentelemetry::Context::current(), "tag1") diff --git a/src/main.rs b/src/main.rs index 14ca74c..b56d43c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -600,8 +600,7 @@ async fn get_video_preview( Some(path) => path, None => { span.set_status(Status::error("Invalid path")); - return HttpResponse::BadRequest() - .json(serde_json::json!({"error": "Invalid path"})); + return HttpResponse::BadRequest().json(serde_json::json!({"error": "Invalid path"})); } }; @@ -634,8 +633,7 @@ async fn get_video_preview( } Err(_) => { // File missing on disk but DB says complete - reset and regenerate - let mut dao = - preview_dao.lock().expect("Unable to lock PreviewDao"); + let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao"); let _ = dao.update_status( &context, &relative_path, @@ -665,12 +663,10 @@ async fn get_video_preview( })) } "failed" => { - let error_msg = - clip.error_message.unwrap_or_else(|| "Unknown error".to_string()); - span.set_status(Status::error(format!( - "Generation failed: {}", - error_msg - ))); + let error_msg = clip + .error_message + .unwrap_or_else(|| "Unknown error".to_string()); + span.set_status(Status::error(format!("Generation failed: {}", error_msg))); HttpResponse::InternalServerError().json(serde_json::json!({ "error": format!("Generation failed: {}", error_msg) })) @@ -708,8 +704,7 @@ async fn get_video_preview( } Err(_) => { span.set_status(Status::error("Database error")); - HttpResponse::InternalServerError() - .json(serde_json::json!({"error": "Database error"})) + HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"})) } } } @@ -768,10 +763,7 @@ async fn get_preview_status( path: path.clone(), status: clip.status.clone(), preview_url: if clip.status == "complete" { - Some(format!( - "/video/preview?path={}", - urlencoding::encode(path) - )) + Some(format!("/video/preview?path={}", urlencoding::encode(path))) } else { None }, @@ -810,8 +802,7 @@ async fn get_preview_status( } Err(_) => { span.set_status(Status::error("Database error")); - HttpResponse::InternalServerError() - .json(serde_json::json!({"error": "Database error"})) + HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"})) } } } @@ -1213,21 +1204,18 @@ fn main() -> std::io::Result<()> { .app_data::>>>(Data::new(Mutex::new(Box::new( preview_dao, )))) - .app_data( - web::JsonConfig::default() - .error_handler(|err, req| { - let detail = err.to_string(); - log::warn!( - "JSON parse error on {} {}: {}", - req.method(), - req.uri(), - detail - ); - let response = HttpResponse::BadRequest() - .json(serde_json::json!({"error": detail})); - actix_web::error::InternalError::from_response(err, response).into() - }), - ) + .app_data(web::JsonConfig::default().error_handler(|err, req| { + let detail = err.to_string(); + log::warn!( + "JSON parse error on {} {}: {}", + req.method(), + req.uri(), + detail + ); + let response = + HttpResponse::BadRequest().json(serde_json::json!({"error": detail})); + actix_web::error::InternalError::from_response(err, response).into() + })) .app_data::>(Data::new(app_data.insight_generator.clone())) .wrap(prometheus.clone()) }) @@ -1765,9 +1753,7 @@ mod tests { // Verify the DAO now has a pending record let mut dao_lock = preview_dao.lock().unwrap(); let ctx = opentelemetry::Context::new(); - let clip = dao_lock - .get_preview(&ctx, "photos/new_video.mp4") - .unwrap(); + let clip = dao_lock.get_preview(&ctx, "photos/new_video.mp4").unwrap(); assert!(clip.is_some()); assert_eq!(clip.unwrap().status, "pending"); } @@ -1778,8 +1764,15 @@ mod tests { let ctx = opentelemetry::Context::new(); dao.insert_preview(&ctx, "photos/done.mp4", "pending") .unwrap(); - dao.update_status(&ctx, "photos/done.mp4", "complete", Some(9.5), Some(500000), None) - .unwrap(); + dao.update_status( + &ctx, + "photos/done.mp4", + "complete", + Some(9.5), + Some(500000), + None, + ) + .unwrap(); let preview_dao = make_preview_dao(dao); let app_state = Data::new(AppState::test_state()); @@ -1806,7 +1799,12 @@ mod tests { let previews = body["previews"].as_array().unwrap(); assert_eq!(previews.len(), 1); assert_eq!(previews[0]["status"], "complete"); - assert!(previews[0]["preview_url"].as_str().unwrap().contains("photos%2Fdone.mp4")); + assert!( + previews[0]["preview_url"] + .as_str() + .unwrap() + .contains("photos%2Fdone.mp4") + ); } #[actix_rt::test] diff --git a/src/state.rs b/src/state.rs index 578ea78..4000704 100644 --- a/src/state.rs +++ b/src/state.rs @@ -46,11 +46,8 @@ impl AppState { let video_playlist_manager = VideoPlaylistManager::new(video_path.clone(), playlist_generator.start()); - let preview_clip_generator = PreviewClipGenerator::new( - preview_clips_path.clone(), - base_path.clone(), - preview_dao, - ); + let preview_clip_generator = + PreviewClipGenerator::new(preview_clips_path.clone(), base_path.clone(), preview_dao); Self { stream_manager, @@ -141,9 +138,10 @@ impl Default for AppState { ); // Ensure preview clips directory exists - let preview_clips_path = env::var("PREVIEW_CLIPS_DIRECTORY") - .unwrap_or_else(|_| "preview_clips".to_string()); - std::fs::create_dir_all(&preview_clips_path).expect("Failed to create PREVIEW_CLIPS_DIRECTORY"); + let preview_clips_path = + env::var("PREVIEW_CLIPS_DIRECTORY").unwrap_or_else(|_| "preview_clips".to_string()); + std::fs::create_dir_all(&preview_clips_path) + .expect("Failed to create PREVIEW_CLIPS_DIRECTORY"); Self::new( Arc::new(StreamActor {}.start()), diff --git a/src/video/actors.rs b/src/video/actors.rs index 936e39f..e90bbe1 100644 --- a/src/video/actors.rs +++ b/src/video/actors.rs @@ -159,19 +159,21 @@ async fn get_video_rotation(video_path: &str) -> i32 { .await; if let Ok(output) = output - && output.status.success() { - let rotation_str = String::from_utf8_lossy(&output.stdout); - let rotation_str = rotation_str.trim(); - if !rotation_str.is_empty() - && let Ok(rotation) = rotation_str.parse::() - && rotation != 0 { - debug!( - "Detected rotation {}° from stream tag for {}", - rotation, video_path - ); - return rotation; - } + && output.status.success() + { + let rotation_str = String::from_utf8_lossy(&output.stdout); + let rotation_str = rotation_str.trim(); + if !rotation_str.is_empty() + && let Ok(rotation) = rotation_str.parse::() + && rotation != 0 + { + debug!( + "Detected rotation {}° from stream tag for {}", + rotation, video_path + ); + return rotation; } + } // Check display matrix side data (modern videos, e.g. iPhone) let output = tokio::process::Command::new("ffprobe") @@ -188,21 +190,23 @@ async fn get_video_rotation(video_path: &str) -> i32 { .await; if let Ok(output) = output - && output.status.success() { - let rotation_str = String::from_utf8_lossy(&output.stdout); - let rotation_str = rotation_str.trim(); - if !rotation_str.is_empty() - && let Ok(rotation) = rotation_str.parse::() { - let rotation = rotation.abs() as i32; - if rotation != 0 { - debug!( - "Detected rotation {}° from display matrix for {}", - rotation, video_path - ); - return rotation; - } - } + && output.status.success() + { + let rotation_str = String::from_utf8_lossy(&output.stdout); + let rotation_str = rotation_str.trim(); + if !rotation_str.is_empty() + && let Ok(rotation) = rotation_str.parse::() + { + let rotation = rotation.abs() as i32; + if rotation != 0 { + debug!( + "Detected rotation {}° from display matrix for {}", + rotation, video_path + ); + return rotation; + } } + } 0 } @@ -550,7 +554,8 @@ impl Handler for PreviewClipGenerator { { let otel_ctx = opentelemetry::Context::current(); let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao"); - let _ = dao.update_status(&otel_ctx, &relative_path, "processing", None, None, None); + let _ = + dao.update_status(&otel_ctx, &relative_path, "processing", None, None, None); } // Compute output path: join preview_clips_dir with relative path, change ext to .mp4 diff --git a/src/video/ffmpeg.rs b/src/video/ffmpeg.rs index ddb35d5..b40b175 100644 --- a/src/video/ffmpeg.rs +++ b/src/video/ffmpeg.rs @@ -183,7 +183,11 @@ impl Ffmpeg { Ok(output_file.to_string()) } - pub async fn create_gif_from_frames(&self, frame_base_dir: &str, output_file: &str) -> Result { + pub async fn create_gif_from_frames( + &self, + frame_base_dir: &str, + output_file: &str, + ) -> Result { let output = Command::new("ffmpeg") .arg("-y") .args(["-framerate", "4"]) @@ -278,10 +282,7 @@ pub async fn generate_preview_clip(input_file: &str, output_file: &str) -> Resul "select='lt(mod(t,{:.4}),1)',setpts=N/FRAME_RATE/TB,fps=30,scale=-2:480,format=yuv420p", interval ); - let af = format!( - "aselect='lt(mod(t,{:.4}),1)',asetpts=N/SR/TB", - interval - ); + let af = format!("aselect='lt(mod(t,{:.4}),1)',asetpts=N/SR/TB", interval); cmd.args(["-vf", &vf]); cmd.args(["-af", &af]); @@ -326,7 +327,10 @@ pub async fn generate_preview_clip(input_file: &str, output_file: &str) -> Resul info!( "Generated preview clip '{}' ({:.1}s, {} bytes) in {:?}", - output_file, clip_duration, file_size, start.elapsed() + output_file, + clip_duration, + file_size, + start.elapsed() ); Ok((clip_duration, file_size))