Allow writing pa.Table that are either a subset of table schema or in arbitrary order, and support type promotion on write (#921)
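For context, a minimal usage sketch of what this change enables. The catalog, namespace, table, and column names below are illustrative only and not taken from the PR:

```python
# Hypothetical example: the dataframe's columns arrive in a different order than the
# table schema, the optional "comment" column is omitted entirely, and "id" arrives as
# int32 even though the table column is a long.
import pyarrow as pa

from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField, StringType

catalog = load_catalog("default")
table = catalog.create_table(
    "db.events",
    schema=Schema(
        NestedField(field_id=1, name="id", field_type=LongType(), required=False),
        NestedField(field_id=2, name="name", field_type=StringType(), required=False),
        NestedField(field_id=3, name="comment", field_type=StringType(), required=False),
    ),
)

df = pa.table({
    "name": pa.array(["a", "b"], type=pa.string()),
    "id": pa.array([1, 2], type=pa.int32()),
})

# Before this PR the schema check rejected the reordered / subset / narrower-typed
# dataframe; with it, the missing optional column is back-filled with nulls and the
# int32 column is promoted to long on write.
table.append(df)
```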
@@ -120,6 +120,7 @@
     Schema,
     SchemaVisitorPerPrimitiveType,
     SchemaWithPartnerVisitor,
+    _check_schema_compatible,
     pre_order_visit,
     promote,
     prune_columns,
@@ -1450,14 +1451,17 @@ def field_partner(self, partner_struct: Optional[pa.Array], field_id: int, _: st
             except ValueError:
                 return None

-            if isinstance(partner_struct, pa.StructArray):
-                return partner_struct.field(name)
-            elif isinstance(partner_struct, pa.Table):
-                return partner_struct.column(name).combine_chunks()
-            elif isinstance(partner_struct, pa.RecordBatch):
-                return partner_struct.column(name)
-            else:
-                raise ValueError(f"Cannot find {name} in expected partner_struct type {type(partner_struct)}")
+            try:
+                if isinstance(partner_struct, pa.StructArray):
+                    return partner_struct.field(name)
+                elif isinstance(partner_struct, pa.Table):
+                    return partner_struct.column(name).combine_chunks()
+                elif isinstance(partner_struct, pa.RecordBatch):
+                    return partner_struct.column(name)
+                else:
+                    raise ValueError(f"Cannot find {name} in expected partner_struct type {type(partner_struct)}")
+            except KeyError:
+                return None

         return None
@@ -1998,8 +2002,7 @@ def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteT
     )

     def write_parquet(task: WriteTask) -> DataFile:
-        table_schema = task.schema
-
+        table_schema = table_metadata.schema()
         # if schema needs to be transformed, use the transformed schema and adjust the arrow table accordingly
         # otherwise use the original schema
         if (sanitized_schema := sanitize_column_names(table_schema)) != table_schema:
@@ -2011,7 +2014,7 @@ def write_parquet(task: WriteTask) -> DataFile:
         batches = [
             _to_requested_schema(
                 requested_schema=file_schema,
-                file_schema=table_schema,
+                file_schema=task.schema,
                 batch=batch,
                 downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
                 include_field_ids=True,
@@ -2070,47 +2073,30 @@ def bin_pack_arrow_table(tbl: pa.Table, target_file_size: int) -> Iterator[List[
     return bin_packed_record_batches


-def _check_schema_compatible(table_schema: Schema, other_schema: pa.Schema, downcast_ns_timestamp_to_us: bool = False) -> None:
+def _check_pyarrow_schema_compatible(
+    requested_schema: Schema, provided_schema: pa.Schema, downcast_ns_timestamp_to_us: bool = False
+) -> None:
     """
-    Check if the `table_schema` is compatible with `other_schema`.
+    Check if the `requested_schema` is compatible with `provided_schema`.

     Two schemas are considered compatible when they are equal in terms of the Iceberg Schema type.

     Raises:
         ValueError: If the schemas are not compatible.
     """
-    name_mapping = table_schema.name_mapping
+    name_mapping = requested_schema.name_mapping
     try:
-        task_schema = pyarrow_to_schema(
-            other_schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+        provided_schema = pyarrow_to_schema(
+            provided_schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
         )
     except ValueError as e:
-        other_schema = _pyarrow_to_schema_without_ids(other_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
-        additional_names = set(other_schema.column_names) - set(table_schema.column_names)
+        provided_schema = _pyarrow_to_schema_without_ids(provided_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
+        additional_names = provided_schema.field_names - requested_schema.field_names
         raise ValueError(
             f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)."
         ) from e

-    if table_schema.as_struct() != task_schema.as_struct():
-        from rich.console import Console
-        from rich.table import Table as RichTable
-
-        console = Console(record=True)
-
-        rich_table = RichTable(show_header=True, header_style="bold")
-        rich_table.add_column("")
-        rich_table.add_column("Table field")
-        rich_table.add_column("Dataframe field")
-
-        for lhs in table_schema.fields:
-            try:
-                rhs = task_schema.find_field(lhs.field_id)
-                rich_table.add_row("✅" if lhs == rhs else "❌", str(lhs), str(rhs))
-            except ValueError:
-                rich_table.add_row("❌", str(lhs), "Missing")
-
-        console.print(rich_table)
-        raise ValueError(f"Mismatch in fields:\n{console.export_text()}")
+    _check_schema_compatible(requested_schema, provided_schema)


 def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, file_paths: Iterator[str]) -> Iterator[DataFile]:
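As an aside, the `union_by_name` hint in the error message above refers to evolving the table schema before retrying the write. A rough sketch follows; the identifiers are hypothetical, and whether `union_by_name` accepts a pyarrow schema directly (rather than an Iceberg `Schema`) depends on the pyiceberg version in use:

```python
# Hypothetical recovery path for the "PyArrow table contains more columns" error:
# evolve the table schema so the extra dataframe columns exist, then retry the append.
import pyarrow as pa

from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")
table = catalog.load_table("db.events")

df = pa.table({"id": pa.array([1, 2], type=pa.int64()), "new_col": pa.array(["x", "y"])})

with table.update_schema() as update:
    # union_by_name adds any fields missing from the current table schema.
    update.union_by_name(df.schema)

table.append(df)
```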
@@ -2124,7 +2110,7 @@ def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, file_
                 f"Cannot add file {file_path} because it has field IDs. `add_files` only supports addition of files without field_ids"
             )
         schema = table_metadata.schema()
-        _check_schema_compatible(schema, parquet_metadata.schema.to_arrow_schema())
+        _check_pyarrow_schema_compatible(schema, parquet_metadata.schema.to_arrow_schema())

         statistics = data_file_statistics_from_parquet_metadata(
             parquet_metadata=parquet_metadata,
@@ -2205,7 +2191,7 @@ def _dataframe_to_data_files(
     Returns:
         An iterable that supplies datafiles that represent the table.
     """
-    from pyiceberg.table import PropertyUtil, TableProperties, WriteTask
+    from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, PropertyUtil, TableProperties, WriteTask

     counter = counter or itertools.count(0)
     write_uuid = write_uuid or uuid.uuid4()
@@ -2214,13 +2200,16 @@ def _dataframe_to_data_files(
         property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
         default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
     )
+    name_mapping = table_metadata.schema().name_mapping
+    downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
+    task_schema = pyarrow_to_schema(df.schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)

Review thread on the new `name_mapping` line:
- Shall we use table's […]
- Good question - I'm not sure actually. When we are writing a dataframe into an Iceberg table, I think we are making the assumption that its names match the current names of the Iceberg table, so I think using the […] I'm curious to hear what others' thoughts are, and whether anyone has a workflow in mind that would benefit from this change!
- Sounds great! I initially raised this because we're assigning field IDs for the input dataframe, which aligns with the general purpose of name mapping - to provide fallback IDs. On second thought, schema.name-mapping.default is more for the read side, so using it here may silently introduce unwanted side effects during write. I agree, let's hold off on this for a while and wait for more discussions.
- Sounds great 👍 thank you for the review!

Review thread on the new `task_schema` line:
- This change is necessary to ensure that we are comparing the Schema that matches the arrow table's schema versus the Table Schema in order to properly invoke […]
- Great catch 👍

     if table_metadata.spec().is_unpartitioned():
         yield from write_file(
             io=io,
             table_metadata=table_metadata,
             tasks=iter([
-                WriteTask(write_uuid=write_uuid, task_id=next(counter), record_batches=batches, schema=table_metadata.schema())
+                WriteTask(write_uuid=write_uuid, task_id=next(counter), record_batches=batches, schema=task_schema)
                 for batches in bin_pack_arrow_table(df, target_file_size)
             ]),
         )

@@ -2235,7 +2224,7 @@ def _dataframe_to_data_files(
                     task_id=next(counter),
                     record_batches=batches,
                     partition_key=partition.partition_key,
-                    schema=table_metadata.schema(),
+                    schema=task_schema,
                 )
                 for partition in partitions
                 for batches in bin_pack_arrow_table(partition.arrow_table_partition, target_file_size)
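To make the `task_schema` derivation above concrete, here is a small hedged sketch (field names are illustrative):

```python
# Illustrative only: field IDs for the incoming arrow data are taken from the table's
# name mapping, so the resulting Iceberg schema mirrors the dataframe's actual types
# (e.g. int32) rather than the table's declared types.
import pyarrow as pa

from pyiceberg.io.pyarrow import pyarrow_to_schema
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField

table_schema = Schema(NestedField(field_id=1, name="id", field_type=LongType(), required=True))
df_schema = pa.schema([pa.field("id", pa.int32(), nullable=False)])

task_schema = pyarrow_to_schema(df_schema, name_mapping=table_schema.name_mapping)
print(task_schema)  # field 1 "id" typed as int, matching the data rather than the table
```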
@@ -324,6 +324,11 @@ def field_ids(self) -> Set[int]:
         """Return the IDs of the current schema."""
         return set(self._name_to_id.values())

+    @property
+    def field_names(self) -> Set[str]:
+        """Return the names of the current schema."""
+        return set(self._name_to_id.keys())
+
     def _validate_identifier_field(self, field_id: int) -> None:
         """Validate that the field with the given ID is a valid identifier field.
@@ -1616,3 +1621,145 @@ def _(file_type: FixedType, read_type: IcebergType) -> IcebergType:
         return read_type
     else:
         raise ResolveError(f"Cannot promote {file_type} to {read_type}")
+
+
+def _check_schema_compatible(requested_schema: Schema, provided_schema: Schema) -> None:
+    """
+    Check if the `provided_schema` is compatible with `requested_schema`.
+
+    Both schemas must have valid IDs and share the same ID for the same field names.
+
+    Two schemas are considered compatible when:
+    1. All `required` fields in `requested_schema` are present and are also `required` in the `provided_schema`
+    2. Field Types are consistent for fields that are present in both schemas. I.e. the field type
+       in the `provided_schema` can be promoted to the field type of the same field ID in `requested_schema`
+
+    Raises:
+        ValueError: If the schemas are not compatible.
+    """
+    visit(requested_schema, _SchemaCompatibilityVisitor(provided_schema))
+
+    # from rich.console import Console
+    # from rich.table import Table as RichTable
+
+    # console = Console(record=True)
+
+    # rich_table = RichTable(show_header=True, header_style="bold")
+    # rich_table.add_column("")
+    # rich_table.add_column("Table field")
+    # rich_table.add_column("Dataframe field")
+
+    # is_compatible = True
+
+    # for field_id in requested_schema.field_ids:
+    #     lhs = requested_schema.find_field(field_id)
+    #     try:
+    #         rhs = provided_schema.find_field(field_id)
+    #     except ValueError:
+    #         if lhs.required:
+    #             rich_table.add_row("❌", str(lhs), "Missing")
+    #             is_compatible = False
+    #         else:
+    #             rich_table.add_row("✅", str(lhs), "Missing")
+    #         continue
+
+    #     if lhs.required and not rhs.required:
+    #         rich_table.add_row("❌", str(lhs), "Missing")
+    #         is_compatible = False
+
+    #     if lhs.field_type == rhs.field_type:
+    #         rich_table.add_row("✅", str(lhs), str(rhs))
+    #         continue
+    #     elif any(
+    #         (isinstance(lhs.field_type, container_type) and isinstance(rhs.field_type, container_type))
+    #         for container_type in {StructType, MapType, ListType}
+    #     ):
+    #         rich_table.add_row("✅", str(lhs), str(rhs))
+    #         continue
+    #     else:
+    #         try:
+    #             promote(rhs.field_type, lhs.field_type)
+    #             rich_table.add_row("✅", str(lhs), str(rhs))
+    #         except ResolveError:
+    #             rich_table.add_row("❌", str(lhs), str(rhs))
+    #             is_compatible = False
+
+    # if not is_compatible:
+    #     console.print(rich_table)
+    #     raise ValueError(f"Mismatch in fields:\n{console.export_text()}")
+
+
+class _SchemaCompatibilityVisitor(SchemaVisitor[bool]):
+    provided_schema: Schema
+
+    def __init__(self, provided_schema: Schema):
+        from rich.console import Console
+        from rich.table import Table as RichTable
+
+        self.provided_schema = provided_schema
+        self.rich_table = RichTable(show_header=True, header_style="bold")
+        self.rich_table.add_column("")
+        self.rich_table.add_column("Table field")
+        self.rich_table.add_column("Dataframe field")
+        self.console = Console(record=True)
+
+    def _is_field_compatible(self, lhs: NestedField) -> bool:
+        # Check required field exists as required field first
+        try:
+            rhs = self.provided_schema.find_field(lhs.field_id)
+        except ValueError:
+            if lhs.required:
+                self.rich_table.add_row("❌", str(lhs), "Missing")
+                return False
+            else:
+                self.rich_table.add_row("✅", str(lhs), "Missing")
+                return True
+
+        if lhs.required and not rhs.required:
+            self.rich_table.add_row("❌", str(lhs), "Missing")
+            return False
+
+        # Check type compatibility
+        if lhs.field_type == rhs.field_type:
+            self.rich_table.add_row("✅", str(lhs), str(rhs))
+            return True
+        elif any(
+            (isinstance(lhs.field_type, container_type) and isinstance(rhs.field_type, container_type))
+            for container_type in {StructType, MapType, ListType}
+        ):
+            self.rich_table.add_row("✅", str(lhs), str(rhs))
+            return True
+        else:
+            try:
+                promote(rhs.field_type, lhs.field_type)
+                self.rich_table.add_row("✅", str(lhs), str(rhs))
+                return True
+            except ResolveError:
+                self.rich_table.add_row("❌", str(lhs), str(rhs))
+                return False
+
+    def schema(self, schema: Schema, struct_result: bool) -> bool:
+        if not struct_result:
+            self.console.print(self.rich_table)
+            raise ValueError(f"Mismatch in fields:\n{self.console.export_text()}")
+        return struct_result
+
+    def struct(self, struct: StructType, field_results: List[bool]) -> bool:
+        return all(field_results)
+
+    def field(self, field: NestedField, field_result: bool) -> bool:
+        return all([self._is_field_compatible(field), field_result])
+
+    def list(self, list_type: ListType, element_result: bool) -> bool:
+        return element_result and self._is_field_compatible(list_type.element_field)
+
+    def map(self, map_type: MapType, key_result: bool, value_result: bool) -> bool:
+        return all([
+            self._is_field_compatible(map_type.key_field),
+            self._is_field_compatible(map_type.value_field),
+            key_result,
+            value_result,
+        ])
+
+    def primitive(self, primitive: PrimitiveType) -> bool:
+        return True

Review thread on the `_SchemaCompatibilityVisitor` class:
- Hi @Fokko here's my take on using a Visitor as we've discussed - unfortunately the RichTable doesn't print in order with the nested fields because a Visitor doesn't traverse in pre-order sequence. Should we use a PreOrderSchemaVisitor here instead to ensure that the fields are logged in order of IDs?
- Yes, I think pre-order makes sense here 👍

Review thread on the `promote(rhs.field_type, lhs.field_type)` call:
- This succeeds for […]
- Do you have a test to reproduce this? This is interesting since for Python […]
- I'll write up a test 👍 The comparison isn't between Python types, but between Parquet physical types: https://github.com/apache/iceberg-python/blob/main/pyiceberg/io/pyarrow.py#L1503-L1507
- Maybe we could get away with just removing this check, since we are running a comprehensive type compatibility check already? (iceberg-python/pyiceberg/io/pyarrow.py, lines 1550 to 1554 in e27cd90)
- Test added that demonstrates this issue: https://github.com/apache/iceberg-python/pull/921/files#diff-8ca7e967a2c2ef394c75f707879f1b7e6d09226c321643140b9325f742041d67R669-R713
- Confirmed that this would work @Fokko let me know if we are good to move forward with this change!
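A hedged example of how the new check is expected to behave, with made-up field names for illustration:

```python
# Sketch only: an int field can be promoted to the requested long field, and a missing
# optional field is tolerated, so this call should return without raising. A missing
# required field, or a type that cannot be promoted, raises ValueError with the rich
# comparison table shown in the tests below.
from pyiceberg.schema import Schema, _check_schema_compatible
from pyiceberg.types import IntegerType, LongType, NestedField, StringType

requested = Schema(
    NestedField(field_id=1, name="id", field_type=LongType(), required=True),
    NestedField(field_id=2, name="comment", field_type=StringType(), required=False),
)
provided = Schema(
    NestedField(field_id=1, name="id", field_type=IntegerType(), required=True),
)

_check_schema_compatible(requested, provided)  # passes: int -> long promotion, optional field missing
```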
@@ -501,14 +501,11 @@ def test_add_files_fails_on_schema_mismatch(spark: SparkSession, session_catalog
     )

     expected = """Mismatch in fields:
-┏━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━┓
-┃    ┃ Table field              ┃ Dataframe field          ┃
-┡━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━┩
-│ ✅ │ 1: foo: optional boolean │ 1: foo: optional boolean │
-│ ✅ │ 2: bar: optional string  │ 2: bar: optional string  │
-│ ❌ │ 3: baz: optional int     │ 3: baz: optional string  │
-│ ✅ │ 4: qux: optional date    │ 4: qux: optional date    │
-└────┴──────────────────────────┴──────────────────────────┘
+┏━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━┓
+┃ Field Name ┃ Category ┃ Table field  ┃ Dataframe field ┃
+┡━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━┩
+│ baz        │ Type     │ optional int │ optional string │
+└────────────┴──────────┴──────────────┴─────────────────┘
 """

     with pytest.raises(ValueError, match=expected):

Review thread on the new expected output:
- Is it just me, or is the left easier to read? 😅
- I opted for this approach because I wanted to group the Extra Fields in the dataframe also into the table. But if we are taking the approach of using the […]
- I like the new way since it tells me exactly which field to focus on and the reason it's not compatible.
Review thread on the `field_partner` change:
- This change is necessary to support writing dataframes / record batches with a subset of the schema. Otherwise, the ArrowAccessor throws a KeyError. This way, we return None and the ArrowProjectionVisitor is responsible for checking if the field is nullable, so it can be filled in with a null array.
- Is this change responsible for schema projection / writing a subset of the schema? Do you mind expanding on the mechanism behind how this works? I'm curious.
- Yes, that's right - the ArrowProjectionVisitor is responsible for detecting that the field_partner is None and then checking if the table field is also optional before filling it in with a null array. This change is necessary so that the ArrowAccessor doesn't throw an exception if the field can't be found in the arrow component, and it enables ArrowProjectionVisitor to make use of a code pathway it wasn't able to use before: iceberg-python/pyiceberg/io/pyarrow.py, lines 1388 to 1395 in b11cdb5.
- Above we have the file_schema that should correspond with the partner_struct. I expect that when looking up the field-id, it should already return None.
- Yeah, as I pointed out in this comment: #921 (comment), I think write_parquet is using the Table Schema instead of the Schema corresponding to the data types of the PyArrow construct. I will take that to mean that this isn't intended, and that making sure we use the Schema corresponding to the data types of the PyArrow construct is what we intend to do here.
- Thanks for the context. This isn't intended; the schema should align with the data. I checked against the last commit, and it doesn't throw the KeyError anymore because of your fix. Thanks 👍
- Thank you for the suggestion - I've removed this try/except block in the latest update.
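To summarize the mechanism discussed in this thread, here is a simplified, hypothetical sketch (not the actual ArrowProjectionVisitor code) of the null back-fill pattern that becomes reachable once the accessor returns None instead of raising KeyError:

```python
# Hypothetical simplification of the projection behavior described above.
import pyarrow as pa


def project_column(batch: pa.RecordBatch, name: str, arrow_type: pa.DataType, required: bool) -> pa.Array:
    """Return the named column, back-filling an absent optional column with nulls."""
    if name in batch.schema.names:
        return batch.column(name)
    if required:
        # A required field with no partner column is a real error.
        raise ValueError(f"Missing required field: {name}")
    # Optional field missing from the written dataframe: fill with nulls.
    return pa.nulls(batch.num_rows, type=arrow_type)


batch = pa.RecordBatch.from_pydict({"id": pa.array([1, 2], type=pa.int64())})
print(project_column(batch, "comment", pa.string(), required=False))  # two nulls
```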