Skip to content

Commit

Permalink
Support lightweight alias actor ids for actors (#12731)
Browse files Browse the repository at this point in the history
  • Loading branch information
snaury authored Dec 19, 2024
1 parent 5244b5d commit a51b52c
Show file tree
Hide file tree
Showing 17 changed files with 411 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ Y_UNIT_TEST_SUITE(PartitionEndWatcher) {
Y_ABORT("Unexpected");
}

TActorId RegisterAlias() noexcept {
Y_ABORT("Unexpected");
}

void UnregisterAlias(const TActorId&) noexcept {
Y_ABORT("Unexpected");
}

~MockActorOps() {
for (auto [_, ptr] : Events) {
std::unique_ptr<IEventBase> p;
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/util/actorsys_test/testactorsys.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,14 @@ class TTestExecutorPool : public IExecutorPool {
return Context->Register(actor, parentId, PoolId, mailbox->Hint, NodeId);
}

TActorId RegisterAlias(TMailbox* mailbox, IActor* actor) override {
return Context->RegisterAlias(mailbox->Hint, actor, PoolId, NodeId);
}

void UnregisterAlias(TMailbox* mailbox, const TActorId& actorId) override {
return Context->UnregisterAlias(mailbox->Hint, actorId, PoolId, NodeId);
}

void Prepare(TActorSystem* /*actorSystem*/, NSchedulerQueue::TReader** /*scheduleReaders*/, ui32* /*scheduleSz*/) override {
}

Expand Down
45 changes: 41 additions & 4 deletions ydb/core/util/actorsys_test/testactorsys.h
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,11 @@ class TTestActorSystem {
if (mailbox) {
*mailbox = &mbox;
}
return mbox.FindActor(actorId.LocalId());
IActor *actor = mbox.FindActor(actorId.LocalId());
if (!actor) {
actor = mbox.FindAlias(actorId.LocalId());
}
return actor;
} else {
return nullptr;
}
Expand Down Expand Up @@ -518,6 +522,24 @@ class TTestActorSystem {
return Register(actor, {}, poolId, {}, nodeId);
}

TActorId RegisterAlias(ui32 mboxId, IActor* actor, ui32 poolId, ui32 nodeId) {
auto it = Mailboxes.find(TMailboxId(nodeId, poolId, mboxId));
Y_ABORT_UNLESS(it != Mailboxes.end());
TMailboxInfo& mbox = it->second;
mbox.AttachAlias(ActorLocalId, actor);

const TActorId actorId(nodeId, poolId, ActorLocalId, mboxId);
++ActorLocalId;
return actorId;
}

void UnregisterAlias(ui32 mboxId, const TActorId &actorId, ui32 poolId, ui32 nodeId) {
auto it = Mailboxes.find(TMailboxId(nodeId, poolId, mboxId));
Y_ABORT_UNLESS(it != Mailboxes.end());
TMailboxInfo& mbox = it->second;
mbox.DetachAlias(actorId.LocalId());
}

void RegisterService(const TActorId& serviceId, const TActorId& actorId) {
const ui32 nodeId = actorId.NodeId(); // only at the node with the actor
GetNode(nodeId)->ActorSystem->RegisterLocalService(serviceId, actorId);
Expand Down Expand Up @@ -566,9 +588,13 @@ class TTestActorSystem {
if (FilterFunction && !FilterFunction(item->NodeId, event)) { // event is dropped by the filter function
continue;
}
const bool success = WrapInActorContext(TransformEvent(event.get(), item->NodeId), [&](IActor *actor) {
const bool success = WrapInActorContext(TransformEvent(event.get(), item->NodeId), [&](IActor *actor, bool alias) {
TAutoPtr<IEventHandle> ev(event.release());

if (alias) {
ev->Rewrite(ev->GetTypeRewrite(), actor->SelfId());
}

const ui32 type = ev->GetTypeRewrite();

THPTimer timer;
Expand Down Expand Up @@ -598,7 +624,16 @@ class TTestActorSystem {
return false;
}
TMailboxInfo& mbox = mboxIt->second;
if (IActor *actor = mbox.FindActor(actorId.LocalId())) {
IActor *actor = mbox.FindActor(actorId.LocalId());
bool alias = false;
if (!actor) {
actor = mbox.FindAlias(actorId.LocalId());
if (actor) {
actorId = actor->SelfId();
alias = true;
}
}
if (actor) {
// obtain node info for this actor
TPerNodeInfo *info = GetNode(actorId.NodeId());

Expand All @@ -613,7 +648,9 @@ class TTestActorSystem {

// invoke the callback
try {
if constexpr (std::is_invocable_v<TCallback, IActor*>) {
if constexpr (std::is_invocable_v<TCallback, IActor*, bool>) {
std::invoke(std::forward<TCallback>(callback), actor, alias);
} else if constexpr (std::is_invocable_v<TCallback, IActor*>) {
std::invoke(std::forward<TCallback>(callback), actor);
} else {
std::invoke(std::forward<TCallback>(callback));
Expand Down
16 changes: 16 additions & 0 deletions ydb/library/actors/core/actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,14 @@ namespace NActors {
return TlsActivationContext->ExecutorThread.RegisterActor(actor, &TlsActivationContext->Mailbox, SelfActorId);
}

TActorId IActor::RegisterAlias() noexcept {
return TlsActivationContext->ExecutorThread.RegisterAlias(&TlsActivationContext->Mailbox, this);
}

void IActor::UnregisterAlias(const TActorId& actorId) noexcept {
return TlsActivationContext->ExecutorThread.UnregisterAlias(&TlsActivationContext->Mailbox, actorId);
}

TActorId TActivationContext::InterconnectProxy(ui32 destinationNodeId) {
return TlsActivationContext->ExecutorThread.ActorSystem->InterconnectProxy(destinationNodeId);
}
Expand Down Expand Up @@ -341,6 +349,14 @@ namespace NActors {
}
}

TActorId TGenericExecutorThread::RegisterAlias(TMailbox* mailbox, IActor* actor) {
return Ctx.Executor->RegisterAlias(mailbox, actor);
}

void TGenericExecutorThread::UnregisterAlias(TMailbox* mailbox, const TActorId& actorId) {
Ctx.Executor->UnregisterAlias(mailbox, actorId);
}

template bool TActivationContext::Send<ESendingType::Common>(TAutoPtr<IEventHandle> ev);
template bool TActivationContext::Send<ESendingType::Lazy>(TAutoPtr<IEventHandle> ev);
template bool TActivationContext::Send<ESendingType::Tail>(TAutoPtr<IEventHandle> ev);
Expand Down
13 changes: 13 additions & 0 deletions ydb/library/actors/core/actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#include <util/system/tls.h>
#include <util/generic/noncopyable.h>

#include <library/cpp/containers/absl_flat_hash/flat_hash_set.h>

namespace NActors {
class TActorSystem;
class TMailboxTable;
Expand Down Expand Up @@ -270,6 +272,9 @@ namespace NActors {

virtual TActorId Register(IActor*, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>()) const noexcept = 0;
virtual TActorId RegisterWithSameMailbox(IActor*) const noexcept = 0;

virtual TActorId RegisterAlias() noexcept = 0;
virtual void UnregisterAlias(const TActorId& actorId) noexcept = 0;
};

class TDecorator;
Expand Down Expand Up @@ -353,6 +358,11 @@ namespace NActors {
friend void DoActorInit(TActorSystem*, IActor*, const TActorId&, const TActorId&);
friend class TDecorator;

private:
// actor aliases
absl::flat_hash_set<ui64> Aliases;
friend class TMailbox;

private: // stuck actor monitoring
TMonotonic LastReceiveTimestamp;
size_t StuckIndex = Max<size_t>();
Expand Down Expand Up @@ -600,6 +610,9 @@ namespace NActors {
// some memory.
TActorId RegisterWithSameMailbox(IActor* actor) const noexcept final;

TActorId RegisterAlias() noexcept final;
void UnregisterAlias(const TActorId& actorId) noexcept final;

std::pair<ui32, ui32> CountMailboxEvents(ui32 maxTraverse = Max<ui32>()) const;

private:
Expand Down
3 changes: 3 additions & 0 deletions ydb/library/actors/core/executor_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ namespace NActors {
virtual TActorId Register(IActor* actor, TMailboxCache& cache, ui64 revolvingCounter, const TActorId& parentId) = 0;
virtual TActorId Register(IActor* actor, TMailbox* mailbox, const TActorId& parentId) = 0;

virtual TActorId RegisterAlias(TMailbox* mailbox, IActor* actor) = 0;
virtual void UnregisterAlias(TMailbox* mailbox, const TActorId& actorId) = 0;

virtual void GetCurrentStats(TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const {
// TODO: make pure virtual and override everywhere
Y_UNUSED(poolStats);
Expand Down
19 changes: 19 additions & 0 deletions ydb/library/actors/core/executor_pool_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,25 @@ namespace NActors {
return actorId;
}

TActorId TExecutorPoolBaseMailboxed::RegisterAlias(TMailbox* mailbox, IActor* actor) {
Y_ABORT_UNLESS(!mailbox->IsEmpty(),
"RegisterAlias called on an empty mailbox");

Y_DEBUG_ABORT_UNLESS(mailbox->FindActor(actor->SelfId().LocalId()) == actor,
"RegisterAlias called for an actor that is not register in the mailbox");

const ui64 localActorId = AllocateID();
mailbox->AttachAlias(localActorId, actor);
return TActorId(ActorSystem->NodeId, PoolId, localActorId, mailbox->Hint);
}

void TExecutorPoolBaseMailboxed::UnregisterAlias(TMailbox* mailbox, const TActorId& actorId) {
Y_DEBUG_ABORT_UNLESS(actorId.Hint() == mailbox->Hint);
Y_DEBUG_ABORT_UNLESS(actorId.PoolID() == PoolId);
Y_DEBUG_ABORT_UNLESS(actorId.NodeId() == ActorSystem->NodeId);
mailbox->DetachAlias(actorId.LocalId());
}

TAffinity* TExecutorPoolBase::Affinity() const {
return ThreadsAffinity.Get();
}
Expand Down
2 changes: 2 additions & 0 deletions ydb/library/actors/core/executor_pool_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ namespace NActors {
TActorId Register(IActor* actor, TMailboxType::EType mailboxType, ui64 revolvingWriteCounter, const TActorId& parentId) override;
TActorId Register(IActor* actor, TMailboxCache& cache, ui64 revolvingWriteCounter, const TActorId& parentId) override;
TActorId Register(IActor* actor, TMailbox* mailbox, const TActorId& parentId) override;
TActorId RegisterAlias(TMailbox* mailbox, IActor* actor) override;
void UnregisterAlias(TMailbox* mailbox, const TActorId& actorId) override;
bool Cleanup() override;
};

Expand Down
11 changes: 10 additions & 1 deletion ydb/library/actors/core/executor_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,20 @@ namespace NActors {
for (; Ctx.ExecutedEvents < Ctx.EventsPerMailbox; ++Ctx.ExecutedEvents) {
if (TAutoPtr<IEventHandle> evExt = mailbox->Pop()) {
recipient = evExt->GetRecipientRewrite();
actor = mailbox->FindActor(recipient.LocalId());
if (!actor) {
actor = mailbox->FindAlias(recipient.LocalId());
if (actor) {
// Work as if some alias actor rewrites events and delivers them to the real actor id
evExt->Rewrite(evExt->GetTypeRewrite(), actor->SelfId());
recipient = evExt->GetRecipientRewrite();
}
}
TActorContext ctx(*mailbox, *this, eventStart, recipient);
TlsActivationContext = &ctx; // ensure dtor (if any) is called within actor system
// move for destruct before ctx;
auto ev = std::move(evExt);
if (actor = mailbox->FindActor(recipient.LocalId())) {
if (actor) {
wasWorking = true;
// Since actor is not null there should be no exceptions
actorType = &typeid(*actor);
Expand Down
3 changes: 3 additions & 0 deletions ydb/library/actors/core/executor_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ namespace NActors {
void DropUnregistered();
const std::vector<THolder<IActor>>& GetUnregistered() const { return DyingActors; }

TActorId RegisterAlias(TMailbox* mailbox, IActor* actor);
void UnregisterAlias(TMailbox* mailbox, const TActorId& actorId);

void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
Expand Down
2 changes: 2 additions & 0 deletions ydb/library/actors/core/harmonizer/ut/harmonizer_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ Y_UNIT_TEST_SUITE(HarmonizerTests) {
TActorId Register(IActor* /*actor*/, TMailboxType::EType /*mailboxType*/, ui64 /*revolvingCounter*/, const TActorId& /*parentId*/) override { return TActorId(); }
TActorId Register(IActor* /*actor*/, TMailboxCache& /*cache*/, ui64 /*revolvingCounter*/, const TActorId& /*parentId*/) override { return TActorId(); }
TActorId Register(IActor* /*actor*/, TMailbox* /*mailbox*/, const TActorId& /*parentId*/) override { return TActorId(); }
TActorId RegisterAlias(TMailbox*, IActor*) override { return TActorId(); }
void UnregisterAlias(TMailbox*, const TActorId&) override {}

TAffinity* Affinity() const override { return nullptr; }

Expand Down
Loading

0 comments on commit a51b52c

Please sign in to comment.