Skip to content

Commit

Permalink
Kqprun fixed fault and logs printing for multi node usage (ydb-platfo…
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Jul 4, 2024
1 parent fb86fc9 commit e686036
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 6 deletions.
9 changes: 9 additions & 0 deletions ydb/library/actors/testlib/test_runtime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -804,6 +804,12 @@ namespace NActors {
LogBackend = logBackend;
}

void TTestActorRuntimeBase::SetLogBackendFactory(std::function<TAutoPtr<TLogBackend>()> logBackendFactory) {
Y_ABORT_UNLESS(!IsInitialized);
TGuard<TMutex> guard(Mutex);
LogBackendFactory = logBackendFactory;
}

void TTestActorRuntimeBase::SetLogPriority(NActors::NLog::EComponent component, NActors::NLog::EPriority priority) {
TGuard<TMutex> guard(Mutex);
for (ui32 nodeIndex = 0; nodeIndex < NodeCount; ++nodeIndex) {
Expand Down Expand Up @@ -1761,6 +1767,9 @@ namespace NActors {
}

if (!SingleSysEnv) { // Single system env should do this self
if (LogBackendFactory) {
LogBackend = LogBackendFactory();
}
TAutoPtr<TLogBackend> logBackend = LogBackend ? LogBackend : NActors::CreateStderrBackend();
NActors::TLoggerActor *loggerActor = new NActors::TLoggerActor(node->LogSettings,
logBackend, GetCountersForComponent(node->DynamicCounters, "utils"));
Expand Down
2 changes: 2 additions & 0 deletions ydb/library/actors/testlib/test_runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ namespace NActors {
}
TDuration SetReschedulingDelay(TDuration delay);
void SetLogBackend(const TAutoPtr<TLogBackend> logBackend);
void SetLogBackendFactory(std::function<TAutoPtr<TLogBackend>()> logBackendFactory);
void SetLogPriority(NActors::NLog::EComponent component, NActors::NLog::EPriority priority);
TIntrusivePtr<ITimeProvider> GetTimeProvider();
TIntrusivePtr<IMonotonicTimeProvider> GetMonotonicTimeProvider();
Expand Down Expand Up @@ -654,6 +655,7 @@ namespace NActors {
ui64 DispatcherRandomSeed;
TIntrusivePtr<IRandomProvider> DispatcherRandomProvider;
TAutoPtr<TLogBackend> LogBackend;
std::function<TAutoPtr<TLogBackend>()> LogBackendFactory;
bool NeedMonitoring;
ui16 MonitoringPortOffset = 0;
bool MonitoringTypeAsync = false;
Expand Down
15 changes: 9 additions & 6 deletions ydb/tests/tools/kqprun/src/ydb_setup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,11 @@ class TYdbSetup::TImpl {

runtime.SetLogPriority(service, NActors::NLog::EPriority(setting.GetLevel()));
}

runtime.SetLogBackendFactory([this]() { return CreateLogBackend(); });
};

serverSettings.SetLoggerInitializer(loggerInitializer);
serverSettings.SetLogBackend(CreateLogBackend());
}

void SetFunctionRegistry(NKikimr::Tests::TServerSettings& serverSettings) const {
Expand Down Expand Up @@ -244,12 +245,13 @@ class TYdbSetup::TImpl {
NKikimr::NKqp::TEvFetchScriptResultsResponse::TPtr FetchScriptExecutionResultsRequest(const TString& operation, i32 resultSetId) const {
TString executionId = *NKikimr::NKqp::ScriptExecutionIdFromOperation(operation);

NActors::TActorId edgeActor = GetRuntime()->AllocateEdgeActor();
ui32 nodeIndex = RandomNumber(Settings_.NodeCount);
NActors::TActorId edgeActor = GetRuntime()->AllocateEdgeActor(nodeIndex);
auto rowsLimit = Settings_.AppConfig.GetQueryServiceConfig().GetScriptResultRowsLimit();
auto sizeLimit = Settings_.AppConfig.GetQueryServiceConfig().GetScriptResultSizeLimit();
NActors::IActor* fetchActor = NKikimr::NKqp::CreateGetScriptExecutionResultActor(edgeActor, Settings_.DomainName, executionId, resultSetId, 0, rowsLimit, sizeLimit, TInstant::Max());

GetRuntime()->Register(fetchActor, RandomNumber(Settings_.NodeCount));
GetRuntime()->Register(fetchActor, nodeIndex);

return GetRuntime()->GrabEdgeEvent<NKikimr::NKqp::TEvFetchScriptResultsResponse>(edgeActor);
}
Expand Down Expand Up @@ -303,10 +305,11 @@ class TYdbSetup::TImpl {

template <typename TRequest, typename TResponse>
typename TResponse::TPtr RunKqpProxyRequest(THolder<TRequest> event) const {
NActors::TActorId edgeActor = GetRuntime()->AllocateEdgeActor();
NActors::TActorId kqpProxy = NKikimr::NKqp::MakeKqpProxyID(GetRuntime()->GetNodeId(RandomNumber(Settings_.NodeCount)));
ui32 nodeIndex = RandomNumber(Settings_.NodeCount);
NActors::TActorId edgeActor = GetRuntime()->AllocateEdgeActor(nodeIndex);
NActors::TActorId kqpProxy = NKikimr::NKqp::MakeKqpProxyID(GetRuntime()->GetNodeId(nodeIndex));

GetRuntime()->Send(kqpProxy, edgeActor, event.Release());
GetRuntime()->Send(kqpProxy, edgeActor, event.Release(), nodeIndex);

return GetRuntime()->GrabEdgeEvent<TResponse>(edgeActor);
}
Expand Down

0 comments on commit e686036

Please sign in to comment.