Skip to content

Commit

Permalink
Refactor harmonizer in actorsystem (#12084)
Browse files Browse the repository at this point in the history
  • Loading branch information
kruall authored Nov 28, 2024
1 parent a02f6d5 commit 97c9f43
Show file tree
Hide file tree
Showing 32 changed files with 2,234 additions and 1,003 deletions.
10 changes: 4 additions & 6 deletions ydb/library/actors/core/cpu_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ namespace NActors {
poolsWithSharedThreads.push_back(cfg.PoolId);
}
}
Shared.reset(new TSharedExecutorPool(Config.Shared, ExecutorPoolCount, poolsWithSharedThreads));
auto sharedPool = static_cast<TSharedExecutorPool*>(Shared.get());
Shared.reset(CreateSharedExecutorPool(Config.Shared, ExecutorPoolCount, poolsWithSharedThreads));
auto sharedPool = static_cast<ISharedExecutorPool*>(Shared.get());

ui64 ts = GetCycleCountFast();
Harmonizer.reset(MakeHarmonizer(ts));
Expand Down Expand Up @@ -135,11 +135,9 @@ namespace NActors {
for (TBasicExecutorPoolConfig& cfg : Config.Basic) {
if (cfg.PoolId == poolId) {
if (cfg.HasSharedThread) {
auto *sharedPool = static_cast<TSharedExecutorPool*>(Shared.get());
auto *sharedPool = Shared.get();
auto *pool = new TBasicExecutorPool(cfg, Harmonizer.get(), Jail.get());
if (pool) {
pool->AddSharedThread(sharedPool->GetSharedThread(poolId));
}
pool->AddSharedThread(sharedPool->GetSharedThread(poolId));
return pool;
} else {
return new TBasicExecutorPool(cfg, Harmonizer.get(), Jail.get());
Expand Down
4 changes: 2 additions & 2 deletions ydb/library/actors/core/cpu_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

#include "config.h"
#include "executor_pool_jail.h"
#include "harmonizer.h"
#include "executor_pool.h"
#include "executor_pool_shared.h"
#include "mon_stats.h"
#include <ydb/library/actors/core/harmonizer/harmonizer.h>
#include <memory>

namespace NActors {
Expand All @@ -16,7 +16,7 @@ namespace NActors {
const ui32 ExecutorPoolCount;
TArrayHolder<TAutoPtr<IExecutorPool>> Executors;
std::unique_ptr<IHarmonizer> Harmonizer;
std::unique_ptr<TSharedExecutorPool> Shared;
std::unique_ptr<ISharedExecutorPool> Shared;
std::unique_ptr<TExecutorPoolJail> Jail;
TCpuManagerConfig Config;

Expand Down
19 changes: 15 additions & 4 deletions ydb/library/actors/core/executor_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,16 @@ namespace NActors {
struct TExecutorThreadStats;
class TExecutorPoolJail;
class ISchedulerCookie;
struct TSharedExecutorThreadCtx;

struct TCpuConsumption {
double ConsumedUs = 0;
double BookedUs = 0;
double CpuUs = 0;
double ElapsedUs = 0;
ui64 NotEnoughCpuExecutions = 0;

void Add(const TCpuConsumption& other) {
ConsumedUs += other.ConsumedUs;
BookedUs += other.BookedUs;
CpuUs += other.CpuUs;
ElapsedUs += other.ElapsedUs;
NotEnoughCpuExecutions += other.NotEnoughCpuExecutions;
}
};
Expand Down Expand Up @@ -176,6 +177,16 @@ namespace NActors {
return 1;
}

// Default implementation for pools that hold no shared thread: there is
// nothing to release, so nullptr is returned. TBasicExecutorPool overrides this.
virtual TSharedExecutorThreadCtx* ReleaseSharedThread() {
return nullptr;
}
// Default implementation: the passed shared-thread context is ignored.
// TBasicExecutorPool overrides this.
virtual void AddSharedThread(TSharedExecutorThreadCtx*) {
}

// Reports whether this pool type works with shared threads. The base
// implementation answers false; TBasicExecutorPool overrides it to return true.
virtual bool IsSharedThreadEnabled() const {
return false;
}

virtual TCpuConsumption GetThreadCpuConsumption(i16 threadIdx) {
Y_UNUSED(threadIdx);
return TCpuConsumption{0.0, 0.0};
Expand Down
1 change: 1 addition & 0 deletions ydb/library/actors/core/executor_pool_base.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "actorsystem.h"
#include "activity_guard.h"
#include "actor.h"
#include "executor_pool_base.h"
#include "executor_pool_basic_feature_flags.h"
Expand Down
12 changes: 6 additions & 6 deletions ydb/library/actors/core/executor_pool_basic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -407,10 +407,10 @@ namespace NActors {
poolStats.DecreasingThreadsByHoggishState = stats.DecreasingThreadsByHoggishState;
poolStats.DecreasingThreadsByExchange = stats.DecreasingThreadsByExchange;
poolStats.PotentialMaxThreadCount = stats.PotentialMaxThreadCount;
poolStats.MaxConsumedCpuUs = stats.MaxConsumedCpu;
poolStats.MinConsumedCpuUs = stats.MinConsumedCpu;
poolStats.MaxBookedCpuUs = stats.MaxBookedCpu;
poolStats.MinBookedCpuUs = stats.MinBookedCpu;
poolStats.MaxCpuUs = stats.MaxCpuUs;
poolStats.MinCpuUs = stats.MinCpuUs;
poolStats.MaxElapsedUs = stats.MaxElapsedUs;
poolStats.MinElapsedUs = stats.MinElapsedUs;
}

statsCopy.resize(MaxFullThreadCount + 1);
Expand All @@ -429,7 +429,7 @@ namespace NActors {
void TBasicExecutorPool::GetExecutorPoolState(TExecutorPoolState &poolState) const {
if (Harmonizer) {
TPoolHarmonizerStats stats = Harmonizer->GetPoolStats(PoolId);
poolState.UsedCpu = stats.AvgConsumedCpu;
poolState.UsedCpu = stats.AvgElapsedUs;
poolState.PossibleMaxLimit = stats.PotentialMaxThreadCount;
} else {
poolState.PossibleMaxLimit = poolState.MaxLimit;
Expand Down Expand Up @@ -625,7 +625,7 @@ namespace NActors {
TExecutorThreadCtx& threadCtx = Threads[threadIdx];
TExecutorThreadStats stats;
threadCtx.Thread->GetCurrentStatsForHarmonizer(stats);
return {Ts2Us(stats.SafeElapsedTicks), static_cast<double>(stats.CpuUs), stats.NotEnoughCpuExecutions};
return {static_cast<double>(stats.CpuUs), Ts2Us(stats.SafeElapsedTicks), stats.NotEnoughCpuExecutions};
}

i16 TBasicExecutorPool::GetBlockingThreadCount() const {
Expand Down
9 changes: 6 additions & 3 deletions ydb/library/actors/core/executor_pool_basic.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
#include "executor_pool_basic_feature_flags.h"
#include "scheduler_queue.h"
#include "executor_pool_base.h"
#include "harmonizer.h"
#include <memory>
#include <ydb/library/actors/core/harmonizer/harmonizer.h>
#include <ydb/library/actors/actor_type/indexes.h>
#include <ydb/library/actors/util/unordered_cache.h>
#include <ydb/library/actors/util/threadparkpad.h>
Expand Down Expand Up @@ -278,8 +278,11 @@ namespace NActors {
void CalcSpinPerThread(ui64 wakingUpConsumption);
void ClearWaitingStats() const;

TSharedExecutorThreadCtx* ReleaseSharedThread();
void AddSharedThread(TSharedExecutorThreadCtx* thread);
TSharedExecutorThreadCtx* ReleaseSharedThread() override;
void AddSharedThread(TSharedExecutorThreadCtx* thread) override;
// Basic pools always report shared-thread support (unconditionally true).
bool IsSharedThreadEnabled() const override {
return true;
}

private:
void AskToGoToSleep(bool *needToWait, bool *needToBlock);
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/actors/core/executor_pool_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ namespace NActors {
void TIOExecutorPool::GetExecutorPoolState(TExecutorPoolState &poolState) const {
if (Harmonizer) {
TPoolHarmonizerStats stats = Harmonizer->GetPoolStats(PoolId);
poolState.UsedCpu = stats.AvgConsumedCpu;
poolState.UsedCpu = stats.AvgElapsedUs;
}
poolState.CurrentLimit = PoolThreads;
poolState.MaxLimit = PoolThreads;
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/actors/core/executor_pool_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
#include "actorsystem.h"
#include "executor_thread.h"
#include "executor_thread_ctx.h"
#include "harmonizer.h"
#include "scheduler_queue.h"
#include "executor_pool_base.h"
#include <ydb/library/actors/core/harmonizer/harmonizer.h>
#include <ydb/library/actors/actor_type/indexes.h>
#include <ydb/library/actors/util/ticket_lock.h>
#include <ydb/library/actors/util/unordered_cache.h>
Expand Down
158 changes: 121 additions & 37 deletions ydb/library/actors/core/executor_pool_shared.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,50 @@

namespace NActors {

// Concrete ISharedExecutorPool implementation. The class is declared in this
// .cpp file; CreateSharedExecutorPool() in this file constructs instances and
// returns them as ISharedExecutorPool*.
class TSharedExecutorPool: public ISharedExecutorPool {
public:
// poolCount and poolsWithThreads.size() are forwarded to State, and Pools is
// sized to poolCount (see the constructor's initializer list).
TSharedExecutorPool(const TSharedExecutorPoolConfig &config, i16 poolCount, std::vector<i16> poolsWithThreads);

// IThreadPool
// Prepare() allocates the schedule readers/writers, calls Init() with the
// actor system's basic pools and withThreads == true, and hands the reader
// array and its length back through the two out-parameters.
void Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) override;
void Start() override;
void PrepareStop() override;
void Shutdown() override;
bool Cleanup() override;

// Returns a pointer to an element of Threads (not a newly allocated object).
TSharedExecutorThreadCtx *GetSharedThread(i16 poolId) override;
// Both fill statsCopy by iterating over all shared threads and asking each
// one for its stats for the given pool.
void GetSharedStats(i16 pool, std::vector<TExecutorThreadStats>& statsCopy) override;
void GetSharedStatsForHarmonizer(i16 pool, std::vector<TExecutorThreadStats>& statsCopy) override;
TCpuConsumption GetThreadCpuConsumption(i16 poolId, i16 threadIdx) override;
std::vector<TCpuConsumption> GetThreadsCpuConsumption(i16 poolId) override;

// Half-thread lending between pools; these update the borrow tables kept in State.
// NOTE(review): the i16 results look like pool ids - confirm against ISharedExecutorPool.
i16 ReturnOwnHalfThread(i16 pool) override;
i16 ReturnBorrowedHalfThread(i16 pool) override;
void GiveHalfThread(i16 from, i16 to) override;

i16 GetSharedThreadCount() const override;

// Returns a copy of State.
TSharedPoolState GetState() const override;

// Stores each pool in Pools under its PoolId and binds pools to shared thread
// contexts; TSharedExecutorThread objects are created only when withThreads is true.
void Init(const std::vector<IExecutorPool*>& pools, bool withThreads) override;

private:
// NOTE: members are initialized in declaration order, which the constructor's
// initializer list relies on - do not reorder without checking it.
TSharedPoolState State;

// Indexed by PoolId; populated in Init().
std::vector<IExecutorPool*> Pools;

i16 PoolCount;
i16 SharedThreadCount;
// One context per shared thread, indexed by shared thread index.
std::unique_ptr<TSharedExecutorThreadCtx[]> Threads;

// Arrays of SharedThreadCount elements, allocated in Prepare().
std::unique_ptr<NSchedulerQueue::TReader[]> ScheduleReaders;
std::unique_ptr<NSchedulerQueue::TWriter[]> ScheduleWriters;

// Passed to every TSharedExecutorThread constructed in Init().
TDuration TimePerMailbox;
ui32 EventsPerMailbox;
ui64 SoftProcessingDurationTs;
}; // class TSharedExecutorPool

TSharedExecutorPool::TSharedExecutorPool(const TSharedExecutorPoolConfig &config, i16 poolCount, std::vector<i16> poolsWithThreads)
: State(poolCount, poolsWithThreads.size())
, Pools(poolCount)
Expand All @@ -29,40 +73,47 @@ TSharedExecutorPool::TSharedExecutorPool(const TSharedExecutorPoolConfig &config
}
}

void TSharedExecutorPool::Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) {
// ActorSystem = actorSystem;

ScheduleReaders.reset(new NSchedulerQueue::TReader[SharedThreadCount]);
ScheduleWriters.reset(new NSchedulerQueue::TWriter[SharedThreadCount]);

std::vector<IExecutorPool*> poolsBasic = actorSystem->GetBasicExecutorPools();
std::vector<IExecutorPool*> poolByThread(SharedThreadCount);
for (IExecutorPool* pool : poolsBasic) {
Pools[pool->PoolId] = dynamic_cast<TBasicExecutorPool*>(pool);
i16 threadIdx = State.ThreadByPool[pool->PoolId];
if (threadIdx >= 0) {
poolByThread[threadIdx] = pool;
}
void TSharedExecutorPool::Init(const std::vector<IExecutorPool*>& pools, bool withThreads) {
std::vector<IExecutorPool*> poolByThread(SharedThreadCount);
for (IExecutorPool* pool : pools) {
Pools[pool->PoolId] = pool;
i16 threadIdx = State.ThreadByPool[pool->PoolId];
if (threadIdx >= 0) {
poolByThread[threadIdx] = pool;
}
}

for (i16 i = 0; i != SharedThreadCount; ++i) {
// !TODO
Threads[i].ExecutorPools[0].store(dynamic_cast<TBasicExecutorPool*>(poolByThread[i]), std::memory_order_release);
for (i16 i = 0; i != SharedThreadCount; ++i) {
// !TODO
Threads[i].ExecutorPools[0].store(dynamic_cast<TBasicExecutorPool*>(poolByThread[i]), std::memory_order_release);
if (withThreads) {
Threads[i].Thread.reset(
new TSharedExecutorThread(
-1,
actorSystem,
&Threads[i],
PoolCount,
"SharedThread",
SoftProcessingDurationTs,
TimePerMailbox,
nullptr,
&Threads[i],
PoolCount,
"SharedThread",
SoftProcessingDurationTs,
TimePerMailbox,
EventsPerMailbox));
ScheduleWriters[i].Init(ScheduleReaders[i]);
}
}
}

void TSharedExecutorPool::Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) {
ScheduleReaders.reset(new NSchedulerQueue::TReader[SharedThreadCount]);
ScheduleWriters.reset(new NSchedulerQueue::TWriter[SharedThreadCount]);

std::vector<IExecutorPool*> poolsBasic = actorSystem->GetBasicExecutorPools();
Init(poolsBasic, true);

*scheduleReaders = ScheduleReaders.get();
*scheduleSz = SharedThreadCount;
for (i16 i = 0; i != SharedThreadCount; ++i) {
ScheduleWriters[i].Init(ScheduleReaders[i]);
}

*scheduleReaders = ScheduleReaders.get();
*scheduleSz = SharedThreadCount;
}

void TSharedExecutorPool::Start() {
Expand Down Expand Up @@ -99,24 +150,27 @@ TSharedExecutorThreadCtx* TSharedExecutorPool::GetSharedThread(i16 pool) {
return &Threads[threadIdx];
}

void TSharedExecutorPool::ReturnOwnHalfThread(i16 pool) {
i16 TSharedExecutorPool::ReturnOwnHalfThread(i16 pool) {
i16 threadIdx = State.ThreadByPool[pool];
TBasicExecutorPool* borrowingPool = Threads[threadIdx].ExecutorPools[1].exchange(nullptr, std::memory_order_acq_rel);
IExecutorPool* borrowingPool = Threads[threadIdx].ExecutorPools[1].exchange(nullptr, std::memory_order_acq_rel);
Y_ABORT_UNLESS(borrowingPool);
State.BorrowedThreadByPool[State.PoolByBorrowedThread[threadIdx]] = -1;
i16 borrowedPool = State.PoolByBorrowedThread[threadIdx];
State.BorrowedThreadByPool[borrowedPool] = -1;
State.PoolByBorrowedThread[threadIdx] = -1;
// TODO(kruall): Check on race
borrowingPool->ReleaseSharedThread();
return borrowedPool;
}

void TSharedExecutorPool::ReturnBorrowedHalfThread(i16 pool) {
i16 TSharedExecutorPool::ReturnBorrowedHalfThread(i16 pool) {
i16 threadIdx = State.BorrowedThreadByPool[pool];
TBasicExecutorPool* borrowingPool = Threads[threadIdx].ExecutorPools[1].exchange(nullptr, std::memory_order_acq_rel);
IExecutorPool* borrowingPool = Threads[threadIdx].ExecutorPools[1].exchange(nullptr, std::memory_order_acq_rel);
Y_ABORT_UNLESS(borrowingPool);
State.BorrowedThreadByPool[State.PoolByBorrowedThread[threadIdx]] = -1;
State.PoolByBorrowedThread[threadIdx] = -1;
// TODO(kruall): Check on race
borrowingPool->ReleaseSharedThread();
return State.PoolByThread[threadIdx];
}

void TSharedExecutorPool::GiveHalfThread(i16 from, i16 to) {
Expand All @@ -127,14 +181,14 @@ void TSharedExecutorPool::GiveHalfThread(i16 from, i16 to) {
if (borrowedThreadIdx != -1) {
i16 originalPool = State.PoolByThread[borrowedThreadIdx];
if (originalPool == to) {
return ReturnOwnHalfThread(to);
ReturnOwnHalfThread(to);
} else {
ReturnOwnHalfThread(originalPool);
}
from = originalPool;
}
i16 threadIdx = State.ThreadByPool[from];
TBasicExecutorPool* borrowingPool = Pools[to];
IExecutorPool* borrowingPool = Pools[to];
Threads[threadIdx].ExecutorPools[1].store(borrowingPool, std::memory_order_release);
State.BorrowedThreadByPool[to] = threadIdx;
State.PoolByBorrowedThread[threadIdx] = to;
Expand All @@ -143,16 +197,16 @@ void TSharedExecutorPool::GiveHalfThread(i16 from, i16 to) {
}

void TSharedExecutorPool::GetSharedStats(i16 poolId, std::vector<TExecutorThreadStats>& statsCopy) {
statsCopy.resize(SharedThreadCount + 1);
statsCopy.resize(SharedThreadCount);
for (i16 i = 0; i < SharedThreadCount; ++i) {
Threads[i].Thread->GetSharedStats(poolId, statsCopy[i + 1]);
Threads[i].Thread->GetSharedStats(poolId, statsCopy[i]);
}
}

void TSharedExecutorPool::GetSharedStatsForHarmonizer(i16 poolId, std::vector<TExecutorThreadStats>& statsCopy) {
statsCopy.resize(SharedThreadCount + 1);
statsCopy.resize(SharedThreadCount);
for (i16 i = 0; i < SharedThreadCount; ++i) {
Threads[i].Thread->GetSharedStatsForHarmonizer(poolId, statsCopy[i + 1]);
Threads[i].Thread->GetSharedStatsForHarmonizer(poolId, statsCopy[i]);
}
}

Expand Down Expand Up @@ -181,4 +235,34 @@ TSharedPoolState TSharedExecutorPool::GetState() const {
return State;
}

// Factory for the shared executor pool. Builds the concrete
// TSharedExecutorPool and returns it as a raw ISharedExecutorPool*.
ISharedExecutorPool *CreateSharedExecutorPool(const TSharedExecutorPoolConfig &config, i16 poolCount, std::vector<i16> poolsWithThreads) {
    ISharedExecutorPool *sharedPool = new TSharedExecutorPool(config, poolCount, poolsWithThreads);
    return sharedPool;
}

// Renders the four index tables on a single line in the form
// {ThreadByPool: [...], PoolByThread: [...], BorrowedThreadByPool: [...], PoolByBorrowedThread: [...]}
// where the elements of each table are separated by ", ".
TString TSharedPoolState::ToString() const {
    TStringBuilder builder;

    // Streams every element of `items`, putting ", " between neighbours
    // (nothing before the first element, nothing after the last).
    const auto appendItems = [&builder](const auto& items) {
        bool needSeparator = false;
        for (const auto& item : items) {
            if (needSeparator) {
                builder << ", ";
            }
            builder << item;
            needSeparator = true;
        }
    };

    builder << '{';

    builder << "ThreadByPool: [";
    appendItems(ThreadByPool);
    builder << "], ";

    builder << "PoolByThread: [";
    appendItems(PoolByThread);
    builder << "], ";

    builder << "BorrowedThreadByPool: [";
    appendItems(BorrowedThreadByPool);
    builder << "], ";

    builder << "PoolByBorrowedThread: [";
    appendItems(PoolByBorrowedThread);
    builder << ']';

    return builder << '}';
}

}
Loading

0 comments on commit 97c9f43

Please sign in to comment.