Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix EvWrite to release memory correctly #9809

Merged
merged 1 commit into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1586,5 +1586,106 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
}
}

Y_UNIT_TEST(PreparedDistributedWritePageFault) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
.SetEnableDataShardVolatileTransactions(false);

auto [runtime, server, sender] = TestCreateServer(serverSettings);

TDisableDataShardLogBatching disableDataShardLogBatching;

// Use a policy without levels and very small page sizes, effectively making each row on its own page
NLocalDb::TCompactionPolicyPtr policy = NLocalDb::CreateDefaultTablePolicy();
policy->MinDataPageSize = 1;

auto opts = TShardedTableOptions()
.Columns({{"key", "Int32", true, false},
{"value", "Int32", false, false}})
.Policy(policy.Get());
const auto& columns = opts.Columns_;
auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table", opts);
UNIT_ASSERT_VALUES_EQUAL(shards.size(), 1u);

const ui64 coordinator = ChangeStateStorage(Coordinator, server->GetSettings().Domain);

const ui64 lockTxId1 = 1234567890001;
const ui64 lockNodeId = runtime.GetNodeId(0);
NLongTxService::TLockHandle lockHandle1(lockTxId1, runtime.GetActorSystem(0));

auto shard1 = shards.at(0);
NKikimrDataEvents::TLock lock1shard1;

// 1. Make an uncommitted write (lock1 shard1)
{
Cerr << "... making an uncommmited write to " << shard1 << Endl;
auto req = MakeWriteRequestOneKeyValue(
std::nullopt,
NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE,
NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT,
tableId,
columns,
1, 11);
req->SetLockId(lockTxId1, lockNodeId);
auto result = Write(runtime, sender, shard1, std::move(req));
UNIT_ASSERT_VALUES_EQUAL(result.GetTxLocks().size(), 1u);
lock1shard1 = result.GetTxLocks().at(0);
UNIT_ASSERT_C(lock1shard1.GetCounter() < 1000, "Unexpected lock in the result: " << lock1shard1.ShortDebugString());
}

// 2. Compact and reboot the tablet
Cerr << "... compacting shard " << shard1 << Endl;
CompactTable(runtime, shard1, tableId, false);
Cerr << "... rebooting shard " << shard1 << Endl;
RebootTablet(runtime, shard1, sender);
runtime.SimulateSleep(TDuration::Seconds(1));

// 3. Prepare a distributed write (single shard for simplicity)
ui64 txId1 = 1234567890011;
auto tx1sender = runtime.AllocateEdgeActor();
{
auto req1 = MakeWriteRequestOneKeyValue(
txId1,
NKikimrDataEvents::TEvWrite::MODE_PREPARE,
NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT,
tableId,
columns,
1, 22);
req1->Record.MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit);

Cerr << "... preparing tx1 at " << shard1 << Endl;
auto res1 = Write(runtime, tx1sender, shard1, std::move(req1));

// Reboot, making sure tx is only loaded after it's planned
// This causes tx to skip conflicts cache and go to execution
// The first attempt to execute will page fault looking for conflicts
// Tx will be released, and will trigger the bug on restore
Cerr << "... rebooting shard " << shard1 << Endl;
RebootTablet(runtime, shard1, sender);
runtime.SimulateSleep(TDuration::Seconds(1));

ui64 minStep = res1.GetMinStep();
ui64 maxStep = res1.GetMaxStep();

Cerr << "... planning tx1 at " << coordinator << Endl;
SendProposeToCoordinator(
runtime, tx1sender, { shard1 }, {
.TxId = txId1,
.Coordinator = coordinator,
.MinStep = minStep,
.MaxStep = maxStep,
});
}

// 4. Check tx1 reply (it must succeed)
{
Cerr << "... waiting for tx1 result" << Endl;
auto ev = runtime.GrabEdgeEventRethrow<NEvents::TDataEvents::TEvWriteResult>(tx1sender);
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetStatus(), NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED);
}
}

} // Y_UNIT_TEST_SUITE
} // namespace NKikimr
3 changes: 2 additions & 1 deletion ydb/core/tx/datashard/datashard_write_operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -416,8 +416,9 @@ TValidatedWriteTx::TPtr TWriteOperation::BuildWriteTx(TDataShard* self)
void TWriteOperation::ReleaseTxData(NTabletFlatExecutor::TTxMemoryProviderBase& provider) {
ReleasedTxDataSize = provider.GetMemoryLimit() + provider.GetRequestedMemory();

if (!WriteTx || IsTxDataReleased())
if (!WriteTx || WriteTx->GetIsReleased()) {
return;
}

WriteTx->ReleaseTxData();
// Immediate transactions have no body stored.
Expand Down
Loading