diff --git a/ydb/core/tx/datashard/datashard_ut_write.cpp b/ydb/core/tx/datashard/datashard_ut_write.cpp index 6291554537fe..906ce5d909b7 100644 --- a/ydb/core/tx/datashard/datashard_ut_write.cpp +++ b/ydb/core/tx/datashard/datashard_ut_write.cpp @@ -1586,5 +1586,106 @@ Y_UNIT_TEST_SUITE(DataShardWrite) { } } + Y_UNIT_TEST(PreparedDistributedWritePageFault) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetEnableDataShardVolatileTransactions(false); + + auto [runtime, server, sender] = TestCreateServer(serverSettings); + + TDisableDataShardLogBatching disableDataShardLogBatching; + + // Use a policy without levels and very small page sizes, effectively making each row on its own page + NLocalDb::TCompactionPolicyPtr policy = NLocalDb::CreateDefaultTablePolicy(); + policy->MinDataPageSize = 1; + + auto opts = TShardedTableOptions() + .Columns({{"key", "Int32", true, false}, + {"value", "Int32", false, false}}) + .Policy(policy.Get()); + const auto& columns = opts.Columns_; + auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table", opts); + UNIT_ASSERT_VALUES_EQUAL(shards.size(), 1u); + + const ui64 coordinator = ChangeStateStorage(Coordinator, server->GetSettings().Domain); + + const ui64 lockTxId1 = 1234567890001; + const ui64 lockNodeId = runtime.GetNodeId(0); + NLongTxService::TLockHandle lockHandle1(lockTxId1, runtime.GetActorSystem(0)); + + auto shard1 = shards.at(0); + NKikimrDataEvents::TLock lock1shard1; + + // 1. Make an uncommitted write (lock1 shard1) + { + Cerr << "... making an uncommmited write to " << shard1 << Endl; + auto req = MakeWriteRequestOneKeyValue( + std::nullopt, + NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE, + NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, + tableId, + columns, + 1, 11); + req->SetLockId(lockTxId1, lockNodeId); + auto result = Write(runtime, sender, shard1, std::move(req)); + UNIT_ASSERT_VALUES_EQUAL(result.GetTxLocks().size(), 1u); + lock1shard1 = result.GetTxLocks().at(0); + UNIT_ASSERT_C(lock1shard1.GetCounter() < 1000, "Unexpected lock in the result: " << lock1shard1.ShortDebugString()); + } + + // 2. Compact and reboot the tablet + Cerr << "... compacting shard " << shard1 << Endl; + CompactTable(runtime, shard1, tableId, false); + Cerr << "... rebooting shard " << shard1 << Endl; + RebootTablet(runtime, shard1, sender); + runtime.SimulateSleep(TDuration::Seconds(1)); + + // 3. Prepare a distributed write (single shard for simplicity) + ui64 txId1 = 1234567890011; + auto tx1sender = runtime.AllocateEdgeActor(); + { + auto req1 = MakeWriteRequestOneKeyValue( + txId1, + NKikimrDataEvents::TEvWrite::MODE_PREPARE, + NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, + tableId, + columns, + 1, 22); + req1->Record.MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit); + + Cerr << "... preparing tx1 at " << shard1 << Endl; + auto res1 = Write(runtime, tx1sender, shard1, std::move(req1)); + + // Reboot, making sure tx is only loaded after it's planned + // This causes tx to skip conflicts cache and go to execution + // The first attempt to execute will page fault looking for conflicts + // Tx will be released, and will trigger the bug on restore + Cerr << "... rebooting shard " << shard1 << Endl; + RebootTablet(runtime, shard1, sender); + runtime.SimulateSleep(TDuration::Seconds(1)); + + ui64 minStep = res1.GetMinStep(); + ui64 maxStep = res1.GetMaxStep(); + + Cerr << "... planning tx1 at " << coordinator << Endl; + SendProposeToCoordinator( + runtime, tx1sender, { shard1 }, { + .TxId = txId1, + .Coordinator = coordinator, + .MinStep = minStep, + .MaxStep = maxStep, + }); + } + + // 4. Check tx1 reply (it must succeed) + { + Cerr << "... waiting for tx1 result" << Endl; + auto ev = runtime.GrabEdgeEventRethrow(tx1sender); + UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetStatus(), NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED); + } + } + } // Y_UNIT_TEST_SUITE } // namespace NKikimr diff --git a/ydb/core/tx/datashard/datashard_write_operation.cpp b/ydb/core/tx/datashard/datashard_write_operation.cpp index 5ec4e5e92bac..0d67c7f02350 100644 --- a/ydb/core/tx/datashard/datashard_write_operation.cpp +++ b/ydb/core/tx/datashard/datashard_write_operation.cpp @@ -416,8 +416,9 @@ TValidatedWriteTx::TPtr TWriteOperation::BuildWriteTx(TDataShard* self) void TWriteOperation::ReleaseTxData(NTabletFlatExecutor::TTxMemoryProviderBase& provider) { ReleasedTxDataSize = provider.GetMemoryLimit() + provider.GetRequestedMemory(); - if (!WriteTx || IsTxDataReleased()) + if (!WriteTx || WriteTx->GetIsReleased()) { return; + } WriteTx->ReleaseTxData(); // Immediate transactions have no body stored.