Skip to content

Commit

Permalink
feat(kqp): add pragma for sequential reads (#11715)
Browse files Browse the repository at this point in the history
  • Loading branch information
ulya-sidorina authored Nov 19, 2024
1 parent 563d9af commit 92bb6c7
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 0 deletions.
5 changes: 5 additions & 0 deletions ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ TExprBase KqpRewriteReadTable(TExprBase node, TExprContext& ctx, const TKqpOptim
matched->Settings = settings.BuildNode(ctx, matched->Settings.Pos());
}

if (kqpCtx.Config->HasMaxSequentialReadsInFlight()) {
settings.SequentialInFlight = *kqpCtx.Config->MaxSequentialReadsInFlight.Get();
matched->Settings = settings.BuildNode(ctx, matched->Settings.Pos());
}

TVector<TExprBase> inputs;
TVector<TCoArgument> args;
TNodeOnNodeOwnedMap argReplaces;
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ TKikimrConfiguration::TKikimrConfiguration() {
REGISTER_SETTING(*this, MaxDPccpDPTableSize);

REGISTER_SETTING(*this, MaxTasksPerStage);
REGISTER_SETTING(*this, MaxSequentialReadsInFlight);

/* Runtime */
REGISTER_SETTING(*this, ScanQuery);
Expand Down Expand Up @@ -147,6 +148,10 @@ bool TKikimrSettings::HasOptUseFinalizeByKey() const {
return GetFlagValue(OptUseFinalizeByKey.Get().GetOrElse(true)) != EOptionalFlag::Disabled;
}

bool TKikimrSettings::HasMaxSequentialReadsInFlight() const {
return !MaxSequentialReadsInFlight.Get().Empty();
}

EOptionalFlag TKikimrSettings::GetOptPredicateExtract() const {
return GetOptionalFlagValue(OptEnablePredicateExtract.Get());
}
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ struct TKikimrSettings {


NCommon::TConfSetting<ui32, false> MaxTasksPerStage;
NCommon::TConfSetting<ui32, false> MaxSequentialReadsInFlight;

/* Runtime */
NCommon::TConfSetting<bool, true> ScanQuery;
Expand All @@ -88,6 +89,7 @@ struct TKikimrSettings {
bool HasOptEnableOlapPushdown() const;
bool HasOptEnableOlapProvideComputeSharding() const;
bool HasOptUseFinalizeByKey() const;
bool HasMaxSequentialReadsInFlight() const;

EOptionalFlag GetOptPredicateExtract() const;
EOptionalFlag GetUseLlvm() const;
Expand Down
39 changes: 39 additions & 0 deletions ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4322,7 +4322,46 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
}
}

Y_UNIT_TEST_TWIN(SequentialReadsPragma, Enabled) {
TKikimrRunner kikimr;
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

NYdb::NTable::TExecDataQuerySettings querySettings;
querySettings.CollectQueryStats(ECollectQueryStatsMode::Profile);

TString query = R"(
SELECT Key, Data FROM `/Root/EightShard`
WHERE Text = "Value1"
ORDER BY Key
LIMIT 1;
)";

if (Enabled) {
TString pragma = TString(R"(
PRAGMA ydb.MaxSequentialReadsInFlight = "1";
)");

query = pragma + query;
}

auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), querySettings).GetValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
CompareYson(R"([[[101u];[1]]])", FormatResultSetYson(result.GetResultSet(0)));

auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
for (const auto& phase : stats.query_phases()) {
for (const auto& access : phase.table_access()) {
if (access.name() == "/Root/EightShard") {
if (Enabled) {
UNIT_ASSERT_LT(access.partitions_count(), 8);
} else {
UNIT_ASSERT_EQUAL(access.partitions_count(), 8);
}
}
}
}
}

}

Expand Down

0 comments on commit 92bb6c7

Please sign in to comment.