diff --git a/xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java b/xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java index 94b49bd94b2..7cf6280711f 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java @@ -31,6 +31,8 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import io.grpc.ChannelCredentials; +import io.grpc.Grpc; import io.grpc.MetricRecorder; import io.grpc.Status; import io.grpc.internal.ExponentialBackoffPolicy; @@ -43,11 +45,14 @@ import io.grpc.xds.client.XdsClientImpl; import io.grpc.xds.client.XdsClientMetricReporter; import io.grpc.xds.client.XdsInitializationException; +import io.grpc.xds.client.XdsTransportFactory; import java.net.InetSocketAddress; import java.util.Arrays; import java.util.Collections; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -338,9 +343,21 @@ private static void verifyNoSubscribers(ControlPlaneRule rule) { public void connect_then_mainServerDown_fallbackServerUp() throws Exception { mainXdsServer.restartXdsServer(); fallbackServer.restartXdsServer(); + ExecutorService executor = Executors.newFixedThreadPool(1); + XdsTransportFactory xdsTransportFactory = new XdsTransportFactory() { + @Override + public XdsTransport create(Bootstrapper.ServerInfo serverInfo) { + ChannelCredentials channelCredentials = + (ChannelCredentials) serverInfo.implSpecificConfig(); + return new GrpcXdsTransportFactory.GrpcXdsTransport( + Grpc.newChannelBuilder(serverInfo.target(), channelCredentials) + .executor(executor) + .build()); + } + }; XdsClientImpl xdsClient = CommonBootstrapperTestUtils.createXdsClient( new GrpcBootstrapperImpl().bootstrap(defaultBootstrapOverride()), - DEFAULT_XDS_TRANSPORT_FACTORY, fakeClock, new ExponentialBackoffPolicy.Provider(), + xdsTransportFactory, fakeClock, new ExponentialBackoffPolicy.Provider(), MessagePrinter.INSTANCE, xdsClientMetricReporter); xdsClient.watchXdsResource(XdsListenerResource.getInstance(), MAIN_SERVER, ldsWatcher); @@ -355,7 +372,8 @@ public void connect_then_mainServerDown_fallbackServerUp() throws Exception { // Sleep for the ADS stream disconnect to be processed and for the retry to fail. Between those // two sleeps we need the fakeClock to progress by 1 second to restart the ADS stream. for (int i = 0; i < 5; i++) { - fakeClock.forwardTime(1000, TimeUnit.MILLISECONDS); + // FakeClock is not thread-safe, and the retry scheduling is concurrent to this test thread + executor.submit(() -> fakeClock.forwardTime(1000, TimeUnit.MILLISECONDS)).get(); TimeUnit.SECONDS.sleep(1); } @@ -393,6 +411,7 @@ public void connect_then_mainServerDown_fallbackServerUp() throws Exception { fakeClock.forwardTime(15000, TimeUnit.MILLISECONDS); // Does not exist timer verify(cdsWatcher2, timeout(5000)).onResourceDoesNotExist(eq(CLUSTER_NAME)); xdsClient.shutdown(); + executor.shutdown(); } @Test