Skip to content

Commit

Permalink
Merge 1e90e8c into cdedad0
Browse files Browse the repository at this point in the history
  • Loading branch information
Hor911 authored Jun 6, 2024
2 parents cdedad0 + 1e90e8c commit 9692293
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 39 deletions.
11 changes: 4 additions & 7 deletions ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@ using TStatus = IGraphTransformer::TStatus;

class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
public:
TKqpPhysicalOptTransformer(TTypeAnnotationContext& typesCtx, const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx, const TKikimrConfiguration::TPtr& config)
TKqpPhysicalOptTransformer(TTypeAnnotationContext& typesCtx, const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx)
: TOptimizeTransformerBase(nullptr, NYql::NLog::EComponent::ProviderKqp, {})
, TypesCtx(typesCtx)
, KqpCtx(*kqpCtx)
, Config(config)
{
#define HNDL(name) "KqpPhysical-"#name, Hndl(&TKqpPhysicalOptTransformer::name)
AddHandler(0, &TDqSourceWrap::Match, HNDL(BuildStageWithSourceWrap));
Expand Down Expand Up @@ -430,9 +429,8 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
// TODO: Allow push to left stage for data queries.
// It is now possible as we don't use datashard transactions for reads in data queries.
bool pushLeftStage = !KqpCtx.IsDataQuery() && AllowFuseJoinInputs(node);
bool useCBO = Config->CostBasedOptimizationLevel.Get().GetOrElse(TDqSettings::TDefault::CostBasedOptimizationLevel) == 3;
TExprBase output = DqBuildJoin(node, ctx, optCtx, *getParents(), IsGlobal,
pushLeftStage, KqpCtx.Config->GetHashJoinMode(), useCBO
pushLeftStage, KqpCtx.Config->GetHashJoinMode()
);
DumpAppliedRule("BuildJoin", node.Ptr(), output.Ptr(), ctx);
return output;
Expand Down Expand Up @@ -595,13 +593,12 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
private:
TTypeAnnotationContext& TypesCtx;
const TKqpOptimizeContext& KqpCtx;
const TKikimrConfiguration::TPtr& Config;
};

TAutoPtr<IGraphTransformer> CreateKqpPhyOptTransformer(const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx,
NYql::TTypeAnnotationContext& typesCtx, const TKikimrConfiguration::TPtr& config)
NYql::TTypeAnnotationContext& typesCtx, const TKikimrConfiguration::TPtr&)
{
return THolder<IGraphTransformer>(new TKqpPhysicalOptTransformer(typesCtx, kqpCtx, config));
return THolder<IGraphTransformer>(new TKqpPhysicalOptTransformer(typesCtx, kqpCtx));
}

} // namespace NKikimr::NKqp::NOpt
25 changes: 8 additions & 17 deletions ydb/library/yql/dq/opt/dq_opt_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,13 @@ TMaybe<TJoinInputDesc> BuildDqJoin(const TCoEquiJoinTuple& joinTuple,
auto options = joinTuple.Options();
auto linkSettings = GetEquiJoinLinkSettings(options.Ref());
YQL_ENSURE(linkSettings.JoinAlgo != EJoinAlgoType::StreamLookupJoin || typeCtx.StreamLookupJoin, "Unsupported join strategy: streamlookup");

if (linkSettings.JoinAlgo == EJoinAlgoType::MapJoin) {
mode = EHashJoinMode::Map;
} else if (linkSettings.JoinAlgo == EJoinAlgoType::GraceJoin) {
mode = EHashJoinMode::GraceAndSelf;
}

bool leftAny = linkSettings.LeftHints.contains("any");
bool rightAny = linkSettings.RightHints.contains("any");

Expand Down Expand Up @@ -1119,26 +1126,10 @@ TExprNode::TPtr ReplaceJoinOnSide(TExprNode::TPtr&& input, const TTypeAnnotation

}

TExprBase DqBuildHashJoin(const TDqJoin& join, EHashJoinMode mode, bool useCBO, TExprContext& ctx, IOptimizationContext& optCtx) {
TExprBase DqBuildHashJoin(const TDqJoin& join, EHashJoinMode mode, TExprContext& ctx, IOptimizationContext& optCtx) {
const auto joinType = join.JoinType().Value();
YQL_ENSURE(joinType != "Cross"sv);

if (useCBO) {
auto joinAlgo = FromString<EJoinAlgoType>(join.JoinAlgo().StringValue());
switch (joinAlgo) {
case EJoinAlgoType::LookupJoin:
case EJoinAlgoType::MapJoin:
mode = EHashJoinMode::Map;
break;
case EJoinAlgoType::GraceJoin:
mode = EHashJoinMode::GraceAndSelf;
break;
default:
break;
}

}

const auto leftIn = join.LeftInput().Cast<TDqCnUnionAll>().Output();
const auto rightIn = join.RightInput().Cast<TDqCnUnionAll>().Output();

Expand Down
4 changes: 2 additions & 2 deletions ydb/library/yql/dq/opt/dq_opt_join.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ NNodes::TExprBase DqRewriteEquiJoin(const NNodes::TExprBase& node, EHashJoinMode
NNodes::TExprBase DqBuildPhyJoin(const NNodes::TDqJoin& join, bool pushLeftStage, TExprContext& ctx, IOptimizationContext& optCtx);

NNodes::TExprBase DqBuildJoin(const NNodes::TExprBase& node, TExprContext& ctx,
IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage, bool pushLeftStage, EHashJoinMode hashJoin = EHashJoinMode::Off, bool useCBO = false);
IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage, bool pushLeftStage, EHashJoinMode hashJoin = EHashJoinMode::Off);

NNodes::TExprBase DqBuildHashJoin(const NNodes::TDqJoin& join, EHashJoinMode mode, bool useCBO, TExprContext& ctx, IOptimizationContext& optCtx);
NNodes::TExprBase DqBuildHashJoin(const NNodes::TDqJoin& join, EHashJoinMode mode, TExprContext& ctx, IOptimizationContext& optCtx);

NNodes::TExprBase DqBuildJoinDict(const NNodes::TDqJoin& join, TExprContext& ctx);

Expand Down
21 changes: 9 additions & 12 deletions ydb/library/yql/dq/opt/dq_opt_phy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2655,7 +2655,7 @@ TMaybeNode<TDqJoin> DqFlipJoin(const TDqJoin& join, TExprContext& ctx) {


TExprBase DqBuildJoin(const TExprBase& node, TExprContext& ctx, IOptimizationContext& optCtx,
const TParentsMap& parentsMap, bool allowStageMultiUsage, bool pushLeftStage, EHashJoinMode hashJoin, bool useCBO)
const TParentsMap& parentsMap, bool allowStageMultiUsage, bool pushLeftStage, EHashJoinMode hashJoin)
{
if (!node.Maybe<TDqJoin>()) {
return node;
Expand All @@ -2666,21 +2666,18 @@ TExprBase DqBuildJoin(const TExprBase& node, TExprContext& ctx, IOptimizationCon
const bool leftIsUnionAll = join.LeftInput().Maybe<TDqCnUnionAll>().IsValid();
const bool rightIsUnionAll = join.RightInput().Maybe<TDqCnUnionAll>().IsValid();

auto joinAlgo = FromString<EJoinAlgoType>(join.JoinAlgo().StringValue());
if (joinAlgo == EJoinAlgoType::MapJoin) {
hashJoin = EHashJoinMode::Map;
} else if (joinAlgo == EJoinAlgoType::GraceJoin) {
hashJoin = EHashJoinMode::GraceAndSelf;
}

bool useHashJoin = EHashJoinMode::Off != hashJoin
&& joinType != "Cross"sv
&& leftIsUnionAll
&& rightIsUnionAll;

if (useCBO) {
auto joinAlgo = FromString<EJoinAlgoType>(join.JoinAlgo().StringValue());
if (joinAlgo == EJoinAlgoType::MapJoin || joinAlgo == EJoinAlgoType::GraceJoin) {
useHashJoin = joinType != "Cross"sv && leftIsUnionAll && rightIsUnionAll;
}
else {
useHashJoin = false;
}
}

if (DqValidateJoinInputs(join.LeftInput(), join.RightInput(), parentsMap, allowStageMultiUsage)) {
// pass
} else if (DqValidateJoinInputs(join.RightInput(), join.LeftInput(), parentsMap, allowStageMultiUsage)) {
Expand All @@ -2696,7 +2693,7 @@ TExprBase DqBuildJoin(const TExprBase& node, TExprContext& ctx, IOptimizationCon
}

if (useHashJoin) {
return DqBuildHashJoin(join, hashJoin, useCBO, ctx, optCtx);
return DqBuildHashJoin(join, hashJoin, ctx, optCtx);
}

if (joinType == "Full"sv || joinType == "Exclusion"sv) {
Expand Down
4 changes: 4 additions & 0 deletions ydb/library/yql/sql/v1/join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,10 @@ class TEquiJoin: public TJoinBase {
linkOptions = L(linkOptions, Q(Y(Q("forceSortedMerge"))));
} else if (TJoinLinkSettings::EStrategy::StreamLookup == descr.LinkSettings.Strategy) {
linkOptions = L(linkOptions, Q(Y(Q("forceStreamLookup"))));
} else if (TJoinLinkSettings::EStrategy::ForceMap == descr.LinkSettings.Strategy) {
linkOptions = L(linkOptions, Q(Y(Q("join_algo"), Q("MapJoin"))));
} else if (TJoinLinkSettings::EStrategy::ForceGrace == descr.LinkSettings.Strategy) {
linkOptions = L(linkOptions, Q(Y(Q("join_algo"), Q("GraceJoin"))));
}
if (leftAny) {
linkOptions = L(linkOptions, Q(Y(Q("left"), Q("any"))));
Expand Down
4 changes: 3 additions & 1 deletion ydb/library/yql/sql/v1/source.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,9 @@ namespace NSQLTranslationV1 {
enum class EStrategy {
Default,
SortedMerge,
StreamLookup
StreamLookup,
ForceMap,
ForceGrace
};
EStrategy Strategy = EStrategy::Default;
};
Expand Down
4 changes: 4 additions & 0 deletions ydb/library/yql/sql/v1/sql_select.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ bool TSqlSelect::JoinOp(ISource* join, const TRule_join_source::TBlock3& block,
newStrategy = TJoinLinkSettings::EStrategy::SortedMerge;
} else if (canonizedName == "streamlookup") {
newStrategy = TJoinLinkSettings::EStrategy::StreamLookup;
} else if (canonizedName == "map") {
newStrategy = TJoinLinkSettings::EStrategy::ForceMap;
} else if (canonizedName == "grace") {
newStrategy = TJoinLinkSettings::EStrategy::ForceGrace;
} else {
Ctx.Warning(hint.Pos, TIssuesIds::YQL_UNUSED_HINT) << "Unsupported join strategy: " << hint.Name;
}
Expand Down

0 comments on commit 9692293

Please sign in to comment.