position_deletes metadata table #1615
base: main
Generally LGTM! I added a few nit comments on style.
I think it's a good idea to introduce the PositionalDelete class, similar to DataFile.
pyiceberg/table/inspect.py
Outdated
@@ -657,3 +717,19 @@ def all_manifests(self) -> "pa.Table":
            lambda args: self._generate_manifests_table(*args), [(snapshot, True) for snapshot in snapshots]
        )
        return pa.concat_tables(manifests_by_snapshots)

    def position_deletes(self) -> "pa.Table":
nit: for any metadata tables referencing the current snapshot, let's add an optional param to allow querying for any given snapshot id.
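One way to read this suggestion, as a minimal sketch: the inspect method takes an optional `snapshot_id` and falls back to the current snapshot when it is omitted. `Snapshot`, `TableMetadata` and `InspectTable` below are simplified stand-ins, not the pyiceberg classes, and the returned list of paths stands in for the `pa.Table` the real method builds.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class Snapshot:  # hypothetical stand-in, not pyiceberg's Snapshot
    snapshot_id: int
    delete_files: List[str] = field(default_factory=list)


@dataclass
class TableMetadata:  # hypothetical stand-in
    current_snapshot_id: Optional[int]
    snapshots: Dict[int, Snapshot]


class InspectTable:  # hypothetical stand-in
    def __init__(self, metadata: TableMetadata) -> None:
        self.metadata = metadata

    def _resolve_snapshot(self, snapshot_id: Optional[int] = None) -> Optional[Snapshot]:
        # No explicit id: use the current snapshot (None for an empty table).
        if snapshot_id is None:
            snapshot_id = self.metadata.current_snapshot_id
            if snapshot_id is None:
                return None
        try:
            return self.metadata.snapshots[snapshot_id]
        except KeyError:
            raise ValueError(f"Cannot find snapshot with ID {snapshot_id}") from None

    def position_deletes(self, snapshot_id: Optional[int] = None) -> List[str]:
        snapshot = self._resolve_snapshot(snapshot_id)
        return [] if snapshot is None else list(snapshot.delete_files)


metadata = TableMetadata(
    current_snapshot_id=2,
    snapshots={
        1: Snapshot(1, ["d1.parquet"]),
        2: Snapshot(2, ["d1.parquet", "d2.parquet"]),
    },
)
inspect = InspectTable(metadata)
current_deletes = inspect.position_deletes()
older_deletes = inspect.position_deletes(snapshot_id=1)
```

Keeping the lookup in one helper means every metadata table that reads "the current snapshot" could share the same optional parameter.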
pyiceberg/table/inspect.py
Outdated
from pyiceberg.io.pyarrow import schema_to_pyarrow

partition_record = self.tbl.metadata.specs_struct()
nit: specs_struct() looks at all PartitionSpecs; do we just need the current partition spec here?
also nit: naming; this is a StructType, not the underlying record
Good catch, I now take the current PartitionSpec.
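To illustrate the difference discussed in this thread, here is a rough sketch contrasting a struct built from all partition specs with one built from a single spec. The dataclasses and the free functions `specs_struct` / `spec_struct` are simplified, hypothetical stand-ins that only mirror the names used in the PR; they are not the pyiceberg implementations.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass(frozen=True)
class PartitionField:  # hypothetical stand-in
    field_id: int
    name: str


@dataclass(frozen=True)
class PartitionSpec:  # hypothetical stand-in
    spec_id: int
    fields: Tuple[PartitionField, ...]


def specs_struct(specs: Dict[int, PartitionSpec]) -> List[PartitionField]:
    """Combine the partition fields of every spec, deduplicated by field id."""
    seen: Dict[int, PartitionField] = {}
    for spec in specs.values():
        for partition_field in spec.fields:
            seen.setdefault(partition_field.field_id, partition_field)
    return [seen[field_id] for field_id in sorted(seen)]


def spec_struct(
    specs: Dict[int, PartitionSpec], default_spec_id: int, spec_id: Optional[int] = None
) -> List[PartitionField]:
    """Return the fields of one spec, defaulting to the table's current spec."""
    chosen = default_spec_id if spec_id is None else spec_id
    if chosen not in specs:
        raise ValueError(f"Unknown partition spec id: {chosen}")
    return list(specs[chosen].fields)


specs = {
    0: PartitionSpec(0, (PartitionField(1000, "day"),)),
    1: PartitionSpec(1, (PartitionField(1001, "bucket"),)),
}
all_names = [f.name for f in specs_struct(specs)]
current_names = [f.name for f in spec_struct(specs, default_spec_id=1)]
old_names = [f.name for f in spec_struct(specs, default_spec_id=1, spec_id=0)]
```

After a partition evolution the combined struct carries fields from retired specs, which is why picking a single spec can matter for the `partition` column.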
pyiceberg/table/inspect.py
Outdated
partition_record = self.tbl.metadata.specs_struct()
pa_partition_struct = schema_to_pyarrow(partition_record)
pa_row_struct = schema_to_pyarrow(self.tbl.schema().as_struct())
positinal_delete_schema = pa.schema(
- positinal_delete_schema = pa.schema(
+ positional_delete_schema = pa.schema(
pyiceberg/table/inspect.py
Outdated
        pa.field("delete_file_path", pa.string(), nullable=False),
    ]
)
return positinal_delete_schema
- return positinal_delete_schema
+ return positional_delete_schema
pyiceberg/io/pyarrow.py
Outdated
def _read_delete_file(fs: FileSystem, data_file: DataFile, schema: "pa.Schema") -> pa.Table:
    delete_fragment = _construct_fragment(fs, data_file, file_format_kwargs={"pre_buffer": True, "buffer_size": ONE_MEGABYTE})
    table = ds.Scanner.from_fragment(fragment=delete_fragment, schema=schema).to_table()
    return table
nit: this might be a good place to start introducing a PositionalDelete class and we can get rid of the custom _get_positional_file_schema function
Also, let's add this new table to the docs as well: https://py.iceberg.apache.org/api/#inspecting-tables
done :)
some more comments :)
cc @Fokko this PR adds the PositionDelete class 🥳
@@ -320,6 +320,34 @@ def data_file_with_partition(partition_type: StructType, format_version: TableVe
    )


class PositionDelete(Record):
@@ -889,6 +890,19 @@ def _construct_fragment(fs: FileSystem, data_file: DataFile, file_format_kwargs:
    return _get_file_format(data_file.file_format, **file_format_kwargs).make_fragment(path, fs)


def _read_delete_file(fs: FileSystem, data_file: DataFile) -> Iterator[PositionDelete]:
this is very similar to _read_deletes, do you think there's a way we can reimplement _read_deletes to return PositionDelete and refactor its usage?
Just to make sure I did it right: I converted _read_deletes to use _read_delete_file so we don't implement the reading part twice, and kept the logic of _read_deletes so it still returns Dict[str, pa.ChunkedArray].
wdyt?
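The shape of that refactor might look roughly like the sketch below: one function yields position-delete records, and the grouping function is built on top of it so the reading logic lives in a single place. Everything here is an assumption for illustration: the `PositionDelete` dataclass is a stand-in for the `Record` subclass in the PR, the input is an in-memory list of `(file_path, pos)` rows rather than a Parquet fragment, and plain lists replace `pa.ChunkedArray`.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, Iterable, Iterator, List, Tuple


@dataclass(frozen=True)
class PositionDelete:  # hypothetical stand-in for the PR's Record subclass
    file_path: str  # data file the delete applies to
    pos: int        # row position within that data file


def read_delete_file(rows: Iterable[Tuple[str, int]]) -> Iterator[PositionDelete]:
    """Single place that turns raw delete rows into PositionDelete records."""
    for file_path, pos in rows:
        yield PositionDelete(file_path=file_path, pos=pos)


def read_deletes(rows: Iterable[Tuple[str, int]]) -> Dict[str, List[int]]:
    """Group deleted positions by data file, reusing read_delete_file."""
    grouped: Dict[str, List[int]] = defaultdict(list)
    for delete in read_delete_file(rows):
        grouped[delete.file_path].append(delete.pos)
    return dict(grouped)


raw_rows = [("a.parquet", 0), ("b.parquet", 3), ("a.parquet", 7)]
records = list(read_delete_file(raw_rows))
deletes_by_file = read_deletes(raw_rows)
```

The metadata table can consume the record iterator directly, while the scan path keeps its existing mapping from data file to deleted positions.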
pyiceberg/table/metadata.py
Outdated
@@ -279,6 +279,37 @@ def specs_struct(self) -> StructType:

        return StructType(*nested_fields)

    def spec_struct(self, spec_id: Optional[int] = None) -> StructType:
        """Produce for a spec_id a struct of PartitionSpecs.
- """Produce for a spec_id a struct of PartitionSpecs.
+ """Produce for a spec_id a struct of PartitionSpecs.
pyiceberg/table/inspect.py
Outdated
partition_struct = self.tbl.metadata.spec_struct()
pa_partition_struct = schema_to_pyarrow(partition_struct)
pa_row_struct = schema_to_pyarrow(self.tbl.schema().as_struct())
this is using the table's current metadata; perhaps we want to pass these in based on which snapshot we're using
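A small sketch of what "based on which snapshot" could mean for the row schema: look up the schema the snapshot was written with and only fall back to the table's current schema when the snapshot does not record one. The `Snapshot` dataclass, the `schema_for_snapshot` helper, and the representation of a schema as a list of column names are all assumptions made for illustration, not pyiceberg API.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class Snapshot:  # hypothetical stand-in
    snapshot_id: int
    schema_id: Optional[int] = None  # may be absent on older snapshots


def schema_for_snapshot(
    schemas: Dict[int, List[str]], current_schema_id: int, snapshot: Snapshot
) -> List[str]:
    """Pick the schema recorded on the snapshot, else the table's current one."""
    schema_id = snapshot.schema_id if snapshot.schema_id is not None else current_schema_id
    if schema_id not in schemas:
        raise ValueError(f"Unknown schema id: {schema_id}")
    return schemas[schema_id]


schemas = {0: ["id"], 1: ["id", "name"]}
old_schema = schema_for_snapshot(schemas, 1, Snapshot(10, schema_id=0))
fallback_schema = schema_for_snapshot(schemas, 1, Snapshot(11))
```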
Implements position_deletes metadata table - #1053