Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Implementing ElementCount and ElementBytes Triggers #734

Merged
merged 5 commits into from
Jun 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 33 additions & 4 deletions gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,28 +55,39 @@
* @param <ResponseT> The type of the response that will unpack into individual element results.
*/
@BetaApi("The surface for batching is not stable yet and may change in the future.")
@InternalApi
@InternalApi("For google-cloud-java client use only")
public class BatcherImpl<ElementT, ElementResultT, RequestT, ResponseT>
implements Batcher<ElementT, ElementResultT> {

private final BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT>
batchingDescriptor;
private final UnaryCallable<RequestT, ResponseT> unaryCallable;
private final RequestT prototype;
private final BatchingSettings batchingSettings;

private Batch<ElementT, ElementResultT, RequestT, ResponseT> currentOpenBatch;
private final AtomicInteger numOfOutstandingBatches = new AtomicInteger(0);
private final Object flushLock = new Object();
private volatile boolean isClosed = false;

/**
* @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements
* into wrappers request and response.
* @param unaryCallable a {@link UnaryCallable} object.
* @param prototype a {@link RequestT} object.
* @param batchingSettings a {@link BatchingSettings} with configuration of thresholds.
*/
rahulKQL marked this conversation as resolved.
Show resolved Hide resolved
public BatcherImpl(
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> batchingDescriptor,
UnaryCallable<RequestT, ResponseT> unaryCallable,
RequestT prototype) {
RequestT prototype,
BatchingSettings batchingSettings) {
this.batchingDescriptor =
Preconditions.checkNotNull(batchingDescriptor, "batching descriptor cannot be null");
this.unaryCallable = Preconditions.checkNotNull(unaryCallable, "callable cannot be null");
this.prototype = Preconditions.checkNotNull(prototype, "request prototype cannot be null");
this.batchingSettings =
Preconditions.checkNotNull(batchingSettings, "batching setting cannot be null");
}

/** {@inheritDoc} */
Expand All @@ -85,11 +96,15 @@ public ApiFuture<ElementResultT> add(ElementT element) {
Preconditions.checkState(!isClosed, "Cannot add elements on a closed batcher");

if (currentOpenBatch == null) {
currentOpenBatch = new Batch<>(prototype, batchingDescriptor);
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings);
}

SettableApiFuture<ElementResultT> result = SettableApiFuture.create();
currentOpenBatch.add(element, result);

if (currentOpenBatch.hasAnyThresholdReached()) {
sendBatch();
}
return result;
}

Expand Down Expand Up @@ -167,18 +182,28 @@ private static class Batch<ElementT, ElementResultT, RequestT, ResponseT> {
private final RequestBuilder<ElementT, RequestT> builder;
private final List<SettableApiFuture<ElementResultT>> results;
private final BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> descriptor;
private final long elementThreshold;
private final long bytesThreshold;

private long elementCounter = 0;
private long byteCounter = 0;

private Batch(
RequestT prototype,
BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> descriptor) {
BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> descriptor,
BatchingSettings batchingSettings) {
this.descriptor = descriptor;
this.builder = descriptor.newRequestBuilder(prototype);
this.elementThreshold = batchingSettings.getElementCountThreshold();
this.bytesThreshold = batchingSettings.getRequestByteThreshold();
this.results = new ArrayList<>();
}

void add(ElementT element, SettableApiFuture<ElementResultT> result) {
rahulKQL marked this conversation as resolved.
Show resolved Hide resolved
builder.add(element);
results.add(result);
elementCounter++;
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
byteCounter += descriptor.countBytes(element);
}

void onBatchSuccess(ResponseT response) {
Expand All @@ -198,5 +223,9 @@ void onBatchFailure(Throwable throwable) {
}
}
}

boolean hasAnyThresholdReached() {
return elementCounter >= elementThreshold || byteCounter >= bytesThreshold;
}
}
}
116 changes: 116 additions & 0 deletions gax/src/main/java/com/google/api/gax/batching/v2/BatchingSettings.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright 2019 Google LLC
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google LLC nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.batching.v2;

import com.google.api.core.BetaApi;
import com.google.auto.value.AutoValue;
import com.google.common.base.Preconditions;

/**
* Represents the batching settings to use for an API method that is capable of batching.
*
* <p>Each batching client must define their own set of default values for these thresholds, which
* would be the safest behavior for their jobs.
*
* <p>The default instance of this settings does not have any default values. Users are expected to
* configure batching thresholds explicitly: the element count or the request bytes count.
* Thresholds can be disabled(meaning immediate result of input elements) by setting its value to 0.
*
* <p>Warning: With the incorrect settings, it is possible to cause long periods of dead waiting
* time.
*
* <p>When batching is configured for an API method, a call to that method will result in the
* request being queued up with other requests. When any of the set thresholds are reached, the
* queued up requests are packaged together in a batch and set to the service as a single RPC. When
* the response comes back, it is split apart into individual responses according to the individual
* input requests.
*
* <p>There are several supported thresholds:
*
* <ul>
* <li><b>Element Count Threshold</b>: Once this many elements are queued, send all of the
* elements in a single call, even if the request byte threshold has not been exceed yet.
* <li><b>Request Byte Threshold</b>: Once the number of bytes in the batched request reaches this
* threshold, send all of the elements in a single call, even if element count threshold has
* not been exceeded yet.
* </ul>
*
* <p>These thresholds are treated as triggers, not as limits. Each threshold is an independent
* trigger and doesn't have any knowledge of the other thresholds.
*/
@BetaApi("The surface for batching is not stable yet and may change in the future.")
@AutoValue
public abstract class BatchingSettings {

/** Get the element count threshold to use for batching. */
public abstract int getElementCountThreshold();

/** Get the request byte threshold to use for batching. */
public abstract long getRequestByteThreshold();

/** Get a new builder. */
public static Builder newBuilder() {
return new AutoValue_BatchingSettings.Builder();
}

/** Get a builder with the same values as this object. */
public abstract Builder toBuilder();

/**
* See the class documentation of {@link BatchingSettings} for a description of the different
* values that can be set.
*/
@AutoValue.Builder
public abstract static class Builder {
/**
* Set the element count threshold to use for batching. After this many elements are
* accumulated, they will be wrapped up in a batch and sent.
*/
public abstract Builder setElementCountThreshold(int elementCountThreshold);

/**
* Set the request byte threshold to use for batching. After this many bytes are accumulated,
* the elements will be wrapped up in a batch and sent.
*/
public abstract Builder setRequestByteThreshold(long requestByteThreshold);

abstract BatchingSettings autoBuild();

/** Build the BatchingSettings object. */
public BatchingSettings build() {
BatchingSettings settings = autoBuild();
Preconditions.checkState(
settings.getElementCountThreshold() >= 0, "elementCountThreshold cannot be negative");
Preconditions.checkState(
settings.getRequestByteThreshold() >= 0, "requestByteThreshold cannot be negative");
return settings;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -63,12 +64,26 @@ public class BatcherImplTest {
@Mock private BatchingDescriptor<Integer, Integer, LabeledIntList, List<Integer>> mockDescriptor;

private Batcher<Integer, Integer> underTest;
private LabeledIntList labeledIntList = new LabeledIntList("Default");
private final LabeledIntList labeledIntList = new LabeledIntList("Default");
private final BatchingSettings batchingSettings =
BatchingSettings.newBuilder()
.setRequestByteThreshold(1000L)
.setElementCountThreshold(1000)
.build();

@After
public void tearDown() throws InterruptedException {
if (underTest != null) {
underTest.close();
}
}

/** The accumulated results in the test are resolved when {@link Batcher#flush()} is called. */
@Test
public void testResultsAreResolvedAfterFlush() throws Exception {
underTest = new BatcherImpl<>(SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList);
underTest =
new BatcherImpl<>(
SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList, batchingSettings);
Future<Integer> result = underTest.add(4);
assertThat(result.isDone()).isFalse();
underTest.flush();
Expand All @@ -84,7 +99,8 @@ public void testResultsAreResolvedAfterFlush() throws Exception {
public void testWhenBatcherIsClose() throws Exception {
Future<Integer> result;
try (Batcher<Integer, Integer> batcher =
new BatcherImpl<>(SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList)) {
new BatcherImpl<>(
SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList, batchingSettings)) {
result = batcher.add(5);
}
assertThat(result.isDone()).isTrue();
Expand All @@ -94,7 +110,9 @@ public void testWhenBatcherIsClose() throws Exception {
/** Validates exception when batch is called after {@link Batcher#close()}. */
@Test
public void testNoElementAdditionAfterClose() throws Exception {
underTest = new BatcherImpl<>(SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList);
underTest =
new BatcherImpl<>(
SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList, batchingSettings);
underTest.close();
Throwable actualError = null;
try {
Expand All @@ -109,7 +127,9 @@ public void testNoElementAdditionAfterClose() throws Exception {
/** Verifies unaryCallable is being called with a batch. */
@Test
public void testResultsAfterRPCSucceed() throws Exception {
underTest = new BatcherImpl<>(SQUARER_BATCHING_DESC_V2, mockUnaryCallable, labeledIntList);
underTest =
new BatcherImpl<>(
SQUARER_BATCHING_DESC_V2, mockUnaryCallable, labeledIntList, batchingSettings);
when(mockUnaryCallable.futureCall(any(LabeledIntList.class)))
.thenReturn(ApiFutures.immediateFuture(Arrays.asList(16, 25)));

Expand All @@ -126,7 +146,9 @@ public void testResultsAfterRPCSucceed() throws Exception {
/** Verifies exception occurred at RPC is propagated to element results */
@Test
public void testResultFailureAfterRPCFailure() throws Exception {
underTest = new BatcherImpl<>(SQUARER_BATCHING_DESC_V2, mockUnaryCallable, labeledIntList);
underTest =
new BatcherImpl<>(
SQUARER_BATCHING_DESC_V2, mockUnaryCallable, labeledIntList, batchingSettings);
final Exception fakeError = new RuntimeException();

when(mockUnaryCallable.futureCall(any(LabeledIntList.class)))
Expand All @@ -142,14 +164,15 @@ public void testResultFailureAfterRPCFailure() throws Exception {
actualError = ex;
}

assertThat(actualError.getCause()).isSameAs(fakeError);
assertThat(actualError.getCause()).isSameInstanceAs(fakeError);
verify(mockUnaryCallable, times(1)).futureCall(any(LabeledIntList.class));
}

/** Resolves future results when {@link BatchingDescriptor#splitResponse} throws exception. */
@Test
public void testExceptionInDescriptor() throws InterruptedException {
underTest = new BatcherImpl<>(mockDescriptor, callLabeledIntSquarer, labeledIntList);
underTest =
new BatcherImpl<>(mockDescriptor, callLabeledIntSquarer, labeledIntList, batchingSettings);

final RuntimeException fakeError = new RuntimeException("internal exception");
when(mockDescriptor.newRequestBuilder(any(LabeledIntList.class)))
Expand All @@ -170,15 +193,16 @@ public void testExceptionInDescriptor() throws InterruptedException {
actualError = ex;
}

assertThat(actualError.getCause()).isSameAs(fakeError);
assertThat(actualError.getCause()).isSameInstanceAs(fakeError);
verify(mockDescriptor)
.splitResponse(Mockito.<Integer>anyList(), Mockito.<SettableApiFuture<Integer>>anyList());
}

/** Resolves future results when {@link BatchingDescriptor#splitException} throws exception */
@Test
public void testExceptionInDescriptorErrorHandling() throws InterruptedException {
underTest = new BatcherImpl<>(mockDescriptor, mockUnaryCallable, labeledIntList);
underTest =
new BatcherImpl<>(mockDescriptor, mockUnaryCallable, labeledIntList, batchingSettings);

final RuntimeException fakeRpcError = new RuntimeException("RPC error");
final RuntimeException fakeError = new RuntimeException("internal exception");
Expand All @@ -199,8 +223,49 @@ public void testExceptionInDescriptorErrorHandling() throws InterruptedException
actualError = ex;
}

assertThat(actualError.getCause()).isSameAs(fakeError);
assertThat(actualError.getCause()).isSameInstanceAs(fakeError);
verify(mockDescriptor)
.splitException(any(Throwable.class), Mockito.<SettableApiFuture<Integer>>anyList());
}

@Test
public void testWhenElementCountExceeds() throws Exception {
BatchingSettings settings = batchingSettings.toBuilder().setElementCountThreshold(2).build();
testElementTriggers(settings);
}

@Test
public void testWhenElementBytesExceeds() throws Exception {
BatchingSettings settings = batchingSettings.toBuilder().setRequestByteThreshold(2L).build();
testElementTriggers(settings);
}

@Test
public void testWhenThresholdIsDisabled() throws Exception {
BatchingSettings settings =
BatchingSettings.newBuilder()
.setElementCountThreshold(0)
.setRequestByteThreshold(0)
.build();
underTest =
new BatcherImpl<>(
SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList, settings);
Future<Integer> result = underTest.add(2);
assertThat(result.isDone()).isTrue();
assertThat(result.get()).isEqualTo(4);
}

private void testElementTriggers(BatchingSettings settings) throws Exception {
underTest =
new BatcherImpl<>(
SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList, settings);
Future<Integer> result = underTest.add(4);
assertThat(result.isDone()).isFalse();
// After this element is added, the batch triggers sendBatch().
Future<Integer> anotherResult = underTest.add(5);
// Both the elements should be resolved now.
assertThat(result.isDone()).isTrue();
assertThat(result.get()).isEqualTo(16);
assertThat(anotherResult.isDone()).isTrue();
}
}