Skip to content

Commit

Permalink
ads-replacement: adding hook and cluster-manager support
Browse files Browse the repository at this point in the history
Signed-off-by: Adi Suissa-Peleg <adip@google.com>
  • Loading branch information
adisuissa committed Oct 22, 2024
1 parent 66febdf commit 4ccd619
Show file tree
Hide file tree
Showing 40 changed files with 1,289 additions and 47 deletions.
8 changes: 8 additions & 0 deletions envoy/config/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -155,3 +155,11 @@ envoy_cc_library(
"@envoy_api//envoy/config/endpoint/v3:pkg_cc_proto",
],
)

envoy_cc_library(
name = "xds_manager_interface",
hdrs = ["xds_manager.h"],
deps = [
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
],
)
3 changes: 1 addition & 2 deletions envoy/config/grpc_mux.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,7 @@ class GrpcMux {
*/
virtual absl::Status
updateMuxSource(Grpc::RawAsyncClientPtr&& primary_async_client,
Grpc::RawAsyncClientPtr&& failover_async_client,
CustomConfigValidatorsPtr&& custom_config_validators, Stats::Scope& scope,
Grpc::RawAsyncClientPtr&& failover_async_client, Stats::Scope& scope,
BackOffStrategyPtr&& backoff_strategy,
const envoy::config::core::v3::ApiConfigSource& ads_config_source) PURE;
};
Expand Down
38 changes: 38 additions & 0 deletions envoy/config/xds_manager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#pragma once

#include "envoy/common/pure.h"
#include "envoy/config/core/v3/config_source.pb.h"

#include "absl/status/status.h"

namespace Envoy {
namespace Config {

/**
* An xDS-Manager interface for all operations related to xDS connections and
* resources in Envoy.
*
* This class is WIP. Currently supported functionality:
* - Dynamically set the ADS configuration to be used.
*
* In general, this is intended to be used only on the main thread, as part of the Server instance
* interface and config subsystem.
*/
class XdsManager {
public:
virtual ~XdsManager() = default;

/**
* Set the ADS ConfigSource Envoy should use that will replace the current ADS
* server.
* @param ads_config the ADS config of the new server.
* @return true if the ADS config is valid (points to a valid static server),
* or false otherwise.
*/
virtual absl::Status
setAdsConfigSource(const envoy::config::core::v3::ApiConfigSource& config_source) PURE;
};

using XdsManagerPtr = std::unique_ptr<XdsManager>;
} // namespace Config
} // namespace Envoy
1 change: 1 addition & 0 deletions envoy/server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ envoy_cc_library(
"//envoy/access_log:access_log_interface",
"//envoy/api:api_interface",
"//envoy/common:mutex_tracer",
"//envoy/config:xds_manager_interface",
"//envoy/event:timer_interface",
"//envoy/http:context_interface",
"//envoy/http:query_params_interface",
Expand Down
6 changes: 6 additions & 0 deletions envoy/server/instance.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "envoy/common/mutex_tracer.h"
#include "envoy/common/random_generator.h"
#include "envoy/config/trace/v3/http_tracer.pb.h"
#include "envoy/config/xds_manager.h"
#include "envoy/event/timer.h"
#include "envoy/grpc/context.h"
#include "envoy/http/context.h"
Expand Down Expand Up @@ -309,6 +310,11 @@ class Instance {
*/
virtual void
setSinkPredicates(std::unique_ptr<Envoy::Stats::SinkPredicates>&& sink_predicates) PURE;

/**
* @return Envoy's xDS manager.
*/
virtual Config::XdsManager& xdsManager() PURE;
};

} // namespace Server
Expand Down
8 changes: 8 additions & 0 deletions envoy/upstream/cluster_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,14 @@ class ClusterManager {
*/
virtual Config::GrpcMuxSharedPtr adsMux() PURE;

/**
* Replaces the current ADS mux with a new one based on the given config.
* Assumes that the given ads_config is yntactically valid (according to the PGV constraints).
* @param ads_config an ADS config source to use.
* @return the status of the operation.
*/
virtual absl::Status replaceAds(const envoy::config::core::v3::ApiConfigSource& ads_config) PURE;

/**
* @return Grpc::AsyncClientManager& the cluster manager's gRPC client manager.
*/
Expand Down
11 changes: 11 additions & 0 deletions source/common/common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -615,3 +615,14 @@ envoy_cc_library(
"@com_google_absl//absl/container:node_hash_set",
],
)

envoy_cc_library(
name = "xds_manager_lib",
srcs = ["xds_manager_impl.cc"],
hdrs = ["xds_manager_impl.h"],
deps = [
"//envoy/config:xds_manager_interface",
"//envoy/upstream:cluster_manager_interface",
"//source/common/common:thread_lib",
],
)
12 changes: 12 additions & 0 deletions source/common/config/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -299,3 +299,15 @@ envoy_cc_library(
"//source/common/singleton:const_singleton",
],
)

envoy_cc_library(
name = "xds_manager_lib",
srcs = ["xds_manager_impl.cc"],
hdrs = ["xds_manager_impl.h"],
deps = [
"//envoy/config:xds_manager_interface",
"//envoy/upstream:cluster_manager_interface",
"//source/common/common:thread_lib",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
],
)
4 changes: 2 additions & 2 deletions source/common/config/null_grpc_mux_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ class NullGrpcMuxImpl : public GrpcMux,
ENVOY_BUG(false, "unexpected request for on demand update");
}

absl::Status updateMuxSource(Grpc::RawAsyncClientPtr&&, Grpc::RawAsyncClientPtr&&,
CustomConfigValidatorsPtr&&, Stats::Scope&, BackOffStrategyPtr&&,
absl::Status updateMuxSource(Grpc::RawAsyncClientPtr&&, Grpc::RawAsyncClientPtr&&, Stats::Scope&,
BackOffStrategyPtr&&,
const envoy::config::core::v3::ApiConfigSource&) override {
return absl::UnimplementedError("");
}
Expand Down
30 changes: 30 additions & 0 deletions source/common/config/xds_manager_impl.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#include "source/common/config/xds_manager_impl.h"

#include "envoy/config/core/v3/config_source.pb.validate.h"

#include "source/common/common/thread.h"

namespace Envoy {
namespace Config {

absl::Status
XdsManagerImpl::setAdsConfigSource(const envoy::config::core::v3::ApiConfigSource& config_source) {
ASSERT_IS_MAIN_OR_TEST_THREAD();
RETURN_IF_NOT_OK(validateAdsConfig(config_source));

return cm_.replaceAds(config_source);
}

absl::Status
XdsManagerImpl::validateAdsConfig(const envoy::config::core::v3::ApiConfigSource& config_source) {
auto& validation_visitor = validation_context_.staticValidationVisitor();
TRY_ASSERT_MAIN_THREAD { MessageUtil::validate(config_source, validation_visitor); }
END_TRY
catch (const EnvoyException& e) {
return absl::InternalError(e.what());
}
return absl::OkStatus();
}

} // namespace Config
} // namespace Envoy
30 changes: 30 additions & 0 deletions source/common/config/xds_manager_impl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#pragma once

#include "envoy/config/xds_manager.h"
#include "envoy/upstream/cluster_manager.h"

#include "source/common/common/thread.h"

namespace Envoy {
namespace Config {

class XdsManagerImpl : public XdsManager {
public:
XdsManagerImpl(Upstream::ClusterManager& cm,
ProtobufMessage::ValidationContext& validation_context)
: cm_(cm), validation_context_(validation_context) {}

// Config::ConfigSourceProvider
absl::Status
setAdsConfigSource(const envoy::config::core::v3::ApiConfigSource& config_source) override;

private:
// Validates (syntactically) the config_source by doing the PGV validation.
absl::Status validateAdsConfig(const envoy::config::core::v3::ApiConfigSource& config_source);

Upstream::ClusterManager& cm_;
ProtobufMessage::ValidationContext& validation_context_;
};

} // namespace Config
} // namespace Envoy
81 changes: 81 additions & 0 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,7 @@ ClusterManagerImpl::initialize(const envoy::config::bootstrap::v3::Bootstrap& bo

const bool use_eds_cache =
Runtime::runtimeFeatureEnabled("envoy.restart_features.use_eds_cache_for_ads");

if (dyn_resources.ads_config().api_type() ==
envoy::config::core::v3::ApiConfigSource::DELTA_GRPC) {
absl::Status status = Config::Utility::checkTransportVersion(dyn_resources.ads_config());
Expand Down Expand Up @@ -582,6 +583,86 @@ ClusterManagerImpl::initialize(const envoy::config::bootstrap::v3::Bootstrap& bo
return absl::OkStatus();
}

absl::Status
ClusterManagerImpl::replaceAds(const envoy::config::core::v3::ApiConfigSource& ads_config) {
// If there was no ADS defined, reject replacement.
const auto& bootstrap = server_.bootstrap();
if (!bootstrap.has_dynamic_resources() || !bootstrap.dynamic_resources().has_ads_config()) {
return absl::InternalError(
"Cannot replace an ADS config when one wasn't previously configured in the bootstrap");
}
const auto& bootstrap_ads_config = server_.bootstrap().dynamic_resources().ads_config();

// There is no support for switching between different ADS types.
if (ads_config.api_type() != bootstrap_ads_config.api_type()) {
return absl::InternalError(fmt::format(
"Cannot replace an ADS config with a different api_type (expected: {})",
envoy::config::core::v3::ApiConfigSource::ApiType_Name(bootstrap_ads_config.api_type())));
}

// There is no support for using a different config validator. Note that if
// this is mainly because the validator could be stateful and if the delta-xDS
// protocol is used, then the new validator will not have the context of the
// previous one.
if (bootstrap_ads_config.config_validators_size() != ads_config.config_validators_size()) {
return absl::InternalError(fmt::format(
"Cannot replace config_validators in ADS config (different size) - Previous: {}, New: {}",
bootstrap_ads_config.config_validators_size(), ads_config.config_validators_size()));
} else if (bootstrap_ads_config.config_validators_size() > 0) {
const bool equal_config_validators = std::equal(
bootstrap_ads_config.config_validators().begin(),
bootstrap_ads_config.config_validators().end(), ads_config.config_validators().begin(),
[](const envoy::config::core::v3::TypedExtensionConfig& a,
const envoy::config::core::v3::TypedExtensionConfig& b) {
return Protobuf::util::MessageDifferencer::Equivalent(a, b);
});
if (!equal_config_validators) {
return absl::InternalError(fmt::format("Cannot replace config_validators in ADS config "
"(different contents)\nPrevious: {}\nNew: {}",
bootstrap_ads_config.DebugString(),
ads_config.DebugString()));
}
}

ENVOY_LOG_MISC(trace, "Replacing ADS config with:\n{}", ads_config.DebugString());
auto strategy_or_error = Config::Utility::prepareJitteredExponentialBackOffStrategy(
ads_config, random_, Envoy::Config::SubscriptionFactory::RetryInitialDelayMs,
Envoy::Config::SubscriptionFactory::RetryMaxDelayMs);
RETURN_IF_NOT_OK_REF(strategy_or_error.status());
JitteredExponentialBackOffStrategyPtr backoff_strategy = std::move(strategy_or_error.value());

absl::Status status = Config::Utility::checkTransportVersion(ads_config);
RETURN_IF_NOT_OK(status);

auto factory_primary_or_error = Config::Utility::factoryForGrpcApiConfigSource(
*async_client_manager_, ads_config, *stats_.rootScope(), false, 0);
RETURN_IF_NOT_OK_REF(factory_primary_or_error.status());
Grpc::AsyncClientFactoryPtr factory_failover = nullptr;
if (Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support")) {
auto factory_failover_or_error = Config::Utility::factoryForGrpcApiConfigSource(
*async_client_manager_, ads_config, *stats_.rootScope(), false, 1);
RETURN_IF_NOT_OK_REF(factory_failover_or_error.status());
factory_failover = std::move(factory_failover_or_error.value());
}
Grpc::RawAsyncClientPtr primary_client =
factory_primary_or_error.value()->createUncachedRawAsyncClient();
Grpc::RawAsyncClientPtr failover_client =
factory_failover ? factory_failover->createUncachedRawAsyncClient() : nullptr;

// Primary client must not be null, as the primary xDS source must be a valid one.
// The failover_client may be null (no failover defined).
if (primary_client == nullptr) {
return absl::InternalError(
fmt::format("Could not create a valid primary gRPC service from the following config {}",
ads_config.DebugString()));
}

// This will cause a disconnect from the current sources, and replacement of the clients.
status = ads_mux_->updateMuxSource(std::move(primary_client), std::move(failover_client),
*stats_.rootScope(), std::move(backoff_strategy), ads_config);
return status;
}

absl::Status ClusterManagerImpl::initializeSecondaryClusters(
const envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
init_helper_.startInitializingSecondaryClusters();
Expand Down
1 change: 1 addition & 0 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ class ClusterManagerImpl : public ClusterManager,
}

Config::GrpcMuxSharedPtr adsMux() override { return ads_mux_; }
absl::Status replaceAds(const envoy::config::core::v3::ApiConfigSource& ads_config) override;
Grpc::AsyncClientManager& grpcAsyncClientManager() override { return *async_client_manager_; }

const absl::optional<std::string>& localClusterName() const override {
Expand Down
10 changes: 4 additions & 6 deletions source/extensions/config_subscription/grpc/grpc_mux_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -300,9 +300,8 @@ GrpcMuxWatchPtr GrpcMuxImpl::addWatch(const std::string& type_url,

absl::Status
GrpcMuxImpl::updateMuxSource(Grpc::RawAsyncClientPtr&& primary_async_client,
Grpc::RawAsyncClientPtr&& failover_async_client,
CustomConfigValidatorsPtr&& custom_config_validators,
Stats::Scope& scope, BackOffStrategyPtr&& backoff_strategy,
Grpc::RawAsyncClientPtr&& failover_async_client, Stats::Scope& scope,
BackOffStrategyPtr&& backoff_strategy,
const envoy::config::core::v3::ApiConfigSource& ads_config_source) {
// Process the rate limit settings.
absl::StatusOr<RateLimitSettings> rate_limit_settings_or_error =
Expand All @@ -319,9 +318,8 @@ GrpcMuxImpl::updateMuxSource(Grpc::RawAsyncClientPtr&& primary_async_client,
grpc_stream_ = createGrpcStreamObject(std::move(primary_async_client),
std::move(failover_async_client), service_method, scope,
std::move(backoff_strategy), *rate_limit_settings_or_error);

// Update the config validators.
config_validators_ = std::move(custom_config_validators);
// No need to update the config_validators_ as they may contain some state
// that needs to be kept across different GrpcMux objects.

// Start the subscriptions over the grpc_stream.
grpc_stream_->establishNewStream();
Expand Down
3 changes: 1 addition & 2 deletions source/extensions/config_subscription/grpc/grpc_mux_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,7 @@ class GrpcMuxImpl : public GrpcMux,

absl::Status
updateMuxSource(Grpc::RawAsyncClientPtr&& primary_async_client,
Grpc::RawAsyncClientPtr&& failover_async_client,
CustomConfigValidatorsPtr&& custom_config_validators, Stats::Scope& scope,
Grpc::RawAsyncClientPtr&& failover_async_client, Stats::Scope& scope,
BackOffStrategyPtr&& backoff_strategy,
const envoy::config::core::v3::ApiConfigSource& ads_config_source) override;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,6 @@ GrpcMuxWatchPtr NewGrpcMuxImpl::addWatch(const std::string& type_url,
absl::Status
NewGrpcMuxImpl::updateMuxSource(Grpc::RawAsyncClientPtr&& primary_async_client,
Grpc::RawAsyncClientPtr&& failover_async_client,
CustomConfigValidatorsPtr&& custom_config_validators,
Stats::Scope& scope, BackOffStrategyPtr&& backoff_strategy,
const envoy::config::core::v3::ApiConfigSource& ads_config_source) {
// Process the rate limit settings.
Expand All @@ -273,9 +272,9 @@ NewGrpcMuxImpl::updateMuxSource(Grpc::RawAsyncClientPtr&& primary_async_client,
grpc_stream_ = createGrpcStreamObject(std::move(primary_async_client),
std::move(failover_async_client), service_method, scope,
std::move(backoff_strategy), *rate_limit_settings_or_error);
// No need to update the config_validators_ as they may contain some state
// that needs to be kept across different GrpcMux objects.

// Update the config validators.
config_validators_ = std::move(custom_config_validators);
// Update the watch map's config validators.
for (auto& [type_url, subscription] : subscriptions_) {
subscription->watch_map_.setConfigValidators(config_validators_.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ class NewGrpcMuxImpl

absl::Status
updateMuxSource(Grpc::RawAsyncClientPtr&& primary_async_client,
Grpc::RawAsyncClientPtr&& failover_async_client,
CustomConfigValidatorsPtr&& custom_config_validators, Stats::Scope& scope,
Grpc::RawAsyncClientPtr&& failover_async_client, Stats::Scope& scope,
BackOffStrategyPtr&& backoff_strategy,
const envoy::config::core::v3::ApiConfigSource& ads_config_source) override;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,7 @@ ScopedResume GrpcMuxImpl<S, F, RQ, RS>::pause(const std::vector<std::string> typ
template <class S, class F, class RQ, class RS>
absl::Status GrpcMuxImpl<S, F, RQ, RS>::updateMuxSource(
Grpc::RawAsyncClientPtr&& primary_async_client, Grpc::RawAsyncClientPtr&& failover_async_client,
CustomConfigValidatorsPtr&& custom_config_validators, Stats::Scope& scope,
BackOffStrategyPtr&& backoff_strategy,
Stats::Scope& scope, BackOffStrategyPtr&& backoff_strategy,
const envoy::config::core::v3::ApiConfigSource& ads_config_source) {
// Process the rate limit settings.
absl::StatusOr<RateLimitSettings> rate_limit_settings_or_error =
Expand All @@ -248,9 +247,9 @@ absl::Status GrpcMuxImpl<S, F, RQ, RS>::updateMuxSource(
grpc_stream_ = createGrpcStreamObject(std::move(primary_async_client),
std::move(failover_async_client), service_method, scope,
std::move(backoff_strategy), *rate_limit_settings_or_error);
// No need to update the config_validators_ as they may contain some state
// that needs to be kept across different GrpcMux objects.

// Update the config validators.
config_validators_ = std::move(custom_config_validators);
// Update the watch map's config validators.
for (auto& [type_url, watch_map] : watch_maps_) {
watch_map->setConfigValidators(config_validators_.get());
Expand Down
Loading

0 comments on commit 4ccd619

Please sign in to comment.