From 4c182337c1a56dfa0cd13a4a1a424de1fbf5baa0 Mon Sep 17 00:00:00 2001 From: Rahul Kesharwani Date: Wed, 3 Apr 2019 12:12:43 +0530 Subject: [PATCH 1/9] Batcher implementation with Synchronized `Batcher#flush`(without triggers) - This implementation does not include any triggers for sending received elements for batching.(This will be added in followup PRs). - User has to explicitly call the `Batcher#flush()` to send the accumulated request for batching. - Here `BatcherImpl` expects `v2.BatchingDescriptor`, `v2.BatchingCallSetting`, UnaryCallable and a request prototype containing repetitive data which would be copied over to each batch along with the elements. - `v2.BatchingCallSettings` is an extension from existing `BatchingCallSettings`. --- .../api/gax/batching/v2/BatcherImpl.java | 189 +++++++++++++++++ .../gax/batching/v2/BatchingCallSettings.java | 141 +++++++++++++ .../api/gax/batching/v2/BatcherImplTest.java | 197 ++++++++++++++++++ .../batching/v2/BatchingCallSettingsTest.java | 126 +++++++++++ .../api/gax/rpc/testing/FakeBatchableApi.java | 50 +++++ 5 files changed, 703 insertions(+) create mode 100644 gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java create mode 100644 gax/src/main/java/com/google/api/gax/batching/v2/BatchingCallSettings.java create mode 100644 gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java create mode 100644 gax/src/test/java/com/google/api/gax/batching/v2/BatchingCallSettingsTest.java diff --git a/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java b/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java new file mode 100644 index 000000000..462dd852f --- /dev/null +++ b/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java @@ -0,0 +1,189 @@ +/* + * Copyright 2019 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.batching.v2; + +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.api.core.SettableApiFuture; +import com.google.api.gax.rpc.UnaryCallable; +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.LockSupport; + +/** + * Queues up the elements until {@link #flush()} is called, Once batching is finished returned + * future gets resolves. + * + *

This class is not thread-safe, and requires calling classes to make it thread safe. + */ +public class BatcherImpl + implements Batcher { + + /** The amount of time to wait before checking responses are received or not. */ + private static final long DEFAULT_FINISH_WAIT_NANOS = 250_000_000; + + private final BatchingDescriptor + batchingDescriptor; + private final UnaryCallable callable; + private final RequestT prototype; + + private final AtomicInteger numOfRpcs = new AtomicInteger(0); + private Batch batch; + private boolean isClosed = false; + + private BatcherImpl(Builder builder) { + this.prototype = Preconditions.checkNotNull(builder.prototype); + this.callable = Preconditions.checkNotNull(builder.unaryCallable); + this.batchingDescriptor = Preconditions.checkNotNull(builder.batchingDescriptor); + } + + /** Builder for a BatcherImpl. */ + public static class Builder { + private BatchingDescriptor batchingDescriptor; + private UnaryCallable unaryCallable; + private RequestT prototype; + + private Builder() {} + + public Builder setBatchingDescriptor( + BatchingDescriptor batchingDescriptor) { + this.batchingDescriptor = batchingDescriptor; + return this; + } + + public Builder setUnaryCallable( + UnaryCallable unaryCallable) { + this.unaryCallable = unaryCallable; + return this; + } + + public Builder setPrototype(RequestT prototype) { + this.prototype = prototype; + return this; + } + + public BatcherImpl build() { + return new BatcherImpl<>(this); + } + } + + public static + Builder newBuilder() { + return new Builder<>(); + } + + /** {@inheritDoc} */ + @Override + public ApiFuture add(final ElementT element) { + Preconditions.checkState(!isClosed, "Cannot perform batching on a closed connection"); + + if (batch == null) { + batch = new Batch(batchingDescriptor.newRequestBuilder(prototype)); + } + + SettableApiFuture result = SettableApiFuture.create(); + batch.add(element, result); + return result; + } + + /** {@inheritDoc} */ + @Override + public void flush() throws InterruptedException { + sendBatch(); + while (numOfRpcs.get() > 0) { + LockSupport.parkNanos(DEFAULT_FINISH_WAIT_NANOS); + } + } + + /** sends accumulated elements asynchronously for batching. */ + private void sendBatch() { + if (batch == null) { + return; + } + final Batch accumulatedBatch = batch; + batch = null; + numOfRpcs.incrementAndGet(); + + final ApiFuture batchResponse = + callable.futureCall(accumulatedBatch.builder.build()); + + ApiFutures.addCallback( + batchResponse, + new ApiFutureCallback() { + @Override + public void onSuccess(ResponseT response) { + batchingDescriptor.splitResponse(response, accumulatedBatch.results); + onCompletion(); + } + + @Override + public void onFailure(Throwable throwable) { + batchingDescriptor.splitException(throwable, accumulatedBatch.results); + onCompletion(); + } + }, + directExecutor()); + } + + private void onCompletion() { + numOfRpcs.decrementAndGet(); + } + + /** {@inheritDoc} */ + @Override + public void close() throws InterruptedException { + isClosed = true; + flush(); + } + + /** + * This class represent one logical Batch. It accumulates all the elements and it's corresponding + * future element results for one batch. + */ + class Batch { + private final RequestBuilder builder; + private final List> results; + + private Batch(RequestBuilder builder) { + this.builder = builder; + this.results = new ArrayList<>(); + } + + void add(ElementT element, SettableApiFuture result) { + builder.add(element); + results.add(result); + } + } +} diff --git a/gax/src/main/java/com/google/api/gax/batching/v2/BatchingCallSettings.java b/gax/src/main/java/com/google/api/gax/batching/v2/BatchingCallSettings.java new file mode 100644 index 000000000..20044bc6d --- /dev/null +++ b/gax/src/main/java/com/google/api/gax/batching/v2/BatchingCallSettings.java @@ -0,0 +1,141 @@ +/* + * Copyright 2019 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.batching.v2; + +import com.google.api.core.BetaApi; +import com.google.api.core.InternalExtensionOnly; +import com.google.api.gax.batching.BatchingSettings; +import com.google.api.gax.retrying.RetrySettings; +import com.google.api.gax.rpc.StatusCode; +import com.google.api.gax.rpc.UnaryCallSettings; +import com.google.common.base.Preconditions; +import java.util.Set; + +/** + * This is an extension to {@link com.google.api.gax.rpc.BatchingCallSettings}, to support for + * {@link ElementT} and {@link ElementResultT} types. + * + * @param The request type to perform batching. + * @param The response type of an entry object. + * @param The request wrapper type which bundles {@link ElementT}. + * @param The response wrapper type which bundles {@link ElementResultT}. + */ +@BetaApi("The surface for batching is not stable yet and may change in the future.") +@InternalExtensionOnly +public final class BatchingCallSettings + extends UnaryCallSettings { + private final BatchingDescriptor + batchingDescriptor; + private final BatchingSettings batchingSettings; + + private BatchingCallSettings(Builder builder) { + super(builder); + this.batchingDescriptor = builder.batchingDescriptor; + this.batchingSettings = Preconditions.checkNotNull(builder.batchingSettings); + } + + public BatchingDescriptor getBatchingDescriptor() { + return batchingDescriptor; + } + + public BatchingSettings getBatchingSettings() { + return batchingSettings; + } + + public static + Builder newBuilder( + BatchingDescriptor batchingDescriptor) { + return new Builder<>(batchingDescriptor); + } + + @Override + public final Builder toBuilder() { + return new Builder<>(this); + } + + public static class Builder + extends UnaryCallSettings.Builder { + + private BatchingDescriptor batchingDescriptor; + private BatchingSettings batchingSettings; + + private Builder( + BatchingDescriptor batchingDescriptor) { + this.batchingDescriptor = batchingDescriptor; + } + + private Builder(BatchingCallSettings settings) { + super(settings); + this.batchingDescriptor = settings.batchingDescriptor; + this.batchingSettings = settings.batchingSettings; + } + + public BatchingDescriptor + getBatchingDescriptor() { + return batchingDescriptor; + } + + public Builder setBatchingSettings( + BatchingSettings batchingSettings) { + this.batchingSettings = batchingSettings; + return this; + } + + public BatchingSettings getBatchingSettings() { + return batchingSettings; + } + + @Override + public Builder setRetryableCodes( + Set retryableCodes) { + super.setRetryableCodes(retryableCodes); + return this; + } + + @Override + public Builder setRetryableCodes( + StatusCode.Code... codes) { + super.setRetryableCodes(codes); + return this; + } + + @Override + public Builder setRetrySettings( + RetrySettings retrySettings) { + super.setRetrySettings(retrySettings); + return this; + } + + @Override + public BatchingCallSettings build() { + return new BatchingCallSettings<>(this); + } + } +} diff --git a/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java b/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java new file mode 100644 index 000000000..57976f6a0 --- /dev/null +++ b/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java @@ -0,0 +1,197 @@ +/* + * Copyright 2019 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.batching.v2; + +import static com.google.api.gax.rpc.testing.FakeBatchableApi.SQUARER_BATCHING_DESC_V2; +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; +import com.google.api.core.SettableApiFuture; +import com.google.api.gax.rpc.UnaryCallable; +import com.google.api.gax.rpc.testing.FakeBatchableApi.LabeledIntList; +import com.google.api.gax.rpc.testing.FakeBatchableApi.LabeledIntSquarerCallable; +import com.google.common.truth.Truth; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutionException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +@RunWith(JUnit4.class) +public class BatcherImplTest { + + private Batcher underTest; + private LabeledIntList labeledIntList = new LabeledIntList("Default"); + @Mock private UnaryCallable> unaryCallable; + @Mock private BatchingDescriptor> mockDescriptor; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + underTest = + BatcherImpl.>newBuilder() + .setPrototype(labeledIntList) + .setUnaryCallable(unaryCallable) + .setBatchingDescriptor(mockDescriptor) + .build(); + } + + @After + public void tearDown() throws Exception { + underTest.close(); + } + + /** Verifies element result futures are resolved once RPC is completed. */ + @Test + public void testBatchingSuccess() throws Exception { + when(mockDescriptor.newRequestBuilder(any(LabeledIntList.class))) + .thenReturn(SQUARER_BATCHING_DESC_V2.newRequestBuilder(labeledIntList)); + when(unaryCallable.futureCall(any(LabeledIntList.class))) + .thenReturn(ApiFutures.immediateFuture(Collections.singletonList(16))); + + doAnswer( + new Answer() { + @Override + public Void answer(InvocationOnMock invocation) { + List rpcResponse = invocation.getArgument(0); + List> apiFutures = invocation.getArgument(1); + apiFutures.get(0).set(rpcResponse.get(0)); + return null; + } + }) + .when(mockDescriptor) + .splitResponse(Mockito.anyList(), Mockito.>anyList()); + + ApiFuture result = underTest.add(4); + underTest.flush(); + assertThat(result.isDone()).isTrue(); + assertThat(result.get()).isEqualTo(16); + + verify(mockDescriptor, times(1)).newRequestBuilder(labeledIntList); + verify(mockDescriptor, times(1)) + .splitResponse(Mockito.anyList(), Mockito.>anyList()); + verify(unaryCallable, times(1)).futureCall(any(LabeledIntList.class)); + } + + /** Verifies exception occurred at RPC is propagated to element results */ + @Test(expected = ExecutionException.class) + public void testBatchingFailed() throws Exception { + final Exception exception = new RuntimeException(); + when(mockDescriptor.newRequestBuilder(any(LabeledIntList.class))) + .thenReturn(SQUARER_BATCHING_DESC_V2.newRequestBuilder(labeledIntList)); + when(unaryCallable.futureCall(any(LabeledIntList.class))) + .thenReturn(ApiFutures.>immediateFailedFuture(exception)); + doAnswer( + new Answer() { + @Override + public Void answer(InvocationOnMock invocation) { + List> apiFutures = invocation.getArgument(1); + apiFutures.get(0).setException(exception); + return null; + } + }) + .when(mockDescriptor) + .splitException(any(Throwable.class), Mockito.>anyList()); + ApiFuture failedResult = underTest.add(5); + underTest.flush(); + verify(mockDescriptor, times(1)).newRequestBuilder(labeledIntList); + + verify(mockDescriptor, times(1)) + .splitException(any(Throwable.class), Mockito.>anyList()); + verify(unaryCallable, times(1)).futureCall(any(LabeledIntList.class)); + assertThat(failedResult.isDone()).isTrue(); + try { + failedResult.get(); + } catch (InterruptedException | ExecutionException e) { + Truth.assertThat(e.getCause()).isInstanceOf(RuntimeException.class); + throw e; + } + } + + /** Tests accumulated element are resolved when {@link Batcher#flush()} is called. */ + @Test + public void testBatchingWithCallable() throws Exception { + underTest = newBatcherInstance(); + int limit = 100; + int batch = 10; + List> resultList = new ArrayList<>(limit); + for (int i = 0; i <= limit; i++) { + resultList.add(underTest.add(i)); + if (i % batch == 0) { + underTest.flush(); + for (int j = i - batch; j >= 0 && j < i; j++) { + Truth.assertThat(resultList.get(j).isDone()).isTrue(); + Truth.assertThat(resultList.get(j).get()).isEqualTo(j * j); + } + } + } + } + + /** Element results are resolved after batch is closed. */ + @Test + public void testBatcherClose() throws Exception { + ApiFuture result; + try (Batcher batcher = newBatcherInstance()) { + result = batcher.add(5); + } + assertThat(result.isDone()).isTrue(); + assertThat(result.get()).isEqualTo(25); + } + + /** Validates exception when batch is called after {@link Batcher#close()}. */ + @Test(expected = IllegalStateException.class) + public void testWhenNoElementAdded() throws Exception { + underTest.close(); + underTest.add(1); + } + + private Batcher newBatcherInstance() { + return BatcherImpl.>newBuilder() + .setPrototype(labeledIntList) + .setUnaryCallable(new LabeledIntSquarerCallable()) + .setBatchingDescriptor(SQUARER_BATCHING_DESC_V2) + .build(); + } +} diff --git a/gax/src/test/java/com/google/api/gax/batching/v2/BatchingCallSettingsTest.java b/gax/src/test/java/com/google/api/gax/batching/v2/BatchingCallSettingsTest.java new file mode 100644 index 000000000..ca63efe62 --- /dev/null +++ b/gax/src/test/java/com/google/api/gax/batching/v2/BatchingCallSettingsTest.java @@ -0,0 +1,126 @@ +/* + * Copyright 2016 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.batching.v2; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.api.gax.batching.BatchingSettings; +import com.google.api.gax.retrying.RetrySettings; +import com.google.api.gax.rpc.StatusCode; +import com.google.common.collect.Sets; +import java.util.Set; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mockito; + +@RunWith(JUnit4.class) +public class BatchingCallSettingsTest { + + @Test + public void testEmptyBuilder() { + @SuppressWarnings("unchecked") + BatchingDescriptor batchingDescriptor = + Mockito.mock(BatchingDescriptor.class); + BatchingCallSettings.Builder builder = + BatchingCallSettings.newBuilder(batchingDescriptor); + + assertThat(builder.getBatchingDescriptor()).isSameAs(batchingDescriptor); + assertThat(builder.getBatchingSettings()).isNull(); + assertThat(builder.getRetryableCodes().size()).isEqualTo(0); + assertThat(builder.getRetrySettings()).isNotNull(); + + BatchingSettings batchingSettings = + BatchingSettings.newBuilder().setElementCountThreshold(1L).build(); + builder.setBatchingSettings(batchingSettings); + BatchingCallSettings settings = builder.build(); + + assertThat(settings.getBatchingDescriptor()).isSameAs(batchingDescriptor); + assertThat(settings.getBatchingSettings()).isSameAs(batchingSettings); + assertThat(settings.getRetryableCodes().size()).isEqualTo(0); + assertThat(settings.getRetrySettings()).isNotNull(); + } + + @Test + public void testBuilder() { + @SuppressWarnings("unchecked") + BatchingDescriptor batchingDescriptor = + Mockito.mock(BatchingDescriptor.class); + BatchingCallSettings.Builder builder = + BatchingCallSettings.newBuilder(batchingDescriptor); + + BatchingSettings batchingSettings = + BatchingSettings.newBuilder().setElementCountThreshold(1L).build(); + Set retryCodes = Sets.newHashSet(StatusCode.Code.UNAVAILABLE); + RetrySettings retrySettings = RetrySettings.newBuilder().build(); + + builder.setBatchingSettings(batchingSettings); + builder.setRetryableCodes(retryCodes); + builder.setRetrySettings(retrySettings); + + assertThat(builder.getBatchingDescriptor()).isSameAs(batchingDescriptor); + assertThat(builder.getBatchingSettings()).isSameAs(batchingSettings); + assertThat(builder.getRetryableCodes().size()).isEqualTo(1); + assertThat(builder.getRetrySettings()).isSameAs(retrySettings); + + BatchingCallSettings settings = builder.build(); + + assertThat(settings.getBatchingDescriptor()).isSameAs(batchingDescriptor); + assertThat(settings.getBatchingSettings()).isSameAs(batchingSettings); + assertThat(settings.getRetryableCodes().size()).isEqualTo(1); + assertThat(settings.getRetrySettings()).isSameAs(retrySettings); + } + + @Test + public void testBuilderFromSettings() throws Exception { + @SuppressWarnings("unchecked") + BatchingDescriptor batchingDescriptor = + Mockito.mock(BatchingDescriptor.class); + BatchingCallSettings.Builder builder = + BatchingCallSettings.newBuilder(batchingDescriptor); + + BatchingSettings batchingSettings = + BatchingSettings.newBuilder().setElementCountThreshold(1L).build(); + Set retryCodes = Sets.newHashSet(StatusCode.Code.UNAVAILABLE); + RetrySettings retrySettings = RetrySettings.newBuilder().build(); + + builder.setBatchingSettings(batchingSettings); + builder.setRetryableCodes(retryCodes); + builder.setRetrySettings(retrySettings); + + BatchingCallSettings settings = builder.build(); + BatchingCallSettings.Builder newBuilder = settings.toBuilder(); + + assertThat(newBuilder.getBatchingDescriptor()).isSameAs(batchingDescriptor); + assertThat(newBuilder.getBatchingSettings()).isSameAs(batchingSettings); + assertThat(newBuilder.getRetryableCodes().size()).isEqualTo(1); + assertThat(newBuilder.getRetrySettings()).isSameAs(retrySettings); + } +} diff --git a/gax/src/test/java/com/google/api/gax/rpc/testing/FakeBatchableApi.java b/gax/src/test/java/com/google/api/gax/rpc/testing/FakeBatchableApi.java index a25aa8857..ccaeca651 100644 --- a/gax/src/test/java/com/google/api/gax/rpc/testing/FakeBatchableApi.java +++ b/gax/src/test/java/com/google/api/gax/rpc/testing/FakeBatchableApi.java @@ -32,6 +32,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; import com.google.api.core.InternalApi; +import com.google.api.core.SettableApiFuture; import com.google.api.gax.batching.PartitionKey; import com.google.api.gax.batching.RequestBuilder; import com.google.api.gax.rpc.ApiCallContext; @@ -82,6 +83,10 @@ public int hashCode() { result = 31 * result + ints.hashCode(); return result; } + + public LabeledIntList toBuilder() { + return new LabeledIntList(label, new ArrayList<>(ints)); + } } public static LabeledIntSquarerCallable callLabeledIntSquarer = new LabeledIntSquarerCallable(); @@ -171,4 +176,49 @@ public long countBytes(LabeledIntList request) { return Math.min(counter, 5); } } + + public static SquarerBatchingDescriptorV2 SQUARER_BATCHING_DESC_V2 = + new SquarerBatchingDescriptorV2(); + + public static class SquarerBatchingDescriptorV2 + implements com.google.api.gax.batching.v2.BatchingDescriptor< + Integer, Integer, LabeledIntList, List> { + + @Override + public com.google.api.gax.batching.v2.RequestBuilder newRequestBuilder( + final LabeledIntList prototype) { + return new com.google.api.gax.batching.v2.RequestBuilder() { + final LabeledIntList labelList = prototype.toBuilder(); + + @Override + public void add(Integer element) { + labelList.ints.add(element); + } + + @Override + public LabeledIntList build() { + return labelList; + } + }; + } + + @Override + public void splitResponse(List batchResponse, List> batch) { + for (int i = 0; i < batchResponse.size(); i++) { + batch.get(i).set(batchResponse.get(i)); + } + } + + @Override + public void splitException(Throwable throwable, List> batch) { + for (SettableApiFuture result : batch) { + result.setException(throwable); + } + } + + @Override + public long countBytes(Integer element) { + return String.valueOf(element).length(); + } + } } From 7de2c9939b65d72c781bc57262d90077bcd2503e Mon Sep 17 00:00:00 2001 From: Rahul Kesharwani Date: Thu, 30 May 2019 17:06:02 +0530 Subject: [PATCH 2/9] Addressing feedback comments for `BatcherImpl.java` and removing `BatchingCallSettings.java` --- .../api/gax/batching/v2/BatcherImpl.java | 70 +++-- .../gax/batching/v2/BatchingCallSettings.java | 141 ---------- .../api/gax/batching/v2/BatcherImplTest.java | 243 ++++++++++-------- .../batching/v2/BatchingCallSettingsTest.java | 126 --------- .../api/gax/rpc/testing/FakeBatchableApi.java | 8 +- 5 files changed, 190 insertions(+), 398 deletions(-) delete mode 100644 gax/src/main/java/com/google/api/gax/batching/v2/BatchingCallSettings.java delete mode 100644 gax/src/test/java/com/google/api/gax/batching/v2/BatchingCallSettingsTest.java diff --git a/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java b/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java index 462dd852f..be1512a2e 100644 --- a/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java +++ b/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java @@ -29,44 +29,52 @@ */ package com.google.api.gax.batching.v2; +import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; +import com.google.api.core.BetaApi; +import com.google.api.core.InternalExtensionOnly; import com.google.api.core.SettableApiFuture; import com.google.api.gax.rpc.UnaryCallable; import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.LockSupport; /** - * Queues up the elements until {@link #flush()} is called, Once batching is finished returned + * Queues up the elements until {@link #flush()} is called, once batching is finished returned * future gets resolves. * - *

This class is not thread-safe, and requires calling classes to make it thread safe. + *

This class is not thread-safe, and expects to be used from a single thread. */ +@BetaApi("The surface for batching is not stable yet and may change in the future.") +@InternalExtensionOnly("For google-cloud-java client use only.") public class BatcherImpl implements Batcher { /** The amount of time to wait before checking responses are received or not. */ - private static final long DEFAULT_FINISH_WAIT_NANOS = 250_000_000; - private final BatchingDescriptor batchingDescriptor; + private final UnaryCallable callable; private final RequestT prototype; private final AtomicInteger numOfRpcs = new AtomicInteger(0); - private Batch batch; + private final AtomicBoolean isFlushed = new AtomicBoolean(false); + private final Semaphore semaphore = new Semaphore(0); + private Batch batch; private boolean isClosed = false; private BatcherImpl(Builder builder) { - this.prototype = Preconditions.checkNotNull(builder.prototype); - this.callable = Preconditions.checkNotNull(builder.unaryCallable); - this.batchingDescriptor = Preconditions.checkNotNull(builder.batchingDescriptor); + this.prototype = checkNotNull(builder.prototype, "RequestPrototype cannot be null."); + this.callable = checkNotNull(builder.unaryCallable, "UnaryCallable cannot be null."); + this.batchingDescriptor = + checkNotNull(builder.batchingDescriptor, "BatchingDescriptor cannot be null."); } /** Builder for a BatcherImpl. */ @@ -106,11 +114,11 @@ Builder newBuilder() { /** {@inheritDoc} */ @Override - public ApiFuture add(final ElementT element) { - Preconditions.checkState(!isClosed, "Cannot perform batching on a closed connection"); + public ApiFuture add(ElementT element) { + Preconditions.checkState(!isClosed, "Cannot add elements on a closed batcher."); if (batch == null) { - batch = new Batch(batchingDescriptor.newRequestBuilder(prototype)); + batch = new Batch<>(batchingDescriptor.newRequestBuilder(prototype)); } SettableApiFuture result = SettableApiFuture.create(); @@ -122,17 +130,18 @@ public ApiFuture add(final ElementT element) { @Override public void flush() throws InterruptedException { sendBatch(); - while (numOfRpcs.get() > 0) { - LockSupport.parkNanos(DEFAULT_FINISH_WAIT_NANOS); + isFlushed.compareAndSet(false, true); + if (numOfRpcs.get() > 0) { + semaphore.acquire(); } } - /** sends accumulated elements asynchronously for batching. */ + /** Sends accumulated elements asynchronously for batching. */ private void sendBatch() { if (batch == null) { return; } - final Batch accumulatedBatch = batch; + final Batch accumulatedBatch = batch; batch = null; numOfRpcs.incrementAndGet(); @@ -144,21 +153,38 @@ private void sendBatch() { new ApiFutureCallback() { @Override public void onSuccess(ResponseT response) { - batchingDescriptor.splitResponse(response, accumulatedBatch.results); - onCompletion(); + try { + batchingDescriptor.splitResponse(response, accumulatedBatch.results); + } catch (Throwable ex) { + for (SettableApiFuture result : accumulatedBatch.results) { + result.setException(ex); + } + } finally { + onCompletion(); + } } @Override public void onFailure(Throwable throwable) { - batchingDescriptor.splitException(throwable, accumulatedBatch.results); - onCompletion(); + try { + batchingDescriptor.splitException(throwable, accumulatedBatch.results); + } catch (Throwable ex) { + for (SettableApiFuture result : accumulatedBatch.results) { + result.setException(ex); + } + } finally { + onCompletion(); + } } }, directExecutor()); } private void onCompletion() { - numOfRpcs.decrementAndGet(); + if (numOfRpcs.decrementAndGet() == 0 && isFlushed.get()) { + semaphore.release(); + isFlushed.compareAndSet(true, false); + } } /** {@inheritDoc} */ @@ -172,7 +198,7 @@ public void close() throws InterruptedException { * This class represent one logical Batch. It accumulates all the elements and it's corresponding * future element results for one batch. */ - class Batch { + private static class Batch { private final RequestBuilder builder; private final List> results; diff --git a/gax/src/main/java/com/google/api/gax/batching/v2/BatchingCallSettings.java b/gax/src/main/java/com/google/api/gax/batching/v2/BatchingCallSettings.java deleted file mode 100644 index 20044bc6d..000000000 --- a/gax/src/main/java/com/google/api/gax/batching/v2/BatchingCallSettings.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Copyright 2019 Google LLC - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google LLC nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ -package com.google.api.gax.batching.v2; - -import com.google.api.core.BetaApi; -import com.google.api.core.InternalExtensionOnly; -import com.google.api.gax.batching.BatchingSettings; -import com.google.api.gax.retrying.RetrySettings; -import com.google.api.gax.rpc.StatusCode; -import com.google.api.gax.rpc.UnaryCallSettings; -import com.google.common.base.Preconditions; -import java.util.Set; - -/** - * This is an extension to {@link com.google.api.gax.rpc.BatchingCallSettings}, to support for - * {@link ElementT} and {@link ElementResultT} types. - * - * @param The request type to perform batching. - * @param The response type of an entry object. - * @param The request wrapper type which bundles {@link ElementT}. - * @param The response wrapper type which bundles {@link ElementResultT}. - */ -@BetaApi("The surface for batching is not stable yet and may change in the future.") -@InternalExtensionOnly -public final class BatchingCallSettings - extends UnaryCallSettings { - private final BatchingDescriptor - batchingDescriptor; - private final BatchingSettings batchingSettings; - - private BatchingCallSettings(Builder builder) { - super(builder); - this.batchingDescriptor = builder.batchingDescriptor; - this.batchingSettings = Preconditions.checkNotNull(builder.batchingSettings); - } - - public BatchingDescriptor getBatchingDescriptor() { - return batchingDescriptor; - } - - public BatchingSettings getBatchingSettings() { - return batchingSettings; - } - - public static - Builder newBuilder( - BatchingDescriptor batchingDescriptor) { - return new Builder<>(batchingDescriptor); - } - - @Override - public final Builder toBuilder() { - return new Builder<>(this); - } - - public static class Builder - extends UnaryCallSettings.Builder { - - private BatchingDescriptor batchingDescriptor; - private BatchingSettings batchingSettings; - - private Builder( - BatchingDescriptor batchingDescriptor) { - this.batchingDescriptor = batchingDescriptor; - } - - private Builder(BatchingCallSettings settings) { - super(settings); - this.batchingDescriptor = settings.batchingDescriptor; - this.batchingSettings = settings.batchingSettings; - } - - public BatchingDescriptor - getBatchingDescriptor() { - return batchingDescriptor; - } - - public Builder setBatchingSettings( - BatchingSettings batchingSettings) { - this.batchingSettings = batchingSettings; - return this; - } - - public BatchingSettings getBatchingSettings() { - return batchingSettings; - } - - @Override - public Builder setRetryableCodes( - Set retryableCodes) { - super.setRetryableCodes(retryableCodes); - return this; - } - - @Override - public Builder setRetryableCodes( - StatusCode.Code... codes) { - super.setRetryableCodes(codes); - return this; - } - - @Override - public Builder setRetrySettings( - RetrySettings retrySettings) { - super.setRetrySettings(retrySettings); - return this; - } - - @Override - public BatchingCallSettings build() { - return new BatchingCallSettings<>(this); - } - } -} diff --git a/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java b/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java index 57976f6a0..7e2be8f83 100644 --- a/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java +++ b/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java @@ -30,167 +30,200 @@ package com.google.api.gax.batching.v2; import static com.google.api.gax.rpc.testing.FakeBatchableApi.SQUARER_BATCHING_DESC_V2; +import static com.google.api.gax.rpc.testing.FakeBatchableApi.callLabeledIntSquarer; import static com.google.common.truth.Truth.assertThat; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; import com.google.api.core.SettableApiFuture; import com.google.api.gax.rpc.UnaryCallable; import com.google.api.gax.rpc.testing.FakeBatchableApi.LabeledIntList; -import com.google.api.gax.rpc.testing.FakeBatchableApi.LabeledIntSquarerCallable; -import com.google.common.truth.Truth; -import java.util.ArrayList; -import java.util.Collections; +import java.util.Arrays; import java.util.List; import java.util.concurrent.ExecutionException; -import org.junit.After; -import org.junit.Before; +import java.util.concurrent.Future; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.Mock; import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; @RunWith(JUnit4.class) public class BatcherImplTest { + @Rule public MockitoRule rule = MockitoJUnit.rule(); + @Mock private UnaryCallable> mockUnaryCallable; + @Mock private BatchingDescriptor> mockDescriptor; + private Batcher underTest; private LabeledIntList labeledIntList = new LabeledIntList("Default"); - @Mock private UnaryCallable> unaryCallable; - @Mock private BatchingDescriptor> mockDescriptor; - @Before - public void setUp() { - MockitoAnnotations.initMocks(this); - underTest = - BatcherImpl.>newBuilder() - .setPrototype(labeledIntList) - .setUnaryCallable(unaryCallable) - .setBatchingDescriptor(mockDescriptor) - .build(); + /** Tests accumulated element are resolved when {@link Batcher#flush()} is called. */ + @Test + public void testResultsAreResolvedAfterFlush() throws Exception { + underTest = newBatcherInstance(); + Future result = underTest.add(4); + assertThat(result.isDone()).isFalse(); + underTest.flush(); + assertThat(result.isDone()).isTrue(); + assertThat(result.get()).isEqualTo(16); + + Future anotherResult = underTest.add(5); + assertThat(anotherResult.isDone()).isFalse(); + } + + /** Element results are resolved after batch is closed. */ + @Test + public void testWhenBatcherIsClose() throws Exception { + Future result; + try (Batcher batcher = newBatcherInstance()) { + result = batcher.add(5); + } + assertThat(result.isDone()).isTrue(); + assertThat(result.get()).isEqualTo(25); } - @After - public void tearDown() throws Exception { + /** Validates exception when batch is called after {@link Batcher#close()}. */ + @Test + public void testNoElementAdditionAfterClose() throws Exception { + underTest = newBatcherInstance(); underTest.close(); + Throwable actualError = null; + try { + underTest.add(1); + } catch (Exception ex) { + actualError = ex; + } + assertThat(actualError).isInstanceOf(IllegalStateException.class); + assertThat(actualError.getMessage()).matches("Cannot add elements on a closed batcher."); } - /** Verifies element result futures are resolved once RPC is completed. */ + /** Verifies unaryCallable is being called with a batch. */ @Test - public void testBatchingSuccess() throws Exception { - when(mockDescriptor.newRequestBuilder(any(LabeledIntList.class))) - .thenReturn(SQUARER_BATCHING_DESC_V2.newRequestBuilder(labeledIntList)); - when(unaryCallable.futureCall(any(LabeledIntList.class))) - .thenReturn(ApiFutures.immediateFuture(Collections.singletonList(16))); - - doAnswer( - new Answer() { - @Override - public Void answer(InvocationOnMock invocation) { - List rpcResponse = invocation.getArgument(0); - List> apiFutures = invocation.getArgument(1); - apiFutures.get(0).set(rpcResponse.get(0)); - return null; - } - }) - .when(mockDescriptor) - .splitResponse(Mockito.anyList(), Mockito.>anyList()); + public void testResultsAfterRPCSucceed() throws Exception { + underTest = + BatcherImpl.>newBuilder() + .setBatchingDescriptor(SQUARER_BATCHING_DESC_V2) + .setUnaryCallable(mockUnaryCallable) + .setPrototype(labeledIntList) + .build(); + when(mockUnaryCallable.futureCall(any(LabeledIntList.class))) + .thenReturn(ApiFutures.immediateFuture(Arrays.asList(16, 25))); - ApiFuture result = underTest.add(4); + Future result = underTest.add(4); + Future anotherResult = underTest.add(5); underTest.flush(); + assertThat(result.isDone()).isTrue(); assertThat(result.get()).isEqualTo(16); - - verify(mockDescriptor, times(1)).newRequestBuilder(labeledIntList); - verify(mockDescriptor, times(1)) - .splitResponse(Mockito.anyList(), Mockito.>anyList()); - verify(unaryCallable, times(1)).futureCall(any(LabeledIntList.class)); + assertThat(anotherResult.get()).isEqualTo(25); + verify(mockUnaryCallable, times(1)).futureCall(any(LabeledIntList.class)); } /** Verifies exception occurred at RPC is propagated to element results */ - @Test(expected = ExecutionException.class) - public void testBatchingFailed() throws Exception { - final Exception exception = new RuntimeException(); - when(mockDescriptor.newRequestBuilder(any(LabeledIntList.class))) - .thenReturn(SQUARER_BATCHING_DESC_V2.newRequestBuilder(labeledIntList)); - when(unaryCallable.futureCall(any(LabeledIntList.class))) - .thenReturn(ApiFutures.>immediateFailedFuture(exception)); - doAnswer( - new Answer() { - @Override - public Void answer(InvocationOnMock invocation) { - List> apiFutures = invocation.getArgument(1); - apiFutures.get(0).setException(exception); - return null; - } - }) - .when(mockDescriptor) - .splitException(any(Throwable.class), Mockito.>anyList()); - ApiFuture failedResult = underTest.add(5); - underTest.flush(); - verify(mockDescriptor, times(1)).newRequestBuilder(labeledIntList); + @Test + public void testResultFailureAfterRPCFailure() throws Exception { + underTest = + BatcherImpl.>newBuilder() + .setBatchingDescriptor(SQUARER_BATCHING_DESC_V2) + .setUnaryCallable(mockUnaryCallable) + .setPrototype(labeledIntList) + .build(); + final Exception fakeError = new RuntimeException(); - verify(mockDescriptor, times(1)) - .splitException(any(Throwable.class), Mockito.>anyList()); - verify(unaryCallable, times(1)).futureCall(any(LabeledIntList.class)); + when(mockUnaryCallable.futureCall(any(LabeledIntList.class))) + .thenReturn(ApiFutures.>immediateFailedFuture(fakeError)); + + Future failedResult = underTest.add(5); + underTest.flush(); assertThat(failedResult.isDone()).isTrue(); + Throwable actualError = null; try { failedResult.get(); - } catch (InterruptedException | ExecutionException e) { - Truth.assertThat(e.getCause()).isInstanceOf(RuntimeException.class); - throw e; + } catch (InterruptedException | ExecutionException ex) { + actualError = ex; } + + assertThat(actualError.getCause()).isSameAs(fakeError); + verify(mockUnaryCallable, times(1)).futureCall(any(LabeledIntList.class)); } - /** Tests accumulated element are resolved when {@link Batcher#flush()} is called. */ + /** Tests results are resolves when {@link BatchingDescriptor#splitResponse} throws exception. */ @Test - public void testBatchingWithCallable() throws Exception { - underTest = newBatcherInstance(); - int limit = 100; - int batch = 10; - List> resultList = new ArrayList<>(limit); - for (int i = 0; i <= limit; i++) { - resultList.add(underTest.add(i)); - if (i % batch == 0) { - underTest.flush(); - for (int j = i - batch; j >= 0 && j < i; j++) { - Truth.assertThat(resultList.get(j).isDone()).isTrue(); - Truth.assertThat(resultList.get(j).get()).isEqualTo(j * j); - } - } + public void testExceptionDescriptorResultHandling() throws InterruptedException { + underTest = + BatcherImpl.>newBuilder() + .setBatchingDescriptor(mockDescriptor) + .setUnaryCallable(callLabeledIntSquarer) + .setPrototype(labeledIntList) + .build(); + final RuntimeException fakeError = new RuntimeException("internal exception"); + + when(mockDescriptor.newRequestBuilder(any(LabeledIntList.class))) + .thenReturn(SQUARER_BATCHING_DESC_V2.newRequestBuilder(labeledIntList)); + doThrow(fakeError) + .when(mockDescriptor) + .splitResponse(Mockito.anyList(), Mockito.>anyList()); + + Future result = underTest.add(2); + underTest.flush(); + Throwable actualError = null; + try { + result.get(); + } catch (ExecutionException ex) { + actualError = ex; } + + assertThat(actualError.getCause()).isSameAs(fakeError); + verify(mockDescriptor) + .splitResponse(Mockito.anyList(), Mockito.>anyList()); } - /** Element results are resolved after batch is closed. */ + /** Tests results are resolves when {@link BatchingDescriptor#splitException} throws exception. */ @Test - public void testBatcherClose() throws Exception { - ApiFuture result; - try (Batcher batcher = newBatcherInstance()) { - result = batcher.add(5); + public void testExceptionInDescriptorErrorHandling() throws InterruptedException { + underTest = + BatcherImpl.>newBuilder() + .setBatchingDescriptor(mockDescriptor) + .setUnaryCallable(mockUnaryCallable) + .setPrototype(labeledIntList) + .build(); + final RuntimeException fakeRpcError = new RuntimeException("RPC error"); + final RuntimeException fakeError = new RuntimeException("internal exception"); + + when(mockUnaryCallable.futureCall(any(LabeledIntList.class))) + .thenReturn(ApiFutures.>immediateFailedFuture(fakeRpcError)); + when(mockDescriptor.newRequestBuilder(any(LabeledIntList.class))) + .thenReturn(SQUARER_BATCHING_DESC_V2.newRequestBuilder(labeledIntList)); + doThrow(fakeError) + .when(mockDescriptor) + .splitException(any(Throwable.class), Mockito.>anyList()); + + Future result = underTest.add(2); + underTest.flush(); + Throwable actualError = null; + try { + result.get(); + } catch (ExecutionException ex) { + actualError = ex; } - assertThat(result.isDone()).isTrue(); - assertThat(result.get()).isEqualTo(25); - } - /** Validates exception when batch is called after {@link Batcher#close()}. */ - @Test(expected = IllegalStateException.class) - public void testWhenNoElementAdded() throws Exception { - underTest.close(); - underTest.add(1); + assertThat(actualError.getCause()).isSameAs(fakeError); + verify(mockDescriptor) + .splitException(any(Throwable.class), Mockito.>anyList()); } private Batcher newBatcherInstance() { return BatcherImpl.>newBuilder() .setPrototype(labeledIntList) - .setUnaryCallable(new LabeledIntSquarerCallable()) + .setUnaryCallable(callLabeledIntSquarer) .setBatchingDescriptor(SQUARER_BATCHING_DESC_V2) .build(); } diff --git a/gax/src/test/java/com/google/api/gax/batching/v2/BatchingCallSettingsTest.java b/gax/src/test/java/com/google/api/gax/batching/v2/BatchingCallSettingsTest.java deleted file mode 100644 index ca63efe62..000000000 --- a/gax/src/test/java/com/google/api/gax/batching/v2/BatchingCallSettingsTest.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Copyright 2016 Google LLC - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google LLC nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ -package com.google.api.gax.batching.v2; - -import static com.google.common.truth.Truth.assertThat; - -import com.google.api.gax.batching.BatchingSettings; -import com.google.api.gax.retrying.RetrySettings; -import com.google.api.gax.rpc.StatusCode; -import com.google.common.collect.Sets; -import java.util.Set; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mockito; - -@RunWith(JUnit4.class) -public class BatchingCallSettingsTest { - - @Test - public void testEmptyBuilder() { - @SuppressWarnings("unchecked") - BatchingDescriptor batchingDescriptor = - Mockito.mock(BatchingDescriptor.class); - BatchingCallSettings.Builder builder = - BatchingCallSettings.newBuilder(batchingDescriptor); - - assertThat(builder.getBatchingDescriptor()).isSameAs(batchingDescriptor); - assertThat(builder.getBatchingSettings()).isNull(); - assertThat(builder.getRetryableCodes().size()).isEqualTo(0); - assertThat(builder.getRetrySettings()).isNotNull(); - - BatchingSettings batchingSettings = - BatchingSettings.newBuilder().setElementCountThreshold(1L).build(); - builder.setBatchingSettings(batchingSettings); - BatchingCallSettings settings = builder.build(); - - assertThat(settings.getBatchingDescriptor()).isSameAs(batchingDescriptor); - assertThat(settings.getBatchingSettings()).isSameAs(batchingSettings); - assertThat(settings.getRetryableCodes().size()).isEqualTo(0); - assertThat(settings.getRetrySettings()).isNotNull(); - } - - @Test - public void testBuilder() { - @SuppressWarnings("unchecked") - BatchingDescriptor batchingDescriptor = - Mockito.mock(BatchingDescriptor.class); - BatchingCallSettings.Builder builder = - BatchingCallSettings.newBuilder(batchingDescriptor); - - BatchingSettings batchingSettings = - BatchingSettings.newBuilder().setElementCountThreshold(1L).build(); - Set retryCodes = Sets.newHashSet(StatusCode.Code.UNAVAILABLE); - RetrySettings retrySettings = RetrySettings.newBuilder().build(); - - builder.setBatchingSettings(batchingSettings); - builder.setRetryableCodes(retryCodes); - builder.setRetrySettings(retrySettings); - - assertThat(builder.getBatchingDescriptor()).isSameAs(batchingDescriptor); - assertThat(builder.getBatchingSettings()).isSameAs(batchingSettings); - assertThat(builder.getRetryableCodes().size()).isEqualTo(1); - assertThat(builder.getRetrySettings()).isSameAs(retrySettings); - - BatchingCallSettings settings = builder.build(); - - assertThat(settings.getBatchingDescriptor()).isSameAs(batchingDescriptor); - assertThat(settings.getBatchingSettings()).isSameAs(batchingSettings); - assertThat(settings.getRetryableCodes().size()).isEqualTo(1); - assertThat(settings.getRetrySettings()).isSameAs(retrySettings); - } - - @Test - public void testBuilderFromSettings() throws Exception { - @SuppressWarnings("unchecked") - BatchingDescriptor batchingDescriptor = - Mockito.mock(BatchingDescriptor.class); - BatchingCallSettings.Builder builder = - BatchingCallSettings.newBuilder(batchingDescriptor); - - BatchingSettings batchingSettings = - BatchingSettings.newBuilder().setElementCountThreshold(1L).build(); - Set retryCodes = Sets.newHashSet(StatusCode.Code.UNAVAILABLE); - RetrySettings retrySettings = RetrySettings.newBuilder().build(); - - builder.setBatchingSettings(batchingSettings); - builder.setRetryableCodes(retryCodes); - builder.setRetrySettings(retrySettings); - - BatchingCallSettings settings = builder.build(); - BatchingCallSettings.Builder newBuilder = settings.toBuilder(); - - assertThat(newBuilder.getBatchingDescriptor()).isSameAs(batchingDescriptor); - assertThat(newBuilder.getBatchingSettings()).isSameAs(batchingSettings); - assertThat(newBuilder.getRetryableCodes().size()).isEqualTo(1); - assertThat(newBuilder.getRetrySettings()).isSameAs(retrySettings); - } -} diff --git a/gax/src/test/java/com/google/api/gax/rpc/testing/FakeBatchableApi.java b/gax/src/test/java/com/google/api/gax/rpc/testing/FakeBatchableApi.java index ccaeca651..e96fd56d7 100644 --- a/gax/src/test/java/com/google/api/gax/rpc/testing/FakeBatchableApi.java +++ b/gax/src/test/java/com/google/api/gax/rpc/testing/FakeBatchableApi.java @@ -84,8 +84,8 @@ public int hashCode() { return result; } - public LabeledIntList toBuilder() { - return new LabeledIntList(label, new ArrayList<>(ints)); + public LabeledIntList clone() { + return new LabeledIntList(this.label, new ArrayList<>(this.ints)); } } @@ -188,7 +188,7 @@ public static class SquarerBatchingDescriptorV2 public com.google.api.gax.batching.v2.RequestBuilder newRequestBuilder( final LabeledIntList prototype) { return new com.google.api.gax.batching.v2.RequestBuilder() { - final LabeledIntList labelList = prototype.toBuilder(); + final LabeledIntList labelList = prototype.clone(); @Override public void add(Integer element) { @@ -218,7 +218,7 @@ public void splitException(Throwable throwable, List> @Override public long countBytes(Integer element) { - return String.valueOf(element).length(); + return 1; } } } From 97b931d53c3996c48f74915b7a2968908357cd5b Mon Sep 17 00:00:00 2001 From: Rahul Kesharwani Date: Tue, 4 Jun 2019 14:16:25 +0530 Subject: [PATCH 3/9] Refactor batch variables names and junit util method --- .../api/gax/batching/v2/BatcherImpl.java | 14 +++---- .../api/gax/batching/v2/BatcherImplTest.java | 40 ++++++------------- 2 files changed, 19 insertions(+), 35 deletions(-) diff --git a/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java b/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java index be1512a2e..e467607df 100644 --- a/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java +++ b/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java @@ -67,7 +67,7 @@ public class BatcherImpl private final AtomicInteger numOfRpcs = new AtomicInteger(0); private final AtomicBoolean isFlushed = new AtomicBoolean(false); private final Semaphore semaphore = new Semaphore(0); - private Batch batch; + private Batch currentOpenBatch; private boolean isClosed = false; private BatcherImpl(Builder builder) { @@ -117,12 +117,12 @@ Builder newBuilder() { public ApiFuture add(ElementT element) { Preconditions.checkState(!isClosed, "Cannot add elements on a closed batcher."); - if (batch == null) { - batch = new Batch<>(batchingDescriptor.newRequestBuilder(prototype)); + if (currentOpenBatch == null) { + currentOpenBatch = new Batch<>(batchingDescriptor.newRequestBuilder(prototype)); } SettableApiFuture result = SettableApiFuture.create(); - batch.add(element, result); + currentOpenBatch.add(element, result); return result; } @@ -138,11 +138,11 @@ public void flush() throws InterruptedException { /** Sends accumulated elements asynchronously for batching. */ private void sendBatch() { - if (batch == null) { + if (currentOpenBatch == null) { return; } - final Batch accumulatedBatch = batch; - batch = null; + final Batch accumulatedBatch = currentOpenBatch; + currentOpenBatch = null; numOfRpcs.incrementAndGet(); final ApiFuture batchResponse = diff --git a/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java b/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java index 7e2be8f83..ee8b091db 100644 --- a/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java +++ b/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java @@ -68,7 +68,7 @@ public class BatcherImplTest { /** Tests accumulated element are resolved when {@link Batcher#flush()} is called. */ @Test public void testResultsAreResolvedAfterFlush() throws Exception { - underTest = newBatcherInstance(); + underTest = createNewBatcherBuilder().build(); Future result = underTest.add(4); assertThat(result.isDone()).isFalse(); underTest.flush(); @@ -83,7 +83,7 @@ public void testResultsAreResolvedAfterFlush() throws Exception { @Test public void testWhenBatcherIsClose() throws Exception { Future result; - try (Batcher batcher = newBatcherInstance()) { + try (Batcher batcher = createNewBatcherBuilder().build()) { result = batcher.add(5); } assertThat(result.isDone()).isTrue(); @@ -93,7 +93,7 @@ public void testWhenBatcherIsClose() throws Exception { /** Validates exception when batch is called after {@link Batcher#close()}. */ @Test public void testNoElementAdditionAfterClose() throws Exception { - underTest = newBatcherInstance(); + underTest = createNewBatcherBuilder().build(); underTest.close(); Throwable actualError = null; try { @@ -108,12 +108,7 @@ public void testNoElementAdditionAfterClose() throws Exception { /** Verifies unaryCallable is being called with a batch. */ @Test public void testResultsAfterRPCSucceed() throws Exception { - underTest = - BatcherImpl.>newBuilder() - .setBatchingDescriptor(SQUARER_BATCHING_DESC_V2) - .setUnaryCallable(mockUnaryCallable) - .setPrototype(labeledIntList) - .build(); + underTest = createNewBatcherBuilder().setUnaryCallable(mockUnaryCallable).build(); when(mockUnaryCallable.futureCall(any(LabeledIntList.class))) .thenReturn(ApiFutures.immediateFuture(Arrays.asList(16, 25))); @@ -130,12 +125,7 @@ public void testResultsAfterRPCSucceed() throws Exception { /** Verifies exception occurred at RPC is propagated to element results */ @Test public void testResultFailureAfterRPCFailure() throws Exception { - underTest = - BatcherImpl.>newBuilder() - .setBatchingDescriptor(SQUARER_BATCHING_DESC_V2) - .setUnaryCallable(mockUnaryCallable) - .setPrototype(labeledIntList) - .build(); + underTest = createNewBatcherBuilder().setUnaryCallable(mockUnaryCallable).build(); final Exception fakeError = new RuntimeException(); when(mockUnaryCallable.futureCall(any(LabeledIntList.class))) @@ -158,14 +148,9 @@ public void testResultFailureAfterRPCFailure() throws Exception { /** Tests results are resolves when {@link BatchingDescriptor#splitResponse} throws exception. */ @Test public void testExceptionDescriptorResultHandling() throws InterruptedException { - underTest = - BatcherImpl.>newBuilder() - .setBatchingDescriptor(mockDescriptor) - .setUnaryCallable(callLabeledIntSquarer) - .setPrototype(labeledIntList) - .build(); - final RuntimeException fakeError = new RuntimeException("internal exception"); + underTest = createNewBatcherBuilder().setBatchingDescriptor(mockDescriptor).build(); + final RuntimeException fakeError = new RuntimeException("internal exception"); when(mockDescriptor.newRequestBuilder(any(LabeledIntList.class))) .thenReturn(SQUARER_BATCHING_DESC_V2.newRequestBuilder(labeledIntList)); doThrow(fakeError) @@ -190,14 +175,13 @@ public void testExceptionDescriptorResultHandling() throws InterruptedException @Test public void testExceptionInDescriptorErrorHandling() throws InterruptedException { underTest = - BatcherImpl.>newBuilder() + createNewBatcherBuilder() .setBatchingDescriptor(mockDescriptor) .setUnaryCallable(mockUnaryCallable) - .setPrototype(labeledIntList) .build(); + final RuntimeException fakeRpcError = new RuntimeException("RPC error"); final RuntimeException fakeError = new RuntimeException("internal exception"); - when(mockUnaryCallable.futureCall(any(LabeledIntList.class))) .thenReturn(ApiFutures.>immediateFailedFuture(fakeRpcError)); when(mockDescriptor.newRequestBuilder(any(LabeledIntList.class))) @@ -220,11 +204,11 @@ public void testExceptionInDescriptorErrorHandling() throws InterruptedException .splitException(any(Throwable.class), Mockito.>anyList()); } - private Batcher newBatcherInstance() { + private BatcherImpl.Builder> + createNewBatcherBuilder() { return BatcherImpl.>newBuilder() .setPrototype(labeledIntList) .setUnaryCallable(callLabeledIntSquarer) - .setBatchingDescriptor(SQUARER_BATCHING_DESC_V2) - .build(); + .setBatchingDescriptor(SQUARER_BATCHING_DESC_V2); } } From bc98e369d31104f035aa9fff6631b28eac2a1135 Mon Sep 17 00:00:00 2001 From: Rahul Kesharwani Date: Thu, 6 Jun 2019 16:10:52 +0530 Subject: [PATCH 4/9] Updated with while await for batches in flush() --- .../api/gax/batching/v2/BatcherImpl.java | 48 ++++++++++--------- .../api/gax/batching/v2/BatcherImplTest.java | 2 +- 2 files changed, 27 insertions(+), 23 deletions(-) diff --git a/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java b/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java index e467607df..cd741ff8e 100644 --- a/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java +++ b/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java @@ -36,38 +36,36 @@ import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; import com.google.api.core.BetaApi; -import com.google.api.core.InternalExtensionOnly; +import com.google.api.core.InternalApi; import com.google.api.core.SettableApiFuture; import com.google.api.gax.rpc.UnaryCallable; import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; /** - * Queues up the elements until {@link #flush()} is called, once batching is finished returned - * future gets resolves. + * Queues up the elements until {@link #flush()} is called; once batching is over, returned future + * resolves. * *

This class is not thread-safe, and expects to be used from a single thread. */ @BetaApi("The surface for batching is not stable yet and may change in the future.") -@InternalExtensionOnly("For google-cloud-java client use only.") +@InternalApi public class BatcherImpl implements Batcher { /** The amount of time to wait before checking responses are received or not. */ + private static final long DEFAULT_WAIT_TIME_MS = 250; + private final BatchingDescriptor batchingDescriptor; - private final UnaryCallable callable; private final RequestT prototype; - - private final AtomicInteger numOfRpcs = new AtomicInteger(0); - private final AtomicBoolean isFlushed = new AtomicBoolean(false); - private final Semaphore semaphore = new Semaphore(0); private Batch currentOpenBatch; + + private final AtomicInteger numOfOutstandingBatches = new AtomicInteger(0); + private final Object flushLock = new Object(); private boolean isClosed = false; private BatcherImpl(Builder builder) { @@ -130,10 +128,7 @@ public ApiFuture add(ElementT element) { @Override public void flush() throws InterruptedException { sendBatch(); - isFlushed.compareAndSet(false, true); - if (numOfRpcs.get() > 0) { - semaphore.acquire(); - } + awaitAllOutstandingBatches(); } /** Sends accumulated elements asynchronously for batching. */ @@ -143,7 +138,7 @@ private void sendBatch() { } final Batch accumulatedBatch = currentOpenBatch; currentOpenBatch = null; - numOfRpcs.incrementAndGet(); + numOfOutstandingBatches.incrementAndGet(); final ApiFuture batchResponse = callable.futureCall(accumulatedBatch.builder.build()); @@ -160,7 +155,7 @@ public void onSuccess(ResponseT response) { result.setException(ex); } } finally { - onCompletion(); + onBatchCompletion(); } } @@ -173,17 +168,26 @@ public void onFailure(Throwable throwable) { result.setException(ex); } } finally { - onCompletion(); + onBatchCompletion(); } } }, directExecutor()); } - private void onCompletion() { - if (numOfRpcs.decrementAndGet() == 0 && isFlushed.get()) { - semaphore.release(); - isFlushed.compareAndSet(true, false); + private void onBatchCompletion() { + if (numOfOutstandingBatches.decrementAndGet() == 0) { + synchronized (flushLock) { + flushLock.notifyAll(); + } + } + } + + private void awaitAllOutstandingBatches() throws InterruptedException { + while (numOfOutstandingBatches.get() > 0) { + synchronized (flushLock) { + flushLock.wait(DEFAULT_WAIT_TIME_MS); + } } } diff --git a/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java b/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java index ee8b091db..0c6c171dd 100644 --- a/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java +++ b/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java @@ -65,7 +65,7 @@ public class BatcherImplTest { private Batcher underTest; private LabeledIntList labeledIntList = new LabeledIntList("Default"); - /** Tests accumulated element are resolved when {@link Batcher#flush()} is called. */ + /** The accumulated results in the test are resolved when {@link Batcher#flush()} is called. */ @Test public void testResultsAreResolvedAfterFlush() throws Exception { underTest = createNewBatcherBuilder().build(); From 6383a042ca71697560d12417b090bac1ae5ed24b Mon Sep 17 00:00:00 2001 From: Rahul Kesharwani Date: Fri, 7 Jun 2019 14:25:48 +0530 Subject: [PATCH 5/9] Address language related feedback --- .../java/com/google/api/gax/batching/v2/BatcherImpl.java | 2 +- .../com/google/api/gax/batching/v2/BatcherImplTest.java | 6 +++--- .../com/google/api/gax/rpc/testing/FakeBatchableApi.java | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java b/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java index cd741ff8e..ad1f1ad88 100644 --- a/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java +++ b/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java @@ -199,7 +199,7 @@ public void close() throws InterruptedException { } /** - * This class represent one logical Batch. It accumulates all the elements and it's corresponding + * This class represent one logical Batch. It accumulates all the elements and their corresponding * future element results for one batch. */ private static class Batch { diff --git a/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java b/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java index 0c6c171dd..ef021c86b 100644 --- a/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java +++ b/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java @@ -145,9 +145,9 @@ public void testResultFailureAfterRPCFailure() throws Exception { verify(mockUnaryCallable, times(1)).futureCall(any(LabeledIntList.class)); } - /** Tests results are resolves when {@link BatchingDescriptor#splitResponse} throws exception. */ + /** Resolves future results when {@link BatchingDescriptor#splitResponse} throws exception. */ @Test - public void testExceptionDescriptorResultHandling() throws InterruptedException { + public void testExceptionInDescriptor() throws InterruptedException { underTest = createNewBatcherBuilder().setBatchingDescriptor(mockDescriptor).build(); final RuntimeException fakeError = new RuntimeException("internal exception"); @@ -171,7 +171,7 @@ public void testExceptionDescriptorResultHandling() throws InterruptedException .splitResponse(Mockito.anyList(), Mockito.>anyList()); } - /** Tests results are resolves when {@link BatchingDescriptor#splitException} throws exception. */ + /** Resolves future results when {@link BatchingDescriptor#splitException} throws exception */ @Test public void testExceptionInDescriptorErrorHandling() throws InterruptedException { underTest = diff --git a/gax/src/test/java/com/google/api/gax/rpc/testing/FakeBatchableApi.java b/gax/src/test/java/com/google/api/gax/rpc/testing/FakeBatchableApi.java index e96fd56d7..694fdd7b0 100644 --- a/gax/src/test/java/com/google/api/gax/rpc/testing/FakeBatchableApi.java +++ b/gax/src/test/java/com/google/api/gax/rpc/testing/FakeBatchableApi.java @@ -171,8 +171,8 @@ public long countBytes(LabeledIntList request) { for (Integer i : request.ints) { counter += i; } - // Limit the byte size to simulate merged messages having smaller serialized size that the - // sum of their components + // Limit the byte size to simulate merged messages having smaller serialized size than the sum + // of their components. return Math.min(counter, 5); } } From df026922fc72369d18490fa48bc91d472894da7c Mon Sep 17 00:00:00 2001 From: Rahul Kesharwani Date: Fri, 7 Jun 2019 18:58:04 +0530 Subject: [PATCH 6/9] Removing timeout time from Batcher#flush() --- .../java/com/google/api/gax/batching/v2/BatcherImpl.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java b/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java index ad1f1ad88..aaa98fccd 100644 --- a/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java +++ b/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java @@ -55,9 +55,6 @@ public class BatcherImpl implements Batcher { - /** The amount of time to wait before checking responses are received or not. */ - private static final long DEFAULT_WAIT_TIME_MS = 250; - private final BatchingDescriptor batchingDescriptor; private final UnaryCallable callable; @@ -186,7 +183,7 @@ private void onBatchCompletion() { private void awaitAllOutstandingBatches() throws InterruptedException { while (numOfOutstandingBatches.get() > 0) { synchronized (flushLock) { - flushLock.wait(DEFAULT_WAIT_TIME_MS); + flushLock.wait(); } } } From 27bb276cdf9bf282b7480001f8356b9a328139a0 Mon Sep 17 00:00:00 2001 From: Rahul Kesharwani Date: Mon, 10 Jun 2019 13:57:11 +0530 Subject: [PATCH 7/9] Addressing more feedback comments --- .../api/gax/batching/v2/BatcherImpl.java | 61 ++++++++++++------- .../api/gax/batching/v2/BatcherImplTest.java | 5 +- 2 files changed, 44 insertions(+), 22 deletions(-) diff --git a/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java b/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java index aaa98fccd..5e2bf22f4 100644 --- a/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java +++ b/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java @@ -49,6 +49,11 @@ * resolves. * *

This class is not thread-safe, and expects to be used from a single thread. + * + * @param The type of each individual element to be batched. + * @param The type of the result for each individual element. + * @param The type of the request that will contain the accumulated elements. + * @param The type of the response that will unpack into individual element results. */ @BetaApi("The surface for batching is not stable yet and may change in the future.") @InternalApi @@ -59,17 +64,17 @@ public class BatcherImpl batchingDescriptor; private final UnaryCallable callable; private final RequestT prototype; - private Batch currentOpenBatch; + private Batch currentOpenBatch; private final AtomicInteger numOfOutstandingBatches = new AtomicInteger(0); private final Object flushLock = new Object(); private boolean isClosed = false; private BatcherImpl(Builder builder) { - this.prototype = checkNotNull(builder.prototype, "RequestPrototype cannot be null."); - this.callable = checkNotNull(builder.unaryCallable, "UnaryCallable cannot be null."); + this.prototype = checkNotNull(builder.prototype, "prototype cannot be null"); + this.callable = checkNotNull(builder.unaryCallable, "callable cannot be null"); this.batchingDescriptor = - checkNotNull(builder.batchingDescriptor, "BatchingDescriptor cannot be null."); + checkNotNull(builder.batchingDescriptor, "batching descriptor cannot be null"); } /** Builder for a BatcherImpl. */ @@ -110,10 +115,10 @@ Builder newBuilder() { /** {@inheritDoc} */ @Override public ApiFuture add(ElementT element) { - Preconditions.checkState(!isClosed, "Cannot add elements on a closed batcher."); + Preconditions.checkState(!isClosed, "Cannot add elements on a closed batcher"); if (currentOpenBatch == null) { - currentOpenBatch = new Batch<>(batchingDescriptor.newRequestBuilder(prototype)); + currentOpenBatch = new Batch<>(prototype, batchingDescriptor); } SettableApiFuture result = SettableApiFuture.create(); @@ -133,24 +138,20 @@ private void sendBatch() { if (currentOpenBatch == null) { return; } - final Batch accumulatedBatch = currentOpenBatch; + final Batch accumulatedBatch = currentOpenBatch; currentOpenBatch = null; - numOfOutstandingBatches.incrementAndGet(); final ApiFuture batchResponse = callable.futureCall(accumulatedBatch.builder.build()); + numOfOutstandingBatches.incrementAndGet(); ApiFutures.addCallback( batchResponse, new ApiFutureCallback() { @Override public void onSuccess(ResponseT response) { try { - batchingDescriptor.splitResponse(response, accumulatedBatch.results); - } catch (Throwable ex) { - for (SettableApiFuture result : accumulatedBatch.results) { - result.setException(ex); - } + accumulatedBatch.onBatchSuccess(response); } finally { onBatchCompletion(); } @@ -159,11 +160,7 @@ public void onSuccess(ResponseT response) { @Override public void onFailure(Throwable throwable) { try { - batchingDescriptor.splitException(throwable, accumulatedBatch.results); - } catch (Throwable ex) { - for (SettableApiFuture result : accumulatedBatch.results) { - result.setException(ex); - } + accumulatedBatch.onBatchFailure(throwable); } finally { onBatchCompletion(); } @@ -199,12 +196,16 @@ public void close() throws InterruptedException { * This class represent one logical Batch. It accumulates all the elements and their corresponding * future element results for one batch. */ - private static class Batch { + private static class Batch { private final RequestBuilder builder; private final List> results; + private final BatchingDescriptor descriptor; - private Batch(RequestBuilder builder) { - this.builder = builder; + private Batch( + RequestT prototype, + BatchingDescriptor descriptor) { + this.descriptor = descriptor; + this.builder = descriptor.newRequestBuilder(prototype); this.results = new ArrayList<>(); } @@ -212,5 +213,23 @@ void add(ElementT element, SettableApiFuture result) { builder.add(element); results.add(result); } + + void onBatchSuccess(ResponseT response) { + try { + descriptor.splitResponse(response, results); + } catch (Exception ex) { + onBatchFailure(ex); + } + } + + void onBatchFailure(Throwable throwable) { + try { + descriptor.splitException(throwable, results); + } catch (Exception ex) { + for (SettableApiFuture result : results) { + result.setException(ex); + } + } + } } } diff --git a/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java b/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java index ef021c86b..77d0259e3 100644 --- a/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java +++ b/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java @@ -102,7 +102,7 @@ public void testNoElementAdditionAfterClose() throws Exception { actualError = ex; } assertThat(actualError).isInstanceOf(IllegalStateException.class); - assertThat(actualError.getMessage()).matches("Cannot add elements on a closed batcher."); + assertThat(actualError.getMessage()).matches("Cannot add elements on a closed batcher"); } /** Verifies unaryCallable is being called with a batch. */ @@ -156,6 +156,9 @@ public void testExceptionInDescriptor() throws InterruptedException { doThrow(fakeError) .when(mockDescriptor) .splitResponse(Mockito.anyList(), Mockito.>anyList()); + doThrow(fakeError) + .when(mockDescriptor) + .splitException(Mockito.any(), Mockito.>anyList()); Future result = underTest.add(2); underTest.flush(); From 3439c02726cc332a297e215ff32bbaf7c38783d1 Mon Sep 17 00:00:00 2001 From: Rahul Kesharwani Date: Tue, 11 Jun 2019 16:43:53 +0530 Subject: [PATCH 8/9] Updated BatcherImpl.Builder to AutoValue Now BatcherImpl is created with @AutoValue. Also made `isClosed` field as volatile. --- .../api/gax/batching/v2/BatcherImpl.java | 70 +++++-------------- .../api/gax/batching/v2/BatcherImplTest.java | 27 +++---- 2 files changed, 27 insertions(+), 70 deletions(-) diff --git a/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java b/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java index 5e2bf22f4..918d3f7f2 100644 --- a/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java +++ b/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java @@ -29,7 +29,6 @@ */ package com.google.api.gax.batching.v2; -import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import com.google.api.core.ApiFuture; @@ -39,6 +38,7 @@ import com.google.api.core.InternalApi; import com.google.api.core.SettableApiFuture; import com.google.api.gax.rpc.UnaryCallable; +import com.google.auto.value.AutoValue; import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.List; @@ -57,59 +57,27 @@ */ @BetaApi("The surface for batching is not stable yet and may change in the future.") @InternalApi -public class BatcherImpl +@AutoValue +public abstract class BatcherImpl implements Batcher { - private final BatchingDescriptor - batchingDescriptor; - private final UnaryCallable callable; - private final RequestT prototype; - private Batch currentOpenBatch; - - private final AtomicInteger numOfOutstandingBatches = new AtomicInteger(0); - private final Object flushLock = new Object(); - private boolean isClosed = false; - - private BatcherImpl(Builder builder) { - this.prototype = checkNotNull(builder.prototype, "prototype cannot be null"); - this.callable = checkNotNull(builder.unaryCallable, "callable cannot be null"); - this.batchingDescriptor = - checkNotNull(builder.batchingDescriptor, "batching descriptor cannot be null"); - } - - /** Builder for a BatcherImpl. */ - public static class Builder { - private BatchingDescriptor batchingDescriptor; - private UnaryCallable unaryCallable; - private RequestT prototype; - - private Builder() {} - - public Builder setBatchingDescriptor( - BatchingDescriptor batchingDescriptor) { - this.batchingDescriptor = batchingDescriptor; - return this; - } + abstract BatchingDescriptor batchingDescriptor(); - public Builder setUnaryCallable( - UnaryCallable unaryCallable) { - this.unaryCallable = unaryCallable; - return this; - } + abstract UnaryCallable unaryCallable(); - public Builder setPrototype(RequestT prototype) { - this.prototype = prototype; - return this; - } + abstract RequestT prototype(); - public BatcherImpl build() { - return new BatcherImpl<>(this); - } - } - - public static - Builder newBuilder() { - return new Builder<>(); + private Batch currentOpenBatch; + private final AtomicInteger numOfOutstandingBatches = new AtomicInteger(0); + private final Object flushLock = new Object(); + private volatile boolean isClosed = false; + + public static + BatcherImpl create( + BatchingDescriptor batchingDescriptor, + UnaryCallable unaryCallable, + RequestT prototype) { + return new AutoValue_BatcherImpl<>(batchingDescriptor, unaryCallable, prototype); } /** {@inheritDoc} */ @@ -118,7 +86,7 @@ public ApiFuture add(ElementT element) { Preconditions.checkState(!isClosed, "Cannot add elements on a closed batcher"); if (currentOpenBatch == null) { - currentOpenBatch = new Batch<>(prototype, batchingDescriptor); + currentOpenBatch = new Batch<>(prototype(), batchingDescriptor()); } SettableApiFuture result = SettableApiFuture.create(); @@ -142,7 +110,7 @@ private void sendBatch() { currentOpenBatch = null; final ApiFuture batchResponse = - callable.futureCall(accumulatedBatch.builder.build()); + unaryCallable().futureCall(accumulatedBatch.builder.build()); numOfOutstandingBatches.incrementAndGet(); ApiFutures.addCallback( diff --git a/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java b/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java index 77d0259e3..33882feb7 100644 --- a/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java +++ b/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java @@ -68,7 +68,7 @@ public class BatcherImplTest { /** The accumulated results in the test are resolved when {@link Batcher#flush()} is called. */ @Test public void testResultsAreResolvedAfterFlush() throws Exception { - underTest = createNewBatcherBuilder().build(); + underTest = BatcherImpl.create(SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList); Future result = underTest.add(4); assertThat(result.isDone()).isFalse(); underTest.flush(); @@ -83,7 +83,8 @@ public void testResultsAreResolvedAfterFlush() throws Exception { @Test public void testWhenBatcherIsClose() throws Exception { Future result; - try (Batcher batcher = createNewBatcherBuilder().build()) { + try (Batcher batcher = + BatcherImpl.create(SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList)) { result = batcher.add(5); } assertThat(result.isDone()).isTrue(); @@ -93,7 +94,7 @@ public void testWhenBatcherIsClose() throws Exception { /** Validates exception when batch is called after {@link Batcher#close()}. */ @Test public void testNoElementAdditionAfterClose() throws Exception { - underTest = createNewBatcherBuilder().build(); + underTest = BatcherImpl.create(SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList); underTest.close(); Throwable actualError = null; try { @@ -108,7 +109,7 @@ public void testNoElementAdditionAfterClose() throws Exception { /** Verifies unaryCallable is being called with a batch. */ @Test public void testResultsAfterRPCSucceed() throws Exception { - underTest = createNewBatcherBuilder().setUnaryCallable(mockUnaryCallable).build(); + underTest = BatcherImpl.create(SQUARER_BATCHING_DESC_V2, mockUnaryCallable, labeledIntList); when(mockUnaryCallable.futureCall(any(LabeledIntList.class))) .thenReturn(ApiFutures.immediateFuture(Arrays.asList(16, 25))); @@ -125,7 +126,7 @@ public void testResultsAfterRPCSucceed() throws Exception { /** Verifies exception occurred at RPC is propagated to element results */ @Test public void testResultFailureAfterRPCFailure() throws Exception { - underTest = createNewBatcherBuilder().setUnaryCallable(mockUnaryCallable).build(); + underTest = BatcherImpl.create(SQUARER_BATCHING_DESC_V2, mockUnaryCallable, labeledIntList); final Exception fakeError = new RuntimeException(); when(mockUnaryCallable.futureCall(any(LabeledIntList.class))) @@ -148,7 +149,7 @@ public void testResultFailureAfterRPCFailure() throws Exception { /** Resolves future results when {@link BatchingDescriptor#splitResponse} throws exception. */ @Test public void testExceptionInDescriptor() throws InterruptedException { - underTest = createNewBatcherBuilder().setBatchingDescriptor(mockDescriptor).build(); + underTest = BatcherImpl.create(mockDescriptor, callLabeledIntSquarer, labeledIntList); final RuntimeException fakeError = new RuntimeException("internal exception"); when(mockDescriptor.newRequestBuilder(any(LabeledIntList.class))) @@ -177,11 +178,7 @@ public void testExceptionInDescriptor() throws InterruptedException { /** Resolves future results when {@link BatchingDescriptor#splitException} throws exception */ @Test public void testExceptionInDescriptorErrorHandling() throws InterruptedException { - underTest = - createNewBatcherBuilder() - .setBatchingDescriptor(mockDescriptor) - .setUnaryCallable(mockUnaryCallable) - .build(); + underTest = BatcherImpl.create(mockDescriptor, mockUnaryCallable, labeledIntList); final RuntimeException fakeRpcError = new RuntimeException("RPC error"); final RuntimeException fakeError = new RuntimeException("internal exception"); @@ -206,12 +203,4 @@ public void testExceptionInDescriptorErrorHandling() throws InterruptedException verify(mockDescriptor) .splitException(any(Throwable.class), Mockito.>anyList()); } - - private BatcherImpl.Builder> - createNewBatcherBuilder() { - return BatcherImpl.>newBuilder() - .setPrototype(labeledIntList) - .setUnaryCallable(callLabeledIntSquarer) - .setBatchingDescriptor(SQUARER_BATCHING_DESC_V2); - } } From 05eb03c2ba68bf2759474a91f80d3f05a6cde789 Mon Sep 17 00:00:00 2001 From: Rahul Kesharwani Date: Tue, 11 Jun 2019 20:32:38 +0530 Subject: [PATCH 9/9] Removing `@AutoValue` from `BatcherImpl` class --- .../api/gax/batching/v2/BatcherImpl.java | 31 +++++++++---------- .../api/gax/batching/v2/BatcherImplTest.java | 14 ++++----- 2 files changed, 22 insertions(+), 23 deletions(-) diff --git a/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java b/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java index 918d3f7f2..360341872 100644 --- a/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java +++ b/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java @@ -38,7 +38,6 @@ import com.google.api.core.InternalApi; import com.google.api.core.SettableApiFuture; import com.google.api.gax.rpc.UnaryCallable; -import com.google.auto.value.AutoValue; import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.List; @@ -57,27 +56,27 @@ */ @BetaApi("The surface for batching is not stable yet and may change in the future.") @InternalApi -@AutoValue -public abstract class BatcherImpl +public class BatcherImpl implements Batcher { - abstract BatchingDescriptor batchingDescriptor(); - - abstract UnaryCallable unaryCallable(); - - abstract RequestT prototype(); + private final BatchingDescriptor + batchingDescriptor; + private final UnaryCallable unaryCallable; + private final RequestT prototype; private Batch currentOpenBatch; private final AtomicInteger numOfOutstandingBatches = new AtomicInteger(0); private final Object flushLock = new Object(); private volatile boolean isClosed = false; - public static - BatcherImpl create( - BatchingDescriptor batchingDescriptor, - UnaryCallable unaryCallable, - RequestT prototype) { - return new AutoValue_BatcherImpl<>(batchingDescriptor, unaryCallable, prototype); + public BatcherImpl( + BatchingDescriptor batchingDescriptor, + UnaryCallable unaryCallable, + RequestT prototype) { + this.batchingDescriptor = + Preconditions.checkNotNull(batchingDescriptor, "batching descriptor cannot be null"); + this.unaryCallable = Preconditions.checkNotNull(unaryCallable, "callable cannot be null"); + this.prototype = Preconditions.checkNotNull(prototype, "request prototype cannot be null"); } /** {@inheritDoc} */ @@ -86,7 +85,7 @@ public ApiFuture add(ElementT element) { Preconditions.checkState(!isClosed, "Cannot add elements on a closed batcher"); if (currentOpenBatch == null) { - currentOpenBatch = new Batch<>(prototype(), batchingDescriptor()); + currentOpenBatch = new Batch<>(prototype, batchingDescriptor); } SettableApiFuture result = SettableApiFuture.create(); @@ -110,7 +109,7 @@ private void sendBatch() { currentOpenBatch = null; final ApiFuture batchResponse = - unaryCallable().futureCall(accumulatedBatch.builder.build()); + unaryCallable.futureCall(accumulatedBatch.builder.build()); numOfOutstandingBatches.incrementAndGet(); ApiFutures.addCallback( diff --git a/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java b/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java index 33882feb7..989805553 100644 --- a/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java +++ b/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java @@ -68,7 +68,7 @@ public class BatcherImplTest { /** The accumulated results in the test are resolved when {@link Batcher#flush()} is called. */ @Test public void testResultsAreResolvedAfterFlush() throws Exception { - underTest = BatcherImpl.create(SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList); + underTest = new BatcherImpl<>(SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList); Future result = underTest.add(4); assertThat(result.isDone()).isFalse(); underTest.flush(); @@ -84,7 +84,7 @@ public void testResultsAreResolvedAfterFlush() throws Exception { public void testWhenBatcherIsClose() throws Exception { Future result; try (Batcher batcher = - BatcherImpl.create(SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList)) { + new BatcherImpl<>(SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList)) { result = batcher.add(5); } assertThat(result.isDone()).isTrue(); @@ -94,7 +94,7 @@ public void testWhenBatcherIsClose() throws Exception { /** Validates exception when batch is called after {@link Batcher#close()}. */ @Test public void testNoElementAdditionAfterClose() throws Exception { - underTest = BatcherImpl.create(SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList); + underTest = new BatcherImpl<>(SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList); underTest.close(); Throwable actualError = null; try { @@ -109,7 +109,7 @@ public void testNoElementAdditionAfterClose() throws Exception { /** Verifies unaryCallable is being called with a batch. */ @Test public void testResultsAfterRPCSucceed() throws Exception { - underTest = BatcherImpl.create(SQUARER_BATCHING_DESC_V2, mockUnaryCallable, labeledIntList); + underTest = new BatcherImpl<>(SQUARER_BATCHING_DESC_V2, mockUnaryCallable, labeledIntList); when(mockUnaryCallable.futureCall(any(LabeledIntList.class))) .thenReturn(ApiFutures.immediateFuture(Arrays.asList(16, 25))); @@ -126,7 +126,7 @@ public void testResultsAfterRPCSucceed() throws Exception { /** Verifies exception occurred at RPC is propagated to element results */ @Test public void testResultFailureAfterRPCFailure() throws Exception { - underTest = BatcherImpl.create(SQUARER_BATCHING_DESC_V2, mockUnaryCallable, labeledIntList); + underTest = new BatcherImpl<>(SQUARER_BATCHING_DESC_V2, mockUnaryCallable, labeledIntList); final Exception fakeError = new RuntimeException(); when(mockUnaryCallable.futureCall(any(LabeledIntList.class))) @@ -149,7 +149,7 @@ public void testResultFailureAfterRPCFailure() throws Exception { /** Resolves future results when {@link BatchingDescriptor#splitResponse} throws exception. */ @Test public void testExceptionInDescriptor() throws InterruptedException { - underTest = BatcherImpl.create(mockDescriptor, callLabeledIntSquarer, labeledIntList); + underTest = new BatcherImpl<>(mockDescriptor, callLabeledIntSquarer, labeledIntList); final RuntimeException fakeError = new RuntimeException("internal exception"); when(mockDescriptor.newRequestBuilder(any(LabeledIntList.class))) @@ -178,7 +178,7 @@ public void testExceptionInDescriptor() throws InterruptedException { /** Resolves future results when {@link BatchingDescriptor#splitException} throws exception */ @Test public void testExceptionInDescriptorErrorHandling() throws InterruptedException { - underTest = BatcherImpl.create(mockDescriptor, mockUnaryCallable, labeledIntList); + underTest = new BatcherImpl<>(mockDescriptor, mockUnaryCallable, labeledIntList); final RuntimeException fakeRpcError = new RuntimeException("RPC error"); final RuntimeException fakeError = new RuntimeException("internal exception");