diff --git a/ydb/library/actors/core/cpu_manager.cpp b/ydb/library/actors/core/cpu_manager.cpp index 039625fd29cf..a6e75433895b 100644 --- a/ydb/library/actors/core/cpu_manager.cpp +++ b/ydb/library/actors/core/cpu_manager.cpp @@ -29,10 +29,11 @@ namespace NActors { } } Shared.reset(new TSharedExecutorPool(Config.Shared, ExecutorPoolCount, poolsWithSharedThreads)); - // auto sharedPool = static_cast(Shared.get()); + auto sharedPool = static_cast(Shared.get()); ui64 ts = GetCycleCountFast(); Harmonizer.reset(MakeHarmonizer(ts)); + Harmonizer->SetSharedPool(sharedPool); Executors.Reset(new TAutoPtr[ExecutorPoolCount]); diff --git a/ydb/library/actors/core/executor_pool_shared.cpp b/ydb/library/actors/core/executor_pool_shared.cpp index f9b769d40494..44e9d1c48f71 100644 --- a/ydb/library/actors/core/executor_pool_shared.cpp +++ b/ydb/library/actors/core/executor_pool_shared.cpp @@ -12,10 +12,7 @@ namespace NActors { TSharedExecutorPool::TSharedExecutorPool(const TSharedExecutorPoolConfig &config, i16 poolCount, std::vector poolsWithThreads) - : ThreadByPool(poolCount, -1) - , PoolByThread(poolsWithThreads.size()) - , BorrowedThreadByPool(poolCount, -1) - , PoolByBorrowedThread(poolsWithThreads.size(), -1) + : State(poolCount, poolsWithThreads.size()) , Pools(poolCount) , PoolCount(poolCount) , SharedThreadCount(poolsWithThreads.size()) @@ -27,8 +24,8 @@ TSharedExecutorPool::TSharedExecutorPool(const TSharedExecutorPoolConfig &config { for (ui32 poolIdx = 0, threadIdx = 0; poolIdx < poolsWithThreads.size(); ++poolIdx, ++threadIdx) { Y_ABORT_UNLESS(poolsWithThreads[poolIdx] < poolCount); - ThreadByPool[poolsWithThreads[poolIdx]] = threadIdx; - PoolByThread[threadIdx] = poolsWithThreads[poolIdx]; + State.ThreadByPool[poolsWithThreads[poolIdx]] = threadIdx; + State.PoolByThread[threadIdx] = poolsWithThreads[poolIdx]; } } @@ -42,7 +39,7 @@ void TSharedExecutorPool::Prepare(TActorSystem* actorSystem, NSchedulerQueue::TR std::vector poolByThread(SharedThreadCount); for (IExecutorPool* pool : poolsBasic) { Pools[pool->PoolId] = dynamic_cast(pool); - i16 threadIdx = ThreadByPool[pool->PoolId]; + i16 threadIdx = State.ThreadByPool[pool->PoolId]; if (threadIdx >= 0) { poolByThread[threadIdx] = pool; } @@ -95,29 +92,39 @@ bool TSharedExecutorPool::Cleanup() { } TSharedExecutorThreadCtx* TSharedExecutorPool::GetSharedThread(i16 pool) { - i16 threadIdx = ThreadByPool[pool]; + i16 threadIdx = State.ThreadByPool[pool]; if (threadIdx < 0 || threadIdx >= PoolCount) { return nullptr; } return &Threads[threadIdx]; } -void TSharedExecutorPool::ReturnHalfThread(i16 pool) { - i16 threadIdx = ThreadByPool[pool]; +void TSharedExecutorPool::ReturnOwnHalfThread(i16 pool) { + i16 threadIdx = State.ThreadByPool[pool]; TBasicExecutorPool* borrowingPool = Threads[threadIdx].ExecutorPools[1].exchange(nullptr, std::memory_order_acq_rel); Y_ABORT_UNLESS(borrowingPool); - BorrowedThreadByPool[PoolByBorrowedThread[threadIdx]] = -1; - PoolByBorrowedThread[threadIdx] = -1; + State.BorrowedThreadByPool[State.PoolByBorrowedThread[threadIdx]] = -1; + State.PoolByBorrowedThread[threadIdx] = -1; + // TODO(kruall): Check on race + borrowingPool->ReleaseSharedThread(); +} + +void TSharedExecutorPool::ReturnBorrowedHalfThread(i16 pool) { + i16 threadIdx = State.BorrowedThreadByPool[pool]; + TBasicExecutorPool* borrowingPool = Threads[threadIdx].ExecutorPools[1].exchange(nullptr, std::memory_order_acq_rel); + Y_ABORT_UNLESS(borrowingPool); + State.BorrowedThreadByPool[State.PoolByBorrowedThread[threadIdx]] = -1; + State.PoolByBorrowedThread[threadIdx] = -1; // TODO(kruall): Check on race borrowingPool->ReleaseSharedThread(); } void TSharedExecutorPool::GiveHalfThread(i16 from, i16 to) { - i16 threadIdx = ThreadByPool[from]; + i16 threadIdx = State.ThreadByPool[from]; TBasicExecutorPool* borrowingPool = Pools[to]; Threads[threadIdx].ExecutorPools[1].store(borrowingPool, std::memory_order_release); - BorrowedThreadByPool[to] = threadIdx; - PoolByBorrowedThread[threadIdx] = to; + State.BorrowedThreadByPool[to] = threadIdx; + State.PoolByBorrowedThread[threadIdx] = to; // TODO(kruall): Check on race borrowingPool->AddSharedThread(&Threads[threadIdx]); } @@ -150,4 +157,8 @@ i16 TSharedExecutorPool::GetSharedThreadCount() const { return SharedThreadCount; } +TSharedPoolState TSharedExecutorPool::GetState() const { + return State; +} + } \ No newline at end of file diff --git a/ydb/library/actors/core/executor_pool_shared.h b/ydb/library/actors/core/executor_pool_shared.h index 2f0eb6097323..3d7b07632663 100644 --- a/ydb/library/actors/core/executor_pool_shared.h +++ b/ydb/library/actors/core/executor_pool_shared.h @@ -10,6 +10,20 @@ namespace NActors { struct TExecutorThreadCtx; class TBasicExecutorPool; + struct TSharedPoolState { + std::vector ThreadByPool; + std::vector PoolByThread; + std::vector BorrowedThreadByPool; + std::vector PoolByBorrowedThread; + + TSharedPoolState(i16 poolCount, i16 threadCount) + : ThreadByPool(poolCount, -1) + , PoolByThread(threadCount) + , BorrowedThreadByPool(poolCount, -1) + , PoolByBorrowedThread(threadCount, -1) + {} + }; + class TSharedExecutorPool: public IActorThreadPool { public: TSharedExecutorPool(const TSharedExecutorPoolConfig &config, i16 poolCount, std::vector poolsWithThreads); @@ -26,18 +40,17 @@ namespace NActors { TCpuConsumption GetThreadCpuConsumption(i16 poolId, i16 threadIdx); std::vector GetThreadsCpuConsumption(i16 poolId); - void ReturnHalfThread(i16 pool); + void ReturnOwnHalfThread(i16 pool); + void ReturnBorrowedHalfThread(i16 pool); void GiveHalfThread(i16 from, i16 to); i16 GetSharedThreadCount() const; + TSharedPoolState GetState() const; private: - std::vector ThreadByPool; - std::vector PoolByThread; - std::vector BorrowedThreadByPool; - std::vector PoolByBorrowedThread; - + TSharedPoolState State; + std::vector Pools; i16 PoolCount; diff --git a/ydb/library/actors/core/harmonizer.cpp b/ydb/library/actors/core/harmonizer.cpp index 733606a7b528..282b2d92c983 100644 --- a/ydb/library/actors/core/harmonizer.cpp +++ b/ydb/library/actors/core/harmonizer.cpp @@ -1,9 +1,11 @@ +#include "executor_thread_ctx.h" +#include "probes.h" #include "harmonizer.h" -#include "probes.h" #include "actorsystem.h" #include "executor_pool_basic.h" #include "executor_pool_basic_feature_flags.h" +#include "executor_pool_shared.h" #include #include @@ -301,6 +303,8 @@ class THarmonizer: public IHarmonizer { TAtomic MaxBookedCpu = 0; TAtomic MinBookedCpu = 0; + TSharedExecutorPool* Shared = nullptr; + std::atomic AvgAwakeningTimeUs = 0; std::atomic AvgWakingUpTimeUs = 0; @@ -317,6 +321,7 @@ class THarmonizer: public IHarmonizer { void Enable(bool enable) override; TPoolHarmonizerStats GetPoolStats(i16 poolId) const override; THarmonizerStats GetStats() const override; + void SetSharedPool(TSharedExecutorPool* pool) override; }; THarmonizer::THarmonizer(ui64 ts) { @@ -417,6 +422,26 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { pool.BasicPool->ClearWaitingStats(); } + std::vector hasSharedThread(Pools.size()); + std::vector hasSharedThreadWhichWasNotBorrowed(Pools.size()); + std::vector hasBorrowedSharedThread(Pools.size()); + if (Shared) { + auto sharedState = Shared->GetState(); + for (ui32 poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) { + i16 threadIdx = sharedState.ThreadByPool[poolIdx]; + if (threadIdx != -1) { + hasSharedThread[poolIdx] = true; + if (sharedState.PoolByBorrowedThread[threadIdx] == -1) { + hasSharedThreadWhichWasNotBorrowed[poolIdx] = true; + } + + } + if (sharedState.BorrowedThreadByPool[poolIdx] != -1) { + hasBorrowedSharedThread[poolIdx] = true; + } + } + } + for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) { TPoolInfo& pool = Pools[poolIdx]; total += pool.DefaultThreadCount; @@ -440,7 +465,8 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { isStarvedPresent = true; } - bool isNeedy = (pool.IsAvgPingGood() || pool.NewNotEnoughCpuExecutions) && poolBooked >= currentThreadCount; + ui32 sharedHalfThreadCount = hasSharedThread[poolIdx] + hasSharedThreadWhichWasNotBorrowed[poolIdx] + hasBorrowedSharedThread[poolIdx]; + bool isNeedy = (pool.IsAvgPingGood() || pool.NewNotEnoughCpuExecutions) && (2 * poolBooked >= 2 * currentThreadCount + sharedHalfThreadCount); if (pool.AvgPingCounter) { if (pool.LastUpdateTs + Us2Ts(3'000'000ull) > ts) { isNeedy = false; @@ -462,6 +488,7 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { AtomicSet(pool.LastFlags, (i64)isNeedy | ((i64)isStarved << 1) | ((i64)isHoggish << 2)); LWPROBE(HarmonizeCheckPool, poolIdx, pool.Pool->GetName(), poolBooked, poolConsumed, lastSecondPoolBooked, lastSecondPoolConsumed, pool.GetThreadCount(), pool.MaxThreadCount, isStarved, isNeedy, isHoggish); } + double budget = total - Max(booked, lastSecondBooked); i16 budgetInt = static_cast(Max(budget, 0.0)); if (budget < -0.1) { @@ -491,6 +518,9 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { for (ui16 poolIdx : PriorityOrder) { TPoolInfo &pool = Pools[poolIdx]; i64 threadCount = pool.GetThreadCount(); + if (hasSharedThread[poolIdx] && !hasSharedThreadWhichWasNotBorrowed[poolIdx]) { + Shared->ReturnOwnHalfThread(poolIdx); + } while (threadCount > pool.DefaultThreadCount) { pool.SetThreadCount(--threadCount); AtomicIncrement(pool.DecreasingThreadsByStarvedState); @@ -577,6 +607,9 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { for (size_t hoggishPoolIdx : hoggishPools) { TPoolInfo &pool = Pools[hoggishPoolIdx]; i64 threadCount = pool.GetThreadCount(); + if (hasBorrowedSharedThread[hoggishPoolIdx]) { + Shared->ReturnBorrowedHalfThread(hoggishPoolIdx); + } if (pool.BasicPool && pool.LocalQueueSize > NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE) { pool.LocalQueueSize = std::min(NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE, pool.LocalQueueSize / 2); pool.BasicPool->SetLocalQueueSize(pool.LocalQueueSize); @@ -698,4 +731,8 @@ THarmonizerStats THarmonizer::GetStats() const { }; } +void THarmonizer::SetSharedPool(TSharedExecutorPool* pool) { + Shared = pool; +} + } diff --git a/ydb/library/actors/core/harmonizer.h b/ydb/library/actors/core/harmonizer.h index ba98048e493e..19f79de2806a 100644 --- a/ydb/library/actors/core/harmonizer.h +++ b/ydb/library/actors/core/harmonizer.h @@ -2,9 +2,11 @@ #include "defs.h" #include "config.h" +#include "executor_pool_shared.h" namespace NActors { class IExecutorPool; + class TSharedExecutorPool; template struct TWaitingStats; @@ -45,6 +47,7 @@ namespace NActors { virtual void Enable(bool enable) = 0; virtual TPoolHarmonizerStats GetPoolStats(i16 poolId) const = 0; virtual THarmonizerStats GetStats() const = 0; + virtual void SetSharedPool(TSharedExecutorPool* pool) = 0; }; IHarmonizer* MakeHarmonizer(ui64 ts);