Skip to content

Commit

Permalink
Fixes dqrun with enabled row dispatcher (backport #12068) (#12237)
Browse files Browse the repository at this point in the history
  • Loading branch information
yumkam authored Dec 3, 2024
1 parent 036a7ce commit 44bba96
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 17 deletions.
3 changes: 2 additions & 1 deletion ydb/core/fq/libs/init/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ void Init(
credentialsFactory,
tenant,
yqCounters->GetSubgroup("subsystem", "row_dispatcher"),
CreatePqNativeGateway(pqServices));
CreatePqNativeGateway(pqServices),
appData->Mon);
actorRegistrator(NFq::RowDispatcherServiceActorId(), rowDispatcher.release());
}

Expand Down
26 changes: 15 additions & 11 deletions ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
#include <ydb/library/yql/providers/dq/counters/counters.h>
#include <ydb/library/yql/public/purecalc/common/interface.h>

#include <ydb/core/base/appdata_fwd.h>
#include <ydb/core/fq/libs/actors/logging/log.h>
#include <ydb/core/fq/libs/events/events.h>
#include <ydb/core/mon/mon.h>
Expand Down Expand Up @@ -261,6 +260,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
const ::NMonitoring::TDynamicCounterPtr Counters;
TRowDispatcherMetrics Metrics;
NYql::IPqGateway::TPtr PqGateway;
NActors::TMon* Monitoring;
TNodesTracker NodesTracker;
TAggregatedStats AggrStats;

Expand Down Expand Up @@ -329,7 +329,8 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
const TString& tenant,
const NFq::NRowDispatcher::IActorFactory::TPtr& actorFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway);
const NYql::IPqGateway::TPtr& pqGateway,
NActors::TMon* monitoring = nullptr);

void Bootstrap();

Expand Down Expand Up @@ -401,7 +402,8 @@ TRowDispatcher::TRowDispatcher(
const TString& tenant,
const NFq::NRowDispatcher::IActorFactory::TPtr& actorFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway)
const NYql::IPqGateway::TPtr& pqGateway,
NActors::TMon* monitoring)
: Config(config)
, CredentialsProviderFactory(credentialsProviderFactory)
, PureCalcProgramFactory(CreatePureCalcProgramFactory())
Expand All @@ -412,7 +414,9 @@ TRowDispatcher::TRowDispatcher(
, ActorFactory(actorFactory)
, Counters(counters)
, Metrics(counters)
, PqGateway(pqGateway) {
, PqGateway(pqGateway)
, Monitoring(monitoring)
{
}

void TRowDispatcher::Bootstrap() {
Expand All @@ -425,11 +429,9 @@ void TRowDispatcher::Bootstrap() {
Schedule(TDuration::Seconds(CoordinatorPingPeriodSec), new TEvPrivate::TEvCoordinatorPing());
Schedule(TDuration::Seconds(UpdateMetricsPeriodSec), new NFq::TEvPrivate::TEvUpdateMetrics());
Schedule(TDuration::Seconds(PrintStateToLogPeriodSec), new NFq::TEvPrivate::TEvPrintStateToLog());

NActors::TMon* mon = NKikimr::AppData()->Mon;
if (mon) {
::NMonitoring::TIndexMonPage* actorsMonPage = mon->RegisterIndexPage("actors", "Actors");
mon->RegisterActorPage(actorsMonPage, "row_dispatcher", "Row Dispatcher", false,
if (Monitoring) {
::NMonitoring::TIndexMonPage* actorsMonPage = Monitoring->RegisterIndexPage("actors", "Actors");
Monitoring->RegisterActorPage(actorsMonPage, "row_dispatcher", "Row Dispatcher", false,
TlsActivationContext->ExecutorThread.ActorSystem, SelfId());
}
NodesTracker.Init(SelfId());
Expand Down Expand Up @@ -964,7 +966,8 @@ std::unique_ptr<NActors::IActor> NewRowDispatcher(
const TString& tenant,
const NFq::NRowDispatcher::IActorFactory::TPtr& actorFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway)
const NYql::IPqGateway::TPtr& pqGateway,
NActors::TMon* monitoring)
{
return std::unique_ptr<NActors::IActor>(new TRowDispatcher(
config,
Expand All @@ -974,7 +977,8 @@ std::unique_ptr<NActors::IActor> NewRowDispatcher(
tenant,
actorFactory,
counters,
pqGateway));
pqGateway,
monitoring));
}

} // namespace NFq
7 changes: 6 additions & 1 deletion ydb/core/fq/libs/row_dispatcher/row_dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@

#include <memory>

namespace NActors {
class TMon;
}

namespace NFq {

std::unique_ptr<NActors::IActor> NewRowDispatcher(
Expand All @@ -23,6 +27,7 @@ std::unique_ptr<NActors::IActor> NewRowDispatcher(
const TString& tenant,
const NFq::NRowDispatcher::IActorFactory::TPtr& actorFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway);
const NYql::IPqGateway::TPtr& pqGateway,
NActors::TMon* monitoring = nullptr);

} // namespace NFq
6 changes: 4 additions & 2 deletions ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ std::unique_ptr<NActors::IActor> NewRowDispatcherService(
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
const TString& tenant,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway)
const NYql::IPqGateway::TPtr& pqGateway,
NActors::TMon* monitoring)
{
return NewRowDispatcher(
config,
Expand All @@ -26,7 +27,8 @@ std::unique_ptr<NActors::IActor> NewRowDispatcherService(
tenant,
NFq::NRowDispatcher::CreateActorFactory(),
counters,
pqGateway);
pqGateway,
monitoring);
}

} // namespace NFq
7 changes: 6 additions & 1 deletion ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@

#include <memory>

namespace NActors {
class TMon;
}

namespace NFq {

std::unique_ptr<NActors::IActor> NewRowDispatcherService(
Expand All @@ -22,6 +26,7 @@ std::unique_ptr<NActors::IActor> NewRowDispatcherService(
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
const TString& tenant,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway);
const NYql::IPqGateway::TPtr& pqGateway,
NActors::TMon* monitoring = nullptr);

} // namespace NFq
1 change: 0 additions & 1 deletion ydb/core/fq/libs/row_dispatcher/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ SRCS(
PEERDIR(
contrib/libs/fmt
contrib/libs/simdjson
ydb/core/base
ydb/core/fq/libs/actors/logging
ydb/core/fq/libs/config/protos
ydb/core/fq/libs/control_plane_storage
Expand Down

0 comments on commit 44bba96

Please sign in to comment.