From f3cff17f667d8fea2d3dcedffd66d7bc7b18baf2 Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Wed, 13 Dec 2023 19:04:57 +0100 Subject: [PATCH] implement folder loading support --- .../src/data_loader/loader_directory.rs | 75 +++++++++++++++++++ crates/re_data_source/src/data_loader/mod.rs | 3 + crates/re_data_source/src/load_file.rs | 8 +- 3 files changed, 82 insertions(+), 4 deletions(-) create mode 100644 crates/re_data_source/src/data_loader/loader_directory.rs diff --git a/crates/re_data_source/src/data_loader/loader_directory.rs b/crates/re_data_source/src/data_loader/loader_directory.rs new file mode 100644 index 0000000000000..c029aa8d70383 --- /dev/null +++ b/crates/re_data_source/src/data_loader/loader_directory.rs @@ -0,0 +1,75 @@ +// TODO: issue -> HIVE partitiong, timestamp regexes, zip files, that kinda thing + +/// Loads entire directories, using the appropriate [`crate::DataLoader`]:s for each files within. +pub struct DirectoryLoader; + +impl crate::DataLoader for DirectoryLoader { + #[inline] + fn name(&self) -> String { + "rerun.data_loaders.Directory".into() + } + + #[cfg(not(target_arch = "wasm32"))] + fn load_from_file( + &self, + store_id: re_log_types::StoreId, + dirpath: std::path::PathBuf, + tx: std::sync::mpsc::Sender, + ) -> Result<(), crate::DataLoaderError> { + if dirpath.is_file() { + return Ok(()); // simply not interested + } + + re_tracing::profile_function!(dirpath.display().to_string()); + + re_log::debug!(?dirpath, loader = self.name(), "Loading directory…",); + + for entry in walkdir::WalkDir::new(&dirpath) { + let entry = match entry { + Ok(entry) => entry, + Err(err) => { + re_log::error!(loader = self.name(), ?dirpath, %err, "Failed to open filesystem entry"); + continue; + } + }; + + let filepath = entry.path(); + if filepath.is_file() { + let store_id = store_id.clone(); + let filepath = filepath.to_owned(); + let tx = tx.clone(); + + // NOTE: spawn is fine, this whole function is native-only. + rayon::spawn(move || { + let data = match crate::load_file::load(&store_id, &filepath, false, None) { + Ok(data) => data, + Err(err) => { + re_log::error!(?filepath, %err, "Failed to load directory entry"); + return; + } + }; + + for datum in data { + if tx.send(datum).is_err() { + break; + } + } + }); + } + } + + Ok(()) + } + + #[inline] + fn load_from_file_contents( + &self, + _store_id: re_log_types::StoreId, + _path: std::path::PathBuf, + _contents: std::borrow::Cow<'_, [u8]>, + _tx: std::sync::mpsc::Sender, + ) -> Result<(), crate::DataLoaderError> { + // TODO: zip file supports + Ok(()) // simply not interested + } +} diff --git a/crates/re_data_source/src/data_loader/mod.rs b/crates/re_data_source/src/data_loader/mod.rs index 7bac10cee13fb..5c9cf93efd5ce 100644 --- a/crates/re_data_source/src/data_loader/mod.rs +++ b/crates/re_data_source/src/data_loader/mod.rs @@ -161,6 +161,7 @@ static BUILTIN_LOADERS: Lazy>> = Lazy::new(|| { vec![ Arc::new(RrdLoader) as Arc, Arc::new(ArchetypeLoader), + Arc::new(DirectoryLoader), ] }); @@ -173,7 +174,9 @@ pub fn iter_loaders() -> impl ExactSizeIterator> { // --- mod loader_archetype; +mod loader_directory; mod loader_rrd; pub use self::loader_archetype::ArchetypeLoader; +pub use self::loader_directory::DirectoryLoader; pub use self::loader_rrd::RrdLoader; diff --git a/crates/re_data_source/src/load_file.rs b/crates/re_data_source/src/load_file.rs index 989c5a4f7d468..29523b02a3467 100644 --- a/crates/re_data_source/src/load_file.rs +++ b/crates/re_data_source/src/load_file.rs @@ -90,11 +90,11 @@ fn extension(path: &std::path::Path) -> String { /// This does _not_ access the filesystem. #[inline] fn is_builtin(path: &std::path::Path, is_dir: bool) -> bool { - !is_dir && crate::is_known_file_extension(&extension(path)) + is_dir || crate::is_known_file_extension(&extension(path)) } /// Prepares an adequate [`re_log_types::StoreInfo`] [`LogMsg`] given the input. -fn prepare_store_info( +pub(crate) fn prepare_store_info( store_id: &re_log_types::StoreId, file_source: FileSource, path: &std::path::Path, @@ -131,7 +131,7 @@ fn prepare_store_info( /// - On native, this is filled asynchronously from other threads. /// - On wasm, this is pre-filled synchronously. #[cfg_attr(target_arch = "wasm32", allow(clippy::needless_pass_by_value))] -fn load( +pub(crate) fn load( store_id: &re_log_types::StoreId, path: &std::path::Path, is_dir: bool, @@ -210,7 +210,7 @@ fn load( /// Forwards the data in `rx_loader` to `tx`, taking care of necessary conversions, if any. /// /// Runs asynchronously from another thread on native, synchronously on wasm. -fn send( +pub(crate) fn send( store_id: &re_log_types::StoreId, rx_loader: std::sync::mpsc::Receiver, tx: &Sender,