Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQL-17542 move allocator ownership from TDqTaskRunner to actors #1335

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions ydb/core/fq/libs/init/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ void Init(
if (!mkqlAllocSize) {
mkqlAllocSize = 30_MB;
}
Y_ABORT_UNLESS(appData->FunctionRegistry);
NYql::NDqs::TLocalWorkerManagerOptions lwmOptions;
lwmOptions.Counters = workerManagerCounters;
lwmOptions.Factory = NYql::NTaskRunnerProxy::CreateFactory(appData->FunctionRegistry, dqCompFactory, dqTaskTransformFactory, nullptr, false);
Expand All @@ -257,8 +258,9 @@ void Init(
lwmOptions.MkqlProgramHardMemoryLimit = protoConfig.GetResourceManager().GetMkqlTaskHardMemoryLimit();
lwmOptions.MkqlMinAllocSize = mkqlAllocSize;
lwmOptions.TaskRunnerActorFactory = NYql::NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory(
[=](const NYql::NDq::TDqTaskSettings& task, NYql::NDqProto::EDqStatsMode statsMode, const NYql::NDq::TLogFunc&) {
return lwmOptions.Factory->Get(task, statsMode);
*appData->FunctionRegistry,
[=](NKikimr::NMiniKQL::TScopedAlloc& alloc, const NYql::NDq::TDqTaskSettings& task, NYql::NDqProto::EDqStatsMode statsMode, const NYql::NDq::TLogFunc&) {
return lwmOptions.Factory->Get(alloc, task, statsMode);
});
if (protoConfig.GetRateLimiter().GetDataPlaneEnabled()) {
lwmOptions.QuoterServiceActorId = NFq::YqQuoterServiceActorId();
Expand Down
3 changes: 1 addition & 2 deletions ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ void TKqpComputeActor::DoBootstrap() {
execCtx.ComputeCtx = &ComputeCtx;
execCtx.ComputationFactory = NMiniKQL::GetKqpActorComputeFactory(&ComputeCtx);
execCtx.ApplyCtx = nullptr;
execCtx.Alloc = nullptr;
execCtx.TypeEnv = nullptr;
execCtx.PatternCache = GetKqpResourceManager()->GetPatternCache();

Expand All @@ -68,7 +67,7 @@ void TKqpComputeActor::DoBootstrap() {
settings.ReadRanges.push_back(readRange);
}

auto taskRunner = MakeDqTaskRunner(execCtx, settings, logger);
auto taskRunner = MakeDqTaskRunner(TBase::GetAllocator(), execCtx, settings, logger);
SetTaskRunner(taskRunner);

auto wakeup = [this]{ ContinueExecute(); };
Expand Down
3 changes: 1 addition & 2 deletions ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ void TKqpScanComputeActor::DoBootstrap() {
execCtx.RandomProvider = TAppData::RandomProvider.Get();
execCtx.TimeProvider = TAppData::TimeProvider.Get();
execCtx.ApplyCtx = nullptr;
execCtx.Alloc = nullptr;
execCtx.TypeEnv = nullptr;
execCtx.PatternCache = GetKqpResourceManager()->GetPatternCache();

Expand Down Expand Up @@ -219,7 +218,7 @@ void TKqpScanComputeActor::DoBootstrap() {
};
}

auto taskRunner = MakeDqTaskRunner(execCtx, settings, logger);
auto taskRunner = MakeDqTaskRunner(GetAllocator(), execCtx, settings, logger);
TBase::SetTaskRunner(taskRunner);

auto wakeup = [this] { ContinueExecute(); };
Expand Down
13 changes: 6 additions & 7 deletions ydb/core/kqp/executer_actor/kqp_literal_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ using namespace NYql::NDq;

namespace {

std::unique_ptr<TDqTaskRunnerContext> CreateTaskRunnerContext(NMiniKQL::TKqpComputeContextBase* computeCtx, NMiniKQL::TScopedAlloc* alloc,
std::unique_ptr<TDqTaskRunnerContext> CreateTaskRunnerContext(NMiniKQL::TKqpComputeContextBase* computeCtx,
NMiniKQL::TTypeEnvironment* typeEnv)
{
std::unique_ptr<TDqTaskRunnerContext> context = std::make_unique<TDqTaskRunnerContext>();
Expand All @@ -44,7 +44,6 @@ std::unique_ptr<TDqTaskRunnerContext> CreateTaskRunnerContext(NMiniKQL::TKqpComp
return nullptr;
};

context->Alloc = alloc;
context->TypeEnv = typeEnv;
context->ApplyCtx = nullptr;
return context;
Expand Down Expand Up @@ -167,12 +166,12 @@ class TKqpLiteralExecuter {

// task runner settings
ComputeCtx = std::make_unique<NMiniKQL::TKqpComputeContextBase>();
RunnerContext = CreateTaskRunnerContext(ComputeCtx.get(), &Request.TxAlloc->Alloc, &Request.TxAlloc->TypeEnv);
RunnerContext = CreateTaskRunnerContext(ComputeCtx.get(), &Request.TxAlloc->TypeEnv);
RunnerContext->PatternCache = GetKqpResourceManager()->GetPatternCache();
TDqTaskRunnerSettings settings = CreateTaskRunnerSettings(Request.StatsMode);

for (auto& task : TasksGraph.GetTasks()) {
RunTask(task, *RunnerContext, settings);
RunTask(Request.TxAlloc->Alloc, task, *RunnerContext, settings);

if (TerminateIfTimeout()) {
return;
Expand All @@ -183,7 +182,7 @@ class TKqpLiteralExecuter {
UpdateCounters();
}

void RunTask(TTask& task, const TDqTaskRunnerContext& context, const TDqTaskRunnerSettings& settings) {
void RunTask(NMiniKQL::TScopedAlloc& alloc, TTask& task, const TDqTaskRunnerContext& context, const TDqTaskRunnerSettings& settings) {
auto& stageInfo = TasksGraph.GetStageInfo(task.StageId);
auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);

Expand Down Expand Up @@ -218,7 +217,7 @@ class TKqpLiteralExecuter {
<< message);
};

auto taskRunner = MakeDqTaskRunner(context, settings, log);
auto taskRunner = MakeDqTaskRunner(alloc, context, settings, log);
TaskRunners.emplace_back(taskRunner);

auto taskSettings = NDq::TDqTaskSettings(&protoTask);
Expand All @@ -228,7 +227,7 @@ class TKqpLiteralExecuter {
auto status = taskRunner->Run();
YQL_ENSURE(status == ERunStatus::Finished);

with_lock (*context.Alloc) { // allocator is used only by outputChannel->PopAll()
with_lock (alloc) { // allocator is used only by outputChannel->PopAll()
for (auto& taskOutput : task.Outputs) {
for (ui64 outputChannelId : taskOutput.Channels) {
auto outputChannel = taskRunner->GetOutputChannel(outputChannelId);
Expand Down
13 changes: 7 additions & 6 deletions ydb/core/kqp/runtime/kqp_tasks_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ IDqOutputConsumer::TPtr KqpBuildOutputConsumer(const NDqProto::TTaskOutput& outp


TKqpTasksRunner::TKqpTasksRunner(google::protobuf::RepeatedPtrField<NDqProto::TDqTask>&& tasks,
NKikimr::NMiniKQL::TScopedAlloc& alloc,
const TDqTaskRunnerContext& execCtx, const TDqTaskRunnerSettings& settings, const TLogFunc& logFunc)
: LogFunc(logFunc)
, Alloc(execCtx.Alloc)
, Alloc(alloc)
{
YQL_ENSURE(execCtx.Alloc);
YQL_ENSURE(execCtx.TypeEnv);

ApplyCtx = dynamic_cast<NMiniKQL::TKqpDatashardApplyContext *>(execCtx.ApplyCtx);
Expand All @@ -86,7 +86,7 @@ TKqpTasksRunner::TKqpTasksRunner(google::protobuf::RepeatedPtrField<NDqProto::TD
try {
for (auto&& task : tasks) {
ui64 taskId = task.GetId();
auto runner = MakeDqTaskRunner(execCtx, settings, logFunc);
auto runner = MakeDqTaskRunner(alloc, execCtx, settings, logFunc);
if (auto* stats = runner->GetStats()) {
Stats.emplace(taskId, stats);
}
Expand Down Expand Up @@ -230,15 +230,16 @@ const NYql::NDq::TDqTaskSettings& TKqpTasksRunner::GetTask(ui64 taskId) const {

TGuard<NMiniKQL::TScopedAlloc> TKqpTasksRunner::BindAllocator(TMaybe<ui64> memoryLimit) {
if (memoryLimit) {
Alloc->SetLimit(*memoryLimit);
Alloc.SetLimit(*memoryLimit);
}
return TGuard(*Alloc);
return TGuard(Alloc);
}

TIntrusivePtr<TKqpTasksRunner> CreateKqpTasksRunner(google::protobuf::RepeatedPtrField<NDqProto::TDqTask>&& tasks,
NKikimr::NMiniKQL::TScopedAlloc& alloc,
const TDqTaskRunnerContext& execCtx, const TDqTaskRunnerSettings& settings, const TLogFunc& logFunc)
{
return new TKqpTasksRunner(std::move(tasks), execCtx, settings, logFunc);
return new TKqpTasksRunner(std::move(tasks), alloc, execCtx, settings, logFunc);
}

} // namespace NKqp
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/kqp/runtime/kqp_tasks_runner.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ NYql::NDq::IDqOutputConsumer::TPtr KqpBuildOutputConsumer(const NYql::NDqProto::
class TKqpTasksRunner : public TSimpleRefCount<TKqpTasksRunner>, private TNonCopyable {
public:
TKqpTasksRunner(google::protobuf::RepeatedPtrField<NYql::NDqProto::TDqTask>&& tasks,
NKikimr::NMiniKQL::TScopedAlloc& alloc,
const NYql::NDq::TDqTaskRunnerContext& execCtx, const NYql::NDq::TDqTaskRunnerSettings& settings,
const NYql::NDq::TLogFunc& logFunc);

Expand Down Expand Up @@ -50,15 +51,15 @@ class TKqpTasksRunner : public TSimpleRefCount<TKqpTasksRunner>, private TNonCop
// otherwise use particular memory limit
TGuard<NMiniKQL::TScopedAlloc> BindAllocator(TMaybe<ui64> memoryLimit = Nothing());

ui64 GetAllocatedMemory() const { return Alloc->GetAllocated(); }
ui64 GetAllocatedMemory() const { return Alloc.GetAllocated(); }

const TMap<ui64, const NYql::NDq::TDqTaskRunnerStats*> GetTasksStats() const { return Stats; }
private:
TMap<ui64, TIntrusivePtr<NYql::NDq::IDqTaskRunner>> TaskRunners;
TMap<ui64, NYql::NDq::TDqTaskSettings> Tasks;
TMap<ui64, const NYql::NDq::TDqTaskRunnerStats*> Stats;
NYql::NDq::TLogFunc LogFunc;
NMiniKQL::TScopedAlloc* Alloc;
NMiniKQL::TScopedAlloc& Alloc;
NMiniKQL::TKqpComputeContextBase* ComputeCtx;
NMiniKQL::TKqpDatashardApplyContext* ApplyCtx;

Expand All @@ -72,6 +73,7 @@ class TKqpTasksRunner : public TSimpleRefCount<TKqpTasksRunner>, private TNonCop


TIntrusivePtr<TKqpTasksRunner> CreateKqpTasksRunner(google::protobuf::RepeatedPtrField<NYql::NDqProto::TDqTask>&& tasks,
NKikimr::NMiniKQL::TScopedAlloc& alloc,
const NYql::NDq::TDqTaskRunnerContext& execCtx, const NYql::NDq::TDqTaskRunnerSettings& settings,
const NYql::NDq::TLogFunc& logFunc);

Expand Down
5 changes: 2 additions & 3 deletions ydb/core/tx/datashard/datashard__engine_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1075,7 +1075,6 @@ TEngineBay::TEngineBay(TDataShard * self, TTransactionContext& txc, const TActor
KqpExecCtx.RandomProvider = TAppData::RandomProvider.Get();
KqpExecCtx.TimeProvider = TAppData::TimeProvider.Get();
KqpExecCtx.ApplyCtx = KqpApplyCtx.Get();
KqpExecCtx.Alloc = KqpAlloc.Get();
KqpExecCtx.TypeEnv = KqpTypeEnv.Get();
if (auto rm = NKqp::TryGetKqpResourceManager()) {
KqpExecCtx.PatternCache = rm->GetPatternCache();
Expand Down Expand Up @@ -1238,9 +1237,9 @@ NKqp::TKqpTasksRunner& TEngineBay::GetKqpTasksRunner(NKikimrTxDataShard::TKqpTra

settings.OptLLVM = "OFF";
settings.TerminateOnError = false;

Y_ABORT_UNLESS(KqpAlloc);
KqpAlloc->SetLimit(10_MB);
KqpTasksRunner = NKqp::CreateKqpTasksRunner(std::move(*tx.MutableTasks()), KqpExecCtx, settings, KqpLogFunc);
KqpTasksRunner = NKqp::CreateKqpTasksRunner(std::move(*tx.MutableTasks()), *KqpAlloc.Get(), KqpExecCtx, settings, KqpLogFunc);
}

return *KqpTasksRunner;
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class TDqComputeActor : public TDqComputeActorBase<TDqComputeActor> {
};
}

auto taskRunner = TaskRunnerFactory(Task, RuntimeSettings.StatsMode, logger);
auto taskRunner = TaskRunnerFactory(GetAllocator(), Task, RuntimeSettings.StatsMode, logger);
SetTaskRunner(taskRunner);
auto wakeup = [this]{ ContinueExecute(EResumeSource::CABootstrapWakeup); };
TDqTaskRunnerExecutionContext execCtx(TxId, RuntimeSettings.UseSpilling, std::move(wakeup));
Expand Down
5 changes: 4 additions & 1 deletion ydb/library/yql/dq/actors/compute/dq_compute_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,11 @@ struct TComputeMemoryLimits {
IMemoryQuotaManager::TPtr MemoryQuotaManager;
};

//temporary flag to integarate changes in interface
#define Y_YQL_DQ_TASK_RUNNER_REQUIRES_ALLOCATOR 1

using TTaskRunnerFactory = std::function<
TIntrusivePtr<IDqTaskRunner>(const TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const TLogFunc& logFunc)
TIntrusivePtr<IDqTaskRunner>(NKikimr::NMiniKQL::TScopedAlloc& alloc, const TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const TLogFunc& logFunc)
>;

void FillAsyncStats(NDqProto::TDqAsyncBufferStats& proto, TDqAsyncStats stats);
Expand Down
21 changes: 16 additions & 5 deletions ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,12 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
, Running(!Task.GetCreateSuspended())
, PassExceptions(passExceptions)
{
Alloc = std::make_shared<NKikimr::NMiniKQL::TScopedAlloc>(
__LOCATION__,
NKikimr::TAlignedPagePoolCounters(),
FunctionRegistry->SupportsSizedAllocators(),
false
);
InitMonCounters(taskCounters);
InitializeTask();
if (ownMemoryQuota) {
Expand Down Expand Up @@ -626,8 +632,8 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
void InternalError(NYql::NDqProto::StatusIds::StatusCode statusCode, TIssues issues) {
CA_LOG_E(InternalErrorLogString(statusCode, issues));
if (TaskRunner) {
TaskRunner->GetAllocatorPtr()->InvalidateMemInfo();
TaskRunner->GetAllocatorPtr()->DisableStrictAllocationCheck();
TaskRunner->GetAllocator().InvalidateMemInfo();
TaskRunner->GetAllocator().DisableStrictAllocationCheck();
}
State = NDqProto::COMPUTE_STATE_FAILURE;
ReportStateAndMaybeDie(statusCode, issues);
Expand Down Expand Up @@ -1365,6 +1371,10 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
}
}

protected:
NKikimr::NMiniKQL::TScopedAlloc& GetAllocator() {
return *Alloc.get();
}
private:
virtual const TDqMemoryQuota::TProfileStats* GetMemoryProfileStats() const {
Y_ABORT_UNLESS(MemoryQuota);
Expand Down Expand Up @@ -1586,7 +1596,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
.TypeEnv = typeEnv,
.HolderFactory = holderFactory,
.TaskCounters = TaskCounters,
.Alloc = TaskRunner ? TaskRunner->GetAllocatorPtr() : nullptr,
.Alloc = TaskRunner ? Alloc : nullptr,
.MemoryQuotaManager = MemoryLimits.MemoryQuotaManager,
.SourceSettings = (!settings.empty() ? settings.at(inputIndex) : nullptr),
.Arena = Task.GetArena(),
Expand Down Expand Up @@ -1619,7 +1629,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
.TypeEnv = typeEnv,
.HolderFactory = holderFactory,
.ProgramBuilder = *transform.ProgramBuilder,
.Alloc = TaskRunner->GetAllocatorPtr(),
.Alloc = Alloc,
.TraceId = ComputeActorSpan.GetTraceId()
});
} catch (const std::exception& ex) {
Expand Down Expand Up @@ -2222,7 +2232,8 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>

LastSendStatsTime = now;
}

private:
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc; //must be declared on top to be destroyed after all the rest
protected:
const NActors::TActorId ExecuterId;
const TTxId TxId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ struct ITaskRunnerActorFactory {
THolder<NYql::NDq::TDqMemoryQuota>&& memoryQuota = {}) = 0;
};

ITaskRunnerActorFactory::TPtr CreateLocalTaskRunnerActorFactory(const TTaskRunnerFactory& factory);
ITaskRunnerActorFactory::TPtr CreateLocalTaskRunnerActorFactory(const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry, const TTaskRunnerFactory& factory);

} // namespace NTaskRunnerActor

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,23 @@ class TLocalTaskRunnerActor
public:
static constexpr char ActorName[] = "YQL_DQ_TASK_RUNNER";

TLocalTaskRunnerActor(ITaskRunnerActor::ICallbacks* parent, const TTaskRunnerFactory& factory, const TTxId& txId, ui64 taskId, THashSet<ui32>&& inputChannelsWithDisabledCheckpoints, THolder<NYql::NDq::TDqMemoryQuota>&& memoryQuota)
TLocalTaskRunnerActor(ITaskRunnerActor::ICallbacks* parent, const TTaskRunnerFactory& factory, const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry, const TTxId& txId, ui64 taskId, THashSet<ui32>&& inputChannelsWithDisabledCheckpoints, THolder<NYql::NDq::TDqMemoryQuota>&& memoryQuota)
: TActor<TLocalTaskRunnerActor>(&TLocalTaskRunnerActor::Handler)
, FuncRegistry(funcRegistry)
, Parent(parent)
, Factory(factory)
, TxId(txId)
, TaskId(taskId)
, InputChannelsWithDisabledCheckpoints(std::move(inputChannelsWithDisabledCheckpoints))
, MemoryQuota(std::move(memoryQuota))
{ }
{
Alloc = std::make_unique<NKikimr::NMiniKQL::TScopedAlloc>(
__LOCATION__,
NKikimr::TAlignedPagePoolCounters(),
FuncRegistry.SupportsSizedAllocators(),
false
);
}

~TLocalTaskRunnerActor()
{ }
Expand Down Expand Up @@ -407,7 +415,7 @@ class TLocalTaskRunnerActor
void OnDqTask(TEvTaskRunnerCreate::TPtr& ev) {
ParentId = ev->Sender;
auto settings = NDq::TDqTaskSettings(&ev->Get()->Task);
TaskRunner = Factory(settings, ev->Get()->StatsMode, [this](const TString& message) {
TaskRunner = Factory(*Alloc.get(), settings, ev->Get()->StatsMode, [this](const TString& message) {
LOG_D(message);
});

Expand Down Expand Up @@ -463,6 +471,8 @@ class TLocalTaskRunnerActor
THolder<TEvDq::TEvAbortExecution> GetError(const TString& message) {
return MakeHolder<TEvDq::TEvAbortExecution>(NYql::NDqProto::StatusIds::BAD_REQUEST, TVector<TIssue>{TIssue(message).SetCode(TIssuesIds::DQ_GATEWAY_ERROR, TSeverityIds::S_ERROR)});
}
const NKikimr::NMiniKQL::IFunctionRegistry& FuncRegistry;
std::unique_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;

NActors::TActorId ParentId;
ITaskRunnerActor::ICallbacks* Parent;
Expand All @@ -477,8 +487,9 @@ class TLocalTaskRunnerActor
};

struct TLocalTaskRunnerActorFactory: public ITaskRunnerActorFactory {
TLocalTaskRunnerActorFactory(const TTaskRunnerFactory& factory)
TLocalTaskRunnerActorFactory(const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry, const TTaskRunnerFactory& factory)
: Factory(factory)
, FuncRegistry(funcRegistry)
{ }

std::tuple<ITaskRunnerActor*, NActors::IActor*> Create(
Expand All @@ -488,19 +499,20 @@ struct TLocalTaskRunnerActorFactory: public ITaskRunnerActorFactory {
THashSet<ui32>&& inputChannelsWithDisabledCheckpoints,
THolder<NYql::NDq::TDqMemoryQuota>&& memoryQuota) override
{
auto* actor = new TLocalTaskRunnerActor(parent, Factory, txId, taskId, std::move(inputChannelsWithDisabledCheckpoints), std::move(memoryQuota));
auto* actor = new TLocalTaskRunnerActor(parent, Factory, FuncRegistry, txId, taskId, std::move(inputChannelsWithDisabledCheckpoints), std::move(memoryQuota));
return std::make_tuple(
static_cast<ITaskRunnerActor*>(actor),
static_cast<NActors::IActor*>(actor)
);
}

TTaskRunnerFactory Factory;
const NKikimr::NMiniKQL::IFunctionRegistry& FuncRegistry;
};

ITaskRunnerActorFactory::TPtr CreateLocalTaskRunnerActorFactory(const TTaskRunnerFactory& factory)
ITaskRunnerActorFactory::TPtr CreateLocalTaskRunnerActorFactory(const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry, const TTaskRunnerFactory& factory)
{
return ITaskRunnerActorFactory::TPtr(new TLocalTaskRunnerActorFactory(factory));
return ITaskRunnerActorFactory::TPtr(new TLocalTaskRunnerActorFactory(funcRegistry, factory));
}

} // namespace NTaskRunnerActor
Expand Down
Loading
Loading