From 6e3fc43bac31f6eb9e281e8d22205fdcc942f4ee Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Thu, 19 Sep 2024 11:38:03 +0300 Subject: [PATCH] Fix sinks order (#9345) --- .github/config/muted_ya.txt | 5 - ydb/core/kqp/opt/kqp_opt_build_txs.cpp | 106 +++++++++++++-- .../s3/kqp_federated_query_ut.cpp | 3 +- ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp | 124 +++++++++++++++++- 4 files changed, 217 insertions(+), 21 deletions(-) diff --git a/.github/config/muted_ya.txt b/.github/config/muted_ya.txt index a2cbe6c0eb7e..6307b5a72a16 100644 --- a/.github/config/muted_ya.txt +++ b/.github/config/muted_ya.txt @@ -24,12 +24,7 @@ ydb/core/kqp/ut/service [*/*]* ydb/core/kqp/ut/service KqpQueryService.ExecuteQueryPgTableSelect ydb/core/kqp/ut/service KqpQueryService.QueryOnClosedSession ydb/core/kqp/ut/service KqpService.CloseSessionsWithLoad -ydb/core/kqp/ut/service KqpQueryService.TableSink_OlapRWQueries -ydb/core/kqp/ut/service KqpQueryService.TableSink_Htap -ydb/core/kqp/ut/tx KqpSnapshotRead.ReadOnlyTxWithIndexCommitsOnConcurrentWrite+withSink -ydb/core/kqp/ut/tx KqpSinkMvcc.OltpNamedStatement ydb/core/kqp/ut/tx KqpSinkMvcc.OlapNamedStatement -ydb/core/kqp/ut/tx KqpSinkMvcc.OltpMultiSinks ydb/core/kqp/ut/tx KqpSinkMvcc.OlapMultiSinks ydb/core/persqueue/ut [*/*]* ydb/core/persqueue/ut TPQTest.*DirectRead* diff --git a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp index e43f958d1c13..f15d85253640 100644 --- a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp +++ b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp @@ -560,16 +560,20 @@ class TKqpBuildTxsTransformer : public TSyncTransformerBase { } if (!query.Effects().Empty()) { - auto tx = BuildTx(query.Effects().Ptr(), ctx, /* isPrecompute */ false); - if (!tx) { - return TStatus::Error; - } + auto collectedEffects = CollectEffects(query.Effects(), ctx); - if (!CheckEffectsTx(tx.Cast(), query, ctx)) { - return TStatus::Error; - } + for (auto& effects : collectedEffects) { + auto tx = BuildTx(effects.Ptr(), ctx, /* isPrecompute */ false); + if (!tx) { + return TStatus::Error; + } - BuildCtx->PhysicalTxs.emplace_back(tx.Cast()); + if (!CheckEffectsTx(tx.Cast(), effects, ctx)) { + return TStatus::Error; + } + + BuildCtx->PhysicalTxs.emplace_back(tx.Cast()); + } } return TStatus::Ok; @@ -581,8 +585,86 @@ class TKqpBuildTxsTransformer : public TSyncTransformerBase { } private: - bool HasTableEffects(const TKqlQuery& query) const { - for (const TExprBase& effect : query.Effects()) { + TVector CollectEffects(const TExprList& list, TExprContext& ctx) { + struct TEffectsInfo { + enum class EType { + KQP_EFFECT, + KQP_SINK, + EXTERNAL_SINK, + }; + + EType Type; + THashSet TablesPathIds; + TVector Exprs; + }; + TVector effectsInfos; + + for (const auto& expr : list) { + if (auto sinkEffect = expr.Maybe()) { + const size_t sinkIndex = FromString(TStringBuf(sinkEffect.Cast().SinkIndex())); + const auto stage = sinkEffect.Cast().Stage().Maybe(); + YQL_ENSURE(stage); + YQL_ENSURE(stage.Cast().Outputs()); + const auto outputs = stage.Cast().Outputs().Cast(); + YQL_ENSURE(sinkIndex < outputs.Size()); + const auto sink = outputs.Item(sinkIndex).Maybe(); + YQL_ENSURE(sink); + + const auto sinkSettings = sink.Cast().Settings().Maybe(); + if (!sinkSettings) { + // External writes always use their own physical transaction. + effectsInfos.emplace_back(); + effectsInfos.back().Type = TEffectsInfo::EType::EXTERNAL_SINK; + effectsInfos.back().Exprs.push_back(expr.Ptr()); + } else { + // Two table sinks can't be executed in one physical transaction if they write into one table. + const TStringBuf tablePathId = sinkSettings.Cast().Table().PathId().Value(); + + auto it = std::find_if( + std::begin(effectsInfos), + std::end(effectsInfos), + [&tablePathId](const auto& effectsInfo) { + return effectsInfo.Type == TEffectsInfo::EType::KQP_SINK + && !effectsInfo.TablesPathIds.contains(tablePathId); + }); + if (it == std::end(effectsInfos)) { + effectsInfos.emplace_back(); + it = std::prev(std::end(effectsInfos)); + it->Type = TEffectsInfo::EType::KQP_SINK; + } + it->TablesPathIds.insert(tablePathId); + it->Exprs.push_back(expr.Ptr()); + } + } else { + // Table effects are executed all in one physical transaction. + auto it = std::find_if( + std::begin(effectsInfos), + std::end(effectsInfos), + [](const auto& effectsInfo) { return effectsInfo.Type == TEffectsInfo::EType::KQP_EFFECT; }); + if (it == std::end(effectsInfos)) { + effectsInfos.emplace_back(); + it = std::prev(std::end(effectsInfos)); + it->Type = TEffectsInfo::EType::KQP_EFFECT; + } + it->Exprs.push_back(expr.Ptr()); + } + } + + TVector results; + + for (const auto& effects : effectsInfos) { + auto builder = Build(ctx, list.Pos()); + for (const auto& expr : effects.Exprs) { + builder.Add(expr); + } + results.push_back(builder.Done()); + } + + return results; + } + + bool HasTableEffects(const TExprList& effectsList) const { + for (const TExprBase& effect : effectsList) { if (auto maybeSinkEffect = effect.Maybe()) { // (KqpSinkEffect (DqStage (... ((DqSink '0 (DataSink '"kikimr") ...)))) '0) auto sinkEffect = maybeSinkEffect.Cast(); @@ -608,7 +690,7 @@ class TKqpBuildTxsTransformer : public TSyncTransformerBase { return false; } - bool CheckEffectsTx(TKqpPhysicalTx tx, const TKqlQuery& query, TExprContext& ctx) const { + bool CheckEffectsTx(TKqpPhysicalTx tx, const TExprList& effectsList, TExprContext& ctx) const { TMaybeNode blackistedNode; VisitExpr(tx.Ptr(), [&blackistedNode](const TExprNode::TPtr& exprNode) { if (blackistedNode) { @@ -630,7 +712,7 @@ class TKqpBuildTxsTransformer : public TSyncTransformerBase { return true; }); - if (blackistedNode && HasTableEffects(query)) { + if (blackistedNode && HasTableEffects(effectsList)) { ctx.AddError(TIssue(ctx.GetPosition(blackistedNode.Cast().Pos()), TStringBuilder() << "Callable not expected in effects tx: " << blackistedNode.Cast().CallableName())); return false; diff --git a/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp index b839915d30eb..ff182ea2f6b7 100644 --- a/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp +++ b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp @@ -1224,8 +1224,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { auto db = kikimr->GetQueryClient(); auto resultFuture = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()); resultFuture.Wait(); - UNIT_ASSERT_C(!resultFuture.GetValueSync().IsSuccess(), resultFuture.GetValueSync().GetIssues().ToString()); - UNIT_ASSERT_STRING_CONTAINS(resultFuture.GetValueSync().GetIssues().ToString(), "Callable not expected in effects tx: Unwrap"); + UNIT_ASSERT_C(resultFuture.GetValueSync().IsSuccess(), resultFuture.GetValueSync().GetIssues().ToString()); } } diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp index cbae2cba2209..33a99973bb2d 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp @@ -3174,7 +3174,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { CompareYson(R"([[4u]])", FormatResultSetYson(result.GetResultSet(1))); } - /*{ + { auto result = client.ExecuteQuery(R"( DELETE FROM `/Root/ColumnShard` ON SELECT * FROM `/Root/DataShard` WHERE Col1 > 9; )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); @@ -3223,7 +3223,127 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { DELETE FROM `/Root/ColumnShard` WHERE Col2 = "not found"; )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - }*/ + } + } + + Y_UNIT_TEST_TWIN(TableSink_HtapComplex, withOltpSink) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true); + appConfig.MutableTableServiceConfig()->SetEnableOltpSink(withOltpSink); + appConfig.MutableTableServiceConfig()->SetEnableHtapTx(true); + auto settings = TKikimrSettings() + .SetAppConfig(appConfig) + .SetWithSampleTables(false); + TKikimrRunner kikimr(settings); + Tests::NCommon::TLoggerInit(kikimr).Initialize(); + + auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession(); + + const TString query = R"( + CREATE TABLE `/Root/ColumnSrc` ( + Col1 Uint64 NOT NULL, + Col2 String NOT NULL, + Col3 Int32 NOT NULL, + PRIMARY KEY (Col1) + ) + PARTITION BY HASH(Col1) + WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10); + + CREATE TABLE `/Root/RowSrc` ( + Col1 Uint64 NOT NULL, + Col2 String NOT NULL, + Col3 Int32 NOT NULL, + PRIMARY KEY (Col1) + ) + WITH (UNIFORM_PARTITIONS = 2, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 2); + + CREATE TABLE `/Root/ColumnDst` ( + Col1 Uint64 NOT NULL, + Col2 String, + Col3 Int32, + PRIMARY KEY (Col1) + ) + PARTITION BY HASH(Col1) + WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10); + + CREATE TABLE `/Root/RowDst` ( + Col1 Uint64 NOT NULL, + Col2 String, + Col3 Int32, + PRIMARY KEY (Col1) + ) + WITH (UNIFORM_PARTITIONS = 2, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 2); + )"; + + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + auto client = kikimr.GetQueryClient(); + + { + auto result = client.ExecuteQuery(R"( + UPSERT INTO `/Root/ColumnSrc` (Col1, Col2, Col3) VALUES + (1u, "test1", 10), (2u, "test2", 11); + REPLACE INTO `/Root/ColumnSrc` (Col1, Col2, Col3) VALUES + (3u, "test3", 12), (4u, "test", 13); + UPSERT INTO `/Root/RowSrc` (Col1, Col2, Col3) VALUES + (10u, "test1", 10), (20u, "test2", 11); + REPLACE INTO `/Root/RowSrc` (Col1, Col2, Col3) VALUES + (30u, "test3", 12), (40u, "test", 13); + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + + { + auto result = client.ExecuteQuery(R"( + $data = SELECT c.Col1 as Col1, c.Col2 As Col2, r.Col3 AS Col3 + FROM `/Root/ColumnSrc`as c + JOIN `/Root/RowSrc` as r + ON c.Col1 + 10 = r.Col3; + UPSERT INTO `/Root/ColumnDst` SELECT * FROM $data; + REPLACE INTO `/Root/RowDst` SELECT * FROM $data; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + + { + auto result = client.ExecuteQuery(R"( + SELECT COUNT(*) FROM `/Root/ColumnDst`; + SELECT COUNT(*) FROM `/Root/RowDst`; + DELETE FROM `/Root/ColumnDst` WHERE 1=1; + DELETE FROM `/Root/RowDst` WHERE 1=1; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + CompareYson(R"([[3u]])", FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([[3u]])", FormatResultSetYson(result.GetResultSet(1))); + } + + { + auto result = client.ExecuteQuery(R"( + $prepare = SELECT * + FROM `/Root/ColumnSrc` + WHERE Col2 LIKE '%test%test%'; + $data = SELECT c.Col1 as Col1, c.Col2 As Col2, r.Col3 AS Col3 + FROM `/Root/RowSrc`as c + LEFT OUTER JOIN $prepare as r + ON c.Col1 + 10 = r.Col3; + UPSERT INTO `/Root/ColumnDst` SELECT * FROM $data; + REPLACE INTO `/Root/RowDst` SELECT * FROM $data; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + + { + auto result = client.ExecuteQuery(R"( + SELECT COUNT(*) FROM `/Root/ColumnDst`; + SELECT COUNT(*) FROM `/Root/RowDst`; + DELETE FROM `/Root/ColumnDst` WHERE 1=1; + DELETE FROM `/Root/RowDst` WHERE 1=1; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + CompareYson(R"([[4u]])", FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([[4u]])", FormatResultSetYson(result.GetResultSet(1))); + } } Y_UNIT_TEST_TWIN(TableSink_HtapInteractive, withOltpSink) {