Skip to content

Commit

Permalink
Add a custom implementation of LocalFileSystem::list_with_offset (#7019)
Browse files Browse the repository at this point in the history
* Initial change from Daniel.

* Upgrade unit test to be more generic.

* Add comments on why we have filter

* Cleanup unit tests.

* Update object_store/src/local.rs

Co-authored-by: Adam Reeve <adreeve@gmail.com>

* Add changes suggested by Adam.

* Cleanup match error.

* Apply formatting changes suggested by cargo +stable fmt --all.

* Apply cosmetic changes suggested by clippy.

* Upgrade test_path_with_offset to create temporary directory + files for testing rather than pointing to existing dir.

---------

Co-authored-by: Adam Reeve <adreeve@gmail.com>
  • Loading branch information
corwinjoy and adamreeve authored Feb 8, 2025
1 parent 4e34203 commit 5bf4a6c
Showing 1 changed file with 155 additions and 64 deletions.
219 changes: 155 additions & 64 deletions object_store/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -483,71 +483,15 @@ impl ObjectStore for LocalFileSystem {
}

fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
let config = Arc::clone(&self.config);

let root_path = match prefix {
Some(prefix) => match config.prefix_to_filesystem(prefix) {
Ok(path) => path,
Err(e) => return futures::future::ready(Err(e)).into_stream().boxed(),
},
None => self.config.root.to_file_path().unwrap(),
};

let walkdir = WalkDir::new(root_path)
// Don't include the root directory itself
.min_depth(1)
.follow_links(true);

let s = walkdir.into_iter().flat_map(move |result_dir_entry| {
let entry = match convert_walkdir_result(result_dir_entry).transpose()? {
Ok(entry) => entry,
Err(e) => return Some(Err(e)),
};

if !entry.path().is_file() {
return None;
}

match config.filesystem_to_path(entry.path()) {
Ok(path) => match is_valid_file_path(&path) {
true => convert_entry(entry, path).transpose(),
false => None,
},
Err(e) => Some(Err(e)),
}
});

// If no tokio context, return iterator directly as no
// need to perform chunked spawn_blocking reads
if tokio::runtime::Handle::try_current().is_err() {
return futures::stream::iter(s).boxed();
}

// Otherwise list in batches of CHUNK_SIZE
const CHUNK_SIZE: usize = 1024;

let buffer = VecDeque::with_capacity(CHUNK_SIZE);
futures::stream::try_unfold((s, buffer), |(mut s, mut buffer)| async move {
if buffer.is_empty() {
(s, buffer) = tokio::task::spawn_blocking(move || {
for _ in 0..CHUNK_SIZE {
match s.next() {
Some(r) => buffer.push_back(r),
None => break,
}
}
(s, buffer)
})
.await?;
}
self.list_with_maybe_offset(prefix, None)
}

match buffer.pop_front() {
Some(Err(e)) => Err(e),
Some(Ok(meta)) => Ok(Some((meta, (s, buffer)))),
None => Ok(None),
}
})
.boxed()
/// List objects under `prefix` whose paths sort strictly after `offset`.
///
/// Delegates to [`Self::list_with_maybe_offset`], which applies the offset
/// filter during the directory walk (rather than filtering a full listing
/// afterwards).
fn list_with_offset(
    &self,
    prefix: Option<&Path>,
    offset: &Path,
) -> BoxStream<'static, Result<ObjectMeta>> {
    self.list_with_maybe_offset(prefix, Some(offset))
}

async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
Expand Down Expand Up @@ -678,6 +622,93 @@ impl ObjectStore for LocalFileSystem {
}
}

impl LocalFileSystem {
    /// Shared implementation behind `ObjectStore::list` and
    /// `ObjectStore::list_with_offset` for the local filesystem.
    ///
    /// Walks the tree under `prefix` (or the store root when `prefix` is
    /// `None`) and yields one [`ObjectMeta`] per regular file. When
    /// `maybe_offset` is supplied, entries whose object path sorts at or
    /// before the offset are skipped.
    ///
    /// If called inside a tokio runtime, the blocking directory walk is
    /// driven on the blocking thread pool in batches; otherwise the walk
    /// iterator is returned directly as a stream.
    fn list_with_maybe_offset(
        &self,
        prefix: Option<&Path>,
        maybe_offset: Option<&Path>,
    ) -> BoxStream<'static, Result<ObjectMeta>> {
        // Clone the shared config so the returned stream can be 'static.
        let config = Arc::clone(&self.config);

        // Resolve the filesystem directory to walk: the prefix mapped onto
        // the local filesystem, or the store root when no prefix is given.
        let root_path = match prefix {
            Some(prefix) => match config.prefix_to_filesystem(prefix) {
                Ok(path) => path,
                // An unmappable prefix becomes a single-error stream.
                Err(e) => return futures::future::ready(Err(e)).into_stream().boxed(),
            },
            None => config.root.to_file_path().unwrap(),
        };

        let walkdir = WalkDir::new(root_path)
            // Don't include the root directory itself
            .min_depth(1)
            .follow_links(true);

        // Take an owned copy so the flat_map closure below can be 'static.
        let maybe_offset = maybe_offset.cloned();

        let s = walkdir.into_iter().flat_map(move |result_dir_entry| {
            // Apply offset filter before proceeding, to reduce statx file system calls
            // This matters for NFS mounts
            if let (Some(offset), Ok(entry)) = (maybe_offset.as_ref(), result_dir_entry.as_ref()) {
                let location = config.filesystem_to_path(entry.path());
                match location {
                    // Skip anything sorting at or before the offset.
                    Ok(path) if path <= *offset => return None,
                    Err(e) => return Some(Err(e)),
                    _ => {}
                }
            }

            // NOTE(review): `convert_walkdir_result(..).transpose()?` appears
            // to drop walk results that helper maps to Ok(None) (i.e. treated
            // as ignorable) — confirm against convert_walkdir_result's docs.
            let entry = match convert_walkdir_result(result_dir_entry).transpose()? {
                Ok(entry) => entry,
                Err(e) => return Some(Err(e)),
            };

            // Only regular files become objects; directories (and anything
            // else that is not a file) are skipped.
            if !entry.path().is_file() {
                return None;
            }

            // Map back to an object path; paths rejected by
            // `is_valid_file_path` are silently skipped.
            match config.filesystem_to_path(entry.path()) {
                Ok(path) => match is_valid_file_path(&path) {
                    true => convert_entry(entry, path).transpose(),
                    false => None,
                },
                Err(e) => Some(Err(e)),
            }
        });

        // If no tokio context, return iterator directly as no
        // need to perform chunked spawn_blocking reads
        if tokio::runtime::Handle::try_current().is_err() {
            return futures::stream::iter(s).boxed();
        }

        // Otherwise list in batches of CHUNK_SIZE
        const CHUNK_SIZE: usize = 1024;

        // Refill the buffer with up to CHUNK_SIZE entries per spawn_blocking
        // call, then drain it one item at a time from the async stream. The
        // iterator and buffer are threaded through try_unfold's state so they
        // can be moved into each blocking task.
        let buffer = VecDeque::with_capacity(CHUNK_SIZE);
        futures::stream::try_unfold((s, buffer), |(mut s, mut buffer)| async move {
            if buffer.is_empty() {
                (s, buffer) = tokio::task::spawn_blocking(move || {
                    for _ in 0..CHUNK_SIZE {
                        match s.next() {
                            Some(r) => buffer.push_back(r),
                            None => break,
                        }
                    }
                    (s, buffer)
                })
                .await?;
            }

            match buffer.pop_front() {
                Some(Err(e)) => Err(e),
                Some(Ok(meta)) => Ok(Some((meta, (s, buffer)))),
                None => Ok(None),
            }
        })
        .boxed()
    }
}

/// Creates the parent directories of `path` or returns an error based on `source` if no parent
fn create_parent_dirs(path: &std::path::Path, source: io::Error) -> Result<()> {
let parent = path.parent().ok_or_else(|| {
Expand Down Expand Up @@ -1459,6 +1490,66 @@ mod tests {
);
}

#[tokio::test]
async fn test_path_with_offset() {
    // Populate a temp directory with test0.parquet .. test4.parquet.
    let root = TempDir::new().unwrap();
    let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();

    let root_path = root.path();
    for i in 0..5 {
        let file = root_path.join(format!("test{}.parquet", i));
        std::fs::write(file, "test").unwrap();
    }

    // Offset chosen so that some files sort before it and some after.
    let offset_str = String::from("test1");
    let offset = Path::from(offset_str.clone());

    // List via the offset-aware API under test.
    let res = integration.list_with_offset(None, &offset);
    let offset_paths: Vec<_> = res.map_ok(|x| x.location).try_collect().await.unwrap();
    let mut offset_files: Vec<_> = offset_paths
        .iter()
        .map(|x| String::from(x.filename().unwrap()))
        .collect();

    // Independently compute the expected result with a direct filesystem
    // read: test files whose names sort strictly after the offset.
    let mut expected_offset_files: Vec<_> = fs::read_dir(root_path)
        .unwrap()
        .filter_map(Result::ok)
        .filter_map(|d| d.file_name().to_str().map(String::from))
        .filter(|f| f.contains("test") && *f > offset_str)
        .collect();

    // Directory iteration order is unspecified; sort both sides before
    // comparing. `assert_eq!` on the sorted vectors checks both length
    // and element-wise equality.
    offset_files.sort();
    expected_offset_files.sort();
    assert_eq!(expected_offset_files, offset_files);
}

#[tokio::test]
async fn filesystem_filename_with_percent() {
let temp_dir = TempDir::new().unwrap();
Expand Down

0 comments on commit 5bf4a6c

Please sign in to comment.