Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable more combinations in join fuzzer #10676

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 56 additions & 3 deletions velox/core/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1093,10 +1093,50 @@ PlanNodePtr HashJoinNode::create(const folly::dynamic& obj, void* context) {
outputType);
}

MergeJoinNode::MergeJoinNode(
const PlanNodeId& id,
JoinType joinType,
const std::vector<FieldAccessTypedExprPtr>& leftKeys,
const std::vector<FieldAccessTypedExprPtr>& rightKeys,
TypedExprPtr filter,
PlanNodePtr left,
PlanNodePtr right,
RowTypePtr outputType)
: AbstractJoinNode(
id,
joinType,
leftKeys,
rightKeys,
std::move(filter),
std::move(left),
std::move(right),
std::move(outputType)) {
VELOX_USER_CHECK(
isSupported(joinType_),
"The join type is not supported by merge join: ",
joinTypeName(joinType_));
}

folly::dynamic MergeJoinNode::serialize() const {
return serializeBase();
}

// static
bool MergeJoinNode::isSupported(core::JoinType joinType) {
switch (joinType) {
case core::JoinType::kInner:
case core::JoinType::kLeft:
case core::JoinType::kRight:
case core::JoinType::kLeftSemiFilter:
case core::JoinType::kRightSemiFilter:
case core::JoinType::kAnti:
return true;

default:
return false;
}
}

// static
PlanNodePtr MergeJoinNode::create(const folly::dynamic& obj, void* context) {
auto sources = deserializeSources(obj, context);
Expand Down Expand Up @@ -1136,9 +1176,8 @@ NestedLoopJoinNode::NestedLoopJoinNode(
sources_({std::move(left), std::move(right)}),
outputType_(std::move(outputType)) {
VELOX_USER_CHECK(
core::isInnerJoin(joinType_) || core::isLeftJoin(joinType_) ||
core::isRightJoin(joinType_) || core::isFullJoin(joinType_),
"{} unsupported, NestedLoopJoin only supports inner and outer join",
isSupported(joinType_),
"The join type is not supported by nested loop join: ",
joinTypeName(joinType_));

auto leftType = sources_[0]->outputType();
Expand Down Expand Up @@ -1170,6 +1209,20 @@ NestedLoopJoinNode::NestedLoopJoinNode(
right,
outputType) {}

// static
bool NestedLoopJoinNode::isSupported(core::JoinType joinType) {
switch (joinType) {
case core::JoinType::kInner:
case core::JoinType::kLeft:
case core::JoinType::kRight:
case core::JoinType::kFull:
return true;

default:
return false;
}
}

void NestedLoopJoinNode::addDetails(std::stringstream& stream) const {
stream << joinTypeName(joinType_);
if (joinCondition_) {
Expand Down
23 changes: 9 additions & 14 deletions velox/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ namespace facebook::velox::core {

typedef std::string PlanNodeId;

/**
* Generic representation of InsertTable
*/
/// Generic representation of InsertTable
struct InsertTableHandle {
public:
InsertTableHandle(
Expand Down Expand Up @@ -1643,31 +1641,25 @@ class MergeJoinNode : public AbstractJoinNode {
TypedExprPtr filter,
PlanNodePtr left,
PlanNodePtr right,
RowTypePtr outputType)
: AbstractJoinNode(
id,
joinType,
leftKeys,
rightKeys,
std::move(filter),
std::move(left),
std::move(right),
std::move(outputType)) {}
RowTypePtr outputType);

std::string_view name() const override {
return "MergeJoin";
}

folly::dynamic serialize() const override;

/// If merge join supports this join type.
static bool isSupported(core::JoinType joinType);

static PlanNodePtr create(const folly::dynamic& obj, void* context);
};

/// Represents inner/outer nested loop joins. Translates to an
/// exec::NestedLoopJoinProbe and exec::NestedLoopJoinBuild. A separate pipeline
/// is produced for the build side when generating exec::Operators.
///
/// Nested loop join supports both equal and non-equal joins. Expressions
/// Nested loop join (NLJ) supports both equal and non-equal joins. Expressions
/// specified in joinCondition are evaluated on every combination of left/right
/// tuple, to emit result. Results are emitted following the same input order of
/// probe rows for inner and left joins, for each thread of execution.
Expand Down Expand Up @@ -1712,6 +1704,9 @@ class NestedLoopJoinNode : public PlanNode {

folly::dynamic serialize() const override;

/// If nested loop join supports this join type.
static bool isSupported(core::JoinType joinType);

static PlanNodePtr create(const folly::dynamic& obj, void* context);

private:
Expand Down
18 changes: 1 addition & 17 deletions velox/exec/MergeJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,27 +35,11 @@ MergeJoin::MergeJoin(
numKeys_{joinNode->leftKeys().size()},
joinNode_(joinNode) {
VELOX_USER_CHECK(
isSupported(joinNode_->joinType()),
core::MergeJoinNode::isSupported(joinNode_->joinType()),
"The join type is not supported by merge join: ",
joinTypeName(joinNode_->joinType()));
}

// static
bool MergeJoin::isSupported(core::JoinType joinType) {
switch (joinType) {
case core::JoinType::kInner:
case core::JoinType::kLeft:
case core::JoinType::kRight:
case core::JoinType::kLeftSemiFilter:
case core::JoinType::kRightSemiFilter:
case core::JoinType::kAnti:
return true;

default:
return false;
}
}

void MergeJoin::initialize() {
Operator::initialize();
VELOX_CHECK_NOT_NULL(joinNode_);
Expand Down
3 changes: 0 additions & 3 deletions velox/exec/MergeJoin.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,6 @@ class MergeJoin : public Operator {
Operator::close();
}

/// If merge join supports this join type.
static bool isSupported(core::JoinType joinType);

private:
// Sets up 'filter_' and related member variables.
void initializeFilter(
Expand Down
13 changes: 5 additions & 8 deletions velox/exec/fuzzer/JoinFuzzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/connectors/hive/PartitionIdGenerator.h"
#include "velox/exec/MergeJoin.h"
#include "velox/exec/OperatorUtils.h"
#include "velox/exec/fuzzer/FuzzerUtil.h"
#include "velox/exec/fuzzer/ReferenceQueryRunner.h"
Expand Down Expand Up @@ -148,7 +147,7 @@ class JoinFuzzer {
const std::vector<std::string>& outputColumns);

// Returns a PlanWithSplits for NestedLoopJoin with inputs from Values nodes.
// If withFilter is true, uses the equiality filter between probeKeys and
// If withFilter is true, uses the equality filter between probeKeys and
// buildKeys as the join filter. Uses empty join filter otherwise.
JoinFuzzer::PlanWithSplits makeNestedLoopJoinPlan(
core::JoinType joinType,
Expand Down Expand Up @@ -860,7 +859,7 @@ void JoinFuzzer::makeAlternativePlans(
.planNode()});

// Use OrderBy + MergeJoin
if (exec::MergeJoin::isSupported(joinNode->joinType())) {
if (core::MergeJoinNode::isSupported(joinNode->joinType())) {
auto planWithSplits = makeMergeJoinPlan(
joinType, probeKeys, buildKeys, probeInput, buildInput, outputColumns);
plans.push_back(planWithSplits);
Expand All @@ -869,8 +868,7 @@ void JoinFuzzer::makeAlternativePlans(
}

// Use NestedLoopJoin.
if (joinNode->isInnerJoin() || joinNode->isLeftJoin() ||
joinNode->isFullJoin()) {
if (core::NestedLoopJoinNode::isSupported(joinNode->joinType())) {
auto planWithSplits = makeNestedLoopJoinPlan(
joinType, probeKeys, buildKeys, probeInput, buildInput, outputColumns);
plans.push_back(planWithSplits);
Expand Down Expand Up @@ -1285,7 +1283,7 @@ void JoinFuzzer::addPlansWithTableScan(
}

// Add ungrouped MergeJoin with TableScan.
if (joinNode->isInnerJoin() || joinNode->isLeftJoin()) {
if (core::MergeJoinNode::isSupported(joinNode->joinType())) {
auto planWithSplits = makeMergeJoinPlanWithTableScan(
joinType,
probeType,
Expand All @@ -1307,8 +1305,7 @@ void JoinFuzzer::addPlansWithTableScan(
}

// Add ungrouped NestedLoopJoin with TableScan.
if (joinNode->isInnerJoin() || joinNode->isLeftJoin() ||
joinNode->isFullJoin()) {
if (core::NestedLoopJoinNode::isSupported(joinNode->joinType())) {
auto planWithSplits = makeNestedLoopJoinPlanWithTableScan(
joinType,
probeType,
Expand Down
Loading