From 66baaebb3d74805c4001de37f243d593c2c2e35d Mon Sep 17 00:00:00 2001 From: Alexander Rutkovsky Date: Tue, 5 Nov 2024 09:25:29 +0300 Subject: [PATCH] Fix VDisk replication token handling, add some extra checks and log points (merge from main #10371) (#11225) --- .../vdisk/repl/blobstorage_repl.cpp | 25 ++++++++++++++++++- .../vdisk/repl/blobstorage_replproxy.cpp | 22 ++++++++++++---- 2 files changed, 41 insertions(+), 6 deletions(-) diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp index 45f7f4250a02..b1b4e794e86e 100644 --- a/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp +++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp @@ -174,6 +174,8 @@ namespace NKikimr { TEvResumeForce *ResumeForceToken = nullptr; TInstant ReplicationEndTime; bool UnrecoveredNonphantomBlobs = false; + bool RequestedReplicationToken = false; + bool HoldingReplicationToken = false; TWatchdogTimer ReplProgressWatchdog; @@ -287,6 +289,12 @@ namespace NKikimr { case Plan: // this is a first quantum of replication, so we have to register it in the broker State = AwaitToken; + Y_DEBUG_ABORT_UNLESS(!RequestedReplicationToken); + if (RequestedReplicationToken) { + STLOG(PRI_CRIT, BS_REPL, BSVR38, ReplCtx->VCtx->VDiskLogPrefix << "excessive replication token requested"); + break; + } + RequestedReplicationToken = true; if (!Send(MakeBlobStorageReplBrokerID(), new TEvQueryReplToken(ReplCtx->VDiskCfg->BaseInfo.PDiskId))) { HandleReplToken(); } @@ -303,6 +311,10 @@ namespace NKikimr { } void HandleReplToken() { + Y_ABORT_UNLESS(RequestedReplicationToken); + RequestedReplicationToken = false; + HoldingReplicationToken = true; + // switch to replication state Transition(AwaitToken, Replication); if (!ResumeIfReady()) { @@ -408,6 +420,9 @@ namespace NKikimr { if (State == WaitQueues || State == Replication) { // release token as we have finished replicating Send(MakeBlobStorageReplBrokerID(), new TEvReleaseReplToken); + Y_DEBUG_ABORT_UNLESS(!RequestedReplicationToken); + Y_DEBUG_ABORT_UNLESS(HoldingReplicationToken); + HoldingReplicationToken = false; } ResetReplProgressTimer(true); @@ -635,7 +650,15 @@ namespace NKikimr { // return replication token if we have one if (State == AwaitToken || State == WaitQueues || State == Replication) { - Send(MakeBlobStorageReplBrokerID(), new TEvReleaseReplToken); + Y_DEBUG_ABORT_UNLESS(RequestedReplicationToken || HoldingReplicationToken); + if (RequestedReplicationToken || HoldingReplicationToken) { + Send(MakeBlobStorageReplBrokerID(), new TEvReleaseReplToken); + } + } else { + Y_DEBUG_ABORT_UNLESS(!RequestedReplicationToken && !HoldingReplicationToken); + if (RequestedReplicationToken || HoldingReplicationToken) { + STLOG(PRI_CRIT, BS_REPL, BSVR37, ReplCtx->VCtx->VDiskLogPrefix << "stuck replication token"); + } } if (ReplJobActorId) { diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_replproxy.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_replproxy.cpp index b37ce712e0c5..0a8569e1241f 100644 --- a/ydb/core/blobstorage/vdisk/repl/blobstorage_replproxy.cpp +++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_replproxy.cpp @@ -129,6 +129,7 @@ namespace NKikimr { ui64 NextReceiveCookie; TResultQueue ResultQueue; std::shared_ptr Tracker = std::make_shared(); + bool Terminated = false; TQueue> SchedulerRequestQ; THashMap RequestTokens; @@ -227,9 +228,7 @@ namespace NKikimr { PrefetchDataSize = 0; RequestFromVDiskProxyPending = false; if (Finished) { - Send(MakeBlobStorageReplBrokerID(), new TEvPruneQueue); - RequestTokens.clear(); - return PassAway(); // TODO(alexvru): check correctness of invocations + return PassAway(); } } // send request(s) if prefetch queue is not full @@ -297,6 +296,9 @@ namespace NKikimr { if (msg->Record.GetCookie() == NextReceiveCookie) { ui64 cookie = NextReceiveCookie; ProcessResult(msg); + if (Terminated) { + return; + } ReleaseMemToken(cookie); while (!ResultQueue.empty()) { const TQueueItem& top = ResultQueue.top(); @@ -305,6 +307,9 @@ namespace NKikimr { } ui64 cookie = NextReceiveCookie; ProcessResult(top.get()); + if (Terminated) { + return; + } ReleaseMemToken(cookie); ResultQueue.pop(); } @@ -314,6 +319,7 @@ namespace NKikimr { } void ReleaseMemToken(ui64 cookie) { + Y_ABORT_UNLESS(!Terminated); if (RequestTokens) { auto it = RequestTokens.find(cookie); Y_ABORT_UNLESS(it != RequestTokens.end()); @@ -428,6 +434,13 @@ namespace NKikimr { } } + void PassAway() override { + Y_ABORT_UNLESS(!Terminated); + Terminated = true; + Send(MakeBlobStorageReplBrokerID(), new TEvPruneQueue); + TActorBootstrapped::PassAway(); + } + STRICT_STFUNC(StateFunc, hFunc(TEvReplProxyNext, Handle) hFunc(TEvReplMemToken, Handle) @@ -446,8 +459,7 @@ namespace NKikimr { TTrackableVector&& ids, const TVDiskID& vdiskId, const TActorId& serviceId) - : TActorBootstrapped() - , ReplCtx(std::move(replCtx)) + : ReplCtx(std::move(replCtx)) , GType(ReplCtx->VCtx->Top->GType) , Ids(std::move(ids)) , VDiskId(vdiskId)