implement byte range requests for chunks and metadata through zarr interface [EAR-1274] #63

Merged · 9 commits · Sep 16, 2024
1 change: 1 addition & 0 deletions deny.toml
@@ -3,6 +3,7 @@ all-features = true
 
 [advisories]
 version = 2
+ignore = ["RUSTSEC-2023-0086"]
 
 [licenses]
 # List of explicitly allowed licenses
14 changes: 5 additions & 9 deletions icechunk-python/src/lib.rs
@@ -3,11 +3,7 @@ mod streams;
 
 use std::{pin::Pin, sync::Arc};
 
-use ::icechunk::{
-    format::ChunkOffset,
-    zarr::{ByteRange, StoreError},
-    Store,
-};
+use ::icechunk::{format::ChunkOffset, zarr::StoreError, Store};
 use bytes::Bytes;
 use errors::{PyIcechunkStoreError, PyIcechunkStoreResult};
 use futures::Stream;
@@ -76,9 +72,9 @@ impl PyIcechunkStore {
     pub async fn get(
         &self,
         key: String,
-        byte_range: Option<ByteRange>,
+        byte_range: Option<(Option<ChunkOffset>, Option<ChunkOffset>)>,
     ) -> PyIcechunkStoreResult<PyObject> {
-        let byte_range = byte_range.unwrap_or((None, None));
+        let byte_range = byte_range.unwrap_or((None, None)).into();
         let data = self.store.read().await.get(&key, &byte_range).await?;
         let pybytes = Python::with_gil(|py| {
             let bound_bytes = PyBytes::new_bound(py, &data);
@@ -89,9 +85,9 @@ impl PyIcechunkStore {
 
     pub async fn get_partial_values(
         &self,
-        key_ranges: Vec<(String, ByteRange)>,
+        key_ranges: Vec<(String, (Option<ChunkOffset>, Option<ChunkOffset>))>,
     ) -> PyIcechunkStoreResult<Vec<Option<PyObject>>> {
-        let iter = key_ranges.into_iter();
+        let iter = key_ranges.into_iter().map(|r| (r.0, r.1.into()));
         let result = self
             .store
             .read()
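For reference, a minimal sketch (not from the PR) of what the binding now does with the tuple it receives from Python. The helper name is hypothetical; the `(Option<ChunkOffset>, Option<ChunkOffset>) -> ByteRange` conversion it leans on is the `From` impl added in icechunk/src/format/mod.rs below.

use icechunk::format::ByteRange;

// Hypothetical helper mirroring the binding: an optional (start, end) tuple
// coming from Python becomes a ByteRange via the From impl added in this PR.
fn to_byte_range(byte_range: Option<(Option<u64>, Option<u64>)>) -> ByteRange {
    byte_range.unwrap_or((None, None)).into()
}

fn main() {
    assert_eq!(to_byte_range(None), ByteRange::ALL);
    assert_eq!(to_byte_range(Some((Some(0), Some(10)))), ByteRange::bounded(0, 10));
    assert_eq!(to_byte_range(Some((None, Some(10)))), ByteRange::to_offset(10));
}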
4 changes: 2 additions & 2 deletions icechunk/examples/multithreaded_store.rs
@@ -2,7 +2,7 @@ use std::{ops::Range, sync::Arc, time::Duration};
 
 use bytes::Bytes;
 use futures::StreamExt;
-use icechunk::Store;
+use icechunk::{format::ByteRange, Store};
 use tokio::{sync::RwLock, task::JoinSet, time::sleep};
 
 #[tokio::main]
@@ -45,7 +45,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
            if let Ok(value) = store
                .read()
                .await
-                .get(format!("array/c/{i}").as_str(), &(None, None))
+                .get(format!("array/c/{i}").as_str(), &ByteRange::ALL)
                .await
            {
                println!("Got {value:?} in {attempts} attempts");
11 changes: 7 additions & 4 deletions icechunk/src/dataset.rs
@@ -27,7 +27,7 @@ use crate::{
             mk_manifests_table, ChunkInfo, ChunkRef, ManifestExtents, ManifestRef,
         },
         snapshot::{mk_snapshot_table, NodeData, NodeSnapshot, UserAttributesSnapshot},
-        Flags, IcechunkFormatError, NodeId, ObjectId, TableRegion,
+        ByteRange, Flags, IcechunkFormatError, NodeId, ObjectId, TableRegion,
     },
     refs::{
         fetch_branch, fetch_branch_tip, fetch_tag, last_branch_version, update_branch,
@@ -559,13 +559,14 @@ impl Dataset {
         &self,
         path: &Path,
         coords: &ChunkIndices,
+        byte_range: &ByteRange,
     ) -> DatasetResult<Option<Bytes>> {
         match self.get_chunk_ref(path, coords).await? {
             Some(ChunkPayload::Ref(ChunkRef { id, .. })) => {
                 // TODO: we don't have a way to distinguish if we want to pass a range or not
-                Ok(self.storage.fetch_chunk(&id, &None).await.map(Some)?)
+                Ok(self.storage.fetch_chunk(&id, byte_range).await.map(Some)?)
             }
-            Some(ChunkPayload::Inline(bytes)) => Ok(Some(bytes)),
+            Some(ChunkPayload::Inline(bytes)) => Ok(Some(byte_range.slice(bytes))),
             //FIXME: implement virtual fetch
             Some(ChunkPayload::Virtual(_)) => todo!(),
             None => Ok(None),
@@ -1332,7 +1333,9 @@ mod tests {
         let data = Bytes::copy_from_slice(b"foo".repeat(512).as_slice());
         ds.set_chunk(&array1_path, &ChunkIndices(vec![0, 0, 0]), data.clone()).await?;
 
-        let chunk = ds.get_chunk(&array1_path, &ChunkIndices(vec![0, 0, 0])).await?;
+        let chunk = ds
+            .get_chunk(&array1_path, &ChunkIndices(vec![0, 0, 0]), &ByteRange::ALL)
+            .await?;
         assert_eq!(chunk, Some(data));
 
         let path: Path = "/group/array2".into();
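A usage sketch for the new `get_chunk` signature (hypothetical helper; the module paths are assumptions). A `Ref` payload turns the range into a storage-level request, while an `Inline` payload is sliced in memory by `ByteRange::slice`:

use bytes::Bytes;
use icechunk::{
    dataset::Dataset,
    format::{ByteRange, ChunkIndices, Path},
};

// Hypothetical helper: fetch only the first kilobyte of the chunk at (0, 0, 0),
// discarding errors for brevity.
async fn first_kb(ds: &Dataset, path: &Path) -> Option<Bytes> {
    ds.get_chunk(path, &ChunkIndices(vec![0, 0, 0]), &ByteRange::bounded(0, 1024))
        .await
        .ok()
        .flatten()
}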
64 changes: 63 additions & 1 deletion icechunk/src/format/mod.rs
@@ -1,7 +1,13 @@
 use core::fmt;
-use std::{fmt::Debug, ops::Range, path::PathBuf};
+use std::{
+    fmt::Debug,
+    hash::Hash,
+    ops::{Bound, Range},
+    path::PathBuf,
+};
 
 use ::arrow::array::RecordBatch;
+use bytes::Bytes;
 use itertools::Itertools;
 use serde::{Deserialize, Deserializer, Serialize};
 use serde_with::serde_as;
@@ -89,6 +95,62 @@ pub struct ChunkIndices(pub Vec<u64>);
 pub type ChunkOffset = u64;
 pub type ChunkLength = u64;
 
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct ByteRange(pub Bound<ChunkOffset>, pub Bound<ChunkOffset>);
+
+impl ByteRange {
+    pub fn from_offset(offset: ChunkOffset) -> Self {
+        Self(Bound::Included(offset), Bound::Unbounded)
+    }
+
+    pub fn to_offset(offset: ChunkOffset) -> Self {
+        Self(Bound::Unbounded, Bound::Excluded(offset))
+    }
+
+    pub fn bounded(start: ChunkOffset, end: ChunkOffset) -> Self {
+        Self(Bound::Included(start), Bound::Excluded(end))
+    }
+
+    pub const ALL: Self = Self(Bound::Unbounded, Bound::Unbounded);
+
+    pub fn slice(&self, bytes: Bytes) -> Bytes {
+        match (self.0, self.1) {
+            (Bound::Included(start), Bound::Excluded(end)) => {
+                bytes.slice(start as usize..end as usize)
+            }
+            (Bound::Included(start), Bound::Unbounded) => bytes.slice(start as usize..),
+            (Bound::Unbounded, Bound::Excluded(end)) => bytes.slice(..end as usize),
+            (Bound::Excluded(start), Bound::Excluded(end)) => {
+                bytes.slice(start as usize + 1..end as usize)
+            }
+            (Bound::Excluded(start), Bound::Unbounded) => {
+                bytes.slice(start as usize + 1..)
+            }
+            (Bound::Unbounded, Bound::Included(end)) => bytes.slice(..=end as usize),
+            (Bound::Included(start), Bound::Included(end)) => {
+                bytes.slice(start as usize..=end as usize)
+            }
+            (Bound::Excluded(start), Bound::Included(end)) => {
+                bytes.slice(start as usize + 1..=end as usize)
+            }
+            (Bound::Unbounded, Bound::Unbounded) => bytes,
+        }
+    }
+}
+
+impl From<(Option<ChunkOffset>, Option<ChunkOffset>)> for ByteRange {
+    fn from((start, end): (Option<ChunkOffset>, Option<ChunkOffset>)) -> Self {
+        match (start, end) {
+            (Some(start), Some(end)) => {
+                Self(Bound::Included(start), Bound::Excluded(end))
+            }
+            (Some(start), None) => Self(Bound::Included(start), Bound::Unbounded),
+            (None, Some(end)) => Self(Bound::Unbounded, Bound::Excluded(end)),
+            (None, None) => Self(Bound::Unbounded, Bound::Unbounded),
+        }
+    }
+}
+
 pub type TableOffset = u32;
 
 #[derive(Debug, Clone, PartialEq, Eq)]
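A brief illustration of the constructor semantics (a sketch, not part of the PR): `bounded` is start-inclusive and end-exclusive, `from_offset` keeps everything from the offset onward, `to_offset` keeps the first `offset` bytes, and `ALL` returns its input untouched.

use bytes::Bytes;
use icechunk::format::ByteRange;

fn main() {
    let data = Bytes::from_static(b"0123456789");
    // bounded(start, end): start-inclusive, end-exclusive.
    assert_eq!(ByteRange::bounded(2, 5).slice(data.clone()), Bytes::from_static(b"234"));
    // from_offset(n): everything from byte n onward.
    assert_eq!(ByteRange::from_offset(7).slice(data.clone()), Bytes::from_static(b"789"));
    // to_offset(n): the first n bytes.
    assert_eq!(ByteRange::to_offset(3).slice(data.clone()), Bytes::from_static(b"012"));
    // ALL: the input unchanged.
    assert_eq!(ByteRange::ALL.slice(data.clone()), data);
}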
10 changes: 5 additions & 5 deletions icechunk/src/storage/caching.rs
@@ -1,4 +1,4 @@
-use std::{ops::Range, sync::Arc};
+use std::sync::Arc;
 
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -10,7 +10,7 @@ use quick_cache::{
 
 use crate::format::{
     attributes::AttributesTable, manifest::ManifestsTable, snapshot::SnapshotTable,
-    ChunkOffset, ObjectId,
+    ByteRange, ObjectId,
 };
 
 use super::{Storage, StorageError, StorageResult};
@@ -20,7 +20,7 @@ enum CacheKey {
     Snapshot(ObjectId),
     Attributes(ObjectId),
     Manifest(ObjectId),
-    Chunk(ObjectId, Option<Range<ChunkOffset>>),
+    Chunk(ObjectId, ByteRange),
 }
 
 #[derive(Clone)]
@@ -143,7 +143,7 @@ impl Storage for MemCachingStorage {
     async fn fetch_chunk(
         &self,
         id: &ObjectId,
-        range: &Option<Range<ChunkOffset>>,
+        range: &ByteRange,
     ) -> Result<Bytes, StorageError> {
         let key = CacheKey::Chunk(id.clone(), range.clone());
         match self.cache.get_value_or_guard_async(&key).await {
@@ -194,7 +194,7 @@ impl Storage for MemCachingStorage {
     async fn write_chunk(&self, id: ObjectId, bytes: Bytes) -> Result<(), StorageError> {
         self.backend.write_chunk(id.clone(), bytes.clone()).await?;
         // TODO: we could add the chunk also with its full range (0, size)
-        self.cache.insert(CacheKey::Chunk(id, None), CacheValue::Chunk(bytes));
+        self.cache.insert(CacheKey::Chunk(id, ByteRange::ALL), CacheValue::Chunk(bytes));
         Ok(())
     }
 
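Since `ByteRange` derives `Eq` and `Hash`, it can sit directly inside the cache key. A toy sketch (a plain `HashMap` standing in for quick_cache) of why different ranges of the same chunk occupy distinct cache entries:

use std::collections::HashMap;

use icechunk::format::ByteRange;

fn main() {
    // Stand-in for the (ObjectId, ByteRange) chunk cache key.
    let mut cache: HashMap<ByteRange, &str> = HashMap::new();
    cache.insert(ByteRange::ALL, "whole chunk");
    cache.insert(ByteRange::bounded(0, 100), "first 100 bytes");
    assert_eq!(cache.len(), 2); // distinct ranges never collide
}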
9 changes: 3 additions & 6 deletions icechunk/src/storage/logging.rs
@@ -1,7 +1,4 @@
-use std::{
-    ops::Range,
-    sync::{Arc, Mutex},
-};
+use std::sync::{Arc, Mutex};
 
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -10,7 +7,7 @@ use futures::stream::BoxStream;
 use super::{Storage, StorageError, StorageResult};
 use crate::format::{
     attributes::AttributesTable, manifest::ManifestsTable, snapshot::SnapshotTable,
-    ChunkOffset, ObjectId,
+    ByteRange, ObjectId,
 };
 
 #[derive(Debug)]
@@ -70,7 +67,7 @@ impl Storage for LoggingStorage {
     async fn fetch_chunk(
         &self,
        id: &ObjectId,
-        range: &Option<Range<ChunkOffset>>,
+        range: &ByteRange,
    ) -> Result<Bytes, StorageError> {
        self.fetch_log
            .lock()
11 changes: 4 additions & 7 deletions icechunk/src/storage/mod.rs
@@ -1,7 +1,7 @@
 use core::fmt;
 use futures::stream::BoxStream;
 use parquet::errors as parquet_errors;
-use std::{ops::Range, sync::Arc};
+use std::sync::Arc;
 
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -19,7 +19,7 @@ pub use object_store::ObjectStorage;
 
 use crate::format::{
     attributes::AttributesTable, manifest::ManifestsTable, snapshot::SnapshotTable,
-    ChunkOffset, ObjectId, Path,
+    ByteRange, ObjectId, Path,
 };
 
 #[derive(Debug, Error)]
@@ -56,11 +56,8 @@ pub trait Storage: fmt::Debug {
         id: &ObjectId,
     ) -> StorageResult<Arc<AttributesTable>>; // FIXME: format flags
     async fn fetch_manifests(&self, id: &ObjectId) -> StorageResult<Arc<ManifestsTable>>; // FIXME: format flags
-    async fn fetch_chunk(
-        &self,
-        id: &ObjectId,
-        range: &Option<Range<ChunkOffset>>,
-    ) -> StorageResult<Bytes>; // FIXME: format flags
+    async fn fetch_chunk(&self, id: &ObjectId, range: &ByteRange)
+        -> StorageResult<Bytes>; // FIXME: format flags
 
     async fn write_snapshot(
         &self,
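A caller-side sketch of the unified trait method (hypothetical helper; module paths assumed). Previously whole-object reads passed `&None` and ranged reads `&Some(start..end)`; now both go through one parameter:

use bytes::Bytes;
use icechunk::format::{ByteRange, ObjectId};
use icechunk::storage::{Storage, StorageResult};

// Hypothetical helper: read just the first `n` bytes of a chunk through any
// Storage implementation (caching, logging, or object store).
async fn chunk_prefix(
    storage: &dyn Storage,
    id: &ObjectId,
    n: u64,
) -> StorageResult<Bytes> {
    storage.fetch_chunk(id, &ByteRange::to_offset(n)).await
}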
55 changes: 42 additions & 13 deletions icechunk/src/storage/object_store.rs
@@ -1,5 +1,5 @@
 use core::fmt;
-use std::{fs::create_dir_all, future::ready, sync::Arc};
+use std::{fs::create_dir_all, future::ready, ops::Bound, sync::Arc};
 
 use arrow::array::RecordBatch;
 use async_trait::async_trait;
@@ -8,7 +8,8 @@ use bytes::Bytes;
 use futures::{stream::BoxStream, StreamExt, TryStreamExt};
 use object_store::{
     buffered::BufWriter, local::LocalFileSystem, memory::InMemory,
-    path::Path as ObjectPath, ObjectStore, PutMode, PutOptions, PutPayload,
+    path::Path as ObjectPath, GetOptions, GetRange, ObjectStore, PutMode, PutOptions,
+    PutPayload,
 };
 use parquet::arrow::{
     async_reader::ParquetObjectReader, async_writer::ParquetObjectWriter,
@@ -17,11 +18,44 @@ use parquet::arrow::{
 
 use crate::format::{
     attributes::AttributesTable, manifest::ManifestsTable, snapshot::SnapshotTable,
-    ChunkOffset, ObjectId, Path,
+    ByteRange, ObjectId, Path,
 };
 
 use super::{Storage, StorageError, StorageResult};
 
+// GetRange is object_store specific, keep it with this module
+impl From<&ByteRange> for Option<GetRange> {
+    fn from(value: &ByteRange) -> Self {
+        match (value.0, value.1) {
+            (Bound::Included(start), Bound::Excluded(end)) => {
+                Some(GetRange::Bounded(start as usize..end as usize))
+            }
+            (Bound::Included(start), Bound::Unbounded) => {
+                Some(GetRange::Offset(start as usize))
+            }
+            (Bound::Included(start), Bound::Included(end)) => {
+                Some(GetRange::Bounded(start as usize..end as usize + 1))
+            }
+            (Bound::Excluded(start), Bound::Excluded(end)) => {
+                Some(GetRange::Bounded(start as usize + 1..end as usize))
+            }
+            (Bound::Excluded(start), Bound::Unbounded) => {
+                Some(GetRange::Offset(start as usize + 1))
+            }
+            (Bound::Excluded(start), Bound::Included(end)) => {
+                Some(GetRange::Bounded(start as usize + 1..end as usize + 1))
+            }
+            (Bound::Unbounded, Bound::Excluded(end)) => {
+                Some(GetRange::Suffix(end as usize))
+            }
+            (Bound::Unbounded, Bound::Included(end)) => {
+                Some(GetRange::Suffix(end as usize + 1))
+            }
+            (Bound::Unbounded, Bound::Unbounded) => None,
+        }
+    }
+}
+
 const SNAPSHOT_PREFIX: &str = "s/";
 const MANIFEST_PREFIX: &str = "m/";
 // const ATTRIBUTES_PREFIX: &str = "a/";
@@ -193,20 +227,15 @@ impl Storage for ObjectStorage {
     async fn fetch_chunk(
         &self,
         id: &ObjectId,
-        range: &Option<std::ops::Range<ChunkOffset>>,
+        range: &ByteRange,
     ) -> Result<Bytes, StorageError> {
         let path = self.get_path(CHUNK_PREFIX, id);
         // TODO: shall we split `range` into multiple ranges and use get_ranges?
         // I can't tell that `get_range` does splitting
-        if let Some(range) = range {
-            Ok(self
-                .store
-                .get_range(&path, (range.start as usize)..(range.end as usize))
-                .await?)
-        } else {
-            // TODO: Can't figure out if `get` is the most efficient way to get the whole object.
-            Ok(self.store.get(&path).await?.bytes().await?)
-        }
+        let options =
+            GetOptions { range: Option::<GetRange>::from(range), ..Default::default() };
+        let chunk = self.store.get_opts(&path, options).await?.bytes().await?;
+        Ok(chunk)
    }
 
    async fn write_chunk(
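The conversion above maps each bound combination onto object_store's GetRange: included/excluded pairs become Bounded, an unbounded end becomes Offset, and the fully unbounded range becomes None, i.e. a plain get of the whole object. One caveat worth flagging: object_store's GetRange::Suffix(n) requests the last n bytes of an object, whereas ByteRange::slice treats (Unbounded, Excluded(end)) as the first end bytes, so suffix-mapped ranges may read differently against remote storage than against inline chunks. A sketch of the mapping (hypothetical assertions, written as if inside this module so the private From impl is in scope):

use object_store::GetRange;

use crate::format::ByteRange;

fn demo() {
    // Half-open range -> a Bounded request.
    assert!(matches!(
        Option::<GetRange>::from(&ByteRange::bounded(10, 20)),
        Some(GetRange::Bounded(r)) if r == (10..20)
    ));
    // Open-ended start -> Offset (byte 10 to the end of the object).
    assert!(matches!(
        Option::<GetRange>::from(&ByteRange::from_offset(10)),
        Some(GetRange::Offset(10))
    ));
    // Fully unbounded -> None: no Range header, fetch the whole object.
    assert!(Option::<GetRange>::from(&ByteRange::ALL).is_none());
}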