Skip to content

Commit

Permalink
24-3-11-hotfix: Fix bulk operations breaking frozen locks (#12020)
Browse files Browse the repository at this point in the history
  • Loading branch information
snaury authored Nov 27, 2024
1 parent 0f089bd commit 47026cc
Show file tree
Hide file tree
Showing 3 changed files with 195 additions and 1 deletion.
10 changes: 10 additions & 0 deletions ydb/core/tx/datashard/datashard__op_rows.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,11 @@ void TDataShard::Handle(TEvDataShard::TEvUploadRowsRequest::TPtr& ev, const TAct
UpdateProposeQueueSize();
return;
}
if (Pipeline.HasProposeDelayers()) {
DelayedProposeQueue.emplace_back().Reset(ev.Release());
UpdateProposeQueueSize();
return;
}
if (IsReplicated()) {
return Reject<TEvDataShard::TEvUploadRowsResponse>(this, ev, "bulk upsert",
ERejectReasons::WrongState, "Can't execute bulk upsert at replicated table", &ReadOnly, ctx, TDataShard::ELogThrottlerType::UploadRows_Reject);
Expand All @@ -237,6 +242,11 @@ void TDataShard::Handle(TEvDataShard::TEvEraseRowsRequest::TPtr& ev, const TActo
UpdateProposeQueueSize();
return;
}
if (Pipeline.HasProposeDelayers()) {
DelayedProposeQueue.emplace_back().Reset(ev.Release());
UpdateProposeQueueSize();
return;
}
if (IsReplicated()) {
return Reject<TEvDataShard::TEvEraseRowsResponse>(this, ev, "erase",
ERejectReasons::WrongState, "Can't execute erase at replicated table", &ExecError, ctx, TDataShard::ELogThrottlerType::EraseRows_Reject);
Expand Down
184 changes: 184 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3977,6 +3977,190 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
"{ items { uint32_value: 4 } items { uint32_value: 40 } }");
}

Y_UNIT_TEST(UncommittedWriteRestartDuringCommitThenBulkErase) {
NKikimrConfig::TAppConfig app;

TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
.SetDomainPlanResolution(100)
.SetAppConfig(app)
// Bug was with non-volatile transactions
.SetEnableDataShardVolatileTransactions(false);

Tests::TServer::TPtr server = new TServer(serverSettings);
auto &runtime = *server->GetRuntime();
auto sender = runtime.AllocateEdgeActor();

runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
runtime.SetLogPriority(NKikimrServices::KQP_EXECUTER, NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::KQP_SESSION, NLog::PRI_TRACE);

InitRoot(server, sender);

TDisableDataShardLogBatching disableDataShardLogBatching;
UNIT_ASSERT_VALUES_EQUAL(
KqpSchemeExec(runtime, R"(
CREATE TABLE `/Root/table` (key Uint32, value Uint32, PRIMARY KEY (key))
WITH (PARTITION_AT_KEYS = (5));
)"),
"SUCCESS");

// Insert some initial data
ExecSQL(server, sender, "UPSERT INTO `/Root/table` (key, value) VALUES (1, 10), (5, 50);");

const auto shards = GetTableShards(server, sender, "/Root/table");
const auto tableId = ResolveTableId(server, sender, "/Root/table");
UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u);

TString sessionId, txId;

// Start inserting a couple of rows into the table
Cerr << "... sending initial upsert" << Endl;
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleBegin(runtime, sessionId, txId, R"(
SELECT key, value FROM `/Root/table` WHERE key = 1;
UPSERT INTO `/Root/table` (key, value) VALUES (2, 20), (6, 60);
)"),
"{ items { uint32_value: 1 } items { uint32_value: 10 } }");

// We want to block readsets next
std::vector<std::unique_ptr<IEventHandle>> readSets;
auto blockReadSets = runtime.AddObserver<TEvTxProcessing::TEvReadSet>([&](TEvTxProcessing::TEvReadSet::TPtr& ev) {
readSets.emplace_back(ev.Release());
});

// Start committing an additional read/write
// Note: select on the table flushes accumulated changes first
Cerr << "... sending commit request" << Endl;
auto commitFuture = SendRequest(runtime, MakeSimpleRequestRPC(R"(
SELECT key, value FROM `/Root/table` ORDER BY key;
)", sessionId, txId, /* commitTx */ true));

WaitFor(runtime, [&]{ return readSets.size() >= 2; }, "readset exchange");
UNIT_ASSERT_VALUES_EQUAL(readSets.size(), 2u);

// We want to make sure we block the first progress message when shards reboot
std::vector<TActorId> shardActors(shards.size());
UNIT_ASSERT_VALUES_EQUAL(shardActors.size(), 2u);
std::vector<std::unique_ptr<IEventHandle>> blockedProgress;
auto blockProgressQueue = runtime.AddObserver([&](TAutoPtr<IEventHandle>& ev) noexcept {
switch (ev->GetTypeRewrite()) {
case TEvTablet::TEvBoot::EventType: {
auto* msg = ev->Get<TEvTablet::TEvBoot>();
Cerr << "... observed TEvBoot for " << msg->TabletID << " at " << ev->GetRecipientRewrite() << Endl;
auto it = std::find(shards.begin(), shards.end(), msg->TabletID);
if (it != shards.end()) {
shardActors.at(it - shards.begin()) = ev->GetRecipientRewrite();
}
break;
}
case EventSpaceBegin(TKikimrEvents::ES_PRIVATE) + 0 /* EvProgressTransaction */: {
auto it = std::find(shardActors.begin(), shardActors.end(), ev->GetRecipientRewrite());
if (it != shardActors.end()) {
ui64 shardId = shards.at(it - shardActors.begin());
Cerr << "... blocking TEvProgressTranasction at " << ev->GetRecipientRewrite() << " shard " << shardId << Endl;
blockedProgress.emplace_back(ev.Release());
return;
}
break;
}
}
});

// Clear old readsets and reboot both shards with TEvPoison
// This way shards don't have a chance to reply causing an UNDETERMINED error
readSets.clear();
for (ui64 shardId : shards) {
Cerr << "... sending TEvPoison to " << shardId << Endl;
ForwardToTablet(runtime, shardId, sender, new TEvents::TEvPoison);
}

// Note: we cannot wait for the commit result, since KQP is blocked trying to abort

// Sleep a little to make sure everything settles
Cerr << "... sleeping for 1 second" << Endl;
runtime.SimulateSleep(TDuration::Seconds(1));

UNIT_ASSERT_VALUES_EQUAL(readSets.size(), 2u);
UNIT_ASSERT_VALUES_EQUAL(blockedProgress.size(), 2u);

// Send an erase rows request before the progress queue resumes
{
Cerr << "... sending TEvEraseRowsRequest to shard 1 for key 1" << Endl;
auto req = std::make_unique<TEvDataShard::TEvEraseRowsRequest>();
req->Record.SetTableId(tableId.PathId.LocalPathId);
req->Record.SetSchemaVersion(tableId.SchemaVersion);
req->Record.AddKeyColumnIds(1);
ui32 key = 1;
TCell keyCell = TCell::Make(key);
req->Record.AddKeyColumns(TSerializedCellVec::Serialize(TArrayRef<const TCell>(&keyCell, 1)));
runtime.Send(new IEventHandle(shardActors.at(0), sender, req.release()), 0, true);
// Give shard 1 a chance to process this request incorrectly
Cerr << "... sleeping for 1 second" << Endl;
runtime.SimulateSleep(TDuration::Seconds(1));
}

// Unblock progress queue and resend blocked messages
Cerr << "... resending progress queue" << Endl;
blockProgressQueue.Remove();
for (auto& ev : blockedProgress) {
runtime.Send(ev.release(), 0, true);
}
blockedProgress.clear();

// This insert must run after the currently committing transaction, so it must fail: either read happens before
// the commit and is broken later by the commit, or the read finds a duplicate row and insert fails. Due to a
// bug the commit lock might already be broken, causing conflicts not to work properly, and allowing the insert
// to overwrite key = 2.
Cerr << "... sending an insert" << Endl;
auto insertFuture = KqpSimpleSend(runtime, R"(
INSERT INTO `/Root/table` (key, value) VALUES (2, 22);
)");

// Sleep a little to make sure everything settles
Cerr << "... sleeping for 1 second" << Endl;
runtime.SimulateSleep(TDuration::Seconds(1));

// Unblock readsets letting transaction to complete
Cerr << "... resending readsets" << Endl;
blockReadSets.Remove();
for (auto& ev : readSets) {
runtime.Send(ev.release(), 0, true);
}
readSets.clear();

// Sleep a little to make sure everything settles
Cerr << "... sleeping for 1 second" << Endl;
runtime.SimulateSleep(TDuration::Seconds(1));

// We expect erase to succeed by this point
Cerr << "... checking the erase result" << Endl;
{
auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvEraseRowsResponse>(sender);
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetStatus(), NKikimrTxDataShard::TEvEraseRowsResponse::OK);
}

// We expect commit to fail with an UNDETERMINED error
Cerr << "... checking the commit result" << Endl;
UNIT_ASSERT_VALUES_EQUAL(
FormatResult(AwaitResponse(runtime, std::move(commitFuture))),
"ERROR: UNDETERMINED");

// Now make a read query, we must not observe any partial commits
Cerr << "... checking final table state" << Endl;
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleExec(runtime, R"(
SELECT key, value FROM `/Root/table`
ORDER BY key;
)"),
"{ items { uint32_value: 2 } items { uint32_value: 20 } }, "
"{ items { uint32_value: 5 } items { uint32_value: 50 } }, "
"{ items { uint32_value: 6 } items { uint32_value: 60 } }");
}

/**
* This observer forces newly created nodes to start on particular nodes
*/
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/locks/locks.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ enum class ELockRangeFlags : ui8 {
using ELockRangeFlagsRaw = std::underlying_type<ELockRangeFlags>::type;

inline ELockRangeFlags operator|(ELockRangeFlags a, ELockRangeFlags b) { return ELockRangeFlags(ELockRangeFlagsRaw(a) | ELockRangeFlagsRaw(b)); }
inline ELockRangeFlags operator&(ELockRangeFlags a, ELockRangeFlags b) { return ELockRangeFlags(ELockRangeFlagsRaw(a) | ELockRangeFlagsRaw(b)); }
inline ELockRangeFlags operator&(ELockRangeFlags a, ELockRangeFlags b) { return ELockRangeFlags(ELockRangeFlagsRaw(a) & ELockRangeFlagsRaw(b)); }
inline ELockRangeFlags& operator|=(ELockRangeFlags& a, ELockRangeFlags b) { return a = a | b; }
inline ELockRangeFlags& operator&=(ELockRangeFlags& a, ELockRangeFlags b) { return a = a & b; }
inline bool operator!(ELockRangeFlags c) { return ELockRangeFlagsRaw(c) == 0; }
Expand Down

0 comments on commit 47026cc

Please sign in to comment.