Skip to content

Commit

Permalink
Make sure out-of-order volatile commits are not visible on followers …
Browse files Browse the repository at this point in the history
…KIKIMR-20853 (#1344)
  • Loading branch information
snaury authored Jan 26, 2024
1 parent 7ebcfd0 commit 169272c
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 2 deletions.
3 changes: 2 additions & 1 deletion ydb/core/tablet/tablet_resolver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -649,8 +649,9 @@ class TTabletResolver : public TActorBootstrapped<TTabletResolver> {
if (!(entry.KnownLeaderTablet == msg->CurrentLeaderTablet || !entry.KnownLeaderTablet)) {
DropEntry(tabletId, entry, ctx); // got info but not full, occurs on transitional cluster states
} else {
entry.KnownLeaderTablet = msg->CurrentLeaderTablet;
entry.State = TEntry::StProblemPing;
entry.KnownLeaderTablet = msg->CurrentLeaderTablet;
entry.KnownFollowers = std::move(msg->Followers);
SendPing(tabletId, entry, ctx);
}
} else {
Expand Down
121 changes: 121 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_volatile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1992,6 +1992,127 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
UNIT_ASSERT(splitLatency < TDuration::Seconds(5));
}

// Scenario: a distributed write to two tables is left unresolved by holding
// back its readsets, then a second distributed write to the same tables is
// executed. Stale read-only queries must keep returning only the initially
// seeded rows until the held readsets are delivered, after which all rows
// from both writes must be visible.
// NOTE(review): stale read-only queries are presumably served by the
// followers configured below -- confirm against KqpSimpleStaleRoExec.
Y_UNIT_TEST(DistributedOutOfOrderFollowerConsistency) {
// Single-node server without real threads and with volatile datashard
// transactions enabled.
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetNodeCount(1)
.SetUseRealThreads(false)
.SetEnableDataShardVolatileTransactions(true);

Tests::TServer::TPtr server = new TServer(serverSettings);
auto &runtime = *server->GetRuntime();
auto sender = runtime.AllocateEdgeActor();

// Raise log verbosity for the services exercised by this test.
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
runtime.SetLogPriority(NKikimrServices::TABLET_RESOLVER, NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::STATESTORAGE, NLog::PRI_TRACE);

InitRoot(server, sender);

// Two tables, each with one shard and one follower.
auto opts = TShardedTableOptions()
.Shards(1)
.Followers(1);
CreateShardedTable(server, sender, "/Root", "table-1", opts);
CreateShardedTable(server, sender, "/Root", "table-2", opts);

// After a simulated pause, invalidate the tablet resolver cache for every
// shard of both tables.
// NOTE(review): presumably this forces re-resolution so that followers are
// discovered -- confirm.
runtime.SimulateSleep(TDuration::Seconds(1));
for (ui64 shard : GetTableShards(server, sender, "/Root/table-1")) {
InvalidateTabletResolverCache(runtime, shard);
}
for (ui64 shard : GetTableShards(server, sender, "/Root/table-2")) {
InvalidateTabletResolverCache(runtime, shard);
}

// Seed one row per table; these rows are expected in every read below.
ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);");
ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 2);");

// Let followers catch up
runtime.SimulateSleep(TDuration::Seconds(1));

// Block readset exchange
// The observer takes every TEvReadSet event handle out of `ev` and keeps it
// in `readSets` so it can be re-sent later.
std::vector<std::unique_ptr<IEventHandle>> readSets;
auto blockReadSets = runtime.AddObserver<TEvTxProcessing::TEvReadSet>([&](TEvTxProcessing::TEvReadSet::TPtr& ev) {
readSets.emplace_back(ev.Release());
});

// Start a distributed write to both tables
// `upsertResult` is not consumed anywhere later in this test; the request
// is only started here.
TString sessionId = CreateSessionRPC(runtime, "/Root");
auto upsertResult = SendRequest(
runtime,
MakeSimpleRequestRPC(R"(
UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 3);
UPSERT INTO `/Root/table-2` (key, value) VALUES (4, 4);
)", sessionId, /* txId */ "", /* commitTx */ true),
"/Root");
// Proceed only once at least 4 readsets from the first write are captured.
WaitFor(runtime, [&]{ return readSets.size() >= 4; }, "readsets");

// Stop blocking further readsets
blockReadSets.Remove();

// Start another distributed write to both tables, it should succeed
ExecSQL(server, sender, R"(
UPSERT INTO `/Root/table-1` (key, value) VALUES (5, 5);
UPSERT INTO `/Root/table-2` (key, value) VALUES (6, 6);
)");

// Let followers catch up
runtime.SimulateSleep(TDuration::Seconds(1));
for (ui64 shard : GetTableShards(server, sender, "/Root/table-1")) {
InvalidateTabletResolverCache(runtime, shard);
}
for (ui64 shard : GetTableShards(server, sender, "/Root/table-2")) {
InvalidateTabletResolverCache(runtime, shard);
}

// Check tables, they shouldn't see inconsistent results with the latest write
// Only the seeded rows are expected: neither the held first write (keys 3/4)
// nor the second write (keys 5/6) may appear in the results yet.
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleStaleRoExec(runtime, Q_(R"(
SELECT key, value
FROM `/Root/table-1`
ORDER BY key
)"), "/Root"),
"{ items { uint32_value: 1 } items { uint32_value: 1 } }");
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleStaleRoExec(runtime, Q_(R"(
SELECT key, value
FROM `/Root/table-2`
ORDER BY key
)"), "/Root"),
"{ items { uint32_value: 2 } items { uint32_value: 2 } }");

// Unblock readsets
// Each captured event is re-sent on the node index derived from its
// recipient's node id relative to the id of node 0; release() hands the raw
// handle over to runtime.Send.
// NOTE(review): the meaning of the trailing `true` argument is not visible
// here -- confirm against the test runtime's Send signature.
for (auto& ev : readSets) {
ui32 nodeIndex = ev->GetRecipientRewrite().NodeId() - runtime.GetNodeId(0);
runtime.Send(ev.release(), nodeIndex, true);
}
readSets.clear();

// Let followers catch up
runtime.SimulateSleep(TDuration::Seconds(1));

// Check tables again, they should have all rows visible now
// NOTE(review): unlike the earlier checks, these calls omit the explicit
// "/Root" argument -- presumably a default applies; confirm.
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleStaleRoExec(runtime, Q_(R"(
SELECT key, value
FROM `/Root/table-1`
ORDER BY key
)")),
"{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
"{ items { uint32_value: 3 } items { uint32_value: 3 } }, "
"{ items { uint32_value: 5 } items { uint32_value: 5 } }");
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleStaleRoExec(runtime, Q_(R"(
SELECT key, value
FROM `/Root/table-2`
ORDER BY key
)")),
"{ items { uint32_value: 2 } items { uint32_value: 2 } }, "
"{ items { uint32_value: 4 } items { uint32_value: 4 } }, "
"{ items { uint32_value: 6 } items { uint32_value: 6 } }");
}

} // Y_UNIT_TEST_SUITE(DataShardVolatile)

} // namespace NKikimr
13 changes: 12 additions & 1 deletion ydb/core/tx/datashard/follower_edge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,24 @@ std::tuple<TRowVersion, bool, ui64> TDataShard::CalculateFollowerReadEdge() cons
Y_ABORT_UNLESS(!IsFollower());
Y_DEBUG_ABORT_UNLESS(IsMvccEnabled());

TRowVersion volatileUncertain = VolatileTxManager.GetMinUncertainVersion();

for (auto order : TransQueue.GetPlan()) {
// When we have planned operations we assume the first one may be used
// for new writes, so we mark it as non-repeatable. We could skip
// readonly operations, but there's little benefit in that, and it's
// complicated to determine which is the first readable given we may
// have executed some out of order.
return { TRowVersion(order.Step, order.TxId), false, 0 };
return { Min(volatileUncertain, TRowVersion(order.Step, order.TxId)), false, 0 };
}

if (!volatileUncertain.IsMax()) {
// We have some uncertainty in an unresolved volatile commit
// Allow followers to read from it in non-repeatable snapshot modes
// FIXME: when at least one write is committed at this version, it
// should stop being non-repeatable, and followers need to resolve
// other possibly out-of-order commits.
return { volatileUncertain, false, 0 };
}

// This is the max version where we had any writes
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/tx/datashard/volatile_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -598,12 +598,18 @@ namespace NKikimr::NDataShard {

UnblockWaitingRemovalOperations(info);

TRowVersion prevUncertain = GetMinUncertainVersion();

for (ui64 commitTxId : info->CommitTxIds) {
VolatileTxByCommitTxId.erase(commitTxId);
}
VolatileTxByVersion.erase(info);
VolatileTxs.erase(txId);

if (prevUncertain < GetMinUncertainVersion()) {
Self->PromoteFollowerReadEdge();
}

if (!WaitingSnapshotEvents.empty()) {
TVolatileTxInfo* next = !VolatileTxByVersion.empty() ? *VolatileTxByVersion.begin() : nullptr;
while (!WaitingSnapshotEvents.empty()) {
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/tx/datashard/volatile_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,14 @@ namespace NKikimr::NDataShard {
return !VolatileTxByVersion.empty() && (*VolatileTxByVersion.begin())->Version <= snapshot;
}

// Returns the Version of the first entry in VolatileTxByVersion, or
// TRowVersion::Max() when that container is empty.
// NOTE(review): presumably VolatileTxByVersion is ordered by version, which
// would make the first entry the minimum -- confirm against its declaration.
TRowVersion GetMinUncertainVersion() const {
    // Nothing is tracked: report the maximum version as the sentinel.
    if (VolatileTxByVersion.empty()) {
        return TRowVersion::Max();
    }

    const auto first = VolatileTxByVersion.begin();
    return (*first)->Version;
}

void PersistAddVolatileTx(
ui64 txId, const TRowVersion& version,
TConstArrayRef<ui64> commitTxIds,
Expand Down

0 comments on commit 169272c

Please sign in to comment.