Skip to content

Commit

Permalink
xds-failover: disable moving to primary after fallback responds
Browse files Browse the repository at this point in the history
Signed-off-by: Adi Suissa-Peleg <adip@google.com>
  • Loading branch information
adisuissa committed Sep 30, 2024
1 parent e3ed5a7 commit e4c7986
Show file tree
Hide file tree
Showing 6 changed files with 224 additions and 4 deletions.
5 changes: 5 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ minor_behavior_changes:
- area: tracers
change: |
Set status code based on GRPC status code for OpenTelemetry tracers (previously unset).
- area: xds-failover
change: |
Add the ability to stick with either the primary or the failover xDS sources once Envoy connects to one of them.
This was added behind a runtime guard, as it will be removed in the future. To allow sticksiyness the runtime
flag ``envoy.reloadable_features.xds_failover_to_primary_enabled`` must be explicitly set to ``false``.
- area: http2
change: |
Changes the default value of ``envoy.reloadable_features.http2_use_oghttp2`` to ``false``. This changes the codec used for HTTP/2
Expand Down
1 change: 1 addition & 0 deletions source/common/runtime/runtime_features.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ RUNTIME_GUARD(envoy_reloadable_features_use_typed_metadata_in_proxy_protocol_lis
RUNTIME_GUARD(envoy_reloadable_features_validate_connect);
RUNTIME_GUARD(envoy_reloadable_features_validate_grpc_header_before_log_grpc_status);
RUNTIME_GUARD(envoy_reloadable_features_validate_upstream_headers);
RUNTIME_GUARD(envoy_reloadable_features_xds_failover_to_primary_enabled);
RUNTIME_GUARD(envoy_reloadable_features_xdstp_path_avoid_colon_encoding);
RUNTIME_GUARD(envoy_restart_features_allow_client_socket_creation_failure);
RUNTIME_GUARD(envoy_restart_features_allow_slot_destroy_on_worker_threads);
Expand Down
47 changes: 43 additions & 4 deletions source/extensions/config_subscription/grpc/grpc_mux_failover.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,25 @@ class GrpcMuxFailover : public GrpcStreamInterface<RequestType, ResponseType>,
"Already connected to an xDS server, skipping establishNewStream() call");
return;
}
if (!Runtime::runtimeFeatureEnabled(
"envoy.reloadable_features.xds_failover_to_primary_enabled")) {
// Allow stickiness, so if Envoy was ever connected to the primary source only
// retry to reconnect to the primary source, If Envoy was ever connected to the
// failover source then only retry to reconnect to the failover source.
if (previously_connected_to_ == ConnectedTo::Primary) {
ENVOY_LOG_MISC(
info, "Previously connected to the primary xDS source, attempting to reconnect to it");
connection_state_ = ConnectionState::ConnectingToPrimary;
primary_grpc_stream_->establishNewStream();
return;
} else if (previously_connected_to_ == ConnectedTo::Failover) {
ENVOY_LOG_MISC(
info, "Previously connected to the failover xDS source, attempting to reconnect to it");
connection_state_ = ConnectionState::ConnectingToFailover;
failover_grpc_stream_->establishNewStream();
return;
}
}
// connection_state_ is either None, ConnectingToPrimary or
// ConnectingToFailover. In the first 2 cases try to connect to the primary
// (preferring the primary in the case of None), and in the third case
Expand Down Expand Up @@ -287,10 +306,30 @@ class GrpcMuxFailover : public GrpcStreamInterface<RequestType, ResponseType>,
// This will be called when the failover stream fails to establish a connection, or after the
// connection was closed.
ASSERT(parent_.connectingToOrConnectedToFailover());
if (!Runtime::runtimeFeatureEnabled(
"envoy.reloadable_features.xds_failover_to_primary_enabled")) {
// If previously Envoy was connected to the failover, keep using that.
// Otherwise let the retry mechanism try to access the primary (similar
// to if the runtime flag was not set).
if (parent_.previously_connected_to_ == ConnectedTo::Failover) {
ENVOY_LOG(debug,
"Failover xDS stream disconnected (either after establishing a connection or "
"before). Attempting to reconnect to Failover because Envoy successfully "
"connected to it previously.");
// Not closing the failover stream, allows it to use its retry timer
// to reconnect to the failover source.
// Next attempt will be to the failover after Envoy was already
// connected to it. Allowing to send the initial_resource_versions on reconnect.
parent_.grpc_mux_callbacks_.onEstablishmentFailure(true);
parent_.connection_state_ = ConnectionState::ConnectingToFailover;
return;
}
}
// Either this was an intentional disconnection from the failover source,
// or unintentional. Either way, try to connect to the primary next.
ENVOY_LOG(debug, "Failover xDS stream diconnected (either after establishing a connection or "
"before). Attempting to connect to the primary stream.");
ENVOY_LOG(debug,
"Failover xDS stream disconnected (either after establishing a connection or "
"before). Attempting to connect to the primary stream.");

// This will close the stream and prevent the retry timer from
// reconnecting to the failover source. The next attempt will be to the
Expand Down Expand Up @@ -398,8 +437,8 @@ class GrpcMuxFailover : public GrpcStreamInterface<RequestType, ResponseType>,
// exclusive), it can attempt connecting to more than one source at a time.
ConnectionState connection_state_;

// A flag that keeps track of whether Envoy successfully connected to either the
// primary or failover source. Envoy is considered successfully connected to a source
// A flag that keeps track of whether Envoy successfully connected to the
// primary source. Envoy is considered successfully connected to a source
// once it receives a response from it.
bool ever_connected_to_primary_{false};

Expand Down
1 change: 1 addition & 0 deletions test/extensions/config_subscription/grpc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ envoy_cc_test(
"//test/common/stats:stat_test_utility_lib",
"//test/mocks/config:config_mocks",
"//test/mocks/event:event_mocks",
"//test/test_common:test_runtime_lib",
"@envoy_api//envoy/service/discovery/v3:pkg_cc_proto",
],
)
Expand Down
29 changes: 29 additions & 0 deletions test/extensions/config_subscription/grpc/grpc_mux_failover_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "test/extensions/config_subscription/grpc/mocks.h"
#include "test/mocks/config/mocks.h"
#include "test/mocks/event/mocks.h"
#include "test/test_common/test_runtime.h"

#include "gmock/gmock.h"
#include "gtest/gtest.h"
Expand Down Expand Up @@ -440,6 +441,34 @@ TEST_F(GrpcMuxFailoverTest, AlternatingPrimaryAndFailoverAttemptsAfterFailoverAv
grpc_mux_failover_->establishNewStream();
}

// Validation that when envoy.reloadable_features.xds_failover_to_primary_enabled is disabled
// and after the failover is available (a response is received), Envoy will only
// try to reconnect to the failover.
// This test will be removed once envoy.reloadable_features.xds_failover_to_primary_enabled
// is deprecated.
TEST_F(GrpcMuxFailoverTest, StickToFailoverAfterFailoverAvailable) {
TestScopedRuntime scoped_runtime;
scoped_runtime.mergeValues(
{{"envoy.reloadable_features.xds_failover_to_primary_enabled", "false"}});
connectToFailover();

// Emulate 5 disconnects, and ensure the primary reconnection isn't attempted.
for (int attempt = 0; attempt < 5; ++attempt) {
// Emulate a failover source failure that will not result in an attempt to
// connect to the primary. It should not close the failover stream (so
// the retry mechanism will kick in).
EXPECT_CALL(failover_stream_, closeStream()).Times(0);
EXPECT_CALL(grpc_mux_callbacks_, onEstablishmentFailure(true));
EXPECT_CALL(primary_stream_, establishNewStream()).Times(0);
failover_callbacks_->onEstablishmentFailure(true);
}

// Emulate a call to establishNewStream() of the failover stream.
EXPECT_CALL(primary_stream_, establishNewStream()).Times(0);
EXPECT_CALL(failover_stream_, establishNewStream());
grpc_mux_failover_->establishNewStream();
}

// Validates that multiple calls to establishNewStream when connecting to the
// failover are invoked on the failover stream, and not the primary.
TEST_F(GrpcMuxFailoverTest, MultipleEstablishFailoverStream) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -906,4 +906,149 @@ TEST_P(XdsFailoverAdsIntegrationTest,
ASSERT_TRUE(failover_xds_connection_->waitForDisconnect());
}
}

// Validation that when envoy.reloadable_features.xds_failover_to_primary_enabled is disabled
// and after the failover responds and then disconnected, Envoy will only
// try to reconnect to the failover.
// This test will be removed once envoy.reloadable_features.xds_failover_to_primary_enabled
// is deprecated.
TEST_P(XdsFailoverAdsIntegrationTest, NoPrimaryUseAfterFailoverResponse) {
// These tests are not executed with GoogleGrpc because they are flaky due to
// the large timeout values for retries.
SKIP_IF_GRPC_CLIENT(Grpc::ClientType::GoogleGrpc);
#ifdef ENVOY_ENABLE_UHV
// With UHV the finishGrpcStream() isn't detected as invalid frame because of
// no ":status" header, unless "envoy.reloadable_features.enable_universal_header_validator"
// is also enabled.
config_helper_.addRuntimeOverride("envoy.reloadable_features.enable_universal_header_validator",
"true");
#endif
config_helper_.addRuntimeOverride("envoy.reloadable_features.xds_failover_to_primary_enabled",
"false");
// Set a long LDS initial_fetch_timeout to prevent test flakiness when
// reconnecting to the failover multiple times.
config_helper_.addConfigModifier([](envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
auto* lds_config = bootstrap.mutable_dynamic_resources()->mutable_lds_config();
lds_config->mutable_initial_fetch_timeout()->set_seconds(100);
});
initialize();

// 2 consecutive primary failures.
// Expect a connection to the primary. Reject the connection immediately.
primaryConnectionFailure();
ASSERT_TRUE(xds_connection_->waitForDisconnect());
// The CDS request fails when the primary disconnects. After that fetch the config
// dump to ensure that the retry timer kicks in.
// Expect another connection attempt to the primary. Reject the stream (gRPC failure) immediately.
// As this is a 2nd consecutive failure, it will trigger failover.
waitForPrimaryXdsRetryTimer();
primaryConnectionFailure();
ASSERT_TRUE(xds_connection_->waitForDisconnect());

// The CDS request fails when the primary disconnects.
test_server_->waitForCounterGe("cluster_manager.cds.update_failure", 2);

AssertionResult result =
failover_xds_upstream_->waitForHttpConnection(*dispatcher_, failover_xds_connection_);
RELEASE_ASSERT(result, result.message());
// Failover is healthy, start the ADS gRPC stream.
result = failover_xds_connection_->waitForNewStream(*dispatcher_, failover_xds_stream_);
RELEASE_ASSERT(result, result.message());
failover_xds_stream_->startGrpcStream();

// Ensure basic flow with failover works.
EXPECT_TRUE(compareDiscoveryRequest(CdsTypeUrl, "", {}, {}, {}, true,
Grpc::Status::WellKnownGrpcStatus::Ok, "",
failover_xds_stream_.get()));
sendDiscoveryResponse<envoy::config::cluster::v3::Cluster>(
CdsTypeUrl, {ConfigHelper::buildCluster("failover_cluster_0")},
{ConfigHelper::buildCluster("failover_cluster_0")}, {}, "failover1", {},
failover_xds_stream_.get());
// Wait for an EDS request, and send its response.
test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 1);
test_server_->waitForGaugeEq("cluster.failover_cluster_0.warming_state", 1);
// Ensure basic flow with failover works.
EXPECT_TRUE(compareDiscoveryRequest(
EdsTypeUrl, "", {"failover_cluster_0"}, {"failover_cluster_0"}, {}, false,
Grpc::Status::WellKnownGrpcStatus::Ok, "", failover_xds_stream_.get()));
sendDiscoveryResponse<envoy::config::endpoint::v3::ClusterLoadAssignment>(
EdsTypeUrl, {buildClusterLoadAssignment("failover_cluster_0")},
{buildClusterLoadAssignment("failover_cluster_0")}, {}, "failover1", {},
failover_xds_stream_.get());
test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 0);
test_server_->waitForGaugeGe("cluster_manager.active_clusters", 2);
test_server_->waitForGaugeEq("cluster.failover_cluster_0.warming_state", 0);
EXPECT_EQ(2, test_server_->gauge("control_plane.connected_state")->value());
EXPECT_TRUE(compareDiscoveryRequest(CdsTypeUrl, "failover1", {}, {}, {}, false,
Grpc::Status::WellKnownGrpcStatus::Ok, "",
failover_xds_stream_.get()));
EXPECT_TRUE(compareDiscoveryRequest(LdsTypeUrl, "", {}, {}, {}, false,
Grpc::Status::WellKnownGrpcStatus::Ok, "",
failover_xds_stream_.get()));

// Envoy has received CDS and EDS responses, it means the failover is available.
// Now disconnect the failover source, this should result in an LDS failure.
// After that add a notification to the main thread to ensure that the retry timer kicks in.
failover_xds_stream_->finishGrpcStream(Grpc::Status::Internal);
test_server_->waitForCounterGe("listener_manager.lds.update_failure", 1);
absl::Notification notification;
test_server_->server().dispatcher().post([&]() { notification.Notify(); });
notification.WaitForNotification();
timeSystem().advanceTimeWait(std::chrono::milliseconds(1000));

// In this case (received a response), both EnvoyGrpc and GoogleGrpc keep the connection open.
result = failover_xds_connection_->waitForNewStream(*dispatcher_, failover_xds_stream_);
RELEASE_ASSERT(result, result.message());
// Immediately fail the connection.
failover_xds_stream_->finishGrpcStream(Grpc::Status::Internal);

// Ensure that Envoy still attempts to connect to the primary,
// and keep disconnecting a few times and validate that the failover
// connection isn't attempted.
for (int i = 1; i < 5; ++i) {
ASSERT_TRUE(failover_xds_connection_->waitForDisconnect());
// Wait longer due to the fixed 5 seconds failover .
waitForPrimaryXdsRetryTimer(i, 6);
// EnvoyGrpc will disconnect if the gRPC stream is immediately closed (as
// done above).
result = failover_xds_upstream_->waitForHttpConnection(*dispatcher_, failover_xds_connection_);
RELEASE_ASSERT(result, result.message());
result = failover_xds_connection_->waitForNewStream(*dispatcher_, failover_xds_stream_);
RELEASE_ASSERT(result, result.message());
// Immediately fail the connection.
failover_xds_stream_->finishGrpcStream(Grpc::Status::Internal);
}

// When EnvoyGrpc is used, no new connection to the primary will be attempted.
EXPECT_FALSE(
xds_upstream_->waitForHttpConnection(*dispatcher_, xds_connection_, std::chrono::seconds(1)));

ASSERT_TRUE(failover_xds_connection_->waitForDisconnect());
// Wait longer due to the fixed 5 seconds failover .
waitForPrimaryXdsRetryTimer(5, 6);

// Allow a connection to the failover.
// Expect a connection to the failover when using EnvoyGrpc.
// In case GoogleGrpc is used the current connection will be reused (new stream).
result = failover_xds_upstream_->waitForHttpConnection(*dispatcher_, failover_xds_connection_);
RELEASE_ASSERT(result, result.message());
result = failover_xds_connection_->waitForNewStream(*dispatcher_, failover_xds_stream_);
failover_xds_stream_->startGrpcStream();

// Validate that the initial requests with known versions are sent to the
// failover source.
const absl::flat_hash_map<std::string, std::string> cds_eds_initial_resource_versions_map{
{"failover_cluster_0", "failover1"}};
const absl::flat_hash_map<std::string, std::string> empty_initial_resource_versions_map;
EXPECT_TRUE(compareDiscoveryRequest(
CdsTypeUrl, "1", {}, {}, {}, true, Grpc::Status::WellKnownGrpcStatus::Ok, "",
failover_xds_stream_.get(), OptRef(cds_eds_initial_resource_versions_map)));
EXPECT_TRUE(compareDiscoveryRequest(
EdsTypeUrl, "1", {"failover_cluster_0"}, {"failover_cluster_0"}, {}, false,
Grpc::Status::WellKnownGrpcStatus::Ok, "", failover_xds_stream_.get(),
OptRef(cds_eds_initial_resource_versions_map)));
EXPECT_TRUE(compareDiscoveryRequest(
LdsTypeUrl, "", {}, {}, {}, false, Grpc::Status::WellKnownGrpcStatus::Ok, "",
failover_xds_stream_.get(), OptRef(empty_initial_resource_versions_map)));
}
} // namespace Envoy

0 comments on commit e4c7986

Please sign in to comment.