Skip to content

Commit

Permalink
Velox side Verax support (#1)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #1

X-link: facebookincubator/velox#11931

Reviewed By: pedroerp

Differential Revision: D67571079

Pulled By: oerling

fbshipit-source-id: f817e19d98ab80a64fbd3d26d33edd3027003a93
  • Loading branch information
Orri Erling authored and facebook-github-bot committed Jan 9, 2025
1 parent 539525d commit c1b7d28
Show file tree
Hide file tree
Showing 45 changed files with 11,677 additions and 0 deletions.
67 changes: 67 additions & 0 deletions verax/ArenaCache.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <vector>
#include "velox/common/base/BitUtil.h"
#include "velox/common/memory/HashStringAllocator.h"

namespace facebook::velox::optimizer {

/// Single threaded cache in front of HashStringAllocator free
/// list. Speeds up allocation and fre of plan candidates. This is
/// about 2x faster than uncached HashStringAllocator. Overall amounts
/// to ~2% of optimization time.
class ArenaCache {
static constexpr int32_t kMaxSize = 512;
static constexpr int32_t kGranularity = 16;

public:
explicit ArenaCache(velox::HashStringAllocator& allocator)
: allocator_(allocator), allocated_(kMaxSize / kGranularity) {}

void* allocate(size_t size) {
auto sizeClass = velox::bits::roundUp(size, kGranularity) / kGranularity;
if (sizeClass < kMaxSize / kGranularity) {
if (!allocated_[sizeClass].empty()) {
void* result = allocated_[sizeClass].back();
allocated_[sizeClass].pop_back();
totalSize_ -= sizeClass;
return result;
}
}
return allocator_.allocate(sizeClass * kGranularity)->begin();
}

void free(void* ptr) {
auto header = velox::HashStringAllocator::headerOf(ptr);
int32_t sizeClass = header->size() / kGranularity;
if (sizeClass < kMaxSize / kGranularity) {
totalSize_ += sizeClass;
allocated_[sizeClass].push_back(ptr);
return;
}
allocator_.free(header);
}

private:
velox::HashStringAllocator& allocator_;
std::vector<std::vector<void*>> allocated_;
uint64_t totalSize_{0};
};

} // namespace facebook::velox::optimizer
38 changes: 38 additions & 0 deletions verax/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Copyright (c) Facebook, Inc. and its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

add_subdirectory(tests)

add_subdirectory(connectors)

add_library(
velox_verax
ToGraph.cpp
Plan.cpp
PlanObject.cpp
Schema.cpp
SchemaResolver.cpp
QueryGraph.cpp
QueryGraphContext.cpp
Filters.cpp
Cost.cpp
PlanUtils.cpp
RelationOp.cpp
ToVelox.cpp
VeloxHistory.cpp)

add_dependencies(velox_verax velox_hive_connector)

target_link_libraries(velox_verax velox_core velox_connector_metadata
velox_multifragment_plan velox_connector)
182 changes: 182 additions & 0 deletions verax/Cost.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "optimizer/Cost.h" //@manual
#include "optimizer/Plan.h" //@manual
#include "optimizer/PlanUtils.h" //@manual

namespace facebook::velox::optimizer {

using namespace facebook::velox;

// Collection of per operation costs for a target system. The base
// unit is the time to memcpy a cache line in a large memcpy on one
// core. This is ~6GB/s, so ~10ns. Other times are expressed as
// multiples of that.
struct Costs {
static float byteShuffleCost() {
return 12; // ~500MB/s
}

static float hashProbeCost(float cardinality) {
return cardinality < 10000 ? kArrayProbeCost
: cardinality < 500000 ? kSmallHashCost
: kLargeHashCost;
}

static constexpr float kKeyCompareCost =
6; // ~30 instructions to find, decode and an compare
static constexpr float kArrayProbeCost = 2; // ~10 instructions.
static constexpr float kSmallHashCost = 10; // 50 instructions
static constexpr float kLargeHashCost = 40; // 2 LLC misses
static constexpr float kColumnRowCost = 5;
static constexpr float kColumnByteCost = 0.1;

// Cost of hash function on one column.
static constexpr float kHashColumnCost = 0.5;

// Cost of getting a column from a hash table
static constexpr float kHashExtractColumnCost = 0.5;

// Minimal cost of calling a filter function, e.g. comparing two numeric
// exprss.
static constexpr float kMinimumFilterCost = 2;
};

void RelationOp::setCost(const PlanState& state) {
cost_.inputCardinality = state.cost.fanout;
}

float ColumnGroup::lookupCost(float range) const {
// Add 2 because it takes a compare and access also if hitting the
// same row. log(1) == 0, so this would other wise be zero cost.
return Costs::kKeyCompareCost * log(range + 2) / log(2);
}

float orderPrefixDistance(
RelationOpPtr input,
ColumnGroupP index,
const ExprVector& keys) {
int32_t i = 0;
float selection = 1;
for (; i < input->distribution().order.size() &&
i < index->distribution().order.size() && i < keys.size();
++i) {
if (input->distribution().order[i]->sameOrEqual(*keys[i])) {
selection *= index->distribution().order[i]->value().cardinality;
}
}
return selection;
}

void TableScan::setCost(const PlanState& input) {
RelationOp::setCost(input);
float size = byteSize(columns_);
if (!keys.empty()) {
float lookupRange(index->distribution().cardinality);
float orderSelectivity = orderPrefixDistance(this->input(), index, keys);
auto distance = lookupRange / std::max<float>(1, orderSelectivity);
float batchSize = std::min<float>(cost_.inputCardinality, 10000);
if (orderSelectivity == 1) {
// The data does not come in key order.
float batchCost = index->lookupCost(lookupRange) +
index->lookupCost(lookupRange / batchSize) *
std::max<float>(1, batchSize);
cost_.unitCost = batchCost / batchSize;
} else {
float batchCost = index->lookupCost(lookupRange) +
index->lookupCost(distance) * std::max<float>(1, batchSize);
cost_.unitCost = batchCost / batchSize;
}
return;
} else {
cost_.fanout =
index->distribution().cardinality * baseTable->filterSelectivity;
}
auto numColumns = columns_.size();
auto rowCost = numColumns * Costs::kColumnRowCost +
std::max<float>(0, size - 8 * numColumns) * Costs::kColumnByteCost;
cost_.unitCost += cost_.fanout * rowCost;
}

void Aggregation::setCost(const PlanState& input) {
RelationOp::setCost(input);
float cardinality = 1;
for (auto key : grouping) {
cardinality *= key->value().cardinality;
}
// The estimated output is input minus the times an input is a
// duplicate of a key already in the input. The cardinality of the
// result is (d - d * 1 - (1 / d))^n. where d is the number of
// potentially distinct keys and n is the number of elements in the
// input. This approaches d as n goes to infinity. The chance of one in d
// being unique after n values is 1 - (1/d)^n.
auto nOut = cardinality -
cardinality * pow(1.0 - (1.0 / cardinality), cost_.inputCardinality);
cost_.fanout = nOut / cost_.inputCardinality;
cost_.unitCost = grouping.size() * Costs::hashProbeCost(nOut);
float rowBytes = byteSize(grouping) + byteSize(aggregates);
cost_.totalBytes = nOut * rowBytes;
}

template <typename V>
std::pair<float, float> shuffleCostV(const V& columns) {
float size = byteSize(columns);
return {size * Costs::byteShuffleCost(), size};
}

float shuffleCost(const ColumnVector& columns) {
return shuffleCostV(columns).second;
}

float shuffleCost(const ExprVector& columns) {
return shuffleCostV(columns).second;
}

void Repartition::setCost(const PlanState& input) {
RelationOp::setCost(input);
auto pair = shuffleCostV(columns_);
cost_.unitCost = pair.second;
cost_.totalBytes = cost_.inputCardinality * pair.first;
}

void HashBuild::setCost(const PlanState& input) {
RelationOp::setCost(input);
cost_.unitCost = keys.size() * Costs::kHashColumnCost +
Costs::hashProbeCost(cost_.inputCardinality) +
this->input()->columns().size() * Costs::kHashExtractColumnCost * 2;
cost_.totalBytes =
cost_.inputCardinality * byteSize(this->input()->columns());
}

void Join::setCost(const PlanState& input) {
RelationOp::setCost(input);
float buildSize = right->cost().inputCardinality;
auto rowCost =
right->input()->columns().size() * Costs::kHashExtractColumnCost;
cost_.unitCost = Costs::hashProbeCost(buildSize) + cost_.fanout * rowCost +
leftKeys.size() * Costs::kHashColumnCost;
}

void Filter::setCost(const PlanState& /*input*/) {
cost_.unitCost = Costs::kMinimumFilterCost * exprs_.size();
// We assume each filter selects 4/5. Small effect makes it so
// join and scan selectivities that are better known have more
// influence on plan cardinality. To be filled in from history.
cost_.fanout = pow(0.8, exprs_.size());
}

} // namespace facebook::velox::optimizer
74 changes: 74 additions & 0 deletions verax/Cost.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "optimizer/RelationOp.h" //@manual

namespace facebook::velox::optimizer {

/// Interface to historical query cost and cardinality
/// information. There is one long lived instance per
/// process. Public functions are thread safe since multiple
/// concurrent Optimizations may access and update the same History.
class History {
public:
virtual ~History() = default;

/// Returns a historical cost for 'op' or nullopt ir if no data. Matches the
/// full 'op' tree to history. The exactness of the match depends on the
/// History implementation. Implementations may extrapolate from approximate
/// matches. A Cost from this will be used over a non-historical cost if
/// available. Not const since some extrapolated data can be kept.
virtual std::optional<Cost> findCost(RelationOp& op) = 0;

/// Records that the cost and cardinality of 'op' was 'cost' as observed from
/// execution.
virtual void recordCost(const RelationOp& op, Cost cost) = 0;

/// Sets 'filterSelectivity' of 'baseTable' from historical data. Considers
/// filters only and does not return a cost since the cost depends on the
/// columns extracted. This is used first for coming up with join orders. The
/// plan candidates are then made and findCost() is used to access historical
/// cost and plan cardinality.
virtual bool setLeafSelectivity(BaseTable& baseTable) = 0;

virtual void recordLeafSelectivity(
const std::string& handle,
float selectivity,
bool overwrite = true) {
std::lock_guard<std::mutex> l(mutex_);
if (!overwrite &&
leafSelectivities_.find(handle) != leafSelectivities_.end()) {
return;
}
leafSelectivities_[handle] = selectivity;
}

protected:
// serializes access to all data members.
std::mutex mutex_;

/// Memo for selectivity keyed on ConnectorTableHandle::toString().
/// Values between 0 and 1.
std::unordered_map<std::string, float> leafSelectivities_;
};

float shuffleCost(const ColumnVector& columns);

float shuffleCost(const ExprVector& columns);

} // namespace facebook::velox::optimizer
29 changes: 29 additions & 0 deletions verax/Filters.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "optimizer/Plan.h" //@manual
#include "optimizer/PlanUtils.h" //@manual
#include "optimizer/QueryGraph.h" //@manual
#include "velox/common/base/SimdUtil.h"
#include "velox/common/base/SuccinctPrinter.h"

namespace facebook::velox::optimizer {

Cost filterCost(CPSpan<Expr> conjuncts) {
return Cost();
}

} // namespace facebook::velox::optimizer
Loading

0 comments on commit c1b7d28

Please sign in to comment.