Skip to content

Commit

Permalink
Merge 6b9fe41 into bd51e57
Browse files Browse the repository at this point in the history
  • Loading branch information
kruall authored Jul 9, 2024
2 parents bd51e57 + 6b9fe41 commit f3296f5
Show file tree
Hide file tree
Showing 18 changed files with 151 additions and 10 deletions.
4 changes: 2 additions & 2 deletions ydb/core/base/pool_stats_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ class TStatsCollectingActor : public NActors::TStatsCollectingActor {
void OnWakeup(const TActorContext &ctx) override {
MiniKQLPoolStats.Update();

TVector<std::tuple<TString, double, ui32>> pools;
TVector<std::tuple<TString, double, ui32, ui32>> pools;
for (const auto& pool : PoolCounters) {
pools.emplace_back(pool.Name, pool.Usage, pool.Threads);
pools.emplace_back(pool.Name, pool.Usage, pool.Threads, pool.LimitThreads);
}

ctx.Send(NNodeWhiteboard::MakeNodeWhiteboardServiceId(ctx.SelfID.NodeId()), new NNodeWhiteboard::TEvWhiteboard::TEvSystemStateUpdate(pools));
Expand Down
37 changes: 30 additions & 7 deletions ydb/core/mind/local.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ class TLocalNodeRegistrar : public TActorBootstrapped<TLocalNodeRegistrar> {
ui64 MemLimit = 0;
double NodeUsage = 0;

TInstant LastUpdate;
std::vector<TExecutorPoolState> PreviousStates;

bool SentDrainNode = false;
bool DrainResultReceived = false;
i32 PrevEstimate = 0;
Expand Down Expand Up @@ -278,15 +281,35 @@ class TLocalNodeRegistrar : public TActorBootstrapped<TLocalNodeRegistrar> {
auto& record = eventStatus->Record;
record.SetStartTime(StartTime.GetValue());
record.MutableResourceMaximum()->CopyFrom(ResourceLimit);
std::vector<NActors::TExecutorPoolState> poolStates;
ctx.ExecutorThread.ActorSystem->GetExecutorPoolStates(poolStates);

TDuration passedTime = ctx.Now() - LastUpdate;
LastUpdate = ctx.Now();

auto *actorSystemInfo = record.MutableActorSystemInfo();
double cores = 0;
for (ui8 poolId = 0; poolId < poolStates.size(); ++poolId) {
auto &poolState = poolStates[poolId];
if (poolId != AppData()->IOPoolId) {
cores += poolState.MinLimit;
}
auto *poolInfo = actorSystemInfo->AddPools();
double passedElapsedUs = poolState.ElapsedUs;
if (PreviousStates.size()) {
passedElapsedUs -= PreviousStates[poolId].ElapsedUs;
}
poolInfo->SetUsedCores(passedElapsedUs / passedTime.MicroSeconds());
poolInfo->SetCurrentLimit(poolState.CurrentLimit);
poolInfo->SetPossibleMaxLimit(poolState.PossibleMaxLimit);
}
actorSystemInfo->SetCores(cores);
if (!record.GetResourceMaximum().HasCPU()) {
TExecutorPoolStats poolStats;
TVector<TExecutorThreadStats> statsCopy;
TVector<TExecutorThreadStats> sharedStatsCopy;
ctx.ExecutorThread.ActorSystem->GetPoolStats(AppData()->UserPoolId, poolStats, statsCopy, sharedStatsCopy);
if (!statsCopy.empty()) {
record.MutableResourceMaximum()->SetCPU(poolStats.CurrentThreadCount * 1000000);
if (!poolStates.empty()) {
record.MutableResourceMaximum()->SetCPU(poolStates[AppData()->UserPoolId].PossibleMaxLimit * 1000000);
}
}
PreviousStates.swap(poolStates);
if (!record.GetResourceMaximum().HasMemory()) {
if (MemLimit != 0) {
record.MutableResourceMaximum()->SetMemory(MemLimit);
Expand Down Expand Up @@ -649,7 +672,7 @@ class TLocalNodeRegistrar : public TActorBootstrapped<TLocalNodeRegistrar> {
const NKikimrWhiteboard::TSystemStateInfo& info = record.GetSystemStateInfo(0);
if (static_cast<ui32>(info.PoolStatsSize()) > AppData()->UserPoolId) {
const auto& poolStats(info.GetPoolStats(AppData()->UserPoolId));
UserPoolUsage = poolStats.usage() * poolStats.threads() * 1000000; // uS
UserPoolUsage = poolStats.usage() * poolStats.limit() * 1000000; // uS
}

// Note: we use allocated memory because MemoryUsed(AnonRSS) has lag
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/node_whiteboard/node_whiteboard.h
Original file line number Diff line number Diff line change
Expand Up @@ -361,12 +361,13 @@ struct TEvWhiteboard{
}
}

TEvSystemStateUpdate(const TVector<std::tuple<TString, double, ui32>>& poolStats) {
TEvSystemStateUpdate(const TVector<std::tuple<TString, double, ui32, ui32>>& poolStats) {
for (const auto& row : poolStats) {
auto& pb = *Record.AddPoolStats();
pb.SetName(std::get<0>(row));
pb.SetUsage(std::get<1>(row));
pb.SetThreads(std::get<2>(row));
pb.SetLimit(std::get<3>(row));
}
}

Expand Down
11 changes: 11 additions & 0 deletions ydb/core/protos/local.proto
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ message TLocalConfig {
repeated NKikimrSchemeOp.TResourceProfile ResourceProfiles = 1;
}

message TActorSystemInfo {
message TPoolInfo {
optional double UsedCores = 1;
optional double CurrentLimit = 2;
optional double PossibleMaxLimit = 3;
}
repeated TPoolInfo Pools = 1;
optional double Cores = 2;
}

message TEvPing {
optional fixed64 HiveId = 1;
optional uint32 HiveGeneration = 2;
Expand All @@ -53,6 +63,7 @@ message TEvStatus {
optional uint64 AvailableWeight = 5;
optional NKikimrTabletBase.TMetrics ResourceMaximum = 8;
optional uint64 StartTime = 7;
optional TActorSystemInfo ActorSystemInfo = 9;
}

enum EBootMode {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/node_whiteboard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ message TSystemStateInfo {
optional string Name = 1;
optional double Usage = 2 [(InsignificantChangePercent) = 30];
optional uint32 Threads = 3;
optional uint32 Limit = 4;
}

message TEndpoint {
Expand Down
9 changes: 9 additions & 0 deletions ydb/library/actors/core/actorsystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -338,4 +338,13 @@ namespace NActors {
CpuManager->Cleanup();
Scheduler.Destroy();
}

void TActorSystem::GetExecutorPoolState(i16 poolId, TExecutorPoolState &state) const {
CpuManager->GetExecutorPoolState(poolId, state);
}

void TActorSystem::GetExecutorPoolStates(std::vector<TExecutorPoolState> &states) const {
CpuManager->GetExecutorPoolStates(states);
}

}
3 changes: 3 additions & 0 deletions ydb/library/actors/core/actorsystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -306,5 +306,8 @@ namespace NActors {
return CpuManager->GetBasicExecutorPools();
}

void GetExecutorPoolState(i16 poolId, TExecutorPoolState &state) const;
void GetExecutorPoolStates(std::vector<TExecutorPoolState> &states) const;

};
}
19 changes: 19 additions & 0 deletions ydb/library/actors/core/cpu_manager.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "cpu_manager.h"
#include "executor_pool_jail.h"
#include "mon_stats.h"
#include "probes.h"

#include "executor_pool_basic.h"
Expand Down Expand Up @@ -172,4 +173,22 @@ namespace NActors {
}
}

void TCpuManager::GetExecutorPoolState(i16 poolId, TExecutorPoolState &state) const {
if (static_cast<ui32>(poolId) < ExecutorPoolCount) {
Executors[poolId]->GetExecutorPoolState(state);
}
if (Shared) {
TExecutorPoolState sharedState;
Shared->GetExecutorPoolState(poolId, sharedState);
state.Aggregate(sharedState);
}
}

void TCpuManager::GetExecutorPoolStates(std::vector<TExecutorPoolState> &states) const {
states.resize(ExecutorPoolCount);
for (i16 poolId = 0; poolId < static_cast<ui16>(ExecutorPoolCount); ++poolId) {
GetExecutorPoolState(poolId, states[poolId]);
}
}

}
3 changes: 3 additions & 0 deletions ydb/library/actors/core/cpu_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "harmonizer.h"
#include "executor_pool.h"
#include "executor_pool_shared.h"
#include "mon_stats.h"
#include <memory>

namespace NActors {
Expand Down Expand Up @@ -47,6 +48,8 @@ namespace NActors {
}

void GetPoolStats(ui32 poolId, TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy, TVector<TExecutorThreadStats>& sharedStatsCopy) const;
void GetExecutorPoolState(i16 poolId, TExecutorPoolState &state) const;
void GetExecutorPoolStates(std::vector<TExecutorPoolState> &states) const;

THarmonizerStats GetHarmonizerStats() const {
if (Harmonizer) {
Expand Down
5 changes: 5 additions & 0 deletions ydb/library/actors/core/executor_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ namespace NActors {
struct TMailboxHeader;
struct TWorkerContext;
struct TExecutorPoolStats;
struct TExecutorPoolState;
struct TExecutorThreadStats;
class TExecutorPoolJail;
class ISchedulerCookie;
Expand Down Expand Up @@ -108,6 +109,10 @@ namespace NActors {
Y_UNUSED(statsCopy);
}

virtual void GetExecutorPoolState(TExecutorPoolState &poolState) const {
Y_UNUSED(poolState);
}

virtual TString GetName() const {
return TString();
}
Expand Down
20 changes: 20 additions & 0 deletions ydb/library/actors/core/executor_pool_basic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "mailbox.h"
#include "thread_context.h"
#include <atomic>
#include <memory>
#include <ydb/library/actors/util/affinity.h>
#include <ydb/library/actors/util/datetime.h>

Expand Down Expand Up @@ -425,6 +426,25 @@ namespace NActors {
}
}

void TBasicExecutorPool::GetExecutorPoolState(TExecutorPoolState &poolState) const {
ui64 ticks = 0;
for (i16 i = 0; i < PoolThreads; ++i) {
TExecutorThreadStats stats;
Threads[i].Thread->GetCurrentStatsForHarmonizer(stats);
ticks += stats.SafeElapsedTicks;
}
poolState.ElapsedUs = Ts2Us(ticks);
poolState.CurrentLimit = GetThreadCount();
poolState.MaxLimit = GetMaxThreadCount();
poolState.MinLimit = GetDefaultThreadCount();
if (Harmonizer) {
TPoolHarmonizerStats stats = Harmonizer->GetPoolStats(PoolId);
poolState.PossibleMaxLimit = stats.PotentialMaxThreadCount;
} else {
poolState.PossibleMaxLimit = poolState.MaxLimit;
}
}

void TBasicExecutorPool::Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) {
TAffinityGuard affinityGuard(Affinity());

Expand Down
1 change: 1 addition & 0 deletions ydb/library/actors/core/executor_pool_basic.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ namespace NActors {
void Shutdown() override;

void GetCurrentStats(TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const override;
void GetExecutorPoolState(TExecutorPoolState &poolState) const override;
TString GetName() const override {
return PoolName;
}
Expand Down
14 changes: 14 additions & 0 deletions ydb/library/actors/core/executor_pool_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,20 @@ namespace NActors {
}
}

void TIOExecutorPool::GetExecutorPoolState(TExecutorPoolState &poolState) const {
ui64 ticks = 0;
for (i16 i = 0; i < PoolThreads; ++i) {
TExecutorThreadStats stats;
Threads[i].Thread->GetCurrentStatsForHarmonizer(stats);
ticks += stats.SafeElapsedTicks;
}
poolState.ElapsedUs = Ts2Us(ticks);
poolState.CurrentLimit = PoolThreads;
poolState.MaxLimit = PoolThreads;
poolState.MinLimit = PoolThreads;
poolState.PossibleMaxLimit = PoolThreads;
}

TString TIOExecutorPool::GetName() const {
return PoolName;
}
Expand Down
1 change: 1 addition & 0 deletions ydb/library/actors/core/executor_pool_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ namespace NActors {
void Shutdown() override;

void GetCurrentStats(TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const override;
void GetExecutorPoolState(TExecutorPoolState &poolState) const override;
TString GetName() const override;
};
}
10 changes: 10 additions & 0 deletions ydb/library/actors/core/executor_pool_shared.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,16 @@ void TSharedExecutorPool::GetSharedStats(i16 poolId, std::vector<TExecutorThread
}
}

void TSharedExecutorPool::GetExecutorPoolState(i16 poolId, TExecutorPoolState &poolState) const {
ui64 ticks = 0;
for (i16 i = 0; i < SharedThreadCount; ++i) {
TExecutorThreadStats stats;
Threads[i].Thread->GetSharedStatsForHarmonizer(poolId, stats);
ticks += stats.SafeElapsedTicks;
}
poolState.ElapsedUs = Ts2Us(ticks);
}

void TSharedExecutorPool::GetSharedStatsForHarmonizer(i16 poolId, std::vector<TExecutorThreadStats>& statsCopy) {
statsCopy.resize(SharedThreadCount + 1);
for (i16 i = 0; i < SharedThreadCount; ++i) {
Expand Down
1 change: 1 addition & 0 deletions ydb/library/actors/core/executor_pool_shared.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ namespace NActors {
i16 GetSharedThreadCount() const;

TSharedPoolState GetState() const;
void GetExecutorPoolState(i16 pool, TExecutorPoolState &poolState) const;

private:
TSharedPoolState State;
Expand Down
16 changes: 16 additions & 0 deletions ydb/library/actors/core/mon_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,22 @@ namespace NActors {
ui64 Buckets[65];
};

struct TExecutorPoolState {
double ElapsedUs = 0;
double CurrentLimit = 0;
double PossibleMaxLimit = 0;
double MaxLimit = 0;
double MinLimit = 0;

void Aggregate(const TExecutorPoolState& other) {
ElapsedUs += other.ElapsedUs;
CurrentLimit += other.CurrentLimit;
PossibleMaxLimit += other.PossibleMaxLimit;
MaxLimit += other.MaxLimit;
MinLimit += other.MinLimit;
}
};

struct TExecutorPoolStats {
ui64 MaxUtilizationTime = 0;
ui64 IncreasingThreadsByNeedyState = 0;
Expand Down
3 changes: 3 additions & 0 deletions ydb/library/actors/helpers/pool_stats_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -214,13 +214,15 @@ class TStatsCollectingActor : public TActorBootstrapped<TStatsCollectingActor> {
THPTimer UsageTimer;
TString Name;
double Threads;
double LimitThreads;

void Init(NMonitoring::TDynamicCounters* group, const TString& poolName, ui32 threads) {
LastElapsedSeconds = 0;
Usage = 0;
UsageTimer.Reset();
Name = poolName;
Threads = threads;
LimitThreads = threads;

PoolGroup = group->GetSubgroup("execpool", poolName);

Expand Down Expand Up @@ -374,6 +376,7 @@ class TStatsCollectingActor : public TActorBootstrapped<TStatsCollectingActor> {
Y_UNUSED(stats);
#endif
Threads = poolStats.CurrentThreadCount;
LimitThreads = poolStats.PotentialMaxThreadCount;
}
};

Expand Down

0 comments on commit f3296f5

Please sign in to comment.