Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Commit

Permalink
Remove unnecessary classes from new Batcher
Browse files Browse the repository at this point in the history
Have refactored the BatcherImpl to remove unneed code.
 - Now utilizing RequestBuilder as CurrentBatch.
 - Removed BatcherAccumalator as it was sharing same responsibility.
 - Added Shutdown operation, if any client want to shutdown the current execution as well.
  • Loading branch information
rahulKQL committed Mar 20, 2019
1 parent daf4303 commit 671e6f5
Show file tree
Hide file tree
Showing 13 changed files with 294 additions and 539 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public abstract class BatchingSettings {
@Nullable
public abstract Duration getDelayThreshold();

@Deprecated
/** Returns the Boolean object to indicate if the batching is enabled. Default to true */
public abstract Boolean getIsEnabled();

Expand Down
118 changes: 0 additions & 118 deletions gax/src/main/java/com/google/api/gax/batching/v2/BatchAccumalator.java

This file was deleted.

6 changes: 6 additions & 0 deletions gax/src/main/java/com/google/api/gax/batching/v2/Batcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,10 @@ public interface Batcher<EntryT, ResultT> extends AutoCloseable {

/** Closes the ScheduledExecutor if not shutdown yet. */
void close();

/**
* Initiates an orderly shutdown in which previously submitted work is finished, but no new work
* will be accepted. Invocation has no additional effect if already shut down.
*/
void shutdown();
}
170 changes: 3 additions & 167 deletions gax/src/main/java/com/google/api/gax/batching/v2/BatcherFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,172 +29,8 @@
*/
package com.google.api.gax.batching.v2;

import com.google.api.core.BetaApi;
import com.google.api.gax.batching.BatchingFlowController;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.BatchingThreshold;
import com.google.api.gax.batching.ElementCounter;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.batching.NumericThreshold;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
public interface BatcherFactory<Entry, Result> {

/**
* This class creates a fresh {@link Batcher}, which have fresh set of thresholds as List of {@link
* NumericThreshold}.
*
* <p>This is public only for technical reasons, for advanced usage.
*/
@BetaApi("The surface for batching is not stable yet and may change in the future.")
public class BatcherFactory<EntryT, ResultT, RequestT, ResponseT>
implements IBatcherFactory<EntryT, ResultT> {
private final ScheduledExecutorService executor;
private final BatchingDescriptor<EntryT, ResultT, RequestT, ResponseT> batchingDescriptor;
private final FlowController flowController;
private final BatchingSettings batchingSettings;
private final UnaryCallable<RequestT, ResponseT> callable;

private BatcherFactory(Builder<EntryT, ResultT, RequestT, ResponseT> builder) {
this.batchingDescriptor = builder.batchingDescriptor;
this.batchingSettings = builder.batchingSettings;
this.executor = builder.executor;
this.flowController = builder.flowController;
this.callable = builder.callable;
}

public static class Builder<EntryT, ResultT, RequestT, ResponseT> {
private ScheduledExecutorService executor;
private BatchingDescriptor<EntryT, ResultT, RequestT, ResponseT> batchingDescriptor;
private FlowController flowController;
private BatchingSettings batchingSettings;
private UnaryCallable<RequestT, ResponseT> callable;

/** Sets the executor for the ThresholdBatcher. */
public Builder<EntryT, ResultT, RequestT, ResponseT> setExecutor(
ScheduledExecutorService executor) {
this.executor = executor;
return this;
}

/** Sets the BatchingDescriptor for EntryT & ResultT. */
public Builder<EntryT, ResultT, RequestT, ResponseT> setBatchingDescriptor(
BatchingDescriptor<EntryT, ResultT, RequestT, ResponseT> batchingDescriptor) {
this.batchingDescriptor = batchingDescriptor;
return this;
}

/** Sets the FlowController, to be used for creating BatchingFlowController. */
public Builder<EntryT, ResultT, RequestT, ResponseT> setFlowController(
FlowController flowController) {
this.flowController = flowController;
return this;
}

/** Sets the BatchingSettings, to be used for various threshold settings. */
public Builder<EntryT, ResultT, RequestT, ResponseT> setBatchingSettings(
BatchingSettings batchingSettings) {
this.batchingSettings = batchingSettings;
return this;
}

/** Sets the UnaryCallable, to complete RPC. */
public Builder<EntryT, ResultT, RequestT, ResponseT> setUnaryCallable(
UnaryCallable<RequestT, ResponseT> callable) {
this.callable = callable;
return this;
}

/** Builds the BatcherFactory */
public BatcherFactory<EntryT, ResultT, RequestT, ResponseT> build() {
return new BatcherFactory<>(this);
}
}

public static <EntryT, ResultT, RequestT, ResponseT>
Builder<EntryT, ResultT, RequestT, ResponseT> newBuilder() {
return new Builder<>();
}

/** {@inheritDoc} */
@Override
public Batcher<EntryT, ResultT> createBatcher() {
BatchingFlowController<EntryT> batchingFlowController =
new BatchingFlowController<>(
flowController,
new EntryCountThreshold<EntryT>(),
new EntryByteThreshold<>(batchingDescriptor));

return BatcherImpl.<EntryT, ResultT, RequestT, ResponseT>newBuilder()
.setThresholds(getThresholds(batchingSettings))
.setExecutor(executor)
.setMaxDelay(batchingSettings.getDelayThreshold())
.setBatchingDescriptor(batchingDescriptor)
.setFlowController(batchingFlowController)
.setUnaryCallable(callable)
.build();
}

/**
* Returns {@link List} of different thresholds based on values present in {@link
* BatchingSettings}.
*
* <p>This is public only for technical reasons, for advanced usage.
*/
private List<BatchingThreshold<EntryT>> getThresholds(BatchingSettings batchingSettings) {
ImmutableList.Builder<BatchingThreshold<EntryT>> listBuilder = ImmutableList.builder();

final Long elementCount = batchingSettings.getElementCountThreshold();
final Long byteCount = batchingSettings.getRequestByteThreshold();
if (elementCount != null) {
ElementCounter<EntryT> elementCounter = new EntryCountThreshold<>();

BatchingThreshold<EntryT> countThreshold =
new NumericThreshold<>(elementCount, elementCounter);
listBuilder.add(countThreshold);
}

if (byteCount != null) {
ElementCounter<EntryT> requestByte = new EntryByteThreshold<>(batchingDescriptor);

BatchingThreshold<EntryT> byteThreshold = new NumericThreshold<>(byteCount, requestByte);
listBuilder.add(byteThreshold);
}
return listBuilder.build();
}

/**
* As each Entry object will be considered one single element in the set, so it returns 1 for each
* count.
*
* <p>This is public only for technical reasons, for advanced usage.
*/
private static class EntryCountThreshold<EntryT> implements ElementCounter<EntryT> {
@Override
public long count(EntryT element) {
return 1;
}
}

/**
* Calculates bytes of an entry object sent for batching. Implementation of counting bytes are
* client dependent as it internally usage {@link BatchingDescriptor#countBytes(Object)}.
*
* <p>This is public only for technical reasons, for advanced usage.
*/
private static class EntryByteThreshold<EntryT, ResultT, RequestT, ResponseT>
implements ElementCounter<EntryT> {

private final BatchingDescriptor<EntryT, ResultT, RequestT, ResponseT> batchingDesc;

EntryByteThreshold(BatchingDescriptor<EntryT, ResultT, RequestT, ResponseT> batchingDesc) {
this.batchingDesc = batchingDesc;
}

@Override
public long count(EntryT element) {
return batchingDesc.countBytes(element);
}
}
/** Provides the {@link Batcher}, which can be used to start batching with entry object. */
Batcher<Entry, Result> createBatcher();
}
Loading

0 comments on commit 671e6f5

Please sign in to comment.