From 49ac954e1c668f2d7726b06ae1dadd2387081c21 Mon Sep 17 00:00:00 2001 From: Marina Pereskokova Date: Wed, 31 Jan 2024 13:12:05 +0300 Subject: [PATCH 1/3] Init yt dq jobs --- .../providers/dq/actors/yt/resource_manager.h | 3 + .../dq/actors/yt/yt_resource_manager.cpp | 2 + .../yql/tools/dq/worker_job/dq_worker.cpp | 20 ++++-- .../dq_vanilla_job.lite/dq_vanilla_job.lite | 1 + .../yql/yt/dq_vanilla_job.lite/main.cpp | 44 +++++++++++++ .../yql/yt/dq_vanilla_job.lite/ya.make | 30 +++++++++ ydb/library/yql/yt/dq_vanilla_job/main.cpp | 65 +++++++++++++++++++ ydb/library/yql/yt/dq_vanilla_job/ya.make | 26 ++++++++ 8 files changed, 187 insertions(+), 4 deletions(-) create mode 120000 ydb/library/yql/yt/dq_vanilla_job.lite/dq_vanilla_job.lite create mode 100644 ydb/library/yql/yt/dq_vanilla_job.lite/main.cpp create mode 100644 ydb/library/yql/yt/dq_vanilla_job.lite/ya.make create mode 100644 ydb/library/yql/yt/dq_vanilla_job/main.cpp create mode 100644 ydb/library/yql/yt/dq_vanilla_job/ya.make diff --git a/ydb/library/yql/providers/dq/actors/yt/resource_manager.h b/ydb/library/yql/providers/dq/actors/yt/resource_manager.h index 950d2e355679..8a2760081ad6 100644 --- a/ydb/library/yql/providers/dq/actors/yt/resource_manager.h +++ b/ydb/library/yql/providers/dq/actors/yt/resource_manager.h @@ -19,6 +19,7 @@ namespace NYql { extern const TString OPERATION_SIZE; extern const TString YT_COORDINATOR; extern const TString YT_BACKEND; + extern const TString YT_FORCE_IPV4; } class ICoordinationHelper; @@ -68,6 +69,8 @@ namespace NYql { int Capabilities = 0; int MaxRetries = -1; + bool ForceIPv4 = false; + // Pinger TString DieOnFileAbsence; // see YQL-14099 diff --git a/ydb/library/yql/providers/dq/actors/yt/yt_resource_manager.cpp b/ydb/library/yql/providers/dq/actors/yt/yt_resource_manager.cpp index 37e401aa90b1..cc254f9b9643 100644 --- a/ydb/library/yql/providers/dq/actors/yt/yt_resource_manager.cpp +++ b/ydb/library/yql/providers/dq/actors/yt/yt_resource_manager.cpp @@ -33,6 +33,7 @@ namespace NYql { const TString OPERATION_SIZE("OPERATION_SIZE"); const TString YT_COORDINATOR("YT_COORDINATOR"); const TString YT_BACKEND("YT_BACKEND"); + const TString YT_FORCE_IPV4("YT_FORCE_IPV4"); } using namespace NActors; @@ -600,6 +601,7 @@ namespace NYql { .BeginMap() .Item(NCommonJobVars::YT_COORDINATOR).Value(coordinatorStr) .Item(NCommonJobVars::YT_BACKEND).Value(backendStr) + .Item(NCommonJobVars::YT_FORCE_IPV4).Value(Options.ForceIPv4) .DoFor(Options.YtBackend.GetVaultEnv(), [&] (NYT::TFluentMap fluent, const NYql::NProto::TDqConfig::TAttr& envVar) { // Добавляем env variables TString tokenValue; try { diff --git a/ydb/library/yql/tools/dq/worker_job/dq_worker.cpp b/ydb/library/yql/tools/dq/worker_job/dq_worker.cpp index ea3ebe2a4473..b6b799c82404 100644 --- a/ydb/library/yql/tools/dq/worker_job/dq_worker.cpp +++ b/ydb/library/yql/tools/dq/worker_job/dq_worker.cpp @@ -23,6 +23,8 @@ #include #include #include +#include +#include #include @@ -30,6 +32,7 @@ #include #include #include +#include using namespace NYql::NDqs; @@ -187,11 +190,20 @@ namespace NYql::NDq::NWorker { TRangeWalker portWalker(startPort, startPort+100); auto ports = BindInRange(portWalker); + auto forceIPv4 = IsTrue(GetEnv(TString("YT_SECURE_VAULT_") + NCommonJobVars::YT_FORCE_IPV4, "")); + if (forceIPv4) { + auto config = NYT::New(); + config->EnableIPv4 = true; + config->EnableIPv6 = false; + NYT::NNet::TAddressResolver::Get()->Configure(config); + } + auto [host, ip] = NYql::NDqs::GetLocalAddress( - coordinatorConfig.HasHostName() ? &coordinatorConfig.GetHostName() : nullptr + coordinatorConfig.HasHostName() ? &coordinatorConfig.GetHostName() : nullptr, + forceIPv4 ? AF_INET : AF_INET6 ); - auto coordinator = CreateCoordiantionHelper(coordinatorConfig, NProto::TDqConfig::TScheduler(), "worker_node", ports[1].Addr.GetPort(), host, ip); + auto coordinator = CreateCoordiantionHelper(coordinatorConfig, NProto::TDqConfig::TScheduler(), "worker_node", ports[forceIPv4 ? 0 : 1].Addr.GetPort(), host, ip); i64 cacheSize = backendConfig.HasCacheSize() ? backendConfig.GetCacheSize() : 16000000000L; @@ -280,8 +292,8 @@ namespace NYql::NDq::NWorker { std::tie(setup, logSettings) = BuildActorSetup( nodeId, ip, - ports[1].Addr.GetPort(), - ports[1].Socket->Release(), + ports[forceIPv4 ? 0 : 1].Addr.GetPort(), + ports[forceIPv4 ? 0 : 1].Socket->Release(), {}, dqSensors, [](const TIntrusivePtr& setup) { diff --git a/ydb/library/yql/yt/dq_vanilla_job.lite/dq_vanilla_job.lite b/ydb/library/yql/yt/dq_vanilla_job.lite/dq_vanilla_job.lite new file mode 120000 index 000000000000..d2866c359fb3 --- /dev/null +++ b/ydb/library/yql/yt/dq_vanilla_job.lite/dq_vanilla_job.lite @@ -0,0 +1 @@ +/home/mpereskokova/.ya/build/symres/d31db772576c8ac9811f9ef2500cbd3a/dq_vanilla_job.lite \ No newline at end of file diff --git a/ydb/library/yql/yt/dq_vanilla_job.lite/main.cpp b/ydb/library/yql/yt/dq_vanilla_job.lite/main.cpp new file mode 100644 index 000000000000..2e7fd6ddba84 --- /dev/null +++ b/ydb/library/yql/yt/dq_vanilla_job.lite/main.cpp @@ -0,0 +1,44 @@ +#include + +#include +#include +#include + +#include + +#include +#include +#include + +#include +#include +#include + +#include + +#include +#include + +using namespace NYql; + +int main() { + NBacktrace::RegisterKikimrFatalActions(); + if (!NYT::MlockFileMappings()) { + Cerr << "mlockall failed, but that's fine" << Endl; + } + + NKikimr::NMiniKQL::IStatsRegistryPtr statsRegistry = NKikimr::NMiniKQL::CreateDefaultStatsRegistry(); + + auto dqCompFactory = NKikimr::NMiniKQL::GetCompositeWithBuiltinFactory({ + GetCommonDqFactory(), + GetDqYtFactory(statsRegistry.Get()), + NKikimr::NMiniKQL::GetYqlFactory(), + }); + + auto dqTaskTransformFactory = NYql::CreateCompositeTaskTransformFactory({ + CreateCommonDqTaskTransformFactory(), + CreateYtDqTaskTransformFactory(), + }); + + return NTaskRunnerProxy::CreateTaskCommandExecutor(dqCompFactory, dqTaskTransformFactory, statsRegistry.Get(), true); +} diff --git a/ydb/library/yql/yt/dq_vanilla_job.lite/ya.make b/ydb/library/yql/yt/dq_vanilla_job.lite/ya.make new file mode 100644 index 000000000000..c149282d6741 --- /dev/null +++ b/ydb/library/yql/yt/dq_vanilla_job.lite/ya.make @@ -0,0 +1,30 @@ +PROGRAM() + +PEERDIR( + library/cpp/yt/mlock + yt/cpp/mapreduce/client + ydb/library/yql/minikql/comp_nodes/llvm14 + ydb/library/yql/public/udf/service/terminate_policy + ydb/library/yql/utils/backtrace + ydb/library/yql/dq/comp_nodes + ydb/library/yql/dq/integration/transform + ydb/library/yql/dq/transform + ydb/library/yql/dq/runtime + ydb/library/yql/providers/common/comp_nodes + ydb/library/yql/providers/dq/common + ydb/library/yql/providers/dq/runtime + ydb/library/yql/providers/yt/comp_nodes/dq + ydb/library/yql/providers/yt/mkql_dq + ydb/library/yql/providers/yt/codec/codegen + ydb/library/yql/providers/yt/comp_nodes/llvm14 + ydb/library/yql/sql/pg + ydb/library/yql/parser/pg_wrapper +) + +YQL_LAST_ABI_VERSION() + +SRCS( + main.cpp +) + +END() diff --git a/ydb/library/yql/yt/dq_vanilla_job/main.cpp b/ydb/library/yql/yt/dq_vanilla_job/main.cpp new file mode 100644 index 000000000000..eb5fb6951dbd --- /dev/null +++ b/ydb/library/yql/yt/dq_vanilla_job/main.cpp @@ -0,0 +1,65 @@ +#include +#include + +#include +#include + +#include + +#include +#include +#include + +#include +#include +#include + +#include + +#include + +using namespace NYql; + +int main(int argc, const char* argv[]) { + NBacktrace::RegisterKikimrFatalActions(); + NBacktrace::EnableKikimrSymbolize(); + + if (!NYT::MlockFileMappings()) { + Cerr << "mlockall failed, but that's fine" << Endl; + } + + if (argc > 1) { + if (!strcmp(argv[1], "-V")) { + Cerr << ToString(GetProgramCommitId()) << Endl; + return 0; + } else if (!strcmp(argv[1], "tasks_runner_proxy")) { + NKikimr::NMiniKQL::IStatsRegistryPtr statsRegistry = NKikimr::NMiniKQL::CreateDefaultStatsRegistry(); + + auto dqCompFactory = NKikimr::NMiniKQL::GetCompositeWithBuiltinFactory({ + GetCommonDqFactory(), + GetDqYtFactory(statsRegistry.Get()), + NKikimr::NMiniKQL::GetYqlFactory(), + }); + + auto dqTaskTransformFactory = CreateCompositeTaskTransformFactory({ + CreateCommonDqTaskTransformFactory(), + CreateYtDqTaskTransformFactory(), + }); + + return NTaskRunnerProxy::CreateTaskCommandExecutor(dqCompFactory, dqTaskTransformFactory, statsRegistry.Get(), true); + } + } + + try { + NYT::Initialize(argc, argv); + + auto job = new NDq::NWorker::TWorkerJob(); + + job->Do(); + } catch (...) { + Cerr << CurrentExceptionMessage(); + return -1; + } + + return 0; +} diff --git a/ydb/library/yql/yt/dq_vanilla_job/ya.make b/ydb/library/yql/yt/dq_vanilla_job/ya.make new file mode 100644 index 000000000000..4dc08fb2e835 --- /dev/null +++ b/ydb/library/yql/yt/dq_vanilla_job/ya.make @@ -0,0 +1,26 @@ +PROGRAM() + +PEERDIR( + library/cpp/svnversion + library/cpp/yt/mlock + ydb/library/yql/dq/comp_nodes + ydb/library/yql/dq/integration/transform + ydb/library/yql/dq/transform + ydb/library/yql/providers/common/comp_nodes + ydb/library/yql/providers/yt/codec/codegen + ydb/library/yql/providers/yt/comp_nodes/llvm14 + ydb/library/yql/utils/backtrace + ydb/library/yql/providers/yt/comp_nodes/dq + ydb/library/yql/providers/yt/mkql_dq + ydb/library/yql/tools/dq/worker_job + ydb/library/yql/sql/pg + ydb/library/yql/parser/pg_wrapper +) + +YQL_LAST_ABI_VERSION() + +SRCS( + main.cpp +) + +END() From a8af084caf85acaeaf8d29bd75f5fc12a9515370 Mon Sep 17 00:00:00 2001 From: Marina Pereskokova Date: Wed, 31 Jan 2024 13:14:40 +0300 Subject: [PATCH 2/3] Remove binary --- ydb/library/yql/yt/dq_vanilla_job.lite/dq_vanilla_job.lite | 1 - 1 file changed, 1 deletion(-) delete mode 120000 ydb/library/yql/yt/dq_vanilla_job.lite/dq_vanilla_job.lite diff --git a/ydb/library/yql/yt/dq_vanilla_job.lite/dq_vanilla_job.lite b/ydb/library/yql/yt/dq_vanilla_job.lite/dq_vanilla_job.lite deleted file mode 120000 index d2866c359fb3..000000000000 --- a/ydb/library/yql/yt/dq_vanilla_job.lite/dq_vanilla_job.lite +++ /dev/null @@ -1 +0,0 @@ -/home/mpereskokova/.ya/build/symres/d31db772576c8ac9811f9ef2500cbd3a/dq_vanilla_job.lite \ No newline at end of file From 7a52ced506eae82c641fad0ad6211231f8894ecc Mon Sep 17 00:00:00 2001 From: Marina Pereskokova Date: Thu, 1 Feb 2024 11:37:15 +0300 Subject: [PATCH 3/3] Add jobs in ya make --- ydb/library/yql/yt/ya.make | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ydb/library/yql/yt/ya.make b/ydb/library/yql/yt/ya.make index 4aeb5b9bc60c..abd7fe1a9519 100644 --- a/ydb/library/yql/yt/ya.make +++ b/ydb/library/yql/yt/ya.make @@ -9,5 +9,7 @@ END() RECURSE( dynamic native + dq_vanilla_job + dq_vanilla_job.lite )