Skip to content

Commit

Permalink
ads-replacement: adding gRPC-mux support for ADS config replacement
Browse files Browse the repository at this point in the history
Signed-off-by: Adi Suissa-Peleg <adip@google.com>
  • Loading branch information
adisuissa committed Sep 16, 2024
1 parent b10deb6 commit 9bc0b0d
Show file tree
Hide file tree
Showing 17 changed files with 742 additions and 116 deletions.
13 changes: 13 additions & 0 deletions envoy/config/grpc_mux.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@

#include <memory>

#include "envoy/common/backoff_strategy.h"
#include "envoy/common/exception.h"
#include "envoy/common/pure.h"
#include "envoy/config/custom_config_validators.h"
#include "envoy/config/eds_resources_cache.h"
#include "envoy/config/subscription.h"
#include "envoy/grpc/async_client.h"
#include "envoy/stats/stats_macros.h"

#include "source/common/common/cleanup.h"
Expand Down Expand Up @@ -112,6 +115,16 @@ class GrpcMux {
* @return EdsResourcesCacheOptRef optional eds resources cache for the gRPC-mux.
*/
virtual EdsResourcesCacheOptRef edsResourcesCache() PURE;

/**
* Updates the current gRPC-Mux object to use a new gRPC client, and config.
*/
virtual absl::Status
updateMuxSource(Grpc::RawAsyncClientPtr primary_async_client,
Grpc::RawAsyncClientPtr failover_async_client,
CustomConfigValidatorsPtr custom_config_validators, Stats::Scope& scope,
BackOffStrategyPtr backoff_strategy,
const envoy::config::core::v3::ApiConfigSource& ads_config_source) PURE;
};

using GrpcMuxPtr = std::unique_ptr<GrpcMux>;
Expand Down
6 changes: 6 additions & 0 deletions source/common/config/null_grpc_mux_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ class NullGrpcMuxImpl : public GrpcMux,
ENVOY_BUG(false, "unexpected request for on demand update");
}

absl::Status updateMuxSource(Grpc::RawAsyncClientPtr, Grpc::RawAsyncClientPtr,
CustomConfigValidatorsPtr, Stats::Scope&, BackOffStrategyPtr,
const envoy::config::core::v3::ApiConfigSource&) override {
return absl::UnimplementedError("");
}

EdsResourcesCacheOptRef edsResourcesCache() override { return absl::nullopt; }

void onWriteable() override {}
Expand Down
14 changes: 13 additions & 1 deletion source/extensions/config_subscription/grpc/grpc_mux_failover.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
namespace Envoy {
namespace Config {

class GrpcMuxFailoverTest;

/**
* This class arbitrates between two config providers of the same GrpcMux -
* the primary and the failover. Envoy always prefers fetching config from the
Expand Down Expand Up @@ -188,6 +190,8 @@ class GrpcMuxFailover : public GrpcStreamInterface<RequestType, ResponseType>,
}

private:
friend class GrpcMuxFailoverTest;

// A helper class that proxies the callbacks of GrpcStreamCallbacks for the primary service.
class PrimaryGrpcStreamCallbacks : public GrpcStreamCallbacks<ResponseType> {
public:
Expand Down Expand Up @@ -356,7 +360,15 @@ class GrpcMuxFailover : public GrpcStreamInterface<RequestType, ResponseType>,
void onRemoteClose(Grpc::Status::GrpcStatus, const std::string&) override {
PANIC("not implemented");
}
void closeStream() override { PANIC("not implemented"); }
void closeStream() override {
if (connectingToOrConnectedToPrimary()) {
ENVOY_LOG_MISC(debug, "Intentionally closing the primary gRPC stream");
primary_grpc_stream_->closeStream();
} else if (connectingToOrConnectedToFailover()) {
ENVOY_LOG_MISC(debug, "Intentionally closing the failover gRPC stream");
failover_grpc_stream_->closeStream();
}
}

// The stream callbacks that will be invoked on the GrpcMux object, to notify
// about the state of the underlying primary/failover stream.
Expand Down
77 changes: 59 additions & 18 deletions source/extensions/config_subscription/grpc/grpc_mux_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,18 @@ std::string convertToWildcard(const std::string& resource_name) {
} // namespace

GrpcMuxImpl::GrpcMuxImpl(GrpcMuxContext& grpc_mux_context, bool skip_subsequent_node)
: grpc_stream_(createGrpcStreamObject(grpc_mux_context)),
: dispatcher_(grpc_mux_context.dispatcher_),
grpc_stream_(createGrpcStreamObject(std::move(grpc_mux_context.async_client_),
std::move(grpc_mux_context.failover_async_client_),
grpc_mux_context.service_method_, grpc_mux_context.scope_,
std::move(grpc_mux_context.backoff_strategy_),
grpc_mux_context.rate_limit_settings_)),
local_info_(grpc_mux_context.local_info_), skip_subsequent_node_(skip_subsequent_node),
config_validators_(std::move(grpc_mux_context.config_validators_)),
xds_config_tracker_(grpc_mux_context.xds_config_tracker_),
xds_resources_delegate_(grpc_mux_context.xds_resources_delegate_),
eds_resources_cache_(std::move(grpc_mux_context.eds_resources_cache_)),
target_xds_authority_(grpc_mux_context.target_xds_authority_),
dispatcher_(grpc_mux_context.dispatcher_),
dynamic_update_callback_handle_(
grpc_mux_context.local_info_.contextProvider().addDynamicContextUpdateCallback(
[this](absl::string_view resource_type_url) {
Expand All @@ -80,59 +84,61 @@ GrpcMuxImpl::GrpcMuxImpl(GrpcMuxContext& grpc_mux_context, bool skip_subsequent_

std::unique_ptr<GrpcStreamInterface<envoy::service::discovery::v3::DiscoveryRequest,
envoy::service::discovery::v3::DiscoveryResponse>>
GrpcMuxImpl::createGrpcStreamObject(GrpcMuxContext& grpc_mux_context) {
GrpcMuxImpl::createGrpcStreamObject(Grpc::RawAsyncClientPtr async_client,
Grpc::RawAsyncClientPtr failover_async_client,
const Protobuf::MethodDescriptor& service_method,
Stats::Scope& scope, BackOffStrategyPtr backoff_strategy,
const RateLimitSettings& rate_limit_settings) {
if (Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support")) {
return std::make_unique<GrpcMuxFailover<envoy::service::discovery::v3::DiscoveryRequest,
envoy::service::discovery::v3::DiscoveryResponse>>(
/*primary_stream_creator=*/
[&grpc_mux_context](
[&async_client, &service_method, &dispatcher = dispatcher_, &scope, &backoff_strategy,
&rate_limit_settings](
GrpcStreamCallbacks<envoy::service::discovery::v3::DiscoveryResponse>* callbacks)
-> GrpcStreamInterfacePtr<envoy::service::discovery::v3::DiscoveryRequest,
envoy::service::discovery::v3::DiscoveryResponse> {
return std::make_unique<GrpcStream<envoy::service::discovery::v3::DiscoveryRequest,
envoy::service::discovery::v3::DiscoveryResponse>>(
callbacks, std::move(grpc_mux_context.async_client_),
grpc_mux_context.service_method_, grpc_mux_context.dispatcher_,
grpc_mux_context.scope_, std::move(grpc_mux_context.backoff_strategy_),
grpc_mux_context.rate_limit_settings_,
callbacks, std::move(async_client), service_method, dispatcher, scope,
std::move(backoff_strategy), rate_limit_settings,
GrpcStream<envoy::service::discovery::v3::DiscoveryRequest,
envoy::service::discovery::v3::DiscoveryResponse>::ConnectedStateValue::
FIRST_ENTRY);
},
/*failover_stream_creator=*/
grpc_mux_context.failover_async_client_
failover_async_client
? absl::make_optional(
[&grpc_mux_context](
[&failover_async_client, &service_method, &dispatcher = dispatcher_, &scope,
&rate_limit_settings](
GrpcStreamCallbacks<envoy::service::discovery::v3::DiscoveryResponse>*
callbacks)
-> GrpcStreamInterfacePtr<envoy::service::discovery::v3::DiscoveryRequest,
envoy::service::discovery::v3::DiscoveryResponse> {
return std::make_unique<
GrpcStream<envoy::service::discovery::v3::DiscoveryRequest,
envoy::service::discovery::v3::DiscoveryResponse>>(
callbacks, std::move(grpc_mux_context.failover_async_client_),
grpc_mux_context.service_method_, grpc_mux_context.dispatcher_,
grpc_mux_context.scope_,
callbacks, std::move(failover_async_client), service_method, dispatcher,
scope,
// TODO(adisuissa): the backoff strategy for the failover should
// be the same as the primary source.
std::make_unique<FixedBackOffStrategy>(
GrpcMuxFailover<envoy::service::discovery::v3::DiscoveryRequest,
envoy::service::discovery::v3::DiscoveryResponse>::
DefaultFailoverBackoffMilliseconds),
grpc_mux_context.rate_limit_settings_,
rate_limit_settings,
GrpcStream<envoy::service::discovery::v3::DiscoveryRequest,
envoy::service::discovery::v3::DiscoveryResponse>::
ConnectedStateValue::SECOND_ENTRY);
})
: absl::nullopt,
/*grpc_mux_callbacks=*/*this,
/*dispatch=*/grpc_mux_context.dispatcher_);
/*dispatch=*/dispatcher_);
}
return std::make_unique<GrpcStream<envoy::service::discovery::v3::DiscoveryRequest,
envoy::service::discovery::v3::DiscoveryResponse>>(
this, std::move(grpc_mux_context.async_client_), grpc_mux_context.service_method_,
grpc_mux_context.dispatcher_, grpc_mux_context.scope_,
std::move(grpc_mux_context.backoff_strategy_), grpc_mux_context.rate_limit_settings_,
this, std::move(async_client), service_method, dispatcher_, scope,
std::move(backoff_strategy), rate_limit_settings,
GrpcStream<
envoy::service::discovery::v3::DiscoveryRequest,
envoy::service::discovery::v3::DiscoveryResponse>::ConnectedStateValue::FIRST_ENTRY);
Expand Down Expand Up @@ -292,6 +298,41 @@ GrpcMuxWatchPtr GrpcMuxImpl::addWatch(const std::string& type_url,
return watch;
}

// void GrpcMuxImpl::updateMuxSource(Grpc::RawAsyncClientPtr primary_async_client,
// Grpc::RawAsyncClientPtr failover_async_client, CustomConfigValidatorsPtr
// custom_config_validators, BackOffStrategyPtr backoff_strategy, const
// envoy::config::core::v3::ApiConfigSource& ads_config_source) {
absl::Status
GrpcMuxImpl::updateMuxSource(Grpc::RawAsyncClientPtr primary_async_client,
Grpc::RawAsyncClientPtr failover_async_client,
CustomConfigValidatorsPtr custom_config_validators,
Stats::Scope& scope, BackOffStrategyPtr backoff_strategy,
const envoy::config::core::v3::ApiConfigSource& ads_config_source) {
// Process the rate limit settings.
absl::StatusOr<RateLimitSettings> rate_limit_settings_or_error =
Utility::parseRateLimitSettings(ads_config_source);
RETURN_IF_NOT_OK_REF(rate_limit_settings_or_error.status());

const Protobuf::MethodDescriptor& service_method =
*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
"envoy.service.discovery.v3.AggregatedDiscoveryService.StreamAggregatedResources");

// Disconnect from current xDS servers.
ENVOY_LOG_MISC(info, "Replacing xDS gRPC mux source");
grpc_stream_->closeStream();
grpc_stream_ = createGrpcStreamObject(std::move(primary_async_client),
std::move(failover_async_client), service_method, scope,
std::move(backoff_strategy), *rate_limit_settings_or_error);

// Update the config validators.
config_validators_ = std::move(custom_config_validators);

// Start the susbcriptions over the grpc_stream.
grpc_stream_->establishNewStream();

return absl::OkStatus();
}

ScopedResume GrpcMuxImpl::pause(const std::string& type_url) {
return pause(std::vector<std::string>{type_url});
}
Expand Down
17 changes: 13 additions & 4 deletions source/extensions/config_subscription/grpc/grpc_mux_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ class GrpcMuxImpl : public GrpcMux,
return makeOptRefFromPtr(eds_resources_cache_.get());
}

absl::Status
updateMuxSource(Grpc::RawAsyncClientPtr primary_async_client,
Grpc::RawAsyncClientPtr failover_async_client,
CustomConfigValidatorsPtr custom_config_validators, Stats::Scope& scope,
BackOffStrategyPtr backoff_strategy,
const envoy::config::core::v3::ApiConfigSource& ads_config_source) override;

void handleDiscoveryResponse(
std::unique_ptr<envoy::service::discovery::v3::DiscoveryResponse>&& message);

Expand Down Expand Up @@ -100,11 +107,13 @@ class GrpcMuxImpl : public GrpcMux,

private:
// Helper function to create the grpc_stream_ object.
// TODO(adisuissa): this should be removed when envoy.restart_features.xds_failover_support
// is deprecated.
std::unique_ptr<GrpcStreamInterface<envoy::service::discovery::v3::DiscoveryRequest,
envoy::service::discovery::v3::DiscoveryResponse>>
createGrpcStreamObject(GrpcMuxContext& grpc_mux_context);
createGrpcStreamObject(Grpc::RawAsyncClientPtr async_client,
Grpc::RawAsyncClientPtr failover_async_client,
const Protobuf::MethodDescriptor& service_method, Stats::Scope& scope,
BackOffStrategyPtr backoff_strategy,
const RateLimitSettings& rate_limit_settings);

void drainRequests();
void setRetryTimer();
Expand Down Expand Up @@ -272,6 +281,7 @@ class GrpcMuxImpl : public GrpcMux,
ApiState& api_state, const std::string& type_url,
const std::string& version_info, bool call_delegate);

Event::Dispatcher& dispatcher_;
// Multiplexes the stream to the primary and failover sources.
// TODO(adisuissa): Once envoy.restart_features.xds_failover_support is deprecated,
// convert from unique_ptr<GrpcStreamInterface> to GrpcMuxFailover directly.
Expand Down Expand Up @@ -301,7 +311,6 @@ class GrpcMuxImpl : public GrpcMux,
// URL.
std::unique_ptr<std::queue<std::string>> request_queue_;

Event::Dispatcher& dispatcher_;
Common::CallbackHandlePtr dynamic_update_callback_handle_;

bool started_{false};
Expand Down
Loading

0 comments on commit 9bc0b0d

Please sign in to comment.