Skip to content

Commit

Permalink
refactor thread ctx
Browse files Browse the repository at this point in the history
  • Loading branch information
Aleksandr Kriukov committed Jan 9, 2024
1 parent eee3fb5 commit 3147be2
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 94 deletions.
52 changes: 14 additions & 38 deletions ydb/library/actors/core/executor_pool_basic.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "executor_pool_basic.h"
#include "executor_pool_basic_feature_flags.h"
#include "actor.h"
#include "executor_thread_ctx.h"
#include "probes.h"
#include "mailbox.h"
#include <ydb/library/actors/util/affinity.h>
Expand Down Expand Up @@ -167,7 +168,7 @@ namespace NActors {
}

if (workerId >= 0) {
Threads[workerId].ExchangeState(TExecutorThreadCtx::WS_NONE);
Threads[workerId].ExchangeState(EThreadState::None);
}

TAtomic x = AtomicGet(Semaphore);
Expand All @@ -191,7 +192,7 @@ namespace NActors {
} else {
if (const ui32 activation = Activations.Pop(++revolvingCounter)) {
if (workerId >= 0) {
Threads[workerId].ExchangeState(TExecutorThreadCtx::WS_RUNNING);
Threads[workerId].ExchangeState(EThreadState::Work);
}
AtomicDecrement(Semaphore);
TlsThreadContext->Timers.HPNow = GetCycleCountFast();
Expand Down Expand Up @@ -244,18 +245,18 @@ namespace NActors {
inline void TBasicExecutorPool::WakeUpLoop(i16 currentThreadCount) {
for (i16 i = 0;;) {
TExecutorThreadCtx& threadCtx = Threads[i];
TExecutorThreadCtx::TWaitState state = threadCtx.GetState();
switch (state.Flag) {
case TExecutorThreadCtx::WS_NONE:
case TExecutorThreadCtx::WS_RUNNING:
EThreadState state = threadCtx.GetState<EThreadState>();
switch (state) {
case EThreadState::None:
case EThreadState::Work:
if (++i >= MaxThreadCount - SharedExecutorsCount) {
i = 0;
}
break;
case TExecutorThreadCtx::WS_ACTIVE:
case TExecutorThreadCtx::WS_BLOCKED:
if (threadCtx.ReplaceState(state, TExecutorThreadCtx::WS_NONE)) {
if (state.Flag == TExecutorThreadCtx::WS_BLOCKED) {
case EThreadState::Spin:
case EThreadState::Sleep:
if (threadCtx.ReplaceState<EThreadState>(state, EThreadState::None)) {
if (state == EThreadState::Sleep) {
ui64 beforeUnpark = GetCycleCountFast();
threadCtx.StartWakingTs = beforeUnpark;
if (TlsThreadContext && TlsThreadContext->WaitingStats) {
Expand Down Expand Up @@ -601,38 +602,13 @@ namespace NActors {
}

bool TExecutorThreadCtx::Wait(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag) {
TWaitState state = ExchangeState(WS_ACTIVE);
Y_ABORT_UNLESS(state.Flag == WS_NONE, "WaitingFlag# %d", int(state.Flag));
if (OwnerExecutorPool) {
// if (!OwnerExecutorPool->SetSleepOwnSharedThread()) {
// return false;
// }
// if (TBasicExecutorPool *pool = OtherExecutorPool; pool) {
// if (!pool->SetSleepBorrowedSharedThread()) {
// return false;
// }
//}
}
EThreadState state = ExchangeState<EThreadState>(EThreadState::Spin);
Y_ABORT_UNLESS(state == EThreadState::None, "WaitingFlag# %d", int(state));
if (spinThresholdCycles > 0) {
// spin configured period
Spin(spinThresholdCycles, stopFlag);
// then - sleep
state = GetState();
if (state.Flag == WS_ACTIVE) {
if (ReplaceState(state, WS_BLOCKED)) {
if (Sleep(stopFlag)) { // interrupted
return true;
}
} else {
NextPool = state.NextPool;
}
}
} else {
Block(stopFlag);
}

Y_DEBUG_ABORT_UNLESS(stopFlag->load() || GetState().Flag == WS_NONE);
return false;
return Sleep(stopFlag);
}

}
2 changes: 1 addition & 1 deletion ydb/library/actors/core/executor_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ namespace NActors {
std::vector<TExecutorPoolBaseMailboxed*> pools;
do {
if (NeedToReloadPools.load() == EState::NeedToReloadPools) {
otherPool = dynamic_cast<TExecutorPoolBaseMailboxed*>(ThreadCtx->OtherExecutorPool.load());
// otherPool = dynamic_cast<TExecutorPoolBaseMailboxed*>(ThreadCtx->OtherExecutorPool.load());
NeedToReloadPools = EState::Running;
}
bool wasWorking = true;
Expand Down
152 changes: 97 additions & 55 deletions ydb/library/actors/core/executor_thread_ctx.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,66 +11,43 @@ namespace NActors {
class TExecutorThread;
class TBasicExecutorPool;

struct TExecutorThreadCtx {
enum EWaitState : ui64 {
WS_NONE,
WS_ACTIVE,
WS_BLOCKED,
WS_RUNNING
};

struct TWaitState {
EWaitState Flag = WS_NONE;
ui32 NextPool = Max<ui32>();

TWaitState() = default;

explicit TWaitState(ui64 state)
: Flag(static_cast<EWaitState>(state & 0x7))
, NextPool(state >> 3)
{}

explicit TWaitState(EWaitState flag, ui32 nextPool = Max<ui32>())
: Flag(flag)
, NextPool(nextPool)
{}

explicit operator ui64() {
return Flag | ui64(NextPool << 3);
}
};
enum class EThreadState : ui64 {
None,
Spin,
Sleep,
Work
};

struct TGenericExecutorThreadCtx {
TAutoPtr<TExecutorThread> Thread;
TThreadParkPad WaitingPad;

private:
std::atomic<ui64> WaitingFlag = WS_NONE;
std::atomic<ui64> WaitingFlag = static_cast<ui64>(EThreadState::None);

public:
TBasicExecutorPool *OwnerExecutorPool = nullptr;
std::atomic<TBasicExecutorPool*> OtherExecutorPool = nullptr;
ui64 StartWakingTs = 0;
ui32 NextPool = 0;
bool IsShared;

// different threads must spin/block on different cache-lines.
// we add some padding bytes to enforce this rule;

template <typename TWaitState>
TWaitState GetState() {
return TWaitState(WaitingFlag.load());
}

TWaitState ExchangeState(EWaitState flag, ui32 nextPool = Max<ui32>()) {
return TWaitState(WaitingFlag.exchange(static_cast<ui64>(TWaitState(flag, nextPool))));
template <typename TWaitState>
TWaitState ExchangeState(TWaitState state) {
return TWaitState(WaitingFlag.exchange(static_cast<ui64>(state)));
}

bool ReplaceState(TWaitState &expected, EWaitState flag, ui32 nextPool = Max<ui32>()) {
template <typename TWaitState>
bool ReplaceState(TWaitState &expected, TWaitState state) {
ui64 expectedInt = static_cast<ui64>(expected);
bool result = WaitingFlag.compare_exchange_strong(expectedInt, static_cast<ui64>(TWaitState(flag, nextPool)));
bool result = WaitingFlag.compare_exchange_strong(expectedInt, static_cast<ui64>(state));
expected = TWaitState(expectedInt);
return result;
}

protected:
template <typename TDerived, typename TWaitState>
void Spin(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag) {
ui64 start = GetCycleCountFast();
bool doSpin = true;
Expand All @@ -81,11 +58,11 @@ namespace NActors {
break;
}
for (ui32 i = 0; i < 12; ++i) {
TWaitState state = GetState();
if (state.Flag == WS_ACTIVE) {
TWaitState state = GetState<TWaitState>();
if (static_cast<EThreadState>(state) == EThreadState::Spin) {
SpinLockPause();
} else {
NextPool = state.NextPool;
static_cast<TDerived*>(this)->AfterWakeUp(state);
doSpin = false;
break;
}
Expand All @@ -100,36 +77,101 @@ namespace NActors {
}
}

template <typename TDerived, typename TWaitState>
bool Sleep(std::atomic<bool> *stopFlag) {
Y_DEBUG_ABORT_UNLESS(TlsThreadContext);

TWaitState state;
TWaitState state = TWaitState{EThreadState::Spin};
if (!ReplaceState<TWaitState>(state, TWaitState{EThreadState::Sleep})) {
static_cast<TDerived*>(this)->AfterWakeUp(state);
return false;
}

do {
TlsThreadContext->Timers.HPNow = GetCycleCountFast();
TlsThreadContext->Timers.Elapsed += TlsThreadContext->Timers.HPNow - TlsThreadContext->Timers.HPStart;
if (WaitingPad.Park()) // interrupted
return true;
TlsThreadContext->Timers.HPStart = GetCycleCountFast();
TlsThreadContext->Timers.Parked += TlsThreadContext->Timers.HPStart - TlsThreadContext->Timers.HPNow;
state = GetState();
} while (state.Flag == WS_BLOCKED && !stopFlag->load(std::memory_order_relaxed));
NextPool = state.NextPool;
state = GetState<TWaitState>();
} while (static_cast<EThreadState>(state) == EThreadState::Sleep && !stopFlag->load(std::memory_order_relaxed));

static_cast<TDerived*>(this)->AfterWakeUp(state);
return false;
}
};

struct TExecutorThreadCtx : public TGenericExecutorThreadCtx {
using TBase = TGenericExecutorThreadCtx;

TBasicExecutorPool *OwnerExecutorPool = nullptr;

void Spin(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag) {
this->TBase::Spin<TExecutorThreadCtx, EThreadState>(spinThresholdCycles, stopFlag);
}

bool Sleep(std::atomic<bool> *stopFlag) {
return this->TBase::Sleep<TExecutorThreadCtx, EThreadState>(stopFlag);
}

bool Wait(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag); // in executor_pool_basic.cpp

bool Block(std::atomic<bool> *stopFlag) {
TWaitState state{WS_ACTIVE};
if (ReplaceState(state, WS_BLOCKED)) {
Y_ABORT_UNLESS(state.Flag == WS_ACTIVE, "WaitingFlag# %d", int(state.Flag));
return Sleep(stopFlag);
} else {
return false;
}
void AfterWakeUp(EThreadState /*state*/) {
}

TExecutorThreadCtx() = default;
};


constexpr ui32 MaxPoolsForSharedThreads = 4;

struct TSharedExecutorThreadCtx : public TGenericExecutorThreadCtx {
using TBase = TGenericExecutorThreadCtx;

struct TWaitState {
EThreadState Flag = EThreadState::None;
ui32 NextPool = Max<ui32>();

TWaitState() = default;

TWaitState(ui64 state)
: Flag(static_cast<EThreadState>(state & 0x7))
, NextPool(state >> 3)
{}

TWaitState(EThreadState flag, ui32 nextPool = Max<ui32>())
: Flag(flag)
, NextPool(nextPool)
{}

explicit operator ui64() {
return static_cast<ui64>(Flag) | ui64(NextPool << 3);
}

explicit operator EThreadState() {
return Flag;
}
};

std::atomic<TBasicExecutorPool*> ExecutorPools[MaxPoolsForSharedThreads];
ui32 NextPool = 0;

void AfterWakeUp(TWaitState state) {
NextPool = state.NextPool;
}

void Spin(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag) {
this->TBase::Spin<TSharedExecutorThreadCtx, TWaitState>(spinThresholdCycles, stopFlag);
}

bool Sleep(std::atomic<bool> *stopFlag) {
return this->TBase::Sleep<TSharedExecutorThreadCtx, TWaitState>(stopFlag);
}

bool Wait(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag); // in executor_pool_basic.cpp

TSharedExecutorThreadCtx() = default;
};

}

0 comments on commit 3147be2

Please sign in to comment.