Skip to content

Commit

Permalink
Setup sys locks in TTxApplyReplicationChanges (#9723)
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL authored Sep 24, 2024
1 parent 5807f62 commit 0cd4902
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 0 deletions.
7 changes: 7 additions & 0 deletions ydb/core/tx/datashard/datashard_repl_apply.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#include "datashard_impl.h"
#include "datashard_locks_db.h"
#include "setup_sys_locks.h"

#include <util/string/escape.h>

Expand All @@ -24,6 +26,9 @@ class TDataShard::TTxApplyReplicationChanges : public TTransactionBase<TDataShar
bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
Y_UNUSED(ctx);

TDataShardLocksDb locksDb(*Self, txc);
TSetupSysLocks guardLocks(*Self, &locksDb);

if (Self->State != TShardState::Ready) {
Result = MakeHolder<TEvDataShard::TEvApplyReplicationChangesResult>(
NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED,
Expand Down Expand Up @@ -84,6 +89,7 @@ class TDataShard::TTxApplyReplicationChanges : public TTransactionBase<TDataShar
}

if (MvccReadWriteVersion) {
Self->PromoteImmediatePostExecuteEdges(*MvccReadWriteVersion, TDataShard::EPromotePostExecuteEdges::ReadWrite, txc);
Pipeline.AddCommittingOp(*MvccReadWriteVersion);
}

Expand All @@ -92,6 +98,7 @@ class TDataShard::TTxApplyReplicationChanges : public TTransactionBase<TDataShar
NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_OK);
}

Self->SysLocksTable().ApplyLocks();
return true;
}

Expand Down
42 changes: 42 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_replication.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#include <ydb/core/tx/datashard/ut_common/datashard_ut_common.h>
#include "datashard_active_transaction.h"
#include "datashard_ut_common_kqp.h"

#include <ydb/core/tx/tx_proxy/proxy.h>

namespace NKikimr {

using namespace NKikimr::NDataShard;
using namespace NKikimr::NDataShard::NKqpHelpers;
using namespace NSchemeShard;
using namespace Tests;

Expand Down Expand Up @@ -307,6 +309,46 @@ Y_UNIT_TEST_SUITE(DataShardReplication) {
}, NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED);
}

Y_UNIT_TEST(ApplyChangesWithConcurrentTx) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false);

Tests::TServer::TPtr server = new TServer(serverSettings);
auto &runtime = *server->GetRuntime();
auto sender = runtime.AllocateEdgeActor();

runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);

InitRoot(server, sender);
CreateShardedTable(server, sender, "/Root", "table-1", TShardedTableOptions()
.Replicated(true)
.ReplicationConsistency(EReplicationConsistency::Weak)
);

auto shards = GetTableShards(server, sender, "/Root/table-1");
auto tableId = ResolveTableId(server, sender, "/Root/table-1");

ApplyChanges(server, shards.at(0), tableId, "my-source", {
TChange{ .Offset = 0, .WriteTxId = 0, .Key = 1, .Value = 11 },
});

TString sessionId;
TString txId;
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleBegin(runtime, sessionId, txId, "SELECT key, value FROM `/Root/table-1`;"),
"{ items { uint32_value: 1 } items { uint32_value: 11 } }");

ApplyChanges(server, shards.at(0), tableId, "my-source", {
TChange{ .Offset = 1, .WriteTxId = 0, .Key = 1, .Value = 21 },
});

UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleCommit(runtime, sessionId, txId, "SELECT key, value FROM `/Root/table-1`;"),
"{ items { uint32_value: 1 } items { uint32_value: 11 } }");
}

}

} // namespace NKikimr

0 comments on commit 0cd4902

Please sign in to comment.