From d86b2c3746ca95868f3d8543e50c26727c502526 Mon Sep 17 00:00:00 2001 From: Cameron Date: Mon, 5 Jan 2026 14:50:49 -0500 Subject: [PATCH] Add Google Takeout data import infrastructure MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements Phase 1 & 2 of Google Takeout RAG integration: - Database migrations for calendar_events, location_history, search_history - DAO implementations with hybrid time + semantic search - Parsers for .ics, JSON, and HTML Google Takeout formats - Import utilities with batch insert optimization Features: - CalendarEventDao: Hybrid time-range + semantic search for events - LocationHistoryDao: GPS proximity with Haversine distance calculation - SearchHistoryDao: Semantic-first search (queries are embedding-rich) - Batch inserts for performance (1M+ records in minutes vs hours) - OpenTelemetry tracing for all database operations Import utilities: - import_calendar: Parse .ics with optional embedding generation - import_location_history: High-volume GPS data with batch inserts - import_search_history: Always generates embeddings for semantic search 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 --- Cargo.lock | 342 +++++++++++ Cargo.toml | 4 +- .../down.sql | 1 + .../up.sql | 20 + .../down.sql | 1 + .../up.sql | 19 + .../down.sql | 1 + .../up.sql | 13 + src/ai/insight_generator.rs | 2 +- src/bin/import_calendar.rs | 167 ++++++ src/bin/import_location_history.rs | 115 ++++ src/bin/import_search_history.rs | 154 +++++ src/bin/migrate_exif.rs | 10 +- src/data/mod.rs | 10 +- src/database/calendar_dao.rs | 553 ++++++++++++++++++ src/database/location_dao.rs | 528 +++++++++++++++++ src/database/mod.rs | 14 +- src/database/models.rs | 20 +- src/database/schema.rs | 133 ++++- src/database/search_dao.rs | 516 ++++++++++++++++ src/files.rs | 7 +- src/lib.rs | 1 + src/main.rs | 20 +- src/parsers/ical_parser.rs | 183 ++++++ src/parsers/location_json_parser.rs | 133 +++++ src/parsers/mod.rs | 7 + src/parsers/search_html_parser.rs | 210 +++++++ 27 files changed, 3129 insertions(+), 55 deletions(-) create mode 100644 migrations/2026-01-05-000000_add_calendar_events/down.sql create mode 100644 migrations/2026-01-05-000000_add_calendar_events/up.sql create mode 100644 migrations/2026-01-05-000100_add_location_history/down.sql create mode 100644 migrations/2026-01-05-000100_add_location_history/up.sql create mode 100644 migrations/2026-01-05-000200_add_search_history/down.sql create mode 100644 migrations/2026-01-05-000200_add_search_history/up.sql create mode 100644 src/bin/import_calendar.rs create mode 100644 src/bin/import_location_history.rs create mode 100644 src/bin/import_search_history.rs create mode 100644 src/database/calendar_dao.rs create mode 100644 src/database/location_dao.rs create mode 100644 src/database/search_dao.rs create mode 100644 src/parsers/ical_parser.rs create mode 100644 src/parsers/location_json_parser.rs create mode 100644 src/parsers/mod.rs create mode 100644 src/parsers/search_html_parser.rs diff --git a/Cargo.lock b/Cargo.lock index b964197..7495026 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -340,6 +340,19 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" +[[package]] +name = "ahash" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" +dependencies = [ + "cfg-if", + "getrandom 0.3.3", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "aho-corasick" version = "1.1.3" @@ -848,6 +861,29 @@ dependencies = [ "typenum", ] +[[package]] +name = "cssparser" +version = "0.31.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b3df4f93e5fbbe73ec01ec8d3f68bba73107993a5b1e7519273c32db9b0d5be" +dependencies = [ + "cssparser-macros", + "dtoa-short", + "itoa", + "phf 0.11.3", + "smallvec", +] + +[[package]] +name = "cssparser-macros" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13b588ba4ac1a99f7f2964d24b3d896ddc6bf847ee3855dbd4366f058cfcd331" +dependencies = [ + "quote", + "syn", +] + [[package]] name = "darling" version = "0.20.11" @@ -1024,6 +1060,27 @@ dependencies = [ "syn", ] +[[package]] +name = "dtoa" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c3cf4824e2d5f025c7b531afcb2325364084a16806f6d47fbc1f5fbd9960590" + +[[package]] +name = "dtoa-short" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd1511a7b6a56299bd043a9c167a6d2bfb37bf84a6dfceaba651168adfb43c87" +dependencies = [ + "dtoa", +] + +[[package]] +name = "ego-tree" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12a0bb14ac04a9fcf170d0bbbef949b44cc492f4452bd20c095636956f653642" + [[package]] name = "either" version = "1.15.0" @@ -1165,6 +1222,16 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futf" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df420e2e84819663797d1ec6544b13c5be84629e7bb00dc960d6917db2987843" +dependencies = [ + "mac", + "new_debug_unreachable", +] + [[package]] name = "futures" version = "0.3.31" @@ -1260,6 +1327,15 @@ dependencies = [ "slab", ] +[[package]] +name = "fxhash" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" +dependencies = [ + "byteorder", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -1270,6 +1346,15 @@ dependencies = [ "version_check", ] +[[package]] +name = "getopts" +version = "0.2.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfe4fbac503b8d1f88e6676011885f34b7174f46e59956bba534ba83abded4df" +dependencies = [ + "unicode-width", +] + [[package]] name = "getrandom" version = "0.2.16" @@ -1377,6 +1462,20 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "html5ever" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c13771afe0e6e846f1e67d038d4cb29998a6779f93c809212e4e9c32efd244d4" +dependencies = [ + "log", + "mac", + "markup5ever", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "http" version = "0.2.12" @@ -1557,6 +1656,15 @@ dependencies = [ "cc", ] +[[package]] +name = "ical" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b7cab7543a8b7729a19e2c04309f902861293dcdae6558dfbeb634454d279f6" +dependencies = [ + "thiserror 1.0.69", +] + [[package]] name = "icu_collections" version = "2.0.0" @@ -1708,6 +1816,7 @@ dependencies = [ "dotenv", 
"env_logger", "futures", + "ical", "image", "infer", "jsonwebtoken", @@ -1726,6 +1835,7 @@ dependencies = [ "rayon", "regex", "reqwest", + "scraper", "serde", "serde_json", "tempfile", @@ -2004,6 +2114,26 @@ dependencies = [ "imgref", ] +[[package]] +name = "mac" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c41e0c4fef86961ac6d6f8a82609f55f31b05e4fce149ac5710e439df7619ba4" + +[[package]] +name = "markup5ever" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16ce3abbeba692c8b8441d036ef91aea6df8da2c6b6e21c7e14d3c18e526be45" +dependencies = [ + "log", + "phf 0.11.3", + "phf_codegen 0.11.3", + "string_cache", + "string_cache_codegen", + "tendril", +] + [[package]] name = "maybe-rayon" version = "0.1.1" @@ -2439,6 +2569,96 @@ version = "2.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" +[[package]] +name = "phf" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fabbf1ead8a5bcbc20f5f8b939ee3f5b0f6f281b6ad3468b84656b658b455259" +dependencies = [ + "phf_shared 0.10.0", +] + +[[package]] +name = "phf" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd6780a80ae0c52cc120a26a1a42c1ae51b247a253e4e06113d23d2c2edd078" +dependencies = [ + "phf_macros", + "phf_shared 0.11.3", +] + +[[package]] +name = "phf_codegen" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fb1c3a8bc4dd4e5cfce29b44ffc14bedd2ee294559a294e2a4d4c9e9a6a13cd" +dependencies = [ + "phf_generator 0.10.0", + "phf_shared 0.10.0", +] + +[[package]] +name = "phf_codegen" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aef8048c789fa5e851558d709946d6d79a8ff88c0440c587967f8e94bfb1216a" +dependencies = [ + "phf_generator 0.11.3", + "phf_shared 0.11.3", +] + +[[package]] +name = "phf_generator" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d5285893bb5eb82e6aaf5d59ee909a06a16737a8970984dd7746ba9283498d6" +dependencies = [ + "phf_shared 0.10.0", + "rand 0.8.5", +] + +[[package]] +name = "phf_generator" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c80231409c20246a13fddb31776fb942c38553c51e871f8cbd687a4cfb5843d" +dependencies = [ + "phf_shared 0.11.3", + "rand 0.8.5", +] + +[[package]] +name = "phf_macros" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f84ac04429c13a7ff43785d75ad27569f2951ce0ffd30a3321230db2fc727216" +dependencies = [ + "phf_generator 0.11.3", + "phf_shared 0.11.3", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "phf_shared" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6796ad771acdc0123d2a88dc428b5e38ef24456743ddb1744ed628f9815c096" +dependencies = [ + "siphasher 0.3.11", +] + +[[package]] +name = "phf_shared" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67eabc2ef2a60eb7faa00097bd1ffdb5bd28e62bf39990626a582201b7a754e5" +dependencies = [ + "siphasher 1.0.1", +] + [[package]] name = "pin-project" version = "1.1.10" @@ -2529,6 +2749,12 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "precomputed-hash" +version = "0.1.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "925383efa346730478fb4838dbe9137d2a47675ad789c546d150a6e1dd4ab31c" + [[package]] name = "proc-macro2" version = "1.0.101" @@ -2987,6 +3213,22 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "scraper" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b90460b31bfe1fc07be8262e42c665ad97118d4585869de9345a84d501a9eaf0" +dependencies = [ + "ahash", + "cssparser", + "ego-tree", + "getopts", + "html5ever", + "once_cell", + "selectors", + "tendril", +] + [[package]] name = "security-framework" version = "2.11.1" @@ -3010,6 +3252,25 @@ dependencies = [ "libc", ] +[[package]] +name = "selectors" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4eb30575f3638fc8f6815f448d50cb1a2e255b0897985c8c59f4d37b72a07b06" +dependencies = [ + "bitflags", + "cssparser", + "derive_more 0.99.20", + "fxhash", + "log", + "new_debug_unreachable", + "phf 0.10.1", + "phf_codegen 0.10.0", + "precomputed-hash", + "servo_arc", + "smallvec", +] + [[package]] name = "semver" version = "1.0.26" @@ -3087,6 +3348,15 @@ dependencies = [ "serde", ] +[[package]] +name = "servo_arc" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d036d71a959e00c77a63538b90a6c2390969f9772b096ea837205c6bd0491a44" +dependencies = [ + "stable_deref_trait", +] + [[package]] name = "sha1" version = "0.10.6" @@ -3140,6 +3410,18 @@ dependencies = [ "time", ] +[[package]] +name = "siphasher" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d" + +[[package]] +name = "siphasher" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d" + [[package]] name = "slab" version = "0.4.11" @@ -3193,6 +3475,31 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "29fdc163db75f7b5ffa3daf0c5a7136fb0d4b2f35523cd1769da05e034159feb" +[[package]] +name = "string_cache" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf776ba3fa74f83bf4b63c3dcbbf82173db2632ed8452cb2d891d33f459de70f" +dependencies = [ + "new_debug_unreachable", + "parking_lot", + "phf_shared 0.11.3", + "precomputed-hash", + "serde", +] + +[[package]] +name = "string_cache_codegen" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c711928715f1fe0fe509c53b43e993a9a557babc2d0a3567d0a3006f1ac931a0" +dependencies = [ + "phf_generator 0.11.3", + "phf_shared 0.11.3", + "proc-macro2", + "quote", +] + [[package]] name = "strsim" version = "0.11.1" @@ -3289,6 +3596,17 @@ dependencies = [ "windows-sys 0.60.2", ] +[[package]] +name = "tendril" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d24a120c5fc464a3458240ee02c299ebcb9d67b5249c8848b09d639dca8d7bb0" +dependencies = [ + "futf", + "mac", + "utf-8", +] + [[package]] name = "thiserror" version = "1.0.69" @@ -3385,9 +3703,21 @@ dependencies = [ "signal-hook-registry", "slab", "socket2 0.6.0", + "tokio-macros", "windows-sys 0.59.0", ] +[[package]] +name = "tokio-macros" +version = "2.5.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tokio-native-tls" version = "0.3.1" @@ -3647,6 +3977,12 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" +[[package]] +name = "unicode-width" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4ac048d71ede7ee76d585517add45da530660ef4390e49b098733c6e897f254" + [[package]] name = "unicode-xid" version = "0.2.6" @@ -3677,6 +4013,12 @@ version = "2.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf8_iter" version = "1.0.4" diff --git a/Cargo.toml b/Cargo.toml index 27e6e78..1a7bf56 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ lto = "thin" actix = "0.13.1" actix-web = "4" actix-rt = "2.6" -tokio = { version = "1.42.0", features = ["default", "process", "sync"] } +tokio = { version = "1.42.0", features = ["default", "process", "sync", "macros", "rt-multi-thread"] } actix-files = "0.6" actix-cors = "0.7" actix-multipart = "0.7.2" @@ -52,3 +52,5 @@ exif = { package = "kamadak-exif", version = "0.6.1" } reqwest = { version = "0.12", features = ["json"] } urlencoding = "2.1" zerocopy = "0.8" +ical = "0.11" +scraper = "0.20" diff --git a/migrations/2026-01-05-000000_add_calendar_events/down.sql b/migrations/2026-01-05-000000_add_calendar_events/down.sql new file mode 100644 index 0000000..70fbec6 --- /dev/null +++ b/migrations/2026-01-05-000000_add_calendar_events/down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS calendar_events; diff --git a/migrations/2026-01-05-000000_add_calendar_events/up.sql b/migrations/2026-01-05-000000_add_calendar_events/up.sql new file mode 100644 index 0000000..b2e477d --- /dev/null +++ b/migrations/2026-01-05-000000_add_calendar_events/up.sql @@ -0,0 +1,20 @@ +CREATE TABLE calendar_events ( + id INTEGER PRIMARY KEY NOT NULL, + event_uid TEXT, + summary TEXT NOT NULL, + description TEXT, + location TEXT, + start_time BIGINT NOT NULL, + end_time BIGINT NOT NULL, + all_day BOOLEAN NOT NULL DEFAULT 0, + organizer TEXT, + attendees TEXT, + embedding BLOB, + created_at BIGINT NOT NULL, + source_file TEXT, + UNIQUE(event_uid, start_time) +); + +CREATE INDEX idx_calendar_start_time ON calendar_events(start_time); +CREATE INDEX idx_calendar_end_time ON calendar_events(end_time); +CREATE INDEX idx_calendar_time_range ON calendar_events(start_time, end_time); diff --git a/migrations/2026-01-05-000100_add_location_history/down.sql b/migrations/2026-01-05-000100_add_location_history/down.sql new file mode 100644 index 0000000..8c39663 --- /dev/null +++ b/migrations/2026-01-05-000100_add_location_history/down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS location_history; diff --git a/migrations/2026-01-05-000100_add_location_history/up.sql b/migrations/2026-01-05-000100_add_location_history/up.sql new file mode 100644 index 0000000..2d66948 --- /dev/null +++ b/migrations/2026-01-05-000100_add_location_history/up.sql @@ -0,0 +1,19 @@ +CREATE TABLE location_history ( + id INTEGER PRIMARY KEY NOT NULL, 
+ timestamp BIGINT NOT NULL, + latitude REAL NOT NULL, + longitude REAL NOT NULL, + accuracy INTEGER, + activity TEXT, + activity_confidence INTEGER, + place_name TEXT, + place_category TEXT, + embedding BLOB, + created_at BIGINT NOT NULL, + source_file TEXT, + UNIQUE(timestamp, latitude, longitude) +); + +CREATE INDEX idx_location_timestamp ON location_history(timestamp); +CREATE INDEX idx_location_coords ON location_history(latitude, longitude); +CREATE INDEX idx_location_activity ON location_history(activity); diff --git a/migrations/2026-01-05-000200_add_search_history/down.sql b/migrations/2026-01-05-000200_add_search_history/down.sql new file mode 100644 index 0000000..2842ca4 --- /dev/null +++ b/migrations/2026-01-05-000200_add_search_history/down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS search_history; diff --git a/migrations/2026-01-05-000200_add_search_history/up.sql b/migrations/2026-01-05-000200_add_search_history/up.sql new file mode 100644 index 0000000..4e7fe11 --- /dev/null +++ b/migrations/2026-01-05-000200_add_search_history/up.sql @@ -0,0 +1,13 @@ +CREATE TABLE search_history ( + id INTEGER PRIMARY KEY NOT NULL, + timestamp BIGINT NOT NULL, + query TEXT NOT NULL, + search_engine TEXT, + embedding BLOB NOT NULL, + created_at BIGINT NOT NULL, + source_file TEXT, + UNIQUE(timestamp, query) +); + +CREATE INDEX idx_search_timestamp ON search_history(timestamp); +CREATE INDEX idx_search_query ON search_history(query); diff --git a/src/ai/insight_generator.rs b/src/ai/insight_generator.rs index 1141cc0..b47b8cf 100644 --- a/src/ai/insight_generator.rs +++ b/src/ai/insight_generator.rs @@ -339,7 +339,7 @@ impl InsightGenerator { let location = match exif { Some(ref exif) => { if let (Some(lat), Some(lon)) = (exif.gps_latitude, exif.gps_longitude) { - let loc = self.reverse_geocode(lat, lon).await; + let loc = self.reverse_geocode(lat as f64, lon as f64).await; if let Some(ref l) = loc { insight_cx .span() diff --git a/src/bin/import_calendar.rs b/src/bin/import_calendar.rs new file mode 100644 index 0000000..e7b1b2c --- /dev/null +++ b/src/bin/import_calendar.rs @@ -0,0 +1,167 @@ +use anyhow::{Context, Result}; +use chrono::Utc; +use clap::Parser; +use image_api::ai::ollama::OllamaClient; +use image_api::database::calendar_dao::{InsertCalendarEvent, SqliteCalendarEventDao}; +use image_api::parsers::ical_parser::parse_ics_file; +use log::{error, info}; +use std::sync::{Arc, Mutex}; + +// Import the trait to use its methods +use image_api::database::CalendarEventDao; + +#[derive(Parser, Debug)] +#[command(author, version, about = "Import Google Takeout Calendar data", long_about = None)] +struct Args { + /// Path to the .ics calendar file + #[arg(short, long)] + path: String, + + /// Generate embeddings for calendar events (slower but enables semantic search) + #[arg(long, default_value = "false")] + generate_embeddings: bool, + + /// Skip events that already exist in the database + #[arg(long, default_value = "true")] + skip_existing: bool, + + /// Batch size for embedding generation + #[arg(long, default_value = "128")] + batch_size: usize, +} + +#[tokio::main] +async fn main() -> Result<()> { + dotenv::dotenv().ok(); + env_logger::init(); + + let args = Args::parse(); + + info!("Parsing calendar file: {}", args.path); + let events = parse_ics_file(&args.path).context("Failed to parse .ics file")?; + + info!("Found {} calendar events", events.len()); + + let context = opentelemetry::Context::current(); + + let ollama = if args.generate_embeddings { + let primary_url = 
dotenv::var("OLLAMA_PRIMARY_URL") + .or_else(|_| dotenv::var("OLLAMA_URL")) + .unwrap_or_else(|_| "http://localhost:11434".to_string()); + let fallback_url = dotenv::var("OLLAMA_FALLBACK_URL").ok(); + let primary_model = dotenv::var("OLLAMA_PRIMARY_MODEL") + .or_else(|_| dotenv::var("OLLAMA_MODEL")) + .unwrap_or_else(|_| "nomic-embed-text:v1.5".to_string()); + let fallback_model = dotenv::var("OLLAMA_FALLBACK_MODEL").ok(); + + Some(OllamaClient::new( + primary_url, + fallback_url, + primary_model, + fallback_model, + )) + } else { + None + }; + + let inserted_count = Arc::new(Mutex::new(0)); + let skipped_count = Arc::new(Mutex::new(0)); + let error_count = Arc::new(Mutex::new(0)); + + // Process events in batches + // Can't use rayon with async, so process sequentially + for event in &events { + let mut dao_instance = SqliteCalendarEventDao::new(); + + // Check if event exists + if args.skip_existing { + if let Ok(exists) = dao_instance.event_exists( + &context, + event.event_uid.as_deref().unwrap_or(""), + event.start_time, + ) { + if exists { + *skipped_count.lock().unwrap() += 1; + continue; + } + } + } + + // Generate embedding if requested (blocking call) + let embedding = if let Some(ref ollama_client) = ollama { + let text = format!( + "{} {} {}", + event.summary, + event.description.as_deref().unwrap_or(""), + event.location.as_deref().unwrap_or("") + ); + + match tokio::task::block_in_place(|| { + tokio::runtime::Handle::current() + .block_on(async { ollama_client.generate_embedding(&text).await }) + }) { + Ok(emb) => Some(emb), + Err(e) => { + error!( + "Failed to generate embedding for event '{}': {}", + event.summary, e + ); + None + } + } + } else { + None + }; + + // Insert into database + let insert_event = InsertCalendarEvent { + event_uid: event.event_uid.clone(), + summary: event.summary.clone(), + description: event.description.clone(), + location: event.location.clone(), + start_time: event.start_time, + end_time: event.end_time, + all_day: event.all_day, + organizer: event.organizer.clone(), + attendees: if event.attendees.is_empty() { + None + } else { + Some(serde_json::to_string(&event.attendees).unwrap_or_default()) + }, + embedding, + created_at: Utc::now().timestamp(), + source_file: Some(args.path.clone()), + }; + + match dao_instance.store_event(&context, insert_event) { + Ok(_) => { + *inserted_count.lock().unwrap() += 1; + if *inserted_count.lock().unwrap() % 100 == 0 { + info!("Imported {} events...", *inserted_count.lock().unwrap()); + } + } + Err(e) => { + error!("Failed to store event '{}': {:?}", event.summary, e); + *error_count.lock().unwrap() += 1; + } + } + } + + let final_inserted = *inserted_count.lock().unwrap(); + let final_skipped = *skipped_count.lock().unwrap(); + let final_errors = *error_count.lock().unwrap(); + + info!("\n=== Import Summary ==="); + info!("Total events found: {}", events.len()); + info!("Successfully inserted: {}", final_inserted); + info!("Skipped (already exist): {}", final_skipped); + info!("Errors: {}", final_errors); + + if args.generate_embeddings { + info!("Embeddings were generated for semantic search"); + } else { + info!("No embeddings generated (use --generate-embeddings to enable semantic search)"); + } + + Ok(()) +} diff --git a/src/bin/import_location_history.rs b/src/bin/import_location_history.rs new file mode 100644 index 0000000..a0437a1 --- /dev/null +++ b/src/bin/import_location_history.rs @@ -0,0 +1,115 @@ +use anyhow::{Context, Result}; +use chrono::Utc; +use clap::Parser; +use 
diff --git a/src/bin/import_location_history.rs b/src/bin/import_location_history.rs
new file mode 100644
index 0000000..a0437a1
--- /dev/null
+++ b/src/bin/import_location_history.rs
@@ -0,0 +1,115 @@
+use anyhow::{Context, Result};
+use chrono::Utc;
+use clap::Parser;
+use image_api::database::location_dao::{InsertLocationRecord, SqliteLocationHistoryDao};
+use image_api::parsers::location_json_parser::parse_location_json;
+use log::{error, info};
+// Import the trait to use its methods
+use image_api::database::LocationHistoryDao;
+
+#[derive(Parser, Debug)]
+#[command(author, version, about = "Import Google Takeout Location History data", long_about = None)]
+struct Args {
+    /// Path to the Location History JSON file
+    #[arg(short, long)]
+    path: String,
+
+    /// Skip locations that already exist in the database
+    #[arg(long, default_value = "true")]
+    skip_existing: bool,
+
+    /// Batch size for database inserts
+    #[arg(long, default_value = "1000")]
+    batch_size: usize,
+}
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    dotenv::dotenv().ok();
+    env_logger::init();
+
+    let args = Args::parse();
+
+    info!("Parsing location history file: {}", args.path);
+    let locations =
+        parse_location_json(&args.path).context("Failed to parse location history JSON")?;
+
+    info!("Found {} location records", locations.len());
+
+    let context = opentelemetry::Context::current();
+
+    let mut inserted_count = 0;
+    let mut skipped_count = 0;
+    let mut error_count = 0;
+
+    let mut dao_instance = SqliteLocationHistoryDao::new();
+    let created_at = Utc::now().timestamp();
+
+    // Process in batches using batch insert for massive speedup
+    for (batch_idx, chunk) in locations.chunks(args.batch_size).enumerate() {
+        info!(
+            "Processing batch {} ({} records)...",
+            batch_idx + 1,
+            chunk.len()
+        );
+
+        // Convert to InsertLocationRecord
+        let mut batch_inserts = Vec::with_capacity(chunk.len());
+
+        for location in chunk {
+            // Check for already-imported points if requested (the per-row
+            // lookup makes the import noticeably slower)
+            if args.skip_existing {
+                if let Ok(exists) = dao_instance.location_exists(
+                    &context,
+                    location.timestamp,
+                    location.latitude,
+                    location.longitude,
+                ) {
+                    if exists {
+                        skipped_count += 1;
+                        continue;
+                    }
+                }
+            }
+
+            batch_inserts.push(InsertLocationRecord {
+                timestamp: location.timestamp,
+                latitude: location.latitude,
+                longitude: location.longitude,
+                accuracy: location.accuracy,
+                activity: location.activity.clone(),
+                activity_confidence: location.activity_confidence,
+                place_name: None,
+                place_category: None,
+                embedding: None,
+                created_at,
+                source_file: Some(args.path.clone()),
+            });
+        }
+
+        // Batch insert entire chunk in single transaction
+        if !batch_inserts.is_empty() {
+            match dao_instance.store_locations_batch(&context, batch_inserts) {
+                Ok(count) => {
+                    inserted_count += count;
+                    info!(
+                        "Imported {} locations (total: {})...",
+                        count, inserted_count
+                    );
+                }
+                Err(e) => {
+                    error!("Failed to store batch: {:?}", e);
+                    error_count += chunk.len();
+                }
+            }
+        }
+    }
+
+    info!("\n=== Import Summary ===");
+    info!("Total locations found: {}", locations.len());
+    info!("Successfully inserted: {}", inserted_count);
+    info!("Skipped (already exist): {}", skipped_count);
+    info!("Errors: {}", error_count);
+
+    Ok(())
+}
diff --git a/src/bin/import_search_history.rs b/src/bin/import_search_history.rs
new file mode 100644
index 0000000..3438230
--- /dev/null
+++ b/src/bin/import_search_history.rs
@@ -0,0 +1,154 @@
+use anyhow::{Context, Result};
+use chrono::Utc;
+use clap::Parser;
+use image_api::ai::ollama::OllamaClient;
+use image_api::database::search_dao::{InsertSearchRecord, SqliteSearchHistoryDao};
+use image_api::parsers::search_html_parser::parse_search_html;
+use log::{error, info, warn};
+
+// Import the trait to use its methods
+use image_api::database::SearchHistoryDao;
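+
+// Note: unlike the calendar and location importers, this importer always
+// generates embeddings: the search_history migration declares the embedding
+// column as BLOB NOT NULL, so a record without an embedding cannot be stored
+// at all (semantic-first search over queries).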
+
+#[derive(Parser, Debug)]
+#[command(author, version, about = "Import Google Takeout Search History data", long_about = None)]
+struct Args {
+    /// Path to the search history HTML file
+    #[arg(short, long)]
+    path: String,
+
+    /// Skip searches that already exist in the database
+    #[arg(long, default_value = "true")]
+    skip_existing: bool,
+
+    /// Batch size for embedding generation (max 128 recommended)
+    #[arg(long, default_value = "64")]
+    batch_size: usize,
+}
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    dotenv::dotenv().ok();
+    env_logger::init();
+
+    let args = Args::parse();
+
+    info!("Parsing search history file: {}", args.path);
+    let searches = parse_search_html(&args.path).context("Failed to parse search history HTML")?;
+
+    info!("Found {} search records", searches.len());
+
+    let primary_url = dotenv::var("OLLAMA_PRIMARY_URL")
+        .or_else(|_| dotenv::var("OLLAMA_URL"))
+        .unwrap_or_else(|_| "http://localhost:11434".to_string());
+    let fallback_url = dotenv::var("OLLAMA_FALLBACK_URL").ok();
+    let primary_model = dotenv::var("OLLAMA_PRIMARY_MODEL")
+        .or_else(|_| dotenv::var("OLLAMA_MODEL"))
+        .unwrap_or_else(|_| "nomic-embed-text:v1.5".to_string());
+    let fallback_model = dotenv::var("OLLAMA_FALLBACK_MODEL").ok();
+
+    let ollama = OllamaClient::new(primary_url, fallback_url, primary_model, fallback_model);
+    let context = opentelemetry::Context::current();
+
+    let mut inserted_count = 0;
+    let mut skipped_count = 0;
+    let mut error_count = 0;
+
+    let mut dao_instance = SqliteSearchHistoryDao::new();
+    let created_at = Utc::now().timestamp();
+
+    // Process searches in batches (embeddings are REQUIRED for searches)
+    for (batch_idx, chunk) in searches.chunks(args.batch_size).enumerate() {
+        info!(
+            "Processing batch {} ({} searches)...",
+            batch_idx + 1,
+            chunk.len()
+        );
+
+        // Generate embeddings for this batch
+        let queries: Vec<String> = chunk.iter().map(|s| s.query.clone()).collect();
+
+        let embeddings_result = tokio::task::spawn({
+            let ollama_client = ollama.clone();
+            async move {
+                // Generate embeddings one query at a time for the batch
+                let mut embeddings = Vec::new();
+                for query in &queries {
+                    match ollama_client.generate_embedding(query).await {
+                        Ok(emb) => embeddings.push(Some(emb)),
+                        Err(e) => {
+                            warn!("Failed to generate embedding for query '{}': {}", query, e);
+                            embeddings.push(None);
+                        }
+                    }
+                }
+                embeddings
+            }
+        })
+        .await
+        .context("Failed to generate embeddings for batch")?;
+
+        // Build batch of searches with embeddings
+        let mut batch_inserts = Vec::new();
+
+        for (search, embedding_opt) in chunk.iter().zip(embeddings_result.iter()) {
+            // Check if search exists (optional for speed)
+            if args.skip_existing {
+                if let Ok(exists) =
+                    dao_instance.search_exists(&context, search.timestamp, &search.query)
+                {
+                    if exists {
+                        skipped_count += 1;
+                        continue;
+                    }
+                }
+            }
+
+            // Only insert if we have an embedding
+            if let Some(embedding) = embedding_opt {
+                batch_inserts.push(InsertSearchRecord {
+                    timestamp: search.timestamp,
+                    query: search.query.clone(),
+                    search_engine: search.search_engine.clone(),
+                    embedding: embedding.clone(),
+                    created_at,
+                    source_file: Some(args.path.clone()),
+                });
+            } else {
+                error!(
+                    "Skipping search '{}' due to missing embedding",
+                    search.query
+                );
+                error_count += 1;
+            }
+        }
+
+        // Batch insert entire chunk in single transaction
+        if !batch_inserts.is_empty() {
+            match dao_instance.store_searches_batch(&context, batch_inserts) {
+                Ok(count) => {
+                    inserted_count += count;
+                    info!("Imported {} searches (total: {})...", count, inserted_count);
+                }
+                Err(e) => {
+                    error!("Failed to store batch: {:?}", e);
+                    error_count += chunk.len();
+                }
+            }
+        }
+
+        // Rate limiting between batches
+        if batch_idx < searches.len() / args.batch_size {
+            info!("Waiting 500ms before next batch...");
+            tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
+        }
+    }
+
+    info!("\n=== Import Summary ===");
+    info!("Total searches found: {}", searches.len());
+    info!("Successfully inserted: {}", inserted_count);
+    info!("Skipped (already exist): {}", skipped_count);
+    info!("Errors: {}", error_count);
+    info!("All imported searches have embeddings for semantic search");
+
+    Ok(())
+}
diff --git a/src/bin/migrate_exif.rs b/src/bin/migrate_exif.rs
index 5f5af9d..98e83dc 100644
--- a/src/bin/migrate_exif.rs
+++ b/src/bin/migrate_exif.rs
@@ -102,11 +102,11 @@ fn main() -> anyhow::Result<()> {
             width: exif_data.width,
             height: exif_data.height,
             orientation: exif_data.orientation,
-            gps_latitude: exif_data.gps_latitude,
-            gps_longitude: exif_data.gps_longitude,
-            gps_altitude: exif_data.gps_altitude,
-            focal_length: exif_data.focal_length,
-            aperture: exif_data.aperture,
+            gps_latitude: exif_data.gps_latitude.map(|v| v as f32),
+            gps_longitude: exif_data.gps_longitude.map(|v| v as f32),
+            gps_altitude: exif_data.gps_altitude.map(|v| v as f32),
+            focal_length: exif_data.focal_length.map(|v| v as f32),
+            aperture: exif_data.aperture.map(|v| v as f32),
             shutter_speed: exif_data.shutter_speed,
             iso: exif_data.iso,
             date_taken: exif_data.date_taken,
diff --git a/src/data/mod.rs b/src/data/mod.rs
index 70a3362..fa402b5 100644
--- a/src/data/mod.rs
+++ b/src/data/mod.rs
@@ -298,17 +298,17 @@ impl From<ImageExif> for ExifMetadata {
         },
         gps: if has_gps {
             Some(GpsCoordinates {
-                latitude: exif.gps_latitude,
-                longitude: exif.gps_longitude,
-                altitude: exif.gps_altitude,
+                latitude: exif.gps_latitude.map(|v| v as f64),
+                longitude: exif.gps_longitude.map(|v| v as f64),
+                altitude: exif.gps_altitude.map(|v| v as f64),
             })
         } else {
             None
         },
         capture_settings: if has_capture_settings {
             Some(CaptureSettings {
-                focal_length: exif.focal_length,
-                aperture: exif.aperture,
+                focal_length: exif.focal_length.map(|v| v as f64),
+                aperture: exif.aperture.map(|v| v as f64),
                 shutter_speed: exif.shutter_speed,
                 iso: exif.iso,
             })
diff --git a/src/database/calendar_dao.rs b/src/database/calendar_dao.rs
new file mode 100644
index 0000000..e1afefd
--- /dev/null
+++ b/src/database/calendar_dao.rs
@@ -0,0 +1,553 @@
+use diesel::prelude::*;
+use diesel::sqlite::SqliteConnection;
+use serde::Serialize;
+use std::ops::DerefMut;
+use std::sync::{Arc, Mutex};
+
+use crate::database::{DbError, DbErrorKind, connect};
+use crate::otel::trace_db_call;
+
+/// Represents a calendar event
+#[derive(Serialize, Clone, Debug)]
+pub struct CalendarEvent {
+    pub id: i32,
+    pub event_uid: Option<String>,
+    pub summary: String,
+    pub description: Option<String>,
+    pub location: Option<String>,
+    pub start_time: i64,
+    pub end_time: i64,
+    pub all_day: bool,
+    pub organizer: Option<String>,
+    pub attendees: Option<String>, // JSON string
+    pub created_at: i64,
+    pub source_file: Option<String>,
+}
+
+/// Data for inserting a new calendar event
+#[derive(Clone, Debug)]
+pub struct InsertCalendarEvent {
+    pub event_uid: Option<String>,
+    pub summary: String,
+    pub description: Option<String>,
+    pub location: Option<String>,
+    pub start_time: i64,
+    pub end_time: i64,
+    pub all_day: bool,
+    pub organizer: Option<String>,
+    pub attendees: Option<String>,
+    pub embedding: Option<Vec<f32>>, // 768-dim, optional
+    pub created_at: i64,
+    pub source_file: Option<String>,
+}
+
+pub trait CalendarEventDao: Sync + Send {
+    /// Store calendar event with optional embedding
+    fn store_event(
+        &mut self,
+        context: &opentelemetry::Context,
+        event: InsertCalendarEvent,
+    ) -> Result<CalendarEvent, DbError>;
+
+    /// Batch insert events (for import efficiency)
+    fn store_events_batch(
+        &mut self,
+        context: &opentelemetry::Context,
+        events: Vec<InsertCalendarEvent>,
+    ) -> Result<usize, DbError>;
+
+    /// Find events in time range (PRIMARY query method)
+    fn find_events_in_range(
+        &mut self,
+        context: &opentelemetry::Context,
+        start_ts: i64,
+        end_ts: i64,
+    ) -> Result<Vec<CalendarEvent>, DbError>;
+
+    /// Find semantically similar events (SECONDARY - requires embeddings)
+    fn find_similar_events(
+        &mut self,
+        context: &opentelemetry::Context,
+        query_embedding: &[f32],
+        limit: usize,
+    ) -> Result<Vec<CalendarEvent>, DbError>;
+
+    /// Hybrid: Time-filtered + semantic ranking
+    /// "Events during photo timestamp ±N days, ranked by similarity to context"
+    fn find_relevant_events_hybrid(
+        &mut self,
+        context: &opentelemetry::Context,
+        center_timestamp: i64,
+        time_window_days: i64,
+        query_embedding: Option<&[f32]>,
+        limit: usize,
+    ) -> Result<Vec<CalendarEvent>, DbError>;
+
+    /// Check if event exists (idempotency)
+    fn event_exists(
+        &mut self,
+        context: &opentelemetry::Context,
+        event_uid: &str,
+        start_time: i64,
+    ) -> Result<bool, DbError>;
+
+    /// Get count of events
+    fn get_event_count(&mut self, context: &opentelemetry::Context) -> Result<i64, DbError>;
+}
+
+pub struct SqliteCalendarEventDao {
+    connection: Arc<Mutex<SqliteConnection>>,
+}
+
+impl Default for SqliteCalendarEventDao {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl SqliteCalendarEventDao {
+    pub fn new() -> Self {
+        SqliteCalendarEventDao {
+            connection: Arc::new(Mutex::new(connect())),
+        }
+    }
+
+    fn serialize_vector(vec: &[f32]) -> Vec<u8> {
+        use zerocopy::IntoBytes;
+        vec.as_bytes().to_vec()
+    }
+
+    fn deserialize_vector(bytes: &[u8]) -> Result<Vec<f32>, DbError> {
+        if bytes.len() % 4 != 0 {
+            return Err(DbError::new(DbErrorKind::QueryError));
+        }
+
+        let count = bytes.len() / 4;
+        let mut vec = Vec::with_capacity(count);
+
+        for chunk in bytes.chunks_exact(4) {
+            let float = f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]);
+            vec.push(float);
+        }
+
+        Ok(vec)
+    }
+
+    fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
+        if a.len() != b.len() {
+            return 0.0;
+        }
+
+        let dot_product: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum();
+        let magnitude_a: f32 = a.iter().map(|x| x * x).sum::<f32>().sqrt();
+        let magnitude_b: f32 = b.iter().map(|x| x * x).sum::<f32>().sqrt();
+
+        if magnitude_a == 0.0 || magnitude_b == 0.0 {
+            return 0.0;
+        }
+
+        dot_product / (magnitude_a * magnitude_b)
+    }
+}
+
+#[derive(QueryableByName)]
+struct CalendarEventWithVectorRow {
+    #[diesel(sql_type = diesel::sql_types::Integer)]
+    id: i32,
+    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
+    event_uid: Option<String>,
+    #[diesel(sql_type = diesel::sql_types::Text)]
+    summary: String,
+    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
+    description: Option<String>,
+    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
+    location: Option<String>,
+    #[diesel(sql_type = diesel::sql_types::BigInt)]
+    start_time: i64,
+    #[diesel(sql_type = diesel::sql_types::BigInt)]
+    end_time: i64,
+    #[diesel(sql_type = diesel::sql_types::Bool)]
+    all_day: bool,
+    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
+    organizer: Option<String>,
+    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
+    attendees: Option<String>,
+    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Binary>)]
+    embedding: Option<Vec<u8>>,
+    #[diesel(sql_type = diesel::sql_types::BigInt)]
+    created_at: i64,
+    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
+    source_file: Option<String>,
+}
+
+impl CalendarEventWithVectorRow {
+    fn to_calendar_event(&self) -> CalendarEvent {
+        CalendarEvent {
+            id: self.id,
+            event_uid: self.event_uid.clone(),
+            summary: self.summary.clone(),
+            description: self.description.clone(),
+            location: self.location.clone(),
+            start_time: self.start_time,
+            end_time: self.end_time,
+            all_day: self.all_day,
+            organizer: self.organizer.clone(),
+            attendees: self.attendees.clone(),
+            created_at: self.created_at,
+            source_file: self.source_file.clone(),
+        }
+    }
+}
+
+#[derive(QueryableByName)]
+struct LastInsertRowId {
+    #[diesel(sql_type = diesel::sql_types::Integer)]
+    id: i32,
+}
+
+impl CalendarEventDao for SqliteCalendarEventDao {
+    fn store_event(
+        &mut self,
+        context: &opentelemetry::Context,
+        event: InsertCalendarEvent,
+    ) -> Result<CalendarEvent, DbError> {
+        trace_db_call(context, "insert", "store_event", |_span| {
+            let mut conn = self
+                .connection
+                .lock()
+                .expect("Unable to get CalendarEventDao");
+
+            // Validate embedding dimensions if provided
+            if let Some(ref emb) = event.embedding {
+                if emb.len() != 768 {
+                    return Err(anyhow::anyhow!(
+                        "Invalid embedding dimensions: {} (expected 768)",
+                        emb.len()
+                    ));
+                }
+            }
+
+            let embedding_bytes = event.embedding.as_ref().map(|e| Self::serialize_vector(e));
+
+            // INSERT OR REPLACE to handle re-imports
+            diesel::sql_query(
+                "INSERT OR REPLACE INTO calendar_events
+                 (event_uid, summary, description, location, start_time, end_time, all_day,
+                  organizer, attendees, embedding, created_at, source_file)
+                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
+            )
+            .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(&event.event_uid)
+            .bind::<diesel::sql_types::Text, _>(&event.summary)
+            .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(&event.description)
+            .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(&event.location)
+            .bind::<diesel::sql_types::BigInt, _>(event.start_time)
+            .bind::<diesel::sql_types::BigInt, _>(event.end_time)
+            .bind::<diesel::sql_types::Bool, _>(event.all_day)
+            .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(&event.organizer)
+            .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(&event.attendees)
+            .bind::<diesel::sql_types::Nullable<diesel::sql_types::Binary>, _>(&embedding_bytes)
+            .bind::<diesel::sql_types::BigInt, _>(event.created_at)
+            .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(&event.source_file)
+            .execute(conn.deref_mut())
+            .map_err(|e| anyhow::anyhow!("Insert error: {:?}", e))?;
+
+            let row_id: i32 = diesel::sql_query("SELECT last_insert_rowid() as id")
+                .get_result::<LastInsertRowId>(conn.deref_mut())
+                .map(|r| r.id)
+                .map_err(|e| anyhow::anyhow!("Failed to get last insert ID: {:?}", e))?;
+
+            Ok(CalendarEvent {
+                id: row_id,
+                event_uid: event.event_uid,
+                summary: event.summary,
+                description: event.description,
+                location: event.location,
+                start_time: event.start_time,
+                end_time: event.end_time,
+                all_day: event.all_day,
+                organizer: event.organizer,
+                attendees: event.attendees,
+                created_at: event.created_at,
+                source_file: event.source_file,
+            })
+        })
+        .map_err(|_| DbError::new(DbErrorKind::InsertError))
+    }
+
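+    // Usage sketch (hypothetical caller): for bulk imports, chunk the parsed
+    // events and write each chunk in one transaction rather than issuing one
+    // INSERT per event, e.g.:
+    //
+    //     for chunk in events.chunks(1000) {
+    //         dao.store_events_batch(&context, chunk.to_vec())?;
+    //     }
+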
?8, ?9, ?10, ?11, ?12)", + ) + .bind::, _>( + &event.event_uid, + ) + .bind::(&event.summary) + .bind::, _>( + &event.description, + ) + .bind::, _>( + &event.location, + ) + .bind::(event.start_time) + .bind::(event.end_time) + .bind::(event.all_day) + .bind::, _>( + &event.organizer, + ) + .bind::, _>( + &event.attendees, + ) + .bind::, _>( + &embedding_bytes, + ) + .bind::(event.created_at) + .bind::, _>( + &event.source_file, + ) + .execute(conn) + .map_err(|e| anyhow::anyhow!("Batch insert error: {:?}", e))?; + + inserted += 1; + } + Ok(()) + }) + .map_err(|e| anyhow::anyhow!("Transaction error: {:?}", e))?; + + Ok(inserted) + }) + .map_err(|_| DbError::new(DbErrorKind::InsertError)) + } + + fn find_events_in_range( + &mut self, + context: &opentelemetry::Context, + start_ts: i64, + end_ts: i64, + ) -> Result, DbError> { + trace_db_call(context, "query", "find_events_in_range", |_span| { + let mut conn = self.connection.lock().expect("Unable to get CalendarEventDao"); + + diesel::sql_query( + "SELECT id, event_uid, summary, description, location, start_time, end_time, all_day, + organizer, attendees, NULL as embedding, created_at, source_file + FROM calendar_events + WHERE start_time >= ?1 AND start_time <= ?2 + ORDER BY start_time ASC" + ) + .bind::(start_ts) + .bind::(end_ts) + .load::(conn.deref_mut()) + .map(|rows| rows.into_iter().map(|r| r.to_calendar_event()).collect()) + .map_err(|e| anyhow::anyhow!("Query error: {:?}", e)) + }) + .map_err(|_| DbError::new(DbErrorKind::QueryError)) + } + + fn find_similar_events( + &mut self, + context: &opentelemetry::Context, + query_embedding: &[f32], + limit: usize, + ) -> Result, DbError> { + trace_db_call(context, "query", "find_similar_events", |_span| { + let mut conn = self.connection.lock().expect("Unable to get CalendarEventDao"); + + if query_embedding.len() != 768 { + return Err(anyhow::anyhow!( + "Invalid query embedding dimensions: {} (expected 768)", + query_embedding.len() + )); + } + + // Load all events with embeddings + let results = diesel::sql_query( + "SELECT id, event_uid, summary, description, location, start_time, end_time, all_day, + organizer, attendees, embedding, created_at, source_file + FROM calendar_events + WHERE embedding IS NOT NULL" + ) + .load::(conn.deref_mut()) + .map_err(|e| anyhow::anyhow!("Query error: {:?}", e))?; + + // Compute similarities + let mut scored_events: Vec<(f32, CalendarEvent)> = results + .into_iter() + .filter_map(|row| { + if let Some(ref emb_bytes) = row.embedding { + if let Ok(emb) = Self::deserialize_vector(emb_bytes) { + let similarity = Self::cosine_similarity(query_embedding, &emb); + Some((similarity, row.to_calendar_event())) + } else { + None + } + } else { + None + } + }) + .collect(); + + // Sort by similarity descending + scored_events.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal)); + + log::info!("Found {} similar calendar events", scored_events.len()); + if !scored_events.is_empty() { + log::info!("Top similarity: {:.4}", scored_events[0].0); + } + + Ok(scored_events.into_iter().take(limit).map(|(_, event)| event).collect()) + }) + .map_err(|_| DbError::new(DbErrorKind::QueryError)) + } + + fn find_relevant_events_hybrid( + &mut self, + context: &opentelemetry::Context, + center_timestamp: i64, + time_window_days: i64, + query_embedding: Option<&[f32]>, + limit: usize, + ) -> Result, DbError> { + trace_db_call(context, "query", "find_relevant_events_hybrid", |_span| { + let window_seconds = time_window_days * 86400; + let start_ts = 
+    fn find_relevant_events_hybrid(
+        &mut self,
+        context: &opentelemetry::Context,
+        center_timestamp: i64,
+        time_window_days: i64,
+        query_embedding: Option<&[f32]>,
+        limit: usize,
+    ) -> Result<Vec<CalendarEvent>, DbError> {
+        trace_db_call(context, "query", "find_relevant_events_hybrid", |_span| {
+            let window_seconds = time_window_days * 86400;
+            let start_ts = center_timestamp - window_seconds;
+            let end_ts = center_timestamp + window_seconds;
+
+            let mut conn = self.connection.lock().expect("Unable to get CalendarEventDao");
+
+            // Step 1: Time-based filter (fast, indexed)
+            let events_in_range = diesel::sql_query(
+                "SELECT id, event_uid, summary, description, location, start_time, end_time, all_day,
+                        organizer, attendees, embedding, created_at, source_file
+                 FROM calendar_events
+                 WHERE start_time >= ?1 AND start_time <= ?2",
+            )
+            .bind::<diesel::sql_types::BigInt, _>(start_ts)
+            .bind::<diesel::sql_types::BigInt, _>(end_ts)
+            .load::<CalendarEventWithVectorRow>(conn.deref_mut())
+            .map_err(|e| anyhow::anyhow!("Query error: {:?}", e))?;
+
+            // Step 2: If query embedding provided, rank by semantic similarity
+            if let Some(query_emb) = query_embedding {
+                if query_emb.len() != 768 {
+                    return Err(anyhow::anyhow!(
+                        "Invalid query embedding dimensions: {} (expected 768)",
+                        query_emb.len()
+                    ));
+                }
+
+                let mut scored_events: Vec<(f32, CalendarEvent)> = events_in_range
+                    .into_iter()
+                    .map(|row| {
+                        // Events with embeddings get semantic scoring
+                        let similarity = if let Some(ref emb_bytes) = row.embedding {
+                            if let Ok(emb) = Self::deserialize_vector(emb_bytes) {
+                                Self::cosine_similarity(query_emb, &emb)
+                            } else {
+                                0.5 // Neutral score for deserialization errors
+                            }
+                        } else {
+                            0.5 // Neutral score for events without embeddings
+                        };
+                        (similarity, row.to_calendar_event())
+                    })
+                    .collect();
+
+                // Sort by similarity descending
+                scored_events
+                    .sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
+
+                log::info!(
+                    "Hybrid query: {} events in time range, ranked by similarity",
+                    scored_events.len()
+                );
+                if !scored_events.is_empty() {
+                    log::info!("Top similarity: {:.4}", scored_events[0].0);
+                }
+
+                Ok(scored_events
+                    .into_iter()
+                    .take(limit)
+                    .map(|(_, event)| event)
+                    .collect())
+            } else {
+                // No semantic ranking, just return time-sorted (limit applied)
+                log::info!("Time-only query: {} events in range", events_in_range.len());
+                Ok(events_in_range
+                    .into_iter()
+                    .take(limit)
+                    .map(|r| r.to_calendar_event())
+                    .collect())
+            }
+        })
+        .map_err(|_| DbError::new(DbErrorKind::QueryError))
+    }
+
+    fn event_exists(
+        &mut self,
+        context: &opentelemetry::Context,
+        event_uid: &str,
+        start_time: i64,
+    ) -> Result<bool, DbError> {
+        trace_db_call(context, "query", "event_exists", |_span| {
+            let mut conn = self.connection.lock().expect("Unable to get CalendarEventDao");
+
+            #[derive(QueryableByName)]
+            struct CountResult {
+                #[diesel(sql_type = diesel::sql_types::Integer)]
+                count: i32,
+            }
+
+            let result: CountResult = diesel::sql_query(
+                "SELECT COUNT(*) as count FROM calendar_events
+                 WHERE event_uid = ?1 AND start_time = ?2",
+            )
+            .bind::<diesel::sql_types::Text, _>(event_uid)
+            .bind::<diesel::sql_types::BigInt, _>(start_time)
+            .get_result(conn.deref_mut())
+            .map_err(|e| anyhow::anyhow!("Query error: {:?}", e))?;
+
+            Ok(result.count > 0)
+        })
+        .map_err(|_| DbError::new(DbErrorKind::QueryError))
+    }
+
+    fn get_event_count(&mut self, context: &opentelemetry::Context) -> Result<i64, DbError> {
+        trace_db_call(context, "query", "get_event_count", |_span| {
+            let mut conn = self
+                .connection
+                .lock()
+                .expect("Unable to get CalendarEventDao");
+
+            #[derive(QueryableByName)]
+            struct CountResult {
+                #[diesel(sql_type = diesel::sql_types::BigInt)]
+                count: i64,
+            }
+
+            let result: CountResult =
+                diesel::sql_query("SELECT COUNT(*) as count FROM calendar_events")
+                    .get_result(conn.deref_mut())
+                    .map_err(|e| anyhow::anyhow!("Query error: {:?}", e))?;
+
+            Ok(result.count)
+        })
+        .map_err(|_| DbError::new(DbErrorKind::QueryError))
+    }
+}
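+
+#[cfg(test)]
+mod vector_tests {
+    use super::*;
+
+    // Illustrative check of the embedding BLOB layout: vectors are stored as
+    // f32 bytes read back with from_le_bytes, so on little-endian targets a
+    // serialize/deserialize round trip is lossless, and a vector is maximally
+    // cosine-similar to itself.
+    #[test]
+    fn embedding_round_trip_and_self_similarity() {
+        let v = vec![0.25_f32, -1.5, 3.0];
+        let bytes = SqliteCalendarEventDao::serialize_vector(&v);
+        assert_eq!(bytes.len(), v.len() * 4);
+        let back = SqliteCalendarEventDao::deserialize_vector(&bytes).ok().unwrap();
+        assert_eq!(back, v);
+        let sim = SqliteCalendarEventDao::cosine_similarity(&v, &back);
+        assert!((sim - 1.0).abs() < 1e-6);
+    }
+}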
diff --git a/src/database/location_dao.rs b/src/database/location_dao.rs
new file mode 100644
index 0000000..86b8efe
--- /dev/null
+++ b/src/database/location_dao.rs
@@ -0,0 +1,528 @@
+use diesel::prelude::*;
+use diesel::sqlite::SqliteConnection;
+use serde::Serialize;
+use std::ops::DerefMut;
+use std::sync::{Arc, Mutex};
+
+use crate::database::{DbError, DbErrorKind, connect};
+use crate::otel::trace_db_call;
+
+/// Represents a location history record
+#[derive(Serialize, Clone, Debug)]
+pub struct LocationRecord {
+    pub id: i32,
+    pub timestamp: i64,
+    pub latitude: f64,
+    pub longitude: f64,
+    pub accuracy: Option<i32>,
+    pub activity: Option<String>,
+    pub activity_confidence: Option<i32>,
+    pub place_name: Option<String>,
+    pub place_category: Option<String>,
+    pub created_at: i64,
+    pub source_file: Option<String>,
+}
+
+/// Data for inserting a new location record
+#[derive(Clone, Debug)]
+pub struct InsertLocationRecord {
+    pub timestamp: i64,
+    pub latitude: f64,
+    pub longitude: f64,
+    pub accuracy: Option<i32>,
+    pub activity: Option<String>,
+    pub activity_confidence: Option<i32>,
+    pub place_name: Option<String>,
+    pub place_category: Option<String>,
+    pub embedding: Option<Vec<f32>>, // 768-dim, optional (rarely used)
+    pub created_at: i64,
+    pub source_file: Option<String>,
+}
+
+pub trait LocationHistoryDao: Sync + Send {
+    /// Store single location record
+    fn store_location(
+        &mut self,
+        context: &opentelemetry::Context,
+        location: InsertLocationRecord,
+    ) -> Result<LocationRecord, DbError>;
+
+    /// Batch insert locations (Google Takeout has millions of points)
+    fn store_locations_batch(
+        &mut self,
+        context: &opentelemetry::Context,
+        locations: Vec<InsertLocationRecord>,
+    ) -> Result<usize, DbError>;
+
+    /// Find nearest location to timestamp (PRIMARY query)
+    /// "Where was I at photo timestamp ±N minutes?"
+    fn find_nearest_location(
+        &mut self,
+        context: &opentelemetry::Context,
+        timestamp: i64,
+        max_time_diff_seconds: i64,
+    ) -> Result<Option<LocationRecord>, DbError>;
+
+    /// Find locations in time range
+    fn find_locations_in_range(
+        &mut self,
+        context: &opentelemetry::Context,
+        start_ts: i64,
+        end_ts: i64,
+    ) -> Result<Vec<LocationRecord>, DbError>;
+
+    /// Find locations near GPS coordinates (for "photos near this place")
+    /// Uses approximate bounding box for performance
+    fn find_locations_near_point(
+        &mut self,
+        context: &opentelemetry::Context,
+        latitude: f64,
+        longitude: f64,
+        radius_km: f64,
+    ) -> Result<Vec<LocationRecord>, DbError>;
+
+    /// Deduplicate: check if location exists
+    fn location_exists(
+        &mut self,
+        context: &opentelemetry::Context,
+        timestamp: i64,
+        latitude: f64,
+        longitude: f64,
+    ) -> Result<bool, DbError>;
+
+    /// Get count of location records
+    fn get_location_count(&mut self, context: &opentelemetry::Context) -> Result<i64, DbError>;
+}
+
+pub struct SqliteLocationHistoryDao {
+    connection: Arc<Mutex<SqliteConnection>>,
+}
+
+impl Default for SqliteLocationHistoryDao {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl SqliteLocationHistoryDao {
+    pub fn new() -> Self {
+        SqliteLocationHistoryDao {
+            connection: Arc::new(Mutex::new(connect())),
+        }
+    }
+
+    fn serialize_vector(vec: &[f32]) -> Vec<u8> {
+        use zerocopy::IntoBytes;
+        vec.as_bytes().to_vec()
+    }
+
+    /// Haversine distance calculation (in kilometers)
+    /// Used for filtering locations by proximity to a point
+    fn haversine_distance(lat1: f64, lon1: f64, lat2: f64, lon2: f64) -> f64 {
+        const R: f64 = 6371.0; // Earth radius in km
+
+        let d_lat = (lat2 - lat1).to_radians();
+        let d_lon = (lon2 - lon1).to_radians();
+
+        let a = (d_lat / 2.0).sin().powi(2)
+            + lat1.to_radians().cos() * lat2.to_radians().cos() * (d_lon / 2.0).sin().powi(2);
+
+        let c = 2.0 * a.sqrt().atan2((1.0 - a).sqrt());
+
+        R * c
+    }
+
+    /// Calculate approximate bounding box for spatial queries
+    /// Returns (min_lat, max_lat, min_lon, max_lon)
+    fn bounding_box(lat: f64, lon: f64, radius_km: f64) -> (f64, f64, f64, f64) {
+        const KM_PER_DEGREE_LAT: f64 = 111.0;
+        let km_per_degree_lon = 111.0 * lat.to_radians().cos();
+
+        let delta_lat = radius_km / KM_PER_DEGREE_LAT;
+        let delta_lon = radius_km / km_per_degree_lon;
+
+        (
+            lat - delta_lat, // min_lat
+            lat + delta_lat, // max_lat
+            lon - delta_lon, // min_lon
+            lon + delta_lon, // max_lon
+        )
+    }
+}
+
+#[derive(QueryableByName)]
+struct LocationRecordRow {
+    #[diesel(sql_type = diesel::sql_types::Integer)]
+    id: i32,
+    #[diesel(sql_type = diesel::sql_types::BigInt)]
+    timestamp: i64,
+    #[diesel(sql_type = diesel::sql_types::Float)]
+    latitude: f32,
+    #[diesel(sql_type = diesel::sql_types::Float)]
+    longitude: f32,
+    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Integer>)]
+    accuracy: Option<i32>,
+    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
+    activity: Option<String>,
+    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Integer>)]
+    activity_confidence: Option<i32>,
+    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
+    place_name: Option<String>,
+    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
+    place_category: Option<String>,
+    #[diesel(sql_type = diesel::sql_types::BigInt)]
+    created_at: i64,
+    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
+    source_file: Option<String>,
+}
+
+impl LocationRecordRow {
+    fn to_location_record(&self) -> LocationRecord {
+        LocationRecord {
+            id: self.id,
+            timestamp: self.timestamp,
+            latitude: self.latitude as f64,
+            longitude: self.longitude as f64,
+            accuracy: self.accuracy,
+            activity: self.activity.clone(),
+            activity_confidence: self.activity_confidence,
+            place_name: self.place_name.clone(),
+            place_category: self.place_category.clone(),
+            created_at: self.created_at,
+            source_file: self.source_file.clone(),
+        }
+    }
+}
+
+#[derive(QueryableByName)]
+struct LastInsertRowId {
+    #[diesel(sql_type = diesel::sql_types::Integer)]
+    id: i32,
+}
+
+impl LocationHistoryDao for SqliteLocationHistoryDao {
+    fn store_location(
+        &mut self,
+        context: &opentelemetry::Context,
+        location: InsertLocationRecord,
+    ) -> Result<LocationRecord, DbError> {
+        trace_db_call(context, "insert", "store_location", |_span| {
+            let mut conn = self
+                .connection
+                .lock()
+                .expect("Unable to get LocationHistoryDao");
+
+            // Validate embedding dimensions if provided (rare for location data)
+            if let Some(ref emb) = location.embedding {
+                if emb.len() != 768 {
+                    return Err(anyhow::anyhow!(
+                        "Invalid embedding dimensions: {} (expected 768)",
+                        emb.len()
+                    ));
+                }
+            }
+
+            let embedding_bytes = location
+                .embedding
+                .as_ref()
+                .map(|e| Self::serialize_vector(e));
+
+            // INSERT OR IGNORE to handle re-imports (UNIQUE constraint on timestamp+lat+lon)
+            diesel::sql_query(
+                "INSERT OR IGNORE INTO location_history
+                 (timestamp, latitude, longitude, accuracy, activity, activity_confidence,
+                  place_name, place_category, embedding, created_at, source_file)
+                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
+            )
+            .bind::<diesel::sql_types::BigInt, _>(location.timestamp)
+            .bind::<diesel::sql_types::Float, _>(location.latitude as f32)
+            .bind::<diesel::sql_types::Float, _>(location.longitude as f32)
+            .bind::<diesel::sql_types::Nullable<diesel::sql_types::Integer>, _>(&location.accuracy)
+            .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(&location.activity)
+            .bind::<diesel::sql_types::Nullable<diesel::sql_types::Integer>, _>(
+                &location.activity_confidence,
+            )
+            .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(&location.place_name)
+            .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(
+                &location.place_category,
+            )
+            .bind::<diesel::sql_types::Nullable<diesel::sql_types::Binary>, _>(&embedding_bytes)
+            .bind::<diesel::sql_types::BigInt, _>(location.created_at)
+            .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(&location.source_file)
+            .execute(conn.deref_mut())
+            .map_err(|e| anyhow::anyhow!("Insert error: {:?}", e))?;
+
+            let row_id: i32 = diesel::sql_query("SELECT last_insert_rowid() as id")
+                .get_result::<LastInsertRowId>(conn.deref_mut())
+                .map(|r| r.id)
+                .map_err(|e| anyhow::anyhow!("Failed to get last insert ID: {:?}", e))?;
+
+            Ok(LocationRecord {
+                id: row_id,
+                timestamp: location.timestamp,
+                latitude: location.latitude,
+                longitude: location.longitude,
+                accuracy: location.accuracy,
+                activity: location.activity,
+                activity_confidence: location.activity_confidence,
+                place_name: location.place_name,
+                place_category: location.place_category,
+                created_at: location.created_at,
+                source_file: location.source_file,
+            })
+        })
+        .map_err(|_| DbError::new(DbErrorKind::InsertError))
+    }
+
+    fn store_locations_batch(
+        &mut self,
+        context: &opentelemetry::Context,
+        locations: Vec<InsertLocationRecord>,
+    ) -> Result<usize, DbError> {
+        trace_db_call(context, "insert", "store_locations_batch", |_span| {
+            let mut conn = self
+                .connection
+                .lock()
+                .expect("Unable to get LocationHistoryDao");
+            let mut inserted = 0;
+
+            conn.transaction::<_, anyhow::Error, _>(|conn| {
+                for location in locations {
+                    // Validate embedding if provided (rare)
+                    if let Some(ref emb) = location.embedding {
+                        if emb.len() != 768 {
+                            log::warn!(
+                                "Skipping location with invalid embedding dimensions: {}",
+                                emb.len()
+                            );
+                            continue;
+                        }
+                    }
+
+                    let embedding_bytes = location
+                        .embedding
+                        .as_ref()
+                        .map(|e| Self::serialize_vector(e));
+
+                    let rows_affected = diesel::sql_query(
+                        "INSERT OR IGNORE INTO location_history
+                         (timestamp, latitude, longitude, accuracy, activity, activity_confidence,
+                          place_name, place_category, embedding, created_at, source_file)
+                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
+                    )
+                    .bind::<diesel::sql_types::BigInt, _>(location.timestamp)
+                    .bind::<diesel::sql_types::Float, _>(location.latitude as f32)
+                    .bind::<diesel::sql_types::Float, _>(location.longitude as f32)
+                    .bind::<diesel::sql_types::Nullable<diesel::sql_types::Integer>, _>(
+                        &location.accuracy,
+                    )
+                    .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(
+                        &location.activity,
+                    )
+                    .bind::<diesel::sql_types::Nullable<diesel::sql_types::Integer>, _>(
+                        &location.activity_confidence,
+                    )
+                    .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(
+                        &location.place_name,
+                    )
+                    .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(
+                        &location.place_category,
+                    )
+                    .bind::<diesel::sql_types::Nullable<diesel::sql_types::Binary>, _>(
+                        &embedding_bytes,
+                    )
+                    .bind::<diesel::sql_types::BigInt, _>(location.created_at)
+                    .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(
+                        &location.source_file,
+                    )
+                    .execute(conn)
+                    .map_err(|e| anyhow::anyhow!("Batch insert error: {:?}", e))?;
+
+                    if rows_affected > 0 {
+                        inserted += 1;
+                    }
+                }
+                Ok(())
+            })
+            .map_err(|e| anyhow::anyhow!("Transaction error: {:?}", e))?;
+
+            Ok(inserted)
+        })
+        .map_err(|_| DbError::new(DbErrorKind::InsertError))
+    }
+
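+    // Usage sketch (hypothetical caller; photo_timestamp stands in for a
+    // caller-supplied value): answer "where was I when this photo was taken?"
+    // by fetching the GPS fix closest to the photo timestamp, tolerating up
+    // to 30 minutes of drift:
+    //
+    //     let fix = dao.find_nearest_location(&context, photo_timestamp, 30 * 60)?;
+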
+    fn find_locations_in_range(
+        &mut self,
+        context: &opentelemetry::Context,
+        start_ts: i64,
+        end_ts: i64,
+    ) -> Result<Vec<LocationRecord>, DbError> {
+        trace_db_call(context, "query", "find_locations_in_range", |_span| {
+            let mut conn = self
+                .connection
+                .lock()
+                .expect("Unable to get LocationHistoryDao");
+
+            diesel::sql_query(
+                "SELECT id, timestamp, latitude, longitude, accuracy, activity, activity_confidence,
+                        place_name, place_category, created_at, source_file
+                 FROM location_history
+                 WHERE timestamp >= ?1 AND timestamp <= ?2
+                 ORDER BY timestamp ASC",
+            )
+            .bind::<diesel::sql_types::BigInt, _>(start_ts)
+            .bind::<diesel::sql_types::BigInt, _>(end_ts)
+            .load::<LocationRecordRow>(conn.deref_mut())
+            .map(|rows| rows.into_iter().map(|r| r.to_location_record()).collect())
+            .map_err(|e| anyhow::anyhow!("Query error: {:?}", e))
+        })
+        .map_err(|_| DbError::new(DbErrorKind::QueryError))
+    }
+
+    fn find_locations_near_point(
+        &mut self,
+        context: &opentelemetry::Context,
+        latitude: f64,
+        longitude: f64,
+        radius_km: f64,
+    ) -> Result<Vec<LocationRecord>, DbError> {
+        trace_db_call(context, "query", "find_locations_near_point", |_span| {
+            let mut conn = self
+                .connection
+                .lock()
+                .expect("Unable to get LocationHistoryDao");
+
+            // Use bounding box for initial filter (fast, indexed)
+            let (min_lat, max_lat, min_lon, max_lon) =
+                Self::bounding_box(latitude, longitude, radius_km);
+
+            let results = diesel::sql_query(
+                "SELECT id, timestamp, latitude, longitude, accuracy, activity, activity_confidence,
+                        place_name, place_category, created_at, source_file
+                 FROM location_history
+                 WHERE latitude >= ?1 AND latitude <= ?2
+                   AND longitude >= ?3 AND longitude <= ?4",
+            )
+            .bind::<diesel::sql_types::Float, _>(min_lat as f32)
+            .bind::<diesel::sql_types::Float, _>(max_lat as f32)
+            .bind::<diesel::sql_types::Float, _>(min_lon as f32)
+            .bind::<diesel::sql_types::Float, _>(max_lon as f32)
+            .load::<LocationRecordRow>(conn.deref_mut())
+            .map_err(|e| anyhow::anyhow!("Query error: {:?}", e))?;
+
+            // Refine with Haversine distance (in-memory, post-filter)
+            let filtered: Vec<LocationRecord> = results
+                .into_iter()
+                .map(|r| r.to_location_record())
+                .filter(|loc| {
+                    let distance =
+                        Self::haversine_distance(latitude, longitude, loc.latitude, loc.longitude);
+                    distance <= radius_km
+                })
+                .collect();
+
+            log::info!(
+                "Found {} locations within {} km of ({}, {})",
+                filtered.len(),
+                radius_km,
+                latitude,
+                longitude
+            );
+
+            Ok(filtered)
+        })
+        .map_err(|_| DbError::new(DbErrorKind::QueryError))
+    }
+
+    fn location_exists(
+        &mut self,
+        context: &opentelemetry::Context,
+        timestamp: i64,
+        latitude: f64,
+        longitude: f64,
+    ) -> Result<bool, DbError> {
+        trace_db_call(context, "query", "location_exists", |_span| {
+            let mut conn = self
+                .connection
+                .lock()
+                .expect("Unable to get LocationHistoryDao");
+
+            #[derive(QueryableByName)]
+            struct CountResult {
+                #[diesel(sql_type = diesel::sql_types::Integer)]
+                count: i32,
+            }
+
+            let result: CountResult = diesel::sql_query(
+                "SELECT COUNT(*) as count FROM location_history
+                 WHERE timestamp = ?1 AND latitude = ?2 AND longitude = ?3",
+            )
+            .bind::<diesel::sql_types::BigInt, _>(timestamp)
+            .bind::<diesel::sql_types::Float, _>(latitude as f32)
+            .bind::<diesel::sql_types::Float, _>(longitude as f32)
+            .get_result(conn.deref_mut())
+            .map_err(|e| anyhow::anyhow!("Query error: {:?}", e))?;
+
+            Ok(result.count > 0)
+        })
+        .map_err(|_| DbError::new(DbErrorKind::QueryError))
+    }
+
+    fn get_location_count(&mut self, context: &opentelemetry::Context) -> Result<i64, DbError> {
+        trace_db_call(context, "query", "get_location_count", |_span| {
+            let mut conn = self
+                .connection
+                .lock()
+                .expect("Unable to get LocationHistoryDao");
+
+            #[derive(QueryableByName)]
+            struct CountResult {
+                #[diesel(sql_type = diesel::sql_types::BigInt)]
+                count: i64,
+            }
+
+            let result: CountResult =
+                diesel::sql_query("SELECT COUNT(*) as count FROM location_history")
+                    .get_result(conn.deref_mut())
+                    .map_err(|e| anyhow::anyhow!("Query error: {:?}", e))?;
+
+            Ok(result.count)
+        })
+        .map_err(|_| DbError::new(DbErrorKind::QueryError))
+    }
+}
diff --git a/src/database/mod.rs b/src/database/mod.rs
index d4f1b4e..43d078c 100644
--- a/src/database/mod.rs
+++ b/src/database/mod.rs
@@ -9,15 +9,25 @@ use crate::database::models::{
 };
 use crate::otel::trace_db_call;
 
+pub mod calendar_dao;
 pub mod daily_summary_dao;
 pub mod embeddings_dao;
 pub mod insights_dao;
+pub mod location_dao;
 pub mod models;
 pub mod schema;
+pub mod search_dao;
 
+pub use calendar_dao::{
+    CalendarEvent, CalendarEventDao, InsertCalendarEvent, SqliteCalendarEventDao,
+};
 pub use daily_summary_dao::{DailySummaryDao, InsertDailySummary, SqliteDailySummaryDao};
 pub use embeddings_dao::{EmbeddingDao, InsertMessageEmbedding};
 pub use insights_dao::{InsightDao, SqliteInsightDao};
+pub use location_dao::{
+    InsertLocationRecord, LocationHistoryDao, LocationRecord, SqliteLocationHistoryDao,
+};
+pub use search_dao::{InsertSearchRecord, SearchHistoryDao, SearchRecord, SqliteSearchHistoryDao};
 
 pub trait UserDao {
     fn create_user(&mut self, user: &str, password: &str) -> Option<i32>;
@@ -485,8 +495,8 @@ impl ExifDao for SqliteExifDao {
         // GPS bounding box
         if let Some((min_lat, max_lat, min_lon, max_lon)) = gps_bounds {
             query = query
-                .filter(gps_latitude.between(min_lat, max_lat))
-                .filter(gps_longitude.between(min_lon, max_lon))
+                .filter(gps_latitude.between(min_lat as f32, max_lat as f32))
+                .filter(gps_longitude.between(min_lon as f32, max_lon as f32))
                 .filter(gps_latitude.is_not_null())
                 .filter(gps_longitude.is_not_null());
         }
diff --git a/src/database/models.rs b/src/database/models.rs
index 9cee59b..79207a0 100644
--- a/src/database/models.rs
+++ b/src/database/models.rs
@@ -40,11 +40,11 @@ pub struct InsertImageExif {
     pub width: Option<i32>,
     pub height: Option<i32>,
     pub orientation: Option<i32>,
-    pub gps_latitude: Option<f64>,
-    pub gps_longitude: Option<f64>,
-    pub gps_altitude: Option<f64>,
-    pub focal_length: Option<f64>,
-    pub aperture: Option<f64>,
+    pub gps_latitude: Option<f32>,
+    pub gps_longitude: Option<f32>,
+    pub gps_altitude: Option<f32>,
+    pub focal_length: Option<f32>,
+    pub aperture: Option<f32>,
     pub shutter_speed: Option<String>,
     pub iso: Option<i32>,
     pub date_taken: Option<String>,
@@ -62,11 +62,11 @@ pub struct ImageExif {
     pub width: Option<i32>,
     pub height: Option<i32>,
     pub orientation: Option<i32>,
-    pub gps_latitude: Option<f64>,
-    pub gps_longitude: Option<f64>,
-    pub gps_altitude: Option<f64>,
-    pub focal_length: Option<f64>,
-    pub aperture: Option<f64>,
+    pub gps_latitude: Option<f32>,
+    pub gps_longitude: Option<f32>,
+    pub gps_altitude: Option<f32>,
+    pub focal_length: Option<f32>,
+    pub aperture: Option<f32>,
     pub shutter_speed: Option<String>,
     pub iso: Option<i32>,
     pub date_taken: Option<String>,
diff --git a/src/database/schema.rs b/src/database/schema.rs
index aa9a93e..75fe641 100644
--- a/src/database/schema.rs
+++ b/src/database/schema.rs
@@ -1,4 +1,37 @@
-table! {
+// @generated automatically by Diesel CLI.
+
+diesel::table! {
+    calendar_events (id) {
+        id -> Integer,
+        event_uid -> Nullable<Text>,
+        summary -> Text,
+        description -> Nullable<Text>,
+        location -> Nullable<Text>,
+        start_time -> BigInt,
+        end_time -> BigInt,
+        all_day -> Bool,
+        organizer -> Nullable<Text>,
+        attendees -> Nullable<Text>,
+        embedding -> Nullable<Binary>,
+        created_at -> BigInt,
+        source_file -> Nullable<Text>,
+    }
+}
+
+diesel::table! {
+    daily_conversation_summaries (id) {
+        id -> Integer,
+        date -> Text,
+        contact -> Text,
+        summary -> Text,
+        message_count -> Integer,
+        embedding -> Binary,
+        created_at -> BigInt,
+        model_version -> Text,
+    }
+}
+
+diesel::table! {
     favorites (id) {
         id -> Integer,
         userid -> Integer,
@@ -6,7 +39,7 @@
     }
 }
 
-table! {
+diesel::table! {
     image_exif (id) {
         id -> Integer,
         file_path -> Text,
@@ -16,11 +49,11 @@
         width -> Nullable<Integer>,
         height -> Nullable<Integer>,
         orientation -> Nullable<Integer>,
-        gps_latitude -> Nullable<Double>,
-        gps_longitude -> Nullable<Double>,
-        gps_altitude -> Nullable<Double>,
-        focal_length -> Nullable<Double>,
-        aperture -> Nullable<Double>,
+        gps_latitude -> Nullable<Float>,
+        gps_longitude -> Nullable<Float>,
+        gps_altitude -> Nullable<Float>,
+        focal_length -> Nullable<Float>,
+        aperture -> Nullable<Float>,
         shutter_speed -> Nullable<Text>,
         iso -> Nullable<Integer>,
         date_taken -> Nullable<Text>,
@@ -29,24 +62,49 @@
     }
 }
 
-table! {
-    tagged_photo (id) {
+diesel::table! {
+    knowledge_embeddings (id) {
         id -> Integer,
-        photo_name -> Text,
-        tag_id -> Integer,
-        created_time -> BigInt,
+        keyword -> Text,
+        description -> Text,
+        category -> Nullable<Text>,
+        embedding -> Binary,
+        created_at -> BigInt,
+        model_version -> Text,
     }
 }
 
-table! {
-    tags (id) {
+diesel::table! {
+    location_history (id) {
         id -> Integer,
-        name -> Text,
-        created_time -> BigInt,
+        timestamp -> BigInt,
+        latitude -> Float,
+        longitude -> Float,
+        accuracy -> Nullable<Integer>,
+        activity -> Nullable<Text>,
+        activity_confidence -> Nullable<Integer>,
+        place_name -> Nullable<Text>,
+        place_category -> Nullable<Text>,
+        embedding -> Nullable<Binary>,
+        created_at -> BigInt,
+        source_file -> Nullable<Text>,
     }
 }
 
-table! {
+diesel::table! {
+    message_embeddings (id) {
+        id -> Integer,
+        contact -> Text,
+        body -> Text,
+        timestamp -> BigInt,
+        is_sent -> Bool,
+        embedding -> Binary,
+        created_at -> BigInt,
+        model_version -> Text,
+    }
+}
+
+diesel::table! {
     photo_insights (id) {
         id -> Integer,
         file_path -> Text,
@@ -57,7 +115,36 @@
     }
 }
 
-table! {
+diesel::table! {
+    search_history (id) {
+        id -> Integer,
+        timestamp -> BigInt,
+        query -> Text,
+        search_engine -> Nullable<Text>,
+        embedding -> Binary,
+        created_at -> BigInt,
+        source_file -> Nullable<Text>,
+    }
+}
+
+diesel::table! {
+    tagged_photo (id) {
+        id -> Integer,
+        photo_name -> Text,
+        tag_id -> Integer,
+        created_time -> BigInt,
+    }
+}
+
+diesel::table! {
+    tags (id) {
+        id -> Integer,
+        name -> Text,
+        created_time -> BigInt,
+    }
+}
+
+diesel::table! {
     users (id) {
         id -> Integer,
         username -> Text,
@@ -65,12 +152,18 @@ table! {
     }
 }
 
-joinable!(tagged_photo -> tags (tag_id));
+diesel::joinable!(tagged_photo -> tags (tag_id));
 
-allow_tables_to_appear_in_same_query!(
+diesel::allow_tables_to_appear_in_same_query!(
+    calendar_events,
+    daily_conversation_summaries,
     favorites,
     image_exif,
+    knowledge_embeddings,
+    location_history,
+    message_embeddings,
     photo_insights,
+    search_history,
     tagged_photo,
     tags,
     users,
diff --git a/src/database/search_dao.rs b/src/database/search_dao.rs
new file mode 100644
index 0000000..9ae9ef7
--- /dev/null
+++ b/src/database/search_dao.rs
@@ -0,0 +1,516 @@
+use diesel::prelude::*;
+use diesel::sqlite::SqliteConnection;
+use serde::Serialize;
+use std::ops::DerefMut;
+use std::sync::{Arc, Mutex};
+
+use crate::database::{DbError, DbErrorKind, connect};
+use crate::otel::trace_db_call;
+
+/// Represents a search history record
+#[derive(Serialize, Clone, Debug)]
+pub struct SearchRecord {
+    pub id: i32,
+    pub timestamp: i64,
+    pub query: String,
+    pub search_engine: Option<String>,
+    pub created_at: i64,
+    pub source_file: Option<String>,
+}
+
+/// Data for inserting a new search record
+#[derive(Clone, Debug)]
+pub struct InsertSearchRecord {
+    pub timestamp: i64,
+    pub query: String,
+    pub search_engine: Option<String>,
+    pub embedding: Vec<f32>, // 768-dim, REQUIRED
+    pub created_at: i64,
+    pub source_file: Option<String>,
+}
+
+pub trait SearchHistoryDao: Sync + Send {
+    /// Store search with embedding
+    fn store_search(
+        &mut self,
+        context: &opentelemetry::Context,
+        search: InsertSearchRecord,
+    ) -> Result<SearchRecord, DbError>;
+
+    /// Batch insert searches
+    fn store_searches_batch(
+        &mut self,
+        context: &opentelemetry::Context,
+        searches: Vec<InsertSearchRecord>,
+    ) -> Result<usize, DbError>;
+
+    /// Find searches in time range (for temporal context)
+    fn find_searches_in_range(
+        &mut self,
+        context: &opentelemetry::Context,
+        start_ts: i64,
+        end_ts: i64,
+    ) -> Result<Vec<SearchRecord>, DbError>;
+
+    /// Find semantically similar searches (PRIMARY - embeddings shine here)
+    fn find_similar_searches(
+        &mut self,
+        context: &opentelemetry::Context,
+        query_embedding: &[f32],
+        limit: usize,
+    ) -> Result<Vec<SearchRecord>, DbError>;
+
+    /// Hybrid: Time window + semantic ranking
+    fn find_relevant_searches_hybrid(
+        &mut self,
+        context: &opentelemetry::Context,
+        center_timestamp: i64,
+        time_window_days: i64,
+        query_embedding: Option<&[f32]>,
+        limit: usize,
+    ) -> Result<Vec<SearchRecord>, DbError>;
+
+    /// Deduplication check
+    fn search_exists(
+        &mut self,
+        context: &opentelemetry::Context,
+        timestamp: i64,
+        query: &str,
+    ) -> Result<bool, DbError>;
+
+    /// Get count of search records
+    fn get_search_count(&mut self, context: &opentelemetry::Context) -> Result<i64, DbError>;
+}
+
+pub struct SqliteSearchHistoryDao {
+    connection: Arc<Mutex<SqliteConnection>>,
+}
+
+impl Default for SqliteSearchHistoryDao {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl SqliteSearchHistoryDao {
+    pub fn new() -> Self {
+        SqliteSearchHistoryDao {
+            connection: Arc::new(Mutex::new(connect())),
+        }
+    }
+
+    fn serialize_vector(vec: &[f32]) -> Vec<u8> {
+        use zerocopy::IntoBytes;
+        vec.as_bytes().to_vec()
+    }
+
+    fn deserialize_vector(bytes: &[u8]) -> Result<Vec<f32>, DbError> {
+        if bytes.len() % 4 != 0 {
+            return Err(DbError::new(DbErrorKind::QueryError));
+        }
+
+        let count = bytes.len() / 4;
+        let mut vec = Vec::with_capacity(count);
+
+        for chunk in bytes.chunks_exact(4) {
+            let float = f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]);
+            vec.push(float);
+        }
+
+        Ok(vec)
+    }
+
+    fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
+        if a.len() != b.len() {
+            return 0.0;
+        }
+
+        let dot_product: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum();
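+        // Cosine similarity = dot(a, b) / (|a| * |b|); the zero-magnitude guard
+        // below avoids dividing by zero for degenerate vectors.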
+        let magnitude_a: f32 = a.iter().map(|x| x * x).sum::<f32>().sqrt();
+        let magnitude_b: f32 = b.iter().map(|x| x * x).sum::<f32>().sqrt();
+
+        if magnitude_a == 0.0 || magnitude_b == 0.0 {
+            return 0.0;
+        }
+
+        dot_product / (magnitude_a * magnitude_b)
+    }
+}
+
+#[derive(QueryableByName)]
+struct SearchRecordWithVectorRow {
+    #[diesel(sql_type = diesel::sql_types::Integer)]
+    id: i32,
+    #[diesel(sql_type = diesel::sql_types::BigInt)]
+    timestamp: i64,
+    #[diesel(sql_type = diesel::sql_types::Text)]
+    query: String,
+    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
+    search_engine: Option<String>,
+    #[diesel(sql_type = diesel::sql_types::Binary)]
+    embedding: Vec<u8>,
+    #[diesel(sql_type = diesel::sql_types::BigInt)]
+    created_at: i64,
+    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
+    source_file: Option<String>,
+}
+
+impl SearchRecordWithVectorRow {
+    fn to_search_record(&self) -> SearchRecord {
+        SearchRecord {
+            id: self.id,
+            timestamp: self.timestamp,
+            query: self.query.clone(),
+            search_engine: self.search_engine.clone(),
+            created_at: self.created_at,
+            source_file: self.source_file.clone(),
+        }
+    }
+}
+
+#[derive(QueryableByName)]
+struct LastInsertRowId {
+    #[diesel(sql_type = diesel::sql_types::Integer)]
+    id: i32,
+}
+
+impl SearchHistoryDao for SqliteSearchHistoryDao {
+    fn store_search(
+        &mut self,
+        context: &opentelemetry::Context,
+        search: InsertSearchRecord,
+    ) -> Result<SearchRecord, DbError> {
+        trace_db_call(context, "insert", "store_search", |_span| {
+            let mut conn = self
+                .connection
+                .lock()
+                .expect("Unable to get SearchHistoryDao");
+
+            // Validate embedding dimensions (REQUIRED for searches)
+            if search.embedding.len() != 768 {
+                return Err(anyhow::anyhow!(
+                    "Invalid embedding dimensions: {} (expected 768)",
+                    search.embedding.len()
+                ));
+            }
+
+            let embedding_bytes = Self::serialize_vector(&search.embedding);
+
+            // INSERT OR IGNORE to handle re-imports (UNIQUE constraint on timestamp+query)
+            diesel::sql_query(
+                "INSERT OR IGNORE INTO search_history
+                 (timestamp, query, search_engine, embedding, created_at, source_file)
+                 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
+            )
+            .bind::<diesel::sql_types::BigInt, _>(search.timestamp)
+            .bind::<diesel::sql_types::Text, _>(&search.query)
+            .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(&search.search_engine)
+            .bind::<diesel::sql_types::Binary, _>(&embedding_bytes)
+            .bind::<diesel::sql_types::BigInt, _>(search.created_at)
+            .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(&search.source_file)
+            .execute(conn.deref_mut())
+            .map_err(|e| anyhow::anyhow!("Insert error: {:?}", e))?;
+
+            let row_id: i32 = diesel::sql_query("SELECT last_insert_rowid() as id")
+                .get_result::<LastInsertRowId>(conn.deref_mut())
+                .map(|r| r.id)
+                .map_err(|e| anyhow::anyhow!("Failed to get last insert ID: {:?}", e))?;
+
+            Ok(SearchRecord {
+                id: row_id,
+                timestamp: search.timestamp,
+                query: search.query,
+                search_engine: search.search_engine,
+                created_at: search.created_at,
+                source_file: search.source_file,
+            })
+        })
+        .map_err(|_| DbError::new(DbErrorKind::InsertError))
+    }
+
+    fn store_searches_batch(
+        &mut self,
+        context: &opentelemetry::Context,
+        searches: Vec<InsertSearchRecord>,
+    ) -> Result<usize, DbError> {
+        trace_db_call(context, "insert", "store_searches_batch", |_span| {
+            let mut conn = self
+                .connection
+                .lock()
+                .expect("Unable to get SearchHistoryDao");
+            let mut inserted = 0;
+
+            conn.transaction::<_, anyhow::Error, _>(|conn| {
+                for search in searches {
+                    // Validate embedding (REQUIRED)
+                    if search.embedding.len() != 768 {
+                        log::warn!(
+                            "Skipping search with invalid embedding dimensions: {}",
+                            search.embedding.len()
+                        );
+                        continue;
+                    }
+
+                    let embedding_bytes = Self::serialize_vector(&search.embedding);
+
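+                    // INSERT OR IGNORE keyed on the UNIQUE(timestamp, query)
+                    // constraint makes re-imports of the same Takeout file idempotent.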
+                    let rows_affected = diesel::sql_query(
+                        "INSERT OR IGNORE INTO search_history
+                         (timestamp, query, search_engine, embedding, created_at, source_file)
+                         VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
+                    )
+                    .bind::<diesel::sql_types::BigInt, _>(search.timestamp)
+                    .bind::<diesel::sql_types::Text, _>(&search.query)
+                    .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(
+                        &search.search_engine,
+                    )
+                    .bind::<diesel::sql_types::Binary, _>(&embedding_bytes)
+                    .bind::<diesel::sql_types::BigInt, _>(search.created_at)
+                    .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(
+                        &search.source_file,
+                    )
+                    .execute(conn)
+                    .map_err(|e| anyhow::anyhow!("Batch insert error: {:?}", e))?;
+
+                    if rows_affected > 0 {
+                        inserted += 1;
+                    }
+                }
+                Ok(())
+            })
+            .map_err(|e| anyhow::anyhow!("Transaction error: {:?}", e))?;
+
+            Ok(inserted)
+        })
+        .map_err(|_| DbError::new(DbErrorKind::InsertError))
+    }
+
+    fn find_searches_in_range(
+        &mut self,
+        context: &opentelemetry::Context,
+        start_ts: i64,
+        end_ts: i64,
+    ) -> Result<Vec<SearchRecord>, DbError> {
+        trace_db_call(context, "query", "find_searches_in_range", |_span| {
+            let mut conn = self
+                .connection
+                .lock()
+                .expect("Unable to get SearchHistoryDao");
+
+            diesel::sql_query(
+                "SELECT id, timestamp, query, search_engine, embedding, created_at, source_file
+                 FROM search_history
+                 WHERE timestamp >= ?1 AND timestamp <= ?2
+                 ORDER BY timestamp DESC",
+            )
+            .bind::<diesel::sql_types::BigInt, _>(start_ts)
+            .bind::<diesel::sql_types::BigInt, _>(end_ts)
+            .load::<SearchRecordWithVectorRow>(conn.deref_mut())
+            .map(|rows| rows.into_iter().map(|r| r.to_search_record()).collect())
+            .map_err(|e| anyhow::anyhow!("Query error: {:?}", e))
+        })
+        .map_err(|_| DbError::new(DbErrorKind::QueryError))
+    }
+
+    fn find_similar_searches(
+        &mut self,
+        context: &opentelemetry::Context,
+        query_embedding: &[f32],
+        limit: usize,
+    ) -> Result<Vec<SearchRecord>, DbError> {
+        trace_db_call(context, "query", "find_similar_searches", |_span| {
+            let mut conn = self
+                .connection
+                .lock()
+                .expect("Unable to get SearchHistoryDao");
+
+            if query_embedding.len() != 768 {
+                return Err(anyhow::anyhow!(
+                    "Invalid query embedding dimensions: {} (expected 768)",
+                    query_embedding.len()
+                ));
+            }
+
+            // Load all searches with embeddings
+            let results = diesel::sql_query(
+                "SELECT id, timestamp, query, search_engine, embedding, created_at, source_file
+                 FROM search_history",
+            )
+            .load::<SearchRecordWithVectorRow>(conn.deref_mut())
+            .map_err(|e| anyhow::anyhow!("Query error: {:?}", e))?;
+
+            // Compute similarities
+            let mut scored_searches: Vec<(f32, SearchRecord)> = results
+                .into_iter()
+                .filter_map(|row| {
+                    if let Ok(emb) = Self::deserialize_vector(&row.embedding) {
+                        let similarity = Self::cosine_similarity(query_embedding, &emb);
+                        Some((similarity, row.to_search_record()))
+                    } else {
+                        None
+                    }
+                })
+                .collect();
+
+            // Sort by similarity descending
+            scored_searches
+                .sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
+
+            log::info!("Found {} similar searches", scored_searches.len());
+            if !scored_searches.is_empty() {
+                log::info!(
+                    "Top similarity: {:.4} for query: '{}'",
+                    scored_searches[0].0,
+                    scored_searches[0].1.query
+                );
+            }
+
+            Ok(scored_searches
+                .into_iter()
+                .take(limit)
+                .map(|(_, search)| search)
+                .collect())
+        })
+        .map_err(|_| DbError::new(DbErrorKind::QueryError))
+    }
+
+    fn find_relevant_searches_hybrid(
+        &mut self,
+        context: &opentelemetry::Context,
+        center_timestamp: i64,
+        time_window_days: i64,
+        query_embedding: Option<&[f32]>,
+        limit: usize,
+    ) -> Result<Vec<SearchRecord>, DbError> {
+        trace_db_call(context, "query", "find_relevant_searches_hybrid", |_span| {
+            let window_seconds = time_window_days * 86400;
+            let start_ts = center_timestamp - window_seconds;
+            let end_ts = center_timestamp + window_seconds;
+
+            let mut conn = self
+                .connection
+                .lock()
+                .expect("Unable to get SearchHistoryDao");
+
+            // Step 1: Time-based filter (fast, indexed)
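+            // Scoring below is a brute-force pass over the window (one cosine per
+            // row); the indexed time filter keeps that candidate set small.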
+            let searches_in_range = diesel::sql_query(
+                "SELECT id, timestamp, query, search_engine, embedding, created_at, source_file
+                 FROM search_history
+                 WHERE timestamp >= ?1 AND timestamp <= ?2
+                 ORDER BY timestamp DESC",
+            )
+            .bind::<diesel::sql_types::BigInt, _>(start_ts)
+            .bind::<diesel::sql_types::BigInt, _>(end_ts)
+            .load::<SearchRecordWithVectorRow>(conn.deref_mut())
+            .map_err(|e| anyhow::anyhow!("Query error: {:?}", e))?;
+
+            // Step 2: If query embedding provided, rank by semantic similarity
+            if let Some(query_emb) = query_embedding {
+                if query_emb.len() != 768 {
+                    return Err(anyhow::anyhow!(
+                        "Invalid query embedding dimensions: {} (expected 768)",
+                        query_emb.len()
+                    ));
+                }
+
+                let mut scored_searches: Vec<(f32, SearchRecord)> = searches_in_range
+                    .into_iter()
+                    .filter_map(|row| {
+                        if let Ok(emb) = Self::deserialize_vector(&row.embedding) {
+                            let similarity = Self::cosine_similarity(query_emb, &emb);
+                            Some((similarity, row.to_search_record()))
+                        } else {
+                            None
+                        }
+                    })
+                    .collect();
+
+                // Sort by similarity descending
+                scored_searches
+                    .sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
+
+                log::info!(
+                    "Hybrid query: {} searches in time range, ranked by similarity",
+                    scored_searches.len()
+                );
+                if !scored_searches.is_empty() {
+                    log::info!(
+                        "Top similarity: {:.4} for '{}'",
+                        scored_searches[0].0,
+                        scored_searches[0].1.query
+                    );
+                }
+
+                Ok(scored_searches
+                    .into_iter()
+                    .take(limit)
+                    .map(|(_, search)| search)
+                    .collect())
+            } else {
+                // No semantic ranking, just return time-sorted (most recent first)
+                log::info!(
+                    "Time-only query: {} searches in range",
+                    searches_in_range.len()
+                );
+                Ok(searches_in_range
+                    .into_iter()
+                    .take(limit)
+                    .map(|r| r.to_search_record())
+                    .collect())
+            }
+        })
+        .map_err(|_| DbError::new(DbErrorKind::QueryError))
+    }
+
+    fn search_exists(
+        &mut self,
+        context: &opentelemetry::Context,
+        timestamp: i64,
+        query: &str,
+    ) -> Result<bool, DbError> {
+        trace_db_call(context, "query", "search_exists", |_span| {
+            let mut conn = self
+                .connection
+                .lock()
+                .expect("Unable to get SearchHistoryDao");
+
+            #[derive(QueryableByName)]
+            struct CountResult {
+                #[diesel(sql_type = diesel::sql_types::Integer)]
+                count: i32,
+            }
+
+            let result: CountResult = diesel::sql_query(
+                "SELECT COUNT(*) as count FROM search_history WHERE timestamp = ?1 AND query = ?2",
+            )
+            .bind::<diesel::sql_types::BigInt, _>(timestamp)
+            .bind::<diesel::sql_types::Text, _>(query)
+            .get_result(conn.deref_mut())
+            .map_err(|e| anyhow::anyhow!("Query error: {:?}", e))?;
+
+            Ok(result.count > 0)
+        })
+        .map_err(|_| DbError::new(DbErrorKind::QueryError))
+    }
+
+    fn get_search_count(&mut self, context: &opentelemetry::Context) -> Result<i64, DbError> {
+        trace_db_call(context, "query", "get_search_count", |_span| {
+            let mut conn = self
+                .connection
+                .lock()
+                .expect("Unable to get SearchHistoryDao");
+
+            #[derive(QueryableByName)]
+            struct CountResult {
+                #[diesel(sql_type = diesel::sql_types::BigInt)]
+                count: i64,
+            }
+
+            let result: CountResult =
+                diesel::sql_query("SELECT COUNT(*) as count FROM search_history")
+                    .get_result(conn.deref_mut())
+                    .map_err(|e| anyhow::anyhow!("Query error: {:?}", e))?;
+
+            Ok(result.count)
+        })
+        .map_err(|_| DbError::new(DbErrorKind::QueryError))
+    }
+}
diff --git a/src/files.rs b/src/files.rs
index 4d8c86c..785a219 100644
--- a/src/files.rs
+++ b/src/files.rs
@@ -217,7 +217,12 @@ pub async fn list_photos(
                     if let (Some(photo_lat), Some(photo_lon)) =
                         (exif.gps_latitude, exif.gps_longitude)
                     {
-                        let distance = haversine_distance(lat, lon, photo_lat, photo_lon);
+                        let distance = haversine_distance(
+                            lat as f64,
+                            lon as f64,
+                            photo_lat as f64,
+                            photo_lon as f64,
+                        );
                         distance <= radius_km
                     } else {
                         false
diff --git a/src/lib.rs b/src/lib.rs
index 90ba68a..bd4f7ab 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -13,6 +13,7 @@ pub mod files;
 pub mod geo;
 pub mod memories;
 pub mod otel;
+pub mod parsers;
 pub mod service;
 pub mod state;
 pub mod tags;
diff --git a/src/main.rs b/src/main.rs
index 3be66d2..2702cd3 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -303,11 +303,11 @@ async fn upload_image(
         width: exif_data.width,
         height: exif_data.height,
         orientation: exif_data.orientation,
-        gps_latitude: exif_data.gps_latitude,
-        gps_longitude: exif_data.gps_longitude,
-        gps_altitude: exif_data.gps_altitude,
-        focal_length: exif_data.focal_length,
-        aperture: exif_data.aperture,
+        gps_latitude: exif_data.gps_latitude.map(|v| v as f32),
+        gps_longitude: exif_data.gps_longitude.map(|v| v as f32),
+        gps_altitude: exif_data.gps_altitude.map(|v| v as f32),
+        focal_length: exif_data.focal_length.map(|v| v as f32),
+        aperture: exif_data.aperture.map(|v| v as f32),
         shutter_speed: exif_data.shutter_speed,
         iso: exif_data.iso,
         date_taken: exif_data.date_taken,
@@ -1061,11 +1061,11 @@ fn process_new_files(
         width: exif_data.width,
         height: exif_data.height,
         orientation: exif_data.orientation,
-        gps_latitude: exif_data.gps_latitude,
-        gps_longitude: exif_data.gps_longitude,
-        gps_altitude: exif_data.gps_altitude,
-        focal_length: exif_data.focal_length,
-        aperture: exif_data.aperture,
+        gps_latitude: exif_data.gps_latitude.map(|v| v as f32),
+        gps_longitude: exif_data.gps_longitude.map(|v| v as f32),
+        gps_altitude: exif_data.gps_altitude.map(|v| v as f32),
+        focal_length: exif_data.focal_length.map(|v| v as f32),
+        aperture: exif_data.aperture.map(|v| v as f32),
         shutter_speed: exif_data.shutter_speed,
         iso: exif_data.iso,
         date_taken: exif_data.date_taken,
diff --git a/src/parsers/ical_parser.rs b/src/parsers/ical_parser.rs
new file mode 100644
index 0000000..c2d0bff
--- /dev/null
+++ b/src/parsers/ical_parser.rs
@@ -0,0 +1,183 @@
+use anyhow::{Context, Result};
+use chrono::NaiveDateTime;
+use ical::parser::ical::component::IcalCalendar;
+use ical::property::Property;
+use std::fs::File;
+use std::io::BufReader;
+
+#[derive(Debug, Clone)]
+pub struct ParsedCalendarEvent {
+    pub event_uid: Option<String>,
+    pub summary: String,
+    pub description: Option<String>,
+    pub location: Option<String>,
+    pub start_time: i64,
+    pub end_time: i64,
+    pub all_day: bool,
+    pub organizer: Option<String>,
+    pub attendees: Vec<String>,
+}
+
+pub fn parse_ics_file(path: &str) -> Result<Vec<ParsedCalendarEvent>> {
+    let file = File::open(path).context("Failed to open .ics file")?;
+    let reader = BufReader::new(file);
+
+    let parser = ical::IcalParser::new(reader);
+    let mut events = Vec::new();
+
+    for calendar_result in parser {
+        let calendar: IcalCalendar = calendar_result.context("Failed to parse calendar")?;
+
+        for event in calendar.events {
+            // Extract properties
+            let mut event_uid = None;
+            let mut summary = None;
+            let mut description = None;
+            let mut location = None;
+            let mut start_time = None;
+            let mut end_time = None;
+            let mut all_day = false;
+            let mut organizer = None;
+            let mut attendees = Vec::new();
+
+            for property in event.properties {
+                match property.name.as_str() {
+                    "UID" => {
+                        event_uid = property.value;
+                    }
+                    "SUMMARY" => {
+                        summary = property.value;
+                    }
+                    "DESCRIPTION" => {
+                        description = property.value;
+                    }
+                    "LOCATION" => {
+                        location = property.value;
+                    }
+                    "DTSTART" => {
+                        if let Some(ref value) = property.value {
+                            start_time = parse_ical_datetime(value, &property)?;
+                            // Check if it's an all-day event (no time component)
+                            all_day = value.len() == 8; // YYYYMMDD format
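+                            // RFC 5545 all-day events use DTSTART;VALUE=DATE with a
+                            // bare YYYYMMDD date, hence the 8-character check.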
+                        }
+                    }
+                    "DTEND" => {
+                        if let Some(ref value) = property.value {
+                            end_time = parse_ical_datetime(value, &property)?;
+                        }
+                    }
+                    "ORGANIZER" => {
+                        organizer = extract_email_from_mailto(property.value.as_deref());
+                    }
+                    "ATTENDEE" => {
+                        if let Some(email) = extract_email_from_mailto(property.value.as_deref()) {
+                            attendees.push(email);
+                        }
+                    }
+                    _ => {}
+                }
+            }
+
+            // Only include events with required fields
+            if let (Some(summary_text), Some(start), Some(end)) = (summary, start_time, end_time) {
+                events.push(ParsedCalendarEvent {
+                    event_uid,
+                    summary: summary_text,
+                    description,
+                    location,
+                    start_time: start,
+                    end_time: end,
+                    all_day,
+                    organizer,
+                    attendees,
+                });
+            }
+        }
+    }
+
+    Ok(events)
+}
+
+fn parse_ical_datetime(value: &str, property: &Property) -> Result<Option<i64>> {
+    // Check for TZID parameter
+    let _tzid = property.params.as_ref().and_then(|params| {
+        params
+            .iter()
+            .find(|(key, _)| key == "TZID")
+            .and_then(|(_, values)| values.first())
+            .cloned()
+    });
+
+    // iCal datetime formats:
+    // - 20240815T140000Z (UTC)
+    // - 20240815T140000 (local/TZID)
+    // - 20240815 (all-day)
+
+    let cleaned = value.replace("Z", "").replace("T", "");
+
+    // All-day event (YYYYMMDD)
+    if cleaned.len() == 8 {
+        let dt = NaiveDateTime::parse_from_str(&format!("{}000000", cleaned), "%Y%m%d%H%M%S")
+            .context("Failed to parse all-day date")?;
+        return Ok(Some(dt.and_utc().timestamp()));
+    }
+
+    // DateTime event (YYYYMMDDTHHMMSS)
+    if cleaned.len() >= 14 {
+        let dt = NaiveDateTime::parse_from_str(&cleaned[..14], "%Y%m%d%H%M%S")
+            .context("Failed to parse datetime")?;
+
+        // If original had 'Z', it's UTC
+        let timestamp = if value.ends_with('Z') {
+            dt.and_utc().timestamp()
+        } else {
+            // Treat as UTC for simplicity (proper TZID handling is complex)
+            dt.and_utc().timestamp()
+        };
+
+        return Ok(Some(timestamp));
+    }
+
+    Ok(None)
+}
+
+fn extract_email_from_mailto(value: Option<&str>) -> Option<String> {
+    value.and_then(|v| {
+        // ORGANIZER and ATTENDEE often have format: mailto:user@example.com
+        if v.starts_with("mailto:") {
+            Some(v.trim_start_matches("mailto:").to_string())
+        } else {
+            Some(v.to_string())
+        }
+    })
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_parse_ical_datetime() {
+        let prop = Property {
+            name: "DTSTART".to_string(),
+            params: None,
+            value: Some("20240815T140000Z".to_string()),
+        };
+
+        let timestamp = parse_ical_datetime("20240815T140000Z", &prop).unwrap();
+        assert!(timestamp.is_some());
+    }
+
+    #[test]
+    fn test_extract_email() {
+        assert_eq!(
+            extract_email_from_mailto(Some("mailto:user@example.com")),
+            Some("user@example.com".to_string())
+        );
+
+        assert_eq!(
+            extract_email_from_mailto(Some("user@example.com")),
+            Some("user@example.com".to_string())
+        );
+    }
+}
diff --git a/src/parsers/location_json_parser.rs b/src/parsers/location_json_parser.rs
new file mode 100644
index 0000000..7ca6b87
--- /dev/null
+++ b/src/parsers/location_json_parser.rs
@@ -0,0 +1,133 @@
+use anyhow::{Context, Result};
+use chrono::DateTime;
+use serde::Deserialize;
+use std::fs::File;
+use std::io::BufReader;
+
+#[derive(Debug, Clone)]
+pub struct ParsedLocationRecord {
+    pub timestamp: i64,
+    pub latitude: f64,
+    pub longitude: f64,
+    pub accuracy: Option<i32>,
+    pub activity: Option<String>,
+    pub activity_confidence: Option<i32>,
+}
+
+// Google Takeout Location History JSON structures
+#[derive(Debug, Deserialize)]
+struct LocationHistory {
+    locations: Vec<LocationPoint>,
+}
+
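+// Takeout has shipped at least two Records.json layouts: an older one with
+// "timestampMs" in epoch milliseconds and a newer one with an ISO8601
+// "timestamp"; LocationPoint accepts either.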
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct LocationPoint {
+    timestamp_ms: Option<String>, // Older format
+    timestamp: Option<String>,    // Newer format (ISO8601)
+    latitude_e7: Option<i64>,
+    longitude_e7: Option<i64>,
+    accuracy: Option<i32>,
+    activity: Option<Vec<ActivityRecord>>,
+}
+
+#[derive(Debug, Deserialize)]
+struct ActivityRecord {
+    activity: Vec<ActivityType>,
+    timestamp_ms: Option<String>,
+}
+
+#[derive(Debug, Deserialize)]
+struct ActivityType {
+    #[serde(rename = "type")]
+    activity_type: String,
+    confidence: i32,
+}
+
+pub fn parse_location_json(path: &str) -> Result<Vec<ParsedLocationRecord>> {
+    let file = File::open(path).context("Failed to open location JSON file")?;
+    let reader = BufReader::new(file);
+
+    let history: LocationHistory =
+        serde_json::from_reader(reader).context("Failed to parse location history JSON")?;
+
+    let mut records = Vec::new();
+
+    for point in history.locations {
+        // Parse timestamp (try both formats)
+        let timestamp = if let Some(ts_ms) = point.timestamp_ms {
+            // Milliseconds since epoch
+            ts_ms
+                .parse::<i64>()
+                .context("Failed to parse timestamp_ms")?
+                / 1000
+        } else if let Some(ts_iso) = point.timestamp {
+            // ISO8601 format
+            DateTime::parse_from_rfc3339(&ts_iso)
+                .context("Failed to parse ISO8601 timestamp")?
+                .timestamp()
+        } else {
+            continue; // Skip points without timestamp
+        };
+
+        // Convert E7 format to decimal degrees
+        let latitude = point.latitude_e7.map(|e7| e7 as f64 / 10_000_000.0);
+        let longitude = point.longitude_e7.map(|e7| e7 as f64 / 10_000_000.0);
+
+        // Extract highest-confidence activity
+        let (activity, activity_confidence) = point
+            .activity
+            .as_ref()
+            .and_then(|activities| activities.first())
+            .and_then(|record| {
+                record
+                    .activity
+                    .iter()
+                    .max_by_key(|a| a.confidence)
+                    .map(|a| (a.activity_type.clone(), a.confidence))
+            })
+            .unzip();
+
+        if let (Some(lat), Some(lon)) = (latitude, longitude) {
+            records.push(ParsedLocationRecord {
+                timestamp,
+                latitude: lat,
+                longitude: lon,
+                accuracy: point.accuracy,
+                activity,
+                activity_confidence,
+            });
+        }
+    }
+
+    Ok(records)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_e7_conversion() {
+        let lat_e7 = 374228300_i64;
+        let lat = lat_e7 as f64 / 10_000_000.0;
+        assert!((lat - 37.42283).abs() < 0.00001);
+    }
+
+    #[test]
+    fn test_parse_sample_json() {
+        let json = r#"{
+            "locations": [
+                {
+                    "latitudeE7": 374228300,
+                    "longitudeE7": -1221086100,
+                    "accuracy": 20,
+                    "timestampMs": "1692115200000"
+                }
+            ]
+        }"#;
+
+        let history: LocationHistory = serde_json::from_str(json).unwrap();
+        assert_eq!(history.locations.len(), 1);
+    }
+}
diff --git a/src/parsers/mod.rs b/src/parsers/mod.rs
new file mode 100644
index 0000000..98dcea3
--- /dev/null
+++ b/src/parsers/mod.rs
@@ -0,0 +1,7 @@
+pub mod ical_parser;
+pub mod location_json_parser;
+pub mod search_html_parser;
+
+pub use ical_parser::{ParsedCalendarEvent, parse_ics_file};
+pub use location_json_parser::{ParsedLocationRecord, parse_location_json};
+pub use search_html_parser::{ParsedSearchRecord, parse_search_html};
diff --git a/src/parsers/search_html_parser.rs b/src/parsers/search_html_parser.rs
new file mode 100644
index 0000000..4bcd166
--- /dev/null
+++ b/src/parsers/search_html_parser.rs
@@ -0,0 +1,210 @@
+use anyhow::{Context, Result};
+use chrono::{DateTime, NaiveDateTime, Utc};
+use scraper::{Html, Selector};
+use std::fs;
+
+#[derive(Debug, Clone)]
+pub struct ParsedSearchRecord {
+    pub timestamp: i64,
+    pub query: String,
+    pub search_engine: Option<String>,
+}
+
+pub fn parse_search_html(path: &str) -> Result<Vec<ParsedSearchRecord>> {
+    let html_content =
+        fs::read_to_string(path).context("Failed to read search history HTML file")?;
+
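+    // scraper builds a DOM from the raw HTML that we can query with CSS selectors.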
+    let document = Html::parse_document(&html_content);
+    let mut records = Vec::new();
+
+    // Try multiple selector strategies as Google Takeout format varies
+
+    // Strategy 1: Look for specific cell structure
+    if let Ok(cell_selector) = Selector::parse("div.content-cell") {
+        for cell in document.select(&cell_selector) {
+            if let Some(record) = parse_content_cell(&cell) {
+                records.push(record);
+            }
+        }
+    }
+
+    // Strategy 2: Look for outer-cell structure (older format)
+    if records.is_empty() {
+        if let Ok(outer_selector) = Selector::parse("div.outer-cell") {
+            for cell in document.select(&outer_selector) {
+                if let Some(record) = parse_outer_cell(&cell) {
+                    records.push(record);
+                }
+            }
+        }
+    }
+
+    // Strategy 3: Generic approach - look for links and timestamps
+    if records.is_empty() {
+        if let Ok(link_selector) = Selector::parse("a") {
+            for link in document.select(&link_selector) {
+                if let Some(href) = link.value().attr("href") {
+                    // Check if it's a search URL
+                    if href.contains("google.com/search?q=") || href.contains("search?q=") {
+                        if let Some(query) = extract_query_from_url(href) {
+                            // Try to find nearby timestamp
+                            let timestamp = find_nearby_timestamp(&link);
+
+                            records.push(ParsedSearchRecord {
+                                timestamp: timestamp.unwrap_or_else(|| Utc::now().timestamp()),
+                                query,
+                                search_engine: Some("Google".to_string()),
+                            });
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    Ok(records)
+}
+
+fn parse_content_cell(cell: &scraper::ElementRef) -> Option<ParsedSearchRecord> {
+    let link_selector = Selector::parse("a").ok()?;
+
+    let link = cell.select(&link_selector).next()?;
+    let href = link.value().attr("href")?;
+    let query = extract_query_from_url(href)?;
+
+    // Extract timestamp from cell text
+    let cell_text = cell.text().collect::<Vec<_>>().join(" ");
+    let timestamp = parse_timestamp_from_text(&cell_text);
+
+    Some(ParsedSearchRecord {
+        timestamp: timestamp.unwrap_or_else(|| Utc::now().timestamp()),
+        query,
+        search_engine: Some("Google".to_string()),
+    })
+}
+
+fn parse_outer_cell(cell: &scraper::ElementRef) -> Option<ParsedSearchRecord> {
+    let link_selector = Selector::parse("a").ok()?;
+
+    let link = cell.select(&link_selector).next()?;
+    let href = link.value().attr("href")?;
+    let query = extract_query_from_url(href)?;
+
+    let cell_text = cell.text().collect::<Vec<_>>().join(" ");
+    let timestamp = parse_timestamp_from_text(&cell_text);
+
+    Some(ParsedSearchRecord {
+        timestamp: timestamp.unwrap_or_else(|| Utc::now().timestamp()),
+        query,
+        search_engine: Some("Google".to_string()),
+    })
+}
+
+fn extract_query_from_url(url: &str) -> Option<String> {
+    // Extract query parameter from URL
+    // Example: https://www.google.com/search?q=rust+programming
+
+    if let Some(query_start) = url.find("?q=").or_else(|| url.find("&q=")) {
+        let query_part = &url[query_start + 3..];
+        let query_end = query_part.find('&').unwrap_or(query_part.len());
+        let encoded_query = &query_part[..query_end];
+
+        // URL decode; percent-decoding leaves '+' alone, but Google encodes
+        // spaces in q= as '+', so convert those explicitly
+        urlencoding::decode(encoded_query)
+            .ok()
+            .map(|s| s.replace('+', " "))
+    } else {
+        None
+    }
+}
+
+fn find_nearby_timestamp(element: &scraper::ElementRef) -> Option<i64> {
+    // Look for timestamp in parent or sibling elements
+    if let Some(parent) = element.parent() {
+        if parent.value().as_element().is_some() {
+            let parent_ref = scraper::ElementRef::wrap(parent)?;
+            let text = parent_ref.text().collect::<Vec<_>>().join(" ");
+            return parse_timestamp_from_text(&text);
+        }
+    }
+    None
+}
+
+fn parse_timestamp_from_text(text: &str) -> Option<i64> {
+    // Google Takeout timestamps often look like:
+    // "Aug 15, 2024, 2:34:56 PM PDT"
+    // "2024-08-15T14:34:56Z"
+
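+    // Attempts run from least to most ambiguous; the first successful parse wins.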
+    // Try ISO8601 first
+    if let Some(iso_match) = text
+        .split_whitespace()
+        .find(|s| s.contains('T') && s.contains('-'))
+    {
+        if let Ok(dt) = DateTime::parse_from_rfc3339(iso_match) {
+            return Some(dt.timestamp());
+        }
+    }
+
+    // Try common date patterns
+    let patterns = [
+        "%b %d, %Y, %I:%M:%S %p", // Aug 15, 2024, 2:34:56 PM
+        "%Y-%m-%d %H:%M:%S",      // 2024-08-15 14:34:56
+        "%m/%d/%Y %H:%M:%S",      // 08/15/2024 14:34:56
+    ];
+
+    for pattern in patterns {
+        // Extract potential date string
+        if let Some(date_part) = extract_date_substring(text) {
+            if let Ok(dt) = NaiveDateTime::parse_from_str(&date_part, pattern) {
+                return Some(dt.and_utc().timestamp());
+            }
+        }
+    }
+
+    None
+}
+
+fn extract_date_substring(text: &str) -> Option<String> {
+    // Try to extract date-like substring from text
+    // This is a heuristic approach for varied formats
+
+    // Look for patterns like "Aug 15, 2024, 2:34:56 PM"
+    if let Some(pos) = text.find(|c: char| c.is_numeric()) {
+        let rest = &text[pos..];
+        if let Some(end) =
+            rest.find(|c: char| !c.is_alphanumeric() && c != ':' && c != ',' && c != ' ')
+        {
+            Some(rest[..end].trim().to_string())
+        } else {
+            Some(rest.trim().to_string())
+        }
+    } else {
+        None
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_extract_query_from_url() {
+        let url = "https://www.google.com/search?q=rust+programming&oq=rust";
+        let query = extract_query_from_url(url);
+        assert_eq!(query, Some("rust programming".to_string()));
+    }
+
+    #[test]
+    fn test_extract_query_with_encoding() {
+        let url = "https://www.google.com/search?q=hello%20world";
+        let query = extract_query_from_url(url);
+        assert_eq!(query, Some("hello world".to_string()));
+    }
+
+    #[test]
+    fn test_parse_iso_timestamp() {
+        let text = "Some text 2024-08-15T14:34:56Z more text";
+        let timestamp = parse_timestamp_from_text(text);
+        assert!(timestamp.is_some());
+    }
+}