From 9bc0b0dcd0f0358e3c35ccf306192040909051f9 Mon Sep 17 00:00:00 2001 From: Adi Suissa-Peleg Date: Mon, 16 Sep 2024 14:59:43 +0000 Subject: [PATCH] ads-replacement: adding gRPC-mux support for ADS config replacement Signed-off-by: Adi Suissa-Peleg --- envoy/config/grpc_mux.h | 13 ++ source/common/config/null_grpc_mux_impl.h | 6 + .../grpc/grpc_mux_failover.h | 14 +- .../config_subscription/grpc/grpc_mux_impl.cc | 77 +++++++--- .../config_subscription/grpc/grpc_mux_impl.h | 17 ++- .../grpc/new_grpc_mux_impl.cc | 79 +++++++--- .../grpc/new_grpc_mux_impl.h | 19 ++- .../config_subscription/grpc/watch_map.cc | 4 +- .../config_subscription/grpc/watch_map.h | 8 +- .../grpc/xds_mux/grpc_mux_impl.cc | 99 ++++++++---- .../grpc/xds_mux/grpc_mux_impl.h | 32 +++- .../extensions/config_subscription/grpc/BUILD | 3 + .../grpc/grpc_mux_failover_test.cc | 50 ++++-- .../grpc/grpc_mux_impl_test.cc | 143 +++++++++++++++++- .../grpc/new_grpc_mux_impl_test.cc | 139 ++++++++++++++++- .../grpc/watch_map_test.cc | 24 +-- .../grpc/xds_grpc_mux_impl_test.cc | 131 +++++++++++++++- 17 files changed, 742 insertions(+), 116 deletions(-) diff --git a/envoy/config/grpc_mux.h b/envoy/config/grpc_mux.h index a18c87bc193e..cb7e4f21a8f9 100644 --- a/envoy/config/grpc_mux.h +++ b/envoy/config/grpc_mux.h @@ -2,10 +2,13 @@ #include +#include "envoy/common/backoff_strategy.h" #include "envoy/common/exception.h" #include "envoy/common/pure.h" +#include "envoy/config/custom_config_validators.h" #include "envoy/config/eds_resources_cache.h" #include "envoy/config/subscription.h" +#include "envoy/grpc/async_client.h" #include "envoy/stats/stats_macros.h" #include "source/common/common/cleanup.h" @@ -112,6 +115,16 @@ class GrpcMux { * @return EdsResourcesCacheOptRef optional eds resources cache for the gRPC-mux. */ virtual EdsResourcesCacheOptRef edsResourcesCache() PURE; + + /** + * Updates the current gRPC-Mux object to use a new gRPC client, and config. + */ + virtual absl::Status + updateMuxSource(Grpc::RawAsyncClientPtr primary_async_client, + Grpc::RawAsyncClientPtr failover_async_client, + CustomConfigValidatorsPtr custom_config_validators, Stats::Scope& scope, + BackOffStrategyPtr backoff_strategy, + const envoy::config::core::v3::ApiConfigSource& ads_config_source) PURE; }; using GrpcMuxPtr = std::unique_ptr; diff --git a/source/common/config/null_grpc_mux_impl.h b/source/common/config/null_grpc_mux_impl.h index 453d723eb32d..e3ae70852836 100644 --- a/source/common/config/null_grpc_mux_impl.h +++ b/source/common/config/null_grpc_mux_impl.h @@ -27,6 +27,12 @@ class NullGrpcMuxImpl : public GrpcMux, ENVOY_BUG(false, "unexpected request for on demand update"); } + absl::Status updateMuxSource(Grpc::RawAsyncClientPtr, Grpc::RawAsyncClientPtr, + CustomConfigValidatorsPtr, Stats::Scope&, BackOffStrategyPtr, + const envoy::config::core::v3::ApiConfigSource&) override { + return absl::UnimplementedError(""); + } + EdsResourcesCacheOptRef edsResourcesCache() override { return absl::nullopt; } void onWriteable() override {} diff --git a/source/extensions/config_subscription/grpc/grpc_mux_failover.h b/source/extensions/config_subscription/grpc/grpc_mux_failover.h index 19765df61fc7..98dff08e51a1 100644 --- a/source/extensions/config_subscription/grpc/grpc_mux_failover.h +++ b/source/extensions/config_subscription/grpc/grpc_mux_failover.h @@ -6,6 +6,8 @@ namespace Envoy { namespace Config { +class GrpcMuxFailoverTest; + /** * This class arbitrates between two config providers of the same GrpcMux - * the primary and the failover. Envoy always prefers fetching config from the @@ -188,6 +190,8 @@ class GrpcMuxFailover : public GrpcStreamInterface, } private: + friend class GrpcMuxFailoverTest; + // A helper class that proxies the callbacks of GrpcStreamCallbacks for the primary service. class PrimaryGrpcStreamCallbacks : public GrpcStreamCallbacks { public: @@ -356,7 +360,15 @@ class GrpcMuxFailover : public GrpcStreamInterface, void onRemoteClose(Grpc::Status::GrpcStatus, const std::string&) override { PANIC("not implemented"); } - void closeStream() override { PANIC("not implemented"); } + void closeStream() override { + if (connectingToOrConnectedToPrimary()) { + ENVOY_LOG_MISC(debug, "Intentionally closing the primary gRPC stream"); + primary_grpc_stream_->closeStream(); + } else if (connectingToOrConnectedToFailover()) { + ENVOY_LOG_MISC(debug, "Intentionally closing the failover gRPC stream"); + failover_grpc_stream_->closeStream(); + } + } // The stream callbacks that will be invoked on the GrpcMux object, to notify // about the state of the underlying primary/failover stream. diff --git a/source/extensions/config_subscription/grpc/grpc_mux_impl.cc b/source/extensions/config_subscription/grpc/grpc_mux_impl.cc index 4317cdca1a2c..a17d5a9798ef 100644 --- a/source/extensions/config_subscription/grpc/grpc_mux_impl.cc +++ b/source/extensions/config_subscription/grpc/grpc_mux_impl.cc @@ -60,14 +60,18 @@ std::string convertToWildcard(const std::string& resource_name) { } // namespace GrpcMuxImpl::GrpcMuxImpl(GrpcMuxContext& grpc_mux_context, bool skip_subsequent_node) - : grpc_stream_(createGrpcStreamObject(grpc_mux_context)), + : dispatcher_(grpc_mux_context.dispatcher_), + grpc_stream_(createGrpcStreamObject(std::move(grpc_mux_context.async_client_), + std::move(grpc_mux_context.failover_async_client_), + grpc_mux_context.service_method_, grpc_mux_context.scope_, + std::move(grpc_mux_context.backoff_strategy_), + grpc_mux_context.rate_limit_settings_)), local_info_(grpc_mux_context.local_info_), skip_subsequent_node_(skip_subsequent_node), config_validators_(std::move(grpc_mux_context.config_validators_)), xds_config_tracker_(grpc_mux_context.xds_config_tracker_), xds_resources_delegate_(grpc_mux_context.xds_resources_delegate_), eds_resources_cache_(std::move(grpc_mux_context.eds_resources_cache_)), target_xds_authority_(grpc_mux_context.target_xds_authority_), - dispatcher_(grpc_mux_context.dispatcher_), dynamic_update_callback_handle_( grpc_mux_context.local_info_.contextProvider().addDynamicContextUpdateCallback( [this](absl::string_view resource_type_url) { @@ -80,29 +84,33 @@ GrpcMuxImpl::GrpcMuxImpl(GrpcMuxContext& grpc_mux_context, bool skip_subsequent_ std::unique_ptr> -GrpcMuxImpl::createGrpcStreamObject(GrpcMuxContext& grpc_mux_context) { +GrpcMuxImpl::createGrpcStreamObject(Grpc::RawAsyncClientPtr async_client, + Grpc::RawAsyncClientPtr failover_async_client, + const Protobuf::MethodDescriptor& service_method, + Stats::Scope& scope, BackOffStrategyPtr backoff_strategy, + const RateLimitSettings& rate_limit_settings) { if (Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support")) { return std::make_unique>( /*primary_stream_creator=*/ - [&grpc_mux_context]( + [&async_client, &service_method, &dispatcher = dispatcher_, &scope, &backoff_strategy, + &rate_limit_settings]( GrpcStreamCallbacks* callbacks) -> GrpcStreamInterfacePtr { return std::make_unique>( - callbacks, std::move(grpc_mux_context.async_client_), - grpc_mux_context.service_method_, grpc_mux_context.dispatcher_, - grpc_mux_context.scope_, std::move(grpc_mux_context.backoff_strategy_), - grpc_mux_context.rate_limit_settings_, + callbacks, std::move(async_client), service_method, dispatcher, scope, + std::move(backoff_strategy), rate_limit_settings, GrpcStream::ConnectedStateValue:: FIRST_ENTRY); }, /*failover_stream_creator=*/ - grpc_mux_context.failover_async_client_ + failover_async_client ? absl::make_optional( - [&grpc_mux_context]( + [&failover_async_client, &service_method, &dispatcher = dispatcher_, &scope, + &rate_limit_settings]( GrpcStreamCallbacks* callbacks) -> GrpcStreamInterfacePtr>( - callbacks, std::move(grpc_mux_context.failover_async_client_), - grpc_mux_context.service_method_, grpc_mux_context.dispatcher_, - grpc_mux_context.scope_, + callbacks, std::move(failover_async_client), service_method, dispatcher, + scope, // TODO(adisuissa): the backoff strategy for the failover should // be the same as the primary source. std::make_unique( GrpcMuxFailover:: DefaultFailoverBackoffMilliseconds), - grpc_mux_context.rate_limit_settings_, + rate_limit_settings, GrpcStream:: ConnectedStateValue::SECOND_ENTRY); }) : absl::nullopt, /*grpc_mux_callbacks=*/*this, - /*dispatch=*/grpc_mux_context.dispatcher_); + /*dispatch=*/dispatcher_); } return std::make_unique>( - this, std::move(grpc_mux_context.async_client_), grpc_mux_context.service_method_, - grpc_mux_context.dispatcher_, grpc_mux_context.scope_, - std::move(grpc_mux_context.backoff_strategy_), grpc_mux_context.rate_limit_settings_, + this, std::move(async_client), service_method, dispatcher_, scope, + std::move(backoff_strategy), rate_limit_settings, GrpcStream< envoy::service::discovery::v3::DiscoveryRequest, envoy::service::discovery::v3::DiscoveryResponse>::ConnectedStateValue::FIRST_ENTRY); @@ -292,6 +298,41 @@ GrpcMuxWatchPtr GrpcMuxImpl::addWatch(const std::string& type_url, return watch; } +// void GrpcMuxImpl::updateMuxSource(Grpc::RawAsyncClientPtr primary_async_client, +// Grpc::RawAsyncClientPtr failover_async_client, CustomConfigValidatorsPtr +// custom_config_validators, BackOffStrategyPtr backoff_strategy, const +// envoy::config::core::v3::ApiConfigSource& ads_config_source) { +absl::Status +GrpcMuxImpl::updateMuxSource(Grpc::RawAsyncClientPtr primary_async_client, + Grpc::RawAsyncClientPtr failover_async_client, + CustomConfigValidatorsPtr custom_config_validators, + Stats::Scope& scope, BackOffStrategyPtr backoff_strategy, + const envoy::config::core::v3::ApiConfigSource& ads_config_source) { + // Process the rate limit settings. + absl::StatusOr rate_limit_settings_or_error = + Utility::parseRateLimitSettings(ads_config_source); + RETURN_IF_NOT_OK_REF(rate_limit_settings_or_error.status()); + + const Protobuf::MethodDescriptor& service_method = + *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( + "envoy.service.discovery.v3.AggregatedDiscoveryService.StreamAggregatedResources"); + + // Disconnect from current xDS servers. + ENVOY_LOG_MISC(info, "Replacing xDS gRPC mux source"); + grpc_stream_->closeStream(); + grpc_stream_ = createGrpcStreamObject(std::move(primary_async_client), + std::move(failover_async_client), service_method, scope, + std::move(backoff_strategy), *rate_limit_settings_or_error); + + // Update the config validators. + config_validators_ = std::move(custom_config_validators); + + // Start the susbcriptions over the grpc_stream. + grpc_stream_->establishNewStream(); + + return absl::OkStatus(); +} + ScopedResume GrpcMuxImpl::pause(const std::string& type_url) { return pause(std::vector{type_url}); } diff --git a/source/extensions/config_subscription/grpc/grpc_mux_impl.h b/source/extensions/config_subscription/grpc/grpc_mux_impl.h index 885942f3fb2a..28451ed798ea 100644 --- a/source/extensions/config_subscription/grpc/grpc_mux_impl.h +++ b/source/extensions/config_subscription/grpc/grpc_mux_impl.h @@ -73,6 +73,13 @@ class GrpcMuxImpl : public GrpcMux, return makeOptRefFromPtr(eds_resources_cache_.get()); } + absl::Status + updateMuxSource(Grpc::RawAsyncClientPtr primary_async_client, + Grpc::RawAsyncClientPtr failover_async_client, + CustomConfigValidatorsPtr custom_config_validators, Stats::Scope& scope, + BackOffStrategyPtr backoff_strategy, + const envoy::config::core::v3::ApiConfigSource& ads_config_source) override; + void handleDiscoveryResponse( std::unique_ptr&& message); @@ -100,11 +107,13 @@ class GrpcMuxImpl : public GrpcMux, private: // Helper function to create the grpc_stream_ object. - // TODO(adisuissa): this should be removed when envoy.restart_features.xds_failover_support - // is deprecated. std::unique_ptr> - createGrpcStreamObject(GrpcMuxContext& grpc_mux_context); + createGrpcStreamObject(Grpc::RawAsyncClientPtr async_client, + Grpc::RawAsyncClientPtr failover_async_client, + const Protobuf::MethodDescriptor& service_method, Stats::Scope& scope, + BackOffStrategyPtr backoff_strategy, + const RateLimitSettings& rate_limit_settings); void drainRequests(); void setRetryTimer(); @@ -272,6 +281,7 @@ class GrpcMuxImpl : public GrpcMux, ApiState& api_state, const std::string& type_url, const std::string& version_info, bool call_delegate); + Event::Dispatcher& dispatcher_; // Multiplexes the stream to the primary and failover sources. // TODO(adisuissa): Once envoy.restart_features.xds_failover_support is deprecated, // convert from unique_ptr to GrpcMuxFailover directly. @@ -301,7 +311,6 @@ class GrpcMuxImpl : public GrpcMux, // URL. std::unique_ptr> request_queue_; - Event::Dispatcher& dispatcher_; Common::CallbackHandlePtr dynamic_update_callback_handle_; bool started_{false}; diff --git a/source/extensions/config_subscription/grpc/new_grpc_mux_impl.cc b/source/extensions/config_subscription/grpc/new_grpc_mux_impl.cc index 1a47a63572c7..fa926597d6f3 100644 --- a/source/extensions/config_subscription/grpc/new_grpc_mux_impl.cc +++ b/source/extensions/config_subscription/grpc/new_grpc_mux_impl.cc @@ -36,7 +36,12 @@ using AllMuxes = ThreadSafeSingleton; } // namespace NewGrpcMuxImpl::NewGrpcMuxImpl(GrpcMuxContext& grpc_mux_context) - : grpc_stream_(createGrpcStreamObject(grpc_mux_context)), + : dispatcher_(grpc_mux_context.dispatcher_), + grpc_stream_(createGrpcStreamObject(std::move(grpc_mux_context.async_client_), + std::move(grpc_mux_context.failover_async_client_), + grpc_mux_context.service_method_, grpc_mux_context.scope_, + std::move(grpc_mux_context.backoff_strategy_), + grpc_mux_context.rate_limit_settings_)), local_info_(grpc_mux_context.local_info_), config_validators_(std::move(grpc_mux_context.config_validators_)), dynamic_update_callback_handle_( @@ -45,7 +50,6 @@ NewGrpcMuxImpl::NewGrpcMuxImpl(GrpcMuxContext& grpc_mux_context) onDynamicContextUpdate(resource_type_url); return absl::OkStatus(); })), - dispatcher_(grpc_mux_context.dispatcher_), xds_config_tracker_(grpc_mux_context.xds_config_tracker_), eds_resources_cache_(std::move(grpc_mux_context.eds_resources_cache_)) { AllMuxes::get().insert(this); @@ -53,30 +57,34 @@ NewGrpcMuxImpl::NewGrpcMuxImpl(GrpcMuxContext& grpc_mux_context) std::unique_ptr> -NewGrpcMuxImpl::createGrpcStreamObject(GrpcMuxContext& grpc_mux_context) { +NewGrpcMuxImpl::createGrpcStreamObject(Grpc::RawAsyncClientPtr async_client, + Grpc::RawAsyncClientPtr failover_async_client, + const Protobuf::MethodDescriptor& service_method, + Stats::Scope& scope, BackOffStrategyPtr backoff_strategy, + const RateLimitSettings& rate_limit_settings) { if (Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support")) { return std::make_unique>( /*primary_stream_creator=*/ - [&grpc_mux_context]( + [&async_client, &service_method, &dispatcher = dispatcher_, &scope, &backoff_strategy, + &rate_limit_settings]( GrpcStreamCallbacks* callbacks) -> GrpcStreamInterfacePtr { return std::make_unique< GrpcStream>( - callbacks, std::move(grpc_mux_context.async_client_), - grpc_mux_context.service_method_, grpc_mux_context.dispatcher_, - grpc_mux_context.scope_, std::move(grpc_mux_context.backoff_strategy_), - grpc_mux_context.rate_limit_settings_, + callbacks, std::move(async_client), service_method, dispatcher, scope, + std::move(backoff_strategy), rate_limit_settings, GrpcStream:: ConnectedStateValue::FIRST_ENTRY); }, /*failover_stream_creator=*/ - grpc_mux_context.failover_async_client_ + failover_async_client ? absl::make_optional( - [&grpc_mux_context]( + [&failover_async_client, &service_method, &dispatcher = dispatcher_, &scope, + &rate_limit_settings]( GrpcStreamCallbacks* callbacks) -> GrpcStreamInterfacePtr< @@ -85,29 +93,27 @@ NewGrpcMuxImpl::createGrpcStreamObject(GrpcMuxContext& grpc_mux_context) { return std::make_unique< GrpcStream>( - callbacks, std::move(grpc_mux_context.failover_async_client_), - grpc_mux_context.service_method_, grpc_mux_context.dispatcher_, - grpc_mux_context.scope_, + callbacks, std::move(failover_async_client), service_method, dispatcher, + scope, // TODO(adisuissa): the backoff strategy for the failover should // be the same as the primary source. std::make_unique( GrpcMuxFailover:: DefaultFailoverBackoffMilliseconds), - grpc_mux_context.rate_limit_settings_, + rate_limit_settings, GrpcStream:: ConnectedStateValue::SECOND_ENTRY); }) : absl::nullopt, /*grpc_mux_callbacks=*/*this, - /*dispatch=*/grpc_mux_context.dispatcher_); + /*dispatch=*/dispatcher_); } return std::make_unique>( - this, std::move(grpc_mux_context.async_client_), grpc_mux_context.service_method_, - grpc_mux_context.dispatcher_, grpc_mux_context.scope_, - std::move(grpc_mux_context.backoff_strategy_), grpc_mux_context.rate_limit_settings_, + this, std::move(async_client), service_method, dispatcher_, scope, + std::move(backoff_strategy), rate_limit_settings, GrpcStream< envoy::service::discovery::v3::DeltaDiscoveryRequest, envoy::service::discovery::v3::DeltaDiscoveryResponse>::ConnectedStateValue::FIRST_ENTRY); @@ -246,6 +252,41 @@ GrpcMuxWatchPtr NewGrpcMuxImpl::addWatch(const std::string& type_url, return std::make_unique(type_url, watch, *this, options); } +absl::Status +NewGrpcMuxImpl::updateMuxSource(Grpc::RawAsyncClientPtr primary_async_client, + Grpc::RawAsyncClientPtr failover_async_client, + CustomConfigValidatorsPtr custom_config_validators, + Stats::Scope& scope, BackOffStrategyPtr backoff_strategy, + const envoy::config::core::v3::ApiConfigSource& ads_config_source) { + // Process the rate limit settings. + absl::StatusOr rate_limit_settings_or_error = + Utility::parseRateLimitSettings(ads_config_source); + RETURN_IF_NOT_OK_REF(rate_limit_settings_or_error.status()); + + const Protobuf::MethodDescriptor& service_method = + *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( + "envoy.service.discovery.v3.AggregatedDiscoveryService.DeltaAggregatedResources"); + + // Disconnect from current xDS servers. + ENVOY_LOG_MISC(info, "Replacing the xDS gRPC mux source"); + grpc_stream_->closeStream(); + grpc_stream_ = createGrpcStreamObject(std::move(primary_async_client), + std::move(failover_async_client), service_method, scope, + std::move(backoff_strategy), *rate_limit_settings_or_error); + + // Update the config validators. + config_validators_ = std::move(custom_config_validators); + // Update the watch map's config validators. + for (auto& [type_url, subscription] : subscriptions_) { + subscription->watch_map_.setConfigValidators(config_validators_.get()); + } + + // Start the susbcriptions over the grpc_stream. + grpc_stream_->establishNewStream(); + + return absl::OkStatus(); +} + // Updates the list of resource names watched by the given watch. If an added name is new across // the whole subscription, or if a removed name has no other watch interested in it, then the // subscription will enqueue and attempt to send an appropriate discovery request. @@ -320,7 +361,7 @@ void NewGrpcMuxImpl::addSubscription(const std::string& type_url, } subscriptions_.emplace( type_url, std::make_unique(type_url, local_info_, use_namespace_matching, - dispatcher_, *config_validators_.get(), + dispatcher_, config_validators_.get(), xds_config_tracker_, resources_cache)); subscription_ordering_.emplace_back(type_url); } diff --git a/source/extensions/config_subscription/grpc/new_grpc_mux_impl.h b/source/extensions/config_subscription/grpc/new_grpc_mux_impl.h index 741ea6856e45..40619260b839 100644 --- a/source/extensions/config_subscription/grpc/new_grpc_mux_impl.h +++ b/source/extensions/config_subscription/grpc/new_grpc_mux_impl.h @@ -78,6 +78,13 @@ class NewGrpcMuxImpl // TODO(fredlas) remove this from the GrpcMux interface. void start() override; + absl::Status + updateMuxSource(Grpc::RawAsyncClientPtr primary_async_client, + Grpc::RawAsyncClientPtr failover_async_client, + CustomConfigValidatorsPtr custom_config_validators, Stats::Scope& scope, + BackOffStrategyPtr backoff_strategy, + const envoy::config::core::v3::ApiConfigSource& ads_config_source) override; + GrpcStreamInterface& grpcStreamForTest() { @@ -95,7 +102,7 @@ class NewGrpcMuxImpl struct SubscriptionStuff { SubscriptionStuff(const std::string& type_url, const LocalInfo::LocalInfo& local_info, const bool use_namespace_matching, Event::Dispatcher& dispatcher, - CustomConfigValidators& config_validators, + CustomConfigValidators* config_validators, XdsConfigTrackerOptRef xds_config_tracker, EdsResourcesCacheOptRef eds_resources_cache) : watch_map_(use_namespace_matching, type_url, config_validators, eds_resources_cache), @@ -149,11 +156,13 @@ class NewGrpcMuxImpl }; // Helper function to create the grpc_stream_ object. - // TODO(adisuissa): this should be removed when envoy.restart_features.xds_failover_support - // is deprecated. std::unique_ptr> - createGrpcStreamObject(GrpcMuxContext& grpc_mux_context); + createGrpcStreamObject(Grpc::RawAsyncClientPtr async_client, + Grpc::RawAsyncClientPtr failover_async_client, + const Protobuf::MethodDescriptor& service_method, Stats::Scope& scope, + BackOffStrategyPtr backoff_strategy, + const RateLimitSettings& rate_limit_settings); void removeWatch(const std::string& type_url, Watch* watch); @@ -196,6 +205,7 @@ class NewGrpcMuxImpl // the order of Envoy's dependency ordering). std::list subscription_ordering_; + Event::Dispatcher& dispatcher_; // Multiplexes the stream to the primary and failover sources. // TODO(adisuissa): Once envoy.restart_features.xds_failover_support is deprecated, // convert from unique_ptr to GrpcMuxFailover directly. @@ -206,7 +216,6 @@ class NewGrpcMuxImpl const LocalInfo::LocalInfo& local_info_; CustomConfigValidatorsPtr config_validators_; Common::CallbackHandlePtr dynamic_update_callback_handle_; - Event::Dispatcher& dispatcher_; XdsConfigTrackerOptRef xds_config_tracker_; EdsResourcesCachePtr eds_resources_cache_; diff --git a/source/extensions/config_subscription/grpc/watch_map.cc b/source/extensions/config_subscription/grpc/watch_map.cc index 5b40d625fadb..c8ffbb251e5c 100644 --- a/source/extensions/config_subscription/grpc/watch_map.cc +++ b/source/extensions/config_subscription/grpc/watch_map.cc @@ -154,7 +154,7 @@ void WatchMap::onConfigUpdate(const std::vector& resources, } // Execute external config validators. - config_validators_.executeValidators(type_url_, resources); + config_validators_->executeValidators(type_url_, resources); const bool map_is_single_wildcard = (watches_.size() == 1 && wildcard_watches_.size() == 1); // We just bundled up the updates into nice per-watch packages. Now, deliver them. @@ -249,7 +249,7 @@ void WatchMap::onConfigUpdate( } // Execute external config validators. - config_validators_.executeValidators(type_url_, decoded_resources, removed_resources); + config_validators_->executeValidators(type_url_, decoded_resources, removed_resources); // We just bundled up the updates into nice per-watch packages. Now, deliver them. for (const auto& [cur_watch, resource_to_add] : per_watch_added) { diff --git a/source/extensions/config_subscription/grpc/watch_map.h b/source/extensions/config_subscription/grpc/watch_map.h index 47cd43b4581f..07140804edee 100644 --- a/source/extensions/config_subscription/grpc/watch_map.h +++ b/source/extensions/config_subscription/grpc/watch_map.h @@ -73,7 +73,7 @@ struct Watch { class WatchMap : public UntypedConfigUpdateCallbacks, public Logger::Loggable { public: WatchMap(const bool use_namespace_matching, const std::string& type_url, - CustomConfigValidators& config_validators, EdsResourcesCacheOptRef eds_resources_cache) + CustomConfigValidators* config_validators, EdsResourcesCacheOptRef eds_resources_cache) : use_namespace_matching_(use_namespace_matching), type_url_(type_url), config_validators_(config_validators), eds_resources_cache_(eds_resources_cache) { // If eds resources cache is provided, then the type must be ClusterLoadAssignment. @@ -114,6 +114,10 @@ class WatchMap : public UntypedConfigUpdateCallbacks, public Logger::Loggable; template GrpcMuxImpl::GrpcMuxImpl(std::unique_ptr subscription_state_factory, GrpcMuxContext& grpc_mux_context, bool skip_subsequent_node) - : grpc_stream_(createGrpcStreamObject(grpc_mux_context)), + : dispatcher_(grpc_mux_context.dispatcher_), + grpc_stream_(createGrpcStreamObject(std::move(grpc_mux_context.async_client_), + std::move(grpc_mux_context.failover_async_client_), + grpc_mux_context.service_method_, grpc_mux_context.scope_, + std::move(grpc_mux_context.backoff_strategy_), + grpc_mux_context.rate_limit_settings_)), subscription_state_factory_(std::move(subscription_state_factory)), skip_subsequent_node_(skip_subsequent_node), local_info_(grpc_mux_context.local_info_), dynamic_update_callback_handle_( @@ -59,44 +64,46 @@ GrpcMuxImpl::GrpcMuxImpl(std::unique_ptr subscription_state_fac } template -std::unique_ptr> -GrpcMuxImpl::createGrpcStreamObject(GrpcMuxContext& grpc_mux_context) { +std::unique_ptr> GrpcMuxImpl::createGrpcStreamObject( + Grpc::RawAsyncClientPtr async_client, Grpc::RawAsyncClientPtr failover_async_client, + const Protobuf::MethodDescriptor& service_method, Stats::Scope& scope, + BackOffStrategyPtr backoff_strategy, const RateLimitSettings& rate_limit_settings) { if (Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support")) { return std::make_unique>( /*primary_stream_creator=*/ - [&grpc_mux_context](GrpcStreamCallbacks* callbacks) -> GrpcStreamInterfacePtr { + [&async_client, &service_method, &dispatcher = dispatcher_, &scope, &backoff_strategy, + &rate_limit_settings]( + GrpcStreamCallbacks* callbacks) -> GrpcStreamInterfacePtr { return std::make_unique>( - callbacks, std::move(grpc_mux_context.async_client_), - grpc_mux_context.service_method_, grpc_mux_context.dispatcher_, - grpc_mux_context.scope_, std::move(grpc_mux_context.backoff_strategy_), - grpc_mux_context.rate_limit_settings_, + callbacks, std::move(async_client), service_method, dispatcher, scope, + std::move(backoff_strategy), rate_limit_settings, GrpcStream::ConnectedStateValue::FIRST_ENTRY); }, /*failover_stream_creator=*/ - grpc_mux_context.failover_async_client_ - ? absl::make_optional([&grpc_mux_context](GrpcStreamCallbacks* callbacks) - -> GrpcStreamInterfacePtr { - return std::make_unique>( - callbacks, std::move(grpc_mux_context.failover_async_client_), - grpc_mux_context.service_method_, grpc_mux_context.dispatcher_, - grpc_mux_context.scope_, - // TODO(adisuissa): the backoff strategy for the failover should - // be the same as the primary source. - std::make_unique( - GrpcMuxFailover::DefaultFailoverBackoffMilliseconds), - grpc_mux_context.rate_limit_settings_, - GrpcStream::ConnectedStateValue::SECOND_ENTRY); - }) + failover_async_client + ? absl::make_optional( + [&failover_async_client, &service_method, &dispatcher = dispatcher_, &scope, + &rate_limit_settings]( + GrpcStreamCallbacks* callbacks) -> GrpcStreamInterfacePtr { + return std::make_unique>( + callbacks, std::move(failover_async_client), service_method, dispatcher, + scope, + // TODO(adisuissa): the backoff strategy for the failover should + // be the same as the primary source. + std::make_unique( + GrpcMuxFailover::DefaultFailoverBackoffMilliseconds), + rate_limit_settings, GrpcStream::ConnectedStateValue::SECOND_ENTRY); + }) : absl::nullopt, /*grpc_mux_callbacks=*/*this, - /*dispatch=*/grpc_mux_context.dispatcher_); + /*dispatch=*/dispatcher_); } - return std::make_unique>( - this, std::move(grpc_mux_context.async_client_), grpc_mux_context.service_method_, - grpc_mux_context.dispatcher_, grpc_mux_context.scope_, - std::move(grpc_mux_context.backoff_strategy_), grpc_mux_context.rate_limit_settings_, - GrpcStream::ConnectedStateValue::FIRST_ENTRY); + return std::make_unique>(this, std::move(async_client), service_method, + dispatcher_, scope, std::move(backoff_strategy), + rate_limit_settings, + GrpcStream::ConnectedStateValue::FIRST_ENTRY); } + template GrpcMuxImpl::~GrpcMuxImpl() { AllMuxes::get().erase(this); } @@ -134,7 +141,7 @@ Config::GrpcMuxWatchPtr GrpcMuxImpl::addWatch( watch_map = watch_maps_ .emplace(type_url, std::make_unique(options.use_namespace_matching_, type_url, - *config_validators_.get(), resources_cache)) + config_validators_.get(), resources_cache)) .first; subscriptions_.emplace(type_url, subscription_state_factory_->makeSubscriptionState( type_url, *watch_maps_[type_url], resource_decoder, @@ -221,6 +228,40 @@ ScopedResume GrpcMuxImpl::pause(const std::vector typ }); } +template +absl::Status GrpcMuxImpl::updateMuxSource( + Grpc::RawAsyncClientPtr primary_async_client, Grpc::RawAsyncClientPtr failover_async_client, + CustomConfigValidatorsPtr custom_config_validators, Stats::Scope& scope, + BackOffStrategyPtr backoff_strategy, + const envoy::config::core::v3::ApiConfigSource& ads_config_source) { + // Process the rate limit settings. + absl::StatusOr rate_limit_settings_or_error = + Utility::parseRateLimitSettings(ads_config_source); + RETURN_IF_NOT_OK_REF(rate_limit_settings_or_error.status()); + + const Protobuf::MethodDescriptor& service_method = + *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(methodName()); + + // Disconnect from current xDS servers. + ENVOY_LOG_MISC(info, "Replacing the xDS gRPC mux source"); + grpc_stream_->closeStream(); + grpc_stream_ = createGrpcStreamObject(std::move(primary_async_client), + std::move(failover_async_client), service_method, scope, + std::move(backoff_strategy), *rate_limit_settings_or_error); + + // Update the config validators. + config_validators_ = std::move(custom_config_validators); + // Update the watch map's config validators. + for (auto& [type_url, watch_map] : watch_maps_) { + watch_map->setConfigValidators(config_validators_.get()); + } + + // Start the susbcriptions over the grpc_stream. + grpc_stream_->establishNewStream(); + + return absl::OkStatus(); +} + template void GrpcMuxImpl::sendGrpcMessage(RQ& msg_proto, S& sub_state) { if (sub_state.dynamicContextChanged() || !anyRequestSentYetInCurrentStream() || diff --git a/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.h b/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.h index 37f0c31f1729..c45c33d38a32 100644 --- a/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.h +++ b/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.h @@ -105,6 +105,13 @@ class GrpcMuxImpl : public GrpcStreamCallbacks, genericHandleResponse(message->type_url(), *message, control_plane_stats); } + absl::Status + updateMuxSource(Grpc::RawAsyncClientPtr primary_async_client, + Grpc::RawAsyncClientPtr failover_async_client, + CustomConfigValidatorsPtr custom_config_validators, Stats::Scope& scope, + BackOffStrategyPtr backoff_strategy, + const envoy::config::core::v3::ApiConfigSource& ads_config_source) override; + EdsResourcesCacheOptRef edsResourcesCache() override { return makeOptRefFromPtr(eds_resources_cache_.get()); } @@ -165,12 +172,16 @@ class GrpcMuxImpl : public GrpcStreamCallbacks, } const LocalInfo::LocalInfo& localInfo() const { return local_info_; } + virtual std::string methodName() const PURE; + private: // Helper function to create the grpc_stream_ object. // TODO(adisuissa): this should be removed when envoy.restart_features.xds_failover_support // is deprecated. - std::unique_ptr> - createGrpcStreamObject(GrpcMuxContext& grpc_mux_context); + std::unique_ptr> createGrpcStreamObject( + Grpc::RawAsyncClientPtr async_client, Grpc::RawAsyncClientPtr failover_async_client, + const Protobuf::MethodDescriptor& service_method, Stats::Scope& scope, + BackOffStrategyPtr backoff_strategy, const RateLimitSettings& rate_limit_settings); // Checks whether external conditions allow sending a DeltaDiscoveryRequest. (Does not check // whether we *want* to send a (Delta)DiscoveryRequest). @@ -187,6 +198,7 @@ class GrpcMuxImpl : public GrpcStreamCallbacks, // Invoked when dynamic context parameters change for a resource type. void onDynamicContextUpdate(absl::string_view resource_type_url); + Event::Dispatcher& dispatcher_; // Multiplexes the stream to the primary and failover sources. // TODO(adisuissa): Once envoy.restart_features.xds_failover_support is deprecated, // convert from unique_ptr to GrpcMuxFailover directly. @@ -248,6 +260,11 @@ class GrpcMuxDelta : public GrpcMuxImpl& for_update) override; + +private: + std::string methodName() const override { + return "envoy.service.discovery.v3.AggregatedDiscoveryService.DeltaAggregatedResources"; + } }; class GrpcMuxSotw : public GrpcMuxImpl&) override { ENVOY_BUG(false, "unexpected request for on demand update"); } + +private: + std::string methodName() const override { + return "envoy.service.discovery.v3.AggregatedDiscoveryService.StreamAggregatedResources"; + } }; class NullGrpcMuxImpl : public GrpcMux { @@ -277,6 +299,12 @@ class NullGrpcMuxImpl : public GrpcMux { SubscriptionCallbacks&, OpaqueResourceDecoderSharedPtr, const SubscriptionOptions&) override; + absl::Status updateMuxSource(Grpc::RawAsyncClientPtr, Grpc::RawAsyncClientPtr, + CustomConfigValidatorsPtr, Stats::Scope&, BackOffStrategyPtr, + const envoy::config::core::v3::ApiConfigSource&) override { + return absl::UnimplementedError(""); + } + void requestOnDemandUpdate(const std::string&, const absl::flat_hash_set&) override { ENVOY_BUG(false, "unexpected request for on demand update"); } diff --git a/test/extensions/config_subscription/grpc/BUILD b/test/extensions/config_subscription/grpc/BUILD index b3f06ff1c743..52984870eebe 100644 --- a/test/extensions/config_subscription/grpc/BUILD +++ b/test/extensions/config_subscription/grpc/BUILD @@ -31,6 +31,7 @@ envoy_cc_test( "//test/test_common:logging_lib", "//test/test_common:resources_lib", "//test/test_common:simulated_time_system_lib", + "//test/test_common:status_utility_lib", "//test/test_common:test_runtime_lib", "//test/test_common:utility_lib", "@envoy_api//envoy/config/endpoint/v3:pkg_cc_proto", @@ -62,6 +63,7 @@ envoy_cc_test( "//test/test_common:logging_lib", "//test/test_common:resources_lib", "//test/test_common:simulated_time_system_lib", + "//test/test_common:status_utility_lib", "//test/test_common:test_runtime_lib", "//test/test_common:utility_lib", "@envoy_api//envoy/config/endpoint/v3:pkg_cc_proto", @@ -174,6 +176,7 @@ envoy_cc_test( "//test/test_common:logging_lib", "//test/test_common:resources_lib", "//test/test_common:simulated_time_system_lib", + "//test/test_common:status_utility_lib", "//test/test_common:test_runtime_lib", "//test/test_common:utility_lib", "@envoy_api//envoy/api/v2:pkg_cc_proto", diff --git a/test/extensions/config_subscription/grpc/grpc_mux_failover_test.cc b/test/extensions/config_subscription/grpc/grpc_mux_failover_test.cc index 8a990f134fcb..2b611e1c7b51 100644 --- a/test/extensions/config_subscription/grpc/grpc_mux_failover_test.cc +++ b/test/extensions/config_subscription/grpc/grpc_mux_failover_test.cc @@ -15,7 +15,6 @@ using testing::Return; namespace Envoy { namespace Config { -namespace { // Validates that if no failover is set, then all actions are essentially a pass // through. @@ -237,6 +236,13 @@ class GrpcMuxFailoverTest : public testing::Test { failover_callbacks_->onDiscoveryResponse(std::move(response), cp_stats); } + void invokeCloseStream() { + // A wrapper that invokes closeStream(). It is needed because closeStream() + // is a private method, and while this class is a friend for GrpcMuxFailover, + // the tests cannot invoke the method directly. + grpc_mux_failover_->closeStream(); + } + // Override a timer to emulate its expiration without waiting for it to expire. NiceMock dispatcher_; Event::MockTimer* timer_; @@ -626,26 +632,38 @@ TEST_F(GrpcMuxFailoverTest, OnWriteableConnectedToPrimaryInvoked) { // Validates that when connected to primary, a subsequent call to establishNewStream // will not try to recreate the stream. TEST_F(GrpcMuxFailoverTest, NoRecreateStreamWhenConnectedToPrimary) { - // Validate connected to primary. - { - connectToPrimary(); - EXPECT_CALL(primary_stream_, establishNewStream()).Times(0); - EXPECT_CALL(failover_stream_, establishNewStream()).Times(0); - grpc_mux_failover_->establishNewStream(); - } + connectToPrimary(); + EXPECT_CALL(primary_stream_, establishNewStream()).Times(0); + EXPECT_CALL(failover_stream_, establishNewStream()).Times(0); + grpc_mux_failover_->establishNewStream(); } // Validates that when connected to failover, a subsequent call to establishNewStream // will not try to recreate the stream. TEST_F(GrpcMuxFailoverTest, NoRecreateStreamWhenConnectedToFailover) { - // Validate connected to failover. - { - connectToFailover(); - EXPECT_CALL(primary_stream_, establishNewStream()).Times(0); - EXPECT_CALL(failover_stream_, establishNewStream()).Times(0); - grpc_mux_failover_->establishNewStream(); - } + connectToFailover(); + EXPECT_CALL(primary_stream_, establishNewStream()).Times(0); + EXPECT_CALL(failover_stream_, establishNewStream()).Times(0); + grpc_mux_failover_->establishNewStream(); +} + +// Validates that closing the stream when connected to primary closes the +// primary stream. +TEST_F(GrpcMuxFailoverTest, CloseStreamWhenConnectedToPrimary) { + connectToPrimary(); + EXPECT_CALL(primary_stream_, closeStream()); + EXPECT_CALL(failover_stream_, closeStream()).Times(0); + invokeCloseStream(); } -} // namespace + +// Validates that closing the stream when connected to failover closes the +// failover stream. +TEST_F(GrpcMuxFailoverTest, CloseStreamWhenConnectedToFailover) { + connectToFailover(); + EXPECT_CALL(primary_stream_, closeStream()).Times(0); + EXPECT_CALL(failover_stream_, closeStream()); + invokeCloseStream(); +} + } // namespace Config } // namespace Envoy diff --git a/test/extensions/config_subscription/grpc/grpc_mux_impl_test.cc b/test/extensions/config_subscription/grpc/grpc_mux_impl_test.cc index df541cd6bf98..e7140a1743be 100644 --- a/test/extensions/config_subscription/grpc/grpc_mux_impl_test.cc +++ b/test/extensions/config_subscription/grpc/grpc_mux_impl_test.cc @@ -27,6 +27,7 @@ #include "test/test_common/logging.h" #include "test/test_common/resources.h" #include "test/test_common/simulated_time_system.h" +#include "test/test_common/status_utility.h" #include "test/test_common/test_runtime.h" #include "test/test_common/test_time.h" #include "test/test_common/utility.h" @@ -95,7 +96,8 @@ class GrpcMuxImplTestBase : public testing::TestWithParam { const std::vector& resource_names, const std::string& version, bool first = false, const std::string& nonce = "", const Protobuf::int32 error_code = Grpc::Status::WellKnownGrpcStatus::Ok, - const std::string& error_message = "") { + const std::string& error_message = "", + Grpc::MockAsyncStream* async_stream = nullptr) { envoy::service::discovery::v3::DiscoveryRequest expected_request; if (first) { expected_request.mutable_node()->CopyFrom(local_info_.node()); @@ -113,7 +115,8 @@ class GrpcMuxImplTestBase : public testing::TestWithParam { error_detail->set_code(error_code); error_detail->set_message(error_message); } - EXPECT_CALL(async_stream_, sendMessageRaw_(Grpc::ProtoBufferEq(expected_request), false)); + EXPECT_CALL(async_stream ? *async_stream : async_stream_, + sendMessageRaw_(Grpc::ProtoBufferEq(expected_request), false)); } TestScopedRuntime scoped_runtime_; @@ -122,6 +125,9 @@ class GrpcMuxImplTestBase : public testing::TestWithParam { NiceMock local_info_; Grpc::MockAsyncClient* async_client_; Grpc::MockAsyncStream async_stream_; + // Used for tests invoking updateMuxSource(). + Grpc::MockAsyncClient* replaced_async_client_; + Grpc::MockAsyncStream replaced_async_stream_; CustomConfigValidatorsPtr config_validators_; GrpcMuxImplPtr grpc_mux_; NiceMock callbacks_; @@ -1346,6 +1352,131 @@ TEST_P(GrpcMuxImplTest, RemoveCachedResourceOnLastSubscription) { EXPECT_CALL(*eds_resources_cache_, removeResource("x")); } +// Updating the mux object while being connected sends the correct requests. +TEST_P(GrpcMuxImplTest, MuxDynamicReplacementWhenConnected) { + replaced_async_client_ = new Grpc::MockAsyncClient(); + setup(); + InSequence s; + + auto foo_sub = grpc_mux_->addWatch("type_url_foo", {"x", "y"}, callbacks_, resource_decoder_, {}); + auto bar_sub = grpc_mux_->addWatch("type_url_bar", {}, callbacks_, resource_decoder_, {}); + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage("type_url_foo", {"x", "y"}, "", true); + expectSendMessage("type_url_bar", {}, ""); + grpc_mux_->start(); + EXPECT_EQ(1, control_plane_connected_state_.value()); + + // Switch the mux. + envoy::config::core::v3::ApiConfigSource empty_ads_config; + // Expect a disconnect from the original async_client and stream. + EXPECT_CALL(async_stream_, resetStream()); + // Expect establishing connection to the new client and stream. + EXPECT_CALL(*replaced_async_client_, startRaw(_, _, _, _)) + .WillOnce(Return(&replaced_async_stream_)); + // Expect the initial messages to be sent to the new stream. + expectSendMessage("type_url_foo", {"x", "y"}, "", true, "", Grpc::Status::WellKnownGrpcStatus::Ok, + "", &replaced_async_stream_); + expectSendMessage("type_url_bar", {}, "", false, "", Grpc::Status::WellKnownGrpcStatus::Ok, "", + &replaced_async_stream_); + EXPECT_OK(grpc_mux_->updateMuxSource( + /*primary_async_client=*/std::unique_ptr(replaced_async_client_), + /*failover_async_client=*/nullptr, + /*custom_config_validators=*/nullptr, + /*scope=*/*stats_.rootScope(), + /*backoff_strategy=*/ + std::make_unique( + SubscriptionFactory::RetryInitialDelayMs, SubscriptionFactory::RetryMaxDelayMs, random_), + empty_ads_config)); + // Ending test, removing subscriptions for type_url_foo. + expectSendMessage("type_url_foo", {}, "", false, "", Grpc::Status::WellKnownGrpcStatus::Ok, "", + &replaced_async_stream_); +} + +// Updating the mux object after receiving a response, sends the correct requests. +TEST_P(GrpcMuxImplTest, MuxDynamicReplacementFetchingResources) { + replaced_async_client_ = new Grpc::MockAsyncClient(); + setup(); + InSequence s; + + const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment; + OpaqueResourceDecoderSharedPtr resource_decoder( + std::make_shared>("cluster_name")); + auto foo_sub = grpc_mux_->addWatch(type_url, {"x", "y"}, callbacks_, resource_decoder, {}); + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage(type_url, {"x", "y"}, "", true); + grpc_mux_->start(); + + // Send back a response for one of the resources. + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_version_info("1"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + load_assignment.set_cluster_name("x"); + response->add_resources()->PackFrom(load_assignment); + EXPECT_CALL(callbacks_, onConfigUpdate(_, "1")) + .WillOnce(Invoke([&load_assignment](const std::vector& resources, + const std::string&) { + EXPECT_EQ(1, resources.size()); + const auto& expected_assignment = + dynamic_cast( + resources[0].get().resource()); + EXPECT_TRUE(TestUtility::protoEqual(expected_assignment, load_assignment)); + return absl::OkStatus(); + })); + expectSendMessage(type_url, {"x", "y"}, "1"); + grpc_mux_->grpcStreamForTest().onReceiveMessage(std::move(response)); + } + + // Switch the mux. + envoy::config::core::v3::ApiConfigSource empty_ads_config; + // Expect a disconnect from the original async_client and stream. + EXPECT_CALL(async_stream_, resetStream()); + // Expect establishing connection to the new client and stream. + EXPECT_CALL(*replaced_async_client_, startRaw(_, _, _, _)) + .WillOnce(Return(&replaced_async_stream_)); + // Expect the initial message to be sent to the new stream. + expectSendMessage(type_url, {"x", "y"}, "1", true, "", Grpc::Status::WellKnownGrpcStatus::Ok, "", + &replaced_async_stream_); + EXPECT_OK(grpc_mux_->updateMuxSource( + /*primary_async_client=*/std::unique_ptr(replaced_async_client_), + /*failover_async_client=*/nullptr, + /*custom_config_validators=*/std::make_unique>(), + /*scope=*/*stats_.rootScope(), + /*backoff_strategy=*/ + std::make_unique( + SubscriptionFactory::RetryInitialDelayMs, SubscriptionFactory::RetryMaxDelayMs, random_), + empty_ads_config)); + + // Send a response to resource "y" on the replaced mux. + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_version_info("2"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + load_assignment.set_cluster_name("y"); + response->add_resources()->PackFrom(load_assignment); + EXPECT_CALL(callbacks_, onConfigUpdate(_, "2")) + .WillOnce(Invoke([&load_assignment](const std::vector& resources, + const std::string&) { + EXPECT_EQ(1, resources.size()); + const auto& expected_assignment = + dynamic_cast( + resources[0].get().resource()); + EXPECT_TRUE(TestUtility::protoEqual(expected_assignment, load_assignment)); + return absl::OkStatus(); + })); + expectSendMessage(type_url, {"x", "y"}, "2", false, "", Grpc::Status::WellKnownGrpcStatus::Ok, + "", &replaced_async_stream_); + grpc_mux_->grpcStreamForTest().onReceiveMessage(std::move(response)); + } + + // Ending test, removing subscriptions for the subscription. + expectSendMessage(type_url, {}, "2", false, "", Grpc::Status::WellKnownGrpcStatus::Ok, "", + &replaced_async_stream_); +} + /** * Tests the NullGrpcMuxImpl object to increase code-coverage. */ @@ -1399,6 +1530,14 @@ TEST_F(NullGrpcMuxImplTest, OnDiscoveryResponseImplemented) { EXPECT_NO_THROW(null_mux_.onDiscoveryResponse(std::move(response), cp_stats)); } +TEST_F(NullGrpcMuxImplTest, UpdateMuxSourceError) { + Stats::TestUtil::TestStore stats; + const envoy::config::core::v3::ApiConfigSource empty_config; + const absl::Status status = null_mux_.updateMuxSource(nullptr, nullptr, nullptr, + *stats.rootScope(), nullptr, empty_config); + EXPECT_EQ(status.code(), absl::StatusCode::kUnimplemented); +} + TEST(GrpcMuxFactoryTest, InvalidRateLimit) { auto* factory = Config::Utility::getFactoryByName("envoy.config_mux.grpc_mux_factory"); diff --git a/test/extensions/config_subscription/grpc/new_grpc_mux_impl_test.cc b/test/extensions/config_subscription/grpc/new_grpc_mux_impl_test.cc index 5c27473e03f0..3fe713348ed1 100644 --- a/test/extensions/config_subscription/grpc/new_grpc_mux_impl_test.cc +++ b/test/extensions/config_subscription/grpc/new_grpc_mux_impl_test.cc @@ -28,6 +28,7 @@ #include "test/test_common/logging.h" #include "test/test_common/resources.h" #include "test/test_common/simulated_time_system.h" +#include "test/test_common/status_utility.h" #include "test/test_common/test_runtime.h" #include "test/test_common/test_time.h" #include "test/test_common/utility.h" @@ -101,7 +102,8 @@ class NewGrpcMuxImplTestBase : public testing::TestWithParam& initial_resource_versions = {}) { + const std::map& initial_resource_versions = {}, + Grpc::MockAsyncStream* async_stream = nullptr) { API_NO_BOOST(envoy::service::discovery::v3::DeltaDiscoveryRequest) expected_request; expected_request.mutable_node()->CopyFrom(local_info_.node()); for (const auto& resource : resource_names_subscribe) { @@ -120,7 +122,8 @@ class NewGrpcMuxImplTestBase : public testing::TestWithParamset_code(error_code); error_detail->set_message(error_message); } - EXPECT_CALL(async_stream_, sendMessageRaw_(Grpc::ProtoBufferEq(expected_request), false)); + EXPECT_CALL(async_stream ? *async_stream : async_stream_, + sendMessageRaw_(Grpc::ProtoBufferEq(expected_request), false)); } void remoteClose() { @@ -176,6 +179,9 @@ class NewGrpcMuxImplTestBase : public testing::TestWithParam random_; Grpc::MockAsyncClient* async_client_; NiceMock async_stream_; + // Used for tests invoking updateMuxSource(). + Grpc::MockAsyncClient* replaced_async_client_; + Grpc::MockAsyncStream replaced_async_stream_; CustomConfigValidatorsPtr config_validators_; NiceMock local_info_; std::unique_ptr grpc_mux_; @@ -795,6 +801,135 @@ TEST_P(NewGrpcMuxImplTest, AddRemoveSubscriptions) { } } +// Updating the mux object while being connected sends the correct requests. +TEST_P(NewGrpcMuxImplTest, MuxDynamicReplacementWhenConnected) { + replaced_async_client_ = new Grpc::MockAsyncClient(); + setup(); + InSequence s; + + auto foo_sub = grpc_mux_->addWatch("type_url_foo", {"x", "y"}, callbacks_, resource_decoder_, {}); + auto bar_sub = grpc_mux_->addWatch("type_url_bar", {}, callbacks_, resource_decoder_, {}); + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage("type_url_foo", {"x", "y"}, {}); + expectSendMessage("type_url_bar", {}, {}); + grpc_mux_->start(); + EXPECT_EQ(1, control_plane_connected_state_.value()); + + // Switch the mux. + envoy::config::core::v3::ApiConfigSource empty_ads_config; + // Expect a disconnect from the original async_client and stream. + EXPECT_CALL(async_stream_, resetStream()); + // Expect establishing connection to the new client and stream. + EXPECT_CALL(*replaced_async_client_, startRaw(_, _, _, _)) + .WillOnce(Return(&replaced_async_stream_)); + // Expect the initial messages to be sent to the new stream. + expectSendMessage("type_url_foo", {"x", "y"}, {}, "", Grpc::Status::WellKnownGrpcStatus::Ok, "", + {}, &replaced_async_stream_); + expectSendMessage("type_url_bar", {}, {}, "", Grpc::Status::WellKnownGrpcStatus::Ok, "", {}, + &replaced_async_stream_); + EXPECT_OK(grpc_mux_->updateMuxSource( + /*primary_async_client=*/std::unique_ptr(replaced_async_client_), + /*failover_async_client=*/nullptr, + /*custom_config_validators=*/nullptr, + /*scope=*/*stats_.rootScope(), + /*backoff_strategy=*/ + std::make_unique( + SubscriptionFactory::RetryInitialDelayMs, SubscriptionFactory::RetryMaxDelayMs, random_), + empty_ads_config)); + // Ending test, removing subscriptions for type_url_foo. + expectSendMessage("type_url_foo", {}, {"x", "y"}, "", Grpc::Status::WellKnownGrpcStatus::Ok, "", + {}, &replaced_async_stream_); +} + +// Updating the mux object after receiving a response, sends the correct requests. +TEST_P(NewGrpcMuxImplTest, MuxDynamicReplacementFetchingResources) { + replaced_async_client_ = new Grpc::MockAsyncClient(); + setup(); + InSequence s; + + const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment; + auto foo_sub = grpc_mux_->addWatch(type_url, {"x", "y"}, callbacks_, resource_decoder_, {}); + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage(type_url, {"x", "y"}, {}); + grpc_mux_->start(); + + // Send back a response for one of the resources. + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_system_version_info("1"); + response->set_nonce("n1"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + load_assignment.set_cluster_name("x"); + auto* resource = response->add_resources(); + resource->set_name("x"); + resource->mutable_resource()->PackFrom(load_assignment); + resource->set_version("x1"); + EXPECT_CALL(callbacks_, onConfigUpdate(_, _, "1")) + .WillOnce(Invoke([&load_assignment](const std::vector& added_resources, + const Protobuf::RepeatedPtrField&, + const std::string&) { + EXPECT_EQ(1, added_resources.size()); + EXPECT_TRUE( + TestUtility::protoEqual(added_resources[0].get().resource(), load_assignment)); + return absl::OkStatus(); + })); + expectSendMessage(type_url, {}, {}, "n1"); + onDiscoveryResponse(std::move(response)); + } + + // Switch the mux. + envoy::config::core::v3::ApiConfigSource empty_ads_config; + // Expect a disconnect from the original async_client and stream. + EXPECT_CALL(async_stream_, resetStream()); + // Expect establishing connection to the new client and stream. + EXPECT_CALL(*replaced_async_client_, startRaw(_, _, _, _)) + .WillOnce(Return(&replaced_async_stream_)); + // Expect the initial message to be sent to the new stream. + // It will include "x" in its initial_resource_versions. + expectSendMessage(type_url, {"x", "y"}, {}, "", Grpc::Status::WellKnownGrpcStatus::Ok, "", + {{"x", "x1"}}, &replaced_async_stream_); + EXPECT_OK(grpc_mux_->updateMuxSource( + /*primary_async_client=*/std::unique_ptr(replaced_async_client_), + /*failover_async_client=*/nullptr, + /*custom_config_validators=*/std::make_unique>(), + /*scope=*/*stats_.rootScope(), + /*backoff_strategy=*/ + std::make_unique( + SubscriptionFactory::RetryInitialDelayMs, SubscriptionFactory::RetryMaxDelayMs, random_), + empty_ads_config)); + + // Send a response to resource "y" on the replaced mux. + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_system_version_info("2"); + response->set_nonce("n2"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + load_assignment.set_cluster_name("y"); + auto* resource = response->add_resources(); + resource->set_name("y"); + resource->mutable_resource()->PackFrom(load_assignment); + resource->set_version("y1"); + EXPECT_CALL(callbacks_, onConfigUpdate(_, _, "2")) + .WillOnce(Invoke([&load_assignment](const std::vector& added_resources, + const Protobuf::RepeatedPtrField&, + const std::string&) { + EXPECT_EQ(1, added_resources.size()); + EXPECT_TRUE( + TestUtility::protoEqual(added_resources[0].get().resource(), load_assignment)); + return absl::OkStatus(); + })); + expectSendMessage(type_url, {}, {}, "n2", Grpc::Status::WellKnownGrpcStatus::Ok, "", {}, + &replaced_async_stream_); + onDiscoveryResponse(std::move(response)); + } + + // Ending test, removing subscriptions for the subscription. + expectSendMessage(type_url, {}, {"x", "y"}, "", Grpc::Status::WellKnownGrpcStatus::Ok, "", {}, + &replaced_async_stream_); +} + TEST(NewGrpcMuxFactoryTest, InvalidRateLimit) { auto* factory = Config::Utility::getFactoryByName( "envoy.config_mux.new_grpc_mux_factory"); diff --git a/test/extensions/config_subscription/grpc/watch_map_test.cc b/test/extensions/config_subscription/grpc/watch_map_test.cc index 4186b3d8b8d7..014a1221b0c4 100644 --- a/test/extensions/config_subscription/grpc/watch_map_test.cc +++ b/test/extensions/config_subscription/grpc/watch_map_test.cc @@ -133,7 +133,7 @@ TEST(WatchMapTest, Basic) { TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); NiceMock config_validators; - WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators, {}); + WatchMap watch_map(false, "ClusterLoadAssignmentType", &config_validators, {}); Watch* watch = watch_map.addWatch(callbacks, resource_decoder); { @@ -207,7 +207,7 @@ TEST(WatchMapTest, Overlap) { TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); NiceMock config_validators; - WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators, {}); + WatchMap watch_map(false, "ClusterLoadAssignmentType", &config_validators, {}); Watch* watch1 = watch_map.addWatch(callbacks1, resource_decoder); Watch* watch2 = watch_map.addWatch(callbacks2, resource_decoder); @@ -276,7 +276,7 @@ TEST(WatchMapTest, CacheResourceAddResource) { NiceMock eds_resources_cache; const std::string eds_type_url = Config::getTypeUrl(); - WatchMap watch_map(false, eds_type_url, config_validators, + WatchMap watch_map(false, eds_type_url, &config_validators, makeOptRef(eds_resources_cache)); // The test uses 2 watchers to ensure that interest is kept regardless of // which watcher was the first to add a watch for the assignment. @@ -357,7 +357,7 @@ TEST(WatchMapTest, CacheResourceAddResource) { // WatchMap defers deletes and doesn't crash. class SameWatchRemoval : public testing::Test { public: - SameWatchRemoval() : watch_map_(false, "ClusterLoadAssignmentType", config_validators, {}) {} + SameWatchRemoval() : watch_map_(false, "ClusterLoadAssignmentType", &config_validators, {}) {} void SetUp() override { envoy::config::endpoint::v3::ClusterLoadAssignment alice; @@ -437,7 +437,7 @@ TEST(WatchMapTest, AddRemoveAdd) { TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); NiceMock config_validators; - WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators, {}); + WatchMap watch_map(false, "ClusterLoadAssignmentType", &config_validators, {}); Watch* watch1 = watch_map.addWatch(callbacks1, resource_decoder); Watch* watch2 = watch_map.addWatch(callbacks2, resource_decoder); @@ -494,7 +494,7 @@ TEST(WatchMapTest, UninterestingUpdate) { TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); NiceMock config_validators; - WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators, {}); + WatchMap watch_map(false, "ClusterLoadAssignmentType", &config_validators, {}); Watch* watch = watch_map.addWatch(callbacks, resource_decoder); watch_map.updateWatchInterest(watch, {"alice"}); @@ -539,7 +539,7 @@ TEST(WatchMapTest, WatchingEverything) { TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); NiceMock config_validators; - WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators, {}); + WatchMap watch_map(false, "ClusterLoadAssignmentType", &config_validators, {}); /*Watch* watch1 = */ watch_map.addWatch(callbacks1, resource_decoder); Watch* watch2 = watch_map.addWatch(callbacks2, resource_decoder); // watch1 never specifies any names, and so is treated as interested in everything. @@ -576,7 +576,7 @@ TEST(WatchMapTest, DeltaOnConfigUpdate) { TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); NiceMock config_validators; - WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators, {}); + WatchMap watch_map(false, "ClusterLoadAssignmentType", &config_validators, {}); Watch* watch1 = watch_map.addWatch(callbacks1, resource_decoder); Watch* watch2 = watch_map.addWatch(callbacks2, resource_decoder); Watch* watch3 = watch_map.addWatch(callbacks3, resource_decoder); @@ -610,7 +610,7 @@ TEST(WatchMapTest, DeltaOnConfigUpdate) { TEST(WatchMapTest, OnConfigUpdateFailed) { NiceMock config_validators; - WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators, {}); + WatchMap watch_map(false, "ClusterLoadAssignmentType", &config_validators, {}); // calling on empty map doesn't break watch_map.onConfigUpdateFailed(ConfigUpdateFailureReason::UpdateRejected, nullptr); @@ -632,7 +632,7 @@ TEST(WatchMapTest, OnConfigUpdateXdsTpGlobCollections) { TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); NiceMock config_validators; - WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators, {}); + WatchMap watch_map(false, "ClusterLoadAssignmentType", &config_validators, {}); Watch* watch = watch_map.addWatch(callbacks, resource_decoder); watch_map.updateWatchInterest(watch, {"xdstp://foo/bar/baz/*?some=thing&thing=some"}); @@ -677,7 +677,7 @@ TEST(WatchMapTest, OnConfigUpdateXdsTpSingletons) { TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); NiceMock config_validators; - WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators, {}); + WatchMap watch_map(false, "ClusterLoadAssignmentType", &config_validators, {}); Watch* watch = watch_map.addWatch(callbacks, resource_decoder); watch_map.updateWatchInterest(watch, {"xdstp://foo/bar/baz?some=thing&thing=some"}); @@ -718,7 +718,7 @@ TEST(WatchMapTest, OnConfigUpdateUsingNamespaces) { TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); NiceMock config_validators; - WatchMap watch_map(true, "ClusterLoadAssignmentType", config_validators, {}); + WatchMap watch_map(true, "ClusterLoadAssignmentType", &config_validators, {}); Watch* watch1 = watch_map.addWatch(callbacks1, resource_decoder); Watch* watch2 = watch_map.addWatch(callbacks2, resource_decoder); Watch* watch3 = watch_map.addWatch(callbacks3, resource_decoder); diff --git a/test/extensions/config_subscription/grpc/xds_grpc_mux_impl_test.cc b/test/extensions/config_subscription/grpc/xds_grpc_mux_impl_test.cc index 123ffd52a56d..e4176be7538e 100644 --- a/test/extensions/config_subscription/grpc/xds_grpc_mux_impl_test.cc +++ b/test/extensions/config_subscription/grpc/xds_grpc_mux_impl_test.cc @@ -26,6 +26,7 @@ #include "test/test_common/logging.h" #include "test/test_common/resources.h" #include "test/test_common/simulated_time_system.h" +#include "test/test_common/status_utility.h" #include "test/test_common/test_runtime.h" #include "test/test_common/test_time.h" #include "test/test_common/utility.h" @@ -94,7 +95,8 @@ class GrpcMuxImplTestBase : public testing::TestWithParam { const std::vector& resource_names, const std::string& version, bool first = false, const std::string& nonce = "", const Protobuf::int32 error_code = Grpc::Status::WellKnownGrpcStatus::Ok, - const std::string& error_message = "") { + const std::string& error_message = "", + Grpc::MockAsyncStream* async_stream = nullptr) { envoy::service::discovery::v3::DiscoveryRequest expected_request; if (first) { expected_request.mutable_node()->CopyFrom(local_info_.node()); @@ -113,7 +115,7 @@ class GrpcMuxImplTestBase : public testing::TestWithParam { error_detail->set_message(error_message); } EXPECT_CALL( - async_stream_, + async_stream ? *async_stream : async_stream_, sendMessageRaw_(Grpc::ProtoBufferEqIgnoreRepeatedFieldOrdering(expected_request), false)); } @@ -134,6 +136,9 @@ class GrpcMuxImplTestBase : public testing::TestWithParam { NiceMock random_; Grpc::MockAsyncClient* async_client_; Grpc::MockAsyncStream async_stream_; + // Used for tests invoking updateMuxSource(). + Grpc::MockAsyncClient* replaced_async_client_; + Grpc::MockAsyncStream replaced_async_stream_; NiceMock local_info_; CustomConfigValidatorsPtr config_validators_; std::unique_ptr grpc_mux_; @@ -1279,6 +1284,128 @@ TEST_P(GrpcMuxImplTest, AddRemoveSubscriptions) { } } +// Updating the mux object while being connected sends the correct requests. +TEST_P(GrpcMuxImplTest, MuxDynamicReplacementWhenConnected) { + replaced_async_client_ = new Grpc::MockAsyncClient(); + setup(); + InSequence s; + + auto foo_sub = makeWatch("type_url_foo", {"x", "y"}); + auto bar_sub = makeWatch("type_url_bar", {}); + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage("type_url_foo", {"x", "y"}, "", true); + expectSendMessage("type_url_bar", {}, ""); + grpc_mux_->start(); + EXPECT_EQ(1, control_plane_connected_state_.value()); + + // Switch the mux. + envoy::config::core::v3::ApiConfigSource empty_ads_config; + // Expect a disconnect from the original async_client and stream. + EXPECT_CALL(async_stream_, resetStream()); + // Expect establishing connection to the new client and stream. + EXPECT_CALL(*replaced_async_client_, startRaw(_, _, _, _)) + .WillOnce(Return(&replaced_async_stream_)); + // Expect the initial messages to be sent to the new stream. + expectSendMessage("type_url_foo", {"x", "y"}, "", true, "", Grpc::Status::WellKnownGrpcStatus::Ok, + "", &replaced_async_stream_); + expectSendMessage("type_url_bar", {}, "", false, "", Grpc::Status::WellKnownGrpcStatus::Ok, "", + &replaced_async_stream_); + EXPECT_OK(grpc_mux_->updateMuxSource( + /*primary_async_client=*/std::unique_ptr(replaced_async_client_), + /*failover_async_client=*/nullptr, + /*custom_config_validators=*/nullptr, + /*scope=*/*stats_.rootScope(), + /*backoff_strategy=*/ + std::make_unique( + SubscriptionFactory::RetryInitialDelayMs, SubscriptionFactory::RetryMaxDelayMs, random_), + empty_ads_config)); + // Ending test, removing subscriptions for type_url_foo. + expectSendMessage("type_url_foo", {}, "", false, "", Grpc::Status::WellKnownGrpcStatus::Ok, "", + &replaced_async_stream_); +} + +// Updating the mux object after receiving a response, sends the correct requests. +TEST_P(GrpcMuxImplTest, MuxDynamicReplacementFetchingResources) { + replaced_async_client_ = new Grpc::MockAsyncClient(); + setup(); + InSequence s; + + const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment; + auto foo_sub = makeWatch(type_url, {"x", "y"}); + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage(type_url, {"x", "y"}, "", true); + grpc_mux_->start(); + + // Send back a response for one of the resources. + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_version_info("1"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + load_assignment.set_cluster_name("x"); + response->add_resources()->PackFrom(load_assignment); + EXPECT_CALL(callbacks_, onConfigUpdate(_, "1")) + .WillOnce(Invoke([&load_assignment](const std::vector& resources, + const std::string&) { + EXPECT_EQ(1, resources.size()); + const auto& expected_assignment = + dynamic_cast( + resources[0].get().resource()); + EXPECT_TRUE(TestUtility::protoEqual(expected_assignment, load_assignment)); + return absl::OkStatus(); + })); + expectSendMessage(type_url, {"x", "y"}, "1"); + grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_); + } + + // Switch the mux. + envoy::config::core::v3::ApiConfigSource empty_ads_config; + // Expect a disconnect from the original async_client and stream. + EXPECT_CALL(async_stream_, resetStream()); + // Expect establishing connection to the new client and stream. + EXPECT_CALL(*replaced_async_client_, startRaw(_, _, _, _)) + .WillOnce(Return(&replaced_async_stream_)); + // Expect the initial message to be sent to the new stream. + expectSendMessage(type_url, {"x", "y"}, "1", true, "", Grpc::Status::WellKnownGrpcStatus::Ok, "", + &replaced_async_stream_); + EXPECT_OK(grpc_mux_->updateMuxSource( + /*primary_async_client=*/std::unique_ptr(replaced_async_client_), + /*failover_async_client=*/nullptr, + /*custom_config_validators=*/std::make_unique>(), + /*scope=*/*stats_.rootScope(), + /*backoff_strategy=*/ + std::make_unique( + SubscriptionFactory::RetryInitialDelayMs, SubscriptionFactory::RetryMaxDelayMs, random_), + empty_ads_config)); + + // Send a response to resource "y" on the replaced mux. + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_version_info("2"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + load_assignment.set_cluster_name("y"); + response->add_resources()->PackFrom(load_assignment); + EXPECT_CALL(callbacks_, onConfigUpdate(_, "2")) + .WillOnce(Invoke([&load_assignment](const std::vector& resources, + const std::string&) { + EXPECT_EQ(1, resources.size()); + const auto& expected_assignment = + dynamic_cast( + resources[0].get().resource()); + EXPECT_TRUE(TestUtility::protoEqual(expected_assignment, load_assignment)); + return absl::OkStatus(); + })); + expectSendMessage(type_url, {"x", "y"}, "2", false, "", Grpc::Status::WellKnownGrpcStatus::Ok, + "", &replaced_async_stream_); + grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_); + } + + // Ending test, removing subscriptions for the subscription. + expectSendMessage(type_url, {}, "2", false, "", Grpc::Status::WellKnownGrpcStatus::Ok, "", + &replaced_async_stream_); +} + class NullGrpcMuxImplTest : public testing::Test { public: NullGrpcMuxImplTest() : null_mux_(std::make_unique()) {}