Skip to content

Commit

Permalink
Add support constant partitioning columns. (#1571)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #1571

X-link: facebookexternal/presto_cpp#739

Adding support of constant partitioning columns to HashPartitionFunction and HivePartitionFunction.
Few other cosmetic changes.

Differential Revision: D36273727

fbshipit-source-id: eed41cc922e3548703d47a6308a8b2de01dec6e5
  • Loading branch information
Sergey Pershin authored and facebook-github-bot committed May 11, 2022
1 parent 43308b0 commit ed30172
Show file tree
Hide file tree
Showing 13 changed files with 213 additions and 72 deletions.
99 changes: 63 additions & 36 deletions velox/connectors/hive/HivePartitionFunction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,8 @@ void hashTyped<TypeKind::BOOLEAN>(
bool mix,
std::vector<uint32_t>& hashes) {
for (auto i = 0; i < size; ++i) {
uint32_t hash;
if (values.isNullAt(i)) {
hash = 0;
} else {
hash = values.valueAt<bool>(i) ? 1 : 0;
}

const int32_t hash =
(values.isNullAt(i)) ? 0 : (values.valueAt<bool>(i) ? 1 : 0);
hashes[i] = mix ? hashes[i] * 31 + hash : hash;
}
}
Expand All @@ -58,13 +53,8 @@ void hashTyped<TypeKind::BIGINT>(
bool mix,
std::vector<uint32_t>& hashes) {
for (auto i = 0; i < size; ++i) {
int32_t hash;
if (values.isNullAt(i)) {
hash = 0;
} else {
hash = hashInt64(values.valueAt<int64_t>(i));
}

const int32_t hash =
(values.isNullAt(i)) ? 0 : hashInt64(values.valueAt<int64_t>(i));
hashes[i] = mix ? hashes[i] * 31 + hash : hash;
}
}
Expand All @@ -91,13 +81,8 @@ void hashTyped<TypeKind::VARCHAR>(
bool mix,
std::vector<uint32_t>& hashes) {
for (auto i = 0; i < size; ++i) {
uint32_t hash;
if (values.isNullAt(i)) {
hash = 0;
} else {
hash = hashBytes(values.valueAt<StringView>(i), 0);
}

const uint32_t hash =
(values.isNullAt(i)) ? 0 : hashBytes(values.valueAt<StringView>(i), 0);
hashes[i] = mix ? hashes[i] * 31 + hash : hash;
}
}
Expand All @@ -118,44 +103,86 @@ void hash(

VELOX_DYNAMIC_TYPE_DISPATCH(hashTyped, typeKind, values, size, mix, hashes);
}

void hashPrecomputed(
uint32_t precomputedHash,
vector_size_t numRows,
bool mix,
std::vector<uint32_t>& hashes) {
for (auto i = 0; i < numRows; ++i) {
hashes[i] = mix ? hashes[i] * 31 + precomputedHash : precomputedHash;
}
}

// Precompute single value hive hash for a constant partition key.
uint32_t precompute(const variant& value) {
if (not value.isNull()) {
switch (value.kind()) {
case TypeKind::BOOLEAN:
return value.value<bool>() ? 1 : 0;
case TypeKind::BIGINT:
return hashInt64(value.value<int64_t>());
case TypeKind::VARCHAR:
return hashBytes(StringView(value.value<StringView>()), 0);
default:
VELOX_UNSUPPORTED(
"Hive partitioning function doesn't support {} type",
mapTypeKindToName(value.kind()));
}
}
return 0; // In case of NULL.
}
} // namespace

HivePartitionFunction::HivePartitionFunction(
int numBuckets,
std::vector<int> bucketToPartition,
std::vector<ChannelIndex> keyChannels)
std::vector<ChannelIndex> keyChannels,
const std::vector<std::shared_ptr<const core::ConstantTypedExpr>>&
constExprs)
: numBuckets_{numBuckets},
bucketToPartition_{bucketToPartition},
keyChannels_{std::move(keyChannels)} {
decodedVectors_.resize(keyChannels_.size());
precomputedHashes_.resize(keyChannels_.size());
size_t constChannel{0};
for (auto i = 0; i < keyChannels_.size(); ++i) {
if (keyChannels_[i] == kConstantChannel) {
precomputedHashes_[i] = precompute(constExprs[constChannel++]->value());
}
}
}

void HivePartitionFunction::partition(
const RowVector& input,
std::vector<uint32_t>& partitions) {
auto size = input.size();
const auto numRows = input.size();

if (size > hashes_.size()) {
rows_.resize(size);
if (numRows > hashes_.size()) {
rows_.resize(numRows);
rows_.setAll();
hashes_.resize(size);
hashes_.resize(numRows);
}

partitions.resize(size);
partitions.resize(numRows);
for (auto i = 0; i < keyChannels_.size(); ++i) {
auto keyVector = input.childAt(keyChannels_[i]);
decodedVectors_[i].decode(*keyVector, rows_);
hash(
decodedVectors_[i],
keyVector->typeKind(),
keyVector->size(),
i > 0,
hashes_);
if (keyChannels_[i] != kConstantChannel) {
const auto& keyVector = input.childAt(keyChannels_[i]);
decodedVectors_[i].decode(*keyVector, rows_);
hash(
decodedVectors_[i],
keyVector->typeKind(),
keyVector->size(),
i > 0,
hashes_);
} else {
hashPrecomputed(precomputedHashes_[i], numRows, i > 0, hashes_);
}
}

static const int32_t kInt32Max = std::numeric_limits<int32_t>::max();

for (auto i = 0; i < size; ++i) {
for (auto i = 0; i < numRows; ++i) {
partitions[i] =
bucketToPartition_[((hashes_[i] & kInt32Max) % numBuckets_)];
}
Expand Down
6 changes: 5 additions & 1 deletion velox/connectors/hive/HivePartitionFunction.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ class HivePartitionFunction : public core::PartitionFunction {
HivePartitionFunction(
int numBuckets,
std::vector<int> bucketToPartition,
std::vector<ChannelIndex> keyChannels);
std::vector<ChannelIndex> keyChannels,
const std::vector<std::shared_ptr<const core::ConstantTypedExpr>>&
constExprs = {});

~HivePartitionFunction() override = default;

Expand All @@ -41,5 +43,7 @@ class HivePartitionFunction : public core::PartitionFunction {
std::vector<uint32_t> hashes_;
SelectivityVector rows_;
std::vector<DecodedVector> decodedVectors_;
// Precomputed hashes for constant partition keys (one per key).
std::vector<uint32_t> precomputedHashes_;
};
} // namespace facebook::velox::connector::hive
27 changes: 24 additions & 3 deletions velox/core/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ AggregationNode::AggregationNode(
}

namespace {
void addKeys(
void addFields(
std::stringstream& stream,
const std::vector<std::shared_ptr<const FieldAccessTypedExpr>>& keys) {
for (auto i = 0; i < keys.size(); ++i) {
Expand All @@ -115,14 +115,35 @@ void addKeys(
stream << keys[i]->name();
}
}

void addKeys(
std::stringstream& stream,
const std::vector<std::shared_ptr<const ITypedExpr>>& keys) {
for (auto i = 0; i < keys.size(); ++i) {
if (i > 0) {
stream << ", ";
}
if (auto field =
std::dynamic_pointer_cast<const core::FieldAccessTypedExpr>(
keys[i])) {
stream << field->name();
} else if (
auto constant =
std::dynamic_pointer_cast<const core::ConstantTypedExpr>(keys[i])) {
stream << "<constexpr>";
} else {
stream << "<unknown>";
}
}
}
} // namespace

void AggregationNode::addDetails(std::stringstream& stream) const {
stream << stepName(step_) << " ";

if (!groupingKeys_.empty()) {
stream << "[";
addKeys(stream, groupingKeys_);
addFields(stream, groupingKeys_);
stream << "] ";
}

Expand Down Expand Up @@ -224,7 +245,7 @@ UnnestNode::UnnestNode(
}

void UnnestNode::addDetails(std::stringstream& stream) const {
addKeys(stream, unnestVariables_);
addFields(stream, unnestVariables_);
}

AbstractJoinNode::AbstractJoinNode(
Expand Down
11 changes: 5 additions & 6 deletions velox/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -747,8 +747,7 @@ class PartitionedOutputNode : public PlanNode {
public:
PartitionedOutputNode(
const PlanNodeId& id,
const std::vector<std::shared_ptr<const core::FieldAccessTypedExpr>>&
keys,
const std::vector<std::shared_ptr<const ITypedExpr>>& keys,
int numPartitions,
bool broadcast,
bool replicateNullsAndAny,
Expand Down Expand Up @@ -781,7 +780,7 @@ class PartitionedOutputNode : public PlanNode {
int numPartitions,
RowTypePtr outputType,
PlanNodePtr source) {
std::vector<std::shared_ptr<const core::FieldAccessTypedExpr>> noKeys;
std::vector<std::shared_ptr<const core::ITypedExpr>> noKeys;
return std::make_shared<PartitionedOutputNode>(
id,
noKeys,
Expand All @@ -797,7 +796,7 @@ class PartitionedOutputNode : public PlanNode {

static std::shared_ptr<PartitionedOutputNode>
single(const PlanNodeId& id, RowTypePtr outputType, PlanNodePtr source) {
std::vector<std::shared_ptr<const core::FieldAccessTypedExpr>> noKeys;
std::vector<std::shared_ptr<const core::ITypedExpr>> noKeys;
return std::make_shared<PartitionedOutputNode>(
id,
noKeys,
Expand All @@ -823,7 +822,7 @@ class PartitionedOutputNode : public PlanNode {
return sources_[0]->outputType();
}

const std::vector<std::shared_ptr<const FieldAccessTypedExpr>>& keys() const {
const std::vector<std::shared_ptr<const ITypedExpr>>& keys() const {
return keys_;
}

Expand Down Expand Up @@ -855,7 +854,7 @@ class PartitionedOutputNode : public PlanNode {
void addDetails(std::stringstream& stream) const override;

const std::vector<PlanNodePtr> sources_;
const std::vector<std::shared_ptr<const FieldAccessTypedExpr>> keys_;
const std::vector<std::shared_ptr<const ITypedExpr>> keys_;
const int numPartitions_;
const bool broadcast_;
const bool replicateNullsAndAny_;
Expand Down
33 changes: 24 additions & 9 deletions velox/exec/HashPartitionFunction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,22 @@
namespace facebook::velox::exec {
HashPartitionFunction::HashPartitionFunction(
int numPartitions,
RowTypePtr inputType,
std::vector<ChannelIndex> keyChannels)
: numPartitions_{numPartitions}, keyChannels_{std::move(keyChannels)} {
hashers_.reserve(keyChannels_.size());
for (auto channel : keyChannels_) {
hashers_.emplace_back(
VectorHasher::create(inputType->childAt(channel), channel));
const RowTypePtr& inputType,
const std::vector<ChannelIndex>& keyChannels,
const std::vector<std::shared_ptr<const core::ConstantTypedExpr>>&
constExprs)
: numPartitions_{numPartitions} {
hashers_.reserve(keyChannels.size());
size_t constChannel{0};
for (const auto channel : keyChannels) {
if (channel != kConstantChannel) {
hashers_.emplace_back(
VectorHasher::create(inputType->childAt(channel), channel));
} else {
const auto& constExpr = constExprs[constChannel++];
hashers_.emplace_back(VectorHasher::create(constExpr->type(), channel));
hashers_.back()->precompute(constExpr->value());
}
}
}

Expand All @@ -38,8 +47,14 @@ void HashPartitionFunction::partition(
rows_.setAll();

hashes_.resize(size);
for (auto i = 0; i < keyChannels_.size(); ++i) {
hashers_[i]->hash(*input.childAt(keyChannels_[i]), rows_, i > 0, hashes_);
for (auto i = 0; i < hashers_.size(); ++i) {
auto& hasher = hashers_[i];
if (hasher->channel() != kConstantChannel) {
hashers_[i]->hash(
*input.childAt(hasher->channel()), rows_, i > 0, hashes_);
} else {
hashers_[i]->hashPrecomputed(rows_, i > 0, hashes_);
}
}

partitions.resize(size);
Expand Down
7 changes: 4 additions & 3 deletions velox/exec/HashPartitionFunction.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ class HashPartitionFunction : public core::PartitionFunction {
public:
HashPartitionFunction(
int numPartitions,
RowTypePtr inputType,
std::vector<ChannelIndex> keyChannels);
const RowTypePtr& inputType,
const std::vector<ChannelIndex>& keyChannels,
const std::vector<std::shared_ptr<const core::ConstantTypedExpr>>&
constExprs = {});

~HashPartitionFunction() override = default;

Expand All @@ -34,7 +36,6 @@ class HashPartitionFunction : public core::PartitionFunction {

private:
const int numPartitions_;
const std::vector<ChannelIndex> keyChannels_;
std::vector<std::unique_ptr<VectorHasher>> hashers_;

// Reusable memory.
Expand Down
9 changes: 4 additions & 5 deletions velox/exec/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,11 @@ std::string Operator::toString() const {

std::vector<ChannelIndex> toChannels(
const RowTypePtr& rowType,
const std::vector<std::shared_ptr<const core::FieldAccessTypedExpr>>&
fields) {
const std::vector<std::shared_ptr<const core::ITypedExpr>>& exprs) {
std::vector<ChannelIndex> channels;
channels.reserve(fields.size());
for (const auto& field : fields) {
auto channel = exprToChannel(field.get(), rowType);
channels.reserve(exprs.size());
for (const auto& expr : exprs) {
auto channel = exprToChannel(expr.get(), rowType);
channels.push_back(channel);
}
return channels;
Expand Down
6 changes: 1 addition & 5 deletions velox/exec/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -415,14 +415,10 @@ class Operator {
dynamicFilters_;
};

constexpr ChannelIndex kConstantChannel =
std::numeric_limits<ChannelIndex>::max();

/// Given a row type returns indices for the specified subset of columns.
std::vector<ChannelIndex> toChannels(
const RowTypePtr& rowType,
const std::vector<std::shared_ptr<const core::FieldAccessTypedExpr>>&
fields);
const std::vector<std::shared_ptr<const core::ITypedExpr>>& exprs);

ChannelIndex exprToChannel(const core::ITypedExpr* expr, const TypePtr& type);

Expand Down
Loading

0 comments on commit ed30172

Please sign in to comment.