Skip to content

Commit

Permalink
Add local queue configs (ydb-platform#14802)
Browse files Browse the repository at this point in the history
  • Loading branch information
kruall authored Feb 20, 2025
1 parent 4a69e43 commit 2992756
Show file tree
Hide file tree
Showing 10 changed files with 57 additions and 26 deletions.
7 changes: 7 additions & 0 deletions ydb/core/driver_lib/run/auto_config_initializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,13 @@ namespace NKikimr::NAutoConfigInitializer {
} else {
executor->SetSpinThreshold(1);
}

if (config->HasMinLocalQueueSize()) {
executor->SetMinLocalQueueSize(config->GetMinLocalQueueSize());
}
if (config->HasMaxLocalQueueSize()) {
executor->SetMaxLocalQueueSize(config->GetMaxLocalQueueSize());
}
}
}

Expand Down
7 changes: 7 additions & 0 deletions ydb/core/driver_lib/run/config_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,14 @@ void AddExecutorPool(NActors::TCpuManagerConfig& cpuManager, const NKikimrConfig
basic.MaxThreadCount = poolConfig.GetMaxThreads();
basic.DefaultThreadCount = poolConfig.GetThreads();
basic.Priority = poolConfig.GetPriority();
if (poolConfig.HasMinLocalQueueSize()) {
basic.MinLocalQueueSize = poolConfig.GetMinLocalQueueSize();
}
if (poolConfig.HasMaxLocalQueueSize()) {
basic.MaxLocalQueueSize = poolConfig.GetMaxLocalQueueSize();
}
cpuManager.Basic.emplace_back(std::move(basic));

break;
}

Expand Down
5 changes: 5 additions & 0 deletions ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ message TActorSystemConfig {
optional int32 MaxAvgPingDeviation = 17;

optional bool HasSharedThread = 18;
optional uint32 MaxLocalQueueSize = 20;
optional uint32 MinLocalQueueSize = 21;
}

message TScheduler {
Expand Down Expand Up @@ -125,6 +127,9 @@ message TActorSystemConfig {

optional bool MonitorStuckActors = 15;
optional EActorSystemProfile ActorSystemProfile = 16;

optional uint32 MinLocalQueueSize = 20;
optional uint32 MaxLocalQueueSize = 21;
}

message TStaticNameserviceConfig {
Expand Down
2 changes: 2 additions & 0 deletions ydb/library/actors/core/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ namespace NActors {
EASProfile ActorSystemProfile = EASProfile::Default;
bool HasSharedThread = false;
bool UseRingQueue = false;
ui16 MinLocalQueueSize = 0;
ui16 MaxLocalQueueSize = 0;
};

struct TSharedExecutorPoolConfig {
Expand Down
31 changes: 20 additions & 11 deletions ydb/library/actors/core/executor_pool_basic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ namespace NActors {
, Harmonizer(harmonizer)
, SoftProcessingDurationTs(cfg.SoftProcessingDurationTs)
, HasOwnSharedThread(cfg.HasSharedThread)
, MaxLocalQueueSize(cfg.MaxLocalQueueSize)
, MinLocalQueueSize(cfg.MinLocalQueueSize)
, Priority(cfg.Priority)
, Jail(jail)
, ActorSystemProfile(cfg.ActorSystemProfile)
Expand All @@ -124,13 +126,9 @@ namespace NActors {
threads = threads - 1;
}

if constexpr (NFeatures::IsLocalQueues()) {
if (MaxLocalQueueSize) {
LocalQueues.Reset(new NThreading::TPadded<std::queue<ui32>>[threads]);
if constexpr (NFeatures::TLocalQueuesFeatureFlags::FIXED_LOCAL_QUEUE_SIZE) {
LocalQueueSize = *NFeatures::TLocalQueuesFeatureFlags::FIXED_LOCAL_QUEUE_SIZE;
} else {
LocalQueueSize = NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE;
}
LocalQueueSize = MinLocalQueueSize;
}
if constexpr (NFeatures::TSpinFeatureFlags::CalcPerThread) {
for (ui32 idx = 0; idx < threads; ++idx) {
Expand Down Expand Up @@ -284,7 +282,7 @@ namespace NActors {
}

TMailbox* TBasicExecutorPool::GetReadyActivation(ui64 revolvingCounter) {
if constexpr (NFeatures::IsLocalQueues()) {
if (MaxLocalQueueSize) {
EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "local queue");
return GetReadyActivationLocalQueue(revolvingCounter);
} else {
Expand Down Expand Up @@ -389,7 +387,7 @@ namespace NActors {
}

void TBasicExecutorPool::ScheduleActivationEx(TMailbox* mailbox, ui64 revolvingCounter) {
if constexpr (NFeatures::IsLocalQueues()) {
if (MaxLocalQueueSize) {
ScheduleActivationExLocalQueue(mailbox, revolvingCounter);
} else {
ScheduleActivationExCommon(mailbox, revolvingCounter, AtomicGet(Semaphore));
Expand Down Expand Up @@ -629,9 +627,8 @@ namespace NActors {
}

// Sets the pool's current local queue size limit.
// The requested size is capped at MaxLocalQueueSize; only the capped value is
// ever published, so readers never observe a value above the configured maximum.
void TBasicExecutorPool::SetLocalQueueSize(ui16 size) {
    const ui16 capped = std::min(size, MaxLocalQueueSize);
    // Relaxed ordering: the value is a standalone tuning knob and is read back
    // with a relaxed load as well.
    LocalQueueSize.store(capped, std::memory_order_relaxed);
}

void TBasicExecutorPool::Initialize() {
Expand Down Expand Up @@ -753,4 +750,16 @@ namespace NActors {
return EventsPerMailboxValue;
}

// Returns the pool's current local queue size limit (relaxed atomic read).
ui16 TBasicExecutorPool::GetLocalQueueSize() const {
    const ui16 current = LocalQueueSize.load(std::memory_order_relaxed);
    return current;
}

// Returns the upper bound for the local queue size stored in this pool.
ui16 TBasicExecutorPool::GetMaxLocalQueueSize() const {
    const ui16 upperBound = MaxLocalQueueSize;
    return upperBound;
}

// Returns the lower bound for the local queue size stored in this pool.
ui16 TBasicExecutorPool::GetMinLocalQueueSize() const {
    const ui16 lowerBound = MinLocalQueueSize;
    return lowerBound;
}

}
6 changes: 5 additions & 1 deletion ydb/library/actors/core/executor_pool_basic.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ namespace NActors {
IHarmonizer *Harmonizer;
ui64 SoftProcessingDurationTs = 0;
bool HasOwnSharedThread = false;
ui16 MaxLocalQueueSize = 0;
ui16 MinLocalQueueSize = 0;

const i16 Priority = 0;
const ui32 ActorSystemIndex = NActors::TActorTypeOperator::GetActorSystemIndex();
Expand Down Expand Up @@ -243,7 +245,9 @@ namespace NActors {
void ScheduleActivationExLocalQueue(TMailbox* mailbox, ui64 revolvingWriteCounter);

void SetLocalQueueSize(ui16 size);

ui16 GetLocalQueueSize() const;
ui16 GetMaxLocalQueueSize() const;
ui16 GetMinLocalQueueSize() const;
void Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) override;
void Start() override;
void PrepareStop() override;
Expand Down
8 changes: 0 additions & 8 deletions ydb/library/actors/core/executor_pool_basic_feature_flags.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@ namespace NActors::NFeatures {
// Compile-time settings selected when the actor system is built with the
// LocalQueues optimization type.
struct TLocalQueuesFeatureFlags {
static constexpr EActorSystemOptimizationType OptimizationType = EActorSystemOptimizationType::LocalQueues;

// Bounds for the per-pool local queue size. The basic executor pool starts at
// MIN_LOCAL_QUEUE_SIZE when no fixed size is set.
static constexpr ui16 MIN_LOCAL_QUEUE_SIZE = 0;
static constexpr ui16 MAX_LOCAL_QUEUE_SIZE = 16;
// When it holds a value, the basic executor pool uses it as the local queue
// size instead of MIN_LOCAL_QUEUE_SIZE.
static constexpr std::optional<ui16> FIXED_LOCAL_QUEUE_SIZE = std::nullopt;

// NOTE(review): the two flags below are not referenced in the visible code;
// their meaning is presumed from the names only — confirm against users.
static constexpr bool UseIfAllOtherThreadsAreSleeping = false;
static constexpr bool UseOnMicroburst = false;
};
Expand All @@ -43,8 +39,4 @@ namespace NActors::NFeatures {
return TFeatureFlags::OptimizationType == EActorSystemOptimizationType::Common;
}

// Compile-time predicate: true when the active feature-flag set selects the
// LocalQueues optimization type.
consteval bool IsLocalQueues() {
    constexpr bool localQueuesSelected =
        (EActorSystemOptimizationType::LocalQueues == TFeatureFlags::OptimizationType);
    return localQueuesSelected;
}

}
11 changes: 7 additions & 4 deletions ydb/library/actors/core/harmonizer/harmonizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,11 @@ void THarmonizer::ProcessNeedyState() {
ProcessingBudget -= 1.0;
LWPROBE_WITH_DEBUG(HarmonizeOperation, needyPoolIdx, pool.Pool->GetName(), "increase by needs", threadCount + 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount);
}
if constexpr (NFeatures::IsLocalQueues()) {
if (pool.MaxLocalQueueSize) {
bool needToExpandLocalQueue = ProcessingBudget < 1.0 || threadCount >= pool.MaxFullThreadCount;
needToExpandLocalQueue &= (bool)pool.BasicPool;
needToExpandLocalQueue &= (pool.MaxFullThreadCount > 1);
needToExpandLocalQueue &= (pool.LocalQueueSize < NFeatures::TLocalQueuesFeatureFlags::MAX_LOCAL_QUEUE_SIZE);
needToExpandLocalQueue &= (pool.LocalQueueSize < pool.MaxLocalQueueSize);
if (needToExpandLocalQueue) {
pool.BasicPool->SetLocalQueueSize(++pool.LocalQueueSize);
}
Expand Down Expand Up @@ -258,8 +258,8 @@ void THarmonizer::ProcessHoggishState() {
if (threadCount == pool.MinFullThreadCount && Shared && SharedInfo.ForeignThreadsAllowed[hoggishPoolIdx] != 0) {
Shared->SetForeignThreadSlots(hoggishPoolIdx, 0);
}
if (pool.BasicPool && pool.LocalQueueSize > pool.MinLocalQueueSize) {
    // Shrink the local queue by half, but never below the pool's configured
    // minimum. std::max is required: std::min would always produce a value
    // <= MinLocalQueueSize (e.g. min = 4, size = 6 -> 3) and push the queue
    // under its lower bound.
    pool.LocalQueueSize = std::max<ui16>(pool.MinLocalQueueSize, pool.LocalQueueSize / 2);
    pool.BasicPool->SetLocalQueueSize(pool.LocalQueueSize);
}
HARMONIZER_DEBUG_PRINT("poolIdx", hoggishPoolIdx, "threadCount", threadCount, "pool.MinFullThreadCount", pool.MinFullThreadCount, "freeCpu", freeCpu);
Expand Down Expand Up @@ -389,6 +389,9 @@ void THarmonizer::AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) {
if (poolInfo.BasicPool) {
poolInfo.WaitingStats.reset(new TWaitingStats<ui64>());
poolInfo.MovingWaitingStats.reset(new TWaitingStats<double>());
poolInfo.MinLocalQueueSize = poolInfo.BasicPool->GetMinLocalQueueSize();
poolInfo.MaxLocalQueueSize = poolInfo.BasicPool->GetMaxLocalQueueSize();
poolInfo.LocalQueueSize = poolInfo.MinLocalQueueSize;
}
PriorityOrder.clear();
}
Expand Down
1 change: 0 additions & 1 deletion ydb/library/actors/core/harmonizer/pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ namespace NActors {
LWTRACE_USING(ACTORLIB_PROVIDER);

// Default-constructs pool info with a zero local queue size.
// The literal 0 keeps LocalQueueSize deterministically initialized for every
// pool without depending on feature-flag constants; basic pools have the
// value reassigned from their configured minimum in THarmonizer::AddPool.
TPoolInfo::TPoolInfo()
    : LocalQueueSize(0)
{}

double TPoolInfo::GetCpu(i16 threadIdx) const {
Expand Down
5 changes: 4 additions & 1 deletion ydb/library/actors/core/harmonizer/pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,17 @@ struct TPoolInfo {
float MinThreadCount = 0;
float MaxThreadCount = 0;

ui16 LocalQueueSize;
ui16 MaxLocalQueueSize = 0;
ui16 MinLocalQueueSize = 0;

i16 Priority = 0;
NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounter;
NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounterWithSmallWindow;
ui32 MaxAvgPingUs = 0;
ui64 LastUpdateTs = 0;
ui64 NotEnoughCpuExecutions = 0;
ui64 NewNotEnoughCpuExecutions = 0;
ui16 LocalQueueSize;

std::atomic<float> SharedCpuQuota = 0;
std::atomic<i64> LastFlags = 0; // 0 - isNeedy; 1 - isStarved; 2 - isHoggish
Expand Down

0 comments on commit 2992756

Please sign in to comment.