From 33824b2f391d83ccd59f4d35cba683955dfd7326 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Sebasti=C3=A1n=20Galkin?=
Date: Mon, 10 Feb 2025 18:33:54 -0300
Subject: [PATCH] Icechunk learns how to do `diff` and `status` (#711)

Closes: #309, #679

Co-authored-by: Matthew Iannucci
---
 icechunk-python/python/icechunk/__init__.py    |   2 +
 .../python/icechunk/_icechunk_python.pyi       |  26 +++
 icechunk-python/python/icechunk/repository.py  |  33 ++++
 icechunk-python/python/icechunk/session.py     |  12 ++
 icechunk-python/src/lib.rs                     |   3 +-
 icechunk-python/src/repository.rs              | 154 +++++++++++++++++
 icechunk-python/src/session.rs                 |  15 +-
 icechunk-python/tests/test_timetravel.py       |  27 ++-
 icechunk/src/format/snapshot.rs                |   7 +
 icechunk/src/format/transaction_log.rs         | 162 ++++++++++++++----
 icechunk/src/repository.rs                     |  84 ++++++++-
 icechunk/src/session.rs                        |  67 +++++++-
 12 files changed, 539 insertions(+), 53 deletions(-)

diff --git a/icechunk-python/python/icechunk/__init__.py b/icechunk-python/python/icechunk/__init__.py
index 2379b9d2d..4df73ca4a 100644
--- a/icechunk-python/python/icechunk/__init__.py
+++ b/icechunk-python/python/icechunk/__init__.py
@@ -13,6 +13,7 @@
     ConflictSolver,
     ConflictType,
     Credentials,
+    Diff,
     GcsCredentials,
     GcsStaticCredentials,
     GCSummary,
@@ -91,6 +92,7 @@
     "ConflictSolver",
     "ConflictType",
     "Credentials",
+    "Diff",
     "GCSummary",
     "GcsCredentials",
     "GcsStaticCredentials",
diff --git a/icechunk-python/python/icechunk/_icechunk_python.pyi b/icechunk-python/python/icechunk/_icechunk_python.pyi
index 465a4cb43..1fdfad0bd 100644
--- a/icechunk-python/python/icechunk/_icechunk_python.pyi
+++ b/icechunk-python/python/icechunk/_icechunk_python.pyi
@@ -265,6 +265,22 @@ class RepositoryConfig:
     def set_virtual_chunk_container(self, cont: VirtualChunkContainer) -> None: ...
     def clear_virtual_chunk_containers(self) -> None: ...
 
+class Diff:
+    @property
+    def new_groups(self) -> set[str]: ...
+    @property
+    def new_arrays(self) -> set[str]: ...
+    @property
+    def deleted_groups(self) -> set[str]: ...
+    @property
+    def deleted_arrays(self) -> set[str]: ...
+    @property
+    def updated_user_attributes(self) -> set[str]: ...
+    @property
+    def updated_zarr_metadata(self) -> set[str]: ...
+    @property
+    def updated_chunks(self) -> dict[str, int]: ...
+
 class GCSummary:
     @property
     def chunks_deleted(self) -> int: ...
@@ -332,6 +348,15 @@ class PyRepository:
     def create_tag(self, tag: str, snapshot_id: str) -> None: ...
     def list_tags(self) -> set[str]: ...
     def lookup_tag(self, tag: str) -> str: ...
+    def diff(
+        self,
+        from_branch: str | None = None,
+        from_tag: str | None = None,
+        from_snapshot: str | None = None,
+        to_branch: str | None = None,
+        to_tag: str | None = None,
+        to_snapshot: str | None = None,
+    ) -> Diff: ...
     def readonly_session(
         self,
         *,
@@ -364,6 +389,7 @@ class PySession:
     def branch(self) -> str | None: ...
     @property
     def has_uncommitted_changes(self) -> bool: ...
+    def status(self) -> Diff: ...
     def discard_changes(self) -> None: ...
     def all_virtual_chunk_locations(self) -> list[str]: ...
     def chunk_coordinates(
diff --git a/icechunk-python/python/icechunk/repository.py b/icechunk-python/python/icechunk/repository.py
index 07e42e030..c3f5ed4f5 100644
--- a/icechunk-python/python/icechunk/repository.py
+++ b/icechunk-python/python/icechunk/repository.py
@@ -3,6 +3,7 @@
 from typing import Self
 
 from icechunk._icechunk_python import (
+    Diff,
     GCSummary,
     PyRepository,
     RepositoryConfig,
@@ -388,6 +389,38 @@ def lookup_tag(self, tag: str) -> str:
         """
         return self._repository.lookup_tag(tag)
 
+    def diff(
+        self,
+        *,
+        from_branch: str | None = None,
+        from_tag: str | None = None,
+        from_snapshot: str | None = None,
+        to_branch: str | None = None,
+        to_tag: str | None = None,
+        to_snapshot: str | None = None,
+    ) -> Diff:
+        """
+        Compute an overview of the operations executed from version `from` to version `to`.
+
+        Both versions, `from` and `to`, must be identified. Each of them can be identified
+        by a branch, a tag, or a snapshot id, and the styles used for `from` and `to` can differ.
+
+        The `from` version must be a member of the `ancestry` of `to`.
+
+        Returns
+        -------
+        Diff
+            The operations executed between the two versions.
+        """
+        return self._repository.diff(
+            from_branch=from_branch,
+            from_tag=from_tag,
+            from_snapshot=from_snapshot,
+            to_branch=to_branch,
+            to_tag=to_tag,
+            to_snapshot=to_snapshot,
+        )
+
     def readonly_session(
         self,
         *,
diff --git a/icechunk-python/python/icechunk/session.py b/icechunk-python/python/icechunk/session.py
index a158a2361..64ad32026 100644
--- a/icechunk-python/python/icechunk/session.py
+++ b/icechunk-python/python/icechunk/session.py
@@ -6,6 +6,7 @@
     Conflict,
     ConflictErrorData,
     ConflictSolver,
+    Diff,
     RebaseFailedData,
 )
 from icechunk._icechunk_python import PyConflictError, PyRebaseFailedError, PySession
@@ -179,6 +180,17 @@ def has_uncommitted_changes(self) -> bool:
         """
         return self._session.has_uncommitted_changes
 
+    def status(self) -> Diff:
+        """
+        Compute an overview of the changes made in the current session.
+
+        Returns
+        -------
+        Diff
+            The operations executed in the current session that have not yet been committed.
+        """
+        return self._session.status()
+
     def discard_changes(self) -> None:
         """
         When the session is writable, discard any uncommitted changes.
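The Python surface added above (`Repository.diff` and `Session.status`, both returning a `Diff`) can be exercised roughly as follows. This is a minimal illustrative sketch, not part of the patch: it assumes an in-memory repository and a tiny zarr hierarchy, and the printed values are examples only.

    import icechunk as ic
    import zarr

    repo = ic.Repository.create(storage=ic.in_memory_storage())

    # First commit: create the root group in a writable session.
    session = repo.writable_session("main")
    root = zarr.group(store=session.store, overwrite=True)
    root.attrs["title"] = "diff/status example"
    print(session.status().new_groups)   # uncommitted changes, e.g. {"/"}
    first_snapshot = session.commit("create root group")

    # Second commit: update attributes on the root group.
    session = repo.writable_session("main")
    zarr.open_group(store=session.store).attrs["author"] = "someone"
    session.commit("update attributes")

    # Compare two committed versions; `from` must be an ancestor of `to`,
    # and each end may be given as a branch, a tag, or a snapshot id.
    diff = repo.diff(from_snapshot=first_snapshot, to_branch="main")
    print(diff.updated_user_attributes)

Because both entry points return the same `Diff` type, the same reporting code can be reused for pending session changes and for committed history.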
diff --git a/icechunk-python/src/lib.rs b/icechunk-python/src/lib.rs
index fa2863ce1..2d81e62ff 100644
--- a/icechunk-python/src/lib.rs
+++ b/icechunk-python/src/lib.rs
@@ -27,7 +27,7 @@ use errors::{
 };
 use icechunk::{format::format_constants::SpecVersionBin, initialize_tracing};
 use pyo3::prelude::*;
-use repository::{PyGCSummary, PyRepository, PySnapshotInfo};
+use repository::{PyDiff, PyGCSummary, PyRepository, PySnapshotInfo};
 use session::PySession;
 use store::PyStore;
 
@@ -81,6 +81,7 @@ fn _icechunk_python(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
     m.add_class::()?;
     m.add_class::()?;
     m.add_class::()?;
+    m.add_class::<PyDiff>()?;
     m.add_function(wrap_pyfunction!(initialize_logs, m)?)?;
     m.add_function(wrap_pyfunction!(spec_version, m)?)?;
diff --git a/icechunk-python/src/repository.rs b/icechunk-python/src/repository.rs
index bcba182b0..3bc209f84 100644
--- a/icechunk-python/src/repository.rs
+++ b/icechunk-python/src/repository.rs
@@ -11,6 +11,7 @@ use icechunk::{
     config::Credentials,
     format::{
         snapshot::{SnapshotInfo, SnapshotProperties},
+        transaction_log::Diff,
         SnapshotId,
     },
     ops::gc::{expire, garbage_collect, ExpiredRefAction, GCConfig, GCSummary},
@@ -182,6 +183,131 @@ impl PySnapshotInfo {
     }
 }
 
+#[pyclass(name = "Diff", eq)]
+#[derive(Debug, PartialEq, Eq, Default)]
+pub struct PyDiff {
+    #[pyo3(get)]
+    pub new_groups: HashSet<String>,
+    #[pyo3(get)]
+    pub new_arrays: HashSet<String>,
+    #[pyo3(get)]
+    pub deleted_groups: HashSet<String>,
+    #[pyo3(get)]
+    pub deleted_arrays: HashSet<String>,
+    #[pyo3(get)]
+    pub updated_user_attributes: HashSet<String>,
+    #[pyo3(get)]
+    pub updated_zarr_metadata: HashSet<String>,
+    #[pyo3(get)]
+    pub updated_chunks: HashMap<String, u64>,
+}
+
+impl From<Diff> for PyDiff {
+    fn from(value: Diff) -> Self {
+        let new_groups =
+            value.new_groups.into_iter().map(|path| path.to_string()).collect();
+        let new_arrays =
+            value.new_arrays.into_iter().map(|path| path.to_string()).collect();
+        let deleted_groups =
+            value.deleted_groups.into_iter().map(|path| path.to_string()).collect();
+        let deleted_arrays =
+            value.deleted_arrays.into_iter().map(|path| path.to_string()).collect();
+        let updated_user_attributes = value
+            .updated_user_attributes
+            .into_iter()
+            .map(|path| path.to_string())
+            .collect();
+        let updated_zarr_metadata = value
+            .updated_zarr_metadata
+            .into_iter()
+            .map(|path| path.to_string())
+            .collect();
+        let updated_chunks =
+            value.updated_chunks.into_iter().map(|(k, v)| (k.to_string(), v)).collect();
+
+        PyDiff {
+            new_groups,
+            new_arrays,
+            deleted_groups,
+            deleted_arrays,
+            updated_user_attributes,
+            updated_zarr_metadata,
+            updated_chunks,
+        }
+    }
+}
+
+#[pymethods]
+impl PyDiff {
+    pub fn __repr__(&self) -> String {
+        let mut res = String::new();
+        use std::fmt::Write;
+
+        if !self.new_groups.is_empty() {
+            res.push_str("Groups created:\n");
+            //print()
+            for g in self.new_groups.iter() {
+                writeln!(res, " {}", g).unwrap();
+            }
+            res.push('\n');
+        }
+        if !self.new_arrays.is_empty() {
+            res.push_str("Arrays created:\n");
+            //print()
+            for g in self.new_arrays.iter() {
+                writeln!(res, " {}", g).unwrap();
+            }
+            res.push('\n');
+        }
+
+        if !self.updated_zarr_metadata.is_empty() {
+            res.push_str("Zarr metadata updated:\n");
+            //print()
+            for g in self.updated_zarr_metadata.iter() {
+                writeln!(res, " {}", g).unwrap();
+            }
+            res.push('\n');
+        }
+
+        if !self.updated_user_attributes.is_empty() {
+            res.push_str("User attributes updated:\n");
+            //print()
+            for g in self.updated_user_attributes.iter() {
+                writeln!(res, " {}", g).unwrap();
+            }
+            res.push('\n');
+        }
+
+        if !self.deleted_groups.is_empty() {
+            res.push_str("Groups deleted:\n");
+            //print()
+            for g in self.deleted_groups.iter() {
+                writeln!(res, " {}", g).unwrap();
+            }
+            res.push('\n');
+        }
+
+        if !self.deleted_arrays.is_empty() {
+            res.push_str("Arrays deleted:\n");
+            //print()
+            for g in self.deleted_arrays.iter() {
+                writeln!(res, " {}", g).unwrap();
+            }
+            res.push('\n');
+        }
+
+        if !self.updated_chunks.is_empty() {
+            res.push_str("Number of chunks updated:\n");
+            //print()
+            for (path, n) in self.updated_chunks.iter() {
+                writeln!(res, " {}: {}", path, n).unwrap();
+            }
+            res.push('\n');
+        }
+        res
+    }
+}
+
 #[pyclass(name = "GCSummary", eq)]
 #[derive(Debug, PartialEq, Eq, Default)]
 pub struct PyGCSummary {
@@ -586,6 +712,34 @@ impl PyRepository {
         })
     }
 
+    #[pyo3(signature = (*, from_branch=None, from_tag=None, from_snapshot=None, to_branch=None, to_tag=None, to_snapshot=None))]
+    #[allow(clippy::too_many_arguments)]
+    pub fn diff(
+        &self,
+        py: Python<'_>,
+        from_branch: Option<String>,
+        from_tag: Option<String>,
+        from_snapshot: Option<String>,
+        to_branch: Option<String>,
+        to_tag: Option<String>,
+        to_snapshot: Option<String>,
+    ) -> PyResult<PyDiff> {
+        let from = args_to_version_info(from_branch, from_tag, from_snapshot)?;
+        let to = args_to_version_info(to_branch, to_tag, to_snapshot)?;
+
+        // This function calls block_on, so we need to allow other Python threads to make progress
+        py.allow_threads(move || {
+            pyo3_async_runtimes::tokio::get_runtime().block_on(async move {
+                let diff = self
+                    .0
+                    .diff(&from, &to)
+                    .await
+                    .map_err(PyIcechunkStoreError::SessionError)?;
+                Ok(diff.into())
+            })
+        })
+    }
+
     #[pyo3(signature = (*, branch = None, tag = None, snapshot = None))]
     pub fn readonly_session(
         &self,
diff --git a/icechunk-python/src/session.rs b/icechunk-python/src/session.rs
index 13d17c10e..7f8c0bf68 100644
--- a/icechunk-python/src/session.rs
+++ b/icechunk-python/src/session.rs
@@ -9,7 +9,7 @@ use tokio::sync::{Mutex, RwLock};
 use crate::{
     conflicts::PyConflictSolver,
     errors::{PyIcechunkStoreError, PyIcechunkStoreResult},
-    repository::PySnapshotProperties,
+    repository::{PyDiff, PySnapshotProperties},
     store::PyStore,
     streams::PyAsyncGenerator,
 };
@@ -73,6 +73,19 @@ impl PySession {
         py.allow_threads(move || self.0.blocking_read().has_uncommitted_changes())
     }
 
+    pub fn status(&self, py: Python<'_>) -> PyResult<PyDiff> {
+        // This is a blocking function; we need to release the GIL
+        py.allow_threads(move || {
+            let session = self.0.blocking_read();
+
+            pyo3_async_runtimes::tokio::get_runtime().block_on(async move {
+                let res =
+                    session.status().await.map_err(PyIcechunkStoreError::SessionError)?;
+                Ok(res.into())
+            })
+        })
+    }
+
     pub fn discard_changes(&self, py: Python<'_>) {
         // This is a blocking function; we need to release the GIL
         py.allow_threads(move || {
diff --git a/icechunk-python/tests/test_timetravel.py b/icechunk-python/tests/test_timetravel.py
index 04a6406b7..040d49d65 100644
--- a/icechunk-python/tests/test_timetravel.py
+++ b/icechunk-python/tests/test_timetravel.py
@@ -21,6 +21,7 @@ def test_timetravel() -> None:
         storage=ic.in_memory_storage(),
         config=config,
     )
+
     session = repo.writable_session("main")
     store = session.store
 
@@ -32,7 +33,16 @@
     air_temp[:, :] = 42
     assert air_temp[200, 6] == 42
 
-    snapshot_id = session.commit("commit 1")
+    status = session.status()
+    assert status.new_groups == {"/"}
+    assert status.new_arrays == {"/air_temp"}
+    assert status.updated_chunks == {"/air_temp": 100}
+    assert status.deleted_groups == set()
+    assert status.deleted_arrays == set()
+    assert status.updated_user_attributes == {"/", "/air_temp"}  # why?
+    assert status.updated_zarr_metadata == set()
+
+    first_snapshot_id = session.commit("commit 1")
     assert session.read_only
 
     session = repo.writable_session("main")
@@ -45,7 +55,7 @@
 
     new_snapshot_id = session.commit("commit 2")
 
-    session = repo.readonly_session(snapshot=snapshot_id)
+    session = repo.readonly_session(snapshot=first_snapshot_id)
     store = session.store
     group = zarr.open_group(store=store, mode="r")
     air_temp = cast(zarr.core.array.Array, group["air_temp"])
@@ -115,6 +125,19 @@
     assert list(repo.ancestry(tag="v1.0")) == parents
     assert list(repo.ancestry(branch="feature-not-dead")) == parents
 
+    diff = repo.diff(to_tag="v1.0", from_snapshot=parents[-1].id)
+    assert diff.new_groups == {"/"}
+    assert diff.new_arrays == {"/air_temp"}
+    assert diff.updated_chunks == {"/air_temp": 100}
+    assert diff.deleted_groups == set()
+    assert diff.deleted_arrays == set()
+    assert diff.updated_user_attributes == {"/", "/air_temp"}  # why?
+    assert diff.updated_zarr_metadata == set()
+
+    with pytest.raises(ValueError, match="doesn't include"):
+        # if we call diff in the wrong order it fails with a message
+        repo.diff(from_tag="v1.0", to_snapshot=parents[-1].id)
+
     # check async ancestry works
     assert list(repo.ancestry(snapshot=feature_snapshot_id)) == asyncio.run(
         async_ancestry(repo, snapshot=feature_snapshot_id)
diff --git a/icechunk/src/format/snapshot.rs b/icechunk/src/format/snapshot.rs
index 332920625..5f0b33e3e 100644
--- a/icechunk/src/format/snapshot.rs
+++ b/icechunk/src/format/snapshot.rs
@@ -173,6 +173,13 @@ impl From<&Snapshot> for SnapshotInfo {
     }
 }
 
+impl SnapshotInfo {
+    pub fn is_initial(&self) -> bool {
+        // FIXME: add check for known initial id
+        self.parent_id.is_none()
+    }
+}
+
 impl Snapshot {
     pub const INITIAL_COMMIT_MESSAGE: &'static str = "Repository initialized";
 
diff --git a/icechunk/src/format/transaction_log.rs b/icechunk/src/format/transaction_log.rs
index 71cf36351..92e9a7ed8 100644
--- a/icechunk/src/format/transaction_log.rs
+++ b/icechunk/src/format/transaction_log.rs
@@ -1,50 +1,49 @@
-use std::collections::{HashMap, HashSet};
+use std::{
+    collections::{HashMap, HashSet},
+    hash::Hash,
+};
 
 use crate::change_set::ChangeSet;
 
-use super::{
-    snapshot::{NodeSnapshot, NodeType},
-    ChunkIndices, NodeId,
-};
+use super::{ChunkIndices, NodeId, Path};
 
-#[derive(Clone, Debug, PartialEq, Default)]
-pub struct TransactionLog {
+#[derive(Clone, Debug, PartialEq)]
+pub struct Changes<Id, Chunks>
+where
+    Id: Eq + Hash,
+{
     // FIXME: better, more stable on-disk format
-    pub new_groups: HashSet<NodeId>,
-    pub new_arrays: HashSet<NodeId>,
-    pub deleted_groups: HashSet<NodeId>,
-    pub deleted_arrays: HashSet<NodeId>,
-    pub updated_user_attributes: HashSet<NodeId>,
-    pub updated_zarr_metadata: HashSet<NodeId>,
-    pub updated_chunks: HashMap<NodeId, HashSet<ChunkIndices>>,
+    pub new_groups: HashSet<Id>,
+    pub new_arrays: HashSet<Id>,
+    pub deleted_groups: HashSet<Id>,
+    pub deleted_arrays: HashSet<Id>,
+    pub updated_user_attributes: HashSet<Id>,
+    pub updated_zarr_metadata: HashSet<Id>,
+    pub updated_chunks: HashMap<Id, Chunks>,
+}
+
+impl<Id: Eq + Hash, Chunks> Default for Changes<Id, Chunks> {
+    fn default() -> Self {
+        Self {
+            new_groups: Default::default(),
+            new_arrays: Default::default(),
+            deleted_groups: Default::default(),
+            deleted_arrays: Default::default(),
+            updated_user_attributes: Default::default(),
+            updated_zarr_metadata: Default::default(),
+            updated_chunks: Default::default(),
+        }
+    }
 }
 
+pub type TransactionLog = Changes<NodeId, HashSet<ChunkIndices>>;
+
 impl TransactionLog {
-    pub fn new<'a>(
-        cs: &ChangeSet,
-        parent_nodes: impl Iterator<Item = &'a NodeSnapshot>,
-        child_nodes: impl Iterator<Item = &'a NodeSnapshot>,
-    ) -> Self {
+    pub fn new(cs: &ChangeSet) -> Self {
         let new_groups = cs.new_groups().map(|(_, node_id)| node_id).cloned().collect();
         let new_arrays = cs.new_arrays().map(|(_, node_id)| node_id).cloned().collect();
-        let parent_nodes =
-            parent_nodes.map(|n| (n.id.clone(), n.node_type())).collect::<HashSet<_>>();
-        let child_nodes =
-            child_nodes.map(|n| (n.id.clone(), n.node_type())).collect::<HashSet<_>>();
-        let mut deleted_groups = HashSet::new();
-        let mut deleted_arrays = HashSet::new();
-
-        for (node_id, node_type) in parent_nodes.difference(&child_nodes) {
-            // TODO: we shouldn't need the following clones
-            match node_type {
-                NodeType::Group => {
-                    deleted_groups.insert(node_id.clone());
-                }
-                NodeType::Array => {
-                    deleted_arrays.insert(node_id.clone());
-                }
-            }
-        }
+        let deleted_groups = cs.deleted_groups().map(|(_, id)| id.clone()).collect();
+        let deleted_arrays = cs.deleted_arrays().map(|(_, id)| id.clone()).collect();
 
         let updated_user_attributes =
             cs.user_attributes_updated_nodes().cloned().collect();
@@ -79,4 +78,93 @@ impl TransactionLog {
     pub fn is_empty(&self) -> bool {
         self.len() == 0
     }
+
+    pub fn merge(&mut self, other: &TransactionLog) {
+        self.new_groups.extend(other.new_groups.iter().cloned());
+        self.new_arrays.extend(other.new_arrays.iter().cloned());
+        self.deleted_groups.extend(other.deleted_groups.iter().cloned());
+        self.deleted_arrays.extend(other.deleted_arrays.iter().cloned());
+        self.updated_user_attributes
+            .extend(other.updated_user_attributes.iter().cloned());
+        self.updated_zarr_metadata.extend(other.updated_zarr_metadata.iter().cloned());
+        for (node, chunks) in other.updated_chunks.iter() {
+            self.updated_chunks
+                .entry(node.clone())
+                .and_modify(|set| set.extend(chunks.iter().cloned()))
+                .or_insert_with(|| chunks.clone());
+        }
+    }
+}
+
+pub type Diff = Changes<Path, u64>;
+
+impl Diff {
+    pub fn from_transaction_log(
+        tx: &TransactionLog,
+        nodes: HashMap<NodeId, Path>,
+    ) -> Self {
+        let new_groups = tx
+            .new_groups
+            .iter()
+            .flat_map(|node_id| nodes.get(node_id))
+            .cloned()
+            .collect();
+        let new_arrays = tx
+            .new_arrays
+            .iter()
+            .flat_map(|node_id| nodes.get(node_id))
+            .cloned()
+            .collect();
+        let deleted_groups = tx
+            .deleted_groups
+            .iter()
+            .flat_map(|node_id| nodes.get(node_id))
+            .cloned()
+            .collect();
+        let deleted_arrays = tx
+            .deleted_arrays
+            .iter()
+            .flat_map(|node_id| nodes.get(node_id))
+            .cloned()
+            .collect();
+        let updated_user_attributes = tx
+            .updated_user_attributes
+            .iter()
+            .flat_map(|node_id| nodes.get(node_id))
+            .cloned()
+            .collect();
+        let updated_zarr_metadata = tx
+            .updated_zarr_metadata
+            .iter()
+            .flat_map(|node_id| nodes.get(node_id))
+            .cloned()
+            .collect();
+        let updated_chunks = tx
+            .updated_chunks
+            .iter()
+            .flat_map(|(node_id, chunks)| {
+                nodes.get(node_id).map(|n| (n.clone(), chunks.len() as u64))
+            })
+            .collect();
+        Self {
+            new_groups,
+            new_arrays,
+            deleted_groups,
+            deleted_arrays,
+            updated_user_attributes,
+            updated_zarr_metadata,
+            updated_chunks,
+        }
+    }
+
+    #[must_use]
+    pub fn is_empty(&self) -> bool {
+        self.new_groups.is_empty()
+            && self.new_arrays.is_empty()
+            && self.deleted_groups.is_empty()
+            && self.deleted_arrays.is_empty()
+            && self.updated_user_attributes.is_empty()
+            && self.updated_zarr_metadata.is_empty()
+            && self.updated_chunks.is_empty()
+    }
 }
diff --git a/icechunk/src/repository.rs b/icechunk/src/repository.rs
index d0d682afc..bcdc7e798 100644
--- a/icechunk/src/repository.rs
+++ b/icechunk/src/repository.rs
@@ -1,11 +1,15 @@
 use std::{
     collections::{BTreeSet, HashMap, HashSet},
+    future::ready,
     ops::RangeBounds,
     sync::Arc,
 };
 
 use bytes::Bytes;
-use futures::{stream::FuturesUnordered, Stream, StreamExt};
+use futures::{
+    stream::{FuturesOrdered, FuturesUnordered},
+    Stream, StreamExt, TryStreamExt,
+};
 use regex::bytes::Regex;
 use serde::{Deserialize, Serialize};
 use thiserror::Error;
@@ -18,6 +22,7 @@ use crate::{
     error::ICError,
     format::{
         snapshot::{ManifestFileInfo, NodeData, Snapshot, SnapshotInfo},
+        transaction_log::{Diff, TransactionLog},
         IcechunkFormatError, IcechunkFormatErrorKind, ManifestId, NodeId, Path,
         SnapshotId,
     },
@@ -26,7 +31,7 @@ use crate::{
         list_branches, list_tags, update_branch, BranchVersion, Ref, RefError,
         RefErrorKind,
     },
-    session::Session,
+    session::{Session, SessionErrorKind, SessionResult},
    storage::{self, ETag, FetchConfigResult, StorageErrorKind, UpdateConfigResult},
     virtual_chunks::{ContainerName, VirtualChunkResolver},
     Storage, StorageError,
@@ -584,6 +589,66 @@ impl Repository {
         }
     }
 
+    #[instrument(skip(self))]
+    /// Compute the diff between `from` and `to` snapshots.
+    ///
+    /// If `from` is not in the ancestry of `to`, `SessionErrorKind::BadSnapshotChainForDiff`
+    /// will be returned.
+    ///
+    /// Result includes the diffs in both `from` and `to` snapshots.
+    pub async fn diff(
+        &self,
+        from: &VersionInfo,
+        to: &VersionInfo,
+    ) -> SessionResult<Diff> {
+        let from = self.resolve_version(from).await?;
+        let from_info =
+            SnapshotInfo::from(self.asset_manager.fetch_snapshot(&from).await?.as_ref());
+        let all_snaps = self
+            .ancestry(to)
+            .await?
+            .try_take_while(|snap_info| ready(Ok(snap_info.id != from)))
+            .try_collect::<Vec<_>>()
+            .await?;
+
+        if all_snaps.last().and_then(|info| info.parent_id.as_ref()) != Some(&from) {
+            return Err(SessionErrorKind::BadSnapshotChainForDiff.into());
+        }
+
+        let fut: FuturesOrdered<_> = all_snaps
+            .iter()
+            .chain(std::iter::once(&from_info))
+            .filter_map(|snap_info| {
+                if snap_info.is_initial() {
+                    None
+                } else {
+                    Some(
+                        self.asset_manager
+                            .fetch_transaction_log(&snap_info.id)
+                            .in_current_span(),
+                    )
+                }
+            })
+            .collect();
+
+        let full_log = fut
+            .try_fold(TransactionLog::default(), |mut res, log| {
+                res.merge(log.as_ref());
+                ready(Ok(res))
+            })
+            .await?;
+
+        if let Some(to_snap) = all_snaps.first().as_ref().map(|snap| snap.id.clone()) {
+            let from_session =
+                self.readonly_session(&VersionInfo::SnapshotId(from)).await?;
+            let to_session =
+                self.readonly_session(&VersionInfo::SnapshotId(to_snap)).await?;
+            tx_to_diff(&full_log, &from_session, &to_session).await
+        } else {
+            Err(SessionErrorKind::BadSnapshotChainForDiff.into())
+        }
+    }
+
     #[instrument(skip(self))]
     pub async fn readonly_session(
         &self,
@@ -598,7 +663,6 @@
             self.virtual_resolver.clone(),
             snapshot_id.clone(),
         );
-
         self.preload_manifests(snapshot_id);
 
         Ok(session)
@@ -751,6 +815,20 @@ pub async fn raise_if_invalid_snapshot_id(
     Ok(())
 }
 
+pub async fn tx_to_diff(
+    tx: &TransactionLog,
+    from: &Session,
+    to: &Session,
+) -> SessionResult<Diff> {
+    let nodes: HashMap<NodeId, Path> = from
+        .list_nodes()
+        .await?
+        .chain(to.list_nodes().await?)
+        .map(|n| (n.id, n.path))
+        .collect();
+    Ok(Diff::from_transaction_log(tx, nodes))
+}
+
 #[cfg(test)]
 #[allow(clippy::panic, clippy::unwrap_used, clippy::expect_used)]
 mod tests {
diff --git a/icechunk/src/session.rs b/icechunk/src/session.rs
index c702b2e3f..0825930e6 100644
--- a/icechunk/src/session.rs
+++ b/icechunk/src/session.rs
@@ -33,13 +33,13 @@ use crate::{
             ManifestFileInfo, NodeData, NodeSnapshot, NodeType, Snapshot,
             SnapshotProperties, UserAttributesSnapshot, ZarrArrayMetadata,
         },
-        transaction_log::TransactionLog,
+        transaction_log::{Diff, TransactionLog},
         ByteRange, ChunkIndices, ChunkOffset, IcechunkFormatError,
         IcechunkFormatErrorKind, ManifestId, NodeId, ObjectId, Path, SnapshotId,
     },
     metadata::UserAttributes,
     refs::{fetch_branch_tip, update_branch, RefError, RefErrorKind},
-    repository::{RepositoryError, RepositoryErrorKind},
+    repository::{tx_to_diff, RepositoryError, RepositoryErrorKind},
     storage::{self, StorageErrorKind},
     virtual_chunks::{VirtualChunkContainer, VirtualChunkResolver},
     RepositoryConfig, Storage, StorageError,
@@ -104,6 +104,8 @@ pub enum SessionErrorKind {
         "invalid chunk index: coordinates {coords:?} are not valid for array at {path}"
     )]
     InvalidIndex { coords: ChunkIndices, path: Path },
+    #[error("`to` snapshot ancestry doesn't include `from`")]
+    BadSnapshotChainForDiff,
 }
 
 pub type SessionError = ICError<SessionErrorKind>;
@@ -249,6 +251,20 @@ impl Session {
         self.virtual_resolver.matching_container(chunk_location.0.as_str())
     }
 
+    /// Compute an overview of the current session changes
+    pub async fn status(&self) -> SessionResult<Diff> {
+        let tx_log = TransactionLog::new(&self.change_set);
+        let from_session = Self::create_readonly_session(
+            self.config().clone(),
+            self.storage_settings.as_ref().clone(),
+            Arc::clone(&self.storage),
+            Arc::clone(&self.asset_manager),
+            Arc::clone(&self.virtual_resolver),
+            self.snapshot_id.clone(),
+        );
+        tx_to_diff(&tx_log, &from_session, self).await
+    }
+
     /// Add a group to the store.
     ///
     /// Calling this only records the operation in memory, doesn't have any consequence on the storage
@@ -1521,11 +1537,7 @@ async fn flush(
 
     trace!(transaction_log_id = %new_snapshot.id(), "Creating transaction log");
     // FIXME: this should execute in a non-blocking context
-    let tx_log = TransactionLog::new(
-        flush_data.change_set,
-        old_snapshot.iter(),
-        new_snapshot.iter(),
-    );
+    let tx_log = TransactionLog::new(flush_data.change_set);
 
     let new_snapshot_id = new_snapshot.id();
     flush_data
@@ -2079,16 +2091,25 @@ mod tests {
         let mut ds = repository.writable_session("main").await?;
 
+        let initial_snapshot = repository.lookup_branch("main").await?;
+
+        let diff = ds.status().await?;
+        assert!(diff.is_empty());
+
         // add a new array and retrieve its node
         ds.add_group(Path::root()).await?;
 
-        let snapshot_id =
+        let diff = ds.status().await?;
+        assert!(!diff.is_empty());
+        assert_eq!(diff.new_groups, [Path::root()].into());
+
+        let first_commit =
             ds.commit("commit", Some(SnapshotProperties::default())).await?;
 
         // We need a new session after the commit
         let mut ds = repository.writable_session("main").await?;
 
         //let node_id3 = NodeId::random();
-        assert_eq!(snapshot_id, ds.snapshot_id);
+        assert_eq!(first_commit, ds.snapshot_id);
         assert!(matches!(
             ds.get_node(&Path::root()).await.ok(),
             Some(NodeSnapshot { path, user_attributes, node_data, .. })
         ));
@@ -2133,6 +2154,10 @@ mod tests {
         let new_array_path: Path = "/group/array1".try_into().unwrap();
         ds.add_array(new_array_path.clone(), zarr_meta.clone()).await?;
 
+        let diff = ds.status().await?;
+        assert!(!diff.is_empty());
+        assert_eq!(diff.new_arrays, [new_array_path.clone()].into());
+
         // we commit to test the case of a chunkless array
         let _snapshot_id =
             ds.commit("commit", Some(SnapshotProperties::default())).await?;
@@ -2155,6 +2180,10 @@ mod tests {
         )
         .await?;
 
+        let diff = ds.status().await?;
+        assert!(!diff.is_empty());
+        assert_eq!(diff.updated_chunks, [(new_array_path.clone(), 1)].into());
+
         let _snapshot_id =
             ds.commit("commit", Some(SnapshotProperties::default())).await?;
 
@@ -2299,6 +2328,26 @@ mod tests {
             Some(ChunkPayload::Inline("new chunk".into()))
         );
 
+        let diff = repository
+            .diff(
+                &VersionInfo::SnapshotId(initial_snapshot),
+                &VersionInfo::BranchTipRef("main".to_string()),
+            )
+            .await?;
+
+        assert!(diff.deleted_groups.is_empty());
+        assert!(diff.deleted_arrays.is_empty());
+        assert_eq!(
+            &diff.new_groups,
+            &["/".try_into().unwrap(), "/group".try_into().unwrap()].into()
+        );
+        assert_eq!(
+            &diff.new_arrays,
+            &[new_array_path.clone()].into() // we never committed array2
+        );
+        assert_eq!(&diff.updated_chunks, &[(new_array_path.clone(), 2)].into());
+        assert_eq!(&diff.updated_user_attributes, &[new_array_path.clone()].into());
+        assert_eq!(&diff.updated_zarr_metadata, &[new_array_path.clone()].into());
         Ok(())
     }
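For reference, the ordering requirement enforced here surfaces differently at each layer: in Rust an out-of-order request fails with `SessionErrorKind::BadSnapshotChainForDiff`, while on the Python side it arrives as a `ValueError` whose message contains "doesn't include", as `test_timetravel` asserts above. A hypothetical continuation of the earlier Python sketch:

    # `repo` and `first_snapshot` as in the sketch shown with the Python API changes.
    try:
        # Arguments reversed: `main` is not an ancestor of `first_snapshot`.
        repo.diff(from_branch="main", to_snapshot=first_snapshot)
    except ValueError as err:
        print(err)

    # Diff objects also render a human-readable summary, one section per kind of
    # change (groups/arrays created or deleted, metadata, attributes, chunk counts).
    print(repo.diff(from_snapshot=first_snapshot, to_branch="main"))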