Skip to content

Commit

Permalink
support timestamp reader
Browse files Browse the repository at this point in the history
  • Loading branch information
rui-mo committed Apr 21, 2023
1 parent 5be966c commit ad309ea
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 2 deletions.
19 changes: 18 additions & 1 deletion velox/dwio/parquet/reader/PageReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,22 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
}
break;
}
case thrift::Type::INT96: {
auto numBytes =
dictionary_.numValues * (sizeof(int64_t) + sizeof(int32_t));
dictionary_.values = AlignedBuffer::allocate<char>(numBytes, &pool_);
if (pageData_) {
memcpy(dictionary_.values->asMutable<char>(), pageData_, numBytes);
} else {
dwio::common::readBytes(
numBytes,
inputStream_.get(),
dictionary_.values->asMutable<char>(),
bufferStart_,
bufferEnd_);
}
break;
}
case thrift::Type::BYTE_ARRAY: {
dictionary_.values =
AlignedBuffer::allocate<StringView>(dictionary_.numValues, &pool_);
Expand Down Expand Up @@ -505,7 +521,6 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
VELOX_UNSUPPORTED(
"Parquet type {} not supported for dictionary", parquetType);
}
case thrift::Type::INT96:
default:
VELOX_UNSUPPORTED(
"Parquet type {} not supported for dictionary", parquetType);
Expand All @@ -532,6 +547,8 @@ int32_t parquetTypeBytes(thrift::Type::type type) {
case thrift::Type::INT64:
case thrift::Type::DOUBLE:
return 8;
case thrift::Type::INT96:
return 12;
default:
VELOX_FAIL("Type does not have a byte width {}", type);
}
Expand Down
5 changes: 5 additions & 0 deletions velox/dwio/parquet/reader/ParquetColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "velox/dwio/parquet/reader/StructColumnReader.h"

#include "velox/dwio/parquet/reader/Statistics.h"
#include "velox/dwio/parquet/reader/TimestampColumnReader.h"
#include "velox/dwio/parquet/thrift/ParquetThriftTypes.h"

namespace facebook::velox::parquet {
Expand Down Expand Up @@ -73,6 +74,10 @@ std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build(
case TypeKind::BOOLEAN:
return std::make_unique<BooleanColumnReader>(dataType, params, scanSpec);

case TypeKind::TIMESTAMP:
return std::make_unique<TimestampColumnReader>(
dataType, params, scanSpec);

default:
VELOX_FAIL(
"buildReader unhandled type: " +
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/parquet/reader/ParquetReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ TypePtr ReaderBase::convertType(
case thrift::Type::type::INT64:
return BIGINT();
case thrift::Type::type::INT96:
return DOUBLE(); // TODO: Lose precision
return TIMESTAMP();
case thrift::Type::type::FLOAT:
return REAL();
case thrift::Type::type::DOUBLE:
Expand Down
96 changes: 96 additions & 0 deletions velox/dwio/parquet/reader/TimestampColumnReader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/dwio/parquet/reader/IntegerColumnReader.h"
#include "velox/dwio/parquet/reader/ParquetColumnReader.h"

namespace facebook::velox::parquet {

class TimestampColumnReader : public IntegerColumnReader {
public:
TimestampColumnReader(
const std::shared_ptr<const dwio::common::TypeWithId>& nodeType,
ParquetParams& params,
common::ScanSpec& scanSpec)
: IntegerColumnReader(nodeType, nodeType, params, scanSpec) {}

void read(
vector_size_t offset,
RowSet rows,
const uint64_t* /*incomingNulls*/) override {
auto& data = formatData_->as<ParquetData>();
// Use int128_t instead because of the lack of int96 implementation.
prepareRead<int128_t>(offset, rows, nullptr);
readCommon<IntegerColumnReader>(rows);
}

void getValues(RowSet rows, VectorPtr* result) override {
auto type = nodeType_->type;
VELOX_CHECK(type->kind() == TypeKind::TIMESTAMP, "Timestamp expected.");
VELOX_CHECK_NE(valueSize_, kNoValueSize);
VELOX_CHECK(mayGetValues_);
if (allNull_) {
*result = std::make_shared<ConstantVector<Timestamp>>(
&memoryPool_,
rows.size(),
true,
type,
Timestamp(),
SimpleVectorStats<Timestamp>{},
sizeof(Timestamp) * rows.size());
return;
}
VELOX_CHECK_LE(rows.size(), numValues_);
VELOX_CHECK(!rows.empty());
if (!values_) {
return;
}

auto tsValues = AlignedBuffer::allocate<Timestamp>(
numValues_, &memoryPool_, Timestamp());
auto* valuesPtr = tsValues->asMutable<Timestamp>();
BufferPtr nulls = anyNulls_
? (returnReaderNulls_ ? nullsInReadRange_ : resultNulls_)
: nullptr;
char* rawValues = reinterpret_cast<char*>(rawValues_);
int sizeOfInt96 = sizeof(int64_t) + sizeof(int32_t);
for (size_t i = 0; i < numValues_; i++) {
uint64_t nanos;
memcpy(&nanos, rawValues + i * sizeOfInt96, sizeof(uint64_t));
int32_t days;
memcpy(
&days,
rawValues + i * sizeOfInt96 + sizeof(uint64_t),
sizeof(int32_t));
// Convert the timestamp into seconds and nanos since the Unix epoch,
// 00:00:00.000000 on 1 January 1970. The magic number `2440588` is the
// julian day for 1 January 1970.
valuesPtr[i] = Timestamp((days - 2440588) * 86400, nanos);
}

*result = std::make_shared<FlatVector<Timestamp>>(
&memoryPool_,
type,
nulls,
numValues_,
tsValues,
std::move(stringBuffers_));
}
};

} // namespace facebook::velox::parquet

0 comments on commit ad309ea

Please sign in to comment.