From 66b22661f6dfcbae6f0a4533ec6b2f72c3afeae2 Mon Sep 17 00:00:00 2001 From: Mikhail Surin Date: Wed, 27 Nov 2024 08:33:36 +0300 Subject: [PATCH] make parameters conditional --- .../kqp/runtime/kqp_compute_scheduler.cpp | 151 ++++++++++-------- .../kqp/runtime/kqp_compute_scheduler_ut.cpp | 72 ++++++++- 2 files changed, 156 insertions(+), 67 deletions(-) diff --git a/ydb/core/kqp/runtime/kqp_compute_scheduler.cpp b/ydb/core/kqp/runtime/kqp_compute_scheduler.cpp index 821977a5e5b3..9dcf8d2216e6 100644 --- a/ydb/core/kqp/runtime/kqp_compute_scheduler.cpp +++ b/ydb/core/kqp/runtime/kqp_compute_scheduler.cpp @@ -69,11 +69,12 @@ class IObservable : TNonCopyable, public TIntrusiveListItem { template class IObservableValue : public IObservable { protected: - virtual double DoUpdateValue() = 0; + virtual T DoUpdateValue() = 0; public: bool Update() override { - if (auto val = DoUpdateValue()) { + auto val = DoUpdateValue(); + if (val != Value) { Value = val; return true; } else { @@ -134,12 +135,7 @@ class TRatio : public IObservableValue { template class TParameter; -class TObservableUpdater : IObservable { -private: - bool Update() override { - return false; - } - +class TObservableUpdater { public: void UpdateAll() { TVector> queue; @@ -225,8 +221,8 @@ class TObservableUpdater : IObservable { T* Get(); }; - THashMap Params; TIntrusiveList ToUpdate_; + THashMap Params; }; template @@ -253,7 +249,7 @@ class TParameter : public IObservableValue { } protected: - double DoUpdateValue() override { + T DoUpdateValue() override { return Value_; } @@ -383,38 +379,30 @@ class IResourcesWeightLimitValue : public TParameter, public TIntrusiveL public: using TParameter::TParameter; - virtual bool Enabled() = 0; + virtual IObservableValue* Enabled() = 0; - virtual double Weight() = 0; + virtual IObservableValue* Weight() = 0; - virtual double HardLimit() = 0; + virtual IObservableValue* HardLimit() = 0; }; class TResourcesWeightCalculator : public IObservable { public: void Register(IResourcesWeightLimitValue* entry) { - AddDependency(entry); - ReportEnabled(entry); - } - - void ReportEnabled(IResourcesWeightLimitValue* entry) { - entry->TIntrusiveListItem::Unlink(); - if (entry->Enabled()) { - Entries.PushFront(entry); - } else { - Entries.PushBack(entry); - } + AddDependency(entry->Enabled()); + AddDependency(entry->Weight()); + AddDependency(entry->HardLimit()); + Entries.PushBack(entry); } bool Update() { SortBuffer.clear(); double sumWeight = 0; for (auto& entry : Entries) { - if (!entry.Enabled()) { - break; + if (entry.Enabled()->GetValue()) { + sumWeight += entry.Weight()->GetValue(); + SortBuffer.push_back({entry.HardLimit()->GetValue() / entry.Weight()->GetValue(), &entry}); } - sumWeight += entry.Weight(); - SortBuffer.push_back({entry.HardLimit() / entry.Weight(), &entry}); } Sort(SortBuffer); @@ -433,15 +421,14 @@ class TResourcesWeightCalculator : public IObservable { break; } - sumWeight -= sortedEntry->Weight(); + sumWeight -= sortedEntry->Weight()->GetValue(); } for (auto& [entryLimit, sortedEntry] : SortBuffer) { - sortedEntry->SetValue(Min(level * sortedEntry->Weight(), sortedEntry->HardLimit())); + sortedEntry->SetValue(Min(level * sortedEntry->Weight()->GetValue(), sortedEntry->HardLimit()->GetValue())); } - // nobody should be subscribed - return false; + return true; } private: @@ -461,46 +448,75 @@ class TResourcesWeightLimitValue : public IResourcesWeightLimitValue { TResourcesWeightCalculator* calculator, TObservableUpdater* updater) : IResourcesWeightLimitValue(updater, staticLimit->GetValue()) - , StaticLimit(staticLimit) + , EnabledFlag(enabled, tasksCount) + , HardLimitValue(staticLimit, tasksCount, sumCores) , ResourceWeightValue(resourceWeight) - , Enabled_(enabled) , Calculator_(calculator) - , Taskscount(tasksCount) - , SumCores(sumCores) + , Updater_(updater) { - AddDependency(sumCores); - AddDependency(Taskscount); - AddDependency(staticLimit); - AddDependency(resourceWeight); - AddDependency(enabled); - calculator->Register(this); + AddDependency(calculator); } - bool Enabled() override { - return Enabled_->GetValue() && Taskscount->GetValue() > 0; + ~TResourcesWeightLimitValue() { + Updater_->ToUpdate(Calculator_); } - double Weight() override { - return ResourceWeightValue->GetValue(); + IObservableValue* Weight() override { + return ResourceWeightValue; } - double HardLimit() override { - return Min(StaticLimit->GetValue(), Taskscount->GetValue() / SumCores->GetValue()); + IObservableValue* Enabled() override { + return &EnabledFlag; } - bool Update() override { - Calculator_->ReportEnabled(this); - return IResourcesWeightLimitValue::Update(); + IObservableValue* HardLimit() override { + return &HardLimitValue; } private: - IObservableValue* StaticLimit; + struct TEnabledFlag : public IObservableValue { + TEnabledFlag(TParameter* enabled, TParameter* taskscount) + : Enabled_(enabled) + , Taskscount(taskscount) + { + AddDependency(enabled); + AddDependency(taskscount); + Update(); + } + + bool DoUpdateValue() override { + return Enabled_->GetValue() && Taskscount->GetValue() > 0; + } + + TParameter* Enabled_; + TParameter* Taskscount; + } EnabledFlag; + + struct THardLimit : public IObservableValue { + THardLimit(IObservableValue* staticLimit, TParameter* taskscount, TParameter* sumCores) + : StaticLimit(staticLimit) + , TasksCount(taskscount) + , SumCores(sumCores) + { + AddDependency(StaticLimit); + AddDependency(TasksCount); + AddDependency(SumCores); + } + + double DoUpdateValue() override { + return Min(StaticLimit->GetValue(), TasksCount->GetValue() / SumCores->GetValue()); + } + + IObservableValue* StaticLimit; + TParameter* TasksCount; + TParameter* SumCores; + } HardLimitValue; + +private: TParameter* ResourceWeightValue; - TParameter* Enabled_; TResourcesWeightCalculator* Calculator_; - TParameter* Taskscount; - TParameter* SumCores; + TObservableUpdater* Updater_; }; @@ -529,7 +545,6 @@ class TSchedulerEntity { std::atomic DelayedCount = 0; THolder> Share; - THolder ResourcesWeightLimit; ::NMonitoring::TDynamicCounters::TCounterPtr Vtime; ::NMonitoring::TDynamicCounters::TCounterPtr EntitiesWeight; @@ -655,11 +670,10 @@ struct TComputeScheduler::TImpl { THashMap GroupId; std::vector> Records; + TResourcesWeightCalculator ResourceWeightsCalculator; TObservableUpdater WeightsUpdater; TParameter SumCores{&WeightsUpdater, 1}; - TResourcesWeightCalculator ResourceWeightsCalculator; - enum : ui32 { TotalShare = 1, @@ -670,6 +684,7 @@ struct TComputeScheduler::TImpl { TasksCount = 5, CompositeShare = 6, + ResourceLimitValue = 7, }; TIntrusivePtr Counters; @@ -678,10 +693,9 @@ struct TComputeScheduler::TImpl { TDuration MaxDelay = TDuration::Seconds(10); - void CreateGroup(THolder> share, NMonotonic::TMonotonic now, std::optional groupName = std::nullopt, THolder resourceWeightLimit = {}) { + void CreateGroup(THolder> share, NMonotonic::TMonotonic now, std::optional groupName = std::nullopt) { auto group = std::make_unique(); group->Share = std::move(share); - group->ResourcesWeightLimit = std::move(resourceWeightLimit); if (groupName) { group->Name = *groupName; GroupId[*groupName] = Records.size(); @@ -883,7 +897,11 @@ class TCompositeGroupShare : public IObservableValue { protected: double DoUpdateValue() override { if (ResourceWeightEnabled->GetValue()) { - return Min(TotalLimit->GetValue(), ResourceWeightLimit->GetValue()); + if (ResourceWeightLimit->Enabled()->GetValue()) { + return Min(TotalLimit->GetValue(), ResourceWeightLimit->GetValue()); + } else { + return 0; + } } else { return TotalLimit->GetValue(); } @@ -898,6 +916,7 @@ class TCompositeGroupShare : public IObservableValue { AddDependency(resourceWeightEnabled); AddDependency(totalLimit); AddDependency(resourceWeightLimit); + AddDependency(resourceWeightLimit->Enabled()); Update(); } @@ -916,11 +935,10 @@ void TComputeScheduler::UpdateGroupShare(TString group, double share, TMonotonic TParameter* weightEnabled = Impl->WeightsUpdater.FindOrAddParameter({group, TImpl::ResourceWeightEnabled}, resourceWeight.has_value()); weightEnabled->SetValue(resourceWeight.has_value()); - TParameter* resourceWeightValue = Impl->WeightsUpdater.FindOrAddParameter({group, TImpl::ResourceWeight}, resourceWeight.value_or(0)); - - TParameter* taskscount = Impl->WeightsUpdater.FindOrAddParameter({group, TImpl::TasksCount}, 0); - if (!ptr) { + TParameter* resourceWeightValue = Impl->WeightsUpdater.FindOrAddParameter({group, TImpl::ResourceWeight}, resourceWeight.value_or(0)); + TParameter* taskscount = Impl->WeightsUpdater.FindOrAddParameter({group, TImpl::TasksCount}, 0); + auto resourceLimitValue = MakeHolder( &Impl->SumCores, taskscount, @@ -932,8 +950,9 @@ void TComputeScheduler::UpdateGroupShare(TString group, double share, TMonotonic auto compositeWeight = MakeHolder(shareValue, resourceLimitValue.Get(), weightEnabled); auto cap = MakeHolder(&Impl->SumCores, compositeWeight.Get()); + Impl->WeightsUpdater.AddValue({group, TImpl::ResourceLimitValue}, THolder(resourceLimitValue.Release())); Impl->WeightsUpdater.AddValue({group, TImpl::CompositeShare}, std::move(compositeWeight)); - Impl->CreateGroup(std::move(cap), now, group, std::move(resourceLimitValue)); + Impl->CreateGroup(std::move(cap), now, group); } else { auto& record = Impl->Records[*ptr]; record->MutableStats.Next()->Disabled = false; diff --git a/ydb/core/kqp/runtime/kqp_compute_scheduler_ut.cpp b/ydb/core/kqp/runtime/kqp_compute_scheduler_ut.cpp index c7105a804089..273eea6c9f8d 100644 --- a/ydb/core/kqp/runtime/kqp_compute_scheduler_ut.cpp +++ b/ydb/core/kqp/runtime/kqp_compute_scheduler_ut.cpp @@ -14,6 +14,7 @@ Y_UNIT_TEST_SUITE(TComputeScheduler) { auto& handle = handles[i]; if (!handle.Delay(now)) { handle.TrackTime(batch, now); + results[i] += batch; } } scheduler.AdvanceTime(now); @@ -26,24 +27,93 @@ Y_UNIT_TEST_SUITE(TComputeScheduler) { TComputeScheduler scheduler; scheduler.UpdateGroupShare("first", 0.4, TMonotonic::Zero(), std::nullopt); scheduler.UpdateGroupShare("second", 0.4, TMonotonic::Zero(), std::nullopt); + scheduler.SetMaxDeviation(TDuration::MilliSeconds(10)); scheduler.SetCapacity(2); TVector handles; handles.push_back(scheduler.Enroll("first", 1, TMonotonic::Zero())); + handles.push_back(scheduler.Enroll("second", 1, TMonotonic::Zero())); auto times = RunSimulation(scheduler, handles, TMonotonic::Zero() + TDuration::MilliSeconds(10), - TDuration::MilliSeconds(5), + TDuration::MilliSeconds(1), TDuration::Seconds(2), TDuration::MilliSeconds(10)); for (auto& time : times) { + Cerr << time.MilliSeconds() << " " << (TDuration::Seconds(2) * 0.8).MilliSeconds() << Endl; UNIT_ASSERT_LE(time, TDuration::Seconds(2) * 0.8 + TDuration::MilliSeconds(10)); UNIT_ASSERT_GE(time, TDuration::Seconds(2) * 0.8 - TDuration::MilliSeconds(10)); } } Y_UNIT_TEST(QueryLimits) { + TComputeScheduler scheduler; + scheduler.UpdateGroupShare("first", 0.4, TMonotonic::Zero(), std::nullopt); + scheduler.SetMaxDeviation(TDuration::MilliSeconds(1)); + scheduler.UpdatePerQueryShare("first", 0.5, TMonotonic::Zero()); + scheduler.SetCapacity(2); + TVector handles; + handles.push_back(scheduler.Enroll("first", 1, TMonotonic::Zero())); + handles.push_back(scheduler.Enroll("first", 1, TMonotonic::Zero())); + for (auto& handle : handles) { + auto group = scheduler.MakePerQueryGroup(TMonotonic::Zero(), 0.5, "first"); + scheduler.AddToGroup(TMonotonic::Zero(), group, handle); + } + auto times = RunSimulation(scheduler, + handles, + TMonotonic::Zero() + TDuration::MilliSeconds(10), + TDuration::MilliSeconds(1), + TDuration::Seconds(2), + TDuration::MilliSeconds(10)); + for (auto& time : times) { + Cerr << time.MilliSeconds() << " " << (TDuration::Seconds(2) * 0.4).MilliSeconds() << Endl; + UNIT_ASSERT_LE(time, TDuration::Seconds(2) * 0.4 + TDuration::MilliSeconds(10)); + UNIT_ASSERT_GE(time, TDuration::Seconds(2) * 0.4 - TDuration::MilliSeconds(10)); + } } Y_UNIT_TEST(ResourceWeight) { + TComputeScheduler scheduler; + scheduler.UpdateGroupShare("first", 1, TMonotonic::Zero(), 1); + scheduler.UpdateGroupShare("second", 1, TMonotonic::Zero(), 3); + scheduler.SetMaxDeviation(TDuration::MilliSeconds(1)); + scheduler.SetCapacity(1); + TVector handles; + handles.push_back(scheduler.Enroll("first", 1, TMonotonic::Zero())); + handles.push_back(scheduler.Enroll("second", 1, TMonotonic::Zero())); + scheduler.AdvanceTime(TMonotonic::Zero()); + auto times = RunSimulation(scheduler, + handles, + TMonotonic::Zero() + TDuration::MilliSeconds(10), + TDuration::MilliSeconds(1), + TDuration::Seconds(2), + TDuration::MilliSeconds(10)); + + Cerr << times[0].MilliSeconds() << " " << (TDuration::Seconds(2) /4).MilliSeconds() << Endl; + UNIT_ASSERT_LE(times[0], TDuration::Seconds(2) /4 + TDuration::MilliSeconds(10)); + UNIT_ASSERT_GE(times[0], TDuration::Seconds(2) /4 - TDuration::MilliSeconds(10)); + + Cerr << times[1].MilliSeconds() << " " << (TDuration::Seconds(2)*3 /4).MilliSeconds() << Endl; + UNIT_ASSERT_LE(times[1], TDuration::Seconds(2)*3 /4 + TDuration::MilliSeconds(10)); + UNIT_ASSERT_GE(times[1], TDuration::Seconds(2)*3 /4 - TDuration::MilliSeconds(10)); + + scheduler.Deregister(handles[1], TMonotonic::Zero() + TDuration::Seconds(2)); + handles.pop_back(); + + scheduler.UpdateGroupShare("third", 0.5, TMonotonic::Zero() + TDuration::Seconds(2), 2); + handles.push_back(scheduler.Enroll("third", 1, TMonotonic::Zero() + TDuration::Seconds(2))); + times = RunSimulation(scheduler, + handles, + TMonotonic::Zero() + TDuration::Seconds(2), + TDuration::MilliSeconds(1), + TDuration::Seconds(2), + TDuration::MilliSeconds(10)); + + Cerr << times[0].MilliSeconds() << " " << (TDuration::Seconds(2) /2).MilliSeconds() << Endl; + UNIT_ASSERT_LE(times[0], TDuration::Seconds(2) /2 + TDuration::MilliSeconds(10)); + UNIT_ASSERT_GE(times[0], TDuration::Seconds(2) /2 - TDuration::MilliSeconds(10)); + + Cerr << times[1].MilliSeconds() << " " << (TDuration::Seconds(2) /2).MilliSeconds() << Endl; + UNIT_ASSERT_LE(times[1], TDuration::Seconds(2) /2 + TDuration::MilliSeconds(10)); + UNIT_ASSERT_GE(times[1], TDuration::Seconds(2) /2 - TDuration::MilliSeconds(10)); } }