diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index fba12d565bfe..0b9674e3df88 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -1650,11 +1650,18 @@ class TKqpDataExecuter : public TKqpExecuterBaseGetType(), stage)) { + if (stageInfo.Meta.IsOlap() && HasDmlOperationOnOlap(tx.Body->GetType(), stage)) { auto error = TStringBuilder() << "Data manipulation queries do not support column shard tables."; LOG_E(error); ReplyErrorAndDie(Ydb::StatusIds::PRECONDITION_FAILED, diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp index 22fbcac96399..9fceac7ea47a 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp @@ -387,7 +387,6 @@ TExprBase KqpPushExtractedPredicateToReadTable(TExprBase node, TExprContext& ctx .Columns(read.Columns()) .LookupKeys(keys) .Index(indexName.Cast()) - .LookupKeys(keys) .Done(); } } else if (kqpCtx.IsDataQuery()) { diff --git a/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp b/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp index 6211df3be3cd..7be32f87aa9b 100644 --- a/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp +++ b/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp @@ -11,10 +11,11 @@ namespace NKqp { using namespace NYdb; using namespace NYdb::NTable; -static NKikimrConfig::TAppConfig GetAppConfig(bool sourceRead) { +static NKikimrConfig::TAppConfig GetAppConfig(bool sourceRead, bool streamLookup = true) { auto app = NKikimrConfig::TAppConfig(); app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(sourceRead); app.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(sourceRead); + app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(streamLookup); return app; } @@ -43,7 +44,7 @@ Y_UNIT_TEST_SUITE(KqpCost) { //runtime->SetLogPriority(NKikimrServices::GRPC_SERVER, NActors::NLog::PRI_DEBUG); } Y_UNIT_TEST_TWIN(PointLookup, SourceRead) { - TKikimrRunner kikimr(GetAppConfig(SourceRead)); + TKikimrRunner kikimr(GetAppConfig(SourceRead, false)); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); diff --git a/ydb/core/kqp/ut/effects/kqp_inplace_update_ut.cpp b/ydb/core/kqp/ut/effects/kqp_inplace_update_ut.cpp index 1f85761e0c92..b8912474b5b8 100644 --- a/ydb/core/kqp/ut/effects/kqp_inplace_update_ut.cpp +++ b/ydb/core/kqp/ut/effects/kqp_inplace_update_ut.cpp @@ -401,6 +401,7 @@ Y_UNIT_TEST_TWIN(BigRow, EnableInplaceUpdate) { // source read use iterator interface, that doesn't use datashard transactions NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false); + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false); auto settings = TKikimrSettings() .SetAppConfig(appConfig) diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp index 3bc943c7fd95..00ae7554fb6a 100644 --- a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp +++ b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp @@ -3962,9 +3962,9 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda auto& stats = NYdb::TProtoAccessor::GetProto(*result2.GetStats()); - int readPhase = 1; + int readPhase = 0; if (serverSettings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(readPhase).table_access().size(), 2); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(readPhase).table_access(0).name(), "/Root/SecondaryComplexKeys"); @@ -3974,6 +3974,8 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda } else { UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3); + readPhase++; + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(readPhase).table_access().size(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(readPhase).table_access(0).name(), "/Root/SecondaryComplexKeys/Index/indexImplTable"); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(readPhase).table_access(0).reads().rows(), 1); diff --git a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp index 8bcad00b36d0..3c69410c54a6 100644 --- a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp +++ b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp @@ -90,6 +90,7 @@ Y_UNIT_TEST_SUITE(KqpIndexLookupJoin) { void Test(const TString& query, const TString& answer, size_t rightTableReads, bool useStreamLookup = false) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(useStreamLookup); + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false); auto settings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(settings); diff --git a/ydb/core/kqp/ut/opt/kqp_extract_predicate_unpack_ut.cpp b/ydb/core/kqp/ut/opt/kqp_extract_predicate_unpack_ut.cpp index c1c107893289..8771814bd2e6 100644 --- a/ydb/core/kqp/ut/opt/kqp_extract_predicate_unpack_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_extract_predicate_unpack_ut.cpp @@ -143,8 +143,14 @@ void Test(const TString& query, const TString& answer, THashSet allowSc } } -void TestRange(const TString& query, const TString& answer, ui64 rowsRead, int stagesCount = 1) { - TKikimrRunner kikimr; +void TestRange(const TString& query, const TString& answer, ui64 rowsRead, int stagesCount = 1, bool streamLookup = true) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(streamLookup); + + auto settings = TKikimrSettings() + .SetAppConfig(appConfig); + + TKikimrRunner kikimr(settings); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -189,7 +195,8 @@ Y_UNIT_TEST(OverflowLookup) { )", R"([])", 0, - 2); + 2, + false); TestRange( R"( diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp index adc7addeac75..db38d3735fe4 100644 --- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp @@ -205,7 +205,12 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { auto explainResult = session.ExplainDataQuery(query).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(explainResult.GetStatus(), EStatus::SUCCESS, explainResult.GetIssues().ToString()); - UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpLookupTable"), explainResult.GetAst()); + + if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { + UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpCnStreamLookup"), explainResult.GetAst()); + } else { + UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpLookupTable"), explainResult.GetAst()); + } auto params = kikimr.GetTableClient().GetParamsBuilder() .AddParam("$group").OptionalUint32(1).Build() @@ -1224,7 +1229,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { size_t phase = 0; if (stats.query_phases().size() == 2) { phase = 1; - } else if (stats.query_phases().size() == 0) { + } else if (stats.query_phases().size() == 1) { phase = 0; } else { UNIT_ASSERT(false); diff --git a/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp b/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp index ddf538058fd2..828615f3bd8e 100644 --- a/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp +++ b/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp @@ -280,6 +280,7 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) { Y_UNIT_TEST_QUAD(IndexLookupJoin, EnableStreamLookup, QueryService) { NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false); appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(EnableStreamLookup); auto settings = TKikimrSettings() .SetAppConfig(appConfig); diff --git a/ydb/core/kqp/ut/query/kqp_explain_ut.cpp b/ydb/core/kqp/ut/query/kqp_explain_ut.cpp index f359d21f7e35..034e02064a1c 100644 --- a/ydb/core/kqp/ut/query/kqp_explain_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_explain_ut.cpp @@ -497,7 +497,13 @@ Y_UNIT_TEST_SUITE(KqpExplain) { UNIT_ASSERT_EQUAL(node.GetMapSafe().at("Table").GetStringSafe(), "KeyValue"); node = FindPlanNodeByKv(plan, "Name", "TableFullScan"); UNIT_ASSERT_EQUAL(node.GetMapSafe().at("Table").GetStringSafe(), "KeyValue"); - node = FindPlanNodeByKv(plan, "Name", "TablePointLookup"); + + if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { + node = FindPlanNodeByKv(plan, "Node Type", "TableLookup"); + } else { + node = FindPlanNodeByKv(plan, "Name", "TablePointLookup"); + } + UNIT_ASSERT_EQUAL(node.GetMapSafe().at("Table").GetStringSafe(), "KeyValue"); } @@ -535,8 +541,13 @@ Y_UNIT_TEST_SUITE(KqpExplain) { UNIT_ASSERT_VALUES_EQUAL(rangeScansCount, 1); ui32 lookupsCount = 0; - lookupsCount = CountPlanNodesByKv(plan, "Node Type", "Stage-TablePointLookup"); - lookupsCount += CountPlanNodesByKv(plan, "Node Type", "TablePointLookup-ConstantExpr"); + if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { + lookupsCount = CountPlanNodesByKv(plan, "Node Type", "TableLookup"); + } else { + lookupsCount = CountPlanNodesByKv(plan, "Node Type", "Stage-TablePointLookup"); + lookupsCount += CountPlanNodesByKv(plan, "Node Type", "TablePointLookup-ConstantExpr"); + } + UNIT_ASSERT_VALUES_EQUAL(lookupsCount, 1); /* check tables section */ @@ -902,7 +913,11 @@ Y_UNIT_TEST_SUITE(KqpExplain) { } Y_UNIT_TEST(MultiJoinCteLinks) { - TKikimrRunner kikimr; + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false); + auto settings = TKikimrSettings() + .SetAppConfig(appConfig); + TKikimrRunner kikimr{settings}; auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); diff --git a/ydb/core/kqp/ut/query/kqp_query_ut.cpp b/ydb/core/kqp/ut/query/kqp_query_ut.cpp index 2c755dd6038d..f57bf4005ab1 100644 --- a/ydb/core/kqp/ut/query/kqp_query_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_query_ut.cpp @@ -351,6 +351,7 @@ Y_UNIT_TEST_SUITE(KqpQuery) { Y_UNIT_TEST(QueryTimeoutImmediate) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false); + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false); auto settings = TKikimrSettings() .SetAppConfig(appConfig); TKikimrRunner kikimr{settings}; @@ -490,6 +491,7 @@ Y_UNIT_TEST_SUITE(KqpQuery) { Y_UNIT_TEST(QueryCancelImmediate) { NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false); appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false); auto settings = TKikimrSettings() .SetAppConfig(appConfig); diff --git a/ydb/core/kqp/ut/query/kqp_stats_ut.cpp b/ydb/core/kqp/ut/query/kqp_stats_ut.cpp index bf19a63de4d8..83779196a891 100644 --- a/ydb/core/kqp/ut/query/kqp_stats_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_stats_ut.cpp @@ -102,6 +102,7 @@ TCollectedStreamResult JoinStatsBasic( std::function getIter) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableKqpScanQueryStreamLookup(false); + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false); appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true); auto settings = TKikimrSettings() .SetAppConfig(appConfig); diff --git a/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp b/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp index b59407d5022b..bf56e63e7985 100644 --- a/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp +++ b/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp @@ -50,7 +50,7 @@ Y_UNIT_TEST_SUITE(KqpSnapshotRead) { if (result.GetStatus() == EStatus::SUCCESS) continue; - if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQuerySourceRead() && false) { + if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::DEFAULT_ERROR, [](const NYql::TIssue& issue){ return issue.GetMessage().Contains("has no snapshot at"); diff --git a/ydb/core/protos/table_service_config.proto b/ydb/core/protos/table_service_config.proto index 5ca397d677d3..3ef6ae03c9c4 100644 --- a/ydb/core/protos/table_service_config.proto +++ b/ydb/core/protos/table_service_config.proto @@ -224,7 +224,7 @@ message TTableServiceConfig { optional uint64 SessionIdleDurationSeconds = 28 [default = 600]; optional TAggregationConfig AggregationConfig = 29; optional bool EnableKqpScanQueryStreamLookup = 30 [default = true]; - optional bool EnableKqpDataQueryStreamLookup = 31 [default = false]; + optional bool EnableKqpDataQueryStreamLookup = 31 [default = true]; optional TExecuterRetriesConfig ExecuterRetriesConfig = 32; reserved 33; // optional bool EnableKqpDataQueryStreamPointLookup = 33 [default = false]; optional bool EnablePublishKqpProxyByRM = 34 [default = true]; diff --git a/ydb/core/tx/datashard/datashard_ut_order.cpp b/ydb/core/tx/datashard/datashard_ut_order.cpp index a2e73d7480a3..17c07969f672 100644 --- a/ydb/core/tx/datashard/datashard_ut_order.cpp +++ b/ydb/core/tx/datashard/datashard_ut_order.cpp @@ -1526,6 +1526,7 @@ Y_UNIT_TEST(TestMvccReadDoesntBlockWrites) { TPortManager pm; NKikimrConfig::TAppConfig app; app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false); + app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false); TServerSettings serverSettings(pm.GetPort(2134)); serverSettings.SetEnableMvccSnapshotReads(false); serverSettings.SetDomainName("Root") @@ -1863,9 +1864,12 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderNonConflictingWrites, StreamLookup) { Y_UNIT_TEST(MvccTestOutOfOrderRestartLocksSingleWithoutBarrier) { TPortManager pm; + NKikimrConfig::TAppConfig app; + app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false); TServerSettings serverSettings(pm.GetPort(2134)); serverSettings.SetEnableMvccSnapshotReads(false); serverSettings.SetDomainName("Root") + .SetAppConfig(app) .SetUseRealThreads(false); Tests::TServer::TPtr server = new TServer(serverSettings); @@ -3507,6 +3511,7 @@ Y_UNIT_TEST(MvccTestSnapshotRead) { TPortManager pm; NKikimrConfig::TAppConfig app; app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false); + app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false); TServerSettings serverSettings(pm.GetPort(2134)); serverSettings.SetDomainName("Root") .SetAppConfig(app)