From e6d795feedd3fd72c7d5e43bb3f32188bbbbbafc Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Mon, 16 Sep 2024 12:51:05 +0300 Subject: [PATCH] HTAP: ProposeTx + EvWrite (#9236) --- ydb/core/kqp/common/kqp_tx.h | 4 +- .../kqp/executer_actor/kqp_data_executer.cpp | 52 ++-- ydb/core/kqp/executer_actor/kqp_executer.h | 5 +- .../kqp/executer_actor/kqp_executer_impl.cpp | 17 +- .../kqp/executer_actor/kqp_executer_impl.h | 5 +- ydb/core/kqp/session_actor/kqp_query_state.h | 5 + .../kqp/session_actor/kqp_session_actor.cpp | 21 +- ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp | 236 +++++++++++++++++- ydb/core/protos/table_service_config.proto | 2 +- 9 files changed, 282 insertions(+), 65 deletions(-) diff --git a/ydb/core/kqp/common/kqp_tx.h b/ydb/core/kqp/common/kqp_tx.h index 23dc0069576b..b3aa2106954c 100644 --- a/ydb/core/kqp/common/kqp_tx.h +++ b/ydb/core/kqp/common/kqp_tx.h @@ -268,7 +268,7 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase { } bool ShouldExecuteDeferredEffects() const { - if (HasUncommittedChangesRead) { + if (HasUncommittedChangesRead || HasOlapTable) { return !DeferredEffects.Empty(); } @@ -297,7 +297,7 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase { } bool CanDeferEffects() const { - if (HasUncommittedChangesRead || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) { + if (HasUncommittedChangesRead || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution() || HasOlapTable) { return false; } diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index f129d4adfb3a..33aa5f2966f8 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -127,16 +127,16 @@ class TKqpDataExecuter : public TKqpExecuterBase& userRequestContext, - const bool useEvWrite, ui32 statementResultIndex, const std::optional& federatedQuerySetup, - const TGUCSettings::TPtr& GUCSettings, const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx) + ui32 statementResultIndex, const std::optional& federatedQuerySetup, + const TGUCSettings::TPtr& GUCSettings, const TShardIdToTableInfoPtr& shardIdToTableInfo) : TBase(std::move(request), database, userToken, counters, tableServiceConfig, userRequestContext, statementResultIndex, TWilsonKqp::DataExecuter, "DataExecuter", streamResult) , AsyncIoFactory(std::move(asyncIoFactory)) - , UseEvWrite(useEvWrite) + , UseEvWriteForOltp(tableServiceConfig.GetEnableOltpSink()) + , HtapTx(tableServiceConfig.GetEnableHtapTx()) , FederatedQuerySetup(federatedQuerySetup) , GUCSettings(GUCSettings) , ShardIdToTableInfo(shardIdToTableInfo) - , HtapTx(htapTx) { Target = creator; @@ -1487,7 +1487,7 @@ class TKqpDataExecuter : public TKqpExecuterBase TTask& { - YQL_ENSURE(!UseEvWrite); + YQL_ENSURE(!UseEvWriteForOltp); auto it = shardTasks.find(shardId); if (it != shardTasks.end()) { return TasksGraph.GetTask(it->second); @@ -1627,7 +1627,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseGet(shardId).IsOlap) { if (auto it = evWriteTxs.find(shardId); it != evWriteTxs.end()) { locks = it->second->MutableLocks(); } else { @@ -2333,7 +2332,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseMutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit); - *shardTx->MutableLocks()->MutableSendingShards() = sendingShards; - *shardTx->MutableLocks()->MutableReceivingShards() = receivingShards; - if (arbiter) { - shardTx->MutableLocks()->SetArbiterShard(arbiter); + if (columnShardArbiter) { + shardTx->MutableLocks()->AddSendingShards(*columnShardArbiter); + shardTx->MutableLocks()->AddReceivingShards(*columnShardArbiter); + if (sendingShardsSet.contains(shardId)) { + shardTx->MutableLocks()->AddSendingShards(shardId); + } + if (receivingShardsSet.contains(shardId)) { + shardTx->MutableLocks()->AddReceivingShards(shardId); + } + AFL_ENSURE(!arbiter); + } else { + *shardTx->MutableLocks()->MutableSendingShards() = sendingShards; + *shardTx->MutableLocks()->MutableReceivingShards() = receivingShards; + if (arbiter) { + shardTx->MutableLocks()->SetArbiterShard(arbiter); + } } } @@ -2844,11 +2856,11 @@ class TKqpDataExecuter : public TKqpExecuterBase FederatedQuerySetup; const TGUCSettings::TPtr GUCSettings; TShardIdToTableInfoPtr ShardIdToTableInfo; - const bool HtapTx = false; bool HasExternalSources = false; bool SecretSnapshotRequired = false; @@ -2890,13 +2902,13 @@ class TKqpDataExecuter : public TKqpExecuterBase& userToken, TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig& tableServiceConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const TActorId& creator, - const TIntrusivePtr& userRequestContext, const bool useEvWrite, ui32 statementResultIndex, + const TIntrusivePtr& userRequestContext, ui32 statementResultIndex, const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, - const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx) + const TShardIdToTableInfoPtr& shardIdToTableInfo) { return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, tableServiceConfig, std::move(asyncIoFactory), creator, userRequestContext, - useEvWrite, statementResultIndex, federatedQuerySetup, GUCSettings, shardIdToTableInfo, htapTx); + statementResultIndex, federatedQuerySetup, GUCSettings, shardIdToTableInfo); } } // namespace NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h index abd951df2818..3a841ebf01f3 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer.h +++ b/ydb/core/kqp/executer_actor/kqp_executer.h @@ -95,10 +95,9 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt const TIntrusiveConstPtr& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig tableServiceConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery, const TActorId& creator, - const TIntrusivePtr& userRequestContext, - const bool useEvWrite, ui32 statementResultIndex, + const TIntrusivePtr& userRequestContext, ui32 statementResultIndex, const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, - const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx); + const TShardIdToTableInfoPtr& shardIdToTableInfo); IActor* CreateKqpSchemeExecuter( TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target, diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp index be86fbf79e59..888e81998bdd 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp @@ -80,18 +80,17 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt const TIntrusiveConstPtr& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig tableServiceConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery, const TActorId& creator, - const TIntrusivePtr& userRequestContext, - const bool useEvWrite, ui32 statementResultIndex, + const TIntrusivePtr& userRequestContext, ui32 statementResultIndex, const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, - const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx) + const TShardIdToTableInfoPtr& shardIdToTableInfo) { if (request.Transactions.empty()) { // commit-only or rollback-only data transaction return CreateKqpDataExecuter( std::move(request), database, userToken, counters, false, tableServiceConfig, std::move(asyncIoFactory), creator, - userRequestContext, useEvWrite, statementResultIndex, - federatedQuerySetup, /*GUCSettings*/nullptr, shardIdToTableInfo, htapTx + userRequestContext, statementResultIndex, + federatedQuerySetup, /*GUCSettings*/nullptr, shardIdToTableInfo ); } @@ -113,8 +112,8 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt return CreateKqpDataExecuter( std::move(request), database, userToken, counters, false, tableServiceConfig, std::move(asyncIoFactory), creator, - userRequestContext, useEvWrite, statementResultIndex, - federatedQuerySetup, /*GUCSettings*/nullptr, shardIdToTableInfo, htapTx + userRequestContext, statementResultIndex, + federatedQuerySetup, /*GUCSettings*/nullptr, shardIdToTableInfo ); case NKqpProto::TKqpPhyTx::TYPE_SCAN: @@ -128,8 +127,8 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt return CreateKqpDataExecuter( std::move(request), database, userToken, counters, true, tableServiceConfig, std::move(asyncIoFactory), creator, - userRequestContext, useEvWrite, statementResultIndex, - federatedQuerySetup, GUCSettings, shardIdToTableInfo, htapTx + userRequestContext, statementResultIndex, + federatedQuerySetup, GUCSettings, shardIdToTableInfo ); default: diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index fc6a84c3f659..e320138f19e0 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -2031,10 +2031,9 @@ IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const const TIntrusiveConstPtr& userToken, TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig& tableServiceConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const TActorId& creator, - const TIntrusivePtr& userRequestContext, - const bool useEvWrite, ui32 statementResultIndex, + const TIntrusivePtr& userRequestContext, ui32 statementResultIndex, const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, - const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx); + const TShardIdToTableInfoPtr& shardIdToTableInfo); IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr& userToken, TKqpRequestCounters::TPtr counters, diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index 0a614cf8c7e0..7a6befd64f0e 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -349,6 +349,11 @@ class TKqpQueryState : public TNonCopyable { return false; } + if (TxCtx->HasOlapTable) { + // HTAP/OLAP transactions always use separate commit. + return false; + } + if (TxCtx->HasUncommittedChangesRead || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) { if (tx && tx->GetHasEffects()) { YQL_ENSURE(tx->ResultsSize() == 0); diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index ed254470e2d4..714f01bb3930 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -1233,7 +1233,7 @@ class TKqpSessionActor : public TActorBootstrapped { } else if (QueryState->ShouldAcquireLocks(tx) && (!txCtx.HasOlapTable || Settings.TableService.GetEnableOlapSink())) { request.AcquireLocksTxId = txCtx.Locks.GetLockTxId(); - if (txCtx.HasUncommittedChangesRead || Config->FeatureFlags.GetEnableForceImmediateEffectsExecution()) { + if (txCtx.HasUncommittedChangesRead || Config->FeatureFlags.GetEnableForceImmediateEffectsExecution() || txCtx.HasOlapTable) { request.UseImmediateEffects = true; } } @@ -1292,29 +1292,12 @@ class TKqpSessionActor : public TActorBootstrapped { request.ResourceManager_ = ResourceManager_; LOG_D("Sending to Executer TraceId: " << request.TraceId.GetTraceId() << " " << request.TraceId.GetSpanIdSize()); - bool useEvWrite = ( - (txCtx->HasOlapTable // olap only - && !txCtx->HasOltpTable - && Settings.TableService.GetEnableOlapSink()) - || (txCtx->HasOltpTable // oltp only - && !txCtx->HasOlapTable - && Settings.TableService.GetEnableOltpSink()) - || (txCtx->HasOlapTable // htap - && txCtx->HasOltpTable - && Settings.TableService.GetEnableOlapSink() - && Settings.TableService.GetEnableHtapTx())) - && (request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_UNDEFINED - || request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_QUERY - || request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY - || (!txCtx->HasOlapTable && request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_DML) - || (!txCtx->HasOlapTable && request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_PREPARED_DML)); auto executerActor = CreateKqpExecuter(std::move(request), Settings.Database, QueryState ? QueryState->UserToken : TIntrusiveConstPtr(), RequestCounters, Settings.TableService, AsyncIoFactory, QueryState ? QueryState->PreparedQuery : nullptr, SelfId(), QueryState ? QueryState->UserRequestContext : MakeIntrusive("", Settings.Database, SessionId), - useEvWrite, QueryState ? QueryState->StatementResultIndex : 0, FederatedQuerySetup, GUCSettings, txCtx->ShardIdToTableInfo, - Settings.TableService.GetEnableHtapTx()); + QueryState ? QueryState->StatementResultIndex : 0, FederatedQuerySetup, GUCSettings, txCtx->ShardIdToTableInfo); auto exId = RegisterWithSameMailbox(executerActor); LOG_D("Created new KQP executer: " << exId << " isRollback: " << isRollback); diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp index 0af07b0f5a2c..6ab1792f11a8 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp @@ -3037,10 +3037,10 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { } } - Y_UNIT_TEST(TableSink_Htap) { + Y_UNIT_TEST_TWIN(TableSink_Htap, withOltpSink) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true); - appConfig.MutableTableServiceConfig()->SetEnableOltpSink(true); + appConfig.MutableTableServiceConfig()->SetEnableOltpSink(withOltpSink); appConfig.MutableTableServiceConfig()->SetEnableHtapTx(true); auto settings = TKikimrSettings() .SetAppConfig(appConfig) @@ -3053,7 +3053,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { const TString query = R"( CREATE TABLE `/Root/ColumnShard` ( Col1 Uint64 NOT NULL, - Col2 String, + Col2 String NOT NULL, Col3 Int32 NOT NULL, PRIMARY KEY (Col1) ) @@ -3062,7 +3062,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { CREATE TABLE `/Root/DataShard` ( Col1 Uint64 NOT NULL, - Col2 String, + Col2 String NOT NULL, Col3 Int32 NOT NULL, PRIMARY KEY (Col1) ) @@ -3073,18 +3073,238 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); auto client = kikimr.GetQueryClient(); + { - auto prepareResult = client.ExecuteQuery(R"( + auto result = client.ExecuteQuery(R"( UPSERT INTO `/Root/ColumnShard` (Col1, Col2, Col3) VALUES - (1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, NULL, 13); + (1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, "test", 13); UPSERT INTO `/Root/DataShard` (Col1, Col2, Col3) VALUES - (10u, "test1", 10), (20u, "test2", 11), (30u, "test3", 12), (40u, NULL, 13); + (10u, "test1", 10), (20u, "test2", 11), (30u, "test3", 12), (40u, "test", 13); INSERT INTO `/Root/ColumnShard` SELECT * FROM `/Root/DataShard`; REPLACE INTO `/Root/DataShard` SELECT * FROM `/Root/ColumnShard`; + SELECT * FROM `/Root/ColumnShard` ORDER BY Col1; + SELECT * FROM `/Root/DataShard` ORDER BY Col1; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + CompareYson(R"([[1u;"test1";10];[2u;"test2";11];[3u;"test3";12];[4u;"test";13];[10u;"test1";10];[20u;"test2";11];[30u;"test3";12];[40u;"test";13]])", FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([[1u;"test1";10];[2u;"test2";11];[3u;"test3";12];[4u;"test";13];[10u;"test1";10];[20u;"test2";11];[30u;"test3";12];[40u;"test";13]])", FormatResultSetYson(result.GetResultSet(1))); + } + + { + auto result = client.ExecuteQuery(R"( + UPSERT INTO `/Root/DataShard` (Col1, Col2, Col3) VALUES (1u, "test", 0); + UPSERT INTO `/Root/DataShard` (Col1, Col2, Col3) VALUES (2u, "test", 0); + UPSERT INTO `/Root/ColumnShard` (Col1, Col2, Col3) VALUES (1u, "test", 0); + UPSERT INTO `/Root/ColumnShard` (Col1, Col2, Col3) VALUES (2u, "test", 0); + UPSERT INTO `/Root/DataShard` (Col1, Col2, Col3) VALUES (3u, "test", 0); + UPSERT INTO `/Root/DataShard` (Col1, Col2, Col3) VALUES (4u, "test", 0); + UPSERT INTO `/Root/ColumnShard` (Col1, Col2, Col3) VALUES (3u, "test", 0); + UPSERT INTO `/Root/ColumnShard` (Col1, Col2, Col3) VALUES (4u, "test", 0); + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + + { + auto result = client.ExecuteQuery(R"( + SELECT COUNT(*) FROM `/Root/DataShard` WHERE Col3 = 0; + SELECT COUNT(*) FROM `/Root/ColumnShard` WHERE Col3 = 0; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + CompareYson(R"([[4u]])", FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([[4u]])", FormatResultSetYson(result.GetResultSet(1))); + } + + { + auto result = client.ExecuteQuery(R"( + UPSERT INTO `/Root/ColumnShard` (Col1, Col2, Col3) VALUES (1u, "test", 1); + UPSERT INTO `/Root/ColumnShard` (Col1, Col2, Col3) VALUES (2u, "test", 1); + UPSERT INTO `/Root/DataShard` (Col1, Col2, Col3) VALUES (1u, "test", 1); + UPSERT INTO `/Root/DataShard` (Col1, Col2, Col3) VALUES (2u, "test", 1); + UPSERT INTO `/Root/ColumnShard` (Col1, Col2, Col3) VALUES (3u, "test", 1); + UPSERT INTO `/Root/ColumnShard` (Col1, Col2, Col3) VALUES (4u, "test", 1); + UPSERT INTO `/Root/DataShard` (Col1, Col2, Col3) VALUES (3u, "test", 1); + UPSERT INTO `/Root/DataShard` (Col1, Col2, Col3) VALUES (4u, "test", 1); + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + + { + auto result = client.ExecuteQuery(R"( + SELECT COUNT(*) FROM `/Root/DataShard` WHERE Col3 = 1; + SELECT COUNT(*) FROM `/Root/ColumnShard` WHERE Col3 = 1; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + CompareYson(R"([[4u]])", FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([[4u]])", FormatResultSetYson(result.GetResultSet(1))); + } + + { + auto result = client.ExecuteQuery(R"( + INSERT INTO `/Root/DataShard` SELECT Col1 + 100 AS Col1, Col2, Col3 + 100 AS Col3 FROM `/Root/ColumnShard` WHERE Col3 = 1; + INSERT INTO `/Root/ColumnShard` SELECT Col1 + 100 AS Col1, Col2, Col3 + 100 AS Col3 FROM `/Root/DataShard` WHERE Col3 = 1; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + + { + auto result = client.ExecuteQuery(R"( + SELECT COUNT(*) FROM `/Root/DataShard` WHERE Col3 = 101; + SELECT COUNT(*) FROM `/Root/ColumnShard` WHERE Col3 = 101; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + CompareYson(R"([[4u]])", FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([[4u]])", FormatResultSetYson(result.GetResultSet(1))); + } + + { + auto result = client.ExecuteQuery(R"( + INSERT INTO `/Root/ColumnShard` SELECT Col1 + 1000 AS Col1, Col2, Col3 + 1000 AS Col3 FROM `/Root/DataShard` WHERE Col3 = 1; + INSERT INTO `/Root/DataShard` SELECT Col1 + 1000 AS Col1, Col2, Col3 + 1000 AS Col3 FROM `/Root/ColumnShard` WHERE Col3 = 1; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + + { + auto result = client.ExecuteQuery(R"( + SELECT COUNT(*) FROM `/Root/DataShard` WHERE Col3 = 1001; + SELECT COUNT(*) FROM `/Root/ColumnShard` WHERE Col3 = 1001; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + CompareYson(R"([[4u]])", FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([[4u]])", FormatResultSetYson(result.GetResultSet(1))); + } + + /*{ + auto result = client.ExecuteQuery(R"( + DELETE FROM `/Root/ColumnShard` ON SELECT * FROM `/Root/DataShard` WHERE Col1 > 9; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + + { + auto result = client.ExecuteQuery(R"( + DELETE FROM `/Root/DataShard` ON SELECT Col1 FROM `/Root/ColumnShard`; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + + { + auto result = client.ExecuteQuery(R"( + SELECT * FROM `/Root/DataShard`; SELECT * FROM `/Root/ColumnShard`; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(1))); + } + + { + auto result = client.ExecuteQuery(R"( + DELETE FROM `/Root/DataShard` WHERE Col2 != "not found"; + DELETE FROM `/Root/ColumnShard` WHERE Col2 != "not found"; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + + { + auto result = client.ExecuteQuery(R"( SELECT * FROM `/Root/DataShard`; + SELECT * FROM `/Root/ColumnShard`; )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); - UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString()); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(1))); + } + + { + auto result = client.ExecuteQuery(R"( + DELETE FROM `/Root/DataShard` WHERE Col2 = "not found"; + DELETE FROM `/Root/ColumnShard` WHERE Col2 = "not found"; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + }*/ + } + + Y_UNIT_TEST_TWIN(TableSink_HtapInteractive, withOltpSink) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true); + appConfig.MutableTableServiceConfig()->SetEnableOltpSink(withOltpSink); + appConfig.MutableTableServiceConfig()->SetEnableHtapTx(true); + auto settings = TKikimrSettings() + .SetAppConfig(appConfig) + .SetWithSampleTables(false); + TKikimrRunner kikimr(settings); + Tests::NCommon::TLoggerInit(kikimr).Initialize(); + + auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession(); + + const TString query = R"( + CREATE TABLE `/Root/ColumnShard` ( + Col1 Uint64 NOT NULL, + Col2 String NOT NULL, + Col3 Int32 NOT NULL, + PRIMARY KEY (Col1) + ) + PARTITION BY HASH(Col1) + WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10); + + CREATE TABLE `/Root/DataShard` ( + Col1 Uint64 NOT NULL, + Col2 String NOT NULL, + Col3 Int32 NOT NULL, + PRIMARY KEY (Col1) + ) + WITH (UNIFORM_PARTITIONS = 2, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 2); + )"; + + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + auto client = kikimr.GetQueryClient(); + + { + auto session = client.GetSession().GetValueSync().GetSession(); + auto result = session.ExecuteQuery(R"( + INSERT INTO `/Root/ColumnShard` (Col1, Col2, Col3) VALUES + (1u, "test1", 1); + )", NYdb::NQuery::TTxControl::BeginTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + + auto tx = result.GetTransaction(); + + result = session.ExecuteQuery(R"( + INSERT INTO `/Root/DataShard` (Col1, Col2, Col3) VALUES + (2u, "test2", 2); + )", NYdb::NQuery::TTxControl::Tx(tx->GetId()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + + { + auto session = client.GetSession().GetValueSync().GetSession(); + auto result = session.ExecuteQuery(R"( + INSERT INTO `/Root/DataShard` (Col1, Col2, Col3) VALUES + (3u, "test1", 3); + )", NYdb::NQuery::TTxControl::BeginTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + + auto tx = result.GetTransaction(); + + result = session.ExecuteQuery(R"( + INSERT INTO `/Root/ColumnShard` (Col1, Col2, Col3) VALUES + (4u, "test2", 4); + )", NYdb::NQuery::TTxControl::Tx(tx->GetId()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + + { + auto result = client.ExecuteQuery(R"( + SELECT Col3 FROM `/Root/ColumnShard` + UNION + SELECT Col3 FROM `/Root/DataShard` + ORDER BY Col3; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + CompareYson(R"([[1];[2];[3];[4]])", FormatResultSetYson(result.GetResultSet(0))); } } diff --git a/ydb/core/protos/table_service_config.proto b/ydb/core/protos/table_service_config.proto index af1c763b6096..efaffd7a6e58 100644 --- a/ydb/core/protos/table_service_config.proto +++ b/ydb/core/protos/table_service_config.proto @@ -322,5 +322,5 @@ message TTableServiceConfig { optional bool EnableRowsDuplicationCheck = 69 [ default = false ]; - optional bool EnableHtapTx = 71 [default = false]; + optional bool EnableHtapTx = 71 [default = true]; };