Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIKIMR-20580: enable stream lookup for data query #611

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1650,11 +1650,18 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
}
}

bool HassDmlOperationOnOlap(NKqpProto::TKqpPhyTx_EType queryType, const NKqpProto::TKqpPhyStage& stage) {
bool HasDmlOperationOnOlap(NKqpProto::TKqpPhyTx_EType queryType, const NKqpProto::TKqpPhyStage& stage) {
if (queryType == NKqpProto::TKqpPhyTx::TYPE_DATA) {
return true;
}
for (const auto &tableOp : stage.GetTableOps()) {

for (const auto& input : stage.GetInputs()) {
if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup) {
return true;
}
}

for (const auto& tableOp : stage.GetTableOps()) {
if (tableOp.GetTypeCase() != NKqpProto::TKqpPhyTableOperation::kReadOlapRange) {
return true;
}
Expand Down Expand Up @@ -1691,7 +1698,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
}
}

if (stageInfo.Meta.IsOlap() && HassDmlOperationOnOlap(tx.Body->GetType(), stage)) {
if (stageInfo.Meta.IsOlap() && HasDmlOperationOnOlap(tx.Body->GetType(), stage)) {
auto error = TStringBuilder() << "Data manipulation queries do not support column shard tables.";
LOG_E(error);
ReplyErrorAndDie(Ydb::StatusIds::PRECONDITION_FAILED,
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ void FillKqpTasksGraphStages(TKqpTasksGraph& tasksGraph, const TVector<IKqpGatew
meta.TableId = MakeTableId(input.GetStreamLookup().GetTable());
meta.TablePath = input.GetStreamLookup().GetTable().GetPath();
meta.TableConstInfo = tx.Body->GetTableConstInfoById()->Map.at(meta.TableId);
YQL_ENSURE(meta.TableConstInfo);
meta.TableKind = meta.TableConstInfo->TableKind;
}

if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kSequencer) {
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/kqp/ut/cost/kqp_cost_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ namespace NKqp {
using namespace NYdb;
using namespace NYdb::NTable;

static NKikimrConfig::TAppConfig GetAppConfig(bool sourceRead) {
static NKikimrConfig::TAppConfig GetAppConfig(bool sourceRead, bool streamLookup = true) {
auto app = NKikimrConfig::TAppConfig();
app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(sourceRead);
app.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(sourceRead);
app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(streamLookup);
return app;
}

Expand Down Expand Up @@ -43,7 +44,7 @@ Y_UNIT_TEST_SUITE(KqpCost) {
//runtime->SetLogPriority(NKikimrServices::GRPC_SERVER, NActors::NLog::PRI_DEBUG);
}
Y_UNIT_TEST_TWIN(PointLookup, SourceRead) {
TKikimrRunner kikimr(GetAppConfig(SourceRead));
TKikimrRunner kikimr(GetAppConfig(SourceRead, false));
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/ut/effects/kqp_inplace_update_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ Y_UNIT_TEST_TWIN(BigRow, EnableInplaceUpdate) {
// source read use iterator interface, that doesn't use datashard transactions
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);

auto settings = TKikimrSettings()
.SetAppConfig(appConfig)
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4030,9 +4030,9 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda

auto& stats = NYdb::TProtoAccessor::GetProto(*result2.GetStats());

int readPhase = 1;
int readPhase = 0;
if (serverSettings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1);

UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(readPhase).table_access().size(), 2);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(readPhase).table_access(0).name(), "/Root/SecondaryComplexKeys");
Expand All @@ -4042,6 +4042,8 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda
} else {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3);

readPhase++;

UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(readPhase).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(readPhase).table_access(0).name(), "/Root/SecondaryComplexKeys/Index/indexImplTable");
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(readPhase).table_access(0).reads().rows(), 1);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ Y_UNIT_TEST_SUITE(KqpIndexLookupJoin) {
void Test(const TString& query, const TString& answer, size_t rightTableReads, bool useStreamLookup = false) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(useStreamLookup);
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);

auto settings = TKikimrSettings().SetAppConfig(appConfig);
TKikimrRunner kikimr(settings);
Expand Down
13 changes: 10 additions & 3 deletions ydb/core/kqp/ut/opt/kqp_extract_predicate_unpack_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,14 @@ void Test(const TString& query, const TString& answer, THashSet<TString> allowSc
}
}

void TestRange(const TString& query, const TString& answer, ui64 rowsRead, int stagesCount = 1) {
TKikimrRunner kikimr;
void TestRange(const TString& query, const TString& answer, ui64 rowsRead, int stagesCount = 1, bool streamLookup = true) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(streamLookup);

auto settings = TKikimrSettings()
.SetAppConfig(appConfig);

TKikimrRunner kikimr(settings);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

Expand Down Expand Up @@ -204,7 +210,8 @@ Y_UNIT_TEST(OverflowLookup) {
)",
R"([])",
0,
2);
2,
false);

TestRange(
R"(
Expand Down
9 changes: 7 additions & 2 deletions ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,12 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {

auto explainResult = session.ExplainDataQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(explainResult.GetStatus(), EStatus::SUCCESS, explainResult.GetIssues().ToString());
UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpLookupTable"), explainResult.GetAst());

if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpCnStreamLookup"), explainResult.GetAst());
} else {
UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpLookupTable"), explainResult.GetAst());
}

auto params = kikimr.GetTableClient().GetParamsBuilder()
.AddParam("$group").OptionalUint32(1).Build()
Expand Down Expand Up @@ -1224,7 +1229,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
size_t phase = 0;
if (stats.query_phases().size() == 2) {
phase = 1;
} else if (stats.query_phases().size() == 0) {
} else if (stats.query_phases().size() == 1) {
phase = 0;
} else {
UNIT_ASSERT(false);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) {

Y_UNIT_TEST_QUAD(IndexLookupJoin, EnableStreamLookup, QueryService) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(EnableStreamLookup);
auto settings = TKikimrSettings()
.SetAppConfig(appConfig);
Expand Down
21 changes: 18 additions & 3 deletions ydb/core/kqp/ut/query/kqp_explain_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,13 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
UNIT_ASSERT_EQUAL(node.GetMapSafe().at("Table").GetStringSafe(), "KeyValue");
node = FindPlanNodeByKv(plan, "Name", "TableFullScan");
UNIT_ASSERT_EQUAL(node.GetMapSafe().at("Table").GetStringSafe(), "KeyValue");
node = FindPlanNodeByKv(plan, "Name", "TablePointLookup");

if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
node = FindPlanNodeByKv(plan, "Node Type", "TableLookup");
} else {
node = FindPlanNodeByKv(plan, "Name", "TablePointLookup");
}

UNIT_ASSERT_EQUAL(node.GetMapSafe().at("Table").GetStringSafe(), "KeyValue");
}

Expand Down Expand Up @@ -533,7 +539,12 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
UNIT_ASSERT_VALUES_EQUAL(rangeScansCount, 1);

ui32 lookupsCount = 0;
lookupsCount = CountPlanNodesByKv(plan, "Node Type", "TablePointLookup-ConstantExpr");
if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
lookupsCount = CountPlanNodesByKv(plan, "Node Type", "TableLookup");
} else {
lookupsCount = CountPlanNodesByKv(plan, "Node Type", "TablePointLookup-ConstantExpr");
}

UNIT_ASSERT_VALUES_EQUAL(lookupsCount, 1);

/* check tables section */
Expand Down Expand Up @@ -899,7 +910,11 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
}

Y_UNIT_TEST(MultiJoinCteLinks) {
TKikimrRunner kikimr;
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
auto settings = TKikimrSettings()
.SetAppConfig(appConfig);
TKikimrRunner kikimr{settings};
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/ut/query/kqp_query_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
Y_UNIT_TEST(QueryTimeoutImmediate) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
auto settings = TKikimrSettings()
.SetAppConfig(appConfig);
TKikimrRunner kikimr{settings};
Expand Down Expand Up @@ -490,6 +491,7 @@ Y_UNIT_TEST_SUITE(KqpQuery) {

Y_UNIT_TEST(QueryCancelImmediate) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
auto settings = TKikimrSettings()
.SetAppConfig(appConfig);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/ut/query/kqp_stats_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ TCollectedStreamResult JoinStatsBasic(
std::function<Iterator(TKikimrRunner&, ECollectQueryStatsMode, const TString&)> getIter) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableKqpScanQueryStreamLookup(false);
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true);
auto settings = TKikimrSettings()
.SetAppConfig(appConfig);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ Y_UNIT_TEST_SUITE(KqpSnapshotRead) {
if (result.GetStatus() == EStatus::SUCCESS)
continue;

if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQuerySourceRead() && false) {
if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::DEFAULT_ERROR,
[](const NYql::TIssue& issue){
return issue.GetMessage().Contains("has no snapshot at");
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/protos/table_service_config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ message TTableServiceConfig {
optional uint64 SessionIdleDurationSeconds = 28 [default = 600];
optional TAggregationConfig AggregationConfig = 29;
optional bool EnableKqpScanQueryStreamLookup = 30 [default = true];
optional bool EnableKqpDataQueryStreamLookup = 31 [default = false];
optional bool EnableKqpDataQueryStreamLookup = 31 [default = true];
optional TExecuterRetriesConfig ExecuterRetriesConfig = 32;
reserved 33; // optional bool EnableKqpDataQueryStreamPointLookup = 33 [default = false];
optional bool EnablePublishKqpProxyByRM = 34 [default = true];
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_order.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1526,6 +1526,7 @@ Y_UNIT_TEST(TestMvccReadDoesntBlockWrites) {
TPortManager pm;
NKikimrConfig::TAppConfig app;
app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetEnableMvccSnapshotReads(false);
serverSettings.SetDomainName("Root")
Expand Down Expand Up @@ -1863,9 +1864,12 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderNonConflictingWrites, StreamLookup) {

Y_UNIT_TEST(MvccTestOutOfOrderRestartLocksSingleWithoutBarrier) {
TPortManager pm;
NKikimrConfig::TAppConfig app;
app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetEnableMvccSnapshotReads(false);
serverSettings.SetDomainName("Root")
.SetAppConfig(app)
.SetUseRealThreads(false);

Tests::TServer::TPtr server = new TServer(serverSettings);
Expand Down Expand Up @@ -3507,6 +3511,7 @@ Y_UNIT_TEST(MvccTestSnapshotRead) {
TPortManager pm;
NKikimrConfig::TAppConfig app;
app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetAppConfig(app)
Expand Down
Loading
Loading