Skip to content

Commit

Permalink
Add garbage collection inflight
Browse files Browse the repository at this point in the history
  • Loading branch information
serbel324 committed Dec 22, 2023
1 parent 313d937 commit 7360899
Showing 1 changed file with 47 additions and 20 deletions.
67 changes: 47 additions & 20 deletions ydb/core/load_test/group_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,10 +269,6 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
TQuantileTracker<ui32> ReadsInFlightQT;
TQuantileTracker<ui64> ReadBytesInFlightQT;

// Collecting garbage
TIntervalGenerator GarbageCollectIntervalGen;

TDeque<TLogoBlobID> ConfirmedBlobIds;
TIntrusivePtr<NMonitoring::TCounterForPtr> MaxInFlightLatency;
bool IsWorkingNow = true;

Expand All @@ -286,6 +282,17 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
ui64 ScriptedRound;
TVector<TReqInfo> ScriptedRequests;

// Blobs management
TDeque<TLogoBlobID> ConfirmedBlobIds;

// Garbage collection
ui32 GarbageCollectionsInFlight = 0;
TMonotonic NextGarbageCollectionTimestamp;
bool NextGarbageCollectionInQueue = false;
TIntervalGenerator GarbageCollectIntervalGen;
// There is no point in having more than 1 active garbage collection request at the moment
constexpr static ui32 MaxGarbageCollectionsInFlight = 1;

public:
TTabletWriter(ui64 tag, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
TWakeupQueue& wakeupQueue, TQueryDispatcher& queryDispatcher, ui64 tabletId, ui32 channel,
Expand Down Expand Up @@ -326,11 +333,11 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
"items", Percentiles)
, ReadBytesInFlightQT(ExposePeriod, Counters->GetSubgroup("metric", "readBytesInFlight"),
"bytes", Percentiles)
, GarbageCollectIntervalGen(garbageCollectIntervalGen)
, ScriptedRoundDuration(scriptedRoundDuration)
, ScriptedCounter(0)
, ScriptedRound(0)
, ScriptedRequests(std::move(scriptedRequests))
, GarbageCollectIntervalGen(garbageCollectIntervalGen)
{
*Counters->GetCounter("tabletId") = tabletId;
const auto& percCounters = Counters->GetSubgroup("sensor", "microseconds");
Expand Down Expand Up @@ -363,6 +370,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
// Issue TEvDiscover
void Bootstrap(const TActorContext& ctx) {
NextWriteTimestamp = TActivationContext::Monotonic();
NextGarbageCollectionTimestamp = TActivationContext::Monotonic();
auto ev = std::make_unique<TEvBlobStorage::TEvDiscover>(TabletId, Generation, false, true, TInstant::Max(), 0, true);
LOG_DEBUG_S(ctx, NKikimrServices::BS_LOAD_TEST, PrintMe() << " is bootstrapped, going to send "
<< ev->ToString());
Expand Down Expand Up @@ -415,35 +423,37 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
return;
}

IssueTEvCollectGarbage(ctx);
IssueTEvCollectGarbageOnce(ctx);
};

SendToBSProxy(ctx, GroupId, ev.release(), QueryDispatcher.ObtainCookie(std::move(callback)));
}

void IssueTEvCollectGarbage(const TActorContext& ctx) {
void IssueTEvCollectGarbageOnce(const TActorContext& ctx) {
auto ev = TEvBlobStorage::TEvCollectGarbage::CreateHardBarrier(TabletId, Generation, GarbageCollectStep,
Channel, Generation, 0, TInstant::Max());
LOG_DEBUG_S(ctx, NKikimrServices::BS_LOAD_TEST, PrintMe() << " going to send " << ev->ToString());
++GarbageCollectStep;
auto callback = [this] (IEventBase *event, const TActorContext& ctx) {
auto *res = dynamic_cast<TEvBlobStorage::TEvCollectGarbageResult *>(event);
Y_ABORT_UNLESS(res);
--GarbageCollectionsInFlight;
if (!CheckStatus(ctx, res, {NKikimrProto::EReplyStatus::OK})) {
return;
}
LOG_INFO_S(ctx, NKikimrServices::BS_LOAD_TEST, PrintMe() << " recieved " << res->ToString());
StartWorking(ctx);
};

++GarbageCollectionsInFlight;
SendToBSProxy(ctx, GroupId, ev.Release(), QueryDispatcher.ObtainCookie(std::move(callback)));
}

void StartWorking(const TActorContext& ctx) {
StartTimestamp = TActivationContext::Monotonic();
InitializeTrackers(StartTimestamp);
IssueWriteIfPossible(ctx);
ScheduleGarbageCollect(ctx);
IssueGarbageCollectionIfPossible(ctx);
ExposeCounters(ctx);
}

Expand All @@ -455,15 +465,19 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
auto callback = [this](IEventBase *event, const TActorContext& ctx) {
auto *res = dynamic_cast<TEvBlobStorage::TEvCollectGarbageResult *>(event);
Y_ABORT_UNLESS(res);
--GarbageCollectionsInFlight;

if (!CheckStatus(ctx, res, {NKikimrProto::EReplyStatus::OK})) {
return;
}
LOG_INFO_S(ctx, NKikimrServices::BS_LOAD_TEST, PrintMe() << " recieved " << res->ToString());

if (IsWorkingNow) {
ctx.Send(ctx.SelfID, new TEvStopTest());
}
};

++GarbageCollectionsInFlight;
SendToBSProxy(ctx, GroupId, ev.Release(), QueryDispatcher.ObtainCookie(std::move(callback)));
}

Expand Down Expand Up @@ -569,6 +583,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
DUMP_PARAM_FINAL(ReadSettings.MaxRequestsInFlight)
DUMP_PARAM_FINAL(ReadSettings.MaxBytesInFlight)
DUMP_PARAM(ConfirmedBlobIds.size())
DUMP_PARAM(GarbageCollectionsInFlight)

static constexpr size_t count = 5;
std::array<size_t, count> nums{{9000, 9900, 9990, 9999, 10000}};
Expand Down Expand Up @@ -622,6 +637,12 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
WakeupQueue.Put(NextReadTimestamp, std::bind(&TTabletWriter::IssueReadIfPossible, this, _1), ctx);
NextReadInQueue = true;
}

if (now < NextGarbageCollectionTimestamp && !NextGarbageCollectionInQueue) {
using namespace std::placeholders;
WakeupQueue.Put(NextGarbageCollectionTimestamp, std::bind(&TTabletWriter::IssueGarbageCollectionIfPossible, this, _1), ctx);
NextGarbageCollectionInQueue = true;
}
}

void IssueWriteIfPossible(const TActorContext& ctx) {
Expand Down Expand Up @@ -753,23 +774,27 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
}
}

void ScheduleGarbageCollect(const TActorContext& ctx) {
TDuration duration = GarbageCollectIntervalGen.Generate();
if (duration != TDuration()) {
using namespace std::placeholders;
WakeupQueue.Put(TActivationContext::Monotonic() + duration,
std::bind(&TTabletWriter::IssueGarbageCollectRequest, this, _1), ctx);
void IssueGarbageCollectionIfPossible(const TActorContext& ctx) {
const TMonotonic now = TActivationContext::Monotonic();
while (GarbageCollectionsInFlight < MaxGarbageCollectionsInFlight &&
NextGarbageCollectionTimestamp <= now) {
IssueGarbageCollectRequest(ctx);
}
UpdateNextWakeups(ctx, now);
}

void IssueGarbageCollectRequest(const TActorContext& ctx) {
auto ev = std::make_unique<TEvBlobStorage::TEvCollectGarbage>(TabletId, Generation, GarbageCollectStep, Channel,
true, Generation, GarbageCollectStep, nullptr, nullptr, TInstant::Max(), false);
auto callback = [](IEventBase *event, const TActorContext& /*ctx*/) {
auto ev = TEvBlobStorage::TEvCollectGarbage::CreateHardBarrier(TabletId, Generation, GarbageCollectStep,
Channel, Generation, 0, TInstant::Max());
auto callback = [this](IEventBase *event, const TActorContext& ctx) {
auto *res = dynamic_cast<TEvBlobStorage::TEvCollectGarbageResult *>(event);
Y_ABORT_UNLESS(res);
--GarbageCollectionsInFlight;
IssueGarbageCollectionIfPossible(ctx);
};
SendToBSProxy(ctx, GroupId, ev.release(), QueryDispatcher.ObtainCookie(std::move(callback)));
++GarbageCollectionsInFlight;

SendToBSProxy(ctx, GroupId, ev.Release(), QueryDispatcher.ObtainCookie(std::move(callback)));

// just as we have sent this request, we have to trim all confirmed blobs which are going to be deleted
const auto it = std::lower_bound(ConfirmedBlobIds.begin(), ConfirmedBlobIds.end(),
Expand All @@ -780,7 +805,9 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
++GarbageCollectStep;
++WriteStep;
Cookie = 1;
ScheduleGarbageCollect(ctx);

NextGarbageCollectionTimestamp += GarbageCollectIntervalGen.Generate();
NextGarbageCollectionInQueue = false;
}

void IssueReadIfPossible(const TActorContext& ctx) {
Expand Down

0 comments on commit 7360899

Please sign in to comment.