From 485265ccb3ec46b73ba0dd40d2404bd86ecbd6f5 Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Thu, 29 Aug 2024 12:41:28 +0300 Subject: [PATCH] Sink locks tests + some locks fixes (#8420) --- .github/config/muted_ya.txt | 1 + .../kqp/executer_actor/kqp_data_executer.cpp | 37 +- ydb/core/kqp/runtime/kqp_write_actor.cpp | 8 +- ydb/core/kqp/ut/tx/kqp_locks_tricky_ut.cpp | 12 +- ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp | 40 +- ydb/core/kqp/ut/tx/kqp_sink_common.h | 109 ++++++ ydb/core/kqp/ut/tx/kqp_sink_locks_ut.cpp | 300 +++++++++++++++ ydb/core/kqp/ut/tx/kqp_sink_mvcc_ut.cpp | 274 ++++++++++++++ ydb/core/kqp/ut/tx/kqp_sink_tx_ut.cpp | 349 ++++++++++++++++++ ydb/core/kqp/ut/tx/ya.make | 4 + 10 files changed, 1116 insertions(+), 18 deletions(-) create mode 100644 ydb/core/kqp/ut/tx/kqp_sink_common.h create mode 100644 ydb/core/kqp/ut/tx/kqp_sink_locks_ut.cpp create mode 100644 ydb/core/kqp/ut/tx/kqp_sink_mvcc_ut.cpp create mode 100644 ydb/core/kqp/ut/tx/kqp_sink_tx_ut.cpp diff --git a/.github/config/muted_ya.txt b/.github/config/muted_ya.txt index a3c51ad0abee..e0d9aba7ff9d 100644 --- a/.github/config/muted_ya.txt +++ b/.github/config/muted_ya.txt @@ -24,6 +24,7 @@ ydb/core/kqp/ut/service [*/*]* ydb/core/kqp/ut/service KqpQueryService.ExecuteQueryPgTableSelect ydb/core/kqp/ut/service KqpQueryService.QueryOnClosedSession ydb/core/kqp/ut/service KqpService.CloseSessionsWithLoad +ydb/core/kqp/ut/tx KqpSnapshotRead.ReadOnlyTxWithIndexCommitsOnConcurrentWrite+withSink ydb/core/persqueue/ut [*/*]* ydb/core/persqueue/ut TPQTest.*DirectRead* ydb/core/persqueue/ut/ut_with_sdk [*/*]* diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 2f97f0479556..d479dbb367df 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -1151,6 +1151,36 @@ class TKqpDataExecuter : public TKqpExecuterBaseTxProxyMon->ResultsReceivedCount->Inc(); Counters->TxProxyMon->TxResultComplete->Inc(); + CheckExecutionComplete(); + return; + } + case NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN: { + LOG_D("Broken locks: " << res->Record.DebugString()); + YQL_ENSURE(shardState->State == TShardState::EState::Executing); + shardState->State = TShardState::EState::Finished; + Counters->TxProxyMon->TxResultAborted->Inc(); + LocksBroken = true; + + TMaybe tableName; + if (!res->Record.GetTxLocks().empty()) { + auto& lock = res->Record.GetTxLocks(0); + auto tableId = TTableId(lock.GetSchemeShard(), lock.GetPathId()); + auto it = FindIf(TasksGraph.GetStagesInfo(), [tableId](const auto& x){ return x.second.Meta.TableId.HasSamePath(tableId); }); + if (it != TasksGraph.GetStagesInfo().end()) { + tableName = it->second.Meta.TableConstInfo->Path; + } + } + + // Reply as soon as we know which table had locks invalidated + if (tableName) { + auto message = TStringBuilder() + << "Transaction locks invalidated. Table: " << *tableName; + + return ReplyErrorAndDie(Ydb::StatusIds::ABORTED, + YqlIssue({}, TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message)); + } + + CheckExecutionComplete(); return; } @@ -1673,18 +1703,15 @@ class TKqpDataExecuter : public TKqpExecuterBase(); evWriteTransaction->Record = evWrite; - evWriteTransaction->Record.SetTxMode(NKikimrDataEvents::TEvWrite::MODE_PREPARE); + evWriteTransaction->Record.SetTxMode(ImmediateTx ? NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE : NKikimrDataEvents::TEvWrite::MODE_PREPARE); evWriteTransaction->Record.SetTxId(TxId); - evWriteTransaction->Record.MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit); - auto locksCount = evWriteTransaction->Record.GetLocks().LocksSize(); shardState.DatashardState->ShardReadLocks = locksCount > 0; diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index 3d305c363694..eb43a02a2bb8 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -525,7 +525,13 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu OnMessageAcknowledged(ev->Get()->Record.GetOrigin(), ev->Cookie); for (const auto& lock : ev->Get()->Record.GetTxLocks()) { - LocksInfo[ev->Get()->Record.GetOrigin()].AddAndCheckLock(lock); + if (!LocksInfo[ev->Get()->Record.GetOrigin()].AddAndCheckLock(lock)) { + RuntimeError( + TStringBuilder() << "Got LOCKS BROKEN for table `" + << SchemeEntry->TableId.PathId.ToString() << "`.", + NYql::NDqProto::StatusIds::ABORTED, + NYql::TIssues{}); + } } ProcessBatches(); diff --git a/ydb/core/kqp/ut/tx/kqp_locks_tricky_ut.cpp b/ydb/core/kqp/ut/tx/kqp_locks_tricky_ut.cpp index 780fdf977c3c..9ac7d3fabee7 100644 --- a/ydb/core/kqp/ut/tx/kqp_locks_tricky_ut.cpp +++ b/ydb/core/kqp/ut/tx/kqp_locks_tricky_ut.cpp @@ -29,9 +29,13 @@ using NYql::TExprNode; Y_UNIT_TEST_SUITE(KqpLocksTricky) { - Y_UNIT_TEST(TestNoLocksIssue) { + Y_UNIT_TEST_TWIN(TestNoLocksIssue, withSink) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableOltpSink(withSink); + auto setting = NKikimrKqp::TKqpSetting(); TKikimrSettings settings; + settings.SetAppConfig(appConfig); settings.SetUseRealThreads(false); TKikimrRunner kikimr(settings); auto db = kikimr.GetTableClient(); @@ -123,9 +127,13 @@ Y_UNIT_TEST_SUITE(KqpLocksTricky) { } } - Y_UNIT_TEST(TestNoLocksIssueInteractiveTx) { + Y_UNIT_TEST_TWIN(TestNoLocksIssueInteractiveTx, withSink) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableOltpSink(withSink); + auto setting = NKikimrKqp::TKqpSetting(); TKikimrSettings settings; + settings.SetAppConfig(appConfig); settings.SetUseRealThreads(false); TKikimrRunner kikimr(settings); auto db = kikimr.GetTableClient(); diff --git a/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp b/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp index a39c839e6ee2..f4eb9ee7a9ce 100644 --- a/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp +++ b/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp @@ -9,9 +9,12 @@ using namespace NYdb; using namespace NYdb::NTable; Y_UNIT_TEST_SUITE(KqpSnapshotRead) { - Y_UNIT_TEST(TestSnapshotExpiration) { + Y_UNIT_TEST_TWIN(TestSnapshotExpiration, withSink) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableOltpSink(withSink); auto settings = TKikimrSettings() - .SetKeepSnapshotTimeout(TDuration::Seconds(1)); + .SetKeepSnapshotTimeout(TDuration::Seconds(1)) + .SetAppConfig(appConfig); TKikimrRunner kikimr(settings); @@ -63,8 +66,9 @@ Y_UNIT_TEST_SUITE(KqpSnapshotRead) { UNIT_ASSERT_C(caught, "Failed to wait for snapshot expiration."); } - Y_UNIT_TEST(ReadOnlyTxCommitsOnConcurrentWrite) { + Y_UNIT_TEST_TWIN(ReadOnlyTxCommitsOnConcurrentWrite, withSink) { NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableOltpSink(withSink); appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(true); TKikimrRunner kikimr(TKikimrSettings() .SetAppConfig(appConfig) @@ -125,8 +129,13 @@ Y_UNIT_TEST_SUITE(KqpSnapshotRead) { ])", FormatResultSetYson(result.GetResultSet(0))); } - Y_UNIT_TEST(ReadOnlyTxWithIndexCommitsOnConcurrentWrite) { - TKikimrRunner kikimr; + Y_UNIT_TEST_TWIN(ReadOnlyTxWithIndexCommitsOnConcurrentWrite, withSink) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableOltpSink(withSink); + TKikimrRunner kikimr( + TKikimrSettings() + .SetAppConfig(appConfig) + ); // kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPUTE, NActors::NLog::PRI_DEBUG); // kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_BLOBS_STORAGE, NActors::NLog::PRI_DEBUG); @@ -186,8 +195,13 @@ Y_UNIT_TEST_SUITE(KqpSnapshotRead) { ])", FormatResultSetYson(result.GetResultSet(0))); } - Y_UNIT_TEST(ReadWriteTxFailsOnConcurrentWrite1) { - TKikimrRunner kikimr; + Y_UNIT_TEST_TWIN(ReadWriteTxFailsOnConcurrentWrite1, withSink) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableOltpSink(withSink); + TKikimrRunner kikimr( + TKikimrSettings() + .SetAppConfig(appConfig) + ); // kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPUTE, NActors::NLog::PRI_DEBUG); // kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_BLOBS_STORAGE, NActors::NLog::PRI_DEBUG); @@ -223,8 +237,13 @@ Y_UNIT_TEST_SUITE(KqpSnapshotRead) { UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED), result.GetIssues().ToString()); } - Y_UNIT_TEST(ReadWriteTxFailsOnConcurrentWrite2) { - TKikimrRunner kikimr; + Y_UNIT_TEST_TWIN(ReadWriteTxFailsOnConcurrentWrite2, withSink) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableOltpSink(withSink); + TKikimrRunner kikimr( + TKikimrSettings() + .SetAppConfig(appConfig) + ); // kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPUTE, NActors::NLog::PRI_DEBUG); // kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_BLOBS_STORAGE, NActors::NLog::PRI_DEBUG); @@ -266,8 +285,9 @@ Y_UNIT_TEST_SUITE(KqpSnapshotRead) { UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED), result.GetIssues().ToString()); } - Y_UNIT_TEST(ReadWriteTxFailsOnConcurrentWrite3) { + Y_UNIT_TEST_TWIN(ReadWriteTxFailsOnConcurrentWrite3, withSink) { NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableOltpSink(withSink); appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(true); TKikimrRunner kikimr( TKikimrSettings() diff --git a/ydb/core/kqp/ut/tx/kqp_sink_common.h b/ydb/core/kqp/ut/tx/kqp_sink_common.h new file mode 100644 index 000000000000..674720f82ebc --- /dev/null +++ b/ydb/core/kqp/ut/tx/kqp_sink_common.h @@ -0,0 +1,109 @@ +#pragma once + +#include +#include +#include +#include + + +namespace NKikimr { +namespace NKqp { + +using namespace NYdb; +using namespace NYdb::NQuery; + +class TTableDataModificationTester { +protected: + NKikimrConfig::TAppConfig AppConfig; + std::unique_ptr Kikimr; + YDB_ACCESSOR(bool, IsOlap, false); + YDB_ACCESSOR(bool, FastSnapshotExpiration, false); + + virtual void DoExecute() = 0; +public: + void Execute() { + AppConfig.MutableTableServiceConfig()->SetEnableOlapSink(true); + AppConfig.MutableTableServiceConfig()->SetEnableOltpSink(true); + AppConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(true); + auto settings = TKikimrSettings().SetAppConfig(AppConfig).SetWithSampleTables(false); + if (FastSnapshotExpiration) { + settings.SetKeepSnapshotTimeout(TDuration::Seconds(1)); + } + + Kikimr = std::make_unique(settings); + Tests::NCommon::TLoggerInit(*Kikimr).Initialize(); + + auto client = Kikimr->GetQueryClient(); + + auto csController = NYDBTest::TControllers::RegisterCSControllerGuard(); + csController->SetOverridePeriodicWakeupActivationPeriod(TDuration::Seconds(1)); + csController->SetOverrideLagForCompactionBeforeTierings(TDuration::Seconds(1)); + csController->DisableBackground(NKikimr::NYDBTest::ICSController::EBackground::Indexation); + + { + auto type = IsOlap ? "COLUMN" : "ROW"; + auto result = client.ExecuteQuery(Sprintf(R"( + CREATE TABLE `/Root/Test` ( + Group Uint32, + Name String, + Amount Uint64, + Comment String, + PRIMARY KEY (Group, Name) + ) WITH ( + STORE = %s, + AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10 + ); + + CREATE TABLE `/Root/KV` ( + Key Uint32, + Value String, + PRIMARY KEY (Key) + ) WITH ( + STORE = %s, + AUTO_PARTITIONING_BY_SIZE = DISABLED, + AUTO_PARTITIONING_BY_LOAD = DISABLED, + AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 100, + UNIFORM_PARTITIONS = 100 + ); + + CREATE TABLE `/Root/KV2` ( + Key Uint32, + Value String, + PRIMARY KEY (Key) + ) WITH ( + STORE = %s, + AUTO_PARTITIONING_BY_SIZE = DISABLED, + AUTO_PARTITIONING_BY_LOAD = DISABLED, + AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 100, + UNIFORM_PARTITIONS = 100 + ); + )", type, type, type), TTxControl::NoTx()).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + auto result = client.ExecuteQuery(R"( + REPLACE INTO `Test` (Group, Name, Amount, Comment) VALUES + (1u, "Anna", 3500ul, "None"), + (1u, "Paul", 300ul, "None"), + (2u, "Tony", 7200ul, "None"); + REPLACE INTO `KV` (Key, Value) VALUES + (1u, "One"), + (2u, "Two"), + (3u, "Three"), + (4000000001u, "BigOne"), + (4000000002u, "BigTwo"), + (4000000003u, "BigThree"); + )", TTxControl::NoTx()).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + + DoExecute(); + csController->EnableBackground(NKikimr::NYDBTest::ICSController::EBackground::Indexation); + csController->WaitIndexation(TDuration::Seconds(5)); + } + +}; + +} +} diff --git a/ydb/core/kqp/ut/tx/kqp_sink_locks_ut.cpp b/ydb/core/kqp/ut/tx/kqp_sink_locks_ut.cpp new file mode 100644 index 000000000000..928aceff6642 --- /dev/null +++ b/ydb/core/kqp/ut/tx/kqp_sink_locks_ut.cpp @@ -0,0 +1,300 @@ +#include "kqp_sink_common.h" + +#include +#include +#include +#include + +namespace NKikimr { +namespace NKqp { + +using namespace NYdb; +using namespace NYdb::NQuery; + +Y_UNIT_TEST_SUITE(KqpSinkLocks) { + class TInvalidate : public TTableDataModificationTester { + protected: + void DoExecute() override { + auto client = Kikimr->GetQueryClient(); + + auto session1 = client.GetSession().GetValueSync().GetSession(); + auto session2 = client.GetSession().GetValueSync().GetSession(); + + auto result = session1.ExecuteQuery(Q_(R"( + UPSERT INTO `/Root/Test` + SELECT Group + 10U AS Group, Name, Amount, Comment ?? "" || "Updated" AS Comment + FROM `/Root/Test` + WHERE Group == 1U AND Name == "Paul"; + )"), TTxControl::BeginTx(TTxSettings::SerializableRW())).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto tx1 = result.GetTransaction(); + UNIT_ASSERT(tx1); + + result = session2.ExecuteQuery(Q_(R"( + UPSERT INTO `/Root/Test` (Group, Name, Comment) + VALUES (1U, "Paul", "Changed"); + )"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + result = session1.ExecuteQuery(Q_(R"( + UPSERT INTO `/Root/Test` (Group, Name, Comment) + VALUES (11U, "Sergey", "BadRow"); + )"), TTxControl::Tx(tx1->GetId()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); + result.GetIssues().PrintTo(Cerr); + UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED, + [] (const NYql::TIssue& issue) { + return issue.GetMessage().Contains("/Root/Test"); + })); + + result = session2.ExecuteQuery(Q_(R"( + SELECT * FROM `/Root/Test` WHERE Name == "Paul" ORDER BY Group, Name; + )"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([[[300u];["Changed"];[1u];["Paul"]]])", FormatResultSetYson(result.GetResultSet(0))); + } + }; + + Y_UNIT_TEST(TInvalidate) { + TInvalidate tester; + tester.SetIsOlap(false); + tester.Execute(); + } + + class TInvalidateOnCommit : public TTableDataModificationTester { + protected: + void DoExecute() override { + auto client = Kikimr->GetQueryClient(); + + auto session1 = client.GetSession().GetValueSync().GetSession(); + auto session2 = client.GetSession().GetValueSync().GetSession(); + + auto result = session1.ExecuteQuery(Q_(R"( + UPSERT INTO `/Root/Test` + SELECT Group + 10U AS Group, Name, Amount, Comment ?? "" || "Updated" AS Comment + FROM `/Root/Test` + WHERE Group == 1U AND Name == "Paul"; + )"), TTxControl::BeginTx(TTxSettings::SerializableRW())).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto tx1 = result.GetTransaction(); + UNIT_ASSERT(tx1); + + result = session2.ExecuteQuery(Q_(R"( + UPSERT INTO `/Root/Test` (Group, Name, Comment) + VALUES (1U, "Paul", "Changed"); + )"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto commitResult = tx1->Commit().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(commitResult.GetStatus(), EStatus::ABORTED, commitResult.GetIssues().ToString()); + commitResult.GetIssues().PrintTo(Cerr); + // TODO: + //UNIT_ASSERT_C(HasIssue(commitResult.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED, + // [] (const NYql::TIssue& issue) { + // Y_UNUSED(issue); + // return issue.GetMessage().Contains("/Root/Test"); + // return true; + // }), commitResult.GetIssues().ToString()); + + result = session2.ExecuteQuery(Q_(R"( + SELECT * FROM `/Root/Test` WHERE Name == "Paul" ORDER BY Group, Name; + )"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([[[300u];["Changed"];[1u];["Paul"]]])", FormatResultSetYson(result.GetResultSet(0))); + } + }; + + Y_UNIT_TEST(InvalidateOnCommit) { + TInvalidateOnCommit tester; + tester.SetIsOlap(false); + tester.Execute(); + } + + + class TDifferentKeyUpdate : public TTableDataModificationTester { + protected: + void DoExecute() override { + auto client = Kikimr->GetQueryClient(); + + auto session1 = client.GetSession().GetValueSync().GetSession(); + auto session2 = client.GetSession().GetValueSync().GetSession(); + + auto result = session1.ExecuteQuery(Q_(R"( + SELECT * FROM `/Root/Test` WHERE Group = 1; + )"), TTxControl::BeginTx(TTxSettings::SerializableRW())).ExtractValueSync(); + UNIT_ASSERT(result.IsSuccess()); + + auto tx1 = result.GetTransaction(); + UNIT_ASSERT(tx1); + + result = session2.ExecuteQuery(Q_(R"( + UPSERT INTO `/Root/Test` (Group, Name, Comment) + VALUES (2U, "Paul", "Changed"); + )"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT(result.IsSuccess()); + + result = session1.ExecuteQuery(Q_(R"( + SELECT "Nothing"; + )"), TTxControl::Tx(tx1->GetId()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + }; + + Y_UNIT_TEST(DifferentKeyUpdate) { + TDifferentKeyUpdate tester; + tester.SetIsOlap(false); + tester.Execute(); + } + + class TEmptyRange : public TTableDataModificationTester { + protected: + void DoExecute() override { + auto client = Kikimr->GetQueryClient(); + + auto session1 = client.GetSession().GetValueSync().GetSession(); + auto session2 = client.GetSession().GetValueSync().GetSession(); + + auto result = session1.ExecuteQuery(Q1_(R"( + SELECT * FROM Test WHERE Group = 11; + )"), TTxControl::BeginTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0))); + + auto tx1 = result.GetTransaction(); + UNIT_ASSERT(tx1); + + result = session2.ExecuteQuery(Q1_(R"( + SELECT * FROM Test WHERE Group = 11; + + UPSERT INTO Test (Group, Name, Amount) VALUES + (11, "Session2", 2); + )"), TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0))); + + result = session1.ExecuteQuery(Q1_(R"( + UPSERT INTO Test (Group, Name, Amount) VALUES + (11, "Session1", 1); + )"), TTxControl::Tx(tx1->GetId()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); + result.GetIssues().PrintTo(Cerr); + UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED, + [] (const NYql::TIssue& issue) { + return issue.GetMessage().Contains("/Root/Test"); + })); + + result = session1.ExecuteQuery(Q1_(R"( + SELECT * FROM Test WHERE Group = 11; + )"), TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([[[2u];#;[11u];["Session2"]]])", FormatResultSetYson(result.GetResultSet(0))); + } + }; + + Y_UNIT_TEST(EmptyRange) { + TEmptyRange tester; + tester.SetIsOlap(false); + tester.Execute(); + } + + class TEmptyRangeAlreadyBroken : public TTableDataModificationTester { + protected: + void DoExecute() override { + auto client = Kikimr->GetQueryClient(); + + auto session1 = client.GetSession().GetValueSync().GetSession(); + auto session2 = client.GetSession().GetValueSync().GetSession(); + + auto result = session1.ExecuteQuery(Q1_(R"( + SELECT * FROM Test WHERE Group = 10; + )"), TTxControl::BeginTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0))); + + auto tx1 = result.GetTransaction(); + UNIT_ASSERT(tx1); + + result = session2.ExecuteQuery(Q1_(R"( + SELECT * FROM Test WHERE Group = 11; + + UPSERT INTO Test (Group, Name, Amount) VALUES + (11, "Session2", 2); + )"), TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0))); + + result = session1.ExecuteQuery(Q1_(R"( + SELECT * FROM Test WHERE Group = 11; + + UPSERT INTO Test (Group, Name, Amount) VALUES + (11, "Session1", 1); + )"), TTxControl::Tx(tx1->GetId()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); + result.GetIssues().PrintTo(Cerr); + UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED)); + + UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED, + [] (const NYql::TIssue& issue) { + return issue.GetMessage().Contains("/Root/Test"); + })); + + result = session1.ExecuteQuery(Q1_(R"( + SELECT * FROM Test WHERE Group = 11; + )"), TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([[[2u];#;[11u];["Session2"]]])", FormatResultSetYson(result.GetResultSet(0))); + } + }; + + Y_UNIT_TEST(EmptyRangeAlreadyBroken) { + TEmptyRangeAlreadyBroken tester; + tester.SetIsOlap(false); + tester.Execute(); + } + + class TUncommittedRead : public TTableDataModificationTester { + protected: + void DoExecute() override { + auto client = Kikimr->GetQueryClient(); + + auto session1 = client.GetSession().GetValueSync().GetSession(); + auto session2 = client.GetSession().GetValueSync().GetSession(); + + auto result = session1.ExecuteQuery(Q1_(R"( + UPSERT INTO Test (Group, Name, Amount) VALUES + (11, "TEST", 2); + )"), TTxControl::BeginTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto tx1 = result.GetTransaction(); + UNIT_ASSERT(tx1); + + { + result = session2.ExecuteQuery(Q1_(R"( + SELECT * FROM Test WHERE Group = 11; + )"), TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0))); + } + + { + result = session1.ExecuteQuery(Q1_(R"( + SELECT * FROM Test WHERE Group = 11; + )"), TTxControl::Tx(tx1->GetId())).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([[[2u];#;[11u];["TEST"]]])", FormatResultSetYson(result.GetResultSet(0))); + } + } + }; + + Y_UNIT_TEST(UncommittedRead) { + TUncommittedRead tester; + tester.SetIsOlap(false); + tester.Execute(); + } +} + +} // namespace NKqp +} // namespace NKikimr diff --git a/ydb/core/kqp/ut/tx/kqp_sink_mvcc_ut.cpp b/ydb/core/kqp/ut/tx/kqp_sink_mvcc_ut.cpp new file mode 100644 index 000000000000..1a2240b849d5 --- /dev/null +++ b/ydb/core/kqp/ut/tx/kqp_sink_mvcc_ut.cpp @@ -0,0 +1,274 @@ +#include "kqp_sink_common.h" + +#include +#include +#include +#include + +namespace NKikimr { +namespace NKqp { + +using namespace NYdb; +using namespace NYdb::NQuery; + +Y_UNIT_TEST_SUITE(KqpSinkMvcc) { + class TSnapshotExpiration : public TTableDataModificationTester { + protected: + void DoExecute() override { + auto client = Kikimr->GetQueryClient(); + + auto session1 = client.GetSession().GetValueSync().GetSession(); + auto session2 = client.GetSession().GetValueSync().GetSession(); + + auto result = session1.ExecuteQuery(Q_(R"( + SELECT * FROM `/Root/KV` WHERE Key = 1u OR Key = 4000000001u ORDER BY Key; + )"), TTxControl::BeginTx(TTxSettings::SerializableRW())).ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["One"]]; + [[4000000001u];["BigOne"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + + auto tx = result.GetTransaction(); + + result = session2.ExecuteQuery(Q_(R"( + UPSERT INTO `/Root/KV` (Key, Value) VALUES (1u, "ChangedOne"); + )"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto deadline = TInstant::Now() + TDuration::Seconds(30); + auto caught = false; + do { + Sleep(TDuration::Seconds(1)); + auto result = session1.ExecuteQuery(Q_(R"( + SELECT * FROM `/Root/KV` WHERE Key = 1u OR Key = 4000000001u; + )"), TTxControl::Tx(tx->GetId())).ExtractValueSync(); + if (result.GetStatus() == EStatus::SUCCESS) + continue; + + UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::DEFAULT_ERROR, + [](const NYql::TIssue& issue){ + return issue.GetMessage().Contains("has no snapshot at"); + }), result.GetIssues().ToString()); + + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::ABORTED); + + caught = true; + break; + } while (TInstant::Now() < deadline); + UNIT_ASSERT_C(caught, "Failed to wait for snapshot expiration."); + } + }; + + Y_UNIT_TEST(SnapshotExpiration) { + TSnapshotExpiration tester; + tester.SetFastSnapshotExpiration(true); + tester.SetIsOlap(false); + tester.Execute(); + } + + class TReadOnlyTxCommitsOnConcurrentWrite : public TTableDataModificationTester { + protected: + void DoExecute() override { + auto client = Kikimr->GetQueryClient(); + + auto session1 = client.GetSession().GetValueSync().GetSession(); + auto session2 = client.GetSession().GetValueSync().GetSession(); + + auto result = session1.ExecuteQuery(Q_(R"( + SELECT * FROM `/Root/KV` WHERE Key = 1u OR Key = 4000000001u ORDER BY Key; + )"), TTxControl::BeginTx(TTxSettings::SerializableRW())).ExtractValueSync(); + + auto tx = result.GetTransaction(); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["One"]]; + [[4000000001u];["BigOne"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + + result = session2.ExecuteQuery(Q_(R"( + UPSERT INTO `/Root/KV` (Key, Value) VALUES (1u, "ChangedOne"); + )"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + result = session2.ExecuteQuery(Q_(R"( + SELECT * FROM `/Root/KV` WHERE Key = 1u; + )"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["ChangedOne"]]; + ])", FormatResultSetYson(result.GetResultSet(0))); + + result = session1.ExecuteQuery(Q_(R"( + SELECT * FROM `/Root/KV` WHERE Key = 1u; + )"), TTxControl::Tx(tx->GetId())).ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["One"]]; + ])", FormatResultSetYson(result.GetResultSet(0))); + + result = session1.ExecuteQuery(Q_(R"( + SELECT * FROM `/Root/KV` WHERE Key = 2u OR Key = 4000000002u ORDER BY Key; + )"), TTxControl::Tx(tx->GetId()).CommitTx()).ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[2u];["Two"]]; + [[4000000002u];["BigTwo"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + } + }; + + Y_UNIT_TEST(ReadOnlyTxCommitsOnConcurrentWrite) { + TReadOnlyTxCommitsOnConcurrentWrite tester; + tester.SetIsOlap(false); + tester.Execute(); + } + + class TReadWriteTxFailsOnConcurrentWrite1 : public TTableDataModificationTester { + protected: + void DoExecute() override { + auto client = Kikimr->GetQueryClient(); + + auto session1 = client.GetSession().GetValueSync().GetSession(); + auto session2 = client.GetSession().GetValueSync().GetSession(); + + auto result = session1.ExecuteQuery(Q_(R"( + SELECT * FROM `/Root/KV` WHERE Key = 1u OR Key = 4000000001u ORDER BY Key; + )"), TTxControl::BeginTx(TTxSettings::SerializableRW())).ExtractValueSync(); + + auto tx = result.GetTransaction(); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["One"]]; + [[4000000001u];["BigOne"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + + result = session2.ExecuteQuery(Q_(R"( + UPSERT INTO `/Root/KV` (Key, Value) VALUES (1u, "ChangedOne"); + )"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + result = session1.ExecuteQuery(Q_(R"( + UPSERT INTO `/Root/KV` (Key, Value) VALUES (1u, "TwiceChangedOne"); + )"), TTxControl::Tx(tx->GetId()).CommitTx()).ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); + UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED), result.GetIssues().ToString()); + } + }; + + Y_UNIT_TEST(ReadWriteTxFailsOnConcurrentWrite1) { + TReadWriteTxFailsOnConcurrentWrite1 tester; + tester.SetIsOlap(false); + tester.Execute(); + } + + class TReadWriteTxFailsOnConcurrentWrite2 : public TTableDataModificationTester { + protected: + void DoExecute() override { + auto client = Kikimr->GetQueryClient(); + + auto session1 = client.GetSession().GetValueSync().GetSession(); + auto session2 = client.GetSession().GetValueSync().GetSession(); + + auto result = session1.ExecuteQuery(Q_(R"( + SELECT * FROM `/Root/KV` WHERE Key = 1u OR Key = 4000000001u ORDER BY Key; + )"), TTxControl::BeginTx(TTxSettings::SerializableRW())).ExtractValueSync(); + + auto tx = result.GetTransaction(); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["One"]]; + [[4000000001u];["BigOne"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + + // We need to sleep before the upsert below, otherwise writes + // might happen in the same step as the snapshot, which would be + // treated as happening before snapshot and will not break any locks. + Sleep(TDuration::Seconds(2)); + + result = session2.ExecuteQuery(Q_(R"( + UPSERT INTO `/Root/KV2` (Key, Value) VALUES (101u, "SomeText"); + )"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + result = session1.ExecuteQuery(Q_(R"( + UPDATE `/Root/KV` SET Value = "Something" WHERE Key = 1u; + UPDATE `/Root/KV2` SET Value = "AnotherString" WHERE Key = 101u; + )"), TTxControl::Tx(tx->GetId()).CommitTx()).ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); + UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED), result.GetIssues().ToString()); + } + }; + + Y_UNIT_TEST(ReadWriteTxFailsOnConcurrentWrite2) { + TReadWriteTxFailsOnConcurrentWrite2 tester; + tester.SetIsOlap(false); + tester.Execute(); + } + + class TReadWriteTxFailsOnConcurrentWrite3 : public TTableDataModificationTester { + protected: + void DoExecute() override { + auto client = Kikimr->GetQueryClient(); + + auto session1 = client.GetSession().GetValueSync().GetSession(); + auto session2 = client.GetSession().GetValueSync().GetSession(); + + auto result = session1.ExecuteQuery(Q_(R"( + SELECT * FROM `/Root/KV` WHERE Key = 1u OR Key = 4000000001u ORDER BY Key; + )"), TTxControl::BeginTx(TTxSettings::SerializableRW())).ExtractValueSync(); + + auto tx = result.GetTransaction(); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["One"]]; + [[4000000001u];["BigOne"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + + result = session2.ExecuteQuery(Q_(R"( + UPSERT INTO `/Root/KV` (Key, Value) VALUES (2u, "ChangedTwo"); + )"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + result = session1.ExecuteQuery(Q_(R"( + SELECT * FROM `/Root/KV` WHERE Key = 2u OR Key = 4000000002u ORDER BY Key; + )"), TTxControl::Tx(tx->GetId())).ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[2u];["Two"]]; + [[4000000002u];["BigTwo"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + + result = session1.ExecuteQuery(Q_(R"( + UPSERT INTO `/Root/KV` (Key, Value) VALUES (2u, "TwiceChangedTwo"); + )"), TTxControl::Tx(tx->GetId()).CommitTx()).ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); + UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED), result.GetIssues().ToString()); + } + }; + + Y_UNIT_TEST(ReadWriteTxFailsOnConcurrentWrite3) { + TReadWriteTxFailsOnConcurrentWrite3 tester; + tester.SetIsOlap(false); + tester.Execute(); + } +} + +} // namespace NKqp +} // namespace NKikimr diff --git a/ydb/core/kqp/ut/tx/kqp_sink_tx_ut.cpp b/ydb/core/kqp/ut/tx/kqp_sink_tx_ut.cpp new file mode 100644 index 000000000000..573cfb74ce73 --- /dev/null +++ b/ydb/core/kqp/ut/tx/kqp_sink_tx_ut.cpp @@ -0,0 +1,349 @@ +#include "kqp_sink_common.h" + +#include +#include +#include +#include + +namespace NKikimr { +namespace NKqp { + +using namespace NYdb; +using namespace NYdb::NQuery; + +Y_UNIT_TEST_SUITE(KqpSinkTx) { + class TDeferredEffects : public TTableDataModificationTester { + protected: + void DoExecute() override { + auto client = Kikimr->GetQueryClient(); + + auto session = client.GetSession().GetValueSync().GetSession(); + auto result = session.ExecuteQuery(Q_(R"( + UPSERT INTO `/Root/Test` + SELECT Group, "Sergey" AS Name + FROM `/Root/Test`; + )"), TTxControl::BeginTx(TTxSettings::SerializableRW())).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto tx = result.GetTransaction(); + + result = session.ExecuteQuery(Q_(R"( + SELECT * FROM `/Root/Test` WHERE Group = 1; + )"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[3500u];["None"];[1u];["Anna"]]; + [[300u];["None"];[1u];["Paul"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + + auto commitResult = tx->Commit().ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(commitResult.GetStatus(), EStatus::SUCCESS, commitResult.GetIssues().ToString()); + + result = session.ExecuteQuery(Q_(R"( + SELECT * FROM `/Root/Test` WHERE Group = 1; + )"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[3500u];["None"];[1u];["Anna"]]; + [[300u];["None"];[1u];["Paul"]]; + [#;#;[1u];["Sergey"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + } + }; + + Y_UNIT_TEST(DeferredEffects) { + TDeferredEffects tester; + tester.SetIsOlap(false); + tester.Execute(); + } + + class TExplicitTcl : public TTableDataModificationTester { + protected: + void DoExecute() override { + auto client = Kikimr->GetQueryClient(); + + auto session = client.GetSession().GetValueSync().GetSession(); + auto tx = session.BeginTransaction(TTxSettings::SerializableRW()) + .ExtractValueSync() + .GetTransaction(); + UNIT_ASSERT(tx.IsActive()); + + auto result = session.ExecuteQuery(Q_(R"( + UPSERT INTO `/Root/KV` (Key, Value) VALUES (10u, "New"); + )"), TTxControl::Tx(tx.GetId())).ExtractValueSync(); + UNIT_ASSERT(result.IsSuccess()); + + result = session.ExecuteQuery(Q_(R"( + SELECT * FROM `/Root/KV` WHERE Value = "New"; + )"), TTxControl::BeginTx(TTxSettings::OnlineRO()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT(result.IsSuccess()); + CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0))); + + auto commitResult = tx.Commit().ExtractValueSync(); + UNIT_ASSERT_C(commitResult.IsSuccess(), commitResult.GetIssues().ToString()); + + result = session.ExecuteQuery(Q_(R"( + SELECT * FROM `/Root/KV` WHERE Value = "New"; + )"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT(result.IsSuccess()); + CompareYson(R"([[[10u];["New"]]])", FormatResultSetYson(result.GetResultSet(0))); + + commitResult = tx.Commit().ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(commitResult.GetStatus(), EStatus::NOT_FOUND, commitResult.GetIssues().ToString()); + //TODO: UNIT_ASSERT_C(HasIssue(commitResult.GetIssues(), NYql::TIssuesIds::KIKIMR_TRANSACTION_NOT_FOUND), commitResult.GetIssues().ToString()); + } + }; + + Y_UNIT_TEST(ExplicitTcl) { + TExplicitTcl tester; + tester.SetIsOlap(false); + tester.Execute(); + } + + class TLocksAbortOnCommit : public TTableDataModificationTester { + protected: + void DoExecute() override { + auto client = Kikimr->GetQueryClient(); + + auto session = client.GetSession().GetValueSync().GetSession(); + { + auto result = session.ExecuteQuery(Q_(R"( + UPSERT INTO `/Root/KV` (Key, Value) VALUES (1, "One"); + UPSERT INTO `/Root/KV` (Key, Value) VALUES (2, "Two"); + UPSERT INTO `/Root/KV` (Key, Value) VALUES (3, "Three"); + UPSERT INTO `/Root/KV` (Key, Value) VALUES (4, "Four"); + )"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + + auto result = session.ExecuteQuery(Q_(R"( + SELECT * FROM `/Root/KV`; + )"), TTxControl::BeginTx(TTxSettings::SerializableRW())).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto tx = result.GetTransaction(); + + result = session.ExecuteQuery(Q_(R"( + UPDATE `/Root/KV` SET Value = "second" WHERE Key = 3; + )"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + result = session.ExecuteQuery(Q_(R"( + UPDATE `/Root/KV` SET Value = "third" WHERE Key = 4; + )"), TTxControl::Tx(tx->GetId())).ExtractValueSync(); + //TODO: UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto commitResult = tx->Commit().ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL_C(commitResult.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); + //UNIT_ASSERT_VALUES_EQUAL_C(commitResult.GetStatus(), EStatus::NOT_FOUND, commitResult.GetIssues().ToString()); + } + }; + + Y_UNIT_TEST(LocksAbortOnCommit) { + TLocksAbortOnCommit tester; + tester.SetIsOlap(false); + tester.Execute(); + } + + class TInvalidateOnError : public TTableDataModificationTester { + protected: + void DoExecute() override { + auto client = Kikimr->GetQueryClient(); + + auto session = client.GetSession().GetValueSync().GetSession(); + auto tx = session.BeginTransaction(TTxSettings::SerializableRW()) + .ExtractValueSync() + .GetTransaction(); + UNIT_ASSERT(tx.IsActive()); + + auto result = session.ExecuteQuery(Q_(R"( + INSERT INTO `/Root/KV` (Key, Value) VALUES (1u, "New"); + )"), TTxControl::Tx(tx.GetId())).ExtractValueSync(); + // result.GetIssues().PrintTo(Cerr); + //TODO: UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::PRECONDITION_FAILED, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::BAD_REQUEST, result.GetIssues().ToString()); + + result = session.ExecuteQuery(Q_(R"( + UPSERT INTO `/Root/KV` (Key, Value) VALUES (1u, "New"); + )"), TTxControl::Tx(tx.GetId())).ExtractValueSync(); + // result.GetIssues().PrintTo(Cerr); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::NOT_FOUND, result.GetIssues().ToString()); + } + }; + + Y_UNIT_TEST(InvalidateOnError) { + TInvalidateOnError tester; + tester.SetIsOlap(false); + tester.Execute(); + } + + class TInteractive : public TTableDataModificationTester { + protected: + void DoExecute() override { + auto client = Kikimr->GetQueryClient(); + + auto session = client.GetSession().GetValueSync().GetSession(); + auto tx = session.BeginTransaction(TTxSettings::SerializableRW()) + .ExtractValueSync() + .GetTransaction(); + UNIT_ASSERT(tx.IsActive()); + + auto result = session.ExecuteQuery(R"( + SELECT * FROM `/Root/KV` + )", TTxControl::Tx(tx.GetId())).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + result = session.ExecuteQuery(R"( + UPSERT INTO `/Root/KV` (Key, Value) VALUES (1u, "New"); + )", TTxControl::Tx(tx.GetId()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + result = session.ExecuteQuery(R"( + SELECT * FROM `/Root/KV` WHERE Key < 3 + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["New"]]; + [[2u];["Two"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + } + }; + + Y_UNIT_TEST(Interactive) { + TInteractive tester; + tester.SetIsOlap(false); + tester.Execute(); + } + + class TSnapshotRO : public TTableDataModificationTester { + protected: + void DoExecute() override { + auto client = Kikimr->GetQueryClient(); + + auto session = client.GetSession().GetValueSync().GetSession(); + // Read Immediate + auto result = session.ExecuteQuery(Q1_(R"( + SELECT * FROM KV WHERE Key = 2; + )"), TTxControl::BeginTx(TTxSettings::SnapshotRO()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([[[2u];["Two"]]])", FormatResultSetYson(result.GetResultSet(0))); + + // Read Distributed + result = session.ExecuteQuery(Q1_(R"( + SELECT COUNT(*) FROM KV WHERE Value = "One"; + )"), TTxControl::BeginTx(TTxSettings::SnapshotRO()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([[1u]])", FormatResultSetYson(result.GetResultSet(0))); + + // Write + result = session.ExecuteQuery(Q1_(R"( + UPSERT INTO `/Root/KV` (Key, Value) VALUES + (100, "100500"), + (100500, "100"); + )"), TTxControl::BeginTx(TTxSettings::SnapshotRO()).CommitTx()).ExtractValueSync(); + result.GetIssues().PrintTo(Cerr); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::GENERIC_ERROR); + UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_BAD_OPERATION)); + } + }; + + Y_UNIT_TEST(SnapshotRO) { + TSnapshotRO tester; + tester.SetIsOlap(false); + tester.Execute(); + } + + class TSnapshotROInteractive1 : public TTableDataModificationTester { + protected: + void DoExecute() override { + auto client = Kikimr->GetQueryClient(); + + auto session = client.GetSession().GetValueSync().GetSession(); + auto readQuery = Q1_(R"( + SELECT * FROM KV WHERE Key = 1u; + )"); + + auto readResult = R"([ + [[1u];["One"]] + ])"; + + auto result = session.ExecuteQuery(readQuery, + TTxControl::BeginTx(TTxSettings::SnapshotRO())).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(readResult, FormatResultSetYson(result.GetResultSet(0))); + + auto tx = result.GetTransaction(); + UNIT_ASSERT(tx); + UNIT_ASSERT(tx->IsActive()); + + result = session.ExecuteQuery(Q1_(R"( + UPSERT INTO `/Root/KV` (Key, Value) VALUES + (1u, "value"); + )"), TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + result = session.ExecuteQuery(readQuery, + TTxControl::Tx(tx->GetId()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(readResult, FormatResultSetYson(result.GetResultSet(0))); + } + }; + + Y_UNIT_TEST(SnapshotROInteractive1) { + TSnapshotROInteractive1 tester; + tester.SetIsOlap(false); + tester.Execute(); + } + + class TSnapshotROInteractive2 : public TTableDataModificationTester { + protected: + void DoExecute() override { + auto client = Kikimr->GetQueryClient(); + + auto session = client.GetSession().GetValueSync().GetSession(); + auto readQuery = Q1_(R"( + SELECT COUNT(*) FROM KV WHERE Value = "One"; + )"); + + auto readResult = R"([ + [1u] + ])"; + + auto tx = session.BeginTransaction(TTxSettings::SnapshotRO()) + .ExtractValueSync() + .GetTransaction(); + UNIT_ASSERT(tx.IsActive()); + + auto result = session.ExecuteQuery(readQuery, + TTxControl::Tx(tx.GetId())).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(readResult, FormatResultSetYson(result.GetResultSet(0))); + + result = session.ExecuteQuery(Q1_(R"( + UPSERT INTO `/Root/KV` (Key, Value) VALUES + (100500u, "One"); + )"), TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + result = session.ExecuteQuery(readQuery, + TTxControl::Tx(tx.GetId())).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(readResult, FormatResultSetYson(result.GetResultSet(0))); + + auto commitResult = tx.Commit().ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(commitResult.GetStatus(), EStatus::SUCCESS, commitResult.GetIssues().ToString()); + } + }; + + Y_UNIT_TEST(SnapshotROInteractive2) { + TSnapshotROInteractive2 tester; + tester.SetIsOlap(false); + tester.Execute(); + } +} + +} // namespace NKqp +} // namespace NKikimr diff --git a/ydb/core/kqp/ut/tx/ya.make b/ydb/core/kqp/ut/tx/ya.make index 7f4ca5fae0d6..e7369a4a256e 100644 --- a/ydb/core/kqp/ut/tx/ya.make +++ b/ydb/core/kqp/ut/tx/ya.make @@ -16,12 +16,16 @@ SRCS( kqp_locks_tricky_ut.cpp kqp_locks_ut.cpp kqp_mvcc_ut.cpp + kqp_sink_locks_ut.cpp + kqp_sink_mvcc_ut.cpp + kqp_sink_tx_ut.cpp kqp_tx_ut.cpp ) PEERDIR( ydb/core/kqp ydb/core/kqp/ut/common + ydb/core/tx/columnshard/hooks/testing ydb/library/yql/sql/pg_dummy )