Skip to content

Commit

Permalink
Merge a30b479 into e9a893e
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Jun 17, 2024
2 parents e9a893e + a30b479 commit debea71
Show file tree
Hide file tree
Showing 24 changed files with 68 additions and 124 deletions.
4 changes: 2 additions & 2 deletions ydb/core/driver_lib/run/kikimr_services_initializers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2158,7 +2158,7 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu

auto s3ActorsFactory = NYql::NDq::CreateS3ActorsFactory();
auto proxy = NKqp::CreateKqpProxyService(Config.GetLogConfig(), Config.GetTableServiceConfig(),
Config.GetQueryServiceConfig(), Config.GetMetadataProviderConfig(), std::move(settings), Factories->QueryReplayBackendFactory, std::move(kqpProxySharedResources),
Config.GetQueryServiceConfig(), std::move(settings), Factories->QueryReplayBackendFactory, std::move(kqpProxySharedResources),
federatedQuerySetupFactory, s3ActorsFactory
);
setup->LocalServices.push_back(std::make_pair(
Expand All @@ -2167,7 +2167,7 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu

// Create finalize script service
auto finalize = NKqp::CreateKqpFinalizeScriptService(
Config.GetQueryServiceConfig(), Config.GetMetadataProviderConfig(), federatedQuerySetupFactory, s3ActorsFactory
Config.GetQueryServiceConfig(), federatedQuerySetupFactory, s3ActorsFactory
);
setup->LocalServices.push_back(std::make_pair(
NKqp::MakeKqpFinalizeScriptServiceId(NodeId),
Expand Down
8 changes: 2 additions & 6 deletions ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
TKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings,
const TTableServiceConfig& tableServiceConfig,
const TQueryServiceConfig& queryServiceConfig,
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
const TGUCSettings::TPtr& gUCSettings,
const TMaybe<TString>& applicationName, const TString& uid, const TKqpQueryId& queryId,
Expand All @@ -70,7 +69,6 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
, DbCounters(dbCounters)
, Config(MakeIntrusive<TKikimrConfiguration>())
, QueryServiceConfig(queryServiceConfig)
, MetadataProviderConfig(metadataProviderConfig)
, CompilationTimeout(TDuration::MilliSeconds(tableServiceConfig.GetCompileTimeoutMs()))
, UserRequestContext(userRequestContext)
, CompileActorSpan(TWilsonKqp::CompileActor, std::move(traceId), "CompileActor")
Expand Down Expand Up @@ -269,7 +267,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
counters->TxProxyMon = new NTxProxy::TTxProxyMon(AppData(ctx)->Counters);
std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader =
std::make_shared<TKqpTableMetadataLoader>(
QueryId.Cluster, TlsActivationContext->ActorSystem(), Config, true, TempTablesState, 2 * TDuration::Seconds(MetadataProviderConfig.GetRefreshPeriodSeconds()));
QueryId.Cluster, TlsActivationContext->ActorSystem(), Config, true, TempTablesState);
Gateway = CreateKikimrIcGateway(QueryId.Cluster, QueryId.Settings.QueryType, QueryId.Database, std::move(loader),
ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), counters, QueryServiceConfig);
Gateway->SetToken(QueryId.Cluster, UserToken);
Expand Down Expand Up @@ -556,7 +554,6 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
TKqpDbCountersPtr DbCounters;
TKikimrConfiguration::TPtr Config;
TQueryServiceConfig QueryServiceConfig;
TMetadataProviderConfig MetadataProviderConfig;
TDuration CompilationTimeout;
TInstant StartTime;
TDuration CompileCpuTime;
Expand Down Expand Up @@ -622,7 +619,6 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings,
const TTableServiceConfig& tableServiceConfig,
const TQueryServiceConfig& queryServiceConfig,
const TMetadataProviderConfig& metadataProviderConfig,
TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
const TString& uid, const TKqpQueryId& query, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, TKqpDbCountersPtr dbCounters, const TGUCSettings::TPtr& gUCSettings,
Expand All @@ -631,7 +627,7 @@ IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstP
ECompileActorAction compileAction, TMaybe<TQueryAst> queryAst, bool collectFullDiagnostics,
bool perStatementResult, NYql::TExprContext* splitCtx, NYql::TExprNode::TPtr splitExpr)
{
return new TKqpCompileActor(owner, kqpSettings, tableServiceConfig, queryServiceConfig, metadataProviderConfig,
return new TKqpCompileActor(owner, kqpSettings, tableServiceConfig, queryServiceConfig,
moduleResolverState, counters, gUCSettings, applicationName,
uid, query, userToken, dbCounters,
federatedQuerySetup, userRequestContext,
Expand Down
11 changes: 4 additions & 7 deletions ydb/core/kqp/compile_service/kqp_compile_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -441,14 +441,13 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
}

TKqpCompileService(const TTableServiceConfig& tableServiceConfig, const TQueryServiceConfig& queryServiceConfig,
const TMetadataProviderConfig& metadataProviderConfig, const TKqpSettings::TConstPtr& kqpSettings,
const TKqpSettings::TConstPtr& kqpSettings,
TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory,
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup
)
: TableServiceConfig(tableServiceConfig)
, QueryServiceConfig(queryServiceConfig)
, MetadataProviderConfig(metadataProviderConfig)
, KqpSettings(kqpSettings)
, ModuleResolverState(moduleResolverState)
, Counters(counters)
Expand Down Expand Up @@ -1058,7 +1057,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
}

void StartCompilation(TKqpCompileRequest&& request, const TActorContext& ctx) {
auto compileActor = CreateKqpCompileActor(ctx.SelfID, KqpSettings, TableServiceConfig, QueryServiceConfig, MetadataProviderConfig, ModuleResolverState, Counters,
auto compileActor = CreateKqpCompileActor(ctx.SelfID, KqpSettings, TableServiceConfig, QueryServiceConfig, ModuleResolverState, Counters,
request.Uid, request.Query, request.UserToken, FederatedQuerySetup, request.DbCounters, request.GUCSettings, request.ApplicationName, request.UserRequestContext,
request.CompileServiceSpan.GetTraceId(), request.TempTablesState, request.CompileSettings.Action, std::move(request.QueryAst), CollectDiagnostics,
request.CompileSettings.PerStatementResult, request.SplitCtx, request.SplitExpr);
Expand Down Expand Up @@ -1171,7 +1170,6 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
private:
TTableServiceConfig TableServiceConfig;
TQueryServiceConfig QueryServiceConfig;
TMetadataProviderConfig MetadataProviderConfig;
TKqpSettings::TConstPtr KqpSettings;
TIntrusivePtr<TModuleResolverState> ModuleResolverState;
TIntrusivePtr<TKqpCounters> Counters;
Expand All @@ -1186,13 +1184,12 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
};

IActor* CreateKqpCompileService(const TTableServiceConfig& tableServiceConfig, const TQueryServiceConfig& queryServiceConfig,
const TMetadataProviderConfig& metadataProviderConfig, const TKqpSettings::TConstPtr& kqpSettings,
TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
const TKqpSettings::TConstPtr& kqpSettings, TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory,
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup
)
{
return new TKqpCompileService(tableServiceConfig, queryServiceConfig, metadataProviderConfig, kqpSettings, moduleResolverState, counters,
return new TKqpCompileService(tableServiceConfig, queryServiceConfig, kqpSettings, moduleResolverState, counters,
std::move(queryReplayFactory), federatedQuerySetup);
}

Expand Down
2 changes: 0 additions & 2 deletions ydb/core/kqp/compile_service/kqp_compile_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ enum class ECompileActorAction {

IActor* CreateKqpCompileService(const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
const TKqpSettings::TConstPtr& kqpSettings, TIntrusivePtr<TModuleResolverState> moduleResolverState,
TIntrusivePtr<TKqpCounters> counters, std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory,
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup
Expand All @@ -28,7 +27,6 @@ IActor* CreateKqpCompileComputationPatternService(const NKikimrConfig::TTableSer
IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings,
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
const TString& uid, const TKqpQueryId& query,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,11 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion,
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
const TActorId& creator, TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const TActorId& creator, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const bool enableOlapSink, const bool useEvWrite, ui32 statementResultIndex, const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup,
const TGUCSettings::TPtr& GUCSettings)
: TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, aggregation,
maximalSecretsSnapshotWaitTime, userRequestContext, statementResultIndex, TWilsonKqp::DataExecuter, "DataExecuter", streamResult)
userRequestContext, statementResultIndex, TWilsonKqp::DataExecuter, "DataExecuter", streamResult)
, AsyncIoFactory(std::move(asyncIoFactory))
, EnableOlapSink(enableOlapSink)
, UseEvWrite(useEvWrite)
Expand Down Expand Up @@ -2737,12 +2737,12 @@ IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const
TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator,
TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const bool enableOlapSink, const bool useEvWrite, ui32 statementResultIndex,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings)
{
return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, executerRetriesConfig,
std::move(asyncIoFactory), chanTransportVersion, aggregation, creator, maximalSecretsSnapshotWaitTime, userRequestContext,
std::move(asyncIoFactory), chanTransportVersion, aggregation, creator, userRequestContext,
enableOlapSink, useEvWrite, statementResultIndex, federatedQuerySetup, GUCSettings);
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/executer_actor/kqp_executer.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator,
TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const bool enableOlapSink, const bool useEvWrite, ui32 statementResultIndex,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings);

Expand Down
10 changes: 5 additions & 5 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator,
TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const bool enableOlapSink, const bool useEvWrite, ui32 statementResultIndex,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings)
{
Expand All @@ -92,7 +92,7 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
return CreateKqpDataExecuter(
std::move(request), database, userToken, counters, false,
aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator,
maximalSecretsSnapshotWaitTime, userRequestContext, enableOlapSink, useEvWrite, statementResultIndex,
userRequestContext, enableOlapSink, useEvWrite, statementResultIndex,
federatedQuerySetup, /*GUCSettings*/nullptr
);
}
Expand All @@ -115,22 +115,22 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
return CreateKqpDataExecuter(
std::move(request), database, userToken, counters, false,
aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator,
maximalSecretsSnapshotWaitTime, userRequestContext, enableOlapSink, useEvWrite, statementResultIndex,
userRequestContext, enableOlapSink, useEvWrite, statementResultIndex,
federatedQuerySetup, /*GUCSettings*/nullptr
);

case NKqpProto::TKqpPhyTx::TYPE_SCAN:
return CreateKqpScanExecuter(
std::move(request), database, userToken, counters, aggregation,
executerRetriesConfig, preparedQuery, chanTransportVersion, maximalSecretsSnapshotWaitTime, userRequestContext,
executerRetriesConfig, preparedQuery, chanTransportVersion, userRequestContext,
statementResultIndex
);

case NKqpProto::TKqpPhyTx::TYPE_GENERIC:
return CreateKqpDataExecuter(
std::move(request), database, userToken, counters, true,
aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator,
maximalSecretsSnapshotWaitTime, userRequestContext, enableOlapSink, useEvWrite, statementResultIndex,
userRequestContext, enableOlapSink, useEvWrite, statementResultIndex,
federatedQuerySetup, GUCSettings
);

Expand Down
10 changes: 4 additions & 6 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion,
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
ui32 statementResultIndex, ui64 spanVerbosity = 0, TString spanName = "KqpExecuterBase", bool streamResult = false)
: Request(std::move(request))
, Database(database)
Expand All @@ -135,7 +135,6 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
, ExecuterSpan(spanVerbosity, std::move(Request.TraceId), spanName)
, Planner(nullptr)
, ExecuterRetriesConfig(executerRetriesConfig)
, MaximalSecretsSnapshotWaitTime(maximalSecretsSnapshotWaitTime)
, AggregationSettings(aggregation)
, HasOlapTable(false)
, StreamResult(streamResult)
Expand Down Expand Up @@ -1645,7 +1644,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
}

void GetSecretsSnapshot() {
RegisterDescribeSecretsActor(this->SelfId(), UserToken ? UserToken->GetUserSID() : "", SecretNames, this->ActorContext().ActorSystem(), MaximalSecretsSnapshotWaitTime);
RegisterDescribeSecretsActor(this->SelfId(), UserToken ? UserToken->GetUserSID() : "", SecretNames, this->ActorContext().ActorSystem());
}

void GetResourcesSnapshot() {
Expand Down Expand Up @@ -1971,7 +1970,6 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {

std::vector<TString> SecretNames;
std::map<TString, TString> SecureParams;
TDuration MaximalSecretsSnapshotWaitTime;

const NKikimrConfig::TTableServiceConfig::TAggregationConfig AggregationSettings;
TVector<NKikimrKqp::TKqpNodeResources> ResourcesSnapshot;
Expand All @@ -1998,7 +1996,7 @@ IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator,
TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const bool enableOlapSink, const bool useEvWrite, ui32 statementResultIndex,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings);

Expand All @@ -2007,7 +2005,7 @@ IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
TPreparedQueryHolder::TConstPtr preparedQuery, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion,
TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext, ui32 statementResultIndex);
const TIntrusivePtr<TUserRequestContext>& userRequestContext, ui32 statementResultIndex);

} // namespace NKqp
} // namespace NKikimr
Loading

0 comments on commit debea71

Please sign in to comment.