Skip to content

Commit

Permalink
Disable vpatch 24-2 (#7315)
Browse files Browse the repository at this point in the history
  • Loading branch information
kruall authored Aug 5, 2024
1 parent f5c8236 commit c68c520
Show file tree
Hide file tree
Showing 6 changed files with 402 additions and 43 deletions.
208 changes: 195 additions & 13 deletions ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,33 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
}
};

// Tags carried by TEvents::TEvWakeup so that a wake-up handler can tell which
// timer has fired.
enum EWakeUpTag : ui64 {
    VPatchStartTag = 0, // scheduled while TEvVPatchFoundParts replies are being collected
    VPatchDiffTag = 1,  // scheduled while TEvVPatchResult replies are being collected
    MovedPatchTag = 2,  // the tag expected by the moved-patch state
    NeverTag = 3,       // scheduled once from Bootstrap with a fixed 60 s delay
};

// Returns a human-readable name for a wake-up tag, intended for log messages.
// A value that matches no EWakeUpTag enumerator is rendered as "unknown@<number>".
static TString ToString(ui64 wakeUp) {
    switch (wakeUp) {
        case VPatchStartTag: return "VPatchStartTag";
        case VPatchDiffTag: return "VPatchDiffTag";
        case MovedPatchTag: return "MovedPatchTag";
        case NeverTag: return "NeverTag";
        // The call must be qualified: an unqualified ToString(wakeUp) resolves to
        // this very function (it hides the outer-scope overloads), so an unknown
        // tag would recurse without end instead of being formatted as a number.
        default: return "unknown@" + ::ToString(wakeUp);
    }
}

// Sizing hints: TypicalMaxPartsCount is the inline capacity of the FoundParts TStackVec.
static constexpr ui32 TypicalHandoffCount = 2;
// NOTE(review): the leading 1 presumably stands for the primary placement of a part — confirm.
static constexpr ui32 TypicalPartPlacementCount = 1 + TypicalHandoffCount;
static constexpr ui32 TypicalMaxPartsCount = TypicalPartPlacementCount * TypicalPartsInBlob;

// Per-stage waiting multipliers.
// NOTE(review): no use of these three constants is visible in this part of the file —
// confirm where (and whether) they are applied.
static constexpr ui32 VPatchStartWaitingMultiplier = 2;
static constexpr ui32 VPatchDiffWaitingMultiplier = 6;
static constexpr ui32 MovedPatchWaitingMultiplier = 4;

static constexpr ui32 DefaultNsForChangeStrategy = 30'000'000; // 30 ms

TString Buffer;

ui32 OriginalGroupId;
Expand All @@ -47,6 +70,7 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
float ApproximateFreeSpaceShare = 0;

TInstant StartTime;
TInstant StageStart;
TInstant Deadline;

NLWTrace::TOrbit Orbit;
Expand All @@ -62,19 +86,24 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
ui32 SentStarts = 0;
ui32 ReceivedFoundParts = 0;
ui32 ErrorResponses = 0;
ui32 SentVPatchDiff = 0;
ui32 ReceivedResults = 0;

TStackVec<TPartPlacement, TypicalMaxPartsCount> FoundParts;
TStackVec<bool, TypicalDisksInSubring> ReceivedResponseFlags;
TStackVec<bool, TypicalDisksInSubring> EmptyResponseFlags;
TStackVec<bool, TypicalDisksInSubring> ErrorResponseFlags;
TStackVec<bool, TypicalDisksInSubring> ForceStopFlags;
TStackVec<bool, TypicalDisksInSubring> SlowFlags;
TBlobStorageGroupInfo::TVDiskIds VDisks;

bool UseVPatch = false;
bool IsGoodPatchedBlobId = false;
bool IsAllowedErasure = false;
bool IsSecured = false;
bool HasSlowVDisk = false;
bool IsContinuedVPatch = false;
bool IsMovedPatch = false;

#define PATCH_LOG(priority, service, marker, msg, ...) \
STLOG(priority, service, marker, msg, \
Expand All @@ -97,6 +126,15 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
return mon->ActivePatch;
}

// Schedules a TEvWakeup carrying `tag`. The delay equals the time that has
// already elapsed between `startTime` and now, i.e. the actor waits for as
// long again as it has been waiting so far.
void ScheduleWakeUp(TInstant startTime, EWakeUpTag tag) {
    const TInstant now = TActivationContext::Now();
    const TDuration elapsed = now - startTime;
    Schedule(elapsed, new TEvents::TEvWakeup(tag));
}

// Convenience overload: schedules the wake-up relative to StageStart, the
// moment the current stage began.
void ScheduleWakeUp(EWakeUpTag tag) {
    const TInstant stageBegin = StageStart;
    ScheduleWakeUp(stageBegin, tag);
}

// Reports the request kind handled by this actor: always ERequestType::Patch.
static constexpr ERequestType RequestType() {
return ERequestType::Patch;
}
Expand Down Expand Up @@ -280,6 +318,12 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
void Handle(TEvBlobStorage::TEvVPatchFoundParts::TPtr &ev) {
ReceivedFoundParts++;

if (Info->Type.ErasureFamily() != TErasureType::ErasureMirror) {
if (ReceivedFoundParts == SentStarts / 2 + SentStarts % 2) {
ScheduleWakeUp(VPatchStartTag);
}
}

NKikimrBlobStorage::TEvVPatchFoundParts &record = ev->Get()->Record;

Y_ABORT_UNLESS(record.HasCookie());
Expand Down Expand Up @@ -313,6 +357,7 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA26, "Received VPatchFoundParts",
(Status, status),
(SubgroupIdx, (ui32)subgroupIdx),
(VDiskId, VDisks[subgroupIdx]),
(ReceivedResults, static_cast<TString>(TStringBuilder() << ReceivedFoundParts << '/' << SentStarts)),
(ErrorReason, errorReason));

Expand Down Expand Up @@ -342,6 +387,13 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
}
ReceivedResults++;


if (Info->Type.ErasureFamily() != TErasureType::ErasureMirror) {
if (ReceivedResults == SentVPatchDiff / 2 + SentVPatchDiff % 2) {
ScheduleWakeUp(VPatchDiffTag);
}
}

PullOutStatusFlagsAndFressSpace(record);
Y_ABORT_UNLESS(record.HasStatus());
NKikimrProto::EReplyStatus status = record.GetStatus();
Expand All @@ -353,6 +405,7 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA23, "Received VPatchResult",
(Status, status),
(SubgroupIdx, (ui32)subgroupIdx),
(VDiskID, VDisks[subgroupIdx]),
(ReceivedResults, static_cast<TString>(TStringBuilder() << ReceivedResults << '/' << Info->Type.TotalPartCount())),
(ErrorReason, errorReason));

Expand Down Expand Up @@ -500,6 +553,7 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
}
PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA20, "Send TEvVPatchDiff",
(VDiskIdxInSubgroup, idxInSubgroup),
(VDiskId, VDisks[idxInSubgroup]),
(PatchedVDiskIdxInSubgroup, patchedIdxInSubgroup),
(PartId, (ui64)partPlacement.PartId),
(DiffsForPart, diffsForPart.size()),
Expand Down Expand Up @@ -530,6 +584,7 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
ui32 vdiskIdx = vdiskIdxForParts[partIdx];
Y_VERIFY_S(vdiskIdx == partIdx || vdiskIdx >= dataParts, "vdiskIdx# " << vdiskIdx << " partIdx# " << partIdx);
placements.push_back(TPartPlacement{static_cast<ui8>(vdiskIdx), static_cast<ui8>(partIdx + 1)});
SentVPatchDiff++;
}
SendDiffs(placements);
}
Expand All @@ -538,15 +593,38 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA09, "Start Moved strategy",
(SentStarts, SentStarts));
Become(&TThis::MovedPatchState);
IsMovedPatch = true;
std::optional<ui32> subgroupIdx = 0;

ui32 subgroupIdx = 0;
if (OkVDisksWithParts) {
ui32 okVDiskIdx = RandomNumber<ui32>(OkVDisksWithParts.size());
subgroupIdx = OkVDisksWithParts[okVDiskIdx];
} else {
ui64 worstNs = 0;
ui64 nextToWorstNs = 0;
i32 worstSubGroubIdx = -1;
GetWorstPredictedDelaysNs(NKikimrBlobStorage::EVDiskQueueId::PutAsyncBlob, &worstNs, &nextToWorstNs, &worstSubGroubIdx);
if (worstNs > nextToWorstNs * 2) {
SlowFlags[worstSubGroubIdx] = true;
HasSlowVDisk = true;
}
if (HasSlowVDisk) {
TStackVec<ui32, TypicalDisksInSubring> goodDisks;
for (ui32 idx = 0; idx < VDisks.size(); ++idx) {
if (!SlowFlags[idx] && !ErrorResponseFlags[idx]) {
goodDisks.push_back(idx);
}
}
if (goodDisks.size()) {
ui32 okVDiskIdx = RandomNumber<ui32>(goodDisks.size());
subgroupIdx = goodDisks[okVDiskIdx];
}
}
}
if (!subgroupIdx) {
subgroupIdx = RandomNumber<ui32>(Info->Type.TotalPartCount());
}
TVDiskID vDisk = Info->GetVDiskInSubgroup(subgroupIdx, OriginalId.Hash());
TVDiskID vDisk = Info->GetVDiskInSubgroup(*subgroupIdx, OriginalId.Hash());
TDeque<std::unique_ptr<TEvBlobStorage::TEvVMovedPatch>> events;

ui64 cookie = ((ui64)OriginalId.Hash() << 32) | PatchedId.Hash();
Expand Down Expand Up @@ -575,7 +653,7 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob

void StartFallback() {
Mon->PatchesWithFallback->Inc();
if (WithMovingPatchRequestToStaticNode && UseVPatch && !IsSecured) {
if (WithMovingPatchRequestToStaticNode && UseVPatch && !IsSecured && !IsMovedPatch) {
PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA05, "Start Moved strategy from fallback");
StartMovedPatch();
} else {
Expand All @@ -588,20 +666,31 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob

void StartVPatch() {
Become(&TThis::VPatchState);

StageStart = TActivationContext::Now();
Info->PickSubgroup(OriginalId.Hash(), &VDisks, nullptr);
ReceivedResponseFlags.assign(VDisks.size(), false);
ErrorResponseFlags.assign(VDisks.size(), false);
EmptyResponseFlags.assign(VDisks.size(), false);
ForceStopFlags.assign(VDisks.size(), false);
SlowFlags.assign(VDisks.size(), false);

ui64 worstNs = 0;
ui64 nextToWorstNs = 0;
i32 worstSubGroubIdx = -1;
GetWorstPredictedDelaysNs(NKikimrBlobStorage::EVDiskQueueId::GetFastRead, &worstNs, &nextToWorstNs, &worstSubGroubIdx);
if (worstNs > nextToWorstNs * 2) {
SlowFlags[worstSubGroubIdx] = true;
HasSlowVDisk = true;
}

TDeque<std::unique_ptr<TEvBlobStorage::TEvVPatchStart>> events;

for (ui32 idx = 0; idx < VDisks.size(); ++idx) {
std::unique_ptr<TEvBlobStorage::TEvVPatchStart> ev = std::make_unique<TEvBlobStorage::TEvVPatchStart>(
OriginalId, PatchedId, VDisks[idx], Deadline, idx, true);
events.emplace_back(std::move(ev));
SentStarts++;
if (!SlowFlags[idx]) {
std::unique_ptr<TEvBlobStorage::TEvVPatchStart> ev = std::make_unique<TEvBlobStorage::TEvVPatchStart>(
OriginalId, PatchedId, VDisks[idx], Deadline, idx, true);
events.emplace_back(std::move(ev));
SentStarts++;
}
}

PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA08, "Start VPatch strategy",
Expand Down Expand Up @@ -702,6 +791,17 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
bool ContinueVPatch() {
PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA15, "Continue VPatch strategy",
(FoundParts, ConvertFoundPartsToString()));
StageStart = TActivationContext::Now();
IsContinuedVPatch = true;

ui64 worstNs = 0;
ui64 nextToWorstNs = 0;
i32 worstSubGroubIdx = -1;
GetWorstPredictedDelaysNs(NKikimrBlobStorage::EVDiskQueueId::GetFastRead, &worstNs, &nextToWorstNs, &worstSubGroubIdx);
if (worstNs > nextToWorstNs * 2) {
SlowFlags[worstSubGroubIdx] = true;
HasSlowVDisk = true;
}

if (Info->Type.GetErasure() == TErasureType::ErasureMirror3dc) {
return ContinueVPatchForMirror3dc();
Expand All @@ -714,6 +814,9 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
handoffForParts.resize(inPrimary.size());

for (auto &[subgroupIdx, partId] : FoundParts) {
if (SlowFlags[subgroupIdx]) {
continue;
}
if (subgroupIdx == partId - 1) {
inPrimary[partId - 1] = true;
} else {
Expand Down Expand Up @@ -784,6 +887,7 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob

void Bootstrap() {
PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA01, "Actor bootstrapped");
Schedule(TDuration::MicroSeconds(60'000'000), new TEvents::TEvWakeup(NeverTag));

TLogoBlobID truePatchedBlobId = PatchedId;
bool result = true;
Expand All @@ -806,13 +910,14 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
return;
}

Info->PickSubgroup(OriginalId.Hash(), &VDisks, nullptr);
IsSecured = (Info->GetEncryptionMode() != TBlobStorageGroupInfo::EEM_NONE);

IsGoodPatchedBlobId = result;
IsAllowedErasure = Info->Type.ErasureFamily() == TErasureType::ErasureParityBlock
|| Info->Type.GetErasure() == TErasureType::ErasureNone
|| Info->Type.GetErasure() == TErasureType::ErasureMirror3dc;
if (IsGoodPatchedBlobId && IsAllowedErasure && UseVPatch && OriginalGroupId == Info->GroupID && !IsSecured) {
if (false && IsGoodPatchedBlobId && IsAllowedErasure && UseVPatch && OriginalGroupId == Info->GroupID && !IsSecured) {
PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA03, "Start VPatch strategy from bootstrap");
StartVPatch();
} else {
Expand All @@ -826,16 +931,90 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
}
}

// Scans the predicted delay of `queueId` for every entry of VDisks and reports:
//   *outWorstNs          - the largest predicted delay,
//   *outNextToWorstNs    - the second largest predicted delay,
//   *outWorstSubgroupIdx - the index within VDisks of the disk with the largest delay.
// When VDisks is empty or every prediction is 0 the outputs are 0, 0 and -1.
void GetWorstPredictedDelaysNs(NKikimrBlobStorage::EVDiskQueueId queueId,
        ui64 *outWorstNs, ui64 *outNextToWorstNs, i32 *outWorstSubgroupIdx) const
{
    ui64 worstNs = 0;
    ui64 runnerUpNs = 0;
    i32 worstIdx = -1;

    const ui32 diskCount = VDisks.size();
    for (ui32 idx = 0; idx != diskCount; ++idx) {
        // NOTE(review): idx is a position in VDisks, while the callee's name suggests
        // it expects an order number — confirm that the two coincide here.
        const ui64 delayNs = GroupQueues->GetPredictedDelayNsByOrderNumber(idx, queueId);
        if (delayNs > worstNs) {
            // New maximum: the previous maximum becomes the runner-up.
            runnerUpNs = worstNs;
            worstNs = delayNs;
            worstIdx = idx;
        } else if (delayNs > runnerUpNs) {
            runnerUpNs = delayNs;
        }
    }

    *outWorstSubgroupIdx = worstIdx;
    *outWorstNs = worstNs;
    *outNextToWorstNs = runnerUpNs;
}

void SetSlowDisks() {
for (ui32 idx = 0; idx < SlowFlags.size(); ++idx) {
SlowFlags[idx] = !ReceivedResponseFlags[idx] && !EmptyResponseFlags[idx] && !ErrorResponseFlags[idx];
if (SlowFlags[idx]) {
HasSlowVDisk = true;
}
}
}

// Wake-up handler parameterised by the tag the current state is waiting for.
// Both the expected tag and NeverTag mark the unanswered disks as slow and
// start the fallback; any other tag is only logged. The two checks are
// independent of each other, so both run when ExpectedTag equals NeverTag.
template <ui64 ExpectedTag>
void HandleWakeUp(TEvents::TEvWakeup::TPtr &ev) {
    const auto receivedTag = ev->Get()->Tag;
    PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA36, "HandleWakeUp",
            (ExpectedTag, ToString(ExpectedTag)),
            (ReceivedTag, ToString(receivedTag)));

    const bool isExpected = (receivedTag == ExpectedTag);
    const bool isNever = (receivedTag == NeverTag);

    if (isExpected) {
        SetSlowDisks();
        StartFallback();
    }
    if (isNever) {
        SetSlowDisks();
        StartFallback();
        PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA40, "Found NeverTag wake up", (ExpectedTag, ToString(ExpectedTag)));
    }
}

// Wake-up handler of the VPatch state. The expected tag depends on the stage:
// VPatchDiffTag once the VPatch has been continued, VPatchStartTag before that.
// The expected tag and NeverTag both mark the unanswered disks as slow and
// start the fallback; any other tag is only logged.
void HandleVPatchWakeUp(TEvents::TEvWakeup::TPtr &ev) {
    ui64 expectedTag = VPatchStartTag;
    if (IsContinuedVPatch) {
        expectedTag = VPatchDiffTag;
    }
    const auto receivedTag = ev->Get()->Tag;

    PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA37, "HandleWakeUp",
            (ExpectedTag, ToString(expectedTag)),
            (ReceivedTag, ToString(receivedTag)));

    if (receivedTag == expectedTag) {
        SetSlowDisks();
        StartFallback();
    }
    if (receivedTag == NeverTag) {
        SetSlowDisks();
        StartFallback();
        PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA41, "Found NeverTag wake up", (ExpectedTag, ToString(expectedTag)));
    }
}

// Wake-up handler that reacts to NeverTag only: the request is answered with
// DEADLINE. Every other tag is logged and otherwise ignored.
void HandleNeverTagWakeUp(TEvents::TEvWakeup::TPtr &ev) {
    const auto receivedTag = ev->Get()->Tag;
    PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA42, "HandleWakeUp",
            (ExpectedTag, ToString(NeverTag)),
            (ReceivedTag, ToString(receivedTag)));

    if (receivedTag != NeverTag) {
        return;
    }

    PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA43, "Found NeverTag wake up in naive state");
    ReplyAndDie(NKikimrProto::DEADLINE);
}

// State function for the naive strategy. ProcessEvent gets the first chance to
// consume the event; otherwise the event is dispatched by type below.
STATEFN(NaiveState) {
if (ProcessEvent(ev)) {
return;
}
switch (ev->GetTypeRewrite()) {
hFunc(TEvBlobStorage::TEvGetResult, Handle);
hFunc(TEvBlobStorage::TEvPutResult, Handle);

// Wake-ups are ignored in this state; the NeverTag handler below is commented out.
IgnoreFunc(TEvents::TEvWakeup);
//hFunc(TEvents::TEvWakeup, HandleWakeUp<NeverTag>);
// Replies from the other strategies may still arrive here and are dropped.
IgnoreFunc(TEvBlobStorage::TEvVPatchResult);
IgnoreFunc(TEvBlobStorage::TEvVPatchFoundParts);
IgnoreFunc(TEvBlobStorage::TEvVMovedPatchResult);
default:
// NOTE(review): Y_ABORT presumably terminates the process, which would make the
// Y_FAIL_S on the next line unreachable — confirm which of the two is intended.
Y_ABORT("Received unknown event");
Y_FAIL_S("Received unknown event " << TypeName(*ev->GetBase()));
};
}

Expand All @@ -845,9 +1024,11 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
}
switch (ev->GetTypeRewrite()) {
hFunc(TEvBlobStorage::TEvVMovedPatchResult, Handle);
hFunc(TEvents::TEvWakeup, HandleWakeUp<MovedPatchTag>);
IgnoreFunc(TEvBlobStorage::TEvVPatchResult);
IgnoreFunc(TEvBlobStorage::TEvVPatchFoundParts);
default:
Y_ABORT("Received unknown event");
Y_FAIL_S("Received unknown event " << TypeName(*ev->GetBase()));
};
}

Expand All @@ -858,8 +1039,9 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
switch (ev->GetTypeRewrite()) {
hFunc(TEvBlobStorage::TEvVPatchFoundParts, Handle);
hFunc(TEvBlobStorage::TEvVPatchResult, Handle);
hFunc(TEvents::TEvWakeup, HandleVPatchWakeUp);
default:
Y_ABORT("Received unknown event");
Y_FAIL_S("Received unknown event " << TypeName(*ev->GetBase()));
};
}
};
Expand Down
Loading

0 comments on commit c68c520

Please sign in to comment.