Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add shared thread logic to harmonizer #1767

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ydb/library/actors/core/cpu_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ namespace NActors {
}
}
Shared.reset(new TSharedExecutorPool(Config.Shared, ExecutorPoolCount, poolsWithSharedThreads));
// auto sharedPool = static_cast<TSharedExecutorPool*>(Shared.get());
auto sharedPool = static_cast<TSharedExecutorPool*>(Shared.get());

ui64 ts = GetCycleCountFast();
Harmonizer.reset(MakeHarmonizer(ts));
Harmonizer->SetSharedPool(sharedPool);

Executors.Reset(new TAutoPtr<IExecutorPool>[ExecutorPoolCount]);

Expand Down
41 changes: 26 additions & 15 deletions ydb/library/actors/core/executor_pool_shared.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@
namespace NActors {

TSharedExecutorPool::TSharedExecutorPool(const TSharedExecutorPoolConfig &config, i16 poolCount, std::vector<i16> poolsWithThreads)
: ThreadByPool(poolCount, -1)
, PoolByThread(poolsWithThreads.size())
, BorrowedThreadByPool(poolCount, -1)
, PoolByBorrowedThread(poolsWithThreads.size(), -1)
: State(poolCount, poolsWithThreads.size())
, Pools(poolCount)
, PoolCount(poolCount)
, SharedThreadCount(poolsWithThreads.size())
Expand All @@ -27,8 +24,8 @@ TSharedExecutorPool::TSharedExecutorPool(const TSharedExecutorPoolConfig &config
{
for (ui32 poolIdx = 0, threadIdx = 0; poolIdx < poolsWithThreads.size(); ++poolIdx, ++threadIdx) {
Y_ABORT_UNLESS(poolsWithThreads[poolIdx] < poolCount);
ThreadByPool[poolsWithThreads[poolIdx]] = threadIdx;
PoolByThread[threadIdx] = poolsWithThreads[poolIdx];
State.ThreadByPool[poolsWithThreads[poolIdx]] = threadIdx;
State.PoolByThread[threadIdx] = poolsWithThreads[poolIdx];
}
}

Expand All @@ -42,7 +39,7 @@ void TSharedExecutorPool::Prepare(TActorSystem* actorSystem, NSchedulerQueue::TR
std::vector<IExecutorPool*> poolByThread(SharedThreadCount);
for (IExecutorPool* pool : poolsBasic) {
Pools[pool->PoolId] = dynamic_cast<TBasicExecutorPool*>(pool);
i16 threadIdx = ThreadByPool[pool->PoolId];
i16 threadIdx = State.ThreadByPool[pool->PoolId];
if (threadIdx >= 0) {
poolByThread[threadIdx] = pool;
}
Expand Down Expand Up @@ -95,29 +92,39 @@ bool TSharedExecutorPool::Cleanup() {
}

// Returns the context of the shared thread owned by `pool`, or nullptr when
// the pool has no shared thread (State.ThreadByPool holds -1 for it).
//
// NOTE: `pool` itself is not range-checked; the caller must pass a valid
// pool id.
TSharedExecutorThreadCtx* TSharedExecutorPool::GetSharedThread(i16 pool) {
    const i16 threadIdx = State.ThreadByPool[pool];
    // threadIdx is used to index Threads, and the constructor hands out thread
    // indices in [0, SharedThreadCount), so the upper bound must be the number
    // of shared threads rather than the number of pools.
    if (threadIdx < 0 || threadIdx >= SharedThreadCount) {
        return nullptr;
    }
    return &Threads[threadIdx];
}

void TSharedExecutorPool::ReturnHalfThread(i16 pool) {
i16 threadIdx = ThreadByPool[pool];
// Takes back the second slot (ExecutorPools[1]) of the shared thread owned by
// `pool` from whichever pool currently occupies it.
//
// The slot is cleared atomically, both borrow maps in State are reset to -1,
// and ReleaseSharedThread() is invoked on the pool that was occupying the slot.
// Aborts if `pool` owns no shared thread or if the slot was already empty.
void TSharedExecutorPool::ReturnOwnHalfThread(i16 pool) {
    const i16 threadIdx = State.ThreadByPool[pool];
    // -1 means the pool owns no shared thread; indexing Threads with it would
    // read before the array, so fail loudly instead.
    Y_ABORT_UNLESS(threadIdx >= 0);
    TBasicExecutorPool* borrowingPool = Threads[threadIdx].ExecutorPools[1].exchange(nullptr, std::memory_order_acq_rel);
    Y_ABORT_UNLESS(borrowingPool);
    State.BorrowedThreadByPool[State.PoolByBorrowedThread[threadIdx]] = -1;
    State.PoolByBorrowedThread[threadIdx] = -1;
    // TODO(kruall): Check on race
    borrowingPool->ReleaseSharedThread();
}

// Gives back the shared thread that `pool` has borrowed from another pool.
//
// The thread's second slot (ExecutorPools[1]) is cleared atomically, both
// borrow maps in State are reset to -1, and ReleaseSharedThread() is invoked
// on the pool that was occupying the slot.
// Does nothing when `pool` currently has no borrowed thread.
void TSharedExecutorPool::ReturnBorrowedHalfThread(i16 pool) {
    const i16 threadIdx = State.BorrowedThreadByPool[pool];
    if (threadIdx < 0) {
        // Nothing is borrowed. Callers may decide from a GetState() snapshot
        // (the harmonizer does), which can be stale if the owning pool has
        // already reclaimed the thread via ReturnOwnHalfThread(). Treat that
        // as a no-op rather than indexing Threads with -1.
        return;
    }
    TBasicExecutorPool* borrowingPool = Threads[threadIdx].ExecutorPools[1].exchange(nullptr, std::memory_order_acq_rel);
    Y_ABORT_UNLESS(borrowingPool);
    State.BorrowedThreadByPool[State.PoolByBorrowedThread[threadIdx]] = -1;
    State.PoolByBorrowedThread[threadIdx] = -1;
    // TODO(kruall): Check on race
    borrowingPool->ReleaseSharedThread();
}

// Lends the second slot (ExecutorPools[1]) of the shared thread owned by pool
// `from` to pool `to`.
//
// The borrowing pool is published into the slot, both borrow maps in State are
// updated, and AddSharedThread() is invoked on the borrowing pool.
// Aborts if `from` owns no shared thread or if `to` is not a registered basic
// executor pool.
//
// NOTE(review): an existing occupant of the slot is overwritten without being
// released — presumably callers only lend a thread that is not lent already;
// confirm against the harmonizer.
void TSharedExecutorPool::GiveHalfThread(i16 from, i16 to) {
    const i16 threadIdx = State.ThreadByPool[from];
    // -1 means `from` owns no shared thread; do not index Threads with it.
    Y_ABORT_UNLESS(threadIdx >= 0);
    TBasicExecutorPool* borrowingPool = Pools[to];
    // Pools entries are filled by dynamic_cast in Prepare() and can be null.
    Y_ABORT_UNLESS(borrowingPool);
    Threads[threadIdx].ExecutorPools[1].store(borrowingPool, std::memory_order_release);
    State.BorrowedThreadByPool[to] = threadIdx;
    State.PoolByBorrowedThread[threadIdx] = to;
    // TODO(kruall): Check on race
    borrowingPool->AddSharedThread(&Threads[threadIdx]);
}
Expand Down Expand Up @@ -150,4 +157,8 @@ i16 TSharedExecutorPool::GetSharedThreadCount() const {
return SharedThreadCount;
}

// Returns a by-value snapshot of the pool/thread ownership maps.
// The caller gets its own copy, so later Give/Return calls on this pool are
// not reflected in a snapshot taken earlier.
TSharedPoolState TSharedExecutorPool::GetState() const {
    TSharedPoolState snapshot = State;
    return snapshot;
}

}
25 changes: 19 additions & 6 deletions ydb/library/actors/core/executor_pool_shared.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,20 @@ namespace NActors {
struct TExecutorThreadCtx;
class TBasicExecutorPool;

// Bookkeeping of which pool owns and which pool borrows each shared thread.
// Every map uses -1 as the "no entry" sentinel.
struct TSharedPoolState {
    // pool id -> index of the shared thread owned by that pool, -1 if none
    std::vector<i16> ThreadByPool;
    // shared thread index -> id of the pool that owns the thread, -1 if unset
    std::vector<i16> PoolByThread;
    // pool id -> index of the shared thread that pool has borrowed, -1 if none
    std::vector<i16> BorrowedThreadByPool;
    // shared thread index -> id of the pool borrowing that thread, -1 if none
    std::vector<i16> PoolByBorrowedThread;

    // Creates maps sized for `poolCount` pools and `threadCount` shared
    // threads with every entry unset.
    TSharedPoolState(i16 poolCount, i16 threadCount)
        : ThreadByPool(poolCount, -1)
        // Use the -1 sentinel here too: a value-initialised 0 is a valid pool
        // id and would make an unassigned thread look like it belongs to pool 0.
        , PoolByThread(threadCount, -1)
        , BorrowedThreadByPool(poolCount, -1)
        , PoolByBorrowedThread(threadCount, -1)
    {}
};

class TSharedExecutorPool: public IActorThreadPool {
public:
TSharedExecutorPool(const TSharedExecutorPoolConfig &config, i16 poolCount, std::vector<i16> poolsWithThreads);
Expand All @@ -26,18 +40,17 @@ namespace NActors {
TCpuConsumption GetThreadCpuConsumption(i16 poolId, i16 threadIdx);
std::vector<TCpuConsumption> GetThreadsCpuConsumption(i16 poolId);

void ReturnHalfThread(i16 pool);
void ReturnOwnHalfThread(i16 pool);
void ReturnBorrowedHalfThread(i16 pool);
void GiveHalfThread(i16 from, i16 to);

i16 GetSharedThreadCount() const;

TSharedPoolState GetState() const;

private:
std::vector<i16> ThreadByPool;
std::vector<i16> PoolByThread;
std::vector<i16> BorrowedThreadByPool;
std::vector<i16> PoolByBorrowedThread;

TSharedPoolState State;

std::vector<TBasicExecutorPool*> Pools;

i16 PoolCount;
Expand Down
41 changes: 39 additions & 2 deletions ydb/library/actors/core/harmonizer.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#include "executor_thread_ctx.h"
#include "probes.h"
#include "harmonizer.h"

#include "probes.h"
#include "actorsystem.h"
#include "executor_pool_basic.h"
#include "executor_pool_basic_feature_flags.h"
#include "executor_pool_shared.h"

#include <ydb/library/actors/util/cpu_load_log.h>
#include <ydb/library/actors/util/datetime.h>
Expand Down Expand Up @@ -301,6 +303,8 @@ class THarmonizer: public IHarmonizer {
TAtomic MaxBookedCpu = 0;
TAtomic MinBookedCpu = 0;

TSharedExecutorPool* Shared = nullptr;

std::atomic<double> AvgAwakeningTimeUs = 0;
std::atomic<double> AvgWakingUpTimeUs = 0;

Expand All @@ -317,6 +321,7 @@ class THarmonizer: public IHarmonizer {
void Enable(bool enable) override;
TPoolHarmonizerStats GetPoolStats(i16 poolId) const override;
THarmonizerStats GetStats() const override;
void SetSharedPool(TSharedExecutorPool* pool) override;
};

THarmonizer::THarmonizer(ui64 ts) {
Expand Down Expand Up @@ -417,6 +422,26 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
pool.BasicPool->ClearWaitingStats();
}

std::vector<bool> hasSharedThread(Pools.size());
std::vector<bool> hasSharedThreadWhichWasNotBorrowed(Pools.size());
std::vector<bool> hasBorrowedSharedThread(Pools.size());
if (Shared) {
auto sharedState = Shared->GetState();
for (ui32 poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) {
i16 threadIdx = sharedState.ThreadByPool[poolIdx];
if (threadIdx != -1) {
hasSharedThread[poolIdx] = true;
if (sharedState.PoolByBorrowedThread[threadIdx] == -1) {
hasSharedThreadWhichWasNotBorrowed[poolIdx] = true;
}

}
if (sharedState.BorrowedThreadByPool[poolIdx] != -1) {
hasBorrowedSharedThread[poolIdx] = true;
}
}
}

for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) {
TPoolInfo& pool = Pools[poolIdx];
total += pool.DefaultThreadCount;
Expand All @@ -440,7 +465,8 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
isStarvedPresent = true;
}

bool isNeedy = (pool.IsAvgPingGood() || pool.NewNotEnoughCpuExecutions) && poolBooked >= currentThreadCount;
ui32 sharedHalfThreadCount = hasSharedThread[poolIdx] + hasSharedThreadWhichWasNotBorrowed[poolIdx] + hasBorrowedSharedThread[poolIdx];
bool isNeedy = (pool.IsAvgPingGood() || pool.NewNotEnoughCpuExecutions) && (2 * poolBooked >= 2 * currentThreadCount + sharedHalfThreadCount);
if (pool.AvgPingCounter) {
if (pool.LastUpdateTs + Us2Ts(3'000'000ull) > ts) {
isNeedy = false;
Expand All @@ -462,6 +488,7 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
AtomicSet(pool.LastFlags, (i64)isNeedy | ((i64)isStarved << 1) | ((i64)isHoggish << 2));
LWPROBE(HarmonizeCheckPool, poolIdx, pool.Pool->GetName(), poolBooked, poolConsumed, lastSecondPoolBooked, lastSecondPoolConsumed, pool.GetThreadCount(), pool.MaxThreadCount, isStarved, isNeedy, isHoggish);
}

double budget = total - Max(booked, lastSecondBooked);
i16 budgetInt = static_cast<i16>(Max(budget, 0.0));
if (budget < -0.1) {
Expand Down Expand Up @@ -491,6 +518,9 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
for (ui16 poolIdx : PriorityOrder) {
TPoolInfo &pool = Pools[poolIdx];
i64 threadCount = pool.GetThreadCount();
if (hasSharedThread[poolIdx] && !hasSharedThreadWhichWasNotBorrowed[poolIdx]) {
Shared->ReturnOwnHalfThread(poolIdx);
}
while (threadCount > pool.DefaultThreadCount) {
pool.SetThreadCount(--threadCount);
AtomicIncrement(pool.DecreasingThreadsByStarvedState);
Expand Down Expand Up @@ -577,6 +607,9 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
for (size_t hoggishPoolIdx : hoggishPools) {
TPoolInfo &pool = Pools[hoggishPoolIdx];
i64 threadCount = pool.GetThreadCount();
if (hasBorrowedSharedThread[hoggishPoolIdx]) {
Shared->ReturnBorrowedHalfThread(hoggishPoolIdx);
}
if (pool.BasicPool && pool.LocalQueueSize > NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE) {
pool.LocalQueueSize = std::min<ui16>(NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE, pool.LocalQueueSize / 2);
pool.BasicPool->SetLocalQueueSize(pool.LocalQueueSize);
Expand Down Expand Up @@ -698,4 +731,8 @@ THarmonizerStats THarmonizer::GetStats() const {
};
}

// Remembers the shared executor pool the harmonizer should balance.
// The pointer is stored as-is; HarmonizeImpl only runs its shared-thread
// logic when this pointer is non-null.
void THarmonizer::SetSharedPool(TSharedExecutorPool* pool) {
    this->Shared = pool;
}

}
3 changes: 3 additions & 0 deletions ydb/library/actors/core/harmonizer.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

#include "defs.h"
#include "config.h"
#include "executor_pool_shared.h"

namespace NActors {
class IExecutorPool;
class TSharedExecutorPool;

template <typename T>
struct TWaitingStats;
Expand Down Expand Up @@ -45,6 +47,7 @@ namespace NActors {
virtual void Enable(bool enable) = 0;
virtual TPoolHarmonizerStats GetPoolStats(i16 poolId) const = 0;
virtual THarmonizerStats GetStats() const = 0;
virtual void SetSharedPool(TSharedExecutorPool* pool) = 0;
};

IHarmonizer* MakeHarmonizer(ui64 ts);
Expand Down
Loading