Skip to content

Commit

Permalink
Change TEvLocal actorsystem metrics (#6355)
Browse files Browse the repository at this point in the history
  • Loading branch information
kruall authored Jul 11, 2024
1 parent 789cd49 commit 5247385
Show file tree
Hide file tree
Showing 17 changed files with 143 additions and 53 deletions.
4 changes: 2 additions & 2 deletions ydb/core/base/pool_stats_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ class TStatsCollectingActor : public NActors::TStatsCollectingActor {
void OnWakeup(const TActorContext &ctx) override {
MiniKQLPoolStats.Update();

TVector<std::tuple<TString, double, ui32>> pools;
TVector<std::tuple<TString, double, ui32, ui32>> pools;
for (const auto& pool : PoolCounters) {
pools.emplace_back(pool.Name, pool.Usage, pool.Threads);
pools.emplace_back(pool.Name, pool.Usage, pool.Threads, pool.LimitThreads);
}

ctx.Send(NNodeWhiteboard::MakeNodeWhiteboardServiceId(ctx.SelfID.NodeId()), new NNodeWhiteboard::TEvWhiteboard::TEvSystemStateUpdate(pools));
Expand Down
13 changes: 6 additions & 7 deletions ydb/core/mind/local.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,13 +278,12 @@ class TLocalNodeRegistrar : public TActorBootstrapped<TLocalNodeRegistrar> {
auto& record = eventStatus->Record;
record.SetStartTime(StartTime.GetValue());
record.MutableResourceMaximum()->CopyFrom(ResourceLimit);
NActors::TExecutorPoolState userPoolState;
ctx.ExecutorThread.ActorSystem->GetExecutorPoolState(AppData()->UserPoolId, userPoolState);

if (!record.GetResourceMaximum().HasCPU()) {
TExecutorPoolStats poolStats;
TVector<TExecutorThreadStats> statsCopy;
TVector<TExecutorThreadStats> sharedStatsCopy;
ctx.ExecutorThread.ActorSystem->GetPoolStats(AppData()->UserPoolId, poolStats, statsCopy, sharedStatsCopy);
if (!statsCopy.empty()) {
record.MutableResourceMaximum()->SetCPU(poolStats.CurrentThreadCount * 1000000);
if (userPoolState.PossibleMaxLimit) {
record.MutableResourceMaximum()->SetCPU(userPoolState.PossibleMaxLimit * 1000000);
}
}
if (!record.GetResourceMaximum().HasMemory()) {
Expand Down Expand Up @@ -649,7 +648,7 @@ class TLocalNodeRegistrar : public TActorBootstrapped<TLocalNodeRegistrar> {
const NKikimrWhiteboard::TSystemStateInfo& info = record.GetSystemStateInfo(0);
if (static_cast<ui32>(info.PoolStatsSize()) > AppData()->UserPoolId) {
const auto& poolStats(info.GetPoolStats(AppData()->UserPoolId));
UserPoolUsage = poolStats.usage() * poolStats.threads() * 1000000; // uS
UserPoolUsage = poolStats.usage() * poolStats.limit() * 1000000; // uS
}

// Note: we use allocated memory because MemoryUsed(AnonRSS) has lag
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/node_whiteboard/node_whiteboard.h
Original file line number Diff line number Diff line change
Expand Up @@ -361,12 +361,13 @@ struct TEvWhiteboard{
}
}

TEvSystemStateUpdate(const TVector<std::tuple<TString, double, ui32>>& poolStats) {
TEvSystemStateUpdate(const TVector<std::tuple<TString, double, ui32, ui32>>& poolStats) {
for (const auto& row : poolStats) {
auto& pb = *Record.AddPoolStats();
pb.SetName(std::get<0>(row));
pb.SetUsage(std::get<1>(row));
pb.SetThreads(std::get<2>(row));
pb.SetLimit(std::get<3>(row));
}
}

Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/node_whiteboard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ message TSystemStateInfo {
optional string Name = 1;
optional double Usage = 2 [(InsignificantChangePercent) = 30];
optional uint32 Threads = 3;
optional uint32 Limit = 4;
}

message TEndpoint {
Expand Down
9 changes: 9 additions & 0 deletions ydb/library/actors/core/actorsystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -338,4 +338,13 @@ namespace NActors {
CpuManager->Cleanup();
Scheduler.Destroy();
}

void TActorSystem::GetExecutorPoolState(i16 poolId, TExecutorPoolState &state) const {
CpuManager->GetExecutorPoolState(poolId, state);
}

void TActorSystem::GetExecutorPoolStates(std::vector<TExecutorPoolState> &states) const {
CpuManager->GetExecutorPoolStates(states);
}

}
3 changes: 3 additions & 0 deletions ydb/library/actors/core/actorsystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -306,5 +306,8 @@ namespace NActors {
return CpuManager->GetBasicExecutorPools();
}

void GetExecutorPoolState(i16 poolId, TExecutorPoolState &state) const;
void GetExecutorPoolStates(std::vector<TExecutorPoolState> &states) const;

};
}
14 changes: 14 additions & 0 deletions ydb/library/actors/core/cpu_manager.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "cpu_manager.h"
#include "executor_pool_jail.h"
#include "mon_stats.h"
#include "probes.h"

#include "executor_pool_basic.h"
Expand Down Expand Up @@ -172,4 +173,17 @@ namespace NActors {
}
}

void TCpuManager::GetExecutorPoolState(i16 poolId, TExecutorPoolState &state) const {
if (static_cast<ui32>(poolId) < ExecutorPoolCount) {
Executors[poolId]->GetExecutorPoolState(state);
}
}

void TCpuManager::GetExecutorPoolStates(std::vector<TExecutorPoolState> &states) const {
states.resize(ExecutorPoolCount);
for (i16 poolId = 0; poolId < static_cast<ui16>(ExecutorPoolCount); ++poolId) {
GetExecutorPoolState(poolId, states[poolId]);
}
}

}
3 changes: 3 additions & 0 deletions ydb/library/actors/core/cpu_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "harmonizer.h"
#include "executor_pool.h"
#include "executor_pool_shared.h"
#include "mon_stats.h"
#include <memory>

namespace NActors {
Expand Down Expand Up @@ -47,6 +48,8 @@ namespace NActors {
}

void GetPoolStats(ui32 poolId, TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy, TVector<TExecutorThreadStats>& sharedStatsCopy) const;
void GetExecutorPoolState(i16 poolId, TExecutorPoolState &state) const;
void GetExecutorPoolStates(std::vector<TExecutorPoolState> &states) const;

THarmonizerStats GetHarmonizerStats() const {
if (Harmonizer) {
Expand Down
5 changes: 5 additions & 0 deletions ydb/library/actors/core/executor_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ namespace NActors {
struct TMailboxHeader;
struct TWorkerContext;
struct TExecutorPoolStats;
struct TExecutorPoolState;
struct TExecutorThreadStats;
class TExecutorPoolJail;
class ISchedulerCookie;
Expand Down Expand Up @@ -108,6 +109,10 @@ namespace NActors {
Y_UNUSED(statsCopy);
}

virtual void GetExecutorPoolState(TExecutorPoolState &poolState) const {
Y_UNUSED(poolState);
}

virtual TString GetName() const {
return TString();
}
Expand Down
14 changes: 14 additions & 0 deletions ydb/library/actors/core/executor_pool_basic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "mailbox.h"
#include "thread_context.h"
#include <atomic>
#include <memory>
#include <ydb/library/actors/util/affinity.h>
#include <ydb/library/actors/util/datetime.h>

Expand Down Expand Up @@ -425,6 +426,19 @@ namespace NActors {
}
}

void TBasicExecutorPool::GetExecutorPoolState(TExecutorPoolState &poolState) const {
if (Harmonizer) {
TPoolHarmonizerStats stats = Harmonizer->GetPoolStats(PoolId);
poolState.UsedCpu = stats.AvgConsumedCpu;
poolState.PossibleMaxLimit = stats.PotentialMaxThreadCount;
} else {
poolState.PossibleMaxLimit = poolState.MaxLimit;
}
poolState.CurrentLimit = GetThreadCount();
poolState.MaxLimit = GetMaxThreadCount();
poolState.MinLimit = GetDefaultThreadCount();
}

void TBasicExecutorPool::Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) {
TAffinityGuard affinityGuard(Affinity());

Expand Down
1 change: 1 addition & 0 deletions ydb/library/actors/core/executor_pool_basic.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ namespace NActors {
void Shutdown() override;

void GetCurrentStats(TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const override;
void GetExecutorPoolState(TExecutorPoolState &poolState) const override;
TString GetName() const override {
return PoolName;
}
Expand Down
17 changes: 15 additions & 2 deletions ydb/library/actors/core/executor_pool_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@ namespace NActors {
, PoolName(poolName)
{}

TIOExecutorPool::TIOExecutorPool(const TIOExecutorPoolConfig& cfg)
TIOExecutorPool::TIOExecutorPool(const TIOExecutorPoolConfig& cfg, IHarmonizer *harmonizer)
: TIOExecutorPool(
cfg.PoolId,
cfg.Threads,
cfg.PoolName,
new TAffinity(cfg.Affinity)
)
{}
{
Harmonizer = harmonizer;
}

TIOExecutorPool::~TIOExecutorPool() {
Threads.Destroy();
Expand Down Expand Up @@ -148,6 +150,17 @@ namespace NActors {
}
}

void TIOExecutorPool::GetExecutorPoolState(TExecutorPoolState &poolState) const {
if (Harmonizer) {
TPoolHarmonizerStats stats = Harmonizer->GetPoolStats(PoolId);
poolState.UsedCpu = stats.AvgConsumedCpu;
}
poolState.CurrentLimit = PoolThreads;
poolState.MaxLimit = PoolThreads;
poolState.MinLimit = PoolThreads;
poolState.PossibleMaxLimit = PoolThreads;
}

TString TIOExecutorPool::GetName() const {
return PoolName;
}
Expand Down
5 changes: 4 additions & 1 deletion ydb/library/actors/core/executor_pool_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "actorsystem.h"
#include "executor_thread.h"
#include "executor_thread_ctx.h"
#include "harmonizer.h"
#include "scheduler_queue.h"
#include "executor_pool_base.h"
#include <ydb/library/actors/actor_type/indexes.h>
Expand All @@ -20,12 +21,13 @@ namespace NActors {

THolder<NSchedulerQueue::TQueueType> ScheduleQueue;
TTicketLock ScheduleLock;
IHarmonizer *Harmonizer = nullptr;

const TString PoolName;
const ui32 ActorSystemIndex = NActors::TActorTypeOperator::GetActorSystemIndex();
public:
TIOExecutorPool(ui32 poolId, ui32 threads, const TString& poolName = "", TAffinity* affinity = nullptr);
explicit TIOExecutorPool(const TIOExecutorPoolConfig& cfg);
explicit TIOExecutorPool(const TIOExecutorPoolConfig& cfg, IHarmonizer *harmonizer = nullptr);
~TIOExecutorPool();

ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) override;
Expand All @@ -42,6 +44,7 @@ namespace NActors {
void Shutdown() override;

void GetCurrentStats(TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const override;
void GetExecutorPoolState(TExecutorPoolState &poolState) const override;
TString GetName() const override;
};
}
Loading

0 comments on commit 5247385

Please sign in to comment.