Skip to content

Commit

Permalink
Merge ce37536 into 9585917
Browse files Browse the repository at this point in the history
  • Loading branch information
snaury authored Sep 26, 2024
2 parents 9585917 + ce37536 commit 61a587d
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 1 deletion.
101 changes: 101 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1586,5 +1586,106 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
}
}

// Regression test: a prepared distributed write that commits locks must still
// complete successfully when the shard is rebooted between the prepare and the
// plan steps. Per the scenario notes below, the first execution attempt after
// the reboot is expected to page fault while looking for conflicts, the tx data
// gets released, and restoring it used to trigger a bug.
//
// Scenario:
//   1. Make an uncommitted (locked) write to key 1.
//   2. Compact the table and reboot the tablet.
//   3. Prepare a distributed write to the same key, reboot again, then plan it.
//   4. Expect the write result to be STATUS_COMPLETED.
Y_UNIT_TEST(PreparedDistributedWritePageFault) {
// Single-domain test server ("Root") without real threads and with volatile
// transactions disabled.
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
.SetEnableDataShardVolatileTransactions(false);

auto [runtime, server, sender] = TestCreateServer(serverSettings);

// NOTE(review): presumably a scope guard that turns off datashard log batching
// for the duration of the test — confirm against its definition.
TDisableDataShardLogBatching disableDataShardLogBatching;

// Use a policy without levels and very small page sizes, effectively making each row on its own page
// NOTE(review): only MinDataPageSize is overridden here; the "without levels"
// part relies on what CreateDefaultTablePolicy() returns — confirm.
NLocalDb::TCompactionPolicyPtr policy = NLocalDb::CreateDefaultTablePolicy();
policy->MinDataPageSize = 1;

// Two Int32 columns: "key" and "value"; the table uses the custom policy above.
auto opts = TShardedTableOptions()
.Columns({{"key", "Int32", true, false},
{"value", "Int32", false, false}})
.Policy(policy.Get());
const auto& columns = opts.Columns_;
auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table", opts);
// The rest of the test works with exactly one shard.
UNIT_ASSERT_VALUES_EQUAL(shards.size(), 1u);

// Coordinator tablet id used later to plan tx1.
const ui64 coordinator = ChangeStateStorage(Coordinator, server->GetSettings().Domain);

// Lock identity (lock tx id + node id) used for the uncommitted write; the
// handle is kept alive until the end of the test.
const ui64 lockTxId1 = 1234567890001;
const ui64 lockNodeId = runtime.GetNodeId(0);
NLongTxService::TLockHandle lockHandle1(lockTxId1, runtime.GetActorSystem(0));

auto shard1 = shards.at(0);
// NOTE(review): lock1shard1 is filled in step 1 but not referenced again in
// this test; the commit request in step 3 only sets the lock operation.
NKikimrDataEvents::TLock lock1shard1;

// 1. Make an uncommitted write (lock1 shard1)
{
Cerr << "... making an uncommmited write to " << shard1 << Endl;
// Immediate upsert of key=1, value=11 performed under lockTxId1.
auto req = MakeWriteRequestOneKeyValue(
std::nullopt,
NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE,
NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT,
tableId,
columns,
1, 11);
req->SetLockId(lockTxId1, lockNodeId);
auto result = Write(runtime, sender, shard1, std::move(req));
// Exactly one lock is expected back.
UNIT_ASSERT_VALUES_EQUAL(result.GetTxLocks().size(), 1u);
lock1shard1 = result.GetTxLocks().at(0);
// NOTE(review): presumably large counter values denote a broken/error lock —
// confirm the meaning of the 1000 threshold.
UNIT_ASSERT_C(lock1shard1.GetCounter() < 1000, "Unexpected lock in the result: " << lock1shard1.ShortDebugString());
}

// 2. Compact and reboot the tablet
Cerr << "... compacting shard " << shard1 << Endl;
CompactTable(runtime, shard1, tableId, false);
Cerr << "... rebooting shard " << shard1 << Endl;
RebootTablet(runtime, shard1, sender);
// Give the rebooted tablet one second of simulated time.
runtime.SimulateSleep(TDuration::Seconds(1));

// 3. Prepare a distributed write (single shard for simplicity)
ui64 txId1 = 1234567890011;
// Dedicated edge actor so that tx1 replies can be grabbed separately in step 4.
auto tx1sender = runtime.AllocateEdgeActor();
{
// Prepared upsert of key=1, value=22 whose lock operation is Commit.
auto req1 = MakeWriteRequestOneKeyValue(
txId1,
NKikimrDataEvents::TEvWrite::MODE_PREPARE,
NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT,
tableId,
columns,
1, 22);
req1->Record.MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit);

Cerr << "... preparing tx1 at " << shard1 << Endl;
auto res1 = Write(runtime, tx1sender, shard1, std::move(req1));

// Reboot, making sure tx is only loaded after it's planned
// This causes tx to skip conflicts cache and go to execution
// The first attempt to execute will page fault looking for conflicts
// Tx will be released, and will trigger the bug on restore
Cerr << "... rebooting shard " << shard1 << Endl;
RebootTablet(runtime, shard1, sender);
runtime.SimulateSleep(TDuration::Seconds(1));

// Step window reported by the prepare result, passed on to the coordinator.
ui64 minStep = res1.GetMinStep();
ui64 maxStep = res1.GetMaxStep();

Cerr << "... planning tx1 at " << coordinator << Endl;
SendProposeToCoordinator(
runtime, tx1sender, { shard1 }, {
.TxId = txId1,
.Coordinator = coordinator,
.MinStep = minStep,
.MaxStep = maxStep,
});
}

// 4. Check tx1 reply (it must succeed)
{
Cerr << "... waiting for tx1 result" << Endl;
// Wait for the write result delivered to tx1sender and require completion.
auto ev = runtime.GrabEdgeEventRethrow<NEvents::TDataEvents::TEvWriteResult>(tx1sender);
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetStatus(), NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED);
}
}

} // Y_UNIT_TEST_SUITE
} // namespace NKikimr
3 changes: 2 additions & 1 deletion ydb/core/tx/datashard/datashard_write_operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -416,8 +416,9 @@ TValidatedWriteTx::TPtr TWriteOperation::BuildWriteTx(TDataShard* self)
void TWriteOperation::ReleaseTxData(NTabletFlatExecutor::TTxMemoryProviderBase& provider) {
ReleasedTxDataSize = provider.GetMemoryLimit() + provider.GetRequestedMemory();

if (!WriteTx || IsTxDataReleased())
if (!WriteTx || WriteTx->GetIsReleased()) {
return;
}

WriteTx->ReleaseTxData();
// Immediate transactions have no body stored.
Expand Down

0 comments on commit 61a587d

Please sign in to comment.