Skip to content

Commit

Permalink
Sink locks tests + some locks fixes (ydb-platform#8420)
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 authored and stanislav-shchetinin committed Aug 30, 2024
1 parent 85522a5 commit 485265c
Show file tree
Hide file tree
Showing 10 changed files with 1,116 additions and 18 deletions.
1 change: 1 addition & 0 deletions .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ ydb/core/kqp/ut/service [*/*]*
ydb/core/kqp/ut/service KqpQueryService.ExecuteQueryPgTableSelect
ydb/core/kqp/ut/service KqpQueryService.QueryOnClosedSession
ydb/core/kqp/ut/service KqpService.CloseSessionsWithLoad
ydb/core/kqp/ut/tx KqpSnapshotRead.ReadOnlyTxWithIndexCommitsOnConcurrentWrite+withSink
ydb/core/persqueue/ut [*/*]*
ydb/core/persqueue/ut TPQTest.*DirectRead*
ydb/core/persqueue/ut/ut_with_sdk [*/*]*
Expand Down
37 changes: 32 additions & 5 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1151,6 +1151,36 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
Counters->TxProxyMon->ResultsReceivedCount->Inc();
Counters->TxProxyMon->TxResultComplete->Inc();

CheckExecutionComplete();
return;
}
case NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN: {
LOG_D("Broken locks: " << res->Record.DebugString());
YQL_ENSURE(shardState->State == TShardState::EState::Executing);
shardState->State = TShardState::EState::Finished;
Counters->TxProxyMon->TxResultAborted->Inc();
LocksBroken = true;

TMaybe<TString> tableName;
if (!res->Record.GetTxLocks().empty()) {
auto& lock = res->Record.GetTxLocks(0);
auto tableId = TTableId(lock.GetSchemeShard(), lock.GetPathId());
auto it = FindIf(TasksGraph.GetStagesInfo(), [tableId](const auto& x){ return x.second.Meta.TableId.HasSamePath(tableId); });
if (it != TasksGraph.GetStagesInfo().end()) {
tableName = it->second.Meta.TableConstInfo->Path;
}
}

// Reply as soon as we know which table had locks invalidated
if (tableName) {
auto message = TStringBuilder()
<< "Transaction locks invalidated. Table: " << *tableName;

return ReplyErrorAndDie(Ydb::StatusIds::ABORTED,
YqlIssue({}, TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message));
}


CheckExecutionComplete();
return;
}
Expand Down Expand Up @@ -1673,18 +1703,15 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
}

void ExecuteEvWriteTransaction(ui64 shardId, NKikimrDataEvents::TEvWrite& evWrite) {
YQL_ENSURE(!ImmediateTx);
TShardState shardState;
shardState.State = TShardState::EState::Preparing;
shardState.State = ImmediateTx ? TShardState::EState::Executing : TShardState::EState::Preparing;
shardState.DatashardState.ConstructInPlace();

auto evWriteTransaction = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>();
evWriteTransaction->Record = evWrite;
evWriteTransaction->Record.SetTxMode(NKikimrDataEvents::TEvWrite::MODE_PREPARE);
evWriteTransaction->Record.SetTxMode(ImmediateTx ? NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE : NKikimrDataEvents::TEvWrite::MODE_PREPARE);
evWriteTransaction->Record.SetTxId(TxId);

evWriteTransaction->Record.MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit);

auto locksCount = evWriteTransaction->Record.GetLocks().LocksSize();
shardState.DatashardState->ShardReadLocks = locksCount > 0;

Expand Down
8 changes: 7 additions & 1 deletion ydb/core/kqp/runtime/kqp_write_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,13 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
OnMessageAcknowledged(ev->Get()->Record.GetOrigin(), ev->Cookie);

for (const auto& lock : ev->Get()->Record.GetTxLocks()) {
LocksInfo[ev->Get()->Record.GetOrigin()].AddAndCheckLock(lock);
if (!LocksInfo[ev->Get()->Record.GetOrigin()].AddAndCheckLock(lock)) {
RuntimeError(
TStringBuilder() << "Got LOCKS BROKEN for table `"
<< SchemeEntry->TableId.PathId.ToString() << "`.",
NYql::NDqProto::StatusIds::ABORTED,
NYql::TIssues{});
}
}

ProcessBatches();
Expand Down
12 changes: 10 additions & 2 deletions ydb/core/kqp/ut/tx/kqp_locks_tricky_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,13 @@ using NYql::TExprNode;

Y_UNIT_TEST_SUITE(KqpLocksTricky) {

Y_UNIT_TEST(TestNoLocksIssue) {
Y_UNIT_TEST_TWIN(TestNoLocksIssue, withSink) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(withSink);

auto setting = NKikimrKqp::TKqpSetting();
TKikimrSettings settings;
settings.SetAppConfig(appConfig);
settings.SetUseRealThreads(false);
TKikimrRunner kikimr(settings);
auto db = kikimr.GetTableClient();
Expand Down Expand Up @@ -123,9 +127,13 @@ Y_UNIT_TEST_SUITE(KqpLocksTricky) {
}
}

Y_UNIT_TEST(TestNoLocksIssueInteractiveTx) {
Y_UNIT_TEST_TWIN(TestNoLocksIssueInteractiveTx, withSink) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(withSink);

auto setting = NKikimrKqp::TKqpSetting();
TKikimrSettings settings;
settings.SetAppConfig(appConfig);
settings.SetUseRealThreads(false);
TKikimrRunner kikimr(settings);
auto db = kikimr.GetTableClient();
Expand Down
40 changes: 30 additions & 10 deletions ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ using namespace NYdb;
using namespace NYdb::NTable;

Y_UNIT_TEST_SUITE(KqpSnapshotRead) {
Y_UNIT_TEST(TestSnapshotExpiration) {
Y_UNIT_TEST_TWIN(TestSnapshotExpiration, withSink) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(withSink);
auto settings = TKikimrSettings()
.SetKeepSnapshotTimeout(TDuration::Seconds(1));
.SetKeepSnapshotTimeout(TDuration::Seconds(1))
.SetAppConfig(appConfig);

TKikimrRunner kikimr(settings);

Expand Down Expand Up @@ -63,8 +66,9 @@ Y_UNIT_TEST_SUITE(KqpSnapshotRead) {
UNIT_ASSERT_C(caught, "Failed to wait for snapshot expiration.");
}

Y_UNIT_TEST(ReadOnlyTxCommitsOnConcurrentWrite) {
Y_UNIT_TEST_TWIN(ReadOnlyTxCommitsOnConcurrentWrite, withSink) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(withSink);
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(true);
TKikimrRunner kikimr(TKikimrSettings()
.SetAppConfig(appConfig)
Expand Down Expand Up @@ -125,8 +129,13 @@ Y_UNIT_TEST_SUITE(KqpSnapshotRead) {
])", FormatResultSetYson(result.GetResultSet(0)));
}

Y_UNIT_TEST(ReadOnlyTxWithIndexCommitsOnConcurrentWrite) {
TKikimrRunner kikimr;
Y_UNIT_TEST_TWIN(ReadOnlyTxWithIndexCommitsOnConcurrentWrite, withSink) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(withSink);
TKikimrRunner kikimr(
TKikimrSettings()
.SetAppConfig(appConfig)
);

// kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPUTE, NActors::NLog::PRI_DEBUG);
// kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_BLOBS_STORAGE, NActors::NLog::PRI_DEBUG);
Expand Down Expand Up @@ -186,8 +195,13 @@ Y_UNIT_TEST_SUITE(KqpSnapshotRead) {
])", FormatResultSetYson(result.GetResultSet(0)));
}

Y_UNIT_TEST(ReadWriteTxFailsOnConcurrentWrite1) {
TKikimrRunner kikimr;
Y_UNIT_TEST_TWIN(ReadWriteTxFailsOnConcurrentWrite1, withSink) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(withSink);
TKikimrRunner kikimr(
TKikimrSettings()
.SetAppConfig(appConfig)
);

// kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPUTE, NActors::NLog::PRI_DEBUG);
// kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_BLOBS_STORAGE, NActors::NLog::PRI_DEBUG);
Expand Down Expand Up @@ -223,8 +237,13 @@ Y_UNIT_TEST_SUITE(KqpSnapshotRead) {
UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED), result.GetIssues().ToString());
}

Y_UNIT_TEST(ReadWriteTxFailsOnConcurrentWrite2) {
TKikimrRunner kikimr;
Y_UNIT_TEST_TWIN(ReadWriteTxFailsOnConcurrentWrite2, withSink) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(withSink);
TKikimrRunner kikimr(
TKikimrSettings()
.SetAppConfig(appConfig)
);

// kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPUTE, NActors::NLog::PRI_DEBUG);
// kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_BLOBS_STORAGE, NActors::NLog::PRI_DEBUG);
Expand Down Expand Up @@ -266,8 +285,9 @@ Y_UNIT_TEST_SUITE(KqpSnapshotRead) {
UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED), result.GetIssues().ToString());
}

Y_UNIT_TEST(ReadWriteTxFailsOnConcurrentWrite3) {
Y_UNIT_TEST_TWIN(ReadWriteTxFailsOnConcurrentWrite3, withSink) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(withSink);
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(true);
TKikimrRunner kikimr(
TKikimrSettings()
Expand Down
109 changes: 109 additions & 0 deletions ydb/core/kqp/ut/tx/kqp_sink_common.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
#pragma once

#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
#include <ydb/core/testlib/common_helper.h>
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
#include <ydb/core/tx/columnshard/hooks/testing/controller.h>


namespace NKikimr {
namespace NKqp {

using namespace NYdb;
using namespace NYdb::NQuery;

class TTableDataModificationTester {
protected:
NKikimrConfig::TAppConfig AppConfig;
std::unique_ptr<TKikimrRunner> Kikimr;
YDB_ACCESSOR(bool, IsOlap, false);
YDB_ACCESSOR(bool, FastSnapshotExpiration, false);

virtual void DoExecute() = 0;
public:
void Execute() {
AppConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
AppConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);
AppConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(true);
auto settings = TKikimrSettings().SetAppConfig(AppConfig).SetWithSampleTables(false);
if (FastSnapshotExpiration) {
settings.SetKeepSnapshotTimeout(TDuration::Seconds(1));
}

Kikimr = std::make_unique<TKikimrRunner>(settings);
Tests::NCommon::TLoggerInit(*Kikimr).Initialize();

auto client = Kikimr->GetQueryClient();

auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
csController->SetOverridePeriodicWakeupActivationPeriod(TDuration::Seconds(1));
csController->SetOverrideLagForCompactionBeforeTierings(TDuration::Seconds(1));
csController->DisableBackground(NKikimr::NYDBTest::ICSController::EBackground::Indexation);

{
auto type = IsOlap ? "COLUMN" : "ROW";
auto result = client.ExecuteQuery(Sprintf(R"(
CREATE TABLE `/Root/Test` (
Group Uint32,
Name String,
Amount Uint64,
Comment String,
PRIMARY KEY (Group, Name)
) WITH (
STORE = %s,
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10
);
CREATE TABLE `/Root/KV` (
Key Uint32,
Value String,
PRIMARY KEY (Key)
) WITH (
STORE = %s,
AUTO_PARTITIONING_BY_SIZE = DISABLED,
AUTO_PARTITIONING_BY_LOAD = DISABLED,
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 100,
UNIFORM_PARTITIONS = 100
);
CREATE TABLE `/Root/KV2` (
Key Uint32,
Value String,
PRIMARY KEY (Key)
) WITH (
STORE = %s,
AUTO_PARTITIONING_BY_SIZE = DISABLED,
AUTO_PARTITIONING_BY_LOAD = DISABLED,
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 100,
UNIFORM_PARTITIONS = 100
);
)", type, type, type), TTxControl::NoTx()).GetValueSync();
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
}

{
auto result = client.ExecuteQuery(R"(
REPLACE INTO `Test` (Group, Name, Amount, Comment) VALUES
(1u, "Anna", 3500ul, "None"),
(1u, "Paul", 300ul, "None"),
(2u, "Tony", 7200ul, "None");
REPLACE INTO `KV` (Key, Value) VALUES
(1u, "One"),
(2u, "Two"),
(3u, "Three"),
(4000000001u, "BigOne"),
(4000000002u, "BigTwo"),
(4000000003u, "BigThree");
)", TTxControl::NoTx()).GetValueSync();
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
}

DoExecute();
csController->EnableBackground(NKikimr::NYDBTest::ICSController::EBackground::Indexation);
csController->WaitIndexation(TDuration::Seconds(5));
}

};

}
}
Loading

0 comments on commit 485265c

Please sign in to comment.