Skip to content

Commit

Permalink
Fix SIGSEGV in balancing actor (#13136)
Browse files: browse the repository at this point in the history
  • Loading branch information
SammyVimes authored Dec 31, 2024
1 parent bafd581 commit acaff68
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 38 deletions.
24 changes: 14 additions & 10 deletions ydb/core/blobstorage/vdisk/balance/balancing_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@ namespace NBalancing {
: BatchSize(batchSize)
{}

TConstArrayRef<T> GetNextBatch() {
TVector<T> GetNextBatch() {
if (Empty()) {
return {};
}

ui32 begin = CurPos;
ui32 end = Min(begin + BatchSize, static_cast<ui32>(Data.size()));
CurPos = end;
return TConstArrayRef<T>(Data.data() + begin, end - begin);

return TVector<T>(std::make_move_iterator(Data.begin() + begin), std::make_move_iterator(Data.begin() + end));
}

bool Empty() const {
Expand Down Expand Up @@ -60,9 +61,9 @@ namespace NBalancing {
}

TBatchManager() = default;
TBatchManager(IActor* sender, IActor* deleter)
: SenderId(TlsActivationContext->Register(sender))
, DeleterId(TlsActivationContext->Register(deleter))
TBatchManager(const TActorId& sender, const TActorId& deleter)
: SenderId(sender)
, DeleterId(deleter)
{}
};

Expand Down Expand Up @@ -204,13 +205,13 @@ namespace NBalancing {
///////////////////////////////////////////////////////////////////////////////////////////

void ContinueBalancing() {
Ctx->MonGroup.PlannedToSendOnMain() = SendOnMainParts.Data.size();
Ctx->MonGroup.CandidatesToDelete() = TryDeleteParts.Data.size();
Ctx->MonGroup.PlannedToSendOnMain() = SendOnMainParts.Size();
Ctx->MonGroup.CandidatesToDelete() = TryDeleteParts.Size();

if (SendOnMainParts.Empty() && TryDeleteParts.Empty()) {
// no more parts to send or delete
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB03, VDISKP(Ctx->VCtx, "Balancing completed"));
bool hasSomeWorkForNextEpoch = SendOnMainParts.Data.size() >= Ctx->Cfg.MaxToSendPerEpoch || TryDeleteParts.Data.size() >= Ctx->Cfg.MaxToDeletePerEpoch;
bool hasSomeWorkForNextEpoch = SendOnMainParts.Size() >= Ctx->Cfg.MaxToSendPerEpoch || TryDeleteParts.Size() >= Ctx->Cfg.MaxToDeletePerEpoch;
Stop(hasSomeWorkForNextEpoch ? TDuration::Seconds(0) : Ctx->Cfg.TimeToSleepIfNothingToDo);
return;
}
Expand All @@ -231,9 +232,12 @@ namespace NBalancing {
(ConnectedVDisks, ConnectedVDisks.size()), (TotalVDisks, GInfo->GetTotalVDisksNum()));

// register sender and deleter actors
IActor* sender = CreateSenderActor(SelfId(), SendOnMainParts.GetNextBatch(), QueueActorMapPtr, Ctx);
IActor* deleter = CreateDeleterActor(SelfId(), TryDeleteParts.GetNextBatch(), QueueActorMapPtr, Ctx);

BatchManager = TBatchManager(
CreateSenderActor(SelfId(), SendOnMainParts.GetNextBatch(), QueueActorMapPtr, Ctx),
CreateDeleterActor(SelfId(), TryDeleteParts.GetNextBatch(), QueueActorMapPtr, Ctx)
RegisterWithSameMailbox(sender),
RegisterWithSameMailbox(deleter)
);
}

Expand Down
4 changes: 2 additions & 2 deletions ydb/core/blobstorage/vdisk/balance/balancing_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ namespace NKikimr {
namespace NBalancing {
IActor* CreateSenderActor(
TActorId notifyId,
TConstArrayRef<TPartInfo> parts,
TVector<TPartInfo>&& parts,
TQueueActorMapPtr queueActorMapPtr,
std::shared_ptr<TBalancingCtx> ctx
);
IActor* CreateDeleterActor(
TActorId notifyId,
TConstArrayRef<TLogoBlobID> parts,
TVector<TLogoBlobID>&& parts,
TQueueActorMapPtr queueActorMapPtr,
std::shared_ptr<TBalancingCtx> ctx
);
Expand Down
16 changes: 8 additions & 8 deletions ydb/core/blobstorage/vdisk/balance/deleter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace {
class TPartsRequester {
private:
const TActorId NotifyId;
TConstArrayRef<TLogoBlobID> Parts;
TVector<TLogoBlobID> Parts;
TReplQuoter::TPtr Quoter;
TIntrusivePtr<TBlobStorageGroupInfo> GInfo;
TQueueActorMapPtr QueueActorMapPtr;
Expand All @@ -32,14 +32,14 @@ namespace {
ui32 Responses = 0;
public:

TPartsRequester(TActorId notifyId, TConstArrayRef<TLogoBlobID> parts, TReplQuoter::TPtr quoter, TIntrusivePtr<TBlobStorageGroupInfo> gInfo, TQueueActorMapPtr queueActorMapPtr, NMonGroup::TBalancingGroup& monGroup)
TPartsRequester(TActorId notifyId, TVector<TLogoBlobID>&& parts, TReplQuoter::TPtr quoter, TIntrusivePtr<TBlobStorageGroupInfo> gInfo, TQueueActorMapPtr queueActorMapPtr, NMonGroup::TBalancingGroup& monGroup)
: NotifyId(notifyId)
, Parts(parts)
, Parts(std::move(parts))
, Quoter(quoter)
, GInfo(gInfo)
, QueueActorMapPtr(queueActorMapPtr)
, MonGroup(monGroup)
, Result(parts.size())
, Result(Parts.size())
{
}

Expand Down Expand Up @@ -294,14 +294,14 @@ namespace {
TDeleter() = default;
TDeleter(
TActorId notifyId,
TConstArrayRef<TLogoBlobID> parts,
TVector<TLogoBlobID>&& parts,
TQueueActorMapPtr queueActorMapPtr,
std::shared_ptr<TBalancingCtx> ctx
)
: NotifyId(notifyId)
, Ctx(ctx)
, GInfo(ctx->GInfo)
, PartsRequester(SelfId(), parts, Ctx->VCtx->ReplNodeRequestQuoter, GInfo, queueActorMapPtr, Ctx->MonGroup)
, PartsRequester(SelfId(), std::move(parts), Ctx->VCtx->ReplNodeRequestQuoter, GInfo, queueActorMapPtr, Ctx->MonGroup)
, PartsDeleter(ctx, GInfo)
{
}
Expand All @@ -314,11 +314,11 @@ namespace {

IActor* CreateDeleterActor(
TActorId notifyId,
TConstArrayRef<TLogoBlobID> parts,
TVector<TLogoBlobID>&& parts,
TQueueActorMapPtr queueActorMapPtr,
std::shared_ptr<TBalancingCtx> ctx
) {
return new TDeleter(notifyId, parts, queueActorMapPtr, ctx);
return new TDeleter(notifyId, std::move(parts), queueActorMapPtr, ctx);
}

} // NBalancing
Expand Down
40 changes: 22 additions & 18 deletions ydb/core/blobstorage/vdisk/balance/sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ namespace {
class TReader {
private:
TPDiskCtxPtr PDiskCtx;
TConstArrayRef<TPartInfo> Parts;
TVector<TPartInfo> Parts;
TReplQuoter::TPtr Quoter;
const TBlobStorageGroupType GType;
NMonGroup::TBalancingGroup& MonGroup;
Expand All @@ -28,27 +28,28 @@ namespace {
ui32 Responses = 0;
public:

TReader(TPDiskCtxPtr pDiskCtx, TConstArrayRef<TPartInfo> parts, TReplQuoter::TPtr replPDiskReadQuoter, TBlobStorageGroupType gType, NMonGroup::TBalancingGroup& monGroup)
TReader(TPDiskCtxPtr pDiskCtx, TVector<TPartInfo>&& parts, TReplQuoter::TPtr replPDiskReadQuoter, TBlobStorageGroupType gType, NMonGroup::TBalancingGroup& monGroup)
: PDiskCtx(pDiskCtx)
, Parts(parts)
, Parts(std::move(parts))
, Quoter(replPDiskReadQuoter)
, GType(gType)
, MonGroup(monGroup)
, Result(parts.size())
, Result(Parts.size())
{}

void SendReadRequests(const TActorId& selfId) {
for (ui32 i = 0; i < Parts.size(); ++i) {
const auto& item = Parts[i];
auto& item = Parts[i];
Result[i] = TPart{.Key=item.Key, .PartsMask=item.PartsMask};
std::visit(TOverloaded{
[&](const TRope& data) {
[&](TRope&& data) {
// part is already in memory, no need to read it from disk
Y_DEBUG_ABORT_UNLESS(item.PartsMask.CountBits() == 1);
Result[i].PartsData = {data};
Result[i].PartsData.reserve(1);
Result[i].PartsData.emplace_back(std::move(data));
++Responses;
},
[&](const TDiskPart& diskPart) {
[&](TDiskPart&& diskPart) {
auto ev = std::make_unique<NPDisk::TEvChunkRead>(
PDiskCtx->Dsk->Owner,
PDiskCtx->Dsk->OwnerRound,
Expand All @@ -66,7 +67,7 @@ namespace {
);
MonGroup.ReadFromHandoffBytes() += diskPart.Size;
}
}, item.PartData);
}, std::move(item.PartData));
}
}

Expand All @@ -83,6 +84,8 @@ namespace {
auto diskBlob = TDiskBlob(&data, localParts, GType, key);
ui32 readSize = 0;

Result[i].PartsData.reserve(localParts.CountBits());

for (ui8 partIdx = localParts.FirstPosition(); partIdx < localParts.GetSize(); partIdx = localParts.NextPosition(partIdx)) {
TRope result;
result = diskBlob.GetPart(partIdx, &result);
Expand Down Expand Up @@ -150,26 +153,27 @@ namespace {
auto localParts = part.PartsMask;
for (ui8 partIdx = localParts.FirstPosition(), i = 0; partIdx < localParts.GetSize(); partIdx = localParts.NextPosition(partIdx), ++i) {
auto key = TLogoBlobID(part.Key, partIdx + 1);
auto& data = part.PartsData[i];
auto&& data = std::move(part.PartsData[i]);
size_t dataSize = data.size();
auto vDiskId = GetMainReplicaVDiskId(*GInfo, key);

if (Ctx->HugeBlobCtx->IsHugeBlob(GInfo->GetTopology().GType, part.Key, Ctx->MinHugeBlobInBytes)) {
auto ev = std::make_unique<TEvBlobStorage::TEvVPut>(
key, data, vDiskId,
key, std::move(data), vDiskId,
true, nullptr,
TInstant::Max(), NKikimrBlobStorage::EPutHandleClass::AsyncBlob
);
SendRequest(TVDiskIdShort(vDiskId), selfId, ev.release(), data.size());
SendRequest(TVDiskIdShort(vDiskId), selfId, ev.release(), dataSize);
} else {
STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB11, VDISKP(Ctx->VCtx, "Add in multiput"), (LogoBlobId, key.ToString()),
(To, GInfo->GetTopology().GetOrderNumber(TVDiskIdShort(vDiskId))), (DataSize, data.size()));
(To, GInfo->GetTopology().GetOrderNumber(TVDiskIdShort(vDiskId))), (DataSize, dataSize));

auto& ev = vDiskToEv[vDiskId];
if (!ev) {
ev = std::make_unique<TEvBlobStorage::TEvVMultiPut>(vDiskId, TInstant::Max(), NKikimrBlobStorage::EPutHandleClass::AsyncBlob, true, nullptr);
}

ev->AddVPut(key, TRcBuf(data), nullptr, {}, NWilson::TTraceId());
ev->AddVPut(key, TRcBuf(std::move(data)), nullptr, {}, NWilson::TTraceId());
}
}
}
Expand Down Expand Up @@ -344,15 +348,15 @@ namespace {
public:
TSender(
TActorId notifyId,
TConstArrayRef<TPartInfo> parts,
TVector<TPartInfo>&& parts,
TQueueActorMapPtr queueActorMapPtr,
std::shared_ptr<TBalancingCtx> ctx
)
: NotifyId(notifyId)
, QueueActorMapPtr(queueActorMapPtr)
, Ctx(ctx)
, GInfo(ctx->GInfo)
, Reader(Ctx->PDiskCtx, parts, ctx->VCtx->ReplPDiskReadQuoter, GInfo->GetTopology().GType, Ctx->MonGroup)
, Reader(Ctx->PDiskCtx, std::move(parts), ctx->VCtx->ReplPDiskReadQuoter, GInfo->GetTopology().GType, Ctx->MonGroup)
, Sender(ctx, GInfo, queueActorMapPtr)
{}

Expand All @@ -364,11 +368,11 @@ namespace {

IActor* CreateSenderActor(
TActorId notifyId,
TConstArrayRef<TPartInfo> parts,
TVector<TPartInfo>&& parts,
TQueueActorMapPtr queueActorMapPtr,
std::shared_ptr<TBalancingCtx> ctx
) {
return new TSender(notifyId, parts, queueActorMapPtr, ctx);
return new TSender(notifyId, std::move(parts), queueActorMapPtr, ctx);
}

} // NBalancing
Expand Down

0 comments on commit acaff68

Please sign in to comment.