Skip to content

Commit

Permalink
recalc weights lazily
Browse files Browse the repository at this point in the history
  • Loading branch information
ssmike committed Nov 29, 2024
1 parent 912849e commit 01eadd9
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 17 deletions.
28 changes: 11 additions & 17 deletions ydb/core/kqp/runtime/kqp_compute_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ namespace {
}

static constexpr TDuration AvgBatch = TDuration::MicroSeconds(100);

static constexpr double MinCapacity = 1e-9;
}

namespace NKikimr {
Expand Down Expand Up @@ -390,14 +392,6 @@ class IResourcesWeightLimitValue : public TParameter<double>, public TIntrusiveL

class TResourcesWeightCalculator : public IObservable {
public:
TResourcesWeightCalculator(TObservableUpdater* updater, TMaybe<TDuration> throttleRecalculation)
: Updater(updater)
, ThrottleRecalculation(throttleRecalculation)
{
Y_UNUSED(Updater);
Y_UNUSED(throttleRecalculation);
}

void Register(IResourcesWeightLimitValue* entry) {
AddDependency(entry);
ReportEnabled(entry);
Expand Down Expand Up @@ -452,8 +446,6 @@ class TResourcesWeightCalculator : public IObservable {

private:
TIntrusiveList<IResourcesWeightLimitValue, TResourceWeightIntrusiveListTag> Entries;
TObservableUpdater* Updater;
TMaybe<TDuration> ThrottleRecalculation;

TVector<std::pair<double, IResourcesWeightLimitValue*>> SortBuffer;
};
Expand Down Expand Up @@ -522,7 +514,6 @@ class TSchedulerEntity {
TMonotonic LastNowRecalc;
bool Disabled = false;
i64 EntitiesWeight = 0;
double MaxDeviation = 0;
double MaxLimitDeviation = 0;

ssize_t TrackedBefore = 0;
Expand Down Expand Up @@ -620,6 +611,9 @@ class TSchedulerEntity {
if (limit > tracked) {
return {};
} else {
if (current.get()->Capacity < MinCapacity) {
return MaxDelay;
}
return Min(MaxDelay, ToDuration(/*Coeff * */(tracked - limit +
Max<i64>(0, group->DelayedSumBatches.load()) + BatchTime.MicroSeconds() +
ActivationPenalty.MicroSeconds() * (group->DelayedCount.load() + 1) +
Expand Down Expand Up @@ -664,7 +658,7 @@ struct TComputeScheduler::TImpl {
TObservableUpdater WeightsUpdater;
TParameter<double> SumCores{&WeightsUpdater, 1};

TResourcesWeightCalculator ResourceWeightsCalculator{&WeightsUpdater, {}};
TResourcesWeightCalculator ResourceWeightsCalculator;

enum : ui32 {
TotalShare = 1,
Expand Down Expand Up @@ -766,7 +760,6 @@ void TComputeScheduler::TImpl::AdvanceTime(TMonotonic now, TSchedulerEntity::TGr
if (Counters) {
record->InitCounters(Counters);
}
WeightsUpdater.UpdateAll();
record->MutableStats.Next()->Capacity = record->Share->GetValue();
auto& v = record->MutableStats;
{
Expand All @@ -784,8 +777,6 @@ void TComputeScheduler::TImpl::AdvanceTime(TMonotonic now, TSchedulerEntity::TGr
tracked - FromDuration(ForgetInteval) * group.get()->Capacity,
Min<ssize_t>(group.get()->Limit(now) - group.get()->MaxLimitDeviation, tracked));

v.Next()->MaxDeviation = (FromDuration(SmoothPeriod) * v.Next()->Capacity) / v.Next()->Capacity;

//if (group.get()->EntitiesWeight > 0) {
// delta = FromDuration(now - group.get()->LastNowRecalc) * group.get()->Capacity / group.get()->EntitiesWeight;
//}
Expand All @@ -804,6 +795,7 @@ void TComputeScheduler::TImpl::AdvanceTime(TMonotonic now, TSchedulerEntity::TGr
}

void TComputeScheduler::AdvanceTime(TMonotonic now) {
Impl->WeightsUpdater.UpdateAll();
for (size_t i = 0; i < Impl->Records.size(); ++i) {
Impl->AdvanceTime(now, Impl->Records[i].get());
}
Expand All @@ -818,8 +810,10 @@ void TComputeScheduler::Deregister(TSchedulerEntityHandle& self, TMonotonic now)
for (auto group : (*self).Groups) {
auto* next = group->MutableStats.Next();
next->EntitiesWeight -= (*self).Weight;
auto* param = Impl->WeightsUpdater.FindOrAddParameter<i64>({group->Name, TImpl::TasksCount}, 0);
param->Add(-1);
auto* param = Impl->WeightsUpdater.FindValue<TParameter<i64>>({group->Name, TImpl::TasksCount});
if (param) {
param->Add(-1);
}
Impl->AdvanceTime(now, group);
}
}
Expand Down
49 changes: 49 additions & 0 deletions ydb/core/kqp/runtime/kqp_compute_scheduler_ut.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#include "kqp_compute_scheduler.h"

#include <library/cpp/testing/unittest/registar.h>

using namespace NKikimr::NKqp;

Y_UNIT_TEST_SUITE(TComputeScheduler) {

TVector<TDuration> RunSimulation(TComputeScheduler& scheduler, TArrayRef<TSchedulerEntityHandle> handles, TMonotonic start, TDuration tick, TDuration length, TDuration batch) {
TVector<TDuration> results(handles.size());
for (TDuration t = TDuration::Zero(); t < length; t += tick) {
auto now = start + t;
for (size_t i = 0; i < handles.size(); ++i) {
auto& handle = handles[i];
if (!handle.Delay(now)) {
handle.TrackTime(batch, now);
}
}
scheduler.AdvanceTime(now);
}
return results;
}


Y_UNIT_TEST(TTotalLimits) {
TComputeScheduler scheduler;
scheduler.UpdateGroupShare("first", 0.4, TMonotonic::Zero(), std::nullopt);
scheduler.UpdateGroupShare("second", 0.4, TMonotonic::Zero(), std::nullopt);
scheduler.SetCapacity(2);
TVector<TSchedulerEntityHandle> handles;
handles.push_back(scheduler.Enroll("first", 1, TMonotonic::Zero()));
auto times = RunSimulation(scheduler,
handles,
TMonotonic::Zero() + TDuration::MilliSeconds(10),
TDuration::MilliSeconds(5),
TDuration::Seconds(2),
TDuration::MilliSeconds(10));
for (auto& time : times) {
UNIT_ASSERT_LE(time, TDuration::Seconds(2) * 0.8 + TDuration::MilliSeconds(10));
UNIT_ASSERT_GE(time, TDuration::Seconds(2) * 0.8 - TDuration::MilliSeconds(10));
}
}

Y_UNIT_TEST(QueryLimits) {
}

Y_UNIT_TEST(ResourceWeight) {
}
}
1 change: 1 addition & 0 deletions ydb/core/kqp/runtime/ut/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ TIMEOUT(180)

SRCS(
kqp_scan_data_ut.cpp
kqp_compute_scheduler_ut.cpp
)

YQL_LAST_ABI_VERSION()
Expand Down

0 comments on commit 01eadd9

Please sign in to comment.