From 1ab6e3b4a8528f5b029d165d74d5b42d3d546942 Mon Sep 17 00:00:00 2001 From: Vlad Kuznetsov Date: Thu, 5 Dec 2024 17:20:20 +0100 Subject: [PATCH] Add more asserts that TEvLogWrite is never left unreplied (#12331) --- .../blobstorage_pdisk_completion_impl.cpp | 23 ++++++++----------- .../pdisk/blobstorage_pdisk_requestimpl.h | 8 +++++++ 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.cpp index 4b32f513f9f3..6845eb52633a 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.cpp @@ -42,6 +42,7 @@ void TCompletionLogWrite::Exec(TActorSystem *actorSystem) { NHPTimer::STime now = HPNow(); for (auto it = LogWriteQueue.begin(); it != LogWriteQueue.end(); ++it) { TLogWrite &evLog = *(*it); + evLog.Replied = true; TLogWrite *&batch = batchMap[evLog.Owner]; LOG_DEBUG_S(*actorSystem, NKikimrServices::BS_PDISK, "PDiskId# " << PDisk->PCtx->PDiskId << " ReqId# " << evLog.ReqId.Id << " TEvLogResult Sender# " << evLog.Sender.LocalId() @@ -98,17 +99,12 @@ void TCompletionLogWrite::Exec(TActorSystem *actorSystem) { } void TCompletionLogWrite::Release(TActorSystem *actorSystem) { - switch (Result) { - case EIoResult::Ok: - case EIoResult::Unknown: - break; - default: - for (TLogWrite *logWrite : LogWriteQueue) { - auto res = MakeHolder(NKikimrProto::CORRUPTED, NKikimrBlobStorage::StatusIsValid, - ErrorReason); - actorSystem->Send(logWrite->Sender, res.Release()); - PDisk->Mon.WriteLog.CountResponse(); - } + for (TLogWrite *logWrite : LogWriteQueue) { + auto res = MakeHolder(NKikimrProto::CORRUPTED, NKikimrBlobStorage::StatusIsValid, + ErrorReason); + logWrite->Replied = true; + actorSystem->Send(logWrite->Sender, res.Release()); + PDisk->Mon.WriteLog.CountResponse(); } delete this; @@ -323,7 +319,7 @@ void TCompletionChunkRead::Exec(TActorSystem *actorSystem) { result->Data.Commit(); Y_ABORT_UNLESS(Read); - LOG_DEBUG_S(*actorSystem, NKikimrServices::BS_PDISK, "PDiskId# " << PDisk->PCtx->PDiskId << " ReqId# " << Read->ReqId.Id + LOG_DEBUG_S(*actorSystem, NKikimrServices::BS_PDISK, "Reply from TCompletionChunkRead, PDiskId# " << PDisk->PCtx->PDiskId << " ReqId# " << Read->ReqId.Id << " " << result->ToString() << " To# " << Read->Sender.LocalId()); double responseTimeMs = HPMilliSecondsFloat(HPNow() - Read->CreationTime); @@ -333,6 +329,7 @@ void TCompletionChunkRead::Exec(TActorSystem *actorSystem) { actorSystem->Send(Read->Sender, result.Release()); Read->IsReplied = true; + PDisk->Mon.GetReadCounter(Read->PriorityClass)->CountResponse(); execSpan.EndOk(); Span.EndOk(); @@ -344,7 +341,7 @@ void TCompletionChunkRead::ReplyError(TActorSystem *actorSystem, TString reason) CommonBuffer.Clear(); TStringStream error; - error << "PDiskId# " << PDisk->PCtx->PDiskId << " ReqId# " << Read->ReqId << " reason# " << reason; + error << "Reply Error from TCompletionChunkRead PDiskId# " << PDisk->PCtx->PDiskId << " ReqId# " << Read->ReqId << " reason# " << reason; auto result = MakeHolder(NKikimrProto::CORRUPTED, Read->ChunkIdx, Read->Offset, Read->Cookie, PDisk->GetStatusFlags(Read->Owner, Read->OwnerGroupType), error.Str()); diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h index 93c246f728b9..d4987847cd67 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h @@ -95,6 +95,10 @@ class TRequestBase : public TThrRefBase { return HPMilliSecondsFloat(now - CreationTime); } + double GetCostMs() const { + return Cost / 1.0e6; // since cost is in nanoseconds + } + ui64 GetCost() const { return Cost; } @@ -295,6 +299,8 @@ class TLogWrite : public TRequestBase { THolder Result; std::function OnDestroy; + bool Replied = false; + TLogWrite(NPDisk::TEvLog &ev, const TActorId &sender, ui32 estimatedChunkIdx, TReqId reqId, NWilson::TSpan span) : TRequestBase(sender, reqId, ev.Owner, ev.OwnerRound, NPriInternal::LogWrite, std::move(span)) , Signature(ev.Signature) @@ -311,6 +317,7 @@ class TLogWrite : public TRequestBase { } virtual ~TLogWrite() { + Y_DEBUG_ABORT_UNLESS(Replied); if (OnDestroy) { OnDestroy(); } @@ -347,6 +354,7 @@ class TLogWrite : public TRequestBase { auto *result = new NPDisk::TEvLogResult(NKikimrProto::CORRUPTED, 0, "TLogWrite is being aborted"); result->Results.emplace_back(Lsn, Cookie); actorSystem->Send(Sender, result); + Replied = true; } TString ToString() const {