Skip to content

Commit

Permalink
Cleaner dataset init, creating an empty initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
mpiannucci committed Sep 18, 2024
1 parent 7ebf819 commit 8bb1f35
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 39 deletions.
21 changes: 14 additions & 7 deletions icechunk/src/dataset.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
collections::{BTreeMap, HashMap, HashSet},
iter,
iter::{self, Empty},
mem::take,
path::PathBuf,
sync::Arc,
Expand Down Expand Up @@ -285,12 +285,19 @@ impl Dataset {
/// This is the default way to create a new dataset to avoid race conditions
/// when creating datasets.
pub async fn init(
config: DatasetConfig,
storage: Arc<dyn Storage + Send + Sync>,
) -> DatasetResult<Dataset> {
let mut dataset = Dataset::create(storage).with_config(config).build();
dataset.commit("main", "Store created").await?;
Ok(dataset)
) -> DatasetResult<DatasetBuilder> {
let manifest = Arc::new(ManifestsTable::default());
let new_manifest_id = ObjectId::random();
storage.write_manifests(new_manifest_id.clone(), manifest).await?;

let new_snapshot =
Either::<Empty<NodeSnapshot>, Empty<NodeSnapshot>>::Left(iter::empty())
.collect();
let new_snapshot_id = ObjectId::random();
storage.write_snapshot(new_snapshot_id.clone(), Arc::new(new_snapshot)).await?;

Ok(DatasetBuilder::new(storage, Some(new_snapshot_id)))
}

fn new(
Expand Down Expand Up @@ -915,7 +922,7 @@ impl Dataset {
let chunk_changes_c = Arc::clone(&chunk_changes);

let update_task = task::spawn_blocking(move || {
//FIXME: avoid clone, t his one is extremely expensive en memory
//FIXME: avoid clone, this one is extremely expensive in memory
//it's currently needed because we don't want to destroy the manifest in case of later
//failure
let mut new_chunks = old_manifest.as_ref().chunks.clone();
Expand Down
59 changes: 27 additions & 32 deletions icechunk/src/zarr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::{
ByteRange, ChunkOffset, IcechunkFormatError,
},
refs::BranchVersion,
Dataset, MemCachingStorage, ObjectStorage, Storage,
Dataset, DatasetBuilder, MemCachingStorage, ObjectStorage, Storage,
};

pub use crate::format::ObjectId;
Expand Down Expand Up @@ -576,43 +576,38 @@ async fn mk_dataset(
dataset: &DatasetConfig,
storage: Arc<dyn Storage + Send + Sync>,
) -> Result<(Dataset, Option<String>), String> {
let (dataset, branch): (Dataset, Option<String>) = match &dataset.previous_version {
None => {
let dataset =
Dataset::init(crate::dataset::DatasetConfig::default(), storage)
let (mut builder, branch): (DatasetBuilder, Option<String>) =
match &dataset.previous_version {
None => {
let builder = Dataset::init(storage)
.await
.map_err(|err| format!("Error initializing dataset: {err}"))?;
(dataset, Some(String::from("main")))
}
Some(VersionInfo::SnapshotId(sid)) => {
let mut builder = Dataset::update(storage, sid.clone());
if let Some(inline_theshold) = dataset.inline_chunk_threshold_bytes {
builder.with_inline_threshold_bytes(inline_theshold);
(builder, Some(String::from("main")))
}
(builder.build(), None)
}
Some(VersionInfo::TagRef(tag)) => {
let mut builder = Dataset::from_tag(storage, tag)
.await
.map_err(|err| format!("Error fetching tag: {err}"))?;
if let Some(inline_theshold) = dataset.inline_chunk_threshold_bytes {
builder.with_inline_threshold_bytes(inline_theshold);
Some(VersionInfo::SnapshotId(sid)) => {
let builder = Dataset::update(storage, sid.clone());
(builder, None)
}
(builder.build(), None)
}
Some(VersionInfo::BranchTipRef(branch)) => {
let mut builder = Dataset::from_branch_tip(storage, branch)
.await
.map_err(|err| format!("Error fetching branch: {err}"))?;
if let Some(inline_theshold) = dataset.inline_chunk_threshold_bytes {
builder.with_inline_threshold_bytes(inline_theshold);
Some(VersionInfo::TagRef(tag)) => {
let builder = Dataset::from_tag(storage, tag)
.await
.map_err(|err| format!("Error fetching tag: {err}"))?;
(builder, None)
}
(builder.build(), Some(branch.clone()))
}
};
Some(VersionInfo::BranchTipRef(branch)) => {
let builder = Dataset::from_branch_tip(storage, branch)
.await
.map_err(|err| format!("Error fetching branch: {err}"))?;
(builder, Some(branch.clone()))
}
};

if let Some(inline_theshold) = dataset.inline_chunk_threshold_bytes {
builder.with_inline_threshold_bytes(inline_theshold);
}

// TODO: add error checking, does the previous version exist?
Ok((dataset, branch))
Ok((builder.build(), branch))
}

fn mk_storage(config: &StorageConfig) -> Result<Arc<dyn Storage + Send + Sync>, String> {
Expand Down Expand Up @@ -1527,7 +1522,7 @@ mod tests {
store.set("array/c/0/1/0", data.clone()).await.unwrap();

let (snapshot_id, version) = store.commit("initial commit").await.unwrap();
assert_eq!(version.0, 1);
assert_eq!(version.0, 0);

let new_data = Bytes::copy_from_slice(b"world");
store.set("array/c/0/1/0", new_data.clone()).await.unwrap();
Expand Down

0 comments on commit 8bb1f35

Please sign in to comment.