Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Commit

Permalink
Moving new Batcher API to existing Batching package (#771)
Browse files Browse the repository at this point in the history
* Moving Batcher V2 to existing batching package

 - Renamed to `v2.RequestBuilder` to `BatchingRequestBuilder`.
 - Adapted to existing BatchingSettings.
 - In case ElementCountThreshold or RequestByteCountThreshold is null, then using 0 as threshold level.
 - Defaulting to 1ms in case delayThreshold value is unset.

* Updated delayThreshold behavior according to existing settings

 - Cancelling the auto flush in case delaythreshold is unset by user.

* Fixed unit test when delayThreshold not provided
  • Loading branch information
rahulKQL authored and igorbernstein2 committed Aug 13, 2019
1 parent 602a796 commit 0de16d5
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 164 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.batching.v2;
package com.google.api.gax.batching;

import com.google.api.core.ApiFuture;
import com.google.api.core.BetaApi;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.batching.v2;
package com.google.api.gax.batching;

import static com.google.common.util.concurrent.MoreExecutors.directExecutor;

Expand All @@ -40,11 +40,12 @@
import com.google.api.gax.rpc.UnaryCallable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -74,7 +75,7 @@ public class BatcherImpl<ElementT, ElementResultT, RequestT, ResponseT>
private final AtomicInteger numOfOutstandingBatches = new AtomicInteger(0);
private final Object flushLock = new Object();
private final Object elementLock = new Object();
private final ScheduledFuture<?> scheduledFuture;
private final Future<?> scheduledFuture;
private volatile boolean isClosed = false;

/**
Expand All @@ -100,12 +101,15 @@ public BatcherImpl(
Preconditions.checkNotNull(executor, "executor cannot be null");
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings);

long delay = batchingSettings.getDelayThreshold().toMillis();
PushCurrentBatchRunnable<ElementT, ElementResultT, RequestT, ResponseT> runnable =
new PushCurrentBatchRunnable<>(this);
scheduledFuture =
executor.scheduleWithFixedDelay(runnable, delay, delay, TimeUnit.MILLISECONDS);
runnable.setScheduledFuture(scheduledFuture);
if (batchingSettings.getDelayThreshold() != null) {
long delay = batchingSettings.getDelayThreshold().toMillis();
PushCurrentBatchRunnable<ElementT, ElementResultT, RequestT, ResponseT> runnable =
new PushCurrentBatchRunnable<>(this);
scheduledFuture =
executor.scheduleWithFixedDelay(runnable, delay, delay, TimeUnit.MILLISECONDS);
} else {
scheduledFuture = Futures.immediateCancelledFuture();
}
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -208,7 +212,7 @@ public void close() throws InterruptedException {
* future results for one batch.
*/
private static class Batch<ElementT, ElementResultT, RequestT, ResponseT> {
private final RequestBuilder<ElementT, RequestT> builder;
private final BatchingRequestBuilder<ElementT, RequestT> builder;
private final List<SettableApiFuture<ElementResultT>> results;
private final BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> descriptor;
private final long elementThreshold;
Expand All @@ -223,9 +227,11 @@ private Batch(
BatchingSettings batchingSettings) {
this.descriptor = descriptor;
this.builder = descriptor.newRequestBuilder(prototype);
this.elementThreshold = batchingSettings.getElementCountThreshold();
this.bytesThreshold = batchingSettings.getRequestByteThreshold();
this.results = new ArrayList<>();
Long elementCountThreshold = batchingSettings.getElementCountThreshold();
this.elementThreshold = elementCountThreshold == null ? 0 : elementCountThreshold;
Long requestByteThreshold = batchingSettings.getRequestByteThreshold();
this.bytesThreshold = requestByteThreshold == null ? 0 : requestByteThreshold;
}

void add(ElementT element, SettableApiFuture<ElementResultT> result) {
Expand Down Expand Up @@ -272,7 +278,7 @@ boolean hasAnyThresholdReached() {
static class PushCurrentBatchRunnable<ElementT, ElementResultT, RequestT, ResponseT>
implements Runnable {

private ScheduledFuture<?> scheduledFuture;
private Future<?> scheduledFuture;
private final WeakReference<BatcherImpl<ElementT, ElementResultT, RequestT, ResponseT>>
batcherReferent;

Expand All @@ -290,7 +296,7 @@ public void run() {
}
}

void setScheduledFuture(ScheduledFuture<?> scheduledFuture) {
void setScheduledFuture(Future<?> scheduledFuture) {
this.scheduledFuture = scheduledFuture;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.batching.v2;
package com.google.api.gax.batching;

import com.google.api.core.BetaApi;
import com.google.api.core.InternalExtensionOnly;
Expand Down Expand Up @@ -89,7 +89,7 @@ public interface BatchingDescriptor<ElementT, ElementResultT, RequestT, Response
* Creates a new wrapper for the underlying request builder. It's used to pack the current batch
* request with elements.
*/
RequestBuilder<ElementT, RequestT> newRequestBuilder(RequestT prototype);
BatchingRequestBuilder<ElementT, RequestT> newRequestBuilder(RequestT prototype);

/** Unpacks the batch response into individual elements results. */
void splitResponse(ResponseT batchResponse, List<SettableApiFuture<ElementResultT>> batch);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.batching.v2;
package com.google.api.gax.batching;

import com.google.api.core.BetaApi;
import com.google.api.core.InternalExtensionOnly;
Expand All @@ -43,7 +43,7 @@
*/
@BetaApi("The surface for batching is not stable yet and may change in the future.")
@InternalExtensionOnly("For google-cloud-java client use only.")
public interface RequestBuilder<ElementT, RequestT> {
public interface BatchingRequestBuilder<ElementT, RequestT> {

/** Adds element object into client specific batch request. */
void add(ElementT element);
Expand Down
135 changes: 0 additions & 135 deletions gax/src/main/java/com/google/api/gax/batching/v2/BatchingSettings.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.batching.v2;
package com.google.api.gax.batching;

import static com.google.api.gax.rpc.testing.FakeBatchableApi.SQUARER_BATCHING_DESC_V2;
import static com.google.api.gax.rpc.testing.FakeBatchableApi.callLabeledIntSquarer;
Expand Down Expand Up @@ -69,8 +69,8 @@ public class BatcherImplTest {
private final LabeledIntList labeledIntList = new LabeledIntList("Default");
private final BatchingSettings batchingSettings =
BatchingSettings.newBuilder()
.setElementCountThreshold(1000L)
.setRequestByteThreshold(1000L)
.setElementCountThreshold(1000)
.setDelayThreshold(Duration.ofSeconds(1))
.build();

Expand Down Expand Up @@ -237,7 +237,7 @@ public void splitException(Throwable throwable, List<SettableApiFuture<Integer>>

@Test
public void testWhenElementCountExceeds() throws Exception {
BatchingSettings settings = batchingSettings.toBuilder().setElementCountThreshold(2).build();
BatchingSettings settings = batchingSettings.toBuilder().setElementCountThreshold(2L).build();
testElementTriggers(settings);
}

Expand All @@ -251,9 +251,9 @@ public void testWhenElementBytesExceeds() throws Exception {
public void testWhenThresholdIsDisabled() throws Exception {
BatchingSettings settings =
BatchingSettings.newBuilder()
.setElementCountThreshold(0)
.setRequestByteThreshold(0)
.setDelayThreshold(Duration.ofMillis(1))
.setElementCountThreshold(null)
.setRequestByteThreshold(null)
.setDelayThreshold(null)
.build();
underTest =
new BatcherImpl<>(
Expand All @@ -266,7 +266,7 @@ public void testWhenThresholdIsDisabled() throws Exception {
@Test
public void testWhenDelayThresholdExceeds() throws Exception {
BatchingSettings settings =
batchingSettings.toBuilder().setDelayThreshold(Duration.ofMillis(200)).build();
batchingSettings.toBuilder().setDelayThreshold(Duration.ofMillis(100)).build();
underTest =
new BatcherImpl<>(
SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList, settings, EXECUTOR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.google.api.core.ApiFutures;
import com.google.api.core.InternalApi;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.BatchingRequestBuilder;
import com.google.api.gax.batching.PartitionKey;
import com.google.api.gax.batching.RequestBuilder;
import com.google.api.gax.rpc.ApiCallContext;
Expand Down Expand Up @@ -181,13 +182,13 @@ public long countBytes(LabeledIntList request) {
new SquarerBatchingDescriptorV2();

public static class SquarerBatchingDescriptorV2
implements com.google.api.gax.batching.v2.BatchingDescriptor<
implements com.google.api.gax.batching.BatchingDescriptor<
Integer, Integer, LabeledIntList, List<Integer>> {

@Override
public com.google.api.gax.batching.v2.RequestBuilder<Integer, LabeledIntList> newRequestBuilder(
public BatchingRequestBuilder<Integer, LabeledIntList> newRequestBuilder(
final LabeledIntList prototype) {
return new com.google.api.gax.batching.v2.RequestBuilder<Integer, LabeledIntList>() {
return new BatchingRequestBuilder<Integer, LabeledIntList>() {
final LabeledIntList labelList = prototype.clone();

@Override
Expand Down

0 comments on commit 0de16d5

Please sign in to comment.