Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Batcher implementation with Synchronized Batcher#flush(without triggers) #716

Merged
merged 9 commits into from
Jun 13, 2019
215 changes: 215 additions & 0 deletions gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
/*
* Copyright 2019 Google LLC
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google LLC nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.batching.v2;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.BetaApi;
import com.google.api.core.InternalExtensionOnly;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Queues up the elements until {@link #flush()} is called, once batching is finished returned
* future gets resolves.
*
* <p>This class is not thread-safe, and expects to be used from a single thread.
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
*/
@BetaApi("The surface for batching is not stable yet and may change in the future.")
@InternalExtensionOnly("For google-cloud-java client use only.")
rahulKQL marked this conversation as resolved.
Show resolved Hide resolved
public class BatcherImpl<ElementT, ElementResultT, RequestT, ResponseT>
rahulKQL marked this conversation as resolved.
Show resolved Hide resolved
rahulKQL marked this conversation as resolved.
Show resolved Hide resolved
implements Batcher<ElementT, ElementResultT> {

/** The amount of time to wait before checking responses are received or not. */
private final BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT>
batchingDescriptor;

private final UnaryCallable<RequestT, ResponseT> callable;
private final RequestT prototype;

private final AtomicInteger numOfRpcs = new AtomicInteger(0);
private final AtomicBoolean isFlushed = new AtomicBoolean(false);
private final Semaphore semaphore = new Semaphore(0);
private Batch<ElementT, ElementResultT, RequestT> currentOpenBatch;
private boolean isClosed = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make it volatile

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only used by the single caller thread, so it doesn't need to be volatile

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, but it is practically free to make it such, ans feels safer (a little bit of defensive programming never hurts in concurrent programming).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I generally prefer to avoid defensive programming because you end up hiding the intention of the code. But I agree with you that volatile is free here. So if you feel strongly, then I won't object

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would still add volatile

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rahulKQL can you add the volatile and I will merge this PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am sorry, I haven't marked this ongoing conversation as closed.
I have already added a volatile for isClosed flag.


private BatcherImpl(Builder<ElementT, ElementResultT, RequestT, ResponseT> builder) {
this.prototype = checkNotNull(builder.prototype, "RequestPrototype cannot be null.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please do not put period (.) at the end of messages like this to stay consistent with the rest of the codebase (applies to other places as well, like line 113).

Also the message probably should not use the class names to indicate what is null (either use field names i.e prototype instead of RequestPrototype, or regular langugae to describe what is null, otherwise it looks like the "class" itself is null).

this.callable = checkNotNull(builder.unaryCallable, "UnaryCallable cannot be null.");
this.batchingDescriptor =
checkNotNull(builder.batchingDescriptor, "BatchingDescriptor cannot be null.");
}

/** Builder for a BatcherImpl. */
public static class Builder<ElementT, ElementResultT, RequestT, ResponseT> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this be @Autovalue? I'm not a big fan of @Autovalue but we already use it extensively in gax-java, so lets reuse it here if possible to reduce boilerplate code typing.

Copy link
Contributor

@igorbernstein2 igorbernstein2 Jun 7, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't consider BatcherImpl is a value object. So i'm not sure how AutoValue can help here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it has to be a value object, it can be anything, which can be constructed with a builder.
Now when I look at it, do we even need a builder here? I'm just trying to figure out if we can reduce boilerplate code here (either by generating builder with the autovalue annotation or not having it at all).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dont think a builder is necessary here. Given the choice between AutoValue and removing the builder I would prefer removing the builder

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have removed the Builder from the current change and annotated this class with @AutoValue.

Just a side note, We may need to come back on this when introducing flow controllers. As after implementing triggers, we would be expecting 6 arguments(i.e. existing ones + BatchingSettings, FlowController, and an Executor).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TLDR version: please remove AutoValue for the time being.

Longer version:
I've given this more thought, and I really think that AutoValue is really the wrong tool for this. In the best practices it states

Don't use AutoValue to implement value semantics unless you really want value semantics. In particular, you should never care about the difference between two equal instances.

This is definitely not the case for BatcherImpl since a Batcher has mutable state and we do care which instance we call flush on. Also, using AutoValue here implies that equality will use value semantics and be determined by the prototype, callable & batchingDescriptor:

BatcherImpl.Builder builder = ...;

Batcher b1 = builder.build();
Batcher b2 = builder.build();
b2.add(...)

b1.equals(b2) == true; 

Which makes no sense. I think the only meaningful equality here is referential and goes against the best practices of AutoValue.

Since you already removed the Builder until a later PR, please remove the use of AutoValue here since it buys you very little. Once that PR comes up I'll chat with vam about the best path forward.

private BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> batchingDescriptor;
private UnaryCallable<RequestT, ResponseT> unaryCallable;
private RequestT prototype;

private Builder() {}

public Builder<ElementT, ElementResultT, RequestT, ResponseT> setBatchingDescriptor(
BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> batchingDescriptor) {
this.batchingDescriptor = batchingDescriptor;
return this;
}

public Builder<ElementT, ElementResultT, RequestT, ResponseT> setUnaryCallable(
UnaryCallable<RequestT, ResponseT> unaryCallable) {
this.unaryCallable = unaryCallable;
return this;
}

public Builder<ElementT, ElementResultT, RequestT, ResponseT> setPrototype(RequestT prototype) {
this.prototype = prototype;
return this;
}

public BatcherImpl<ElementT, ElementResultT, RequestT, ResponseT> build() {
return new BatcherImpl<>(this);
}
}

public static <EntryT, EntryResultT, RequestT, ResponseT>
Builder<EntryT, EntryResultT, RequestT, ResponseT> newBuilder() {
return new Builder<>();
}

/** {@inheritDoc} */
@Override
public ApiFuture<ElementResultT> add(ElementT element) {
Preconditions.checkState(!isClosed, "Cannot add elements on a closed batcher.");

if (currentOpenBatch == null) {
currentOpenBatch = new Batch<>(batchingDescriptor.newRequestBuilder(prototype));
}

SettableApiFuture<ElementResultT> result = SettableApiFuture.create();
currentOpenBatch.add(element, result);
return result;
}

/** {@inheritDoc} */
@Override
public void flush() throws InterruptedException {
sendBatch();
isFlushed.compareAndSet(false, true);
if (numOfRpcs.get() > 0) {
semaphore.acquire();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't love this. Can we use a semaphore instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added a semaphore implementation 0 permits. Please let me know if this impl looks good to you
private final AtomicInteger numOfRpcs = new AtomicInteger(0);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, semaphore doesn't seem quite right for this. Sorry for misleading you on that

Maybe something like this:

AtomicInteger numOutstandingBatches = new AtomicInteger();
Object flushLock = new Object();

void sendBatch() {
  numOutstandingBatches.increment();
}

void onBatchCompletion() {
   if (numOutstandingBatches.decrementAndGet() == 0) {
   flushLock.notifyAll();
 }
}

void flush() {
  sendBatch();
  awaitAllOutstandingBatches();
}

void awaitAllOutstandingBatches() {
  while(numOutstandingBatches.get() > 0) {
    synchronized(lock) {
      flushLock.wait();
    }
  }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the detailed steps, I have implemented with the same for Batcher#flush(). Please have a look.

}

/** Sends accumulated elements asynchronously for batching. */
private void sendBatch() {
if (currentOpenBatch == null) {
return;
}
final Batch<ElementT, ElementResultT, RequestT> accumulatedBatch = currentOpenBatch;
currentOpenBatch = null;
numOfRpcs.incrementAndGet();

final ApiFuture<ResponseT> batchResponse =
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
callable.futureCall(accumulatedBatch.builder.build());

ApiFutures.addCallback(
batchResponse,
new ApiFutureCallback<ResponseT>() {
@Override
public void onSuccess(ResponseT response) {
try {
batchingDescriptor.splitResponse(response, accumulatedBatch.results);
} catch (Throwable ex) {
for (SettableApiFuture<ElementResultT> result : accumulatedBatch.results) {
rahulKQL marked this conversation as resolved.
Show resolved Hide resolved
result.setException(ex);
}
} finally {
onCompletion();
}
}

@Override
public void onFailure(Throwable throwable) {
try {
batchingDescriptor.splitException(throwable, accumulatedBatch.results);
} catch (Throwable ex) {
for (SettableApiFuture<ElementResultT> result : accumulatedBatch.results) {
result.setException(ex);
}
} finally {
onCompletion();
}
}
},
directExecutor());
}

private void onCompletion() {
if (numOfRpcs.decrementAndGet() == 0 && isFlushed.get()) {
semaphore.release();
isFlushed.compareAndSet(true, false);
}
}

/** {@inheritDoc} */
@Override
public void close() throws InterruptedException {
isClosed = true;
flush();
}

/**
* This class represent one logical Batch. It accumulates all the elements and it's corresponding
rahulKQL marked this conversation as resolved.
Show resolved Hide resolved
* future element results for one batch.
*/
private static class Batch<ElementT, ElementResultT, RequestT> {
private final RequestBuilder<ElementT, RequestT> builder;
private final List<SettableApiFuture<ElementResultT>> results;

private Batch(RequestBuilder<ElementT, RequestT> builder) {
this.builder = builder;
this.results = new ArrayList<>();
}

void add(ElementT element, SettableApiFuture<ElementResultT> result) {
builder.add(element);
results.add(result);
}
}
}
Loading