Skip to content

Commit

Permalink
make parameters conditional
Browse files Browse the repository at this point in the history
  • Loading branch information
ssmike committed Nov 29, 2024
1 parent 01eadd9 commit 66b2266
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 67 deletions.
151 changes: 85 additions & 66 deletions ydb/core/kqp/runtime/kqp_compute_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,12 @@ class IObservable : TNonCopyable, public TIntrusiveListItem<IObservable> {
template<typename T>
class IObservableValue : public IObservable {
protected:
virtual double DoUpdateValue() = 0;
virtual T DoUpdateValue() = 0;

public:
bool Update() override {
if (auto val = DoUpdateValue()) {
auto val = DoUpdateValue();
if (val != Value) {
Value = val;
return true;
} else {
Expand Down Expand Up @@ -134,12 +135,7 @@ class TRatio : public IObservableValue<double> {
template<typename T>
class TParameter;

class TObservableUpdater : IObservable {
private:
bool Update() override {
return false;
}

class TObservableUpdater {
public:
void UpdateAll() {
TVector<TSet<IObservable*>> queue;
Expand Down Expand Up @@ -225,8 +221,8 @@ class TObservableUpdater : IObservable {
T* Get();
};

THashMap<TParameterKey, TValueContainer> Params;
TIntrusiveList<IObservable> ToUpdate_;
THashMap<TParameterKey, TValueContainer> Params;
};

template<typename T>
Expand All @@ -253,7 +249,7 @@ class TParameter : public IObservableValue<T> {
}

protected:
double DoUpdateValue() override {
T DoUpdateValue() override {
return Value_;
}

Expand Down Expand Up @@ -383,38 +379,30 @@ class IResourcesWeightLimitValue : public TParameter<double>, public TIntrusiveL
public:
using TParameter<double>::TParameter;

virtual bool Enabled() = 0;
virtual IObservableValue<bool>* Enabled() = 0;

virtual double Weight() = 0;
virtual IObservableValue<double>* Weight() = 0;

virtual double HardLimit() = 0;
virtual IObservableValue<double>* HardLimit() = 0;
};

class TResourcesWeightCalculator : public IObservable {
public:
void Register(IResourcesWeightLimitValue* entry) {
AddDependency(entry);
ReportEnabled(entry);
}

void ReportEnabled(IResourcesWeightLimitValue* entry) {
entry->TIntrusiveListItem<IResourcesWeightLimitValue, TResourceWeightIntrusiveListTag>::Unlink();
if (entry->Enabled()) {
Entries.PushFront(entry);
} else {
Entries.PushBack(entry);
}
AddDependency(entry->Enabled());
AddDependency(entry->Weight());
AddDependency(entry->HardLimit());
Entries.PushBack(entry);
}

bool Update() {
SortBuffer.clear();
double sumWeight = 0;
for (auto& entry : Entries) {
if (!entry.Enabled()) {
break;
if (entry.Enabled()->GetValue()) {
sumWeight += entry.Weight()->GetValue();
SortBuffer.push_back({entry.HardLimit()->GetValue() / entry.Weight()->GetValue(), &entry});
}
sumWeight += entry.Weight();
SortBuffer.push_back({entry.HardLimit() / entry.Weight(), &entry});
}
Sort(SortBuffer);

Expand All @@ -433,15 +421,14 @@ class TResourcesWeightCalculator : public IObservable {
break;
}

sumWeight -= sortedEntry->Weight();
sumWeight -= sortedEntry->Weight()->GetValue();
}

for (auto& [entryLimit, sortedEntry] : SortBuffer) {
sortedEntry->SetValue(Min(level * sortedEntry->Weight(), sortedEntry->HardLimit()));
sortedEntry->SetValue(Min(level * sortedEntry->Weight()->GetValue(), sortedEntry->HardLimit()->GetValue()));
}

// nobody should be subscribed
return false;
return true;
}

private:
Expand All @@ -461,46 +448,75 @@ class TResourcesWeightLimitValue : public IResourcesWeightLimitValue {
TResourcesWeightCalculator* calculator,
TObservableUpdater* updater)
: IResourcesWeightLimitValue(updater, staticLimit->GetValue())
, StaticLimit(staticLimit)
, EnabledFlag(enabled, tasksCount)
, HardLimitValue(staticLimit, tasksCount, sumCores)
, ResourceWeightValue(resourceWeight)
, Enabled_(enabled)
, Calculator_(calculator)
, Taskscount(tasksCount)
, SumCores(sumCores)
, Updater_(updater)
{
AddDependency(sumCores);
AddDependency(Taskscount);
AddDependency(staticLimit);
AddDependency(resourceWeight);
AddDependency(enabled);

calculator->Register(this);
AddDependency(calculator);
}

bool Enabled() override {
return Enabled_->GetValue() && Taskscount->GetValue() > 0;
~TResourcesWeightLimitValue() {
Updater_->ToUpdate(Calculator_);
}

double Weight() override {
return ResourceWeightValue->GetValue();
IObservableValue<double>* Weight() override {
return ResourceWeightValue;
}

double HardLimit() override {
return Min(StaticLimit->GetValue(), Taskscount->GetValue() / SumCores->GetValue());
IObservableValue<bool>* Enabled() override {
return &EnabledFlag;
}

bool Update() override {
Calculator_->ReportEnabled(this);
return IResourcesWeightLimitValue::Update();
IObservableValue<double>* HardLimit() override {
return &HardLimitValue;
}

private:
IObservableValue<double>* StaticLimit;
struct TEnabledFlag : public IObservableValue<bool> {
TEnabledFlag(TParameter<bool>* enabled, TParameter<i64>* taskscount)
: Enabled_(enabled)
, Taskscount(taskscount)
{
AddDependency(enabled);
AddDependency(taskscount);
Update();
}

bool DoUpdateValue() override {
return Enabled_->GetValue() && Taskscount->GetValue() > 0;
}

TParameter<bool>* Enabled_;
TParameter<i64>* Taskscount;
} EnabledFlag;

struct THardLimit : public IObservableValue<double> {
THardLimit(IObservableValue<double>* staticLimit, TParameter<i64>* taskscount, TParameter<double>* sumCores)
: StaticLimit(staticLimit)
, TasksCount(taskscount)
, SumCores(sumCores)
{
AddDependency(StaticLimit);
AddDependency(TasksCount);
AddDependency(SumCores);
}

double DoUpdateValue() override {
return Min(StaticLimit->GetValue(), TasksCount->GetValue() / SumCores->GetValue());
}

IObservableValue<double>* StaticLimit;
TParameter<i64>* TasksCount;
TParameter<double>* SumCores;
} HardLimitValue;

private:
TParameter<double>* ResourceWeightValue;
TParameter<bool>* Enabled_;
TResourcesWeightCalculator* Calculator_;
TParameter<i64>* Taskscount;
TParameter<double>* SumCores;
TObservableUpdater* Updater_;
};


Expand Down Expand Up @@ -529,7 +545,6 @@ class TSchedulerEntity {
std::atomic<i64> DelayedCount = 0;

THolder<IObservableValue<double>> Share;
THolder<TResourcesWeightLimitValue> ResourcesWeightLimit;

::NMonitoring::TDynamicCounters::TCounterPtr Vtime;
::NMonitoring::TDynamicCounters::TCounterPtr EntitiesWeight;
Expand Down Expand Up @@ -655,11 +670,10 @@ struct TComputeScheduler::TImpl {
THashMap<TString, size_t> GroupId;
std::vector<std::unique_ptr<TSchedulerEntity::TGroupRecord>> Records;

TResourcesWeightCalculator ResourceWeightsCalculator;
TObservableUpdater WeightsUpdater;
TParameter<double> SumCores{&WeightsUpdater, 1};

TResourcesWeightCalculator ResourceWeightsCalculator;

enum : ui32 {
TotalShare = 1,

Expand All @@ -670,6 +684,7 @@ struct TComputeScheduler::TImpl {
TasksCount = 5,

CompositeShare = 6,
ResourceLimitValue = 7,
};

TIntrusivePtr<TKqpCounters> Counters;
Expand All @@ -678,10 +693,9 @@ struct TComputeScheduler::TImpl {

TDuration MaxDelay = TDuration::Seconds(10);

void CreateGroup(THolder<IObservableValue<double>> share, NMonotonic::TMonotonic now, std::optional<TString> groupName = std::nullopt, THolder<TResourcesWeightLimitValue> resourceWeightLimit = {}) {
void CreateGroup(THolder<IObservableValue<double>> share, NMonotonic::TMonotonic now, std::optional<TString> groupName = std::nullopt) {
auto group = std::make_unique<TSchedulerEntity::TGroupRecord>();
group->Share = std::move(share);
group->ResourcesWeightLimit = std::move(resourceWeightLimit);
if (groupName) {
group->Name = *groupName;
GroupId[*groupName] = Records.size();
Expand Down Expand Up @@ -883,7 +897,11 @@ class TCompositeGroupShare : public IObservableValue<double> {
protected:
double DoUpdateValue() override {
if (ResourceWeightEnabled->GetValue()) {
return Min(TotalLimit->GetValue(), ResourceWeightLimit->GetValue());
if (ResourceWeightLimit->Enabled()->GetValue()) {
return Min(TotalLimit->GetValue(), ResourceWeightLimit->GetValue());
} else {
return 0;
}
} else {
return TotalLimit->GetValue();
}
Expand All @@ -898,6 +916,7 @@ class TCompositeGroupShare : public IObservableValue<double> {
AddDependency(resourceWeightEnabled);
AddDependency(totalLimit);
AddDependency(resourceWeightLimit);
AddDependency(resourceWeightLimit->Enabled());
Update();
}

Expand All @@ -916,11 +935,10 @@ void TComputeScheduler::UpdateGroupShare(TString group, double share, TMonotonic
TParameter<bool>* weightEnabled = Impl->WeightsUpdater.FindOrAddParameter<bool>({group, TImpl::ResourceWeightEnabled}, resourceWeight.has_value());
weightEnabled->SetValue(resourceWeight.has_value());

TParameter<double>* resourceWeightValue = Impl->WeightsUpdater.FindOrAddParameter<double>({group, TImpl::ResourceWeight}, resourceWeight.value_or(0));

TParameter<i64>* taskscount = Impl->WeightsUpdater.FindOrAddParameter<i64>({group, TImpl::TasksCount}, 0);

if (!ptr) {
TParameter<double>* resourceWeightValue = Impl->WeightsUpdater.FindOrAddParameter<double>({group, TImpl::ResourceWeight}, resourceWeight.value_or(0));
TParameter<i64>* taskscount = Impl->WeightsUpdater.FindOrAddParameter<i64>({group, TImpl::TasksCount}, 0);

auto resourceLimitValue = MakeHolder<TResourcesWeightLimitValue>(
&Impl->SumCores,
taskscount,
Expand All @@ -932,8 +950,9 @@ void TComputeScheduler::UpdateGroupShare(TString group, double share, TMonotonic

auto compositeWeight = MakeHolder<TCompositeGroupShare>(shareValue, resourceLimitValue.Get(), weightEnabled);
auto cap = MakeHolder<TShare>(&Impl->SumCores, compositeWeight.Get());
Impl->WeightsUpdater.AddValue<IObservable>({group, TImpl::ResourceLimitValue}, THolder(resourceLimitValue.Release()));
Impl->WeightsUpdater.AddValue({group, TImpl::CompositeShare}, std::move(compositeWeight));
Impl->CreateGroup(std::move(cap), now, group, std::move(resourceLimitValue));
Impl->CreateGroup(std::move(cap), now, group);
} else {
auto& record = Impl->Records[*ptr];
record->MutableStats.Next()->Disabled = false;
Expand Down
Loading

0 comments on commit 66b2266

Please sign in to comment.