Skip to content

Commit

Permalink
Make use of poller actor straight in http subsystem (ydb-platform#13316)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexvru authored Jan 13, 2025
1 parent 315b99e commit c0dd9da
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 26 deletions.
3 changes: 2 additions & 1 deletion ydb/core/blobstorage/ut_vdisk/lib/astest.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <ydb/library/actors/core/executor_pool_basic.h>
#include <ydb/library/actors/core/executor_pool_io.h>
#include <ydb/library/actors/core/scheduler_basic.h>
#include <ydb/library/actors/interconnect/poller_actor.h>
#include <ydb/core/mon/mon.h>
#include <ydb/library/actors/interconnect/interconnect.h>
#include <ydb/library/actors/protos/services_common.pb.h>
Expand Down Expand Up @@ -64,7 +65,7 @@ inline void TTestWithActorSystem::Run(NActors::IActor *testActor) {
const TActorId nameserviceId = GetNameserviceActorId();
TActorSetupCmd nameserviceSetup(CreateNameserverTable(nameserverTable), TMailboxType::Simple, 0);
setup1->LocalServices.push_back(std::pair<TActorId, TActorSetupCmd>(nameserviceId, std::move(nameserviceSetup)));

setup1->LocalServices.emplace_back(MakePollerActorId(), TActorSetupCmd(CreatePollerActor(), TMailboxType::Simple, 0));

///////////////////////// LOGGER ///////////////////////////////////////////////
NActors::TActorId loggerActorId = NActors::TActorId(1, "logger");
Expand Down
7 changes: 2 additions & 5 deletions ydb/library/actors/http/http_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,22 @@ class THttpProxy : public NActors::TActorBootstrapped<THttpProxy>, public THttpC
using TBase = NActors::TActorBootstrapped<THttpProxy>;

IActor* AddListeningPort(TEvHttpProxy::TEvAddListeningPort::TPtr& event) {
IActor* listeningSocket = CreateHttpAcceptorActor(SelfId(), Poller);
IActor* listeningSocket = CreateHttpAcceptorActor(SelfId());
TActorId acceptorId = Register(listeningSocket);
Send(event->Forward(acceptorId));
Acceptors.emplace_back(acceptorId);
return listeningSocket;
}

IActor* AddOutgoingConnection(bool secure) {
IActor* connectionSocket = CreateOutgoingConnectionActor(SelfId(), secure, Poller);
IActor* connectionSocket = CreateOutgoingConnectionActor(SelfId(), secure);
TActorId connectionId = Register(connectionSocket);
ALOG_DEBUG(HttpLog, "Connection created " << connectionId);
Connections.emplace(connectionId);
return connectionSocket;
}

void Bootstrap() {
Poller = Register(NActors::CreatePollerActor());
Become(&THttpProxy::StateWork);
}

Expand Down Expand Up @@ -54,7 +53,6 @@ class THttpProxy : public NActors::TActorBootstrapped<THttpProxy>, public THttpC
}

void PassAway() override {
Send(Poller, new NActors::TEvents::TEvPoisonPill());
for (const NActors::TActorId& connection : Connections) {
Send(connection, new NActors::TEvents::TEvPoisonPill());
}
Expand Down Expand Up @@ -273,7 +271,6 @@ class THttpProxy : public NActors::TActorBootstrapped<THttpProxy>, public THttpC
PassAway();
}

NActors::TActorId Poller;
TVector<TActorId> Acceptors;

struct THostEntry {
Expand Down
4 changes: 2 additions & 2 deletions ydb/library/actors/http/http_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,8 @@ struct TPrivateEndpointInfo : THttpEndpointInfo {
};

NActors::IActor* CreateHttpProxy(std::weak_ptr<NMonitoring::IMetricFactory> registry = NMonitoring::TMetricRegistry::SharedInstance());
NActors::IActor* CreateHttpAcceptorActor(const TActorId& owner, const TActorId& poller);
NActors::IActor* CreateOutgoingConnectionActor(const TActorId& owner, bool secure, const TActorId& poller);
NActors::IActor* CreateHttpAcceptorActor(const TActorId& owner);
NActors::IActor* CreateOutgoingConnectionActor(const TActorId& owner, bool secure);
NActors::IActor* CreateIncomingConnectionActor(
std::shared_ptr<TPrivateEndpointInfo> endpoint,
TIntrusivePtr<TSocketDescriptor> socket,
Expand Down
12 changes: 5 additions & 7 deletions ydb/library/actors/http/http_proxy_acceptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,16 @@ class TAcceptorActor : public NActors::TActor<TAcceptorActor>, public THttpConfi
using TBase::Schedule;

const TActorId Owner;
const TActorId Poller;
TIntrusivePtr<TSocketDescriptor> Socket;
NActors::TPollerToken::TPtr PollerToken;
THashSet<TActorId> Connections;
TDeque<THttpIncomingRequestPtr> RecycledRequests;
ui32 MaxRecycledRequestsCount = 0;
std::shared_ptr<TPrivateEndpointInfo> Endpoint;

TAcceptorActor(const TActorId& owner, const TActorId& poller)
TAcceptorActor(const TActorId& owner)
: NActors::TActor<TAcceptorActor>(&TAcceptorActor::StateInit)
, Owner(owner)
, Poller(poller)
{
}

Expand Down Expand Up @@ -89,7 +87,7 @@ class TAcceptorActor : public NActors::TActor<TAcceptorActor>, public THttpConfi
if (err == 0) {
ALOG_INFO(HttpLog, "Listening on " << schema << bindAddress->ToString());
SetNonBlock(Socket->Socket);
Send(Poller, new NActors::TEvPollerRegister(Socket, SelfId(), SelfId()));
Send(NActors::MakePollerActorId(), new NActors::TEvPollerRegister(Socket, SelfId(), SelfId()));
TBase::Become(&TAcceptorActor::StateListening);
Send(event->Sender, new TEvHttpProxy::TEvConfirmListen(bindAddress, Endpoint), 0, event->Cookie);
return;
Expand Down Expand Up @@ -138,7 +136,7 @@ class TAcceptorActor : public NActors::TActor<TAcceptorActor>, public THttpConfi
RecycledRequests.pop_front();
}
NActors::TActorId connectionId = Register(connectionSocket);
Send(Poller, new NActors::TEvPollerRegister(socket, connectionId, connectionId));
Send(NActors::MakePollerActorId(), new NActors::TEvPollerRegister(socket, connectionId, connectionId));
Connections.emplace(connectionId);
}
}
Expand All @@ -159,8 +157,8 @@ class TAcceptorActor : public NActors::TActor<TAcceptorActor>, public THttpConfi
}
};

NActors::IActor* CreateHttpAcceptorActor(const TActorId& owner, const TActorId& poller) {
return new TAcceptorActor(owner, poller);
NActors::IActor* CreateHttpAcceptorActor(const TActorId& owner) {
return new TAcceptorActor(owner);
}

}
12 changes: 5 additions & 7 deletions ydb/library/actors/http/http_proxy_outgoing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor
using TBase::SelfId;

const TActorId Owner;
const TActorId Poller;
SocketAddressType Address;
TString Destination;
TActorId RequestOwner;
Expand All @@ -24,10 +23,9 @@ class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor
bool AllowConnectionReuse = false;
NActors::TPollerToken::TPtr PollerToken;

TOutgoingConnectionActor(const TActorId& owner, const TActorId& poller)
TOutgoingConnectionActor(const TActorId& owner)
: TBase(&TSelf::StateWaiting)
, Owner(owner)
, Poller(poller)
{
}

Expand Down Expand Up @@ -173,7 +171,7 @@ class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor
}

void RegisterPoller() {
Send(Poller, new NActors::TEvPollerRegister(TSocketImpl::Socket, SelfId(), SelfId()));
Send(NActors::MakePollerActorId(), new NActors::TEvPollerRegister(TSocketImpl::Socket, SelfId(), SelfId()));
}

void OnConnect() {
Expand Down Expand Up @@ -351,11 +349,11 @@ class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor
}
};

NActors::IActor* CreateOutgoingConnectionActor(const TActorId& owner, bool secure, const TActorId& poller) {
NActors::IActor* CreateOutgoingConnectionActor(const TActorId& owner, bool secure) {
if (secure) {
return new TOutgoingConnectionActor<TSecureSocketImpl>(owner, poller);
return new TOutgoingConnectionActor<TSecureSocketImpl>(owner);
} else {
return new TOutgoingConnectionActor<TPlainSocketImpl>(owner, poller);
return new TOutgoingConnectionActor<TPlainSocketImpl>(owner);
}
}

Expand Down
6 changes: 2 additions & 4 deletions ydb/library/actors/testlib/test_runtime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1799,10 +1799,8 @@ namespace NActors {

setup->Interconnect.ProxyWrapperFactory = CreateProxyWrapperFactory(common, InterconnectPoolId(), &InterconnectMock);

if (UseRealInterconnect) {
setup->LocalServices.emplace_back(MakePollerActorId(), NActors::TActorSetupCmd(CreatePollerActor(),
NActors::TMailboxType::Simple, InterconnectPoolId()));
}
setup->LocalServices.emplace_back(MakePollerActorId(), NActors::TActorSetupCmd(CreatePollerActor(),
NActors::TMailboxType::Simple, InterconnectPoolId()));

if (!SingleSysEnv) { // Single system env should do this self
if (LogBackendFactory) {
Expand Down

0 comments on commit c0dd9da

Please sign in to comment.