-
Notifications
You must be signed in to change notification settings - Fork 229
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
position_deletes metadata table #1615
base: main
Are you sure you want to change the base?
Changes from 3 commits
a26ca73
fb7228a
7ee20ea
d2c58b2
312d7e0
e4ed25e
9d42fae
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -320,6 +320,34 @@ def data_file_with_partition(partition_type: StructType, format_version: TableVe | |
) | ||
|
||
|
||
class PositionDelete(Record): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
__slots__ = ("file_path", "pos", "row") | ||
file_path: str | ||
pos: int | ||
row: Optional[Record] | ||
|
||
def __setattr__(self, name: str, value: Any) -> None: | ||
"""Assign a key/value to a PositionDelete.""" | ||
super().__setattr__(name, value) | ||
|
||
def __init__(self, file_path: str, pos: int, row: Optional[Record], *data: Any, **named_data: Any) -> None: | ||
super().__init__(*data, **named_data) | ||
self.file_path = file_path | ||
self.pos = pos | ||
self.row = row | ||
|
||
def __hash__(self) -> int: | ||
"""Return the hash of the file path.""" | ||
return hash(self.file_path) | ||
|
||
def __eq__(self, other: Any) -> bool: | ||
"""Compare the PositionDelete with another object. | ||
|
||
If it is a PositionDelete, it will compare based on the file_path. | ||
""" | ||
return self.file_path == other.file_path if isinstance(other, PositionDelete) else False | ||
|
||
|
||
class DataFile(Record): | ||
__slots__ = ( | ||
"content", | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,7 +20,7 @@ | |
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple | ||
|
||
from pyiceberg.conversions import from_bytes | ||
from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, PartitionFieldSummary | ||
from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile, PartitionFieldSummary | ||
from pyiceberg.partitioning import PartitionSpec | ||
from pyiceberg.table.snapshots import Snapshot, ancestors_of | ||
from pyiceberg.types import PrimitiveType | ||
|
@@ -384,6 +384,26 @@ def _get_all_manifests_schema(self) -> "pa.Schema": | |
all_manifests_schema = all_manifests_schema.append(pa.field("reference_snapshot_id", pa.int64(), nullable=False)) | ||
return all_manifests_schema | ||
|
||
def _get_positional_deletes_schema(self) -> "pa.Schema": | ||
import pyarrow as pa | ||
|
||
from pyiceberg.io.pyarrow import schema_to_pyarrow | ||
|
||
partition_struct = self.tbl.metadata.spec_struct() | ||
pa_partition_struct = schema_to_pyarrow(partition_struct) | ||
pa_row_struct = schema_to_pyarrow(self.tbl.schema().as_struct()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is using the table's current metadata, perhaps we want to pass it these based on which snapshot we're using |
||
positional_delete_schema = pa.schema( | ||
[ | ||
pa.field("file_path", pa.string(), nullable=False), | ||
pa.field("pos", pa.int64(), nullable=False), | ||
pa.field("row", pa_row_struct, nullable=True), | ||
pa.field("partition", pa_partition_struct, nullable=False), | ||
pa.field("spec_id", pa.int64(), nullable=True), | ||
pa.field("delete_file_path", pa.string(), nullable=False), | ||
] | ||
) | ||
return positional_delete_schema | ||
|
||
def _generate_manifests_table(self, snapshot: Optional[Snapshot], is_all_manifests_table: bool = False) -> "pa.Table": | ||
import pyarrow as pa | ||
|
||
|
@@ -453,6 +473,37 @@ def _partition_summaries_to_rows( | |
schema=self._get_all_manifests_schema() if is_all_manifests_table else self._get_manifests_schema(), | ||
) | ||
|
||
def _generate_positional_delete_table(self, manifest: ManifestFile, position_deletes_schema: "pa.Schema") -> "pa.Table": | ||
import pyarrow as pa | ||
|
||
positional_deletes: List["pa.Table"] = [] | ||
|
||
if manifest.content == ManifestContent.DELETES: | ||
for entry in manifest.fetch_manifest_entry(self.tbl.io): | ||
if entry.data_file.content == DataFileContent.POSITION_DELETES: | ||
from pyiceberg.io.pyarrow import _fs_from_file_path, _read_delete_file | ||
|
||
positional_delete_file = _read_delete_file( | ||
_fs_from_file_path(self.tbl.io, entry.data_file.file_path), entry.data_file | ||
) | ||
positional_deletes_records = [] | ||
for record in positional_delete_file: | ||
row = { | ||
"file_path": record.file_path, | ||
"pos": record.pos, | ||
"row": record.row, | ||
"partition": entry.data_file.partition.__dict__, | ||
"spec_id": manifest.partition_spec_id, | ||
"delete_file_path": entry.data_file.file_path, | ||
} | ||
positional_deletes_records.append(row) | ||
|
||
positional_deletes.append(pa.Table.from_pylist(positional_deletes_records, position_deletes_schema)) | ||
|
||
if not positional_deletes: | ||
return pa.Table.from_pylist([], position_deletes_schema) | ||
return pa.concat_tables(positional_deletes) | ||
|
||
def manifests(self) -> "pa.Table": | ||
return self._generate_manifests_table(self.tbl.current_snapshot()) | ||
|
||
|
@@ -657,3 +708,19 @@ def all_manifests(self) -> "pa.Table": | |
lambda args: self._generate_manifests_table(*args), [(snapshot, True) for snapshot in snapshots] | ||
) | ||
return pa.concat_tables(manifests_by_snapshots) | ||
|
||
def position_deletes(self, snapshot_id: Optional[int] = None) -> "pa.Table": | ||
import pyarrow as pa | ||
|
||
snapshot = self._get_snapshot(snapshot_id) if snapshot_id else self.tbl.current_snapshot() | ||
position_deletes_schema = self._get_positional_deletes_schema() | ||
|
||
if not snapshot: | ||
return pa.Table.from_pylist([], schema=position_deletes_schema) | ||
|
||
executor = ExecutorFactory.get_or_create() | ||
positional_deletes: Iterator["pa.Table"] = executor.map( | ||
lambda manifest: self._generate_positional_delete_table(manifest, position_deletes_schema), | ||
snapshot.manifests(self.tbl.io), | ||
) | ||
return pa.concat_tables(positional_deletes) |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -279,6 +279,37 @@ def specs_struct(self) -> StructType: | |||||
|
||||||
return StructType(*nested_fields) | ||||||
|
||||||
def spec_struct(self, spec_id: Optional[int] = None) -> StructType: | ||||||
"""Produce for a spec_id a struct of PartitionSpecs. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
|
||||||
The partition fields should be optional: Partition fields may be added later, | ||||||
in which case not all files would have the result field, and it may be null. | ||||||
|
||||||
:return: A StructType that represents a PartitionSpec of the table for a specific spec_id or latest. | ||||||
""" | ||||||
if spec_id is None: | ||||||
spec = self.spec() | ||||||
else: | ||||||
specs = self.specs() | ||||||
filtered_spec = list(filter(lambda spec: spec.spec_id == spec_id, specs.values())) | ||||||
if not filtered_spec: | ||||||
raise ValidationError(f"Spec with spec_id {spec_id} not found") | ||||||
spec = filtered_spec[0] | ||||||
# Collect all the fields | ||||||
struct_fields = {field.field_id: field for field in spec.fields} | ||||||
|
||||||
schema = self.schema() | ||||||
|
||||||
nested_fields = [] | ||||||
# Sort them by field_id in order to get a deterministic output | ||||||
for field_id in sorted(struct_fields): | ||||||
field = struct_fields[field_id] | ||||||
source_type = schema.find_type(field.source_id) | ||||||
result_type = field.transform.result_type(source_type) | ||||||
nested_fields.append(NestedField(field_id=field.field_id, name=field.name, type=result_type, required=False)) | ||||||
|
||||||
return StructType(*nested_fields) | ||||||
|
||||||
def new_snapshot_id(self) -> int: | ||||||
"""Generate a new snapshot-id that's not in use.""" | ||||||
snapshot_id = _generate_snapshot_id() | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is very similar to
`_read_deletes`
, do you think there's a way we can reimplement `_read_deletes`
to return `PositionDelete`
and refactor its usage?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to make sure I did it right, I converted `_read_deletes` to use `_read_delete_file` so we don't implement the reading part twice, and kept the logic of `_read_deletes` to return `Dict[str, pa.ChunkedArray]`.
wdyt?