Skip to content

Commit

Permalink
test: fix flakes in HttpJsonDirectServerStreamingCallableTest
Browse files Browse the repository at this point in the history
The flakes seem to stem from parallel execution and the resulting race conditions around static member variables, particularly the `mockService`. Attempting to fix this by using a separate `mockService` for each test.

Fixes: #1905.
Fixes: #2107.
Fixes: #1876.
Fixes: #2083.
Fixes: #1842.
Fixes: #1587.
Fixes: #1684.
  • Loading branch information
meltsufin committed Jan 31, 2024
1 parent 9916540 commit 342467d
Showing 1 changed file with 27 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,8 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand Down Expand Up @@ -107,9 +106,7 @@ public class HttpJsonDirectServerStreamingCallableTest {
.setType(MethodType.SERVER_STREAMING)
.build();

private static final MockHttpService MOCK_SERVICE =
new MockHttpService(
Collections.singletonList(METHOD_SERVER_STREAMING_RECOGNIZE), "google.com:443");
private MockHttpService mockService;

private static final Color DEFAULT_REQUEST = Color.newBuilder().setRed(0.5f).build();
private static final Color ASYNC_REQUEST = DEFAULT_REQUEST.toBuilder().setGreen(1000).build();
Expand All @@ -120,22 +117,25 @@ public class HttpJsonDirectServerStreamingCallableTest {
Money.newBuilder().setCurrencyCode("UAH").setUnits(255).build();
private static final int AWAIT_TERMINATION_SECONDS = 10;

private static ServerStreamingCallSettings<Color, Money> streamingCallSettings;
private static ServerStreamingCallable<Color, Money> streamingCallable;
private ServerStreamingCallSettings<Color, Money> streamingCallSettings;
private ServerStreamingCallable<Color, Money> streamingCallable;

private static ManagedHttpJsonChannel channel;
private static ClientContext clientContext;
private static ExecutorService executorService;
private ManagedHttpJsonChannel channel;
private ClientContext clientContext;
private ExecutorService executorService;

@BeforeClass
public static void initialize() throws IOException {
@Before
public void initialize() throws IOException {
mockService =
new MockHttpService(
Collections.singletonList(METHOD_SERVER_STREAMING_RECOGNIZE), "google.com:443");
executorService = Executors.newFixedThreadPool(2);
channel =
new ManagedHttpJsonInterceptorChannel(
ManagedHttpJsonChannel.newBuilder()
.setEndpoint("google.com:443")
.setExecutor(executorService)
.setHttpTransport(MOCK_SERVICE)
.setHttpTransport(mockService)
.build(),
new HttpJsonHeaderInterceptor(Collections.singletonMap("header-key", "headerValue")));
EndpointContext endpointContext = Mockito.mock(EndpointContext.class);
Expand All @@ -158,25 +158,23 @@ public static void initialize() throws IOException {
HttpJsonCallSettings.create(METHOD_SERVER_STREAMING_RECOGNIZE),
streamingCallSettings,
clientContext);

mockService.reset();
}

@AfterClass
public static void destroy() throws InterruptedException {
@After
public void destroy() throws InterruptedException {
executorService.shutdown();
channel.shutdown();

executorService.awaitTermination(AWAIT_TERMINATION_SECONDS, TimeUnit.SECONDS);
channel.awaitTermination(AWAIT_TERMINATION_SECONDS, TimeUnit.SECONDS);
}

@After
public void tearDown() throws InterruptedException {
MOCK_SERVICE.reset();
mockService.reset();
}

@Test
public void testBadContext() {
MOCK_SERVICE.addResponse(new Money[] {DEFAULT_RESPONSE});
mockService.addResponse(new Money[] {DEFAULT_RESPONSE});
// Create a local callable with a bad context
ServerStreamingCallable<Color, Money> streamingCallable =
HttpJsonCallableFactory.createServerStreamingCallable(
Expand All @@ -202,7 +200,7 @@ public void testBadContext() {

@Test
public void testServerStreamingStart() throws InterruptedException {
MOCK_SERVICE.addResponse(new Money[] {DEFAULT_RESPONSE});
mockService.addResponse(new Money[] {DEFAULT_RESPONSE});
CountDownLatch latch = new CountDownLatch(1);
MoneyObserver moneyObserver = new MoneyObserver(true, latch);

Expand All @@ -217,7 +215,7 @@ public void testServerStreamingStart() throws InterruptedException {

@Test
public void testServerStreaming() throws InterruptedException {
MOCK_SERVICE.addResponse(new Money[] {DEFAULT_RESPONSE, DEFAULTER_RESPONSE});
mockService.addResponse(new Money[] {DEFAULT_RESPONSE, DEFAULTER_RESPONSE});
CountDownLatch latch = new CountDownLatch(3);
MoneyObserver moneyObserver = new MoneyObserver(true, latch);

Expand All @@ -231,7 +229,7 @@ public void testServerStreaming() throws InterruptedException {

@Test
public void testManualFlowControl() throws Exception {
MOCK_SERVICE.addResponse(new Money[] {DEFAULT_RESPONSE});
mockService.addResponse(new Money[] {DEFAULT_RESPONSE});
CountDownLatch latch = new CountDownLatch(2);
MoneyObserver moneyObserver = new MoneyObserver(false, latch);

Expand All @@ -251,7 +249,7 @@ public void testManualFlowControl() throws Exception {

@Test
public void testCancelClientCall() throws Exception {
MOCK_SERVICE.addResponse(new Money[] {DEFAULT_RESPONSE});
mockService.addResponse(new Money[] {DEFAULT_RESPONSE});
CountDownLatch latch = new CountDownLatch(1);
MoneyObserver moneyObserver = new MoneyObserver(false, latch);

Expand All @@ -267,7 +265,7 @@ public void testCancelClientCall() throws Exception {

@Test
public void testOnResponseError() throws Throwable {
MOCK_SERVICE.addException(404, new RuntimeException("some error"));
mockService.addException(404, new RuntimeException("some error"));

CountDownLatch latch = new CountDownLatch(1);
MoneyObserver moneyObserver = new MoneyObserver(true, latch);
Expand All @@ -292,7 +290,7 @@ public void testOnResponseError() throws Throwable {

@Test
public void testObserverErrorCancelsCall() throws Throwable {
MOCK_SERVICE.addResponse(new Money[] {DEFAULT_RESPONSE});
mockService.addResponse(new Money[] {DEFAULT_RESPONSE});
final RuntimeException expectedCause = new RuntimeException("some error");
final SettableApiFuture<Throwable> actualErrorF = SettableApiFuture.create();

Expand Down Expand Up @@ -332,7 +330,7 @@ protected void onCompleteImpl() {

@Test
public void testBlockingServerStreaming() {
MOCK_SERVICE.addResponse(new Money[] {DEFAULT_RESPONSE});
mockService.addResponse(new Money[] {DEFAULT_RESPONSE});
Color request = Color.newBuilder().setRed(0.5f).build();
ServerStream<Money> response = streamingCallable.call(request);
List<Money> responseData = Lists.newArrayList(response);
Expand All @@ -344,7 +342,7 @@ public void testBlockingServerStreaming() {
// This test ensures that the server-side streaming does not exceed the timeout value
@Test
public void testDeadlineExceededServerStreaming() throws InterruptedException {
MOCK_SERVICE.addResponse(
mockService.addResponse(
new Money[] {DEFAULT_RESPONSE, DEFAULTER_RESPONSE}, java.time.Duration.ofSeconds(5));
Color request = Color.newBuilder().setRed(0.5f).build();
CountDownLatch latch = new CountDownLatch(1);
Expand Down

0 comments on commit 342467d

Please sign in to comment.