From 495a8906b297650fe823b5711993a0e73077c335 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Wed, 22 Jan 2025 12:10:56 -0800 Subject: [PATCH] xds: Fix fallback test FakeClock TSAN failure d65d3942e increased the test speed of connect_then_mainServerDown_fallbackServerUp by using FakeClock. However, it introduced a data race because FakeClock is not thread-safe. This change injects a single thread for gRPC callbacks such that syncContext is run on a thread under the test's control. A simpler approach would be to expose syncContext from XdsClientImpl for testing. However, this test is in a different package and I wanted to avoid adding a public method. ``` Read of size 8 at 0x00008dec9d50 by thread T25: #0 io.grpc.internal.FakeClock$ScheduledExecutorImpl.schedule(Lio/grpc/internal/FakeClock$ScheduledTask;JLjava/util/concurrent/TimeUnit;)V FakeClock.java:140 #1 io.grpc.internal.FakeClock$ScheduledExecutorImpl.schedule(Ljava/lang/Runnable;JLjava/util/concurrent/TimeUnit;)Ljava/util/concurrent/ScheduledFuture; FakeClock.java:150 #2 io.grpc.SynchronizationContext.schedule(Ljava/lang/Runnable;JLjava/util/concurrent/TimeUnit;Ljava/util/concurrent/ScheduledExecutorService;)Lio/grpc/SynchronizationContext$ScheduledHandle; SynchronizationContext.java:153 #3 io.grpc.xds.client.ControlPlaneClient$AdsStream.handleRpcStreamClosed(Lio/grpc/Status;)V ControlPlaneClient.java:491 #4 io.grpc.xds.client.ControlPlaneClient$AdsStream.lambda$onStatusReceived$0(Lio/grpc/Status;)V ControlPlaneClient.java:429 #5 io.grpc.xds.client.ControlPlaneClient$AdsStream$$Lambda+0x00000001004a95d0.run()V ?? #6 io.grpc.SynchronizationContext.drain()V SynchronizationContext.java:96 #7 io.grpc.SynchronizationContext.execute(Ljava/lang/Runnable;)V SynchronizationContext.java:128 #8 io.grpc.xds.client.ControlPlaneClient$AdsStream.onStatusReceived(Lio/grpc/Status;)V ControlPlaneClient.java:428 #9 io.grpc.xds.GrpcXdsTransportFactory$EventHandlerToCallListenerAdapter.onClose(Lio/grpc/Status;Lio/grpc/Metadata;)V GrpcXdsTransportFactory.java:149 #10 io.grpc.PartialForwardingClientCallListener.onClose(Lio/grpc/Status;Lio/grpc/Metadata;)V PartialForwardingClientCallListener.java:39 ... Previous write of size 8 at 0x00008dec9d50 by thread T4 (mutexes: write M0, write M1, write M2, write M3): #0 io.grpc.internal.FakeClock.forwardTime(JLjava/util/concurrent/TimeUnit;)I FakeClock.java:368 #1 io.grpc.xds.XdsClientFallbackTest.connect_then_mainServerDown_fallbackServerUp()V XdsClientFallbackTest.java:358 ... ``` --- .../io/grpc/xds/XdsClientFallbackTest.java | 23 +++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java b/xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java index 94b49bd94b2..7cf6280711f 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java @@ -31,6 +31,8 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import io.grpc.ChannelCredentials; +import io.grpc.Grpc; import io.grpc.MetricRecorder; import io.grpc.Status; import io.grpc.internal.ExponentialBackoffPolicy; @@ -43,11 +45,14 @@ import io.grpc.xds.client.XdsClientImpl; import io.grpc.xds.client.XdsClientMetricReporter; import io.grpc.xds.client.XdsInitializationException; +import io.grpc.xds.client.XdsTransportFactory; import java.net.InetSocketAddress; import java.util.Arrays; import java.util.Collections; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -338,9 +343,21 @@ private static void verifyNoSubscribers(ControlPlaneRule rule) { public void connect_then_mainServerDown_fallbackServerUp() throws Exception { mainXdsServer.restartXdsServer(); fallbackServer.restartXdsServer(); + ExecutorService executor = Executors.newFixedThreadPool(1); + XdsTransportFactory xdsTransportFactory = new XdsTransportFactory() { + @Override + public XdsTransport create(Bootstrapper.ServerInfo serverInfo) { + ChannelCredentials channelCredentials = + (ChannelCredentials) serverInfo.implSpecificConfig(); + return new GrpcXdsTransportFactory.GrpcXdsTransport( + Grpc.newChannelBuilder(serverInfo.target(), channelCredentials) + .executor(executor) + .build()); + } + }; XdsClientImpl xdsClient = CommonBootstrapperTestUtils.createXdsClient( new GrpcBootstrapperImpl().bootstrap(defaultBootstrapOverride()), - DEFAULT_XDS_TRANSPORT_FACTORY, fakeClock, new ExponentialBackoffPolicy.Provider(), + xdsTransportFactory, fakeClock, new ExponentialBackoffPolicy.Provider(), MessagePrinter.INSTANCE, xdsClientMetricReporter); xdsClient.watchXdsResource(XdsListenerResource.getInstance(), MAIN_SERVER, ldsWatcher); @@ -355,7 +372,8 @@ public void connect_then_mainServerDown_fallbackServerUp() throws Exception { // Sleep for the ADS stream disconnect to be processed and for the retry to fail. Between those // two sleeps we need the fakeClock to progress by 1 second to restart the ADS stream. for (int i = 0; i < 5; i++) { - fakeClock.forwardTime(1000, TimeUnit.MILLISECONDS); + // FakeClock is not thread-safe, and the retry scheduling is concurrent to this test thread + executor.submit(() -> fakeClock.forwardTime(1000, TimeUnit.MILLISECONDS)).get(); TimeUnit.SECONDS.sleep(1); } @@ -393,6 +411,7 @@ public void connect_then_mainServerDown_fallbackServerUp() throws Exception { fakeClock.forwardTime(15000, TimeUnit.MILLISECONDS); // Does not exist timer verify(cdsWatcher2, timeout(5000)).onResourceDoesNotExist(eq(CLUSTER_NAME)); xdsClient.shutdown(); + executor.shutdown(); } @Test