Skip to content

Commit

Permalink
Fix acceleration with ErasureNone
Browse files Browse the repository at this point in the history
  • Loading branch information
serbel324 committed May 27, 2024
1 parent 4855ebd commit fb9369f
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 19 deletions.
5 changes: 2 additions & 3 deletions ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ void TBlobState::GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TG
for (ui32 diskIdx = 0; diskIdx < Disks.size(); ++diskIdx) {
(*outNWorst)[diskIdx] = { GetPredictedDelayNs(info, groupQueues, diskIdx, queueId), diskIdx };
}
std::partial_sort(outNWorst->begin(), outNWorst->begin() + nWorst, outNWorst->end());
std::partial_sort(outNWorst->begin(), outNWorst->begin() + std::min(nWorst, (ui32)Disks.size()), outNWorst->end());
}

bool TBlobState::HasWrittenQuorum(const TBlobStorageGroupInfo& info, const TBlobStorageGroupInfo::TGroupVDisks& expired) const {
Expand Down Expand Up @@ -460,12 +460,11 @@ void TBlackboard::ReportPartMapStatus(const TLogoBlobID &id, ssize_t partMapInde
void TBlackboard::GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues,
NKikimrBlobStorage::EVDiskQueueId queueId, ui32 nWorst, TDiskDelayPredictions *outNWorst) const {
ui32 totalVDisks = info.GetTotalVDisksNum();
Y_DEBUG_ABORT_UNLESS(nWorst < totalVDisks);
outNWorst->resize(totalVDisks);
for (ui32 orderNumber = 0; orderNumber < totalVDisks; ++orderNumber) {
(*outNWorst)[orderNumber] = { groupQueues.GetPredictedDelayNsByOrderNumber(orderNumber, queueId), orderNumber };
}
std::sort(outNWorst->begin(), outNWorst->end());
std::partial_sort(outNWorst->begin(), outNWorst->begin() + std::min(nWorst, totalVDisks), outNWorst->end());
}

void TBlackboard::RegisterBlobForPut(const TLogoBlobID& id, size_t blobIdx) {
Expand Down
39 changes: 23 additions & 16 deletions ydb/core/blobstorage/dsproxy/dsproxy_get.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt
const TInstant Deadline;
TInstant StartTime;
TInstant StartTimePut;
ui32 RequestsSent;
ui32 ResponsesReceived;
ui32 GetRequestsSent = 0;
ui32 PutRequestsSent = 0;
ui32 GetResponsesReceived = 0;
ui32 PutResponsesReceived = 0;
ui32 GroupSize;
i64 ReportedBytes;
ui32 MaxSaneRequests = 0;
bool IsPutStarted = false;
Expand Down Expand Up @@ -99,8 +102,8 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt
void SendVGetsAndVPuts(TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> &vGets,
TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> &vPuts) {
ReportBytes(GetImpl.GrabBytesToReport());
RequestsSent += vGets.size();
RequestsSent += vPuts.size();
GetRequestsSent += vGets.size();
PutRequestsSent += vPuts.size();
CountPuts(vPuts);
if (vPuts.size()) {
if (!IsPutStarted) {
Expand Down Expand Up @@ -208,7 +211,7 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt

TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> vGets;
TAutoPtr<TEvBlobStorage::TEvGetResult> getResult;
ResponsesReceived++;
GetResponsesReceived++;
TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> vPuts;
GetImpl.OnVGetResult(LogCtx, *ev->Get(), vGets, vPuts, getResult);
SendVGetsAndVPuts(vGets, vPuts);
Expand All @@ -217,8 +220,10 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt
SendReplyAndDie(getResult);
return;
}
Y_ABORT_UNLESS(RequestsSent > ResponsesReceived, "RequestsSent# %" PRIu32 " ResponsesReceived# %" PRIu32
" GetImpl.DumpFullState# %s", RequestsSent, ResponsesReceived, GetImpl.DumpFullState().c_str());
Y_ABORT_UNLESS(GetRequestsSent + PutRequestsSent > GetResponsesReceived + PutResponsesReceived,
"GetRequestsSent# %" PRIu32 " GetResponsesReceived# %" PRIu32 "PutRequestsSent# %" PRIu32
" PutResponsesReceived# %" PRIu32 " GetImpl.DumpFullState# %s", GetRequestsSent,
GetResponsesReceived, PutRequestsSent, PutResponsesReceived, GetImpl.DumpFullState().c_str());

TryScheduleGetAcceleration();
if (IsPutStarted) {
Expand All @@ -228,7 +233,7 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt
}

void SanityCheck() {
if (RequestsSent <= MaxSaneRequests) {
if (GetRequestsSent + PutRequestsSent <= MaxSaneRequests) {
return;
}
TStringStream err;
Expand Down Expand Up @@ -293,23 +298,26 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt
TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> vGets;
TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> vPuts;
TAutoPtr<TEvBlobStorage::TEvGetResult> getResult;
ResponsesReceived++;
PutResponsesReceived++;

GetImpl.OnVPutResult(LogCtx, *ev->Get(), vGets, vPuts, getResult);
SendVGetsAndVPuts(vGets, vPuts);
if (getResult) {
SendReplyAndDie(getResult);
return;
}
Y_ABORT_UNLESS(RequestsSent > ResponsesReceived, "RequestsSent# %" PRIu64 " ResponsesReceived# %" PRIu64,
ui64(RequestsSent), ui64(ResponsesReceived));

Y_ABORT_UNLESS(GetRequestsSent + PutRequestsSent > GetResponsesReceived + PutResponsesReceived,
"GetRequestsSent# %" PRIu64 " GetResponsesReceived# %" PRIu64 "PutRequestsSent# %" PRIu64
" PutResponsesReceived# %" PRIu64, ui64(GetRequestsSent), ui64(GetResponsesReceived),
ui64(PutRequestsSent), ui64(PutResponsesReceived));

TrySchedulePutAcceleration();
SanityCheck(); // May Die
}

void TryScheduleGetAcceleration() {
if (!IsGetAccelerateScheduled && GetsAccelerated < 2) {
if (!IsGetAccelerateScheduled && GetsAccelerated < 2 && GetRequestsSent < GroupSize) {
// Count VDisks that have requests in flight, if there is no more than 2 such VDisks, Accelerate
if (CountDisksWithActiveRequests() <= 2) {
ui64 timeToAccelerateUs = GetImpl.GetTimeToAccelerateGetNs(LogCtx, GetsAccelerated) / 1000;
Expand All @@ -328,7 +336,7 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt
}

void TrySchedulePutAcceleration() {
if (!IsPutAccelerateScheduled && PutsAccelerated < 2) {
if (!IsPutAccelerateScheduled && PutsAccelerated < 2 && PutRequestsSent < GroupSize) {
// Count VDisks that have requests in flight, if there is no more than 2 such VDisks, Accelerate
if (CountDisksWithActiveRequests() <= 2) {
ui64 timeToAccelerateUs = GetImpl.GetTimeToAcceleratePutNs(LogCtx, PutsAccelerated) / 1000;
Expand Down Expand Up @@ -403,8 +411,7 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt
, Deadline(ev->Deadline)
, StartTime(now)
, StartTimePut(StartTime)
, RequestsSent(0)
, ResponsesReceived(0)
, GroupSize(info->Type.BlobSubgroupSize())
, ReportedBytes(0)
{
ReportBytes(sizeof(*this));
Expand Down Expand Up @@ -438,7 +445,7 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt
SendVGetsAndVPuts(vGets, vPuts);
TryScheduleGetAcceleration();

Y_ABORT_UNLESS(RequestsSent > ResponsesReceived);
Y_ABORT_UNLESS(GetRequestsSent + PutRequestsSent > GetResponsesReceived + PutResponsesReceived);
Become(&TThis::StateWait);
SanityCheck(); // May Die
}
Expand Down

0 comments on commit fb9369f

Please sign in to comment.