Skip to content

Commit

Permalink
Add mailbox stats
Browse files Browse the repository at this point in the history
  • Loading branch information
kruall committed Jul 25, 2024
1 parent a303aa8 commit 47c448c
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 23 deletions.
4 changes: 4 additions & 0 deletions ydb/library/actors/core/actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ namespace NActors {
return NHPTimer::GetSeconds(GetCurrentEventTicks());
}

void TActivationContext::EnableMailboxStats() {
TlsActivationContext->Mailbox.EnableStats();
}

TActorId IActor::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId) const noexcept {
return TlsActivationContext->ExecutorThread.RegisterActor(actor, mailboxType, poolId, SelfActorId);
}
Expand Down
3 changes: 3 additions & 0 deletions ydb/library/actors/core/actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "actorsystem.h"
#include "event.h"
#include "executor_thread.h"
#include "mailbox.h"
#include "monotonic.h"
#include "thread_context.h"

Expand Down Expand Up @@ -130,6 +131,8 @@ namespace NActors {

static i64 GetCurrentEventTicks();
static double GetCurrentEventTicksAsSeconds();

static void EnableMailboxStats();
};

struct TActorContext: public TActivationContext {
Expand Down
3 changes: 2 additions & 1 deletion ydb/library/actors/core/executor_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ namespace NActors {

Ctx.AddElapsedCycles(activityType, hpnow - hpprev);
NHPTimer::STime elapsed = Ctx.AddEventProcessingStats(eventStart, hpnow, activityType, CurrentActorScheduledEventsCounter);
mailbox->AddElapsedCycles(elapsed);
if (elapsed > 1000000) {
LwTraceSlowEvent(ev.Get(), evTypeForTracing, actorType, Ctx.PoolId, CurrentRecipient, NHPTimer::GetSeconds(elapsed) * 1000.0);
}
Expand Down Expand Up @@ -372,7 +373,7 @@ namespace NActors {
break; // empty queue, leave
}
}
TlsThreadContext->ActivationStartTS.store(GetCycleCountFast(), std::memory_order_release);
TlsThreadContext->ActivationStartTS.store(hpnow, std::memory_order_release);
TlsThreadContext->ElapsingActorActivity.store(ActorSystemIndex, std::memory_order_release);

NProfiling::TMemoryTagScope::Reset(0);
Expand Down
24 changes: 21 additions & 3 deletions ydb/library/actors/core/mailbox.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -373,9 +373,9 @@ namespace NActors {
CleanupActors();
}

bool TMailboxHeader::CleanupActors() {
bool TMailboxHeader::CleanupActors(TMailboxActorPack::EType &actorPack, TActorsInfo &ActorsInfo) {
bool done = true;
switch (ActorPack) {
switch (actorPack) {
case TMailboxActorPack::Simple: {
if (ActorsInfo.Simple.ActorId != 0) {
delete ActorsInfo.Simple.Actor;
Expand All @@ -399,13 +399,31 @@ namespace NActors {
done = false;
break;
}
case TMailboxActorPack::Complex:
Y_ABORT("Unexpected ActorPack type");
}
ActorPack = TMailboxActorPack::Simple;
actorPack = TMailboxActorPack::Simple;
ActorsInfo.Simple.ActorId = 0;
ActorsInfo.Simple.Actor = nullptr;
return done;
}

bool TMailboxHeader::CleanupActors() {
if (ActorPack != TMailboxActorPack::Complex) {
TMailboxActorPack::EType pack = ActorPack;
bool done = CleanupActors(pack, ActorsInfo);
ActorPack = pack;
return done;
} else {
bool done = CleanupActors(ActorsInfo.Complex->ActorPack, ActorsInfo.Complex->ActorsInfo);
delete ActorsInfo.Complex;
ActorPack = TMailboxActorPack::Simple;
ActorsInfo.Simple.ActorId = 0;
ActorsInfo.Simple.Actor = nullptr;
return done;
}
}

std::pair<ui32, ui32> TMailboxHeader::CountMailboxEvents(ui64 localActorId, ui32 maxTraverse) {
switch (Type) {
case TMailboxType::Simple:
Expand Down
119 changes: 100 additions & 19 deletions ydb/library/actors/core/mailbox.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "executor_pool.h"
#include "mailbox_queue_simple.h"
#include "mailbox_queue_revolving.h"
#include <functional>
#include <ydb/library/actors/util/unordered_cache.h>
#include <library/cpp/threading/queue/mpsc_htswap.h>
#include <library/cpp/threading/queue/mpsc_read_as_filled.h>
Expand All @@ -27,6 +28,10 @@ namespace NActors {

struct TMailboxHeader;

struct TMailboxStats {
ui64 ElapsedCycles = 0;
};

template<bool>
struct TMailboxUsageImpl {
void Push(ui64 /*localId*/) {}
Expand All @@ -53,7 +58,8 @@ namespace NActors {
enum EType {
Simple = 0,
Array = 1,
Map = 2
Map = 2,
Complex = 3,
};
};

Expand All @@ -79,7 +85,7 @@ namespace NActors {
volatile ui32 ExecutionState;
ui32 Reserved : 4; // never changes, always zero
ui32 Type : 4; // never changes
ui32 ActorPack : 2;
TMailboxActorPack::EType ActorPack : 2;
ui32 Knobs : 22;

struct TActorPair {
Expand All @@ -91,6 +97,8 @@ namespace NActors {
TActorPair Actors[ARRAY_CAPACITY];
};

struct alignas(64) TComplexActorInfo;

union TActorsInfo {
TActorPair Simple;
struct {
Expand All @@ -100,11 +108,19 @@ namespace NActors {
struct {
TActorMap* ActorsMap;
} Map;
TComplexActorInfo* Complex;
} ActorsInfo;

struct alignas(64) TComplexActorInfo{
TActorsInfo ActorsInfo;
TMailboxActorPack::EType ActorPack;
TMailboxStats Stats;
};

TMailboxHeader(TMailboxType::EType type);
~TMailboxHeader();

static bool CleanupActors(TMailboxActorPack::EType &actorPack, TActorsInfo &ActorsInfo);
bool CleanupActors();

// this interface is used exclusively by executor thread, so implementation is there
Expand All @@ -119,12 +135,13 @@ namespace NActors {
bool UnlockAsFree(bool wouldReschedule); // preceed with releasing lock, but mark as free one

bool IsEmpty() const noexcept {
return (ActorPack == TMailboxActorPack::Simple && ActorsInfo.Simple.ActorId == 0);
return (ActorPack == TMailboxActorPack::Simple && ActorsInfo.Simple.ActorId == 0) ||
(ActorPack == TMailboxActorPack::Complex && ActorsInfo.Complex->ActorPack == TMailboxActorPack::Simple && ActorsInfo.Complex->ActorsInfo.Simple.ActorId == 0);
}

template<typename T>
void ForEach(T&& callback) noexcept {
switch (ActorPack) {
static void ForEach(TMailboxActorPack::EType actorPack, TActorsInfo &ActorsInfo, T&& callback) noexcept {
switch (actorPack) {
case TMailboxActorPack::Simple:
if (ActorsInfo.Simple.ActorId) {
callback(ActorsInfo.Simple.ActorId, ActorsInfo.Simple.Actor);
Expand All @@ -143,10 +160,22 @@ namespace NActors {
callback(row.ActorId, row.Actor);
}
break;

case TMailboxActorPack::Complex:
Y_ABORT("Unexpected ActorPack type");
}
}

IActor* FindActor(ui64 localActorId) noexcept {
template<typename T>
void ForEach(T&& callback) noexcept {
if (ActorPack != TMailboxActorPack::Complex) {
ForEach(static_cast<TMailboxActorPack::EType>(ActorPack), ActorsInfo, std::forward(callback));
} else {
ForEach(ActorsInfo.Complex->ActorPack, ActorsInfo.Complex->ActorsInfo, std::forward(callback));
}
}

static IActor* FindActor(TMailboxActorPack::EType ActorPack, TActorsInfo &ActorsInfo, ui64 localActorId) noexcept {
switch (ActorPack) {
case TMailboxActorPack::Simple: {
if (ActorsInfo.Simple.ActorId == localActorId)
Expand All @@ -167,14 +196,22 @@ namespace NActors {
}
break;
}
default:
Y_ABORT();
case TMailboxActorPack::Complex:
Y_ABORT("Unexpected ActorPack type");
}
return nullptr;
}

void AttachActor(ui64 localActorId, IActor* actor) noexcept {
switch (ActorPack) {
IActor* FindActor(ui64 localActorId) noexcept {
if (ActorPack != TMailboxActorPack::Complex) {
return FindActor(static_cast<TMailboxActorPack::EType>(ActorPack), ActorsInfo, localActorId);
} else {
return FindActor(ActorsInfo.Complex->ActorPack, ActorsInfo.Complex->ActorsInfo, localActorId);
}
}

static void AttachActor(TMailboxActorPack::EType &actorPack, TActorsInfo &ActorsInfo, ui64 localActorId, IActor* actor) noexcept {
switch (actorPack) {
case TMailboxActorPack::Simple: {
if (ActorsInfo.Simple.ActorId == 0) {
ActorsInfo.Simple.ActorId = localActorId;
Expand All @@ -185,7 +222,7 @@ namespace NActors {
ar->Actors[0] = ActorsInfo.Simple;
ar->Actors[1] = TActorPair{actor, localActorId};
ActorsInfo.Array.ActorsCount = 2;
ActorPack = TMailboxActorPack::Array;
actorPack = TMailboxActorPack::Array;
ActorsInfo.Array.ActorsArray = ar;
}
break;
Expand All @@ -201,7 +238,7 @@ namespace NActors {
mp->emplace(ActorsInfo.Array.ActorsArray->Actors[i].ActorId, ActorsInfo.Array.ActorsArray->Actors[i].Actor);
}
mp->emplace(localActorId, actor);
ActorPack = TMailboxActorPack::Map;
actorPack = TMailboxActorPack::Map;
ActorsInfo.Array.ActorsCount = 0;
delete ActorsInfo.Array.ActorsArray;
ActorsInfo.Map.ActorsMap = mp;
Expand All @@ -210,17 +247,27 @@ namespace NActors {
}
break;
}
default:
Y_ABORT();
case TMailboxActorPack::Complex:
Y_ABORT("Unexpected ActorPack type");
}
}

IActor* DetachActor(ui64 localActorId) noexcept {
Y_DEBUG_ABORT_UNLESS(FindActor(localActorId) != nullptr);
void AttachActor(ui64 localActorId, IActor* actor) noexcept {
if (ActorPack != TMailboxActorPack::Complex) {
TMailboxActorPack::EType pack = ActorPack;
AttachActor(pack, ActorsInfo, localActorId, actor);
ActorPack = pack;
} else {
AttachActor(ActorsInfo.Complex->ActorPack, ActorsInfo.Complex->ActorsInfo, localActorId, actor);
}
}

static IActor* DetachActor(TMailboxActorPack::EType &actorPack, TActorsInfo &ActorsInfo, ui64 localActorId) noexcept {
Y_DEBUG_ABORT_UNLESS(FindActor(actorPack, ActorsInfo, localActorId) != nullptr);

IActor* actorToDestruct = nullptr;

switch (ActorPack) {
switch (actorPack) {
case TMailboxActorPack::Simple: {
Y_ABORT_UNLESS(ActorsInfo.Simple.ActorId == localActorId);
actorToDestruct = ActorsInfo.Simple.Actor;
Expand All @@ -243,7 +290,7 @@ namespace NActors {
ar->Actors[i++] = TActorPair{actor, actorId};
}
delete ActorsInfo.Map.ActorsMap;
ActorPack = TMailboxActorPack::Array;
actorPack = TMailboxActorPack::Array;
ActorsInfo.Array.ActorsArray = ar;
ActorsInfo.Array.ActorsCount = ARRAY_CAPACITY;
}
Expand All @@ -265,16 +312,50 @@ namespace NActors {
if (ActorsInfo.Array.ActorsCount == 1) {
const TActorPair Actor = ActorsInfo.Array.ActorsArray->Actors[0];
delete ActorsInfo.Array.ActorsArray;
ActorPack = TMailboxActorPack::Simple;
actorPack = TMailboxActorPack::Simple;
ActorsInfo.Simple = Actor;
}
break;
}
case TMailboxActorPack::Complex:
Y_ABORT("Unexpected ActorPack type");
}

return actorToDestruct;
}

IActor* DetachActor(ui64 localActorId) noexcept {
if (ActorPack != TMailboxActorPack::Complex) {
TMailboxActorPack::EType pack = ActorPack;
IActor* result = DetachActor(pack, ActorsInfo, localActorId);
ActorPack = pack;
return result;
} else {
return DetachActor(ActorsInfo.Complex->ActorPack, ActorsInfo.Complex->ActorsInfo, localActorId);
}
}

void EnableStats() {
TComplexActorInfo* complex = new TComplexActorInfo;
complex->ActorPack = ActorPack;
complex->ActorsInfo = std::move(ActorsInfo);
ActorPack = TMailboxActorPack::Complex;
ActorsInfo.Complex = complex;
}

void AddElapsedCycles(ui64 elapsed) {
if (ActorPack == TMailboxActorPack::Complex) {
ActorsInfo.Complex->Stats.ElapsedCycles += elapsed;
}
}

std::optional<ui64> GetElapsedCycles() {
if (ActorPack == TMailboxActorPack::Complex) {
return ActorsInfo.Complex->Stats.ElapsedCycles;
}
return std::nullopt;
}

std::pair<ui32, ui32> CountMailboxEvents(ui64 localActorId, ui32 maxTraverse);
};

Expand Down

0 comments on commit 47c448c

Please sign in to comment.