diff --git a/ydb/core/fq/libs/config/protos/control_plane_storage.proto b/ydb/core/fq/libs/config/protos/control_plane_storage.proto index fa8ed94fec1f..1c4b668d4ccc 100644 --- a/ydb/core/fq/libs/config/protos/control_plane_storage.proto +++ b/ydb/core/fq/libs/config/protos/control_plane_storage.proto @@ -73,4 +73,5 @@ message TControlPlaneStorageConfig { bool DumpRawStatistics = 32; bool IgnorePrivateSources = 33; Ydb.Query.StatsMode StatsMode = 34; + repeated string AvailableStreamingConnection = 35; } diff --git a/ydb/core/fq/libs/control_plane_storage/config.cpp b/ydb/core/fq/libs/control_plane_storage/config.cpp index 9661bdedf166..2cd4dd6bbe74 100644 --- a/ydb/core/fq/libs/control_plane_storage/config.cpp +++ b/ydb/core/fq/libs/control_plane_storage/config.cpp @@ -40,6 +40,10 @@ TControlPlaneStorageConfig::TControlPlaneStorageConfig(const NConfig::TControlPl AvailableBindings.insert(GetBindingType(availableBinding)); } + for (const auto& availableConnection : Proto.GetAvailableStreamingConnection()) { + AvailableStreamingConnections.insert(GetConnectionType(availableConnection)); + } + GeneratorPathsLimit = s3Config.HasGeneratorPathsLimit() ? s3Config.GetGeneratorPathsLimit() : 50'000; diff --git a/ydb/core/fq/libs/control_plane_storage/config.h b/ydb/core/fq/libs/control_plane_storage/config.h index fc4fcf489364..16c8b9610506 100644 --- a/ydb/core/fq/libs/control_plane_storage/config.h +++ b/ydb/core/fq/libs/control_plane_storage/config.h @@ -30,6 +30,7 @@ struct TControlPlaneStorageConfig { TRetryPolicyItem TaskLeaseRetryPolicy; TDuration QuotaTtl; TDuration MetricsTtl; + TSet AvailableStreamingConnections; TControlPlaneStorageConfig(const NConfig::TControlPlaneStorageConfig& config, const NYql::TS3GatewayConfig& s3Config, const NConfig::TCommonConfig& common, const NConfig::TComputeConfig& computeConfigProto); }; diff --git a/ydb/core/fq/libs/control_plane_storage/util.cpp b/ydb/core/fq/libs/control_plane_storage/util.cpp index c31aa8e20b01..74c5bd866406 100644 --- a/ydb/core/fq/libs/control_plane_storage/util.cpp +++ b/ydb/core/fq/libs/control_plane_storage/util.cpp @@ -161,6 +161,11 @@ NConfig::TControlPlaneStorageConfig FillDefaultParameters(NConfig::TControlPlane config.SetResultSetsTtl("1d"); } + if (config.AvailableStreamingConnectionSize() == 0) { + // For backward compatibility, TODO: YQ-2628, remove after config update on every cluster + config.MutableAvailableStreamingConnection()->CopyFrom(config.GetAvailableConnection()); + } + return config; } diff --git a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp index 19bdf5360134..7ab56126f135 100644 --- a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp +++ b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp @@ -219,7 +219,12 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateQuery *queryInternal.mutable_compute_connection() = computeDatabase.connection(); TSet disabledConnections; for (const auto& connection: GetEntities(resultSets[resultSets.size() - 2], CONNECTION_COLUMN_NAME, Config->Proto.GetIgnorePrivateSources(), commonCounters)) { - if (!Config->AvailableConnections.contains(connection.content().setting().connection_case())) { + auto connectionCase = connection.content().setting().connection_case(); + if (!Config->AvailableConnections.contains(connectionCase)) { + disabledConnections.insert(connection.meta().id()); + continue; + } + if ((queryType == FederatedQuery::QueryContent::STREAMING) && !Config->AvailableStreamingConnections.contains(connectionCase)) { disabledConnections.insert(connection.meta().id()); continue; }