diff --git a/velox/dwio/parquet/reader/PageReader.cpp b/velox/dwio/parquet/reader/PageReader.cpp index ee81b4bf2351..12f521b5cb69 100644 --- a/velox/dwio/parquet/reader/PageReader.cpp +++ b/velox/dwio/parquet/reader/PageReader.cpp @@ -413,6 +413,22 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) { } break; } + case thrift::Type::INT96: { + auto numBytes = + dictionary_.numValues * (sizeof(int64_t) + sizeof(int32_t)); + dictionary_.values = AlignedBuffer::allocate(numBytes, &pool_); + if (pageData_) { + memcpy(dictionary_.values->asMutable(), pageData_, numBytes); + } else { + dwio::common::readBytes( + numBytes, + inputStream_.get(), + dictionary_.values->asMutable(), + bufferStart_, + bufferEnd_); + } + break; + } case thrift::Type::BYTE_ARRAY: { dictionary_.values = AlignedBuffer::allocate(dictionary_.numValues, &pool_); @@ -505,7 +521,6 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) { VELOX_UNSUPPORTED( "Parquet type {} not supported for dictionary", parquetType); } - case thrift::Type::INT96: default: VELOX_UNSUPPORTED( "Parquet type {} not supported for dictionary", parquetType); @@ -532,6 +547,8 @@ int32_t parquetTypeBytes(thrift::Type::type type) { case thrift::Type::INT64: case thrift::Type::DOUBLE: return 8; + case thrift::Type::INT96: + return 12; default: VELOX_FAIL("Type does not have a byte width {}", type); } diff --git a/velox/dwio/parquet/reader/ParquetColumnReader.cpp b/velox/dwio/parquet/reader/ParquetColumnReader.cpp index 664e95ed4b32..1df5876da7c4 100644 --- a/velox/dwio/parquet/reader/ParquetColumnReader.cpp +++ b/velox/dwio/parquet/reader/ParquetColumnReader.cpp @@ -28,6 +28,7 @@ #include "velox/dwio/parquet/reader/StructColumnReader.h" #include "velox/dwio/parquet/reader/Statistics.h" +#include "velox/dwio/parquet/reader/TimestampColumnReader.h" #include "velox/dwio/parquet/thrift/ParquetThriftTypes.h" namespace facebook::velox::parquet { @@ -73,6 +74,10 @@ std::unique_ptr ParquetColumnReader::build( case TypeKind::BOOLEAN: return std::make_unique(dataType, params, scanSpec); + case TypeKind::TIMESTAMP: + return std::make_unique( + dataType, params, scanSpec); + default: VELOX_FAIL( "buildReader unhandled type: " + diff --git a/velox/dwio/parquet/reader/ParquetReader.cpp b/velox/dwio/parquet/reader/ParquetReader.cpp index 38831b28c4f7..c32b481575ea 100644 --- a/velox/dwio/parquet/reader/ParquetReader.cpp +++ b/velox/dwio/parquet/reader/ParquetReader.cpp @@ -410,7 +410,7 @@ TypePtr ReaderBase::convertType( case thrift::Type::type::INT64: return BIGINT(); case thrift::Type::type::INT96: - return DOUBLE(); // TODO: Lose precision + return TIMESTAMP(); case thrift::Type::type::FLOAT: return REAL(); case thrift::Type::type::DOUBLE: diff --git a/velox/dwio/parquet/reader/TimestampColumnReader.h b/velox/dwio/parquet/reader/TimestampColumnReader.h new file mode 100644 index 000000000000..12e2ab609257 --- /dev/null +++ b/velox/dwio/parquet/reader/TimestampColumnReader.h @@ -0,0 +1,96 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/dwio/parquet/reader/IntegerColumnReader.h" +#include "velox/dwio/parquet/reader/ParquetColumnReader.h" + +namespace facebook::velox::parquet { + +class TimestampColumnReader : public IntegerColumnReader { + public: + TimestampColumnReader( + const std::shared_ptr& nodeType, + ParquetParams& params, + common::ScanSpec& scanSpec) + : IntegerColumnReader(nodeType, nodeType, params, scanSpec) {} + + void read( + vector_size_t offset, + RowSet rows, + const uint64_t* /*incomingNulls*/) override { + auto& data = formatData_->as(); + // Use int128_t instead because of the lack of int96 implementation. + prepareRead(offset, rows, nullptr); + readCommon(rows); + } + + void getValues(RowSet rows, VectorPtr* result) override { + auto type = nodeType_->type; + VELOX_CHECK(type->kind() == TypeKind::TIMESTAMP, "Timestamp expected."); + VELOX_CHECK_NE(valueSize_, kNoValueSize); + VELOX_CHECK(mayGetValues_); + if (allNull_) { + *result = std::make_shared>( + &memoryPool_, + rows.size(), + true, + type, + Timestamp(), + SimpleVectorStats{}, + sizeof(Timestamp) * rows.size()); + return; + } + VELOX_CHECK_LE(rows.size(), numValues_); + VELOX_CHECK(!rows.empty()); + if (!values_) { + return; + } + + auto tsValues = AlignedBuffer::allocate( + numValues_, &memoryPool_, Timestamp()); + auto* valuesPtr = tsValues->asMutable(); + BufferPtr nulls = anyNulls_ + ? (returnReaderNulls_ ? nullsInReadRange_ : resultNulls_) + : nullptr; + char* rawValues = reinterpret_cast(rawValues_); + int sizeOfInt96 = sizeof(int64_t) + sizeof(int32_t); + for (size_t i = 0; i < numValues_; i++) { + uint64_t nanos; + memcpy(&nanos, rawValues + i * sizeOfInt96, sizeof(uint64_t)); + int32_t days; + memcpy( + &days, + rawValues + i * sizeOfInt96 + sizeof(uint64_t), + sizeof(int32_t)); + // Convert the timestamp into seconds and nanos since the Unix epoch, + // 00:00:00.000000 on 1 January 1970. The magic number `2440588` is the + // julian day for 1 January 1970. + valuesPtr[i] = Timestamp((days - 2440588) * 86400, nanos); + } + + *result = std::make_shared>( + &memoryPool_, + type, + nulls, + numValues_, + tsValues, + std::move(stringBuffers_)); + } +}; + +} // namespace facebook::velox::parquet