Skip to content

Commit

Permalink
Remove Int96Converter (apache#2480)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Aug 17, 2022
1 parent 4e2c49f commit dba2287
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 89 deletions.
31 changes: 8 additions & 23 deletions parquet/src/arrow/array_reader/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ use crate::arrow::array_reader::{
};
use crate::arrow::buffer::converter::{
DecimalArrayConverter, DecimalByteArrayConvert, DecimalFixedLengthByteArrayConverter,
FixedLenBinaryConverter, FixedSizeArrayConverter, Int96ArrayConverter,
Int96Converter, IntervalDayTimeArrayConverter, IntervalDayTimeConverter,
IntervalYearMonthArrayConverter, IntervalYearMonthConverter,
FixedLenBinaryConverter, FixedSizeArrayConverter, IntervalDayTimeArrayConverter,
IntervalDayTimeConverter, IntervalYearMonthArrayConverter,
IntervalYearMonthConverter,
};
use crate::arrow::schema::{convert_schema, ParquetField, ParquetFieldType};
use crate::arrow::ProjectionMask;
Expand Down Expand Up @@ -182,26 +182,11 @@ fn build_primitive_reader(
column_desc,
arrow_type,
)?)),
PhysicalType::INT96 => {
// get the optional timezone information from arrow type
let timezone = arrow_type.as_ref().and_then(|data_type| {
if let DataType::Timestamp(_, tz) = data_type {
tz.clone()
} else {
None
}
});
let converter = Int96Converter::new(Int96ArrayConverter { timezone });
Ok(Box::new(ComplexObjectArrayReader::<
Int96Type,
Int96Converter,
>::new(
page_iterator,
column_desc,
converter,
arrow_type,
)?))
}
PhysicalType::INT96 => Ok(Box::new(PrimitiveArrayReader::<Int96Type>::new(
page_iterator,
column_desc,
arrow_type,
)?)),
PhysicalType::FLOAT => Ok(Box::new(PrimitiveArrayReader::<FloatType>::new(
page_iterator,
column_desc,
Expand Down
123 changes: 84 additions & 39 deletions parquet/src/arrow/array_reader/primitive_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,25 @@ use crate::arrow::record_reader::RecordReader;
use crate::arrow::schema::parquet_to_arrow_field;
use crate::basic::Type as PhysicalType;
use crate::column::page::PageIterator;
use crate::data_type::DataType;
use crate::data_type::{DataType, Int96};
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use arrow::array::{Array, ArrayDataBuilder, ArrayRef, BooleanArray, BooleanBufferBuilder, Decimal128Array, Float32Array, Float64Array, Int32Array, Int64Array};
use arrow::array::{
Array, ArrayDataBuilder, ArrayRef, BooleanArray, BooleanBufferBuilder,
Decimal128Array, Float32Array, Float64Array, Int32Array, Int64Array,
TimestampNanosecondArray, TimestampNanosecondBufferBuilder,
};
use arrow::buffer::Buffer;
use arrow::datatypes::DataType as ArrowType;
use arrow::datatypes::{DataType as ArrowType, TimeUnit};
use std::any::Any;
use std::sync::Arc;

/// Primitive array readers are leaves of array reader tree. They accept page iterator
/// and read them into primitive arrays.
pub struct PrimitiveArrayReader<T>
where
T: DataType,
T::T: ScalarValue,
where
T: DataType,
T::T: ScalarValue,
{
data_type: ArrowType,
pages: Box<dyn PageIterator>,
Expand All @@ -45,9 +49,9 @@ pub struct PrimitiveArrayReader<T>
}

impl<T> PrimitiveArrayReader<T>
where
T: DataType,
T::T: ScalarValue,
where
T: DataType,
T::T: ScalarValue,
{
/// Construct primitive array reader.
pub fn new(
Expand Down Expand Up @@ -77,9 +81,9 @@ impl<T> PrimitiveArrayReader<T>

/// Implementation of primitive array reader.
impl<T> ArrayReader for PrimitiveArrayReader<T>
where
T: DataType,
T::T: ScalarValue,
where
T: DataType,
T::T: ScalarValue,
{
fn as_any(&self) -> &dyn Any {
self
Expand All @@ -95,7 +99,7 @@ impl<T> ArrayReader for PrimitiveArrayReader<T>
}

fn consume_batch(&mut self) -> Result<ArrayRef> {
let target_type = self.get_data_type().clone();
let target_type = &self.data_type;
let arrow_data_type = match T::get_physical_type() {
PhysicalType::BOOLEAN => ArrowType::Boolean,
PhysicalType::INT32 => {
Expand All @@ -120,9 +124,11 @@ impl<T> ArrayReader for PrimitiveArrayReader<T>
}
PhysicalType::FLOAT => ArrowType::Float32,
PhysicalType::DOUBLE => ArrowType::Float64,
PhysicalType::INT96
| PhysicalType::BYTE_ARRAY
| PhysicalType::FIXED_LEN_BYTE_ARRAY => {
PhysicalType::INT96 => match target_type {
ArrowType::Timestamp(TimeUnit::Nanosecond, _) => target_type.clone(),
_ => unreachable!("INT96 must be timestamp nanosecond"),
},
PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
unreachable!(
"PrimitiveArrayReaders don't support complex physical types"
);
Expand All @@ -132,16 +138,31 @@ impl<T> ArrayReader for PrimitiveArrayReader<T>
// Convert to arrays by using the Parquet physical type.
// The physical types are then cast to Arrow types if necessary

let mut record_data = self.record_reader.consume_record_data();
let record_data = self.record_reader.consume_record_data();
let record_data = match T::get_physical_type() {
PhysicalType::BOOLEAN => {
let mut boolean_buffer = BooleanBufferBuilder::new(record_data.len());

if T::get_physical_type() == PhysicalType::BOOLEAN {
let mut boolean_buffer = BooleanBufferBuilder::new(record_data.len());
for e in record_data.as_slice() {
boolean_buffer.append(*e > 0);
}
boolean_buffer.finish()
}
PhysicalType::INT96 => {
// SAFETY - record_data is an aligned buffer of Int96
let (prefix, slice, suffix) =
unsafe { record_data.as_slice().align_to::<Int96>() };
assert!(prefix.is_empty() && suffix.is_empty());

let mut builder = TimestampNanosecondBufferBuilder::new(slice.len());
for v in slice {
builder.append(v.to_nanos())
}

for e in record_data.as_slice() {
boolean_buffer.append(*e > 0);
builder.finish()
}
record_data = boolean_buffer.finish();
}
_ => record_data,
};

let array_data = ArrayDataBuilder::new(arrow_data_type)
.len(self.record_reader.num_values())
Expand All @@ -155,9 +176,10 @@ impl<T> ArrayReader for PrimitiveArrayReader<T>
PhysicalType::INT64 => Arc::new(Int64Array::from(array_data)) as ArrayRef,
PhysicalType::FLOAT => Arc::new(Float32Array::from(array_data)) as ArrayRef,
PhysicalType::DOUBLE => Arc::new(Float64Array::from(array_data)) as ArrayRef,
PhysicalType::INT96
| PhysicalType::BYTE_ARRAY
| PhysicalType::FIXED_LEN_BYTE_ARRAY => {
PhysicalType::INT96 => {
Arc::new(TimestampNanosecondArray::from(array_data)) as ArrayRef
}
PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
unreachable!(
"PrimitiveArrayReaders don't support complex physical types"
);
Expand Down Expand Up @@ -204,7 +226,7 @@ impl<T> ArrayReader for PrimitiveArrayReader<T>
));
}
}
.with_precision_and_scale(p, s)?;
.with_precision_and_scale(*p, *s)?;

Arc::new(array) as ArrayRef
}
Expand Down Expand Up @@ -243,11 +265,11 @@ mod tests {
use crate::util::test_common::rand_gen::make_pages;
use crate::util::InMemoryPageIterator;
use arrow::array::{Array, PrimitiveArray};
use arrow::datatypes::{ArrowPrimitiveType};
use arrow::datatypes::ArrowPrimitiveType;

use arrow::datatypes::DataType::Decimal128;
use rand::distributions::uniform::SampleUniform;
use std::collections::VecDeque;
use arrow::datatypes::DataType::Decimal128;

#[allow(clippy::too_many_arguments)]
fn make_column_chunks<T: DataType>(
Expand Down Expand Up @@ -313,7 +335,7 @@ mod tests {
column_desc,
None,
)
.unwrap();
.unwrap();

// expect no values to be read
let array = array_reader.next_batch(50).unwrap();
Expand Down Expand Up @@ -360,7 +382,7 @@ mod tests {
column_desc,
None,
)
.unwrap();
.unwrap();

// Read first 50 values, which are all from the first column chunk
let array = array_reader.next_batch(50).unwrap();
Expand Down Expand Up @@ -560,7 +582,7 @@ mod tests {
column_desc,
None,
)
.unwrap();
.unwrap();

let mut accu_len: usize = 0;

Expand Down Expand Up @@ -602,7 +624,6 @@ mod tests {
}
}


#[test]
fn test_primitive_array_reader_decimal_types() {
// parquet `INT32` to decimal
Expand Down Expand Up @@ -641,18 +662,30 @@ mod tests {
column_desc,
None,
)
.unwrap();
.unwrap();

// read data from the reader
// the data type is decimal(8,2)
let array = array_reader.next_batch(50).unwrap();
assert_eq!(array.data_type(), &Decimal128(8, 2));
let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
let data_decimal_array = data[0..50].iter().copied().map(|v| Some(v as i128)).collect::<Decimal128Array>().with_precision_and_scale(8, 2).unwrap();
let data_decimal_array = data[0..50]
.iter()
.copied()
.map(|v| Some(v as i128))
.collect::<Decimal128Array>()
.with_precision_and_scale(8, 2)
.unwrap();
assert_eq!(array, &data_decimal_array);

// not equal with different data type(precision and scale)
let data_decimal_array = data[0..50].iter().copied().map(|v| Some(v as i128)).collect::<Decimal128Array>().with_precision_and_scale(9, 0).unwrap();
let data_decimal_array = data[0..50]
.iter()
.copied()
.map(|v| Some(v as i128))
.collect::<Decimal128Array>()
.with_precision_and_scale(9, 0)
.unwrap();
assert_ne!(array, &data_decimal_array)
}

Expand Down Expand Up @@ -692,18 +725,30 @@ mod tests {
column_desc,
None,
)
.unwrap();
.unwrap();

// read data from the reader
// the data type is decimal(18,4)
let array = array_reader.next_batch(50).unwrap();
assert_eq!(array.data_type(), &Decimal128(18, 4));
let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
let data_decimal_array = data[0..50].iter().copied().map(|v| Some(v as i128)).collect::<Decimal128Array>().with_precision_and_scale(18, 4).unwrap();
let data_decimal_array = data[0..50]
.iter()
.copied()
.map(|v| Some(v as i128))
.collect::<Decimal128Array>()
.with_precision_and_scale(18, 4)
.unwrap();
assert_eq!(array, &data_decimal_array);

// not equal with different data type(precision and scale)
let data_decimal_array = data[0..50].iter().copied().map(|v| Some(v as i128)).collect::<Decimal128Array>().with_precision_and_scale(34, 0).unwrap();
let data_decimal_array = data[0..50]
.iter()
.copied()
.map(|v| Some(v as i128))
.collect::<Decimal128Array>()
.with_precision_and_scale(34, 0)
.unwrap();
assert_ne!(array, &data_decimal_array)
}
}
Expand Down
20 changes: 18 additions & 2 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ mod tests {
use crate::basic::{ConvertedType, Encoding, Repetition, Type as PhysicalType};
use crate::data_type::{
BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray,
FixedLenByteArrayType, Int32Type, Int64Type,
FixedLenByteArrayType, Int32Type, Int64Type, Int96Type,
};
use crate::errors::Result;
use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
Expand Down Expand Up @@ -744,7 +744,7 @@ mod tests {
impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
fn gen(len: i32) -> FixedLenByteArray {
let mut v = vec![0u8; len as usize];
rand::thread_rng().fill_bytes(&mut v);
thread_rng().fill_bytes(&mut v);
ByteArray::from(v).into()
}
}
Expand Down Expand Up @@ -773,6 +773,22 @@ mod tests {
);
}

// Exercises reading a single INT96 column through
// `run_single_column_reader_tests`, once per encoding listed below
// (PLAIN and RLE_DICTIONARY).
//
// The closure builds the expected Arrow array: every optional input value is
// mapped through `to_nanos()` and the results are collected into a
// `TimestampNanosecondArray` (missing values stay missing).
//
// NOTE(review): the meaning of the leading `2` and of the `None` argument is
// defined by `run_single_column_reader_tests`, which is outside this view —
// confirm against that helper.
#[test]
fn test_int96_single_column_reader_test() {
let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];
run_single_column_reader_tests::<Int96Type, _, Int96Type>(
2,
ConvertedType::NONE,
None,
|vals| {
// Expected output: one nanosecond timestamp per input value.
Arc::new(TimestampNanosecondArray::from_iter(
vals.iter().map(|x| x.map(|x| x.to_nanos())),
)) as _
},
encodings,
);
}

struct RandUtf8Gen {}

impl RandGen<ByteArrayType> for RandUtf8Gen {
Expand Down
23 changes: 2 additions & 21 deletions parquet/src/arrow/buffer/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
// specific language governing permissions and limitations
// under the License.

use crate::data_type::{ByteArray, FixedLenByteArray, Int96};
use crate::data_type::{ByteArray, FixedLenByteArray};
use arrow::array::{
Array, ArrayRef, Decimal128Array, FixedSizeBinaryArray, FixedSizeBinaryBuilder,
IntervalDayTimeArray, IntervalDayTimeBuilder, IntervalYearMonthArray,
IntervalYearMonthBuilder, TimestampNanosecondArray,
IntervalYearMonthBuilder,
};
use std::sync::Arc;

Expand Down Expand Up @@ -169,22 +169,6 @@ impl Converter<Vec<Option<FixedLenByteArray>>, IntervalDayTimeArray>
}
}

/// Converter from optional `Int96` values to a `TimestampNanosecondArray`.
///
/// This definition is deleted by the commit shown on this page; INT96 columns
/// are read through `PrimitiveArrayReader` instead.
pub struct Int96ArrayConverter {
/// Optional timezone handed to the resulting timestamp array.
pub timezone: Option<String>,
}

impl Converter<Vec<Option<Int96>>, TimestampNanosecondArray> for Int96ArrayConverter {
/// Builds a `TimestampNanosecondArray` from `source`.
///
/// Each `Some` value becomes `to_i64() * 1_000_000`; `None` entries are
/// passed through unchanged. The converter's `timezone` is cloned and
/// forwarded to `from_opt_vec`.
///
/// NOTE(review): presumably `Int96::to_i64` yields milliseconds, making the
/// factor a millisecond-to-nanosecond scale — confirm against `Int96`.
fn convert(&self, source: Vec<Option<Int96>>) -> Result<TimestampNanosecondArray> {
Ok(TimestampNanosecondArray::from_opt_vec(
source
.into_iter()
// Plain `*` is used; there is no explicit overflow handling here.
.map(|int96| int96.map(|val| val.to_i64() * 1_000_000))
.collect(),
self.timezone.clone(),
))
}
}

#[cfg(test)]
pub struct Utf8ArrayConverter {}

Expand Down Expand Up @@ -212,9 +196,6 @@ impl Converter<Vec<Option<ByteArray>>, StringArray> for Utf8ArrayConverter {
pub type Utf8Converter =
ArrayRefConverter<Vec<Option<ByteArray>>, StringArray, Utf8ArrayConverter>;

/// Alias instantiating `ArrayRefConverter` with `Vec<Option<Int96>>` as input,
/// `TimestampNanosecondArray` as the array type and `Int96ArrayConverter` as
/// the converter. Deleted by the commit shown on this page.
pub type Int96Converter =
ArrayRefConverter<Vec<Option<Int96>>, TimestampNanosecondArray, Int96ArrayConverter>;

pub type FixedLenBinaryConverter = ArrayRefConverter<
Vec<Option<FixedLenByteArray>>,
FixedSizeBinaryArray,
Expand Down
Loading

0 comments on commit dba2287

Please sign in to comment.