Skip to content

Commit

Permalink
Fix VDisk replication token handling, add some extra checks and log points (merge from main #10371) (#11225)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexvru authored Nov 5, 2024
1 parent cc3d9d3 commit 66baaeb
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 6 deletions.
25 changes: 24 additions & 1 deletion ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ namespace NKikimr {
TEvResumeForce *ResumeForceToken = nullptr;
TInstant ReplicationEndTime;
bool UnrecoveredNonphantomBlobs = false;
// set right before TEvQueryReplToken is sent to the replication broker and cleared in
// HandleReplToken(); while true, a token request is outstanding
bool RequestedReplicationToken = false;
// set in HandleReplToken() and cleared when replication finishes and TEvReleaseReplToken
// is sent; while true, this actor holds the replication token
bool HoldingReplicationToken = false;

TWatchdogTimer<TEvReplCheckProgress> ReplProgressWatchdog;

Expand Down Expand Up @@ -287,6 +289,12 @@ namespace NKikimr {
case Plan:
// this is a first quantum of replication, so we have to register it in the broker
State = AwaitToken;
Y_DEBUG_ABORT_UNLESS(!RequestedReplicationToken);
if (RequestedReplicationToken) {
STLOG(PRI_CRIT, BS_REPL, BSVR38, ReplCtx->VCtx->VDiskLogPrefix << "excessive replication token requested");
break;
}
RequestedReplicationToken = true;
if (!Send(MakeBlobStorageReplBrokerID(), new TEvQueryReplToken(ReplCtx->VDiskCfg->BaseInfo.PDiskId))) {
HandleReplToken();
}
Expand All @@ -303,6 +311,10 @@ namespace NKikimr {
}

void HandleReplToken() {
Y_ABORT_UNLESS(RequestedReplicationToken);
RequestedReplicationToken = false;
HoldingReplicationToken = true;

// switch to replication state
Transition(AwaitToken, Replication);
if (!ResumeIfReady()) {
Expand Down Expand Up @@ -408,6 +420,9 @@ namespace NKikimr {
if (State == WaitQueues || State == Replication) {
// release token as we have finished replicating
Send(MakeBlobStorageReplBrokerID(), new TEvReleaseReplToken);
Y_DEBUG_ABORT_UNLESS(!RequestedReplicationToken);
Y_DEBUG_ABORT_UNLESS(HoldingReplicationToken);
HoldingReplicationToken = false;
}
ResetReplProgressTimer(true);

Expand Down Expand Up @@ -635,7 +650,15 @@ namespace NKikimr {

// return replication token if we have one
if (State == AwaitToken || State == WaitQueues || State == Replication) {
Send(MakeBlobStorageReplBrokerID(), new TEvReleaseReplToken);
Y_DEBUG_ABORT_UNLESS(RequestedReplicationToken || HoldingReplicationToken);
if (RequestedReplicationToken || HoldingReplicationToken) {
Send(MakeBlobStorageReplBrokerID(), new TEvReleaseReplToken);
}
} else {
Y_DEBUG_ABORT_UNLESS(!RequestedReplicationToken && !HoldingReplicationToken);
if (RequestedReplicationToken || HoldingReplicationToken) {
STLOG(PRI_CRIT, BS_REPL, BSVR37, ReplCtx->VCtx->VDiskLogPrefix << "stuck replication token");
}
}

if (ReplJobActorId) {
Expand Down
22 changes: 17 additions & 5 deletions ydb/core/blobstorage/vdisk/repl/blobstorage_replproxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ namespace NKikimr {
ui64 NextReceiveCookie;
TResultQueue ResultQueue;
std::shared_ptr<TMessageRelevanceTracker> Tracker = std::make_shared<TMessageRelevanceTracker>();
// set once by PassAway(); result handlers check it right after ProcessResult() and return
// early, and ReleaseMemToken() asserts that it is still false
bool Terminated = false;

TQueue<std::unique_ptr<TEvBlobStorage::TEvVGet>> SchedulerRequestQ;
THashMap<ui64, TReplMemTokenId> RequestTokens;
Expand Down Expand Up @@ -227,9 +228,7 @@ namespace NKikimr {
PrefetchDataSize = 0;
RequestFromVDiskProxyPending = false;
if (Finished) {
Send(MakeBlobStorageReplBrokerID(), new TEvPruneQueue);
RequestTokens.clear();
return PassAway(); // TODO(alexvru): check correctness of invocations
return PassAway();
}
}
// send request(s) if prefetch queue is not full
Expand Down Expand Up @@ -297,6 +296,9 @@ namespace NKikimr {
if (msg->Record.GetCookie() == NextReceiveCookie) {
ui64 cookie = NextReceiveCookie;
ProcessResult(msg);
if (Terminated) {
return;
}
ReleaseMemToken(cookie);
while (!ResultQueue.empty()) {
const TQueueItem& top = ResultQueue.top();
Expand All @@ -305,6 +307,9 @@ namespace NKikimr {
}
ui64 cookie = NextReceiveCookie;
ProcessResult(top.get());
if (Terminated) {
return;
}
ReleaseMemToken(cookie);
ResultQueue.pop();
}
Expand All @@ -314,6 +319,7 @@ namespace NKikimr {
}

void ReleaseMemToken(ui64 cookie) {
Y_ABORT_UNLESS(!Terminated);
if (RequestTokens) {
auto it = RequestTokens.find(cookie);
Y_ABORT_UNLESS(it != RequestTokens.end());
Expand Down Expand Up @@ -428,6 +434,13 @@ namespace NKikimr {
}
}

// Terminates this actor: marks it as terminated, sends TEvPruneQueue to the replication
// broker and then delegates to the base class PassAway(). Must be called at most once;
// a second call trips the Y_ABORT_UNLESS check.
void PassAway() override {
Y_ABORT_UNLESS(!Terminated);
// raise the flag first: handlers test it after ProcessResult() to stop touching state
Terminated = true;
// NOTE(review): presumably makes the broker drop this actor's pending memory token
// requests -- confirm against the broker's TEvPruneQueue handler
Send(MakeBlobStorageReplBrokerID(), new TEvPruneQueue);
TActorBootstrapped::PassAway();
}

STRICT_STFUNC(StateFunc,
hFunc(TEvReplProxyNext, Handle)
hFunc(TEvReplMemToken, Handle)
Expand All @@ -446,8 +459,7 @@ namespace NKikimr {
TTrackableVector<TVDiskProxy::TScheduledBlob>&& ids,
const TVDiskID& vdiskId,
const TActorId& serviceId)
: TActorBootstrapped<TVDiskProxyActor>()
, ReplCtx(std::move(replCtx))
: ReplCtx(std::move(replCtx))
, GType(ReplCtx->VCtx->Top->GType)
, Ids(std::move(ids))
, VDiskId(vdiskId)
Expand Down

0 comments on commit 66baaeb

Please sign in to comment.