position_deletes metadata table #1615

Open
wants to merge 7 commits into main
61 changes: 61 additions & 0 deletions mkdocs/docs/api.md
@@ -971,6 +971,67 @@ readable_metrics: [

To show only data files or delete files in the current snapshot, use `table.inspect.data_files()` and `table.inspect.delete_files()` respectively.

### Position deletes

Inspect the positional delete files in the current snapshot of the table:

```python
table.inspect.position_deletes()
```

```python
pyarrow.Table
file_path: string not null
pos: int64 not null
row: struct<id: int32, data: large_string>
child 0, id: int32
child 1, data: large_string
partition: struct<data: large_string> not null
child 0, data: large_string
spec_id: int64
delete_file_path: string not null
----
file_path: [[],[],[],["s3://warehouse/default/table_metadata_position_deletes/data/data=a/00000-1-acbf93b7-f760-4517-aa84-b9240902d3d2-0-00001.parquet"]]
pos: [[],[],[],[0]]
row: [
-- is_valid: all not null
-- child 0 type: int32
[]
-- child 1 type: large_string
[],
-- is_valid: all not null
-- child 0 type: int32
[]
-- child 1 type: large_string
[],
-- is_valid: all not null
-- child 0 type: int32
[]
-- child 1 type: large_string
[],
-- is_valid: [false]
-- child 0 type: int32
[0]
-- child 1 type: large_string
[""]]
partition: [
-- is_valid: all not null
-- child 0 type: large_string
[],
-- is_valid: all not null
-- child 0 type: large_string
[],
-- is_valid: all not null
-- child 0 type: large_string
[],
-- is_valid: all not null
-- child 0 type: large_string
["a"]]
spec_id: [[],[],[],[0]]
delete_file_path: [[],[],[],["s3://warehouse/default/table_metadata_position_deletes/data/data=a/00000-5-bc7a1d8a-fefe-4277-b4ac-8f1dd7badb7a-00001-deletes.parquet"]]

```
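
Since `position_deletes()` returns a plain PyArrow table, it can be narrowed with PyArrow compute kernels or converted to pandas. A minimal sketch (the filtered path is copied from the output above):

```python
import pyarrow.compute as pc

deletes = table.inspect.position_deletes()

# Keep only the deletes that target one specific data file
mask = pc.equal(
    deletes["file_path"],
    "s3://warehouse/default/table_metadata_position_deletes/data/data=a/00000-1-acbf93b7-f760-4517-aa84-b9240902d3d2-0-00001.parquet",
)
print(deletes.filter(mask).to_pandas())
```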

## Add Files

Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them.
29 changes: 22 additions & 7 deletions pyiceberg/io/pyarrow.py
@@ -122,6 +122,7 @@
    DataFile,
    DataFileContent,
    FileFormat,
    PositionDelete,
)
from pyiceberg.partitioning import PartitionField, PartitionFieldValue, PartitionKey, PartitionSpec, partition_record_value
from pyiceberg.schema import (
@@ -889,15 +890,29 @@ def _construct_fragment(fs: FileSystem, data_file: DataFile, file_format_kwargs:
    return _get_file_format(data_file.file_format, **file_format_kwargs).make_fragment(path, fs)


-def _read_deletes(fs: FileSystem, data_file: DataFile) -> Dict[str, pa.ChunkedArray]:
-    delete_fragment = _construct_fragment(
-        fs, data_file, file_format_kwargs={"dictionary_columns": ("file_path",), "pre_buffer": True, "buffer_size": ONE_MEGABYTE}
-    )
def _read_delete_file(fs: FileSystem, data_file: DataFile) -> Iterator[PositionDelete]:

Contributor: This is very similar to _read_deletes; do you think there's a way we can reimplement _read_deletes to return PositionDelete and refactor its usage?

Contributor (author): Just to make sure I did it right: I converted _read_deletes to use _read_delete_file so we don't implement the reading part twice, and kept _read_deletes returning Dict[str, pa.ChunkedArray]. Wdyt?

    delete_fragment = _construct_fragment(fs, data_file, file_format_kwargs={"pre_buffer": True, "buffer_size": ONE_MEGABYTE})
    table = ds.Scanner.from_fragment(fragment=delete_fragment).to_table()
    table = table.unify_dictionaries()
    for batch in table.to_batches():
        for i in range(len(batch)):
            row = batch.column("row")[i].as_py() if "row" in batch.schema.names else None
            yield PositionDelete(
                file_path=batch.column("file_path")[i].as_py(),
                pos=batch.column("pos")[i].as_py(),
                row=row,  # None when the delete file does not store the deleted row values
            )


def _read_deletes(fs: FileSystem, data_file: DataFile) -> Dict[str, pa.ChunkedArray]:
    deletes_by_file: Dict[str, List[int]] = {}
    for delete in _read_delete_file(fs, data_file):
        if delete.path not in deletes_by_file:
            deletes_by_file[delete.path] = []
        deletes_by_file[delete.path].append(delete.pos)

    # Convert lists of positions to ChunkedArrays
    return {
-        file.as_py(): table.filter(pc.field("file_path") == file).column("pos")
-        for file in table.column("file_path").chunks[0].dictionary
        file_path: pa.chunked_array([pa.array(positions, type=pa.int64())]) for file_path, positions in deletes_by_file.items()
    }
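
As a side note, the grouping-then-ChunkedArray pattern in the new _read_deletes can be illustrated standalone with plain PyArrow (the file paths below are made up):

```python
import pyarrow as pa

# (data file path, deleted row position) pairs, as yielded per PositionDelete
raw_deletes = [
    ("s3://bucket/t/data/f1.parquet", 0),
    ("s3://bucket/t/data/f1.parquet", 4),
    ("s3://bucket/t/data/f2.parquet", 1),
]

deletes_by_file = {}
for file_path, pos in raw_deletes:
    deletes_by_file.setdefault(file_path, []).append(pos)

# One ChunkedArray of int64 positions per data file
chunked = {fp: pa.chunked_array([pa.array(positions, type=pa.int64())]) for fp, positions in deletes_by_file.items()}
print(chunked["s3://bucket/t/data/f1.parquet"])  # positions 0 and 4
```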


28 changes: 28 additions & 0 deletions pyiceberg/manifest.py
@@ -320,6 +320,34 @@ def data_file_with_partition(partition_type: StructType, format_version: TableVe
)


class PositionDelete(Record):
__slots__ = ("file_path", "pos", "row")
path: str
pos: int
row: Optional[Record]

def __setattr__(self, name: str, value: Any) -> None:
"""Assign a key/value to a PositionDelete."""
super().__setattr__(name, value)

def __init__(self, file_path: str, pos: int, row: Optional[Record], *data: Any, **named_data: Any) -> None:
super().__init__(*data, **named_data)
self.path = file_path
self.pos = pos
self.row = row

def __hash__(self) -> int:
"""Return the hash of the file path."""
return hash(self.path)

def __eq__(self, other: Any) -> bool:
"""Compare the PositionDelete with another object.

If it is a PositionDelete, it will compare based on the file_path.
"""
return self.path == other.path if isinstance(other, PositionDelete) else False
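
A rough usage sketch of the equality contract above (illustration only; assumes this branch is installed so PositionDelete is importable and the constructor behaves as shown in __init__):

```python
from pyiceberg.manifest import PositionDelete  # added by this PR

d1 = PositionDelete(file_path="s3://bucket/t/data/f1.parquet", pos=0, row=None)
d2 = PositionDelete(file_path="s3://bucket/t/data/f1.parquet", pos=7, row=None)
d3 = PositionDelete(file_path="s3://bucket/t/data/f2.parquet", pos=0, row=None)

# Hash and equality are defined on the data file path only, so a set
# collapses deletes that target the same data file.
assert d1 == d2 and d1 != d3
assert len({d1, d2, d3}) == 2
```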


class DataFile(Record):
    __slots__ = (
        "content",
81 changes: 80 additions & 1 deletion pyiceberg/table/inspect.py
@@ -20,8 +20,9 @@
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple

from pyiceberg.conversions import from_bytes
-from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, PartitionFieldSummary
from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile, PartitionFieldSummary
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table.snapshots import Snapshot, ancestors_of
from pyiceberg.types import PrimitiveType
from pyiceberg.utils.concurrent import ExecutorFactory
@@ -384,6 +385,28 @@ def _get_all_manifests_schema(self) -> "pa.Schema":
        all_manifests_schema = all_manifests_schema.append(pa.field("reference_snapshot_id", pa.int64(), nullable=False))
        return all_manifests_schema

    def _get_positional_deletes_schema(self, schema: Optional[Schema] = None, spec_id: Optional[int] = None) -> "pa.Schema":
        import pyarrow as pa

        from pyiceberg.io.pyarrow import schema_to_pyarrow

        schema = schema or self.tbl.metadata.schema()

        partition_struct = self.tbl.metadata.spec_struct(spec_id=spec_id)
        pa_partition_struct = schema_to_pyarrow(partition_struct)
        pa_row_struct = schema_to_pyarrow(schema.as_struct())
        positional_delete_schema = pa.schema(
            [
                pa.field("file_path", pa.string(), nullable=False),
                pa.field("pos", pa.int64(), nullable=False),
                pa.field("row", pa_row_struct, nullable=True),
                pa.field("partition", pa_partition_struct, nullable=False),
                pa.field("spec_id", pa.int64(), nullable=True),
                pa.field("delete_file_path", pa.string(), nullable=False),
            ]
        )
        return positional_delete_schema

    def _generate_manifests_table(self, snapshot: Optional[Snapshot], is_all_manifests_table: bool = False) -> "pa.Table":
        import pyarrow as pa

@@ -453,6 +476,39 @@ def _partition_summaries_to_rows(
            schema=self._get_all_manifests_schema() if is_all_manifests_table else self._get_manifests_schema(),
        )

    def _generate_positional_delete_table(self, manifest: ManifestFile, schema: Schema) -> "pa.Table":
        import pyarrow as pa

        positional_deletes: List["pa.Table"] = []

        position_deletes_schema = self._get_positional_deletes_schema(schema=schema, spec_id=manifest.partition_spec_id)

        if manifest.content == ManifestContent.DELETES:
            for entry in manifest.fetch_manifest_entry(self.tbl.io):
                if entry.data_file.content == DataFileContent.POSITION_DELETES:
                    from pyiceberg.io.pyarrow import _fs_from_file_path, _read_delete_file

                    positional_delete_file = _read_delete_file(
                        _fs_from_file_path(self.tbl.io, entry.data_file.file_path), entry.data_file
                    )
                    positional_deletes_records = []
                    for record in positional_delete_file:
                        row = {
                            "file_path": record.path,
                            "pos": record.pos,
                            "row": record.row,
                            "partition": entry.data_file.partition.__dict__,
                            "spec_id": manifest.partition_spec_id,
                            "delete_file_path": entry.data_file.file_path,
                        }
                        positional_deletes_records.append(row)

                    positional_deletes.append(pa.Table.from_pylist(positional_deletes_records, position_deletes_schema))

        if not positional_deletes:
            return pa.Table.from_pylist([], position_deletes_schema)
        return pa.concat_tables(positional_deletes)
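
For reference, a tiny standalone sketch of the from_pylist-with-explicit-schema pattern used above (plain PyArrow; field names mirror the metadata table, values are made up):

```python
import pyarrow as pa

schema = pa.schema(
    [
        pa.field("file_path", pa.string(), nullable=False),
        pa.field("pos", pa.int64(), nullable=False),
        pa.field("partition", pa.struct([pa.field("data", pa.string())]), nullable=False),
    ]
)

rows = [{"file_path": "s3://bucket/t/data/f1.parquet", "pos": 0, "partition": {"data": "a"}}]

populated = pa.Table.from_pylist(rows, schema=schema)
empty = pa.Table.from_pylist([], schema=schema)  # an empty table still carries the schema

print(pa.concat_tables([populated, empty]).num_rows)  # 1
```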

    def manifests(self) -> "pa.Table":
        return self._generate_manifests_table(self.tbl.current_snapshot())

@@ -657,3 +713,26 @@ def all_manifests(self) -> "pa.Table":
            lambda args: self._generate_manifests_table(*args), [(snapshot, True) for snapshot in snapshots]
        )
        return pa.concat_tables(manifests_by_snapshots)

    def position_deletes(self, snapshot_id: Optional[int] = None) -> "pa.Table":
        import pyarrow as pa

        snapshot = self._get_snapshot(snapshot_id) if snapshot_id else self.tbl.current_snapshot()
        if not snapshot:
            schema = self._get_positional_deletes_schema()
            return pa.Table.from_pylist([], schema=schema)

        if snapshot.schema_id is None:
            raise ValueError(f"Snapshot {snapshot.snapshot_id} does not have a schema id")

        schemas = self.tbl.schemas()
        schema = schemas.get(snapshot.schema_id, None)
        if not schema:
            raise ValueError(f"Cannot find schema with id: {snapshot.schema_id}")

        executor = ExecutorFactory.get_or_create()
        positional_deletes: Iterator["pa.Table"] = executor.map(
            lambda manifest: self._generate_positional_delete_table(manifest, schema=schema),
            snapshot.manifests(self.tbl.io),
        )
        return pa.concat_tables(positional_deletes)
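
A rough usage sketch of this inspect entry point (assumes tbl is a catalog-loaded table that already has snapshots):

```python
# Deletes in the current snapshot
current_deletes = tbl.inspect.position_deletes()

# Deletes as of an older snapshot, e.g. the first entry in the table history
oldest_snapshot_id = tbl.history()[0].snapshot_id
older_deletes = tbl.inspect.position_deletes(snapshot_id=oldest_snapshot_id)

print(current_deletes.num_rows, older_deletes.num_rows)
```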
31 changes: 31 additions & 0 deletions pyiceberg/table/metadata.py
@@ -279,6 +279,37 @@ def specs_struct(self) -> StructType:

        return StructType(*nested_fields)

    def spec_struct(self, spec_id: Optional[int] = None) -> StructType:
        """Produce a struct of partition fields for a given spec_id.

        The partition fields should be optional: partition fields may be added later,
        in which case not all files would have the result field, and it may be null.

        :return: A StructType representing the table's partition spec for the given spec_id, or the latest spec if none is given.
        """
        if spec_id is None:
            spec = self.spec()
        else:
            specs = self.specs()
            filtered_spec = list(filter(lambda spec: spec.spec_id == spec_id, specs.values()))
            if not filtered_spec:
                raise ValidationError(f"Spec with spec_id {spec_id} not found")
            spec = filtered_spec[0]

        # Collect all the fields
        struct_fields = {field.field_id: field for field in spec.fields}

        schema = self.schema()

        nested_fields = []
        # Sort them by field_id in order to get a deterministic output
        for field_id in sorted(struct_fields):
            field = struct_fields[field_id]
            source_type = schema.find_type(field.source_id)
            result_type = field.transform.result_type(source_type)
            nested_fields.append(NestedField(field_id=field.field_id, name=field.name, type=result_type, required=False))

        return StructType(*nested_fields)
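
A short sketch of how this helper is meant to be consumed, mirroring the inspect code above (assumes tbl is a loaded table; spec_struct is new in this PR):

```python
from pyiceberg.io.pyarrow import schema_to_pyarrow

# Latest partition spec by default, or a specific one via spec_id
partition_struct = tbl.metadata.spec_struct(spec_id=0)

# Converted to an Arrow struct for the "partition" column of the metadata table
pa_partition_struct = schema_to_pyarrow(partition_struct)
print(pa_partition_struct)
```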

    def new_snapshot_id(self) -> int:
        """Generate a new snapshot-id that's not in use."""
        snapshot_id = _generate_snapshot_id()
53 changes: 53 additions & 0 deletions tests/integration/test_inspect_table.py
@@ -938,3 +938,56 @@ def test_inspect_all_manifests(spark: SparkSession, session_catalog: Catalog, fo
    lhs = spark.table(f"{identifier}.all_manifests").toPandas()
    rhs = df.to_pandas()
    assert_frame_equal(lhs, rhs, check_dtype=False)


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_inspect_positional_deletes(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
    from pandas.testing import assert_frame_equal

    identifier = "default.table_metadata_position_deletes"
    try:
        session_catalog.drop_table(identifier=identifier)
    except NoSuchTableError:
        pass

    spark.sql(
        f"""
        CREATE TABLE {identifier} (
            id int,
            data string
        )
        PARTITIONED BY (data)
        TBLPROPERTIES ('write.update.mode'='merge-on-read',
                       'write.delete.mode'='merge-on-read')
        """
    )
    tbl = session_catalog.load_table(identifier)

    spark.sql(f"INSERT INTO {identifier} VALUES (1, 'a')")

    spark.sql(f"INSERT INTO {identifier} VALUES (2, 'b')")

    spark.sql(f"UPDATE {identifier} SET data = 'c' WHERE id = 1")

    spark.sql(f"DELETE FROM {identifier} WHERE id = 2")

    tbl.refresh()
    df = tbl.inspect.position_deletes()

    assert df.column_names == ["file_path", "pos", "row", "partition", "spec_id", "delete_file_path"]

    int_cols = ["pos"]
    string_cols = ["file_path", "delete_file_path"]

    for column in int_cols:
        for value in df[column]:
            assert isinstance(value.as_py(), int)

    for column in string_cols:
        for value in df[column]:
            assert isinstance(value.as_py(), str)

    lhs = spark.table(f"{identifier}.position_deletes").toPandas()
    rhs = df.to_pandas()
    assert_frame_equal(lhs, rhs, check_dtype=False)