Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Implementing ElementCount and ElementBytes Triggers #734

Merged
merged 5 commits into from
Jun 24, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 32 additions & 3 deletions gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,31 @@ public class BatcherImpl<ElementT, ElementResultT, RequestT, ResponseT>
batchingDescriptor;
private final UnaryCallable<RequestT, ResponseT> unaryCallable;
private final RequestT prototype;
private final BatchingSettings batchingSettings;

private Batch<ElementT, ElementResultT, RequestT, ResponseT> currentOpenBatch;
private final AtomicInteger numOfOutstandingBatches = new AtomicInteger(0);
private final Object flushLock = new Object();
private volatile boolean isClosed = false;

/**
* @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements
* into wrappers request and response.
* @param unaryCallable a {@link UnaryCallable} object.
* @param prototype a {@link RequestT} object.
* @param batchingSettings a {@link BatchingSettings} with configuration of thresholds.
*/
rahulKQL marked this conversation as resolved.
Show resolved Hide resolved
public BatcherImpl(
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> batchingDescriptor,
UnaryCallable<RequestT, ResponseT> unaryCallable,
RequestT prototype) {
RequestT prototype,
BatchingSettings batchingSettings) {
this.batchingDescriptor =
Preconditions.checkNotNull(batchingDescriptor, "batching descriptor cannot be null");
this.unaryCallable = Preconditions.checkNotNull(unaryCallable, "callable cannot be null");
this.prototype = Preconditions.checkNotNull(prototype, "request prototype cannot be null");
this.batchingSettings =
Preconditions.checkNotNull(batchingSettings, "batching setting cannot be null");
}

/** {@inheritDoc} */
Expand All @@ -85,11 +96,15 @@ public ApiFuture<ElementResultT> add(ElementT element) {
Preconditions.checkState(!isClosed, "Cannot add elements on a closed batcher");

if (currentOpenBatch == null) {
currentOpenBatch = new Batch<>(prototype, batchingDescriptor);
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings);
}

SettableApiFuture<ElementResultT> result = SettableApiFuture.create();
currentOpenBatch.add(element, result);

if (currentOpenBatch.hasAnyThresholdReached()) {
sendBatch();
}
return result;
}

Expand Down Expand Up @@ -167,18 +182,28 @@ private static class Batch<ElementT, ElementResultT, RequestT, ResponseT> {
private final RequestBuilder<ElementT, RequestT> builder;
private final List<SettableApiFuture<ElementResultT>> results;
private final BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> descriptor;
private final long elementThreshold;
private final long bytesThreshold;

private long elementCounter = 0;
private long byteCounter = 0;

private Batch(
RequestT prototype,
BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> descriptor) {
BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> descriptor,
BatchingSettings batchingSettings) {
this.descriptor = descriptor;
this.builder = descriptor.newRequestBuilder(prototype);
this.elementThreshold = batchingSettings.getElementCountThreshold();
this.bytesThreshold = batchingSettings.getRequestByteThreshold();
this.results = new ArrayList<>();
}

void add(ElementT element, SettableApiFuture<ElementResultT> result) {
rahulKQL marked this conversation as resolved.
Show resolved Hide resolved
builder.add(element);
results.add(result);
elementCounter++;
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
byteCounter += descriptor.countBytes(element);
}

void onBatchSuccess(ResponseT response) {
Expand All @@ -198,5 +223,9 @@ void onBatchFailure(Throwable throwable) {
}
}
}

boolean hasAnyThresholdReached() {
return elementCounter >= elementThreshold || byteCounter >= bytesThreshold;
}
}
}
120 changes: 120 additions & 0 deletions gax/src/main/java/com/google/api/gax/batching/v2/BatchingSettings.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright 2019 Google LLC
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google LLC nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.batching.v2;

import com.google.api.core.BetaApi;
import com.google.auto.value.AutoValue;
import com.google.common.base.Preconditions;

/**
* Represents the batching settings to use for an API method that is capable of batching.
*
* <p>Each batching client must define their own set of default values for these thresholds, which
* would be the safest behavior for their jobs.
*
* <p>The default instance of this settings are configured to accept elements until either of the
* threshold reaches their defined value or an explicit call to {@link Batcher#flush()} is made or
* the {@link Batcher#close()} is called. Users are expected to configure actual batching thresholds
* explicitly: the element count or the request bytes count.
*
* <p>Warning: With the incorrect settings, it is possible to cause long periods of dead waiting
* time.
*
* <p>When batching is configured for an API method, a call to that method will result in the
* request being queued up with other requests. When any of the set thresholds are reached, the
* queued up requests are packaged together in a batch and set to the service as a single RPC. When
* the response comes back, it is split apart into individual responses according to the individual
* input requests.
*
* <p>There are several supported thresholds:
*
* <ul>
* <li><b>Message Count Threshold</b>: Once this many messages are queued, send all of the
* messages in a single call, even if the request byte threshold has not been exceed yet. The
* default value is {@link Integer#MAX_VALUE} messages.
* <li><b>Request Byte Threshold</b>: Once the number of bytes in the batched request reaches this
* threshold, send all of the messages in a single call, even if message count threshold has
* not been exceeded yet. The default value is {@link Long#MAX_VALUE} bytes.
* </ul>
*
* <p>These thresholds are treated as triggers, not as limits. Each threshold is an independent
* trigger and doesn't have any knowledge of the other thresholds.
*/
@BetaApi("The surface for batching is not stable yet and may change in the future.")
@AutoValue
public abstract class BatchingSettings {

/** Get the element count threshold to use for batching. */
public abstract int getElementCountThreshold();

/** Get the request byte threshold to use for batching. */
public abstract long getRequestByteThreshold();

/** Get a new builder. */
public static Builder newBuilder() {
return new AutoValue_BatchingSettings.Builder()
.setElementCountThreshold(Integer.MAX_VALUE)
.setRequestByteThreshold(Long.MAX_VALUE);
Copy link
Contributor

@vam-google vam-google Jun 21, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe use some reasonably big value here (and in element count) instead of MAX_VALUE? People most probably will go with default values first (yes, everything is documented, but people read documentaiton only after something broke..). The default values can result in OutOfMemoryException quite easily. And even if not, it may be confusing for people that their requests do not get sent for some reason (because it just keeps accumulating everything).

Another option is to always enforce setting those values.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are 2 audiences here:

  • client developers
  • end users

The client developers will configure defaults per service in their StubSettings. Each service will have different profiles. When looking at the code:

  BatchingSettings.newBuilder().setMaxElementCount(100).build(); 

I would be very surprised if it flushed a single large element. I would want it to be spelled out in the StubSetting defaults as

  BatchingSettings.newBuilder().setElementCountThreshold(100).setRequestByteThreshold(1024).build(); 

The other audience is the end users, the expectations with them is that they should use toBuilder() on the client developer provided instance to tweak stuff

Copy link
Contributor

@vam-google vam-google Jun 24, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@igorbernstein2 Who are the client developers? Do you mean the manual layer on top of this? If yes, then most of the generated clients don't have that wrapper, so the there is simply no toBuilder() provided for them by the client developer.

This can also be done in gapic-generator level (generator will be the repo, where the "default" value can be found), ut it seems like an unnecessary complication (why to split in in two places and make more confusing, when it can be just here and documented so people can easily find the default value). Also, currently MAX_VALUE is the default value, and if it actually (in the manual wrapper or gapic-generator) is always set to something different by default, then it is even more confusing (we end up in a situation, when we have a default value, which is by default overridden by another default value).

In general it looks like this value should not have a default value at all, and build() on the builder should fail if there is not explicit value provided (which i think is perfectly fine, and probably the best solution here, enforcing setting values, which are hard/impossible to have default values for)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm ok with no default

}

/** Get a builder with the same values as this object. */
public abstract Builder toBuilder();

/**
* See the class documentation of {@link BatchingSettings} for a description of the different
* values that can be set.
*/
@AutoValue.Builder
public abstract static class Builder {
/**
* Set the element count threshold to use for batching. After this many elements are
* accumulated, they will be wrapped up in a batch and sent.
*/
public abstract Builder setElementCountThreshold(int elementCountThreshold);

/**
* Set the request byte threshold to use for batching. After this many bytes are accumulated,
* the elements will be wrapped up in a batch and sent.
*/
public abstract Builder setRequestByteThreshold(long requestByteThreshold);

abstract BatchingSettings autoBuild();

/** Build the BatchingSettings object. */
public BatchingSettings build() {
BatchingSettings settings = autoBuild();
Preconditions.checkState(
settings.getElementCountThreshold() > 0, "elementCountThreshold must be positive");
Preconditions.checkState(
settings.getRequestByteThreshold() > 0, "requestByteThreshold must be positive");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think allowing 0 is ok here to signify that batching is disabled

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have updated the BatchingSettings to accept 0 and added a line in Javadoc for the same.
Now BatcherImpl.Batch#hasAnyThresholdReached checks the disability.

return settings;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -64,11 +65,25 @@ public class BatcherImplTest {

private Batcher<Integer, Integer> underTest;
private LabeledIntList labeledIntList = new LabeledIntList("Default");
private BatchingSettings batchingSettings =
BatchingSettings.newBuilder()
.setRequestByteThreshold(1000L)
.setElementCountThreshold(1000)
.build();

@After
public void tearDown() throws InterruptedException {
if (underTest != null) {
underTest.close();
}
}

/** The accumulated results in the test are resolved when {@link Batcher#flush()} is called. */
@Test
public void testResultsAreResolvedAfterFlush() throws Exception {
underTest = new BatcherImpl<>(SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList);
underTest =
new BatcherImpl<>(
SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList, batchingSettings);
Future<Integer> result = underTest.add(4);
assertThat(result.isDone()).isFalse();
underTest.flush();
Expand All @@ -84,7 +99,8 @@ public void testResultsAreResolvedAfterFlush() throws Exception {
public void testWhenBatcherIsClose() throws Exception {
Future<Integer> result;
try (Batcher<Integer, Integer> batcher =
new BatcherImpl<>(SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList)) {
new BatcherImpl<>(
SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList, batchingSettings)) {
result = batcher.add(5);
}
assertThat(result.isDone()).isTrue();
Expand All @@ -94,7 +110,9 @@ public void testWhenBatcherIsClose() throws Exception {
/** Validates exception when batch is called after {@link Batcher#close()}. */
@Test
public void testNoElementAdditionAfterClose() throws Exception {
underTest = new BatcherImpl<>(SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList);
underTest =
new BatcherImpl<>(
SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList, batchingSettings);
underTest.close();
Throwable actualError = null;
try {
Expand All @@ -109,7 +127,9 @@ public void testNoElementAdditionAfterClose() throws Exception {
/** Verifies unaryCallable is being called with a batch. */
@Test
public void testResultsAfterRPCSucceed() throws Exception {
underTest = new BatcherImpl<>(SQUARER_BATCHING_DESC_V2, mockUnaryCallable, labeledIntList);
underTest =
new BatcherImpl<>(
SQUARER_BATCHING_DESC_V2, mockUnaryCallable, labeledIntList, batchingSettings);
when(mockUnaryCallable.futureCall(any(LabeledIntList.class)))
.thenReturn(ApiFutures.immediateFuture(Arrays.asList(16, 25)));

Expand All @@ -126,7 +146,9 @@ public void testResultsAfterRPCSucceed() throws Exception {
/** Verifies exception occurred at RPC is propagated to element results */
@Test
public void testResultFailureAfterRPCFailure() throws Exception {
underTest = new BatcherImpl<>(SQUARER_BATCHING_DESC_V2, mockUnaryCallable, labeledIntList);
underTest =
new BatcherImpl<>(
SQUARER_BATCHING_DESC_V2, mockUnaryCallable, labeledIntList, batchingSettings);
final Exception fakeError = new RuntimeException();

when(mockUnaryCallable.futureCall(any(LabeledIntList.class)))
Expand All @@ -149,7 +171,8 @@ public void testResultFailureAfterRPCFailure() throws Exception {
/** Resolves future results when {@link BatchingDescriptor#splitResponse} throws exception. */
@Test
public void testExceptionInDescriptor() throws InterruptedException {
underTest = new BatcherImpl<>(mockDescriptor, callLabeledIntSquarer, labeledIntList);
underTest =
new BatcherImpl<>(mockDescriptor, callLabeledIntSquarer, labeledIntList, batchingSettings);

final RuntimeException fakeError = new RuntimeException("internal exception");
when(mockDescriptor.newRequestBuilder(any(LabeledIntList.class)))
Expand Down Expand Up @@ -178,7 +201,8 @@ public void testExceptionInDescriptor() throws InterruptedException {
/** Resolves future results when {@link BatchingDescriptor#splitException} throws exception */
@Test
public void testExceptionInDescriptorErrorHandling() throws InterruptedException {
underTest = new BatcherImpl<>(mockDescriptor, mockUnaryCallable, labeledIntList);
underTest =
new BatcherImpl<>(mockDescriptor, mockUnaryCallable, labeledIntList, batchingSettings);

final RuntimeException fakeRpcError = new RuntimeException("RPC error");
final RuntimeException fakeError = new RuntimeException("internal exception");
Expand All @@ -203,4 +227,30 @@ public void testExceptionInDescriptorErrorHandling() throws InterruptedException
verify(mockDescriptor)
.splitException(any(Throwable.class), Mockito.<SettableApiFuture<Integer>>anyList());
}

@Test
public void testWhenElementCountExceeds() throws Exception {
BatchingSettings settings = batchingSettings.toBuilder().setElementCountThreshold(2).build();
testElementTriggers(settings);
}

@Test
public void testWhenElementBytesExceeds() throws Exception {
BatchingSettings settings = batchingSettings.toBuilder().setRequestByteThreshold(2L).build();
testElementTriggers(settings);
}

private void testElementTriggers(BatchingSettings settings) throws Exception {
underTest =
new BatcherImpl<>(
SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList, settings);
Future result = underTest.add(4);
assertThat(result.isDone()).isFalse();
// After this element is added, the batch triggers sendBatch().
Future anotherResult = underTest.add(5);
// Both the elements should be resolved now.
assertThat(result.isDone()).isTrue();
assertThat(result.get()).isEqualTo(16);
assertThat(anotherResult.isDone()).isTrue();
}
}