Batcher implementation with Synchronized Batcher#flush
(without triggers)
#716
@igorbernstein2 Please have a look.
Codecov Report
@@             Coverage Diff              @@
##             master     #716     +/-    ##
============================================
+ Coverage     75.74%   77.65%   +1.91%
- Complexity     1041     1091      +50
============================================
  Files           196      197       +1
  Lines          4679     4776      +97
  Branches        363      371       +8
============================================
+ Hits           3544     3709     +165
+ Misses          975      898      -77
- Partials        160      169       +9
I added some first-pass comments. I will take a deeper look in the next couple of days.
gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java
sendBatch();
while (numOfRpcs.get() > 0) {
  LockSupport.parkNanos(DEFAULT_FINISH_WAIT_NANOS);
}
I don't love this. Can we use a semaphore instead?
I have added a semaphore implementation with 0 permits. Please let me know if this implementation looks good to you.
private final AtomicInteger numOfRpcs = new AtomicInteger(0);
Hmm, semaphore doesn't seem quite right for this. Sorry for misleading you on that.
Maybe something like this:
AtomicInteger numOutstandingBatches = new AtomicInteger();
Object flushLock = new Object();

void sendBatch() {
  numOutstandingBatches.incrementAndGet();
}

void onBatchCompletion() {
  if (numOutstandingBatches.decrementAndGet() == 0) {
    // notifyAll() must be called while holding the monitor of flushLock
    synchronized (flushLock) {
      flushLock.notifyAll();
    }
  }
}

void flush() throws InterruptedException {
  sendBatch();
  awaitAllOutstandingBatches();
}

void awaitAllOutstandingBatches() throws InterruptedException {
  // Check the counter under the lock so a notification cannot be missed
  synchronized (flushLock) {
    while (numOutstandingBatches.get() > 0) {
      flushLock.wait();
    }
  }
}
Thanks for the detailed steps. I have implemented the same for Batcher#flush(). Please have a look.
gax/src/main/java/com/google/api/gax/batching/v2/BatchingCallSettings.java
 * This class represents one logical Batch. It accumulates all the elements and their corresponding
 * future element results for one batch.
 */
class Batch {
static?
My intention was to avoid declaring type params for the inner class, but I have updated it to private static class.
Please add a comment to make that clear
…gers)
- This implementation does not include any triggers for sending received elements for batching. (This will be added in follow-up PRs.)
- User has to explicitly call `Batcher#flush()` to send the accumulated request for batching.
- Here `BatcherImpl` expects `v2.BatchingDescriptor`, `v2.BatchingCallSettings`, `UnaryCallable` and a request prototype containing repetitive data which would be copied over to each batch along with the elements.
- `v2.BatchingCallSettings` is an extension of the existing `BatchingCallSettings`.
…chingCallSettings.java`
@igorbernstein2 Also, I decided to remove |
LGTM, except for the timeout.
I'll assume that you will deal with synchronization when implementing timed flush
@vam-google can you take a look as well?
private void awaitAllOutstandingBatches() throws InterruptedException {
  while (numOfOutstandingBatches.get() > 0) {
    synchronized (flushLock) {
      flushLock.wait(DEFAULT_WAIT_TIME_MS);
I'm not sure I understand why we need a timeout
I added this as a defense. It was also inspired by the existing CBT-OperationalAccountant.
What is it defending against?
I added this for the case of an RPC taking too long, so that this thread would keep checking numOfOutStandingBatch.
I now realize that this adds nothing. The only update we are waiting for here comes from onBatchComplete's notifyAll(), which executes in all cases.
Thanks a lot for pointing it out. I will remove this timeout.
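For readers following the thread, here is a minimal sketch of a flush that waits without a timeout. It is not the code from this PR: `FlushSketch`, `outstanding()`, and the executor standing in for the RPC layer are hypothetical names, and the sketch assumes the completion callback runs for every batch, whether it succeeds or fails. The counter is checked while holding `flushLock`, so the final `notifyAll()` cannot slip in between the check and the `wait()`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for BatcherImpl; only the flush bookkeeping is modeled.
class FlushSketch {
  private final AtomicInteger numOfOutstandingBatches = new AtomicInteger(0);
  private final Object flushLock = new Object();

  // Hands one "batch" to the simulated RPC layer (an executor, by assumption).
  void sendBatch(ExecutorService executor) {
    numOfOutstandingBatches.incrementAndGet();
    executor.execute(this::onBatchCompletion);
  }

  // Assumed to run on success and on failure alike, so a waiter is always woken.
  void onBatchCompletion() {
    if (numOfOutstandingBatches.decrementAndGet() == 0) {
      synchronized (flushLock) {
        flushLock.notifyAll();
      }
    }
  }

  // Blocks until every outstanding batch has completed; no timeout is used.
  void awaitAllOutstandingBatches() throws InterruptedException {
    synchronized (flushLock) {
      while (numOfOutstandingBatches.get() > 0) {
        flushLock.wait();
      }
    }
  }

  int outstanding() {
    return numOfOutstandingBatches.get();
  }

  public static void main(String[] args) throws InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(4);
    FlushSketch batcher = new FlushSketch();
    for (int i = 0; i < 100; i++) {
      batcher.sendBatch(executor);
    }
    batcher.awaitAllOutstandingBatches();
    System.out.println("outstanding = " + batcher.outstanding()); // outstanding = 0
    executor.shutdown();
  }
}
```

The loop around `wait()` also covers spurious wakeups, which is the usual reason to re-check the condition rather than rely on a timed wait.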
gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java
private boolean isClosed = false;

private BatcherImpl(Builder<ElementT, ElementResultT, RequestT, ResponseT> builder) {
  this.prototype = checkNotNull(builder.prototype, "RequestPrototype cannot be null.");
Please do not put a period (.) at the end of messages like this, to stay consistent with the rest of the codebase (applies to other places as well, like line 113).
Also, the message probably should not use class names to indicate what is null (either use field names, i.e. prototype instead of RequestPrototype, or regular language to describe what is null; otherwise it looks like the "class" itself is null).
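As an illustration of the requested message style, here is a small sketch. It swaps in the standard library's `Objects.requireNonNull` for Guava's `checkNotNull` so that it runs without dependencies, and `NullMessageSketch` is a hypothetical class, not part of the PR.

```java
import java.util.Objects;

// Hypothetical class used only to show the message convention.
class NullMessageSketch {
  private final Object prototype;

  NullMessageSketch(Object prototype) {
    // The message names the field (not the class) and has no trailing period.
    this.prototype = Objects.requireNonNull(prototype, "prototype cannot be null");
  }

  public static void main(String[] args) {
    try {
      new NullMessageSketch(null);
    } catch (NullPointerException e) {
      System.out.println(e.getMessage()); // prototype cannot be null
    }
  }
}
```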
}

/** Builder for a BatcherImpl. */
public static class Builder<ElementT, ElementResultT, RequestT, ResponseT> {
Can this be @AutoValue? I'm not a big fan of @AutoValue, but we already use it extensively in gax-java, so let's reuse it here if possible to reduce boilerplate code.
I don't consider BatcherImpl to be a value object, so I'm not sure how AutoValue can help here.
I don't think it has to be a value object, it can be anything, which can be constructed with a builder.
Now when I look at it, do we even need a builder here? I'm just trying to figure out if we can reduce boilerplate code here (either by generating builder with the autovalue annotation or not having it at all).
I don't think a builder is necessary here. Given the choice between AutoValue and removing the builder, I would prefer removing the builder.
I have removed the Builder from the current change and annotated this class with @AutoValue.
Just a side note: we may need to come back to this when introducing flow controllers. After implementing triggers, we would expect 6 arguments (i.e. the existing ones plus BatchingSettings, FlowController, and an Executor).
TLDR version: please remove AutoValue for the time being.
Longer version:
I've given this more thought, and I really think that AutoValue is the wrong tool for this. In the best practices it states:
Don't use AutoValue to implement value semantics unless you really want value semantics. In particular, you should never care about the difference between two equal instances.
This is definitely not the case for BatcherImpl since a Batcher has mutable state and we do care which instance we call flush on. Also, using AutoValue here implies that equality will use value semantics and be determined by the prototype, callable & batchingDescriptor:
BatcherImpl.Builder builder = ...;
Batcher b1 = builder.build();
Batcher b2 = builder.build();
b2.add(...);
b1.equals(b2); // true, although only b2 holds an element
Which makes no sense. I think the only meaningful equality here is referential, and that goes against the best practices of AutoValue.
Since you already removed the Builder until a later PR, please remove the use of AutoValue here since it buys you very little. Once that PR comes up I'll chat with vam about the best path forward.
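The equality concern can be reproduced without AutoValue. The sketch below hand-writes an `equals()` over the configured field only, which is roughly what a generated value-type `equals()` would do (AutoValue compares the declared properties); the class names and the `String` prototype are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

class EqualitySketch {
  // Value semantics: equality is derived from the configuration only.
  static final class ValueBatcher {
    private final String prototype;
    private final List<String> pending = new ArrayList<>(); // mutable state, ignored by equals()

    ValueBatcher(String prototype) {
      this.prototype = prototype;
    }

    void add(String element) {
      pending.add(element);
    }

    int pendingCount() {
      return pending.size();
    }

    @Override
    public boolean equals(Object o) {
      return o instanceof ValueBatcher && prototype.equals(((ValueBatcher) o).prototype);
    }

    @Override
    public int hashCode() {
      return Objects.hash(prototype);
    }
  }

  // Referential semantics: equals() and hashCode() are inherited from Object.
  static final class ReferenceBatcher {
    private final List<String> pending = new ArrayList<>();

    void add(String element) {
      pending.add(element);
    }
  }

  public static void main(String[] args) {
    ValueBatcher v1 = new ValueBatcher("table-a");
    ValueBatcher v2 = new ValueBatcher("table-a");
    v2.add("row-1");
    System.out.println(v1.equals(v2)); // true, although only v2 holds an element

    ReferenceBatcher r1 = new ReferenceBatcher();
    ReferenceBatcher r2 = new ReferenceBatcher();
    System.out.println(r1.equals(r2)); // false: distinct instances
  }
}
```

With value equality, two batchers holding different pending elements would collapse into one entry in a `HashSet` or `HashMap` key set, which is the kind of surprise the comment above warns about.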
gax/src/main/java/com/google/api/gax/batching/v2/BatcherImpl.java
private final AtomicInteger numOfOutstandingBatches = new AtomicInteger(0);
private final Object flushLock = new Object();
private boolean isClosed = false;
Please make it volatile
This is only used by the single caller thread, so it doesn't need to be volatile
OK, but it is practically free to make it so, and it feels safer (a little bit of defensive programming never hurts in concurrent programming).
I generally prefer to avoid defensive programming because you end up hiding the intention of the code. But I agree with you that volatile is free here. So if you feel strongly, then I won't object
I would still add volatile
@rahulKQL can you add the volatile and I will merge this PR
I am sorry, I hadn't marked this ongoing conversation as closed.
I have already added volatile to the isClosed flag.
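For context, a minimal sketch of a volatile closed flag follows. How the PR actually uses `isClosed` is not shown in this thread, so the guard in `add()`, the exception message, and the class name `ClosableBatcherSketch` are assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical batcher that rejects new elements once closed.
class ClosableBatcherSketch {
  private final List<String> pending = new ArrayList<>();

  // volatile: a close() performed on one thread is visible to a later add() on another.
  // It does not make add() and close() atomic with respect to each other.
  private volatile boolean isClosed = false;

  void add(String element) {
    if (isClosed) {
      throw new IllegalStateException("Cannot add elements on a closed batcher");
    }
    pending.add(element);
  }

  void close() {
    isClosed = true;
  }

  int pendingCount() {
    return pending.size();
  }

  public static void main(String[] args) {
    ClosableBatcherSketch batcher = new ClosableBatcherSketch();
    batcher.add("row-1");
    batcher.close();
    try {
      batcher.add("row-2");
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage()); // Cannot add elements on a closed batcher
    }
  }
}
```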
@vam-google, @igorbernstein2, I have tried to address your feedback. Please have a fresh look at this change.
Now BatcherImpl is created with @AutoValue. Also made the `isClosed` field volatile.
LGTM
@vam-google Thanks for the review!
This is a follow-up PR for #692
What this PR contains
- User has to explicitly call `Batcher#flush()` to send the accumulated request for batching.
- `BatcherImpl` expects `v2.BatchingDescriptor`, `UnaryCallable` and a request prototype containing repetitive data which would be copied over to each batch along with the bundle of elements.
- `v2.BatchingCallSettings` is an extension of the existing `BatchingCallSettings`.
- … `250ms`.
Follow-ups
Once this change is merged I will raise a separate PR with triggers, thresholds, and flowController for `BatcherImpl`.