From 901cd46bc7655f0fc0c9c32c59dc2fececac3e1a Mon Sep 17 00:00:00 2001 From: Aleksandr Kriukov Date: Fri, 20 Sep 2024 10:43:50 +0000 Subject: [PATCH] Revert "Merge to 24-3: fix harmonizer logic with hoggish pools (#9477)" This reverts commit ec987560c1b62f7ce63b89cfb95e28d57a12e399. --- ydb/library/actors/core/executor_pool.h | 8 +- .../actors/core/executor_pool_basic.cpp | 10 +- ydb/library/actors/core/executor_pool_io.cpp | 4 +- .../actors/core/executor_pool_shared.cpp | 8 +- ydb/library/actors/core/harmonizer.cpp | 333 ++++++---------- ydb/library/actors/core/harmonizer.h | 22 +- ydb/library/actors/core/harmonizer_ut.cpp | 360 ------------------ ydb/library/actors/core/mon_stats.h | 8 +- ydb/library/actors/core/probes.h | 4 +- ydb/library/actors/core/ut/ya.make | 1 - .../actors/helpers/pool_stats_collector.h | 45 +-- 11 files changed, 169 insertions(+), 634 deletions(-) delete mode 100644 ydb/library/actors/core/harmonizer_ut.cpp diff --git a/ydb/library/actors/core/executor_pool.h b/ydb/library/actors/core/executor_pool.h index 1260c76e4efd..9e283c7a5b35 100644 --- a/ydb/library/actors/core/executor_pool.h +++ b/ydb/library/actors/core/executor_pool.h @@ -15,13 +15,13 @@ namespace NActors { class ISchedulerCookie; struct TCpuConsumption { - double ElapsedUs = 0; - double CpuUs = 0; + double ConsumedUs = 0; + double BookedUs = 0; ui64 NotEnoughCpuExecutions = 0; void Add(const TCpuConsumption& other) { - ElapsedUs += other.ElapsedUs; - CpuUs += other.CpuUs; + ConsumedUs += other.ConsumedUs; + BookedUs += other.BookedUs; NotEnoughCpuExecutions += other.NotEnoughCpuExecutions; } }; diff --git a/ydb/library/actors/core/executor_pool_basic.cpp b/ydb/library/actors/core/executor_pool_basic.cpp index c94e73661c08..284392c36961 100644 --- a/ydb/library/actors/core/executor_pool_basic.cpp +++ b/ydb/library/actors/core/executor_pool_basic.cpp @@ -408,10 +408,10 @@ namespace NActors { poolStats.DecreasingThreadsByHoggishState = stats.DecreasingThreadsByHoggishState; poolStats.DecreasingThreadsByExchange = stats.DecreasingThreadsByExchange; poolStats.PotentialMaxThreadCount = stats.PotentialMaxThreadCount; - poolStats.MaxElapsedCpuUs = stats.MaxElapsedCpu; - poolStats.MinElapsedCpuUs = stats.MinElapsedCpu; - poolStats.MaxCpuUs = stats.MaxCpu; - poolStats.MinCpuUs = stats.MinCpu; + poolStats.MaxConsumedCpuUs = stats.MaxConsumedCpu; + poolStats.MinConsumedCpuUs = stats.MinConsumedCpu; + poolStats.MaxBookedCpuUs = stats.MaxBookedCpu; + poolStats.MinBookedCpuUs = stats.MinBookedCpu; } statsCopy.resize(MaxFullThreadCount + 1); @@ -430,7 +430,7 @@ namespace NActors { void TBasicExecutorPool::GetExecutorPoolState(TExecutorPoolState &poolState) const { if (Harmonizer) { TPoolHarmonizerStats stats = Harmonizer->GetPoolStats(PoolId); - poolState.UsedCpu = stats.AvgElapsedCpu; + poolState.UsedCpu = stats.AvgConsumedCpu; poolState.PossibleMaxLimit = stats.PotentialMaxThreadCount; } else { poolState.PossibleMaxLimit = poolState.MaxLimit; diff --git a/ydb/library/actors/core/executor_pool_io.cpp b/ydb/library/actors/core/executor_pool_io.cpp index 954fff86d079..31592f06a6ed 100644 --- a/ydb/library/actors/core/executor_pool_io.cpp +++ b/ydb/library/actors/core/executor_pool_io.cpp @@ -140,7 +140,7 @@ namespace NActors { void TIOExecutorPool::GetCurrentStats(TExecutorPoolStats& poolStats, TVector& statsCopy) const { poolStats.CurrentThreadCount = PoolThreads; - poolStats.DefaultThreadCount = 0; + poolStats.DefaultThreadCount = PoolThreads; poolStats.MaxThreadCount = PoolThreads; poolStats.PotentialMaxThreadCount = PoolThreads; statsCopy.resize(PoolThreads + 1); @@ -156,7 +156,7 @@ namespace NActors { void TIOExecutorPool::GetExecutorPoolState(TExecutorPoolState &poolState) const { if (Harmonizer) { TPoolHarmonizerStats stats = Harmonizer->GetPoolStats(PoolId); - poolState.UsedCpu = stats.AvgElapsedCpu; + poolState.UsedCpu = stats.AvgConsumedCpu; } poolState.CurrentLimit = PoolThreads; poolState.MaxLimit = PoolThreads; diff --git a/ydb/library/actors/core/executor_pool_shared.cpp b/ydb/library/actors/core/executor_pool_shared.cpp index 437989ecbc4b..2424c18c78be 100644 --- a/ydb/library/actors/core/executor_pool_shared.cpp +++ b/ydb/library/actors/core/executor_pool_shared.cpp @@ -143,16 +143,16 @@ void TSharedExecutorPool::GiveHalfThread(i16 from, i16 to) { } void TSharedExecutorPool::GetSharedStats(i16 poolId, std::vector& statsCopy) { - statsCopy.resize(SharedThreadCount); + statsCopy.resize(SharedThreadCount + 1); for (i16 i = 0; i < SharedThreadCount; ++i) { - Threads[i].Thread->GetSharedStats(poolId, statsCopy[i]); + Threads[i].Thread->GetSharedStats(poolId, statsCopy[i + 1]); } } void TSharedExecutorPool::GetSharedStatsForHarmonizer(i16 poolId, std::vector& statsCopy) { - statsCopy.resize(SharedThreadCount); + statsCopy.resize(SharedThreadCount + 1); for (i16 i = 0; i < SharedThreadCount; ++i) { - Threads[i].Thread->GetSharedStatsForHarmonizer(poolId, statsCopy[i]); + Threads[i].Thread->GetSharedStatsForHarmonizer(poolId, statsCopy[i + 1]); } } diff --git a/ydb/library/actors/core/harmonizer.cpp b/ydb/library/actors/core/harmonizer.cpp index fd90fc6e8767..df6516421698 100644 --- a/ydb/library/actors/core/harmonizer.cpp +++ b/ydb/library/actors/core/harmonizer.cpp @@ -21,15 +21,6 @@ namespace NActors { -namespace { - constexpr bool IsDebug = false; - constexpr bool IsDebugByThread = false; - -#define DEBUG_PRINT(x) do { if constexpr (IsDebug) { (Cerr << x); } } while (0) -#define DEBUG_PRINT_BY_THREAD(x) do { if constexpr (IsDebug && IsDebugByThread) { (Cerr << x); } } while (0) - -} - LWTRACE_USING(ACTORLIB_PROVIDER); constexpr bool CheckBinaryPower(ui64 value) { @@ -173,8 +164,8 @@ struct TValueHistory { }; struct TThreadInfo { - TValueHistory<8> Elapsed; - TValueHistory<8> Cpu; + TValueHistory<8> Consumed; + TValueHistory<8> Booked; }; struct TPoolInfo { @@ -209,26 +200,26 @@ struct TPoolInfo { TAtomic DecreasingThreadsByExchange = 0; TAtomic PotentialMaxThreadCount = 0; - TValueHistory<16> Elapsed; - TValueHistory<16> Cpu; + TValueHistory<16> Consumed; + TValueHistory<16> Booked; - std::atomic MaxElapsed = 0; - std::atomic MinElapsed = 0; - std::atomic AvgElapsed = 0; - std::atomic MaxCpu = 0; - std::atomic MinCpu = 0; + std::atomic MaxConsumedCpu = 0; + std::atomic MinConsumedCpu = 0; + std::atomic AvgConsumedCpu = 0; + std::atomic MaxBookedCpu = 0; + std::atomic MinBookedCpu = 0; std::unique_ptr> WaitingStats; std::unique_ptr> MovingWaitingStats; - double GetElapsed(i16 threadIdx); - double GetSharedElapsed(i16 threadIdx); - double GetLastSecondElapsed(i16 threadIdx); - double GetLastSecondSharedElapsed(i16 threadIdx); - double GetCpu(i16 threadIdx); - double GetSharedCpu(i16 threadIdx); - double GetLastSecondCpu(i16 threadIdx); - double GetLastSecondSharedCpu(i16 threadIdx); + double GetBooked(i16 threadIdx); + double GetSharedBooked(i16 threadIdx); + double GetLastSecondBooked(i16 threadIdx); + double GetLastSecondSharedBooked(i16 threadIdx); + double GetConsumed(i16 threadIdx); + double GetSharedConsumed(i16 threadIdx); + double GetLastSecondConsumed(i16 threadIdx); + double GetLastSecondSharedConsumed(i16 threadIdx); TCpuConsumption PullStats(ui64 ts); i16 GetFullThreadCount(); float GetThreadCount(); @@ -236,76 +227,73 @@ struct TPoolInfo { bool IsAvgPingGood(); }; -double TPoolInfo::GetCpu(i16 threadIdx) { +double TPoolInfo::GetBooked(i16 threadIdx) { if ((size_t)threadIdx < ThreadInfo.size()) { - return ThreadInfo[threadIdx].Cpu.GetAvgPart(); + return ThreadInfo[threadIdx].Booked.GetAvgPart(); } return 0.0; } -double TPoolInfo::GetSharedCpu(i16 threadIdx) { +double TPoolInfo::GetSharedBooked(i16 threadIdx) { if ((size_t)threadIdx < SharedInfo.size()) { - return SharedInfo[threadIdx].Cpu.GetAvgPart(); + return SharedInfo[threadIdx].Booked.GetAvgPart(); } return 0.0; } -double TPoolInfo::GetLastSecondCpu(i16 threadIdx) { +double TPoolInfo::GetLastSecondBooked(i16 threadIdx) { if ((size_t)threadIdx < ThreadInfo.size()) { - return ThreadInfo[threadIdx].Cpu.GetAvgPartForLastSeconds(1); + return ThreadInfo[threadIdx].Booked.GetAvgPartForLastSeconds(1); } return 0.0; } -double TPoolInfo::GetLastSecondSharedCpu(i16 threadIdx) { +double TPoolInfo::GetLastSecondSharedBooked(i16 threadIdx) { if ((size_t)threadIdx < SharedInfo.size()) { - return SharedInfo[threadIdx].Cpu.GetAvgPartForLastSeconds(1); + return SharedInfo[threadIdx].Booked.GetAvgPartForLastSeconds(1); } return 0.0; } -double TPoolInfo::GetElapsed(i16 threadIdx) { +double TPoolInfo::GetConsumed(i16 threadIdx) { if ((size_t)threadIdx < ThreadInfo.size()) { - return ThreadInfo[threadIdx].Elapsed.GetAvgPart(); + return ThreadInfo[threadIdx].Consumed.GetAvgPart(); } return 0.0; } -double TPoolInfo::GetSharedElapsed(i16 threadIdx) { +double TPoolInfo::GetSharedConsumed(i16 threadIdx) { if ((size_t)threadIdx < SharedInfo.size()) { - return SharedInfo[threadIdx].Elapsed.GetAvgPart(); + return SharedInfo[threadIdx].Consumed.GetAvgPart(); } return 0.0; } -double TPoolInfo::GetLastSecondElapsed(i16 threadIdx) { +double TPoolInfo::GetLastSecondConsumed(i16 threadIdx) { if ((size_t)threadIdx < ThreadInfo.size()) { - return ThreadInfo[threadIdx].Elapsed.GetAvgPartForLastSeconds(1); + return ThreadInfo[threadIdx].Consumed.GetAvgPartForLastSeconds(1); } return 0.0; } -double TPoolInfo::GetLastSecondSharedElapsed(i16 threadIdx) { +double TPoolInfo::GetLastSecondSharedConsumed(i16 threadIdx) { if ((size_t)threadIdx < SharedInfo.size()) { - return SharedInfo[threadIdx].Elapsed.GetAvgPartForLastSeconds(1); + return SharedInfo[threadIdx].Consumed.GetAvgPartForLastSeconds(1); } return 0.0; } #define UNROLL_HISTORY(history) (history)[0], (history)[1], (history)[2], (history)[3], (history)[4], (history)[5], (history)[6], (history)[7] TCpuConsumption TPoolInfo::PullStats(ui64 ts) { - DEBUG_PRINT("TPoolInfo::PullStats " << ts << " PoolId: " << Pool->PoolId << " Name: " << Pool->GetName() << "\n"); TCpuConsumption acc; for (i16 threadIdx = 0; threadIdx < MaxFullThreadCount; ++threadIdx) { - DEBUG_PRINT_BY_THREAD(" threadIdx: " << threadIdx << "\n"); TThreadInfo &threadInfo = ThreadInfo[threadIdx]; TCpuConsumption cpuConsumption = Pool->GetThreadCpuConsumption(threadIdx); - DEBUG_PRINT_BY_THREAD(" CpuUs: " << cpuConsumption.CpuUs << " ElapsedUs: " << cpuConsumption.ElapsedUs << " NotEnoughCpuExecutions: " << cpuConsumption.NotEnoughCpuExecutions << "\n"); acc.Add(cpuConsumption); - threadInfo.Elapsed.Register(ts, cpuConsumption.ElapsedUs); - LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "consumed", UNROLL_HISTORY(threadInfo.Elapsed.History)); - threadInfo.Cpu.Register(ts, cpuConsumption.CpuUs); - LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "booked", UNROLL_HISTORY(threadInfo.Cpu.History)); + threadInfo.Consumed.Register(ts, cpuConsumption.ConsumedUs); + LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "consumed", UNROLL_HISTORY(threadInfo.Consumed.History)); + threadInfo.Booked.Register(ts, cpuConsumption.BookedUs); + LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "booked", UNROLL_HISTORY(threadInfo.Booked.History)); } TVector sharedStats; if (Shared) { @@ -320,21 +308,19 @@ TCpuConsumption TPoolInfo::PullStats(ui64 ts) { stat.NotEnoughCpuExecutions }; acc.Add(sharedConsumption); - SharedInfo[sharedIdx].Elapsed.Register(ts, sharedConsumption.ElapsedUs); - LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "shared_consumed", UNROLL_HISTORY(SharedInfo[sharedIdx].Elapsed.History)); - SharedInfo[sharedIdx].Cpu.Register(ts, sharedConsumption.CpuUs); - LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "shared_booked", UNROLL_HISTORY(SharedInfo[sharedIdx].Cpu.History)); - } - - DEBUG_PRINT(" ElapsedUs: " << acc.ElapsedUs << " CpuUs: " << acc.CpuUs << "\n"); - - Elapsed.Register(ts, acc.ElapsedUs); - MaxElapsed.store(Elapsed.GetMax() / 1'000'000, std::memory_order_relaxed); - MinElapsed.store(Elapsed.GetMin() / 1'000'000, std::memory_order_relaxed); - AvgElapsed.store(Elapsed.GetAvgPart() / 1'000'000, std::memory_order_relaxed); - Cpu.Register(ts, acc.CpuUs); - MaxCpu.store(Cpu.GetMax() / 1'000'000, std::memory_order_relaxed); - MinCpu.store(Cpu.GetMin() / 1'000'000, std::memory_order_relaxed); + SharedInfo[sharedIdx].Consumed.Register(ts, sharedConsumption.ConsumedUs); + LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "shared_consumed", UNROLL_HISTORY(SharedInfo[sharedIdx].Consumed.History)); + SharedInfo[sharedIdx].Booked.Register(ts, sharedConsumption.BookedUs); + LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "shared_booked", UNROLL_HISTORY(SharedInfo[sharedIdx].Booked.History)); + } + + Consumed.Register(ts, acc.ConsumedUs); + MaxConsumedCpu.store(Consumed.GetMax() / 1'000'000, std::memory_order_relaxed); + MinConsumedCpu.store(Consumed.GetMin() / 1'000'000, std::memory_order_relaxed); + AvgConsumedCpu.store(Consumed.GetAvgPart() / 1'000'000, std::memory_order_relaxed); + Booked.Register(ts, acc.BookedUs); + MaxBookedCpu.store(Booked.GetMax() / 1'000'000, std::memory_order_relaxed); + MinBookedCpu.store(Booked.GetMin() / 1'000'000, std::memory_order_relaxed); NewNotEnoughCpuExecutions = acc.NotEnoughCpuExecutions - NotEnoughCpuExecutions; NotEnoughCpuExecutions = acc.NotEnoughCpuExecutions; if (WaitingStats && BasicPool) { @@ -379,13 +365,13 @@ class THarmonizer: public IHarmonizer { std::vector> Pools; std::vector PriorityOrder; - TValueHistory<16> Elapsed; - TValueHistory<16> Cpu; + TValueHistory<16> Consumed; + TValueHistory<16> Booked; - TAtomic MaxElapsedCpu = 0; - TAtomic MinElapsedCpu = 0; - TAtomic MaxCpu = 0; - TAtomic MinCpu = 0; + TAtomic MaxConsumedCpu = 0; + TAtomic MinConsumedCpu = 0; + TAtomic MaxBookedCpu = 0; + TAtomic MinBookedCpu = 0; TSharedExecutorPool* Shared = nullptr; @@ -420,35 +406,32 @@ double THarmonizer::Rescale(double value) const { } void THarmonizer::PullStats(ui64 ts) { - DEBUG_PRINT("THarmonizer::PullStats " << ts << "\n"); TCpuConsumption acc; for (auto &pool : Pools) { TCpuConsumption consumption = pool->PullStats(ts); acc.Add(consumption); } - Elapsed.Register(ts, acc.ElapsedUs); - RelaxedStore(&MaxElapsedCpu, Elapsed.GetMaxInt()); - RelaxedStore(&MinElapsedCpu, Elapsed.GetMinInt()); - Cpu.Register(ts, acc.CpuUs); - RelaxedStore(&MaxCpu, Cpu.GetMaxInt()); - RelaxedStore(&MinCpu, Cpu.GetMinInt()); + Consumed.Register(ts, acc.ConsumedUs); + RelaxedStore(&MaxConsumedCpu, Consumed.GetMaxInt()); + RelaxedStore(&MinConsumedCpu, Consumed.GetMinInt()); + Booked.Register(ts, acc.BookedUs); + RelaxedStore(&MaxBookedCpu, Booked.GetMaxInt()); + RelaxedStore(&MinBookedCpu, Booked.GetMinInt()); } -Y_FORCE_INLINE bool IsStarved(double elapsed, double cpu) { - return std::max(elapsed, cpu) > 0.5 && elapsed > cpu + std::min(0.5, elapsed * 0.3); +Y_FORCE_INLINE bool IsStarved(double consumed, double booked) { + return Max(consumed, booked) > 0.1 && consumed < booked * 0.7; } -Y_FORCE_INLINE bool IsHoggish(double cpu, double currentThreadCount) { - return cpu < currentThreadCount - 0.5; +Y_FORCE_INLINE bool IsHoggish(double booked, double currentThreadCount) { + return booked < currentThreadCount - 0.5; } void THarmonizer::HarmonizeImpl(ui64 ts) { - DEBUG_PRINT("THarmonizer::HarmonizeImpl " << ts << "\n"); bool isStarvedPresent = false; - double cpu = 0.0; - double elapsed = 0.0; - double lastSecondCpu = 0.0; - double lastSecondElapsed = 0.0; + double booked = 0.0; + double consumed = 0.0; + double lastSecondBooked = 0.0; i64 beingStopped = 0; double total = 0; TStackVec needyPools; @@ -532,63 +515,49 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { } for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) { - DEBUG_PRINT(" poolIdx: " << poolIdx << " PoolId: " << Pools[poolIdx]->Pool->PoolId << " Name: " << Pools[poolIdx]->Pool->GetName() << "\n"); TPoolInfo& pool = *Pools[poolIdx]; total += pool.DefaultThreadCount; i16 currentFullThreadCount = pool.GetFullThreadCount(); - DEBUG_PRINT(" currentFullThreadCount: " << currentFullThreadCount << "\n"); sumOfAdditionalThreads += Max(0, currentFullThreadCount - pool.DefaultFullThreadCount); - DEBUG_PRINT(" sumOfAdditionalThreads: " << sumOfAdditionalThreads << "\n"); float currentThreadCount = pool.GetThreadCount(); - DEBUG_PRINT(" currentThreadCount: " << currentThreadCount << "\n"); - double poolCpu = 0.0; - double poolElapsed = 0.0; - double lastSecondPoolCpu = 0.0; - double lastSecondPoolElapsed = 0.0; + + double poolBooked = 0.0; + double poolConsumed = 0.0; + double lastSecondPoolBooked = 0.0; + double lastSecondPoolConsumed = 0.0; beingStopped += pool.Pool->GetBlockingThreadCount(); - DEBUG_PRINT(" beingStopped: " << beingStopped << "\n"); + for (i16 threadIdx = 0; threadIdx < pool.MaxThreadCount; ++threadIdx) { - DEBUG_PRINT_BY_THREAD(" threadIdx: " << threadIdx << "\n"); - double threadElapsed = Rescale(pool.GetElapsed(threadIdx)); - double threadLastSecondElapsed = Rescale(pool.GetLastSecondElapsed(threadIdx)); - double threadCpu = Rescale(pool.GetCpu(threadIdx)); - double threadLastSecondCpu = Rescale(pool.GetLastSecondCpu(threadIdx)); - DEBUG_PRINT_BY_THREAD(" threadCpu: " << threadCpu << "\n"); - DEBUG_PRINT_BY_THREAD(" threadLastSecondCpu: " << threadLastSecondCpu << "\n"); - DEBUG_PRINT_BY_THREAD(" threadElapsed: " << threadElapsed << "\n"); - DEBUG_PRINT_BY_THREAD(" threadLastSecondElapsed: " << threadLastSecondElapsed << "\n"); - poolElapsed += threadElapsed; - lastSecondPoolElapsed += threadLastSecondElapsed; - poolCpu += threadCpu; - lastSecondPoolCpu += threadLastSecondCpu; - LWPROBE(HarmonizeCheckPoolByThread, poolIdx, pool.Pool->GetName(), threadIdx, threadCpu, threadElapsed, threadLastSecondCpu, threadLastSecondElapsed); + double threadBooked = Rescale(pool.GetBooked(threadIdx)); + double threadLastSecondBooked = Rescale(pool.GetLastSecondBooked(threadIdx)); + double threadConsumed = Rescale(pool.GetConsumed(threadIdx)); + double threadLastSecondConsumed = Rescale(pool.GetLastSecondConsumed(threadIdx)); + poolBooked += threadBooked; + lastSecondPoolBooked += threadLastSecondBooked; + poolConsumed += threadConsumed; + lastSecondPoolConsumed += threadLastSecondConsumed; + LWPROBE(HarmonizeCheckPoolByThread, poolIdx, pool.Pool->GetName(), threadIdx, threadBooked, threadConsumed, threadLastSecondBooked, threadLastSecondConsumed); } for (ui32 sharedIdx = 0; sharedIdx < pool.SharedInfo.size(); ++sharedIdx) { - double sharedElapsed = Rescale(pool.GetSharedElapsed(sharedIdx)); - double sharedLastSecondElapsed = Rescale(pool.GetLastSecondSharedElapsed(sharedIdx)); - double sharedCpu = Rescale(pool.GetSharedCpu(sharedIdx)); - double sharedLastSecondCpu = Rescale(pool.GetLastSecondSharedCpu(sharedIdx)); - poolElapsed += sharedElapsed; - lastSecondPoolElapsed += sharedLastSecondElapsed; - poolCpu += sharedCpu; - lastSecondPoolCpu += sharedLastSecondCpu; - LWPROBE(HarmonizeCheckPoolByThread, poolIdx, pool.Pool->GetName(), -1 - sharedIdx, sharedCpu, sharedElapsed, sharedLastSecondCpu, sharedLastSecondElapsed); + double sharedBooked = Rescale(pool.GetSharedBooked(sharedIdx)); + double sharedLastSecondBooked = Rescale(pool.GetLastSecondSharedBooked(sharedIdx)); + double sharedConsumed = Rescale(pool.GetSharedConsumed(sharedIdx)); + double sharedLastSecondConsumed = Rescale(pool.GetLastSecondSharedConsumed(sharedIdx)); + poolBooked += sharedBooked; + lastSecondPoolBooked += sharedLastSecondBooked; + poolConsumed += sharedConsumed; + lastSecondPoolConsumed += sharedLastSecondConsumed; + LWPROBE(HarmonizeCheckPoolByThread, poolIdx, pool.Pool->GetName(), -1 - sharedIdx, sharedBooked, sharedConsumed, sharedLastSecondBooked, sharedLastSecondConsumed); } - DEBUG_PRINT(" poolElapsed: " << poolElapsed << "\n"); - DEBUG_PRINT(" lastSecondPoolElapsed: " << lastSecondPoolElapsed << "\n"); - DEBUG_PRINT(" poolCpu: " << poolCpu << "\n"); - DEBUG_PRINT(" lastSecondPoolCpu: " << lastSecondPoolCpu << "\n"); - - bool isStarved = IsStarved(poolElapsed, poolCpu) || IsStarved(lastSecondPoolElapsed, lastSecondPoolCpu); + bool isStarved = IsStarved(poolConsumed, poolBooked) || IsStarved(lastSecondPoolConsumed, lastSecondPoolBooked); if (isStarved) { isStarvedPresent = true; } - DEBUG_PRINT(" isStarved: " << isStarved << "\n"); - bool isNeedy = (pool.IsAvgPingGood() || pool.NewNotEnoughCpuExecutions) && (poolCpu >= currentThreadCount); + bool isNeedy = (pool.IsAvgPingGood() || pool.NewNotEnoughCpuExecutions) && (poolBooked >= currentThreadCount); if (pool.AvgPingCounter) { if (pool.LastUpdateTs + Us2Ts(3'000'000ull) > ts) { isNeedy = false; @@ -596,9 +565,7 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { pool.LastUpdateTs = ts; } } - DEBUG_PRINT(" isNeedy: " << isNeedy << "\n"); - - if (currentThreadCount - poolCpu > 0.5) { + if (currentThreadCount - poolBooked > 0.5) { if (hasBorrowedSharedThread[poolIdx] || hasSharedThreadWhichWasNotBorrowed[poolIdx]) { freeHalfThread.push_back(poolIdx); } @@ -607,32 +574,23 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { if (isNeedy) { needyPools.push_back(poolIdx); } - - bool isHoggish = IsHoggish(poolElapsed, currentThreadCount) - || IsHoggish(lastSecondPoolElapsed, currentThreadCount); + bool isHoggish = IsHoggish(poolBooked, currentThreadCount) + || IsHoggish(lastSecondPoolBooked, currentThreadCount); if (isHoggish) { - hoggishPools.push_back({poolIdx, std::max(currentThreadCount - poolElapsed, currentThreadCount - lastSecondPoolElapsed)}); + hoggishPools.push_back({poolIdx, std::max(poolBooked - currentThreadCount, lastSecondPoolBooked - currentThreadCount)}); } - DEBUG_PRINT(" isHoggish: " << isHoggish << "\n"); - - cpu += poolCpu; - elapsed += poolElapsed; - lastSecondCpu += lastSecondPoolCpu; - lastSecondElapsed += lastSecondPoolElapsed; + booked += poolBooked; + consumed += poolConsumed; AtomicSet(pool.LastFlags, (i64)isNeedy | ((i64)isStarved << 1) | ((i64)isHoggish << 2)); - LWPROBE(HarmonizeCheckPool, poolIdx, pool.Pool->GetName(), poolCpu, poolElapsed, lastSecondPoolCpu, lastSecondPoolElapsed, currentThreadCount, pool.MaxFullThreadCount, isStarved, isNeedy, isHoggish); + LWPROBE(HarmonizeCheckPool, poolIdx, pool.Pool->GetName(), poolBooked, poolConsumed, lastSecondPoolBooked, lastSecondPoolConsumed, currentThreadCount, pool.MaxFullThreadCount, isStarved, isNeedy, isHoggish); } - Y_UNUSED(lastSecondElapsed); - DEBUG_PRINT(" total: " << total << " cpu: " << cpu << " lastSecondCpu: " << lastSecondCpu << "\n"); - double budget = total - Max(cpu, lastSecondCpu); - DEBUG_PRINT(" budget: " << budget << "\n"); + double budget = total - Max(booked, lastSecondBooked); i16 budgetInt = static_cast(Max(budget, 0.0)); - DEBUG_PRINT(" budgetInt: " << budgetInt << "\n"); if (budget < -0.1) { isStarvedPresent = true; } - double overbooked = elapsed - cpu; + double overbooked = consumed - booked; if (overbooked < 0) { isStarvedPresent = false; } @@ -659,7 +617,6 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { // last_starved_at_consumed_value = сумма по всем пулам consumed; // TODO(cthulhu): использовать как лимит планвно устремлять этот лимит к total, // использовать вместо total - DEBUG_PRINT(" isStarvedPresent\n"); if (beingStopped && beingStopped >= overbooked) { // do nothing } else { @@ -668,7 +625,6 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { i64 threadCount = pool.GetFullThreadCount(); if (hasSharedThread[poolIdx] && !hasSharedThreadWhichWasNotBorrowed[poolIdx]) { Shared->ReturnOwnHalfThread(poolIdx); - overbooked -= 0.5; } while (threadCount > pool.DefaultFullThreadCount) { pool.SetFullThreadCount(--threadCount); @@ -690,7 +646,6 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { for (size_t needyPoolIdx : needyPools) { TPoolInfo &pool = *Pools[needyPoolIdx]; i64 threadCount = pool.GetFullThreadCount(); - DEBUG_PRINT(" needyPoolIdx: " << needyPoolIdx << " poolId: " << pool.Pool->PoolId << " poolName: " << pool.Pool->GetName() << " threadCount: " << threadCount << " budget: " << budget << "\n"); if (budget >= 1.0) { if (threadCount + 1 <= pool.MaxFullThreadCount) { AtomicIncrement(pool.IncreasingThreadsByNeedyState); @@ -698,7 +653,6 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { sumOfAdditionalThreads++; pool.SetFullThreadCount(threadCount + 1); budget -= 1.0; - DEBUG_PRINT(" added 1 thread budget: " << budget << "\n"); LWPROBE(HarmonizeOperation, needyPoolIdx, pool.Pool->GetName(), "increase by needs", threadCount + 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount); } } else if (Shared && budget >= 0.5 && !hasBorrowedSharedThread[needyPoolIdx] && freeHalfThread.size()) { @@ -706,7 +660,6 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { freeHalfThread.pop_back(); isNeedyByPool[needyPoolIdx] = false; budget -= 0.5; - DEBUG_PRINT(" added 0.5 thread budget: " << budget << "\n"); } if constexpr (NFeatures::IsLocalQueues()) { bool needToExpandLocalQueue = budget < 1.0 || threadCount >= pool.MaxFullThreadCount; @@ -722,33 +675,26 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { if (budget < 1.0) { size_t takingAwayThreads = 0; - DEBUG_PRINT(" takingAwayThreads: " << takingAwayThreads << " sumOfAdditionalThreads: " << sumOfAdditionalThreads << " budget: " << budget << "\n"); for (size_t needyPoolIdx : needyPools) { TPoolInfo &pool = *Pools[needyPoolIdx]; i64 threadCount = pool.GetFullThreadCount(); sumOfAdditionalThreads -= threadCount - pool.DefaultFullThreadCount; - DEBUG_PRINT(" needyPoolIdx: " << needyPoolIdx << " poolId: " << pool.Pool->PoolId << " poolName: " << pool.Pool->GetName() << " threadCount: " << threadCount << " budget: " << budget << "\n"); - DEBUG_PRINT(" sumOfAdditionalThreads: " << sumOfAdditionalThreads << " takingAwayThreads: " << takingAwayThreads << "\n"); if (sumOfAdditionalThreads < takingAwayThreads + 1) { - DEBUG_PRINT(" sumOfAdditionalThreads < takingAwayThreads + 1\n"); break; } if (!isNeedyByPool[needyPoolIdx]) { - DEBUG_PRINT(" !isNeedyByPool[needyPoolIdx]\n"); continue; } AtomicIncrement(pool.IncreasingThreadsByExchange); isNeedyByPool[needyPoolIdx] = false; takingAwayThreads++; pool.SetFullThreadCount(threadCount + 1); - DEBUG_PRINT(" added 1 thread by exchanging budget: " << budget << " takingAwayThreads: " << takingAwayThreads << " sumOfAdditionalThreads: " << sumOfAdditionalThreads << "\n"); + LWPROBE(HarmonizeOperation, needyPoolIdx, pool.Pool->GetName(), "increase by exchanging", threadCount + 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount); } for (ui16 poolIdx : PriorityOrder) { - DEBUG_PRINT(" poolIdx: " << poolIdx << " takingAwayThreads: " << takingAwayThreads << " budget: " << budget << "\n"); if (takingAwayThreads <= 0) { - DEBUG_PRINT(" takingAwayThreads <= 0\n"); break; } @@ -756,16 +702,14 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { size_t threadCount = pool.GetFullThreadCount(); size_t additionalThreadsCount = Max(0L, threadCount - pool.DefaultFullThreadCount); size_t currentTakingAwayThreads = Min(additionalThreadsCount, takingAwayThreads); - DEBUG_PRINT(" poolIdx: " << poolIdx << " poolId: " << pool.Pool->PoolId << " poolName: " << pool.Pool->GetName() << " threadCount: " << threadCount << " additionalThreadsCount: " << additionalThreadsCount << " currentTakingAwayThreads: " << currentTakingAwayThreads << "\n"); if (!currentTakingAwayThreads) { - DEBUG_PRINT(" !currentTakingAwayThreads\n"); continue; } takingAwayThreads -= currentTakingAwayThreads; pool.SetFullThreadCount(threadCount - currentTakingAwayThreads); - DEBUG_PRINT(" takingAwayThreads by exchanging: " << currentTakingAwayThreads << " takingAwayThreads: " << takingAwayThreads << "\n"); - AtomicAdd(pool.DecreasingThreadsByExchange, currentTakingAwayThreads); + + AtomicAdd(pool.DecreasingThreadsByExchange, takingAwayThreads); LWPROBE(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease by exchanging", threadCount - currentTakingAwayThreads, pool.DefaultFullThreadCount, pool.MaxFullThreadCount); } } @@ -773,11 +717,9 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { for (auto &[hoggishPoolIdx, freeCpu] : hoggishPools) { TPoolInfo &pool = *Pools[hoggishPoolIdx]; i64 threadCount = pool.GetFullThreadCount(); - DEBUG_PRINT(" hoggishPoolIdx: " << hoggishPoolIdx << " poolId: " << pool.Pool->PoolId << " poolName: " << pool.Pool->GetName() << " threadCount: " << threadCount << " freeCpu: " << freeCpu << "\n"); if (hasBorrowedSharedThread[hoggishPoolIdx]) { Shared->ReturnBorrowedHalfThread(hoggishPoolIdx); freeCpu -= 0.5; - DEBUG_PRINT(" return borrowed half thread freeCpu: " << freeCpu << "\n"); continue; } if (pool.BasicPool && pool.LocalQueueSize > NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE) { @@ -788,14 +730,12 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { AtomicIncrement(pool.DecreasingThreadsByHoggishState); LWPROBE(HarmonizeOperation, hoggishPoolIdx, pool.Pool->GetName(), "decrease by hoggish", threadCount - 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount); pool.SetFullThreadCount(threadCount - 1); - DEBUG_PRINT(" decrease by hoggish threadCount: " << threadCount - 1 << "\n"); } } for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) { TPoolInfo& pool = *Pools[poolIdx]; AtomicSet(pool.PotentialMaxThreadCount, std::min(pool.MaxThreadCount, pool.GetThreadCount() + budgetInt)); - DEBUG_PRINT(" poolIdx: " << poolIdx << " PotentialMaxThreadCount: " << pool.PotentialMaxThreadCount << " MaxThreadCount: " << pool.MaxThreadCount << " GetThreadCount: " << pool.GetThreadCount() << " budgetInt: " << budgetInt << "\n"); } } @@ -811,18 +751,13 @@ void THarmonizer::CalculatePriorityOrder() { } void THarmonizer::Harmonize(ui64 ts) { - bool isDisabled = IsDisabled.load(std::memory_order_acquire); - ui64 nextHarmonizeTs = NextHarmonizeTs.load(std::memory_order_acquire); - if (isDisabled || nextHarmonizeTs > ts || !Lock.TryAcquire()) { - DEBUG_PRINT("TryToHarmonizeFailed: " << ts << " " << nextHarmonizeTs << " " << isDisabled << " " << false << "\n"); + if (IsDisabled || NextHarmonizeTs > ts || !Lock.TryAcquire()) { LWPROBE(TryToHarmonizeFailed, ts, NextHarmonizeTs, IsDisabled, false); return; } // Check again under the lock - isDisabled = IsDisabled.load(std::memory_order_acquire); - if (isDisabled) { - DEBUG_PRINT("TryToHarmonizeFailed: " << ts << " " << nextHarmonizeTs << " " << isDisabled << " " << true << "\n"); - LWPROBE(TryToHarmonizeFailed, ts, nextHarmonizeTs, isDisabled, true); + if (IsDisabled) { + LWPROBE(TryToHarmonizeFailed, ts, NextHarmonizeTs, IsDisabled, true); Lock.Release(); return; } @@ -834,7 +769,6 @@ void THarmonizer::Harmonize(ui64 ts) { TInternalActorTypeGuard activityGuard; if (PriorityOrder.empty()) { - DEBUG_PRINT("CalculatePriorityOrder\n"); CalculatePriorityOrder(); } @@ -863,7 +797,6 @@ void THarmonizer::AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) { poolInfo.DefaultFullThreadCount = pool->GetDefaultFullThreadCount(); poolInfo.MinFullThreadCount = pool->GetMinFullThreadCount(); poolInfo.MaxFullThreadCount = pool->GetMaxFullThreadCount(); - poolInfo.PotentialMaxThreadCount = poolInfo.MaxFullThreadCount; poolInfo.ThreadInfo.resize(poolInfo.MaxFullThreadCount); poolInfo.SharedInfo.resize(Shared ? Shared->GetSharedThreadCount() : 0); poolInfo.Priority = pool->GetPriority(); @@ -898,11 +831,11 @@ TPoolHarmonizerStats THarmonizer::GetPoolStats(i16 poolId) const { .DecreasingThreadsByStarvedState = static_cast(RelaxedLoad(&pool.DecreasingThreadsByStarvedState)), .DecreasingThreadsByHoggishState = static_cast(RelaxedLoad(&pool.DecreasingThreadsByHoggishState)), .DecreasingThreadsByExchange = static_cast(RelaxedLoad(&pool.DecreasingThreadsByExchange)), - .MaxElapsedCpu = pool.MaxElapsed.load(std::memory_order_relaxed), - .MinElapsedCpu = pool.MinElapsed.load(std::memory_order_relaxed), - .AvgElapsedCpu = pool.AvgElapsed.load(std::memory_order_relaxed), - .MaxCpu = pool.MaxCpu.load(std::memory_order_relaxed), - .MinCpu = pool.MinCpu.load(std::memory_order_relaxed), + .MaxConsumedCpu = pool.MaxConsumedCpu.load(std::memory_order_relaxed), + .MinConsumedCpu = pool.MinConsumedCpu.load(std::memory_order_relaxed), + .AvgConsumedCpu = pool.AvgConsumedCpu.load(std::memory_order_relaxed), + .MaxBookedCpu = pool.MaxBookedCpu.load(std::memory_order_relaxed), + .MinBookedCpu = pool.MinBookedCpu.load(std::memory_order_relaxed), .PotentialMaxThreadCount = static_cast(RelaxedLoad(&pool.PotentialMaxThreadCount)), .IsNeedy = static_cast(flags & 1), .IsStarved = static_cast(flags & 2), @@ -912,10 +845,10 @@ TPoolHarmonizerStats THarmonizer::GetPoolStats(i16 poolId) const { THarmonizerStats THarmonizer::GetStats() const { return THarmonizerStats{ - .MaxElapsedCpu = static_cast(RelaxedLoad(&MaxElapsedCpu)), - .MinElapsedCpu = static_cast(RelaxedLoad(&MinElapsedCpu)), - .MaxCpu = static_cast(RelaxedLoad(&MaxCpu)), - .MinCpu = static_cast(RelaxedLoad(&MinCpu)), + .MaxConsumedCpu = static_cast(RelaxedLoad(&MaxConsumedCpu)), + .MinConsumedCpu = static_cast(RelaxedLoad(&MinConsumedCpu)), + .MaxBookedCpu = static_cast(RelaxedLoad(&MaxBookedCpu)), + .MinBookedCpu = static_cast(RelaxedLoad(&MinBookedCpu)), .AvgAwakeningTimeUs = AvgAwakeningTimeUs, .AvgWakingUpTimeUs = AvgWakingUpTimeUs, }; @@ -925,32 +858,4 @@ void THarmonizer::SetSharedPool(TSharedExecutorPool* pool) { Shared = pool; } -TString TPoolHarmonizerStats::ToString() const { - return TStringBuilder() - << "IncreasingThreadsByNeedyState: " << IncreasingThreadsByNeedyState << "\n" - << "IncreasingThreadsByExchange: " << IncreasingThreadsByExchange << "\n" - << "DecreasingThreadsByStarvedState: " << DecreasingThreadsByStarvedState << "\n" - << "DecreasingThreadsByHoggishState: " << DecreasingThreadsByHoggishState << "\n" - << "DecreasingThreadsByExchange: " << DecreasingThreadsByExchange << "\n" - << "MaxElapsedCpu: " << MaxElapsedCpu << "\n" - << "MinElapsedCpu: " << MinElapsedCpu << "\n" - << "AvgElapsedCpu: " << AvgElapsedCpu << "\n" - << "MaxCpu: " << MaxCpu << "\n" - << "MinCpu: " << MinCpu << "\n" - << "PotentialMaxThreadCount: " << PotentialMaxThreadCount << "\n" - << "IsNeedy: " << IsNeedy << "\n" - << "IsStarved: " << IsStarved << "\n" - << "IsHoggish: " << IsHoggish << "\n"; -} - -TString THarmonizerStats::ToString() const { - return TStringBuilder() - << "MaxElapsedCpu: " << MaxElapsedCpu << "\n" - << "MinElapsedCpu: " << MinElapsedCpu << "\n" - << "MaxCpu: " << MaxCpu << "\n" - << "MinCpu: " << MinCpu << "\n" - << "AvgAwakeningTimeUs: " << AvgAwakeningTimeUs << "\n" - << "AvgWakingUpTimeUs: " << AvgWakingUpTimeUs << "\n"; } - -} // namespace NActors \ No newline at end of file diff --git a/ydb/library/actors/core/harmonizer.h b/ydb/library/actors/core/harmonizer.h index 37daf851d7e2..b4323ef8de13 100644 --- a/ydb/library/actors/core/harmonizer.h +++ b/ydb/library/actors/core/harmonizer.h @@ -17,29 +17,25 @@ namespace NActors { ui64 DecreasingThreadsByStarvedState = 0; ui64 DecreasingThreadsByHoggishState = 0; ui64 DecreasingThreadsByExchange = 0; - float MaxElapsedCpu = 0.0; - float MinElapsedCpu = 0.0; - float AvgElapsedCpu = 0.0; - float MaxCpu = 0.0; - float MinCpu = 0.0; + float MaxConsumedCpu = 0.0; + float MinConsumedCpu = 0.0; + float AvgConsumedCpu = 0.0; + float MaxBookedCpu = 0.0; + float MinBookedCpu = 0.0; i16 PotentialMaxThreadCount = 0; bool IsNeedy = false; bool IsStarved = false; bool IsHoggish = false; - - TString ToString() const; }; struct THarmonizerStats { - i64 MaxElapsedCpu = 0.0; - i64 MinElapsedCpu = 0.0; - i64 MaxCpu = 0.0; - i64 MinCpu = 0.0; + i64 MaxConsumedCpu = 0.0; + i64 MinConsumedCpu = 0.0; + i64 MaxBookedCpu = 0.0; + i64 MinBookedCpu = 0.0; double AvgAwakeningTimeUs = 0; double AvgWakingUpTimeUs = 0; - - TString ToString() const; }; // Pool cpu harmonizer diff --git a/ydb/library/actors/core/harmonizer_ut.cpp b/ydb/library/actors/core/harmonizer_ut.cpp deleted file mode 100644 index 4149c1584afb..000000000000 --- a/ydb/library/actors/core/harmonizer_ut.cpp +++ /dev/null @@ -1,360 +0,0 @@ -#include "harmonizer.h" -#include -#include - -using namespace NActors; - - -/* - Сценарии тестов без полупотоков: - - IncreaseThreadsByNeedyState/DecreaseThreadsByHoggishState - - DecreaseThreadsByStarvedState - - IncreaseThreadsByExchange/DecreaseThreadsByExchange -*/ - -#define CHECK_CHANGING_THREADS(stats, inc_needy, inc_exchange, dec_hoggish, dec_starved, dec_exchange) \ - UNIT_ASSERT_VALUES_EQUAL_C((stats).IncreasingThreadsByNeedyState, inc_needy, (stats).ToString()); \ - UNIT_ASSERT_VALUES_EQUAL_C((stats).IncreasingThreadsByExchange, inc_exchange, (stats).ToString()); \ - UNIT_ASSERT_VALUES_EQUAL_C((stats).DecreasingThreadsByHoggishState, dec_hoggish, (stats).ToString()); \ - UNIT_ASSERT_VALUES_EQUAL_C((stats).DecreasingThreadsByStarvedState, dec_starved, (stats).ToString()); \ - UNIT_ASSERT_VALUES_EQUAL_C((stats).DecreasingThreadsByExchange, dec_exchange, (stats).ToString()); -// end CHECK_CHANGING_THREADS - -#define CHECK_STATE(stats, state) \ - UNIT_ASSERT_VALUES_EQUAL_C((stats).IsNeedy, TString(state) == "needy", (stats).ToString()); \ - UNIT_ASSERT_VALUES_EQUAL_C((stats).IsHoggish, TString(state) == "hoggish", (stats).ToString()); \ - UNIT_ASSERT_VALUES_EQUAL_C((stats).IsStarved, TString(state) == "starved", (stats).ToString()); -// end CHECK_STATE - - -Y_UNIT_TEST_SUITE(HarmonizerTests) { - - struct TMockExecutorPoolParams { - i16 DefaultFullThreadCount = 4; - i16 MinFullThreadCount = 4; - i16 MaxFullThreadCount = 8; - float DefaultThreadCount = 4.0f; - float MinThreadCount = 4.0f; - float MaxThreadCount = 8.0f; - i16 Priority = 0; - TString Name = "MockPool"; - ui32 PoolId = 0; - }; - - struct TCpuConsumptionModel { - TCpuConsumption value; - TCpuConsumptionModel() : value() {} - TCpuConsumptionModel(const TCpuConsumption& other) : value(other) {} - operator TCpuConsumption() const { - return value; - } - void Increase(const TCpuConsumption& other) { - value.ElapsedUs += other.ElapsedUs; - value.CpuUs += other.CpuUs; - value.NotEnoughCpuExecutions += other.NotEnoughCpuExecutions; - } - }; - - class TMockExecutorPool : public IExecutorPool { - public: - TMockExecutorPool(const TMockExecutorPoolParams& params = TMockExecutorPoolParams()) - : IExecutorPool(params.PoolId) - , Params(params) - , ThreadCount(params.DefaultFullThreadCount) - , ThreadCpuConsumptions(params.MaxFullThreadCount, TCpuConsumption{0.0, 0.0}) - {} - - TMockExecutorPoolParams Params; - i16 ThreadCount = 0; - std::vector ThreadCpuConsumptions; - - i16 GetDefaultFullThreadCount() const override { return Params.DefaultFullThreadCount; } - i16 GetMinFullThreadCount() const override { return Params.MinFullThreadCount; } - i16 GetMaxFullThreadCount() const override { return Params.MaxFullThreadCount; } - void SetFullThreadCount(i16 count) override { ThreadCount = count; } - i16 GetFullThreadCount() const override { return ThreadCount; } - float GetDefaultThreadCount() const override { return Params.DefaultThreadCount; } - float GetMinThreadCount() const override { return Params.MinThreadCount; } - float GetMaxThreadCount() const override { return Params.MaxThreadCount; } - i16 GetPriority() const override { return Params.Priority; } - TString GetName() const override { return Params.Name; } - - // Дополнительные методы из IExecutorPool - void Prepare(TActorSystem* /*actorSystem*/, NSchedulerQueue::TReader** /*scheduleReaders*/, ui32* /*scheduleSz*/) override {} - void Start() override {} - void PrepareStop() override {} - void Shutdown() override {} - bool Cleanup() override { return true; } - - ui32 GetReadyActivation(TWorkerContext& /*wctx*/, ui64 /*revolvingCounter*/) override { return 0; } - void ReclaimMailbox(TMailboxType::EType /*mailboxType*/, ui32 /*hint*/, TWorkerId /*workerId*/, ui64 /*revolvingCounter*/) override {} - TMailboxHeader* ResolveMailbox(ui32 /*hint*/) override { return nullptr; } - - void Schedule(TInstant /*deadline*/, TAutoPtr /*ev*/, ISchedulerCookie* /*cookie*/, TWorkerId /*workerId*/) override {} - void Schedule(TMonotonic /*deadline*/, TAutoPtr /*ev*/, ISchedulerCookie* /*cookie*/, TWorkerId /*workerId*/) override {} - void Schedule(TDuration /*delta*/, TAutoPtr /*ev*/, ISchedulerCookie* /*cookie*/, TWorkerId /*workerId*/) override {} - - bool Send(TAutoPtr& /*ev*/) override { return true; } - bool SpecificSend(TAutoPtr& /*ev*/) override { return true; } - void ScheduleActivation(ui32 /*activation*/) override {} - void SpecificScheduleActivation(ui32 /*activation*/) override {} - void ScheduleActivationEx(ui32 /*activation*/, ui64 /*revolvingCounter*/) override {} - TActorId Register(IActor* /*actor*/, TMailboxType::EType /*mailboxType*/, ui64 /*revolvingCounter*/, const TActorId& /*parentId*/) override { return TActorId(); } - TActorId Register(IActor* /*actor*/, TMailboxHeader* /*mailbox*/, ui32 /*hint*/, const TActorId& /*parentId*/) override { return TActorId(); } - - TAffinity* Affinity() const override { return nullptr; } - - ui32 GetThreads() const override { return static_cast(ThreadCount); } - float GetThreadCount() const override { return static_cast(ThreadCount); } - - void IncreaseThreadCpuConsumption(TCpuConsumption consumption, i16 start = 0, i16 count = -1) { - if (count == -1) { - count = Params.MaxFullThreadCount - start; - } - for (i16 i = start; i < start + count; ++i) { - ThreadCpuConsumptions[i].Increase(consumption); - } - } - - void SetThreadCpuConsumption(TCpuConsumption consumption, i16 start = 0, i16 count = -1) { - if (count == -1) { - count = Params.MaxFullThreadCount - start; - } - for (i16 i = start; i < start + count; ++i) { - ThreadCpuConsumptions[i] = consumption; - } - } - - TCpuConsumption GetThreadCpuConsumption(i16 threadIdx) override { - UNIT_ASSERT_GE(threadIdx, 0); - UNIT_ASSERT_LE(static_cast(threadIdx), ThreadCpuConsumptions.size()); - return ThreadCpuConsumptions[threadIdx]; - } - }; - - Y_UNIT_TEST(TestHarmonizerCreation) { - ui64 currentTs = 1000000; - std::unique_ptr harmonizer(MakeHarmonizer(currentTs)); - UNIT_ASSERT(harmonizer != nullptr); - } - - Y_UNIT_TEST(TestAddPool) { - ui64 currentTs = 1000000; - std::unique_ptr harmonizer(MakeHarmonizer(currentTs)); - auto mockPool = std::make_unique(); - harmonizer->AddPool(mockPool.get()); - - auto stats = harmonizer->GetPoolStats(0); - UNIT_ASSERT_VALUES_EQUAL(stats.PotentialMaxThreadCount, 8); - UNIT_ASSERT_VALUES_EQUAL(stats.IncreasingThreadsByNeedyState, 0); - UNIT_ASSERT_VALUES_EQUAL(stats.DecreasingThreadsByStarvedState, 0); - } - - Y_UNIT_TEST(TestHarmonize) { - ui64 currentTs = 1000000; - auto harmonizer = MakeHarmonizer(currentTs); - auto mockPool = new TMockExecutorPool(); - harmonizer->AddPool(mockPool); - - harmonizer->Harmonize(currentTs + 1000000); // 1 second later - - auto stats = harmonizer->GetPoolStats(0); - Y_UNUSED(stats); - UNIT_ASSERT_VALUES_EQUAL(mockPool->ThreadCount, 4); // Should start with default - - delete harmonizer; - delete mockPool; - } - - Y_UNIT_TEST(TestToNeedyNextToHoggish) { - ui64 currentTs = 1000000; - auto harmonizer = MakeHarmonizer(currentTs); - TMockExecutorPoolParams params; - std::vector> mockPools; - mockPools.emplace_back(new TMockExecutorPool(params)); - params.PoolId = 1; - mockPools.emplace_back(new TMockExecutorPool(params)); - for (auto& pool : mockPools) { - harmonizer->AddPool(pool.get()); - pool->SetThreadCpuConsumption(TCpuConsumption{0.0, 0.0}, 0, params.MaxFullThreadCount); - } - - TCpuConsumptionModel cpuConsumptionModel; - - currentTs += Us2Ts(1'000'000); - harmonizer->Harmonize(currentTs); - mockPools[0]->SetThreadCpuConsumption({59'000'000.0, 59'000'000.0}, 0, params.DefaultFullThreadCount); - - currentTs += Us2Ts(59'000'000); - harmonizer->Harmonize(currentTs); - - auto stats = harmonizer->GetPoolStats(0); - - CHECK_CHANGING_THREADS(stats, 1, 0, 0, 0, 0); - CHECK_STATE(stats, "needy"); - UNIT_ASSERT_VALUES_EQUAL(mockPools[0]->ThreadCount, 5); - UNIT_ASSERT_VALUES_EQUAL(mockPools[1]->ThreadCount, 4); - - currentTs += Us2Ts(60'000'000); - mockPools[0]->SetThreadCpuConsumption({0.0, 0.0}, 0, params.DefaultFullThreadCount); - harmonizer->Harmonize(currentTs); - - stats = harmonizer->GetPoolStats(0); - - CHECK_CHANGING_THREADS(stats, 1, 0, 1, 0, 0); - CHECK_STATE(stats, "hoggish"); - UNIT_ASSERT_VALUES_EQUAL(mockPools[0]->ThreadCount, 4); - UNIT_ASSERT_VALUES_EQUAL(mockPools[1]->ThreadCount, 4); - } - - Y_UNIT_TEST(TestToNeedyNextToStarved) { - ui64 currentTs = 1000000; - auto harmonizer = MakeHarmonizer(currentTs); - TMockExecutorPoolParams params; - std::vector> mockPools; - mockPools.emplace_back(new TMockExecutorPool(params)); - params.PoolId = 1; - mockPools.emplace_back(new TMockExecutorPool(params)); - for (auto& pool : mockPools) { - harmonizer->AddPool(pool.get()); - pool->SetThreadCpuConsumption(TCpuConsumption{0.0, 0.0}, 0, params.MaxFullThreadCount); - } - - TCpuConsumptionModel cpuConsumptionModel; - - currentTs += Us2Ts(1'000'000); - harmonizer->Harmonize(currentTs); - mockPools[0]->IncreaseThreadCpuConsumption({59'000'000.0, 59'000'000.0}, 0, params.DefaultFullThreadCount); - - currentTs += Us2Ts(59'000'000); - harmonizer->Harmonize(currentTs); - - auto stats = harmonizer->GetPoolStats(0); - - CHECK_CHANGING_THREADS(stats, 1, 0, 0, 0, 0); - CHECK_STATE(stats, "needy"); - UNIT_ASSERT_VALUES_EQUAL(mockPools[0]->ThreadCount, 5); - UNIT_ASSERT_VALUES_EQUAL(mockPools[1]->ThreadCount, 4); - - currentTs += Us2Ts(60'000'000); - mockPools[0]->IncreaseThreadCpuConsumption({60'000'000.0, 43'000'000.0}, 0, 5); - mockPools[1]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 4); - harmonizer->Harmonize(currentTs); - - stats = harmonizer->GetPoolStats(0); - - CHECK_CHANGING_THREADS(stats, 1, 0, 0, 1, 0); - CHECK_STATE(stats, "starved"); - UNIT_ASSERT_VALUES_EQUAL(mockPools[0]->ThreadCount, 4); - UNIT_ASSERT_VALUES_EQUAL(mockPools[1]->ThreadCount, 4); - } - - Y_UNIT_TEST(TestExchangeThreads) { - ui64 currentTs = 1000000; - auto harmonizer = MakeHarmonizer(currentTs); - TMockExecutorPoolParams params { - .DefaultFullThreadCount = 1, - .MinFullThreadCount = 1, - .MaxFullThreadCount = 2, - .DefaultThreadCount = 1.0f, - .MinThreadCount = 1.0f, - .MaxThreadCount = 2.0f, - }; - std::vector> mockPools; - mockPools.emplace_back(new TMockExecutorPool(params)); - params.PoolId = 1; - mockPools.emplace_back(new TMockExecutorPool(params)); - params.PoolId = 2; - mockPools.emplace_back(new TMockExecutorPool(params)); - for (auto& pool : mockPools) { - harmonizer->AddPool(pool.get()); - pool->SetThreadCpuConsumption(TCpuConsumption{0.0, 0.0}, 0, params.MaxFullThreadCount); - } - - currentTs += Us2Ts(1'000'000); - harmonizer->Harmonize(currentTs); - - currentTs += Us2Ts(60'000'000); - mockPools[0]->IncreaseThreadCpuConsumption({0.0, 0.0}, 0, 1); - mockPools[1]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 1); - mockPools[2]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 1); - harmonizer->Harmonize(currentTs); - - auto stats0 = harmonizer->GetPoolStats(0); - auto stats1 = harmonizer->GetPoolStats(1); - auto stats2 = harmonizer->GetPoolStats(2); - - CHECK_CHANGING_THREADS(stats0, 0, 0, 0, 0, 0); - CHECK_CHANGING_THREADS(stats1, 1, 0, 0, 0, 0); - CHECK_CHANGING_THREADS(stats2, 0, 0, 0, 0, 0); - CHECK_STATE(stats0, "hoggish"); - CHECK_STATE(stats1, "needy"); - CHECK_STATE(stats2, "needy"); - UNIT_ASSERT_VALUES_EQUAL(mockPools[0]->ThreadCount, 1); - UNIT_ASSERT_VALUES_EQUAL(mockPools[1]->ThreadCount, 2); - UNIT_ASSERT_VALUES_EQUAL(mockPools[2]->ThreadCount, 1); - - currentTs += Us2Ts(60'000'000); - mockPools[0]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 1); - mockPools[1]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 2); - mockPools[2]->IncreaseThreadCpuConsumption({0.0, 0.0}, 0, 1); - harmonizer->Harmonize(currentTs); - - stats0 = harmonizer->GetPoolStats(0); - stats1 = harmonizer->GetPoolStats(1); - stats2 = harmonizer->GetPoolStats(2); - - CHECK_CHANGING_THREADS(stats0, 0, 1, 0, 0, 0); - CHECK_CHANGING_THREADS(stats1, 1, 0, 0, 0, 1); - CHECK_CHANGING_THREADS(stats2, 0, 0, 0, 0, 0); - CHECK_STATE(stats0, "needy"); - CHECK_STATE(stats1, "needy"); - CHECK_STATE(stats2, "hoggish"); - UNIT_ASSERT_VALUES_EQUAL(mockPools[0]->ThreadCount, 2); - UNIT_ASSERT_VALUES_EQUAL(mockPools[1]->ThreadCount, 1); - UNIT_ASSERT_VALUES_EQUAL(mockPools[2]->ThreadCount, 1); - } - - Y_UNIT_TEST(TestEnableDisable) { - ui64 currentTs = 1000000; - auto harmonizer = MakeHarmonizer(currentTs); - auto mockPool = new TMockExecutorPool(); - harmonizer->AddPool(mockPool); - - harmonizer->Enable(false); // Disable harmonizer - harmonizer->Harmonize(currentTs + 1000000); - - auto stats = harmonizer->GetPoolStats(0); - Y_UNUSED(stats); - UNIT_ASSERT_VALUES_EQUAL(mockPool->ThreadCount, 4); // Should not change when disabled - - harmonizer->Enable(true); // Enable harmonizer - harmonizer->Harmonize(currentTs + 2000000); - - stats = harmonizer->GetPoolStats(0); - // Now it might change, but we can't predict exactly how without more complex mocking - - delete harmonizer; - delete mockPool; - } - - Y_UNIT_TEST(TestDeclareEmergency) { - ui64 currentTs = 1000000; - auto harmonizer = MakeHarmonizer(currentTs); - auto mockPool = new TMockExecutorPool(); - harmonizer->AddPool(mockPool); - - ui64 emergencyTs = currentTs + 500000; - harmonizer->DeclareEmergency(emergencyTs); - harmonizer->Harmonize(emergencyTs); - - // We can't easily test the internal state, but we can verify that Harmonize was called - // by checking if any stats have changed - auto stats = harmonizer->GetPoolStats(0); - Y_UNUSED(stats); - // Add appropriate assertions based on expected behavior during emergency - - delete harmonizer; - delete mockPool; - } -} diff --git a/ydb/library/actors/core/mon_stats.h b/ydb/library/actors/core/mon_stats.h index f058df6e77ad..4bceffa8f373 100644 --- a/ydb/library/actors/core/mon_stats.h +++ b/ydb/library/actors/core/mon_stats.h @@ -63,10 +63,10 @@ namespace NActors { ui64 DecreasingThreadsByStarvedState = 0; ui64 DecreasingThreadsByHoggishState = 0; ui64 DecreasingThreadsByExchange = 0; - i64 MaxElapsedCpuUs = 0; - i64 MinElapsedCpuUs = 0; - i64 MaxCpuUs = 0; - i64 MinCpuUs = 0; + i64 MaxConsumedCpuUs = 0; + i64 MinConsumedCpuUs = 0; + i64 MaxBookedCpuUs = 0; + i64 MinBookedCpuUs = 0; double SpinningTimeUs = 0; double SpinThresholdUs = 0; i16 WrongWakenedThreadCount = 0; diff --git a/ydb/library/actors/core/probes.h b/ydb/library/actors/core/probes.h index bfe0c86e9510..f9394f3273f6 100644 --- a/ydb/library/actors/core/probes.h +++ b/ydb/library/actors/core/probes.h @@ -174,11 +174,11 @@ NAMES("poolId", "pool", "threacCount", "minThreadCount", "maxThreadCount", "defaultThreadCount")) \ PROBE(HarmonizeCheckPool, GROUPS("Harmonizer"), \ TYPES(ui32, TString, double, double, double, double, ui32, ui32, bool, bool, bool), \ - NAMES("poolId", "pool", "cpu", "elapsed", "lastSecondCpu", "lastSecondElapsed", "threadCount", "maxThreadCount", \ + NAMES("poolId", "pool", "booked", "consumed", "lastSecondBooked", "lastSecondConsumed", "threadCount", "maxThreadCount", \ "isStarved", "isNeedy", "isHoggish")) \ PROBE(HarmonizeCheckPoolByThread, GROUPS("Harmonizer"), \ TYPES(ui32, TString, i16, double, double, double, double), \ - NAMES("poolId", "pool", "threadIdx", "cpu", "elapsed", "lastSecondCpu", "lastSecondElapsed")) \ + NAMES("poolId", "pool", "threadIdx", "booked", "consumed", "lastSecondBooked", "lastSecondConsumed")) \ PROBE(WakingUpConsumption, GROUPS("Harmonizer"), \ TYPES(double, double, double, double, double), \ NAMES("avgWakingUpUs", "realAvgWakingUpUs", "avgAwakeningUs", "realAvgAwakeningUs", "total")) \ diff --git a/ydb/library/actors/core/ut/ya.make b/ydb/library/actors/core/ut/ya.make index 88c75df35049..e1a225e52871 100644 --- a/ydb/library/actors/core/ut/ya.make +++ b/ydb/library/actors/core/ut/ya.make @@ -33,7 +33,6 @@ SRCS( event_pb_payload_ut.cpp event_pb_ut.cpp executor_pool_basic_ut.cpp - harmonizer_ut.cpp log_ut.cpp mon_ut.cpp scheduler_actor_ut.cpp diff --git a/ydb/library/actors/helpers/pool_stats_collector.h b/ydb/library/actors/helpers/pool_stats_collector.h index 32b6767fcc70..fb4c3d9286b8 100644 --- a/ydb/library/actors/helpers/pool_stats_collector.h +++ b/ydb/library/actors/helpers/pool_stats_collector.h @@ -189,10 +189,10 @@ class TStatsCollectingActor : public TActorBootstrapped { NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByHoggishState; NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByExchange; NMonitoring::TDynamicCounters::TCounterPtr NotEnoughCpuExecutions; - NMonitoring::TDynamicCounters::TCounterPtr MaxElapsedCpu; - NMonitoring::TDynamicCounters::TCounterPtr MinElapsedCpu; - NMonitoring::TDynamicCounters::TCounterPtr MaxCpu; - NMonitoring::TDynamicCounters::TCounterPtr MinCpu; + NMonitoring::TDynamicCounters::TCounterPtr MaxConsumedCpu; + NMonitoring::TDynamicCounters::TCounterPtr MinConsumedCpu; + NMonitoring::TDynamicCounters::TCounterPtr MaxBookedCpu; + NMonitoring::TDynamicCounters::TCounterPtr MinBookedCpu; NMonitoring::TDynamicCounters::TCounterPtr SpinningTimeUs; NMonitoring::TDynamicCounters::TCounterPtr SpinThresholdUs; @@ -264,10 +264,10 @@ class TStatsCollectingActor : public TActorBootstrapped { DecreasingThreadsByHoggishState = PoolGroup->GetCounter("DecreasingThreadsByHoggishState", true); DecreasingThreadsByExchange = PoolGroup->GetCounter("DecreasingThreadsByExchange", true); NotEnoughCpuExecutions = PoolGroup->GetCounter("NotEnoughCpuExecutions", true); - MaxElapsedCpu = PoolGroup->GetCounter("MaxElapsedCpuByPool", false); - MinElapsedCpu = PoolGroup->GetCounter("MinElapsedCpuByPool", false); - MaxCpu = PoolGroup->GetCounter("MaxCpuByPool", false); - MinCpu = PoolGroup->GetCounter("MinCpuByPool", false); + MaxConsumedCpu = PoolGroup->GetCounter("MaxConsumedCpuByPool", false); + MinConsumedCpu = PoolGroup->GetCounter("MinConsumedCpuByPool", false); + MaxBookedCpu = PoolGroup->GetCounter("MaxBookedCpuByPool", false); + MinBookedCpu = PoolGroup->GetCounter("MinBookedCpuByPool", false); SpinningTimeUs = PoolGroup->GetCounter("SpinningTimeUs", true); SpinThresholdUs = PoolGroup->GetCounter("SpinThresholdUs", false); @@ -348,11 +348,6 @@ class TStatsCollectingActor : public TActorBootstrapped { EventProcessingCountHistogram->Reset(); EventProcessingCountHistogram->Collect(stats.EventProcessingCountHistogram); - *MaxElapsedCpu = poolStats.MaxElapsedCpuUs; - *MinElapsedCpu = poolStats.MinElapsedCpuUs; - *MaxCpu = poolStats.MaxCpuUs; - *MinCpu = poolStats.MinCpuUs; - double toMicrosec = 1000000 / NHPTimer::GetClockRate(); LegacyEventProcessingTimeHistogram.Set(stats.EventProcessingTimeHistogram, toMicrosec); EventProcessingTimeHistogram->Reset(); @@ -388,10 +383,10 @@ class TStatsCollectingActor : public TActorBootstrapped { struct TActorSystemCounters { TIntrusivePtr Group; - NMonitoring::TDynamicCounters::TCounterPtr MaxElapsedCpu; - NMonitoring::TDynamicCounters::TCounterPtr MinElapsedCpu; - NMonitoring::TDynamicCounters::TCounterPtr MaxCpu; - NMonitoring::TDynamicCounters::TCounterPtr MinCpu; + NMonitoring::TDynamicCounters::TCounterPtr MaxConsumedCpu; + NMonitoring::TDynamicCounters::TCounterPtr MinConsumedCpu; + NMonitoring::TDynamicCounters::TCounterPtr MaxBookedCpu; + NMonitoring::TDynamicCounters::TCounterPtr MinBookedCpu; NMonitoring::TDynamicCounters::TCounterPtr AvgAwakeningTimeUs; NMonitoring::TDynamicCounters::TCounterPtr AvgWakingUpTimeUs; @@ -400,20 +395,20 @@ class TStatsCollectingActor : public TActorBootstrapped { void Init(NMonitoring::TDynamicCounters* group) { Group = group; - MaxElapsedCpu = Group->GetCounter("MaxElapsedCpu", false); - MinElapsedCpu = Group->GetCounter("MinElapsedCpu", false); - MaxCpu = Group->GetCounter("MaxCpu", false); - MinCpu = Group->GetCounter("MinCpu", false); + MaxConsumedCpu = Group->GetCounter("MaxConsumedCpu", false); + MinConsumedCpu = Group->GetCounter("MinConsumedCpu", false); + MaxBookedCpu = Group->GetCounter("MaxBookedCpu", false); + MinBookedCpu = Group->GetCounter("MinBookedCpu", false); AvgAwakeningTimeUs = Group->GetCounter("AvgAwakeningTimeUs", false); AvgWakingUpTimeUs = Group->GetCounter("AvgWakingUpTimeUs", false); } void Set(const THarmonizerStats& harmonizerStats) { #ifdef ACTORSLIB_COLLECT_EXEC_STATS - *MaxElapsedCpu = harmonizerStats.MaxElapsedCpu; - *MinElapsedCpu = harmonizerStats.MinElapsedCpu; - *MaxCpu = harmonizerStats.MaxCpu; - *MinCpu = harmonizerStats.MinCpu; + *MaxConsumedCpu = harmonizerStats.MaxConsumedCpu; + *MinConsumedCpu = harmonizerStats.MinConsumedCpu; + *MaxBookedCpu = harmonizerStats.MaxBookedCpu; + *MinBookedCpu = harmonizerStats.MinBookedCpu; *AvgAwakeningTimeUs = harmonizerStats.AvgAwakeningTimeUs; *AvgWakingUpTimeUs = harmonizerStats.AvgWakingUpTimeUs;