Skip to content

Commit

Permalink
Merge pull request googleapis#10 from pongad/executor
Browse files Browse the repository at this point in the history
Use a global Executor
  • Loading branch information
pongad committed Feb 17, 2016
2 parents e72fd57 + 55e9f34 commit 6a5e17c
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 69 deletions.
23 changes: 17 additions & 6 deletions src/main/java/com/google/api/gax/grpc/ApiCallable.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;

import java.util.concurrent.ScheduledExecutorService;

/**
* A callable is an object which represents one or more rpc calls. Various operators on callables
* produce new callables, representing common API programming patterns. Callables can be used to
Expand All @@ -53,7 +55,7 @@
public class ApiCallable<RequestT, ResponseT> {
private final FutureCallable<RequestT, ResponseT> callable;

ApiCallable(FutureCallable<RequestT, ResponseT> callable) {
private ApiCallable(FutureCallable<RequestT, ResponseT> callable) {
this.callable = callable;
}

Expand Down Expand Up @@ -149,16 +151,24 @@ public void asyncCall(RequestT request, StreamObserver<ResponseT> observer) {
*/
public static <ReqT, RespT> ApiCallable<ReqT, RespT> create(
MethodDescriptor<ReqT, RespT> descriptor) {
return create(new DescriptorClientCallFactory<>(descriptor));
return ApiCallable.<ReqT, RespT>create(new DescriptorClientCallFactory<>(descriptor));
}

/**
* Creates a callable which uses the {@link io.grpc.ClientCall}
* generated by the given {@code factory}
* generated by the given {@code factory}.
*/
public static <ReqT, RespT> ApiCallable<ReqT, RespT> create(
ClientCallFactory<ReqT, RespT> factory) {
return new ApiCallable<ReqT, RespT>(new DirectCallable<ReqT, RespT>(factory));
return ApiCallable.<ReqT, RespT>create(new DirectCallable<>(factory));
}

/**
* Creates a callable which uses the given {@link FutureCallable}.
*/
public static <ReqT, RespT> ApiCallable<ReqT, RespT> create(
FutureCallable<ReqT, RespT> callable) {
return new ApiCallable<ReqT, RespT>(callable);
}

/**
Expand All @@ -185,9 +195,10 @@ public ApiCallable<RequestT, ResponseT> retryableOn(ImmutableSet<Status.Code> re
* Creates a callable which retries using exponential back-off. Back-off parameters are defined
* by the given {@code retryParams}.
*/
public ApiCallable<RequestT, ResponseT> retrying(RetryParams retryParams) {
public ApiCallable<RequestT, ResponseT> retrying(
RetryParams retryParams, ScheduledExecutorService executor) {
return new ApiCallable<RequestT, ResponseT>(
new RetryingCallable<RequestT, ResponseT>(callable, retryParams));
new RetryingCallable<RequestT, ResponseT>(callable, retryParams, executor));
}

/**
Expand Down
13 changes: 7 additions & 6 deletions src/main/java/com/google/api/gax/grpc/BundlingCallable.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,12 @@
* pubsub topic.
*/
class BundlingCallable<RequestT, ResponseT> implements FutureCallable<RequestT, ResponseT> {
private FutureCallable<RequestT, ResponseT> callable;
private BundlingDescriptor<RequestT, ResponseT> bundlingDescriptor;
private BundlerFactory<RequestT, ResponseT> bundlerFactory;
private final FutureCallable<RequestT, ResponseT> callable;
private final BundlingDescriptor<RequestT, ResponseT> bundlingDescriptor;
private final BundlerFactory<RequestT, ResponseT> bundlerFactory;

public BundlingCallable(FutureCallable<RequestT, ResponseT> callable,
public BundlingCallable(
FutureCallable<RequestT, ResponseT> callable,
BundlingDescriptor<RequestT, ResponseT> bundlingDescriptor,
BundlerFactory<RequestT, ResponseT> bundlerFactory) {
this.callable = Preconditions.checkNotNull(callable);
Expand All @@ -60,7 +61,8 @@ public BundlingCallable(FutureCallable<RequestT, ResponseT> callable,
@Override
public ListenableFuture<ResponseT> futureCall(CallContext<RequestT> context) {
SettableFuture<ResponseT> result = SettableFuture.<ResponseT>create();
ApiCallable<RequestT, ResponseT> apiCallable = new ApiCallable<>(callable);
ApiCallable<RequestT, ResponseT> apiCallable =
ApiCallable.<RequestT, ResponseT>create(callable);
BundlingContext<RequestT, ResponseT> bundlableMessage =
new BundlingContext<RequestT, ResponseT>(context, apiCallable, result);
String partitionKey = bundlingDescriptor.getBundlePartitionKey(context.getRequest());
Expand All @@ -69,5 +71,4 @@ public ListenableFuture<ResponseT> futureCall(CallContext<RequestT> context) {
forwarder.addToNextBundle(bundlableMessage);
return result;
}

}
16 changes: 6 additions & 10 deletions src/main/java/com/google/api/gax/grpc/RetryingCallable.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,38 +33,34 @@

import com.google.api.gax.core.RetryParams;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;

import io.grpc.CallOptions;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.Set;

/**
* {@code RetryingCallable} provides retry/timeout functionality to {@link FutureCallable}.
* The behavior is controlled by the given {@link RetryParams}.
*/
class RetryingCallable<RequestT, ResponseT> implements FutureCallable<RequestT, ResponseT> {
private static final int THREAD_POOL_SIZE = 10;
private static final ScheduledExecutorService executor =
Executors.newScheduledThreadPool(THREAD_POOL_SIZE);

private final FutureCallable<RequestT, ResponseT> callable;
private final RetryParams retryParams;
private final ScheduledExecutorService executor;

RetryingCallable(FutureCallable<RequestT, ResponseT> callable, RetryParams retryParams) {
RetryingCallable(
FutureCallable<RequestT, ResponseT> callable,
RetryParams retryParams,
ScheduledExecutorService executor) {
this.callable = Preconditions.checkNotNull(callable);
this.retryParams = Preconditions.checkNotNull(retryParams);
this.executor = executor;
}

public ListenableFuture<ResponseT> futureCall(CallContext<RequestT> context) {
Expand Down
32 changes: 30 additions & 2 deletions src/main/java/com/google/api/gax/grpc/ServiceApiSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,14 @@
import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.MoreExecutors;

import io.grpc.ManagedChannel;
import io.grpc.Status;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;

import javax.annotation.Nullable;

// TODO(pongad): Don't close the channel if the user gives one to us
Expand All @@ -53,6 +57,11 @@
*/
@AutoValue
public abstract class ServiceApiSettings<MethodId> {
public static final int DEFAULT_EXECUTOR_THREADS = 4;
private static final ScheduledExecutorService DEFAULT_EXECUTOR =
MoreExecutors.getExitingScheduledExecutorService(
new ScheduledThreadPoolExecutor(DEFAULT_EXECUTOR_THREADS));

/**
* Status codes that are considered to be retryable by the given methods
*/
Expand Down Expand Up @@ -82,6 +91,22 @@ public abstract class ServiceApiSettings<MethodId> {
*/
public abstract int getPort();

/**
* The executor to be used by the client.
*
* If none is set by the corresponding method in {@link Builder},
* a default {@link java.util.concurrent.ScheduledThreadPoolExecutor}
* with {@link DEFAULT_EXECUTOR_THREADS} is used.
* The default executor is guaranteed to not prevent JVM from normally exitting,
* but may wait for up to 120 seconds after all non-daemon threads exit to give received tasks
* time to complete.
* If this behavior is not desirable, the user may specify a custom {@code Executor}.
*
* If a custom {@code Executor} is specified by the corresponding method,
* it is up to the user to terminate the {@code Executor} when it is no longer needed.
*/
public abstract ScheduledExecutorService getExecutor();

/**
* The channel used to send requests to the service.
* See class documentation on channels.
Expand All @@ -90,14 +115,15 @@ public abstract class ServiceApiSettings<MethodId> {
public abstract ManagedChannel getChannel();

public static <MethodId> Builder<MethodId> builder() {
return new AutoValue_ServiceApiSettings.Builder()
return new AutoValue_ServiceApiSettings.Builder<MethodId>()
.setRetryableCodes(ImmutableMap.<MethodId, ImmutableSet<Status.Code>>of())
.setRetryParams(ImmutableMap.<MethodId, RetryParams>of())
.setExecutor(DEFAULT_EXECUTOR)
.setPort(0);
}

public Builder<MethodId> toBuilder() {
return new AutoValue_ServiceApiSettings.Builder(this);
return new AutoValue_ServiceApiSettings.Builder<MethodId>(this);
}

@AutoValue.Builder
Expand All @@ -116,6 +142,8 @@ public abstract Builder<MethodId> setRetryParams(

public abstract Builder<MethodId> setChannel(ManagedChannel channel);

public abstract Builder<MethodId> setExecutor(ScheduledExecutorService executor);

public abstract ServiceApiSettings<MethodId> build();
}
}
21 changes: 8 additions & 13 deletions src/main/java/com/google/api/gax/internal/ApiUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import com.google.auth.oauth2.GoogleCredentials;
import com.google.common.collect.Lists;

import com.google.api.gax.grpc.ApiCallable;
import com.google.api.gax.grpc.ServiceApiSettings;
import io.grpc.ClientInterceptor;
import io.grpc.ManagedChannel;
Expand All @@ -46,16 +45,12 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Executor;

/**
* Class providing utilities to be used for code generation.
*/
public class ApiUtils {

// TODO(wrwg): make this configurable
private static final int AUTH_THREADS = 4;

/**
* Acquires application-default credentials, applying the given scopes if the
* credentials require scopes.
Expand All @@ -72,13 +67,12 @@ public static Credentials credentialsWithScopes(String scopes[]) throws IOExcept
/**
* Creates a channel for the given address, port, and credentials.
*/
public static ManagedChannel createChannel(String address, int port, Credentials credentials)
throws IOException {
public static ManagedChannel createChannel(
String address, int port, Credentials credentials, Executor executor) {
List<ClientInterceptor> interceptors = Lists.newArrayList();
//TODO: MIGRATION interceptors.add(ChannelFactory.authorityInterceptor(address));

interceptors.add(
new ClientAuthInterceptor(credentials, Executors.newFixedThreadPool(AUTH_THREADS)));
interceptors.add(new ClientAuthInterceptor(credentials, executor));

return NettyChannelBuilder.forAddress(address, port)
.negotiationType(NegotiationType.TLS)
Expand All @@ -90,8 +84,8 @@ public static ManagedChannel createChannel(String address, int port, Credentials
* Creates a new instance of ServiceApiSettings with all fields populated, using
* the given defaults if the corresponding values are not set on ServiceApiSettings.
*/
public static ServiceApiSettings populateSettings(
ServiceApiSettings settings,
public static <MethodIdentifier> ServiceApiSettings<MethodIdentifier> populateSettings(
ServiceApiSettings<MethodIdentifier> settings,
String defaultServiceAddress,
int defaultServicePort,
String scopes[])
Expand All @@ -115,7 +109,8 @@ public static ServiceApiSettings populateSettings(
credentials = credentialsWithScopes(scopes);
}

ManagedChannel channel = ApiUtils.createChannel(servicePath, port, credentials);
ManagedChannel channel =
ApiUtils.createChannel(servicePath, port, credentials, settings.getExecutor());
return settings.toBuilder().setChannel(channel).build();
}
}
Loading

0 comments on commit 6a5e17c

Please sign in to comment.