Skip to content

Commit

Permalink
feature(kqp): enable stream lookup for data query
Browse files Browse the repository at this point in the history
  • Loading branch information
ulya-sidorina committed Dec 28, 2023
1 parent 06f9376 commit 1209d62
Show file tree
Hide file tree
Showing 15 changed files with 66 additions and 19 deletions.
13 changes: 10 additions & 3 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1650,11 +1650,18 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
}
}

bool HassDmlOperationOnOlap(NKqpProto::TKqpPhyTx_EType queryType, const NKqpProto::TKqpPhyStage& stage) {
bool HasDmlOperationOnOlap(NKqpProto::TKqpPhyTx_EType queryType, const NKqpProto::TKqpPhyStage& stage) {
if (queryType == NKqpProto::TKqpPhyTx::TYPE_DATA) {
return true;
}
for (const auto &tableOp : stage.GetTableOps()) {

for (const auto& input : stage.GetInputs()) {
if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup) {
return true;
}
}

for (const auto& tableOp : stage.GetTableOps()) {
if (tableOp.GetTypeCase() != NKqpProto::TKqpPhyTableOperation::kReadOlapRange) {
return true;
}
Expand Down Expand Up @@ -1691,7 +1698,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
}
}

if (stageInfo.Meta.IsOlap() && HassDmlOperationOnOlap(tx.Body->GetType(), stage)) {
if (stageInfo.Meta.IsOlap() && HasDmlOperationOnOlap(tx.Body->GetType(), stage)) {
auto error = TStringBuilder() << "Data manipulation queries do not support column shard tables.";
LOG_E(error);
ReplyErrorAndDie(Ydb::StatusIds::PRECONDITION_FAILED,
Expand Down
1 change: 0 additions & 1 deletion ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,6 @@ TExprBase KqpPushExtractedPredicateToReadTable(TExprBase node, TExprContext& ctx
.Columns(read.Columns())
.LookupKeys(keys)
.Index(indexName.Cast())
.LookupKeys(keys)
.Done();
}
} else if (kqpCtx.IsDataQuery()) {
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/kqp/ut/cost/kqp_cost_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ namespace NKqp {
using namespace NYdb;
using namespace NYdb::NTable;

static NKikimrConfig::TAppConfig GetAppConfig(bool sourceRead) {
static NKikimrConfig::TAppConfig GetAppConfig(bool sourceRead, bool streamLookup = true) {
auto app = NKikimrConfig::TAppConfig();
app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(sourceRead);
app.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(sourceRead);
app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(streamLookup);
return app;
}

Expand Down Expand Up @@ -43,7 +44,7 @@ Y_UNIT_TEST_SUITE(KqpCost) {
//runtime->SetLogPriority(NKikimrServices::GRPC_SERVER, NActors::NLog::PRI_DEBUG);
}
Y_UNIT_TEST_TWIN(PointLookup, SourceRead) {
TKikimrRunner kikimr(GetAppConfig(SourceRead));
TKikimrRunner kikimr(GetAppConfig(SourceRead, false));
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/ut/effects/kqp_inplace_update_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ Y_UNIT_TEST_TWIN(BigRow, EnableInplaceUpdate) {
// source read use iterator interface, that doesn't use datashard transactions
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);

auto settings = TKikimrSettings()
.SetAppConfig(appConfig)
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3962,9 +3962,9 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda

auto& stats = NYdb::TProtoAccessor::GetProto(*result2.GetStats());

int readPhase = 1;
int readPhase = 0;
if (serverSettings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1);

UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(readPhase).table_access().size(), 2);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(readPhase).table_access(0).name(), "/Root/SecondaryComplexKeys");
Expand All @@ -3974,6 +3974,8 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda
} else {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3);

readPhase++;

UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(readPhase).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(readPhase).table_access(0).name(), "/Root/SecondaryComplexKeys/Index/indexImplTable");
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(readPhase).table_access(0).reads().rows(), 1);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ Y_UNIT_TEST_SUITE(KqpIndexLookupJoin) {
void Test(const TString& query, const TString& answer, size_t rightTableReads, bool useStreamLookup = false) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(useStreamLookup);
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);

auto settings = TKikimrSettings().SetAppConfig(appConfig);
TKikimrRunner kikimr(settings);
Expand Down
13 changes: 10 additions & 3 deletions ydb/core/kqp/ut/opt/kqp_extract_predicate_unpack_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,14 @@ void Test(const TString& query, const TString& answer, THashSet<TString> allowSc
}
}

void TestRange(const TString& query, const TString& answer, ui64 rowsRead, int stagesCount = 1) {
TKikimrRunner kikimr;
void TestRange(const TString& query, const TString& answer, ui64 rowsRead, int stagesCount = 1, bool streamLookup = true) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(streamLookup);

auto settings = TKikimrSettings()
.SetAppConfig(appConfig);

TKikimrRunner kikimr(settings);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

Expand Down Expand Up @@ -189,7 +195,8 @@ Y_UNIT_TEST(OverflowLookup) {
)",
R"([])",
0,
2);
2,
false);

TestRange(
R"(
Expand Down
9 changes: 7 additions & 2 deletions ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,12 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {

auto explainResult = session.ExplainDataQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(explainResult.GetStatus(), EStatus::SUCCESS, explainResult.GetIssues().ToString());
UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpLookupTable"), explainResult.GetAst());

if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpCnStreamLookup"), explainResult.GetAst());
} else {
UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpLookupTable"), explainResult.GetAst());
}

auto params = kikimr.GetTableClient().GetParamsBuilder()
.AddParam("$group").OptionalUint32(1).Build()
Expand Down Expand Up @@ -1224,7 +1229,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
size_t phase = 0;
if (stats.query_phases().size() == 2) {
phase = 1;
} else if (stats.query_phases().size() == 0) {
} else if (stats.query_phases().size() == 1) {
phase = 0;
} else {
UNIT_ASSERT(false);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) {

Y_UNIT_TEST_QUAD(IndexLookupJoin, EnableStreamLookup, QueryService) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(EnableStreamLookup);
auto settings = TKikimrSettings()
.SetAppConfig(appConfig);
Expand Down
23 changes: 19 additions & 4 deletions ydb/core/kqp/ut/query/kqp_explain_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,13 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
UNIT_ASSERT_EQUAL(node.GetMapSafe().at("Table").GetStringSafe(), "KeyValue");
node = FindPlanNodeByKv(plan, "Name", "TableFullScan");
UNIT_ASSERT_EQUAL(node.GetMapSafe().at("Table").GetStringSafe(), "KeyValue");
node = FindPlanNodeByKv(plan, "Name", "TablePointLookup");

if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
node = FindPlanNodeByKv(plan, "Node Type", "TableLookup");
} else {
node = FindPlanNodeByKv(plan, "Name", "TablePointLookup");
}

UNIT_ASSERT_EQUAL(node.GetMapSafe().at("Table").GetStringSafe(), "KeyValue");
}

Expand Down Expand Up @@ -535,8 +541,13 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
UNIT_ASSERT_VALUES_EQUAL(rangeScansCount, 1);

ui32 lookupsCount = 0;
lookupsCount = CountPlanNodesByKv(plan, "Node Type", "Stage-TablePointLookup");
lookupsCount += CountPlanNodesByKv(plan, "Node Type", "TablePointLookup-ConstantExpr");
if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
lookupsCount = CountPlanNodesByKv(plan, "Node Type", "TableLookup");
} else {
lookupsCount = CountPlanNodesByKv(plan, "Node Type", "Stage-TablePointLookup");
lookupsCount += CountPlanNodesByKv(plan, "Node Type", "TablePointLookup-ConstantExpr");
}

UNIT_ASSERT_VALUES_EQUAL(lookupsCount, 1);

/* check tables section */
Expand Down Expand Up @@ -902,7 +913,11 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
}

Y_UNIT_TEST(MultiJoinCteLinks) {
TKikimrRunner kikimr;
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
auto settings = TKikimrSettings()
.SetAppConfig(appConfig);
TKikimrRunner kikimr{settings};
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/ut/query/kqp_query_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
Y_UNIT_TEST(QueryTimeoutImmediate) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
auto settings = TKikimrSettings()
.SetAppConfig(appConfig);
TKikimrRunner kikimr{settings};
Expand Down Expand Up @@ -490,6 +491,7 @@ Y_UNIT_TEST_SUITE(KqpQuery) {

Y_UNIT_TEST(QueryCancelImmediate) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
auto settings = TKikimrSettings()
.SetAppConfig(appConfig);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/ut/query/kqp_stats_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ TCollectedStreamResult JoinStatsBasic(
std::function<Iterator(TKikimrRunner&, ECollectQueryStatsMode, const TString&)> getIter) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableKqpScanQueryStreamLookup(false);
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true);
auto settings = TKikimrSettings()
.SetAppConfig(appConfig);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ Y_UNIT_TEST_SUITE(KqpSnapshotRead) {
if (result.GetStatus() == EStatus::SUCCESS)
continue;

if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQuerySourceRead() && false) {
if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::DEFAULT_ERROR,
[](const NYql::TIssue& issue){
return issue.GetMessage().Contains("has no snapshot at");
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/protos/table_service_config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ message TTableServiceConfig {
optional uint64 SessionIdleDurationSeconds = 28 [default = 600];
optional TAggregationConfig AggregationConfig = 29;
optional bool EnableKqpScanQueryStreamLookup = 30 [default = true];
optional bool EnableKqpDataQueryStreamLookup = 31 [default = false];
optional bool EnableKqpDataQueryStreamLookup = 31 [default = true];
optional TExecuterRetriesConfig ExecuterRetriesConfig = 32;
reserved 33; // optional bool EnableKqpDataQueryStreamPointLookup = 33 [default = false];
optional bool EnablePublishKqpProxyByRM = 34 [default = true];
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_order.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1526,6 +1526,7 @@ Y_UNIT_TEST(TestMvccReadDoesntBlockWrites) {
TPortManager pm;
NKikimrConfig::TAppConfig app;
app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetEnableMvccSnapshotReads(false);
serverSettings.SetDomainName("Root")
Expand Down Expand Up @@ -1863,9 +1864,12 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderNonConflictingWrites, StreamLookup) {

Y_UNIT_TEST(MvccTestOutOfOrderRestartLocksSingleWithoutBarrier) {
TPortManager pm;
NKikimrConfig::TAppConfig app;
app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetEnableMvccSnapshotReads(false);
serverSettings.SetDomainName("Root")
.SetAppConfig(app)
.SetUseRealThreads(false);

Tests::TServer::TPtr server = new TServer(serverSettings);
Expand Down Expand Up @@ -3507,6 +3511,7 @@ Y_UNIT_TEST(MvccTestSnapshotRead) {
TPortManager pm;
NKikimrConfig::TAppConfig app;
app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetAppConfig(app)
Expand Down

0 comments on commit 1209d62

Please sign in to comment.