feature/shuffle-sort #30

Merged
cameron merged 21 commits from feature/shuffle-sort into master 2024-12-06 16:25:44 +00:00
11 changed files with 1574 additions and 613 deletions

1659
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,8 +1,8 @@
[package] [package]
name = "image-api" name = "image-api"
version = "0.1.0" version = "0.2.0"
authors = ["Cameron Cordes <cameronc.dev@gmail.com>"] authors = ["Cameron Cordes <cameronc.dev@gmail.com>"]
edition = "2018" edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -13,27 +13,27 @@ lto = true
actix = "0.13.1" actix = "0.13.1"
actix-web = "4" actix-web = "4"
actix-rt = "2.6" actix-rt = "2.6"
tokio = { version = "1.42.0", features = ["default", "process", "sync"] }
actix-files = "0.6" actix-files = "0.6"
actix-multipart = "0.6.1" actix-multipart = "0.7.2"
futures = "0.3.5" futures = "0.3.5"
jsonwebtoken = "9.2.0" jsonwebtoken = "9.3.0"
serde = "1" serde = "1"
serde_json = "1" serde_json = "1"
diesel = { version = "2.0.2", features = ["sqlite"] } diesel = { version = "2.2.5", features = ["sqlite"] }
diesel_migrations = "2.0.0" diesel_migrations = "2.2.0"
hmac = "0.12.1"
sha2 = "0.10.8"
chrono = "0.4" chrono = "0.4"
dotenv = "0.15" dotenv = "0.15"
bcrypt = "0.15.0" bcrypt = "0.16.0"
image = { version = "0.24.7", default-features = false, features = ["jpeg", "png", "jpeg_rayon"] } image = { version = "0.25.5", default-features = false, features = ["jpeg", "png", "rayon"] }
walkdir = "2.4.0" walkdir = "2.4.0"
rayon = "1.5" rayon = "1.5"
notify = "6.1.1" notify = "6.1.1"
path-absolutize = "3.0" path-absolutize = "3.1"
log="0.4" log = "0.4"
env_logger= "0.10.1" env_logger = "0.11.5"
actix-web-prom = "0.7.0" actix-web-prom = "0.9.0"
prometheus = "0.13" prometheus = "0.13"
lazy_static = "1.1" lazy_static = "1.5"
anyhow = "1.0" anyhow = "1.0"
rand = "0.8.5"

View File

@@ -10,6 +10,7 @@ You must have `ffmpeg` installed for streaming video and generating video thumbn
- `DATABASE_URL` is a path or url to a database (currently only SQLite is tested) - `DATABASE_URL` is a path or url to a database (currently only SQLite is tested)
- `BASE_PATH` is the root from which you want to serve images and videos - `BASE_PATH` is the root from which you want to serve images and videos
- `THUMBNAILS` is a path where generated thumbnails should be stored - `THUMBNAILS` is a path where generated thumbnails should be stored
- `VIDEO_PATH` is a path where HLS playlists and video parts should be stored
- `BIND_URL` is the url and port to bind to (typically your own IP address) - `BIND_URL` is the url and port to bind to (typically your own IP address)
- `SECRET_KEY` is the *hopefully* random string to sign Tokens with - `SECRET_KEY` is the *hopefully* random string to sign Tokens with
- `RUST_LOG` is one of `off, error, warn, info, debug, trace`, from least to most noisy [error is default] - `RUST_LOG` is one of `off, error, warn, info, debug, trace`, from least to most noisy [error is default]

View File

@@ -100,13 +100,23 @@ pub struct PhotosResponse {
pub dirs: Vec<String>, pub dirs: Vec<String>,
} }
/// Ordering requested by a client for the photo listing endpoint.
///
/// Deserialized from the `sort` query parameter; `rename_all = "lowercase"`
/// means the wire values are `shuffle`, `nameasc`, `namedesc`.
#[derive(Copy, Clone, Deserialize, PartialEq, Debug)]
#[serde(rename_all = "lowercase")]
pub enum SortType {
/// Randomize the order of results on every request.
Shuffle,
/// Lexicographic ascending by file path.
NameAsc,
/// Lexicographic descending by file path.
NameDesc,
}
#[derive(Deserialize)] #[derive(Deserialize)]
pub struct FilesRequest { pub struct FilesRequest {
pub path: String, pub path: String,
// comma separated numbers // comma separated numbers
pub tag_ids: Option<String>, pub tag_ids: Option<String>,
pub exclude_tag_ids: Option<String>,
pub tag_filter_mode: Option<FilterMode>, pub tag_filter_mode: Option<FilterMode>,
pub recursive: Option<bool>, pub recursive: Option<bool>,
pub sort: Option<SortType>,
} }
#[derive(Copy, Clone, Deserialize, PartialEq, Debug)] #[derive(Copy, Clone, Deserialize, PartialEq, Debug)]

View File

@@ -41,7 +41,7 @@ pub mod test {
.run_pending_migrations(DB_MIGRATIONS) .run_pending_migrations(DB_MIGRATIONS)
.expect("Failure running DB migrations"); .expect("Failure running DB migrations");
return connection; connection
} }
} }

View File

@@ -16,13 +16,16 @@ use actix_web::{
}; };
use log::{debug, error, info, trace}; use log::{debug, error, info, trace};
use crate::data::{Claims, FilesRequest, FilterMode, PhotosResponse}; use crate::data::{Claims, FilesRequest, FilterMode, PhotosResponse, SortType};
use crate::{create_thumbnails, AppState}; use crate::{create_thumbnails, AppState};
use crate::data::SortType::{NameAsc};
use crate::error::IntoHttpError; use crate::error::IntoHttpError;
use crate::tags::TagDao; use crate::tags::TagDao;
use crate::video::StreamActor; use crate::video::StreamActor;
use path_absolutize::*; use path_absolutize::*;
use rand::prelude::SliceRandom;
use rand::thread_rng;
use serde::Deserialize; use serde::Deserialize;
pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>( pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
@@ -37,7 +40,7 @@ pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
let search_recursively = req.recursive.unwrap_or(false); let search_recursively = req.recursive.unwrap_or(false);
if let Some(tag_ids) = &req.tag_ids { if let Some(tag_ids) = &req.tag_ids {
if search_recursively { if search_recursively {
let filter_mode = req.tag_filter_mode.unwrap_or(FilterMode::Any); let filter_mode = &req.tag_filter_mode.unwrap_or(FilterMode::Any);
debug!( debug!(
"Searching for tags: {}. With path: '{}' and filter mode: {:?}", "Searching for tags: {}. With path: '{}' and filter mode: {:?}",
tag_ids, search_path, filter_mode tag_ids, search_path, filter_mode
@@ -49,32 +52,46 @@ pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
.filter_map(|t| t.parse().ok()) .filter_map(|t| t.parse().ok())
.collect::<Vec<i32>>(); .collect::<Vec<i32>>();
return dao let exclude_tag_ids = req
.get_files_with_any_tag_ids(tag_ids.clone()) .exclude_tag_ids
.context(format!("Failed to get files with tag_ids: {:?}", tag_ids)) .clone()
.map(|tagged_files| match filter_mode { .unwrap_or_default()
FilterMode::Any => tagged_files .split(',')
.iter() .filter_map(|t| t.parse().ok())
.filter(|file| file.starts_with(search_path)) .collect::<Vec<i32>>();
.cloned()
.collect(), return match filter_mode {
FilterMode::All => tagged_files FilterMode::Any => dao.get_files_with_any_tag_ids(tag_ids.clone(), exclude_tag_ids),
.iter() FilterMode::All => dao.get_files_with_all_tag_ids(tag_ids.clone(), exclude_tag_ids),
.filter(|&file_path| { }
if !file_path.starts_with(search_path) { .context(format!(
return false; "Failed to get files with tag_ids: {:?} with filter_mode: {:?}",
tag_ids, filter_mode
))
.map(|tagged_files| {
tagged_files
.into_iter()
.filter(|f| {
// When searching at the root, everything matches recursively
if search_path.trim() == "" {
return true;
} }
let file_tags = dao.get_tags_for_path(file_path).unwrap_or_default(); f.starts_with(&format!(
tag_ids "{}/",
.iter() search_path.strip_suffix('/').unwrap_or_else(|| search_path)
.all(|id| file_tags.iter().any(|tag| &tag.id == id)) ))
}) })
.cloned() .collect::<Vec<String>>()
.collect::<Vec<String>>(),
}) })
.map(|tagged_files| { .map(|files| sort(files, req.sort.unwrap_or(NameAsc)))
trace!("Found tagged files: {:?}", tagged_files); .inspect(|files| debug!("Found {:?} files", files.len()))
.map(|tagged_files: Vec<String>| {
trace!(
"Found {:?} tagged files: {:?}",
tagged_files.len(),
tagged_files
);
HttpResponse::Ok().json(PhotosResponse { HttpResponse::Ok().json(PhotosResponse {
photos: tagged_files, photos: tagged_files,
@@ -89,7 +106,7 @@ pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
if let Ok(files) = file_system.get_files_for_path(search_path) { if let Ok(files) = file_system.get_files_for_path(search_path) {
debug!("Valid search path: {:?}", search_path); debug!("Valid search path: {:?}", search_path);
let photos = files let mut photos = files
.iter() .iter()
.filter(|&f| { .filter(|&f| {
f.metadata().map_or_else( f.metadata().map_or_else(
@@ -112,10 +129,20 @@ pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
.filter_map(|t| t.parse().ok()) .filter_map(|t| t.parse().ok())
.collect::<Vec<i32>>(); .collect::<Vec<i32>>();
let excluded_tag_ids = &req
.exclude_tag_ids
.clone()
.unwrap_or_default()
.split(',')
.filter_map(|t| t.parse().ok())
.collect::<Vec<i32>>();
let filter_mode = &req.tag_filter_mode.unwrap_or(FilterMode::Any); let filter_mode = &req.tag_filter_mode.unwrap_or(FilterMode::Any);
let file_tags = tag_dao.get_tags_for_path(file_path).unwrap_or_default(); let file_tags = tag_dao.get_tags_for_path(file_path).unwrap_or_default();
let excluded = file_tags.iter().any(|t| excluded_tag_ids.contains(&t.id));
return match filter_mode { return !excluded
&& match filter_mode {
FilterMode::Any => file_tags.iter().any(|t| tag_ids.contains(&t.id)), FilterMode::Any => file_tags.iter().any(|t| tag_ids.contains(&t.id)),
FilterMode::All => tag_ids FilterMode::All => tag_ids
.iter() .iter()
@@ -127,6 +154,11 @@ pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
}) })
.collect::<Vec<String>>(); .collect::<Vec<String>>();
if let Some(sort_type) = req.sort {
debug!("Sorting files: {:?}", sort_type);
photos = sort(photos, sort_type)
}
let dirs = files let dirs = files
.iter() .iter()
.filter(|&f| f.metadata().map_or(false, |md| md.is_dir())) .filter(|&f| f.metadata().map_or(false, |md| md.is_dir()))
@@ -144,6 +176,20 @@ pub async fn list_photos<TagD: TagDao, FS: FileSystemAccess>(
} }
} }
/// Reorders `files` according to `sort_type` and hands the vector back.
///
/// Sorting happens in place on the owned vector, so no extra allocation is
/// made beyond what the caller already passed in.
fn sort(mut files: Vec<String>, sort_type: SortType) -> Vec<String> {
    match sort_type {
        SortType::NameAsc => files.sort_unstable(),
        SortType::NameDesc => {
            // Ascending sort followed by reverse is equivalent to sorting
            // with the comparison flipped.
            files.sort_unstable();
            files.reverse();
        }
        SortType::Shuffle => files.shuffle(&mut thread_rng()),
    }
    files
}
pub fn list_files(dir: &Path) -> io::Result<Vec<PathBuf>> { pub fn list_files(dir: &Path) -> io::Result<Vec<PathBuf>> {
let files = read_dir(dir)? let files = read_dir(dir)?
.filter_map(|res| res.ok()) .filter_map(|res| res.ok())
@@ -314,7 +360,7 @@ impl Handler<RefreshThumbnailsMessage> for StreamActor {
type Result = (); type Result = ();
fn handle(&mut self, _msg: RefreshThumbnailsMessage, _ctx: &mut Self::Context) -> Self::Result { fn handle(&mut self, _msg: RefreshThumbnailsMessage, _ctx: &mut Self::Context) -> Self::Result {
debug!("Refreshing thumbnails after upload"); info!("Refreshing thumbnails after upload");
create_thumbnails() create_thumbnails()
} }
} }
@@ -349,17 +395,15 @@ mod tests {
fn get_files_for_path(&self, path: &str) -> anyhow::Result<Vec<PathBuf>> { fn get_files_for_path(&self, path: &str) -> anyhow::Result<Vec<PathBuf>> {
if self.err { if self.err {
Err(anyhow!("Error for test")) Err(anyhow!("Error for test"))
} else { } else if let Some(files) = self.files.get(path) {
if let Some(files) = self.files.get(path) {
Ok(files Ok(files
.iter() .iter()
.map(|p| PathBuf::from(p)) .map(PathBuf::from)
.collect::<Vec<PathBuf>>()) .collect::<Vec<PathBuf>>())
} else { } else {
Ok(Vec::new()) Ok(Vec::new())
} }
} }
}
fn move_file<P: AsRef<Path>>(&self, from: P, destination: P) -> anyhow::Result<()> { fn move_file<P: AsRef<Path>>(&self, from: P, destination: P) -> anyhow::Result<()> {
todo!() todo!()
@@ -415,6 +459,7 @@ mod tests {
Arc::new(StreamActor {}.start()), Arc::new(StreamActor {}.start()),
String::from("/tmp"), String::from("/tmp"),
String::from("/tmp/thumbs"), String::from("/tmp/thumbs"),
String::from("/tmp/video"),
)), )),
Data::new(RealFileSystem::new(String::from("/tmp"))), Data::new(RealFileSystem::new(String::from("/tmp"))),
Data::new(Mutex::new(SqliteTagDao::default())), Data::new(Mutex::new(SqliteTagDao::default())),
@@ -456,6 +501,7 @@ mod tests {
Arc::new(StreamActor {}.start()), Arc::new(StreamActor {}.start()),
String::from("/tmp"), String::from("/tmp"),
String::from("/tmp/thumbs"), String::from("/tmp/thumbs"),
String::from("/tmp/video"),
)), )),
Data::new(RealFileSystem::new(String::from("./"))), Data::new(RealFileSystem::new(String::from("./"))),
Data::new(Mutex::new(SqliteTagDao::default())), Data::new(Mutex::new(SqliteTagDao::default())),
@@ -502,6 +548,7 @@ mod tests {
Arc::new(StreamActor {}.start()), Arc::new(StreamActor {}.start()),
String::from(""), String::from(""),
String::from("/tmp/thumbs"), String::from("/tmp/thumbs"),
String::from("/tmp/video"),
)), )),
Data::new(FakeFileSystem::new(files)), Data::new(FakeFileSystem::new(files)),
Data::new(Mutex::new(tag_dao)), Data::new(Mutex::new(tag_dao)),
@@ -546,7 +593,7 @@ mod tests {
], ],
); );
let request: Query<FilesRequest> = Query::from_query(&*format!( let request: Query<FilesRequest> = Query::from_query(&format!(
"path=&tag_ids={},{}&tag_filter_mode=All", "path=&tag_ids={},{}&tag_filter_mode=All",
tag1.id, tag3.id tag1.id, tag3.id
)) ))
@@ -559,6 +606,7 @@ mod tests {
Arc::new(StreamActor {}.start()), Arc::new(StreamActor {}.start()),
String::from(""), String::from(""),
String::from("/tmp/thumbs"), String::from("/tmp/thumbs"),
String::from("/tmp/video"),
)), )),
Data::new(FakeFileSystem::new(files)), Data::new(FakeFileSystem::new(files)),
Data::new(Mutex::new(tag_dao)), Data::new(Mutex::new(tag_dao)),

View File

@@ -26,6 +26,7 @@ use actix_web::{
web::{self, BufMut, BytesMut}, web::{self, BufMut, BytesMut},
App, HttpRequest, HttpResponse, HttpServer, Responder, App, HttpRequest, HttpResponse, HttpServer, Responder,
}; };
use anyhow::Context;
use chrono::Utc; use chrono::Utc;
use diesel::sqlite::Sqlite; use diesel::sqlite::Sqlite;
use notify::{Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; use notify::{Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
@@ -139,7 +140,7 @@ async fn upload_image(
let mut file_path: Option<String> = None; let mut file_path: Option<String> = None;
while let Some(Ok(mut part)) = payload.next().await { while let Some(Ok(mut part)) = payload.next().await {
let content_type = part.content_disposition(); if let Some(content_type) = part.content_disposition() {
debug!("{:?}", content_type); debug!("{:?}", content_type);
if let Some(filename) = content_type.get_filename() { if let Some(filename) = content_type.get_filename() {
debug!("Name: {:?}", filename); debug!("Name: {:?}", filename);
@@ -156,6 +157,7 @@ async fn upload_image(
} }
} }
} }
}
let path = file_path.unwrap_or_else(|| app_state.base_path.clone()); let path = file_path.unwrap_or_else(|| app_state.base_path.clone());
if !file_content.is_empty() { if !file_content.is_empty() {
@@ -210,9 +212,9 @@ async fn generate_video(
) -> impl Responder { ) -> impl Responder {
let filename = PathBuf::from(&body.path); let filename = PathBuf::from(&body.path);
if let Some(name) = filename.file_stem() { if let Some(name) = filename.file_name() {
let filename = name.to_str().expect("Filename should convert to string"); let filename = name.to_str().expect("Filename should convert to string");
let playlist = format!("tmp/{}.m3u8", filename); let playlist = format!("{}/{}.m3u8", app_state.video_path, filename);
if let Some(path) = is_valid_full_path(&app_state.base_path, &body.path, false) { if let Some(path) = is_valid_full_path(&app_state.base_path, &body.path, false) {
if let Ok(child) = create_playlist(path.to_str().unwrap(), &playlist).await { if let Ok(child) = create_playlist(path.to_str().unwrap(), &playlist).await {
app_state app_state
@@ -241,7 +243,7 @@ async fn stream_video(
debug!("Playlist: {}", playlist); debug!("Playlist: {}", playlist);
// Extract video playlist dir to dotenv // Extract video playlist dir to dotenv
if !playlist.starts_with("tmp") if !playlist.starts_with(&app_state.video_path)
&& is_valid_full_path(&app_state.base_path, playlist, false).is_some() && is_valid_full_path(&app_state.base_path, playlist, false).is_some()
{ {
HttpResponse::BadRequest().finish() HttpResponse::BadRequest().finish()
@@ -257,14 +259,19 @@ async fn get_video_part(
request: HttpRequest, request: HttpRequest,
_: Claims, _: Claims,
path: web::Path<ThumbnailRequest>, path: web::Path<ThumbnailRequest>,
app_state: Data<AppState>,
) -> impl Responder { ) -> impl Responder {
let part = &path.path; let part = &path.path;
debug!("Video part: {}", part); debug!("Video part: {}", part);
if let Ok(file) = NamedFile::open(String::from("tmp/") + part) { let mut file_part = PathBuf::new();
file_part.push(app_state.video_path.clone());
file_part.push(part);
// TODO: Do we need to guard against directory attacks here?
if let Ok(file) = NamedFile::open(&file_part) {
file.into_response(&request) file.into_response(&request)
} else { } else {
error!("Video part not found: tmp/{}", part); error!("Video part not found: {:?}", file_part);
HttpResponse::NotFound().finish() HttpResponse::NotFound().finish()
} }
} }
@@ -448,6 +455,7 @@ fn is_image(entry: &DirEntry) -> bool {
.path() .path()
.extension() .extension()
.and_then(|ext| ext.to_str()) .and_then(|ext| ext.to_str())
.map(|ext| ext.to_lowercase())
.map(|ext| ext == "jpg" || ext == "jpeg" || ext == "png" || ext == "nef") .map(|ext| ext == "jpg" || ext == "jpeg" || ext == "png" || ext == "nef")
.unwrap_or(false) .unwrap_or(false)
} }
@@ -457,12 +465,15 @@ fn is_video(entry: &DirEntry) -> bool {
.path() .path()
.extension() .extension()
.and_then(|ext| ext.to_str()) .and_then(|ext| ext.to_str())
.map(|ext| ext.to_lowercase())
.map(|ext| ext == "mp4" || ext == "mov") .map(|ext| ext == "mp4" || ext == "mov")
.unwrap_or(false) .unwrap_or(false)
} }
fn main() -> std::io::Result<()> { fn main() -> std::io::Result<()> {
dotenv::dotenv().ok(); if let Err(err) = dotenv::dotenv() {
println!("Error parsing .env {:?}", err);
}
env_logger::init(); env_logger::init();
run_migrations(&mut connect()).expect("Failed to run migrations"); run_migrations(&mut connect()).expect("Failed to run migrations");
@@ -489,6 +500,13 @@ fn main() -> std::io::Result<()> {
.register(Box::new(VIDEO_GAUGE.clone())) .register(Box::new(VIDEO_GAUGE.clone()))
.unwrap(); .unwrap();
let app_state = app_data.clone();
app_state
.playlist_manager
.do_send(ScanDirectoryMessage {
directory: app_state.base_path.clone(),
});
HttpServer::new(move || { HttpServer::new(move || {
let user_dao = SqliteUserDao::new(); let user_dao = SqliteUserDao::new();
let favorites_dao = SqliteFavoriteDao::new(); let favorites_dao = SqliteFavoriteDao::new();
@@ -544,7 +562,10 @@ fn watch_files() {
let base_str = dotenv::var("BASE_PATH").unwrap(); let base_str = dotenv::var("BASE_PATH").unwrap();
let base_path = Path::new(&base_str); let base_path = Path::new(&base_str);
watcher.watch(base_path, RecursiveMode::Recursive).unwrap(); watcher
.watch(base_path, RecursiveMode::Recursive)
.context(format!("Unable to watch BASE_PATH: '{}'", base_str))
.unwrap();
loop { loop {
let ev = wrx.recv(); let ev = wrx.recv();

View File

@@ -1,11 +1,14 @@
use crate::video::{PlaylistGenerator, VideoPlaylistManager};
use crate::StreamActor; use crate::StreamActor;
use actix::{Actor, Addr}; use actix::{Actor, Addr};
use std::{env, sync::Arc}; use std::{env, sync::Arc};
pub struct AppState { pub struct AppState {
pub stream_manager: Arc<Addr<StreamActor>>, pub stream_manager: Arc<Addr<StreamActor>>,
pub playlist_manager: Arc<Addr<VideoPlaylistManager>>,
pub base_path: String, pub base_path: String,
pub thumbnail_path: String, pub thumbnail_path: String,
pub video_path: String,
} }
impl AppState { impl AppState {
@@ -13,11 +16,18 @@ impl AppState {
stream_manager: Arc<Addr<StreamActor>>, stream_manager: Arc<Addr<StreamActor>>,
base_path: String, base_path: String,
thumbnail_path: String, thumbnail_path: String,
video_path: String,
) -> Self { ) -> Self {
let playlist_generator = PlaylistGenerator::new();
let video_playlist_manager =
VideoPlaylistManager::new(video_path.clone(), playlist_generator.start());
Self { Self {
stream_manager, stream_manager,
playlist_manager: Arc::new(video_playlist_manager.start()),
base_path, base_path,
thumbnail_path, thumbnail_path,
video_path,
} }
} }
} }
@@ -28,6 +38,7 @@ impl Default for AppState {
Arc::new(StreamActor {}.start()), Arc::new(StreamActor {}.start()),
env::var("BASE_PATH").expect("BASE_PATH was not set in the env"), env::var("BASE_PATH").expect("BASE_PATH was not set in the env"),
env::var("THUMBNAILS").expect("THUMBNAILS was not set in the env"), env::var("THUMBNAILS").expect("THUMBNAILS was not set in the env"),
env::var("VIDEO_PATH").expect("VIDEO_PATH was not set in the env"),
) )
} }
} }

View File

@@ -198,8 +198,16 @@ pub trait TagDao {
fn create_tag(&mut self, name: &str) -> anyhow::Result<Tag>; fn create_tag(&mut self, name: &str) -> anyhow::Result<Tag>;
fn remove_tag(&mut self, tag_name: &str, path: &str) -> anyhow::Result<Option<()>>; fn remove_tag(&mut self, tag_name: &str, path: &str) -> anyhow::Result<Option<()>>;
fn tag_file(&mut self, path: &str, tag_id: i32) -> anyhow::Result<TaggedPhoto>; fn tag_file(&mut self, path: &str, tag_id: i32) -> anyhow::Result<TaggedPhoto>;
fn get_files_with_all_tag_ids(&mut self, tag_ids: Vec<i32>) -> anyhow::Result<Vec<String>>; fn get_files_with_all_tag_ids(
fn get_files_with_any_tag_ids(&mut self, tag_ids: Vec<i32>) -> anyhow::Result<Vec<String>>; &mut self,
tag_ids: Vec<i32>,
exclude_tag_ids: Vec<i32>,
) -> anyhow::Result<Vec<String>>;
fn get_files_with_any_tag_ids(
&mut self,
tag_ids: Vec<i32>,
exclude_tag_ids: Vec<i32>,
) -> anyhow::Result<Vec<String>>;
} }
pub struct SqliteTagDao { pub struct SqliteTagDao {
@@ -268,7 +276,7 @@ impl TagDao for SqliteTagDao {
.with_context(|| format!("Unable to insert tag {:?} in Sqlite", name)) .with_context(|| format!("Unable to insert tag {:?} in Sqlite", name))
.and_then(|_| { .and_then(|_| {
info!("Inserted tag: {:?}", name); info!("Inserted tag: {:?}", name);
sql_function! { define_sql_function! {
fn last_insert_rowid() -> diesel::sql_types::Integer; fn last_insert_rowid() -> diesel::sql_types::Integer;
} }
diesel::select(last_insert_rowid()) diesel::select(last_insert_rowid())
@@ -321,7 +329,7 @@ impl TagDao for SqliteTagDao {
.with_context(|| format!("Unable to tag file {:?} in sqlite", path)) .with_context(|| format!("Unable to tag file {:?} in sqlite", path))
.and_then(|_| { .and_then(|_| {
info!("Inserted tagged photo: {:#} -> {:?}", tag_id, path); info!("Inserted tagged photo: {:#} -> {:?}", tag_id, path);
sql_function! { define_sql_function! {
fn last_insert_rowid() -> diesel::sql_types::Integer; fn last_insert_rowid() -> diesel::sql_types::Integer;
} }
diesel::select(last_insert_rowid()) diesel::select(last_insert_rowid())
@@ -341,24 +349,44 @@ impl TagDao for SqliteTagDao {
}) })
} }
fn get_files_with_all_tag_ids(&mut self, tag_ids: Vec<i32>) -> anyhow::Result<Vec<String>> { fn get_files_with_all_tag_ids(
&mut self,
tag_ids: Vec<i32>,
exclude_tag_ids: Vec<i32>,
) -> anyhow::Result<Vec<String>> {
use diesel::dsl::*; use diesel::dsl::*;
let exclude_subquery = tagged_photo::table
.filter(tagged_photo::tag_id.eq_any(exclude_tag_ids.clone()))
.select(tagged_photo::photo_name)
.into_boxed();
tagged_photo::table tagged_photo::table
.filter(tagged_photo::tag_id.eq_any(tag_ids.clone())) .filter(tagged_photo::tag_id.eq_any(tag_ids.clone()))
.filter(tagged_photo::photo_name.ne_all(exclude_subquery))
.group_by(tagged_photo::photo_name) .group_by(tagged_photo::photo_name)
.select((tagged_photo::photo_name, count(tagged_photo::tag_id))) .select((tagged_photo::photo_name, count(tagged_photo::tag_id)))
.having(count(tagged_photo::tag_id).eq(tag_ids.len() as i64)) .having(count_distinct(tagged_photo::tag_id).ge(tag_ids.len() as i64))
.select(tagged_photo::photo_name) .select(tagged_photo::photo_name)
.get_results::<String>(&mut self.connection) .get_results::<String>(&mut self.connection)
.with_context(|| format!("Unable to get Tagged photos with ids: {:?}", tag_ids)) .with_context(|| format!("Unable to get Tagged photos with ids: {:?}", tag_ids))
} }
fn get_files_with_any_tag_ids(&mut self, tag_ids: Vec<i32>) -> anyhow::Result<Vec<String>> { fn get_files_with_any_tag_ids(
&mut self,
tag_ids: Vec<i32>,
exclude_tag_ids: Vec<i32>,
) -> anyhow::Result<Vec<String>> {
use diesel::dsl::*; use diesel::dsl::*;
let exclude_subquery = tagged_photo::table
.filter(tagged_photo::tag_id.eq_any(exclude_tag_ids.clone()))
.select(tagged_photo::photo_name)
.into_boxed();
tagged_photo::table tagged_photo::table
.filter(tagged_photo::tag_id.eq_any(tag_ids.clone())) .filter(tagged_photo::tag_id.eq_any(tag_ids.clone()))
.filter(tagged_photo::photo_name.ne_all(exclude_subquery))
.group_by(tagged_photo::photo_name) .group_by(tagged_photo::photo_name)
.select((tagged_photo::photo_name, count(tagged_photo::tag_id))) .select((tagged_photo::photo_name, count(tagged_photo::tag_id)))
.select(tagged_photo::photo_name) .select(tagged_photo::photo_name)
@@ -421,7 +449,7 @@ mod tests {
let tag_id = self.tag_count; let tag_id = self.tag_count;
let tag = Tag { let tag = Tag {
id: tag_id as i32, id: tag_id,
name: name.to_string(), name: name.to_string(),
created_time: Utc::now().timestamp(), created_time: Utc::now().timestamp(),
}; };
@@ -487,12 +515,17 @@ mod tests {
fn get_files_with_all_tag_ids( fn get_files_with_all_tag_ids(
&mut self, &mut self,
_tag_ids: Vec<i32>, tag_ids: Vec<i32>,
_exclude_tag_ids: Vec<i32>,
) -> anyhow::Result<Vec<String>> { ) -> anyhow::Result<Vec<String>> {
todo!() todo!()
} }
fn get_files_with_any_tag_ids(&mut self, tag_ids: Vec<i32>) -> anyhow::Result<Vec<String>> { fn get_files_with_any_tag_ids(
&mut self,
_tag_ids: Vec<i32>,
_exclude_tag_ids: Vec<i32>,
) -> anyhow::Result<Vec<String>> {
todo!() todo!()
} }
} }

View File

@@ -51,8 +51,7 @@ impl UserDao for TestUserDao {
self.user_map self.user_map
.borrow() .borrow()
.iter() .iter()
.find(|&u| u.username == user) .any(|u| u.username == user)
.is_some()
} }
} }

View File

@@ -1,10 +1,13 @@
use std::io::Result; use crate::is_video;
use std::path::Path;
use std::process::{Child, Command, ExitStatus, Stdio};
use actix::prelude::*; use actix::prelude::*;
use log::{debug, trace}; use futures::TryFutureExt;
use log::{debug, error, info, trace, warn};
use std::io::Result;
use std::path::{Path, PathBuf};
use std::process::{Child, Command, ExitStatus, Stdio};
use std::sync::Arc;
use tokio::sync::Semaphore;
use walkdir::{DirEntry, WalkDir};
// ffmpeg -i test.mp4 -c:v h264 -flags +cgop -g 30 -hls_time 3 out.m3u8 // ffmpeg -i test.mp4 -c:v h264 -flags +cgop -g 30 -hls_time 3 out.m3u8
// ffmpeg -i "filename.mp4" -preset veryfast -c:v libx264 -f hls -hls_list_size 100 -hls_time 2 -crf 24 -vf scale=1080:-2,setsar=1:1 attempt/vid_out.m3u8 // ffmpeg -i "filename.mp4" -preset veryfast -c:v libx264 -f hls -hls_list_size 100 -hls_time 2 -crf 24 -vf scale=1080:-2,setsar=1:1 attempt/vid_out.m3u8
@@ -93,3 +96,181 @@ pub fn generate_video_thumbnail(path: &Path, destination: &Path) {
.output() .output()
.expect("Failure to create video frame"); .expect("Failure to create video frame");
} }
/// Actor that walks a media directory and requests HLS playlist generation
/// for every video file it finds, delegating the actual transcoding to a
/// [`PlaylistGenerator`].
pub struct VideoPlaylistManager {
// Directory where generated .m3u8 playlists (and parts) are written.
playlist_dir: PathBuf,
// Address of the generator actor that runs ffmpeg per video.
playlist_generator: Addr<PlaylistGenerator>,
}
impl VideoPlaylistManager {
pub fn new<P: Into<PathBuf>>(
playlist_dir: P,
playlist_generator: Addr<PlaylistGenerator>,
) -> Self {
Self {
playlist_dir: playlist_dir.into(),
playlist_generator,
}
}
}
// Plain actix actor with the default single-threaded context; no custom
// lifecycle hooks are needed.
impl Actor for VideoPlaylistManager {
type Context = Context<Self>;
}
impl Handler<ScanDirectoryMessage> for VideoPlaylistManager {
// The scan runs as a future so the actor mailbox is not blocked while
// every video is dispatched and awaited.
type Result = ResponseFuture<()>;
/// Walks `msg.directory` recursively, collects all video files, and asks
/// the [`PlaylistGenerator`] to produce a playlist for each one in turn.
fn handle(&mut self, msg: ScanDirectoryMessage, _ctx: &mut Self::Context) -> Self::Result {
let start = std::time::Instant::now();
info!(
"Starting scan directory for video playlist generation: {}",
msg.directory
);
// The directory walk happens synchronously, before the returned future
// runs, so `video_files` is a fixed snapshot of the tree at scan time.
// Unreadable entries are silently skipped (`filter_map(|e| e.ok())`).
let video_files = WalkDir::new(&msg.directory)
.into_iter()
.filter_map(|e| e.ok())
.filter(|e| e.file_type().is_file())
.filter(is_video)
.collect::<Vec<DirEntry>>();
// Clone everything the async block needs so it can be 'static.
let scan_dir_name = msg.directory.clone();
let playlist_output_dir = self.playlist_dir.clone();
let playlist_generator = self.playlist_generator.clone();
Box::pin(async move {
for e in video_files {
let path = e.path();
// NOTE(review): unwrap panics on non-UTF-8 paths — assumes all
// media paths are valid UTF-8; confirm for this deployment.
let path_as_str = path.to_str().unwrap();
debug!(
"Sending generate playlist message for path: {}",
path_as_str
);
// `send(...).await` waits for the generator's reply; the expect
// only fires if the generator actor has stopped (mailbox error).
match playlist_generator
.send(GeneratePlaylistMessage {
playlist_path: playlist_output_dir.to_str().unwrap().to_string(),
video_path: PathBuf::from(path),
})
.await
.expect("Failed to send generate playlist message")
{
Ok(_) => {
// NOTE(review): the generator replies Ok after *spawning*
// ffmpeg, so "generated" here means "dispatched", not
// "finished" — see Handler<GeneratePlaylistMessage>.
debug!(
"Successfully generated playlist for file: '{}'",
path_as_str
);
}
Err(e) => {
// Includes the AlreadyExists case where a playlist was
// previously generated for this file.
warn!("Failed to generate playlist for path '{:?}'. {:?}", path, e);
}
}
}
info!(
"Finished directory scan of '{}' in {:?}",
scan_dir_name,
start.elapsed()
);
})
}
}
/// Asks the [`VideoPlaylistManager`] to scan a directory tree and generate
/// HLS playlists for every video file found under it.
#[derive(Message)]
#[rtype(result = "()")]
pub struct ScanDirectoryMessage {
// Root directory to walk recursively.
pub(crate) directory: String,
}
/// Internal request to transcode one video into an HLS playlist.
/// Replied to with `Ok(())` once the work is dispatched, or an io::Error
/// (e.g. AlreadyExists when the playlist is already on disk).
#[derive(Message)]
#[rtype(result = "Result<()>")]
struct GeneratePlaylistMessage {
// Full path of the source video file.
video_path: PathBuf,
// Directory in which the .m3u8 playlist should be written.
playlist_path: String,
}
/// Actor that runs ffmpeg to turn videos into HLS playlists, limiting how
/// many transcodes may run at the same time via a semaphore.
pub struct PlaylistGenerator {
// Bounds concurrent ffmpeg processes; permits are moved into the spawned
// transcode tasks and released when each finishes.
semaphore: Arc<Semaphore>,
}
impl PlaylistGenerator {
    /// Maximum number of ffmpeg transcodes allowed to run concurrently.
    /// Named so the limit is discoverable and tunable in one place instead
    /// of being a magic number buried in the constructor.
    const MAX_CONCURRENT_TRANSCODES: usize = 2;

    /// Creates a generator whose semaphore caps concurrent ffmpeg runs at
    /// [`Self::MAX_CONCURRENT_TRANSCODES`].
    pub(crate) fn new() -> Self {
        PlaylistGenerator {
            semaphore: Arc::new(Semaphore::new(Self::MAX_CONCURRENT_TRANSCODES)),
        }
    }
}
// Plain actix actor with the default context; concurrency is controlled by
// the semaphore, not by the actor system.
impl Actor for PlaylistGenerator {
type Context = Context<Self>;
}
impl Handler<GeneratePlaylistMessage> for PlaylistGenerator {
type Result = ResponseFuture<Result<()>>;
/// Dispatches an ffmpeg transcode of `msg.video_path` into an HLS playlist
/// under `msg.playlist_path`.
///
/// Fire-and-forget: after a semaphore permit is acquired and the playlist
/// is confirmed absent, ffmpeg runs in a detached `tokio::spawn` task and
/// `Ok(())` is returned immediately — the reply means "started", not
/// "finished", and a later ffmpeg failure is only logged, never reported
/// to the caller.
fn handle(&mut self, msg: GeneratePlaylistMessage, _ctx: &mut Self::Context) -> Self::Result {
// NOTE(review): both unwraps panic on non-UTF-8 paths — assumes media
// paths are valid UTF-8; confirm for this deployment.
let video_file = msg.video_path.to_str().unwrap().to_owned();
let playlist_path = msg.playlist_path.as_str().to_owned();
let semaphore = self.semaphore.clone();
// Playlist is named after the full file name (extension included), so
// "clip.mp4" yields "clip.mp4.m3u8" — this keeps distinct files with
// the same stem from colliding.
let playlist_file = format!(
"{}/{}.m3u8",
playlist_path,
msg.video_path.file_name().unwrap().to_str().unwrap()
);
Box::pin(async move {
let wait_start = std::time::Instant::now();
// Throttle: blocks here until one of the limited permits is free.
// The owned permit is later moved into the spawned task so the slot
// stays occupied for the whole ffmpeg run.
let permit = semaphore
.acquire_owned()
.await
.expect("Unable to acquire semaphore");
debug!(
"Waited for {:?} before starting ffmpeg",
wait_start.elapsed()
);
// Idempotence check: a playlist already on disk is reported as
// AlreadyExists, which the caller logs as a warning and skips.
if Path::new(&playlist_file).exists() {
debug!("Playlist already exists: {}", playlist_file);
return Err(std::io::Error::from(std::io::ErrorKind::AlreadyExists));
}
// Detached task: its JoinHandle (and the ffmpeg result) is dropped;
// failures surface only via the error! log below.
tokio::spawn(async move {
let ffmpeg_result = tokio::process::Command::new("ffmpeg")
.arg("-i")
.arg(&video_file)
.arg("-c:v")
.arg("h264")
.arg("-crf")
.arg("21")
.arg("-preset")
.arg("veryfast")
.arg("-hls_time")
.arg("3")
.arg("-hls_list_size")
.arg("100")
.arg("-vf")
.arg("scale=1080:-2,setsar=1:1")
.arg(playlist_file)
.stdout(Stdio::null())
.stderr(Stdio::piped())
.output()
.inspect_err(|e| error!("Failed to run ffmpeg on child process: {}", e))
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))
.await;
// Hang on to the permit until we're done decoding and then explicitly drop
drop(permit);
if let Ok(ref res) = ffmpeg_result {
debug!("ffmpeg output: {:?}", res);
}
ffmpeg_result
});
// Reached as soon as the task is spawned, before ffmpeg completes.
Ok(())
})
}
}