From 997d1e2753fa49fcb210eaf2775ed6a71ce9b31f Mon Sep 17 00:00:00 2001 From: Ge Gao Date: Thu, 22 Dec 2022 14:21:35 -0800 Subject: [PATCH] Table writer 7: ID generator for distinct partitions (#3416) Summary: Add an ID generator class PartitionIdGenerator that generates sequential integer IDs for distinct partition values. It takes partition RowVector as input and returns the generated IDs for the corresponding rows. At the moment this PartitionIdGenerator only supports single partition. The implementation is based on a single VectorHasher, which the users could assume to generate sequential consecutive IDs when adding new partitions. The limit on the number of partitions that could be processed is enforced by the user specified "maxPartitions". Pull Request resolved: https://github.com/facebookincubator/velox/pull/3416 Reviewed By: mbasmanova Differential Revision: D41725059 Pulled By: gggrace14 fbshipit-source-id: 3e875f1104206572efd764d77f0959c2f5d67e22 --- velox/connectors/hive/CMakeLists.txt | 5 +- .../connectors/hive/PartitionIdGenerator.cpp | 64 ++++++++++ velox/connectors/hive/PartitionIdGenerator.h | 65 ++++++++++ velox/connectors/hive/tests/CMakeLists.txt | 5 +- .../hive/tests/PartitionIdGeneratorTest.cpp | 112 ++++++++++++++++++ 5 files changed, 247 insertions(+), 4 deletions(-) create mode 100644 velox/connectors/hive/PartitionIdGenerator.cpp create mode 100644 velox/connectors/hive/PartitionIdGenerator.h create mode 100644 velox/connectors/hive/tests/PartitionIdGeneratorTest.cpp diff --git a/velox/connectors/hive/CMakeLists.txt b/velox/connectors/hive/CMakeLists.txt index 968eda9ab6d7..353547ee6cad 100644 --- a/velox/connectors/hive/CMakeLists.txt +++ b/velox/connectors/hive/CMakeLists.txt @@ -12,8 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -add_library(velox_hive_connector OBJECT HiveConnector.cpp HiveDataSink.cpp - FileHandle.cpp HiveWriteProtocol.cpp) +add_library( + velox_hive_connector OBJECT HiveConnector.cpp HiveDataSink.cpp FileHandle.cpp + HiveWriteProtocol.cpp PartitionIdGenerator.cpp) target_link_libraries(velox_hive_connector velox_connector velox_dwio_dwrf_reader velox_dwio_dwrf_writer velox_file) diff --git a/velox/connectors/hive/PartitionIdGenerator.cpp b/velox/connectors/hive/PartitionIdGenerator.cpp new file mode 100644 index 000000000000..e97f45a7d6c4 --- /dev/null +++ b/velox/connectors/hive/PartitionIdGenerator.cpp @@ -0,0 +1,64 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/connectors/hive/PartitionIdGenerator.h" + +namespace facebook::velox::connector::hive { + +PartitionIdGenerator::PartitionIdGenerator( + const RowTypePtr& inputType, + std::vector partitionChannels, + uint32_t maxPartitions) + : partitionChannels_(std::move(partitionChannels)), + maxPartitions_(maxPartitions) { + VELOX_USER_CHECK_EQ( + partitionChannels_.size(), + 1, + "Multiple partition keys are not supported yet."); + hasher_ = exec::VectorHasher::create( + inputType->childAt(partitionChannels_[0]), partitionChannels_[0]); +} + +void PartitionIdGenerator::run( + const RowVectorPtr& input, + raw_vector& result) { + result.resize(input->size()); + allRows_.resize(input->size()); + allRows_.setAll(); + + auto partitionVector = input->childAt(hasher_->channel())->loadedVector(); + hasher_->decode(*partitionVector, allRows_); + + if (!hasher_->computeValueIds(allRows_, result)) { + uint64_t range = hasher_->enableValueIds(1, kHasherReservePct); + VELOX_CHECK_NE( + range, + exec::VectorHasher::kRangeTooLarge, + "Number of requested IDs is out of range."); + + VELOX_CHECK( + hasher_->computeValueIds(allRows_, result), + "Cannot assign new value IDs."); + } + + recentMaxId_ = *std::max_element(result.begin(), result.end()); + maxId_ = std::max(maxId_, recentMaxId_); + + VELOX_USER_CHECK_LE( + maxId_, maxPartitions_, "Exceeded limit of distinct partitions."); +} + +} // namespace facebook::velox::connector::hive diff --git a/velox/connectors/hive/PartitionIdGenerator.h b/velox/connectors/hive/PartitionIdGenerator.h new file mode 100644 index 000000000000..ffc06e70f7af --- /dev/null +++ b/velox/connectors/hive/PartitionIdGenerator.h @@ -0,0 +1,65 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/exec/VectorHasher.h" + +namespace facebook::velox::connector::hive { +/// Generate sequential integer IDs for distinct partition values, which could +/// be used as vector index. Only single partition key is supported at the +/// moment. +class PartitionIdGenerator { + public: + /// @param inputType RowType of the input. + /// @param partitionChannels Channels of partition keys in the input + /// RowVector. + /// @param maxPartitions The max number of distinct partitions. + PartitionIdGenerator( + const RowTypePtr& inputType, + std::vector partitionChannels, + uint32_t maxPartitions); + + /// Generate sequential partition IDs for input vector. + /// @param input Input RowVector. + /// @param result Generated integer IDs indexed by input row number. + void run(const RowVectorPtr& input, raw_vector& result); + + /// Return the maximum partition ID generated for the most recent input. + uint64_t recentMaxPartitionId() const { + return recentMaxId_; + } + + private: + static constexpr const int32_t kHasherReservePct = 20; + + const std::vector partitionChannels_; + + const uint32_t maxPartitions_; + + std::unique_ptr hasher_; + + // Maximum partition ID generated for the most recent input. + uint64_t recentMaxId_ = 0; + + // Maximum partition ID generated for all inputs received so far. + uint64_t maxId_ = 0; + + // All rows are set valid to compute partition IDs for all input rows. + SelectivityVector allRows_; +}; + +} // namespace facebook::velox::connector::hive diff --git a/velox/connectors/hive/tests/CMakeLists.txt b/velox/connectors/hive/tests/CMakeLists.txt index eb45a305998a..59998ac91151 100644 --- a/velox/connectors/hive/tests/CMakeLists.txt +++ b/velox/connectors/hive/tests/CMakeLists.txt @@ -12,8 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. add_executable( - velox_hive_connector_test HivePartitionFunctionTest.cpp FileHandleTest.cpp - HiveWriteProtocolTest.cpp) + velox_hive_connector_test + HivePartitionFunctionTest.cpp FileHandleTest.cpp HiveWriteProtocolTest.cpp + PartitionIdGeneratorTest.cpp) add_test(velox_hive_connector_test velox_hive_connector_test) target_link_libraries( diff --git a/velox/connectors/hive/tests/PartitionIdGeneratorTest.cpp b/velox/connectors/hive/tests/PartitionIdGeneratorTest.cpp new file mode 100644 index 000000000000..907b67fbe056 --- /dev/null +++ b/velox/connectors/hive/tests/PartitionIdGeneratorTest.cpp @@ -0,0 +1,112 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/connectors/hive/PartitionIdGenerator.h" +#include "velox/common/base/tests/GTestUtils.h" +#include "velox/vector/tests/utils/VectorTestBase.h" + +#include "gtest/gtest.h" + +namespace facebook::velox::connector::hive { + +class PartitionIdGeneratorTest : public ::testing::Test, + public test::VectorTestBase {}; + +TEST_F(PartitionIdGeneratorTest, consecutiveIds) { + auto numPartitions = 100; + + PartitionIdGenerator idGenerator(ROW({VARCHAR()}), {0}, 100); + + auto input = makeRowVector( + {makeFlatVector(numPartitions * 3, [&](auto row) { + return StringView(Date(18000 + row % numPartitions).toString()); + })}); + + raw_vector ids; + idGenerator.run(input, ids); + + // distinctIds contains 100 ids in the range of [1, 100] that are consecutive. + std::unordered_set distinctIds(ids.begin(), ids.end()); + EXPECT_EQ(distinctIds.size(), numPartitions); + EXPECT_EQ(*std::min_element(distinctIds.begin(), distinctIds.end()), 1); + EXPECT_EQ( + *std::max_element(distinctIds.begin(), distinctIds.end()), numPartitions); +} + +TEST_F(PartitionIdGeneratorTest, stableIds) { + PartitionIdGenerator idGenerator(ROW({BIGINT()}), {0}, 100); + + auto numPartitions = 40; + auto input = makeRowVector({ + makeFlatVector(numPartitions, [](auto row) { return row; }), + }); + + auto otherNumPartitions = 60; + auto otherInput = makeRowVector({ + makeFlatVector( + otherNumPartitions, [](auto row) { return row * 1000; }), + }); + + raw_vector firstTimeIds; + raw_vector secondTimeIds; + raw_vector otherIds; + idGenerator.run(input, firstTimeIds); + idGenerator.run(otherInput, otherIds); + idGenerator.run(input, secondTimeIds); + + EXPECT_TRUE(std::equal( + firstTimeIds.begin(), firstTimeIds.end(), secondTimeIds.begin())); +} + +TEST_F(PartitionIdGeneratorTest, maxPartitionId) { + PartitionIdGenerator idGenerator(ROW({BIGINT()}), {0}, 100); + + auto firstMaxPartitionValue = 10; + auto firstInput = makeRowVector({ + makeFlatVector( + 1000, [&](auto row) { return row % firstMaxPartitionValue; }), + }); + + auto secondMaxPartitionValue = firstMaxPartitionValue - 1; + auto secondInput = makeRowVector({ + makeFlatVector( + 1000, [&](auto row) { return row % secondMaxPartitionValue; }), + }); + + raw_vector firstIds; + raw_vector secondIds; + idGenerator.run(firstInput, firstIds); + idGenerator.run(secondInput, secondIds); + + EXPECT_EQ(idGenerator.recentMaxPartitionId(), secondMaxPartitionValue); +} + +TEST_F(PartitionIdGeneratorTest, limitOfPartitionNumber) { + auto maxPartitions = 100; + + PartitionIdGenerator idGenerator(ROW({INTEGER()}), {0}, maxPartitions); + + auto input = makeRowVector({ + makeFlatVector(maxPartitions + 1, [](auto row) { return row; }), + }); + + raw_vector ids; + + VELOX_ASSERT_THROW( + idGenerator.run(input, ids), "Exceeded limit of distinct partitions."); +} + +} // namespace facebook::velox::connector::hive