
feat: Support metadata table "Entries" #863

Open
wants to merge 3 commits into main from wr/metadata-entries

Conversation


@rshkv rshkv commented Jan 1, 2025

Re #823. This adds support for the Manifest Entries metadata table (docs), which lists entries in the current snapshot's manifest files.

The code is structured with nested builders of StructArray. The hierarchy is roughly as follows (new classes in bold):

  • EntriesTable
    • status, snapshot_id, sequence_number, file_sequence_number
    • data_file built by the DataFileStructBuilder, which has:
      • file_path, file_format, record_count, etc.
      • partition is a struct of partition values built by PartitionValuesStructBuilder
        • has for each partition column an AnyArrayBuilder which I had to introduce to do dynamic ArrayBuilder casting based on the column type
    • readable_metrics is built by the ReadableMetricsStructBuilder
      • for each column, has a PerColumnReadableMetricsBuilder
        • contains column_size, value_count
        • has an upper_bound and lower_bound struct which use AnyArrayBuilder to preserve the type

This PR ended up being quite verbose because arrow-rs is strict about declaring generic types of array builders at compile time. Unlike Python, which supports entries in ~100 lines, we can't shove a dict into a StructBuilder. Ideally, we could build StructArray row-by-row and write logic to convert manifest entries to rows.
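For orientation, here is a condensed sketch of those builders. Field lists are abbreviated, and beyond the fields quoted later in this review the exact shapes are assumptions, not the PR's code:

// Condensed sketch of the nested builders (abbreviated, shapes assumed).
struct PartitionValuesStructBuilder {
    // One dynamically-typed builder per partition column.
    builders: Vec<AnyArrayBuilder>,
}

struct DataFileStructBuilder {
    file_path: StringBuilder,
    file_format: StringBuilder,
    partition: PartitionValuesStructBuilder,
    record_count: Int64Builder,
    // ... remaining data_file columns (file_size_in_bytes, column_sizes, ...)
}

struct EntriesTable<'a> {
    table: &'a Table,
}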

Reference implementations:


@Xuanwo Xuanwo left a comment


Hi, @rshkv, sorry for making you rebase the PRs again. There are multiple open PRs here, and I spent some time figuring out the best API. I believe we are now heading in the right direction.

We can focus on merging one PR first and then updating the other PRs to avoid additional work.

}

/// Snapshots table.
pub struct SnapshotsTable<'a> {
Member

Hi, I think we can simply hold a Table here, allowing us to remove the duplicate APIs exposed at the MetadataTable level and make it a straightforward wrapper instead.
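A minimal sketch of that suggestion, assuming the wrapper keeps the existing lifetime (names and shape assumed, not the merged API):

// Hypothetical shape: SnapshotsTable simply holds the table it describes,
// so MetadataTable needn't duplicate any of Table's APIs.
pub struct SnapshotsTable<'a> {
    table: &'a Table,
}

impl<'a> SnapshotsTable<'a> {
    pub fn new(table: &'a Table) -> Self {
        Self { table }
    }
}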

Comment on lines 829 to 849
/// A helper wrapping [ArrayBuilder] for building arrays without declaring the inner type at
/// compile-time when types are determined dynamically (e.g. based on some column type).
/// A [DataType] is given at construction time which is used to later downcast the inner array
/// and provided values.
pub(crate) struct AnyArrayBuilder {
data_type: DataType,
inner: Box<dyn ArrayBuilder>,
}
Contributor Author

I appreciate this is quite verbose and I wish we didn't have to do all the pattern matching below. If you can think of another way to do this, let me know.
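For context, a plausible sketch of the downcasting involved; the helper's name mirrors the builder::<T>() calls quoted later, but its signature and error handling are assumptions, not the PR's exact code:

use arrow_array::builder::ArrayBuilder;
use arrow_schema::ArrowError;

impl AnyArrayBuilder {
    // Recover the boxed `dyn ArrayBuilder` as a concrete builder type.
    // ArrayBuilder has Any as a supertrait, which makes this downcast possible.
    fn builder<T: ArrayBuilder>(&mut self) -> Result<&mut T, ArrowError> {
        self.inner.as_any_mut().downcast_mut::<T>().ok_or_else(|| {
            ArrowError::ComputeError(format!(
                "builder does not match declared type {}",
                self.data_type
            ))
        })
    }
}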

Comment on lines +969 to +1178
/// File sequence number.
#[inline]
pub fn file_sequence_number(&self) -> Option<i64> {
self.file_sequence_number
}
Contributor Author

The file_sequence_number column relies on this value, but there was previously no way to get at ManifestEntry::file_sequence_number.

@rshkv rshkv force-pushed the wr/metadata-entries branch 2 times, most recently from eecf1f8 to f82b7ff on January 2, 2025 at 22:08
let (array, is_scalar) = value.get();
assert!(is_scalar, "Can only append scalar datum");

match array.data_type() {
Contributor Author

This list is exhaustive with respect to the ArrowSchemaVisitor::primitive function above. I.e., every type produced there is covered here.

Comment on lines +880 to +898
DataType::Timestamp(TimeUnit::Microsecond, _) => self
.builder::<TimestampMicrosecondBuilder>()?
.append_value(array.as_primitive::<TimestampMicrosecondType>().value(0)),
DataType::Timestamp(TimeUnit::Nanosecond, _) => self
.builder::<TimestampNanosecondBuilder>()?
.append_value(array.as_primitive::<TimestampNanosecondType>().value(0)),
Contributor Author

I understand it's correct to ignore the timezone here because it isn't captured in the builder.

Comment on lines +1130 to +1284
.column_sizes(HashMap::from([(1, 1u64), (2, 1u64)]))
.value_counts(HashMap::from([(1, 2u64), (2, 2u64)]))
.null_value_counts(HashMap::from([(1, 3u64), (2, 3u64)]))
.nan_value_counts(HashMap::from([(1, 4u64), (2, 4u64)]))
Contributor Author

This isn't based on the test data, but I wanted to have these values reflected in the tests.

Comment on lines +1094 to +1280
.lower_bounds(HashMap::from([
(1, Datum::long(1)),
(2, Datum::long(2)),
(3, Datum::long(3)),
(4, Datum::string("Apache")),
(5, Datum::double(100)),
(6, Datum::int(100)),
(7, Datum::long(100)),
(8, Datum::bool(false)),
(9, Datum::float(100.0)),
// decimal values are not supported by schema::get_arrow_datum
// (10, Datum::decimal(Decimal(123, 2))),
(11, Datum::date(0)),
(12, Datum::timestamp_micros(0)),
(13, Datum::timestamptz_micros(0)),
// ns timestamps, uuid, fixed, binary are currently not
// supported in schema::get_arrow_datum
]))
.upper_bounds(HashMap::from([
(1, Datum::long(1)),
(2, Datum::long(5)),
(3, Datum::long(4)),
(4, Datum::string("Iceberg")),
(5, Datum::double(200)),
(6, Datum::int(200)),
(7, Datum::long(200)),
(8, Datum::bool(true)),
(9, Datum::float(200.0)),
// decimal values are not supported by schema::get_arrow_datum
// (10, Datum::decimal(Decimal(123, 2))),
(11, Datum::date(0)),
(12, Datum::timestamp_micros(0)),
(13, Datum::timestamptz_micros(0)),
// ns timestamps, uuid, fixed, binary are currently not
// supported in schema::get_arrow_datum
]))
Contributor Author

Adding these so we cover those types in the lower and upper bounds.

I'm trying to limit the changes I'm making because it's already a lot. My preference would be to cover all types as partition columns as well.

Comment on lines +1127 to +1279
// ns timestamps, uuid, fixed, binary are currently not
// supported in schema::get_arrow_datum
Contributor Author

I could add support, but thought that might be best left for another PR.

- pub fn metadata_table(self) -> MetadataTable {
+ pub fn metadata_table(&self) -> MetadataTable<'_> {
Contributor Author

Addressing this comment: #822 (comment). I prefer this, but we don't need to do it here.

/// Get the schema for the manifest entries table.
pub fn schema(&self) -> Schema {
Schema::new(vec![
Field::new("status", DataType::Int32, false),
Contributor Author

This is populated with ManifestStatus enum values.

In Java, the status column is i32 (here), but in Python it is u8.

My preference would be u8, but I'm treating the Java implementation as authoritative.

Comment on lines 490 to 489
self.file_size_in_bytes
.append_value(data_file.file_size_in_bytes() as i64);
Contributor Author

The casting is slightly annoying given we're dealing with non-negative types. But the Python and Java implementations use i64.

file_path: StringBuilder::new(),
file_format: StringBuilder::new(),
partition: PartitionValuesStructBuilder::new(table_metadata),
record_count: Int64Builder::new(),
Contributor Author

The manifests table merged in #861 prefers PrimitiveBuilder::new(), which works because Int64Builder is just PrimitiveBuilder<Int64Type>.

I prefer writing Int64Builder here to be explicit about the type, but I'm happy to change it.
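To illustrate the equivalence, a standalone snippet (not the PR's code):

use arrow_array::builder::{Int64Builder, PrimitiveBuilder};
use arrow_array::types::Int64Type;

fn main() {
    // Int64Builder is a type alias for PrimitiveBuilder<Int64Type>,
    // so both spellings construct the same builder.
    let _explicit: Int64Builder = Int64Builder::new();
    let _generic: PrimitiveBuilder<Int64Type> = PrimitiveBuilder::new();
}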

@rshkv rshkv marked this pull request as ready for review on January 2, 2025 at 22:13

rshkv commented Jan 2, 2025

Thank you, @Xuanwo. Rebased and ready for review.


@liurenjie1024 liurenjie1024 left a comment


Thanks @rshkv for this contribution. I have finished a first round of review and left some concerns about the current API design.


impl<'a> EntriesTable<'a> {
/// Get the schema for the manifest entries table.
pub fn schema(&self) -> Schema {
Contributor

Why return an Arrow schema rather than an Iceberg schema here?

Contributor Author

Following the existing API, but happy to update. I understand the idea in #822 (comment) was for engines to fetch the schema before having to fetch data.

Contributor Author

@liurenjie1024, would you mind saying more? I'm happy to go with either, but I'm not sure why.

Even if there's no consumer of schema() currently, I follow @xxchan's argument that the reader likely wants an Arrow schema. Another benefit is that we can use the schema ourselves when constructing scans. I'm not sure what a consumer would do with an Iceberg schema (except maybe convert it to Arrow).

As an alternative to having an Arrow or Iceberg schema, we could also not have a public schema()?

/// For reference, see the Java implementation of [`DataFile`][1].
///
/// [1]: https://github.com/apache/iceberg/blob/apache-iceberg-1.7.1/api/src/main/java/org/apache/iceberg/DataFile.java
struct DataFileStructBuilder<'a> {
Contributor

This would not be required if we used the Iceberg schema.

Contributor Author

Can you say more? I suppose we still need to construct those StructArray instances?

@rshkv rshkv force-pushed the wr/metadata-entries branch 2 times, most recently from ec127ec to 28d92ad on January 8, 2025 at 13:25
Comment on lines +133 to +138
+--------------------------+---------------------+---------------------+-----------+---------+
| committed_at | snapshot_id | parent_id | operation | summary |
+--------------------------+---------------------+---------------------+-----------+---------+
| 2018-01-04T21:22:35.770Z | 3051729675574597004 | | append | {} |
| 2019-04-12T20:29:15.770Z | 3055729675574597004 | 3051729675574597004 | append | {} |
+--------------------------+---------------------+---------------------+-----------+---------+"#]],
Contributor Author

I think checking the rendered table is easier to read and confirm by eye. We lose type information, but we assert types separately.

Contributor

👍 This looks great.

Field { name: "sequence_number", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "file_sequence_number", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "data_file", data_type: Struct([Field { name: "content", data_type: Int8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "file_path", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "file_format", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "partition", data_type: Struct([Field { name: "x", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "record_count", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "file_size_in_bytes", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "column_sizes", data_type: Map(Field { name: "entries", data_type: Struct([Field { name: "keys", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "values", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value_counts", data_type: Map(Field { name: "entries", data_type: Struct([Field { name: "keys", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "values", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "null_value_counts", data_type: Map(Field { name: "entries", data_type: Struct([Field { name: "keys", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "values", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "nan_value_counts", data_type: Map(Field { name: "entries", data_type: Struct([Field { name: "keys", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "values", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bounds", data_type: Map(Field { name: "entries", data_type: Struct([Field { name: "keys", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "values", data_type: Binary, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bounds", data_type: Map(Field { name: "entries", data_type: Struct([Field { name: "keys", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "values", data_type: Binary, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }, Field { name: "key_metadata", data_type: Binary, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "split_offsets", data_type: List(Field { name: "item", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "equality_ids", data_type: List(Field { name: "item", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "sort_order_id", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "readable_metrics", data_type: Struct([Field { name: "x", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "y", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "z", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "a", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "dbl", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, 
Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "i32", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "i64", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "bool", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "float", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Float32, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "decimal", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Decimal128(3, 2), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Decimal128(3, 2), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "date", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Date32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Date32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "timestamp", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Timestamp(Microsecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Timestamp(Microsecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "timestamptz", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Timestamp(Microsecond, Some("+00:00")), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Timestamp(Microsecond, Some("+00:00")), nullable: true, dict_id: 0, dict_is_ordered: false, 
metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "timestampns", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "timestamptzns", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Timestamp(Nanosecond, Some("+00:00")), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Timestamp(Nanosecond, Some("+00:00")), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "binary", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: LargeBinary, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: LargeBinary, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]],
Contributor Author

This looks much worse here than it does locally:

[screenshot of the expect output as rendered locally]

If there's a way to pretty-print schemas, let me know - I couldn't find one.

Contributor

I don't think one exists. We can write a separate function (or newtype) to pretty-print the schema.
I once did something similar in RisingWave:

The code:

Basically, we ignore noise (e.g., unnecessary fields, field names) to make it more concise and readable.
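A minimal sketch of such a newtype (hypothetical; the printed fields and format are assumptions):

use std::fmt;
use arrow_schema::Field;

// Prints only name, type, and nullability, dropping the
// dict_id/dict_is_ordered/metadata noise from Field's Debug output.
struct ConciseField<'a>(&'a Field);

impl fmt::Display for ConciseField<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "{}: {}{}",
            self.0.name(),
            self.0.data_type(),
            if self.0.is_nullable() { " (nullable)" } else { "" }
        )
    }
}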

Contributor Author

Yeah, that'd be nice. E.g., we don't need to render dict_id and dict_is_ordered for every field.

I'd rather not introduce that in an already large PR (unless you'd prefer I do). If you think I should, maybe we can do the updates to check_record_batches (pretty-printing batches, ignoring struct fields, and the new schema pretty-print) in a separate PR?

Contributor Author

Don't want to block this PR basically.

Contributor

maybe we can do the updates to check_record_batches (pretty-printing batches, ignoring struct fields, and the new schema pretty-print) in a separate PR?

+1


rshkv commented Jan 8, 2025

I've rebased on #870 and #872 to address the following:

  • The entries table now lives in a separate entries.rs file.
  • Batches for manifest files are now computed asynchronously.

I haven't addressed @liurenjie1024's point about schema() returning an Iceberg schema instead of an Arrow one. We have issue #868 and PR #871, but I'm not sure that's what we want generally, or whether you want it to happen before this PR.

Otherwise this is ready for another review.

@liurenjie1024

I've rebased on #870 and #872 to address the following:

  • The entries table now lives in a separate entries.rs file.
  • Batches for manifest files are now computed asynchronously.

I haven't addressed @liurenjie1024's point about schema() returning an Iceberg schema instead of an Arrow one. We have issue #868 and PR #871, but I'm not sure that's what we want generally, or whether you want it to happen before this PR.

Otherwise this is ready for another review.

Thanks @rshkv for the contribution; let's continue the discussion about the schema in #868.


rshkv commented Jan 24, 2025

I'm still working on this and will continue to. I find it quite difficult to get Arrow and Iceberg to agree on types because of field ids.

The Iceberg schema requires that all fields have a field id, which in the converted Arrow schema becomes field metadata. But when constructing a StructArray or RecordBatch, Arrow checks that the schema matches that of the data (e.g.). And they tend not to match, because the schema has field ids and the data does not.

E.g., with MapBuilder, we have with_values_field to pass in a field with a field id in its metadata. However, there is no with_keys_field equivalent. Yet the key field in the Iceberg schema must have a field id.
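To make the asymmetry concrete, a standalone sketch (the field name and id value are illustrative, not the PR's code):

use std::collections::HashMap;
use arrow_array::builder::{Int32Builder, Int64Builder, MapBuilder};
use arrow_schema::{DataType, Field};

fn main() {
    // The value field can carry a field id via metadata...
    let values_field = Field::new("value", DataType::Int64, true).with_metadata(
        HashMap::from([("PARQUET:field_id".to_string(), "118".to_string())]),
    );
    let _builder = MapBuilder::new(None, Int32Builder::new(), Int64Builder::new())
        .with_values_field(values_field);
    // ...but there is no with_keys_field counterpart, so the key field's
    // metadata (and hence its Iceberg field id) cannot be set the same way.
}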

@liurenjie1024, I'd like to see this finished, and some clarity on the designs you'd merge would help. Here are some questions:

  • We can make schema() return an Iceberg schema with field ids, but how important is it that a returned RecordBatch has field ids in its metadata? Are we ok with not having field ids on RecordBatch#schema but only on MetadataTable#schema?
  • If we do need record batch types to have those field ids, we might need changes in arrow-rs to express something like "a RecordBatch or StructArray may have fields with metadata, but the respective types of the underlying ArrayData don't need to match metadata".

Let me know what you think or if I'm not being clear.


@xxchan xxchan left a comment


Just noticed that in the Java implementation, the schema has field ids, defined like this:

https://github.com/apache/iceberg/blob/ec269ee3ec0de4184eb536a6ef4f3523dc91332a/api/src/main/java/org/apache/iceberg/DataFile.java#L123-L146

(BTW, personally I don't think that's very necessary for compute engines, though.)


xxchan commented Feb 2, 2025

But when constructing a StructArray or RecordBatch, Arrow checks that the schema matches that of the data (e.g.). And they tend not to match, because the schema has field ids and the data does not.

A little confused about this point. What causes them to mismatch? IIUC, if we use the correct schema to construct the Array/RecordBatch, there should be no mismatch.


rshkv commented Feb 2, 2025

@xxchan, that's right. The problem is constructing Array/RecordBatch with the right schema in the first place. MapBuilder doesn't let you configure the key field though, so the keys array won't have the field id.


xxchan commented Feb 2, 2025

MapBuilder doesn't let you configure the key field though, so the keys array won't have the field id.

Not sure if I understand correctly, but this sounds like just a limitation of arrow-rs that could be solved easily by adding some new APIs, rather than a fundamental difficulty.

BTW maybe it's easier to discuss if you can push some sample code.


Fokko commented Feb 3, 2025

We can make schema() return an Iceberg schema with field id, but how important is it that a returned RecordBatch has field ids in metadata? Are we ok with not having field ids on RecordBatch#schema but only on MetadataTable#schema?

Yes, that's alright. The field-IDs are internal to Iceberg, and how an engine resolves fields depends on the environment: SQL is position-based, while in dataframes you can also look up by name.


Fokko commented Feb 3, 2025

Just noticed that in the Java implementation, the schema has field ids, defined like this: #863 (review)

We don't have to expose the field-IDs internally, as long as we resolve the fields using the ID. The name might change over time (see apache/iceberg#5338), but if you read the data as an Iceberg table and we resolve the fields by ID, then everything will be correct.

@rshkv rshkv force-pushed the wr/metadata-entries branch 4 times, most recently from e189347 to 849ac07 on February 9, 2025 at 01:07
@rshkv rshkv force-pushed the wr/metadata-entries branch from 849ac07 to 05159f8 on February 9, 2025 at 01:15

rshkv commented Feb 9, 2025

BTW maybe it's easier to discuss if you can push some sample code.

Pushed my work-in-progress. Currently failing here:

Incorrect datatype for StructArray field "column_sizes", expected

Map(Field {
  name: "key_value",
  data_type: Struct([
    Field { name: "key", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "117"} },
    Field { name: "value", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "118"} }]),
  nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false)

got

Map(Field {
  name: "key_value",
  data_type: Struct([
    Field { name: "key", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
    Field { name: "value", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }]),
  nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false)

Not sure if I understand it correctly, this sounds like just some limitations of arrow-rs and can be solved easily by adding some new APIs, instead of some fundamental difficulty.

That's right. Going to open a PR with arrow-rs.

@rshkv rshkv force-pushed the wr/metadata-entries branch from d03735e to 4efacd4 on February 9, 2025 at 21:14