Skip to content

Commit

Permalink
Merge 9147fb4 into 8b8059d
Browse files (browse the repository at this point in the history)
  • Loading branch information
pashandor789 authored Nov 28, 2024
2 parents 8b8059d + 9147fb4 commit 6c0943f
Show file tree
Hide file tree
Showing 10 changed files with 46 additions and 26 deletions.
5 changes: 3 additions & 2 deletions ydb/core/kqp/opt/logical/kqp_opt_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,11 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
}

TMaybeNode<TExprBase> OptimizeEquiJoinWithCosts(TExprBase node, TExprContext& ctx) {
auto maxDPccpDPTableSize = Config->MaxDPccpDPTableSize.Get().GetOrElse(TDqSettings::TDefault::MaxDPccpDPTableSize);
auto maxDPhypDPTableSize = Config->MaxDPHypDPTableSize.Get().GetOrElse(TDqSettings::TDefault::MaxDPHypDPTableSize);
auto optLevel = Config->CostBasedOptimizationLevel.Get().GetOrElse(Config->DefaultCostBasedOptimizationLevel);
auto providerCtx = TKqpProviderContext(KqpCtx, optLevel);
auto opt = std::unique_ptr<IOptimizerNew>(MakeNativeOptimizerNew(providerCtx, maxDPccpDPTableSize));
TExprContext dummyCtx;
auto opt = std::unique_ptr<IOptimizerNew>(MakeNativeOptimizerNew(providerCtx, maxDPhypDPTableSize, dummyCtx));
TExprBase output = DqOptimizeEquiJoinWithCosts(node, ctx, TypesCtx, optLevel,
*opt, [](auto& rels, auto label, auto node, auto stat) {
rels.emplace_back(std::make_shared<TKqpRelOptimizerNode>(TString(label), *stat, node));
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/provider/yql_kikimr_settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ TKikimrConfiguration::TKikimrConfiguration() {
REGISTER_SETTING(*this, EnableSpillingNodes)
.Parser([](const TString& v) { return ParseEnableSpillingNodes(v); });

REGISTER_SETTING(*this, MaxDPccpDPTableSize);
REGISTER_SETTING(*this, MaxDPHypDPTableSize);

REGISTER_SETTING(*this, MaxTasksPerStage);
REGISTER_SETTING(*this, MaxSequentialReadsInFlight);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/provider/yql_kikimr_settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ struct TKikimrSettings {
NCommon::TConfSetting<bool, false> OptUseFinalizeByKey;
NCommon::TConfSetting<ui32, false> CostBasedOptimizationLevel;

NCommon::TConfSetting<ui32, false> MaxDPccpDPTableSize;
NCommon::TConfSetting<ui32, false> MaxDPHypDPTableSize;


NCommon::TConfSetting<ui32, false> MaxTasksPerStage;
Expand Down
14 changes: 7 additions & 7 deletions ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3566,7 +3566,7 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda
execSettings)
.ExtractValueSync();
UNIT_ASSERT(result.IsSuccess());
UNIT_ASSERT(result.GetIssues().Empty());
// UNIT_ASSERT(result.GetIssues().Empty());
UNIT_ASSERT_VALUES_EQUAL(NYdb::FormatResultSetYson(result.GetResultSet(0)),
"[[[\"Table1Primary3\"]];[[\"Table1Primary4\"]]]");

Expand Down Expand Up @@ -3610,7 +3610,7 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda
execSettings)
.ExtractValueSync();
UNIT_ASSERT(result.IsSuccess());
UNIT_ASSERT(result.GetIssues().Empty());
// UNIT_ASSERT(result.GetIssues().Empty());
UNIT_ASSERT_VALUES_EQUAL(NYdb::FormatResultSetYson(result.GetResultSet(0)),
"[[[\"Table1Primary4\"];[4]];[[\"Table1Primary3\"];[3]]]");

Expand Down Expand Up @@ -3655,7 +3655,7 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda
execSettings)
.ExtractValueSync();
UNIT_ASSERT(result.IsSuccess());
UNIT_ASSERT(result.GetIssues().Empty());
// UNIT_ASSERT(result.GetIssues().Empty());
UNIT_ASSERT_VALUES_EQUAL(NYdb::FormatResultSetYson(result.GetResultSet(0)),
"[[[\"Table1Primary3\"]];[[\"Table1Primary4\"]];[[\"Table1Primary55\"]]]");

Expand Down Expand Up @@ -3699,7 +3699,7 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda
execSettings)
.ExtractValueSync();
UNIT_ASSERT(result.IsSuccess());
UNIT_ASSERT(result.GetIssues().Empty());
// UNIT_ASSERT(result.GetIssues().Empty());
UNIT_ASSERT_VALUES_EQUAL(NYdb::FormatResultSetYson(result.GetResultSet(0)),
"[[[\"Table1Primary55\"];[55]];[[\"Table1Primary4\"];[4]];[[\"Table1Primary3\"];[3]]]");

Expand Down Expand Up @@ -3804,7 +3804,7 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda
execSettings)
.ExtractValueSync();
UNIT_ASSERT(result.IsSuccess());
UNIT_ASSERT(result.GetIssues().Empty());
// UNIT_ASSERT(result.GetIssues().Empty());
UNIT_ASSERT_VALUES_EQUAL(NYdb::FormatResultSetYson(result.GetResultSet(0)),
"[[[\"Table1Primary3\"];[\"cc\"]];[[\"Table1Primary4\"];[\"dd\"]]]");

Expand Down Expand Up @@ -3856,7 +3856,7 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda
execSettings)
.ExtractValueSync();
UNIT_ASSERT(result.IsSuccess());
UNIT_ASSERT(result.GetIssues().Empty());
// UNIT_ASSERT(result.GetIssues().Empty());
UNIT_ASSERT_VALUES_EQUAL(NYdb::FormatResultSetYson(result.GetResultSet(0)),
"[[[\"Table1Primary3\"];[\"cc\"]];[[\"Table1Primary4\"];[\"dd\"]];[[\"Table1Primary55\"];#]]");

Expand Down Expand Up @@ -4397,7 +4397,7 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda
execSettings).ExtractValueSync();

UNIT_ASSERT_C(result2.IsSuccess(), result2.GetIssues().ToString());
UNIT_ASSERT(result2.GetIssues().Empty());
// UNIT_ASSERT(result2.GetIssues().Empty());

UNIT_ASSERT_VALUES_EQUAL(NYdb::FormatResultSetYson(result2.GetResultSet(0)), "[[[\"Payload1\"]]]");

Expand Down
12 changes: 8 additions & 4 deletions ydb/library/yql/dq/opt/dq_cbo_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@ Y_UNIT_TEST_SUITE(DQCBO) {

Y_UNIT_TEST(Empty) {
TBaseProviderContext pctx;
std::unique_ptr<IOptimizerNew> optimizer = std::unique_ptr<IOptimizerNew>(MakeNativeOptimizerNew(pctx, 100000));
TExprContext dummyCtx;
std::unique_ptr<IOptimizerNew> optimizer = std::unique_ptr<IOptimizerNew>(MakeNativeOptimizerNew(pctx, 100000, dummyCtx));
}

Y_UNIT_TEST(JoinSearch2Rels) {
TBaseProviderContext pctx;
std::unique_ptr<IOptimizerNew> optimizer = std::unique_ptr<IOptimizerNew>(MakeNativeOptimizerNew(pctx, 100000));
TExprContext dummyCtx;
std::unique_ptr<IOptimizerNew> optimizer = std::unique_ptr<IOptimizerNew>(MakeNativeOptimizerNew(pctx, 100000, dummyCtx));

auto rel1 = std::make_shared<TRelOptimizerNode>(
"a",
Expand Down Expand Up @@ -80,7 +82,8 @@ Type: ManyManyJoin, Nrows: 2e+10, Ncols: 2, ByteSize: 0, Cost: 2.00112e+10, Sel:

Y_UNIT_TEST(JoinSearch3Rels) {
TBaseProviderContext pctx;
std::unique_ptr<IOptimizerNew> optimizer = std::unique_ptr<IOptimizerNew>(MakeNativeOptimizerNew(pctx, 100000));
TExprContext dummyCtx;
std::unique_ptr<IOptimizerNew> optimizer = std::unique_ptr<IOptimizerNew>(MakeNativeOptimizerNew(pctx, 100000, dummyCtx));

auto rel1 = std::make_shared<TRelOptimizerNode>("a",
TOptimizerStatistics(BaseTable, 100000, 1, 0, 1000000));
Expand Down Expand Up @@ -243,7 +246,8 @@ Y_UNIT_TEST(DqOptimizeEquiJoinWithCostsNative) {
TExprContext ctx;
TBaseProviderContext pctx;
std::function<IOptimizerNew*()> optFactory = [&]() {
return MakeNativeOptimizerNew(pctx, 100000);
TExprContext dummyCtx;
return MakeNativeOptimizerNew(pctx, 100000, dummyCtx);
};
_DqOptimizeEquiJoinWithCosts(optFactory, ctx);
}
Expand Down
3 changes: 2 additions & 1 deletion ydb/library/yql/dq/opt/dq_opt_hypergraph_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ struct TTestContext : public TBaseProviderContext {
template <typename TProviderContext = TTestContext>
std::shared_ptr<IBaseOptimizerNode> Enumerate(const std::shared_ptr<IBaseOptimizerNode>& root, const TOptimizerHints& hints = {}) {
auto ctx = TProviderContext();
TExprContext dummyCtx;
auto optimizer =
std::unique_ptr<IOptimizerNew>(MakeNativeOptimizerNew(ctx, std::numeric_limits<ui32>::max()));
std::unique_ptr<IOptimizerNew>(MakeNativeOptimizerNew(ctx, std::numeric_limits<ui32>::max(), dummyCtx));

Y_ENSURE(root->Kind == EOptimizerNodeKind::JoinNodeType);
auto res = optimizer->JoinSearch(std::static_pointer_cast<TJoinOptimizerNode>(root), hints);
Expand Down
4 changes: 2 additions & 2 deletions ydb/library/yql/dq/opt/dq_opt_join_cbo_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ namespace NYql::NDq {
namespace {
class TDqOptimizerFactory : public IOptimizerFactory {
public:
virtual IOptimizerNew::TPtr MakeJoinCostBasedOptimizerNative(IProviderContext& pctx, TExprContext&, const TNativeSettings& settings) const override {
return IOptimizerNew::TPtr(MakeNativeOptimizerNew(pctx, settings.MaxDPhypDPTableSize));
virtual IOptimizerNew::TPtr MakeJoinCostBasedOptimizerNative(IProviderContext& pctx, TExprContext& ectx, const TNativeSettings& settings) const override {
return IOptimizerNew::TPtr(MakeNativeOptimizerNew(pctx, settings.MaxDPhypDPTableSize, ectx));
}

virtual IOptimizerNew::TPtr MakeJoinCostBasedOptimizerPG(IProviderContext& pctx, TExprContext& ctx, const TPGSettings& settings) const override {
Expand Down
26 changes: 20 additions & 6 deletions ydb/library/yql/dq/opt/dq_opt_join_cost_based.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,10 @@ void ComputeStatistics(const std::shared_ptr<TJoinOptimizerNode>& join, IProvide

class TOptimizerNativeNew: public IOptimizerNew {
public:
TOptimizerNativeNew(IProviderContext& ctx, ui32 maxDPhypDPTableSize)
TOptimizerNativeNew(IProviderContext& ctx, ui32 maxDPhypDPTableSize, TExprContext& exprCtx)
: IOptimizerNew(ctx)
, MaxDPhypTableSize_(maxDPhypDPTableSize)
, MaxDPHypTableSize_(maxDPhypDPTableSize)
, ExprCtx(exprCtx)
{}

std::shared_ptr<TJoinOptimizerNode> JoinSearch(
Expand Down Expand Up @@ -272,8 +273,15 @@ class TOptimizerNativeNew: public IOptimizerNew {
TJoinHypergraph<TNodeSet> hypergraph = MakeJoinHypergraph<TNodeSet>(joinTree, hints);
TDPHypSolver<TNodeSet> solver(hypergraph, this->Pctx);

if (solver.CountCC(MaxDPhypTableSize_) >= MaxDPhypTableSize_) {
if (solver.CountCC(MaxDPHypTableSize_) >= MaxDPHypTableSize_) {
YQL_CLOG(TRACE, CoreDq) << "Maximum DPhyp threshold exceeded";
ExprCtx.AddWarning(
YqlIssue(
{}, TIssuesIds::DQ_OPTIMIZE_ERROR,
"Cost Based Optimizer could not be applied to this query: "
"Enumeration is too large, use PRAGMA MaxDPHypDPTableSize='4294967295' to disable the limitation"
)
);
ComputeStatistics(joinTree, this->Pctx);
return joinTree;
}
Expand Down Expand Up @@ -304,11 +312,12 @@ class TOptimizerNativeNew: public IOptimizerNew {
}

private:
ui32 MaxDPhypTableSize_;
ui32 MaxDPHypTableSize_;
TExprContext& ExprCtx;
};

IOptimizerNew* MakeNativeOptimizerNew(IProviderContext& ctx, const ui32 maxDPhypDPTableSize) {
return new TOptimizerNativeNew(ctx, maxDPhypDPTableSize);
IOptimizerNew* MakeNativeOptimizerNew(IProviderContext& pctx, const ui32 maxDPhypDPTableSize, TExprContext& ectx) {
return new TOptimizerNativeNew(pctx, maxDPhypDPTableSize, ectx);
}

TExprBase DqOptimizeEquiJoinWithCosts(
Expand Down Expand Up @@ -357,6 +366,11 @@ TExprBase DqOptimizeEquiJoinWithCosts(
// of the EquiJoin and n-1 argument are the parameters to EquiJoin

if (!DqCollectJoinRelationsWithStats(rels, typesCtx, equiJoin, providerCollect)){
ctx.AddWarning(
YqlIssue({}, TIssuesIds::DQ_OPTIMIZE_ERROR,
"Cost Based Optimizer could not be applied to this query: couldn't load statistics"
)
);
return node;
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/dq/opt/dq_opt_join_cost_based.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ NYql::NNodes::TExprBase DqOptimizeEquiJoinWithCosts(
const TOptimizerHints& hints = {}
);

IOptimizerNew* MakeNativeOptimizerNew(IProviderContext& ctx, const ui32 maxDPccpDPTableSize);
IOptimizerNew* MakeNativeOptimizerNew(IProviderContext& ctx, const ui32 maxDPHypDPTableSize, TExprContext& ectx);

} // namespace NYql::NDq
2 changes: 1 addition & 1 deletion ydb/library/yql/providers/dq/common/yql_dq_settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ struct TDqSettings {
static constexpr ETaskRunnerStats TaskRunnerStats = ETaskRunnerStats::Basic;
static constexpr ESpillingEngine SpillingEngine = ESpillingEngine::Disable;
static constexpr ui32 CostBasedOptimizationLevel = 4;
static constexpr ui32 MaxDPccpDPTableSize = 40000U;
static constexpr ui32 MaxDPHypDPTableSize = 40000U;
static constexpr ui64 MaxAttachmentsSize = 2_GB;
static constexpr bool SplitStageOnDqReplicate = true;
static constexpr ui64 EnableSpillingNodes = 0;
Expand Down

0 comments on commit 6c0943f

Please sign in to comment.