Skip to content

Commit

Permalink
Refactored join conditions in CBO (ydb-platform#10366)
Browse files Browse the repository at this point in the history
  • Loading branch information
pavelvelikhov authored Oct 14, 2024
1 parent 2baa7c5 commit 5a2282b
Show file tree
Hide file tree
Showing 19 changed files with 260 additions and 243 deletions.
28 changes: 13 additions & 15 deletions ydb/core/kqp/opt/logical/kqp_opt_cbo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ TMaybeNode<TKqlKeyInc> GetRightTableKeyPrefix(const TKqlKeyRange& range) {
/**
* KQP specific rule to check if a LookupJoin is applicable
*/
bool IsLookupJoinApplicableDetailed(const std::shared_ptr<NYql::TRelOptimizerNode>& node, const TVector<TString>& joinColumns, const TKqpProviderContext& ctx) {
bool IsLookupJoinApplicableDetailed(const std::shared_ptr<NYql::TRelOptimizerNode>& node, const TVector<TJoinColumn>& joinColumns, const TKqpProviderContext& ctx) {

auto rel = std::static_pointer_cast<TKqpRelOptimizerNode>(node);
auto expr = TExprBase(rel->Node);
Expand All @@ -45,7 +45,7 @@ bool IsLookupJoinApplicableDetailed(const std::shared_ptr<NYql::TRelOptimizerNod
return false;
}

if (find_if(joinColumns.begin(), joinColumns.end(), [&] (const TString& s) { return node->Stats->KeyColumns->Data[0] == s;}) != joinColumns.end()) {
if (std::find_if(joinColumns.begin(), joinColumns.end(), [&] (const TJoinColumn& c) { return node->Stats->KeyColumns->Data[0] == c.AttributeName;}) != joinColumns.end()) {
return true;
}

Expand Down Expand Up @@ -97,8 +97,8 @@ bool IsLookupJoinApplicableDetailed(const std::shared_ptr<NYql::TRelOptimizerNod
return false;
}

if (prefixSize < node->Stats->KeyColumns->Data.size() && (find_if(joinColumns.begin(), joinColumns.end(), [&] (const TString& s) {
return node->Stats->KeyColumns->Data[prefixSize] == s;
if (prefixSize < node->Stats->KeyColumns->Data.size() && (std::find_if(joinColumns.begin(), joinColumns.end(), [&] (const TJoinColumn& c) {
return node->Stats->KeyColumns->Data[prefixSize] == c.AttributeName;
}) == joinColumns.end())){
return false;
}
Expand All @@ -108,12 +108,11 @@ bool IsLookupJoinApplicableDetailed(const std::shared_ptr<NYql::TRelOptimizerNod

bool IsLookupJoinApplicable(std::shared_ptr<IBaseOptimizerNode> left,
std::shared_ptr<IBaseOptimizerNode> right,
const std::set<std::pair<TJoinColumn, TJoinColumn>>& joinConditions,
const TVector<TString>& leftJoinKeys,
const TVector<TString>& rightJoinKeys,
const TVector<TJoinColumn>& leftJoinKeys,
const TVector<TJoinColumn>& rightJoinKeys,
TKqpProviderContext& ctx
) {
Y_UNUSED(left, joinConditions, leftJoinKeys);
Y_UNUSED(left, leftJoinKeys);

if (!(right->Stats->StorageType == EStorageType::RowStorage)) {
return false;
Expand All @@ -130,7 +129,7 @@ bool IsLookupJoinApplicable(std::shared_ptr<IBaseOptimizerNode> left,
}

for (auto rightCol : rightJoinKeys) {
if (std::find(rightStats->KeyColumns->Data.begin(), rightStats->KeyColumns->Data.end(), rightCol) == rightStats->KeyColumns->Data.end()) {
if (find(rightStats->KeyColumns->Data.begin(), rightStats->KeyColumns->Data.end(), rightCol.AttributeName) == rightStats->KeyColumns->Data.end()) {
return false;
}
}
Expand All @@ -142,18 +141,17 @@ bool IsLookupJoinApplicable(std::shared_ptr<IBaseOptimizerNode> left,

bool TKqpProviderContext::IsJoinApplicable(const std::shared_ptr<IBaseOptimizerNode>& left,
const std::shared_ptr<IBaseOptimizerNode>& right,
const std::set<std::pair<NDq::TJoinColumn, NDq::TJoinColumn>>& joinConditions,
const TVector<TString>& leftJoinKeys,
const TVector<TString>& rightJoinKeys,
const TVector<TJoinColumn>& leftJoinKeys,
const TVector<TJoinColumn>& rightJoinKeys,
EJoinAlgoType joinAlgo,
EJoinKind joinKind) {
EJoinKind joinKind) {

switch( joinAlgo ) {
case EJoinAlgoType::LookupJoin:
if ((OptLevel != 3) && (left->Stats->Nrows > 1000)) {
return false;
}
return IsLookupJoinApplicable(left, right, joinConditions, leftJoinKeys, rightJoinKeys, *this);
return IsLookupJoinApplicable(left, right, leftJoinKeys, rightJoinKeys, *this);

case EJoinAlgoType::LookupJoinReverse:
if (joinKind != EJoinKind::LeftSemi) {
Expand All @@ -162,7 +160,7 @@ bool TKqpProviderContext::IsJoinApplicable(const std::shared_ptr<IBaseOptimizerN
if ((OptLevel != 3) && (right->Stats->Nrows > 1000)) {
return false;
}
return IsLookupJoinApplicable(right, left, joinConditions, rightJoinKeys, leftJoinKeys, *this);
return IsLookupJoinApplicable(right, left, rightJoinKeys, leftJoinKeys, *this);

case EJoinAlgoType::MapJoin:
return joinKind != EJoinKind::OuterJoin && joinKind != EJoinKind::Exclusion && right->Stats->ByteSize < 1e6;
Expand Down
3 changes: 1 addition & 2 deletions ydb/core/kqp/opt/logical/kqp_opt_cbo.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ struct TKqpProviderContext : public NYql::TBaseProviderContext {

virtual bool IsJoinApplicable(const std::shared_ptr<NYql::IBaseOptimizerNode>& left,
const std::shared_ptr<NYql::IBaseOptimizerNode>& right,
const std::set<std::pair<NYql::NDq::TJoinColumn, NYql::NDq::TJoinColumn>>& joinConditions,
const TVector<TString>& leftJoinKeys, const TVector<TString>& rightJoinKeys,
const TVector<NYql::NDq::TJoinColumn>& leftJoinKeys, const TVector<NYql::NDq::TJoinColumn>& rightJoinKeys,
NYql::EJoinAlgoType joinAlgo, NYql::EJoinKind joinKind) override;

virtual double ComputeJoinCost(const NYql::TOptimizerStatistics& leftStats, const NYql::TOptimizerStatistics& rightStats, const double outputRows, const double outputByteSize, NYql::EJoinAlgoType joinAlgo) const override;
Expand Down
58 changes: 18 additions & 40 deletions ydb/library/yql/core/cbo/cbo_optimizer_new.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ void TRelOptimizerNode::Print(std::stringstream& stream, int ntabs) {
TJoinOptimizerNode::TJoinOptimizerNode(
const std::shared_ptr<IBaseOptimizerNode>& left,
const std::shared_ptr<IBaseOptimizerNode>& right,
const std::set<std::pair<TJoinColumn, TJoinColumn>>& joinConditions,
TVector<TJoinColumn> leftKeys,
TVector<TJoinColumn> rightKeys,
const EJoinKind joinType,
const EJoinAlgoType joinAlgo,
bool leftAny,
Expand All @@ -86,18 +87,14 @@ TJoinOptimizerNode::TJoinOptimizerNode(
) : IBaseOptimizerNode(JoinNodeType)
, LeftArg(left)
, RightArg(right)
, JoinConditions(joinConditions)
, LeftJoinKeys(leftKeys)
, RightJoinKeys(rightKeys)
, JoinType(joinType)
, JoinAlgo(joinAlgo)
, LeftAny(leftAny)
, RightAny(rightAny)
, IsReorderable(!nonReorderable)
{
for (const auto& [l,r] : joinConditions ) {
LeftJoinKeys.push_back(l.AttributeName);
RightJoinKeys.push_back(r.AttributeName);
}
}
{}

TVector<TString> TJoinOptimizerNode::Labels() {
auto res = LeftArg->Labels();
Expand All @@ -120,10 +117,10 @@ void TJoinOptimizerNode::Print(std::stringstream& stream, int ntabs) {
}
stream << ") ";

for (auto c : JoinConditions){
stream << c.first.RelName << "." << c.first.AttributeName
<< "=" << c.second.RelName << "."
<< c.second.AttributeName << ",";
for (size_t i=0; i<LeftJoinKeys.size(); i++){
stream << LeftJoinKeys[i].RelName << "." << LeftJoinKeys[i].AttributeName
<< "=" << RightJoinKeys[i].RelName << "."
<< RightJoinKeys[i].AttributeName << ",";
}
stream << "\n";

Expand All @@ -138,13 +135,14 @@ void TJoinOptimizerNode::Print(std::stringstream& stream, int ntabs) {
RightArg->Print(stream, ntabs+1);
}

bool IsPKJoin(const TOptimizerStatistics& stats, const TVector<TString>& joinKeys) {
bool IsPKJoin(const TOptimizerStatistics& stats, const TVector<TJoinColumn>& joinKeys) {
if (!stats.KeyColumns) {
return false;
}

for(size_t i = 0; i < stats.KeyColumns->Data.size(); i++){
if (std::find(joinKeys.begin(), joinKeys.end(), stats.KeyColumns->Data[i]) == joinKeys.end()) {
if (std::find_if(joinKeys.begin(), joinKeys.end(),
[&] (const TJoinColumn& c) { return c.AttributeName == stats.KeyColumns->Data[i];}) == joinKeys.end()) {
return false;
}
}
Expand All @@ -153,15 +151,13 @@ bool IsPKJoin(const TOptimizerStatistics& stats, const TVector<TString>& joinKey

bool TBaseProviderContext::IsJoinApplicable(const std::shared_ptr<IBaseOptimizerNode>& left,
const std::shared_ptr<IBaseOptimizerNode>& right,
const std::set<std::pair<NDq::TJoinColumn, NDq::TJoinColumn>>& joinConditions,
const TVector<TString>& leftJoinKeys,
const TVector<TString>& rightJoinKeys,
const TVector<TJoinColumn>& leftJoinKeys,
const TVector<TJoinColumn>& rightJoinKeys,
EJoinAlgoType joinAlgo,
EJoinKind joinKind) {

Y_UNUSED(left);
Y_UNUSED(right);
Y_UNUSED(joinConditions);
Y_UNUSED(leftJoinKeys);
Y_UNUSED(rightJoinKeys);
Y_UNUSED(joinKind);
Expand All @@ -182,30 +178,12 @@ double TBaseProviderContext::ComputeJoinCost(const TOptimizerStatistics& leftSta
*
* The build is on the right side, so we make the build side a bit more expensive than the probe
*/
TOptimizerStatistics TBaseProviderContext::ComputeJoinStats(
const TOptimizerStatistics& leftStats,
const TOptimizerStatistics& rightStats,
const std::set<std::pair<NDq::TJoinColumn, NDq::TJoinColumn>>& joinConditions,
EJoinAlgoType joinAlgo,
EJoinKind joinKind,
TCardinalityHints::TCardinalityHint* maybeHint) const
{
TVector<TString> leftJoinKeys;
TVector<TString> rightJoinKeys;

for (auto c : joinConditions) {
leftJoinKeys.emplace_back(c.first.AttributeName);
rightJoinKeys.emplace_back(c.second.AttributeName);
}

return ComputeJoinStats(leftStats, rightStats, leftJoinKeys, rightJoinKeys, joinAlgo, joinKind, maybeHint);
}

TOptimizerStatistics TBaseProviderContext::ComputeJoinStats(
const TOptimizerStatistics& leftStats,
const TOptimizerStatistics& rightStats,
const TVector<TString>& leftJoinKeys,
const TVector<TString>& rightJoinKeys,
const TVector<TJoinColumn>& leftJoinKeys,
const TVector<TJoinColumn>& rightJoinKeys,
EJoinAlgoType joinAlgo,
EJoinKind joinKind,
TCardinalityHints::TCardinalityHint* maybeHint) const
Expand Down Expand Up @@ -265,9 +243,9 @@ TOptimizerStatistics TBaseProviderContext::ComputeJoinStats(
std::optional<double> lhsUniqueVals;
std::optional<double> rhsUniqueVals;
if (leftStats.ColumnStatistics && rightStats.ColumnStatistics && !leftJoinKeys.empty() && !rightJoinKeys.empty()) {
auto lhs = leftJoinKeys[0];
auto lhs = leftJoinKeys[0].AttributeName;
lhsUniqueVals = leftStats.ColumnStatistics->Data[lhs].NumUniqueVals;
auto rhs = rightJoinKeys[0];
auto rhs = rightJoinKeys[0].AttributeName;
rightStats.ColumnStatistics->Data[rhs];
rhsUniqueVals = leftStats.ColumnStatistics->Data[lhs].NumUniqueVals;
}
Expand Down
49 changes: 16 additions & 33 deletions ydb/library/yql/core/cbo/cbo_optimizer_new.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,27 +201,18 @@ struct IProviderContext {
virtual TOptimizerStatistics ComputeJoinStats(
const TOptimizerStatistics& leftStats,
const TOptimizerStatistics& rightStats,
const std::set<std::pair<NDq::TJoinColumn, NDq::TJoinColumn>>& joinConditions,
EJoinAlgoType joinAlgo,
EJoinKind joinKind,
TCardinalityHints::TCardinalityHint* maybeHint = nullptr) const = 0;

virtual TOptimizerStatistics ComputeJoinStats(
const TOptimizerStatistics& leftStats,
const TOptimizerStatistics& rightStats,
const TVector<TString>& leftJoinKeys,
const TVector<TString>& rightJoinKeys,
const TVector<NDq::TJoinColumn>& leftJoinKeys,
const TVector<NDq::TJoinColumn>& rightJoinKeys,
EJoinAlgoType joinAlgo,
EJoinKind joinKind,
TCardinalityHints::TCardinalityHint* maybeHint = nullptr) const = 0;

virtual bool IsJoinApplicable(const std::shared_ptr<IBaseOptimizerNode>& left,
const std::shared_ptr<IBaseOptimizerNode>& right,
const std::set<std::pair<NDq::TJoinColumn, NDq::TJoinColumn>>& joinConditions,
const TVector<TString>& leftJoinKeys,
const TVector<TString>& rightJoinKeys,
const TVector<NDq::TJoinColumn>& leftJoinKeys,
const TVector<NDq::TJoinColumn>& rightJoinKeys,
EJoinAlgoType joinAlgo,
EJoinKind joinKind) = 0;
EJoinKind joinKin) = 0;
};

/**
Expand All @@ -233,27 +224,19 @@ struct TBaseProviderContext : public IProviderContext {

double ComputeJoinCost(const TOptimizerStatistics& leftStats, const TOptimizerStatistics& rightStats, const double outputRows, const double outputByteSize, EJoinAlgoType joinAlgo) const override;

bool IsJoinApplicable(const std::shared_ptr<IBaseOptimizerNode>& left,
const std::shared_ptr<IBaseOptimizerNode>& right,
const std::set<std::pair<NDq::TJoinColumn, NDq::TJoinColumn>>& joinConditions,
const TVector<TString>& leftJoinKeys,
const TVector<TString>& rightJoinKeys,
bool IsJoinApplicable(
const std::shared_ptr<IBaseOptimizerNode>& leftStats,
const std::shared_ptr<IBaseOptimizerNode>& rightStats,
const TVector<NDq::TJoinColumn>& leftJoinKeys,
const TVector<NDq::TJoinColumn>& rightJoinKeys,
EJoinAlgoType joinAlgo,
EJoinKind joinKind) override;

virtual TOptimizerStatistics ComputeJoinStats(
const TOptimizerStatistics& leftStats,
const TOptimizerStatistics& rightStats,
const TVector<TString>& leftJoinKeys,
const TVector<TString>& rightJoinKeys,
EJoinAlgoType joinAlgo,
EJoinKind joinKind,
TCardinalityHints::TCardinalityHint* maybeHint = nullptr) const override;

virtual TOptimizerStatistics ComputeJoinStats(
const TOptimizerStatistics& leftStats,
const TOptimizerStatistics& rightStats,
const std::set<std::pair<NDq::TJoinColumn, NDq::TJoinColumn>>& joinConditions,
const TVector<NDq::TJoinColumn>& leftJoinKeys,
const TVector<NDq::TJoinColumn>& rightJoinKeys,
EJoinAlgoType joinAlgo,
EJoinKind joinKind,
TCardinalityHints::TCardinalityHint* maybeHint = nullptr) const override;
Expand Down Expand Up @@ -290,9 +273,8 @@ struct TRelOptimizerNode : public IBaseOptimizerNode {
struct TJoinOptimizerNode : public IBaseOptimizerNode {
std::shared_ptr<IBaseOptimizerNode> LeftArg;
std::shared_ptr<IBaseOptimizerNode> RightArg;
const std::set<std::pair<NDq::TJoinColumn, NDq::TJoinColumn>> JoinConditions;
TVector<TString> LeftJoinKeys;
TVector<TString> RightJoinKeys;
TVector<NDq::TJoinColumn> LeftJoinKeys;
TVector<NDq::TJoinColumn> RightJoinKeys;
EJoinKind JoinType;
EJoinAlgoType JoinAlgo;
/////////////////// 'ANY' flag means leaving only one row from the join side.
Expand All @@ -303,7 +285,8 @@ struct TJoinOptimizerNode : public IBaseOptimizerNode {

TJoinOptimizerNode(const std::shared_ptr<IBaseOptimizerNode>& left,
const std::shared_ptr<IBaseOptimizerNode>& right,
const std::set<std::pair<NDq::TJoinColumn, NDq::TJoinColumn>>& joinConditions,
TVector<NDq::TJoinColumn> leftKeys,
TVector<NDq::TJoinColumn> rightKeys,
const EJoinKind joinType,
const EJoinAlgoType joinAlgo,
bool leftAny,
Expand Down
9 changes: 7 additions & 2 deletions ydb/library/yql/core/yql_cost_function.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,14 @@ namespace NDq {
struct TJoinColumn {
TString RelName;
TString AttributeName;
TString AttributeNameWithAliases;
ui32 EquivalenceClass = 0;
bool IsConstant = false;

TJoinColumn(TString relName, TString attributeName) : RelName(relName),
AttributeName(std::move(attributeName)) {}
TJoinColumn(TString relName, TString attributeName) :
RelName(relName),
AttributeName(attributeName),
AttributeNameWithAliases(attributeName) {}

bool operator == (const TJoinColumn& other) const {
return RelName == other.RelName && AttributeName == other.AttributeName;
Expand Down
31 changes: 14 additions & 17 deletions ydb/library/yql/dq/opt/dq_cbo_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,14 @@ Y_UNIT_TEST(JoinSearch2Rels) {
auto rel2 = std::make_shared<TRelOptimizerNode>("b",
std::make_shared<TOptimizerStatistics>(BaseTable, 1000000, 1, 0, 9000009));

std::set<std::pair<NDq::TJoinColumn, NDq::TJoinColumn>> joinConditions;
joinConditions.insert({
NDq::TJoinColumn("a", "1"),
NDq::TJoinColumn("b", "1")
});
TVector<NDq::TJoinColumn> leftKeys = {NDq::TJoinColumn("a", "1")};
TVector<NDq::TJoinColumn> rightKeys ={NDq::TJoinColumn("b", "1")};

auto op = std::make_shared<TJoinOptimizerNode>(
std::static_pointer_cast<IBaseOptimizerNode>(rel1),
std::static_pointer_cast<IBaseOptimizerNode>(rel2),
joinConditions,
leftKeys,
rightKeys,
InnerJoin,
EJoinAlgoType::GraceJoin,
true,
Expand Down Expand Up @@ -86,30 +85,28 @@ Y_UNIT_TEST(JoinSearch3Rels) {
auto rel3 = std::make_shared<TRelOptimizerNode>("c",
std::make_shared<TOptimizerStatistics>(BaseTable, 10000, 1, 0, 9009));

std::set<std::pair<NDq::TJoinColumn, NDq::TJoinColumn>> joinConditions;
joinConditions.insert({
NDq::TJoinColumn("a", "1"),
NDq::TJoinColumn("b", "1")
});
TVector<NDq::TJoinColumn> leftKeys = {NDq::TJoinColumn("a", "1")};
TVector<NDq::TJoinColumn> rightKeys ={NDq::TJoinColumn("b", "1")};

auto op1 = std::make_shared<TJoinOptimizerNode>(
std::static_pointer_cast<IBaseOptimizerNode>(rel1),
std::static_pointer_cast<IBaseOptimizerNode>(rel2),
joinConditions,
leftKeys,
rightKeys,
InnerJoin,
EJoinAlgoType::GraceJoin,
false,
false
);

joinConditions.insert({
NDq::TJoinColumn("a", "1"),
NDq::TJoinColumn("c", "1")
});
leftKeys.push_back(NDq::TJoinColumn("a", "1"));
rightKeys.push_back(NDq::TJoinColumn("c", "1"));

auto op2 = std::make_shared<TJoinOptimizerNode>(
std::static_pointer_cast<IBaseOptimizerNode>(op1),
std::static_pointer_cast<IBaseOptimizerNode>(rel3),
joinConditions,
leftKeys,
rightKeys,
InnerJoin,
EJoinAlgoType::GraceJoin,
true,
Expand Down
Loading

0 comments on commit 5a2282b

Please sign in to comment.