// Patch summary (git unified diff touching five files). This header is placed
// before the first "diff --git" line so that no hunk content is altered.
//
// NOTE(review): the diff text below has no line breaks between hunk lines and
// some template argument lists look stripped (e.g. "std::vector> readSets",
// "std::tuple TDataShard::CalculateFollowerReadEdge") -- presumably an
// extraction artifact; confirm against the original patch before applying.
//
// 1. ydb/core/tablet/tablet_resolver.cpp
//    In the branch that switches an entry to TEntry::StProblemPing, the entry
//    now also takes msg->Followers (moved) into entry.KnownFollowers. The
//    assignment of entry.State now precedes the assignment of
//    entry.KnownLeaderTablet; SendPing() is still called afterwards.
//
// 2. ydb/core/tx/datashard/datashard_ut_volatile.cpp
//    Adds Y_UNIT_TEST(DistributedOutOfOrderFollowerConsistency):
//      - creates table-1 and table-2 with Shards(1) and Followers(1), and
//        upserts (1, 1) and (2, 2);
//      - installs an observer that moves every TEvReadSet event into
//        `readSets`, starts a distributed upsert of (3, 3) / (4, 4), and waits
//        until at least 4 events were captured;
//      - removes the observer and runs a second distributed upsert of
//        (5, 5) / (6, 6) through ExecSQL;
//      - asserts that KqpSimpleStaleRoExec returns only the (1, 1) row for
//        table-1 and only the (2, 2) row for table-2 while the captured
//        readsets are still held;
//      - re-sends the captured readsets, sleeps, and asserts that each table
//        then returns all three of its rows.
//
// 3. ydb/core/tx/datashard/follower_edge.cpp
//    TDataShard::CalculateFollowerReadEdge() now samples
//    VolatileTxManager.GetMinUncertainVersion(). When the plan queue is
//    non-empty it returns the Min of that version and the first planned
//    operation's (Step, TxId) version. When the plan queue is empty but the
//    uncertain version is not Max, it returns { volatileUncertain, false, 0 }.
//
// 4. ydb/core/tx/datashard/volatile_tx.cpp
//    GetMinUncertainVersion() is sampled before a volatile transaction is
//    erased from VolatileTxByCommitTxId / VolatileTxByVersion / VolatileTxs;
//    if the value is greater after the erase, Self->PromoteFollowerReadEdge()
//    is called.
//
// 5. ydb/core/tx/datashard/volatile_tx.h
//    Adds GetMinUncertainVersion() const: returns the Version of the first
//    element of VolatileTxByVersion, or TRowVersion::Max() when that
//    container is empty.
diff --git a/ydb/core/tablet/tablet_resolver.cpp b/ydb/core/tablet/tablet_resolver.cpp index 5c5e5bdc0d19..e6c1508d9007 100644 --- a/ydb/core/tablet/tablet_resolver.cpp +++ b/ydb/core/tablet/tablet_resolver.cpp @@ -649,8 +649,9 @@ class TTabletResolver : public TActorBootstrapped { if (!(entry.KnownLeaderTablet == msg->CurrentLeaderTablet || !entry.KnownLeaderTablet)) { DropEntry(tabletId, entry, ctx); // got info but not full, occurs on transitional cluster states } else { - entry.KnownLeaderTablet = msg->CurrentLeaderTablet; entry.State = TEntry::StProblemPing; + entry.KnownLeaderTablet = msg->CurrentLeaderTablet; + entry.KnownFollowers = std::move(msg->Followers); SendPing(tabletId, entry, ctx); } } else { diff --git a/ydb/core/tx/datashard/datashard_ut_volatile.cpp b/ydb/core/tx/datashard/datashard_ut_volatile.cpp index 31b6456c8a5c..8ae953f62914 100644 --- a/ydb/core/tx/datashard/datashard_ut_volatile.cpp +++ b/ydb/core/tx/datashard/datashard_ut_volatile.cpp @@ -1992,6 +1992,127 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) { UNIT_ASSERT(splitLatency < TDuration::Seconds(5)); } + Y_UNIT_TEST(DistributedOutOfOrderFollowerConsistency) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetNodeCount(1) + .SetUseRealThreads(false) + .SetEnableDataShardVolatileTransactions(true); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::TABLET_RESOLVER, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::STATESTORAGE, NLog::PRI_TRACE); + + InitRoot(server, sender); + + auto opts = TShardedTableOptions() + .Shards(1) + .Followers(1); + CreateShardedTable(server, sender, "/Root", "table-1", opts); + CreateShardedTable(server, 
sender, "/Root", "table-2", opts); + + runtime.SimulateSleep(TDuration::Seconds(1)); + for (ui64 shard : GetTableShards(server, sender, "/Root/table-1")) { + InvalidateTabletResolverCache(runtime, shard); + } + for (ui64 shard : GetTableShards(server, sender, "/Root/table-2")) { + InvalidateTabletResolverCache(runtime, shard); + } + + ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);"); + ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 2);"); + + // Let followers catch up + runtime.SimulateSleep(TDuration::Seconds(1)); + + // Block readset exchange + std::vector> readSets; + auto blockReadSets = runtime.AddObserver([&](TEvTxProcessing::TEvReadSet::TPtr& ev) { + readSets.emplace_back(ev.Release()); + }); + + // Start a distributed write to both tables + TString sessionId = CreateSessionRPC(runtime, "/Root"); + auto upsertResult = SendRequest( + runtime, + MakeSimpleRequestRPC(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 3); + UPSERT INTO `/Root/table-2` (key, value) VALUES (4, 4); + )", sessionId, /* txId */ "", /* commitTx */ true), + "/Root"); + WaitFor(runtime, [&]{ return readSets.size() >= 4; }, "readsets"); + + // Stop blocking further readsets + blockReadSets.Remove(); + + // Start another distributed write to both tables, it should succeed + ExecSQL(server, sender, R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (5, 5); + UPSERT INTO `/Root/table-2` (key, value) VALUES (6, 6); + )"); + + // Let followers catch up + runtime.SimulateSleep(TDuration::Seconds(1)); + for (ui64 shard : GetTableShards(server, sender, "/Root/table-1")) { + InvalidateTabletResolverCache(runtime, shard); + } + for (ui64 shard : GetTableShards(server, sender, "/Root/table-2")) { + InvalidateTabletResolverCache(runtime, shard); + } + + // Check tables, they shouldn't see inconsistent results with the latest write + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleStaleRoExec(runtime, Q_(R"( + SELECT key, value + FROM 
`/Root/table-1` + ORDER BY key + )"), "/Root"), + "{ items { uint32_value: 1 } items { uint32_value: 1 } }"); + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleStaleRoExec(runtime, Q_(R"( + SELECT key, value + FROM `/Root/table-2` + ORDER BY key + )"), "/Root"), + "{ items { uint32_value: 2 } items { uint32_value: 2 } }"); + + // Unblock readsets + for (auto& ev : readSets) { + ui32 nodeIndex = ev->GetRecipientRewrite().NodeId() - runtime.GetNodeId(0); + runtime.Send(ev.release(), nodeIndex, true); + } + readSets.clear(); + + // Let followers catch up + runtime.SimulateSleep(TDuration::Seconds(1)); + + // Check tables again, they should have all rows visible now + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleStaleRoExec(runtime, Q_(R"( + SELECT key, value + FROM `/Root/table-1` + ORDER BY key + )")), + "{ items { uint32_value: 1 } items { uint32_value: 1 } }, " + "{ items { uint32_value: 3 } items { uint32_value: 3 } }, " + "{ items { uint32_value: 5 } items { uint32_value: 5 } }"); + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleStaleRoExec(runtime, Q_(R"( + SELECT key, value + FROM `/Root/table-2` + ORDER BY key + )")), + "{ items { uint32_value: 2 } items { uint32_value: 2 } }, " + "{ items { uint32_value: 4 } items { uint32_value: 4 } }, " + "{ items { uint32_value: 6 } items { uint32_value: 6 } }"); + } + } // Y_UNIT_TEST_SUITE(DataShardVolatile) } // namespace NKikimr diff --git a/ydb/core/tx/datashard/follower_edge.cpp b/ydb/core/tx/datashard/follower_edge.cpp index 0663f004fd30..69de5fe15bbf 100644 --- a/ydb/core/tx/datashard/follower_edge.cpp +++ b/ydb/core/tx/datashard/follower_edge.cpp @@ -6,13 +6,24 @@ std::tuple TDataShard::CalculateFollowerReadEdge() cons Y_ABORT_UNLESS(!IsFollower()); Y_DEBUG_ABORT_UNLESS(IsMvccEnabled()); + TRowVersion volatileUncertain = VolatileTxManager.GetMinUncertainVersion(); + for (auto order : TransQueue.GetPlan()) { // When we have planned operations we assume the first one may be used // for new writes, so we mark is as non-repeatable. 
We could skip // readonly operations, but there's little benefit in that, and it's // complicated to determine which is the first readable given we may // have executed some out of order. - return { TRowVersion(order.Step, order.TxId), false, 0 }; + return { Min(volatileUncertain, TRowVersion(order.Step, order.TxId)), false, 0 }; + } + + if (!volatileUncertain.IsMax()) { + // We have some uncertainty in an unresolved volatile commit + // Allow followers to read from it in non-repeatable snapshot modes + // FIXME: when at least one write is committed at this version, it + // should stop being non-repeatable, and followers need to resolve + // other possibly out-of-order commits. + return { volatileUncertain, false, 0 }; } // This is the max version where we had any writes diff --git a/ydb/core/tx/datashard/volatile_tx.cpp b/ydb/core/tx/datashard/volatile_tx.cpp index 470fa7ef44cb..35af1fd255fb 100644 --- a/ydb/core/tx/datashard/volatile_tx.cpp +++ b/ydb/core/tx/datashard/volatile_tx.cpp @@ -598,12 +598,18 @@ namespace NKikimr::NDataShard { UnblockWaitingRemovalOperations(info); + TRowVersion prevUncertain = GetMinUncertainVersion(); + for (ui64 commitTxId : info->CommitTxIds) { VolatileTxByCommitTxId.erase(commitTxId); } VolatileTxByVersion.erase(info); VolatileTxs.erase(txId); + if (prevUncertain < GetMinUncertainVersion()) { + Self->PromoteFollowerReadEdge(); + } + if (!WaitingSnapshotEvents.empty()) { TVolatileTxInfo* next = !VolatileTxByVersion.empty() ? 
*VolatileTxByVersion.begin() : nullptr; while (!WaitingSnapshotEvents.empty()) { diff --git a/ydb/core/tx/datashard/volatile_tx.h b/ydb/core/tx/datashard/volatile_tx.h index ca7c05e20581..ab7533038e1c 100644 --- a/ydb/core/tx/datashard/volatile_tx.h +++ b/ydb/core/tx/datashard/volatile_tx.h @@ -197,6 +197,14 @@ namespace NKikimr::NDataShard { return !VolatileTxByVersion.empty() && (*VolatileTxByVersion.begin())->Version <= snapshot; } + TRowVersion GetMinUncertainVersion() const { + if (!VolatileTxByVersion.empty()) { + return (*VolatileTxByVersion.begin())->Version; + } else { + return TRowVersion::Max(); + } + } + void PersistAddVolatileTx( ui64 txId, const TRowVersion& version, TConstArrayRef commitTxIds,