Skip to content

Commit

Permalink
Table writer 7: ID generator for distinct partitions (#3416)
Browse files Browse the repository at this point in the history
Summary:
Add an ID generator class PartitionIdGenerator that
generates sequential integer IDs for distinct partition
values. It takes partition RowVector as input and returns
the generated IDs for the corresponding rows.

At the moment this PartitionIdGenerator only supports
single partition. The implementation is based on a single
VectorHasher, which the users could assume to generate
sequential consecutive IDs when adding new partitions.

The limit on the number of partitions that could be processed
is enforced by the user specified "maxPartitions".

Pull Request resolved: #3416

Reviewed By: mbasmanova

Differential Revision: D41725059

Pulled By: gggrace14

fbshipit-source-id: 3e875f1104206572efd764d77f0959c2f5d67e22
  • Loading branch information
gggrace14 authored and facebook-github-bot committed Dec 22, 2022
1 parent b6cc877 commit 997d1e2
Show file tree
Hide file tree
Showing 5 changed files with 247 additions and 4 deletions.
5 changes: 3 additions & 2 deletions velox/connectors/hive/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

add_library(velox_hive_connector OBJECT HiveConnector.cpp HiveDataSink.cpp
FileHandle.cpp HiveWriteProtocol.cpp)
add_library(
velox_hive_connector OBJECT HiveConnector.cpp HiveDataSink.cpp FileHandle.cpp
HiveWriteProtocol.cpp PartitionIdGenerator.cpp)

target_link_libraries(velox_hive_connector velox_connector
velox_dwio_dwrf_reader velox_dwio_dwrf_writer velox_file)
Expand Down
64 changes: 64 additions & 0 deletions velox/connectors/hive/PartitionIdGenerator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/connectors/hive/PartitionIdGenerator.h"

namespace facebook::velox::connector::hive {

PartitionIdGenerator::PartitionIdGenerator(
const RowTypePtr& inputType,
std::vector<column_index_t> partitionChannels,
uint32_t maxPartitions)
: partitionChannels_(std::move(partitionChannels)),
maxPartitions_(maxPartitions) {
VELOX_USER_CHECK_EQ(
partitionChannels_.size(),
1,
"Multiple partition keys are not supported yet.");
hasher_ = exec::VectorHasher::create(
inputType->childAt(partitionChannels_[0]), partitionChannels_[0]);
}

void PartitionIdGenerator::run(
const RowVectorPtr& input,
raw_vector<uint64_t>& result) {
result.resize(input->size());
allRows_.resize(input->size());
allRows_.setAll();

auto partitionVector = input->childAt(hasher_->channel())->loadedVector();
hasher_->decode(*partitionVector, allRows_);

if (!hasher_->computeValueIds(allRows_, result)) {
uint64_t range = hasher_->enableValueIds(1, kHasherReservePct);
VELOX_CHECK_NE(
range,
exec::VectorHasher::kRangeTooLarge,
"Number of requested IDs is out of range.");

VELOX_CHECK(
hasher_->computeValueIds(allRows_, result),
"Cannot assign new value IDs.");
}

recentMaxId_ = *std::max_element(result.begin(), result.end());
maxId_ = std::max(maxId_, recentMaxId_);

VELOX_USER_CHECK_LE(
maxId_, maxPartitions_, "Exceeded limit of distinct partitions.");
}

} // namespace facebook::velox::connector::hive
65 changes: 65 additions & 0 deletions velox/connectors/hive/PartitionIdGenerator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/exec/VectorHasher.h"

namespace facebook::velox::connector::hive {
/// Generate sequential integer IDs for distinct partition values, which could
/// be used as vector index. Only single partition key is supported at the
/// moment.
class PartitionIdGenerator {
public:
/// @param inputType RowType of the input.
/// @param partitionChannels Channels of partition keys in the input
/// RowVector.
/// @param maxPartitions The max number of distinct partitions.
PartitionIdGenerator(
const RowTypePtr& inputType,
std::vector<column_index_t> partitionChannels,
uint32_t maxPartitions);

/// Generate sequential partition IDs for input vector.
/// @param input Input RowVector.
/// @param result Generated integer IDs indexed by input row number.
void run(const RowVectorPtr& input, raw_vector<uint64_t>& result);

/// Return the maximum partition ID generated for the most recent input.
uint64_t recentMaxPartitionId() const {
return recentMaxId_;
}

private:
static constexpr const int32_t kHasherReservePct = 20;

const std::vector<column_index_t> partitionChannels_;

const uint32_t maxPartitions_;

std::unique_ptr<exec::VectorHasher> hasher_;

// Maximum partition ID generated for the most recent input.
uint64_t recentMaxId_ = 0;

// Maximum partition ID generated for all inputs received so far.
uint64_t maxId_ = 0;

// All rows are set valid to compute partition IDs for all input rows.
SelectivityVector allRows_;
};

} // namespace facebook::velox::connector::hive
5 changes: 3 additions & 2 deletions velox/connectors/hive/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
add_executable(
velox_hive_connector_test HivePartitionFunctionTest.cpp FileHandleTest.cpp
HiveWriteProtocolTest.cpp)
velox_hive_connector_test
HivePartitionFunctionTest.cpp FileHandleTest.cpp HiveWriteProtocolTest.cpp
PartitionIdGeneratorTest.cpp)
add_test(velox_hive_connector_test velox_hive_connector_test)

target_link_libraries(
Expand Down
112 changes: 112 additions & 0 deletions velox/connectors/hive/tests/PartitionIdGeneratorTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/connectors/hive/PartitionIdGenerator.h"
#include "velox/common/base/tests/GTestUtils.h"
#include "velox/vector/tests/utils/VectorTestBase.h"

#include "gtest/gtest.h"

namespace facebook::velox::connector::hive {

class PartitionIdGeneratorTest : public ::testing::Test,
public test::VectorTestBase {};

TEST_F(PartitionIdGeneratorTest, consecutiveIds) {
auto numPartitions = 100;

PartitionIdGenerator idGenerator(ROW({VARCHAR()}), {0}, 100);

auto input = makeRowVector(
{makeFlatVector<StringView>(numPartitions * 3, [&](auto row) {
return StringView(Date(18000 + row % numPartitions).toString());
})});

raw_vector<uint64_t> ids;
idGenerator.run(input, ids);

// distinctIds contains 100 ids in the range of [1, 100] that are consecutive.
std::unordered_set<uint64_t> distinctIds(ids.begin(), ids.end());
EXPECT_EQ(distinctIds.size(), numPartitions);
EXPECT_EQ(*std::min_element(distinctIds.begin(), distinctIds.end()), 1);
EXPECT_EQ(
*std::max_element(distinctIds.begin(), distinctIds.end()), numPartitions);
}

TEST_F(PartitionIdGeneratorTest, stableIds) {
PartitionIdGenerator idGenerator(ROW({BIGINT()}), {0}, 100);

auto numPartitions = 40;
auto input = makeRowVector({
makeFlatVector<int64_t>(numPartitions, [](auto row) { return row; }),
});

auto otherNumPartitions = 60;
auto otherInput = makeRowVector({
makeFlatVector<int64_t>(
otherNumPartitions, [](auto row) { return row * 1000; }),
});

raw_vector<uint64_t> firstTimeIds;
raw_vector<uint64_t> secondTimeIds;
raw_vector<uint64_t> otherIds;
idGenerator.run(input, firstTimeIds);
idGenerator.run(otherInput, otherIds);
idGenerator.run(input, secondTimeIds);

EXPECT_TRUE(std::equal(
firstTimeIds.begin(), firstTimeIds.end(), secondTimeIds.begin()));
}

TEST_F(PartitionIdGeneratorTest, maxPartitionId) {
PartitionIdGenerator idGenerator(ROW({BIGINT()}), {0}, 100);

auto firstMaxPartitionValue = 10;
auto firstInput = makeRowVector({
makeFlatVector<int64_t>(
1000, [&](auto row) { return row % firstMaxPartitionValue; }),
});

auto secondMaxPartitionValue = firstMaxPartitionValue - 1;
auto secondInput = makeRowVector({
makeFlatVector<int64_t>(
1000, [&](auto row) { return row % secondMaxPartitionValue; }),
});

raw_vector<uint64_t> firstIds;
raw_vector<uint64_t> secondIds;
idGenerator.run(firstInput, firstIds);
idGenerator.run(secondInput, secondIds);

EXPECT_EQ(idGenerator.recentMaxPartitionId(), secondMaxPartitionValue);
}

TEST_F(PartitionIdGeneratorTest, limitOfPartitionNumber) {
auto maxPartitions = 100;

PartitionIdGenerator idGenerator(ROW({INTEGER()}), {0}, maxPartitions);

auto input = makeRowVector({
makeFlatVector<int32_t>(maxPartitions + 1, [](auto row) { return row; }),
});

raw_vector<uint64_t> ids;

VELOX_ASSERT_THROW(
idGenerator.run(input, ids), "Exceeded limit of distinct partitions.");
}

} // namespace facebook::velox::connector::hive

0 comments on commit 997d1e2

Please sign in to comment.