From c885a00626a8967a187b0aaf4bc6d628253b2ca0 Mon Sep 17 00:00:00 2001 From: Rahul Kesharwani Date: Wed, 12 Jun 2019 17:05:19 +0530 Subject: [PATCH 1/5] Implementing ElementCount and ElementBytes Triggers for Batcher Added two threshold in the `BatcherImpl` which will trigger automatic batching upon breach of any of these. - ElementCount: Number of elements queued up till now. - ElementBytes: Size of the accumulated elements. - Threshold can be set using `BatchingSettings`. - Javadoc fixes to remove v1 reference - Removed unnecessary abstraction and made thresholds values as required Updated BatchingSettings default value to Long.MAX_VALUE --- .../api/gax/batching/v2/BatcherImpl.java | 28 +++- .../api/gax/batching/v2/BatchingSettings.java | 120 ++++++++++++++++++ .../api/gax/batching/v2/BatcherImplTest.java | 64 +++++++++- 3 files changed, 202 insertions(+), 10 deletions(-) create mode 100644 gax/src/main/java/com/google/api/gax/batching/v2/BatchingSettings.java diff --git a/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java b/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java index 360341872..fff2c2d1c 100644 --- a/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java +++ b/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java @@ -63,6 +63,7 @@ public class BatcherImpl batchingDescriptor; private final UnaryCallable unaryCallable; private final RequestT prototype; + private final BatchingSettings batchingSettings; private Batch currentOpenBatch; private final AtomicInteger numOfOutstandingBatches = new AtomicInteger(0); @@ -72,11 +73,14 @@ public class BatcherImpl public BatcherImpl( BatchingDescriptor batchingDescriptor, UnaryCallable unaryCallable, - RequestT prototype) { + RequestT prototype, + BatchingSettings batchingSettings) { this.batchingDescriptor = Preconditions.checkNotNull(batchingDescriptor, "batching descriptor cannot be null"); this.unaryCallable = Preconditions.checkNotNull(unaryCallable, "callable cannot be null"); this.prototype = Preconditions.checkNotNull(prototype, "request prototype cannot be null"); + this.batchingSettings = + Preconditions.checkNotNull(batchingSettings, "batching setting cannot be null"); } /** {@inheritDoc} */ @@ -85,11 +89,15 @@ public ApiFuture add(ElementT element) { Preconditions.checkState(!isClosed, "Cannot add elements on a closed batcher"); if (currentOpenBatch == null) { - currentOpenBatch = new Batch<>(prototype, batchingDescriptor); + currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings); } SettableApiFuture result = SettableApiFuture.create(); currentOpenBatch.add(element, result); + + if (currentOpenBatch.hasAnyThresholdReached()) { + sendBatch(); + } return result; } @@ -167,18 +175,28 @@ private static class Batch { private final RequestBuilder builder; private final List> results; private final BatchingDescriptor descriptor; + private final long elementThreshold; + private final long bytesThreshold; + + private long elementCounter = 0; + private long byteCounter = 0; private Batch( RequestT prototype, - BatchingDescriptor descriptor) { + BatchingDescriptor descriptor, + BatchingSettings batchingSettings) { this.descriptor = descriptor; this.builder = descriptor.newRequestBuilder(prototype); + this.elementThreshold = batchingSettings.getElementCountThreshold(); + this.bytesThreshold = batchingSettings.getRequestByteThreshold(); this.results = new ArrayList<>(); } void add(ElementT element, SettableApiFuture result) { builder.add(element); results.add(result); + elementCounter++; + byteCounter += descriptor.countBytes(element); } void onBatchSuccess(ResponseT response) { @@ -198,5 +216,9 @@ void onBatchFailure(Throwable throwable) { } } } + + boolean hasAnyThresholdReached() { + return elementCounter >= elementThreshold || byteCounter >= bytesThreshold; + } } } diff --git a/gax/src/main/java/com/google/api/gax/batching/v2/BatchingSettings.java b/gax/src/main/java/com/google/api/gax/batching/v2/BatchingSettings.java new file mode 100644 index 000000000..089e920b1 --- /dev/null +++ b/gax/src/main/java/com/google/api/gax/batching/v2/BatchingSettings.java @@ -0,0 +1,120 @@ +/* + * Copyright 2019 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.batching.v2; + +import com.google.api.core.BetaApi; +import com.google.auto.value.AutoValue; +import com.google.common.base.Preconditions; + +/** + * Represents the batching settings to use for an API method that is capable of batching. + * + *

Each batching client must define their own set of default values for these thresholds, which + * would be the safest behavior for their jobs. + * + *

The default instance of this settings are configured to accept elements until either of the + * threshold reaches {@link Long#MAX_VALUE}) or an explicit call to {@link Batcher#flush()} is made + * or the {@link Batcher#close()} is called. Users are expected to configure actual batching + * thresholds explicitly: the element count or the request bytes count. + * + *

Warning: With the incorrect settings, it is possible to cause long periods of dead waiting + * time. + * + *

When batching is configured for an API method, a call to that method will result in the + * request being queued up with other requests. When any of the set thresholds are reached, the + * queued up requests are packaged together in a batch and set to the service as a single RPC. When + * the response comes back, it is split apart into individual responses according to the individual + * input requests. + * + *

There are several supported thresholds: + * + *

    + *
  • Message Count Threshold: Once this many messages are queued, send all of the + * messages in a single call, even if the request byte threshold has not been exceed yet. The + * default value is {@link Long#MAX_VALUE} messages. + *
  • Request Byte Threshold: Once the number of bytes in the batched request reaches this + * threshold, send all of the messages in a single call, even if message count threshold has + * not been exceeded yet. The default value is {@link Long#MAX_VALUE} bytes. + *
+ * + *

These thresholds are treated as triggers, not as limits. Each threshold is an independent + * trigger and doesn't have any knowledge of the other thresholds. + */ +@BetaApi("The surface for batching is not stable yet and may change in the future.") +@AutoValue +public abstract class BatchingSettings { + + /** Get the element count threshold to use for batching. */ + public abstract long getElementCountThreshold(); + + /** Get the request byte threshold to use for batching. */ + public abstract long getRequestByteThreshold(); + + /** Get a new builder. */ + public static Builder newBuilder() { + return new AutoValue_BatchingSettings.Builder() + .setElementCountThreshold(Long.MAX_VALUE) + .setRequestByteThreshold(Long.MAX_VALUE); + } + + /** Get a builder with the same values as this object. */ + public abstract Builder toBuilder(); + + /** + * See the class documentation of {@link BatchingSettings} for a description of the different + * values that can be set. + */ + @AutoValue.Builder + public abstract static class Builder { + /** + * Set the element count threshold to use for batching. After this many elements are + * accumulated, they will be wrapped up in a batch and sent. + */ + public abstract Builder setElementCountThreshold(long elementCountThreshold); + + /** + * Set the request byte threshold to use for batching. After this many bytes are accumulated, + * the elements will be wrapped up in a batch and sent. + */ + public abstract Builder setRequestByteThreshold(long requestByteThreshold); + + abstract BatchingSettings autoBuild(); + + /** Build the BatchingSettings object. */ + public BatchingSettings build() { + BatchingSettings settings = autoBuild(); + Preconditions.checkState( + settings.getElementCountThreshold() > 0, "elementCountThreshold must be positive"); + Preconditions.checkState( + settings.getRequestByteThreshold() > 0, "requestByteThreshold must be positive"); + return settings; + } + } +} diff --git a/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java b/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java index 989805553..db3c86991 100644 --- a/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java +++ b/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java @@ -46,6 +46,7 @@ import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import org.junit.After; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; @@ -64,11 +65,25 @@ public class BatcherImplTest { private Batcher underTest; private LabeledIntList labeledIntList = new LabeledIntList("Default"); + private BatchingSettings batchingSettings = + BatchingSettings.newBuilder() + .setRequestByteThreshold(1000L) + .setElementCountThreshold(1000L) + .build(); + + @After + public void tearDown() throws InterruptedException { + if (underTest != null) { + underTest.close(); + } + } /** The accumulated results in the test are resolved when {@link Batcher#flush()} is called. */ @Test public void testResultsAreResolvedAfterFlush() throws Exception { - underTest = new BatcherImpl<>(SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList); + underTest = + new BatcherImpl<>( + SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList, batchingSettings); Future result = underTest.add(4); assertThat(result.isDone()).isFalse(); underTest.flush(); @@ -84,7 +99,8 @@ public void testResultsAreResolvedAfterFlush() throws Exception { public void testWhenBatcherIsClose() throws Exception { Future result; try (Batcher batcher = - new BatcherImpl<>(SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList)) { + new BatcherImpl<>( + SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList, batchingSettings)) { result = batcher.add(5); } assertThat(result.isDone()).isTrue(); @@ -94,7 +110,9 @@ public void testWhenBatcherIsClose() throws Exception { /** Validates exception when batch is called after {@link Batcher#close()}. */ @Test public void testNoElementAdditionAfterClose() throws Exception { - underTest = new BatcherImpl<>(SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList); + underTest = + new BatcherImpl<>( + SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList, batchingSettings); underTest.close(); Throwable actualError = null; try { @@ -109,7 +127,9 @@ public void testNoElementAdditionAfterClose() throws Exception { /** Verifies unaryCallable is being called with a batch. */ @Test public void testResultsAfterRPCSucceed() throws Exception { - underTest = new BatcherImpl<>(SQUARER_BATCHING_DESC_V2, mockUnaryCallable, labeledIntList); + underTest = + new BatcherImpl<>( + SQUARER_BATCHING_DESC_V2, mockUnaryCallable, labeledIntList, batchingSettings); when(mockUnaryCallable.futureCall(any(LabeledIntList.class))) .thenReturn(ApiFutures.immediateFuture(Arrays.asList(16, 25))); @@ -126,7 +146,9 @@ public void testResultsAfterRPCSucceed() throws Exception { /** Verifies exception occurred at RPC is propagated to element results */ @Test public void testResultFailureAfterRPCFailure() throws Exception { - underTest = new BatcherImpl<>(SQUARER_BATCHING_DESC_V2, mockUnaryCallable, labeledIntList); + underTest = + new BatcherImpl<>( + SQUARER_BATCHING_DESC_V2, mockUnaryCallable, labeledIntList, batchingSettings); final Exception fakeError = new RuntimeException(); when(mockUnaryCallable.futureCall(any(LabeledIntList.class))) @@ -149,7 +171,8 @@ public void testResultFailureAfterRPCFailure() throws Exception { /** Resolves future results when {@link BatchingDescriptor#splitResponse} throws exception. */ @Test public void testExceptionInDescriptor() throws InterruptedException { - underTest = new BatcherImpl<>(mockDescriptor, callLabeledIntSquarer, labeledIntList); + underTest = + new BatcherImpl<>(mockDescriptor, callLabeledIntSquarer, labeledIntList, batchingSettings); final RuntimeException fakeError = new RuntimeException("internal exception"); when(mockDescriptor.newRequestBuilder(any(LabeledIntList.class))) @@ -178,7 +201,8 @@ public void testExceptionInDescriptor() throws InterruptedException { /** Resolves future results when {@link BatchingDescriptor#splitException} throws exception */ @Test public void testExceptionInDescriptorErrorHandling() throws InterruptedException { - underTest = new BatcherImpl<>(mockDescriptor, mockUnaryCallable, labeledIntList); + underTest = + new BatcherImpl<>(mockDescriptor, mockUnaryCallable, labeledIntList, batchingSettings); final RuntimeException fakeRpcError = new RuntimeException("RPC error"); final RuntimeException fakeError = new RuntimeException("internal exception"); @@ -203,4 +227,30 @@ public void testExceptionInDescriptorErrorHandling() throws InterruptedException verify(mockDescriptor) .splitException(any(Throwable.class), Mockito.>anyList()); } + + @Test + public void testWhenElementCountExceeds() throws Exception { + BatchingSettings settings = batchingSettings.toBuilder().setElementCountThreshold(2L).build(); + testElementTriggers(settings); + } + + @Test + public void testWhenElementBytesExceeds() throws Exception { + BatchingSettings settings = batchingSettings.toBuilder().setRequestByteThreshold(2L).build(); + testElementTriggers(settings); + } + + private void testElementTriggers(BatchingSettings settings) throws Exception { + underTest = + new BatcherImpl<>( + SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList, settings); + Future result = underTest.add(4); + assertThat(result.isDone()).isFalse(); + // After this element is added, the batch triggers sendBatch(). + Future anotherResult = underTest.add(5); + // Both the elements should be resolved now. + assertThat(result.isDone()).isTrue(); + assertThat(result.get()).isEqualTo(16); + assertThat(anotherResult.isDone()).isTrue(); + } } From d6329879e278de4556a1b9464446d21eb8637db8 Mon Sep 17 00:00:00 2001 From: Rahul Kesharwani Date: Mon, 24 Jun 2019 14:18:46 +0530 Subject: [PATCH 2/5] Updated element count threshold to Integer also, added Javadoc at BatcherImpl XOR. --- .../google/api/gax/batching/v2/BatcherImpl.java | 7 +++++++ .../api/gax/batching/v2/BatchingSettings.java | 14 +++++++------- .../api/gax/batching/v2/BatcherImplTest.java | 4 ++-- 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java b/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java index fff2c2d1c..82764c4e5 100644 --- a/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java +++ b/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java @@ -70,6 +70,13 @@ public class BatcherImpl private final Object flushLock = new Object(); private volatile boolean isClosed = false; + /** + * @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements + * into wrappers request and response. + * @param unaryCallable a {@link UnaryCallable} object. + * @param prototype a {@link RequestT} object. + * @param batchingSettings a {@link BatchingSettings} with configuration of thresholds. + */ public BatcherImpl( BatchingDescriptor batchingDescriptor, UnaryCallable unaryCallable, diff --git a/gax/src/main/java/com/google/api/gax/batching/v2/BatchingSettings.java b/gax/src/main/java/com/google/api/gax/batching/v2/BatchingSettings.java index 089e920b1..35608ffcd 100644 --- a/gax/src/main/java/com/google/api/gax/batching/v2/BatchingSettings.java +++ b/gax/src/main/java/com/google/api/gax/batching/v2/BatchingSettings.java @@ -40,9 +40,9 @@ * would be the safest behavior for their jobs. * *

The default instance of this settings are configured to accept elements until either of the - * threshold reaches {@link Long#MAX_VALUE}) or an explicit call to {@link Batcher#flush()} is made - * or the {@link Batcher#close()} is called. Users are expected to configure actual batching - * thresholds explicitly: the element count or the request bytes count. + * threshold reaches their defined value or an explicit call to {@link Batcher#flush()} is made or + * the {@link Batcher#close()} is called. Users are expected to configure actual batching thresholds + * explicitly: the element count or the request bytes count. * *

Warning: With the incorrect settings, it is possible to cause long periods of dead waiting * time. @@ -58,7 +58,7 @@ *

    *
  • Message Count Threshold: Once this many messages are queued, send all of the * messages in a single call, even if the request byte threshold has not been exceed yet. The - * default value is {@link Long#MAX_VALUE} messages. + * default value is {@link Integer#MAX_VALUE} messages. *
  • Request Byte Threshold: Once the number of bytes in the batched request reaches this * threshold, send all of the messages in a single call, even if message count threshold has * not been exceeded yet. The default value is {@link Long#MAX_VALUE} bytes. @@ -72,7 +72,7 @@ public abstract class BatchingSettings { /** Get the element count threshold to use for batching. */ - public abstract long getElementCountThreshold(); + public abstract int getElementCountThreshold(); /** Get the request byte threshold to use for batching. */ public abstract long getRequestByteThreshold(); @@ -80,7 +80,7 @@ public abstract class BatchingSettings { /** Get a new builder. */ public static Builder newBuilder() { return new AutoValue_BatchingSettings.Builder() - .setElementCountThreshold(Long.MAX_VALUE) + .setElementCountThreshold(Integer.MAX_VALUE) .setRequestByteThreshold(Long.MAX_VALUE); } @@ -97,7 +97,7 @@ public abstract static class Builder { * Set the element count threshold to use for batching. After this many elements are * accumulated, they will be wrapped up in a batch and sent. */ - public abstract Builder setElementCountThreshold(long elementCountThreshold); + public abstract Builder setElementCountThreshold(int elementCountThreshold); /** * Set the request byte threshold to use for batching. After this many bytes are accumulated, diff --git a/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java b/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java index db3c86991..7df9d148d 100644 --- a/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java +++ b/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java @@ -68,7 +68,7 @@ public class BatcherImplTest { private BatchingSettings batchingSettings = BatchingSettings.newBuilder() .setRequestByteThreshold(1000L) - .setElementCountThreshold(1000L) + .setElementCountThreshold(1000) .build(); @After @@ -230,7 +230,7 @@ public void testExceptionInDescriptorErrorHandling() throws InterruptedException @Test public void testWhenElementCountExceeds() throws Exception { - BatchingSettings settings = batchingSettings.toBuilder().setElementCountThreshold(2L).build(); + BatchingSettings settings = batchingSettings.toBuilder().setElementCountThreshold(2).build(); testElementTriggers(settings); } From 43be8adfae5a16e10fa6f38ccca90b587ab2d44f Mon Sep 17 00:00:00 2001 From: Rahul Kesharwani Date: Mon, 24 Jun 2019 21:07:31 +0530 Subject: [PATCH 3/5] Batching Threshold can be disabled with this change --- .../com/google/api/gax/batching/v2/BatcherImpl.java | 5 +++-- .../google/api/gax/batching/v2/BatchingSettings.java | 4 ++-- .../google/api/gax/batching/v2/BatcherImplTest.java | 11 ++++++++--- 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java b/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java index 82764c4e5..bc4f73f84 100644 --- a/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java +++ b/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java @@ -55,7 +55,7 @@ * @param The type of the response that will unpack into individual element results. */ @BetaApi("The surface for batching is not stable yet and may change in the future.") -@InternalApi +@InternalApi("For google-cloud-java client use only") public class BatcherImpl implements Batcher { @@ -225,7 +225,8 @@ void onBatchFailure(Throwable throwable) { } boolean hasAnyThresholdReached() { - return elementCounter >= elementThreshold || byteCounter >= bytesThreshold; + return (elementThreshold != 0 && elementThreshold <= elementCounter) + || (bytesThreshold != 0 && bytesThreshold <= byteCounter); } } } diff --git a/gax/src/main/java/com/google/api/gax/batching/v2/BatchingSettings.java b/gax/src/main/java/com/google/api/gax/batching/v2/BatchingSettings.java index 35608ffcd..fca96d9ba 100644 --- a/gax/src/main/java/com/google/api/gax/batching/v2/BatchingSettings.java +++ b/gax/src/main/java/com/google/api/gax/batching/v2/BatchingSettings.java @@ -111,9 +111,9 @@ public abstract static class Builder { public BatchingSettings build() { BatchingSettings settings = autoBuild(); Preconditions.checkState( - settings.getElementCountThreshold() > 0, "elementCountThreshold must be positive"); + settings.getElementCountThreshold() >= 0, "elementCountThreshold cannot be negative"); Preconditions.checkState( - settings.getRequestByteThreshold() > 0, "requestByteThreshold must be positive"); + settings.getRequestByteThreshold() >= 0, "requestByteThreshold cannot be negative"); return settings; } } diff --git a/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java b/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java index 7df9d148d..f846cc732 100644 --- a/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java +++ b/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java @@ -81,9 +81,14 @@ public void tearDown() throws InterruptedException { /** The accumulated results in the test are resolved when {@link Batcher#flush()} is called. */ @Test public void testResultsAreResolvedAfterFlush() throws Exception { + BatchingSettings settings = + BatchingSettings.newBuilder() + .setElementCountThreshold(0) + .setRequestByteThreshold(0) + .build(); underTest = new BatcherImpl<>( - SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList, batchingSettings); + SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList, settings); Future result = underTest.add(4); assertThat(result.isDone()).isFalse(); underTest.flush(); @@ -244,10 +249,10 @@ private void testElementTriggers(BatchingSettings settings) throws Exception { underTest = new BatcherImpl<>( SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList, settings); - Future result = underTest.add(4); + Future result = underTest.add(4); assertThat(result.isDone()).isFalse(); // After this element is added, the batch triggers sendBatch(). - Future anotherResult = underTest.add(5); + Future anotherResult = underTest.add(5); // Both the elements should be resolved now. assertThat(result.isDone()).isTrue(); assertThat(result.get()).isEqualTo(16); From 3eb7182cfd9b7a6337a586562d831007388506c3 Mon Sep 17 00:00:00 2001 From: Rahul Kesharwani Date: Mon, 24 Jun 2019 21:18:40 +0530 Subject: [PATCH 4/5] Updated Javadoc to describe threshold disablity by 0 --- .../google/api/gax/batching/v2/BatchingSettings.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/gax/src/main/java/com/google/api/gax/batching/v2/BatchingSettings.java b/gax/src/main/java/com/google/api/gax/batching/v2/BatchingSettings.java index fca96d9ba..d5b3aa464 100644 --- a/gax/src/main/java/com/google/api/gax/batching/v2/BatchingSettings.java +++ b/gax/src/main/java/com/google/api/gax/batching/v2/BatchingSettings.java @@ -44,6 +44,8 @@ * the {@link Batcher#close()} is called. Users are expected to configure actual batching thresholds * explicitly: the element count or the request bytes count. * + *

    Element count and Request byte threshold can be disabled by setting it to 0. + * *

    Warning: With the incorrect settings, it is possible to cause long periods of dead waiting * time. * @@ -56,11 +58,11 @@ *

    There are several supported thresholds: * *

      - *
    • Message Count Threshold: Once this many messages are queued, send all of the - * messages in a single call, even if the request byte threshold has not been exceed yet. The - * default value is {@link Integer#MAX_VALUE} messages. + *
    • Element Count Threshold: Once this many elements are queued, send all of the + * elements in a single call, even if the request byte threshold has not been exceed yet. The + * default value is {@link Integer#MAX_VALUE} elements. *
    • Request Byte Threshold: Once the number of bytes in the batched request reaches this - * threshold, send all of the messages in a single call, even if message count threshold has + * threshold, send all of the elements in a single call, even if element count threshold has * not been exceeded yet. The default value is {@link Long#MAX_VALUE} bytes. *
    * From ff99cc787a092aa7a4ac7afabf25580ae3be1ef0 Mon Sep 17 00:00:00 2001 From: Rahul Kesharwani Date: Tue, 25 Jun 2019 00:53:38 +0530 Subject: [PATCH 5/5] Fixing wrong threshold check along with JavaDoc --- .../api/gax/batching/v2/BatcherImpl.java | 3 +- .../api/gax/batching/v2/BatchingSettings.java | 18 ++++------- .../api/gax/batching/v2/BatcherImplTest.java | 32 ++++++++++++------- 3 files changed, 28 insertions(+), 25 deletions(-) diff --git a/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java b/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java index bc4f73f84..b98b718b1 100644 --- a/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java +++ b/gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java @@ -225,8 +225,7 @@ void onBatchFailure(Throwable throwable) { } boolean hasAnyThresholdReached() { - return (elementThreshold != 0 && elementThreshold <= elementCounter) - || (bytesThreshold != 0 && bytesThreshold <= byteCounter); + return elementCounter >= elementThreshold || byteCounter >= bytesThreshold; } } } diff --git a/gax/src/main/java/com/google/api/gax/batching/v2/BatchingSettings.java b/gax/src/main/java/com/google/api/gax/batching/v2/BatchingSettings.java index d5b3aa464..8d86fb289 100644 --- a/gax/src/main/java/com/google/api/gax/batching/v2/BatchingSettings.java +++ b/gax/src/main/java/com/google/api/gax/batching/v2/BatchingSettings.java @@ -39,12 +39,9 @@ *

    Each batching client must define their own set of default values for these thresholds, which * would be the safest behavior for their jobs. * - *

    The default instance of this settings are configured to accept elements until either of the - * threshold reaches their defined value or an explicit call to {@link Batcher#flush()} is made or - * the {@link Batcher#close()} is called. Users are expected to configure actual batching thresholds - * explicitly: the element count or the request bytes count. - * - *

    Element count and Request byte threshold can be disabled by setting it to 0. + *

    The default instance of this settings does not have any default values. Users are expected to + * configure batching thresholds explicitly: the element count or the request bytes count. + * Thresholds can be disabled(meaning immediate result of input elements) by setting its value to 0. * *

    Warning: With the incorrect settings, it is possible to cause long periods of dead waiting * time. @@ -59,11 +56,10 @@ * *

      *
    • Element Count Threshold: Once this many elements are queued, send all of the - * elements in a single call, even if the request byte threshold has not been exceed yet. The - * default value is {@link Integer#MAX_VALUE} elements. + * elements in a single call, even if the request byte threshold has not been exceed yet. *
    • Request Byte Threshold: Once the number of bytes in the batched request reaches this * threshold, send all of the elements in a single call, even if element count threshold has - * not been exceeded yet. The default value is {@link Long#MAX_VALUE} bytes. + * not been exceeded yet. *
    * *

    These thresholds are treated as triggers, not as limits. Each threshold is an independent @@ -81,9 +77,7 @@ public abstract class BatchingSettings { /** Get a new builder. */ public static Builder newBuilder() { - return new AutoValue_BatchingSettings.Builder() - .setElementCountThreshold(Integer.MAX_VALUE) - .setRequestByteThreshold(Long.MAX_VALUE); + return new AutoValue_BatchingSettings.Builder(); } /** Get a builder with the same values as this object. */ diff --git a/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java b/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java index f846cc732..3bb06e606 100644 --- a/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java +++ b/gax/src/test/java/com/google/api/gax/batching/v2/BatcherImplTest.java @@ -64,8 +64,8 @@ public class BatcherImplTest { @Mock private BatchingDescriptor> mockDescriptor; private Batcher underTest; - private LabeledIntList labeledIntList = new LabeledIntList("Default"); - private BatchingSettings batchingSettings = + private final LabeledIntList labeledIntList = new LabeledIntList("Default"); + private final BatchingSettings batchingSettings = BatchingSettings.newBuilder() .setRequestByteThreshold(1000L) .setElementCountThreshold(1000) @@ -81,14 +81,9 @@ public void tearDown() throws InterruptedException { /** The accumulated results in the test are resolved when {@link Batcher#flush()} is called. */ @Test public void testResultsAreResolvedAfterFlush() throws Exception { - BatchingSettings settings = - BatchingSettings.newBuilder() - .setElementCountThreshold(0) - .setRequestByteThreshold(0) - .build(); underTest = new BatcherImpl<>( - SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList, settings); + SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList, batchingSettings); Future result = underTest.add(4); assertThat(result.isDone()).isFalse(); underTest.flush(); @@ -169,7 +164,7 @@ public void testResultFailureAfterRPCFailure() throws Exception { actualError = ex; } - assertThat(actualError.getCause()).isSameAs(fakeError); + assertThat(actualError.getCause()).isSameInstanceAs(fakeError); verify(mockUnaryCallable, times(1)).futureCall(any(LabeledIntList.class)); } @@ -198,7 +193,7 @@ public void testExceptionInDescriptor() throws InterruptedException { actualError = ex; } - assertThat(actualError.getCause()).isSameAs(fakeError); + assertThat(actualError.getCause()).isSameInstanceAs(fakeError); verify(mockDescriptor) .splitResponse(Mockito.anyList(), Mockito.>anyList()); } @@ -228,7 +223,7 @@ public void testExceptionInDescriptorErrorHandling() throws InterruptedException actualError = ex; } - assertThat(actualError.getCause()).isSameAs(fakeError); + assertThat(actualError.getCause()).isSameInstanceAs(fakeError); verify(mockDescriptor) .splitException(any(Throwable.class), Mockito.>anyList()); } @@ -245,6 +240,21 @@ public void testWhenElementBytesExceeds() throws Exception { testElementTriggers(settings); } + @Test + public void testWhenThresholdIsDisabled() throws Exception { + BatchingSettings settings = + BatchingSettings.newBuilder() + .setElementCountThreshold(0) + .setRequestByteThreshold(0) + .build(); + underTest = + new BatcherImpl<>( + SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList, settings); + Future result = underTest.add(2); + assertThat(result.isDone()).isTrue(); + assertThat(result.get()).isEqualTo(4); + } + private void testElementTriggers(BatchingSettings settings) throws Exception { underTest = new BatcherImpl<>(