Skip to content

Commit

Permalink
diff and status return richer information (#715)
Browse files Browse the repository at this point in the history
Now the coordinates of all modified chunks are included in the diff and status results.
  • Loading branch information
paraseba authored Feb 11, 2025
1 parent 33824b2 commit 85774ae
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 58 deletions.
50 changes: 30 additions & 20 deletions icechunk-python/src/repository.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
collections::{BTreeSet, HashMap, HashSet},
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
sync::Arc,
};

Expand Down Expand Up @@ -187,19 +187,20 @@ impl PySnapshotInfo {
#[derive(Debug, PartialEq, Eq, Default)]
pub struct PyDiff {
#[pyo3(get)]
pub new_groups: HashSet<String>,
pub new_groups: BTreeSet<String>,
#[pyo3(get)]
pub new_arrays: HashSet<String>,
pub new_arrays: BTreeSet<String>,
#[pyo3(get)]
pub deleted_groups: HashSet<String>,
pub deleted_groups: BTreeSet<String>,
#[pyo3(get)]
pub deleted_arrays: HashSet<String>,
pub deleted_arrays: BTreeSet<String>,
#[pyo3(get)]
pub updated_user_attributes: HashSet<String>,
pub updated_user_attributes: BTreeSet<String>,
#[pyo3(get)]
pub updated_zarr_metadata: HashSet<String>,
pub updated_zarr_metadata: BTreeSet<String>,
#[pyo3(get)]
pub updated_chunks: HashMap<String, u64>,
// A Vec instead of a set to avoid issues with list not being hashable in python
pub updated_chunks: BTreeMap<String, Vec<Vec<u32>>>,
}

impl From<Diff> for PyDiff {
Expand All @@ -222,8 +223,15 @@ impl From<Diff> for PyDiff {
.into_iter()
.map(|path| path.to_string())
.collect();
let updated_chunks =
value.updated_chunks.into_iter().map(|(k, v)| (k.to_string(), v)).collect();
let updated_chunks = value
.updated_chunks
.into_iter()
.map(|(k, v)| {
let path = k.to_string();
let map = v.into_iter().map(|idx| idx.0).collect();
(path, map)
})
.collect();

PyDiff {
new_groups,
Expand All @@ -245,15 +253,13 @@ impl PyDiff {

if !self.new_groups.is_empty() {
res.push_str("Groups created:\n");
//print()
for g in self.new_groups.iter() {
writeln!(res, " {}", g).unwrap();
}
res.push('\n');
}
if !self.new_arrays.is_empty() {
res.push_str("Arrays created:\n");
//print()
for g in self.new_arrays.iter() {
writeln!(res, " {}", g).unwrap();
}
Expand All @@ -262,7 +268,6 @@ impl PyDiff {

if !self.updated_zarr_metadata.is_empty() {
res.push_str("Zarr metadata updated:\n");
//print()
for g in self.updated_zarr_metadata.iter() {
writeln!(res, " {}", g).unwrap();
}
Expand All @@ -271,7 +276,6 @@ impl PyDiff {

if !self.updated_user_attributes.is_empty() {
res.push_str("User attributes updated:\n");
//print()
for g in self.updated_user_attributes.iter() {
writeln!(res, " {}", g).unwrap();
}
Expand All @@ -280,7 +284,6 @@ impl PyDiff {

if !self.deleted_groups.is_empty() {
res.push_str("Groups deleted:\n");
//print()
for g in self.deleted_groups.iter() {
writeln!(res, " {}", g).unwrap();
}
Expand All @@ -289,7 +292,6 @@ impl PyDiff {

if !self.deleted_arrays.is_empty() {
res.push_str("Arrays deleted:\n");
//print()
for g in self.deleted_arrays.iter() {
writeln!(res, " {}", g).unwrap();
}
Expand All @@ -298,11 +300,19 @@ impl PyDiff {

if !self.updated_chunks.is_empty() {
res.push_str("Number of chunks updated:\n");
//print()
for (path, n) in self.updated_chunks.iter() {
writeln!(res, " {}: {}", path, n).unwrap();
for (path, chunks) in self.updated_chunks.iter() {
writeln!(res, " {}:", path).unwrap();
let coords = chunks
.iter()
.map(|idx| format!(" [{}]", idx.iter().join(", ")))
.take(10)
.join("\n");
res.push_str(coords.as_str());
res.push('\n');
if chunks.len() > 10 {
writeln!(res, " ... {} more", chunks.len() - 10).unwrap();
}
}
res.push('\n');
}
res
}
Expand Down
38 changes: 36 additions & 2 deletions icechunk-python/tests/test_timetravel.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ def test_timetravel() -> None:
status = session.status()
assert status.new_groups == {"/"}
assert status.new_arrays == {"/air_temp"}
assert status.updated_chunks == {"/air_temp": 100}
assert list(status.updated_chunks.keys()) == ["/air_temp"]
assert sorted(status.updated_chunks["/air_temp"]) == sorted(
[[i, j] for i in range(10) for j in range(10)]
)
assert status.deleted_groups == set()
assert status.deleted_arrays == set()
assert status.updated_user_attributes == {"/", "/air_temp"} # why?
Expand Down Expand Up @@ -128,11 +131,42 @@ def test_timetravel() -> None:
diff = repo.diff(to_tag="v1.0", from_snapshot=parents[-1].id)
assert diff.new_groups == {"/"}
assert diff.new_arrays == {"/air_temp"}
assert diff.updated_chunks == {"/air_temp": 100}
assert list(diff.updated_chunks.keys()) == ["/air_temp"]
assert sorted(diff.updated_chunks["/air_temp"]) == sorted(
[[i, j] for i in range(10) for j in range(10)]
)
assert diff.deleted_groups == set()
assert diff.deleted_arrays == set()
assert diff.updated_user_attributes == {"/", "/air_temp"} # why?
assert diff.updated_zarr_metadata == set()
assert (
repr(diff)
== """\
Groups created:
/
Arrays created:
/air_temp
User attributes updated:
/
/air_temp
Number of chunks updated:
/air_temp:
[0, 0]
[0, 1]
[0, 2]
[0, 3]
[0, 4]
[0, 5]
[0, 6]
[0, 7]
[0, 8]
[0, 9]
... 90 more
"""
)

with pytest.raises(ValueError, match="doesn't include"):
# if we call diff in the wrong order it fails with a message
Expand Down
55 changes: 21 additions & 34 deletions icechunk/src/format/transaction_log.rs
Original file line number Diff line number Diff line change
@@ -1,43 +1,21 @@
use std::{
collections::{HashMap, HashSet},
hash::Hash,
};
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};

use crate::change_set::ChangeSet;

use super::{ChunkIndices, NodeId, Path};

#[derive(Clone, Debug, PartialEq)]
pub struct Changes<Id, Chunks>
where
Id: Eq + Hash,
{
#[derive(Clone, Debug, PartialEq, Default)]
pub struct TransactionLog {
// FIXME: better, more stable on-disk format
pub new_groups: HashSet<Id>,
pub new_arrays: HashSet<Id>,
pub deleted_groups: HashSet<Id>,
pub deleted_arrays: HashSet<Id>,
pub updated_user_attributes: HashSet<Id>,
pub updated_zarr_metadata: HashSet<Id>,
pub updated_chunks: HashMap<Id, Chunks>,
pub new_groups: HashSet<NodeId>,
pub new_arrays: HashSet<NodeId>,
pub deleted_groups: HashSet<NodeId>,
pub deleted_arrays: HashSet<NodeId>,
pub updated_user_attributes: HashSet<NodeId>,
pub updated_zarr_metadata: HashSet<NodeId>,
pub updated_chunks: HashMap<NodeId, HashSet<ChunkIndices>>,
}

impl<Id: Hash + Eq, Chunks> Default for Changes<Id, Chunks> {
fn default() -> Self {
Self {
new_groups: Default::default(),
new_arrays: Default::default(),
deleted_groups: Default::default(),
deleted_arrays: Default::default(),
updated_user_attributes: Default::default(),
updated_zarr_metadata: Default::default(),
updated_chunks: Default::default(),
}
}
}

pub type TransactionLog = Changes<NodeId, HashSet<ChunkIndices>>;

impl TransactionLog {
pub fn new(cs: &ChangeSet) -> Self {
let new_groups = cs.new_groups().map(|(_, node_id)| node_id).cloned().collect();
Expand Down Expand Up @@ -96,7 +74,16 @@ impl TransactionLog {
}
}

pub type Diff = Changes<Path, u64>;
/// User-facing summary of the changes between two snapshots, keyed by node
/// `Path` (unlike `TransactionLog`, which is keyed by `NodeId`).
///
/// All collections are B-tree based, so iteration order over paths and chunk
/// coordinates is stable — presumably chosen so rendered diffs are
/// deterministic (TODO confirm against the Python `__repr__` output).
#[derive(Clone, Debug, PartialEq)]
pub struct Diff {
// Paths of groups/arrays created between the two snapshots.
pub new_groups: BTreeSet<Path>,
pub new_arrays: BTreeSet<Path>,
// Paths of groups/arrays deleted between the two snapshots.
pub deleted_groups: BTreeSet<Path>,
pub deleted_arrays: BTreeSet<Path>,
// Paths whose user attributes / zarr metadata were updated.
pub updated_user_attributes: BTreeSet<Path>,
pub updated_zarr_metadata: BTreeSet<Path>,
// For each array path, the set of chunk coordinates that were written.
pub updated_chunks: BTreeMap<Path, BTreeSet<ChunkIndices>>,
}

impl Diff {
pub fn from_transaction_log(
Expand Down Expand Up @@ -143,7 +130,7 @@ impl Diff {
.updated_chunks
.iter()
.flat_map(|(node_id, chunks)| {
nodes.get(node_id).map(|n| (n.clone(), chunks.len() as u64))
nodes.get(node_id).map(|n| (n.clone(), chunks.iter().cloned().collect()))
})
.collect();
Self {
Expand Down
14 changes: 12 additions & 2 deletions icechunk/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2182,7 +2182,10 @@ mod tests {

let diff = ds.status().await?;
assert!(!diff.is_empty());
assert_eq!(diff.updated_chunks, [(new_array_path.clone(), 1)].into());
assert_eq!(
diff.updated_chunks,
[(new_array_path.clone(), [ChunkIndices(vec![0, 0, 0])].into())].into()
);

let _snapshot_id =
ds.commit("commit", Some(SnapshotProperties::default())).await?;
Expand Down Expand Up @@ -2345,7 +2348,14 @@ mod tests {
&diff.new_arrays,
&[new_array_path.clone()].into() // we never committed array2
);
assert_eq!(&diff.updated_chunks, &[(new_array_path.clone(), 2)].into());
assert_eq!(
&diff.updated_chunks,
&[(
new_array_path.clone(),
[ChunkIndices(vec![0, 0, 0]), ChunkIndices(vec![0, 0, 1])].into()
)]
.into()
);
assert_eq!(&diff.updated_user_attributes, &[new_array_path.clone()].into());
assert_eq!(&diff.updated_zarr_metadata, &[new_array_path.clone()].into());
Ok(())
Expand Down

0 comments on commit 85774ae

Please sign in to comment.