Skip to content

Commit

Permalink
xds: Fix fallback test FakeClock TSAN failure
Browse files Browse the repository at this point in the history
d65d394 increased the test speed of
connect_then_mainServerDown_fallbackServerUp by using FakeClock.
However, it introduced a data race because FakeClock is not thread-safe.
This change injects a single thread for gRPC callbacks such that
syncContext is run on a thread under the test's control.

A simpler approach would be to expose syncContext from XdsClientImpl for
testing. However, this test is in a different package and I wanted to
avoid adding a public method.

```
  Read of size 8 at 0x00008dec9d50 by thread T25:
    #0 io.grpc.internal.FakeClock$ScheduledExecutorImpl.schedule(Lio/grpc/internal/FakeClock$ScheduledTask;JLjava/util/concurrent/TimeUnit;)V FakeClock.java:140
    grpc#1 io.grpc.internal.FakeClock$ScheduledExecutorImpl.schedule(Ljava/lang/Runnable;JLjava/util/concurrent/TimeUnit;)Ljava/util/concurrent/ScheduledFuture; FakeClock.java:150
    grpc#2 io.grpc.SynchronizationContext.schedule(Ljava/lang/Runnable;JLjava/util/concurrent/TimeUnit;Ljava/util/concurrent/ScheduledExecutorService;)Lio/grpc/SynchronizationContext$ScheduledHandle; SynchronizationContext.java:153
    grpc#3 io.grpc.xds.client.ControlPlaneClient$AdsStream.handleRpcStreamClosed(Lio/grpc/Status;)V ControlPlaneClient.java:491
    grpc#4 io.grpc.xds.client.ControlPlaneClient$AdsStream.lambda$onStatusReceived$0(Lio/grpc/Status;)V ControlPlaneClient.java:429
    grpc#5 io.grpc.xds.client.ControlPlaneClient$AdsStream$$Lambda+0x00000001004a95d0.run()V ??
    grpc#6 io.grpc.SynchronizationContext.drain()V SynchronizationContext.java:96
    grpc#7 io.grpc.SynchronizationContext.execute(Ljava/lang/Runnable;)V SynchronizationContext.java:128
    grpc#8 io.grpc.xds.client.ControlPlaneClient$AdsStream.onStatusReceived(Lio/grpc/Status;)V ControlPlaneClient.java:428
    grpc#9 io.grpc.xds.GrpcXdsTransportFactory$EventHandlerToCallListenerAdapter.onClose(Lio/grpc/Status;Lio/grpc/Metadata;)V GrpcXdsTransportFactory.java:149
    grpc#10 io.grpc.PartialForwardingClientCallListener.onClose(Lio/grpc/Status;Lio/grpc/Metadata;)V PartialForwardingClientCallListener.java:39
    ...

  Previous write of size 8 at 0x00008dec9d50 by thread T4 (mutexes: write M0, write M1, write M2, write M3):
    #0 io.grpc.internal.FakeClock.forwardTime(JLjava/util/concurrent/TimeUnit;)I FakeClock.java:368
    grpc#1 io.grpc.xds.XdsClientFallbackTest.connect_then_mainServerDown_fallbackServerUp()V XdsClientFallbackTest.java:358
    ...
```
  • Loading branch information
ejona86 committed Jan 23, 2025
1 parent fc86084 commit 495a890
Showing 1 changed file with 21 additions and 2 deletions.
23 changes: 21 additions & 2 deletions xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.grpc.ChannelCredentials;
import io.grpc.Grpc;
import io.grpc.MetricRecorder;
import io.grpc.Status;
import io.grpc.internal.ExponentialBackoffPolicy;
Expand All @@ -43,11 +45,14 @@
import io.grpc.xds.client.XdsClientImpl;
import io.grpc.xds.client.XdsClientMetricReporter;
import io.grpc.xds.client.XdsInitializationException;
import io.grpc.xds.client.XdsTransportFactory;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -338,9 +343,21 @@ private static void verifyNoSubscribers(ControlPlaneRule rule) {
public void connect_then_mainServerDown_fallbackServerUp() throws Exception {
mainXdsServer.restartXdsServer();
fallbackServer.restartXdsServer();
ExecutorService executor = Executors.newFixedThreadPool(1);
XdsTransportFactory xdsTransportFactory = new XdsTransportFactory() {
@Override
public XdsTransport create(Bootstrapper.ServerInfo serverInfo) {
ChannelCredentials channelCredentials =
(ChannelCredentials) serverInfo.implSpecificConfig();
return new GrpcXdsTransportFactory.GrpcXdsTransport(
Grpc.newChannelBuilder(serverInfo.target(), channelCredentials)
.executor(executor)
.build());
}
};
XdsClientImpl xdsClient = CommonBootstrapperTestUtils.createXdsClient(
new GrpcBootstrapperImpl().bootstrap(defaultBootstrapOverride()),
DEFAULT_XDS_TRANSPORT_FACTORY, fakeClock, new ExponentialBackoffPolicy.Provider(),
xdsTransportFactory, fakeClock, new ExponentialBackoffPolicy.Provider(),
MessagePrinter.INSTANCE, xdsClientMetricReporter);

xdsClient.watchXdsResource(XdsListenerResource.getInstance(), MAIN_SERVER, ldsWatcher);
Expand All @@ -355,7 +372,8 @@ public void connect_then_mainServerDown_fallbackServerUp() throws Exception {
// Sleep for the ADS stream disconnect to be processed and for the retry to fail. Between those
// two sleeps we need the fakeClock to progress by 1 second to restart the ADS stream.
for (int i = 0; i < 5; i++) {
fakeClock.forwardTime(1000, TimeUnit.MILLISECONDS);
// FakeClock is not thread-safe, and the retry scheduling is concurrent to this test thread
executor.submit(() -> fakeClock.forwardTime(1000, TimeUnit.MILLISECONDS)).get();
TimeUnit.SECONDS.sleep(1);
}

Expand Down Expand Up @@ -393,6 +411,7 @@ public void connect_then_mainServerDown_fallbackServerUp() throws Exception {
fakeClock.forwardTime(15000, TimeUnit.MILLISECONDS); // Does not exist timer
verify(cdsWatcher2, timeout(5000)).onResourceDoesNotExist(eq(CLUSTER_NAME));
xdsClient.shutdown();
executor.shutdown();
}

@Test
Expand Down

0 comments on commit 495a890

Please sign in to comment.