diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp index c73d34e7dbdc..91d828e07d6d 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp @@ -110,6 +110,11 @@ TExprBase KqpRewriteReadTable(TExprBase node, TExprContext& ctx, const TKqpOptim matched->Settings = settings.BuildNode(ctx, matched->Settings.Pos()); } + if (kqpCtx.Config->HasMaxSequentialReadsInFlight()) { + settings.SequentialInFlight = *kqpCtx.Config->MaxSequentialReadsInFlight.Get(); + matched->Settings = settings.BuildNode(ctx, matched->Settings.Pos()); + } + TVector inputs; TVector args; TNodeOnNodeOwnedMap argReplaces; diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.cpp b/ydb/core/kqp/provider/yql_kikimr_settings.cpp index e19bc3995257..4598d36f0bbb 100644 --- a/ydb/core/kqp/provider/yql_kikimr_settings.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_settings.cpp @@ -95,6 +95,7 @@ TKikimrConfiguration::TKikimrConfiguration() { REGISTER_SETTING(*this, MaxDPccpDPTableSize); REGISTER_SETTING(*this, MaxTasksPerStage); + REGISTER_SETTING(*this, MaxSequentialReadsInFlight); /* Runtime */ REGISTER_SETTING(*this, ScanQuery); @@ -147,6 +148,10 @@ bool TKikimrSettings::HasOptUseFinalizeByKey() const { return GetFlagValue(OptUseFinalizeByKey.Get().GetOrElse(true)) != EOptionalFlag::Disabled; } +bool TKikimrSettings::HasMaxSequentialReadsInFlight() const { + return !MaxSequentialReadsInFlight.Get().Empty(); +} + EOptionalFlag TKikimrSettings::GetOptPredicateExtract() const { return GetOptionalFlagValue(OptEnablePredicateExtract.Get()); } diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.h b/ydb/core/kqp/provider/yql_kikimr_settings.h index 34f26f7e303a..3eaeffa9874c 100644 --- a/ydb/core/kqp/provider/yql_kikimr_settings.h +++ b/ydb/core/kqp/provider/yql_kikimr_settings.h @@ -72,6 +72,7 @@ struct TKikimrSettings { NCommon::TConfSetting MaxTasksPerStage; + NCommon::TConfSetting MaxSequentialReadsInFlight; /* Runtime */ NCommon::TConfSetting ScanQuery; @@ -88,6 +89,7 @@ struct TKikimrSettings { bool HasOptEnableOlapPushdown() const; bool HasOptEnableOlapProvideComputeSharding() const; bool HasOptUseFinalizeByKey() const; + bool HasMaxSequentialReadsInFlight() const; EOptionalFlag GetOptPredicateExtract() const; EOptionalFlag GetUseLlvm() const; diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp index 37e1037ba2f0..e73c17e7938f 100644 --- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp @@ -4322,7 +4322,46 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { } } + Y_UNIT_TEST_TWIN(SequentialReadsPragma, Enabled) { + TKikimrRunner kikimr; + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + NYdb::NTable::TExecDataQuerySettings querySettings; + querySettings.CollectQueryStats(ECollectQueryStatsMode::Profile); + + TString query = R"( + SELECT Key, Data FROM `/Root/EightShard` + WHERE Text = "Value1" + ORDER BY Key + LIMIT 1; + )"; + + if (Enabled) { + TString pragma = TString(R"( + PRAGMA ydb.MaxSequentialReadsInFlight = "1"; + )"); + query = pragma + query; + } + + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), querySettings).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + CompareYson(R"([[[101u];[1]]])", FormatResultSetYson(result.GetResultSet(0))); + + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + for (const auto& phase : stats.query_phases()) { + for (const auto& access : phase.table_access()) { + if (access.name() == "/Root/EightShard") { + if (Enabled) { + UNIT_ASSERT_LT(access.partitions_count(), 8); + } else { + UNIT_ASSERT_EQUAL(access.partitions_count(), 8); + } + } + } + } + } }