Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support metadata table "Entries" #863

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
633 changes: 331 additions & 302 deletions Cargo.lock

Large diffs are not rendered by default.

37 changes: 28 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ rust-version = "1.77.1"
anyhow = "1.0.72"
apache-avro = "0.17"
array-init = "2"
arrow-arith = { version = "53.3.0" }
arrow-array = { version = "53.4.0" }
arrow-cast = { version = "53.4.0" }
arrow-ord = { version = "53.4.0" }
arrow-schema = { version = "53.4.0" }
arrow-select = { version = "53.4.0" }
arrow-string = { version = "53.4.0" }
arrow-arith = { version = "54.1.0" }
arrow-array = { version = "54.1.0" }
arrow-cast = { version = "54.1.0" }
arrow-ord = { version = "54.1.0" }
arrow-schema = { version = "54.1.0" }
arrow-select = { version = "54.1.0" }
arrow-string = { version = "54.1.0" }
async-stream = "0.3.5"
async-trait = "0.1.86"
async-std = "1.12"
Expand All @@ -58,7 +58,7 @@ bitvec = "1.0.1"
bytes = "1.6"
chrono = "0.4.38"
ctor = "0.2.8"
datafusion = "44"
datafusion = "45.0.0"
derive_builder = "0.20"
either = "1"
env_logger = "0.11.0"
Expand All @@ -77,7 +77,7 @@ num-bigint = "0.4.6"
once_cell = "1.19"
opendal = "0.51.1"
ordered-float = "4"
parquet = "53.4.0"
parquet = "54.1.0"
paste = "1.0.15"
pilota = "0.11.2"
pretty_assertions = "1.4"
Expand All @@ -103,3 +103,22 @@ hive_metastore = "0.1"
tera = "1"
zstd = "0.13.2"
expect-test = "1"

# Temporary workaround: patch all arrow crates to a fork carrying the MapBuilder
# key-field fix until the upstream PR merges and is released.
# https://github.com/apache/arrow-rs/pull/7101
# TODO(rshkv): Remove this [patch.crates-io] section once that PR merges.
[patch.crates-io]
arrow-arith = { git = "https://github.com/rshkv/arrow-rs", branch = "wr/map-builder-with-key-field" }
arrow-array = { git = "https://github.com/rshkv/arrow-rs", branch = "wr/map-builder-with-key-field" }
arrow-buffer = { git = "https://github.com/rshkv/arrow-rs", branch = "wr/map-builder-with-key-field" }
arrow-cast = { git = "https://github.com/rshkv/arrow-rs", branch = "wr/map-builder-with-key-field" }
arrow-csv = { git = "https://github.com/rshkv/arrow-rs", branch = "wr/map-builder-with-key-field" }
arrow-data = { git = "https://github.com/rshkv/arrow-rs", branch = "wr/map-builder-with-key-field" }
arrow-ipc = { git = "https://github.com/rshkv/arrow-rs", branch = "wr/map-builder-with-key-field" }
arrow-json = { git = "https://github.com/rshkv/arrow-rs", branch = "wr/map-builder-with-key-field" }
arrow-ord = { git = "https://github.com/rshkv/arrow-rs", branch = "wr/map-builder-with-key-field" }
arrow-row = { git = "https://github.com/rshkv/arrow-rs", branch = "wr/map-builder-with-key-field" }
arrow-schema = { git = "https://github.com/rshkv/arrow-rs", branch = "wr/map-builder-with-key-field" }
arrow-select = { git = "https://github.com/rshkv/arrow-rs", branch = "wr/map-builder-with-key-field" }
arrow-string = { git = "https://github.com/rshkv/arrow-rs", branch = "wr/map-builder-with-key-field" }
parquet = { git = "https://github.com/rshkv/arrow-rs", branch = "wr/map-builder-with-key-field" }
2 changes: 2 additions & 0 deletions crates/iceberg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ arrow-schema = { workspace = true }
arrow-select = { workspace = true }
arrow-string = { workspace = true }
async-std = { workspace = true, optional = true, features = ["attributes"] }
async-stream = { workspace = true }
async-trait = { workspace = true }
bimap = { workspace = true }
bitvec = { workspace = true }
Expand Down Expand Up @@ -85,6 +86,7 @@ uuid = { workspace = true }
zstd = { workspace = true }

[dev-dependencies]
arrow-cast = { workspace = true, features = ["prettyprint"] }
ctor = { workspace = true }
expect-test = { workspace = true }
iceberg-catalog-memory = { workspace = true }
Expand Down
187 changes: 187 additions & 0 deletions crates/iceberg/src/arrow/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,193 @@ get_parquet_stat_as_datum!(min);

get_parquet_stat_as_datum!(max);

/// Utilities to deal with [arrow_array::builder] types in the Iceberg context.
pub(crate) mod builder {
use arrow_array::builder::*;
use arrow_array::cast::AsArray;
use arrow_array::types::*;
use arrow_array::{ArrayRef, Datum as ArrowDatum};
use arrow_schema::{DataType, TimeUnit};
use ordered_float::OrderedFloat;

use crate::spec::{Literal, PrimitiveLiteral};
use crate::{Error, ErrorKind};

/// A helper wrapping [ArrayBuilder] for building arrays without declaring the inner type at
/// compile-time when types are determined dynamically (e.g. based on some column type).
/// A [DataType] is given at construction time which is used to later downcast the inner array
/// and provided values.
pub(crate) struct AnyPrimitiveArrayBuilder {
data_type: DataType,
inner: Box<dyn ArrayBuilder>,
}

impl AnyPrimitiveArrayBuilder {
pub(crate) fn new(data_type: &DataType) -> Self {
Self {
data_type: data_type.clone(),
inner: make_builder(data_type, 0),
}
}

pub(crate) fn finish(&mut self) -> ArrayRef {
self.inner.finish()
}

/// Append an [[arrow_array::Datum]] value.
pub(crate) fn append_datum(&mut self, value: &dyn ArrowDatum) -> crate::Result<()> {
let (array, is_scalar) = value.get();
assert!(is_scalar, "Can only append scalar datum");

match array.data_type() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is list is exhaustive based on the ArrowSchemaVisitor::primitive function above. I.e., every type produced there is covered here.

DataType::Boolean => self
.builder::<BooleanBuilder>()?
.append_value(array.as_boolean().value(0)),
DataType::Int32 => self
.builder::<Int32Builder>()?
.append_value(array.as_primitive::<Int32Type>().value(0)),
DataType::Int64 => self
.builder::<Int64Builder>()?
.append_value(array.as_primitive::<Int64Type>().value(0)),
DataType::Float32 => self
.builder::<Float32Builder>()?
.append_value(array.as_primitive::<Float32Type>().value(0)),
DataType::Float64 => self
.builder::<Float64Builder>()?
.append_value(array.as_primitive::<Float64Type>().value(0)),
DataType::Decimal128(_, _) => self
.builder::<Decimal128Builder>()?
.append_value(array.as_primitive::<Decimal128Type>().value(0)),
DataType::Date32 => self
.builder::<Date32Builder>()?
.append_value(array.as_primitive::<Date32Type>().value(0)),
DataType::Time64(TimeUnit::Microsecond) => self
.builder::<Time64MicrosecondBuilder>()?
.append_value(array.as_primitive::<Time64MicrosecondType>().value(0)),
DataType::Timestamp(TimeUnit::Microsecond, _) => self
.builder::<TimestampMicrosecondBuilder>()?
.append_value(array.as_primitive::<TimestampMicrosecondType>().value(0)),
DataType::Timestamp(TimeUnit::Nanosecond, _) => self
.builder::<TimestampNanosecondBuilder>()?
.append_value(array.as_primitive::<TimestampNanosecondType>().value(0)),
Comment on lines +893 to +898
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand it's correct to ignore the timezone here because that's not captured in the builder.

DataType::Utf8 => self
.builder::<StringBuilder>()?
.append_value(array.as_string::<i32>().value(0)),
DataType::FixedSizeBinary(_) => self
.builder::<BinaryBuilder>()?
.append_value(array.as_fixed_size_binary().value(0)),
DataType::LargeBinary => self
.builder::<LargeBinaryBuilder>()?
.append_value(array.as_binary::<i64>().value(0)),
_ => {
return Err(Error::new(
ErrorKind::FeatureUnsupported,
format!("Cannot append data type: {:?}", array.data_type(),),
));
}
}
Ok(())
}

/// Append a literal with the provided [DataType]. We're not solely relying on the literal to
/// infer the type because [Literal] values do not specify the expected type of builder. E.g.,
/// a [PrimitiveLiteral::Long] may go into an array builder for longs but also for timestamps.
pub(crate) fn append_literal(&mut self, value: &Literal) -> crate::Result<()> {
let Some(primitive) = value.as_primitive_literal() else {
return Err(Error::new(
ErrorKind::FeatureUnsupported,
"Expected primitive type",
));
};

match (&self.data_type, primitive.clone()) {
(DataType::Boolean, PrimitiveLiteral::Boolean(value)) => {
self.builder::<BooleanBuilder>()?.append_value(value)
}
(DataType::Int32, PrimitiveLiteral::Int(value)) => {
self.builder::<Int32Builder>()?.append_value(value)
}
(DataType::Int64, PrimitiveLiteral::Long(value)) => {
self.builder::<Int64Builder>()?.append_value(value)
}
(DataType::Float32, PrimitiveLiteral::Float(OrderedFloat(value))) => {
self.builder::<Float32Builder>()?.append_value(value)
}
(DataType::Float64, PrimitiveLiteral::Double(OrderedFloat(value))) => {
self.builder::<Float64Builder>()?.append_value(value)
}
(DataType::Utf8, PrimitiveLiteral::String(value)) => {
self.builder::<StringBuilder>()?.append_value(value)
}
(DataType::FixedSizeBinary(_), PrimitiveLiteral::Binary(value)) => self
.builder::<FixedSizeBinaryBuilder>()?
.append_value(value)?,
(DataType::LargeBinary, PrimitiveLiteral::Binary(value)) => {
self.builder::<LargeBinaryBuilder>()?.append_value(value)
}
(_, _) => {
return Err(Error::new(
ErrorKind::FeatureUnsupported,
format!(
"Builder of type {:?} does not accept literal {:?}",
self.data_type, primitive
),
));
}
}

Ok(())
}

/// Append a null value for the provided [DataType].
pub(crate) fn append_null(&mut self) -> crate::Result<()> {
match self.data_type {
DataType::Boolean => self.builder::<BooleanBuilder>()?.append_null(),
DataType::Int32 => self.builder::<Int32Builder>()?.append_null(),
DataType::Int64 => self.builder::<Int64Builder>()?.append_null(),
DataType::Float32 => self.builder::<Float32Builder>()?.append_null(),
DataType::Float64 => self.builder::<Float64Builder>()?.append_null(),
DataType::Decimal128(_, _) => self.builder::<Decimal128Builder>()?.append_null(),
DataType::Date32 => self.builder::<Date32Builder>()?.append_null(),
DataType::Time64(TimeUnit::Microsecond) => {
self.builder::<Time64MicrosecondBuilder>()?.append_null()
}
DataType::Timestamp(TimeUnit::Microsecond, _) => {
self.builder::<TimestampMicrosecondBuilder>()?.append_null()
}
DataType::Timestamp(TimeUnit::Nanosecond, _) => {
self.builder::<TimestampNanosecondBuilder>()?.append_null()
}
DataType::Utf8 => self.builder::<StringBuilder>()?.append_null(),
DataType::FixedSizeBinary(_) => {
self.builder::<FixedSizeBinaryBuilder>()?.append_null()
}
DataType::LargeBinary => self.builder::<LargeBinaryBuilder>()?.append_null(),
_ => {
return Err(Error::new(
ErrorKind::FeatureUnsupported,
format!(
"Cannot append null values for data type: {:?}",
self.data_type
),
))
}
}
Ok(())
}

/// Cast the `inner` builder to a specific type or return [Error].
fn builder<T: ArrayBuilder>(&mut self) -> crate::Result<&mut T> {
self.inner.as_any_mut().downcast_mut::<T>().ok_or_else(|| {
Error::new(
ErrorKind::Unexpected,
"Failed to cast builder to expected type",
)
})
}
}
}

impl TryFrom<&ArrowSchema> for crate::spec::Schema {
type Error = Error;

Expand Down
Loading
Loading