From ed3017210c2bd54db970820c13a67ebaf00a9b1a Mon Sep 17 00:00:00 2001
From: Sergey Pershin
Date: Tue, 10 May 2022 17:06:27 -0700
Subject: [PATCH] Add support for constant partitioning columns. (#1571)

Summary:
Pull Request resolved: https://github.com/facebookincubator/velox/pull/1571

X-link: https://github.com/facebookexternal/presto_cpp/pull/739

Adds support for constant partitioning columns to HashPartitionFunction and
HivePartitionFunction, along with a few other cosmetic changes.

Differential Revision: D36273727

fbshipit-source-id: eed41cc922e3548703d47a6308a8b2de01dec6e5
---
 .../connectors/hive/HivePartitionFunction.cpp | 99 ++++++++++++-------
 velox/connectors/hive/HivePartitionFunction.h |  6 +-
 velox/core/PlanNode.cpp                       | 27 ++++-
 velox/core/PlanNode.h                         | 11 +--
 velox/exec/HashPartitionFunction.cpp          | 33 +++++--
 velox/exec/HashPartitionFunction.h            |  7 +-
 velox/exec/Operator.cpp                       |  9 +-
 velox/exec/Operator.h                         |  6 +-
 velox/exec/VectorHasher.cpp                   | 52 +++++++++-
 velox/exec/VectorHasher.h                     | 15 +++
 velox/exec/tests/utils/PlanBuilder.cpp        | 14 ++-
 velox/exec/tests/utils/PlanBuilder.h          |  3 +
 velox/vector/ComplexVector.h                  |  3 +
 13 files changed, 213 insertions(+), 72 deletions(-)
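Intended call pattern for the connector-side change, as a minimal sketch (not
code from this patch): constant keys are marked in the channel list with
kConstantChannel and their values travel in a parallel list of
ConstantTypedExpr. The helper name, bucket map, and literal below are
illustrative, and the (type, variant) ConstantTypedExpr constructor is assumed.

  #include "velox/connectors/hive/HivePartitionFunction.h"

  using namespace facebook::velox;

  // Illustrative only: partition 'input' by (column 0, constant "store_A").
  void partitionByColumnAndConstant(
      const RowVector& input,
      std::vector<uint32_t>& partitions) {
    const std::vector<int> bucketToPartition = {0, 1, 2, 3};
    const std::vector<ChannelIndex> keyChannels = {0, kConstantChannel};
    const std::vector<std::shared_ptr<const core::ConstantTypedExpr>>
        constExprs = {std::make_shared<core::ConstantTypedExpr>(
            VARCHAR(), variant("store_A"))};

    connector::hive::HivePartitionFunction function(
        /*numBuckets=*/4, bucketToPartition, keyChannels, constExprs);
    function.partition(input, partitions);
  }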
diff --git a/velox/connectors/hive/HivePartitionFunction.cpp b/velox/connectors/hive/HivePartitionFunction.cpp
index 017a545026055..f82a3f9f74aef 100644
--- a/velox/connectors/hive/HivePartitionFunction.cpp
+++ b/velox/connectors/hive/HivePartitionFunction.cpp
@@ -36,13 +36,8 @@ void hashTyped(
     bool mix,
     std::vector<uint32_t>& hashes) {
   for (auto i = 0; i < size; ++i) {
-    uint32_t hash;
-    if (values.isNullAt(i)) {
-      hash = 0;
-    } else {
-      hash = values.valueAt<bool>(i) ? 1 : 0;
-    }
-
+    const int32_t hash =
+        (values.isNullAt(i)) ? 0 : (values.valueAt<bool>(i) ? 1 : 0);
     hashes[i] = mix ? hashes[i] * 31 + hash : hash;
   }
 }
@@ -58,13 +53,8 @@ void hashTyped(
     bool mix,
     std::vector<uint32_t>& hashes) {
   for (auto i = 0; i < size; ++i) {
-    int32_t hash;
-    if (values.isNullAt(i)) {
-      hash = 0;
-    } else {
-      hash = hashInt64(values.valueAt<int64_t>(i));
-    }
-
+    const int32_t hash =
+        (values.isNullAt(i)) ? 0 : hashInt64(values.valueAt<int64_t>(i));
     hashes[i] = mix ? hashes[i] * 31 + hash : hash;
   }
 }
@@ -91,13 +81,8 @@ void hashTyped(
     bool mix,
     std::vector<uint32_t>& hashes) {
   for (auto i = 0; i < size; ++i) {
-    uint32_t hash;
-    if (values.isNullAt(i)) {
-      hash = 0;
-    } else {
-      hash = hashBytes(values.valueAt<StringView>(i), 0);
-    }
-
+    const uint32_t hash =
+        (values.isNullAt(i)) ? 0 : hashBytes(values.valueAt<StringView>(i), 0);
     hashes[i] = mix ? hashes[i] * 31 + hash : hash;
   }
 }
@@ -118,44 +103,86 @@ void hash(
   VELOX_DYNAMIC_TYPE_DISPATCH(hashTyped, typeKind, values, size, mix, hashes);
 }
+
+void hashPrecomputed(
+    uint32_t precomputedHash,
+    vector_size_t numRows,
+    bool mix,
+    std::vector<uint32_t>& hashes) {
+  for (auto i = 0; i < numRows; ++i) {
+    hashes[i] = mix ? hashes[i] * 31 + precomputedHash : precomputedHash;
+  }
+}
+
+// Precompute single value hive hash for a constant partition key.
+uint32_t precompute(const variant& value) {
+  if (not value.isNull()) {
+    switch (value.kind()) {
+      case TypeKind::BOOLEAN:
+        return value.value<TypeKind::BOOLEAN>() ? 1 : 0;
+      case TypeKind::BIGINT:
+        return hashInt64(value.value<TypeKind::BIGINT>());
+      case TypeKind::VARCHAR:
+        return hashBytes(StringView(value.value<TypeKind::VARCHAR>()), 0);
+      default:
+        VELOX_UNSUPPORTED(
+            "Hive partitioning function doesn't support {} type",
+            mapTypeKindToName(value.kind()));
+    }
+  }
+  return 0; // In case of NULL.
+}
 } // namespace

 HivePartitionFunction::HivePartitionFunction(
     int numBuckets,
     std::vector<int> bucketToPartition,
-    std::vector<ChannelIndex> keyChannels)
+    std::vector<ChannelIndex> keyChannels,
+    const std::vector<std::shared_ptr<const core::ConstantTypedExpr>>&
+        constExprs)
     : numBuckets_{numBuckets},
       bucketToPartition_{bucketToPartition},
       keyChannels_{std::move(keyChannels)} {
   decodedVectors_.resize(keyChannels_.size());
+  precomputedHashes_.resize(keyChannels_.size());
+  size_t constChannel{0};
+  for (auto i = 0; i < keyChannels_.size(); ++i) {
+    if (keyChannels_[i] == kConstantChannel) {
+      precomputedHashes_[i] = precompute(constExprs[constChannel++]->value());
+    }
+  }
 }

 void HivePartitionFunction::partition(
     const RowVector& input,
     std::vector<uint32_t>& partitions) {
-  auto size = input.size();
+  const auto numRows = input.size();

-  if (size > hashes_.size()) {
-    rows_.resize(size);
+  if (numRows > hashes_.size()) {
+    rows_.resize(numRows);
     rows_.setAll();
-    hashes_.resize(size);
+    hashes_.resize(numRows);
   }
-  partitions.resize(size);
+  partitions.resize(numRows);
   for (auto i = 0; i < keyChannels_.size(); ++i) {
-    auto keyVector = input.childAt(keyChannels_[i]);
-    decodedVectors_[i].decode(*keyVector, rows_);
-    hash(
-        decodedVectors_[i],
-        keyVector->typeKind(),
-        keyVector->size(),
-        i > 0,
-        hashes_);
+    if (keyChannels_[i] != kConstantChannel) {
+      const auto& keyVector = input.childAt(keyChannels_[i]);
+      decodedVectors_[i].decode(*keyVector, rows_);
+      hash(
+          decodedVectors_[i],
+          keyVector->typeKind(),
+          keyVector->size(),
+          i > 0,
+          hashes_);
+    } else {
+      hashPrecomputed(precomputedHashes_[i], numRows, i > 0, hashes_);
+    }
   }

   static const int32_t kInt32Max = std::numeric_limits<int32_t>::max();

-  for (auto i = 0; i < size; ++i) {
+  for (auto i = 0; i < numRows; ++i) {
     partitions[i] =
         bucketToPartition_[((hashes_[i] & kInt32Max) % numBuckets_)];
   }
 }
diff --git a/velox/connectors/hive/HivePartitionFunction.h b/velox/connectors/hive/HivePartitionFunction.h
index 8f25c146303b2..0078d750d7d57 100644
--- a/velox/connectors/hive/HivePartitionFunction.h
+++ b/velox/connectors/hive/HivePartitionFunction.h
@@ -25,7 +25,9 @@ class HivePartitionFunction : public core::PartitionFunction {
   HivePartitionFunction(
       int numBuckets,
       std::vector<int> bucketToPartition,
-      std::vector<ChannelIndex> keyChannels);
+      std::vector<ChannelIndex> keyChannels,
+      const std::vector<std::shared_ptr<const core::ConstantTypedExpr>>&
+          constExprs = {});

   ~HivePartitionFunction() override = default;

@@ -41,5 +43,7 @@ class HivePartitionFunction : public core::PartitionFunction {
   std::vector<uint32_t> hashes_;
   SelectivityVector rows_;
   std::vector<DecodedVector> decodedVectors_;
+  // Precomputed hashes for constant partition keys (one per key).
+  std::vector<uint32_t> precomputedHashes_;
 };
 } // namespace facebook::velox::connector::hive
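For reference, the combine-and-bucket arithmetic implemented by the hunks
above, written out as a standalone function (illustration only, not code from
the patch):

  #include <cstdint>
  #include <limits>
  #include <vector>

  // Per-key hashes are mixed Java-style ('previous * 31 + current'), a NULL
  // key contributes 0, and the sign bit is cleared before the modulo,
  // mirroring hashes_[i] and the bucketToPartition_ lookup above.
  uint32_t hiveBucket(const std::vector<uint32_t>& keyHashes, int numBuckets) {
    uint32_t hash = 0;
    for (const auto keyHash : keyHashes) {
      hash = hash * 31 + keyHash;
    }
    return (hash & std::numeric_limits<int32_t>::max()) % numBuckets;
  }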
diff --git a/velox/core/PlanNode.cpp b/velox/core/PlanNode.cpp
index 480efc3ccb79f..a4446db3d8feb 100644
--- a/velox/core/PlanNode.cpp
+++ b/velox/core/PlanNode.cpp
@@ -105,7 +105,7 @@ AggregationNode::AggregationNode(
 }

 namespace {
-void addKeys(
+void addFields(
     std::stringstream& stream,
     const std::vector<std::shared_ptr<const FieldAccessTypedExpr>>& keys) {
   for (auto i = 0; i < keys.size(); ++i) {
@@ -115,6 +115,27 @@ void addKeys(
     stream << keys[i]->name();
   }
 }
+
+void addKeys(
+    std::stringstream& stream,
+    const std::vector<std::shared_ptr<const ITypedExpr>>& keys) {
+  for (auto i = 0; i < keys.size(); ++i) {
+    if (i > 0) {
+      stream << ", ";
+    }
+    if (auto field =
+            std::dynamic_pointer_cast<const FieldAccessTypedExpr>(
+                keys[i])) {
+      stream << field->name();
+    } else if (
+        auto constant =
+            std::dynamic_pointer_cast<const ConstantTypedExpr>(keys[i])) {
+      stream << "<constant>";
+    } else {
+      stream << "<unknown>";
+    }
+  }
+}
 } // namespace

 void AggregationNode::addDetails(std::stringstream& stream) const {
@@ -122,7 +143,7 @@ void AggregationNode::addDetails(std::stringstream& stream) const {

   if (!groupingKeys_.empty()) {
     stream << "[";
-    addKeys(stream, groupingKeys_);
+    addFields(stream, groupingKeys_);
     stream << "] ";
   }

@@ -224,7 +245,7 @@ UnnestNode::UnnestNode(
 }

 void UnnestNode::addDetails(std::stringstream& stream) const {
-  addKeys(stream, unnestVariables_);
+  addFields(stream, unnestVariables_);
 }

 AbstractJoinNode::AbstractJoinNode(
diff --git a/velox/core/PlanNode.h b/velox/core/PlanNode.h
index e69671167759e..a250d5b70f854 100644
--- a/velox/core/PlanNode.h
+++ b/velox/core/PlanNode.h
@@ -747,8 +747,7 @@ class PartitionedOutputNode : public PlanNode {
  public:
   PartitionedOutputNode(
       const PlanNodeId& id,
-      const std::vector<std::shared_ptr<const FieldAccessTypedExpr>>&
-          keys,
+      const std::vector<std::shared_ptr<const ITypedExpr>>& keys,
       int numPartitions,
       bool broadcast,
       bool replicateNullsAndAny,
@@ -781,7 +780,7 @@ class PartitionedOutputNode : public PlanNode {
       int numPartitions,
       RowTypePtr outputType,
       PlanNodePtr source) {
-    std::vector<std::shared_ptr<const FieldAccessTypedExpr>> noKeys;
+    std::vector<std::shared_ptr<const ITypedExpr>> noKeys;
     return std::make_shared<PartitionedOutputNode>(
         id,
         noKeys,
@@ -797,7 +796,7 @@ class PartitionedOutputNode : public PlanNode {
   static std::shared_ptr<PartitionedOutputNode>
   single(const PlanNodeId& id, RowTypePtr outputType, PlanNodePtr source) {
-    std::vector<std::shared_ptr<const FieldAccessTypedExpr>> noKeys;
+    std::vector<std::shared_ptr<const ITypedExpr>> noKeys;
     return std::make_shared<PartitionedOutputNode>(
         id,
         noKeys,
@@ -823,7 +822,7 @@ class PartitionedOutputNode : public PlanNode {
     return sources_[0]->outputType();
   }

-  const std::vector<std::shared_ptr<const FieldAccessTypedExpr>>& keys() const {
+  const std::vector<std::shared_ptr<const ITypedExpr>>& keys() const {
     return keys_;
   }

@@ -855,7 +854,7 @@ class PartitionedOutputNode : public PlanNode {
   void addDetails(std::stringstream& stream) const override;

   const std::vector<PlanNodePtr> sources_;
-  const std::vector<std::shared_ptr<const FieldAccessTypedExpr>> keys_;
+  const std::vector<std::shared_ptr<const ITypedExpr>> keys_;
   const int numPartitions_;
   const bool broadcast_;
   const bool replicateNullsAndAny_;
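With PartitionedOutputNode keys widened from FieldAccessTypedExpr to
ITypedExpr, a key list can now mix column references and constants. A
hypothetical construction (not from the patch), assuming the (type, name) and
(type, variant) constructors used below:

  #include "velox/core/PlanNode.h"

  using namespace facebook::velox;

  // Illustrative keys for a PartitionedOutputNode: one column plus one
  // constant partitioning key.
  std::vector<std::shared_ptr<const core::ITypedExpr>> makePartitionKeys() {
    return {
        std::make_shared<core::FieldAccessTypedExpr>(BIGINT(), "c0"),
        std::make_shared<core::ConstantTypedExpr>(
            BIGINT(), variant(int64_t{42}))};
  }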
diff --git a/velox/exec/HashPartitionFunction.cpp b/velox/exec/HashPartitionFunction.cpp
index 374ac1bbe4ed7..580b4a418a62b 100644
--- a/velox/exec/HashPartitionFunction.cpp
+++ b/velox/exec/HashPartitionFunction.cpp
@@ -19,13 +19,22 @@ namespace facebook::velox::exec {
 HashPartitionFunction::HashPartitionFunction(
     int numPartitions,
-    RowTypePtr inputType,
-    std::vector<ChannelIndex> keyChannels)
-    : numPartitions_{numPartitions}, keyChannels_{std::move(keyChannels)} {
-  hashers_.reserve(keyChannels_.size());
-  for (auto channel : keyChannels_) {
-    hashers_.emplace_back(
-        VectorHasher::create(inputType->childAt(channel), channel));
+    const RowTypePtr& inputType,
+    const std::vector<ChannelIndex>& keyChannels,
+    const std::vector<std::shared_ptr<const core::ConstantTypedExpr>>&
+        constExprs)
+    : numPartitions_{numPartitions} {
+  hashers_.reserve(keyChannels.size());
+  size_t constChannel{0};
+  for (const auto channel : keyChannels) {
+    if (channel != kConstantChannel) {
+      hashers_.emplace_back(
+          VectorHasher::create(inputType->childAt(channel), channel));
+    } else {
+      const auto& constExpr = constExprs[constChannel++];
+      hashers_.emplace_back(VectorHasher::create(constExpr->type(), channel));
+      hashers_.back()->precompute(constExpr->value());
+    }
   }
 }

@@ -38,8 +47,14 @@ void HashPartitionFunction::partition(
   rows_.setAll();
   hashes_.resize(size);

-  for (auto i = 0; i < keyChannels_.size(); ++i) {
-    hashers_[i]->hash(*input.childAt(keyChannels_[i]), rows_, i > 0, hashes_);
+  for (auto i = 0; i < hashers_.size(); ++i) {
+    auto& hasher = hashers_[i];
+    if (hasher->channel() != kConstantChannel) {
+      hashers_[i]->hash(
+          *input.childAt(hasher->channel()), rows_, i > 0, hashes_);
+    } else {
+      hashers_[i]->hashPrecomputed(rows_, i > 0, hashes_);
+    }
   }

   partitions.resize(size);
diff --git a/velox/exec/HashPartitionFunction.h b/velox/exec/HashPartitionFunction.h
index be2dad7591c33..8f05ea5720488 100644
--- a/velox/exec/HashPartitionFunction.h
+++ b/velox/exec/HashPartitionFunction.h
@@ -24,8 +24,10 @@ class HashPartitionFunction : public core::PartitionFunction {
  public:
   HashPartitionFunction(
       int numPartitions,
-      RowTypePtr inputType,
-      std::vector<ChannelIndex> keyChannels);
+      const RowTypePtr& inputType,
+      const std::vector<ChannelIndex>& keyChannels,
+      const std::vector<std::shared_ptr<const core::ConstantTypedExpr>>&
+          constExprs = {});

   ~HashPartitionFunction() override = default;

@@ -34,7 +36,6 @@ class HashPartitionFunction : public core::PartitionFunction {

  private:
   const int numPartitions_;
-  const std::vector<ChannelIndex> keyChannels_;
   std::vector<std::unique_ptr<VectorHasher>> hashers_;

   // Reusable memory.
diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp
index d424855bd8945..6bb116e78d192 100644
--- a/velox/exec/Operator.cpp
+++ b/velox/exec/Operator.cpp
@@ -222,12 +222,11 @@ std::string Operator::toString() const {

 std::vector<ChannelIndex> toChannels(
     const RowTypePtr& rowType,
-    const std::vector<std::shared_ptr<const core::FieldAccessTypedExpr>>&
-        fields) {
+    const std::vector<std::shared_ptr<const core::ITypedExpr>>& exprs) {
   std::vector<ChannelIndex> channels;
-  channels.reserve(fields.size());
-  for (const auto& field : fields) {
-    auto channel = exprToChannel(field.get(), rowType);
+  channels.reserve(exprs.size());
+  for (const auto& expr : exprs) {
+    auto channel = exprToChannel(expr.get(), rowType);
     channels.push_back(channel);
   }
   return channels;
diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h
index f2a1940afb0e4..66cb40fe64c7b 100644
--- a/velox/exec/Operator.h
+++ b/velox/exec/Operator.h
@@ -415,14 +415,10 @@ class Operator {
       dynamicFilters_;
 };

-constexpr ChannelIndex kConstantChannel =
-    std::numeric_limits<ChannelIndex>::max();
-
 /// Given a row type returns indices for the specified subset of columns.
 std::vector<ChannelIndex> toChannels(
     const RowTypePtr& rowType,
-    const std::vector<std::shared_ptr<const core::FieldAccessTypedExpr>>&
-        fields);
+    const std::vector<std::shared_ptr<const core::ITypedExpr>>& exprs);

 ChannelIndex exprToChannel(const core::ITypedExpr* expr, const TypePtr& type);
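The exec-side counterpart, again as an illustrative sketch rather than code
from the patch: HashPartitionFunction takes the same kConstantChannel marker
plus ConstantTypedExpr list, builds one VectorHasher per key, and pre-hashes
each constant once. The helper name and literals below are assumptions.

  #include "velox/exec/HashPartitionFunction.h"

  using namespace facebook::velox;

  // Illustrative only: hash-partition 'input' into 16 partitions by
  // (column 1, constant 42).
  void hashPartitionByColumnAndConstant(
      const RowVector& input,
      const RowTypePtr& inputType,
      std::vector<uint32_t>& partitions) {
    const std::vector<ChannelIndex> keyChannels = {1, kConstantChannel};
    const std::vector<std::shared_ptr<const core::ConstantTypedExpr>>
        constExprs = {std::make_shared<core::ConstantTypedExpr>(
            BIGINT(), variant(int64_t{42}))};

    exec::HashPartitionFunction function(
        /*numPartitions=*/16, inputType, keyChannels, constExprs);
    function.partition(input, partitions);
  }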
diff --git a/velox/exec/VectorHasher.cpp b/velox/exec/VectorHasher.cpp
index 9831c6647ee37..21240c2f3fffd 100644
--- a/velox/exec/VectorHasher.cpp
+++ b/velox/exec/VectorHasher.cpp
@@ -461,8 +461,54 @@ void VectorHasher::hash(
     bool mix,
     raw_vector<uint64_t>& result) {
   decoded_.decode(values, rows);
-  return VELOX_DYNAMIC_TYPE_DISPATCH(
-      hashValues, typeKind_, rows, mix, result.data());
+  VELOX_DYNAMIC_TYPE_DISPATCH(hashValues, typeKind_, rows, mix, result.data());
+}
+
+void VectorHasher::hashPrecomputed(
+    const SelectivityVector& rows,
+    bool mix,
+    raw_vector<uint64_t>& result) const {
+  rows.applyToSelected([&](vector_size_t row) {
+    result[row] =
+        mix ? bits::hashMix(result[row], precomputedHash_) : precomputedHash_;
+  });
+}
+
+template <TypeKind kind>
+uint64_t hashOne(const variant& value) {
+  using T = typename KindToFlatVector<kind>::HashRowType;
+  return folly::hasher<T>()(value.value<T>());
+}
+
+template <>
+uint64_t hashOne<TypeKind::VARCHAR>(const variant& value) {
+  return folly::hasher<std::string>()(value.value<TypeKind::VARCHAR>());
+}
+
+template <>
+uint64_t hashOne<TypeKind::VARBINARY>(const variant& value) {
+  return folly::hasher<std::string>()(value.value<TypeKind::VARBINARY>());
+}
+
+template <>
+uint64_t hashOne<TypeKind::TIMESTAMP>(const variant& value) {
+  return folly::hasher<Timestamp>()(value.value<TypeKind::TIMESTAMP>());
+}
+
+template <>
+uint64_t hashOne<TypeKind::DATE>(const variant& value) {
+  return folly::hasher<Date>()(value.value<TypeKind::DATE>());
+}
+
+template <>
+uint64_t hashOne<TypeKind::INTERVAL_DAY_TIME>(const variant& value) {
+  return folly::hasher<IntervalDayTime>()(
+      value.value<TypeKind::INTERVAL_DAY_TIME>());
+}
+
+void VectorHasher::precompute(const variant& value) {
+  precomputedHash_ = (not value.isNull())
+      ? VELOX_DYNAMIC_TYPE_DISPATCH(hashOne, value.kind(), value)
+      : 0UL;
+}

 void VectorHasher::analyze(
@@ -471,7 +517,7 @@ void VectorHasher::analyze(
     char** groups,
     int32_t numGroups,
     int32_t offset,
     int32_t nullByte,
     uint8_t nullMask) {
-  return VALUE_ID_TYPE_DISPATCH(
+  VALUE_ID_TYPE_DISPATCH(
       analyzeTyped, typeKind_, groups, numGroups, offset, nullByte, nullMask);
 }
diff --git a/velox/exec/VectorHasher.h b/velox/exec/VectorHasher.h
index 6f156a6c086f5..12c5c1bca4450 100644
--- a/velox/exec/VectorHasher.h
+++ b/velox/exec/VectorHasher.h
@@ -170,6 +170,18 @@ class VectorHasher {
       bool mix,
       raw_vector<uint64_t>& result);

+  // Computes a hash for 'rows' using precomputedHash_ (just like from a
+  // constant vector) and stores it in 'result'.
+  // If 'mix' is true, mixes the hash with the existing value in 'result'.
+  void hashPrecomputed(
+      const SelectivityVector& rows,
+      bool mix,
+      raw_vector<uint64_t>& result) const;
+
+  // Precomputes the hash of a given single value into precomputedHash_.
+  // Used for constant partition keys.
+  void precompute(const variant& value);
+
   // Computes a normalized key for 'rows' in 'values' and stores this
   // in 'result'. If this is not the first hasher with normalized
   // keys, updates the partially computed normalized key in
@@ -495,6 +507,9 @@ class VectorHasher {
   DecodedVector decoded_;
   raw_vector<uint64_t> cachedHashes_;

+  // Single precomputed hash for constant partition keys.
+  uint64_t precomputedHash_{0};
+
   // Members for fast map to int domain for array/normalized key.
   // Maximum integer mapping. If distinct count exceeds this,
   // array/normalized key mapping fails.
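The two new VectorHasher hooks can also be exercised directly. A minimal
sketch, assuming the caller has sized 'result' to cover rows.end(); the
helper name and the constant value are illustrative:

  #include "velox/exec/VectorHasher.h"

  using namespace facebook::velox;

  // Hash a constant BIGINT key for every selected row without materializing
  // a constant vector: precompute() hashes the value once, hashPrecomputed()
  // spreads it over 'rows', optionally mixing with hashes already in 'result'.
  void hashConstantKey(
      const SelectivityVector& rows,
      bool mix,
      raw_vector<uint64_t>& result) {
    auto hasher = exec::VectorHasher::create(BIGINT(), kConstantChannel);
    hasher->precompute(variant(int64_t{42}));
    hasher->hashPrecomputed(rows, mix, result);
  }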
diff --git a/velox/exec/tests/utils/PlanBuilder.cpp b/velox/exec/tests/utils/PlanBuilder.cpp
index 7ab8cc0b4c705..cc3b19711b97c 100644
--- a/velox/exec/tests/utils/PlanBuilder.cpp
+++ b/velox/exec/tests/utils/PlanBuilder.cpp
@@ -945,7 +945,7 @@ PlanBuilder& PlanBuilder::partitionedOutput(
       createPartitionFunctionFactory(planNode_->outputType(), keys);
   planNode_ = std::make_shared<core::PartitionedOutputNode>(
       nextPlanNodeId(),
-      fields(keys),
+      exprs(keys),
       numPartitions,
       false,
       replicateNullsAndAny,
@@ -1187,6 +1187,18 @@ PlanBuilder::fields(const std::vector<ChannelIndex>& indices) {
   return fields(planNode_->outputType(), indices);
 }

+std::vector<std::shared_ptr<const core::ITypedExpr>> PlanBuilder::exprs(
+    const std::vector<std::string>& names) {
+  auto flds = fields(planNode_->outputType(), names);
+  std::vector<std::shared_ptr<const core::ITypedExpr>> expressions;
+  expressions.reserve(flds.size());
+  for (const auto& fld : flds) {
+    expressions.emplace_back(
+        std::dynamic_pointer_cast<const core::ITypedExpr>(fld));
+  }
+  return expressions;
+}
+
 std::shared_ptr<const core::ITypedExpr> PlanBuilder::inferTypes(
     const std::shared_ptr<const core::IExpr>& untypedExpr) {
   return core::Expressions::inferTypes(
diff --git a/velox/exec/tests/utils/PlanBuilder.h b/velox/exec/tests/utils/PlanBuilder.h
index 834170050a966..9174a5639fce0 100644
--- a/velox/exec/tests/utils/PlanBuilder.h
+++ b/velox/exec/tests/utils/PlanBuilder.h
@@ -655,6 +655,9 @@ class PlanBuilder {
   std::shared_ptr<const core::FieldAccessTypedExpr> field(
       const std::string& name);

+  std::vector<std::shared_ptr<const core::ITypedExpr>> exprs(
+      const std::vector<std::string>& names);
+
   std::vector<std::shared_ptr<const core::FieldAccessTypedExpr>> fields(
       const std::vector<std::string>& names);
diff --git a/velox/vector/ComplexVector.h b/velox/vector/ComplexVector.h
index 9dda7900018a4..6736c193d3424 100644
--- a/velox/vector/ComplexVector.h
+++ b/velox/vector/ComplexVector.h
@@ -32,6 +32,9 @@ namespace facebook::velox {

 using ChannelIndex = uint32_t;

+constexpr ChannelIndex kConstantChannel =
+    std::numeric_limits<ChannelIndex>::max();
+
 class RowVector : public BaseVector {
  public:
   RowVector(
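Finally, a hypothetical test-style sketch of the PlanBuilder path (not from
the patch): partitionedOutput() now routes its keys through exprs(), so the
resulting PartitionedOutputNode carries ITypedExpr keys; 'rowType' stands for
an assumed scan schema and the overloads used below are assumptions about the
test utilities.

  #include "velox/exec/tests/utils/PlanBuilder.h"

  using namespace facebook::velox;
  using facebook::velox::exec::test::PlanBuilder;

  // Illustrative only: a plan whose output is hash-partitioned on "c0" into
  // four partitions.
  core::PlanNodePtr makePartitionedPlan(const RowTypePtr& rowType) {
    return PlanBuilder()
        .tableScan(rowType)
        .partitionedOutput({"c0"}, /*numPartitions=*/4)
        .planNode();
  }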