diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp index e63642c19af6..397a2cbd3de3 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp @@ -183,7 +183,7 @@ void TBlobState::GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TG for (ui32 diskIdx = 0; diskIdx < Disks.size(); ++diskIdx) { (*outNWorst)[diskIdx] = { GetPredictedDelayNs(info, groupQueues, diskIdx, queueId), diskIdx }; } - std::partial_sort(outNWorst->begin(), outNWorst->begin() + nWorst, outNWorst->end()); + std::partial_sort(outNWorst->begin(), outNWorst->begin() + std::min(nWorst, (ui32)Disks.size()), outNWorst->end()); } bool TBlobState::HasWrittenQuorum(const TBlobStorageGroupInfo& info, const TBlobStorageGroupInfo::TGroupVDisks& expired) const { @@ -460,12 +460,11 @@ void TBlackboard::ReportPartMapStatus(const TLogoBlobID &id, ssize_t partMapInde void TBlackboard::GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues, NKikimrBlobStorage::EVDiskQueueId queueId, ui32 nWorst, TDiskDelayPredictions *outNWorst) const { ui32 totalVDisks = info.GetTotalVDisksNum(); - Y_DEBUG_ABORT_UNLESS(nWorst < totalVDisks); outNWorst->resize(totalVDisks); for (ui32 orderNumber = 0; orderNumber < totalVDisks; ++orderNumber) { (*outNWorst)[orderNumber] = { groupQueues.GetPredictedDelayNsByOrderNumber(orderNumber, queueId), orderNumber }; } - std::sort(outNWorst->begin(), outNWorst->end()); + std::partial_sort(outNWorst->begin(), outNWorst->begin() + std::min(nWorst, totalVDisks), outNWorst->end()); } void TBlackboard::RegisterBlobForPut(const TLogoBlobID& id, size_t blobIdx) { diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_get.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_get.cpp index 6857ba3dae61..03ad6fa12048 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_get.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_get.cpp @@ -38,8 +38,11 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor> &vGets, TDeque> &vPuts) { ReportBytes(GetImpl.GrabBytesToReport()); - RequestsSent += vGets.size(); - RequestsSent += vPuts.size(); + GetRequestsSent += vGets.size(); + PutRequestsSent += vPuts.size(); CountPuts(vPuts); if (vPuts.size()) { if (!IsPutStarted) { @@ -208,7 +211,7 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor> vGets; TAutoPtr getResult; - ResponsesReceived++; + GetResponsesReceived++; TDeque> vPuts; GetImpl.OnVGetResult(LogCtx, *ev->Get(), vGets, vPuts, getResult); SendVGetsAndVPuts(vGets, vPuts); @@ -217,8 +220,10 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor ResponsesReceived, "RequestsSent# %" PRIu32 " ResponsesReceived# %" PRIu32 - " GetImpl.DumpFullState# %s", RequestsSent, ResponsesReceived, GetImpl.DumpFullState().c_str()); + Y_ABORT_UNLESS(GetRequestsSent + PutRequestsSent > GetResponsesReceived + PutResponsesReceived, + "GetRequestsSent# %" PRIu32 " GetResponsesReceived# %" PRIu32 "PutRequestsSent# %" PRIu32 + " PutResponsesReceived# %" PRIu32 " GetImpl.DumpFullState# %s", GetRequestsSent, + GetResponsesReceived, PutRequestsSent, PutResponsesReceived, GetImpl.DumpFullState().c_str()); TryScheduleGetAcceleration(); if (IsPutStarted) { @@ -228,7 +233,7 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor> vGets; TDeque> vPuts; TAutoPtr getResult; - ResponsesReceived++; + PutResponsesReceived++; GetImpl.OnVPutResult(LogCtx, *ev->Get(), vGets, vPuts, getResult); SendVGetsAndVPuts(vGets, vPuts); @@ -301,15 +306,18 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor ResponsesReceived, "RequestsSent# %" PRIu64 " ResponsesReceived# %" PRIu64, - ui64(RequestsSent), ui64(ResponsesReceived)); + + Y_ABORT_UNLESS(GetRequestsSent + PutRequestsSent > GetResponsesReceived + PutResponsesReceived, + "GetRequestsSent# %" PRIu64 " GetResponsesReceived# %" PRIu64 "PutRequestsSent# %" PRIu64 + " PutResponsesReceived# %" PRIu64, ui64(GetRequestsSent), ui64(GetResponsesReceived), + ui64(PutRequestsSent), ui64(PutResponsesReceived)); TrySchedulePutAcceleration(); SanityCheck(); // May Die } void TryScheduleGetAcceleration() { - if (!IsGetAccelerateScheduled && GetsAccelerated < 2) { + if (!IsGetAccelerateScheduled && GetsAccelerated < 2 && GetRequestsSent < GroupSize) { // Count VDisks that have requests in flight, if there is no more than 2 such VDisks, Accelerate if (CountDisksWithActiveRequests() <= 2) { ui64 timeToAccelerateUs = GetImpl.GetTimeToAccelerateGetNs(LogCtx, GetsAccelerated) / 1000; @@ -328,7 +336,7 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActorDeadline) , StartTime(now) , StartTimePut(StartTime) - , RequestsSent(0) - , ResponsesReceived(0) + , GroupSize(info->Type.BlobSubgroupSize()) , ReportedBytes(0) { ReportBytes(sizeof(*this)); @@ -438,7 +445,7 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor ResponsesReceived); + Y_ABORT_UNLESS(GetRequestsSent + PutRequestsSent > GetResponsesReceived + PutResponsesReceived); Become(&TThis::StateWait); SanityCheck(); // May Die }