Skip to content

Commit

Permalink
Add more asserts that TEvLogWrite is never left unreplied (#12331)
Browse files Browse the repository at this point in the history
  • Loading branch information
va-kuznecov authored Dec 5, 2024
1 parent a86e9b0 commit 1ab6e3b
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 13 deletions.
23 changes: 10 additions & 13 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ void TCompletionLogWrite::Exec(TActorSystem *actorSystem) {
NHPTimer::STime now = HPNow();
for (auto it = LogWriteQueue.begin(); it != LogWriteQueue.end(); ++it) {
TLogWrite &evLog = *(*it);
evLog.Replied = true;
TLogWrite *&batch = batchMap[evLog.Owner];
LOG_DEBUG_S(*actorSystem, NKikimrServices::BS_PDISK, "PDiskId# " << PDisk->PCtx->PDiskId
<< " ReqId# " << evLog.ReqId.Id << " TEvLogResult Sender# " << evLog.Sender.LocalId()
Expand Down Expand Up @@ -98,17 +99,12 @@ void TCompletionLogWrite::Exec(TActorSystem *actorSystem) {
}

void TCompletionLogWrite::Release(TActorSystem *actorSystem) {
switch (Result) {
case EIoResult::Ok:
case EIoResult::Unknown:
break;
default:
for (TLogWrite *logWrite : LogWriteQueue) {
auto res = MakeHolder<TEvLogResult>(NKikimrProto::CORRUPTED, NKikimrBlobStorage::StatusIsValid,
ErrorReason);
actorSystem->Send(logWrite->Sender, res.Release());
PDisk->Mon.WriteLog.CountResponse();
}
for (TLogWrite *logWrite : LogWriteQueue) {
auto res = MakeHolder<TEvLogResult>(NKikimrProto::CORRUPTED, NKikimrBlobStorage::StatusIsValid,
ErrorReason);
logWrite->Replied = true;
actorSystem->Send(logWrite->Sender, res.Release());
PDisk->Mon.WriteLog.CountResponse();
}

delete this;
Expand Down Expand Up @@ -323,7 +319,7 @@ void TCompletionChunkRead::Exec(TActorSystem *actorSystem) {
result->Data.Commit();

Y_ABORT_UNLESS(Read);
LOG_DEBUG_S(*actorSystem, NKikimrServices::BS_PDISK, "PDiskId# " << PDisk->PCtx->PDiskId << " ReqId# " << Read->ReqId.Id
LOG_DEBUG_S(*actorSystem, NKikimrServices::BS_PDISK, "Reply from TCompletionChunkRead, PDiskId# " << PDisk->PCtx->PDiskId << " ReqId# " << Read->ReqId.Id
<< " " << result->ToString() << " To# " << Read->Sender.LocalId());

double responseTimeMs = HPMilliSecondsFloat(HPNow() - Read->CreationTime);
Expand All @@ -333,6 +329,7 @@ void TCompletionChunkRead::Exec(TActorSystem *actorSystem) {

actorSystem->Send(Read->Sender, result.Release());
Read->IsReplied = true;

PDisk->Mon.GetReadCounter(Read->PriorityClass)->CountResponse();
execSpan.EndOk();
Span.EndOk();
Expand All @@ -344,7 +341,7 @@ void TCompletionChunkRead::ReplyError(TActorSystem *actorSystem, TString reason)
CommonBuffer.Clear();

TStringStream error;
error << "PDiskId# " << PDisk->PCtx->PDiskId << " ReqId# " << Read->ReqId << " reason# " << reason;
error << "Reply Error from TCompletionChunkRead PDiskId# " << PDisk->PCtx->PDiskId << " ReqId# " << Read->ReqId << " reason# " << reason;
auto result = MakeHolder<TEvChunkReadResult>(NKikimrProto::CORRUPTED,
Read->ChunkIdx, Read->Offset, Read->Cookie,
PDisk->GetStatusFlags(Read->Owner, Read->OwnerGroupType), error.Str());
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ class TRequestBase : public TThrRefBase {
return HPMilliSecondsFloat(now - CreationTime);
}

// Returns the request cost expressed in milliseconds.
// Cost itself is kept in nanoseconds, hence the division by 1e6.
double GetCostMs() const {
    constexpr double nanosecondsPerMillisecond = 1.0e6;
    return Cost / nanosecondsPerMillisecond;
}

// Returns the raw stored Cost value, without any unit conversion.
ui64 GetCost() const { return Cost; }
Expand Down Expand Up @@ -295,6 +299,8 @@ class TLogWrite : public TRequestBase {
THolder<NPDisk::TEvLogResult> Result;
std::function<void()> OnDestroy;

bool Replied = false;

TLogWrite(NPDisk::TEvLog &ev, const TActorId &sender, ui32 estimatedChunkIdx, TReqId reqId, NWilson::TSpan span)
: TRequestBase(sender, reqId, ev.Owner, ev.OwnerRound, NPriInternal::LogWrite, std::move(span))
, Signature(ev.Signature)
Expand All @@ -311,6 +317,7 @@ class TLogWrite : public TRequestBase {
}

virtual ~TLogWrite() {
Y_DEBUG_ABORT_UNLESS(Replied);
if (OnDestroy) {
OnDestroy();
}
Expand Down Expand Up @@ -347,6 +354,7 @@ class TLogWrite : public TRequestBase {
auto *result = new NPDisk::TEvLogResult(NKikimrProto::CORRUPTED, 0, "TLogWrite is being aborted");
result->Results.emplace_back(Lsn, Cookie);
actorSystem->Send(Sender, result);
Replied = true;
}

TString ToString() const {
Expand Down

0 comments on commit 1ab6e3b

Please sign in to comment.