diff --git a/ydb/core/config/init/init.cpp b/ydb/core/config/init/init.cpp new file mode 100644 index 000000000000..862b8a4d1ea2 --- /dev/null +++ b/ydb/core/config/init/init.cpp @@ -0,0 +1,298 @@ +#include "init.h" +#include "init_impl.h" + +namespace NKikimr::NConfig { + +class TDefaultEnv + : public IEnv +{ +public: + TString HostName() const override { + return ::HostName(); + } + + TString FQDNHostName() const override { + return ::FQDNHostName(); + } + + TString ReadFromFile(const TString& filePath, const TString& fileName, bool allowEmpty) const override { + return ::ReadFromFile(filePath, fileName, allowEmpty); + } + + void Sleep(const TDuration& dur) const override { + ::Sleep(dur); + } +}; + +class TDefaultErrorCollector + : public IErrorCollector +{ +public: + void Fatal(TString error) override { + Cerr << error << Endl; + } +}; + +struct TFileConfigOptions { + TString Description; + TMaybe ParsedOption; +}; + +class TDefaultProtoConfigFileProvider + : public IProtoConfigFileProvider +{ +private: + TMap> Opts; + + static bool IsFileExists(const fs::path& p) { + std::error_code ec; + return fs::exists(p, ec) && !ec; + } + + static bool IsFileReadable(const fs::path& p) { + std::error_code ec; // For noexcept overload usage. + auto perms = fs::status(p, ec).permissions(); + if ((perms & fs::perms::owner_read) != fs::perms::none && + (perms & fs::perms::group_read) != fs::perms::none && + (perms & fs::perms::others_read) != fs::perms::none + ) + { + return true; + } + return false; + } +public: + TDefaultProtoConfigFileProvider() { + AddProtoConfigOptions(*this); + } + + void AddConfigFile(TString optName, TString description) override { + Opts.emplace(optName, MakeSimpleShared(TFileConfigOptions{.Description = description})); + } + + void RegisterCliOptions(NLastGetopt::TOpts& opts) const override { + for (const auto& [name, opt] : Opts) { + opts.AddLongOption(name, opt->Description).OptionalArgument("PATH").StoreResult(&opt->ParsedOption); + } + } + + TString GetProtoFromFile(const TString& path, IErrorCollector& errorCollector) const override { + fs::path filePath(path.c_str()); + if (!IsFileExists(filePath)) { + errorCollector.Fatal(Sprintf("File %s doesn't exists", path.c_str())); + return {}; + } + if (!IsFileReadable(filePath)) { + errorCollector.Fatal(Sprintf("File %s isn't readable", path.c_str())); + return {}; + } + TAutoPtr fileInput(new TMappedFileInput(path)); + return fileInput->ReadAll(); + } + + bool Has(TString optName) override { + if (auto* opt = Opts.FindPtr(optName)) { + return !!((*opt)->ParsedOption); + } + return false; + } + + TString Get(TString optName) override { + if (auto* opt = Opts.FindPtr(optName); opt && (*opt)->ParsedOption) { + return (*opt)->ParsedOption.GetRef(); + } + return ""; // FIXME: throw + } +}; + +class TDefaultConfigUpdateTracer + : public IConfigUpdateTracer +{ +private: + THashMap ConfigInitInfo; + +public: + void Add(ui32 kind, TConfigItemInfo::TUpdate update) override { + ConfigInitInfo[kind].Updates.emplace_back(update); + } + + THashMap Dump() const override { + return ConfigInitInfo; + } +}; + +std::unique_ptr MakeDefaultEnv() { + return std::make_unique(); +} + +std::unique_ptr MakeDefaultErrorCollector() { + return std::make_unique(); +} + +std::unique_ptr MakeDefaultProtoConfigFileProvider() { + return std::make_unique(); +} + +std::unique_ptr MakeDefaultConfigUpdateTracer() { + return std::make_unique(); +} + +void CopyNodeLocation(NActorsInterconnect::TNodeLocation* dst, const NYdb::NDiscovery::TNodeLocation& src) { + if (src.DataCenterNum) { + dst->SetDataCenterNum(src.DataCenterNum.value()); + } + if (src.RoomNum) { + dst->SetRoomNum(src.RoomNum.value()); + } + if (src.RackNum) { + dst->SetRackNum(src.RackNum.value()); + } + if (src.BodyNum) { + dst->SetBodyNum(src.BodyNum.value()); + } + if (src.Body) { + dst->SetBody(src.Body.value()); + } + if (src.DataCenter) { + dst->SetDataCenter(src.DataCenter.value()); + } + if (src.Module) { + dst->SetModule(src.Module.value()); + } + if (src.Rack) { + dst->SetRack(src.Rack.value()); + } + if (src.Unit) { + dst->SetUnit(src.Unit.value()); + } +} + +void CopyNodeLocation(NYdb::NDiscovery::TNodeLocation* dst, const NActorsInterconnect::TNodeLocation& src) { + if (src.HasDataCenterNum()) { + dst->DataCenterNum = src.GetDataCenterNum(); + } + if (src.HasRoomNum()) { + dst->RoomNum = src.GetRoomNum(); + } + if (src.HasRackNum()) { + dst->RackNum = src.GetRackNum(); + } + if (src.HasBodyNum()) { + dst->BodyNum = src.GetBodyNum(); + } + if (src.HasBody()) { + dst->Body = src.GetBody(); + } + if (src.HasDataCenter()) { + dst->DataCenter = src.GetDataCenter(); + } + if (src.HasModule()) { + dst->Module = src.GetModule(); + } + if (src.HasRack()) { + dst->Rack = src.GetRack(); + } + if (src.HasUnit()) { + dst->Unit = src.GetUnit(); + } +} + +void AddProtoConfigOptions(IProtoConfigFileProvider& out) { + const TMap opts = { + {"alloc-file", "Allocator config file"}, + {"audit-file", "File with audit config"}, + {"auth-file", "authorization configuration"}, + {"auth-token-file", "authorization token configuration"}, + {"bootstrap-file", "Bootstrap config file"}, + {"bs-file", "blobstorage config file"}, + {"channels-file", "tablet channel profile config file"}, + {"cms-file", "CMS config file"}, + {"domains-file", "domain config file"}, + {"drivemodel-file", "drive model config file"}, + {"dyn-nodes-file", "Dynamic nodes config file"}, + {"feature-flags-file", "File with feature flags to turn new features on/off"}, + {"fq-file", "Federated Query config file"}, + {"grpc-file", "gRPC config file"}, + {"http-proxy-file", "Http proxy config file"}, + {"ic-file", "interconnect config file"}, + {"incrhuge-file", "incremental huge blob keeper config file"}, + {"key-file", "tenant encryption key configuration"}, + {"kqp-file", "Kikimr Query Processor config file"}, + {"log-file", "log config file"}, + {"memorylog-file", "set buffer size for memory log"}, + {"metering-file", "File with metering config"}, + {"naming-file", "static nameservice config file"}, + {"netclassifier-file", "NetClassifier config file"}, + {"pdisk-key-file", "pdisk encryption key configuration"}, + {"pq-file", "PersQueue config file"}, + {"pqcd-file", "PersQueue cluster discovery config file"}, + {"public-http-file", "Public HTTP config file"}, + {"rb-file", "File with resource broker customizations"}, + {"sqs-file", "SQS config file"}, + {"sys-file", "actor system config file (use dummy config by default)"}, + {"vdisk-file", "vdisk kind config file"}, + }; + + for (const auto& [opt, desc] : opts) { + out.AddConfigFile(opt, desc); + } +} + +void LoadBootstrapConfig(IProtoConfigFileProvider& protoConfigFileProvider, IErrorCollector& errorCollector, TVector configFiles, NKikimrConfig::TAppConfig& out) { + for (const TString& path : configFiles) { + NKikimrConfig::TAppConfig parsedConfig; + const TString protoString = protoConfigFileProvider.GetProtoFromFile(path, errorCollector); + /* + * FIXME: if (ErrorCollector.HasFatal()) { return; } + */ + const bool result = ParsePBFromString(protoString, &parsedConfig); + if (!result) { + errorCollector.Fatal(Sprintf("Can't parse protobuf: %s", path.c_str())); + return; + } + out.MergeFrom(parsedConfig); + } +} + +void LoadYamlConfig(TConfigRefs refs, const TString& yamlConfigFile, NKikimrConfig::TAppConfig& appConfig, TCallContext callCtx) { + if (!yamlConfigFile) { + return; + } + + IConfigUpdateTracer& ConfigUpdateTracer = refs.Tracer; + IErrorCollector& errorCollector = refs.ErrorCollector; + IProtoConfigFileProvider& protoConfigFileProvider = refs.ProtoConfigFileProvider; + + const TString yamlConfigString = protoConfigFileProvider.GetProtoFromFile(yamlConfigFile, errorCollector); + /* + * FIXME: if (ErrorCollector.HasFatal()) { return; } + */ + NKikimrConfig::TAppConfig parsedConfig = NKikimr::NYaml::Parse(yamlConfigString); // FIXME + /* + * FIXME: if (ErrorCollector.HasFatal()) { return; } + */ + const google::protobuf::Descriptor* descriptor = appConfig.GetDescriptor(); + const google::protobuf::Reflection* reflection = appConfig.GetReflection(); + for(int fieldIdx = 0; fieldIdx < descriptor->field_count(); ++fieldIdx) { + const google::protobuf::FieldDescriptor* fieldDescriptor = descriptor->field(fieldIdx); + if (!fieldDescriptor) { + continue; + } + + if (fieldDescriptor->is_repeated()) { + continue; + } + + if (reflection->HasField(appConfig, fieldDescriptor)) { + // field is already set in app config + continue; + } + + if (reflection->HasField(parsedConfig, fieldDescriptor)) { + reflection->SwapFields(&appConfig, &parsedConfig, {fieldDescriptor}); + TRACE_CONFIG_CHANGE(callCtx, fieldIdx, ReplaceConfigWithConsoleProto); + } + } +} + +} // namespace NKikimr::NConfig diff --git a/ydb/core/config/init/init.h b/ydb/core/config/init/init.h new file mode 100644 index 000000000000..ded9fa497c00 --- /dev/null +++ b/ydb/core/config/init/init.h @@ -0,0 +1,92 @@ +#pragma once + +#include + +#include + +#include +#include +#include +#include + +#include + +namespace NKikimr::NConfig { + +class IEnv { +public: + virtual ~IEnv() {} + virtual TString HostName() const = 0; + virtual TString FQDNHostName() const = 0; + virtual TString ReadFromFile(const TString& filePath, const TString& fileName, bool allowEmpty = true) const = 0; + virtual void Sleep(const TDuration& dur) const = 0; +}; + +class IErrorCollector { +public: + virtual ~IErrorCollector() {} + virtual void Fatal(TString error) = 0; +}; + +class IProtoConfigFileProvider { +public: + virtual ~IProtoConfigFileProvider() {} + virtual void AddConfigFile(TString optName, TString description) = 0; + virtual void RegisterCliOptions(NLastGetopt::TOpts& opts) const = 0; + virtual TString GetProtoFromFile(const TString& path, IErrorCollector& errorCollector) const = 0; + virtual bool Has(TString optName) = 0; + virtual TString Get(TString optName) = 0; +}; + +class IConfigUpdateTracer { +public: + virtual ~IConfigUpdateTracer() {} + virtual void Add(ui32 kind, TConfigItemInfo::TUpdate) = 0; + virtual THashMap Dump() const = 0; +}; + +class IInitialConfigurator { +public: + virtual ~IInitialConfigurator() {}; + virtual void RegisterCliOptions(NLastGetopt::TOpts& opts) = 0; + virtual void ValidateOptions(const NLastGetopt::TOpts& opts, const NLastGetopt::TOptsParseResult& parseResult) = 0; + virtual void Parse(const TVector& freeArgs) = 0; +}; + +std::unique_ptr MakeDefaultConfigUpdateTracer(); +std::unique_ptr MakeDefaultProtoConfigFileProvider(); +std::unique_ptr MakeDefaultEnv(); +std::unique_ptr MakeDefaultErrorCollector(); +std::unique_ptr MakeDefaultInitialConfigurator( + NConfig::IErrorCollector& errorCollector, + NConfig::IProtoConfigFileProvider& protoConfigFileProvider, + NConfig::IConfigUpdateTracer& configUpdateTracer, + NConfig::IEnv& env); + +class TInitialConfigurator { +public: + TInitialConfigurator( + NConfig::IErrorCollector& errorCollector, + NConfig::IProtoConfigFileProvider& protoConfigFileProvider, + NConfig::IConfigUpdateTracer& configUpdateTracer, + NConfig::IEnv& env) + : Impl(MakeDefaultInitialConfigurator(errorCollector, protoConfigFileProvider, configUpdateTracer, env)) + {} + + void RegisterCliOptions(NLastGetopt::TOpts& opts) { + Impl->RegisterCliOptions(opts); + } + + void ValidateOptions(const NLastGetopt::TOpts& opts, const NLastGetopt::TOptsParseResult& parseResult) { + Impl->ValidateOptions(opts, parseResult); + } + + void Parse(const TVector& freeArgs) { + Impl->Parse(freeArgs); + } + +private: + std::unique_ptr Impl; +}; + +} // namespace NKikimr::NConfig diff --git a/ydb/core/config/init/init_impl.h b/ydb/core/config/init/init_impl.h new file mode 100644 index 000000000000..de7090bcf1c1 --- /dev/null +++ b/ydb/core/config/init/init_impl.h @@ -0,0 +1,1530 @@ +#pragma once + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace fs = std::filesystem; + +extern TAutoPtr DummyActorSystemConfig(); +extern TAutoPtr DummyAllocatorConfig(); + +using namespace NYdb::NConsoleClient; + +namespace NKikimr::NConfig { + +struct TCallContext { + const char* File; + int Line; +}; + +#define TRACE_CONFIG_CHANGE(CHANGE_CONTEXT, KIND, CHANGE_KIND) \ + ConfigUpdateTracer.Add(KIND, TConfigItemInfo::TUpdate{CHANGE_CONTEXT.File, static_cast(CHANGE_CONTEXT.Line), TConfigItemInfo::EUpdateKind:: CHANGE_KIND}) + +#define TRACE_CONFIG_CHANGE_INPLACE(KIND, CHANGE_KIND) \ + ConfigUpdateTracer.Add(KIND, TConfigItemInfo::TUpdate{__FILE__, static_cast(__LINE__), TConfigItemInfo::EUpdateKind:: CHANGE_KIND}) + +#define TRACE_CONFIG_CHANGE_INPLACE_T(KIND, CHANGE_KIND) \ + ConfigUpdateTracer.Add(NKikimrConsole::TConfigItem:: KIND ## Item, TConfigItemInfo::TUpdate{__FILE__, static_cast(__LINE__), TConfigItemInfo::EUpdateKind:: CHANGE_KIND}) + +#define CALL_CTX() ::NKikimr::NConfig::TCallContext{__FILE__, __LINE__} + + +constexpr TStringBuf NODE_KIND_YDB = "ydb"; +constexpr TStringBuf NODE_KIND_YQ = "yq"; + +constexpr static ui32 DefaultLogLevel = NActors::NLog::PRI_WARN; // log settings +constexpr static ui32 DefaultLogSamplingLevel = NActors::NLog::PRI_DEBUG; // log settings +constexpr static ui32 DefaultLogSamplingRate = 0; // log settings + +template +bool ParsePBFromString(const TString &content, T *pb, bool allowUnknown = false) { + if (!allowUnknown) { + return ::google::protobuf::TextFormat::ParseFromString(content, pb); + } + + ::google::protobuf::TextFormat::Parser parser; + parser.AllowUnknownField(true); + return parser.ParseFromString(content, pb); +} + +struct TConfigRefs { + IConfigUpdateTracer& Tracer; + IErrorCollector& ErrorCollector; + IProtoConfigFileProvider& ProtoConfigFileProvider; +}; + + +template +using TAccessors = std::tuple< + bool (NKikimrConfig::TAppConfig::*)() const, + const TProto& (NKikimrConfig::TAppConfig::*)() const, + TProto* (NKikimrConfig::TAppConfig::*)() + >; + +template +auto MutableConfigPart( + TConfigRefs refs, + const char *optname, + TFieldTag tag, + NKikimrConfig::TAppConfig& baseConfig, + NKikimrConfig::TAppConfig& appConfig, + TCallContext callCtx) -> decltype((appConfig.*std::get<2>(NKikimrConfig::TAppConfig::GetFieldAccessorsByFieldTag(tag)))()) +{ + auto [hasConfig, getConfig, mutableConfig] = NKikimrConfig::TAppConfig::GetFieldAccessorsByFieldTag(tag); + ui32 kind = NKikimrConfig::TAppConfig::GetFieldIdByFieldTag(tag); + + auto& ConfigUpdateTracer = refs.Tracer; + auto& errorCollector = refs.ErrorCollector; + auto& protoConfigFileProvider = refs.ProtoConfigFileProvider; + + if ((appConfig.*hasConfig)()) { + return nullptr; // this field is already provided in AppConfig, so we don't overwrite it + } + + if (optname && protoConfigFileProvider.Has(optname)) { + auto *res = (appConfig.*mutableConfig)(); + + TString path = protoConfigFileProvider.Get(optname); + const TString protoString = protoConfigFileProvider.GetProtoFromFile(path, errorCollector); + /* + * FIXME: if (ErrorCollector.HasFatal()) { return; } + */ + const bool result = ParsePBFromString(protoString, res); + if (!result) { + errorCollector.Fatal(Sprintf("Can't parse protobuf: %s", path.c_str())); + return nullptr; + } + + TRACE_CONFIG_CHANGE(callCtx, kind, MutableConfigPartFromFile); + + return res; + } else if ((baseConfig.*hasConfig)()) { + auto* res = (appConfig.*mutableConfig)(); + res->CopyFrom((baseConfig.*getConfig)()); + TRACE_CONFIG_CHANGE(callCtx, kind, MutableConfigPartFromBaseConfig); + return res; + } + + return nullptr; +} + +template +auto MutableConfigPartMerge( + TConfigRefs refs, + const char *optname, + TFieldTag tag, + NKikimrConfig::TAppConfig& appConfig, + TCallContext callCtx) -> decltype((appConfig.*std::get<2>(NKikimrConfig::TAppConfig::GetFieldAccessorsByFieldTag(tag)))()) +{ + auto mutableConfig = std::get<2>(NKikimrConfig::TAppConfig::GetFieldAccessorsByFieldTag(tag)); + ui32 kind = NKikimrConfig::TAppConfig::GetFieldIdByFieldTag(tag); + + auto& ConfigUpdateTracer = refs.Tracer; + auto& errorCollector = refs.ErrorCollector; + auto& protoConfigFileProvider = refs.ProtoConfigFileProvider; + + if (protoConfigFileProvider.Has(optname)) { + typename std::remove_reference::type cfg; + + TString path = protoConfigFileProvider.Get(optname); + const TString protoString = protoConfigFileProvider.GetProtoFromFile(path, errorCollector); + /* + * FIXME: if (ErrorCollector.HasFatal()) { return; } + */ + const bool result = ParsePBFromString(protoString, &cfg); + if (!result) { + errorCollector.Fatal(Sprintf("Can't parse protobuf: %s", path.c_str())); + return nullptr; + } + + auto *res = (appConfig.*mutableConfig)(); + res->MergeFrom(cfg); + TRACE_CONFIG_CHANGE(callCtx, kind, MutableConfigPartMergeFromFile); + return res; + } + + return nullptr; +} + +void AddProtoConfigOptions(IProtoConfigFileProvider& out); +void LoadBootstrapConfig(IProtoConfigFileProvider& protoConfigFileProvider, IErrorCollector& errorCollector, TVector configFiles, NKikimrConfig::TAppConfig& out); +void LoadYamlConfig(TConfigRefs refs, const TString& yamlConfigFile, NKikimrConfig::TAppConfig& appConfig, TCallContext callCtx); +void CopyNodeLocation(NActorsInterconnect::TNodeLocation* dst, const NYdb::NDiscovery::TNodeLocation& src); +void CopyNodeLocation(NYdb::NDiscovery::TNodeLocation* dst, const NActorsInterconnect::TNodeLocation& src); + +struct TConfigFields { + TMaybe LogLevel; // log settings + TMaybe LogSamplingLevel; // log settings + TMaybe LogSamplingRate; // log settings + TMaybe LogFormat;// log settings + TMaybe SysLogServiceTag; //unique tags for sys logs + TMaybe LogFileName; // log file name to initialize file log backend + TMaybe ClusterName; // log settings + + ui32 NodeId = 0; + TMaybe NodeIdValue; + ui32 DefaultInterconnectPort = 19001; + ui32 MonitoringPort = 0; + TString MonitoringAddress; + ui32 MonitoringThreads = 10; + TString MonitoringCertificateFile; + TString RestartsCountFile = ""; + size_t CompileInflightLimit = 100000; // MiniKQLCompileService + TString UDFsDir; + TVector UDFsPaths; + TMaybe TenantName; + TVector NodeBrokerAddresses; + ui32 NodeBrokerPort = 0; + bool NodeBrokerUseTls = false; + bool FixedNodeID = false; + ui32 InterconnectPort = 0; + bool IgnoreCmsConfigs = false; + bool TinyMode = false; + TString NodeAddress; + TString NodeHost; + TString NodeResolveHost; + TString NodeDomain; + ui32 SqsHttpPort = 0; + TString NodeKind = TString(NODE_KIND_YDB); + TMaybe NodeType; + TMaybe DataCenter; + TString Rack = ""; + ui32 Body = 0; + ui32 GRpcPort = 0; + ui32 GRpcsPort = 0; + TString GRpcPublicHost = ""; + ui32 GRpcPublicPort = 0; + ui32 GRpcsPublicPort = 0; + TVector GRpcPublicAddressesV4; + TVector GRpcPublicAddressesV6; + TString GRpcPublicTargetNameOverride = ""; + TString PathToGrpcCertFile; + TString PathToInterconnectCertFile; + TString PathToGrpcPrivateKeyFile; + TString PathToInterconnectPrivateKeyFile; + TString PathToGrpcCaFile; + TString PathToInterconnectCaFile; + TString YamlConfigFile; + bool SysLogEnabled = false; + bool TcpEnabled = false; + bool SuppressVersionCheck = false; + + void RegisterCliOptions(NLastGetopt::TOpts& opts) { + // FIXME remove default value where TMaybe used + opts.AddLongOption("cluster-name", "which cluster this node belongs to") + .DefaultValue("unknown").OptionalArgument("STR").StoreResult(&ClusterName); + opts.AddLongOption("log-level", "default logging level").OptionalArgument("1-7") + .DefaultValue(ToString(DefaultLogLevel)).StoreResult(&LogLevel); + opts.AddLongOption("log-sampling-level", "sample logs equal to or above this level").OptionalArgument("1-7") + .DefaultValue(ToString(DefaultLogSamplingLevel)).StoreResult(&LogSamplingLevel); + opts.AddLongOption("log-sampling-rate", + "log only each Nth message with priority matching sampling level; 0 turns log sampling off") + .OptionalArgument(Sprintf("0,%" PRIu32, Max())) + .DefaultValue(ToString(DefaultLogSamplingRate)).StoreResult(&LogSamplingRate); + opts.AddLongOption("log-format", "log format to use; short skips the priority and timestamp") + .DefaultValue("full").OptionalArgument("full|short|json").StoreResult(&LogFormat); + opts.AddLongOption("syslog", "send to syslog instead of stderr").SetFlag(&SysLogEnabled); + opts.AddLongOption("syslog-service-tag", "unique tag for syslog").RequiredArgument("NAME").StoreResult(&SysLogServiceTag); + opts.AddLongOption("log-file-name", "file name for log backend").RequiredArgument("NAME").StoreResult(&LogFileName); + opts.AddLongOption("tcp", "start tcp interconnect").NoArgument().SetFlag(&TcpEnabled); + opts.AddLongOption('n', "node", "Node ID or 'static' to auto-detect using naming file and ic-port.") + .RequiredArgument("[NUM|static]").StoreResult(&NodeIdValue); + opts.AddLongOption("node-broker", "node broker address host:port") + .RequiredArgument("ADDR").AppendTo(&NodeBrokerAddresses); + opts.AddLongOption("node-broker-port", "node broker port (hosts from naming file are used)") + .RequiredArgument("PORT").StoreResult(&NodeBrokerPort); + opts.AddLongOption("node-broker-use-tls", "use tls for node broker (hosts from naming file are used)") + .RequiredArgument("PORT").StoreResult(&NodeBrokerUseTls); + opts.AddLongOption("node-address", "address for dynamic node") + .RequiredArgument("ADDR").StoreResult(&NodeAddress); + opts.AddLongOption("node-host", "hostname for dynamic node") + .RequiredArgument("NAME").StoreResult(&NodeHost); + opts.AddLongOption("node-resolve-host", "resolve hostname for dynamic node") + .RequiredArgument("NAME").StoreResult(&NodeResolveHost); + opts.AddLongOption("node-domain", "domain for dynamic node to register in") + .RequiredArgument("NAME").StoreResult(&NodeDomain); + opts.AddLongOption("ic-port", "interconnect port") + .RequiredArgument("NUM").StoreResult(&InterconnectPort); + opts.AddLongOption("sqs-port", "sqs port") + .RequiredArgument("NUM").StoreResult(&SqsHttpPort); + opts.AddLongOption("tenant", "add binding for Local service to specified tenant, might be one of {'/', '//'}") + .RequiredArgument("NAME").StoreResult(&TenantName); + opts.AddLongOption("mon-port", "Monitoring port").OptionalArgument("NUM").StoreResult(&MonitoringPort); + opts.AddLongOption("mon-address", "Monitoring address").OptionalArgument("ADDR").StoreResult(&MonitoringAddress); + opts.AddLongOption("mon-cert", "Monitoring certificate (https)").OptionalArgument("PATH").StoreResult(&MonitoringCertificateFile); + opts.AddLongOption("mon-threads", "Monitoring http server threads").RequiredArgument("NUM").StoreResult(&MonitoringThreads); + opts.AddLongOption("suppress-version-check", "Suppress version compatibility checking via IC").NoArgument().SetFlag(&SuppressVersionCheck); + + opts.AddLongOption("grpc-port", "enable gRPC server on port").RequiredArgument("PORT").StoreResult(&GRpcPort); + opts.AddLongOption("grpcs-port", "enable gRPC SSL server on port").RequiredArgument("PORT").StoreResult(&GRpcsPort); + opts.AddLongOption("grpc-public-host", "set public gRPC host for discovery").RequiredArgument("HOST").StoreResult(&GRpcPublicHost); + opts.AddLongOption("grpc-public-port", "set public gRPC port for discovery").RequiredArgument("PORT").StoreResult(&GRpcPublicPort); + opts.AddLongOption("grpcs-public-port", "set public gRPC SSL port for discovery").RequiredArgument("PORT").StoreResult(&GRpcsPublicPort); + opts.AddLongOption("grpc-public-address-v4", "set public ipv4 address for discovery").RequiredArgument("ADDR").EmplaceTo(&GRpcPublicAddressesV4); + opts.AddLongOption("grpc-public-address-v6", "set public ipv6 address for discovery").RequiredArgument("ADDR").EmplaceTo(&GRpcPublicAddressesV6); + opts.AddLongOption("grpc-public-target-name-override", "set public hostname override for TLS in discovery").RequiredArgument("HOST").StoreResult(&GRpcPublicTargetNameOverride); + opts.AddLongOption('r', "restarts-count-file", "State for restarts monitoring counter,\nuse empty string to disable\n") + .OptionalArgument("PATH").DefaultValue(RestartsCountFile).StoreResult(&RestartsCountFile); + opts.AddLongOption("compile-inflight-limit", "Limit on parallel programs compilation").OptionalArgument("NUM").StoreResult(&CompileInflightLimit); + opts.AddLongOption("udf", "Load shared library with UDF by given path").AppendTo(&UDFsPaths); + opts.AddLongOption("udfs-dir", "Load all shared libraries with UDFs found in given directory").StoreResult(&UDFsDir); + opts.AddLongOption("node-kind", Sprintf("Kind of the node (affects list of services activated allowed values are {'%s', '%s'} )", NODE_KIND_YDB.data(), NODE_KIND_YQ.data())) + .RequiredArgument("NAME").StoreResult(&NodeKind); + opts.AddLongOption("node-type", "Type of the node") + .RequiredArgument("NAME").StoreResult(&NodeType); + opts.AddLongOption("ignore-cms-configs", "Don't load configs from CMS") + .NoArgument().SetFlag(&IgnoreCmsConfigs); + opts.AddLongOption("cert", "Path to client certificate file (PEM) for interconnect").RequiredArgument("PATH").StoreResult(&PathToInterconnectCertFile); + opts.AddLongOption("grpc-cert", "Path to client certificate file (PEM) for grpc").RequiredArgument("PATH").StoreResult(&PathToGrpcCertFile); + opts.AddLongOption("ic-cert", "Path to client certificate file (PEM) for interconnect").RequiredArgument("PATH").StoreResult(&PathToInterconnectCertFile); + opts.AddLongOption("key", "Path to private key file (PEM) for interconnect").RequiredArgument("PATH").StoreResult(&PathToInterconnectPrivateKeyFile); + opts.AddLongOption("grpc-key", "Path to private key file (PEM) for grpc").RequiredArgument("PATH").StoreResult(&PathToGrpcPrivateKeyFile); + opts.AddLongOption("ic-key", "Path to private key file (PEM) for interconnect").RequiredArgument("PATH").StoreResult(&PathToInterconnectPrivateKeyFile); + opts.AddLongOption("ca", "Path to certificate authority file (PEM) for interconnect").RequiredArgument("PATH").StoreResult(&PathToInterconnectCaFile); + opts.AddLongOption("grpc-ca", "Path to certificate authority file (PEM) for grpc").RequiredArgument("PATH").StoreResult(&PathToGrpcCaFile); + opts.AddLongOption("ic-ca", "Path to certificate authority file (PEM) for interconnect").RequiredArgument("PATH").StoreResult(&PathToInterconnectCaFile); + opts.AddLongOption("data-center", "data center name (used to describe dynamic node location)") + .RequiredArgument("NAME").StoreResult(&DataCenter); + opts.AddLongOption("rack", "rack name (used to describe dynamic node location)") + .RequiredArgument("NAME").StoreResult(&Rack); + opts.AddLongOption("body", "body name (used to describe dynamic node location)") + .RequiredArgument("NUM").StoreResult(&Body); + opts.AddLongOption("yaml-config", "Yaml config").OptionalArgument("PATH").StoreResult(&YamlConfigFile); + + opts.AddLongOption("tiny-mode", "Start in a tiny mode") + .NoArgument().SetFlag(&TinyMode); + + } + + void ApplyFields(NKikimrConfig::TAppConfig& appConfig, IEnv& env, IConfigUpdateTracer& ConfigUpdateTracer) const { + if (!appConfig.HasAllocatorConfig()) { + appConfig.MutableAllocatorConfig()->CopyFrom(*DummyAllocatorConfig()); + TRACE_CONFIG_CHANGE_INPLACE_T(AllocatorConfig, UpdateExplicitly); + } + + // apply certificates, if any + if (!PathToInterconnectCertFile.Empty()) { + appConfig.MutableInterconnectConfig()->SetPathToCertificateFile(PathToInterconnectCertFile); + TRACE_CONFIG_CHANGE_INPLACE_T(InterconnectConfig, UpdateExplicitly); + } + + if (!PathToInterconnectPrivateKeyFile.Empty()) { + appConfig.MutableInterconnectConfig()->SetPathToPrivateKeyFile(PathToInterconnectPrivateKeyFile); + TRACE_CONFIG_CHANGE_INPLACE_T(InterconnectConfig, UpdateExplicitly); + } + + if (!PathToInterconnectCaFile.Empty()) { + appConfig.MutableInterconnectConfig()->SetPathToCaFile(PathToInterconnectCaFile); + TRACE_CONFIG_CHANGE_INPLACE_T(InterconnectConfig, UpdateExplicitly); + } + + if (appConfig.HasGRpcConfig() && appConfig.GetGRpcConfig().HasCert()) { + appConfig.MutableGRpcConfig()->SetPathToCertificateFile(appConfig.GetGRpcConfig().GetCert()); + TRACE_CONFIG_CHANGE_INPLACE_T(GRpcConfig, UpdateExplicitly); + } + + if (!PathToGrpcCertFile.Empty()) { + appConfig.MutableGRpcConfig()->SetPathToCertificateFile(PathToGrpcCertFile); + TRACE_CONFIG_CHANGE_INPLACE_T(GRpcConfig, UpdateExplicitly); + } + + if (appConfig.HasGRpcConfig() && appConfig.GetGRpcConfig().HasKey()) { + appConfig.MutableGRpcConfig()->SetPathToPrivateKeyFile(appConfig.GetGRpcConfig().GetKey()); + TRACE_CONFIG_CHANGE_INPLACE_T(GRpcConfig, UpdateExplicitly); + } + + if (!PathToGrpcPrivateKeyFile.Empty()) { + appConfig.MutableGRpcConfig()->SetPathToPrivateKeyFile(PathToGrpcPrivateKeyFile); + TRACE_CONFIG_CHANGE_INPLACE_T(GRpcConfig, UpdateExplicitly); + } + + if (appConfig.HasGRpcConfig() && appConfig.GetGRpcConfig().HasCA()) { + appConfig.MutableGRpcConfig()->SetPathToCaFile(appConfig.GetGRpcConfig().GetCA()); + TRACE_CONFIG_CHANGE_INPLACE_T(GRpcConfig, UpdateExplicitly); + } + + if (!PathToGrpcCaFile.Empty()) { + appConfig.MutableGRpcConfig()->SetPathToCaFile(PathToGrpcCaFile); + TRACE_CONFIG_CHANGE_INPLACE_T(GRpcConfig, UpdateExplicitly); + } + + if (!appConfig.HasDomainsConfig()) { + ythrow yexception() << "DomainsConfig is not provided"; + } + + if (!appConfig.HasChannelProfileConfig()) { + ythrow yexception() << "ChannelProfileConfig is not provided"; + } + + if (NodeKind == NODE_KIND_YQ && InterconnectPort) { + auto& fqConfig = *appConfig.MutableFederatedQueryConfig(); + auto& nmConfig = *fqConfig.MutableNodesManager(); + nmConfig.SetPort(InterconnectPort); + nmConfig.SetHost(env.HostName()); + } + + if (SuppressVersionCheck) { + if (appConfig.HasNameserviceConfig()) { + appConfig.MutableNameserviceConfig()->SetSuppressVersionCheck(true); + TRACE_CONFIG_CHANGE_INPLACE_T(NameserviceConfig, UpdateExplicitly); + } else { + ythrow yexception() << "--suppress-version-check option is provided without static nameservice config"; + } + } + + // apply options affecting UDF paths + if (!appConfig.HasUDFsDir()) { + appConfig.SetUDFsDir(UDFsDir); + } + + if (!appConfig.UDFsPathsSize()) { + for (const auto& path : UDFsPaths) { + appConfig.AddUDFsPaths(path); + } + } + + if (!appConfig.HasMonitoringConfig()) { + appConfig.MutableMonitoringConfig()->SetMonitoringThreads(MonitoringThreads); + TRACE_CONFIG_CHANGE_INPLACE_T(MonitoringConfig, UpdateExplicitly); + } + if (!appConfig.HasRestartsCountConfig() && RestartsCountFile) { + appConfig.MutableRestartsCountConfig()->SetRestartsCountFile(RestartsCountFile); + TRACE_CONFIG_CHANGE_INPLACE_T(RestartsCountConfig, UpdateExplicitly); + } + + // Ports and node type are always applied (even if config was loaded from CMS). + if (MonitoringPort) { + appConfig.MutableMonitoringConfig()->SetMonitoringPort(MonitoringPort); + TRACE_CONFIG_CHANGE_INPLACE_T(MonitoringConfig, UpdateExplicitly); + } + if (MonitoringAddress) { + appConfig.MutableMonitoringConfig()->SetMonitoringAddress(MonitoringAddress); + TRACE_CONFIG_CHANGE_INPLACE_T(MonitoringConfig, UpdateExplicitly); + } + if (MonitoringCertificateFile) { + TString sslCertificate = TUnbufferedFileInput(MonitoringCertificateFile).ReadAll(); + if (!sslCertificate.empty()) { + appConfig.MutableMonitoringConfig()->SetMonitoringCertificate(sslCertificate); + TRACE_CONFIG_CHANGE_INPLACE_T(MonitoringConfig, UpdateExplicitly); + } else { + ythrow yexception() << "invalid ssl certificate file"; + } + } + if (SqsHttpPort) { + appConfig.MutableSqsConfig()->MutableHttpServerConfig()->SetPort(SqsHttpPort); + TRACE_CONFIG_CHANGE_INPLACE_T(SqsConfig, UpdateExplicitly); + } + if (GRpcPort) { + auto& conf = *appConfig.MutableGRpcConfig(); + conf.SetStartGRpcProxy(true); + conf.SetPort(GRpcPort); + TRACE_CONFIG_CHANGE_INPLACE_T(GRpcConfig, UpdateExplicitly); + } + if (GRpcsPort) { + auto& conf = *appConfig.MutableGRpcConfig(); + conf.SetStartGRpcProxy(true); + conf.SetSslPort(GRpcsPort); + TRACE_CONFIG_CHANGE_INPLACE_T(GRpcConfig, UpdateExplicitly); + } + if (GRpcPublicHost) { + auto& conf = *appConfig.MutableGRpcConfig(); + conf.SetPublicHost(GRpcPublicHost); + for (auto& ext : *conf.MutableExtEndpoints()) { + if (!ext.HasPublicHost()) { + ext.SetPublicHost(GRpcPublicHost); + } + } + TRACE_CONFIG_CHANGE_INPLACE_T(GRpcConfig, UpdateExplicitly); + } + if (GRpcPublicPort) { + auto& conf = *appConfig.MutableGRpcConfig(); + conf.SetPublicPort(GRpcPublicPort); + for (auto& ext : *conf.MutableExtEndpoints()) { + if (!ext.HasPublicPort()) { + ext.SetPublicPort(GRpcPublicPort); + } + } + TRACE_CONFIG_CHANGE_INPLACE_T(GRpcConfig, UpdateExplicitly); + } + if (GRpcsPublicPort) { + auto& conf = *appConfig.MutableGRpcConfig(); + conf.SetPublicSslPort(GRpcsPublicPort); + for (auto& ext : *conf.MutableExtEndpoints()) { + if (!ext.HasPublicSslPort()) { + ext.SetPublicSslPort(GRpcsPublicPort); + } + } + TRACE_CONFIG_CHANGE_INPLACE_T(GRpcConfig, UpdateExplicitly); + } + for (const auto& addr : GRpcPublicAddressesV4) { + appConfig.MutableGRpcConfig()->AddPublicAddressesV4(addr); + } + if (GRpcPublicAddressesV4.size()) { + TRACE_CONFIG_CHANGE_INPLACE_T(GRpcConfig, UpdateExplicitly); + } + for (const auto& addr : GRpcPublicAddressesV6) { + appConfig.MutableGRpcConfig()->AddPublicAddressesV6(addr); + } + if (GRpcPublicAddressesV6.size()) { + TRACE_CONFIG_CHANGE_INPLACE_T(GRpcConfig, UpdateExplicitly); + } + if (GRpcPublicTargetNameOverride) { + appConfig.MutableGRpcConfig()->SetPublicTargetNameOverride(GRpcPublicTargetNameOverride); + TRACE_CONFIG_CHANGE_INPLACE_T(GRpcConfig, UpdateExplicitly); + } + if (NodeType) { + appConfig.MutableTenantPoolConfig()->SetNodeType(NodeType.GetRef()); + TRACE_CONFIG_CHANGE_INPLACE_T(TenantPoolConfig, UpdateExplicitly); + } + + if (TenantName && InterconnectPort != DefaultInterconnectPort) { + appConfig.MutableMonitoringConfig()->SetHostLabelOverride(HostAndICPort(env)); + TRACE_CONFIG_CHANGE_INPLACE_T(MonitoringConfig, UpdateExplicitly); + } + + if (DataCenter) { + appConfig.MutableMonitoringConfig()->SetDataCenter(to_lower(DataCenter.GetRef())); + TRACE_CONFIG_CHANGE_INPLACE_T(MonitoringConfig, UpdateExplicitly); + + if (appConfig.HasFederatedQueryConfig()) { + appConfig.MutableFederatedQueryConfig()->MutableNodesManager()->SetDataCenter(to_lower(DataCenter.GetRef())); + TRACE_CONFIG_CHANGE_INPLACE_T(FederatedQueryConfig, UpdateExplicitly); + } + } + } + + ui32 DeduceNodeId(const NKikimrConfig::TAppConfig& appConfig, IEnv& env) const { + ui32 nodeId = 0; + if (NodeIdValue) { + if (NodeIdValue.GetRef() == "static") { + if (!appConfig.HasNameserviceConfig() || !InterconnectPort) { + ythrow yexception() << "'--node static' requires naming file and IC port to be specified"; + } + + try { + nodeId = FindStaticNodeId(appConfig, env); + } catch(TSystemError& e) { + ythrow yexception() << "cannot detect host name: " << e.what(); + } + + if (!nodeId) { + ythrow yexception() << "cannot detect node ID for " << env.HostName() << ":" << InterconnectPort + << " and for " << env.FQDNHostName() << ":" << InterconnectPort << Endl; + } + return nodeId; + } else { + if (!TryFromString(NodeIdValue.GetRef(), nodeId)) { + ythrow yexception() << "wrong '--node' value (should be NUM, 'static')"; + } + } + } + return nodeId; + } + + NActors::TNodeLocation CreateNodeLocation() const { // FIXME + NActorsInterconnect::TNodeLocation location; + location.SetDataCenter(DataCenter ? DataCenter.GetRef() : TString("")); + location.SetRack(Rack); + location.SetUnit(ToString(Body)); + NActors::TNodeLocation loc(location); + + NActorsInterconnect::TNodeLocation legacy; + legacy.SetDataCenterNum(DataCenterFromString(DataCenter ? DataCenter.GetRef() : TString(""))); + legacy.SetRoomNum(0); + legacy.SetRackNum(RackFromString(Rack)); + legacy.SetBodyNum(Body); + loc.InheritLegacyValue(TNodeLocation(legacy)); + return loc; + } + + void ApplyLogSettings(NKikimrConfig::TAppConfig& appConfig, IConfigUpdateTracer& ConfigUpdateTracer) { + if (SysLogServiceTag && !appConfig.GetLogConfig().GetSysLogService()) { + appConfig.MutableLogConfig()->SetSysLogService(SysLogServiceTag.GetRef()); + TRACE_CONFIG_CHANGE_INPLACE_T(LogConfig, UpdateExplicitly); + } + + if (LogFileName) { + appConfig.MutableLogConfig()->SetBackendFileName(LogFileName.GetRef()); + TRACE_CONFIG_CHANGE_INPLACE_T(LogConfig, UpdateExplicitly); + } + } + + ui32 FindStaticNodeId(const NKikimrConfig::TAppConfig& appConfig, IEnv& env) const { + std::vector candidates = {env.HostName(), env.FQDNHostName()}; + for(auto& candidate: candidates) { + candidate.to_lower(); + + const NKikimrConfig::TStaticNameserviceConfig& nameserviceConfig = appConfig.GetNameserviceConfig(); + for (const auto& node : nameserviceConfig.GetNode()) { + if (node.GetHost() == candidate && InterconnectPort == node.GetPort()) { + return node.GetNodeId(); + } + } + } + + return 0; + } + + TString HostAndICPort(IEnv& env) const { + try { + auto hostname = to_lower(env.HostName()); + hostname = hostname.substr(0, hostname.find('.')); + return TStringBuilder() << hostname << ":" << InterconnectPort; + } catch (TSystemError& error) { + return ""; + } + } + + void SetupLogConfigDefaults(NKikimrConfig::TLogConfig& logConfig, IConfigUpdateTracer& ConfigUpdateTracer) const { + if (SysLogEnabled) { + logConfig.SetSysLog(true); + } + if (LogLevel) { + logConfig.SetDefaultLevel(LogLevel.GetRef()); + } + if (LogSamplingLevel) { + logConfig.SetDefaultSamplingLevel(LogSamplingLevel.GetRef()); + } + if (LogSamplingRate) { + logConfig.SetDefaultSamplingRate(LogSamplingRate.GetRef()); + } + if (LogFormat) { + logConfig.SetFormat(LogFormat.GetRef()); + } + if (ClusterName) { + logConfig.SetClusterName(ClusterName.GetRef()); + } + TRACE_CONFIG_CHANGE_INPLACE_T(LogConfig, UpdateExplicitly); + } + + void SetupBootstrapConfigDefaults(NKikimrConfig::TBootstrap& bootstrapConfig, IConfigUpdateTracer& ConfigUpdateTracer) const { + bootstrapConfig.MutableCompileServiceConfig()->SetInflightLimit(CompileInflightLimit); + TRACE_CONFIG_CHANGE_INPLACE_T(BootstrapConfig, UpdateExplicitly); + }; + + void SetupInterconnectConfigDefaults(NKikimrConfig::TInterconnectConfig& icConfig, IConfigUpdateTracer& ConfigUpdateTracer) const { + if (TcpEnabled) { + icConfig.SetStartTcp(true); + TRACE_CONFIG_CHANGE_INPLACE_T(InterconnectConfig, UpdateExplicitly); + } + }; + + NYdb::NDiscovery::TNodeRegistrationSettings GetNodeRegistrationSettings( + const TString &domainName, + const TString &nodeHost, + const TString &nodeAddress, + const TString &nodeResolveHost, + const TMaybe& path) const + { + NYdb::NDiscovery::TNodeRegistrationSettings settings; + settings.Host(nodeHost); + settings.Port(InterconnectPort); + settings.ResolveHost(nodeResolveHost); + settings.Address(nodeAddress); + settings.DomainPath(domainName); + settings.FixedNodeId(FixedNodeID); + if (path) { + settings.Path(*path); + } + + auto loc = CreateNodeLocation(); + NActorsInterconnect::TNodeLocation tmpLocation; + loc.Serialize(&tmpLocation, false); + + NYdb::NDiscovery::TNodeLocation settingLocation; + CopyNodeLocation(&settingLocation, tmpLocation); + settings.Location(settingLocation); + return settings; + } + + void FillClusterEndpoints(const NKikimrConfig::TAppConfig& appConfig, TVector &addrs) const { + if (!NodeBrokerAddresses.empty()) { + for (auto addr: NodeBrokerAddresses) { + addrs.push_back(addr); + } + } else { + if (!NodeBrokerPort) { + ythrow yexception() << "NodeBrokerPort MUST be defined"; + } + + for (const auto &node : appConfig.GetNameserviceConfig().GetNode()) { + addrs.emplace_back(TStringBuilder() << (NodeBrokerUseTls ? "grpcs://" : "") << node.GetHost() << ':' << NodeBrokerPort); + } + } + ShuffleRange(addrs); + } + + TMaybe GetSchemePath() const { + if (TenantName && TenantName.GetRef().StartsWith('/')) { + return TenantName.GetRef(); // TODO(alexvru): fix it + } + return {}; + } + + void ValidateTenant() const { + if (TenantName) { + if (!IsStartWithSlash(TenantName.GetRef())) { // ? + ythrow yexception() << "leading / in --tenant parametr is always required."; + } + if (NodeId && NodeKind != NODE_KIND_YQ) { + ythrow yexception() << "opt '--node' compatible only with '--tenant no', opt 'node' incompatible with any other values of opt '--tenant'"; + } + } + } + + void ApplyServicesMask(NKikimr::TBasicKikimrServicesMask& out) const { + if (NodeKind == NODE_KIND_YDB) { + if (TinyMode) { + out.SetTinyMode(); + } + // do nothing => default behaviour + } else if (NodeKind == NODE_KIND_YQ) { + out.DisableAll(); + out.EnableYQ(); + } else { + ythrow yexception() << "wrong '--node-kind' value '" << NodeKind << "', only '" << NODE_KIND_YDB << "' or '" << NODE_KIND_YQ << "' is allowed"; + } + } + + bool IsStaticNode() const { + return NodeBrokerAddresses.empty() && !NodeBrokerPort; + } + + void ValidateStaticNodeConfig() const { + if (!NodeId) { + ythrow yexception() << "Either --node [NUM|'static'] or --node-broker[-port] should be specified"; + } + } +}; + +struct TMbusConfigFields { + ui32 BusProxyPort = NMsgBusProxy::TProtocol::DefaultPort; + NBus::TBusQueueConfig ProxyBusQueueConfig; + NBus::TBusServerSessionConfig ProxyBusSessionConfig; + TString TracePath; + TVector ProxyBindToProxy; + bool Start = false; + + void RegisterCliOptions(NLastGetopt::TOpts& opts) { + opts.AddLongOption("mbus", "Start MessageBus proxy").NoArgument().SetFlag(&Start); + opts.AddLongOption("mbus-port", "MessageBus proxy port").RequiredArgument("PORT").StoreResult(&BusProxyPort); + opts.AddLongOption("mbus-trace-path", "Path for trace files").RequiredArgument("PATH").StoreResult(&TracePath); + opts.AddLongOption("proxy", "Bind to proxy(-ies)").RequiredArgument("ADDR").AppendTo(&ProxyBindToProxy); + SetMsgBusDefaults(ProxyBusSessionConfig, ProxyBusQueueConfig); + ProxyBusSessionConfig.ConfigureLastGetopt(opts, "mbus-"); + ProxyBusQueueConfig.ConfigureLastGetopt(opts, "mbus-"); + } + + void ValidateCliOptions(const NLastGetopt::TOpts& opts, const NLastGetopt::TOptsParseResult& parseResult) const { + if (!Start) { + for (const auto &option : opts.Opts_) { + for (const TString &longName : option->GetLongNames()) { + if (longName.StartsWith("mbus-") && parseResult.Has(option.Get())) { + ythrow yexception() << "option --" << longName << " is useless without --mbus option"; + } + } + } + } + } + + void InitMessageBusConfig(NKikimrConfig::TAppConfig& appConfig) { + auto messageBusConfig = appConfig.MutableMessageBusConfig(); + messageBusConfig->SetStartBusProxy(Start); + messageBusConfig->SetBusProxyPort(BusProxyPort); + + auto queueConfig = messageBusConfig->MutableProxyBusQueueConfig(); + queueConfig->SetName(ProxyBusQueueConfig.Name); + queueConfig->SetNumWorkers(ProxyBusQueueConfig.NumWorkers); + + auto sessionConfig = messageBusConfig->MutableProxyBusSessionConfig(); + + // TODO use macro from messagebus header file + sessionConfig->SetName(ProxyBusSessionConfig.Name); + sessionConfig->SetNumRetries(ProxyBusSessionConfig.NumRetries); + sessionConfig->SetRetryInterval(ProxyBusSessionConfig.RetryInterval); + sessionConfig->SetReconnectWhenIdle(ProxyBusSessionConfig.ReconnectWhenIdle); + sessionConfig->SetMaxInFlight(ProxyBusSessionConfig.MaxInFlight); + sessionConfig->SetPerConnectionMaxInFlight(ProxyBusSessionConfig.PerConnectionMaxInFlight); + sessionConfig->SetPerConnectionMaxInFlightBySize(ProxyBusSessionConfig.PerConnectionMaxInFlightBySize); + sessionConfig->SetMaxInFlightBySize(ProxyBusSessionConfig.MaxInFlightBySize); + sessionConfig->SetTotalTimeout(ProxyBusSessionConfig.TotalTimeout); + sessionConfig->SetSendTimeout(ProxyBusSessionConfig.SendTimeout); + sessionConfig->SetConnectTimeout(ProxyBusSessionConfig.ConnectTimeout); + sessionConfig->SetDefaultBufferSize(ProxyBusSessionConfig.DefaultBufferSize); + sessionConfig->SetMaxBufferSize(ProxyBusSessionConfig.MaxBufferSize); + sessionConfig->SetSocketRecvBufferSize(ProxyBusSessionConfig.SocketRecvBufferSize); + sessionConfig->SetSocketSendBufferSize(ProxyBusSessionConfig.SocketSendBufferSize); + sessionConfig->SetSocketToS(ProxyBusSessionConfig.SocketToS); + sessionConfig->SetSendThreshold(ProxyBusSessionConfig.SendThreshold); + sessionConfig->SetCork(ProxyBusSessionConfig.Cork.MilliSeconds()); + sessionConfig->SetMaxMessageSize(ProxyBusSessionConfig.MaxMessageSize); + sessionConfig->SetTcpNoDelay(ProxyBusSessionConfig.TcpNoDelay); + sessionConfig->SetTcpCork(ProxyBusSessionConfig.TcpCork); + sessionConfig->SetExecuteOnMessageInWorkerPool(ProxyBusSessionConfig.ExecuteOnMessageInWorkerPool); + sessionConfig->SetExecuteOnReplyInWorkerPool(ProxyBusSessionConfig.ExecuteOnReplyInWorkerPool); + sessionConfig->SetListenPort(ProxyBusSessionConfig.ListenPort); + + for (auto proxy : ProxyBindToProxy) { + messageBusConfig->AddProxyBindToProxy(proxy); + } + messageBusConfig->SetStartTracingBusProxy(!!TracePath); + messageBusConfig->SetTracePath(TracePath); + } +}; + +// ===== + +struct TNodeRegistrationGrpcSettings { + TString PathToGrpcCertFile; + TString PathToGrpcCaFile; + TString PathToGrpcPrivateKeyFile; +}; + +NClient::TKikimr GetKikimr(const TNodeRegistrationGrpcSettings& cf, const TString& addr, const IEnv& env) { + TCommandConfig::TServerEndpoint endpoint = TCommandConfig::ParseServerAddress(addr); + NYdbGrpc::TGRpcClientConfig grpcConfig(endpoint.Address, TDuration::Seconds(5)); + grpcConfig.LoadBalancingPolicy = "round_robin"; + if (endpoint.EnableSsl.Defined()) { + grpcConfig.EnableSsl = endpoint.EnableSsl.GetRef(); + auto& sslCredentials = grpcConfig.SslCredentials; + if (cf.PathToGrpcCaFile) { + sslCredentials.pem_root_certs = env.ReadFromFile(cf.PathToGrpcCaFile, "CA certificates"); + } + if (cf.PathToGrpcCertFile && cf.PathToGrpcPrivateKeyFile) { + sslCredentials.pem_cert_chain = env.ReadFromFile(cf.PathToGrpcCertFile, "Client certificates"); + sslCredentials.pem_private_key = env.ReadFromFile(cf.PathToGrpcPrivateKeyFile, "Client certificates key"); + } + } + return NClient::TKikimr(grpcConfig); +} + +struct TNodeRegistrationSettings { + const TString &domainName; + const TString &nodeHost; + const TString &nodeAddress; + const TString &nodeResolveHost; + const TMaybe& path; + bool FixedNodeID = false; + ui32 InterconnectPort = 0; +}; + +NYdb::NDiscovery::TNodeRegistrationResult TryToRegisterDynamicNodeViaDiscoveryService( + const TNodeRegistrationGrpcSettings& gs, + const TString addr, + const NYdb::NDiscovery::TNodeRegistrationSettings& nrs, + const IEnv& env) +{ + TCommandConfig::TServerEndpoint endpoint = TCommandConfig::ParseServerAddress(addr); + NYdb::TDriverConfig config; + if (endpoint.EnableSsl.Defined()) { + if (gs.PathToGrpcCaFile) { + config.UseSecureConnection(env.ReadFromFile(gs.PathToGrpcCaFile, "CA certificates").c_str()); + } + if (gs.PathToGrpcCertFile && gs.PathToGrpcPrivateKeyFile) { + auto certificate = env.ReadFromFile(gs.PathToGrpcCertFile, "Client certificates"); + auto privateKey = env.ReadFromFile(gs.PathToGrpcPrivateKeyFile, "Client certificates key"); + config.UseClientCertificate(certificate.c_str(), privateKey.c_str()); + } + } + config.SetAuthToken(BUILTIN_ACL_ROOT); + config.SetEndpoint(endpoint.Address); + auto connection = NYdb::TDriver(config); + + auto client = NYdb::NDiscovery::TDiscoveryClient(connection); + NYdb::NDiscovery::TNodeRegistrationResult result = client.NodeRegistration(nrs).GetValueSync(); + connection.Stop(true); + return result; +} + +THolder TryToRegisterDynamicNodeViaLegacyService( + const TNodeRegistrationGrpcSettings& rgs, + NActors::TNodeLocation loc, + const TString& addr, + const TNodeRegistrationSettings& rs, + const IEnv& env) +{ + NClient::TKikimr kikimr(GetKikimr(rgs, addr, env)); + auto registrant = kikimr.GetNodeRegistrant(); + + return MakeHolder( + registrant.SyncRegisterNode( + ToString(rs.domainName), + rs.nodeHost, + rs.InterconnectPort, + rs.nodeAddress, + rs.nodeResolveHost, + loc, + rs.FixedNodeID, + rs.path)); +} + +THolder RegisterDynamicNodeViaLegacyService( + const TNodeRegistrationGrpcSettings& gs, + NActors::TNodeLocation loc, + const TVector& addrs, + const TNodeRegistrationSettings& rs, + const IEnv& env) +{ + THolder result; + while (!result || !result->IsSuccess()) { + for (const auto& addr : addrs) { + result = TryToRegisterDynamicNodeViaLegacyService( + gs, + loc, + addr, + rs, + env); + if (result->IsSuccess()) { + Cout << "Success. Registered via legacy service as " << result->GetNodeId() << Endl; + break; + } + Cerr << "Registration error: " << result->GetErrorMessage() << Endl; + } + if (!result || !result->IsSuccess()) { + env.Sleep(TDuration::Seconds(1)); + } + } + if (!result) { + ythrow yexception() << "Invalid result"; + } + + if (!result->IsSuccess()) { + ythrow yexception() << "Cannot register dynamic node: " << result->GetErrorMessage(); + } + + return result; +} + +// ===== + +struct TAppInitDebugInfo { + NKikimrConfig::TAppConfig OldConfig; + NKikimrConfig::TAppConfig YamlConfig; + THashMap ConfigTransformInfo; +}; + +class TInitialConfiguratorImpl + : public IInitialConfigurator +{ + ui32 NodeId = 0; + TBasicKikimrServicesMask ServicesMask; + TKikimrScopeId ScopeId; + TString TenantName; + TString ClusterName; + + TMap Labels; + + NKikimrConfig::TAppConfig BaseConfig; + NKikimrConfig::TAppConfig AppConfig; + + NConfig::TConfigFields ConfigFields; + NConfig::TMbusConfigFields MbusConfigFields; + + TAppInitDebugInfo InitDebug; + + NConfig::IErrorCollector& ErrorCollector; + NConfig::IProtoConfigFileProvider& ProtoConfigFileProvider; + NConfig::IConfigUpdateTracer& ConfigUpdateTracer; + NConfig::IEnv& Env; + +public: + TInitialConfiguratorImpl( + NConfig::IErrorCollector& errorCollector, + NConfig::IProtoConfigFileProvider& protoConfigFileProvider, + NConfig::IConfigUpdateTracer& configUpdateTracer, + NConfig::IEnv& env) + : ErrorCollector(errorCollector) + , ProtoConfigFileProvider(protoConfigFileProvider) + , ConfigUpdateTracer(configUpdateTracer) + , Env(env) + {} + + void ValidateOptions(const NLastGetopt::TOpts& opts, const NLastGetopt::TOptsParseResult& parseResult) override { + MbusConfigFields.ValidateCliOptions(opts, parseResult); + } + + void Parse(const TVector& freeArgs) override { + using TCfg = NKikimrConfig::TAppConfig; + + NConfig::TConfigRefs refs{ConfigUpdateTracer, ErrorCollector, ProtoConfigFileProvider}; + + Option("auth-file", TCfg::TAuthConfigFieldTag{}, CALL_CTX()); + LoadBootstrapConfig(ProtoConfigFileProvider, ErrorCollector, freeArgs, BaseConfig); + LoadYamlConfig(refs, ConfigFields.YamlConfigFile, AppConfig, CALL_CTX()); + OptionMerge("auth-token-file", TCfg::TAuthConfigFieldTag{}, CALL_CTX()); + + // start memorylog as soon as possible + Option("memorylog-file", TCfg::TMemoryLogConfigFieldTag{}, &TInitialConfiguratorImpl::InitMemLog, CALL_CTX()); + Option("naming-file", TCfg::TNameserviceConfigFieldTag{}, CALL_CTX()); + + ConfigFields.NodeId = ConfigFields.DeduceNodeId(AppConfig, Env); + Cout << "Determined node ID: " << ConfigFields.NodeId << Endl; + + ConfigFields.ValidateTenant(); + + ConfigFields.ApplyServicesMask(ServicesMask); + + PreFillLabels(ConfigFields); + + if (ConfigFields.IsStaticNode()) { + InitStaticNode(); + } else { + InitDynamicNode(); + } + + LoadYamlConfig(refs, ConfigFields.YamlConfigFile, AppConfig, CALL_CTX()); + + Option("sys-file", TCfg::TActorSystemConfigFieldTag{}, CALL_CTX()); + + if (!AppConfig.HasActorSystemConfig()) { + AppConfig.MutableActorSystemConfig()->CopyFrom(*DummyActorSystemConfig()); + TRACE_CONFIG_CHANGE_INPLACE_T(ActorSystemConfig, SetExplicitly); + } + + Option("domains-file", TCfg::TDomainsConfigFieldTag{}, CALL_CTX()); + Option("bs-file", TCfg::TBlobStorageConfigFieldTag{}, CALL_CTX()); + Option("log-file", TCfg::TLogConfigFieldTag{}, &TInitialConfiguratorImpl::SetupLogConfigDefaults, CALL_CTX()); + + // This flag is set per node and we prefer flag over CMS. + ConfigFields.ApplyLogSettings(AppConfig, ConfigUpdateTracer); + + Option("ic-file", TCfg::TInterconnectConfigFieldTag{}, &TInitialConfiguratorImpl::SetupInterconnectConfigDefaults, CALL_CTX()); + Option("channels-file", TCfg::TChannelProfileConfigFieldTag{}, CALL_CTX()); + Option("bootstrap-file", TCfg::TBootstrapConfigFieldTag{}, &TInitialConfiguratorImpl::SetupBootstrapConfigDefaults, CALL_CTX()); + Option("vdisk-file", TCfg::TVDiskConfigFieldTag{}, CALL_CTX()); + Option("drivemodel-file", TCfg::TDriveModelConfigFieldTag{}, CALL_CTX()); + Option("grpc-file", TCfg::TGRpcConfigFieldTag{}, CALL_CTX()); + Option("dyn-nodes-file", TCfg::TDynamicNameserviceConfigFieldTag{}, CALL_CTX()); + Option("cms-file", TCfg::TCmsConfigFieldTag{}, CALL_CTX()); + Option("pq-file", TCfg::TPQConfigFieldTag{}, CALL_CTX()); + Option("pqcd-file", TCfg::TPQClusterDiscoveryConfigFieldTag{}, CALL_CTX()); + Option("netclassifier-file", TCfg::TNetClassifierConfigFieldTag{}, CALL_CTX()); + Option("auth-file", TCfg::TAuthConfigFieldTag{}, CALL_CTX()); + OptionMerge("auth-token-file", TCfg::TAuthConfigFieldTag{}, CALL_CTX()); + Option("key-file", TCfg::TKeyConfigFieldTag{}, CALL_CTX()); + Option("pdisk-key-file", TCfg::TPDiskKeyConfigFieldTag{}, CALL_CTX()); + Option("sqs-file", TCfg::TSqsConfigFieldTag{}, CALL_CTX()); + Option("http-proxy-file", TCfg::THttpProxyConfigFieldTag{}, CALL_CTX()); + Option("public-http-file", TCfg::TPublicHttpConfigFieldTag{}, CALL_CTX()); + Option("feature-flags-file", TCfg::TFeatureFlagsFieldTag{}, CALL_CTX()); + Option("rb-file", TCfg::TResourceBrokerConfigFieldTag{}, CALL_CTX()); + Option("metering-file", TCfg::TMeteringConfigFieldTag{}, CALL_CTX()); + Option("audit-file", TCfg::TAuditConfigFieldTag{}, CALL_CTX()); + Option("kqp-file", TCfg::TKQPConfigFieldTag{}, CALL_CTX()); + Option("incrhuge-file", TCfg::TIncrHugeConfigFieldTag{}, CALL_CTX()); + Option("alloc-file", TCfg::TAllocatorConfigFieldTag{}, CALL_CTX()); + Option("fq-file", TCfg::TFederatedQueryConfigFieldTag{}, CALL_CTX()); + Option(nullptr, TCfg::TTracingConfigFieldTag{}, CALL_CTX()); + Option(nullptr, TCfg::TFailureInjectionConfigFieldTag{}, CALL_CTX()); + + ConfigFields.ApplyFields(AppConfig, Env, ConfigUpdateTracer); + + // MessageBus options. + if (!AppConfig.HasMessageBusConfig()) { + MbusConfigFields.InitMessageBusConfig(AppConfig); + TRACE_CONFIG_CHANGE_INPLACE_T(MessageBusConfig, UpdateExplicitly); + } + + TenantName = FillTenantPoolConfig(ConfigFields); + + FillData(ConfigFields); + } + + void FillData(const NConfig::TConfigFields& cf) { + if (cf.TenantName && ScopeId.IsEmpty()) { + const TString myDomain = DeduceNodeDomain(cf); + for (const auto& domain : AppConfig.GetDomainsConfig().GetDomain()) { + if (domain.GetName() == myDomain) { + ScopeId = TKikimrScopeId(0, domain.GetDomainId()); + break; + } + } + } + + if (cf.NodeId) { // FIXME: do we really need it ??? + NodeId = cf.NodeId; + + Labels["node_id"] = ToString(NodeId); + AddLabelToAppConfig("node_id", Labels["node_id"]); + } + + InitDebug.ConfigTransformInfo = ConfigUpdateTracer.Dump(); + ClusterName = AppConfig.GetNameserviceConfig().GetClusterUUID(); + } + + TString FillTenantPoolConfig(const NConfig::TConfigFields& cf) { + auto &slot = *AppConfig.MutableTenantPoolConfig()->AddSlots(); + slot.SetId("static-slot"); + slot.SetIsDynamic(false); + TString tenantName = cf.TenantName ? cf.TenantName.GetRef() : CanonizePath(DeduceNodeDomain(cf)); + slot.SetTenantName(tenantName); + return tenantName; + } + + void SetupLogConfigDefaults(NKikimrConfig::TLogConfig& logConfig) { + ConfigFields.SetupLogConfigDefaults(logConfig, ConfigUpdateTracer); + } + + void AddLabelToAppConfig(const TString& name, const TString& value) { + for (auto &label : *AppConfig.MutableLabels()) { + if (label.GetName() == name) { + label.SetValue(value); + return; + } + } + + auto *label = AppConfig.AddLabels(); + label->SetName(name); + label->SetValue(value); + } + + void InitMemLog(const NKikimrConfig::TMemoryLogConfig& mem) const { + if (mem.HasLogBufferSize() && mem.GetLogBufferSize() > 0) { + if (mem.HasLogGrainSize() && mem.GetLogGrainSize() > 0) { + TMemoryLog::CreateMemoryLogBuffer(mem.GetLogBufferSize(), mem.GetLogGrainSize()); + } else { + TMemoryLog::CreateMemoryLogBuffer(mem.GetLogBufferSize()); + } + MemLogWriteNullTerm("Memory_log_has_been_started_YAHOO_"); + } + } + + template + void Option(const char* optname, TTag tag, NConfig::TCallContext ctx) { + NConfig::TConfigRefs refs{ConfigUpdateTracer, ErrorCollector, ProtoConfigFileProvider}; + MutableConfigPart(refs, optname, tag, BaseConfig, AppConfig, ctx); + } + + template + void Option(const char* optname, TTag tag, TContinuation continuation, NConfig::TCallContext ctx) { + NConfig::TConfigRefs refs{ConfigUpdateTracer, ErrorCollector, ProtoConfigFileProvider}; + if (auto* res = MutableConfigPart(refs, optname, tag, BaseConfig, AppConfig, ctx)) { + (this->*continuation)(*res); + } + } + + template + void OptionMerge(const char* optname, TTag tag, NConfig::TCallContext ctx) { + NConfig::TConfigRefs refs{ConfigUpdateTracer, ErrorCollector, ProtoConfigFileProvider}; + MutableConfigPartMerge(refs, optname, tag, AppConfig, ctx); + } + + void PreFillLabels(const NConfig::TConfigFields& cf) { + Labels["node_id"] = ToString(cf.NodeId); + Labels["node_host"] = Env.FQDNHostName(); + Labels["tenant"] = (cf.TenantName ? cf.TenantName.GetRef() : TString("")); + Labels["node_type"] = cf.NodeType.GetRef(); + // will be replaced with proper version info + Labels["branch"] = GetBranch(); + Labels["rev"] = GetProgramCommitId(); + Labels["dynamic"] = ToString(cf.NodeBrokerAddresses.empty() ? "false" : "true"); + + for (const auto& [name, value] : Labels) { + auto *label = AppConfig.AddLabels(); + label->SetName(name); + label->SetValue(value); + } + } + + void SetupBootstrapConfigDefaults(NKikimrConfig::TBootstrap& bootstrapConfig) { + ConfigFields.SetupBootstrapConfigDefaults(bootstrapConfig, ConfigUpdateTracer); + }; + + void SetupInterconnectConfigDefaults(NKikimrConfig::TInterconnectConfig& icConfig) { + ConfigFields.SetupInterconnectConfigDefaults(icConfig, ConfigUpdateTracer); + }; + + TString DeduceNodeDomain(const NConfig::TConfigFields& cf) const { + if (cf.NodeDomain) { + return cf.NodeDomain; + } + if (AppConfig.GetDomainsConfig().DomainSize() == 1) { + return AppConfig.GetDomainsConfig().GetDomain(0).GetName(); + } + if (AppConfig.GetTenantPoolConfig().SlotsSize() == 1) { + auto &slot = AppConfig.GetTenantPoolConfig().GetSlots(0); + if (slot.GetDomainName()) { + return slot.GetDomainName(); + } + auto &tenantName = slot.GetTenantName(); + if (IsStartWithSlash(tenantName)) { + return ToString(ExtractDomain(tenantName)); + } + } + return ""; + } + + NYdb::NDiscovery::TNodeRegistrationResult RegisterDynamicNodeViaDiscoveryService( + const NConfig::TConfigFields& cf, + const TVector& addrs, + const TString& domainName, + const IEnv& env) const + { + NYdb::NDiscovery::TNodeRegistrationResult result; + const size_t maxNumberReceivedCallUnimplemented = 5; + size_t currentNumberReceivedCallUnimplemented = 0; + while (!result.IsSuccess() && currentNumberReceivedCallUnimplemented < maxNumberReceivedCallUnimplemented) { + for (const auto& addr : addrs) { + auto nrs = cf.GetNodeRegistrationSettings(domainName, cf.NodeHost, cf.NodeAddress, cf.NodeResolveHost, cf.GetSchemePath()); + result = TryToRegisterDynamicNodeViaDiscoveryService( + { + cf.PathToGrpcCertFile, + cf.PathToGrpcCaFile, + cf.PathToGrpcPrivateKeyFile, + }, + addr, + nrs, + env); + if (result.IsSuccess()) { + Cout << "Success. Registered via discovery service as " << result.GetNodeId() << Endl; + break; + } + Cerr << "Registration error: " << static_cast(result) << Endl; + } + if (!result.IsSuccess()) { + env.Sleep(TDuration::Seconds(1)); + if (result.GetStatus() == NYdb::EStatus::CLIENT_CALL_UNIMPLEMENTED) { + currentNumberReceivedCallUnimplemented++; + } + } + } + return result; + } + + void ProcessRegistrationDynamicNodeResult(const NYdb::NDiscovery::TNodeRegistrationResult& result) { + NodeId = result.GetNodeId(); + NActors::TScopeId scopeId; + if (result.HasScopeTabletId() && result.HasScopePathId()) { + scopeId.first = result.GetScopeTabletId(); + scopeId.second = result.GetScopePathId(); + } + ScopeId = TKikimrScopeId(scopeId); + + auto &nsConfig = *AppConfig.MutableNameserviceConfig(); + nsConfig.ClearNode(); + + auto &dnConfig = *AppConfig.MutableDynamicNodeConfig(); + for (auto &node : result.GetNodes()) { + if (node.NodeId == result.GetNodeId()) { + auto &nodeInfo = *dnConfig.MutableNodeInfo(); + nodeInfo.SetNodeId(node.NodeId); + nodeInfo.SetHost(node.Host); + nodeInfo.SetPort(node.Port); + nodeInfo.SetResolveHost(node.ResolveHost); + nodeInfo.SetAddress(node.Address); + nodeInfo.SetExpire(node.Expire); + NConfig::CopyNodeLocation(nodeInfo.MutableLocation(), node.Location); + } else { + auto &info = *nsConfig.AddNode(); + info.SetNodeId(node.NodeId); + info.SetAddress(node.Address); + info.SetPort(node.Port); + info.SetHost(node.Host); + info.SetInterconnectHost(node.ResolveHost); + NConfig::CopyNodeLocation(info.MutableLocation(), node.Location); + } + } + } + + void ProcessRegistrationDynamicNodeResult(const THolder& result) { + NodeId = result->GetNodeId(); + ScopeId = TKikimrScopeId(result->GetScopeId()); + + auto &nsConfig = *AppConfig.MutableNameserviceConfig(); + nsConfig.ClearNode(); + + auto &dnConfig = *AppConfig.MutableDynamicNodeConfig(); + for (auto &node : result->Record().GetNodes()) { + if (node.GetNodeId() == result->GetNodeId()) { + dnConfig.MutableNodeInfo()->CopyFrom(node); + } else { + auto &info = *nsConfig.AddNode(); + info.SetNodeId(node.GetNodeId()); + info.SetAddress(node.GetAddress()); + info.SetPort(node.GetPort()); + info.SetHost(node.GetHost()); + info.SetInterconnectHost(node.GetResolveHost()); + info.MutableLocation()->CopyFrom(node.GetLocation()); + } + } + } + + void ProcessRegistrationDynamicNodeResult(const std::variant>& result) { + std::visit([this](const auto& res){ ProcessRegistrationDynamicNodeResult(res); }, result); + } + + void RegisterDynamicNode(NConfig::TConfigFields& cf) { + TVector addrs; + + cf.FillClusterEndpoints(AppConfig, addrs); + + if (!cf.InterconnectPort) { + ythrow yexception() << "Either --node or --ic-port must be specified"; + } + + if (addrs.empty()) { + ythrow yexception() << "List of Node Broker end-points is empty"; + } + + TString domainName = DeduceNodeDomain(cf); + + if (!cf.NodeHost) { + cf.NodeHost = Env.FQDNHostName(); + } + + if (!cf.NodeResolveHost) { + cf.NodeResolveHost = cf.NodeHost; + } + + const TNodeRegistrationSettings rs { + domainName, + cf.NodeHost, + cf.NodeAddress, + cf.NodeResolveHost, + cf.GetSchemePath(), + cf.FixedNodeID, + cf.InterconnectPort, + }; + + const TNodeRegistrationGrpcSettings rgs { + cf.PathToGrpcCertFile, + cf.PathToGrpcCaFile, + cf.PathToGrpcPrivateKeyFile, + }; + + std::variant< + NYdb::NDiscovery::TNodeRegistrationResult, + THolder> result = RegisterDynamicNodeViaDiscoveryService( + cf, + addrs, + domainName, + Env); + + if (!std::get(result).IsSuccess()) { + result = RegisterDynamicNodeViaLegacyService(rgs, cf.CreateNodeLocation(), addrs, rs, Env); + } + + return ProcessRegistrationDynamicNodeResult(result); + } + + void ApplyConfigForNode(NKikimrConfig::TAppConfig &appConfig) { + AppConfig.Swap(&appConfig); + // Dynamic node config is defined by options and Node Broker response. + AppConfig.MutableDynamicNodeConfig()->Swap(appConfig.MutableDynamicNodeConfig()); + // By now naming config should be loaded and probably replaced with + // info from registration response. Don't lose it in case CMS has no + // config for naming service. + if (!AppConfig.HasNameserviceConfig()) { + AppConfig.MutableNameserviceConfig()->Swap(appConfig.MutableNameserviceConfig()); + // FIXME(innokentii) + // RunConfig.ConfigInitInfo[NKikimrConsole::TConfigItem::NameserviceConfigItem].Updates.pop_back(); + } + } + + bool TryToLoadConfigForDynamicNodeFromCMS( + const NConfig::TConfigFields& cf, + const TString &addr, + TMaybe& res, + TString &error) const + { + NClient::TKikimr kikimr(GetKikimr( + { + cf.PathToGrpcCertFile, + cf.PathToGrpcCaFile, + cf.PathToGrpcPrivateKeyFile, + }, + addr, + Env)); + auto configurator = kikimr.GetNodeConfigurator(); + + Cout << "Trying to get configs from " << addr << Endl; + + auto result = configurator.SyncGetNodeConfig(NodeId, + Env.FQDNHostName(), + cf.TenantName.GetRef(), + cf.NodeType.GetRef(), + DeduceNodeDomain(cf), + AppConfig.GetAuthConfig().GetStaffApiUserToken(), + true, + 1); + + if (!result.IsSuccess()) { + error = result.GetErrorMessage(); + Cerr << "Configuration error: " << error << Endl; + return false; + } + + Cout << "Success." << Endl; + + res = result; + + return true; + } + + TMaybe LoadConfigForDynamicNode(const NConfig::TConfigFields& cf) const { + TMaybe res; + bool success = false; + TString error; + TVector addrs; + + cf.FillClusterEndpoints(AppConfig, addrs); + + SetRandomSeed(TInstant::Now().MicroSeconds()); + int minAttempts = 10; + int attempts = 0; + while (!success && attempts < minAttempts) { + for (auto addr : addrs) { + success = TryToLoadConfigForDynamicNodeFromCMS(cf, addr, res, error); + ++attempts; + if (success) { + break; + } + } + // Randomized backoff + if (!success) { + Env.Sleep(TDuration::MilliSeconds(500 + RandomNumber(1000))); + } + } + + if (!success) { + Cerr << "WARNING: couldn't load config from CMS: " << error << Endl; + } + + return res; + } + + void InitStaticNode() { + ConfigFields.ValidateStaticNodeConfig(); + + Labels["dynamic"] = "false"; + } + + static ui32 NextValidKind(ui32 kind) { + do { + ++kind; + if (kind != NKikimrConsole::TConfigItem::Auto && NKikimrConsole::TConfigItem::EKind_IsValid(kind)) { + break; + } + } while (kind <= NKikimrConsole::TConfigItem::EKind_MAX); + return kind; + } + + static bool HasCorrespondingManagedKind(ui32 kind, const NKikimrConfig::TAppConfig& appConfig) { + return (kind == NKikimrConsole::TConfigItem::NameserviceConfigItem && appConfig.HasNameserviceConfig()) || + (kind == NKikimrConsole::TConfigItem::NetClassifierDistributableConfigItem && appConfig.HasNetClassifierDistributableConfig()) || + (kind == NKikimrConsole::TConfigItem::NamedConfigsItem && appConfig.NamedConfigsSize()); + } + + NKikimrConfig::TAppConfig GetYamlConfigFromResult(const NKikimr::NClient::TConfigurationResult& result, const TMap& labels) const { + NKikimrConfig::TAppConfig yamlConfig; + if (result.HasYamlConfig() && !result.GetYamlConfig().empty()) { + NYamlConfig::ResolveAndParseYamlConfig( + result.GetYamlConfig(), + result.GetVolatileYamlConfigs(), + labels, + yamlConfig); + } + return yamlConfig; + } + + NKikimrConfig::TAppConfig GetActualDynConfig(const NKikimrConfig::TAppConfig& yamlConfig, const NKikimrConfig::TAppConfig& regularConfig) const { + if (yamlConfig.GetYamlConfigEnabled()) { + for (ui32 kind = NKikimrConsole::TConfigItem::EKind_MIN; kind <= NKikimrConsole::TConfigItem::EKind_MAX; NextValidKind(kind)) { + if (HasCorrespondingManagedKind(kind, yamlConfig)) { + TRACE_CONFIG_CHANGE_INPLACE(kind, ReplaceConfigWithConsoleProto); + } else { + TRACE_CONFIG_CHANGE_INPLACE(kind, ReplaceConfigWithConsoleYaml); + } + } + + return yamlConfig; + } + + for (ui32 kind = NKikimrConsole::TConfigItem::EKind_MIN; kind <= NKikimrConsole::TConfigItem::EKind_MAX; NextValidKind(kind)) { + TRACE_CONFIG_CHANGE_INPLACE(kind, ReplaceConfigWithConsoleProto); + } + + return regularConfig; + } + + void InitDynamicNode() { + Labels["dynamic"] = "true"; + RegisterDynamicNode(ConfigFields); + + Labels["node_id"] = ToString(NodeId); + AddLabelToAppConfig("node_id", Labels["node_id"]); + + if (ConfigFields.IgnoreCmsConfigs) { + return; + } + + TMaybe result = LoadConfigForDynamicNode(ConfigFields); + + if (!result) { + return; + } + + NKikimrConfig::TAppConfig yamlConfig = GetYamlConfigFromResult(*result, Labels); + NYamlConfig::ReplaceUnmanagedKinds(result->GetConfig(), yamlConfig); + + InitDebug.OldConfig.CopyFrom(result->GetConfig()); + InitDebug.YamlConfig.CopyFrom(yamlConfig); + + NKikimrConfig::TAppConfig appConfig = GetActualDynConfig(yamlConfig, result->GetConfig()); + + ApplyConfigForNode(appConfig); + } + + void RegisterCliOptions(NLastGetopt::TOpts& opts) override { + ConfigFields.RegisterCliOptions(opts); + MbusConfigFields.RegisterCliOptions(opts); + opts.AddLongOption("label", "labels for this node") + .Optional().RequiredArgument("KEY=VALUE") + .KVHandler([&](TString key, TString val) { + Labels[key] = val; + }); + + opts.SetFreeArgDefaultTitle("PATH", "path to protobuf file; files are merged in order in which they are enlisted"); + } +}; + +std::unique_ptr MakeDefaultInitialConfigurator( + NConfig::IErrorCollector& errorCollector, + NConfig::IProtoConfigFileProvider& protoConfigFileProvider, + NConfig::IConfigUpdateTracer& configUpdateTracer, + NConfig::IEnv& env) +{ + return std::make_unique(errorCollector, protoConfigFileProvider, configUpdateTracer, env); +} + +} // namespace NKikimr::NConfig diff --git a/ydb/core/config/init/ya.make b/ydb/core/config/init/ya.make new file mode 100644 index 000000000000..2b1c83ed92ab --- /dev/null +++ b/ydb/core/config/init/ya.make @@ -0,0 +1,16 @@ +LIBRARY() + +SRCS( + init.h + init.cpp +) + +PEERDIR( + ydb/core/protos + ydb/core/base + ydb/library/yaml_config +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/core/config/ya.make b/ydb/core/config/ya.make index 652ae9aba9f8..365cd995b6a1 100644 --- a/ydb/core/config/ya.make +++ b/ydb/core/config/ya.make @@ -1,4 +1,5 @@ RECURSE( + init protos tools utils diff --git a/ydb/core/driver_lib/cli_utils/cli_cmds_server.cpp b/ydb/core/driver_lib/cli_utils/cli_cmds_server.cpp index 43210bddce2a..d35ac7c31fee 100644 --- a/ydb/core/driver_lib/cli_utils/cli_cmds_server.cpp +++ b/ydb/core/driver_lib/cli_utils/cli_cmds_server.cpp @@ -1,1350 +1,62 @@ #include "cli.h" #include "cli_cmds.h" -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include - -extern TAutoPtr DummyActorSystemConfig(); -extern TAutoPtr DummyAllocatorConfig(); - -namespace NKikimr { -namespace NDriverClient { - -struct TCallContext { - const char* File; - int Line; -}; -#define TRACE_CONFIG_CHANGE(CHANGE_CONTEXT, KIND, CHANGE_KIND) \ - RunConfig.ConfigInitInfo[KIND].Updates.emplace_back( \ - TConfigItemInfo::TUpdate{CHANGE_CONTEXT.File, static_cast(CHANGE_CONTEXT.Line), TConfigItemInfo::EUpdateKind:: CHANGE_KIND}) - -#define TRACE_CONFIG_CHANGE_INPLACE(KIND, CHANGE_KIND) \ - RunConfig.ConfigInitInfo[KIND].Updates.emplace_back( \ - TConfigItemInfo::TUpdate{__FILE__, static_cast(__LINE__), TConfigItemInfo::EUpdateKind:: CHANGE_KIND}) +#include +#include -#define TRACE_CONFIG_CHANGE_INPLACE_T(KIND, CHANGE_KIND) \ - RunConfig.ConfigInitInfo[NKikimrConsole::TConfigItem:: KIND ## Item].Updates.emplace_back( \ - TConfigItemInfo::TUpdate{__FILE__, static_cast(__LINE__), TConfigItemInfo::EUpdateKind:: CHANGE_KIND}) +#include -#define CALL_CTX() TCallContext{__FILE__, __LINE__} +namespace NKikimr::NDriverClient { +class TClientCommandServer : public TClientCommand { +public: + TClientCommandServer(std::shared_ptr factories) + : TClientCommand("server", {}, "Execute YDB server") + , Factories(std::move(factories)) + , ErrorCollector(NConfig::MakeDefaultErrorCollector()) + , ProtoConfigFileProvider(NConfig::MakeDefaultProtoConfigFileProvider()) + , ConfigUpdateTracer(NConfig::MakeDefaultConfigUpdateTracer()) + , Env(NConfig::MakeDefaultEnv()) + , InitCfg(*ErrorCollector, *ProtoConfigFileProvider, *ConfigUpdateTracer, *Env) + {} -constexpr auto NODE_KIND_YDB = "ydb"; -constexpr auto NODE_KIND_YQ = "yq"; + int Run(TConfig &/*config*/) override { + NKikimrConfig::TAppConfig appConfig; -class TClientCommandServerBase : public TClientCommand { + TKikimrRunConfig RunConfig(appConfig); + // FIXME: fill from InitCfg + Y_ABORT_UNLESS(RunConfig.NodeId); + return MainRun(RunConfig, Factories); + } protected: - NKikimrConfig::TAppConfig BaseConfig; - NKikimrConfig::TAppConfig AppConfig; - TKikimrRunConfig RunConfig; - - ui32 LogLevel; // log settings - ui32 LogSamplingLevel; // log settings - ui32 LogSamplingRate; // log settings - TString LogFormat;// log settings - TString SysLogServiceTag; //unique tags for sys logs - TString LogFileName; // log file name to initialize file log backend - TString ClusterName; // log settings + std::shared_ptr Factories; - ui32 NodeId; - TString NodeIdValue; - ui32 DefaultInterconnectPort = 19001; - ui32 BusProxyPort; - NBus::TBusQueueConfig ProxyBusQueueConfig; - NBus::TBusServerSessionConfig ProxyBusSessionConfig; - TVector ProxyBindToProxy; - ui32 MonitoringPort; - TString MonitoringAddress; - ui32 MonitoringThreads; - TString MonitoringCertificateFile; - TString RestartsCountFile; - TString TracePath; - size_t CompileInflightLimit; // MiniKQLCompileService - TString UDFsDir; - TVector UDFsPaths; - TString TenantName; - TVector NodeBrokerAddresses; - ui32 NodeBrokerPort; - bool NodeBrokerUseTls; - bool FixedNodeID; - bool IgnoreCmsConfigs; - bool TinyMode; - TString NodeAddress; - TString NodeHost; - TString NodeResolveHost; - TString NodeDomain; - ui32 InterconnectPort; - ui32 SqsHttpPort; - TString NodeKind = NODE_KIND_YDB; - TString NodeType; - TString DataCenter; - TString Rack; - ui32 Body; - ui32 GRpcPort; - ui32 GRpcsPort; - TString GRpcPublicHost; - ui32 GRpcPublicPort; - ui32 GRpcsPublicPort; - TVector GRpcPublicAddressesV4; - TVector GRpcPublicAddressesV6; - TString GRpcPublicTargetNameOverride; - TString PathToGrpcCertFile; - TString PathToInterconnectCertFile; - TString PathToGrpcPrivateKeyFile; - TString PathToInterconnectPrivateKeyFile; - TString PathToGrpcCaFile; - TString PathToInterconnectCaFile; - TString YamlConfigFile; + std::unique_ptr ErrorCollector; + std::unique_ptr ProtoConfigFileProvider; + std::unique_ptr ConfigUpdateTracer; + std::unique_ptr Env; - TClientCommandServerBase(const char *cmd, const char *description) - : TClientCommand(cmd, {}, description) - , RunConfig(AppConfig) - {} + NConfig::TInitialConfigurator InitCfg; - virtual void Config(TConfig& config) override { + void Config(TConfig& config) override { TClientCommand::Config(config); - LogLevel = NActors::NLog::PRI_WARN; - LogSamplingLevel = NActors::NLog::PRI_DEBUG; - LogSamplingRate = 0; - - NodeId = 0; - NodeIdValue = ""; - BusProxyPort = NMsgBusProxy::TProtocol::DefaultPort; - MonitoringPort = 0; - MonitoringThreads = 10; - RestartsCountFile = ""; - CompileInflightLimit = 100000; - TenantName = ""; - NodeBrokerPort = 0; - NodeBrokerUseTls = false; - FixedNodeID = false; - InterconnectPort = 0; - SqsHttpPort = 0; - IgnoreCmsConfigs = false; - DataCenter = ""; - Rack = ""; - Body = 0; - GRpcPort = 0; - GRpcsPort = 0; - GRpcPublicHost = ""; - GRpcPublicPort = 0; - GRpcsPublicPort = 0; - GRpcPublicAddressesV4.clear(); - GRpcPublicAddressesV6.clear(); - GRpcPublicTargetNameOverride = ""; - - config.Opts->AddLongOption("cluster-name", "which cluster this node belongs to") - .DefaultValue("unknown").OptionalArgument("STR").StoreResult(&ClusterName); - config.Opts->AddLongOption("log-level", "default logging level").OptionalArgument("1-7") - .DefaultValue(ToString(LogLevel)).StoreResult(&LogLevel); - config.Opts->AddLongOption("log-sampling-level", "sample logs equal to or above this level").OptionalArgument("1-7") - .DefaultValue(ToString(LogSamplingLevel)).StoreResult(&LogSamplingLevel); - config.Opts->AddLongOption("log-sampling-rate", - "log only each Nth message with priority matching sampling level; 0 turns log sampling off") - .OptionalArgument(Sprintf("0,%" PRIu32, Max())) - .DefaultValue(ToString(LogSamplingRate)).StoreResult(&LogSamplingRate); - config.Opts->AddLongOption("log-format", "log format to use; short skips the priority and timestamp") - .DefaultValue("full").OptionalArgument("full|short|json").StoreResult(&LogFormat); - config.Opts->AddLongOption("syslog", "send to syslog instead of stderr").NoArgument(); - config.Opts->AddLongOption("syslog-service-tag", "unique tag for syslog").RequiredArgument("NAME").StoreResult(&SysLogServiceTag); - config.Opts->AddLongOption("log-file-name", "file name for log backend").RequiredArgument("NAME").StoreResult(&LogFileName); - config.Opts->AddLongOption("tcp", "start tcp interconnect").NoArgument(); - config.Opts->AddLongOption('n', "node", "Node ID or 'static' to auto-detect using naming file and ic-port.") - .RequiredArgument("[NUM|static]").StoreResult(&NodeIdValue); - config.Opts->AddLongOption("node-broker", "node broker address host:port") - .RequiredArgument("ADDR").AppendTo(&NodeBrokerAddresses); - config.Opts->AddLongOption("node-broker-port", "node broker port (hosts from naming file are used)") - .RequiredArgument("PORT").StoreResult(&NodeBrokerPort); - config.Opts->AddLongOption("node-broker-use-tls", "use tls for node broker (hosts from naming file are used)") - .RequiredArgument("PORT").StoreResult(&NodeBrokerUseTls); - config.Opts->AddLongOption("node-address", "address for dynamic node") - .RequiredArgument("ADDR").StoreResult(&NodeAddress); - config.Opts->AddLongOption("node-host", "hostname for dynamic node") - .RequiredArgument("NAME").StoreResult(&NodeHost); - config.Opts->AddLongOption("node-resolve-host", "resolve hostname for dynamic node") - .RequiredArgument("NAME").StoreResult(&NodeResolveHost); - config.Opts->AddLongOption("node-domain", "domain for dynamic node to register in") - .RequiredArgument("NAME").StoreResult(&NodeDomain); - config.Opts->AddLongOption("ic-port", "interconnect port") - .RequiredArgument("NUM").StoreResult(&InterconnectPort); - config.Opts->AddLongOption("sqs-port", "sqs port") - .RequiredArgument("NUM").StoreResult(&SqsHttpPort); - config.Opts->AddLongOption("proxy", "Bind to proxy(-ies)").RequiredArgument("ADDR").AppendTo(&ProxyBindToProxy); - config.Opts->AddLongOption("tenant", "add binding for Local service to specified tenant, might be one of {'/', '//'}") - .RequiredArgument("NAME").StoreResult(&TenantName); - config.Opts->AddLongOption("mon-port", "Monitoring port").OptionalArgument("NUM").StoreResult(&MonitoringPort); - config.Opts->AddLongOption("mon-address", "Monitoring address").OptionalArgument("ADDR").StoreResult(&MonitoringAddress); - config.Opts->AddLongOption("mon-cert", "Monitoring certificate (https)").OptionalArgument("PATH").StoreResult(&MonitoringCertificateFile); - config.Opts->AddLongOption("mon-threads", "Monitoring http server threads").RequiredArgument("NUM").StoreResult(&MonitoringThreads); - config.Opts->AddLongOption("suppress-version-check", "Suppress version compatibility checking via IC").NoArgument(); - - config.Opts->AddLongOption("sys-file", "actor system config file (use dummy config by default)").OptionalArgument("PATH"); - config.Opts->AddLongOption("naming-file", "static nameservice config file").OptionalArgument("PATH"); - config.Opts->AddLongOption("domains-file", "domain config file").OptionalArgument("PATH"); - config.Opts->AddLongOption("bs-file", "blobstorage config file").OptionalArgument("PATH"); - config.Opts->AddLongOption("log-file", "log config file").OptionalArgument("PATH"); - config.Opts->AddLongOption("ic-file", "interconnect config file").OptionalArgument("PATH"); - config.Opts->AddLongOption("channels-file", "tablet channel profile config file").OptionalArgument("PATH"); - config.Opts->AddLongOption("vdisk-file", "vdisk kind config file").OptionalArgument("PATH"); - config.Opts->AddLongOption("drivemodel-file", "drive model config file").OptionalArgument("PATH"); - config.Opts->AddLongOption("grpc-file", "gRPC config file").OptionalArgument("PATH"); - config.Opts->AddLongOption("grpc-port", "enable gRPC server on port").RequiredArgument("PORT").StoreResult(&GRpcPort); - config.Opts->AddLongOption("grpcs-port", "enable gRPC SSL server on port").RequiredArgument("PORT").StoreResult(&GRpcsPort); - config.Opts->AddLongOption("grpc-public-host", "set public gRPC host for discovery").RequiredArgument("HOST").StoreResult(&GRpcPublicHost); - config.Opts->AddLongOption("grpc-public-port", "set public gRPC port for discovery").RequiredArgument("PORT").StoreResult(&GRpcPublicPort); - config.Opts->AddLongOption("grpcs-public-port", "set public gRPC SSL port for discovery").RequiredArgument("PORT").StoreResult(&GRpcsPublicPort); - config.Opts->AddLongOption("grpc-public-address-v4", "set public ipv4 address for discovery").RequiredArgument("ADDR").EmplaceTo(&GRpcPublicAddressesV4); - config.Opts->AddLongOption("grpc-public-address-v6", "set public ipv6 address for discovery").RequiredArgument("ADDR").EmplaceTo(&GRpcPublicAddressesV6); - config.Opts->AddLongOption("grpc-public-target-name-override", "set public hostname override for TLS in discovery").RequiredArgument("HOST").StoreResult(&GRpcPublicTargetNameOverride); - config.Opts->AddLongOption("kqp-file", "Kikimr Query Processor config file").OptionalArgument("PATH"); - config.Opts->AddLongOption("incrhuge-file", "incremental huge blob keeper config file").OptionalArgument("PATH"); - config.Opts->AddLongOption("memorylog-file", "set buffer size for memory log").OptionalArgument("PATH"); - config.Opts->AddLongOption("pq-file", "PersQueue config file").OptionalArgument("PATH"); - config.Opts->AddLongOption("pqcd-file", "PersQueue cluster discovery config file").OptionalArgument("PATH"); - config.Opts->AddLongOption("netclassifier-file", "NetClassifier config file").OptionalArgument("PATH"); - config.Opts->AddLongOption("auth-file", "authorization configuration").OptionalArgument("PATH"); - config.Opts->AddLongOption("auth-token-file", "authorization token configuration").OptionalArgument("PATH"); - config.Opts->AddLongOption("key-file", "tanant encryption key configuration").OptionalArgument("PATH"); - config.Opts->AddLongOption("pdisk-key-file", "pdisk encryption key configuration").OptionalArgument("PATH"); - config.Opts->AddLongOption("sqs-file", "SQS config file").OptionalArgument("PATH"); - config.Opts->AddLongOption("bootstrap-file", "Bootstrap config file").OptionalArgument("PATH"); - config.Opts->AddLongOption("dyn-nodes-file", "Dynamic nodes config file").OptionalArgument("PATH"); - config.Opts->AddLongOption("cms-file", "CMS config file").OptionalArgument("PATH"); - config.Opts->AddLongOption("alloc-file", "Allocator config file").OptionalArgument("PATH"); - config.Opts->AddLongOption("yql-file", "Yql Analytics config file").OptionalArgument("PATH"); - config.Opts->AddLongOption("fq-file", "Federated Query config file").OptionalArgument("PATH"); - config.Opts->AddLongOption("feature-flags-file", "File with feature flags to turn new features on/off").OptionalArgument("PATH"); - config.Opts->AddLongOption("rb-file", "File with resource broker customizations").OptionalArgument("PATH"); - config.Opts->AddLongOption("metering-file", "File with metering config").OptionalArgument("PATH"); - config.Opts->AddLongOption("audit-file", "File with audit config").OptionalArgument("PATH"); - config.Opts->AddLongOption('r', "restarts-count-file", "State for restarts monitoring counter,\nuse empty string to disable\n") - .OptionalArgument("PATH").DefaultValue(RestartsCountFile).StoreResult(&RestartsCountFile); - config.Opts->AddLongOption("compile-inflight-limit", "Limit on parallel programs compilation").OptionalArgument("NUM").StoreResult(&CompileInflightLimit); - config.Opts->AddLongOption("udf", "Load shared library with UDF by given path").AppendTo(&UDFsPaths); - config.Opts->AddLongOption("udfs-dir", "Load all shared libraries with UDFs found in given directory").StoreResult(&UDFsDir); - config.Opts->AddLongOption("node-kind", Sprintf("Kind of the node (affects list of services activated allowed values are {'%s', '%s'} )", NODE_KIND_YDB, NODE_KIND_YQ)) - .RequiredArgument("NAME").StoreResult(&NodeKind); - config.Opts->AddLongOption("node-type", "Type of the node") - .RequiredArgument("NAME").StoreResult(&NodeType); - config.Opts->AddLongOption("ignore-cms-configs", "Don't load configs from CMS") - .NoArgument().SetFlag(&IgnoreCmsConfigs); - config.Opts->AddLongOption("cert", "Path to client certificate file (PEM) for interconnect").RequiredArgument("PATH").StoreResult(&PathToInterconnectCertFile); - config.Opts->AddLongOption("grpc-cert", "Path to client certificate file (PEM) for grpc").RequiredArgument("PATH").StoreResult(&PathToGrpcCertFile); - config.Opts->AddLongOption("ic-cert", "Path to client certificate file (PEM) for interconnect").RequiredArgument("PATH").StoreResult(&PathToInterconnectCertFile); - config.Opts->AddLongOption("key", "Path to private key file (PEM) for interconnect").RequiredArgument("PATH").StoreResult(&PathToInterconnectPrivateKeyFile); - config.Opts->AddLongOption("grpc-key", "Path to private key file (PEM) for grpc").RequiredArgument("PATH").StoreResult(&PathToGrpcPrivateKeyFile); - config.Opts->AddLongOption("ic-key", "Path to private key file (PEM) for interconnect").RequiredArgument("PATH").StoreResult(&PathToInterconnectPrivateKeyFile); - config.Opts->AddLongOption("ca", "Path to certificate authority file (PEM) for interconnect").RequiredArgument("PATH").StoreResult(&PathToInterconnectCaFile); - config.Opts->AddLongOption("grpc-ca", "Path to certificate authority file (PEM) for grpc").RequiredArgument("PATH").StoreResult(&PathToGrpcCaFile); - config.Opts->AddLongOption("ic-ca", "Path to certificate authority file (PEM) for interconnect").RequiredArgument("PATH").StoreResult(&PathToInterconnectCaFile); - config.Opts->AddLongOption("data-center", "data center name (used to describe dynamic node location)") - .RequiredArgument("NAME").StoreResult(&DataCenter); - config.Opts->AddLongOption("rack", "rack name (used to describe dynamic node location)") - .RequiredArgument("NAME").StoreResult(&Rack); - config.Opts->AddLongOption("body", "body name (used to describe dynamic node location)") - .RequiredArgument("NUM").StoreResult(&Body); - config.Opts->AddLongOption("yaml-config", "Yaml config").OptionalArgument("PATH").StoreResult(&YamlConfigFile); - config.Opts->AddLongOption("http-proxy-file", "Http proxy config file").OptionalArgument("PATH"); - config.Opts->AddLongOption("public-http-file", "Public HTTP config file").OptionalArgument("PATH"); - - config.Opts->AddLongOption("tiny-mode", "Start in a tiny mode") - .NoArgument().SetFlag(&TinyMode); - - config.Opts->AddHelpOption('h'); - - // add messagebus proxy options - config.Opts->AddLongOption("mbus", "Start MessageBus proxy").NoArgument(); - config.Opts->AddLongOption("mbus-port", "MessageBus proxy port").RequiredArgument("PORT").StoreResult(&BusProxyPort); - config.Opts->AddLongOption("mbus-trace-path", "Path for trace files").RequiredArgument("PATH").StoreResult(&TracePath); - SetMsgBusDefaults(ProxyBusSessionConfig, ProxyBusQueueConfig); - ProxyBusSessionConfig.ConfigureLastGetopt(*config.Opts, "mbus-"); - ProxyBusQueueConfig.ConfigureLastGetopt(*config.Opts, "mbus-"); - - config.Opts->AddLongOption("label", "labels for this node") - .Optional().RequiredArgument("KEY=VALUE") - .KVHandler([&](TString key, TString val) { - RunConfig.Labels[key] = val; - }); - + InitCfg.RegisterCliOptions(*config.Opts); + ProtoConfigFileProvider->RegisterCliOptions(*config.Opts); config.SetFreeArgsMin(0); - config.Opts->SetFreeArgDefaultTitle("PATH", "path to protobuf file; files are merged in order in which they are enlisted"); - } - - template - auto MutableConfigPart(TConfig& config, const char *optname, - TFieldTag tag, - TCallContext callCtx) -> decltype((AppConfig.*std::get<2>(NKikimrConfig::TAppConfig::GetFieldAccessorsByFieldTag(tag)))()) - { - auto [hasConfig, getConfig, mutableConfig] = NKikimrConfig::TAppConfig::GetFieldAccessorsByFieldTag(tag); - ui32 kind = NKikimrConfig::TAppConfig::GetFieldIdByFieldTag(tag); - - typename std::remove_reference::type *res = nullptr; - if ((AppConfig.*hasConfig)()) { - return nullptr; // this field is already provided in AppConfig, so we don't overwrite it - } - - if (optname && config.ParseResult->Has(optname)) { - const bool success = ParsePBFromFile(config.ParseResult->Get(optname), res = (AppConfig.*mutableConfig)()); - Y_ABORT_UNLESS(success); - TRACE_CONFIG_CHANGE(callCtx, kind, MutableConfigPartFromFile); - } else if ((BaseConfig.*hasConfig)()) { - res = (AppConfig.*mutableConfig)(); - res->CopyFrom((BaseConfig.*getConfig)()); - TRACE_CONFIG_CHANGE(callCtx, kind, MutableConfigPartFromBaseConfig); - } - - return res; - } - - template - auto MutableConfigPartMerge(TConfig& config, const char *optname, - TFieldTag tag, - TCallContext callCtx) -> decltype((AppConfig.*std::get<2>(NKikimrConfig::TAppConfig::GetFieldAccessorsByFieldTag(tag)))()) - { - auto mutableConfig = std::get<2>(NKikimrConfig::TAppConfig::GetFieldAccessorsByFieldTag(tag)); - ui32 kind = NKikimrConfig::TAppConfig::GetFieldIdByFieldTag(tag); - - typename std::remove_reference::type *res = nullptr; - if (config.ParseResult->Has(optname)) { - typename std::remove_reference::type cfg; - bool success = ParsePBFromFile(config.ParseResult->Get(optname), &cfg); - Y_ABORT_UNLESS(success); - res = (AppConfig.*mutableConfig)(); - res->MergeFrom(cfg); - TRACE_CONFIG_CHANGE(callCtx, kind, MutableConfigPartMergeFromFile); - } - - return res; - } - - ui32 FindStaticNodeId() const { - std::vector candidates = {HostName(), FQDNHostName()}; - for(auto& candidate: candidates) { - candidate.to_lower(); - - const NKikimrConfig::TStaticNameserviceConfig& nameserviceConfig = AppConfig.GetNameserviceConfig(); - for (const auto& node : nameserviceConfig.GetNode()) { - if (node.GetHost() == candidate && InterconnectPort == node.GetPort()) { - return node.GetNodeId(); - } - } - } - - return 0; - } - - void AddLabelToAppConfig(const TString& name, const TString& value) { - for (auto &label : *AppConfig.MutableLabels()) { - if (label.GetName() == name) { - label.SetValue(value); - return; - } - } - - auto *label = AppConfig.AddLabels(); - label->SetName(name); - label->SetValue(value); + config.Opts->AddHelpOption('h'); } - virtual void Parse(TConfig& config) override { + void Parse(TConfig& config) override { TClientCommand::Parse(config); - - using TCfg = NKikimrConfig::TAppConfig; - - MutableConfigPart(config, "auth-file", TCfg::TAuthConfigFieldTag{}, CALL_CTX()); - LoadBootstrapConfig(config); - LoadYamlConfig(CALL_CTX()); - MutableConfigPartMerge(config, "auth-token-file", TCfg::TAuthConfigFieldTag{}, CALL_CTX()); - - // start memorylog as soon as possible - if (auto mem = MutableConfigPart(config, "memorylog-file", TCfg::TMemoryLogConfigFieldTag{}, CALL_CTX())) { - if (mem->HasLogBufferSize() && mem->GetLogBufferSize() > 0) { - if (mem->HasLogGrainSize() && mem->GetLogGrainSize() > 0) { - TMemoryLog::CreateMemoryLogBuffer(mem->GetLogBufferSize(), mem->GetLogGrainSize()); - } else { - TMemoryLog::CreateMemoryLogBuffer(mem->GetLogBufferSize()); - } - MemLogWriteNullTerm("Memory_log_has_been_started_YAHOO_"); - } - } - - MutableConfigPart(config, "naming-file", TCfg::TNameserviceConfigFieldTag{}, CALL_CTX()); - - if (config.ParseResult->Has("node")) { - if (NodeIdValue == "static") { - if (!AppConfig.HasNameserviceConfig() || !InterconnectPort) - ythrow yexception() << "'--node static' requires naming file and IC port to be specified"; - try { - NodeId = FindStaticNodeId(); - } catch(TSystemError& e) { - ythrow yexception() << "cannot detect host name: " << e.what(); - } - if (!NodeId) - ythrow yexception() << "cannot detect node ID for " << HostName() << ":" << InterconnectPort - << " and for " << FQDNHostName() << ":" << InterconnectPort << Endl; - Cout << "Determined node ID: " << NodeId << Endl; - } else { - if (!TryFromString(NodeIdValue, NodeId)) - ythrow yexception() << "wrong '--node' value (should be NUM, 'static')"; - } - } - - if (config.ParseResult->Has("tenant")) { - if (!IsStartWithSlash(TenantName)) { - ythrow yexception() << "lead / in --tenant parametr is always required."; - } - if (NodeId && NodeKind != NODE_KIND_YQ) { - ythrow yexception() << "opt '--node' compatible only with '--tenant no', opt 'node' incompatible with any other values of opt '--tenant'"; - } - } - - if (NodeKind == NODE_KIND_YDB) { - // do nothing => default behaviour - } else if (NodeKind == NODE_KIND_YQ) { - RunConfig.ServicesMask.DisableAll(); - RunConfig.ServicesMask.EnableYQ(); - } else { - ythrow yexception() << "wrong '--node-kind' value '" << NodeKind << "', only '" << NODE_KIND_YDB << "' or '" << NODE_KIND_YQ << "' is allowed"; - } - - if (TinyMode) { - RunConfig.ServicesMask.SetTinyMode(); - } - - RunConfig.Labels["node_id"] = ToString(NodeId); - RunConfig.Labels["node_host"] = FQDNHostName(); - RunConfig.Labels["tenant"] = TenantName; - RunConfig.Labels["node_type"] = NodeType; - // will be replaced with proper version info - RunConfig.Labels["branch"] = GetBranch(); - RunConfig.Labels["rev"] = GetProgramCommitId(); - RunConfig.Labels["dynamic"] = ToString(NodeBrokerAddresses.empty() ? "false" : "true"); - - for (const auto& [name, value] : RunConfig.Labels) { - auto *label = AppConfig.AddLabels(); - label->SetName(name); - label->SetValue(value); - } - - // static node - if (NodeBrokerAddresses.empty() && !NodeBrokerPort) { - if (!NodeId) { - ythrow yexception() << "Either --node [NUM|'static'] or --node-broker[-port] should be specified"; - } - } else { - RegisterDynamicNode(); - - RunConfig.Labels["node_id"] = ToString(RunConfig.NodeId); - AddLabelToAppConfig("node_id", RunConfig.Labels["node_id"]); - - if (!IgnoreCmsConfigs) { - LoadConfigForDynamicNode(); - } - } - - LoadYamlConfig(CALL_CTX()); - - MutableConfigPart(config, "sys-file", TCfg::TActorSystemConfigFieldTag{}, CALL_CTX()); - if (!AppConfig.HasActorSystemConfig()) { - AppConfig.MutableActorSystemConfig()->CopyFrom(*DummyActorSystemConfig()); - TRACE_CONFIG_CHANGE_INPLACE_T(ActorSystemConfig, SetExplicitly); - } - - MutableConfigPart(config, "domains-file", TCfg::TDomainsConfigFieldTag{}, CALL_CTX()); - MutableConfigPart(config, "bs-file", TCfg::TBlobStorageConfigFieldTag{}, CALL_CTX()); - - if (auto logConfig = MutableConfigPart(config, "log-file", TCfg::TLogConfigFieldTag{}, CALL_CTX())) { - if (config.ParseResult->Has("syslog")) - logConfig->SetSysLog(true); - if (config.ParseResult->Has("log-level")) - logConfig->SetDefaultLevel(LogLevel); - if (config.ParseResult->Has("log-sampling-level")) - logConfig->SetDefaultSamplingLevel(LogSamplingLevel); - if (config.ParseResult->Has("log-sampling-rate")) - logConfig->SetDefaultSamplingRate(LogSamplingRate); - if (config.ParseResult->Has("log-format")) - logConfig->SetFormat(LogFormat); - if (config.ParseResult->Has("cluster-name")) - logConfig->SetClusterName(ClusterName); - } - // This flag is set per node and we prefer flag over CMS. - if (config.ParseResult->Has("syslog-service-tag") - && !AppConfig.GetLogConfig().GetSysLogService()) { - AppConfig.MutableLogConfig()->SetSysLogService(SysLogServiceTag); - TRACE_CONFIG_CHANGE_INPLACE_T(LogConfig, UpdateExplicitly); - } - - if (config.ParseResult->Has("log-file-name")) { - AppConfig.MutableLogConfig()->SetBackendFileName(LogFileName); - TRACE_CONFIG_CHANGE_INPLACE_T(LogConfig, UpdateExplicitly); - } - - if (auto interconnectConfig = MutableConfigPart(config, "ic-file", TCfg::TInterconnectConfigFieldTag{}, CALL_CTX())) { - if (config.ParseResult->Has("tcp")) { - interconnectConfig->SetStartTcp(true); - TRACE_CONFIG_CHANGE_INPLACE_T(InterconnectConfig, UpdateExplicitly); - } - } - - MutableConfigPart(config, "channels-file", TCfg::TChannelProfileConfigFieldTag{}, CALL_CTX()); - - if (auto bootstrapConfig = MutableConfigPart(config, "bootstrap-file", TCfg::TBootstrapConfigFieldTag{}, CALL_CTX())) { - bootstrapConfig->MutableCompileServiceConfig()->SetInflightLimit(CompileInflightLimit); - TRACE_CONFIG_CHANGE_INPLACE_T(BootstrapConfig, UpdateExplicitly); - } - - MutableConfigPart(config, "vdisk-file", TCfg::TVDiskConfigFieldTag{}, CALL_CTX()); - MutableConfigPart(config, "drivemodel-file", TCfg::TDriveModelConfigFieldTag{}, CALL_CTX()); - MutableConfigPart(config, "grpc-file", TCfg::TGRpcConfigFieldTag{}, CALL_CTX()); - MutableConfigPart(config, "dyn-nodes-file", TCfg::TDynamicNameserviceConfigFieldTag{}, CALL_CTX()); - MutableConfigPart(config, "cms-file", TCfg::TCmsConfigFieldTag{}, CALL_CTX()); - MutableConfigPart(config, "pq-file", TCfg::TPQConfigFieldTag{}, CALL_CTX()); - MutableConfigPart(config, "pqcd-file", TCfg::TPQClusterDiscoveryConfigFieldTag{}, CALL_CTX()); - MutableConfigPart(config, "netclassifier-file", TCfg::TNetClassifierConfigFieldTag{}, CALL_CTX()); - MutableConfigPart(config, "auth-file", TCfg::TAuthConfigFieldTag{}, CALL_CTX()); - MutableConfigPartMerge(config, "auth-token-file", TCfg::TAuthConfigFieldTag{}, CALL_CTX()); - MutableConfigPart(config, "key-file", TCfg::TKeyConfigFieldTag{}, CALL_CTX()); - MutableConfigPart(config, "pdisk-key-file", TCfg::TPDiskKeyConfigFieldTag{}, CALL_CTX()); - MutableConfigPart(config, "sqs-file", TCfg::TSqsConfigFieldTag{}, CALL_CTX()); - MutableConfigPart(config, "http-proxy-file", TCfg::THttpProxyConfigFieldTag{}, CALL_CTX()); - MutableConfigPart(config, "public-http-file", TCfg::TPublicHttpConfigFieldTag{}, CALL_CTX()); - MutableConfigPart(config, "feature-flags-file", TCfg::TFeatureFlagsFieldTag{}, CALL_CTX()); - MutableConfigPart(config, "rb-file", TCfg::TResourceBrokerConfigFieldTag{}, CALL_CTX()); - MutableConfigPart(config, "metering-file", TCfg::TMeteringConfigFieldTag{}, CALL_CTX()); - MutableConfigPart(config, "audit-file", TCfg::TAuditConfigFieldTag{}, CALL_CTX()); - MutableConfigPart(config, "kqp-file", TCfg::TKQPConfigFieldTag{}, CALL_CTX()); - MutableConfigPart(config, "incrhuge-file", TCfg::TIncrHugeConfigFieldTag{}, CALL_CTX()); - MutableConfigPart(config, "alloc-file", TCfg::TAllocatorConfigFieldTag{}, CALL_CTX()); - MutableConfigPart(config, "fq-file", TCfg::TFederatedQueryConfigFieldTag{}, CALL_CTX()); - MutableConfigPart(config, nullptr, TCfg::TTracingConfigFieldTag{}, CALL_CTX()); - MutableConfigPart(config, nullptr, TCfg::TFailureInjectionConfigFieldTag{}, CALL_CTX()); - - if (!AppConfig.HasAllocatorConfig()) { - AppConfig.MutableAllocatorConfig()->CopyFrom(*DummyAllocatorConfig()); - TRACE_CONFIG_CHANGE_INPLACE_T(AllocatorConfig, UpdateExplicitly); - } - - // apply certificates, if any - if (!PathToInterconnectCertFile.Empty()) { - AppConfig.MutableInterconnectConfig()->SetPathToCertificateFile(PathToInterconnectCertFile); - TRACE_CONFIG_CHANGE_INPLACE_T(InterconnectConfig, UpdateExplicitly); - } - - if (!PathToInterconnectPrivateKeyFile.Empty()) { - AppConfig.MutableInterconnectConfig()->SetPathToPrivateKeyFile(PathToInterconnectPrivateKeyFile); - TRACE_CONFIG_CHANGE_INPLACE_T(InterconnectConfig, UpdateExplicitly); - } - - if (!PathToInterconnectCaFile.Empty()) { - AppConfig.MutableInterconnectConfig()->SetPathToCaFile(PathToInterconnectCaFile); - TRACE_CONFIG_CHANGE_INPLACE_T(InterconnectConfig, UpdateExplicitly); - } - - if (AppConfig.HasGRpcConfig() && AppConfig.GetGRpcConfig().HasCert()) { - AppConfig.MutableGRpcConfig()->SetPathToCertificateFile(AppConfig.GetGRpcConfig().GetCert()); - TRACE_CONFIG_CHANGE_INPLACE_T(GRpcConfig, UpdateExplicitly); - } - - if (!PathToGrpcCertFile.Empty()) { - AppConfig.MutableGRpcConfig()->SetPathToCertificateFile(PathToGrpcCertFile); - TRACE_CONFIG_CHANGE_INPLACE_T(GRpcConfig, UpdateExplicitly); - } - - if (AppConfig.HasGRpcConfig() && AppConfig.GetGRpcConfig().HasKey()) { - AppConfig.MutableGRpcConfig()->SetPathToPrivateKeyFile(AppConfig.GetGRpcConfig().GetKey()); - TRACE_CONFIG_CHANGE_INPLACE_T(GRpcConfig, UpdateExplicitly); - } - - if (!PathToGrpcPrivateKeyFile.Empty()) { - AppConfig.MutableGRpcConfig()->SetPathToPrivateKeyFile(PathToGrpcPrivateKeyFile); - TRACE_CONFIG_CHANGE_INPLACE_T(GRpcConfig, UpdateExplicitly); - } - - if (AppConfig.HasGRpcConfig() && AppConfig.GetGRpcConfig().HasCA()) { - AppConfig.MutableGRpcConfig()->SetPathToCaFile(AppConfig.GetGRpcConfig().GetCA()); - TRACE_CONFIG_CHANGE_INPLACE_T(GRpcConfig, UpdateExplicitly); - } - - if (!PathToGrpcCaFile.Empty()) { - AppConfig.MutableGRpcConfig()->SetPathToCaFile(PathToGrpcCaFile); - TRACE_CONFIG_CHANGE_INPLACE_T(GRpcConfig, UpdateExplicitly); - } - - if (!AppConfig.HasDomainsConfig()) - ythrow yexception() << "DomainsConfig is not provided"; - if (!AppConfig.HasChannelProfileConfig()) - ythrow yexception() << "ChannelProfileConfig is not provided"; - - if (!config.ParseResult->Has("tenant") && RunConfig.ScopeId.IsEmpty()) { - const TString myDomain = DeduceNodeDomain(); - for (const auto& domain : AppConfig.GetDomainsConfig().GetDomain()) { - if (domain.GetName() == myDomain) { - RunConfig.ScopeId = TKikimrScopeId(0, domain.GetDomainId()); - break; - } - } - } - if (NodeId) - RunConfig.NodeId = NodeId; - - if (NodeKind == NODE_KIND_YQ && InterconnectPort) { - auto& fqConfig = *AppConfig.MutableFederatedQueryConfig(); - auto& nmConfig = *fqConfig.MutableNodesManager(); - nmConfig.SetPort(InterconnectPort); - nmConfig.SetHost(HostName()); - } - - if (config.ParseResult->Has("suppress-version-check")) { - if (AppConfig.HasNameserviceConfig()) { - AppConfig.MutableNameserviceConfig()->SetSuppressVersionCheck(true); - TRACE_CONFIG_CHANGE_INPLACE_T(NameserviceConfig, UpdateExplicitly); - } else { - ythrow yexception() << "--suppress-version-check option is provided without static nameservice config"; - } - } - - // apply options affecting UDF paths - if (!AppConfig.HasUDFsDir()) - AppConfig.SetUDFsDir(UDFsDir); - if (!AppConfig.UDFsPathsSize()) { - for (const auto& path : UDFsPaths) { - AppConfig.AddUDFsPaths(path); - } - } - - if (!AppConfig.HasMonitoringConfig()) { - AppConfig.MutableMonitoringConfig()->SetMonitoringThreads(MonitoringThreads); - TRACE_CONFIG_CHANGE_INPLACE_T(MonitoringConfig, UpdateExplicitly); - } - if (!AppConfig.HasRestartsCountConfig() && RestartsCountFile) { - AppConfig.MutableRestartsCountConfig()->SetRestartsCountFile(RestartsCountFile); - TRACE_CONFIG_CHANGE_INPLACE_T(RestartsCountConfig, UpdateExplicitly); - } - - // Ports and node type are always applied (even if config was loaded from CMS). - if (MonitoringPort) { - AppConfig.MutableMonitoringConfig()->SetMonitoringPort(MonitoringPort); - TRACE_CONFIG_CHANGE_INPLACE_T(MonitoringConfig, UpdateExplicitly); - } - if (MonitoringAddress) { - AppConfig.MutableMonitoringConfig()->SetMonitoringAddress(MonitoringAddress); - TRACE_CONFIG_CHANGE_INPLACE_T(MonitoringConfig, UpdateExplicitly); - } - if (MonitoringCertificateFile) { - TString sslCertificate = TUnbufferedFileInput(MonitoringCertificateFile).ReadAll(); - if (!sslCertificate.empty()) { - AppConfig.MutableMonitoringConfig()->SetMonitoringCertificate(sslCertificate); - TRACE_CONFIG_CHANGE_INPLACE_T(MonitoringConfig, UpdateExplicitly); - } else { - ythrow yexception() << "invalid ssl certificate file"; - } - } - if (SqsHttpPort) { - AppConfig.MutableSqsConfig()->MutableHttpServerConfig()->SetPort(SqsHttpPort); - TRACE_CONFIG_CHANGE_INPLACE_T(SqsConfig, UpdateExplicitly); - } - if (GRpcPort) { - auto& conf = *AppConfig.MutableGRpcConfig(); - conf.SetStartGRpcProxy(true); - conf.SetPort(GRpcPort); - TRACE_CONFIG_CHANGE_INPLACE_T(GRpcConfig, UpdateExplicitly); - } - if (GRpcsPort) { - auto& conf = *AppConfig.MutableGRpcConfig(); - conf.SetStartGRpcProxy(true); - conf.SetSslPort(GRpcsPort); - TRACE_CONFIG_CHANGE_INPLACE_T(GRpcConfig, UpdateExplicitly); - } - if (GRpcPublicHost) { - auto& conf = *AppConfig.MutableGRpcConfig(); - conf.SetPublicHost(GRpcPublicHost); - for (auto& ext : *conf.MutableExtEndpoints()) { - if (!ext.HasPublicHost()) { - ext.SetPublicHost(GRpcPublicHost); - } - } - TRACE_CONFIG_CHANGE_INPLACE_T(GRpcConfig, UpdateExplicitly); - } - if (GRpcPublicPort) { - auto& conf = *AppConfig.MutableGRpcConfig(); - conf.SetPublicPort(GRpcPublicPort); - for (auto& ext : *conf.MutableExtEndpoints()) { - if (!ext.HasPublicPort()) { - ext.SetPublicPort(GRpcPublicPort); - } - } - TRACE_CONFIG_CHANGE_INPLACE_T(GRpcConfig, UpdateExplicitly); - } - if (GRpcsPublicPort) { - auto& conf = *AppConfig.MutableGRpcConfig(); - conf.SetPublicSslPort(GRpcsPublicPort); - for (auto& ext : *conf.MutableExtEndpoints()) { - if (!ext.HasPublicSslPort()) { - ext.SetPublicSslPort(GRpcsPublicPort); - } - } - TRACE_CONFIG_CHANGE_INPLACE_T(GRpcConfig, UpdateExplicitly); - } - for (const auto& addr : GRpcPublicAddressesV4) { - AppConfig.MutableGRpcConfig()->AddPublicAddressesV4(addr); - } - if (GRpcPublicAddressesV4.size()) { - TRACE_CONFIG_CHANGE_INPLACE_T(GRpcConfig, UpdateExplicitly); - } - for (const auto& addr : GRpcPublicAddressesV6) { - AppConfig.MutableGRpcConfig()->AddPublicAddressesV6(addr); - } - if (GRpcPublicAddressesV6.size()) { - TRACE_CONFIG_CHANGE_INPLACE_T(GRpcConfig, UpdateExplicitly); - } - if (GRpcPublicTargetNameOverride) { - AppConfig.MutableGRpcConfig()->SetPublicTargetNameOverride(GRpcPublicTargetNameOverride); - TRACE_CONFIG_CHANGE_INPLACE_T(GRpcConfig, UpdateExplicitly); - } - if (config.ParseResult->Has("node-type")) { - AppConfig.MutableTenantPoolConfig()->SetNodeType(NodeType); - TRACE_CONFIG_CHANGE_INPLACE_T(TenantPoolConfig, UpdateExplicitly); - } - - if (config.ParseResult->Has("tenant") && InterconnectPort != DefaultInterconnectPort) { - AppConfig.MutableMonitoringConfig()->SetHostLabelOverride(HostAndICPort()); - TRACE_CONFIG_CHANGE_INPLACE_T(MonitoringConfig, UpdateExplicitly); - } - - if (config.ParseResult->Has("data-center")) { - AppConfig.MutableMonitoringConfig()->SetDataCenter(to_lower(DataCenter)); - TRACE_CONFIG_CHANGE_INPLACE_T(MonitoringConfig, UpdateExplicitly); - } - - if (config.ParseResult->Has("tenant")) { - auto &slot = *AppConfig.MutableTenantPoolConfig()->AddSlots(); - slot.SetId("static-slot"); - slot.SetTenantName(TenantName); - slot.SetIsDynamic(false); - RunConfig.TenantName = TenantName; - TRACE_CONFIG_CHANGE_INPLACE_T(TenantPoolConfig, UpdateExplicitly); - } else { - auto &slot = *AppConfig.MutableTenantPoolConfig()->AddSlots(); - slot.SetId("static-slot"); - slot.SetTenantName(CanonizePath(DeduceNodeDomain())); - slot.SetIsDynamic(false); - RunConfig.TenantName = CanonizePath(DeduceNodeDomain()); - TRACE_CONFIG_CHANGE_INPLACE_T(TenantPoolConfig, UpdateExplicitly); - } - - if (config.ParseResult->Has("data-center")) { - if (AppConfig.HasFederatedQueryConfig()) { - AppConfig.MutableFederatedQueryConfig()->MutableNodesManager()->SetDataCenter(to_lower(DataCenter)); - TRACE_CONFIG_CHANGE_INPLACE_T(FederatedQueryConfig, UpdateExplicitly); - } - } - - // MessageBus options. - - if (!AppConfig.HasMessageBusConfig()) { - auto messageBusConfig = AppConfig.MutableMessageBusConfig(); - messageBusConfig->SetStartBusProxy(config.ParseResult->Has(config.Opts->FindLongOption("mbus"))); - messageBusConfig->SetBusProxyPort(BusProxyPort); - - if (!messageBusConfig->GetStartBusProxy()) { - for (const auto &option : config.Opts->Opts_) { - for (const TString &longName : option->GetLongNames()) { - if (longName.StartsWith("mbus-") && config.ParseResult->Has(option.Get())) { - ythrow yexception() << "option --" << longName << " is useless without --mbus option"; - } - } - } - } - - auto queueConfig = messageBusConfig->MutableProxyBusQueueConfig(); - queueConfig->SetName(ProxyBusQueueConfig.Name); - queueConfig->SetNumWorkers(ProxyBusQueueConfig.NumWorkers); - - auto sessionConfig = messageBusConfig->MutableProxyBusSessionConfig(); - - // TODO use macro from messagebus header file - sessionConfig->SetName(ProxyBusSessionConfig.Name); - sessionConfig->SetNumRetries(ProxyBusSessionConfig.NumRetries); - sessionConfig->SetRetryInterval(ProxyBusSessionConfig.RetryInterval); - sessionConfig->SetReconnectWhenIdle(ProxyBusSessionConfig.ReconnectWhenIdle); - sessionConfig->SetMaxInFlight(ProxyBusSessionConfig.MaxInFlight); - sessionConfig->SetPerConnectionMaxInFlight(ProxyBusSessionConfig.PerConnectionMaxInFlight); - sessionConfig->SetPerConnectionMaxInFlightBySize(ProxyBusSessionConfig.PerConnectionMaxInFlightBySize); - sessionConfig->SetMaxInFlightBySize(ProxyBusSessionConfig.MaxInFlightBySize); - sessionConfig->SetTotalTimeout(ProxyBusSessionConfig.TotalTimeout); - sessionConfig->SetSendTimeout(ProxyBusSessionConfig.SendTimeout); - sessionConfig->SetConnectTimeout(ProxyBusSessionConfig.ConnectTimeout); - sessionConfig->SetDefaultBufferSize(ProxyBusSessionConfig.DefaultBufferSize); - sessionConfig->SetMaxBufferSize(ProxyBusSessionConfig.MaxBufferSize); - sessionConfig->SetSocketRecvBufferSize(ProxyBusSessionConfig.SocketRecvBufferSize); - sessionConfig->SetSocketSendBufferSize(ProxyBusSessionConfig.SocketSendBufferSize); - sessionConfig->SetSocketToS(ProxyBusSessionConfig.SocketToS); - sessionConfig->SetSendThreshold(ProxyBusSessionConfig.SendThreshold); - sessionConfig->SetCork(ProxyBusSessionConfig.Cork.MilliSeconds()); - sessionConfig->SetMaxMessageSize(ProxyBusSessionConfig.MaxMessageSize); - sessionConfig->SetTcpNoDelay(ProxyBusSessionConfig.TcpNoDelay); - sessionConfig->SetTcpCork(ProxyBusSessionConfig.TcpCork); - sessionConfig->SetExecuteOnMessageInWorkerPool(ProxyBusSessionConfig.ExecuteOnMessageInWorkerPool); - sessionConfig->SetExecuteOnReplyInWorkerPool(ProxyBusSessionConfig.ExecuteOnReplyInWorkerPool); - sessionConfig->SetListenPort(ProxyBusSessionConfig.ListenPort); - - for (auto proxy : ProxyBindToProxy) { - messageBusConfig->AddProxyBindToProxy(proxy); - } - messageBusConfig->SetStartTracingBusProxy(!!TracePath); - messageBusConfig->SetTracePath(TracePath); - - TRACE_CONFIG_CHANGE_INPLACE_T(MessageBusConfig, UpdateExplicitly); - } - - if (AppConfig.HasDynamicNameserviceConfig()) { - bool isDynamic = RunConfig.NodeId > AppConfig.GetDynamicNameserviceConfig().GetMaxStaticNodeId(); - RunConfig.Labels["dynamic"] = ToString(isDynamic ? "true" : "false"); - AddLabelToAppConfig("node_id", RunConfig.Labels["node_id"]); - } - - RunConfig.ClusterName = AppConfig.GetNameserviceConfig().GetClusterUUID(); - } - - inline void LoadYamlConfig(TCallContext callCtx) { - if (!YamlConfigFile) { - return; - } - auto yamlConfig = TFileInput(YamlConfigFile); - NKikimrConfig::TAppConfig parsedConfig = NKikimr::NYaml::Parse(yamlConfig.ReadAll()); - const google::protobuf::Descriptor* descriptor = AppConfig.GetDescriptor(); - const google::protobuf::Reflection* reflection = AppConfig.GetReflection(); - for(int fieldIdx = 0; fieldIdx < descriptor->field_count(); ++fieldIdx) { - const google::protobuf::FieldDescriptor* fieldDescriptor = descriptor->field(fieldIdx); - if (!fieldDescriptor) - continue; - - if (fieldDescriptor->is_repeated()) { - continue; - } - - if (reflection->HasField(AppConfig, fieldDescriptor)) { - // field is already set in app config - continue; - } - - if (reflection->HasField(parsedConfig, fieldDescriptor)) { - reflection->SwapFields(&AppConfig, &parsedConfig, {fieldDescriptor}); - TRACE_CONFIG_CHANGE(callCtx, fieldIdx, ReplaceConfigWithConsoleProto); - } - } - } - - inline bool LoadBootstrapConfig(TConfig& config) { - bool res = false; - for (const TString& path : config.ParseResult->GetFreeArgs()) { - NKikimrConfig::TAppConfig parsedConfig; - const bool result = ParsePBFromFile(path, &parsedConfig); - Y_ABORT_UNLESS(result); - BaseConfig.MergeFrom(parsedConfig); - res = true; - } - - return res; - } - - TString DeduceNodeDomain() { - if (NodeDomain) - return NodeDomain; - if (AppConfig.GetDomainsConfig().DomainSize() == 1) - return AppConfig.GetDomainsConfig().GetDomain(0).GetName(); - if (AppConfig.GetTenantPoolConfig().SlotsSize() == 1) { - auto &slot = AppConfig.GetTenantPoolConfig().GetSlots(0); - if (slot.GetDomainName()) - return slot.GetDomainName(); - auto &tenantName = slot.GetTenantName(); - if (IsStartWithSlash(tenantName)) - return ToString(ExtractDomain(tenantName)); - } - return ""; - } - - TNodeLocation CreateNodeLocation() { - NActorsInterconnect::TNodeLocation location; - location.SetDataCenter(DataCenter); - location.SetRack(Rack); - location.SetUnit(ToString(Body)); - TNodeLocation loc(location); - - NActorsInterconnect::TNodeLocation legacy; - legacy.SetDataCenterNum(DataCenterFromString(DataCenter)); - legacy.SetRoomNum(0); - legacy.SetRackNum(RackFromString(Rack)); - legacy.SetBodyNum(Body); - loc.InheritLegacyValue(TNodeLocation(legacy)); - return loc; - } - - NYdb::NDiscovery::TNodeRegistrationSettings GetNodeRegistrationSettings(const TString &domainName, - const TString &nodeHost, - const TString &nodeAddress, - const TString &nodeResolveHost, - const TMaybe& path) { - NYdb::NDiscovery::TNodeRegistrationSettings settings; - settings.Host(nodeHost); - settings.Port(InterconnectPort); - settings.ResolveHost(nodeResolveHost); - settings.Address(nodeAddress); - settings.DomainPath(domainName); - settings.FixedNodeId(FixedNodeID); - if (path) { - settings.Path(*path); - } - - auto loc = CreateNodeLocation(); - NActorsInterconnect::TNodeLocation tmpLocation; - loc.Serialize(&tmpLocation, false); - - NYdb::NDiscovery::TNodeLocation settingLocation; - CopyNodeLocation(&settingLocation, tmpLocation); - settings.Location(settingLocation); - return settings; - } - - NYdb::NDiscovery::TNodeRegistrationResult TryToRegisterDynamicNodeViaDiscoveryService( - const TString &addr, - const TString &domainName, - const TString &nodeHost, - const TString &nodeAddress, - const TString &nodeResolveHost, - const TMaybe& path) { - TCommandConfig::TServerEndpoint endpoint = TCommandConfig::ParseServerAddress(addr); - NYdb::TDriverConfig config; - if (endpoint.EnableSsl.Defined()) { - if (PathToGrpcCaFile) { - config.UseSecureConnection(ReadFromFile(PathToGrpcCaFile, "CA certificates").c_str()); - } - if (PathToGrpcCertFile && PathToGrpcPrivateKeyFile) { - auto certificate = ReadFromFile(PathToGrpcCertFile, "Client certificates"); - auto privateKey = ReadFromFile(PathToGrpcPrivateKeyFile, "Client certificates key"); - config.UseClientCertificate(certificate.c_str(), privateKey.c_str()); - } - } - config.SetAuthToken(BUILTIN_ACL_ROOT); - config.SetEndpoint(endpoint.Address); - auto connection = NYdb::TDriver(config); - - auto client = NYdb::NDiscovery::TDiscoveryClient(connection); - NYdb::NDiscovery::TNodeRegistrationResult result = client.NodeRegistration(GetNodeRegistrationSettings(domainName, nodeHost, nodeAddress, nodeResolveHost, path)).GetValueSync(); - connection.Stop(true); - return result; - } - - THolder TryToRegisterDynamicNodeViaLegacyService( - const TString &addr, - const TString &domainName, - const TString &nodeHost, - const TString &nodeAddress, - const TString &nodeResolveHost, - const TMaybe& path) { - NClient::TKikimr kikimr(GetKikimr(addr)); - auto registrant = kikimr.GetNodeRegistrant(); - - auto loc = CreateNodeLocation(); - - return MakeHolder - (registrant.SyncRegisterNode(ToString(domainName), - nodeHost, - InterconnectPort, - nodeAddress, - nodeResolveHost, - std::move(loc), - FixedNodeID, - path)); - } - - void FillClusterEndpoints(TVector &addrs) { - if (!NodeBrokerAddresses.empty()) { - for (auto addr: NodeBrokerAddresses) { - addrs.push_back(addr); - } - } else { - Y_ABORT_UNLESS(NodeBrokerPort); - for (auto &node : AppConfig.MutableNameserviceConfig()->GetNode()) { - addrs.emplace_back(TStringBuilder() << (NodeBrokerUseTls ? "grpcs://" : "") << node.GetHost() << ':' << NodeBrokerPort); - } - } - ShuffleRange(addrs); - } - - TString HostAndICPort() { - try { - auto hostname = to_lower(HostName()); - hostname = hostname.substr(0, hostname.find('.')); - return TStringBuilder() << hostname << ":" << InterconnectPort; - } catch (TSystemError& error) { - return ""; - } - } - - TMaybe GetSchemePath() { - if (TenantName.StartsWith('/')) { - return TenantName; // TODO(alexvru): fix it - } - return {}; - } - - NYdb::NDiscovery::TNodeRegistrationResult RegisterDynamicNodeViaDiscoveryService(const TVector& addrs, const TString& domainName) { - NYdb::NDiscovery::TNodeRegistrationResult result; - const size_t maxNumberRecivedCallUnimplemented = 5; - size_t currentNumberRecivedCallUnimplemented = 0; - while (!result.IsSuccess() && currentNumberRecivedCallUnimplemented < maxNumberRecivedCallUnimplemented) { - for (const auto& addr : addrs) { - result = TryToRegisterDynamicNodeViaDiscoveryService(addr, domainName, NodeHost, NodeAddress, NodeResolveHost, GetSchemePath()); - if (result.IsSuccess()) { - Cout << "Success. Registered via discovery service as " << result.GetNodeId() << Endl; - break; - } - Cerr << "Registration error: " << static_cast(result) << Endl; - } - if (!result.IsSuccess()) { - Sleep(TDuration::Seconds(1)); - if (result.GetStatus() == NYdb::EStatus::CLIENT_CALL_UNIMPLEMENTED) { - currentNumberRecivedCallUnimplemented++; - } - } - } - return result; - } - - void ProcessRegistrationDynamicNodeResult(const NYdb::NDiscovery::TNodeRegistrationResult& result) { - RunConfig.NodeId = result.GetNodeId(); - NActors::TScopeId scopeId; - if (result.HasScopeTabletId() && result.HasScopePathId()) { - scopeId.first = result.GetScopeTabletId(); - scopeId.second = result.GetScopePathId(); - } - RunConfig.ScopeId = TKikimrScopeId(scopeId); - - auto &nsConfig = *AppConfig.MutableNameserviceConfig(); - nsConfig.ClearNode(); - - auto &dnConfig = *AppConfig.MutableDynamicNodeConfig(); - for (auto &node : result.GetNodes()) { - if (node.NodeId == result.GetNodeId()) { - auto nodeInfo = dnConfig.MutableNodeInfo(); - nodeInfo->SetNodeId(node.NodeId); - nodeInfo->SetHost(node.Host); - nodeInfo->SetPort(node.Port); - nodeInfo->SetResolveHost(node.ResolveHost); - nodeInfo->SetAddress(node.Address); - nodeInfo->SetExpire(node.Expire); - CopyNodeLocation(nodeInfo->MutableLocation(), node.Location); - } else { - auto &info = *nsConfig.AddNode(); - info.SetNodeId(node.NodeId); - info.SetAddress(node.Address); - info.SetPort(node.Port); - info.SetHost(node.Host); - info.SetInterconnectHost(node.ResolveHost); - CopyNodeLocation(info.MutableLocation(), node.Location); - } - } - } - - static void CopyNodeLocation(NActorsInterconnect::TNodeLocation* dst, const NYdb::NDiscovery::TNodeLocation& src) { - if (src.DataCenterNum) { - dst->SetDataCenterNum(src.DataCenterNum.value()); - } - if (src.RoomNum) { - dst->SetRoomNum(src.RoomNum.value()); - } - if (src.RackNum) { - dst->SetRackNum(src.RackNum.value()); - } - if (src.BodyNum) { - dst->SetBodyNum(src.BodyNum.value()); - } - if (src.Body) { - dst->SetBody(src.Body.value()); - } - if (src.DataCenter) { - dst->SetDataCenter(src.DataCenter.value()); - } - if (src.Module) { - dst->SetModule(src.Module.value()); - } - if (src.Rack) { - dst->SetRack(src.Rack.value()); - } - if (src.Unit) { - dst->SetUnit(src.Unit.value()); - } - } - - static void CopyNodeLocation(NYdb::NDiscovery::TNodeLocation* dst, const NActorsInterconnect::TNodeLocation& src) { - if (src.HasDataCenterNum()) { - dst->DataCenterNum = src.GetDataCenterNum(); - } - if (src.HasRoomNum()) { - dst->RoomNum = src.GetRoomNum(); - } - if (src.HasRackNum()) { - dst->RackNum = src.GetRackNum(); - } - if (src.HasBodyNum()) { - dst->BodyNum = src.GetBodyNum(); - } - if (src.HasBody()) { - dst->Body = src.GetBody(); - } - if (src.HasDataCenter()) { - dst->DataCenter = src.GetDataCenter(); - } - if (src.HasModule()) { - dst->Module = src.GetModule(); - } - if (src.HasRack()) { - dst->Rack = src.GetRack(); - } - if (src.HasUnit()) { - dst->Unit = src.GetUnit(); - } - } - - THolder RegisterDynamicNodeViaLegacyService(const TVector& addrs, const TString& domainName) { - THolder result; - while (!result || !result->IsSuccess()) { - for (const auto& addr : addrs) { - result = TryToRegisterDynamicNodeViaLegacyService(addr, domainName, NodeHost, NodeAddress, NodeResolveHost, GetSchemePath()); - if (result->IsSuccess()) { - Cout << "Success. Registered via legacy service as " << result->GetNodeId() << Endl; - break; - } - Cerr << "Registration error: " << result->GetErrorMessage() << Endl; - } - if (!result || !result->IsSuccess()) - Sleep(TDuration::Seconds(1)); - } - Y_ABORT_UNLESS(result); - - if (!result->IsSuccess()) - ythrow yexception() << "Cannot register dynamic node: " << result->GetErrorMessage(); - - return result; - } - - void ProcessRegistrationDynamicNodeResult(const THolder& result) { - RunConfig.NodeId = result->GetNodeId(); - RunConfig.ScopeId = TKikimrScopeId(result->GetScopeId()); - - auto &nsConfig = *AppConfig.MutableNameserviceConfig(); - nsConfig.ClearNode(); - - auto &dnConfig = *AppConfig.MutableDynamicNodeConfig(); - for (auto &node : result->Record().GetNodes()) { - if (node.GetNodeId() == result->GetNodeId()) { - dnConfig.MutableNodeInfo()->CopyFrom(node); - } else { - auto &info = *nsConfig.AddNode(); - info.SetNodeId(node.GetNodeId()); - info.SetAddress(node.GetAddress()); - info.SetPort(node.GetPort()); - info.SetHost(node.GetHost()); - info.SetInterconnectHost(node.GetResolveHost()); - info.MutableLocation()->CopyFrom(node.GetLocation()); - } - } - } - - void RegisterDynamicNode() { - TVector addrs; - - FillClusterEndpoints(addrs); - - if (!InterconnectPort) - ythrow yexception() << "Either --node or --ic-port should be specified"; - - if (addrs.empty()) { - ythrow yexception() << "List of Node Broker end-points is empty"; - } - - TString domainName = DeduceNodeDomain(); - if (!NodeHost) - NodeHost = FQDNHostName(); - if (!NodeResolveHost) - NodeResolveHost = NodeHost; - - NYdb::NDiscovery::TNodeRegistrationResult result = RegisterDynamicNodeViaDiscoveryService(addrs, domainName); - if (result.IsSuccess()) { - ProcessRegistrationDynamicNodeResult(result); - } else { - THolder result = RegisterDynamicNodeViaLegacyService(addrs, domainName); - ProcessRegistrationDynamicNodeResult(result); - } - } - - void ApplyConfigForNode(NKikimrConfig::TAppConfig &appConfig) { - AppConfig.Swap(&appConfig); - // Dynamic node config is defined by options and Node Broker response. - AppConfig.MutableDynamicNodeConfig()->Swap(appConfig.MutableDynamicNodeConfig()); - // By now naming config should be loaded and probably replaced with - // info from registration response. Don't lose it in case CMS has no - // config for naming service. - if (!AppConfig.HasNameserviceConfig()) { - AppConfig.MutableNameserviceConfig()->Swap(appConfig.MutableNameserviceConfig()); - RunConfig.ConfigInitInfo[NKikimrConsole::TConfigItem::NameserviceConfigItem].Updates.pop_back(); - } - } - - bool TryToLoadConfigForDynamicNodeFromCMS(const TString &addr, TString &error) { - NClient::TKikimr kikimr(GetKikimr(addr)); - auto configurator = kikimr.GetNodeConfigurator(); - - Cout << "Trying to get configs from " << addr << Endl; - - auto result = configurator.SyncGetNodeConfig(RunConfig.NodeId, - FQDNHostName(), - TenantName, - NodeType, - DeduceNodeDomain(), - AppConfig.GetAuthConfig().GetStaffApiUserToken(), - true, - 1); - - if (!result.IsSuccess()) { - error = result.GetErrorMessage(); - Cerr << "Configuration error: " << error << Endl; - return false; - } - - Cout << "Success." << Endl; - - NKikimrConfig::TAppConfig appConfig; - - NKikimrConfig::TAppConfig yamlConfig; - - if (result.HasYamlConfig() && !result.GetYamlConfig().empty()) { - NYamlConfig::ResolveAndParseYamlConfig( - result.GetYamlConfig(), - result.GetVolatileYamlConfigs(), - RunConfig.Labels, - yamlConfig); - } - - RunConfig.InitialCmsConfig.CopyFrom(result.GetConfig()); - - RunConfig.InitialCmsYamlConfig.CopyFrom(yamlConfig); - NYamlConfig::ReplaceUnmanagedKinds(result.GetConfig(), RunConfig.InitialCmsYamlConfig); - - if (yamlConfig.HasYamlConfigEnabled() && yamlConfig.GetYamlConfigEnabled()) { - appConfig = yamlConfig; - NYamlConfig::ReplaceUnmanagedKinds(result.GetConfig(), appConfig); - - for (ui32 kind = NKikimrConsole::TConfigItem::EKind_MIN; kind <= NKikimrConsole::TConfigItem::EKind_MAX; kind++) { - if (kind == NKikimrConsole::TConfigItem::Auto || !NKikimrConsole::TConfigItem::EKind_IsValid(kind)) { - continue; - } - if ((kind == NKikimrConsole::TConfigItem::NameserviceConfigItem && appConfig.HasNameserviceConfig()) - || (kind == NKikimrConsole::TConfigItem::NetClassifierDistributableConfigItem && appConfig.HasNetClassifierDistributableConfig()) - || (kind == NKikimrConsole::TConfigItem::NamedConfigsItem && appConfig.NamedConfigsSize())) { - TRACE_CONFIG_CHANGE_INPLACE(kind, ReplaceConfigWithConsoleProto); - } else { - TRACE_CONFIG_CHANGE_INPLACE(kind, ReplaceConfigWithConsoleYaml); - } - } - } else { - appConfig = result.GetConfig(); - for (ui32 kind = NKikimrConsole::TConfigItem::EKind_MIN; kind <= NKikimrConsole::TConfigItem::EKind_MAX; kind++) { - if (kind == NKikimrConsole::TConfigItem::Auto || !NKikimrConsole::TConfigItem::EKind_IsValid(kind)) { - continue; - } - TRACE_CONFIG_CHANGE_INPLACE(kind, ReplaceConfigWithConsoleProto); - } - } - - ApplyConfigForNode(appConfig); - - return true; - } - - void LoadConfigForDynamicNode() { - auto res = false; - TString error; - TVector addrs; - - FillClusterEndpoints(addrs); - - SetRandomSeed(TInstant::Now().MicroSeconds()); - int minAttempts = 10; - int attempts = 0; - while (!res && attempts < minAttempts) { - for (auto addr : addrs) { - res = TryToLoadConfigForDynamicNodeFromCMS(addr, error); - ++attempts; - if (res) - break; - } - // Randomized backoff - if (!res) - Sleep(TDuration::MilliSeconds(500 + RandomNumber(1000))); - } - - if (!res) { - Cerr << "WARNING: couldn't load config from CMS: " << error << Endl; - } - } - -private: - NClient::TKikimr GetKikimr(const TString& addr) { - TCommandConfig::TServerEndpoint endpoint = TCommandConfig::ParseServerAddress(addr); - NYdbGrpc::TGRpcClientConfig grpcConfig(endpoint.Address, TDuration::Seconds(5)); - grpcConfig.LoadBalancingPolicy = "round_robin"; - if (endpoint.EnableSsl.Defined()) { - grpcConfig.EnableSsl = endpoint.EnableSsl.GetRef(); - auto& sslCredentials = grpcConfig.SslCredentials; - if (PathToGrpcCaFile) { - sslCredentials.pem_root_certs = ReadFromFile(PathToGrpcCaFile, "CA certificates"); - } - if (PathToGrpcCertFile && PathToGrpcPrivateKeyFile) { - sslCredentials.pem_cert_chain = ReadFromFile(PathToGrpcCertFile, "Client certificates"); - sslCredentials.pem_private_key = ReadFromFile(PathToGrpcPrivateKeyFile, "Client certificates key"); - } - } - return NClient::TKikimr(grpcConfig); - } -}; - -class TClientCommandServer : public TClientCommandServerBase { -public: - TClientCommandServer(std::shared_ptr factories) - : TClientCommandServerBase("server", "Execute YDB server") - , Factories(std::move(factories)) - {} - - virtual int Run(TConfig &/*config*/) override { - Y_ABORT_UNLESS(RunConfig.NodeId); - return MainRun(RunConfig, Factories); + InitCfg.ValidateOptions(*config.Opts, *config.ParseResult); + InitCfg.Parse(config.ParseResult->GetFreeArgs()); } - -private: - std::shared_ptr Factories; }; void AddClientCommandServer(TClientCommandTree& parent, std::shared_ptr factories) { parent.AddCommand(std::make_unique(factories)); } -} -} +} // namespace NKikimr::NDriverClient diff --git a/ydb/core/driver_lib/cli_utils/ya.make b/ydb/core/driver_lib/cli_utils/ya.make index 79d7248a3370..c75f2845d97c 100644 --- a/ydb/core/driver_lib/cli_utils/ya.make +++ b/ydb/core/driver_lib/cli_utils/ya.make @@ -35,7 +35,6 @@ SRCS( PEERDIR( library/cpp/deprecated/enum_codegen - ydb/library/grpc/client library/cpp/protobuf/json library/cpp/yson ydb/core/actorlib_impl @@ -43,6 +42,7 @@ PEERDIR( ydb/core/blobstorage/pdisk ydb/core/client/minikql_compile ydb/core/client/scheme_cache_lib + ydb/core/config/init ydb/core/driver_lib/cli_base ydb/core/engine ydb/core/erasure @@ -51,6 +51,7 @@ PEERDIR( ydb/core/scheme ydb/library/aclib ydb/library/folder_service/proto + ydb/library/grpc/client ydb/library/yaml_config ydb/public/api/grpc ydb/public/api/grpc/draft diff --git a/ydb/public/lib/ydb_cli/common/common.cpp b/ydb/public/lib/ydb_cli/common/common.cpp index 1e6153dcb6fb..3e2764697918 100644 --- a/ydb/public/lib/ydb_cli/common/common.cpp +++ b/ydb/public/lib/ydb_cli/common/common.cpp @@ -81,6 +81,11 @@ bool ReadFromFileIfExists(TString& filePath, const TString& fileName, TString& o return false; } +bool ReadFromFileIfExists(const TString& filePath, const TString& fileName, TString& output, bool allowEmpty) { + TString fpCopy = filePath; + return ReadFromFileIfExists(fpCopy, fileName, output, allowEmpty); +} + TString ReadFromFile(TString& filePath, const TString& fileName, bool allowEmpty) { TString content; if (ReadFromFileIfExists(filePath, fileName, content, allowEmpty)) { @@ -90,6 +95,11 @@ TString ReadFromFile(TString& filePath, const TString& fileName, bool allowEmpty } } +TString ReadFromFile(const TString& filePath, const TString& fileName, bool allowEmpty) { + TString fpCopy = filePath; + return ReadFromFile(fpCopy, fileName, allowEmpty); +} + TString InputPassword() { // Disable echoing characters and enable per-symbol input handling #if defined(_unix_) diff --git a/ydb/public/lib/ydb_cli/common/common.h b/ydb/public/lib/ydb_cli/common/common.h index 0afe4e4a84e6..6fc1e3b23715 100644 --- a/ydb/public/lib/ydb_cli/common/common.h +++ b/ydb/public/lib/ydb_cli/common/common.h @@ -38,7 +38,9 @@ class TProfileConfig { }; bool ReadFromFileIfExists(TString& filePath, const TString& fileName, TString& output, bool allowEmpty = false); +bool ReadFromFileIfExists(const TString& filePath, const TString& fileName, TString& output, bool allowEmpty = false); TString ReadFromFile(TString& filePath, const TString& fileName, bool allowEmpty = false); +TString ReadFromFile(const TString& filePath, const TString& fileName, bool allowEmpty = false); TString InputPassword(); }