diff --git a/ydb/core/tx/datashard/datashard_repl_apply.cpp b/ydb/core/tx/datashard/datashard_repl_apply.cpp index 58779eeef4c9..db15e9595db6 100644 --- a/ydb/core/tx/datashard/datashard_repl_apply.cpp +++ b/ydb/core/tx/datashard/datashard_repl_apply.cpp @@ -1,4 +1,6 @@ #include "datashard_impl.h" +#include "datashard_locks_db.h" +#include "setup_sys_locks.h" #include @@ -24,6 +26,9 @@ class TDataShard::TTxApplyReplicationChanges : public TTransactionBaseState != TShardState::Ready) { Result = MakeHolder( NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED, @@ -84,6 +89,7 @@ class TDataShard::TTxApplyReplicationChanges : public TTransactionBasePromoteImmediatePostExecuteEdges(*MvccReadWriteVersion, TDataShard::EPromotePostExecuteEdges::ReadWrite, txc); Pipeline.AddCommittingOp(*MvccReadWriteVersion); } @@ -92,6 +98,7 @@ class TDataShard::TTxApplyReplicationChanges : public TTransactionBaseSysLocksTable().ApplyLocks(); return true; } diff --git a/ydb/core/tx/datashard/datashard_ut_replication.cpp b/ydb/core/tx/datashard/datashard_ut_replication.cpp index 30267d6537b4..8d5df4025d1c 100644 --- a/ydb/core/tx/datashard/datashard_ut_replication.cpp +++ b/ydb/core/tx/datashard/datashard_ut_replication.cpp @@ -1,11 +1,13 @@ #include #include "datashard_active_transaction.h" +#include "datashard_ut_common_kqp.h" #include namespace NKikimr { using namespace NKikimr::NDataShard; +using namespace NKikimr::NDataShard::NKqpHelpers; using namespace NSchemeShard; using namespace Tests; @@ -307,6 +309,46 @@ Y_UNIT_TEST_SUITE(DataShardReplication) { }, NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED); } + Y_UNIT_TEST(ApplyChangesWithConcurrentTx) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + + InitRoot(server, sender); + CreateShardedTable(server, sender, "/Root", "table-1", TShardedTableOptions() + .Replicated(true) + .ReplicationConsistency(EReplicationConsistency::Weak) + ); + + auto shards = GetTableShards(server, sender, "/Root/table-1"); + auto tableId = ResolveTableId(server, sender, "/Root/table-1"); + + ApplyChanges(server, shards.at(0), tableId, "my-source", { + TChange{ .Offset = 0, .WriteTxId = 0, .Key = 1, .Value = 11 }, + }); + + TString sessionId; + TString txId; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleBegin(runtime, sessionId, txId, "SELECT key, value FROM `/Root/table-1`;"), + "{ items { uint32_value: 1 } items { uint32_value: 11 } }"); + + ApplyChanges(server, shards.at(0), tableId, "my-source", { + TChange{ .Offset = 1, .WriteTxId = 0, .Key = 1, .Value = 21 }, + }); + + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleCommit(runtime, sessionId, txId, "SELECT key, value FROM `/Root/table-1`;"), + "{ items { uint32_value: 1 } items { uint32_value: 11 } }"); + } + } } // namespace NKikimr