Compare commits
13 Commits
master
...
002-agenti
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
54a49a8562 | ||
|
|
c1b6013412 | ||
|
|
5c9f5c7d0b | ||
|
|
091327e5d9 | ||
|
|
7615b9c99b | ||
|
|
5e5a2a3167 | ||
|
|
8196ef94a0 | ||
|
|
e58b8fe743 | ||
|
|
c0d27d0b9e | ||
|
|
8ecd3c6cf8 | ||
|
|
387ce23afd | ||
|
|
b31b4b903c | ||
|
|
dd0715c081 |
@@ -55,6 +55,11 @@ The following environment variables configure AI-powered photo insights and dail
|
||||
- Used to fetch conversation data for context in insights
|
||||
- `SMS_API_TOKEN` - Authentication token for SMS API (optional)
|
||||
|
||||
#### Agentic Insight Generation
|
||||
- `AGENTIC_MAX_ITERATIONS` - Maximum tool-call iterations per agentic insight request [default: `10`]
|
||||
- Controls how many times the model can invoke tools before being forced to produce a final answer
|
||||
- Increase for more thorough context gathering; decrease to limit response time
|
||||
|
||||
#### Fallback Behavior
|
||||
- Primary server is tried first with 5-second connection timeout
|
||||
- On failure, automatically falls back to secondary server (if configured)
|
||||
|
||||
@@ -211,6 +211,112 @@ pub async fn get_all_insights_handler(
|
||||
}
|
||||
}
|
||||
|
||||
/// POST /insights/generate/agentic - Generate insight using agentic tool-calling loop
|
||||
#[post("/insights/generate/agentic")]
|
||||
pub async fn generate_agentic_insight_handler(
|
||||
http_request: HttpRequest,
|
||||
_claims: Claims,
|
||||
request: web::Json<GeneratePhotoInsightRequest>,
|
||||
insight_generator: web::Data<InsightGenerator>,
|
||||
insight_dao: web::Data<std::sync::Mutex<Box<dyn InsightDao>>>,
|
||||
) -> impl Responder {
|
||||
let parent_context = extract_context_from_request(&http_request);
|
||||
let tracer = global_tracer();
|
||||
let mut span = tracer.start_with_context("http.insights.generate_agentic", &parent_context);
|
||||
|
||||
let normalized_path = normalize_path(&request.file_path);
|
||||
|
||||
span.set_attribute(KeyValue::new("file_path", normalized_path.clone()));
|
||||
if let Some(ref model) = request.model {
|
||||
span.set_attribute(KeyValue::new("model", model.clone()));
|
||||
}
|
||||
if let Some(ref prompt) = request.system_prompt {
|
||||
span.set_attribute(KeyValue::new("has_custom_prompt", true));
|
||||
span.set_attribute(KeyValue::new("prompt_length", prompt.len() as i64));
|
||||
}
|
||||
if let Some(ctx) = request.num_ctx {
|
||||
span.set_attribute(KeyValue::new("num_ctx", ctx as i64));
|
||||
}
|
||||
|
||||
let max_iterations: usize = std::env::var("AGENTIC_MAX_ITERATIONS")
|
||||
.ok()
|
||||
.and_then(|v| v.parse().ok())
|
||||
.unwrap_or(10);
|
||||
|
||||
span.set_attribute(KeyValue::new("max_iterations", max_iterations as i64));
|
||||
|
||||
log::info!(
|
||||
"Agentic insight generation triggered for photo: {} with model: {:?}, max_iterations: {}",
|
||||
normalized_path,
|
||||
request.model,
|
||||
max_iterations
|
||||
);
|
||||
|
||||
let result = insight_generator
|
||||
.generate_agentic_insight_for_photo(
|
||||
&normalized_path,
|
||||
request.model.clone(),
|
||||
request.system_prompt.clone(),
|
||||
request.num_ctx,
|
||||
max_iterations,
|
||||
)
|
||||
.await;
|
||||
|
||||
match result {
|
||||
Ok(()) => {
|
||||
span.set_status(Status::Ok);
|
||||
// Fetch the stored insight to return it
|
||||
let otel_context = opentelemetry::Context::new();
|
||||
let mut dao = insight_dao.lock().expect("Unable to lock InsightDao");
|
||||
match dao.get_insight(&otel_context, &normalized_path) {
|
||||
Ok(Some(insight)) => {
|
||||
let response = PhotoInsightResponse {
|
||||
id: insight.id,
|
||||
file_path: insight.file_path,
|
||||
title: insight.title,
|
||||
summary: insight.summary,
|
||||
generated_at: insight.generated_at,
|
||||
model_version: insight.model_version,
|
||||
};
|
||||
HttpResponse::Ok().json(response)
|
||||
}
|
||||
Ok(None) => HttpResponse::Ok().json(serde_json::json!({
|
||||
"success": true,
|
||||
"message": "Agentic insight generated successfully"
|
||||
})),
|
||||
Err(e) => {
|
||||
log::warn!("Insight stored but failed to retrieve: {:?}", e);
|
||||
HttpResponse::Ok().json(serde_json::json!({
|
||||
"success": true,
|
||||
"message": "Agentic insight generated successfully"
|
||||
}))
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
let error_msg = format!("{:?}", e);
|
||||
log::error!("Failed to generate agentic insight: {}", error_msg);
|
||||
span.set_status(Status::error(error_msg.clone()));
|
||||
|
||||
if error_msg.contains("tool calling not supported")
|
||||
|| error_msg.contains("model not available")
|
||||
{
|
||||
HttpResponse::BadRequest().json(serde_json::json!({
|
||||
"error": format!("Failed to generate agentic insight: {}", error_msg)
|
||||
}))
|
||||
} else if error_msg.contains("error parsing tool call") {
|
||||
HttpResponse::BadRequest().json(serde_json::json!({
|
||||
"error": "Model is not compatible with Ollama's tool calling protocol. Try a model known to support native tool calling (e.g. llama3.1, llama3.2, qwen2.5, mistral-nemo)."
|
||||
}))
|
||||
} else {
|
||||
HttpResponse::InternalServerError().json(serde_json::json!({
|
||||
"error": format!("Failed to generate agentic insight: {}", error_msg)
|
||||
}))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// GET /insights/models - List available models from both servers with capabilities
|
||||
#[get("/insights/models")]
|
||||
pub async fn get_available_models_handler(
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -8,8 +8,8 @@ pub mod sms_client;
|
||||
#[allow(unused_imports)]
|
||||
pub use daily_summary_job::{generate_daily_summaries, strip_summary_boilerplate};
|
||||
pub use handlers::{
|
||||
delete_insight_handler, generate_insight_handler, get_all_insights_handler,
|
||||
get_available_models_handler, get_insight_handler,
|
||||
delete_insight_handler, generate_agentic_insight_handler, generate_insight_handler,
|
||||
get_all_insights_handler, get_available_models_handler, get_insight_handler,
|
||||
};
|
||||
pub use insight_generator::InsightGenerator;
|
||||
pub use ollama::{ModelCapabilities, OllamaClient};
|
||||
|
||||
275
src/ai/ollama.rs
275
src/ai/ollama.rs
@@ -1,4 +1,4 @@
|
||||
use anyhow::Result;
|
||||
use anyhow::{Context, Result};
|
||||
use chrono::NaiveDate;
|
||||
use reqwest::Client;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -176,10 +176,13 @@ impl OllamaClient {
|
||||
|
||||
// Check if "vision" is in the capabilities array
|
||||
let has_vision = show_response.capabilities.iter().any(|cap| cap == "vision");
|
||||
// Check if "tools" is in the capabilities array
|
||||
let has_tool_calling = show_response.capabilities.iter().any(|cap| cap == "tools");
|
||||
|
||||
Ok(ModelCapabilities {
|
||||
name: model_name.to_string(),
|
||||
has_vision,
|
||||
has_tool_calling,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -206,10 +209,11 @@ impl OllamaClient {
|
||||
Ok(cap) => capabilities.push(cap),
|
||||
Err(e) => {
|
||||
log::warn!("Failed to get capabilities for model {}: {}", model_name, e);
|
||||
// Fallback: assume no vision if we can't check
|
||||
// Fallback: assume no vision/tools if we can't check
|
||||
capabilities.push(ModelCapabilities {
|
||||
name: model_name,
|
||||
has_vision: false,
|
||||
has_tool_calling: false,
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -254,7 +258,7 @@ impl OllamaClient {
|
||||
prompt: prompt.to_string(),
|
||||
stream: false,
|
||||
system: system.map(|s| s.to_string()),
|
||||
options: self.num_ctx.map(|ctx| OllamaOptions { num_ctx: ctx }),
|
||||
options: self.num_ctx.map(|ctx| OllamaOptions { num_ctx: Some(ctx) }),
|
||||
images,
|
||||
};
|
||||
|
||||
@@ -480,6 +484,148 @@ Analyze the image and use specific details from both the visual content and the
|
||||
.await
|
||||
}
|
||||
|
||||
/// Generate a brief visual description of a photo for use in RAG query enrichment.
|
||||
/// Returns 1-2 sentences describing people, location, and activity visible in the image.
|
||||
/// Only called when the model has vision capabilities.
|
||||
pub async fn generate_photo_description(&self, image_base64: &str) -> Result<String> {
|
||||
let prompt = "Briefly describe what you see in this image in 1-2 sentences. \
|
||||
Focus on the people, location, and activity.";
|
||||
let system = "You are a scene description assistant. Be concise and factual.";
|
||||
let images = vec![image_base64.to_string()];
|
||||
|
||||
let description = self
|
||||
.generate_with_images(prompt, Some(system), Some(images))
|
||||
.await?;
|
||||
|
||||
Ok(description.trim().to_string())
|
||||
}
|
||||
|
||||
/// Send a chat request with tool definitions to /api/chat.
|
||||
/// Returns the assistant's response message (may contain tool_calls or final content).
|
||||
/// Uses primary/fallback URL routing same as other generation methods.
|
||||
pub async fn chat_with_tools(
|
||||
&self,
|
||||
messages: Vec<ChatMessage>,
|
||||
tools: Vec<Tool>,
|
||||
) -> Result<ChatMessage> {
|
||||
// Try primary server first
|
||||
log::info!(
|
||||
"Attempting chat_with_tools with primary server: {} (model: {})",
|
||||
self.primary_url,
|
||||
self.primary_model
|
||||
);
|
||||
let primary_result = self
|
||||
.try_chat_with_tools(&self.primary_url, messages.clone(), tools.clone())
|
||||
.await;
|
||||
|
||||
match primary_result {
|
||||
Ok(response) => {
|
||||
log::info!("Successfully got chat_with_tools response from primary server");
|
||||
Ok(response)
|
||||
}
|
||||
Err(e) => {
|
||||
log::warn!("Primary server chat_with_tools failed: {}", e);
|
||||
|
||||
// Try fallback server if available
|
||||
if let Some(fallback_url) = &self.fallback_url {
|
||||
let fallback_model =
|
||||
self.fallback_model.as_ref().unwrap_or(&self.primary_model);
|
||||
|
||||
log::info!(
|
||||
"Attempting chat_with_tools with fallback server: {} (model: {})",
|
||||
fallback_url,
|
||||
fallback_model
|
||||
);
|
||||
match self
|
||||
.try_chat_with_tools(fallback_url, messages, tools)
|
||||
.await
|
||||
{
|
||||
Ok(response) => {
|
||||
log::info!(
|
||||
"Successfully got chat_with_tools response from fallback server"
|
||||
);
|
||||
Ok(response)
|
||||
}
|
||||
Err(fallback_e) => {
|
||||
log::error!(
|
||||
"Fallback server chat_with_tools also failed: {}",
|
||||
fallback_e
|
||||
);
|
||||
Err(anyhow::anyhow!(
|
||||
"Both primary and fallback servers failed. Primary: {}, Fallback: {}",
|
||||
e,
|
||||
fallback_e
|
||||
))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
log::error!("No fallback server configured");
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn try_chat_with_tools(
|
||||
&self,
|
||||
base_url: &str,
|
||||
messages: Vec<ChatMessage>,
|
||||
tools: Vec<Tool>,
|
||||
) -> Result<ChatMessage> {
|
||||
let url = format!("{}/api/chat", base_url);
|
||||
let model = if base_url == self.primary_url {
|
||||
&self.primary_model
|
||||
} else {
|
||||
self.fallback_model
|
||||
.as_deref()
|
||||
.unwrap_or(&self.primary_model)
|
||||
};
|
||||
|
||||
let options = self.num_ctx.map(|ctx| OllamaOptions { num_ctx: Some(ctx) });
|
||||
|
||||
let request_body = OllamaChatRequest {
|
||||
model,
|
||||
messages: &messages,
|
||||
stream: false,
|
||||
tools,
|
||||
options,
|
||||
};
|
||||
|
||||
let request_json = serde_json::to_string(&request_body)
|
||||
.unwrap_or_else(|e| format!("<serialization error: {}>", e));
|
||||
log::debug!("chat_with_tools request body: {}", request_json);
|
||||
|
||||
let response = self
|
||||
.client
|
||||
.post(&url)
|
||||
.json(&request_body)
|
||||
.send()
|
||||
.await
|
||||
.with_context(|| format!("Failed to connect to Ollama at {}", url))?;
|
||||
|
||||
if !response.status().is_success() {
|
||||
let status = response.status();
|
||||
let body = response.text().await.unwrap_or_default();
|
||||
log::error!(
|
||||
"chat_with_tools request body that caused {}: {}",
|
||||
status,
|
||||
request_json
|
||||
);
|
||||
anyhow::bail!(
|
||||
"Ollama chat request failed with status {}: {}",
|
||||
status,
|
||||
body
|
||||
);
|
||||
}
|
||||
|
||||
let chat_response: OllamaChatResponse = response
|
||||
.json()
|
||||
.await
|
||||
.with_context(|| "Failed to parse Ollama chat response")?;
|
||||
|
||||
Ok(chat_response.message)
|
||||
}
|
||||
|
||||
/// Generate an embedding vector for text using nomic-embed-text:v1.5
|
||||
/// Returns a 768-dimensional vector as Vec<f32>
|
||||
pub async fn generate_embedding(&self, text: &str) -> Result<Vec<f32>> {
|
||||
@@ -624,7 +770,112 @@ struct OllamaRequest {
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct OllamaOptions {
|
||||
num_ctx: i32,
|
||||
num_ctx: Option<i32>,
|
||||
}
|
||||
|
||||
/// Tool definition sent in /api/chat requests (OpenAI-compatible format)
|
||||
#[derive(Serialize, Clone, Debug)]
|
||||
pub struct Tool {
|
||||
#[serde(rename = "type")]
|
||||
pub tool_type: String, // always "function"
|
||||
pub function: ToolFunction,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Clone, Debug)]
|
||||
pub struct ToolFunction {
|
||||
pub name: String,
|
||||
pub description: String,
|
||||
pub parameters: serde_json::Value,
|
||||
}
|
||||
|
||||
impl Tool {
|
||||
pub fn function(name: &str, description: &str, parameters: serde_json::Value) -> Self {
|
||||
Self {
|
||||
tool_type: "function".to_string(),
|
||||
function: ToolFunction {
|
||||
name: name.to_string(),
|
||||
description: description.to_string(),
|
||||
parameters,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A message in the chat conversation history
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
pub struct ChatMessage {
|
||||
pub role: String, // "system" | "user" | "assistant" | "tool"
|
||||
/// Empty string (not null) when tool_calls is present — Ollama quirk
|
||||
#[serde(default)]
|
||||
pub content: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub tool_calls: Option<Vec<ToolCall>>,
|
||||
/// Base64 images — only on user messages to vision-capable models
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub images: Option<Vec<String>>,
|
||||
}
|
||||
|
||||
impl ChatMessage {
|
||||
pub fn system(content: impl Into<String>) -> Self {
|
||||
Self {
|
||||
role: "system".to_string(),
|
||||
content: content.into(),
|
||||
tool_calls: None,
|
||||
images: None,
|
||||
}
|
||||
}
|
||||
pub fn user(content: impl Into<String>) -> Self {
|
||||
Self {
|
||||
role: "user".to_string(),
|
||||
content: content.into(),
|
||||
tool_calls: None,
|
||||
images: None,
|
||||
}
|
||||
}
|
||||
pub fn tool_result(content: impl Into<String>) -> Self {
|
||||
Self {
|
||||
role: "tool".to_string(),
|
||||
content: content.into(),
|
||||
tool_calls: None,
|
||||
images: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Tool call returned by the model in an assistant message
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
pub struct ToolCall {
|
||||
pub function: ToolCallFunction,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub id: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
pub struct ToolCallFunction {
|
||||
pub name: String,
|
||||
/// Native JSON object (NOT a JSON-encoded string like OpenAI)
|
||||
pub arguments: serde_json::Value,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct OllamaChatRequest<'a> {
|
||||
model: &'a str,
|
||||
messages: &'a [ChatMessage],
|
||||
stream: bool,
|
||||
#[serde(skip_serializing_if = "Vec::is_empty")]
|
||||
tools: Vec<Tool>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
options: Option<OllamaOptions>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug)]
|
||||
struct OllamaChatResponse {
|
||||
message: ChatMessage,
|
||||
#[allow(dead_code)]
|
||||
done: bool,
|
||||
#[serde(default)]
|
||||
#[allow(dead_code)]
|
||||
done_reason: String,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
@@ -652,6 +903,7 @@ struct OllamaShowResponse {
|
||||
pub struct ModelCapabilities {
|
||||
pub name: String,
|
||||
pub has_vision: bool,
|
||||
pub has_tool_calling: bool,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
@@ -664,3 +916,18 @@ struct OllamaBatchEmbedRequest {
|
||||
struct OllamaEmbedResponse {
|
||||
embeddings: Vec<Vec<f32>>,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn generate_photo_description_prompt_is_concise() {
|
||||
// Verify the method exists and its prompt is sane by checking the
|
||||
// constant we'll use. This is a compile + smoke check; actual LLM
|
||||
// calls are integration-tested manually.
|
||||
let prompt = "Briefly describe what you see in this image in 1-2 sentences. \
|
||||
Focus on the people, location, and activity.";
|
||||
assert!(prompt.len() < 200, "Prompt should be concise");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,7 +4,7 @@ use std::ops::DerefMut;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use crate::database::models::{InsertVideoPreviewClip, VideoPreviewClip};
|
||||
use crate::database::{connect, DbError, DbErrorKind};
|
||||
use crate::database::{DbError, DbErrorKind, connect};
|
||||
use crate::otel::trace_db_call;
|
||||
|
||||
pub trait PreviewDao: Sync + Send {
|
||||
@@ -232,10 +232,7 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
// Status should remain "pending" from the first insert
|
||||
let clip = dao
|
||||
.get_preview(&ctx, "photos/video.mp4")
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let clip = dao.get_preview(&ctx, "photos/video.mp4").unwrap().unwrap();
|
||||
assert_eq!(clip.status, "pending");
|
||||
}
|
||||
|
||||
@@ -256,10 +253,7 @@ mod tests {
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let clip = dao
|
||||
.get_preview(&ctx, "photos/video.mp4")
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let clip = dao.get_preview(&ctx, "photos/video.mp4").unwrap().unwrap();
|
||||
assert_eq!(clip.status, "complete");
|
||||
assert_eq!(clip.duration_seconds, Some(9.5));
|
||||
assert_eq!(clip.file_size_bytes, Some(1024000));
|
||||
@@ -283,10 +277,7 @@ mod tests {
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let clip = dao
|
||||
.get_preview(&ctx, "photos/video.mp4")
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let clip = dao.get_preview(&ctx, "photos/video.mp4").unwrap().unwrap();
|
||||
assert_eq!(clip.status, "failed");
|
||||
assert_eq!(
|
||||
clip.error_message.as_deref(),
|
||||
|
||||
@@ -1490,7 +1490,8 @@ mod tests {
|
||||
let request: Query<FilesRequest> =
|
||||
Query::from_query("path=&tag_ids=1,3&recursive=true").unwrap();
|
||||
|
||||
let mut tag_dao = SqliteTagDao::new(in_memory_db_connection());
|
||||
let mut tag_dao =
|
||||
SqliteTagDao::new(std::sync::Arc::new(Mutex::new(in_memory_db_connection())));
|
||||
|
||||
let tag1 = tag_dao
|
||||
.create_tag(&opentelemetry::Context::current(), "tag1")
|
||||
@@ -1536,7 +1537,8 @@ mod tests {
|
||||
exp: 12345,
|
||||
};
|
||||
|
||||
let mut tag_dao = SqliteTagDao::new(in_memory_db_connection());
|
||||
let mut tag_dao =
|
||||
SqliteTagDao::new(std::sync::Arc::new(Mutex::new(in_memory_db_connection())));
|
||||
|
||||
let tag1 = tag_dao
|
||||
.create_tag(&opentelemetry::Context::current(), "tag1")
|
||||
|
||||
77
src/main.rs
77
src/main.rs
@@ -600,8 +600,7 @@ async fn get_video_preview(
|
||||
Some(path) => path,
|
||||
None => {
|
||||
span.set_status(Status::error("Invalid path"));
|
||||
return HttpResponse::BadRequest()
|
||||
.json(serde_json::json!({"error": "Invalid path"}));
|
||||
return HttpResponse::BadRequest().json(serde_json::json!({"error": "Invalid path"}));
|
||||
}
|
||||
};
|
||||
|
||||
@@ -634,8 +633,7 @@ async fn get_video_preview(
|
||||
}
|
||||
Err(_) => {
|
||||
// File missing on disk but DB says complete - reset and regenerate
|
||||
let mut dao =
|
||||
preview_dao.lock().expect("Unable to lock PreviewDao");
|
||||
let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao");
|
||||
let _ = dao.update_status(
|
||||
&context,
|
||||
&relative_path,
|
||||
@@ -665,12 +663,10 @@ async fn get_video_preview(
|
||||
}))
|
||||
}
|
||||
"failed" => {
|
||||
let error_msg =
|
||||
clip.error_message.unwrap_or_else(|| "Unknown error".to_string());
|
||||
span.set_status(Status::error(format!(
|
||||
"Generation failed: {}",
|
||||
error_msg
|
||||
)));
|
||||
let error_msg = clip
|
||||
.error_message
|
||||
.unwrap_or_else(|| "Unknown error".to_string());
|
||||
span.set_status(Status::error(format!("Generation failed: {}", error_msg)));
|
||||
HttpResponse::InternalServerError().json(serde_json::json!({
|
||||
"error": format!("Generation failed: {}", error_msg)
|
||||
}))
|
||||
@@ -708,8 +704,7 @@ async fn get_video_preview(
|
||||
}
|
||||
Err(_) => {
|
||||
span.set_status(Status::error("Database error"));
|
||||
HttpResponse::InternalServerError()
|
||||
.json(serde_json::json!({"error": "Database error"}))
|
||||
HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -768,10 +763,7 @@ async fn get_preview_status(
|
||||
path: path.clone(),
|
||||
status: clip.status.clone(),
|
||||
preview_url: if clip.status == "complete" {
|
||||
Some(format!(
|
||||
"/video/preview?path={}",
|
||||
urlencoding::encode(path)
|
||||
))
|
||||
Some(format!("/video/preview?path={}", urlencoding::encode(path)))
|
||||
} else {
|
||||
None
|
||||
},
|
||||
@@ -810,8 +802,7 @@ async fn get_preview_status(
|
||||
}
|
||||
Err(_) => {
|
||||
span.set_status(Status::error("Database error"));
|
||||
HttpResponse::InternalServerError()
|
||||
.json(serde_json::json!({"error": "Database error"}))
|
||||
HttpResponse::InternalServerError().json(serde_json::json!({"error": "Database error"}))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1189,6 +1180,7 @@ fn main() -> std::io::Result<()> {
|
||||
.service(get_file_metadata)
|
||||
.service(memories::list_memories)
|
||||
.service(ai::generate_insight_handler)
|
||||
.service(ai::generate_agentic_insight_handler)
|
||||
.service(ai::get_insight_handler)
|
||||
.service(ai::delete_insight_handler)
|
||||
.service(ai::get_all_insights_handler)
|
||||
@@ -1212,21 +1204,18 @@ fn main() -> std::io::Result<()> {
|
||||
.app_data::<Data<Mutex<Box<dyn PreviewDao>>>>(Data::new(Mutex::new(Box::new(
|
||||
preview_dao,
|
||||
))))
|
||||
.app_data(
|
||||
web::JsonConfig::default()
|
||||
.error_handler(|err, req| {
|
||||
let detail = err.to_string();
|
||||
log::warn!(
|
||||
"JSON parse error on {} {}: {}",
|
||||
req.method(),
|
||||
req.uri(),
|
||||
detail
|
||||
);
|
||||
let response = HttpResponse::BadRequest()
|
||||
.json(serde_json::json!({"error": detail}));
|
||||
actix_web::error::InternalError::from_response(err, response).into()
|
||||
}),
|
||||
)
|
||||
.app_data(web::JsonConfig::default().error_handler(|err, req| {
|
||||
let detail = err.to_string();
|
||||
log::warn!(
|
||||
"JSON parse error on {} {}: {}",
|
||||
req.method(),
|
||||
req.uri(),
|
||||
detail
|
||||
);
|
||||
let response =
|
||||
HttpResponse::BadRequest().json(serde_json::json!({"error": detail}));
|
||||
actix_web::error::InternalError::from_response(err, response).into()
|
||||
}))
|
||||
.app_data::<Data<InsightGenerator>>(Data::new(app_data.insight_generator.clone()))
|
||||
.wrap(prometheus.clone())
|
||||
})
|
||||
@@ -1764,9 +1753,7 @@ mod tests {
|
||||
// Verify the DAO now has a pending record
|
||||
let mut dao_lock = preview_dao.lock().unwrap();
|
||||
let ctx = opentelemetry::Context::new();
|
||||
let clip = dao_lock
|
||||
.get_preview(&ctx, "photos/new_video.mp4")
|
||||
.unwrap();
|
||||
let clip = dao_lock.get_preview(&ctx, "photos/new_video.mp4").unwrap();
|
||||
assert!(clip.is_some());
|
||||
assert_eq!(clip.unwrap().status, "pending");
|
||||
}
|
||||
@@ -1777,8 +1764,15 @@ mod tests {
|
||||
let ctx = opentelemetry::Context::new();
|
||||
dao.insert_preview(&ctx, "photos/done.mp4", "pending")
|
||||
.unwrap();
|
||||
dao.update_status(&ctx, "photos/done.mp4", "complete", Some(9.5), Some(500000), None)
|
||||
.unwrap();
|
||||
dao.update_status(
|
||||
&ctx,
|
||||
"photos/done.mp4",
|
||||
"complete",
|
||||
Some(9.5),
|
||||
Some(500000),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let preview_dao = make_preview_dao(dao);
|
||||
let app_state = Data::new(AppState::test_state());
|
||||
@@ -1805,7 +1799,12 @@ mod tests {
|
||||
let previews = body["previews"].as_array().unwrap();
|
||||
assert_eq!(previews.len(), 1);
|
||||
assert_eq!(previews[0]["status"], "complete");
|
||||
assert!(previews[0]["preview_url"].as_str().unwrap().contains("photos%2Fdone.mp4"));
|
||||
assert!(
|
||||
previews[0]["preview_url"]
|
||||
.as_str()
|
||||
.unwrap()
|
||||
.contains("photos%2Fdone.mp4")
|
||||
);
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
|
||||
21
src/state.rs
21
src/state.rs
@@ -5,6 +5,7 @@ use crate::database::{
|
||||
SqliteLocationHistoryDao, SqliteSearchHistoryDao,
|
||||
};
|
||||
use crate::database::{PreviewDao, SqlitePreviewDao};
|
||||
use crate::tags::{SqliteTagDao, TagDao};
|
||||
use crate::video::actors::{
|
||||
PlaylistGenerator, PreviewClipGenerator, StreamActor, VideoPlaylistManager,
|
||||
};
|
||||
@@ -45,11 +46,8 @@ impl AppState {
|
||||
let video_playlist_manager =
|
||||
VideoPlaylistManager::new(video_path.clone(), playlist_generator.start());
|
||||
|
||||
let preview_clip_generator = PreviewClipGenerator::new(
|
||||
preview_clips_path.clone(),
|
||||
base_path.clone(),
|
||||
preview_dao,
|
||||
);
|
||||
let preview_clip_generator =
|
||||
PreviewClipGenerator::new(preview_clips_path.clone(), base_path.clone(), preview_dao);
|
||||
|
||||
Self {
|
||||
stream_manager,
|
||||
@@ -119,6 +117,8 @@ impl Default for AppState {
|
||||
Arc::new(Mutex::new(Box::new(SqliteLocationHistoryDao::new())));
|
||||
let search_dao: Arc<Mutex<Box<dyn SearchHistoryDao>>> =
|
||||
Arc::new(Mutex::new(Box::new(SqliteSearchHistoryDao::new())));
|
||||
let tag_dao: Arc<Mutex<Box<dyn TagDao>>> =
|
||||
Arc::new(Mutex::new(Box::new(SqliteTagDao::default())));
|
||||
|
||||
// Load base path
|
||||
let base_path = env::var("BASE_PATH").expect("BASE_PATH was not set in the env");
|
||||
@@ -133,13 +133,15 @@ impl Default for AppState {
|
||||
calendar_dao.clone(),
|
||||
location_dao.clone(),
|
||||
search_dao.clone(),
|
||||
tag_dao.clone(),
|
||||
base_path.clone(),
|
||||
);
|
||||
|
||||
// Ensure preview clips directory exists
|
||||
let preview_clips_path = env::var("PREVIEW_CLIPS_DIRECTORY")
|
||||
.unwrap_or_else(|_| "preview_clips".to_string());
|
||||
std::fs::create_dir_all(&preview_clips_path).expect("Failed to create PREVIEW_CLIPS_DIRECTORY");
|
||||
let preview_clips_path =
|
||||
env::var("PREVIEW_CLIPS_DIRECTORY").unwrap_or_else(|_| "preview_clips".to_string());
|
||||
std::fs::create_dir_all(&preview_clips_path)
|
||||
.expect("Failed to create PREVIEW_CLIPS_DIRECTORY");
|
||||
|
||||
Self::new(
|
||||
Arc::new(StreamActor {}.start()),
|
||||
@@ -196,6 +198,8 @@ impl AppState {
|
||||
Arc::new(Mutex::new(Box::new(SqliteLocationHistoryDao::new())));
|
||||
let search_dao: Arc<Mutex<Box<dyn SearchHistoryDao>>> =
|
||||
Arc::new(Mutex::new(Box::new(SqliteSearchHistoryDao::new())));
|
||||
let tag_dao: Arc<Mutex<Box<dyn TagDao>>> =
|
||||
Arc::new(Mutex::new(Box::new(SqliteTagDao::default())));
|
||||
|
||||
// Initialize test InsightGenerator with all data sources
|
||||
let base_path_str = base_path.to_string_lossy().to_string();
|
||||
@@ -208,6 +212,7 @@ impl AppState {
|
||||
calendar_dao.clone(),
|
||||
location_dao.clone(),
|
||||
search_dao.clone(),
|
||||
tag_dao.clone(),
|
||||
base_path_str.clone(),
|
||||
);
|
||||
|
||||
|
||||
91
src/tags.rs
91
src/tags.rs
@@ -14,8 +14,8 @@ use opentelemetry::KeyValue;
|
||||
use opentelemetry::trace::{Span, Status, TraceContextExt, Tracer};
|
||||
use schema::{tagged_photo, tags};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::borrow::BorrowMut;
|
||||
use std::sync::Mutex;
|
||||
use std::ops::DerefMut;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
pub fn add_tag_services<T, TagD: TagDao + 'static>(app: App<T>) -> App<T>
|
||||
where
|
||||
@@ -276,7 +276,7 @@ pub struct AddTagsRequest {
|
||||
pub tag_ids: Vec<i32>,
|
||||
}
|
||||
|
||||
pub trait TagDao {
|
||||
pub trait TagDao: Send + Sync {
|
||||
fn get_all_tags(
|
||||
&mut self,
|
||||
context: &opentelemetry::Context,
|
||||
@@ -330,18 +330,20 @@ pub trait TagDao {
|
||||
}
|
||||
|
||||
pub struct SqliteTagDao {
|
||||
connection: SqliteConnection,
|
||||
connection: Arc<Mutex<SqliteConnection>>,
|
||||
}
|
||||
|
||||
impl SqliteTagDao {
|
||||
pub(crate) fn new(connection: SqliteConnection) -> Self {
|
||||
pub(crate) fn new(connection: Arc<Mutex<SqliteConnection>>) -> Self {
|
||||
SqliteTagDao { connection }
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for SqliteTagDao {
|
||||
fn default() -> Self {
|
||||
SqliteTagDao::new(connect())
|
||||
SqliteTagDao {
|
||||
connection: Arc::new(Mutex::new(connect())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -353,6 +355,10 @@ impl TagDao for SqliteTagDao {
|
||||
) -> anyhow::Result<Vec<(i64, Tag)>> {
|
||||
// select name, count(*) from tags join tagged_photo ON tags.id = tagged_photo.tag_id GROUP BY tags.name ORDER BY COUNT(*);
|
||||
|
||||
let mut conn = self
|
||||
.connection
|
||||
.lock()
|
||||
.expect("Unable to lock SqliteTagDao connection");
|
||||
trace_db_call(context, "query", "get_all_tags", |span| {
|
||||
span.set_attribute(KeyValue::new("path", path.clone().unwrap_or_default()));
|
||||
|
||||
@@ -363,7 +369,7 @@ impl TagDao for SqliteTagDao {
|
||||
.group_by(tags::id)
|
||||
.select((count_star(), id, name, created_time))
|
||||
.filter(tagged_photo::photo_name.like(path))
|
||||
.get_results(&mut self.connection)
|
||||
.get_results(conn.deref_mut())
|
||||
.map::<Vec<(i64, Tag)>, _>(|tags_with_count: Vec<(i64, i32, String, i64)>| {
|
||||
tags_with_count
|
||||
.iter()
|
||||
@@ -388,6 +394,10 @@ impl TagDao for SqliteTagDao {
|
||||
context: &opentelemetry::Context,
|
||||
path: &str,
|
||||
) -> anyhow::Result<Vec<Tag>> {
|
||||
let mut conn = self
|
||||
.connection
|
||||
.lock()
|
||||
.expect("Unable to lock SqliteTagDao connection");
|
||||
trace_db_call(context, "query", "get_tags_for_path", |span| {
|
||||
span.set_attribute(KeyValue::new("path", path.to_string()));
|
||||
|
||||
@@ -396,12 +406,16 @@ impl TagDao for SqliteTagDao {
|
||||
.left_join(tagged_photo::table)
|
||||
.filter(tagged_photo::photo_name.eq(&path))
|
||||
.select((tags::id, tags::name, tags::created_time))
|
||||
.get_results::<Tag>(self.connection.borrow_mut())
|
||||
.get_results::<Tag>(conn.deref_mut())
|
||||
.with_context(|| "Unable to get tags from Sqlite")
|
||||
})
|
||||
}
|
||||
|
||||
fn create_tag(&mut self, context: &opentelemetry::Context, name: &str) -> anyhow::Result<Tag> {
|
||||
let mut conn = self
|
||||
.connection
|
||||
.lock()
|
||||
.expect("Unable to lock SqliteTagDao connection");
|
||||
trace_db_call(context, "insert", "create_tag", |span| {
|
||||
span.set_attribute(KeyValue::new("name", name.to_string()));
|
||||
|
||||
@@ -410,7 +424,7 @@ impl TagDao for SqliteTagDao {
|
||||
name: name.to_string(),
|
||||
created_time: Utc::now().timestamp(),
|
||||
})
|
||||
.execute(&mut self.connection)
|
||||
.execute(conn.deref_mut())
|
||||
.with_context(|| format!("Unable to insert tag {:?} in Sqlite", name))
|
||||
.and_then(|_| {
|
||||
info!("Inserted tag: {:?}", name);
|
||||
@@ -418,7 +432,7 @@ impl TagDao for SqliteTagDao {
|
||||
fn last_insert_rowid() -> Integer;
|
||||
}
|
||||
diesel::select(last_insert_rowid())
|
||||
.get_result::<i32>(&mut self.connection)
|
||||
.get_result::<i32>(conn.deref_mut())
|
||||
.with_context(|| "Unable to get last inserted tag from Sqlite")
|
||||
})
|
||||
.and_then(|id| {
|
||||
@@ -426,7 +440,7 @@ impl TagDao for SqliteTagDao {
|
||||
tags::table
|
||||
.filter(tags::id.eq(id))
|
||||
.select((tags::id, tags::name, tags::created_time))
|
||||
.get_result::<Tag>(self.connection.borrow_mut())
|
||||
.get_result::<Tag>(conn.deref_mut())
|
||||
.with_context(|| {
|
||||
format!("Unable to get tagged photo with id: {:?} from Sqlite", id)
|
||||
})
|
||||
@@ -440,6 +454,10 @@ impl TagDao for SqliteTagDao {
|
||||
tag_name: &str,
|
||||
path: &str,
|
||||
) -> anyhow::Result<Option<()>> {
|
||||
let mut conn = self
|
||||
.connection
|
||||
.lock()
|
||||
.expect("Unable to lock SqliteTagDao connection");
|
||||
trace_db_call(context, "delete", "remove_tag", |span| {
|
||||
span.set_attributes(vec![
|
||||
KeyValue::new("tag_name", tag_name.to_string()),
|
||||
@@ -448,7 +466,7 @@ impl TagDao for SqliteTagDao {
|
||||
|
||||
tags::table
|
||||
.filter(tags::name.eq(tag_name))
|
||||
.get_result::<Tag>(self.connection.borrow_mut())
|
||||
.get_result::<Tag>(conn.deref_mut())
|
||||
.optional()
|
||||
.with_context(|| format!("Unable to get tag '{}'", tag_name))
|
||||
.and_then(|tag| {
|
||||
@@ -458,7 +476,7 @@ impl TagDao for SqliteTagDao {
|
||||
.filter(tagged_photo::tag_id.eq(tag.id))
|
||||
.filter(tagged_photo::photo_name.eq(path)),
|
||||
)
|
||||
.execute(&mut self.connection)
|
||||
.execute(conn.deref_mut())
|
||||
.with_context(|| format!("Unable to delete tag: '{}'", &tag.name))
|
||||
.map(|_| Some(()))
|
||||
} else {
|
||||
@@ -475,6 +493,10 @@ impl TagDao for SqliteTagDao {
|
||||
path: &str,
|
||||
tag_id: i32,
|
||||
) -> anyhow::Result<TaggedPhoto> {
|
||||
let mut conn = self
|
||||
.connection
|
||||
.lock()
|
||||
.expect("Unable to lock SqliteTagDao connection");
|
||||
trace_db_call(context, "insert", "tag_file", |span| {
|
||||
span.set_attributes(vec![
|
||||
KeyValue::new("path", path.to_string()),
|
||||
@@ -487,7 +509,7 @@ impl TagDao for SqliteTagDao {
|
||||
photo_name: path.to_string(),
|
||||
created_time: Utc::now().timestamp(),
|
||||
})
|
||||
.execute(self.connection.borrow_mut())
|
||||
.execute(conn.deref_mut())
|
||||
.with_context(|| format!("Unable to tag file {:?} in sqlite", path))
|
||||
.and_then(|_| {
|
||||
info!("Inserted tagged photo: {:#} -> {:?}", tag_id, path);
|
||||
@@ -495,13 +517,13 @@ impl TagDao for SqliteTagDao {
|
||||
fn last_insert_rowid() -> diesel::sql_types::Integer;
|
||||
}
|
||||
diesel::select(last_insert_rowid())
|
||||
.get_result::<i32>(&mut self.connection)
|
||||
.get_result::<i32>(conn.deref_mut())
|
||||
.with_context(|| "Unable to get last inserted tag from Sqlite")
|
||||
})
|
||||
.and_then(|tagged_id| {
|
||||
tagged_photo::table
|
||||
.find(tagged_id)
|
||||
.first(self.connection.borrow_mut())
|
||||
.first(conn.deref_mut())
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Error getting inserted tagged photo with id: {:?}",
|
||||
@@ -518,6 +540,10 @@ impl TagDao for SqliteTagDao {
|
||||
exclude_tag_ids: Vec<i32>,
|
||||
context: &opentelemetry::Context,
|
||||
) -> anyhow::Result<Vec<FileWithTagCount>> {
|
||||
let mut conn = self
|
||||
.connection
|
||||
.lock()
|
||||
.expect("Unable to lock SqliteTagDao connection");
|
||||
trace_db_call(context, "query", "get_files_with_all_tags", |_| {
|
||||
use diesel::dsl::*;
|
||||
|
||||
@@ -564,7 +590,7 @@ impl TagDao for SqliteTagDao {
|
||||
.fold(query, |q, id| q.bind::<Integer, _>(id));
|
||||
|
||||
query
|
||||
.load::<FileWithTagCount>(&mut self.connection)
|
||||
.load::<FileWithTagCount>(conn.deref_mut())
|
||||
.with_context(|| "Unable to get tagged photos with all specified tags")
|
||||
})
|
||||
}
|
||||
@@ -575,6 +601,10 @@ impl TagDao for SqliteTagDao {
|
||||
exclude_tag_ids: Vec<i32>,
|
||||
context: &opentelemetry::Context,
|
||||
) -> anyhow::Result<Vec<FileWithTagCount>> {
|
||||
let mut conn = self
|
||||
.connection
|
||||
.lock()
|
||||
.expect("Unable to lock SqliteTagDao connection");
|
||||
trace_db_call(context, "query", "get_files_with_any_tags", |_| {
|
||||
use diesel::dsl::*;
|
||||
// Create the placeholders for the IN clauses
|
||||
@@ -616,7 +646,7 @@ impl TagDao for SqliteTagDao {
|
||||
.fold(query, |q, id| q.bind::<Integer, _>(id));
|
||||
|
||||
query
|
||||
.load::<FileWithTagCount>(&mut self.connection)
|
||||
.load::<FileWithTagCount>(conn.deref_mut())
|
||||
.with_context(|| "Unable to get tagged photos")
|
||||
})
|
||||
}
|
||||
@@ -629,9 +659,13 @@ impl TagDao for SqliteTagDao {
|
||||
) -> anyhow::Result<()> {
|
||||
use crate::database::schema::tagged_photo::dsl::*;
|
||||
|
||||
let mut conn = self
|
||||
.connection
|
||||
.lock()
|
||||
.expect("Unable to lock SqliteTagDao connection");
|
||||
diesel::update(tagged_photo.filter(photo_name.eq(old_name)))
|
||||
.set(photo_name.eq(new_name))
|
||||
.execute(&mut self.connection)?;
|
||||
.execute(conn.deref_mut())?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -641,10 +675,14 @@ impl TagDao for SqliteTagDao {
|
||||
) -> anyhow::Result<Vec<String>> {
|
||||
use crate::database::schema::tagged_photo::dsl::*;
|
||||
|
||||
let mut conn = self
|
||||
.connection
|
||||
.lock()
|
||||
.expect("Unable to lock SqliteTagDao connection");
|
||||
tagged_photo
|
||||
.select(photo_name)
|
||||
.distinct()
|
||||
.load(&mut self.connection)
|
||||
.load(conn.deref_mut())
|
||||
.with_context(|| "Unable to get photo names")
|
||||
}
|
||||
|
||||
@@ -655,6 +693,10 @@ impl TagDao for SqliteTagDao {
|
||||
) -> anyhow::Result<std::collections::HashMap<String, i64>> {
|
||||
use std::collections::HashMap;
|
||||
|
||||
let mut conn = self
|
||||
.connection
|
||||
.lock()
|
||||
.expect("Unable to lock SqliteTagDao connection");
|
||||
trace_db_call(context, "query", "get_tag_counts_batch", |span| {
|
||||
span.set_attribute(KeyValue::new("file_count", file_paths.len() as i64));
|
||||
|
||||
@@ -697,7 +739,7 @@ impl TagDao for SqliteTagDao {
|
||||
|
||||
// Execute query and convert to HashMap
|
||||
query
|
||||
.load::<TagCountRow>(&mut self.connection)
|
||||
.load::<TagCountRow>(conn.deref_mut())
|
||||
.with_context(|| "Unable to get batch tag counts")
|
||||
.map(|rows| {
|
||||
rows.into_iter()
|
||||
@@ -735,6 +777,13 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
// SAFETY: TestTagDao uses RefCell<T> fields which are !Send because they allow
|
||||
// multiple mutable borrows without coordination. This impl is sound because
|
||||
// TestTagDao is test-only, used within a single test function, and never moved
|
||||
// into spawned tasks or shared across threads.
|
||||
unsafe impl Send for TestTagDao {}
|
||||
unsafe impl Sync for TestTagDao {}
|
||||
|
||||
impl TagDao for TestTagDao {
|
||||
fn get_all_tags(
|
||||
&mut self,
|
||||
|
||||
@@ -159,19 +159,21 @@ async fn get_video_rotation(video_path: &str) -> i32 {
|
||||
.await;
|
||||
|
||||
if let Ok(output) = output
|
||||
&& output.status.success() {
|
||||
let rotation_str = String::from_utf8_lossy(&output.stdout);
|
||||
let rotation_str = rotation_str.trim();
|
||||
if !rotation_str.is_empty()
|
||||
&& let Ok(rotation) = rotation_str.parse::<i32>()
|
||||
&& rotation != 0 {
|
||||
debug!(
|
||||
"Detected rotation {}° from stream tag for {}",
|
||||
rotation, video_path
|
||||
);
|
||||
return rotation;
|
||||
}
|
||||
&& output.status.success()
|
||||
{
|
||||
let rotation_str = String::from_utf8_lossy(&output.stdout);
|
||||
let rotation_str = rotation_str.trim();
|
||||
if !rotation_str.is_empty()
|
||||
&& let Ok(rotation) = rotation_str.parse::<i32>()
|
||||
&& rotation != 0
|
||||
{
|
||||
debug!(
|
||||
"Detected rotation {}° from stream tag for {}",
|
||||
rotation, video_path
|
||||
);
|
||||
return rotation;
|
||||
}
|
||||
}
|
||||
|
||||
// Check display matrix side data (modern videos, e.g. iPhone)
|
||||
let output = tokio::process::Command::new("ffprobe")
|
||||
@@ -188,21 +190,23 @@ async fn get_video_rotation(video_path: &str) -> i32 {
|
||||
.await;
|
||||
|
||||
if let Ok(output) = output
|
||||
&& output.status.success() {
|
||||
let rotation_str = String::from_utf8_lossy(&output.stdout);
|
||||
let rotation_str = rotation_str.trim();
|
||||
if !rotation_str.is_empty()
|
||||
&& let Ok(rotation) = rotation_str.parse::<f64>() {
|
||||
let rotation = rotation.abs() as i32;
|
||||
if rotation != 0 {
|
||||
debug!(
|
||||
"Detected rotation {}° from display matrix for {}",
|
||||
rotation, video_path
|
||||
);
|
||||
return rotation;
|
||||
}
|
||||
}
|
||||
&& output.status.success()
|
||||
{
|
||||
let rotation_str = String::from_utf8_lossy(&output.stdout);
|
||||
let rotation_str = rotation_str.trim();
|
||||
if !rotation_str.is_empty()
|
||||
&& let Ok(rotation) = rotation_str.parse::<f64>()
|
||||
{
|
||||
let rotation = rotation.abs() as i32;
|
||||
if rotation != 0 {
|
||||
debug!(
|
||||
"Detected rotation {}° from display matrix for {}",
|
||||
rotation, video_path
|
||||
);
|
||||
return rotation;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
0
|
||||
}
|
||||
@@ -550,7 +554,8 @@ impl Handler<GeneratePreviewClipMessage> for PreviewClipGenerator {
|
||||
{
|
||||
let otel_ctx = opentelemetry::Context::current();
|
||||
let mut dao = preview_dao.lock().expect("Unable to lock PreviewDao");
|
||||
let _ = dao.update_status(&otel_ctx, &relative_path, "processing", None, None, None);
|
||||
let _ =
|
||||
dao.update_status(&otel_ctx, &relative_path, "processing", None, None, None);
|
||||
}
|
||||
|
||||
// Compute output path: join preview_clips_dir with relative path, change ext to .mp4
|
||||
|
||||
@@ -183,7 +183,11 @@ impl Ffmpeg {
|
||||
Ok(output_file.to_string())
|
||||
}
|
||||
|
||||
pub async fn create_gif_from_frames(&self, frame_base_dir: &str, output_file: &str) -> Result<i32> {
|
||||
pub async fn create_gif_from_frames(
|
||||
&self,
|
||||
frame_base_dir: &str,
|
||||
output_file: &str,
|
||||
) -> Result<i32> {
|
||||
let output = Command::new("ffmpeg")
|
||||
.arg("-y")
|
||||
.args(["-framerate", "4"])
|
||||
@@ -278,10 +282,7 @@ pub async fn generate_preview_clip(input_file: &str, output_file: &str) -> Resul
|
||||
"select='lt(mod(t,{:.4}),1)',setpts=N/FRAME_RATE/TB,fps=30,scale=-2:480,format=yuv420p",
|
||||
interval
|
||||
);
|
||||
let af = format!(
|
||||
"aselect='lt(mod(t,{:.4}),1)',asetpts=N/SR/TB",
|
||||
interval
|
||||
);
|
||||
let af = format!("aselect='lt(mod(t,{:.4}),1)',asetpts=N/SR/TB", interval);
|
||||
|
||||
cmd.args(["-vf", &vf]);
|
||||
cmd.args(["-af", &af]);
|
||||
@@ -326,7 +327,10 @@ pub async fn generate_preview_clip(input_file: &str, output_file: &str) -> Resul
|
||||
|
||||
info!(
|
||||
"Generated preview clip '{}' ({:.1}s, {} bytes) in {:?}",
|
||||
output_file, clip_duration, file_size, start.elapsed()
|
||||
output_file,
|
||||
clip_duration,
|
||||
file_size,
|
||||
start.elapsed()
|
||||
);
|
||||
|
||||
Ok((clip_duration, file_size))
|
||||
|
||||
Reference in New Issue
Block a user