Skip to content

Commit

Permalink
Icechunk learns how to do diff and status (#711)
Browse files Browse the repository at this point in the history
Closes: #309, #679

Co-authored-by: Matthew Iannucci <matthew@earthmover.io>
  • Loading branch information
paraseba and mpiannucci authored Feb 10, 2025
1 parent f80cb79 commit 33824b2
Show file tree
Hide file tree
Showing 12 changed files with 539 additions and 53 deletions.
2 changes: 2 additions & 0 deletions icechunk-python/python/icechunk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
ConflictSolver,
ConflictType,
Credentials,
Diff,
GcsCredentials,
GcsStaticCredentials,
GCSummary,
Expand Down Expand Up @@ -91,6 +92,7 @@
"ConflictSolver",
"ConflictType",
"Credentials",
"Diff",
"GCSummary",
"GcsCredentials",
"GcsStaticCredentials",
Expand Down
26 changes: 26 additions & 0 deletions icechunk-python/python/icechunk/_icechunk_python.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,22 @@ class RepositoryConfig:
def set_virtual_chunk_container(self, cont: VirtualChunkContainer) -> None: ...
def clear_virtual_chunk_containers(self) -> None: ...

class Diff:
@property
def new_groups(self) -> set[str]: ...
@property
def new_arrays(self) -> set[str]: ...
@property
def deleted_groups(self) -> set[str]: ...
@property
def deleted_arrays(self) -> set[str]: ...
@property
def updated_user_attributes(self) -> set[str]: ...
@property
def updated_zarr_metadata(self) -> set[str]: ...
@property
def updated_chunks(self) -> dict[str, int]: ...

class GCSummary:
@property
def chunks_deleted(self) -> int: ...
Expand Down Expand Up @@ -332,6 +348,15 @@ class PyRepository:
def create_tag(self, tag: str, snapshot_id: str) -> None: ...
def list_tags(self) -> set[str]: ...
def lookup_tag(self, tag: str) -> str: ...
def diff(
self,
from_branch: str | None = None,
from_tag: str | None = None,
from_snapshot: str | None = None,
to_branch: str | None = None,
to_tag: str | None = None,
to_snapshot: str | None = None,
) -> Diff: ...
def readonly_session(
self,
*,
Expand Down Expand Up @@ -364,6 +389,7 @@ class PySession:
def branch(self) -> str | None: ...
@property
def has_uncommitted_changes(self) -> bool: ...
def status(self) -> Diff: ...
def discard_changes(self) -> None: ...
def all_virtual_chunk_locations(self) -> list[str]: ...
def chunk_coordinates(
Expand Down
33 changes: 33 additions & 0 deletions icechunk-python/python/icechunk/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Self

from icechunk._icechunk_python import (
Diff,
GCSummary,
PyRepository,
RepositoryConfig,
Expand Down Expand Up @@ -388,6 +389,38 @@ def lookup_tag(self, tag: str) -> str:
"""
return self._repository.lookup_tag(tag)

def diff(
self,
*,
from_branch: str | None = None,
from_tag: str | None = None,
from_snapshot: str | None = None,
to_branch: str | None = None,
to_tag: str | None = None,
to_snapshot: str | None = None,
) -> Diff:
"""
Compute an overview of the operations executed from version `from` to version `to`.
Both versions, `from` and `to`, must be identified. Identification can be done using a branch, tag or snapshot id.
The styles used to identify the `from` and `to` versions can be different.
The `from` version must be a member of the `ancestry` of `to`.
Returns
-------
Diff
The operations executed between the two versions
"""
return self._repository.diff(
from_branch=from_branch,
from_tag=from_tag,
from_snapshot=from_snapshot,
to_branch=to_branch,
to_tag=to_tag,
to_snapshot=to_snapshot,
)

def readonly_session(
self,
*,
Expand Down
12 changes: 12 additions & 0 deletions icechunk-python/python/icechunk/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
Conflict,
ConflictErrorData,
ConflictSolver,
Diff,
RebaseFailedData,
)
from icechunk._icechunk_python import PyConflictError, PyRebaseFailedError, PySession
Expand Down Expand Up @@ -179,6 +180,17 @@ def has_uncommitted_changes(self) -> bool:
"""
return self._session.has_uncommitted_changes

def status(self) -> Diff:
"""
Compute an overview of the current session changes
Returns
-------
Diff
The operations executed in the current session but still not committed.
"""
return self._session.status()

def discard_changes(self) -> None:
"""
When the session is writable, discard any uncommitted changes.
Expand Down
3 changes: 2 additions & 1 deletion icechunk-python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use errors::{
};
use icechunk::{format::format_constants::SpecVersionBin, initialize_tracing};
use pyo3::prelude::*;
use repository::{PyGCSummary, PyRepository, PySnapshotInfo};
use repository::{PyDiff, PyGCSummary, PyRepository, PySnapshotInfo};
use session::PySession;
use store::PyStore;

Expand Down Expand Up @@ -81,6 +81,7 @@ fn _icechunk_python(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<PyManifestConfig>()?;
m.add_class::<PyStorageSettings>()?;
m.add_class::<PyGCSummary>()?;
m.add_class::<PyDiff>()?;
m.add_function(wrap_pyfunction!(initialize_logs, m)?)?;
m.add_function(wrap_pyfunction!(spec_version, m)?)?;

Expand Down
154 changes: 154 additions & 0 deletions icechunk-python/src/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use icechunk::{
config::Credentials,
format::{
snapshot::{SnapshotInfo, SnapshotProperties},
transaction_log::Diff,
SnapshotId,
},
ops::gc::{expire, garbage_collect, ExpiredRefAction, GCConfig, GCSummary},
Expand Down Expand Up @@ -182,6 +183,131 @@ impl PySnapshotInfo {
}
}

#[pyclass(name = "Diff", eq)]
#[derive(Debug, PartialEq, Eq, Default)]
pub struct PyDiff {
#[pyo3(get)]
pub new_groups: HashSet<String>,
#[pyo3(get)]
pub new_arrays: HashSet<String>,
#[pyo3(get)]
pub deleted_groups: HashSet<String>,
#[pyo3(get)]
pub deleted_arrays: HashSet<String>,
#[pyo3(get)]
pub updated_user_attributes: HashSet<String>,
#[pyo3(get)]
pub updated_zarr_metadata: HashSet<String>,
#[pyo3(get)]
pub updated_chunks: HashMap<String, u64>,
}

impl From<Diff> for PyDiff {
fn from(value: Diff) -> Self {
let new_groups =
value.new_groups.into_iter().map(|path| path.to_string()).collect();
let new_arrays =
value.new_arrays.into_iter().map(|path| path.to_string()).collect();
let deleted_groups =
value.deleted_groups.into_iter().map(|path| path.to_string()).collect();
let deleted_arrays =
value.deleted_arrays.into_iter().map(|path| path.to_string()).collect();
let updated_user_attributes = value
.updated_user_attributes
.into_iter()
.map(|path| path.to_string())
.collect();
let updated_zarr_metadata = value
.updated_zarr_metadata
.into_iter()
.map(|path| path.to_string())
.collect();
let updated_chunks =
value.updated_chunks.into_iter().map(|(k, v)| (k.to_string(), v)).collect();

PyDiff {
new_groups,
new_arrays,
deleted_groups,
deleted_arrays,
updated_user_attributes,
updated_zarr_metadata,
updated_chunks,
}
}
}

#[pymethods]
impl PyDiff {
pub fn __repr__(&self) -> String {
let mut res = String::new();
use std::fmt::Write;

if !self.new_groups.is_empty() {
res.push_str("Groups created:\n");
//print()
for g in self.new_groups.iter() {
writeln!(res, " {}", g).unwrap();
}
res.push('\n');
}
if !self.new_arrays.is_empty() {
res.push_str("Arrays created:\n");
//print()
for g in self.new_arrays.iter() {
writeln!(res, " {}", g).unwrap();
}
res.push('\n');
}

if !self.updated_zarr_metadata.is_empty() {
res.push_str("Zarr metadata updated:\n");
//print()
for g in self.updated_zarr_metadata.iter() {
writeln!(res, " {}", g).unwrap();
}
res.push('\n');
}

if !self.updated_user_attributes.is_empty() {
res.push_str("User attributes updated:\n");
//print()
for g in self.updated_user_attributes.iter() {
writeln!(res, " {}", g).unwrap();
}
res.push('\n');
}

if !self.deleted_groups.is_empty() {
res.push_str("Groups deleted:\n");
//print()
for g in self.deleted_groups.iter() {
writeln!(res, " {}", g).unwrap();
}
res.push('\n');
}

if !self.deleted_arrays.is_empty() {
res.push_str("Arrays deleted:\n");
//print()
for g in self.deleted_arrays.iter() {
writeln!(res, " {}", g).unwrap();
}
res.push('\n');
}

if !self.updated_chunks.is_empty() {
res.push_str("Number of chunks updated:\n");
//print()
for (path, n) in self.updated_chunks.iter() {
writeln!(res, " {}: {}", path, n).unwrap();
}
res.push('\n');
}
res
}
}

#[pyclass(name = "GCSummary", eq)]
#[derive(Debug, PartialEq, Eq, Default)]
pub struct PyGCSummary {
Expand Down Expand Up @@ -586,6 +712,34 @@ impl PyRepository {
})
}

#[pyo3(signature = (*, from_branch=None, from_tag=None, from_snapshot=None, to_branch=None, to_tag=None, to_snapshot=None))]
#[allow(clippy::too_many_arguments)]
pub fn diff(
&self,
py: Python<'_>,
from_branch: Option<String>,
from_tag: Option<String>,
from_snapshot: Option<String>,
to_branch: Option<String>,
to_tag: Option<String>,
to_snapshot: Option<String>,
) -> PyResult<PyDiff> {
let from = args_to_version_info(from_branch, from_tag, from_snapshot)?;
let to = args_to_version_info(to_branch, to_tag, to_snapshot)?;

// This function calls block_on, so we need to allow other thread python to make progress
py.allow_threads(move || {
pyo3_async_runtimes::tokio::get_runtime().block_on(async move {
let diff = self
.0
.diff(&from, &to)
.await
.map_err(PyIcechunkStoreError::SessionError)?;
Ok(diff.into())
})
})
}

#[pyo3(signature = (*, branch = None, tag = None, snapshot = None))]
pub fn readonly_session(
&self,
Expand Down
15 changes: 14 additions & 1 deletion icechunk-python/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tokio::sync::{Mutex, RwLock};
use crate::{
conflicts::PyConflictSolver,
errors::{PyIcechunkStoreError, PyIcechunkStoreResult},
repository::PySnapshotProperties,
repository::{PyDiff, PySnapshotProperties},
store::PyStore,
streams::PyAsyncGenerator,
};
Expand Down Expand Up @@ -73,6 +73,19 @@ impl PySession {
py.allow_threads(move || self.0.blocking_read().has_uncommitted_changes())
}

pub fn status(&self, py: Python<'_>) -> PyResult<PyDiff> {
// This is blocking function, we need to release the Gil
py.allow_threads(move || {
let session = self.0.blocking_read();

pyo3_async_runtimes::tokio::get_runtime().block_on(async move {
let res =
session.status().await.map_err(PyIcechunkStoreError::SessionError)?;
Ok(res.into())
})
})
}

pub fn discard_changes(&self, py: Python<'_>) {
// This is blocking function, we need to release the Gil
py.allow_threads(move || {
Expand Down
Loading

0 comments on commit 33824b2

Please sign in to comment.