From 93bd316f6085702f1f98ef48941e079fcc78e04f Mon Sep 17 00:00:00 2001 From: Egor Zudin Date: Tue, 19 Dec 2023 08:27:48 +0000 Subject: [PATCH 1/5] YQ-2628: add connections option for streamin queries --- ydb/core/fq/libs/config/protos/control_plane_storage.proto | 5 +++++ ydb/core/fq/libs/control_plane_storage/config.cpp | 4 ++++ ydb/core/fq/libs/control_plane_storage/config.h | 5 +++++ ydb/core/fq/libs/control_plane_storage/util.cpp | 5 +++++ .../ydb_control_plane_storage_queries.cpp | 3 ++- 5 files changed, 21 insertions(+), 1 deletion(-) diff --git a/ydb/core/fq/libs/config/protos/control_plane_storage.proto b/ydb/core/fq/libs/config/protos/control_plane_storage.proto index fa8ed94fec1f..884a641059c3 100644 --- a/ydb/core/fq/libs/config/protos/control_plane_storage.proto +++ b/ydb/core/fq/libs/config/protos/control_plane_storage.proto @@ -38,6 +38,10 @@ message TRetryPolicyMapping { TRetryPolicy Policy = 2; } +message TStreamingQueryConfig { + repeated string AvailableConnections = 22; +} + message TControlPlaneStorageConfig { bool Enabled = 1; NFq.NConfig.TYdbStorageConfig Storage = 2; // TODO: remove @@ -73,4 +77,5 @@ message TControlPlaneStorageConfig { bool DumpRawStatistics = 32; bool IgnorePrivateSources = 33; Ydb.Query.StatsMode StatsMode = 34; + TStreamingQueryConfig StreamingQueryConfig = 35; } diff --git a/ydb/core/fq/libs/control_plane_storage/config.cpp b/ydb/core/fq/libs/control_plane_storage/config.cpp index 9661bdedf166..ea1f0a787d4c 100644 --- a/ydb/core/fq/libs/control_plane_storage/config.cpp +++ b/ydb/core/fq/libs/control_plane_storage/config.cpp @@ -40,6 +40,10 @@ TControlPlaneStorageConfig::TControlPlaneStorageConfig(const NConfig::TControlPl AvailableBindings.insert(GetBindingType(availableBinding)); } + for (const auto& availableConnection : Proto.GetStreamingQueryConfig().GetAvailableConnections()) { + StreamingQueryConfig.AvailableConnections.insert(GetConnectionType(availableConnection)); + } + GeneratorPathsLimit = s3Config.HasGeneratorPathsLimit() ? s3Config.GetGeneratorPathsLimit() : 50'000; diff --git a/ydb/core/fq/libs/control_plane_storage/config.h b/ydb/core/fq/libs/control_plane_storage/config.h index fc4fcf489364..7742b09bd87c 100644 --- a/ydb/core/fq/libs/control_plane_storage/config.h +++ b/ydb/core/fq/libs/control_plane_storage/config.h @@ -13,6 +13,10 @@ namespace NFq { +struct TStreamingQueryConfig { + TSet AvailableConnections; +}; + struct TControlPlaneStorageConfig { NConfig::TControlPlaneStorageConfig Proto; NConfig::TComputeConfig ComputeConfigProto; @@ -30,6 +34,7 @@ struct TControlPlaneStorageConfig { TRetryPolicyItem TaskLeaseRetryPolicy; TDuration QuotaTtl; TDuration MetricsTtl; + TStreamingQueryConfig StreamingQueryConfig; TControlPlaneStorageConfig(const NConfig::TControlPlaneStorageConfig& config, const NYql::TS3GatewayConfig& s3Config, const NConfig::TCommonConfig& common, const NConfig::TComputeConfig& computeConfigProto); }; diff --git a/ydb/core/fq/libs/control_plane_storage/util.cpp b/ydb/core/fq/libs/control_plane_storage/util.cpp index c31aa8e20b01..54a6753edca1 100644 --- a/ydb/core/fq/libs/control_plane_storage/util.cpp +++ b/ydb/core/fq/libs/control_plane_storage/util.cpp @@ -161,6 +161,11 @@ NConfig::TControlPlaneStorageConfig FillDefaultParameters(NConfig::TControlPlane config.SetResultSetsTtl("1d"); } + if (!config.HasStreamingQueryConfig()) { + // For backward compatibility + config.MutableStreamingQueryConfig()->MutableAvailableConnections()->CopyFrom(config.GetAvailableConnection()); + } + return config; } diff --git a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp index 19bdf5360134..db44d871ce65 100644 --- a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp +++ b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp @@ -219,7 +219,8 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateQuery *queryInternal.mutable_compute_connection() = computeDatabase.connection(); TSet disabledConnections; for (const auto& connection: GetEntities(resultSets[resultSets.size() - 2], CONNECTION_COLUMN_NAME, Config->Proto.GetIgnorePrivateSources(), commonCounters)) { - if (!Config->AvailableConnections.contains(connection.content().setting().connection_case())) { + auto connectionCase = connection.content().setting().connection_case(); + if (!Config->AvailableConnections.contains(connectionCase) || !Config->StreamingQueryConfig.AvailableConnections.contains(connectionCase)) { disabledConnections.insert(connection.meta().id()); continue; } From 4bc116694ce29a15697e3e412abac16525f5752d Mon Sep 17 00:00:00 2001 From: Egor Zudin Date: Tue, 19 Dec 2023 14:25:10 +0000 Subject: [PATCH 2/5] Check if query is actually streaming --- .../ydb_control_plane_storage_queries.cpp | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp index db44d871ce65..5387762646c0 100644 --- a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp +++ b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp @@ -220,7 +220,11 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateQuery TSet disabledConnections; for (const auto& connection: GetEntities(resultSets[resultSets.size() - 2], CONNECTION_COLUMN_NAME, Config->Proto.GetIgnorePrivateSources(), commonCounters)) { auto connectionCase = connection.content().setting().connection_case(); - if (!Config->AvailableConnections.contains(connectionCase) || !Config->StreamingQueryConfig.AvailableConnections.contains(connectionCase)) { + if (!Config->AvailableConnections.contains(connectionCase)) { + disabledConnections.insert(connection.meta().id()); + continue; + } + if ((queryType == FederatedQuery::QueryContent::STREAMING) && !Config->StreamingQueryConfig.AvailableConnections.contains(connectionCase)) { disabledConnections.insert(connection.meta().id()); continue; } From a57877666b9dc1112e9d0123dfd720ea9ab01fec Mon Sep 17 00:00:00 2001 From: Egor Zudin Date: Tue, 19 Dec 2023 15:22:58 +0000 Subject: [PATCH 3/5] Add check to ModifyQuery --- .../ydb_control_plane_storage_queries.cpp | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp index 5387762646c0..ae2c817d809a 100644 --- a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp +++ b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp @@ -893,7 +893,12 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvModifyQuery // TODO: move to run actor priority selection TSet disabledConnections; for (const auto& connection: GetEntities(resultSets[resultSets.size() - 3], CONNECTION_COLUMN_NAME, Config->Proto.GetIgnorePrivateSources(), commonCounters)) { - if (!Config->AvailableConnections.contains(connection.content().setting().connection_case())) { + auto connectionCase = connection.content().setting().connection_case(); + if (!Config->AvailableConnections.contains(connectionCase)) { + disabledConnections.insert(connection.meta().id()); + continue; + } + if ((request.content().type() == FederatedQuery::QueryContent::STREAMING) && !Config->StreamingQueryConfig.AvailableConnections.contains(connectionCase)) { disabledConnections.insert(connection.meta().id()); continue; } From c55d1d7964b98f64d64b4b530b8d7858f291f91d Mon Sep 17 00:00:00 2001 From: Egor Zudin Date: Tue, 19 Dec 2023 15:25:36 +0000 Subject: [PATCH 4/5] Add todo comment --- ydb/core/fq/libs/control_plane_storage/util.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/fq/libs/control_plane_storage/util.cpp b/ydb/core/fq/libs/control_plane_storage/util.cpp index 54a6753edca1..8106443506c1 100644 --- a/ydb/core/fq/libs/control_plane_storage/util.cpp +++ b/ydb/core/fq/libs/control_plane_storage/util.cpp @@ -162,7 +162,7 @@ NConfig::TControlPlaneStorageConfig FillDefaultParameters(NConfig::TControlPlane } if (!config.HasStreamingQueryConfig()) { - // For backward compatibility + // For backward compatibility, TODO: YQ-2628, remove after config update on every cluster config.MutableStreamingQueryConfig()->MutableAvailableConnections()->CopyFrom(config.GetAvailableConnection()); } From fff5f0e8b89291ef2331a863963bd39df915a49c Mon Sep 17 00:00:00 2001 From: Egor Zudin Date: Wed, 20 Dec 2023 08:02:48 +0000 Subject: [PATCH 5/5] Remove separate config for streaming --- .../fq/libs/config/protos/control_plane_storage.proto | 6 +----- ydb/core/fq/libs/control_plane_storage/config.cpp | 4 ++-- ydb/core/fq/libs/control_plane_storage/config.h | 6 +----- ydb/core/fq/libs/control_plane_storage/util.cpp | 4 ++-- .../ydb_control_plane_storage_queries.cpp | 9 ++------- 5 files changed, 8 insertions(+), 21 deletions(-) diff --git a/ydb/core/fq/libs/config/protos/control_plane_storage.proto b/ydb/core/fq/libs/config/protos/control_plane_storage.proto index 884a641059c3..1c4b668d4ccc 100644 --- a/ydb/core/fq/libs/config/protos/control_plane_storage.proto +++ b/ydb/core/fq/libs/config/protos/control_plane_storage.proto @@ -38,10 +38,6 @@ message TRetryPolicyMapping { TRetryPolicy Policy = 2; } -message TStreamingQueryConfig { - repeated string AvailableConnections = 22; -} - message TControlPlaneStorageConfig { bool Enabled = 1; NFq.NConfig.TYdbStorageConfig Storage = 2; // TODO: remove @@ -77,5 +73,5 @@ message TControlPlaneStorageConfig { bool DumpRawStatistics = 32; bool IgnorePrivateSources = 33; Ydb.Query.StatsMode StatsMode = 34; - TStreamingQueryConfig StreamingQueryConfig = 35; + repeated string AvailableStreamingConnection = 35; } diff --git a/ydb/core/fq/libs/control_plane_storage/config.cpp b/ydb/core/fq/libs/control_plane_storage/config.cpp index ea1f0a787d4c..2cd4dd6bbe74 100644 --- a/ydb/core/fq/libs/control_plane_storage/config.cpp +++ b/ydb/core/fq/libs/control_plane_storage/config.cpp @@ -40,8 +40,8 @@ TControlPlaneStorageConfig::TControlPlaneStorageConfig(const NConfig::TControlPl AvailableBindings.insert(GetBindingType(availableBinding)); } - for (const auto& availableConnection : Proto.GetStreamingQueryConfig().GetAvailableConnections()) { - StreamingQueryConfig.AvailableConnections.insert(GetConnectionType(availableConnection)); + for (const auto& availableConnection : Proto.GetAvailableStreamingConnection()) { + AvailableStreamingConnections.insert(GetConnectionType(availableConnection)); } GeneratorPathsLimit = diff --git a/ydb/core/fq/libs/control_plane_storage/config.h b/ydb/core/fq/libs/control_plane_storage/config.h index 7742b09bd87c..16c8b9610506 100644 --- a/ydb/core/fq/libs/control_plane_storage/config.h +++ b/ydb/core/fq/libs/control_plane_storage/config.h @@ -13,10 +13,6 @@ namespace NFq { -struct TStreamingQueryConfig { - TSet AvailableConnections; -}; - struct TControlPlaneStorageConfig { NConfig::TControlPlaneStorageConfig Proto; NConfig::TComputeConfig ComputeConfigProto; @@ -34,7 +30,7 @@ struct TControlPlaneStorageConfig { TRetryPolicyItem TaskLeaseRetryPolicy; TDuration QuotaTtl; TDuration MetricsTtl; - TStreamingQueryConfig StreamingQueryConfig; + TSet AvailableStreamingConnections; TControlPlaneStorageConfig(const NConfig::TControlPlaneStorageConfig& config, const NYql::TS3GatewayConfig& s3Config, const NConfig::TCommonConfig& common, const NConfig::TComputeConfig& computeConfigProto); }; diff --git a/ydb/core/fq/libs/control_plane_storage/util.cpp b/ydb/core/fq/libs/control_plane_storage/util.cpp index 8106443506c1..74c5bd866406 100644 --- a/ydb/core/fq/libs/control_plane_storage/util.cpp +++ b/ydb/core/fq/libs/control_plane_storage/util.cpp @@ -161,9 +161,9 @@ NConfig::TControlPlaneStorageConfig FillDefaultParameters(NConfig::TControlPlane config.SetResultSetsTtl("1d"); } - if (!config.HasStreamingQueryConfig()) { + if (config.AvailableStreamingConnectionSize() == 0) { // For backward compatibility, TODO: YQ-2628, remove after config update on every cluster - config.MutableStreamingQueryConfig()->MutableAvailableConnections()->CopyFrom(config.GetAvailableConnection()); + config.MutableAvailableStreamingConnection()->CopyFrom(config.GetAvailableConnection()); } return config; diff --git a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp index ae2c817d809a..7ab56126f135 100644 --- a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp +++ b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp @@ -224,7 +224,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateQuery disabledConnections.insert(connection.meta().id()); continue; } - if ((queryType == FederatedQuery::QueryContent::STREAMING) && !Config->StreamingQueryConfig.AvailableConnections.contains(connectionCase)) { + if ((queryType == FederatedQuery::QueryContent::STREAMING) && !Config->AvailableStreamingConnections.contains(connectionCase)) { disabledConnections.insert(connection.meta().id()); continue; } @@ -893,12 +893,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvModifyQuery // TODO: move to run actor priority selection TSet disabledConnections; for (const auto& connection: GetEntities(resultSets[resultSets.size() - 3], CONNECTION_COLUMN_NAME, Config->Proto.GetIgnorePrivateSources(), commonCounters)) { - auto connectionCase = connection.content().setting().connection_case(); - if (!Config->AvailableConnections.contains(connectionCase)) { - disabledConnections.insert(connection.meta().id()); - continue; - } - if ((request.content().type() == FederatedQuery::QueryContent::STREAMING) && !Config->StreamingQueryConfig.AvailableConnections.contains(connectionCase)) { + if (!Config->AvailableConnections.contains(connection.content().setting().connection_case())) { disabledConnections.insert(connection.meta().id()); continue; }