Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change TEvLocal actorsystem metrics #6355

Merged
merged 1 commit into from
Jul 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ydb/core/base/pool_stats_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ class TStatsCollectingActor : public NActors::TStatsCollectingActor {
void OnWakeup(const TActorContext &ctx) override {
MiniKQLPoolStats.Update();

TVector<std::tuple<TString, double, ui32>> pools;
TVector<std::tuple<TString, double, ui32, ui32>> pools;
for (const auto& pool : PoolCounters) {
pools.emplace_back(pool.Name, pool.Usage, pool.Threads);
pools.emplace_back(pool.Name, pool.Usage, pool.Threads, pool.LimitThreads);
}

ctx.Send(NNodeWhiteboard::MakeNodeWhiteboardServiceId(ctx.SelfID.NodeId()), new NNodeWhiteboard::TEvWhiteboard::TEvSystemStateUpdate(pools));
Expand Down
13 changes: 6 additions & 7 deletions ydb/core/mind/local.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,13 +278,12 @@ class TLocalNodeRegistrar : public TActorBootstrapped<TLocalNodeRegistrar> {
auto& record = eventStatus->Record;
record.SetStartTime(StartTime.GetValue());
record.MutableResourceMaximum()->CopyFrom(ResourceLimit);
NActors::TExecutorPoolState userPoolState;
ctx.ExecutorThread.ActorSystem->GetExecutorPoolState(AppData()->UserPoolId, userPoolState);

if (!record.GetResourceMaximum().HasCPU()) {
TExecutorPoolStats poolStats;
TVector<TExecutorThreadStats> statsCopy;
TVector<TExecutorThreadStats> sharedStatsCopy;
ctx.ExecutorThread.ActorSystem->GetPoolStats(AppData()->UserPoolId, poolStats, statsCopy, sharedStatsCopy);
if (!statsCopy.empty()) {
record.MutableResourceMaximum()->SetCPU(poolStats.CurrentThreadCount * 1000000);
if (userPoolState.PossibleMaxLimit) {
record.MutableResourceMaximum()->SetCPU(userPoolState.PossibleMaxLimit * 1000000);
}
}
if (!record.GetResourceMaximum().HasMemory()) {
Expand Down Expand Up @@ -649,7 +648,7 @@ class TLocalNodeRegistrar : public TActorBootstrapped<TLocalNodeRegistrar> {
const NKikimrWhiteboard::TSystemStateInfo& info = record.GetSystemStateInfo(0);
if (static_cast<ui32>(info.PoolStatsSize()) > AppData()->UserPoolId) {
const auto& poolStats(info.GetPoolStats(AppData()->UserPoolId));
UserPoolUsage = poolStats.usage() * poolStats.threads() * 1000000; // uS
UserPoolUsage = poolStats.usage() * poolStats.limit() * 1000000; // uS
}

// Note: we use allocated memory because MemoryUsed(AnonRSS) has lag
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/node_whiteboard/node_whiteboard.h
Original file line number Diff line number Diff line change
Expand Up @@ -361,12 +361,13 @@ struct TEvWhiteboard{
}
}

TEvSystemStateUpdate(const TVector<std::tuple<TString, double, ui32>>& poolStats) {
TEvSystemStateUpdate(const TVector<std::tuple<TString, double, ui32, ui32>>& poolStats) {
for (const auto& row : poolStats) {
auto& pb = *Record.AddPoolStats();
pb.SetName(std::get<0>(row));
pb.SetUsage(std::get<1>(row));
pb.SetThreads(std::get<2>(row));
pb.SetLimit(std::get<3>(row));
}
}

Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/node_whiteboard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ message TSystemStateInfo {
optional string Name = 1;
optional double Usage = 2 [(InsignificantChangePercent) = 30];
optional uint32 Threads = 3;
optional uint32 Limit = 4;
}

message TEndpoint {
Expand Down
9 changes: 9 additions & 0 deletions ydb/library/actors/core/actorsystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -338,4 +338,13 @@ namespace NActors {
CpuManager->Cleanup();
Scheduler.Destroy();
}

void TActorSystem::GetExecutorPoolState(i16 poolId, TExecutorPoolState &state) const {
CpuManager->GetExecutorPoolState(poolId, state);
}

void TActorSystem::GetExecutorPoolStates(std::vector<TExecutorPoolState> &states) const {
CpuManager->GetExecutorPoolStates(states);
}

}
3 changes: 3 additions & 0 deletions ydb/library/actors/core/actorsystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -306,5 +306,8 @@ namespace NActors {
return CpuManager->GetBasicExecutorPools();
}

void GetExecutorPoolState(i16 poolId, TExecutorPoolState &state) const;
void GetExecutorPoolStates(std::vector<TExecutorPoolState> &states) const;

};
}
14 changes: 14 additions & 0 deletions ydb/library/actors/core/cpu_manager.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "cpu_manager.h"
#include "executor_pool_jail.h"
#include "mon_stats.h"
#include "probes.h"

#include "executor_pool_basic.h"
Expand Down Expand Up @@ -172,4 +173,17 @@ namespace NActors {
}
}

void TCpuManager::GetExecutorPoolState(i16 poolId, TExecutorPoolState &state) const {
if (static_cast<ui32>(poolId) < ExecutorPoolCount) {
Executors[poolId]->GetExecutorPoolState(state);
}
}

void TCpuManager::GetExecutorPoolStates(std::vector<TExecutorPoolState> &states) const {
states.resize(ExecutorPoolCount);
for (i16 poolId = 0; poolId < static_cast<ui16>(ExecutorPoolCount); ++poolId) {
GetExecutorPoolState(poolId, states[poolId]);
}
}

}
3 changes: 3 additions & 0 deletions ydb/library/actors/core/cpu_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "harmonizer.h"
#include "executor_pool.h"
#include "executor_pool_shared.h"
#include "mon_stats.h"
#include <memory>

namespace NActors {
Expand Down Expand Up @@ -47,6 +48,8 @@ namespace NActors {
}

void GetPoolStats(ui32 poolId, TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy, TVector<TExecutorThreadStats>& sharedStatsCopy) const;
void GetExecutorPoolState(i16 poolId, TExecutorPoolState &state) const;
void GetExecutorPoolStates(std::vector<TExecutorPoolState> &states) const;

THarmonizerStats GetHarmonizerStats() const {
if (Harmonizer) {
Expand Down
5 changes: 5 additions & 0 deletions ydb/library/actors/core/executor_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ namespace NActors {
struct TMailboxHeader;
struct TWorkerContext;
struct TExecutorPoolStats;
struct TExecutorPoolState;
struct TExecutorThreadStats;
class TExecutorPoolJail;
class ISchedulerCookie;
Expand Down Expand Up @@ -108,6 +109,10 @@ namespace NActors {
Y_UNUSED(statsCopy);
}

virtual void GetExecutorPoolState(TExecutorPoolState &poolState) const {
Y_UNUSED(poolState);
}

virtual TString GetName() const {
return TString();
}
Expand Down
14 changes: 14 additions & 0 deletions ydb/library/actors/core/executor_pool_basic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "mailbox.h"
#include "thread_context.h"
#include <atomic>
#include <memory>
#include <ydb/library/actors/util/affinity.h>
#include <ydb/library/actors/util/datetime.h>

Expand Down Expand Up @@ -425,6 +426,19 @@ namespace NActors {
}
}

void TBasicExecutorPool::GetExecutorPoolState(TExecutorPoolState &poolState) const {
if (Harmonizer) {
TPoolHarmonizerStats stats = Harmonizer->GetPoolStats(PoolId);
poolState.UsedCpu = stats.AvgConsumedCpu;
poolState.PossibleMaxLimit = stats.PotentialMaxThreadCount;
} else {
poolState.PossibleMaxLimit = poolState.MaxLimit;
}
poolState.CurrentLimit = GetThreadCount();
poolState.MaxLimit = GetMaxThreadCount();
poolState.MinLimit = GetDefaultThreadCount();
}

void TBasicExecutorPool::Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) {
TAffinityGuard affinityGuard(Affinity());

Expand Down
1 change: 1 addition & 0 deletions ydb/library/actors/core/executor_pool_basic.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ namespace NActors {
void Shutdown() override;

void GetCurrentStats(TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const override;
void GetExecutorPoolState(TExecutorPoolState &poolState) const override;
TString GetName() const override {
return PoolName;
}
Expand Down
17 changes: 15 additions & 2 deletions ydb/library/actors/core/executor_pool_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@ namespace NActors {
, PoolName(poolName)
{}

TIOExecutorPool::TIOExecutorPool(const TIOExecutorPoolConfig& cfg)
TIOExecutorPool::TIOExecutorPool(const TIOExecutorPoolConfig& cfg, IHarmonizer *harmonizer)
: TIOExecutorPool(
cfg.PoolId,
cfg.Threads,
cfg.PoolName,
new TAffinity(cfg.Affinity)
)
{}
{
Harmonizer = harmonizer;
}

TIOExecutorPool::~TIOExecutorPool() {
Threads.Destroy();
Expand Down Expand Up @@ -148,6 +150,17 @@ namespace NActors {
}
}

void TIOExecutorPool::GetExecutorPoolState(TExecutorPoolState &poolState) const {
if (Harmonizer) {
TPoolHarmonizerStats stats = Harmonizer->GetPoolStats(PoolId);
poolState.UsedCpu = stats.AvgConsumedCpu;
}
poolState.CurrentLimit = PoolThreads;
poolState.MaxLimit = PoolThreads;
poolState.MinLimit = PoolThreads;
poolState.PossibleMaxLimit = PoolThreads;
}

TString TIOExecutorPool::GetName() const {
return PoolName;
}
Expand Down
5 changes: 4 additions & 1 deletion ydb/library/actors/core/executor_pool_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "actorsystem.h"
#include "executor_thread.h"
#include "executor_thread_ctx.h"
#include "harmonizer.h"
#include "scheduler_queue.h"
#include "executor_pool_base.h"
#include <ydb/library/actors/actor_type/indexes.h>
Expand All @@ -20,12 +21,13 @@ namespace NActors {

THolder<NSchedulerQueue::TQueueType> ScheduleQueue;
TTicketLock ScheduleLock;
IHarmonizer *Harmonizer = nullptr;

const TString PoolName;
const ui32 ActorSystemIndex = NActors::TActorTypeOperator::GetActorSystemIndex();
public:
TIOExecutorPool(ui32 poolId, ui32 threads, const TString& poolName = "", TAffinity* affinity = nullptr);
explicit TIOExecutorPool(const TIOExecutorPoolConfig& cfg);
explicit TIOExecutorPool(const TIOExecutorPoolConfig& cfg, IHarmonizer *harmonizer = nullptr);
~TIOExecutorPool();

ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) override;
Expand All @@ -42,6 +44,7 @@ namespace NActors {
void Shutdown() override;

void GetCurrentStats(TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const override;
void GetExecutorPoolState(TExecutorPoolState &poolState) const override;
TString GetName() const override;
};
}
Loading
Loading