From c171453b548426afb31427b52a6606c2bdc5126d Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Tue, 17 Sep 2024 10:18:47 +0300 Subject: [PATCH 1/2] fix --- .../kqp/executer_actor/kqp_data_executer.cpp | 4 +- ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp | 120 +++++++++++++++++- 2 files changed, 120 insertions(+), 4 deletions(-) diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 74d0bfda9b68..51602ffddcd5 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -2564,8 +2564,8 @@ class TKqpDataExecuter : public TKqpExecuterBase 9; )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); @@ -3223,7 +3223,123 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { DELETE FROM `/Root/ColumnShard` WHERE Col2 = "not found"; )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - }*/ + } + } + + Y_UNIT_TEST_TWIN(TableSink_HtapComplex, withOltpSink) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true); + appConfig.MutableTableServiceConfig()->SetEnableOltpSink(withOltpSink); + appConfig.MutableTableServiceConfig()->SetEnableHtapTx(true); + auto settings = TKikimrSettings() + .SetAppConfig(appConfig) + .SetWithSampleTables(false); + TKikimrRunner kikimr(settings); + Tests::NCommon::TLoggerInit(kikimr).Initialize(); + + auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession(); + + const TString query = R"( + CREATE TABLE `/Root/ColumnSrc` ( + Col1 Uint64 NOT NULL, + Col2 String NOT NULL, + Col3 Int32 NOT NULL, + PRIMARY KEY (Col1) + ) + PARTITION BY HASH(Col1) + WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10); + + CREATE TABLE `/Root/RowSrc` ( + Col1 Uint64 NOT NULL, + Col2 String NOT NULL, + Col3 Int32 NOT NULL, + PRIMARY KEY (Col1) + ) + WITH (UNIFORM_PARTITIONS = 2, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 2); + + CREATE TABLE `/Root/ColumnDst` ( + Col1 Uint64 NOT NULL, + Col2 String, + Col3 Int32, + PRIMARY KEY (Col1) + ) + PARTITION BY HASH(Col1) + WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10); + + CREATE TABLE `/Root/RowDst` ( + Col1 Uint64 NOT NULL, + Col2 String, + Col3 Int32, + PRIMARY KEY (Col1) + ) + WITH (UNIFORM_PARTITIONS = 2, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 2); + )"; + + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + auto client = kikimr.GetQueryClient(); + + { + auto result = client.ExecuteQuery(R"( + UPSERT INTO `/Root/ColumnSrc` (Col1, Col2, Col3) VALUES + (1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, "test", 13); + UPSERT INTO `/Root/RowSrc` (Col1, Col2, Col3) VALUES + (10u, "test1", 10), (20u, "test2", 11), (30u, "test3", 12), (40u, "test", 13); + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + + { + auto result = client.ExecuteQuery(R"( + $data = SELECT c.Col1 as Col1, c.Col2 As Col2, r.Col3 AS Col3 + FROM `/Root/ColumnSrc`as c + JOIN `/Root/RowSrc` as r + ON c.Col1 + 10 = r.Col3; + UPSERT INTO `/Root/ColumnDst` SELECT * FROM $data; + REPLACE INTO `/Root/RowDst` SELECT * FROM $data; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + + { + auto result = client.ExecuteQuery(R"( + SELECT COUNT(*) FROM `/Root/ColumnDst`; + SELECT COUNT(*) FROM `/Root/RowDst`; + DELETE FROM `/Root/ColumnDst` WHERE 1=1; + DELETE FROM `/Root/RowDst` WHERE 1=1; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + CompareYson(R"([[3u]])", FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([[3u]])", FormatResultSetYson(result.GetResultSet(1))); + } + + { + auto result = client.ExecuteQuery(R"( + $prepare = SELECT * + FROM `/Root/ColumnSrc` + WHERE Col2 LIKE 'test?'; + $data = SELECT c.Col1 as Col1, c.Col2 As Col2, r.Col3 AS Col3 + FROM `/Root/RowSrc`as c + LEFT OUTER JOIN $prepare as r + ON c.Col1 + 10 = r.Col3; + UPSERT INTO `/Root/ColumnDst` SELECT * FROM $data; + REPLACE INTO `/Root/RowDst` SELECT * FROM $data; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + + { + auto result = client.ExecuteQuery(R"( + SELECT COUNT(*) FROM `/Root/ColumnDst`; + SELECT COUNT(*) FROM `/Root/RowDst`; + DELETE FROM `/Root/ColumnDst` WHERE 1=1; + DELETE FROM `/Root/RowDst` WHERE 1=1; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + CompareYson(R"([[4u]])", FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([[4u]])", FormatResultSetYson(result.GetResultSet(1))); + } } Y_UNIT_TEST_TWIN(TableSink_HtapInteractive, withOltpSink) { From 507c93184c3b9467ceed792713c00635978c89d3 Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Tue, 17 Sep 2024 17:48:29 +0300 Subject: [PATCH 2/2] fix --- .../kqp/executer_actor/kqp_data_executer.cpp | 4 +- ydb/core/kqp/opt/kqp_opt_build_txs.cpp | 98 +++++++++++++++++-- .../kqp/session_actor/kqp_session_actor.cpp | 2 + ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp | 8 +- 4 files changed, 100 insertions(+), 12 deletions(-) diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 51602ffddcd5..74d0bfda9b68 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -2564,8 +2564,8 @@ class TKqpDataExecuter : public TKqpExecuterBasePhysicalTxs.emplace_back(tx.Cast()); + if (!CheckEffectsTx(tx.Cast(), query, ctx)) { + return TStatus::Error; + } + + BuildCtx->PhysicalTxs.emplace_back(tx.Cast()); + } } return TStatus::Ok; @@ -581,6 +585,84 @@ class TKqpBuildTxsTransformer : public TSyncTransformerBase { } private: + TVector CollectEffects(const TExprList& list, TExprContext& ctx) { + struct TEffectsInfo { + enum class EType { + KQP_EFFECT, + KQP_SINK, + EXTERNAL_SINK, + }; + + EType Type; + THashSet TablesPathIds; + TVector Exprs; + }; + TVector effectsInfos; + + for (const auto& expr : list) { + if (auto sinkEffect = expr.Maybe()) { + const size_t sinkIndex = FromString(TStringBuf(sinkEffect.Cast().SinkIndex())); + const auto stage = sinkEffect.Cast().Stage().Maybe(); + YQL_ENSURE(stage); + YQL_ENSURE(stage.Cast().Outputs()); + const auto outputs = stage.Cast().Outputs().Cast(); + YQL_ENSURE(sinkIndex < outputs.Size()); + const auto sink = outputs.Item(sinkIndex).Maybe(); + YQL_ENSURE(sink); + + const auto sinkSettings = sink.Cast().Settings().Maybe(); + if (!sinkSettings) { + // External writes always use their own physical transaction. + effectsInfos.emplace_back(); + effectsInfos.back().Type = TEffectsInfo::EType::EXTERNAL_SINK; + effectsInfos.back().Exprs.push_back(expr.Ptr()); + } else { + // Two table sinks can't be executed in one physical transaction if they write into one table. + const TStringBuf tablePathId = sinkSettings.Cast().Table().PathId().Value(); + + auto it = std::find_if( + std::begin(effectsInfos), + std::end(effectsInfos), + [&tablePathId](const auto& effectsInfo) { + return effectsInfo.Type == TEffectsInfo::EType::KQP_SINK + && !effectsInfo.TablesPathIds.contains(tablePathId); + }); + if (it == std::end(effectsInfos)) { + effectsInfos.emplace_back(); + it = std::prev(std::end(effectsInfos)); + it->Type = TEffectsInfo::EType::KQP_SINK; + } + it->TablesPathIds.insert(tablePathId); + it->Exprs.push_back(expr.Ptr()); + } + } else { + // Table effects are executed all in one physical transaction. + auto it = std::find_if( + std::begin(effectsInfos), + std::end(effectsInfos), + [](const auto& effectsInfo) { return effectsInfo.Type == TEffectsInfo::EType::KQP_EFFECT; }); + if (it == std::end(effectsInfos)) { + effectsInfos.emplace_back(); + it = std::prev(std::end(effectsInfos)); + it->Type = TEffectsInfo::EType::KQP_EFFECT; + } + it->Exprs.push_back(expr.Ptr()); + } + } + + TVector results; + + for (const auto& effects : effectsInfos) { + auto builder = Build(ctx, list.Pos()); + for (const auto& expr : effects.Exprs) { + builder.Add(expr); + } + results.push_back(builder.Done()); + } + + return results; + } + bool HasTableEffects(const TKqlQuery& query) const { for (const TExprBase& effect : query.Effects()) { if (auto maybeSinkEffect = effect.Maybe()) { diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 714f01bb3930..00f96a6155ef 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -664,6 +664,8 @@ class TKqpSessionActor : public TActorBootstrapped { return; } + Cerr << "COMPILED " << QueryState->CompileResult->PreparedQuery->GetPhysicalQuery().GetQueryAst() << Endl; + Become(&TKqpSessionActor::ExecuteState); QueryState->TxCtx->OnBeginQuery(); diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp index e4c69d31b48e..f54d6dbe3b07 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp @@ -3283,9 +3283,13 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { { auto result = client.ExecuteQuery(R"( UPSERT INTO `/Root/ColumnSrc` (Col1, Col2, Col3) VALUES - (1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, "test", 13); + (1u, "test1", 10), (2u, "test2", 11); + REPLACE INTO `/Root/ColumnSrc` (Col1, Col2, Col3) VALUES + (3u, "test3", 12), (4u, "test", 13); UPSERT INTO `/Root/RowSrc` (Col1, Col2, Col3) VALUES - (10u, "test1", 10), (20u, "test2", 11), (30u, "test3", 12), (40u, "test", 13); + (10u, "test1", 10), (20u, "test2", 11); + REPLACE INTO `/Root/RowSrc` (Col1, Col2, Col3) VALUES + (30u, "test3", 12), (40u, "test", 13); )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); }