diff --git a/changelogs/current.yaml b/changelogs/current.yaml index b773fb76f626..69187b1f0315 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -112,6 +112,11 @@ minor_behavior_changes: - area: tracers change: | Set status code based on GRPC status code for OpenTelemetry tracers (previously unset). +- area: xds-failover + change: | + Add the ability to stick with either the primary or the failover xDS sources once Envoy connects to one of them. + This was added behind a runtime guard, as it will be removed in the future. To allow sticksiyness the runtime + flag ``envoy.reloadable_features.xds_failover_to_primary_enabled`` must be explicitly set to ``false``. - area: http2 change: | Changes the default value of ``envoy.reloadable_features.http2_use_oghttp2`` to ``false``. This changes the codec used for HTTP/2 diff --git a/source/common/runtime/runtime_features.cc b/source/common/runtime/runtime_features.cc index 6ec39cd77dd3..85e75b3fadf1 100644 --- a/source/common/runtime/runtime_features.cc +++ b/source/common/runtime/runtime_features.cc @@ -105,6 +105,7 @@ RUNTIME_GUARD(envoy_reloadable_features_use_typed_metadata_in_proxy_protocol_lis RUNTIME_GUARD(envoy_reloadable_features_validate_connect); RUNTIME_GUARD(envoy_reloadable_features_validate_grpc_header_before_log_grpc_status); RUNTIME_GUARD(envoy_reloadable_features_validate_upstream_headers); +RUNTIME_GUARD(envoy_reloadable_features_xds_failover_to_primary_enabled); RUNTIME_GUARD(envoy_reloadable_features_xdstp_path_avoid_colon_encoding); RUNTIME_GUARD(envoy_restart_features_allow_client_socket_creation_failure); RUNTIME_GUARD(envoy_restart_features_allow_slot_destroy_on_worker_threads); diff --git a/source/extensions/config_subscription/grpc/grpc_mux_failover.h b/source/extensions/config_subscription/grpc/grpc_mux_failover.h index 19765df61fc7..c37e7f7f6e35 100644 --- a/source/extensions/config_subscription/grpc/grpc_mux_failover.h +++ b/source/extensions/config_subscription/grpc/grpc_mux_failover.h @@ -102,6 +102,25 @@ class GrpcMuxFailover : public GrpcStreamInterface, "Already connected to an xDS server, skipping establishNewStream() call"); return; } + if (!Runtime::runtimeFeatureEnabled( + "envoy.reloadable_features.xds_failover_to_primary_enabled")) { + // Allow stickiness, so if Envoy was ever connected to the primary source only + // retry to reconnect to the primary source, If Envoy was ever connected to the + // failover source then only retry to reconnect to the failover source. + if (previously_connected_to_ == ConnectedTo::Primary) { + ENVOY_LOG_MISC( + info, "Previously connected to the primary xDS source, attempting to reconnect to it"); + connection_state_ = ConnectionState::ConnectingToPrimary; + primary_grpc_stream_->establishNewStream(); + return; + } else if (previously_connected_to_ == ConnectedTo::Failover) { + ENVOY_LOG_MISC( + info, "Previously connected to the failover xDS source, attempting to reconnect to it"); + connection_state_ = ConnectionState::ConnectingToFailover; + failover_grpc_stream_->establishNewStream(); + return; + } + } // connection_state_ is either None, ConnectingToPrimary or // ConnectingToFailover. In the first 2 cases try to connect to the primary // (preferring the primary in the case of None), and in the third case @@ -287,10 +306,30 @@ class GrpcMuxFailover : public GrpcStreamInterface, // This will be called when the failover stream fails to establish a connection, or after the // connection was closed. ASSERT(parent_.connectingToOrConnectedToFailover()); + if (!Runtime::runtimeFeatureEnabled( + "envoy.reloadable_features.xds_failover_to_primary_enabled")) { + // If previously Envoy was connected to the failover, keep using that. + // Otherwise let the retry mechanism try to access the primary (similar + // to if the runtime flag was not set). + if (parent_.previously_connected_to_ == ConnectedTo::Failover) { + ENVOY_LOG(debug, + "Failover xDS stream disconnected (either after establishing a connection or " + "before). Attempting to reconnect to Failover because Envoy successfully " + "connected to it previously."); + // Not closing the failover stream, allows it to use its retry timer + // to reconnect to the failover source. + // Next attempt will be to the failover after Envoy was already + // connected to it. Allowing to send the initial_resource_versions on reconnect. + parent_.grpc_mux_callbacks_.onEstablishmentFailure(true); + parent_.connection_state_ = ConnectionState::ConnectingToFailover; + return; + } + } // Either this was an intentional disconnection from the failover source, // or unintentional. Either way, try to connect to the primary next. - ENVOY_LOG(debug, "Failover xDS stream diconnected (either after establishing a connection or " - "before). Attempting to connect to the primary stream."); + ENVOY_LOG(debug, + "Failover xDS stream disconnected (either after establishing a connection or " + "before). Attempting to connect to the primary stream."); // This will close the stream and prevent the retry timer from // reconnecting to the failover source. The next attempt will be to the @@ -398,8 +437,8 @@ class GrpcMuxFailover : public GrpcStreamInterface, // exclusive), it can attempt connecting to more than one source at a time. ConnectionState connection_state_; - // A flag that keeps track of whether Envoy successfully connected to either the - // primary or failover source. Envoy is considered successfully connected to a source + // A flag that keeps track of whether Envoy successfully connected to the + // primary source. Envoy is considered successfully connected to a source // once it receives a response from it. bool ever_connected_to_primary_{false}; diff --git a/test/extensions/config_subscription/grpc/BUILD b/test/extensions/config_subscription/grpc/BUILD index b0f9ce75ed79..d55202d11637 100644 --- a/test/extensions/config_subscription/grpc/BUILD +++ b/test/extensions/config_subscription/grpc/BUILD @@ -270,6 +270,7 @@ envoy_cc_test( "//test/common/stats:stat_test_utility_lib", "//test/mocks/config:config_mocks", "//test/mocks/event:event_mocks", + "//test/test_common:test_runtime_lib", "@envoy_api//envoy/service/discovery/v3:pkg_cc_proto", ], ) diff --git a/test/extensions/config_subscription/grpc/grpc_mux_failover_test.cc b/test/extensions/config_subscription/grpc/grpc_mux_failover_test.cc index 8a990f134fcb..25359b2d208a 100644 --- a/test/extensions/config_subscription/grpc/grpc_mux_failover_test.cc +++ b/test/extensions/config_subscription/grpc/grpc_mux_failover_test.cc @@ -6,6 +6,7 @@ #include "test/extensions/config_subscription/grpc/mocks.h" #include "test/mocks/config/mocks.h" #include "test/mocks/event/mocks.h" +#include "test/test_common/test_runtime.h" #include "gmock/gmock.h" #include "gtest/gtest.h" @@ -440,6 +441,34 @@ TEST_F(GrpcMuxFailoverTest, AlternatingPrimaryAndFailoverAttemptsAfterFailoverAv grpc_mux_failover_->establishNewStream(); } +// Validation that when envoy.reloadable_features.xds_failover_to_primary_enabled is disabled +// and after the failover is available (a response is received), Envoy will only +// try to reconnect to the failover. +// This test will be removed once envoy.reloadable_features.xds_failover_to_primary_enabled +// is deprecated. +TEST_F(GrpcMuxFailoverTest, StickToFailoverAfterFailoverAvailable) { + TestScopedRuntime scoped_runtime; + scoped_runtime.mergeValues( + {{"envoy.reloadable_features.xds_failover_to_primary_enabled", "false"}}); + connectToFailover(); + + // Emulate 5 disconnects, and ensure the primary reconnection isn't attempted. + for (int attempt = 0; attempt < 5; ++attempt) { + // Emulate a failover source failure that will not result in an attempt to + // connect to the primary. It should not close the failover stream (so + // the retry mechanism will kick in). + EXPECT_CALL(failover_stream_, closeStream()).Times(0); + EXPECT_CALL(grpc_mux_callbacks_, onEstablishmentFailure(true)); + EXPECT_CALL(primary_stream_, establishNewStream()).Times(0); + failover_callbacks_->onEstablishmentFailure(true); + } + + // Emulate a call to establishNewStream() of the failover stream. + EXPECT_CALL(primary_stream_, establishNewStream()).Times(0); + EXPECT_CALL(failover_stream_, establishNewStream()); + grpc_mux_failover_->establishNewStream(); +} + // Validates that multiple calls to establishNewStream when connecting to the // failover are invoked on the failover stream, and not the primary. TEST_F(GrpcMuxFailoverTest, MultipleEstablishFailoverStream) { diff --git a/test/extensions/config_subscription/grpc/xds_failover_integration_test.cc b/test/extensions/config_subscription/grpc/xds_failover_integration_test.cc index de4948c91336..d217ab1e3ab9 100644 --- a/test/extensions/config_subscription/grpc/xds_failover_integration_test.cc +++ b/test/extensions/config_subscription/grpc/xds_failover_integration_test.cc @@ -906,4 +906,149 @@ TEST_P(XdsFailoverAdsIntegrationTest, ASSERT_TRUE(failover_xds_connection_->waitForDisconnect()); } } + +// Validation that when envoy.reloadable_features.xds_failover_to_primary_enabled is disabled +// and after the failover responds and then disconnected, Envoy will only +// try to reconnect to the failover. +// This test will be removed once envoy.reloadable_features.xds_failover_to_primary_enabled +// is deprecated. +TEST_P(XdsFailoverAdsIntegrationTest, NoPrimaryUseAfterFailoverResponse) { + // These tests are not executed with GoogleGrpc because they are flaky due to + // the large timeout values for retries. + SKIP_IF_GRPC_CLIENT(Grpc::ClientType::GoogleGrpc); +#ifdef ENVOY_ENABLE_UHV + // With UHV the finishGrpcStream() isn't detected as invalid frame because of + // no ":status" header, unless "envoy.reloadable_features.enable_universal_header_validator" + // is also enabled. + config_helper_.addRuntimeOverride("envoy.reloadable_features.enable_universal_header_validator", + "true"); +#endif + config_helper_.addRuntimeOverride("envoy.reloadable_features.xds_failover_to_primary_enabled", + "false"); + // Set a long LDS initial_fetch_timeout to prevent test flakiness when + // reconnecting to the failover multiple times. + config_helper_.addConfigModifier([](envoy::config::bootstrap::v3::Bootstrap& bootstrap) { + auto* lds_config = bootstrap.mutable_dynamic_resources()->mutable_lds_config(); + lds_config->mutable_initial_fetch_timeout()->set_seconds(100); + }); + initialize(); + + // 2 consecutive primary failures. + // Expect a connection to the primary. Reject the connection immediately. + primaryConnectionFailure(); + ASSERT_TRUE(xds_connection_->waitForDisconnect()); + // The CDS request fails when the primary disconnects. After that fetch the config + // dump to ensure that the retry timer kicks in. + // Expect another connection attempt to the primary. Reject the stream (gRPC failure) immediately. + // As this is a 2nd consecutive failure, it will trigger failover. + waitForPrimaryXdsRetryTimer(); + primaryConnectionFailure(); + ASSERT_TRUE(xds_connection_->waitForDisconnect()); + + // The CDS request fails when the primary disconnects. + test_server_->waitForCounterGe("cluster_manager.cds.update_failure", 2); + + AssertionResult result = + failover_xds_upstream_->waitForHttpConnection(*dispatcher_, failover_xds_connection_); + RELEASE_ASSERT(result, result.message()); + // Failover is healthy, start the ADS gRPC stream. + result = failover_xds_connection_->waitForNewStream(*dispatcher_, failover_xds_stream_); + RELEASE_ASSERT(result, result.message()); + failover_xds_stream_->startGrpcStream(); + + // Ensure basic flow with failover works. + EXPECT_TRUE(compareDiscoveryRequest(CdsTypeUrl, "", {}, {}, {}, true, + Grpc::Status::WellKnownGrpcStatus::Ok, "", + failover_xds_stream_.get())); + sendDiscoveryResponse( + CdsTypeUrl, {ConfigHelper::buildCluster("failover_cluster_0")}, + {ConfigHelper::buildCluster("failover_cluster_0")}, {}, "failover1", {}, + failover_xds_stream_.get()); + // Wait for an EDS request, and send its response. + test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 1); + test_server_->waitForGaugeEq("cluster.failover_cluster_0.warming_state", 1); + // Ensure basic flow with failover works. + EXPECT_TRUE(compareDiscoveryRequest( + EdsTypeUrl, "", {"failover_cluster_0"}, {"failover_cluster_0"}, {}, false, + Grpc::Status::WellKnownGrpcStatus::Ok, "", failover_xds_stream_.get())); + sendDiscoveryResponse( + EdsTypeUrl, {buildClusterLoadAssignment("failover_cluster_0")}, + {buildClusterLoadAssignment("failover_cluster_0")}, {}, "failover1", {}, + failover_xds_stream_.get()); + test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 0); + test_server_->waitForGaugeGe("cluster_manager.active_clusters", 2); + test_server_->waitForGaugeEq("cluster.failover_cluster_0.warming_state", 0); + EXPECT_EQ(2, test_server_->gauge("control_plane.connected_state")->value()); + EXPECT_TRUE(compareDiscoveryRequest(CdsTypeUrl, "failover1", {}, {}, {}, false, + Grpc::Status::WellKnownGrpcStatus::Ok, "", + failover_xds_stream_.get())); + EXPECT_TRUE(compareDiscoveryRequest(LdsTypeUrl, "", {}, {}, {}, false, + Grpc::Status::WellKnownGrpcStatus::Ok, "", + failover_xds_stream_.get())); + + // Envoy has received CDS and EDS responses, it means the failover is available. + // Now disconnect the failover source, this should result in an LDS failure. + // After that add a notification to the main thread to ensure that the retry timer kicks in. + failover_xds_stream_->finishGrpcStream(Grpc::Status::Internal); + test_server_->waitForCounterGe("listener_manager.lds.update_failure", 1); + absl::Notification notification; + test_server_->server().dispatcher().post([&]() { notification.Notify(); }); + notification.WaitForNotification(); + timeSystem().advanceTimeWait(std::chrono::milliseconds(1000)); + + // In this case (received a response), both EnvoyGrpc and GoogleGrpc keep the connection open. + result = failover_xds_connection_->waitForNewStream(*dispatcher_, failover_xds_stream_); + RELEASE_ASSERT(result, result.message()); + // Immediately fail the connection. + failover_xds_stream_->finishGrpcStream(Grpc::Status::Internal); + + // Ensure that Envoy still attempts to connect to the primary, + // and keep disconnecting a few times and validate that the failover + // connection isn't attempted. + for (int i = 1; i < 5; ++i) { + ASSERT_TRUE(failover_xds_connection_->waitForDisconnect()); + // Wait longer due to the fixed 5 seconds failover . + waitForPrimaryXdsRetryTimer(i, 6); + // EnvoyGrpc will disconnect if the gRPC stream is immediately closed (as + // done above). + result = failover_xds_upstream_->waitForHttpConnection(*dispatcher_, failover_xds_connection_); + RELEASE_ASSERT(result, result.message()); + result = failover_xds_connection_->waitForNewStream(*dispatcher_, failover_xds_stream_); + RELEASE_ASSERT(result, result.message()); + // Immediately fail the connection. + failover_xds_stream_->finishGrpcStream(Grpc::Status::Internal); + } + + // When EnvoyGrpc is used, no new connection to the primary will be attempted. + EXPECT_FALSE( + xds_upstream_->waitForHttpConnection(*dispatcher_, xds_connection_, std::chrono::seconds(1))); + + ASSERT_TRUE(failover_xds_connection_->waitForDisconnect()); + // Wait longer due to the fixed 5 seconds failover . + waitForPrimaryXdsRetryTimer(5, 6); + + // Allow a connection to the failover. + // Expect a connection to the failover when using EnvoyGrpc. + // In case GoogleGrpc is used the current connection will be reused (new stream). + result = failover_xds_upstream_->waitForHttpConnection(*dispatcher_, failover_xds_connection_); + RELEASE_ASSERT(result, result.message()); + result = failover_xds_connection_->waitForNewStream(*dispatcher_, failover_xds_stream_); + failover_xds_stream_->startGrpcStream(); + + // Validate that the initial requests with known versions are sent to the + // failover source. + const absl::flat_hash_map cds_eds_initial_resource_versions_map{ + {"failover_cluster_0", "failover1"}}; + const absl::flat_hash_map empty_initial_resource_versions_map; + EXPECT_TRUE(compareDiscoveryRequest( + CdsTypeUrl, "1", {}, {}, {}, true, Grpc::Status::WellKnownGrpcStatus::Ok, "", + failover_xds_stream_.get(), OptRef(cds_eds_initial_resource_versions_map))); + EXPECT_TRUE(compareDiscoveryRequest( + EdsTypeUrl, "1", {"failover_cluster_0"}, {"failover_cluster_0"}, {}, false, + Grpc::Status::WellKnownGrpcStatus::Ok, "", failover_xds_stream_.get(), + OptRef(cds_eds_initial_resource_versions_map))); + EXPECT_TRUE(compareDiscoveryRequest( + LdsTypeUrl, "", {}, {}, {}, false, Grpc::Status::WellKnownGrpcStatus::Ok, "", + failover_xds_stream_.get(), OptRef(empty_initial_resource_versions_map))); +} } // namespace Envoy