Skip to content

Commit

Permalink
Python's Repository.ancestry now returns an iterator
Browse files Browse the repository at this point in the history
Closes: #750
  • Loading branch information
paraseba committed Feb 19, 2025
1 parent fb3f768 commit e40835f
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 37 deletions.
4 changes: 2 additions & 2 deletions docs/docs/icechunk-python/cheatsheets/git-users.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ At this point, the tip of the branch is now the snapshot `198273178639187` and a
In Icechunk, you can view the history of a branch by using the [`repo.ancestry()`](../reference/#icechunk.Repository.ancestry) command, similar to the `git log` command.

```python
repo.ancestry(branch="my-new-branch")
[ancestor for ancestor in repo.ancestry(branch="my-new-branch")]

#[Snapshot(id='198273178639187', ...), ...]
```
Expand Down Expand Up @@ -156,7 +156,7 @@ We can also view the history of a tag by using the [`repo.ancestry()`](../refere
repo.ancestry(tag="my-new-tag")
```

This will return a list of snapshots that are ancestors of the tag. Similar to branches we can lookup the snapshot that a tag is based on by using the [`repo.lookup_tag()`](../reference/#icechunk.Repository.lookup_tag) command.
This will return an iterator of snapshots that are ancestors of the tag. Similar to branches we can lookup the snapshot that a tag is based on by using the [`repo.lookup_tag()`](../reference/#icechunk.Repository.lookup_tag) command.

```python
repo.lookup_tag("my-new-tag")
Expand Down
4 changes: 2 additions & 2 deletions docs/docs/icechunk-python/version-control.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ repo = icechunk.Repository.create(icechunk.in_memory_storage())
On creating a new [`Repository`](../reference/#icechunk.Repository), it will automatically create a `main` branch with an initial snapshot. We can take a look at the ancestry of the `main` branch to confirm this.

```python
repo.ancestry(branch="main")
[ancestor for ancestor in repo.ancestry(branch="main")]

# [SnapshotInfo(id="A840RMN5CF807CM66RY0", parent_id=None, written_at=datetime.datetime(2025,1,30,19,52,41,592998, tzinfo=datetime.timezone.utc), message="Repository...")]
```
Expand All @@ -36,7 +36,7 @@ repo.ancestry(branch="main")

The [`ancestry`](./reference/#icechunk.Repository.ancestry) method can be used to inspect the ancestry of any branch, snapshot, or tag.

We get back a list of [`SnapshotInfo`](../reference/#icechunk.SnapshotInfo) objects, which contain information about the snapshot, including its ID, the ID of its parent snapshot, and the time it was written.
We get back an iterator of [`SnapshotInfo`](../reference/#icechunk.SnapshotInfo) objects, which contain information about the snapshot, including its ID, the ID of its parent snapshot, and the time it was written.

## Creating a snapshot

Expand Down
4 changes: 2 additions & 2 deletions icechunk-python/python/icechunk/_icechunk_python.pyi
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import abc
import datetime
from collections.abc import AsyncGenerator, AsyncIterator
from collections.abc import AsyncGenerator, AsyncIterator, Iterator
from enum import Enum
from typing import Any

Expand Down Expand Up @@ -907,7 +907,7 @@ class PyRepository:
branch: str | None = None,
tag: str | None = None,
snapshot: str | None = None,
) -> list[SnapshotInfo]: ...
) -> Iterator[SnapshotInfo]: ...
def async_ancestry(
self,
*,
Expand Down
2 changes: 1 addition & 1 deletion icechunk-python/python/icechunk/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ def ancestry(
-----
Only one of the arguments can be specified.
"""
return self._repository.ancestry(branch=branch, tag=tag, snapshot=snapshot)
return self._repository.async_ancestry(branch=branch, tag=tag, snapshot=snapshot)

def async_ancestry(
self,
Expand Down
29 changes: 1 addition & 28 deletions icechunk-python/src/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -511,34 +511,7 @@ impl PyRepository {
PyStorage(Arc::clone(self.0.storage()))
}

#[pyo3(signature = (*, branch = None, tag = None, snapshot = None))]
pub fn ancestry(
&self,
py: Python<'_>,
branch: Option<String>,
tag: Option<String>,
snapshot: Option<String>,
) -> PyResult<Vec<PySnapshotInfo>> {
// This function calls block_on, so we need to allow other thread python to make progress
py.allow_threads(move || {
let version = args_to_version_info(branch, tag, snapshot)?;

// TODO: this holds everything in memory
pyo3_async_runtimes::tokio::get_runtime().block_on(async move {
let ancestry = self
.0
.ancestry(&version)
.await
.map_err(PyIcechunkStoreError::RepositoryError)?
.map_ok(Into::<PySnapshotInfo>::into)
.try_collect::<Vec<_>>()
.await
.map_err(PyIcechunkStoreError::RepositoryError)?;
Ok(ancestry)
})
})
}

/// Returns an object that is both a sync and an async iterator
#[pyo3(signature = (*, branch = None, tag = None, snapshot = None))]
pub fn async_ancestry(
&self,
Expand Down
30 changes: 29 additions & 1 deletion icechunk-python/src/streams.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::{pin::Pin, sync::Arc};

use futures::{Stream, StreamExt};
use pyo3::{exceptions::PyStopAsyncIteration, prelude::*};
use pyo3::{
exceptions::{PyStopAsyncIteration, PyStopIteration},
prelude::*,
};
use tokio::sync::Mutex;

type PyObjectStream = Arc<Mutex<Pin<Box<dyn Stream<Item = PyResult<Py<PyAny>>> + Send>>>>;
Expand Down Expand Up @@ -31,6 +34,10 @@ impl PyAsyncGenerator {
slf
}

fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
slf
}

/// This is an anext implementation.
///
/// Notable thing here is that we return PyResult<Option<PyObject>>.
Expand Down Expand Up @@ -62,4 +69,25 @@ impl PyAsyncGenerator {
// of pyo3?
pyo3_async_runtimes::tokio::future_into_py(py, future)
}

fn __next__<'py>(
slf: PyRefMut<'py, Self>,
py: Python<'py>,
) -> PyResult<Option<PyObject>> {
// Arc::clone is cheap, so we can clone the Arc here because we move into the
// future block
let stream = slf.stream.clone();

py.allow_threads(move || {
let next = pyo3_async_runtimes::tokio::get_runtime().block_on(async move {
let mut unlocked = stream.lock().await;
unlocked.next().await
});
match next {
Some(Ok(val)) => Ok(Some(val)),
Some(Err(err)) => Err(err),
None => Err(PyStopIteration::new_err("The iterator is exhausted")),
}
})
}
}
2 changes: 1 addition & 1 deletion icechunk-python/tests/test_commit_properties.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def test_property_types() -> None:
}
snapshot_id = session.commit("some commit", props)

info = repo.ancestry(branch="main")[0]
info = [p for p in repo.ancestry(branch="main")][0]
assert info.message == "some commit"
assert info.id == snapshot_id
assert info.parent_id == parent_id
Expand Down

0 comments on commit e40835f

Please sign in to comment.